Database Sink Connector Specification
Version: 1.0 | Status: Implementation | Owner: F1.3 Stream Processing Team | Date: 2025-10-29
Overview
The Database Sink Connector provides reliable, exactly-once delivery of streaming data to relational databases (PostgreSQL, MySQL, Oracle) using the two-phase commit (2PC) protocol.
Architecture
```
┌─────────────────────────────────────────────────────────────┐
│                    DatabaseSinkConnector                     │
├─────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌──────────────┐    ┌──────────────┐    ┌───────────────┐   │
│  │ Write Buffer │───▶│ Transaction  │───▶│  Connection   │   │
│  │  (Batching)  │    │   Manager    │    │     Pool      │   │
│  └──────────────┘    │    (2PC)     │    └───────────────┘   │
│                      └──────────────┘                        │
│                                                               │
│  ┌──────────────┐    ┌──────────────┐                        │
│  │ Backpressure │    │  Checkpoint  │                        │
│  │  Controller  │    │   Manager    │                        │
│  └──────────────┘    └──────────────┘                        │
│                                                               │
└─────────────────────────────────────────────────────────────┘
```
Core Components
1. DatabaseSinkConnector
Main entry point for sink operations.
```rust
pub struct DatabaseSinkConnector {
    config: DatabaseSinkConfig,
    schema: Arc<Schema>,
    transaction_manager: Arc<TransactionManager>,
    connection_pool: Arc<ConnectionPool>,
    write_buffer: Arc<RwLock<WriteBuffer>>,
    backpressure: Arc<BackpressureController>,
    rows_written: Arc<RwLock<u64>>,
    bytes_written: Arc<RwLock<u64>>,
}
```
```rust
impl DatabaseSinkConnector {
    pub async fn new(config: DatabaseSinkConfig) -> Result<Self>;
    pub async fn write(&mut self, rows: Vec<Row>) -> Result<()>;
    pub async fn flush(&mut self) -> Result<()>;
    pub async fn checkpoint(&mut self) -> Result<CheckpointMetadata>;
    pub async fn commit(&mut self) -> Result<()>;
    pub async fn abort(&mut self) -> Result<()>;
    pub fn metrics(&self) -> SinkMetrics;
}
```
2. TransactionManager (Two-Phase Commit)
Coordinates distributed transactions for exactly-once semantics.
```rust
pub struct TransactionManager {
    active_transactions: Arc<RwLock<HashMap<TransactionId, Transaction>>>,
    prepared_transactions: Arc<RwLock<HashMap<TransactionId, PreparedTransaction>>>,
    state_backend: Arc<dyn StateBackend>,
}
```
```rust
pub struct Transaction {
    id: TransactionId,
    connection: DatabaseConnection,
    writes: Vec<WriteOperation>,
    status: TransactionStatus,
    created_at: SystemTime,
}
```
```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionStatus {
    Active,     // Transaction is open and receiving writes
    Preparing,  // Phase 1 - preparing to commit
    Prepared,   // Phase 1 complete - ready to commit
    Committing, // Phase 2 - committing
    Committed,  // Phase 2 complete - successfully committed
    Aborting,   // Rolling back
    Aborted,    // Rolled back
}
```
```rust
impl TransactionManager {
    pub async fn begin(&self) -> Result<TransactionId>;
    pub async fn prepare(&self, txn_id: TransactionId) -> Result<()>;
    pub async fn commit(&self, txn_id: TransactionId) -> Result<()>;
    pub async fn abort(&self, txn_id: TransactionId) -> Result<()>;
    pub async fn recover(&self) -> Result<Vec<TransactionId>>;
}
```
3. ConnectionPool
Manages database connections with pooling and health checks.
```rust
pub struct ConnectionPool {
    config: ConnectionPoolConfig,
    db_type: DatabaseType,
    active_connections: Arc<RwLock<Vec<PooledConnection>>>,
    idle_connections: Arc<RwLock<Vec<PooledConnection>>>,
    connection_semaphore: Arc<Semaphore>,
    health_checker: Arc<HealthChecker>,
}
```
```rust
pub struct PooledConnection {
    conn: Box<dyn DatabaseConnection>,
    id: ConnectionId,
    created_at: SystemTime,
    last_used: SystemTime,
    in_transaction: bool,
}
```
```rust
impl ConnectionPool {
    pub async fn new(config: ConnectionPoolConfig) -> Result<Self>;
    pub async fn acquire(&self) -> Result<PooledConnection>;
    pub async fn release(&self, conn: PooledConnection) -> Result<()>;
    pub async fn health_check(&self) -> Result<PoolHealth>;
}
```
4. WriteBuffer
Batches writes for performance with configurable flushing.
```rust
pub struct WriteBuffer {
    buffer: Vec<Row>,
    batch_size: usize,
    flush_interval: Duration,
    last_flush: Instant,
    total_bytes: usize,
}
```
```rust
impl WriteBuffer {
    pub fn new(batch_size: usize, flush_interval: Duration) -> Self;
    pub fn add(&mut self, row: Row) -> Result<bool>; // Returns true if flush needed
    pub fn should_flush(&self) -> bool;
    pub fn drain(&mut self) -> Vec<Row>;
    pub fn len(&self) -> usize;
}
```
Two-Phase Commit Protocol
Phase 1: Prepare
1. Open Transaction

```sql
BEGIN TRANSACTION;
```

2. Execute Writes

```sql
INSERT INTO table VALUES (...);
UPDATE table SET ... WHERE ...;
```

3. Prepare Transaction (PostgreSQL)

```sql
PREPARE TRANSACTION 'txn_id_12345';
```

4. Persist Transaction State

- Store transaction ID, status, and operations in state backend
- Ensure durability for crash recovery
Phase 2: Commit
1. Commit Prepared Transaction

```sql
COMMIT PREPARED 'txn_id_12345';
```

2. Update State Backend

- Mark transaction as committed
- Clean up transaction metadata
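Putting the two phases together, the following is a minimal sketch of how a coordinator might drive one batch through the TransactionManager API defined above, assuming TransactionId is Copy. The execute_writes helper is a hypothetical stand-in for issuing the buffered statements on the transaction's connection; it is not part of the specified API.

```rust
// Sketch: one write batch driven through both 2PC phases.
// Only begin/prepare/commit/abort come from the TransactionManager API above;
// `execute_writes` is an illustrative placeholder.
async fn write_batch_2pc(manager: &TransactionManager, rows: Vec<Row>) -> Result<()> {
    // Phase 1: open the transaction, run the writes, PREPARE, and persist state.
    let txn_id = manager.begin().await?;
    if let Err(e) = execute_writes(txn_id, &rows).await {
        // Nothing was prepared yet, so roll back immediately (Scenario 1 below).
        manager.abort(txn_id).await?;
        return Err(e);
    }
    manager.prepare(txn_id).await?;

    // Phase 2: COMMIT PREPARED, then the transaction is marked committed in the
    // state backend and its metadata cleaned up.
    manager.commit(txn_id).await?;
    Ok(())
}
```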
Error Handling
Scenario 1: Prepare Fails
- Rollback transaction immediately
- Retry with exponential backoff
- Report failure after max retries
Scenario 2: Commit Fails
- Transaction is in PREPARED state
- On recovery, retry commit
- Use transaction timeout for cleanup
Scenario 3: Crash During Commit
- On restart, check state backend for PREPARED transactions
- Attempt to commit all PREPARED transactions
- Timeout and abort stale transactions (> 1 hour old)
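For Scenario 3, a minimal recovery sketch at startup, assuming recover() returns the transaction IDs found in PREPARED state; the prepared_at lookup and the one-hour constant are illustrative assumptions rather than specified API.

```rust
// Sketch: startup recovery of transactions left in PREPARED state.
// recover/commit/abort come from the TransactionManager API; `prepared_at`
// is a hypothetical lookup into the state backend.
const STALE_AFTER: Duration = Duration::from_secs(60 * 60); // abort if older than 1 hour

async fn recover_prepared(manager: &TransactionManager) -> Result<()> {
    for txn_id in manager.recover().await? {
        let age = prepared_at(txn_id).await?.elapsed().unwrap_or_default();
        if age > STALE_AFTER {
            // Stale transaction: time it out and roll it back.
            manager.abort(txn_id).await?;
        } else {
            // Finish the Phase 2 commit that the crash interrupted.
            manager.commit(txn_id).await?;
        }
    }
    Ok(())
}
```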
Write Modes
1. Insert Mode
```sql
INSERT INTO table (col1, col2, col3) VALUES ($1, $2, $3);
```
2. Upsert Mode (PostgreSQL)
```sql
INSERT INTO table (col1, col2, col3) VALUES ($1, $2, $3)
ON CONFLICT (key_col) DO UPDATE SET
  col2 = EXCLUDED.col2,
  col3 = EXCLUDED.col3;
```
3. Upsert Mode (MySQL)
```sql
INSERT INTO table (col1, col2, col3) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
  col2 = VALUES(col2),
  col3 = VALUES(col3);
```
4. Replace Mode
```sql
BEGIN TRANSACTION;
DELETE FROM table WHERE key_col = $1;
INSERT INTO table (col1, col2, col3) VALUES ($1, $2, $3);
COMMIT;
```
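The spec does not prescribe how these statements are generated. As one possible shape, the sketch below derives the PostgreSQL-flavoured Insert and Upsert statements from the table name, column list, and configured key_columns; the function and its signature are illustrative only, and Replace mode is elided.

```rust
// Sketch: deriving PostgreSQL-style statements from the configured write mode.
// WriteMode::Insert and WriteMode::Upsert { key_columns } follow the config
// shown later in this spec; the builder itself is not part of the connector API.
fn build_statement(mode: &WriteMode, table: &str, columns: &[String]) -> String {
    let cols = columns.join(", ");
    let values = (1..=columns.len())
        .map(|i| format!("${i}"))
        .collect::<Vec<_>>()
        .join(", ");
    match mode {
        WriteMode::Insert => format!("INSERT INTO {table} ({cols}) VALUES ({values});"),
        WriteMode::Upsert { key_columns } => {
            // Every non-key column is overwritten from EXCLUDED on conflict.
            let updates = columns
                .iter()
                .filter(|c| !key_columns.contains(c))
                .map(|c| format!("{c} = EXCLUDED.{c}"))
                .collect::<Vec<_>>()
                .join(", ");
            format!(
                "INSERT INTO {table} ({cols}) VALUES ({values}) ON CONFLICT ({keys}) DO UPDATE SET {updates};",
                keys = key_columns.join(", ")
            )
        }
        _ => unimplemented!("Replace mode elided in this sketch"),
    }
}
```

A MySQL variant would swap the $n placeholders for ? and the ON CONFLICT clause for ON DUPLICATE KEY UPDATE, matching the statements above.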
Batching Strategy
Size-Based Batching
```rust
if buffer.len() >= config.batch_size {
    flush().await?;
}
```
Time-Based Batching
```rust
if buffer.last_flush.elapsed() >= config.flush_interval {
    flush().await?;
}
```
Combined Strategy
```rust
if buffer.len() >= config.batch_size
    || buffer.last_flush.elapsed() >= config.flush_interval
{
    flush().await?;
}
```
Backpressure Integration
```rust
// Check backpressure before accepting writes
if backpressure.should_throttle() {
    let decision = backpressure.make_decision();
    match decision {
        BackpressureDecision::Continue => {
            // Accept write
        }
        BackpressureDecision::Slow { delay_ms } => {
            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
        }
        BackpressureDecision::Pause { duration_ms } => {
            tokio::time::sleep(Duration::from_millis(duration_ms)).await;
        }
        BackpressureDecision::Drop => {
            return Err(DatabaseError::Backpressure("Rate limit exceeded".into()));
        }
    }
}
```
Configuration
```yaml
database:
  connection_string: "postgresql://user:pass@host:5432/db"
  db_type: PostgreSQL
  pool:
    min_connections: 2
    max_connections: 10
    connect_timeout_secs: 30
    idle_timeout_secs: 600
    max_lifetime_secs: 1800

table_name: "events"

write_mode:
  type: Upsert
  key_columns: ["event_id", "user_id"]

batch_size: 1000
flush_interval_secs: 5
enable_2pc: true

# Advanced settings
transaction_timeout_secs: 300  # 5 minutes
max_retries: 3
retry_backoff_ms: [100, 500, 2000]
enable_backpressure: true
```
Metrics
```rust
pub struct SinkMetrics {
    pub rows_written: u64,
    pub bytes_written: u64,
    pub batches_flushed: u64,
    pub transactions_committed: u64,
    pub transactions_aborted: u64,
    pub write_errors: u64,
    pub avg_batch_size: f64,
    pub avg_latency_ms: f64,
    pub throughput_rows_per_sec: f64,
}
```
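The counter fields are incremented on the write path; the aggregate fields can be derived from them. A minimal sketch, assuming the sink also tracks how long it has been running; the helper below is illustrative, not part of the API.

```rust
// Sketch: deriving SinkMetrics aggregates from the raw counters.
// `elapsed` is the time since the sink started (or since the metrics window began).
fn derive_aggregates(rows_written: u64, batches_flushed: u64, elapsed: Duration) -> (f64, f64) {
    // avg_batch_size: rows per flushed batch.
    let avg_batch_size = if batches_flushed > 0 {
        rows_written as f64 / batches_flushed as f64
    } else {
        0.0
    };
    // throughput_rows_per_sec: rows over wall-clock time.
    let secs = elapsed.as_secs_f64();
    let throughput = if secs > 0.0 { rows_written as f64 / secs } else { 0.0 };
    (avg_batch_size, throughput)
}
```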
Testing Strategy
Unit Tests (7 tests)

- test_single_record_write
  - Write single row
  - Verify flush called
  - Check metrics updated

- test_batch_writes (see the sketch after this list)
  - Write multiple rows (< batch size)
  - Verify buffering
  - Trigger flush
  - Verify batch written

- test_transaction_commit
  - Begin transaction
  - Execute writes
  - Prepare transaction
  - Commit transaction
  - Verify data persisted

- test_transaction_rollback
  - Begin transaction
  - Execute writes
  - Simulate error
  - Verify rollback
  - Check data not persisted

- test_exactly_once
  - Write batch
  - Checkpoint
  - Crash simulation
  - Recover
  - Verify no duplicates

- test_error_recovery
  - Inject connection failure
  - Verify retry logic
  - Check exponential backoff
  - Verify eventual success

- test_performance
  - Write 10,000 rows
  - Measure throughput (> 5,000 rows/sec)
  - Measure latency (< 200ms p99)
  - Verify resource usage
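As a concrete illustration of test_batch_writes, the sketch below exercises the WriteBuffer contract directly (rows buffered below batch_size, flush signalled at the threshold, drain yields the batch). The make_row helper and exact assertions are assumptions; the real test would go through DatabaseSinkConnector::write with a mocked connection.

```rust
// Sketch of test_batch_writes against the WriteBuffer API defined above.
// `make_row` is a hypothetical helper that builds a Row for testing.
#[test]
fn test_batch_writes() {
    let mut buffer = WriteBuffer::new(3, Duration::from_secs(5));

    // Below batch_size: rows are buffered and no flush is requested.
    assert!(!buffer.add(make_row(1)).unwrap());
    assert!(!buffer.add(make_row(2)).unwrap());
    assert_eq!(buffer.len(), 2);
    assert!(!buffer.should_flush());

    // Hitting batch_size: add() reports that a flush is needed.
    assert!(buffer.add(make_row(3)).unwrap());
    assert!(buffer.should_flush());

    // Draining yields the whole batch and empties the buffer.
    let batch = buffer.drain();
    assert_eq!(batch.len(), 3);
    assert_eq!(buffer.len(), 0);
}
```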
Integration Tests (5 tests)

- test_postgresql_end_to_end
  - Connect to real PostgreSQL
  - Create test table
  - Write 1,000 rows
  - Verify all data
  - Clean up

- test_mysql_upsert
  - Connect to MySQL
  - Insert initial data
  - Upsert updates
  - Verify no duplicates

- test_2pc_crash_recovery
  - Start transaction
  - Prepare
  - Kill process
  - Restart
  - Verify commit completed

- test_backpressure_throttling
  - Write at high rate
  - Monitor backpressure
  - Verify throttling
  - Check no data loss

- test_concurrent_writes
  - Multiple sink instances
  - Write to same table
  - Verify no conflicts
  - Check data integrity
Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| Throughput | > 10,000 rows/sec | Single sink instance |
| Latency (p50) | < 50ms | Write to flush |
| Latency (p99) | < 200ms | Write to flush |
| Batch efficiency | > 80% | avg_batch_size / batch_size |
| Connection utilization | 50-80% | Active connections / max |
| Memory per sink | < 100MB | Steady state |
| Transaction success rate | > 99.9% | Commits / total |
Recovery Guarantees
Exactly-Once Semantics
- Idempotent Writes: Use upsert mode with unique keys
- Transaction Deduplication: Store transaction IDs in state backend
- Checkpoint Coordination: Align checkpoints with transaction commits (see the sketch below)
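A minimal sketch of that coordination point, assuming the connector tracks the transaction covering everything written since the last checkpoint, that TransactionId is Copy, and that CheckpointMetadata can carry its ID; the field name is an illustrative assumption, not the specified shape.

```rust
// Sketch: a checkpoint commits the open 2PC transaction and records its ID,
// so a replayed checkpoint can be recognised and skipped on recovery.
// Only prepare/commit come from the TransactionManager API.
async fn checkpoint_with_commit(
    manager: &TransactionManager,
    current_txn: TransactionId,
) -> Result<CheckpointMetadata> {
    // Phase 1 + Phase 2 for the transaction aligned with this checkpoint.
    manager.prepare(current_txn).await?;
    manager.commit(current_txn).await?;

    // The committed transaction ID is stored with the checkpoint; on restart,
    // a checkpoint whose transaction is already marked committed in the state
    // backend is skipped instead of replayed (transaction deduplication).
    Ok(CheckpointMetadata { transaction_id: current_txn })
}
```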
Failure Scenarios
| Failure | Recovery Action | Guarantee |
|---|---|---|
| Network partition | Retry with backoff | At-least-once |
| Database down | Pause, retry when available | At-least-once |
| Sink crash | Recover from checkpoint, replay | Exactly-once (with 2PC) |
| Duplicate checkpoint | Skip already committed transactions | Exactly-once |
| Transaction timeout | Abort and retry | At-least-once |
Implementation Phases
Phase 1: Basic Sink (Current - Day 3)
- ✅ Sink structure and buffering
- 🔲 Connection pooling
- 🔲 Simple INSERT operations
- 🔲 Basic tests
Phase 2: Two-Phase Commit (Day 3-4)
- 🔲 TransactionManager implementation
- 🔲 PREPARE/COMMIT PREPARED support
- 🔲 State backend integration
- 🔲 Crash recovery tests
Phase 3: Advanced Features (Day 4)
- 🔲 Upsert mode
- 🔲 Replace mode
- 🔲 Backpressure integration
- 🔲 Performance optimization
Phase 4: Production Readiness (Day 5)
- 🔲 Integration tests
- 🔲 Performance benchmarks
- 🔲 Documentation
- 🔲 Monitoring/observability
API Examples
Basic Usage
```rust
use heliosdb_streaming::connectors::database::*;

// Configuration
let config = DatabaseSinkConfig {
    database: DatabaseConfig {
        connection_string: "postgresql://localhost/mydb".to_string(),
        db_type: DatabaseType::PostgreSQL,
        pool: ConnectionPoolConfig::default(),
    },
    table_name: "events".to_string(),
    write_mode: WriteMode::Insert,
    batch_size: 1000,
    flush_interval_secs: 5,
    enable_2pc: true,
};

// Create sink
let mut sink = DatabaseSinkConnector::new(config).await?;

// Write data
let rows = vec![
    Row::new(hashmap! {
        "id".to_string() => Value::Integer(1),
        "name".to_string() => Value::String("Alice".to_string()),
    }),
];

sink.write(rows).await?;

// Flush
sink.flush().await?;

// Checkpoint
let checkpoint = sink.checkpoint().await?;

// Metrics
let metrics = sink.metrics();
println!("Rows written: {}", metrics.rows_written);
```
With Upsert
```rust
let config = DatabaseSinkConfig {
    // ...
    write_mode: WriteMode::Upsert {
        key_columns: vec!["id".to_string()],
    },
    // ...
};
```