
Group Commit WAL Implementation Roadmap

Document Version: 1.0
Date: 2025-11-10
Status: Implementation Guide
Related: GROUP_COMMIT_WAL_ARCHITECTURE.md

Overview

This document provides a detailed implementation roadmap for the Group Commit WAL system, breaking down the 3-5 day implementation into actionable tasks with clear deliverables and acceptance criteria.


Implementation Timeline

Day 1: Core Implementation
├── Morning (4h): Data Structures & LSN Management
└── Afternoon (4h): Flush Thread & Batching
Day 2: Waiter Notification & Durability
├── Morning (4h): Notification System
└── Afternoon (4h): Durability Modes & Testing
Day 3: Recovery & Integration
├── Morning (4h): Recovery Protocol
└── Afternoon (4h): Integration Testing
Day 4: Advanced Integration
├── Morning (4h): Transaction Manager Integration
└── Afternoon (4h): MVCC Integration & 2PC
Day 5: Tuning & Validation
├── Morning (4h): Performance Benchmarking
└── Afternoon (4h): Documentation & Handoff
Total: 3-5 days (depending on integration scope)

Phase 1: Basic Group Commit (2 days)

Day 1 Morning: Core Data Structures (4 hours)

Task 1.1: Define Core Types (1 hour)

File: heliosdb-storage/src/wal/group_commit/types.rs

// Core types to implement:
// - Lsn
// - WalEntry
// - WalEntryType
// - DurabilityMode
// - GroupCommitConfig
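
A minimal sketch of how these types could be laid out; the field names, derives, and default values below are assumptions, not the final API:

use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Lsn(u64);

impl Lsn {
    pub fn new(value: u64) -> Self { Lsn(value) }
    pub fn value(&self) -> u64 { self.0 }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityMode {
    Synchronous, // block until the entry is fsync'd
    GroupCommit, // append now, wait explicitly via wait_for_lsn
    Async,       // fire-and-forget
}

#[derive(Debug, Clone)]
pub struct GroupCommitConfig {
    pub max_batch_size: usize,
    pub max_flush_interval_ms: u64,
}

impl GroupCommitConfig {
    // Helper used by the batch collector (Task 1.5).
    pub fn flush_interval(&self) -> Duration {
        Duration::from_millis(self.max_flush_interval_ms)
    }
}

impl Default for GroupCommitConfig {
    fn default() -> Self {
        // Placeholder defaults; real values come out of Day 5 tuning.
        Self { max_batch_size: 100, max_flush_interval_ms: 10 }
    }
}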

Acceptance Criteria:

  • All types compile without errors
  • Serialization/deserialization tests pass
  • Type conversions are safe and tested

Tests:

#[test]
fn test_lsn_ordering() {
    let lsn1 = Lsn::new(1);
    let lsn2 = Lsn::new(2);
    assert!(lsn1 < lsn2);
}

#[test]
fn test_wal_entry_serialization() {
    let entry = WalEntry::new(...);
    let bytes = serialize(&entry);
    let deserialized = deserialize(&bytes);
    assert_eq!(entry, deserialized);
}

Task 1.2: Implement Atomic LSN Counter (1 hour)

File: heliosdb-storage/src/wal/group_commit/lsn_counter.rs

pub struct LsnCounter {
    counter: AtomicU64,
}

impl LsnCounter {
    pub fn new(initial: u64) -> Self;
    pub fn next(&self) -> Lsn;
    pub fn current(&self) -> Lsn;
}
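
A straightforward sketch leans on fetch_add, which returns the previous value and therefore hands every caller a unique, monotonically increasing LSN:

use std::sync::atomic::{AtomicU64, Ordering};

impl LsnCounter {
    pub fn new(initial: u64) -> Self {
        Self { counter: AtomicU64::new(initial) }
    }

    pub fn next(&self) -> Lsn {
        // fetch_add is atomic: no two threads can observe the same value.
        Lsn::new(self.counter.fetch_add(1, Ordering::SeqCst) + 1)
    }

    pub fn current(&self) -> Lsn {
        Lsn::new(self.counter.load(Ordering::Acquire))
    }
}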

Acceptance Criteria:

  • Thread-safe LSN assignment
  • Monotonicity guaranteed
  • No LSN collisions under concurrent load

Tests:

#[test]
fn test_concurrent_lsn_assignment() {
    let counter = Arc::new(LsnCounter::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let c = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            (0..1000).map(|_| c.next()).collect::<Vec<_>>()
        }));
    }
    let all_lsns: Vec<Lsn> = handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();
    // All LSNs should be unique
    let unique: HashSet<_> = all_lsns.iter().collect();
    assert_eq!(unique.len(), 10_000);
}

Task 1.3: Implement Lock-Free Pending Queue (2 hours)

File: heliosdb-storage/src/wal/group_commit/pending_queue.rs

pub struct PendingEntry {
    pub lsn: Lsn,
    pub entry: WalEntry,
    pub notify: Arc<Notify>,
    pub result: Arc<RwLock<Option<io::Result<()>>>>,
}

pub struct PendingQueue {
    queue: SegQueue<PendingEntry>,
    metrics: Arc<QueueMetrics>,
}

impl PendingQueue {
    pub fn new() -> Self;
    pub fn push(&self, entry: PendingEntry);
    pub fn pop(&self) -> Option<PendingEntry>;
    pub fn len(&self) -> usize;
}
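
SegQueue comes from the crossbeam crate (0.8+, where pop returns Option). A sketch of the wrapper; the QueueMetrics shape is an assumption:

use std::sync::Arc;
use crossbeam::queue::SegQueue;

impl PendingQueue {
    pub fn new() -> Self {
        Self {
            queue: SegQueue::new(),
            metrics: Arc::new(QueueMetrics::default()), // assumed Default impl
        }
    }

    pub fn push(&self, entry: PendingEntry) {
        self.queue.push(entry);
    }

    pub fn pop(&self) -> Option<PendingEntry> {
        self.queue.pop()
    }

    pub fn len(&self) -> usize {
        self.queue.len()
    }
}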

Acceptance Criteria:

  • Lock-free push/pop operations
  • No data races under concurrent access
  • Queue metrics collected correctly

Tests:

#[test]
fn test_queue_concurrent_push_pop() {
    let queue = Arc::new(PendingQueue::new());
    // Spawn producers
    let producers: Vec<_> = (0..10)
        .map(|_| {
            let q = Arc::clone(&queue);
            thread::spawn(move || {
                for i in 0..100 {
                    q.push(create_test_entry(i));
                }
            })
        })
        .collect();
    // Spawn consumers
    let consumers: Vec<_> = (0..5)
        .map(|_| {
            let q = Arc::clone(&queue);
            thread::spawn(move || {
                let mut count = 0;
                while count < 200 {
                    if q.pop().is_some() {
                        count += 1;
                    }
                }
                count
            })
        })
        .collect();
    // Wait for completion
    for p in producers {
        p.join().unwrap();
    }
    let total: usize = consumers
        .into_iter()
        .map(|c| c.join().unwrap())
        .sum();
    assert_eq!(total, 1000);
}

Day 1 Afternoon: Flush Thread & Batching (4 hours)

Task 1.4: Implement Flush Thread (2 hours)

File: heliosdb-storage/src/wal/group_commit/flush_thread.rs

pub struct FlushThread {
    config: GroupCommitConfig,
    file: Arc<Mutex<BufWriter<File>>>,
    pending_queue: Arc<PendingQueue>,
    last_flushed_lsn: Arc<AtomicU64>,
    shutdown: Arc<AtomicBool>,
    metrics: Arc<WalMetrics>,
}

impl FlushThread {
    pub fn spawn(config: GroupCommitConfig, ...) -> JoinHandle<()>;
    fn run(&self);
    fn collect_batch(&self, max_size: usize, timeout: Duration) -> Vec<PendingEntry>;
    fn flush_batch(&self, batch: Vec<PendingEntry>);
}
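
The run loop itself stays small: collect a batch, flush it, repeat until the shutdown flag is set. A sketch, with collect_batch and flush_batch as specified in Tasks 1.5 and 2.2:

impl FlushThread {
    fn run(&self) {
        while !self.shutdown.load(Ordering::Acquire) {
            let batch = self.collect_batch(
                self.config.max_batch_size,
                self.config.flush_interval(),
            );
            if !batch.is_empty() {
                self.flush_batch(batch);
            }
        }
        // Drain whatever is still queued so no waiter is left hanging
        // after shutdown is signalled.
        let remaining = self.collect_batch(usize::MAX, Duration::ZERO);
        if !remaining.is_empty() {
            self.flush_batch(remaining);
        }
    }
}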

Acceptance Criteria:

  • Thread spawns and runs correctly
  • Graceful shutdown on signal
  • Metrics collected accurately

Tests:

#[test]
fn test_flush_thread_lifecycle() {
    let shutdown = Arc::new(AtomicBool::new(false));
    let thread = FlushThread::spawn(...);
    // Verify thread is running
    thread::sleep(Duration::from_millis(100));
    assert!(thread.is_running()); // assumed test helper; std JoinHandle has no is_running
    // Signal shutdown
    shutdown.store(true, Ordering::Release);
    // Thread should exit within 1 second
    let result = thread.join_timeout(Duration::from_secs(1)); // assumed test helper
    assert!(result.is_ok());
}

Task 1.5: Implement Batching Logic (2 hours)

File: heliosdb-storage/src/wal/group_commit/batching.rs

pub struct BatchCollector {
    config: GroupCommitConfig,
    pending_queue: Arc<PendingQueue>,
}

impl BatchCollector {
    pub fn collect_batch(&self) -> Vec<PendingEntry> {
        let mut batch = Vec::new();
        let deadline = Instant::now() + self.config.flush_interval();
        loop {
            // Time-based: flush if deadline reached
            if Instant::now() >= deadline && !batch.is_empty() {
                break;
            }
            // Size-based: flush if batch full
            if batch.len() >= self.config.max_batch_size {
                break;
            }
            // Try to collect more entries
            match self.pending_queue.pop() {
                Some(entry) => batch.push(entry),
                None => {
                    if !batch.is_empty() && Instant::now() >= deadline {
                        break;
                    }
                    thread::sleep(Duration::from_micros(100));
                }
            }
        }
        batch
    }
}

Acceptance Criteria:

  • Time-based flushing works correctly
  • Size-based flushing works correctly
  • Hybrid strategy respects both limits

Tests:

#[test]
fn test_time_based_flush() {
    let config = GroupCommitConfig {
        max_flush_interval_ms: 10,
        max_batch_size: 1000,
        ..Default::default()
    };
    let collector = BatchCollector::new(config, queue);
    // Add 10 entries
    for i in 0..10 {
        queue.push(create_test_entry(i));
    }
    // Should flush after 10ms even though batch not full
    let start = Instant::now();
    let batch = collector.collect_batch();
    let elapsed = start.elapsed();
    assert_eq!(batch.len(), 10);
    assert!(elapsed >= Duration::from_millis(10));
    assert!(elapsed < Duration::from_millis(20));
}

#[test]
fn test_size_based_flush() {
    let config = GroupCommitConfig {
        max_flush_interval_ms: 1000,
        max_batch_size: 100,
        ..Default::default()
    };
    let collector = BatchCollector::new(config, queue);
    // Add 100 entries rapidly
    for i in 0..100 {
        queue.push(create_test_entry(i));
    }
    // Should flush immediately when batch reaches 100
    let start = Instant::now();
    let batch = collector.collect_batch();
    let elapsed = start.elapsed();
    assert_eq!(batch.len(), 100);
    assert!(elapsed < Duration::from_millis(100)); // Much less than timeout
}

Day 2 Morning: Waiter Notification System (4 hours)

Task 2.1: Implement Waiter Registry (2 hours)

File: heliosdb-storage/src/wal/group_commit/waiters.rs

pub struct WaiterRegistry {
    waiters: RwLock<HashMap<Lsn, Vec<Waiter>>>,
}

#[derive(Clone)]
pub struct Waiter {
    lsn: Lsn,
    notify: Arc<Notify>,
    result: Arc<RwLock<Option<io::Result<()>>>>,
}

impl WaiterRegistry {
    pub fn new() -> Self;
    pub fn register(&self, lsn: Lsn) -> Waiter;

    pub fn notify(&self, lsn: Lsn, result: io::Result<()>) {
        let mut waiters = self.waiters.write().unwrap();
        // Notify all waiters for LSNs <= lsn
        let to_notify: Vec<_> = waiters
            .iter()
            .filter(|(k, _)| **k <= lsn)
            .flat_map(|(_, v)| v.clone())
            .collect();
        for waiter in to_notify {
            // io::Error does not implement Clone, so materialize a fresh
            // result for each waiter instead of cloning one.
            let r = match &result {
                Ok(()) => Ok(()),
                Err(e) => Err(io::Error::new(e.kind(), e.to_string())),
            };
            *waiter.result.write().unwrap() = Some(r);
            waiter.notify.notify_waiters();
        }
        // Remove notified waiters
        waiters.retain(|k, _| *k > lsn);
    }

    pub async fn wait(&self, waiter: Waiter) -> io::Result<()> {
        // notify_waiters only wakes tasks that are already waiting, so
        // re-check the result around each await. (Production code should
        // additionally pin the Notified future and call enable() before the
        // check to fully close the missed-wakeup window.)
        loop {
            if let Some(result) = waiter.result.write().unwrap().take() {
                return result;
            }
            waiter.notify.notified().await;
        }
    }
}

Acceptance Criteria:

  • Waiters registered correctly
  • Notifications sent to correct waiters
  • Waiters removed after notification
  • No memory leaks

Tests:

#[tokio::test]
async fn test_waiter_notification() {
    let registry = Arc::new(WaiterRegistry::new());
    let waiter1 = registry.register(Lsn::new(1));
    let waiter2 = registry.register(Lsn::new(2));
    let waiter3 = registry.register(Lsn::new(3));
    // Spawn waiters
    let r = Arc::clone(&registry);
    let h1 = tokio::spawn(async move { r.wait(waiter1).await });
    let r = Arc::clone(&registry);
    let h2 = tokio::spawn(async move { r.wait(waiter2).await });
    let r = Arc::clone(&registry);
    let h3 = tokio::spawn(async move { r.wait(waiter3).await });
    // Notify up to LSN 2
    registry.notify(Lsn::new(2), Ok(()));
    // Wait for results
    assert!(h1.await.unwrap().is_ok());
    assert!(h2.await.unwrap().is_ok());
    // h3 should still be waiting
    tokio::time::sleep(Duration::from_millis(100)).await;
    assert!(!h3.is_finished());
    // Notify LSN 3
    registry.notify(Lsn::new(3), Ok(()));
    assert!(h3.await.unwrap().is_ok());
}

Task 2.2: Integrate Waiters with Flush Thread (2 hours)

File: heliosdb-storage/src/wal/group_commit/flush_thread.rs

impl FlushThread {
    fn flush_batch(&self, batch: Vec<PendingEntry>) {
        let start = Instant::now();
        let mut file = self.file.lock().unwrap();
        let mut max_lsn = Lsn::new(0);
        let mut total_bytes = 0;
        // Write all entries
        for pending in &batch {
            match self.write_entry(&mut *file, pending.lsn, &pending.entry) {
                Ok(bytes) => {
                    total_bytes += bytes;
                    max_lsn = max_lsn.max(pending.lsn);
                }
                Err(e) => {
                    // TODO: an entry whose write failed must be notified with
                    // Err rather than swept into the batch-level Ok below.
                    eprintln!("Write error: {}", e);
                }
            }
        }
        // Single fsync
        let flush_result = file.flush()
            .and_then(|_| file.get_mut().sync_all());
        match flush_result {
            Ok(_) => {
                // Update last flushed LSN
                self.last_flushed_lsn.store(max_lsn.value(), Ordering::Release);
                // Notify all waiters
                for pending in batch {
                    *pending.result.write().unwrap() = Some(Ok(()));
                    pending.notify.notify_waiters();
                }
                // Update metrics
                self.metrics.total_flushes.fetch_add(1, Ordering::Relaxed);
                self.metrics.total_bytes_written.fetch_add(total_bytes as u64, Ordering::Relaxed);
            }
            Err(e) => {
                eprintln!("Fsync error: {}", e);
                // Notify all waiters of failure
                for pending in batch {
                    *pending.result.write().unwrap() = Some(Err(
                        io::Error::new(io::ErrorKind::Other, format!("fsync failed: {}", e))
                    ));
                    pending.notify.notify_waiters();
                }
            }
        }
        let latency = start.elapsed().as_micros() as u64;
        self.metrics.flush_latencies.write().unwrap().push(latency);
    }
}

Acceptance Criteria:

  • Successful flush notifies all waiters with Ok
  • Failed flush notifies all waiters with Err
  • Notification happens atomically after fsync

Day 2 Afternoon: Durability Modes & Testing (4 hours)

Task 2.3: Implement Durability Modes (2 hours)

File: heliosdb-storage/src/wal/group_commit/mod.rs

impl GroupCommitWal {
    pub fn append(&self, entry: WalEntry) -> io::Result<Lsn> {
        let lsn = self.lsn_counter.next();
        let notify = Arc::new(Notify::new());
        let result = Arc::new(RwLock::new(None));
        let pending = PendingEntry {
            lsn,
            entry,
            notify: Arc::clone(&notify),
            result: Arc::clone(&result),
        };
        self.pending_queue.push(pending);
        self.metrics.total_appends.fetch_add(1, Ordering::Relaxed);
        Ok(lsn)
    }

    pub async fn append_with_mode(
        &self,
        entry: WalEntry,
        mode: DurabilityMode,
    ) -> io::Result<Lsn> {
        let lsn = self.append(entry)?;
        match mode {
            DurabilityMode::Synchronous | DurabilityMode::GroupCommit => {
                self.wait_for_lsn(lsn).await?;
            }
            DurabilityMode::Async => {
                // Fire-and-forget
            }
        }
        Ok(lsn)
    }

    pub async fn wait_for_lsn(&self, lsn: Lsn) -> io::Result<()> {
        // Fast path: already flushed
        if self.last_flushed_lsn.load(Ordering::Acquire) >= lsn.value() {
            return Ok(());
        }
        // Slow path: wait for flush
        // Note: This is simplified - real implementation uses waiter registry
        let timeout = Duration::from_millis(self.config.max_flush_interval_ms * 2);
        tokio::time::timeout(timeout, async {
            loop {
                if self.last_flushed_lsn.load(Ordering::Acquire) >= lsn.value() {
                    return Ok(());
                }
                tokio::time::sleep(Duration::from_micros(100)).await;
            }
        })
        .await
        .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "Wait for LSN timed out"))?
    }
}
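
The registry-based slow path that the note above alludes to would look roughly like this (a sketch; the waiter_registry field is assumed to hold the WaiterRegistry from Task 2.1):

pub async fn wait_for_lsn(&self, lsn: Lsn) -> io::Result<()> {
    // Fast path: already durable.
    if self.last_flushed_lsn.load(Ordering::Acquire) >= lsn.value() {
        return Ok(());
    }
    // Register before re-checking, so a flush that completes in between
    // still finds and notifies this waiter.
    let waiter = self.waiter_registry.register(lsn); // assumed field
    if self.last_flushed_lsn.load(Ordering::Acquire) >= lsn.value() {
        return Ok(());
    }
    self.waiter_registry.wait(waiter).await
}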

Acceptance Criteria:

  • Synchronous mode waits for flush
  • GroupCommit mode allows explicit waiting
  • Async mode returns immediately
  • Correct mode behavior under load

Tests:

#[tokio::test]
async fn test_synchronous_mode() {
    let wal = GroupCommitWal::new("/tmp/test", Default::default()).unwrap();
    let start = Instant::now();
    let lsn = wal.append_with_mode(
        test_entry(),
        DurabilityMode::Synchronous
    ).await.unwrap();
    // Should wait for flush before returning
    assert!(start.elapsed() >= Duration::from_millis(5));
    // Entry should be durable immediately
    assert!(wal.last_flushed_lsn() >= lsn);
}

#[tokio::test]
async fn test_async_mode() {
    let wal = GroupCommitWal::new("/tmp/test", Default::default()).unwrap();
    let start = Instant::now();
    let lsn = wal.append_with_mode(
        test_entry(),
        DurabilityMode::Async
    ).await.unwrap();
    // Should return immediately
    assert!(start.elapsed() < Duration::from_millis(1));
    // Entry may not be durable yet
    // (this is expected for async mode)
}

Task 2.4: Unit Testing Suite (2 hours)

File: heliosdb-storage/src/wal/group_commit/tests.rs

Implement comprehensive unit tests (one example sketch follows this list):

  • LSN assignment monotonicity
  • Concurrent append correctness
  • Batch collection logic
  • Flush thread lifecycle
  • Durability mode behavior
  • Metrics accuracy
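
As one example, a metrics-accuracy test might look like this (test_entry and the metrics accessor are assumed helpers, consistent with their use elsewhere in this document):

#[test]
fn test_metrics_accuracy() {
    let wal = GroupCommitWal::new("/tmp/test_metrics", Default::default()).unwrap();
    for i in 0..50 {
        wal.append(test_entry(i)).unwrap();
    }
    // Every append should be counted exactly once.
    assert_eq!(wal.metrics().total_appends.load(Ordering::Relaxed), 50);
}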

Phase 2: Recovery Protocol (1 day)

Day 3 Morning: Recovery Implementation (4 hours)

Task 3.1: WAL File Format & Serialization (2 hours)

File: heliosdb-storage/src/wal/group_commit/format.rs

// WAL Entry Format:
// | LSN (8) | Type (1) | TxnID (8) | Timestamp (8) | DataLen (4) | Data (N) | CRC32 (4) |

pub struct WalEncoder;

impl WalEncoder {
    pub fn encode(lsn: Lsn, entry: &WalEntry) -> Vec<u8> {
        let mut buf = Vec::new();
        // LSN
        buf.extend_from_slice(&lsn.value().to_le_bytes());
        // Entry type
        buf.push(entry.entry_type as u8);
        // Transaction ID
        buf.extend_from_slice(&entry.txn_id.to_le_bytes());
        // Timestamp
        buf.extend_from_slice(&entry.timestamp.to_le_bytes());
        // Data length
        buf.extend_from_slice(&(entry.data.len() as u32).to_le_bytes());
        // Data
        buf.extend_from_slice(&entry.data);
        // Checksum (CRC32 of all preceding bytes)
        let checksum = crc32(&buf);
        buf.extend_from_slice(&checksum.to_le_bytes());
        buf
    }

    pub fn decode(reader: &mut impl Read) -> io::Result<(Lsn, WalEntry)> {
        // Read the fixed-size header: LSN(8) + Type(1) + TxnID(8) + Timestamp(8) + DataLen(4)
        let mut header = [0u8; 29];
        reader.read_exact(&mut header)?;
        let lsn = Lsn::new(u64::from_le_bytes(header[0..8].try_into().unwrap()));
        // Assumes WalEntryType implements TryFrom<u8>
        let entry_type = WalEntryType::try_from(header[8])
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "unknown entry type"))?;
        let txn_id = u64::from_le_bytes(header[9..17].try_into().unwrap());
        let timestamp = u64::from_le_bytes(header[17..25].try_into().unwrap());
        let data_len = u32::from_le_bytes(header[25..29].try_into().unwrap()) as usize;
        // Read variable-length data and trailing checksum
        let mut data = vec![0u8; data_len];
        reader.read_exact(&mut data)?;
        let mut crc_buf = [0u8; 4];
        reader.read_exact(&mut crc_buf)?;
        let stored_crc = u32::from_le_bytes(crc_buf);
        // Validate checksum over header + data
        let mut checked = Vec::with_capacity(29 + data_len);
        checked.extend_from_slice(&header);
        checked.extend_from_slice(&data);
        if crc32(&checked) != stored_crc {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "checksum mismatch"));
        }
        Ok((lsn, WalEntry { entry_type, txn_id, timestamp, data, checksum: stored_crc }))
    }
}

Acceptance Criteria:

  • Encode/decode round-trip works
  • Checksum validation detects corruption
  • Handles partial writes correctly

Task 3.2: Recovery Algorithm (2 hours)

File: heliosdb-storage/src/wal/group_commit/recovery.rs

pub struct RecoveryManager;

impl RecoveryManager {
    pub fn recover(path: &Path) -> io::Result<RecoveryResult> {
        let mut entries = Vec::new();
        let mut last_valid_offset = 0u64;
        let mut corrupted_at = None;
        let file = File::open(path)?;
        let mut reader = BufReader::new(file);
        loop {
            let offset = reader.stream_position()?;
            match WalEncoder::decode(&mut reader) {
                Ok((lsn, entry)) => {
                    entries.push((lsn, entry));
                    last_valid_offset = reader.stream_position()?;
                }
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
                    break; // Clean EOF, or a partially written tail entry
                }
                Err(_) => {
                    corrupted_at = Some(offset);
                    break; // Corruption detected
                }
            }
        }
        // Truncate to the last valid entry. This covers both detected
        // corruption and a partially written tail (which surfaces as
        // UnexpectedEof above); truncating to the current length is a no-op.
        let file = OpenOptions::new().write(true).open(path)?;
        file.set_len(last_valid_offset)?;
        Ok(RecoveryResult {
            entries,
            last_valid_offset,
            corrupted_at,
        })
    }
}

Acceptance Criteria:

  • Recovers all valid entries
  • Truncates corrupted tail
  • Handles empty WAL (example test below)
  • Handles completely corrupted WAL

Tests:

#[test]
fn test_recovery_normal() {
    // Write 100 entries
    let wal = create_test_wal();
    for i in 0..100 {
        wal.append(test_entry(i)).unwrap();
    }
    wal.shutdown().unwrap();
    // Recover
    let result = RecoveryManager::recover(&wal.path()).unwrap();
    assert_eq!(result.entries.len(), 100);
    assert!(result.corrupted_at.is_none());
}

#[test]
fn test_recovery_with_corruption() {
    // Write 100 entries
    let wal = create_test_wal();
    for i in 0..100 {
        wal.append(test_entry(i)).unwrap();
    }
    wal.shutdown().unwrap();
    // Corrupt the file
    let mut file = OpenOptions::new()
        .write(true)
        .open(wal.path())
        .unwrap();
    file.seek(SeekFrom::End(-100)).unwrap();
    file.write_all(&[0xFF; 100]).unwrap();
    // Recover
    let result = RecoveryManager::recover(&wal.path()).unwrap();
    // Should recover most entries
    assert!(result.entries.len() >= 90);
    assert!(result.corrupted_at.is_some());
}
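
The empty-WAL criterion is cheap to cover as well; a minimal test sketch:

#[test]
fn test_recovery_empty_wal() {
    let path = std::env::temp_dir().join("empty_wal");
    std::fs::File::create(&path).unwrap();
    let result = RecoveryManager::recover(&path).unwrap();
    // An empty file yields no entries and no corruption report.
    assert!(result.entries.is_empty());
    assert!(result.corrupted_at.is_none());
}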

Day 3 Afternoon: Integration Testing (4 hours)

Task 3.3: End-to-End Integration Tests (4 hours)

File: heliosdb-storage/tests/group_commit_integration_tests.rs

#[tokio::test]
async fn test_concurrent_commits_durability() {
    let wal = Arc::new(GroupCommitWal::new("/tmp/test", Default::default()).unwrap());
    let mut handles = vec![];
    // Spawn 100 concurrent writers
    for i in 0..100 {
        let wal = Arc::clone(&wal);
        handles.push(tokio::spawn(async move {
            for j in 0..100 {
                let entry = test_entry(i * 100 + j);
                let lsn = wal.append(entry).unwrap();
                wal.wait_for_lsn(lsn).await.unwrap();
            }
        }));
    }
    // Wait for all
    for h in handles {
        h.await.unwrap();
    }
    // Verify all written
    assert_eq!(wal.metrics().total_appends.load(Ordering::Relaxed), 10_000);
    // Shutdown and recover
    let path = wal.path().to_owned();
    drop(wal);
    let result = RecoveryManager::recover(&path).unwrap();
    assert_eq!(result.entries.len(), 10_000);
}

#[tokio::test]
async fn test_crash_recovery() {
    // Write some entries
    {
        let wal = GroupCommitWal::new("/tmp/test_crash", Default::default()).unwrap();
        for i in 0..1000 {
            let lsn = wal.append(test_entry(i)).unwrap();
            // Only wait for half (simulate crash)
            if i < 500 {
                wal.wait_for_lsn(lsn).await.unwrap();
            }
        }
        // Crash (don't call shutdown)
        std::mem::forget(wal);
    }
    // Recover
    let result = RecoveryManager::recover(Path::new("/tmp/test_crash")).unwrap();
    // Should recover at least the 500 we waited for
    assert!(result.entries.len() >= 500);
    assert!(result.entries.len() <= 1000);
}

Acceptance Criteria:

  • Concurrent writes work correctly
  • All flushed entries recovered
  • Crash recovery works correctly
  • No data corruption under load

Phase 3: Transaction Manager Integration (1 day)

Day 4 Morning: Transaction Manager Integration (4 hours)

Task 4.1: Update Transaction Commit Protocol (2 hours)

File: heliosdb-transaction/src/coordinator.rs

impl TransactionCoordinator {
    pub async fn commit(&self, txn_id: TxnId) -> Result<(), TransactionError> {
        let txn = self.get_transaction(txn_id)?;
        // 1. Prepare commit record
        let commit_record = WalEntry {
            txn_id,
            entry_type: WalEntryType::Commit,
            data: self.serialize_transaction(&txn)?,
            timestamp: current_timestamp(),
            checksum: 0, // Computed by WAL
        };
        // 2. Write to WAL (returns immediately)
        let lsn = self.wal.append(commit_record)?;
        // 3. Wait for durability
        self.wal.wait_for_lsn(lsn).await
            .map_err(|e| TransactionError::DurabilityFailure(e))?;
        // 4. Mark as committed in memory
        txn.set_state(TxnState::Committed);
        // 5. Release locks
        self.lock_manager.release_all(txn_id);
        Ok(())
    }
}

Acceptance Criteria:

  • Transactions commit correctly
  • Durability guaranteed after commit returns
  • Locks released after durability

Task 4.2: MVCC Integration (2 hours)

File: heliosdb-mvcc/src/version_store.rs

impl VersionStore {
    pub async fn create_version(
        &self,
        key: &[u8],
        value: &[u8],
        txn_id: TxnId,
    ) -> Result<Lsn> {
        // 1. Write to WAL first
        let entry = WalEntry {
            txn_id,
            entry_type: WalEntryType::Data,
            data: encode_kv(key, value),
            timestamp: current_timestamp(),
            checksum: 0,
        };
        let lsn = self.wal.append(entry)?;
        // 2. Create version in memory
        // (`versions` is assumed to be a concurrent map, e.g. DashMap, so it
        // can be mutated through &self.)
        self.versions.insert(key, Version {
            value: value.to_vec(),
            txn_id,
            lsn,
            visible: false, // Not visible until commit
        });
        // 3. Don't wait for flush here
        // Commit will wait, ensuring durability
        Ok(lsn)
    }

    pub async fn commit_version(&self, txn_id: TxnId, lsn: Lsn) -> Result<()> {
        // Wait for durability
        self.wal.wait_for_lsn(lsn).await?;
        // Mark versions as visible
        for version in self.versions.values_mut() {
            if version.txn_id == txn_id {
                version.visible = true;
            }
        }
        Ok(())
    }
}

Day 4 Afternoon: 2PC Integration (4 hours)

Task 4.3: Two-Phase Commit Integration (4 hours)

File: heliosdb-distributed/src/two_phase_commit.rs

use futures::stream::{self, StreamExt, TryStreamExt};

impl TwoPhaseCoordinator {
    pub async fn commit_distributed(&self, txn: &DistributedTransaction) -> Result<()> {
        // Phase 1: PREPARE
        let prepare_lsns: Vec<_> = stream::iter(&txn.participants)
            .then(|p| async {
                let entry = WalEntry::prepare(txn.id, p.id);
                self.wal.append(entry)
            })
            .try_collect()
            .await?;
        // Wait for all PREPARE records to be durable. LSNs are monotonic and
        // a flush covers everything up to last_flushed_lsn, so waiting on the
        // highest LSN suffices (and the records may be batched into a single
        // fsync for efficiency!).
        if let Some(max_lsn) = prepare_lsns.iter().max() {
            self.wal.wait_for_lsn(*max_lsn).await?;
        }
        // Phase 2: COMMIT
        let commit_lsns: Vec<_> = stream::iter(&txn.participants)
            .then(|p| async {
                let entry = WalEntry::commit(txn.id, p.id);
                self.wal.append(entry)
            })
            .try_collect()
            .await?;
        // Wait for all COMMIT records, again via the highest LSN
        if let Some(max_lsn) = commit_lsns.iter().max() {
            self.wal.wait_for_lsn(*max_lsn).await?;
        }
        Ok(())
    }
}

Acceptance Criteria:

  • 2PC protocol maintains correctness
  • PREPARE and COMMIT phases batched
  • Recovery handles partial 2PC correctly (see the sketch below)
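
A sketch of how recovery might classify in-doubt transactions from the recovered log. It assumes the RecoveryResult of Task 3.2 and Prepare/Commit/Abort variants on WalEntryType (Abort is an assumed variant, not confirmed elsewhere in this document):

use std::collections::HashSet;

// Transactions with a PREPARE record but no decision record are in doubt
// and must be resolved by the coordinator after restart.
fn find_in_doubt(entries: &[(Lsn, WalEntry)]) -> HashSet<u64> {
    let mut prepared = HashSet::new();
    for (_, entry) in entries {
        match entry.entry_type {
            WalEntryType::Prepare => { prepared.insert(entry.txn_id); }
            WalEntryType::Commit | WalEntryType::Abort => { prepared.remove(&entry.txn_id); }
            _ => {}
        }
    }
    prepared
}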

Phase 4: Performance Tuning (1 day)

Day 5 Morning: Benchmarking (4 hours)

Task 5.1: Create Benchmark Suite (2 hours)

File: benches/group_commit_benchmarks.rs

use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId};

fn benchmark_throughput(c: &mut Criterion) {
    let mut group = c.benchmark_group("throughput");
    for &batch_size in &[10, 50, 100, 500] {
        for &interval_ms in &[1, 5, 10, 20] {
            group.bench_with_input(
                BenchmarkId::from_parameter(format!("batch_{}_interval_{}", batch_size, interval_ms)),
                &(batch_size, interval_ms),
                |b, &(batch, interval)| {
                    let config = GroupCommitConfig {
                        max_batch_size: batch,
                        max_flush_interval_ms: interval,
                        ..Default::default()
                    };
                    let wal = GroupCommitWal::new("/tmp/bench", config).unwrap();
                    b.iter(|| {
                        let entry = create_test_entry();
                        black_box(wal.append(entry).unwrap());
                    });
                },
            );
        }
    }
    group.finish();
}

fn benchmark_latency(c: &mut Criterion) {
    let mut group = c.benchmark_group("latency");
    for &interval_ms in &[1, 5, 10, 20] {
        group.bench_with_input(
            BenchmarkId::from_parameter(interval_ms),
            &interval_ms,
            |b, &interval| {
                let config = GroupCommitConfig {
                    max_flush_interval_ms: interval,
                    ..Default::default()
                };
                let runtime = tokio::runtime::Runtime::new().unwrap();
                let wal = GroupCommitWal::new("/tmp/bench", config).unwrap();
                b.to_async(&runtime).iter(|| async {
                    let entry = create_test_entry();
                    let lsn = wal.append(entry).unwrap();
                    black_box(wal.wait_for_lsn(lsn).await.unwrap());
                });
            },
        );
    }
    group.finish();
}

criterion_group!(benches, benchmark_throughput, benchmark_latency);
criterion_main!(benches);

Task 5.2: Run Benchmarks & Analyze Results (2 hours)

Commands:

# Run benchmarks
cargo bench --bench group_commit_benchmarks
# Generate report
cargo bench --bench group_commit_benchmarks -- --save-baseline group_commit_v1
# Compare with baseline (no group commit)
cargo bench --bench wal_benchmarks -- --baseline no_group_commit

Expected Results:

| Metric | Baseline | Group Commit | Improvement |
|---|---|---|---|
| Throughput | 1,000/sec | 10,000/sec | 10x |
| Fsync calls | 1,000/sec | 100/sec | 10x reduction |
| P50 latency | 10ms | 15ms | +5ms |
| P99 latency | 15ms | 20ms | +5ms |

Day 5 Afternoon: Documentation & Handoff (4 hours)

Task 5.3: Create User Documentation (2 hours)

File: docs/user-guides/GROUP_COMMIT_WAL_GUIDE.md

Contents:

  • Configuration guide
  • Tuning recommendations
  • Monitoring guide
  • Troubleshooting guide

Task 5.4: Code Review & Handoff (2 hours)

  • Self-review all code
  • Run clippy and fix warnings
  • Run rustfmt
  • Update CHANGELOG.md
  • Create PR with detailed description
  • Schedule knowledge transfer session

Success Criteria

Functional Requirements

  • Group commit batching works correctly
  • Durability guarantees maintained
  • Recovery handles all failure modes
  • Integration with transaction manager complete

Performance Requirements

  • Throughput ≥ 10,000 commits/sec (HDD)
  • Fsync reduction ≥ 90%
  • P99 latency ≤ 20ms
  • No performance regression in read path

Quality Requirements

  • Unit test coverage ≥ 80%
  • Integration tests cover key scenarios
  • No clippy warnings
  • Documentation complete

Production Readiness

  • Metrics and monitoring in place
  • Error handling robust
  • Configuration flexible
  • Logging comprehensive

Risk Mitigation

| Risk | Mitigation |
|---|---|
| Implementation complexity | Follow incremental plan, test each phase |
| Performance regression | Benchmark continuously, compare with baseline |
| Data loss | Comprehensive recovery testing, checksums |
| Integration issues | Coordinate with transaction team early |
| Timeline slippage | Focus on MVP first, defer optimizations |

Next Steps

  1. Review this roadmap with team
  2. Allocate resources (1-2 engineers)
  3. Set up development environment
  4. Begin Day 1 implementation
  5. Daily standup to track progress

Status: Ready for Implementation
Owner: TBD
Timeline: 3-5 days
Priority: P0 - Critical Performance Optimization