Lab 07: Gossip Sync with Vector Clocks

Goal

Implement vector clocks and anti-entropy sync for eventual consistency.

Requirements

1. Vector Clock

/// A vector clock: one monotonically increasing counter per node.
///
/// A node ID with no stored counter is treated as having counter `0`
/// everywhere (`get`, `merge`, and `happens_before` all agree on this).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct VectorClock {
    clocks: HashMap<String, u64>,  // NodeID → sequence
}

impl VectorClock {
    /// Creates an empty clock in which every node's counter reads as `0`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the counter recorded for `node_id`, or `0` if none is stored.
    pub fn get(&self, node_id: &str) -> u64 {
        self.clocks.get(node_id).copied().unwrap_or(0)
    }

    /// Advances the counter for `node_id` by one, starting from `0` if the
    /// node has not been seen yet.
    pub fn increment(&mut self, node_id: &str) {
        *self.clocks.entry(node_id.to_string()).or_insert(0) += 1;
    }

    /// Folds `other` into `self` by taking the per-node maximum.
    ///
    /// After the call, `self.get(n) >= other.get(n)` for every node `n`.
    pub fn merge(&mut self, other: &VectorClock) {
        for (node_id, &count) in &other.clocks {
            // Only touch the map when the counter actually advances: this
            // avoids cloning keys needlessly and never stores explicit zeros,
            // so equal clocks also compare equal with `==`.
            if count > self.get(node_id) {
                self.clocks.insert(node_id.clone(), count);
            }
        }
    }

    /// Returns `true` if `self <= other` component-wise, i.e. every counter
    /// in `self` is less than or equal to the matching counter in `other`.
    ///
    /// Despite the name, the comparison is non-strict: two equal clocks
    /// (including two empty ones) return `true`.
    pub fn happens_before(&self, other: &VectorClock) -> bool {
        // Go through `get` so a node missing from `other` counts as 0,
        // exactly as it does everywhere else in this type.
        self.clocks
            .iter()
            .all(|(node_id, &count)| count <= other.get(node_id))
    }
}

2. Gossip Entry

/// One gossiped item together with the causal context it was created in.
///
/// `Clone` is derived because entries are copied when they are handed from
/// one node to another (the node that owns the entry keeps its own copy).
#[derive(Clone)]
pub struct GossipEntry {
    /// Unique entry ID; `GossipNode::announce` formats it as
    /// `"<node_id>-<sequence>"`.
    pub id: String,
    /// Opaque payload bytes.
    pub data: Vec<u8>,
    /// Snapshot of the announcing node's clock, taken right after the node
    /// incremented its own counter for this entry.
    pub vector_clock: VectorClock,
}

3. In-Memory Node

pub struct GossipNode {
    node_id: String,
    entries: HashMap<String, GossipEntry>,
    vector_clock: VectorClock,
}

impl GossipNode {
    pub fn announce(&mut self, data: Vec<u8>) -> String {
        let id = format!("{}-{}", self.node_id, self.vector_clock.get(&self.node_id));
        self.vector_clock.increment(&self.node_id);
        
        let entry = GossipEntry {
            id: id.clone(),
            data,
            vector_clock: self.vector_clock.clone(),
        };
        
        self.entries.insert(id.clone(), entry);
        id
    }
    
    pub fn pull(&mut self, entry: GossipEntry) {
        if !self.entries.contains_key(&entry.id) {
            self.vector_clock.merge(&entry.vector_clock);
            self.entries.insert(entry.id.clone(), entry);
        }
    }
    
    pub fn get_missing(&self, other_clock: &VectorClock) -> Vec<String> {
        self.entries.values()
            .filter(|e| !e.vector_clock.happens_before(other_clock))
            .map(|e| e.id.clone())
            .collect()
    }
}

Tests to Write

Test 1: Single Node Announcement

/// A lone node that announces twice must keep both entries, each stored
/// under the ID that `announce` returned for it.
#[test]
fn test_single_node_announce() {
    let mut solo = GossipNode::new("A");

    let first = solo.announce(b"msg1".to_vec());
    let second = solo.announce(b"msg2".to_vec());

    assert_eq!(solo.entries.len(), 2);
    for id in [&first, &second] {
        assert!(solo.entries.contains_key(id));
    }
}

Test 2: Two Node Sync

/// An entry announced on one node and pulled by another must end up stored
/// on the receiver under the same ID.
#[test]
fn test_two_node_sync() {
    let mut sender = GossipNode::new("A");
    let mut receiver = GossipNode::new("B");

    // A publishes a single message ...
    let announced = sender.announce(b"msg1".to_vec());

    // ... and B fetches its own copy of that entry.
    let copy = sender.entries.get(&announced).unwrap().clone();
    receiver.pull(copy);

    // The entry is now known to B as well.
    assert!(receiver.entries.contains_key(&announced));
}

Test 3: Anti-Entropy Convergence

/// Two nodes with disjoint entry sets must each hold the union of both sets
/// after one anti-entropy pass in each direction.
#[test]
fn test_anti_entropy_convergence() {
    let mut alpha = GossipNode::new("A");
    let mut beta = GossipNode::new("B");

    // Three entries originate on A, two on B.
    for payload in [b"a1", b"a2", b"a3"] {
        alpha.announce(payload.to_vec());
    }
    for payload in [b"b1", b"b2"] {
        beta.announce(payload.to_vec());
    }

    // Pass 1 (A -> B): A works out what B lacks, B pulls copies of those.
    let for_beta: Vec<GossipEntry> = alpha
        .get_missing(&beta.vector_clock)
        .iter()
        .map(|id| alpha.entries.get(id).unwrap().clone())
        .collect();
    for entry in for_beta {
        beta.pull(entry);
    }

    // Pass 2 (B -> A): the same exchange in the opposite direction.
    let for_alpha: Vec<GossipEntry> = beta
        .get_missing(&alpha.vector_clock)
        .iter()
        .map(|id| beta.entries.get(id).unwrap().clone())
        .collect();
    for entry in for_alpha {
        alpha.pull(entry);
    }

    // Converged: each side holds all 3 + 2 entries.
    assert_eq!(alpha.entries.len(), 5);
    assert_eq!(beta.entries.len(), 5);
}

Test 4: Partition Healing

/// Nodes that share one entry, then diverge while partitioned, must
/// reconverge on the full set once anti-entropy runs in both directions.
#[test]
fn test_partition_healing() {
    // One anti-entropy pass: `to` pulls a copy of every entry that `from`
    // reports as missing for `to`'s current clock.
    fn heal(from: &GossipNode, to: &mut GossipNode) {
        for id in from.get_missing(&to.vector_clock) {
            to.pull(from.entries.get(&id).unwrap().clone());
        }
    }

    let mut left = GossipNode::new("A");
    let mut right = GossipNode::new("B");

    // Before the split: A announces once and B syncs that entry.
    let shared = left.announce(b"before-partition".to_vec());
    right.pull(left.entries.get(&shared).unwrap().clone());

    // During the partition each side keeps announcing on its own.
    left.announce(b"a-during".to_vec());
    right.announce(b"b-during".to_vec());

    // Partition healed: sync A -> B first, then B -> A.
    heal(&left, &mut right);
    heal(&right, &mut left);

    // Both sides now hold the shared entry plus one from each side.
    assert_eq!(left.entries.len(), 3);
    assert_eq!(right.entries.len(), 3);
}

Done When

  • Vector clocks track causality correctly
  • Nodes detect missing entries via clock comparison
  • Anti-entropy sync achieves convergence
  • Partition healing works (nodes diverge then reconverge)
  • All tests pass

Resources

  • icn/crates/icn-gossip/src/vector_clock.rs (vector clock implementation)
  • icn/crates/icn-gossip/src/gossip.rs (anti-entropy logic)
  • Wikipedia article: "Vector clock"