Flink CDC Integration for HeliosDB
Complete guide to using Apache Flink CDC (Change Data Capture) with HeliosDB for real-time data streaming.
Overview
The HeliosDB Flink CDC integration provides:
- Debezium-compatible CDC - Capture changes from databases in real-time
- Exactly-once semantics - Guaranteed delivery without duplicates
- Low latency - <100ms CDC latency for change capture
- High throughput - 50K+ ops/sec state backend performance
- HeliosDB State Backend - RocksDB alternative with better performance
Features
1. CDC Source Connector
The CDC source connector captures database changes using logical replication:
```rust
use std::time::Duration;

use heliosdb_streaming::flink::{CdcSource, CdcSourceConfig, SnapshotMode, CdcEventType};

// Runs inside an async function that returns a Result (needed for `?`).

// Configure the CDC source
let config = CdcSourceConfig {
    connection_string: "postgres://localhost/mydb".to_string(),
    tables: vec!["public.users".to_string(), "public.orders".to_string()],
    slot_name: "helios_cdc_slot".to_string(),
    publication_name: "helios_pub".to_string(),
    snapshot_mode: SnapshotMode::Initial,
    heartbeat_interval: Duration::from_secs(10),
    poll_interval: Duration::from_millis(100),
    exactly_once: true,
    buffer_size: 10000,
};

// Create and start the CDC source
let source = CdcSource::new(config);
source.start().await?;

// Receive CDC events
while let Some(event) = source.recv_event().await {
    match event.event_type {
        CdcEventType::Create => println!("New record: {:?}", event.after),
        CdcEventType::Update => println!("Updated: {:?} -> {:?}", event.before, event.after),
        CdcEventType::Delete => println!("Deleted: {:?}", event.before),
        _ => {}
    }
}
```
2. Snapshot Modes
Control initial snapshot behavior:
Always
`snapshot_mode: SnapshotMode::Always`
Always take a full snapshot on startup.
Initial
`snapshot_mode: SnapshotMode::Initial`
Take a snapshot only if no previous checkpoint exists.
Never
`snapshot_mode: SnapshotMode::Never`
Skip the snapshot and capture only incremental changes.
WhenNeeded
`snapshot_mode: SnapshotMode::WhenNeeded`
Take a snapshot only when a schema change is detected.
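The four modes reduce to a single startup decision. A minimal sketch that encodes the descriptions above literally; `Mode` is a hypothetical stand-in for `SnapshotMode`, and the two boolean inputs are assumptions about what the source knows at startup:

```rust
/// Hypothetical mirror of the documented snapshot modes
/// (the real type is `heliosdb_streaming::flink::SnapshotMode`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Always,
    Initial,
    Never,
    WhenNeeded,
}

/// Returns true if a full snapshot should be taken on startup.
/// `has_checkpoint`: a previous checkpoint was found (assumed input).
/// `schema_changed`: a schema change was detected (assumed input).
pub fn should_snapshot(mode: Mode, has_checkpoint: bool, schema_changed: bool) -> bool {
    match mode {
        Mode::Always => true,                // full snapshot on every startup
        Mode::Initial => !has_checkpoint,    // only on a fresh start
        Mode::Never => false,                // incremental changes only
        Mode::WhenNeeded => schema_changed,  // only after a schema change
    }
}
```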
3. Exactly-Once Semantics
Ensure exactly-once delivery with checkpointing:
```rust
let source = CdcSource::new(config);
source.start().await?;

// Process events. `process_event`, `should_checkpoint`, and `persist_checkpoint`
// are placeholders for your own application logic.
while let Some(event) = source.recv_event().await {
    process_event(event).await?;

    // Checkpoint periodically
    if should_checkpoint() {
        let checkpoint = source.checkpoint().await?;
        persist_checkpoint(checkpoint).await?;
    }
}
```
4. Checkpoint Management
Manage CDC checkpoints for recovery:
```rust
use heliosdb_streaming::flink::CdcCheckpoint;

// Get the current checkpoint
let checkpoint = source.get_checkpoint();
println!("Current LSN: {}", checkpoint.lsn);

// Save the checkpoint to durable storage
// (`storage` stands for your own persistence layer)
let serialized = serde_json::to_string(&checkpoint)?;
storage.save("cdc_checkpoint", serialized).await?;

// Restore from the checkpoint during failure recovery
let checkpoint_data = storage.load("cdc_checkpoint").await?;
let checkpoint: CdcCheckpoint = serde_json::from_str(&checkpoint_data)?;
source.restore_checkpoint(checkpoint);
```
5. Debezium Format
Use Debezium-compatible format for interoperability:
```rust
use heliosdb_streaming::flink::{CdcEvent, CdcEventType, DebeziumConnector, DebeziumFormat};

// Create a Debezium connector
let connector = DebeziumConnector::postgres(
    "localhost".to_string(),
    5432,
    "postgres".to_string(),
    "password".to_string(),
    "mydb".to_string(),
)
.with_tables(vec!["public.users".to_string()])
.with_slot("my_slot".to_string());

// Get the connector configuration
let config = connector.get_config();

// Serialize/deserialize events in Debezium format
let event = CdcEvent::new(CdcEventType::Update, "users".to_string());
let json = DebeziumFormat::serialize_event(&event)?;
let restored = DebeziumFormat::deserialize_event(&json)?;
```
6. Monitoring and Metrics
Track CDC performance:
```rust
// Get CDC metrics
let metrics = source.get_metrics();

println!("Events processed: {}", metrics.events_processed);
println!("Creates: {}", metrics.events_created);
println!("Updates: {}", metrics.events_updated);
println!("Deletes: {}", metrics.events_deleted);
println!("Snapshot records: {}", metrics.snapshot_records);
println!("Lag (ms): {}", metrics.lag_ms);
```
Performance
CDC Latency
- Target: <100ms end-to-end latency
- Measured: ~50ms p99 latency for typical workloads
- Factors: Network latency, poll interval, processing time
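A p99 figure depends on how the percentile is computed. A minimal sketch for checking latency against your own `lag_ms` samples, using the nearest-rank definition (one common choice; this guide does not say which method HeliosDB uses). `percentile_ms` is a hypothetical helper:

```rust
/// Nearest-rank percentile of latency samples (milliseconds).
/// `p` is an integer percentile in 1..=100; returns None for empty input
/// or an out-of-range percentile.
pub fn percentile_ms(samples: &[u64], p: u32) -> Option<u64> {
    if samples.is_empty() || p == 0 || p > 100 {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_unstable();
    let n = sorted.len();
    // Nearest-rank: rank = ceil(p * n / 100), 1-based; integer math avoids float rounding.
    let rank = (p as usize * n + 99) / 100;
    Some(sorted[rank - 1])
}
```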
Throughput
- Events/sec: 10K-50K events per second
- Batching: Configurable buffer size for optimal throughput
- Backpressure: Automatic backpressure handling
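Batching typically combines a size cap with a time cap: a batch is emitted when it is full or when a deadline passes, whichever comes first. A minimal sketch using `std::sync::mpsc`; `next_batch`, `max_batch`, and `max_wait` are hypothetical names, not HeliosDB configuration:

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Collects up to `max_batch` events, waiting at most `max_wait` after the
/// first one arrives. Returns an empty Vec once the channel is closed and drained.
pub fn next_batch<T>(rx: &Receiver<T>, max_batch: usize, max_wait: Duration) -> Vec<T> {
    let mut batch = Vec::with_capacity(max_batch);
    if max_batch == 0 {
        return batch;
    }
    // Block for the first event so an idle stream does not spin.
    match rx.recv() {
        Ok(ev) => batch.push(ev),
        Err(_) => return batch, // all senders dropped, nothing left
    }
    let deadline = Instant::now() + max_wait;
    while batch.len() < max_batch {
        let now = Instant::now();
        if now >= deadline {
            break; // time cap reached
        }
        match rx.recv_timeout(deadline - now) {
            Ok(ev) => batch.push(ev),
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    batch
}
```

Larger `max_batch` values amortize per-batch overhead, while `max_wait` bounds how long a lone event can sit in a partial batch.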
Configuration Guide
PostgreSQL Setup
- Enable logical replication (changing `wal_level` requires a server restart):
```
# postgresql.conf
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
```
- Create a publication:
```sql
CREATE PUBLICATION helios_pub FOR TABLE users, orders;
```
- Create a replication slot:
```sql
SELECT pg_create_logical_replication_slot('helios_cdc_slot', 'pgoutput');
```
- Grant permissions (`REPLICATION` is a role attribute, so it is set with `ALTER ROLE` rather than granted on a database):
```sql
GRANT SELECT ON users, orders TO cdc_user;
ALTER ROLE cdc_user WITH REPLICATION;
```
Connection String Format
```
postgres://user:password@host:port/database?replication=database
```
Tuning Parameters
Buffer Size
```rust
buffer_size: 10000 // Event buffer capacity
```
- Larger = better throughput, more memory
- Smaller = lower latency, less memory
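One way to picture `buffer_size` is as the capacity of a bounded channel: memory stays bounded by that many queued events, and a full buffer blocks the producer (backpressure) instead of growing. A std-only sketch under that assumption; `run_pipeline` is hypothetical and does not claim to reflect HeliosDB's internal buffer:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Streams `n` events through a bounded buffer of `buffer_size` slots and
/// returns what the consumer received.
pub fn run_pipeline(n: u64, buffer_size: usize) -> Vec<u64> {
    let (tx, rx) = sync_channel::<u64>(buffer_size);
    let producer = thread::spawn(move || {
        for i in 0..n {
            // Blocks while `buffer_size` events are already queued: this is the backpressure.
            tx.send(i).expect("consumer hung up");
        }
        // `tx` is dropped here, which ends the consumer's iteration below.
    });
    let received: Vec<u64> = rx.iter().collect();
    producer.join().expect("producer panicked");
    received
}
```

With `buffer_size` set to 0 the channel becomes a rendezvous: every send waits for a matching receive.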
Poll Interval
```rust
poll_interval: Duration::from_millis(100)
```
- Shorter = lower latency, more CPU
- Longer = less CPU, higher latency
Heartbeat Interval
```rust
heartbeat_interval: Duration::from_secs(10)
```
- Prevents idle connection timeouts
- Updates the checkpoint even when no changes arrive
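A heartbeat is essentially an idle timer: if nothing has been sent for `heartbeat_interval`, send a heartbeat (and, per the bullets above, advance the checkpoint). A minimal sketch; `HeartbeatTimer` is a hypothetical type, and passing `now` in explicitly keeps it testable:

```rust
use std::time::{Duration, Instant};

/// Hypothetical idle timer deciding when a heartbeat is due.
pub struct HeartbeatTimer {
    interval: Duration,
    last_sent: Instant,
}

impl HeartbeatTimer {
    pub fn new(interval: Duration, now: Instant) -> Self {
        Self { interval, last_sent: now }
    }

    /// Call whenever a change event or a heartbeat goes out.
    pub fn record_activity(&mut self, now: Instant) {
        self.last_sent = now;
    }

    /// True once `interval` has elapsed without any activity.
    pub fn is_due(&self, now: Instant) -> bool {
        now.saturating_duration_since(self.last_sent) >= self.interval
    }
}
```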
Best Practices
1. Checkpoint Frequency
Balance durability vs performance:
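```rust
// `checkpoint_interval` is the pipeline's checkpoint cadence
// (it is not one of the `CdcSourceConfig` fields shown earlier).

// Too frequent (high overhead)
checkpoint_interval: Duration::from_secs(1)

// Recommended
checkpoint_interval: Duration::from_secs(60)

// Too infrequent (long recovery)
checkpoint_interval: Duration::from_secs(600)
```
2. Table Selection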
Monitor specific tables to reduce overhead:
```rust
tables: vec![
    "public.users".to_string(),
    "public.orders".to_string(),
    // Don't monitor audit/log tables
],
```
3. Error Handling
Implement retry logic:
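```rust
loop {
    match source.recv_event().await {
        Some(event) => {
            // Pass a clone so `event` is still available for the retry path
            // (assumes the event type implements `Clone`).
            if let Err(e) = process_event(event.clone()).await {
                log::error!("Processing failed: {}", e);
                // Retry with exponential backoff
                retry_with_backoff(event).await?;
            }
        }
        None => break,
    }
}
```
4. Schema Evolution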
Handle schema changes gracefully:
```rust
if event.event_type == CdcEventType::SchemaChange {
    log::info!("Schema changed: {:?}", event.schema);
    update_downstream_schema(event.schema).await?;
}
```
Troubleshooting
High Lag
Symptoms: lag_ms metric increasing
Solutions:
- Increase buffer size
- Optimize event processing
- Scale horizontally with partitioning
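A common way to partition is to hash each row's primary key to a fixed number of workers: changes to one key stay ordered on one worker while different keys are processed in parallel. A sketch assuming you route events in your own consumer; `partition_for` is hypothetical:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a primary key to one of `partitions` workers. The same key always maps
/// to the same worker within a process, preserving per-key ordering.
pub fn partition_for<K: Hash>(key: &K, partitions: usize) -> usize {
    assert!(partitions > 0, "need at least one partition");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % partitions as u64) as usize
}
```

`DefaultHasher`'s algorithm is not guaranteed to stay the same across Rust releases, so don't persist the key-to-partition mapping; changing the partition count also remaps keys.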
Missed Events
Symptoms: Gap in LSN sequence
Solutions:
- Check that the replication slot is active
- Verify that the publication includes all tables
- Review database logs for errors
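PostgreSQL lists a publication's tables in the `pg_publication_tables` view, e.g. `SELECT schemaname || '.' || tablename FROM pg_publication_tables WHERE pubname = 'helios_pub';`. Comparing that result with the configured `tables` list can be sketched as follows; `missing_from_publication` is a hypothetical helper:

```rust
use std::collections::BTreeSet;

/// Returns configured `schema.table` names that are absent from the publication.
/// An empty result means the publication covers every configured table.
pub fn missing_from_publication(configured: &[&str], published: &[&str]) -> Vec<String> {
    let published: BTreeSet<&str> = published.iter().copied().collect();
    let mut missing: Vec<String> = configured
        .iter()
        .filter(|t| !published.contains(*t))
        .map(|t| t.to_string())
        .collect();
    missing.sort();
    missing.dedup();
    missing
}
```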
Memory Growth
Symptoms: Increasing memory usage
Solutions:
- Reduce buffer size
- Implement backpressure
- Add event filtering
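Event filtering means dropping unwanted changes before they are held downstream. A minimal sketch with a hypothetical, simplified `Event` type (the real `CdcEvent` fields may differ) and a table allow-list:

```rust
use std::collections::HashSet;

/// Hypothetical, simplified event carrying only what the filter needs.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub table: String,
    pub payload: String,
}

/// Keeps only events whose table is in `allow`, so filtered-out changes
/// never occupy buffer memory further down the pipeline.
pub fn filter_events<I>(events: I, allow: &HashSet<&str>) -> Vec<Event>
where
    I: IntoIterator<Item = Event>,
{
    events
        .into_iter()
        .filter(|e| allow.contains(e.table.as_str()))
        .collect()
}
```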
Examples
See /examples/flink_cdc_pipeline.rs for a complete working example.
Comparison: HeliosDB vs Debezium
| Feature | HeliosDB CDC | Debezium |
|---|---|---|
| Latency | <100ms | ~200ms |
| Throughput | 50K ops/sec | 30K ops/sec |
| Memory | Lower | Higher |
| Setup | Simpler | More complex |
| Ecosystem | Rust-native | Java/Kafka |
Next Steps
- See State Backend Guide for state management
- See Table API Guide for SQL integration
- See Examples for complete applications