Skip to content

Feature 04: Replica Lag-Aware Routing

Feature 04: Replica Lag-Aware Routing

Priority: High | Complexity: Medium | Phase: 1 (Foundation)


Overview

Problem Statement

In read-scaling architectures, replicas can lag behind the primary for several reasons:

  • Network latency between nodes
  • Disk I/O bottlenecks on replicas
  • Heavy read workloads delaying WAL apply

Routing reads to a lagging replica causes:

  • Stale reads (user doesn’t see their recent write)
  • Inconsistent results (different replicas return different data)
  • Application bugs (assuming freshness that doesn’t exist)

Solution

Implement lag-aware routing that:

  1. Continuously monitors replication lag on all standbys
  2. Routes queries based on freshness requirements
  3. Provides “read-your-writes” guarantees
  4. Adapts routing in real-time as lag changes
                  ┌─────────────────────────────────────────────────┐
                  │                LAG-AWARE ROUTER                 │
                  │                                                 │
Query ───────────►│  ┌──────────────────────────────────────────┐   │
+ LSN hint        │  │ 1. Extract freshness requirement         │   │
                  │  │    - Max acceptable lag                  │   │
                  │  │    - Required LSN (read-your-writes)     │   │
                  │  └──────────────────────────────────────────┘   │
                  │                        │                        │
                  │                        ▼                        │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │ 2. Query Lag Monitor                     │   │
                  │  │    Primary LSN: 1000                     │   │
                  │  │    Standby-1: 998 (200ms lag)            │   │
                  │  │    Standby-2: 990 (1s lag)               │   │
                  │  │    Standby-3: 950 (5s lag)               │   │
                  │  └──────────────────────────────────────────┘   │
                  │                        │                        │
                  │                        ▼                        │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │ 3. Filter by Freshness                   │   │
                  │  │    Requirement: lag < 500ms              │   │
                  │  │    Eligible: [Standby-1]                 │   │
                  │  └──────────────────────────────────────────┘   │
                  └────────────────────────┬────────────────────────┘
                                           │
                                           ▼
                                   ┌──────────────┐
                                   │  Standby-1   │
                                   │  (freshest)  │
                                   └──────────────┘

Architecture

Lag Monitor

/// Shared state for replication-lag monitoring.
///
/// Holds the latest lag snapshot per node together with the primary's WAL
/// position and the connections used to poll each node.
pub struct LagMonitor {
    /// Most recent lag snapshot per node, keyed by node id.
    node_lags: DashMap<NodeId, LagInfo>,

    /// Last observed WAL position (LSN) of the primary.
    primary_lsn: AtomicU64,

    /// Pause between two consecutive polling rounds.
    poll_interval: Duration,

    /// One connection per node, used to run the lag queries.
    connections: DashMap<NodeId, Connection>,
}
/// A point-in-time lag snapshot for a single node.
#[derive(Debug, Clone)]
pub struct LagInfo {
    /// LSN this node had reached when the snapshot was taken.
    pub current_lsn: u64,

    /// Distance behind the primary, in LSN units (bytes).
    pub lag_bytes: u64,

    /// Estimated distance behind the primary, expressed as time.
    pub lag_time: Duration,

    /// When this snapshot was recorded.
    pub updated_at: Instant,

    /// Direction the lag is moving in (improving, stable, degrading).
    pub trend: LagTrend,

    /// Replication sync mode of the node.
    pub sync_mode: SyncMode,
}
/// Direction in which a node's replication lag is moving between two samples.
///
/// `PartialEq`/`Eq` are derived so that trends can be compared directly
/// (for example in routing policies and tests).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LagTrend {
    /// Lag is smaller than in the previous sample.
    Improving,
    /// Lag is unchanged, or there is no previous sample to compare with.
    Stable,
    /// Lag is larger than in the previous sample.
    Degrading,
}
impl LagMonitor {
    /// Runs the lag-monitoring loop.
    ///
    /// Each round refreshes the primary LSN watermark, then every standby's
    /// lag snapshot, then sleeps for `poll_interval`. The loop has no exit
    /// condition, so this future never completes.
    pub async fn start(&self) {
        loop {
            self.update_primary_lsn().await;
            self.update_all_standby_lags().await;
            tokio::time::sleep(self.poll_interval).await;
        }
    }

    /// Polls the primary for its current WAL position and stores it in
    /// `primary_lsn`.
    ///
    /// A failed or undecodable poll keeps the previous watermark instead of
    /// panicking, so one transient error cannot kill the monitoring loop.
    async fn update_primary_lsn(&self) {
        // NOTE(review): the DashMap guard is held across the `.await` below;
        // confirm nothing mutates `connections` concurrently.
        if let Some(conn) = self.connections.get("primary") {
            // `pg_lsn` has no direct cast to `bigint`; subtracting '0/0' yields
            // the byte offset as `numeric`, which does cast.
            let polled = conn
                .query_one(
                    "SELECT (pg_current_wal_lsn() - '0/0'::pg_lsn)::bigint",
                    &[],
                )
                .await;
            let lsn = polled
                .ok()
                .and_then(|row| row.try_get::<_, i64>(0).ok())
                // A byte offset is never negative; reject anything else.
                .filter(|lsn| *lsn >= 0)
                .map(|lsn| lsn as u64);
            if let Some(lsn) = lsn {
                self.primary_lsn.store(lsn, Ordering::SeqCst);
            }
        }
    }

    /// Polls every non-primary node and replaces its entry in `node_lags`.
    ///
    /// When a node cannot be polled (query error, NULL or negative replay
    /// LSN) its entry is removed, so a node that stopped answering is not
    /// kept "fresh" forever by its last successful sample.
    async fn update_all_standby_lags(&self) {
        for entry in self.connections.iter() {
            let node_id = entry.key();
            if node_id.as_ref() == "primary" {
                continue;
            }

            let polled = entry
                .value()
                .query_one(
                    "SELECT (pg_last_wal_replay_lsn() - '0/0'::pg_lsn)::bigint,
                            EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::float8",
                    &[],
                )
                .await;

            let sample = polled.ok().and_then(|row| {
                let replay_lsn = row.try_get::<_, i64>(0).ok()?;
                // NULL until the standby has replayed its first transaction.
                let lag_secs = row.try_get::<_, Option<f64>>(1).ok()?;
                Some((replay_lsn, lag_secs))
            });
            let (replay_lsn, lag_secs) = match sample {
                Some((lsn, secs)) if lsn >= 0 => (lsn as u64, secs),
                _ => {
                    self.node_lags.remove(node_id);
                    continue;
                }
            };

            let primary_lsn = self.primary_lsn.load(Ordering::SeqCst);
            let lag_bytes = primary_lsn.saturating_sub(replay_lsn);

            // `Duration::from_secs_f64` panics on negative or non-finite input.
            // The replay timestamp is generated on the primary, so clock skew
            // can make the difference negative: clamp that to zero. An
            // unknown value is treated as "infinitely stale".
            let lag_time = match lag_secs {
                Some(secs) if secs.is_finite() && secs > 0.0 => Duration::from_secs_f64(secs),
                Some(secs) if secs.is_finite() => Duration::ZERO,
                _ => Duration::MAX,
            };

            let previous = self.node_lags.get(node_id).map(|l| l.lag_bytes);
            let trend = match previous {
                Some(prev) if lag_bytes < prev => LagTrend::Improving,
                Some(prev) if lag_bytes > prev => LagTrend::Degrading,
                _ => LagTrend::Stable,
            };

            self.node_lags.insert(
                node_id.clone(),
                LagInfo {
                    current_lsn: replay_lsn,
                    lag_bytes,
                    lag_time,
                    updated_at: Instant::now(),
                    trend,
                    sync_mode: self.get_sync_mode(node_id),
                },
            );
        }
    }

    /// Returns the ids of all nodes whose recorded `lag_time` is at most
    /// `max_lag`.
    pub fn get_fresh_nodes(&self, max_lag: Duration) -> Vec<NodeId> {
        self.node_lags
            .iter()
            .filter(|entry| entry.value().lag_time <= max_lag)
            .map(|entry| entry.key().clone())
            .collect()
    }

    /// Returns `true` when `node_id` has a recorded LSN of at least
    /// `required_lsn`; `false` when it is behind or has no recorded snapshot.
    pub fn has_reached_lsn(&self, node_id: &str, required_lsn: u64) -> bool {
        self.node_lags
            .get(node_id)
            .map(|info| info.current_lsn >= required_lsn)
            .unwrap_or(false)
    }
}

Lag-Aware Router

/// Router that picks a target node using replication-lag information.
pub struct LagAwareRouter {
    /// Source of per-node lag snapshots.
    lag_monitor: Arc<LagMonitor>,

    /// Registry of the nodes that can be routed to.
    nodes: Arc<NodeRegistry>,

    /// Thresholds and feature switches for lag-aware routing.
    config: LagRoutingConfig,
}
/// Tunables for lag-aware routing.
pub struct LagRoutingConfig {
    /// Lag limit applied to reads that carry no explicit requirement.
    pub default_max_lag: Duration,

    /// Lag limit for reads classified as "fresh".
    pub fresh_threshold: Duration,

    /// Lag beyond which, if every standby exceeds it, reads go to the primary.
    pub fallback_threshold: Duration,

    /// Whether read-your-writes tracking is enabled.
    pub read_your_writes: bool,
}
impl LagAwareRouter {
    /// Chooses the node that should serve `query` for `session`.
    ///
    /// Order of checks:
    /// 1. Derive the lag limit from the query, falling back to
    ///    `config.default_max_lag`.
    /// 2. If read-your-writes is enabled and the session has a recorded write
    ///    LSN, route by that LSN instead.
    /// 3. Otherwise pick the best standby within the lag limit, or the
    ///    primary when no standby qualifies.
    ///
    /// NOTE(review): the read-your-writes path does not apply the lag limit
    /// from step 1; confirm this is intended.
    pub fn route(&self, query: &str, session: &Session) -> RoutingDecision {
        // 1. Determine freshness requirement
        let max_lag = self
            .extract_lag_requirement(query)
            .unwrap_or(self.config.default_max_lag);

        // 2. Read-your-writes applies only when the feature is switched on.
        if self.config.read_your_writes {
            if let Some(required_lsn) = session.last_write_lsn {
                return self.route_with_lsn_requirement(required_lsn);
            }
        }

        // 3. The monitor reports node ids; resolve them to registry nodes so
        //    the candidates have the type `select_best_node` expects.
        let fresh_ids = self.lag_monitor.get_fresh_nodes(max_lag);
        let eligible: Vec<_> = self
            .nodes
            .standbys()
            .filter(|n| fresh_ids.contains(&n.id))
            .collect();
        if eligible.is_empty() {
            // All standbys too laggy, route to primary
            return RoutingDecision::primary("All standbys exceed lag threshold");
        }

        // 4. Select best node (lowest lag, then load balance)
        let best = self.select_best_node(&eligible);
        RoutingDecision::standby(best, "Lag-aware selection")
    }

    /// Routes to a standby that has replayed at least `required_lsn`, or to
    /// the primary when no standby has caught up yet.
    fn route_with_lsn_requirement(&self, required_lsn: u64) -> RoutingDecision {
        let eligible: Vec<_> = self
            .nodes
            .standbys()
            .filter(|n| self.lag_monitor.has_reached_lsn(&n.id, required_lsn))
            .collect();
        if eligible.is_empty() {
            // No standby caught up yet, route to primary
            return RoutingDecision::primary("Read-your-writes: no standby caught up");
        }
        let best = self.select_best_node(&eligible);
        RoutingDecision::standby(best, "Read-your-writes satisfied")
    }

    /// Returns the candidate with the lowest lag time, breaking ties by the
    /// lowest current connection count. Nodes without a lag snapshot sort last.
    ///
    /// # Panics
    ///
    /// Panics if `eligible` is empty; both callers check for that first.
    fn select_best_node(&self, eligible: &[Node]) -> Node {
        eligible
            .iter()
            .min_by_key(|n| {
                let lag = self
                    .lag_monitor
                    .get_lag(&n.id)
                    .map(|l| l.lag_time)
                    .unwrap_or(Duration::MAX);
                (lag, n.current_connections())
            })
            .expect("select_best_node requires at least one eligible node")
            .clone()
    }
}

Read-Your-Writes Tracker

pub struct ReadYourWritesTracker {
/// Session -> last write LSN
session_lsns: DashMap<SessionId, u64>,
/// LSN retention time
retention: Duration,
}
impl ReadYourWritesTracker {
/// Record that session wrote at this LSN
pub fn record_write(&self, session_id: &str, lsn: u64) {
self.session_lsns.insert(session_id.to_string(), lsn);
}
/// Get required LSN for read-your-writes
pub fn get_required_lsn(&self, session_id: &str) -> Option<u64> {
self.session_lsns.get(session_id).map(|v| *v)
}
/// Clear LSN requirement (after successful read)
pub fn clear(&self, session_id: &str) {
self.session_lsns.remove(session_id);
}
}

API Specification

Configuration (heliosproxy.toml)

[lag_routing]
enabled = true
# Lag monitoring
poll_interval = "100ms"
lag_calculation = "wal" # "wal", "time", "hybrid"
# Freshness thresholds
default_max_lag = "1s"
fresh_threshold = "100ms"
stale_threshold = "10s" # Mark node unhealthy
# Fallback behavior
fallback_to_primary = true
fallback_threshold = "5s"
# Read-your-writes
read_your_writes = true
ryw_retention = "5m"
# Per-sync-mode limits
[lag_routing.sync_modes]
sync.max_lag = "0ms"
semisync.max_lag = "500ms"
async.max_lag = "10s"

SQL Hints

-- Require fresh data (max 100ms lag)
/*helios:lag=100ms*/
SELECT * FROM users WHERE id = $1;
-- Allow stale data (analytics query)
/*helios:lag=5s*/
SELECT COUNT(*) FROM events WHERE date > $1;
-- Read-your-writes (ensure seeing own writes)
/*helios:ryw=true*/
SELECT * FROM orders WHERE user_id = $1;
-- Require specific LSN (advanced)
/*helios:lsn=1234567*/
SELECT * FROM inventory WHERE product_id = $1;

Admin API

GET /lag/status
{
"primary": {
"current_lsn": 1000000,
"wal_rate_bytes_sec": 50000
},
"standbys": [
{
"node": "standby-sync-1",
"sync_mode": "sync",
"current_lsn": 999990,
"lag_bytes": 10,
"lag_time_ms": 5,
"trend": "stable",
"healthy": true
},
{
"node": "standby-async-1",
"sync_mode": "async",
"current_lsn": 998000,
"lag_bytes": 2000,
"lag_time_ms": 150,
"trend": "improving",
"healthy": true
}
]
}
GET /lag/history?node=standby-async-1&duration=1h
# Historical lag data for graphing
{
"data_points": [
{"timestamp": "...", "lag_ms": 150},
{"timestamp": "...", "lag_ms": 160},
...
]
}
POST /lag/config
# Update lag thresholds dynamically
{ "default_max_lag": "2s" }

AI/Agent Innovations

1. Context-Aware Lag Tolerance

AI operations have different freshness needs:

/// Maps agent tools to the replication lag each one can tolerate.
pub struct AgentLagPolicy {
    /// Tool-specific lag tolerances; entries here override the built-in
    /// defaults in [`AgentLagPolicy::get_lag_for_tool`].
    tool_lags: HashMap<String, Duration>,
}

impl AgentLagPolicy {
    /// Returns the maximum acceptable lag for `tool`.
    ///
    /// A tolerance configured in `tool_lags` wins. Otherwise the built-in
    /// default for the tool is used, and unknown tools get one second.
    pub fn get_lag_for_tool(&self, tool: &str) -> Duration {
        if let Some(configured) = self.tool_lags.get(tool) {
            return *configured;
        }
        match tool {
            // Knowledge retrieval can be eventually consistent
            "knowledge_search" => Duration::from_secs(60),
            // User data needs to be fresh
            "user_lookup" => Duration::from_millis(100),
            // Conversation context needs to be current
            "conversation_history" => Duration::from_millis(0),
            // Default
            _ => Duration::from_secs(1),
        }
    }
}

2. RAG Freshness Windows

Different stages of RAG have different requirements:

-- Embedding retrieval (can use older index)
/*helios:lag=5m,rag_stage=retrieval*/
SELECT chunk_id FROM embeddings
ORDER BY vector <-> $1
LIMIT 100;
-- Document fetch (needs recent content)
/*helios:lag=1s,rag_stage=fetch*/
SELECT content FROM documents
WHERE id = ANY($1);
-- Reranking metadata (needs fresh metadata)
/*helios:lag=100ms,rag_stage=rerank*/
SELECT relevance_score FROM document_scores
WHERE doc_id = ANY($1);

3. Agentic Workflow Consistency

Multi-step workflows need coordinated consistency:

/// LSN bookkeeping for one multi-step workflow.
pub struct WorkflowConsistency {
    /// LSN captured when the workflow began.
    start_lsn: u64,

    /// Lowest LSN every read in the workflow must be able to see.
    consistency_point: u64,
}
impl WorkflowConsistency {
    /// Starts a workflow: captures the current LSN as both the start LSN and
    /// the initial consistency point.
    pub fn begin_workflow(&mut self) {
        let current = self.get_current_lsn();
        self.start_lsn = current;
        self.consistency_point = current;
    }

    /// Raises the consistency point to `write_lsn` when that is higher than
    /// the current value; lower or equal values leave it untouched.
    pub fn record_write(&mut self, write_lsn: u64) {
        if write_lsn > self.consistency_point {
            self.consistency_point = write_lsn;
        }
    }

    /// Returns the LSN that reads in this workflow must have reached.
    pub fn get_read_lsn_requirement(&self) -> u64 {
        self.consistency_point
    }
}

4. LLM Caching with Lag Awareness

Cache results with lag metadata:

/// Query cache whose hits are validated against a freshness budget.
pub struct LagAwareCache {
    /// Underlying store of cached query results.
    cache: QueryCache,

    /// Shared handle to the lag monitor.
    lag_monitor: Arc<LagMonitor>,
}
impl LagAwareCache {
    /// Returns the cached result for `query` if it is still fresh enough.
    ///
    /// The effective staleness of a hit is its age in the cache plus the lag
    /// reported by `get_source_lag` for the node it was read from. The hit is
    /// returned only when that sum is at most `max_lag`; otherwise, or on a
    /// cache miss, `None` is returned.
    pub fn get(&self, query: &str, max_lag: Duration) -> Option<CachedResult> {
        let result = self.cache.get(query)?;
        let cache_age = result.cached_at.elapsed();
        // Saturate instead of `+`: `Duration` addition panics on overflow,
        // which a `Duration::MAX` "unknown lag" value would trigger.
        let effective_lag = cache_age.saturating_add(self.get_source_lag(&result.source_node));
        if effective_lag <= max_lag {
            Some(result)
        } else {
            None
        }
    }
}

HeliosDB-Lite Integration

1. WAL-Based Lag Calculation

Use HeliosDB-Lite’s replication LSN:

impl LagMonitor {
async fn get_lag_from_helios(&self, node_id: &str) -> Result<LagInfo> {
let conn = self.get_connection(node_id)?;
// Query HeliosDB-Lite specific replication status
let row = conn.query_one(
"SELECT * FROM helios_stat_replication WHERE node_id = $1",
&[node_id]
).await?;
Ok(LagInfo {
current_lsn: row.get("replay_lsn"),
lag_bytes: row.get("lag_bytes"),
lag_time: Duration::from_millis(row.get("lag_ms")),
sync_mode: row.get::<_, String>("sync_mode").parse()?,
..Default::default()
})
}
}

2. Sync Mode Integration

Respect HeliosDB-Lite sync mode guarantees:

impl LagAwareRouter {
    /// Selects candidate nodes for a read by combining sync mode with the
    /// requested freshness.
    ///
    /// - Zero freshness: only `Sync` nodes.
    /// - Freshness within `config.semisync_max_lag`: `Sync` and `SemiSync`
    ///   nodes.
    /// - Anything looser: every node the lag monitor currently reports as
    ///   within `required_freshness`.
    ///
    /// NOTE(review): `semisync_max_lag` is read from `self.config` but is not
    /// among the `LagRoutingConfig` fields shown in this document; confirm
    /// where it is declared.
    fn route_by_sync_mode(&self, required_freshness: Duration) -> Vec<Node> {
        // Sync standbys: guaranteed zero lag
        if required_freshness == Duration::ZERO {
            return self.nodes.filter(|n| n.sync_mode == SyncMode::Sync);
        }

        // Semi-sync: bounded lag (configurable)
        if required_freshness <= self.config.semisync_max_lag {
            return self.nodes.filter(|n| {
                n.sync_mode == SyncMode::Sync || n.sync_mode == SyncMode::SemiSync
            });
        }

        // Async: eventual consistency. The monitor reports node ids, so
        // resolve them to nodes to match the declared return type.
        let fresh_ids = self.lag_monitor.get_fresh_nodes(required_freshness);
        self.nodes.filter(|n| fresh_ids.contains(&n.id))
    }
}

3. TWR Lag Considerations

Transparent Write Routing adds latency:

impl LagAwareRouter {
    /// Returns the lag to assume for `node`.
    ///
    /// Nodes without TWR support report their own `lag_time`. For TWR-capable
    /// nodes the estimated TWR latency is added on top, because TWR writes
    /// travel through the standby to the primary.
    fn adjust_for_twr(&self, node: &Node) -> Duration {
        if !node.supports_twr() {
            return node.lag_time;
        }
        let twr_latency = self.estimate_twr_latency(node);
        node.lag_time + twr_latency
    }
}

4. Branch Replication Lag

Track lag per branch for branch-aware routing:

/// Lag bookkeeping at branch granularity.
pub struct BranchLagMonitor {
    /// Lag snapshot for each (node, branch) combination.
    branch_lags: DashMap<(NodeId, BranchName), LagInfo>,
}
impl BranchLagMonitor {
    /// Returns a copy of the lag snapshot recorded for `branch` on `node`,
    /// or `None` when that pair has no entry.
    pub fn get_branch_lag(&self, node: &str, branch: &str) -> Option<LagInfo> {
        let key = (node.to_string(), branch.to_string());
        let entry = self.branch_lags.get(&key)?;
        Some(entry.value().clone())
    }

    /// Returns the ids of all nodes whose recorded lag for `branch` is at
    /// most `max_lag`.
    pub fn route_for_branch(&self, branch: &str, max_lag: Duration) -> Vec<NodeId> {
        let mut matching = Vec::new();
        for entry in self.branch_lags.iter() {
            let (node_id, branch_name) = entry.key();
            if *branch_name == branch && entry.value().lag_time <= max_lag {
                matching.push(node_id.clone());
            }
        }
        matching
    }
}

Implementation Notes

File Locations

src/proxy/
├── lag/
│ ├── mod.rs # Public API
│ ├── monitor.rs # LagMonitor implementation
│ ├── router.rs # LagAwareRouter
│ ├── ryw.rs # Read-your-writes tracker
│ └── metrics.rs # Lag metrics (Prometheus)

Key Considerations

  1. Polling vs Push: Use WAL streaming notifications when available, fall back to polling.

  2. Clock Skew: Use LSN-based lag primarily. Time-based lag requires synchronized clocks.

  3. Lag Spikes: Implement smoothing/averaging to avoid oscillating routing decisions.

  4. Cascading Standbys: Track lag through replication chains.

  5. Metrics: Expose lag histograms for monitoring and alerting.

Lag Calculation Methods

pub enum LagCalculation {
/// WAL-based (LSN difference)
Wal {
bytes_per_second: u64, // For time estimation
},
/// Time-based (last transaction timestamp)
Time,
/// Hybrid (use both, take max)
Hybrid,
}
impl LagMonitor {
fn calculate_lag(&self, method: LagCalculation, info: &RawLagInfo) -> Duration {
match method {
LagCalculation::Wal { bytes_per_second } => {
Duration::from_secs_f64(
info.lag_bytes as f64 / bytes_per_second as f64
)
}
LagCalculation::Time => {
info.last_transaction_lag
}
LagCalculation::Hybrid => {
let wal_lag = self.calculate_lag(LagCalculation::Wal { .. }, info);
let time_lag = info.last_transaction_lag;
wal_lag.max(time_lag)
}
}
}
}

Performance Targets

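| Metric              | Target | Measurement           |
|---------------------|--------|-----------------------|
| Lag polling latency | <10ms  | p99                   |
| Routing decision    | <100μs | p99 (with lag lookup) |
| Lag data freshness  | <200ms | staleness             |
| RYW lookup          | <10μs  | p99                   |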