
HeliosDB Streaming Analytics

Real-time streaming analytics for HeliosDB with window functions, continuous queries, and event-time processing.

Features

  • Window Functions

    • Tumbling windows (non-overlapping, fixed size)
    • Sliding windows (overlapping)
    • Session windows (gap-based)
  • Continuous Queries

    • Always-on queries that process streams in real-time
    • Multiple sink types (table, callback, memory)
    • Automatic buffer management
  • Event-Time Processing

    • Event time vs processing time semantics
    • Watermarks for handling out-of-order events
    • Configurable allowed lateness
    • Late data handling and statistics
  • Stream Aggregations

    • COUNT, SUM, AVG, MIN, MAX, FIRST, LAST
    • Windowed aggregations
    • Group-by support
  • Stream Joins

    • Stream-table joins (dimension enrichment)
    • Stream-stream joins (temporal joins)
    • Multiple join types (inner, left, right, full outer)

Installation

Add to your Cargo.toml:

[dependencies]
heliosdb-streaming = "3.0.0"

Quick Start

use heliosdb_streaming::*;
use std::collections::HashMap;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Create streaming engine
    let engine = StreamingEngine::new().await?;

    // Create a stream
    let stream = Stream::new(StreamConfig::default());

    // Apply tumbling window (5-second intervals)
    let windowed = engine
        .tumbling_window(stream.clone(), Duration::from_secs(5))
        .await?;

    // Send events
    let mut fields = HashMap::new();
    fields.insert("value".to_string(), Value::Integer(42));
    stream.send_row(Row::new(fields)).await?;

    // Send watermark to trigger window processing
    stream.send(StreamEvent::Watermark(chrono::Utc::now())).await?;

    Ok(())
}

Window Functions

Tumbling Windows

Non-overlapping, fixed-size windows:

// 1-minute tumbling windows
let windowed = engine
    .tumbling_window(stream, Duration::from_secs(60))
    .await?;

SQL equivalent:

SELECT
    TUMBLE(timestamp, INTERVAL '1 minute') AS window_start,
    COUNT(*) AS event_count
FROM events
GROUP BY TUMBLE(timestamp, INTERVAL '1 minute');
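Conceptually, assigning an event to its tumbling window is just modular arithmetic on the timestamp. A minimal standalone sketch of that rule (`tumbling_window_bounds` is a hypothetical helper, not the crate's internal implementation):

```rust
/// Compute the half-open tumbling window [start, end) containing a timestamp.
/// Window start = ts - (ts mod size); rem_euclid keeps this correct for
/// negative (pre-epoch) timestamps too.
fn tumbling_window_bounds(ts_ms: i64, size_ms: i64) -> (i64, i64) {
    let start = ts_ms - ts_ms.rem_euclid(size_ms);
    (start, start + size_ms)
}
```

For example, with 60-second windows an event at t = 65 s lands in the [60 s, 120 s) window.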

Sliding Windows

Overlapping windows with configurable slide interval:

// 5-minute windows, sliding every 1 minute
let windowed = engine
    .sliding_window(
        stream,
        Duration::from_secs(300), // window size
        Duration::from_secs(60),  // slide interval
    )
    .await?;

SQL equivalent:

SELECT
    HOP(timestamp, INTERVAL '1 minute', INTERVAL '5 minutes') AS window_start,
    COUNT(*) AS event_count
FROM events
GROUP BY HOP(timestamp, INTERVAL '1 minute', INTERVAL '5 minutes');
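Because sliding windows overlap, each event belongs to size / slide windows at once (five, for a 5-minute window sliding every minute). A hypothetical sketch of that assignment, separate from the crate's own code:

```rust
/// Enumerate the start times of every sliding window containing `ts_ms`,
/// oldest first. An event belongs to size/slide overlapping windows.
fn sliding_window_starts(ts_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<i64> {
    // Most recent window start at or before the event.
    let mut start = ts_ms - ts_ms.rem_euclid(slide_ms);
    let mut starts = Vec::new();
    // Walk backwards while the window [start, start + size) still covers ts.
    while start + size_ms > ts_ms {
        starts.push(start);
        start -= slide_ms;
    }
    starts.reverse();
    starts
}
```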

Session Windows

Dynamic windows based on inactivity gap:

// Group events separated by less than a 10-minute gap
let windowed = engine
    .session_window(stream, Duration::from_secs(600))
    .await?;

SQL equivalent:

SELECT
    user_id,
    SESSION(timestamp, INTERVAL '10 minutes') AS session_id,
    COUNT(*) AS events_per_session
FROM user_events
GROUP BY user_id, SESSION(timestamp, INTERVAL '10 minutes');
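Unlike tumbling and sliding windows, session windows have no fixed boundaries: an event extends the current session unless the gap since the previous event reaches the threshold, in which case a new session opens. A self-contained sketch of that merging rule (illustrative only; `sessionize` is a hypothetical helper):

```rust
/// Group sorted event timestamps into sessions. A gap of `gap_ms` or more
/// between consecutive events closes the current session and opens a new one.
/// Returns (session_start, session_end) pairs.
fn sessionize(timestamps: &[i64], gap_ms: i64) -> Vec<(i64, i64)> {
    let mut sessions: Vec<(i64, i64)> = Vec::new();
    for &ts in timestamps {
        match sessions.last_mut() {
            // Within the gap: extend the open session.
            Some((_, end)) if ts - *end < gap_ms => *end = ts,
            // Gap exceeded (or no session yet): start a new session.
            _ => sessions.push((ts, ts)),
        }
    }
    sessions
}
```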

Continuous Queries

Always-on queries that continuously process streaming data:

use std::sync::Arc;

// Create a sink for results
let sink = Arc::new(CallbackSink::new(|rows| {
    for row in rows {
        println!("Result: {:?}", row);
    }
}));

// Define continuous query
let query = ContinuousQuery::new(
    "real_time_dashboard".to_string(),
    "SELECT COUNT(*) FROM api_requests".to_string(),
    sink,
    stream,
);

// Start the query
let handle = engine.create_continuous_query(query).await?;

// Check stats
let stats = handle.get_stats();
println!("Processed: {} rows", stats.rows_processed);

// Stop the query
engine.stop_continuous_query("real_time_dashboard").await?;

Windowed Aggregations

Perform aggregations over windows with group-by support:

let aggregation = WindowedAggregation::new(
    WindowType::Tumbling { size: Duration::from_secs(60) },
    vec![
        AggregateFunction::Count,
        AggregateFunction::Sum { field: "amount".to_string() },
        AggregateFunction::Avg { field: "response_time".to_string() },
        AggregateFunction::Min { field: "price".to_string() },
        AggregateFunction::Max { field: "price".to_string() },
    ],
    stream,
)
.with_group_by(vec!["status".to_string(), "region".to_string()])
.execute()
.await?;

Available aggregation functions:

  • Count - Count rows
  • Sum - Sum values
  • Avg - Average values
  • Min - Minimum value
  • Max - Maximum value
  • First - First value in window
  • Last - Last value in window
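All of these aggregates can be maintained incrementally in a single pass, without buffering the window's rows. A hypothetical sketch of such per-window, per-group state (the crate presumably keeps equivalent state internally):

```rust
/// One-pass state covering COUNT, SUM, AVG, MIN, MAX, FIRST, LAST.
#[derive(Debug)]
struct AggState {
    count: u64,
    sum: f64,
    min: f64,
    max: f64,
    first: f64,
    last: f64,
}

impl AggState {
    fn new(v: f64) -> Self {
        AggState { count: 1, sum: v, min: v, max: v, first: v, last: v }
    }

    fn update(&mut self, v: f64) {
        self.count += 1;
        self.sum += v;
        self.min = self.min.min(v);
        self.max = self.max.max(v);
        self.last = v; // FIRST stays fixed; LAST tracks the newest value
    }

    fn avg(&self) -> f64 {
        self.sum / self.count as f64
    }
}
```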

Watermarks and Event Time

Event Time vs Processing Time

let config = StreamConfig {
    name: "events".to_string(),
    time_semantics: TimeSemantics::EventTime, // or TimeSemantics::ProcessingTime
    allowed_lateness: Duration::from_secs(60),
    watermark_interval: Duration::from_secs(1),
};
let stream = Stream::new(config);

Watermark Generation

use heliosdb_streaming::*;

// Bounded out-of-orderness strategy
let generator = WatermarkGenerator::new(
    WatermarkStrategy::BoundedOutOfOrderness {
        max_out_of_order: Duration::from_secs(5),
    },
    Duration::from_secs(60), // allowed lateness
);

// Update watermark with a new event
let event_time = chrono::Utc::now();
generator.update(event_time);

// Check whether an event is late
let is_late = generator.is_late(event_time);

Late Data Handling

let handler = LateDataHandler::new(Duration::from_secs(60));
let watermark = chrono::Utc::now();
let event_time = watermark - chrono::Duration::seconds(30);

match handler.handle_late_data(event_time, watermark) {
    LateDataDecision::Process => println!("On time"),
    LateDataDecision::ProcessAsLate => println!("Late but within allowed lateness"),
    LateDataDecision::Drop => println!("Too late, dropped"),
}

// Get statistics
let stats = handler.get_stats();
println!("Late: {}, Dropped: {}", stats.late_data_count, stats.dropped_count);
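The three-way decision above boils down to comparing the event time against the watermark and the allowed-lateness bound. A self-contained sketch of that rule (assumed semantics, not the crate's exact code):

```rust
#[derive(Debug, PartialEq)]
enum Decision {
    Process,       // at or ahead of the watermark: on time
    ProcessAsLate, // behind the watermark, but within allowed lateness
    Drop,          // behind watermark - allowed_lateness: too late
}

fn classify(event_ms: i64, watermark_ms: i64, allowed_lateness_ms: i64) -> Decision {
    if event_ms >= watermark_ms {
        Decision::Process
    } else if event_ms >= watermark_ms - allowed_lateness_ms {
        Decision::ProcessAsLate
    } else {
        Decision::Drop
    }
}
```

With a 60-second allowed lateness, an event 30 seconds behind the watermark (as in the example above) is processed as late; one 90 seconds behind is dropped.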

Stream Joins

Stream-Table Join

Enrich streaming data with static dimension tables:

let join = StreamTableJoin::new(
    "users".to_string(),   // table name
    "user_id".to_string(), // join key
);

// Load dimension table
let table_data = vec![/* rows */];
join.load_table(table_data)?;

// Perform join
let enriched = join.join(stream).await?;

Stream-Stream Join

Join two streams within a time window:

let join = StreamStreamJoin::new(Duration::from_secs(60))
    .with_keys("order_id".to_string(), "payment_order_id".to_string())
    .with_join_type(JoinType::Inner);

let joined = join.join(orders_stream, payments_stream).await?;

Join types:

  • JoinType::Inner - Only matching records
  • JoinType::LeftOuter - All left records + matches
  • JoinType::RightOuter - All right records + matches
  • JoinType::FullOuter - All records from both sides
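At its core, a stream-table join is a hash lookup per event: the dimension table is held as a map keyed by the join key, and the join type decides what happens on a miss (inner drops the event; left outer keeps it unenriched). An illustrative sketch with hypothetical field names, separate from the crate's API:

```rust
use std::collections::HashMap;

/// Inner stream-table join: keep only events whose user_id has a match
/// in the dimension table, attaching the looked-up user name.
fn join_inner(
    events: &[(&str, i64)],              // (user_id, value) per event
    users: &HashMap<&str, &str>,         // dimension table: user_id -> name
) -> Vec<(String, i64, String)> {
    events
        .iter()
        .filter_map(|(user_id, value)| {
            users
                .get(user_id)
                .map(|name| (user_id.to_string(), *value, name.to_string()))
        })
        .collect()
}
```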

Complete Example: Real-Time Dashboard

use heliosdb_streaming::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize engine
    let engine = StreamingEngine::new().await?;

    // Create API request stream
    let api_stream = Stream::new(StreamConfig {
        name: "api_requests".to_string(),
        time_semantics: TimeSemantics::EventTime,
        allowed_lateness: Duration::from_secs(10),
        watermark_interval: Duration::from_secs(1),
    });

    // Apply 1-minute tumbling window
    let windowed = engine
        .tumbling_window(api_stream.clone(), Duration::from_secs(60))
        .await?;

    // Aggregate by status code
    let aggregated = WindowedAggregation::new(
        WindowType::Tumbling { size: Duration::from_secs(60) },
        vec![
            AggregateFunction::Count,
            AggregateFunction::Avg { field: "response_time".to_string() },
        ],
        windowed,
    )
    .with_group_by(vec!["status".to_string()])
    .execute()
    .await?;

    // Create continuous query with callback sink
    let sink = Arc::new(CallbackSink::new(|rows| {
        for row in rows {
            if let Some(Value::String(status)) = row.get("status") {
                let count = row.get("count").and_then(|v| v.as_i64()).unwrap_or(0);
                let avg = row
                    .get("avg_response_time")
                    .and_then(|v| v.as_f64())
                    .unwrap_or(0.0);
                println!("Status: {} | Count: {} | Avg: {:.2}ms", status, count, avg);
            }
        }
    }));

    let query = ContinuousQuery::new(
        "dashboard".to_string(),
        "SELECT status, COUNT(*), AVG(response_time) FROM api_requests GROUP BY status".to_string(),
        sink,
        aggregated,
    );
    let handle = engine.create_continuous_query(query).await?;
    println!("Dashboard running: {}", handle.name());

    // Send events
    for i in 0..100 {
        let mut fields = HashMap::new();
        fields.insert("status".to_string(), Value::String("200".to_string()));
        fields.insert("response_time".to_string(), Value::Float(50.0 + (i as f64)));
        api_stream.send_row(Row::new(fields)).await?;

        if i % 10 == 0 {
            api_stream.send(StreamEvent::Watermark(chrono::Utc::now())).await?;
        }
    }

    // Shutdown
    engine.shutdown().await?;
    Ok(())
}

Performance Considerations

  1. Buffer Sizing: Configure appropriate buffer sizes for continuous queries based on throughput
  2. Watermark Intervals: Balance between latency and completeness
  3. Allowed Lateness: Set based on expected out-of-order arrival patterns
  4. Window Sizes: Larger windows consume more memory but provide broader context
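As a rough sizing aid for points 1 and 4: resident state grows with event rate × window span, and sliding windows multiply that by the overlap factor size / slide when each window buffers its own copy of the rows. A back-of-the-envelope calculator (all numbers illustrative, not a measured property of HeliosDB):

```rust
/// Rough upper bound on rows held in memory for a sliding window, assuming
/// each of the size/slide concurrent windows buffers its rows independently.
fn estimated_buffered_rows(events_per_sec: u64, window_secs: u64, slide_secs: u64) -> u64 {
    let concurrent_windows = window_secs / slide_secs; // overlap factor
    events_per_sec * window_secs * concurrent_windows
}
```

At 1,000 events/s, a 5-minute window sliding every minute buffers on the order of 1.5 M rows; multiply by the average row size to budget memory.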

Architecture

Stream  ->  Window    ->  Aggregation    ->  Continuous Query  ->  Sink
  |           |               |                    |                |
  v           v               v                    v                v
Events     Tumbling      COUNT/SUM/AVG          Buffer         Table/Callback
           Sliding       MIN/MAX/...            Manager        Memory/Storage
           Session       GROUP BY

Testing

Run all tests:

cargo test

Run integration tests:

cargo test --test integration_test

Run example:

cargo run --example streaming_dashboard

Benchmarks

cargo bench

License

MIT OR Apache-2.0

Contributing

Contributions are welcome! Please see the main HeliosDB repository for guidelines.