Feature 14: Helios-DistribCache
Priority: Critical | Complexity: Very High | Phase: 5 (Flagship Feature)
Overview
Vision
Transform HeliosProxy into an intelligent distributed caching layer that goes beyond simple query caching. Helios-DistribCache provides:
- Workload-Aware Caching: Different strategies for OLTP vs OLAP vs Vector workloads
- Distributed Cache Mesh: Coordinated caching across proxy instances
- Intelligent Prefetching: Predict and pre-warm cache based on patterns
- Heatmap Analytics: Visual cache utilization and optimization recommendations
- Hybrid Storage Tiering: L1 (memory) → L2 (SSD) → L3 (distributed)
Problem Statement
Traditional caching approaches have limitations:
- Query caches don’t understand workload semantics
- Cache coherency across proxies is complex
- No visibility into cache effectiveness
- Static TTLs don’t adapt to data access patterns
- Vector/embedding workloads have unique needs
Solution
┌─────────────────────────────────────────────────────────────────────────────┐
│                             HELIOS-DISTRIBCACHE                             │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                          WORKLOAD CLASSIFIER                          │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐   │  │
│  │  │    OLTP     │  │    OLAP     │  │   Vector    │  │  AI/Agent   │   │  │
│  │  │  (low lat)  │  │ (high thru) │  │  (ANN/KNN)  │  │  (context)  │   │  │
│  │  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘   │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                      │                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                           MULTI-TIER CACHE                            │  │
│  │                                                                       │  │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
│  │  │  L1: Hot Cache (In-Memory, <100μs)                              │  │  │
│  │  │  - Per-connection / session affinity                            │  │  │
│  │  │  - LRU with frequency aging                                     │  │  │
│  │  │  - 64-512MB per proxy instance                                  │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  │                                   │ miss                              │  │
│  │                                   ▼                                   │  │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
│  │  │  L2: Warm Cache (Local SSD, <1ms)                               │  │  │
│  │  │  - Compressed storage                                           │  │  │
│  │  │  - TTL-based expiration                                         │  │  │
│  │  │  - 1-10GB per proxy instance                                    │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  │                                   │ miss                              │  │
│  │                                   ▼                                   │  │
│  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
│  │  │  L3: Distributed Cache (Mesh, <10ms)                            │  │  │
│  │  │  - Consistent hashing across proxies                            │  │  │
│  │  │  - Replication for availability                                 │  │  │
│  │  │  - Total cluster capacity pooled                                │  │  │
│  │  └─────────────────────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                         INTELLIGENT SERVICES                          │  │
│  │                                                                       │  │
│  │  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐  │  │
│  │  │  Prefetcher  │ │ Invalidator  │ │   Heatmap    │ │  Scheduler   │  │  │
│  │  │ (predictive) │ │ (WAL-based)  │ │ (analytics)  │ │  (workload)  │  │  │
│  │  └──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘  │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘
Architecture
Core Components
pub struct HeliosDistribCache {
    /// Workload classifier
    classifier: WorkloadClassifier,

    /// Multi-tier cache
    l1_hot: Arc<HotCache>,
    l2_warm: Arc<WarmCache>,
    l3_distributed: Arc<DistributedCache>,

    /// Intelligent prefetcher
    prefetcher: Arc<PredictivePrefetcher>,

    /// WAL-based invalidator
    invalidator: Arc<WalInvalidator>,

    /// Heatmap analytics
    heatmap: Arc<CacheHeatmap>,

    /// Workload scheduler
    scheduler: Arc<WorkloadScheduler>,

    /// Configuration
    config: DistribCacheConfig,
}

pub struct DistribCacheConfig {
    /// L1 configuration
    pub l1_size_mb: usize,
    pub l1_max_entry_size: usize,

    /// L2 configuration
    pub l2_size_gb: usize,
    pub l2_path: PathBuf,
    pub l2_compression: CompressionType,

    /// L3 configuration
    pub l3_enabled: bool,
    pub l3_replication_factor: u32,
    pub l3_peers: Vec<SocketAddr>,

    /// Workload-specific settings
    pub oltp_cache_ttl: Duration,
    pub olap_cache_ttl: Duration,
    pub vector_cache_ttl: Duration,

    /// Prefetching
    pub prefetch_enabled: bool,
    pub prefetch_lookahead: u32,

    /// Scheduling
    pub oltp_priority: u32,
    pub olap_priority: u32,
}
Workload Classifier
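The pattern-based step of the classifier in this section matches markers such as "GROUP BY" case-sensitively and calls an `is_simple_crud` helper that the spec does not define. The following is a minimal sketch of one way to handle both, assuming the query is normalized (uppercased, whitespace collapsed) before matching. The names `Workload`, `classify_by_pattern` (as a free function), and the body of `is_simple_crud` are illustrative assumptions, not the spec's implementation.

```rust
/// Simplified stand-in for the spec's `WorkloadType` (assumption: only the
/// variants needed for this illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Workload {
    Oltp,
    Olap,
    Vector,
}

/// Hypothetical helper: a statement counts as "simple CRUD" when it starts
/// with a DML keyword and contains neither a join nor a subquery.
/// Expects input that is already uppercased with single spaces.
pub fn is_simple_crud(normalized: &str) -> bool {
    let starts_with_dml = ["SELECT ", "INSERT ", "UPDATE ", "DELETE "]
        .iter()
        .any(|kw| normalized.starts_with(*kw));
    let has_join = normalized.contains(" JOIN ");
    let has_subquery = normalized.contains("(SELECT ");
    starts_with_dml && !has_join && !has_subquery
}

/// Pattern-based classification on a normalized copy of the query.
pub fn classify_by_pattern(query: &str) -> Option<Workload> {
    // Collapse all whitespace runs to one space, then uppercase, so that
    // "group  by" and "GROUP BY" match the same marker.
    let normalized = query
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ")
        .to_uppercase();

    // Vector operations: distance operator or an embedding column.
    if normalized.contains("<->") || normalized.contains("EMBEDDING") {
        return Some(Workload::Vector);
    }

    // OLAP: aggregation markers taken from the spec's pattern list.
    const OLAP_MARKERS: [&str; 5] = ["GROUP BY", "HAVING", "COUNT(", "SUM(", "AVG("];
    if OLAP_MARKERS.iter().any(|m| normalized.contains(*m)) {
        return Some(Workload::Olap);
    }

    // OLTP: simple CRUD statements.
    if is_simple_crud(&normalized) {
        return Some(Workload::Oltp);
    }

    None
}
```

Substring matching remains a heuristic: a string literal that happens to contain "GROUP BY" would still be classified as OLAP, which is why the spec falls back to session history and an optional model.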
pub struct WorkloadClassifier { /// Classification rules rules: Vec<ClassificationRule>,
/// ML-based classifier (optional) ml_model: Option<Arc<ClassifierModel>>,
/// Recent query history for pattern detection history: DashMap<SessionId, QueryHistory>,}
// `Eq` and `Hash` are required because `WorkloadType` is used as a `HashMap` key
// in `WorkloadScheduler`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WorkloadType {
    /// Online Transaction Processing
    /// Characteristics: Point lookups, short transactions, low latency
    OLTP,

    /// Online Analytical Processing
    /// Characteristics: Full scans, aggregations, high throughput
    OLAP,

    /// Vector/Embedding Operations
    /// Characteristics: ANN search, similarity queries
    Vector,

    /// AI Agent Workloads
    /// Characteristics: Context retrieval, tool calls, conversation
    AIAgent,

    /// RAG Pipeline
    /// Characteristics: Embedding + retrieval + reranking
    RAG,

    /// Mixed/Unknown
    Mixed,
}
impl WorkloadClassifier { pub fn classify(&self, query: &str, context: &QueryContext) -> WorkloadType { // 1. Check explicit hints if let Some(hint) = context.workload_hint { return hint; }
// 2. Pattern-based classification if let Some(wt) = self.classify_by_pattern(query) { return wt; }
// 3. Session history-based classification if let Some(wt) = self.classify_by_session(context.session_id) { return wt; }
// 4. ML-based classification (if enabled) if let Some(model) = &self.ml_model { return model.predict(query); }
WorkloadType::Mixed }
fn classify_by_pattern(&self, query: &str) -> Option<WorkloadType> { // Vector operations if query.contains("<->") || query.contains("vector") || query.contains("embedding") { return Some(WorkloadType::Vector); }
// OLAP patterns if query.contains("GROUP BY") || query.contains("HAVING") || query.contains("COUNT(") || query.contains("SUM(") || query.contains("AVG(") { return Some(WorkloadType::OLAP); }
// RAG patterns if query.contains("chunks") && query.contains("documents") { return Some(WorkloadType::RAG); }
// AI Agent patterns if query.contains("conversation") || query.contains("tool_") || query.contains("agent_") { return Some(WorkloadType::AIAgent); }
// OLTP patterns (simple CRUD) if self.is_simple_crud(query) { return Some(WorkloadType::OLTP); }
        None
    }
}
Multi-Tier Cache Implementation
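The tier diagram implies a read path that tries L1, then L2, then L3, and stops at the first hit. A minimal sketch of that fall-through is shown here, assuming that a hit in a slower tier is also copied into every faster tier (the spec states this explicitly only for remote L3 hits). `Tier`, `MapTier`, `TieredCache`, and `put_at` are hypothetical names, and every tier is backed by a plain `HashMap` for illustration.

```rust
use std::collections::HashMap;

/// A cache tier reduced to the two operations the read path needs.
pub trait Tier {
    fn get(&self, key: &str) -> Option<Vec<u8>>;
    fn put(&mut self, key: &str, value: Vec<u8>);
}

/// In-memory stand-in used for every tier in this sketch.
#[derive(Default)]
pub struct MapTier {
    entries: HashMap<String, Vec<u8>>,
}

impl Tier for MapTier {
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.entries.get(key).cloned()
    }

    fn put(&mut self, key: &str, value: Vec<u8>) {
        self.entries.insert(key.to_string(), value);
    }
}

/// Tiers ordered fastest first: index 0 = L1, 1 = L2, 2 = L3.
pub struct TieredCache<T: Tier> {
    tiers: Vec<T>,
}

impl<T: Tier> TieredCache<T> {
    pub fn new(tiers: Vec<T>) -> Self {
        Self { tiers }
    }

    /// Returns the value together with the index of the tier that served it.
    pub fn get(&mut self, key: &str) -> Option<(Vec<u8>, usize)> {
        // Walk the tiers in order and stop at the first hit.
        let hit = self
            .tiers
            .iter()
            .enumerate()
            .find_map(|(i, tier)| tier.get(key).map(|value| (value, i)))?;

        // Promote the value into every tier faster than the one that hit.
        for tier in &mut self.tiers[..hit.1] {
            tier.put(key, hit.0.clone());
        }
        Some(hit)
    }

    /// Writes directly into one tier (panics if `tier` is out of range).
    pub fn put_at(&mut self, tier: usize, key: &str, value: Vec<u8>) {
        self.tiers[tier].put(key, value);
    }
}
```

With three tiers, a key stored only in the last tier is served from index 2 on the first read and from index 0 afterwards. Whether large OLAP results should be promoted into L1 at all is a policy decision this sketch leaves open.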
L1 Hot Cache (In-Memory)
pub struct HotCache { /// Main cache storage cache: DashMap<CacheKey, CacheEntry>,
/// LRU with frequency tracking eviction: Arc<LFUEviction>,
/// Per-session affinity session_affinity: DashMap<SessionId, HashSet<CacheKey>>,
/// Size tracking current_size: AtomicUsize, max_size: usize,}
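impl HotCache {
    pub fn get(&self, key: &CacheKey) -> Option<CacheEntry> {
        {
            let entry = self.cache.get(key)?;

            // Check TTL
            if !entry.is_expired() {
                // Update access frequency
                self.eviction.touch(key);
                return Some(entry.value().clone());
            }
        } // The read guard is dropped here; removing while it is held would deadlock the shard

        // Expired: remove and release its bytes from the size counter
        if let Some((_, old)) = self.cache.remove_if(key, |_, e| e.is_expired()) {
            self.current_size.fetch_sub(old.size(), Ordering::Relaxed);
        }

        None
    }

    pub fn insert(&self, key: CacheKey, entry: CacheEntry, session: Option<SessionId>) {
        let entry_size = entry.size();

        // Evict until the new entry fits
        while self.current_size.load(Ordering::Relaxed) + entry_size > self.max_size {
            let Some(evicted) = self.eviction.evict_one() else {
                break; // No more to evict
            };
            // `remove` returns the evicted pair, so its size is read from the
            // removed value rather than looked up after it is already gone
            if let Some((_, old)) = self.cache.remove(&evicted) {
                self.current_size.fetch_sub(old.size(), Ordering::Relaxed);
            }
        }

        // Insert, releasing the bytes of any entry already stored under this key
        if let Some(old) = self.cache.insert(key.clone(), entry) {
            self.current_size.fetch_sub(old.size(), Ordering::Relaxed);
        }
        self.current_size.fetch_add(entry_size, Ordering::Relaxed);
        self.eviction.insert(key.clone());

        // Track session affinity
        if let Some(sid) = session {
            self.session_affinity.entry(sid)
                .or_default()
                .insert(key);
        }
    }
}
L2 Warm Cache (Local SSD)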
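The warm tier described in this section consults a Bloom filter before touching disk, so that most misses never reach RocksDB. The spec does not say which `BloomFilter` type is used; the sketch below is a minimal standard-library version, assuming `k` hash functions derived by seeding `DefaultHasher` with the function index. The constructor parameters are illustrative.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Minimal Bloom filter: `num_hashes` seeded hashes over a fixed bit array.
pub struct BloomFilter {
    bits: Vec<bool>,
    num_hashes: u64,
}

impl BloomFilter {
    pub fn new(num_bits: usize, num_hashes: u64) -> Self {
        assert!(num_bits > 0 && num_hashes > 0);
        Self {
            bits: vec![false; num_bits],
            num_hashes,
        }
    }

    /// Bit position for `key` under the hash function identified by `seed`.
    fn index(&self, key: &[u8], seed: u64) -> usize {
        let mut hasher = DefaultHasher::new();
        seed.hash(&mut hasher);
        key.hash(&mut hasher);
        (hasher.finish() % self.bits.len() as u64) as usize
    }

    /// Sets every bit position derived from `key`.
    pub fn insert(&mut self, key: &[u8]) {
        for seed in 0..self.num_hashes {
            let i = self.index(key, seed);
            self.bits[i] = true;
        }
    }

    /// `false` means the key was definitely never inserted;
    /// `true` means it may have been (false positives are possible).
    pub fn may_contain(&self, key: &[u8]) -> bool {
        (0..self.num_hashes).all(|seed| self.bits[self.index(key, seed)])
    }
}
```

A plain Bloom filter cannot forget keys, so after TTL deletion or eviction it keeps answering "maybe" for removed entries. That costs an extra disk lookup but never a wrong result; a long-running proxy would likely need to rebuild the filter periodically or use a counting variant.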
pub struct WarmCache { /// RocksDB for persistent storage db: rocksdb::DB,
/// Bloom filter for fast negative lookups bloom: BloomFilter,
/// Compression compression: CompressionType,
/// Size tracking current_size: AtomicU64, max_size: u64,}
impl WarmCache { pub fn get(&self, key: &CacheKey) -> Option<CacheEntry> { // Fast path: bloom filter check if !self.bloom.may_contain(&key.to_bytes()) { return None; }
// Lookup in RocksDB let bytes = self.db.get(&key.to_bytes()).ok()??;
// Decompress and deserialize let decompressed = self.decompress(&bytes)?; let entry: CacheEntry = bincode::deserialize(&decompressed).ok()?;
// Check TTL if entry.is_expired() { self.db.delete(&key.to_bytes()).ok(); return None; }
Some(entry) }
pub fn insert(&self, key: CacheKey, entry: CacheEntry) { let serialized = bincode::serialize(&entry).unwrap(); let compressed = self.compress(&serialized);
// Evict if needed while self.current_size.load(Ordering::Relaxed) + compressed.len() as u64 > self.max_size { self.evict_oldest(); }
// Insert self.db.put(&key.to_bytes(), &compressed).ok(); self.bloom.insert(&key.to_bytes()); self.current_size.fetch_add(compressed.len() as u64, Ordering::Relaxed); }
    fn compress(&self, data: &[u8]) -> Vec<u8> {
        match self.compression {
            CompressionType::None => data.to_vec(),
            CompressionType::Lz4 => lz4_flex::compress(data),
            CompressionType::Zstd => zstd::encode_all(data, 3).unwrap(),
        }
    }
}
L3 Distributed Cache (Mesh)
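The distributed tier in this section relies on a hash ring whose `get_nodes(key, n)` returns the `n` peers responsible for a key. As a rough illustration, the sketch below places each peer on the ring at several virtual points and walks clockwise from the key's hash, collecting distinct peers. The type is a simplified stand-in for `HashRing<PeerId>`: peers are plain strings, and the `vnodes` parameter is an assumption.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

fn hash_bytes(data: &[u8]) -> u64 {
    // Assumption: a production ring would use a hash that is guaranteed to be
    // stable across builds and hosts; `DefaultHasher` is used here for brevity.
    let mut hasher = DefaultHasher::new();
    data.hash(&mut hasher);
    hasher.finish()
}

/// Consistent hash ring mapping ring positions to peer names.
pub struct HashRing {
    ring: BTreeMap<u64, String>,
}

impl HashRing {
    /// Places every peer on the ring at `vnodes` pseudo-random positions.
    pub fn new(peers: &[&str], vnodes: u32) -> Self {
        let mut ring = BTreeMap::new();
        for peer in peers {
            for v in 0..vnodes {
                let point = hash_bytes(format!("{peer}#{v}").as_bytes());
                ring.insert(point, peer.to_string());
            }
        }
        Self { ring }
    }

    /// Walks clockwise from the key's position and returns up to `count`
    /// distinct peers (primary owner first, then replicas).
    pub fn get_nodes(&self, key: &[u8], count: usize) -> Vec<String> {
        let start = hash_bytes(key);
        let mut owners: Vec<String> = Vec::new();
        let clockwise = self
            .ring
            .range(start..)
            .chain(self.ring.range(..start))
            .map(|(_, peer)| peer);
        for peer in clockwise {
            if owners.len() == count {
                break;
            }
            if !owners.contains(peer) {
                owners.push(peer.clone());
            }
        }
        owners
    }
}
```

Because only the virtual points of a joining or leaving peer move, most keys keep their owners when membership changes, which is the property that makes the ring suitable for a cache mesh.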
pub struct DistributedCache { /// Consistent hash ring for key distribution hash_ring: Arc<HashRing<PeerId>>,
/// Peer connections peers: DashMap<PeerId, PeerConnection>,
/// Local cache for owned keys local: Arc<HotCache>,
/// Replication factor replication_factor: u32,
/// Gossip protocol for membership gossip: Arc<GossipProtocol>,}
impl DistributedCache { pub async fn get(&self, key: &CacheKey) -> Option<CacheEntry> { // 1. Determine owner(s) of the key let owners = self.hash_ring.get_nodes(&key.to_bytes(), self.replication_factor);
// 2. Check if we own it locally if owners.contains(&self.local_peer_id) { if let Some(entry) = self.local.get(key) { return Some(entry); } }
// 3. Query remote owners for owner in owners { if owner == self.local_peer_id { continue; }
if let Some(peer) = self.peers.get(&owner) { if let Ok(entry) = peer.get(key).await { // Cache locally for future access self.local.insert(key.clone(), entry.clone(), None); return Some(entry); } } }
None }
pub async fn insert(&self, key: CacheKey, entry: CacheEntry) { let owners = self.hash_ring.get_nodes(&key.to_bytes(), self.replication_factor);
// Insert to all replica owners let futures: Vec<_> = owners.iter() .map(|owner| { if *owner == self.local_peer_id { Box::pin(async { self.local.insert(key.clone(), entry.clone(), None); Ok(()) }) } else if let Some(peer) = self.peers.get(owner) { Box::pin(peer.insert(key.clone(), entry.clone())) } else { Box::pin(async { Err(CacheError::PeerNotFound) }) } }) .collect();
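        // Wait until a majority of replicas acknowledge the write.
        // (`select_ok` returns after the first success, and `take(n)` would
        // drop the remaining writes instead of waiting for n acknowledgements.)
        use futures_util::stream::{FuturesUnordered, StreamExt};

        let required_acks = (self.replication_factor / 2 + 1) as usize;
        let mut pending: FuturesUnordered<_> = futures.into_iter().collect();
        let mut acks = 0;
        while let Some(result) = pending.next().await {
            if result.is_ok() {
                acks += 1;
                if acks >= required_acks {
                    break;
                }
            }
        }
    }
}
Predictive Prefetcher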
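The prefetcher in this section calls `get_top_predictions` and compares each result against a confidence threshold, but the spec leaves the ranking undefined. One plausible reading is that confidence is the relative frequency of each successor in the recorded history. The sketch below implements that reading; `top_predictions`, its `limit` parameter, and the use of `String` in place of `QueryFingerprint` are assumptions.

```rust
use std::collections::HashMap;

/// Ranks observed successor queries by relative frequency.
/// Returns at most `limit` pairs of (fingerprint, confidence in 0.0..=1.0),
/// highest confidence first.
pub fn top_predictions(successors: &[String], limit: usize) -> Vec<(String, f64)> {
    if successors.is_empty() {
        return Vec::new();
    }

    // Count how often each fingerprint followed the current query.
    let mut counts: HashMap<&str, usize> = HashMap::new();
    for fingerprint in successors {
        *counts.entry(fingerprint.as_str()).or_insert(0) += 1;
    }

    // Confidence = occurrences / total observations.
    let total = successors.len() as f64;
    let mut ranked: Vec<(String, f64)> = counts
        .into_iter()
        .map(|(fingerprint, n)| (fingerprint.to_string(), n as f64 / total))
        .collect();

    // Sort by confidence (descending); break ties by fingerprint so the
    // output order is deterministic.
    ranked.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    ranked.truncate(limit);
    ranked
}
```

For a history of `b, b, c, b` this yields `b` with confidence 0.75 and `c` with 0.25, so only `b` would pass the configured threshold of 0.3. Since the successor lists in `patterns` grow with every recorded query, a real implementation would probably store counts instead of raw lists.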
pub struct PredictivePrefetcher { /// Query sequence patterns patterns: DashMap<QueryFingerprint, Vec<QueryFingerprint>>,
/// Session-based prediction session_sequences: DashMap<SessionId, VecDeque<QueryFingerprint>>,
/// Temporal patterns (time-of-day based) temporal_patterns: TemporalPatternStore,
/// Prefetch queue prefetch_queue: Arc<PrefetchQueue>,
/// Background prefetch worker worker: JoinHandle<()>,}
impl PredictivePrefetcher {
    /// Record query execution for pattern learning
    pub fn record(&self, session: &str, fingerprint: QueryFingerprint) {
        // Add to session sequence
        let mut seq = self.session_sequences
            .entry(session.to_string())
            .or_insert_with(VecDeque::new);

        // Learn the transition previous -> current. A single earlier query is
        // enough, so the first transition of a session is recorded as well.
        if let Some(prev) = seq.back().cloned() {
            self.patterns.entry(prev)
                .or_default()
                .push(fingerprint.clone());
        }

        seq.push_back(fingerprint);

        // Keep last N queries
        while seq.len() > 100 {
            seq.pop_front();
        }
    }
/// Predict and prefetch next likely queries pub fn predict_and_prefetch(&self, current: &QueryFingerprint, session: &str) { // 1. Pattern-based prediction if let Some(next_queries) = self.patterns.get(current) { // Get most common next queries let predictions = self.get_top_predictions(next_queries.value());
for (fingerprint, confidence) in predictions { if confidence > 0.3 { self.prefetch_queue.enqueue(PrefetchRequest { fingerprint, priority: (confidence * 100.0) as u32, }); } } }
// 2. Temporal prediction if let Some(temporal) = self.temporal_patterns.predict_for_time(Utc::now()) { for fingerprint in temporal { self.prefetch_queue.enqueue(PrefetchRequest { fingerprint, priority: 50, // Medium priority }); } } }
/// Background prefetch worker async fn prefetch_worker(&self) { loop { if let Some(request) = self.prefetch_queue.dequeue().await { // Check if already cached if self.cache.contains(&request.fingerprint) { continue; }
                // Execute query and cache result
                if let Ok(result) = self.execute_query(&request.fingerprint).await {
                    self.cache.insert(request.fingerprint, result);
                }
            }
        }
    }
}
WAL-Based Invalidator
pub struct WalInvalidator { /// WAL stream subscription wal_stream: WalStreamer,
/// Table to cache key index table_index: DashMap<TableName, HashSet<CacheKey>>,
/// Cache reference cache: Arc<HeliosDistribCache>,}
impl WalInvalidator { /// Start listening to WAL for invalidation pub async fn start(&self) { let mut stream = self.wal_stream.subscribe().await;
while let Some(entry) = stream.next().await { match entry.operation { WalOperation::Put { key, .. } | WalOperation::Delete { key } => { let table = extract_table(&key); self.invalidate_table(&table); } WalOperation::UpdateCounter { table_name, .. } => { // Sequence changes may affect caches self.invalidate_table(&table_name); } _ => {} } } }
fn invalidate_table(&self, table: &str) { if let Some(keys) = self.table_index.get(table) { for key in keys.iter() { self.cache.invalidate(key); } } }
/// Selective invalidation based on row fn invalidate_row(&self, table: &str, row_key: &[u8]) { // For fine-grained invalidation let pattern = format!("{}:{}", table, hex::encode(row_key));
        for entry in self.cache.l1_hot.iter() {
            if entry.key().matches_pattern(&pattern) {
                self.cache.invalidate(entry.key());
            }
        }
    }
}
Cache Heatmap Analytics
pub struct CacheHeatmap { /// Access counts per table table_accesses: DashMap<TableName, AccessStats>,
/// Access counts per query fingerprint query_accesses: DashMap<QueryFingerprint, AccessStats>,
/// Time-bucketed access data time_buckets: RwLock<Vec<TimeBucket>>,
/// Heatmap grid (for visualization) grid: RwLock<HeatmapGrid>,}
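// Atomics are not `Clone`, and `record_access` needs `AccessStats::default`,
// so the struct derives `Default` instead of `Clone`.
#[derive(Debug, Default)]
pub struct AccessStats {
    pub hits: AtomicU64,
    pub misses: AtomicU64,
    pub total_time_saved_us: AtomicU64,
    pub last_access: AtomicU64,
}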
#[derive(Debug, Clone)]pub struct TimeBucket { pub start: DateTime<Utc>, pub end: DateTime<Utc>, pub accesses: HashMap<TableName, u64>, pub hit_ratio: f64,}
impl CacheHeatmap { pub fn record_access(&self, key: &CacheKey, hit: bool, time_saved: Duration) { // Update table stats let table = key.table(); let stats = self.table_accesses.entry(table.to_string()) .or_insert_with(AccessStats::default);
if hit { stats.hits.fetch_add(1, Ordering::Relaxed); stats.total_time_saved_us.fetch_add( time_saved.as_micros() as u64, Ordering::Relaxed ); } else { stats.misses.fetch_add(1, Ordering::Relaxed); }
// Update query stats let query_stats = self.query_accesses.entry(key.fingerprint()) .or_insert_with(AccessStats::default);
if hit { query_stats.hits.fetch_add(1, Ordering::Relaxed); } else { query_stats.misses.fetch_add(1, Ordering::Relaxed); }
// Update time bucket self.update_time_bucket(table, hit); }
/// Generate heatmap visualization data pub fn generate_heatmap(&self) -> HeatmapData { let tables: Vec<_> = self.table_accesses.iter() .map(|entry| { let stats = entry.value(); let hits = stats.hits.load(Ordering::Relaxed); let misses = stats.misses.load(Ordering::Relaxed);
TableHeat { name: entry.key().clone(), total_accesses: hits + misses, hit_ratio: hits as f64 / (hits + misses).max(1) as f64, time_saved_ms: stats.total_time_saved_us.load(Ordering::Relaxed) / 1000, temperature: self.calculate_temperature(hits + misses), } }) .sorted_by(|a, b| b.total_accesses.cmp(&a.total_accesses)) .collect();
HeatmapData { tables, time_series: self.get_time_series(), recommendations: self.generate_recommendations(), } }
/// Generate optimization recommendations fn generate_recommendations(&self) -> Vec<Recommendation> { let mut recs = Vec::new();
for entry in self.table_accesses.iter() { let stats = entry.value(); let hits = stats.hits.load(Ordering::Relaxed); let misses = stats.misses.load(Ordering::Relaxed); let hit_ratio = hits as f64 / (hits + misses).max(1) as f64;
// Low hit ratio recommendation if hit_ratio < 0.5 && (hits + misses) > 1000 { recs.push(Recommendation { table: entry.key().clone(), issue: "Low cache hit ratio".to_string(), suggestion: format!( "Consider increasing TTL or cache size for '{}' (current hit ratio: {:.1}%)", entry.key(), hit_ratio * 100.0 ), priority: Priority::High, }); }
// Cold data in hot cache let last_access = stats.last_access.load(Ordering::Relaxed); if last_access > 0 { let age = Duration::from_nanos(now_nanos() - last_access); if age > Duration::from_secs(3600) && (hits + misses) < 100 { recs.push(Recommendation { table: entry.key().clone(), issue: "Cold data in cache".to_string(), suggestion: format!( "'{}' hasn't been accessed in {:.0} minutes, consider reducing TTL", entry.key(), age.as_secs_f64() / 60.0 ), priority: Priority::Medium, }); } } }
        recs
    }
}
Workload Scheduler
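The scheduler in this section dispatches to `schedule_weighted_fair`, which the spec does not spell out. A common way to realize weighted fair selection is stride scheduling: each workload class carries a virtual time that advances by `1 / weight` whenever it is picked, and the class with the smallest virtual time goes next. The sketch below shows only that selection step; `WeightedFairPicker` and `pick` are hypothetical names, and the sketch assumes every class always has work queued.

```rust
/// Per-class bookkeeping for stride scheduling.
struct ClassState {
    name: String,
    weight: f64,
    virtual_time: f64,
}

/// Picks workload classes in proportion to their weights.
pub struct WeightedFairPicker {
    classes: Vec<ClassState>,
}

impl WeightedFairPicker {
    /// Builds a picker from (class name, weight) pairs.
    /// Classes with a non-positive weight are ignored.
    pub fn new(weights: &[(&str, f64)]) -> Self {
        let classes = weights
            .iter()
            .filter(|(_, w)| *w > 0.0)
            .map(|(name, w)| ClassState {
                name: name.to_string(),
                weight: *w,
                virtual_time: 0.0,
            })
            .collect();
        Self { classes }
    }

    /// Returns the class that should run next, or `None` if there are no classes.
    pub fn pick(&mut self) -> Option<&str> {
        // The class with the smallest virtual time has received the least
        // service relative to its weight. Ties go to the earliest class.
        let idx = self
            .classes
            .iter()
            .enumerate()
            .min_by(|(_, a), (_, b)| a.virtual_time.total_cmp(&b.virtual_time))
            .map(|(i, _)| i)?;

        // Heavier classes advance more slowly and are therefore picked more often.
        let class = &mut self.classes[idx];
        class.virtual_time += 1.0 / class.weight;
        Some(class.name.as_str())
    }
}
```

With weights 1.0 and 0.5, the first class is picked twice as often as the second. The `priority_weight` values from the configuration (for example 1.0 for OLTP and 0.3 for OLAP) could be fed in the same way, with empty queues skipped during selection.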
pub struct WorkloadScheduler { /// Queue per workload type queues: HashMap<WorkloadType, PriorityQueue>,
/// Current workload distribution distribution: RwLock<WorkloadDistribution>,
/// Scheduling policy policy: SchedulingPolicy,
/// Resource limits per workload limits: HashMap<WorkloadType, ResourceLimit>,}
#[derive(Debug, Clone)]pub struct ResourceLimit { /// Max concurrent queries pub max_concurrent: u32,
/// Max cache memory pub max_cache_mb: usize,
/// Priority weight pub priority_weight: f64,}
#[derive(Debug, Clone, Copy)]pub enum SchedulingPolicy { /// Strict priority (OLTP always first) StrictPriority,
/// Weighted fair queuing WeightedFair,
/// Time-based (OLAP during off-hours) TimeBased,
/// Adaptive (learn optimal distribution) Adaptive,}
impl WorkloadScheduler { /// Schedule a query based on workload type pub fn schedule(&self, query: ScheduledQuery) -> ScheduleResult { let workload = query.workload_type; let limit = self.limits.get(&workload).unwrap();
// Check current concurrency let current = self.get_current_concurrency(&workload); if current >= limit.max_concurrent { return ScheduleResult::Queued { position: self.queue_position(&workload) }; }
// Apply scheduling policy match self.policy { SchedulingPolicy::StrictPriority => { self.schedule_strict_priority(query) } SchedulingPolicy::WeightedFair => { self.schedule_weighted_fair(query) } SchedulingPolicy::TimeBased => { self.schedule_time_based(query) } SchedulingPolicy::Adaptive => { self.schedule_adaptive(query) } } }
fn schedule_time_based(&self, query: ScheduledQuery) -> ScheduleResult { let hour = Utc::now().hour();
// Business hours (9-18): prioritize OLTP if hour >= 9 && hour < 18 { if query.workload_type == WorkloadType::OLTP { return ScheduleResult::Execute { priority: Priority::High }; } if query.workload_type == WorkloadType::OLAP { return ScheduleResult::Execute { priority: Priority::Low }; } }
// Off-hours: prioritize OLAP if query.workload_type == WorkloadType::OLAP { return ScheduleResult::Execute { priority: Priority::High }; }
ScheduleResult::Execute { priority: Priority::Normal } }
fn schedule_adaptive(&self, query: ScheduledQuery) -> ScheduleResult { // Learn optimal distribution based on historical performance let distribution = self.distribution.read();
// Calculate ideal distribution let ideal = distribution.calculate_ideal();
// Adjust based on current state let current = self.get_current_distribution();
        if current.is_below_ideal(&ideal, query.workload_type) {
            ScheduleResult::Execute { priority: Priority::High }
        } else if current.is_above_ideal(&ideal, query.workload_type) {
            ScheduleResult::Queued { position: 0 }
        } else {
            ScheduleResult::Execute { priority: Priority::Normal }
        }
    }
}
API Specification
Configuration (heliosproxy.toml)
[distribcache]
enabled = true

# Multi-tier configuration
[distribcache.l1]
enabled = true
size_mb = 256
max_entry_size_kb = 1024
eviction = "lfu" # lru, lfu, adaptive

[distribcache.l2]
enabled = true
size_gb = 5
path = "/var/lib/heliosproxy/cache"
compression = "lz4" # none, lz4, zstd

[distribcache.l3]
enabled = true
replication_factor = 2
peers = [
  "proxy-1.internal:9100",
  "proxy-2.internal:9100",
  "proxy-3.internal:9100"
]

# Workload-specific settings
[distribcache.workloads.oltp]
cache_ttl = "60s"
max_concurrent = 500
priority_weight = 1.0

[distribcache.workloads.olap]
cache_ttl = "30m"
max_concurrent = 50
priority_weight = 0.3

[distribcache.workloads.vector]
cache_ttl = "5m"
max_concurrent = 100
priority_weight = 0.5

[distribcache.workloads.ai_agent]
cache_ttl = "10m"
max_concurrent = 200
priority_weight = 0.7

# Prefetching
[distribcache.prefetch]
enabled = true
lookahead = 3
confidence_threshold = 0.3
max_prefetch_queue = 100

# Invalidation
[distribcache.invalidation]
mode = "wal" # wal, ttl, hybrid
wal_lag_tolerance = "100ms"

# Scheduling
[distribcache.scheduling]
policy = "weighted_fair" # strict_priority, weighted_fair, time_based, adaptive
oltp_hours = "09:00-18:00"
olap_hours = "00:00-06:00"

# Heatmap
[distribcache.heatmap]
enabled = true
bucket_size = "5m"
retention = "7d"
Admin API
GET /distribcache/stats
{
  "l1": { "size_mb": 234, "max_size_mb": 256, "entries": 15234, "hit_ratio": 0.92, "evictions_per_sec": 12.3 },
  "l2": { "size_gb": 3.2, "max_size_gb": 5, "entries": 125000, "hit_ratio": 0.78, "compression_ratio": 2.3 },
  "l3": { "total_capacity_gb": 15, "used_gb": 8.5, "peers": 3, "healthy_peers": 3, "replication_lag_ms": 5 },
  "overall_hit_ratio": 0.89,
  "time_saved_seconds": 1234.5,
  "queries_avoided": 150000
}

GET /distribcache/heatmap
{
  "tables": [
    { "name": "users", "total_accesses": 50000, "hit_ratio": 0.95, "time_saved_ms": 25000, "temperature": "hot" },
    { "name": "analytics_events", "total_accesses": 5000, "hit_ratio": 0.45, "time_saved_ms": 15000, "temperature": "warm" }
  ],
  "time_series": [
    { "timestamp": "...", "hit_ratio": 0.89 },
    { "timestamp": "...", "hit_ratio": 0.91 }
  ],
  "recommendations": [
    { "table": "orders", "issue": "Low cache hit ratio", "suggestion": "Consider increasing TTL from 30s to 5m", "priority": "high" }
  ]
}

GET /distribcache/workloads
{
  "distribution": {
    "oltp": { "current_pct": 65, "target_pct": 60, "queued": 5 },
    "olap": { "current_pct": 20, "target_pct": 25, "queued": 10 },
    "vector": { "current_pct": 10, "target_pct": 10, "queued": 0 },
    "ai_agent": { "current_pct": 5, "target_pct": 5, "queued": 2 }
  },
  "scheduling_policy": "weighted_fair",
  "current_hour_policy": "oltp_priority"
}

POST /distribcache/prefetch
# Manually trigger prefetch for pattern
{ "fingerprint": "abc123" }

POST /distribcache/invalidate
# Invalidate cache entries
{ "tables": ["users", "orders"] }

POST /distribcache/resize
# Dynamically resize cache
{ "l1_size_mb": 512, "l2_size_gb": 10 }
AI/Agent Innovations
1. Conversation Context Cache
Specialized caching for AI agent conversations:
pub struct ConversationContextCache { /// Recent turns per conversation contexts: DashMap<ConversationId, ConversationContext>,
/// LRU for conversation eviction lru: Mutex<LruCache<ConversationId, ()>>,
/// Max turns to cache per conversation max_turns: usize,}
impl ConversationContextCache { pub fn get_context(&self, conv_id: &str, max_turns: usize) -> Option<Vec<Turn>> { let ctx = self.contexts.get(conv_id)?;
// Return recent turns Some(ctx.turns.iter() .rev() .take(max_turns) .rev() .cloned() .collect()) }
pub fn append_turn(&self, conv_id: &str, turn: Turn) { let mut ctx = self.contexts.entry(conv_id.to_string()) .or_insert_with(ConversationContext::new);
ctx.turns.push(turn);
// Maintain size limit while ctx.turns.len() > self.max_turns { ctx.turns.remove(0); }
        // Update LRU
        self.lru.lock().put(conv_id.to_string(), ());
    }
}
2. RAG Chunk Cache
Cache document chunks for RAG pipelines:
pub struct RagChunkCache { /// Chunk cache (id -> content) chunks: Arc<HotCache>,
/// Embedding -> chunk ID mapping embedding_to_chunks: DashMap<EmbeddingHash, Vec<ChunkId>>,
/// Prefetch related chunks prefetcher: Arc<ChunkPrefetcher>,}
impl RagChunkCache { pub async fn get_chunks(&self, embedding: &[f32], k: usize) -> Vec<Chunk> { let hash = hash_embedding(embedding);
// Check if we've seen this embedding before if let Some(chunk_ids) = self.embedding_to_chunks.get(&hash) { // Retrieve cached chunks let chunks: Vec<_> = chunk_ids.iter() .filter_map(|id| self.chunks.get(&CacheKey::chunk(*id))) .take(k) .collect();
if chunks.len() >= k { return chunks; } }
// Cache miss, perform ANN search let results = self.perform_ann_search(embedding, k).await;
// Cache results let chunk_ids: Vec<_> = results.iter().map(|c| c.id).collect(); self.embedding_to_chunks.insert(hash, chunk_ids.clone());
for chunk in &results { self.chunks.insert( CacheKey::chunk(chunk.id), CacheEntry::from_chunk(chunk), None ); }
// Prefetch related chunks self.prefetcher.prefetch_related(&chunk_ids);
        results
    }
}
3. Tool Result Cache
Cache deterministic tool call results:
pub struct ToolResultCache { /// Tool -> Result cache cache: DashMap<ToolCallKey, ToolResult>,
/// Tool determinism configuration deterministic_tools: HashSet<String>,}
impl ToolResultCache { pub async fn execute_with_cache( &self, tool: &str, params: &Value, executor: impl Future<Output = ToolResult>, ) -> ToolResult { // Check if tool is deterministic if !self.deterministic_tools.contains(tool) { return executor.await; }
let key = ToolCallKey::new(tool, params);
// Check cache if let Some(result) = self.cache.get(&key) { return result.clone(); }
// Execute and cache let result = executor.await; self.cache.insert(key, result.clone());
        result
    }
}
4. Semantic Query Cache
Cache based on query semantics, not exact match:
pub struct SemanticQueryCache { /// Query embeddings index query_index: HnswIndex,
/// Embedding -> result mapping results: DashMap<VectorId, CacheEntry>,
/// Similarity threshold threshold: f32,}
impl SemanticQueryCache { pub async fn get_semantic(&self, query: &str) -> Option<CacheEntry> { // Embed the query let embedding = self.embed_query(query).await;
// Search for similar cached queries let similar = self.query_index.search(&embedding, 1);
if let Some(hit) = similar.first() { if hit.similarity > self.threshold { return self.results.get(&hit.id).map(|r| r.clone()); } }
None }
    pub fn cache_semantic(&self, query: &str, result: CacheEntry) {
        let embedding = self.embed_query_sync(query);
        let id = self.query_index.insert(&embedding);
        self.results.insert(id, result);
    }
}
HeliosDB-Lite Integration
1. WAL-Driven Invalidation
Leverage HeliosDB-Lite’s WAL for cache coherency:
impl WalInvalidator { pub async fn connect_to_helios_wal(&self, wal_endpoint: &str) -> Result<()> { let mut stream = WalStreamer::connect(wal_endpoint).await?;
while let Some(entry) = stream.next().await { // Extract table from HeliosDB-Lite key format let table = match &entry.operation { WalOperation::Put { key, .. } => extract_table_from_key(key), WalOperation::Delete { key } => extract_table_from_key(key), _ => continue, };
// Invalidate cached queries for this table self.cache.invalidate_table(&table); }
        Ok(())
    }
}
2. Branch-Aware Caching
Separate caches per branch:
pub struct BranchAwareDistribCache { /// Cache per branch branch_caches: DashMap<BranchName, Arc<HeliosDistribCache>>,
/// Default cache (for main branch) main_cache: Arc<HeliosDistribCache>,}
impl BranchAwareDistribCache { pub fn get(&self, branch: &str, key: &CacheKey) -> Option<CacheEntry> { if branch == "main" { return self.main_cache.get(key); }
self.branch_caches.get(branch) .and_then(|c| c.get(key)) }
/// Promote cache entries when branch merges pub fn promote_on_merge(&self, source: &str, target: &str) { if let Some(source_cache) = self.branch_caches.get(source) { let target_cache = if target == "main" { self.main_cache.clone() } else { self.branch_caches.entry(target.to_string()) .or_insert_with(|| Arc::new(HeliosDistribCache::new())) .clone() };
            // Copy valid entries
            source_cache.copy_valid_entries_to(&target_cache);
        }
    }
}
3. Vector Result Caching
Cache HNSW search results:
pub struct VectorSearchCache { /// Cache by (query vector hash, k, ef_search) results: DashMap<VectorSearchKey, Vec<SearchResult>>,
/// Index version tracking (invalidate on rebuild) index_versions: DashMap<IndexName, u64>,}
impl VectorSearchCache { pub fn get_ann_results( &self, query: &[f32], k: usize, ef_search: usize, index: &str, ) -> Option<Vec<SearchResult>> { let key = VectorSearchKey { vector_hash: hash_vector(query), k, ef_search, index_version: self.index_versions.get(index)?.clone(), };
self.results.get(&key).map(|r| r.clone()) }
    /// Invalidate on index rebuild
    pub fn on_index_rebuild(&self, index: &str) {
        // Copy the version out so the map guard is released before logging
        let new_version = *self.index_versions
            .entry(index.to_string())
            .and_modify(|v| *v += 1)
            .or_insert(1);

        // Old entries will miss due to version mismatch
        info!("Index {} rebuilt, cache invalidated (version {})", index, new_version);
    }
}
4. Time-Travel Cache
Cache historical queries (immutable):
pub struct TimeTravelCache { /// Historical queries are immutable, cache forever historical: DashMap<(QueryFingerprint, Timestamp), CacheEntry>,}
impl TimeTravelCache { pub fn get_historical( &self, fingerprint: &QueryFingerprint, as_of: Timestamp, ) -> Option<CacheEntry> { self.historical.get(&(fingerprint.clone(), as_of)) .map(|e| e.clone()) }
    pub fn cache_historical(
        &self,
        fingerprint: QueryFingerprint,
        as_of: Timestamp,
        result: CacheEntry,
    ) {
        // Historical data never changes, no TTL needed
        self.historical.insert((fingerprint, as_of), result);
    }
}
Implementation Notes
File Locations
src/proxy/
├── distribcache/
│   ├── mod.rs                  # Public API
│   ├── config.rs               # DistribCacheConfig
│   ├── classifier.rs           # WorkloadClassifier
│   ├── tiers/
│   │   ├── mod.rs
│   │   ├── l1_hot.rs           # HotCache (in-memory)
│   │   ├── l2_warm.rs          # WarmCache (SSD)
│   │   └── l3_distributed.rs   # DistributedCache (mesh)
│   ├── prefetcher.rs           # PredictivePrefetcher
│   ├── invalidator.rs          # WalInvalidator
│   ├── heatmap.rs              # CacheHeatmap
│   ├── scheduler.rs            # WorkloadScheduler
│   ├── ai/
│   │   ├── conversation.rs     # ConversationContextCache
│   │   ├── rag.rs              # RagChunkCache
│   │   ├── tools.rs            # ToolResultCache
│   │   └── semantic.rs         # SemanticQueryCache
│   └── metrics.rs              # Prometheus metrics
Key Considerations
- Memory Management: Carefully track memory usage across tiers. Implement backpressure.
- Cache Coherency: WAL-based invalidation must handle lag. Consider versioning.
- Distributed Coordination: Use gossip protocol for peer discovery. Handle partitions.
- Serialization: Use efficient formats (bincode, postcard) for cache entries.
- Monitoring: Comprehensive metrics for hit ratios, latencies, evictions.
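The cache coherency note suggests versioning as a way to tolerate WAL lag. One way to read this: every cached result remembers the WAL position it was computed at, the invalidator records the latest write position per table, and a read rejects any entry older than its table's latest write. The sketch below illustrates that idea. It assumes WAL entries carry a monotonically increasing sequence number (called `lsn` here), which the spec does not state, and `VersionedCache` with its methods is a hypothetical type.

```rust
use std::collections::HashMap;

/// A cached result plus the WAL position it was computed at.
struct Entry {
    table: String,
    as_of_lsn: u64,
    value: String,
}

/// Cache that validates entries against per-table write versions.
#[derive(Default)]
pub struct VersionedCache {
    /// Highest WAL sequence number seen per table.
    table_versions: HashMap<String, u64>,
    entries: HashMap<String, Entry>,
}

impl VersionedCache {
    /// Called by the invalidator for every WAL write touching `table`.
    pub fn on_wal_write(&mut self, table: &str, lsn: u64) {
        let version = self.table_versions.entry(table.to_string()).or_insert(0);
        *version = (*version).max(lsn);
    }

    /// Stores a result that was computed from a snapshot at `as_of_lsn`.
    pub fn insert(&mut self, key: &str, table: &str, as_of_lsn: u64, value: &str) {
        self.entries.insert(
            key.to_string(),
            Entry {
                table: table.to_string(),
                as_of_lsn,
                value: value.to_string(),
            },
        );
    }

    /// Returns the value only if no later write to its table has been seen.
    pub fn get(&self, key: &str) -> Option<&str> {
        let entry = self.entries.get(key)?;
        let latest = self.table_versions.get(&entry.table).copied().unwrap_or(0);
        (entry.as_of_lsn >= latest).then(|| entry.value.as_str())
    }
}
```

The benefit over delete-on-invalidate is that a slow query which finishes after the invalidation arrived cannot reintroduce stale data: its entry carries an old sequence number and is rejected on read. Stale entries still occupy memory until evicted, so a sweep or TTL would remain necessary.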
Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| L1 lookup | <100μs | p99 |
| L2 lookup | <1ms | p99 |
| L3 lookup | <10ms | p99 (cross-network) |
| Overall hit ratio | >85% | for cacheable queries |
| Prefetch hit rate | >50% | for predicted queries |
| Invalidation latency | <100ms | from WAL to cache |
| Heatmap generation | <1s | full report |
Prometheus Metrics
# Cache tier metrics
heliosproxy_distribcache_hits_total{tier="l1"} 150000
heliosproxy_distribcache_hits_total{tier="l2"} 30000
heliosproxy_distribcache_hits_total{tier="l3"} 10000
heliosproxy_distribcache_misses_total 20000

# Latency histograms per tier
heliosproxy_distribcache_lookup_seconds_bucket{tier="l1",le="0.0001"} 140000
heliosproxy_distribcache_lookup_seconds_bucket{tier="l2",le="0.001"} 29000

# Workload distribution
heliosproxy_workload_queries_total{type="oltp"} 100000
heliosproxy_workload_queries_total{type="olap"} 10000
heliosproxy_workload_queries_total{type="vector"} 20000

# Prefetch metrics
heliosproxy_prefetch_hits_total 50000
heliosproxy_prefetch_misses_total 10000

# Heatmap data
heliosproxy_table_accesses_total{table="users"} 50000
heliosproxy_table_hit_ratio{table="users"} 0.95
Timeline
Q1-Q2 2027: Helios-DistribCache Development
Month 1-2: Core Cache Infrastructure
├── L1 hot cache implementation
├── L2 warm cache with RocksDB
├── Workload classifier
└── Basic invalidation

Month 3-4: Distributed Layer
├── L3 distributed cache mesh
├── Consistent hashing
├── Peer discovery (gossip)
└── Replication

Month 5-6: Intelligence Layer
├── Predictive prefetcher
├── WAL-based invalidation
├── Heatmap analytics
└── Workload scheduler

Month 7-8: AI/Agent Optimizations
├── Conversation context cache
├── RAG chunk cache
├── Semantic query cache
└── Tool result cache

Month 9: Integration & Testing
├── HeliosDB-Lite integration
├── Performance testing
├── Documentation
└── Release
Related Features
- Query Caching - Foundation for DistribCache
- Schema-Aware Routing - Workload classification
- Query Analytics - Heatmap data source
- Replica Lag-Aware Routing - Cache coherency