Flink CDC Integration for HeliosDB

Complete guide to using Apache Flink CDC (Change Data Capture) with HeliosDB for real-time data streaming.

Overview

The HeliosDB Flink CDC integration provides:

  • Debezium-compatible CDC - Capture changes from databases in real-time
  • Exactly-once semantics - Guaranteed delivery without duplicates
  • Low latency - <100ms CDC latency for change capture
  • High throughput - 50K+ ops/sec state backend performance
  • HeliosDB State Backend - RocksDB alternative with better performance

Features

1. CDC Source Connector

The CDC source connector captures database changes using logical replication:

use std::time::Duration;
use heliosdb_streaming::flink::{
    CdcSource, CdcSourceConfig, SnapshotMode, CdcEventType,
};
// Configure CDC source
let config = CdcSourceConfig {
    connection_string: "postgres://localhost/mydb".to_string(),
    tables: vec!["public.users".to_string(), "public.orders".to_string()],
    slot_name: "helios_cdc_slot".to_string(),
    publication_name: "helios_pub".to_string(),
    snapshot_mode: SnapshotMode::Initial,
    heartbeat_interval: Duration::from_secs(10),
    poll_interval: Duration::from_millis(100),
    exactly_once: true,
    buffer_size: 10000,
};
// Create and start CDC source
let source = CdcSource::new(config);
source.start().await?;
// Receive CDC events
while let Some(event) = source.recv_event().await {
    match event.event_type {
        CdcEventType::Create => println!("New record: {:?}", event.after),
        CdcEventType::Update => println!("Updated: {:?} -> {:?}", event.before, event.after),
        CdcEventType::Delete => println!("Deleted: {:?}", event.before),
        // Other event types (for example schema changes) are ignored here
        _ => {}
    }
}

2. Snapshot Modes

Control initial snapshot behavior:

Always

snapshot_mode: SnapshotMode::Always

Always take a full snapshot on startup.

Initial

snapshot_mode: SnapshotMode::Initial

Take a snapshot only if no previous checkpoint exists.

Never

snapshot_mode: SnapshotMode::Never

Skip the snapshot and capture only incremental changes.

WhenNeeded

snapshot_mode: SnapshotMode::WhenNeeded

Take a snapshot only when schema changes are detected.
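The four modes above amount to a small decision table. The sketch below restates it as a pure function so the behavior can be checked in isolation. It assumes a local stand-in enum rather than the crate's own `SnapshotMode`, and the `has_checkpoint` and `schema_changed` inputs are hypothetical names for state the connector would track internally.

```rust
/// Local stand-in for the snapshot modes described above
/// (an assumption for illustration, not the crate's own type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotMode {
    Always,
    Initial,
    Never,
    WhenNeeded,
}

/// Returns true if a snapshot should be taken at startup.
///
/// `has_checkpoint`: a previous checkpoint was found.
/// `schema_changed`: a schema change was detected.
pub fn should_snapshot(mode: SnapshotMode, has_checkpoint: bool, schema_changed: bool) -> bool {
    match mode {
        // Always: snapshot on every startup.
        SnapshotMode::Always => true,
        // Initial: snapshot only when starting without a checkpoint.
        SnapshotMode::Initial => !has_checkpoint,
        // Never: incremental changes only.
        SnapshotMode::Never => false,
        // WhenNeeded: snapshot only on a detected schema change.
        SnapshotMode::WhenNeeded => schema_changed,
    }
}
```

For example, `should_snapshot(SnapshotMode::Initial, true, false)` is false: a checkpoint exists, so streaming resumes from it.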

3. Exactly-Once Semantics

Ensure exactly-once delivery with checkpointing:

let source = CdcSource::new(config);
source.start().await?;
// Process events
while let Some(event) = source.recv_event().await {
    process_event(event).await?;
    // Checkpoint periodically
    if should_checkpoint() {
        let checkpoint = source.checkpoint().await?;
        persist_checkpoint(checkpoint).await?;
    }
}
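In a loop like the one above, events processed after the last persisted checkpoint would be delivered again after a restart. One common way to keep the end result exactly-once is to make the consumer skip anything it has already applied. The following is a minimal sketch of that idea, assuming each event carries a monotonically increasing `lsn`. The `Event` and `IdempotentSink` types are hypothetical and not part of the HeliosDB API.

```rust
/// Hypothetical event shape: only the fields this sketch needs.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub lsn: u64,
    pub payload: String,
}

/// Applies each event at most once by remembering the highest LSN applied so far.
#[derive(Debug, Default)]
pub struct IdempotentSink {
    last_applied_lsn: Option<u64>,
    pub applied: Vec<String>,
}

impl IdempotentSink {
    /// Returns true if the event was applied, false if it was a replayed duplicate.
    pub fn apply(&mut self, event: &Event) -> bool {
        if let Some(last) = self.last_applied_lsn {
            if event.lsn <= last {
                // Already seen: this is a replay from before the crash.
                return false;
            }
        }
        self.applied.push(event.payload.clone());
        self.last_applied_lsn = Some(event.lsn);
        true
    }
}
```

In a real sink the last applied LSN would need to be stored in the same transaction as the data it describes. Otherwise a crash between the two writes brings the duplicate problem back.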

4. Checkpoint Management

Manage CDC checkpoints for recovery:

use heliosdb_streaming::flink::CdcCheckpoint;
// Get current checkpoint
let checkpoint = source.get_checkpoint();
println!("Current LSN: {}", checkpoint.lsn);
// Save checkpoint to durable storage
let serialized = serde_json::to_string(&checkpoint)?;
storage.save("cdc_checkpoint", serialized).await?;
// Restore from checkpoint on failure recovery
let checkpoint_data = storage.load("cdc_checkpoint").await?;
let checkpoint: CdcCheckpoint = serde_json::from_str(&checkpoint_data)?;
source.restore_checkpoint(checkpoint);
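The `storage` object above is left abstract. If checkpoints go to a local file, a crash in the middle of a write can leave a truncated file behind. A minimal sketch of one way to avoid that, using only the standard library, is to write to a temporary file and rename it over the target. The function names are hypothetical, and the atomicity of the rename is assumed to hold on the target filesystem (it does for POSIX filesystems when both paths are in the same directory).

```rust
use std::fs;
use std::io::{self, Write};
use std::path::Path;

/// Writes `contents` to a sibling temp file, then renames it over `path`,
/// so readers see either the old checkpoint or the new one, never a partial file.
pub fn save_checkpoint_atomic(path: &Path, contents: &str) -> io::Result<()> {
    let tmp = path.with_extension("tmp");
    {
        let mut file = fs::File::create(&tmp)?;
        file.write_all(contents.as_bytes())?;
        // Flush to disk before the rename makes the new file visible.
        file.sync_all()?;
    }
    fs::rename(&tmp, path)
}

/// Loads the checkpoint, returning Ok(None) when no checkpoint file exists yet.
pub fn load_checkpoint(path: &Path) -> io::Result<Option<String>> {
    match fs::read_to_string(path) {
        Ok(s) => Ok(Some(s)),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e),
    }
}
```

A `None` result from `load_checkpoint` corresponds to the first-start case, where `SnapshotMode::Initial` would take a snapshot.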

5. Debezium Format

Use Debezium-compatible format for interoperability:

// CdcEvent is assumed to be exported from the same module as CdcEventType
use heliosdb_streaming::flink::{
    CdcEvent, CdcEventType, DebeziumConnector, DebeziumFormat,
};
// Create Debezium connector
let connector = DebeziumConnector::postgres(
    "localhost".to_string(),
    5432,
    "postgres".to_string(),
    "password".to_string(),
    "mydb".to_string(),
)
.with_tables(vec!["public.users".to_string()])
.with_slot("my_slot".to_string());
// Get connector configuration
let config = connector.get_config();
// Serialize/deserialize events in Debezium format
let event = CdcEvent::new(CdcEventType::Update, "users".to_string());
let json = DebeziumFormat::serialize_event(&event)?;
let restored = DebeziumFormat::deserialize_event(&json)?;
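For readers unfamiliar with the Debezium envelope: each change event carries a one-letter `op` field, with `c` for create, `u` for update, `d` for delete, and `r` for a row read during a snapshot. The sketch below shows that mapping in both directions. `EventKind` is a hypothetical local enum, not the crate's `CdcEventType`, and how `DebeziumFormat` encodes events internally is not confirmed by this guide.

```rust
/// Hypothetical local mirror of the event kinds used in this guide,
/// plus a kind for rows emitted while taking a snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventKind {
    Create,
    Update,
    Delete,
    SnapshotRead,
}

/// Maps an event kind to the one-letter `op` code used in Debezium envelopes.
pub fn debezium_op(kind: EventKind) -> &'static str {
    match kind {
        EventKind::Create => "c",
        EventKind::Update => "u",
        EventKind::Delete => "d",
        EventKind::SnapshotRead => "r",
    }
}

/// Parses a Debezium `op` code; returns None for codes this sketch does not model.
pub fn parse_debezium_op(op: &str) -> Option<EventKind> {
    match op {
        "c" => Some(EventKind::Create),
        "u" => Some(EventKind::Update),
        "d" => Some(EventKind::Delete),
        "r" => Some(EventKind::SnapshotRead),
        _ => None,
    }
}
```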

6. Monitoring and Metrics

Track CDC performance:

// Get CDC metrics
let metrics = source.get_metrics();
println!("Events processed: {}", metrics.events_processed);
println!("Creates: {}", metrics.events_created);
println!("Updates: {}", metrics.events_updated);
println!("Deletes: {}", metrics.events_deleted);
println!("Snapshot records: {}", metrics.snapshot_records);
println!("Lag (ms): {}", metrics.lag_ms);

Performance

CDC Latency

  • Target: <100ms end-to-end latency
  • Measured: ~50ms p99 latency for typical workloads
  • Factors: Network latency, poll interval, processing time

Throughput

  • Events/sec: 10K-50K events per second
  • Batching: Configurable buffer size for optimal throughput
  • Backpressure: Automatic backpressure handling

Configuration Guide

PostgreSQL Setup

  1. Enable logical replication:
# postgresql.conf (changing wal_level requires a server restart)
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
  2. Create publication:
CREATE PUBLICATION helios_pub FOR TABLE users, orders;
  3. Create replication slot:
SELECT pg_create_logical_replication_slot('helios_cdc_slot', 'pgoutput');
  4. Grant permissions:
GRANT SELECT ON users, orders TO cdc_user;
-- REPLICATION is a role attribute, not a database-level privilege
ALTER ROLE cdc_user WITH REPLICATION;

Connection String Format

postgres://user:password@host:port/database?replication=database
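Passwords containing characters such as `@`, `:` or `/` break a URL of this shape unless they are percent-encoded. The sketch below assembles the connection string from its parts and encodes the user, password and database name. `replication_url` and `percent_encode` are hypothetical helpers, and it is an assumption that the HeliosDB connection-string parser decodes percent-encoding the way PostgreSQL's own URI parser does.

```rust
/// Percent-encodes every byte outside the RFC 3986 "unreserved" set.
fn percent_encode(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for b in input.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(b as char)
            }
            _ => out.push_str(&format!("%{:02X}", b)),
        }
    }
    out
}

/// Builds a connection string in the format shown above.
/// The host is inserted as-is; user, password and database are encoded.
pub fn replication_url(user: &str, password: &str, host: &str, port: u16, database: &str) -> String {
    format!(
        "postgres://{}:{}@{}:{}/{}?replication=database",
        percent_encode(user),
        percent_encode(password),
        host,
        port,
        percent_encode(database)
    )
}
```

With the password `p@ss:w/rd`, the result is `postgres://cdc_user:p%40ss%3Aw%2Frd@localhost:5432/mydb?replication=database`.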

Tuning Parameters

Buffer Size

buffer_size: 10000 // Event buffer capacity
  • Larger = better throughput, more memory
  • Smaller = lower latency, less memory
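To put numbers on this trade-off: a full buffer adds roughly `buffer_size / consumer_rate` of queueing delay. At 50K events/sec, 10,000 buffered events take about 200 ms to drain, and at 10K events/sec they take a full second. Memory scales with `buffer_size` times the average event size. The helpers below are a back-of-the-envelope sketch with hypothetical names, and the 1 KiB average event size in the usage note is an assumed figure, not a measurement.

```rust
use std::time::Duration;

/// Time needed to drain a full buffer at a steady consumer rate.
/// Returns None if the rate is zero or the arithmetic overflows.
pub fn drain_time(buffer_size: u64, events_per_sec: u64) -> Option<Duration> {
    let micros = buffer_size
        .checked_mul(1_000_000)?
        .checked_div(events_per_sec)?;
    Some(Duration::from_micros(micros))
}

/// Approximate memory held by a full buffer, given an average event size in bytes.
pub fn buffer_bytes(buffer_size: u64, avg_event_bytes: u64) -> Option<u64> {
    buffer_size.checked_mul(avg_event_bytes)
}
```

For instance, `buffer_bytes(10_000, 1024)` gives 10,240,000 bytes, so around 10 MB for the default buffer under that assumed event size.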

Poll Interval

poll_interval: Duration::from_millis(100)
  • Shorter = lower latency, more CPU
  • Longer = less CPU, higher latency

Heartbeat Interval

heartbeat_interval: Duration::from_secs(10)
  • Prevents idle connection timeout
  • Updates checkpoint even when no changes

Best Practices

1. Checkpoint Frequency

Balance durability vs performance:

// Too frequent (high overhead)
checkpoint_interval: Duration::from_secs(1)
// Recommended
checkpoint_interval: Duration::from_secs(60)
// Too infrequent (long recovery)
checkpoint_interval: Duration::from_secs(600)
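The processing loop shown earlier calls a `should_checkpoint()` helper without defining it. One possible shape for it is sketched below: checkpoint when the interval has elapsed or when enough events have accumulated, whichever comes first, and never when nothing has been processed. `CheckpointPolicy` and its `max_events` limit are hypothetical and not part of the HeliosDB API. The current time is passed in explicitly so the policy can be tested without sleeping.

```rust
use std::time::{Duration, Instant};

/// Decides when to checkpoint, based on elapsed time and events processed.
pub struct CheckpointPolicy {
    interval: Duration,
    max_events: u64,
    last_checkpoint: Instant,
    events_since: u64,
}

impl CheckpointPolicy {
    pub fn new(interval: Duration, max_events: u64, now: Instant) -> Self {
        Self { interval, max_events, last_checkpoint: now, events_since: 0 }
    }

    /// Call once per successfully processed event.
    pub fn record_event(&mut self) {
        self.events_since += 1;
    }

    /// True when there is new work and either limit has been reached.
    pub fn should_checkpoint(&self, now: Instant) -> bool {
        let elapsed = now.saturating_duration_since(self.last_checkpoint);
        self.events_since > 0
            && (elapsed >= self.interval || self.events_since >= self.max_events)
    }

    /// Call after the checkpoint has been persisted.
    pub fn mark_checkpointed(&mut self, now: Instant) {
        self.last_checkpoint = now;
        self.events_since = 0;
    }
}
```

With the recommended 60-second interval, the event-count limit bounds how much work is replayed after a crash during a burst of traffic.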

2. Table Selection

Monitor specific tables to reduce overhead:

tables: vec![
    "public.users".to_string(),
    "public.orders".to_string(),
    // Don't monitor audit/log tables
]

3. Error Handling

Implement retry logic:

while let Some(event) = source.recv_event().await {
    // Clone so the event is still available for the retry path
    // (assumes CdcEvent implements Clone)
    if let Err(e) = process_event(event.clone()).await {
        log::error!("Processing failed: {}", e);
        // Implement exponential backoff retry
        retry_with_backoff(event).await?;
    }
}
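The `retry_with_backoff` call above is left to the reader. The sketch below shows the core of exponential backoff: the delay doubles on each failed attempt up to a cap, and the error is returned once the attempt budget is spent. It is a synchronous, standard-library-only illustration with a hypothetical signature. An async pipeline would swap `thread::sleep` for the runtime's own sleep, and a production version would usually add random jitter.

```rust
use std::thread;
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `cap`.
pub fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    // Saturate instead of overflowing for large attempt numbers.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(cap).min(cap)
}

/// Runs `op` until it succeeds or `max_attempts` tries have failed.
/// `op` is always tried at least once.
pub fn retry_with_backoff<T, E, F>(
    mut op: F,
    max_attempts: u32,
    base: Duration,
    cap: Duration,
) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(e) if attempt + 1 >= max_attempts => return Err(e),
            Err(_) => {
                thread::sleep(backoff_delay(attempt, base, cap));
                attempt += 1;
            }
        }
    }
}
```

With a 100 ms base and a 5 s cap, the delays run 100 ms, 200 ms, 400 ms, 800 ms and so on until they reach 5 s.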

4. Schema Evolution

Handle schema changes gracefully:

if event.event_type == CdcEventType::SchemaChange {
    log::info!("Schema changed: {:?}", event.schema);
    update_downstream_schema(event.schema).await?;
}

Troubleshooting

High Lag

Symptoms: lag_ms metric increasing

Solutions:

  • Increase buffer size
  • Optimize event processing
  • Scale horizontally with partitioning
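Partitioning only helps if all changes to the same row still reach the same worker, so that per-row ordering is preserved. A minimal sketch of such a router, assuming events can be keyed by table name and primary key, is shown below. `partition_for` is a hypothetical helper. `DefaultHasher` is used for brevity, but its algorithm is not guaranteed to be stable across Rust releases, so workers that must agree across builds would want a fixed hash function instead.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks a partition in `0..partitions` for a row, so that every event for
/// the same (table, primary key) pair is routed to the same worker.
pub fn partition_for(table: &str, primary_key: &str, partitions: u32) -> u32 {
    assert!(partitions > 0, "partitions must be non-zero");
    let mut hasher = DefaultHasher::new();
    table.hash(&mut hasher);
    primary_key.hash(&mut hasher);
    (hasher.finish() % u64::from(partitions)) as u32
}
```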

Missed Events

Symptoms: Gap in LSN sequence

Solutions:

  • Check that the replication slot is active
  • Verify publication includes all tables
  • Review database logs for errors

Memory Growth

Symptoms: Increasing memory usage

Solutions:

  • Reduce buffer size
  • Implement backpressure
  • Add event filtering

Examples

See /examples/flink_cdc_pipeline.rs for a complete working example.

Comparison: HeliosDB vs Debezium

Feature    | HeliosDB CDC | Debezium
Latency    | <100ms       | ~200ms
Throughput | 50K ops/sec  | 30K ops/sec
Memory     | Lower        | Higher
Setup      | Simpler      | More complex
Ecosystem  | Rust-native  | Java/Kafka

Next Steps