Feature 13: Schema-Aware Routing
Priority: Medium | Complexity: High | Phase: 4 (Differentiation)
Overview
Problem Statement
Generic query routing ignores schema semantics:
- All tables treated equally (but some are hot, some cold)
- No awareness of data locality
- Can’t optimize for specific access patterns
- Sharding decisions made blindly
Without schema awareness:
- Hot tables compete with cold tables
- Analytics queries interfere with OLTP
- Vector searches not routed to vector-optimized nodes
- Joins across shards are inefficient
Solution
Implement schema-aware routing that understands data semantics:
```
                  ┌─────────────────────────────────────────────────┐
                  │               SCHEMA-AWARE ROUTER               │
                  │                                                 │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │  Schema Registry                         │   │
                  │  │  ┌────────────────────────────────────┐  │   │
                  │  │  │ Tables:                            │  │   │
                  │  │  │   users: HOT, OLTP, shard_by=id    │  │   │
                  │  │  │   events: COLD, OLAP, partition=d  │  │   │
                  │  │  │   embeddings: VECTOR, gpu_nodes    │  │   │
                  │  │  └────────────────────────────────────┘  │   │
                  │  └──────────────────────────────────────────┘   │
                  │                                                 │
Query ───────────►│  ┌──────────────────────────────────────────┐   │
                  │  │  Query Analyzer                          │   │
                  │  │  - Extract tables                        │   │
                  │  │  - Detect access pattern                 │   │
                  │  │  - Identify sharding keys                │   │
                  │  └──────────────────────────────────────────┘   │
                  │                       │                         │
                  │                       ▼                         │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │  Routing Decision                        │   │
                  │  │  - Select optimal node/shard             │   │
                  │  │  - Parallelize if needed                 │   │
                  │  │  - Aggregate results                     │   │
                  │  └──────────────────────────────────────────┘   │
                  └─────────────────────────────────────────────────┘
```

Architecture
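As a rough orientation, the three stages in the diagram (registry lookup, query analysis, routing decision) can be sketched end to end in a few dozen lines. This is a minimal sketch under stated assumptions, not the HeliosProxy implementation: `Registry`, `TableClass`, `Workload`, `extract_tables`, and `route` are hypothetical names, table extraction is a naive token scan rather than a SQL parser, and the node names are borrowed from the configuration example later in this document.

```rust
use std::collections::HashMap;

/// Simplified workload classes (hypothetical stand-in for `WorkloadType`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Workload {
    Oltp,
    Olap,
    Vector,
}

/// Per-table classification held by the registry (hypothetical).
struct TableClass {
    workload: Workload,
    preferred_node: &'static str,
}

/// Hypothetical in-memory registry keyed by table name.
struct Registry {
    tables: HashMap<&'static str, TableClass>,
}

impl Registry {
    /// Three sample tables matching the diagram.
    fn sample() -> Self {
        let mut tables = HashMap::new();
        tables.insert("users", TableClass { workload: Workload::Oltp, preferred_node: "primary" });
        tables.insert("events", TableClass { workload: Workload::Olap, preferred_node: "standby-analytics" });
        tables.insert("embeddings", TableClass { workload: Workload::Vector, preferred_node: "standby-vector-1" });
        Registry { tables }
    }
}

/// Analysis stage: take the token that follows FROM or JOIN.
/// Assumption: good enough for illustration, not for real SQL.
fn extract_tables(query: &str) -> Vec<String> {
    let tokens: Vec<&str> = query.split_whitespace().collect();
    tokens
        .windows(2)
        .filter(|w| w[0].eq_ignore_ascii_case("from") || w[0].eq_ignore_ascii_case("join"))
        .map(|w| w[1].trim_matches(|c: char| !c.is_alphanumeric() && c != '_').to_lowercase())
        .collect()
}

/// Routing stage: use the first registered table's preferred node.
/// Falls back to "primary" when no referenced table is registered.
fn route(registry: &Registry, query: &str) -> (&'static str, Option<Workload>) {
    for table in extract_tables(query) {
        if let Some(class) = registry.tables.get(table.as_str()) {
            return (class.preferred_node, Some(class.workload));
        }
    }
    ("primary", None)
}
```

With the sample registry, a query against `events` resolves to `standby-analytics`, while a query that names no registered table falls back to `primary`. Routing on the first registered table only is a simplification; the sections that follow also weigh capabilities, shard keys, and load.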
Schema Registry
```rust
pub struct SchemaRegistry {
    /// Table metadata
    tables: DashMap<TableName, TableSchema>,

    /// Index metadata
    indexes: DashMap<IndexName, IndexSchema>,

    /// Relationship graph
    relationships: Graph<TableName, Relationship>,

    /// Sharding configuration
    sharding: ShardingConfig,

    /// Node capabilities
    node_capabilities: DashMap<NodeId, NodeCapabilities>,
}

#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name
    pub name: String,

    /// Columns
    pub columns: Vec<ColumnSchema>,

    /// Access pattern classification
    pub access_pattern: AccessPattern,

    /// Temperature (HOT/WARM/COLD)
    pub temperature: DataTemperature,

    /// Workload type
    pub workload: WorkloadType,

    /// Sharding key (if sharded)
    pub shard_key: Option<String>,

    /// Partition key (if partitioned)
    pub partition_key: Option<PartitionKey>,

    /// Preferred nodes
    pub preferred_nodes: Vec<NodeId>,

    /// Estimated row count
    pub estimated_rows: u64,

    /// Average row size
    pub avg_row_size: usize,
}
```
```rust
// `PartialEq`/`Eq` are derived because the router compares these values
// with `==` and `contains`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessPattern {
    /// Point lookups by primary key
    PointLookup,

    /// Range scans
    RangeScan,

    /// Full table scans (OLAP)
    FullScan,

    /// Vector similarity search
    VectorSearch,

    /// Time-series append
    TimeSeriesAppend,

    /// Mixed
    Mixed,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataTemperature {
    /// Frequently accessed, keep in memory
    Hot,

    /// Occasionally accessed
    Warm,

    /// Rarely accessed, can be on slower storage
    Cold,

    /// Archive, acceptable to be slow
    Frozen,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkloadType {
    /// Online Transaction Processing
    OLTP,

    /// Online Analytical Processing
    OLAP,

    /// Hybrid Transactional/Analytical
    HTAP,

    /// Vector/AI workloads
    Vector,

    /// Mixed
    Mixed,
}
```

Query Analyzer
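The analyzer defined in this section checks access patterns in a fixed priority order: equality on the primary key first, then range predicates, then vector operators, with a full scan as the default. The following is a minimal sketch of that ordering over an already-parsed predicate list. The `Predicate` type and `detect_pattern` function are hypothetical and exist only for illustration; the real analyzer works on a SQL AST.

```rust
/// Subset of the access patterns used by the registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AccessPattern {
    PointLookup,
    RangeScan,
    VectorSearch,
    FullScan,
}

/// Hypothetical, already-parsed predicate; the payload is the column name.
#[derive(Debug, Clone, Copy)]
enum Predicate<'a> {
    /// `column = value`
    Equals(&'a str),
    /// `column < value`, `BETWEEN`, and similar
    Range(&'a str),
    /// A distance operator such as `<->` applied to a vector column
    VectorDistance(&'a str),
}

/// Classify one table's access pattern using the same priority order
/// as the analyzer: PK equality, then range, then vector, else full scan.
fn detect_pattern(primary_key: &str, predicates: &[Predicate]) -> AccessPattern {
    let pk_equality = predicates
        .iter()
        .any(|p| matches!(p, Predicate::Equals(c) if *c == primary_key));
    let has_range = predicates.iter().any(|p| matches!(p, Predicate::Range(_)));
    let has_vector = predicates
        .iter()
        .any(|p| matches!(p, Predicate::VectorDistance(_)));

    if pk_equality {
        AccessPattern::PointLookup
    } else if has_range {
        AccessPattern::RangeScan
    } else if has_vector {
        AccessPattern::VectorSearch
    } else {
        AccessPattern::FullScan
    }
}
```

One consequence of this ordering is that a vector search combined with a range filter (for example a date cutoff) is classified as `RangeScan`, not `VectorSearch`. Whether that is the intended behavior is a design decision worth making explicitly.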
```rust
pub struct QueryAnalyzer {
    parser: SqlParser,
    schema: Arc<SchemaRegistry>,
}

impl QueryAnalyzer {
    /// Analyze query and determine routing requirements.
    /// Returns an error if the query cannot be parsed.
    pub fn analyze(&self, query: &str) -> Result<QueryAnalysis> {
        let ast = self.parser.parse(query)?;

        let tables = self.extract_tables(&ast);
        let access_patterns = self.detect_access_patterns(&ast, &tables);
        let shard_keys = self.extract_shard_keys(&ast, &tables);
        let workload_type = self.classify_workload(&ast, &tables);

        Ok(QueryAnalysis {
            tables,
            access_patterns,
            shard_keys,
            workload_type,
            complexity: self.estimate_complexity(&ast),
            selectivity: self.estimate_selectivity(&ast),
        })
    }

    fn detect_access_patterns(&self, ast: &Ast, tables: &[TableRef]) -> Vec<AccessPattern> {
        let mut patterns = Vec::new();

        for table in tables {
            // Unknown table: no schema to inspect, so assume a full scan
            let Some(schema) = self.schema.get_table(&table.name) else {
                patterns.push(AccessPattern::FullScan);
                continue;
            };

            // Check for point lookup (WHERE pk = ?)
            if self.has_equality_on_pk(ast, &schema) {
                patterns.push(AccessPattern::PointLookup);
            }
            // Check for range scan
            else if self.has_range_predicate(ast, &schema) {
                patterns.push(AccessPattern::RangeScan);
            }
            // Check for vector search
            else if self.has_vector_operator(ast) {
                patterns.push(AccessPattern::VectorSearch);
            }
            // Default to full scan
            else {
                patterns.push(AccessPattern::FullScan);
            }
        }

        patterns
    }

    fn classify_workload(&self, ast: &Ast, tables: &[TableRef]) -> WorkloadType {
        // Check for aggregations (OLAP indicator)
        if self.has_aggregations(ast) {
            return WorkloadType::OLAP;
        }

        // Check for simple point queries (OLTP indicator)
        if self.is_simple_crud(ast) {
            return WorkloadType::OLTP;
        }

        // Check for vector operations
        if self.has_vector_operator(ast) {
            return WorkloadType::Vector;
        }

        // Mixed workload
        WorkloadType::Mixed
    }
}
```

Schema-Aware Router
```rust
pub struct SchemaAwareRouter {
    schema: Arc<SchemaRegistry>,
    analyzer: QueryAnalyzer,
    nodes: Arc<NodeRegistry>,
}

impl SchemaAwareRouter {
    /// Route a query. Fails only if the query cannot be analyzed.
    pub fn route(&self, query: &str) -> Result<RoutingDecision> {
        let analysis = self.analyzer.analyze(query)?;

        // 1. Determine node requirements based on workload
        let required_capabilities = self.get_required_capabilities(&analysis);

        // 2. Filter eligible nodes
        let eligible = self.filter_by_capabilities(&required_capabilities);

        // 3. Check sharding
        if let Some(shard_routing) = self.try_shard_routing(&analysis) {
            return Ok(shard_routing);
        }

        // 4. Route based on workload type
        Ok(match analysis.workload_type {
            WorkloadType::OLTP => self.route_oltp(&eligible, &analysis),
            WorkloadType::OLAP => self.route_olap(&eligible, &analysis),
            WorkloadType::Vector => self.route_vector(&eligible, &analysis),
            WorkloadType::HTAP | WorkloadType::Mixed => self.route_mixed(&eligible, &analysis),
        })
    }

    fn get_required_capabilities(&self, analysis: &QueryAnalysis) -> NodeCapabilities {
        let mut caps = NodeCapabilities::default();

        // Vector queries need vector-capable nodes
        if analysis.access_patterns.contains(&AccessPattern::VectorSearch) {
            caps.vector_search = true;
            caps.gpu_acceleration = true; // Prefer GPU nodes
        }

        // OLAP queries prefer columnar storage
        if analysis.workload_type == WorkloadType::OLAP {
            caps.columnar_storage = true;
        }

        // Hot tables need in-memory nodes
        for table in &analysis.tables {
            if let Some(schema) = self.schema.get_table(&table.name) {
                if schema.temperature == DataTemperature::Hot {
                    caps.in_memory = true;
                }
            }
        }

        caps
    }

    // NOTE: the `route_*` helpers below still panic when no node qualifies.
    // See "Fallback" under Key Considerations.

    fn route_oltp(&self, nodes: &[Node], analysis: &QueryAnalysis) -> RoutingDecision {
        // OLTP: Low latency, prefer primary or sync standbys
        let best = nodes.iter()
            .filter(|n| n.sync_mode == SyncMode::Sync || n.is_primary)
            .min_by_key(|n| n.current_latency_ms);

        RoutingDecision::single(best.expect("no eligible OLTP node").clone())
    }

    fn route_olap(&self, nodes: &[Node], analysis: &QueryAnalysis) -> RoutingDecision {
        // OLAP: Throughput over latency, prefer the least-loaded columnar node
        let columnar = nodes.iter()
            .filter(|n| n.capabilities.columnar_storage)
            .min_by_key(|n| n.current_load);

        // Fall back to any async standby
        let best = columnar
            .or_else(|| nodes.iter().find(|n| n.sync_mode == SyncMode::Async));

        RoutingDecision::single(best.expect("no eligible OLAP node").clone())
    }

    fn route_vector(&self, nodes: &[Node], analysis: &QueryAnalysis) -> RoutingDecision {
        // Vector: Need vector-capable nodes, prefer GPU
        let best = nodes.iter()
            .filter(|n| n.capabilities.vector_search)
            .min_by(|a, b| {
                // Prefer GPU, then lower load
                b.capabilities.gpu_acceleration.cmp(&a.capabilities.gpu_acceleration)
                    .then_with(|| a.current_load.cmp(&b.current_load))
            });

        RoutingDecision::single(best.expect("no vector-capable node").clone())
    }

    fn try_shard_routing(&self, analysis: &QueryAnalysis) -> Option<RoutingDecision> {
        // Check if query can be routed to specific shard
        for table in &analysis.tables {
            if let Some(schema) = self.schema.get_table(&table.name) {
                if let Some(shard_key) = &schema.shard_key {
                    if let Some(shard_value) = analysis.shard_keys.get(shard_key) {
                        let shard = self.schema.sharding.get_shard(shard_value);
                        return Some(RoutingDecision::shard(shard));
                    }
                }
            }
        }

        // Cross-shard query
        None
    }
}
```

API Specification
Configuration (heliosproxy.toml)
```toml
[schema_routing]
enabled = true
auto_discover = true
refresh_interval = "5m"

# Table classifications
[[schema_routing.tables]]
name = "users"
temperature = "hot"
workload = "oltp"
access_pattern = "point_lookup"
preferred_nodes = ["primary", "standby-sync"]

[[schema_routing.tables]]
name = "events"
temperature = "cold"
workload = "olap"
access_pattern = "full_scan"
preferred_nodes = ["standby-analytics"]

[[schema_routing.tables]]
name = "embeddings"
temperature = "warm"
workload = "vector"
access_pattern = "vector_search"
preferred_nodes = ["standby-vector-1", "standby-vector-2"]

[[schema_routing.tables]]
name = "orders"
temperature = "hot"
workload = "oltp"
shard_key = "customer_id"
shard_count = 4
```
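The `orders` entry declares a shard key and a shard count but does not say how a key value maps to a shard. One common choice, assumed here purely for illustration, is to hash the key and take it modulo the shard count. The `fnv1a64` and `shard_for` functions below are hypothetical helpers, not part of HeliosProxy; FNV-1a is used only because it is small and gives the same result on every run.

```rust
/// FNV-1a 64-bit hash over raw bytes.
fn fnv1a64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Map a shard-key value (e.g. a `customer_id` rendered as text)
/// to a shard index in `0..shard_count`.
fn shard_for(key: &str, shard_count: u32) -> u32 {
    assert!(shard_count > 0, "shard_count must be positive");
    (fnv1a64(key.as_bytes()) % u64::from(shard_count)) as u32
}
```

With `shard_count = 4`, every `customer_id` lands on one of shards 0 to 3, and the same id always lands on the same shard. The trade-off of plain modulo hashing is that changing `shard_count` remaps most keys; a consistent-hashing scheme would limit that movement at the cost of more bookkeeping.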
```toml
# Node capabilities
[[schema_routing.nodes]]
name = "standby-analytics"
capabilities = { columnar_storage = true, in_memory = false }

[[schema_routing.nodes]]
name = "standby-vector-1"
capabilities = { vector_search = true, gpu_acceleration = true }
```

Admin API
```
GET /schema/tables
{
  "tables": [
    {
      "name": "users",
      "temperature": "hot",
      "workload": "oltp",
      "estimated_rows": 1000000,
      "preferred_nodes": ["primary", "standby-sync"],
      "queries_per_minute": 5000
    }
  ]
}

GET /schema/routing-stats
{
  "by_workload": {
    "oltp": { "queries": 50000, "avg_latency_ms": 2.1 },
    "olap": { "queries": 1000, "avg_latency_ms": 150.3 },
    "vector": { "queries": 10000, "avg_latency_ms": 5.2 }
  },
  "by_temperature": {
    "hot": { "queries": 55000, "cache_hit_ratio": 0.95 },
    "cold": { "queries": 5000, "cache_hit_ratio": 0.20 }
  }
}

POST /schema/classify
# Manually classify table
{
  "table": "new_table",
  "temperature": "warm",
  "workload": "mixed"
}
```
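The classify endpoint accepts `temperature` and `workload` as strings, so the server needs to validate them before updating the registry. The following is a minimal sketch of that validation using `FromStr`. It assumes the accepted values are the lowercase variant names (`"frozen"` and `"htap"` do not appear in the examples above and are assumptions), that matching is case-insensitive, and that errors are plain strings; `parse_classification` is a hypothetical helper.

```rust
use std::str::FromStr;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DataTemperature {
    Hot,
    Warm,
    Cold,
    Frozen,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WorkloadType {
    OLTP,
    OLAP,
    HTAP,
    Vector,
    Mixed,
}

impl FromStr for DataTemperature {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.trim().to_ascii_lowercase().as_str() {
            "hot" => Ok(Self::Hot),
            "warm" => Ok(Self::Warm),
            "cold" => Ok(Self::Cold),
            "frozen" => Ok(Self::Frozen),
            other => Err(format!("unknown temperature: {other}")),
        }
    }
}

impl FromStr for WorkloadType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.trim().to_ascii_lowercase().as_str() {
            "oltp" => Ok(Self::OLTP),
            "olap" => Ok(Self::OLAP),
            "htap" => Ok(Self::HTAP),
            "vector" => Ok(Self::Vector),
            "mixed" => Ok(Self::Mixed),
            other => Err(format!("unknown workload: {other}")),
        }
    }
}

/// Validate the two classification fields of a classify request.
fn parse_classification(
    temperature: &str,
    workload: &str,
) -> Result<(DataTemperature, WorkloadType), String> {
    Ok((temperature.parse()?, workload.parse()?))
}
```

Rejecting unknown values at the API boundary keeps typos such as `"lukewarm"` from silently falling through to a default classification.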
```
POST /schema/analyze
# Analyze query for routing
{
  "query": "SELECT * FROM users JOIN orders ON ..."
}

Response:
{
  "tables": ["users", "orders"],
  "workload": "oltp",
  "access_patterns": ["point_lookup", "range_scan"],
  "recommended_route": "primary",
  "shard_info": {
    "key": "customer_id",
    "shards": [0, 2]
  }
}
```

AI/Agent Innovations
1. AI Workload Detection
Automatically detect AI-specific access patterns:
```rust
pub struct AIWorkloadDetector {
    /// Patterns indicating AI workloads
    patterns: Vec<AIPattern>,
}

impl AIWorkloadDetector {
    pub fn detect(&self, query: &str) -> Option<AIWorkloadType> {
        // Embedding retrieval
        if query.contains("<->") || query.contains("vector") {
            if query.contains("ORDER BY") && query.contains("LIMIT") {
                return Some(AIWorkloadType::EmbeddingRetrieval);
            }
        }

        // Context lookup (conversation history)
        if query.contains("conversation") || query.contains("turns") {
            return Some(AIWorkloadType::ContextLookup);
        }

        // Knowledge base query
        if query.contains("documents") && query.contains("chunks") {
            return Some(AIWorkloadType::KnowledgeBase);
        }

        // Tool result storage
        if query.contains("tool_results") || query.contains("actions") {
            return Some(AIWorkloadType::ToolExecution);
        }

        None
    }

    pub fn get_optimal_routing(&self, workload: AIWorkloadType) -> RoutingPreference {
        match workload {
            AIWorkloadType::EmbeddingRetrieval => {
                RoutingPreference::VectorNodes { prefer_gpu: true }
            }
            AIWorkloadType::ContextLookup => {
                RoutingPreference::LowLatency { max_lag_ms: 100 }
            }
            AIWorkloadType::KnowledgeBase => {
                RoutingPreference::HighThroughput
            }
            AIWorkloadType::ToolExecution => {
                RoutingPreference::Primary // Needs writes
            }
        }
    }
}
```

2. RAG Pipeline Routing
Optimize routing for RAG stages:
```rust
pub struct RAGRouter {
    schema_router: Arc<SchemaAwareRouter>,
}

impl RAGRouter {
    pub fn route_rag_query(&self, stage: RAGStage, query: &str) -> RoutingDecision {
        match stage {
            RAGStage::Retrieval => {
                // Vector search on embeddings table
                self.route_to_vector_nodes(query)
            }
            RAGStage::Fetch => {
                // Bulk fetch from documents table
                self.route_to_high_throughput_nodes(query)
            }
            RAGStage::Rerank => {
                // Light computation, any node
                self.route_to_lowest_latency(query)
            }
            RAGStage::Generate => {
                // May write to cache
                self.route_to_primary_if_writing(query)
            }
        }
    }
}
```

3. Agent Workspace Routing
Route agent queries based on workspace structure:
```rust
pub struct AgentWorkspaceRouter {
    /// Workspace to shard mapping.
    /// A DashMap (as in SchemaRegistry) allows insertion through `&self`;
    /// a plain HashMap would require `&mut self`.
    workspace_shards: DashMap<WorkspaceId, ShardId>,
}

impl AgentWorkspaceRouter {
    pub fn route_agent_query(&self, workspace: &str, query: &str) -> RoutingDecision {
        // Agent workspace data is co-located for efficiency.
        // An existing workspace keeps its shard; a new workspace is assigned
        // one and the affinity is recorded in the same step.
        let shard = *self.workspace_shards
            .entry(workspace.to_string())
            .or_insert_with(|| self.assign_workspace_shard(workspace));

        RoutingDecision::shard(shard)
    }
}
```

4. Learning-Based Classification
Learn table classifications from query patterns:
```rust
pub struct LearningClassifier {
    /// Query history per table
    history: DashMap<TableName, QueryHistory>,

    /// Classification model
    model: ClassificationModel,

    /// Registry that receives updated classifications
    schema: Arc<SchemaRegistry>,
}

impl LearningClassifier {
    pub fn update(&self, table: &str, query_type: QueryType, latency: Duration) {
        // Scope the entry guard so it is released before `reclassify` reads
        // the same map; holding it across that call can deadlock a DashMap.
        let count = {
            let mut history = self.history
                .entry(table.to_string())
                .or_insert_with(QueryHistory::new);

            history.record(query_type, latency);
            history.count()
        };

        // Periodically retrain classification
        if count % 1000 == 0 {
            self.reclassify(table);
        }
    }

    fn reclassify(&self, table: &str) {
        // Nothing to do for a table with no recorded history
        let Some(history) = self.history.get(table) else {
            return;
        };

        // Determine temperature based on access frequency
        let temperature = if history.qpm() > 1000 {
            DataTemperature::Hot
        } else if history.qpm() > 100 {
            DataTemperature::Warm
        } else {
            DataTemperature::Cold
        };

        // Determine workload based on query types
        let workload = if history.read_write_ratio() > 10.0 {
            WorkloadType::OLAP
        } else if history.read_write_ratio() < 2.0 {
            WorkloadType::OLTP
        } else {
            WorkloadType::HTAP
        };

        self.schema.update_classification(table, temperature, workload);
    }
}
```

HeliosDB-Lite Integration
1. Branch-Based Data Locality
Route queries based on branch data location:
```rust
impl SchemaAwareRouter {
    pub fn route_with_branch(&self, query: &str, branch: &str) -> Result<RoutingDecision> {
        // Get nodes that have the branch data
        let branch_nodes = self.schema.get_branch_locations(branch);

        // Filter by query requirements
        let analysis = self.analyzer.analyze(query)?;
        let required_capabilities = self.get_required_capabilities(&analysis);
        let eligible = self.filter_by_capabilities(&required_capabilities);

        // Intersection
        let available: Vec<_> = eligible.iter()
            .filter(|n| branch_nodes.contains(&n.id))
            .collect();

        if available.is_empty() {
            // Branch not replicated to eligible nodes
            return Ok(RoutingDecision::primary_with_branch(branch));
        }

        Ok(self.select_best(&available, &analysis))
    }
}
```

2. Time-Temperature Routing
Route historical queries to cold storage:
```rust
// `Utc` and `Duration` here are from the chrono crate;
// `Timestamp` is assumed to be `DateTime<Utc>`.
impl SchemaAwareRouter {
    pub fn route_time_travel(&self, query: &str, as_of: Timestamp) -> RoutingDecision {
        let age = Utc::now().signed_duration_since(as_of);

        // Recent data on hot nodes
        if age < Duration::days(7) {
            return self.route_to_hot_nodes(query);
        }

        // Older data on warm nodes
        if age < Duration::days(30) {
            return self.route_to_warm_nodes(query);
        }

        // Historical data on cold/archive nodes
        self.route_to_cold_nodes(query)
    }
}
```

3. Vector Index Routing
Route based on vector index availability:
```rust
impl SchemaAwareRouter {
    pub fn route_vector_query(&self, query: &str) -> RoutingDecision {
        // Assumes a string-based `extract_tables` that parses internally
        let tables = self.analyzer.extract_tables(query);

        // Find nodes with HNSW index loaded
        for table in tables {
            if let Some(index) = self.schema.get_vector_index(&table) {
                let nodes_with_index = self.schema.get_index_locations(&index);

                // Prefer node with index in memory.
                // `max_by_key` yields `None` when no node holds the index.
                let best = nodes_with_index.iter()
                    .max_by_key(|n| n.index_in_memory.get(&index).copied().unwrap_or(false));

                if let Some(best) = best {
                    return RoutingDecision::single(best.clone());
                }
            }
        }

        // No pre-loaded index, route to any vector-capable node
        self.route_vector_default(query)
    }
}
```

4. Per-Column Storage Routing
Route based on column storage type:
```rust
impl SchemaAwareRouter {
    pub fn route_by_column_storage(&self, query: &str) -> RoutingDecision {
        let columns = self.analyzer.extract_columns(query);

        // Check if query accesses columnar columns
        let has_columnar = columns.iter()
            .any(|c| self.schema.is_columnar_column(c));

        // Check if query accesses content-addressed columns
        let has_content_addressed = columns.iter()
            .any(|c| self.schema.is_content_addressed(c));

        if has_columnar {
            // Route to nodes with columnar engine
            return self.route_to_columnar_nodes(query);
        }

        if has_content_addressed {
            // Route to nodes with content store
            return self.route_to_content_store_nodes(query);
        }

        self.route_default(query)
    }
}
```

Implementation Notes
File Locations
```
src/proxy/
├── schema_routing/
│   ├── mod.rs          # Public API
│   ├── registry.rs     # SchemaRegistry
│   ├── analyzer.rs     # QueryAnalyzer
│   ├── router.rs       # SchemaAwareRouter
│   ├── classifier.rs   # LearningClassifier
│   ├── discovery.rs    # Schema auto-discovery
│   └── metrics.rs      # Routing metrics
```

Key Considerations
- Schema Sync: Keep schema registry in sync with actual database schema.
- Classification Updates: Allow runtime updates to classifications.
- Cross-Table Queries: Handle queries spanning multiple classifications.
- Fallback: Gracefully degrade when preferred nodes unavailable.
- Metrics: Track routing effectiveness per classification.
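The fallback consideration can be made concrete with a tiered selection: try the table's preferred nodes first, then any healthy standby, then the primary. The following is a minimal sketch under assumed rules; the tier order, the `Node` fields, and the `route_with_fallback` and `RouteTier` names are all hypothetical. Returning the tier alongside the node also gives the metrics consideration something to count.

```rust
/// Which fallback tier served the request (useful as a metrics label).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RouteTier {
    Preferred,
    AnyStandby,
    Primary,
}

/// Simplified node view (hypothetical; the real `Node` carries more state).
#[derive(Debug, Clone)]
struct Node {
    name: &'static str,
    healthy: bool,
    is_primary: bool,
}

/// Pick a node for a table, degrading tier by tier.
/// Returns `None` only when no healthy node exists at all.
fn route_with_fallback<'a>(
    preferred: &[&str],
    nodes: &'a [Node],
) -> Option<(&'a Node, RouteTier)> {
    // Tier 1: first healthy node from the preferred list, in listed order
    for want in preferred {
        if let Some(node) = nodes.iter().find(|n| n.healthy && n.name == *want) {
            return Some((node, RouteTier::Preferred));
        }
    }

    // Tier 2: any healthy standby, to keep load off the primary
    if let Some(node) = nodes.iter().find(|n| n.healthy && !n.is_primary) {
        return Some((node, RouteTier::AnyStandby));
    }

    // Tier 3: the primary as a last resort
    nodes
        .iter()
        .find(|n| n.healthy && n.is_primary)
        .map(|n| (n, RouteTier::Primary))
}
```

A rising share of `AnyStandby` or `Primary` results for a table is a signal that its preferred nodes are frequently unavailable or that its classification needs revisiting.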
Auto-Discovery
```rust
pub struct SchemaDiscovery {
    db: Arc<DatabasePool>,
}

impl SchemaDiscovery {
    pub async fn discover(&self) -> Result<Vec<TableSchema>> {
        // Query pg_catalog for table metadata
        let tables = self.db.query(
            "SELECT table_name, pg_class.reltuples, pg_relation_size(...)
             FROM information_schema.tables
             JOIN pg_class ON ..."
        ).await?;

        // Analyze query patterns from pg_stat_statements.
        // Note: `mean_time` is the column name up to PostgreSQL 12;
        // PostgreSQL 13 and later name it `mean_exec_time`.
        let patterns = self.db.query(
            "SELECT query, calls, mean_time
             FROM pg_stat_statements"
        ).await?;

        self.classify_tables(&tables, &patterns)
    }
}
```

Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| Query analysis | <1ms | p99 |
| Routing decision | <100μs | p99 |
| Schema refresh | <5s | full refresh |
| Classification accuracy | >90% | vs optimal routing |
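The latency targets above are stated as p99 values, so checking them requires collecting per-call samples and taking a percentile, not an average. The following is a minimal sketch using the nearest-rank method. The `percentile`, `meets_target`, and `measure` helpers are hypothetical, and a real benchmark would also need warm-up runs and a representative query mix.

```rust
use std::time::{Duration, Instant};

/// Nearest-rank percentile: the smallest sample such that at least
/// `pct` percent of samples are less than or equal to it.
/// Sorts `samples` in place. Returns `None` for empty input or `pct`
/// outside `1..=100`.
fn percentile(samples: &mut [Duration], pct: u32) -> Option<Duration> {
    if samples.is_empty() || pct == 0 || pct > 100 {
        return None;
    }
    samples.sort_unstable();
    // rank = ceil(n * pct / 100), computed in integers
    let rank = (samples.len() * pct as usize + 99) / 100;
    Some(samples[rank - 1])
}

/// True when the p99 of `samples` is strictly below `target`,
/// matching the "<" in the targets table.
fn meets_target(samples: &mut [Duration], target: Duration) -> bool {
    percentile(samples, 99).map_or(false, |p99| p99 < target)
}

/// Time `iterations` calls of `f` and return the per-call latencies.
fn measure<F: FnMut()>(iterations: usize, mut f: F) -> Vec<Duration> {
    (0..iterations)
        .map(|_| {
            let start = Instant::now();
            f();
            start.elapsed()
        })
        .collect()
}
```

For example, the routing-decision target would be checked by wrapping the routing call in `measure` and passing the samples to `meets_target` with `Duration::from_micros(100)`.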
Related Features
- Query Routing Hints - Manual routing
- Query Analytics - Pattern analysis
- Helios-DistribCache - Data locality caching