HeliosDB CDC - Change Data Capture

High-performance Change Data Capture (CDC) for HeliosDB v3.0. Captures database changes from the Write-Ahead Log (WAL) and streams them to external systems like Apache Kafka and AWS Kinesis.

Features

  • WAL-Based Capture: Minimal performance impact by reading from the Write-Ahead Log
  • Multiple Sinks: Kafka, AWS Kinesis support with extensible sink interface
  • Flexible Serialization: JSON, Avro with schema registry integration
  • Event Filtering: Powerful filtering and transformation capabilities
  • Reliable Delivery: At-least-once semantics with checkpoint management
  • Monitoring: Comprehensive Prometheus metrics
  • Low Latency: Optimized for real-time data streaming

Architecture

┌─────────────┐
│  HeliosDB   │
│     WAL     │
└──────┬──────┘
       │
       ↓
┌──────────────────┐
│ Event Processor  │
│ - WAL Reader     │
│ - Filter Engine  │
│ - Serializer     │
└────────┬─────────┘
    ┌────┴────┐
    ↓         ↓
┌────────┐  ┌──────────┐
│ Kafka  │  │ Kinesis  │
└────────┘  └──────────┘

Quick Start

Basic Usage with Kafka

use heliosdb_cdc::{CdcConfig, EventProcessor, KafkaConnector};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Configure CDC
    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("mydb")
        .checkpoint_interval(1000)
        .build()?;

    // Create Kafka connector
    let kafka = KafkaConnector::new("localhost:9092", "heliosdb-cdc").await?;

    // Start event processor
    let mut processor = EventProcessor::new(config, Box::new(kafka));
    processor.start().await?;

    Ok(())
}

With Event Filtering

use heliosdb_cdc::{
    CdcConfig, EventProcessor, KafkaConnector,
    EventFilter, FilterCondition, FilterRule, TransformRule,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create filter
    let filter = EventFilter::new()
        .add_rule(FilterRule::include(
            FilterCondition::Table("users".to_string())
        ))
        .add_rule(FilterRule::include(
            FilterCondition::Table("orders".to_string())
        ))
        .add_transform(TransformRule::AddMetadata {
            key: "source".to_string(),
            value: "heliosdb-prod".to_string(),
        });

    // Configure and start
    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("production")
        .build()?;

    let kafka = KafkaConnector::new("broker1:9092", "prod-changes").await?;
    let mut processor = EventProcessor::new(config, Box::new(kafka))
        .with_filter(filter);
    processor.start().await?;

    Ok(())
}

AWS Kinesis Integration

use heliosdb_cdc::{CdcConfig, EventProcessor, KinesisConnectorBuilder, SerializationFormat};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create Kinesis connector with Avro serialization
    let kinesis = KinesisConnectorBuilder::new("us-east-1", "heliosdb-stream")
        .format(SerializationFormat::Avro)
        .batch_size(500)
        .build()
        .await?;

    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("analytics")
        .build()?;

    let mut processor = EventProcessor::new(config, Box::new(kinesis));
    processor.start().await?;

    Ok(())
}

Configuration

CDC Config Options

Option               Description                 Default
wal_path             Path to WAL directory       /var/lib/heliosdb/wal
checkpoint_interval  Events between checkpoints  1000
batch_size           Events per batch            100
buffer_size          Event channel buffer size   10000
poll_interval_ms     WAL polling interval (ms)   100
enable_metrics       Enable Prometheus metrics   true
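
Putting the options together, a fully specified configuration might look like the sketch below. The wal_path, database, and checkpoint_interval setters appear in the Quick Start; the remaining builder methods are assumed to mirror the option names in the table above.

let config = CdcConfig::builder()
    .wal_path("/var/lib/heliosdb/wal")
    .database("mydb")
    .checkpoint_interval(1000)
    // The setters below are assumed to mirror the table's option names.
    .batch_size(100)
    .buffer_size(10_000)
    .poll_interval_ms(100)
    .enable_metrics(true)
    .build()?;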

Kafka Config Options

Option              Description                 Default
brokers             Kafka broker addresses      localhost:9092
topic               Topic name                  heliosdb-cdc
format              Serialization format        Json
compression         Compression type            snappy
batch_size          Batch size in bytes         1048576 (1 MB)
enable_idempotence  Enable idempotent producer  true
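
These docs only show KafkaConnector::new(brokers, topic); if the crate also exposes a builder analogous to KinesisConnectorBuilder, wiring up the options above might look like this hypothetical sketch:

// KafkaConnectorBuilder is hypothetical; only KafkaConnector::new()
// appears elsewhere in these docs.
let kafka = KafkaConnectorBuilder::new("localhost:9092", "heliosdb-cdc")
    .format(SerializationFormat::Json)
    .compression("snappy")
    .batch_size(1_048_576) // bytes
    .enable_idempotence(true)
    .build()
    .await?;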

Kinesis Config Options

Option       Description           Default
region       AWS region            us-east-1
stream_name  Stream name           heliosdb-cdc
format       Serialization format  Json
batch_size   Records per batch     500
max_retries  Retry attempts        3
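
These options correspond to the KinesisConnectorBuilder shown in the Quick Start; max_retries is assumed to follow the same setter naming convention:

let kinesis = KinesisConnectorBuilder::new("us-east-1", "heliosdb-cdc")
    .format(SerializationFormat::Json)
    .batch_size(500)
    .max_retries(3) // assumed setter, mirrors the option name
    .build()
    .await?;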

Event Filtering

Filter by Table

use heliosdb_cdc::FilterBuilder;

let filter = FilterBuilder::new()
    .include_table("users".to_string())
    .include_table("orders".to_string())
    .exclude_table("logs".to_string())
    .build();

Filter by Operation

use heliosdb_cdc::{FilterBuilder, OperationType};

let filter = FilterBuilder::new()
    .include_operations(vec![
        OperationType::Insert,
        OperationType::Update,
    ])
    .build();

Complex Filters

use heliosdb_cdc::{EventFilter, FilterCondition, FilterRule};
use regex::Regex;

let filter = EventFilter::new()
    .add_rule(FilterRule::include(
        FilterCondition::And(vec![
            FilterCondition::Database("production".to_string()),
            FilterCondition::TablePattern(Regex::new("user_.*").unwrap()),
        ])
    ));

Event Transformation

Add Metadata

let filter = EventFilter::new()
    .add_transform(TransformRule::AddMetadata {
        key: "datacenter".to_string(),
        value: "us-west-2".to_string(),
    });

Mask Sensitive Data

let filter = EventFilter::new()
    .add_transform(TransformRule::MaskValue);

Hash Keys (PII Protection)

let filter = EventFilter::new()
    .add_transform(TransformRule::HashKey);

Monitoring

CDC exports the following Prometheus metrics:

  • heliosdb_cdc_events_processed_total - Total events processed
  • heliosdb_cdc_events_filtered_total - Total events filtered
  • heliosdb_cdc_events_failed_total - Total failed events
  • heliosdb_cdc_lag_seconds - Current replication lag
  • heliosdb_cdc_wal_sequence - Current WAL sequence number
  • heliosdb_cdc_bytes_processed_total - Total bytes processed
  • heliosdb_cdc_event_latency_seconds - Event processing latency
  • heliosdb_cdc_events_by_operation_total - Events by operation type
  • heliosdb_cdc_events_by_table_total - Events by table
  • heliosdb_cdc_sink_latency_seconds - Sink send latency
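
For example, a Prometheus alerting rule on heliosdb_cdc_lag_seconds might look like the following; the 30-second threshold and rule names are illustrative assumptions, not project recommendations:

groups:
  - name: heliosdb-cdc
    rules:
      - alert: HeliosDbCdcLagHigh
        expr: heliosdb_cdc_lag_seconds > 30
        for: 5m
        annotations:
          summary: "CDC replication lag above 30s for 5 minutes"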

Event Schema

JSON Format

{
  "event_id": "evt_123456",
  "timestamp": 1234567890000,
  "sequence": 42,
  "operation": "INSERT",
  "database": "mydb",
  "table": "users",
  "key": "dXNlcjEyMw==",
  "value": "eyJuYW1lIjoiQWxpY2UifQ==",
  "old_value": null,
  "transaction_id": "txn_789",
  "metadata": {
    "source": "heliosdb"
  }
}
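
In the JSON format, key, value, and old_value carry base64-encoded bytes (here "user123" and {"name":"Alice"}). A minimal consumer-side sketch using the serde_json, base64, and anyhow crates; the WireEvent struct and decode function are illustrative names, not part of the heliosdb_cdc API:

use base64::{engine::general_purpose::STANDARD, Engine as _};
use serde::Deserialize;
use std::collections::HashMap;

// Illustrative consumer-side mirror of the JSON event; not a heliosdb_cdc type.
#[derive(Deserialize)]
struct WireEvent {
    event_id: String,
    timestamp: i64,
    sequence: u64,
    operation: String,
    database: String,
    table: String,
    key: String,                   // base64-encoded bytes
    value: Option<String>,         // base64-encoded bytes, null on delete
    old_value: Option<String>,
    transaction_id: Option<String>,
    metadata: HashMap<String, String>,
}

// Parse an event and decode its key/value payloads.
fn decode(json: &str) -> anyhow::Result<(Vec<u8>, Option<Vec<u8>>)> {
    let evt: WireEvent = serde_json::from_str(json)?;
    let key = STANDARD.decode(&evt.key)?;
    let value = evt.value.as_deref().map(|v| STANDARD.decode(v)).transpose()?;
    Ok((key, value))
}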

Avro Schema

{
  "type": "record",
  "name": "CdcEvent",
  "namespace": "heliosdb.cdc",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "sequence", "type": "long"},
    {"name": "operation", "type": "string"},
    {"name": "database", "type": "string"},
    {"name": "table", "type": "string"},
    {"name": "key", "type": "bytes"},
    {"name": "value", "type": ["null", "bytes"]},
    {"name": "old_value", "type": ["null", "bytes"]},
    {"name": "transaction_id", "type": ["null", "string"]},
    {"name": "metadata", "type": {"type": "map", "values": "string"}}
  ]
}
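
These docs don't name a Rust Avro library; as one option, the apache-avro crate can parse and validate this schema on the consumer side. The cdc_event.avsc file name is an assumption:

use apache_avro::Schema;

fn main() -> anyhow::Result<()> {
    // Load the record definition shown above from disk (path is illustrative).
    let raw = std::fs::read_to_string("cdc_event.avsc")?;
    let schema = Schema::parse_str(&raw)?;
    println!("parsed schema: {}", schema.canonical_form());
    Ok(())
}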

Custom Sinks

Implement the EventSink trait to create custom sinks:

use heliosdb_cdc::{EventSink, CdcEvent, Result};
use async_trait::async_trait;

struct CustomSink {
    // Your implementation
}

#[async_trait]
impl EventSink for CustomSink {
    async fn send(&self, event: &CdcEvent) -> Result<()> {
        // Send event to your system
        Ok(())
    }

    async fn flush(&self) -> Result<()> {
        // Flush any buffered events
        Ok(())
    }

    async fn close(&self) -> Result<()> {
        // Cleanup resources
        Ok(())
    }

    fn name(&self) -> &str {
        "custom"
    }
}
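
A custom sink then plugs into the processor the same way the built-in connectors do in the Quick Start examples:

let sink = CustomSink { /* ... */ };
let mut processor = EventProcessor::new(config, Box::new(sink));
processor.start().await?;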

Performance Considerations

  1. Batch Size: Larger batches reduce overhead but increase latency
  2. Checkpoint Interval: More frequent checkpoints increase durability but add overhead
  3. Buffer Size: Larger buffers handle bursts better but use more memory
  4. Compression: Enable compression for better throughput over the network
  5. Filtering: Apply filters early to reduce downstream processing

Best Practices

  1. Monitor Lag: Keep the heliosdb_cdc_lag_seconds metric low
  2. Use Batching: Enable batching for better throughput
  3. Checkpoint Regularly: Balance durability vs. performance
  4. Filter Early: Apply filters before serialization
  5. Use Compression: Enable compression for network efficiency
  6. Handle Backpressure: Ensure sinks can handle peak load

Troubleshooting

High Lag

  • Check sink capacity and throughput
  • Increase batch size
  • Reduce checkpoint frequency
  • Scale out sink (more Kafka partitions, Kinesis shards)

Missing Events

  • Check filter configuration
  • Verify WAL checkpoint persistence
  • Review error logs and metrics

High Memory Usage

  • Reduce buffer size
  • Increase batch send frequency
  • Check for sink backpressure

License

Apache-2.0