Connection Pool Leak Prevention - Quick Start Guide
Connection Pool Leak Prevention - Quick Start Guide
Overview
This guide helps you quickly integrate the connection pool leak prevention system into your HeliosDB applications.
Basic Usage
1. Using Enhanced Connection Pool
use heliosdb_storage::connection_pool_enhanced::{ EnhancedConnectionPool, EnhancedPoolConfig};use heliosdb_common::ConnectionMetadata;
// Create pool with default configurationlet config = EnhancedPoolConfig::default();let pool = EnhancedConnectionPool::new(config);
// Add a connectionlet conn = MyConnection::new();let metadata = ConnectionMetadata { user: "myuser".to_string(), database: Some("mydb".to_string()), protocol: "postgres".to_string(), remote_addr: Some("127.0.0.1:5432".to_string()), tags: vec!["read-only".to_string()],};
let conn_id = pool.add_connection(conn, "myuser".to_string(), metadata) .await?;
// Acquire connection (automatically tracked for leaks)let guard = pool.acquire("myuser").await?;
// Use connection via connection_idlet result = execute_query(guard.connection_id()).await?;
// Connection automatically returned when guard dropsdrop(guard);2. Using Leak Detector Directly
use heliosdb_common::{ LeakDetector, LeakDetectorConfig, ResourceId, ResourceInfo, ResourceType};
// Create leak detectorlet config = LeakDetectorConfig { alert_threshold: Duration::from_secs(30), critical_threshold: Duration::from_secs(300), ..Default::default()};let detector = LeakDetector::with_config(config);
// Track a resourcelet id = ResourceId::new("my-resource-1");let info = ResourceInfo::new(ResourceType::Connection, "user123") .with_metadata("query", "SELECT * FROM users") .with_metadata("database", "production");
detector.track(id.clone(), info)?;
// Do work...
// Release resourcedetector.release(&id)?;
// Check for leakslet leaks = detector.check_leaks();for leak in leaks { println!("LEAK: {}", leak);}3. Using Resource Guard (RAII Pattern)
use heliosdb_common::{LeakDetector, ResourceGuard, ResourceId, ResourceInfo, ResourceType};
let detector = LeakDetector::new();
{ // Resource tracked on creation let guard = ResourceGuard::new( &detector, ResourceId::new("auto-resource"), ResourceInfo::new(ResourceType::Connection, "user") )?;
// Do work...
} // Resource automatically released when guard drops4. Connection Health Monitoring
use heliosdb_storage::connection_health_monitor::{ EnhancedHealthMonitor, HealthMonitorConfig};
// Create health monitorlet config = HealthMonitorConfig::default();let leak_detector = Arc::new(LeakDetector::new());let monitor = EnhancedHealthMonitor::new(config, leak_detector);
// Monitor a connectionmonitor.monitor_connection( "conn-123".to_string(), Arc::new(my_connection)).await;
// Start background monitoringmonitor.start_background_monitoring();
// Check healthif let Some(health) = monitor.check_connection("conn-123").await { println!("Status: {:?}", health.status); println!("Is leaked: {}", health.is_leaked);}
// Get all unhealthy connectionslet unhealthy = monitor.get_unhealthy_connections();for conn in unhealthy { println!("Unhealthy: {} - {:?}", conn.connection_id, conn.status);}Configuration Examples
Development Configuration
EnhancedPoolConfig { max_connections: 100, max_per_user: 10, acquire_timeout_ms: 5_000, idle_timeout_ms: 60_000, // 1 minute max_connection_age_ms: 600_000, // 10 minutes health_check_enabled: true, health_check_interval_ms: 30_000, leak_detection_enabled: true, leak_detection_threshold_ms: 10_000, // 10 seconds auto_cleanup_enabled: true, cleanup_interval_ms: 30_000,}Staging Configuration
EnhancedPoolConfig { max_connections: 1_000, max_per_user: 50, acquire_timeout_ms: 10_000, idle_timeout_ms: 180_000, // 3 minutes max_connection_age_ms: 1_800_000, // 30 minutes health_check_enabled: true, health_check_interval_ms: 60_000, leak_detection_enabled: true, leak_detection_threshold_ms: 15_000, auto_cleanup_enabled: true, cleanup_interval_ms: 60_000,}Production Configuration
EnhancedPoolConfig { max_connections: 10_000, max_per_user: 100, acquire_timeout_ms: 30_000, idle_timeout_ms: 300_000, // 5 minutes max_connection_age_ms: 3_600_000, // 1 hour health_check_enabled: true, health_check_interval_ms: 60_000, leak_detection_enabled: true, leak_detection_threshold_ms: 30_000, auto_cleanup_enabled: true, cleanup_interval_ms: 60_000,}Common Patterns
Pattern 1: Query Execution with Leak Protection
async fn execute_safe_query( pool: &EnhancedConnectionPool<DbConnection>, query: &str) -> Result<QueryResult> { // Guard ensures connection is returned even on error let guard = pool.acquire("myuser").await?;
// Execute query let result = timeout( Duration::from_secs(30), execute_query_internal(guard.connection_id(), query) ).await??;
Ok(result)}Pattern 2: Transaction with Auto-Rollback
async fn execute_transaction( pool: &EnhancedConnectionPool<DbConnection>,) -> Result<()> { let guard = pool.acquire("myuser").await?;
// Begin transaction begin_transaction(guard.connection_id()).await?;
// If any operation fails, guard drop ensures connection returns // and transaction can be rolled back let result = perform_operations(guard.connection_id()).await;
match result { Ok(_) => commit_transaction(guard.connection_id()).await?, Err(e) => { rollback_transaction(guard.connection_id()).await?; return Err(e); } }
Ok(())}Pattern 3: Batch Operations with Monitoring
async fn batch_process_with_monitoring( pool: &EnhancedConnectionPool<DbConnection>, items: Vec<Item>,) -> Result<()> { let guard = pool.acquire("batch_user").await?;
for (i, item) in items.iter().enumerate() { // Process item process_item(guard.connection_id(), item).await?;
// Periodically check for leaks if i % 100 == 0 { let stats = pool.stats(); if stats.leak_stats.current_tracked > expected_count { warn!("Potential leak detected during batch processing"); } } }
Ok(())}Pattern 4: Long-Running Operation with Heartbeat
async fn long_running_operation( pool: &EnhancedConnectionPool<DbConnection>, leak_detector: &LeakDetector,) -> Result<()> { let guard = pool.acquire("long_op_user").await?; let resource_id = ResourceId::new(guard.connection_id());
// Track as long-running operation let info = ResourceInfo::new(ResourceType::Connection, "long_op_user") .with_metadata("operation", "data_migration") .with_metadata("expected_duration_sec", "3600");
leak_detector.track(resource_id.clone(), info)?;
// Perform operation with periodic heartbeat let result = tokio::select! { res = perform_long_operation(guard.connection_id()) => res, _ = tokio::time::sleep(Duration::from_secs(3600)) => { Err(anyhow!("Operation timeout")) } };
// Release tracking leak_detector.release(&resource_id)?;
result}Monitoring & Debugging
Check Pool Statistics
let stats = pool.stats();println!("Total connections: {}", stats.total_connections);println!("Active: {}", stats.active_connections);println!("Idle: {}", stats.idle_connections);println!("Leaks detected: {}", stats.leak_stats.leaks_detected);println!("Current tracked: {}", stats.leak_stats.current_tracked);Get Leak Reports
let leaks = pool.leak_detector.check_leaks();for leak in leaks { eprintln!("LEAK DETECTED:"); eprintln!(" Resource: {}", leak.resource_id.as_str()); eprintln!(" Type: {}", leak.resource_type); eprintln!(" Age: {:?}", leak.age); eprintln!(" Acquired by: {}", leak.acquired_by); eprintln!(" Metadata: {:?}", leak.metadata);}Monitor Health Status
let all_health = monitor.get_all_health();for health in all_health { println!("{}: {:?} (leaked: {})", health.connection_id, health.status, health.is_leaked );}Troubleshooting
Problem: Too many leak alerts
Solution: Increase leak_detection_threshold_ms or mark long-running operations explicitly
let config = EnhancedPoolConfig { leak_detection_threshold_ms: 60_000, // Increase to 1 minute ..Default::default()};Problem: Connections cleaned up too aggressively
Solution: Increase idle timeout and max age
let config = EnhancedPoolConfig { idle_timeout_ms: 600_000, // 10 minutes max_connection_age_ms: 7_200_000, // 2 hours ..Default::default()};Problem: Memory usage from leak detector
Solution: Reduce max tracked resources or disable for non-critical services
let leak_config = LeakDetectorConfig { max_tracked_resources: 10_000, // Reduce from default ..Default::default()};Problem: Performance impact from health checks
Solution: Increase health check interval
let config = EnhancedPoolConfig { health_check_interval_ms: 300_000, // 5 minutes instead of 1 ..Default::default()};Testing
Unit Test Example
#[tokio::test]async fn test_no_leak_on_error() { let pool = EnhancedConnectionPool::new(EnhancedPoolConfig::default());
// Add connection let conn = TestConnection::new(); let metadata = create_test_metadata(); pool.add_connection(conn, "testuser".to_string(), metadata).await.unwrap();
// Simulate error path let result: Result<(), anyhow::Error> = async { let _guard = pool.acquire("testuser").await?; Err(anyhow!("Simulated error")) }.await;
assert!(result.is_err());
// Verify no leak tokio::time::sleep(Duration::from_millis(10)).await; let stats = pool.stats(); assert_eq!(stats.active_connections, 0); assert_eq!(stats.idle_connections, 1);}Best Practices
- Always use guards - Never manually manage connection lifecycle
- Set appropriate timeouts - Match your workload characteristics
- Monitor metrics - Watch for unexpected leak patterns
- Use metadata - Add context to leak reports for debugging
- Test error paths - Ensure no leaks even when operations fail
- Start conservative - Use longer timeouts initially, tune down gradually
- Enable monitoring first - Run with
auto_cleanup_enabled: falseinitially - Document long operations - Use metadata to mark expected long-running tasks
See Also
- Full Implementation Report
- Comprehensive Test Suite
- API Documentation:
cargo doc --open
Support
For issues or questions:
- Check leak detector statistics
- Review pool configuration
- Enable debug logging
- Consult implementation report