Embedded-Cloud Synchronization Protocol Specification
Version: 1.0
Date: November 14, 2025
Status: Design Specification
Implementation: Weeks 3-5
Executive Summary
This document specifies the synchronization protocol for the HeliosDB Embedded+Cloud feature (F5.1.1), enabling seamless bidirectional sync between embedded databases and cloud instances with <1s latency for datasets <10MB.
Key Requirements
- Latency: <1s for datasets <10MB
- Conflict Resolution: Vector clock-based automatic resolution
- Offline Support: Queue changes when offline, sync when reconnected
- Security: TLS 1.3 for all sync traffic, with optional end-to-end payload encryption
- Incremental Sync: Delta-based synchronization (only changes)
- Scalability: Support 10,000+ concurrent embedded clients per cloud instance
Protocol Overview
Architecture
```
┌─────────────────────┐        ┌─────────────────────┐
│   Embedded Device   │        │   Cloud Instance    │
│   (HeliosDB Lite)   │        │      (HeliosDB)     │
├─────────────────────┤        ├─────────────────────┤
│                     │        │                     │
│  Local Database     │  Sync  │  Central Database   │
│  + Vector Clock     │ <────> │  + Coordination     │
│  + Change Log       │  TLS   │  + Conflict Mgr     │
│  + Offline Queue    │  1.3+  │  + Distribution     │
│                     │        │                     │
└─────────────────────┘        └─────────────────────┘
           │                              │
           │  1. SyncRequest              │
           │─────────────────────────────>│
           │                              │
           │  2. SyncResponse (Delta)     │
           │<─────────────────────────────│
           │                              │
           │  3. RowDelta (Changes)       │
           │─────────────────────────────>│
           │                              │
           │  4. Ack + Conflicts          │
           │<─────────────────────────────│
```
Communication Flow
- Client → Server: SyncRequest with last known version + changed tables
- Server → Client: SyncResponse with delta since last version
- Client → Server: RowDelta messages with local changes
- Server → Client: Ack with conflict notifications (if any)
- Conflict Resolution: Automatic using vector clocks, or manual intervention
Message Protocol
Binary Protocol Format
Using bincode for efficient binary serialization:
```rust
// All messages prefixed with:
// - 4 bytes: Message length (u32, little-endian)
// - 1 byte:  Message type (u8)
// - N bytes: Message payload (bincode-encoded)
```
Message Types
```rust
#[repr(u8)]
pub enum MessageType {
    SyncRequest = 0x01,
    SyncResponse = 0x02,
    RowDelta = 0x03,
    Acknowledgment = 0x04,
    Heartbeat = 0x05,
    ConflictNotification = 0x06,
    Error = 0xFF,
}
```
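For illustration, a minimal framing sketch; it assumes the length field counts the type byte plus the payload, a detail the layout above leaves open:
```rust
use serde::Serialize;

/// Frame a message for the wire: length (LE u32) + type byte + bincode payload.
/// Assumes the length field covers the type byte plus the payload.
fn encode_frame<T: Serialize>(msg_type: MessageType, msg: &T) -> bincode::Result<Vec<u8>> {
    let payload = bincode::serialize(msg)?;
    let mut frame = Vec::with_capacity(4 + 1 + payload.len());
    frame.extend_from_slice(&((1 + payload.len() as u32).to_le_bytes()));
    frame.push(msg_type as u8);
    frame.extend_from_slice(&payload);
    Ok(frame)
}
```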
1. SyncRequest
Client initiates sync by sending current state:
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncRequest {
    /// Unique client identifier
    pub client_id: Uuid,
    /// Last version successfully synced
    pub last_sync_version: u64,
    /// List of tables with local changes
    pub changed_tables: Vec<String>,
    /// Number of pending changes
    pub pending_changes: u32,
    /// Client vector clock
    pub vector_clock: VectorClock,
    /// Sync mode (full or incremental)
    pub sync_mode: SyncMode,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum SyncMode {
    Incremental, // Only changes since last_sync_version
    Full,        // Complete state transfer (recovery)
}
```
2. SyncResponse
Server responds with changes from cloud:
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncResponse {
    /// Current server version
    pub server_version: u64,
    /// Delta changes since the client's last_sync_version
    pub delta: Vec<RowDelta>,
    /// Detected conflicts (if any)
    pub conflicts: Vec<Conflict>,
    /// Next sync token (for resumable sync)
    pub continuation_token: Option<String>,
    /// Server vector clock
    pub vector_clock: VectorClock,
}
```
3. RowDelta
Represents a single row change:
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RowDelta {
    /// Table name
    pub table: String,
    /// Operation type
    pub operation: Operation,
    /// Row identifier
    pub row_id: RowId,
    /// Changed data (compressed)
    pub data: Vec<u8>,
    /// Vector clock for this change
    pub vector_clock: VectorClock,
    /// Timestamp of change
    pub timestamp: DateTime<Utc>,
    /// Checksum for integrity
    pub checksum: u32,
}

// No Copy derive: the Update variant owns a Vec
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Operation {
    Insert,
    Update { columns: Vec<String> }, // Only changed columns
    Delete,
}

pub type RowId = Vec<u8>; // Primary key bytes
```
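The spec leaves the checksum algorithm open; a CRC32 over the delta's data bytes is one plausible choice. A minimal verification sketch, assuming the crc32fast crate:
```rust
/// Verify a delta's integrity on receipt. CRC32 via crc32fast is an
/// assumption; the spec does not pin down the checksum algorithm.
fn verify_delta(delta: &RowDelta) -> bool {
    crc32fast::hash(&delta.data) == delta.checksum
}
```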
4. Acknowledgment
Server acknowledges receipt:
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Acknowledgment {
    /// Version after applying changes
    pub new_version: u64,
    /// Successfully applied changes
    pub applied_count: u32,
    /// Failed changes (with reasons)
    pub failed: Vec<FailedChange>,
    /// Updated vector clock
    pub vector_clock: VectorClock,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailedChange {
    pub row_id: RowId,
    pub reason: String,
    pub conflict: Option<Conflict>,
}
```
5. Conflict
Conflict representation:
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Conflict {
    pub id: Uuid,
    pub table: String,
    pub row_id: RowId,
    pub conflict_type: ConflictType,
    pub client_version: Vec<u8>,
    pub server_version: Vec<u8>,
    pub resolution: ConflictResolution,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictType {
    ConcurrentUpdate, // Both sides modified same row
    DeleteUpdate,     // One deleted, other updated
    UniqueViolation,  // Primary key or unique constraint
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolution {
    UseClient, // Client wins
    UseServer, // Server wins
    Merge,     // Automatic merge
    Manual,    // Requires manual intervention
}
```
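A sketch of how a resolver might dispatch on these strategies; apply_row_image, merge_row_images, and queue_for_review are hypothetical helpers, since the spec leaves row application to the storage layer:
```rust
/// Dispatch a detected conflict to its resolution strategy.
/// apply_row_image, merge_row_images, and queue_for_review are
/// hypothetical helpers (row application belongs to the storage layer).
fn apply_resolution(conflict: &Conflict) -> Result<()> {
    match conflict.resolution {
        ConflictResolution::UseClient => {
            apply_row_image(&conflict.table, &conflict.row_id, &conflict.client_version)
        }
        ConflictResolution::UseServer => {
            apply_row_image(&conflict.table, &conflict.row_id, &conflict.server_version)
        }
        ConflictResolution::Merge => {
            let merged = merge_row_images(&conflict.client_version, &conflict.server_version)?;
            apply_row_image(&conflict.table, &conflict.row_id, &merged)
        }
        ConflictResolution::Manual => queue_for_review(conflict),
    }
}
```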
Vector Clocks
Vector clocks track causality and detect conflicts:
```rust
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct VectorClock {
    /// Map of node_id → version
    pub clocks: HashMap<Uuid, u64>,
}

impl VectorClock {
    /// Increment this node's clock
    pub fn increment(&mut self, node_id: Uuid) {
        *self.clocks.entry(node_id).or_insert(0) += 1;
    }

    /// Check if this clock happens-before other.
    /// Returns true if every entry is <= other's and at least one is
    /// strictly less; entries missing from either side count as 0.
    pub fn happens_before(&self, other: &VectorClock) -> bool {
        let all_le = self.clocks.iter()
            .all(|(id, v)| *v <= other.clocks.get(id).copied().unwrap_or(0));
        // Strict inequality is checked over other's entries too, so that
        // e.g. {B:1} happens-before {A:1, B:1}.
        let some_lt = other.clocks.iter()
            .any(|(id, ov)| self.clocks.get(id).copied().unwrap_or(0) < *ov);
        all_le && some_lt
    }

    /// Detect conflict (concurrent updates)
    pub fn conflicts_with(&self, other: &VectorClock) -> bool {
        !self.happens_before(other) && !other.happens_before(self)
    }

    /// Merge two vector clocks (take max for each node)
    pub fn merge(&mut self, other: &VectorClock) {
        for (id, v) in &other.clocks {
            let entry = self.clocks.entry(*id).or_insert(0);
            *entry = (*entry).max(*v);
        }
    }
}
```
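A short example of how these operations interact (a minimal sketch; it assumes the types above plus the uuid crate's v4 feature):
```rust
let (node_a, node_b) = (Uuid::new_v4(), Uuid::new_v4());
let mut a = VectorClock::default();
let mut b = VectorClock::default();

a.increment(node_a); // A writes locally
b.increment(node_b); // B writes concurrently, unaware of A
assert!(a.conflicts_with(&b)); // neither happens-before the other

a.merge(&b);         // A syncs and absorbs B's history: {A:1, B:1}
a.increment(node_a); // A writes again
assert!(b.happens_before(&a)); // B's state is now causally older
```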
Optimizations
1. Delta Compression
Only send changed columns, not entire rows:
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedDelta {
    pub base_version: u64,
    pub column_deltas: Vec<ColumnDelta>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDelta {
    pub column_name: String,
    pub old_value: Option<Vec<u8>>, // None = was NULL
    pub new_value: Option<Vec<u8>>, // None = now NULL
}
```
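A sketch of producing column deltas by diffing two row images; the HashMap row representation is an assumption, and real code would diff the storage engine's native row format:
```rust
use std::collections::HashMap;

/// Diff old and new row images into column deltas. The
/// HashMap<column, Option<value>> row representation is an assumption.
fn diff_row(
    old: &HashMap<String, Option<Vec<u8>>>,
    new: &HashMap<String, Option<Vec<u8>>>,
) -> Vec<ColumnDelta> {
    new.iter()
        .filter(|(col, new_val)| old.get(*col) != Some(*new_val)) // changed columns only
        .map(|(col, new_val)| ColumnDelta {
            column_name: col.clone(),
            old_value: old.get(col).cloned().flatten(),
            new_value: new_val.clone(),
        })
        .collect()
}
```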
Space Savings:
- Full row: ~1KB
- Column delta: ~50-100 bytes (~90-95% reduction)
2. Batch Updates
Group multiple changes into single message:
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchDelta {
    pub batch_id: Uuid,
    pub deltas: Vec<RowDelta>,
    pub compressed: bool, // Use zstd compression
}
```
Latency Improvement:
- Individual messages: 100 changes × ~100ms round trip each = 10s
- Batched: 1 message × ~100ms round trip = 100ms (100× faster)
3. Predicate Pushdown
Client specifies filter predicates to sync only relevant rows:
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncFilter {
    pub table: String,
    pub predicate: String,            // SQL WHERE clause
    pub columns: Option<Vec<String>>, // Specific columns only
}

// Example: only sync the user's own data
SyncFilter {
    table: "orders".to_string(),
    predicate: "user_id = ?".to_string(),
    columns: Some(vec!["id".to_string(), "total".to_string(), "status".to_string()]),
}
```
Network Reduction:
- Without filter: 10MB (the entire table)
- With filter: 100KB (the user's own rows, a 100× reduction)
4. Compression
Use zstd for all payloads >1KB:
```rust
/// Compress a payload with zstd (level 3). Returns the bytes plus a flag
/// telling the receiver whether they were compressed, so that a raw
/// fallback is never misread as a zstd frame (see BatchDelta::compressed).
pub fn compress_payload(data: &[u8]) -> (Vec<u8>, bool) {
    if data.len() < 1024 {
        return (data.to_vec(), false); // Not worth compressing
    }

    match zstd::encode_all(data, 3) {
        Ok(compressed) => (compressed, true),
        Err(_) => (data.to_vec(), false), // Fall back to raw bytes on encoder error
    }
}
```
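The receiving side can rely on the compressed flag carried in the enclosing message to reverse this; a minimal counterpart sketch:
```rust
/// Reverse of compress_payload; the caller passes the `compressed` flag
/// carried in the enclosing message (e.g. BatchDelta::compressed).
pub fn decompress_payload(data: &[u8], compressed: bool) -> std::io::Result<Vec<u8>> {
    if compressed {
        zstd::decode_all(data)
    } else {
        Ok(data.to_vec())
    }
}
```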
Typical Compression Ratios:
- JSON: 60-70% reduction
- Binary: 30-40% reduction
Performance Targets
Latency Targets
| Dataset Size | Target Latency | Strategy |
|---|---|---|
| <10KB | <100ms | Single round-trip |
| 10KB-100KB | <500ms | Batched deltas |
| 100KB-1MB | <1s | Compressed batches |
| 1MB-10MB | <5s | Chunked transfer |
| >10MB | Background | Async job |
Throughput Targets
- Sync operations/sec: 1,000+ per cloud instance
- Concurrent clients: 10,000+ per cloud instance
- Delta processing: 100,000 rows/sec
- Conflict resolution: <10ms per conflict
Bandwidth Efficiency
- Average payload: <5KB per sync
- Compression ratio: 3:1 typical
- Effective bandwidth: ~1.6KB per sync
Offline Queue
When offline, client queues changes for later sync:
```rust
pub struct OfflineQueue {
    db: rocksdb::DB,
    max_size: usize, // Max queue size (bytes)
}

impl OfflineQueue {
    pub fn enqueue(&mut self, delta: RowDelta) -> Result<()> {
        // Keys order entries by change time; row_id is hex-encoded because
        // raw primary-key bytes may not be valid UTF-8.
        let key = format!(
            "queue:{}:{}",
            delta.timestamp.timestamp(),
            hex::encode(&delta.row_id)
        );
        let value = bincode::serialize(&delta)?;

        // current_size() tracks queued bytes (helper not shown)
        if self.current_size() + value.len() > self.max_size {
            return Err(Error::QueueFull);
        }

        self.db.put(key, value)?;
        Ok(())
    }

    pub fn drain(&mut self) -> Result<Vec<RowDelta>> {
        let mut deltas = Vec::new();
        let mut keys = Vec::new();

        // Recent rocksdb crates yield Result<(key, value)> items.
        for entry in self.db.iterator(rocksdb::IteratorMode::Start) {
            let (key, value) = entry?;
            deltas.push(bincode::deserialize(&value)?);
            keys.push(key);
        }

        // Remove drained entries so they are not re-sent on the next sync.
        for key in keys {
            self.db.delete(key)?;
        }

        Ok(deltas)
    }
}
```
Queue Management
- Max queue size: 100MB (configurable)
- Eviction policy: FIFO when full
- Persistence: RocksDB (survives restarts)
- Retry strategy: Exponential backoff (1s, 2s, 4s, 8s, capped at 60s); see the sketch below
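A minimal sketch of the backoff schedule above (jitter, often added in practice, is omitted):
```rust
use std::time::Duration;

/// Delay before retry `attempt` (0-based): 1s, 2s, 4s, 8s, ... capped at 60s.
fn backoff_delay(attempt: u32) -> Duration {
    let secs = 1u64.checked_shl(attempt).unwrap_or(u64::MAX).min(60);
    Duration::from_secs(secs)
}
```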
Security
TLS 1.3
All communication encrypted with TLS 1.3:
```rust
pub struct SyncClient {
    tls_config: Arc<rustls::ClientConfig>,
}

impl SyncClient {
    pub fn new(ca_cert: &[u8]) -> Result<Self> {
        // rustls 0.21-style builder; load_ca_cert parses the CA bundle
        // into a RootCertStore (helper not shown).
        let config = rustls::ClientConfig::builder()
            .with_safe_defaults()
            .with_root_certificates(load_ca_cert(ca_cert)?)
            .with_no_client_auth();

        Ok(Self {
            tls_config: Arc::new(config),
        })
    }
}
```
End-to-End Encryption
Optional E2E encryption for data at rest:
```rust
pub struct E2EEncryption {
    key: [u8; 32], // AES-256 key
}

impl E2EEncryption {
    pub fn encrypt_delta(&self, delta: &RowDelta) -> Result<EncryptedDelta> {
        let plaintext = bincode::serialize(delta)?;
        // The nonce must be chosen before encryption and fed to AES-GCM;
        // it is then shipped alongside the ciphertext for decryption.
        let nonce = generate_nonce();
        let ciphertext = aes_gcm_encrypt(&self.key, &nonce, &plaintext)?;

        Ok(EncryptedDelta { ciphertext, nonce })
    }
}
```
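For reference, one way to implement the aes_gcm_encrypt helper; the aes-gcm crate and its 0.10-series API are assumptions, and the caller supplies a random 96-bit nonce (e.g. from OsRng):
```rust
use aes_gcm::aead::{Aead, KeyInit};
use aes_gcm::{Aes256Gcm, Key, Nonce};

/// AES-256-GCM with a caller-supplied 96-bit nonce. Crate choice is an
/// assumption; the spec only requires an AES-256 AEAD.
fn aes_gcm_encrypt(
    key: &[u8; 32],
    nonce: &[u8; 12],
    plaintext: &[u8],
) -> Result<Vec<u8>, aes_gcm::Error> {
    let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(key));
    cipher.encrypt(Nonce::from_slice(nonce), plaintext)
}
```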
Authentication
Client authentication via JWT tokens:
```rust
pub struct SyncAuth {
    pub client_id: Uuid,
    pub jwt_token: String,
}

// Include in every request header
headers.insert("Authorization", format!("Bearer {}", auth.jwt_token));
```
Implementation Roadmap
Week 3: Protocol + Basic Sync
Days 1-2: Core Protocol
- Implement message types and serialization
- Binary protocol with bincode
- TLS 1.3 client/server setup
Days 3-4: Basic Sync
- Incremental sync (delta-based)
- Vector clock implementation
- Offline queue
Day 5: Testing
- Unit tests for all message types
- Integration test: 2 clients syncing
- Latency measurements
Deliverables:
- heliosdb-sync/src/protocol.rs (500 LOC)
- heliosdb-sync/src/client.rs (400 LOC)
- heliosdb-sync/src/server.rs (400 LOC)
- Basic sync working: <5s for 1MB
Week 4: Optimizations + Conflict Resolution
Days 1-2: Optimizations
- Delta compression (column-level)
- Batch updates (100+ rows per message)
- Predicate pushdown
Days 3-4: Conflict Resolution
- Automatic conflict detection
- Resolution strategies (client-wins, server-wins, merge)
- Conflict history tracking
Day 5: Performance
- Achieve <1s for <10MB target
- Benchmark suite
- Performance report
Deliverables:
- heliosdb-sync/src/compression.rs (300 LOC)
- heliosdb-sync/src/conflicts.rs (600 LOC)
- Performance: <1s for datasets <10MB
Week 5: Production Hardening + Testing
Days 1-2: Hardening
- Error handling and retry logic
- Network resilience (reconnection)
- Monitoring and metrics
Days 3-4: Testing
- Chaos testing (network failures)
- Concurrent sync (1000+ clients)
- Data integrity verification
Day 5: Documentation
- API documentation
- User guide
- Deployment guide
Deliverables:
- heliosdb-sync/tests/ (2000+ LOC)
- Production-ready sync engine
- Complete documentation
API Design
Client API
```rust
pub struct SyncClient {
    config: SyncConfig,
    client_id: Uuid,
    last_sync_version: u64,
    offline_queue: OfflineQueue,
}

impl SyncClient {
    /// Start automatic background sync
    pub async fn start_sync(&mut self) -> Result<()> {
        loop {
            match self.sync_once().await {
                Ok(_) => tokio::time::sleep(self.config.sync_interval).await,
                Err(e) if e.is_network_error() => {
                    // Queue changes offline and retry later
                    self.offline_queue.flush_pending()?;
                    tokio::time::sleep(self.config.retry_interval).await;
                }
                Err(e) => return Err(e),
            }
        }
    }

    /// Perform single sync cycle
    pub async fn sync_once(&mut self) -> Result<SyncResult> {
        // 1. Prepare sync request
        let request = self.prepare_sync_request().await?;

        // 2. Send to server
        let response = self.send_sync_request(request).await?;

        // 3. Apply server changes
        self.apply_server_deltas(&response.delta).await?;

        // 4. Send local changes (capture the count before the Vec is moved)
        let local_deltas = self.collect_local_changes().await?;
        let synced_rows = local_deltas.len();
        let ack = self.send_deltas(local_deltas).await?;

        // 5. Handle conflicts
        self.resolve_conflicts(&ack.failed).await?;

        Ok(SyncResult {
            version: ack.new_version,
            synced_rows,
            conflicts: ack.failed.len(),
        })
    }
}
```
Server API
```rust
pub struct SyncServer {
    config: ServerConfig,
    conflict_manager: ConflictManager,
    change_log: ChangeLog,
}

impl SyncServer {
    /// Handle sync request from client
    pub async fn handle_sync_request(
        &self,
        request: SyncRequest,
    ) -> Result<SyncResponse> {
        // 1. Validate client
        self.authenticate_client(&request.client_id).await?;

        // 2. Get changes since last_sync_version
        let delta = self.change_log
            .get_changes_since(request.last_sync_version)
            .await?;

        // 3. Detect conflicts
        let conflicts = self.conflict_manager
            .detect_conflicts(&request.vector_clock, &delta)
            .await?;

        Ok(SyncResponse {
            server_version: self.change_log.current_version(),
            delta,
            conflicts,
            continuation_token: None,
            vector_clock: self.get_vector_clock(),
        })
    }
}
```
Testing Strategy
Unit Tests
- Message serialization/deserialization
- Vector clock operations
- Compression/decompression
- Conflict detection
Integration Tests
- Client-server sync
- Offline queue
- Conflict resolution
- Network failures
Performance Tests
| Test | Target | Actual |
|---|---|---|
| Latency (1MB) | <1s | TBD |
| Latency (10MB) | <5s | TBD |
| Throughput | 1000 ops/s | TBD |
| Concurrent clients | 10,000 | TBD |
| Conflict resolution | <10ms | TBD |
Chaos Tests
- Network interruption during sync
- Partial message delivery
- Server restart mid-sync
- Concurrent conflicting updates
Success Criteria
- Latency <1s for datasets <10MB
- Conflict resolution >95% automatic
- Offline queue supports 100MB+ changes
- Zero data loss during network failures
- Supports 10,000+ concurrent clients
- E2E encryption optional but available
- Comprehensive test coverage (>90%)
Future Enhancements (Post-Week 5)
V2 Features
- Partial Sync: Sync only specific tables/columns
- Peer-to-Peer: Embedded devices sync directly
- Multi-Cloud: Sync across multiple cloud providers
- Real-Time: WebSocket-based push notifications
- Geo-Replication: Multi-region conflict-free sync
Advanced Conflict Resolution
- Three-way merge (common ancestor)
- Custom resolution rules
- AI-powered conflict prediction
- Manual conflict resolution UI
References
- Vector Clocks: Lamport, L. (1978). "Time, Clocks, and the Ordering of Events in a Distributed System." Communications of the ACM 21(7).
- CRDTs: Shapiro, M., et al. (2011). "Conflict-Free Replicated Data Types."
- Operational Transformation: Ellis, C. A., & Gibbs, S. J. (1989). "Concurrency Control in Groupware Systems."
Document Status: Draft for Review
Next Review: Week 3, Day 1
Implementation Start: Week 3, Day 1