HeliosDB CDC - Change Data Capture
High-performance Change Data Capture (CDC) for HeliosDB v3.0. Captures database changes from the Write-Ahead Log (WAL) and streams them to external systems like Apache Kafka and AWS Kinesis.
Features
- WAL-Based Capture: Minimal performance impact by reading from the Write-Ahead Log
- Multiple Sinks: Kafka, AWS Kinesis support with extensible sink interface
- Flexible Serialization: JSON, Avro with schema registry integration
- Event Filtering: Powerful filtering and transformation capabilities
- Reliable Delivery: At-least-once semantics with checkpoint management
- Monitoring: Comprehensive Prometheus metrics
- Low Latency: Optimized for real-time data streaming
Architecture
```
┌─────────────┐
│  HeliosDB   │
│     WAL     │
└──────┬──────┘
       │
       ↓
┌──────────────────┐
│ Event Processor  │
│  - WAL Reader    │
│  - Filter Engine │
│  - Serializer    │
└────────┬─────────┘
         │
    ┌────┴────┐
    ↓         ↓
┌────────┐ ┌──────────┐
│ Kafka  │ │ Kinesis  │
└────────┘ └──────────┘
```
Quick Start
Basic Usage with Kafka
```rust
use heliosdb_cdc::{CdcConfig, EventProcessor, KafkaConnector};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Configure CDC
    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("mydb")
        .checkpoint_interval(1000)
        .build()?;

    // Create Kafka connector
    let kafka = KafkaConnector::new("localhost:9092", "heliosdb-cdc").await?;

    // Start event processor
    let mut processor = EventProcessor::new(config, Box::new(kafka));
    processor.start().await?;

    Ok(())
}
```
With Event Filtering
```rust
use heliosdb_cdc::{
    CdcConfig, EventProcessor, KafkaConnector,
    EventFilter, FilterCondition, FilterRule, TransformRule,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create filter
    let filter = EventFilter::new()
        .add_rule(FilterRule::include(
            FilterCondition::Table("users".to_string())
        ))
        .add_rule(FilterRule::include(
            FilterCondition::Table("orders".to_string())
        ))
        .add_transform(TransformRule::AddMetadata {
            key: "source".to_string(),
            value: "heliosdb-prod".to_string(),
        });

    // Configure and start
    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("production")
        .build()?;

    let kafka = KafkaConnector::new("broker1:9092", "prod-changes").await?;
    let mut processor = EventProcessor::new(config, Box::new(kafka))
        .with_filter(filter);

    processor.start().await?;
    Ok(())
}
```
AWS Kinesis Integration
```rust
use heliosdb_cdc::{CdcConfig, EventProcessor, KinesisConnectorBuilder, SerializationFormat};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create Kinesis connector with Avro serialization
    let kinesis = KinesisConnectorBuilder::new("us-east-1", "heliosdb-stream")
        .format(SerializationFormat::Avro)
        .batch_size(500)
        .build()
        .await?;

    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("analytics")
        .build()?;

    let mut processor = EventProcessor::new(config, Box::new(kinesis));
    processor.start().await?;

    Ok(())
}
```
Configuration
CDC Config Options
| Option | Description | Default |
|---|---|---|
| wal_path | Path to WAL directory | /var/lib/heliosdb/wal |
| checkpoint_interval | Events between checkpoints | 1000 |
| batch_size | Events per batch | 100 |
| buffer_size | Event channel buffer size | 10000 |
| poll_interval_ms | WAL polling interval | 100 |
| enable_metrics | Enable Prometheus metrics | true |
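Taken together, the options map onto the builder like this. This is a sketch that assumes each option has a same-named setter; only wal_path, database, and checkpoint_interval appear in the examples above:

```rust
use heliosdb_cdc::CdcConfig;

fn build_config() -> anyhow::Result<CdcConfig> {
    // Sketch only: the buffer_size, poll_interval_ms, and enable_metrics
    // setters are assumed to mirror the option names in the table.
    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("mydb")
        .checkpoint_interval(1000) // events between checkpoints
        .batch_size(100)           // events per batch
        .buffer_size(10000)        // event channel buffer size
        .poll_interval_ms(100)     // WAL polling interval
        .enable_metrics(true)      // expose Prometheus metrics
        .build()?;
    Ok(config)
}
```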
Kafka Config Options
| Option | Description | Default |
|---|---|---|
| brokers | Kafka broker addresses | localhost:9092 |
| topic | Topic name | heliosdb-cdc |
| format | Serialization format | Json |
| compression | Compression type | snappy |
| batch_size | Batch size in bytes | 1048576 (1MB) |
| enable_idempotence | Enable idempotent producer | true |
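The examples above construct the connector with KafkaConnector::new(brokers, topic). To show how the remaining options fit together, the sketch below assumes a hypothetical KafkaConnectorBuilder mirroring KinesisConnectorBuilder; it is not an API confirmed by this document:

```rust
use heliosdb_cdc::{KafkaConnector, KafkaConnectorBuilder, SerializationFormat};

// Hypothetical builder: only KafkaConnector::new(brokers, topic) appears
// in the examples above. Shown purely to illustrate the table's options.
async fn build_kafka() -> anyhow::Result<KafkaConnector> {
    let kafka = KafkaConnectorBuilder::new("localhost:9092", "heliosdb-cdc")
        .format(SerializationFormat::Json) // serialization format
        .compression("snappy")             // compression type
        .batch_size(1_048_576)             // batch size in bytes (1 MB)
        .enable_idempotence(true)          // idempotent producer
        .build()
        .await?;
    Ok(kafka)
}
```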
Kinesis Config Options
| Option | Description | Default |
|---|---|---|
| region | AWS region | us-east-1 |
| stream_name | Stream name | heliosdb-cdc |
| format | Serialization format | Json |
| batch_size | Records per batch | 500 |
| max_retries | Retry attempts | 3 |
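KinesisConnectorBuilder::new takes the region and stream name positionally, as in the Quick Start. A sketch with the remaining options; the max_retries setter is assumed to mirror the option name:

```rust
use heliosdb_cdc::{KinesisConnector, KinesisConnectorBuilder, SerializationFormat};

async fn build_kinesis() -> anyhow::Result<KinesisConnector> {
    // format and batch_size appear in the Quick Start example;
    // max_retries is assumed from the option name in the table.
    let kinesis = KinesisConnectorBuilder::new("us-east-1", "heliosdb-cdc")
        .format(SerializationFormat::Json)
        .batch_size(500) // records per batch
        .max_retries(3)  // retry attempts
        .build()
        .await?;
    Ok(kinesis)
}
```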
Event Filtering
Filter by Table
```rust
use heliosdb_cdc::FilterBuilder;

let filter = FilterBuilder::new()
    .include_table("users".to_string())
    .include_table("orders".to_string())
    .exclude_table("logs".to_string())
    .build();
```
Filter by Operation
```rust
use heliosdb_cdc::{FilterBuilder, OperationType};

let filter = FilterBuilder::new()
    .include_operations(vec![
        OperationType::Insert,
        OperationType::Update,
    ])
    .build();
```
Complex Filters
```rust
use heliosdb_cdc::{EventFilter, FilterCondition, FilterRule};
use regex::Regex;

let filter = EventFilter::new()
    .add_rule(FilterRule::include(
        FilterCondition::And(vec![
            FilterCondition::Database("production".to_string()),
            FilterCondition::TablePattern(Regex::new("user_.*").unwrap()),
        ])
    ));
```
Event Transformation
Add Metadata
```rust
use heliosdb_cdc::{EventFilter, TransformRule};

let filter = EventFilter::new()
    .add_transform(TransformRule::AddMetadata {
        key: "datacenter".to_string(),
        value: "us-west-2".to_string(),
    });
```
Mask Sensitive Data
```rust
let filter = EventFilter::new()
    .add_transform(TransformRule::MaskValue);
```
Hash Keys (PII Protection)
```rust
let filter = EventFilter::new()
    .add_transform(TransformRule::HashKey);
```
Monitoring
CDC exports the following Prometheus metrics:
- heliosdb_cdc_events_processed_total - Total events processed
- heliosdb_cdc_events_filtered_total - Total events filtered
- heliosdb_cdc_events_failed_total - Total failed events
- heliosdb_cdc_lag_seconds - Current replication lag
- heliosdb_cdc_wal_sequence - Current WAL sequence number
- heliosdb_cdc_bytes_processed_total - Total bytes processed
- heliosdb_cdc_event_latency_seconds - Event processing latency
- heliosdb_cdc_events_by_operation_total - Events by operation type
- heliosdb_cdc_events_by_table_total - Events by table
- heliosdb_cdc_sink_latency_seconds - Sink send latency
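These are standard Prometheus metrics. If the crate registers them with the prometheus crate's default registry (an assumption, not confirmed here), a scrape handler can render them with the usual text encoder:

```rust
use prometheus::{Encoder, TextEncoder};

// Renders all metrics from the prometheus crate's default registry in
// text exposition format. Assumes heliosdb_cdc registers its metrics
// there; adapt if the crate exposes its own registry instead.
fn render_metrics() -> String {
    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buffer)
        .expect("failed to encode Prometheus metrics");
    String::from_utf8(buffer).expect("metrics output is valid UTF-8")
}
```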
Event Schema
JSON Format
{ "event_id": "evt_123456", "timestamp": 1234567890000, "sequence": 42, "operation": "INSERT", "database": "mydb", "table": "users", "key": "dXNlcjEyMw==", "value": "eyJuYW1lIjoiQWxpY2UifQ==", "old_value": null, "transaction_id": "txn_789", "metadata": { "source": "heliosdb" }}Avro Schema
{ "type": "record", "name": "CdcEvent", "namespace": "heliosdb.cdc", "fields": [ {"name": "event_id", "type": "string"}, {"name": "timestamp", "type": "long"}, {"name": "sequence", "type": "long"}, {"name": "operation", "type": "string"}, {"name": "database", "type": "string"}, {"name": "table", "type": "string"}, {"name": "key", "type": "bytes"}, {"name": "value", "type": ["null", "bytes"]}, {"name": "old_value", "type": ["null", "bytes"]}, {"name": "transaction_id", "type": ["null", "string"]}, {"name": "metadata", "type": {"type": "map", "values": "string"}} ]}Custom Sinks
Implement the EventSink trait to create custom sinks:
```rust
use heliosdb_cdc::{EventSink, CdcEvent, Result};
use async_trait::async_trait;

struct CustomSink {
    // Your implementation
}

#[async_trait]
impl EventSink for CustomSink {
    async fn send(&self, event: &CdcEvent) -> Result<()> {
        // Send event to your system
        Ok(())
    }

    async fn flush(&self) -> Result<()> {
        // Flush any buffered events
        Ok(())
    }

    async fn close(&self) -> Result<()> {
        // Cleanup resources
        Ok(())
    }

    fn name(&self) -> &str {
        "custom"
    }
}
```
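Wiring a custom sink into the processor works the same as the built-in connectors. A sketch, reusing the CdcConfig builder from Quick Start:

```rust
// Inside an async context (e.g. #[tokio::main]):
let config = CdcConfig::builder()
    .wal_path("/var/lib/heliosdb/wal")
    .database("mydb")
    .build()?;

let mut processor = EventProcessor::new(config, Box::new(CustomSink {}));
processor.start().await?;
```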
Performance Considerations
- Batch Size: Larger batches reduce overhead but increase latency
- Checkpoint Interval: More frequent checkpoints increase durability but add overhead
- Buffer Size: Larger buffers handle bursts better but use more memory
- Compression: Enable compression for better throughput over network
- Filtering: Apply filters early to reduce downstream processing (see the tuning sketch after this list)
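The sketch below combines these trade-offs into a throughput-oriented configuration. The values are illustrative, and the buffer_size setter is assumed to mirror the option name, as in the Configuration section:

```rust
use heliosdb_cdc::CdcConfig;

fn throughput_tuned() -> anyhow::Result<CdcConfig> {
    // Illustrative values: bigger batches and buffers, fewer checkpoints.
    // Trades higher event latency and memory use for throughput.
    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("mydb")
        .batch_size(1000)          // larger batches, less per-event overhead
        .buffer_size(50_000)       // absorb bursts (assumed setter)
        .checkpoint_interval(5000) // fewer checkpoints, more replay on restart
        .build()?;
    Ok(config)
}
```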
Best Practices
- Monitor Lag: Keep the heliosdb_cdc_lag_seconds metric low
- Use Batching: Enable batching for better throughput
- Checkpoint Regularly: Balance durability vs. performance
- Filter Early: Apply filters before serialization
- Use Compression: Enable compression for network efficiency
- Handle Backpressure: Ensure sinks can handle peak load
Troubleshooting
High Lag
- Check sink capacity and throughput
- Increase batch size
- Reduce checkpoint frequency
- Scale out sink (more Kafka partitions, Kinesis shards)
Missing Events
- Check filter configuration
- Verify WAL checkpoint persistence
- Review error logs and metrics
High Memory Usage
- Reduce buffer size
- Increase batch send frequency
- Check for sink backpressure
License
Apache-2.0