Feature 10: Query Rewriting
Feature 10: Query Rewriting
Priority: Medium | Complexity: Medium | Phase: 3 (Enterprise)
Overview
Problem Statement
Applications often send suboptimal queries:
- Legacy queries that could be optimized
- Missing indexes not utilized
- Inefficient patterns (N+1, SELECT *)
- Cross-platform compatibility issues
Manual query optimization requires:
- Code changes across applications
- Testing and deployment cycles
- Coordination between teams
Solution
Implement transparent query rewriting at the proxy layer:
┌─────────────────────────────────────────────────┐ │ QUERY REWRITER │ │ │ Original ────────►│ ┌──────────────────────────────────────────┐ │ Query │ │ 1. Parse Query │ │ │ │ - SQL AST generation │ │ │ │ - Fingerprint extraction │ │ │ └──────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────────────────────────────┐ │ │ │ 2. Match Rewrite Rules │ │ │ │ - Pattern matching │ │ │ │ - Condition evaluation │ │ │ └──────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────────────────────────────┐ │ │ │ 3. Apply Transformations │ │ │ │ - Index hints │ │ │ │ - Query restructuring │ │ │ │ - Parameter injection │ │ │ └──────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ Rewritten Query ─────────────────────►│ └─────────────────────────────────────────────────┘Architecture
Query Rewriter
pub struct QueryRewriter { /// SQL parser parser: SqlParser,
/// Rewrite rules rules: Vec<RewriteRule>,
/// Rule matcher matcher: RuleMatcher,
/// Metrics metrics: RewriteMetrics,}
#[derive(Debug, Clone)]pub struct RewriteRule { /// Rule identifier pub id: String,
/// Human-readable description pub description: String,
/// Pattern to match pub pattern: QueryPattern,
/// Transformation to apply pub transformation: Transformation,
/// Condition for applying rule pub condition: Option<Condition>,
/// Priority (higher = applied first) pub priority: i32,
/// Enabled/disabled pub enabled: bool,}
#[derive(Debug, Clone)]pub enum QueryPattern { /// Match by fingerprint hash Fingerprint(u64),
/// Match by SQL pattern (regex) Regex(String),
/// Match by AST pattern Ast(AstPattern),
/// Match by table name Table(String),
/// Match all queries All,}
#[derive(Debug, Clone)]pub enum Transformation { /// Replace entire query Replace(String),
/// Add index hint AddIndexHint { table: String, index: String },
/// Rewrite SELECT * to specific columns ExpandSelectStar { columns: Vec<String> },
/// Add LIMIT clause AddLimit(u32),
/// Add WHERE condition AddWhereClause(String),
/// Replace table name ReplaceTable { from: String, to: String },
/// Custom transformation function Custom(String),}Rule Matcher
pub struct RuleMatcher { /// Fingerprint index for fast lookup fingerprint_index: HashMap<u64, Vec<usize>>,
/// Regex patterns (compiled) regex_patterns: Vec<(Regex, usize)>,
/// Table index table_index: HashMap<String, Vec<usize>>,}
impl RuleMatcher { pub fn match_rules(&self, query: &str, fingerprint: u64) -> Vec<&RewriteRule> { let mut matches = Vec::new();
// Check fingerprint matches (fast path) if let Some(indices) = self.fingerprint_index.get(&fingerprint) { for &idx in indices { matches.push(&self.rules[idx]); } }
// Check regex matches for (regex, idx) in &self.regex_patterns { if regex.is_match(query) { matches.push(&self.rules[*idx]); } }
// Sort by priority matches.sort_by_key(|r| -r.priority); matches }}Transformation Engine
pub struct TransformationEngine { parser: SqlParser,}
impl TransformationEngine { pub fn apply(&self, query: &str, transformation: &Transformation) -> Result<String, RewriteError> { match transformation { Transformation::Replace(replacement) => { Ok(replacement.clone()) }
Transformation::AddIndexHint { table, index } => { let mut ast = self.parser.parse(query)?; self.add_index_hint(&mut ast, table, index); Ok(self.parser.to_sql(&ast)) }
Transformation::ExpandSelectStar { columns } => { let mut ast = self.parser.parse(query)?; self.expand_select_star(&mut ast, columns); Ok(self.parser.to_sql(&ast)) }
Transformation::AddLimit(limit) => { let mut ast = self.parser.parse(query)?; if !self.has_limit(&ast) { self.add_limit(&mut ast, *limit); } Ok(self.parser.to_sql(&ast)) }
Transformation::AddWhereClause(condition) => { let mut ast = self.parser.parse(query)?; self.add_where_clause(&mut ast, condition); Ok(self.parser.to_sql(&ast)) }
Transformation::ReplaceTable { from, to } => { let mut ast = self.parser.parse(query)?; self.replace_table(&mut ast, from, to); Ok(self.parser.to_sql(&ast)) }
Transformation::Custom(function_name) => { self.apply_custom_function(query, function_name) } } }
fn expand_select_star(&self, ast: &mut Ast, columns: &[String]) { // Find SELECT * and replace with column list if let Some(select) = ast.as_select_mut() { if select.projection.iter().any(|p| matches!(p, SelectItem::Wildcard)) { select.projection = columns.iter() .map(|c| SelectItem::UnnamedExpr(Expr::Identifier(c.clone()))) .collect(); } } }}API Specification
Configuration (heliosproxy.toml)
[rewriter]enabled = truelog_rewrites = true
# Built-in rulesexpand_select_star = trueadd_default_limit = truedefault_limit = 1000
# Custom rules[[rewriter.rules]]id = "optimize_user_lookup"description = "Add index hint for user lookups by email"pattern = { regex = "SELECT .* FROM users WHERE email = .*" }transformation = { add_index_hint = { table = "users", index = "idx_users_email" } }priority = 100
[[rewriter.rules]]id = "replace_legacy_table"description = "Redirect queries from old table to new table"pattern = { table = "old_users" }transformation = { replace_table = { from = "old_users", to = "users_v2" } }priority = 90
[[rewriter.rules]]id = "limit_analytics"description = "Add limit to analytics queries"pattern = { regex = "SELECT .* FROM analytics_events.*" }transformation = { add_limit = 10000 }condition = { no_existing_limit = true }priority = 50
[[rewriter.rules]]id = "expand_orders_star"description = "Expand SELECT * on orders table"pattern = { ast = { select_star = true, table = "orders" } }transformation = { expand_select_star = { columns = ["id", "user_id", "status", "total", "created_at"] } }priority = 40
[[rewriter.rules]]id = "add_tenant_filter"description = "Add tenant filter for multi-tenant queries"pattern = { table = "projects" }transformation = { add_where_clause = "tenant_id = current_setting('app.tenant_id')::uuid" }condition = { session_var = { name = "app.tenant_id", exists = true } }priority = 200Admin API
GET /rewriter/rules{ "rules": [ { "id": "optimize_user_lookup", "description": "Add index hint for user lookups by email", "enabled": true, "priority": 100, "matches_last_hour": 1234, "avg_time_saved_ms": 5.2 } ]}
POST /rewriter/rules# Add new rule dynamically{ "id": "new_rule", "pattern": { "regex": "SELECT .* FROM slow_table" }, "transformation": { "add_limit": 100 }}
PUT /rewriter/rules/{rule_id}# Update existing rule{ "enabled": false }
DELETE /rewriter/rules/{rule_id}# Remove rule
POST /rewriter/test# Test rewrite without executing{ "query": "SELECT * FROM users WHERE email = 'test@example.com'"}
Response:{ "original": "SELECT * FROM users WHERE email = 'test@example.com'", "rewritten": "SELECT * FROM users /*+ INDEX(users idx_users_email) */ WHERE email = 'test@example.com'", "rules_applied": ["optimize_user_lookup"]}
GET /rewriter/stats{ "total_queries": 100000, "queries_rewritten": 15234, "rewrite_ratio": 0.152, "estimated_time_saved_ms": 76170, "by_rule": { "optimize_user_lookup": { "matches": 5000, "time_saved_ms": 25000 } }}AI/Agent Innovations
1. Semantic Query Normalization
Normalize natural language-influenced queries:
pub struct SemanticNormalizer { /// Map natural patterns to SQL patterns patterns: Vec<(String, String)>,}
impl SemanticNormalizer { pub fn normalize(&self, query: &str) -> String { // "Get all users" -> "SELECT * FROM users" // "Find orders from last week" -> "SELECT * FROM orders WHERE created_at > NOW() - INTERVAL '7 days'"
for (nl_pattern, sql_pattern) in &self.patterns { if let Some(captures) = regex_match(nl_pattern, query) { return self.apply_template(sql_pattern, &captures); } }
query.to_string() }}2. RAG Query Optimization
Optimize embedding queries for RAG:
[[rewriter.rules]]id = "optimize_rag_embedding"description = "Optimize vector similarity queries"pattern = { regex = "SELECT .* FROM embeddings.* ORDER BY .* <->.*" }transformation = { custom = "optimize_hnsw_search" }
[[rewriter.rules]]id = "batch_rag_lookups"description = "Combine multiple chunk lookups into single query"pattern = { ast = { n_plus_one_pattern = "chunks" } }transformation = { custom = "batch_chunk_lookup" }3. Agentic Query Safety
Add safety limits to agent queries:
pub struct AgentQuerySafetyRules { /// Max rows for agent queries max_rows: u32,
/// Timeout for agent queries max_timeout: Duration,
/// Forbidden tables for agents forbidden_tables: HashSet<String>,}
impl AgentQuerySafetyRules { pub fn apply(&self, query: &str, is_agent: bool) -> Result<String, RewriteError> { if !is_agent { return Ok(query.to_string()); }
let mut ast = self.parser.parse(query)?;
// Add limit if not present if !self.has_limit(&ast) { self.add_limit(&mut ast, self.max_rows); }
// Check for forbidden tables for table in self.extract_tables(&ast) { if self.forbidden_tables.contains(&table) { return Err(RewriteError::ForbiddenTable(table)); } }
Ok(self.parser.to_sql(&ast)) }}4. LLM Context Optimization
Optimize queries for LLM context building:
[[rewriter.rules]]id = "optimize_context_retrieval"description = "Limit and structure context queries for LLM"pattern = { regex = "SELECT .* FROM conversation_history.*" }transformation = { custom = "format_for_llm_context" }fn format_for_llm_context(&self, query: &str) -> String { // Add limit and ordering for context window let mut ast = self.parser.parse(query)?;
// Limit to recent turns self.add_limit(&mut ast, 20);
// Order by recency self.add_order_by(&mut ast, "turn_number", "DESC");
// Select only needed columns self.select_columns(&mut ast, vec!["role", "content", "timestamp"]);
self.parser.to_sql(&ast)}HeliosDB-Lite Integration
1. Branch Query Routing
Rewrite queries to target specific branches:
[[rewriter.rules]]id = "route_analytics_to_branch"description = "Route analytics queries to analytics branch"pattern = { table = "analytics_events" }transformation = { custom = "add_branch_hint" }fn add_branch_hint(&self, query: &str) -> String { format!("/*helios:branch=analytics*/ {}", query)}2. Time-Travel Query Enhancement
Auto-add time-travel clauses:
[[rewriter.rules]]id = "add_time_travel"description = "Add AS OF clause for audit queries"pattern = { regex = "SELECT .* FROM audit_log.*" }condition = { session_var = { name = "audit.as_of", exists = true } }transformation = { custom = "add_as_of_clause" }3. Vector Query Optimization
Optimize vector search queries:
pub fn optimize_vector_query(&self, query: &str) -> String { let mut ast = self.parser.parse(query)?;
// Add ef_search hint for HNSW if self.is_ann_query(&ast) { self.add_hint(&mut ast, "ef_search", "100"); }
// Rewrite to use index if self.can_use_hnsw_index(&ast) { self.add_index_hint(&mut ast, "embeddings", "idx_embeddings_hnsw"); }
self.parser.to_sql(&ast)}4. Sync Mode Query Hints
Add sync mode hints based on query type:
[[rewriter.rules]]id = "critical_read_sync"description = "Route critical reads to sync standbys"pattern = { regex = "SELECT .* FROM (accounts|transactions|balances).*" }transformation = { custom = "add_sync_hint" }Implementation Notes
File Locations
src/proxy/├── rewriter/│ ├── mod.rs # Public API│ ├── rewriter.rs # QueryRewriter│ ├── rules.rs # RewriteRule, QueryPattern, Transformation│ ├── matcher.rs # RuleMatcher│ ├── transformer.rs # TransformationEngine│ ├── parser.rs # SQL parser integration│ └── metrics.rs # Rewrite metricsKey Considerations
-
Correctness: Ensure rewrites preserve query semantics. Test thoroughly.
-
Performance: Rewriting adds latency. Cache rewritten queries by fingerprint.
-
Debugging: Log original and rewritten queries for troubleshooting.
-
Rollback: Easy way to disable rules if they cause issues.
-
Prepared Statements: Handle parameterized queries correctly.
SQL Parser Integration
// Using sqlparser-rsuse sqlparser::dialect::PostgreSqlDialect;use sqlparser::parser::Parser;
pub struct SqlParser { dialect: PostgreSqlDialect,}
impl SqlParser { pub fn parse(&self, sql: &str) -> Result<Vec<Statement>, ParseError> { Parser::parse_sql(&self.dialect, sql) }
pub fn to_sql(&self, statements: &[Statement]) -> String { statements.iter() .map(|s| s.to_string()) .collect::<Vec<_>>() .join("; ") }}Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| Rule matching | <100μs | p99 |
| Query transformation | <500μs | p99 |
| Overall rewrite overhead | <1ms | p99 |
| Cache hit for fingerprint | >90% | for repeated queries |
Related Features
- Query Analytics - Identify queries to optimize
- Query Routing Hints - Routing hints via rewriting
- Multi-Tenancy - Tenant filter injection