# Performance Tuning Guide
## Overview
This guide covers performance optimization techniques for HeliosDB Streaming applications. Topics include throughput optimization, latency reduction, memory management, and monitoring setup.
## Performance Dimensions

### Key Metrics
| Metric | Definition | Target |
|---|---|---|
| Throughput | Events processed per second | >100K events/sec |
| Latency (P50) | Median end-to-end latency | <50ms |
| Latency (P99) | 99th percentile latency | <200ms |
| Checkpoint Duration | Time to complete checkpoint | <5 minutes |
| State Size | Total operator state memory | < Available RAM |
| Backpressure | Whether the pipeline is saturated | No backpressure |
## Throughput Optimization

### 1. Parallelism Configuration
Set appropriate parallelism based on CPU cores and data volume.
Default Configuration:
```rust
use heliosdb_streaming::config::StreamingConfig;

let config = StreamingConfig {
    // Auto-detect available cores
    parallelism: num_cpus::get(),
    ..Default::default()
};
```

Custom Parallelism:
```rust
// High-throughput configuration
let config = StreamingConfig {
    parallelism: 32, // 32 parallel tasks
    ..Default::default()
};

// Apply to a specific operator
let result = source
    .map(|event| process(event))
    .set_parallelism(16) // Override for this operator
    .sink(output);
```

Guidelines:
- Start with parallelism = CPU cores
- Increase for I/O-bound operations (2-3x cores; see the sketch below)
- Decrease for memory-intensive operations
- Monitor CPU utilization (target 70-80%)
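For example, the I/O-bound rule of thumb maps directly onto the configuration shown above (a sketch reusing `StreamingConfig` and the `num_cpus` crate; the 3x multiplier is the upper end of the guideline, not a HeliosDB default):

```rust
// I/O-bound stages spend much of their time blocked, so they can
// productively run more tasks than there are cores (guideline: 2-3x).
let io_bound_parallelism = num_cpus::get() * 3;

let config = StreamingConfig {
    parallelism: io_bound_parallelism,
    ..Default::default()
};
```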
### 2. Batching
Batch events to amortize per-event overhead.
Batch Configuration:
```rust
let source = kafka_source("topic")
    .with_batch_size(1000) // Process 1,000 events per batch
    .with_batch_timeout(Duration::from_millis(100)); // Or flush after 100ms
```

Batch Processing:
```rust
let result = source
    .batch(1000) // Group into batches of 1,000
    .map(|batch: Vec<Event>| {
        // Process the entire batch at once
        batch.iter().map(|e| transform(e)).collect()
    })
    .unbatch(); // Flatten back to individual events
```

Trade-offs:
- ✅ Higher throughput (10-50x improvement)
- ✅ Lower CPU overhead
- ❌ Higher latency (wait for full batch)
- ❌ More memory usage
### 3. Buffer Sizing
Configure internal buffers to prevent bottlenecks.
```rust
let config = StreamingConfig {
    // Events buffered per partition
    buffer_size: 10_000,

    // Network buffer size
    network_buffer_size: 64 * 1024 * 1024, // 64 MB

    ..Default::default()
};
```

Guidelines:
- Small buffers (1K): Low latency, risk of backpressure
- Medium buffers (10K): Balanced
- Large buffers (100K): High throughput, more memory
### 4. Serialization Optimization
Use efficient serialization formats.
Format Comparison:
| Format | Serialize Speed | Deserialize Speed | Size (vs. JSON) | Use Case |
|---|---|---|---|---|
| JSON | 50 MB/s | 40 MB/s | 100% | Human-readable |
| Bincode | 500 MB/s | 800 MB/s | 30% | Internal |
| Protobuf | 200 MB/s | 300 MB/s | 40% | Cross-lang |
| MessagePack | 150 MB/s | 180 MB/s | 50% | Compact |
Example:
```rust
// ❌ SLOW: JSON deserialization
let source = kafka_source("topic")
    .deserialize_json::<Event>();

// ✅ FAST: Bincode deserialization (5-10x faster)
let source = kafka_source("topic")
    .deserialize_bincode::<Event>();
```

### 5. Operator Fusion
Fuse operators to reduce overhead.
Without Fusion (3 separate operators):
```rust
let result = source
    .map(|e| e.value * 2)   // Operator 1
    .filter(|v| *v > 10)    // Operator 2
    .map(|v| Event::new(v)) // Operator 3
    .sink(output);
// 3 serialization/deserialization steps, 3 buffers
```

With Fusion (1 combined operator):
```rust
let result = source
    .process(|e| {
        let doubled = e.value * 2;
        if doubled > 10 {
            Some(Event::new(doubled))
        } else {
            None
        }
    })
    .sink(output);
// 1 serialization/deserialization step, 1 buffer
```

Improvement: 50-70% throughput increase for simple pipelines.
## Latency Optimization

### 1. Reduce Checkpoint Frequency
Trade recovery time for lower latency.
```rust
// ❌ HIGH LATENCY: Frequent checkpoints
let config = CheckpointConfig {
    interval: Duration::from_secs(10), // Every 10 seconds
    ..Default::default()
};
// Adds ~5-10ms to P99 latency

// ✅ LOW LATENCY: Infrequent checkpoints
let config = CheckpointConfig {
    interval: Duration::from_secs(300), // Every 5 minutes
    ..Default::default()
};
// Adds ~1-2ms to P99 latency
```

### 2. Minimize State
Stateless operators have lower latency.
```rust
// ❌ STATEFUL: Requires state access (adds latency)
let result = source
    .key_by(|e| e.user_id)
    .aggregate(|state, event| {
        state.count += 1; // State access
        state.sum += event.value;
        state
    });

// ✅ STATELESS: No state access (lower latency)
let result = source
    .map(|event| event.value * 2) // Pure function
    .filter(|value| *value > 10);
```

### 3. Optimize Serialization
Use zero-copy serialization where possible.
```rust
// ❌ COPYING: Serializes every event into an intermediate string
let result = source
    .map(|event| serde_json::to_string(&event).unwrap());

// ✅ ZERO-COPY: Pass events through without re-serializing them
let result = source
    .map(|event: &Event| event.clone()); // Cheap shallow clone, no serialization
```

### 4. Window Size Reduction
Smaller windows trigger more frequently (lower latency).
```rust
// ❌ HIGH LATENCY: 1-hour window
let result = source
    .window(TumblingWindow::of(Duration::from_secs(3600)))
    .aggregate(...);
// Results every 1 hour

// ✅ LOW LATENCY: 1-minute window
let result = source
    .window(TumblingWindow::of(Duration::from_secs(60)))
    .aggregate(...);
// Results every 1 minute
```

## Memory Management
### 1. State TTL
Set time-to-live for state to prevent unbounded growth.
```rust
let config = StateConfig {
    ttl: Duration::from_secs(3600),             // 1-hour TTL
    cleanup_interval: Duration::from_secs(300), // Clean every 5 minutes
    ..Default::default()
};

let result = source
    .key_by(|e| e.user_id)
    .with_state_ttl(Duration::from_secs(3600))
    .aggregate(|state, event| { ... });
```

### 2. Window Sizing
Balance window size with memory usage.
Memory Calculation:
```
Memory = Events_per_sec × Window_duration_sec × Event_size × Parallelism
```

Example:
```
10,000 events/sec × 600 sec window × 1 KB/event × 16 parallelism
= 96,000,000 KB
≈ 96 GB of memory required
```
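A few lines of Rust make it easy to sanity-check candidate configurations against this formula (a standalone sketch; the helper is illustrative, not part of the HeliosDB API):

```rust
/// Evaluate the window-memory formula above.
/// Illustrative helper only, not a HeliosDB API.
fn window_memory_bytes(
    events_per_sec: u64,
    window_secs: u64,
    event_size_bytes: u64,
    parallelism: u64,
) -> u64 {
    events_per_sec * window_secs * event_size_bytes * parallelism
}

fn main() {
    // The worked example: 10K events/sec, 10-minute window, 1 KB events, 16 tasks
    let bytes = window_memory_bytes(10_000, 600, 1_000, 16);
    println!("{:.0} GB required", bytes as f64 / 1e9); // 96 GB
}
```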
Optimization:
```rust
// ❌ LARGE WINDOW: High memory
let result = source
    .window(TumblingWindow::of(Duration::from_secs(600))) // 10 min
    .aggregate(...);
// ≈ 96 GB memory

// ✅ SMALL WINDOW: Low memory
let result = source
    .window(TumblingWindow::of(Duration::from_secs(60))) // 1 min
    .aggregate(...);
// ≈ 9.6 GB memory
```

### 3. State Backend Configuration
Choose appropriate state backend.
```rust
// In-memory (fastest, limited size)
let state = StateBackend::Memory {
    max_size_bytes: 10 * 1024 * 1024 * 1024, // 10 GB
};

// RocksDB (larger state, slower)
let state = StateBackend::RocksDB {
    path: "/mnt/ssd/rocksdb".into(),
    block_cache_size: 1024 * 1024 * 1024, // 1 GB
    write_buffer_size: 256 * 1024 * 1024, // 256 MB
};

let config = StreamingConfig {
    state_backend: state,
    ..Default::default()
};
```

### 4. Garbage Collection Tuning
For JVM-based operators (rare in Rust deployments, but relevant when bridging to Java via JNI):
```bash
# G1GC for large heaps
JAVA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=50 -Xmx32g"
```

## Monitoring & Observability
### 1. Prometheus Metrics
Export key metrics for monitoring.
```rust
use chrono::Utc;
use prometheus::{register_counter, register_gauge, register_histogram, Counter, Gauge, Histogram};

// Throughput
let events_processed = register_counter!(
    "streaming_events_processed_total",
    "Total events processed"
).unwrap();

// Latency
let latency_histogram = register_histogram!(
    "streaming_latency_ms",
    "End-to-end event latency in milliseconds"
).unwrap();

// State size
let state_size_gauge = register_gauge!(
    "streaming_state_size_bytes",
    "Current state size in bytes"
).unwrap();

// Backpressure
let backpressure_gauge = register_gauge!(
    "streaming_backpressure",
    "Is the pipeline experiencing backpressure (0 or 1)"
).unwrap();

// In your pipeline
let result = source
    .inspect(|event| {
        events_processed.inc();
        let latency = (Utc::now() - event.timestamp).num_milliseconds();
        latency_histogram.observe(latency as f64);
    })
    .process(|event| { ... });
```

### 2. Grafana Dashboard
Dashboard JSON Template:
{ "dashboard": { "title": "HeliosDB Streaming Performance", "panels": [ { "title": "Throughput (events/sec)", "targets": [ { "expr": "rate(streaming_events_processed_total[1m])" } ], "type": "graph" }, { "title": "Latency (P50, P95, P99)", "targets": [ { "expr": "histogram_quantile(0.50, streaming_latency_ms)", "legendFormat": "P50" }, { "expr": "histogram_quantile(0.95, streaming_latency_ms)", "legendFormat": "P95" }, { "expr": "histogram_quantile(0.99, streaming_latency_ms)", "legendFormat": "P99" } ], "type": "graph" }, { "title": "State Size (MB)", "targets": [ { "expr": "streaming_state_size_bytes / 1024 / 1024" } ], "type": "graph" }, { "title": "Checkpoint Duration (seconds)", "targets": [ { "expr": "streaming_checkpoint_duration_seconds" } ], "type": "graph" }, { "title": "Backpressure Status", "targets": [ { "expr": "streaming_backpressure" } ], "type": "stat" } ] }}3. Logging Best Practices
```rust
use std::time::{Duration, Instant};
use tracing::{debug, error, info, instrument, warn};

#[instrument(skip(event), fields(event_id = %event.id))]
async fn process_event(event: Event) -> Result<ProcessedEvent> {
    debug!("Starting event processing");

    let start = Instant::now();
    let result = expensive_operation(&event).await?;
    let duration = start.elapsed();

    if duration > Duration::from_millis(100) {
        warn!("Slow processing: {:?}", duration);
    }

    info!("Event processed successfully");
    Ok(result)
}
```

### 4. Alert Rules
Prometheus AlertManager Rules:
```yaml
groups:
  - name: streaming_alerts
    rules:
      - alert: HighLatency
        expr: histogram_quantile(0.99, streaming_latency_ms) > 500
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High P99 latency detected"
          description: "P99 latency is {{ $value }}ms (threshold: 500ms)"

      - alert: Backpressure
        expr: streaming_backpressure > 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Pipeline experiencing backpressure"

      - alert: CheckpointFailure
        expr: increase(streaming_checkpoint_failures_total[5m]) > 2
        labels:
          severity: critical
        annotations:
          summary: "Multiple checkpoint failures"
```

## Troubleshooting
### High Latency
Symptom: P99 latency > 500ms
Diagnosis:
- Check backpressure: Is pipeline saturated?
- Measure per-operator latency
- Review checkpoint duration
Solutions:
- Increase parallelism
- Optimize expensive operators (profiling)
- Reduce checkpoint frequency
- Add more resources
```rust
// Enable per-operator latency tracking. map_latency_ms and
// filter_latency_ms are histograms registered as shown in the
// "Prometheus Metrics" section above.
let result = source
    .map(|e| {
        let start = Instant::now();
        let out = expensive_transform(e);
        map_latency_ms.observe(start.elapsed().as_secs_f64() * 1000.0);
        out
    })
    .filter(|e| {
        let start = Instant::now();
        let keep = expensive_predicate(e);
        filter_latency_ms.observe(start.elapsed().as_secs_f64() * 1000.0);
        keep
    });
```

### Low Throughput
Symptom: Processing < 10K events/sec on powerful hardware
Diagnosis:
- Check CPU utilization (should be 70-80%)
- Review parallelism settings
- Measure serialization overhead
- Check for stragglers (slow tasks)
Solutions (combined in the sketch after this list):
- Increase parallelism
- Use faster serialization (bincode)
- Enable operator fusion
- Increase batch size
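A sketch tying the four fixes together, reusing only the `StreamingConfig` fields and `kafka_source` builder methods shown earlier in this guide:

```rust
// Combine the four throughput levers above. All fields and builder
// methods mirror earlier examples in this guide.
let config = StreamingConfig {
    parallelism: 32,      // 1. More parallel tasks
    buffer_size: 100_000, // Large buffers to keep operators fed
    ..Default::default()
};

let source = kafka_source("topic")
    .deserialize_bincode::<Event>() // 2. Faster serialization
    .with_batch_size(10_000)        // 4. Bigger batches
    .with_batch_timeout(Duration::from_millis(250));

// 3. Fuse map + filter into a single operator (see "Operator Fusion")
let result = source
    .process(|e| if e.value > 10 { Some(transform(e)) } else { None })
    .sink(output);
```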
### High Memory Usage
Symptom: OOM errors or excessive GC
Diagnosis:
- Measure state size per operator
- Check window sizes
- Review state TTL configuration
Solutions (combined in the sketch after this list):
- Enable state TTL
- Reduce window size
- Increase parallelism (distribute state)
- Use RocksDB state backend (off-heap)
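A sketch combining these fixes, reusing the `StateBackend`, TTL, and window examples from the memory-management section (illustrative wiring, not a prescribed configuration):

```rust
// Bound state growth: TTL for idle keys, a smaller window, more
// parallelism to spread state, and RocksDB to keep it off-heap.
let config = StreamingConfig {
    parallelism: 32,
    state_backend: StateBackend::RocksDB {
        path: "/mnt/ssd/rocksdb".into(),
        block_cache_size: 1024 * 1024 * 1024, // 1 GB
        write_buffer_size: 256 * 1024 * 1024, // 256 MB
    },
    ..Default::default()
};

let result = source
    .key_by(|e| e.user_id)
    .with_state_ttl(Duration::from_secs(3600)) // Expire idle state after 1 hour
    .window(TumblingWindow::of(Duration::from_secs(60))) // Smaller window
    .aggregate(|state, event| { ... });
```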
### Backpressure
Symptom: Pipeline slowing down, buffers full
Diagnosis:
- Identify bottleneck operator (slowest throughput)
- Check resource utilization (CPU, memory, I/O)
Solutions (see the sketch after this list):
- Increase parallelism of bottleneck operator
- Optimize bottleneck logic
- Scale out cluster
- Reduce source ingestion rate
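For the first two fixes, the per-operator `set_parallelism` override from the parallelism section applies directly (a sketch; `enrich` is a placeholder for whatever stage profiling identifies as the bottleneck):

```rust
// Give only the bottleneck operator extra parallelism rather than
// scaling the whole pipeline. `enrich` is a hypothetical slow stage.
let result = source
    .map(|e| parse(e))
    .set_parallelism(8)  // Cheap stage: modest parallelism
    .map(|e| enrich(e))
    .set_parallelism(32) // Bottleneck stage: 4x the tasks
    .sink(output);
```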
## Performance Benchmarking

### Benchmark Suite
```rust
use std::time::Instant;

use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn benchmark_throughput(c: &mut Criterion) {
    c.bench_function("streaming_throughput", |b| {
        let config = create_test_config();
        let (source, sink) = create_test_pipeline(config);

        b.iter(|| {
            // Process 10K events
            black_box(source.emit_events(0..10_000));
            black_box(source.wait_for_completion());
        });
    });
}

fn benchmark_latency(c: &mut Criterion) {
    c.bench_function("streaming_latency", |b| {
        let config = create_test_config();
        let (source, sink) = create_test_pipeline(config);

        b.iter(|| {
            let start = Instant::now();
            black_box(source.emit_single_event(Event::new()));
            black_box(sink.receive_single_event());
            start.elapsed()
        });
    });
}

criterion_group!(benches, benchmark_throughput, benchmark_latency);
criterion_main!(benches);
```

### Load Testing
```rust
use std::time::{Duration, Instant};

#[tokio::test]
async fn load_test_sustained_high_throughput() {
    let config = create_production_config();
    let (source, sink) = create_test_pipeline(config);

    // Sustain 100K events/sec for 10 minutes
    let duration = Duration::from_secs(600);
    let rate = 100_000; // events/sec
    let start = Instant::now();

    while start.elapsed() < duration {
        source.emit_batch(rate / 10).await.unwrap(); // 100ms batches
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // Verify metrics
    let total_processed = sink.count_events().await.unwrap();
    let actual_rate = total_processed as f64 / duration.as_secs_f64();

    assert!(actual_rate > 95_000.0, "Throughput too low: {}", actual_rate);
    assert!(sink.p99_latency().await.unwrap() < Duration::from_millis(200));
}
```

## Best Practices Summary
- Start with defaults, measure, then optimize
- Monitor everything: throughput, latency, state size
- Test at production scale before deploying
- Use batching for throughput-sensitive workloads
- Reduce state for latency-sensitive workloads
- Set state TTL to prevent unbounded growth
- Enable compression for checkpoint storage (see the sketch after this list)
- Use efficient serialization (bincode, protobuf)
- Profile before optimizing (don’t guess!)
- Scale horizontally before vertical optimization
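Checkpoint compression is the only item above without an example elsewhere in this guide. A minimal sketch, assuming a `compression` field on the `CheckpointConfig` shown in the latency section; verify the real field name and codec options in the generated API docs:

```rust
// Assumed knob: a `compression` field on CheckpointConfig, with a
// hypothetical codec enum. Check cargo doc for the actual API.
let config = CheckpointConfig {
    interval: Duration::from_secs(300),
    compression: CompressionCodec::Zstd, // Hypothetical
    ..Default::default()
};
```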
## References
- Flink Performance Tuning
- Prometheus Best Practices
- HeliosDB Streaming API: `cargo doc --package heliosdb-streaming --open`
Last Updated: November 2025
Version: 1.0