Feature 01: Connection Pooling Modes
Priority: Critical | Complexity: Medium | Phase: 1 (Foundation)
Overview
Problem Statement
Database connections are expensive resources. Each PostgreSQL connection consumes ~10MB of memory on the server side, and establishing new connections incurs TCP handshake, TLS negotiation, and authentication overhead (50-200ms per connection). Applications with many short-lived queries waste significant resources creating and destroying connections.
Current HeliosProxy behavior:
- One-to-one connection mapping (client connection = backend connection)
- No connection reuse between clients
- Connection limits hit quickly under load
Solution
Implement three connection pooling modes inspired by PgBouncer, optimized for modern workloads:
| Mode | Connection Lifecycle | Use Case |
|---|---|---|
| Session | 1:1 mapping until client disconnects | Legacy apps, prepared statements |
| Transaction | Returned to pool after COMMIT/ROLLBACK | Web apps, microservices |
| Statement | Returned after each statement | Simple queries, connection-starved envs |
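The table's "Connection Lifecycle" column can be read as a single decision: given the pooling mode and what the client just did, should the backend connection go back to the pool? The following is a minimal sketch of that rule, not HeliosProxy's actual implementation. `ClientEvent` and `should_release` are hypothetical names introduced here for illustration.

```rust
/// Pooling modes, as listed in the table above.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PoolingMode {
    Session,
    Transaction,
    Statement,
}

/// What the proxy just observed on the client side.
/// (Hypothetical type; the source does not define it.)
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ClientEvent {
    /// A statement completed; `in_transaction` is true inside BEGIN..COMMIT.
    StatementFinished { in_transaction: bool },
    /// COMMIT or ROLLBACK completed.
    TransactionFinished,
    /// The client closed its connection.
    Disconnected,
}

/// Returns true when the backend connection should go back to the pool.
pub fn should_release(mode: PoolingMode, event: ClientEvent) -> bool {
    match (mode, event) {
        // A disconnect always frees the backend connection.
        (_, ClientEvent::Disconnected) => true,
        // Session mode holds the 1:1 mapping until disconnect.
        (PoolingMode::Session, _) => false,
        // Transaction mode releases at COMMIT/ROLLBACK...
        (PoolingMode::Transaction, ClientEvent::TransactionFinished) => true,
        // ...or after a single statement run outside any transaction.
        (PoolingMode::Transaction, ClientEvent::StatementFinished { in_transaction }) => {
            !in_transaction
        }
        // Statement mode releases after every statement.
        (PoolingMode::Statement, _) => true,
    }
}
```

Keeping the rule in one pure function makes the difference between the modes easy to test in isolation from any networking code.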
Architecture
                          ┌───────────────────────────────────────────────┐
                          │            CONNECTION POOL MANAGER            │
                          │                                               │
    Client A ────────────►│  ┌─────────────────────────────────────────┐  │
                          │  │  Session Pool (per-user, per-database)  │  │
    Client B ────────────►│  │  ┌───┐ ┌───┐ ┌───┐                      │  │
                          │  │  │C1 │ │C2 │ │C3 │  → Backend Conns     │  │
    Client C ────────────►│  │  └───┘ └───┘ └───┘                      │  │
                          │  └─────────────────────────────────────────┘  │
    Client D ────────────►│                                               │
                          │  ┌─────────────────────────────────────────┐  │
    Client E ────────────►│  │  Transaction Pool (shared)              │  │
                          │  │  Active: [C4, C5]  Idle: [C6, C7, C8]   │  │
                          │  └─────────────────────────────────────────┘  │
                          │                                               │
                          │  ┌─────────────────────────────────────────┐  │
                          │  │  Statement Pool (shared, aggressive)    │  │
                          │  │  Borrowed: [C9]  Available: [C10-C15]   │  │
                          │  └─────────────────────────────────────────┘  │
                          └───────────────────────────────────────────────┘
                                                  │
                          ┌───────────────────────┼───────────────────────┐
                          │                       │                       │
                          ▼                       ▼                       ▼
                     ┌──────────┐            ┌──────────┐            ┌──────────┐
                     │ Primary  │            │ Standby1 │            │ Standby2 │
                     └──────────┘            └──────────┘            └──────────┘

Pool Architecture
    use std::sync::Arc;
    use std::time::Duration;

    use dashmap::DashMap;

    pub struct ConnectionPoolManager {
        /// Per-database, per-user session pools
        session_pools: DashMap<PoolKey, SessionPool>,

        /// Shared transaction pool per backend
        transaction_pool: TransactionPool,

        /// Ultra-shared statement pool
        statement_pool: StatementPool,

        /// Pool configuration
        config: PoolConfig,

        /// Health checker for backend connections
        health_checker: Arc<HealthChecker>,
    }

    pub struct PoolConfig {
        /// Default pooling mode
        pub default_mode: PoolingMode,

        /// Maximum connections per pool
        pub max_pool_size: u32,

        /// Minimum idle connections to maintain
        pub min_idle: u32,

        /// Connection idle timeout before eviction
        pub idle_timeout: Duration,

        /// Maximum connection lifetime (for rotation)
        pub max_lifetime: Duration,

        /// Connection acquisition timeout
        pub acquire_timeout: Duration,

        /// Server-side prepared statement handling
        pub prepared_statement_mode: PreparedStatementMode,
    }

    #[derive(Clone, Copy)]
    pub enum PoolingMode {
        Session,
        Transaction,
        Statement,
    }

    #[derive(Clone, Copy)]
    pub enum PreparedStatementMode {
        /// Disable server-side prepared statements (safest)
        Disable,
        /// Track and recreate on new connections
        Track,
        /// Use protocol-level named statements
        Named,
    }

Connection Lifecycle
    SESSION MODE:
      Client Connect → Acquire Backend → [All Queries] → Client Disconnect → Release

    TRANSACTION MODE:
      Client Connect ──┐
                       │
                       ├── BEGIN → Acquire → [Queries] → COMMIT → Release
                       │
                       ├── BEGIN → Acquire → [Queries] → ROLLBACK → Release
                       │
                       └── Single Query → Acquire → Execute → Release

    STATEMENT MODE:
      Client Connect ──┐
                       │
                       ├── Query 1 → Acquire → Execute → Release
                       │
                       ├── Query 2 → Acquire → Execute → Release
                       │
                       └── Query N → Acquire → Execute → Release

API Specification
Configuration (heliosproxy.toml)
    [pool]
    # Default pooling mode: "session", "transaction", "statement"
    mode = "transaction"

    # Pool size limits
    max_connections = 100      # Total backend connections
    min_idle = 10              # Keep-warm connections
    max_per_user = 20          # Per-user limit

    # Timeouts
    acquire_timeout = "5s"     # Wait for connection
    idle_timeout = "10m"       # Evict idle connections
    max_lifetime = "1h"        # Rotate old connections

    # Server reset between uses (transaction/statement mode)
    reset_query = "DISCARD ALL"

    # Prepared statement handling
    prepared_statements = "track"  # disable, track, named

    [pool.per_database]
    # Override settings per database
    analytics_db.mode = "session"
    analytics_db.max_connections = 50

Admin API Endpoints
    GET /pool/stats
    {
      "total_connections": 85,
      "active_connections": 42,
      "idle_connections": 43,
      "waiting_clients": 3,
      "pools": {
        "transaction": { "size": 60, "active": 35, "idle": 25, "wait_queue": 2 },
        "session": { "size": 25, "active": 7, "idle": 18, "wait_queue": 1 }
      },
      "backends": {
        "primary": { "connections": 50, "healthy": true },
        "standby1": { "connections": 20, "healthy": true },
        "standby2": { "connections": 15, "healthy": true }
      }
    }

    POST /pool/drain
    # Gracefully drain connections for maintenance
    { "backend": "standby1", "timeout": "30s" }

    POST /pool/resize
    # Dynamically resize pool
    { "max_connections": 150, "min_idle": 20 }

Client Hints (SQL Comments)
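Client hints travel as a block comment of the form `/*helios:key=value*/` placed in front of a statement, as in the examples in this section. The sketch below shows one way the proxy might extract such a hint. It assumes the hint is the first token of the statement text and that only one hint is present. `parse_hint` is a hypothetical helper, not part of any published HeliosProxy API.

```rust
/// Extract the `key` and `value` from a leading `/*helios:key=value*/` comment.
/// Returns `None` when the statement carries no well-formed hint.
/// (Hypothetical helper; assumes the hint precedes the SQL text.)
pub fn parse_hint(sql: &str) -> Option<(&str, &str)> {
    // Skip leading whitespace, then require the hint prefix.
    let rest = sql.trim_start().strip_prefix("/*helios:")?;
    // The hint body runs up to the closing comment delimiter.
    let end = rest.find("*/")?;
    // Split "key=value" at the first '='.
    let (key, value) = rest[..end].split_once('=')?;
    let (key, value) = (key.trim(), value.trim());
    if key.is_empty() || value.is_empty() {
        return None;
    }
    Some((key, value))
}
```

Returning borrowed slices avoids allocating on the hot path. The caller can then match on the key (`pool`, `agent_session`) and reject values it does not recognize.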
    -- Force session mode for this connection
    /*helios:pool=session*/ SELECT * FROM users;

    -- Acquire dedicated connection for batch
    /*helios:pool=dedicated*/ BEGIN;
      -- All queries use same backend connection
      INSERT INTO logs VALUES (...);
      INSERT INTO logs VALUES (...);
    COMMIT;

AI/Agent Innovations
1. Conversation Context Affinity
AI agents often maintain multi-turn conversations. Pool connections to preserve context:
    -- Agent session with context preservation
    /*helios:agent_session=conv_12345*/
    SET helios.conversation_id = 'conv_12345';
    SELECT * FROM embeddings WHERE topic = $1;

    /// AI agent connection affinity
    pub struct AgentAffinityPool {
        /// Underlying pool (field added so that `self.pool` below resolves)
        pool: ConnectionPool,

        /// Map conversation IDs to preferred connections
        affinity_map: DashMap<ConversationId, ConnectionId>,

        /// TTL for affinity (conversation timeout)
        affinity_ttl: Duration,
    }

    impl AgentAffinityPool {
        pub fn acquire_for_conversation(&self, conv_id: &str) -> Connection {
            // Copy the ID out so no map guard is held while touching the pool.
            // Assumes ConversationId is a String alias and ConnectionId is Copy.
            let preferred = self.affinity_map.get(conv_id).map(|entry| *entry.value());

            // Return same connection for conversation continuity
            if let Some(conn) = preferred.and_then(|id| self.pool.get_specific(id)) {
                return conn;
            }

            // New conversation, acquire any connection
            let conn = self.pool.acquire();
            self.affinity_map.insert(conv_id.to_string(), conn.id());
            conn
        }
    }

2. RAG Pipeline Optimization
Embedding queries often come in batches. Batch-aware pooling:
    /// Batch embedding queries on same connection
    pub struct EmbeddingBatcher {
        /// Pool to draw from (field added so that `self.pool` below resolves)
        pool: ConnectionPool,
        batch_size: usize,
        batch_timeout: Duration,
        pending: Vec<EmbeddingQuery>,
    }

    impl EmbeddingBatcher {
        pub async fn queue_embedding(&mut self, query: EmbeddingQuery) -> EmbeddingResult {
            self.pending.push(query);

            if self.pending.len() >= self.batch_size || self.timeout_reached() {
                // Acquire single connection for entire batch
                let conn = self.pool.acquire();
                // Take the batch so the queue starts empty for the next round
                let batch = std::mem::take(&mut self.pending);
                let results = self.execute_batch(&conn, &batch).await;
                self.pool.release(conn);
                return results;
            }

            // Wait for more queries
            self.wait_for_batch().await
        }
    }

3. Tool Call Connection Reuse
Agentic workflows make many tool calls. Pre-warm connections:
    [pool.agent_tools]
    # Pre-warm connections for common agent tools
    tools = ["database_query", "search_knowledge", "vector_lookup"]
    warmup_queries = [
        "SELECT 1 FROM documents LIMIT 1",
        "SELECT 1 FROM embeddings LIMIT 1"
    ]
    connections_per_tool = 5

4. LLM Streaming Connection Hold
Long-running LLM responses need connection stability:
    /// Hold connection during streaming response
    pub struct StreamingConnectionLease {
        connection: Connection,
        lease_duration: Duration,
        keep_alive: bool,
    }

    impl StreamingConnectionLease {
        pub fn hold_for_streaming(&self) -> ConnectionGuard {
            // Prevent pool from reclaiming during stream
            ConnectionGuard::new(
                self.connection.clone(),
                LeaseType::Streaming { timeout: self.lease_duration },
            )
        }
    }

HeliosDB-Lite Integration
1. Branch-Aware Pooling
Separate pools per database branch:
    /// Pool connections per branch
    pub struct BranchAwarePool {
        pools: DashMap<BranchName, ConnectionPool>,
    }

    impl BranchAwarePool {
        pub fn acquire_for_branch(&self, branch: &str) -> Connection {
            self.pools
                .entry(branch.to_string())
                .or_insert_with(|| ConnectionPool::new_for_branch(branch))
                .acquire()
        }
    }

2. Sync Mode Pool Segregation
Route based on transaction criticality:
    /// Separate pools for different sync modes
    pub struct SyncAwarePool {
        /// Connections to fully-sync standbys (for critical reads)
        sync_pool: ConnectionPool,

        /// Connections to semi-sync standbys (for important reads)
        semisync_pool: ConnectionPool,

        /// Connections to async standbys (for analytics, eventual reads)
        async_pool: ConnectionPool,
    }

    impl SyncAwarePool {
        pub fn acquire_for_consistency(&self, level: ConsistencyLevel) -> Connection {
            match level {
                ConsistencyLevel::Strong => self.sync_pool.acquire(),
                ConsistencyLevel::Bounded => self.semisync_pool.acquire(),
                ConsistencyLevel::Eventual => self.async_pool.acquire(),
            }
        }
    }

3. TWR-Aware Connection Routing
Transparent Write Routing through sync standbys:
    /// Route writes through TWR-capable standbys
    pub fn acquire_for_write(&self, prefer_local: bool) -> Connection {
        if prefer_local {
            // Try local TWR-capable standby first
            if let Some(conn) = self.twr_pool.try_acquire_local() {
                return conn;
            }
        }

        // Fall back to primary
        self.primary_pool.acquire()
    }

4. In-Memory Mode Fast Path
Bypass pooling overhead for in-memory databases:
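    impl ConnectionPoolManager {
        pub fn acquire(&self, mode: PoolingMode) -> Connection {
            // In-memory mode uses direct connection (no pool overhead).
            // Note: `in_memory_mode` is not among the PoolConfig fields
            // listed under Pool Architecture and would need to be added.
            if self.config.in_memory_mode {
                return self.direct_connection();
            }

            // Normal pooled acquisition
            match mode {
                // Pseudocode: `session_pools` is a DashMap keyed by PoolKey, so a
                // real implementation must first look up the client's SessionPool.
                PoolingMode::Session => self.session_pools.acquire(),
                PoolingMode::Transaction => self.transaction_pool.acquire(),
                PoolingMode::Statement => self.statement_pool.acquire(),
            }
        }
    }

Implementation Notes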
File Locations
    src/proxy/
    ├── pool/
    │   ├── mod.rs              # Public API
    │   ├── manager.rs          # ConnectionPoolManager
    │   ├── session_pool.rs     # Session mode implementation
    │   ├── transaction_pool.rs # Transaction mode implementation
    │   ├── statement_pool.rs   # Statement mode implementation
    │   ├── health.rs           # Connection health checking
    │   └── metrics.rs          # Pool metrics collection

Key Considerations
- Prepared Statement Tracking: In transaction/statement mode, track which prepared statements exist on each connection. Recreate them on new connections or use server-side named statements.
- Session State Reset: Between uses in transaction/statement mode, run the reset query (DISCARD ALL or custom) to clear session state.
- Connection Validation: Before returning a connection from the pool, validate it with a lightweight query (SELECT 1) or rely on TCP keepalive.
- Graceful Degradation: If the pool is exhausted, queue requests with a timeout rather than failing immediately.
- Metrics: Expose Prometheus metrics for pool utilization, wait times, and connection churn.
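The graceful-degradation item can be illustrated with a bounded wait: a caller that finds the pool empty blocks until a connection is released or the acquire timeout passes. The sketch below uses blocking `std` primitives to stay self-contained. The proxy itself is async, so a real implementation would presumably use async equivalents. `Conn` and `BoundedPool` are hypothetical stand-ins, not types from the HeliosProxy codebase.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// Stand-in for a backend connection (hypothetical).
#[derive(Debug, PartialEq)]
pub struct Conn(pub u32);

/// A fixed set of connections with timed, queued acquisition.
pub struct BoundedPool {
    idle: Mutex<VecDeque<Conn>>,
    available: Condvar,
}

impl BoundedPool {
    pub fn new(conns: Vec<Conn>) -> Self {
        Self {
            idle: Mutex::new(conns.into()),
            available: Condvar::new(),
        }
    }

    /// Wait up to `timeout` for an idle connection; `None` means timed out.
    pub fn acquire(&self, timeout: Duration) -> Option<Conn> {
        let deadline = Instant::now() + timeout;
        let mut idle = self.idle.lock().unwrap();
        loop {
            if let Some(conn) = idle.pop_front() {
                return Some(conn);
            }
            // Give up once the deadline has passed.
            let remaining = deadline.checked_duration_since(Instant::now())?;
            // Sleep until a release notifies us or the remaining time elapses.
            // The loop re-checks the queue, which also handles spurious wakeups.
            let (guard, _) = self.available.wait_timeout(idle, remaining).unwrap();
            idle = guard;
        }
    }

    /// Return a connection and wake one waiting caller.
    pub fn release(&self, conn: Conn) {
        self.idle.lock().unwrap().push_back(conn);
        self.available.notify_one();
    }
}
```

Computing the deadline once, rather than restarting the timeout on every wakeup, keeps the total wait bounded by the configured acquire timeout.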
Testing Strategy
    #[tokio::test]
    async fn test_transaction_pool_reuse() {
        // Assumes PoolConfig implements Default and acquire() returns a Result
        let pool = TransactionPool::new(PoolConfig { max_pool_size: 2, ..Default::default() });

        // Acquire, use, release
        let conn1 = pool.acquire().await.unwrap();
        let first_id = conn1.id(); // Record the ID before release() consumes conn1
        conn1.execute("BEGIN").await;
        conn1.execute("SELECT 1").await;
        conn1.execute("COMMIT").await;
        pool.release(conn1);

        // Should get same connection back
        let conn2 = pool.acquire().await.unwrap();
        assert_eq!(first_id, conn2.id());
    }

    #[tokio::test]
    async fn test_pool_exhaustion_queueing() {
        // Arc lets the spawned task release into the same pool
        let pool = Arc::new(TransactionPool::new(PoolConfig {
            max_pool_size: 1,
            acquire_timeout: Duration::from_secs(5),
            ..Default::default()
        }));

        let conn1 = pool.acquire().await.unwrap();

        // Second acquire should queue
        let acquire_future = pool.acquire();

        // Release first connection
        let releaser = Arc::clone(&pool);
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            releaser.release(conn1);
        });

        // Should eventually succeed
        let conn2 = acquire_future.await;
        assert!(conn2.is_ok());
    }

Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| Connection acquire latency (warm) | <100μs | p99 |
| Connection acquire latency (cold) | <50ms | p99 |
| Pool overhead per query | <10μs | average |
| Memory per pooled connection | <1KB | proxy-side |
| Backend connection reduction | 10-100x | vs. session mode |
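To see where a reduction in that range could come from, Little's law gives the average number of busy backend connections as the transaction arrival rate times the average hold time. The sketch below applies it with a headroom multiplier. All figures are illustrative assumptions rather than measurements, and `busy_connections` and `reduction_factor` are hypothetical helpers.

```rust
/// Little's law: average busy connections = arrival rate * hold time.
pub fn busy_connections(tx_per_sec: f64, avg_hold_ms: f64) -> f64 {
    tx_per_sec * avg_hold_ms / 1000.0
}

/// Ratio of session-mode connections (one per client) to a pooled
/// backend count sized at `headroom` times the average busy count.
pub fn reduction_factor(clients: u32, tx_per_sec: f64, avg_hold_ms: f64, headroom: f64) -> f64 {
    // Round up to whole connections and keep at least one.
    let pooled = (busy_connections(tx_per_sec, avg_hold_ms) * headroom)
        .ceil()
        .max(1.0);
    f64::from(clients) / pooled
}
```

For example, 1,000 connected clients issuing 500 transactions per second at 20 ms each keep about 10 backend connections busy on average. Sizing the pool at twice that (20) yields a 50x reduction relative to one backend connection per client.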
Related Features
- Query Caching - Cache query results to reduce connection usage
- Rate Limiting - Limit connection acquisition rate
- Circuit Breaker - Handle backend failures gracefully