WASM Edge Computing: State Synchronization
Part of: WASM Edge Computing User Guide
HeliosDB uses Conflict-free Replicated Data Types (CRDTs) for automatic, coordination-free state synchronization.
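To illustrate what "coordination-free" means in practice, here is a minimal sketch of one of the simplest CRDTs, a grow-only counter. The `GCounter` type and its methods are hypothetical names chosen for this example and are not part of the HeliosDB API. Each node only increments its own slot, and merging takes the per-node maximum, so replicas converge regardless of the order in which they exchange state.

```rust
use std::collections::BTreeMap;

/// Grow-only counter (illustrative, not a HeliosDB type):
/// one monotonically increasing slot per node.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GCounter {
    slots: BTreeMap<String, u64>,
}

impl GCounter {
    /// Record `n` local increments on behalf of `node_id`.
    pub fn add(&mut self, node_id: &str, n: u64) {
        *self.slots.entry(node_id.to_string()).or_insert(0) += n;
    }

    /// Total across all nodes.
    pub fn value(&self) -> u64 {
        self.slots.values().sum()
    }

    /// Merge by per-node maximum: commutative, associative and idempotent,
    /// so no coordination is needed between replicas.
    pub fn merge(&mut self, other: &GCounter) {
        for (node_id, &count) in &other.slots {
            let slot = self.slots.entry(node_id.clone()).or_insert(0);
            *slot = (*slot).max(count);
        }
    }
}
```

Merging `a` into `b` and `b` into `a` yields the same state, and merging the same state twice changes nothing. The vector clock merge described in the next section relies on the same properties.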
Vector Clock CRDTs
Vector clocks track causal relationships between events:
use std::time::Duration;

use heliosdb_wasm::edge::sync::{VectorClock, EdgeStateSynchronizer};

// Create synchronizer for this edge node
let mut sync = EdgeStateSynchronizer::new(
    "edge-us-west-1".to_string(),
    SyncConfig::default(),
);

sync.start().await?;

// Write data (increments vector clock)
sync.put(
    "user:12345".to_string(),
    serde_json::to_vec(&user_data)?,
    Some(Duration::from_secs(3600)), // TTL
).await?;

// Read data (local, <1ms)
let data = sync.get("user:12345").await?;

Vector Clock Implementation:
use std::collections::BTreeMap;

// PartialEq is required by the `self != other` check in concurrent_with
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VectorClock {
    clocks: BTreeMap<String, u64>,
}

impl VectorClock {
    // Increment for this node
    pub fn increment(&mut self, node_id: &str) {
        let counter = self.clocks.entry(node_id.to_string()).or_insert(0);
        *counter += 1;
    }

    // Merge two vector clocks (takes maximum for each node)
    pub fn merge(&mut self, other: &VectorClock) {
        for (node_id, &timestamp) in &other.clocks {
            let entry = self.clocks.entry(node_id.clone()).or_insert(0);
            *entry = (*entry).max(timestamp);
        }
    }

    // Check causal ordering
    pub fn happens_before(&self, other: &VectorClock) -> bool {
        // Every counter in self must be <= its counterpart in other.
        // This also covers nodes that appear only in self.
        for (node_id, &self_ts) in &self.clocks {
            if self_ts > other.clocks.get(node_id).copied().unwrap_or(0) {
                return false; // Not before
            }
        }

        // At least one counter must be strictly smaller
        let mut strictly_less = false;

        for (node_id, &other_ts) in &other.clocks {
            let self_ts = self.clocks.get(node_id).copied().unwrap_or(0);
            if self_ts < other_ts {
                strictly_less = true;
            }
        }

        strictly_less
    }

    // Detect concurrent updates
    pub fn concurrent_with(&self, other: &VectorClock) -> bool {
        !self.happens_before(other) && !other.happens_before(self) && self != other
    }
}

Consistency Models
HeliosDB supports four consistency models:
1. Eventual Consistency (Default)
- Latency: <1ms reads, async writes
- Guarantee: All nodes eventually converge
- Use Case: High performance, tolerates temporary inconsistency

let config = SyncConfig {
    consistency_model: ConsistencyModel::Eventual,
    sync_interval: Duration::from_millis(100),
    ..Default::default()
};

let sync = EdgeStateSynchronizer::new("node1".to_string(), config);

Characteristics:
- Read from local cache (no coordination)
- Write locally, propagate asynchronously
- Target of <100ms global propagation
- In the measured scenarios, P99 propagation stays under 100ms; P99.9 reaches 150ms globally
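The characteristics above (local reads, local writes, asynchronous propagation) can be sketched with a local map plus an outbox that is drained on each sync tick. This is a simplified illustration under stated assumptions: `LocalReplica` and its methods are hypothetical names, and a single `u64` version stands in for the vector clock that HeliosDB uses.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical edge replica: a local map plus an outbox of pending updates.
#[derive(Debug, Default)]
pub struct LocalReplica {
    store: HashMap<String, (u64, Vec<u8>)>, // key -> (version, value)
    outbox: VecDeque<(String, u64, Vec<u8>)>,
}

impl LocalReplica {
    /// Write locally and queue the update; no remote round trip.
    pub fn put(&mut self, key: &str, version: u64, value: Vec<u8>) {
        self.store.insert(key.to_string(), (version, value.clone()));
        self.outbox.push_back((key.to_string(), version, value));
    }

    /// Read from the local map only.
    pub fn get(&self, key: &str) -> Option<&[u8]> {
        self.store.get(key).map(|(_, v)| v.as_slice())
    }

    /// Apply a remote update if it is newer than what is stored.
    pub fn apply_remote(&mut self, key: String, version: u64, value: Vec<u8>) {
        let newer = self.store.get(&key).map_or(true, |(v, _)| version > *v);
        if newer {
            self.store.insert(key, (version, value));
        }
    }

    /// Called on each sync tick: ship all queued updates to a peer.
    pub fn flush_to(&mut self, peer: &mut LocalReplica) {
        while let Some((key, version, value)) = self.outbox.pop_front() {
            peer.apply_remote(key, version, value);
        }
    }
}
```

Between a `put` and the next `flush_to`, the peer still returns its old value (or nothing). That gap is the temporary inconsistency this model tolerates.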
2. Session Consistency
- Latency: <5ms reads, async writes
- Guarantee: Monotonic reads/writes within session
- Use Case: User sessions, shopping carts

let config = SyncConfig {
    consistency_model: ConsistencyModel::Session,
    ..Default::default()
};

// Session state tracks read/write versions
let mut session = SessionState::new("session_abc123".to_string());

// User writes (updates write version)
sync.put("cart:items".to_string(), cart_data, None).await?; // None: no TTL
session.update_write_version(&sync.vector_clock.read());

// User reads (must see their own writes)
let data = sync.get("cart:items").await?;
assert!(session.is_read_consistent(&sync.vector_clock.read()));

Guarantees:
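- Read Your Writes: A session always sees its own updates
- Monotonic Reads: A session never sees data older than what it has already read
- Monotonic Writes: A session's writes are applied in the order they were issued
- Writes Follow Reads: A write is ordered after any write whose effects the session has already read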
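One way to picture how these guarantees can be enforced is a per-session "frontier" clock: a replica may serve the session only once its own clock has caught up with everything the session has written or read. The sketch below is an assumption-laden illustration, not the actual `SessionState` implementation; `SessionToken`, `Clock`, `dominates` and `merge_into` are hypothetical names.

```rust
use std::collections::BTreeMap;

/// Illustrative vector clock representation: node id -> counter.
pub type Clock = BTreeMap<String, u64>;

/// True when `a` has seen everything `b` has (a >= b on every node).
pub fn dominates(a: &Clock, b: &Clock) -> bool {
    b.iter().all(|(node, &ts)| a.get(node).copied().unwrap_or(0) >= ts)
}

/// Fold `other` into `target` by per-node maximum.
pub fn merge_into(target: &mut Clock, other: &Clock) {
    for (node, &ts) in other {
        let slot = target.entry(node.clone()).or_insert(0);
        *slot = (*slot).max(ts);
    }
}

/// Hypothetical session token: the causal frontier this session has observed.
#[derive(Debug, Default)]
pub struct SessionToken {
    frontier: Clock,
}

impl SessionToken {
    /// After a write, remember the clock the write was stamped with
    /// (supports read-your-writes).
    pub fn note_write(&mut self, write_clock: &Clock) {
        merge_into(&mut self.frontier, write_clock);
    }

    /// A replica may serve this session only if it has caught up to the frontier.
    pub fn can_read_from(&self, replica_clock: &Clock) -> bool {
        dominates(replica_clock, &self.frontier)
    }

    /// After a read, advance the frontier so later reads never go backwards
    /// (supports monotonic reads).
    pub fn note_read(&mut self, replica_clock: &Clock) {
        merge_into(&mut self.frontier, replica_clock);
    }
}
```

A replica that fails `can_read_from` is stale for this session; the caller could wait for it to sync or route the read elsewhere.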
3. Causal Consistency
- Latency: <10ms reads, <50ms writes
- Guarantee: Respects causal relationships
- Use Case: Collaborative editing, social feeds

let config = SyncConfig {
    consistency_model: ConsistencyModel::Causal,
    ..Default::default()
};

// Example: Post and comment
// Post
sync.put("post:123".to_string(), post_data, None).await?; // None: no TTL
let post_version = sync.vector_clock.read().clone();

// Comment (causally depends on post)
let mut comment_version = sync.vector_clock.read().clone();
comment_version.merge(&post_version);

sync.put("comment:456".to_string(), comment_data, None).await?;

// All nodes will see post before comment

Causal Ordering:

Node A: Write(x) → Read(y) → Write(z)
Node B: Read(x) → Write(y)
Node C: Read(z) must see Write(x) and Write(y)

4. Strong Consistency
- Latency: 50-200ms (coordination required)
- Guarantee: Linearizable reads/writes
- Use Case: Financial transactions, inventory

let config = SyncConfig {
    consistency_model: ConsistencyModel::Strong,
    ..Default::default()
};

// Strong consistency requires consensus
// Uses Raft or Paxos for coordination
let result = sync.put_strong(
    "inventory:item_123".to_string(),
    quantity_data,
).await?;

// Guaranteed: all reads see this write or later writes

Trade-offs:
- Requires coordination between nodes, so writes are slower
- Latency under 50ms is achievable only within a single region
- Cross-region: 100-200ms
- Use sparingly, and only for critical data
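The coordination cost comes from waiting for a majority of replicas to acknowledge each write, a rule shared by Raft and Paxos. The sketch below shows only that acknowledgement-counting rule and is not a consensus implementation; `majority`, `decide` and `WriteOutcome` are hypothetical names, not part of the HeliosDB API.

```rust
/// Smallest number of replicas that forms a majority of `cluster_size`.
pub fn majority(cluster_size: usize) -> usize {
    cluster_size / 2 + 1
}

/// Outcome of collecting acknowledgements for one write.
#[derive(Debug, PartialEq, Eq)]
pub enum WriteOutcome {
    Committed,
    Pending,
    Failed,
}

/// Decide the fate of a write from the votes seen so far.
/// `acks` includes the coordinating node's own vote.
pub fn decide(cluster_size: usize, acks: usize, nacks: usize) -> WriteOutcome {
    let needed = majority(cluster_size);
    if acks >= needed {
        WriteOutcome::Committed
    } else if cluster_size.saturating_sub(nacks) < needed {
        // Too many rejections: a majority can no longer be reached.
        WriteOutcome::Failed
    } else {
        // Still waiting on slower replicas, e.g. ones in another region.
        WriteOutcome::Pending
    }
}
```

With five replicas, a write commits after three acknowledgements. The latency of a strong write is therefore bounded below by the round trip to the third-fastest replica, which is why cross-region clusters land in the 100-200ms range.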
Conflict Resolution
When concurrent updates occur:
#[derive(Debug, Clone)]
pub struct StateEntry {
    pub key: String,
    pub value: Vec<u8>,
    pub version: VectorClock,
    pub timestamp: u64,
    pub node_id: String,
}

impl StateEntry {
    // Determine if this entry supersedes another
    pub fn supersedes(&self, other: &StateEntry) -> bool {
        if self.version.happens_before(&other.version) {
            return false; // Other is newer
        }
        if other.version.happens_before(&self.version) {
            return true; // This is newer
        }
        // Concurrent updates: use timestamp as tie-breaker (LWW)
        self.timestamp > other.timestamp
    }
}

Conflict Resolution Strategies:
- Last-Write-Wins (LWW): Default, uses timestamp
- Multi-Value Register: Keep all concurrent values
- Operational Transform: Merge concurrent edits
- Custom: Application-specific resolution
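As a contrast to LWW, the multi-value register strategy listed above keeps every concurrent value and leaves the final choice to the application. The following is a minimal sketch under stated assumptions: `MvRegister`, `Clock` and the free function `happens_before` are hypothetical names for this example, and the clock is a plain map rather than the `VectorClock` type.

```rust
use std::collections::BTreeMap;

/// Illustrative vector clock representation: node id -> counter.
pub type Clock = BTreeMap<String, u64>;

/// True when `a` is causally before `b`:
/// a <= b on every node and a < b on at least one.
pub fn happens_before(a: &Clock, b: &Clock) -> bool {
    let le = a.iter().all(|(n, &ts)| ts <= b.get(n).copied().unwrap_or(0));
    let lt = b.iter().any(|(n, &ts)| a.get(n).copied().unwrap_or(0) < ts);
    le && lt
}

/// Multi-value register: holds every value not yet causally overwritten.
#[derive(Debug, Default)]
pub struct MvRegister {
    siblings: Vec<(Clock, Vec<u8>)>,
}

impl MvRegister {
    /// Apply a write stamped with `clock`.
    pub fn apply(&mut self, clock: Clock, value: Vec<u8>) {
        // Ignore the write if an existing sibling already covers it.
        if self
            .siblings
            .iter()
            .any(|(c, _)| *c == clock || happens_before(&clock, c))
        {
            return;
        }
        // Drop siblings the new write supersedes; keep concurrent ones.
        self.siblings.retain(|(c, _)| !happens_before(c, &clock));
        self.siblings.push((clock, value));
    }

    /// All currently surviving values.
    pub fn values(&self) -> Vec<&[u8]> {
        self.siblings.iter().map(|(_, v)| v.as_slice()).collect()
    }
}
```

Two writes with concurrent clocks both survive as siblings. A later write whose clock dominates both collapses the register back to a single value, which is how an application-level merge would typically be recorded.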
// Example: LWW Register
// clock1 and clock2 are assumed to be concurrent vector clocks
let entry1 = StateEntry {
    key: "counter".to_string(),
    value: vec![42],
    version: clock1.clone(),
    timestamp: 1000,
    node_id: "node1".to_string(),
};

let entry2 = StateEntry {
    key: "counter".to_string(),
    value: vec![43],
    version: clock2.clone(),
    timestamp: 1001,
    node_id: "node2".to_string(),
};

// entry2 wins (later timestamp)
assert!(entry2.supersedes(&entry1));

Synchronization Performance
Target: <100ms global propagation
// Sync statistics
let stats = sync.get_stats().await;

println!("Total syncs: {}", stats.total_syncs);
println!("Successful: {}", stats.successful_syncs);
println!("Avg latency: {:.2}ms", stats.avg_sync_latency_ms);
println!("Conflicts resolved: {}", stats.conflicts_resolved);
println!("Data sent: {} bytes", stats.bytes_sent);

Optimization Techniques:
- Delta Sync: Only send changes

let config = SyncConfig {
    enable_delta_sync: true,
    batch_size: 1000,
    ..Default::default()
};

- Compression: Reduce bandwidth

let config = SyncConfig {
    enable_compression: true,
    ..Default::default()
};

- Selective Sync: Subscribe to relevant partitions

// Only sync user-related data
sync.subscribe_partition("users").await?;
sync.subscribe_partition("sessions").await?;

// Don't sync analytics data at edge
sync.unsubscribe_partition("analytics").await?;

- Batching: Group updates

let config = SyncConfig {
    sync_interval: Duration::from_millis(100),
    batch_size: 1000,
    ..Default::default()
};

Measured Performance:
| Scenario | P50 | P99 | P99.9 |
|---|---|---|---|
| Same region | 15ms | 30ms | 50ms |
| Cross-continent | 50ms | 80ms | 120ms |
| Global (50 nodes) | 75ms | 95ms | 150ms |
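Delta sync and batching, two of the optimization techniques listed earlier, account for much of the difference between shipping full state and shipping a few kilobytes per interval. The sketch below shows the idea under stated assumptions: `State`, `delta` and `batches` are hypothetical names, a single `u64` version stands in for the vector clock, and the wire format is left out entirely.

```rust
use std::collections::BTreeMap;

/// Hypothetical replica state: key -> (version, value).
pub type State = BTreeMap<String, (u64, Vec<u8>)>;

/// Entries in `local` that the peer has not acknowledged yet.
/// `peer_acked` maps each key to the highest version the peer confirmed.
pub fn delta(local: &State, peer_acked: &BTreeMap<String, u64>) -> State {
    let mut out = State::new();
    for (key, (version, value)) in local {
        let already_sent = peer_acked
            .get(key)
            .map_or(false, |&acked| acked >= *version);
        if !already_sent {
            out.insert(key.clone(), (*version, value.clone()));
        }
    }
    out
}

/// Split a delta into batches of at most `batch_size` entries.
pub fn batches(delta: State, batch_size: usize) -> Vec<State> {
    let entries: Vec<(String, (u64, Vec<u8>))> = delta.into_iter().collect();
    entries
        .chunks(batch_size.max(1)) // guard against a zero batch size
        .map(|chunk| chunk.iter().cloned().collect())
        .collect()
}
```

Only keys whose version moved past the peer's last acknowledgement are sent, and they are grouped so that one sync interval produces a bounded number of messages.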