Skip to content

Online Aggregation Architecture

Online Aggregation Architecture

System Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│ HeliosDB Query Layer │
│ │
│ ┌────────────────┐ ┌──────────────────────────────────┐ │
│ │ Query Parser │────────▶│ Query Planner │ │
│ └────────────────┘ └──────────────┬───────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────┐ │
│ │ Physical Plan │ │
│ │ - TableScan │ │
│ │ - HashAggregate │ │
│ │ - GroupBy │ │
│ └───────────┬───────────────┘ │
└─────────────────────────────────────────┼───────────────────────────────┘
┌─────────────────────┴─────────────────────┐
│ │
▼ ▼
┌───────────────────────┐ ┌───────────────────────┐
│ QueryExecutor │ │ OnlineQueryExecutor │
│ (Traditional) │ │ (Progressive) │
│ │ │ │
│ - Full scan │ │ - Applicability │
│ - Exact results │ │ check │
│ - Blocking │ │ - Auto-sampling │
└───────────────────────┘ │ - Progressive │
└───────┬───────────────┘
┌────────────────────────────────────┐
│ OnlineAggregationEngine │
│ │
│ ┌──────────────────────────────┐ │
│ │ Configuration │ │
│ │ - confidence_level │ │
│ │ - target_error │ │
│ │ - sampling_strategy │ │
│ └──────────────────────────────┘ │
│ │
│ ┌──────────────────────────────┐ │
│ │ Aggregation State │ │
│ │ - total_processed │ │
│ │ - sampled_rows │ │
│ │ - group_estimates │ │
│ │ - adaptive_rate │ │
│ └──────────────────────────────┘ │
└────────┬───────────────────────────┘
┌────────────────────────────────────────────────────┐
│ Data Streaming Pipeline │
└────────────────────────────────────────────────────┘
┌───────────────────────────────┼───────────────────────────┐
│ │ │
▼ ▼ ▼
┌────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ Random Sampling │ │ Stratified Sampling │ │ Adaptive Sampling │
│ │ │ │ │ │
│ - Hash-based │ │ - Per-stratum │ │ - Variance tracking │
│ - Uniform prob │ │ - Balanced samples │ │ - Rate adjustment │
│ - Simple │ │ - Group-aware │ │ - Self-tuning │
└────────┬───────────┘ └──────────┬──────────┘ └──────────┬──────────┘
│ │ │
└───────────────────────────────┼─────────────────────────────┘
┌─────────────────────────┐
│ Sample Selection │
│ - Row filtering │
│ - Sample storage │
└────────────┬────────────┘
┌────────────────────────────────────┐
│ Group Estimate Update │
│ │
│ For each aggregate: │
│ ┌──────────────────────────────┐ │
│ │ COUNT: Increment counter │ │
│ │ SUM: Add value + variance │ │
│ │ AVG: Update sum + count │ │
│ │ MIN: Track minimum │ │
│ │ MAX: Track maximum │ │
│ └──────────────────────────────┘ │
└────────────┬───────────────────────┘
┌────────────────────────────────────────┐
│ Confidence Interval Calculation │
│ │
│ 1. Calculate point estimate │
│ 2. Compute standard error │
│ 3. Apply z-score (1.96 for 95%) │
│ 4. Generate bounds │
│ │
│ Lower = estimate - z * SE │
│ Upper = estimate + z * SE │
└────────────┬───────────────────────────┘
┌────────────────────────────────────────────────┐
│ Progressive Result Creation │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ ProgressiveResult │ │
│ │ - timestamp │ │
│ │ - progress (%) │ │
│ │ - sample_size │ │
│ │ - estimated_total │ │
│ │ - results: Vec<AggregateEstimate> │ │
│ │ - visualization: VisualizationData │ │
│ │ - target_achieved: bool │ │
│ └──────────────────────────────────────────┘ │
└────────────┬───────────────────────────────────┘
┌────────────────────────────────────────┐
│ Result Stream (mpsc channel) │
│ │
│ Update Interval: 100ms (configurable) │
└────────────┬───────────────────────────┘
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌──────────┐ ┌──────────────┐
│ Client │ │ Client │ │ Client │
│ │ │ │ │ │
│ Display │ │ Decision │ │ Termination │
└─────────┘ └──────────┘ └──────────────┘

Component Interaction Flow

1. Query Submission
────────────────────────────────────────────────────────────────
User Query → Parser → Planner → OnlineQueryExecutor
Applicability Check
┌───────────────┴───────────────┐
│ │
YES│ │NO
│ │
▼ ▼
OnlineAggregationEngine Traditional Executor
│ │
│ ▼
│ Full Scan
│ │
│ ▼
│ Complete Result
2. Streaming Pipeline Setup
────────────────────────────────────────────────────────────────
Create channels: (data_tx, data_rx), (result_tx, result_rx)
├─→ Spawn: Input executor task
│ (Reads data, sends batches)
└─→ Spawn: Aggregation task
(Samples, computes, sends results)
3. Continuous Processing Loop
────────────────────────────────────────────────────────────────
FOR EACH data batch:
├─→ Apply sampling strategy
│ │
│ ├─→ Random: Hash-based decision
│ ├─→ Stratified: Per-stratum sampling
│ └─→ Adaptive: Variance-based rate
├─→ Update group estimates
│ │
│ └─→ For each aggregate: accumulate statistics
├─→ Adjust adaptive rate (if applicable)
│ │
│ └─→ Based on observed variance
└─→ Check update interval
├─→ IF elapsed >= update_interval:
│ │
│ ├─→ Compute confidence intervals
│ ├─→ Create ProgressiveResult
│ ├─→ Send to result_stream
│ └─→ Check early termination
└─→ ELSE: Continue processing
4. Result Consumption
────────────────────────────────────────────────────────────────
WHILE receiving results:
├─→ Display current estimate
├─→ Show confidence bounds
├─→ Visualize convergence
└─→ IF target_achieved OR timeout:
└─→ STOP and return final result

Sampling Strategy Decision Tree

┌─────────────────────┐
│ Choose Sampling │
│ Strategy │
└──────────┬──────────┘
┌────────────────┼────────────────┐
│ │ │
▼ ▼ ▼
┌────────────────┐ ┌────────────┐ ┌─────────────────┐
│ No GROUP BY? │ │ GROUP BY? │ │ Unknown data? │
└────────┬───────┘ └─────┬──────┘ └────────┬────────┘
│ │ │
│YES │YES │YES
▼ ▼ ▼
┌────────────────┐ ┌────────────┐ ┌─────────────────┐
│ Random │ │ Stratified │ │ Adaptive │
│ Sampling │ │ Sampling │ │ Sampling │
│ │ │ │ │ │
│ rate: 0.01 │ │ rate: 0.02 │ │ initial: 0.01 │
└────────────────┘ └────────────┘ └─────────────────┘
│ │ │
└───────────────┼──────────────────┘
┌─────────────────┐
│ Sample Stream │
└─────────────────┘

Confidence Interval Calculation Pipeline

┌──────────────────────────────────────────────────────────────────┐
│ For Each Group │
└──────────────────────────────────────────────────────────────────┘
┌───────────────┴───────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ COUNT Estimate │ │ SUM Estimate │
└────────┬─────────┘ └────────┬─────────┘
│ │
▼ ▼
Scaling Factor = 1 / sample_rate
│ │
▼ ▼
estimate = sample_count * scale estimate = sample_sum * scale
│ │
▼ ▼
SE = binomial_se(...) SE = variance_se(...)
│ │
▼ ▼
z = z_score(confidence_level) z = z_score(confidence_level)
│ │
▼ ▼
lower = estimate - z * SE lower = estimate - z * SE
upper = estimate + z * SE upper = estimate + z * SE
│ │
└──────────────┬───────────────┘
┌─────────────────────────┐
│ EstimateWithBounds │
│ - estimate │
│ - lower_bound │
│ - upper_bound │
│ - relative_error │
│ - standard_error │
└─────────────────────────┘

Adaptive Sampling Rate Adjustment

┌──────────────────┐
│ Initial Rate │
│ (e.g., 0.01) │
└────────┬─────────┘
┌────────────────────────┐
│ Collect 30+ samples │
└────────┬───────────────┘
┌────────────────────────────┐
│ Calculate variance across │
│ all groups │
└────────┬───────────────────┘
┌──────────────────────────────────┐
│ Compare: observed vs target │
└──────────────┬───────────────────┘
┌───────────────┼───────────────┐
│ │ │
│ HIGH │ OK │ LOW
▼ ▼ ▼
┌─────────┐ ┌──────────┐ ┌─────────┐
│Increase │ │ Keep │ │Decrease │
│ rate │ │ rate │ │ rate │
│ ×1.1 │ │ │ │ ×0.9 │
└────┬────┘ └────┬─────┘ └────┬────┘
│ │ │
└──────────────┼───────────────┘
┌───────────────┐
│ Update Rate │
│ (0.001-0.5) │
└───────────────┘
Back to sampling loop

Memory Layout

┌─────────────────────────────────────────────────────────────────┐
│ OnlineAggregationEngine │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ config: AggregationConfig │ │
│ │ - confidence_level: f64 │ │
│ │ - target_error: f64 │ │
│ │ - min_sample_size: usize │ │
│ │ - sampling_strategy: SamplingStrategy │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ state: Arc<RwLock<AggregationState>> │ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ total_processed: usize │ │ │
│ │ │ total_estimated: usize │ │ │
│ │ │ adaptive_rate: f64 │ │ │
│ │ │ │ │ │
│ │ │ sampled_rows: Vec<Row> │ │ │
│ │ │ └─→ O(sample_size) │ │ │
│ │ │ │ │ │
│ │ │ group_estimates: HashMap<Vec<Bytes>, GroupEst> │ │ │
│ │ │ └─→ O(num_groups) │ │ │
│ │ │ ├─→ sample_count: usize │ │ │
│ │ │ ├─→ sum_values: Vec<f64> │ │ │
│ │ │ ├─→ sum_squares: Vec<f64> │ │ │
│ │ │ ├─→ min_values: Vec<f64> │ │ │
│ │ │ └─→ max_values: Vec<f64> │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Total Memory: O(sample_size × row_size + num_groups × num_aggregates)
Typical: <1% of dataset size

Execution Timeline

Time │ Action
──────┼────────────────────────────────────────────────────────────
0.00s │ Query submitted
│ OnlineQueryExecutor checks applicability
0.01s │ ✓ Suitable for online aggregation
│ Create OnlineAggregationEngine
│ Setup data pipeline
0.05s │ Start processing rows
│ Sample: row 0, 47, 93, ... (rate=0.01)
0.10s │ ━━━━━━━━━━ 10% progress
│ Sample size: 100 (min reached)
│ → Send first progressive result
│ COUNT: 1000 [800, 1200] (±20%)
0.20s │ ━━━━━━━━━━━━━━━━━━━━ 20% progress
│ Sample size: 200
│ → Send update
│ COUNT: 1000 [850, 1150] (±15%)
0.40s │ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 40% progress
│ Sample size: 400
│ → Send update
│ COUNT: 1000 [920, 1080] (±8%)
0.60s │ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 60%
│ Sample size: 600
│ → Send update
│ COUNT: 1000 [950, 1050] (±5%)
│ ✓ Target achieved (±5% at 95% confidence)
│ → Early termination
0.60s │ Return final result
│ Processed: 6,000 / 10,000 rows (60%)
│ Samples: 600 (6% sample rate)
│ Speedup: ~15x (vs 1.0s full scan)
DONE │ Query completed successfully

Integration Points

┌──────────────────────────────────────────────────────────────────────┐
│ HeliosDB System Integration │
└──────────────────────────────────────────────────────────────────────┘
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Storage │────────▶│ Network │────────▶│ Compute │
│ Layer │ │ Protocol │ │ Layer │
│ │ │ │ │ │
│ - HiDB │ │ - Row format │ │ - Executor │
│ - Sharding │ │ - Predicate │ │ - Planner │
│ │ │ pushdown │ │ - Online Agg │
└──────────────┘ └──────────────┘ └──────┬───────┘
┌───────────────────┐
│ Client Interface │
│ │
│ - SQL API │
│ - Result Stream │
└───────────────────┘

Key Design Decisions

1. Tokio Channels for Streaming

  • Choice: mpsc channels
  • Reason: Non-blocking, backpressure support, async-native
  • Alternative: Crossbeam channels (considered, but tokio integration better)

2. Arc for State

  • Choice: Shared mutable state with read-write lock
  • Reason: Multiple readers (updates), single writer (main loop)
  • Alternative: Mutex (considered, but read-heavy workload)

3. Hash-based Sampling

  • Choice: Hash row index for determinism
  • Reason: Reproducible sampling, no PRNG state
  • Alternative: rand::random (non-deterministic)

4. In-memory Sample Storage

  • Choice: Store sampled rows in Vec
  • Reason: Small sample sizes (<1% typically)
  • Alternative: Reservoir sampling (future enhancement)

5. Z-score Confidence Intervals

  • Choice: Normal approximation with Z-scores
  • Reason: Fast, works well for large samples (CLT)
  • Alternative: Bootstrap (more accurate, but slower)

Performance Optimizations

1. Lazy Evaluation
─────────────────────────────────────────────────────
Only compute confidence intervals when sending updates
(not on every row)
2. Batched Updates
─────────────────────────────────────────────────────
Process rows in batches, update state in bulk
3. Incremental Statistics
─────────────────────────────────────────────────────
Track sum, sum_squares incrementally (no re-scan)
4. Early Termination
─────────────────────────────────────────────────────
Stop processing when target_error achieved
(saves 40-90% of work typically)
5. Zero-copy Row Handling
─────────────────────────────────────────────────────
Use Bytes (ref-counted) to avoid copying row data

Conclusion

The online aggregation architecture provides:

  • Modular design: Clear separation of concerns
  • Scalable: O(n) time, O(sample_size) space
  • Flexible: Multiple sampling strategies
  • Observable: Progressive result streaming
  • Efficient: Early termination, lazy evaluation
  • Accurate: Statistical confidence intervals

This enables interactive analytics on large datasets with approximate results available in milliseconds instead of seconds or minutes.