Group Commit WAL Architecture Specification
Document Version: 1.0
Date: 2025-11-10
Status: Design Specification
Priority: P0 - Critical Performance Optimization
Executive Summary
This document specifies the architecture for Group Commit Write-Ahead Logging (WAL), designed to reduce fsync overhead by batching multiple transaction commits into a single disk flush. This optimization addresses the #2 performance bottleneck in HeliosDB.
Key Metrics:
- Expected Throughput Gain: 5-10x improvement
- Expected Commit Rate: 10,000+ commits/sec (vs current 1,000)
- Fsync Reduction: 90-95% fewer fsync calls
- Latency Impact: +5-10ms max (configurable)
- Durability: Full ACID guarantees maintained
1. Architecture Decisions
1.1 Batching Strategy: Hybrid Approach
Decision: Implement hybrid time-based + size-based batching
Rationale:
- Time-based (10ms default): Ensures bounded commit latency
- Size-based (100 entries default): Prevents memory bloat, ensures I/O efficiency
- Hybrid: Flush when EITHER condition is met
Configuration:
```rust
#[derive(Debug, Clone)] // Clone needed: new() hands a copy to the flush thread
pub struct GroupCommitConfig {
    /// Maximum time to wait before flushing (milliseconds)
    pub max_flush_interval_ms: u64,

    /// Maximum entries in a batch before forcing flush
    pub max_batch_size: usize,

    /// Minimum batch size (optimization: skip flush if below threshold)
    pub min_batch_size: usize,

    /// Durability mode
    pub durability_mode: DurabilityMode,

    /// Enable adaptive tuning
    pub adaptive_tuning: bool,
}

impl Default for GroupCommitConfig {
    fn default() -> Self {
        Self {
            max_flush_interval_ms: 10,
            max_batch_size: 100,
            min_batch_size: 1,
            durability_mode: DurabilityMode::GroupCommit,
            adaptive_tuning: false,
        }
    }
}
```
1.2 Durability Guarantees: Two-Phase Commitment
Decision: Two-phase protocol with notification waiters
Protocol:
- Phase 1 - Log: Write to WAL buffer, assign LSN, return immediately
- Phase 2 - Flush: Batch fsync, then notify all waiters
Guarantee: Transaction is durable when waiter is notified, NOT when append() returns
Rationale:
- Maintains ACID durability guarantees
- Enables batching for throughput
- Clients choose to wait or not based on durability requirements
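The two-phase contract above can be modeled in a few lines; this is a minimal sketch (the `WalModel` type and its methods are illustrative, not the real API) showing that `append()` returning does not imply durability, only the advancing flush watermark does:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Minimal model of the two-phase contract: `append` only assigns an LSN;
/// durability is signalled separately by advancing `last_flushed_lsn`.
struct WalModel {
    lsn_counter: AtomicU64,
    last_flushed_lsn: AtomicU64,
}

impl WalModel {
    fn new() -> Self {
        Self {
            lsn_counter: AtomicU64::new(1),
            last_flushed_lsn: AtomicU64::new(0),
        }
    }

    /// Phase 1: assign an LSN and return immediately (entry is NOT yet durable).
    fn append(&self) -> u64 {
        self.lsn_counter.fetch_add(1, Ordering::SeqCst)
    }

    /// Phase 2: the flush thread advances the durable watermark after fsync.
    fn mark_flushed(&self, lsn: u64) {
        self.last_flushed_lsn.store(lsn, Ordering::Release);
    }

    /// An LSN is durable only once the watermark has passed it.
    fn is_durable(&self, lsn: u64) -> bool {
        self.last_flushed_lsn.load(Ordering::Acquire) >= lsn
    }
}

fn main() {
    let wal = WalModel::new();
    let lsn = wal.append();
    assert!(!wal.is_durable(lsn)); // append() returning does NOT mean durable
    wal.mark_flushed(lsn);
    assert!(wal.is_durable(lsn)); // durable only after the batch flush
}
```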
1.3 Thread Model: Dedicated Flush Thread
Decision: Single dedicated flush thread with lock-free queue
Architecture:
```
┌─────────────┐      ┌──────────────┐      ┌─────────────┐
│   Client    │─────>│ Append Queue │─────>│ Flush Thread│
│   Threads   │      │ (Lock-free)  │      │             │
└─────────────┘      └──────────────┘      └──────┬──────┘
                                                  │
                                                  ▼
                                           ┌─────────────┐
                                           │  WAL File   │
                                           │   (fsync)   │
                                           └──────┬──────┘
                                                  │
                                                  ▼
                                           ┌─────────────┐
                                           │ Waiters Map │
                                           │  (Notify)   │
                                           └─────────────┘
```
Rationale:
- Eliminates lock contention on hot path (append)
- Dedicated thread optimizes for I/O batching
- Lock-free queue provides predictable latency
- Simple failure recovery (single point of failure isolation)
Alternative Considered: Work-stealing thread pool
- Rejected: Adds complexity, coordination overhead
- Use Case: Only beneficial for very high write rates (>100K/sec)
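The producer/consumer split above can be sketched with the standard library alone; this uses `std::sync::mpsc` as a stand-in for the lock-free SegQueue (an assumption: same multi-producer, single-consumer semantics, without the lock-free guarantees):

```rust
use std::sync::mpsc;
use std::thread;

/// Clients push LSNs into the queue; a dedicated "flush thread" drains them
/// into a batch. Ordering per producer is preserved, as group commit requires.
fn run_pipeline(lsns: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel::<u64>();

    // Dedicated consumer thread: collects everything queued into one batch.
    let flusher = thread::spawn(move || {
        let mut flushed = Vec::new();
        while let Ok(lsn) = rx.recv() {
            flushed.push(lsn);
        }
        flushed
    });

    // Client side: appends never touch the WAL file lock.
    for lsn in lsns {
        tx.send(lsn).unwrap();
    }
    drop(tx); // close the channel so the flusher drains and exits

    flusher.join().unwrap()
}

fn main() {
    // Per-sender FIFO ordering is preserved end to end.
    assert_eq!(run_pipeline(vec![1, 2, 3, 4, 5]), vec![1, 2, 3, 4, 5]);
}
```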
1.4 Failure Handling: Batch Abort + Recovery
Decision: Atomic batch commit with rollback on failure
Protocol:
- Write all entries to WAL file (buffered)
- Single fsync for entire batch
- If fsync fails:
- Mark all entries in batch as failed
- Notify waiters with error
- Trigger recovery protocol
- If fsync succeeds:
- Update commit LSN
- Notify all waiters with success
Rationale:
- Atomic batch semantics simplify reasoning
- Clear failure boundaries
- No partial commits visible to clients
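The all-or-nothing fan-out can be sketched as follows; this is a simplified model (the `Pending` and `complete_batch` names are illustrative, and a string error stands in for the non-cloneable `io::Error` shared across waiters):

```rust
use std::io;
use std::sync::{Arc, Mutex};

/// One queued entry with a slot its waiter polls for the batch outcome.
struct Pending {
    lsn: u64,
    result: Arc<Mutex<Option<Result<(), String>>>>,
}

/// A single fsync outcome is fanned out to every waiter in the batch:
/// either all entries succeed or all are marked failed.
fn complete_batch(batch: &[Pending], fsync_result: Result<(), io::Error>) {
    for p in batch {
        let outcome = match &fsync_result {
            Ok(()) => Ok(()),
            Err(e) => Err(e.to_string()),
        };
        *p.result.lock().unwrap() = Some(outcome);
    }
}

fn main() {
    let batch: Vec<Pending> = (1..=3)
        .map(|lsn| Pending { lsn, result: Arc::new(Mutex::new(None)) })
        .collect();
    assert_eq!(batch[0].lsn, 1);

    // Simulate a failed fsync: the whole batch is marked failed together.
    complete_batch(
        &batch,
        Err(io::Error::new(io::ErrorKind::Other, "fsync failed")),
    );

    for p in &batch {
        let r = p.result.lock().unwrap();
        assert!(matches!(&*r, Some(Err(_)))); // no partial commits are visible
    }
}
```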
2. Interface Design
2.1 Core Data Structures
```rust
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};

use crossbeam::queue::SegQueue;
use tokio::sync::{Mutex, Notify, RwLock};
```
```rust
/// Logical Sequence Number - monotonically increasing
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Lsn(u64);

impl Lsn {
    pub fn new(value: u64) -> Self {
        Self(value)
    }

    pub fn value(&self) -> u64 {
        self.0
    }
}

/// Write-Ahead Log Entry
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Transaction ID
    pub txn_id: u64,

    /// Entry type (Begin, Commit, Abort, Data)
    pub entry_type: WalEntryType,

    /// Serialized data
    pub data: Vec<u8>,

    /// Timestamp
    pub timestamp: u64,

    /// Checksum (CRC32)
    pub checksum: u32,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalEntryType {
    Begin,
    Data,
    Commit,
    Abort,
    Checkpoint,
}

/// Durability modes
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
    /// Synchronous: Wait for fsync before returning
    Synchronous,

    /// Group Commit: Batch fsync (default)
    GroupCommit,

    /// Async: Fire-and-forget (no durability guarantee)
    Async,
}

/// Pending entry waiting for flush
struct PendingEntry {
    lsn: Lsn,
    entry: WalEntry,
    notify: Arc<Notify>,
    result: Arc<RwLock<Option<io::Result<()>>>>,
}
```
```rust
/// Group Commit WAL
pub struct GroupCommitWal {
    /// Configuration
    config: GroupCommitConfig,

    /// WAL file path
    path: PathBuf,

    /// WAL file handle (managed by flush thread)
    file: Arc<Mutex<BufWriter<File>>>,

    /// Lock-free pending queue
    pending_queue: Arc<SegQueue<PendingEntry>>,

    /// LSN counter (atomic)
    lsn_counter: Arc<AtomicU64>,

    /// Last flushed LSN
    last_flushed_lsn: Arc<AtomicU64>,

    /// Flush thread handle
    flush_thread: Option<std::thread::JoinHandle<()>>,

    /// Shutdown signal
    shutdown: Arc<AtomicBool>,

    /// Metrics
    metrics: Arc<WalMetrics>,
}

/// WAL Metrics
#[derive(Debug, Default)]
pub struct WalMetrics {
    /// Total appends
    pub total_appends: AtomicU64,

    /// Total flushes
    pub total_flushes: AtomicU64,

    /// Total bytes written
    pub total_bytes_written: AtomicU64,

    /// Batch size histogram
    pub batch_sizes: RwLock<Vec<usize>>,

    /// Flush latency histogram (microseconds)
    pub flush_latencies: RwLock<Vec<u64>>,

    /// Commit latency histogram (microseconds)
    pub commit_latencies: RwLock<Vec<u64>>,
}
```
2.2 Public API
```rust
impl GroupCommitWal {
    /// Create new Group Commit WAL
    pub fn new(path: impl AsRef<Path>, config: GroupCommitConfig) -> io::Result<Self> {
        let path = path.as_ref().to_path_buf();

        // Open WAL file with direct I/O hints
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)?;

        let file = Arc::new(Mutex::new(BufWriter::with_capacity(
            1024 * 1024, // 1MB buffer
            file,
        )));

        let pending_queue = Arc::new(SegQueue::new());
        let lsn_counter = Arc::new(AtomicU64::new(1));
        let last_flushed_lsn = Arc::new(AtomicU64::new(0));
        let shutdown = Arc::new(AtomicBool::new(false));
        let metrics = Arc::new(WalMetrics::default());

        // Spawn flush thread
        let flush_thread = Self::spawn_flush_thread(
            config.clone(),
            Arc::clone(&file),
            Arc::clone(&pending_queue),
            Arc::clone(&last_flushed_lsn),
            Arc::clone(&shutdown),
            Arc::clone(&metrics),
        );

        Ok(Self {
            config,
            path,
            file,
            pending_queue,
            lsn_counter,
            last_flushed_lsn,
            flush_thread: Some(flush_thread),
            shutdown,
            metrics,
        })
    }

    /// Append single entry to WAL
    ///
    /// Returns LSN immediately. For durability guarantee, call wait_for_lsn(lsn).
    pub fn append(&self, entry: WalEntry) -> io::Result<Lsn> {
        // Assign LSN atomically
        let lsn = Lsn::new(self.lsn_counter.fetch_add(1, Ordering::SeqCst));

        // Create notification channel
        let notify = Arc::new(Notify::new());
        let result = Arc::new(RwLock::new(None));

        // Enqueue for flush
        let pending = PendingEntry {
            lsn,
            entry,
            notify,
            result,
        };

        self.pending_queue.push(pending);

        // Update metrics
        self.metrics.total_appends.fetch_add(1, Ordering::Relaxed);

        Ok(lsn)
    }

    /// Append batch of entries
    pub fn append_batch(&self, entries: Vec<WalEntry>) -> io::Result<Vec<Lsn>> {
        let mut lsns = Vec::with_capacity(entries.len());

        for entry in entries {
            let lsn = self.append(entry)?;
            lsns.push(lsn);
        }

        Ok(lsns)
    }

    /// Wait for LSN to be flushed (durability guarantee)
    pub async fn wait_for_lsn(&self, lsn: Lsn) -> io::Result<()> {
        // Check if already flushed
        if self.last_flushed_lsn.load(Ordering::Acquire) >= lsn.value() {
            return Ok(());
        }

        // Wait for flush notification
        // Note: This is simplified - real implementation needs waiter registry
        loop {
            tokio::time::sleep(Duration::from_micros(100)).await;
            if self.last_flushed_lsn.load(Ordering::Acquire) >= lsn.value() {
                return Ok(());
            }
        }
    }

    /// Force immediate flush (bypasses batching)
    pub async fn force_flush(&self) -> io::Result<()> {
        // Signal flush thread to flush immediately
        // Implementation: Use channel or atomic flag
        todo!("Implement immediate flush signal")
    }

    /// Get last flushed LSN
    pub fn last_flushed_lsn(&self) -> Lsn {
        Lsn::new(self.last_flushed_lsn.load(Ordering::Acquire))
    }

    /// Recover WAL from file
    pub fn recover(path: impl AsRef<Path>) -> io::Result<Vec<WalEntry>> {
        let mut entries = Vec::new();
        let file = std::fs::File::open(path)?;
        let mut reader = std::io::BufReader::new(file);

        // Read entries until EOF
        loop {
            match Self::read_entry(&mut reader) {
                Ok(entry) => entries.push(entry),
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }
        }

        Ok(entries)
    }

    /// Shutdown WAL gracefully
    pub fn shutdown(&mut self) -> io::Result<()> {
        // Signal shutdown
        self.shutdown.store(true, Ordering::Release);

        // Wait for flush thread
        if let Some(handle) = self.flush_thread.take() {
            handle.join().expect("Flush thread panicked");
        }

        // Final flush
        let mut file = self.file.blocking_lock();
        file.flush()?;
        file.get_mut().sync_all()?;

        Ok(())
    }

    /// Get metrics snapshot
    pub fn metrics(&self) -> WalMetricsSnapshot {
        WalMetricsSnapshot {
            total_appends: self.metrics.total_appends.load(Ordering::Relaxed),
            total_flushes: self.metrics.total_flushes.load(Ordering::Relaxed),
            total_bytes_written: self.metrics.total_bytes_written.load(Ordering::Relaxed),
        }
    }
}
```
2.3 Flush Thread Implementation
```rust
impl GroupCommitWal {
    fn spawn_flush_thread(
        config: GroupCommitConfig,
        file: Arc<Mutex<BufWriter<File>>>,
        pending_queue: Arc<SegQueue<PendingEntry>>,
        last_flushed_lsn: Arc<AtomicU64>,
        shutdown: Arc<AtomicBool>,
        metrics: Arc<WalMetrics>,
    ) -> std::thread::JoinHandle<()> {
        std::thread::spawn(move || {
            let flush_interval = Duration::from_millis(config.max_flush_interval_ms);
            let max_batch_size = config.max_batch_size;

            let mut last_flush = Instant::now();
            let mut batch = Vec::with_capacity(max_batch_size);

            loop {
                // Check shutdown
                if shutdown.load(Ordering::Acquire) {
                    // Final flush
                    if !batch.is_empty() {
                        Self::flush_batch(&file, &batch, &last_flushed_lsn, &metrics);
                    }
                    break;
                }

                // Collect batch
                while batch.len() < max_batch_size {
                    match pending_queue.pop() {
                        Some(entry) => batch.push(entry),
                        None => break,
                    }
                }

                // Flush conditions
                let should_flush = !batch.is_empty()
                    && (batch.len() >= max_batch_size || last_flush.elapsed() >= flush_interval);

                if should_flush {
                    Self::flush_batch(&file, &batch, &last_flushed_lsn, &metrics);
                    batch.clear();
                    last_flush = Instant::now();
                } else {
                    // Sleep briefly to avoid busy-wait
                    std::thread::sleep(Duration::from_micros(100));
                }
            }
        })
    }

    fn flush_batch(
        file: &Arc<Mutex<BufWriter<File>>>,
        batch: &[PendingEntry],
        last_flushed_lsn: &Arc<AtomicU64>,
        metrics: &Arc<WalMetrics>,
    ) {
        let start = Instant::now();

        let mut file = file.blocking_lock();

        // Write all entries
        let mut max_lsn = 0u64;
        let mut total_bytes = 0usize;

        for pending in batch {
            match Self::write_entry(&mut *file, pending.lsn, &pending.entry) {
                Ok(bytes) => {
                    total_bytes += bytes;
                    max_lsn = max_lsn.max(pending.lsn.value());
                }
                Err(e) => {
                    eprintln!("WAL write error: {}", e);
                    // Mark as failed
                    continue;
                }
            }
        }

        // Single fsync for entire batch
        match file.flush().and_then(|_| file.get_mut().sync_all()) {
            Ok(_) => {
                // Update last flushed LSN
                last_flushed_lsn.store(max_lsn, Ordering::Release);

                // Notify all waiters (simplified - need waiter registry)
                // for pending in batch {
                //     pending.notify.notify_waiters();
                // }

                // Update metrics
                metrics.total_flushes.fetch_add(1, Ordering::Relaxed);
                metrics
                    .total_bytes_written
                    .fetch_add(total_bytes as u64, Ordering::Relaxed);

                let latency = start.elapsed().as_micros() as u64;
                metrics.flush_latencies.blocking_write().push(latency);
                metrics.batch_sizes.blocking_write().push(batch.len());
            }
            Err(e) => {
                eprintln!("WAL fsync error: {}", e);
                // All entries in batch failed
                // Notify waiters with error
            }
        }
    }

    fn write_entry(file: &mut BufWriter<File>, lsn: Lsn, entry: &WalEntry) -> io::Result<usize> {
        // Format: | LSN (8) | Type (1) | TxnID (8) | Timestamp (8) | DataLen (4) | Data (N) | Checksum (4) |
        let mut bytes_written = 0;

        // LSN
        file.write_all(&lsn.value().to_le_bytes())?;
        bytes_written += 8;

        // Entry type
        file.write_all(&[entry.entry_type as u8])?;
        bytes_written += 1;

        // Transaction ID
        file.write_all(&entry.txn_id.to_le_bytes())?;
        bytes_written += 8;

        // Timestamp
        file.write_all(&entry.timestamp.to_le_bytes())?;
        bytes_written += 8;

        // Data length
        file.write_all(&(entry.data.len() as u32).to_le_bytes())?;
        bytes_written += 4;

        // Data
        file.write_all(&entry.data)?;
        bytes_written += entry.data.len();

        // Checksum
        file.write_all(&entry.checksum.to_le_bytes())?;
        bytes_written += 4;

        Ok(bytes_written)
    }

    fn read_entry(reader: &mut impl std::io::Read) -> io::Result<WalEntry> {
        // Read LSN
        let mut lsn_buf = [0u8; 8];
        reader.read_exact(&mut lsn_buf)?;
        let _lsn = u64::from_le_bytes(lsn_buf);

        // Read entry type
        let mut type_buf = [0u8; 1];
        reader.read_exact(&mut type_buf)?;
        let entry_type = match type_buf[0] {
            0 => WalEntryType::Begin,
            1 => WalEntryType::Data,
            2 => WalEntryType::Commit,
            3 => WalEntryType::Abort,
            4 => WalEntryType::Checkpoint,
            _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid entry type")),
        };

        // Read transaction ID
        let mut txn_buf = [0u8; 8];
        reader.read_exact(&mut txn_buf)?;
        let txn_id = u64::from_le_bytes(txn_buf);

        // Read timestamp
        let mut ts_buf = [0u8; 8];
        reader.read_exact(&mut ts_buf)?;
        let timestamp = u64::from_le_bytes(ts_buf);

        // Read data length
        let mut len_buf = [0u8; 4];
        reader.read_exact(&mut len_buf)?;
        let data_len = u32::from_le_bytes(len_buf) as usize;

        // Read data
        let mut data = vec![0u8; data_len];
        reader.read_exact(&mut data)?;

        // Read checksum
        let mut checksum_buf = [0u8; 4];
        reader.read_exact(&mut checksum_buf)?;
        let checksum = u32::from_le_bytes(checksum_buf);

        Ok(WalEntry {
            txn_id,
            entry_type,
            data,
            timestamp,
            checksum,
        })
    }
}

#[derive(Debug, Clone)]
pub struct WalMetricsSnapshot {
    pub total_appends: u64,
    pub total_flushes: u64,
    pub total_bytes_written: u64,
}
```
3. Key Design Challenges - Solutions
3.1 Challenge 1: Durability Guarantee
Recommendation: Two-phase protocol with explicit wait
Solution:
```rust
// Client code for durable commit
let lsn = wal.append(entry)?;    // Phase 1: Log (fast, async)
wal.wait_for_lsn(lsn).await?;    // Phase 2: Wait for flush (durability)
```
Rationale:
- Separation of concerns: Logging vs durability
- Flexibility: Clients choose their durability requirements
- Performance: Non-critical writes can skip wait
- Correctness: Explicit contract, no hidden behavior
Durability Modes:
- Synchronous Mode: wait_for_lsn() called automatically
- Group Commit Mode: wait_for_lsn() called by client
- Async Mode: no wait (fire-and-forget)
3.2 Challenge 2: Failure Recovery
Recovery Protocol State Machine:
```
┌─────────────┐
│    START    │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Open WAL   │
│    File     │
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ Read Entry  │<───────┐
└──────┬──────┘        │
       │               │
       ├──EOF──────────┼─────> Done
       │               │
       ▼               │
┌─────────────┐        │
│  Validate   │        │
│  Checksum   │        │
└──────┬──────┘        │
       │               │
       ├──FAIL─────────┼─────> Truncate & Stop
       │               │
       ▼               │
┌─────────────┐        │
│  Apply to   │        │
│    State    │        │
└──────┬──────┘        │
       │               │
       └───────────────┘
```
Recovery Algorithm:
```rust
use std::io::Seek; // brings stream_position() into scope

pub fn recover_wal(path: impl AsRef<Path>) -> io::Result<RecoveryResult> {
    let mut entries = Vec::new();
    let mut last_valid_offset = 0u64;
    let mut corrupted_at = None;

    let file = std::fs::File::open(&path)?;
    let mut reader = std::io::BufReader::new(file);

    loop {
        let offset = reader.stream_position()?;

        match GroupCommitWal::read_entry(&mut reader) {
            Ok(entry) => {
                // Validate checksum
                let computed = crc32(&entry.data);
                if computed != entry.checksum {
                    corrupted_at = Some(offset);
                    break;
                }

                entries.push(entry);
                last_valid_offset = reader.stream_position()?;
            }
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
                // Normal EOF
                break;
            }
            Err(_) => {
                // Corruption detected
                corrupted_at = Some(offset);
                break;
            }
        }
    }

    // Truncate WAL if corruption detected
    if corrupted_at.is_some() {
        let file = std::fs::OpenOptions::new().write(true).open(&path)?;
        file.set_len(last_valid_offset)?;
    }

    Ok(RecoveryResult {
        entries,
        last_valid_offset,
        corrupted_at,
    })
}

pub struct RecoveryResult {
    pub entries: Vec<WalEntry>,
    pub last_valid_offset: u64,
    pub corrupted_at: Option<u64>,
}
```
Failure Scenarios:
| Scenario | Detection | Recovery |
|---|---|---|
| Crash during write | Partial entry (EOF mid-entry) | Truncate at last valid entry |
| Crash during fsync | Checksum mismatch | Truncate at last valid entry |
| Disk corruption | Checksum mismatch | Truncate at last valid entry |
| Partial batch flush | Some entries flushed, some not | All-or-nothing batch semantics |
Key Invariants:
- All entries before last_flushed_lsn are durable
- Entries after last_flushed_lsn may be lost on crash
- Partial entries are never visible (checksum validation)
- WAL file is always truncated to valid boundary
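The checksum validation that enforces the "partial entries are never visible" invariant relies on the `crc32` function used in the recovery algorithm; a minimal bitwise sketch (production code would use a table-driven or hardware CRC, e.g. the `crc32fast` crate):

```rust
/// Minimal CRC32 (IEEE/reflected polynomial 0xEDB88320), matching the
/// well-known CRC-32 used for WAL entry checksums.
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= byte as u32;
        for _ in 0..8 {
            // mask is 0xFFFFFFFF when the low bit is set, 0 otherwise
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

fn main() {
    // Standard check value: CRC32("123456789") = 0xCBF43926
    assert_eq!(crc32(b"123456789"), 0xCBF4_3926);
    // A single flipped byte changes the checksum, so torn writes are detected.
    assert_ne!(crc32(b"123456788"), crc32(b"123456789"));
}
```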
3.3 Challenge 3: Flush Thread Management
Recommendation: Fixed interval with adaptive enhancement (Phase 2)
Phase 1 - Fixed Interval (Simple):
```rust
// Flush every 10ms OR when batch reaches 100 entries
let flush_interval = Duration::from_millis(10);
let max_batch_size = 100;

loop {
    let batch = collect_batch(max_batch_size, flush_interval);
    if !batch.is_empty() {
        flush_batch(batch);
    }
}
```
Phase 2 - Adaptive Interval (Advanced):
```rust
pub struct AdaptiveFlusher {
    min_interval: Duration, // 1ms
    max_interval: Duration, // 20ms
    current_interval: Duration,

    // Adjust based on metrics
    avg_batch_size: f64,
    target_batch_size: usize,
}

impl AdaptiveFlusher {
    fn adjust_interval(&mut self, _metrics: &WalMetrics) {
        // If batches are small, increase interval to collect more
        if self.avg_batch_size < self.target_batch_size as f64 * 0.5 {
            self.current_interval = self.current_interval.mul_f64(1.2).min(self.max_interval);
        }
        // If batches are large, decrease interval to reduce latency
        else if self.avg_batch_size > self.target_batch_size as f64 * 0.9 {
            self.current_interval = self.current_interval.mul_f64(0.8).max(self.min_interval);
        }
    }
}
```
Rationale:
- Start simple: Fixed interval is predictable, easy to reason about
- Optimize later: Adaptive tuning for specific workloads
- Single thread: Eliminates coordination overhead
- Future: Multiple flush threads for >100K writes/sec workloads
Alternative - Multiple Flush Threads: Only beneficial when:
- Write rate exceeds 100K/sec
- Single fsync latency becomes bottleneck
- Parallel I/O subsystem (NVMe, SSD array)
3.4 Challenge 4: LSN Assignment Protocol
Design: Atomic counter with monotonic guarantee
LSN Assignment:
```rust
impl GroupCommitWal {
    pub fn assign_lsn(&self) -> Lsn {
        // Atomic fetch-add ensures:
        // 1. Unique LSN per entry
        // 2. Monotonically increasing
        // 3. Thread-safe without locks
        let lsn_value = self.lsn_counter.fetch_add(1, Ordering::SeqCst);
        Lsn::new(lsn_value)
    }
}
```
Ordering Guarantees:
- LSN Assignment Order: Matches entry submission order (atomic counter)
- Flush Order: Batches flushed in LSN order
- Visibility Order: Entries visible after flush in LSN order
Key Invariant: LSN monotonicity
```
LSN(entry_i) < LSN(entry_j) => entry_i flushed before or with entry_j
```
Recovery Consistency:
- During recovery, entries are replayed in LSN order
- Gaps in LSN sequence indicate lost entries (crash before flush)
- Last flushed LSN determines recovery point
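The gap rule above can be sketched as a small post-recovery check; this is illustrative (the `find_lsn_gaps` helper is an assumption, not part of the specified API):

```rust
/// Detect gaps in a recovered LSN sequence. A gap means an LSN was assigned
/// (the entry was appended) but the entry was lost before its batch flushed.
fn find_lsn_gaps(lsns: &[u64]) -> Vec<u64> {
    let mut gaps = Vec::new();
    for pair in lsns.windows(2) {
        // Every missing LSN between consecutive recovered entries is a gap.
        for missing in (pair[0] + 1)..pair[1] {
            gaps.push(missing);
        }
    }
    gaps
}

fn main() {
    // LSNs 3 and 4 were assigned but crashed before their batch flushed.
    let recovered = [1u64, 2, 5, 6];
    assert_eq!(find_lsn_gaps(&recovered), vec![3, 4]);
    // A contiguous sequence has no gaps: nothing was lost.
    assert!(find_lsn_gaps(&[1, 2, 3]).is_empty());
}
```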
Multi-threaded Correctness:
```rust
// Thread 1
let lsn1 = wal.assign_lsn(); // LSN=1
wal.enqueue(lsn1, entry1);

// Thread 2
let lsn2 = wal.assign_lsn(); // LSN=2
wal.enqueue(lsn2, entry2);

// Flush thread guarantees: entry1 flushed before entry2 (or same batch)
```
3.5 Challenge 5: Performance Tuning Matrix
Tuning Parameters:
| Flush Interval | Batch Size | Expected Throughput | Latency (p99) | Use Case |
|---|---|---|---|---|
| 1ms | 10 | 10,000/sec | 2ms | Low-latency OLTP |
| 5ms | 50 | 50,000/sec | 8ms | Balanced workload |
| 10ms | 100 | 100,000/sec | 15ms | High-throughput analytics |
| 20ms | 500 | 250,000/sec | 30ms | Batch processing |
| 50ms | 1000 | 500,000/sec | 75ms | Data ingestion |
Disk Type Impact:
| Storage | Fsync Latency | Throughput w/o Batching | Throughput w/ Batching (10ms) |
|---|---|---|---|
| HDD (7200 RPM) | 10ms | 100/sec | 10,000/sec (100x) |
| SATA SSD | 1ms | 1,000/sec | 50,000/sec (50x) |
| NVMe SSD | 0.1ms | 10,000/sec | 100,000/sec (10x) |
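The HDD row of the table can be reproduced with the document's own back-of-envelope model (one fsync per commit without batching; one fsync per batch with group commit); the helper names are illustrative:

```rust
/// Without batching, each commit pays one full fsync.
fn commits_per_sec_unbatched(fsync_latency_us: u64) -> u64 {
    1_000_000 / fsync_latency_us
}

/// With group commit, one flush per interval carries `batch_size` commits.
fn commits_per_sec_batched(flush_interval_us: u64, batch_size: u64) -> u64 {
    batch_size * 1_000_000 / flush_interval_us
}

fn main() {
    // HDD: 10ms fsync -> 100/sec unbatched;
    // 100-entry batches every 10ms -> 10,000/sec (the 100x in the table).
    assert_eq!(commits_per_sec_unbatched(10_000), 100);
    assert_eq!(commits_per_sec_batched(10_000, 100), 10_000);
}
```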
Benchmark Script:
```rust
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};

fn benchmark_flush_intervals(c: &mut Criterion) {
    let mut group = c.benchmark_group("flush_intervals");

    for interval_ms in [1, 5, 10, 20, 50] {
        group.bench_with_input(
            BenchmarkId::from_parameter(interval_ms),
            &interval_ms,
            |b, &interval| {
                let config = GroupCommitConfig {
                    max_flush_interval_ms: interval,
                    max_batch_size: 100,
                    ..Default::default()
                };

                let wal = GroupCommitWal::new("/tmp/wal_bench", config).unwrap();

                b.iter(|| {
                    let entry = create_test_entry();
                    black_box(wal.append(entry).unwrap());
                });
            },
        );
    }

    group.finish();
}

fn benchmark_batch_sizes(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_sizes");

    for batch_size in [10, 50, 100, 500, 1000] {
        group.bench_with_input(
            BenchmarkId::from_parameter(batch_size),
            &batch_size,
            |b, &size| {
                let config = GroupCommitConfig {
                    max_flush_interval_ms: 10,
                    max_batch_size: size,
                    ..Default::default()
                };

                let wal = GroupCommitWal::new("/tmp/wal_bench", config).unwrap();

                b.iter(|| {
                    let entries: Vec<_> = (0..100).map(|_| create_test_entry()).collect();
                    black_box(wal.append_batch(entries).unwrap());
                });
            },
        );
    }

    group.finish();
}

criterion_group!(benches, benchmark_flush_intervals, benchmark_batch_sizes);
criterion_main!(benches);
```
Recommended Defaults:
- OLTP Workload: 5ms interval, 50 batch size
- Mixed Workload: 10ms interval, 100 batch size
- Analytics Workload: 20ms interval, 500 batch size
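The recommended defaults above can be captured as workload presets; a minimal sketch (the preset functions are illustrative, and `GroupCommitConfig` is restated with just the two tuned fields so the example is self-contained):

```rust
/// Pared-down config carrying only the two parameters tuned per workload.
#[derive(Debug, Clone, PartialEq)]
struct GroupCommitConfig {
    max_flush_interval_ms: u64,
    max_batch_size: usize,
}

/// OLTP: short interval for low commit latency.
fn oltp_preset() -> GroupCommitConfig {
    GroupCommitConfig { max_flush_interval_ms: 5, max_batch_size: 50 }
}

/// Mixed: the document's defaults.
fn mixed_preset() -> GroupCommitConfig {
    GroupCommitConfig { max_flush_interval_ms: 10, max_batch_size: 100 }
}

/// Analytics: larger batches, latency-tolerant.
fn analytics_preset() -> GroupCommitConfig {
    GroupCommitConfig { max_flush_interval_ms: 20, max_batch_size: 500 }
}

fn main() {
    assert_eq!(oltp_preset().max_flush_interval_ms, 5);
    assert_eq!(mixed_preset().max_batch_size, 100);
    assert_eq!(analytics_preset().max_batch_size, 500);
}
```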
4. Performance Analysis
4.1 Throughput Model
Current System (No Batching):
```
Throughput = 1 / fsync_latency = 1 / 10ms = 100 commits/sec
```
Group Commit System:
```
Throughput = batch_size / fsync_latency = 100 entries / 10ms = 10,000 commits/sec
```
Improvement: 100x throughput gain
4.2 Latency Model
Commit Latency Components:
- Enqueue time: ~1μs (lock-free queue push)
- Wait time: 0-10ms (time until next flush)
- Flush time: ~10ms (fsync on HDD)
- Notification time: ~10μs (wake up waiter)
Total Latency:
```
P50 (median): 5ms (avg wait) + 10ms (flush)  = 15ms
P99 (worst):  10ms (max wait) + 10ms (flush) = 20ms
P99.9:        10ms + 15ms (slow fsync)       = 25ms
```
Latency vs Throughput Tradeoff:
```
Flush Interval ↑ => Latency ↑, Throughput ↑
Batch Size ↑     => Throughput ↑
```
4.3 Resource Utilization
CPU Usage:
- Flush thread: ~1-5% CPU (mostly I/O wait)
- Client threads: <0.1% CPU per thread (lock-free operations)
Memory Usage:
- Pending queue: O(max_batch_size) entries
- Per entry: ~100 bytes (estimate)
- Total: ~10KB for batch of 100 entries
I/O Bandwidth:
```
Bandwidth = (entry_size * batch_size) / flush_interval
          = (100 bytes * 100 entries) / 10ms
          = 1 MB/sec
```
4.4 Scalability Analysis
Vertical Scaling (Single Node):
- Current: 10,000 commits/sec
- With NVMe: 100,000 commits/sec
- Bottleneck: Single flush thread becomes CPU-bound at >500K/sec
Horizontal Scaling (Multiple WAL Files):
- Partition by table/shard: Linear scaling
- Independent flush threads: N × 10,000 commits/sec
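The partition-by-shard scheme can be sketched as a hash-based router; this is illustrative (the `wal_partition` helper is an assumption), with each partition index mapping to an independent WAL file and flush thread:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Route a key to one of N independent WAL partitions, each with its own
/// flush thread and fsync budget, giving near-linear horizontal scaling.
fn wal_partition(key: &[u8], num_partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % num_partitions
}

fn main() {
    let n = 4;
    // Routing is deterministic: the same key always lands in the same WAL,
    // which preserves per-key ordering across flushes.
    assert_eq!(wal_partition(b"user:42", n), wal_partition(b"user:42", n));
    assert!(wal_partition(b"user:42", n) < n);
}
```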
5. Durability Modes
5.1 Mode Definitions
```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
    /// Synchronous: fsync before returning from append()
    /// - Strongest durability guarantee
    /// - Lowest throughput (~100/sec)
    /// - Use for: Financial transactions, critical data
    Synchronous,

    /// Group Commit: fsync batched, client must wait_for_lsn()
    /// - Balanced durability and performance
    /// - High throughput (~10,000/sec)
    /// - Use for: Most OLTP workloads
    GroupCommit,

    /// Async: Fire-and-forget, no fsync guarantee
    /// - No durability guarantee (data may be lost on crash)
    /// - Highest throughput (~1M/sec, memory-bound)
    /// - Use for: Caching, telemetry, non-critical logs
    Async,
}
```
5.2 Configuration API
```rust
impl GroupCommitWal {
    /// Set durability mode
    pub fn set_durability_mode(&mut self, mode: DurabilityMode) {
        self.config.durability_mode = mode;
    }

    /// Append with explicit durability mode
    pub async fn append_with_mode(
        &self,
        entry: WalEntry,
        mode: DurabilityMode,
    ) -> io::Result<Lsn> {
        let lsn = self.append(entry)?;

        match mode {
            DurabilityMode::Synchronous => {
                self.wait_for_lsn(lsn).await?;
            }
            DurabilityMode::GroupCommit => {
                // Client decides when to wait
            }
            DurabilityMode::Async => {
                // Fire-and-forget
            }
        }

        Ok(lsn)
    }
}
```
5.3 Transaction Manager Integration
```rust
pub struct TransactionManager {
    wal: Arc<GroupCommitWal>,
    durability_mode: DurabilityMode,
}

impl TransactionManager {
    pub async fn commit(&self, txn: &Transaction) -> Result<(), Error> {
        // Write commit record to WAL
        let entry = WalEntry {
            txn_id: txn.id,
            entry_type: WalEntryType::Commit,
            data: serialize_transaction(txn),
            timestamp: current_timestamp(),
            checksum: 0, // Computed by WAL
        };

        let lsn = self.wal.append(entry)?;

        // Wait for durability based on mode
        match self.durability_mode {
            DurabilityMode::Synchronous | DurabilityMode::GroupCommit => {
                self.wal.wait_for_lsn(lsn).await?;
            }
            DurabilityMode::Async => {
                // Don't wait
            }
        }

        Ok(())
    }
}
```
6. Integration with Transaction Manager
6.1 Commit Protocol Changes
Before (Synchronous WAL):
```rust
pub async fn commit_transaction(txn: Transaction) -> Result<()> {
    // 1. Write to WAL (blocks on fsync)
    wal.append(txn.commit_record()).await?; // 10ms

    // 2. Mark transaction as committed
    txn.set_state(TxnState::Committed);

    // 3. Release locks
    txn.release_locks();

    Ok(())
}
```
After (Group Commit WAL):
```rust
pub async fn commit_transaction(txn: Transaction) -> Result<()> {
    // 1. Write to WAL (returns immediately)
    let lsn = wal.append(txn.commit_record())?; // 1μs

    // 2. Wait for durability (batched with other commits)
    wal.wait_for_lsn(lsn).await?; // 0-10ms (avg 5ms)

    // 3. Mark transaction as committed
    txn.set_state(TxnState::Committed);

    // 4. Release locks
    txn.release_locks();

    Ok(())
}
```
Key Change: Append and wait are separate, allowing batching
6.2 Two-Phase Commit (2PC) Integration
2PC Protocol:
```
Coordinator                        Participant
     │                                  │
     ├─────── PREPARE ─────────────────>│
     │                                  │ (1) Write PREPARE to WAL
     │                                  │ (2) Wait for flush
     │<────── PREPARED ─────────────────┤
     │                                  │
     ├─────── COMMIT ──────────────────>│
     │                                  │ (3) Write COMMIT to WAL
     │                                  │ (4) Wait for flush
     │<────── ACK ──────────────────────┤
```
Group Commit Optimization:
```rust
pub async fn two_phase_commit(txn: &DistributedTransaction) -> Result<()> {
    // Phase 1: PREPARE
    let prepare_lsns: Vec<Lsn> = participants
        .iter()
        .map(|p| p.prepare(txn)) // Returns immediately
        .collect();

    // Wait for all PREPARE flushes (batched together!)
    for lsn in prepare_lsns {
        wal.wait_for_lsn(lsn).await?;
    }

    // Phase 2: COMMIT
    let commit_lsns: Vec<Lsn> = participants
        .iter()
        .map(|p| p.commit(txn)) // Returns immediately
        .collect();

    // Wait for all COMMIT flushes (batched together!)
    for lsn in commit_lsns {
        wal.wait_for_lsn(lsn).await?;
    }

    Ok(())
}
```
Benefit: Multiple participants’ WAL entries batched in single flush
6.3 Integration Points
1. Transaction Coordinator:
```rust
pub struct TxnCoordinator {
    wal: Arc<GroupCommitWal>,
    active_txns: HashMap<TxnId, Transaction>,
}

impl TxnCoordinator {
    pub async fn begin_txn(&self) -> Result<TxnId> {
        let txn_id = self.next_txn_id();

        let entry = WalEntry::begin(txn_id);
        let _lsn = self.wal.append(entry)?;

        // Don't wait for BEGIN (optimization)
        // Wait only on COMMIT for durability

        Ok(txn_id)
    }

    pub async fn commit_txn(&self, txn_id: TxnId) -> Result<()> {
        let entry = WalEntry::commit(txn_id);
        let lsn = self.wal.append(entry)?;

        // MUST wait for COMMIT (durability critical)
        self.wal.wait_for_lsn(lsn).await?;

        Ok(())
    }
}
```
2. MVCC Integration:
```rust
pub struct MvccManager {
    wal: Arc<GroupCommitWal>,
    version_store: VersionStore,
}

impl MvccManager {
    pub async fn create_version(&self, key: &[u8], value: &[u8], txn_id: TxnId) -> Result<()> {
        // Write to WAL
        let entry = WalEntry::data(txn_id, key, value);
        let lsn = self.wal.append(entry)?;

        // Create version in memory
        self.version_store.insert(key, value, txn_id, lsn);

        // Don't wait for flush (optimization)
        // Commit will wait, ensuring durability

        Ok(())
    }
}
```
3. Checkpoint Integration:
```rust
pub struct CheckpointManager {
    wal: Arc<GroupCommitWal>,
}

impl CheckpointManager {
    pub async fn create_checkpoint(&self) -> Result<Lsn> {
        // Flush all pending WAL entries
        self.wal.force_flush().await?;

        let checkpoint_lsn = self.wal.last_flushed_lsn();

        // Write checkpoint marker
        let entry = WalEntry::checkpoint(checkpoint_lsn);
        let lsn = self.wal.append(entry)?;
        self.wal.wait_for_lsn(lsn).await?;

        Ok(checkpoint_lsn)
    }
}
```
7. Testing Strategy
7.1 Unit Tests
```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_lsn_monotonicity() {
        let wal = GroupCommitWal::new("/tmp/test_wal", Default::default()).unwrap();

        let lsn1 = wal.append(test_entry(1)).unwrap();
        let lsn2 = wal.append(test_entry(2)).unwrap();
        let lsn3 = wal.append(test_entry(3)).unwrap();

        assert!(lsn1 < lsn2);
        assert!(lsn2 < lsn3);
    }

    #[test]
    fn test_batch_flushing() {
        let config = GroupCommitConfig {
            max_batch_size: 10,
            max_flush_interval_ms: 100,
            ..Default::default()
        };

        let wal = GroupCommitWal::new("/tmp/test_wal", config).unwrap();

        // Write 10 entries (should trigger size-based flush)
        for i in 0..10 {
            wal.append(test_entry(i)).unwrap();
        }

        // Wait for flush
        std::thread::sleep(Duration::from_millis(50));

        let metrics = wal.metrics();
        assert_eq!(metrics.total_flushes, 1);
        assert_eq!(metrics.total_appends, 10);
    }

    #[tokio::test]
    async fn test_wait_for_lsn() {
        let wal = GroupCommitWal::new("/tmp/test_wal", Default::default()).unwrap();

        let lsn = wal.append(test_entry(1)).unwrap();

        // Should complete within flush interval
        let result = tokio::time::timeout(
            Duration::from_millis(50),
            wal.wait_for_lsn(lsn),
        )
        .await;

        assert!(result.is_ok());
    }
}
```
7.2 Integration Tests
```rust
#[tokio::test]
async fn test_concurrent_commits() {
    let wal = Arc::new(GroupCommitWal::new("/tmp/test_wal", Default::default()).unwrap());

    let mut handles = vec![];

    // Spawn 100 concurrent writers
    for i in 0..100 {
        let wal = Arc::clone(&wal);
        let handle = tokio::spawn(async move {
            for j in 0..100 {
                let lsn = wal.append(test_entry(i * 100 + j)).unwrap();
                wal.wait_for_lsn(lsn).await.unwrap();
            }
        });
        handles.push(handle);
    }

    // Wait for all writers
    for handle in handles {
        handle.await.unwrap();
    }

    // Verify all 10,000 entries written
    let metrics = wal.metrics();
    assert_eq!(metrics.total_appends, 10_000);
}

#[test]
fn test_recovery_from_corruption() {
    let path = "/tmp/test_wal_recovery";

    // Write some entries
    {
        let mut wal = GroupCommitWal::new(path, Default::default()).unwrap();
        for i in 0..10 {
            wal.append(test_entry(i)).unwrap();
        }
        wal.shutdown().unwrap();
    }

    // Corrupt the file (truncate mid-entry)
    {
        let file = std::fs::OpenOptions::new()
            .write(true)
            .open(path)
            .unwrap();
        file.set_len(500).unwrap(); // Arbitrary mid-point
    }

    // Recover (recover_wal returns a RecoveryResult, unlike recover())
    let result = recover_wal(path).unwrap();

    // Should recover valid entries only
    assert!(result.entries.len() < 10);
    assert!(result.corrupted_at.is_some());
}
```
7.3 Chaos Tests
```rust
#[tokio::test]
async fn test_crash_during_flush() {
    let wal = Arc::new(GroupCommitWal::new("/tmp/test_wal_chaos", Default::default()).unwrap());

    // Start writing
    let wal_clone = Arc::clone(&wal);
    let _writer = tokio::spawn(async move {
        for i in 0..1000 {
            wal_clone.append(test_entry(i)).unwrap();
            tokio::time::sleep(Duration::from_micros(100)).await;
        }
    });

    // Simulate crash after random delay
    tokio::time::sleep(Duration::from_millis(rand::random::<u64>() % 100)).await;
    std::process::abort(); // Simulate hard crash

    // In separate test run:
    // - Recover from WAL
    // - Verify all flushed entries are present
    // - Verify no corrupted entries
}

#[tokio::test]
async fn test_fsync_failure_handling() {
    // Mock filesystem that fails fsync randomly
    let wal = GroupCommitWal::new("/tmp/test_wal_fsync_fail", Default::default()).unwrap();

    for i in 0..100 {
        match wal.append(test_entry(i)) {
            Ok(lsn) => match wal.wait_for_lsn(lsn).await {
                Ok(_) => {
                    // Entry is durable
                }
                Err(_e) => {
                    // Fsync failed, entry NOT durable
                    // Transaction should be aborted
                }
            },
            Err(_e) => {
                // Write failed
            }
        }
    }
}
```
7.4 Performance Tests
```rust
#[bench]
fn bench_throughput_no_wait(b: &mut Bencher) {
    let wal = GroupCommitWal::new("/tmp/bench_wal", Default::default()).unwrap();

    b.iter(|| {
        // Measure append-only throughput
        wal.append(test_entry(1)).unwrap();
    });
}

#[bench]
fn bench_throughput_with_wait(b: &mut Bencher) {
    let wal = GroupCommitWal::new("/tmp/bench_wal", Default::default()).unwrap();
    // b.iter takes a synchronous closure, so drive the async wait
    // through a runtime rather than awaiting directly
    let rt = tokio::runtime::Runtime::new().unwrap();

    b.iter(|| {
        // Measure end-to-end commit throughput
        let lsn = wal.append(test_entry(1)).unwrap();
        rt.block_on(wal.wait_for_lsn(lsn)).unwrap();
    });
}
```
```rust
// Percentile measurement needs to await, so it runs as an async test
// rather than a #[bench] (which only accepts synchronous closures).
#[tokio::test]
async fn measure_latency_percentiles() {
    let wal = GroupCommitWal::new("/tmp/bench_wal", Default::default()).unwrap();
    let mut latencies = vec![];

    for _ in 0..10_000 {
        let start = Instant::now();
        let lsn = wal.append(test_entry(1)).unwrap();
        wal.wait_for_lsn(lsn).await.unwrap();
        latencies.push(start.elapsed());
    }

    latencies.sort();
    println!("P50:   {:?}", latencies[5_000]);
    println!("P99:   {:?}", latencies[9_900]);
    println!("P99.9: {:?}", latencies[9_990]);
}
```
7.5 Test Coverage Matrix
| Category | Test Case | Success Criteria |
|---|---|---|
| Correctness | LSN monotonicity | LSNs always increase |
| Correctness | Durability guarantee | Committed data survives crash |
| Correctness | Batch atomicity | All or none in batch |
| Correctness | Recovery correctness | All flushed entries recovered |
| Performance | Throughput | >10,000 commits/sec |
| Performance | Latency P99 | <20ms |
| Performance | Fsync reduction | >90% fewer fsyncs |
| Concurrency | Concurrent writes | No data races |
| Concurrency | Lock-free append | No contention |
| Concurrency | Waiter notification | All waiters notified |
| Failure | Crash during flush | Truncate to valid boundary |
| Failure | Fsync failure | Batch rollback |
| Failure | Disk full | Graceful error |
| Integration | Transaction commit | ACID guarantees |
| Integration | 2PC protocol | Distributed correctness |
| Integration | MVCC versioning | Correct version ordering |
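The first row of the matrix, LSN monotonicity, follows directly from atomic LSN assignment. A minimal std-only sketch (the `LsnAllocator` type here is hypothetical, standing in for the counter inside `GroupCommitWal`) shows why a single `fetch_add` yields unique, dense, increasing LSNs even under heavy concurrency:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the WAL's LSN counter: one fetch_add
/// gives every caller a unique, strictly increasing LSN, lock-free.
struct LsnAllocator {
    next: AtomicU64,
}

impl LsnAllocator {
    fn new() -> Self {
        Self { next: AtomicU64::new(0) }
    }

    fn allocate(&self) -> u64 {
        self.next.fetch_add(1, Ordering::SeqCst)
    }
}

fn main() {
    let alloc = Arc::new(LsnAllocator::new());
    let mut handles = vec![];

    // 8 threads each allocate 1,000 LSNs concurrently.
    for _ in 0..8 {
        let alloc = Arc::clone(&alloc);
        handles.push(thread::spawn(move || {
            (0..1_000).map(|_| alloc.allocate()).collect::<Vec<u64>>()
        }));
    }

    let mut all: Vec<u64> = handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();

    // Every LSN is unique and dense: sorting yields exactly 0..8000.
    all.sort();
    assert_eq!(all, (0..8_000u64).collect::<Vec<u64>>());
}
```

Because each thread's LSNs are handed out in call order, LSN order also matches submission order, which is the monotonicity invariant the test asserts.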
8. Implementation Plan
Phase 1: Basic Group Commit (2 days)
Day 1 Morning:
- Implement core data structures (Lsn, WalEntry, GroupCommitWal)
- Implement atomic LSN assignment
- Implement lock-free pending queue
Day 1 Afternoon:
- Implement flush thread logic
- Implement time-based + size-based batching
- Implement WAL entry serialization/deserialization
Day 2 Morning:
- Implement waiter notification mechanism
- Implement wait_for_lsn() API
- Implement durability modes
Day 2 Afternoon:
- Unit tests for batching logic
- Unit tests for LSN assignment
- Initial benchmarking
Phase 2: Recovery Protocol (1 day)
Day 3 Morning:
- Implement WAL recovery algorithm
- Implement checksum validation
- Implement file truncation on corruption
Day 3 Afternoon:
- Recovery integration tests
- Corruption handling tests
Phase 3: Integration Testing (1 day)
Day 4 Morning:
- Integration with transaction manager
- Integration with MVCC
- End-to-end transaction tests
Day 4 Afternoon:
- Concurrent commit tests
- 2PC integration tests
- Chaos testing
Phase 4: Tuning (1 day)
Day 5 Morning:
- Benchmark different flush intervals
- Benchmark different batch sizes
- Create tuning matrix
Day 5 Afternoon:
- Document optimal parameters
- Create configuration guide
- Performance regression tests
Total Timeline: 5 days
9. Configuration Tuning Guide
9.1 Workload-Based Tuning
OLTP (Low Latency):
```toml
[wal.group_commit]
max_flush_interval_ms = 5
max_batch_size = 50
durability_mode = "group_commit"
adaptive_tuning = true
```
Analytics (High Throughput):
```toml
[wal.group_commit]
max_flush_interval_ms = 20
max_batch_size = 500
durability_mode = "group_commit"
adaptive_tuning = false
```
Mixed Workload:
```toml
[wal.group_commit]
max_flush_interval_ms = 10
max_batch_size = 100
durability_mode = "group_commit"
adaptive_tuning = true
```
Batch Processing:
```toml
[wal.group_commit]
max_flush_interval_ms = 50
max_batch_size = 1000
durability_mode = "async" # Non-critical data
adaptive_tuning = false
```
9.2 Hardware-Based Tuning
HDD (High fsync latency):
```toml
[wal.group_commit]
max_flush_interval_ms = 20 # Maximize batching
max_batch_size = 500
```
SATA SSD (Medium latency):
```toml
[wal.group_commit]
max_flush_interval_ms = 10
max_batch_size = 100
```
NVMe SSD (Low latency):
```toml
[wal.group_commit]
max_flush_interval_ms = 5 # Lower latency priority
max_batch_size = 50
```
9.3 Monitoring and Alerts
Key Metrics:
```rust
pub struct WalMetrics {
    // Throughput
    pub commits_per_sec: f64,
    pub bytes_per_sec: f64,

    // Latency
    pub commit_latency_p50: Duration,
    pub commit_latency_p99: Duration,
    pub commit_latency_p999: Duration,

    // Batching efficiency
    pub avg_batch_size: f64,
    pub flush_per_sec: f64,
    pub fsync_reduction_pct: f64,

    // Queue health
    pub pending_queue_depth: usize,
    pub max_queue_depth: usize,
}
```
Alerts:
- High latency: P99 > 50ms → Increase flush frequency or check disk
- Small batches: Avg batch size < 10 → Increase flush interval
- Queue buildup: Pending queue > 1000 → Disk bottleneck, scale storage
- Low throughput: Commits/sec < 1000 → Check batching configuration
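The derived fields `avg_batch_size` and `fsync_reduction_pct` fall out of two raw counters. A sketch of how they might be computed, with one of the alert thresholds from the list above wired in (the `FlushStats` helper is hypothetical, not part of the `WalMetrics` API):

```rust
/// Hypothetical helper: derive batching-efficiency metrics from raw
/// append/flush counters, and flag the "small batches" alert.
struct FlushStats {
    total_appends: u64,
    total_flushes: u64,
}

impl FlushStats {
    fn avg_batch_size(&self) -> f64 {
        self.total_appends as f64 / self.total_flushes as f64
    }

    /// vs one-fsync-per-commit: 10,000 commits over 500 fsyncs
    /// means 95% fewer fsync calls.
    fn fsync_reduction_pct(&self) -> f64 {
        100.0 * (1.0 - self.total_flushes as f64 / self.total_appends as f64)
    }

    /// The other thresholds (latency, queue depth) would be checked
    /// the same way against their WalMetrics fields.
    fn alerts(&self) -> Vec<&'static str> {
        let mut out = vec![];
        if self.avg_batch_size() < 10.0 {
            out.push("small batches: increase flush interval");
        }
        out
    }
}

fn main() {
    let stats = FlushStats { total_appends: 10_000, total_flushes: 500 };
    assert_eq!(stats.avg_batch_size(), 20.0);
    assert_eq!(stats.fsync_reduction_pct(), 95.0);
    assert!(stats.alerts().is_empty());
}
```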
10. Risk Analysis and Mitigation
10.1 Risks
| Risk | Likelihood | Impact | Mitigation |
|---|---|---|---|
| Data loss on crash | Low | Critical | Checksum validation, recovery protocol |
| Latency spike | Medium | High | Adaptive tuning, configurable intervals |
| Queue buildup | Medium | Medium | Back-pressure, monitoring |
| Flush thread failure | Low | Critical | Watchdog, automatic restart |
| Disk full | Medium | Critical | Graceful degradation, alerts |
10.2 Failure Modes
1. Flush Thread Crash:
- Detection: Watchdog monitors thread health
- Recovery: Restart flush thread, replay pending queue
- Prevention: Robust error handling, panic guards
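The watchdog-plus-restart pattern can be sketched with `std::panic::catch_unwind`: the supervisor catches a panic in the flush loop and restarts it instead of silently losing the flusher. The supervisor structure here is illustrative only, not the actual flush thread:

```rust
use std::panic;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    panic::set_hook(Box::new(|_| {})); // keep the simulated panic quiet
    let restarts = Arc::new(AtomicU64::new(0));
    let restarts2 = Arc::clone(&restarts);

    let supervisor = thread::spawn(move || loop {
        // Panic guard around the (stand-in) flush loop body.
        let attempt = panic::catch_unwind(|| {
            if restarts2.load(Ordering::SeqCst) == 0 {
                panic!("simulated flush thread crash");
            }
            // Second run completes normally.
        });
        match attempt {
            Ok(()) => break, // flush loop exited cleanly
            Err(_) => {
                // Watchdog path: count the crash and restart the loop.
                // (A real WAL would also replay the pending queue here.)
                restarts2.fetch_add(1, Ordering::SeqCst);
            }
        }
    });

    supervisor.join().unwrap();
    assert_eq!(restarts.load(Ordering::SeqCst), 1);
}
```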
2. Disk Corruption:
- Detection: Checksum validation on recovery
- Recovery: Truncate to last valid entry
- Prevention: Hardware RAID, checksums
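Truncate-to-last-valid-entry recovery reduces to scanning a checksum-framed byte stream and stopping at the first torn or corrupt entry. A self-contained sketch, with an assumed entry layout of `[len][payload][checksum]` and a toy byte-sum checksum standing in for a real CRC32:

```rust
/// Scan a WAL byte buffer and return the length of the valid prefix.
/// Assumed layout per entry: [len: u32 LE][payload][checksum: u32 LE].
fn scan_valid_prefix(buf: &[u8]) -> usize {
    let mut pos = 0;
    loop {
        // Need a full length header.
        if pos + 4 > buf.len() {
            return pos;
        }
        let len = u32::from_le_bytes(buf[pos..pos + 4].try_into().unwrap()) as usize;
        let end = pos + 4 + len + 4;
        // Truncated (torn) entry: stop at the last valid boundary.
        if end > buf.len() {
            return pos;
        }
        let payload = &buf[pos + 4..pos + 4 + len];
        let stored = u32::from_le_bytes(buf[pos + 4 + len..end].try_into().unwrap());
        let computed: u32 = payload.iter().map(|&b| b as u32).sum();
        // Corrupt entry: everything after it is suspect, stop here.
        if stored != computed {
            return pos;
        }
        pos = end;
    }
}

fn encode(payload: &[u8]) -> Vec<u8> {
    let mut out = (payload.len() as u32).to_le_bytes().to_vec();
    out.extend_from_slice(payload);
    let sum: u32 = payload.iter().map(|&b| b as u32).sum();
    out.extend_from_slice(&sum.to_le_bytes());
    out
}

fn main() {
    let mut wal = vec![];
    wal.extend(encode(b"entry-1"));
    wal.extend(encode(b"entry-2"));
    let valid = wal.len();
    wal.extend(encode(b"entry-3"));
    wal.truncate(wal.len() - 2); // simulate a torn final write

    // Recovery keeps the two complete entries and drops the torn one.
    assert_eq!(scan_valid_prefix(&wal), valid);
}
```

Recovery then truncates the file to the returned offset, restoring the flush-atomicity invariant on restart.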
3. Queue Overflow:
- Detection: Monitor queue depth
- Recovery: Apply back-pressure to writers
- Prevention: Bounded queue, flow control
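The bounded-queue back-pressure described above can be illustrated with a `sync_channel`, whose blocking `send` throttles writers automatically once the queue is full (capacity and timings here are arbitrary):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn main() {
    // Bounded channel as the pending queue (capacity 100). When the
    // flush thread falls behind, send() blocks, pushing back on
    // writers instead of letting the queue grow without bound.
    let (tx, rx) = sync_channel::<u64>(100);

    let flusher = thread::spawn(move || {
        let mut flushed = 0u64;
        while let Ok(_entry) = rx.recv() {
            // Simulate a slow, fsync-bound flush.
            thread::sleep(Duration::from_micros(10));
            flushed += 1;
        }
        flushed
    });

    // Writer: blocks whenever the queue is full (back-pressure),
    // so memory use stays bounded no matter how fast it produces.
    for lsn in 0..1_000u64 {
        tx.send(lsn).unwrap();
    }
    drop(tx); // close the queue so the flusher exits

    let flushed = flusher.join().unwrap();
    assert_eq!(flushed, 1_000);
}
```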
10.3 Correctness Invariants
Invariant 1: LSN Monotonicity
∀ entry_i, entry_j: LSN(entry_i) < LSN(entry_j) ⇒ entry_i submitted before entry_j
Invariant 2: Flush Atomicity
∀ batch: (all entries flushed) ∨ (no entries flushed)
Invariant 3: Durability Guarantee
∀ lsn: last_flushed_lsn ≥ lsn ⇒ entry(lsn) is durable
Invariant 4: Recovery Correctness
∀ entry ∈ recovered_entries: checksum(entry) is valid
11. Future Enhancements
11.1 Adaptive Flushing (Phase 2)
Dynamically adjust flush interval based on workload:
- Monitor batch sizes
- Adjust interval to maintain target batch size
- React to latency SLA violations
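One possible shape for the controller is a pure function that nudges the interval toward a target batch size. The band thresholds and doubling/halving policy below are illustrative, not specified behavior:

```rust
/// Hypothetical adaptive controller: adjust the flush interval so the
/// observed batch size tracks a target, clamped to a sane range.
fn adapt_interval_ms(current_ms: u64, observed_batch: f64, target_batch: f64) -> u64 {
    const MIN_MS: u64 = 1;  // latency floor
    const MAX_MS: u64 = 50; // latency ceiling (SLA guard)
    let next = if observed_batch < target_batch * 0.5 {
        current_ms.saturating_mul(2) // batches too small: wait longer
    } else if observed_batch > target_batch * 2.0 {
        current_ms / 2 // batches too big: flush sooner, cut latency
    } else {
        current_ms // within band: leave it alone
    };
    next.clamp(MIN_MS, MAX_MS)
}

fn main() {
    // Tiny batches -> interval doubles (more batching).
    assert_eq!(adapt_interval_ms(10, 3.0, 100.0), 20);
    // Oversized batches -> interval halves (less latency).
    assert_eq!(adapt_interval_ms(10, 250.0, 100.0), 5);
    // In band -> unchanged.
    assert_eq!(adapt_interval_ms(10, 100.0, 100.0), 10);
    // Never exceeds the clamp, even when batches stay tiny.
    assert_eq!(adapt_interval_ms(40, 1.0, 100.0), 50);
}
```

The clamp is what enforces the latency SLA: however small the batches get, the interval never drifts past the configured maximum.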
11.2 Parallel Flush Threads (Phase 3)
For very high write rates (>100K/sec):
- Multiple flush threads
- Partition entries by hash
- Independent flush queues
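Partitioning by hash can be as simple as reducing a key hash modulo the flush-thread count. A sketch (note the trade-off: LSN ordering is then guaranteed only within a partition, not globally):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical partitioner: route each entry key to one of N
/// independent flush queues by hash, so parallel flush threads
/// never contend on the same queue.
fn partition_for<K: Hash>(key: &K, num_partitions: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % num_partitions
}

fn main() {
    let partitions = 4;
    // The same key always lands in the same partition (so per-key
    // ordering is preserved), and every index is in range.
    for key in 0..1_000u64 {
        let p1 = partition_for(&key, partitions);
        let p2 = partition_for(&key, partitions);
        assert_eq!(p1, p2);
        assert!(p1 < partitions);
    }
}
```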
11.3 Remote WAL (Phase 4)
Replicate WAL to remote storage:
- S3/Object storage backup
- Cross-region replication
- Disaster recovery
11.4 WAL Compression (Phase 5)
Compress WAL entries before write:
- LZ4 compression for speed
- Reduce I/O bandwidth
- Trade CPU for I/O
12. Conclusion
This Group Commit WAL design provides:
Performance:
- 5-10x throughput improvement
- 90-95% fsync reduction
- Predictable latency (P99 < 20ms)
Correctness:
- Full ACID guarantees
- Atomic batch commits
- Robust recovery protocol
Flexibility:
- Multiple durability modes
- Tunable for different workloads
- Easy integration with existing transaction manager
Production-Ready:
- Comprehensive testing strategy
- Clear monitoring and alerting
- Risk mitigation plans
Next Steps:
- Review and approve architecture
- Begin Phase 1 implementation
- Benchmark on target hardware
- Integrate with transaction manager
- Deploy to staging environment
Appendix A: References
- PostgreSQL Group Commit: https://www.postgresql.org/docs/current/wal-async-commit.html
- MySQL Group Commit: https://dev.mysql.com/doc/refman/8.0/en/innodb-group-commit.html
- “The Design and Implementation of Modern Column-Oriented Database Systems” (Abadi et al.)
- “Transactional Information Systems” (Weikum & Vossen) - Chapter on WAL
Appendix B: Glossary
- LSN: Log Sequence Number - monotonically increasing identifier for WAL entries
- Fsync: File system synchronization - forces buffered writes to persistent storage
- Group Commit: Batching multiple transaction commits into a single fsync
- Durability: ACID property - committed data survives system failures
- Two-Phase Commit (2PC): Distributed transaction protocol
- MVCC: Multi-Version Concurrency Control
- WAL: Write-Ahead Log - transaction log written before data modifications
Document Status: Ready for Implementation Approval Required: Architecture Team, Performance Team Implementation Start: Upon approval