
Cache & Connection Pooling - Quick Fix Guide

For Developers: Fast Reference for Critical Issues


🚨 Critical Issues - Fix First

Issue #1: Race Condition in Cache Eviction

File: heliosdb-cache/src/cache_manager.rs:352

Problem:

// ❌ CURRENT (BROKEN)
async fn evict_entries(&self, needed_bytes: usize) -> Result<()> {
    let keys = self.storage.keys().await?; // NOT ATOMIC
    let mut entries = Vec::new();
    for key in keys {
        if let Some(entry) = self.storage.get(&key).await? { // RACE HERE
            entries.push(entry);
        }
    }
    // ... eviction
}

Fix:

// FIXED
async fn evict_entries(&self, needed_bytes: usize) -> Result<()> {
    // Acquire eviction lock so only one eviction pass runs at a time
    let _eviction_guard = self.eviction_lock.lock().await;

    // Snapshot entries atomically (with respect to other eviction passes)
    let entries_snapshot = {
        let keys = self.storage.keys().await?;
        let mut entries = Vec::new();
        for key in keys {
            if let Some(entry) = self.storage.get(&key).await? {
                entries.push(entry);
            }
        }
        entries
    };

    // Calculate eviction scores
    let mut scored: Vec<_> = entries_snapshot
        .into_iter()
        .map(|e| {
            let score = if self.config.enable_ml_eviction {
                e.metadata.eviction_priority()
            } else {
                let age = Utc::now()
                    .signed_duration_since(e.metadata.last_accessed)
                    .num_seconds() as f64;
                -age
            };
            (e, score)
        })
        .collect();
    // Treat NaN scores as equal instead of panicking
    scored.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));

    // Evict until we have freed enough space
    let mut freed = 0;
    for (entry, _) in scored {
        if freed >= needed_bytes { break; }
        freed += entry.metadata.size_bytes;
        self.storage.remove(&entry.key).await?;
    }
    Ok(())
}

Add to struct:

pub struct CacheManager {
    // ... existing fields
    eviction_lock: Arc<tokio::sync::Mutex<()>>, // ADD THIS
}
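The new field also needs to be initialized wherever CacheManager is constructed. A minimal sketch, assuming a direct constructor; the other parameter and field names (storage backend trait included) are illustrative:

impl CacheManager {
    pub fn new(storage: Arc<dyn CacheBackend>, config: CacheConfig) -> Self {
        Self {
            storage,
            config,
            // Initialize the eviction lock alongside the existing fields
            eviction_lock: Arc::new(tokio::sync::Mutex::new(())),
        }
    }
}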

Issue #2: Memory Leak in Stampede Protection

File: heliosdb-cache/src/stampede_protection.rs:56-80

Problem:

// ❌ CURRENT (LEAKS MEMORY)
pub async fn try_acquire(&self, key: &QueryKey) -> Result<Option<ComputeGuard>> {
    if let Some(semaphore) = self.in_flight.get(key) {
        match tokio::time::timeout(self.timeout, semaphore.acquire()).await {
            Err(_) => {
                // TIMEOUT - but the semaphore stays in the map forever!
                return Ok(Some(ComputeGuard::new(key.clone(), None)));
            }
            // ...
        }
    }
    // ...
}

Fix:

// FIXED
pub async fn try_acquire(&self, key: &QueryKey) -> Result<Option<ComputeGuard>> {
    if let Some(semaphore) = self.in_flight.get(key) {
        match tokio::time::timeout(self.timeout, semaphore.acquire()).await {
            Ok(Ok(_permit)) => return Ok(None),
            Ok(Err(_)) => return Ok(None),
            Err(_) => {
                // CLEANUP ON TIMEOUT
                warn!("Stampede timeout for key: {}, cleaning up", key.query);
                // NOTE: if in_flight is a DashMap, clone the semaphore and drop the
                // `get` guard before mutating here, or remove/insert can block on
                // the same shard lock.
                self.in_flight.remove(key); // REMOVE FROM MAP
                // Create a new guard so this caller computes the value itself
                let semaphore = Arc::new(Semaphore::new(0));
                self.in_flight.insert(key.clone(), semaphore.clone());
                return Ok(Some(ComputeGuard::new(
                    key.clone(),
                    Some((self.in_flight.clone(), semaphore)),
                )));
            }
        }
    }

    // Check capacity
    if self.in_flight.len() >= self.max_in_flight {
        return Err(CacheError::Overload("Too many in-flight requests".to_string()));
    }

    // This request gets to compute
    let semaphore = Arc::new(Semaphore::new(0));
    self.in_flight.insert(key.clone(), semaphore.clone());
    Ok(Some(ComputeGuard::new(
        key.clone(),
        Some((self.in_flight.clone(), semaphore)),
    )))
}
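The cleanup path above only works if ComputeGuard removes its map entry when dropped. A minimal sketch of that Drop behavior, assuming in_flight is an Arc<DashMap<QueryKey, Arc<Semaphore>>> (the concrete map type and field names are assumptions):

use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::Semaphore;

pub struct ComputeGuard {
    key: QueryKey,
    cleanup: Option<(Arc<DashMap<QueryKey, Arc<Semaphore>>>, Arc<Semaphore>)>,
}

impl ComputeGuard {
    pub fn new(
        key: QueryKey,
        cleanup: Option<(Arc<DashMap<QueryKey, Arc<Semaphore>>>, Arc<Semaphore>)>,
    ) -> Self {
        Self { key, cleanup }
    }
}

impl Drop for ComputeGuard {
    fn drop(&mut self) {
        if let Some((in_flight, semaphore)) = self.cleanup.take() {
            // Remove the entry so the map cannot grow without bound,
            // then release enough permits to wake every waiter.
            in_flight.remove(&self.key);
            semaphore.add_permits(Semaphore::MAX_PERMITS);
        }
    }
}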

Issue #3: Missing Connection Health Checks

File: heliosdb-network/src/connection_pool.rs:244

Problem:

// ❌ CURRENT (DOESN'T CHECK IF CONNECTION IS ALIVE)
pub async fn health_check(&self) -> bool {
    let stream = self.stream.lock().await;
    stream.peer_addr().is_ok() // Only checks that the socket is open
}

Fix:

// FIXED
pub async fn health_check(&self) -> bool {
    // Send a ping and wait for the pong
    match timeout(
        Duration::from_secs(5),
        self.send_ping_pong()
    ).await {
        Ok(Ok(())) => true,
        _ => false,
    }
}

async fn send_ping_pong(&self) -> Result<()> {
    // Send ping message
    let ping = HidbMessage::Ping;
    self.send_message(ping).await?;
    // Wait for pong
    match timeout(
        Duration::from_secs(5),
        self.receive_message()
    ).await {
        Ok(Ok(HidbMessage::Pong)) => Ok(()),
        _ => Err(HeliosError::Network("Health check failed".to_string())),
    }
}

Add to protocol:

// In heliosdb-network/src/protocol.rs
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HidbMessage {
    // ... existing variants
    Ping, // ADD THIS
    Pong, // ADD THIS
}
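The server side must answer the new Ping variant, otherwise every health check will time out and healthy connections will be discarded. A minimal, illustrative sketch; handle_message and its signature are assumptions about the real dispatch code, not the actual API:

// Illustrative only: wherever incoming messages are dispatched on the server
async fn handle_message(msg: HidbMessage) -> Option<HidbMessage> {
    match msg {
        // Answer health-check pings immediately, before normal request handling
        HidbMessage::Ping => Some(HidbMessage::Pong),
        // ... existing variants keep going through the normal request path
        _ => None,
    }
}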

Issue #4: Connection Expiration Not Enforced

File: heliosdb-network/src/connection_pool.rs:287-300

Problem:

// ❌ CURRENT (MISSING)
impl EnhancedConnectionPool {
    pub fn new(addr: String, config: ConnectionPoolConfig) -> Arc<Self> {
        let pool = Arc::new(Self { /* ... */ });
        // Start background tasks
        // ❌ NO EXPIRATION TASK!
        pool
    }
}

Fix:

// FIXED
impl EnhancedConnectionPool {
    pub fn new(addr: String, config: ConnectionPoolConfig) -> Arc<Self> {
        let pool = Arc::new(Self {
            addr,
            config: config.clone(),
            pool: Arc::new(RwLock::new(VecDeque::new())),
            semaphore: Arc::new(Semaphore::new(config.max_connections)),
            connection_id_counter: Arc::new(AtomicU64::new(0)),
            metrics: Arc::new(RwLock::new(PoolMetrics::default())),
            shutdown: Arc::new(AtomicBool::new(false)),
        });
        // START EXPIRATION TASK
        pool.clone().start_expiration_task();
        // START HEALTH CHECK TASK
        pool.clone().start_health_check_task();
        pool
    }

    fn start_expiration_task(self: Arc<Self>) {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(
                Duration::from_secs(self.config.health_check_interval_secs)
            );
            loop {
                interval.tick().await;
                if self.shutdown.load(Ordering::Relaxed) {
                    break;
                }
                // Remove expired connections
                let mut pool = self.pool.write().await;
                let before_count = pool.len();
                pool.retain(|conn| !conn.meta.is_expired(&self.config));
                let removed = before_count - pool.len();
                if removed > 0 {
                    info!("Removed {} expired connections", removed);
                }
            }
        });
    }

    fn start_health_check_task(self: Arc<Self>) {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(
                Duration::from_secs(self.config.health_check_interval_secs)
            );
            loop {
                interval.tick().await;
                if self.shutdown.load(Ordering::Relaxed) {
                    break;
                }
                // Health check all idle connections.
                // NOTE: this holds the pool write lock across the awaited health
                // checks; for large pools, consider snapshotting the idle
                // connections first and re-acquiring the lock to remove them.
                let mut pool = self.pool.write().await;
                let mut unhealthy = Vec::new();
                for (idx, conn) in pool.iter().enumerate() {
                    if !conn.meta.in_use {
                        if !conn.client.health_check().await {
                            unhealthy.push(idx);
                        }
                    }
                }
                // Remove unhealthy connections (in reverse order so indices stay valid)
                for idx in unhealthy.into_iter().rev() {
                    pool.remove(idx);
                    warn!("Removed unhealthy connection at index {}", idx);
                }
            }
        });
    }
}
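The expiration task relies on conn.meta.is_expired(&self.config). If that helper does not exist yet, a minimal sketch could look like this; the field names (created_at, last_used, max_lifetime_secs, idle_timeout_secs) are assumptions about the real structs:

use std::time::{Duration, Instant};

impl ConnectionMeta {
    /// A connection is expired when it has lived past its maximum lifetime
    /// or has been idle longer than the configured idle timeout.
    /// (Field and config names here are illustrative.)
    pub fn is_expired(&self, config: &ConnectionPoolConfig) -> bool {
        let now = Instant::now();
        let too_old = now.duration_since(self.created_at)
            > Duration::from_secs(config.max_lifetime_secs);
        let idle_too_long = now.duration_since(self.last_used)
            > Duration::from_secs(config.idle_timeout_secs);
        too_old || idle_too_long
    }
}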

⚠ Major Issues - Fix for Phase 2

Issue #5: Add Eviction Policy Abstraction

Create new file: heliosdb-cache/src/eviction.rs

use crate::types::CacheEntry;
use chrono::Utc;

/// Trait for cache eviction policies
pub trait EvictionPolicy: Send + Sync {
    /// Calculate eviction score (lower = more likely to evict)
    fn calculate_score(&self, entry: &CacheEntry) -> f64;
    /// Policy name
    fn name(&self) -> &str;
}

/// LRU eviction policy
pub struct LruPolicy;

impl EvictionPolicy for LruPolicy {
    fn calculate_score(&self, entry: &CacheEntry) -> f64 {
        let age = Utc::now()
            .signed_duration_since(entry.metadata.last_accessed)
            .num_seconds() as f64;
        -age // Older = lower score = evicted first
    }
    fn name(&self) -> &str { "LRU" }
}

/// LFU eviction policy
pub struct LfuPolicy;

impl EvictionPolicy for LfuPolicy {
    fn calculate_score(&self, entry: &CacheEntry) -> f64 {
        // Fewer accesses = lower score = evicted first
        entry.metadata.access_count as f64
    }
    fn name(&self) -> &str { "LFU" }
}

/// Hybrid LRU+LFU policy (for Phase 2)
pub struct HybridPolicy {
    lru_weight: f64,
    lfu_weight: f64,
}

impl HybridPolicy {
    pub fn new(lru_weight: f64, lfu_weight: f64) -> Self {
        assert!((lru_weight + lfu_weight - 1.0).abs() < 0.01, "Weights must sum to 1.0");
        Self { lru_weight, lfu_weight }
    }
    pub fn default_balanced() -> Self {
        Self { lru_weight: 0.5, lfu_weight: 0.5 }
    }
}

impl EvictionPolicy for HybridPolicy {
    fn calculate_score(&self, entry: &CacheEntry) -> f64 {
        let lru_score = LruPolicy.calculate_score(entry);
        let lfu_score = LfuPolicy.calculate_score(entry);
        self.lru_weight * lru_score + self.lfu_weight * lfu_score
    }
    fn name(&self) -> &str { "Hybrid LRU+LFU" }
}
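A small unit test can pin down the weight check in HybridPolicy::new; this relies only on the assert shown above:

#[test]
#[should_panic(expected = "Weights must sum to 1.0")]
fn hybrid_policy_rejects_bad_weights() {
    // 0.9 + 0.3 = 1.2, which violates the sum-to-1.0 invariant
    HybridPolicy::new(0.9, 0.3);
}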

Update CacheConfig:

use std::sync::Arc;
// Assumes the new module is declared as `mod eviction;` in lib.rs
use crate::eviction::{EvictionPolicy, LruPolicy};

pub struct CacheConfig {
    // ... existing fields
    /// Eviction policy (default: LRU)
    pub eviction_policy: Arc<dyn EvictionPolicy>,
}

impl Default for CacheConfig {
    fn default() -> Self {
        Self {
            // ... existing defaults
            eviction_policy: Arc::new(LruPolicy),
        }
    }
}
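Switching policies then becomes a one-line config change, for example to the balanced hybrid policy:

// Example: configure the cache with the balanced hybrid policy
let config = CacheConfig {
    eviction_policy: Arc::new(HybridPolicy::default_balanced()),
    ..CacheConfig::default()
};
assert_eq!(config.eviction_policy.name(), "Hybrid LRU+LFU");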

Update evict_entries:

async fn evict_entries(&self, needed_bytes: usize) -> Result<()> {
    // ... snapshot entries ...

    // Use the configured eviction policy
    let mut scored: Vec<_> = entries_snapshot
        .into_iter()
        .map(|e| {
            let score = self.config.eviction_policy.calculate_score(&e);
            (e, score)
        })
        .collect();
    // ... rest of eviction logic
}

Issue #6: Fix Affinity Map Unbounded Growth

File: heliosdb-pooling/src/optimizer.rs:194-200

Problem:

// ❌ CURRENT (GROWS FOREVER)
pub fn record_query_execution(...) {
    let mut affinity = self.affinity_map.write();
    affinity
        .entry(fingerprint.clone())
        .or_insert_with(Vec::new)
        .push(connection_id); // NEVER REMOVED
}

Fix:

// FIXED
const MAX_AFFINITY_CONNECTIONS: usize = 10;

pub fn record_query_execution(
    &self,
    connection_id: Uuid,
    fingerprint: QueryFingerprint,
    execution_time_ms: f64,
) {
    // Update affinity map with a size limit
    {
        let mut affinity = self.affinity_map.write();
        affinity
            .entry(fingerprint.clone())
            .and_modify(|connections| {
                connections.push(connection_id);
                // LIMIT SIZE (FIFO: drop the oldest recorded connection)
                if connections.len() > MAX_AFFINITY_CONNECTIONS {
                    connections.remove(0); // Remove oldest
                }
            })
            .or_insert_with(|| vec![connection_id]);
    }
    // ... rest of logic
}
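A regression test for the bound could look roughly like this; PoolOptimizer::default(), QueryFingerprint::from, and reading affinity_map directly are assumptions about the optimizer's API, so adapt the names to the real types:

#[test]
fn affinity_entries_stay_bounded() {
    // Hypothetical setup: construct the optimizer with default settings
    let optimizer = PoolOptimizer::default();
    let fingerprint = QueryFingerprint::from("SELECT * FROM users WHERE id = ?");

    // Record far more executions than the affinity limit allows
    for _ in 0..100 {
        optimizer.record_query_execution(Uuid::new_v4(), fingerprint.clone(), 1.0);
    }

    // The per-fingerprint connection list must stay at or below the cap
    let affinity = optimizer.affinity_map.read();
    assert!(affinity.get(&fingerprint).unwrap().len() <= MAX_AFFINITY_CONNECTIONS);
}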

🧪 Add Critical Tests

Test #1: Concurrent Eviction Test

File: heliosdb-cache/tests/concurrent_eviction_test.rs

#[tokio::test]
async fn test_concurrent_eviction_no_race() {
    let backend = Arc::new(InMemoryBackend::new(1000));
    let mut config = CacheConfig::default();
    config.max_size_bytes = 1000;
    config.min_result_size = 0;
    config.min_execution_time_ms = 0.0;
    let manager = Arc::new(CacheManager::new(backend, config));

    // Spawn 100 concurrent tasks trying to fill the cache
    let mut handles = vec![];
    for i in 0..100 {
        let mgr = manager.clone();
        let handle = tokio::spawn(async move {
            let key = QueryKey::new(&format!("SELECT {}", i), &[], "test");
            let data = vec![i as u8; 100];
            mgr.put(key, data, vec![], 10.0).await.unwrap();
        });
        handles.push(handle);
    }
    // Wait for all tasks
    for h in handles {
        h.await.unwrap();
    }

    // Verify cache size is within bounds
    let stats = manager.stats();
    assert!(stats.size_bytes <= 1000, "Cache size exceeded limit: {}", stats.size_bytes);
    // Verify evictions happened
    assert!(stats.evictions > 0, "No evictions occurred");
}

Test #2: Stampede Protection Memory Test

#[tokio::test]
async fn test_stampede_no_memory_leak() {
    let protection = Arc::new(StampedeProtection::with_config(
        Duration::from_millis(50), // Short timeout
        1000
    ));
    let key = QueryKey::new("SELECT * FROM test", &[], "test");

    // Trigger many acquisitions; all but the first hit the timeout path
    for _ in 0..100 {
        let prot = protection.clone();
        let k = key.clone();
        tokio::spawn(async move {
            let _guard = prot.try_acquire(&k).await.unwrap();
            tokio::time::sleep(Duration::from_millis(100)).await;
            // Guard dropped here, which must clean up its in_flight entry
        });
    }
    tokio::time::sleep(Duration::from_secs(1)).await;

    // Verify the in_flight map is clean
    assert_eq!(protection.in_flight_count(), 0, "Memory leak: in_flight not cleaned up");
}

⏱ Time Estimates

| Issue | Priority | Time | Can Parallelize? |
| --- | --- | --- | --- |
| #1 Race Condition | P0 | 1 day | No |
| #2 Memory Leak | P0 | 0.5 days | No |
| #3 Health Checks | P0 | 1 day | Yes |
| #4 Expiration | P0 | 1 day | Yes |
| #5 Eviction Policy | P1 | 3 days | Yes |
| #6 Affinity Map | P1 | 0.5 days | Yes |

Critical Path: Issues #1 → #2 → Tests = 2.5 days
Parallel Work: Issues #3, #4 = 1 day
Phase 2 Prep: Issues #5, #6 = 3.5 days

Total Time: ~7 days for all critical + Phase 2 blockers


Quick Start

# 1. Fix critical issues first (P0)
git checkout -b fix/critical-cache-issues
# Fix race condition (Issue #1)
vim heliosdb-cache/src/cache_manager.rs
# Fix memory leak (Issue #2)
vim heliosdb-cache/src/stampede_protection.rs
# Run tests
cargo test -p heliosdb-cache
# 2. Fix connection pool issues (P0)
git checkout -b fix/connection-pool-health
# Fix health checks (Issue #3)
vim heliosdb-network/src/connection_pool.rs
vim heliosdb-network/src/protocol.rs
# Fix expiration (Issue #4)
vim heliosdb-network/src/connection_pool.rs
# Run tests
cargo test -p heliosdb-network
# 3. Add critical tests
mkdir -p heliosdb-cache/tests
vim heliosdb-cache/tests/concurrent_eviction_test.rs
vim heliosdb-cache/tests/stampede_memory_test.rs
cargo test -p heliosdb-cache -- --test-threads=1

Verification Checklist

After applying fixes, verify:

  • cargo test -p heliosdb-cache passes
  • cargo test -p heliosdb-network passes
  • cargo test -p heliosdb-pooling passes
  • No warnings about unused code
  • cargo clippy has no errors
  • Concurrent eviction test passes 100 times
  • Stampede memory test shows 0 leaks
  • Connection pool health checks work
  • Connections expire correctly

For detailed analysis, see: CACHE_CONNECTION_POOLING_CODE_REVIEW_REPORT.md