Feature 07: Query Analytics & Slow Query Log

Priority: Medium | Complexity: Medium | Phase: 2 (Resilience)


Overview

Problem Statement

Understanding query patterns is essential for optimization:

  • Which queries consume the most resources?
  • What’s causing slow response times?
  • Are there N+1 query patterns?
  • Which queries should be cached?

Without proxy-level analytics:

  • Must enable pg_stat_statements on each node
  • No visibility into cross-node patterns
  • Can’t correlate queries with client behavior
  • Hard to attribute costs to applications/users

Solution

Implement comprehensive query analytics at the proxy layer:

                  ┌─────────────────────────────────────────────────┐
                  │             QUERY ANALYTICS ENGINE              │
                  │                                                 │
Query ───────────►│  ┌──────────────────────────────────────────┐   │
                  │  │ 1. Query Fingerprinting                  │   │
                  │  │    - Normalize query                     │   │
                  │  │    - Extract pattern                     │   │
                  │  │    - Compute fingerprint hash            │   │
                  │  └──────────────────────────────────────────┘   │
                  │                       │                         │
                  │                       ▼                         │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │ 2. Metrics Collection                    │   │
                  │  │    - Execution time                      │   │
                  │  │    - Rows returned                       │   │
                  │  │    - Bytes transferred                   │   │
                  │  │    - Error rate                          │   │
                  │  └──────────────────────────────────────────┘   │
                  │                       │                         │
                  │                       ▼                         │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │ 3. Pattern Analysis                      │   │
                  │  │    - N+1 detection                       │   │
                  │  │    - Query clustering                    │   │
                  │  │    - Anomaly detection                   │   │
                  │  └──────────────────────────────────────────┘   │
                  │                       │                         │
                  │                       ▼                         │
                  │  ┌──────────────────────────────────────────┐   │
                  │  │ 4. Reporting & Alerts                    │   │
                  │  │    - Top queries dashboard               │   │
                  │  │    - Slow query log                      │   │
                  │  │    - Performance alerts                  │   │
                  │  └──────────────────────────────────────────┘   │
                  └─────────────────────────────────────────────────┘

Architecture

Query Fingerprinter

pub struct QueryFingerprinter {
    parser: SqlParser,
}

impl QueryFingerprinter {
    /// Generate a fingerprint from a query
    pub fn fingerprint(&self, query: &str) -> QueryFingerprint {
        let normalized = self.normalize(query);
        let hash = xxhash64(&normalized);
        QueryFingerprint {
            hash,
            normalized,
            tables: self.extract_tables(query),
            operation: self.detect_operation(query),
        }
    }

    /// Normalize query (remove literals, standardize whitespace, lowercase)
    fn normalize(&self, query: &str) -> String {
        let mut normalized = query.to_string();
        // Replace string literals with ? ('' is an escaped quote inside a literal)
        normalized = regex_replace!(r"'(?:[^']|'')*'", "?", &normalized);
        // Replace numeric literals with ?
        normalized = regex_replace!(r"\b\d+\b", "?", &normalized);
        // Replace IN lists with (?); match case-insensitively because
        // lowercasing only happens at the end
        normalized = regex_replace!(r"(?i)\bIN\s*\([^)]+\)", "IN (?)", &normalized);
        // Normalize whitespace
        normalized = regex_replace!(r"\s+", " ", &normalized);
        normalized.trim().to_lowercase()
    }
}

#[derive(Debug, Clone)]
pub struct QueryFingerprint {
    /// 64-bit hash of normalized query
    pub hash: u64,
    /// Normalized query text
    pub normalized: String,
    /// Tables involved
    pub tables: Vec<String>,
    /// Operation type
    pub operation: OperationType,
}
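
The normalization step can also be sketched without a regex dependency. The version below is a minimal illustration under stated assumptions, not the proxy's implementation: it uses a hand-written scanner, substitutes the standard library's `DefaultHasher` for `xxhash64` (its output is not guaranteed stable across Rust releases), keeps `$1`-style placeholders intact, and leaves out IN-list collapsing. The free functions `normalize` and `fingerprint` are hypothetical names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// True when the last emitted character could be part of an identifier
/// or a `$n` placeholder, so a following digit is not a standalone literal.
fn prev_is_ident(out: &str) -> bool {
    out.chars()
        .last()
        .map_or(false, |p| p.is_alphanumeric() || p == '_' || p == '$')
}

/// Replace string and numeric literals with `?`, collapse whitespace, lowercase.
pub fn normalize(query: &str) -> String {
    let chars: Vec<char> = query.chars().collect();
    let mut out = String::with_capacity(query.len());
    let mut i = 0;
    while i < chars.len() {
        let c = chars[i];
        if c == '\'' {
            // String literal: scan to the closing quote; '' is an escaped quote.
            i += 1;
            while i < chars.len() {
                if chars[i] == '\'' {
                    if i + 1 < chars.len() && chars[i + 1] == '\'' {
                        i += 2;
                        continue;
                    }
                    break;
                }
                i += 1;
            }
            i += 1; // step past the closing quote
            out.push('?');
        } else if c.is_ascii_digit() && !prev_is_ident(&out) {
            // Numeric literal: consume digits and decimal points.
            while i < chars.len() && (chars[i].is_ascii_digit() || chars[i] == '.') {
                i += 1;
            }
            out.push('?');
        } else if c.is_whitespace() {
            // Collapse any run of whitespace into a single space.
            while i < chars.len() && chars[i].is_whitespace() {
                i += 1;
            }
            out.push(' ');
        } else {
            out.extend(c.to_lowercase());
            i += 1;
        }
    }
    out.trim().to_string()
}

/// Hash the normalized text (DefaultHasher stands in for xxhash64 here).
pub fn fingerprint(query: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    normalize(query).hash(&mut hasher);
    hasher.finish()
}
```

Two queries that differ only in literal values, letter case, or spacing then map to the same fingerprint, which is what lets statistics aggregate per query shape rather than per query text.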

Query Statistics

pub struct QueryStatistics {
    /// Query fingerprint
    pub fingerprint: QueryFingerprint,
    /// Call count
    pub calls: AtomicU64,
    /// Total execution time (microseconds)
    pub total_time: AtomicU64,
    /// Min/max execution time (microseconds); min_time must start at u64::MAX
    pub min_time: AtomicU64,
    pub max_time: AtomicU64,
    /// Rows returned (total)
    pub rows: AtomicU64,
    /// Errors
    pub errors: AtomicU64,
    /// Histogram buckets (for percentiles)
    pub latency_histogram: Histogram,
    /// First/last seen (last_seen holds the value of now_nanos())
    pub first_seen: Instant,
    pub last_seen: AtomicU64,
    /// User attribution
    pub users: DashMap<String, u64>,
    /// Client attribution
    pub clients: DashMap<String, u64>,
}

impl QueryStatistics {
    pub fn record(&self, execution: &QueryExecution) {
        let duration = execution.duration.as_micros() as u64;
        self.calls.fetch_add(1, Ordering::Relaxed);
        self.total_time.fetch_add(duration, Ordering::Relaxed);
        self.rows.fetch_add(execution.rows as u64, Ordering::Relaxed);
        if execution.error.is_some() {
            self.errors.fetch_add(1, Ordering::Relaxed);
        }
        // Update min and max atomically (no compare-exchange loop needed)
        self.min_time.fetch_min(duration, Ordering::Relaxed);
        self.max_time.fetch_max(duration, Ordering::Relaxed);
        // Record in histogram
        self.latency_histogram.record(execution.duration);
        // Update user/client attribution
        *self.users.entry(execution.user.clone()).or_insert(0) += 1;
        *self.clients.entry(execution.client_ip.clone()).or_insert(0) += 1;
        self.last_seen.store(now_nanos(), Ordering::Relaxed);
    }

    pub fn avg_time(&self) -> Duration {
        let total = self.total_time.load(Ordering::Relaxed);
        let calls = self.calls.load(Ordering::Relaxed);
        // max(1) avoids division by zero before the first call is recorded
        Duration::from_micros(total / calls.max(1))
    }

    pub fn p99_time(&self) -> Duration {
        self.latency_histogram.percentile(0.99)
    }
}
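
The `Histogram` type used by `latency_histogram` is not defined in this document. One possible shape, assuming fixed bucket bounds and lock-free counters, is sketched below. The bounds in `BOUNDS_US` are illustrative, and `percentile` returns the upper bound of the bucket that contains the requested rank, so it is an upper estimate rather than an exact value.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Upper bucket bounds in microseconds (illustrative values).
/// Samples above the last bound land in an extra overflow bucket.
const BOUNDS_US: [u64; 6] = [1_000, 5_000, 10_000, 50_000, 100_000, 1_000_000];

pub struct Histogram {
    buckets: [AtomicU64; BOUNDS_US.len() + 1],
}

impl Histogram {
    pub fn new() -> Self {
        Self {
            buckets: std::array::from_fn(|_| AtomicU64::new(0)),
        }
    }

    /// Increment the first bucket whose bound is >= the sample.
    pub fn record(&self, duration: Duration) {
        let us = duration.as_micros() as u64;
        let idx = BOUNDS_US
            .iter()
            .position(|&bound| us <= bound)
            .unwrap_or(BOUNDS_US.len());
        self.buckets[idx].fetch_add(1, Ordering::Relaxed);
    }

    /// Upper bound of the bucket containing quantile `q` (0.0..=1.0).
    /// Returns Duration::ZERO when empty and Duration::MAX for the overflow bucket.
    pub fn percentile(&self, q: f64) -> Duration {
        let counts: Vec<u64> = self
            .buckets
            .iter()
            .map(|b| b.load(Ordering::Relaxed))
            .collect();
        let total: u64 = counts.iter().sum();
        if total == 0 {
            return Duration::ZERO;
        }
        // Rank of the requested sample, 1-based and clamped into range.
        let rank = ((q * total as f64).ceil() as u64).clamp(1, total);
        let mut seen = 0u64;
        for (i, count) in counts.iter().enumerate() {
            seen += count;
            if seen >= rank {
                return BOUNDS_US
                    .get(i)
                    .map_or(Duration::MAX, |&us| Duration::from_micros(us));
            }
        }
        Duration::MAX
    }
}
```

Fixed buckets keep the per-fingerprint footprint constant (seven counters here), which matters when up to `max_fingerprints` entries are tracked at once.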

Slow Query Log

pub struct SlowQueryLog {
    /// Threshold for slow queries
    threshold: Duration,
    /// Log file writer
    writer: Arc<Mutex<BufWriter<File>>>,
    /// In-memory buffer for recent queries (Mutex so `&self` methods can update it)
    recent: Mutex<VecDeque<SlowQueryEntry>>,
    /// Max entries in memory
    max_recent: usize,
}

#[derive(Debug, Clone, Serialize)]
pub struct SlowQueryEntry {
    pub timestamp: DateTime<Utc>,
    pub fingerprint_hash: u64,
    pub query: String,
    pub duration: Duration,
    pub rows: usize,
    pub user: String,
    pub client_ip: String,
    pub database: String,
    pub node: String,
    pub parameters: Option<Vec<String>>,
}

impl SlowQueryLog {
    pub fn log_if_slow(&self, execution: &QueryExecution) {
        if execution.duration < self.threshold {
            return;
        }
        let entry = SlowQueryEntry {
            timestamp: Utc::now(),
            fingerprint_hash: execution.fingerprint.hash,
            query: execution.query.clone(),
            duration: execution.duration,
            rows: execution.rows,
            user: execution.user.clone(),
            client_ip: execution.client_ip.clone(),
            database: execution.database.clone(),
            node: execution.node.clone(),
            parameters: execution.parameters.clone(),
        };
        // Write to log file as one JSON object per line; logging is best-effort,
        // so serialization and I/O errors are dropped rather than panicking
        if let (Ok(mut writer), Ok(json)) = (self.writer.lock(), serde_json::to_string(&entry)) {
            writeln!(writer, "{}", json).ok();
        }
        // Keep in memory for API access, evicting the oldest entries first
        if let Ok(mut recent) = self.recent.lock() {
            recent.push_back(entry);
            while recent.len() > self.max_recent {
                recent.pop_front();
            }
        }
    }
}
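
The threshold check and bounded in-memory buffer can be exercised in isolation. The sketch below is a reduced, std-only illustration: `SlowLog` and `SlowEntry` are hypothetical stand-ins for the types above, the writer is generic over `std::io::Write` so a `Vec<u8>` can replace the log file, and lines are tab-separated rather than JSON to avoid a serde dependency.

```rust
use std::collections::VecDeque;
use std::io::Write;
use std::sync::Mutex;
use std::time::Duration;

#[derive(Debug, Clone, PartialEq)]
pub struct SlowEntry {
    pub query: String,
    pub duration: Duration,
}

pub struct SlowLog<W: Write> {
    threshold: Duration,
    max_recent: usize,
    writer: Mutex<W>,
    recent: Mutex<VecDeque<SlowEntry>>,
}

impl<W: Write> SlowLog<W> {
    pub fn new(threshold: Duration, max_recent: usize, writer: W) -> Self {
        Self {
            threshold,
            max_recent,
            writer: Mutex::new(writer),
            recent: Mutex::new(VecDeque::new()),
        }
    }

    /// Returns true when the query met the threshold and was logged.
    pub fn log_if_slow(&self, query: &str, duration: Duration) -> bool {
        if duration < self.threshold {
            return false;
        }
        if let Ok(mut writer) = self.writer.lock() {
            // "<millis>\t<query>"; write errors are ignored (best-effort logging).
            let _ = writeln!(writer, "{}\t{}", duration.as_millis(), query);
        }
        if let Ok(mut recent) = self.recent.lock() {
            recent.push_back(SlowEntry { query: query.to_string(), duration });
            // Evict the oldest entries once the cap is exceeded.
            while recent.len() > self.max_recent {
                recent.pop_front();
            }
        }
        true
    }

    /// Snapshot of the most recent slow queries, oldest first.
    pub fn recent(&self) -> Vec<SlowEntry> {
        self.recent
            .lock()
            .map(|r| r.iter().cloned().collect())
            .unwrap_or_default()
    }

    /// Consume the log and hand back the writer (useful for inspection).
    pub fn into_writer(self) -> W {
        self.writer.into_inner().unwrap_or_else(|e| e.into_inner())
    }
}
```

With `max_recent = 2`, logging three slow queries leaves only the last two in memory, while all three lines still reach the writer.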

Pattern Detector

pub struct PatternDetector {
    /// Recent queries per session (for N+1 detection)
    session_queries: DashMap<SessionId, VecDeque<QueryExecution>>,
    /// Pattern configurations
    config: PatternConfig,
}

impl PatternDetector {
    /// Detect N+1 query pattern
    pub fn detect_n_plus_one(&self, session: &str) -> Option<NplusOnePattern> {
        let queries = self.session_queries.get(session)?;
        // Look for repeated similar queries in short time window
        let window = Duration::from_secs(1);
        let recent: Vec<_> = queries.iter()
            .filter(|q| q.timestamp.elapsed() < window)
            .collect();
        // Group by fingerprint
        let mut by_fingerprint: HashMap<u64, Vec<_>> = HashMap::new();
        for q in &recent {
            by_fingerprint.entry(q.fingerprint.hash)
                .or_default()
                .push(q);
        }
        // Find patterns with many repetitions
        for (hash, group) in by_fingerprint {
            if group.len() >= self.config.n_plus_one_threshold {
                return Some(NplusOnePattern {
                    fingerprint_hash: hash,
                    count: group.len(),
                    example_query: group[0].query.clone(),
                    total_time: group.iter().map(|q| q.duration).sum(),
                });
            }
        }
        None
    }

    /// Detect query burst
    pub fn detect_burst(&self, session: &str) -> Option<QueryBurst> {
        let queries = self.session_queries.get(session)?;
        let window = Duration::from_millis(100);
        let count = queries.iter()
            .filter(|q| q.timestamp.elapsed() < window)
            .count();
        if count >= self.config.burst_threshold {
            Some(QueryBurst {
                queries_per_second: count as f64 / window.as_secs_f64(),
                session: session.to_string(),
            })
        } else {
            None
        }
    }
}
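
The grouping logic behind N+1 detection can be tested without a live session if timestamps are passed in explicitly. The following sketch makes that assumption: `Seen` and the free function `detect_n_plus_one` are hypothetical, times are offsets from session start instead of `Instant`s, and ties are broken deterministically by returning the most repeated fingerprint instead of whichever group the map yields first.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// One observed query: its fingerprint hash and when it ran,
/// expressed as an offset from the start of the session.
pub struct Seen {
    pub fingerprint: u64,
    pub at: Duration,
}

/// Returns the most repeated fingerprint inside the window and its count,
/// provided it reached `threshold` repetitions.
pub fn detect_n_plus_one(
    queries: &[Seen],
    now: Duration,
    window: Duration,
    threshold: usize,
) -> Option<(u64, usize)> {
    let mut counts: HashMap<u64, usize> = HashMap::new();
    for q in queries {
        // Keep only queries that ran at or before `now` and within the window.
        let in_window = now.checked_sub(q.at).map_or(false, |age| age < window);
        if in_window {
            *counts.entry(q.fingerprint).or_insert(0) += 1;
        }
    }
    counts
        .into_iter()
        .filter(|&(_, n)| n >= threshold)
        // Highest count wins; the fingerprint value breaks ties deterministically.
        .max_by_key(|&(fingerprint, n)| (n, fingerprint))
}
```

A typical N+1 shape is one parent query followed by many child lookups with the same fingerprint: with one parent and six children inside a one-second window and a threshold of 5, the function reports the child fingerprint with a count of 6.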

API Specification

Configuration (heliosproxy.toml)

[analytics]
enabled = true
# Query fingerprinting
normalize_queries = true
track_parameters = false # Store parameter values (privacy)
# Statistics retention
retention = "7d"
max_fingerprints = 10000
# Slow query log
[analytics.slow_query]
enabled = true
threshold = "1s"
log_file = "/var/log/heliosproxy/slow.log"
log_parameters = false # Privacy consideration
max_query_length = 4096
# Pattern detection
[analytics.patterns]
n_plus_one_detection = true
n_plus_one_threshold = 5 # 5+ similar queries = N+1
burst_detection = true
burst_threshold = 50 # 50 queries/100ms = burst
# Sampling (for high-volume)
[analytics.sampling]
enabled = false
rate = 0.1 # Sample 10% of queries
# Alerting
[analytics.alerts]
slow_query_threshold = "5s"
error_rate_threshold = 0.05 # 5% error rate
webhook_url = "https://alerts.example.com/webhook"

Admin API

GET /analytics/top-queries?orderBy=total_time&limit=10
{
  "queries": [
    {
      "fingerprint_hash": "abc123",
      "normalized": "select * from users where id = ?",
      "calls": 15234,
      "total_time_ms": 45678,
      "avg_time_ms": 3.0,
      "p99_time_ms": 15.2,
      "rows": 15234,
      "errors": 5,
      "tables": ["users"],
      "first_seen": "2026-01-20T00:00:00Z",
      "last_seen": "2026-01-25T10:30:00Z"
    }
  ]
}

GET /analytics/slow-queries?since=1h&limit=50
{
  "queries": [
    {
      "timestamp": "2026-01-25T10:29:55Z",
      "query": "SELECT * FROM orders WHERE ...",
      "duration_ms": 5234,
      "rows": 10000,
      "user": "app_user",
      "client_ip": "192.168.1.100",
      "node": "primary"
    }
  ]
}

GET /analytics/patterns
{
  "n_plus_one": [
    {
      "fingerprint_hash": "def456",
      "query": "SELECT * FROM products WHERE id = ?",
      "occurrences": 47,
      "session": "session_123",
      "total_time_ms": 234
    }
  ],
  "bursts": [
    {
      "session": "session_456",
      "qps": 150,
      "detected_at": "2026-01-25T10:28:00Z"
    }
  ]
}

GET /analytics/histogram?fingerprint=abc123
# Latency distribution for a specific query (counts shown are per bucket, not cumulative)
{
  "buckets": [
    {"le": 1, "count": 100},
    {"le": 5, "count": 500},
    {"le": 10, "count": 150},
    {"le": 50, "count": 30},
    {"le": 100, "count": 10},
    {"le": "+Inf", "count": 5}
  ],
  "p50": 3.2,
  "p90": 12.5,
  "p99": 45.3
}

GET /analytics/by-user?user=app_user&since=1h
# Query stats per user

GET /analytics/by-table?table=users&since=1h
# Query stats per table

POST /analytics/explain
# Get query plan for fingerprint
{ "fingerprint_hash": "abc123" }

Prometheus Metrics

# Query counts by fingerprint
heliosproxy_query_total{fingerprint="abc123",user="app",database="heliosdb"} 15234
# Query latency histogram
heliosproxy_query_duration_seconds_bucket{fingerprint="abc123",le="0.001"} 100
heliosproxy_query_duration_seconds_bucket{fingerprint="abc123",le="0.01"} 500
heliosproxy_query_duration_seconds_bucket{fingerprint="abc123",le="0.1"} 650
heliosproxy_query_duration_seconds_bucket{fingerprint="abc123",le="1"} 700
heliosproxy_query_duration_seconds_bucket{fingerprint="abc123",le="+Inf"} 710
# Slow queries
heliosproxy_slow_queries_total{database="heliosdb"} 45
# Pattern detections
heliosproxy_n_plus_one_detected_total 12
heliosproxy_burst_detected_total 3
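
Prometheus histogram buckets are cumulative: each `le` series counts every observation at or below that bound. If latencies are stored as per-bucket counts internally, they need a running sum before export. The helper below is a sketch of that conversion; `render_buckets` is a hypothetical name, and the per-bucket counts in the usage note are chosen so that the result reproduces the sample lines above.

```rust
/// Render per-bucket counts as cumulative Prometheus `_bucket` lines.
/// `counts` must hold one entry per bound plus a final overflow (+Inf) entry.
pub fn render_buckets(
    metric: &str,
    fingerprint: &str,
    bounds_secs: &[f64],
    counts: &[u64],
) -> Vec<String> {
    assert_eq!(
        counts.len(),
        bounds_secs.len() + 1,
        "expected one count per bound plus an overflow bucket"
    );
    let mut cumulative = 0u64;
    counts
        .iter()
        .enumerate()
        .map(|(i, count)| {
            // Running total: each line includes all smaller buckets.
            cumulative += count;
            let le = match bounds_secs.get(i) {
                Some(bound) => bound.to_string(),
                None => "+Inf".to_string(),
            };
            format!(
                "{}_bucket{{fingerprint=\"{}\",le=\"{}\"}} {}",
                metric, fingerprint, le, cumulative
            )
        })
        .collect()
}
```

Calling it with bounds `[0.001, 0.01, 0.1, 1.0]` and per-bucket counts `[100, 400, 150, 50, 10]` yields cumulative values 100, 500, 650, 700, and 710.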

AI/Agent Innovations

1. Query Intent Classification

Classify queries by AI workload type:

pub struct QueryClassifier {
    /// Classification rules
    rules: Vec<ClassificationRule>,
}

impl QueryClassifier {
    pub fn classify(&self, query: &str) -> QueryIntent {
        // Embedding search
        if query.contains("<->") || query.contains("vector") {
            return QueryIntent::EmbeddingSearch;
        }
        // Knowledge retrieval
        if query.contains("documents") || query.contains("chunks") {
            return QueryIntent::KnowledgeRetrieval;
        }
        // Conversation context
        if query.contains("conversation") || query.contains("turns") {
            return QueryIntent::ConversationContext;
        }
        // Tool execution
        if query.contains("tool_results") || query.contains("actions") {
            return QueryIntent::ToolExecution;
        }
        QueryIntent::General
    }
}

// Analytics grouped by intent
GET /analytics/by-intent
{
  "embedding_search": {
    "calls": 50000,
    "avg_time_ms": 5.2,
    "cache_hit_ratio": 0.85
  },
  "knowledge_retrieval": {
    "calls": 30000,
    "avg_time_ms": 12.3,
    "cache_hit_ratio": 0.72
  }
}
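
The `rules` field suggests a table-driven design, although `classify` as written hard-codes its checks. A sketch of the table-driven variant follows, assuming a hypothetical `ClassificationRule` made of substring needles and a target intent. Matching is case-insensitive and the first matching rule wins. Plain substring matching remains coarse: "actions" also matches a table named "transactions", so a real implementation would more likely match on extracted table names.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryIntent {
    EmbeddingSearch,
    KnowledgeRetrieval,
    ConversationContext,
    ToolExecution,
    General,
}

/// Hypothetical rule shape: any needle found in the query selects the intent.
pub struct ClassificationRule {
    pub needles: &'static [&'static str],
    pub intent: QueryIntent,
}

/// Rules mirroring the hard-coded checks, in the same priority order.
pub fn default_rules() -> Vec<ClassificationRule> {
    vec![
        ClassificationRule { needles: &["<->", "vector"], intent: QueryIntent::EmbeddingSearch },
        ClassificationRule { needles: &["documents", "chunks"], intent: QueryIntent::KnowledgeRetrieval },
        ClassificationRule { needles: &["conversation", "turns"], intent: QueryIntent::ConversationContext },
        ClassificationRule { needles: &["tool_results", "actions"], intent: QueryIntent::ToolExecution },
    ]
}

/// First matching rule wins; unmatched queries are General.
pub fn classify(rules: &[ClassificationRule], query: &str) -> QueryIntent {
    // Lowercase once so that needles match regardless of SQL keyword casing.
    let lowered = query.to_lowercase();
    rules
        .iter()
        .find(|rule| rule.needles.iter().any(|needle| lowered.contains(*needle)))
        .map_or(QueryIntent::General, |rule| rule.intent)
}
```

Because rule order encodes priority, a vector search over a `documents` table is classified as an embedding search rather than knowledge retrieval.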

2. RAG Pipeline Analytics

Track RAG-specific patterns:

pub struct RagAnalytics {
    /// Embedding queries per retrieval
    embeddings_per_retrieval: Histogram,
    /// Chunks retrieved per query
    chunks_per_query: Histogram,
    /// Time spent in each RAG stage
    stage_timings: HashMap<RagStage, Histogram>,
}

GET /analytics/rag
{
  "avg_embeddings_per_retrieval": 3.2,
  "avg_chunks_per_query": 15.4,
  "stage_breakdown": {
    "embedding": { "avg_ms": 5.2, "p99_ms": 20.1 },
    "retrieval": { "avg_ms": 12.3, "p99_ms": 45.6 },
    "reranking": { "avg_ms": 8.7, "p99_ms": 30.2 }
  }
}

3. Agentic Workflow Tracing

Track multi-step agent workflows:

pub struct WorkflowTracer {
    /// Active workflows
    workflows: DashMap<WorkflowId, WorkflowTrace>,
}

#[derive(Debug)]
pub struct WorkflowTrace {
    pub id: WorkflowId,
    pub started_at: Instant,
    pub steps: Vec<WorkflowStep>,
    pub total_queries: u32,
    pub total_time: Duration,
}

GET /analytics/workflows?since=1h
{
  "workflows": [
    {
      "id": "wf_123",
      "duration_ms": 2345,
      "steps": 5,
      "queries": 12,
      "status": "completed"
    }
  ],
  "avg_workflow_duration_ms": 1500,
  "avg_steps_per_workflow": 4.2
}

4. Cost Attribution

Attribute query costs to AI operations:

pub struct CostAttribution {
    /// Cost per query fingerprint
    costs: DashMap<u64, f64>,
}

impl CostAttribution {
    pub fn estimate_cost(&self, fingerprint: u64, duration: Duration) -> f64 {
        let base_cost = 0.0001; // $0.0001 per query
        let time_cost = duration.as_secs_f64() * 0.001; // $0.001 per second
        base_cost + time_cost
    }
}

GET /analytics/costs?by=agent
{
  "agents": [
    {
      "agent_id": "agent_1",
      "queries": 5000,
      "total_time_seconds": 125.4,
      "estimated_cost_usd": 0.63
    }
  ]
}
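
As a worked check of the cost model: with a base cost of $0.0001 per query and $0.001 per second of execution time, 5,000 queries totalling 125.4 seconds come to 5000 × 0.0001 + 125.4 × 0.001 = 0.5 + 0.1254 ≈ $0.63. The sketch below expresses the same arithmetic. The constant names and the `estimate_total` helper are illustrative, and the rates are the placeholder values from `estimate_cost`, not real pricing.

```rust
use std::time::Duration;

/// Placeholder rates taken from the estimate_cost example.
const BASE_COST_USD: f64 = 0.0001; // per query
const TIME_COST_USD_PER_SEC: f64 = 0.001; // per second of execution time

/// Cost of a single query execution.
pub fn estimate_cost(duration: Duration) -> f64 {
    BASE_COST_USD + duration.as_secs_f64() * TIME_COST_USD_PER_SEC
}

/// Aggregate cost for `queries` executions with a combined execution time.
/// Equivalent to summing estimate_cost over each query, because the model is linear.
pub fn estimate_total(queries: u64, total_time: Duration) -> f64 {
    queries as f64 * BASE_COST_USD + total_time.as_secs_f64() * TIME_COST_USD_PER_SEC
}
```

Because the model is linear, per-agent totals can be computed from two counters (query count and total time) without storing a cost per execution.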

HeliosDB-Lite Integration

1. Branch-Aware Analytics

Track queries per branch:

GET /analytics/by-branch
{
  "main": {
    "calls": 50000,
    "avg_time_ms": 3.2
  },
  "analytics": {
    "calls": 10000,
    "avg_time_ms": 45.6
  }
}

2. Sync Mode Correlation

Correlate query performance with sync mode:

GET /analytics/by-sync-mode
{
  "sync": {
    "calls": 30000,
    "avg_time_ms": 2.1,
    "write_time_overhead_ms": 5.3
  },
  "async": {
    "calls": 20000,
    "avg_time_ms": 1.8,
    "lag_at_query_time_ms": 150
  }
}

3. Vector Query Analytics

Track vector search performance:

GET /analytics/vector
{
  "ann_queries": {
    "calls": 10000,
    "avg_time_ms": 5.2,
    "avg_k": 10,
    "avg_ef_search": 100,
    "cache_hit_ratio": 0.75
  },
  "index_stats": {
    "index_size_mb": 512,
    "vectors_indexed": 1000000
  }
}

4. Time-Travel Query Analytics

Track historical query patterns:

GET /analytics/time-travel
{
  "as_of_queries": {
    "calls": 5000,
    "avg_time_ms": 8.5,
    "time_range_accessed": {
      "min": "2025-01-01T00:00:00Z",
      "max": "2026-01-25T00:00:00Z"
    }
  }
}

Implementation Notes

File Locations

src/proxy/
├── analytics/
│   ├── mod.rs             # Public API
│   ├── fingerprinter.rs   # Query fingerprinting
│   ├── statistics.rs      # QueryStatistics
│   ├── slow_log.rs        # SlowQueryLog
│   ├── patterns.rs        # PatternDetector
│   ├── histogram.rs       # Latency histogram
│   └── metrics.rs         # Prometheus metrics

Key Considerations

  1. Memory Pressure: Limit fingerprint count, use approximate data structures (HyperLogLog, Count-Min Sketch).

  2. Sampling: For high-volume workloads, sample queries to reduce overhead.

  3. Privacy: Option to redact/hash parameter values and query text.

  4. Cardinality: Beware of high-cardinality labels in Prometheus metrics.

  5. Storage: Rotate slow query logs, export to external analytics.
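
For the sampling consideration, one low-overhead option is systematic sampling: pick queries at a fixed cadence derived from the rate, then scale sampled counts by 1/rate when reporting. The sketch below assumes that approach; `Sampler` is a hypothetical type, and a production version might prefer random sampling to avoid aliasing with periodic workloads.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Systematic sampler: selects roughly `rate` of all queries at an even cadence.
pub struct Sampler {
    rate: f64,
    seen: AtomicU64,
}

impl Sampler {
    /// `rate` is clamped into (0, 1]; 0.1 means roughly one query in ten.
    pub fn new(rate: f64) -> Self {
        Self {
            rate: rate.clamp(f64::MIN_POSITIVE, 1.0),
            seen: AtomicU64::new(0),
        }
    }

    /// Decide whether the current query should be recorded.
    pub fn should_sample(&self) -> bool {
        let n = self.seen.fetch_add(1, Ordering::Relaxed);
        // Sample query n exactly when floor(k * rate) advances between k = n and k = n + 1.
        ((n + 1) as f64 * self.rate).floor() > (n as f64 * self.rate).floor()
    }

    /// Estimate the true count from a sampled count.
    pub fn scale(&self, sampled: u64) -> u64 {
        (sampled as f64 / self.rate).round() as u64
    }
}
```

At `rate = 0.1`, 1,000 calls to `should_sample` select 100 queries, and `scale(100)` reports an estimated 1,000.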

Performance Targets

Metric                    Target   Measurement
Fingerprinting overhead   <50μs    p99
Stats recording           <10μs    p99
Pattern detection         <1ms     per batch
Memory per fingerprint    <10KB    average