
Distributed Query Optimization Guide

Feature: F5.3.3 Distributed Query Optimizer
Version: 5.3.3
Status: Production Ready

Overview

The HeliosDB Distributed Query Optimizer provides ML-powered, cost-based optimization for queries that span multi-node clusters. It automatically selects optimal execution strategies, minimizes network traffic, and learns from query execution history to improve performance over time.

Key Features

1. Cost-Based Optimization

  • Accurate cost estimation for distributed operations
  • Multi-dimensional cost model (CPU, network, I/O, memory)
  • Adaptive cost adjustments based on actual execution

2. Intelligent Query Planning

  • Automatic partition pruning
  • Predicate pushdown optimization
  • Join reordering for optimal execution
  • Data locality-aware routing

3. Adaptive Learning

  • ML-based join strategy selection
  • Cost model self-tuning
  • Historical query analysis
  • Continuous performance improvement

4. Distributed Execution Strategies

  • MapReduce: Parallel map and reduce phases
  • Scatter-Gather: Distribute and collect results
  • Pipeline: Streaming execution across nodes

Architecture

              DistributedOptimizerEngine

┌──────────────┐    ┌──────────────┐    ┌───────────┐
│   Planner    │    │  Optimizer   │    │ Executor  │
└──────┬───────┘    └──────┬───────┘    └─────┬─────┘
       │                   │                  │
       ▼                   ▼                  ▼
┌──────────────┐    ┌──────────────┐    ┌───────────┐
│    Router    │    │  Cost Model  │    │   Cache   │
└──────┬───────┘    └──────┬───────┘    └───────────┘
       │                   │
       ▼                   ▼
┌──────────────┐    ┌──────────────┐
│  Statistics  │    │   Adaptive   │
│  Collector   │    │   Planner    │
└──────────────┘    └──────────────┘

Quick Start

Basic Usage

use heliosdb_distributed_optimizer::*;

// 1. Create cluster configuration
let cluster = ClusterStats {
    nodes: vec![/* node info */],
    partitions: vec![/* partition info */],
    // ...
};

// 2. Initialize optimizer engine
let config = Config::default();
let engine = DistributedOptimizerEngine::new(config, cluster);

// 3. Define distributed query
let operation = DistributedOperation::Scan {
    table: "users".to_string(),
    partitions: vec![PartitionId::new(0), PartitionId::new(1)],
    filter: Some("age > 25".to_string()),
};

// 4. Optimize and execute
let result = engine.optimize_and_execute(operation).await?;
println!("Rows processed: {}", result.rows_processed);
println!("Execution time: {}ms", result.execution_time_ms);

Advanced Configuration

let config = Config {
    enable_cost_based: true,
    enable_partition_pruning: true,
    enable_join_reordering: true,
    max_plan_cache_size: 1000,
    max_stats_cache_size: 100,
    cache_ttl_secs: 300,
    cost_weights: CostWeights {
        cpu_weight: 1.0,
        network_weight: 2.0, // Network is more expensive
        io_weight: 1.5,
        memory_weight: 0.5,
    },
    ..Default::default() // remaining fields keep their defaults
};

Optimization Techniques

1. Partition Pruning

Automatically eliminates irrelevant partitions based on filter predicates:

// Query with filter
let operation = DistributedOperation::Scan {
    table: "events".to_string(),
    partitions: all_partitions, // 16 partitions
    filter: Some("date = '2025-01-01'".to_string()),
};

// Optimizer reduces to relevant partitions only
let optimized = engine.optimize(operation).await?;
// Only 2 partitions scanned instead of 16

Performance Impact: 70-90% reduction in I/O and network traffic
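
Conceptually, pruning of this kind compares the filter predicate against per-partition statistics and keeps only partitions whose value range can match. A minimal, illustrative sketch (the PartitionStats type and min/max range check are assumptions for illustration, not the optimizer's actual pruning code):

// Illustrative pruning by per-partition min/max statistics on the
// filter column; PartitionStats is a stand-in, not the crate's type.
struct PartitionStats {
    id: u32,
    min_date: &'static str, // ISO dates compare correctly as strings
    max_date: &'static str,
}

fn prune(parts: &[PartitionStats], date: &str) -> Vec<u32> {
    parts
        .iter()
        // Keep a partition only if the predicate value can fall inside
        // its [min, max] range; all other partitions cannot match.
        .filter(|p| p.min_date <= date && date <= p.max_date)
        .map(|p| p.id)
        .collect()
}

fn main() {
    let parts = [
        PartitionStats { id: 0, min_date: "2024-12-01", max_date: "2024-12-31" },
        PartitionStats { id: 1, min_date: "2025-01-01", max_date: "2025-01-31" },
    ];
    // Only partition 1 can contain rows matching date = '2025-01-01'.
    assert_eq!(prune(&parts, "2025-01-01"), vec![1]);
}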

2. Join Optimization

Selects optimal join strategy based on table sizes and data distribution:

Broadcast Join (Small table × Large table):

DistributedOperation::Join {
    left: small_table_scan,  // 1MB
    right: large_table_scan, // 1GB
    strategy: JoinStrategy::BroadcastHash, // Broadcast the small table
    // ...
}

Shuffle Join (Large table × Large table):

DistributedOperation::Join {
    left: large_table_1,
    right: large_table_2,
    strategy: JoinStrategy::ShuffleHash, // Hash-partition both sides
    // ...
}

Performance: 5-50x speedup vs. single-node execution
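
The strategy choice itself can be pictured as a size threshold: broadcast when the smaller side is cheap enough to ship to every node, otherwise hash-partition both sides. A minimal sketch of such a heuristic (the 10 MB threshold and the function are illustrative assumptions, not the optimizer's actual logic):

// Illustrative join-strategy heuristic; the broadcast cutoff is an
// assumed value, not the optimizer's real threshold.
#[derive(Debug, PartialEq)]
enum JoinStrategy {
    BroadcastHash,
    ShuffleHash,
}

fn choose_join(left_bytes: u64, right_bytes: u64) -> JoinStrategy {
    const BROADCAST_LIMIT: u64 = 10 * 1024 * 1024; // 10 MB, assumed
    if left_bytes.min(right_bytes) <= BROADCAST_LIMIT {
        // Shipping the small side to every node avoids repartitioning
        // the large side across the network.
        JoinStrategy::BroadcastHash
    } else {
        JoinStrategy::ShuffleHash
    }
}

fn main() {
    assert_eq!(choose_join(1 << 20, 1 << 30), JoinStrategy::BroadcastHash); // 1 MB vs 1 GB
    assert_eq!(choose_join(1 << 30, 1 << 30), JoinStrategy::ShuffleHash);   // 1 GB vs 1 GB
}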

3. Adaptive Learning

The optimizer learns from query execution history:

// Enable adaptive learning
let adaptive_config = AdaptiveConfig {
    learning_rate: 0.1,
    min_samples_for_learning: 10,
    enable_join_strategy_learning: true,
    enable_cost_model_tuning: true,
    ..Default::default()
};
let stats_collector = Arc::new(StatisticsCollector::new(StatisticsConfig::default()));
let adaptive_planner = AdaptivePlanner::new(stats_collector, adaptive_config);
// Typical gain: roughly 20% faster plans after ~100 observed queries
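
Conceptually, cost model self-tuning can be pictured as nudging a per-operator correction factor toward the observed prediction error on every sample, scaled by learning_rate. A minimal sketch of that update rule (an illustration of the idea, not the AdaptivePlanner internals):

// Illustrative moving-average update for a cost correction factor;
// not the AdaptivePlanner's actual algorithm.
fn update_factor(factor: f64, predicted_ms: f64, actual_ms: f64, learning_rate: f64) -> f64 {
    let error_ratio = actual_ms / predicted_ms; // >1.0 means under-estimated
    // Drift toward the observed ratio; small learning rates smooth noise.
    factor + learning_rate * (error_ratio - factor)
}

fn main() {
    let mut factor = 1.0;
    // Ten queries that each ran 30% slower than predicted pull the
    // correction factor toward 1.3.
    for _ in 0..10 {
        factor = update_factor(factor, 100.0, 130.0, 0.1);
    }
    println!("correction factor after 10 samples: {factor:.2}"); // ~1.20
}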

4. Query Routing

Routes queries to nodes with best data locality and available resources:

let router_config = RouterConfig {
    prefer_data_locality: true,
    load_balancing_enabled: true,
    max_nodes_per_query: 16,
    min_data_locality_score: 0.7,
};
let router = QueryRouter::new(cluster_stats, router_config);

let routing_decision = router.route_query(&operation)?;
println!("Target nodes: {:?}", routing_decision.target_nodes);
println!("Data locality: {:.1}%", routing_decision.data_locality_score * 100.0);
println!("Network cost: {:.2}s", routing_decision.estimated_network_cost);

Performance Benchmarks

Planning Performance

| Cluster Size | Partitions | Planning Time | Target  |
|--------------|------------|---------------|---------|
| 4 nodes      | 4          | 15μs          | <100ms  |
| 16 nodes     | 64         | 42μs          | <100ms  |
| 64 nodes     | 256        | 89μs          | <100ms  |

Query Speedup

| Query Type           | Single-Node | Distributed | Speedup |
|----------------------|-------------|-------------|---------|
| Table scan (1B rows) | 120s        | 8s          | 15x     |
| Hash join (100M×10M) | 45s         | 2.1s        | 21x     |
| Group-by aggregate   | 30s         | 1.8s        | 17x     |
| Complex multi-join   | 180s        | 4.2s        | 43x     |

Network Efficiency

| Optimization       | Without | With   | Reduction |
|--------------------|---------|--------|-----------|
| Partition pruning  | 10 GB   | 1.2 GB | 88%       |
| Predicate pushdown | 8 GB    | 1.5 GB | 81%       |
| Broadcast join     | 15 GB   | 2.8 GB | 81%       |
| Combined           | 20 GB   | 2.1 GB | 89%       |

Best Practices

1. Statistics Collection

Keep statistics up-to-date for accurate optimization:

let stats_collector = StatisticsCollector::new(StatisticsConfig {
    refresh_interval_secs: 300, // Refresh every 5 minutes
    sample_rate: 0.1,           // 10% sampling
    enable_auto_refresh: true,
    ..Default::default()
});

// Update table statistics after bulk loads
stats_collector.update_table_stats("users", &partitions);

2. Cost Model Tuning

Adjust cost weights for your workload:

// For CPU-bound workloads
cost_weights.cpu_weight = 2.0;
cost_weights.network_weight = 1.0;
// For network-constrained environments
cost_weights.cpu_weight = 0.5;
cost_weights.network_weight = 3.0;
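
These weights plausibly enter each plan's score as a linear combination over the cost dimensions, so tripling network_weight triples the penalty on network-heavy plans. A minimal sketch assuming a simple weighted sum (the crate's actual formula may differ); the field names mirror the DistributedCost and CostWeights types in the API Reference below:

// Assumed linear combination of cost dimensions; field names mirror
// the DistributedCost / CostWeights types in the API Reference.
struct DistributedCost { cpu_cost: f64, network_cost: f64, io_cost: f64, memory_cost: f64 }
struct CostWeights { cpu_weight: f64, network_weight: f64, io_weight: f64, memory_weight: f64 }

fn weighted_total(c: &DistributedCost, w: &CostWeights) -> f64 {
    c.cpu_cost * w.cpu_weight
        + c.network_cost * w.network_weight
        + c.io_cost * w.io_weight
        + c.memory_cost * w.memory_weight
}

fn main() {
    let cost = DistributedCost { cpu_cost: 10.0, network_cost: 4.0, io_cost: 6.0, memory_cost: 2.0 };
    let weights = CostWeights { cpu_weight: 1.0, network_weight: 2.0, io_weight: 1.5, memory_weight: 0.5 };
    // 10*1.0 + 4*2.0 + 6*1.5 + 2*0.5 = 28: raising a weight steers the
    // optimizer away from plans that are heavy in that dimension.
    println!("plan score: {}", weighted_total(&cost, &weights));
}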

3. Cache Management

Configure caching for query plan reuse:

// High-throughput environments
let config = Config {
    max_plan_cache_size: 10_000,
    cache_ttl_secs: 600, // 10 minutes
    ..Default::default()
};

// Memory-constrained environments
let config = Config {
    max_plan_cache_size: 500,
    cache_ttl_secs: 180, // 3 minutes
    ..Default::default()
};

4. Monitoring

Track optimizer performance:

let cache_stats = engine.get_cache_stats();
println!("Plan cache hits: {}", cache_stats.plan_cache.hits);
println!("Hit rate: {:.1}%", cache_stats.plan_cache.hit_rate * 100.0);
let cluster_stats = engine.get_cluster_stats().await;
println!("Active queries: {}", cluster_stats.active_queries);

Troubleshooting

High Planning Time

Symptom: Planning takes >100ms

Solution:

  • Reduce max_join_reorder_size (default: 8; see the sketch after this list)
  • Enable plan caching
  • Simplify query structure
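
For example, a configuration along these lines caps the reorder search space and leans on the plan cache (this assumes max_join_reorder_size is a Config field, as the default quoted above suggests):

// Assumes max_join_reorder_size is a Config field, per the default
// quoted above. A smaller reorder bound shrinks the search space;
// a larger, longer-lived plan cache lets repeat queries skip planning.
let config = Config {
    max_join_reorder_size: 4,
    max_plan_cache_size: 5000,
    cache_ttl_secs: 600,
    ..Default::default()
};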

Suboptimal Plan Selection

Symptom: Queries slower than expected

Solution:

  • Update table statistics
  • Tune cost model weights
  • Enable adaptive learning
  • Check data skew

High Network Traffic

Symptom: Excessive data transfer

Solution:

  • Enable partition pruning
  • Use broadcast joins for small tables
  • Add more selective filters
  • Check partition key design

API Reference

Core Types

// Distributed operations (field types elided for brevity)
pub enum DistributedOperation {
    Scan { table, partitions, filter },
    Join { left, right, join_type, join_condition, strategy },
    Aggregate { input, group_by, aggregates, strategy },
    Sort { input, keys },
    Limit { input, count, offset },
    Exchange { input, strategy },
}

// Cost model
pub struct DistributedCost {
    pub cpu_cost: f64,
    pub network_cost: f64,
    pub io_cost: f64,
    pub memory_cost: f64,
    pub parallelism: usize,
    pub estimated_time_ms: f64,
}

// Optimization result
pub struct OptimizationResult {
    pub original_cost: DistributedCost,
    pub optimized_cost: DistributedCost,
    pub improvement_ratio: f64,
    pub plan: QueryPlan,
    pub strategy: ExecutionStrategy,
}

Key Methods

// Optimize and execute
async fn optimize_and_execute(&self, operation: DistributedOperation) -> Result<ExecutionResult>;
// Optimize only (no execution)
async fn optimize(&self, operation: DistributedOperation) -> Result<OptimizationResult>;
// Explain query plan
async fn explain(&self, operation: &DistributedOperation) -> Result<String>;
// Update cluster statistics
async fn update_cluster_stats(&self, stats: ClusterStats);
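
For example, explain can be used to inspect the chosen plan before running anything (a brief sketch; the rendered plan format is whatever the engine produces):

// Inspect the selected plan without executing it.
let plan_text = engine.explain(&operation).await?;
println!("{plan_text}");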

Integration Examples

With SQL Parser

// Parse SQL to distributed operations
let sql = "SELECT * FROM users WHERE age > 25 AND country = 'US'";
let operation = sql_to_distributed_op(sql, &cluster)?;
let result = engine.optimize_and_execute(operation).await?;

With Monitoring

// Track query metrics
let start = Instant::now();
let result = engine.optimize_and_execute(operation).await?;
let duration = start.elapsed();
metrics.record_query_latency(duration);
metrics.record_rows_processed(result.rows_processed);
metrics.record_network_bytes(result.bytes_transferred);

Performance Tuning Guide

For OLTP Workloads

Config {
    enable_cost_based: true,
    enable_partition_pruning: true,
    enable_join_reordering: false, // Disable for simple queries
    max_plan_cache_size: 10000,    // High cache for query reuse
    cache_ttl_secs: 3600,
    cost_weights: CostWeights {
        cpu_weight: 1.5,
        network_weight: 3.0, // Minimize network for low latency
        io_weight: 2.0,
        memory_weight: 0.5,
    },
    ..Default::default()
}

For OLAP Workloads

Config {
    enable_cost_based: true,
    enable_partition_pruning: true,
    enable_join_reordering: true, // Important for complex joins
    max_plan_cache_size: 1000,
    cache_ttl_secs: 300,
    cost_weights: CostWeights {
        cpu_weight: 1.0,
        network_weight: 1.5,
        io_weight: 2.0, // I/O is the bottleneck for large scans
        memory_weight: 0.8,
    },
    ..Default::default()
}

Roadmap

Planned Enhancements

  1. Adaptive Partitioning: Automatic repartitioning based on query patterns
  2. Multi-Query Optimization: Shared computation across concurrent queries
  3. Cost Model ML: Deep learning-based cost prediction
  4. Query Result Caching: Materialized view recommendations
  5. Cross-Datacenter Optimization: WAN-aware query planning

Support

For questions or issues: