# Database Sink Optimization Recommendations
- Version: 1.0
- Date: 2025-10-29
- Author: Performance Benchmarker Agent
- Priority: High - Phase 2 Performance Targets
## Overview
This document provides detailed, actionable optimization recommendations to achieve Phase 2 performance targets for the Database Sink connector.
## Performance Target Summary
| Metric | Current Projection | Target | Gap | Priority |
|---|---|---|---|---|
| Throughput | ~35K events/sec | >100K events/sec | +65K | P0 |
| Latency P99 | ~130ms | <100ms | -30ms | P0 |
| Memory/Sink | ~36MB | <100MB | Met | P2 |
| Checkpoint Overhead | ~10% | <5% | -5% | P1 |
| Connection Util | ~35% | 50-80% | +15% | P1 |
## Critical Path Optimizations (P0)
### OPT-001: Lock-Free Write Buffer
- Priority: P0 - Critical
- Impact: +40% throughput, -20ms P99 latency
- Effort: 2 days
- Risk: Medium
#### Current Implementation Problem
```rust
// FILE: sink.rs:131-150
pub async fn write(&mut self, rows: Vec<Row>) -> Result<()> {
    // 🔴 PROBLEM: Single RwLock serializes all writes
    let mut buffer = self.write_buffer.write().await;
    let mut should_flush = false;

    for row in rows {
        if buffer.add(row) {
            should_flush = true;
        }
    }

    drop(buffer); // Lock held for the entire loop duration

    if should_flush {
        self.flush().await?;
    }

    Ok(())
}
```

Issues:
- Lock held for O(n) time where n = rows.len()
- Blocks all concurrent writers
- Write throughput does NOT scale with concurrency
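The serialization cost is easy to reproduce with the standard library alone. In the sketch below (the function name `coarse_locked_append` is hypothetical), ten writers contend for one shared buffer lock the way `write()` does: each holds the lock for its whole batch loop, so the writers execute strictly one at a time regardless of core count.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Miniature of the pattern above: every writer takes one shared lock
// and holds it for its entire batch loop, so writers run one at a time.
fn coarse_locked_append(writers: usize, rows_per_writer: usize) -> Vec<u64> {
    let buffer = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..writers)
        .map(|w| {
            let buffer = Arc::clone(&buffer);
            thread::spawn(move || {
                // Lock held for the whole O(n) loop, like write() above
                let mut guard = buffer.lock().unwrap();
                for i in 0..rows_per_writer {
                    guard.push((w * rows_per_writer + i) as u64);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    Arc::try_unwrap(buffer).unwrap().into_inner().unwrap()
}

fn main() {
    let rows = coarse_locked_append(10, 1_000);
    // Correct, but throughput cannot scale: only one writer ever
    // holds the buffer, no matter how many threads exist.
    assert_eq!(rows.len(), 10_000);
    println!("rows written: {}", rows.len());
}
```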
#### Recommended Solution: Channel-Based Lock-Free Buffer
```rust
use std::time::Duration;
use tokio::sync::mpsc;

pub struct DatabaseSinkConnector {
    // Replace RwLock<WriteBuffer> with a channel
    write_tx: mpsc::Sender<Row>,
    flush_worker: tokio::task::JoinHandle<()>,
    // ... other fields
}

impl DatabaseSinkConnector {
    pub async fn new(config: DatabaseSinkConfig) -> Result<Self> {
        let (write_tx, write_rx) = mpsc::channel(config.batch_size * 2);

        // Spawn background flush worker
        let flush_worker = tokio::spawn(Self::flush_worker_loop(
            write_rx,
            config.batch_size,
            config.flush_interval_secs,
        ));

        Ok(Self {
            write_tx,
            flush_worker,
            // ...
        })
    }

    pub async fn write(&mut self, rows: Vec<Row>) -> Result<()> {
        // SOLUTION: Lock-free channel send
        for row in rows {
            self.write_tx
                .send(row)
                .await
                .map_err(|_| DatabaseError::WriteBufferFull)?;
        }
        Ok(())
    }

    async fn flush_worker_loop(
        mut rx: mpsc::Receiver<Row>,
        batch_size: usize,
        flush_interval_secs: u64,
    ) {
        let mut buffer = Vec::with_capacity(batch_size);
        let flush_interval = Duration::from_secs(flush_interval_secs);

        loop {
            tokio::select! {
                // Receive rows
                maybe_row = rx.recv() => {
                    match maybe_row {
                        Some(row) => {
                            buffer.push(row);

                            // Size-based flush
                            if buffer.len() >= batch_size {
                                Self::flush_batch(&mut buffer).await;
                            }
                        }
                        // Channel closed: drain the remainder and exit
                        None => {
                            if !buffer.is_empty() {
                                Self::flush_batch(&mut buffer).await;
                            }
                            break;
                        }
                    }
                }

                // Time-based flush
                _ = tokio::time::sleep(flush_interval), if !buffer.is_empty() => {
                    Self::flush_batch(&mut buffer).await;
                }
            }
        }
    }
}
```

Benefits:
- Zero lock contention on write path
- Automatic batching in background
- Time-based flushing without blocking writes
- Scales linearly with concurrent writers
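The worker's two flush triggers can be sketched with the standard library alone, using `recv_timeout` as a stand-in for `tokio::select!` (the function name `run_flush_worker` and the toy batch sizes are illustrative, not part of the connector):

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Collect the batch sizes flushed by a worker that flushes on size OR
// time, mirroring the two select! branches above.
fn run_flush_worker(rx: Receiver<u64>, batch_size: usize, interval: Duration) -> Vec<usize> {
    let mut buffer = Vec::new();
    let mut flushes = Vec::new();
    loop {
        match rx.recv_timeout(interval) {
            Ok(row) => {
                buffer.push(row);
                if buffer.len() >= batch_size {
                    flushes.push(buffer.len()); // size-based flush
                    buffer.clear();
                }
            }
            Err(RecvTimeoutError::Timeout) if !buffer.is_empty() => {
                flushes.push(buffer.len()); // time-based flush
                buffer.clear();
            }
            Err(RecvTimeoutError::Timeout) => {}
            Err(RecvTimeoutError::Disconnected) => {
                if !buffer.is_empty() {
                    flushes.push(buffer.len()); // final drain on shutdown
                }
                return flushes;
            }
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    let worker = thread::spawn(move || run_flush_worker(rx, 3, Duration::from_millis(200)));

    for i in 0..3 {
        tx.send(i).unwrap(); // fills a batch: size-based flush of 3
    }
    tx.send(3).unwrap(); // one leftover row
    thread::sleep(Duration::from_millis(600)); // let the interval elapse
    drop(tx); // shutdown

    let flushes = worker.join().unwrap();
    assert_eq!(flushes, vec![3, 1]); // one size flush, one time flush
    println!("flushes: {flushes:?}");
}
```

Note that writers never wait on a flush here: they only touch the channel, which is the property OPT-001 is after.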
Risks & Mitigation:
- Risk: Channel backpressure on slow database
- Mitigation: Monitor channel capacity, integrate with backpressure controller
- Risk: Worker task panic loses data
- Mitigation: Graceful shutdown with drain, task restart logic
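The first risk above can be made concrete with a bounded stdlib channel: `try_send` fails fast with `Full` instead of blocking, which is exactly the signal a backpressure controller needs. A sketch, with std's `sync_channel` standing in for tokio's bounded `mpsc::channel` and `sends_before_full` a hypothetical helper:

```rust
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};

// Fill a bounded channel with no consumer running (a "slow database")
// and report how many sends succeed before backpressure kicks in.
fn sends_before_full(capacity: usize) -> (usize, Receiver<u64>) {
    let (tx, rx) = sync_channel::<u64>(capacity);
    let mut sent = 0usize;
    loop {
        match tx.try_send(sent as u64) {
            Ok(()) => sent += 1,
            // Full is the signal to surface to the backpressure controller
            Err(TrySendError::Full(_)) => return (sent, rx),
            Err(TrySendError::Disconnected(_)) => unreachable!(),
        }
    }
}

fn main() {
    // Capacity mirrors batch_size * 2 in the sketch above
    let (sent, rx) = sends_before_full(4);
    assert_eq!(sent, 4); // exactly `capacity` rows fit before Full
    assert_eq!(rx.recv().unwrap(), 0); // draining frees capacity again
    println!("accepted {sent} rows before signalling backpressure");
}
```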
Testing:
```rust
#[tokio::test]
async fn test_lock_free_write_throughput() {
    let sink = DatabaseSinkConnector::new(config).await.unwrap();

    // Spawn 10 concurrent writers
    let mut handles = vec![];
    for _ in 0..10 {
        let mut sink_clone = sink.clone();
        let handle = tokio::spawn(async move {
            for _ in 0..1000 {
                sink_clone.write(vec![generate_row()]).await.unwrap();
            }
        });
        handles.push(handle);
    }

    // Should complete in <1 second for 10K writes
    let start = Instant::now();
    for handle in handles {
        handle.await.unwrap();
    }
    let elapsed = start.elapsed();
    assert!(elapsed < Duration::from_secs(1));
}
```

### OPT-002: Batch Row Processing
- Priority: P0 - Critical
- Impact: +25% throughput for large batches
- Effort: 1 day
- Risk: Low
#### Current Implementation Problem
```rust
// FILE: sink.rs:136-140
for row in rows {
    // 🔴 Sequential processing
    if buffer.add(row) {
        should_flush = true;
    }
}
```

#### Recommended Solution: Batch Add Operation
```rust
impl WriteBuffer {
    // Add new batch method
    pub fn add_batch(&mut self, rows: Vec<Row>) -> bool {
        // Pre-calculate total size (see OPT-005 for an accurate per-row size)
        let batch_size_estimate = rows.len() * 100; // Rough estimate
        self.total_bytes += batch_size_estimate;

        // Reserve capacity if needed
        if self.buffer.capacity() < self.buffer.len() + rows.len() {
            self.buffer.reserve(rows.len());
        }

        // Batch append (single operation)
        self.buffer.extend(rows);

        self.should_flush()
    }

    // Keep old method for single-row compatibility
    pub fn add(&mut self, row: Row) -> bool {
        self.add_batch(vec![row])
    }
}

// Update write() to use the batch add
pub async fn write(&mut self, rows: Vec<Row>) -> Result<()> {
    let mut buffer = self.write_buffer.write().await;
    let should_flush = buffer.add_batch(rows); // Single operation
    drop(buffer);

    if should_flush {
        self.flush().await?;
    }

    Ok(())
}
```

Benefits:
- Single allocation for capacity reservation
- Vectorized operation (CPU-friendly)
- Reduces loop overhead
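The single-allocation claim can be checked directly: after one `reserve`, the following `extend` never grows the vector. The `batch_append` helper below is illustrative, not part of the connector:

```rust
// Returns (final length, whether extend had to reallocate) after a
// single up-front reserve followed by one bulk extend.
fn batch_append(mut buffer: Vec<u64>, incoming: Vec<u64>) -> (usize, bool) {
    buffer.reserve(incoming.len()); // single capacity request up front
    let cap_before = buffer.capacity();
    buffer.extend(incoming); // one bulk append, no per-row branching
    (buffer.len(), buffer.capacity() != cap_before)
}

fn main() {
    let (len, reallocated) = batch_append(Vec::new(), (0..1_000).collect());
    assert_eq!(len, 1_000);
    assert!(!reallocated); // capacity was reserved once, no growth during extend
    println!("appended {len} rows with a single reservation");
}
```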
### OPT-003: Connection Pool Lock-Free Queue
- Priority: P0 - Critical
- Impact: -5ms P99 latency, +20% concurrent throughput
- Effort: 2 days
- Risk: Medium
#### Current Implementation Problem
```rust
// FILE: pool.rs:89-123
pub async fn acquire(self: &Arc<Self>) -> Result<PooledConnection> {
    let _permit = self.connection_semaphore.acquire().await?;

    // 🔴 PROBLEM 1: Lock on idle connections
    let mut idle = self.idle_connections.write().await;
    if let Some(mut conn) = idle.pop() {
        // ... validation ...
        drop(idle);

        // 🔴 PROBLEM 2: Second lock on active connections
        self.active_connections.write().await.push(conn.clone());
        return Ok(conn);
    }

    // ... create new connection ...
}
```

#### Recommended Solution: Lock-Free Queue
```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use crossbeam::queue::SegQueue;
use dashmap::DashMap;

pub struct ConnectionPool {
    config: ConnectionPoolConfig,
    db_type: DatabaseType,
    connection_string: String,

    // SOLUTION: Lock-free queues
    idle_connections: Arc<SegQueue<PooledConnection>>,
    active_connections: Arc<DashMap<ConnectionId, PooledConnection>>,

    connection_semaphore: Arc<Semaphore>,
    total_created: Arc<AtomicUsize>, // Atomic instead of RwLock
    health_checker: Arc<HealthChecker>,
}

impl ConnectionPool {
    pub async fn acquire(self: &Arc<Self>) -> Result<PooledConnection> {
        // Wait for capacity; forget the permit so it stays "held" until
        // release() adds it back (dropping it here would undo the limit)
        let permit = self.connection_semaphore.acquire().await?;
        permit.forget();

        // Lock-free pop; stale connections are discarded and popping continues
        while let Some(mut conn) = self.idle_connections.pop() {
            if conn.last_used.elapsed().as_secs() <= self.config.idle_timeout_secs {
                conn.last_used = Instant::now();

                // Lock-free insert
                self.active_connections.insert(conn.id, conn.clone());
                return Ok(conn);
            }
        }

        // No usable idle connection: create a new one
        let conn = self.create_connection().await?;
        self.active_connections.insert(conn.id, conn.clone());
        Ok(conn)
    }

    pub async fn release(&self, mut conn: PooledConnection) -> Result<()> {
        conn.last_used = Instant::now();
        conn.in_transaction = false;

        // Lock-free remove
        self.active_connections.remove(&conn.id);

        // Check lifetime
        let age = conn.created_at.elapsed()
            .map_err(|e| DatabaseError::ConnectionFailed(format!("Time error: {}", e)))?
            .as_secs();

        if age <= self.config.max_lifetime_secs {
            // Lock-free push
            self.idle_connections.push(conn);
        }

        // Return the permit taken (and forgotten) in acquire()
        self.connection_semaphore.add_permits(1);

        Ok(())
    }

    async fn create_connection(self: &Arc<Self>) -> Result<PooledConnection> {
        let id = Uuid::new_v4();

        // Real database connection creation here
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Atomic increment
        self.total_created.fetch_add(1, Ordering::Relaxed);

        Ok(PooledConnection {
            id,
            created_at: SystemTime::now(),
            last_used: Instant::now(),
            in_transaction: false,
            _pool: self.clone(),
        })
    }
}
```

Dependencies:

```toml
# Add to Cargo.toml
crossbeam = "0.8"
dashmap = "5.5"  # Already in dependencies
```

Benefits:
- Zero lock contention on acquire/release
- Scales linearly with concurrent requests
- Atomic metrics updates
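The atomic-metrics point generalizes beyond the pool: a `fetch_add` counter loses no updates under concurrency and needs no lock on either the write or the read path. A stdlib sketch (`concurrent_count` is a hypothetical name):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

// Many threads bump one shared counter with fetch_add: no RwLock
// anywhere, and no updates are lost.
fn concurrent_count(threads: usize, increments: u64) -> u64 {
    let counter = Arc::new(AtomicU64::new(0));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments {
                    // Relaxed suffices for a monotonic counter that is
                    // only read by a metrics endpoint
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    counter.load(Ordering::Relaxed)
}

fn main() {
    let total = concurrent_count(8, 10_000);
    assert_eq!(total, 80_000);
    println!("counted {total} increments across 8 threads");
}
```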
### OPT-004: Transaction Manager DashMap
- Priority: P0 - Critical
- Impact: -10ms transaction overhead, +30% 2PC throughput
- Effort: 1 day
- Risk: Low
#### Current Implementation Problem
```rust
// FILE: transaction.rs:14-20
pub struct TransactionManager {
    // 🔴 PROBLEM: Multiple lock acquisitions per transaction
    active_transactions: Arc<RwLock<HashMap<TransactionId, Transaction>>>,
    prepared_transactions: Arc<RwLock<HashMap<TransactionId, PreparedTransaction>>>,
    // ...
}
```

#### Recommended Solution: DashMap for Concurrent Access
```rust
use dashmap::DashMap;

pub struct TransactionManager {
    // SOLUTION: Lock-free concurrent maps
    active_transactions: Arc<DashMap<TransactionId, Transaction>>,
    prepared_transactions: Arc<DashMap<TransactionId, PreparedTransaction>>,
    state_backend: Arc<dyn StateBackend>,
    config: TransactionConfig,
}

impl TransactionManager {
    pub async fn begin(&self, connection_id: ConnectionId) -> Result<TransactionId> {
        let txn_id = Uuid::new_v4();

        let transaction = Transaction {
            id: txn_id,
            connection_id,
            status: TransactionStatus::Active,
            created_at: SystemTime::now(),
            operations: Vec::new(),
            retries: 0,
        };

        // Lock-free insert
        self.active_transactions.insert(txn_id, transaction);
        Ok(txn_id)
    }

    pub async fn add_operation(
        &self,
        txn_id: TransactionId,
        operation: WriteOperation,
    ) -> Result<()> {
        // Lock-free get_mut
        let mut txn = self.active_transactions
            .get_mut(&txn_id)
            .ok_or_else(|| DatabaseError::InvalidConfig("Transaction not found".to_string()))?;

        if txn.status != TransactionStatus::Active {
            return Err(DatabaseError::InvalidConfig(format!(
                "Transaction is not active: {:?}",
                txn.status
            )));
        }

        txn.operations.push(operation);
        Ok(())
    }

    pub async fn prepare(&self, txn_id: TransactionId) -> Result<()> {
        // Single lock-free access
        let mut txn_ref = self.active_transactions
            .get_mut(&txn_id)
            .ok_or_else(|| DatabaseError::InvalidConfig("Transaction not found".to_string()))?;

        if txn_ref.status != TransactionStatus::Active {
            return Err(DatabaseError::InvalidConfig(format!(
                "Transaction cannot be prepared from status: {:?}",
                txn_ref.status
            )));
        }

        txn_ref.status = TransactionStatus::Preparing;

        // Simulate prepare
        // CAUTION: in production, drop the shard guard before any `.await`;
        // holding a DashMap guard across an await point risks deadlock
        tokio::time::sleep(Duration::from_millis(5)).await;

        txn_ref.status = TransactionStatus::Prepared;

        // Create prepared transaction
        let prepared = PreparedTransaction {
            id: txn_id,
            prepared_at: SystemTime::now(),
            operations_count: txn_ref.operations.len(),
            checksum: self.calculate_checksum(&txn_ref.operations),
        };

        drop(txn_ref); // Release the shard guard before further awaits

        // Persist to state backend
        self.persist_prepared_transaction(&prepared).await?;

        // Move to prepared map
        self.prepared_transactions.insert(txn_id, prepared);

        Ok(())
    }

    pub async fn commit(&self, txn_id: TransactionId) -> Result<()> {
        // Check prepared state without lock contention
        if !self.prepared_transactions.contains_key(&txn_id) {
            if let Some(txn) = self.active_transactions.get(&txn_id) {
                if txn.status != TransactionStatus::Prepared {
                    return Err(DatabaseError::InvalidConfig(format!(
                        "Transaction not prepared: {:?}",
                        txn.status
                    )));
                }
            } else {
                return Err(DatabaseError::InvalidConfig(
                    "Transaction not found".to_string(),
                ));
            }
        }

        // Update status (guard dropped at the end of the `if let` scope)
        if let Some(mut txn) = self.active_transactions.get_mut(&txn_id) {
            txn.status = TransactionStatus::Committing;
        }

        // Simulate commit
        tokio::time::sleep(Duration::from_millis(5)).await;

        // Update status and clean up
        if let Some(mut txn) = self.active_transactions.get_mut(&txn_id) {
            txn.status = TransactionStatus::Committed;
        }

        // Remove from both maps
        self.prepared_transactions.remove(&txn_id);
        self.active_transactions.remove(&txn_id);

        // Clean up state backend
        self.cleanup_prepared_transaction(txn_id).await?;

        Ok(())
    }
}
```

Benefits:
- Eliminates lock contention between transactions
- Allows concurrent transaction lifecycle operations
- Simplifies code (no explicit lock management)
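The lifecycle that `prepare()` and `commit()` enforce can be stated as a small state machine, which is also a cheap way to unit-test the status checks in isolation. A sketch; only the `TransactionStatus` variant names come from the document:

```rust
// The 2PC lifecycle as a standalone transition table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionStatus {
    Active,
    Preparing,
    Prepared,
    Committing,
    Committed,
}

// A transition is legal only along the happy path:
// Active → Preparing → Prepared → Committing → Committed
fn can_transition(from: TransactionStatus, to: TransactionStatus) -> bool {
    use TransactionStatus::*;
    matches!(
        (from, to),
        (Active, Preparing)
            | (Preparing, Prepared)
            | (Prepared, Committing)
            | (Committing, Committed)
    )
}

fn main() {
    use TransactionStatus::*;
    // The happy path walks every state in order...
    assert!(can_transition(Active, Preparing));
    assert!(can_transition(Preparing, Prepared));
    assert!(can_transition(Prepared, Committing));
    assert!(can_transition(Committing, Committed));
    // ...and commit without prepare is rejected, as in commit() above
    assert!(!can_transition(Active, Committing));
    println!("transition checks passed");
}
```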
## Secondary Optimizations (P1)
### OPT-005: Accurate Row Size Calculation
- Priority: P1
- Impact: Better memory pressure detection
- Effort: 4 hours
- Risk: Low
```rust
impl Row {
    /// Calculate approximate memory usage of this row
    pub fn estimated_size_bytes(&self) -> usize {
        let mut size = std::mem::size_of::<Self>();

        for (key, value) in self.fields() {
            size += key.len();
            size += match value {
                Value::Null => 1,
                Value::Boolean(_) => 1,
                Value::Integer(_) => 8,
                Value::Float(_) => 8,
                Value::String(s) => s.len(),
                Value::Bytes(b) => b.len(),
                Value::Timestamp(_) => 8,
            };
        }

        size
    }
}

impl WriteBuffer {
    pub fn add(&mut self, row: Row) -> bool {
        // Use the actual row size
        let row_size = row.estimated_size_bytes();
        self.total_bytes += row_size;
        self.buffer.push(row);

        self.should_flush()
    }
}
```

### OPT-006: Zero-Copy Buffer Drain
- Priority: P1
- Impact: -15% allocation rate
- Effort: 2 hours
- Risk: Low
```rust
pub struct WriteBuffer {
    buffer: Vec<Row>,
    swap_buffer: Vec<Row>, // Add swap buffer
    // ...
}

impl WriteBuffer {
    pub fn new(batch_size: usize, flush_interval_secs: u64) -> Self {
        Self {
            buffer: Vec::with_capacity(batch_size),
            swap_buffer: Vec::with_capacity(batch_size), // Pre-allocate
            // ...
        }
    }

    pub fn drain(&mut self) -> Vec<Row> {
        self.last_flush = Instant::now();
        self.total_bytes = 0;

        // Zero-copy swap: the pre-allocated (empty) swap buffer becomes
        // the new write buffer; the full buffer is handed to the caller
        std::mem::swap(&mut self.buffer, &mut self.swap_buffer);
        std::mem::take(&mut self.swap_buffer)
    }

    /// Give a flushed batch's allocation back. Without this, the swap
    /// buffer's capacity leaves with the caller and the next drain
    /// must allocate again.
    pub fn recycle(&mut self, mut flushed: Vec<Row>) {
        flushed.clear();
        self.swap_buffer = flushed;
    }
}
```

### OPT-007: Atomic Metrics Counters
- Priority: P1
- Impact: -3% lock overhead
- Effort: 3 hours
- Risk: Low
```rust
use std::sync::atomic::{AtomicU64, Ordering};

pub struct DatabaseSinkConnector {
    // ...
    // Replace RwLock<u64> with AtomicU64
    rows_written: Arc<AtomicU64>,
    bytes_written: Arc<AtomicU64>,
    batches_flushed: Arc<AtomicU64>,
    transactions_committed: Arc<AtomicU64>,
    transactions_aborted: Arc<AtomicU64>,
    write_errors: Arc<AtomicU64>,
}

impl DatabaseSinkConnector {
    async fn write_batch_2pc(&self, rows: &[DatabaseRow]) -> Result<()> {
        // ...

        match self.transaction_manager.commit(txn_id).await {
            Ok(_) => {
                // Atomic increment (lock-free)
                self.transactions_committed.fetch_add(1, Ordering::Relaxed);
                *self.current_transaction.write().await = None;
            }
            Err(e) => {
                self.transactions_aborted.fetch_add(1, Ordering::Relaxed);
                self.write_errors.fetch_add(1, Ordering::Relaxed);
                // ...
            }
        }

        Ok(())
    }

    pub async fn metrics(&self) -> SinkMetrics {
        let rows_written = self.rows_written.load(Ordering::Relaxed);
        let batches_flushed = self.batches_flushed.load(Ordering::Relaxed);

        SinkMetrics {
            rows_written,
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            batches_flushed,
            transactions_committed: self.transactions_committed.load(Ordering::Relaxed),
            transactions_aborted: self.transactions_aborted.load(Ordering::Relaxed),
            write_errors: self.write_errors.load(Ordering::Relaxed),
            avg_batch_size: if batches_flushed > 0 {
                rows_written as f64 / batches_flushed as f64
            } else {
                0.0
            },
            avg_latency_ms: 0.0,
            throughput_rows_per_sec: 0.0,
        }
    }
}
```

### OPT-008: Batch Serialization
- Priority: P1
- Impact: +10% throughput
- Effort: 1 day
- Risk: Low
```rust
impl DatabaseSinkConnector {
    fn serialize_row(&self, row: &DatabaseRow) -> Result<Vec<u8>> {
        // Use bincode for efficient serialization
        bincode::serialize(row)
            .map_err(|e| DatabaseError::SerializationError(e.to_string()))
    }

    fn serialize_rows_batch(&self, rows: &[DatabaseRow]) -> Result<Vec<Vec<u8>>> {
        // Pre-allocate result vector
        let mut serialized = Vec::with_capacity(rows.len());

        for row in rows {
            serialized.push(self.serialize_row(row)?);
        }

        Ok(serialized)
    }
}
```

## Advanced Optimizations (P2)
### OPT-009: SIMD Row Comparison (Stretch Goal)
- Priority: P2 - Nice to have
- Impact: +5% throughput for upsert operations
- Effort: 3 days
- Risk: High
Use SIMD instructions for bulk row comparison in upsert mode:
```rust
#[cfg(target_arch = "x86_64")]
use std::arch::x86_64::*;

fn bulk_compare_keys(keys1: &[i64], keys2: &[i64]) -> Vec<bool> {
    // Use AVX2 for parallel comparison
    // Implementation details...
}
```

### OPT-010: Connection Pool Warm-Keeping
- Priority: P2
- Impact: -50ms cold start latency
- Effort: 1 day
- Risk: Low
```rust
impl ConnectionPool {
    /// Background task to keep connections warm
    pub fn start_warmer(self: &Arc<Self>) {
        // Clone the Arc so the spawned task owns a handle to the pool
        let pool = Arc::clone(self);
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(60)).await;

                let stats = pool.stats().await;
                let target = pool.config.min_connections;

                // Ensure minimum connections are always available
                if stats.idle_connections < target {
                    for _ in 0..(target - stats.idle_connections) {
                        if let Ok(conn) = pool.create_connection().await {
                            pool.idle_connections.push(conn);
                        }
                    }
                }
            }
        });
    }
}
```

## Implementation Priority Matrix
| | Lower Effort (≤1 day) | Higher Effort (2+ days) |
|---|---|---|
| High Impact | OPT-002 Batch Ops, OPT-004 DashMap | OPT-001 Lock-Free Buffer, OPT-003 Pool Lock-Free |
| Low Impact | OPT-005 Row Size, OPT-006 Drain, OPT-007 Atomic Metrics, OPT-008 Batch Serialization, OPT-010 Warm Pool | OPT-009 SIMD (stretch) |

Implementation Order:
- Week 1: OPT-001, OPT-002, OPT-003, OPT-004 (Critical path)
- Week 2: OPT-005, OPT-006, OPT-007, OPT-008 (Secondary)
- Week 3: OPT-010, testing, tuning
## Testing Strategy
### Per-Optimization Validation
```bash
# Benchmark before/after each optimization
cargo bench --bench database_sink_bench -- --save-baseline before_opt_001

# Apply the optimization, then compare against the baseline
cargo bench --bench database_sink_bench -- --baseline before_opt_001
```

Expected improvements:

- OPT-001: +40% throughput
- OPT-002: +25% throughput
- OPT-003: -5ms latency
- OPT-004: +30% 2PC throughput

### Regression Testing
```bash
#!/bin/bash
# Run full benchmark suite
cargo bench --bench database_sink_bench -- --save-baseline main

# Alert on regressions
if [ "$THROUGHPUT_DROP" -gt 10 ]; then
    echo "ERROR: Throughput regression detected: -${THROUGHPUT_DROP}%"
    exit 1
fi
```

## Success Metrics
### Must-Have Targets
- Throughput: 35K → 100K+ events/sec (+185%)
- Latency P99: 130ms → <100ms (-23%)
- Memory: ~36MB today; tighten the budget from 100MB to <50MB
- Checkpoint: 10% → <5% (-50%)
### Stretch Targets
- Throughput: >150K events/sec
- Latency P99: <50ms
- Memory: <30MB
- Checkpoint: <3%
## Conclusion
These optimizations provide a clear path to meeting Phase 2 performance targets. The critical path focuses on eliminating lock contention and enabling true concurrent execution. With systematic implementation and validation, all targets are achievable within 2-3 weeks.
- Status: Ready for Implementation
- Next Step: Begin OPT-001 (Lock-Free Write Buffer)
- Owner: Performance Benchmarker Agent
- Review Date: After each optimization completion