F3.5 Distributed Query Optimization - Architecture Design Document

Feature ID: F3.5 | ARR Value: $14M | Priority: Priority 1 | Complexity: High | Version: v5.5 | Document Version: 1.0 | Created: November 9, 2025 | Status: Architecture Design


Table of Contents

  1. Executive Summary
  2. System Context
  3. Architecture Overview
  4. Component Design (SPARC)
  5. Data Flow & Algorithms
  6. API Specifications
  7. Performance Targets
  8. Implementation Roadmap
  9. Integration Points
  10. Patent Analysis

Executive Summary

Mission

Design a Google Spanner-scale distributed query optimizer capable of optimizing queries across 1000+ shards in multiple regions, achieving sub-100ms planning time while minimizing network transfer and supporting both OLTP and OLAP workloads.

Key Objectives

  • Scale: Support 1000+ shards across 10+ geographic regions
  • Performance: Sub-100ms query planning for simple queries, <1s for complex queries
  • Cost Optimization: Reduce network data transfer by 60-80%
  • Reliability: Handle partial failures, support degraded mode operation
  • Flexibility: Support OLTP (low-latency) and OLAP (high-throughput) workloads

Current State Analysis

The heliosdb-distributed-optimizer crate already exists and currently provides:

  • Basic cost model (CPU, network, I/O, memory)
  • Partition pruning
  • Join reordering (simplified)
  • Query hints support
  • Enhanced cost model with histograms
  • ⚠ Limited to 4-node clusters in tests
  • ⚠ Simplified join selectivity estimation
  • ⚠ No multi-region network modeling
  • ⚠ No adaptive query execution
  • ⚠ Limited streaming support

Architecture Principles

  1. Network-Aware: Model multi-region latency and bandwidth accurately
  2. Data Locality: Prefer co-located operations to minimize data movement
  3. Adaptive: Learn from execution statistics to improve future plans
  4. Fault-Tolerant: Handle node failures and network partitions gracefully
  5. Incremental: Support streaming results for OLAP queries

System Context

External Dependencies

┌─────────────────────────────────────────────────────────────┐
│ External Systems │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Metadata │ │ Sharding │ │ Compute │ │
│ │ Service │ │ Manager │ │ Executor │ │
│ │ (Raft-based) │ │ (Elastic) │ │ (Vectorized) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ │ │ │ │
│ v v v │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ F3.5 Distributed Query Optimizer │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │ Planner │→ │ Optimizer │→ │ Executor │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

Integration Points

  1. heliosdb-metadata: Cluster topology, table schemas, partition locations
  2. heliosdb-sharding: Shard routing, partition assignment, data distribution
  3. heliosdb-compute: Query execution, operator implementation, expression evaluation
  4. heliosdb-cache: Result caching, metadata caching, intermediate results
  5. heliosdb-network: RPC framework, streaming, data transfer

Data Flow

SQL Query → Parser → Logical Plan → Optimizer → Physical Plan → Executor → Results
                          ↑              ↑                           ↑
                      Metadata       Statistics             Runtime Feedback

Architecture Overview

System Architecture (C4 Level 2)

┌─────────────────────────────────────────────────────────────────────┐
│ Distributed Query Optimizer │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Query Planning Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Logical │→ │ Physical │→ │ Execution │ │ │
│ │ │ Planner │ │ Planner │ │ Scheduler │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Cost Estimation Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Cardinality │ │ Network │ │ Resource │ │ │
│ │ │ Estimator │ │ Cost Model │ │ Predictor │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Join Strategy Selection Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Broadcast │ │ Shuffle │ │ Co-located │ │ │
│ │ │ Strategy │ │ Strategy │ │ Strategy │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Pushdown Optimization Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Predicate │ │ Projection │ │ Aggregate │ │ │
│ │ │ Pushdown │ │ Pushdown │ │ Pushdown │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Adaptive Execution Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Runtime │ │ Dynamic │ │ Streaming │ │ │
│ │ │ Feedback │ │ Re-planning │ │ Aggregation │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Statistics Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Histogram │ │ Bloom │ │ Sketch │ │ │
│ │ │ Collector │ │ Filters │ │ Synopses │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘

Component Interaction Diagram

┌─────────────────────────────────────────────────────────────────┐
│ Query Lifecycle │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. SQL Query │
│ │ │
│ v │
│ ┌──────────────────┐ │
│ │ Logical Planner │ ← Metadata (schemas, partitions) │
│ └────────┬─────────┘ │
│ │ │
│ v │
│ ┌──────────────────┐ │
│ │ Query Optimizer │ ← Statistics (histograms, cardinality) │
│ └────────┬─────────┘ │
│ │ │
│ ├─→ Partition Pruning │
│ ├─→ Predicate Pushdown │
│ ├─→ Join Reordering │
│ └─→ Cost-Based Selection │
│ │ │
│ v │
│ ┌──────────────────┐ │
│ │ Physical Planner │ │
│ └────────┬─────────┘ │
│ │ │
│ ├─→ Join Strategy Selection │
│ │ • Broadcast Hash │
│ │ • Shuffle Hash │
│ │ • Co-located │
│ │ • Sort-Merge │
│ │ │
│ ├─→ Data Movement Planning │
│ │ • Network topology │
│ │ • Bandwidth estimation │
│ │ • Latency modeling │
│ │ │
│ └─→ Execution Scheduling │
│ • Node selection │
│ • Parallelism degree │
│ • Pipeline stages │
│ │ │
│ v │
│ ┌──────────────────┐ │
│ │ Execution Plan │ │
│ └────────┬─────────┘ │
│ │ │
│ v │
│ ┌──────────────────┐ │
│ │ Query Executor │ → Runtime Feedback │
│ └────────┬─────────┘ │
│ │ │
│ v │
│ Results + Statistics │
│ │
└─────────────────────────────────────────────────────────────────┘

Component Design (SPARC)

1. Network-Aware Cost Model

Specification

Purpose: Accurately estimate query execution cost considering multi-region network topology, bandwidth constraints, and latency.

Inputs:

  • Query plan (logical/physical operations)
  • Network topology (latency matrix, bandwidth matrix)
  • Cluster state (node locations, resource availability)
  • Table statistics (cardinality, data distribution)

Outputs:

  • Total cost estimate (ms)
  • Cost breakdown (CPU, network, I/O, memory)
  • Recommended execution strategy
  • Expected parallelism degree

Pseudocode

struct NetworkAwareCostModel {
    topology: NetworkTopology,
    resource_tracker: ResourceTracker,
    statistics: StatisticsManager,
}

impl NetworkAwareCostModel {
    fn estimate_query_cost(
        &self,
        plan: &PhysicalPlan,
        source_region: RegionId,
        target_region: RegionId,
    ) -> QueryCost {
        let mut total_cost = Cost::zero();

        // 1. Estimate local computation cost
        let cpu_cost = self.estimate_cpu_cost(plan);
        total_cost.add_cpu(cpu_cost);

        // 2. Estimate network transfer cost
        if source_region != target_region {
            let data_size = self.estimate_data_transfer_size(plan);
            let network_cost = self.estimate_network_cost(
                data_size,
                source_region,
                target_region,
            );
            total_cost.add_network(network_cost);
        }

        // 3. Estimate I/O cost
        let io_cost = self.estimate_io_cost(plan);
        total_cost.add_io(io_cost);

        // 4. Estimate memory cost
        let memory_cost = self.estimate_memory_cost(plan);
        total_cost.add_memory(memory_cost);

        // 5. Apply congestion factor
        let congestion = self.resource_tracker.get_congestion_factor(target_region);
        total_cost.apply_congestion(congestion);

        total_cost
    }

    fn estimate_network_cost(
        &self,
        data_size_bytes: u64,
        source: RegionId,
        target: RegionId,
    ) -> Duration {
        // Get network characteristics from topology
        let latency_ms = self.topology.get_latency(source, target);
        let bandwidth_mbps = self.topology.get_bandwidth(source, target);

        // Calculate transfer time
        let data_mb = data_size_bytes as f64 / (1024.0 * 1024.0);
        let transfer_time_ms = (data_mb / bandwidth_mbps) * 1000.0;

        // Total time = latency + transfer + protocol overhead (one RTT for TCP)
        let protocol_overhead = 2.0 * latency_ms;
        Duration::from_millis((latency_ms + transfer_time_ms + protocol_overhead) as u64)
    }

    fn estimate_data_transfer_size(&self, plan: &PhysicalPlan) -> u64 {
        match plan {
            PhysicalPlan::TableScan { table, predicates } => {
                let table_stats = self.statistics.get_table_stats(table);
                let selectivity = self.estimate_selectivity(predicates);
                let row_count = table_stats.row_count as f64 * selectivity;
                (row_count * table_stats.avg_row_size as f64) as u64
            }
            PhysicalPlan::HashJoin { left, right, .. } => {
                // Sum of both input sizes
                self.estimate_data_transfer_size(left)
                    + self.estimate_data_transfer_size(right)
            }
            PhysicalPlan::Aggregate { input, .. } => {
                // Output is typically much smaller due to aggregation
                let input_size = self.estimate_data_transfer_size(input);
                input_size / 10 // Assume 10x reduction (tune based on stats)
            }
            _ => 0,
        }
    }
}
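
As a rough worked example of the formula in estimate_network_cost (using the illustrative 80 ms US-E → EU-C latency from the matrix in the next diagram and assuming, say, a 1,000 MB/s effective bandwidth, a figure not specified in this document): shipping 500 MB cross-region costs about 80 ms latency + 500 ms transfer + 160 ms protocol overhead ≈ 740 ms, while the same transfer within a region (0.5 ms latency) costs ≈ 501.5 ms. The optimizer therefore charges cross-region plans several hundred extra milliseconds before CPU or I/O is even considered.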

Architecture

┌─────────────────────────────────────────────────────────┐
│ Network-Aware Cost Model │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Cost Estimation Engine │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ CPU Cost │ │ I/O Cost │ │ │
│ │ │ Estimator │ │ Estimator │ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Network Cost Model │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Topology │ │ Bandwidth │ │ │
│ │ │ Manager │ │ Predictor │ │ │
│ │ └──────┬───────┘ └──────┬───────┘ │ │
│ │ │ │ │ │
│ │ v v │ │
│ │ ┌─────────────────────────────────┐ │ │
│ │ │ Latency Matrix (Region×Region) │ │ │
│ │ │ ┌───────┬────────┬────────┐ │ │ │
│ │ │ │ US-E │ US-W │ EU-C │ │ │ │
│ │ │ ├───────┼────────┼────────┤ │ │ │
│ │ │ │ 0.5ms │ 60ms │ 80ms │ │ │ │
│ │ │ ├───────┼────────┼────────┤ │ │ │
│ │ │ │ 60ms │ 0.5ms │ 150ms │ │ │ │
│ │ │ └───────┴────────┴────────┘ │ │ │
│ │ └─────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Cardinality Estimator │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Histogram │ │ HyperLogLog │ │ │
│ │ │ Analyzer │ │ Sketches │ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘

Representation

Cost Vector Structure:

pub struct QueryCost {
    // Time-based costs (milliseconds)
    pub cpu_time_ms: f64,
    pub network_time_ms: f64,
    pub io_time_ms: f64,

    // Resource costs
    pub memory_mb: f64,
    pub network_bytes: u64,
    pub disk_reads: u64,

    // Parallelism potential
    pub max_parallelism: usize,
    pub recommended_parallelism: usize,

    // Confidence metrics
    pub confidence: f64, // 0.0-1.0
    pub estimation_method: EstimationMethod,
}

pub enum EstimationMethod {
    HistogramBased,
    SampleBased,
    Heuristic,
    MachineLearning,
}
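
The plan enumerator ultimately needs one comparable number per candidate plan. A minimal sketch of one way to collapse this cost vector into a scalar; the weights and the memory penalty are illustrative assumptions, not part of the design above:

impl QueryCost {
    /// Collapse the cost vector into a single comparable scalar (milliseconds).
    /// The weights here are placeholders; a real implementation would calibrate them.
    pub fn total_ms(&self) -> f64 {
        // Sum the time components (assumed to overlap only partially) and
        // lightly penalize memory-hungry plans.
        let time = self.cpu_time_ms + self.network_time_ms + self.io_time_ms;
        let memory_penalty = self.memory_mb * 0.01; // 0.01 ms per MB, hypothetical
        time + memory_penalty
    }
}

// Picking the cheaper of two candidate plans:
fn cheaper(a: QueryCost, b: QueryCost) -> QueryCost {
    if a.total_ms() <= b.total_ms() { a } else { b }
}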

Considerations

Accuracy vs. Speed Trade-offs:

  • Simple Queries: Use heuristics, aim for <10ms planning time
  • Complex Queries: Use histogram-based estimation, allow up to 1s
  • Very Large Joins: Sample-based estimation for 5+ table joins

Network Modeling Challenges:

  1. Dynamic Bandwidth: Measure and update bandwidth every 60s
  2. Congestion: Track active transfers, apply queuing delay model
  3. Multi-path: Consider multiple network paths for cross-region transfers
  4. Protocol Overhead: TCP/IP headers, compression, serialization

Cost Model Calibration:

  • Collect actual execution times for queries
  • Build ML model to predict costs from historical data
  • Periodically recalibrate based on cluster changes (one possible feedback loop is sketched below)
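
A minimal sketch of the simplest version of this feedback loop: keep a per-operator, exponentially weighted correction factor between estimated and observed cost. The struct and field names are hypothetical, not an existing HeliosDB API:

use std::collections::HashMap;

/// Hypothetical calibration table: maps an operator kind ("scan", "hash_join", ...)
/// to a multiplicative correction applied to future cost estimates.
struct CostCalibrator {
    correction: HashMap<String, f64>,
    alpha: f64, // EWMA smoothing factor, e.g. 0.2
}

impl CostCalibrator {
    fn new(alpha: f64) -> Self {
        Self { correction: HashMap::new(), alpha }
    }

    /// Record one observation: how long the operator actually took vs. the estimate.
    fn observe(&mut self, operator: &str, estimated_ms: f64, actual_ms: f64) {
        if estimated_ms <= 0.0 {
            return; // nothing meaningful to learn from
        }
        let ratio = actual_ms / estimated_ms;
        let entry = self.correction.entry(operator.to_string()).or_insert(1.0);
        *entry = (1.0 - self.alpha) * *entry + self.alpha * ratio;
    }

    /// Apply the learned correction to a fresh estimate.
    fn calibrate(&self, operator: &str, estimated_ms: f64) -> f64 {
        estimated_ms * self.correction.get(operator).copied().unwrap_or(1.0)
    }
}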

2. Cross-Shard Join Strategy Selector

Specification

Purpose: Select optimal join execution strategy based on data distribution, network topology, and table characteristics.

Join Strategies:

  1. Broadcast Hash Join: Broadcast smaller table to all nodes
  2. Shuffle Hash Join: Partition both tables by join key
  3. Co-located Join: Join already co-partitioned tables locally
  4. Sort-Merge Join: Sort and merge (for ordered data)
  5. Hybrid Join: Adaptive strategy based on runtime statistics

Decision Factors:

  • Table sizes (rows, bytes)
  • Data distribution (skew, partitioning)
  • Network characteristics (bandwidth, latency)
  • Available memory on nodes
  • Join selectivity

Pseudocode

struct JoinStrategySelector {
    cost_model: NetworkAwareCostModel,
    statistics: StatisticsManager,
}

impl JoinStrategySelector {
    fn select_join_strategy(
        &self,
        left_table: &TableInfo,
        right_table: &TableInfo,
        join_keys: &[String],
        network_topology: &NetworkTopology,
    ) -> JoinStrategy {
        // 1. Check for co-location opportunity
        if self.is_colocated(left_table, right_table, join_keys) {
            return JoinStrategy::ColocatedHash {
                partition_key: join_keys[0].clone(),
            };
        }

        // 2. Get table statistics
        let left_size = self.statistics.get_table_size(left_table);
        let right_size = self.statistics.get_table_size(right_table);
        let left_cardinality = self.statistics.get_cardinality(left_table);
        let right_cardinality = self.statistics.get_cardinality(right_table);

        // 3. Identify the smaller input
        let smaller_size = left_size.min(right_size);
        let smaller_size_mb = smaller_size as f64 / (1024.0 * 1024.0);

        // 4. Broadcast threshold (tuned to the network)
        let broadcast_threshold_mb = self.calculate_broadcast_threshold(network_topology);
        if smaller_size_mb < broadcast_threshold_mb {
            // Small table → broadcast it
            return JoinStrategy::BroadcastHash {
                broadcast_side: if left_size < right_size {
                    JoinSide::Left
                } else {
                    JoinSide::Right
                },
            };
        }

        // 5. Check for data skew
        let left_skew = self.detect_skew(left_table, join_keys);
        let right_skew = self.detect_skew(right_table, join_keys);
        if left_skew > 0.8 || right_skew > 0.8 {
            // High skew → use skew-aware shuffle
            return JoinStrategy::SkewAwareShuffle {
                skew_threshold: 0.8,
                heavy_hitter_broadcast: true,
            };
        }

        // 6. Default: shuffle hash join
        JoinStrategy::ShuffleHash {
            partition_count: self.optimal_partition_count(
                left_cardinality,
                right_cardinality,
            ),
        }
    }

    fn calculate_broadcast_threshold(&self, topology: &NetworkTopology) -> f64 {
        // Broadcast ships the small table to every other node:
        //   broadcast cost ≈ small_table_size × (num_nodes - 1)
        // Shuffle re-partitions both inputs once:
        //   shuffle cost ≈ left_size + right_size
        // The exact break-even depends on both table sizes, so use a heuristic:
        // a fixed base threshold scaled up with bandwidth and down with cluster size.
        let num_nodes = topology.node_count() as f64;
        let avg_bandwidth = topology.average_bandwidth();
        let base_threshold_mb = 100.0; // 100 MB base
        let bandwidth_factor = avg_bandwidth / 10_000.0; // Normalize to 10 Gbps
        base_threshold_mb * bandwidth_factor / num_nodes.sqrt()
    }

    fn detect_skew(&self, table: &TableInfo, join_keys: &[String]) -> f64 {
        // Get histogram for the (first) join key
        let histogram = self.statistics.get_histogram(&table.name, &join_keys[0]);

        // Skew = frequency of the heaviest bucket relative to the average bucket
        let total_count: u64 = histogram.buckets.iter().map(|b| b.count).sum();
        let max_bucket_count = histogram.buckets.iter()
            .map(|b| b.count)
            .max()
            .unwrap_or(0);
        let avg_count = total_count as f64 / histogram.buckets.len() as f64;
        let skew_factor = max_bucket_count as f64 / avg_count;

        // Normalize to the 0-1 range (a skew factor of 11x or more saturates at 1.0)
        ((skew_factor - 1.0) / 10.0).min(1.0).max(0.0)
    }

    fn is_colocated(&self, left: &TableInfo, right: &TableInfo, join_keys: &[String]) -> bool {
        // Tables are co-located if both are partitioned on a join key
        // with the same partitioning strategy
        if let (Some(left_key), Some(right_key)) = (
            left.partition_key.as_ref(),
            right.partition_key.as_ref(),
        ) {
            return join_keys.contains(left_key)
                && join_keys.contains(right_key)
                && left.partition_strategy == right.partition_strategy;
        }
        false
    }
}
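
To make the threshold heuristic above concrete with illustrative numbers: on a hypothetical 100-node cluster with an average effective bandwidth of 10 Gbps, the bandwidth factor is 1.0, so the threshold is 100 MB / √100 = 10 MB. An 8 MB dimension table would be broadcast, while a 50 MB table would fall through to skew detection and, absent skew, a shuffle hash join. (Node count and table sizes here are illustrative, not benchmark figures.)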

Architecture

┌─────────────────────────────────────────────────────────┐
│ Join Strategy Selection │
├─────────────────────────────────────────────────────────┤
│ │
│ Input: Left Table, Right Table, Join Condition │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Step 1: Co-location Check │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ Same partition key? YES → Co-located│ │
│ │ │ NO → Continue │ │
│ │ └──────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Step 2: Table Size Analysis │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ Smaller < Threshold? YES → Broadcast │ │
│ │ │ NO → Continue │ │
│ │ │ │ │
│ │ │ Threshold = f(bandwidth, #nodes) │ │
│ │ └──────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Step 3: Skew Detection │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ High skew detected? YES → Skew-aware │ │
│ │ │ NO → Continue │ │
│ │ └──────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Step 4: Default Strategy │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ Select: Shuffle Hash Join │ │
│ │ │ Optimize partition count │ │
│ │ └──────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ Output: Join Strategy + Execution Parameters │
│ │
└─────────────────────────────────────────────────────────┘

Representation

pub enum JoinStrategy {
    /// Broadcast smaller table to all nodes
    BroadcastHash {
        broadcast_side: JoinSide,
    },
    /// Shuffle both tables by join key
    ShuffleHash {
        partition_count: usize,
    },
    /// Tables already co-located, local join
    ColocatedHash {
        partition_key: String,
    },
    /// Sort-merge join (for sorted data)
    SortMerge {
        sort_required: bool,
    },
    /// Skew-aware shuffle with broadcast of heavy hitters
    SkewAwareShuffle {
        skew_threshold: f64,
        heavy_hitter_broadcast: bool,
    },
    /// Hybrid: start with one strategy, adapt to another
    AdaptiveHybrid {
        initial_strategy: Box<JoinStrategy>,
        fallback_strategy: Box<JoinStrategy>,
        switch_threshold: usize, // rows processed
    },
}

pub enum JoinSide {
    Left,
    Right,
}

pub struct JoinExecutionPlan {
    pub strategy: JoinStrategy,
    pub estimated_cost: QueryCost,
    pub data_movement: DataMovementPlan,
    pub node_assignments: HashMap<NodeId, JoinTask>,
}

pub struct DataMovementPlan {
    pub total_bytes: u64,
    pub movements: Vec<DataMovement>,
}

pub struct DataMovement {
    pub source_node: NodeId,
    pub target_nodes: Vec<NodeId>,
    pub data_size: u64,
    pub estimated_time_ms: f64,
}

Considerations

Broadcast vs. Shuffle Trade-offs:

| Factor | Broadcast | Shuffle |
| --- | --- | --- |
| Network Transfer | N-1 copies | 2 transfers |
| Memory Usage | High (full table) | Low (partitioned) |
| Coordination | Simple | Complex |
| Skew Tolerance | Low | Medium |
| Best For | Small dimension tables | Large fact tables |

Skew Handling:

  1. Detect: Use histograms to identify heavy hitters
  2. Isolate: Broadcast top-K frequent keys
  3. Partition: Shuffle remaining keys normally
  4. Merge: Combine results from both paths
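
A minimal sketch of how an executor might split probe-side rows between the heavy-hitter (broadcast) path and the normal shuffle path once the heavy-hitter keys are known. The row representation and hash routing here are simplified placeholders, not HeliosDB's actual operators:

use std::collections::HashSet;

/// A row is represented here as (join_key, payload); real operators work on batches.
type Row = (u64, Vec<u8>);

/// Route each probe-side row either to the heavy-hitter (broadcast) path or to
/// the regular hash-shuffle path. `heavy_hitters` comes from the histogram's top-K keys.
fn split_for_skew_aware_join(
    rows: Vec<Row>,
    heavy_hitters: &HashSet<u64>,
    shuffle_partitions: usize,
) -> (Vec<Row>, Vec<Vec<Row>>) {
    let mut broadcast_path = Vec::new();
    let mut shuffle_path = vec![Vec::new(); shuffle_partitions];

    for row in rows {
        if heavy_hitters.contains(&row.0) {
            // Heavy-hitter keys stay local; the matching build side is broadcast instead.
            broadcast_path.push(row);
        } else {
            // Everything else is hash-partitioned as in a normal shuffle join.
            let partition = (row.0 as usize) % shuffle_partitions;
            shuffle_path[partition].push(row);
        }
    }
    (broadcast_path, shuffle_path)
}

The results of the two paths are then unioned, which corresponds to step 4 above.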

3. Distributed Predicate Pushdown Engine

Specification

Purpose: Push filtering predicates as close to data sources as possible to minimize data movement across the network.

Optimization Levels:

  1. Storage-Level: Push to storage engine (skip blocks/pages)
  2. Partition-Level: Prune entire partitions based on metadata
  3. Node-Level: Filter before network transfer
  4. Join-Level: Push join conditions to scan operations

Predicate Types:

  • Simple: column op constant (e.g., age > 25)
  • Range: column BETWEEN x AND y
  • IN-list: column IN (v1, v2, ...)
  • Complex: Conjunctions, disjunctions, NOT
  • Join predicates: Conditions involving multiple tables

Pseudocode

struct PredicatePushdownEngine {
    metadata: MetadataService,
    statistics: StatisticsManager,
}

impl PredicatePushdownEngine {
    fn optimize_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
        // Keep a copy of the original plan for the debug-only equivalence check
        let original = plan.clone();

        // 1. Extract all filter predicates
        let filters = self.extract_filters(&plan);

        // 2. Classify predicates by pushability
        let classified = self.classify_predicates(&filters, &plan);

        // 3. Push predicates down the plan tree
        let optimized = self.push_predicates(plan, &classified)?;

        // 4. Verify correctness (debug builds only)
        debug_assert!(self.verify_equivalence(&original, &optimized));

        Ok(optimized)
    }

    fn push_predicates(
        &self,
        plan: LogicalPlan,
        predicates: &ClassifiedPredicates,
    ) -> Result<LogicalPlan> {
        match plan {
            LogicalPlan::TableScan { table, projection } => {
                // Push all applicable predicates to the scan
                let pushable = predicates.get_scan_predicates(&table);
                Ok(LogicalPlan::Filter {
                    input: Box::new(LogicalPlan::TableScan { table, projection }),
                    predicate: self.combine_predicates(pushable),
                })
            }
            LogicalPlan::Join { left, right, join_type, condition } => {
                // Split the join condition into:
                //   1. conditions pushable to the left input
                //   2. conditions pushable to the right input
                //   3. conditions that must remain at the join
                let (left_preds, right_preds, join_preds) =
                    self.split_join_predicates(&condition, &left, &right);

                // Recursively push into both children
                let left_optimized =
                    self.push_predicates(self.add_filter(*left, left_preds), predicates)?;
                let right_optimized =
                    self.push_predicates(self.add_filter(*right, right_preds), predicates)?;

                Ok(LogicalPlan::Join {
                    left: Box::new(left_optimized),
                    right: Box::new(right_optimized),
                    join_type,
                    condition: join_preds,
                })
            }
            LogicalPlan::Aggregate { input, group_by, aggregates } => {
                // Only predicates on group-by columns may be pushed below an aggregate
                let pushable = predicates.get_group_by_predicates(&group_by);
                let input_optimized =
                    self.push_predicates(self.add_filter(*input, pushable), predicates)?;
                Ok(LogicalPlan::Aggregate {
                    input: Box::new(input_optimized),
                    group_by,
                    aggregates,
                })
            }
            // Other operators pass through unchanged
            _ => Ok(plan),
        }
    }

    fn partition_pruning(
        &self,
        table: &str,
        predicate: &Predicate,
    ) -> Result<Vec<PartitionId>> {
        // Get table partition metadata and the partition key
        let partitions = self.metadata.get_table_partitions(table)?;
        let partition_key = self.metadata.get_partition_key(table)?;

        // Extract the part of the predicate that constrains the partition key
        let partition_predicate =
            self.extract_partition_predicate(predicate, &partition_key);

        if let Some(pred) = partition_predicate {
            // Prune partitions whose bounds cannot satisfy the predicate
            let retained = partitions
                .into_iter()
                .filter(|p| self.partition_matches_predicate(p, &pred))
                .collect();
            Ok(retained)
        } else {
            // No partition pruning possible
            Ok(partitions)
        }
    }

    fn partition_matches_predicate(
        &self,
        partition: &PartitionMetadata,
        predicate: &Predicate,
    ) -> bool {
        match predicate {
            Predicate::Comparison { op, value, .. } => {
                // Check against the partition's [min, max] bounds
                let min = &partition.min_value;
                let max = &partition.max_value;
                match op {
                    CompOp::Eq => value >= min && value <= max,
                    CompOp::Gt => max > value,
                    CompOp::Lt => min < value,
                    CompOp::GtEq => max >= value,
                    CompOp::LtEq => min <= value,
                    _ => true, // Conservative: keep partition
                }
            }
            Predicate::Range { low, high, .. } => {
                // Keep the partition only if its bounds overlap the range
                let min = &partition.min_value;
                let max = &partition.max_value;
                !(max < low || min > high)
            }
            _ => true, // Conservative: keep partition
        }
    }
}

Architecture

┌─────────────────────────────────────────────────────────┐
│ Predicate Pushdown Optimization │
├─────────────────────────────────────────────────────────┤
│ │
│ Original Query: │
│ SELECT * FROM orders │
│ JOIN customers ON orders.customer_id = customers.id │
│ WHERE orders.date > '2024-01-01' │
│ AND customers.region = 'US' │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Step 1: Extract Predicates │ │
│ │ • orders.date > '2024-01-01' │ │
│ │ • customers.region = 'US' │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Step 2: Classify by Pushability │ │
│ │ • orders.date → Pushable to orders scan │ │
│ │ • customers.region → Pushable to customers │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Step 3: Transform Plan │ │
│ │ │ │
│ │ Join │ │
│ │ / \ │ │
│ │ Filter Filter │ │
│ │ (date) (region) │ │
│ │ | | │ │
│ │ Scan Scan │ │
│ │ orders customers │ │
│ └────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────┐ │
│ │ Step 4: Partition Pruning │ │
│ │ orders partitioned by date: │ │
│ │ • 2023-Q4: Skip (before 2024-01-01) │ │
│ │ • 2024-Q1: Keep │ │
│ │ • 2024-Q2: Keep │ │
│ │ │ │
│ │ Result: 67% partitions pruned │ │
│ └────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘

4. Query Result Streaming & Aggregation

Specification

Purpose: Support streaming query results for large OLAP workloads, with incremental aggregation to provide early results.

Features:

  1. Pipelined Execution: Start returning results before full computation
  2. Online Aggregation: Provide approximate results with confidence intervals
  3. Backpressure: Handle slow consumers gracefully
  4. Fault Tolerance: Resume from checkpoints on failures

Pseudocode

use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

struct StreamingQueryExecutor {
    executor: QueryExecutor,
    buffer_size: usize,
}

impl StreamingQueryExecutor {
    async fn execute_streaming(
        &self,
        plan: PhysicalPlan,
    ) -> Result<impl Stream<Item = RecordBatch>> {
        // 1. Decompose plan into pipeline stages
        let stages = self.create_pipeline_stages(plan)?;

        // 2. Create a bounded channel between the stages and the consumer
        let (tx, rx) = mpsc::channel(self.buffer_size);

        // 3. Execute stages in parallel, each feeding the channel
        for stage in stages {
            let tx_clone = tx.clone();
            tokio::spawn(async move {
                stage.execute_streaming(tx_clone).await
            });
        }
        // Drop the original sender so the stream terminates once all stages finish
        drop(tx);

        // 4. Return the receiving side as a stream
        Ok(ReceiverStream::new(rx))
    }

    async fn execute_online_aggregation(
        &self,
        plan: AggregationPlan,
        sample_rate: f64,
        mut input_stream: impl Stream<Item = RecordBatch> + Send + Unpin + 'static,
    ) -> Result<impl Stream<Item = AggregateResult>> {
        let (tx, rx) = mpsc::channel(1000);

        tokio::spawn(async move {
            let mut aggregator = OnlineAggregator::new(plan, sample_rate);
            let mut rows_since_report = 0;
            let report_interval = 10_000; // Report roughly every 10K rows

            // Process the upstream batches
            while let Some(batch) = input_stream.next().await {
                rows_since_report += batch.row_count();
                aggregator.update(batch);
                if rows_since_report >= report_interval {
                    rows_since_report = 0;
                    // Send an intermediate estimate
                    let result = aggregator.get_current_estimate();
                    let _ = tx.send(result).await;
                }
            }
            // Send the final result
            let _ = tx.send(aggregator.finalize()).await;
        });

        Ok(ReceiverStream::new(rx))
    }
}

struct OnlineAggregator {
    // Partial aggregate state
    partial_sum: f64,
    partial_count: u64,
    sample_variance: f64,
    // For confidence intervals
    sample_rate: f64,
    total_rows_processed: u64,
}

impl OnlineAggregator {
    fn update(&mut self, batch: RecordBatch) {
        // Welford's online algorithm for mean/variance, tracked via partial sums
        for value in batch.values() {
            self.total_rows_processed += 1;
            let old_mean = if self.partial_count == 0 {
                0.0
            } else {
                self.partial_sum / self.partial_count as f64
            };
            let delta = value - old_mean;
            self.partial_count += 1;
            self.partial_sum += value;
            let new_mean = self.partial_sum / self.partial_count as f64;
            let delta2 = value - new_mean;
            self.sample_variance += delta * delta2;
        }
    }

    fn get_current_estimate(&self) -> AggregateResult {
        let mean = self.partial_sum / self.partial_count as f64;
        let variance = if self.partial_count > 1 {
            self.sample_variance / (self.partial_count - 1) as f64
        } else {
            0.0
        };
        let std_dev = variance.sqrt();

        // 95% confidence interval for the mean
        let margin_of_error = 1.96 * std_dev / (self.partial_count as f64).sqrt();

        AggregateResult {
            value: mean,
            confidence_interval: (mean - margin_of_error, mean + margin_of_error),
            confidence_level: 0.95,
            rows_processed: self.total_rows_processed,
            is_final: false,
        }
    }
}
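
A minimal sketch of how a client might consume this stream to drive a progressively refining dashboard value. It assumes the AggregateResult type above and the stream returned by execute_online_aggregation; futures::StreamExt must be in scope:

// Hypothetical consumer loop; `results` is the stream returned by
// execute_online_aggregation.
async fn watch_estimate(mut results: impl Stream<Item = AggregateResult> + Unpin) {
    while let Some(estimate) = results.next().await {
        let (lo, hi) = estimate.confidence_interval;
        println!(
            "avg ≈ {:.2} (95% CI: {:.2}..{:.2}, {} rows{})",
            estimate.value,
            lo,
            hi,
            estimate.rows_processed,
            if estimate.is_final { ", final" } else { "" }
        );
    }
}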

Data Flow & Algorithms

Algorithm 1: Distributed Join Execution (Broadcast)

Algorithm: BROADCAST_HASH_JOIN
Input:
- Left table L (large)
- Right table R (small, size < threshold)
- Join key K
- Cluster nodes N = {n1, n2, ..., nm}
Output: Join result distributed across nodes
1. COORDINATOR selects broadcast strategy
threshold = calculate_broadcast_threshold(network_bandwidth, m)
IF size(R) > threshold THEN
RETURN shuffle_hash_join(L, R, K) // Fallback
END IF
2. BUILD phase (executed on coordinator or one node)
hash_table_R = empty_hash_table()
FOR each row r IN R DO
key = hash(r[K])
hash_table_R[key].append(r)
END FOR
3. BROADCAST phase
FOR each node n IN N DO
send_async(n, hash_table_R) // Parallel broadcast
END FOR
wait_for_all_acks()
4. PROBE phase (executed on each node in parallel)
PARALLEL FOR each node n IN N DO
local_result = []
partition_L_n = get_local_partition(L, n)
FOR each row l IN partition_L_n DO
key = hash(l[K])
IF key IN hash_table_R THEN
FOR each row r IN hash_table_R[key] DO
IF l[K] == r[K] THEN // Handle hash collisions
local_result.append(join(l, r))
END IF
END FOR
END IF
END FOR
store_local_result(n, local_result)
END PARALLEL FOR
5. RETURN distributed result
Time Complexity: O(|L|/m + |R|) per node
Space Complexity: O(|R|) per node
Network Cost: O(|R| × m)
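
To put these bounds in perspective with rough, illustrative numbers: on a 50-node cluster, broadcasting a 100 MB build side moves about 100 MB × 49 ≈ 4.9 GB over the network, while shuffling a 10 GB probe side plus the 100 MB build side moves about 10.1 GB, so broadcast wins. If the build side were 1 GB instead, broadcast would move ≈ 49 GB versus ≈ 11 GB for shuffle, and the fallback in step 1 should trigger.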

Algorithm 2: Partition Pruning with Histograms

Algorithm: HISTOGRAM_PARTITION_PRUNING
Input:
- Table T with partitions P = {p1, p2, ..., pn}
- Predicate φ on column C
- Histogram H for column C (per partition)
Output: Pruned set of partitions P' ⊆ P
1. Initialize result set
P' = empty_set()
2. Extract predicate bounds
(op, value) = parse_predicate(φ) // e.g., (>, 25)
3. FOR each partition pi IN P DO
histogram_i = H[pi][C]
// Check if partition can contain matching rows
IF op == '=' THEN
IF value IN histogram_i.bounds() THEN
P'.add(pi)
END IF
ELSE IF op == '>' THEN
IF histogram_i.max > value THEN
P'.add(pi)
END IF
ELSE IF op == '<' THEN
IF histogram_i.min < value THEN
P'.add(pi)
END IF
ELSE IF op == 'BETWEEN' THEN
(low, high) = value
IF histogram_i.max >= low AND histogram_i.min <= high THEN
P'.add(pi)
END IF
END IF
END FOR
4. IF |P'| == 0 THEN
// Safety: include at least one partition
P'.add(P[0])
END IF
5. RETURN P'
Pruning Ratio: 1 - |P'|/|P|
Time Complexity: O(n) // Linear scan of partitions

Algorithm 3: Adaptive Query Re-optimization

Algorithm: ADAPTIVE_QUERY_REOPTIMIZATION
Input:
- Initial query plan Q
- Execution statistics S (collected during execution)
- Re-optimization threshold τ
Output: Potentially re-optimized plan Q'
1. Execute Q with statistics collection
rows_processed = 0
estimated_rows = Q.estimated_cardinality
FOR each operator op IN Q DO
actual_rows = execute(op)
rows_processed += actual_rows
// Check for significant deviation
deviation = |actual_rows - op.estimated_rows| / op.estimated_rows
IF deviation > τ AND rows_processed < estimated_rows * 0.3 THEN
// Early in execution, large deviation
// Collect updated statistics
S' = collect_runtime_statistics(op)
// Re-optimize remaining plan
remaining_plan = get_remaining_plan(Q, op)
Q_new = optimize(remaining_plan, S')
// Cost-benefit analysis
reopt_cost = estimate_reoptimization_cost(Q_new)
benefit = estimate_benefit(Q, Q_new)
IF benefit > reopt_cost * 2 THEN
// Significant benefit, switch plan
Q = merge_plans(executed_prefix, Q_new)
log("Re-optimized at operator", op.id)
END IF
END IF
END FOR
2. RETURN Q
Typical Re-optimization Triggers:
- Cardinality deviation > 5x
- Execution time deviation > 3x
- Skew detected in join
- Node failure (fault tolerance)

API Specifications

Public API

// Main entry point for distributed query optimization
pub struct DistributedQueryOptimizer {
    config: OptimizerConfig,
    metadata: Arc<MetadataService>,
    statistics: Arc<StatisticsManager>,
    network_topology: Arc<NetworkTopologyManager>,
}

impl DistributedQueryOptimizer {
    /// Create a new distributed query optimizer
    pub fn new(
        config: OptimizerConfig,
        metadata: Arc<MetadataService>,
    ) -> Result<Self>;

    /// Optimize a SQL query for distributed execution
    pub async fn optimize_query(&self, sql: &str) -> Result<DistributedQueryPlan>;

    /// Optimize a logical plan
    pub async fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<PhysicalPlan>;

    /// Explain query execution plan
    pub async fn explain(&self, sql: &str, verbosity: ExplainVerbosity) -> Result<String>;

    /// Update network topology information
    pub async fn update_network_topology(&self, topology: NetworkTopology) -> Result<()>;

    /// Register table statistics
    pub async fn register_statistics(&self, table: &str, stats: TableStatistics) -> Result<()>;

    /// Get optimizer metrics
    pub fn get_metrics(&self) -> OptimizerMetrics;
}

/// Configuration for distributed query optimizer
#[derive(Debug, Clone)]
pub struct OptimizerConfig {
    /// Enable cost-based optimization
    pub enable_cost_based: bool,
    /// Enable partition pruning
    pub enable_partition_pruning: bool,
    /// Enable predicate pushdown
    pub enable_predicate_pushdown: bool,
    /// Enable join reordering
    pub enable_join_reordering: bool,
    /// Maximum number of tables in join reordering
    pub max_join_reorder_tables: usize,
    /// Query planning timeout (milliseconds)
    pub planning_timeout_ms: u64,
    /// Enable adaptive query execution
    pub enable_adaptive_execution: bool,
    /// Broadcast join threshold (MB)
    pub broadcast_threshold_mb: f64,
    /// Enable online aggregation
    pub enable_online_aggregation: bool,
    /// Statistics freshness threshold (seconds)
    pub stats_freshness_threshold_secs: u64,
}

impl Default for OptimizerConfig {
    fn default() -> Self {
        Self {
            enable_cost_based: true,
            enable_partition_pruning: true,
            enable_predicate_pushdown: true,
            enable_join_reordering: true,
            max_join_reorder_tables: 8,
            planning_timeout_ms: 100,
            enable_adaptive_execution: true,
            broadcast_threshold_mb: 100.0,
            enable_online_aggregation: true,
            stats_freshness_threshold_secs: 300,
        }
    }
}

/// Distributed query execution plan
#[derive(Debug, Clone)]
pub struct DistributedQueryPlan {
    /// Query ID
    pub id: QueryId,
    /// Physical execution plan
    pub physical_plan: PhysicalPlan,
    /// Estimated cost
    pub estimated_cost: QueryCost,
    /// Node assignments for each operator
    pub node_assignments: HashMap<OperatorId, Vec<NodeId>>,
    /// Data movement plan
    pub data_movements: Vec<DataMovement>,
    /// Execution strategy
    pub execution_strategy: ExecutionStrategy,
    /// Estimated execution time (milliseconds)
    pub estimated_time_ms: f64,
    /// Estimated result size
    pub estimated_result_rows: u64,
}

/// Physical execution plan operators
#[derive(Debug, Clone)]
pub enum PhysicalPlan {
    /// Table scan with predicates
    TableScan {
        table: String,
        partitions: Vec<PartitionId>,
        predicates: Vec<Predicate>,
        projection: Vec<String>,
    },
    /// Hash join
    HashJoin {
        left: Box<PhysicalPlan>,
        right: Box<PhysicalPlan>,
        join_keys: Vec<String>,
        join_type: JoinType,
        strategy: JoinStrategy,
    },
    /// Aggregation
    Aggregate {
        input: Box<PhysicalPlan>,
        group_by: Vec<Expression>,
        aggregates: Vec<AggregateExpression>,
        strategy: AggregateStrategy,
    },
    /// Sort
    Sort {
        input: Box<PhysicalPlan>,
        sort_keys: Vec<SortKey>,
    },
    /// Limit
    Limit {
        input: Box<PhysicalPlan>,
        limit: usize,
        offset: usize,
    },
    /// Data exchange (shuffle/broadcast)
    Exchange {
        input: Box<PhysicalPlan>,
        strategy: ExchangeStrategy,
        target_nodes: Vec<NodeId>,
    },
    /// Filter
    Filter {
        input: Box<PhysicalPlan>,
        predicate: Expression,
    },
    /// Projection
    Project {
        input: Box<PhysicalPlan>,
        expressions: Vec<Expression>,
    },
}

/// Execution strategy
#[derive(Debug, Clone, PartialEq)]
pub enum ExecutionStrategy {
    /// Map-Reduce style execution
    MapReduce {
        map_nodes: Vec<NodeId>,
        reduce_node: NodeId,
    },
    /// Scatter-gather execution
    ScatterGather {
        scatter_nodes: Vec<NodeId>,
        gather_node: NodeId,
    },
    /// Pipelined execution with multiple stages
    Pipeline {
        stages: Vec<PipelineStage>,
    },
    /// Streaming execution
    Streaming {
        source_nodes: Vec<NodeId>,
        buffer_size: usize,
    },
}

/// Optimizer metrics for monitoring
#[derive(Debug, Clone)]
pub struct OptimizerMetrics {
    /// Total queries optimized
    pub total_queries: u64,
    /// Average planning time (milliseconds)
    pub avg_planning_time_ms: f64,
    /// P99 planning time (milliseconds)
    pub p99_planning_time_ms: f64,
    /// Partition pruning hit rate
    pub partition_pruning_rate: f64,
    /// Join strategy distribution
    pub join_strategy_counts: HashMap<String, u64>,
    /// Average cost improvement ratio
    pub avg_cost_improvement: f64,
    /// Re-optimization count
    pub reoptimization_count: u64,
}
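
A minimal usage sketch of this API, assuming a MetadataService handle is already available; the SQL text and the ExplainVerbosity variant name are illustrative assumptions:

async fn plan_example(metadata: Arc<MetadataService>) -> Result<()> {
    // Default config: cost-based optimization, pruning, pushdown, adaptive execution on
    let optimizer = DistributedQueryOptimizer::new(OptimizerConfig::default(), metadata)?;

    // Optimize a query; the returned plan carries cost estimates and node assignments
    let plan = optimizer
        .optimize_query("SELECT region, SUM(amount) FROM orders GROUP BY region")
        .await?;
    println!(
        "estimated {:.1} ms, {} data movements",
        plan.estimated_time_ms,
        plan.data_movements.len()
    );

    // Human-readable plan for debugging
    let explained = optimizer
        .explain("SELECT region, SUM(amount) FROM orders GROUP BY region",
                 ExplainVerbosity::Verbose)
        .await?;
    println!("{explained}");
    Ok(())
}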

Internal APIs

// Internal cost model API
trait CostModel {
fn estimate_scan_cost(
&self,
table: &TableInfo,
predicates: &[Predicate],
) -> QueryCost;
fn estimate_join_cost(
&self,
left: &PhysicalPlan,
right: &PhysicalPlan,
strategy: JoinStrategy,
) -> QueryCost;
fn estimate_network_cost(
&self,
source: NodeId,
target: NodeId,
bytes: u64,
) -> Duration;
}
// Statistics collection API
trait StatisticsCollector {
async fn collect_table_statistics(
&self,
table: &str,
) -> Result<TableStatistics>;
async fn collect_column_histogram(
&self,
table: &str,
column: &str,
) -> Result<Histogram>;
async fn update_runtime_statistics(
&self,
query_id: QueryId,
stats: RuntimeStatistics,
) -> Result<()>;
}
// Network topology management API
trait NetworkTopologyManager {
async fn measure_latency(
&self,
source: NodeId,
target: NodeId,
) -> Result<Duration>;
async fn estimate_bandwidth(
&self,
source: NodeId,
target: NodeId,
) -> Result<f64>;
async fn get_network_topology(&self) -> Result<NetworkTopology>;
}

Performance Targets

Planning Performance

| Query Type | Complexity | Target Planning Time | Max Acceptable |
| --- | --- | --- | --- |
| Simple scan | 1 table, <3 predicates | <10ms | 20ms |
| Single join | 2 tables | <25ms | 50ms |
| Complex join | 3-5 tables | <100ms | 200ms |
| Very complex | 6-10 tables | <500ms | 1000ms |
| OLAP aggregation | Large scan + grouping | <50ms | 100ms |

Execution Performance

| Metric | Target | Stretch Goal |
| --- | --- | --- |
| Network efficiency | 60-70% reduction in data transfer | 80% |
| Partition pruning | 50-70% partitions eliminated | 85% |
| Join performance | 3-5x faster vs. naïve | 10x |
| Query throughput | 10,000 queries/sec (cluster) | 50,000 |
| Latency (P99) | <500ms for OLTP | <100ms |

Scalability Targets

| Dimension | Target | Notes |
| --- | --- | --- |
| Max shards | 1,000-5,000 | Per cluster |
| Max nodes | 100-500 | Across regions |
| Max regions | 10-20 | Global deployment |
| Max table size | 100TB per table | Partitioned |
| Max query complexity | 20-table joins | Practical limit |

Resource Utilization

| Resource | Target Utilization | Max Acceptable |
| --- | --- | --- |
| CPU | 50-70% average | 85% peak |
| Memory | 60-80% for caching | 90% peak |
| Network | 40-60% average | 80% peak |
| I/O | 50-70% throughput | 90% peak |

Implementation Roadmap

Phase 1: Enhanced Cost Model (2 weeks)

Objectives:

  • Implement multi-region network cost modeling
  • Add histogram-based cardinality estimation
  • Integrate with existing cost model

Deliverables:

  1. NetworkTopologyManager implementation
  2. MultiRegionCostModel with latency/bandwidth matrices
  3. HistogramCardinalityEstimator
  4. Unit tests achieving 90%+ coverage
  5. Benchmark suite comparing old vs. new cost model

Files to Modify:

  • /heliosdb-distributed-optimizer/src/cost_model.rs
  • /heliosdb-distributed-optimizer/src/enhanced_cost_model.rs
  • /heliosdb-distributed-optimizer/src/statistics.rs

New Files:

  • /heliosdb-distributed-optimizer/src/network_topology.rs
  • /heliosdb-distributed-optimizer/src/cardinality_estimator.rs

Phase 2: Join Strategy Selection (3 weeks)

Objectives:

  • Implement comprehensive join strategy selector
  • Add skew detection and handling
  • Support co-located join optimization

Deliverables:

  1. JoinStrategySelector with all 5 strategies
  2. SkewDetector using histograms
  3. CoLocationAnalyzer for partition-aware joins
  4. Integration with physical planner
  5. TPC-H benchmark validation (Q3, Q5, Q8, Q10)

Files to Modify:

  • /heliosdb-distributed-optimizer/src/planner.rs
  • /heliosdb-distributed-optimizer/src/advanced_planner.rs

New Files:

  • /heliosdb-distributed-optimizer/src/join/strategy_selector.rs
  • /heliosdb-distributed-optimizer/src/join/skew_handler.rs
  • /heliosdb-distributed-optimizer/src/join/colocation.rs

Phase 3: Predicate Pushdown Enhancement (2 weeks)

Objectives:

  • Enhance partition pruning with histogram-based bounds
  • Implement multi-level pushdown (storage, node, partition)
  • Add join predicate extraction and pushdown

Deliverables:

  1. Enhanced PartitionPruner with histogram support
  2. JoinPredicateExtractor for condition splitting
  3. Integration with storage layer pushdown
  4. Comprehensive test suite
  5. Performance benchmarks showing 60-80% partition pruning

Files to Modify:

  • /heliosdb-distributed-optimizer/src/pushdown/predicate_pushdown.rs
  • /heliosdb-distributed-optimizer/src/pruning/partition_pruner.rs

New Files:

  • /heliosdb-distributed-optimizer/src/pushdown/join_predicate.rs
  • /heliosdb-distributed-optimizer/src/pushdown/storage_pushdown.rs

Phase 4: Streaming & Aggregation (3 weeks)

Objectives:

  • Implement streaming query execution
  • Add online aggregation with confidence intervals
  • Support backpressure and fault tolerance

Deliverables:

  1. StreamingExecutor with pipelined execution
  2. OnlineAggregator with incremental results
  3. Backpressure mechanism using async channels
  4. Checkpoint-based fault tolerance
  5. Demo showing real-time aggregation over 1B rows

Files to Modify:

  • /heliosdb-distributed-optimizer/src/executor.rs
  • /heliosdb-compute/src/streaming.rs
  • /heliosdb-compute/src/online_aggregation.rs

New Files:

  • /heliosdb-distributed-optimizer/src/streaming/executor.rs
  • /heliosdb-distributed-optimizer/src/streaming/aggregator.rs
  • /heliosdb-distributed-optimizer/src/streaming/fault_tolerance.rs

Phase 5: Adaptive Execution (2 weeks)

Objectives:

  • Implement runtime statistics collection
  • Add adaptive re-optimization
  • Support dynamic strategy switching

Deliverables:

  1. RuntimeStatisticsCollector
  2. AdaptiveReoptimizer with cost-benefit analysis
  3. Dynamic join strategy switching
  4. Integration with feedback loop
  5. Benchmarks showing 20-30% improvement on skewed data

Files to Modify:

  • /heliosdb-distributed-optimizer/src/adaptive.rs
  • /heliosdb-distributed-optimizer/src/adaptive_execution.rs

New Files:

  • /heliosdb-distributed-optimizer/src/adaptive/reoptimizer.rs
  • /heliosdb-distributed-optimizer/src/adaptive/statistics_collector.rs

Phase 6: Integration & Testing (2 weeks)

Objectives:

  • Integrate all components
  • Comprehensive end-to-end testing
  • Performance benchmarking at scale

Deliverables:

  1. Full integration with heliosdb-compute
  2. Integration with heliosdb-sharding
  3. TPC-H benchmark suite (all 22 queries)
  4. Scale testing (1000+ shards, 10+ regions)
  5. Production-readiness checklist
  6. User documentation and guides

Testing Plan:

  • Unit tests: 90%+ coverage
  • Integration tests: All major query patterns
  • Performance tests: TPC-H, TPC-DS benchmarks
  • Scale tests: 1000 shards, 100 nodes
  • Fault tolerance tests: Node failures, network partitions
  • Stress tests: 10,000 concurrent queries

Integration Points

1. heliosdb-metadata Integration

Purpose: Access cluster topology, table schemas, partition locations

Required APIs:

trait MetadataService {
    async fn get_table_schema(&self, table: &str) -> Result<Schema>;
    async fn get_table_partitions(&self, table: &str) -> Result<Vec<PartitionInfo>>;
    async fn get_node_info(&self, node_id: NodeId) -> Result<NodeInfo>;
    async fn get_cluster_topology(&self) -> Result<ClusterTopology>;
}

Data Flow:

Optimizer → Metadata Service → Raft Cluster. Topology and schema lookups are cached locally (TTL: 5 minutes) and reused during query planning.

2. heliosdb-sharding Integration

Purpose: Understand data distribution and routing

Required APIs:

trait ShardingService {
    async fn get_shard_for_key(&self, table: &str, key: &Value) -> Result<ShardId>;
    async fn get_colocated_tables(&self) -> Result<Vec<(String, String)>>;
    async fn get_partition_strategy(&self, table: &str) -> Result<PartitionStrategy>;
}

Usage:

  • Detect co-located joins
  • Understand partition strategies
  • Route queries to correct shards

3. heliosdb-compute Integration

Purpose: Execute optimized physical plans

Required APIs:

trait QueryExecutor {
    async fn execute(&self, plan: PhysicalPlan) -> Result<RecordBatchStream>;
    async fn execute_on_node(&self, node: NodeId, plan: PhysicalPlan) -> Result<RecordBatchStream>;
    async fn collect_statistics(&self, query_id: QueryId) -> Result<RuntimeStatistics>;
}

Data Flow:

Optimizer → Physical Plan → Executor → Distributed Execution
Runtime Statistics → Feedback to Optimizer

4. heliosdb-cache Integration

Purpose: Cache query plans, metadata, intermediate results

Required APIs:

trait CacheService {
    async fn get_cached_plan(&self, sql: &str) -> Option<PhysicalPlan>;
    async fn cache_plan(&self, sql: &str, plan: PhysicalPlan) -> Result<()>;
    async fn get_cached_statistics(&self, table: &str) -> Option<TableStatistics>;
}
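
A minimal sketch of how the optimizer could consult this cache before planning; the wiring shown here is an assumption for illustration, not an existing HeliosDB code path:

async fn optimize_with_cache(
    optimizer: &DistributedQueryOptimizer,
    cache: &impl CacheService,
    sql: &str,
) -> Result<PhysicalPlan> {
    // 1. Reuse a previously optimized plan if one is cached for this SQL text
    if let Some(plan) = cache.get_cached_plan(sql).await {
        return Ok(plan);
    }
    // 2. Otherwise optimize from scratch and populate the cache for next time
    let optimized = optimizer.optimize_query(sql).await?;
    cache.cache_plan(sql, optimized.physical_plan.clone()).await?;
    Ok(optimized.physical_plan)
}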

Caching Strategy:

  • Plan cache: 1000 most recent queries, TTL 5 minutes
  • Statistics cache: All tables, TTL based on update frequency
  • Intermediate results: Top-K queries, evict LRU

Patent Analysis

Novel Contributions

1. Network-Aware Multi-Region Query Optimization

Innovation: Dynamic cost model that adapts to real-time network conditions across geographic regions.

Prior Art Comparison:

  • Google Spanner: Static cost model, doesn’t adapt to network changes
  • CockroachDB: Limited multi-region optimization
  • Amazon Aurora: Primarily single-region

Patentability: HIGH

  • Novel application of real-time network telemetry to query optimization
  • Dynamic bandwidth and latency modeling
  • Congestion-aware query routing

Claims:

  1. Method for optimizing distributed queries using real-time network topology
  2. Dynamic cost model incorporating latency matrices and bandwidth prediction
  3. Adaptive query execution based on network congestion metrics

Commercial Value: $8M-$12M

  • Critical for multi-region deployments
  • Reduces cross-region data transfer costs by 60-80%
  • Enables truly global databases

2. Histogram-Based Partition Pruning with Bloom Filters

Innovation: Combined use of histograms and Bloom filters for aggressive partition elimination.

Prior Art Comparison:

  • PostgreSQL: Basic partition pruning
  • Oracle: Advanced partitioning, but not distributed
  • Snowflake: Micro-partitions with metadata, different approach

Patentability: MEDIUM

  • Novel combination of existing techniques
  • Specific application to distributed systems
  • Integration with distributed cost model

Claims:

  1. Partition pruning method using multi-level statistics (histograms + Bloom filters)
  2. Cost-based decision for which pruning technique to use
  3. Dynamic statistics collection strategy based on query patterns

Commercial Value: $3M-$5M

  • Reduces query latency by 50-70%
  • Lowers I/O and network costs significantly
  • Applicable to all table scans

3. Adaptive Skew-Aware Join Execution

Innovation: Runtime detection of data skew with dynamic strategy switching between broadcast and shuffle.

Prior Art Comparison:

  • Apache Spark: Skew join optimization (known technique)
  • Presto: Adaptive execution (different approach)
  • Databricks: Runtime adaptive query execution

Patentability: MEDIUM-LOW

  • Builds on existing adaptive execution concepts
  • Novel integration with distributed cost model
  • Specific threshold calculation method

Claims:

  1. Method for detecting data skew during join execution
  2. Dynamic switching between join strategies based on runtime statistics
  3. Hybrid join execution combining broadcast and shuffle for skewed data

Commercial Value: $2M-$4M

  • Handles real-world data skew gracefully
  • Prevents query failures on skewed data
  • Improves join performance by 3-10x on skewed datasets

4. Streaming Online Aggregation with Confidence Intervals

Innovation: Provide incremental aggregation results with statistical confidence bounds for OLAP queries.

Prior Art Comparison:

  • BlinkDB: Online aggregation (research prototype)
  • Apache Druid: Approximate queries (different technique)
  • ClickHouse: Fast aggregation, but not online

Patentability: MEDIUM

  • Specific application to distributed systems
  • Novel integration with streaming execution
  • Practical confidence interval calculation

Claims:

  1. Streaming aggregation method with confidence interval reporting
  2. Adaptive sampling strategy based on query requirements
  3. Integration with distributed query execution framework

Commercial Value: $4M-$6M

  • Enables interactive OLAP on massive datasets
  • Provides early results (10-100x faster perceived latency)
  • Critical for BI dashboards and ad-hoc analytics

Patent Portfolio Summary

| Innovation | Patentability | Commercial Value | Priority |
| --- | --- | --- | --- |
| Network-Aware Multi-Region Optimization | HIGH | $8M-$12M | P0 |
| Histogram-Based Partition Pruning | MEDIUM | $3M-$5M | P1 |
| Adaptive Skew-Aware Joins | MEDIUM-LOW | $2M-$4M | P2 |
| Streaming Online Aggregation | MEDIUM | $4M-$6M | P1 |
| TOTAL | Mixed | $17M-$27M | - |

Recommendation: File provisional patents for all four innovations. Prioritize Network-Aware Optimization and Streaming Aggregation for full patent applications.


Risk Assessment & Mitigation

Technical Risks

| Risk | Probability | Impact | Mitigation |
| --- | --- | --- | --- |
| Inaccurate cost model | Medium | High | Calibrate with real workloads, ML-based refinement |
| Statistics staleness | High | Medium | Automatic stats refresh, timestamp tracking |
| Network instability | Medium | High | Fault-tolerant execution, retry logic |
| Skew misdetection | Low | Medium | Conservative thresholds, fallback strategies |
| Memory pressure | Medium | High | Spilling to disk, memory limits, backpressure |

Performance Risks

| Risk | Probability | Impact | Mitigation |
| --- | --- | --- | --- |
| Planning timeout | Low | Medium | Incremental optimization, heuristics |
| Network bottleneck | Medium | High | Compression, batching, prioritization |
| Stragglers | High | Medium | Speculative execution, dynamic re-optimization |
| Lock contention | Low | Low | Lock-free data structures, sharding |

Operational Risks

| Risk | Probability | Impact | Mitigation |
| --- | --- | --- | --- |
| Statistics collection overhead | Medium | Low | Sampling, background collection |
| Config complexity | High | Medium | Sensible defaults, auto-tuning |
| Monitoring gaps | Medium | High | Comprehensive metrics, alerting |
| Version compatibility | Low | High | Careful versioning, backward compatibility |

Success Metrics

Functional Metrics

  • All 22 TPC-H queries execute correctly
  • Join strategies selected optimally (>90% correct choice)
  • Partition pruning rate >60% on filtered queries
  • Cost estimation accuracy within 2x of actual

Performance Metrics

  • Planning time: <100ms for 95% of queries
  • Execution time: 3-5x improvement over naïve plans
  • Network efficiency: 60-80% reduction in data transfer
  • Throughput: 10,000 queries/sec on 100-node cluster

Scalability Metrics

  • Supports 1,000 shards without degradation
  • Handles 10-table joins within 500ms planning time
  • Scales linearly to 500 nodes

Reliability Metrics

  • Handles single node failures gracefully
  • Recovers from network partitions within 10s
  • Zero data loss on failures

Conclusion

This architecture design provides a comprehensive blueprint for implementing Google Spanner-scale distributed query optimization in HeliosDB. The design emphasizes:

  1. Network-Aware Optimization: Accurate modeling of multi-region costs
  2. Intelligent Join Selection: Five join strategies with adaptive execution
  3. Aggressive Pushdown: Multi-level predicate and projection pushdown
  4. Streaming Support: Online aggregation for OLAP workloads
  5. Adaptive Execution: Runtime re-optimization based on statistics

Key Innovations:

  • Network topology-aware cost model
  • Histogram-based partition pruning
  • Skew-aware adaptive joins
  • Streaming online aggregation

Expected Impact:

  • $14M ARR contribution
  • 3-5x query performance improvement
  • 60-80% reduction in network transfer
  • Supports 1,000+ shard deployments

Implementation Timeline: 14 weeks (3.5 months)
Investment: $1.2M-$1.5M
Patent Value: $17M-$27M

This design is ready for implementation and positions HeliosDB as a leader in distributed query optimization.


Document Prepared By: System Architecture Designer
Review Status: Ready for Technical Review
Next Steps:

  1. Technical review by senior engineers
  2. Resource allocation approval
  3. Begin Phase 1 implementation
  4. File provisional patents