Group Commit WAL Architecture Specification

Document Version: 1.0
Date: 2025-11-10
Status: Design Specification
Priority: P0 - Critical Performance Optimization

Executive Summary

This document specifies the architecture for Group Commit Write-Ahead Logging (WAL), designed to reduce fsync overhead by batching multiple transaction commits into a single disk flush. This optimization addresses the #2 performance bottleneck in HeliosDB.

Key Metrics:

  • Expected Throughput Gain: 5-10x improvement
  • Expected Commit Rate: 10,000+ commits/sec (vs current 1,000)
  • Fsync Reduction: 90-95% fewer fsync calls
  • Latency Impact: +5-10ms max (configurable)
  • Durability: Full ACID guarantees maintained

1. Architecture Decisions

1.1 Batching Strategy: Hybrid Approach

Decision: Implement hybrid time-based + size-based batching

Rationale:

  • Time-based (10ms default): Ensures bounded commit latency
  • Size-based (100 entries default): Prevents memory bloat, ensures I/O efficiency
  • Hybrid: Flush when EITHER condition is met

Configuration:

#[derive(Debug, Clone)]
pub struct GroupCommitConfig {
    /// Maximum time to wait before flushing (milliseconds)
    pub max_flush_interval_ms: u64,
    /// Maximum entries in a batch before forcing flush
    pub max_batch_size: usize,
    /// Minimum batch size (optimization: skip flush if below threshold)
    pub min_batch_size: usize,
    /// Durability mode
    pub durability_mode: DurabilityMode,
    /// Enable adaptive tuning
    pub adaptive_tuning: bool,
}

impl Default for GroupCommitConfig {
    fn default() -> Self {
        Self {
            max_flush_interval_ms: 10,
            max_batch_size: 100,
            min_batch_size: 1,
            durability_mode: DurabilityMode::GroupCommit,
            adaptive_tuning: false,
        }
    }
}

1.2 Durability Guarantees: Two-Phase Commitment

Decision: Two-phase protocol with notification waiters

Protocol:

  1. Phase 1 - Log: Write to WAL buffer, assign LSN, return immediately
  2. Phase 2 - Flush: Batch fsync, then notify all waiters

Guarantee: Transaction is durable when waiter is notified, NOT when append() returns

Rationale:

  • Maintains ACID durability guarantees
  • Enables batching for throughput
  • Clients choose to wait or not based on durability requirements

1.3 Thread Model: Dedicated Flush Thread

Decision: Single dedicated flush thread with lock-free queue

Architecture:

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│    Client    │────>│ Append Queue │────>│ Flush Thread │
│   Threads    │     │ (Lock-free)  │     │              │
└──────┬───────┘     └──────────────┘     └──────┬───────┘
       │                                         │
       │                                         ▼
       │                                  ┌──────────────┐
       │                                  │   WAL File   │
       │                                  │   (fsync)    │
       │                                  └──────┬───────┘
       │                                         │
       │                                         ▼
       │        (wait for notification)   ┌──────────────┐
       └─────────────────────────────────>│ Waiters Map  │
                                          │   (Notify)   │
                                          └──────────────┘

Rationale:

  • Eliminates lock contention on hot path (append)
  • Dedicated thread optimizes for I/O batching
  • Lock-free queue provides predictable latency
  • Simple failure recovery (single point of failure isolation)

Alternative Considered: Work-stealing thread pool

  • Rejected: Adds complexity, coordination overhead
  • Use Case: Only beneficial for very high write rates (>100K/sec)

1.4 Failure Handling: Batch Abort + Recovery

Decision: Atomic batch commit with rollback on failure

Protocol:

  1. Write all entries to WAL file (buffered)
  2. Single fsync for entire batch
  3. If fsync fails:
    • Mark all entries in batch as failed
    • Notify waiters with error
    • Trigger recovery protocol
  4. If fsync succeeds:
    • Update commit LSN
    • Notify all waiters with success

Rationale:

  • Atomic batch semantics simplify reasoning
  • Clear failure boundaries
  • No partial commits visible to clients

2. Interface Design

2.1 Core Data Structures

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::sync::{Mutex, Notify, RwLock};
use crossbeam::queue::SegQueue;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::fs::File;
use std::io::{self, Write, BufWriter};
use std::time::{Duration, Instant};

/// Logical Sequence Number - monotonically increasing
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Lsn(u64);

impl Lsn {
    pub fn new(value: u64) -> Self {
        Self(value)
    }

    pub fn value(&self) -> u64 {
        self.0
    }
}

/// Write-Ahead Log Entry
#[derive(Debug, Clone)]
pub struct WalEntry {
    /// Transaction ID
    pub txn_id: u64,
    /// Entry type (Begin, Commit, Abort, Data)
    pub entry_type: WalEntryType,
    /// Serialized data
    pub data: Vec<u8>,
    /// Timestamp
    pub timestamp: u64,
    /// Checksum (CRC32)
    pub checksum: u32,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WalEntryType {
    Begin,
    Data,
    Commit,
    Abort,
    Checkpoint,
}

/// Durability modes
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
    /// Synchronous: Wait for fsync before returning
    Synchronous,
    /// Group Commit: Batch fsync (default)
    GroupCommit,
    /// Async: Fire-and-forget (no durability guarantee)
    Async,
}

/// Pending entry waiting for flush
struct PendingEntry {
    lsn: Lsn,
    entry: WalEntry,
    notify: Arc<Notify>,
    result: Arc<RwLock<Option<io::Result<()>>>>,
}

/// Group Commit WAL
pub struct GroupCommitWal {
    /// Configuration
    config: GroupCommitConfig,
    /// WAL file path
    path: PathBuf,
    /// WAL file handle (managed by flush thread)
    file: Arc<Mutex<BufWriter<File>>>,
    /// Lock-free pending queue
    pending_queue: Arc<SegQueue<PendingEntry>>,
    /// LSN counter (atomic)
    lsn_counter: Arc<AtomicU64>,
    /// Last flushed LSN
    last_flushed_lsn: Arc<AtomicU64>,
    /// Flush thread handle
    flush_thread: Option<std::thread::JoinHandle<()>>,
    /// Shutdown signal
    shutdown: Arc<AtomicBool>,
    /// Metrics
    metrics: Arc<WalMetrics>,
}

/// WAL Metrics
#[derive(Debug, Default)]
pub struct WalMetrics {
    /// Total appends
    pub total_appends: AtomicU64,
    /// Total flushes
    pub total_flushes: AtomicU64,
    /// Total bytes written
    pub total_bytes_written: AtomicU64,
    /// Batch size histogram
    pub batch_sizes: RwLock<Vec<usize>>,
    /// Flush latency histogram (microseconds)
    pub flush_latencies: RwLock<Vec<u64>>,
    /// Commit latency histogram (microseconds)
    pub commit_latencies: RwLock<Vec<u64>>,
}

2.2 Public API

impl GroupCommitWal {
    /// Create new Group Commit WAL
    pub fn new(
        path: impl AsRef<Path>,
        config: GroupCommitConfig,
    ) -> io::Result<Self> {
        let path = path.as_ref().to_path_buf();
        // Open WAL file in append mode
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)?;
        let file = Arc::new(Mutex::new(BufWriter::with_capacity(
            1024 * 1024, // 1MB buffer
            file,
        )));
        let pending_queue = Arc::new(SegQueue::new());
        let lsn_counter = Arc::new(AtomicU64::new(1));
        let last_flushed_lsn = Arc::new(AtomicU64::new(0));
        let shutdown = Arc::new(AtomicBool::new(false));
        let metrics = Arc::new(WalMetrics::default());
        // Spawn flush thread
        let flush_thread = Self::spawn_flush_thread(
            config.clone(),
            Arc::clone(&file),
            Arc::clone(&pending_queue),
            Arc::clone(&last_flushed_lsn),
            Arc::clone(&shutdown),
            Arc::clone(&metrics),
        );
        Ok(Self {
            config,
            path,
            file,
            pending_queue,
            lsn_counter,
            last_flushed_lsn,
            flush_thread: Some(flush_thread),
            shutdown,
            metrics,
        })
    }

    /// Append single entry to WAL
    ///
    /// Returns LSN immediately. For durability guarantee, call wait_for_lsn(lsn).
    pub fn append(&self, entry: WalEntry) -> io::Result<Lsn> {
        // Assign LSN atomically
        let lsn = Lsn::new(self.lsn_counter.fetch_add(1, Ordering::SeqCst));
        // Create notification channel
        let notify = Arc::new(Notify::new());
        let result = Arc::new(RwLock::new(None));
        // Enqueue for flush
        let pending = PendingEntry {
            lsn,
            entry,
            notify,
            result,
        };
        self.pending_queue.push(pending);
        // Update metrics
        self.metrics.total_appends.fetch_add(1, Ordering::Relaxed);
        Ok(lsn)
    }

    /// Append batch of entries
    pub fn append_batch(&self, entries: Vec<WalEntry>) -> io::Result<Vec<Lsn>> {
        let mut lsns = Vec::with_capacity(entries.len());
        for entry in entries {
            let lsn = self.append(entry)?;
            lsns.push(lsn);
        }
        Ok(lsns)
    }

    /// Wait for LSN to be flushed (durability guarantee)
    pub async fn wait_for_lsn(&self, lsn: Lsn) -> io::Result<()> {
        // Check if already flushed
        if self.last_flushed_lsn.load(Ordering::Acquire) >= lsn.value() {
            return Ok(());
        }
        // Wait for flush notification
        // Note: This is simplified - real implementation needs waiter registry
        loop {
            tokio::time::sleep(Duration::from_micros(100)).await;
            if self.last_flushed_lsn.load(Ordering::Acquire) >= lsn.value() {
                return Ok(());
            }
        }
    }

    /// Force immediate flush (bypasses batching)
    pub async fn force_flush(&self) -> io::Result<()> {
        // Signal flush thread to flush immediately
        // Implementation: Use channel or atomic flag
        todo!("Implement immediate flush signal")
    }

    /// Get last flushed LSN
    pub fn last_flushed_lsn(&self) -> Lsn {
        Lsn::new(self.last_flushed_lsn.load(Ordering::Acquire))
    }

    /// Recover WAL from file
    pub fn recover(path: impl AsRef<Path>) -> io::Result<Vec<WalEntry>> {
        let mut entries = Vec::new();
        let file = std::fs::File::open(path)?;
        let mut reader = std::io::BufReader::new(file);
        // Read entries until EOF
        loop {
            match Self::read_entry(&mut reader) {
                Ok(entry) => entries.push(entry),
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e),
            }
        }
        Ok(entries)
    }

    /// Shutdown WAL gracefully
    pub fn shutdown(&mut self) -> io::Result<()> {
        // Signal shutdown
        self.shutdown.store(true, Ordering::Release);
        // Wait for flush thread
        if let Some(handle) = self.flush_thread.take() {
            handle.join().expect("Flush thread panicked");
        }
        // Final flush
        let mut file = self.file.blocking_lock();
        file.flush()?;
        file.get_mut().sync_all()?;
        Ok(())
    }

    /// Get metrics snapshot
    pub fn metrics(&self) -> WalMetricsSnapshot {
        WalMetricsSnapshot {
            total_appends: self.metrics.total_appends.load(Ordering::Relaxed),
            total_flushes: self.metrics.total_flushes.load(Ordering::Relaxed),
            total_bytes_written: self.metrics.total_bytes_written.load(Ordering::Relaxed),
        }
    }
}

2.3 Flush Thread Implementation

impl GroupCommitWal {
    fn spawn_flush_thread(
        config: GroupCommitConfig,
        file: Arc<Mutex<BufWriter<File>>>,
        pending_queue: Arc<SegQueue<PendingEntry>>,
        last_flushed_lsn: Arc<AtomicU64>,
        shutdown: Arc<AtomicBool>,
        metrics: Arc<WalMetrics>,
    ) -> std::thread::JoinHandle<()> {
        std::thread::spawn(move || {
            let flush_interval = Duration::from_millis(config.max_flush_interval_ms);
            let max_batch_size = config.max_batch_size;
            let mut last_flush = Instant::now();
            let mut batch = Vec::with_capacity(max_batch_size);
            loop {
                // Check shutdown
                if shutdown.load(Ordering::Acquire) {
                    // Final flush
                    if !batch.is_empty() {
                        Self::flush_batch(&file, &batch, &last_flushed_lsn, &metrics);
                    }
                    break;
                }
                // Collect batch
                while batch.len() < max_batch_size {
                    match pending_queue.pop() {
                        Some(entry) => batch.push(entry),
                        None => break,
                    }
                }
                // Flush conditions
                let should_flush = !batch.is_empty() && (
                    batch.len() >= max_batch_size ||
                    last_flush.elapsed() >= flush_interval
                );
                if should_flush {
                    Self::flush_batch(&file, &batch, &last_flushed_lsn, &metrics);
                    batch.clear();
                    last_flush = Instant::now();
                } else {
                    // Sleep briefly to avoid busy-wait
                    std::thread::sleep(Duration::from_micros(100));
                }
            }
        })
    }

    fn flush_batch(
        file: &Arc<Mutex<BufWriter<File>>>,
        batch: &[PendingEntry],
        last_flushed_lsn: &Arc<AtomicU64>,
        metrics: &Arc<WalMetrics>,
    ) {
        let start = Instant::now();
        let mut file = file.blocking_lock();
        // Write all entries
        let mut max_lsn = 0u64;
        let mut total_bytes = 0usize;
        for pending in batch {
            match Self::write_entry(&mut *file, pending.lsn, &pending.entry) {
                Ok(bytes) => {
                    total_bytes += bytes;
                    max_lsn = max_lsn.max(pending.lsn.value());
                }
                Err(e) => {
                    eprintln!("WAL write error: {}", e);
                    // Mark as failed
                    continue;
                }
            }
        }
        // Single fsync for entire batch
        match file.flush().and_then(|_| file.get_mut().sync_all()) {
            Ok(_) => {
                // Update last flushed LSN
                last_flushed_lsn.store(max_lsn, Ordering::Release);
                // Notify all waiters (simplified - need waiter registry)
                // for pending in batch {
                //     pending.notify.notify_waiters();
                // }
                // Update metrics
                metrics.total_flushes.fetch_add(1, Ordering::Relaxed);
                metrics.total_bytes_written.fetch_add(total_bytes as u64, Ordering::Relaxed);
                let latency = start.elapsed().as_micros() as u64;
                metrics.flush_latencies.blocking_write().push(latency);
                metrics.batch_sizes.blocking_write().push(batch.len());
            }
            Err(e) => {
                eprintln!("WAL fsync error: {}", e);
                // All entries in batch failed
                // Notify waiters with error
            }
        }
    }

    fn write_entry(
        file: &mut BufWriter<File>,
        lsn: Lsn,
        entry: &WalEntry,
    ) -> io::Result<usize> {
        // Format: | LSN (8) | Type (1) | TxnID (8) | Timestamp (8) | DataLen (4) | Data (N) | Checksum (4) |
        let mut bytes_written = 0;
        // LSN
        file.write_all(&lsn.value().to_le_bytes())?;
        bytes_written += 8;
        // Entry type
        file.write_all(&[entry.entry_type as u8])?;
        bytes_written += 1;
        // Transaction ID
        file.write_all(&entry.txn_id.to_le_bytes())?;
        bytes_written += 8;
        // Timestamp
        file.write_all(&entry.timestamp.to_le_bytes())?;
        bytes_written += 8;
        // Data length
        file.write_all(&(entry.data.len() as u32).to_le_bytes())?;
        bytes_written += 4;
        // Data
        file.write_all(&entry.data)?;
        bytes_written += entry.data.len();
        // Checksum
        file.write_all(&entry.checksum.to_le_bytes())?;
        bytes_written += 4;
        Ok(bytes_written)
    }

    fn read_entry(reader: &mut impl std::io::Read) -> io::Result<WalEntry> {
        use std::io::Read;
        // Read LSN
        let mut lsn_buf = [0u8; 8];
        reader.read_exact(&mut lsn_buf)?;
        let _lsn = u64::from_le_bytes(lsn_buf);
        // Read entry type
        let mut type_buf = [0u8; 1];
        reader.read_exact(&mut type_buf)?;
        let entry_type = match type_buf[0] {
            0 => WalEntryType::Begin,
            1 => WalEntryType::Data,
            2 => WalEntryType::Commit,
            3 => WalEntryType::Abort,
            4 => WalEntryType::Checkpoint,
            _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid entry type")),
        };
        // Read transaction ID
        let mut txn_buf = [0u8; 8];
        reader.read_exact(&mut txn_buf)?;
        let txn_id = u64::from_le_bytes(txn_buf);
        // Read timestamp
        let mut ts_buf = [0u8; 8];
        reader.read_exact(&mut ts_buf)?;
        let timestamp = u64::from_le_bytes(ts_buf);
        // Read data length
        let mut len_buf = [0u8; 4];
        reader.read_exact(&mut len_buf)?;
        let data_len = u32::from_le_bytes(len_buf) as usize;
        // Read data
        let mut data = vec![0u8; data_len];
        reader.read_exact(&mut data)?;
        // Read checksum
        let mut checksum_buf = [0u8; 4];
        reader.read_exact(&mut checksum_buf)?;
        let checksum = u32::from_le_bytes(checksum_buf);
        Ok(WalEntry {
            txn_id,
            entry_type,
            data,
            timestamp,
            checksum,
        })
    }
}

#[derive(Debug, Clone)]
pub struct WalMetricsSnapshot {
    pub total_appends: u64,
    pub total_flushes: u64,
    pub total_bytes_written: u64,
}
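
The flush path above leaves waiter notification as a stub ("need waiter registry"). One possible shape for that registry is sketched below, assuming tokio's Notify as already used in §2.1; the WaiterRegistry type and its register/notify_up_to methods are hypothetical names, not part of the current API.

use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::Notify;

/// Hypothetical waiter registry: maps each pending LSN to a Notify handle.
/// Appenders register before pushing to the queue; the flush thread calls
/// notify_up_to(max_lsn) after a successful fsync.
#[derive(Default)]
pub struct WaiterRegistry {
    waiters: std::sync::Mutex<BTreeMap<u64, Arc<Notify>>>,
}

impl WaiterRegistry {
    /// Called from append(): returns the Notify the client will await on.
    pub fn register(&self, lsn: u64) -> Arc<Notify> {
        let notify = Arc::new(Notify::new());
        self.waiters.lock().unwrap().insert(lsn, Arc::clone(&notify));
        notify
    }

    /// Called from the flush thread after fsync succeeds:
    /// wakes every waiter whose LSN is <= flushed_lsn.
    pub fn notify_up_to(&self, flushed_lsn: u64) {
        let mut waiters = self.waiters.lock().unwrap();
        // Everything strictly above flushed_lsn stays registered.
        let remaining = waiters.split_off(&(flushed_lsn + 1));
        // Wake every waiter at or below flushed_lsn, then drop their entries.
        for (_lsn, notify) in std::mem::replace(&mut *waiters, remaining) {
            notify.notify_waiters();
        }
    }
}

Because notify_waiters() only wakes tasks that are already waiting, wait_for_lsn() should still re-check last_flushed_lsn after registering to avoid a lost wakeup.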

3. Key Design Challenges - Solutions

3.1 Challenge 1: Durability Guarantee

Recommendation: Two-phase protocol with explicit wait

Solution:

// Client code for durable commit
let lsn = wal.append(entry)?; // Phase 1: Log (fast, async)
wal.wait_for_lsn(lsn).await?; // Phase 2: Wait for flush (durability)

Rationale:

  • Separation of concerns: Logging vs durability
  • Flexibility: Clients choose their durability requirements
  • Performance: Non-critical writes can skip wait
  • Correctness: Explicit contract, no hidden behavior

Durability Modes:

  1. Synchronous Mode: wait_for_lsn() called automatically
  2. Group Commit Mode: wait_for_lsn() called by client
  3. Async Mode: no wait (fire-and-forget)

3.2 Challenge 2: Failure Recovery

Recovery Protocol State Machine:

┌─────────────┐
│    START    │
└──────┬──────┘
       ▼
┌─────────────┐
│  Open WAL   │
│    File     │
└──────┬──────┘
       ▼
┌─────────────┐
│ Read Entry  │<──────────┐
└──────┬──────┘           │
       │                  │
       ├──EOF──────────────────> Done
       │                  │
       ▼                  │
┌─────────────┐           │
│  Validate   │           │
│  Checksum   │           │
└──────┬──────┘           │
       │                  │
       ├──FAIL─────────────────> Truncate & Stop
       │                  │
       ▼                  │
┌─────────────┐           │
│  Apply to   │           │
│    State    │           │
└──────┬──────┘           │
       │                  │
       └──────────────────┘

Recovery Algorithm:

use std::io::Seek;

pub fn recover_wal(path: impl AsRef<Path>) -> io::Result<RecoveryResult> {
    let mut entries = Vec::new();
    let mut last_valid_offset = 0u64;
    let mut corrupted_at = None;
    let file = std::fs::File::open(&path)?;
    let mut reader = std::io::BufReader::new(file);
    loop {
        let offset = reader.stream_position()?;
        match GroupCommitWal::read_entry(&mut reader) {
            Ok(entry) => {
                // Validate checksum (see the CRC32 helper sketch below)
                let computed = crc32(&entry.data);
                if computed != entry.checksum {
                    corrupted_at = Some(offset);
                    break;
                }
                entries.push(entry);
                last_valid_offset = reader.stream_position()?;
            }
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
                // Normal EOF
                break;
            }
            Err(_) => {
                // Corruption detected
                corrupted_at = Some(offset);
                break;
            }
        }
    }
    // Truncate WAL if corruption detected
    if corrupted_at.is_some() {
        let file = std::fs::OpenOptions::new()
            .write(true)
            .open(&path)?;
        file.set_len(last_valid_offset)?;
    }
    Ok(RecoveryResult {
        entries,
        last_valid_offset,
        corrupted_at,
    })
}

pub struct RecoveryResult {
    pub entries: Vec<WalEntry>,
    pub last_valid_offset: u64,
    pub corrupted_at: Option<u64>,
}
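
recover_wal() above validates entries with a crc32 helper that this spec does not define. A minimal sketch is shown below, assuming the crc32fast crate (an assumption, not a required dependency); make_entry is a hypothetical convenience constructor showing where the checksum would be stamped on append.

// Assumed dependency (Cargo.toml): crc32fast = "1"

/// Compute the CRC32 checksum over an entry's payload.
/// Used when building a WalEntry and again in recover_wal() for validation.
fn crc32(data: &[u8]) -> u32 {
    crc32fast::hash(data)
}

/// Example: stamping an entry's checksum before it is appended.
fn make_entry(txn_id: u64, entry_type: WalEntryType, data: Vec<u8>, timestamp: u64) -> WalEntry {
    let checksum = crc32(&data);
    WalEntry { txn_id, entry_type, data, timestamp, checksum }
}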

Failure Scenarios:

| Scenario | Detection | Recovery |
|---|---|---|
| Crash during write | Partial entry (EOF mid-entry) | Truncate at last valid entry |
| Crash during fsync | Checksum mismatch | Truncate at last valid entry |
| Disk corruption | Checksum mismatch | Truncate at last valid entry |
| Partial batch flush | Some entries flushed, some not | All-or-nothing batch semantics |

Key Invariants:

  1. All entries before last_flushed_lsn are durable
  2. Entries after last_flushed_lsn may be lost on crash
  3. Partial entries are never visible (checksum validation)
  4. WAL file is always truncated to valid boundary

3.3 Challenge 3: Flush Thread Management

Recommendation: Fixed interval with adaptive enhancement (Phase 2)

Phase 1 - Fixed Interval (Simple):

// Flush every 10ms OR when batch reaches 100 entries
let flush_interval = Duration::from_millis(10);
let max_batch_size = 100;

loop {
    let batch = collect_batch(max_batch_size, flush_interval);
    if !batch.is_empty() {
        flush_batch(batch);
    }
}

Phase 2 - Adaptive Interval (Advanced):

pub struct AdaptiveFlusher {
    min_interval: Duration,     // 1ms
    max_interval: Duration,     // 20ms
    current_interval: Duration,
    // Adjust based on metrics
    avg_batch_size: f64,
    target_batch_size: usize,
}

impl AdaptiveFlusher {
    fn adjust_interval(&mut self) {
        // If batches are small, increase interval to collect more
        if self.avg_batch_size < self.target_batch_size as f64 * 0.5 {
            self.current_interval = self.current_interval.mul_f64(1.2)
                .min(self.max_interval);
        }
        // If batches are large, decrease interval to reduce latency
        else if self.avg_batch_size > self.target_batch_size as f64 * 0.9 {
            self.current_interval = self.current_interval.mul_f64(0.8)
                .max(self.min_interval);
        }
    }
}

Rationale:

  • Start simple: Fixed interval is predictable, easy to reason about
  • Optimize later: Adaptive tuning for specific workloads
  • Single thread: Eliminates coordination overhead
  • Future: Multiple flush threads for >100K writes/sec workloads

Alternative - Multiple Flush Threads: Only beneficial when:

  • Write rate exceeds 100K/sec
  • Single fsync latency becomes bottleneck
  • Parallel I/O subsystem (NVMe, SSD array)

3.4 Challenge 4: LSN Assignment Protocol

Design: Atomic counter with monotonic guarantee

LSN Assignment:

impl GroupCommitWal {
    pub fn assign_lsn(&self) -> Lsn {
        // Atomic fetch-add ensures:
        // 1. Unique LSN per entry
        // 2. Monotonically increasing
        // 3. Thread-safe without locks
        let lsn_value = self.lsn_counter.fetch_add(1, Ordering::SeqCst);
        Lsn::new(lsn_value)
    }
}

Ordering Guarantees:

  1. LSN Assignment Order: Matches entry submission order (atomic counter)
  2. Flush Order: Batches flushed in LSN order
  3. Visibility Order: Entries visible after flush in LSN order

Key Invariant: LSN monotonicity

LSN(entry_i) < LSN(entry_j) => entry_i flushed before or with entry_j

Recovery Consistency:

  • During recovery, entries are replayed in LSN order
  • Gaps in LSN sequence indicate lost entries (crash before flush)
  • Last flushed LSN determines recovery point
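
As a concrete illustration of replay in LSN order, the sketch below (hypothetical, not part of the API) walks the recovered entries in file order, which by the append protocol is LSN order, and derives the set of transactions whose COMMIT record was durable.

use std::collections::HashSet;

/// Minimal replay sketch: collect transactions with a durable COMMIT record.
/// Data records belonging to transactions not in this set would be discarded.
fn committed_txns(recovery: &RecoveryResult) -> HashSet<u64> {
    let mut committed = HashSet::new();
    for entry in &recovery.entries {
        match entry.entry_type {
            WalEntryType::Commit => { committed.insert(entry.txn_id); }
            WalEntryType::Abort => { committed.remove(&entry.txn_id); }
            _ => {}
        }
    }
    committed
}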

Multi-threaded Correctness:

// Thread 1
let lsn1 = wal.assign_lsn(); // LSN=1
wal.enqueue(lsn1, entry1);
// Thread 2
let lsn2 = wal.assign_lsn(); // LSN=2
wal.enqueue(lsn2, entry2);
// Flush thread guarantees: entry1 flushed before entry2 (or same batch)

3.5 Challenge 5: Performance Tuning Matrix

Tuning Parameters:

| Flush Interval | Batch Size | Expected Throughput | Latency (p99) | Use Case |
|---|---|---|---|---|
| 1ms | 10 | 10,000/sec | 2ms | Low-latency OLTP |
| 5ms | 50 | 50,000/sec | 8ms | Balanced workload |
| 10ms | 100 | 100,000/sec | 15ms | High-throughput analytics |
| 20ms | 500 | 250,000/sec | 30ms | Batch processing |
| 50ms | 1000 | 500,000/sec | 75ms | Data ingestion |

Disk Type Impact:

| Storage | Fsync Latency | Throughput w/o Batching | Throughput w/ Batching (10ms) |
|---|---|---|---|
| HDD (7200 RPM) | 10ms | 100/sec | 10,000/sec (100x) |
| SATA SSD | 1ms | 1,000/sec | 50,000/sec (50x) |
| NVMe SSD | 0.1ms | 10,000/sec | 100,000/sec (10x) |

Benchmark Script:

// benchmarks/group_commit_tuning.rs
use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId};

fn benchmark_flush_intervals(c: &mut Criterion) {
    let mut group = c.benchmark_group("flush_intervals");
    for interval_ms in [1, 5, 10, 20, 50] {
        group.bench_with_input(
            BenchmarkId::from_parameter(interval_ms),
            &interval_ms,
            |b, &interval| {
                let config = GroupCommitConfig {
                    max_flush_interval_ms: interval,
                    max_batch_size: 100,
                    ..Default::default()
                };
                let wal = GroupCommitWal::new("/tmp/wal_bench", config).unwrap();
                b.iter(|| {
                    let entry = create_test_entry();
                    black_box(wal.append(entry).unwrap());
                });
            },
        );
    }
    group.finish();
}

fn benchmark_batch_sizes(c: &mut Criterion) {
    let mut group = c.benchmark_group("batch_sizes");
    for batch_size in [10, 50, 100, 500, 1000] {
        group.bench_with_input(
            BenchmarkId::from_parameter(batch_size),
            &batch_size,
            |b, &size| {
                let config = GroupCommitConfig {
                    max_flush_interval_ms: 10,
                    max_batch_size: size,
                    ..Default::default()
                };
                let wal = GroupCommitWal::new("/tmp/wal_bench", config).unwrap();
                b.iter(|| {
                    let entries: Vec<_> = (0..100)
                        .map(|_| create_test_entry())
                        .collect();
                    black_box(wal.append_batch(entries).unwrap());
                });
            },
        );
    }
    group.finish();
}

criterion_group!(benches, benchmark_flush_intervals, benchmark_batch_sizes);
criterion_main!(benches);

Recommended Defaults:

  • OLTP Workload: 5ms interval, 50 batch size
  • Mixed Workload: 10ms interval, 100 batch size
  • Analytics Workload: 20ms interval, 500 batch size
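
These defaults could also be exposed as constructor presets on GroupCommitConfig. The sketch below is illustrative; the oltp/mixed/analytics helper names are hypothetical and simply mirror the values above.

impl GroupCommitConfig {
    /// Hypothetical presets mirroring the recommended defaults above.
    pub fn oltp() -> Self {
        Self { max_flush_interval_ms: 5, max_batch_size: 50, ..Self::default() }
    }

    pub fn mixed() -> Self {
        Self { max_flush_interval_ms: 10, max_batch_size: 100, ..Self::default() }
    }

    pub fn analytics() -> Self {
        Self { max_flush_interval_ms: 20, max_batch_size: 500, ..Self::default() }
    }
}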

4. Performance Analysis

4.1 Throughput Model

Current System (No Batching):

Throughput = 1 / fsync_latency
= 1 / 10ms
= 100 commits/sec

Group Commit System:

Throughput = batch_size / fsync_latency
= 100 entries / 10ms
= 10,000 commits/sec

Improvement: 100x throughput gain

4.2 Latency Model

Commit Latency Components:

  1. Enqueue time: ~1μs (lock-free queue push)
  2. Wait time: 0-10ms (time until next flush)
  3. Flush time: ~10ms (fsync on HDD)
  4. Notification time: ~10μs (wake up waiter)

Total Latency:

P50 (median): 5ms (avg wait) + 10ms (flush) = 15ms
P99 (worst): 10ms (max wait) + 10ms (flush) = 20ms
P99.9: 10ms + 15ms (slow fsync) = 25ms

Latency vs Throughput Tradeoff:

Flush Interval ↑ => Latency ↑, Throughput ↑
Batch Size ↑ => Throughput ↑

4.3 Resource Utilization

CPU Usage:

  • Flush thread: ~1-5% CPU (mostly I/O wait)
  • Client threads: <0.1% CPU per thread (lock-free operations)

Memory Usage:

  • Pending queue: O(max_batch_size) entries
  • Per entry: ~100 bytes (estimate)
  • Total: ~10KB for batch of 100 entries

I/O Bandwidth:

Bandwidth = (entry_size * batch_size) / flush_interval
= (100 bytes * 100 entries) / 10ms
= 1 MB/sec

4.4 Scalability Analysis

Vertical Scaling (Single Node):

  • Current: 10,000 commits/sec
  • With NVMe: 100,000 commits/sec
  • Bottleneck: Single flush thread becomes CPU-bound at >500K/sec

Horizontal Scaling (Multiple WAL Files):

  • Partition by table/shard: Linear scaling
  • Independent flush threads: N × 10,000 commits/sec

5. Durability Modes

5.1 Mode Definitions

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
    /// Synchronous: fsync before returning from append()
    /// - Strongest durability guarantee
    /// - Lowest throughput (~100/sec)
    /// - Use for: Financial transactions, critical data
    Synchronous,
    /// Group Commit: fsync batched, client must wait_for_lsn()
    /// - Balanced durability and performance
    /// - High throughput (~10,000/sec)
    /// - Use for: Most OLTP workloads
    GroupCommit,
    /// Async: Fire-and-forget, no fsync guarantee
    /// - No durability guarantee (data may be lost on crash)
    /// - Highest throughput (~1M/sec, memory-bound)
    /// - Use for: Caching, telemetry, non-critical logs
    Async,
}

5.2 Configuration API

impl GroupCommitWal {
    /// Set durability mode
    pub fn set_durability_mode(&mut self, mode: DurabilityMode) {
        self.config.durability_mode = mode;
    }

    /// Append with explicit durability mode
    pub async fn append_with_mode(
        &self,
        entry: WalEntry,
        mode: DurabilityMode,
    ) -> io::Result<Lsn> {
        let lsn = self.append(entry)?;
        match mode {
            DurabilityMode::Synchronous => {
                self.wait_for_lsn(lsn).await?;
            }
            DurabilityMode::GroupCommit => {
                // Client decides when to wait
            }
            DurabilityMode::Async => {
                // Fire-and-forget
            }
        }
        Ok(lsn)
    }
}

5.3 Transaction Manager Integration

pub struct TransactionManager {
    wal: Arc<GroupCommitWal>,
    durability_mode: DurabilityMode,
}

impl TransactionManager {
    pub async fn commit(&self, txn: &Transaction) -> Result<(), Error> {
        // Write commit record to WAL
        let entry = WalEntry {
            txn_id: txn.id,
            entry_type: WalEntryType::Commit,
            data: serialize_transaction(txn),
            timestamp: current_timestamp(),
            checksum: 0, // Computed by WAL
        };
        let lsn = self.wal.append(entry)?;
        // Wait for durability based on mode
        match self.durability_mode {
            DurabilityMode::Synchronous | DurabilityMode::GroupCommit => {
                self.wal.wait_for_lsn(lsn).await?;
            }
            DurabilityMode::Async => {
                // Don't wait
            }
        }
        Ok(())
    }
}

6. Integration with Transaction Manager

6.1 Commit Protocol Changes

Before (Synchronous WAL):

pub async fn commit_transaction(txn: Transaction) -> Result<()> {
    // 1. Write to WAL (blocks on fsync)
    wal.append(txn.commit_record()).await?; // 10ms
    // 2. Mark transaction as committed
    txn.set_state(TxnState::Committed);
    // 3. Release locks
    txn.release_locks();
    Ok(())
}

After (Group Commit WAL):

pub async fn commit_transaction(txn: Transaction) -> Result<()> {
    // 1. Write to WAL (returns immediately)
    let lsn = wal.append(txn.commit_record())?; // 1μs
    // 2. Wait for durability (batched with other commits)
    wal.wait_for_lsn(lsn).await?; // 0-10ms (avg 5ms)
    // 3. Mark transaction as committed
    txn.set_state(TxnState::Committed);
    // 4. Release locks
    txn.release_locks();
    Ok(())
}

Key Change: Append and wait are separate, allowing batching

6.2 Two-Phase Commit (2PC) Integration

2PC Protocol:

Coordinator                          Participant
     │                                    │
     ├──────── PREPARE ──────────────────>│
     │                                    │ (1) Write PREPARE to WAL
     │                                    │ (2) Wait for flush
     │<─────── PREPARED ──────────────────┤
     │                                    │
     ├──────── COMMIT ───────────────────>│
     │                                    │ (3) Write COMMIT to WAL
     │                                    │ (4) Wait for flush
     │<─────── ACK ───────────────────────┤

Group Commit Optimization:

pub async fn two_phase_commit(txn: &DistributedTransaction) -> Result<()> {
    // Phase 1: PREPARE
    let prepare_lsns: Vec<Lsn> = participants
        .iter()
        .map(|p| p.prepare(txn)) // Returns immediately
        .collect();
    // Wait for all PREPARE flushes (batched together!)
    for lsn in prepare_lsns {
        wal.wait_for_lsn(lsn).await?;
    }
    // Phase 2: COMMIT
    let commit_lsns: Vec<Lsn> = participants
        .iter()
        .map(|p| p.commit(txn)) // Returns immediately
        .collect();
    // Wait for all COMMIT flushes (batched together!)
    for lsn in commit_lsns {
        wal.wait_for_lsn(lsn).await?;
    }
    Ok(())
}

Benefit: Multiple participants’ WAL entries batched in single flush

6.3 Integration Points

1. Transaction Coordinator:

pub struct TxnCoordinator {
    wal: Arc<GroupCommitWal>,
    active_txns: HashMap<TxnId, Transaction>,
}

impl TxnCoordinator {
    pub async fn begin_txn(&self) -> Result<TxnId> {
        let txn_id = self.next_txn_id();
        let entry = WalEntry::begin(txn_id);
        let lsn = self.wal.append(entry)?;
        // Don't wait for BEGIN (optimization)
        // Wait only on COMMIT for durability
        Ok(txn_id)
    }

    pub async fn commit_txn(&self, txn_id: TxnId) -> Result<()> {
        let entry = WalEntry::commit(txn_id);
        let lsn = self.wal.append(entry)?;
        // MUST wait for COMMIT (durability critical)
        self.wal.wait_for_lsn(lsn).await?;
        Ok(())
    }
}

2. MVCC Integration:

pub struct MvccManager {
    wal: Arc<GroupCommitWal>,
    version_store: VersionStore,
}

impl MvccManager {
    pub async fn create_version(&self, key: &[u8], value: &[u8], txn_id: TxnId) -> Result<()> {
        // Write to WAL
        let entry = WalEntry::data(txn_id, key, value);
        let lsn = self.wal.append(entry)?;
        // Create version in memory
        self.version_store.insert(key, value, txn_id, lsn);
        // Don't wait for flush (optimization)
        // Commit will wait, ensuring durability
        Ok(())
    }
}

3. Checkpoint Integration:

pub struct CheckpointManager {
    wal: Arc<GroupCommitWal>,
}

impl CheckpointManager {
    pub async fn create_checkpoint(&self) -> Result<Lsn> {
        // Flush all pending WAL entries
        self.wal.force_flush().await?;
        let checkpoint_lsn = self.wal.last_flushed_lsn();
        // Write checkpoint marker
        let entry = WalEntry::checkpoint(checkpoint_lsn);
        let lsn = self.wal.append(entry)?;
        self.wal.wait_for_lsn(lsn).await?;
        Ok(checkpoint_lsn)
    }
}

7. Testing Strategy

7.1 Unit Tests

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_lsn_monotonicity() {
        let wal = GroupCommitWal::new("/tmp/test_wal", Default::default()).unwrap();
        let lsn1 = wal.append(test_entry(1)).unwrap();
        let lsn2 = wal.append(test_entry(2)).unwrap();
        let lsn3 = wal.append(test_entry(3)).unwrap();
        assert!(lsn1 < lsn2);
        assert!(lsn2 < lsn3);
    }

    #[test]
    fn test_batch_flushing() {
        let config = GroupCommitConfig {
            max_batch_size: 10,
            max_flush_interval_ms: 100,
            ..Default::default()
        };
        let wal = GroupCommitWal::new("/tmp/test_wal", config).unwrap();
        // Write 10 entries (should trigger size-based flush)
        for i in 0..10 {
            wal.append(test_entry(i)).unwrap();
        }
        // Wait for flush
        std::thread::sleep(Duration::from_millis(50));
        let metrics = wal.metrics();
        assert_eq!(metrics.total_flushes, 1);
        assert_eq!(metrics.total_appends, 10);
    }

    #[tokio::test]
    async fn test_wait_for_lsn() {
        let wal = GroupCommitWal::new("/tmp/test_wal", Default::default()).unwrap();
        let lsn = wal.append(test_entry(1)).unwrap();
        // Should complete within flush interval
        let result = tokio::time::timeout(
            Duration::from_millis(50),
            wal.wait_for_lsn(lsn),
        ).await;
        assert!(result.is_ok());
    }
}

7.2 Integration Tests

#[tokio::test]
async fn test_concurrent_commits() {
    let wal = Arc::new(GroupCommitWal::new("/tmp/test_wal", Default::default()).unwrap());
    let mut handles = vec![];
    // Spawn 100 concurrent writers
    for i in 0..100 {
        let wal = Arc::clone(&wal);
        let handle = tokio::spawn(async move {
            for j in 0..100 {
                let lsn = wal.append(test_entry(i * 100 + j)).unwrap();
                wal.wait_for_lsn(lsn).await.unwrap();
            }
        });
        handles.push(handle);
    }
    // Wait for all writers
    for handle in handles {
        handle.await.unwrap();
    }
    // Verify all 10,000 entries written
    let metrics = wal.metrics();
    assert_eq!(metrics.total_appends, 10_000);
}

#[test]
fn test_recovery_from_corruption() {
    let path = "/tmp/test_wal_recovery";
    // Write some entries
    {
        let mut wal = GroupCommitWal::new(path, Default::default()).unwrap();
        for i in 0..10 {
            wal.append(test_entry(i)).unwrap();
        }
        wal.shutdown().unwrap();
    }
    // Corrupt the file (truncate mid-entry)
    {
        let file = std::fs::OpenOptions::new()
            .write(true)
            .open(path)
            .unwrap();
        file.set_len(500).unwrap(); // Arbitrary mid-point
    }
    // Recover using the recovery routine from §3.2
    let result = recover_wal(path).unwrap();
    // Should recover valid entries only
    assert!(result.entries.len() < 10);
    assert!(result.corrupted_at.is_some());
}

7.3 Chaos Tests

#[tokio::test]
async fn test_crash_during_flush() {
    let wal = Arc::new(GroupCommitWal::new("/tmp/test_wal_chaos", Default::default()).unwrap());
    // Start writing
    let wal_clone = Arc::clone(&wal);
    let _writer = tokio::spawn(async move {
        for i in 0..1000 {
            wal_clone.append(test_entry(i)).unwrap();
            tokio::time::sleep(Duration::from_micros(100)).await;
        }
    });
    // Simulate crash after random delay
    tokio::time::sleep(Duration::from_millis(rand::random::<u64>() % 100)).await;
    std::process::abort(); // Simulate hard crash
    // In separate test run:
    // - Recover from WAL
    // - Verify all flushed entries are present
    // - Verify no corrupted entries
}

#[tokio::test]
async fn test_fsync_failure_handling() {
    // Mock filesystem that fails fsync randomly
    let wal = GroupCommitWal::new("/tmp/test_wal_fsync_fail", Default::default()).unwrap();
    for i in 0..100 {
        match wal.append(test_entry(i)) {
            Ok(lsn) => {
                match wal.wait_for_lsn(lsn).await {
                    Ok(_) => {
                        // Entry is durable
                    }
                    Err(_e) => {
                        // Fsync failed, entry NOT durable
                        // Transaction should be aborted
                    }
                }
            }
            Err(_e) => {
                // Write failed
            }
        }
    }
}

7.4 Performance Tests

#[bench]
fn bench_throughput_no_wait(b: &mut Bencher) {
    let wal = GroupCommitWal::new("/tmp/bench_wal", Default::default()).unwrap();
    b.iter(|| {
        // Measure append-only throughput
        wal.append(test_entry(1)).unwrap();
    });
}

#[bench]
fn bench_throughput_with_wait(b: &mut Bencher) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let wal = GroupCommitWal::new("/tmp/bench_wal", Default::default()).unwrap();
    b.iter(|| {
        // Measure end-to-end commit throughput
        let lsn = wal.append(test_entry(1)).unwrap();
        rt.block_on(wal.wait_for_lsn(lsn)).unwrap();
    });
}

#[bench]
fn bench_latency_percentiles(_b: &mut Bencher) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let wal = GroupCommitWal::new("/tmp/bench_wal", Default::default()).unwrap();
    let mut latencies = vec![];
    for _ in 0..10_000 {
        let start = Instant::now();
        let lsn = wal.append(test_entry(1)).unwrap();
        rt.block_on(wal.wait_for_lsn(lsn)).unwrap();
        latencies.push(start.elapsed());
    }
    latencies.sort();
    println!("P50: {:?}", latencies[5000]);
    println!("P99: {:?}", latencies[9900]);
    println!("P99.9: {:?}", latencies[9990]);
}

7.5 Test Coverage Matrix

| Category | Test Case | Success Criteria |
|---|---|---|
| Correctness | LSN monotonicity | LSNs always increase |
| | Durability guarantee | Committed data survives crash |
| | Batch atomicity | All or none in batch |
| | Recovery correctness | All flushed entries recovered |
| Performance | Throughput | >10,000 commits/sec |
| | Latency P99 | <20ms |
| | Fsync reduction | >90% fewer fsyncs |
| Concurrency | Concurrent writes | No data races |
| | Lock-free append | No contention |
| | Waiter notification | All waiters notified |
| Failure | Crash during flush | Truncate to valid boundary |
| | Fsync failure | Batch rollback |
| | Disk full | Graceful error |
| Integration | Transaction commit | ACID guarantees |
| | 2PC protocol | Distributed correctness |
| | MVCC versioning | Correct version ordering |

8. Implementation Plan

Phase 1: Basic Group Commit (1.5 days)

Day 1 Morning:

  • Implement core data structures (Lsn, WalEntry, GroupCommitWal)
  • Implement atomic LSN assignment
  • Implement lock-free pending queue

Day 1 Afternoon:

  • Implement flush thread logic
  • Implement time-based + size-based batching
  • Implement WAL entry serialization/deserialization

Day 2 Morning:

  • Implement waiter notification mechanism
  • Implement wait_for_lsn() API
  • Implement durability modes

Day 2 Afternoon:

  • Unit tests for batching logic
  • Unit tests for LSN assignment
  • Initial benchmarking

Phase 2: Recovery Protocol (0.5 days)

Day 3 Morning:

  • Implement WAL recovery algorithm
  • Implement checksum validation
  • Implement file truncation on corruption

Day 3 Afternoon:

  • Recovery integration tests
  • Corruption handling tests

Phase 3: Integration Testing (0.5 days)

Day 4 Morning:

  • Integration with transaction manager
  • Integration with MVCC
  • End-to-end transaction tests

Day 4 Afternoon:

  • Concurrent commit tests
  • 2PC integration tests
  • Chaos testing

Phase 4: Tuning (0.5 days)

Day 5 Morning:

  • Benchmark different flush intervals
  • Benchmark different batch sizes
  • Create tuning matrix

Day 5 Afternoon:

  • Document optimal parameters
  • Create configuration guide
  • Performance regression tests

Total Timeline: 3 days of estimated effort, scheduled across the five working days above


9. Configuration Tuning Guide

9.1 Workload-Based Tuning

OLTP (Low Latency):

[wal.group_commit]
max_flush_interval_ms = 5
max_batch_size = 50
durability_mode = "group_commit"
adaptive_tuning = true

Analytics (High Throughput):

[wal.group_commit]
max_flush_interval_ms = 20
max_batch_size = 500
durability_mode = "group_commit"
adaptive_tuning = false

Mixed Workload:

[wal.group_commit]
max_flush_interval_ms = 10
max_batch_size = 100
durability_mode = "group_commit"
adaptive_tuning = true

Batch Processing:

[wal.group_commit]
max_flush_interval_ms = 50
max_batch_size = 1000
durability_mode = "async" # Non-critical data
adaptive_tuning = false

9.2 Hardware-Based Tuning

HDD (High fsync latency):

[wal.group_commit]
max_flush_interval_ms = 20 # Maximize batching
max_batch_size = 500

SATA SSD (Medium latency):

[wal.group_commit]
max_flush_interval_ms = 10
max_batch_size = 100

NVMe SSD (Low latency):

[wal.group_commit]
max_flush_interval_ms = 5 # Lower latency priority
max_batch_size = 50
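
A sketch of loading these TOML fragments into GroupCommitConfig follows, assuming the serde and toml crates (assumptions, not requirements of this design); the ConfigFile/WalSection/GroupCommitToml types and load_config are hypothetical helper names.

use serde::Deserialize;

// Assumed dependencies: serde = { version = "1", features = ["derive"] }, toml = "0.8"

#[derive(Deserialize)]
struct ConfigFile {
    wal: WalSection,
}

#[derive(Deserialize)]
struct WalSection {
    group_commit: GroupCommitToml,
}

#[derive(Deserialize)]
struct GroupCommitToml {
    max_flush_interval_ms: u64,
    max_batch_size: usize,
    durability_mode: String, // "synchronous" | "group_commit" | "async"
    adaptive_tuning: bool,
}

fn load_config(text: &str) -> Result<GroupCommitConfig, Box<dyn std::error::Error>> {
    let parsed: ConfigFile = toml::from_str(text)?;
    let gc = parsed.wal.group_commit;
    Ok(GroupCommitConfig {
        max_flush_interval_ms: gc.max_flush_interval_ms,
        max_batch_size: gc.max_batch_size,
        min_batch_size: 1, // not exposed in the TOML fragments above
        durability_mode: match gc.durability_mode.as_str() {
            "synchronous" => DurabilityMode::Synchronous,
            "async" => DurabilityMode::Async,
            _ => DurabilityMode::GroupCommit,
        },
        adaptive_tuning: gc.adaptive_tuning,
    })
}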

9.3 Monitoring and Alerts

Key Metrics:

pub struct WalMetrics {
    // Throughput
    pub commits_per_sec: f64,
    pub bytes_per_sec: f64,
    // Latency
    pub commit_latency_p50: Duration,
    pub commit_latency_p99: Duration,
    pub commit_latency_p999: Duration,
    // Batching efficiency
    pub avg_batch_size: f64,
    pub flush_per_sec: f64,
    pub fsync_reduction_pct: f64,
    // Queue health
    pub pending_queue_depth: usize,
    pub max_queue_depth: usize,
}

Alerts:

  • High latency: P99 > 50ms → Increase flush frequency or check disk
  • Small batches: Avg batch size < 10 → Increase flush interval
  • Queue buildup: Pending queue > 1000 → Disk bottleneck, scale storage
  • Low throughput: Commits/sec < 1000 → Check batching configuration
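
A minimal sketch of evaluating these thresholds against the §9.3 metrics view is shown below; the Alert enum and check_alerts function are hypothetical, and the threshold constants simply mirror the list above.

/// Hypothetical alert evaluation over the monitoring metrics from §9.3.
#[derive(Debug)]
pub enum Alert {
    HighLatency { p99_ms: u128 },
    SmallBatches { avg: f64 },
    QueueBuildup { depth: usize },
    LowThroughput { commits_per_sec: f64 },
}

pub fn check_alerts(m: &WalMetrics) -> Vec<Alert> {
    let mut alerts = Vec::new();
    if m.commit_latency_p99.as_millis() > 50 {
        alerts.push(Alert::HighLatency { p99_ms: m.commit_latency_p99.as_millis() });
    }
    if m.avg_batch_size < 10.0 {
        alerts.push(Alert::SmallBatches { avg: m.avg_batch_size });
    }
    if m.pending_queue_depth > 1000 {
        alerts.push(Alert::QueueBuildup { depth: m.pending_queue_depth });
    }
    if m.commits_per_sec < 1000.0 {
        alerts.push(Alert::LowThroughput { commits_per_sec: m.commits_per_sec });
    }
    alerts
}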

10. Risk Analysis and Mitigation

10.1 Risks

| Risk | Likelihood | Impact | Mitigation |
|---|---|---|---|
| Data loss on crash | Low | Critical | Checksum validation, recovery protocol |
| Latency spike | Medium | High | Adaptive tuning, configurable intervals |
| Queue buildup | Medium | Medium | Back-pressure, monitoring |
| Flush thread failure | Low | Critical | Watchdog, automatic restart |
| Disk full | Medium | Critical | Graceful degradation, alerts |

10.2 Failure Modes

1. Flush Thread Crash:

  • Detection: Watchdog monitors thread health
  • Recovery: Restart flush thread, replay pending queue
  • Prevention: Robust error handling, panic guards

2. Disk Corruption:

  • Detection: Checksum validation on recovery
  • Recovery: Truncate to last valid entry
  • Prevention: Hardware RAID, checksums

3. Queue Overflow:

  • Detection: Monitor queue depth
  • Recovery: Apply back-pressure to writers
  • Prevention: Bounded queue, flow control
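
The queue-overflow mitigation assumes a bounded queue with back-pressure. A minimal sketch follows, substituting crossbeam's bounded ArrayQueue for the unbounded SegQueue used in §2.1; push_with_backpressure and its retry/backoff policy are illustrative choices, not part of the current design.

use crossbeam::queue::ArrayQueue;
use std::time::Duration;

/// Hypothetical bounded enqueue with back-pressure: if the queue is full,
/// the appending thread backs off briefly instead of growing memory unboundedly.
fn push_with_backpressure<T>(queue: &ArrayQueue<T>, mut item: T, max_retries: usize) -> Result<(), T> {
    for _ in 0..max_retries {
        match queue.push(item) {
            Ok(()) => return Ok(()),
            Err(rejected) => {
                item = rejected; // queue full: keep the item and retry
                std::thread::sleep(Duration::from_micros(50)); // back off, let the flusher drain
            }
        }
    }
    Err(item) // caller surfaces an overload error to the client
}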

10.3 Correctness Invariants

Invariant 1: LSN Monotonicity

∀ entry_i, entry_j: LSN(entry_i) < LSN(entry_j) ⇒ entry_i submitted before entry_j

Invariant 2: Flush Atomicity

∀ batch: (All entries flushed) ∨ (No entries flushed)

Invariant 3: Durability Guarantee

∀ lsn: last_flushed_lsn ≥ lsn ⇒ entry(lsn) is durable

Invariant 4: Recovery Correctness

∀ recovered_entries: ∀ entry ∈ recovered_entries ⇒ checksum(entry) is valid

11. Future Enhancements

11.1 Adaptive Flushing (Phase 2)

Dynamically adjust flush interval based on workload:

  • Monitor batch sizes
  • Adjust interval to maintain target batch size
  • React to latency SLA violations

11.2 Parallel Flush Threads (Phase 3)

For very high write rates (>100K/sec):

  • Multiple flush threads
  • Partition entries by hash
  • Independent flush queues

11.3 Remote WAL (Phase 4)

Replicate WAL to remote storage:

  • S3/Object storage backup
  • Cross-region replication
  • Disaster recovery

11.4 WAL Compression (Phase 5)

Compress WAL entries before write:

  • LZ4 compression for speed
  • Reduce I/O bandwidth
  • Trade CPU for I/O
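
A sketch of payload compression on the write path, assuming the lz4_flex crate (an assumption, not a committed dependency); compress_payload/decompress_payload are hypothetical helpers that would wrap write_entry()/read_entry().

// Assumed dependency: lz4_flex = "0.11"
use lz4_flex::{compress_prepend_size, decompress_size_prepended};

/// Compress an entry's payload before it is handed to write_entry().
/// The checksum would then be computed over the compressed bytes so recovery
/// can validate before decompressing.
fn compress_payload(data: &[u8]) -> Vec<u8> {
    compress_prepend_size(data)
}

/// Decompress a payload during recovery/replay.
fn decompress_payload(data: &[u8]) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    Ok(decompress_size_prepended(data)?)
}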

12. Conclusion

This Group Commit WAL design provides:

Performance:

  • 5-10x throughput improvement
  • 90-95% fsync reduction
  • Predictable latency (P99 < 20ms)

Correctness:

  • Full ACID guarantees
  • Atomic batch commits
  • Robust recovery protocol

Flexibility:

  • Multiple durability modes
  • Tunable for different workloads
  • Easy integration with existing transaction manager

Production-Ready:

  • Comprehensive testing strategy
  • Clear monitoring and alerting
  • Risk mitigation plans

Next Steps:

  1. Review and approve architecture
  2. Begin Phase 1 implementation
  3. Benchmark on target hardware
  4. Integrate with transaction manager
  5. Deploy to staging environment

Appendix A: References

  1. PostgreSQL Group Commit: https://www.postgresql.org/docs/current/wal-async-commit.html
  2. MySQL Group Commit: https://dev.mysql.com/doc/refman/8.0/en/innodb-group-commit.html
  3. “The Design and Implementation of Modern Column-Oriented Database Systems” (Abadi et al.)
  4. “Transactional Information Systems” (Weikum & Vossen) - Chapter on WAL

Appendix B: Glossary

  • LSN: Logical Sequence Number - monotonically increasing identifier for WAL entries
  • Fsync: File system synchronization - forces buffered writes to persistent storage
  • Group Commit: Batching multiple transaction commits into a single fsync
  • Durability: ACID property - committed data survives system failures
  • Two-Phase Commit (2PC): Distributed transaction protocol
  • MVCC: Multi-Version Concurrency Control
  • WAL: Write-Ahead Log - transaction log written before data modifications

Document Status: Ready for Implementation
Approval Required: Architecture Team, Performance Team
Implementation Start: Upon approval