Feature 01: Connection Pooling Modes

Priority: Critical | Complexity: Medium | Phase: 1 (Foundation)


Overview

Problem Statement

Database connections are expensive resources. Each PostgreSQL connection consumes ~10MB of memory on the server side, and establishing new connections incurs TCP handshake, TLS negotiation, and authentication overhead (50-200ms per connection). Applications with many short-lived queries waste significant resources creating and destroying connections.

Current HeliosProxy behavior:

  • One-to-one connection mapping (client connection = backend connection)
  • No connection reuse between clients
  • Connection limits hit quickly under load

Solution

Implement three connection pooling modes inspired by PgBouncer, optimized for modern workloads:

| Mode | Connection Lifecycle | Use Case |
|---|---|---|
| Session | 1:1 mapping until client disconnects | Legacy apps, prepared statements |
| Transaction | Returned to pool after COMMIT/ROLLBACK | Web apps, microservices |
| Statement | Returned after each statement | Simple queries, connection-starved envs |

Architecture

                      ┌───────────────────────────────────────────────┐
                      │            CONNECTION POOL MANAGER            │
                      │                                               │
Client A ────────────►│  ┌─────────────────────────────────────────┐  │
                      │  │ Session Pool (per-user, per-database)   │  │
Client B ────────────►│  │ ┌───┐ ┌───┐ ┌───┐                       │  │
                      │  │ │C1 │ │C2 │ │C3 │ → Backend Conns       │  │
Client C ────────────►│  │ └───┘ └───┘ └───┘                       │  │
                      │  └─────────────────────────────────────────┘  │
Client D ────────────►│                                               │
                      │  ┌─────────────────────────────────────────┐  │
Client E ────────────►│  │ Transaction Pool (shared)               │  │
                      │  │ Active: [C4, C5]  Idle: [C6, C7, C8]    │  │
                      │  └─────────────────────────────────────────┘  │
                      │                                               │
                      │  ┌─────────────────────────────────────────┐  │
                      │  │ Statement Pool (shared, aggressive)     │  │
                      │  │ Borrowed: [C9]  Available: [C10-C15]    │  │
                      │  └─────────────────────────────────────────┘  │
                      └───────────────────────────────────────────────┘
                      ┌───────────────────────┼───────────────────────┐
                      │                       │                       │
                      ▼                       ▼                       ▼
                 ┌──────────┐            ┌──────────┐            ┌──────────┐
                 │ Primary  │            │ Standby1 │            │ Standby2 │
                 └──────────┘            └──────────┘            └──────────┘

Pool Architecture

use std::sync::Arc;
use std::time::Duration;

use dashmap::DashMap;

pub struct ConnectionPoolManager {
    /// Per-database, per-user session pools
    session_pools: DashMap<PoolKey, SessionPool>,
    /// Shared transaction pool per backend
    transaction_pool: TransactionPool,
    /// Ultra-shared statement pool
    statement_pool: StatementPool,
    /// Pool configuration
    config: PoolConfig,
    /// Health checker for backend connections
    health_checker: Arc<HealthChecker>,
}

pub struct PoolConfig {
    /// Default pooling mode
    pub default_mode: PoolingMode,
    /// Maximum connections per pool
    pub max_pool_size: u32,
    /// Minimum idle connections to maintain
    pub min_idle: u32,
    /// Connection idle timeout before eviction
    pub idle_timeout: Duration,
    /// Maximum connection lifetime (for rotation)
    pub max_lifetime: Duration,
    /// Connection acquisition timeout
    pub acquire_timeout: Duration,
    /// Server-side prepared statement handling
    pub prepared_statement_mode: PreparedStatementMode,
}

#[derive(Clone, Copy)]
pub enum PoolingMode {
    Session,
    Transaction,
    Statement,
}

#[derive(Clone, Copy)]
pub enum PreparedStatementMode {
    /// Disable server-side prepared statements (safest)
    Disable,
    /// Track and recreate on new connections
    Track,
    /// Use protocol-level named statements
    Named,
}
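The Track variant above implies some bookkeeping of which statements exist on which backend connection. A minimal sketch of that bookkeeping follows, using only the standard library. `StatementTracker`, `ConnId`, and all method names are hypothetical illustrations, not part of the HeliosProxy design.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical identifier for a backend connection (an assumption of this sketch).
pub type ConnId = u64;

/// Remembers client-declared prepared statements and where they already exist.
#[derive(Default)]
pub struct StatementTracker {
    /// Statement name -> SQL text, as declared by the client.
    declared: HashMap<String, String>,
    /// Backend connection -> statement names already prepared on it.
    prepared_on: HashMap<ConnId, HashSet<String>>,
}

impl StatementTracker {
    /// Record a statement the client has prepared.
    pub fn declare(&mut self, name: &str, sql: &str) {
        self.declared.insert(name.to_string(), sql.to_string());
    }

    /// Return the PREPARE command that must run on `conn` before `name` can be
    /// executed there, and mark the statement as present on that connection.
    /// Returns `None` if it is already prepared on `conn` or was never declared.
    pub fn ensure_prepared(&mut self, conn: ConnId, name: &str) -> Option<String> {
        let sql = self.declared.get(name)?;
        let on_conn = self.prepared_on.entry(conn).or_default();
        if on_conn.insert(name.to_string()) {
            Some(format!("PREPARE {name} AS {sql}"))
        } else {
            None
        }
    }

    /// Forget everything prepared on `conn`, for example after a reset query.
    pub fn reset_connection(&mut self, conn: ConnId) {
        self.prepared_on.remove(&conn);
    }
}
```

With this shape, the proxy would call `ensure_prepared` each time a client executes a named statement on a freshly acquired backend, and `reset_connection` whenever the backend's session state is cleared.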

Connection Lifecycle

SESSION MODE:
  Client Connect → Acquire Backend → [All Queries] → Client Disconnect → Release

TRANSACTION MODE:
  Client Connect ──┐
                   ├── BEGIN → Acquire → [Queries] → COMMIT → Release
                   ├── BEGIN → Acquire → [Queries] → ROLLBACK → Release
                   └── Single Query → Acquire → Execute → Release

STATEMENT MODE:
  Client Connect ──┐
                   ├── Query 1 → Acquire → Execute → Release
                   ├── Query 2 → Acquire → Execute → Release
                   └── Query N → Acquire → Execute → Release
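The three lifecycles differ only in when the backend connection goes back to the pool. One way to express that rule is a single decision function, sketched below. `ClientEvent` and `should_release` are hypothetical names, and the sketch assumes the proxy can tell after each statement whether the backend is still inside a transaction.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PoolingMode {
    Session,
    Transaction,
    Statement,
}

/// Hypothetical events the proxy observes on a client connection.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ClientEvent {
    /// A statement finished; `in_transaction` is the backend state afterwards.
    StatementDone { in_transaction: bool },
    /// The client closed its connection to the proxy.
    ClientDisconnected,
}

/// Decide whether the backend connection should return to the pool now.
pub fn should_release(mode: PoolingMode, event: ClientEvent) -> bool {
    match (mode, event) {
        // A disconnect always frees the backend, whatever the mode.
        (_, ClientEvent::ClientDisconnected) => true,
        // Session mode holds the backend until the client disconnects.
        (PoolingMode::Session, _) => false,
        // Transaction mode releases once no transaction is open, which covers
        // COMMIT, ROLLBACK, and single autocommit queries.
        (PoolingMode::Transaction, ClientEvent::StatementDone { in_transaction }) => {
            !in_transaction
        }
        // Statement mode releases after every statement.
        (PoolingMode::Statement, ClientEvent::StatementDone { .. }) => true,
    }
}
```

Because statement mode releases even while a transaction is open, multi-statement transactions cannot be relied on in that mode.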

API Specification

Configuration (heliosproxy.toml)

[pool]
# Default pooling mode: "session", "transaction", "statement"
mode = "transaction"

# Pool size limits
max_connections = 100   # Total backend connections
min_idle = 10           # Keep-warm connections
max_per_user = 20       # Per-user limit

# Timeouts
acquire_timeout = "5s"  # Wait for connection
idle_timeout = "10m"    # Evict idle connections
max_lifetime = "1h"     # Rotate old connections

# Server reset between uses (transaction/statement mode)
reset_query = "DISCARD ALL"

# Prepared statement handling
prepared_statements = "track"  # disable, track, named

[pool.per_database]
# Override settings per database
analytics_db.mode = "session"
analytics_db.max_connections = 50
Admin API Endpoints

GET /pool/stats
{
  "total_connections": 85,
  "active_connections": 42,
  "idle_connections": 43,
  "waiting_clients": 3,
  "pools": {
    "transaction": {
      "size": 60,
      "active": 35,
      "idle": 25,
      "wait_queue": 2
    },
    "session": {
      "size": 25,
      "active": 7,
      "idle": 18,
      "wait_queue": 1
    }
  },
  "backends": {
    "primary": { "connections": 50, "healthy": true },
    "standby1": { "connections": 20, "healthy": true },
    "standby2": { "connections": 15, "healthy": true }
  }
}

POST /pool/drain
# Gracefully drain connections for maintenance
{ "backend": "standby1", "timeout": "30s" }

POST /pool/resize
# Dynamically resize pool
{ "max_connections": 150, "min_idle": 20 }
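In the sample /pool/stats response, the top-level counters equal the sums of the per-pool counters, and each pool's size equals active plus idle. Assuming that relationship is intended (the spec does not state it explicitly), the totals could be derived as in the sketch below. `PoolStats`, `Totals`, and `aggregate` are hypothetical names.

```rust
/// Per-pool counters, mirroring one entry of the "pools" object.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct PoolStats {
    pub size: u32,
    pub active: u32,
    pub idle: u32,
    pub wait_queue: u32,
}

/// Top-level counters of the response.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct Totals {
    pub total_connections: u32,
    pub active_connections: u32,
    pub idle_connections: u32,
    pub waiting_clients: u32,
}

/// Sum per-pool counters into the top-level totals.
/// Assumes size == active + idle for every pool and reports a violation as an error.
pub fn aggregate(pools: &[PoolStats]) -> Result<Totals, String> {
    let mut totals = Totals::default();
    for (index, pool) in pools.iter().enumerate() {
        if pool.active + pool.idle != pool.size {
            return Err(format!("pool {index}: active + idle != size"));
        }
        totals.total_connections += pool.size;
        totals.active_connections += pool.active;
        totals.idle_connections += pool.idle;
        totals.waiting_clients += pool.wait_queue;
    }
    Ok(totals)
}
```

Feeding in the transaction and session pools from the sample response reproduces its top-level values of 85, 42, 43, and 3.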

Client Hints (SQL Comments)

-- Force session mode for this connection
/*helios:pool=session*/ SELECT * FROM users;
-- Acquire dedicated connection for batch
/*helios:pool=dedicated*/ BEGIN;
-- All queries use same backend connection
INSERT INTO logs VALUES (...);
INSERT INTO logs VALUES (...);
COMMIT;
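Hints of the form /*helios:key=value*/ have to be recognized before the query is forwarded. A minimal sketch of extracting one follows. `parse_hint` is a hypothetical helper, and it assumes the hint is the first token of the statement and carries a single key=value pair, as in the examples above.

```rust
/// Return the value of a leading `/*helios:<key>=<value>*/` comment, if the
/// statement starts with one and its key matches `key`.
/// Hypothetical helper: assumes one hint per statement, placed at the start.
pub fn parse_hint<'a>(sql: &'a str, key: &str) -> Option<&'a str> {
    // Only a comment at the very start of the statement counts as a hint.
    let rest = sql.trim_start().strip_prefix("/*helios:")?;
    // Everything up to the comment terminator is the hint body.
    let (body, _) = rest.split_once("*/")?;
    let (found_key, value) = body.split_once('=')?;
    (found_key.trim() == key).then(|| value.trim())
}
```

For example, `parse_hint("/*helios:pool=session*/ SELECT * FROM users;", "pool")` yields `Some("session")`, while a statement without a leading hint yields `None`.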

AI/Agent Innovations

1. Conversation Context Affinity

AI agents often maintain multi-turn conversations. Pool connections to preserve context:

-- Agent session with context preservation
/*helios:agent_session=conv_12345*/
SET helios.conversation_id = 'conv_12345';
SELECT * FROM embeddings WHERE topic = $1;

/// AI agent connection affinity
pub struct AgentAffinityPool {
    /// Underlying pool that owns the backend connections
    pool: ConnectionPool,
    /// Map conversation IDs to preferred connections
    affinity_map: DashMap<ConversationId, ConnectionId>,
    /// TTL for affinity (conversation timeout); expiry of stale entries is not shown here
    affinity_ttl: Duration,
}

impl AgentAffinityPool {
    pub fn acquire_for_conversation(&self, conv_id: &str) -> Connection {
        // Return same connection for conversation continuity
        if let Some(conn_id) = self.affinity_map.get(conv_id) {
            if let Some(conn) = self.pool.get_specific(*conn_id) {
                return conn;
            }
        }
        // New conversation (or the pinned connection is gone): acquire any connection
        let conn = self.pool.acquire();
        self.affinity_map.insert(conv_id.to_string(), conn.id());
        conn
    }
}
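The struct above carries an affinity TTL, but the lookup shown does not enforce it. One possible way to expire stale affinities is sketched below with a plain `HashMap` and explicit timestamps. `AffinityMap`, `pin`, and `lookup` are hypothetical names, and passing `now` as a parameter is a choice made here to keep the logic deterministic and testable.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical connection identifier (an assumption of this sketch).
pub type ConnectionId = u64;

/// Conversation-to-connection affinity with a sliding TTL.
pub struct AffinityMap {
    ttl: Duration,
    /// Conversation ID -> (pinned connection, last time it was used).
    entries: HashMap<String, (ConnectionId, Instant)>,
}

impl AffinityMap {
    pub fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Pin `conv_id` to `conn`, starting its TTL at `now`.
    pub fn pin(&mut self, conv_id: &str, conn: ConnectionId, now: Instant) {
        self.entries.insert(conv_id.to_string(), (conn, now));
    }

    /// Return the pinned connection if it was used within the TTL, refreshing
    /// its timestamp; otherwise drop the stale entry and return `None`.
    pub fn lookup(&mut self, conv_id: &str, now: Instant) -> Option<ConnectionId> {
        let ttl = self.ttl;
        let fresh = match self.entries.get_mut(conv_id) {
            Some((conn, last_used)) if now.duration_since(*last_used) <= ttl => {
                *last_used = now;
                Some(*conn)
            }
            _ => None,
        };
        if fresh.is_none() {
            // Expired or unknown: removing is a no-op when the key is absent.
            self.entries.remove(conv_id);
        }
        fresh
    }
}
```

A sliding TTL keeps an active conversation pinned indefinitely while letting abandoned conversations free their connection after one timeout period.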

2. RAG Pipeline Optimization

Embedding queries often come in batches. Batch-aware pooling:

/// Batch embedding queries on same connection
pub struct EmbeddingBatcher {
    /// Pool the batch connection is borrowed from
    pool: ConnectionPool,
    batch_size: usize,
    batch_timeout: Duration,
    pending: Vec<EmbeddingQuery>,
}

impl EmbeddingBatcher {
    pub async fn queue_embedding(&mut self, query: EmbeddingQuery) -> EmbeddingResult {
        self.pending.push(query);
        if self.pending.len() >= self.batch_size || self.timeout_reached() {
            // Take the queued queries so the next batch starts empty
            let batch = std::mem::take(&mut self.pending);
            // Acquire single connection for entire batch
            let conn = self.pool.acquire();
            let results = self.execute_batch(&conn, &batch).await;
            self.pool.release(conn);
            return results;
        }
        // Wait for more queries
        self.wait_for_batch().await
    }
}
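The size-triggered part of this batching can be isolated from the async and pooling concerns. The sketch below shows only that accumulation logic in synchronous form. `Batcher` is a hypothetical generic type, and the timeout path is represented by an explicit `flush` that a timer would call.

```rust
/// Collects items and hands back a full batch once `batch_size` is reached.
/// Hypothetical, synchronous illustration of the accumulation step only.
pub struct Batcher<T> {
    batch_size: usize,
    pending: Vec<T>,
}

impl<T> Batcher<T> {
    pub fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be at least 1");
        Self { batch_size, pending: Vec::with_capacity(batch_size) }
    }

    /// Queue one item; returns the whole batch when it becomes full.
    pub fn push(&mut self, item: T) -> Option<Vec<T>> {
        self.pending.push(item);
        if self.pending.len() >= self.batch_size {
            Some(self.flush())
        } else {
            None
        }
    }

    /// Take whatever is pending, leaving the batcher empty.
    /// A batch-timeout handler would call this for a partial batch.
    pub fn flush(&mut self) -> Vec<T> {
        std::mem::take(&mut self.pending)
    }
}
```

Returning the batch by value makes it impossible to execute the same queued query twice, since the pending list is emptied in the same step.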

3. Tool Call Connection Reuse

Agentic workflows make many tool calls. Pre-warm connections:

[pool.agent_tools]
# Pre-warm connections for common agent tools
tools = ["database_query", "search_knowledge", "vector_lookup"]
warmup_queries = [
    "SELECT 1 FROM documents LIMIT 1",
    "SELECT 1 FROM embeddings LIMIT 1"
]
connections_per_tool = 5

4. LLM Streaming Connection Hold

Long-running LLM responses need connection stability:

/// Hold connection during streaming response
pub struct StreamingConnectionLease {
    connection: Connection,
    lease_duration: Duration,
    keep_alive: bool,
}

impl StreamingConnectionLease {
    pub fn hold_for_streaming(&self) -> ConnectionGuard {
        // Prevent pool from reclaiming during stream
        ConnectionGuard::new(
            self.connection.clone(),
            LeaseType::Streaming { timeout: self.lease_duration },
        )
    }
}

HeliosDB-Lite Integration

1. Branch-Aware Pooling

Separate pools per database branch:

/// Pool connections per branch
pub struct BranchAwarePool {
    pools: DashMap<BranchName, ConnectionPool>,
}

impl BranchAwarePool {
    pub fn acquire_for_branch(&self, branch: &str) -> Connection {
        // Create the branch's pool lazily on first use
        self.pools
            .entry(branch.to_string())
            .or_insert_with(|| ConnectionPool::new_for_branch(branch))
            .acquire()
    }
}

2. Sync Mode Pool Segregation

Route based on transaction criticality:

/// Separate pools for different sync modes
pub struct SyncAwarePool {
    /// Connections to fully-sync standbys (for critical reads)
    sync_pool: ConnectionPool,
    /// Connections to semi-sync standbys (for important reads)
    semisync_pool: ConnectionPool,
    /// Connections to async standbys (for analytics, eventual reads)
    async_pool: ConnectionPool,
}

impl SyncAwarePool {
    pub fn acquire_for_consistency(&self, level: ConsistencyLevel) -> Connection {
        match level {
            ConsistencyLevel::Strong => self.sync_pool.acquire(),
            ConsistencyLevel::Bounded => self.semisync_pool.acquire(),
            ConsistencyLevel::Eventual => self.async_pool.acquire(),
        }
    }
}

3. TWR-Aware Connection Routing

Transparent Write Routing through sync standbys:

/// Route writes through TWR-capable standbys.
/// Method of the pool type that owns `twr_pool` and `primary_pool` (enclosing impl not shown).
pub fn acquire_for_write(&self, prefer_local: bool) -> Connection {
    if prefer_local {
        // Try local TWR-capable standby first
        if let Some(conn) = self.twr_pool.try_acquire_local() {
            return conn;
        }
    }
    // Fall back to primary
    self.primary_pool.acquire()
}

4. In-Memory Mode Fast Path

Bypass pooling overhead for in-memory databases:

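impl ConnectionPoolManager {
    /// `key` selects the per-user, per-database session pool (used in session mode only)
    pub fn acquire(&self, mode: PoolingMode, key: &PoolKey) -> Connection {
        // In-memory mode uses direct connection (no pool overhead);
        // assumes an `in_memory_mode: bool` field on PoolConfig
        if self.config.in_memory_mode {
            return self.direct_connection();
        }
        // Normal pooled acquisition
        match mode {
            // session_pools is a map, so look up (or lazily create) this key's pool first;
            // assumes PoolKey: Clone + Eq + Hash and SessionPool: Default
            PoolingMode::Session => self
                .session_pools
                .entry(key.clone())
                .or_insert_with(SessionPool::default)
                .acquire(),
            PoolingMode::Transaction => self.transaction_pool.acquire(),
            PoolingMode::Statement => self.statement_pool.acquire(),
        }
    }
}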

Implementation Notes

File Locations

src/proxy/
├── pool/
│ ├── mod.rs # Public API
│ ├── manager.rs # ConnectionPoolManager
│ ├── session_pool.rs # Session mode implementation
│ ├── transaction_pool.rs # Transaction mode implementation
│ ├── statement_pool.rs # Statement mode implementation
│ ├── health.rs # Connection health checking
│ └── metrics.rs # Pool metrics collection

Key Considerations

  1. Prepared Statement Tracking: In transaction/statement mode, a client's prepared statements may not exist on the backend connection it is handed next. Track which prepared statements exist on each connection, and recreate them on new connections or use server-side named statements.

  2. Session State Reset: Between uses in transaction/statement mode, run the reset query (DISCARD ALL or a custom query) to clear session state. DISCARD ALL also deallocates prepared statements, so tracked statements must be recreated after a reset.

  3. Connection Validation: Before handing out a connection from the pool, validate it with a lightweight query (SELECT 1) or rely on TCP keepalive.

  4. Graceful Degradation: If the pool is exhausted, queue requests up to acquire_timeout rather than failing immediately.

  5. Metrics: Expose Prometheus metrics for pool utilization, wait times, and connection churn.
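The graceful-degradation item (queue with a timeout when the pool is exhausted) can be illustrated with a small blocking pool built on `Mutex` and `Condvar` from the standard library. This is a simplified sketch, not the proxy's async implementation. `BoundedPool` and its methods are hypothetical names, and connections are represented by any type `C`.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// A fixed set of connections; callers wait when none is idle.
pub struct BoundedPool<C> {
    idle: Mutex<VecDeque<C>>,
    available: Condvar,
}

impl<C> BoundedPool<C> {
    pub fn new(connections: Vec<C>) -> Self {
        Self { idle: Mutex::new(connections.into()), available: Condvar::new() }
    }

    /// Wait up to `timeout` for an idle connection.
    /// `None` means the wait timed out with the pool still exhausted.
    pub fn acquire(&self, timeout: Duration) -> Option<C> {
        let deadline = Instant::now() + timeout;
        let mut idle = self.idle.lock().unwrap();
        loop {
            if let Some(conn) = idle.pop_front() {
                return Some(conn);
            }
            // Give up once the deadline has passed; otherwise sleep until a
            // release wakes us or the remaining time runs out.
            let remaining = deadline.checked_duration_since(Instant::now())?;
            let (guard, _timed_out) = self.available.wait_timeout(idle, remaining).unwrap();
            idle = guard;
        }
    }

    /// Return a connection and wake one waiting caller.
    pub fn release(&self, conn: C) {
        self.idle.lock().unwrap().push_back(conn);
        self.available.notify_one();
    }
}
```

Re-checking the queue in a loop after each wake-up guards against spurious wake-ups and against another caller taking the released connection first.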

Testing Strategy

// Both tests assume PoolConfig implements Default and acquire() returns a Result.
#[tokio::test]
async fn test_transaction_pool_reuse() {
    let pool = TransactionPool::new(PoolConfig { max_pool_size: 2, ..Default::default() });
    // Acquire, use, release
    let conn1 = pool.acquire().await.unwrap();
    let conn1_id = conn1.id(); // remember the ID: release() takes ownership of conn1
    conn1.execute("BEGIN").await;
    conn1.execute("SELECT 1").await;
    conn1.execute("COMMIT").await;
    pool.release(conn1);
    // Should get same connection back
    let conn2 = pool.acquire().await.unwrap();
    assert_eq!(conn1_id, conn2.id());
}

#[tokio::test]
async fn test_pool_exhaustion_queueing() {
    // Arc lets the spawned task release conn1 while this task waits in acquire()
    let pool = Arc::new(TransactionPool::new(PoolConfig {
        max_pool_size: 1,
        acquire_timeout: Duration::from_secs(5),
        ..Default::default()
    }));
    let conn1 = pool.acquire().await.unwrap();
    // Release first connection after a short delay
    let releaser = Arc::clone(&pool);
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        releaser.release(conn1);
    });
    // Second acquire should queue, then eventually succeed
    let conn2 = pool.acquire().await;
    assert!(conn2.is_ok());
}

Performance Targets

| Metric | Target | Measurement |
|---|---|---|
| Connection acquire latency (warm) | <100μs | p99 |
| Connection acquire latency (cold) | <50ms | p99 |
| Pool overhead per query | <10μs | average |
| Memory per pooled connection | <1KB | proxy-side |
| Connection reduction | 10-100x | vs session mode |
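Two of these targets are stated as p99 values, so a benchmark needs a percentile over recorded latencies. A minimal sketch using the nearest-rank method follows. `percentile` is a hypothetical helper, and nearest-rank is one of several common percentile definitions, chosen here for simplicity.

```rust
use std::time::Duration;

/// Nearest-rank percentile of latency samples, with `pct` in 0..=100.
/// Returns `None` for an empty slice or an out-of-range percentile.
pub fn percentile(samples: &[Duration], pct: u32) -> Option<Duration> {
    if samples.is_empty() || pct > 100 {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort();
    // rank = ceil(pct / 100 * n), computed in integers to avoid float rounding.
    let rank = (pct as usize * sorted.len() + 99) / 100;
    // Ranks are 1-based; clamp so pct = 0 maps to the smallest sample.
    Some(sorted[rank.max(1) - 1])
}
```

A warm-acquire benchmark would then check something like `percentile(&samples, 99).unwrap() < Duration::from_micros(100)` against the first target in the table.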