Windowed Joins Guide
Overview
Windowed joins correlate events from multiple streams within time-based windows. HeliosDB Streaming supports the window types familiar from Apache Flink (tumbling, sliding, and session windows) as well as interval joins, with optimized state management and encryption.
Join Types
1. Tumbling Window Join
Non-overlapping, fixed-size time windows.
Use Case: Hourly order-shipment correlation
SQL Example:
```sql
SELECT
  orders.order_id,
  orders.customer_id,
  shipments.tracking_number,
  shipments.carrier
FROM orders
INNER JOIN shipments
  ON orders.order_id = shipments.order_id
-- Keep only shipments that fall in the same 1-hour tumbling window as the order
WHERE shipments.event_time >= TUMBLE_START(orders.event_time, INTERVAL '1' HOUR)
  AND shipments.event_time < TUMBLE_END(orders.event_time, INTERVAL '1' HOUR)
```
Rust API Example:
```rust
use heliosdb_streaming::window::{TumblingWindow, JoinBuilder};
use std::time::Duration;

let joined = orders_stream
    .join(shipments_stream)
    .on(|order, shipment| order.order_id == shipment.order_id)
    .window(TumblingWindow::of(Duration::from_secs(3600))) // 1 hour
    .select(|order, shipment| OrderShipment {
        order_id: order.order_id,
        customer_id: order.customer_id,
        tracking_number: shipment.tracking_number,
        processing_time: shipment.timestamp - order.timestamp,
    })
    .build()?;
```
Characteristics:
- Simple, predictable windows
- Low memory usage (one window at a time)
- Aligned boundaries (0:00, 1:00, 2:00, etc.)
- ❌ Misses matches that straddle a window boundary (e.g., an order at 0:59 and its shipment at 1:01)
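To make the boundary behaviour concrete, here is a minimal, std-only Rust sketch of tumbling window assignment and matching. It is an illustration under stated assumptions, not the HeliosDB implementation: records are reduced to hypothetical `(key, timestamp_ms)` tuples with non-negative millisecond timestamps, and `tumbling_window_start` and `tumbling_join` are illustrative helper names.

```rust
use std::collections::HashMap;

/// Start (inclusive) of the tumbling window that contains `ts_ms`.
/// Windows are aligned to multiples of `size_ms` (0:00, 1:00, 2:00, ... for 1-hour windows).
pub fn tumbling_window_start(ts_ms: u64, size_ms: u64) -> u64 {
    ts_ms - ts_ms % size_ms
}

/// Inner-join two slices of (key, timestamp_ms) records: a pair matches only when both
/// the key and the tumbling window agree. Returns (left_index, right_index) pairs.
pub fn tumbling_join(left: &[(u32, u64)], right: &[(u32, u64)], size_ms: u64) -> Vec<(usize, usize)> {
    // Index the right side by (key, window_start), the granularity at which window state is kept.
    let mut index: HashMap<(u32, u64), Vec<usize>> = HashMap::new();
    for (j, &(key, ts)) in right.iter().enumerate() {
        index.entry((key, tumbling_window_start(ts, size_ms))).or_default().push(j);
    }
    // Probe the index with each left record's (key, window_start).
    let mut out = Vec::new();
    for (i, &(key, ts)) in left.iter().enumerate() {
        if let Some(matches) = index.get(&(key, tumbling_window_start(ts, size_ms))) {
            out.extend(matches.iter().map(|&j| (i, j)));
        }
    }
    out
}
```

Two records at 0:59 and 1:01 with the same key map to window starts 0 and 3 600 000, so `tumbling_join` returns no pair for them: this is the boundary case listed above.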
2. Sliding Window Join
Overlapping windows with configurable slide interval.
Use Case: Real-time metrics with 10-minute windows, 1-minute updates
SQL Example:
```sql
SELECT
  page_views.page_url,
  HOP_START(page_views.event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS window_start,
  COUNT(*) AS view_count,
  AVG(purchases.amount) AS avg_purchase
FROM page_views
LEFT JOIN purchases
  ON page_views.user_id = purchases.user_id
-- HOP(time, slide, size): 10-minute windows that advance every minute
GROUP BY
  page_views.page_url,
  HOP(page_views.event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
```
Rust API Example:
```rust
use heliosdb_streaming::window::SlidingWindow;
use std::time::Duration;

let joined = page_views_stream
    .join(purchases_stream)
    .on(|view, purchase| view.user_id == purchase.user_id)
    .window(SlidingWindow::of(
        Duration::from_secs(600), // 10 min window size
        Duration::from_secs(60),  // 1 min slide interval
    ))
    .aggregate(|view, purchases| PageMetrics {
        page_url: view.page_url.clone(),
        view_count: purchases.len() + 1,
        // Guard against 0.0 / 0.0 (NaN) when a view has no purchases in the window
        avg_purchase: if purchases.is_empty() {
            0.0
        } else {
            purchases.iter().map(|p| p.amount).sum::<f64>() / purchases.len() as f64
        },
    })
    .build()?;
```
Characteristics:
- Smooth, continuous updates
- Events can appear in multiple windows
- ❌ Higher memory usage (overlapping state)
- ❌ More CPU for window management
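The overlap comes from window assignment: with window size S and slide P, each event belongs to S / P windows. A minimal std-only sketch, assuming millisecond timestamps, window starts aligned to multiples of the slide, and a size that is a multiple of the slide (`sliding_windows` is a hypothetical helper, not part of the HeliosDB API):

```rust
/// Start times of every sliding window [start, start + size_ms) that contains `ts_ms`,
/// oldest first. Window starts are aligned to multiples of `slide_ms`; windows that
/// would start before time zero are skipped.
pub fn sliding_windows(ts_ms: u64, size_ms: u64, slide_ms: u64) -> Vec<u64> {
    assert!(slide_ms > 0 && size_ms % slide_ms == 0, "size must be a multiple of slide");
    // The newest window containing ts_ms starts at the slide boundary at or before ts_ms.
    let mut start = ts_ms - ts_ms % slide_ms;
    let mut starts = Vec::new();
    loop {
        if start + size_ms <= ts_ms {
            break; // this window (and all older ones) ended before ts_ms
        }
        starts.push(start);
        if start < slide_ms {
            break; // the next older window would start before time zero
        }
        start -= slide_ms;
    }
    starts.reverse();
    starts
}
```

With the 10-minute size and 1-minute slide from the use case above, an event is buffered in 10 windows at once, which is where the extra memory and CPU cost comes from.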
3. Session Window Join
Dynamic windows based on inactivity gaps.
Use Case: User session attribution (clicks → purchases)
SQL Example:
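```sql
SELECT
  clicks.user_id,
  STRING_AGG(clicks.page_url, ', ') AS click_path,
  purchases.product_id,
  purchases.amount
FROM clicks
INNER JOIN purchases
  ON clicks.user_id = purchases.user_id
-- Non-aggregated columns in SELECT must also appear in GROUP BY
GROUP BY
  clicks.user_id,
  purchases.product_id,
  purchases.amount,
  SESSION(clicks.event_time, INTERVAL '30' MINUTE)
```
Rust API Example: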
```rust
use heliosdb_streaming::window::SessionWindow;
use std::time::Duration;

let conversions = clicks_stream
    .join(purchases_stream)
    .on(|click, purchase| click.user_id == purchase.user_id)
    .window(SessionWindow::with_gap(Duration::from_secs(1800))) // 30 min inactivity
    .select(|clicks, purchase| Conversion {
        user_id: clicks[0].user_id.clone(),
        session_start: clicks.first().unwrap().timestamp,
        session_end: purchase.timestamp,
        click_path: clicks.iter().map(|c| c.page_url.clone()).collect(),
        product: purchase.product_id,
        revenue: purchase.amount,
        clicks_to_conversion: clicks.len(),
    })
    .build()?;
```
Characteristics:
- Natural session boundaries
- Handles variable-length sessions
- Gap-based window closing
- ❌ Unpredictable window sizes
- ❌ Late events can reopen sessions
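Session windows are built by merging: each event either extends the current session (if it arrives within the gap) or opens a new one. The std-only sketch below groups the timestamps of a single key into sessions; it is an illustration, not the HeliosDB implementation, and `Session` and `sessionize` are hypothetical names. Re-running it with a late event that falls between two sessions shows how late data can bridge and merge them.

```rust
/// A session covering events from `start_ms` to `end_ms` (the last event's timestamp).
#[derive(Debug, PartialEq)]
pub struct Session {
    pub start_ms: u64,
    pub end_ms: u64,
    pub events: usize,
}

/// Group one key's timestamps into sessions separated by more than `gap_ms` of inactivity.
pub fn sessionize(timestamps: &[u64], gap_ms: u64) -> Vec<Session> {
    let mut ts = timestamps.to_vec();
    ts.sort_unstable(); // out-of-order input is sorted by event time first
    let mut sessions: Vec<Session> = Vec::new();
    for t in ts {
        // Extend the current session if this event is within the gap of its last event.
        let extend = matches!(sessions.last(), Some(s) if t - s.end_ms <= gap_ms);
        if extend {
            let s = sessions.last_mut().expect("checked above");
            s.end_ms = t;
            s.events += 1;
        } else {
            // Gap exceeded (or first event): open a new session.
            sessions.push(Session { start_ms: t, end_ms: t, events: 1 });
        }
    }
    sessions
}
```

With a 30-minute gap, events at 0, 10, 25 and 70 minutes form two sessions; adding a late event at 45 minutes links them into one, so a single late record can change session boundaries and state size.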
4. Interval Join
Time-based correlation with configurable before/after intervals.
Use Case: Click-to-conversion attribution within ±5 minutes
SQL Example:
```sql
SELECT
  clicks.click_id,
  clicks.ad_campaign,
  conversions.conversion_id,
  conversions.revenue,
  (conversions.event_time - clicks.event_time) AS time_to_convert
FROM clicks
INNER JOIN conversions
  ON clicks.user_id = conversions.user_id
WHERE conversions.event_time BETWEEN clicks.event_time - INTERVAL '5' MINUTE
                                 AND clicks.event_time + INTERVAL '5' MINUTE
```
Rust API Example:
```rust
use heliosdb_streaming::window::IntervalJoin;
use std::time::Duration;

let attributed = clicks_stream
    .interval_join(conversions_stream)
    .on(|click, conversion| click.user_id == conversion.user_id)
    // std::time::Duration is unsigned, so Duration::from_secs(-300) does not compile.
    // Both bounds are given as magnitudes here (assumed: before and after the click).
    .within(
        Duration::from_secs(300), // up to 5 minutes before the click
        Duration::from_secs(300), // up to 5 minutes after the click
    )
    .select(|click, conversion| Attribution {
        click_id: click.click_id,
        campaign: click.ad_campaign,
        conversion_id: conversion.conversion_id,
        revenue: conversion.revenue,
        time_to_convert: conversion.timestamp - click.timestamp,
    })
    .build()?;
```
Characteristics:
- Precise time-based correlation
- Symmetric or asymmetric intervals
- Low latency (no window wait)
- ❌ Requires buffering both streams
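The interval predicate can be evaluated directly on timestamps. The std-only sketch below is an illustration rather than the HeliosDB API (`Event` and `interval_join` are hypothetical names). It uses signed millisecond offsets, which avoids the fact that `std::time::Duration` cannot be negative, and pairs each left event with every right event of the same key whose timestamp lies in `[left + lower_ms, left + upper_ms]`. Both inputs are fully buffered, mirroring the buffering cost listed above.

```rust
/// A keyed event with a millisecond event-time timestamp.
#[derive(Debug, Clone, Copy)]
pub struct Event {
    pub key: u32,
    pub ts_ms: i64,
}

/// Interval join: match each left event with every right event that has the same key and
/// a timestamp in [left.ts_ms + lower_ms, left.ts_ms + upper_ms] (both bounds inclusive).
/// Returns (left_index, right_index, right.ts_ms - left.ts_ms).
pub fn interval_join(left: &[Event], right: &[Event], lower_ms: i64, upper_ms: i64) -> Vec<(usize, usize, i64)> {
    assert!(lower_ms <= upper_ms, "lower bound must not exceed upper bound");
    let mut out = Vec::new();
    for (i, l) in left.iter().enumerate() {
        for (j, r) in right.iter().enumerate() {
            let delta = r.ts_ms - l.ts_ms;
            if l.key == r.key && (lower_ms..=upper_ms).contains(&delta) {
                out.push((i, j, delta));
            }
        }
    }
    out
}
```

The nested loop only keeps the predicate visible; a streaming operator would typically index buffered records by key and evict them once the watermark has moved past the point where they can still match.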
Join Semantics
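The three semantics in this section differ only in which unmatched records they emit. As a std-only sketch over the keys collected in a single window (the `JoinKind` enum and `window_join` function are hypothetical, not the HeliosDB API), all three can be expressed with one routine that pads the missing side with `None`:

```rust
/// Which unmatched records a windowed join emits.
#[derive(Clone, Copy, PartialEq)]
pub enum JoinKind {
    Inner,
    LeftOuter,
    FullOuter,
}

/// Join the keys of one window's left and right records.
/// Returns (left_index, right_index) pairs; `None` marks the missing side of an unmatched record.
pub fn window_join(left: &[u32], right: &[u32], kind: JoinKind) -> Vec<(Option<usize>, Option<usize>)> {
    let mut out = Vec::new();
    let mut right_matched = vec![false; right.len()];
    for (i, lk) in left.iter().enumerate() {
        let mut matched = false;
        for (j, rk) in right.iter().enumerate() {
            if lk == rk {
                out.push((Some(i), Some(j)));
                right_matched[j] = true;
                matched = true;
            }
        }
        // Left and full outer joins keep unmatched left records.
        if !matched && kind != JoinKind::Inner {
            out.push((Some(i), None));
        }
    }
    // Only a full outer join also keeps unmatched right records.
    if kind == JoinKind::FullOuter {
        for (j, seen) in right_matched.iter().enumerate() {
            if !*seen {
                out.push((None, Some(j)));
            }
        }
    }
    out
}
```

Because unmatched records can only be emitted once a window is complete, outer joins generally have to wait for the watermark to close the window before producing their `None`-padded rows.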
Inner Join
Only matching records from both streams.
```rust
let inner = stream_a
    .join(stream_b)
    .on(|a, b| a.key == b.key)
    .window(TumblingWindow::of(Duration::from_secs(60)))
    .build()?; // Only records with matches in both streams
```
Left Outer Join
All records from left stream, with nulls for non-matching right records.
```rust
let left_outer = stream_a
    .left_join(stream_b)
    .on(|a, b| a.key == b.key)
    .window(TumblingWindow::of(Duration::from_secs(60)))
    .select(|a, b_opt| Match {
        left: a,
        right: b_opt, // Option<B>, None if no match
    })
    .build()?;
```
Full Outer Join
All records from both streams, with nulls for non-matches.
```rust
let full_outer = stream_a
    .full_join(stream_b)
    .on(|a, b| a.key == b.key)
    .window(TumblingWindow::of(Duration::from_secs(60)))
    .select(|a_opt, b_opt| FullMatch {
        left: a_opt,  // Option<A>
        right: b_opt, // Option<B>
    })
    .build()?;
```
Optimization Techniques
1. Join Reordering
Join smaller streams first to minimize state size.
```rust
// ❌ BAD: Large stream first
let result = huge_stream
    .join(small_stream)
    .join(medium_stream)
    .build()?;

// GOOD: Smaller streams first
let result = small_stream
    .join(medium_stream)
    .join(huge_stream)
    .build()?;
```
2. State TTL (Time-To-Live)
Automatically remove expired join state to prevent memory bloat.
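Conceptually, state TTL is a periodic sweep that drops buffered entries whose last update is older than the TTL. A minimal std-only sketch of that idea; `TtlState` and its methods are hypothetical names rather than HeliosDB types, and the current time is passed in explicitly (milliseconds) so the behaviour is deterministic.

```rust
use std::collections::HashMap;

/// Buffered join state per key, with the time each key was last updated.
pub struct TtlState<V> {
    ttl_ms: u64,
    entries: HashMap<String, (u64, Vec<V>)>, // key -> (last_update_ms, buffered records)
}

impl<V> TtlState<V> {
    pub fn new(ttl_ms: u64) -> Self {
        Self { ttl_ms, entries: HashMap::new() }
    }

    /// Buffer a record for `key` and refresh the key's last-update time.
    pub fn insert(&mut self, key: &str, value: V, now_ms: u64) {
        let entry = self.entries.entry(key.to_string()).or_insert((now_ms, Vec::new()));
        entry.0 = now_ms;
        entry.1.push(value);
    }

    /// Drop every key not updated within the TTL; returns how many keys were evicted.
    /// Intended to run once per cleanup interval, not on every record.
    pub fn cleanup(&mut self, now_ms: u64) -> usize {
        let before = self.entries.len();
        let ttl = self.ttl_ms;
        self.entries
            .retain(|_, (last_update, _)| now_ms.saturating_sub(*last_update) < ttl);
        before - self.entries.len()
    }

    /// Number of keys currently holding state.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

Sweeping on a fixed interval rather than on every record trades a little extra retained memory for lower CPU cost, which is the trade-off behind pairing `state_ttl` with a `cleanup_interval`.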
```rust
let config = JoinConfig {
    state_ttl: Duration::from_secs(3600),       // 1 hour
    cleanup_interval: Duration::from_secs(300), // 5 min
    ..Default::default()
};

let joined = stream_a
    .join(stream_b)
    .with_config(config)
    .on(|a, b| a.key == b.key)
    .build()?;
```
3. Key Distribution
Ensure even key distribution to avoid hotspots.
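```rust
// ❌ BAD: Skewed key distribution
// Most events have the same user_id = "admin", so one parallel instance holds most of the state
let joined = stream_a
    .join(stream_b)
    .on(|a, b| a.user_id == b.user_id);

// GOOD: Salt the hot key on one side and replicate the other side across every salt.
// Randomly salting both sides would drop most "admin" matches: a left record with
// salt 3 could only meet right records that happened to draw salt 3 as well.
// (`flat_map` on the stream is assumed here.)
const SALTS: u8 = 10;
let joined = stream_a
    .map(|a| {
        let salt = if a.user_id == "admin" { rand::random::<u8>() % SALTS } else { 0 };
        let key = format!("{}_{}", a.user_id, salt);
        (a, key)
    })
    .join(stream_b.flat_map(|b| {
        // Emit one copy of each hot-key record per salt so every salted left record finds it
        let salts = if b.user_id == "admin" { 0..SALTS } else { 0..1 };
        salts
            .map(|salt| (b.clone(), format!("{}_{}", b.user_id, salt)))
            .collect::<Vec<_>>()
    }))
    .on(|(_, key_a), (_, key_b)| key_a == key_b)
    .select(|(a, _), (b, _)| join_result(a, b))
    .build()?;
```
4. Watermark Configuration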
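The watermark strategy tells the join how long to wait for out-of-order events before treating event time as complete: too tight a bound drops late events, too loose a bound adds latency and state.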
```rust
let stream = source.assign_timestamps_and_watermarks(
    WatermarkStrategy::for_bounded_out_of_orderness(
        Duration::from_secs(10), // 10s max out-of-order
    )
    .with_timestamp_assigner(|event: &Event, _| event.timestamp),
);
```
Performance Metrics
Key Metrics to Monitor
```rust
use heliosdb_streaming::metrics::JoinMetrics;

// Metric collection (two-argument macro form of the `metrics` crate, versions before 0.22;
// newer versions use handles, e.g. `metrics::gauge!("name").set(value)`)
metrics::gauge!("join.state_size_bytes", state_size as f64);
metrics::histogram!("join.watermark_lag_ms", watermark_lag.as_millis() as f64);
metrics::counter!("join.late_events_dropped", 1);
metrics::histogram!("join.throughput_events_per_sec", throughput as f64);
```
Critical Metrics:
- State Size: Memory footprint of join state
- Watermark Lag: How far the current watermark trails the latest event time (or wall-clock time)
- Late Events: Events whose timestamps are already behind the watermark when they arrive
- Join Throughput: Events processed per second
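The bounded out-of-orderness strategy and the lag and late-event metrics are linked: the watermark trails the largest timestamp seen so far by the allowed disorder, and an event counts as late when its timestamp is at or behind the current watermark. A std-only sketch that tracks both; `WatermarkTracker` is a hypothetical type, not the HeliosDB or Flink API, and timestamps are assumed to be milliseconds.

```rust
/// Tracks a bounded-out-of-orderness watermark and counts late events.
pub struct WatermarkTracker {
    max_out_of_order_ms: u64,
    max_seen_ms: Option<u64>,
    pub late_events: u64,
}

impl WatermarkTracker {
    pub fn new(max_out_of_order_ms: u64) -> Self {
        Self { max_out_of_order_ms, max_seen_ms: None, late_events: 0 }
    }

    /// Current watermark: largest timestamp seen minus the allowed disorder
    /// (None until enough event time has been observed).
    pub fn watermark(&self) -> Option<u64> {
        self.max_seen_ms.and_then(|m| m.checked_sub(self.max_out_of_order_ms))
    }

    /// Observe an event. Returns false, and counts the event, if it is late.
    pub fn observe(&mut self, ts_ms: u64) -> bool {
        if let Some(wm) = self.watermark() {
            if ts_ms <= wm {
                self.late_events += 1;
                return false;
            }
        }
        self.max_seen_ms = Some(self.max_seen_ms.map_or(ts_ms, |m| m.max(ts_ms)));
        true
    }

    /// Watermark lag relative to a wall-clock reading `now_ms`.
    pub fn lag_ms(&self, now_ms: u64) -> Option<u64> {
        self.watermark().map(|wm| now_ms.saturating_sub(wm))
    }
}
```

With a 10-second bound, an event at 95 s arriving after one at 100 s is still on time (watermark 90 s), while an event at 85 s is counted as late.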
Real-World Example: E-commerce Analytics
```rust
use heliosdb_streaming::prelude::*;

// Streams
let clicks = kafka_source("clicks_topic").deserialize_json::<Click>();
let cart_adds = kafka_source("cart_topic").deserialize_json::<CartAdd>();
let purchases = kafka_source("purchases_topic").deserialize_json::<Purchase>();

// Multi-stream join for conversion funnel
let funnel = clicks
    // Join clicks → cart adds (30 min session)
    .join(cart_adds)
    .on(|click, cart| click.user_id == cart.user_id)
    .window(SessionWindow::with_gap(Duration::from_secs(1800)))
    .select(|click, cart| ClickToCart {
        user_id: click.user_id,
        product_id: click.product_id,
        click_time: click.timestamp,
        cart_time: cart.timestamp,
        time_to_cart: cart.timestamp - click.timestamp,
    })
    // Join with purchases (1 hour window)
    .join(purchases)
    .on(|ctc, purchase| {
        ctc.user_id == purchase.user_id && ctc.product_id == purchase.product_id
    })
    .window(TumblingWindow::of(Duration::from_secs(3600)))
    .select(|ctc, purchase| Conversion {
        user_id: ctc.user_id,
        product_id: ctc.product_id,
        click_time: ctc.click_time,
        cart_time: ctc.cart_time,
        purchase_time: purchase.timestamp,
        time_to_cart: ctc.time_to_cart,
        time_to_purchase: purchase.timestamp - ctc.click_time,
        revenue: purchase.amount,
    })
    .sink_to_analytics()
    .await?;
```
Troubleshooting
High Memory Usage
Symptom: OOM errors during joins
Solutions:
- Enable state TTL
- Reduce window size
- Increase parallelism to distribute state
- Use smaller join keys
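A back-of-the-envelope estimate shows why these levers work: buffered state grows roughly with input rate × retention (window size plus allowed lateness, or the TTL) × bytes per record, divided across parallel instances. The helper below is a rough sketch under loud assumptions (even key distribution, no per-entry overhead or compression, one input stream at a time); it is not a HeliosDB sizing formula.

```rust
/// Rough per-instance state estimate, in bytes, for ONE input stream of a join.
/// Assumes records are retained for `retention_secs` and spread evenly over
/// `parallelism` instances; key skew, per-entry overhead and compression are ignored.
pub fn estimate_state_bytes(
    events_per_sec: u64,
    retention_secs: u64,
    bytes_per_record: u64,
    parallelism: u64,
) -> u64 {
    assert!(parallelism > 0, "parallelism must be at least 1");
    events_per_sec * retention_secs * bytes_per_record / parallelism
}
```

For example, 10,000 events/s retained for one hour at 200 bytes each is 7.2 GB on a single instance, or 450 MB per instance at parallelism 16. Each lever in the list maps to one factor: TTL and smaller windows shrink retention, parallelism divides the total, and smaller keys shrink bytes per record.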
```rust
// Monitor state size
let state_bytes = join_operator.state_size();
if state_bytes > 1_000_000_000 { // 1 GB
    log::warn!("Join state size exceeded 1GB: {}", state_bytes);
}
```
Late Events Dropped
Symptom: Missing join results
Solutions:
- Increase allowed lateness
- Adjust watermark strategy
- Monitor late event metrics
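Allowed lateness keeps a window's state alive after the watermark passes the window's end, so late records can still update the result; the state is discarded only once the watermark passes the end plus the allowed lateness. The decision can be sketched in std-only Rust, loosely modelled on Flink's allowed-lateness semantics with boundary details simplified (`LateDecision` and `classify` are hypothetical names):

```rust
/// What happens to a record destined for a window ending at `window_end_ms`.
#[derive(Debug, PartialEq)]
pub enum LateDecision {
    /// Watermark has not reached the window end: the record is added normally.
    OnTime,
    /// Window already fired but is within allowed lateness: the record updates the result.
    LateAccepted,
    /// Beyond allowed lateness: the window state is gone and the record is dropped.
    Dropped,
}

pub fn classify(window_end_ms: u64, watermark_ms: u64, allowed_lateness_ms: u64) -> LateDecision {
    if watermark_ms < window_end_ms {
        LateDecision::OnTime
    } else if watermark_ms < window_end_ms.saturating_add(allowed_lateness_ms) {
        LateDecision::LateAccepted
    } else {
        LateDecision::Dropped
    }
}
```

For a window ending at t = 10 min with one minute of allowed lateness, a record that arrives while the watermark is at 10.5 min still updates the result; with zero allowed lateness the same record is dropped.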
```rust
let config = JoinConfig {
    allowed_lateness: Duration::from_secs(60), // Allow 1 min late
    ..Default::default()
};
```
Low Throughput
Symptom: Slow join processing
Solutions:
- Increase parallelism
- Optimize join predicates
- Use faster serialization (bincode vs JSON)
- Reduce window size
```rust
let joined = stream_a
    .join(stream_b)
    .parallelism(16) // Increase from default
    .on(|a, b| a.key == b.key) // Simple predicate
    .window(TumblingWindow::of(Duration::from_secs(60))) // Smaller window
    .build()?;
```
Best Practices
- Always Monitor Watermarks: Watermark lag directly impacts latency
- Set Appropriate TTL: Prevent unbounded state growth
- Test with Realistic Data: Validate with production-like volumes
- Use Session Windows Carefully: Can accumulate large state
- Optimize Join Keys: Simple, evenly distributed keys perform best
- Partition Smartly: High-cardinality partition keys enable parallelism
References
- Flink Windowing Documentation
- Watermark Strategies
- HeliosDB Streaming API:
cargo doc --package heliosdb-streaming --open
Last Updated: November 2025
Version: 1.0