# Cache & Connection Pooling - Quick Fix Guide
For Developers: Fast Reference for Critical Issues
## 🚨 Critical Issues - Fix First
### Issue #1: Race Condition in Cache Eviction
File: heliosdb-cache/src/cache_manager.rs:352
Problem:
```rust
// ❌ CURRENT (BROKEN)
async fn evict_entries(&self, needed_bytes: usize) -> Result<()> {
    let keys = self.storage.keys().await?; // NOT ATOMIC
    let mut entries = Vec::new();
    for key in keys {
        if let Some(entry) = self.storage.get(&key).await? { // RACE HERE
            entries.push(entry);
        }
    }
    // ... eviction
}
```

Fix:

```rust
// FIXED
async fn evict_entries(&self, needed_bytes: usize) -> Result<()> {
    // Acquire eviction lock
    let _eviction_guard = self.eviction_lock.lock().await;

    // Snapshot entries atomically
    let entries_snapshot = {
        let keys = self.storage.keys().await?;
        let mut entries = Vec::new();
        for key in keys {
            if let Some(entry) = self.storage.get(&key).await? {
                entries.push(entry);
            }
        }
        entries
    };

    // Calculate eviction scores
    let mut scored: Vec<_> = entries_snapshot
        .into_iter()
        .map(|e| {
            let score = if self.config.enable_ml_eviction {
                e.metadata.eviction_priority()
            } else {
                let age = Utc::now()
                    .signed_duration_since(e.metadata.last_accessed)
                    .num_seconds() as f64;
                -age
            };
            (e, score)
        })
        .collect();

    scored.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());

    // Evict until we have enough space
    let mut freed = 0;
    for (entry, _) in scored {
        if freed >= needed_bytes {
            break;
        }
        freed += entry.metadata.size_bytes;
        self.storage.remove(&entry.key).await?;
    }

    Ok(())
}
```

Add to struct:
```rust
pub struct CacheManager {
    // ... existing fields
    eviction_lock: Arc<tokio::sync::Mutex<()>>, // ADD THIS
}
```
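The lock also needs to be created wherever `CacheManager` is constructed. A minimal sketch, assuming a constructor of the shape `CacheManager::new(backend, config)` as used in the tests below; `CacheBackend` and the other field names here are placeholders for whatever the struct actually contains:

```rust
impl CacheManager {
    pub fn new(storage: Arc<dyn CacheBackend>, config: CacheConfig) -> Self {
        Self {
            storage,
            config,
            // ... other existing fields
            // One shared mutex so only a single eviction pass runs at a time.
            eviction_lock: Arc::new(tokio::sync::Mutex::new(())),
        }
    }
}
```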
### Issue #2: Memory Leak in Stampede Protection

File: heliosdb-cache/src/stampede_protection.rs:56-80
Problem:
```rust
// ❌ CURRENT (LEAKS MEMORY)
pub async fn try_acquire(&self, key: &QueryKey) -> Result<Option<ComputeGuard>> {
    if let Some(semaphore) = self.in_flight.get(key) {
        match tokio::time::timeout(self.timeout, semaphore.acquire()).await {
            Err(_) => {
                // TIMEOUT - but semaphore stays in map forever!
                return Ok(Some(ComputeGuard::new(key.clone(), None)));
            }
            // ...
        }
    }
    // ...
}
```

Fix:
```rust
// FIXED
pub async fn try_acquire(&self, key: &QueryKey) -> Result<Option<ComputeGuard>> {
    if let Some(semaphore) = self.in_flight.get(key) {
        match tokio::time::timeout(self.timeout, semaphore.acquire()).await {
            Ok(Ok(_permit)) => return Ok(None),
            Ok(Err(_)) => return Ok(None),
            Err(_) => {
                // CLEANUP ON TIMEOUT
                warn!("Stampede timeout for key: {}, cleaning up", key.query);
                self.in_flight.remove(key); // REMOVE FROM MAP

                // Create new guard
                let semaphore = Arc::new(Semaphore::new(0));
                self.in_flight.insert(key.clone(), semaphore.clone());
                return Ok(Some(ComputeGuard::new(
                    key.clone(),
                    Some((self.in_flight.clone(), semaphore)),
                )));
            }
        }
    }

    // Check capacity
    if self.in_flight.len() >= self.max_in_flight {
        return Err(CacheError::Overload("Too many in-flight requests".to_string()));
    }

    // This request gets to compute
    let semaphore = Arc::new(Semaphore::new(0));
    self.in_flight.insert(key.clone(), semaphore.clone());

    Ok(Some(ComputeGuard::new(
        key.clone(),
        Some((self.in_flight.clone(), semaphore)),
    )))
}
```
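The fix only holds if `ComputeGuard` cleans up after itself when the computation finishes (or panics). A minimal sketch of that Drop behavior, assuming `in_flight` is a `DashMap<QueryKey, Arc<Semaphore>>` or a similar concurrent map and that the guard stores the pair handed over by `try_acquire`; the real guard type may already do this and may be shaped differently:

```rust
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::Semaphore;
// QueryKey is the project's cache key type; adjust the path as needed.

pub struct ComputeGuard {
    key: QueryKey,
    // (map, semaphore) handed over by try_acquire; None for "degraded" guards.
    cleanup: Option<(Arc<DashMap<QueryKey, Arc<Semaphore>>>, Arc<Semaphore>)>,
}

impl ComputeGuard {
    pub fn new(
        key: QueryKey,
        cleanup: Option<(Arc<DashMap<QueryKey, Arc<Semaphore>>>, Arc<Semaphore>)>,
    ) -> Self {
        Self { key, cleanup }
    }
}

impl Drop for ComputeGuard {
    fn drop(&mut self) {
        if let Some((in_flight, semaphore)) = self.cleanup.take() {
            // Remove the entry so the map cannot grow without bound ...
            in_flight.remove(&self.key);
            // ... and release waiters: a closed semaphore makes their acquire()
            // return Err, which try_acquire maps to Ok(None).
            semaphore.close();
        }
    }
}
```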
### Issue #3: Missing Connection Health Checks

File: heliosdb-network/src/connection_pool.rs:244
Problem:
```rust
// ❌ CURRENT (DOESN'T CHECK IF CONNECTION ALIVE)
pub async fn health_check(&self) -> bool {
    let stream = self.stream.lock().await;
    stream.peer_addr().is_ok() // Only checks if socket open
}
```

Fix:
```rust
// FIXED
pub async fn health_check(&self) -> bool {
    // Send ping and wait for pong
    match timeout(Duration::from_secs(5), self.send_ping_pong()).await {
        Ok(Ok(())) => true,
        _ => false,
    }
}

async fn send_ping_pong(&self) -> Result<()> {
    // Send ping message
    let ping = HidbMessage::Ping;
    self.send_message(ping).await?;

    // Wait for pong
    match timeout(Duration::from_secs(5), self.receive_message()).await {
        Ok(Ok(HidbMessage::Pong)) => Ok(()),
        _ => Err(HeliosError::Network("Health check failed".to_string())),
    }
}
```

Add to protocol:
```rust
// In heliosdb-network/src/protocol.rs
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HidbMessage {
    // ... existing variants
    Ping, // ADD THIS
    Pong, // ADD THIS
}
```
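Note that the client-side check only passes if the server answers `Ping` with `Pong`. Wherever incoming `HidbMessage`s are dispatched on the server (that handler is not shown in this guide), something along these lines is needed; `Connection` and `handle_request` are stand-ins for whatever the server actually uses:

```rust
// Hypothetical server-side dispatch; only the Ping arm is the point here.
async fn dispatch(connection: &Connection, message: HidbMessage) -> Result<()> {
    match message {
        // Answer pings immediately so client health checks stay cheap.
        HidbMessage::Ping => connection.send_message(HidbMessage::Pong).await,
        // ... existing variants handled as before
        other => handle_request(connection, other).await,
    }
}
```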
### Issue #4: Connection Expiration Not Enforced

File: heliosdb-network/src/connection_pool.rs:287-300
Problem:
```rust
// ❌ CURRENT (MISSING)
impl EnhancedConnectionPool {
    pub fn new(addr: String, config: ConnectionPoolConfig) -> Arc<Self> {
        let pool = Arc::new(Self { /* ... */ });

        // Start background tasks
        // ❌ NO EXPIRATION TASK!

        pool
    }
}
```

Fix:
```rust
// FIXED
impl EnhancedConnectionPool {
    pub fn new(addr: String, config: ConnectionPoolConfig) -> Arc<Self> {
        let pool = Arc::new(Self {
            addr,
            config: config.clone(),
            pool: Arc::new(RwLock::new(VecDeque::new())),
            semaphore: Arc::new(Semaphore::new(config.max_connections)),
            connection_id_counter: Arc::new(AtomicU64::new(0)),
            metrics: Arc::new(RwLock::new(PoolMetrics::default())),
            shutdown: Arc::new(AtomicBool::new(false)),
        });

        // START EXPIRATION TASK
        pool.clone().start_expiration_task();

        // START HEALTH CHECK TASK
        pool.clone().start_health_check_task();

        pool
    }

    fn start_expiration_task(self: Arc<Self>) {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(
                Duration::from_secs(self.config.health_check_interval_secs),
            );

            loop {
                interval.tick().await;

                if self.shutdown.load(Ordering::Relaxed) {
                    break;
                }

                // Remove expired connections
                let mut pool = self.pool.write().await;
                let before_count = pool.len();

                pool.retain(|conn| !conn.meta.is_expired(&self.config));

                let removed = before_count - pool.len();
                if removed > 0 {
                    info!("Removed {} expired connections", removed);
                }
            }
        });
    }

    fn start_health_check_task(self: Arc<Self>) {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(
                Duration::from_secs(self.config.health_check_interval_secs),
            );

            loop {
                interval.tick().await;

                if self.shutdown.load(Ordering::Relaxed) {
                    break;
                }

                // Health check all idle connections
                let mut pool = self.pool.write().await;
                let mut unhealthy = Vec::new();

                for (idx, conn) in pool.iter().enumerate() {
                    if !conn.meta.in_use {
                        if !conn.client.health_check().await {
                            unhealthy.push(idx);
                        }
                    }
                }

                // Remove unhealthy connections (in reverse order)
                for idx in unhealthy.into_iter().rev() {
                    pool.remove(idx);
                    warn!("Removed unhealthy connection at index {}", idx);
                }
            }
        });
    }
}
```
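The expiration task relies on `conn.meta.is_expired(&self.config)`. If that predicate doesn't exist yet, a sketch of what it could look like, assuming the connection metadata tracks creation and last-use `Instant`s and the config exposes `max_lifetime_secs` / `idle_timeout_secs` (rename to match the actual fields):

```rust
use std::time::Duration;

impl ConnectionMeta {
    pub fn is_expired(&self, config: &ConnectionPoolConfig) -> bool {
        // Connection has lived past its maximum lifetime ...
        let too_old =
            self.created_at.elapsed() >= Duration::from_secs(config.max_lifetime_secs);
        // ... or has sat idle longer than the idle timeout.
        let idle_too_long = !self.in_use
            && self.last_used_at.elapsed() >= Duration::from_secs(config.idle_timeout_secs);
        too_old || idle_too_long
    }
}
```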
## ⚠ Major Issues - Fix for Phase 2

### Issue #5: Add Eviction Policy Abstraction
Create new file: heliosdb-cache/src/eviction.rs
```rust
use crate::types::CacheEntry;
use chrono::Utc;

/// Trait for cache eviction policies
pub trait EvictionPolicy: Send + Sync {
    /// Calculate eviction score (lower = more likely to evict)
    fn calculate_score(&self, entry: &CacheEntry) -> f64;

    /// Policy name
    fn name(&self) -> &str;
}

/// LRU eviction policy
pub struct LruPolicy;

impl EvictionPolicy for LruPolicy {
    fn calculate_score(&self, entry: &CacheEntry) -> f64 {
        let age = Utc::now()
            .signed_duration_since(entry.metadata.last_accessed)
            .num_seconds() as f64;
        -age // Older = lower score = evict first
    }

    fn name(&self) -> &str {
        "LRU"
    }
}

/// LFU eviction policy
pub struct LfuPolicy;

impl EvictionPolicy for LfuPolicy {
    fn calculate_score(&self, entry: &CacheEntry) -> f64 {
        entry.metadata.access_count as f64 // Fewer accesses = lower score = evict first
    }

    fn name(&self) -> &str {
        "LFU"
    }
}

/// Hybrid LRU+LFU policy (for Phase 2)
pub struct HybridPolicy {
    lru_weight: f64,
    lfu_weight: f64,
}

impl HybridPolicy {
    pub fn new(lru_weight: f64, lfu_weight: f64) -> Self {
        assert!(
            (lru_weight + lfu_weight - 1.0).abs() < 0.01,
            "Weights must sum to 1.0"
        );
        Self { lru_weight, lfu_weight }
    }

    pub fn default_balanced() -> Self {
        Self { lru_weight: 0.5, lfu_weight: 0.5 }
    }
}

impl EvictionPolicy for HybridPolicy {
    fn calculate_score(&self, entry: &CacheEntry) -> f64 {
        let lru_score = LruPolicy.calculate_score(entry);
        let lfu_score = LfuPolicy.calculate_score(entry);
        self.lru_weight * lru_score + self.lfu_weight * lfu_score
    }

    fn name(&self) -> &str {
        "Hybrid LRU+LFU"
    }
}
```

Update CacheConfig:
```rust
use std::sync::Arc;

pub struct CacheConfig {
    // ... existing fields

    /// Eviction policy (default: LRU)
    pub eviction_policy: Arc<dyn EvictionPolicy>,
}

impl Default for CacheConfig {
    fn default() -> Self {
        Self {
            // ... existing defaults
            eviction_policy: Arc::new(LruPolicy),
        }
    }
}
```

Update evict_entries:
```rust
async fn evict_entries(&self, needed_bytes: usize) -> Result<()> {
    // ... snapshot entries ...

    // Use configured eviction policy
    let mut scored: Vec<_> = entries_snapshot
        .into_iter()
        .map(|e| {
            let score = self.config.eviction_policy.calculate_score(&e);
            (e, score)
        })
        .collect();

    // ... rest of eviction logic
}
```
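With the trait wired into `CacheConfig`, switching policies becomes a one-line change at construction time, for example (struct-update syntax assumes the remaining fields keep their defaults):

```rust
use std::sync::Arc;

let config = CacheConfig {
    // Weighted 50/50 between recency and frequency.
    eviction_policy: Arc::new(HybridPolicy::default_balanced()),
    ..CacheConfig::default()
};
let manager = CacheManager::new(backend, config);
```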
### Issue #6: Fix Affinity Map Unbounded Growth

File: heliosdb-pooling/src/optimizer.rs:194-200
Problem:
```rust
// ❌ CURRENT (GROWS FOREVER)
pub fn record_query_execution(...) {
    let mut affinity = self.affinity_map.write();
    affinity
        .entry(fingerprint.clone())
        .or_insert_with(Vec::new)
        .push(connection_id); // NEVER REMOVED
}
```

Fix:
```rust
// FIXED
const MAX_AFFINITY_CONNECTIONS: usize = 10;

pub fn record_query_execution(
    &self,
    connection_id: Uuid,
    fingerprint: QueryFingerprint,
    execution_time_ms: f64,
) {
    // Update affinity map with size limit
    {
        let mut affinity = self.affinity_map.write();
        affinity
            .entry(fingerprint.clone())
            .and_modify(|connections| {
                connections.push(connection_id);

                // LIMIT SIZE (LRU eviction for affinity)
                if connections.len() > MAX_AFFINITY_CONNECTIONS {
                    connections.remove(0); // Remove oldest
                }
            })
            .or_insert_with(|| vec![connection_id]);
    }

    // ... rest of logic
}
```
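A quick regression test for the cap could look like the sketch below; the optimizer constructor, the `QueryFingerprint` constructor, and direct access to `affinity_map` and `MAX_AFFINITY_CONNECTIONS` are assumptions and may need to go through whatever accessors the type actually exposes:

```rust
#[test]
fn affinity_entries_stay_bounded() {
    let optimizer = PoolOptimizer::default(); // assumed constructor
    let fingerprint = QueryFingerprint::new("SELECT * FROM users WHERE id = ?"); // assumed

    // Record far more executions than the cap allows.
    for _ in 0..100 {
        optimizer.record_query_execution(Uuid::new_v4(), fingerprint.clone(), 1.0);
    }

    // The per-fingerprint connection list must never exceed the limit.
    let affinity = optimizer.affinity_map.read();
    assert!(affinity[&fingerprint].len() <= MAX_AFFINITY_CONNECTIONS);
}
```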
## 🧪 Add Critical Tests

### Test #1: Concurrent Eviction Test
File: heliosdb-cache/tests/concurrent_eviction_test.rs
```rust
#[tokio::test]
async fn test_concurrent_eviction_no_race() {
    let backend = Arc::new(InMemoryBackend::new(1000));
    let mut config = CacheConfig::default();
    config.max_size_bytes = 1000;
    config.min_result_size = 0;
    config.min_execution_time_ms = 0.0;

    let manager = Arc::new(CacheManager::new(backend, config));

    // Spawn 100 concurrent tasks trying to fill cache
    let mut handles = vec![];
    for i in 0..100 {
        let mgr = manager.clone();
        let handle = tokio::spawn(async move {
            let key = QueryKey::new(&format!("SELECT {}", i), &[], "test");
            let data = vec![i as u8; 100];
            mgr.put(key, data, vec![], 10.0).await.unwrap();
        });
        handles.push(handle);
    }

    // Wait for all
    for h in handles {
        h.await.unwrap();
    }

    // Verify cache size is within bounds
    let stats = manager.stats();
    assert!(
        stats.size_bytes <= 1000,
        "Cache size exceeded limit: {}",
        stats.size_bytes
    );

    // Verify evictions happened
    assert!(stats.evictions > 0, "No evictions occurred");
}
```

### Test #2: Stampede Protection Memory Test
```rust
#[tokio::test]
async fn test_stampede_no_memory_leak() {
    let protection = Arc::new(StampedeProtection::with_config(
        Duration::from_millis(50), // Short timeout
        1000,
    ));

    let key = QueryKey::new("SELECT * FROM test", &[], "test");

    // Trigger many timeouts
    for _ in 0..100 {
        let prot = protection.clone();
        let k = key.clone();

        tokio::spawn(async move {
            // First acquire - will timeout
            let _guard = prot.try_acquire(&k).await.unwrap();
            tokio::time::sleep(Duration::from_millis(100)).await;
            // Guard dropped here
        });
    }

    tokio::time::sleep(Duration::from_secs(1)).await;

    // Verify in_flight map is clean
    assert_eq!(
        protection.in_flight_count(),
        0,
        "Memory leak: in_flight not cleaned"
    );
}
```
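The final assertion assumes a small test accessor on `StampedeProtection`; if it doesn't exist yet, something like this is enough (the name is an assumption):

```rust
impl StampedeProtection {
    /// Number of queries currently holding an in-flight slot (test helper).
    pub fn in_flight_count(&self) -> usize {
        self.in_flight.len()
    }
}
```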
## ⏱ Time Estimates

| Issue | Priority | Time | Can Parallelize? |
|---|---|---|---|
| #1 Race Condition | P0 | 1 day | No |
| #2 Memory Leak | P0 | 0.5 days | No |
| #3 Health Checks | P0 | 1 day | Yes |
| #4 Expiration | P0 | 1 day | Yes |
| #5 Eviction Policy | P1 | 3 days | Yes |
| #6 Affinity Map | P1 | 0.5 days | Yes |
- Critical Path: Issues #1 → #2 → Tests = 2.5 days
- Parallel Work: Issues #3, #4 = 1 day
- Phase 2 Prep: Issues #5, #6 = 3.5 days
Total Time: ~7 days for all critical + Phase 2 blockers
## Quick Start
```bash
# 1. Fix critical issues first (P0)
git checkout -b fix/critical-cache-issues

# Fix race condition (Issue #1)
vim heliosdb-cache/src/cache_manager.rs

# Fix memory leak (Issue #2)
vim heliosdb-cache/src/stampede_protection.rs

# Run tests
cargo test -p heliosdb-cache

# 2. Fix connection pool issues (P0)
git checkout -b fix/connection-pool-health

# Fix health checks (Issue #3)
vim heliosdb-network/src/connection_pool.rs
vim heliosdb-network/src/protocol.rs

# Fix expiration (Issue #4)
vim heliosdb-network/src/connection_pool.rs

# Run tests
cargo test -p heliosdb-network

# 3. Add critical tests
mkdir -p heliosdb-cache/tests
vim heliosdb-cache/tests/concurrent_eviction_test.rs
vim heliosdb-cache/tests/stampede_memory_test.rs

cargo test -p heliosdb-cache -- --test-threads=1
```

## Verification Checklist
After applying fixes, verify:
- [ ] `cargo test -p heliosdb-cache` passes
- [ ] `cargo test -p heliosdb-network` passes
- [ ] `cargo test -p heliosdb-pooling` passes
- [ ] No warnings about unused code
- [ ] `cargo clippy` has no errors
- [ ] Concurrent eviction test passes 100 times
- [ ] Stampede memory test shows 0 leaks
- [ ] Connection pool health checks work
- [ ] Connections expire correctly
For detailed analysis, see: CACHE_CONNECTION_POOLING_CODE_REVIEW_REPORT.md