
CDC & Real-Time Streaming — Capture, Transform, Deliver

Unique value proposition

HeliosDB Full ships an end-to-end real-time data pipeline in the same binary: WAL-based Change Data Capture to Kafka / Kinesis, windowed continuous queries (tumbling / sliding / session), stream-stream and stream-table joins, a distributed cron scheduler (F6.13) for edge-function jobs, and built-in webhook ingestion with GitHub / Stripe / Shopify signature verification. Plus the world-first tenant-level intelligent replication (F6.21) — replicate at tenant granularity with <5 s lag, AI-powered hot-data prioritisation, and zero-downtime cross-region migration in <100 ms.


Prerequisites

  • HeliosDB Full v8.0.3
  • Apache Kafka or AWS Kinesis (for the CDC examples)
  • ~35 minutes for the full tour
  • Familiarity with at-least-once vs exactly-once delivery semantics

1. The Five Pieces

Crate | What it does | Production status
heliosdb-replication/crates/cdc | WAL-based change capture; Kafka + Kinesis sinks | Production
heliosdb-replication/crates/tenant-replication | Per-tenant unidirectional replication, AI-prioritised | Production (world-first F6.21)
heliosdb-streaming | Window functions, continuous queries, event-time | Production
heliosdb-streaming/crates/scheduler | Cron-driven edge functions, distributed leader election | Production
heliosdb-streaming/crates/webhooks | Inbound HTTP webhooks (GitHub, Stripe, Shopify, Generic) | Production

You can use any subset. Most teams start with CDC → Kafka and add the streaming engine for in-database aggregations.


2. CDC — Capture from the WAL

The CDC crate reads the Write-Ahead Log directly. Zero polling overhead on the data path — your INSERTs aren’t slowed down to feed the stream.

Quick start (Kafka sink)

use heliosdb_cdc::{CdcConfig, EventProcessor, KafkaConnector};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Read the WAL directly; checkpoint after every 1000 events.
    let config = CdcConfig::builder()
        .wal_path("/var/lib/heliosdb/wal")
        .database("mydb")
        .checkpoint_interval(1000)
        .build()?;

    // Deliver captured changes to Kafka.
    let kafka = KafkaConnector::new("localhost:9092", "heliosdb-cdc").await?;
    let mut processor = EventProcessor::new(config, Box::new(kafka));
    processor.start().await?;
    Ok(())
}
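
To confirm events are flowing, tail the topic with the stock Kafka console consumer. This assumes the second argument to KafkaConnector::new above ("heliosdb-cdc") names the destination topic:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heliosdb-cdc --from-beginning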

Filtering and transformation

use heliosdb_cdc::{EventFilter, FilterCondition, FilterRule, TransformRule};

let filter = EventFilter::new()
    .add_rule(FilterRule::include(FilterCondition::Table("users".into())))
    .add_rule(FilterRule::include(FilterCondition::Table("orders".into())))
    // Drop password column before publishing
    .add_transform(TransformRule::redact_columns("users", &["password_hash"]));

Delivery semantics

  • At-least-once, with checkpoint management
  • Checkpoint interval is configurable; default is every 1000 events
  • Failed deliveries are retried with exponential backoff (see the sketch below)
  • Schema-registry integration available for Avro
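
The retry behaviour amounts to a capped exponential backoff loop. A minimal sketch of those semantics in plain Rust; the sink retries internally, so deliver_with_backoff here is illustrative, not the crate's API:

use std::time::Duration;

/// Retry an async delivery with exponential backoff: 100 ms, 200 ms, 400 ms, …
/// capped at 30 s. Illustrates the at-least-once retry semantics only.
async fn deliver_with_backoff<F, Fut>(mut deliver: F, max_attempts: u32) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    let mut delay = Duration::from_millis(100);
    for attempt in 1..=max_attempts {
        match deliver().await {
            Ok(()) => return Ok(()),
            Err(e) if attempt == max_attempts => return Err(e), // give up
            Err(_) => {
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(Duration::from_secs(30)); // cap the backoff
            }
        }
    }
    anyhow::bail!("max_attempts must be >= 1")
}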

Sink options

Sink | Status
Apache Kafka | Production
AWS Kinesis | Production
Custom (implement the trait) | Supported
Flink CDC | Documented (heliosdb-streaming/docs/FLINK_CDC.md)

3. SQL Surface for CDC

Most operators don’t want to write Rust to inspect a stream. The Full edition exposes CDC over SQL:

-- View the last N events
SHOW CDC EVENTS LIMIT 10;
SHOW CDC EVENTS; -- last 100
-- CDC connectors
CREATE CDC CONNECTOR my_connector TYPE kafka;
SHOW CDC CONNECTORS;
ALTER CDC CONNECTOR my_connector SET enabled = false;
DROP CDC CONNECTOR my_connector;

CDC events are also published on Redis Streams when Redis protocol is enabled:

redis-cli XREAD COUNT 10 STREAMS heliosdb:cdc:orders 0
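
The same stream can be consumed programmatically. A minimal Rust sketch with the redis crate (needs its "streams" feature; assumes the Redis listener is on the default port):

use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::Commands;

fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://127.0.0.1:6379/")?;
    let mut con = client.get_connection()?;
    // Read up to 10 entries from the CDC stream, starting at the beginning.
    let reply: StreamReadReply = con.xread_options(
        &["heliosdb:cdc:orders"],
        &["0"],
        &StreamReadOptions::default().count(10),
    )?;
    for key in reply.keys {
        for entry in key.ids {
            println!("{}: {:?}", entry.id, entry.map);
        }
    }
    Ok(())
}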

4. Tenant-Level Replication (F6.21, World-First)

This is the differentiated feature. Most CDC systems replicate at the database or table level. HeliosDB Full replicates at tenant granularity — one tenant’s data, one read-only replica, one direction.

Why tenant-level?

  • DR per tenant: regulated tenants need their own replica in a specific region
  • Analytics per tenant: feed a tenant-specific read replica for OLAP without touching the OLTP tenant
  • Zero-downtime migration: move a tenant from us-east-1 to eu-central-1 in <100 ms
  • Compliance: keep EU tenant data exclusively in EU regions

Quick start

use heliosdb_tenant_replication::{
    TenantReplicationSource, TenantReplicationTarget, TenantReplicationPipeline,
    CompressionType, EncryptionType, ConflictResolution,
};
use std::time::Duration;

// Source side: which tenant to capture and how fresh the replica must stay.
let source = TenantReplicationSource::new("tenant-123", "postgres://source-db")
    .with_checkpoint_column("updated_at")
    .with_replication_lag_target(Duration::from_secs(5))
    .with_table_filter(vec!["users.*", "orders.*", "products.*"])
    .build()?;

// Target side: a read-only replica; the source wins on conflict.
let target = TenantReplicationTarget::new("tenant-123-replica", "postgres://target-db")
    .with_read_only_enforcement(true)
    .with_conflict_resolution(ConflictResolution::SourceWins)
    .build()?;

let replication = TenantReplicationPipeline::new(source, target)
    .with_compression(CompressionType::Zstd)
    .with_encryption(EncryptionType::Aes256Gcm)
    .start()
    .await?;

let status = replication.status().await?;
println!("Replication lag: {}s", status.replication_lag_seconds);
println!("Throughput: {} rows/s", status.throughput_rows_per_sec);

Patent-disclosed innovations

The crate’s README enumerates 8 patent opportunities. Highlights:

Innovation | Effect
AI-powered predictive replication | 40-60% lag reduction by prioritising hot data
Intelligent data transformation | Anonymise / aggregate / filter during replication
Semantic conflict resolution | LLM-driven merging beyond last-write-wins
Tenant mobility | Cross-region migration in <100 ms
Replication QoS | Per-tier SLA (Premium / Standard / BestEffort / Synchronous)
Bi-temporal replication | Transaction time + valid time for audit
Schema-aware compression | 3-5× better than generic zstd
Automatic failover | <30 s RTO, health-based promotion

Targets: <5 second replication lag, <30 s RTO on failover, multi-region DR built-in.
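
Wiring a lag alert against the <5 s target takes a few lines with the status() call from the quick start. A minimal sketch; the alerting side (here just eprintln!) and the integer-seconds lag field are assumptions based on the example above:

use std::time::Duration;
use heliosdb_tenant_replication::TenantReplicationPipeline;

// Poll replication status every 30 s and flag lag above the 5 s target.
async fn watch_lag(replication: &TenantReplicationPipeline) -> anyhow::Result<()> {
    loop {
        let status = replication.status().await?;
        if status.replication_lag_seconds > 5 {
            // Swap this for your real pager / alerting integration.
            eprintln!("ALERT: replication lag {}s exceeds 5s target",
                      status.replication_lag_seconds);
        }
        tokio::time::sleep(Duration::from_secs(30)).await;
    }
}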


5. Streaming Engine — Window Functions and Continuous Queries

The heliosdb-streaming crate is the OLAP-style stream processor built into the database. It's where SELECT count(*) FROM api_requests GROUP BY TUMBLE(timestamp, INTERVAL '1 minute') lives.

Tumbling windows

use heliosdb_streaming::*;
use std::time::Duration;

let engine = StreamingEngine::new().await?;
let stream = Stream::new(StreamConfig::default());

// Non-overlapping 60-second windows
let windowed = engine
    .tumbling_window(stream.clone(), Duration::from_secs(60))
    .await?;

SQL equivalent:

SELECT TUMBLE(timestamp, INTERVAL '1 minute') AS window_start,
       COUNT(*) AS event_count
FROM events
GROUP BY TUMBLE(timestamp, INTERVAL '1 minute');

Sliding windows

// 5-min windows, sliding every 1 min
engine.sliding_window(stream, Duration::from_secs(300), Duration::from_secs(60)).await?;

SQL equivalent:

SELECT HOP(timestamp, INTERVAL '1 minute', INTERVAL '5 minutes') AS window_start,
       COUNT(*) AS event_count
FROM events
GROUP BY HOP(timestamp, INTERVAL '1 minute', INTERVAL '5 minutes');

Session windows (gap-based)

engine.session_window(stream, Duration::from_secs(600)).await?; // 10-min gap
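
The SQL surface includes SESSION alongside TUMBLE and HOP (see the verification table in section 10). By analogy with the other two, a session-window query would look like this; the exact argument form is an assumption:

SELECT SESSION(timestamp, INTERVAL '10 minutes') AS window_start,
       COUNT(*) AS event_count
FROM events
GROUP BY SESSION(timestamp, INTERVAL '10 minutes');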

Continuous queries

A continuous query runs forever, processing events as they arrive and pushing results to a sink:

use std::sync::Arc;

let sink = Arc::new(CallbackSink::new(|rows| {
    for row in rows {
        println!("Result: {:?}", row);
    }
}));

let q = ContinuousQuery::new(
    "real_time_dashboard".into(),
    "SELECT COUNT(*) FROM api_requests".into(),
    sink,
    stream,
);
let handle = engine.create_continuous_query(q).await?;

Stream-table and stream-stream joins

// Enrich a stream with a static dimension table
let join = StreamTableJoin::new("users".into(), "user_id".into());
join.load_table(table_data)?;
let enriched = join.join(events_stream).await?;

// Join two streams within a time window
let join = StreamStreamJoin::new(Duration::from_secs(60))
    .with_keys("order_id".into(), "payment_order_id".into())
    .with_join_type(JoinType::Inner);
let joined = join.join(orders_stream, payments_stream).await?;

Event time vs processing time

Use event time when ordering matters and out-of-order arrival is the norm:

let cfg = StreamConfig {
    name: "events".into(),
    time_semantics: TimeSemantics::EventTime,
    allowed_lateness: Duration::from_secs(60),
    watermark_interval: Duration::from_secs(1),
};

The watermark generator supports BoundedOutOfOrderness, Punctuated, and custom strategies. The late-data handler decides whether late events are processed, processed-as-late, or dropped.
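
How those pieces interact, as a self-contained sketch (the three outcomes come from the late-data handler described above; the type names here are illustrative, not the crate's API):

#[derive(Debug)]
enum LateEventAction {
    Processed,       // on time
    ProcessedAsLate, // late, but within allowed_lateness
    Dropped,         // too late: the window has been finalised
}

// An event is late once its timestamp falls behind the watermark; within
// allowed_lateness it can still update its window.
fn classify(event_ts: u64, watermark: u64, allowed_lateness: u64) -> LateEventAction {
    if event_ts >= watermark {
        LateEventAction::Processed
    } else if event_ts + allowed_lateness >= watermark {
        LateEventAction::ProcessedAsLate
    } else {
        LateEventAction::Dropped
    }
}

fn main() {
    // Watermark at t=100 s, 60 s allowed lateness (matching the config above).
    for ts in [120, 70, 10] {
        println!("event@{ts}s: {:?}", classify(ts, 100, 60));
    }
}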


6. Cron Scheduler (F6.13) — Run Edge Functions on a Schedule

The scheduler crate runs WASM edge functions on cron schedules with full distributed coordination.

use heliosdb_scheduler::{CronScheduler, JobConfig, SchedulerConfig};

let scheduler = CronScheduler::new(SchedulerConfig::default()).await?;

// Run the cleanup_old_events edge function daily at 02:00, New York time.
let job = JobConfig::new("daily_cleanup", "cleanup_old_events", "0 2 * * *")?
    .with_timezone("America/New_York")
    .with_max_concurrent(1)
    .with_timeout_ms(300_000) // 5-minute execution timeout
    .with_catchup(true);      // replay runs missed while the node was down

scheduler.schedule_job(job).await?;
scheduler.start().await?;

Features:

  • Full cron syntax (5-field, plus an optional seconds field; see the example after this list)
  • Named timezones (America/New_York, Europe/London, Asia/Tokyo, …)
  • Distributed leader election for multi-node deployments
  • Job persistence (survive restarts; jobs reload from disk)
  • Execution history (success/failure rates per job)
  • Missed-run strategies: Skip (default), CatchUpAll, CatchUpLast, CatchUpDelayed
  • Concurrent-execution limits (semaphore-based, per-job and global)
  • REST API for full lifecycle management
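
For example, the seconds granularity uses a 6-field expression. This sketch reuses the builder from the quick start; the job names are placeholders, and the seconds-field-first convention is an assumption (it matches most cron libraries):

// "*/30 * * * * *" = every 30 seconds (seconds minute hour dom month dow).
let heartbeat = JobConfig::new("heartbeat", "emit_heartbeat", "*/30 * * * * *")?
    .with_timezone("Europe/London")
    .with_max_concurrent(1)
    .with_timeout_ms(5_000);
scheduler.schedule_job(heartbeat).await?;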

REST API summary

POST   /api/scheduler/jobs                Create a job
GET    /api/scheduler/jobs                List jobs
GET    /api/scheduler/jobs/{id}           Job detail
DELETE /api/scheduler/jobs/{id}           Remove a job
POST   /api/scheduler/jobs/{id}/enable    Enable
POST   /api/scheduler/jobs/{id}/disable   Disable
POST   /api/scheduler/jobs/{id}/trigger   Fire manually
GET    /api/scheduler/stats               Cluster-wide stats

7. Webhook Ingestion — Inbound HTTP

The webhooks crate is the symmetric counterpart to outbound CDC. It accepts inbound HTTP, verifies signatures, and routes payloads to WASM edge functions.

Built-in providers:

Provider | Signature header | Default content-type
GitHub | X-Hub-Signature-256 (sha256=…) | application/json
Stripe | Stripe-Signature (t=…,v1=…) | application/json
Shopify | X-Shopify-Hmac-Sha256 (base64) | application/json
Generic | configurable | configurable

Quick start

use heliosdb_webhooks::*;

let server = WebhookServer::new(WebhookConfig::default()).await?;

server.register_webhook(WebhookRegistration {
    name: "GitHub Push Events".into(),
    endpoint: "/webhooks/github/push".into(),
    source: WebhookSource::GitHub,
    edge_function: "process_github_push".into(),
    secret: Some(std::env::var("GITHUB_SECRET")?),
    allowed_methods: vec!["POST".into()],
    ..Default::default()
}).await?;

server.start().await?;
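
To exercise the endpoint, sign a test payload the way GitHub does: an HMAC-SHA256 over the raw body, hex-encoded behind an sha256= prefix. The path comes from the registration above; the port is an assumption for illustration:

BODY='{"action":"push"}'
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$GITHUB_SECRET" | awk '{print $2}')
curl -X POST "http://localhost:8080/webhooks/github/push" \
  -H "Content-Type: application/json" \
  -H "X-Hub-Signature-256: sha256=$SIG" \
  -d "$BODY"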

Security features

  • HMAC-SHA256 signature verification, provider-specific (sketch after this list)
  • IP whitelist (CIDR ranges) — global or per-webhook
  • Basic auth on the management API
  • Rate limits (per-endpoint or global, configurable burst)
  • Request size limits (1 MB default)
  • TLS/HTTPS support
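
What the GitHub-style check amounts to, as a standalone sketch with the hmac, sha2, and hex crates (the generic algorithm, not the crate's internal code):

use hmac::{Hmac, Mac};
use sha2::Sha256;

type HmacSha256 = Hmac<Sha256>;

/// Verify an X-Hub-Signature-256 header ("sha256=<hex>") against the raw body.
fn verify_github_signature(secret: &[u8], body: &[u8], header: &str) -> bool {
    let Some(hex_sig) = header.strip_prefix("sha256=") else { return false };
    let Ok(expected) = hex::decode(hex_sig) else { return false };
    let mut mac = HmacSha256::new_from_slice(secret).expect("HMAC takes any key length");
    mac.update(body);
    // verify_slice compares in constant time, which matters for signatures.
    mac.verify_slice(&expected).is_ok()
}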

Performance

  • Request handling overhead: <5 ms
  • Signature verification: <1 ms
  • 10,000+ requests/second per node
  • Memory footprint: ~50 MB base + ~1 KB per registered webhook

8. End-to-End — A Realistic Pipeline

A common pattern for a SaaS platform:

Stripe / Shopify
      │  webhook
      ▼
/webhooks/stripe ──► edge function (WASM trigger)
      │  INSERT INTO events
      ▼
HeliosDB  ◄── app writes (INSERT / UPDATE / DELETE)
      │
      ▼
WAL ──CDC──► Kafka
      │
      ▼
continuous queries (windowed aggregates)
      │
      ▼
real-time dashboard

Each piece is independent, observable, and unit-testable.


9. Production Checklist

CDC

  • Checkpoint interval is sized for your tolerance for replay (smaller = less replay, more overhead)
  • Schema registry is configured if you use Avro
  • Sensitive columns are redacted by TransformRule before they hit Kafka
  • Sink credentials are in a secret manager
  • Monitoring scrapes the CDC Prometheus metrics

Tenant replication

  • Source/target tenants are in different failure domains
  • Replication lag alerts are wired up (default target: <5 s)
  • Failover procedure has been tested in a non-prod environment
  • Read-only enforcement is on at the target

Streaming engine

  • allowed_lateness reflects real out-of-order arrival, not optimism
  • Continuous-query buffers are sized for your throughput
  • Sinks are idempotent (continuous queries can replay on restart; see the sketch after this list)
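
One way to make a CallbackSink idempotent is to key emitted results by window start and skip duplicates on replay. A minimal sketch; the row shape (a window_start field) is an assumption:

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Deduplicate replayed window results by window-start timestamp.
let seen: Arc<Mutex<HashSet<u64>>> = Arc::new(Mutex::new(HashSet::new()));
let seen_ref = seen.clone();
let sink = Arc::new(CallbackSink::new(move |rows| {
    let mut seen = seen_ref.lock().unwrap();
    for row in rows {
        if seen.insert(row.window_start) {
            println!("Result: {:?}", row); // first delivery: emit downstream
        } // duplicate delivery after a replay: skip silently
    }
}));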

Scheduler

  • Distributed mode is enabled if you run >1 scheduler node (otherwise jobs may run on every node)
  • Missed-run strategy matches the job’s nature (don’t CatchUpAll a billing job)
  • Persistence directory is on durable storage

Webhooks

  • Signature verification is required for every provider that supports it
  • IP whitelist is enabled for known providers (Stripe, Shopify publish their CIDR ranges)
  • Rate limiting is on (otherwise a misbehaving sender or a DoS can knock the node over)
  • HTTPS is mandatory; no plain HTTP

10. Verification Status

Component | Status notes
CDC (WAL → Kafka / Kinesis) | Production. At-least-once with checkpointing.
Tenant replication (F6.21) | Production, world-first. 8 patent disclosures, full README.
Streaming engine (windows, joins, continuous queries) | Production. SQL surface includes TUMBLE/HOP/SESSION.
Scheduler | Production. Distributed leader election + persistence.
Webhooks | Production. GitHub / Stripe / Shopify signature verification built-in; Generic for everything else.
Flink CDC integration | Documented (heliosdb-streaming/docs/FLINK_CDC.md); test with your Flink version before relying on it.



References

  • CDC source: /home/app/Helios/Full/heliosdb-replication/crates/cdc/
  • Tenant replication source: /home/app/Helios/Full/heliosdb-replication/crates/tenant-replication/ (8 patent claims, beta-customer reference doc, full performance report)
  • Streaming engine: /home/app/Helios/Full/heliosdb-streaming/
  • Scheduler: /home/app/Helios/Full/heliosdb-streaming/crates/scheduler/
  • Webhooks: /home/app/Helios/Full/heliosdb-streaming/crates/webhooks/
  • Existing tutorial — basic CDC: ../intermediate/05-cdc-events.md