Performance Tuning Guide

Overview

This guide covers performance optimization techniques for HeliosDB Streaming applications. Topics include throughput optimization, latency reduction, memory management, and monitoring setup.

Performance Dimensions

Key Metrics

| Metric | Definition | Target |
| --- | --- | --- |
| Throughput | Events processed per second | > 100K events/sec |
| Latency (P50) | Median end-to-end latency | < 50 ms |
| Latency (P99) | 99th percentile latency | < 200 ms |
| Checkpoint Duration | Time to complete a checkpoint | < 5 minutes |
| State Size | Total operator state in memory | < available RAM |
| Backpressure | Whether the pipeline is saturated | No backpressure |

Throughput Optimization

1. Parallelism Configuration

Set appropriate parallelism based on CPU cores and data volume.

Default Configuration:

use heliosdb_streaming::config::StreamingConfig;

let config = StreamingConfig {
    // Auto-detect available cores
    parallelism: num_cpus::get(),
    ..Default::default()
};

Custom Parallelism:

// High-throughput configuration
let config = StreamingConfig {
    parallelism: 32, // 32 parallel tasks
    ..Default::default()
};

// Apply to a specific operator
let result = source
    .map(|event| process(event))
    .set_parallelism(16) // Override for this operator
    .sink(output);

Guidelines (see the sizing sketch after this list):

  • Start with parallelism = CPU cores
  • Increase for I/O-bound operations (2-3x cores)
  • Decrease for memory-intensive operations
  • Monitor CPU utilization (target 70-80%)
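
As a concrete starting point, parallelism can be derived from the core count plus a workload multiplier. This is a minimal sketch of the guidelines above; the `suggested_parallelism` helper and its `io_bound` flag are illustrative, not part of the HeliosDB API:

use heliosdb_streaming::config::StreamingConfig;

// Illustrative helper: derive a parallelism level from the host's cores.
// I/O-bound pipelines start at 2x cores (raise toward 3x if CPU stays
// under the 70-80% target); CPU-bound pipelines use one task per core.
fn suggested_parallelism(io_bound: bool) -> usize {
    let cores = num_cpus::get();
    if io_bound { cores * 2 } else { cores }
}

let config = StreamingConfig {
    parallelism: suggested_parallelism(true),
    ..Default::default()
};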

2. Batching

Batch events to amortize per-event overhead.

Batch Configuration:

let source = kafka_source("topic")
    .with_batch_size(1000) // Process 1000 events per batch
    .with_batch_timeout(Duration::from_millis(100)); // Or flush after 100ms

Batch Processing:

let result = source
    .batch(1000) // Group into batches of 1000
    .map(|batch: Vec<Event>| {
        // Process the entire batch at once
        batch.iter().map(|e| transform(e)).collect()
    })
    .unbatch(); // Flatten back to individual events

Trade-offs:

  • ✅ Higher throughput (10-50x improvement)
  • ✅ Lower CPU overhead
  • ❌ Higher latency (events wait for a full batch)
  • ❌ More memory usage

3. Buffer Sizing

Configure internal buffers to prevent bottlenecks.

let config = StreamingConfig {
    // Events buffered per partition
    buffer_size: 10_000,
    // Network buffer size
    network_buffer_size: 64 * 1024 * 1024, // 64 MB
    ..Default::default()
};

Guidelines:

  • Small buffers (1K): Low latency, risk of backpressure
  • Medium buffers (10K): Balanced
  • Large buffers (100K): High throughput, more memory

4. Serialization Optimization

Use efficient serialization formats.

Format Comparison:

| Format | Ser Speed | Deser Speed | Size | Use Case |
| --- | --- | --- | --- | --- |
| JSON | 50 MB/s | 40 MB/s | 100% | Human-readable |
| Bincode | 500 MB/s | 800 MB/s | 30% | Internal |
| Protobuf | 200 MB/s | 300 MB/s | 40% | Cross-language |
| MessagePack | 150 MB/s | 180 MB/s | 50% | Compact |

Example:

// ❌ SLOW: JSON serialization
let source = kafka_source("topic")
    .deserialize_json::<Event>();

// ✅ FAST: Bincode serialization (5-10x faster)
let source = kafka_source("topic")
    .deserialize_bincode::<Event>();

5. Operator Fusion

Fuse operators to reduce overhead.

Without Fusion (3 separate operators):

let result = source
    .map(|e| e.value * 2) // Operator 1
    .filter(|e| e.value > 10) // Operator 2
    .map(|e| Event::new(e)) // Operator 3
    .sink(output);
// 3 serialization/deserialization steps, 3 buffers

With Fusion (1 combined operator):

let result = source
    .process(|e| {
        let doubled = e.value * 2;
        if doubled > 10 {
            Some(Event::new(doubled))
        } else {
            None
        }
    })
    .sink(output);
// 1 serialization/deserialization step, 1 buffer

Improvement: 50-70% throughput increase for simple pipelines.

Latency Optimization

1. Reduce Checkpoint Frequency

Longer checkpoint intervals lower steady-state latency at the cost of slower recovery after a failure.

// ❌ HIGH LATENCY: frequent checkpoints
let config = CheckpointConfig {
    interval: Duration::from_secs(10), // Every 10 seconds
    ..Default::default()
};
// Adds ~5-10ms to P99 latency

// ✅ LOW LATENCY: infrequent checkpoints
let config = CheckpointConfig {
    interval: Duration::from_secs(300), // Every 5 minutes
    ..Default::default()
};
// Adds ~1-2ms to P99 latency

2. Minimize State

Stateless operators have lower latency.

// ❌ STATEFUL: requires state access (adds latency)
let result = source
    .key_by(|e| e.user_id)
    .aggregate(|state, event| {
        state.count += 1; // State access
        state.sum += event.value;
        state
    });

// ✅ STATELESS: no state access (lower latency)
let result = source
    .map(|event| event.value * 2) // Pure function
    .filter(|value| *value > 10);

3. Optimize Serialization

Use zero-copy serialization where possible.

// ❌ COPYING: serializes into an intermediate buffer
let result = source
    .map(|event| serde_json::to_string(&event).unwrap());

// ✅ ZERO-COPY: pass references directly
let result = source
    .map(|event: &Event| event.clone()); // Shallow clone

4. Window Size Reduction

Smaller windows trigger more frequently (lower latency).

// ❌ HIGH LATENCY: 1-hour window
let result = source
    .window(TumblingWindow::of(Duration::from_secs(3600)))
    .aggregate(...);
// Results emitted every hour

// ✅ LOW LATENCY: 1-minute window
let result = source
    .window(TumblingWindow::of(Duration::from_secs(60)))
    .aggregate(...);
// Results emitted every minute

Memory Management

1. State TTL

Set time-to-live for state to prevent unbounded growth.

let config = StateConfig {
    ttl: Duration::from_secs(3600), // 1-hour TTL
    cleanup_interval: Duration::from_secs(300), // Clean up every 5 minutes
    ..Default::default()
};

let result = source
    .key_by(|e| e.user_id)
    .with_state_ttl(Duration::from_secs(3600))
    .aggregate(|state, event| { ... });

2. Window Sizing

Balance window size with memory usage.

Memory Calculation:

Memory = Events_per_sec × Window_duration_sec × Event_size × Parallelism

Example:

10,000 events/sec × 600 sec window × 1 KB/event × 16 parallelism
= 10,000 × 600 × 1,000 × 16 bytes (1 KB = 1,000 bytes)
= 96 GB memory required
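
The same arithmetic as a small helper, handy for sizing before deployment. A sketch only; the function name and the SI units (1 KB = 1,000 bytes, 1 GB = 10^9 bytes) are assumptions for illustration:

// Estimated window state memory per the formula above.
fn window_memory_gb(events_per_sec: u64, window_secs: u64, event_bytes: u64, parallelism: u64) -> f64 {
    (events_per_sec * window_secs * event_bytes * parallelism) as f64 / 1e9
}

assert_eq!(window_memory_gb(10_000, 600, 1_000, 16), 96.0); // the 10-minute window above
assert_eq!(window_memory_gb(10_000, 60, 1_000, 16), 9.6);   // the 1-minute alternative below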

Optimization:

// ❌ LARGE WINDOW: high memory
let result = source
    .window(TumblingWindow::of(Duration::from_secs(600))) // 10 min
    .aggregate(...);
// ~96 GB memory

// ✅ SMALL WINDOW: low memory
let result = source
    .window(TumblingWindow::of(Duration::from_secs(60))) // 1 min
    .aggregate(...);
// ~9.6 GB memory

3. State Backend Configuration

Choose appropriate state backend.

// In-memory (fastest, limited size)
let state = StateBackend::Memory {
    max_size_bytes: 10 * 1024 * 1024 * 1024, // 10 GB
};

// RocksDB (larger state, slower)
let state = StateBackend::RocksDB {
    path: "/mnt/ssd/rocksdb".into(),
    block_cache_size: 1024 * 1024 * 1024, // 1 GB
    write_buffer_size: 256 * 1024 * 1024, // 256 MB
};

let config = StreamingConfig {
    state_backend: state,
    ..Default::default()
};

4. Garbage Collection Tuning

For JVM-based operators (rare in Rust pipelines, but relevant if you call into Java via JNI):

# G1GC for large heaps
JAVA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=50 -Xmx32g"

Monitoring & Observability

1. Prometheus Metrics

Export key metrics for monitoring.

use chrono::Utc;
use prometheus::{register_counter, register_gauge, register_histogram};

// Throughput
let events_processed = register_counter!(
    "streaming_events_processed_total",
    "Total events processed"
).unwrap();

// Latency
let latency_histogram = register_histogram!(
    "streaming_latency_ms",
    "End-to-end event latency in milliseconds"
).unwrap();

// State size
let state_size_gauge = register_gauge!(
    "streaming_state_size_bytes",
    "Current state size in bytes"
).unwrap();

// Backpressure
let backpressure_gauge = register_gauge!(
    "streaming_backpressure",
    "Whether the pipeline is experiencing backpressure (0 or 1)"
).unwrap();

// In your pipeline
let result = source
    .inspect(|event| {
        events_processed.inc();
        let latency = (Utc::now() - event.timestamp).num_milliseconds();
        latency_histogram.observe(latency as f64);
    })
    .process(|event| { ... });
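
Registering metrics is only half the job: Prometheus needs an HTTP endpoint to scrape. A minimal sketch using the prometheus crate's default registry and text encoder; serving the string from a /metrics route (with the web framework of your choice) is left out for brevity:

use prometheus::{Encoder, TextEncoder};

// Render every metric in the default registry in the Prometheus text format.
fn render_metrics() -> String {
    let encoder = TextEncoder::new();
    let metric_families = prometheus::gather();
    let mut buffer = Vec::new();
    encoder.encode(&metric_families, &mut buffer).unwrap();
    String::from_utf8(buffer).unwrap()
}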

2. Grafana Dashboard

Dashboard JSON Template:

{
  "dashboard": {
    "title": "HeliosDB Streaming Performance",
    "panels": [
      {
        "title": "Throughput (events/sec)",
        "targets": [
          { "expr": "rate(streaming_events_processed_total[1m])" }
        ],
        "type": "graph"
      },
      {
        "title": "Latency (P50, P95, P99)",
        "targets": [
          {
            "expr": "histogram_quantile(0.50, rate(streaming_latency_ms_bucket[5m]))",
            "legendFormat": "P50"
          },
          {
            "expr": "histogram_quantile(0.95, rate(streaming_latency_ms_bucket[5m]))",
            "legendFormat": "P95"
          },
          {
            "expr": "histogram_quantile(0.99, rate(streaming_latency_ms_bucket[5m]))",
            "legendFormat": "P99"
          }
        ],
        "type": "graph"
      },
      {
        "title": "State Size (MB)",
        "targets": [
          { "expr": "streaming_state_size_bytes / 1024 / 1024" }
        ],
        "type": "graph"
      },
      {
        "title": "Checkpoint Duration (seconds)",
        "targets": [
          { "expr": "streaming_checkpoint_duration_seconds" }
        ],
        "type": "graph"
      },
      {
        "title": "Backpressure Status",
        "targets": [
          { "expr": "streaming_backpressure" }
        ],
        "type": "stat"
      }
    ]
  }
}

3. Logging Best Practices

use std::time::{Duration, Instant};
use tracing::{debug, info, instrument, warn};

#[instrument(skip(event), fields(event_id = %event.id))]
async fn process_event(event: Event) -> Result<ProcessedEvent> {
    debug!("Starting event processing");
    let start = Instant::now();
    let result = expensive_operation(&event).await?;
    let duration = start.elapsed();
    if duration > Duration::from_millis(100) {
        warn!("Slow processing: {:?}", duration);
    }
    info!("Event processed successfully");
    Ok(result)
}

4. Alert Rules

Prometheus AlertManager Rules:

groups:
  - name: streaming_alerts
    rules:
      - alert: HighLatency
        expr: histogram_quantile(0.99, rate(streaming_latency_ms_bucket[5m])) > 500
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High P99 latency detected"
          description: "P99 latency is {{ $value }}ms (threshold: 500ms)"
      - alert: Backpressure
        expr: streaming_backpressure > 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Pipeline experiencing backpressure"
      - alert: CheckpointFailure
        expr: increase(streaming_checkpoint_failures_total[5m]) > 2
        labels:
          severity: critical
        annotations:
          summary: "Multiple checkpoint failures"

Troubleshooting

High Latency

Symptom: P99 latency > 500ms

Diagnosis:

  1. Check backpressure: Is pipeline saturated?
  2. Measure per-operator latency
  3. Review checkpoint duration

Solutions:

  1. Increase parallelism
  2. Optimize expensive operators (profile them first)
  3. Reduce checkpoint frequency
  4. Add more resources

// Enable per-operator latency tracking. `map_latency_histogram` and
// `filter_latency_histogram` are prometheus Histograms registered as in
// the Prometheus section above; the timer returned by start_timer()
// records the elapsed time when it is dropped.
let result = source
    .map(|e| {
        let _timer = map_latency_histogram.start_timer();
        expensive_transform(e)
    })
    .filter(|e| {
        let _timer = filter_latency_histogram.start_timer();
        expensive_predicate(e)
    });

Low Throughput

Symptom: Processing < 10K events/sec on powerful hardware

Diagnosis:

  1. Check CPU utilization (should be 70-80%)
  2. Review parallelism settings
  3. Measure serialization overhead
  4. Check for stragglers (slow tasks)

Solutions (combined in the sketch after this list):

  1. Increase parallelism
  2. Use faster serialization (bincode)
  3. Enable operator fusion
  4. Increase batch size
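
A sketch combining fixes 1, 2, and 4, reusing the builder methods shown earlier in this guide; the specific values are illustrative starting points, not tuned recommendations:

// More parallelism (fix 1), bincode deserialization (fix 2), larger batches (fix 4).
let config = StreamingConfig {
    parallelism: num_cpus::get() * 2,
    ..Default::default()
};

let source = kafka_source("topic")
    .deserialize_bincode::<Event>()
    .with_batch_size(5_000)
    .with_batch_timeout(Duration::from_millis(100));

Fix 3, operator fusion, uses the .process pattern from the Operator Fusion section above.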

High Memory Usage

Symptom: OOM errors or excessive GC

Diagnosis:

  1. Measure state size per operator
  2. Check window sizes
  3. Review state TTL configuration

Solutions (combined in the sketch after this list):

  1. Enable state TTL
  2. Reduce window size
  3. Increase parallelism (distribute state)
  4. Use RocksDB state backend (off-heap)
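
A sketch combining fixes 1, 2, and 4 with the StateConfig, window, and StateBackend APIs shown earlier; the sizes and TTLs are illustrative:

// Bound state growth: TTL (fix 1), smaller window (fix 2), off-heap RocksDB (fix 4).
let config = StreamingConfig {
    state_backend: StateBackend::RocksDB {
        path: "/mnt/ssd/rocksdb".into(),
        block_cache_size: 1024 * 1024 * 1024, // 1 GB
        write_buffer_size: 256 * 1024 * 1024, // 256 MB
    },
    ..Default::default()
};

let result = source
    .key_by(|e| e.user_id)
    .with_state_ttl(Duration::from_secs(3600))
    .window(TumblingWindow::of(Duration::from_secs(60)))
    .aggregate(|state, event| { ... });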

Backpressure

Symptom: Pipeline slowing down, buffers full

Diagnosis:

  1. Identify bottleneck operator (slowest throughput)
  2. Check resource utilization (CPU, memory, I/O)

Solutions (see the sketch after this list):

  1. Increase parallelism of bottleneck operator
  2. Optimize bottleneck logic
  3. Scale out cluster
  4. Reduce source ingestion rate
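
For fixes 1 and 4, a sketch using set_parallelism from earlier in this guide. Note that with_rate_limit is a hypothetical throttling method named only for illustration; check your source connector for its actual rate-limiting option:

let result = kafka_source("topic")
    .with_rate_limit(50_000) // hypothetical API: cap ingestion at 50K events/sec (fix 4)
    .map(|e| expensive_transform(e))
    .set_parallelism(32) // give the bottleneck operator more tasks (fix 1)
    .sink(output);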

Performance Benchmarking

Benchmark Suite

use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn benchmark_throughput(c: &mut Criterion) {
    c.bench_function("streaming_throughput", |b| {
        let config = create_test_config();
        let (source, _sink) = create_test_pipeline(config);
        b.iter(|| {
            // Process 10K events
            black_box(source.emit_events(0..10_000));
            black_box(source.wait_for_completion());
        });
    });
}

fn benchmark_latency(c: &mut Criterion) {
    c.bench_function("streaming_latency", |b| {
        let config = create_test_config();
        let (source, sink) = create_test_pipeline(config);
        b.iter(|| {
            let start = Instant::now();
            black_box(source.emit_single_event(Event::new()));
            black_box(sink.receive_single_event());
            start.elapsed()
        });
    });
}

criterion_group!(benches, benchmark_throughput, benchmark_latency);
criterion_main!(benches);

Load Testing

#[tokio::test]
async fn load_test_sustained_high_throughput() {
    let config = create_production_config();
    let (source, sink) = create_test_pipeline(config);

    // Sustain 100K events/sec for 10 minutes
    let duration = Duration::from_secs(600);
    let rate = 100_000; // events/sec

    let start = Instant::now();
    while start.elapsed() < duration {
        source.emit_batch(rate / 10).await.unwrap(); // 100ms batches
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // Verify metrics
    let total_processed = sink.count_events().await.unwrap();
    let actual_rate = total_processed as f64 / duration.as_secs_f64();
    assert!(actual_rate > 95_000.0, "Throughput too low: {}", actual_rate);
    assert!(sink.p99_latency().await.unwrap() < Duration::from_millis(200));
}

Best Practices Summary

  1. Start with defaults, measure, then optimize
  2. Monitor everything: throughput, latency, state size
  3. Test at production scale before deploying
  4. Use batching for throughput-sensitive workloads
  5. Reduce state for latency-sensitive workloads
  6. Set state TTL to prevent unbounded growth
  7. Enable compression for checkpoint storage (sketch after this list)
  8. Use efficient serialization (bincode, protobuf)
  9. Profile before optimizing (don’t guess!)
  10. Scale horizontally before vertical optimization
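
For item 7, a minimal sketch, assuming the CheckpointConfig shown earlier exposes a compression field; both the field and the Compression::Zstd variant are assumptions for illustration, not confirmed HeliosDB API:

// Hypothetical: compress checkpoint storage (best practice 7).
let config = CheckpointConfig {
    interval: Duration::from_secs(300),
    compression: Compression::Zstd, // assumed field and variant
    ..Default::default()
};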


Last Updated: November 2025 · Version: 1.0