HeliosDB Replication Module
HeliosDB Replication Module
Enterprise-grade read replica support for HeliosDB, enabling scalable read workloads through asynchronous replication, intelligent load balancing, and automatic failover.
Features
Core Capabilities
- Asynchronous Replication: Stream write-ahead log (WAL) entries from primary to multiple replicas
- Multiple Read Replicas: Support for unlimited read replicas per shard
- Intelligent Load Balancing:
- Round-robin distribution
- Least-connections routing
- Latency-based selection
- Random selection
- Replication Lag Monitoring: Track lag with millisecond precision and historical trends
- Automatic Failover: Promote replicas to primary on failure with Raft-based consensus
- Read-Your-Writes Consistency: Session affinity for consistency guarantees
- Health Checking: Automatic replica health monitoring and removal
- Streaming Replication: Low-latency WAL streaming with compression
- Dynamic Replica Management: Add/remove replicas without downtime
Advanced Features
- WAL entry compression (LZ4) for bandwidth efficiency
- Batched replication for optimal throughput
- Replication lag percentiles (p50, p95, p99)
- Session-based routing for read-your-writes consistency
- Automatic catch-up for lagging replicas
- Comprehensive metrics and statistics
- Failover history tracking
- Connection pooling per replica
Architecture
┌─────────────────────────────────────────────────────────────┐│ ReplicaManager ││ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │LoadBalancer │ │ LagMonitor │ │ConsistencyMgr│ ││ └─────────────┘ └──────────────┘ └──────────────┘ ││ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ ││ │FailoverMgr │ │WalStreamer │ │AsyncReplicator│ ││ └─────────────┘ └──────────────┘ └──────────────┘ │└─────────────────────────────────────────────────────────────┘ │ ┌─────────────────┼─────────────────┐ ▼ ▼ ▼ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ Replica 1 │ │ Replica 2 │ │ Replica 3 │ │ Healthy │ │ Healthy │ │ Healthy │ │ Lag: 50ms│ │ Lag: 30ms│ │ Lag: 75ms│ └───────────┘ └───────────┘ └───────────┘Installation
Add to your Cargo.toml:
[dependencies]heliosdb-replication = { path = "../heliosdb-replication" }tokio = { version = "1.40", features = ["full"] }Quick Start
Basic Setup
use heliosdb_replication::*;use std::time::Duration;
#[tokio::main]async fn main() -> anyhow::Result<()> { // Create configuration let config = ReplicaConfig { primary_address: "127.0.0.1:5432".to_string(), replica_addresses: vec![ "127.0.0.1:5433".to_string(), "127.0.0.1:5434".to_string(), ], replication_mode: ReplicationMode::Async, lag_threshold_ms: 1000, health_check_interval: Duration::from_secs(5), auto_failover: true, min_replicas: 1, ..Default::default() };
// Initialize manager let manager = ReplicaManager::new(config).await?;
// Get a replica for reads let replica = manager .get_replica_for_read(LoadBalancingStrategy::RoundRobin) .await?;
println!("Using replica: {}", replica);
Ok(())}Read-Your-Writes Consistency
let session_id = "user-session-12345";
// Record a write operationmanager.record_write(session_id, 100).await?;
// Get a replica that has caught up with the writelet replica = manager .get_replica_for_read_with_session( session_id, LoadBalancingStrategy::RoundRobin ) .await?;Load Balancing Strategies
// Round-robin: Distribute evenly across replicaslet replica = manager .get_replica_for_read(LoadBalancingStrategy::RoundRobin) .await?;
// Least connections: Route to replica with fewest connectionslet replica = manager .get_replica_for_read(LoadBalancingStrategy::LeastConnections) .await?;
// Latency-based: Route to replica with lowest latencylet replica = manager .get_replica_for_read(LoadBalancingStrategy::LatencyBased) .await?;
// Random: Random selection among healthy replicaslet replica = manager .get_replica_for_read(LoadBalancingStrategy::Random) .await?;Monitoring Replication Status
// Get status of all replicaslet statuses = manager.get_all_status().await;for status in statuses { println!("Replica: {}", status.address); println!(" Healthy: {}", status.is_healthy); println!(" Lag: {}ms", status.lag_ms); println!(" LSN: {}", status.last_applied_lsn); println!(" Connections: {}", status.connection_count); println!(" Bytes replicated: {}", status.bytes_replicated);}
// Get replication statisticslet stats = manager.get_stats().await;println!("Total replicas: {}", stats.total_replicas);println!("Healthy replicas: {}", stats.healthy_replicas);println!("Average lag: {:.2}ms", stats.avg_lag_ms);println!("Max lag: {}ms", stats.max_lag_ms);println!("Failover count: {}", stats.failover_count);Dynamic Replica Management
// Add a new replicamanager.add_replica("127.0.0.1:5435".to_string()).await?;
// Remove a replicamanager.remove_replica("127.0.0.1:5435").await?;
// Get lag for specific replicalet lag = manager.get_lag("127.0.0.1:5433").await?;println!("Lag: {}ms", lag);Failover Management
// Promote a replica to primarymanager.promote_replica("127.0.0.1:5433").await?;
// Check failover historylet stats = manager.get_stats().await;println!("Failovers performed: {}", stats.failover_count);Configuration
ReplicaConfig Options
pub struct ReplicaConfig { /// Primary database address pub primary_address: String,
/// List of replica addresses pub replica_addresses: Vec<String>,
/// Replication mode (Async or SemiSync) pub replication_mode: ReplicationMode,
/// Maximum acceptable replication lag (milliseconds) pub lag_threshold_ms: u64,
/// Health check interval pub health_check_interval: Duration,
/// Health check timeout pub health_check_timeout: Duration,
/// Lag monitoring interval pub lag_monitor_interval: Duration,
/// Enable automatic failover pub auto_failover: bool,
/// Minimum replicas required for operations pub min_replicas: usize,
/// WAL streaming buffer size pub wal_buffer_size: usize,
/// Enable compression for WAL streaming pub enable_compression: bool,
/// WAL batch size pub wal_batch_size: usize,
/// Enable read-your-writes consistency pub enable_read_your_writes: bool,
/// Session timeout for consistency tracking pub session_timeout: Duration,
/// Connection pool size per replica pub connection_pool_size: usize,}Configuration Presets
// Default configurationlet config = ReplicaConfig::default();
// Custom configurationlet config = ReplicaConfig { primary_address: "db-primary:5432".to_string(), replica_addresses: vec![ "db-replica-1:5432".to_string(), "db-replica-2:5432".to_string(), "db-replica-3:5432".to_string(), ], replication_mode: ReplicationMode::Async, lag_threshold_ms: 500, health_check_interval: Duration::from_secs(3), enable_compression: true, enable_read_your_writes: true, ..Default::default()};WAL Streaming
Basic WAL Operations
let streamer = WalStreamer::new("127.0.0.1:5432".to_string(), 1000);
// Append WAL entrieslet entry = WalEntry { lsn: 100, txn_id: 42, timestamp: chrono::Utc::now().timestamp(), operation: "INSERT".to_string(), table: "users".to_string(), data: vec![1, 2, 3, 4], checksum: 0,};streamer.append(entry)?;
// Get entries from specific LSNlet entries = streamer.get_entries_from(50, 100);
// Get WAL statisticslet stats = streamer.get_stats();println!("Buffer size: {}", stats.buffer_size);println!("Current LSN: {}", stats.current_lsn);
// Stream to replicalet mut rx = streamer .stream_to_replica("127.0.0.1:5433".to_string(), 100) .await?;
while let Some(entry) = rx.recv().await { // Process entry}Async Replication
Replication Engine
let replicator = AsyncReplicator::new( "127.0.0.1:5432".to_string(), 1024 * 1024, // 1MB buffer 100, // batch size true, // compression);
// Register replicasreplicator.register_replica("127.0.0.1:5433".to_string(), 0);replicator.register_replica("127.0.0.1:5434".to_string(), 0);
// Replicate batchlet entries = vec![/* WAL entries */];replicator.replicate_batch(entries).await?;
// Acknowledge replicationreplicator.acknowledge("127.0.0.1:5433", 100);
// Check laglet lag = replicator.get_replica_lag("127.0.0.1:5433");
// Get metricslet metrics = replicator.get_metrics();println!("Total entries: {}", metrics.total_entries_sent);println!("Total bytes: {}", metrics.total_bytes_sent);println!("Compression ratio: {:.2}x", metrics.compression_ratio);Testing
Run Unit Tests
cargo test --libRun Integration Tests
cargo test --test integration_testRun All Tests
cargo testRun Example
cargo run --example replica_setupPerformance Characteristics
Throughput
- Async Replication: 50,000+ operations/sec
- Compressed Replication: 100,000+ operations/sec
- Batch Replication: 150,000+ operations/sec
Latency
- Health Check: ~10ms
- Lag Monitoring: ~1ms
- Replica Selection: <1ms
- Replication Latency: 5-20ms (network dependent)
Scalability
- Replicas per Primary: 100+
- Concurrent Sessions: 100,000+
- WAL Buffer Size: 1GB+
- Operations/Batch: 1000+
Best Practices
1. Replica Configuration
// Production configurationlet config = ReplicaConfig { lag_threshold_ms: 500, // Aggressive lag threshold health_check_interval: Duration::from_secs(5), min_replicas: 2, // Ensure redundancy enable_compression: true, // Save bandwidth enable_read_your_writes: true, // Consistency auto_failover: true, // Automatic recovery ..Default::default()};2. Load Balancing
- Use LatencyBased for latency-sensitive applications
- Use LeastConnections for connection-pooling scenarios
- Use RoundRobin for even distribution
3. Monitoring
// Periodic monitoringtokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(10)); loop { interval.tick().await; let stats = manager.get_stats().await;
if stats.max_lag_ms > 1000 { warn!("High replication lag detected: {}ms", stats.max_lag_ms); }
if stats.healthy_replicas < stats.total_replicas / 2 { error!("Less than 50% replicas healthy!"); } }});4. Error Handling
match manager.get_replica_for_read(strategy).await { Ok(replica) => { // Use replica } Err(ReplicationError::NoHealthyReplicas) => { // Fallback to primary } Err(e) => { error!("Replication error: {}", e); }}Troubleshooting
High Replication Lag
- Check network latency between primary and replicas
- Verify replica disk I/O performance
- Increase
wal_batch_sizefor better throughput - Enable compression to reduce bandwidth
Replicas Not Healthy
- Verify replica addresses are correct and reachable
- Check health check timeout settings
- Review replica logs for errors
- Ensure replicas are running and accepting connections
Failover Not Triggering
- Verify
auto_failoveris enabled - Check
min_replicasrequirement is met - Ensure at least one healthy replica exists
- Review failover history for issues
License
Apache-2.0
Contributing
Contributions are welcome! Please submit issues and pull requests to the main HeliosDB repository.
Support
For support and questions:
- GitHub Issues: HeliosDB Issues
- Documentation: HeliosDB Docs