Connection Pool Leak Prevention - Quick Start Guide

Overview

This guide helps you quickly integrate the connection pool leak prevention system into your HeliosDB applications.

Basic Usage

1. Using Enhanced Connection Pool

use heliosdb_storage::connection_pool_enhanced::{
    EnhancedConnectionPool, EnhancedPoolConfig,
};
use heliosdb_common::ConnectionMetadata;

// Build a pool with the default settings.
let pool = EnhancedConnectionPool::new(EnhancedPoolConfig::default());

// Describe the connection we are about to register.
let metadata = ConnectionMetadata {
    user: "myuser".to_string(),
    database: Some("mydb".to_string()),
    protocol: "postgres".to_string(),
    remote_addr: Some("127.0.0.1:5432".to_string()),
    tags: vec!["read-only".to_string()],
};

// Register the physical connection with the pool.
let raw_conn = MyConnection::new();
let conn_id = pool
    .add_connection(raw_conn, "myuser".to_string(), metadata)
    .await?;

// Check a connection out; the guard is automatically tracked for leaks.
let guard = pool.acquire("myuser").await?;

// Drive queries through the pooled connection's id.
let result = execute_query(guard.connection_id()).await?;

// Dropping the guard hands the connection back to the pool.
drop(guard);

2. Using Leak Detector Directly

use heliosdb_common::{
    LeakDetector, LeakDetectorConfig, ResourceId, ResourceInfo, ResourceType
};
use std::time::Duration; // needed for the threshold values below

// Create a leak detector that alerts after 30s and escalates after 5min.
let config = LeakDetectorConfig {
    alert_threshold: Duration::from_secs(30),
    critical_threshold: Duration::from_secs(300),
    ..Default::default()
};
let detector = LeakDetector::with_config(config);

// Track a resource, attaching metadata that will appear in leak reports.
let id = ResourceId::new("my-resource-1");
let info = ResourceInfo::new(ResourceType::Connection, "user123")
    .with_metadata("query", "SELECT * FROM users")
    .with_metadata("database", "production");
detector.track(id.clone(), info)?;

// Do work...

// Release the resource once finished with it.
detector.release(&id)?;

// Ask the detector for anything still outstanding past its threshold.
let leaks = detector.check_leaks();
for leak in leaks {
    println!("LEAK: {}", leak);
}

3. Using Resource Guard (RAII Pattern)

use heliosdb_common::{LeakDetector, ResourceGuard, ResourceId, ResourceInfo, ResourceType};

let detector = LeakDetector::new();
{
    // The guard registers the resource with the detector on creation.
    // Bind it to `_guard` — NOT `let _ = ...`, which would drop (and
    // therefore release) the resource immediately — so it lives to the
    // end of the scope without triggering an unused-variable warning.
    let _guard = ResourceGuard::new(
        &detector,
        ResourceId::new("auto-resource"),
        ResourceInfo::new(ResourceType::Connection, "user"),
    )?;
    // Do work...
} // Resource automatically released when the guard drops here.

4. Connection Health Monitoring

use heliosdb_storage::connection_health_monitor::{
    EnhancedHealthMonitor, HealthMonitorConfig
};
use heliosdb_common::LeakDetector; // used below
use std::sync::Arc;                // used below

// Create the health monitor, wiring it to a shared leak detector.
let config = HealthMonitorConfig::default();
let leak_detector = Arc::new(LeakDetector::new());
let monitor = EnhancedHealthMonitor::new(config, leak_detector);

// Register a connection for health checks.
monitor.monitor_connection(
    "conn-123".to_string(),
    Arc::new(my_connection)
).await;

// Start the periodic background checker.
monitor.start_background_monitoring();

// Query one connection's health on demand.
if let Some(health) = monitor.check_connection("conn-123").await {
    println!("Status: {:?}", health.status);
    println!("Is leaked: {}", health.is_leaked);
}

// List everything currently flagged unhealthy.
let unhealthy = monitor.get_unhealthy_connections();
for conn in unhealthy {
    println!("Unhealthy: {} - {:?}", conn.connection_id, conn.status);
}

Configuration Examples

Development Configuration

// Development profile: small limits and an aggressive 10-second leak
// threshold so leaked connections surface almost immediately in testing.
EnhancedPoolConfig {
    max_connections: 100,
    max_per_user: 10,
    acquire_timeout_ms: 5_000,
    idle_timeout_ms: 60_000, // 1 minute
    max_connection_age_ms: 600_000, // 10 minutes
    health_check_enabled: true,
    health_check_interval_ms: 30_000,
    leak_detection_enabled: true,
    leak_detection_threshold_ms: 10_000, // 10 seconds
    auto_cleanup_enabled: true,
    cleanup_interval_ms: 30_000,
}

Staging Configuration

// Staging profile: mid-sized limits between the development and
// production profiles, with a 15-second leak threshold.
EnhancedPoolConfig {
    max_connections: 1_000,
    max_per_user: 50,
    acquire_timeout_ms: 10_000,
    idle_timeout_ms: 180_000, // 3 minutes
    max_connection_age_ms: 1_800_000, // 30 minutes
    health_check_enabled: true,
    health_check_interval_ms: 60_000,
    leak_detection_enabled: true,
    leak_detection_threshold_ms: 15_000,
    auto_cleanup_enabled: true,
    cleanup_interval_ms: 60_000,
}

Production Configuration

// Production profile: large limits and a relaxed 30-second leak
// threshold to cut false positives from legitimately slow operations.
EnhancedPoolConfig {
    max_connections: 10_000,
    max_per_user: 100,
    acquire_timeout_ms: 30_000,
    idle_timeout_ms: 300_000, // 5 minutes
    max_connection_age_ms: 3_600_000, // 1 hour
    health_check_enabled: true,
    health_check_interval_ms: 60_000,
    leak_detection_enabled: true,
    leak_detection_threshold_ms: 30_000,
    auto_cleanup_enabled: true,
    cleanup_interval_ms: 60_000,
}

Common Patterns

Pattern 1: Query Execution with Leak Protection

/// Execute `query` on a pooled connection with a 30-second cap.
///
/// The guard is held for the whole call, so the connection returns to
/// the pool on every exit path — success, query error, or timeout.
async fn execute_safe_query(
    pool: &EnhancedConnectionPool<DbConnection>,
    query: &str
) -> Result<QueryResult> {
    let conn_guard = pool.acquire("myuser").await?;

    // Futures are lazy: nothing runs until `timeout` polls it.
    let query_fut = execute_query_internal(conn_guard.connection_id(), query);

    // Outer `?` handles the timeout elapsing; inner `?` the query error.
    let result = timeout(Duration::from_secs(30), query_fut).await??;
    Ok(result)
}

Pattern 2: Transaction with Auto-Rollback

async fn execute_transaction(
pool: &EnhancedConnectionPool<DbConnection>,
) -> Result<()> {
let guard = pool.acquire("myuser").await?;
// Begin transaction
begin_transaction(guard.connection_id()).await?;
// If any operation fails, guard drop ensures connection returns
// and transaction can be rolled back
let result = perform_operations(guard.connection_id()).await;
match result {
Ok(_) => commit_transaction(guard.connection_id()).await?,
Err(e) => {
rollback_transaction(guard.connection_id()).await?;
return Err(e);
}
}
Ok(())
}

Pattern 3: Batch Operations with Monitoring

/// Process `items` on a single pooled connection, periodically checking
/// the pool's leak statistics while the batch runs.
async fn batch_process_with_monitoring(
    pool: &EnhancedConnectionPool<DbConnection>,
    items: Vec<Item>,
) -> Result<()> {
    let guard = pool.acquire("batch_user").await?;

    // This function holds exactly one tracked resource (the guard above);
    // anything beyond that during the batch suggests a leak elsewhere.
    // (The original example referenced an undefined `expected_count`.)
    let expected_count = 1;

    for (i, item) in items.iter().enumerate() {
        process_item(guard.connection_id(), item).await?;

        // Every 100 items, sanity-check the pool's leak statistics.
        if i % 100 == 0 {
            let stats = pool.stats();
            if stats.leak_stats.current_tracked > expected_count {
                warn!("Potential leak detected during batch processing");
            }
        }
    }
    Ok(())
}

Pattern 4: Long-Running Operation with Heartbeat

async fn long_running_operation(
pool: &EnhancedConnectionPool<DbConnection>,
leak_detector: &LeakDetector,
) -> Result<()> {
let guard = pool.acquire("long_op_user").await?;
let resource_id = ResourceId::new(guard.connection_id());
// Track as long-running operation
let info = ResourceInfo::new(ResourceType::Connection, "long_op_user")
.with_metadata("operation", "data_migration")
.with_metadata("expected_duration_sec", "3600");
leak_detector.track(resource_id.clone(), info)?;
// Perform operation with periodic heartbeat
let result = tokio::select! {
res = perform_long_operation(guard.connection_id()) => res,
_ = tokio::time::sleep(Duration::from_secs(3600)) => {
Err(anyhow!("Operation timeout"))
}
};
// Release tracking
leak_detector.release(&resource_id)?;
result
}

Monitoring & Debugging

Check Pool Statistics

// Snapshot pool-wide counters; `leak_stats` comes from the pool's
// embedded leak detector.
let stats = pool.stats();
println!("Total connections: {}", stats.total_connections);
println!("Active: {}", stats.active_connections);
println!("Idle: {}", stats.idle_connections);
println!("Leaks detected: {}", stats.leak_stats.leaks_detected);
println!("Current tracked: {}", stats.leak_stats.current_tracked);

Get Leak Reports

// Ask the pool's detector for currently-leaked resources and dump each
// report; the metadata attached at track() time aids debugging.
let leaks = pool.leak_detector.check_leaks();
for leak in leaks {
    eprintln!("LEAK DETECTED:");
    eprintln!(" Resource: {}", leak.resource_id.as_str());
    eprintln!(" Type: {}", leak.resource_type);
    eprintln!(" Age: {:?}", leak.age);
    eprintln!(" Acquired by: {}", leak.acquired_by);
    eprintln!(" Metadata: {:?}", leak.metadata);
}

Monitor Health Status

// Print a health snapshot of every monitored connection.
let all_health = monitor.get_all_health();
for health in all_health {
    println!("{}: {:?} (leaked: {})",
        health.connection_id,
        health.status,
        health.is_leaked
    );
}

Troubleshooting

Problem: Too many leak alerts

Solution: Increase leak_detection_threshold_ms or mark long-running operations explicitly

// Raise the leak threshold so legitimately slow operations stop alerting.
let config = EnhancedPoolConfig {
    leak_detection_threshold_ms: 60_000, // Increase to 1 minute
    ..Default::default()
};

Problem: Connections cleaned up too aggressively

Solution: Increase idle timeout and max age

// Keep connections around longer so cleanup stops reclaiming ones that
// are still wanted.
let config = EnhancedPoolConfig {
    idle_timeout_ms: 600_000, // 10 minutes
    max_connection_age_ms: 7_200_000, // 2 hours
    ..Default::default()
};

Problem: Memory usage from leak detector

Solution: Reduce max tracked resources or disable for non-critical services

// Cap how many resources the detector tracks at once to bound its memory.
let leak_config = LeakDetectorConfig {
    max_tracked_resources: 10_000, // Reduce from default
    ..Default::default()
};

Problem: Performance impact from health checks

Solution: Increase health check interval

// Run health checks less often to reduce their overhead.
let config = EnhancedPoolConfig {
    health_check_interval_ms: 300_000, // 5 minutes instead of 1
    ..Default::default()
};

Testing

Unit Test Example

/// Verify the guard returns its connection to the pool even when the
/// code path that acquired it fails.
#[tokio::test]
async fn test_no_leak_on_error() {
    let pool = EnhancedConnectionPool::new(EnhancedPoolConfig::default());

    // Register a single test connection.
    let metadata = create_test_metadata();
    pool.add_connection(TestConnection::new(), "testuser".to_string(), metadata)
        .await
        .unwrap();

    // Acquire inside an async block that fails, so the guard is dropped
    // on the error path.
    let result: Result<(), anyhow::Error> = async {
        let _guard = pool.acquire("testuser").await?;
        Err(anyhow!("Simulated error"))
    }
    .await;
    assert!(result.is_err());

    // Give the pool a moment to process the returned guard, then verify
    // the connection went back to idle instead of leaking.
    tokio::time::sleep(Duration::from_millis(10)).await;
    let stats = pool.stats();
    assert_eq!(stats.active_connections, 0);
    assert_eq!(stats.idle_connections, 1);
}

Best Practices

  1. Always use guards - Never manually manage connection lifecycle
  2. Set appropriate timeouts - Match your workload characteristics
  3. Monitor metrics - Watch for unexpected leak patterns
  4. Use metadata - Add context to leak reports for debugging
  5. Test error paths - Ensure no leaks even when operations fail
  6. Start conservative - Use longer timeouts initially, tune down gradually
  7. Enable monitoring first - Run with auto_cleanup_enabled: false initially
  8. Document long operations - Use metadata to mark expected long-running tasks

See Also

Support

For issues or questions:

  1. Check leak detector statistics
  2. Review pool configuration
  3. Enable debug logging
  4. Consult implementation report