HeliosDB CDC Quick Reference

Installation

Add to your Cargo.toml:

[dependencies]
heliosdb-cdc = { path = "../heliosdb-cdc" }

Basic Usage

Kafka CDC

use heliosdb_cdc::{CdcConfig, EventProcessor, KafkaConnector};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Configure the CDC source
    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("mydb")
        .build()?;

    // Create the Kafka sink
    let kafka = KafkaConnector::new("localhost:9092", "cdc-topic").await?;

    // Start the processor
    let mut processor = EventProcessor::new(config, Box::new(kafka));
    processor.start().await?;

    Ok(())
}

Kinesis CDC

use heliosdb_cdc::{CdcConfig, EventProcessor, KinesisConnector};

// `config` is a CdcConfig built as in the Kafka example above
let kinesis = KinesisConnector::new("us-east-1", "cdc-stream").await?;
let processor = EventProcessor::new(config, Box::new(kinesis));

Configuration

CdcConfig Options

CdcConfig::builder()
    .wal_path("/path/to/wal")      // WAL directory
    .database("mydb")              // Database name
    .checkpoint_interval(1000)     // Checkpoint every N events
    .batch_size(100)               // Events per batch
    .buffer_size(10000)            // Event channel buffer
    .poll_interval_ms(100)         // WAL poll interval
    .enable_metrics(true)          // Enable Prometheus
    .build()?

KafkaConfig Options

KafkaConfig::new("brokers", "topic")
.with_format(SerializationFormat::Json)
.with_compression("snappy")
.with_sasl("PLAIN", "user", "pass")

KinesisConfig Options

KinesisConfig::new("region", "stream")
    .with_format(SerializationFormat::Avro)
    .with_batch_size(500)

Filtering

Simple Filters

use heliosdb_cdc::{FilterBuilder, OperationType};

// Include specific tables
let filter = FilterBuilder::new()
    .include_table("users".to_string())
    .include_table("orders".to_string())
    .build();

// Exclude tables
let filter = FilterBuilder::new()
    .exclude_table("temp_logs".to_string())
    .build();

// By operation type
let filter = FilterBuilder::new()
    .include_operations(vec![
        OperationType::Insert,
        OperationType::Update,
    ])
    .build();

Complex Filters

use heliosdb_cdc::{EventFilter, FilterCondition, FilterRule};
use regex::Regex;

let filter = EventFilter::new()
    .add_rule(FilterRule::include(
        FilterCondition::And(vec![
            FilterCondition::Database("prod".to_string()),
            FilterCondition::TablePattern(Regex::new("user_.*")?),
        ]),
    ));

Transformations

use heliosdb_cdc::{EventFilter, TransformRule};

let filter = EventFilter::new()
    // Add metadata
    .add_transform(TransformRule::AddMetadata {
        key: "source".to_string(),
        value: "heliosdb-prod".to_string(),
    })
    // Mask sensitive values
    .add_transform(TransformRule::MaskValue)
    // Hash keys for PII protection
    .add_transform(TransformRule::HashKey);

Custom Sinks

use heliosdb_cdc::{EventSink, CdcEvent, Result};
use async_trait::async_trait;

struct MySink;

#[async_trait]
impl EventSink for MySink {
    async fn send(&self, event: &CdcEvent) -> Result<()> {
        // Your logic here
        println!("Event: {:?}", event);
        Ok(())
    }

    async fn flush(&self) -> Result<()> {
        Ok(())
    }

    async fn close(&self) -> Result<()> {
        Ok(())
    }

    fn name(&self) -> &str {
        "my-sink"
    }
}
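
The custom sink plugs into the processor exactly like the built-in connectors (a sketch; `config` is a CdcConfig built as in Basic Usage):

// Hand the custom sink to the processor, same as with KafkaConnector
let mut processor = EventProcessor::new(config, Box::new(MySink));
processor.start().await?;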

Event Schema

CdcEvent Structure

pub struct CdcEvent {
    pub event_id: String,                  // Unique event ID
    pub timestamp: u64,                    // Microseconds since epoch
    pub sequence: u64,                     // WAL sequence number
    pub operation: OperationType,          // INSERT, UPDATE, DELETE, etc.
    pub database: String,                  // Database name
    pub table: String,                     // Table name
    pub key: Bytes,                        // Record key
    pub value: Option<Bytes>,              // Record value (None for DELETE)
    pub old_value: Option<Bytes>,          // Previous value (for UPDATE)
    pub transaction_id: Option<String>,    // Transaction ID
    pub metadata: HashMap<String, String>, // Additional metadata
}

OperationType

pub enum OperationType {
    Insert,
    Update,
    Delete,
    Truncate,
    SchemaChange,
}
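
A sink will usually branch on this enum when applying changes downstream. A minimal sketch (the handling in each arm is illustrative, not crate API):

// Illustrative dispatch inside a sink's send() implementation
match event.operation {
    OperationType::Insert | OperationType::Update => {
        let _row = event.value.as_ref(); // Some(..) for INSERT/UPDATE
    }
    OperationType::Delete => {
        let _key = &event.key; // value is None; the key identifies the record
    }
    OperationType::Truncate | OperationType::SchemaChange => {
        // table-level events carry no per-row payload
    }
}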

Monitoring

Prometheus Metrics

# Event counters
heliosdb_cdc_events_processed_total
heliosdb_cdc_events_filtered_total
heliosdb_cdc_events_failed_total
# Performance
heliosdb_cdc_lag_seconds
heliosdb_cdc_event_latency_seconds
heliosdb_cdc_sink_latency_seconds
# By operation
heliosdb_cdc_events_by_operation_total{operation="INSERT"}
heliosdb_cdc_events_by_operation_total{operation="UPDATE"}
# By table
heliosdb_cdc_events_by_table_total{database="mydb",table="users"}
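
A typical alert pairs heliosdb_cdc_lag_seconds with the 60-second lag budget from Best Practices, for example firing when heliosdb_cdc_lag_seconds stays above 60 for several minutes.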

Error Handling

use heliosdb_cdc::{Result, CdcError};

match processor.start().await {
    Ok(_) => println!("CDC started"),
    Err(CdcError::WalRead(msg)) => eprintln!("WAL error: {}", msg),
    Err(CdcError::Kafka(msg)) => eprintln!("Kafka error: {}", msg),
    Err(CdcError::Kinesis(msg)) => eprintln!("Kinesis error: {}", msg),
    Err(e) => eprintln!("Error: {}", e),
}

Common Patterns

Filter + Transform Pipeline

let filter = EventFilter::new()
    .add_rule(FilterRule::include(
        FilterCondition::Table("sensitive_data".to_string()),
    ))
    .add_transform(TransformRule::MaskValue)
    .add_transform(TransformRule::HashKey);

let processor = EventProcessor::new(config, sink)
    .with_filter(filter);

Multi-Database CDC

// Separate processor per database
let mut db1_processor = EventProcessor::new(
    CdcConfig::builder().database("db1").build()?,
    Box::new(KafkaConnector::new("broker", "db1-changes").await?),
);
let mut db2_processor = EventProcessor::new(
    CdcConfig::builder().database("db2").build()?,
    Box::new(KafkaConnector::new("broker", "db2-changes").await?),
);
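
Both processors can then be driven concurrently from one task (a sketch; assumes start() runs until the processor is stopped or errors):

// Run both pipelines concurrently; returns on the first error
tokio::try_join!(db1_processor.start(), db2_processor.start())?;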

Graceful Shutdown

let mut processor = EventProcessor::new(config, sink);

// Spawning start() on another task would move the processor and make
// stop() unreachable, so drive it and the signal listener together.
tokio::select! {
    result = processor.start() => result?,
    _ = tokio::signal::ctrl_c() => {}
}

// The start() future is dropped once select! returns; shut down cleanly.
processor.stop().await?;

Testing

Mock Sink

use std::sync::Arc;
use async_trait::async_trait;
use heliosdb_cdc::{CdcEvent, EventSink, Result};
use parking_lot::RwLock; // write() returns the guard directly (no Result)

#[derive(Clone, Default)]
struct MockSink {
    events: Arc<RwLock<Vec<CdcEvent>>>,
}

impl MockSink {
    fn new() -> Self { Self::default() }
    fn event_count(&self) -> usize { self.events.read().len() }
}

#[async_trait]
impl EventSink for MockSink {
    async fn send(&self, event: &CdcEvent) -> Result<()> {
        self.events.write().push(event.clone());
        Ok(())
    }
    // ... implement flush(), close(), and name() as in MySink above
}

Integration Test

#[tokio::test]
async fn test_cdc_pipeline() -> anyhow::Result<()> {
    let sink = MockSink::new();
    let config = CdcConfig::builder()
        .wal_path(test_wal_path) // path to a WAL fixture for the test
        .build()?;
    let mut processor = EventProcessor::new(config, Box::new(sink.clone()));

    // Drive the CDC flow
    processor.start().await?;

    // Verify events reached the sink
    assert_eq!(sink.event_count(), expected_count);
    Ok(())
}

Performance Tuning

High Throughput

CdcConfig::builder()
    .batch_size(500)           // Larger batches
    .buffer_size(50000)        // Larger buffer
    .checkpoint_interval(5000) // Less frequent checkpoints
    .poll_interval_ms(10)      // Faster polling
    .build()?

Low Latency

CdcConfig::builder()
    .batch_size(10)           // Smaller batches
    .poll_interval_ms(50)     // Moderate polling
    .checkpoint_interval(100) // More frequent checkpoints
    .build()?

Balanced

CdcConfig::builder()
    .batch_size(100)           // Default
    .buffer_size(10000)        // Default
    .checkpoint_interval(1000) // Default
    .poll_interval_ms(100)     // Default
    .build()?

Troubleshooting

High Lag

  1. Check heliosdb_cdc_lag_seconds metric
  2. Increase batch size
  3. Reduce checkpoint frequency
  4. Scale sink capacity

Missing Events

  1. Verify filter configuration
  2. Check heliosdb_cdc_events_filtered_total
  3. Review checkpoint persistence
  4. Check error logs

High CPU Usage

  1. Reduce poll frequency
  2. Increase batch size
  3. Enable compression
  4. Check filter complexity

Memory Issues

  1. Reduce buffer size (see the sketch after this list)
  2. Increase batch send frequency
  3. Check for sink backpressure
  4. Monitor event queue depth
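
A memory-constrained configuration might combine items 1 and 2 (the values here are illustrative assumptions, not recommended defaults):

CdcConfig::builder()
    .buffer_size(2000) // Smaller in-flight event buffer
    .batch_size(50)    // Send smaller batches more often
    .build()?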

Best Practices

  1. Always use checkpoints - Enable checkpoint persistence
  2. Monitor lag - Keep lag under 60 seconds
  3. Use batching - Batch size > 50 for better throughput
  4. Filter early - Apply filters before serialization
  5. Handle backpressure - Ensure sink can handle peak load
  6. Test thoroughly - Use integration tests
  7. Monitor metrics - Set up Prometheus alerts
  8. Graceful shutdown - Always call processor.stop()

Examples

See the examples/ directory:

  • basic_kafka.rs - Basic Kafka CDC
  • filtered_kinesis.rs - Kinesis with filtering
  • custom_sink.rs - Custom sink implementation

Run examples:

cargo run --example basic_kafka
cargo run --example filtered_kinesis
cargo run --example custom_sink

Documentation

  • Full documentation: README.md
  • Implementation details: PACKAGE_SUMMARY.md
  • API docs: cargo doc --open -p heliosdb-cdc