HeliosDB Replication Module

Enterprise-grade read replica support for HeliosDB, enabling scalable read workloads through asynchronous replication, intelligent load balancing, and automatic failover.

Features

Core Capabilities

  • Asynchronous Replication: Stream write-ahead log (WAL) entries from the primary to multiple replicas
  • Multiple Read Replicas: Attach an arbitrary number of read replicas to each shard
  • Intelligent Load Balancing:
    • Round-robin distribution
    • Least-connections routing
    • Latency-based selection
    • Random selection
  • Replication Lag Monitoring: Track lag with millisecond precision and historical trends
  • Automatic Failover: Promote replicas to primary on failure with Raft-based consensus
  • Read-Your-Writes Consistency: Session affinity for consistency guarantees
  • Health Checking: Automatic replica health monitoring and removal
  • Streaming Replication: Low-latency WAL streaming with compression
  • Dynamic Replica Management: Add/remove replicas without downtime

Advanced Features

  • WAL entry compression (LZ4) for bandwidth efficiency
  • Batched replication for optimal throughput
  • Replication lag percentiles (p50, p95, p99)
  • Session-based routing for read-your-writes consistency
  • Automatic catch-up for lagging replicas
  • Comprehensive metrics and statistics
  • Failover history tracking
  • Connection pooling per replica
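
The lag-percentile and failover-history metrics above are surfaced through the manager's statistics. A minimal sketch of periodic reporting follows, assuming the stats struct carries `lag_p50_ms`, `lag_p95_ms`, and `lag_p99_ms` fields (these field names are not shown elsewhere in this README and are assumptions):

// Hypothetical sketch: the percentile field names are assumptions.
let stats = manager.get_stats().await;
println!(
    "lag p50={}ms p95={}ms p99={}ms, failovers={}",
    stats.lag_p50_ms, stats.lag_p95_ms, stats.lag_p99_ms, stats.failover_count
);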

Architecture

┌───────────────────────────────────────────────────────┐
│                    ReplicaManager                      │
│  ┌─────────────┐  ┌─────────────┐  ┌───────────────┐  │
│  │LoadBalancer │  │ LagMonitor  │  │ ConsistencyMgr│  │
│  └─────────────┘  └─────────────┘  └───────────────┘  │
│  ┌─────────────┐  ┌─────────────┐  ┌───────────────┐  │
│  │ FailoverMgr │  │ WalStreamer │  │AsyncReplicator│  │
│  └─────────────┘  └─────────────┘  └───────────────┘  │
└───────────────────────────┬───────────────────────────┘
          ┌─────────────────┼─────────────────┐
          ▼                 ▼                 ▼
    ┌───────────┐     ┌───────────┐     ┌───────────┐
    │ Replica 1 │     │ Replica 2 │     │ Replica 3 │
    │  Healthy  │     │  Healthy  │     │  Healthy  │
    │ Lag: 50ms │     │ Lag: 30ms │     │ Lag: 75ms │
    └───────────┘     └───────────┘     └───────────┘

Installation

Add to your Cargo.toml:

[dependencies]
heliosdb-replication = { path = "../heliosdb-replication" }
tokio = { version = "1.40", features = ["full"] }

Quick Start

Basic Setup

use heliosdb_replication::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create configuration
    let config = ReplicaConfig {
        primary_address: "127.0.0.1:5432".to_string(),
        replica_addresses: vec![
            "127.0.0.1:5433".to_string(),
            "127.0.0.1:5434".to_string(),
        ],
        replication_mode: ReplicationMode::Async,
        lag_threshold_ms: 1000,
        health_check_interval: Duration::from_secs(5),
        auto_failover: true,
        min_replicas: 1,
        ..Default::default()
    };

    // Initialize manager
    let manager = ReplicaManager::new(config).await?;

    // Get a replica for reads
    let replica = manager
        .get_replica_for_read(LoadBalancingStrategy::RoundRobin)
        .await?;
    println!("Using replica: {}", replica);

    Ok(())
}

Read-Your-Writes Consistency

let session_id = "user-session-12345";

// Record a write operation
manager.record_write(session_id, 100).await?;

// Get a replica that has caught up with the write
let replica = manager
    .get_replica_for_read_with_session(
        session_id,
        LoadBalancingStrategy::RoundRobin,
    )
    .await?;

Load Balancing Strategies

// Round-robin: Distribute evenly across replicas
let replica = manager
    .get_replica_for_read(LoadBalancingStrategy::RoundRobin)
    .await?;

// Least connections: Route to replica with fewest connections
let replica = manager
    .get_replica_for_read(LoadBalancingStrategy::LeastConnections)
    .await?;

// Latency-based: Route to replica with lowest latency
let replica = manager
    .get_replica_for_read(LoadBalancingStrategy::LatencyBased)
    .await?;

// Random: Random selection among healthy replicas
let replica = manager
    .get_replica_for_read(LoadBalancingStrategy::Random)
    .await?;

Monitoring Replication Status

// Get status of all replicas
let statuses = manager.get_all_status().await;
for status in statuses {
    println!("Replica: {}", status.address);
    println!("  Healthy: {}", status.is_healthy);
    println!("  Lag: {}ms", status.lag_ms);
    println!("  LSN: {}", status.last_applied_lsn);
    println!("  Connections: {}", status.connection_count);
    println!("  Bytes replicated: {}", status.bytes_replicated);
}

// Get replication statistics
let stats = manager.get_stats().await;
println!("Total replicas: {}", stats.total_replicas);
println!("Healthy replicas: {}", stats.healthy_replicas);
println!("Average lag: {:.2}ms", stats.avg_lag_ms);
println!("Max lag: {}ms", stats.max_lag_ms);
println!("Failover count: {}", stats.failover_count);

Dynamic Replica Management

// Add a new replica
manager.add_replica("127.0.0.1:5435".to_string()).await?;

// Remove a replica
manager.remove_replica("127.0.0.1:5435").await?;

// Get lag for a specific replica
let lag = manager.get_lag("127.0.0.1:5433").await?;
println!("Lag: {}ms", lag);

Failover Management

// Promote a replica to primary
manager.promote_replica("127.0.0.1:5433").await?;

// Check failover history
let stats = manager.get_stats().await;
println!("Failovers performed: {}", stats.failover_count);

Configuration

ReplicaConfig Options

pub struct ReplicaConfig {
    /// Primary database address
    pub primary_address: String,
    /// List of replica addresses
    pub replica_addresses: Vec<String>,
    /// Replication mode (Async or SemiSync)
    pub replication_mode: ReplicationMode,
    /// Maximum acceptable replication lag (milliseconds)
    pub lag_threshold_ms: u64,
    /// Health check interval
    pub health_check_interval: Duration,
    /// Health check timeout
    pub health_check_timeout: Duration,
    /// Lag monitoring interval
    pub lag_monitor_interval: Duration,
    /// Enable automatic failover
    pub auto_failover: bool,
    /// Minimum replicas required for operations
    pub min_replicas: usize,
    /// WAL streaming buffer size
    pub wal_buffer_size: usize,
    /// Enable compression for WAL streaming
    pub enable_compression: bool,
    /// WAL batch size
    pub wal_batch_size: usize,
    /// Enable read-your-writes consistency
    pub enable_read_your_writes: bool,
    /// Session timeout for consistency tracking
    pub session_timeout: Duration,
    /// Connection pool size per replica
    pub connection_pool_size: usize,
}

Configuration Presets

// Default configuration
let config = ReplicaConfig::default();

// Custom configuration
let config = ReplicaConfig {
    primary_address: "db-primary:5432".to_string(),
    replica_addresses: vec![
        "db-replica-1:5432".to_string(),
        "db-replica-2:5432".to_string(),
        "db-replica-3:5432".to_string(),
    ],
    replication_mode: ReplicationMode::Async,
    lag_threshold_ms: 500,
    health_check_interval: Duration::from_secs(3),
    enable_compression: true,
    enable_read_your_writes: true,
    ..Default::default()
};

WAL Streaming

Basic WAL Operations

let streamer = WalStreamer::new("127.0.0.1:5432".to_string(), 1000);

// Append WAL entries
let entry = WalEntry {
    lsn: 100,
    txn_id: 42,
    timestamp: chrono::Utc::now().timestamp(),
    operation: "INSERT".to_string(),
    table: "users".to_string(),
    data: vec![1, 2, 3, 4],
    checksum: 0,
};
streamer.append(entry)?;

// Get entries from specific LSN
let entries = streamer.get_entries_from(50, 100);

// Get WAL statistics
let stats = streamer.get_stats();
println!("Buffer size: {}", stats.buffer_size);
println!("Current LSN: {}", stats.current_lsn);

// Stream to replica
let mut rx = streamer
    .stream_to_replica("127.0.0.1:5433".to_string(), 100)
    .await?;
while let Some(entry) = rx.recv().await {
    // Process entry
}

Async Replication

Replication Engine

let replicator = AsyncReplicator::new(
    "127.0.0.1:5432".to_string(),
    1024 * 1024, // 1MB buffer
    100,         // batch size
    true,        // compression
);

// Register replicas
replicator.register_replica("127.0.0.1:5433".to_string(), 0);
replicator.register_replica("127.0.0.1:5434".to_string(), 0);

// Replicate batch
let entries = vec![/* WAL entries */];
replicator.replicate_batch(entries).await?;

// Acknowledge replication
replicator.acknowledge("127.0.0.1:5433", 100);

// Check lag
let lag = replicator.get_replica_lag("127.0.0.1:5433");

// Get metrics
let metrics = replicator.get_metrics();
println!("Total entries: {}", metrics.total_entries_sent);
println!("Total bytes: {}", metrics.total_bytes_sent);
println!("Compression ratio: {:.2}x", metrics.compression_ratio);

Testing

Run Unit Tests

cargo test --lib

Run Integration Tests

cargo test --test integration_test

Run All Tests

cargo test

Run Example

cargo run --example replica_setup

Performance Characteristics

Throughput

  • Async Replication: 50,000+ operations/sec
  • Compressed Replication: 100,000+ operations/sec
  • Batch Replication: 150,000+ operations/sec

Latency

  • Health Check: ~10ms
  • Lag Monitoring: ~1ms
  • Replica Selection: <1ms
  • Replication Latency: 5-20ms (network dependent)

Scalability

  • Replicas per Primary: 100+
  • Concurrent Sessions: 100,000+
  • WAL Buffer Size: 1GB+
  • Operations/Batch: 1000+

Best Practices

1. Replica Configuration

// Production configuration
let config = ReplicaConfig {
    lag_threshold_ms: 500,                         // Aggressive lag threshold
    health_check_interval: Duration::from_secs(5),
    min_replicas: 2,                               // Ensure redundancy
    enable_compression: true,                      // Save bandwidth
    enable_read_your_writes: true,                 // Consistency
    auto_failover: true,                           // Automatic recovery
    ..Default::default()
};

2. Load Balancing

  • Use LatencyBased for latency-sensitive applications
  • Use LeastConnections for connection-pooling scenarios
  • Use RoundRobin for even distribution
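
A rough sketch of how these rules might be encoded in application code (the `Workload` enum and `pick_strategy` helper are purely illustrative, not part of the crate):

// Illustrative only: `Workload` and `pick_strategy` are hypothetical application-side helpers.
enum Workload {
    LatencySensitive,
    PooledConnections,
    Bulk,
}

fn pick_strategy(workload: Workload) -> LoadBalancingStrategy {
    match workload {
        Workload::LatencySensitive => LoadBalancingStrategy::LatencyBased,
        Workload::PooledConnections => LoadBalancingStrategy::LeastConnections,
        Workload::Bulk => LoadBalancingStrategy::RoundRobin,
    }
}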

3. Monitoring

// Periodic monitoring task.
// Assumes `manager` is a handle that can be moved into the task (e.g. a clone of an Arc),
// and that the `warn!`/`error!` macros come from a logging crate such as `log` or `tracing`.
tokio::spawn(async move {
    let mut interval = tokio::time::interval(Duration::from_secs(10));
    loop {
        interval.tick().await;
        let stats = manager.get_stats().await;
        if stats.max_lag_ms > 1000 {
            warn!("High replication lag detected: {}ms", stats.max_lag_ms);
        }
        if stats.healthy_replicas < stats.total_replicas / 2 {
            error!("Fewer than 50% of replicas are healthy!");
        }
    }
});

4. Error Handling

match manager.get_replica_for_read(strategy).await {
    Ok(replica) => {
        // Use replica
    }
    Err(ReplicationError::NoHealthyReplicas) => {
        // Fallback to primary
    }
    Err(e) => {
        error!("Replication error: {}", e);
    }
}

Troubleshooting

High Replication Lag

  1. Check network latency between primary and replicas
  2. Verify replica disk I/O performance
  3. Increase wal_batch_size for better throughput
  4. Enable compression to reduce bandwidth
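
For example, a configuration tuned for throughput over a constrained link might look like the following sketch (the values are illustrative starting points, not measured recommendations):

let config = ReplicaConfig {
    wal_batch_size: 1000,     // larger batches amortize per-entry overhead
    enable_compression: true, // LZ4 compression reduces WAL bandwidth
    lag_threshold_ms: 2000,   // tolerate more lag while replicas catch up
    ..Default::default()
};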

Replicas Not Healthy

  1. Verify replica addresses are correct and reachable
  2. Check health check timeout settings
  3. Review replica logs for errors
  4. Ensure replicas are running and accepting connections
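
If replicas flap between healthy and unhealthy because probes time out under load, relaxing the health-check settings is a reasonable first step (the values below are illustrative):

let config = ReplicaConfig {
    health_check_interval: Duration::from_secs(10), // probe less often
    health_check_timeout: Duration::from_secs(5),   // give slow replicas more time to answer
    ..Default::default()
};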

Failover Not Triggering

  1. Verify auto_failover is enabled
  2. Check min_replicas requirement is met
  3. Ensure at least one healthy replica exists
  4. Review failover history for issues
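
A quick way to confirm the first three points at runtime, sketched against the configuration fields and stats API shown earlier in this README:

// Sketch: verify failover preconditions from the running manager and its config.
let stats = manager.get_stats().await;
assert!(config.auto_failover, "auto_failover is disabled");
assert!(
    stats.healthy_replicas as usize >= config.min_replicas,
    "only {} healthy replicas, need at least {}",
    stats.healthy_replicas,
    config.min_replicas
);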

License

Apache-2.0

Contributing

Contributions are welcome! Please submit issues and pull requests to the main HeliosDB repository.

Support

For support and questions: