
Feature 06: Circuit Breaker Pattern

Priority: High | Complexity: Medium | Phase: 2 (Resilience)


Overview

Problem Statement

When backend nodes fail or degrade and no circuit breaker is in place:

  • Requests pile up waiting for timeouts
  • Connection pools are exhausted by slow queries
  • Failures cascade across the system
  • Recovery is slow (all clients retry simultaneously)

Common failure modes:

  • Node unresponsive (network partition)
  • Node overloaded (slow responses)
  • Node returning errors (disk full, OOM)
  • Intermittent failures (flapping)

Solution

Implement the circuit breaker pattern at the proxy layer:

STATE MACHINE

              failures >= threshold
  ┌────────┐ ─────────────────────► ┌────────┐
  │ CLOSED │                        │  OPEN  │
  └────────┘                        └────────┘
      ▲                               │    ▲
      │                      cooldown │    │ probe
      │ probe successes       elapsed ▼    │ fails
      │ >= threshold             ┌─────────────┐
      └──────────────────────────│  HALF-OPEN  │
                                 └─────────────┘

CLOSED:    All requests pass through; failures are counted.
OPEN:      All requests fail fast; no backend calls are made.
HALF-OPEN: A limited number of probe requests test whether the node has recovered.
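The three transitions can be written as a pure function, which makes them easy to unit-test apart from the atomics and timers. This is a minimal sketch. The `Event` type and its field names are hypothetical and exist only for this illustration. The two threshold parameters play the roles of the failure threshold and the half-open success threshold.

```rust
/// The three states from the diagram.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CircuitState {
    Closed,
    Open,
    HalfOpen,
}

/// Hypothetical inputs for this sketch.
#[derive(Clone, Copy, Debug)]
pub enum Event {
    /// A request failed; `failures_in_window` already includes this failure.
    Failure { failures_in_window: u32 },
    /// A probe succeeded; `probe_successes` counts successful probes so far.
    Success { probe_successes: u32 },
    /// The cooldown since the circuit opened has elapsed.
    CooldownElapsed,
}

/// Pure transition function: returns the next state for `event`.
pub fn next_state(
    state: CircuitState,
    event: Event,
    failure_threshold: u32,
    half_open_success_threshold: u32,
) -> CircuitState {
    match (state, event) {
        (CircuitState::Closed, Event::Failure { failures_in_window })
            if failures_in_window >= failure_threshold =>
        {
            CircuitState::Open
        }
        (CircuitState::Open, Event::CooldownElapsed) => CircuitState::HalfOpen,
        // Any probe failure sends the circuit back to open
        (CircuitState::HalfOpen, Event::Failure { .. }) => CircuitState::Open,
        (CircuitState::HalfOpen, Event::Success { probe_successes })
            if probe_successes >= half_open_success_threshold =>
        {
            CircuitState::Closed
        }
        // Every other combination leaves the state unchanged
        (unchanged, _) => unchanged,
    }
}
```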

Architecture

Circuit Breaker Core

use std::collections::HashSet;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8};
use std::time::Duration;

pub struct CircuitBreaker {
    /// Current state, stored as a `CircuitState` discriminant
    state: AtomicU8,
    /// Failure counter (rolling window)
    failure_counter: SlidingWindowCounter,
    /// Success counter (for half-open validation)
    success_counter: AtomicU32,
    /// Time when the circuit opened, in nanoseconds
    opened_at: AtomicU64,
    /// Configuration
    config: CircuitBreakerConfig,
    /// Node this circuit protects
    node_id: String,
    /// Event listeners (Send + Sync so the breaker can be shared across threads)
    listeners: Vec<Box<dyn CircuitBreakerListener + Send + Sync>>,
}

#[derive(Clone)]
pub struct CircuitBreakerConfig {
    /// Failures within `failure_window` that open the circuit
    pub failure_threshold: u32,
    /// Time window for counting failures
    pub failure_window: Duration,
    /// Time to wait before trying half-open
    pub cooldown: Duration,
    /// Successful probes needed to close
    pub half_open_success_threshold: u32,
    /// Max concurrent probes in half-open
    pub half_open_max_probes: u32,
    /// What counts as a failure
    pub failure_conditions: FailureConditions,
}

#[derive(Clone)]
pub struct FailureConditions {
    /// Timeout threshold
    pub timeout: Duration,
    /// Error codes (SQLSTATE) that count as failures
    pub error_codes: HashSet<String>,
    /// Response time threshold (slow = failure)
    pub slow_threshold: Option<Duration>,
    /// Ignore transient errors
    pub ignore_transient: bool,
}

/// `repr(u8)` so the discriminants match the values stored in `state`
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CircuitState {
    Closed = 0,
    Open = 1,
    HalfOpen = 2,
}
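`SlidingWindowCounter` is referenced here but not defined. The code that uses it assumes two methods: `increment()` returns the number of failures inside the window, and `reset()` clears the count. Both take `&self`. The sketch below is one possible shape under those assumptions. It keeps a timestamp per failure behind a `Mutex` and evicts entries older than `failure_window`. The `increment_at` helper is hypothetical and exists so the window can be tested with an explicit clock.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Sketch of a rolling-window failure counter (one timestamp per failure).
/// A production version would more likely use fixed time buckets to bound memory.
pub struct SlidingWindowCounter {
    window: Duration,
    events: Mutex<VecDeque<Instant>>,
}

impl SlidingWindowCounter {
    pub fn new(window: Duration) -> Self {
        Self { window, events: Mutex::new(VecDeque::new()) }
    }

    /// Records one failure now; returns the number of failures in the window.
    pub fn increment(&self) -> u32 {
        self.increment_at(Instant::now())
    }

    /// Same as `increment`, with an explicit clock (hypothetical test helper).
    pub fn increment_at(&self, now: Instant) -> u32 {
        let mut events = self.events.lock().unwrap();
        events.push_back(now);
        // Evict failures that have aged out of the window
        while let Some(&oldest) = events.front() {
            if now.duration_since(oldest) > self.window {
                events.pop_front();
            } else {
                break;
            }
        }
        events.len() as u32
    }

    /// Clears all recorded failures.
    pub fn reset(&self) {
        self.events.lock().unwrap().clear();
    }
}
```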

Circuit Breaker Implementation

use std::sync::atomic::Ordering;
use std::time::Duration;
use log::{info, warn}; // the `tracing` crate provides the same macros

impl CircuitBreaker {
    /// Decodes the atomic state byte.
    pub fn get_state(&self) -> CircuitState {
        match self.state.load(Ordering::SeqCst) {
            0 => CircuitState::Closed,
            1 => CircuitState::Open,
            _ => CircuitState::HalfOpen,
        }
    }

    pub fn allow_request(&self) -> Result<RequestGuard, CircuitOpen> {
        match self.get_state() {
            CircuitState::Closed => Ok(RequestGuard::new(self)),
            CircuitState::Open => {
                // Only the caller that wins the Open -> HalfOpen CAS sends the first probe
                if self.should_try_half_open() && self.transition_to_half_open() {
                    Ok(RequestGuard::new_probe(self))
                } else {
                    Err(CircuitOpen {
                        retry_after: self.time_until_half_open(),
                        node_id: self.node_id.clone(),
                    })
                }
            }
            CircuitState::HalfOpen => {
                if self.can_probe() {
                    Ok(RequestGuard::new_probe(self))
                } else {
                    Err(CircuitOpen {
                        retry_after: Duration::from_millis(100),
                        node_id: self.node_id.clone(),
                    })
                }
            }
        }
    }

    pub fn record_success(&self) {
        match self.get_state() {
            CircuitState::Closed => {
                // Nothing to do: failures age out of the sliding window, so a
                // single success does not erase recent failures
            }
            CircuitState::HalfOpen => {
                let count = self.success_counter.fetch_add(1, Ordering::SeqCst) + 1;
                if count >= self.config.half_open_success_threshold {
                    self.transition_to_closed();
                }
            }
            CircuitState::Open => {
                // A request admitted before the circuit opened finished late; ignore it
            }
        }
    }

    pub fn record_failure(&self, error: &ProxyError) {
        if !self.is_failure(error) {
            return;
        }
        match self.get_state() {
            CircuitState::Closed => {
                let count = self.failure_counter.increment();
                if count >= self.config.failure_threshold {
                    self.transition_to_open();
                }
            }
            CircuitState::HalfOpen => {
                // Any failure in half-open goes back to open
                self.transition_to_open();
            }
            CircuitState::Open => {
                // Already open: restart the cooldown timer
                self.opened_at.store(now_nanos(), Ordering::SeqCst);
            }
        }
    }

    fn transition_to_open(&self) {
        let prev = self.state.swap(CircuitState::Open as u8, Ordering::SeqCst);
        if prev != CircuitState::Open as u8 {
            self.opened_at.store(now_nanos(), Ordering::SeqCst);
            self.notify_listeners(CircuitEvent::Opened);
            warn!("Circuit breaker opened for node {}", self.node_id);
        }
    }

    /// Returns true only for the caller that actually moved Open -> HalfOpen.
    fn transition_to_half_open(&self) -> bool {
        let won = self
            .state
            .compare_exchange(
                CircuitState::Open as u8,
                CircuitState::HalfOpen as u8,
                Ordering::SeqCst,
                Ordering::SeqCst,
            )
            .is_ok();
        if won {
            self.success_counter.store(0, Ordering::SeqCst);
            self.notify_listeners(CircuitEvent::HalfOpened);
            info!("Circuit breaker half-open for node {}", self.node_id);
        }
        won
    }

    fn transition_to_closed(&self) {
        // CAS so that concurrent probe successes close the circuit (and notify) only once
        let closed = self
            .state
            .compare_exchange(
                CircuitState::HalfOpen as u8,
                CircuitState::Closed as u8,
                Ordering::SeqCst,
                Ordering::SeqCst,
            )
            .is_ok();
        if closed {
            self.failure_counter.reset();
            self.notify_listeners(CircuitEvent::Closed);
            info!("Circuit breaker closed for node {}", self.node_id);
        }
    }
}
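`RequestGuard` is used above but not defined in this section. One plausible shape is sketched below, assuming the guard only has to report an outcome back to its breaker. It counts a guard that is dropped without `success()` or `failure()` as a failure, for example after a panic or a cancelled future. This keeps counters and probe slots from leaking. `Outcomes` is a hypothetical stand-in for the breaker. The error argument of `failure(&e)` is omitted for brevity.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Hypothetical stand-in for the breaker's outcome bookkeeping.
#[derive(Default)]
pub struct Outcomes {
    pub successes: AtomicU32,
    pub failures: AtomicU32,
}

/// Sketch of a request guard that must be resolved exactly once.
pub struct RequestGuard<'a> {
    outcomes: &'a Outcomes,
    resolved: bool,
}

impl<'a> RequestGuard<'a> {
    pub fn new(outcomes: &'a Outcomes) -> Self {
        Self { outcomes, resolved: false }
    }

    /// Consumes the guard and records a success.
    pub fn success(mut self) {
        self.resolved = true;
        self.outcomes.successes.fetch_add(1, Ordering::SeqCst);
    }

    /// Consumes the guard and records a failure.
    pub fn failure(mut self) {
        self.resolved = true;
        self.outcomes.failures.fetch_add(1, Ordering::SeqCst);
    }
}

impl Drop for RequestGuard<'_> {
    fn drop(&mut self) {
        if !self.resolved {
            // Abandoned request (panic, cancellation): count it as a failure
            self.outcomes.failures.fetch_add(1, Ordering::SeqCst);
        }
    }
}
```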

Circuit Breaker Manager

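use std::sync::Arc;
use dashmap::DashMap;

pub struct CircuitBreakerManager {
    /// Circuit breakers per node (Arc so a breaker can be used without holding a map lock)
    breakers: DashMap<NodeId, Arc<CircuitBreaker>>,
    /// Global configuration
    config: CircuitBreakerConfig,
    /// Metrics collector
    metrics: Arc<CircuitMetrics>,
}

impl CircuitBreakerManager {
    /// Nodes whose circuit is not open (nodes without a breaker yet count as healthy).
    /// An Open breaker only moves to half-open when `allow_request` is called, so
    /// the router must still offer such nodes a probe once the cooldown elapses.
    pub fn get_healthy_nodes(&self, nodes: &[Node]) -> Vec<Node> {
        nodes
            .iter()
            .filter(|n| {
                self.breakers
                    .get(&n.id)
                    .map(|b| b.get_state() != CircuitState::Open)
                    .unwrap_or(true)
            })
            .cloned()
            .collect()
    }

    pub fn wrap_request<F, T>(&self, node_id: &str, f: F) -> Result<T, ProxyError>
    where
        F: FnOnce() -> Result<T, ProxyError>,
    {
        // Clone the Arc so the DashMap shard lock is released before `f` runs;
        // holding the entry guard across the backend call would block every
        // other request that hashes to the same shard
        let breaker = self
            .breakers
            .entry(node_id.to_string())
            .or_insert_with(|| Arc::new(CircuitBreaker::new(node_id, &self.config)))
            .value()
            .clone();
        // `?` relies on `impl From<CircuitOpen> for ProxyError`
        let guard = breaker.allow_request()?;
        match f() {
            Ok(result) => {
                guard.success();
                Ok(result)
            }
            Err(e) => {
                guard.failure(&e);
                Err(e)
            }
        }
    }
}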
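The important property of `wrap_request` is that no map lock is held while the backend call runs. The sketch below shows that pattern using only the standard library. It substitutes `Mutex<HashMap>` for `DashMap`. `Breaker` is a hypothetical minimal breaker: it opens after `threshold` consecutive failures and, for brevity, never recovers. The test nests one `wrap_request` call inside another. That call would deadlock if the map lock were still held during `f`.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical minimal breaker: opens after `threshold` consecutive failures
/// and never recovers (half-open handling is omitted from this sketch).
pub struct Breaker {
    threshold: u32,
    consecutive_failures: AtomicU32,
}

#[derive(Debug, PartialEq)]
pub enum ProxyError {
    CircuitOpen,
    Backend(String),
}

pub struct Manager {
    threshold: u32,
    breakers: Mutex<HashMap<String, Arc<Breaker>>>,
}

impl Manager {
    pub fn new(threshold: u32) -> Self {
        Self { threshold, breakers: Mutex::new(HashMap::new()) }
    }

    pub fn wrap_request<T, F>(&self, node_id: &str, f: F) -> Result<T, ProxyError>
    where
        F: FnOnce() -> Result<T, ProxyError>,
    {
        // Get or create the breaker and clone its Arc. The MutexGuard is a
        // temporary, so the map lock is released at the end of this statement.
        let breaker = Arc::clone(
            self.breakers
                .lock()
                .unwrap()
                .entry(node_id.to_string())
                .or_insert_with(|| {
                    Arc::new(Breaker {
                        threshold: self.threshold,
                        consecutive_failures: AtomicU32::new(0),
                    })
                }),
        );
        if breaker.consecutive_failures.load(Ordering::SeqCst) >= breaker.threshold {
            return Err(ProxyError::CircuitOpen);
        }
        // The backend call runs with no map lock held
        match f() {
            Ok(v) => {
                breaker.consecutive_failures.store(0, Ordering::SeqCst);
                Ok(v)
            }
            Err(e) => {
                breaker.consecutive_failures.fetch_add(1, Ordering::SeqCst);
                Err(e)
            }
        }
    }
}
```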

API Specification

Configuration (heliosproxy.toml)

[circuit_breaker]
enabled = true
# Failure thresholds
failure_threshold = 5 # Failures to trigger open
failure_window = "30s" # Rolling window for counting
# Recovery settings
cooldown = "10s" # Wait before half-open
half_open_success = 3 # Successes to close
half_open_max_probes = 2 # Concurrent probes
# What counts as failure
[circuit_breaker.failure_conditions]
timeout = "5s"
slow_threshold = "2s" # Slow responses count as failures
error_codes = ["08001", "57P01", "XX000"] # Connection, shutdown, internal
ignore_transient = true # Don't count transient network errors
# Per-node overrides
[circuit_breaker.nodes.standby-async-1]
failure_threshold = 10 # More tolerant for async standby
cooldown = "30s"
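Per-node tables override only the keys they set. Every other key falls back to the global `[circuit_breaker]` values. A minimal sketch of that merge, assuming overrides are parsed into `Option` fields. `BreakerSettings`, `NodeOverride` and `effective_settings` are hypothetical names, and only two keys are shown.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Effective settings for one node (subset of the keys, for illustration).
#[derive(Clone, Debug, PartialEq)]
pub struct BreakerSettings {
    pub failure_threshold: u32,
    pub cooldown: Duration,
}

/// Only the keys present in `[circuit_breaker.nodes.<name>]` are `Some`.
#[derive(Default)]
pub struct NodeOverride {
    pub failure_threshold: Option<u32>,
    pub cooldown: Option<Duration>,
}

/// Starts from the global values and applies any per-node overrides.
pub fn effective_settings(
    global: &BreakerSettings,
    overrides: &HashMap<String, NodeOverride>,
    node: &str,
) -> BreakerSettings {
    let mut settings = global.clone();
    if let Some(o) = overrides.get(node) {
        if let Some(t) = o.failure_threshold {
            settings.failure_threshold = t;
        }
        if let Some(c) = o.cooldown {
            settings.cooldown = c;
        }
    }
    settings
}
```

With the configuration above, `standby-async-1` resolves to a threshold of 10 and a 30 s cooldown, while `primary` keeps the global 5 and 10 s.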

Admin API

GET /circuit-breaker/status
{
"breakers": [
{
"node": "primary",
"state": "closed",
"failure_count": 0,
"last_failure": null,
"opened_count": 5,
"last_opened": "2026-01-20T10:30:00Z"
},
{
"node": "standby-async-1",
"state": "half_open",
"failure_count": 5,
"last_failure": "2026-01-25T10:29:55Z",
"probes_sent": 2,
"probes_success": 1
}
]
}
POST /circuit-breaker/force-open
# Manually open circuit (for maintenance)
{ "node": "standby-sync-1" }
POST /circuit-breaker/force-close
# Manually close circuit
{ "node": "standby-sync-1" }
POST /circuit-breaker/reset
# Reset all counters
{ "node": "standby-async-1" }
GET /circuit-breaker/history?node=standby-async-1&duration=24h
# Historical state transitions
{
"transitions": [
{ "timestamp": "...", "from": "closed", "to": "open", "reason": "5 failures in 30s" },
{ "timestamp": "...", "from": "open", "to": "half_open", "reason": "cooldown elapsed" },
...
]
}
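The admin endpoints do not specify how forced states interact with automatic transitions. One plausible model keeps a manual override next to the automatic state and lets the override win when routing. It assumes that a forced state stays in effect until `POST /circuit-breaker/reset`, which also clears it. `ManualOverride` and `AdminControlled` are hypothetical names for this sketch.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CircuitState {
    Closed,
    Open,
    HalfOpen,
}

/// Hypothetical manual override set by the admin endpoints.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManualOverride {
    Auto,
    ForceOpen,
    ForceClosed,
}

pub struct AdminControlled {
    pub automatic: CircuitState,
    pub manual: ManualOverride,
}

impl AdminControlled {
    /// State used for routing: a manual override wins over the automatic state.
    pub fn effective_state(&self) -> CircuitState {
        match self.manual {
            ManualOverride::ForceOpen => CircuitState::Open,
            ManualOverride::ForceClosed => CircuitState::Closed,
            ManualOverride::Auto => self.automatic,
        }
    }

    /// `POST /circuit-breaker/force-open`
    pub fn force_open(&mut self) {
        self.manual = ManualOverride::ForceOpen;
    }

    /// `POST /circuit-breaker/force-close`
    pub fn force_close(&mut self) {
        self.manual = ManualOverride::ForceClosed;
    }

    /// `POST /circuit-breaker/reset`: drop the override and restart from Closed.
    pub fn reset(&mut self) {
        self.manual = ManualOverride::Auto;
        self.automatic = CircuitState::Closed;
    }
}
```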

Event Webhooks

[circuit_breaker.webhooks]
url = "https://alerts.example.com/webhook"
events = ["opened", "closed"]
secret = "${WEBHOOK_SECRET}"

Example payload (circuit opened):

{
"event": "circuit_opened",
"node": "standby-sync-1",
"timestamp": "2026-01-25T10:30:00Z",
"failure_count": 5,
"last_error": "connection timeout after 5s"
}

AI/Agent Innovations

1. Adaptive Failure Thresholds

Learn normal failure rates per workload:

pub struct AdaptiveCircuitBreaker {
    base_breaker: CircuitBreaker,
    /// Historical failure rates
    failure_history: RollingAverage,
    /// Standard deviation tracking
    std_dev: RollingStdDev,
}

impl AdaptiveCircuitBreaker {
    pub fn compute_threshold(&self) -> u32 {
        let avg_failures = self.failure_history.average();
        let std_dev = self.std_dev.value();
        // Open the circuit at 3 standard deviations above normal. Round up and
        // never go below 1: a quiet history (avg = std_dev = 0) would otherwise
        // give a threshold of 0 and open the circuit on the very first failure
        ((avg_failures + 3.0 * std_dev).ceil() as u32).max(1)
    }
}
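`RollingAverage` and `RollingStdDev` are not defined here. A worked sketch of the same "mean + 3 standard deviations" rule uses Welford's online algorithm, which updates the mean and variance in one pass without storing samples. This version is cumulative rather than windowed, and `FailureStats` is a hypothetical name. For per-window failure counts of 2, 4, 4, 4, 5, 5, 7 and 9, the mean is 5 and the sample standard deviation is about 2.14. The threshold is therefore ceil(5 + 6.41) = 12.

```rust
/// Running mean and sample standard deviation via Welford's algorithm.
#[derive(Default)]
pub struct FailureStats {
    n: u64,
    mean: f64,
    m2: f64,
}

impl FailureStats {
    /// Records the failure count observed in one window (e.g. one `failure_window`).
    pub fn observe(&mut self, failures: f64) {
        self.n += 1;
        let delta = failures - self.mean;
        self.mean += delta / self.n as f64;
        self.m2 += delta * (failures - self.mean);
    }

    pub fn mean(&self) -> f64 {
        self.mean
    }

    /// Sample standard deviation; 0 until at least two windows are observed.
    pub fn std_dev(&self) -> f64 {
        if self.n < 2 {
            0.0
        } else {
            (self.m2 / (self.n - 1) as f64).sqrt()
        }
    }

    /// mean + 3 standard deviations, rounded up and never below `min_threshold`.
    pub fn threshold(&self, min_threshold: u32) -> u32 {
        let raw = (self.mean + 3.0 * self.std_dev()).ceil() as u32;
        raw.max(min_threshold)
    }
}
```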

2. Workload-Aware Circuits

Different circuits for different operation types:

pub struct WorkloadCircuitBreaker {
    /// Circuit for embedding queries
    embedding_circuit: CircuitBreaker,
    /// Circuit for transactional queries
    transactional_circuit: CircuitBreaker,
    /// Circuit for analytics queries
    analytics_circuit: CircuitBreaker,
}

impl WorkloadCircuitBreaker {
    pub fn get_circuit(&self, query: &str) -> &CircuitBreaker {
        if self.is_embedding_query(query) {
            &self.embedding_circuit
        } else if self.is_analytics_query(query) {
            &self.analytics_circuit
        } else {
            &self.transactional_circuit
        }
    }
}
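`is_embedding_query` and `is_analytics_query` are left undefined. A deliberately simple keyword heuristic is sketched below. It assumes embedding searches use pgvector-style distance operators (`<->`, `<=>`, `<#>`) and treats `GROUP BY` and window functions as analytics. `Workload` and `classify` are hypothetical names. The heuristic does not parse SQL, so it would misclassify these tokens inside string literals.

```rust
/// Workload classes matching the three circuits above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Workload {
    Embedding,
    Analytics,
    Transactional,
}

/// Hypothetical keyword heuristic standing in for `is_embedding_query` and
/// `is_analytics_query`; it does not parse SQL.
pub fn classify(query: &str) -> Workload {
    let q = query.to_ascii_lowercase();
    // pgvector distance operators: L2 (<->), cosine (<=>), inner product (<#>)
    if ["<->", "<=>", "<#>"].iter().any(|op| q.contains(*op)) {
        Workload::Embedding
    } else if q.contains("group by") || q.contains(" over (") {
        // Aggregations and window functions
        Workload::Analytics
    } else {
        Workload::Transactional
    }
}
```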

3. Agent Retry Strategy

Provide retry guidance for AI agents:

use std::time::Duration;

pub struct AgentRetryStrategy;

impl AgentRetryStrategy {
    /// Exponential backoff (100 ms base, capped at 30 s) with up to 30% jitter
    pub fn get_retry_delay(&self, attempt: u32) -> Duration {
        let base = Duration::from_millis(100);
        let max = Duration::from_secs(30);
        let delay = base * 2u32.pow(attempt.min(10));
        let jitter = rand::random::<f64>() * 0.3;
        // Jitter is applied after the cap so capped retries still spread out (30-39 s)
        delay.min(max).mul_f64(1.0 + jitter)
    }

    /// Should the agent retry?
    pub fn should_retry(&self, error: &ProxyError, attempt: u32) -> bool {
        if attempt >= 5 {
            return false;
        }
        match error {
            ProxyError::CircuitOpen { .. } => true, // Retry later
            ProxyError::RateLimited { .. } => true, // Retry later
            ProxyError::Timeout { .. } => true,     // May recover
            ProxyError::QueryError { .. } => false, // Don't retry bad queries
            _ => true,
        }
    }
}
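The delay formula is easier to check when the random draw is passed in as a parameter. The sketch below computes the same thing as `get_retry_delay`: capped exponential backoff followed by up to 30% multiplicative jitter. `jitter_sample` is a hypothetical parameter that replaces the `rand::random` call and is expected in [0, 1). Without jitter, attempts 0, 1, 2, 3 wait 100, 200, 400 and 800 ms. From attempt 9 onward (51.2 s uncapped) the base delay is pinned at 30 s.

```rust
use std::time::Duration;

/// Same formula as `get_retry_delay`, with the random draw supplied by the caller.
pub fn retry_delay(attempt: u32, jitter_sample: f64) -> Duration {
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(30);
    // 100 ms, 200 ms, 400 ms, ... with the exponent capped at 10
    let delay = base * 2u32.pow(attempt.min(10));
    // Up to +30% jitter, applied after the 30 s cap
    delay.min(max).mul_f64(1.0 + 0.3 * jitter_sample)
}
```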

4. Conversation Continuity

Maintain conversation state during outages:

pub struct ConversationCircuitBreaker {
    breaker: CircuitBreaker,
    /// Cached conversation contexts (assumes `ConversationId = String`)
    context_cache: DashMap<ConversationId, ConversationContext>,
}

impl ConversationCircuitBreaker {
    /// Runs `query` through the breaker. If the circuit is open, answers from the
    /// cached conversation context instead; errors are returned, never panicked on.
    pub fn execute_with_fallback<T>(
        &self,
        conv_id: &str,
        query: impl FnOnce() -> Result<T, ProxyError>,
        fallback: impl FnOnce(&ConversationContext) -> T,
    ) -> Result<T, ProxyError> {
        let guard = match self.breaker.allow_request() {
            Ok(guard) => guard,
            Err(open) => {
                // Circuit open: use cached context as fallback, if there is one
                return match self.context_cache.get(conv_id) {
                    Some(ctx) => Ok(fallback(ctx.value())),
                    // No cached context: surface the circuit-open error to the caller
                    None => Err(ProxyError::from(open)),
                };
            }
        };
        match query() {
            Ok(result) => {
                guard.success();
                Ok(result)
            }
            Err(e) => {
                guard.failure(&e);
                Err(e)
            }
        }
    }
}

HeliosDB-Lite Integration

1. Sync Mode-Aware Thresholds

Different thresholds for different sync modes:

[circuit_breaker.sync_modes]
# Sync standbys are critical: use the most sensitive thresholds
sync.failure_threshold = 3
sync.cooldown = "5s"
# Semi-sync can tolerate more failures
semisync.failure_threshold = 5
semisync.cooldown = "10s"
# Async standbys are least critical
async.failure_threshold = 10
async.cooldown = "30s"
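In code, the proxy could resolve these defaults from a node's sync mode. The sketch below is one way to do that. The `SyncMode` enum is hypothetical and only mirrors the three table keys. The values are copied from the TOML above.

```rust
use std::time::Duration;

/// Hypothetical mirror of the `sync`, `semisync` and `async` keys.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncMode {
    Sync,
    SemiSync,
    Async,
}

/// Returns (failure_threshold, cooldown) for a standby's sync mode.
pub fn thresholds_for(mode: SyncMode) -> (u32, Duration) {
    match mode {
        // Sync standbys block commits, so trip early and retry soon
        SyncMode::Sync => (3, Duration::from_secs(5)),
        SyncMode::SemiSync => (5, Duration::from_secs(10)),
        // Async standbys are least critical: tolerate more, back off longer
        SyncMode::Async => (10, Duration::from_secs(30)),
    }
}
```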

2. Replication Health Integration

Consider replication status in circuit decisions:

impl CircuitBreaker {
    /// Opens the circuit proactively when replication lag is unhealthy.
    /// `max_acceptable_lag` is not among the `CircuitBreakerConfig` fields listed
    /// earlier and would need to be added there.
    pub fn check_replication_health(&self, lag_monitor: &LagMonitor) -> CircuitState {
        if let Some(lag) = lag_monitor.get_lag(&self.node_id) {
            // Lag exceeds the acceptable maximum: open proactively
            if lag.lag_time > self.config.max_acceptable_lag {
                self.transition_to_open();
                return CircuitState::Open;
            }
            // Lag is above 1 s and still growing: open before it gets worse
            if lag.trend == LagTrend::Degrading && lag.lag_time > Duration::from_secs(1) {
                self.transition_to_open();
                return CircuitState::Open;
            }
        }
        self.get_state()
    }
}
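Separating the lag rules from the state change makes them testable without a `LagMonitor`. Below is a minimal sketch of the two rules as a pure predicate. `lag_should_open` is a hypothetical name. The `Improving` and `Stable` variants of `LagTrend` are assumptions, since only `Degrading` appears above.

```rust
use std::time::Duration;

/// `Degrading` appears in the section; the other variants are assumed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LagTrend {
    Improving,
    Stable,
    Degrading,
}

/// True if replication lag alone should open the circuit.
pub fn lag_should_open(lag: Option<(Duration, LagTrend)>, max_acceptable_lag: Duration) -> bool {
    match lag {
        // No lag data: leave the decision to request outcomes
        None => false,
        Some((lag_time, trend)) => {
            lag_time > max_acceptable_lag
                || (trend == LagTrend::Degrading && lag_time > Duration::from_secs(1))
        }
    }
}
```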

3. TWR Fallback Circuit

Circuit breaker for TWR path:

pub struct TwrCircuitBreaker {
    /// Circuit for TWR path to primary
    twr_to_primary: CircuitBreaker,
    /// Fallback: direct to primary
    direct_primary: CircuitBreaker,
}

impl TwrCircuitBreaker {
    pub fn route_write(&self, query: &str) -> RoutingDecision {
        // Try TWR first
        if self.twr_to_primary.get_state() == CircuitState::Closed {
            return RoutingDecision::ThroughTwr;
        }
        // Fall back to direct primary
        if self.direct_primary.get_state() == CircuitState::Closed {
            return RoutingDecision::DirectPrimary;
        }
        // Neither circuit is closed (each is open or half-open)
        RoutingDecision::Reject
    }
}
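The routing rule depends only on the two circuit states, so it can be written as a table over state pairs. The sketch below encodes that rule. `route_write` here is a hypothetical free function taking states instead of breakers. It makes one consequence visible: a half-open circuit counts as unavailable. Any probes to it must therefore be admitted elsewhere, for example through `allow_request`.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CircuitState {
    Closed,
    Open,
    HalfOpen,
}

#[derive(Debug, PartialEq, Eq)]
pub enum RoutingDecision {
    ThroughTwr,
    DirectPrimary,
    Reject,
}

/// Prefer TWR, then the direct primary path; reject if neither is closed.
pub fn route_write(twr: CircuitState, direct: CircuitState) -> RoutingDecision {
    match (twr, direct) {
        (CircuitState::Closed, _) => RoutingDecision::ThroughTwr,
        (_, CircuitState::Closed) => RoutingDecision::DirectPrimary,
        _ => RoutingDecision::Reject,
    }
}
```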

4. Branch-Specific Circuits

Separate circuits per branch:

pub struct BranchCircuitBreakers {
    /// Circuit breakers per (node, branch)
    breakers: DashMap<(NodeId, BranchName), Arc<CircuitBreaker>>,
}

impl BranchCircuitBreakers {
    /// Returns an owned `Arc` because a `&CircuitBreaker` cannot outlive the
    /// DashMap entry guard that `entry()` returns.
    pub fn get_breaker(&self, node: &str, branch: &str) -> Arc<CircuitBreaker> {
        self.breakers
            .entry((node.to_string(), branch.to_string()))
            .or_insert_with(|| Arc::new(CircuitBreaker::new_for_branch(node, branch)))
            .value()
            .clone()
    }
}
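A std-only sketch of the get-or-create registry keyed by `(node, branch)` follows, using `Mutex<HashMap>` in place of `DashMap`. `Breaker` is a hypothetical placeholder. The test checks that repeated lookups for the same pair share one breaker, while different branches on the same node get separate ones.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical placeholder for a per-(node, branch) breaker.
pub struct Breaker {
    pub node: String,
    pub branch: String,
}

#[derive(Default)]
pub struct BranchBreakers {
    breakers: Mutex<HashMap<(String, String), Arc<Breaker>>>,
}

impl BranchBreakers {
    /// Get-or-create; returns an owned Arc so no map lock outlives the call.
    pub fn get_breaker(&self, node: &str, branch: &str) -> Arc<Breaker> {
        let mut map = self.breakers.lock().unwrap();
        let entry = map
            .entry((node.to_string(), branch.to_string()))
            .or_insert_with(|| {
                Arc::new(Breaker { node: node.to_string(), branch: branch.to_string() })
            });
        Arc::clone(entry)
    }

    /// Number of distinct (node, branch) breakers created so far.
    pub fn len(&self) -> usize {
        self.breakers.lock().unwrap().len()
    }
}
```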

Implementation Notes

File Locations

src/proxy/
├── circuit_breaker/
│ ├── mod.rs # Public API
│ ├── breaker.rs # CircuitBreaker implementation
│ ├── manager.rs # CircuitBreakerManager
│ ├── sliding_window.rs # Failure counter
│ ├── state.rs # State machine
│ └── metrics.rs # Circuit metrics

Key Considerations

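  1. Thread Safety: Use atomic operations (compare-and-swap) for state transitions so concurrent requests cannot trigger the same transition twice.

  2. Distributed Coordination: In multi-proxy deployments, consider sharing circuit state (for example via Redis) so all proxies agree on which nodes are unavailable.

  3. Probe Traffic: Limit probe traffic in the half-open state (half_open_max_probes) to avoid overwhelming a recovering node.

  4. Recovery Thundering Herd: Stagger recovery attempts across circuits, for example by adding jitter to the cooldown.

  5. Monitoring: Alert on circuit opens and track recovery times.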
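Cooldowns can be staggered so that many circuits do not all go half-open at the same moment. One option, sketched below, derives a per-node offset from a hash of the node id. The offset is stable for a given node within one build but differs between nodes. `staggered_cooldown` and `spread_fraction` are hypothetical names. Random jitter, as in the agent retry strategy, would work as well.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;

/// Adds a per-node offset of up to `spread_fraction * base` to the cooldown.
pub fn staggered_cooldown(base: Duration, node_id: &str, spread_fraction: f64) -> Duration {
    let mut hasher = DefaultHasher::new();
    node_id.hash(&mut hasher);
    // Map the hash onto [0, 1)
    let unit = (hasher.finish() % 10_000) as f64 / 10_000.0;
    base + base.mul_f64(spread_fraction * unit)
}
```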

State Transition Table

From        To          Trigger
CLOSED      OPEN        failures in window >= failure_threshold
OPEN        HALF-OPEN   cooldown elapsed
HALF-OPEN   OPEN        any probe failure
HALF-OPEN   CLOSED      successful probes >= half_open_success_threshold

Performance Targets

Metric               Target     Measurement
State check          < 1 μs     p99
State transition     < 10 μs    p99
Failure recording    < 5 μs     p99
Memory per circuit   < 1 KB     per node