Real-Time Streaming Integration (Kafka/Flink) - User Guide
Feature ID: F5.1.3 Version: v5.1 Status: In Development (Target: 85%) ARR Value: $15M Patent Status: Defensive Publication Planned
OVERVIEW
HeliosDB’s Real-Time Streaming Integration bridges the gap between traditional database storage and modern event-driven architectures. It integrates natively with Apache Kafka and Apache Flink to enable real-time data ingestion, processing, and analytics without complex ETL pipelines.
Key Features
- Native Kafka producer/consumer integration
- Apache Flink job integration for stream processing
- Change Data Capture (CDC) streaming
- Real-time materialized view updates
- Exactly-once delivery semantics
- Schema evolution support
- Multi-format support (Avro, JSON, Protobuf)
- Monitoring and alerting
Business Value
- Real-time analytics on streaming data
- Eliminate batch processing delays (hours to seconds)
- Reduce infrastructure complexity by 50%
- Enable event-driven applications
- Support for IoT and sensor data at scale
Target Users
- Platform engineers building real-time pipelines
- Data engineers managing streaming workloads
- Application developers building event-driven systems
- DevOps teams managing data infrastructure
GETTING STARTED
Prerequisites
- HeliosDB v5.1 or later
- Kafka cluster (v2.8 or later) or Confluent Cloud
- Optional: Apache Flink cluster (v1.17 or later)
- Network connectivity to streaming infrastructure
Quick Start
1. Configure Kafka Connection
use heliosdb_streaming::{KafkaConfig, SaslMechanism, SecurityProtocol, StreamingConfig};

let kafka_config = KafkaConfig {
    bootstrap_servers: vec!["localhost:9092".to_string()],
    security_protocol: SecurityProtocol::SaslSsl,
    sasl_mechanism: SaslMechanism::Plain,
    sasl_username: "your-username".to_string(),
    sasl_password: "your-password".to_string(),
};

let streaming = StreamingConfig::new()
    .with_kafka(kafka_config)
    .build()?;
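Before starting any streams, it is worth verifying connectivity up front. A minimal sketch using the validate and test_connection methods listed under StreamingConfig in the API Reference below (assuming an async runtime such as tokio is already in place):

// Sketch: check the configuration itself, then round-trip to the Kafka cluster.
streaming.validate().await?;
let status = streaming.test_connection().await?;
println!("Kafka connection status: {:?}", status);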
2. Stream Database Changes to Kafka

use heliosdb_streaming::cdc::CDCStream;
// Stream all changes from the 'orders' table to Kafka
let cdc = CDCStream::new("orders")
    .to_topic("db.orders.changes")
    .with_format(Format::Avro)
    .with_schema_registry("http://localhost:8081")
    .start()?;

println!("CDC streaming started for 'orders' table");
3. Consume Kafka Events into Database

use heliosdb_streaming::consumer::KafkaConsumer;
// Consume events from a Kafka topic into a database table
let consumer = KafkaConsumer::new()
    .subscribe("clickstream-events")
    .into_table("clickstream")
    .with_mapping(|event| {
        json!({
            "user_id": event["userId"],
            "event_type": event["eventType"],
            "timestamp": event["timestamp"],
            "properties": event["properties"]
        })
    })
    .start()?;

println!("Consuming clickstream events into database");
CONFIGURATION
Kafka Producer Configuration
use heliosdb_streaming::producer::ProducerConfig;
let producer = ProducerConfig {
    // Connection
    bootstrap_servers: vec!["kafka1:9092", "kafka2:9092", "kafka3:9092"],

    // Performance
    batch_size: 65536,                 // 64KB batches
    linger_ms: 10,                     // Wait up to 10ms for batching
    compression: Compression::Snappy,

    // Reliability
    acks: Acks::All,                   // Wait for all replicas
    max_in_flight: 5,                  // Pipelining for performance
    retries: 3,

    // Monitoring
    enable_metrics: true,
    metrics_interval: Duration::from_secs(60),
};
Kafka Consumer Configuration
use heliosdb_streaming::consumer::ConsumerConfig;
let consumer = ConsumerConfig {
    // Connection
    bootstrap_servers: vec!["kafka1:9092", "kafka2:9092"],
    group_id: "heliosdb-consumer-group".to_string(),

    // Consumption
    auto_offset_reset: AutoOffsetReset::Earliest,
    enable_auto_commit: false,  // Manual commit for exactly-once
    max_poll_records: 500,

    // Performance
    fetch_min_bytes: 1024,
    fetch_max_wait_ms: 500,

    // Reliability
    session_timeout_ms: 30000,
    heartbeat_interval_ms: 3000,
};
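With enable_auto_commit set to false, offsets should be committed only after records are durably written, so a crash replays events rather than dropping them. A minimal sketch of that loop using the poll/commit methods from the API Reference, assuming a KafkaConsumer built from this configuration; write_batch is a hypothetical helper that inserts the records into HeliosDB:

// Hypothetical manual-commit loop: persist first, commit second.
loop {
    let records = consumer.poll(Duration::from_millis(500)).await?;
    write_batch(&db, &records)?;  // hypothetical helper: persist the batch to HeliosDB
    consumer.commit().await?;     // commit only after a durable write
}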
Flink Integration Configuration

use heliosdb_streaming::flink::FlinkConfig;
let flink = FlinkConfig {
    // Job Manager
    jobmanager_address: "flink-jobmanager:8081".to_string(),

    // Checkpointing
    checkpoint_interval: Duration::from_secs(60),
    checkpoint_mode: CheckpointMode::ExactlyOnce,
    checkpoint_storage: "s3://my-bucket/checkpoints".to_string(),

    // Parallelism
    default_parallelism: 4,
    max_parallelism: 128,

    // State Backend
    state_backend: StateBackend::RocksDB,
    state_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),  // 7 days
};
EXAMPLES
Example 1: Real-Time CDC to Data Warehouse
Stream database changes to a data warehouse for real-time analytics:
use heliosdb_streaming::cdc::CDCPipeline;
// Configure CDC pipeline
let pipeline = CDCPipeline::new()
    .source_tables(vec!["orders", "customers", "products"])
    .to_kafka_topics(|table| format!("warehouse.{}.cdc", table))
    .with_format(Format::Avro)
    .with_schema_evolution(SchemaEvolution::Backward)
    .with_transformations(vec![
        Transformation::MaskPII,
        Transformation::AddMetadata,
    ])
    .start()?;

// Monitor pipeline
pipeline.monitor(|stats| {
    println!("Events/sec: {}", stats.throughput);
    println!("Lag: {}ms", stats.latency_p99);
});
Example 2: Clickstream Analytics
Ingest clickstream events and run real-time analytics:
use heliosdb_streaming::consumer::StreamingSQL;
// Create streaming table
db.execute(r#"
    CREATE STREAMING TABLE clickstream (
        user_id BIGINT,
        session_id VARCHAR(64),
        event_type VARCHAR(32),
        page_url TEXT,
        timestamp TIMESTAMP,
        properties JSONB
    )
    FROM KAFKA TOPIC 'clickstream-raw'
    WITH (
        'format' = 'json',
        'consumer.group' = 'analytics-group'
    )
"#)?;

// Create real-time materialized view
db.execute(r#"
    CREATE MATERIALIZED VIEW user_sessions_realtime AS
    SELECT
        user_id,
        session_id,
        COUNT(*) as event_count,
        MIN(timestamp) as session_start,
        MAX(timestamp) as session_end,
        ARRAY_AGG(page_url ORDER BY timestamp) as page_path
    FROM clickstream
    WHERE timestamp > NOW() - INTERVAL '1 hour'
    GROUP BY user_id, session_id
    WITH (refresh_mode = 'streaming')
"#)?;

// Query always shows fresh data
let active_sessions = db.query(
    "SELECT * FROM user_sessions_realtime WHERE session_end > NOW() - INTERVAL '5 minutes'"
)?;
Example 3: IoT Sensor Data Pipeline
Process high-volume sensor data with Flink integration:
use heliosdb_streaming::flink::FlinkJob;
// Define Flink processing job
let job = FlinkJob::new("sensor-processing")
    .source_kafka("iot.sensors.raw")
    .map(|sensor_reading| {
        // Parse and validate sensor data
        SensorReading {
            device_id: sensor_reading.device_id,
            temperature: sensor_reading.temperature,
            humidity: sensor_reading.humidity,
            timestamp: sensor_reading.timestamp,
        }
    })
    .filter(|reading| {
        // Filter out anomalous readings
        reading.temperature >= -50.0 && reading.temperature <= 100.0
    })
    .window(TumblingWindow::of_seconds(60))
    .aggregate(|readings| {
        // 1-minute aggregations
        Aggregate {
            device_id: readings[0].device_id,
            avg_temperature: readings.iter().map(|r| r.temperature).sum::<f64>()
                / readings.len() as f64,
            min_temperature: readings.iter().map(|r| r.temperature)
                .min_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(),
            max_temperature: readings.iter().map(|r| r.temperature)
                .max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(),
            reading_count: readings.len(),
            window_start: readings[0].timestamp,
        }
    })
    .sink_heliosdb("sensor_aggregates_1min")
    .submit()?;

println!("Flink job submitted: {}", job.job_id);
Example 4: Event Sourcing Pattern
Implement event sourcing with automatic state management:
use heliosdb_streaming::event_sourcing::EventStore;
// Create event store backed by Kafka
let event_store = EventStore::new()
    .with_kafka_topic("events.orders")
    .with_snapshot_table("order_snapshots")
    .with_snapshot_interval(100)  // Snapshot every 100 events
    .build()?;

// Append events
event_store.append(OrderEvent::Created {
    order_id: "12345",
    customer_id: "customer-1",
    items: vec![...],
    total: 99.99,
})?;

event_store.append(OrderEvent::PaymentReceived {
    order_id: "12345",
    payment_id: "payment-abc",
    amount: 99.99,
})?;

event_store.append(OrderEvent::Shipped {
    order_id: "12345",
    tracking_number: "TRACK123",
})?;

// Reconstruct current state
let order = event_store.get_state("12345")?;
assert_eq!(order.status, OrderStatus::Shipped);

// Query event history
let events = event_store.get_events("12345")?;
for event in events {
    println!("{:?} at {}", event.event_type, event.timestamp);
}
Example 5: Multi-Source Data Integration
Combine data from multiple Kafka topics:
use heliosdb_streaming::integration::MultiSourcePipeline;
let pipeline = MultiSourcePipeline::new()
    // Source 1: User events
    .add_source("user-events", |event| {
        UserEvent {
            user_id: event["userId"].as_str()?,
            event_type: event["type"].as_str()?,
            product_id: event["productId"].as_str(),  // optional; used for the join below
            timestamp: event["timestamp"].as_i64()?,
        }
    })
    // Source 2: Product catalog updates
    .add_source("product-updates", |update| {
        ProductUpdate {
            product_id: update["productId"].as_str()?,
            name: update["name"].as_str()?,
            price: update["price"].as_f64()?,
        }
    })
    // Join and enrich
    .join_on(|user_event, products| {
        // Enrich user events with product data
        if let Some(product_id) = user_event.product_id {
            if let Some(product) = products.get(product_id) {
                return Some(EnrichedEvent {
                    user_id: user_event.user_id,
                    event_type: user_event.event_type,
                    product_name: product.name,
                    product_price: product.price,
                });
            }
        }
        None
    })
    .sink_to_table("enriched_user_events")
    .start()?;
API REFERENCE
StreamingConfig
pub struct StreamingConfig {
    pub kafka: Option<KafkaConfig>,
    pub flink: Option<FlinkConfig>,
    pub schema_registry: Option<SchemaRegistryConfig>,
}

impl StreamingConfig {
    pub fn new() -> StreamingConfigBuilder;
    pub async fn validate(&self) -> Result<()>;
    pub async fn test_connection(&self) -> Result<ConnectionStatus>;
}
CDCStream
pub struct CDCStream {
    // Configuration
}

impl CDCStream {
    pub fn new(table: &str) -> CDCStreamBuilder;
    pub async fn start(&self) -> Result<StreamHandle>;
    pub async fn pause(&self) -> Result<()>;
    pub async fn resume(&self) -> Result<()>;
    pub async fn stop(&self) -> Result<StreamStats>;
}

pub struct StreamStats {
    pub events_published: u64,
    pub bytes_published: u64,
    pub errors: u64,
    pub current_offset: i64,
    pub lag_ms: i64,
}
KafkaProducer
pub struct KafkaProducer {
    // Internal state
}

impl KafkaProducer {
    pub async fn send(&self, topic: &str, key: &[u8], value: &[u8]) -> Result<RecordMetadata>;
    pub async fn send_batch(&self, records: Vec<ProducerRecord>) -> Result<Vec<RecordMetadata>>;
    pub async fn flush(&self) -> Result<()>;
    pub fn metrics(&self) -> ProducerMetrics;
}
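A sketch of direct producer use against the send/flush signatures above; the topic, key, and payload are illustrative, and the partition/offset fields on RecordMetadata are assumed:

// Illustrative payload; in practice this would be a serialized row or event.
let payload = br#"{"order_id":"12345","status":"shipped"}"#;
let meta = producer.send("db.orders.changes", b"order-12345", payload).await?;
println!("partition {}, offset {}", meta.partition, meta.offset);  // assumed fields
producer.flush().await?;  // drain any batched records before shutdown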
KafkaConsumer

pub struct KafkaConsumer {
    // Internal state
}

impl KafkaConsumer {
    pub fn new() -> KafkaConsumerBuilder;
    pub async fn subscribe(&mut self, topics: Vec<String>) -> Result<()>;
    pub async fn poll(&mut self, timeout: Duration) -> Result<ConsumerRecords>;
    pub async fn commit(&mut self) -> Result<()>;
    pub async fn seek(&mut self, partition: i32, offset: i64) -> Result<()>;
}
TROUBLESHOOTING
Issue: Consumer Lag Increasing
Problem: Consumer group falling behind Kafka topic
Solutions:
- Increase parallelism
- Optimize processing logic
- Tune batch sizes
// Increase consumer parallelism
let consumer = KafkaConsumer::new()
    .with_parallelism(16)    // Process 16 partitions concurrently
    .with_batch_size(1000)   // Larger batches
    .build()?;
Issue: Duplicate Messages
Problem: Messages processed multiple times
Solutions:
- Enable exactly-once semantics
- Use idempotent processing
- Implement deduplication
// Enable exactly-once
let consumer = KafkaConsumer::new()
    .with_isolation_level(IsolationLevel::ReadCommitted)
    .with_exactly_once_semantics(true)
    .build()?;
Issue: Schema Evolution Failures
Problem: Breaking schema changes causing failures
Solutions:
- Use schema registry
- Enable compatibility checking
- Version your schemas
let schema_config = SchemaRegistryConfig {
    url: "http://schema-registry:8081",
    compatibility_mode: CompatibilityMode::BackwardTransitive,
    auto_register: false,  // Prevent accidental breaking changes
};
PERFORMANCE CHARACTERISTICS
Throughput
- Single table CDC: 50K-100K events/sec
- Multi-table CDC: 200K+ events/sec aggregate
- Kafka consumption: 100K+ events/sec per consumer
Latency
- End-to-end CDC latency: 10-50ms (p99)
- Kafka round-trip: 5-20ms (p99)
Resource Usage
- CDC per table: 100-200MB memory, 0.5 CPU cores
- Kafka consumer: 200-500MB memory, 1-2 CPU cores
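As a rough sizing example from these figures: a deployment streaming CDC from 10 tables plus two Kafka consumers would budget roughly 1.4-3GB of memory and 7-9 CPU cores for the streaming layer alone.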
BEST PRACTICES
1. Use Avro for Schema Evolution
// Good: Avro with schema registry
CDCStream::new("orders")
    .with_format(Format::Avro)
    .with_schema_registry("http://registry:8081")
    .start()?;

// Bad: JSON without schema validation
CDCStream::new("orders")
    .with_format(Format::Json)
    .start()?;  // Schema changes will break consumers
2. Implement Monitoring
// Monitor all streams
let monitor = StreamingMonitor::new()
    .track_throughput()
    .track_latency()
    .alert_on_lag(Duration::from_secs(300))  // Alert if lag exceeds 5 minutes
    .alert_on_errors(0.01)                   // Alert if error rate exceeds 1%
    .export_to_prometheus()
    .start()?;
3. Handle Backpressure
// Configure backpressure
let consumer = KafkaConsumer::new()
    .with_backpressure_strategy(BackpressureStrategy::DropOldest)
    .with_buffer_size(10000)
    .build()?;
NEXT STEPS
- Set up your first CDC stream
- Integrate with existing Kafka infrastructure
- Build real-time materialized views
- Implement monitoring and alerting
- Explore Flink integration for complex stream processing
RELATED DOCUMENTATION
- Architecture: /home/claude/HeliosDB/docs/architecture/STREAMING_ARCHITECTURE.md
- API Reference: Complete rustdoc in heliosdb-streaming/src/lib.rs
- Implementation Report: /home/claude/HeliosDB/docs/releases/v5.2-v5.4/F5.1.3_IMPLEMENTATION_REPORT.md
Version: 1.0 Last Updated: November 1, 2025 Author: HeliosDB Engineering Team