Database Sink Connector Specification

Version: 1.0
Status: Implementation
Owner: F1.3 Stream Processing Team
Date: 2025-10-29

Overview

The Database Sink Connector provides reliable, exactly-once delivery of streaming data to relational databases (PostgreSQL, MySQL, Oracle) using the two-phase commit (2PC) protocol.

Architecture

┌─────────────────────────────────────────────────────────────┐
│ DatabaseSinkConnector │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌───────────────┐ │
│ │ Write Buffer │───▶│ Transaction │───▶│ Connection │ │
│ │ (Batching) │ │ Manager │ │ Pool │ │
│ └──────────────┘ │ (2PC) │ └───────────────┘ │
│ └──────────────┘ │
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Backpressure │ │ Checkpoint │ │
│ │ Controller │ │ Manager │ │
│ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

Core Components

1. DatabaseSinkConnector

Main entry point for sink operations.

pub struct DatabaseSinkConnector {
    config: DatabaseSinkConfig,
    schema: Arc<Schema>,
    transaction_manager: Arc<TransactionManager>,
    connection_pool: Arc<ConnectionPool>,
    write_buffer: Arc<RwLock<WriteBuffer>>,
    backpressure: Arc<BackpressureController>,
    rows_written: Arc<RwLock<u64>>,
    bytes_written: Arc<RwLock<u64>>,
}

impl DatabaseSinkConnector {
    pub async fn new(config: DatabaseSinkConfig) -> Result<Self>;
    pub async fn write(&mut self, rows: Vec<Row>) -> Result<()>;
    pub async fn flush(&mut self) -> Result<()>;
    pub async fn checkpoint(&mut self) -> Result<CheckpointMetadata>;
    pub async fn commit(&mut self) -> Result<()>;
    pub async fn abort(&mut self) -> Result<()>;
    pub fn metrics(&self) -> SinkMetrics;
}

2. TransactionManager (Two-Phase Commit)

Coordinates distributed transactions for exactly-once semantics.

pub struct TransactionManager {
    active_transactions: Arc<RwLock<HashMap<TransactionId, Transaction>>>,
    prepared_transactions: Arc<RwLock<HashMap<TransactionId, PreparedTransaction>>>,
    state_backend: Arc<dyn StateBackend>,
}

pub struct Transaction {
    id: TransactionId,
    connection: DatabaseConnection,
    writes: Vec<WriteOperation>,
    status: TransactionStatus,
    created_at: SystemTime,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionStatus {
    Active,     // Transaction is open and receiving writes
    Preparing,  // Phase 1 - preparing to commit
    Prepared,   // Phase 1 complete - ready to commit
    Committing, // Phase 2 - committing
    Committed,  // Phase 2 complete - successfully committed
    Aborting,   // Rolling back
    Aborted,    // Rolled back
}

impl TransactionManager {
    pub async fn begin(&self) -> Result<TransactionId>;
    pub async fn prepare(&self, txn_id: TransactionId) -> Result<()>;
    pub async fn commit(&self, txn_id: TransactionId) -> Result<()>;
    pub async fn abort(&self, txn_id: TransactionId) -> Result<()>;
    pub async fn recover(&self) -> Result<Vec<TransactionId>>;
}
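The legal status transitions can be sketched as a small validation function the manager might consult before mutating a transaction. This is a std-only illustration mirroring the enum above; `can_transition` is not part of the spec'd API:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionStatus {
    Active,
    Preparing,
    Prepared,
    Committing,
    Committed,
    Aborting,
    Aborted,
}

// Returns true if `from -> to` is a legal 2PC state transition.
pub fn can_transition(from: TransactionStatus, to: TransactionStatus) -> bool {
    use TransactionStatus::*;
    matches!(
        (from, to),
        (Active, Preparing)
            | (Preparing, Prepared)
            | (Prepared, Committing)
            | (Committing, Committed)
            // Abort is legal from any non-terminal state.
            | (Active | Preparing | Prepared | Committing, Aborting)
            | (Aborting, Aborted)
    )
}
```

Enforcing this at one choke point keeps illegal moves (e.g. committing an aborted transaction) from ever reaching the database.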

3. ConnectionPool

Manages database connections with pooling and health checks.

pub struct ConnectionPool {
    config: ConnectionPoolConfig,
    db_type: DatabaseType,
    active_connections: Arc<RwLock<Vec<PooledConnection>>>,
    idle_connections: Arc<RwLock<Vec<PooledConnection>>>,
    connection_semaphore: Arc<Semaphore>,
    health_checker: Arc<HealthChecker>,
}

pub struct PooledConnection {
    conn: Box<dyn DatabaseConnection>,
    id: ConnectionId,
    created_at: SystemTime,
    last_used: SystemTime,
    in_transaction: bool,
}

impl ConnectionPool {
    pub async fn new(config: ConnectionPoolConfig) -> Result<Self>;
    pub async fn acquire(&self) -> Result<PooledConnection>;
    pub async fn release(&self, conn: PooledConnection) -> Result<()>;
    pub async fn health_check(&self) -> Result<PoolHealth>;
}

4. WriteBuffer

Batches writes for throughput, with configurable size- and time-based flushing.

pub struct WriteBuffer {
    buffer: Vec<Row>,
    batch_size: usize,
    flush_interval: Duration,
    last_flush: Instant,
    total_bytes: usize,
}

impl WriteBuffer {
    pub fn new(batch_size: usize, flush_interval: Duration) -> Self;
    pub fn add(&mut self, row: Row) -> Result<bool>; // Returns true if flush needed
    pub fn should_flush(&self) -> bool;
    pub fn drain(&mut self) -> Vec<Row>;
    pub fn len(&self) -> usize;
}
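A minimal, std-only sketch of the buffering behavior, with `Row` stubbed as a `String` for illustration (the real type carries column values), and `add` returning `bool` directly rather than `Result<bool>`:

```rust
use std::time::{Duration, Instant};

type Row = String; // stub; the real Row holds typed column values

pub struct WriteBuffer {
    buffer: Vec<Row>,
    batch_size: usize,
    flush_interval: Duration,
    last_flush: Instant,
}

impl WriteBuffer {
    pub fn new(batch_size: usize, flush_interval: Duration) -> Self {
        Self {
            buffer: Vec::new(),
            batch_size,
            flush_interval,
            last_flush: Instant::now(),
        }
    }

    // Returns true if the caller should flush after this add.
    pub fn add(&mut self, row: Row) -> bool {
        self.buffer.push(row);
        self.should_flush()
    }

    // Combined size-based and time-based trigger.
    pub fn should_flush(&self) -> bool {
        self.buffer.len() >= self.batch_size
            || self.last_flush.elapsed() >= self.flush_interval
    }

    // Hands the batch to the caller and resets the flush timer.
    pub fn drain(&mut self) -> Vec<Row> {
        self.last_flush = Instant::now();
        std::mem::take(&mut self.buffer)
    }

    pub fn len(&self) -> usize {
        self.buffer.len()
    }
}
```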

Two-Phase Commit Protocol

Phase 1: Prepare

  1. Open Transaction

    BEGIN TRANSACTION;
  2. Execute Writes

    INSERT INTO table VALUES (...);
    UPDATE table SET ... WHERE ...;
  3. Prepare Transaction (PostgreSQL)

    PREPARE TRANSACTION 'txn_id_12345';
  4. Persist Transaction State

    • Store transaction ID, status, and operations in state backend
    • Ensure durability for crash recovery

Phase 2: Commit

  1. Commit Prepared Transaction

    COMMIT PREPARED 'txn_id_12345';
  2. Update State Backend

    • Mark transaction as committed
    • Clean up transaction metadata

Error Handling

Scenario 1: Prepare Fails

  • Rollback transaction immediately
  • Retry with exponential backoff
  • Report failure after max retries
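The retry schedule from the sample configuration (`retry_backoff_ms: [100, 500, 2000]`) can be applied with a small helper. `backoff_ms` is a hypothetical name; the real connector would plug something like it into its retry loop:

```rust
// Picks the delay for a given retry attempt (0-based) from the configured
// backoff schedule, clamping to the last entry once the schedule is
// exhausted. Returns None when max_retries is reached, i.e. report failure.
pub fn backoff_ms(schedule: &[u64], attempt: usize, max_retries: usize) -> Option<u64> {
    if attempt >= max_retries {
        return None; // give up: report failure to the caller
    }
    schedule
        .get(attempt)
        .or_else(|| schedule.last())
        .copied()
}
```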

Scenario 2: Commit Fails

  • Transaction is in PREPARED state
  • On recovery, retry commit
  • Use transaction timeout for cleanup

Scenario 3: Crash During Commit

  • On restart, check state backend for PREPARED transactions
  • Attempt to commit all PREPARED transactions
  • Timeout and abort stale transactions (> 1 hour old)
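The restart-time decision in Scenario 3 can be sketched as a pure function over the recovered transaction state and its age. The names here are illustrative; the one-hour staleness cutoff comes from the scenario above:

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveredState {
    Prepared, // phase 1 completed before the crash
    InFlight, // crashed before reaching PREPARED
    Finished, // already committed or aborted
}

#[derive(Debug, PartialEq, Eq)]
pub enum RecoveryAction {
    CommitPrepared, // COMMIT PREPARED 'txn_id'
    Abort,          // ROLLBACK / ROLLBACK PREPARED
    Ignore,         // nothing to do
}

pub fn recovery_action(state: RecoveredState, age: Duration) -> RecoveryAction {
    const STALE: Duration = Duration::from_secs(3600); // > 1 hour: abort

    match state {
        RecoveredState::Prepared if age <= STALE => RecoveryAction::CommitPrepared,
        RecoveredState::Prepared => RecoveryAction::Abort, // stale prepared txn
        RecoveredState::InFlight => RecoveryAction::Abort, // never prepared: roll back
        RecoveredState::Finished => RecoveryAction::Ignore,
    }
}
```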

Write Modes

1. Insert Mode

INSERT INTO table (col1, col2, col3) VALUES ($1, $2, $3);

2. Upsert Mode (PostgreSQL)

INSERT INTO table (col1, col2, col3) VALUES ($1, $2, $3)
ON CONFLICT (key_col) DO UPDATE SET col2 = EXCLUDED.col2, col3 = EXCLUDED.col3;

3. Upsert Mode (MySQL)

INSERT INTO table (col1, col2, col3) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE col2 = VALUES(col2), col3 = VALUES(col3);

4. Replace Mode

BEGIN TRANSACTION;
DELETE FROM table WHERE key_col = $1;
INSERT INTO table (col1, col2, col3) VALUES ($1, $2, $3);
COMMIT;
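How a write mode maps to SQL text can be sketched for the PostgreSQL upsert case. This is a hypothetical builder, not the connector's actual code path; table and column identifiers are assumed to be pre-validated, since identifiers cannot be bound as `$n` parameters:

```rust
// Builds a PostgreSQL upsert statement: every non-key column is updated
// from EXCLUDED on conflict with the key columns.
pub fn build_upsert_pg(table: &str, cols: &[&str], key_cols: &[&str]) -> String {
    let placeholders: Vec<String> = (1..=cols.len()).map(|i| format!("${}", i)).collect();
    let updates: Vec<String> = cols
        .iter()
        .filter(|c| !key_cols.contains(*c)) // key columns are never updated
        .map(|c| format!("{c} = EXCLUDED.{c}"))
        .collect();
    format!(
        "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET {}",
        table,
        cols.join(", "),
        placeholders.join(", "),
        key_cols.join(", "),
        updates.join(", ")
    )
}
```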

Batching Strategy

Size-Based Batching

if buffer.len() >= config.batch_size {
    flush().await?;
}

Time-Based Batching

if buffer.last_flush.elapsed() >= config.flush_interval {
    flush().await?;
}

Combined Strategy

if buffer.len() >= config.batch_size
    || buffer.last_flush.elapsed() >= config.flush_interval
{
    flush().await?;
}

Backpressure Integration

// Check backpressure before accepting writes
if backpressure.should_throttle() {
    let decision = backpressure.make_decision();
    match decision {
        BackpressureDecision::Continue => {
            // Accept write
        }
        BackpressureDecision::Slow { delay_ms } => {
            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
        }
        BackpressureDecision::Pause { duration_ms } => {
            tokio::time::sleep(Duration::from_millis(duration_ms)).await;
        }
        BackpressureDecision::Drop => {
            return Err(DatabaseError::Backpressure("Rate limit exceeded".into()));
        }
    }
}

Configuration

database:
  connection_string: "postgresql://user:pass@host:5432/db"
  db_type: PostgreSQL
  pool:
    min_connections: 2
    max_connections: 10
    connect_timeout_secs: 30
    idle_timeout_secs: 600
    max_lifetime_secs: 1800
table_name: "events"
write_mode:
  type: Upsert
  key_columns: ["event_id", "user_id"]
batch_size: 1000
flush_interval_secs: 5
enable_2pc: true

# Advanced settings
transaction_timeout_secs: 300  # 5 minutes
max_retries: 3
retry_backoff_ms: [100, 500, 2000]
enable_backpressure: true

Metrics

pub struct SinkMetrics {
    pub rows_written: u64,
    pub bytes_written: u64,
    pub batches_flushed: u64,
    pub transactions_committed: u64,
    pub transactions_aborted: u64,
    pub write_errors: u64,
    pub avg_batch_size: f64,
    pub avg_latency_ms: f64,
    pub throughput_rows_per_sec: f64,
}
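The two ratio metrics can be derived from the raw counters; a sketch (function names mirror the `SinkMetrics` fields, and the zero guards keep the metrics well-defined before the first flush):

```rust
// avg_batch_size = rows_written / batches_flushed, guarding the
// pre-first-flush case where no batch has been emitted yet.
pub fn avg_batch_size(rows_written: u64, batches_flushed: u64) -> f64 {
    if batches_flushed == 0 {
        0.0
    } else {
        rows_written as f64 / batches_flushed as f64
    }
}

// throughput_rows_per_sec over a measured wall-clock window.
pub fn throughput_rows_per_sec(rows_written: u64, elapsed_secs: f64) -> f64 {
    if elapsed_secs <= 0.0 {
        0.0
    } else {
        rows_written as f64 / elapsed_secs
    }
}
```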

Testing Strategy

Unit Tests (7 tests)

  1. test_single_record_write

    • Write single row
    • Verify flush called
    • Check metrics updated
  2. test_batch_writes

    • Write multiple rows (< batch size)
    • Verify buffering
    • Trigger flush
    • Verify batch written
  3. test_transaction_commit

    • Begin transaction
    • Execute writes
    • Prepare transaction
    • Commit transaction
    • Verify data persisted
  4. test_transaction_rollback

    • Begin transaction
    • Execute writes
    • Simulate error
    • Verify rollback
    • Check data not persisted
  5. test_exactly_once

    • Write batch
    • Checkpoint
    • Crash simulation
    • Recover
    • Verify no duplicates
  6. test_error_recovery

    • Inject connection failure
    • Verify retry logic
    • Check exponential backoff
    • Verify eventual success
  7. test_performance

    • Write 10,000 rows
    • Measure throughput (> 5,000 rows/sec)
    • Measure latency (< 200ms p99)
    • Verify resource usage

Integration Tests (5 tests)

  1. test_postgresql_end_to_end

    • Connect to real PostgreSQL
    • Create test table
    • Write 1,000 rows
    • Verify all data
    • Clean up
  2. test_mysql_upsert

    • Connect to MySQL
    • Insert initial data
    • Upsert updates
    • Verify no duplicates
  3. test_2pc_crash_recovery

    • Start transaction
    • Prepare
    • Kill process
    • Restart
    • Verify commit completed
  4. test_backpressure_throttling

    • Write at high rate
    • Monitor backpressure
    • Verify throttling
    • Check no data loss
  5. test_concurrent_writes

    • Multiple sink instances
    • Write to same table
    • Verify no conflicts
    • Check data integrity

Performance Targets

| Metric | Target | Measurement |
|---|---|---|
| Throughput | > 10,000 rows/sec | Single sink instance |
| Latency (p50) | < 50ms | Write to flush |
| Latency (p99) | < 200ms | Write to flush |
| Batch efficiency | > 80% | avg_batch_size / batch_size |
| Connection utilization | 50-80% | Active connections / max |
| Memory per sink | < 100MB | Steady state |
| Transaction success rate | > 99.9% | Commits / total |

Recovery Guarantees

Exactly-Once Semantics

  1. Idempotent Writes: Use upsert mode with unique keys
  2. Transaction Deduplication: Store transaction IDs in state backend
  3. Checkpoint Coordination: Align checkpoints with transaction commits
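Transaction deduplication (point 2 above) can be sketched with a committed-ID set; in the real design this set lives in the state backend rather than in memory, and the ID type is illustrative:

```rust
use std::collections::HashSet;

// Tracks committed transaction IDs so a replayed commit after recovery
// can be detected and skipped.
pub struct TxnDedup {
    committed: HashSet<u64>, // persisted in the state backend in practice
}

impl TxnDedup {
    pub fn new() -> Self {
        Self { committed: HashSet::new() }
    }

    // Returns true if this transaction should be committed now; false if it
    // was already committed and the duplicate should be skipped.
    pub fn try_commit(&mut self, txn_id: u64) -> bool {
        self.committed.insert(txn_id)
    }
}
```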

Failure Scenarios

| Failure | Recovery Action | Guarantee |
|---|---|---|
| Network partition | Retry with backoff | At-least-once |
| Database down | Pause, retry when available | At-least-once |
| Sink crash | Recover from checkpoint, replay | Exactly-once (with 2PC) |
| Duplicate checkpoint | Skip already committed transactions | Exactly-once |
| Transaction timeout | Abort and retry | At-least-once |

Implementation Phases

Phase 1: Basic Sink (Current - Day 3)

  • ✅ Sink structure and buffering
  • 🔲 Connection pooling
  • 🔲 Simple INSERT operations
  • 🔲 Basic tests

Phase 2: Two-Phase Commit (Day 3-4)

  • 🔲 TransactionManager implementation
  • 🔲 PREPARE/COMMIT PREPARED support
  • 🔲 State backend integration
  • 🔲 Crash recovery tests

Phase 3: Advanced Features (Day 4)

  • 🔲 Upsert mode
  • 🔲 Replace mode
  • 🔲 Backpressure integration
  • 🔲 Performance optimization

Phase 4: Production Readiness (Day 5)

  • 🔲 Integration tests
  • 🔲 Performance benchmarks
  • 🔲 Documentation
  • 🔲 Monitoring/observability

API Examples

Basic Usage

use heliosdb_streaming::connectors::database::*;

// Configuration
let config = DatabaseSinkConfig {
    database: DatabaseConfig {
        connection_string: "postgresql://localhost/mydb".to_string(),
        db_type: DatabaseType::PostgreSQL,
        pool: ConnectionPoolConfig::default(),
    },
    table_name: "events".to_string(),
    write_mode: WriteMode::Insert,
    batch_size: 1000,
    flush_interval_secs: 5,
    enable_2pc: true,
};

// Create sink
let mut sink = DatabaseSinkConnector::new(config).await?;

// Write data
let rows = vec![
    Row::new(hashmap! {
        "id".to_string() => Value::Integer(1),
        "name".to_string() => Value::String("Alice".to_string()),
    }),
];
sink.write(rows).await?;

// Flush
sink.flush().await?;

// Checkpoint
let checkpoint = sink.checkpoint().await?;

// Metrics
let metrics = sink.metrics();
println!("Rows written: {}", metrics.rows_written);

With Upsert

let config = DatabaseSinkConfig {
    // ...
    write_mode: WriteMode::Upsert {
        key_columns: vec!["id".to_string()],
    },
    // ...
};
