
Database Sink Optimization Recommendations


Version: 1.0 | Date: 2025-10-29 | Author: Performance Benchmarker Agent | Priority: High - Phase 2 Performance Targets

Overview

This document provides detailed, actionable optimization recommendations to achieve Phase 2 performance targets for the Database Sink connector.

Performance Target Summary

Metric               Current Projection   Target             Gap     Priority
Throughput           ~35K events/sec      >100K events/sec   +65K    P0
Latency P99          ~130ms               <100ms             -30ms   P0
Memory/Sink          ~36MB                <100MB             Met     P2
Checkpoint Overhead  ~10%                 <5%                -5%     P1
Connection Util      ~35%                 50-80%             +20%    P1

Critical Path Optimizations (P0)

OPT-001: Lock-Free Write Buffer

Priority: P0 - Critical | Impact: +40% throughput, -20ms P99 latency | Effort: 2 days | Risk: Medium

Current Implementation Problem

// FILE: sink.rs:131-150
pub async fn write(&mut self, rows: Vec<Row>) -> Result<()> {
    // 🔴 PROBLEM: Single RwLock serializes all writes
    let mut buffer = self.write_buffer.write().await;
    let mut should_flush = false;
    for row in rows {
        if buffer.add(row) {
            should_flush = true;
        }
    }
    drop(buffer); // Lock held for the entire loop duration
    if should_flush {
        self.flush().await?;
    }
    Ok(())
}

Issues:

  1. Lock held for O(n) time where n = rows.len()
  2. Blocks all concurrent writers
  3. Write throughput does NOT scale with concurrency
Proposed Solution

use tokio::sync::mpsc;

pub struct DatabaseSinkConnector {
    // Replace RwLock<WriteBuffer> with a channel to a background worker
    write_tx: mpsc::Sender<Row>,
    flush_worker: tokio::task::JoinHandle<()>,
    // ... other fields
}

impl DatabaseSinkConnector {
    pub async fn new(config: DatabaseSinkConfig) -> Result<Self> {
        let (write_tx, write_rx) = mpsc::channel(config.batch_size * 2);
        // Spawn background flush worker
        let flush_worker = tokio::spawn(Self::flush_worker_loop(
            write_rx,
            config.batch_size,
            config.flush_interval_secs,
        ));
        Ok(Self {
            write_tx,
            flush_worker,
            // ...
        })
    }

    pub async fn write(&mut self, rows: Vec<Row>) -> Result<()> {
        // SOLUTION: lock-free channel send; a full channel applies
        // backpressure by awaiting, and send() only errors once the
        // worker has shut down
        for row in rows {
            self.write_tx.send(row).await
                .map_err(|_| DatabaseError::WriteBufferFull)?;
        }
        Ok(())
    }

    async fn flush_worker_loop(
        mut rx: mpsc::Receiver<Row>,
        batch_size: usize,
        flush_interval_secs: u64,
    ) {
        let mut buffer = Vec::with_capacity(batch_size);
        // A fixed interval keeps time-based flushes firing even under a
        // steady row stream (a per-iteration sleep would keep resetting)
        let mut ticker = tokio::time::interval(Duration::from_secs(flush_interval_secs));
        ticker.tick().await; // the first tick completes immediately; consume it
        loop {
            tokio::select! {
                maybe_row = rx.recv() => {
                    match maybe_row {
                        Some(row) => {
                            buffer.push(row);
                            // Size-based flush
                            if buffer.len() >= batch_size {
                                Self::flush_batch(&mut buffer).await;
                            }
                        }
                        // Channel closed: drain remaining rows and exit
                        None => {
                            if !buffer.is_empty() {
                                Self::flush_batch(&mut buffer).await;
                            }
                            return;
                        }
                    }
                }
                // Time-based flush
                _ = ticker.tick(), if !buffer.is_empty() => {
                    Self::flush_batch(&mut buffer).await;
                }
            }
        }
    }
}

Benefits:

  • Zero lock contention on write path
  • Automatic batching in background
  • Time-based flushing without blocking writes
  • Scales linearly with concurrent writers

Risks & Mitigation:

  • Risk: Channel backpressure on slow database
    • Mitigation: Monitor channel capacity, integrate with backpressure controller
  • Risk: Worker task panic loses data
    • Mitigation: Graceful shutdown with drain, task restart logic
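The channel-plus-worker shape, including the shutdown drain called out in the mitigations above, can be sketched with std-only primitives (the real sink would use tokio::sync::mpsc and an async task; `run_sink` and its u64 "rows" are illustrative stand-ins):

```rust
use std::sync::mpsc;
use std::thread;

// Sends `total` rows through a channel to a background batching worker
// and returns the number of rows flushed (a stand-in for flush_batch()).
fn run_sink(total: u64, batch_size: usize) -> usize {
    let (tx, rx) = mpsc::channel::<u64>();
    let worker = thread::spawn(move || {
        let mut buffer = Vec::with_capacity(batch_size);
        let mut flushed = 0usize;
        for row in rx {
            buffer.push(row);
            if buffer.len() >= batch_size {
                flushed += buffer.len(); // flush_batch() would write these
                buffer.clear();
            }
        }
        // Channel closed: drain the tail so no rows are lost on shutdown
        flushed + buffer.len()
    });
    for i in 0..total {
        tx.send(i).unwrap(); // writers never hold a shared lock
    }
    drop(tx); // closing the channel signals the worker to drain and exit
    worker.join().unwrap()
}

fn main() {
    // 1050 rows, batch size 100: ten full batches plus a drained tail of 50
    assert_eq!(run_sink(1050, 100), 1050);
    println!("all rows flushed");
}
```

Dropping the sender is the shutdown signal; the worker's final drain is what the "graceful shutdown with drain" mitigation refers to.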

Testing:

#[tokio::test]
async fn test_lock_free_write_throughput() -> Result<()> {
    let sink = DatabaseSinkConnector::new(config).await?;
    // Start timing before spawning so writer startup is included
    let start = Instant::now();
    // Spawn 10 concurrent writers
    let mut handles = vec![];
    for _ in 0..10 {
        let mut sink_clone = sink.clone();
        handles.push(tokio::spawn(async move {
            for _ in 0..1000 {
                sink_clone.write(vec![generate_row()]).await.unwrap();
            }
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
    // 10K writes should complete in under a second
    let elapsed = start.elapsed();
    assert!(elapsed < Duration::from_secs(1));
    Ok(())
}

OPT-002: Batch Row Processing

Priority: P0 - Critical | Impact: +25% throughput for large batches | Effort: 1 day | Risk: Low

Current Implementation Problem

// FILE: sink.rs:136-140
for row in rows { // 🔴 Sequential processing
    if buffer.add(row) {
        should_flush = true;
    }
}

Proposed Solution

impl WriteBuffer {
    // Add a batch method
    pub fn add_batch(&mut self, rows: Vec<Row>) -> bool {
        // Pre-calculate total size
        let batch_size_estimate = rows.len() * 100; // Rough per-row estimate
        self.total_bytes += batch_size_estimate;
        // Reserve capacity if needed
        if self.buffer.capacity() < self.buffer.len() + rows.len() {
            self.buffer.reserve(rows.len());
        }
        // Batch append (single operation)
        self.buffer.extend(rows);
        self.should_flush()
    }

    // Keep the old method for single-row compatibility
    pub fn add(&mut self, row: Row) -> bool {
        self.add_batch(vec![row])
    }
}

// Update write() to use the batch add
pub async fn write(&mut self, rows: Vec<Row>) -> Result<()> {
    let mut buffer = self.write_buffer.write().await;
    let should_flush = buffer.add_batch(rows); // Single operation
    drop(buffer);
    if should_flush {
        self.flush().await?;
    }
    Ok(())
}

Benefits:

  • Single allocation for capacity reservation
  • Vectorized operation (CPU-friendly)
  • Reduces loop overhead
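The gain from reserving once can be seen in a std-only micro-example (`add_batch` here is a simplified stand-in for the WriteBuffer method, with u64 standing in for Row):

```rust
// Appends a batch with a single up-front capacity reservation and
// reports whether the vector reallocated during the extend.
fn add_batch(buffer: &mut Vec<u64>, rows: Vec<u64>) -> bool {
    buffer.reserve(rows.len());     // one capacity check for the whole batch
    let cap_before = buffer.capacity();
    buffer.extend(rows);            // single, vectorizable append
    buffer.capacity() != cap_before // true would mean a mid-append realloc
}

fn main() {
    let mut buffer: Vec<u64> = Vec::new();
    // After reserve(), extending 10K rows causes no further reallocation
    let reallocated = add_batch(&mut buffer, (0..10_000).collect());
    assert!(!reallocated);
    assert_eq!(buffer.len(), 10_000);
    println!("len={} cap={}", buffer.len(), buffer.capacity());
}
```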

OPT-003: Connection Pool Lock-Free Queue

Priority: P0 - Critical | Impact: -5ms P99 latency, +20% concurrent throughput | Effort: 2 days | Risk: Medium

Current Implementation Problem

// FILE: pool.rs:89-123
pub async fn acquire(self: &Arc<Self>) -> Result<PooledConnection> {
    let _permit = self.connection_semaphore.acquire().await?;
    // 🔴 PROBLEM 1: Lock on idle connections
    let mut idle = self.idle_connections.write().await;
    if let Some(mut conn) = idle.pop() {
        // ... validation ...
        drop(idle);
        // 🔴 PROBLEM 2: Second lock on active connections
        self.active_connections.write().await.push(conn.clone());
        return Ok(conn);
    }
    // ... create new connection ...
}

Proposed Solution

use crossbeam::queue::SegQueue;

pub struct ConnectionPool {
    config: ConnectionPoolConfig,
    db_type: DatabaseType,
    connection_string: String,
    // SOLUTION: Lock-free queues
    idle_connections: Arc<SegQueue<PooledConnection>>,
    active_connections: Arc<DashMap<ConnectionId, PooledConnection>>,
    connection_semaphore: Arc<Semaphore>,
    total_created: Arc<AtomicUsize>, // Atomic instead of RwLock
    health_checker: Arc<HealthChecker>,
}

impl ConnectionPool {
    pub async fn acquire(self: &Arc<Self>) -> Result<PooledConnection> {
        // Wait for semaphore
        let _permit = self.connection_semaphore.acquire().await?;
        // Lock-free pop
        if let Some(mut conn) = self.idle_connections.pop() {
            // Check staleness
            if conn.last_used.elapsed().as_secs() <= self.config.idle_timeout_secs {
                conn.last_used = Instant::now();
                // Lock-free insert
                self.active_connections.insert(conn.id, conn.clone());
                return Ok(conn);
            }
            // Stale connection: drop it and fall through to create a new one
        }
        // Create new connection
        let conn = self.create_connection().await?;
        self.active_connections.insert(conn.id, conn.clone());
        Ok(conn)
    }

    pub async fn release(&self, mut conn: PooledConnection) -> Result<()> {
        conn.last_used = Instant::now();
        conn.in_transaction = false;
        // Lock-free remove
        self.active_connections.remove(&conn.id);
        // Check lifetime
        let age = conn.created_at.elapsed()
            .map_err(|e| DatabaseError::ConnectionFailed(format!("Time error: {}", e)))?
            .as_secs();
        if age <= self.config.max_lifetime_secs {
            // Lock-free push
            self.idle_connections.push(conn);
        }
        Ok(())
    }

    async fn create_connection(self: &Arc<Self>) -> Result<PooledConnection> {
        let id = Uuid::new_v4();
        // Real database connection creation here
        tokio::time::sleep(Duration::from_millis(10)).await;
        // Atomic increment
        self.total_created.fetch_add(1, Ordering::Relaxed);
        Ok(PooledConnection {
            id,
            created_at: SystemTime::now(),
            last_used: Instant::now(),
            in_transaction: false,
            _pool: self.clone(),
        })
    }
}

Dependencies:

# Add to Cargo.toml
crossbeam = "0.8"
dashmap = "5.5" # Already in dependencies

Benefits:

  • Zero lock contention on acquire/release
  • Scales linearly with concurrent requests
  • Atomic metrics updates

OPT-004: Transaction Manager DashMap

Priority: P0 - Critical | Impact: -10ms transaction overhead, +30% 2PC throughput | Effort: 1 day | Risk: Low

Current Implementation Problem

// FILE: transaction.rs:14-20
pub struct TransactionManager {
    // 🔴 PROBLEM: Multiple lock acquisitions per transaction
    active_transactions: Arc<RwLock<HashMap<TransactionId, Transaction>>>,
    prepared_transactions: Arc<RwLock<HashMap<TransactionId, PreparedTransaction>>>,
    // ...
}

Proposed Solution

use dashmap::DashMap;

pub struct TransactionManager {
    // SOLUTION: Lock-free concurrent maps
    active_transactions: Arc<DashMap<TransactionId, Transaction>>,
    prepared_transactions: Arc<DashMap<TransactionId, PreparedTransaction>>,
    state_backend: Arc<dyn StateBackend>,
    config: TransactionConfig,
}

impl TransactionManager {
    pub async fn begin(&self, connection_id: ConnectionId) -> Result<TransactionId> {
        let txn_id = Uuid::new_v4();
        let transaction = Transaction {
            id: txn_id,
            connection_id,
            status: TransactionStatus::Active,
            created_at: SystemTime::now(),
            operations: Vec::new(),
            retries: 0,
        };
        // Lock-free insert
        self.active_transactions.insert(txn_id, transaction);
        Ok(txn_id)
    }

    pub async fn add_operation(
        &self,
        txn_id: TransactionId,
        operation: WriteOperation,
    ) -> Result<()> {
        // Lock-free get_mut
        let mut txn = self.active_transactions
            .get_mut(&txn_id)
            .ok_or_else(|| DatabaseError::InvalidConfig("Transaction not found".to_string()))?;
        if txn.status != TransactionStatus::Active {
            return Err(DatabaseError::InvalidConfig(format!(
                "Transaction is not active: {:?}",
                txn.status
            )));
        }
        txn.operations.push(operation);
        Ok(())
    }

    pub async fn prepare(&self, txn_id: TransactionId) -> Result<()> {
        // Validate and mark Preparing inside a short scope: a DashMap guard
        // pins a shard lock and must never be held across an .await point
        let (operations_count, checksum) = {
            let mut txn_ref = self.active_transactions
                .get_mut(&txn_id)
                .ok_or_else(|| DatabaseError::InvalidConfig("Transaction not found".to_string()))?;
            if txn_ref.status != TransactionStatus::Active {
                return Err(DatabaseError::InvalidConfig(format!(
                    "Transaction cannot be prepared from status: {:?}",
                    txn_ref.status
                )));
            }
            txn_ref.status = TransactionStatus::Preparing;
            (txn_ref.operations.len(), self.calculate_checksum(&txn_ref.operations))
        };
        // Simulate prepare (guard released above)
        tokio::time::sleep(Duration::from_millis(5)).await;
        if let Some(mut txn_ref) = self.active_transactions.get_mut(&txn_id) {
            txn_ref.status = TransactionStatus::Prepared;
        }
        // Create prepared transaction
        let prepared = PreparedTransaction {
            id: txn_id,
            prepared_at: SystemTime::now(),
            operations_count,
            checksum,
        };
        // Persist to state backend
        self.persist_prepared_transaction(&prepared).await?;
        // Move to prepared map
        self.prepared_transactions.insert(txn_id, prepared);
        Ok(())
    }

    pub async fn commit(&self, txn_id: TransactionId) -> Result<()> {
        // Check prepared without lock contention
        if !self.prepared_transactions.contains_key(&txn_id) {
            if let Some(txn) = self.active_transactions.get(&txn_id) {
                if txn.status != TransactionStatus::Prepared {
                    return Err(DatabaseError::InvalidConfig(format!(
                        "Transaction not prepared: {:?}",
                        txn.status
                    )));
                }
            } else {
                return Err(DatabaseError::InvalidConfig(
                    "Transaction not found".to_string(),
                ));
            }
        }
        // Update status (guard dropped at the end of the if-let scope)
        if let Some(mut txn) = self.active_transactions.get_mut(&txn_id) {
            txn.status = TransactionStatus::Committing;
        }
        // Simulate commit
        tokio::time::sleep(Duration::from_millis(5)).await;
        // Update status and clean up
        if let Some(mut txn) = self.active_transactions.get_mut(&txn_id) {
            txn.status = TransactionStatus::Committed;
        }
        // Remove from both maps
        self.prepared_transactions.remove(&txn_id);
        self.active_transactions.remove(&txn_id);
        // Clean up state backend
        self.cleanup_prepared_transaction(txn_id).await?;
        Ok(())
    }
}

Benefits:

  • Eliminates lock contention between transactions
  • Allows concurrent transaction lifecycle operations
  • Simplifies code (no explicit lock management)
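DashMap's core idea, sharding the map so unrelated keys contend on different locks, can be sketched with std types (ShardedMap is a hypothetical miniature for illustration, not the dashmap API):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Minimal sharded map: two transactions whose keys hash to different
// shards never touch the same lock, so lifecycle ops run concurrently.
struct ShardedMap<V> {
    shards: Vec<Mutex<HashMap<u64, V>>>,
}

impl<V> ShardedMap<V> {
    fn new(n: usize) -> Self {
        Self { shards: (0..n).map(|_| Mutex::new(HashMap::new())).collect() }
    }
    // Pick the shard owning this key (DashMap hashes; modulo suffices here)
    fn shard(&self, key: u64) -> &Mutex<HashMap<u64, V>> {
        &self.shards[(key as usize) % self.shards.len()]
    }
    fn insert(&self, key: u64, value: V) {
        self.shard(key).lock().unwrap().insert(key, value);
    }
    fn remove(&self, key: u64) -> Option<V> {
        self.shard(key).lock().unwrap().remove(&key)
    }
}

fn main() {
    let map = ShardedMap::new(16);
    map.insert(1, "txn-a");
    map.insert(17, "txn-b"); // same shard as key 1 (17 % 16 == 1)
    map.insert(2, "txn-c");  // different shard: no contention with key 1
    assert_eq!(map.remove(17), Some("txn-b"));
    assert_eq!(map.remove(3), None);
}
```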

Secondary Optimizations (P1)

OPT-005: Accurate Row Size Calculation

Priority: P1 | Impact: Better memory pressure detection | Effort: 4 hours | Risk: Low

impl Row {
    /// Calculate approximate memory usage of this row
    pub fn estimated_size_bytes(&self) -> usize {
        let mut size = std::mem::size_of::<Self>();
        for (key, value) in self.fields() {
            size += key.len();
            size += match value {
                Value::Null => 1,
                Value::Boolean(_) => 1,
                Value::Integer(_) => 8,
                Value::Float(_) => 8,
                Value::String(s) => s.len(),
                Value::Bytes(b) => b.len(),
                Value::Timestamp(_) => 8,
            };
        }
        size
    }
}

impl WriteBuffer {
    pub fn add(&mut self, row: Row) -> bool {
        // Use the actual row size instead of a fixed estimate
        let row_size = row.estimated_size_bytes();
        self.total_bytes += row_size;
        self.buffer.push(row);
        self.should_flush()
    }
}
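The sizing rule can be exercised in isolation with simplified stand-ins for the connector's types (this Value enum and `estimated_size` are illustrative, not the real definitions):

```rust
use std::mem;

// Simplified stand-ins for the connector's Row/Value types
enum Value {
    Null,
    Integer(i64),
    Text(String),
}

fn value_size(v: &Value) -> usize {
    match v {
        Value::Null => 1,
        Value::Integer(_) => 8,
        Value::Text(s) => s.len(),
    }
}

// Mirrors estimated_size_bytes(): fixed struct overhead plus key bytes
// plus each value's payload
fn estimated_size(fields: &[(String, Value)]) -> usize {
    mem::size_of::<Vec<(String, Value)>>()
        + fields.iter().map(|(k, v)| k.len() + value_size(v)).sum::<usize>()
}

fn main() {
    let fields = vec![
        ("id".to_string(), Value::Integer(42)),
        ("name".to_string(), Value::Text("alice".to_string())),
        ("note".to_string(), Value::Null),
    ];
    // key bytes: 2 + 4 + 4; payloads: 8 + 5 + 1; total 24 bytes
    assert_eq!(
        estimated_size(&fields) - mem::size_of::<Vec<(String, Value)>>(),
        24
    );
    println!("estimated row size: {} bytes", estimated_size(&fields));
}
```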

OPT-006: Zero-Copy Buffer Drain

Priority: P1 | Impact: -15% allocation rate | Effort: 2 hours | Risk: Low

pub struct WriteBuffer {
    buffer: Vec<Row>,
    swap_buffer: Vec<Row>, // Pre-allocated spare buffer
    // ...
}

impl WriteBuffer {
    pub fn new(batch_size: usize, flush_interval_secs: u64) -> Self {
        Self {
            buffer: Vec::with_capacity(batch_size),
            swap_buffer: Vec::with_capacity(batch_size), // Pre-allocate
            // ...
        }
    }

    pub fn drain(&mut self) -> Vec<Row> {
        self.last_flush = Instant::now();
        self.total_bytes = 0;
        // O(1) pointer swap: writers immediately get the empty spare buffer
        std::mem::swap(&mut self.buffer, &mut self.swap_buffer);
        // Hand the full vector to the caller; it must come back via
        // recycle() after flushing, or the spare allocation is lost
        std::mem::take(&mut self.swap_buffer)
    }

    /// Return a flushed vector so its allocation is reused on the next drain
    pub fn recycle(&mut self, mut flushed: Vec<Row>) {
        flushed.clear();
        self.swap_buffer = flushed;
    }
}
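The swap-then-take pattern moves only pointers, but it saves allocations only if the flushed vector's storage eventually comes back to the buffer; a std-only demonstration of the round trip:

```rust
fn main() {
    // Two pre-allocated buffers; only pointers move between them
    let mut active: Vec<u32> = Vec::with_capacity(1024);
    let mut spare: Vec<u32> = Vec::with_capacity(1024);
    active.extend(0..100);

    // drain(): swap the full buffer for the empty spare, then move it out
    std::mem::swap(&mut active, &mut spare);
    let mut drained = std::mem::take(&mut spare);
    assert_eq!(drained.len(), 100);
    assert_eq!(active.len(), 0);
    // Writers keep a pre-allocated buffer; no row was copied
    assert_eq!(active.capacity(), 1024);

    // After flushing, clear the drained vector and hand its allocation back
    drained.clear();
    assert_eq!(drained.capacity(), 1024);
    spare = drained;
    assert_eq!(spare.capacity(), 1024);
    println!("round trip complete, zero reallocations");
}
```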

OPT-007: Atomic Metrics Counters

Priority: P1 | Impact: -3% lock overhead | Effort: 3 hours | Risk: Low

use std::sync::atomic::{AtomicU64, Ordering};

pub struct DatabaseSinkConnector {
    // ...
    // Replace RwLock<u64> with AtomicU64
    rows_written: Arc<AtomicU64>,
    bytes_written: Arc<AtomicU64>,
    batches_flushed: Arc<AtomicU64>,
    transactions_committed: Arc<AtomicU64>,
    transactions_aborted: Arc<AtomicU64>,
    write_errors: Arc<AtomicU64>,
}

impl DatabaseSinkConnector {
    async fn write_batch_2pc(&self, rows: &[DatabaseRow]) -> Result<()> {
        // ...
        match self.transaction_manager.commit(txn_id).await {
            Ok(_) => {
                // Atomic increment (lock-free)
                self.transactions_committed.fetch_add(1, Ordering::Relaxed);
                *self.current_transaction.write().await = None;
            }
            Err(e) => {
                self.transactions_aborted.fetch_add(1, Ordering::Relaxed);
                self.write_errors.fetch_add(1, Ordering::Relaxed);
                // ...
            }
        }
        Ok(())
    }

    pub async fn metrics(&self) -> SinkMetrics {
        let rows_written = self.rows_written.load(Ordering::Relaxed);
        let batches_flushed = self.batches_flushed.load(Ordering::Relaxed);
        SinkMetrics {
            rows_written,
            bytes_written: self.bytes_written.load(Ordering::Relaxed),
            batches_flushed,
            transactions_committed: self.transactions_committed.load(Ordering::Relaxed),
            transactions_aborted: self.transactions_aborted.load(Ordering::Relaxed),
            write_errors: self.write_errors.load(Ordering::Relaxed),
            avg_batch_size: if batches_flushed > 0 {
                rows_written as f64 / batches_flushed as f64
            } else {
                0.0
            },
            avg_latency_ms: 0.0,
            throughput_rows_per_sec: 0.0,
        }
    }
}
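The lock-free counter behavior is easy to verify with plain std threads (`concurrent_count` is an illustrative harness, not connector code):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

// Increments a shared AtomicU64 from `writers` threads with no lock at all
fn concurrent_count(writers: usize, per_writer: u64) -> u64 {
    let rows_written = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..writers)
        .map(|_| {
            let counter = Arc::clone(&rows_written);
            thread::spawn(move || {
                for _ in 0..per_writer {
                    // Relaxed ordering suffices for a monotonic metrics counter
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    rows_written.load(Ordering::Relaxed)
}

fn main() {
    // No increments are lost despite 8 uncoordinated writers
    assert_eq!(concurrent_count(8, 10_000), 80_000);
    println!("ok");
}
```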

OPT-008: Batch Serialization

Priority: P1 | Impact: +10% throughput | Effort: 1 day | Risk: Low

impl DatabaseSinkConnector {
    fn serialize_row(&self, row: &DatabaseRow) -> Result<Vec<u8>> {
        // Use bincode for efficient serialization
        bincode::serialize(row)
            .map_err(|e| DatabaseError::SerializationError(e.to_string()))
    }

    fn serialize_rows_batch(&self, rows: &[DatabaseRow]) -> Result<Vec<Vec<u8>>> {
        // Pre-allocate the result vector
        let mut serialized = Vec::with_capacity(rows.len());
        for row in rows {
            serialized.push(self.serialize_row(row)?);
        }
        Ok(serialized)
    }
}
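The batch shape can be sketched without bincode using a hypothetical fixed-width row; the point is the single up-front allocation for the result vector:

```rust
// Hypothetical fixed-width row: (id: u64, value: f64). bincode plays
// this role for the connector's real DatabaseRow type.
fn serialize_row(row: &(u64, f64)) -> Vec<u8> {
    let mut out = Vec::with_capacity(16);
    out.extend_from_slice(&row.0.to_le_bytes());
    out.extend_from_slice(&row.1.to_le_bytes());
    out
}

// Batch variant: one pre-allocated result vector, no realloc mid-loop
fn serialize_rows_batch(rows: &[(u64, f64)]) -> Vec<Vec<u8>> {
    let mut serialized = Vec::with_capacity(rows.len());
    for row in rows {
        serialized.push(serialize_row(row));
    }
    serialized
}

fn main() {
    let batch = serialize_rows_batch(&[(1, 1.5), (2, 2.5)]);
    assert_eq!(batch.len(), 2);
    assert_eq!(batch[0].len(), 16); // 8 bytes id + 8 bytes value
    // Round-trip the id of the first row
    assert_eq!(u64::from_le_bytes(batch[0][..8].try_into().unwrap()), 1);
    println!("{} rows serialized", batch.len());
}
```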

Advanced Optimizations (P2)

OPT-009: SIMD Row Comparison (Stretch Goal)

Priority: P2 - Nice to have | Impact: +5% throughput for upsert operations | Effort: 3 days | Risk: High

Use SIMD instructions for bulk row comparison in upsert mode:

#[cfg(target_arch = "x86_64")]
use std::arch::x86_64::*;

/// Compare key columns pairwise. An AVX2 path would test 4 i64 keys per
/// instruction via _mm256_cmpeq_epi64; this scalar version is the
/// portable fallback (and is itself auto-vectorizable).
fn bulk_compare_keys(keys1: &[i64], keys2: &[i64]) -> Vec<bool> {
    keys1.iter().zip(keys2.iter()).map(|(a, b)| a == b).collect()
}

OPT-010: Connection Pool Warm-Keeping

Priority: P2 | Impact: -50ms cold start latency | Effort: 1 day | Risk: Low

impl ConnectionPool {
    /// Background task to keep connections warm
    pub fn start_warmer(self: &Arc<Self>) {
        let pool = Arc::clone(self); // Arc clone, matching the rest of the pool API
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(60)).await;
                let stats = pool.stats().await;
                let target = pool.config.min_connections;
                // Ensure minimum connections are always available
                if stats.idle_connections < target {
                    for _ in 0..(target - stats.idle_connections) {
                        if let Ok(conn) = pool.create_connection().await {
                            pool.idle_connections.push(conn);
                        }
                    }
                }
            }
        });
    }
}

Implementation Priority Matrix

High   │
Impact │  OPT-001 Lock-Free Buffer      OPT-003 Pool Lock-Free Queue
       │  OPT-002 Batch Ops             OPT-004 DashMap
───────┼─────────────────────────────────────────────▶ Low Effort
       │  OPT-005 Row Size              OPT-009 SIMD (stretch)
       │  OPT-006 Drain                 OPT-007 Atomic Metrics
Low    │  OPT-008 Batch Serialization   OPT-010 Warm Pool
Impact │

Implementation Order:

  1. Week 1: OPT-001, OPT-002, OPT-003, OPT-004 (Critical path)
  2. Week 2: OPT-005, OPT-006, OPT-007, OPT-008 (Secondary)
  3. Week 3: OPT-010, testing, tuning

Testing Strategy

Per-Optimization Validation

# Benchmark before each optimization
cargo bench --bench database_sink_bench -- --save-baseline before_opt_001
# Apply the optimization, then compare against the baseline
cargo bench --bench database_sink_bench -- --baseline before_opt_001
# Expected improvements:
# - OPT-001: +40% throughput
# - OPT-002: +25% throughput
# - OPT-003: -5ms latency
# - OPT-004: +30% 2PC throughput

Regression Testing

scripts/benchmark_regression.sh

#!/bin/bash
# Run full benchmark suite
cargo bench --bench database_sink_bench -- --save-baseline main
# THROUGHPUT_DROP is extracted from the benchmark comparison output
# (parsing omitted here); alert on regressions greater than 10%
if [ "$THROUGHPUT_DROP" -gt 10 ]; then
    echo "ERROR: Throughput regression detected: -${THROUGHPUT_DROP}%"
    exit 1
fi

Success Metrics

Must-Have Targets

  • Throughput: 35K → 100K+ events/sec (+185%)
  • Latency P99: 130ms → <100ms (-23%)
  • Memory: 36MB against a <50MB budget (already met)
  • Checkpoint: 10% → <5% (-50%)

Stretch Targets

  • Throughput: >150K events/sec
  • Latency P99: <50ms
  • Memory: <30MB
  • Checkpoint: <3%

Conclusion

These optimizations provide a clear path to meeting Phase 2 performance targets. The critical path focuses on eliminating lock contention and enabling true concurrent execution. With systematic implementation and validation, all targets are achievable within 2-3 weeks.

Status: Ready for Implementation | Next Step: Begin OPT-001 (Lock-Free Write Buffer) | Owner: Performance Benchmarker Agent | Review Date: After each optimization completion