HeliosDB CDC Quick Reference
Installation
Add to your Cargo.toml:
[dependencies]
heliosdb-cdc = { path = "../heliosdb-cdc" }

Basic Usage
Kafka CDC
use heliosdb_cdc::{CdcConfig, EventProcessor, KafkaConnector};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Configure
    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("mydb")
        .build()?;

    // Create Kafka sink
    let kafka = KafkaConnector::new("localhost:9092", "cdc-topic").await?;

    // Start processor
    let mut processor = EventProcessor::new(config, Box::new(kafka));
    processor.start().await?;
    Ok(())
}

Kinesis CDC
use heliosdb_cdc::{CdcConfig, EventProcessor, KinesisConnector};
let kinesis = KinesisConnector::new("us-east-1", "cdc-stream").await?;
let processor = EventProcessor::new(config, Box::new(kinesis));

Configuration
CdcConfig Options
CdcConfig::builder()
    .wal_path("/path/to/wal")      // WAL directory
    .database("mydb")              // Database name
    .checkpoint_interval(1000)     // Checkpoint every N events
    .batch_size(100)               // Events per batch
    .buffer_size(10000)            // Event channel buffer
    .poll_interval_ms(100)         // WAL poll interval
    .enable_metrics(true)          // Enable Prometheus
    .build()?

KafkaConfig Options
KafkaConfig::new("brokers", "topic")
    .with_format(SerializationFormat::Json)
    .with_compression("snappy")
    .with_sasl("PLAIN", "user", "pass")

KinesisConfig Options
KinesisConfig::new("region", "stream")
    .with_format(SerializationFormat::Avro)
    .with_batch_size(500)

Filtering
Simple Filters
use heliosdb_cdc::{FilterBuilder, FilterCondition, FilterRule};
// Include specific tables
let filter = FilterBuilder::new()
    .include_table("users".to_string())
    .include_table("orders".to_string())
    .build();

// Exclude tables
let filter = FilterBuilder::new()
    .exclude_table("temp_logs".to_string())
    .build();

// By operation type
let filter = FilterBuilder::new()
    .include_operations(vec![
        OperationType::Insert,
        OperationType::Update,
    ])
    .build();

Complex Filters
use heliosdb_cdc::{EventFilter, FilterCondition, FilterRule};use regex::Regex;
let filter = EventFilter::new()
    .add_rule(FilterRule::include(
        FilterCondition::And(vec![
            FilterCondition::Database("prod".to_string()),
            FilterCondition::TablePattern(Regex::new("user_.*")?),
        ])
    ));

Transformations
use heliosdb_cdc::{EventFilter, TransformRule};
let filter = EventFilter::new()
    // Add metadata
    .add_transform(TransformRule::AddMetadata {
        key: "source".to_string(),
        value: "heliosdb-prod".to_string(),
    })
    // Mask sensitive values
    .add_transform(TransformRule::MaskValue)
    // Hash keys for PII protection
    .add_transform(TransformRule::HashKey);

Custom Sinks
use heliosdb_cdc::{EventSink, CdcEvent, Result};use async_trait::async_trait;
struct MySink;
#[async_trait]
impl EventSink for MySink {
    async fn send(&self, event: &CdcEvent) -> Result<()> {
        // Your logic here
        println!("Event: {:?}", event);
        Ok(())
    }
async fn flush(&self) -> Result<()> { Ok(()) }
async fn close(&self) -> Result<()> { Ok(()) }
    fn name(&self) -> &str { "my-sink" }
}

Event Schema
CdcEvent Structure
pub struct CdcEvent {
    pub event_id: String,                  // Unique event ID
    pub timestamp: u64,                    // Microseconds since epoch
    pub sequence: u64,                     // WAL sequence number
    pub operation: OperationType,          // INSERT, UPDATE, DELETE, etc.
    pub database: String,                  // Database name
    pub table: String,                     // Table name
    pub key: Bytes,                        // Record key
    pub value: Option<Bytes>,              // Record value (None for DELETE)
    pub old_value: Option<Bytes>,          // Previous value (for UPDATE)
    pub transaction_id: Option<String>,    // Transaction ID
    pub metadata: HashMap<String, String>, // Additional metadata
}

OperationType
pub enum OperationType {
    Insert,
    Update,
    Delete,
    Truncate,
    SchemaChange,
}

Monitoring
Prometheus Metrics
# Event counters
heliosdb_cdc_events_processed_total
heliosdb_cdc_events_filtered_total
heliosdb_cdc_events_failed_total

# Performance
heliosdb_cdc_lag_seconds
heliosdb_cdc_event_latency_seconds
heliosdb_cdc_sink_latency_seconds

# By operation
heliosdb_cdc_events_by_operation_total{operation="INSERT"}
heliosdb_cdc_events_by_operation_total{operation="UPDATE"}

# By table
heliosdb_cdc_events_by_table_total{database="mydb",table="users"}

Error Handling
use heliosdb_cdc::{Result, CdcError};
match processor.start().await {
    Ok(_) => println!("CDC started"),
    Err(CdcError::WalRead(msg)) => eprintln!("WAL error: {}", msg),
    Err(CdcError::Kafka(msg)) => eprintln!("Kafka error: {}", msg),
    Err(CdcError::Kinesis(msg)) => eprintln!("Kinesis error: {}", msg),
    Err(e) => eprintln!("Error: {}", e),
}

Common Patterns
Filter + Transform Pipeline
let filter = EventFilter::new() .add_rule(FilterRule::include( FilterCondition::Table("sensitive_data".to_string()) )) .add_transform(TransformRule::MaskValue) .add_transform(TransformRule::HashKey);
let processor = EventProcessor::new(config, sink)
    .with_filter(filter);

Multi-Database CDC
// Separate processor per database
let db1_processor = EventProcessor::new(
    CdcConfig::builder().database("db1").build()?,
    Box::new(KafkaConnector::new("broker", "db1-changes").await?));

let db2_processor = EventProcessor::new(
    CdcConfig::builder().database("db2").build()?,
    Box::new(KafkaConnector::new("broker", "db2-changes").await?));

Graceful Shutdown
let mut processor = EventProcessor::new(config, sink);
tokio::spawn(async move { processor.start().await.unwrap();});
// Wait for signal
tokio::signal::ctrl_c().await?;

// Stop processor
processor.stop().await?;

Testing
Mock Sink
use heliosdb_cdc::EventSink;
struct MockSink { events: Arc<RwLock<Vec<CdcEvent>>>,}
#[async_trait]
impl EventSink for MockSink {
    async fn send(&self, event: &CdcEvent) -> Result<()> {
        self.events.write().push(event.clone());
        Ok(())
    }
    // ... implement other methods
}

Integration Test
#[tokio::test]
async fn test_cdc_pipeline() {
    let sink = MockSink::new();
    let config = CdcConfig::builder()
        .wal_path(test_wal_path)
        .build()?;

    let processor = EventProcessor::new(config, Box::new(sink.clone()));

    // Test CDC flow
    processor.start().await?;

    // Verify events
    assert_eq!(sink.event_count(), expected_count);
}

Performance Tuning
High Throughput
CdcConfig::builder()
    .batch_size(500)            // Larger batches
    .buffer_size(50000)         // Larger buffer
    .checkpoint_interval(5000)  // Less frequent checkpoints
    .poll_interval_ms(10)       // Faster polling
    .build()?

Low Latency

CdcConfig::builder()
    .batch_size(10)             // Smaller batches
    .poll_interval_ms(50)       // Moderate polling
    .checkpoint_interval(100)   // More frequent checkpoints
    .build()?

Balanced

CdcConfig::builder()
    .batch_size(100)            // Default
    .buffer_size(10000)         // Default
    .checkpoint_interval(1000)  // Default
    .poll_interval_ms(100)      // Default
    .build()?

Troubleshooting
High Lag
- Check the heliosdb_cdc_lag_seconds metric
- Increase batch size
- Reduce checkpoint frequency
- Scale sink capacity
Missing Events
- Verify filter configuration
- Check heliosdb_cdc_events_filtered_total
- Review checkpoint persistence
- Check error logs
High CPU Usage
- Reduce poll frequency
- Increase batch size
- Enable compression
- Check filter complexity
Memory Issues
- Reduce buffer size
- Increase batch send frequency
- Check for sink backpressure
- Monitor event queue depth
Best Practices
- Always use checkpoints - Enable checkpoint persistence
- Monitor lag - Keep lag under 60 seconds
- Use batching - Batch size > 50 for better throughput
- Filter early - Apply filters before serialization
- Handle backpressure - Ensure sink can handle peak load
- Test thoroughly - Use integration tests
- Monitor metrics - Set up Prometheus alerts
- Graceful shutdown - Always call processor.stop()
Examples
See the examples/ directory:
basic_kafka.rs - Basic Kafka CDC
filtered_kinesis.rs - Kinesis with filtering
custom_sink.rs - Custom sink implementation
Run examples:
cargo run --example basic_kafka
cargo run --example filtered_kinesis
cargo run --example custom_sink

Documentation
- Full documentation: README.md
- Implementation details: PACKAGE_SUMMARY.md
- API docs: cargo doc --open -p heliosdb-cdc