HeliosDB Streaming v7.0 - Production Deployment Guide

Overview

HeliosDB Streaming v7.0 brings enterprise-grade stream processing with exactly-once semantics, advanced backpressure management, and comprehensive production features. This guide covers deployment, configuration, and best practices for production environments.

Table of Contents

  1. Exactly-Once Semantics
  2. Backpressure Management
  3. Production Features
  4. Performance Tuning
  5. Monitoring & Observability
  6. Deployment Patterns
  7. Troubleshooting

Exactly-Once Semantics

Overview

HeliosDB Streaming v7.0 implements true exactly-once processing guarantees using:

  • Transaction Coordinator: Two-phase commit protocol across registered participants (participant protocol sketched below)
  • Idempotent Producer: Prevents duplicate writes
  • Offset Management: Transactional offset commits
  • Message Deduplication: UUID-based duplicate detection
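
The coordinator drives the prepare and commit phases across every registered participant. The participant trait itself is not shown in this guide, so the following is a minimal sketch of what a two-phase-commit participant could look like; the trait name, method names, `TransactionId` alias, and the use of the async_trait and anyhow crates are assumptions inferred from the calls used later in this guide (register_participant, stage_offset, commit, abort).

// Hypothetical participant trait for the two-phase commit protocol.
// Names and signatures are illustrative; the real crate may differ.
use async_trait::async_trait;

pub type TransactionId = u64; // assumed representation

#[async_trait]
pub trait TransactionParticipant: Send + Sync {
    /// Phase 1: durably stage all work for `txn_id`; returning Err votes abort.
    async fn prepare(&self, txn_id: TransactionId) -> anyhow::Result<()>;
    /// Phase 2: make the staged work visible. Must be idempotent.
    async fn commit(&self, txn_id: TransactionId) -> anyhow::Result<()>;
    /// Discard staged work after an abort or timeout. Must be idempotent.
    async fn abort(&self, txn_id: TransactionId) -> anyhow::Result<()>;
}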

Quick Start

use heliosdb_streaming::*;
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Create transaction coordinator
    let coordinator = Arc::new(TransactionCoordinator::new(
        Duration::from_secs(30), // Transaction timeout
        1000,                    // Max completed transactions in log
    ));

    // Create offset manager
    let offset_manager = Arc::new(
        OffsetManager::new(
            "my_processor".to_string(),
            "/data/checkpoints".into(),
        ).await?
    );

    // Register as participant
    coordinator.register_participant(offset_manager.clone()).await;

    // Create idempotent producer and deduplicator
    let producer = IdempotentProducer::new();
    let deduplicator = MessageDeduplicator::new(Duration::from_secs(300));

    // Process messages (receive_message and process_message are
    // application-specific and not part of the library)
    loop {
        // Begin transaction
        let txn_id = coordinator.begin_transaction().await?;
        coordinator.add_participant(txn_id, "my_processor").await?;

        // Get message
        let (msg_id, message, offset) = receive_message().await?;

        // Check for duplicates; abort the open transaction before skipping
        // so it is not left dangling until the timeout fires
        if deduplicator.check_and_record(msg_id) {
            coordinator.abort(txn_id).await?;
            continue; // Skip duplicate
        }

        // Process message
        process_message(&message).await?;

        // Stage offset
        offset_manager.stage_offset(txn_id, "partition-0".to_string(), offset).await?;

        // Record in producer
        let seq = producer.next_sequence("partition-0");
        producer.record_message(msg_id, "partition-0".to_string(), seq);

        // Commit transaction (2PC)
        coordinator.commit(txn_id).await?;
    }
}

Transaction States

Active -> Preparing -> Prepared -> Committing -> Committed
  |          |                         |
  +----------+--> Aborting -> Aborted  |
  |                                    |
  +-----------> TimedOut <-------------+
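
A minimal sketch of the same state machine as a Rust enum: the variant names mirror the diagram, while the transition-check function and the exact set of legal transitions are assumptions for illustration, not the library's implementation.

// Illustrative only: variants follow the diagram above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TxnState {
    Active, Preparing, Prepared, Committing, Committed,
    Aborting, Aborted, TimedOut,
}

fn can_transition(from: TxnState, to: TxnState) -> bool {
    use TxnState::*;
    matches!(
        (from, to),
        (Active, Preparing)
            | (Preparing, Prepared)
            | (Prepared, Committing)
            | (Committing, Committed)
            | (Active, Aborting)
            | (Preparing, Aborting)
            | (Aborting, Aborted)
            | (Active, TimedOut)
            | (Committing, TimedOut)
    )
}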

Checkpoint-Based Recovery

// Create checkpoints periodically
if messages_processed % 1000 == 0 {
    offset_manager.create_checkpoint(Some(txn_id)).await?;
}

// Recover from the latest checkpoint on restart
if let Some(checkpoint) = offset_manager.recover_from_checkpoint().await? {
    println!("Recovered from checkpoint {}", checkpoint.id);
    println!("Offsets: {:?}", checkpoint.offsets);
}

// Clean up old checkpoints
offset_manager.cleanup_old_checkpoints(10).await?; // Keep last 10

Best Practices

  1. Transaction Timeout: Set based on processing time (typically 30-60s)
  2. Checkpoint Frequency: Balance between recovery time and overhead (1000-10000 messages)
  3. Deduplication Window: Match to maximum message redelivery window
  4. Participant Registration: Register all participants before processing begins; the sketch below wires these defaults together
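
A minimal sketch combining these recommendations with the APIs shown above; the constants hold the rule-of-thumb defaults from the list, not tuned values.

use std::sync::Arc;
use std::time::Duration;

// Rule-of-thumb defaults from the list above (tune for your workload).
const TXN_TIMEOUT: Duration = Duration::from_secs(30);   // item 1: 30-60s
const CHECKPOINT_EVERY: u64 = 5_000;                     // item 2: 1k-10k messages
const DEDUP_WINDOW: Duration = Duration::from_secs(300); // item 3: >= max redelivery window

let coordinator = Arc::new(TransactionCoordinator::new(TXN_TIMEOUT, 1000));
let deduplicator = MessageDeduplicator::new(DEDUP_WINDOW);
// Item 4: register every participant before the first begin_transaction().
coordinator.register_participant(offset_manager.clone()).await;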

Backpressure Management

Overview

Advanced backpressure mechanisms prevent system overload:

  • Queue Depth Monitoring: Real-time queue utilization tracking
  • Dynamic Batch Sizing: Automatically adjust batch sizes
  • Credit-Based Flow Control: Token-based rate limiting
  • Consumer Lag Monitoring: Track and respond to lag

Queue Depth Monitoring

use heliosdb_streaming::*;
let monitor = QueueDepthMonitor::new(
10_000, // Max queue depth
Duration::from_secs(60), // History retention
);
// Record depth
monitor.record_depth("partition-0", current_depth);
// Check if backpressure needed
if monitor.should_apply_backpressure("partition-0", 0.8) {
// Apply backpressure (slow down ingestion)
tokio::time::sleep(Duration::from_millis(100)).await;
}
// Get metrics
let metrics = monitor.get_metrics("partition-0");
println!("Current depth: {}", metrics.current_depth);
println!("Average depth: {:.2}", metrics.average_depth);
println!("Trend: {:?}", metrics.depth_trend);

Dynamic Batch Sizing

use std::time::{Duration, Instant};

let sizer = DynamicBatchSizer::new(
    100,                        // Initial batch size
    10,                         // Min batch size
    1000,                       // Max batch size
    Duration::from_millis(100), // Target batch processing time
);

loop {
    let batch_size = sizer.get_batch_size("partition-0");
    let messages = fetch_messages(batch_size).await?;

    let start = Instant::now();
    process_batch(&messages).await?;
    let elapsed = start.elapsed();

    // Record so the sizer can adapt toward the target processing time
    sizer.record_processing_time("partition-0", messages.len(), elapsed);
}

Credit-Based Flow Control

let flow_control = CreditBasedFlowControl::new(
    1000,  // Max credits
    100.0, // Replenishment rate (credits/sec)
);

loop {
    // Wait until enough credits are available
    while !flow_control.try_consume("consumer1", batch_size as i64) {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    // Process
    process_messages().await?;

    // Return credits after processing
    flow_control.return_credits("consumer1", batch_size as i64);
}

Consumer Lag Monitoring

let lag_monitor = ConsumerLagMonitor::new(Duration::from_secs(60));

// Update from producer
lag_monitor.update_latest_offset("partition-0", producer_offset);

// Update from consumer
lag_monitor.update_consumer_offset("consumer1", "partition-0", consumer_offset);

// Record lag periodically
lag_monitor.record_lag("consumer1", "partition-0");

// Get metrics
let metrics = lag_monitor.get_metrics("consumer1", "partition-0");
println!("Current lag: {} messages", metrics.current_lag);
println!("Lag velocity: {:.2} msg/s", metrics.lag_velocity);
println!("Trend: {:?}", metrics.lag_trend);

// Alert on high lag
if metrics.current_lag > 10_000 {
    alert("High consumer lag detected!");
}

Production Features

Dead Letter Queue

use std::collections::HashMap;

let dlq = DeadLetterQueue::new(
    10_000,                   // Max messages
    Some("/data/dlq".into()), // Persistence path
);

// Add a failed message
match process_message(&message).await {
    Ok(_) => {}
    Err(e) => {
        let mut metadata = HashMap::new();
        metadata.insert("error_type".to_string(), "processing_error".to_string());
        dlq.add_message(
            message,
            e.to_string(),
            retry_count,
            metadata,
        ).await?;
    }
}

// Retrieve and retry
let failed_messages = dlq.get_all_messages().await;
for msg in failed_messages {
    if msg.retry_count < 3 {
        match retry_message(&msg.original_message).await {
            Ok(_) => {
                dlq.remove_message(msg.id).await;
            }
            Err(e) => {
                // Re-add with an incremented retry count: drop the stale
                // entry, then re-insert with fresh metadata
                dlq.remove_message(msg.id).await;
                dlq.add_message(
                    msg.original_message,
                    e.to_string(),
                    msg.retry_count + 1,
                    HashMap::new(),
                ).await?;
            }
        }
    }
}

Message Replay

let replay_manager = MessageReplayManager::new(100_000); // Max archived messages

// Archive messages during normal processing
replay_manager.archive_message("partition-0", offset, message);

// Start replay over a specific offset range
let replay_id = replay_manager.start_replay(
    "partition-0",
    start_offset,
    Some(end_offset),
)?;

// Replay messages
while let Some(message) = replay_manager.get_next_replay_message(&replay_id) {
    process_message(&message).await?;
}

// Complete replay
replay_manager.complete_replay(&replay_id);

Schema Evolution

let schema_manager = SchemaEvolutionManager::new(CompatibilityMode::Backward);

// Register initial schema
let schema_v1 = Schema {
    name: "order".to_string(),
    fields: vec![
        SchemaField {
            name: "order_id".to_string(),
            field_type: FieldType::Integer,
            optional: false,
            default_value: None,
        },
        SchemaField {
            name: "amount".to_string(),
            field_type: FieldType::Float,
            optional: false,
            default_value: None,
        },
    ],
};
let v1 = schema_manager.register_schema("order".to_string(), schema_v1)?;

// Evolve the schema (add an optional field with a default)
let schema_v2 = Schema {
    name: "order".to_string(),
    fields: vec![
        SchemaField {
            name: "order_id".to_string(),
            field_type: FieldType::Integer,
            optional: false,
            default_value: None,
        },
        SchemaField {
            name: "amount".to_string(),
            field_type: FieldType::Float,
            optional: false,
            default_value: None,
        },
        SchemaField {
            name: "customer_id".to_string(),
            field_type: FieldType::Integer,
            optional: true,
            default_value: Some(Value::Integer(0)),
        },
    ],
};
let v2 = schema_manager.register_schema("order".to_string(), schema_v2)?;

// Migrate old data to the new schema version
let migrated_row = schema_manager.migrate_row("order", &old_row, v2)?;

Late Data Handling

use log::warn;

let late_data_handler = LateDataHandler::new(
    Duration::from_secs(300), // Allowed lateness (5 minutes)
    10_000,                   // Max late messages to track
);

// Custom allowed lateness per partition
late_data_handler.set_allowed_lateness("critical_partition", Duration::from_secs(600));

// Process message with watermark
let watermark = calculate_watermark();
if late_data_handler.is_late("partition-0", &message, watermark) {
    // Clone so the message is still available if the handler accepts it
    match late_data_handler.handle_late_message("partition-0", message.clone(), watermark).await {
        LateDataDecision::Accept => {
            // Process late message
            process_late_message(&message).await?;
        }
        LateDataDecision::Drop => {
            // Message arrived beyond the allowed lateness; log and skip
            warn!("Dropped message with lateness > 5 minutes");
        }
    }
} else {
    // Normal processing
    process_message(&message).await?;
}

Performance Tuning

Throughput Optimization

Target: 1M+ messages/sec

// 1. Use batch processing
let batch_size = 1000;
let messages = fetch_batch(batch_size).await?;

// 2. Parallel processing (chunks are copied so each spawned task owns its data)
let handles: Vec<_> = messages
    .chunks(100)
    .map(|chunk| {
        let chunk = chunk.to_vec();
        tokio::spawn(async move {
            process_chunk(&chunk).await
        })
    })
    .collect();
for handle in handles {
    handle.await??;
}

// 3. Checkpoint less often for throughput
if processed_messages % 10_000 == 0 {
    create_checkpoint().await?;
}

// 4. Use adaptive backpressure
let monitor = QueueDepthMonitor::new(100_000, Duration::from_secs(60));
let sizer = DynamicBatchSizer::new(1000, 100, 10_000, Duration::from_millis(100));

Memory Management

// 1. Limit queue sizes
let max_queue_depth = 10_000;

// 2. Bound message retention in the DLQ and replay archive
let dlq = DeadLetterQueue::new(1_000, Some(path));
let replay_manager = MessageReplayManager::new(10_000);

// 3. Clean up old checkpoints
offset_manager.cleanup_old_checkpoints(10).await?;

// 4. Bound the deduplication window and expire old entries
let deduplicator = MessageDeduplicator::new(Duration::from_secs(300));
deduplicator.cleanup_expired().await;

Latency Optimization

// 1. Reduce the transaction timeout
let coordinator = TransactionCoordinator::new(Duration::from_secs(10), 1000);

// 2. Use smaller batches for low latency
let sizer = DynamicBatchSizer::new(50, 10, 500, Duration::from_millis(10));

// 3. Checkpoint on a time interval rather than a message count
if elapsed_since_checkpoint > Duration::from_secs(5) {
    create_checkpoint().await?;
}

Monitoring & Observability

Key Metrics

Transaction Metrics

let metrics = coordinator.get_metrics();

// Monitor (target thresholds expressed as runtime checks for illustration)
assert!(metrics.committed_transactions > 0);
assert!((metrics.aborted_transactions as f64) < metrics.total_transactions as f64 * 0.01); // < 1% abort rate
assert!(metrics.average_commit_time_ms < 100.0); // < 100ms average

Backpressure Metrics

let depth_metrics = monitor.get_metrics("partition-0");
let sizer_metrics = sizer.get_metrics();
let flow_metrics = flow_control.get_metrics();
let lag_metrics = lag_monitor.get_metrics("consumer1", "partition-0");

// Monitor
assert!(depth_metrics.depth_above_threshold_pct < 20.0); // < 20% of time above threshold
assert!(lag_metrics.current_lag < 10_000); // < 10k messages of lag

Production Metrics

let dlq_metrics = dlq.get_metrics();
let replay_metrics = replay_manager.get_metrics();
let schema_metrics = schema_manager.get_metrics();
let late_data_metrics = late_data_handler.get_metrics();

// Monitor
assert!(dlq_metrics.current_size < 1_000); // DLQ not growing
assert!((late_data_metrics.dropped_late_messages as f64) < messages_processed as f64 * 0.001); // < 0.1% dropped

Prometheus Integration

use prometheus::{Counter, Gauge, Histogram, Registry};

let registry = Registry::new();

// Transaction metrics
let txn_total = Counter::new("streaming_transactions_total", "Total transactions")?;
let txn_committed = Counter::new("streaming_transactions_committed", "Committed transactions")?;
let txn_aborted = Counter::new("streaming_transactions_aborted", "Aborted transactions")?;
registry.register(Box::new(txn_total.clone()))?;
registry.register(Box::new(txn_committed.clone()))?;
registry.register(Box::new(txn_aborted.clone()))?;

// Update periodically. Counters must only receive deltas, so subtract the
// previously exported totals (last_total / last_committed hold the values
// from the previous export cycle) instead of re-adding cumulative counts.
let metrics = coordinator.get_metrics();
txn_total.inc_by((metrics.total_transactions - last_total) as f64);
txn_committed.inc_by((metrics.committed_transactions - last_committed) as f64);
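
To expose these metrics for scraping, the registry can be rendered in the Prometheus text exposition format. A minimal sketch using the prometheus crate's TextEncoder; how the resulting string is served over HTTP is left to your web framework, and the use of anyhow here is for brevity.

use prometheus::{Encoder, Registry, TextEncoder};

// Render every metric registered on `registry` in the exposition format.
fn export_metrics(registry: &Registry) -> anyhow::Result<String> {
    let encoder = TextEncoder::new();
    let mut buffer = Vec::new();
    encoder.encode(&registry.gather(), &mut buffer)?;
    Ok(String::from_utf8(buffer)?)
}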

Deployment Patterns

Single Node Deployment

docker-compose.yml
version: '3.8'
services:
  streaming:
    image: heliosdb/streaming:v7.0
    environment:
      - CHECKPOINT_DIR=/data/checkpoints
      - DLQ_DIR=/data/dlq
      - MAX_QUEUE_DEPTH=10000
      - TRANSACTION_TIMEOUT=30s
    volumes:
      - ./data:/data
    ports:
      - "8080:8080"

Distributed Deployment

use std::env;

// Multi-instance with static partition assignment.
// INSTANCE_COUNT is assumed to be provided alongside INSTANCE_ID
// (it is not shown in the manifests below).
let partition_count = 10;
let instance_count: usize = env::var("INSTANCE_COUNT")?.parse()?;
let instance_id: usize = env::var("INSTANCE_ID")?.parse()?;

// Assign partitions to this instance round-robin
let assigned_partitions: Vec<_> = (0..partition_count)
    .filter(|p| p % instance_count == instance_id)
    .collect();

for partition in assigned_partitions {
    let processor = StreamProcessor::new(partition).await?;
    tokio::spawn(async move {
        processor.run().await
    });
}

Kubernetes Deployment

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: streaming-processor
spec:
  serviceName: streaming
  replicas: 3
  selector:
    matchLabels:
      app: streaming
  template:
    metadata:
      labels:
        app: streaming
    spec:
      containers:
        - name: processor
          image: heliosdb/streaming:v7.0
          env:
            # Pod names carry a stable ordinal (streaming-processor-0, -1, ...);
            # the numeric instance id must be derived from this name.
            - name: INSTANCE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          volumeMounts:
            - name: data
              mountPath: /data
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 100Gi
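
Because a StatefulSet pod name is the set name plus an ordinal suffix (e.g. streaming-processor-2), the numeric instance id used by the partitioning code above has to be parsed out of the INSTANCE_ID value rather than used directly. A minimal sketch (anyhow used for brevity):

use std::env;

// Extract the StatefulSet ordinal from a pod name such as
// "streaming-processor-2" -> 2.
fn instance_ordinal() -> anyhow::Result<usize> {
    let pod_name = env::var("INSTANCE_ID")?;
    // The ordinal is the suffix after the last '-'
    let ordinal = pod_name.rsplit('-').next().unwrap_or("").parse::<usize>()?;
    Ok(ordinal)
}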

Troubleshooting

High Transaction Abort Rate

Symptom: > 1% abort rate

Causes:

  • Transaction timeout too short
  • Slow participants
  • Network issues

Solutions:

// Increase the timeout
let coordinator = TransactionCoordinator::new(Duration::from_secs(60), 1000);

// Check participant performance
let metrics = coordinator.get_metrics();
if metrics.prepare_failures > 0 {
    log::error!("Prepare phase failures: {}", metrics.prepare_failures);
}

High Consumer Lag

Symptom: Lag > 10k messages

Causes:

  • Insufficient processing capacity
  • Slow downstream systems
  • Backpressure not applied

Solutions:

// Apply backpressure earlier by recording depth near the threshold
if lag > 10_000 {
    monitor.record_depth("partition-0", 9_000); // Push utilization toward the threshold
}

// Increase parallelism (copy each chunk so the spawned task owns its data)
let chunk_size = (messages.len() / 10).max(1); // e.g. 10 parallel tasks
for chunk in messages.chunks(chunk_size) {
    let chunk = chunk.to_vec();
    tokio::spawn(async move { process_chunk(&chunk).await });
}

Memory Growth

Symptom: Increasing memory usage

Causes:

  • DLQ growing unbounded
  • Old checkpoints not cleaned
  • Message deduplication cache too large

Solutions:

// Periodic cleanup task
tokio::spawn(async move {
    loop {
        offset_manager.cleanup_old_checkpoints(10).await.ok();
        deduplicator.cleanup_expired().await;
        tokio::time::sleep(Duration::from_secs(300)).await;
    }
});

// Bound the DLQ size
let dlq = DeadLetterQueue::new(1_000, Some(path));

Low Throughput

Symptom: < 100k messages/sec

Causes:

  • Small batch sizes
  • Excessive checkpointing
  • Backpressure too aggressive

Solutions:

// Increase batch size
let sizer = DynamicBatchSizer::new(1000, 100, 5000, Duration::from_millis(100));

// Reduce checkpoint frequency
if processed % 50_000 == 0 {
    create_checkpoint().await?;
}

// Raise the backpressure threshold
monitor.should_apply_backpressure("partition-0", 0.9); // 90% instead of 80%

Success Criteria

Production Readiness Checklist

  • Exactly-once semantics verified (see the verification sketch after this list)
  • Throughput > 1M messages/sec
  • Backpressure handling graceful
  • 100+ comprehensive tests passing
  • Transaction abort rate < 1%
  • Consumer lag < 10k messages
  • DLQ size stable
  • Checkpoint recovery tested
  • Schema evolution validated
  • Late data handling configured
  • Monitoring dashboards set up
  • Alerting rules configured
  • Disaster recovery plan documented
  • Performance benchmarks documented
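
As one way to exercise the first checklist item, here is a minimal sketch of a deduplication test built from the APIs used in the Quick Start; the UUID message-id type and the boolean semantics of check_and_record (true means "already seen") are assumptions inferred from the earlier examples.

use heliosdb_streaming::*;
use std::time::Duration;

#[tokio::test]
async fn duplicate_delivery_is_filtered() {
    let deduplicator = MessageDeduplicator::new(Duration::from_secs(300));
    // Assumes UUID message ids, matching the UUID-based deduplication above
    let msg_id = uuid::Uuid::new_v4();

    assert!(!deduplicator.check_and_record(msg_id)); // first delivery: recorded
    assert!(deduplicator.check_and_record(msg_id));  // redelivery: flagged duplicate
}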

API Reference

Transaction Coordinator

impl TransactionCoordinator {
    pub fn new(default_timeout: Duration, max_log_size: usize) -> Self;
    pub async fn begin_transaction(&self) -> Result<TransactionId>;
    pub async fn commit(&self, txn_id: TransactionId) -> Result<()>;
    pub async fn abort(&self, txn_id: TransactionId) -> Result<()>;
    pub fn get_metrics(&self) -> CoordinatorMetrics;
}

Offset Manager

impl OffsetManager {
    pub async fn new(name: String, checkpoint_dir: PathBuf) -> Result<Self>;
    pub async fn stage_offset(&self, txn_id: TransactionId, partition: String, offset: Offset) -> Result<()>;
    pub async fn create_checkpoint(&self, txn_id: Option<TransactionId>) -> Result<Checkpoint>;
    pub async fn recover_from_checkpoint(&self) -> Result<Option<Checkpoint>>;
}
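
Idempotent Producer & Message Deduplicator

The idempotent producer and deduplicator are used throughout this guide but not listed above; the signatures below are inferred from those examples and should be checked against the crate documentation.

// Inferred from usage in this guide; verify against the crate docs.
impl IdempotentProducer {
    pub fn new() -> Self;
    pub fn next_sequence(&self, partition: &str) -> u64;
    pub fn record_message(&self, msg_id: MessageId, partition: String, sequence: u64);
}

impl MessageDeduplicator {
    pub fn new(window: Duration) -> Self;
    /// Returns true if `msg_id` was already seen inside the window.
    pub fn check_and_record(&self, msg_id: MessageId) -> bool;
    pub async fn cleanup_expired(&self);
}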

Version History

  • v7.0 (2025-01): Production release with exactly-once semantics
  • v6.3 (2024-12): Rate limiting and authentication
  • v6.0 (2024-11): Key management and security
  • v5.5 (2024-10): Flink integration
  • v5.1 (2024-09): CEP and backpressure

Support

  • Documentation: /docs
  • Examples: /examples
  • Tests: /tests
  • Issues: GitHub Issues

HeliosDB Streaming v7.0 - Production-Ready Stream Processing