Online Aggregation Architecture
System Architecture
HeliosDB Query Layer:

  Query Parser ──▶ Query Planner ──▶ Physical Plan
                                       - TableScan
                                       - HashAggregate
                                       - GroupBy

The physical plan is routed to one of two executors:

  QueryExecutor (Traditional)     OnlineQueryExecutor (Progressive)
  - Full scan                     - Applicability check
  - Exact results                 - Auto-sampling
  - Blocking                      - Progressive results

The OnlineQueryExecutor drives the OnlineAggregationEngine:

  OnlineAggregationEngine
  ├─ Configuration
  │    - confidence_level
  │    - target_error
  │    - sampling_strategy
  └─ Aggregation State
       - total_processed
       - sampled_rows
       - group_estimates
       - adaptive_rate

The engine feeds the Data Streaming Pipeline, which applies one of three
sampling strategies:

  Random Sampling        Stratified Sampling    Adaptive Sampling
  - Hash-based           - Per-stratum          - Variance tracking
  - Uniform probability  - Balanced samples     - Rate adjustment
  - Simple               - Group-aware          - Self-tuning
Downstream of the sampling strategies, every batch flows through:

  Sample Selection
  - Row filtering
  - Sample storage
        │
        ▼
  Group Estimate Update (for each aggregate)
  - COUNT: increment counter
  - SUM:   add value + variance
  - AVG:   update sum + count
  - MIN:   track minimum
  - MAX:   track maximum
        │
        ▼
  Confidence Interval Calculation
  1. Calculate point estimate
  2. Compute standard error
  3. Apply z-score (1.96 for 95%)
  4. Generate bounds:
       Lower = estimate - z * SE
       Upper = estimate + z * SE
        │
        ▼
  Progressive Result Creation: ProgressiveResult
  - timestamp
  - progress (%)
  - sample_size
  - estimated_total
  - results: Vec<AggregateEstimate>
  - visualization: VisualizationData
  - target_achieved: bool
        │
        ▼
  Result Stream (mpsc channel), update interval: 100ms (configurable)
        │
        ▼
  Clients: display, decision-making, termination

Component Interaction Flow
1. Query Submission
────────────────────────────────────────────────────────────────

   User Query → Parser → Planner → OnlineQueryExecutor → Applicability Check
     - YES → OnlineAggregationEngine (steps 2-4 below)
     - NO  → Traditional Executor → Full Scan → Complete Result

2. Streaming Pipeline Setup
────────────────────────────────────────────────────────────────

   Create channels: (data_tx, data_rx) and (result_tx, result_rx)
     - Spawn input executor task (reads data, sends batches)
     - Spawn aggregation task (samples, computes, sends results)
3. Continuous Processing Loop
────────────────────────────────────────────────────────────────

   FOR EACH data batch:
     - Apply sampling strategy
         - Random:     hash-based decision
         - Stratified: per-stratum sampling
         - Adaptive:   variance-based rate
     - Update group estimates
         - For each aggregate: accumulate statistics
     - Adjust adaptive rate if applicable, based on observed variance
     - Check update interval
         - IF elapsed >= update_interval:
             - Compute confidence intervals
             - Create ProgressiveResult
             - Send to result_stream
             - Check early termination
         - ELSE: continue processing
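One turn of the rate-adjustment branch of this loop can be sketched as follows. The ±10% multipliers and the 0.001-0.5 clamp mirror the Adaptive Sampling Rate Adjustment section below; the 10% tolerance band around the target variance and all names are illustrative assumptions, not the engine's API.

```rust
/// Sketch: one adaptive-rate adjustment step.
fn adjust_rate(current: f64, observed_var: f64, target_var: f64) -> f64 {
    let next = if observed_var > target_var * 1.1 {
        current * 1.1 // variance too HIGH: sample more aggressively
    } else if observed_var < target_var * 0.9 {
        current * 0.9 // variance comfortably LOW: sample less
    } else {
        current // within tolerance: keep the current rate
    };
    next.clamp(0.001, 0.5) // keep the rate inside the allowed range
}

fn main() {
    println!("{:.4}", adjust_rate(0.01, 2.0, 1.0)); // high variance: rate grows
    println!("{:.4}", adjust_rate(0.01, 1.0, 1.0)); // on target: unchanged
}
```

Because the rate converges multiplicatively, a badly chosen initial rate self-corrects within a few adjustment rounds.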
4. Result Consumption
────────────────────────────────────────────────────────────────

   WHILE receiving results:
     - Display current estimate
     - Show confidence bounds
     - Visualize convergence
     - IF target_achieved OR timeout: STOP and return the final result

Sampling Strategy Decision Tree
  No GROUP BY?   → Random Sampling     (rate: 0.01)
  GROUP BY?      → Stratified Sampling (rate: 0.02)
  Unknown data?  → Adaptive Sampling   (initial rate: 0.01)

All three strategies feed the same sample stream.

Confidence Interval Calculation Pipeline
For each group, the COUNT and SUM estimates follow the same pipeline,
scaled by scale = 1 / sample_rate:

  COUNT: estimate = sample_count * scale    SE = binomial_se(...)
  SUM:   estimate = sample_sum * scale      SE = variance_se(...)

  z     = z_score(confidence_level)
  lower = estimate - z * SE
  upper = estimate + z * SE

Each path yields an EstimateWithBounds:
  - estimate
  - lower_bound
  - upper_bound
  - relative_error
  - standard_error

Adaptive Sampling Rate Adjustment
  Initial rate (e.g., 0.01)
        │
        ▼
  Collect 30+ samples
        │
        ▼
  Calculate variance across all groups
        │
        ▼
  Compare observed vs. target variance:
    - HIGH → increase rate ×1.1
    - OK   → keep rate
    - LOW  → decrease rate ×0.9
        │
        ▼
  Update rate (clamped to 0.001-0.5), then return to the sampling loop

Memory Layout
OnlineAggregationEngine
├─ config: AggregationConfig
│    - confidence_level: f64
│    - target_error: f64
│    - min_sample_size: usize
│    - sampling_strategy: SamplingStrategy
└─ state: Arc<RwLock<AggregationState>>
     - total_processed: usize
     - total_estimated: usize
     - adaptive_rate: f64
     - sampled_rows: Vec<Row>                          → O(sample_size)
     - group_estimates: HashMap<Vec<Bytes>, GroupEst>  → O(num_groups)
         - sample_count: usize
         - sum_values: Vec<f64>
         - sum_squares: Vec<f64>
         - min_values: Vec<f64>
         - max_values: Vec<f64>
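The per-group accumulator can be sketched as a single-aggregate version (the engine keeps one slot per aggregate column in each Vec); names and method signatures here are illustrative.

```rust
/// Sketch of one group's running statistics for a single aggregate column.
struct GroupEstimate {
    sample_count: usize,
    sum: f64,
    sum_squares: f64, // lets variance be derived without re-scanning samples
    min: f64,
    max: f64,
}

impl GroupEstimate {
    fn new() -> Self {
        GroupEstimate {
            sample_count: 0,
            sum: 0.0,
            sum_squares: 0.0,
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
        }
    }

    /// Fold one sampled value into the running statistics (O(1) per row).
    fn update(&mut self, v: f64) {
        self.sample_count += 1;
        self.sum += v;
        self.sum_squares += v * v;
        self.min = self.min.min(v);
        self.max = self.max.max(v);
    }

    /// Sample variance via the sum-of-squares identity.
    fn variance(&self) -> f64 {
        let n = self.sample_count as f64;
        (self.sum_squares - self.sum * self.sum / n) / (n - 1.0)
    }
}

fn main() {
    let mut g = GroupEstimate::new();
    for v in [2.0, 4.0, 6.0] {
        g.update(v);
    }
    println!("sum={} min={} max={} var={}", g.sum, g.min, g.max, g.variance());
}
```

Everything needed for COUNT, SUM, AVG, MIN, MAX, and the confidence intervals is recoverable from these five fields, which is what keeps per-group memory constant.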
Total memory: O(sample_size × row_size + num_groups × num_aggregates),
typically <1% of the dataset size.

Execution Timeline
 Time │ Action
──────┼────────────────────────────────────────────────────────────
0.00s │ Query submitted; OnlineQueryExecutor checks applicability
0.01s │ ✓ Suitable for online aggregation
      │ Create OnlineAggregationEngine, set up data pipeline
0.05s │ Start processing rows
      │ Sample: row 0, 47, 93, ... (rate=0.01)
0.10s │ 10% progress, sample size: 100 (minimum reached)
      │ → Send first progressive result: COUNT: 1000 [800, 1200] (±20%)
0.20s │ 20% progress, sample size: 200
      │ → Send update: COUNT: 1000 [850, 1150] (±15%)
0.40s │ 40% progress, sample size: 400
      │ → Send update: COUNT: 1000 [920, 1080] (±8%)
0.60s │ 60% progress, sample size: 600
      │ → Send update: COUNT: 1000 [950, 1050] (±5%)
      │ ✓ Target achieved (±5% at 95% confidence) → early termination
      │ Return final result
      │   Processed: 6,000 / 10,000 rows (60%)
      │   Samples: 600 (6% sample rate)
      │   Speedup: ~15x (vs. 1.0s full scan)
 DONE │ Query completed successfully

Integration Points
HeliosDB System Integration

  Storage Layer → Network Protocol → Compute Layer → Client Interface
  - HiDB          - Row format       - Executor       - SQL API
  - Sharding      - Predicate        - Planner        - Result Stream
                    pushdown         - Online Agg

Key Design Decisions
1. Tokio Channels for Streaming
- Choice: mpsc channels
- Reason: Non-blocking, backpressure support, async-native
- Alternative: Crossbeam channels (considered; tokio's channels integrate better with the async runtime)
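The two-channel topology can be sketched with std::sync::mpsc for a self-contained, synchronous example; the engine itself uses tokio's async mpsc, which adds awaitable, bounded sends for backpressure. Batch contents and names here are illustrative.

```rust
use std::sync::mpsc;
use std::thread;

// Sketch: input task feeds row batches, aggregation task emits
// progressive counts, the client consumes the result stream.
fn run_pipeline() -> usize {
    let (data_tx, data_rx) = mpsc::channel::<Vec<i64>>();
    let (result_tx, result_rx) = mpsc::channel::<usize>();

    // Input executor task: reads data, sends batches.
    thread::spawn(move || {
        for batch in [vec![1, 2, 3], vec![4, 5], vec![6]] {
            data_tx.send(batch).unwrap();
        }
        // data_tx is dropped here, closing the data channel.
    });

    // Aggregation task: consumes batches, sends one progressive result each.
    thread::spawn(move || {
        let mut count = 0;
        for batch in data_rx {
            count += batch.len();
            result_tx.send(count).unwrap();
        }
    });

    // Client side: the last progressive result is the final count.
    result_rx.into_iter().last().unwrap()
}

fn main() {
    println!("rows counted: {}", run_pipeline());
}
```

Dropping the sender is what terminates each consumer loop, so the pipeline shuts down cleanly without an explicit stop signal.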
2. Arc<RwLock> for State
- Choice: Shared mutable state with read-write lock
- Reason: many readers (progress updates), single writer (main processing loop)
- Alternative: Mutex (considered; it would serialize a read-heavy workload)
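A minimal sketch of the pattern, with std's synchronous RwLock standing in for the async equivalent used alongside tokio; the state shown is a subset of the real AggregationState.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

#[derive(Default)]
struct AggregationState {
    total_processed: usize,
}

// Sketch: one writer folds rows in under the write lock while readers
// take cheap, consistent snapshots under read locks.
fn run() -> usize {
    let state = Arc::new(RwLock::new(AggregationState::default()));

    // Writer: the main processing loop.
    let writer = {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            for _ in 0..1000 {
                state.write().unwrap().total_processed += 1;
            }
        })
    };

    // Reader: a progress reporter needs only a read lock.
    {
        let snapshot = state.read().unwrap();
        assert!(snapshot.total_processed <= 1000); // always a consistent view
    }

    writer.join().unwrap();
    let final_state = state.read().unwrap();
    final_state.total_processed
}

fn main() {
    println!("processed: {}", run());
}
```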
3. Hash-based Sampling
- Choice: Hash row index for determinism
- Reason: Reproducible sampling, no PRNG state
- Alternative: rand::random (non-deterministic)
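A sketch of the hash-based decision, with std's DefaultHasher standing in for whatever hash function the engine actually uses:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Deterministic "coin flip": hash the row index, map the hash onto
/// [0, 1), and compare against the sampling rate. The same rows are
/// selected on every run, with no PRNG state to carry around.
fn should_sample(row_index: u64, rate: f64) -> bool {
    let mut h = DefaultHasher::new();
    row_index.hash(&mut h);
    (h.finish() as f64 / u64::MAX as f64) < rate
}

fn main() {
    let first: Vec<u64> = (0..10_000).filter(|&i| should_sample(i, 0.01)).collect();
    let second: Vec<u64> = (0..10_000).filter(|&i| should_sample(i, 0.01)).collect();
    assert_eq!(first, second); // reproducible: identical sample both times
    println!("sampled {} of 10000 rows", first.len()); // roughly 1% of rows
}
```

Determinism also means a re-run of the same query over unchanged data reproduces the same estimates, which simplifies debugging.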
4. In-memory Sample Storage
- Choice: Store sampled rows in Vec
- Reason: Small sample sizes (<1% typically)
- Alternative: Reservoir sampling (future enhancement)
5. Z-score Confidence Intervals
- Choice: Normal approximation with Z-scores
- Reason: Fast, works well for large samples (CLT)
- Alternative: Bootstrap (more accurate, but slower)
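The normal-approximation interval for a mean-style estimate can be sketched as follows; function and field names are illustrative, not the engine's API.

```rust
struct EstimateWithBounds {
    estimate: f64,
    lower_bound: f64,
    upper_bound: f64,
    relative_error: f64,
}

/// Sketch: estimate ± z * SE, where SE = sqrt(variance / n).
/// z = 1.96 corresponds to the 95% confidence level.
fn confidence_interval(sample_mean: f64, sample_variance: f64, n: usize, z: f64) -> EstimateWithBounds {
    let estimate = sample_mean;
    // The standard error shrinks with sqrt(n): quadrupling the sample
    // halves the interval width.
    let standard_error = (sample_variance / n as f64).sqrt();
    let half_width = z * standard_error;
    EstimateWithBounds {
        estimate,
        lower_bound: estimate - half_width,
        upper_bound: estimate + half_width,
        relative_error: half_width / estimate.abs(),
    }
}

fn main() {
    let e = confidence_interval(100.0, 25.0, 400, 1.96);
    println!("{} [{:.2}, {:.2}]", e.estimate, e.lower_bound, e.upper_bound);
}
```

The sqrt(n) shrinkage is why the sample-size threshold for early termination grows quadratically as the target error tightens.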
Performance Optimizations
1. Lazy Evaluation
   Compute confidence intervals only when sending updates, not on every row.
2. Batched Updates
   Process rows in batches and update state in bulk.
3. Incremental Statistics
   Track sum and sum_squares incrementally, with no re-scan of samples.
4. Early Termination
   Stop processing once target_error is achieved (typically saves 40-90% of the work).
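The termination check can be sketched as follows; the function and parameter names are illustrative.

```rust
/// Sketch: terminate once the sample floor is met and every aggregate's
/// relative error is within the target; further scanning is wasted work.
fn target_achieved(
    relative_errors: &[f64],
    target_error: f64,
    min_sample_size: usize,
    sample_size: usize,
) -> bool {
    sample_size >= min_sample_size
        && relative_errors.iter().all(|&e| e <= target_error)
}

fn main() {
    // ±5% target with a 100-sample floor:
    assert!(target_achieved(&[0.04, 0.05], 0.05, 100, 600)); // all within target
    assert!(!target_achieved(&[0.08, 0.03], 0.05, 100, 600)); // one aggregate too wide
    assert!(!target_achieved(&[0.01], 0.05, 100, 50)); // below the sample floor
    println!("ok");
}
```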
5. Zero-copy Row Handling
   Use Bytes (reference-counted) to avoid copying row data.

Conclusion
The online aggregation architecture provides:
- Modular design: Clear separation of concerns
- Scalable: O(n) time, O(sample_size) space
- Flexible: Multiple sampling strategies
- Observable: Progressive result streaming
- Efficient: Early termination, lazy evaluation
- Accurate: Statistical confidence intervals
This enables interactive analytics on large datasets with approximate results available in milliseconds instead of seconds or minutes.