Feature 06: Circuit Breaker Pattern
Priority: High | Complexity: Medium | Phase: 2 (Resilience)
Overview
Problem Statement
When backend nodes fail or degrade and no circuit breakers are in place:
- Requests pile up waiting for timeouts
- Connection pools are exhausted by slow queries
- Failures cascade across the system
- Recovery is slow (all clients retry simultaneously)
Common failure modes:
- Node unresponsive (network partition)
- Node overloaded (slow responses)
- Node returning errors (disk full, OOM)
- Intermittent failures (flapping)
Solution
Implement the circuit breaker pattern at the proxy layer:
```
                 STATE MACHINE

    ┌──────────┐
    │  CLOSED  │◄─────────────────────────┐
    └────┬─────┘                          │
         │ failures >= threshold          │ probes succeed
         ▼                                │
    ┌──────────┐   after cooldown   ┌─────┴─────┐
    │   OPEN   │───────────────────►│ HALF-OPEN │
    └──────────┘                    └─────┬─────┘
         ▲                                │
         │          probe fails           │
         └────────────────────────────────┘
```
- CLOSED: All requests pass through; failures are counted.
- OPEN: All requests fail fast; no backend calls are made.
- HALF-OPEN: A limited number of probe requests test whether the node has recovered.

Architecture
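As a reading aid, the state machine above can be modelled in a few lines of single-threaded Rust. This is a sketch, not the proxy's implementation: `MiniBreaker` and `State` are hypothetical names, time is passed in explicitly so every transition is deterministic, a plain count of failures since the circuit last closed stands in for the rolling window, and concurrent-probe limits are left out.

```rust
use std::time::Duration;

/// The three states from the diagram.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    Closed,
    Open,
    HalfOpen,
}

/// Single-threaded model of the breaker; `now` is supplied by the caller.
pub struct MiniBreaker {
    state: State,
    failures: u32,
    successes: u32,
    opened_at: Duration,
    failure_threshold: u32,
    success_threshold: u32,
    cooldown: Duration,
}

impl MiniBreaker {
    pub fn new(failure_threshold: u32, success_threshold: u32, cooldown: Duration) -> Self {
        Self {
            state: State::Closed,
            failures: 0,
            successes: 0,
            opened_at: Duration::ZERO,
            failure_threshold,
            success_threshold,
            cooldown,
        }
    }

    pub fn state(&self) -> State {
        self.state
    }

    /// May a request be sent at `now`? OPEN turns into HALF-OPEN once the
    /// cooldown has elapsed, and that request becomes the probe.
    pub fn allow(&mut self, now: Duration) -> bool {
        match self.state {
            State::Closed | State::HalfOpen => true,
            State::Open if now >= self.opened_at + self.cooldown => {
                self.state = State::HalfOpen;
                self.successes = 0;
                true
            }
            State::Open => false,
        }
    }

    /// Enough successful probes close the circuit again.
    pub fn on_success(&mut self) {
        if self.state == State::HalfOpen {
            self.successes += 1;
            if self.successes >= self.success_threshold {
                self.state = State::Closed;
                self.failures = 0;
            }
        }
    }

    /// Reaching the threshold while CLOSED, or any failure while HALF-OPEN, opens it.
    pub fn on_failure(&mut self, now: Duration) {
        match self.state {
            State::Closed => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.trip(now);
                }
            }
            State::HalfOpen => self.trip(now),
            State::Open => {}
        }
    }

    fn trip(&mut self, now: Duration) {
        self.state = State::Open;
        self.opened_at = now;
    }
}
```

For example, with a threshold of 3 and a 10 s cooldown, three failures open the circuit, a request at 5 s is rejected, and the first request at or after 10 s becomes a half-open probe.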
Circuit Breaker Core
```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8};
use std::time::Duration;

pub struct CircuitBreaker {
    /// Current state, stored as `CircuitState as u8`
    state: AtomicU8,
    /// Failure counter (rolling window)
    failure_counter: SlidingWindowCounter,
    /// Success counter (for half-open validation)
    success_counter: AtomicU32,
    /// Time when the circuit opened, in nanoseconds
    opened_at: AtomicU64,
    /// Configuration
    config: CircuitBreakerConfig,
    /// Node this circuit protects
    node_id: String,
    /// Event listeners (shared across request threads, hence `Send + Sync`)
    listeners: Vec<Box<dyn CircuitBreakerListener + Send + Sync>>,
}

#[derive(Clone)]
pub struct CircuitBreakerConfig {
    /// Failures within `failure_window` that open the circuit
    pub failure_threshold: u32,
    /// Time window for counting failures
    pub failure_window: Duration,
    /// Time to wait in OPEN before trying HALF-OPEN
    pub cooldown: Duration,
    /// Successful probes needed to close
    pub half_open_success_threshold: u32,
    /// Max concurrent probes in HALF-OPEN
    pub half_open_max_probes: u32,
    /// What counts as a failure
    pub failure_conditions: FailureConditions,
}

#[derive(Clone)]
pub struct FailureConditions {
    /// Timeout threshold
    pub timeout: Duration,
    /// SQLSTATE error codes that count as failures
    pub error_codes: HashSet<String>,
    /// Response-time threshold (slow = failure)
    pub slow_threshold: Option<Duration>,
    /// Ignore transient errors
    pub ignore_transient: bool,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum CircuitState {
    Closed = 0,
    Open = 1,
    HalfOpen = 2,
}
```

Circuit Breaker Implementation
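The `failure_counter` field above is a `SlidingWindowCounter`, which the file layout places in `sliding_window.rs` but the spec does not show. A minimal sketch, assuming a mutex-protected queue of event timestamps: the `increment_at` and `count_at` methods take an explicit timestamp for testability and are hypothetical, unlike the spec's `increment()`. Because the queue grows with the failure rate, a production version would more likely use a fixed number of time buckets to stay within the per-circuit memory target.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::Duration;

/// Counts events in the trailing `window`. Timestamps are offsets from an
/// arbitrary epoch chosen by the caller and must be non-decreasing.
pub struct SlidingWindowCounter {
    window: Duration,
    events: Mutex<VecDeque<Duration>>,
}

impl SlidingWindowCounter {
    pub fn new(window: Duration) -> Self {
        Self { window, events: Mutex::new(VecDeque::new()) }
    }

    /// Records one event at `now`; returns how many events fall in the window.
    pub fn increment_at(&self, now: Duration) -> u32 {
        let mut events = self.events.lock().unwrap();
        events.push_back(now);
        Self::evict(&mut events, now, self.window);
        events.len() as u32
    }

    /// Number of events in the window ending at `now`.
    pub fn count_at(&self, now: Duration) -> u32 {
        let mut events = self.events.lock().unwrap();
        Self::evict(&mut events, now, self.window);
        events.len() as u32
    }

    pub fn reset(&self) {
        self.events.lock().unwrap().clear();
    }

    /// Drops events that are at least `window` old; the oldest sit at the front.
    fn evict(events: &mut VecDeque<Duration>, now: Duration, window: Duration) {
        while let Some(&oldest) = events.front() {
            if now.saturating_sub(oldest) >= window {
                events.pop_front();
            } else {
                break;
            }
        }
    }
}
```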
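```rust
use std::sync::atomic::Ordering;
use std::time::Duration;

impl CircuitBreaker {
    /// Decode the state byte written by the transition methods.
    pub fn get_state(&self) -> CircuitState {
        match self.state.load(Ordering::SeqCst) {
            0 => CircuitState::Closed,
            1 => CircuitState::Open,
            _ => CircuitState::HalfOpen,
        }
    }

    pub fn allow_request(&self) -> Result<RequestGuard, CircuitOpen> {
        match self.get_state() {
            CircuitState::Closed => Ok(RequestGuard::new(self)),
            CircuitState::Open => {
                // Compare-and-swap so exactly one caller moves OPEN -> HALF-OPEN
                // and sends the first probe; the others keep failing fast.
                if self.should_try_half_open()
                    && self
                        .state
                        .compare_exchange(
                            CircuitState::Open as u8,
                            CircuitState::HalfOpen as u8,
                            Ordering::SeqCst,
                            Ordering::SeqCst,
                        )
                        .is_ok()
                {
                    self.transition_to_half_open();
                    Ok(RequestGuard::new_probe(self))
                } else {
                    Err(CircuitOpen {
                        retry_after: self.time_until_half_open(),
                        node_id: self.node_id.clone(),
                    })
                }
            }
            CircuitState::HalfOpen => {
                if self.can_probe() {
                    Ok(RequestGuard::new_probe(self))
                } else {
                    Err(CircuitOpen {
                        retry_after: Duration::from_millis(100),
                        node_id: self.node_id.clone(),
                    })
                }
            }
        }
    }
}
```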
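`allow_request` relies on `can_probe()` to enforce `half_open_max_probes`, but that helper is not shown. One way to do it, sketched under the assumption that in-flight probes are tracked with an atomic counter: `ProbeLimiter` and `ProbePermit` are hypothetical names. Releasing the slot in `Drop` is a design choice, so a probe that errors out, or unwinds after a panic, still gives its slot back.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Caps the number of concurrent half-open probes at `max`.
pub struct ProbeLimiter {
    in_flight: AtomicU32,
    max: u32,
}

/// Proof that a probe slot is held; dropping it frees the slot, including
/// during unwinding after a panic.
pub struct ProbePermit<'a> {
    limiter: &'a ProbeLimiter,
}

impl ProbeLimiter {
    pub fn new(max: u32) -> Self {
        Self { in_flight: AtomicU32::new(0), max }
    }

    /// Atomically reserves a slot if fewer than `max` probes are in flight.
    pub fn try_acquire(&self) -> Option<ProbePermit<'_>> {
        self.in_flight
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                if n < self.max {
                    Some(n + 1)
                } else {
                    None
                }
            })
            .ok()
            .map(|_| ProbePermit { limiter: self })
    }

    pub fn in_flight(&self) -> u32 {
        self.in_flight.load(Ordering::Acquire)
    }
}

impl Drop for ProbePermit<'_> {
    fn drop(&mut self) {
        self.limiter.in_flight.fetch_sub(1, Ordering::AcqRel);
    }
}
```

A probe `RequestGuard` could hold such a permit for as long as the probe is in flight.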
```rust
impl CircuitBreaker {
    pub fn record_success(&self) {
        match self.get_state() {
            CircuitState::Closed => {
                // Nothing to reset: old failures age out of the sliding window.
                // Resetting here would let interleaved successes mask a high
                // failure rate (e.g. every other request failing).
            }
            CircuitState::HalfOpen => {
                let count = self.success_counter.fetch_add(1, Ordering::SeqCst) + 1;
                if count >= self.config.half_open_success_threshold {
                    self.transition_to_closed();
                }
            }
            CircuitState::Open => {
                // A request admitted before the circuit opened; ignore it.
            }
        }
    }

    pub fn record_failure(&self, error: &ProxyError) {
        if !self.is_failure(error) {
            return;
        }

        match self.get_state() {
            CircuitState::Closed => {
                let count = self.failure_counter.increment();
                if count >= self.config.failure_threshold {
                    self.transition_to_open();
                }
            }
            CircuitState::HalfOpen => {
                // Any failure in half-open goes back to open
                self.transition_to_open();
            }
            CircuitState::Open => {
                // Already open: restart the cooldown timer
                self.opened_at.store(now_nanos(), Ordering::SeqCst);
            }
        }
    }

    fn transition_to_open(&self) {
        // Stamp the open time and clear half-open progress *before* publishing
        // OPEN, so no reader pairs the new state with a stale timestamp.
        self.opened_at.store(now_nanos(), Ordering::SeqCst);
        self.success_counter.store(0, Ordering::SeqCst);
        let prev = self.state.swap(CircuitState::Open as u8, Ordering::SeqCst);
        if prev != CircuitState::Open as u8 {
            self.notify_listeners(CircuitEvent::Opened);
            warn!("Circuit breaker opened for node {}", self.node_id);
        }
    }

    /// Bookkeeping for OPEN -> HALF-OPEN. Callers should claim the transition
    /// with a compare-and-swap first so that only one of them runs this.
    fn transition_to_half_open(&self) {
        self.state.store(CircuitState::HalfOpen as u8, Ordering::SeqCst);
        self.success_counter.store(0, Ordering::SeqCst);
        self.notify_listeners(CircuitEvent::HalfOpened);
        info!("Circuit breaker half-open for node {}", self.node_id);
    }

    fn transition_to_closed(&self) {
        // CAS from HALF-OPEN so concurrent successful probes close (and notify) once
        let won = self
            .state
            .compare_exchange(
                CircuitState::HalfOpen as u8,
                CircuitState::Closed as u8,
                Ordering::SeqCst,
                Ordering::SeqCst,
            )
            .is_ok();
        if won {
            self.failure_counter.reset();
            self.notify_listeners(CircuitEvent::Closed);
            info!("Circuit breaker closed for node {}", self.node_id);
        }
    }
}
```

Circuit Breaker Manager
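The manager below keeps one breaker per node in a `DashMap`. A `DashMap` entry guard locks its shard, so holding it for the duration of a backend call stalls other requests that hash to the same shard; a common remedy is to store `Arc` values and clone one out before making the call. A std-only sketch of that pattern, assuming `RwLock<HashMap>` as a stand-in for `DashMap` (`Registry` and `get_or_create` are hypothetical names). A tuple key works the same way for the per-(node, branch) breakers described later.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, RwLock};

/// One lazily created value per key, handed out as `Arc` clones so callers
/// never hold the map lock while using the value (e.g. across a backend call).
pub struct Registry<K, V> {
    map: RwLock<HashMap<K, Arc<V>>>,
}

impl<K: Eq + Hash + Clone, V> Registry<K, V> {
    pub fn new() -> Self {
        Self { map: RwLock::new(HashMap::new()) }
    }

    pub fn get_or_create(&self, key: &K, make: impl FnOnce() -> V) -> Arc<V> {
        // Fast path: shared lock, released at the end of the `if let`
        if let Some(existing) = self.map.read().unwrap().get(key) {
            return Arc::clone(existing);
        }
        // Slow path: exclusive lock; `entry` keeps the value a racing
        // creator may have inserted in the meantime
        let mut map = self.map.write().unwrap();
        let value = map.entry(key.clone()).or_insert_with(|| Arc::new(make()));
        Arc::clone(value)
    }

    pub fn len(&self) -> usize {
        self.map.read().unwrap().len()
    }
}
```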
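```rust
use std::sync::Arc;

use dashmap::DashMap;

pub struct CircuitBreakerManager {
    /// Circuit breakers per node. Values are `Arc`s so a request can keep its
    /// breaker without holding a DashMap shard lock during the backend call.
    breakers: DashMap<NodeId, Arc<CircuitBreaker>>,
    /// Global configuration
    config: CircuitBreakerConfig,
    /// Metrics collector
    metrics: Arc<CircuitMetrics>,
}

impl CircuitBreakerManager {
    pub fn get_healthy_nodes(&self, nodes: &[Node]) -> Vec<Node> {
        nodes
            .iter()
            .filter(|n| {
                self.breakers
                    .get(&n.id)
                    // An OPEN node whose cooldown has elapsed stays eligible:
                    // the next request moves it to HALF-OPEN. Excluding it
                    // would mean it is never probed and never recovers.
                    .map(|b| b.get_state() != CircuitState::Open || b.should_try_half_open())
                    .unwrap_or(true)
            })
            .cloned()
            .collect()
    }

    pub fn wrap_request<F, T>(&self, node_id: &str, f: F) -> Result<T, ProxyError>
    where
        F: FnOnce() -> Result<T, ProxyError>,
    {
        // Clone the Arc out; the shard lock is released at the end of this statement
        let breaker: Arc<CircuitBreaker> = self
            .breakers
            .entry(node_id.to_string())
            .or_insert_with(|| Arc::new(CircuitBreaker::new(node_id, &self.config)))
            .clone();

        // `?` relies on `From<CircuitOpen> for ProxyError`
        let guard = breaker.allow_request()?;

        match f() {
            Ok(result) => {
                guard.success();
                Ok(result)
            }
            Err(e) => {
                guard.failure(&e);
                Err(e)
            }
        }
    }
}
```

API Specification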
Configuration (heliosproxy.toml)
```toml
[circuit_breaker]
enabled = true

# Failure thresholds
failure_threshold = 5        # Failures to trigger open
failure_window = "30s"       # Rolling window for counting

# Recovery settings
cooldown = "10s"             # Wait before half-open
half_open_success = 3        # Successes to close
half_open_max_probes = 2     # Concurrent probes

# What counts as failure
[circuit_breaker.failure_conditions]
timeout = "5s"
slow_threshold = "2s"                        # Slow responses count as failures
error_codes = ["08001", "57P01", "XX000"]    # Connection, shutdown, internal
ignore_transient = true                      # Don't count transient network errors

# Per-node overrides
[circuit_breaker.nodes.standby-async-1]
failure_threshold = 10       # More tolerant for async standby
cooldown = "30s"
```

Admin API
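Both the configuration above and the history endpoint below (`duration=24h`) write durations as strings. The spec does not define their grammar, so the following sketch assumes an unsigned integer followed by one of `ms`, `s`, `m` or `h`; `parse_duration` is a hypothetical helper.

```rust
use std::time::Duration;

/// Parses "500ms", "30s", "5m" or "24h". The unit set is an assumption:
/// the spec itself only shows seconds and hours.
pub fn parse_duration(input: &str) -> Result<Duration, String> {
    let s = input.trim();
    // Split at the first non-digit: "30s" -> ("30", "s")
    let split = s
        .find(|c: char| !c.is_ascii_digit())
        .ok_or_else(|| format!("missing unit in {input:?}"))?;
    let (digits, unit) = s.split_at(split);
    let value: u64 = digits
        .parse()
        .map_err(|_| format!("invalid number in {input:?}"))?;

    // Multiply with overflow checking for the larger units
    let secs = |per_unit: u64| {
        value
            .checked_mul(per_unit)
            .map(Duration::from_secs)
            .ok_or_else(|| format!("{input:?} is too large"))
    };

    match unit {
        "ms" => Ok(Duration::from_millis(value)),
        "s" => Ok(Duration::from_secs(value)),
        "m" => secs(60),
        "h" => secs(3600),
        _ => Err(format!("unknown unit {unit:?} in {input:?}")),
    }
}
```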
```
GET /circuit-breaker/status

{
  "breakers": [
    {
      "node": "primary",
      "state": "closed",
      "failure_count": 0,
      "last_failure": null,
      "opened_count": 5,
      "last_opened": "2026-01-20T10:30:00Z"
    },
    {
      "node": "standby-async-1",
      "state": "half_open",
      "failure_count": 5,
      "last_failure": "2026-01-25T10:29:55Z",
      "probes_sent": 2,
      "probes_success": 1
    }
  ]
}
```
```
POST /circuit-breaker/force-open
# Manually open circuit (for maintenance)
{ "node": "standby-sync-1" }
```

```
POST /circuit-breaker/force-close
# Manually close circuit
{ "node": "standby-sync-1" }
```

```
POST /circuit-breaker/reset
# Reset all counters
{ "node": "standby-async-1" }
```
```
GET /circuit-breaker/history?node=standby-async-1&duration=24h
# Historical state transitions
{
  "transitions": [
    { "timestamp": "...", "from": "closed", "to": "open", "reason": "5 failures in 30s" },
    { "timestamp": "...", "from": "open", "to": "half_open", "reason": "cooldown elapsed" },
    ...
  ]
}
```

Event Webhooks
```toml
[circuit_breaker.webhooks]
url = "https://alerts.example.com/webhook"
events = ["opened", "closed"]
secret = "${WEBHOOK_SECRET}"
```

Example payload:

```json
{
  "event": "circuit_opened",
  "node": "standby-sync-1",
  "timestamp": "2026-01-25T10:30:00Z",
  "failure_count": 5,
  "last_error": "connection timeout after 5s"
}
```

AI/Agent Innovations
1. Adaptive Failure Thresholds
Learn normal failure rates per workload:
```rust
pub struct AdaptiveCircuitBreaker {
    base_breaker: CircuitBreaker,
    /// Historical failure rates
    failure_history: RollingAverage,
    /// Standard deviation tracking
    std_dev: RollingStdDev,
}
```
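`RollingAverage` and `RollingStdDev` are not defined in this spec. A minimal sketch of the statistics that `compute_threshold` needs, assuming a fixed-size window of per-interval failure counts and the population standard deviation; `RollingFailureStats` is a hypothetical name, and `threshold` applies the mean-plus-k-sigma rule with a caller-chosen floor.

```rust
use std::collections::VecDeque;

/// Failure counts for the last `capacity` intervals (e.g. one entry per
/// `failure_window`), with mean and population standard deviation.
pub struct RollingFailureStats {
    window: VecDeque<f64>,
    capacity: usize,
}

impl RollingFailureStats {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self { window: VecDeque::with_capacity(capacity), capacity }
    }

    /// Records the failure count of one interval, evicting the oldest.
    pub fn push(&mut self, failures: u32) {
        if self.window.len() == self.capacity {
            self.window.pop_front();
        }
        self.window.push_back(f64::from(failures));
    }

    pub fn mean(&self) -> f64 {
        if self.window.is_empty() {
            return 0.0;
        }
        self.window.iter().sum::<f64>() / self.window.len() as f64
    }

    pub fn std_dev(&self) -> f64 {
        if self.window.is_empty() {
            return 0.0;
        }
        let mean = self.mean();
        let variance = self.window.iter().map(|x| (x - mean).powi(2)).sum::<f64>()
            / self.window.len() as f64;
        variance.sqrt()
    }

    /// `ceil(mean + k * std_dev)`, never below `floor`.
    pub fn threshold(&self, k: f64, floor: u32) -> u32 {
        let raw = (self.mean() + k * self.std_dev()).ceil();
        (raw as u32).max(floor)
    }
}
```

For the counts 2, 4, 4, 4, 5, 5, 7, 9 the mean is 5 and the standard deviation is 2, so with k = 3 the threshold is 11.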
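```rust
impl AdaptiveCircuitBreaker {
    pub fn compute_threshold(&self) -> u32 {
        let avg_failures = self.failure_history.average();
        let std_dev = self.std_dev.value();

        // Open circuit at 3 standard deviations above normal. Round up rather
        // than truncate, and never drop below the configured threshold: a
        // quiet history (avg = std_dev = 0) would otherwise give 0, and the
        // circuit would open on the very first failure.
        let floor = self.base_breaker.config.failure_threshold as f64;
        (avg_failures + 3.0 * std_dev).ceil().max(floor) as u32
    }
}
```

2. Workload-Aware Circuits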
Different circuits for different operation types:
```rust
pub struct WorkloadCircuitBreaker {
    /// Circuit for embedding queries
    embedding_circuit: CircuitBreaker,
    /// Circuit for transactional queries
    transactional_circuit: CircuitBreaker,
    /// Circuit for analytics queries
    analytics_circuit: CircuitBreaker,
}
```
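The spec does not say how `is_embedding_query` and `is_analytics_query` decide. Purely as an illustration, a keyword heuristic could look like this; `Workload` and `classify` are hypothetical, the pgvector-style distance operators (`<->`, `<=>`, `<#>`) are an assumption about the vector-search syntax, and a real proxy would more likely classify from the parsed statement or a client-supplied hint.

```rust
/// Workload classes matching the three circuits above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Workload {
    Embedding,
    Analytics,
    Transactional,
}

/// Illustrative keyword heuristics only; not a SQL parser.
pub fn classify(query: &str) -> Workload {
    let q = query.to_ascii_uppercase();
    let head = q.trim_start();

    // Vector distance operators (pgvector-style syntax assumed)
    if ["<->", "<=>", "<#>"].iter().any(|op| q.contains(*op)) {
        return Workload::Embedding;
    }

    // Read-only statements that aggregate or use window functions
    let is_read = head.starts_with("SELECT") || head.starts_with("WITH");
    let aggregates = ["GROUP BY", "OVER ("].iter().any(|kw| q.contains(*kw));
    if is_read && aggregates {
        Workload::Analytics
    } else {
        Workload::Transactional
    }
}
```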
```rust
impl WorkloadCircuitBreaker {
    pub fn get_circuit(&self, query: &str) -> &CircuitBreaker {
        if self.is_embedding_query(query) {
            &self.embedding_circuit
        } else if self.is_analytics_query(query) {
            &self.analytics_circuit
        } else {
            &self.transactional_circuit
        }
    }
}
```

3. Agent Retry Strategy
Provide retry guidance for AI agents:
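```rust
use std::time::Duration;

pub struct AgentRetryStrategy;

impl AgentRetryStrategy {
    /// Exponential backoff with jitter
    pub fn get_retry_delay(&self, attempt: u32) -> Duration {
        let base = Duration::from_millis(100);
        let max = Duration::from_secs(30);

        let delay = base * 2u32.pow(attempt.min(10));
        let jitter = rand::random::<f64>() * 0.3;

        // Cap, then jitter downward: the delay never exceeds `max`, and
        // retries that hit the cap are still spread out
        delay.min(max).mul_f64(1.0 - jitter)
    }
}
```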
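A deterministic variant of this backoff makes the schedule easy to check: the random draw is passed in as `unit` in [0, 1), and jitter only shortens the capped delay, so no retry waits longer than the cap. `retry_delay` and `delay_for_circuit_open` are hypothetical helpers; the second goes beyond the spec by honoring the `retry_after` hint carried by `CircuitOpen`, on the reasoning that retrying sooner would only fail fast again.

```rust
use std::time::Duration;

/// Exponential backoff (100 ms doubling per attempt, capped at 30 s) with
/// the random draw supplied as `unit` in [0, 1). Jitter shortens the capped
/// delay by up to 30%, so the result never exceeds the cap.
pub fn retry_delay(attempt: u32, unit: f64) -> Duration {
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(30);
    let delay = base * 2u32.pow(attempt.min(10));
    let jitter = unit.clamp(0.0, 1.0) * 0.3;
    delay.min(max).mul_f64(1.0 - jitter)
}

/// For a `CircuitOpen` error, wait at least the proxy's `retry_after` hint:
/// retrying any sooner would just fail fast again.
pub fn delay_for_circuit_open(attempt: u32, unit: f64, retry_after: Duration) -> Duration {
    retry_delay(attempt, unit).max(retry_after)
}
```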
```rust
impl AgentRetryStrategy {
    /// Should agent retry?
    pub fn should_retry(&self, error: &ProxyError, attempt: u32) -> bool {
        if attempt >= 5 {
            return false;
        }

        match error {
            ProxyError::CircuitOpen { .. } => true, // Retry later
            ProxyError::RateLimited { .. } => true, // Retry later
            ProxyError::Timeout { .. } => true,     // May recover
            ProxyError::QueryError { .. } => false, // Don't retry bad queries
            _ => true,
        }
    }
}
```

4. Conversation Continuity
Maintain conversation state during outages:
```rust
pub struct ConversationCircuitBreaker {
    breaker: CircuitBreaker,
    /// Cached conversation contexts
    context_cache: DashMap<ConversationId, ConversationContext>,
}

impl ConversationCircuitBreaker {
    /// Runs `query` through the breaker; while the circuit is open, answers
    /// from the cached conversation context instead. Errors are returned
    /// rather than panicked on, so one conversation cannot crash the proxy.
    pub fn execute_with_fallback<T>(
        &self,
        conv_id: &str,
        query: impl FnOnce() -> Result<T, ProxyError>,
        fallback: impl FnOnce(&ConversationContext) -> T,
    ) -> Result<T, ProxyError> {
        match self.breaker.allow_request() {
            Ok(guard) => match query() {
                Ok(result) => {
                    guard.success();
                    Ok(result)
                }
                Err(e) => {
                    guard.failure(&e);
                    Err(e)
                }
            },
            // Circuit open: use the cached context if there is one, otherwise
            // surface the open-circuit error (assumes `From<CircuitOpen> for ProxyError`)
            Err(open) => match self.context_cache.get(conv_id) {
                Some(ctx) => Ok(fallback(ctx.value())),
                None => Err(ProxyError::from(open)),
            },
        }
    }
}
```

HeliosDB-Lite Integration
1. Sync Mode-Aware Thresholds
Different thresholds for different sync modes:
```toml
[circuit_breaker.sync_modes]
# Sync standbys are critical, sensitive thresholds
sync.failure_threshold = 3
sync.cooldown = "5s"

# Semi-sync can tolerate more failures
semisync.failure_threshold = 5
semisync.cooldown = "10s"

# Async standbys are least critical
async.failure_threshold = 10
async.cooldown = "30s"
```

2. Replication Health Integration
Consider replication status in circuit decisions:
```rust
impl CircuitBreaker {
    pub fn check_replication_health(&self, lag_monitor: &LagMonitor) -> CircuitState {
        let lag = lag_monitor.get_lag(&self.node_id);

        if let Some(lag) = lag {
            // If lag exceeds threshold, open circuit proactively.
            // `max_acceptable_lag` is not among the `CircuitBreakerConfig`
            // fields defined earlier and must be added there.
            if lag.lag_time > self.config.max_acceptable_lag {
                self.transition_to_open();
                return CircuitState::Open;
            }

            // If lag is growing rapidly, open circuit
            if lag.trend == LagTrend::Degrading && lag.lag_time > Duration::from_secs(1) {
                self.transition_to_open();
                return CircuitState::Open;
            }
        }

        self.get_state()
    }
}
```

3. TWR Fallback Circuit
Circuit breaker for TWR path:
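```rust
pub struct TwrCircuitBreaker {
    /// Circuit for TWR path to primary
    twr_to_primary: CircuitBreaker,
    /// Fallback: direct to primary
    direct_primary: CircuitBreaker,
}

impl TwrCircuitBreaker {
    pub fn route_write(&self, query: &str) -> RoutingDecision {
        // Try TWR first
        if Self::accepts_traffic(&self.twr_to_primary) {
            return RoutingDecision::ThroughTwr;
        }

        // Fall back to direct primary
        if Self::accepts_traffic(&self.direct_primary) {
            return RoutingDecision::DirectPrimary;
        }

        // Both circuits open
        RoutingDecision::Reject
    }

    /// CLOSED and HALF-OPEN circuits take traffic (`allow_request` still
    /// limits probes); an OPEN circuit takes traffic once its cooldown has
    /// elapsed. Checking for CLOSED alone would never send the TWR path the
    /// request that moves it to HALF-OPEN, so it could never recover.
    fn accepts_traffic(breaker: &CircuitBreaker) -> bool {
        match breaker.get_state() {
            CircuitState::Closed | CircuitState::HalfOpen => true,
            CircuitState::Open => breaker.should_try_half_open(),
        }
    }
}
```

4. Branch-Specific Circuits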
Separate circuits per branch:
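```rust
pub struct BranchCircuitBreakers {
    /// Circuit breakers per (node, branch); `Arc` so callers can keep one
    /// after the map guard is released
    breakers: DashMap<(NodeId, BranchName), Arc<CircuitBreaker>>,
}

impl BranchCircuitBreakers {
    /// Returns an owned handle: a `&CircuitBreaker` cannot outlive the
    /// DashMap guard that `entry` returns.
    pub fn get_breaker(&self, node: &str, branch: &str) -> Arc<CircuitBreaker> {
        self.breakers
            .entry((node.to_string(), branch.to_string()))
            .or_insert_with(|| Arc::new(CircuitBreaker::new_for_branch(node, branch)))
            .clone()
    }
}
```

Implementation Notes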
File Locations
```
src/proxy/
├── circuit_breaker/
│   ├── mod.rs              # Public API
│   ├── breaker.rs          # CircuitBreaker implementation
│   ├── manager.rs          # CircuitBreakerManager
│   ├── sliding_window.rs   # Failure counter
│   ├── state.rs            # State machine
│   └── metrics.rs          # Circuit metrics
```

Key Considerations
- Thread Safety: Use atomic operations for state transitions, with compare-and-swap wherever exactly one caller must perform a transition (for example OPEN to HALF-OPEN).
- Distributed Coordination: For multi-proxy setups, consider sharing circuit state via Redis.
- Probe Traffic: Limit probe traffic in the half-open state to avoid overwhelming a recovering node.
- Recovery Thundering Herd: Stagger recovery attempts across circuits so that circuits opened by the same incident do not all probe at once.
- Monitoring: Alert on circuit opens and track recovery times.
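One way to stagger recovery, sketched under the assumption that a stable per-node offset is acceptable: stretch each circuit's cooldown by a fraction derived from a hash of its node id, so circuits opened by the same incident reach HALF-OPEN at different times. `staggered_cooldown` and `max_spread` are hypothetical; drawing fresh random jitter on each transition would serve equally well.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;

/// Stretches `cooldown` by a per-node fraction in [0, max_spread), so that
/// circuits opened by the same incident do not all go half-open together.
/// The offset is stable for a node id within one build; `DefaultHasher`
/// output is not guaranteed to stay the same across Rust releases.
pub fn staggered_cooldown(node_id: &str, cooldown: Duration, max_spread: f64) -> Duration {
    let mut hasher = DefaultHasher::new();
    node_id.hash(&mut hasher);
    // Map the hash onto [0, 1) in steps of 1/10_000
    let unit = (hasher.finish() % 10_000) as f64 / 10_000.0;
    cooldown.mul_f64(1.0 + unit * max_spread)
}
```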
State Transition Diagram
```
  ┌───────────┐
  │           │
  │  CLOSED   │◄────────────────────────────┐
  │           │                             │
  └─────┬─────┘                             │
        │ failures >= threshold             │ success_count >= threshold
        ▼                                   │
  ┌───────────┐   cooldown elapsed    ┌─────┴─────┐
  │           │──────────────────────►│           │
  │   OPEN    │                       │ HALF-OPEN │
  │           │◄──────────────────────│           │
  └───────────┘     probe failure     └───────────┘
```

Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| State check | <1μs | p99 |
| State transition | <10μs | p99 |
| Failure recording | <5μs | p99 |
| Memory per circuit | <1KB | per node |
Related Features
- Rate Limiting - Prevent overload
- Replica Lag-Aware Routing - Avoid lagging nodes
- Query Analytics - Identify failure patterns