Windowed Joins Guide

Overview

Windowed joins enable correlation of events from multiple streams within time-based windows. HeliosDB Streaming supports all Apache Flink window types with optimized state management and encryption.

Join Types

1. Tumbling Window Join

Non-overlapping, fixed-size time windows.

Use Case: Hourly order-shipment correlation

SQL Example:

SELECT
    orders.order_id,
    orders.customer_id,
    shipments.tracking_number,
    shipments.carrier
FROM orders
INNER JOIN shipments
    ON orders.order_id = shipments.order_id
WHERE
    -- The shipment must fall in the same one-hour window as its order
    shipments.event_time BETWEEN
        TUMBLE_START(orders.event_time, INTERVAL '1' HOUR) AND
        TUMBLE_END(orders.event_time, INTERVAL '1' HOUR)

Rust API Example:

use heliosdb_streaming::window::{TumblingWindow, JoinBuilder};
use std::time::Duration;

let joined = orders_stream
    .join(shipments_stream)
    .on(|order, shipment| order.order_id == shipment.order_id)
    .window(TumblingWindow::of(Duration::from_secs(3600))) // 1 hour
    .select(|order, shipment| OrderShipment {
        order_id: order.order_id,
        customer_id: order.customer_id,
        tracking_number: shipment.tracking_number,
        processing_time: shipment.timestamp - order.timestamp,
    })
    .build()?;

Characteristics:

  • Simple, predictable windows
  • Low memory usage (one window at a time)
  • Aligned boundaries (0:00, 1:00, 2:00, etc.)
  • ❌ Related events that fall on opposite sides of a window boundary are not joined, even when they occur seconds apart
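
The aligned boundaries and the boundary caveat both follow from how a timestamp is mapped to a window. The following is a minimal sketch in plain Rust, independent of the HeliosDB API; the function names and the use of epoch milliseconds are assumptions made for illustration.

```rust
/// Returns the [start, end) bounds of the tumbling window containing `ts_ms`.
/// Assumes non-negative epoch-millisecond timestamps and `size_ms > 0`.
fn tumbling_window(ts_ms: u64, size_ms: u64) -> (u64, u64) {
    // Round the timestamp down to the nearest multiple of the window size.
    let start = ts_ms - ts_ms % size_ms;
    (start, start + size_ms)
}

/// Two events can be paired by a tumbling window join only if they share a window.
fn same_window(a_ms: u64, b_ms: u64, size_ms: u64) -> bool {
    tumbling_window(a_ms, size_ms) == tumbling_window(b_ms, size_ms)
}
```

With one-hour windows, an order at 00:59:59 and its shipment at 01:00:01 map to different windows, so `same_window` returns false although the events are two seconds apart. An interval join or a sliding window is usually a better fit when that matters.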

2. Sliding Window Join

Overlapping windows with configurable slide interval.

Use Case: Real-time metrics with 10-minute windows, 1-minute updates

SQL Example:

SELECT
    page_views.page_url,
    COUNT(*) AS view_count,
    AVG(purchases.amount) AS avg_purchase
FROM page_views
LEFT JOIN purchases
    ON page_views.user_id = purchases.user_id
WHERE
    page_views.event_time BETWEEN
        HOP_START(page_views.event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AND
        HOP_END(page_views.event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
GROUP BY page_views.page_url

Rust API Example:

use heliosdb_streaming::window::SlidingWindow;
use std::time::Duration;

let joined = page_views_stream
    .join(purchases_stream)
    .on(|view, purchase| view.user_id == purchase.user_id)
    .window(SlidingWindow::of(
        Duration::from_secs(600), // 10 min window size
        Duration::from_secs(60)   // 1 min slide interval
    ))
    .aggregate(|view, purchases| PageMetrics {
        page_url: view.page_url,
        view_count: purchases.len() + 1,
        // Evaluates to NaN (0.0 / 0.0) when no purchases matched this view
        avg_purchase: purchases.iter().map(|p| p.amount).sum::<f64>()
            / purchases.len() as f64,
    })
    .build()?;

Characteristics:

  • Smooth, continuous updates
  • Events can appear in multiple windows
  • ❌ Higher memory usage (overlapping state)
  • ❌ More CPU for window management
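
The memory and CPU cost comes from each event belonging to several windows at once. The sketch below lists the windows that contain a given timestamp. It is plain Rust rather than the HeliosDB API, and it assumes window starts are aligned to multiples of the slide interval; the function name is illustrative.

```rust
/// Returns the start time of every sliding window [start, start + size_ms)
/// that contains `ts_ms`, oldest first.
/// Assumes window starts are multiples of `slide_ms` and `slide_ms > 0`.
fn sliding_window_starts(ts_ms: u64, size_ms: u64, slide_ms: u64) -> Vec<u64> {
    // Latest aligned window start that is <= ts_ms.
    let mut start = ts_ms - ts_ms % slide_ms;
    let mut starts = Vec::new();
    // Walk backwards while the window still covers ts_ms.
    while start + size_ms > ts_ms {
        starts.push(start);
        if start < slide_ms {
            break; // no window starts before time zero
        }
        start -= slide_ms;
    }
    starts.reverse();
    starts
}
```

For a 10-minute window sliding every minute, an event at 12:30 (750,000 ms) falls into ten windows, starting at minutes 3 through 12. In general an event is held in roughly `size / slide` windows, which is the overlap factor that drives state size.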

3. Session Window Join

Dynamic windows based on inactivity gaps.

Use Case: User session attribution (clicks → purchases)

SQL Example:

SELECT
    clicks.user_id,
    STRING_AGG(clicks.page_url, ', ') AS click_path,
    purchases.product_id,
    purchases.amount
FROM clicks
INNER JOIN purchases
    ON clicks.user_id = purchases.user_id
GROUP BY
    clicks.user_id,
    purchases.product_id,
    purchases.amount,
    SESSION(clicks.event_time, INTERVAL '30' MINUTE)

Rust API Example:

use heliosdb_streaming::window::SessionWindow;
use std::time::Duration;

let conversions = clicks_stream
    .join(purchases_stream)
    .on(|click, purchase| click.user_id == purchase.user_id)
    .window(SessionWindow::with_gap(Duration::from_secs(1800))) // 30 min inactivity
    .select(|clicks, purchase| Conversion {
        user_id: clicks[0].user_id,
        session_start: clicks.first().unwrap().timestamp,
        session_end: purchase.timestamp,
        click_path: clicks.iter().map(|c| c.page_url.clone()).collect(),
        product: purchase.product_id,
        revenue: purchase.amount,
        clicks_to_conversion: clicks.len(),
    })
    .build()?;

Characteristics:

  • Natural session boundaries
  • Handles variable-length sessions
  • Gap-based window closing
  • ❌ Unpredictable window sizes
  • ❌ Late events can reopen sessions
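
Gap-based closing, and the way a late event can reopen or merge sessions, can be seen in a small standalone sketch. This is plain Rust, not the HeliosDB implementation; `sessionize` is a hypothetical helper that works on a batch of timestamps in epoch milliseconds.

```rust
/// Groups event timestamps into sessions. A new session starts whenever the gap
/// to the previous event exceeds `gap_ms`. Returns (first, last) per session.
fn sessionize(mut timestamps_ms: Vec<u64>, gap_ms: u64) -> Vec<(u64, u64)> {
    timestamps_ms.sort_unstable();
    let mut sessions: Vec<(u64, u64)> = Vec::new();
    for ts in timestamps_ms {
        // Does this event arrive within the inactivity gap of the current session?
        let extends = matches!(sessions.last(), Some(&(_, last)) if ts - last <= gap_ms);
        if extends {
            sessions.last_mut().unwrap().1 = ts;
        } else {
            sessions.push((ts, ts));
        }
    }
    sessions
}
```

With a 30-minute gap, events at minutes 0, 10, and 50 form two sessions. If a late event with timestamp minute 35 is added, it bridges both gaps and the result collapses into one session from minute 0 to minute 50, which is why session state cannot be finalized until the watermark has passed the gap.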

4. Interval Join

Time-based correlation with configurable before/after intervals.

Use Case: Click-to-conversion attribution within ±5 minutes

SQL Example:

SELECT
    clicks.click_id,
    clicks.ad_campaign,
    conversions.conversion_id,
    conversions.revenue,
    (conversions.event_time - clicks.event_time) AS time_to_convert
FROM clicks
INNER JOIN conversions
    ON clicks.user_id = conversions.user_id
WHERE
    conversions.event_time BETWEEN
        clicks.event_time - INTERVAL '5' MINUTE AND
        clicks.event_time + INTERVAL '5' MINUTE

Rust API Example:

use heliosdb_streaming::window::IntervalJoin;
use std::time::Duration;

let attributed = clicks_stream
    .interval_join(conversions_stream)
    .on(|click, conversion| click.user_id == conversion.user_id)
    // std::time::Duration cannot be negative, so both bounds are given as
    // magnitudes (this assumes `within` takes the before and after spans in that order)
    .within(
        Duration::from_secs(300), // up to 5 minutes before the click
        Duration::from_secs(300)  // up to 5 minutes after the click
    )
    .select(|click, conversion| Attribution {
        click_id: click.click_id,
        campaign: click.ad_campaign,
        conversion_id: conversion.conversion_id,
        revenue: conversion.revenue,
        time_to_convert: conversion.timestamp - click.timestamp,
    })
    .build()?;

Characteristics:

  • Precise time-based correlation
  • Symmetric or asymmetric intervals
  • Low latency (no window wait)
  • ❌ Requires buffering both streams
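
The matching rule of an interval join is a per-pair time check rather than a shared window. Below is a minimal sketch in plain Rust, assuming signed millisecond timestamps and inclusive bounds (matching SQL `BETWEEN`); the `Event` type and `interval_join` function are illustrative and not part of the HeliosDB API.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Event {
    user_id: u32,
    ts_ms: i64,
}

/// Pairs each left event with every right event that has the same key and whose
/// timestamp lies in [left.ts_ms - before_ms, left.ts_ms + after_ms].
fn interval_join(
    left: &[Event],
    right: &[Event],
    before_ms: i64,
    after_ms: i64,
) -> Vec<(Event, Event)> {
    let mut out = Vec::new();
    for l in left {
        for r in right {
            let delta = r.ts_ms - l.ts_ms;
            if l.user_id == r.user_id && delta >= -before_ms && delta <= after_ms {
                out.push((l.clone(), r.clone()));
            }
        }
    }
    out
}
```

The nested loop keeps the rule visible. A streaming engine would instead buffer both sides per key and discard a record once the watermark guarantees no partner can still arrive, which is where the buffering cost comes from.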

Join Semantics

Inner Join

Only matching records from both streams.

let inner = stream_a
    .join(stream_b)
    .on(|a, b| a.key == b.key)
    .window(TumblingWindow::of(Duration::from_secs(60)))
    .build()?; // Only records with matches in both streams

Left Outer Join

All records from left stream, with nulls for non-matching right records.

let left_outer = stream_a
    .left_join(stream_b)
    .on(|a, b| a.key == b.key)
    .window(TumblingWindow::of(Duration::from_secs(60)))
    .select(|a, b_opt| Match {
        left: a,
        right: b_opt, // Option<B>, None if no match
    })
    .build()?;

Full Outer Join

All records from both streams, with nulls for non-matches.

let full_outer = stream_a
    .full_join(stream_b)
    .on(|a, b| a.key == b.key)
    .window(TumblingWindow::of(Duration::from_secs(60)))
    .select(|a_opt, b_opt| FullMatch {
        left: a_opt,  // Option<A>
        right: b_opt, // Option<B>
    })
    .build()?;
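
To make the difference between the three semantics concrete, the sketch below computes a full outer join over the buffered contents of a single window. It is plain Rust and does not use the HeliosDB API; the `Row` alias and `full_outer_join` function are assumptions for illustration.

```rust
use std::collections::BTreeSet;

type Row = (u32, &'static str); // (join key, payload)

/// Full outer join of two buffers belonging to one window.
/// Unmatched rows appear with `None` on the missing side.
fn full_outer_join(
    left: &[Row],
    right: &[Row],
) -> Vec<(Option<&'static str>, Option<&'static str>)> {
    let mut out = Vec::new();
    let mut matched_right = BTreeSet::new();
    for (lk, lv) in left {
        let mut matched = false;
        for (i, (rk, rv)) in right.iter().enumerate() {
            if lk == rk {
                out.push((Some(*lv), Some(*rv)));
                matched_right.insert(i);
                matched = true;
            }
        }
        if !matched {
            out.push((Some(*lv), None)); // left row without a partner
        }
    }
    for (i, (_, rv)) in right.iter().enumerate() {
        if !matched_right.contains(&i) {
            out.push((None, Some(*rv))); // right row without a partner
        }
    }
    out
}
```

The other two semantics are subsets of this result: a left outer join keeps the rows whose left side is `Some`, and an inner join keeps the rows where both sides are `Some`. Unmatched rows can only be emitted once the window closes, because until then a partner may still arrive.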

Optimization Techniques

1. Join Reordering

Join smaller streams first to minimize state size.

// ❌ BAD: Large stream first
let result = huge_stream
    .join(small_stream)
    .join(medium_stream)
    .build()?;

// GOOD: Smaller streams first
let result = small_stream
    .join(medium_stream)
    .join(huge_stream)
    .build()?;

2. State TTL (Time-To-Live)

Automatically remove expired join state to prevent memory bloat.

let config = JoinConfig {
    state_ttl: Duration::from_secs(3600),       // 1 hour
    cleanup_interval: Duration::from_secs(300), // 5 min
    ..Default::default()
};

let joined = stream_a
    .join(stream_b)
    .with_config(config)
    .on(|a, b| a.key == b.key)
    .build()?;
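
What a periodic cleanup pass does with a TTL can be sketched as follows. This is an illustrative model in plain Rust, not the HeliosDB state backend; `JoinState` and `evict_expired` are hypothetical names, and timestamps are epoch milliseconds.

```rust
use std::collections::HashMap;

/// Buffered join state: key -> list of (event timestamp in ms, payload).
struct JoinState {
    entries: HashMap<String, Vec<(u64, String)>>,
}

impl JoinState {
    /// Drops every buffered record older than `ttl_ms` relative to `now_ms`
    /// and returns how many records were evicted.
    fn evict_expired(&mut self, now_ms: u64, ttl_ms: u64) -> usize {
        let cutoff = now_ms.saturating_sub(ttl_ms);
        let mut evicted = 0;
        self.entries.retain(|_, records| {
            let before = records.len();
            records.retain(|(ts, _)| *ts >= cutoff);
            evicted += before - records.len();
            !records.is_empty() // also remove keys that have no records left
        });
        evicted
    }
}
```

Running a pass like this every `cleanup_interval` bounds state to roughly one TTL of data per key. The trade-off is that a record arriving after its partner has been evicted no longer produces a match, so the TTL should be at least as long as the window plus any allowed lateness.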

3. Key Distribution

Ensure even key distribution to avoid hotspots.

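// ❌ BAD: Skewed key distribution
// Most events have same user_id = "admin"
let joined = stream_a.join(stream_b).on(|a, b| a.user_id == b.user_id);

// GOOD: Salt the hot key on one side and replicate the other side across all salts.
// Salting both sides independently would drop every match whose salts differ.
const SALTS: u8 = 10;
let joined = stream_a
    .map(|a| {
        let salt = if a.user_id == "admin" {
            rand::random::<u8>() % SALTS
        } else {
            0
        };
        // Build the key before moving `a` into the tuple
        let key = format!("{}_{}", a.user_id, salt);
        (a, key)
    })
    // Assumes the stream API offers a `flat_map` that emits each item of the returned Vec
    .join(stream_b.flat_map(|b| {
        let salts = if b.user_id == "admin" { 0..SALTS } else { 0..1 };
        salts
            .map(|salt| (b.clone(), format!("{}_{}", b.user_id, salt)))
            .collect::<Vec<_>>()
    }))
    .on(|(_, key_a), (_, key_b)| key_a == key_b)
    .select(|(a, _), (b, _)| join_result(a, b))
    .build()?;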
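
For salting to preserve join results, only one side may choose its salt freely; the other side has to be replicated across every possible salt so that each salted record still meets its partner. The sketch below shows the key derivation in plain Rust. The helper names, the round-robin salt, and the `SALTS` constant are assumptions for illustration, not HeliosDB functions.

```rust
const SALTS: u8 = 4;

/// Left side: spread a hot key over SALTS sub-keys. Round-robin on a sequence
/// number is used here to stay deterministic; a random salt works as well.
fn salt_left(key: &str, seq: u64, hot_key: &str) -> String {
    let salt = if key == hot_key { (seq % SALTS as u64) as u8 } else { 0 };
    format!("{key}_{salt}")
}

/// Right side: emit one salted key per salt for the hot key, so every salted
/// left record finds a matching right record.
fn salt_right(key: &str, hot_key: &str) -> Vec<String> {
    let n = if key == hot_key { SALTS } else { 1 };
    (0..n).map(|s| format!("{key}_{s}")).collect()
}
```

The cost of this scheme is that right-side records for the hot key are stored `SALTS` times, so salting pays off when the left side carries most of the skewed volume.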

4. Watermark Configuration

Proper watermark handling prevents late data issues.

let stream = source
    .assign_timestamps_and_watermarks(
        WatermarkStrategy::for_bounded_out_of_orderness(
            Duration::from_secs(10) // 10s max out-of-order
        )
        .with_timestamp_assigner(|event: &Event, _| event.timestamp)
    );
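
A bounded-out-of-orderness strategy is commonly modeled as a watermark that trails the highest event time seen so far by a fixed bound. The sketch below illustrates that model in plain Rust; it is not the HeliosDB implementation, and the type and method names are assumptions.

```rust
/// Tracks a watermark that trails the highest observed event time by `bound_ms`.
struct BoundedOutOfOrderness {
    max_ts_ms: u64,
    bound_ms: u64,
}

impl BoundedOutOfOrderness {
    fn new(bound_ms: u64) -> Self {
        Self { max_ts_ms: 0, bound_ms }
    }

    /// Current watermark: events older than this are no longer expected.
    fn watermark(&self) -> u64 {
        self.max_ts_ms.saturating_sub(self.bound_ms)
    }

    /// Records an event and returns true if it is on time, meaning its
    /// timestamp is not behind the watermark at the moment it arrives.
    fn observe(&mut self, ts_ms: u64) -> bool {
        let on_time = ts_ms >= self.watermark();
        // The maximum only grows, so the watermark never moves backwards.
        self.max_ts_ms = self.max_ts_ms.max(ts_ms);
        on_time
    }
}
```

With a 10-second bound, an event that is 5 seconds behind the newest one is still on time, while one that is 20 seconds behind is late. A larger bound admits more disorder but delays every window result by the same amount.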

Performance Metrics

Key Metrics to Monitor

use heliosdb_streaming::metrics::JoinMetrics;
// Metric collection
metrics::gauge!("join.state_size_bytes", state_size as f64);
metrics::histogram!("join.watermark_lag_ms", watermark_lag.as_millis() as f64);
metrics::counter!("join.late_events_dropped", 1);
metrics::histogram!("join.throughput_events_per_sec", throughput as f64);

Critical Metrics:

  • State Size: Memory footprint of join state
  • Watermark Lag: How far the watermark trails the newest event time observed
  • Late Events: Events whose timestamp is already behind the watermark when they arrive
  • Join Throughput: Events processed per second

Real-World Example: E-commerce Analytics

use heliosdb_streaming::prelude::*;

// Streams
let clicks = kafka_source("clicks_topic").deserialize_json::<Click>();
let cart_adds = kafka_source("cart_topic").deserialize_json::<CartAdd>();
let purchases = kafka_source("purchases_topic").deserialize_json::<Purchase>();

// Multi-stream join for conversion funnel
let funnel = clicks
    // Join clicks → cart adds (30 min session)
    .join(cart_adds)
    .on(|click, cart| click.user_id == cart.user_id)
    .window(SessionWindow::with_gap(Duration::from_secs(1800)))
    .select(|click, cart| ClickToCart {
        user_id: click.user_id,
        product_id: click.product_id,
        click_time: click.timestamp,
        cart_time: cart.timestamp,
        time_to_cart: cart.timestamp - click.timestamp,
    })
    // Join with purchases (1 hour window)
    .join(purchases)
    .on(|ctc, purchase| {
        ctc.user_id == purchase.user_id &&
        ctc.product_id == purchase.product_id
    })
    .window(TumblingWindow::of(Duration::from_secs(3600)))
    .select(|ctc, purchase| Conversion {
        user_id: ctc.user_id,
        product_id: ctc.product_id,
        click_time: ctc.click_time,
        cart_time: ctc.cart_time,
        purchase_time: purchase.timestamp,
        time_to_cart: ctc.time_to_cart,
        time_to_purchase: purchase.timestamp - ctc.click_time,
        revenue: purchase.amount,
    })
    .sink_to_analytics()
    .await?;

Troubleshooting

High Memory Usage

Symptom: OOM errors during joins

Solutions:

  1. Enable state TTL
  2. Reduce window size
  3. Increase parallelism to distribute state
  4. Use smaller join keys

// Monitor state size
let state_bytes = join_operator.state_size();
if state_bytes > 1_000_000_000 { // 1 GB
    log::warn!("Join state size exceeded 1GB: {}", state_bytes);
}

Late Events Dropped

Symptom: Missing join results

Solutions:

  1. Increase allowed lateness
  2. Adjust watermark strategy
  3. Monitor late event metrics

let config = JoinConfig {
    allowed_lateness: Duration::from_secs(60), // Allow 1 min late
    ..Default::default()
};
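
How allowed lateness changes the fate of an event can be sketched as a three-way classification. The code assumes Flink-style semantics, where window state is kept until the watermark passes the window end plus the allowed lateness. Whether HeliosDB behaves identically is an assumption, and the names below are illustrative.

```rust
#[derive(Debug, PartialEq)]
enum Arrival {
    OnTime,
    LateAccepted,
    Dropped,
}

/// Classifies an event that belongs to the window ending at `window_end_ms`,
/// given the watermark at arrival time and the configured allowed lateness.
fn classify(window_end_ms: u64, watermark_ms: u64, allowed_lateness_ms: u64) -> Arrival {
    if watermark_ms < window_end_ms {
        Arrival::OnTime // the window has not fired yet
    } else if watermark_ms < window_end_ms + allowed_lateness_ms {
        Arrival::LateAccepted // the window fired, but its state is still retained
    } else {
        Arrival::Dropped // the state has been cleaned up
    }
}
```

With a one-minute window and one minute of allowed lateness, an event for the window ending at 60 s is still joined while the watermark is below 120 s and is dropped afterwards. Raising the lateness recovers more results at the cost of holding window state longer.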

Low Throughput

Symptom: Slow join processing

Solutions:

  1. Increase parallelism
  2. Optimize join predicates
  3. Use faster serialization (bincode vs JSON)
  4. Reduce window size

let joined = stream_a
    .join(stream_b)
    .parallelism(16) // Increase from default
    .on(|a, b| a.key == b.key) // Simple predicate
    .window(TumblingWindow::of(Duration::from_secs(60))) // Smaller window
    .build()?;

Best Practices

  1. Always Monitor Watermarks: Watermark lag directly impacts latency
  2. Set Appropriate TTL: Prevent unbounded state growth
  3. Test with Realistic Data: Validate with production-like volumes
  4. Use Session Windows Carefully: Can accumulate large state
  5. Optimize Join Keys: Simple, evenly distributed keys perform best
  6. Partition Smartly: High-cardinality partition keys enable parallelism

Last Updated: November 2025
Version: 1.0