HeliosDB Streaming v7.0 - Production Deployment Guide
Overview
HeliosDB Streaming v7.0 brings enterprise-grade stream processing with exactly-once semantics, advanced backpressure management, and comprehensive production features. This guide covers deployment, configuration, and best practices for production environments.
Table of Contents
- Exactly-Once Semantics
- Backpressure Management
- Production Features
- Performance Tuning
- Monitoring & Observability
- Deployment Patterns
- Troubleshooting
Exactly-Once Semantics
Overview
HeliosDB Streaming v7.0 implements true exactly-once processing guarantees using:
- Transaction Coordinator: Two-phase commit protocol
- Idempotent Producer: Prevents duplicate writes
- Offset Management: Transactional offset commits
- Message Deduplication: UUID-based duplicate detection
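For intuition, the deduplication step can be pictured as a time-windowed set of message IDs. The sketch below is illustrative only and is not the library's internal implementation; it uses a plain `u128` to stand in for a UUID:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Illustrative stand-in for a message UUID.
type MessageId = u128;

// Time-windowed duplicate detection: an id is a duplicate if it was
// already recorded within the window.
struct WindowedDeduplicator {
    seen: HashMap<MessageId, Instant>,
    window: Duration,
}

impl WindowedDeduplicator {
    fn new(window: Duration) -> Self {
        Self { seen: HashMap::new(), window }
    }

    // Mirrors the check_and_record call in the Quick Start below:
    // returns true for duplicates, records and returns false otherwise.
    fn check_and_record(&mut self, id: MessageId) -> bool {
        let now = Instant::now();
        // Expire entries older than the window (a real implementation
        // would amortize this rather than scan on every call).
        self.seen.retain(|_, t| now.duration_since(*t) < self.window);
        if self.seen.contains_key(&id) {
            true
        } else {
            self.seen.insert(id, now);
            false
        }
    }
}
```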
Quick Start
```rust
use heliosdb_streaming::*;
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Create transaction coordinator
    let coordinator = Arc::new(TransactionCoordinator::new(
        Duration::from_secs(30), // Transaction timeout
        1000,                    // Max completed transactions in log
    ));

    // Create offset manager
    let offset_manager = Arc::new(
        OffsetManager::new(
            "my_processor".to_string(),
            "/data/checkpoints".into(),
        ).await?
    );

    // Register as participant
    coordinator.register_participant(offset_manager.clone()).await;

    // Create idempotent producer and deduplicator
    let producer = IdempotentProducer::new();
    let deduplicator = MessageDeduplicator::new(Duration::from_secs(300));

    // Process messages
    loop {
        // Begin transaction
        let txn_id = coordinator.begin_transaction().await?;
        coordinator.add_participant(txn_id, "my_processor").await?;

        // Get message
        let (msg_id, message, offset) = receive_message().await?;

        // Skip duplicates; abort first so the open transaction is not
        // left dangling until its timeout
        if deduplicator.check_and_record(msg_id) {
            coordinator.abort(txn_id).await?;
            continue;
        }

        // Process message
        process_message(&message).await?;

        // Stage offset
        offset_manager.stage_offset(txn_id, "partition-0".to_string(), offset).await?;

        // Record in producer
        let seq = producer.next_sequence("partition-0");
        producer.record_message(msg_id, "partition-0".to_string(), seq);

        // Commit transaction (2PC)
        coordinator.commit(txn_id).await?;
    }
}
```
Transaction States
```
Active -> Preparing -> Prepared -> Committing -> Committed
  |           |                        |
  +-> Aborting -------> Aborted        |
  |           |                        |
  +-----------> TimedOut <-------------+
```
Checkpoint-Based Recovery
```rust
// Create checkpoints periodically
if messages_processed % 1000 == 0 {
    offset_manager.create_checkpoint(Some(txn_id)).await?;
}

// Recover from the latest checkpoint on restart
if let Some(checkpoint) = offset_manager.recover_from_checkpoint().await? {
    println!("Recovered from checkpoint {}", checkpoint.id);
    println!("Offsets: {:?}", checkpoint.offsets);
}

// Cleanup old checkpoints (keep the last 10)
offset_manager.cleanup_old_checkpoints(10).await?;
```
Best Practices
- Transaction Timeout: Set based on processing time (typically 30-60s)
- Checkpoint Frequency: Balance recovery time against checkpoint overhead (every 1,000-10,000 messages)
- Deduplication Window: Match it to the maximum message redelivery window
- Participant Registration: Register all participants before starting processing
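A minimal sketch that applies these recommendations, reusing the `offset_manager`, `txn_id`, and `messages_processed` bindings from the Quick Start above; the concrete values are illustrative, not library defaults:

```rust
use heliosdb_streaming::*;
use std::sync::Arc;
use std::time::Duration;

// Transaction timeout inside the typical 30-60s range
let coordinator = Arc::new(TransactionCoordinator::new(Duration::from_secs(45), 1000));

// Deduplication window sized to cover the broker's maximum redelivery
// window (600s here is an illustrative assumption)
let deduplicator = MessageDeduplicator::new(Duration::from_secs(600));

// Register all participants before the first begin_transaction() call
coordinator.register_participant(offset_manager.clone()).await;

// Checkpoint every 5,000 messages, inside the 1,000-10,000 guideline
if messages_processed % 5_000 == 0 {
    offset_manager.create_checkpoint(Some(txn_id)).await?;
}
```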
Backpressure Management
Overview
Advanced backpressure mechanisms prevent system overload:
- Queue Depth Monitoring: Real-time queue utilization tracking
- Dynamic Batch Sizing: Automatically adjust batch sizes
- Credit-Based Flow Control: Token-based rate limiting
- Consumer Lag Monitoring: Track and respond to lag
Queue Depth Monitoring
```rust
use heliosdb_streaming::*;

let monitor = QueueDepthMonitor::new(
    10_000,                  // Max queue depth
    Duration::from_secs(60), // History retention
);

// Record depth
monitor.record_depth("partition-0", current_depth);

// Check whether backpressure is needed (threshold: 80% of max depth)
if monitor.should_apply_backpressure("partition-0", 0.8) {
    // Apply backpressure (slow down ingestion)
    tokio::time::sleep(Duration::from_millis(100)).await;
}

// Get metrics
let metrics = monitor.get_metrics("partition-0");
println!("Current depth: {}", metrics.current_depth);
println!("Average depth: {:.2}", metrics.average_depth);
println!("Trend: {:?}", metrics.depth_trend);
```
Dynamic Batch Sizing
```rust
use std::time::Instant;

let sizer = DynamicBatchSizer::new(
    100,                        // Initial batch size
    10,                         // Min batch size
    1000,                       // Max batch size
    Duration::from_millis(100), // Target batch processing time
);

loop {
    let batch_size = sizer.get_batch_size("partition-0");
    let messages = fetch_messages(batch_size).await?;

    let start = Instant::now();
    process_batch(&messages).await?;
    let elapsed = start.elapsed();

    // Record the observation so the sizer can adapt future batch sizes
    sizer.record_processing_time("partition-0", messages.len(), elapsed);
}
```
Credit-Based Flow Control
```rust
let flow_control = CreditBasedFlowControl::new(
    1000,  // Max credits
    100.0, // Replenishment rate (credits/sec)
);

loop {
    // Wait until enough credits are available
    while !flow_control.try_consume("consumer1", batch_size as i64) {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    // Process
    process_messages().await?;

    // Return credits after processing
    flow_control.return_credits("consumer1", batch_size as i64);
}
```
Consumer Lag Monitoring
```rust
let lag_monitor = ConsumerLagMonitor::new(Duration::from_secs(60));

// Update from producer
lag_monitor.update_latest_offset("partition-0", producer_offset);

// Update from consumer
lag_monitor.update_consumer_offset("consumer1", "partition-0", consumer_offset);

// Record lag periodically
lag_monitor.record_lag("consumer1", "partition-0");

// Get metrics
let metrics = lag_monitor.get_metrics("consumer1", "partition-0");
println!("Current lag: {} messages", metrics.current_lag);
println!("Lag velocity: {:.2} msg/s", metrics.lag_velocity);
println!("Trend: {:?}", metrics.lag_trend);

// Alert on high lag
if metrics.current_lag > 10_000 {
    alert("High consumer lag detected!");
}
```
Production Features
Dead Letter Queue
```rust
use std::collections::HashMap;

let dlq = DeadLetterQueue::new(
    10_000,                   // Max messages
    Some("/data/dlq".into()), // Persistence path
);

// Add failed message
match process_message(&message).await {
    Ok(_) => {}
    Err(e) => {
        let mut metadata = HashMap::new();
        metadata.insert("error_type".to_string(), "processing_error".to_string());

        dlq.add_message(
            message,
            e.to_string(),
            retry_count,
            metadata,
        ).await?;
    }
}

// Retrieve and retry
let failed_messages = dlq.get_all_messages().await;
for msg in failed_messages {
    if msg.retry_count < 3 {
        match retry_message(&msg.original_message).await {
            Ok(_) => {
                dlq.remove_message(msg.id).await;
            }
            Err(_) => {
                // Re-add with an incremented retry count
            }
        }
    }
}
```
Message Replay
```rust
let replay_manager = MessageReplayManager::new(100_000); // Max archived messages

// Archive messages during normal processing
replay_manager.archive_message("partition-0", offset, message);

// Start replay from a specific offset range
let replay_id = replay_manager.start_replay(
    "partition-0",
    start_offset,
    Some(end_offset),
)?;

// Replay messages
while let Some(message) = replay_manager.get_next_replay_message(&replay_id) {
    process_message(&message).await?;
}

// Complete replay
replay_manager.complete_replay(&replay_id);
```
Schema Evolution
```rust
let schema_manager = SchemaEvolutionManager::new(CompatibilityMode::Backward);

// Register initial schema
let schema_v1 = Schema {
    name: "order".to_string(),
    fields: vec![
        SchemaField {
            name: "order_id".to_string(),
            field_type: FieldType::Integer,
            optional: false,
            default_value: None,
        },
        SchemaField {
            name: "amount".to_string(),
            field_type: FieldType::Float,
            optional: false,
            default_value: None,
        },
    ],
};

let v1 = schema_manager.register_schema("order".to_string(), schema_v1)?;

// Evolve schema (add an optional field with a default)
let schema_v2 = Schema {
    name: "order".to_string(),
    fields: vec![
        SchemaField {
            name: "order_id".to_string(),
            field_type: FieldType::Integer,
            optional: false,
            default_value: None,
        },
        SchemaField {
            name: "amount".to_string(),
            field_type: FieldType::Float,
            optional: false,
            default_value: None,
        },
        SchemaField {
            name: "customer_id".to_string(),
            field_type: FieldType::Integer,
            optional: true,
            default_value: Some(Value::Integer(0)),
        },
    ],
};

let v2 = schema_manager.register_schema("order".to_string(), schema_v2)?;

// Migrate old data to the new schema
let migrated_row = schema_manager.migrate_row("order", &old_row, v2)?;
```
Late Data Handling
```rust
let late_data_handler = LateDataHandler::new(
    Duration::from_secs(300), // Allowed lateness (5 minutes)
    10_000,                   // Max late messages to track
);

// Custom allowed lateness per partition
late_data_handler.set_allowed_lateness("critical_partition", Duration::from_secs(600));

// Process message against the current watermark
let watermark = calculate_watermark();
if late_data_handler.is_late("partition-0", &message, watermark) {
    // Clone: the handler takes ownership, but the Accept arm still needs the message
    match late_data_handler.handle_late_message("partition-0", message.clone(), watermark).await {
        LateDataDecision::Accept => {
            // Process late message
            process_late_message(&message).await?;
        }
        LateDataDecision::Drop => {
            // Message too late; log and skip
            warn!("Dropped message with lateness > 5 minutes");
        }
    }
} else {
    // Normal processing
    process_message(&message).await?;
}
```
Performance Tuning
Throughput Optimization
Target: 1M+ messages/sec
```rust
// 1. Use batch processing
let batch_size = 1000;
let messages = fetch_batch(batch_size).await?;

// 2. Parallel processing: copy each chunk so the spawned task owns its data
let handles: Vec<_> = messages
    .chunks(100)
    .map(|chunk| {
        let chunk = chunk.to_vec();
        tokio::spawn(async move { process_chunk(&chunk).await })
    })
    .collect();

for handle in handles {
    handle.await??;
}

// 3. Optimize checkpoint frequency
if processed_messages % 10_000 == 0 {
    create_checkpoint().await?;
}

// 4. Use adaptive backpressure
let monitor = QueueDepthMonitor::new(100_000, Duration::from_secs(60));
let sizer = DynamicBatchSizer::new(1000, 100, 10_000, Duration::from_millis(100));
```
Memory Management
```rust
// 1. Limit queue sizes
let max_queue_depth = 10_000;

// 2. Configure message retention
let dlq = DeadLetterQueue::new(1_000, Some(path));
let replay_manager = MessageReplayManager::new(10_000);

// 3. Cleanup old checkpoints
offset_manager.cleanup_old_checkpoints(10).await?;

// 4. Bound the deduplication window and expire old entries
let deduplicator = MessageDeduplicator::new(Duration::from_secs(300));
deduplicator.cleanup_expired().await;
```
Latency Optimization
```rust
// 1. Reduce the transaction timeout
let coordinator = TransactionCoordinator::new(Duration::from_secs(10), 1000);

// 2. Use smaller batch sizes for low latency
let sizer = DynamicBatchSizer::new(50, 10, 500, Duration::from_millis(10));

// 3. Checkpoint on a short time interval
if elapsed_since_checkpoint > Duration::from_secs(5) {
    create_checkpoint().await?;
}
```
Monitoring & Observability
Key Metrics
Transaction Metrics
```rust
let metrics = coordinator.get_metrics();

// Monitor
assert!(metrics.committed_transactions > 0);
// < 1% abort rate (cast to f64: integer counts cannot be multiplied by a fraction directly)
assert!((metrics.aborted_transactions as f64) < metrics.total_transactions as f64 * 0.01);
assert!(metrics.average_commit_time_ms < 100.0); // < 100ms average commit time
```
Backpressure Metrics
```rust
let depth_metrics = monitor.get_metrics("partition-0");
let sizer_metrics = sizer.get_metrics();
let flow_metrics = flow_control.get_metrics();
let lag_metrics = lag_monitor.get_metrics("consumer1", "partition-0");

// Monitor
assert!(depth_metrics.depth_above_threshold_pct < 20.0); // < 20% of time above threshold
assert!(lag_metrics.current_lag < 10_000);               // < 10k messages of lag
```
Production Metrics
```rust
let dlq_metrics = dlq.get_metrics();
let replay_metrics = replay_manager.get_metrics();
let schema_metrics = schema_manager.get_metrics();
let late_data_metrics = late_data_handler.get_metrics();

// Monitor
assert!(dlq_metrics.current_size < 1_000); // DLQ not growing
// < 0.1% of messages dropped as late
assert!((late_data_metrics.dropped_late_messages as f64) < messages_processed as f64 * 0.001);
```
Prometheus Integration
```rust
use prometheus::{Gauge, Registry};

let registry = Registry::new();

// Transaction metrics. Gauges are set to the coordinator's cumulative values;
// calling Counter::inc_by with cumulative totals on every update would double-count.
let txn_total = Gauge::new("streaming_transactions_total", "Total transactions")?;
let txn_committed = Gauge::new("streaming_transactions_committed", "Committed transactions")?;
let txn_aborted = Gauge::new("streaming_transactions_aborted", "Aborted transactions")?;

registry.register(Box::new(txn_total.clone()))?;
registry.register(Box::new(txn_committed.clone()))?;
registry.register(Box::new(txn_aborted.clone()))?;

// Update periodically
let metrics = coordinator.get_metrics();
txn_total.set(metrics.total_transactions as f64);
txn_committed.set(metrics.committed_transactions as f64);
txn_aborted.set(metrics.aborted_transactions as f64);
```
Deployment Patterns
Single Node Deployment
```yaml
version: '3.8'
services:
  streaming:
    image: heliosdb/streaming:v7.0
    environment:
      - CHECKPOINT_DIR=/data/checkpoints
      - DLQ_DIR=/data/dlq
      - MAX_QUEUE_DEPTH=10000
      - TRANSACTION_TIMEOUT=30s
    volumes:
      - ./data:/data
    ports:
      - "8080:8080"
```
Distributed Deployment
```rust
use std::env;

// Multi-instance with partitioning; INSTANCE_ID and INSTANCE_COUNT are
// supplied by the deployment environment
let partition_count = 10;
let instance_count = env::var("INSTANCE_COUNT")?.parse::<usize>()?;
let instance_id = env::var("INSTANCE_ID")?.parse::<usize>()?;

// Assign partitions to this instance round-robin
let assigned_partitions: Vec<_> = (0..partition_count)
    .filter(|p| p % instance_count == instance_id)
    .collect();

for partition in assigned_partitions {
    let processor = StreamProcessor::new(partition).await?;
    tokio::spawn(async move { processor.run().await });
}
```
Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: streaming-processor
spec:
  serviceName: streaming
  replicas: 3
  selector:
    matchLabels:
      app: streaming
  template:
    metadata:
      labels:
        app: streaming
    spec:
      containers:
        - name: processor
          image: heliosdb/streaming:v7.0
          env:
            # Pod names look like streaming-processor-0; the processor must
            # extract the trailing ordinal before parsing it as a number
            - name: INSTANCE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          volumeMounts:
            - name: data
              mountPath: /data
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 100Gi
```
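Because the StatefulSet injects the pod name (e.g. streaming-processor-0) as INSTANCE_ID, the numeric ordinal must be extracted before the `parse::<usize>()` call in the distributed example above. A minimal, hypothetical helper:

```rust
use std::env;

// Hypothetical helper: StatefulSet pod names end in "-<ordinal>",
// e.g. "streaming-processor-2" -> 2. Assumes INSTANCE_ID carries the pod name.
fn instance_ordinal() -> Result<usize, Box<dyn std::error::Error>> {
    let pod_name = env::var("INSTANCE_ID")?;
    let ordinal = pod_name
        .rsplit('-') // take the segment after the last '-'
        .next()
        .ok_or("empty INSTANCE_ID")?
        .parse::<usize>()?;
    Ok(ordinal)
}
```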
Troubleshooting
High Transaction Abort Rate
Symptom: > 1% abort rate
Causes:
- Transaction timeout too short
- Slow participants
- Network issues
Solutions:
```rust
// Increase the transaction timeout
let coordinator = TransactionCoordinator::new(Duration::from_secs(60), 1000);

// Check participant performance
let metrics = coordinator.get_metrics();
if metrics.prepare_failures > 0 {
    log::error!("Prepare phase failures: {}", metrics.prepare_failures);
}
```
High Consumer Lag
Symptom: Lag > 10k messages
Causes:
- Insufficient processing capacity
- Slow downstream systems
- Backpressure not applied
Solutions:
```rust
// Force backpressure by reporting a near-capacity queue depth
if lag > 10_000 {
    monitor.record_depth("partition-0", 9_000);
}

// Increase parallelism: give each task an owned copy of its chunk
let parallel_tasks = 10;
let chunk_size = messages.len().div_ceil(parallel_tasks).max(1);
for chunk in messages.chunks(chunk_size) {
    let chunk = chunk.to_vec();
    tokio::spawn(async move { process_chunk(&chunk).await });
}
```
Memory Growth
Symptom: Increasing memory usage
Causes:
- DLQ growing unbounded
- Old checkpoints not cleaned
- Message deduplication cache too large
Solutions:
```rust
// Regular cleanup task
tokio::spawn(async move {
    loop {
        offset_manager.cleanup_old_checkpoints(10).await.ok();
        deduplicator.cleanup_expired().await;
        tokio::time::sleep(Duration::from_secs(300)).await;
    }
});

// Limit DLQ size
let dlq = DeadLetterQueue::new(1_000, Some(path));
```
Low Throughput
Symptom: < 100k messages/sec
Causes:
- Small batch sizes
- Excessive checkpointing
- Backpressure too aggressive
Solutions:
```rust
// Increase batch size
let sizer = DynamicBatchSizer::new(1000, 100, 5000, Duration::from_millis(100));

// Reduce checkpoint frequency
if processed % 50_000 == 0 {
    create_checkpoint().await?;
}

// Relax the backpressure threshold (90% instead of 80%)
monitor.should_apply_backpressure("partition-0", 0.9);
```
Success Criteria
Production Readiness Checklist
- Exactly-once semantics verified
- Throughput > 1M messages/sec
- Backpressure handling graceful
- 100+ comprehensive tests passing
- Transaction abort rate < 1%
- Consumer lag < 10k messages
- DLQ size stable
- Checkpoint recovery tested
- Schema evolution validated
- Late data handling configured
- Monitoring dashboards set up
- Alerting rules configured
- Disaster recovery plan documented
- Performance benchmarks documented
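Several of the numeric criteria above can be checked directly against the metrics APIs shown earlier. The sketch below is illustrative; it assumes the field names used in this guide's examples, which may differ in practice:

```rust
// Illustrative readiness probe built on the metrics calls shown earlier.
fn check_readiness(
    coordinator: &TransactionCoordinator,
    lag_monitor: &ConsumerLagMonitor,
    dlq: &DeadLetterQueue,
) -> bool {
    let txn = coordinator.get_metrics();
    let lag = lag_monitor.get_metrics("consumer1", "partition-0");
    let dlq_metrics = dlq.get_metrics();

    let abort_rate_ok = (txn.aborted_transactions as f64)
        < txn.total_transactions as f64 * 0.01;    // abort rate < 1%
    let lag_ok = lag.current_lag < 10_000;         // consumer lag < 10k messages
    let dlq_ok = dlq_metrics.current_size < 1_000; // DLQ size bounded

    abort_rate_ok && lag_ok && dlq_ok
}
```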
API Reference
Transaction Coordinator
```rust
impl TransactionCoordinator {
    pub fn new(default_timeout: Duration, max_log_size: usize) -> Self;
    pub async fn begin_transaction(&self) -> Result<TransactionId>;
    pub async fn commit(&self, txn_id: TransactionId) -> Result<()>;
    pub async fn abort(&self, txn_id: TransactionId) -> Result<()>;
    pub fn get_metrics(&self) -> CoordinatorMetrics;
}
```
Offset Manager
```rust
impl OffsetManager {
    pub async fn new(name: String, checkpoint_dir: PathBuf) -> Result<Self>;
    pub async fn stage_offset(&self, txn_id: TransactionId, partition: String, offset: Offset) -> Result<()>;
    pub async fn create_checkpoint(&self, txn_id: Option<TransactionId>) -> Result<Checkpoint>;
    pub async fn recover_from_checkpoint(&self) -> Result<Option<Checkpoint>>;
}
```
Version History
- v7.0 (2025-01): Production release with exactly-once semantics
- v6.3 (2024-12): Rate limiting and authentication
- v6.0 (2024-11): Key management and security
- v5.5 (2024-10): Flink integration
- v5.1 (2024-09): CEP and backpressure
Support
- Documentation: /docs
- Examples: /examples
- Tests: /tests
- Issues: GitHub Issues
HeliosDB Streaming v7.0 - Production-Ready Stream Processing