Feature 07: Query Analytics & Slow Query Log
Priority: Medium | Complexity: Medium | Phase: 2 (Resilience)
Overview
Problem Statement
Understanding query patterns is essential for optimization:
- Which queries consume the most resources?
- What’s causing slow response times?
- Are there N+1 query patterns?
- Which queries should be cached?
Without proxy-level analytics:
- Must enable `pg_stat_statements` on each node
- No visibility into cross-node patterns
- Can’t correlate queries with client behavior
- Hard to attribute costs to applications/users
Solution
Implement comprehensive query analytics at the proxy layer:
                  ┌─────────────────────────────────────────────────┐
                  │             QUERY ANALYTICS ENGINE              │
                  │                                                 │
Query ───────────►│  ┌──────────────────────────────────────────┐   │
                  │  │ 1. Query Fingerprinting                  │   │
                  │  │    - Normalize query                     │   │
                  │  │    - Extract pattern                     │   │
                  │  │    - Compute fingerprint hash            │   │
                  │  └──────────────────────────────────────────┘   │
                  │                       │                         │
                  │                       ▼                         │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │ 2. Metrics Collection                    │   │
                  │  │    - Execution time                      │   │
                  │  │    - Rows returned                       │   │
                  │  │    - Bytes transferred                   │   │
                  │  │    - Error rate                          │   │
                  │  └──────────────────────────────────────────┘   │
                  │                       │                         │
                  │                       ▼                         │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │ 3. Pattern Analysis                      │   │
                  │  │    - N+1 detection                       │   │
                  │  │    - Query clustering                    │   │
                  │  │    - Anomaly detection                   │   │
                  │  └──────────────────────────────────────────┘   │
                  │                       │                         │
                  │                       ▼                         │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │ 4. Reporting & Alerts                    │   │
                  │  │    - Top queries dashboard               │   │
                  │  │    - Slow query log                      │   │
                  │  │    - Performance alerts                  │   │
                  │  └──────────────────────────────────────────┘   │
                  └─────────────────────────────────────────────────┘

Architecture
Query Fingerprinter
pub struct QueryFingerprinter {
    parser: SqlParser,
}

impl QueryFingerprinter {
    /// Generate a fingerprint from a raw query string.
    pub fn fingerprint(&self, query: &str) -> QueryFingerprint {
        let normalized = self.normalize(query);
        let hash = xxhash64(&normalized);

        QueryFingerprint {
            hash,
            normalized,
            tables: self.extract_tables(query),
            operation: self.detect_operation(query),
        }
    }

    /// Normalize a query (remove literals, standardize whitespace and case).
    fn normalize(&self, query: &str) -> String {
        let mut normalized = query.to_string();

        // Replace string literals with ?
        normalized = regex_replace!(r"'[^']*'", "?", &normalized);

        // Replace numeric literals with ?
        normalized = regex_replace!(r"\b\d+\b", "?", &normalized);

        // Replace IN lists with (?); (?i) so that a lowercase `in (...)` also matches
        normalized = regex_replace!(r"(?i)\bIN\s*\([^)]+\)", "IN (?)", &normalized);

        // Normalize whitespace
        normalized = regex_replace!(r"\s+", " ", &normalized);

        normalized.trim().to_lowercase()
    }
}
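The normalization steps above can also be approximated without a regex dependency. The following is a minimal sketch under stated assumptions, not the proxy's implementation: `normalize_sketch` and `fingerprint_hash` are hypothetical names, the standard library's `DefaultHasher` stands in for xxhash64, and IN-list collapsing is left out for brevity.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical, regex-free approximation of `normalize`: string and numeric
/// literals become `?`, whitespace runs collapse to one space, and the result
/// is lowercased. IN-list collapsing is intentionally omitted.
pub fn normalize_sketch(query: &str) -> String {
    let mut out = String::with_capacity(query.len());
    let mut chars = query.chars().peekable();
    let mut prev_ident = false; // was the previous char part of an identifier?

    while let Some(c) = chars.next() {
        if c == '\'' {
            // Skip to the closing quote; a doubled '' is an escaped quote.
            while let Some(n) = chars.next() {
                if n == '\'' {
                    if chars.peek() == Some(&'\'') {
                        chars.next();
                    } else {
                        break;
                    }
                }
            }
            out.push('?');
            prev_ident = false;
        } else if c.is_ascii_digit() && !prev_ident {
            // A digit that does not continue an identifier starts a numeric literal.
            while matches!(chars.peek(), Some(n) if n.is_ascii_digit() || *n == '.') {
                chars.next();
            }
            out.push('?');
            prev_ident = false;
        } else if c.is_whitespace() {
            // Collapse whitespace runs into a single space.
            if !out.ends_with(' ') {
                out.push(' ');
            }
            prev_ident = false;
        } else {
            prev_ident = c.is_alphanumeric() || c == '_';
            out.extend(c.to_lowercase());
        }
    }
    out.trim().to_string()
}

/// Hash the normalized text. `DefaultHasher` is a stand-in for xxhash64.
pub fn fingerprint_hash(query: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    normalize_sketch(query).hash(&mut hasher);
    hasher.finish()
}
```

With this sketch, `SELECT * FROM users WHERE id = 1` and `select * from users where id = 2` normalize to the same text and therefore share a fingerprint, while digits inside identifiers such as `t1` are left alone.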
#[derive(Debug, Clone)]
pub struct QueryFingerprint {
    /// 64-bit hash of normalized query
    pub hash: u64,

    /// Normalized query text
    pub normalized: String,

    /// Tables involved
    pub tables: Vec<String>,

    /// Operation type
    pub operation: OperationType,
}

Query Statistics
pub struct QueryStatistics {
    /// Query fingerprint
    pub fingerprint: QueryFingerprint,

    /// Call count
    pub calls: AtomicU64,

    /// Total execution time (microseconds)
    pub total_time: AtomicU64,

    /// Min/Max execution time (microseconds)
    pub min_time: AtomicU64,
    pub max_time: AtomicU64,

    /// Rows returned (total)
    pub rows: AtomicU64,

    /// Errors
    pub errors: AtomicU64,

    /// Histogram buckets (for percentiles)
    pub latency_histogram: Histogram,

    /// First/last seen
    pub first_seen: Instant,
    pub last_seen: AtomicU64,

    /// User attribution
    pub users: DashMap<String, u64>,

    /// Client attribution
    pub clients: DashMap<String, u64>,
}
impl QueryStatistics {
    pub fn record(&self, execution: &QueryExecution) {
        let micros = execution.duration.as_micros() as u64;

        self.calls.fetch_add(1, Ordering::Relaxed);
        self.total_time.fetch_add(micros, Ordering::Relaxed);
        self.rows.fetch_add(execution.rows as u64, Ordering::Relaxed);

        if execution.error.is_some() {
            self.errors.fetch_add(1, Ordering::Relaxed);
        }

        // Update min/max atomically (min_time must be initialized to u64::MAX)
        self.min_time.fetch_min(micros, Ordering::Relaxed);
        self.max_time.fetch_max(micros, Ordering::Relaxed);

        // Record in histogram
        self.latency_histogram.record(execution.duration);

        // Update user/client attribution
        *self.users.entry(execution.user.clone()).or_insert(0) += 1;
        *self.clients.entry(execution.client_ip.clone()).or_insert(0) += 1;

        self.last_seen.store(now_nanos(), Ordering::Relaxed);
    }

    pub fn avg_time(&self) -> Duration {
        let total = self.total_time.load(Ordering::Relaxed);
        let calls = self.calls.load(Ordering::Relaxed);
        // max(1) avoids division by zero before the first call is recorded
        Duration::from_micros(total / calls.max(1))
    }

    pub fn p99_time(&self) -> Duration {
        self.latency_histogram.percentile(0.99)
    }
}

Slow Query Log
pub struct SlowQueryLog {
    /// Threshold for slow queries
    threshold: Duration,

    /// Log file writer
    writer: Arc<Mutex<BufWriter<File>>>,

    /// In-memory buffer for recent queries (behind a Mutex because `log_if_slow` takes `&self`)
    recent: Mutex<VecDeque<SlowQueryEntry>>,

    /// Max entries in memory
    max_recent: usize,
}

#[derive(Debug, Clone, Serialize)]
pub struct SlowQueryEntry {
    pub timestamp: DateTime<Utc>,
    pub fingerprint_hash: u64,
    pub query: String,
    pub duration: Duration,
    pub rows: usize,
    pub user: String,
    pub client_ip: String,
    pub database: String,
    pub node: String,
    pub parameters: Option<Vec<String>>,
}

impl SlowQueryLog {
    pub fn log_if_slow(&self, execution: &QueryExecution) {
        if execution.duration < self.threshold {
            return;
        }

        let entry = SlowQueryEntry {
            timestamp: Utc::now(),
            fingerprint_hash: execution.fingerprint.hash,
            query: execution.query.clone(),
            duration: execution.duration,
            rows: execution.rows,
            user: execution.user.clone(),
            client_ip: execution.client_ip.clone(),
            database: execution.database.clone(),
            node: execution.node.clone(),
            parameters: execution.parameters.clone(),
        };

        // Write to log file as one JSON object per line; serialization and
        // I/O failures are skipped so logging never fails the query path
        if let (Ok(mut writer), Ok(json)) = (self.writer.lock(), serde_json::to_string(&entry)) {
            writeln!(writer, "{}", json).ok();
        }

        // Keep in memory for API access, evicting the oldest entries first
        if let Ok(mut recent) = self.recent.lock() {
            recent.push_back(entry);
            while recent.len() > self.max_recent {
                recent.pop_front();
            }
        }
    }
}

Pattern Detector
pub struct PatternDetector {
    /// Recent queries per session (for N+1 detection)
    session_queries: DashMap<SessionId, VecDeque<QueryExecution>>,

    /// Pattern configurations
    config: PatternConfig,
}

impl PatternDetector {
    /// Detect N+1 query pattern
    pub fn detect_n_plus_one(&self, session: &str) -> Option<NplusOnePattern> {
        let queries = self.session_queries.get(session)?;

        // Look for repeated similar queries in short time window
        let window = Duration::from_secs(1);
        let recent: Vec<_> = queries.iter()
            .filter(|q| q.timestamp.elapsed() < window)
            .collect();

        // Group by fingerprint
        let mut by_fingerprint: HashMap<u64, Vec<_>> = HashMap::new();
        for q in &recent {
            by_fingerprint.entry(q.fingerprint.hash)
                .or_default()
                .push(q);
        }

        // Find patterns with many repetitions
        for (hash, group) in by_fingerprint {
            if group.len() >= self.config.n_plus_one_threshold {
                return Some(NplusOnePattern {
                    fingerprint_hash: hash,
                    count: group.len(),
                    example_query: group[0].query.clone(),
                    total_time: group.iter().map(|q| q.duration).sum(),
                });
            }
        }

        None
    }

    /// Detect query burst
    pub fn detect_burst(&self, session: &str) -> Option<QueryBurst> {
        let queries = self.session_queries.get(session)?;

        let window = Duration::from_millis(100);
        let count = queries.iter()
            .filter(|q| q.timestamp.elapsed() < window)
            .count();

        if count >= self.config.burst_threshold {
            Some(QueryBurst {
                queries_per_second: count as f64 / window.as_secs_f64(),
                session: session.to_string(),
            })
        } else {
            None
        }
    }
}

API Specification
Configuration (heliosproxy.toml)
[analytics]
enabled = true

# Query fingerprinting
normalize_queries = true
track_parameters = false  # Store parameter values (privacy)

# Statistics retention
retention = "7d"
max_fingerprints = 10000

# Slow query log
[analytics.slow_query]
enabled = true
threshold = "1s"
log_file = "/var/log/heliosproxy/slow.log"
log_parameters = false  # Privacy consideration
max_query_length = 4096

# Pattern detection
[analytics.patterns]
n_plus_one_detection = true
n_plus_one_threshold = 5  # 5+ similar queries = N+1
burst_detection = true
burst_threshold = 50      # 50 queries/100ms = burst

# Sampling (for high-volume)
[analytics.sampling]
enabled = false
rate = 0.1  # Sample 10% of queries

# Alerting
[analytics.alerts]
slow_query_threshold = "5s"
error_rate_threshold = 0.05  # 5% error rate
webhook_url = "https://alerts.example.com/webhook"

Admin API
GET /analytics/top-queries?orderBy=total_time&limit=10
{
  "queries": [
    {
      "fingerprint_hash": "abc123",
      "normalized": "SELECT * FROM users WHERE id = ?",
      "calls": 15234,
      "total_time_ms": 45678,
      "avg_time_ms": 3.0,
      "p99_time_ms": 15.2,
      "rows": 15234,
      "errors": 5,
      "tables": ["users"],
      "first_seen": "2026-01-20T00:00:00Z",
      "last_seen": "2026-01-25T10:30:00Z"
    }
  ]
}

GET /analytics/slow-queries?since=1h&limit=50
{
  "queries": [
    {
      "timestamp": "2026-01-25T10:29:55Z",
      "query": "SELECT * FROM orders WHERE ...",
      "duration_ms": 5234,
      "rows": 10000,
      "user": "app_user",
      "client_ip": "192.168.1.100",
      "node": "primary"
    }
  ]
}

GET /analytics/patterns
{
  "n_plus_one": [
    {
      "fingerprint_hash": "def456",
      "query": "SELECT * FROM products WHERE id = ?",
      "occurrences": 47,
      "session": "session_123",
      "total_time_ms": 234
    }
  ],
  "bursts": [
    {
      "session": "session_456",
      "qps": 150,
      "detected_at": "2026-01-25T10:28:00Z"
    }
  ]
}
GET /analytics/histogram?fingerprint=abc123
# Latency distribution for specific query
{
  "buckets": [
    {"le": 1, "count": 100},
    {"le": 5, "count": 500},
    {"le": 10, "count": 150},
    {"le": 50, "count": 30},
    {"le": 100, "count": 10},
    {"le": "+Inf", "count": 5}
  ],
  "p50": 3.2,
  "p90": 12.5,
  "p99": 45.3
}
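Percentiles like those in the histogram response can be estimated from bucket counts. The sketch below is illustrative only: `BucketHistogram` and its methods are hypothetical names (not the `Histogram` type used by `QueryStatistics`), it assumes per-bucket rather than cumulative counts, and it reports the upper bound of the bucket that contains the requested quantile instead of interpolating inside the bucket.

```rust
/// Hypothetical fixed-bucket latency histogram (upper bounds in milliseconds).
pub struct BucketHistogram {
    bounds_ms: Vec<f64>, // ascending upper bounds; an implicit +Inf bucket follows
    counts: Vec<u64>,    // per-bucket (non-cumulative) counts, len = bounds + 1
}

impl BucketHistogram {
    pub fn new(bounds_ms: Vec<f64>) -> Self {
        let counts = vec![0; bounds_ms.len() + 1];
        Self { bounds_ms, counts }
    }

    /// Record one observation in the first bucket whose bound is >= value.
    pub fn record(&mut self, value_ms: f64) {
        let idx = self
            .bounds_ms
            .iter()
            .position(|b| value_ms <= *b)
            .unwrap_or(self.bounds_ms.len());
        self.counts[idx] += 1;
    }

    /// Upper bound of the bucket containing quantile `q` (0.0..=1.0).
    /// Returns None when empty and f64::INFINITY for the overflow bucket.
    pub fn percentile_upper_bound(&self, q: f64) -> Option<f64> {
        let total: u64 = self.counts.iter().sum();
        if total == 0 {
            return None;
        }
        // Nearest-rank: the smallest rank covering a fraction q of observations.
        let rank = ((q * total as f64).ceil() as u64).max(1);
        let mut seen: u64 = 0;
        for (i, count) in self.counts.iter().enumerate() {
            seen += *count;
            if seen >= rank {
                return Some(self.bounds_ms.get(i).copied().unwrap_or(f64::INFINITY));
            }
        }
        None
    }
}
```

Because the result is a bucket bound, precision depends entirely on how finely the bounds are chosen around the latencies of interest.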
GET /analytics/by-user?user=app_user&since=1h
# Query stats per user

GET /analytics/by-table?table=users&since=1h
# Query stats per table

POST /analytics/explain
# Get query plan for fingerprint
{ "fingerprint_hash": "abc123" }

Prometheus Metrics
# Query counts by fingerprint
heliosproxy_query_total{fingerprint="abc123",user="app",database="heliosdb"} 15234

# Query latency histogram
heliosproxy_query_duration_seconds_bucket{fingerprint="abc123",le="0.001"} 100
heliosproxy_query_duration_seconds_bucket{fingerprint="abc123",le="0.01"} 500
heliosproxy_query_duration_seconds_bucket{fingerprint="abc123",le="0.1"} 650
heliosproxy_query_duration_seconds_bucket{fingerprint="abc123",le="1"} 700
heliosproxy_query_duration_seconds_bucket{fingerprint="abc123",le="+Inf"} 710

# Slow queries
heliosproxy_slow_queries_total{database="heliosdb"} 45

# Pattern detections
heliosproxy_n_plus_one_detected_total 12
heliosproxy_burst_detected_total 3

AI/Agent Innovations
1. Query Intent Classification
Classify queries by AI workload type:
pub struct QueryClassifier {
    /// Classification rules
    rules: Vec<ClassificationRule>,
}

impl QueryClassifier {
    pub fn classify(&self, query: &str) -> QueryIntent {
        // Embedding search
        if query.contains("<->") || query.contains("vector") {
            return QueryIntent::EmbeddingSearch;
        }

        // Knowledge retrieval
        if query.contains("documents") || query.contains("chunks") {
            return QueryIntent::KnowledgeRetrieval;
        }

        // Conversation context
        if query.contains("conversation") || query.contains("turns") {
            return QueryIntent::ConversationContext;
        }

        // Tool execution
        if query.contains("tool_results") || query.contains("actions") {
            return QueryIntent::ToolExecution;
        }

        QueryIntent::General
    }
}
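The classifier above holds a `rules` vector but matches on hard-coded substrings. One way the rules could drive classification is sketched here; the shape of `ClassificationRule`, the `RuleClassifier` type, and the case-insensitive matching are all assumptions made for illustration.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryIntent {
    EmbeddingSearch,
    KnowledgeRetrieval,
    ConversationContext,
    ToolExecution,
    General,
}

/// Hypothetical rule shape: any keyword match selects the intent.
pub struct ClassificationRule {
    pub keywords: Vec<&'static str>,
    pub intent: QueryIntent,
}

fn rule(keywords: &[&'static str], intent: QueryIntent) -> ClassificationRule {
    ClassificationRule { keywords: keywords.to_vec(), intent }
}

pub struct RuleClassifier {
    rules: Vec<ClassificationRule>,
}

impl RuleClassifier {
    /// Rules mirroring the hard-coded checks, in the same priority order.
    pub fn with_default_rules() -> Self {
        Self {
            rules: vec![
                rule(&["<->", "vector"], QueryIntent::EmbeddingSearch),
                rule(&["documents", "chunks"], QueryIntent::KnowledgeRetrieval),
                rule(&["conversation", "turns"], QueryIntent::ConversationContext),
                rule(&["tool_results", "actions"], QueryIntent::ToolExecution),
            ],
        }
    }

    /// First matching rule wins; matching is case-insensitive.
    pub fn classify(&self, query: &str) -> QueryIntent {
        let lowered = query.to_lowercase();
        self.rules
            .iter()
            .find(|r| r.keywords.iter().any(|k| lowered.contains(*k)))
            .map(|r| r.intent)
            .unwrap_or(QueryIntent::General)
    }
}
```

Plain substring matching is cheap but coarse: a keyword such as `actions` also matches a table named `transactions`, so a production rule set would likely match against the extracted table names instead.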
// Analytics grouped by intent
GET /analytics/by-intent
{
  "embedding_search": {
    "calls": 50000,
    "avg_time_ms": 5.2,
    "cache_hit_ratio": 0.85
  },
  "knowledge_retrieval": {
    "calls": 30000,
    "avg_time_ms": 12.3,
    "cache_hit_ratio": 0.72
  }
}

2. RAG Pipeline Analytics
Track RAG-specific patterns:
pub struct RagAnalytics {
    /// Embedding queries per retrieval
    embeddings_per_retrieval: Histogram,

    /// Chunks retrieved per query
    chunks_per_query: Histogram,

    /// Time spent in each RAG stage
    stage_timings: HashMap<RagStage, Histogram>,
}

GET /analytics/rag
{
  "avg_embeddings_per_retrieval": 3.2,
  "avg_chunks_per_query": 15.4,
  "stage_breakdown": {
    "embedding": { "avg_ms": 5.2, "p99_ms": 20.1 },
    "retrieval": { "avg_ms": 12.3, "p99_ms": 45.6 },
    "reranking": { "avg_ms": 8.7, "p99_ms": 30.2 }
  }
}

3. Agentic Workflow Tracing
Track multi-step agent workflows:
pub struct WorkflowTracer {
    /// Active workflows
    workflows: DashMap<WorkflowId, WorkflowTrace>,
}

#[derive(Debug)]
pub struct WorkflowTrace {
    pub id: WorkflowId,
    pub started_at: Instant,
    pub steps: Vec<WorkflowStep>,
    pub total_queries: u32,
    pub total_time: Duration,
}

GET /analytics/workflows?since=1h
{
  "workflows": [
    {
      "id": "wf_123",
      "duration_ms": 2345,
      "steps": 5,
      "queries": 12,
      "status": "completed"
    }
  ],
  "avg_workflow_duration_ms": 1500,
  "avg_steps_per_workflow": 4.2
}

4. Cost Attribution
Attribute query costs to AI operations:
pub struct CostAttribution {
    /// Cost per query fingerprint
    costs: DashMap<u64, f64>,
}

impl CostAttribution {
    pub fn estimate_cost(&self, fingerprint: u64, duration: Duration) -> f64 {
        let base_cost = 0.0001; // $0.0001 per query
        let time_cost = duration.as_secs_f64() * 0.001; // $0.001 per second

        base_cost + time_cost
    }
}
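Summed over many executions, the per-query formula above becomes a flat fee times the query count plus a per-second fee times the total execution time. For 5,000 queries totaling 125.4 seconds this gives 5,000 × $0.0001 + 125.4 × $0.001 = $0.6254. A minimal sketch of that aggregate follows; `estimate_total_cost` and the constant names are hypothetical, and the rates are the placeholder values from `estimate_cost`.

```rust
use std::time::Duration;

/// Same pricing assumptions as `estimate_cost`: a flat per-query fee
/// plus a fee per second of execution time.
const BASE_COST_PER_QUERY_USD: f64 = 0.0001;
const TIME_COST_PER_SECOND_USD: f64 = 0.001;

/// Hypothetical aggregate helper: estimated cost of `queries` executions
/// that together took `total_time`.
pub fn estimate_total_cost(queries: u64, total_time: Duration) -> f64 {
    queries as f64 * BASE_COST_PER_QUERY_USD
        + total_time.as_secs_f64() * TIME_COST_PER_SECOND_USD
}
```

Keeping the rates in named constants (or in configuration) makes it easier to keep reported costs consistent with the formula when the rates change.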
GET /analytics/costs?by=agent
{
  "agents": [
    {
      "agent_id": "agent_1",
      "queries": 5000,
      "total_time_seconds": 125.4,
      "estimated_cost_usd": 0.63
    }
  ]
}

HeliosDB-Lite Integration
1. Branch-Aware Analytics
Track queries per branch:
GET /analytics/by-branch
{
  "main": { "calls": 50000, "avg_time_ms": 3.2 },
  "analytics": { "calls": 10000, "avg_time_ms": 45.6 }
}

2. Sync Mode Correlation
Correlate query performance with sync mode:
GET /analytics/by-sync-mode
{
  "sync": {
    "calls": 30000,
    "avg_time_ms": 2.1,
    "write_time_overhead_ms": 5.3
  },
  "async": {
    "calls": 20000,
    "avg_time_ms": 1.8,
    "lag_at_query_time_ms": 150
  }
}

3. Vector Query Analytics
Track vector search performance:
GET /analytics/vector
{
  "ann_queries": {
    "calls": 10000,
    "avg_time_ms": 5.2,
    "avg_k": 10,
    "avg_ef_search": 100,
    "cache_hit_ratio": 0.75
  },
  "index_stats": {
    "index_size_mb": 512,
    "vectors_indexed": 1000000
  }
}

4. Time-Travel Query Analytics
Track historical query patterns:
GET /analytics/time-travel
{
  "as_of_queries": {
    "calls": 5000,
    "avg_time_ms": 8.5,
    "time_range_accessed": {
      "min": "2025-01-01T00:00:00Z",
      "max": "2026-01-25T00:00:00Z"
    }
  }
}

Implementation Notes
File Locations
src/proxy/
├── analytics/
│   ├── mod.rs            # Public API
│   ├── fingerprinter.rs  # Query fingerprinting
│   ├── statistics.rs     # QueryStatistics
│   ├── slow_log.rs       # SlowQueryLog
│   ├── patterns.rs       # PatternDetector
│   ├── histogram.rs      # Latency histogram
│   └── metrics.rs        # Prometheus metrics

Key Considerations
- Memory Pressure: Limit fingerprint count, use approximate data structures (HyperLogLog, Count-Min Sketch).
- Sampling: For high-volume workloads, sample queries to reduce overhead.
- Privacy: Option to redact/hash parameter values and query text.
- Cardinality: Beware of high-cardinality labels in Prometheus metrics.
- Storage: Rotate slow query logs, export to external analytics.
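To illustrate the sampling consideration, here is a minimal sketch of a deterministic 1-in-N sampler that could back a setting like `rate = 0.1`. `QuerySampler` and its methods are hypothetical names, and periodic sampling is an assumption chosen for simplicity; a randomized sampler may be preferable when the workload itself is periodic.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical 1-in-N sampler; `every_n` is derived from the configured rate.
pub struct QuerySampler {
    every_n: u64,
    seen: AtomicU64,
}

impl QuerySampler {
    /// Build a sampler from a rate in (0.0, 1.0). Rates outside that range
    /// fall back to recording every query.
    pub fn from_rate(rate: f64) -> Self {
        let every_n = if rate > 0.0 && rate < 1.0 {
            (1.0 / rate).round() as u64
        } else {
            1
        };
        Self { every_n: every_n.max(1), seen: AtomicU64::new(0) }
    }

    /// Returns true for one query out of every `every_n`.
    pub fn should_record(&self) -> bool {
        self.seen.fetch_add(1, Ordering::Relaxed) % self.every_n == 0
    }

    /// Scale a sampled count back up to an estimate of the true count.
    pub fn scale(&self, sampled: u64) -> u64 {
        sampled * self.every_n
    }
}
```

Counters collected under sampling need to be scaled back up (as `scale` does) before they are compared with unsampled totals, and rare slow queries can be missed entirely unless the slow query log bypasses the sampler.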
Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| Fingerprinting overhead | <50μs | p99 |
| Stats recording | <10μs | p99 |
| Pattern detection | <1ms | per batch |
| Memory per fingerprint | <10KB | average |
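The p99 targets in this table can be checked with a small timing harness. The sketch below is one possible approach using only the standard library; `p99` and `measure_p99` are hypothetical helper names, the percentile uses the nearest-rank method, and timer overhead is not subtracted, so results for very short operations are upper bounds.

```rust
use std::time::{Duration, Instant};

/// Nearest-rank p99 of a set of samples; sorts the slice in place.
pub fn p99(samples: &mut [Duration]) -> Option<Duration> {
    if samples.is_empty() {
        return None;
    }
    samples.sort_unstable();
    // ceil(0.99 * n) in integer arithmetic; rank is 1-based.
    let rank = (samples.len() * 99 + 99) / 100;
    Some(samples[rank - 1])
}

/// Time `iterations` calls of `op` and return the p99 latency.
pub fn measure_p99<F: FnMut()>(iterations: usize, mut op: F) -> Option<Duration> {
    let mut samples = Vec::with_capacity(iterations);
    for _ in 0..iterations {
        let start = Instant::now();
        op();
        samples.push(start.elapsed());
    }
    p99(&mut samples)
}
```

A check against the fingerprinting target would then compare the returned duration with `Duration::from_micros(50)` after a warm-up run.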
Related Features
- Query Caching - Cache based on analytics
- Rate Limiting - Limit based on patterns
- Query Rewriting - Optimize detected patterns