Feature 04: Replica Lag-Aware Routing
Priority: High | Complexity: Medium | Phase: 1 (Foundation)
Overview
Problem Statement
In read-scaling architectures, replicas can lag behind the primary. Common causes:
- Network latency between nodes
- Disk I/O bottlenecks on replicas
- Heavy read workloads delaying WAL apply
Routing reads to a lagging replica causes:
- Stale reads (user doesn’t see their recent write)
- Inconsistent results (different replicas return different data)
- Application bugs (assuming freshness that doesn’t exist)
Solution
Implement lag-aware routing that:
- Continuously monitors replication lag on all standbys
- Routes queries based on freshness requirements
- Provides “read-your-writes” guarantees
- Adapts routing in real-time as lag changes
                  ┌─────────────────────────────────────────────────┐
                  │                LAG-AWARE ROUTER                 │
                  │                                                 │
Query ───────────►│  ┌──────────────────────────────────────────┐   │
+ LSN hint        │  │ 1. Extract freshness requirement         │   │
                  │  │    - Max acceptable lag                  │   │
                  │  │    - Required LSN (read-your-writes)     │   │
                  │  └──────────────────────────────────────────┘   │
                  │                        │                        │
                  │                        ▼                        │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │ 2. Query Lag Monitor                     │   │
                  │  │    Primary LSN: 1000                     │   │
                  │  │    Standby-1: 998 (200ms lag)            │   │
                  │  │    Standby-2: 990 (1s lag)               │   │
                  │  │    Standby-3: 950 (5s lag)               │   │
                  │  └──────────────────────────────────────────┘   │
                  │                        │                        │
                  │                        ▼                        │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │ 3. Filter by Freshness                   │   │
                  │  │    Requirement: lag < 500ms              │   │
                  │  │    Eligible: [Standby-1]                 │   │
                  │  └──────────────────────────────────────────┘   │
                  └─────────────────────────────────────────────────┘
                                           │
                                           ▼
                                    ┌──────────────┐
                                    │  Standby-1   │
                                    │  (freshest)  │
                                    └──────────────┘

Architecture
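The three steps in the routing diagram can be sketched with plain data. This is a minimal illustration, not the proxy's implementation: `Standby`, `pick_standby`, `diagram_standbys`, and the field names are hypothetical, and the values mirror the diagram.

```rust
use std::time::Duration;

/// Hypothetical snapshot of one standby, as a lag monitor might report it.
#[derive(Debug, Clone, PartialEq)]
pub struct Standby {
    pub name: &'static str,
    pub replay_lsn: u64,
    pub lag: Duration,
}

/// Steps 1-3 from the diagram: keep standbys that satisfy both the maximum
/// lag and the optional required LSN, then prefer the one with the least lag.
/// Returns `None` when no standby qualifies (the caller would use the primary).
pub fn pick_standby(
    standbys: &[Standby],
    max_lag: Duration,
    required_lsn: Option<u64>,
) -> Option<&Standby> {
    standbys
        .iter()
        .filter(|s| s.lag <= max_lag)
        .filter(|s| required_lsn.map_or(true, |lsn| s.replay_lsn >= lsn))
        .min_by_key(|s| s.lag)
}

/// The standby values shown in the diagram.
pub fn diagram_standbys() -> Vec<Standby> {
    vec![
        Standby { name: "Standby-1", replay_lsn: 998, lag: Duration::from_millis(200) },
        Standby { name: "Standby-2", replay_lsn: 990, lag: Duration::from_secs(1) },
        Standby { name: "Standby-3", replay_lsn: 950, lag: Duration::from_secs(5) },
    ]
}
```

With a 500 ms limit only Standby-1 qualifies, matching the diagram. With a required LSN of 999, no standby qualifies and the read would fall back to the primary.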
Lag Monitor
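The monitor in this section stores LSNs as `u64`. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash (the high and low 32 bits), so a monitor that reads the textual form needs a conversion. A minimal sketch, assuming HeliosDB-Lite reports LSNs in that same text format (an assumption, since this spec only shows integer LSNs); `parse_lsn` and `lag_bytes` are hypothetical helpers, not part of any driver.

```rust
/// Convert a PostgreSQL-style textual LSN such as "0/16B3748" into a u64.
/// The part before the slash is the high 32 bits, the part after is the low 32 bits.
pub fn parse_lsn(text: &str) -> Option<u64> {
    let (hi, lo) = text.trim().split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// Bytes by which `replica` trails `primary`. Saturates at zero in case the
/// primary reading is older than the replica reading.
pub fn lag_bytes(primary: u64, replica: u64) -> u64 {
    primary.saturating_sub(replica)
}
```

Comparing the resulting integers is independent of wall clocks, which is why LSN-based lag is unaffected by clock skew.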
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

use dashmap::DashMap;

pub struct LagMonitor {
    /// Current lag for each node (updated continuously)
    node_lags: DashMap<NodeId, LagInfo>,

    /// Primary LSN watermark
    primary_lsn: AtomicU64,

    /// Lag collection interval
    poll_interval: Duration,

    /// Connection to each node for lag queries
    connections: DashMap<NodeId, Connection>,
}

#[derive(Debug, Clone)]
pub struct LagInfo {
    /// Current LSN on this node
    pub current_lsn: u64,

    /// Lag in LSN units (bytes behind primary)
    pub lag_bytes: u64,

    /// Estimated lag in time
    pub lag_time: Duration,

    /// Last update timestamp
    pub updated_at: Instant,

    /// Lag trend (improving, stable, degrading)
    pub trend: LagTrend,

    /// Node sync mode
    pub sync_mode: SyncMode,
}

#[derive(Debug, Clone, Copy)]
pub enum LagTrend {
    Improving,
    Stable,
    Degrading,
}

impl LagMonitor {
    /// Start background lag monitoring
    pub async fn start(&self) {
        loop {
            self.update_primary_lsn().await;
            self.update_all_standby_lags().await;
            tokio::time::sleep(self.poll_interval).await;
        }
    }

    async fn update_primary_lsn(&self) {
        if let Some(conn) = self.connections.get("primary") {
            // Stock PostgreSQL has no pg_lsn -> bigint cast, so the LSN is
            // converted to a byte offset with pg_wal_lsn_diff.
            let result = conn.query_one(
                "SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0')::bigint",
                &[]
            ).await;

            // On a failed poll, keep the previous watermark instead of panicking.
            if let Ok(row) = result {
                let lsn: i64 = row.get(0);
                self.primary_lsn.store(lsn as u64, Ordering::SeqCst);
            }
        }
    }

    async fn update_all_standby_lags(&self) {
        // DashMap iteration yields entries rather than (key, value) tuples.
        for entry in self.connections.iter() {
            let (node_id, conn) = (entry.key(), entry.value());
            if node_id.as_str() == "primary" {
                continue;
            }

            let row = conn.query_one(
                "SELECT pg_wal_lsn_diff(pg_last_wal_receive_lsn(), '0/0')::bigint, \
                        pg_wal_lsn_diff(pg_last_wal_replay_lsn(), '0/0')::bigint, \
                        EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::float8",
                &[]
            ).await;

            if let Ok(row) = row {
                let replay_lsn = row.get::<_, i64>(1) as u64;
                // Clamp at zero: Duration::from_secs_f64 panics on negative input,
                // which clock skew between nodes can produce.
                let lag_secs = row.get::<_, f64>(2).max(0.0);
                let primary_lsn = self.primary_lsn.load(Ordering::SeqCst);
                let lag_bytes = primary_lsn.saturating_sub(replay_lsn);

                let previous = self.node_lags.get(node_id).map(|l| l.lag_bytes);
                let trend = match previous {
                    Some(prev) if lag_bytes < prev => LagTrend::Improving,
                    Some(prev) if lag_bytes > prev => LagTrend::Degrading,
                    _ => LagTrend::Stable,
                };

                self.node_lags.insert(node_id.clone(), LagInfo {
                    current_lsn: replay_lsn,
                    lag_bytes,
                    lag_time: Duration::from_secs_f64(lag_secs),
                    updated_at: Instant::now(),
                    trend,
                    sync_mode: self.get_sync_mode(node_id),
                });
            }
        }
    }

    /// Get nodes that meet freshness requirement
    pub fn get_fresh_nodes(&self, max_lag: Duration) -> Vec<NodeId> {
        self.node_lags.iter()
            .filter(|entry| entry.value().lag_time <= max_lag)
            .map(|entry| entry.key().clone())
            .collect()
    }

    /// Check if node has caught up to specific LSN
    pub fn has_reached_lsn(&self, node_id: &str, required_lsn: u64) -> bool {
        self.node_lags.get(node_id)
            .map(|info| info.current_lsn >= required_lsn)
            .unwrap_or(false)
    }
}

Lag-Aware Router
use std::sync::Arc;
use std::time::Duration;

pub struct LagAwareRouter {
    lag_monitor: Arc<LagMonitor>,
    nodes: Arc<NodeRegistry>,
    config: LagRoutingConfig,
}

pub struct LagRoutingConfig {
    /// Default max lag for reads
    pub default_max_lag: Duration,

    /// Max lag for "fresh" reads
    pub fresh_threshold: Duration,

    /// Route to primary if all standbys exceed this
    pub fallback_threshold: Duration,

    /// Enable read-your-writes tracking
    pub read_your_writes: bool,
}

impl LagAwareRouter {
    pub fn route(&self, query: &str, session: &Session) -> RoutingDecision {
        // 1. Determine freshness requirement
        let max_lag = self.extract_lag_requirement(query)
            .unwrap_or(self.config.default_max_lag);

        // 2. Check read-your-writes requirement (only when enabled in config)
        if self.config.read_your_writes {
            if let Some(required_lsn) = session.last_write_lsn {
                return self.route_with_lsn_requirement(required_lsn);
            }
        }

        // 3. Get standbys meeting the freshness requirement.
        //    get_fresh_nodes returns IDs, so resolve them to nodes here.
        let fresh = self.lag_monitor.get_fresh_nodes(max_lag);
        let eligible: Vec<_> = self.nodes.standbys()
            .filter(|n| fresh.contains(&n.id))
            .collect();

        if eligible.is_empty() {
            // All standbys too laggy, route to primary
            return RoutingDecision::primary("All standbys exceed lag threshold");
        }

        // 4. Select best node (lowest lag, then load balance)
        let best = self.select_best_node(&eligible);
        RoutingDecision::standby(best, "Lag-aware selection")
    }

    fn route_with_lsn_requirement(&self, required_lsn: u64) -> RoutingDecision {
        // Find standbys that have replayed past required LSN
        let eligible: Vec<_> = self.nodes.standbys()
            .filter(|n| self.lag_monitor.has_reached_lsn(&n.id, required_lsn))
            .collect();

        if eligible.is_empty() {
            // No standby caught up yet, route to primary
            return RoutingDecision::primary("Read-your-writes: no standby caught up");
        }

        let best = self.select_best_node(&eligible);
        RoutingDecision::standby(best, "Read-your-writes satisfied")
    }

    fn select_best_node(&self, eligible: &[Node]) -> Node {
        // Prefer lower lag, then fewer open connections.
        // Both callers check for an empty list first, so the expect cannot fire.
        eligible.iter()
            .min_by_key(|n| {
                let lag = self.lag_monitor.get_lag(&n.id)
                    .map(|l| l.lag_time)
                    .unwrap_or(Duration::MAX);
                (lag, n.current_connections())
            })
            .expect("select_best_node called with no eligible nodes")
            .clone()
    }
}

Read-Your-Writes Tracker
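The tracker in this section carries a `retention` setting, but how entries expire is left open. One possible approach, sketched with only the standard library, is to store the write time next to the LSN and treat entries older than the retention window as absent. `ExpiringTracker` and its methods are hypothetical names. The caller passes `now` explicitly so the behaviour is easy to test, and a plain `HashMap` stands in for the concurrent map.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical single-threaded stand-in for the tracker, with expiry.
pub struct ExpiringTracker {
    /// Session -> (highest write LSN, time of the latest write)
    entries: HashMap<String, (u64, Instant)>,
    retention: Duration,
}

impl ExpiringTracker {
    pub fn new(retention: Duration) -> Self {
        Self { entries: HashMap::new(), retention }
    }

    /// Record a write, never letting the stored LSN move backwards.
    pub fn record_write(&mut self, session: &str, lsn: u64, now: Instant) {
        let entry = self.entries.entry(session.to_string()).or_insert((lsn, now));
        entry.0 = entry.0.max(lsn);
        entry.1 = now;
    }

    /// LSN a read must observe, or None if the session has no recent write.
    pub fn required_lsn(&self, session: &str, now: Instant) -> Option<u64> {
        let (lsn, written_at) = *self.entries.get(session)?;
        if now.duration_since(written_at) <= self.retention {
            Some(lsn)
        } else {
            None
        }
    }

    /// Drop expired entries so the map does not grow without bound.
    pub fn evict_expired(&mut self, now: Instant) {
        let retention = self.retention;
        self.entries
            .retain(|_, (_, written_at)| now.duration_since(*written_at) <= retention);
    }

    /// Number of sessions currently tracked (including not-yet-evicted ones).
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

Expiring by age rather than clearing after the first successful read keeps the guarantee in place if a later read would land on a slower standby.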
pub struct ReadYourWritesTracker {
    /// Session -> last write LSN
    session_lsns: DashMap<SessionId, u64>,

    /// LSN retention time
    retention: Duration,
}

impl ReadYourWritesTracker {
    /// Record that session wrote at this LSN.
    /// Keeps the highest LSN seen, so an out-of-order report cannot
    /// lower the requirement.
    pub fn record_write(&self, session_id: &str, lsn: u64) {
        self.session_lsns
            .entry(session_id.to_string())
            .and_modify(|current| *current = (*current).max(lsn))
            .or_insert(lsn);
    }

    /// Get required LSN for read-your-writes
    pub fn get_required_lsn(&self, session_id: &str) -> Option<u64> {
        self.session_lsns.get(session_id).map(|v| *v)
    }

    /// Clear LSN requirement (after successful read).
    /// Note: a later read may still be routed to a standby behind this LSN.
    pub fn clear(&self, session_id: &str) {
        self.session_lsns.remove(session_id);
    }
}

API Specification
Configuration (heliosproxy.toml)
[lag_routing]
enabled = true

# Lag monitoring
poll_interval = "100ms"
lag_calculation = "wal"  # "wal", "time", "hybrid"

# Freshness thresholds
default_max_lag = "1s"
fresh_threshold = "100ms"
stale_threshold = "10s"  # Mark node unhealthy

# Fallback behavior
fallback_to_primary = true
fallback_threshold = "5s"

# Read-your-writes
read_your_writes = true
ryw_retention = "5m"

# Per-sync-mode limits
[lag_routing.sync_modes]
sync.max_lag = "0ms"
semisync.max_lag = "500ms"
async.max_lag = "10s"

SQL Hints
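The `/*helios:...*/` hint comments listed in this section could be parsed along these lines. This is a sketch under assumptions: it handles only a leading hint comment with comma-separated `key=value` pairs and the `ms`, `s`, and `m` duration suffixes that appear in this document. `Hints`, `parse_duration`, and `parse_hints` are hypothetical names.

```rust
use std::time::Duration;

/// Hypothetical parsed form of a hint comment.
#[derive(Debug, Default, PartialEq)]
pub struct Hints {
    pub max_lag: Option<Duration>,
    pub read_your_writes: bool,
    pub required_lsn: Option<u64>,
}

/// Parse "100ms", "5s" or "5m" into a Duration.
pub fn parse_duration(text: &str) -> Option<Duration> {
    // Check "ms" first so "100ms" is not misread as seconds or minutes.
    if let Some(n) = text.strip_suffix("ms") {
        n.parse::<u64>().ok().map(Duration::from_millis)
    } else if let Some(n) = text.strip_suffix('s') {
        n.parse::<u64>().ok().map(Duration::from_secs)
    } else if let Some(n) = text.strip_suffix('m') {
        let minutes = n.parse::<u64>().ok()?;
        minutes.checked_mul(60).map(Duration::from_secs)
    } else {
        None
    }
}

/// Extract hints from a leading `/*helios:...*/` comment, if present.
/// Unknown keys (for example `rag_stage`) are ignored; a malformed pair or
/// value makes the whole hint invalid and returns `None`.
pub fn parse_hints(query: &str) -> Option<Hints> {
    let rest = query.trim_start().strip_prefix("/*helios:")?;
    let (body, _sql) = rest.split_once("*/")?;
    let mut hints = Hints::default();
    for pair in body.split(',') {
        let (key, value) = pair.trim().split_once('=')?;
        let value = value.trim();
        match key.trim() {
            "lag" => hints.max_lag = Some(parse_duration(value)?),
            "ryw" => hints.read_your_writes = value == "true",
            "lsn" => hints.required_lsn = Some(value.parse::<u64>().ok()?),
            _ => {}
        }
    }
    Some(hints)
}
```

Returning `None` for a query without a hint lets the router fall back to its configured default lag.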
-- Require fresh data (max 100ms lag)
/*helios:lag=100ms*/
SELECT * FROM users WHERE id = $1;

-- Allow stale data (analytics query)
/*helios:lag=5s*/
SELECT COUNT(*) FROM events WHERE date > $1;

-- Read-your-writes (ensure seeing own writes)
/*helios:ryw=true*/
SELECT * FROM orders WHERE user_id = $1;

-- Require specific LSN (advanced)
/*helios:lsn=1234567*/
SELECT * FROM inventory WHERE product_id = $1;

Admin API
GET /lag/status
{
  "primary": {
    "current_lsn": 1000000,
    "wal_rate_bytes_sec": 50000
  },
  "standbys": [
    {
      "node": "standby-sync-1",
      "sync_mode": "sync",
      "current_lsn": 999990,
      "lag_bytes": 10,
      "lag_time_ms": 5,
      "trend": "stable",
      "healthy": true
    },
    {
      "node": "standby-async-1",
      "sync_mode": "async",
      "current_lsn": 998000,
      "lag_bytes": 2000,
      "lag_time_ms": 150,
      "trend": "improving",
      "healthy": true
    }
  ]
}

GET /lag/history?node=standby-async-1&duration=1h
# Historical lag data for graphing
{
  "data_points": [
    {"timestamp": "...", "lag_ms": 150},
    {"timestamp": "...", "lag_ms": 160},
    ...
  ]
}

POST /lag/config
# Update lag thresholds dynamically
{ "default_max_lag": "2s" }

AI/Agent Innovations
1. Context-Aware Lag Tolerance
AI operations have different freshness needs:
use std::collections::HashMap;
use std::time::Duration;

pub struct AgentLagPolicy {
    /// Tool-specific lag tolerances
    tool_lags: HashMap<String, Duration>,
}

impl AgentLagPolicy {
    pub fn get_lag_for_tool(&self, tool: &str) -> Duration {
        // Configured per-tool tolerances take precedence over the defaults below.
        if let Some(lag) = self.tool_lags.get(tool) {
            return *lag;
        }

        match tool {
            // Knowledge retrieval can be eventually consistent
            "knowledge_search" => Duration::from_secs(60),

            // User data needs to be fresh
            "user_lookup" => Duration::from_millis(100),

            // Conversation context needs to be current
            "conversation_history" => Duration::ZERO,

            // Default
            _ => Duration::from_secs(1),
        }
    }
}

2. RAG Freshness Windows
Different stages of RAG have different requirements:
-- Embedding retrieval (can use older index)
/*helios:lag=5m,rag_stage=retrieval*/
SELECT chunk_id FROM embeddings
ORDER BY vector <-> $1
LIMIT 100;

-- Document fetch (needs recent content)
/*helios:lag=1s,rag_stage=fetch*/
SELECT content FROM documents
WHERE id = ANY($1);

-- Reranking metadata (needs fresh metadata)
/*helios:lag=100ms,rag_stage=rerank*/
SELECT relevance_score FROM document_scores
WHERE doc_id = ANY($1);

3. Agentic Workflow Consistency
Multi-step workflows need coordinated consistency:
pub struct WorkflowConsistency {
    /// Track LSN at workflow start
    start_lsn: u64,

    /// Ensure all reads see at least this LSN
    consistency_point: u64,
}

impl WorkflowConsistency {
    pub fn begin_workflow(&mut self) {
        self.start_lsn = self.get_current_lsn();
        self.consistency_point = self.start_lsn;
    }

    pub fn record_write(&mut self, write_lsn: u64) {
        self.consistency_point = write_lsn.max(self.consistency_point);
    }

    pub fn get_read_lsn_requirement(&self) -> u64 {
        self.consistency_point
    }
}

4. LLM Caching with Lag Awareness
Cache results with lag metadata:
pub struct LagAwareCache {
    cache: QueryCache,
    lag_monitor: Arc<LagMonitor>,
}

impl LagAwareCache {
    pub fn get(&self, query: &str, max_lag: Duration) -> Option<CachedResult> {
        if let Some(result) = self.cache.get(query) {
            // Check if cached result is fresh enough
            let cache_age = result.cached_at.elapsed();
            let effective_lag = cache_age + self.get_source_lag(&result.source_node);

            if effective_lag <= max_lag {
                return Some(result);
            }
        }
        None
    }
}

HeliosDB-Lite Integration
1. WAL-Based Lag Calculation
Use HeliosDB-Lite’s replication LSN:
impl LagMonitor {
    async fn get_lag_from_helios(&self, node_id: &str) -> Result<LagInfo> {
        let conn = self.get_connection(node_id)?;

        // Query HeliosDB-Lite specific replication status
        let row = conn.query_one(
            "SELECT * FROM helios_stat_replication WHERE node_id = $1",
            &[node_id]
        ).await?;

        Ok(LagInfo {
            current_lsn: row.get("replay_lsn"),
            lag_bytes: row.get("lag_bytes"),
            lag_time: Duration::from_millis(row.get("lag_ms")),
            sync_mode: row.get::<_, String>("sync_mode").parse()?,
            // LagInfo cannot derive Default because Instant has none,
            // so the remaining fields are set explicitly.
            updated_at: Instant::now(),
            trend: LagTrend::Stable,
        })
    }
}

2. Sync Mode Integration
Respect HeliosDB-Lite sync mode guarantees:
impl LagAwareRouter {
    fn route_by_sync_mode(&self, required_freshness: Duration) -> Vec<Node> {
        // Sync standbys: guaranteed zero lag
        if required_freshness == Duration::ZERO {
            return self.nodes.filter(|n| n.sync_mode == SyncMode::Sync);
        }

        // Semi-sync: bounded lag (configurable)
        if required_freshness <= self.config.semisync_max_lag {
            return self.nodes.filter(|n|
                n.sync_mode == SyncMode::Sync ||
                n.sync_mode == SyncMode::SemiSync
            );
        }

        // Async: eventual consistency. get_fresh_nodes returns IDs,
        // so map them back to nodes to match the return type.
        let fresh = self.lag_monitor.get_fresh_nodes(required_freshness);
        self.nodes.filter(|n| fresh.contains(&n.id))
    }
}

3. TWR Lag Considerations
Transparent Write Routing adds latency:
impl LagAwareRouter {
    fn adjust_for_twr(&self, node: &Node) -> Duration {
        if node.supports_twr() {
            // TWR writes go through standby to primary
            // Add estimated TWR latency to lag calculation
            let twr_latency = self.estimate_twr_latency(node);
            return node.lag_time + twr_latency;
        }
        node.lag_time
    }
}

4. Branch Replication Lag
Track lag per branch for branch-aware routing:
pub struct BranchLagMonitor {
    /// Lag per (node, branch) pair
    branch_lags: DashMap<(NodeId, BranchName), LagInfo>,
}

impl BranchLagMonitor {
    pub fn get_branch_lag(&self, node: &str, branch: &str) -> Option<LagInfo> {
        self.branch_lags.get(&(node.to_string(), branch.to_string()))
            .map(|v| v.clone())
    }

    pub fn route_for_branch(&self, branch: &str, max_lag: Duration) -> Vec<NodeId> {
        self.branch_lags.iter()
            .filter(|entry| entry.key().1 == branch)
            .filter(|entry| entry.value().lag_time <= max_lag)
            .map(|entry| entry.key().0.clone())
            .collect()
    }
}

Implementation Notes
File Locations
src/proxy/
├── lag/
│   ├── mod.rs       # Public API
│   ├── monitor.rs   # LagMonitor implementation
│   ├── router.rs    # LagAwareRouter
│   ├── ryw.rs       # Read-your-writes tracker
│   └── metrics.rs   # Lag metrics (Prometheus)

Key Considerations
- Polling vs Push: Use WAL streaming notifications when available, fall back to polling.
- Clock Skew: Use LSN-based lag primarily. Time-based lag requires synchronized clocks.
- Lag Spikes: Implement smoothing/averaging to avoid oscillating routing decisions.
- Cascading Standbys: Track lag through replication chains.
- Metrics: Expose lag histograms for monitoring and alerting.
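For the lag-spike consideration, one common choice is an exponentially weighted moving average (EWMA) combined with hysteresis: a node leaves rotation only above one threshold and is readmitted only below a lower one. A minimal sketch follows. `SmoothedLag`, `alpha`, and the two thresholds are illustrative assumptions, not settings defined by this spec.

```rust
/// Smoothed lag for one node, with hysteresis on the eligibility flag.
pub struct SmoothedLag {
    alpha: f64,          // weight of the newest sample, in (0, 1]
    enter_below_ms: f64, // readmit the node once the average drops below this
    leave_above_ms: f64, // evict the node once the average rises above this
    average_ms: Option<f64>,
    eligible: bool,
}

impl SmoothedLag {
    pub fn new(alpha: f64, enter_below_ms: f64, leave_above_ms: f64) -> Self {
        Self { alpha, enter_below_ms, leave_above_ms, average_ms: None, eligible: true }
    }

    /// Feed one raw lag sample (milliseconds) and return whether the node
    /// is currently eligible for reads.
    pub fn observe(&mut self, sample_ms: f64) -> bool {
        let average = match self.average_ms {
            None => sample_ms, // the first sample seeds the average
            Some(prev) => self.alpha * sample_ms + (1.0 - self.alpha) * prev,
        };
        self.average_ms = Some(average);

        if self.eligible && average > self.leave_above_ms {
            self.eligible = false;
        } else if !self.eligible && average < self.enter_below_ms {
            self.eligible = true;
        }
        self.eligible
    }

    pub fn average_ms(&self) -> Option<f64> {
        self.average_ms
    }
}
```

With `alpha = 0.2` and thresholds of 200 ms and 500 ms, a single 1500 ms spike after a 100 ms baseline moves the average to about 380 ms and leaves the node in rotation, while a second consecutive spike pushes it past 500 ms and removes it. The gap between the two thresholds is what prevents flapping once lag recovers.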
Lag Calculation Methods
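As a worked example of the WAL-based estimate, take the async standby from the `/lag/status` sample: 2000 bytes behind a primary writing 50000 bytes per second gives 2000 / 50000 = 0.04 s, or 40 ms. The sample's time-based figure for the same node is 150 ms, so a hybrid policy (take the larger) would report 150 ms. The sketch below reproduces that arithmetic. The function names are hypothetical, and it uses integer math to sidestep float rounding.

```rust
use std::time::Duration;

/// Estimate time lag from byte lag and the primary's WAL generation rate.
/// Returns None when the rate is zero, since no estimate is possible.
pub fn wal_lag_estimate(lag_bytes: u64, wal_bytes_per_sec: u64) -> Option<Duration> {
    if wal_bytes_per_sec == 0 {
        return None;
    }
    // Work in microseconds, widening to u128 so the multiplication cannot overflow.
    let micros = u128::from(lag_bytes) * 1_000_000 / u128::from(wal_bytes_per_sec);
    let clamped = micros.min(u128::from(u64::MAX)) as u64;
    Some(Duration::from_micros(clamped))
}

/// Hybrid policy: the more pessimistic of the two estimates.
/// Falls back to the time-based lag when no WAL estimate is available.
pub fn hybrid_lag(wal_estimate: Option<Duration>, time_lag: Duration) -> Duration {
    wal_estimate.map_or(time_lag, |wal| wal.max(time_lag))
}
```

Note that the WAL-based estimate depends on the current write rate: on an idle primary the rate approaches zero and the estimate becomes unreliable, which is one reason to combine it with the time-based figure.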
pub enum LagCalculation {
    /// WAL-based (LSN difference)
    Wal {
        bytes_per_second: u64, // For time estimation
    },

    /// Time-based (last transaction timestamp)
    Time,

    /// Hybrid (use both, take max)
    Hybrid {
        bytes_per_second: u64, // WAL rate for the WAL-based half
    },
}

impl LagMonitor {
    fn calculate_lag(&self, method: LagCalculation, info: &RawLagInfo) -> Duration {
        match method {
            LagCalculation::Wal { bytes_per_second } => {
                // Guard against a zero rate: the division would yield infinity
                // or NaN, and Duration::from_secs_f64 panics on both.
                Duration::from_secs_f64(
                    info.lag_bytes as f64 / bytes_per_second.max(1) as f64
                )
            }
            LagCalculation::Time => {
                info.last_transaction_lag
            }
            LagCalculation::Hybrid { bytes_per_second } => {
                let wal_lag = self.calculate_lag(LagCalculation::Wal { bytes_per_second }, info);
                let time_lag = info.last_transaction_lag;
                wal_lag.max(time_lag)
            }
        }
    }
}

Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| Lag polling latency | <10ms | p99 |
| Routing decision | <100μs | p99 (with lag lookup) |
| Lag data freshness | <200ms | staleness |
| RYW lookup | <10μs | p99 |
Related Features
- Query Routing Hints - Manual routing control
- Circuit Breaker - Handle lagging nodes
- Query Analytics - Analyze lag patterns