Feature 14: Helios-DistribCache

Priority: Critical | Complexity: Very High | Phase: 5 (Flagship Feature)


Overview

Vision

Transform HeliosProxy into an intelligent distributed caching layer that goes beyond simple query caching. Helios-DistribCache provides:

  • Workload-Aware Caching: Different strategies for OLTP vs OLAP vs Vector workloads
  • Distributed Cache Mesh: Coordinated caching across proxy instances
  • Intelligent Prefetching: Predict and pre-warm cache based on patterns
  • Heatmap Analytics: Visual cache utilization and optimization recommendations
  • Hybrid Storage Tiering: L1 (memory) → L2 (SSD) → L3 (distributed)

Problem Statement

Traditional caching approaches have limitations:

  • Query caches don’t understand workload semantics
  • Cache coherency across proxies is complex
  • No visibility into cache effectiveness
  • Static TTLs don’t adapt to data access patterns
  • Vector/embedding workloads have unique needs

Solution

HELIOS-DISTRIBCACHE
├── WORKLOAD CLASSIFIER
│   ├── OLTP (low latency)
│   ├── OLAP (high throughput)
│   ├── Vector (ANN/KNN)
│   └── AI/Agent (context)
├── MULTI-TIER CACHE (a miss falls through L1 → L2 → L3)
│   ├── L1: Hot Cache (in-memory, <100μs)
│   │   ├── Per-connection / session affinity
│   │   ├── LRU with frequency aging
│   │   └── 64-512MB per proxy instance
│   ├── L2: Warm Cache (local SSD, <1ms)
│   │   ├── Compressed storage
│   │   ├── TTL-based expiration
│   │   └── 1-10GB per proxy instance
│   └── L3: Distributed Cache (mesh, <10ms)
│       ├── Consistent hashing across proxies
│       ├── Replication for availability
│       └── Total cluster capacity pooled
└── INTELLIGENT SERVICES
    ├── Prefetcher (predictive)
    ├── Invalidator (WAL-based)
    ├── Heatmap (analytics)
    └── Scheduler (workload)
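The diagram implies a read path in which a miss falls through L1, then L2, then L3. The document does not say what happens on a lower-tier hit; a common choice is to promote the value into the faster tiers. The sketch below assumes that promote-on-hit policy and uses plain `HashMap`s as stand-ins for the three tiers; `TieredCache` and `Tier` are hypothetical names, not part of HeliosProxy.

```rust
use std::collections::HashMap;

/// Which tier served a lookup (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
pub enum Tier {
    L1,
    L2,
    L3,
    Miss,
}

/// Toy three-tier cache: each HashMap stands in for the in-memory,
/// SSD, and mesh tiers in the diagram.
#[derive(Default)]
pub struct TieredCache {
    pub l1: HashMap<String, Vec<u8>>,
    pub l2: HashMap<String, Vec<u8>>,
    pub l3: HashMap<String, Vec<u8>>,
}

impl TieredCache {
    /// Look up a key top-down; on a lower-tier hit, copy the value into
    /// every faster tier so the next access is served from L1.
    pub fn get(&mut self, key: &str) -> (Tier, Option<Vec<u8>>) {
        if let Some(v) = self.l1.get(key) {
            return (Tier::L1, Some(v.clone()));
        }
        if let Some(v) = self.l2.get(key).cloned() {
            self.l1.insert(key.to_string(), v.clone());
            return (Tier::L2, Some(v));
        }
        if let Some(v) = self.l3.get(key).cloned() {
            self.l2.insert(key.to_string(), v.clone());
            self.l1.insert(key.to_string(), v.clone());
            return (Tier::L3, Some(v));
        }
        (Tier::Miss, None)
    }
}
```

Promote-on-hit keeps repeat reads fast, but a single L3 read then copies the value into every faster tier; a real implementation would likely gate promotion on access frequency or entry size.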

Architecture

Core Components

pub struct HeliosDistribCache {
/// Workload classifier
classifier: WorkloadClassifier,
/// Multi-tier cache
l1_hot: Arc<HotCache>,
l2_warm: Arc<WarmCache>,
l3_distributed: Arc<DistributedCache>,
/// Intelligent prefetcher
prefetcher: Arc<PredictivePrefetcher>,
/// WAL-based invalidator
invalidator: Arc<WalInvalidator>,
/// Heatmap analytics
heatmap: Arc<CacheHeatmap>,
/// Workload scheduler
scheduler: Arc<WorkloadScheduler>,
/// Configuration
config: DistribCacheConfig,
}
pub struct DistribCacheConfig {
/// L1 configuration
pub l1_size_mb: usize,
pub l1_max_entry_size: usize,
/// L2 configuration
pub l2_size_gb: usize,
pub l2_path: PathBuf,
pub l2_compression: CompressionType,
/// L3 configuration
pub l3_enabled: bool,
pub l3_replication_factor: u32,
pub l3_peers: Vec<SocketAddr>,
/// Workload-specific settings
pub oltp_cache_ttl: Duration,
pub olap_cache_ttl: Duration,
pub vector_cache_ttl: Duration,
/// Prefetching
pub prefetch_enabled: bool,
pub prefetch_lookahead: u32,
/// Scheduling
pub oltp_priority: u32,
pub olap_priority: u32,
}

Workload Classifier

pub struct WorkloadClassifier {
    /// Classification rules
    rules: Vec<ClassificationRule>,
    /// ML-based classifier (optional)
    ml_model: Option<Arc<ClassifierModel>>,
    /// Recent query history for pattern detection
    history: DashMap<SessionId, QueryHistory>,
}

// `Eq` and `Hash` are required because `WorkloadScheduler` keys
// `HashMap`s by `WorkloadType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WorkloadType {
    /// Online Transaction Processing
    /// Characteristics: point lookups, short transactions, low latency
    OLTP,
    /// Online Analytical Processing
    /// Characteristics: full scans, aggregations, high throughput
    OLAP,
    /// Vector/embedding operations
    /// Characteristics: ANN search, similarity queries
    Vector,
    /// AI agent workloads
    /// Characteristics: context retrieval, tool calls, conversation
    AIAgent,
    /// RAG pipeline
    /// Characteristics: embedding + retrieval + reranking
    RAG,
    /// Mixed/unknown
    Mixed,
}

impl WorkloadClassifier {
    pub fn classify(&self, query: &str, context: &QueryContext) -> WorkloadType {
        // 1. Explicit hints always win
        if let Some(hint) = context.workload_hint {
            return hint;
        }
        // 2. Pattern-based classification
        if let Some(wt) = self.classify_by_pattern(query) {
            return wt;
        }
        // 3. Session history-based classification
        if let Some(wt) = self.classify_by_session(context.session_id) {
            return wt;
        }
        // 4. ML-based classification (if enabled)
        if let Some(model) = &self.ml_model {
            return model.predict(query);
        }
        WorkloadType::Mixed
    }

    fn classify_by_pattern(&self, query: &str) -> Option<WorkloadType> {
        // Match case-insensitively: SQL keywords may be written in any case
        let q = query.to_ascii_lowercase();
        // RAG patterns first: a RAG retrieval query usually also contains a
        // vector operator and would otherwise always be classified as Vector
        if q.contains("chunks") && q.contains("documents") {
            return Some(WorkloadType::RAG);
        }
        // Vector operations
        if q.contains("<->") || q.contains("vector") || q.contains("embedding") {
            return Some(WorkloadType::Vector);
        }
        // OLAP patterns
        if q.contains("group by") || q.contains("having")
            || q.contains("count(") || q.contains("sum(") || q.contains("avg(")
        {
            return Some(WorkloadType::OLAP);
        }
        // AI agent patterns
        if q.contains("conversation") || q.contains("tool_") || q.contains("agent_") {
            return Some(WorkloadType::AIAgent);
        }
        // OLTP patterns (simple CRUD)
        if self.is_simple_crud(query) {
            return Some(WorkloadType::OLTP);
        }
        None
    }
}

Multi-Tier Cache Implementation

L1 Hot Cache (In-Memory)

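pub struct HotCache {
    /// Main cache storage
    cache: DashMap<CacheKey, CacheEntry>,
    /// LFU eviction with frequency tracking
    eviction: Arc<LFUEviction>,
    /// Per-session affinity
    session_affinity: DashMap<SessionId, HashSet<CacheKey>>,
    /// Size tracking (bytes)
    current_size: AtomicUsize,
    max_size: usize,
}

impl HotCache {
    pub fn get(&self, key: &CacheKey) -> Option<CacheEntry> {
        // Clone the entry out and drop the shard guard before any removal:
        // calling `remove` while holding a DashMap reference into the same
        // shard deadlocks.
        let entry = self.cache.get(key).map(|e| e.value().clone())?;
        if entry.is_expired() {
            self.remove(key);
            return None;
        }
        // Update access frequency only for live entries
        self.eviction.touch(key);
        Some(entry)
    }

    pub fn insert(&self, key: CacheKey, entry: CacheEntry, session: Option<SessionId>) {
        let entry_size = entry.size();
        // An entry larger than the whole tier would flush it and still not fit
        if entry_size > self.max_size {
            return;
        }
        // Evict until the new entry fits (or nothing is left to evict)
        while self.current_size.load(Ordering::Relaxed) + entry_size > self.max_size {
            match self.eviction.evict_one() {
                // Take the size from the removed value; looking it up after
                // removal always returned 0, so the size never went down
                Some(evicted) => self.remove(&evicted),
                None => break, // No more to evict
            }
        }
        // Insert, releasing the size of any entry this one replaces
        if let Some(old) = self.cache.insert(key.clone(), entry) {
            self.current_size.fetch_sub(old.size(), Ordering::Relaxed);
        }
        self.current_size.fetch_add(entry_size, Ordering::Relaxed);
        self.eviction.insert(key.clone());
        // Track session affinity
        if let Some(sid) = session {
            self.session_affinity.entry(sid).or_default().insert(key);
        }
    }

    /// Remove an entry and release its size from the running total
    fn remove(&self, key: &CacheKey) {
        if let Some((_, old)) = self.cache.remove(key) {
            self.current_size.fetch_sub(old.size(), Ordering::Relaxed);
        }
    }
}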
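`HotCache` delegates eviction to an `LFUEviction` type whose internals are not specified, while the diagram mentions frequency aging. A minimal sketch of one way to combine the two, assuming that aging means halving every counter after a fixed number of touches (the interval and the `AgingLfu` name are hypothetical):

```rust
use std::collections::HashMap;

/// Hypothetical LFU tracker with periodic aging: every `age_every` touches,
/// all counters are halved so keys that were hot long ago can be evicted.
pub struct AgingLfu {
    counts: HashMap<String, u64>,
    touches: u64,
    age_every: u64,
}

impl AgingLfu {
    pub fn new(age_every: u64) -> Self {
        Self { counts: HashMap::new(), touches: 0, age_every }
    }

    /// Start tracking a key with a frequency of 1.
    pub fn insert(&mut self, key: &str) {
        self.counts.entry(key.to_string()).or_insert(1);
    }

    /// Record an access, then age all counters if the interval is reached.
    pub fn touch(&mut self, key: &str) {
        if let Some(c) = self.counts.get_mut(key) {
            *c += 1;
        }
        self.touches += 1;
        if self.age_every > 0 && self.touches % self.age_every == 0 {
            for c in self.counts.values_mut() {
                *c /= 2;
            }
        }
    }

    /// Remove and return the least frequently used key
    /// (ties broken by key order so the result is deterministic).
    pub fn evict_one(&mut self) -> Option<String> {
        let victim = self
            .counts
            .iter()
            .min_by(|a, b| a.1.cmp(b.1).then_with(|| a.0.cmp(b.0)))
            .map(|(k, _)| k.clone())?;
        self.counts.remove(&victim);
        Some(victim)
    }

    pub fn frequency(&self, key: &str) -> Option<u64> {
        self.counts.get(key).copied()
    }
}
```

`evict_one` scans every key, which is O(n); a production tracker would keep keys in frequency buckets so eviction is O(1).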

L2 Warm Cache (Local SSD)

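pub struct WarmCache {
    /// RocksDB for persistent storage
    db: rocksdb::DB,
    /// Bloom filter for fast negative lookups (needs interior mutability,
    /// and must be rebuilt from the DB's keys on startup)
    bloom: BloomFilter,
    /// Compression
    compression: CompressionType,
    /// Size tracking (compressed bytes)
    current_size: AtomicU64,
    max_size: u64,
}

impl WarmCache {
    pub fn get(&self, key: &CacheKey) -> Option<CacheEntry> {
        let key_bytes = key.to_bytes();
        // Fast path: `false` from the bloom filter means definitely absent
        if !self.bloom.may_contain(&key_bytes) {
            return None;
        }
        // Lookup in RocksDB
        let bytes = self.db.get(&key_bytes).ok()??;
        // Decompress and deserialize
        let decompressed = self.decompress(&bytes)?;
        let entry: CacheEntry = bincode::deserialize(&decompressed).ok()?;
        // Check TTL; expired entries are deleted and their bytes released
        if entry.is_expired() {
            if self.db.delete(&key_bytes).is_ok() {
                self.current_size.fetch_sub(bytes.len() as u64, Ordering::Relaxed);
            }
            return None;
        }
        Some(entry)
    }

    pub fn insert(&self, key: CacheKey, entry: CacheEntry) {
        // Serialization or compression failure skips caching instead of panicking
        let Ok(serialized) = bincode::serialize(&entry) else { return };
        let Some(compressed) = self.compress(&serialized) else { return };
        let size = compressed.len() as u64;
        if size > self.max_size {
            return; // Can never fit; don't flush the whole tier for it
        }
        // Evict until the entry fits; `evict_oldest` is assumed to return
        // false when nothing is left, so the loop cannot spin forever
        while self.current_size.load(Ordering::Relaxed) + size > self.max_size {
            if !self.evict_oldest() {
                break;
            }
        }
        // Insert
        let key_bytes = key.to_bytes();
        if self.db.put(&key_bytes, &compressed).is_ok() {
            self.bloom.insert(&key_bytes);
            self.current_size.fetch_add(size, Ordering::Relaxed);
        }
    }

    fn compress(&self, data: &[u8]) -> Option<Vec<u8>> {
        match self.compression {
            CompressionType::None => Some(data.to_vec()),
            // Plain `lz4_flex::compress` does not record the uncompressed
            // length, which block decompression needs; prepend it
            CompressionType::Lz4 => Some(lz4_flex::compress_prepend_size(data)),
            CompressionType::Zstd => zstd::encode_all(data, 3).ok(),
        }
    }

    fn decompress(&self, data: &[u8]) -> Option<Vec<u8>> {
        match self.compression {
            CompressionType::None => Some(data.to_vec()),
            CompressionType::Lz4 => lz4_flex::decompress_size_prepended(data).ok(),
            CompressionType::Zstd => zstd::decode_all(data).ok(),
        }
    }
}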
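The warm tier consults a `BloomFilter` before touching RocksDB. A minimal sketch of such a filter, assuming double hashing over the standard library's `DefaultHasher` (the document does not name a Bloom filter crate or hash function, so this type is a hypothetical stand-in):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Minimal Bloom filter using double hashing: probe_i = h1 + i * h2 (mod m).
pub struct BloomFilter {
    bits: Vec<bool>,
    num_hashes: u64,
}

impl BloomFilter {
    pub fn new(num_bits: usize, num_hashes: u64) -> Self {
        assert!(num_bits > 0 && num_hashes > 0);
        Self { bits: vec![false; num_bits], num_hashes }
    }

    fn hash_with_seed(item: &[u8], seed: u64) -> u64 {
        let mut h = DefaultHasher::new();
        seed.hash(&mut h);
        item.hash(&mut h);
        h.finish()
    }

    /// The `num_hashes` bit positions for an item.
    fn indexes(&self, item: &[u8]) -> impl Iterator<Item = usize> {
        let h1 = Self::hash_with_seed(item, 0);
        let h2 = Self::hash_with_seed(item, 1) | 1; // `| 1` makes h2 odd, hence non-zero
        let m = self.bits.len() as u64;
        (0..self.num_hashes).map(move |i| (h1.wrapping_add(i.wrapping_mul(h2)) % m) as usize)
    }

    pub fn insert(&mut self, item: &[u8]) {
        let idx: Vec<usize> = self.indexes(item).collect();
        for i in idx {
            self.bits[i] = true;
        }
    }

    /// `false` means definitely absent; `true` means possibly present.
    pub fn may_contain(&self, item: &[u8]) -> bool {
        self.indexes(item).all(|i| self.bits[i])
    }
}
```

Two properties matter for `WarmCache`. A Bloom filter cannot delete, so keys removed by TTL or eviction keep answering "maybe" and the false-positive rate climbs until the filter is rebuilt. And because the filter lives in memory, it must be repopulated from RocksDB's keys after a restart, or persisted entries will be reported as absent.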

L3 Distributed Cache (Mesh)

use futures_util::future::{join_all, BoxFuture, FutureExt};

pub struct DistributedCache {
    /// Consistent hash ring for key distribution
    hash_ring: Arc<HashRing<PeerId>>,
    /// Peer connections (assumed cheap to clone, e.g. a pooled client handle)
    peers: DashMap<PeerId, PeerConnection>,
    /// This proxy's own identity on the ring
    local_peer_id: PeerId,
    /// Local cache for owned keys and remote entries fetched on demand
    local: Arc<HotCache>,
    /// Replication factor
    replication_factor: u32,
    /// Gossip protocol for membership
    gossip: Arc<GossipProtocol>,
}

impl DistributedCache {
    pub async fn get(&self, key: &CacheKey) -> Option<CacheEntry> {
        // 1. Determine the owner(s) of the key
        let owners = self.hash_ring.get_nodes(&key.to_bytes(), self.replication_factor);
        // 2. Check if we own it locally
        if owners.contains(&self.local_peer_id) {
            if let Some(entry) = self.local.get(key) {
                return Some(entry);
            }
        }
        // 3. Query remote owners in ring order
        for owner in owners.iter().filter(|o| **o != self.local_peer_id) {
            // Clone the connection so no DashMap guard is held across `.await`
            let Some(peer) = self.peers.get(owner).map(|p| p.clone()) else { continue };
            if let Ok(entry) = peer.get(key).await {
                // Cache locally for future access
                self.local.insert(key.clone(), entry.clone(), None);
                return Some(entry);
            }
        }
        None
    }

    /// Write to every replica owner; succeed once a majority acknowledge.
    pub async fn insert(&self, key: CacheKey, entry: CacheEntry) -> Result<(), CacheError> {
        let owners = self.hash_ring.get_nodes(&key.to_bytes(), self.replication_factor);
        // Box each write so the local, remote and error branches share one type
        let writes: Vec<BoxFuture<'_, Result<(), CacheError>>> = owners
            .iter()
            .map(|owner| {
                let (key, entry) = (key.clone(), entry.clone());
                if *owner == self.local_peer_id {
                    async move {
                        self.local.insert(key, entry, None);
                        Ok(())
                    }
                    .boxed()
                } else if let Some(peer) = self.peers.get(owner).map(|p| p.clone()) {
                    async move { peer.insert(key, entry).await }.boxed()
                } else {
                    async { Err(CacheError::PeerNotFound) }.boxed()
                }
            })
            .collect();
        // `select_ok` resolves on the first success, and `take(required_acks)`
        // dropped the remaining writes, so it neither waited for a majority
        // nor reached every replica. Send to all replicas and count acks.
        let acks = join_all(writes).await.into_iter().filter(Result::is_ok).count();
        let required_acks = owners.len() / 2 + 1;
        if acks >= required_acks {
            Ok(())
        } else {
            Err(CacheError::QuorumNotReached) // assumed error variant
        }
    }
}
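`DistributedCache` relies on `hash_ring.get_nodes(key, n)` to find the replica owners of a key. A sketch of a consistent-hash ring with virtual nodes that returns `n` distinct peers by walking clockwise from the key's hash; this `HashRing` is a hypothetical, string-keyed stand-in for the document's `HashRing<PeerId>`, and the virtual-node count is an assumption:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

fn hash64<T: Hash + ?Sized>(value: &T) -> u64 {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

/// Consistent-hash ring: each peer is placed at `vnodes_per_peer` points.
pub struct HashRing {
    ring: BTreeMap<u64, String>,
}

impl HashRing {
    pub fn new(peers: &[&str], vnodes_per_peer: u32) -> Self {
        let mut ring = BTreeMap::new();
        for peer in peers {
            for v in 0..vnodes_per_peer {
                ring.insert(hash64(&format!("{peer}#{v}")), peer.to_string());
            }
        }
        Self { ring }
    }

    /// Walk clockwise from the key's position (wrapping around) and
    /// collect up to `n` distinct peers; the first is the primary owner.
    pub fn get_nodes(&self, key: &[u8], n: usize) -> Vec<String> {
        let start = hash64(key);
        let mut owners: Vec<String> = Vec::new();
        for (_, peer) in self.ring.range(start..).chain(self.ring.range(..start)) {
            if owners.len() == n {
                break;
            }
            if !owners.contains(peer) {
                owners.push(peer.clone());
            }
        }
        owners
    }
}
```

`DefaultHasher` only keeps the sketch dependency-free. Its output is not guaranteed to be stable across Rust releases, and every proxy in the mesh must place keys identically, so a real ring should use a hash function with a fixed specification.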

Predictive Prefetcher

use std::sync::Weak;

pub struct PredictivePrefetcher {
    /// Query sequence patterns: fingerprint -> observed successors
    patterns: DashMap<QueryFingerprint, Vec<QueryFingerprint>>,
    /// Session-based prediction
    session_sequences: DashMap<SessionId, VecDeque<QueryFingerprint>>,
    /// Temporal patterns (time-of-day based)
    temporal_patterns: TemporalPatternStore,
    /// Prefetch queue
    prefetch_queue: Arc<PrefetchQueue>,
    /// Cache that prefetched results are written into; `Weak` because
    /// `HeliosDistribCache` owns this prefetcher (an `Arc` would form a cycle)
    cache: Weak<HeliosDistribCache>,
    /// Minimum confidence before a prediction is enqueued (`confidence_threshold`)
    confidence_threshold: f64,
    /// Background prefetch worker
    worker: JoinHandle<()>,
}

impl PredictivePrefetcher {
    /// Record query execution for pattern learning
    pub fn record(&self, session: &str, fingerprint: QueryFingerprint) {
        let mut seq = self
            .session_sequences
            .entry(session.to_string())
            .or_insert_with(VecDeque::new);
        // Learn the transition previous -> current. One prior query is enough;
        // requiring two skipped the first transition of every session.
        if let Some(prev) = seq.back().cloned() {
            self.patterns.entry(prev).or_default().push(fingerprint.clone());
        }
        seq.push_back(fingerprint);
        // Keep the last 100 queries per session
        while seq.len() > 100 {
            seq.pop_front();
        }
    }

    /// Predict and prefetch the next likely queries
    pub fn predict_and_prefetch(&self, current: &QueryFingerprint, _session: &str) {
        // 1. Pattern-based prediction
        if let Some(next_queries) = self.patterns.get(current) {
            // Most common successors, with confidence in [0, 1]
            for (fingerprint, confidence) in self.get_top_predictions(next_queries.value()) {
                if confidence > self.confidence_threshold {
                    self.prefetch_queue.enqueue(PrefetchRequest {
                        fingerprint,
                        priority: (confidence * 100.0) as u32,
                    });
                }
            }
        }
        // 2. Temporal prediction
        if let Some(temporal) = self.temporal_patterns.predict_for_time(Utc::now()) {
            for fingerprint in temporal {
                self.prefetch_queue.enqueue(PrefetchRequest {
                    fingerprint,
                    priority: 50, // Medium priority
                });
            }
        }
    }

    /// Background prefetch worker; exits when the queue is closed
    async fn prefetch_worker(&self) {
        while let Some(request) = self.prefetch_queue.dequeue().await {
            // Stop once the owning cache has been dropped
            let Some(cache) = self.cache.upgrade() else { break };
            // Skip work that is already cached
            if cache.contains(&request.fingerprint) {
                continue;
            }
            // Execute the query and cache the result. If fingerprints strip
            // literal values, `execute_query` has to replay a stored
            // representative query (with parameters) for the fingerprint.
            if let Ok(result) = self.execute_query(&request.fingerprint).await {
                cache.insert(request.fingerprint, result);
            }
        }
    }
}
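`predict_and_prefetch` calls `get_top_predictions`, which is not shown. Reading "most common next queries" as relative frequency gives a first-order (Markov) model of query transitions. The sketch assumes that interpretation and the `confidence_threshold = 0.3` value from the sample config; `TransitionModel` is a hypothetical name:

```rust
use std::collections::HashMap;

/// Counts how often query B follows query A; confidence of A -> B is
/// count(A -> B) / count(A -> anything).
#[derive(Default)]
pub struct TransitionModel {
    counts: HashMap<String, HashMap<String, u32>>,
}

impl TransitionModel {
    pub fn record(&mut self, prev: &str, next: &str) {
        *self
            .counts
            .entry(prev.to_string())
            .or_default()
            .entry(next.to_string())
            .or_insert(0) += 1;
    }

    /// Successors of `current` whose confidence exceeds `threshold`,
    /// sorted by descending confidence (ties broken by fingerprint).
    pub fn top_predictions(&self, current: &str, threshold: f64) -> Vec<(String, f64)> {
        let Some(next) = self.counts.get(current) else { return Vec::new() };
        let total: u32 = next.values().sum();
        let mut preds: Vec<(String, f64)> = next
            .iter()
            .map(|(fp, &c)| (fp.clone(), c as f64 / total as f64))
            .filter(|(_, conf)| *conf > threshold)
            .collect();
        preds.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        preds
    }
}
```

Keeping counts instead of the raw `Vec<QueryFingerprint>` of successors bounds memory per fingerprint by the number of distinct successors rather than the number of observations.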

WAL-Based Invalidator

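use std::sync::Weak;

pub struct WalInvalidator {
    /// WAL stream subscription
    wal_stream: WalStreamer,
    /// Table -> cache keys whose results read that table
    table_index: DashMap<TableName, HashSet<CacheKey>>,
    /// Back-reference to the cache; `Weak` because `HeliosDistribCache`
    /// owns this invalidator and an `Arc` here would form a cycle
    cache: Weak<HeliosDistribCache>,
}

impl WalInvalidator {
    /// Start listening to the WAL for invalidation
    pub async fn start(&self) {
        let mut stream = self.wal_stream.subscribe().await;
        while let Some(entry) = stream.next().await {
            match entry.operation {
                WalOperation::Put { key, .. } | WalOperation::Delete { key } => {
                    let table = extract_table_from_key(&key);
                    self.invalidate_table(&table);
                }
                WalOperation::UpdateCounter { table_name, .. } => {
                    // Sequence changes may affect cached results
                    self.invalidate_table(&table_name);
                }
                _ => {}
            }
        }
    }

    fn invalidate_table(&self, table: &str) {
        let Some(cache) = self.cache.upgrade() else { return };
        // Take the key set out of the index, so keys that are already gone
        // are not invalidated again on every later write to the table
        if let Some((_, keys)) = self.table_index.remove(table) {
            for key in &keys {
                cache.invalidate(key);
            }
        }
    }

    /// Selective (fine-grained) invalidation based on row
    fn invalidate_row(&self, table: &str, row_key: &[u8]) {
        let Some(cache) = self.cache.upgrade() else { return };
        let pattern = format!("{}:{}", table, hex::encode(row_key));
        // Collect first: invalidating while iterating a DashMap can deadlock.
        // Only L1 is scanned (assumes HotCache exposes its map's iterator);
        // L2/L3 entries would need their own pass.
        let matching: Vec<CacheKey> = cache
            .l1_hot
            .iter()
            .filter(|entry| entry.key().matches_pattern(&pattern))
            .map(|entry| entry.key().clone())
            .collect();
        for key in matching {
            cache.invalidate(&key);
        }
    }
}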
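The invalidator depends on `table_index` mapping each table to the cache keys derived from it, but the document does not show how keys get registered. A minimal sketch, assuming each cached query is registered under every table it reads when it is inserted (`TableIndex` is hypothetical):

```rust
use std::collections::{HashMap, HashSet};

/// Reverse index from table name to dependent cache keys. A query that
/// joins several tables is registered under each of them.
#[derive(Default)]
pub struct TableIndex {
    by_table: HashMap<String, HashSet<String>>,
}

impl TableIndex {
    pub fn register(&mut self, key: &str, tables: &[&str]) {
        for t in tables {
            self.by_table.entry(t.to_string()).or_default().insert(key.to_string());
        }
    }

    /// Drain every key that depends on `table` (sorted for determinism);
    /// the caller then evicts those keys from all tiers.
    pub fn invalidate_table(&mut self, table: &str) -> Vec<String> {
        let mut keys: Vec<String> =
            self.by_table.remove(table).unwrap_or_default().into_iter().collect();
        // The same keys may be listed under other tables; drop those references too
        for set in self.by_table.values_mut() {
            for k in &keys {
                set.remove(k);
            }
        }
        self.by_table.retain(|_, set| !set.is_empty());
        keys.sort();
        keys
    }
}
```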

Cache Heatmap Analytics

pub struct CacheHeatmap {
/// Access counts per table
table_accesses: DashMap<TableName, AccessStats>,
/// Access counts per query fingerprint
query_accesses: DashMap<QueryFingerprint, AccessStats>,
/// Time-bucketed access data
time_buckets: RwLock<Vec<TimeBucket>>,
/// Heatmap grid (for visualization)
grid: RwLock<HeatmapGrid>,
}
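/// Per-key counters. `Default` backs `or_insert_with(AccessStats::default)`;
/// `Clone` cannot be derived because `AtomicU64` does not implement it.
#[derive(Debug, Default)]
pub struct AccessStats {
pub hits: AtomicU64,
pub misses: AtomicU64,
pub total_time_saved_us: AtomicU64,
/// Last access time in nanoseconds (same clock as `now_nanos()`)
pub last_access: AtomicU64,
}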
#[derive(Debug, Clone)]
pub struct TimeBucket {
pub start: DateTime<Utc>,
pub end: DateTime<Utc>,
pub accesses: HashMap<TableName, u64>,
pub hit_ratio: f64,
}
impl CacheHeatmap {
pub fn record_access(&self, key: &CacheKey, hit: bool, time_saved: Duration) {
// Update table stats
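let table = key.table();
let stats = self.table_accesses.entry(table.to_string())
.or_insert_with(AccessStats::default);
// Without this, last_access stays 0 and the cold-data recommendation never fires
stats.last_access.store(now_nanos(), Ordering::Relaxed);
if hit {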
stats.hits.fetch_add(1, Ordering::Relaxed);
stats.total_time_saved_us.fetch_add(
time_saved.as_micros() as u64,
Ordering::Relaxed
);
} else {
stats.misses.fetch_add(1, Ordering::Relaxed);
}
// Update query stats
let query_stats = self.query_accesses.entry(key.fingerprint())
.or_insert_with(AccessStats::default);
if hit {
query_stats.hits.fetch_add(1, Ordering::Relaxed);
} else {
query_stats.misses.fetch_add(1, Ordering::Relaxed);
}
// Update time bucket
self.update_time_bucket(table, hit);
}
/// Generate heatmap visualization data
pub fn generate_heatmap(&self) -> HeatmapData {
let tables: Vec<_> = self.table_accesses.iter()
.map(|entry| {
let stats = entry.value();
let hits = stats.hits.load(Ordering::Relaxed);
let misses = stats.misses.load(Ordering::Relaxed);
TableHeat {
name: entry.key().clone(),
total_accesses: hits + misses,
hit_ratio: hits as f64 / (hits + misses).max(1) as f64,
time_saved_ms: stats.total_time_saved_us.load(Ordering::Relaxed) / 1000,
temperature: self.calculate_temperature(hits + misses),
}
})
.sorted_by(|a, b| b.total_accesses.cmp(&a.total_accesses))
.collect();
HeatmapData {
tables,
time_series: self.get_time_series(),
recommendations: self.generate_recommendations(),
}
}
/// Generate optimization recommendations
fn generate_recommendations(&self) -> Vec<Recommendation> {
let mut recs = Vec::new();
for entry in self.table_accesses.iter() {
let stats = entry.value();
let hits = stats.hits.load(Ordering::Relaxed);
let misses = stats.misses.load(Ordering::Relaxed);
let hit_ratio = hits as f64 / (hits + misses).max(1) as f64;
// Low hit ratio recommendation
if hit_ratio < 0.5 && (hits + misses) > 1000 {
recs.push(Recommendation {
table: entry.key().clone(),
issue: "Low cache hit ratio".to_string(),
suggestion: format!(
"Consider increasing TTL or cache size for '{}' (current hit ratio: {:.1}%)",
entry.key(),
hit_ratio * 100.0
),
priority: Priority::High,
});
}
// Cold data in hot cache
let last_access = stats.last_access.load(Ordering::Relaxed);
if last_access > 0 {
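// saturating_sub: a clock step backwards must not underflow (which panics in debug builds)
let age = Duration::from_nanos(now_nanos().saturating_sub(last_access));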
if age > Duration::from_secs(3600) && (hits + misses) < 100 {
recs.push(Recommendation {
table: entry.key().clone(),
issue: "Cold data in cache".to_string(),
suggestion: format!(
"'{}' hasn't been accessed in {:.0} minutes, consider reducing TTL",
entry.key(),
age.as_secs_f64() / 60.0
),
priority: Priority::Medium,
});
}
}
}
recs
}
}
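The heatmap keeps `time_buckets`, and the sample config sets `bucket_size = "5m"` and `retention = "7d"`. A sketch of fixed-width bucketing with retention pruning, assuming timestamps in whole seconds and a `BTreeMap` keyed by bucket start; `TimeBuckets` is a hypothetical name, and it requires the retention to be at least one bucket wide:

```rust
use std::collections::BTreeMap;

/// Fixed-width hit/miss buckets keyed by bucket start (seconds since epoch).
pub struct TimeBuckets {
    bucket_secs: u64,
    retention_secs: u64,
    buckets: BTreeMap<u64, (u64, u64)>, // start -> (hits, misses)
}

impl TimeBuckets {
    pub fn new(bucket_secs: u64, retention_secs: u64) -> Self {
        // A retention shorter than one bucket could prune the current bucket
        assert!(bucket_secs > 0 && retention_secs >= bucket_secs);
        Self { bucket_secs, retention_secs, buckets: BTreeMap::new() }
    }

    pub fn record(&mut self, now_secs: u64, hit: bool) {
        let start = now_secs - now_secs % self.bucket_secs;
        let entry = self.buckets.entry(start).or_insert((0, 0));
        if hit { entry.0 += 1 } else { entry.1 += 1 }
        // Keep only buckets whose start is at or after the retention cutoff
        let cutoff = now_secs.saturating_sub(self.retention_secs);
        self.buckets = self.buckets.split_off(&cutoff);
    }

    /// (bucket start, hit ratio) pairs in chronological order.
    pub fn hit_ratio_series(&self) -> Vec<(u64, f64)> {
        self.buckets
            .iter()
            .map(|(&start, &(h, m))| (start, h as f64 / (h + m).max(1) as f64))
            .collect()
    }
}
```

With `bucket_size = "5m"` and `retention = "7d"` this holds at most 2016 buckets per series, which is small enough to rebuild the `time_series` section of `/distribcache/heatmap` on each request.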

Workload Scheduler

pub struct WorkloadScheduler {
/// Queue per workload type
queues: HashMap<WorkloadType, PriorityQueue>,
/// Current workload distribution
distribution: RwLock<WorkloadDistribution>,
/// Scheduling policy
policy: SchedulingPolicy,
/// Resource limits per workload
limits: HashMap<WorkloadType, ResourceLimit>,
}
#[derive(Debug, Clone)]
pub struct ResourceLimit {
/// Max concurrent queries
pub max_concurrent: u32,
/// Max cache memory
pub max_cache_mb: usize,
/// Priority weight
pub priority_weight: f64,
}
#[derive(Debug, Clone, Copy)]
pub enum SchedulingPolicy {
/// Strict priority (OLTP always first)
StrictPriority,
/// Weighted fair queuing
WeightedFair,
/// Time-based (OLAP during off-hours)
TimeBased,
/// Adaptive (learn optimal distribution)
Adaptive,
}
impl WorkloadScheduler {
/// Schedule a query based on workload type
pub fn schedule(&self, query: ScheduledQuery) -> ScheduleResult {
let workload = query.workload_type;
// Workloads without a configured limit (the sample config has no RAG or
// Mixed section) skip the concurrency check instead of panicking on unwrap()
if let Some(limit) = self.limits.get(&workload) {
let current = self.get_current_concurrency(&workload);
if current >= limit.max_concurrent {
return ScheduleResult::Queued { position: self.queue_position(&workload) };
}
}
// Apply scheduling policy
match self.policy {
SchedulingPolicy::StrictPriority => {
self.schedule_strict_priority(query)
}
SchedulingPolicy::WeightedFair => {
self.schedule_weighted_fair(query)
}
SchedulingPolicy::TimeBased => {
self.schedule_time_based(query)
}
SchedulingPolicy::Adaptive => {
self.schedule_adaptive(query)
}
}
}
fn schedule_time_based(&self, query: ScheduledQuery) -> ScheduleResult {
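// UTC hour; `hour()` needs `chrono::Timelike` in scope. The window is
// hardcoded here, whereas the config expresses it as `oltp_hours`.
let hour = Utc::now().hour();
// Business hours (09:00-18:00 UTC): prioritize OLTP
if (9..18).contains(&hour) {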
if query.workload_type == WorkloadType::OLTP {
return ScheduleResult::Execute { priority: Priority::High };
}
if query.workload_type == WorkloadType::OLAP {
return ScheduleResult::Execute { priority: Priority::Low };
}
}
// Off-hours: prioritize OLAP
if query.workload_type == WorkloadType::OLAP {
return ScheduleResult::Execute { priority: Priority::High };
}
ScheduleResult::Execute { priority: Priority::Normal }
}
fn schedule_adaptive(&self, query: ScheduledQuery) -> ScheduleResult {
// Learn optimal distribution based on historical performance
let distribution = self.distribution.read();
// Calculate ideal distribution
let ideal = distribution.calculate_ideal();
// Adjust based on current state
let current = self.get_current_distribution();
if current.is_below_ideal(&ideal, query.workload_type) {
ScheduleResult::Execute { priority: Priority::High }
} else if current.is_above_ideal(&ideal, query.workload_type) {
ScheduleResult::Queued { position: 0 }
} else {
ScheduleResult::Execute { priority: Priority::Normal }
}
}
}
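`SchedulingPolicy::WeightedFair` is listed, but `schedule_weighted_fair` is not shown. One simple reading of weighted fair queuing: charge each served query `1 / weight` of virtual time to its class, and always serve the non-empty class with the least virtual time. The sketch assumes unit cost per query and the `priority_weight` values from the sample config (1.0 for OLTP, 0.3 for OLAP); `WeightedFair` is hypothetical:

```rust
use std::collections::VecDeque;

struct Class {
    name: &'static str,
    weight: f64,
    virtual_time: f64,
    queue: VecDeque<u64>, // queued query ids
}

/// Serves classes in proportion to their weights over time.
pub struct WeightedFair {
    classes: Vec<Class>,
}

impl WeightedFair {
    pub fn new(weights: &[(&'static str, f64)]) -> Self {
        let classes = weights
            .iter()
            .map(|&(name, weight)| {
                assert!(weight > 0.0);
                Class { name, weight, virtual_time: 0.0, queue: VecDeque::new() }
            })
            .collect();
        Self { classes }
    }

    pub fn enqueue(&mut self, class: &str, query_id: u64) {
        if let Some(c) = self.classes.iter_mut().find(|c| c.name == class) {
            c.queue.push_back(query_id);
        }
    }

    /// Dequeue from the non-empty class with the smallest virtual time
    /// (the first such class wins ties), charging it 1 / weight.
    pub fn dequeue(&mut self) -> Option<(&'static str, u64)> {
        let c = self
            .classes
            .iter_mut()
            .filter(|c| !c.queue.is_empty())
            .min_by(|a, b| a.virtual_time.total_cmp(&b.virtual_time))?;
        c.virtual_time += 1.0 / c.weight;
        let id = c.queue.pop_front()?;
        Some((c.name, id))
    }
}
```

This is a simplification: a class that sat idle keeps a low virtual time and receives a burst of service when it returns. Full WFQ avoids that by advancing a system-wide virtual clock and starting returning classes from it.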

API Specification

Configuration (heliosproxy.toml)

[distribcache]
enabled = true
# Multi-tier configuration
[distribcache.l1]
enabled = true
size_mb = 256
max_entry_size_kb = 1024
eviction = "lfu" # lru, lfu, adaptive
[distribcache.l2]
enabled = true
size_gb = 5
path = "/var/lib/heliosproxy/cache"
compression = "lz4" # none, lz4, zstd
[distribcache.l3]
enabled = true
replication_factor = 2
peers = [
"proxy-1.internal:9100",
"proxy-2.internal:9100",
"proxy-3.internal:9100"
]
# Workload-specific settings
[distribcache.workloads.oltp]
cache_ttl = "60s"
max_concurrent = 500
priority_weight = 1.0
[distribcache.workloads.olap]
cache_ttl = "30m"
max_concurrent = 50
priority_weight = 0.3
[distribcache.workloads.vector]
cache_ttl = "5m"
max_concurrent = 100
priority_weight = 0.5
[distribcache.workloads.ai_agent]
cache_ttl = "10m"
max_concurrent = 200
priority_weight = 0.7
# Prefetching
[distribcache.prefetch]
enabled = true
lookahead = 3
confidence_threshold = 0.3
max_prefetch_queue = 100
# Invalidation
[distribcache.invalidation]
mode = "wal" # wal, ttl, hybrid
wal_lag_tolerance = "100ms"
# Scheduling
[distribcache.scheduling]
policy = "weighted_fair" # strict_priority, weighted_fair, time_based, adaptive
oltp_hours = "09:00-18:00"
olap_hours = "00:00-06:00"
# Heatmap
[distribcache.heatmap]
enabled = true
bucket_size = "5m"
retention = "7d"
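The configuration expresses durations as strings (`"60s"`, `"30m"`, `"100ms"`, `"7d"`), but the document does not say how they are parsed. A crate such as `humantime` handles these formats; a dependency-free sketch of just the subset used above, assuming `m` means minutes, could look like this (`parse_duration` is a hypothetical helper):

```rust
use std::time::Duration;

/// Parse "<integer><unit>" with unit one of ms, s, m, h, d.
pub fn parse_duration(s: &str) -> Result<Duration, String> {
    let s = s.trim();
    let split = s
        .find(|c: char| !c.is_ascii_digit())
        .ok_or_else(|| format!("missing unit in {s:?}"))?;
    let (num, unit) = s.split_at(split);
    let n: u64 = num.parse().map_err(|_| format!("invalid number in {s:?}"))?;
    // Multiply to seconds, reporting overflow instead of wrapping
    let secs = |mult: u64| n.checked_mul(mult).map(Duration::from_secs);
    let d = match unit {
        "ms" => Some(Duration::from_millis(n)),
        "s" => secs(1),
        "m" => secs(60),
        "h" => secs(3_600),
        "d" => secs(86_400),
        _ => return Err(format!("unknown unit {unit:?} in {s:?}")),
    };
    d.ok_or_else(|| format!("overflow in {s:?}"))
}
```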

Admin API

GET /distribcache/stats
{
"l1": {
"size_mb": 234,
"max_size_mb": 256,
"entries": 15234,
"hit_ratio": 0.92,
"evictions_per_sec": 12.3
},
"l2": {
"size_gb": 3.2,
"max_size_gb": 5,
"entries": 125000,
"hit_ratio": 0.78,
"compression_ratio": 2.3
},
"l3": {
"total_capacity_gb": 15,
"used_gb": 8.5,
"peers": 3,
"healthy_peers": 3,
"replication_lag_ms": 5
},
"overall_hit_ratio": 0.89,
"time_saved_seconds": 1234.5,
"queries_avoided": 150000
}
GET /distribcache/heatmap
{
"tables": [
{
"name": "users",
"total_accesses": 50000,
"hit_ratio": 0.95,
"time_saved_ms": 25000,
"temperature": "hot"
},
{
"name": "analytics_events",
"total_accesses": 5000,
"hit_ratio": 0.45,
"time_saved_ms": 15000,
"temperature": "warm"
}
],
"time_series": [
{ "timestamp": "...", "hit_ratio": 0.89 },
{ "timestamp": "...", "hit_ratio": 0.91 }
],
"recommendations": [
{
"table": "orders",
"issue": "Low cache hit ratio",
"suggestion": "Consider increasing TTL from 30s to 5m",
"priority": "high"
}
]
}
GET /distribcache/workloads
{
"distribution": {
"oltp": { "current_pct": 65, "target_pct": 60, "queued": 5 },
"olap": { "current_pct": 20, "target_pct": 25, "queued": 10 },
"vector": { "current_pct": 10, "target_pct": 10, "queued": 0 },
"ai_agent": { "current_pct": 5, "target_pct": 5, "queued": 2 }
},
"scheduling_policy": "weighted_fair",
"current_hour_policy": "oltp_priority"
}
POST /distribcache/prefetch
# Manually trigger prefetch for pattern
{ "fingerprint": "abc123" }
POST /distribcache/invalidate
# Invalidate cache entries
{ "tables": ["users", "orders"] }
POST /distribcache/resize
# Dynamically resize cache
{ "l1_size_mb": 512, "l2_size_gb": 10 }

AI/Agent Innovations

1. Conversation Context Cache

Specialized caching for AI agent conversations:

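pub struct ConversationContextCache {
    /// Recent turns per conversation
    contexts: DashMap<ConversationId, ConversationContext>,
    /// LRU over conversations; its capacity bounds how many are kept
    /// (assumes the `lru` crate behind a `parking_lot::Mutex`)
    lru: Mutex<LruCache<ConversationId, ()>>,
    /// Max turns to cache per conversation
    max_turns: usize,
}

impl ConversationContextCache {
    pub fn get_context(&self, conv_id: &str, max_turns: usize) -> Option<Vec<Turn>> {
        let ctx = self.contexts.get(conv_id)?;
        // Return the most recent `max_turns` turns in chronological order
        let skip = ctx.turns.len().saturating_sub(max_turns);
        Some(ctx.turns.iter().skip(skip).cloned().collect())
    }

    pub fn append_turn(&self, conv_id: &str, turn: Turn) {
        {
            let mut ctx = self
                .contexts
                .entry(conv_id.to_string())
                .or_insert_with(ConversationContext::new);
            // `turns` is assumed to be a VecDeque so trimming the oldest is O(1)
            ctx.turns.push_back(turn);
            while ctx.turns.len() > self.max_turns {
                ctx.turns.pop_front();
            }
        } // Release the DashMap guard before removing other entries
        // Update the LRU. `put` never reports evictions, so evicted
        // conversations stayed in `contexts`; `push` returns the displaced pair.
        let evicted = self.lru.lock().push(conv_id.to_string(), ());
        if let Some((old_id, _)) = evicted {
            // `push` also returns the old pair when the key was already present
            if old_id != conv_id {
                self.contexts.remove(&old_id);
            }
        }
    }
}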

2. RAG Chunk Cache

Cache document chunks for RAG pipelines:

pub struct RagChunkCache {
/// Chunk cache (id -> content)
chunks: Arc<HotCache>,
/// Embedding -> chunk ID mapping
embedding_to_chunks: DashMap<EmbeddingHash, Vec<ChunkId>>,
/// Prefetch related chunks
prefetcher: Arc<ChunkPrefetcher>,
}
impl RagChunkCache {
pub async fn get_chunks(&self, embedding: &[f32], k: usize) -> Vec<Chunk> {
let hash = hash_embedding(embedding);
// Check if we've seen this embedding before
if let Some(chunk_ids) = self.embedding_to_chunks.get(&hash) {
// Retrieve cached chunks
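let chunks: Vec<Chunk> = chunk_ids.iter()
// `to_chunk` is assumed to be the inverse of `CacheEntry::from_chunk`
.filter_map(|id| self.chunks.get(&CacheKey::chunk(*id)).and_then(|e| e.to_chunk()))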
.take(k)
.collect();
if chunks.len() >= k {
return chunks;
}
}
// Cache miss, perform ANN search
let results = self.perform_ann_search(embedding, k).await;
// Cache results
let chunk_ids: Vec<_> = results.iter().map(|c| c.id).collect();
self.embedding_to_chunks.insert(hash, chunk_ids.clone());
for chunk in &results {
self.chunks.insert(
CacheKey::chunk(chunk.id),
CacheEntry::from_chunk(chunk),
None
);
}
// Prefetch related chunks
self.prefetcher.prefetch_related(&chunk_ids);
results
}
}
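`RagChunkCache` keys its embedding map by `hash_embedding(embedding)`, which is not defined. A sketch assuming an exact-match hash over the float bit patterns, with `-0.0` and NaN canonicalized so that values that compare equal also hash equal (the canonicalization is an assumption, not something the document specifies):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Exact-match hash of an embedding: only bit-identical vectors collide,
/// except that -0.0 is folded into 0.0 and every NaN into one canonical NaN.
pub fn hash_embedding(embedding: &[f32]) -> u64 {
    let mut h = DefaultHasher::new();
    embedding.len().hash(&mut h);
    for &x in embedding {
        let canonical = if x == 0.0 {
            0.0f32
        } else if x.is_nan() {
            f32::NAN
        } else {
            x
        };
        canonical.to_bits().hash(&mut h);
    }
    h.finish()
}
```

Because this is an exact hash, re-embedding the same text with a model that is not bit-for-bit deterministic will miss; tolerating near-duplicates is what a similarity threshold is for.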

3. Tool Result Cache

Cache deterministic tool call results:

pub struct ToolResultCache {
/// Tool -> Result cache
cache: DashMap<ToolCallKey, ToolResult>,
/// Tool determinism configuration
deterministic_tools: HashSet<String>,
}
impl ToolResultCache {
pub async fn execute_with_cache(
&self,
tool: &str,
params: &Value,
executor: impl Future<Output = ToolResult>,
) -> ToolResult {
// Check if tool is deterministic
if !self.deterministic_tools.contains(tool) {
return executor.await;
}
let key = ToolCallKey::new(tool, params);
// Check cache
if let Some(result) = self.cache.get(&key) {
return result.clone();
}
// Execute and cache
let result = executor.await;
self.cache.insert(key, result.clone());
result
}
}
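`ToolCallKey::new(tool, params)` must produce equal keys for calls that differ only in parameter order, or identical tool calls will miss the cache. A sketch assuming string-valued parameters held in a `BTreeMap`; the document's version takes a `&Value`, presumably JSON, so this simplified signature is hypothetical:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

/// Cache key for a tool call; BTreeMap orders parameters by name, so the
/// key does not depend on the order the caller supplied them in.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ToolCallKey {
    tool: String,
    params: BTreeMap<String, String>,
}

impl ToolCallKey {
    pub fn new(tool: &str, params: &[(&str, &str)]) -> Self {
        let params = params
            .iter()
            .map(|&(k, v)| (k.to_string(), v.to_string()))
            .collect();
        Self { tool: tool.to_string(), params }
    }

    /// Compact digest, e.g. for logging or a shard id.
    pub fn digest(&self) -> u64 {
        let mut h = DefaultHasher::new();
        self.hash(&mut h);
        h.finish()
    }
}
```

If `Value` is `serde_json::Value`, its object map is ordered by key by default, but enabling serde_json's `preserve_order` feature switches it to insertion order, in which case the key would need to sort explicitly.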

4. Semantic Query Cache

Cache based on query semantics, not exact match:

pub struct SemanticQueryCache {
/// Query embeddings index
query_index: HnswIndex,
/// Embedding -> result mapping
results: DashMap<VectorId, CacheEntry>,
/// Similarity threshold
threshold: f32,
}
impl SemanticQueryCache {
pub async fn get_semantic(&self, query: &str) -> Option<CacheEntry> {
// Embed the query
let embedding = self.embed_query(query).await;
// Search for similar cached queries
let similar = self.query_index.search(&embedding, 1);
if let Some(hit) = similar.first() {
if hit.similarity > self.threshold {
return self.results.get(&hit.id).map(|r| r.clone());
}
}
None
}
pub fn cache_semantic(&self, query: &str, result: CacheEntry) {
let embedding = self.embed_query_sync(query);
let id = self.query_index.insert(&embedding);
self.results.insert(id, result);
}
}
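`SemanticQueryCache` looks up the nearest cached query in an HNSW index and accepts it when `similarity > threshold`. The sketch below swaps HNSW for a brute-force scan to stay self-contained, and assumes cosine similarity (the document does not name the metric); `best_match` is a hypothetical helper:

```rust
/// Cosine similarity of two equal-length vectors; 0.0 if either is all zeros.
pub fn cosine(a: &[f32], b: &[f32]) -> f32 {
    assert_eq!(a.len(), b.len());
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let nb = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if na == 0.0 || nb == 0.0 { 0.0 } else { dot / (na * nb) }
}

/// Index and similarity of the most similar cached query embedding,
/// or None if even the best match does not exceed `threshold`.
pub fn best_match(cached: &[Vec<f32>], query: &[f32], threshold: f32) -> Option<(usize, f32)> {
    cached
        .iter()
        .enumerate()
        .map(|(i, e)| (i, cosine(e, query)))
        .max_by(|a, b| a.1.total_cmp(&b.1))
        .filter(|&(_, sim)| sim > threshold)
}
```

Brute force costs O(n) per lookup, which is why the document uses HNSW. Whatever the index, the threshold has to be strict: two SQL queries that differ only in a literal, such as `WHERE id = 5` and `WHERE id = 6`, can embed very close together yet return different rows.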

HeliosDB-Lite Integration

1. WAL-Driven Invalidation

Leverage HeliosDB-Lite’s WAL for cache coherency:

impl WalInvalidator {
pub async fn connect_to_helios_wal(&self, wal_endpoint: &str) -> Result<()> {
let mut stream = WalStreamer::connect(wal_endpoint).await?;
while let Some(entry) = stream.next().await {
// Extract table from HeliosDB-Lite key format
let table = match &entry.operation {
WalOperation::Put { key, .. } => extract_table_from_key(key),
WalOperation::Delete { key } => extract_table_from_key(key),
_ => continue,
};
// Invalidate cached queries for this table (invalidate_table is defined on
// WalInvalidator, not on HeliosDistribCache)
self.invalidate_table(&table);
}
Ok(())
}
}

2. Branch-Aware Caching

Separate caches per branch:

pub struct BranchAwareDistribCache {
/// Cache per branch
branch_caches: DashMap<BranchName, Arc<HeliosDistribCache>>,
/// Default cache (for main branch)
main_cache: Arc<HeliosDistribCache>,
}
impl BranchAwareDistribCache {
pub fn get(&self, branch: &str, key: &CacheKey) -> Option<CacheEntry> {
if branch == "main" {
return self.main_cache.get(key);
}
self.branch_caches.get(branch)
.and_then(|c| c.get(key))
}
/// Promote cache entries when branch merges
pub fn promote_on_merge(&self, source: &str, target: &str) {
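// Clone the Arc so the guard on `branch_caches` is released before the
// `entry()` call below; holding both deadlocks when they share a shard
let Some(source_cache) = self.branch_caches.get(source).map(|c| c.clone()) else {
return;
};
let target_cache = if target == "main" {
self.main_cache.clone()
} else {
self.branch_caches.entry(target.to_string())
.or_insert_with(|| Arc::new(HeliosDistribCache::new()))
.clone()
};
// Copy valid entries
source_cache.copy_valid_entries_to(&target_cache);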
}
}

3. Vector Result Caching

Cache HNSW search results:

pub struct VectorSearchCache {
/// Cache by (query vector hash, k, ef_search)
results: DashMap<VectorSearchKey, Vec<SearchResult>>,
/// Index version tracking (invalidate on rebuild)
index_versions: DashMap<IndexName, u64>,
}
impl VectorSearchCache {
pub fn get_ann_results(
&self,
query: &[f32],
k: usize,
ef_search: usize,
index: &str,
) -> Option<Vec<SearchResult>> {
let key = VectorSearchKey {
vector_hash: hash_vector(query),
k,
ef_search,
index_version: self.index_versions.get(index)?.clone(),
};
self.results.get(&key).map(|r| r.clone())
}
/// Invalidate on index rebuild
pub fn on_index_rebuild(&self, index: &str) {
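// Copy the new version out so the DashMap guard is dropped immediately
let new_version = *self.index_versions.entry(index.to_string())
.and_modify(|v| *v += 1)
.or_insert(1);
// Old entries will miss due to version mismatch; they still occupy memory
// until something purges them (e.g. a TTL or a sweep of stale versions)
info!("Index {} rebuilt, cache invalidated (version {})", index, new_version);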
}
}

4. Time-Travel Cache

Cache historical queries (immutable):

pub struct TimeTravelCache {
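/// Results for a past as_of timestamp are immutable, so entries never expire;
/// the map still needs a size bound (e.g. LRU) or it grows without limit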
historical: DashMap<(QueryFingerprint, Timestamp), CacheEntry>,
}
impl TimeTravelCache {
pub fn get_historical(
&self,
fingerprint: &QueryFingerprint,
as_of: Timestamp,
) -> Option<CacheEntry> {
self.historical.get(&(fingerprint.clone(), as_of))
.map(|e| e.clone())
}
pub fn cache_historical(
&self,
fingerprint: QueryFingerprint,
as_of: Timestamp,
result: CacheEntry,
) {
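// Historical data never changes, so no TTL is needed. This holds only when
// as_of is already in the past; an "as of now" result can still change.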
self.historical.insert((fingerprint, as_of), result);
}
}

Implementation Notes

File Locations

src/proxy/
├── distribcache/
│ ├── mod.rs # Public API
│ ├── config.rs # DistribCacheConfig
│ ├── classifier.rs # WorkloadClassifier
│ ├── tiers/
│ │ ├── mod.rs
│ │ ├── l1_hot.rs # HotCache (in-memory)
│ │ ├── l2_warm.rs # WarmCache (SSD)
│ │ └── l3_distributed.rs # DistributedCache (mesh)
│ ├── prefetcher.rs # PredictivePrefetcher
│ ├── invalidator.rs # WalInvalidator
│ ├── heatmap.rs # CacheHeatmap
│ ├── scheduler.rs # WorkloadScheduler
│ ├── ai/
│ │ ├── conversation.rs # ConversationContextCache
│ │ ├── rag.rs # RagChunkCache
│ │ ├── tools.rs # ToolResultCache
│ │ └── semantic.rs # SemanticQueryCache
│ └── metrics.rs # Prometheus metrics

Key Considerations

  1. Memory Management: Carefully track memory usage across tiers. Implement backpressure.

  2. Cache Coherency: WAL-based invalidation must handle lag. Consider versioning.

  3. Distributed Coordination: Use gossip protocol for peer discovery. Handle partitions.

  4. Serialization: Use efficient formats (bincode, postcard) for cache entries.

  5. Monitoring: Comprehensive metrics for hit ratios, latencies, evictions.

Performance Targets

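| Metric               | Target | Measurement           |
|----------------------|--------|-----------------------|
| L1 lookup            | <100μs | p99                   |
| L2 lookup            | <1ms   | p99                   |
| L3 lookup            | <10ms  | p99 (cross-network)   |
| Overall hit ratio    | >85%   | for cacheable queries |
| Prefetch hit rate    | >50%   | for predicted queries |
| Invalidation latency | <100ms | from WAL to cache     |
| Heatmap generation   | <1s    | full report           |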

Prometheus Metrics

# Cache tier metrics
heliosproxy_distribcache_hits_total{tier="l1"} 150000
heliosproxy_distribcache_hits_total{tier="l2"} 30000
heliosproxy_distribcache_hits_total{tier="l3"} 10000
heliosproxy_distribcache_misses_total 20000
# Latency histograms per tier
heliosproxy_distribcache_lookup_seconds_bucket{tier="l1",le="0.0001"} 140000
heliosproxy_distribcache_lookup_seconds_bucket{tier="l2",le="0.001"} 29000
# Workload distribution
heliosproxy_workload_queries_total{type="oltp"} 100000
heliosproxy_workload_queries_total{type="olap"} 10000
heliosproxy_workload_queries_total{type="vector"} 20000
# Prefetch metrics
heliosproxy_prefetch_hits_total 50000
heliosproxy_prefetch_misses_total 10000
# Heatmap data
heliosproxy_table_accesses_total{table="users"} 50000
heliosproxy_table_hit_ratio{table="users"} 0.95
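The sample counters are enough to derive the overall hit ratio: sum the per-tier hits and divide by hits plus misses. The sketch assumes each lookup increments exactly one series (a hit in the tier that served it, or a single miss after every tier missed); with the sample values that gives 190000 / 210000 ≈ 0.905. The function names are hypothetical:

```rust
/// Overall hit ratio from per-tier hit counters and the single miss counter.
pub fn overall_hit_ratio(tier_hits: &[u64], misses: u64) -> f64 {
    let hits: u64 = tier_hits.iter().sum();
    let total = hits + misses;
    if total == 0 { 0.0 } else { hits as f64 / total as f64 }
}

/// Fraction of all hits served by each tier, in the order given.
pub fn tier_shares(tier_hits: &[u64]) -> Vec<f64> {
    let hits: u64 = tier_hits.iter().sum();
    tier_hits
        .iter()
        .map(|&h| if hits == 0 { 0.0 } else { h as f64 / hits as f64 })
        .collect()
}
```

In PromQL the same ratio over a window would be `sum(rate(heliosproxy_distribcache_hits_total[5m])) / (sum(rate(heliosproxy_distribcache_hits_total[5m])) + sum(rate(heliosproxy_distribcache_misses_total[5m])))`.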

Timeline

Q1-Q2 2027: Helios-DistribCache Development
Month 1-2: Core Cache Infrastructure
├── L1 hot cache implementation
├── L2 warm cache with RocksDB
├── Workload classifier
└── Basic invalidation
Month 3-4: Distributed Layer
├── L3 distributed cache mesh
├── Consistent hashing
├── Peer discovery (gossip)
└── Replication
Month 5-6: Intelligence Layer
├── Predictive prefetcher
├── WAL-based invalidation
├── Heatmap analytics
└── Workload scheduler
Month 7-8: AI/Agent Optimizations
├── Conversation context cache
├── RAG chunk cache
├── Semantic query cache
└── Tool result cache
Month 9: Integration & Testing
├── HeliosDB-Lite integration
├── Performance testing
├── Documentation
└── Release