Feature 02: Query Caching
Priority: Critical | Complexity: High | Phase: 1 (Foundation)
Overview
Problem Statement
Databases repeatedly execute identical queries, especially in read-heavy applications:
- Dashboard queries refreshing every 30 seconds
- API endpoints serving the same content to many clients
- AI agents re-fetching context for each interaction
- RAG pipelines retrieving the same embeddings
Each query consumes:
- Network round-trip (1-10ms)
- Query parsing and planning (0.1-1ms)
- Execution and I/O (1-100ms+)
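These costs bound what a cache can save. Under a deliberately simple model (an assumption for illustration, not a measurement of this design), every query pays the cache lookup and only misses, with probability 1 - h for hit ratio h, pay the backend cost. The expected latency is then t_lookup + (1 - h) · t_backend. The function names below are hypothetical:

```rust
/// Expected per-query latency (ms) under a simple two-outcome model: every query
/// pays the cache lookup, and only misses (probability 1 - hit_ratio) pay the backend.
pub fn expected_latency_ms(hit_ratio: f64, lookup_ms: f64, backend_ms: f64) -> f64 {
    assert!((0.0..=1.0).contains(&hit_ratio), "hit_ratio must be in [0, 1]");
    lookup_ms + (1.0 - hit_ratio) * backend_ms
}

/// Speedup relative to sending every query straight to the backend.
pub fn speedup(hit_ratio: f64, lookup_ms: f64, backend_ms: f64) -> f64 {
    backend_ms / expected_latency_ms(hit_ratio, lookup_ms, backend_ms)
}
```

For example, an 80% hit ratio with a 0.01 ms lookup and a 20 ms backend cost gives about 4.01 ms per query, roughly a 5x speedup. Once lookups are far below a millisecond, the miss term dominates, so the hit ratio matters more than further lookup optimizations.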
Solution
Implement a multi-tier query cache at the proxy layer:
```
QUERY CACHE LAYER

Query
  │
  ▼
┌────────────────────────────────────────────────┐
│ L1: Hot Cache (in-memory, <1ms)                │
│   - Exact query match                          │
│   - LRU eviction                               │
└────────────────────────────────────────────────┘
  │ miss
  ▼
┌────────────────────────────────────────────────┐
│ L2: Warm Cache (shared memory, <5ms)           │
│   - Normalized query match                     │
│   - TTL-based expiration                       │
└────────────────────────────────────────────────┘
  │ miss
  ▼
┌────────────────────────────────────────────────┐
│ L3: Semantic Cache (vector similarity, <20ms)  │
│   - Embedding-based matching                   │
│   - AI query deduplication                     │
└────────────────────────────────────────────────┘
  │ miss
  ▼
BACKEND
```

Architecture
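The miss path in the tier diagram (try the exact-match tier, then the normalized tier, then the backend) can be illustrated with a hypothetical, single-threaded sketch of the first two tiers. `TieredCache`, `Served` and `get_or_fetch` are illustrative names. Eviction, TTLs and the L3 semantic tier are omitted to keep the flow visible:

```rust
use std::collections::HashMap;

/// Which tier answered a lookup.
#[derive(Debug, PartialEq)]
pub enum Served {
    L1,
    L2,
    Backend,
}

/// L1 is keyed by the exact query text, L2 by a normalized key.
pub struct TieredCache {
    l1: HashMap<String, String>,
    l2: HashMap<String, String>,
}

impl TieredCache {
    pub fn new() -> Self {
        Self { l1: HashMap::new(), l2: HashMap::new() }
    }

    /// Try L1, then L2, then the backend, filling the faster tiers on the way back.
    /// `normalize` must keep parameter values in its output (or fold them into the key),
    /// otherwise different queries would share one cached result.
    pub fn get_or_fetch(
        &mut self,
        query: &str,
        normalize: impl Fn(&str) -> String,
        backend: impl FnOnce(&str) -> String,
    ) -> (String, Served) {
        if let Some(hit) = self.l1.get(query) {
            return (hit.clone(), Served::L1);
        }
        let key = normalize(query);
        if let Some(hit) = self.l2.get(&key).cloned() {
            // Promote so the next identical query text is served by L1.
            self.l1.insert(query.to_string(), hit.clone());
            return (hit, Served::L2);
        }
        let result = backend(query);
        self.l2.insert(key, result.clone());
        self.l1.insert(query.to_string(), result.clone());
        (result, Served::Backend)
    }
}
```

With a normalizer that only collapses whitespace and case, `"SELECT 1"` goes to the backend first, repeats of the same text hit L1, and `"select   1"` hits L2.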
Cache Components
```rust
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytes::Bytes;
use dashmap::DashMap;

pub struct QueryCache {
    /// L1: Per-connection hot cache (exact match)
    l1_hot: DashMap<QueryHash, CachedResult>,

    /// L2: Shared normalized cache (parameterized)
    l2_warm: Arc<NormalizedCache>,

    /// L3: Semantic similarity cache (AI workloads)
    l3_semantic: Option<Arc<SemanticCache>>,

    /// Cache invalidation manager
    invalidator: Arc<InvalidationManager>,

    /// Configuration
    config: CacheConfig,
}

pub struct CacheConfig {
    /// Enable/disable cache
    pub enabled: bool,
    /// L1 hot cache size (entries)
    pub l1_size: usize,
    /// L2 warm cache size (MB)
    pub l2_size_mb: usize,
    /// L3 semantic cache (requires vector support)
    pub l3_enabled: bool,
    /// Default TTL for cached results
    pub default_ttl: Duration,
    /// Maximum result size to cache (bytes)
    pub max_result_size: usize,
    /// Tables to cache (whitelist)
    pub cached_tables: HashSet<String>,
    /// Tables to never cache (blacklist)
    pub excluded_tables: HashSet<String>,
}

#[derive(Clone)]
pub struct CachedResult {
    /// Serialized query result
    pub data: Bytes,
    /// Row count for statistics
    pub row_count: usize,
    /// Cached timestamp
    pub cached_at: Instant,
    /// Time-to-live
    pub ttl: Duration,
    /// Tables involved (for invalidation)
    pub tables: Vec<String>,
    /// Original query execution time
    pub execution_time: Duration,
}

impl CachedResult {
    /// True once the entry has outlived its TTL
    pub fn is_expired(&self) -> bool {
        self.cached_at.elapsed() >= self.ttl
    }
}
```

Query Normalization
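Normalization replaces literal values with positional placeholders so that queries differing only in their literals share one template. As a parser-free illustration, here is a hypothetical std-only sketch (`normalize` and `Normalized` are illustrative names, not the `QueryNormalizer` API). It works at the character level, so it does not cover every SQL construct (comments, dollar-quoted strings and casts are not handled):

```rust
/// A query split into a literal-free template plus the literals it contained.
#[derive(Debug, PartialEq)]
pub struct Normalized {
    pub template: String,
    /// Literals in order of appearance, as written (string literals keep their quotes).
    pub params: Vec<String>,
}

/// Replace string and numeric literals with `$1`, `$2`, ... and collapse whitespace.
pub fn normalize(query: &str) -> Normalized {
    let chars: Vec<char> = query.trim().chars().collect();
    let mut template = String::new();
    let mut params: Vec<String> = Vec::new();
    let mut i = 0;
    while i < chars.len() {
        let c = chars[i];
        let start = i;
        if c == '\'' {
            // String literal; a doubled quote ('') is an escaped quote, not the end.
            i += 1;
            while i < chars.len() {
                if chars[i] == '\'' && chars.get(i + 1) == Some(&'\'') {
                    i += 2;
                } else if chars[i] == '\'' {
                    i += 1;
                    break;
                } else {
                    i += 1;
                }
            }
        } else if c.is_ascii_digit() {
            // Numeric literal (integers and simple decimals).
            while i < chars.len() && (chars[i].is_ascii_digit() || chars[i] == '.') {
                i += 1;
            }
        } else {
            if c.is_alphanumeric() || c == '_' || c == '$' {
                // Identifiers, keywords and existing placeholders are copied whole,
                // so digits inside them (`t1`, `$1`) are not mistaken for literals.
                while i < chars.len()
                    && (chars[i].is_alphanumeric() || chars[i] == '_' || chars[i] == '$')
                {
                    i += 1;
                }
                template.extend(&chars[start..i]);
            } else if c.is_whitespace() {
                while i < chars.len() && chars[i].is_whitespace() {
                    i += 1;
                }
                template.push(' ');
            } else {
                template.push(c);
                i += 1;
            }
            continue;
        }
        // Only literal branches reach here: record the literal and emit a placeholder.
        params.push(chars[start..i].iter().collect());
        template.push_str(&format!("${}", params.len()));
    }
    Normalized { template, params }
}
```

String literals keep their quotes in `params`, so `id = 123` and `id = '123'` share a template but still produce different cache keys.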
```rust
/// Normalize queries for cache key generation
pub struct QueryNormalizer {
    parser: SqlParser,
}

impl QueryNormalizer {
    /// Normalize query to canonical form
    pub fn normalize(&self, query: &str) -> NormalizedQuery {
        let ast = self.parser.parse(query);

        // Extract parameters
        let params = self.extract_parameters(&ast);

        // Generate normalized template
        let template = self.generate_template(&ast);

        NormalizedQuery {
            template_hash: hash(&template),
            template,
            parameters: params,
            tables: self.extract_tables(&ast),
        }
    }
}

// Example:
// Input:  "SELECT * FROM users WHERE id = 123 AND status = 'active'"
// Output: NormalizedQuery {
//     template_hash: 0xABC123,
//     template: "SELECT * FROM users WHERE id = $1 AND status = $2",
//     parameters: [123, "active"],
//     tables: ["users"]
// }
```

Cache Invalidation
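Invalidation hinges on an index from each table to the cached queries that read it. The single-threaded sketch below illustrates that bookkeeping with std collections. `TableIndexedCache` and its methods are hypothetical names; concurrency (the design's `DashMap`) and WAL integration are left out:

```rust
use std::collections::{HashMap, HashSet};

type QueryHash = u64;

#[derive(Default)]
pub struct TableIndexedCache {
    /// Query hash -> (tables the query reads, cached result)
    entries: HashMap<QueryHash, (Vec<String>, String)>,
    /// Table -> hashes of cached queries that read it
    table_index: HashMap<String, HashSet<QueryHash>>,
}

impl TableIndexedCache {
    pub fn insert(&mut self, hash: QueryHash, tables: &[&str], result: &str) {
        for table in tables {
            self.table_index.entry(table.to_string()).or_default().insert(hash);
        }
        let tables: Vec<String> = tables.iter().map(|t| t.to_string()).collect();
        self.entries.insert(hash, (tables, result.to_string()));
    }

    pub fn get(&self, hash: QueryHash) -> Option<&str> {
        self.entries.get(&hash).map(|(_, result)| result.as_str())
    }

    /// Evict every cached query that reads `table`; returns how many were removed.
    pub fn invalidate_table(&mut self, table: &str) -> usize {
        let Some(hashes) = self.table_index.remove(table) else {
            return 0;
        };
        let mut removed = 0;
        for hash in hashes {
            if let Some((tables, _)) = self.entries.remove(&hash) {
                removed += 1;
                // Also drop the hash from the other tables' sets so they hold no stale references.
                for other in tables.iter().filter(|t| t.as_str() != table) {
                    if let Some(set) = self.table_index.get_mut(other) {
                        set.remove(&hash);
                    }
                }
            }
        }
        removed
    }
}
```

When a query reads several tables, evicting it through one table also removes its hash from the other tables' sets. Without that step, the index keeps growing with references to entries that no longer exist.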
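```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

use dashmap::DashMap;
use futures::StreamExt; // assumes WalListener implements futures::Stream + Unpin

/// Table-based cache invalidation
pub struct InvalidationManager {
    /// Table -> cached query hashes
    table_index: DashMap<String, HashSet<QueryHash>>,

    /// Shared handle to the cache entries being invalidated
    cache: Arc<DashMap<QueryHash, CachedResult>>,

    /// WAL listener for write detection (taken once by `watch_wal`)
    wal_listener: Mutex<Option<WalListener>>,
}

impl InvalidationManager {
    /// Invalidate all queries touching a table
    pub fn invalidate_table(&self, table: &str) {
        // Remove the index entry too, so invalidated hashes do not accumulate
        if let Some((_, queries)) = self.table_index.remove(table) {
            for query_hash in &queries {
                self.cache.remove(query_hash);
            }
        }
    }

    /// Listen to WAL for automatic invalidation
    pub async fn watch_wal(&self) {
        // Take ownership of the listener so it can be polled mutably while the
        // manager itself stays shared behind an `Arc`
        let Some(mut listener) = self.wal_listener.lock().unwrap().take() else {
            return;
        };

        while let Some(entry) = listener.next().await {
            match entry.operation {
                WalOp::Insert { table, .. }
                | WalOp::Update { table, .. }
                | WalOp::Delete { table, .. } => {
                    self.invalidate_table(&table);
                }
                _ => {}
            }
        }
    }
}
```

API Specification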
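Several settings in this section express durations as strings such as `"30s"`, `"5m"`, `"1h"` and `"100ms"`. A minimal sketch of converting them to `std::time::Duration` follows. The `parse_ttl` name is hypothetical, and accepting only the `ms`, `s`, `m` and `h` suffixes is an assumption, since the configuration format does not list them:

```rust
use std::time::Duration;

/// Parse a duration string such as "100ms", "30s", "5m" or "1h".
pub fn parse_ttl(input: &str) -> Result<Duration, String> {
    let s = input.trim();
    // Split at the first non-digit: "30s" -> ("30", "s").
    let split = s
        .find(|c: char| !c.is_ascii_digit())
        .ok_or_else(|| format!("missing unit in {s:?}"))?;
    let (number, unit) = s.split_at(split);
    let n: u64 = number
        .parse()
        .map_err(|_| format!("invalid number in {s:?}"))?;
    let seconds_per_unit = match unit {
        "ms" => return Ok(Duration::from_millis(n)),
        "s" => 1,
        "m" => 60,
        "h" => 3600,
        _ => return Err(format!("unknown unit {unit:?} in {s:?}")),
    };
    // Checked multiplication so absurd values are rejected instead of overflowing.
    n.checked_mul(seconds_per_unit)
        .map(Duration::from_secs)
        .ok_or_else(|| format!("duration {s:?} is too large"))
}
```

A bare number such as `"60"` is rejected here. Accepting it as seconds would be an equally reasonable choice.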
Configuration (heliosproxy.toml)
```toml
[cache]
enabled = true

# L1: Hot cache (per-connection, in-memory)
[cache.l1]
enabled = true
size = 1000                   # Entries per connection
ttl = "30s"

# L2: Warm cache (shared, larger)
[cache.l2]
enabled = true
size_mb = 512                 # Total cache size
ttl = "5m"
normalize_queries = true      # Enable parameterized caching

# L3: Semantic cache (AI workloads)
[cache.l3]
enabled = true
similarity_threshold = 0.95   # Cosine similarity
embedding_model = "all-MiniLM-L6-v2"
max_entries = 10000

# Invalidation
[cache.invalidation]
mode = "wal"                  # "wal", "ttl", "manual"
wal_lag_tolerance = "100ms"

# Table-specific settings
[cache.tables.users]
ttl = "1m"                    # Shorter TTL for users table
exclude_columns = ["password_hash"]

[cache.tables.analytics]
ttl = "1h"                    # Longer TTL for analytics
```

SQL Hints
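The hints are ordinary SQL block comments placed before the statement. A minimal sketch of recognizing them follows, assuming hints appear only as leading `/*helios:...*/` comments and that `cache_ttl` is given in seconds. Neither point is specified here, and `CacheHints` and `parse_hints` are hypothetical names:

```rust
/// Cache directives carried in leading `/*helios:...*/` comments.
#[derive(Debug, Default, PartialEq)]
pub struct CacheHints {
    pub skip: bool,
    pub refresh: bool,
    /// Assumed to be in seconds.
    pub ttl_secs: Option<u64>,
    pub semantic: bool,
}

/// Strip leading helios hint comments; returns the hints and the remaining SQL.
/// Unknown directives are ignored; a malformed `cache_ttl` value leaves `ttl_secs` unset.
pub fn parse_hints(sql: &str) -> (CacheHints, &str) {
    let mut hints = CacheHints::default();
    let mut rest = sql.trim_start();
    while let Some(body) = rest.strip_prefix("/*helios:") {
        let Some(end) = body.find("*/") else {
            break; // unterminated comment: leave the rest of the query untouched
        };
        match body[..end].trim() {
            "cache=skip" => hints.skip = true,
            "cache=refresh" => hints.refresh = true,
            "semantic_cache" => hints.semantic = true,
            other => {
                if let Some(value) = other.strip_prefix("cache_ttl=") {
                    hints.ttl_secs = value.parse().ok();
                }
            }
        }
        rest = body[end + 2..].trim_start();
    }
    (hints, rest)
}
```

Returning the statement without its hint comments is one option. Whether the proxy forwards hints to the backend or strips them is a design choice this sketch does not settle.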
```sql
-- Force cache bypass
/*helios:cache=skip*/ SELECT * FROM users WHERE id = 1;

-- Set custom TTL
/*helios:cache_ttl=60*/ SELECT * FROM reports;

-- Force cache refresh
/*helios:cache=refresh*/ SELECT * FROM dashboard_stats;

-- Semantic cache hint for AI queries
/*helios:semantic_cache*/
SELECT content FROM documents
WHERE embedding <-> $1 < 0.5
LIMIT 10;
```

Admin API
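The `hit_ratio` fields in the stats response below are consistent with hits / (hits + misses), reported to three decimals by truncation: for L1, 15234 / 18655 ≈ 0.8166 appears as 0.816. A minimal sketch of counters that many connections could update concurrently to back such an endpoint (`TierStats` is a hypothetical name):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Per-tier hit/miss counters, updated without locks.
#[derive(Default)]
pub struct TierStats {
    hits: AtomicU64,
    misses: AtomicU64,
}

impl TierStats {
    pub fn record_hit(&self) {
        // Relaxed is enough for statistics: no other memory is synchronized through these.
        self.hits.fetch_add(1, Ordering::Relaxed);
    }

    pub fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
    }

    /// hits / (hits + misses); 0.0 before the first lookup.
    /// The two loads are not an atomic snapshot, which is acceptable for a stats endpoint.
    pub fn hit_ratio(&self) -> f64 {
        let hits = self.hits.load(Ordering::Relaxed) as f64;
        let misses = self.misses.load(Ordering::Relaxed) as f64;
        let total = hits + misses;
        if total == 0.0 { 0.0 } else { hits / total }
    }
}
```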
```
GET /cache/stats

{
  "l1": {
    "hits": 15234,
    "misses": 3421,
    "hit_ratio": 0.816,
    "entries": 892,
    "memory_mb": 45
  },
  "l2": {
    "hits": 8765,
    "misses": 2156,
    "hit_ratio": 0.802,
    "entries": 12453,
    "memory_mb": 234
  },
  "l3_semantic": {
    "hits": 1234,
    "misses": 567,
    "hit_ratio": 0.685,
    "entries": 5678
  },
  "invalidations": {
    "total": 456,
    "by_wal": 389,
    "by_ttl": 67
  },
  "savings": {
    "queries_avoided": 25233,
    "estimated_time_saved_ms": 126165
  }
}

# Manually invalidate cache
POST /cache/invalidate
{ "tables": ["users", "orders"] }

# Clear the listed cache levels
POST /cache/clear
{ "levels": ["l1", "l2"] }

# Inspect cached entries
GET /cache/entries?table=users&limit=100
```

AI/Agent Innovations
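Several techniques in this section compare embeddings. The L3 configuration describes `similarity_threshold = 0.95` as a cosine similarity. A minimal sketch of that comparison over `f32` slices follows (`cosine_similarity` and `is_semantic_hit` are hypothetical names; the strict `>` mirrors the comparison in the `SemanticCache` code below):

```rust
/// Cosine similarity of two equal-length vectors.
/// Returns None if the lengths differ, the vectors are empty, or either is all zeros.
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> Option<f32> {
    if a.len() != b.len() || a.is_empty() {
        return None;
    }
    let (mut dot, mut norm_a, mut norm_b) = (0.0f32, 0.0f32, 0.0f32);
    for (x, y) in a.iter().zip(b) {
        dot += x * y;
        norm_a += x * x;
        norm_b += y * y;
    }
    if norm_a == 0.0 || norm_b == 0.0 {
        return None;
    }
    Some(dot / (norm_a.sqrt() * norm_b.sqrt()))
}

/// Semantic-cache hit test against a configured threshold (e.g. 0.95).
pub fn is_semantic_hit(query: &[f32], cached: &[f32], threshold: f32) -> bool {
    cosine_similarity(query, cached).map_or(false, |s| s > threshold)
}
```

Two SQL queries that differ only in a literal (for example, the category name) can embed very closely. The threshold therefore trades hit rate against the risk of returning another query's result.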
1. Semantic Query Deduplication
AI agents often ask similar questions with different wording:
```
-- These should share cache:
"SELECT * FROM products WHERE category = 'electronics'"
"Find all products in the electronics category"
"Get electronics products"
```

```rust
use std::future::Future;

use dashmap::DashMap;

pub struct SemanticCache {
    /// Vector index for query embeddings
    query_index: HnswIndex,

    /// Query embedding model
    embedder: Box<dyn Embedder>,

    /// Cached results by vector ID
    results: DashMap<VectorId, QueryResult>,

    /// Minimum similarity for a hit (`similarity_threshold` in the config)
    threshold: f32,
}

impl SemanticCache {
    pub async fn get_or_compute(
        &self,
        query: &str,
        compute: impl Future<Output = QueryResult>,
    ) -> QueryResult {
        // Embed the query
        let embedding = self.embedder.embed(query).await;

        // Search for the most similar cached query
        let similar = self.query_index.search(&embedding, 1);

        if let Some(hit) = similar.first() {
            if hit.similarity > self.threshold {
                // Cache hit: return the cached result
                if let Some(result) = self.results.get(&hit.id) {
                    return result.clone();
                }
            }
        }

        // Cache miss: compute and store
        let result = compute.await;
        let id = self.query_index.insert(&embedding);
        self.results.insert(id, result.clone());
        result
    }
}
```

2. RAG Context Caching
Cache embedding search results for repeated retrievals:
```rust
pub struct RagCache {
    /// Cache embedding search results, keyed by (embedding hash, k)
    embedding_results: DashMap<(EmbeddingHash, usize), Vec<ChunkId>>,

    /// Cache retrieved chunks
    chunk_cache: DashMap<ChunkId, String>,
}

impl RagCache {
    pub async fn retrieve_context(&self, query_embedding: &[f32], k: usize) -> Vec<String> {
        // Include `k` in the key: a cached top-5 result must not answer a top-10 request
        let key = (hash_embedding(query_embedding), k);

        // Check if we've seen this embedding (with this k) before
        if let Some(chunk_ids) = self.embedding_results.get(&key) {
            // Return cached chunks (chunks evicted from chunk_cache are skipped)
            return chunk_ids
                .iter()
                .filter_map(|id| self.chunk_cache.get(id))
                .map(|c| c.clone())
                .collect();
        }

        // Execute ANN search
        let chunks = self.execute_ann_search(query_embedding, k).await;

        // Cache results
        let chunk_ids: Vec<_> = chunks.iter().map(|c| c.id).collect();
        self.embedding_results.insert(key, chunk_ids);
        for chunk in &chunks {
            self.chunk_cache.insert(chunk.id, chunk.content.clone());
        }

        chunks.into_iter().map(|c| c.content).collect()
    }
}
```

3. Conversation Memory Cache
Cache conversation context retrievals:
```rust
use std::collections::VecDeque;

use dashmap::DashMap;

/// Session identifier (assumed here to be a plain string)
type SessionId = String;

pub struct ConversationCache {
    /// Recent conversation turns per session
    turns: DashMap<SessionId, VecDeque<Turn>>,

    /// Sliding window size
    window_size: usize,
}

impl ConversationCache {
    pub fn cache_turn(&self, session: &str, turn: Turn) {
        let mut turns = self
            .turns
            .entry(session.to_string())
            .or_insert_with(VecDeque::new);

        turns.push_back(turn);

        // Maintain sliding window
        while turns.len() > self.window_size {
            turns.pop_front();
        }
    }

    pub fn get_context(&self, session: &str) -> Vec<Turn> {
        self.turns
            .get(session)
            .map(|t| t.iter().cloned().collect())
            .unwrap_or_default()
    }
}
```

4. Tool Result Caching
Cache results from deterministic tool calls:
```toml
[cache.tools]
# Cache tool call results
enabled = true

[cache.tools.database_query]
ttl = "5m"
deterministic = true

[cache.tools.web_search]
ttl = "1h"
deterministic = false  # Results may change
```

HeliosDB-Lite Integration
1. WAL-Based Invalidation
Leverage HeliosDB-Lite’s WAL streaming for real-time invalidation:
```rust
impl InvalidationManager {
    pub async fn connect_to_wal_stream(&self, wal: &WalStreamer) {
        let mut stream = wal.subscribe().await;

        while let Some(entry) = stream.next().await {
            // Invalidate based on WAL operations
            match entry.operation {
                WalOperation::Put { key, .. } => {
                    let table = extract_table_from_key(&key);
                    self.invalidate_table(&table);
                }
                WalOperation::Delete { key } => {
                    let table = extract_table_from_key(&key);
                    self.invalidate_table(&table);
                }
                _ => {}
            }
        }
    }
}
```

2. Branch-Aware Caching
Separate caches per branch:
```rust
pub struct BranchAwareCache {
    /// Cache per branch
    caches: DashMap<BranchName, QueryCache>,
}

impl BranchAwareCache {
    pub fn get(&self, branch: &str, query: &NormalizedQuery) -> Option<CachedResult> {
        self.caches.get(branch)?.get(query)
    }

    pub fn invalidate_branch(&self, branch: &str) {
        if let Some(cache) = self.caches.get(branch) {
            cache.clear();
        }
    }

    /// Promote cache entries when branch merges
    pub fn promote_on_merge(&self, source: &str, target: &str) {
        // Snapshot still-valid entries and release the read guard first:
        // calling `entry()` while holding a `Ref` into the same DashMap
        // can deadlock when both keys land in the same shard
        let valid: Vec<_> = match self.caches.get(source) {
            Some(source_cache) => source_cache
                .iter()
                .filter(|(_, value)| !value.is_expired())
                .map(|(key, value)| (key.clone(), value.clone()))
                .collect(),
            None => return,
        };

        let target_cache = self
            .caches
            .entry(target.to_string())
            .or_insert_with(QueryCache::new);

        // Copy still-valid entries
        for (key, value) in valid {
            target_cache.insert(key, value);
        }
    }
}
```

3. Vector Search Result Caching
Cache ANN query results:
```rust
pub struct VectorSearchCache {
    /// Cache by query vector hash + search params
    results: DashMap<VectorQueryKey, Vec<SearchResult>>,
}

impl VectorSearchCache {
    pub fn cache_ann_results(
        &self,
        query_vector: &[f32],
        k: usize,
        ef_search: usize,
        results: Vec<SearchResult>,
    ) {
        let key = VectorQueryKey {
            vector_hash: hash_vector(query_vector),
            k,
            ef_search,
        };

        self.results.insert(key, results);
    }

    pub fn get_ann_results(
        &self,
        query_vector: &[f32],
        k: usize,
        ef_search: usize,
    ) -> Option<Vec<SearchResult>> {
        let key = VectorQueryKey {
            vector_hash: hash_vector(query_vector),
            k,
            ef_search,
        };

        self.results.get(&key).map(|r| r.clone())
    }
}
```

4. Time-Travel Query Caching
Cache historical queries by timestamp:
```rust
pub struct TimeTravelCache {
    /// Cache by (query, as_of_timestamp)
    historical: DashMap<(QueryHash, Timestamp), CachedResult>,
}

impl TimeTravelCache {
    pub fn get_historical(
        &self,
        query: &NormalizedQuery,
        as_of: Timestamp,
    ) -> Option<CachedResult> {
        // Results as of a timestamp that is already in the past cannot change,
        // so these entries need no TTL
        self.historical.get(&(query.hash(), as_of)).map(|r| r.clone())
    }
}
```

Implementation Notes
File Locations
```
src/proxy/
├── cache/
│   ├── mod.rs            # Public API
│   ├── query_cache.rs    # Main QueryCache implementation
│   ├── l1_hot.rs         # L1 hot cache
│   ├── l2_warm.rs        # L2 warm cache
│   ├── l3_semantic.rs    # L3 semantic cache
│   ├── normalizer.rs     # Query normalization
│   ├── invalidation.rs   # Cache invalidation
│   └── metrics.rs        # Cache metrics
```

Key Considerations
- Memory Pressure: Implement eviction policies (LRU, LFU) and memory limits. Run eviction on a background thread.
- Serialization: Use an efficient serialization format (bincode, postcard) for cached results, and consider compression for large results.
- Cache Stampede: Implement request coalescing, so that when multiple clients request the same query during a cache miss, only one request executes against the backend and the others wait for its result.
- Prepared Statements: Cache prepared statement IDs separately from results.
- Transactions: Don’t cache queries inside transactions; a cached result may not reflect the transaction's own uncommitted writes or its snapshot, which risks violating isolation.
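The Cache Stampede item can be sketched with std primitives. In this hypothetical `Coalescer` (not part of the design above), concurrent callers that miss on the same key share one `OnceLock`. `OnceLock::get_or_init` runs exactly one initializer and blocks the other callers until it finishes:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

/// Request coalescing ("single flight"): concurrent misses on one key share a computation.
pub struct Coalescer<V> {
    inflight: Mutex<HashMap<String, Arc<OnceLock<V>>>>,
}

impl<V: Clone> Coalescer<V> {
    pub fn new() -> Self {
        Self { inflight: Mutex::new(HashMap::new()) }
    }

    pub fn get_or_compute(&self, key: &str, compute: impl FnOnce() -> V) -> V {
        // Join an in-flight computation for `key`, or register a new one.
        // The map lock is released before computing, so other keys are not blocked.
        let cell = {
            let mut map = self.inflight.lock().unwrap();
            map.entry(key.to_string())
                .or_insert_with(|| Arc::new(OnceLock::new()))
                .clone()
        };
        // Only one caller's `compute` runs; the others wait here for its value.
        let value = cell.get_or_init(compute).clone();
        // Retire the entry, but only if it is still ours (a later miss may have replaced it).
        let mut map = self.inflight.lock().unwrap();
        if map.get(key).is_some_and(|current| Arc::ptr_eq(current, &cell)) {
            map.remove(key);
        }
        value
    }
}
```

In a proxy, `compute` would run the backend query and write the result into the cache, so callers arriving after the entry is retired hit the cache instead of starting a new computation.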
Cache Key Design
```rust
/// Cache key for L2 normalized cache
#[derive(Hash, Eq, PartialEq)]
pub struct CacheKey {
    /// Hash of normalized query template
    template_hash: u64,

    /// Hash of parameter values
    params_hash: u64,

    /// Database name
    database: String,

    /// User (for RLS)
    user: Option<String>,

    /// Branch (for HeliosDB-Lite)
    branch: Option<String>,
}
```

Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| L1 cache lookup | <10μs | p99 |
| L2 cache lookup | <100μs | p99 |
| L3 semantic lookup | <5ms | p99 |
| Cache hit ratio | >80% | for read-heavy workloads |
| Memory efficiency | <2x result size | overhead |
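The lookup targets are stated as p99 latencies. One simple way to check a benchmark run against them is the nearest-rank percentile over recorded lookup durations. The `percentile` helper below is a hypothetical sketch, and other percentile definitions (interpolating ones, for example) give slightly different values:

```rust
use std::time::Duration;

/// Nearest-rank percentile: the smallest sample such that at least `p` percent of
/// samples are less than or equal to it. `p` must be in (0, 100]; sorts `samples` in place.
pub fn percentile(samples: &mut [Duration], p: f64) -> Option<Duration> {
    if samples.is_empty() || !(p > 0.0 && p <= 100.0) {
        return None;
    }
    samples.sort_unstable();
    // 1-based rank = ceil(p / 100 * n), computed as p * n / 100 to limit rounding error.
    let rank = (p * samples.len() as f64 / 100.0).ceil() as usize;
    Some(samples[rank.clamp(1, samples.len()) - 1])
}
```

For 100 samples of 1 µs to 100 µs, the p99 is 99 µs, which would miss the L1 target of <10μs but meet the L2 target of <100μs.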
Related Features
- Connection Pooling - Reduce connection overhead
- Query Analytics - Identify cacheable queries
- Helios-DistribCache - Distributed cache layer