# HeliosDB Streaming Analytics
Real-time streaming analytics for HeliosDB with window functions, continuous queries, and event-time processing.
## Features
- **Window Functions**
  - Tumbling windows (non-overlapping, fixed size)
  - Sliding windows (overlapping)
  - Session windows (gap-based)
- **Continuous Queries**
  - Always-on queries that process streams in real-time
  - Multiple sink types (table, callback, memory)
  - Automatic buffer management
- **Event-Time Processing**
  - Event-time vs processing-time semantics
  - Watermarks for handling out-of-order events
  - Configurable allowed lateness
  - Late data handling and statistics
- **Stream Aggregations**
  - COUNT, SUM, AVG, MIN, MAX, FIRST, LAST
  - Windowed aggregations
  - Group-by support
- **Stream Joins**
  - Stream-table joins (dimension enrichment)
  - Stream-stream joins (temporal joins)
  - Multiple join types (inner, left, right, full outer)
## Installation
Add to your `Cargo.toml`:

```toml
[dependencies]
heliosdb-streaming = "3.0.0"
```

## Quick Start
```rust
use heliosdb_streaming::*;
use std::collections::HashMap;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Create streaming engine
    let engine = StreamingEngine::new().await?;

    // Create a stream
    let stream = Stream::new(StreamConfig::default());

    // Apply tumbling window (5 second intervals)
    let windowed = engine
        .tumbling_window(stream.clone(), Duration::from_secs(5))
        .await?;

    // Send events
    let mut fields = HashMap::new();
    fields.insert("value".to_string(), Value::Integer(42));
    stream.send_row(Row::new(fields)).await?;

    // Send watermark to trigger window processing
    stream.send(StreamEvent::Watermark(chrono::Utc::now())).await?;

    Ok(())
}
```

## Window Functions
### Tumbling Windows
Non-overlapping, fixed-size windows:
```rust
// 1-minute tumbling windows
let windowed = engine
    .tumbling_window(stream, Duration::from_secs(60))
    .await?;
```

SQL equivalent:

```sql
SELECT
  TUMBLE(timestamp, INTERVAL '1 minute') as window_start,
  COUNT(*) as event_count
FROM events
GROUP BY TUMBLE(timestamp, INTERVAL '1 minute');
```

### Sliding Windows
Overlapping windows with configurable slide interval:
```rust
// 5-minute windows, sliding every 1 minute
let windowed = engine
    .sliding_window(
        stream,
        Duration::from_secs(300), // window size
        Duration::from_secs(60),  // slide interval
    )
    .await?;
```

SQL equivalent:

```sql
SELECT
  HOP(timestamp, INTERVAL '1 minute', INTERVAL '5 minutes') as window_start,
  COUNT(*) as event_count
FROM events
GROUP BY HOP(timestamp, INTERVAL '1 minute', INTERVAL '5 minutes');
```

### Session Windows
Dynamic windows based on inactivity gap:
```rust
// Group events with less than 10 minutes gap
let windowed = engine
    .session_window(stream, Duration::from_secs(600))
    .await?;
```

SQL equivalent:

```sql
SELECT
  SESSION(timestamp, INTERVAL '10 minutes') as session_id,
  COUNT(*) as events_per_session
FROM user_events
GROUP BY user_id, SESSION(timestamp, INTERVAL '10 minutes');
```

## Continuous Queries
Always-on queries that continuously process streaming data:
```rust
use std::sync::Arc;

// Create a sink for results
let sink = Arc::new(CallbackSink::new(|rows| {
    for row in rows {
        println!("Result: {:?}", row);
    }
}));

// Define continuous query
let query = ContinuousQuery::new(
    "real_time_dashboard".to_string(),
    "SELECT COUNT(*) FROM api_requests".to_string(),
    sink,
    stream,
);

// Start the query
let handle = engine.create_continuous_query(query).await?;

// Check stats
let stats = handle.get_stats();
println!("Processed: {} rows", stats.rows_processed);

// Stop the query
engine.stop_continuous_query("real_time_dashboard").await?;
```

## Windowed Aggregations
Perform aggregations over windows with group-by support:
```rust
let aggregation = WindowedAggregation::new(
    WindowType::Tumbling { size: Duration::from_secs(60) },
    vec![
        AggregateFunction::Count,
        AggregateFunction::Sum { field: "amount".to_string() },
        AggregateFunction::Avg { field: "response_time".to_string() },
        AggregateFunction::Min { field: "price".to_string() },
        AggregateFunction::Max { field: "price".to_string() },
    ],
    stream,
)
.with_group_by(vec!["status".to_string(), "region".to_string()])
.execute()
.await?;
```

Available aggregation functions:

- `Count` - Count rows
- `Sum` - Sum values
- `Avg` - Average values
- `Min` - Minimum value
- `Max` - Maximum value
- `First` - First value in window
- `Last` - Last value in window
## Watermarks and Event Time
### Event Time vs Processing Time
```rust
let config = StreamConfig {
    name: "events".to_string(),
    time_semantics: TimeSemantics::EventTime, // or ProcessingTime
    allowed_lateness: Duration::from_secs(60),
    watermark_interval: Duration::from_secs(1),
};

let stream = Stream::new(config);
```

### Watermark Generation
```rust
use heliosdb_streaming::*;

// Bounded out-of-orderness strategy
let generator = WatermarkGenerator::new(
    WatermarkStrategy::BoundedOutOfOrderness {
        max_out_of_order: Duration::from_secs(5),
    },
    Duration::from_secs(60), // allowed lateness
);

// Update watermark with new event
let event_time = chrono::Utc::now();
generator.update(event_time);

// Check if event is late
let is_late = generator.is_late(event_time);
```

### Late Data Handling
```rust
let handler = LateDataHandler::new(Duration::from_secs(60));
let watermark = chrono::Utc::now();
let event_time = watermark - chrono::Duration::seconds(30);

match handler.handle_late_data(event_time, watermark) {
    LateDataDecision::Process => println!("On time"),
    LateDataDecision::ProcessAsLate => println!("Late but within allowed lateness"),
    LateDataDecision::Drop => println!("Too late, dropped"),
}

// Get statistics
let stats = handler.get_stats();
println!("Late: {}, Dropped: {}", stats.late_data_count, stats.dropped_count);
```

## Stream Joins
### Stream-Table Join
Enrich streaming data with static dimension tables:
```rust
let join = StreamTableJoin::new(
    "users".to_string(),   // table name
    "user_id".to_string(), // join key
);

// Load dimension table
let table_data = vec![/* rows */];
join.load_table(table_data)?;

// Perform join
let enriched = join.join(stream).await?;
```

### Stream-Stream Join
Join two streams within a time window:
```rust
let join = StreamStreamJoin::new(Duration::from_secs(60))
    .with_keys("order_id".to_string(), "payment_order_id".to_string())
    .with_join_type(JoinType::Inner);

let joined = join.join(orders_stream, payments_stream).await?;
```

Join types:

- `JoinType::Inner` - Only matching records
- `JoinType::LeftOuter` - All left records + matches
- `JoinType::RightOuter` - All right records + matches
- `JoinType::FullOuter` - All records from both sides
## Complete Example: Real-Time Dashboard
```rust
use heliosdb_streaming::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize engine
    let engine = StreamingEngine::new().await?;

    // Create API request stream
    let api_stream = Stream::new(StreamConfig {
        name: "api_requests".to_string(),
        time_semantics: TimeSemantics::EventTime,
        allowed_lateness: Duration::from_secs(10),
        watermark_interval: Duration::from_secs(1),
    });

    // Apply 1-minute tumbling window
    let windowed = engine
        .tumbling_window(api_stream.clone(), Duration::from_secs(60))
        .await?;

    // Aggregate by status code
    let aggregated = WindowedAggregation::new(
        WindowType::Tumbling { size: Duration::from_secs(60) },
        vec![
            AggregateFunction::Count,
            AggregateFunction::Avg { field: "response_time".to_string() },
        ],
        windowed,
    )
    .with_group_by(vec!["status".to_string()])
    .execute()
    .await?;

    // Create continuous query with callback sink
    let sink = Arc::new(CallbackSink::new(|rows| {
        for row in rows {
            if let Some(Value::String(status)) = row.get("status") {
                let count = row.get("count").and_then(|v| v.as_i64()).unwrap_or(0);
                let avg = row
                    .get("avg_response_time")
                    .and_then(|v| v.as_f64())
                    .unwrap_or(0.0);
                println!("Status: {} | Count: {} | Avg: {:.2}ms", status, count, avg);
            }
        }
    }));

    let query = ContinuousQuery::new(
        "dashboard".to_string(),
        "SELECT status, COUNT(*), AVG(response_time) FROM api_requests GROUP BY status"
            .to_string(),
        sink,
        aggregated,
    );

    let handle = engine.create_continuous_query(query).await?;
    println!("Dashboard running: {}", handle.name());

    // Send events
    for i in 0..100 {
        let mut fields = HashMap::new();
        fields.insert("status".to_string(), Value::String("200".to_string()));
        fields.insert("response_time".to_string(), Value::Float(50.0 + (i as f64)));

        api_stream.send_row(Row::new(fields)).await?;

        if i % 10 == 0 {
            api_stream.send(StreamEvent::Watermark(chrono::Utc::now())).await?;
        }
    }

    // Shutdown
    engine.shutdown().await?;
    Ok(())
}
```

## Performance Considerations
- **Buffer Sizing**: Configure appropriate buffer sizes for continuous queries based on throughput
- **Watermark Intervals**: Balance between latency and completeness
- **Allowed Lateness**: Set based on expected out-of-order arrival patterns
- **Window Sizes**: Larger windows consume more memory but provide broader context
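As a rough sketch of how these knobs map onto configuration, a `StreamConfig` for a stream with frequent out-of-order arrivals might pair a short watermark interval (low latency) with a generous allowed lateness (better completeness). The field values below are illustrative, not recommendations; tune them against your own arrival patterns:

```rust
use std::time::Duration;

// Illustrative trade-off: a 1s watermark interval keeps window results timely,
// while 30s of allowed lateness absorbs typical out-of-order arrival.
let config = StreamConfig {
    name: "clickstream".to_string(),
    time_semantics: TimeSemantics::EventTime,
    allowed_lateness: Duration::from_secs(30),  // match observed lateness
    watermark_interval: Duration::from_secs(1), // latency vs completeness
};
let stream = Stream::new(config);
```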
## Architecture
```
Stream  ->  Window  ->  Aggregation  ->  Continuous Query  ->  Sink
  |           |              |                  |                |
  v           v              v                  v                v
Events     Tumbling     COUNT/SUM/AVG        Buffer        Table/Callback
           Sliding      MIN/MAX/...          Manager       Memory/Storage
           Session      GROUP BY
```

## Testing
Run all tests:
```sh
cargo test
```

Run integration tests:

```sh
cargo test --test integration_test
```

Run the example:

```sh
cargo run --example streaming_dashboard
```

## Benchmarks

```sh
cargo bench
```

## License
MIT OR Apache-2.0
## Contributing
Contributions are welcome! Please see the main HeliosDB repository for guidelines.