Feature 13: Schema-Aware Routing

Priority: Medium | Complexity: High | Phase: 4 (Differentiation)


Overview

Problem Statement

Generic query routing ignores schema semantics:

  • All tables treated equally (but some are hot, some cold)
  • No awareness of data locality
  • Can’t optimize for specific access patterns
  • Sharding decisions made blindly

Without schema awareness:

  • Hot tables compete with cold tables
  • Analytics queries interfere with OLTP
  • Vector searches not routed to vector-optimized nodes
  • Joins across shards are inefficient

Solution

Implement schema-aware routing that understands data semantics:

┌─────────────────────────────────────────────────┐
│ SCHEMA-AWARE ROUTER │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Schema Registry │ │
│ │ ┌────────────────────────────────────┐ │ │
│ │ │ Tables: │ │ │
│ │ │ users: HOT, OLTP, shard_by=id │ │ │
│ │ │ events: COLD, OLAP, partition=d │ │ │
│ │ │ embeddings: VECTOR, gpu_nodes │ │ │
│ │ └────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────┘ │
│ │ │
Query ───────────►│ ┌──────────────────────────────────────────┐ │
│ │ Query Analyzer │ │
│ │ - Extract tables │ │
│ │ - Detect access pattern │ │
│ │ - Identify sharding keys │ │
│ └──────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ Routing Decision │ │
│ │ - Select optimal node/shard │ │
│ │ - Parallelize if needed │ │
│ │ - Aggregate results │ │
│ └──────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘

Architecture

Schema Registry

/// Central catalog of schema metadata and node capabilities consulted when routing.
pub struct SchemaRegistry {
    /// Table metadata, keyed by table name.
    tables: DashMap<TableName, TableSchema>,
    /// Index metadata, keyed by index name.
    indexes: DashMap<IndexName, IndexSchema>,
    /// Relationship graph between tables.
    relationships: Graph<TableName, Relationship>,
    /// Sharding configuration.
    ///
    /// `pub(crate)` because `SchemaAwareRouter::try_shard_routing` reads it
    /// directly (`self.schema.sharding.get_shard(..)`), and the router is laid
    /// out in a sibling module (`router.rs` next to `registry.rs`), where a
    /// private field would not be visible.
    pub(crate) sharding: ShardingConfig,
    /// Capabilities advertised by each node.
    node_capabilities: DashMap<NodeId, NodeCapabilities>,
}
/// Routing-relevant description of a single table held by the schema registry.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Name of the table.
    pub name: String,
    /// Column definitions.
    pub columns: Vec<ColumnSchema>,
    /// Dominant way the table is read (point lookups, scans, vector search, ...).
    pub access_pattern: AccessPattern,
    /// How often the table is touched: hot, warm, cold or frozen.
    pub temperature: DataTemperature,
    /// Kind of workload the table mostly serves.
    pub workload: WorkloadType,
    /// Column the table is sharded on; `None` for an unsharded table.
    pub shard_key: Option<String>,
    /// Partitioning key; `None` for an unpartitioned table.
    pub partition_key: Option<PartitionKey>,
    /// Nodes to favour when routing queries that touch this table.
    pub preferred_nodes: Vec<NodeId>,
    /// Approximate row count.
    pub estimated_rows: u64,
    /// Approximate size of one row (NOTE(review): presumably bytes — confirm).
    pub avg_row_size: usize,
}
/// How a query (or a table's typical traffic) accesses data.
///
/// `PartialEq`/`Eq` are required because routing tests membership with
/// `access_patterns.contains(&AccessPattern::VectorSearch)`; `Hash` allows the
/// patterns to be used as `HashMap`/`HashSet` keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AccessPattern {
    /// Point lookups by primary key
    PointLookup,
    /// Range scans
    RangeScan,
    /// Full table scans (OLAP)
    FullScan,
    /// Vector similarity search
    VectorSearch,
    /// Time-series append
    TimeSeriesAppend,
    /// Mixed
    Mixed,
}
/// Access-frequency tier of a table.
///
/// `PartialEq`/`Eq` are required because routing compares tiers directly
/// (`schema.temperature == DataTemperature::Hot`); `Hash` allows tiers to be
/// used as map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataTemperature {
    /// Frequently accessed, keep in memory
    Hot,
    /// Occasionally accessed
    Warm,
    /// Rarely accessed, can be on slower storage
    Cold,
    /// Archive, acceptable to be slow
    Frozen,
}
/// Kind of workload a query or table represents.
///
/// `PartialEq`/`Eq` are required because routing compares workloads directly
/// (`analysis.workload_type == WorkloadType::OLAP`); `Hash` allows workloads to
/// be used as map/set keys (e.g. grouping routing statistics).
// Variant names deliberately mirror the industry acronyms (OLTP/OLAP/HTAP) and
// are part of the public API, so clippy's CamelCase suggestion is silenced.
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WorkloadType {
    /// Online Transaction Processing
    OLTP,
    /// Online Analytical Processing
    OLAP,
    /// Hybrid Transactional/Analytical
    HTAP,
    /// Vector/AI workloads
    Vector,
    /// Mixed
    Mixed,
}

Query Analyzer

/// Parses SQL and extracts the properties the schema-aware router needs.
pub struct QueryAnalyzer {
    parser: SqlParser,
    schema: Arc<SchemaRegistry>,
}
impl QueryAnalyzer {
    /// Analyze `query` and determine its routing requirements.
    ///
    /// NOTE(review): the `?` on the parse result requires a `Result`/`Option`
    /// return type and does not compile against `-> QueryAnalysis`. Switching
    /// to `Result<QueryAnalysis, _>` also requires callers (e.g.
    /// `SchemaAwareRouter::route`) to decide where unparseable SQL goes.
    pub fn analyze(&self, query: &str) -> QueryAnalysis {
        let ast = self.parser.parse(query)?;
        let tables = self.extract_tables(&ast);
        let access_patterns = self.detect_access_patterns(&ast, &tables);
        let shard_keys = self.extract_shard_keys(&ast, &tables);
        let workload_type = self.classify_workload(&ast, &tables);
        QueryAnalysis {
            tables,
            access_patterns,
            shard_keys,
            workload_type,
            complexity: self.estimate_complexity(&ast),
            selectivity: self.estimate_selectivity(&ast),
        }
    }

    /// Classifies how each referenced table is accessed.
    ///
    /// Returns one entry per element of `tables`, in the same order.
    /// Precedence is `VectorSearch` > `PointLookup` > `RangeScan` > `FullScan`.
    /// The vector check comes first: a statement containing a vector operator
    /// can only run on a vector-capable node, even when it also filters by key
    /// or range (e.g. `WHERE created_at > $1 ORDER BY v <-> $2`), and checking
    /// it last would tag such queries `RangeScan` and lose the vector
    /// requirement. A table unknown to the registry is reported as `Mixed`
    /// instead of aborting the whole analysis.
    fn detect_access_patterns(&self, ast: &Ast, tables: &[TableRef]) -> Vec<AccessPattern> {
        // Vector detection inspects the whole statement, so do it once.
        let has_vector = self.has_vector_operator(ast);
        tables
            .iter()
            .map(|table| {
                if has_vector {
                    return AccessPattern::VectorSearch;
                }
                let Some(schema) = self.schema.get_table(&table.name) else {
                    // No schema metadata: primary key / range checks impossible.
                    return AccessPattern::Mixed;
                };
                if self.has_equality_on_pk(ast, &schema) {
                    // WHERE pk = ?
                    AccessPattern::PointLookup
                } else if self.has_range_predicate(ast, &schema) {
                    AccessPattern::RangeScan
                } else {
                    AccessPattern::FullScan
                }
            })
            .collect()
    }

    /// Coarse workload classification for the whole statement.
    ///
    /// Vector operators win first (they constrain which nodes can execute the
    /// statement at all), then aggregations → OLAP, simple CRUD → OLTP, and
    /// everything else is `Mixed`.
    fn classify_workload(&self, ast: &Ast, _tables: &[TableRef]) -> WorkloadType {
        if self.has_vector_operator(ast) {
            return WorkloadType::Vector;
        }
        // Aggregations are an OLAP indicator.
        if self.has_aggregations(ast) {
            return WorkloadType::OLAP;
        }
        // Simple point queries are an OLTP indicator.
        if self.is_simple_crud(ast) {
            return WorkloadType::OLTP;
        }
        WorkloadType::Mixed
    }
}

Schema-Aware Router

/// Routes queries using schema classifications and node capabilities.
pub struct SchemaAwareRouter {
    schema: Arc<SchemaRegistry>,
    analyzer: QueryAnalyzer,
    nodes: Arc<NodeRegistry>,
}
impl SchemaAwareRouter {
    /// Routes `query` to a shard or node.
    ///
    /// 1. If the query pins a table's shard key, route to that shard.
    /// 2. Otherwise, filter nodes by the hard capability requirements derived
    ///    from the analysis and choose among them by workload type.
    pub fn route(&self, query: &str) -> RoutingDecision {
        let analysis = self.analyzer.analyze(query);
        // Shard routing does not look at node capabilities, so try it before
        // computing the eligible set.
        if let Some(shard_routing) = self.try_shard_routing(&analysis) {
            return shard_routing;
        }
        let required_capabilities = self.get_required_capabilities(&analysis);
        let eligible = self.filter_by_capabilities(&required_capabilities);
        match analysis.workload_type {
            WorkloadType::OLTP => self.route_oltp(&eligible, &analysis),
            WorkloadType::OLAP => self.route_olap(&eligible, &analysis),
            WorkloadType::Vector => self.route_vector(&eligible, &analysis),
            WorkloadType::HTAP | WorkloadType::Mixed => self.route_mixed(&eligible, &analysis),
        }
    }

    /// Capabilities a node MUST have to execute the analysed query.
    ///
    /// Only hard requirements belong here. GPU acceleration (vector) and
    /// columnar storage (OLAP) are preferences: `route_vector` already ranks GPU
    /// nodes first and `route_olap` prefers columnar nodes with an async-standby
    /// fallback. Requiring them here would exclude CPU-only vector nodes
    /// entirely and make the OLAP fallback unreachable.
    fn get_required_capabilities(&self, analysis: &QueryAnalysis) -> NodeCapabilities {
        let mut caps = NodeCapabilities::default();
        // Vector operators can only be evaluated on vector-capable nodes.
        if analysis.access_patterns.contains(&AccessPattern::VectorSearch) {
            caps.vector_search = true;
        }
        // Hot tables need in-memory nodes.
        for table in &analysis.tables {
            if let Some(schema) = self.schema.get_table(&table.name) {
                if schema.temperature == DataTemperature::Hot {
                    caps.in_memory = true;
                }
            }
        }
        caps
    }

    /// OLTP: low latency on the primary or a synchronous standby.
    ///
    /// There is deliberately no fallback to async standbys, because OLTP
    /// statements may write.
    ///
    /// # Panics
    /// If `nodes` holds neither the primary nor a sync standby.
    /// NOTE(review): consider surfacing this as an error or a primary fallback.
    fn route_oltp(&self, nodes: &[Node], _analysis: &QueryAnalysis) -> RoutingDecision {
        let best = nodes
            .iter()
            .filter(|n| n.sync_mode == SyncMode::Sync || n.is_primary)
            .min_by_key(|n| n.current_latency_ms)
            .expect("OLTP routing: no primary or sync standby among eligible nodes");
        RoutingDecision::single(best.clone())
    }

    /// OLAP: throughput over latency.
    ///
    /// Preference order:
    /// 1. The least-loaded columnar node.
    /// 2. The least-loaded async standby.
    /// 3. As a graceful last resort, the least-loaded eligible node.
    ///
    /// # Panics
    /// Only if `nodes` is empty.
    fn route_olap(&self, nodes: &[Node], _analysis: &QueryAnalysis) -> RoutingDecision {
        let best = nodes
            .iter()
            .filter(|n| n.capabilities.columnar_storage)
            .min_by_key(|n| n.current_load)
            .or_else(|| {
                nodes
                    .iter()
                    .filter(|n| n.sync_mode == SyncMode::Async)
                    .min_by_key(|n| n.current_load)
            })
            .or_else(|| nodes.iter().min_by_key(|n| n.current_load))
            .expect("OLAP routing: no eligible nodes");
        RoutingDecision::single(best.clone())
    }

    /// Vector: a vector-capable node, preferring GPU acceleration, then lower load.
    ///
    /// `min_by` returns the first of equally-ranked nodes, matching the
    /// original stable sort followed by `first()`.
    ///
    /// # Panics
    /// If no vector-capable node is eligible.
    fn route_vector(&self, nodes: &[Node], _analysis: &QueryAnalysis) -> RoutingDecision {
        let best = nodes
            .iter()
            .filter(|n| n.capabilities.vector_search)
            .min_by(|a, b| {
                // GPU (true) ranks before non-GPU, then lower load first.
                b.capabilities
                    .gpu_acceleration
                    .cmp(&a.capabilities.gpu_acceleration)
                    .then_with(|| a.current_load.cmp(&b.current_load))
            })
            .expect("vector routing: no vector-capable node among eligible nodes");
        RoutingDecision::single(best.clone())
    }

    /// Routes to a single shard when the query binds a table's shard key.
    ///
    /// Returns `None` for cross-shard queries. The first table (in analysis
    /// order) whose shard key is bound wins.
    fn try_shard_routing(&self, analysis: &QueryAnalysis) -> Option<RoutingDecision> {
        for table in &analysis.tables {
            if let Some(schema) = self.schema.get_table(&table.name) {
                if let Some(shard_key) = &schema.shard_key {
                    if let Some(shard_value) = analysis.shard_keys.get(shard_key) {
                        let shard = self.schema.sharding.get_shard(shard_value);
                        return Some(RoutingDecision::shard(shard));
                    }
                }
            }
        }
        // Cross-shard query
        None
    }
}

API Specification

Configuration (heliosproxy.toml)

[schema_routing]
enabled = true
auto_discover = true
refresh_interval = "5m"
# Table classifications
[[schema_routing.tables]]
name = "users"
temperature = "hot"
workload = "oltp"
access_pattern = "point_lookup"
preferred_nodes = ["primary", "standby-sync"]
[[schema_routing.tables]]
name = "events"
temperature = "cold"
workload = "olap"
access_pattern = "full_scan"
preferred_nodes = ["standby-analytics"]
[[schema_routing.tables]]
name = "embeddings"
temperature = "warm"
workload = "vector"
access_pattern = "vector_search"
preferred_nodes = ["standby-vector-1", "standby-vector-2"]
[[schema_routing.tables]]
name = "orders"
temperature = "hot"
workload = "oltp"
shard_key = "customer_id"
shard_count = 4
# Node capabilities
[[schema_routing.nodes]]
name = "standby-analytics"
capabilities = { columnar_storage = true, in_memory = false }
[[schema_routing.nodes]]
name = "standby-vector-1"
capabilities = { vector_search = true, gpu_acceleration = true }

Admin API

GET /schema/tables
{
"tables": [
{
"name": "users",
"temperature": "hot",
"workload": "oltp",
"estimated_rows": 1000000,
"preferred_nodes": ["primary", "standby-sync"],
"queries_per_minute": 5000
}
]
}
GET /schema/routing-stats
{
"by_workload": {
"oltp": { "queries": 50000, "avg_latency_ms": 2.1 },
"olap": { "queries": 1000, "avg_latency_ms": 150.3 },
"vector": { "queries": 10000, "avg_latency_ms": 5.2 }
},
"by_temperature": {
"hot": { "queries": 55000, "cache_hit_ratio": 0.95 },
"cold": { "queries": 5000, "cache_hit_ratio": 0.20 }
}
}
POST /schema/classify
# Manually classify table
{
"table": "new_table",
"temperature": "warm",
"workload": "mixed"
}
POST /schema/analyze
# Analyze query for routing
{
"query": "SELECT * FROM users JOIN orders ON ..."
}
Response:
{
"tables": ["users", "orders"],
"workload": "oltp",
"access_patterns": ["point_lookup", "range_scan"],
"recommended_route": "primary",
"shard_info": { "key": "customer_id", "shards": [0, 2] }
}

AI/Agent Innovations

1. AI Workload Detection

Automatically detect AI-specific access patterns:

/// Heuristically recognises AI-specific query shapes from raw SQL text.
pub struct AIWorkloadDetector {
    /// Patterns indicating AI workloads
    patterns: Vec<AIPattern>,
}
impl AIWorkloadDetector {
    /// Classifies `query` as an AI workload using lexical heuristics.
    ///
    /// Matching is ASCII case-insensitive, so `order by ... limit` is
    /// recognised as well as `ORDER BY ... LIMIT`. Table-name hints must start
    /// at an identifier boundary (start of text, or after any character that
    /// is not an ASCII letter/digit, including `_`). This keeps `user_actions`
    /// and `conversation_turns` matching while `transactions` no longer counts
    /// as `actions` and `returns` no longer counts as `turns`. Checks run in
    /// priority order and the first hit wins.
    pub fn detect(&self, query: &str) -> Option<AIWorkloadType> {
        let lower = query.to_ascii_lowercase();
        // SQL words: runs of [a-z0-9_]; punctuation and whitespace separate them.
        let words: Vec<&str> = lower
            .split(|c: char| !(c.is_ascii_alphanumeric() || c == '_'))
            .filter(|w| !w.is_empty())
            .collect();
        let has_order_by = words.windows(2).any(|w| w[0] == "order" && w[1] == "by");
        let has_limit = words.contains(&"limit");

        // Embedding retrieval: a pgvector distance operator (L2, cosine,
        // negative inner product, L1) or a `vector` identifier, plus top-k.
        let has_vector_op = ["<->", "<=>", "<#>", "<+>"].iter().any(|op| lower.contains(op));
        if (has_vector_op || Self::mentions(&lower, "vector")) && has_order_by && has_limit {
            return Some(AIWorkloadType::EmbeddingRetrieval);
        }
        // Context lookup (conversation history)
        if Self::mentions(&lower, "conversation") || Self::mentions(&lower, "turns") {
            return Some(AIWorkloadType::ContextLookup);
        }
        // Knowledge base query
        if Self::mentions(&lower, "documents") && Self::mentions(&lower, "chunks") {
            return Some(AIWorkloadType::KnowledgeBase);
        }
        // Tool result storage
        if Self::mentions(&lower, "tool_results") || Self::mentions(&lower, "actions") {
            return Some(AIWorkloadType::ToolExecution);
        }
        None
    }

    /// True if `needle` occurs in `haystack` at the start of the text or right
    /// after a character that is not an ASCII letter or digit.
    fn mentions(haystack: &str, needle: &str) -> bool {
        haystack.match_indices(needle).any(|(start, _)| {
            haystack[..start]
                .chars()
                .next_back()
                .map_or(true, |c| !c.is_ascii_alphanumeric())
        })
    }

    /// Maps a detected AI workload to its preferred routing policy.
    pub fn get_optimal_routing(&self, workload: AIWorkloadType) -> RoutingPreference {
        match workload {
            AIWorkloadType::EmbeddingRetrieval => {
                RoutingPreference::VectorNodes { prefer_gpu: true }
            }
            AIWorkloadType::ContextLookup => {
                RoutingPreference::LowLatency { max_lag_ms: 100 }
            }
            AIWorkloadType::KnowledgeBase => {
                RoutingPreference::HighThroughput
            }
            AIWorkloadType::ToolExecution => {
                RoutingPreference::Primary // Needs writes
            }
        }
    }
}

2. RAG Pipeline Routing

Optimize routing for RAG stages:

/// Chooses a route for each stage of a retrieval-augmented-generation pipeline.
pub struct RAGRouter {
    schema_router: Arc<SchemaAwareRouter>,
}
impl RAGRouter {
    /// Dispatches `query` according to the RAG stage it belongs to:
    ///
    /// * `Retrieval` — vector search over the embeddings table → vector nodes.
    /// * `Fetch` — bulk reads from the documents table → high-throughput nodes.
    /// * `Rerank` — light computation that can run anywhere → lowest latency.
    /// * `Generate` — may write to a cache → primary when the query writes.
    pub fn route_rag_query(&self, stage: RAGStage, query: &str) -> RoutingDecision {
        match stage {
            RAGStage::Retrieval => self.route_to_vector_nodes(query),
            RAGStage::Fetch => self.route_to_high_throughput_nodes(query),
            RAGStage::Rerank => self.route_to_lowest_latency(query),
            RAGStage::Generate => self.route_to_primary_if_writing(query),
        }
    }
}

3. Agent Workspace Routing

Route agent queries based on workspace structure:

pub struct AgentWorkspaceRouter {
/// Workspace to shard mapping
workspace_shards: HashMap<WorkspaceId, ShardId>,
}
impl AgentWorkspaceRouter {
pub fn route_agent_query(&self, workspace: &str, query: &str) -> RoutingDecision {
// Agent workspace data is co-located for efficiency
if let Some(shard) = self.workspace_shards.get(workspace) {
return RoutingDecision::shard(*shard);
}
// New workspace, create affinity
let shard = self.assign_workspace_shard(workspace);
self.workspace_shards.insert(workspace.to_string(), shard);
RoutingDecision::shard(shard)
}
}

4. Learning-Based Classification

Learn table classifications from query patterns:

pub struct LearningClassifier {
/// Query history per table
history: DashMap<TableName, QueryHistory>,
/// Classification model
model: ClassificationModel,
}
impl LearningClassifier {
pub fn update(&self, table: &str, query_type: QueryType, latency: Duration) {
let history = self.history.entry(table.to_string())
.or_insert_with(QueryHistory::new);
history.record(query_type, latency);
// Periodically retrain classification
if history.count() % 1000 == 0 {
self.reclassify(table);
}
}
fn reclassify(&self, table: &str) {
let history = self.history.get(table)?;
// Determine temperature based on access frequency
let temperature = if history.qpm() > 1000 {
DataTemperature::Hot
} else if history.qpm() > 100 {
DataTemperature::Warm
} else {
DataTemperature::Cold
};
// Determine workload based on query types
let workload = if history.read_write_ratio() > 10.0 {
WorkloadType::OLAP
} else if history.read_write_ratio() < 2.0 {
WorkloadType::OLTP
} else {
WorkloadType::HTAP
};
self.schema.update_classification(table, temperature, workload);
}
}

HeliosDB-Lite Integration

1. Branch-Based Data Locality

Route queries based on branch data location:

impl SchemaAwareRouter {
    /// Routes `query` to a node that both holds `branch` and satisfies the
    /// query's capability requirements.
    ///
    /// When no eligible node holds the branch, it falls back to the primary
    /// for that branch.
    pub fn route_with_branch(&self, query: &str, branch: &str) -> RoutingDecision {
        // Nodes that have the branch data.
        let branch_nodes = self.schema.get_branch_locations(branch);
        // Filter by query requirements exactly as `route` does: derive the
        // required capabilities first. `filter_by_capabilities` takes
        // capabilities, not the analysis.
        let analysis = self.analyzer.analyze(query);
        let required_capabilities = self.get_required_capabilities(&analysis);
        let eligible = self.filter_by_capabilities(&required_capabilities);
        // Intersection of capable nodes and branch holders.
        let available: Vec<_> = eligible
            .iter()
            .filter(|n| branch_nodes.contains(&n.id))
            .collect();
        if available.is_empty() {
            // Branch not replicated to any eligible node.
            return RoutingDecision::primary_with_branch(branch);
        }
        self.select_best(&available, &analysis)
    }
}

2. Time-Temperature Routing

Route historical queries to cold storage:

impl SchemaAwareRouter {
    /// Routes a time-travel (`AS OF`) query by the age of the requested snapshot.
    ///
    /// * Younger than 7 days → hot nodes.
    /// * Younger than 30 days → warm nodes.
    /// * Anything older → cold/archive nodes.
    ///
    /// A future `as_of` yields a negative age and therefore goes to hot nodes.
    pub fn route_time_travel(&self, query: &str, as_of: Timestamp) -> RoutingDecision {
        let snapshot_age = Utc::now().signed_duration_since(as_of);
        if snapshot_age >= Duration::days(30) {
            self.route_to_cold_nodes(query)
        } else if snapshot_age >= Duration::days(7) {
            self.route_to_warm_nodes(query)
        } else {
            self.route_to_hot_nodes(query)
        }
    }
}

3. Vector Index Routing

Route based on vector index availability:

impl SchemaAwareRouter {
    /// Routes a vector query to a node that already hosts the table's vector
    /// index, preferring one where the index is loaded in memory.
    ///
    /// Tables are tried in the order `extract_tables` returns them. The first
    /// table whose index is hosted somewhere decides the route. Otherwise the
    /// query falls back to `route_vector_default`.
    pub fn route_vector_query(&self, query: &str) -> RoutingDecision {
        let tables = self.analyzer.extract_tables(query);
        for table in tables {
            let Some(index) = self.schema.get_vector_index(&table) else {
                continue;
            };
            let nodes_with_index = self.schema.get_index_locations(&index);
            // Prefer a node with the index in memory. `max_by_key` yields an
            // `Option` and is `None` exactly when no node hosts the index, so
            // no separate emptiness check is needed. On ties the last maximal
            // node wins.
            let best = nodes_with_index
                .iter()
                .max_by_key(|n| n.index_in_memory.get(&index).copied().unwrap_or(false));
            if let Some(best) = best {
                return RoutingDecision::single(best.clone());
            }
        }
        // No pre-loaded index, route to any vector-capable node.
        self.route_vector_default(query)
    }
}

4. Per-Column Storage Routing

Route based on column storage type:

impl SchemaAwareRouter {
    /// Routes by the storage engine of the columns the query touches.
    ///
    /// Any columnar column sends the query to columnar nodes. Otherwise, any
    /// content-addressed column sends it to content-store nodes. Everything
    /// else takes the default route. Both column checks are always evaluated.
    pub fn route_by_column_storage(&self, query: &str) -> RoutingDecision {
        let touched = self.analyzer.extract_columns(query);
        let reads_columnar = touched.iter().any(|col| self.schema.is_columnar_column(col));
        let reads_content_addressed = touched.iter().any(|col| self.schema.is_content_addressed(col));
        match (reads_columnar, reads_content_addressed) {
            (true, _) => self.route_to_columnar_nodes(query),
            (false, true) => self.route_to_content_store_nodes(query),
            (false, false) => self.route_default(query),
        }
    }
}

Implementation Notes

File Locations

src/proxy/
├── schema_routing/
│ ├── mod.rs # Public API
│ ├── registry.rs # SchemaRegistry
│ ├── analyzer.rs # QueryAnalyzer
│ ├── router.rs # SchemaAwareRouter
│ ├── classifier.rs # LearningClassifier
│ ├── discovery.rs # Schema auto-discovery
│ └── metrics.rs # Routing metrics

Key Considerations

  1. Schema Sync: Keep schema registry in sync with actual database schema.

  2. Classification Updates: Allow runtime updates to classifications.

  3. Cross-Table Queries: Handle queries spanning multiple classifications (e.g., a join between a hot OLTP table and a cold OLAP table).

  4. Fallback: Gracefully degrade when preferred nodes are unavailable.

  5. Metrics: Track routing effectiveness per classification.

Auto-Discovery

/// Discovers table metadata and query statistics from a PostgreSQL-compatible catalog.
pub struct SchemaDiscovery {
    db: Arc<DatabasePool>,
}
impl SchemaDiscovery {
    /// Reads table sizes from `pg_catalog` and per-statement statistics from
    /// `pg_stat_statements`, then classifies every table.
    ///
    /// # Errors
    /// Propagates any database error. This includes the case where the
    /// `pg_stat_statements` extension is not installed.
    /// NOTE(review): consider making that second query best-effort.
    pub async fn discover(&self) -> Result<Vec<TableSchema>> {
        // Ordinary ('r') and partitioned ('p') tables outside system schemas.
        // `reltuples` is -1 for never-analyzed tables (PostgreSQL 14+), so it
        // is clamped at 0. Column names match the original query's output
        // (`table_name`, `reltuples`, `pg_relation_size`).
        let tables = self
            .db
            .query(
                "SELECT c.relname AS table_name,
                        GREATEST(c.reltuples, 0) AS reltuples,
                        pg_relation_size(c.oid)
                 FROM pg_catalog.pg_class c
                 JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                 WHERE c.relkind IN ('r', 'p')
                   AND n.nspname NOT IN ('pg_catalog', 'information_schema')
                   AND n.nspname NOT LIKE 'pg_toast%'",
            )
            .await?;
        // PostgreSQL 13 replaced `mean_time` with `mean_exec_time` (planning
        // time is reported separately). Alias it back so consumers reading
        // `mean_time` keep working. PostgreSQL <= 12 would need the old name.
        let patterns = self
            .db
            .query("SELECT query, calls, mean_exec_time AS mean_time FROM pg_stat_statements")
            .await?;
        self.classify_tables(&tables, &patterns)
    }
}

Performance Targets

| Metric | Target | Measurement |
|---|---|---|
| Query analysis | < 1 ms | p99 |
| Routing decision | < 100 μs | p99 |
| Schema refresh | < 5 s | full refresh |
| Classification accuracy | > 90% | vs. optimal routing |