Phase 7 - Pull Protocol Implementation
Date: 2025-11-11 Status: Complete ✅ Effort: ~2 hours
Overview
Completed the pull protocol (Request/Response) flow for gossip synchronization, enabling nodes to request missing entries on-demand rather than relying solely on push-based Announce messages.
Problem Statement
Phase 6 implemented the push-based gossip protocol where nodes broadcast Announce messages when they have new entries. However, the pull mechanism was incomplete:
- When receiving an Announce for a missing entry, nodes didn't request it
- When receiving a Request, nodes didn't send back the full entry
- RequestBloomFilter and RequestMissing handlers were stubs
- No bidirectional communication between GossipActor and NetworkActor
The challenge was that GossipActor had no way to send messages back - it could only receive them via handle_message().
Architecture Challenge: Circular Dependencies
The naive approach would be to add NetworkHandle to GossipActor:
pub struct GossipActor {
    network_handle: Option<NetworkHandle>, // ❌ Creates circular dependency
    // ...
}
This creates a circular dependency:
- icn-gossip would need to depend on icn-net (for NetworkHandle)
- icn-net already depends on icn-gossip (for GossipMessage types)
Solution: Callback-Based Architecture
Instead of tight coupling, we use a callback pattern similar to the existing trust_lookup:
pub type SendMessageCallback = Arc<dyn Fn(Option<Did>, GossipMessage) + Send + Sync>;
pub struct GossipActor {
    send_callback: Option<SendMessageCallback>,
    // ...
}
Benefits:
- No circular dependency - gossip doesn't depend on net
- Flexible - callback can be any implementation (mock in tests, real network in production)
- Consistent - matches existing trust_lookup pattern
- Testable - easy to inject test callbacks to verify messages sent
Implementation
1. GossipActor Changes (gossip.rs:15-101)
Added callback infrastructure:
pub type SendMessageCallback = Arc<dyn Fn(Option<Did>, GossipMessage) + Send + Sync>;
pub struct GossipActor {
    // ... existing fields
    send_callback: Option<SendMessageCallback>,
}
impl GossipActor {
    pub fn set_send_callback(&mut self, callback: SendMessageCallback) {
        self.send_callback = Some(callback);
    }
    fn send_message(&self, recipient: Option<Did>, message: GossipMessage) {
        if let Some(callback) = &self.send_callback {
            callback(recipient, message);
        } else {
            debug!("Cannot send message - no send callback set");
        }
    }
}
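To illustrate the testability benefit, here is a minimal sketch of a recording callback that captures outgoing messages instead of sending them. Did and GossipMessage below are simplified stand-ins, and recording_callback is a hypothetical helper, not part of the ICN crates.

```rust
use std::sync::{Arc, Mutex};

// Stand-in types for illustration only; the real Did and GossipMessage
// are defined in the ICN crates.
type Did = String;

#[derive(Debug, Clone, PartialEq)]
enum GossipMessage {
    Request { hash: String },
}

type SendMessageCallback = Arc<dyn Fn(Option<Did>, GossipMessage) + Send + Sync>;
type SentLog = Arc<Mutex<Vec<(Option<Did>, GossipMessage)>>>;

/// Hypothetical test helper: builds a callback that records every outgoing
/// message in a shared log instead of handing it to the network.
fn recording_callback() -> (SendMessageCallback, SentLog) {
    let sent: SentLog = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&sent);
    let callback: SendMessageCallback =
        Arc::new(move |recipient: Option<Did>, message: GossipMessage| {
            sink.lock().unwrap().push((recipient, message));
        });
    (callback, sent)
}

fn main() {
    let (callback, sent) = recording_callback();
    callback(
        Some("did:example:alice".to_string()),
        GossipMessage::Request { hash: "abc".to_string() },
    );
    let log = sent.lock().unwrap();
    println!("recorded {} message(s): {:?}", log.len(), log[0]);
}
```

A test would pass the callback to set_send_callback() and then inspect the log after calling handle_message().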
2. Message Handler Implementation (gossip.rs:257-403)
Completed all TODO handlers:
Announce Handler
GossipMessage::Announce { hash, author, .. } => {
    // Check if we already have this entry
    if let Some(entries) = self.entries.get(&topic) {
        if entries.contains_key(&hash) {
            return Ok(()); // Already have it
        }
    }
    // Request full entry if missing
    self.send_message(Some(author), GossipMessage::Request { hash });
    Ok(())
}
Request Handler
GossipMessage::Request { hash } => {
    // Find entry across all topics
    for (_topic_name, entries) in &self.entries {
        if let Some(entry) = entries.get(&hash) {
            // Send Response with the entry
            self.send_message(None, GossipMessage::Response {
                entry: entry.clone(),
            });
            return Ok(());
        }
    }
    Ok(())
}
RequestBloomFilter Handler
GossipMessage::RequestBloomFilter { topic } => {
    if let Some(filter) = self.bloom_filters.get(&topic) {
        let filter_data = filter.to_data();
        self.send_message(None, GossipMessage::SendBloomFilter {
            topic: topic.clone(),
            filter: filter_data,
        });
    }
    Ok(())
}
RequestMissing Handler
GossipMessage::RequestMissing { hashes } => {
    for hash in hashes {
        for (_topic_name, entries) in &self.entries {
            if let Some(entry) = entries.get(&hash) {
                self.send_message(None, GossipMessage::Response {
                    entry: entry.clone(),
                });
                break;
            }
        }
    }
    Ok(())
}
3. Supervisor Integration (supervisor.rs:95-124)
Wire up the callback using NetworkHandle:
// Set send callback on gossip actor to enable request/response
{
    let mut gossip = gossip_handle.write().await;
    let network_handle_clone = network_handle.clone();
    let own_did_clone = did.clone();
    let send_callback: icn_gossip::SendMessageCallback = Arc::new(move |recipient, gossip_msg| {
        let net_handle = network_handle_clone.clone();
        let from_did = own_did_clone.clone();
        // Spawn async task to send message
        tokio::spawn(async move {
            let result = if let Some(target_did) = recipient {
                // Unicast
                let net_msg = icn_net::NetworkMessage::gossip(from_did, Some(target_did.clone()), gossip_msg);
                net_handle.send_message(target_did, net_msg).await
            } else {
                // Broadcast
                let net_msg = icn_net::NetworkMessage::gossip(from_did, None, gossip_msg);
                net_handle.broadcast(net_msg).await
            };
            if let Err(e) = result {
                warn!("Failed to send gossip message: {}", e);
            }
        });
    });
    gossip.set_send_callback(send_callback);
}
Key points:
- Callback clones NetworkHandle and DIDs to move into closure
- Spawns async task for each send (callback itself is synchronous)
- Handles both unicast (Some(did)) and broadcast (None) routing
4. Integration Tests (gossip.rs:554-668)
Added two comprehensive tests:
test_pull_protocol_request_response
Tests the full Announce → Request → Response flow:
- Node 1 publishes entry, Node 2 doesn't have it
- Node 2 receives Announce and sends Request
- Node 1 receives Request and sends Response
- Verifies correct messages sent at each step
test_request_missing_handler
Tests RequestMissing message handling:
- Node publishes 3 entries
- Receives RequestMissing for 2 entries
- Sends 2 Response messages
- Verifies responses contain correct entries
Message Flow Example
Complete pull protocol sequence:
Node A                                      Node B
  |                                           |
  | --- Announce(hash=X, author=A) ---------> |
  |                                           | (checks local store)
  |                                           | (doesn't have hash X)
  | <-- Request(hash=X) --------------------- |
  |                                           |
  | (finds entry X in local store)            |
  | --- Response(entry=X) ------------------> |
  |                                           | (stores entry X)
  |                                           | (updates bloom filter)
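The sequence above can be sketched as a small in-memory simulation. This is an illustration under simplifying assumptions, not the actual GossipActor: Node, run_pull, and the string-based Did, Hash, and Response body are hypothetical stand-ins, and the bloom filter update is omitted.

```rust
use std::collections::HashMap;

// Stand-in types for illustration; the real types live in the ICN crates.
type Did = String;
type Hash = String;

#[derive(Debug, Clone, PartialEq)]
enum GossipMessage {
    Announce { hash: Hash, author: Did },
    Request { hash: Hash },
    // Assumption: the real Response carries a full entry; a hash plus body stands in here.
    Response { hash: Hash, body: String },
}

struct Node {
    did: Did,
    store: HashMap<Hash, String>,
}

impl Node {
    fn new(did: &str) -> Self {
        Node { did: did.to_string(), store: HashMap::new() }
    }

    /// Handles one incoming message and returns the outgoing
    /// (recipient, message) pair, if any. A `None` recipient means broadcast.
    fn handle(&mut self, msg: GossipMessage) -> Option<(Option<Did>, GossipMessage)> {
        match msg {
            GossipMessage::Announce { hash, author } => {
                if self.store.contains_key(&hash) {
                    None // already have it
                } else {
                    Some((Some(author), GossipMessage::Request { hash }))
                }
            }
            GossipMessage::Request { hash } => {
                let body = self.store.get(&hash)?.clone();
                Some((None, GossipMessage::Response { hash, body }))
            }
            GossipMessage::Response { hash, body } => {
                self.store.insert(hash, body);
                None
            }
        }
    }
}

/// Drives Announce -> Request -> Response from node `a` to node `b`.
fn run_pull(a: &mut Node, b: &mut Node, hash: &str) {
    let announce = GossipMessage::Announce { hash: hash.to_string(), author: a.did.clone() };
    if let Some((_, request)) = b.handle(announce) {
        if let Some((_, response)) = a.handle(request) {
            b.handle(response);
        }
    }
}

fn main() {
    let mut a = Node::new("did:example:a");
    let mut b = Node::new("did:example:b");
    a.store.insert("X".to_string(), "entry body".to_string());
    run_pull(&mut a, &mut b, "X");
    println!("B has X: {}", b.store.contains_key("X"));
}
```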
Design Decisions
1. Synchronous Callback with Async Spawn
The callback itself is Fn (synchronous), but spawns async tasks:
Arc<dyn Fn(Option<Did>, GossipMessage) + Send + Sync>
Why?
- GossipActor's handle_message() is &mut self (not async)
- Can't await in a sync function
- Solution: Spawn async task inside callback
Alternative considered: Make callback async Fn
- Would require handle_message() to be async
- Would propagate async through entire call chain
- More invasive change
2. Broadcast vs. Unicast
When to use each:
Unicast (Some(did)):
- Request messages (we know who has the data)
- Direct responses to specific peers
Broadcast (None):
- Response messages (we don't know who requested)
- Anti-entropy messages (everyone should see)
Future improvement: Add source DID to Request messages so Response can be unicast.
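A rough sketch of what that change could look like follows. The `from` field and the reply_to helper are hypothetical; the current Request message carries only the hash, and the Response here is reduced to a hash for brevity.

```rust
// Stand-in types for illustration; not the real ICN definitions.
type Did = String;
type Hash = String;

#[derive(Debug, Clone, PartialEq)]
enum GossipMessage {
    // Hypothetical: `from` is not part of the current Request message.
    Request { hash: Hash, from: Did },
    Response { hash: Hash },
}

/// Builds the reply to a Request, addressed to the requester (unicast)
/// rather than broadcast. Returns None for any other message type.
fn reply_to(message: &GossipMessage) -> Option<(Option<Did>, GossipMessage)> {
    match message {
        GossipMessage::Request { hash, from } => Some((
            Some(from.clone()),
            GossipMessage::Response { hash: hash.clone() },
        )),
        _ => None,
    }
}

fn main() {
    let request = GossipMessage::Request {
        hash: "X".to_string(),
        from: "did:example:b".to_string(),
    };
    println!("{:?}", reply_to(&request));
}
```

With the requester's DID available, the Request handler could pass Some(from) to send_message() instead of None.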
3. SendBloomFilter Limitation
Current implementation notes:
// For a full implementation, we'd need to:
// 1. Compare our bloom filter with the remote one
// 2. Identify hashes present in remote but not in ours
// 3. Request those missing hashes
//
// However, bloom filters are probabilistic - they can only tell us
// "definitely not present" or "might be present". We can't extract
// the actual hashes from a bloom filter.
//
// The proper approach is for the sender to also send their entry hashes
// or we need a different anti-entropy approach.
Why the limitation?
- Bloom filters are one-way: insert(hash) → can't extract_hashes()
- Can only test membership: contains(hash) → bool
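To make the one-way property concrete, here is a toy bloom filter (ToyBloom is a hypothetical type, not the filter used in icn-gossip). It stores only bits, so the API can offer insert and contains but has nothing from which to reconstruct the inserted hashes.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy bloom filter for illustration only.
struct ToyBloom {
    bits: Vec<bool>,
    hashes: u64, // number of hash functions, simulated with seeds
}

impl ToyBloom {
    /// `num_bits` must be greater than zero.
    fn new(num_bits: usize, hashes: u64) -> Self {
        ToyBloom { bits: vec![false; num_bits], hashes }
    }

    /// Maps an item and a seed to a bit position.
    fn index<T: Hash + ?Sized>(&self, item: &T, seed: u64) -> usize {
        let mut hasher = DefaultHasher::new();
        seed.hash(&mut hasher);
        item.hash(&mut hasher);
        (hasher.finish() % self.bits.len() as u64) as usize
    }

    /// Sets one bit per hash function. The item itself is not stored.
    fn insert<T: Hash + ?Sized>(&mut self, item: &T) {
        for seed in 0..self.hashes {
            let i = self.index(item, seed);
            self.bits[i] = true;
        }
    }

    /// False means "definitely not present"; true means "might be present".
    fn contains<T: Hash + ?Sized>(&self, item: &T) -> bool {
        (0..self.hashes).all(|seed| self.bits[self.index(item, seed)])
    }
}

fn main() {
    let mut filter = ToyBloom::new(1024, 7);
    filter.insert("hash-1");
    println!("hash-1 might be present: {}", filter.contains("hash-1"));
}
```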
Current anti-entropy approach:
- Use RequestMissing with explicit hash list
- Anti-entropy task broadcasts RequestBloomFilter
- Peers respond with SendBloomFilter
- Comparison happens externally, then RequestMissing sent
Testing Results
All tests passing:
$ cargo test -p icn-gossip --lib
running 20 tests
test gossip::tests::test_pull_protocol_request_response ... ok
test gossip::tests::test_request_missing_handler ... ok
[18 other tests] ... ok
test result: ok. 20 passed; 0 failed
$ cargo test -p icn-net --lib
running 15 tests
[12 tests] ... ok
[3 tests] ... ignored
test result: ok. 12 passed; 0 failed; 3 ignored
$ cargo build --release
Finished `release` profile [optimized] target(s) in 19.66s
Performance Considerations
Message Overhead
- Each Announce triggers a Request (if missing)
- Each Request triggers a Response
- For N new entries across M nodes: O(N*M) messages
Async Spawn Per Message
- Each callback spawns a tokio task
- Lightweight but not free
- Alternative: Use a channel for batching (future optimization)
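One possible shape for the channel alternative is sketched below, using std::sync::mpsc as a stand-in for an async channel. The callback only enqueues, and a single consumer can drain the queue in batches. channel_callback and the simplified types are hypothetical.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};

// Stand-in types for illustration; not the real ICN definitions.
type Did = String;

#[derive(Debug, Clone, PartialEq)]
enum GossipMessage {
    Request { hash: String },
}

type Outgoing = (Option<Did>, GossipMessage);
type SendMessageCallback = Arc<dyn Fn(Option<Did>, GossipMessage) + Send + Sync>;

/// Hypothetical helper: returns a callback that enqueues messages, plus the
/// receiving end that a single sender task would drain.
fn channel_callback() -> (SendMessageCallback, Receiver<Outgoing>) {
    let (tx, rx) = mpsc::channel::<Outgoing>();
    // The Mutex keeps the closure Sync regardless of toolchain version.
    let tx = Mutex::new(tx);
    let callback: SendMessageCallback =
        Arc::new(move |recipient: Option<Did>, message: GossipMessage| {
            // A send error only means the receiver was dropped; ignore it here.
            let _ = tx.lock().unwrap().send((recipient, message));
        });
    (callback, rx)
}

fn main() {
    let (callback, rx) = channel_callback();
    for i in 0..3 {
        callback(None, GossipMessage::Request { hash: format!("hash-{i}") });
    }
    // Drain everything queued so far as one batch.
    let batch: Vec<Outgoing> = rx.try_iter().collect();
    println!("batch of {} message(s)", batch.len());
}
```

In the supervisor, the draining side would be one long-lived task that forwards each batch through NetworkHandle, replacing the per-message spawn.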
Bloom Filter Efficiency
- 1% false positive rate
- ~9.6 bits per element
- For 10,000 entries: ~12 KB per topic
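These figures follow from the standard sizing formula for an optimally configured bloom filter, bits per element = -ln(p) / (ln 2)^2. The sketch below reproduces the arithmetic; the function names are illustrative and not taken from the codebase.

```rust
use std::f64::consts::LN_2;

/// Optimal bits per element for a target false-positive rate `p`.
fn bits_per_element(p: f64) -> f64 {
    -p.ln() / (LN_2 * LN_2)
}

/// Filter size in bytes for `entries` elements at false-positive rate `p`,
/// rounded up to whole bits and then whole bytes.
fn filter_bytes(entries: u64, p: f64) -> u64 {
    let bits = (entries as f64 * bits_per_element(p)).ceil();
    (bits / 8.0).ceil() as u64
}

fn main() {
    println!("{:.2} bits per element", bits_per_element(0.01));
    println!("{} bytes for 10,000 entries", filter_bytes(10_000, 0.01));
}
```

At p = 0.01 this gives roughly 9.6 bits per element and about 12 KB for 10,000 entries, matching the numbers above.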
Limitations & Future Work
No Request Deduplication
- Multiple Announces for same hash → multiple Requests
- Could add "pending requests" cache
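A pending-requests cache could be as small as a set of in-flight hashes. The sketch below is one possible shape; PendingRequests and its methods are hypothetical and do not exist in the codebase.

```rust
use std::collections::HashSet;

// Stand-in for the real hash type.
type Hash = String;

/// Hypothetical cache of hashes for which a Request is already in flight.
#[derive(Default)]
struct PendingRequests {
    in_flight: HashSet<Hash>,
}

impl PendingRequests {
    /// Returns true if a Request should be sent, i.e. none is in flight yet
    /// for this hash. Marks the hash as in flight as a side effect.
    fn should_request(&mut self, hash: &str) -> bool {
        self.in_flight.insert(hash.to_string())
    }

    /// Clears the hash once its Response arrives, so a later Announce can
    /// trigger a fresh Request.
    fn complete(&mut self, hash: &str) {
        self.in_flight.remove(hash);
    }
}

fn main() {
    let mut pending = PendingRequests::default();
    println!("first Announce: request = {}", pending.should_request("X"));
    println!("second Announce: request = {}", pending.should_request("X"));
    pending.complete("X");
}
```

The Announce handler would call should_request() before send_message(), and the Response handler would call complete().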
No Timeout on Requests
- If Request is lost, entry never arrives
- Need retry mechanism with exponential backoff
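As a sketch of the backoff schedule only (the timer and re-send wiring are left out), a retry delay can be computed as base * 2^attempt with a cap. The function name and the example parameter values are assumptions.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt,
/// capped at `max`. Overflow at any step falls back to the cap.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}

fn main() {
    let base = Duration::from_millis(500);
    let max = Duration::from_secs(30);
    for attempt in 0..8 {
        println!("attempt {attempt}: wait {:?}", backoff_delay(attempt, base, max));
    }
}
```

Adding random jitter to each delay is a common refinement to avoid synchronized retries across nodes.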
Broadcast Responses
- Response goes to all peers, not just requester
- Wastes bandwidth
- Fix: Add source DID to Request message
No Flow Control
- Large RequestMissing (1000+ hashes) → flood of Responses
- Need batching or rate limiting
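Batching could start with splitting the hash list into fixed-size chunks, so that each chunk is answered (or requested) separately. The helper below is a hypothetical sketch; the batch size of 100 is an arbitrary example value.

```rust
/// Splits `hashes` into batches of at most `batch_size` items.
/// Panics if `batch_size` is zero.
fn batch_hashes(hashes: &[String], batch_size: usize) -> Vec<Vec<String>> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    hashes.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    let hashes: Vec<String> = (0..1001).map(|i| format!("hash-{i}")).collect();
    let batches = batch_hashes(&hashes, 100);
    println!("{} batches, last has {} hash(es)", batches.len(), batches.last().map_or(0, |b| b.len()));
}
```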
Synchronous Callback
- Spawns task per message
- Could be more efficient with async callback
Files Changed
- crates/icn-gossip/src/gossip.rs - Added callback, implemented handlers, tests (+115 lines)
- crates/icn-gossip/src/lib.rs - Exported SendMessageCallback (+1 line)
- crates/icn-core/src/supervisor.rs - Wired send callback (+30 lines)
- crates/icn-net/src/actor.rs - Fixed test signature (+1 line)
- crates/icn-gossip/src/vector_clock.rs - Fixed unused mut warning (-1 line)
Total: ~150 lines added/modified
Lessons Learned
Callback Pattern Wins
- Avoided circular dependency elegantly
- Similar to existing trust_lookup pattern
- Easy to test with mock callbacks
Spawn Inside Sync Callback
- Enables async operations from sync context
- Alternative to propagating async everywhere
- Common pattern in Tokio applications
Message Design Matters
- Not including source in Request → broadcast Response
- Small protocol change → big efficiency impact
Bloom Filter Tradeoffs
- Great for "do you have this?" checks
- Can't extract what's inside
- Need complementary mechanisms (RequestMissing)
Next Steps
Phase 7 remaining:
- Metrics exporter (Prometheus) ✅
- Complete pull protocol (Request/Response) ✅
- Topic subscriptions & routing
- Production hardening
- Comprehensive documentation
Update (2025-11-11): Metrics implementation complete! See 2025-11-11-metrics-implementation.md for full details.
Immediate next: topic subscriptions & routing, or integration testing