Skip to content

Feature 10: Query Rewriting

Feature 10: Query Rewriting

Priority: Medium | Complexity: Medium | Phase: 3 (Enterprise)


Overview

Problem Statement

Applications often send suboptimal queries:

  • Legacy queries that could be optimized
  • Missing indexes not utilized
  • Inefficient patterns (N+1, SELECT *)
  • Cross-platform compatibility issues

Manual query optimization requires:

  • Code changes across applications
  • Testing and deployment cycles
  • Coordination between teams

Solution

Implement transparent query rewriting at the proxy layer:

┌─────────────────────────────────────────────────┐
│ QUERY REWRITER │
│ │
Original ────────►│ ┌──────────────────────────────────────────┐ │
Query │ │ 1. Parse Query │ │
│ │ - SQL AST generation │ │
│ │ - Fingerprint extraction │ │
│ └──────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ 2. Match Rewrite Rules │ │
│ │ - Pattern matching │ │
│ │ - Condition evaluation │ │
│ └──────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ 3. Apply Transformations │ │
│ │ - Index hints │ │
│ │ - Query restructuring │ │
│ │ - Parameter injection │ │
│ └──────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Rewritten Query ─────────────────────►│
└─────────────────────────────────────────────────┘

Architecture

Query Rewriter

pub struct QueryRewriter {
/// SQL parser
parser: SqlParser,
/// Rewrite rules
rules: Vec<RewriteRule>,
/// Rule matcher
matcher: RuleMatcher,
/// Metrics
metrics: RewriteMetrics,
}
#[derive(Debug, Clone)]
pub struct RewriteRule {
/// Rule identifier
pub id: String,
/// Human-readable description
pub description: String,
/// Pattern to match
pub pattern: QueryPattern,
/// Transformation to apply
pub transformation: Transformation,
/// Condition for applying rule
pub condition: Option<Condition>,
/// Priority (higher = applied first)
pub priority: i32,
/// Enabled/disabled
pub enabled: bool,
}
#[derive(Debug, Clone)]
pub enum QueryPattern {
/// Match by fingerprint hash
Fingerprint(u64),
/// Match by SQL pattern (regex)
Regex(String),
/// Match by AST pattern
Ast(AstPattern),
/// Match by table name
Table(String),
/// Match all queries
All,
}
#[derive(Debug, Clone)]
pub enum Transformation {
/// Replace entire query
Replace(String),
/// Add index hint
AddIndexHint { table: String, index: String },
/// Rewrite SELECT * to specific columns
ExpandSelectStar { columns: Vec<String> },
/// Add LIMIT clause
AddLimit(u32),
/// Add WHERE condition
AddWhereClause(String),
/// Replace table name
ReplaceTable { from: String, to: String },
/// Custom transformation function
Custom(String),
}

Rule Matcher

pub struct RuleMatcher {
/// Fingerprint index for fast lookup
fingerprint_index: HashMap<u64, Vec<usize>>,
/// Regex patterns (compiled)
regex_patterns: Vec<(Regex, usize)>,
/// Table index
table_index: HashMap<String, Vec<usize>>,
}
impl RuleMatcher {
pub fn match_rules(&self, query: &str, fingerprint: u64) -> Vec<&RewriteRule> {
let mut matches = Vec::new();
// Check fingerprint matches (fast path)
if let Some(indices) = self.fingerprint_index.get(&fingerprint) {
for &idx in indices {
matches.push(&self.rules[idx]);
}
}
// Check regex matches
for (regex, idx) in &self.regex_patterns {
if regex.is_match(query) {
matches.push(&self.rules[*idx]);
}
}
// Sort by priority
matches.sort_by_key(|r| -r.priority);
matches
}
}

Transformation Engine

pub struct TransformationEngine {
parser: SqlParser,
}
impl TransformationEngine {
pub fn apply(&self, query: &str, transformation: &Transformation) -> Result<String, RewriteError> {
match transformation {
Transformation::Replace(replacement) => {
Ok(replacement.clone())
}
Transformation::AddIndexHint { table, index } => {
let mut ast = self.parser.parse(query)?;
self.add_index_hint(&mut ast, table, index);
Ok(self.parser.to_sql(&ast))
}
Transformation::ExpandSelectStar { columns } => {
let mut ast = self.parser.parse(query)?;
self.expand_select_star(&mut ast, columns);
Ok(self.parser.to_sql(&ast))
}
Transformation::AddLimit(limit) => {
let mut ast = self.parser.parse(query)?;
if !self.has_limit(&ast) {
self.add_limit(&mut ast, *limit);
}
Ok(self.parser.to_sql(&ast))
}
Transformation::AddWhereClause(condition) => {
let mut ast = self.parser.parse(query)?;
self.add_where_clause(&mut ast, condition);
Ok(self.parser.to_sql(&ast))
}
Transformation::ReplaceTable { from, to } => {
let mut ast = self.parser.parse(query)?;
self.replace_table(&mut ast, from, to);
Ok(self.parser.to_sql(&ast))
}
Transformation::Custom(function_name) => {
self.apply_custom_function(query, function_name)
}
}
}
fn expand_select_star(&self, ast: &mut Ast, columns: &[String]) {
// Find SELECT * and replace with column list
if let Some(select) = ast.as_select_mut() {
if select.projection.iter().any(|p| matches!(p, SelectItem::Wildcard)) {
select.projection = columns.iter()
.map(|c| SelectItem::UnnamedExpr(Expr::Identifier(c.clone())))
.collect();
}
}
}
}

API Specification

Configuration (heliosproxy.toml)

[rewriter]
enabled = true
log_rewrites = true
# Built-in rules
expand_select_star = true
add_default_limit = true
default_limit = 1000
# Custom rules
[[rewriter.rules]]
id = "optimize_user_lookup"
description = "Add index hint for user lookups by email"
pattern = { regex = "SELECT .* FROM users WHERE email = .*" }
transformation = { add_index_hint = { table = "users", index = "idx_users_email" } }
priority = 100
[[rewriter.rules]]
id = "replace_legacy_table"
description = "Redirect queries from old table to new table"
pattern = { table = "old_users" }
transformation = { replace_table = { from = "old_users", to = "users_v2" } }
priority = 90
[[rewriter.rules]]
id = "limit_analytics"
description = "Add limit to analytics queries"
pattern = { regex = "SELECT .* FROM analytics_events.*" }
transformation = { add_limit = 10000 }
condition = { no_existing_limit = true }
priority = 50
[[rewriter.rules]]
id = "expand_orders_star"
description = "Expand SELECT * on orders table"
pattern = { ast = { select_star = true, table = "orders" } }
transformation = { expand_select_star = { columns = ["id", "user_id", "status", "total", "created_at"] } }
priority = 40
[[rewriter.rules]]
id = "add_tenant_filter"
description = "Add tenant filter for multi-tenant queries"
pattern = { table = "projects" }
transformation = { add_where_clause = "tenant_id = current_setting('app.tenant_id')::uuid" }
condition = { session_var = { name = "app.tenant_id", exists = true } }
priority = 200

Admin API

GET /rewriter/rules
{
"rules": [
{
"id": "optimize_user_lookup",
"description": "Add index hint for user lookups by email",
"enabled": true,
"priority": 100,
"matches_last_hour": 1234,
"avg_time_saved_ms": 5.2
}
]
}
POST /rewriter/rules
# Add new rule dynamically
{
"id": "new_rule",
"pattern": { "regex": "SELECT .* FROM slow_table" },
"transformation": { "add_limit": 100 }
}
PUT /rewriter/rules/{rule_id}
# Update existing rule
{ "enabled": false }
DELETE /rewriter/rules/{rule_id}
# Remove rule
POST /rewriter/test
# Test rewrite without executing
{
"query": "SELECT * FROM users WHERE email = 'test@example.com'"
}
Response:
{
"original": "SELECT * FROM users WHERE email = 'test@example.com'",
"rewritten": "SELECT * FROM users /*+ INDEX(users idx_users_email) */ WHERE email = 'test@example.com'",
"rules_applied": ["optimize_user_lookup"]
}
GET /rewriter/stats
{
"total_queries": 100000,
"queries_rewritten": 15234,
"rewrite_ratio": 0.152,
"estimated_time_saved_ms": 76170,
"by_rule": {
"optimize_user_lookup": { "matches": 5000, "time_saved_ms": 25000 }
}
}

AI/Agent Innovations

1. Semantic Query Normalization

Normalize natural language-influenced queries:

pub struct SemanticNormalizer {
/// Map natural patterns to SQL patterns
patterns: Vec<(String, String)>,
}
impl SemanticNormalizer {
pub fn normalize(&self, query: &str) -> String {
// "Get all users" -> "SELECT * FROM users"
// "Find orders from last week" -> "SELECT * FROM orders WHERE created_at > NOW() - INTERVAL '7 days'"
for (nl_pattern, sql_pattern) in &self.patterns {
if let Some(captures) = regex_match(nl_pattern, query) {
return self.apply_template(sql_pattern, &captures);
}
}
query.to_string()
}
}

2. RAG Query Optimization

Optimize embedding queries for RAG:

[[rewriter.rules]]
id = "optimize_rag_embedding"
description = "Optimize vector similarity queries"
pattern = { regex = "SELECT .* FROM embeddings.* ORDER BY .* <->.*" }
transformation = { custom = "optimize_hnsw_search" }
[[rewriter.rules]]
id = "batch_rag_lookups"
description = "Combine multiple chunk lookups into single query"
pattern = { ast = { n_plus_one_pattern = "chunks" } }
transformation = { custom = "batch_chunk_lookup" }

3. Agentic Query Safety

Add safety limits to agent queries:

pub struct AgentQuerySafetyRules {
/// Max rows for agent queries
max_rows: u32,
/// Timeout for agent queries
max_timeout: Duration,
/// Forbidden tables for agents
forbidden_tables: HashSet<String>,
}
impl AgentQuerySafetyRules {
pub fn apply(&self, query: &str, is_agent: bool) -> Result<String, RewriteError> {
if !is_agent {
return Ok(query.to_string());
}
let mut ast = self.parser.parse(query)?;
// Add limit if not present
if !self.has_limit(&ast) {
self.add_limit(&mut ast, self.max_rows);
}
// Check for forbidden tables
for table in self.extract_tables(&ast) {
if self.forbidden_tables.contains(&table) {
return Err(RewriteError::ForbiddenTable(table));
}
}
Ok(self.parser.to_sql(&ast))
}
}

4. LLM Context Optimization

Optimize queries for LLM context building:

[[rewriter.rules]]
id = "optimize_context_retrieval"
description = "Limit and structure context queries for LLM"
pattern = { regex = "SELECT .* FROM conversation_history.*" }
transformation = { custom = "format_for_llm_context" }
fn format_for_llm_context(&self, query: &str) -> String {
// Add limit and ordering for context window
let mut ast = self.parser.parse(query)?;
// Limit to recent turns
self.add_limit(&mut ast, 20);
// Order by recency
self.add_order_by(&mut ast, "turn_number", "DESC");
// Select only needed columns
self.select_columns(&mut ast, vec!["role", "content", "timestamp"]);
self.parser.to_sql(&ast)
}

HeliosDB-Lite Integration

1. Branch Query Routing

Rewrite queries to target specific branches:

[[rewriter.rules]]
id = "route_analytics_to_branch"
description = "Route analytics queries to analytics branch"
pattern = { table = "analytics_events" }
transformation = { custom = "add_branch_hint" }
fn add_branch_hint(&self, query: &str) -> String {
format!("/*helios:branch=analytics*/ {}", query)
}

2. Time-Travel Query Enhancement

Auto-add time-travel clauses:

[[rewriter.rules]]
id = "add_time_travel"
description = "Add AS OF clause for audit queries"
pattern = { regex = "SELECT .* FROM audit_log.*" }
condition = { session_var = { name = "audit.as_of", exists = true } }
transformation = { custom = "add_as_of_clause" }

3. Vector Query Optimization

Optimize vector search queries:

pub fn optimize_vector_query(&self, query: &str) -> String {
let mut ast = self.parser.parse(query)?;
// Add ef_search hint for HNSW
if self.is_ann_query(&ast) {
self.add_hint(&mut ast, "ef_search", "100");
}
// Rewrite to use index
if self.can_use_hnsw_index(&ast) {
self.add_index_hint(&mut ast, "embeddings", "idx_embeddings_hnsw");
}
self.parser.to_sql(&ast)
}

4. Sync Mode Query Hints

Add sync mode hints based on query type:

[[rewriter.rules]]
id = "critical_read_sync"
description = "Route critical reads to sync standbys"
pattern = { regex = "SELECT .* FROM (accounts|transactions|balances).*" }
transformation = { custom = "add_sync_hint" }

Implementation Notes

File Locations

src/proxy/
├── rewriter/
│ ├── mod.rs # Public API
│ ├── rewriter.rs # QueryRewriter
│ ├── rules.rs # RewriteRule, QueryPattern, Transformation
│ ├── matcher.rs # RuleMatcher
│ ├── transformer.rs # TransformationEngine
│ ├── parser.rs # SQL parser integration
│ └── metrics.rs # Rewrite metrics

Key Considerations

  1. Correctness: Ensure rewrites preserve query semantics. Test thoroughly.

  2. Performance: Rewriting adds latency. Cache rewritten queries by fingerprint.

  3. Debugging: Log original and rewritten queries for troubleshooting.

  4. Rollback: Easy way to disable rules if they cause issues.

  5. Prepared Statements: Handle parameterized queries correctly.

SQL Parser Integration

// Using sqlparser-rs
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;
pub struct SqlParser {
dialect: PostgreSqlDialect,
}
impl SqlParser {
pub fn parse(&self, sql: &str) -> Result<Vec<Statement>, ParseError> {
Parser::parse_sql(&self.dialect, sql)
}
pub fn to_sql(&self, statements: &[Statement]) -> String {
statements.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>()
.join("; ")
}
}

Performance Targets

MetricTargetMeasurement
Rule matching<100μsp99
Query transformation<500μsp99
Overall rewrite overhead<1msp99
Cache hit for fingerprint>90%for repeated queries