Skip to content

Real-Time Streaming Integration (Kafka/Flink) - User Guide

Real-Time Streaming Integration (Kafka/Flink) - User Guide

Feature ID: F5.1.3 Version: v5.1 Status: In Development (Target: 85%) ARR Value: $15M Patent Status: Defensive Publication Planned


OVERVIEW

HeliosDB’s Real-Time Streaming Integration bridges the gap between traditional database storage and modern event-driven architectures. Seamlessly integrate with Apache Kafka and Apache Flink to enable real-time data ingestion, processing, and analytics without complex ETL pipelines.

Key Features

  • Native Kafka producer/consumer integration
  • Apache Flink job integration for stream processing
  • Change Data Capture (CDC) streaming
  • Real-time materialized view updates
  • Exactly-once delivery semantics
  • Schema evolution support
  • Multi-format support (Avro, JSON, Protobuf)
  • Monitoring and alerting

Business Value

  • Real-time analytics on streaming data
  • Eliminate batch processing delays (hours to seconds)
  • Reduce infrastructure complexity by 50%
  • Enable event-driven applications
  • Support for IoT and sensor data at scale

Target Users

  • Platform engineers building real-time pipelines
  • Data engineers managing streaming workloads
  • Application developers building event-driven systems
  • DevOps teams managing data infrastructure

GETTING STARTED

Prerequisites

  • HeliosDB v5.1 or later
  • Kafka cluster (v2.8 or later) or Confluent Cloud
  • Optional: Apache Flink cluster (v1.17 or later)
  • Network connectivity to streaming infrastructure

Quick Start

1. Configure Kafka Connection

use heliosdb_streaming::{StreamingConfig, KafkaConfig};
let kafka_config = KafkaConfig {
bootstrap_servers: vec!["localhost:9092".to_string()],
security_protocol: SecurityProtocol::SaslSsl,
sasl_mechanism: SaslMechanism::Plain,
sasl_username: "your-username".to_string(),
sasl_password: "your-password".to_string(),
};
let streaming = StreamingConfig::new()
.with_kafka(kafka_config)
.build()?;

2. Stream Database Changes to Kafka

use heliosdb_streaming::cdc::CDCStream;
// Stream all changes from 'orders' table to Kafka
let cdc = CDCStream::new("orders")
.to_topic("db.orders.changes")
.with_format(Format::Avro)
.with_schema_registry("http://localhost:8081")
.start()?;
println!("CDC streaming started for 'orders' table");

3. Consume Kafka Events into Database

use heliosdb_streaming::consumer::KafkaConsumer;
// Consume events from Kafka topic into database table
let consumer = KafkaConsumer::new()
.subscribe("clickstream-events")
.into_table("clickstream")
.with_mapping(|event| {
json!({
"user_id": event["userId"],
"event_type": event["eventType"],
"timestamp": event["timestamp"],
"properties": event["properties"]
})
})
.start()?;
println!("Consuming clickstream events into database");

CONFIGURATION

Kafka Producer Configuration

use heliosdb_streaming::producer::ProducerConfig;
let producer = ProducerConfig {
// Connection
bootstrap_servers: vec!["kafka1:9092", "kafka2:9092", "kafka3:9092"],
// Performance
batch_size: 65536, // 64KB batches
linger_ms: 10, // Wait up to 10ms for batching
compression: Compression::Snappy,
// Reliability
acks: Acks::All, // Wait for all replicas
max_in_flight: 5, // Pipelining for performance
retries: 3,
// Monitoring
enable_metrics: true,
metrics_interval: Duration::from_secs(60),
};

Kafka Consumer Configuration

use heliosdb_streaming::consumer::ConsumerConfig;
let consumer = ConsumerConfig {
// Connection
bootstrap_servers: vec!["kafka1:9092", "kafka2:9092"],
group_id: "heliosdb-consumer-group".to_string(),
// Consumption
auto_offset_reset: AutoOffsetReset::Earliest,
enable_auto_commit: false, // Manual commit for exactly-once
max_poll_records: 500,
// Performance
fetch_min_bytes: 1024,
fetch_max_wait_ms: 500,
// Reliability
session_timeout_ms: 30000,
heartbeat_interval_ms: 3000,
};
use heliosdb_streaming::flink::FlinkConfig;
let flink = FlinkConfig {
// Job Manager
jobmanager_address: "flink-jobmanager:8081".to_string(),
// Checkpointing
checkpoint_interval: Duration::from_secs(60),
checkpoint_mode: CheckpointMode::ExactlyOnce,
checkpoint_storage: "s3://my-bucket/checkpoints".to_string(),
// Parallelism
default_parallelism: 4,
max_parallelism: 128,
// State Backend
state_backend: StateBackend::RocksDB,
state_ttl: Some(Duration::from_days(7)),
};

EXAMPLES

Example 1: Real-Time CDC to Data Warehouse

Stream database changes to a data warehouse for real-time analytics:

use heliosdb_streaming::cdc::CDCPipeline;
// Configure CDC pipeline
let pipeline = CDCPipeline::new()
.source_tables(vec!["orders", "customers", "products"])
.to_kafka_topics(|table| format!("warehouse.{}.cdc", table))
.with_format(Format::Avro)
.with_schema_evolution(SchemaEvolution::Backward)
.with_transformations(vec![
Transformation::MaskPII,
Transformation::AddMetadata,
])
.start()?;
// Monitor pipeline
pipeline.monitor(|stats| {
println!("Events/sec: {}", stats.throughput);
println!("Lag: {}ms", stats.latency_p99);
});

Example 2: Clickstream Analytics

Ingest clickstream events and run real-time analytics:

use heliosdb_streaming::consumer::StreamingSQL;
// Create streaming table
db.execute(r#"
CREATE STREAMING TABLE clickstream (
user_id BIGINT,
session_id VARCHAR(64),
event_type VARCHAR(32),
page_url TEXT,
timestamp TIMESTAMP,
properties JSONB
) FROM KAFKA TOPIC 'clickstream-raw'
WITH (
'format' = 'json',
'consumer.group' = 'analytics-group'
)
"#)?;
// Create real-time materialized view
db.execute(r#"
CREATE MATERIALIZED VIEW user_sessions_realtime AS
SELECT
user_id,
session_id,
COUNT(*) as event_count,
MIN(timestamp) as session_start,
MAX(timestamp) as session_end,
ARRAY_AGG(page_url ORDER BY timestamp) as page_path
FROM clickstream
WHERE timestamp > NOW() - INTERVAL '1 hour'
GROUP BY user_id, session_id
WITH (refresh_mode = 'streaming')
"#)?;
// Query always shows fresh data
let active_sessions = db.query(
"SELECT * FROM user_sessions_realtime WHERE session_end > NOW() - INTERVAL '5 minutes'"
)?;

Example 3: IoT Sensor Data Pipeline

Process high-volume sensor data with Flink integration:

use heliosdb_streaming::flink::FlinkJob;
// Define Flink processing job
let job = FlinkJob::new("sensor-processing")
.source_kafka("iot.sensors.raw")
.map(|sensor_reading| {
// Parse and validate sensor data
SensorReading {
device_id: sensor_reading.device_id,
temperature: sensor_reading.temperature,
humidity: sensor_reading.humidity,
timestamp: sensor_reading.timestamp,
}
})
.filter(|reading| {
// Filter out anomalous readings
reading.temperature >= -50.0 &&
reading.temperature <= 100.0
})
.window(TumblingWindow::of_seconds(60))
.aggregate(|readings| {
// 1-minute aggregations
Aggregate {
device_id: readings[0].device_id,
avg_temperature: readings.iter().map(|r| r.temperature).sum::<f64>() / readings.len() as f64,
min_temperature: readings.iter().map(|r| r.temperature).min_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(),
max_temperature: readings.iter().map(|r| r.temperature).max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(),
reading_count: readings.len(),
window_start: readings[0].timestamp,
}
})
.sink_heliosdb("sensor_aggregates_1min")
.submit()?;
println!("Flink job submitted: {}", job.job_id);

Example 4: Event Sourcing Pattern

Implement event sourcing with automatic state management:

use heliosdb_streaming::event_sourcing::EventStore;
// Create event store backed by Kafka
let event_store = EventStore::new()
.with_kafka_topic("events.orders")
.with_snapshot_table("order_snapshots")
.with_snapshot_interval(100) // Snapshot every 100 events
.build()?;
// Append events
event_store.append(OrderEvent::Created {
order_id: "12345",
customer_id: "customer-1",
items: vec![...],
total: 99.99,
})?;
event_store.append(OrderEvent::PaymentReceived {
order_id: "12345",
payment_id: "payment-abc",
amount: 99.99,
})?;
event_store.append(OrderEvent::Shipped {
order_id: "12345",
tracking_number: "TRACK123",
})?;
// Reconstruct current state
let order = event_store.get_state("12345")?;
assert_eq!(order.status, OrderStatus::Shipped);
// Query event history
let events = event_store.get_events("12345")?;
for event in events {
println!("{:?} at {}", event.event_type, event.timestamp);
}

Example 5: Multi-Source Data Integration

Combine data from multiple Kafka topics:

use heliosdb_streaming::integration::MultiSourcePipeline;
let pipeline = MultiSourcePipeline::new()
// Source 1: User events
.add_source("user-events", |event| {
UserEvent {
user_id: event["userId"].as_str()?,
event_type: event["type"].as_str()?,
timestamp: event["timestamp"].as_i64()?,
}
})
// Source 2: Product catalog updates
.add_source("product-updates", |update| {
ProductUpdate {
product_id: update["productId"].as_str()?,
name: update["name"].as_str()?,
price: update["price"].as_f64()?,
}
})
// Join and enrich
.join_on(|user_event, products| {
// Enrich user events with product data
if let Some(product_id) = user_event.product_id {
if let Some(product) = products.get(product_id) {
return Some(EnrichedEvent {
user_id: user_event.user_id,
event_type: user_event.event_type,
product_name: product.name,
product_price: product.price,
});
}
}
None
})
.sink_to_table("enriched_user_events")
.start()?;

API REFERENCE

StreamingConfig

pub struct StreamingConfig {
pub kafka: Option<KafkaConfig>,
pub flink: Option<FlinkConfig>,
pub schema_registry: Option<SchemaRegistryConfig>,
}
impl StreamingConfig {
pub fn new() -> StreamingConfigBuilder;
pub async fn validate(&self) -> Result<()>;
pub async fn test_connection(&self) -> Result<ConnectionStatus>;
}

CDCStream

pub struct CDCStream {
// Configuration
}
impl CDCStream {
pub fn new(table: &str) -> CDCStreamBuilder;
pub async fn start(&self) -> Result<StreamHandle>;
pub async fn pause(&self) -> Result<()>;
pub async fn resume(&self) -> Result<()>;
pub async fn stop(&self) -> Result<StreamStats>;
}
pub struct StreamStats {
pub events_published: u64,
pub bytes_published: u64,
pub errors: u64,
pub current_offset: i64,
pub lag_ms: i64,
}

KafkaProducer

pub struct KafkaProducer {
// Internal state
}
impl KafkaProducer {
pub async fn send(&self, topic: &str, key: &[u8], value: &[u8]) -> Result<RecordMetadata>;
pub async fn send_batch(&self, records: Vec<ProducerRecord>) -> Result<Vec<RecordMetadata>>;
pub async fn flush(&self) -> Result<()>;
pub fn metrics(&self) -> ProducerMetrics;
}

KafkaConsumer

pub struct KafkaConsumer {
// Internal state
}
impl KafkaConsumer {
pub fn new() -> KafkaConsumerBuilder;
pub async fn subscribe(&mut self, topics: Vec<String>) -> Result<()>;
pub async fn poll(&mut self, timeout: Duration) -> Result<ConsumerRecords>;
pub async fn commit(&mut self) -> Result<()>;
pub async fn seek(&mut self, partition: i32, offset: i64) -> Result<()>;
}

TROUBLESHOOTING

Issue: Consumer Lag Increasing

Problem: Consumer group falling behind Kafka topic

Solutions:

  1. Increase parallelism
  2. Optimize processing logic
  3. Tune batch sizes
// Increase consumer parallelism
let consumer = KafkaConsumer::new()
.with_parallelism(16) // Process 16 partitions concurrently
.with_batch_size(1000) // Larger batches
.build()?;

Issue: Duplicate Messages

Problem: Messages processed multiple times

Solutions:

  1. Enable exactly-once semantics
  2. Use idempotent processing
  3. Implement deduplication
// Enable exactly-once
let consumer = KafkaConsumer::new()
.with_isolation_level(IsolationLevel::ReadCommitted)
.with_exactly_once_semantics(true)
.build()?;

Issue: Schema Evolution Failures

Problem: Breaking schema changes causing failures

Solutions:

  1. Use schema registry
  2. Enable compatibility checking
  3. Version your schemas
let schema_config = SchemaRegistryConfig {
url: "http://schema-registry:8081",
compatibility_mode: CompatibilityMode::BackwardTransitive,
auto_register: false, // Prevent accidental breaking changes
};

PERFORMANCE CHARACTERISTICS

Throughput

  • Single table CDC: 50K-100K events/sec
  • Multi-table CDC: 200K+ events/sec aggregate
  • Kafka consumption: 100K+ events/sec per consumer

Latency

  • End-to-end CDC latency: 10-50ms (p99)
  • Kafka round-trip: 5-20ms (p99)

Resource Usage

  • CDC per table: 100-200MB memory, 0.5 CPU cores
  • Kafka consumer: 200-500MB memory, 1-2 CPU cores

BEST PRACTICES

1. Use Avro for Schema Evolution

// Good: Avro with schema registry
CDCStream::new("orders")
.with_format(Format::Avro)
.with_schema_registry("http://registry:8081")
.start()?;
// Bad: JSON without schema validation
CDCStream::new("orders")
.with_format(Format::Json)
.start()?; // Schema changes will break consumers

2. Implement Monitoring

// Monitor all streams
let monitor = StreamingMonitor::new()
.track_throughput()
.track_latency()
.alert_on_lag(Duration::from_secs(300)) // Alert if 5 min lag
.alert_on_errors(error_rate: 0.01) // Alert if > 1% error rate
.export_to_prometheus()
.start()?;

3. Handle Backpressure

// Configure backpressure
let consumer = KafkaConsumer::new()
.with_backpressure_strategy(BackpressureStrategy::DropOldest)
.with_buffer_size(10000)
.build()?;

NEXT STEPS

  1. Set up your first CDC stream
  2. Integrate with existing Kafka infrastructure
  3. Build real-time materialized views
  4. Implement monitoring and alerting
  5. Explore Flink integration for complex stream processing

  • Architecture: /home/claude/HeliosDB/docs/architecture/STREAMING_ARCHITECTURE.md
  • API Reference: Complete rustdoc in heliosdb-streaming/src/lib.rs
  • Implementation Report: /home/claude/HeliosDB/docs/releases/v5.2-v5.4/F5.1.3_IMPLEMENTATION_REPORT.md

Version: 1.0 Last Updated: November 1, 2025 Author: HeliosDB Engineering Team