Online Aggregation with Progressive Results
Overview
HeliosDB’s online aggregation feature, inspired by LeanXcale’s progressive query processing, enables approximate query results with confidence intervals that improve over time. This allows users to:
- Get early results while the query is still executing
- Make decisions before full scan completes
- Trade accuracy for response time
- Visualize result convergence in real-time
Key Concepts
Online Aggregation
Traditional aggregation requires scanning the entire dataset before returning results. Online aggregation provides progressive estimates that improve as more data is processed:
```
Traditional: [==============================] → Result (5s)

Online:      [======]                   → Estimate ±10% (1s)
             [===============]          → Estimate ±5%  (2s)
             [========================] → Estimate ±2%  (3s)
```
Confidence Intervals
Every estimate includes:
- Point Estimate: Most likely value
- Confidence Bounds: Range containing true value with specified probability (e.g., 95%)
- Relative Error: Accuracy as percentage of estimate
- Standard Error: Measure of estimate precision
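These quantities all follow from the Central Limit Theorem. A minimal std-only sketch (a hypothetical helper for illustration, not the HeliosDB API) that derives all four from a sample:

```rust
// Point estimate, confidence bounds, and relative error for a sample mean,
// using the normal approximation from the Central Limit Theorem.
fn mean_ci(data: &[f64], z: f64) -> (f64, f64, f64, f64) {
    let n = data.len() as f64;
    let mean = data.iter().sum::<f64>() / n; // point estimate
    // Bessel-corrected sample variance, then standard error of the mean.
    let var = data.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / (n - 1.0);
    let std_err = (var / n).sqrt();
    let half_width = z * std_err;           // confidence bound half-width
    let rel_err = half_width / mean.abs();  // accuracy as fraction of estimate
    (mean, mean - half_width, mean + half_width, rel_err)
}

fn main() {
    let sample: Vec<f64> = (1..=100).map(|i| i as f64).collect();
    let (est, lo, hi, rel) = mean_ci(&sample, 1.960); // z for 95% confidence
    println!("AVG: {:.2} [{:.2}, {:.2}] (±{:.1}%)", est, lo, hi, rel * 100.0);
}
```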
Example:
```
COUNT: 10,000  [9,500, 10,500]  (±5.0%)
AVG:   42.5    [40.3, 44.7]     (±5.2%)
```
Sampling Strategies
1. Random Sampling
Simple uniform random sampling across entire dataset.
```rust
SamplingStrategy::Random {
    sample_rate: 0.01, // 1% sample
}
```
Best for: Uniform data distributions, simple queries
Pros: Simple, unbiased, works well for homogeneous data
Cons: May miss rare groups, inefficient for skewed distributions
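One common way to implement a uniform sample decision is to hash the row id onto [0, 1) and compare with the rate; this sketch is an assumption for illustration (HeliosDB's internal sampler may differ), but it shows why the decision is cheap, deterministic, and unbiased:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Include a row iff its hash, mapped uniformly onto [0, 1), falls below
// the sample rate. Every row has the same inclusion probability.
fn include(row_id: u64, sample_rate: f64) -> bool {
    let mut h = DefaultHasher::new();
    row_id.hash(&mut h);
    (h.finish() as f64 / u64::MAX as f64) < sample_rate
}

fn main() {
    let sampled = (0u64..1_000_000).filter(|&id| include(id, 0.01)).count();
    println!("sampled {} of 1,000,000 rows (~1%)", sampled);
}
```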
2. Stratified Sampling
Ensures representative samples from each stratum (group).
```rust
SamplingStrategy::Stratified {
    strata_columns: vec![region_col],
    sample_rate: 0.02, // 2% per stratum
}
```
Best for: GROUP BY queries, skewed distributions, rare groups
Pros: Better accuracy for grouped queries, handles skew
Cons: Requires knowing stratification columns upfront
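The key property is that every stratum contributes the same fraction of its own rows, so rare groups are not drowned out by large ones. A minimal std-only sketch (hypothetical helper, not the HeliosDB API) using a systematic per-stratum pick:

```rust
use std::collections::HashMap;

// Take ~`rate` of the rows from each stratum by counting rows per stratum
// and keeping one of every `1/rate` of them.
fn stratified_sample<'a>(rows: &[(&'a str, f64)], rate: f64) -> Vec<(&'a str, f64)> {
    let every = (1.0 / rate).round() as usize;
    let mut seen: HashMap<&str, usize> = HashMap::new();
    let mut out = Vec::new();
    for &(stratum, value) in rows {
        let c = seen.entry(stratum).or_insert(0);
        *c += 1;
        if (*c - 1) % every == 0 {
            out.push((stratum, value));
        }
    }
    out
}

fn main() {
    // 900 "EU" rows plus a rare 10-row "APAC" stratum.
    let mut rows: Vec<(&str, f64)> = vec![("EU", 1.0); 900];
    rows.extend(vec![("APAC", 2.0); 10]);
    let sample = stratified_sample(&rows, 0.1); // 10% per stratum
    println!("sample size: {}", sample.len()); // rare stratum still represented
}
```

With pure random sampling at 10%, the APAC stratum could easily be missed entirely; here it is guaranteed at least one representative.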
3. Adaptive Sampling
Dynamically adjusts sample rate based on observed variance.
```rust
SamplingStrategy::Adaptive {
    initial_rate: 0.01,
    target_variance: 100.0,
}
```
Best for: Unknown data distributions, mixed variance
Pros: Self-tuning, optimal accuracy/cost tradeoff
Cons: Needs warm-up period, more complex
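One way to realize this (a sketch under assumed mechanics, not HeliosDB's actual controller) is to track running variance with Welford's algorithm and nudge the rate up while observed variance exceeds the target:

```rust
// Adaptive sampler sketch: Welford's online variance plus a multiplicative
// rate controller, clamped to [0.001, 1.0].
struct Adaptive {
    rate: f64,
    target_variance: f64,
    n: f64,
    mean: f64,
    m2: f64, // running sum of squared deviations
}

impl Adaptive {
    fn new(initial_rate: f64, target_variance: f64) -> Self {
        Adaptive { rate: initial_rate, target_variance, n: 0.0, mean: 0.0, m2: 0.0 }
    }

    fn observe(&mut self, x: f64) {
        self.n += 1.0;
        let d = x - self.mean;
        self.mean += d / self.n;
        self.m2 += d * (x - self.mean);
        if self.n > 1.0 {
            let sample_var = self.m2 / (self.n - 1.0);
            // Grow the rate on high variance, shrink it on low variance.
            self.rate = if sample_var > self.target_variance {
                (self.rate * 1.1).min(1.0)
            } else {
                (self.rate * 0.9).max(0.001)
            };
        }
    }
}

fn main() {
    let mut a = Adaptive::new(0.01, 100.0);
    for i in 0..1000 {
        a.observe((i % 100) as f64 * 10.0); // high-variance stream
    }
    println!("adapted rate: {:.3}", a.rate); // climbs toward the cap
}
```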
API Usage
Basic Usage
```rust
use heliosdb_compute::{
    OnlineAggregationEngine, AggregationConfig, SamplingStrategy,
    AggregateType, ProgressiveResultVisualizer,
};

// Configure online aggregation
let config = AggregationConfig {
    confidence_level: 0.95,  // 95% confidence intervals
    target_error: 0.05,      // Target ±5% accuracy
    min_sample_size: 100,    // Min samples before reporting
    sampling_strategy: SamplingStrategy::Random { sample_rate: 0.01 },
    enable_visualization: true,
    ..Default::default()
};

// Create engine
let engine = OnlineAggregationEngine::new(config);

// Define aggregates
let aggregates = vec![
    AggregateType::Count,
    AggregateType::Sum { column: 0 },
    AggregateType::Avg { column: 0 },
    AggregateType::Min { column: 0 },
    AggregateType::Max { column: 0 },
];

// Execute with progressive results
let mut result_rx = engine.execute_online_aggregation(
    data_stream,
    vec![], // group_by columns
    aggregates,
).await?;

// Process progressive updates
while let Some(result) = result_rx.recv().await {
    println!("{}", ProgressiveResultVisualizer::format_result(&result));

    if result.target_achieved {
        // Sufficient accuracy reached
        break;
    }
}
```
Grouped Aggregation
```rust
// GROUP BY with stratified sampling
let config = AggregationConfig {
    sampling_strategy: SamplingStrategy::Stratified {
        strata_columns: vec![region_col],
        sample_rate: 0.05,
    },
    ..Default::default()
};

let engine = OnlineAggregationEngine::new(config);

// Group by region column
let mut result_rx = engine.execute_online_aggregation(
    data_stream,
    vec![region_col], // GROUP BY
    vec![
        AggregateType::Count,
        AggregateType::Avg { column: sales_col },
    ],
).await?;

// Each result contains estimates per group
while let Some(result) = result_rx.recv().await {
    for estimate in &result.results {
        println!("Group: {:?}", estimate.group_key);
        println!("Count: {} ±{}%",
            estimate.count.estimate,
            estimate.count.relative_error * 100.0,
        );
    }
}
```
Query Integration
```rust
use heliosdb_compute::{
    OnlineQueryExecutor, OnlineQueryBuilder,
    QueryExecutor, QueryResult,
};

// Create online query executor
let query_executor = QueryExecutor::new(storage_addr, 10);

let online_config = OnlineQueryBuilder::new()
    .confidence(0.95)
    .target_error(0.05)
    .adaptive_sampling(0.01, 100.0)
    .build();

let online_executor = OnlineQueryExecutor::new(
    query_executor,
    Some(online_config),
);

// Execute query (automatically uses online aggregation if applicable)
match online_executor.execute_with_progressive_results(plan).await? {
    QueryResult::Complete { rows } => {
        // Traditional complete result
        println!("Got {} rows", rows.len());
    }
    QueryResult::Progressive { mut result_stream } => {
        // Progressive results
        while let Some(update) = result_stream.recv().await {
            println!("Progress: {:.1}%", update.progress * 100.0);
            // Display estimates...
        }
    }
}
```
Configuration Guide
Choosing Confidence Level
- 90% (z=1.645): Fast convergence, wider bounds
- 95% (z=1.960): Standard choice, balanced
- 99% (z=2.576): High confidence, requires more samples
Trade-off: Higher confidence → Wider intervals OR more samples needed
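The z-values above translate directly into interval half-widths via z·σ/√n. A small sketch (standalone arithmetic, not the HeliosDB API) that makes the width trade-off concrete:

```rust
// Half-width of a CLT confidence interval for a mean: z * sigma / sqrt(n).
fn half_width(z: f64, sigma: f64, n: f64) -> f64 {
    z * sigma / n.sqrt()
}

fn main() {
    let (sigma, n) = (100.0, 1_000.0); // assumed std deviation and sample size
    for (level, z) in [("90%", 1.645), ("95%", 1.960), ("99%", 2.576)] {
        println!("{} CI half-width: ±{:.2}", level, half_width(z, sigma, n));
    }
}
```

Holding the sample fixed, moving from 90% to 99% confidence widens the interval by the ratio 2.576/1.645 ≈ 1.57.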
Setting Target Error
Target relative error determines when to stop sampling:
```rust
target_error: 0.10  // Stop when error ≤ ±10%
target_error: 0.05  // Stop when error ≤ ±5%
target_error: 0.01  // Stop when error ≤ ±1%
```
Lower target error requires more samples (approximately O(1/error²)).
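The O(1/error²) growth follows from inverting the half-width formula: n ≈ (z·σ/(target_error·μ))². A quick check (illustrative arithmetic with assumed σ and μ, not the HeliosDB API):

```rust
// Samples needed to hit a target relative error on a mean estimate.
fn required_n(z: f64, sigma: f64, mean: f64, target_error: f64) -> f64 {
    (z * sigma / (target_error * mean)).powi(2)
}

fn main() {
    let (z, sigma, mean) = (1.960, 50.0, 100.0); // assumed distribution
    for err in [0.10, 0.05, 0.01] {
        println!("±{:.0}% → n ≈ {:.0}", err * 100.0, required_n(z, sigma, mean, err));
    }
}
```

Halving the target error quadruples the required sample; tightening from ±10% to ±1% multiplies it by 100.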
Selecting Sample Rate
Initial sample rate depends on:
- Data size: Larger datasets can use lower rates
- Variance: High variance needs higher rates
- Groups: More groups need higher rates
Guidelines:
- 1M rows, low variance: 0.01 (1%)
- 10M rows, high variance: 0.05 (5%)
- 100M rows with groups: 0.02 (2%) with stratification
Accuracy Trade-offs
Sample Size vs. Accuracy
Confidence interval width decreases with √n:
| Sample Size | Relative Error (typical) |
|---|---|
| 100 | ±15% |
| 400 | ±7.5% |
| 1,000 | ±5% |
| 10,000 | ±1.5% |
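The table's 1/√n scaling can be reproduced from its first row alone (the table values are rounded, so small deviations are expected):

```rust
// Relative error scales as 1/sqrt(n): quadrupling n halves the error.
// Anchored at the table's first row (±15% at n = 100).
fn rel_error(n: f64) -> f64 {
    0.15 * (100.0_f64 / n).sqrt()
}

fn main() {
    for n in [100.0, 400.0, 1_000.0, 10_000.0] {
        println!("n = {:>6}: ±{:.1}%", n, rel_error(n) * 100.0);
    }
}
```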
Supported Aggregates
| Aggregate | Accuracy | Notes |
|---|---|---|
| COUNT | Excellent | Binomial estimation |
| SUM | Good | Variance-dependent |
| AVG | Excellent | Central Limit Theorem |
| MIN | Poor | Biased estimate |
| MAX | Poor | Biased estimate |
Recommendation: MIN/MAX become exact once the true extreme value has been sampled, but until then the sample MAX underestimates and the sample MIN overestimates. Use with caution.
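The bias is easy to see with a sample that happens to miss the extremes (a contrived deterministic sample for illustration):

```rust
// A sample's maximum can only undershoot the true maximum, and its minimum
// can only overshoot the true minimum, until the extreme rows are sampled.
fn main() {
    let data: Vec<i64> = (0i64..1_000).collect(); // true min 0, true max 999
    // Systematic 10% sample that misses both extremes.
    let sample: Vec<i64> = data.iter().copied().filter(|&x| x % 10 == 3).collect();
    let sample_max = *sample.iter().max().unwrap();
    let sample_min = *sample.iter().min().unwrap();
    println!("true max 999 vs sample max {}", sample_max); // biased low
    println!("true min   0 vs sample min {}", sample_min); // biased high
}
```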
Error Sources
- Sampling Error: Inherent in random sampling (quantified by confidence intervals)
- Stratification Bias: If strata not properly identified
- Variance: High variance requires more samples
- Skewness: Heavy-tailed distributions slow convergence
Visualization
Result Formatting
```rust
// Format progressive result
let output = ProgressiveResultVisualizer::format_result(&result);
println!("{}", output);
```
Output:
```
=== Progressive Aggregation Result ===
Time: 2.50s | Progress: 45.2% | Samples: 1,234/10,000
Target Achieved: false

Group 1:
  COUNT:  5000 [4750, 5250] (±5.0%)
  SUM[0]: 250000.00 [240000.00, 260000.00] (±4.0%)
  AVG[0]: 50.00 [48.50, 51.50] (±3.0%)
```
Convergence Visualization
```rust
if let Some(viz) = &result.visualization {
    let chart = ProgressiveResultVisualizer::visualize_convergence(
        viz,
        60, // width
        10, // height
    );
    println!("{}", chart);
}
```
Output:
```
=== Estimate Convergence ===
10500 |                                                        *
10250 |                                                  *
10000 |                                            *
 9750 |                                      *
 9500 |                                *
 9250 |                          *
 9000 |                    *
 8750 |              *
 8500 |        *
 8250 |*
      +------------------------------------------------------------
       0                                                   time (s)

=== Error Convergence ===
50.0% |#
45.0% |##
40.0% |###
35.0% |####
30.0% |#####
25.0% |######
20.0% |#######
15.0% |########
10.0% |##########
 5.0% |############
```
Performance Characteristics
Latency
First result arrives after processing:
- `min_sample_size` rows have been sampled
- The first `update_interval` has elapsed (typically 50-200ms)

Subsequent updates arrive every `update_interval`.
Throughput
Sampling overhead is minimal:
- Random: ~1% CPU per sample decision
- Stratified: ~2% CPU (hash computation)
- Adaptive: ~3% CPU (variance tracking)
Memory
Memory usage:
- Sampled rows: `O(sample_size * row_size)`
- Group state: `O(num_groups * num_aggregates)`
- Typically: <1% of dataset size
Early Termination
Best case (low variance, no groups):
- 0.1% of data for ±10% error
- 1% of data for ±5% error
- 10% of data for ±1% error
Worst case (high variance, many groups):
- May need 50%+ of data for tight bounds
Best Practices
1. Start with Loose Bounds
```rust
// Development/exploration
target_error: 0.1,       // ±10%
confidence_level: 0.90,

// Production
target_error: 0.05,      // ±5%
confidence_level: 0.95,
```
2. Use Appropriate Sampling
```rust
// No GROUP BY → Random sampling
sampling_strategy: SamplingStrategy::Random { sample_rate: 0.01 }

// With GROUP BY → Stratified sampling
sampling_strategy: SamplingStrategy::Stratified {
    strata_columns: group_by_columns.clone(),
    sample_rate: 0.02,
}

// Unknown characteristics → Adaptive sampling
sampling_strategy: SamplingStrategy::Adaptive {
    initial_rate: 0.01,
    target_variance: 100.0,
}
```
3. Monitor Progress
```rust
while let Some(result) = result_rx.recv().await {
    if result.progress > 0.1 {
        // Show first estimate after 10%
        display_results(&result);
    }

    if result.target_achieved {
        // Sufficient accuracy
        break;
    }

    if result.progress > 0.5 && !improving(&result) {
        // Convergence stalled, may need full scan
        continue_full_scan();
    }
}
```
4. Handle Edge Cases
```rust
// Check sample size
if result.sample_size < 100 {
    println!("Warning: Sample size too small for reliable estimates");
}

// Check for empty groups and verify bounds are reasonable
for estimate in &result.results {
    if estimate.count.estimate < 10.0 {
        println!("Warning: Group has very few samples");
    }
    if estimate.count.relative_error > 0.5 {
        println!("Warning: High uncertainty (±50%+)");
    }
}
```
Limitations
Not Suitable For
- Exact queries: WHERE clauses need exact evaluation
- Complex joins: Join selectivity hard to estimate
- DISTINCT: Cardinality estimation requires different techniques
- ORDER BY + LIMIT: Need full sort for correctness
- Subqueries: Correlated subqueries need exact inner results
When to Use Full Scan
- Required accuracy < 1%
- Small datasets (< 10K rows)
- Many small groups (high cardinality GROUP BY)
- MIN/MAX critical to application logic
Examples
See `/examples/online_aggregation_demo.rs` for comprehensive examples:
- Basic Aggregation: Random sampling with confidence intervals
- Grouped Aggregation: Sales analysis by region
- Stratified Sampling: Handling imbalanced groups
- Adaptive Sampling: High-variance data
- Early Termination: Low-variance optimization
Run examples:
```
cargo run --example online_aggregation_demo
```
References
- BlinkDB: Agarwal et al., “BlinkDB: Queries with Bounded Errors and Bounded Response Times on Very Large Data”, EuroSys 2013
- Online Aggregation: Hellerstein et al., “Online Aggregation”, SIGMOD 1997
- LeanXcale: Influenced design of progressive result streaming
- Confidence Intervals: Based on Central Limit Theorem and bootstrap methods
Future Enhancements
- Stratified sampling with automatic stratification
- Bootstrap confidence intervals for non-normal distributions
- Reservoir sampling for streaming data
- Approximate DISTINCT via HyperLogLog
- Approximate JOIN via sampling
- Query plan optimization based on sample statistics