Skip to content

Exactly-Once Semantics Guide

Exactly-Once Semantics Guide

Overview

HeliosDB Streaming guarantees exactly-once processing semantics using a combination of:

  1. Distributed Checkpointing with AES-256-GCM encryption
  2. Two-Phase Commit (2PC) for sinks
  3. Idempotent Source reads with offset tracking
  4. State Recovery from encrypted checkpoints

This ensures that each event is processed exactly once, even in the presence of failures.

How It Works

Architecture

Source → Operators → Checkpoint Barrier → 2PC Sink → External System
↓ ↓ ↓
Offset Track State Snapshot Pre-commit → Commit
Encrypted Storage
(S3/Distributed FS)

1. Checkpointing Mechanism

HeliosDB periodically creates consistent snapshots of all operator state.

Configuration:

use heliosdb_streaming::checkpoint::{CheckpointConfig, EncryptionConfig};
use std::time::Duration;
let checkpoint_config = CheckpointConfig {
// Checkpoint every 60 seconds
interval: Duration::from_secs(60),
// Minimum time between checkpoints (prevents thrashing)
min_pause: Duration::from_secs(10),
// Maximum concurrent checkpoints
max_concurrent: 1,
// Encryption configuration
encryption: EncryptionConfig {
algorithm: EncryptionAlgorithm::AES256GCM,
key_rotation_days: 30,
compress_before_encrypt: true,
},
// Storage location
storage: CheckpointStorage::S3 {
bucket: "heliosdb-checkpoints".to_string(),
prefix: "streaming/prod/".to_string(),
region: "us-east-1".to_string(),
},
// Cleanup old checkpoints
retain_count: 3,
// Timeout for checkpoint completion
timeout: Duration::from_secs(300), // 5 minutes
};
let env = StreamExecutionEnvironment::new()
.with_checkpoint_config(checkpoint_config)
.build()?;

2. Checkpoint Barriers

Checkpoint barriers flow through the dataflow graph ensuring all operators checkpoint at the same logical time.

Barrier Alignment:

Stream A: [e1, e2, e3, |barrier-42|, e4, e5, ...]
Stream B: [e6, e7, |barrier-42|, e8, e9, e10, ...]
Aligned Checkpoint
(both streams at barrier-42)

Implementation:

impl CheckpointBarrier {
async fn align_barriers(
&mut self,
inputs: Vec<Stream>,
barrier_id: u64
) -> Result<()> {
let mut received = HashSet::new();
// Wait for barrier from all inputs
for input in inputs {
let barrier = input.receive_barrier().await?;
assert_eq!(barrier.id, barrier_id);
received.insert(input.id);
}
// All barriers received, trigger checkpoint
self.trigger_checkpoint(barrier_id).await
}
}

3. Two-Phase Commit Sinks

Sinks implement 2PC to ensure atomic writes to external systems.

Interface:

use async_trait::async_trait;
#[async_trait]
pub trait TwoPhaseCommitSink<T> {
/// Phase 1: Pre-commit (prepare)
async fn pre_commit(&mut self, checkpoint_id: u64, transactions: Vec<T>) -> Result<()>;
/// Phase 2: Commit
async fn commit(&mut self, checkpoint_id: u64) -> Result<()>;
/// Abort on failure
async fn abort(&mut self, checkpoint_id: u64) -> Result<()>;
}

Example Implementation (Database Sink):

use heliosdb_streaming::sink::TwoPhaseCommitSink;
pub struct DatabaseSink {
connection: DatabaseConnection,
pending_transactions: HashMap<u64, Vec<Transaction>>,
}
#[async_trait]
impl TwoPhaseCommitSink<Event> for DatabaseSink {
async fn pre_commit(&mut self, checkpoint_id: u64, events: Vec<Event>) -> Result<()> {
// Start transaction
let tx = self.connection.begin().await?;
// Write all events within transaction
for event in &events {
tx.execute("INSERT INTO events VALUES ($1, $2, $3)",
&[&event.id, &event.data, &event.timestamp]).await?;
}
// Prepare transaction (does not commit yet)
tx.prepare(format!("checkpoint_{}", checkpoint_id)).await?;
// Store transaction reference
self.pending_transactions.insert(checkpoint_id, events);
Ok(())
}
async fn commit(&mut self, checkpoint_id: u64) -> Result<()> {
// Commit the prepared transaction
self.connection.commit_prepared(
format!("checkpoint_{}", checkpoint_id)
).await?;
// Remove from pending
self.pending_transactions.remove(&checkpoint_id);
Ok(())
}
async fn abort(&mut self, checkpoint_id: u64) -> Result<()> {
// Rollback the prepared transaction
self.connection.rollback_prepared(
format!("checkpoint_{}", checkpoint_id)
).await?;
// Remove from pending
self.pending_transactions.remove(&checkpoint_id);
Ok(())
}
}

4. Recovery Semantics

On failure, the system recovers from the last successful checkpoint.

Recovery Process:

  1. Detect Failure: Operator fails or becomes unresponsive
  2. Stop Processing: All operators stop accepting new events
  3. Load Checkpoint: Retrieve last successful checkpoint from storage
  4. Decrypt State: Decrypt operator state using key management
  5. Restore State: Restore all operator state to checkpoint
  6. Resume Sources: Sources resume from checkpointed offsets
  7. Abort Uncommitted: Sinks abort any pre-committed but uncommitted transactions
  8. Resume Processing: Processing continues from checkpoint

Example:

impl StreamingJob {
async fn recover_from_failure(&mut self) -> Result<()> {
info!("Starting recovery from checkpoint");
// 1. Load latest checkpoint
let checkpoint = self.checkpoint_coordinator
.load_latest_checkpoint()
.await?;
info!("Loaded checkpoint: {}", checkpoint.id);
// 2. Restore operator state
for operator in &mut self.operators {
let state = checkpoint.get_operator_state(&operator.id)?;
operator.restore_state(state).await?;
}
// 3. Restore source offsets
for source in &mut self.sources {
let offset = checkpoint.get_source_offset(&source.id)?;
source.seek_to_offset(offset).await?;
}
// 4. Abort uncommitted transactions in sinks
for sink in &mut self.sinks {
sink.abort_pending_transactions().await?;
}
// 5. Resume processing
self.start_processing().await?;
info!("Recovery complete");
Ok(())
}
}

Exactly-Once Guarantees

What Is Guaranteed

Each event processed exactly once within the streaming application State updates are consistent across all operators Sinks write each result exactly once to external systems No data loss on failures No duplicate writes to external systems

What Is NOT Guaranteed

External system reads may see partial results during recovery ❌ Side effects in user functions (e.g., logging, metrics) may occur multiple times ❌ Non-transactional sinks cannot provide exactly-once (only at-least-once)

Validation Tests

Test 1: Basic Exactly-Once Validation

#[tokio::test]
async fn test_exactly_once_no_failures() {
let config = create_test_config();
let (source, sink) = create_test_pipeline(config);
// Send 10,000 events with unique IDs
let sent_ids: HashSet<_> = (0..10_000).collect();
source.emit_events(sent_ids.clone()).await.unwrap();
// Wait for processing
source.wait_for_completion().await.unwrap();
// Verify exactly 10,000 events in sink
let received_ids = sink.get_all_event_ids().await.unwrap();
assert_eq!(received_ids.len(), 10_000);
assert_eq!(received_ids, sent_ids);
}

Test 2: Exactly-Once Under Failure

#[tokio::test]
async fn test_exactly_once_with_failure() {
let config = create_test_config();
let (source, sink) = create_test_pipeline(config);
// Send first batch
source.emit_events(0..5_000).await.unwrap();
// Force checkpoint
source.trigger_checkpoint().await.unwrap();
// Send second batch
source.emit_events(5_000..10_000).await.unwrap();
// Simulate failure before second checkpoint
source.simulate_operator_failure().await.unwrap();
// Recover from checkpoint
let (recovered_source, recovered_sink) = recover_pipeline(config).await.unwrap();
// Verify exactly 10,000 unique events (no duplicates from replay)
let received_ids = recovered_sink.get_all_event_ids().await.unwrap();
assert_eq!(received_ids.len(), 10_000);
// Verify all IDs from 0-9999 present
for i in 0..10_000 {
assert!(received_ids.contains(&i), "Missing event ID: {}", i);
}
}

Test 3: Checkpoint Encryption Validation

#[tokio::test]
async fn test_checkpoint_encryption() {
let config = CheckpointConfig {
encryption: EncryptionConfig {
algorithm: EncryptionAlgorithm::AES256GCM,
..Default::default()
},
..Default::default()
};
let (source, sink) = create_test_pipeline(config);
// Process events and checkpoint
source.emit_events(0..1_000).await.unwrap();
let checkpoint_path = source.trigger_checkpoint().await.unwrap();
// Read checkpoint file directly
let encrypted_data = std::fs::read(&checkpoint_path).unwrap();
// Verify data is encrypted (should not contain plaintext)
let plaintext = b"event_id";
assert!(!encrypted_data.windows(plaintext.len()).any(|w| w == plaintext),
"Checkpoint contains unencrypted data!");
// Verify checkpoint can be decrypted and used for recovery
let (recovered_source, recovered_sink) = recover_from_checkpoint(checkpoint_path)
.await.unwrap();
assert!(recovered_source.is_healthy().await.unwrap());
}

Performance Impact

Checkpoint Overhead

Measured on production workload (10K events/sec):

ConfigurationCheckpoint DurationThroughput ImpactLatency Impact
No encryption2.3s<1%+5ms P99
AES-256-GCM2.8s<2%+8ms P99
With compression2.1s<1%+6ms P99

Recommendation: Use encryption + compression for best security/performance balance.

2PC Latency Impact

Per-batch latency overhead:

Batch Size2PC OverheadTotal Latency
100 events+12ms45ms
1000 events+18ms62ms
10000 events+35ms110ms

Recommendation: Batch 1000-5000 events for optimal throughput/latency.

State Size Impact

Encrypted checkpoint size vs raw state:

Raw State SizeEncrypted SizeCompression Ratio
100 MB105 MB1.05x
1 GB1.08 GB1.08x
10 GB10.2 GB1.02x

Conclusion: Encryption overhead is minimal (< 10%).

Best Practices

1. Checkpoint Interval Tuning

// ❌ TOO FREQUENT: High overhead, low throughput
let config = CheckpointConfig {
interval: Duration::from_secs(10), // Every 10 seconds
..Default::default()
};
// OPTIMAL: Balanced recovery time and overhead
let config = CheckpointConfig {
interval: Duration::from_secs(60), // Every 60 seconds
min_pause: Duration::from_secs(20), // Prevent thrashing
..Default::default()
};
// ❌ TOO INFREQUENT: Long recovery time on failure
let config = CheckpointConfig {
interval: Duration::from_secs(600), // Every 10 minutes
..Default::default()
};

2. Monitor Checkpoint Health

// Emit metrics for monitoring
metrics::histogram!("checkpoint.duration_ms", duration.as_millis() as f64);
metrics::gauge!("checkpoint.state_size_bytes", state_size as f64);
metrics::counter!("checkpoint.failures", 1);
// Alert on anomalies
if duration > Duration::from_secs(300) {
alert!("Checkpoint taking too long: {:?}", duration);
}

3. Test Recovery Scenarios Regularly

// Chaos engineering: Randomly inject failures
#[tokio::test]
async fn test_random_failures() {
let config = create_test_config();
let (source, sink) = create_test_pipeline(config);
for _ in 0..10 {
source.emit_events(0..1_000).await.unwrap();
// 20% chance of failure
if rand::random::<f64>() < 0.2 {
source.simulate_failure().await.unwrap();
(source, sink) = recover_pipeline(config).await.unwrap();
}
}
// Verify exactly-once despite failures
let count = sink.count_unique_events().await.unwrap();
assert_eq!(count, 10_000);
}

4. Use Idempotent Sinks When Possible

// Even if 2PC fails, idempotent sinks prevent duplicates
impl IdempotentSink for DatabaseSink {
async fn write_idempotent(&mut self, event: &Event) -> Result<()> {
// Use UPSERT to make writes idempotent
self.connection.execute(
"INSERT INTO events (id, data) VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET data = $2",
&[&event.id, &event.data]
).await?;
Ok(())
}
}

Troubleshooting

Checkpoint Timeouts

Symptom: Checkpoints failing with timeout errors

Solutions:

  1. Increase timeout in checkpoint config
  2. Reduce state size by enabling TTL
  3. Use faster checkpoint storage (SSD vs HDD)

High Checkpoint Duration

Symptom: Checkpoints taking >5 minutes

Solutions:

  1. Enable compression
  2. Increase checkpoint interval to reduce frequency
  3. Scale out parallelism to reduce per-task state

Recovery Failures

Symptom: Job fails to recover from checkpoint

Solutions:

  1. Verify checkpoint storage is accessible
  2. Check encryption keys are available
  3. Validate checkpoint format compatibility
  4. Review logs for specific error messages

References


Last Updated: November 2025 Version: 1.0