CDC & Real-Time Streaming — Capture, Transform, Deliver
UVP
HeliosDB Full ships an end-to-end real-time data pipeline in the same binary: WAL-based Change Data Capture to Kafka / Kinesis, windowed continuous queries (tumbling / sliding / session), stream-stream and stream-table joins, a distributed cron scheduler (F6.13) for edge-function jobs, and built-in webhook ingestion with GitHub / Stripe / Shopify signature verification. Plus the world-first tenant-level intelligent replication (F6.21) — replicate at tenant granularity with <5 s lag, AI-powered hot-data prioritisation, and zero-downtime cross-region migration in <100 ms.
Prerequisites
- HeliosDB Full v8.0.3
- Apache Kafka or AWS Kinesis (for the CDC examples)
- ~35 minutes for the full tour
- Familiarity with at-least-once vs exactly-once delivery semantics
1. The Five Pieces
| Crate | What it does | Production status |
|---|---|---|
| heliosdb-replication/crates/cdc | WAL-based change capture; Kafka + Kinesis sinks | Production |
| heliosdb-replication/crates/tenant-replication | Per-tenant unidirectional replication, AI-prioritised | Production (world-first F6.21) |
| heliosdb-streaming | Window functions, continuous queries, event-time | Production |
| heliosdb-streaming/crates/scheduler | Cron-driven edge functions, distributed leader election | Production |
| heliosdb-streaming/crates/webhooks | Inbound HTTP webhooks (GitHub, Stripe, Shopify, Generic) | Production |
You can use any subset. Most teams start with CDC → Kafka and add the streaming engine for in-database aggregations.
2. CDC — Capture from the WAL
The CDC crate reads the Write-Ahead Log directly. Zero polling overhead on the data path — your INSERTs aren’t slowed down to feed the stream.
Quick start (Kafka sink)
```rust
use heliosdb_cdc::{CdcConfig, EventProcessor, KafkaConnector};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("mydb")
        .checkpoint_interval(1000)
        .build()?;

    let kafka = KafkaConnector::new("localhost:9092", "heliosdb-cdc").await?;

    let mut processor = EventProcessor::new(config, Box::new(kafka));
    processor.start().await?;
    Ok(())
}
```
Filtering and transformation
```rust
use heliosdb_cdc::{EventFilter, FilterCondition, FilterRule, TransformRule};

let filter = EventFilter::new()
    .add_rule(FilterRule::include(FilterCondition::Table("users".into())))
    .add_rule(FilterRule::include(FilterCondition::Table("orders".into())))
    // Drop password column before publishing
    .add_transform(TransformRule::redact_columns("users", &["password_hash"]));
```
Delivery semantics
- At-least-once, with checkpoint management
- Checkpoint interval is configurable; default is every 1000 events
- Failed deliveries are retried with exponential backoff
- Schema-registry integration available for Avro
Sink options
| Sink | Status |
|---|---|
| Apache Kafka | Production |
| AWS Kinesis | Production |
| Custom (implement the trait) | Supported |
| Flink CDC | Documented (heliosdb-streaming/docs/FLINK_CDC.md) |
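The Custom row in the table above means implementing the crate's sink trait yourself; the quick start hands the Kafka connector to `EventProcessor::new` as `Box::new(kafka)`, and a custom destination slots in the same way. The sketch below shows only the general shape — the trait name, method signature, and event type here are assumptions, not the CDC crate's actual definitions, so consult the crate before implementing.

```rust
// Sketch only: trait name, method, and event type are assumed, not taken
// from heliosdb_cdc. It illustrates the shape a custom sink tends to have.
use async_trait::async_trait;

#[async_trait]
trait CdcSink: Send + Sync {
    async fn deliver(&self, events: &[serde_json::Value]) -> anyhow::Result<()>;
}

struct StdoutSink;

#[async_trait]
impl CdcSink for StdoutSink {
    async fn deliver(&self, events: &[serde_json::Value]) -> anyhow::Result<()> {
        for event in events {
            // Replace with an HTTP POST, a file append, another queue, ...
            println!("{event}");
        }
        Ok(())
    }
}
```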
3. SQL Surface for CDC
Most operators don’t want to write Rust to inspect a stream. The Full edition exposes CDC over SQL:
```sql
-- View the last N events
SHOW CDC EVENTS LIMIT 10;
SHOW CDC EVENTS;   -- last 100
```
```sql
-- CDC connectors
CREATE CDC CONNECTOR my_connector TYPE kafka;
SHOW CDC CONNECTORS;
ALTER CDC CONNECTOR my_connector SET enabled = false;
DROP CDC CONNECTOR my_connector;
```
CDC events are also published on Redis Streams when the Redis protocol is enabled:
```bash
redis-cli XREAD COUNT 10 STREAMS heliosdb:cdc:orders 0
```
4. Tenant-Level Replication (F6.21, World-First)
This is the differentiating feature. Most CDC systems replicate at the database or table level. HeliosDB Full replicates at tenant granularity — one tenant’s data, one read-only replica, one direction.
Why tenant-level?
- DR per tenant: regulated tenants need their own replica in a specific region
- Analytics per tenant: feed a tenant-specific read replica for OLAP without touching the OLTP tenant
- Zero-downtime migration: move a tenant from us-east-1 to eu-central-1 in <100 ms
- Compliance: keep EU tenant data exclusively in EU regions
Quick start
```rust
use heliosdb_tenant_replication::{
    TenantReplicationSource, TenantReplicationTarget, TenantReplicationPipeline,
    CompressionType, EncryptionType, ConflictResolution,
};
use std::time::Duration;

let source = TenantReplicationSource::new("tenant-123", "postgres://source-db")
    .with_checkpoint_column("updated_at")
    .with_replication_lag_target(Duration::from_secs(5))
    .with_table_filter(vec!["users.*", "orders.*", "products.*"])
    .build()?;

let target = TenantReplicationTarget::new("tenant-123-replica", "postgres://target-db")
    .with_read_only_enforcement(true)
    .with_conflict_resolution(ConflictResolution::SourceWins)
    .build()?;

let replication = TenantReplicationPipeline::new(source, target)
    .with_compression(CompressionType::Zstd)
    .with_encryption(EncryptionType::Aes256Gcm)
    .start()
    .await?;

let status = replication.status().await?;
println!("Replication lag: {}s", status.replication_lag_seconds);
println!("Throughput: {} rows/s", status.throughput_rows_per_sec);
```
Patent-disclosed innovations
The crate’s README enumerates 8 patent opportunities. Highlights:
| Innovation | Effect |
|---|---|
| AI-powered predictive replication | 40-60% lag reduction by prioritising hot data |
| Intelligent data transformation | Anonymise / aggregate / filter during replication |
| Semantic conflict resolution | LLM-driven merging beyond last-write-wins |
| Tenant mobility | Cross-region migration in <100 ms |
| Replication QoS | Per-tier SLA (Premium / Standard / BestEffort / Synchronous) |
| Bi-temporal replication | Transaction time + valid time for audit |
| Schema-aware compression | 3-5× better than generic zstd |
| Automatic failover | <30 s RTO, health-based promotion |
Targets: <5 second replication lag, <30 s RTO on failover, multi-region DR built-in.
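To act on that lag target from application code, the `status()` call from the quick start is enough. Below is a minimal sketch; the 30-second polling interval, the alerting action, and the assumption that `replication_lag_seconds` is an integer count of seconds are all mine, not crate guarantees.

```rust
use std::time::Duration;

// Sketch: poll the pipeline status shown above and warn when lag exceeds
// the <5 s target. Polling interval and alerting action are assumptions.
loop {
    let status = replication.status().await?;
    if status.replication_lag_seconds > 5 {
        eprintln!(
            "tenant-123 replication lag {}s exceeds the 5 s target",
            status.replication_lag_seconds
        );
        // e.g. emit a metric, page the on-call, or start failover evaluation
    }
    tokio::time::sleep(Duration::from_secs(30)).await;
}
```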
5. Streaming Engine — Window Functions and Continuous Queries
The heliosdb-streaming crate is the OLAP-style stream processor that lives inside the database. It’s where `SELECT count(*) FROM api_requests GROUP BY TUMBLE(timestamp, INTERVAL '1 minute')` lives.
Tumbling windows
```rust
use heliosdb_streaming::*;
use std::time::Duration;

let engine = StreamingEngine::new().await?;
let stream = Stream::new(StreamConfig::default());

let windowed = engine
    .tumbling_window(stream.clone(), Duration::from_secs(60))
    .await?;
```
SQL equivalent:
```sql
SELECT
    TUMBLE(timestamp, INTERVAL '1 minute') AS window_start,
    COUNT(*) AS event_count
FROM events
GROUP BY TUMBLE(timestamp, INTERVAL '1 minute');
```
Sliding windows
```rust
// 5-min windows, sliding every 1 min
engine.sliding_window(stream, Duration::from_secs(300), Duration::from_secs(60)).await?;
```
```sql
SELECT
    HOP(timestamp, INTERVAL '1 minute', INTERVAL '5 minutes') AS window_start,
    COUNT(*) AS event_count
FROM events
GROUP BY HOP(timestamp, INTERVAL '1 minute', INTERVAL '5 minutes');
```
Session windows (gap-based)
```rust
engine.session_window(stream, Duration::from_secs(600)).await?; // 10-min gap
```
Continuous queries
A continuous query runs forever, processing events as they arrive and pushing results to a sink:
```rust
use std::sync::Arc;

let sink = Arc::new(CallbackSink::new(|rows| {
    for row in rows {
        println!("Result: {:?}", row);
    }
}));

let q = ContinuousQuery::new(
    "real_time_dashboard".into(),
    "SELECT COUNT(*) FROM api_requests".into(),
    sink,
    stream,
);

let handle = engine.create_continuous_query(q).await?;
```
Stream-table and stream-stream joins
```rust
// Enrich a stream with a static dimension table
let join = StreamTableJoin::new("users".into(), "user_id".into());
join.load_table(table_data)?;
let enriched = join.join(events_stream).await?;

// Join two streams within a time window
let join = StreamStreamJoin::new(Duration::from_secs(60))
    .with_keys("order_id".into(), "payment_order_id".into())
    .with_join_type(JoinType::Inner);
let joined = join.join(orders_stream, payments_stream).await?;
```
Event time vs processing time
Use event time when ordering matters and out-of-order arrival is the norm:
```rust
let cfg = StreamConfig {
    name: "events".into(),
    time_semantics: TimeSemantics::EventTime,
    allowed_lateness: Duration::from_secs(60),
    watermark_interval: Duration::from_secs(1),
};
```
The watermark generator supports `BoundedOutOfOrderness`, `Punctuated`, and custom strategies. The late-data handler decides whether late events are processed, processed-as-late, or dropped.
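If `BoundedOutOfOrderness` is unfamiliar: the generator keeps the watermark a fixed bound behind the largest event time it has seen, and an event whose timestamp falls behind the watermark counts as late (and is then handled per `allowed_lateness` and the late-data policy). The following is a conceptual sketch of that bookkeeping, not the crate's implementation.

```rust
use std::time::Duration;

// Conceptual sketch of a bounded-out-of-orderness watermark generator:
// watermark = max event time seen so far, minus a fixed bound.
struct BoundedOutOfOrderness {
    bound: Duration,
    max_event_time: Duration, // highest event time observed so far
}

impl BoundedOutOfOrderness {
    fn observe(&mut self, event_time: Duration) {
        self.max_event_time = self.max_event_time.max(event_time);
    }

    fn watermark(&self) -> Duration {
        self.max_event_time.saturating_sub(self.bound)
    }

    // An event is "late" once its timestamp falls behind the watermark.
    fn is_late(&self, event_time: Duration) -> bool {
        event_time < self.watermark()
    }
}
```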
6. Cron Scheduler (F6.13) — Run Edge Functions on a Schedule
The scheduler crate runs WASM edge functions on cron schedules with full distributed coordination.
```rust
use heliosdb_scheduler::{CronScheduler, JobConfig, SchedulerConfig};

let scheduler = CronScheduler::new(SchedulerConfig::default()).await?;

let job = JobConfig::new("daily_cleanup", "cleanup_old_events", "0 2 * * *")?
    .with_timezone("America/New_York")
    .with_max_concurrent(1)
    .with_timeout_ms(300_000)
    .with_catchup(true);

scheduler.schedule_job(job).await?;
scheduler.start().await?;
```
Features:
- Full cron syntax (5-field + optional seconds; see the sketch after this list)
- Named timezones (`America/New_York`, `Europe/London`, `Asia/Tokyo`, …)
- Distributed leader election for multi-node deployments
- Job persistence (survive restarts; jobs reload from disk)
- Execution history (success/failure rates per job)
- Missed-run strategies: `Skip` (default), `CatchUpAll`, `CatchUpLast`, `CatchUpDelayed`
- Concurrent-execution limits (semaphore-based, per-job and global)
- REST API for full lifecycle management
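As a second sketch, here is a job that exercises the optional seconds field and a named timezone. It reuses only the builder calls from the quick start above; the one assumption (flagged in the comment) is that the seconds field comes first, as in common six-field cron dialects.

```rust
// Sketch: a job that fires every 30 seconds, evaluated in London time.
// Assumes the optional seconds field is the first of the six fields.
let heartbeat = JobConfig::new("heartbeat", "emit_heartbeat", "*/30 * * * * *")?
    .with_timezone("Europe/London")
    .with_max_concurrent(1)
    .with_catchup(false);

scheduler.schedule_job(heartbeat).await?;
```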
REST API summary
```
POST   /api/scheduler/jobs               Create
GET    /api/scheduler/jobs               List
GET    /api/scheduler/jobs/{id}          Detail
DELETE /api/scheduler/jobs/{id}          Remove
POST   /api/scheduler/jobs/{id}/enable   Enable
POST   /api/scheduler/jobs/{id}/disable  Disable
POST   /api/scheduler/jobs/{id}/trigger  Manual fire
GET    /api/scheduler/stats              Cluster-wide stats
```
7. Webhook Ingestion — Inbound HTTP
The webhooks crate is the symmetric counterpart to outbound CDC. It accepts inbound HTTP, verifies signatures, and routes payloads to WASM edge functions.
Built-in providers:
| Provider | Signature header | Default content-type |
|---|---|---|
| GitHub | X-Hub-Signature-256 (sha256=…) | application/json |
| Stripe | Stripe-Signature (t=…,v1=…) | application/json |
| Shopify | X-Shopify-Hmac-Sha256 (base64) | application/json |
| Generic | configurable | configurable |
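The webhook server performs this verification for you. For context on what the GitHub scheme in the table actually is — an HMAC-SHA256 over the raw request body, hex-encoded and prefixed with `sha256=` in the X-Hub-Signature-256 header — here is a stand-alone sketch using the `hmac`, `sha2`, and `hex` crates; it is not code from heliosdb-webhooks.

```rust
use hmac::{Hmac, Mac};
use sha2::Sha256;

type HmacSha256 = Hmac<Sha256>;

/// Verifies a GitHub-style signature: "sha256=<hex HMAC of the raw body>".
fn verify_github_signature(secret: &[u8], body: &[u8], header: &str) -> bool {
    // GitHub sends "sha256=<hex digest>" in X-Hub-Signature-256.
    let Some(hex_sig) = header.strip_prefix("sha256=") else { return false };
    let Ok(expected) = hex::decode(hex_sig) else { return false };

    let mut mac = HmacSha256::new_from_slice(secret).expect("HMAC accepts any key length");
    mac.update(body);
    // verify_slice performs a constant-time comparison.
    mac.verify_slice(&expected).is_ok()
}
```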
Quick start
```rust
use heliosdb_webhooks::*;

let server = WebhookServer::new(WebhookConfig::default()).await?;

server.register_webhook(WebhookRegistration {
    name: "GitHub Push Events".into(),
    endpoint: "/webhooks/github/push".into(),
    source: WebhookSource::GitHub,
    edge_function: "process_github_push".into(),
    secret: Some(std::env::var("GITHUB_SECRET")?),
    allowed_methods: vec!["POST".into()],
    ..Default::default()
}).await?;

server.start().await?;
```
Security features
- HMAC-SHA256 signature verification (provider-specific)
- IP whitelist (CIDR ranges) — global or per-webhook
- Basic auth on the management API
- Rate limits (per-endpoint or global, configurable burst)
- Request size limits (1 MB default)
- TLS/HTTPS support
Performance
- Request handling overhead: <5 ms
- Signature verification: <1 ms
- 10,000+ requests/second per node
- Memory footprint: ~50 MB base + ~1 KB per registered webhook
8. End-to-End — A Realistic Pipeline
A common pattern for a SaaS platform:
```
Stripe / Shopify ──webhook──► /webhooks/stripe ──► edge function (WASM trigger) ──► INSERT INTO events
App writes (INSERT / UPDATE / DELETE) ──────────────────────────────────────────► events tables
WAL ──CDC──► Kafka
Continuous queries (windowed aggregates) ──► real-time dashboard
```
Each piece is independent, observable, and unit-testable.
9. Production Checklist
CDC
- Checkpoint interval is sized for your tolerance for replay (smaller = less replay, more overhead)
- Schema registry is configured if you use Avro
- Sensitive columns are redacted by `TransformRule` before they hit Kafka
- Sink credentials are in a secret manager
- Monitoring scrapes the CDC Prometheus metrics
Tenant replication
- Source/target tenants are in different failure domains
- Replication lag alerts are wired up (default target: <5 s)
- Failover procedure has been tested in a non-prod environment
- Read-only enforcement is on at the target
Streaming engine
- `allowed_lateness` reflects real out-of-order arrival, not optimism
- Continuous-query buffers are sized for your throughput
- Sinks are idempotent (continuous queries can replay on restart)
Scheduler
- Distributed mode is enabled if you run >1 scheduler node (otherwise jobs may run on every node)
- Missed-run strategy matches the job’s nature (don’t `CatchUpAll` a billing job)
- Persistence directory is on durable storage
Webhooks
- Signature verification is required for every provider that supports it
- IP whitelist is enabled for known providers (Stripe, Shopify publish their CIDR ranges)
- Rate limiting is on (otherwise a misbehaving sender or a DoS can knock the node over)
- HTTPS is mandatory; no plain HTTP
10. Verification Status
| Component | Status notes |
|---|---|
| CDC (WAL → Kafka / Kinesis) | Production. At-least-once with checkpointing. |
| Tenant replication (F6.21) | Production · world-first. 8 patent disclosures, full README. |
| Streaming engine (windows, joins, continuous queries) | Production. SQL surface includes TUMBLE/HOP/SESSION. |
| Scheduler | Production. Distributed leader election + persistence. |
| Webhooks | Production. GitHub / Stripe / Shopify signature verification built-in; Generic for everything else. |
| Flink CDC integration | Documented (heliosdb-streaming/docs/FLINK_CDC.md); test through your version of Flink before relying on it. |
Where Next
- WASM Triggers — the edge functions that webhooks and the scheduler invoke
- Multi-Tenancy — the prerequisite for tenant-level replication
- Edge Deployment — drive CDC events to edge fleets
References
- CDC source: `/home/app/Helios/Full/heliosdb-replication/crates/cdc/`
- Tenant replication source: `/home/app/Helios/Full/heliosdb-replication/crates/tenant-replication/` (8 patent claims, beta-customer reference doc, full performance report)
- Streaming engine: `/home/app/Helios/Full/heliosdb-streaming/`
- Scheduler: `/home/app/Helios/Full/heliosdb-streaming/crates/scheduler/`
- Webhooks: `/home/app/Helios/Full/heliosdb-streaming/crates/webhooks/`
- Existing tutorial — basic CDC: `../intermediate/05-cdc-events.md`