Resource Leak Prevention System Architecture
Version: 1.0
Date: 2025-11-10
Status: Phase 1 Production Hardening - Day 5
Author: System Architecture Team
Executive Summary
This document specifies the architecture for HeliosDB’s comprehensive resource leak prevention system. The design ensures zero resource leaks in production through adaptive connection pooling, unified timeout enforcement, system-wide resource limits, and graceful degradation strategies.
Key Features:
- Adaptive connection pooling with leak detection
- Unified timeout framework across all operations
- System-wide resource limit enforcement
- Circuit breaker integration for fault isolation
- Graceful degradation under resource pressure
- Comprehensive monitoring and alerting
1. System Overview
1.1 Architecture Principles
- Defense in Depth: Multiple layers of protection prevent leaks
- Fail-Safe Defaults: Resources automatically released on timeout/error
- Observable: All resource usage tracked and visible
- Adaptive: System adjusts to load and pressure
- Graceful Degradation: Maintains service under resource constraints
1.2 Components
```
┌─────────────────────────────────────────────────────────┐
│            Resource Leak Prevention System              │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌────────────────────┐      ┌────────────────────┐     │
│  │  Connection Pool   │      │  Timeout Framework │     │
│  │    Management      │      │                    │     │
│  └────────┬───────────┘      └────────┬───────────┘     │
│           │                           │                 │
│  ┌────────▼───────────┐      ┌────────▼───────────┐     │
│  │  Resource Limits   │      │  Circuit Breaker   │     │
│  │   Enforcement      │      │   Integration      │     │
│  └────────┬───────────┘      └────────┬───────────┘     │
│           │                           │                 │
│  ┌────────▼───────────────────────────▼───────────┐     │
│  │          Leak Detection & Monitoring           │     │
│  └────────────────────────────────────────────────┘     │
│                                                         │
│  ┌────────────────────────────────────────────────┐     │
│  │        Graceful Degradation & Recovery         │     │
│  └────────────────────────────────────────────────┘     │
└─────────────────────────────────────────────────────────┘
```
2. Adaptive Connection Pool Architecture
2.1 Design Specification
Existing Implementation: heliosdb-pooling/src/pool.rs
Enhancements Required:
```rust
pub struct AdaptiveConnectionPool {
    // Core Configuration
    config: PoolConfig,
    mode: PoolMode,

    // Connection Management
    idle_queue: SegQueue<Connection>,
    active_connections: RwLock<HashMap<Uuid, ConnectionInfo>>,
    all_connections: RwLock<HashMap<Uuid, Connection>>,

    // Leak Detection (NEW)
    leak_detector: Arc<ConnectionLeakDetector>,
    connection_trackers: RwLock<HashMap<Uuid, ConnectionTracker>>,

    // Health & Lifecycle (ENHANCED)
    health_monitor: Arc<ConnectionHealthMonitor>,
    lifecycle_manager: Arc<ConnectionLifecycleManager>,

    // Adaptive Sizing (ENHANCED)
    metrics_collector: Arc<PoolMetricsCollector>,
    adaptive_controller: Arc<AdaptivePoolController>,

    // Circuit Breaker Integration (NEW)
    circuit_breaker: Arc<CircuitBreaker>,

    // Graceful Shutdown (NEW)
    shutdown_coordinator: Arc<ShutdownCoordinator>,
}
```
```rust
pub struct PoolConfig {
    // Existing fields...
    min_connections: usize,                    // 5 (default)
    max_connections: usize,                    // 100 (default)
    acquire_timeout: Duration,                 // 30s (default)
    idle_timeout: Duration,                    // 5 minutes (default)
    max_lifetime: Duration,                    // 1 hour (default)

    // NEW: Leak Prevention
    max_connection_lifetime: Duration,         // 4 hours (force close)
    leak_detection_enabled: bool,              // true
    leak_detection_timeout: Duration,          // 10 minutes (unreturned warning)
    leaked_connection_timeout: Duration,       // 30 minutes (force reclaim)

    // NEW: Health Monitoring
    health_check_enabled: bool,                // true
    health_check_interval: Duration,           // 30 seconds
    max_connection_age: Duration,              // 2 hours (soft limit)
    min_health_score: f64,                     // 0.5 (close if below)

    // NEW: Connection Recycling
    max_queries_per_connection: Option<usize>, // Some(10000)
    max_errors_per_connection: usize,          // 100
    error_rate_threshold: f64,                 // 0.1 (10% errors)

    // NEW: Backpressure
    queue_timeout: Duration,                   // 5 seconds
    max_queued_requests: usize,                // 1000
    queue_full_strategy: QueueFullStrategy,    // Reject or Block
}

pub enum QueueFullStrategy {
    Reject, // Return error immediately
    Block,  // Wait for queue space (with timeout)
    Evict,  // Evict lowest priority request
}
```
2.2 Connection Leak Detection
Mechanism:
```rust
pub struct ConnectionLeakDetector {
    trackers: Arc<RwLock<HashMap<Uuid, ConnectionTracker>>>,
    config: LeakDetectorConfig,
    alert_manager: Arc<AlertManager>,
}

pub struct ConnectionTracker {
    connection_id: Uuid,
    acquired_at: Instant,
    acquired_by: String,         // Thread/task ID
    stack_trace: Option<String>, // Capture location
    last_activity: Instant,
    expected_return_time: Instant,
    warning_sent: bool,
    force_reclaim_scheduled: bool,
}

impl ConnectionLeakDetector {
    /// Called when connection acquired
    pub fn track_acquisition(
        &self,
        connection_id: Uuid,
        acquired_by: String,
        expected_duration: Duration,
    ) {
        let tracker = ConnectionTracker {
            connection_id,
            acquired_at: Instant::now(),
            acquired_by,
            stack_trace: capture_stack_trace(),
            last_activity: Instant::now(),
            expected_return_time: Instant::now() + expected_duration,
            warning_sent: false,
            force_reclaim_scheduled: false,
        };

        self.trackers.write().insert(connection_id, tracker);
    }

    /// Background task: scan for leaked connections
    pub async fn detect_leaks(&self) -> Vec<LeakedConnection> {
        let now = Instant::now();
        let mut leaks = Vec::new();
        let mut trackers = self.trackers.write();

        for (id, tracker) in trackers.iter_mut() {
            let held_duration = now.duration_since(tracker.acquired_at);

            // Warn if held longer than expected
            if held_duration > self.config.leak_detection_timeout
                && !tracker.warning_sent
            {
                warn!(
                    "Potential connection leak detected: {} held for {:?} by {}",
                    id, held_duration, tracker.acquired_by
                );

                self.alert_manager.send_leak_warning(
                    *id,
                    held_duration,
                    &tracker.acquired_by,
                    tracker.stack_trace.as_deref(),
                );

                tracker.warning_sent = true;
            }

            // Force reclaim if held too long
            if held_duration > self.config.leaked_connection_timeout
                && !tracker.force_reclaim_scheduled
            {
                error!(
                    "Connection {} leaked - forcing reclamation after {:?}",
                    id, held_duration
                );

                leaks.push(LeakedConnection {
                    connection_id: *id,
                    held_duration,
                    acquired_by: tracker.acquired_by.clone(),
                    stack_trace: tracker.stack_trace.clone(),
                });

                tracker.force_reclaim_scheduled = true;
            }
        }

        leaks
    }
}
```
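Per tracked connection, the scan above reduces to a two-threshold classification on the held duration. A dependency-free sketch of that classification (enum and function names are illustrative):

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum LeakStatus {
    Healthy,
    Warn,    // held past leak_detection_timeout
    Reclaim, // held past leaked_connection_timeout
}

/// Classify a connection by how long it has been held, mirroring the
/// two thresholds checked in the detector's scan loop.
fn classify(held: Duration, warn_after: Duration, reclaim_after: Duration) -> LeakStatus {
    if held > reclaim_after {
        LeakStatus::Reclaim
    } else if held > warn_after {
        LeakStatus::Warn
    } else {
        LeakStatus::Healthy
    }
}

fn main() {
    let warn = Duration::from_secs(600);     // 10-minute default
    let reclaim = Duration::from_secs(1800); // 30-minute default
    assert_eq!(classify(Duration::from_secs(60), warn, reclaim), LeakStatus::Healthy);
    assert_eq!(classify(Duration::from_secs(900), warn, reclaim), LeakStatus::Warn);
    assert_eq!(classify(Duration::from_secs(3600), warn, reclaim), LeakStatus::Reclaim);
}
```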
```rust
impl ConnectionLeakDetector {
    /// Called when connection released normally
    pub fn track_release(&self, connection_id: Uuid) {
        self.trackers.write().remove(&connection_id);
    }
}
```
2.3 Connection Health Monitoring
```rust
pub struct ConnectionHealthMonitor {
    config: HealthConfig,
    health_scores: Arc<RwLock<HashMap<Uuid, HealthScore>>>,
}

pub struct HealthScore {
    connection_id: Uuid,
    current_score: f64, // 0.0 to 1.0
    total_queries: u64,
    successful_queries: u64,
    failed_queries: u64,
    avg_query_time_ms: f64,
    last_health_check: Instant,
    consecutive_failures: u32,
}

impl ConnectionHealthMonitor {
    /// Perform health check on connection
    pub async fn check_health(
        &self,
        connection: &Connection,
    ) -> HealthCheckResult {
        let start = Instant::now();

        // 1. Ping test
        let ping_ok = self.ping_connection(connection).await.is_ok();

        // 2. Simple query test
        let query_ok = self.test_query(connection).await.is_ok();

        // 3. Check connection state
        let state_ok = !connection.is_closed() && !connection.is_in_error_state();

        // 4. Calculate health score
        let score = self.calculate_health_score(
            connection.id,
            ping_ok,
            query_ok,
            state_ok,
        );

        let duration = start.elapsed();

        HealthCheckResult {
            healthy: score >= self.config.min_health_score,
            score,
            check_duration: duration,
            issues: self.identify_issues(ping_ok, query_ok, state_ok),
        }
    }
}
```
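The score calculation in this subsection combines three boolean probes (weighted 0.33/0.33/0.34), blends the result 70/30 with the historical success rate, and subtracts 0.1 per consecutive failure, capped at 0.5. A pure-function sketch for checking that arithmetic in isolation:

```rust
/// Pure health-score calculation: probe components (0.33/0.33/0.34),
/// blended 70/30 with the historical success rate, minus a penalty of
/// 0.1 per consecutive failure (capped at 0.5), clamped to [0, 1].
fn health_score(
    ping_ok: bool,
    query_ok: bool,
    state_ok: bool,
    success_rate: f64,
    consecutive_failures: u32,
) -> f64 {
    let base = (if ping_ok { 0.33 } else { 0.0 })
        + (if query_ok { 0.33 } else { 0.0 })
        + (if state_ok { 0.34 } else { 0.0 });
    let adjusted = base * 0.7 + success_rate * 0.3;
    let penalty = (consecutive_failures as f64 * 0.1).min(0.5);
    (adjusted - penalty).clamp(0.0, 1.0)
}

fn main() {
    // All probes pass, perfect history: full score.
    assert!((health_score(true, true, true, 1.0, 0) - 1.0).abs() < 1e-9);
    // All probes fail with good history: only the 30% history term remains.
    assert!((health_score(false, false, false, 1.0, 0) - 0.3).abs() < 1e-9);
    // Five consecutive failures apply the maximum 0.5 penalty.
    assert!((health_score(true, true, true, 1.0, 5) - 0.5).abs() < 1e-9);
}
```

One consequence worth noting: a connection whose probes all fail still scores 0.3 if its history is clean, which is below the 0.5 `min_health_score` default, so it is closed as intended.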
```rust
impl ConnectionHealthMonitor {
    fn calculate_health_score(
        &self,
        connection_id: Uuid,
        ping_ok: bool,
        query_ok: bool,
        state_ok: bool,
    ) -> f64 {
        let mut scores = self.health_scores.write();
        let health = scores.entry(connection_id)
            .or_insert_with(|| HealthScore::new(connection_id));

        // Component scores
        let ping_score = if ping_ok { 0.33 } else { 0.0 };
        let query_score = if query_ok { 0.33 } else { 0.0 };
        let state_score = if state_ok { 0.34 } else { 0.0 };

        // Success rate score
        let success_rate = if health.total_queries > 0 {
            health.successful_queries as f64 / health.total_queries as f64
        } else {
            1.0
        };

        // Weighted combination
        let base_score = ping_score + query_score + state_score;
        let adjusted_score = base_score * 0.7 + success_rate * 0.3;

        // Penalize consecutive failures
        let penalty = (health.consecutive_failures as f64 * 0.1).min(0.5);

        health.current_score = (adjusted_score - penalty).max(0.0).min(1.0);
        health.last_health_check = Instant::now();

        health.current_score
    }
}
```
2.4 Connection Lifecycle Management
```rust
pub struct ConnectionLifecycleManager {
    config: LifecycleConfig,
    metrics: Arc<LifecycleMetrics>,
}

impl ConnectionLifecycleManager {
    /// Determine if connection should be recycled
    pub fn should_recycle(
        &self,
        connection: &Connection,
    ) -> RecycleDecision {
        // Check age limits
        if connection.age() > self.config.max_connection_lifetime {
            return RecycleDecision::Recycle {
                reason: RecycleReason::MaxLifetimeExceeded,
            };
        }

        // Check query count
        if let Some(max_queries) = self.config.max_queries_per_connection {
            if connection.stats.query_count.load(Ordering::Relaxed) >= max_queries as u64 {
                return RecycleDecision::Recycle {
                    reason: RecycleReason::MaxQueriesExceeded,
                };
            }
        }

        // Check error rate (health score of 1.0 means error-free)
        let error_rate = connection.health_score();
        if error_rate < 1.0 - self.config.error_rate_threshold {
            return RecycleDecision::Recycle {
                reason: RecycleReason::HighErrorRate(error_rate),
            };
        }

        // Check idle time
        if connection.idle_duration() > self.config.max_idle_time {
            return RecycleDecision::Recycle {
                reason: RecycleReason::IdleTimeout,
            };
        }

        RecycleDecision::Keep
    }
}
```
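The recycle checks can be expressed as a pure decision function over a connection's vital statistics, evaluated in the same order as the spec: lifetime, query count, error rate, idle time. A sketch with illustrative names:

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Recycle {
    Keep,
    MaxLifetime,
    MaxQueries,
    HighErrorRate,
    Idle,
}

/// Pure version of the recycle checks, evaluated in the same order
/// as `should_recycle`: lifetime, query count, error rate, idle time.
fn decide(
    age: Duration,
    max_lifetime: Duration,
    queries: u64,
    max_queries: Option<u64>,
    health: f64,          // 1.0 = error-free
    error_threshold: f64, // e.g. 0.1
    idle: Duration,
    max_idle: Duration,
) -> Recycle {
    if age > max_lifetime {
        return Recycle::MaxLifetime;
    }
    if let Some(max) = max_queries {
        if queries >= max {
            return Recycle::MaxQueries;
        }
    }
    if health < 1.0 - error_threshold {
        return Recycle::HighErrorRate;
    }
    if idle > max_idle {
        return Recycle::Idle;
    }
    Recycle::Keep
}

fn main() {
    let hour = Duration::from_secs(3600);
    assert_eq!(decide(hour, 4 * hour, 10, Some(10_000), 0.99, 0.1, hour / 60, 5 * hour), Recycle::Keep);
    assert_eq!(decide(5 * hour, 4 * hour, 0, None, 1.0, 0.1, Duration::ZERO, hour), Recycle::MaxLifetime);
    assert_eq!(decide(hour, 4 * hour, 10_000, Some(10_000), 1.0, 0.1, Duration::ZERO, hour), Recycle::MaxQueries);
    assert_eq!(decide(hour, 4 * hour, 0, None, 0.8, 0.1, Duration::ZERO, hour), Recycle::HighErrorRate);
}
```

Keeping the decision pure makes the recycling policy unit-testable without a live connection, which Phase 5's isolation tests rely on.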
```rust
impl ConnectionLifecycleManager {
    /// Gracefully close connection
    pub async fn close_connection(
        &self,
        connection: Connection,
        reason: RecycleReason,
    ) -> Result<(), PoolError> {
        info!("Closing connection {} due to {:?}", connection.id, reason);

        // 1. Mark as closing
        connection.set_state(ConnectionState::Closing);

        // 2. Wait for pending operations (with timeout)
        if connection.is_in_transaction() {
            warn!(
                "Connection {} still in transaction during close, rolling back",
                connection.id
            );

            let _ = timeout(Duration::from_secs(5), connection.rollback()).await;
        }

        // 3. Close network connection
        let _ = timeout(Duration::from_secs(5), connection.shutdown()).await;

        // 4. Mark as closed
        connection.set_state(ConnectionState::Closed);

        // 5. Update metrics
        self.metrics.record_connection_closed(reason);

        Ok(())
    }
}
```
3. Unified Timeout Framework
3.1 Design Specification
```rust
pub struct TimeoutFramework {
    config: TimeoutConfig,
    timeout_registry: Arc<RwLock<HashMap<String, TimeoutHandle>>>,
    cancellation_manager: Arc<CancellationManager>,
}

pub struct TimeoutConfig {
    // Query Timeouts
    pub default_query_timeout: Duration,       // 30s
    pub long_query_timeout: Duration,          // 5 minutes
    pub admin_query_timeout: Option<Duration>, // None (unlimited)
    pub ddl_timeout: Duration,                 // 2 minutes
    pub background_query_timeout: Duration,    // 1 hour

    // I/O Timeouts
    pub file_read_timeout: Duration,           // 10s
    pub file_write_timeout: Duration,          // 30s
    pub network_read_timeout: Duration,        // 5s
    pub network_write_timeout: Duration,       // 5s
    pub network_connect_timeout: Duration,     // 10s

    // Lock Timeouts
    pub lock_timeout: Duration,                // 1s
    pub transaction_timeout: Duration,         // 10 minutes
    pub exclusive_lock_timeout: Duration,      // 30s
    pub shared_lock_timeout: Duration,         // 5s

    // Connection Timeouts
    pub connection_acquire_timeout: Duration,    // 30s
    pub connection_idle_timeout: Duration,       // 5 minutes
    pub connection_validation_timeout: Duration, // 3s

    // Background Task Timeouts
    pub background_task_timeout: Duration,     // 1 hour
    pub health_check_timeout: Duration,        // 10s
    pub gc_timeout: Duration,                  // 30 minutes

    // Graceful Shutdown Timeouts
    pub graceful_shutdown_timeout: Duration,   // 30s
    pub force_shutdown_timeout: Duration,      // 10s
}

impl Default for TimeoutConfig {
    fn default() -> Self {
        Self {
            default_query_timeout: Duration::from_secs(30),
            long_query_timeout: Duration::from_secs(300),
            admin_query_timeout: None,
            ddl_timeout: Duration::from_secs(120),
            background_query_timeout: Duration::from_secs(3600),

            file_read_timeout: Duration::from_secs(10),
            file_write_timeout: Duration::from_secs(30),
            network_read_timeout: Duration::from_secs(5),
            network_write_timeout: Duration::from_secs(5),
            network_connect_timeout: Duration::from_secs(10),

            lock_timeout: Duration::from_secs(1),
            transaction_timeout: Duration::from_secs(600),
            exclusive_lock_timeout: Duration::from_secs(30),
            shared_lock_timeout: Duration::from_secs(5),

            connection_acquire_timeout: Duration::from_secs(30),
            connection_idle_timeout: Duration::from_secs(300),
            connection_validation_timeout: Duration::from_secs(3),

            background_task_timeout: Duration::from_secs(3600),
            health_check_timeout: Duration::from_secs(10),
            gc_timeout: Duration::from_secs(1800),

            graceful_shutdown_timeout: Duration::from_secs(30),
            force_shutdown_timeout: Duration::from_secs(10),
        }
    }
}
```
3.2 Timeout Enforcement Trait
```rust
#[async_trait]
pub trait TimeoutEnforcer {
    /// Execute operation with timeout
    async fn with_timeout<F, T>(
        &self,
        duration: Duration,
        operation: F,
    ) -> Result<T, TimeoutError>
    where
        F: Future<Output = T> + Send,
        T: Send;

    /// Execute with cancellation support
    async fn with_cancellation<F, T>(
        &self,
        operation: F,
    ) -> (JoinHandle<T>, CancellationToken)
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Execute with timeout and cancellation
    async fn with_timeout_and_cancellation<F, T>(
        &self,
        duration: Duration,
        operation: F,
    ) -> Result<T, TimeoutError>
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;
}

pub struct DefaultTimeoutEnforcer {
    framework: Arc<TimeoutFramework>,
}

#[async_trait]
impl TimeoutEnforcer for DefaultTimeoutEnforcer {
    async fn with_timeout<F, T>(
        &self,
        duration: Duration,
        operation: F,
    ) -> Result<T, TimeoutError>
    where
        F: Future<Output = T> + Send,
        T: Send,
    {
        match timeout(duration, operation).await {
            Ok(result) => Ok(result),
            Err(_) => {
                error!("Operation timed out after {:?}", duration);
                Err(TimeoutError::Exceeded(duration))
            }
        }
    }

    async fn with_cancellation<F, T>(
        &self,
        operation: F,
    ) -> (JoinHandle<T>, CancellationToken)
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let token = CancellationToken::new();
        let token_clone = token.clone();

        let handle = tokio::spawn(async move {
            tokio::select! {
                result = operation => result,
                _ = token_clone.cancelled() => {
                    // Unwinds the task; the caller observes cancellation
                    // as a JoinError on the returned handle.
                    panic!("Operation cancelled");
                }
            }
        });

        (handle, token)
    }

    async fn with_timeout_and_cancellation<F, T>(
        &self,
        duration: Duration,
        operation: F,
    ) -> Result<T, TimeoutError>
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let token = CancellationToken::new();
        let token_clone = token.clone();

        let result = timeout(duration, async move {
            tokio::select! {
                result = operation => result,
                _ = token_clone.cancelled() => {
                    // Only reachable if the token is cancelled
                    // externally before the deadline.
                    panic!("Operation cancelled");
                }
            }
        })
        .await;

        match result {
            Ok(value) => Ok(value),
            Err(_) => {
                token.cancel();
                Err(TimeoutError::Exceeded(duration))
            }
        }
    }
}
```
3.3 Timeout Context
```rust
/// Provides timeout context for nested operations
pub struct TimeoutContext {
    operation_id: Uuid,
    operation_type: OperationType,
    started_at: Instant,
    deadline: Instant,
    timeout_config: TimeoutConfig,
    parent_context: Option<Arc<TimeoutContext>>,
}

impl TimeoutContext {
    pub fn new(
        operation_type: OperationType,
        timeout_config: TimeoutConfig,
    ) -> Self {
        let now = Instant::now();
        let timeout = Self::get_timeout_for_operation(&operation_type, &timeout_config);

        Self {
            operation_id: Uuid::new_v4(),
            operation_type,
            started_at: now,
            deadline: now + timeout,
            timeout_config,
            parent_context: None,
        }
    }

    pub fn with_parent(mut self, parent: Arc<TimeoutContext>) -> Self {
        // Use the more restrictive deadline
        if parent.deadline < self.deadline {
            self.deadline = parent.deadline;
        }
        self.parent_context = Some(parent);
        self
    }

    pub fn remaining_time(&self) -> Duration {
        self.deadline.saturating_duration_since(Instant::now())
    }

    pub fn is_expired(&self) -> bool {
        Instant::now() >= self.deadline
    }

    pub fn elapsed(&self) -> Duration {
        self.started_at.elapsed()
    }

    fn get_timeout_for_operation(
        op_type: &OperationType,
        config: &TimeoutConfig,
    ) -> Duration {
        match op_type {
            OperationType::Query => config.default_query_timeout,
            OperationType::LongQuery => config.long_query_timeout,
            OperationType::DDL => config.ddl_timeout,
            OperationType::Lock => config.lock_timeout,
            OperationType::Transaction => config.transaction_timeout,
            OperationType::FileIO => config.file_read_timeout,
            OperationType::NetworkIO => config.network_read_timeout,
            OperationType::BackgroundTask => config.background_task_timeout,
        }
    }
}
```
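The key property of `with_parent` is that a child context can tighten, but never extend, its parent's deadline. A std-only sketch of that propagation rule:

```rust
use std::time::{Duration, Instant};

/// A child context inherits the earlier (more restrictive) of its own
/// deadline and its parent's, mirroring `with_parent` above.
fn effective_deadline(own: Instant, parent: Option<Instant>) -> Instant {
    match parent {
        Some(p) if p < own => p,
        _ => own,
    }
}

fn main() {
    let now = Instant::now();
    let parent = now + Duration::from_secs(1);  // transaction nearly out of time
    let child = now + Duration::from_secs(30);  // query's own default budget

    // The child is clamped to the parent's earlier deadline.
    assert_eq!(effective_deadline(child, Some(parent)), parent);
    // Without a parent, the child keeps its own deadline.
    assert_eq!(effective_deadline(child, None), child);
    // A parent with a *later* deadline does not extend the child.
    assert_eq!(effective_deadline(parent, Some(child)), parent);
}
```

This is why a 30-second query nested inside a transaction that has one second left still fails within one second rather than overrunning the transaction's budget.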
```rust
pub enum OperationType {
    Query,
    LongQuery,
    DDL,
    Lock,
    Transaction,
    FileIO,
    NetworkIO,
    BackgroundTask,
}
```
4. Resource Limit Enforcement
4.1 Design Specification
Existing Implementation: heliosdb-storage/src/resource_limits.rs
Enhancements Required:
```rust
pub struct ResourceLimitEnforcer {
    config: Arc<RwLock<ResourceLimitsConfig>>,
    manager: Arc<ResourceLimitManager>,

    // NEW: Per-query tracking
    query_tracker: Arc<QueryResourceTracker>,

    // NEW: Per-user tracking
    user_tracker: Arc<UserResourceTracker>,

    // NEW: Global pressure detection
    pressure_detector: Arc<ResourcePressureDetector>,

    // NEW: Enforcement policies
    enforcement_policy: Arc<EnforcementPolicy>,

    // NEW: Background cleanup
    cleanup_scheduler: Arc<CleanupScheduler>,
}

pub struct ResourceLimitsConfig {
    // Connection Limits
    pub max_client_connections: usize,       // 10000
    pub max_connections_per_user: usize,     // 100
    pub max_connections_per_database: usize, // 500

    // File Handle Limits
    pub max_open_files: usize,               // 100000
    pub max_temp_files: usize,               // 10000
    pub max_open_files_per_query: usize,     // 100

    // Memory Limits
    pub max_memory_per_query_mb: usize,      // 1024 (1GB)
    pub max_memory_per_connection_mb: usize, // 512MB
    pub max_cache_size_mb: usize,            // 4096 (4GB)
    pub max_total_memory_mb: usize,          // 32768 (32GB)

    // Thread Limits
    pub max_query_threads: usize,            // 100
    pub max_background_threads: usize,       // 20
    pub max_connection_threads: usize,       // 50

    // Query Limits
    pub max_concurrent_queries: usize,       // 1000
    pub max_queries_per_user: usize,         // 50
    pub max_query_result_size_mb: usize,     // 100

    // Transaction Limits
    pub max_concurrent_transactions: usize,  // 500
    pub max_transaction_size_mb: usize,      // 512
    pub max_locks_per_transaction: usize,    // 10000

    // Pressure Thresholds
    pub memory_pressure_threshold: f64,      // 0.85 (85%)
    pub connection_pressure_threshold: f64,  // 0.90 (90%)
    pub cpu_pressure_threshold: f64,         // 0.95 (95%)

    // Enforcement Actions
    pub reject_on_pressure: bool,            // true
    pub queue_on_pressure: bool,             // false
    pub evict_on_pressure: bool,             // true
}
```
4.2 Query Resource Tracking
```rust
pub struct QueryResourceTracker {
    active_queries: Arc<RwLock<HashMap<QueryId, QueryResources>>>,
    config: Arc<RwLock<ResourceLimitsConfig>>,
}

pub struct QueryResources {
    query_id: QueryId,
    user: String,
    started_at: Instant,

    // Resource allocations
    memory_allocated_mb: AtomicUsize,
    open_files: AtomicUsize,
    threads_allocated: AtomicUsize,
    locks_held: AtomicUsize,

    // Limits
    memory_limit_mb: usize,
    file_limit: usize,
    thread_limit: usize,

    // Cleanup
    cleanup_registered: AtomicBool,
}

impl QueryResourceTracker {
    /// Allocate memory for query
    pub fn allocate_memory(
        &self,
        query_id: QueryId,
        amount_mb: usize,
    ) -> Result<(), ResourceLimitError> {
        let queries = self.active_queries.read();
        let query = queries.get(&query_id)
            .ok_or(ResourceLimitError::QueryNotFound(query_id))?;

        let current = query.memory_allocated_mb.load(Ordering::Acquire);
        let new_total = current + amount_mb;

        if new_total > query.memory_limit_mb {
            return Err(ResourceLimitError::QueryMemoryLimitExceeded {
                current: new_total,
                max: query.memory_limit_mb,
            });
        }

        // Check global limit
        let config = self.config.read();
        let global_usage = self.get_total_memory_usage();

        if global_usage + amount_mb > config.max_total_memory_mb {
            return Err(ResourceLimitError::TotalMemoryLimitExceeded {
                current: global_usage + amount_mb,
                max: config.max_total_memory_mb,
            });
        }

        query.memory_allocated_mb.fetch_add(amount_mb, Ordering::Release);

        // Register for cleanup if not already
        if !query.cleanup_registered.load(Ordering::Acquire) {
            self.register_cleanup(query_id);
            query.cleanup_registered.store(true, Ordering::Release);
        }

        Ok(())
    }
}
```
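`allocate_memory` above is a two-level admission check: the request must fit both the per-query budget and the global budget. A pure sketch of that rule, using the default limits from §4.1:

```rust
/// Pure version of the two-level admission check in `allocate_memory`:
/// an allocation must fit both the per-query and the global budget.
fn can_allocate(
    query_used_mb: usize,
    query_limit_mb: usize,
    global_used_mb: usize,
    global_limit_mb: usize,
    request_mb: usize,
) -> Result<(), &'static str> {
    if query_used_mb + request_mb > query_limit_mb {
        return Err("query memory limit exceeded");
    }
    if global_used_mb + request_mb > global_limit_mb {
        return Err("total memory limit exceeded");
    }
    Ok(())
}

fn main() {
    // Fits both budgets (defaults: 1024 MB per query, 32768 MB total).
    assert!(can_allocate(512, 1024, 10_000, 32_768, 256).is_ok());
    // Blows the per-query budget even though global space remains.
    assert_eq!(can_allocate(900, 1024, 10_000, 32_768, 256),
               Err("query memory limit exceeded"));
    // Blows the global budget even though the query budget allows it.
    assert_eq!(can_allocate(0, 1024, 32_700, 32_768, 256),
               Err("total memory limit exceeded"));
}
```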
```rust
impl QueryResourceTracker {
    /// Release all resources for query
    pub async fn release_query_resources(
        &self,
        query_id: QueryId,
    ) -> Result<(), ResourceLimitError> {
        let mut queries = self.active_queries.write();

        if let Some(query) = queries.remove(&query_id) {
            let memory_freed = query.memory_allocated_mb.load(Ordering::Acquire);
            let files_closed = query.open_files.load(Ordering::Acquire);

            info!(
                "Released resources for query {}: {}MB memory, {} files",
                query_id, memory_freed, files_closed
            );

            // Resources automatically released via Drop implementations
        }

        Ok(())
    }

    /// Emergency cleanup - force release all resources
    pub async fn emergency_cleanup(&self, query_id: QueryId) {
        warn!("Emergency cleanup for query {}", query_id);

        let _ = self.release_query_resources(query_id).await;

        // Additional forceful cleanup if needed:
        // - Cancel background tasks
        // - Force close files
        // - Release locks
    }
}
```
4.3 Resource Pressure Detection
```rust
pub struct ResourcePressureDetector {
    config: Arc<RwLock<ResourceLimitsConfig>>,
    metrics: Arc<ResourceMetrics>,
    alert_manager: Arc<AlertManager>,
}

// PartialOrd/Ord are required so levels can be compared and the worst
// individual level selected with `max()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PressureLevel {
    Normal,   // < 70% utilization
    Elevated, // 70-85% utilization
    High,     // 85-95% utilization
    Critical, // > 95% utilization
}

pub struct PressureStatus {
    pub memory: PressureLevel,
    pub connections: PressureLevel,
    pub files: PressureLevel,
    pub cpu: PressureLevel,
    pub overall: PressureLevel,
}

impl ResourcePressureDetector {
    /// Detect current resource pressure
    pub fn detect_pressure(&self) -> PressureStatus {
        let config = self.config.read();
        let metrics = self.metrics.snapshot();

        let memory = self.calculate_pressure(
            metrics.memory_used_mb,
            config.max_total_memory_mb,
            config.memory_pressure_threshold,
        );

        let connections = self.calculate_pressure(
            metrics.active_connections,
            config.max_client_connections,
            config.connection_pressure_threshold,
        );

        let files = self.calculate_pressure(
            metrics.open_files,
            config.max_open_files,
            0.90,
        );

        let cpu = self.calculate_pressure_f64(
            metrics.cpu_utilization,
            1.0,
            config.cpu_pressure_threshold,
        );

        let overall = [memory, connections, files, cpu]
            .iter()
            .max()
            .copied()
            .unwrap_or(PressureLevel::Normal);

        let status = PressureStatus { memory, connections, files, cpu, overall };

        // Alert on pressure changes
        if overall >= PressureLevel::High {
            self.alert_manager.send_pressure_alert(&status);
        }

        status
    }
}
```
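The utilization-to-level mapping used by the detector can be checked in isolation. This sketch duplicates the fixed 0.70 and 0.95 bands plus the configurable High threshold:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum PressureLevel {
    Normal,
    Elevated,
    High,
    Critical,
}

/// Map a utilization ratio to a pressure level using the fixed 0.70 and
/// 0.95 bands plus the configurable High threshold, as in the detector.
fn pressure_level(utilization: f64, threshold: f64) -> PressureLevel {
    match utilization {
        x if x >= 0.95 => PressureLevel::Critical,
        x if x >= threshold => PressureLevel::High,
        x if x >= 0.70 => PressureLevel::Elevated,
        _ => PressureLevel::Normal,
    }
}

fn main() {
    let memory_threshold = 0.85; // memory_pressure_threshold default
    assert_eq!(pressure_level(0.50, memory_threshold), PressureLevel::Normal);
    assert_eq!(pressure_level(0.75, memory_threshold), PressureLevel::Elevated);
    assert_eq!(pressure_level(0.90, memory_threshold), PressureLevel::High);
    assert_eq!(pressure_level(0.99, memory_threshold), PressureLevel::Critical);

    // The worst individual level drives the overall status.
    let overall = [PressureLevel::Normal, PressureLevel::High].into_iter().max().unwrap();
    assert_eq!(overall, PressureLevel::High);
}
```

Note that deriving `Ord` from declaration order (Normal < Elevated < High < Critical) is what makes the `max()`-based overall status correct.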
```rust
impl ResourcePressureDetector {
    fn calculate_pressure(
        &self,
        current: usize,
        max: usize,
        threshold: f64,
    ) -> PressureLevel {
        let utilization = current as f64 / max as f64;
        self.calculate_pressure_f64(utilization, 1.0, threshold)
    }

    fn calculate_pressure_f64(
        &self,
        current: f64,
        max: f64,
        threshold: f64,
    ) -> PressureLevel {
        let utilization = current / max;

        match utilization {
            x if x >= 0.95 => PressureLevel::Critical,
            x if x >= threshold => PressureLevel::High,
            x if x >= 0.70 => PressureLevel::Elevated,
            _ => PressureLevel::Normal,
        }
    }
}
```
5. Circuit Breaker Integration
5.1 Connection Pool Circuit Breaker
Existing Implementation: heliosdb-circuit-breaker/src/breaker.rs
Integration:
```rust
impl AdaptiveConnectionPool {
    /// Acquire connection with circuit breaker protection
    pub async fn acquire_with_protection(
        &self,
    ) -> Result<Connection, PoolError> {
        // Use circuit breaker to protect pool
        self.circuit_breaker
            .call_with_timeout(
                || async { self.acquire_internal().await },
                self.config.acquire_timeout,
            )
            .await
            .map_err(|e| match e {
                CircuitBreakerError::CircuitOpen(_) => PoolError::CircuitOpen {
                    message: "Connection pool circuit breaker is open".to_string(),
                },
                CircuitBreakerError::Timeout(ms) => PoolError::Timeout { timeout_ms: ms },
                _ => PoolError::Other {
                    message: e.to_string(),
                },
            })
    }

    async fn acquire_internal(&self) -> Result<Connection, CircuitBreakerError> {
        // Existing acquire logic
        match self.try_acquire().await {
            Ok(conn) => Ok(conn),
            Err(e) => {
                // Map pool errors to circuit breaker errors
                Err(CircuitBreakerError::OperationFailed(e.to_string()))
            }
        }
    }
}
```
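The breaker semantics assumed here (open after a run of failures, close again after enough successes while half-open) can be sketched without the real `heliosdb-circuit-breaker` crate. This toy models the Open-to-HalfOpen cooldown as an explicit call rather than a clock, and all names are illustrative:

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum State {
    Closed,
    Open,
    HalfOpen,
}

/// Toy circuit breaker: opens after `failure_threshold` consecutive
/// failures, closes again after `success_threshold` consecutive
/// successes while half-open.
struct Breaker {
    state: State,
    failures: u32,
    successes: u32,
    failure_threshold: u32,
    success_threshold: u32,
}

impl Breaker {
    fn new(failure_threshold: u32, success_threshold: u32) -> Self {
        Breaker { state: State::Closed, failures: 0, successes: 0, failure_threshold, success_threshold }
    }

    fn record(&mut self, success: bool) {
        match (self.state, success) {
            (State::Closed, false) => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.state = State::Open;
                }
            }
            (State::Closed, true) => self.failures = 0,
            (State::HalfOpen, true) => {
                self.successes += 1;
                if self.successes >= self.success_threshold {
                    self.state = State::Closed;
                    self.failures = 0;
                }
            }
            (State::HalfOpen, false) => {
                self.state = State::Open;
                self.successes = 0;
            }
            (State::Open, _) => {} // calls are rejected while open
        }
    }

    /// Stand-in for the `timeout_duration` cooldown elapsing.
    fn cooldown_elapsed(&mut self) {
        if self.state == State::Open {
            self.state = State::HalfOpen;
            self.successes = 0;
        }
    }
}

fn main() {
    let mut b = Breaker::new(3, 2);
    for _ in 0..3 { b.record(false); }
    assert_eq!(b.state, State::Open);
    b.cooldown_elapsed();
    assert_eq!(b.state, State::HalfOpen);
    b.record(true);
    b.record(true);
    assert_eq!(b.state, State::Closed);
}
```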
```rust
/// Circuit breaker configuration for connection pool
pub fn create_pool_circuit_breaker() -> CircuitBreaker {
    CircuitBreaker::new(
        "connection_pool",
        CircuitBreakerConfig {
            failure_threshold: 10,                     // Open after 10 failures
            success_threshold: 5,                      // Close after 5 successes
            timeout_duration: Duration::from_secs(30), // Try again after 30s
            half_open_max_calls: 3,                    // Allow 3 test calls
        },
    )
}
```
6. Graceful Degradation
6.1 Degradation Strategies
```rust
pub struct GracefulDegradationManager {
    pressure_detector: Arc<ResourcePressureDetector>,
    config: DegradationConfig,
    active_degradations: Arc<RwLock<HashSet<DegradationType>>>,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum DegradationType {
    // Connection management
    ReduceConnectionPool,
    RejectNewConnections,
    CloseIdleConnections,

    // Query management
    RejectComplexQueries,
    LimitQueryConcurrency,
    ReduceQueryTimeout,
    DisableQueryCache,

    // Feature degradation
    DisableAnalytics,
    DisableBackgroundTasks,
    ReadOnlyMode,

    // Resource limits
    ReduceMemoryLimits,
    ReduceFileLimits,
    ForceGarbageCollection,
}

impl GracefulDegradationManager {
    /// Activate degradation based on pressure
    pub async fn handle_pressure(&self, pressure: PressureStatus) {
        match pressure.overall {
            PressureLevel::Normal => {
                self.clear_all_degradations().await;
            }
            PressureLevel::Elevated => {
                self.activate_degradation(DegradationType::CloseIdleConnections).await;
                self.activate_degradation(DegradationType::DisableAnalytics).await;
            }
            PressureLevel::High => {
                self.activate_degradation(DegradationType::ReduceConnectionPool).await;
                self.activate_degradation(DegradationType::LimitQueryConcurrency).await;
                self.activate_degradation(DegradationType::DisableBackgroundTasks).await;
                self.activate_degradation(DegradationType::ForceGarbageCollection).await;
            }
            PressureLevel::Critical => {
                self.activate_degradation(DegradationType::RejectNewConnections).await;
                self.activate_degradation(DegradationType::RejectComplexQueries).await;
                self.activate_degradation(DegradationType::ReadOnlyMode).await;
                self.activate_degradation(DegradationType::ReduceMemoryLimits).await;
            }
        }
    }
}
```
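Each degradation should be applied at most once even if the same pressure level is reported repeatedly. Tracking active degradations in a `HashSet` gives this for free, since `insert` returns `false` for an entry that is already present. A minimal sketch:

```rust
use std::collections::HashSet;

/// Records a degradation and returns true only on first activation,
/// mirroring the HashSet-based idempotency used by the manager.
fn activate(active: &mut HashSet<&'static str>, degradation: &'static str) -> bool {
    active.insert(degradation)
}

fn main() {
    let mut active = HashSet::new();
    assert!(activate(&mut active, "CloseIdleConnections"));  // applied
    assert!(!activate(&mut active, "CloseIdleConnections")); // already active, skipped
    assert_eq!(active.len(), 1);
}
```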
```rust
impl GracefulDegradationManager {
    async fn activate_degradation(&self, degradation: DegradationType) {
        let mut active = self.active_degradations.write();

        if active.insert(degradation.clone()) {
            warn!("Activating degradation: {:?}", degradation);
            self.apply_degradation(&degradation).await;
        }
    }

    async fn apply_degradation(&self, degradation: &DegradationType) {
        match degradation {
            DegradationType::ReduceConnectionPool => {
                // Reduce max connections by 50%
                // Close excess idle connections
            }
            DegradationType::RejectNewConnections => {
                // Return error for new connection attempts
            }
            DegradationType::CloseIdleConnections => {
                // Aggressively close idle connections
            }
            DegradationType::LimitQueryConcurrency => {
                // Reduce max concurrent queries
            }
            DegradationType::ForceGarbageCollection => {
                // Trigger immediate GC
            }
            _ => {}
        }
    }
}
```
6.2 Backpressure Mechanism
```rust
pub struct BackpressureManager {
    queue: Arc<RwLock<VecDeque<QueuedRequest>>>,
    config: BackpressureConfig,
    pressure_detector: Arc<ResourcePressureDetector>,
}

pub struct QueuedRequest {
    id: Uuid,
    priority: Priority,
    queued_at: Instant,
    deadline: Instant,
    request_type: RequestType,
    sender: oneshot::Sender<Result<Connection, PoolError>>,
}

impl BackpressureManager {
    /// Handle new request with backpressure
    pub async fn handle_request(
        &self,
        request: ConnectionRequest,
    ) -> Result<Connection, PoolError> {
        let pressure = self.pressure_detector.detect_pressure();

        match pressure.overall {
            PressureLevel::Normal | PressureLevel::Elevated => {
                // Proceed normally
                self.process_request(request).await
            }
            PressureLevel::High => {
                // Queue low-priority requests
                if request.priority <= Priority::Low {
                    self.queue_request(request).await
                } else {
                    self.process_request(request).await
                }
            }
            PressureLevel::Critical => {
                // Reject all but critical requests
                if request.priority >= Priority::Critical {
                    self.process_request(request).await
                } else {
                    Err(PoolError::ResourcePressure {
                        level: pressure.overall,
                    })
                }
            }
        }
    }
}
```
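At its core, `handle_request` is a pure admission rule over the pair (pressure level, request priority). A sketch that makes the rule testable; the enum variants are illustrative, and their ordering derives from declaration order:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Priority { Low, Normal, High, Critical }

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Pressure { Normal, Elevated, High, Critical }

#[derive(Debug, PartialEq)]
enum Admission { Process, Queue, Reject }

/// Pure admission rule from `handle_request`: under High pressure only
/// low-priority requests are queued; under Critical pressure everything
/// below Critical priority is rejected.
fn admit(pressure: Pressure, priority: Priority) -> Admission {
    match pressure {
        Pressure::Normal | Pressure::Elevated => Admission::Process,
        Pressure::High => {
            if priority <= Priority::Low { Admission::Queue } else { Admission::Process }
        }
        Pressure::Critical => {
            if priority >= Priority::Critical { Admission::Process } else { Admission::Reject }
        }
    }
}

fn main() {
    assert_eq!(admit(Pressure::Normal, Priority::Low), Admission::Process);
    assert_eq!(admit(Pressure::High, Priority::Low), Admission::Queue);
    assert_eq!(admit(Pressure::High, Priority::Normal), Admission::Process);
    assert_eq!(admit(Pressure::Critical, Priority::High), Admission::Reject);
    assert_eq!(admit(Pressure::Critical, Priority::Critical), Admission::Process);
}
```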
```rust
impl BackpressureManager {
    async fn queue_request(
        &self,
        request: ConnectionRequest,
    ) -> Result<Connection, PoolError> {
        let mut queue = self.queue.write();

        // Check queue size
        if queue.len() >= self.config.max_queue_size {
            // Evict lowest priority requests
            queue.retain(|r| r.priority > Priority::Low);

            if queue.len() >= self.config.max_queue_size {
                return Err(PoolError::QueueFull);
            }
        }

        let (tx, rx) = oneshot::channel();

        let queued = QueuedRequest {
            id: Uuid::new_v4(),
            priority: request.priority,
            queued_at: Instant::now(),
            deadline: Instant::now() + self.config.queue_timeout,
            request_type: request.request_type,
            sender: tx,
        };

        queue.push_back(queued);
        drop(queue); // release the queue lock before waiting

        // Wait for response
        match timeout(self.config.queue_timeout, rx).await {
            Ok(Ok(result)) => result,
            Ok(Err(_)) => Err(PoolError::QueuedRequestCancelled),
            Err(_) => Err(PoolError::QueueTimeout),
        }
    }
}
```
7. Monitoring and Alerting
7.1 Metrics Collection
```rust
pub struct ResourceLeakMetrics {
    // Connection metrics
    pub connections_acquired: Counter,
    pub connections_released: Counter,
    pub connections_leaked: Counter,
    pub connection_lifetime_ms: Histogram,

    // Leak detection metrics
    pub leak_warnings: Counter,
    pub leak_forced_reclaims: Counter,
    pub leak_detection_runs: Counter,

    // Timeout metrics
    pub operation_timeouts: Counter,
    pub timeout_cancellations: Counter,

    // Resource limit metrics
    pub resource_limit_violations: Counter,
    pub memory_allocations_rejected: Counter,
    pub connection_requests_rejected: Counter,

    // Pressure metrics
    pub memory_pressure_events: Counter,
    pub connection_pressure_events: Counter,
    pub degradation_activations: Counter,
}
```
7.2 Alert Definitions
```rust
pub enum Alert {
    ConnectionLeak {
        connection_id: Uuid,
        held_duration: Duration,
        acquired_by: String,
        stack_trace: Option<String>,
    },

    ResourcePressure {
        resource: ResourceType,
        level: PressureLevel,
        current: usize,
        max: usize,
    },

    CircuitBreakerOpen {
        circuit: String,
        failure_count: usize,
    },

    MassiveLeakDetected {
        leaked_count: usize,
        total_connections: usize,
    },

    ResourceLimitExceeded {
        limit_type: String,
        user: String,
        current: usize,
        max: usize,
    },
}
```
8. Implementation Phases
Phase 1: Core Infrastructure (Days 6-7)
- Connection Leak Detection
  - Implement `ConnectionLeakDetector`
  - Add tracking to pool acquire/release
  - Background leak scanner
  - Alert integration
- Timeout Framework Foundation
  - Implement `TimeoutConfig`
  - Create `TimeoutEnforcer` trait
  - Add `TimeoutContext` for nested operations
Phase 2: Health & Lifecycle (Days 8-9)
- Connection Health Monitoring
  - Implement `ConnectionHealthMonitor`
  - Health score calculation
  - Integration with pool health checks
- Connection Lifecycle Management
  - Implement `ConnectionLifecycleManager`
  - Recycling decision logic
  - Graceful close procedures
Phase 3: Resource Enforcement (Days 10-11)
- Query Resource Tracking
  - Implement `QueryResourceTracker`
  - Per-query memory tracking
  - Emergency cleanup procedures
- Resource Pressure Detection
  - Implement `ResourcePressureDetector`
  - Multi-resource pressure calculation
  - Alert integration
Phase 4: Degradation & Recovery (Days 12-13)
- Graceful Degradation
  - Implement `GracefulDegradationManager`
  - Degradation strategies
  - Automatic recovery
- Backpressure Management
  - Implement `BackpressureManager`
  - Request queueing
  - Priority-based handling
Phase 5: Testing & Validation (Days 14-15)
- Unit Tests
  - Test each component in isolation
  - Mock dependencies
- Integration Tests
  - End-to-end leak detection
  - Timeout enforcement
  - Pressure handling
- Stress Tests
  - Leak simulation
  - Resource exhaustion
  - Recovery validation
9. Success Criteria
- Zero Leaks: No resource leaks under normal or abnormal conditions
- Timeout Compliance: All operations respect configured timeouts
- Limit Enforcement: All resource limits strictly enforced
- Graceful Degradation: System remains functional under pressure
- Observable: All resource usage visible in metrics
- Recoverable: System auto-recovers from pressure conditions
10. Operational Runbooks
10.1 Responding to Connection Leaks
1. Identify leaked connections from alerts
2. Review stack traces to find acquisition point
3. Check if leak is in user code or system code
4. If system: Emergency patch + forced reclamation
5. If user: Documentation update + client notification
6. Monitor for recurrence

10.2 Handling Resource Pressure
1. Check pressure detector for affected resources
2. Review active degradations
3. If memory pressure: Force GC, evict caches
4. If connection pressure: Close idle connections
5. Scale horizontally if sustained pressure
6. Monitor for pressure relief

Appendix A: Configuration Examples
Production Configuration
```toml
[connection_pool]
min_connections = 10
max_connections = 500
acquire_timeout = "30s"
idle_timeout = "5m"
max_lifetime = "2h"
max_connection_lifetime = "4h"
leak_detection_enabled = true
leak_detection_timeout = "10m"
leaked_connection_timeout = "30m"

[timeouts]
default_query_timeout = "30s"
long_query_timeout = "5m"
transaction_timeout = "10m"
connection_acquire_timeout = "30s"

[resource_limits]
max_client_connections = 10000
max_connections_per_user = 100
max_memory_per_query_mb = 1024
max_total_memory_mb = 32768
memory_pressure_threshold = 0.85

[circuit_breaker]
failure_threshold = 10
success_threshold = 5
timeout_duration = "30s"
half_open_max_calls = 3
```
Appendix B: Metrics Dashboard
Key metrics to monitor:
- Connection pool utilization
- Leaked connection count
- Resource pressure levels
- Timeout violations
- Circuit breaker state
- Query resource usage
- Degradation activations
Document Control:
- Version: 1.0
- Last Updated: 2025-11-10
- Next Review: Post Phase 1 Implementation
- Owner: System Architecture Team