Resource Leak Prevention System Architecture


Version: 1.0
Date: 2025-11-10
Status: Phase 1 Production Hardening - Day 5
Author: System Architecture Team

Executive Summary

This document specifies the architecture for HeliosDB’s comprehensive resource leak prevention system. The design ensures zero resource leaks in production through adaptive connection pooling, unified timeout enforcement, system-wide resource limits, and graceful degradation strategies.

Key Features:

  • Adaptive connection pooling with leak detection
  • Unified timeout framework across all operations
  • System-wide resource limit enforcement
  • Circuit breaker integration for fault isolation
  • Graceful degradation under resource pressure
  • Comprehensive monitoring and alerting

1. System Overview

1.1 Architecture Principles

  1. Defense in Depth: Multiple layers of protection prevent leaks
  2. Fail-Safe Defaults: Resources automatically released on timeout/error
  3. Observable: All resource usage tracked and visible
  4. Adaptive: System adjusts to load and pressure
  5. Graceful Degradation: Maintains service under resource constraints
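The fail-safe principle is typically realized with RAII-style guards: a pooled handle returns its resource in `Drop`, so early returns, `?` propagation, and panics all release it. A minimal dependency-free sketch of that pattern; the `Pool` and `PooledConnection` names here are illustrative, not HeliosDB APIs:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical minimal pool: the guard returns its slot on Drop,
// so the resource is released on every exit path.
struct Pool {
    idle: Arc<Mutex<Vec<u32>>>, // connection IDs standing in for real connections
}

struct PooledConnection {
    id: u32,
    idle: Arc<Mutex<Vec<u32>>>,
}

impl Pool {
    fn acquire(&self) -> Option<PooledConnection> {
        let id = self.idle.lock().unwrap().pop()?;
        Some(PooledConnection { id, idle: Arc::clone(&self.idle) })
    }
}

impl Drop for PooledConnection {
    fn drop(&mut self) {
        // Fail-safe default: give the connection back even if the
        // caller forgot to, or bailed out early with an error.
        self.idle.lock().unwrap().push(self.id);
    }
}

fn main() {
    let pool = Pool { idle: Arc::new(Mutex::new(vec![1, 2, 3])) };
    {
        let conn = pool.acquire().unwrap();
        println!("using connection {}", conn.id);
    } // guard dropped here -> connection returned
    assert_eq!(pool.idle.lock().unwrap().len(), 3);
}
```

The leak detector described in Section 2.2 then only has to catch guards that are held too long, not guards that were never returned at all.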

1.2 Components

┌─────────────────────────────────────────────────────────────┐
│               Resource Leak Prevention System               │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌────────────────────┐        ┌────────────────────┐       │
│  │  Connection Pool   │        │ Timeout Framework  │       │
│  │    Management      │        │                    │       │
│  └────────┬───────────┘        └────────┬───────────┘       │
│           │                             │                   │
│  ┌────────▼───────────┐        ┌────────▼───────────┐       │
│  │  Resource Limits   │        │  Circuit Breaker   │       │
│  │    Enforcement     │        │    Integration     │       │
│  └────────┬───────────┘        └────────┬───────────┘       │
│           │                             │                   │
│  ┌────────▼─────────────────────────────▼───────────┐       │
│  │          Leak Detection & Monitoring             │       │
│  └──────────────────────────────────────────────────┘       │
│                                                             │
│  ┌──────────────────────────────────────────────────┐       │
│  │        Graceful Degradation & Recovery           │       │
│  └──────────────────────────────────────────────────┘       │
└─────────────────────────────────────────────────────────────┘

2. Adaptive Connection Pool Architecture

2.1 Design Specification

Existing Implementation: heliosdb-pooling/src/pool.rs

Enhancements Required:

pub struct AdaptiveConnectionPool {
    // Core Configuration
    config: PoolConfig,
    mode: PoolMode,

    // Connection Management
    idle_queue: SegQueue<Connection>,
    active_connections: RwLock<HashMap<Uuid, ConnectionInfo>>,
    all_connections: RwLock<HashMap<Uuid, Connection>>,

    // Leak Detection (NEW)
    leak_detector: Arc<ConnectionLeakDetector>,
    connection_trackers: RwLock<HashMap<Uuid, ConnectionTracker>>,

    // Health & Lifecycle (ENHANCED)
    health_monitor: Arc<ConnectionHealthMonitor>,
    lifecycle_manager: Arc<ConnectionLifecycleManager>,

    // Adaptive Sizing (ENHANCED)
    metrics_collector: Arc<PoolMetricsCollector>,
    adaptive_controller: Arc<AdaptivePoolController>,

    // Circuit Breaker Integration (NEW)
    circuit_breaker: Arc<CircuitBreaker>,

    // Graceful Shutdown (NEW)
    shutdown_coordinator: Arc<ShutdownCoordinator>,
}

pub struct PoolConfig {
    // Existing fields...
    min_connections: usize,                    // 5 (default)
    max_connections: usize,                    // 100 (default)
    acquire_timeout: Duration,                 // 30s (default)
    idle_timeout: Duration,                    // 5 minutes (default)
    max_lifetime: Duration,                    // 1 hour (default)

    // NEW: Leak Prevention
    max_connection_lifetime: Duration,         // 4 hours (force close)
    leak_detection_enabled: bool,              // true
    leak_detection_timeout: Duration,          // 10 minutes (unreturned warning)
    leaked_connection_timeout: Duration,       // 30 minutes (force reclaim)

    // NEW: Health Monitoring
    health_check_enabled: bool,                // true
    health_check_interval: Duration,           // 30 seconds
    max_connection_age: Duration,              // 2 hours (soft limit)
    min_health_score: f64,                     // 0.5 (close if below)

    // NEW: Connection Recycling
    max_queries_per_connection: Option<usize>, // Some(10000)
    max_errors_per_connection: usize,          // 100
    error_rate_threshold: f64,                 // 0.1 (10% errors)

    // NEW: Backpressure
    queue_timeout: Duration,                   // 5 seconds
    max_queued_requests: usize,                // 1000
    queue_full_strategy: QueueFullStrategy,    // Reject or Block
}

pub enum QueueFullStrategy {
    Reject, // Return error immediately
    Block,  // Wait for queue space (with timeout)
    Evict,  // Evict lowest priority request
}

2.2 Connection Leak Detection

Mechanism:

pub struct ConnectionLeakDetector {
    trackers: Arc<RwLock<HashMap<Uuid, ConnectionTracker>>>,
    config: LeakDetectorConfig,
    alert_manager: Arc<AlertManager>,
}

pub struct ConnectionTracker {
    connection_id: Uuid,
    acquired_at: Instant,
    acquired_by: String,         // Thread/task ID
    stack_trace: Option<String>, // Capture location
    last_activity: Instant,
    expected_return_time: Instant,
    warning_sent: bool,
    force_reclaim_scheduled: bool,
}

impl ConnectionLeakDetector {
    /// Called when connection acquired
    pub fn track_acquisition(
        &self,
        connection_id: Uuid,
        acquired_by: String,
        expected_duration: Duration,
    ) {
        let tracker = ConnectionTracker {
            connection_id,
            acquired_at: Instant::now(),
            acquired_by,
            stack_trace: capture_stack_trace(),
            last_activity: Instant::now(),
            expected_return_time: Instant::now() + expected_duration,
            warning_sent: false,
            force_reclaim_scheduled: false,
        };
        self.trackers.write().insert(connection_id, tracker);
    }

    /// Background task: scan for leaked connections
    pub async fn detect_leaks(&self) -> Vec<LeakedConnection> {
        let now = Instant::now();
        let mut leaks = Vec::new();
        let mut trackers = self.trackers.write();

        for (id, tracker) in trackers.iter_mut() {
            let held_duration = now.duration_since(tracker.acquired_at);

            // Warn if held longer than expected
            if held_duration > self.config.leak_detection_timeout
                && !tracker.warning_sent
            {
                warn!(
                    "Potential connection leak detected: {} held for {:?} by {}",
                    id, held_duration, tracker.acquired_by
                );
                self.alert_manager.send_leak_warning(
                    *id,
                    held_duration,
                    &tracker.acquired_by,
                    tracker.stack_trace.as_deref(),
                );
                tracker.warning_sent = true;
            }

            // Force reclaim if held too long
            if held_duration > self.config.leaked_connection_timeout
                && !tracker.force_reclaim_scheduled
            {
                error!(
                    "Connection {} leaked - forcing reclamation after {:?}",
                    id, held_duration
                );
                leaks.push(LeakedConnection {
                    connection_id: *id,
                    held_duration,
                    acquired_by: tracker.acquired_by.clone(),
                    stack_trace: tracker.stack_trace.clone(),
                });
                tracker.force_reclaim_scheduled = true;
            }
        }
        leaks
    }

    /// Called when connection released normally
    pub fn track_release(&self, connection_id: Uuid) {
        self.trackers.write().remove(&connection_id);
    }
}
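The scan itself is plain threshold arithmetic driven by a periodic background task. A dependency-free restatement of the two-stage decision (warn past `leak_detection_timeout`, reclaim past `leaked_connection_timeout`), with the PoolConfig defaults hard-coded for illustration:

```rust
use std::time::Duration;

// Two-stage leak decision mirroring detect_leaks(): held durations
// past the warning threshold produce a warning, past the reclaim
// threshold a forced reclamation.
#[derive(Debug, PartialEq)]
enum LeakAction {
    None,
    Warn,
    ForceReclaim,
}

fn classify(held: Duration, warn_after: Duration, reclaim_after: Duration) -> LeakAction {
    if held > reclaim_after {
        LeakAction::ForceReclaim
    } else if held > warn_after {
        LeakAction::Warn
    } else {
        LeakAction::None
    }
}

fn main() {
    let warn_after = Duration::from_secs(10 * 60);    // leak_detection_timeout
    let reclaim_after = Duration::from_secs(30 * 60); // leaked_connection_timeout

    assert_eq!(classify(Duration::from_secs(60), warn_after, reclaim_after), LeakAction::None);
    assert_eq!(classify(Duration::from_secs(15 * 60), warn_after, reclaim_after), LeakAction::Warn);
    assert_eq!(classify(Duration::from_secs(45 * 60), warn_after, reclaim_after), LeakAction::ForceReclaim);
    println!("leak classification ok");
}
```

Because each tracker carries `warning_sent` and `force_reclaim_scheduled` flags, repeated scans over the same long-held connection alert at most once per stage.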

2.3 Connection Health Monitoring

pub struct ConnectionHealthMonitor {
    config: HealthConfig,
    health_scores: Arc<RwLock<HashMap<Uuid, HealthScore>>>,
}

pub struct HealthScore {
    connection_id: Uuid,
    current_score: f64, // 0.0 to 1.0
    total_queries: u64,
    successful_queries: u64,
    failed_queries: u64,
    avg_query_time_ms: f64,
    last_health_check: Instant,
    consecutive_failures: u32,
}

impl ConnectionHealthMonitor {
    /// Perform health check on connection
    pub async fn check_health(
        &self,
        connection: &Connection,
    ) -> HealthCheckResult {
        let start = Instant::now();

        // 1. Ping test
        let ping_ok = self.ping_connection(connection).await.is_ok();

        // 2. Simple query test
        let query_ok = self.test_query(connection).await.is_ok();

        // 3. Check connection state
        let state_ok = !connection.is_closed()
            && !connection.is_in_error_state();

        // 4. Calculate health score
        let score = self.calculate_health_score(
            connection.id,
            ping_ok,
            query_ok,
            state_ok,
        );

        let duration = start.elapsed();
        HealthCheckResult {
            healthy: score >= self.config.min_health_score,
            score,
            check_duration: duration,
            issues: self.identify_issues(ping_ok, query_ok, state_ok),
        }
    }

    fn calculate_health_score(
        &self,
        connection_id: Uuid,
        ping_ok: bool,
        query_ok: bool,
        state_ok: bool,
    ) -> f64 {
        let mut scores = self.health_scores.write();
        let health = scores.entry(connection_id)
            .or_insert_with(|| HealthScore::new(connection_id));

        // Component scores
        let ping_score = if ping_ok { 0.33 } else { 0.0 };
        let query_score = if query_ok { 0.33 } else { 0.0 };
        let state_score = if state_ok { 0.34 } else { 0.0 };

        // Success rate score
        let success_rate = if health.total_queries > 0 {
            health.successful_queries as f64 / health.total_queries as f64
        } else {
            1.0
        };

        // Weighted combination
        let base_score = ping_score + query_score + state_score;
        let adjusted_score = base_score * 0.7 + success_rate * 0.3;

        // Penalize consecutive failures
        let penalty = (health.consecutive_failures as f64 * 0.1).min(0.5);
        health.current_score = (adjusted_score - penalty).clamp(0.0, 1.0);
        health.last_health_check = Instant::now();
        health.current_score
    }
}
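To make the weighting concrete, here is the same arithmetic as a standalone function with a worked example (the inputs are assumed values, not pool data): all three probes passing, a 90% historical success rate, and two consecutive failures give 1.0 * 0.7 + 0.9 * 0.3 - 0.2 = 0.77, still above the default 0.5 close threshold.

```rust
// Standalone restatement of calculate_health_score() for a worked example.
fn health_score(
    ping_ok: bool,
    query_ok: bool,
    state_ok: bool,
    success_rate: f64,
    consecutive_failures: u32,
) -> f64 {
    // Component scores: each probe contributes roughly a third.
    let base = (if ping_ok { 0.33 } else { 0.0 })
        + (if query_ok { 0.33 } else { 0.0 })
        + (if state_ok { 0.34 } else { 0.0 });

    // Weighted combination: probes dominate, history tempers.
    let adjusted = base * 0.7 + success_rate * 0.3;

    // Consecutive-failure penalty, capped at 0.5.
    let penalty = (consecutive_failures as f64 * 0.1).min(0.5);
    (adjusted - penalty).clamp(0.0, 1.0)
}

fn main() {
    // Healthy probes, 90% success history, 2 consecutive failures:
    // 1.0 * 0.7 + 0.9 * 0.3 - 0.2 = 0.77
    let s = health_score(true, true, true, 0.9, 2);
    assert!((s - 0.77).abs() < 1e-6);

    // Dead connection: every probe fails, penalty capped at 0.5.
    let dead = health_score(false, false, false, 0.0, 20);
    assert_eq!(dead, 0.0);
    println!("scores: {:.2}, {:.2}", s, dead);
}
```

The penalty cap keeps one flaky connection from pinning its score at zero forever; a run of successes can still pull it back above `min_health_score`.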

2.4 Connection Lifecycle Management

pub struct ConnectionLifecycleManager {
    config: LifecycleConfig,
    metrics: Arc<LifecycleMetrics>,
}

impl ConnectionLifecycleManager {
    /// Determine if connection should be recycled
    pub fn should_recycle(
        &self,
        connection: &Connection,
    ) -> RecycleDecision {
        // Check age limits
        if connection.age() > self.config.max_connection_lifetime {
            return RecycleDecision::Recycle {
                reason: RecycleReason::MaxLifetimeExceeded,
            };
        }

        // Check query count
        if let Some(max_queries) = self.config.max_queries_per_connection {
            if connection.stats.query_count.load(Ordering::Relaxed) >= max_queries as u64 {
                return RecycleDecision::Recycle {
                    reason: RecycleReason::MaxQueriesExceeded,
                };
            }
        }

        // Check error rate (health_score() reports the success fraction)
        let success_rate = connection.health_score();
        let error_rate = 1.0 - success_rate;
        if error_rate > self.config.error_rate_threshold {
            return RecycleDecision::Recycle {
                reason: RecycleReason::HighErrorRate(error_rate),
            };
        }

        // Check idle time
        if connection.idle_duration() > self.config.max_idle_time {
            return RecycleDecision::Recycle {
                reason: RecycleReason::IdleTimeout,
            };
        }

        RecycleDecision::Keep
    }

    /// Gracefully close connection
    pub async fn close_connection(
        &self,
        connection: Connection,
        reason: RecycleReason,
    ) -> Result<(), PoolError> {
        info!(
            "Closing connection {} due to {:?}",
            connection.id, reason
        );

        // 1. Mark as closing
        connection.set_state(ConnectionState::Closing);

        // 2. Roll back any pending transaction (with timeout)
        if connection.is_in_transaction() {
            warn!(
                "Connection {} still in transaction during close, rolling back",
                connection.id
            );
            let _ = timeout(
                Duration::from_secs(5),
                connection.rollback()
            ).await;
        }

        // 3. Close network connection
        let _ = timeout(
            Duration::from_secs(5),
            connection.shutdown()
        ).await;

        // 4. Mark as closed
        connection.set_state(ConnectionState::Closed);

        // 5. Update metrics
        self.metrics.record_connection_closed(reason);
        Ok(())
    }
}

3. Unified Timeout Framework

3.1 Design Specification

pub struct TimeoutFramework {
    config: TimeoutConfig,
    timeout_registry: Arc<RwLock<HashMap<String, TimeoutHandle>>>,
    cancellation_manager: Arc<CancellationManager>,
}

pub struct TimeoutConfig {
    // Query Timeouts
    pub default_query_timeout: Duration,         // 30s
    pub long_query_timeout: Duration,            // 5 minutes
    pub admin_query_timeout: Option<Duration>,   // None (unlimited)
    pub ddl_timeout: Duration,                   // 2 minutes
    pub background_query_timeout: Duration,      // 1 hour

    // I/O Timeouts
    pub file_read_timeout: Duration,             // 10s
    pub file_write_timeout: Duration,            // 30s
    pub network_read_timeout: Duration,          // 5s
    pub network_write_timeout: Duration,         // 5s
    pub network_connect_timeout: Duration,       // 10s

    // Lock Timeouts
    pub lock_timeout: Duration,                  // 1s
    pub transaction_timeout: Duration,           // 10 minutes
    pub exclusive_lock_timeout: Duration,        // 30s
    pub shared_lock_timeout: Duration,           // 5s

    // Connection Timeouts
    pub connection_acquire_timeout: Duration,    // 30s
    pub connection_idle_timeout: Duration,       // 5 minutes
    pub connection_validation_timeout: Duration, // 3s

    // Background Task Timeouts
    pub background_task_timeout: Duration,       // 1 hour
    pub health_check_timeout: Duration,          // 10s
    pub gc_timeout: Duration,                    // 30 minutes

    // Graceful Shutdown Timeouts
    pub graceful_shutdown_timeout: Duration,     // 30s
    pub force_shutdown_timeout: Duration,        // 10s
}

impl Default for TimeoutConfig {
    fn default() -> Self {
        Self {
            default_query_timeout: Duration::from_secs(30),
            long_query_timeout: Duration::from_secs(300),
            admin_query_timeout: None,
            ddl_timeout: Duration::from_secs(120),
            background_query_timeout: Duration::from_secs(3600),
            file_read_timeout: Duration::from_secs(10),
            file_write_timeout: Duration::from_secs(30),
            network_read_timeout: Duration::from_secs(5),
            network_write_timeout: Duration::from_secs(5),
            network_connect_timeout: Duration::from_secs(10),
            lock_timeout: Duration::from_secs(1),
            transaction_timeout: Duration::from_secs(600),
            exclusive_lock_timeout: Duration::from_secs(30),
            shared_lock_timeout: Duration::from_secs(5),
            connection_acquire_timeout: Duration::from_secs(30),
            connection_idle_timeout: Duration::from_secs(300),
            connection_validation_timeout: Duration::from_secs(3),
            background_task_timeout: Duration::from_secs(3600),
            health_check_timeout: Duration::from_secs(10),
            gc_timeout: Duration::from_secs(1800),
            graceful_shutdown_timeout: Duration::from_secs(30),
            force_shutdown_timeout: Duration::from_secs(10),
        }
    }
}

3.2 Timeout Enforcement Trait

#[async_trait]
pub trait TimeoutEnforcer {
    /// Execute operation with timeout
    async fn with_timeout<F, T>(
        &self,
        duration: Duration,
        operation: F,
    ) -> Result<T, TimeoutError>
    where
        F: Future<Output = T> + Send,
        T: Send;

    /// Execute with cancellation support
    async fn with_cancellation<F, T>(
        &self,
        operation: F,
    ) -> (JoinHandle<T>, CancellationToken)
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;

    /// Execute with timeout and cancellation
    async fn with_timeout_and_cancellation<F, T>(
        &self,
        duration: Duration,
        operation: F,
    ) -> Result<T, TimeoutError>
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static;
}

pub struct DefaultTimeoutEnforcer {
    framework: Arc<TimeoutFramework>,
}

#[async_trait]
impl TimeoutEnforcer for DefaultTimeoutEnforcer {
    async fn with_timeout<F, T>(
        &self,
        duration: Duration,
        operation: F,
    ) -> Result<T, TimeoutError>
    where
        F: Future<Output = T> + Send,
        T: Send,
    {
        match timeout(duration, operation).await {
            Ok(result) => Ok(result),
            Err(_) => {
                error!("Operation timed out after {:?}", duration);
                Err(TimeoutError::Exceeded(duration))
            }
        }
    }

    async fn with_cancellation<F, T>(
        &self,
        operation: F,
    ) -> (JoinHandle<T>, CancellationToken)
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let token = CancellationToken::new();
        let token_clone = token.clone();
        let handle = tokio::spawn(async move {
            tokio::select! {
                result = operation => result,
                _ = token_clone.cancelled() => {
                    // Cancellation surfaces to the caller as a JoinError
                    // when the handle is awaited.
                    panic!("Operation cancelled");
                }
            }
        });
        (handle, token)
    }

    async fn with_timeout_and_cancellation<F, T>(
        &self,
        duration: Duration,
        operation: F,
    ) -> Result<T, TimeoutError>
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let token = CancellationToken::new();
        let token_clone = token.clone();
        let result = timeout(duration, async move {
            tokio::select! {
                result = operation => result,
                _ = token_clone.cancelled() => {
                    // Defensive: the token is only cancelled after the
                    // outer timeout has already dropped this future.
                    panic!("Operation cancelled");
                }
            }
        }).await;
        match result {
            Ok(value) => Ok(value),
            Err(_) => {
                token.cancel();
                Err(TimeoutError::Exceeded(duration))
            }
        }
    }
}

3.3 Timeout Context

/// Provides timeout context for nested operations
pub struct TimeoutContext {
operation_id: Uuid,
operation_type: OperationType,
started_at: Instant,
deadline: Instant,
timeout_config: TimeoutConfig,
parent_context: Option<Arc<TimeoutContext>>,
}
impl TimeoutContext {
pub fn new(
operation_type: OperationType,
timeout_config: TimeoutConfig,
) -> Self {
let now = Instant::now();
let timeout = Self::get_timeout_for_operation(&operation_type, &timeout_config);
Self {
operation_id: Uuid::new_v4(),
operation_type,
started_at: now,
deadline: now + timeout,
timeout_config,
parent_context: None,
}
}
pub fn with_parent(mut self, parent: Arc<TimeoutContext>) -> Self {
// Use the more restrictive deadline
if parent.deadline < self.deadline {
self.deadline = parent.deadline;
}
self.parent_context = Some(parent);
self
}
pub fn remaining_time(&self) -> Duration {
self.deadline.saturating_duration_since(Instant::now())
}
pub fn is_expired(&self) -> bool {
Instant::now() >= self.deadline
}
pub fn elapsed(&self) -> Duration {
self.started_at.elapsed()
}
fn get_timeout_for_operation(
op_type: &OperationType,
config: &TimeoutConfig,
) -> Duration {
match op_type {
OperationType::Query => config.default_query_timeout,
OperationType::LongQuery => config.long_query_timeout,
OperationType::DDL => config.ddl_timeout,
OperationType::Lock => config.lock_timeout,
OperationType::Transaction => config.transaction_timeout,
OperationType::FileIO => config.file_read_timeout,
OperationType::NetworkIO => config.network_read_timeout,
OperationType::BackgroundTask => config.background_task_timeout,
}
}
}
pub enum OperationType {
Query,
LongQuery,
DDL,
Lock,
Transaction,
FileIO,
NetworkIO,
BackgroundTask,
}
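Deadline inheritance in with_parent() is what keeps nested operations from outliving their caller: a child always runs under the minimum of the parent's deadline and its own. A stripped-down sketch of just that logic, using plain Instant with no Uuid or config plumbing:

```rust
use std::time::{Duration, Instant};

// Stripped-down TimeoutContext: only the deadline-propagation logic.
struct Ctx {
    deadline: Instant,
}

impl Ctx {
    fn new(timeout: Duration) -> Self {
        Ctx { deadline: Instant::now() + timeout }
    }

    // Child operations never get more time than their parent has left.
    fn with_parent(mut self, parent: &Ctx) -> Self {
        if parent.deadline < self.deadline {
            self.deadline = parent.deadline;
        }
        self
    }
}

fn main() {
    // A transaction with a 10-minute budget spawns a query whose own
    // class would allow 30 minutes: the query inherits the tighter deadline.
    let txn = Ctx::new(Duration::from_secs(600));
    let query = Ctx::new(Duration::from_secs(1800)).with_parent(&txn);
    assert_eq!(query.deadline, txn.deadline);

    // The reverse direction: a tight child keeps its own deadline.
    let probe = Ctx::new(Duration::from_secs(3)).with_parent(&txn);
    assert!(probe.deadline < txn.deadline);
    println!("deadline propagation ok");
}
```

Because deadlines only ever shrink down a context chain, remaining_time() at any depth is a safe value to pass to a raw timeout() call.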

4. Resource Limit Enforcement

4.1 Design Specification

Existing Implementation: heliosdb-storage/src/resource_limits.rs

Enhancements Required:

pub struct ResourceLimitEnforcer {
    config: Arc<RwLock<ResourceLimitsConfig>>,
    manager: Arc<ResourceLimitManager>,

    // NEW: Per-query tracking
    query_tracker: Arc<QueryResourceTracker>,

    // NEW: Per-user tracking
    user_tracker: Arc<UserResourceTracker>,

    // NEW: Global pressure detection
    pressure_detector: Arc<ResourcePressureDetector>,

    // NEW: Enforcement policies
    enforcement_policy: Arc<EnforcementPolicy>,

    // NEW: Background cleanup
    cleanup_scheduler: Arc<CleanupScheduler>,
}

pub struct ResourceLimitsConfig {
    // Connection Limits
    pub max_client_connections: usize,       // 10000
    pub max_connections_per_user: usize,     // 100
    pub max_connections_per_database: usize, // 500

    // File Handle Limits
    pub max_open_files: usize,               // 100000
    pub max_temp_files: usize,               // 10000
    pub max_open_files_per_query: usize,     // 100

    // Memory Limits
    pub max_memory_per_query_mb: usize,      // 1024 (1GB)
    pub max_memory_per_connection_mb: usize, // 512 (512MB)
    pub max_cache_size_mb: usize,            // 4096 (4GB)
    pub max_total_memory_mb: usize,          // 32768 (32GB)

    // Thread Limits
    pub max_query_threads: usize,            // 100
    pub max_background_threads: usize,       // 20
    pub max_connection_threads: usize,       // 50

    // Query Limits
    pub max_concurrent_queries: usize,       // 1000
    pub max_queries_per_user: usize,         // 50
    pub max_query_result_size_mb: usize,     // 100

    // Transaction Limits
    pub max_concurrent_transactions: usize,  // 500
    pub max_transaction_size_mb: usize,      // 512
    pub max_locks_per_transaction: usize,    // 10000

    // Pressure Thresholds
    pub memory_pressure_threshold: f64,      // 0.85 (85%)
    pub connection_pressure_threshold: f64,  // 0.90 (90%)
    pub cpu_pressure_threshold: f64,         // 0.95 (95%)

    // Enforcement Actions
    pub reject_on_pressure: bool,            // true
    pub queue_on_pressure: bool,             // false
    pub evict_on_pressure: bool,             // true
}

4.2 Query Resource Tracking

pub struct QueryResourceTracker {
    active_queries: Arc<RwLock<HashMap<QueryId, QueryResources>>>,
    config: Arc<RwLock<ResourceLimitsConfig>>,
}

pub struct QueryResources {
    query_id: QueryId,
    user: String,
    started_at: Instant,

    // Resource allocations
    memory_allocated_mb: AtomicUsize,
    open_files: AtomicUsize,
    threads_allocated: AtomicUsize,
    locks_held: AtomicUsize,

    // Limits
    memory_limit_mb: usize,
    file_limit: usize,
    thread_limit: usize,

    // Cleanup
    cleanup_registered: AtomicBool,
}

impl QueryResourceTracker {
    /// Allocate memory for query
    pub fn allocate_memory(
        &self,
        query_id: QueryId,
        amount_mb: usize,
    ) -> Result<(), ResourceLimitError> {
        let queries = self.active_queries.read();
        let query = queries.get(&query_id)
            .ok_or(ResourceLimitError::QueryNotFound(query_id))?;

        // Check per-query limit
        let current = query.memory_allocated_mb.load(Ordering::Acquire);
        let new_total = current + amount_mb;
        if new_total > query.memory_limit_mb {
            return Err(ResourceLimitError::QueryMemoryLimitExceeded {
                current: new_total,
                max: query.memory_limit_mb,
            });
        }

        // Check global limit
        let config = self.config.read();
        let global_usage = self.get_total_memory_usage();
        if global_usage + amount_mb > config.max_total_memory_mb {
            return Err(ResourceLimitError::TotalMemoryLimitExceeded {
                current: global_usage + amount_mb,
                max: config.max_total_memory_mb,
            });
        }

        query.memory_allocated_mb.fetch_add(amount_mb, Ordering::Release);

        // Register for cleanup if not already
        if !query.cleanup_registered.load(Ordering::Acquire) {
            self.register_cleanup(query_id);
            query.cleanup_registered.store(true, Ordering::Release);
        }
        Ok(())
    }

    /// Release all resources for query
    pub async fn release_query_resources(
        &self,
        query_id: QueryId,
    ) -> Result<(), ResourceLimitError> {
        let mut queries = self.active_queries.write();
        if let Some(query) = queries.remove(&query_id) {
            let memory_freed = query.memory_allocated_mb.load(Ordering::Acquire);
            let files_closed = query.open_files.load(Ordering::Acquire);
            info!(
                "Released resources for query {}: {}MB memory, {} files",
                query_id, memory_freed, files_closed
            );
            // Resources automatically released via Drop implementations
        }
        Ok(())
    }

    /// Emergency cleanup - force release all resources
    pub async fn emergency_cleanup(&self, query_id: QueryId) {
        warn!("Emergency cleanup for query {}", query_id);
        let _ = self.release_query_resources(query_id).await;
        // Additional forceful cleanup if needed:
        // - Cancel background tasks
        // - Force close files
        // - Release locks
    }
}

4.3 Resource Pressure Detection

pub struct ResourcePressureDetector {
    config: Arc<RwLock<ResourceLimitsConfig>>,
    metrics: Arc<ResourceMetrics>,
    alert_manager: Arc<AlertManager>,
}

// Ord is required: levels are compared with `>=` and combined via `max()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PressureLevel {
    Normal,   // < 70% utilization
    Elevated, // 70-85% utilization
    High,     // 85-95% utilization
    Critical, // > 95% utilization
}

pub struct PressureStatus {
    pub memory: PressureLevel,
    pub connections: PressureLevel,
    pub files: PressureLevel,
    pub cpu: PressureLevel,
    pub overall: PressureLevel,
}

impl ResourcePressureDetector {
    /// Detect current resource pressure
    pub fn detect_pressure(&self) -> PressureStatus {
        let config = self.config.read();
        let metrics = self.metrics.snapshot();

        let memory = self.calculate_pressure(
            metrics.memory_used_mb,
            config.max_total_memory_mb,
            config.memory_pressure_threshold,
        );
        let connections = self.calculate_pressure(
            metrics.active_connections,
            config.max_client_connections,
            config.connection_pressure_threshold,
        );
        let files = self.calculate_pressure(
            metrics.open_files,
            config.max_open_files,
            0.90,
        );
        let cpu = self.calculate_pressure_f64(
            metrics.cpu_utilization,
            1.0,
            config.cpu_pressure_threshold,
        );

        let overall = [memory, connections, files, cpu]
            .iter()
            .max()
            .copied()
            .unwrap_or(PressureLevel::Normal);

        let status = PressureStatus {
            memory,
            connections,
            files,
            cpu,
            overall,
        };

        // Alert on pressure changes
        if overall >= PressureLevel::High {
            self.alert_manager.send_pressure_alert(&status);
        }
        status
    }

    fn calculate_pressure(
        &self,
        current: usize,
        max: usize,
        threshold: f64,
    ) -> PressureLevel {
        let utilization = current as f64 / max as f64;
        self.calculate_pressure_f64(utilization, 1.0, threshold)
    }

    fn calculate_pressure_f64(
        &self,
        current: f64,
        max: f64,
        threshold: f64,
    ) -> PressureLevel {
        let utilization = current / max;
        match utilization {
            x if x >= 0.95 => PressureLevel::Critical,
            x if x >= threshold => PressureLevel::High,
            x if x >= 0.70 => PressureLevel::Elevated,
            _ => PressureLevel::Normal,
        }
    }
}
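The classification is a simple cascade of thresholds; the one subtlety is that a configured threshold (e.g. memory_pressure_threshold = 0.85) moves the High boundary while Critical stays pinned at 95%. A standalone restatement of calculate_pressure_f64() with worked values:

```rust
#[derive(Debug, PartialEq, PartialOrd)]
enum PressureLevel {
    Normal,
    Elevated,
    High,
    Critical,
}

// Same cascade as calculate_pressure_f64(): Critical is fixed at 95%,
// High is set by the per-resource threshold, Elevated starts at 70%.
fn classify(utilization: f64, threshold: f64) -> PressureLevel {
    match utilization {
        x if x >= 0.95 => PressureLevel::Critical,
        x if x >= threshold => PressureLevel::High,
        x if x >= 0.70 => PressureLevel::Elevated,
        _ => PressureLevel::Normal,
    }
}

fn main() {
    let memory_threshold = 0.85; // memory_pressure_threshold
    assert_eq!(classify(0.50, memory_threshold), PressureLevel::Normal);
    assert_eq!(classify(0.75, memory_threshold), PressureLevel::Elevated);
    assert_eq!(classify(0.88, memory_threshold), PressureLevel::High);
    assert_eq!(classify(0.96, memory_threshold), PressureLevel::Critical);
    println!("pressure classification ok");
}
```

At 88% memory utilization the system is therefore already shedding load (Section 6) even though it is still 7 points away from the hard Critical line.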

5. Circuit Breaker Integration

5.1 Connection Pool Circuit Breaker

Existing Implementation: heliosdb-circuit-breaker/src/breaker.rs

Integration:

impl AdaptiveConnectionPool {
    /// Acquire connection with circuit breaker protection
    pub async fn acquire_with_protection(
        &self,
    ) -> Result<Connection, PoolError> {
        // Use circuit breaker to protect pool
        self.circuit_breaker
            .call_with_timeout(
                || async {
                    self.acquire_internal().await
                },
                self.config.acquire_timeout,
            )
            .await
            .map_err(|e| match e {
                CircuitBreakerError::CircuitOpen(_) => {
                    PoolError::CircuitOpen {
                        message: "Connection pool circuit breaker is open".to_string(),
                    }
                }
                CircuitBreakerError::Timeout(ms) => {
                    PoolError::Timeout { timeout_ms: ms }
                }
                _ => PoolError::Other {
                    message: e.to_string(),
                },
            })
    }

    async fn acquire_internal(&self) -> Result<Connection, CircuitBreakerError> {
        // Existing acquire logic
        match self.try_acquire().await {
            Ok(conn) => Ok(conn),
            Err(e) => {
                // Map pool errors to circuit breaker errors
                Err(CircuitBreakerError::OperationFailed(e.to_string()))
            }
        }
    }
}

/// Circuit breaker configuration for connection pool
pub fn create_pool_circuit_breaker() -> CircuitBreaker {
    CircuitBreaker::new(
        "connection_pool",
        CircuitBreakerConfig {
            failure_threshold: 10,                     // Open after 10 failures
            success_threshold: 5,                      // Close after 5 successes
            timeout_duration: Duration::from_secs(30), // Try again after 30s
            half_open_max_calls: 3,                    // Allow 3 test calls
        },
    )
}

6. Graceful Degradation

6.1 Degradation Strategies

pub struct GracefulDegradationManager {
    pressure_detector: Arc<ResourcePressureDetector>,
    config: DegradationConfig,
    active_degradations: Arc<RwLock<HashSet<DegradationType>>>,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum DegradationType {
    // Connection management
    ReduceConnectionPool,
    RejectNewConnections,
    CloseIdleConnections,

    // Query management
    RejectComplexQueries,
    LimitQueryConcurrency,
    ReduceQueryTimeout,
    DisableQueryCache,

    // Feature degradation
    DisableAnalytics,
    DisableBackgroundTasks,
    ReadOnlyMode,

    // Resource limits
    ReduceMemoryLimits,
    ReduceFileLimits,
    ForceGarbageCollection,
}

impl GracefulDegradationManager {
    /// Activate degradation based on pressure
    pub async fn handle_pressure(&self, pressure: PressureStatus) {
        match pressure.overall {
            PressureLevel::Normal => {
                self.clear_all_degradations().await;
            }
            PressureLevel::Elevated => {
                self.activate_degradation(DegradationType::CloseIdleConnections).await;
                self.activate_degradation(DegradationType::DisableAnalytics).await;
            }
            PressureLevel::High => {
                self.activate_degradation(DegradationType::ReduceConnectionPool).await;
                self.activate_degradation(DegradationType::LimitQueryConcurrency).await;
                self.activate_degradation(DegradationType::DisableBackgroundTasks).await;
                self.activate_degradation(DegradationType::ForceGarbageCollection).await;
            }
            PressureLevel::Critical => {
                self.activate_degradation(DegradationType::RejectNewConnections).await;
                self.activate_degradation(DegradationType::RejectComplexQueries).await;
                self.activate_degradation(DegradationType::ReadOnlyMode).await;
                self.activate_degradation(DegradationType::ReduceMemoryLimits).await;
            }
        }
    }

    async fn activate_degradation(&self, degradation: DegradationType) {
        // Release the lock before awaiting apply_degradation: holding a
        // guard across an await point can stall or deadlock the executor.
        let newly_activated = {
            let mut active = self.active_degradations.write();
            active.insert(degradation.clone())
        };
        if newly_activated {
            warn!("Activating degradation: {:?}", degradation);
            self.apply_degradation(&degradation).await;
        }
    }

    async fn apply_degradation(&self, degradation: &DegradationType) {
        match degradation {
            DegradationType::ReduceConnectionPool => {
                // Reduce max connections by 50%
                // Close excess idle connections
            }
            DegradationType::RejectNewConnections => {
                // Return error for new connection attempts
            }
            DegradationType::CloseIdleConnections => {
                // Aggressively close idle connections
            }
            DegradationType::LimitQueryConcurrency => {
                // Reduce max concurrent queries
            }
            DegradationType::ForceGarbageCollection => {
                // Trigger immediate GC
            }
            _ => {}
        }
    }
}

6.2 Backpressure Mechanism

pub struct BackpressureManager {
    queue: Arc<RwLock<VecDeque<QueuedRequest>>>,
    config: BackpressureConfig,
    pressure_detector: Arc<ResourcePressureDetector>,
}

pub struct QueuedRequest {
    id: Uuid,
    priority: Priority,
    queued_at: Instant,
    deadline: Instant,
    request_type: RequestType,
    sender: oneshot::Sender<Result<Connection, PoolError>>,
}

impl BackpressureManager {
    /// Handle new request with backpressure
    pub async fn handle_request(
        &self,
        request: ConnectionRequest,
    ) -> Result<Connection, PoolError> {
        let pressure = self.pressure_detector.detect_pressure();
        match pressure.overall {
            PressureLevel::Normal | PressureLevel::Elevated => {
                // Proceed normally
                self.process_request(request).await
            }
            PressureLevel::High => {
                // Queue low-priority requests
                if request.priority <= Priority::Low {
                    self.queue_request(request).await
                } else {
                    self.process_request(request).await
                }
            }
            PressureLevel::Critical => {
                // Reject all but critical requests
                if request.priority >= Priority::Critical {
                    self.process_request(request).await
                } else {
                    Err(PoolError::ResourcePressure {
                        level: pressure.overall,
                    })
                }
            }
        }
    }

    async fn queue_request(
        &self,
        request: ConnectionRequest,
    ) -> Result<Connection, PoolError> {
        let (tx, rx) = oneshot::channel();
        {
            // Scope the guard so the queue lock is released before awaiting
            let mut queue = self.queue.write();

            // Check queue size
            if queue.len() >= self.config.max_queue_size {
                // Evict low-priority requests to make room
                queue.retain(|r| r.priority > Priority::Low);
                if queue.len() >= self.config.max_queue_size {
                    return Err(PoolError::QueueFull);
                }
            }

            queue.push_back(QueuedRequest {
                id: Uuid::new_v4(),
                priority: request.priority,
                queued_at: Instant::now(),
                deadline: Instant::now() + self.config.queue_timeout,
                request_type: request.request_type,
                sender: tx,
            });
        }

        // Wait for response without holding the queue lock
        match timeout(self.config.queue_timeout, rx).await {
            Ok(Ok(result)) => result,
            Ok(Err(_)) => Err(PoolError::QueuedRequestCancelled),
            Err(_) => Err(PoolError::QueueTimeout),
        }
    }
}

7. Monitoring and Alerting

7.1 Metrics Collection

pub struct ResourceLeakMetrics {
    // Connection metrics
    pub connections_acquired: Counter,
    pub connections_released: Counter,
    pub connections_leaked: Counter,
    pub connection_lifetime_ms: Histogram,

    // Leak detection metrics
    pub leak_warnings: Counter,
    pub leak_forced_reclaims: Counter,
    pub leak_detection_runs: Counter,

    // Timeout metrics
    pub operation_timeouts: Counter,
    pub timeout_cancellations: Counter,

    // Resource limit metrics
    pub resource_limit_violations: Counter,
    pub memory_allocations_rejected: Counter,
    pub connection_requests_rejected: Counter,

    // Pressure metrics
    pub memory_pressure_events: Counter,
    pub connection_pressure_events: Counter,
    pub degradation_activations: Counter,
}

7.2 Alert Definitions

pub enum Alert {
    ConnectionLeak {
        connection_id: Uuid,
        held_duration: Duration,
        acquired_by: String,
        stack_trace: Option<String>,
    },
    ResourcePressure {
        resource: ResourceType,
        level: PressureLevel,
        current: usize,
        max: usize,
    },
    CircuitBreakerOpen {
        circuit: String,
        failure_count: usize,
    },
    MassiveLeakDetected {
        leaked_count: usize,
        total_connections: usize,
    },
    ResourceLimitExceeded {
        limit_type: String,
        user: String,
        current: usize,
        max: usize,
    },
}

8. Implementation Phases

Phase 1: Core Infrastructure (Days 6-7)

  1. Connection Leak Detection

    • Implement ConnectionLeakDetector
    • Add tracking to pool acquire/release
    • Background leak scanner
    • Alert integration
  2. Timeout Framework Foundation

    • Implement TimeoutConfig
    • Create TimeoutEnforcer trait
    • Add TimeoutContext for nested operations

Phase 2: Health & Lifecycle (Days 8-9)

  1. Connection Health Monitoring

    • Implement ConnectionHealthMonitor
    • Health score calculation
    • Integration with pool health checks
  2. Connection Lifecycle Management

    • Implement ConnectionLifecycleManager
    • Recycling decision logic
    • Graceful close procedures

Phase 3: Resource Enforcement (Days 10-11)

  1. Query Resource Tracking

    • Implement QueryResourceTracker
    • Per-query memory tracking
    • Emergency cleanup procedures
  2. Resource Pressure Detection

    • Implement ResourcePressureDetector
    • Multi-resource pressure calculation
    • Alert integration

Phase 4: Degradation & Recovery (Days 12-13)

  1. Graceful Degradation

    • Implement GracefulDegradationManager
    • Degradation strategies
    • Automatic recovery
  2. Backpressure Management

    • Implement BackpressureManager
    • Request queueing
    • Priority-based handling

Phase 5: Testing & Validation (Days 14-15)

  1. Unit Tests

    • Test each component in isolation
    • Mock dependencies
  2. Integration Tests

    • End-to-end leak detection
    • Timeout enforcement
    • Pressure handling
  3. Stress Tests

    • Leak simulation
    • Resource exhaustion
    • Recovery validation

9. Success Criteria

  1. Zero Leaks: No resource leaks under normal or abnormal conditions
  2. Timeout Compliance: All operations respect configured timeouts
  3. Limit Enforcement: All resource limits strictly enforced
  4. Graceful Degradation: System remains functional under pressure
  5. Observable: All resource usage visible in metrics
  6. Recoverable: System auto-recovers from pressure conditions

10. Operational Runbooks

10.1 Responding to Connection Leaks

1. Identify leaked connections from alerts
2. Review stack traces to find acquisition point
3. Check if leak is in user code or system code
4. If system: Emergency patch + forced reclamation
5. If user: Documentation update + client notification
6. Monitor for recurrence

10.2 Handling Resource Pressure

1. Check pressure detector for affected resources
2. Review active degradations
3. If memory pressure: Force GC, evict caches
4. If connection pressure: Close idle connections
5. Scale horizontally if sustained pressure
6. Monitor for pressure relief

Appendix A: Configuration Examples

Production Configuration

[connection_pool]
min_connections = 10
max_connections = 500
acquire_timeout = "30s"
idle_timeout = "5m"
max_lifetime = "2h"
max_connection_lifetime = "4h"
leak_detection_enabled = true
leak_detection_timeout = "10m"
leaked_connection_timeout = "30m"
[timeouts]
default_query_timeout = "30s"
long_query_timeout = "5m"
transaction_timeout = "10m"
connection_acquire_timeout = "30s"
[resource_limits]
max_client_connections = 10000
max_connections_per_user = 100
max_memory_per_query_mb = 1024
max_total_memory_mb = 32768
memory_pressure_threshold = 0.85
[circuit_breaker]
failure_threshold = 10
success_threshold = 5
timeout_duration = "30s"
half_open_max_calls = 3

Appendix B: Metrics Dashboard

Key metrics to monitor:

  • Connection pool utilization
  • Leaked connection count
  • Resource pressure levels
  • Timeout violations
  • Circuit breaker state
  • Query resource usage
  • Degradation activations

Document Control:

  • Version: 1.0
  • Last Updated: 2025-11-10
  • Next Review: Post Phase 1 Implementation
  • Owner: System Architecture Team