Group Commit WAL Implementation Roadmap
Document Version: 1.0
Date: 2025-11-10
Status: Implementation Guide
Related: GROUP_COMMIT_WAL_ARCHITECTURE.md
Overview
This document provides a detailed implementation roadmap for the Group Commit WAL system, breaking the 3-5 day implementation into actionable tasks with clear deliverables and acceptance criteria.
Implementation Timeline
```
Day 1: Core Implementation
├── Morning (4h):   Data Structures & LSN Management
└── Afternoon (4h): Flush Thread & Batching

Day 2: Waiter Notification & Durability
├── Morning (4h):   Notification System
└── Afternoon (4h): Durability Modes & Testing

Day 3: Recovery & Integration
├── Morning (4h):   Recovery Protocol
└── Afternoon (4h): Integration Testing

Day 4: Advanced Integration
├── Morning (4h):   Transaction Manager Integration
└── Afternoon (4h): MVCC Integration & 2PC

Day 5: Tuning & Validation
├── Morning (4h):   Performance Benchmarking
└── Afternoon (4h): Documentation & Handoff
```

Total: 3-5 days (depending on integration scope)

Phase 1: Basic Group Commit (1.5 days)
Day 1 Morning: Core Data Structures (4 hours)
Task 1.1: Define Core Types (1 hour)
File: heliosdb-storage/src/wal/group_commit/types.rs
```rust
// Core types to implement:
// - Lsn
// - WalEntry
// - WalEntryType
// - DurabilityMode
// - GroupCommitConfig
```

Acceptance Criteria:
- All types compile without errors
- Serialization/deserialization tests pass
- Type conversions are safe and tested
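
A minimal sketch of what these types might look like, inferred from how they are used later in this roadmap. The field names `max_batch_size` and `max_flush_interval_ms` come from the batching tasks; the `Abort` variant and the default values are illustrative assumptions, not specified elsewhere in this document:

```rust
use std::time::Duration;

/// Log sequence number: a monotonically increasing identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Lsn(u64);

impl Lsn {
    pub fn new(v: u64) -> Self { Lsn(v) }
    pub fn value(&self) -> u64 { self.0 }
}

/// Entry kinds referenced by the transaction and 2PC tasks below.
/// (Abort is assumed; only Data, Commit, and Prepare appear explicitly.)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WalEntryType { Data = 0, Commit = 1, Abort = 2, Prepare = 3 }

/// Durability modes used by append_with_mode (Task 2.3).
#[derive(Debug, Clone, Copy)]
pub enum DurabilityMode { Synchronous, GroupCommit, Async }

#[derive(Debug, Clone)]
pub struct GroupCommitConfig {
    pub max_batch_size: usize,
    pub max_flush_interval_ms: u64,
}

impl GroupCommitConfig {
    pub fn flush_interval(&self) -> Duration {
        Duration::from_millis(self.max_flush_interval_ms)
    }
}

impl Default for GroupCommitConfig {
    fn default() -> Self {
        // Illustrative defaults, not tuned values
        Self { max_batch_size: 100, max_flush_interval_ms: 10 }
    }
}
```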
Tests:
```rust
#[test]
fn test_lsn_ordering() {
    let lsn1 = Lsn::new(1);
    let lsn2 = Lsn::new(2);
    assert!(lsn1 < lsn2);
}

#[test]
fn test_wal_entry_serialization() {
    let entry = WalEntry::new(...);
    let bytes = serialize(&entry);
    let deserialized = deserialize(&bytes);
    assert_eq!(entry, deserialized);
}
```

Task 1.2: Implement Atomic LSN Counter (1 hour)
File: heliosdb-storage/src/wal/group_commit/lsn_counter.rs
```rust
pub struct LsnCounter {
    counter: AtomicU64,
}

impl LsnCounter {
    pub fn new(initial: u64) -> Self;
    pub fn next(&self) -> Lsn;
    pub fn current(&self) -> Lsn;
}
```

Acceptance Criteria:
- Thread-safe LSN assignment
- Monotonicity guaranteed
- No LSN collisions under concurrent load
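
One straightforward way to satisfy all three criteria is a `fetch_add`-based counter; a sketch:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

impl LsnCounter {
    pub fn new(initial: u64) -> Self {
        Self { counter: AtomicU64::new(initial) }
    }

    /// fetch_add hands each caller a distinct previous value, so LSNs
    /// are unique and monotonic without any locking.
    pub fn next(&self) -> Lsn {
        Lsn::new(self.counter.fetch_add(1, Ordering::SeqCst) + 1)
    }

    /// The most recently assigned LSN (or the initial value if none).
    pub fn current(&self) -> Lsn {
        Lsn::new(self.counter.load(Ordering::SeqCst))
    }
}
```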
Tests:
```rust
#[test]
fn test_concurrent_lsn_assignment() {
    let counter = Arc::new(LsnCounter::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let c = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            (0..1000).map(|_| c.next()).collect::<Vec<_>>()
        }));
    }

    let all_lsns: Vec<Lsn> = handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();

    // All LSNs should be unique
    let unique: HashSet<_> = all_lsns.iter().collect();
    assert_eq!(unique.len(), 10_000);
}
```

Task 1.3: Implement Lock-Free Pending Queue (2 hours)
File: heliosdb-storage/src/wal/group_commit/pending_queue.rs
```rust
pub struct PendingEntry {
    pub lsn: Lsn,
    pub entry: WalEntry,
    pub notify: Arc<Notify>,
    pub result: Arc<RwLock<Option<io::Result<()>>>>,
}

pub struct PendingQueue {
    queue: SegQueue<PendingEntry>,
    metrics: Arc<QueueMetrics>,
}

impl PendingQueue {
    pub fn new() -> Self;
    pub fn push(&self, entry: PendingEntry);
    pub fn pop(&self) -> Option<PendingEntry>;
    pub fn len(&self) -> usize;
}
```

Acceptance Criteria:
- Lock-free push/pop operations
- No data races under concurrent access
- Queue metrics collected correctly
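
A possible implementation, assuming the crossbeam crate's `SegQueue` for the lock-free queue (consistent with the `SegQueue` field above) and a hypothetical `QueueMetrics` with simple atomic counters:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crossbeam::queue::SegQueue;

/// Hypothetical metrics type; the actual fields are up to the implementer.
#[derive(Default)]
pub struct QueueMetrics {
    pub pushed: AtomicU64,
    pub popped: AtomicU64,
}

impl PendingQueue {
    pub fn new() -> Self {
        Self {
            queue: SegQueue::new(),
            metrics: Arc::new(QueueMetrics::default()),
        }
    }

    pub fn push(&self, entry: PendingEntry) {
        self.queue.push(entry);
        self.metrics.pushed.fetch_add(1, Ordering::Relaxed);
    }

    pub fn pop(&self) -> Option<PendingEntry> {
        let entry = self.queue.pop();
        if entry.is_some() {
            self.metrics.popped.fetch_add(1, Ordering::Relaxed);
        }
        entry
    }

    pub fn len(&self) -> usize {
        self.queue.len()
    }
}
```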
Tests:
```rust
#[test]
fn test_queue_concurrent_push_pop() {
    let queue = Arc::new(PendingQueue::new());

    // Spawn producers
    let producers: Vec<_> = (0..10)
        .map(|_| {
            let q = Arc::clone(&queue);
            thread::spawn(move || {
                for i in 0..100 {
                    q.push(create_test_entry(i));
                }
            })
        })
        .collect();

    // Spawn consumers
    let consumers: Vec<_> = (0..5)
        .map(|_| {
            let q = Arc::clone(&queue);
            thread::spawn(move || {
                let mut count = 0;
                while count < 200 {
                    if q.pop().is_some() {
                        count += 1;
                    }
                }
                count
            })
        })
        .collect();

    // Wait for completion
    for p in producers {
        p.join().unwrap();
    }

    let total: usize = consumers
        .into_iter()
        .map(|c| c.join().unwrap())
        .sum();

    assert_eq!(total, 1000);
}
```

Day 1 Afternoon: Flush Thread & Batching (4 hours)
Task 1.4: Implement Flush Thread (2 hours)
File: heliosdb-storage/src/wal/group_commit/flush_thread.rs
```rust
pub struct FlushThread {
    config: GroupCommitConfig,
    file: Arc<Mutex<BufWriter<File>>>,
    pending_queue: Arc<PendingQueue>,
    last_flushed_lsn: Arc<AtomicU64>,
    shutdown: Arc<AtomicBool>,
    metrics: Arc<WalMetrics>,
}

impl FlushThread {
    pub fn spawn(config: GroupCommitConfig, ...) -> JoinHandle<()>;
    fn run(&self);
    fn collect_batch(&self, max_size: usize, timeout: Duration) -> Vec<PendingEntry>;
    fn flush_batch(&self, batch: Vec<PendingEntry>);
}
```

Acceptance Criteria:
- Thread spawns and runs correctly
- Graceful shutdown on signal
- Metrics collected accurately
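
A sketch of the `run` loop, assuming `collect_batch` returns (possibly empty) at each flush interval so the shutdown flag can be observed:

```rust
impl FlushThread {
    fn run(&self) {
        loop {
            let batch = self.collect_batch(
                self.config.max_batch_size,
                self.config.flush_interval(),
            );

            if !batch.is_empty() {
                self.flush_batch(batch);
            }

            // Exit only once shutdown is signalled and the queue is
            // drained, so no pending waiter is abandoned.
            if self.shutdown.load(Ordering::Acquire) && self.pending_queue.len() == 0 {
                break;
            }
        }
    }
}
```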
Tests:
```rust
#[test]
fn test_flush_thread_lifecycle() {
    let (tx, rx) = channel();
    let shutdown = Arc::new(AtomicBool::new(false));

    let thread = FlushThread::spawn(...);

    // Verify thread is running
    thread::sleep(Duration::from_millis(100));
    assert!(thread.is_running());

    // Signal shutdown
    shutdown.store(true, Ordering::Release);

    // Thread should exit within 1 second
    let result = thread.join_timeout(Duration::from_secs(1));
    assert!(result.is_ok());
}
```

Task 1.5: Implement Batching Logic (2 hours)
File: heliosdb-storage/src/wal/group_commit/batching.rs
```rust
pub struct BatchCollector {
    config: GroupCommitConfig,
    pending_queue: Arc<PendingQueue>,
}

impl BatchCollector {
    pub fn collect_batch(&self) -> Vec<PendingEntry> {
        let mut batch = Vec::new();
        let deadline = Instant::now() + self.config.flush_interval();

        loop {
            // Time-based: stop collecting once the deadline is reached.
            // Returning even when the batch is empty lets the caller
            // periodically check for shutdown instead of spinning forever.
            if Instant::now() >= deadline {
                break;
            }

            // Size-based: flush if batch full
            if batch.len() >= self.config.max_batch_size {
                break;
            }

            // Try to collect more entries; back off briefly if the queue is empty
            match self.pending_queue.pop() {
                Some(entry) => batch.push(entry),
                None => thread::sleep(Duration::from_micros(100)),
            }
        }

        batch
    }
}
```

Acceptance Criteria:
- Time-based flushing works correctly
- Size-based flushing works correctly
- Hybrid strategy respects both limits
Tests:
```rust
#[test]
fn test_time_based_flush() {
    let config = GroupCommitConfig {
        max_flush_interval_ms: 10,
        max_batch_size: 1000,
        ..Default::default()
    };

    let collector = BatchCollector::new(config, queue);

    // Add 10 entries
    for i in 0..10 {
        queue.push(create_test_entry(i));
    }

    // Should flush after 10ms even though batch not full
    let start = Instant::now();
    let batch = collector.collect_batch();
    let elapsed = start.elapsed();

    assert_eq!(batch.len(), 10);
    assert!(elapsed >= Duration::from_millis(10));
    assert!(elapsed < Duration::from_millis(20));
}

#[test]
fn test_size_based_flush() {
    let config = GroupCommitConfig {
        max_flush_interval_ms: 1000,
        max_batch_size: 100,
        ..Default::default()
    };

    let collector = BatchCollector::new(config, queue);

    // Add 100 entries rapidly
    for i in 0..100 {
        queue.push(create_test_entry(i));
    }

    // Should flush immediately when batch reaches 100
    let start = Instant::now();
    let batch = collector.collect_batch();
    let elapsed = start.elapsed();

    assert_eq!(batch.len(), 100);
    assert!(elapsed < Duration::from_millis(100)); // Much less than timeout
}
```

Day 2 Morning: Waiter Notification System (4 hours)
Task 2.1: Implement Waiter Registry (2 hours)
File: heliosdb-storage/src/wal/group_commit/waiters.rs
```rust
pub struct WaiterRegistry {
    waiters: RwLock<HashMap<Lsn, Vec<Waiter>>>,
}

#[derive(Clone)]
pub struct Waiter {
    lsn: Lsn,
    notify: Arc<Notify>,
    result: Arc<RwLock<Option<io::Result<()>>>>,
}

impl WaiterRegistry {
    pub fn new() -> Self {
        Self { waiters: RwLock::new(HashMap::new()) }
    }

    pub fn register(&self, lsn: Lsn) -> Waiter {
        let waiter = Waiter {
            lsn,
            notify: Arc::new(Notify::new()),
            result: Arc::new(RwLock::new(None)),
        };
        self.waiters.write().unwrap()
            .entry(lsn)
            .or_default()
            .push(waiter.clone());
        waiter
    }

    pub fn notify(&self, lsn: Lsn, result: io::Result<()>) {
        let mut waiters = self.waiters.write().unwrap();

        // Notify all waiters for LSNs <= lsn
        let to_notify: Vec<_> = waiters
            .iter()
            .filter(|(k, _)| **k <= lsn)
            .flat_map(|(_, v)| v.clone())
            .collect();

        for waiter in to_notify {
            // io::Error is not Clone, so rebuild the result per waiter
            let r = result.as_ref()
                .map(|_| ())
                .map_err(|e| io::Error::new(e.kind(), e.to_string()));
            *waiter.result.write().unwrap() = Some(r);
            waiter.notify.notify_waiters();
        }

        // Remove notified waiters
        waiters.retain(|k, _| *k > lsn);
    }

    pub async fn wait(&self, waiter: Waiter) -> io::Result<()> {
        // Fast path: the flush may already have completed.
        // Notify::notify_waiters does not store a permit, so a wakeup sent
        // before we start waiting would otherwise be lost.
        if let Some(result) = waiter.result.write().unwrap().take() {
            return result;
        }
        waiter.notify.notified().await;
        waiter.result.write().unwrap()
            .take()
            .expect("Result should be set after notification")
    }
}
```

Acceptance Criteria:
- Waiters registered correctly
- Notifications sent to correct waiters
- Waiters removed after notification
- No memory leaks
Tests:
```rust
#[tokio::test]
async fn test_waiter_notification() {
    let registry = Arc::new(WaiterRegistry::new());

    let waiter1 = registry.register(Lsn::new(1));
    let waiter2 = registry.register(Lsn::new(2));
    let waiter3 = registry.register(Lsn::new(3));

    // Spawn waiters
    let r = Arc::clone(&registry);
    let h1 = tokio::spawn(async move { r.wait(waiter1).await });

    let r = Arc::clone(&registry);
    let h2 = tokio::spawn(async move { r.wait(waiter2).await });

    let r = Arc::clone(&registry);
    let h3 = tokio::spawn(async move { r.wait(waiter3).await });

    // Notify up to LSN 2
    registry.notify(Lsn::new(2), Ok(()));

    // Wait for results
    assert!(h1.await.unwrap().is_ok());
    assert!(h2.await.unwrap().is_ok());

    // h3 should still be waiting
    tokio::time::sleep(Duration::from_millis(100)).await;
    assert!(!h3.is_finished());

    // Notify LSN 3
    registry.notify(Lsn::new(3), Ok(()));
    assert!(h3.await.unwrap().is_ok());
}
```

Task 2.2: Integrate Waiters with Flush Thread (2 hours)
File: heliosdb-storage/src/wal/group_commit/flush_thread.rs
```rust
impl FlushThread {
    fn flush_batch(&self, batch: Vec<PendingEntry>) {
        let start = Instant::now();

        let mut file = self.file.lock().unwrap();
        let mut max_lsn = Lsn::new(0);
        let mut total_bytes = 0;

        // Write all entries
        for pending in &batch {
            match self.write_entry(&mut *file, pending.lsn, &pending.entry) {
                Ok(bytes) => {
                    total_bytes += bytes;
                    max_lsn = max_lsn.max(pending.lsn);
                }
                Err(e) => {
                    // Simplification: a production implementation should
                    // record per-entry write failures and notify those
                    // waiters with Err instead of the batch-wide Ok below
                    eprintln!("Write error: {}", e);
                }
            }
        }

        // Single fsync for the whole batch
        let flush_result = file.flush()
            .and_then(|_| file.get_mut().sync_all());

        match flush_result {
            Ok(_) => {
                // Update last flushed LSN
                self.last_flushed_lsn.store(max_lsn.value(), Ordering::Release);

                // Notify all waiters
                for pending in batch {
                    *pending.result.write().unwrap() = Some(Ok(()));
                    pending.notify.notify_waiters();
                }

                // Update metrics
                self.metrics.total_flushes.fetch_add(1, Ordering::Relaxed);
                self.metrics.total_bytes_written.fetch_add(total_bytes as u64, Ordering::Relaxed);
            }
            Err(e) => {
                eprintln!("Fsync error: {}", e);

                // Notify all waiters of failure
                for pending in batch {
                    *pending.result.write().unwrap() = Some(Err(
                        io::Error::new(io::ErrorKind::Other, "Fsync failed")
                    ));
                    pending.notify.notify_waiters();
                }
            }
        }

        let latency = start.elapsed().as_micros() as u64;
        self.metrics.flush_latencies.write().unwrap().push(latency);
    }
}
```

Acceptance Criteria:
- Successful flush notifies all waiters with Ok
- Failed flush notifies all waiters with Err
- Notification happens atomically after fsync
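
The `WalMetrics` struct is not defined elsewhere in this document; a sketch inferred from the fields that `flush_batch` and `append` touch:

```rust
use std::sync::RwLock;
use std::sync::atomic::AtomicU64;

#[derive(Default)]
pub struct WalMetrics {
    pub total_appends: AtomicU64,
    pub total_flushes: AtomicU64,
    pub total_bytes_written: AtomicU64,
    /// Per-flush latencies in microseconds, kept for percentile reporting.
    pub flush_latencies: RwLock<Vec<u64>>,
}
```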
Day 2 Afternoon: Durability Modes & Testing (4 hours)
Task 2.3: Implement Durability Modes (2 hours)
File: heliosdb-storage/src/wal/group_commit/mod.rs
```rust
impl GroupCommitWal {
    pub fn append(&self, entry: WalEntry) -> io::Result<Lsn> {
        let lsn = self.lsn_counter.next();

        let notify = Arc::new(Notify::new());
        let result = Arc::new(RwLock::new(None));

        let pending = PendingEntry {
            lsn,
            entry,
            notify: Arc::clone(&notify),
            result: Arc::clone(&result),
        };

        self.pending_queue.push(pending);
        self.metrics.total_appends.fetch_add(1, Ordering::Relaxed);

        Ok(lsn)
    }

    pub async fn append_with_mode(
        &self,
        entry: WalEntry,
        mode: DurabilityMode,
    ) -> io::Result<Lsn> {
        let lsn = self.append(entry)?;

        match mode {
            DurabilityMode::Synchronous | DurabilityMode::GroupCommit => {
                self.wait_for_lsn(lsn).await?;
            }
            DurabilityMode::Async => {
                // Fire-and-forget
            }
        }

        Ok(lsn)
    }

    pub async fn wait_for_lsn(&self, lsn: Lsn) -> io::Result<()> {
        // Fast path: already flushed
        if self.last_flushed_lsn.load(Ordering::Acquire) >= lsn.value() {
            return Ok(());
        }

        // Slow path: wait for flush
        // Note: This is simplified - the real implementation uses the waiter registry
        let timeout = Duration::from_millis(self.config.max_flush_interval_ms * 2);

        tokio::time::timeout(timeout, async {
            loop {
                if self.last_flushed_lsn.load(Ordering::Acquire) >= lsn.value() {
                    return Ok(());
                }
                tokio::time::sleep(Duration::from_micros(100)).await;
            }
        })
        .await
        .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "Wait for LSN timed out"))?
    }
}
```

Acceptance Criteria:
- Synchronous mode waits for flush
- GroupCommit mode allows explicit waiting
- Async mode returns immediately
- Correct mode behavior under load
Tests:
```rust
#[tokio::test]
async fn test_synchronous_mode() {
    let wal = GroupCommitWal::new("/tmp/test", Default::default()).unwrap();

    let start = Instant::now();
    let lsn = wal.append_with_mode(
        test_entry(),
        DurabilityMode::Synchronous
    ).await.unwrap();

    // Should wait for flush before returning
    assert!(start.elapsed() >= Duration::from_millis(5));

    // Entry should be durable immediately
    assert!(wal.last_flushed_lsn() >= lsn);
}

#[tokio::test]
async fn test_async_mode() {
    let wal = GroupCommitWal::new("/tmp/test", Default::default()).unwrap();

    let start = Instant::now();
    let lsn = wal.append_with_mode(
        test_entry(),
        DurabilityMode::Async
    ).await.unwrap();

    // Should return immediately
    assert!(start.elapsed() < Duration::from_millis(1));

    // Entry may not be durable yet
    // (this is expected for async mode)
}
```

Task 2.4: Unit Testing Suite (2 hours)
File: heliosdb-storage/src/wal/group_commit/tests.rs
Implement comprehensive unit tests:
- LSN assignment monotonicity
- Concurrent append correctness
- Batch collection logic
- Flush thread lifecycle
- Durability mode behavior
- Metrics accuracy (a sample sketch follows this list)
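
As one example of the metrics-accuracy case, a sketch (the `test_entry` helper and `metrics()` accessor are assumed from earlier tasks):

```rust
#[test]
fn test_metrics_accuracy() {
    let wal = GroupCommitWal::new("/tmp/test_metrics", Default::default()).unwrap();

    for i in 0..50 {
        wal.append(test_entry(i)).unwrap();
    }

    assert_eq!(wal.metrics().total_appends, 50);
    // Batching means flushes should be far fewer than appends
    assert!(wal.metrics().total_flushes <= 50);
}
```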
Phase 2: Recovery Protocol (0.5 days)
Day 3 Morning: Recovery Implementation (4 hours)
Task 3.1: WAL File Format & Serialization (2 hours)
File: heliosdb-storage/src/wal/group_commit/format.rs
```rust
// WAL Entry Format:
// | LSN (8) | Type (1) | TxnID (8) | Timestamp (8) | DataLen (4) | Data (N) | CRC32 (4) |

pub struct WalEncoder;

impl WalEncoder {
    pub fn encode(lsn: Lsn, entry: &WalEntry) -> Vec<u8> {
        let mut buf = Vec::new();

        // LSN
        buf.extend_from_slice(&lsn.value().to_le_bytes());

        // Entry type
        buf.push(entry.entry_type as u8);

        // Transaction ID
        buf.extend_from_slice(&entry.txn_id.to_le_bytes());

        // Timestamp
        buf.extend_from_slice(&entry.timestamp.to_le_bytes());

        // Data length
        buf.extend_from_slice(&(entry.data.len() as u32).to_le_bytes());

        // Data
        buf.extend_from_slice(&entry.data);

        // Checksum (CRC32 of all preceding bytes)
        let checksum = crc32(&buf);
        buf.extend_from_slice(&checksum.to_le_bytes());

        buf
    }

    pub fn decode(reader: &mut impl Read) -> io::Result<(Lsn, WalEntry)> {
        // Read and parse entry
        // Validate checksum
        // Return entry or error
    }
}
```

Acceptance Criteria:
- Encode/decode round-trip works
- Checksum validation detects corruption
- Handles partial writes correctly
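
To make the decode stub concrete, here is a sketch that mirrors the encoder. It assumes the crc32fast crate for the checksum and a `TryFrom<u8>` impl on `WalEntryType`, neither of which is specified above:

```rust
impl WalEncoder {
    pub fn decode(reader: &mut impl Read) -> io::Result<(Lsn, WalEntry)> {
        // Fixed-size prefix: LSN (8) + Type (1) + TxnID (8) + Timestamp (8) + DataLen (4)
        let mut header = [0u8; 29];
        reader.read_exact(&mut header)?;

        let lsn = u64::from_le_bytes(header[0..8].try_into().unwrap());
        let entry_type = WalEntryType::try_from(header[8])
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "unknown entry type"))?;
        let txn_id = u64::from_le_bytes(header[9..17].try_into().unwrap());
        let timestamp = u64::from_le_bytes(header[17..25].try_into().unwrap());
        let data_len = u32::from_le_bytes(header[25..29].try_into().unwrap()) as usize;

        let mut data = vec![0u8; data_len];
        reader.read_exact(&mut data)?;

        let mut crc_bytes = [0u8; 4];
        reader.read_exact(&mut crc_bytes)?;
        let stored_crc = u32::from_le_bytes(crc_bytes);

        // Recompute the checksum over everything that preceded it
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(&header);
        hasher.update(&data);
        if hasher.finalize() != stored_crc {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "checksum mismatch"));
        }

        Ok((
            Lsn::new(lsn),
            WalEntry { txn_id, entry_type, timestamp, data, checksum: stored_crc },
        ))
    }
}
```

Note that a truncated read surfaces as `UnexpectedEof`, which the recovery loop below treats as a clean end of log.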
Task 3.2: Recovery Algorithm (2 hours)
File: heliosdb-storage/src/wal/group_commit/recovery.rs
```rust
pub struct RecoveryManager;

impl RecoveryManager {
    pub fn recover(path: &Path) -> io::Result<RecoveryResult> {
        let mut entries = Vec::new();
        let mut last_valid_offset = 0u64;
        let mut corrupted_at = None;

        let file = File::open(path)?;
        let mut reader = BufReader::new(file);

        loop {
            let offset = reader.stream_position()?;

            match WalEncoder::decode(&mut reader) {
                Ok((lsn, entry)) => {
                    entries.push((lsn, entry));
                    last_valid_offset = reader.stream_position()?;
                }
                Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
                    break; // Normal EOF
                }
                Err(_) => {
                    corrupted_at = Some(offset);
                    break; // Corruption detected
                }
            }
        }

        // Truncate if corruption detected
        if corrupted_at.is_some() {
            let file = OpenOptions::new().write(true).open(path)?;
            file.set_len(last_valid_offset)?;
        }

        Ok(RecoveryResult {
            entries,
            last_valid_offset,
            corrupted_at,
        })
    }
}
```

Acceptance Criteria:
- Recovers all valid entries
- Truncates corrupted tail
- Handles empty WAL
- Handles completely corrupted WAL
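
The `RecoveryResult` type is used above but never defined; a sketch consistent with the recovery code and the tests below:

```rust
pub struct RecoveryResult {
    /// Entries that decoded and checksummed cleanly, in log order.
    pub entries: Vec<(Lsn, WalEntry)>,
    /// Byte offset just past the last valid entry; the file is truncated here.
    pub last_valid_offset: u64,
    /// Offset of the first corrupted record, if corruption was found.
    pub corrupted_at: Option<u64>,
}
```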
Tests:
```rust
#[test]
fn test_recovery_normal() {
    // Write 100 entries
    let wal = create_test_wal();
    for i in 0..100 {
        wal.append(test_entry(i)).unwrap();
    }
    wal.shutdown().unwrap();

    // Recover
    let result = RecoveryManager::recover(&wal.path()).unwrap();

    assert_eq!(result.entries.len(), 100);
    assert!(result.corrupted_at.is_none());
}

#[test]
fn test_recovery_with_corruption() {
    // Write 100 entries
    let wal = create_test_wal();
    for i in 0..100 {
        wal.append(test_entry(i)).unwrap();
    }
    wal.shutdown().unwrap();

    // Corrupt the file
    let mut file = OpenOptions::new()
        .write(true)
        .open(wal.path())
        .unwrap();
    file.seek(SeekFrom::End(-100)).unwrap();
    file.write_all(&[0xFF; 100]).unwrap();

    // Recover
    let result = RecoveryManager::recover(&wal.path()).unwrap();

    // Should recover most entries
    assert!(result.entries.len() >= 90);
    assert!(result.corrupted_at.is_some());
}
```

Day 3 Afternoon: Integration Testing (4 hours)
Task 3.3: End-to-End Integration Tests (4 hours)
File: heliosdb-storage/tests/group_commit_integration_tests.rs
```rust
#[tokio::test]
async fn test_concurrent_commits_durability() {
    let wal = Arc::new(GroupCommitWal::new("/tmp/test", Default::default()).unwrap());
    let mut handles = vec![];

    // Spawn 100 concurrent writers
    for i in 0..100 {
        let wal = Arc::clone(&wal);
        handles.push(tokio::spawn(async move {
            for j in 0..100 {
                let entry = test_entry(i * 100 + j);
                let lsn = wal.append(entry).unwrap();
                wal.wait_for_lsn(lsn).await.unwrap();
            }
        }));
    }

    // Wait for all
    for h in handles {
        h.await.unwrap();
    }

    // Verify all written
    assert_eq!(wal.metrics().total_appends, 10_000);

    // Shutdown and recover
    let path = wal.path().to_owned();
    drop(wal);

    let result = RecoveryManager::recover(&path).unwrap();
    assert_eq!(result.entries.len(), 10_000);
}

#[tokio::test]
async fn test_crash_recovery() {
    // Write some entries
    {
        let wal = GroupCommitWal::new("/tmp/test_crash", Default::default()).unwrap();
        for i in 0..1000 {
            let lsn = wal.append(test_entry(i)).unwrap();

            // Only wait for half (simulate crash)
            if i < 500 {
                wal.wait_for_lsn(lsn).await.unwrap();
            }
        }

        // Crash (don't call shutdown)
        std::mem::forget(wal);
    }

    // Recover
    let result = RecoveryManager::recover(Path::new("/tmp/test_crash")).unwrap();

    // Should recover at least the 500 we waited for
    assert!(result.entries.len() >= 500);
    assert!(result.entries.len() <= 1000);
}
```

Acceptance Criteria:
- Concurrent writes work correctly
- All flushed entries recovered
- Crash recovery works correctly
- No data corruption under load
Phase 3: Transaction Manager Integration (1 day)
Day 4 Morning: Transaction Manager Integration (4 hours)
Task 4.1: Update Transaction Commit Protocol (2 hours)
File: heliosdb-transaction/src/coordinator.rs
```rust
impl TransactionCoordinator {
    pub async fn commit(&self, txn_id: TxnId) -> Result<(), TransactionError> {
        let txn = self.get_transaction(txn_id)?;

        // 1. Prepare commit record
        let commit_record = WalEntry {
            txn_id,
            entry_type: WalEntryType::Commit,
            data: self.serialize_transaction(&txn)?,
            timestamp: current_timestamp(),
            checksum: 0, // Computed by WAL
        };

        // 2. Write to WAL (returns immediately)
        let lsn = self.wal.append(commit_record)?;

        // 3. Wait for durability
        self.wal.wait_for_lsn(lsn).await
            .map_err(|e| TransactionError::DurabilityFailure(e))?;

        // 4. Mark as committed in memory
        txn.set_state(TxnState::Committed);

        // 5. Release locks
        self.lock_manager.release_all(txn_id);

        Ok(())
    }
}
```

Acceptance Criteria:
- Transactions commit correctly
- Durability guaranteed after commit returns
- Locks released after durability
Task 4.2: MVCC Integration (2 hours)
File: heliosdb-mvcc/src/version_store.rs
```rust
impl VersionStore {
    pub async fn create_version(
        &self,
        key: &[u8],
        value: &[u8],
        txn_id: TxnId,
    ) -> Result<Lsn> {
        // 1. Write to WAL first
        let entry = WalEntry {
            txn_id,
            entry_type: WalEntryType::Data,
            data: encode_kv(key, value),
            timestamp: current_timestamp(),
            checksum: 0,
        };

        let lsn = self.wal.append(entry)?;

        // 2. Create version in memory
        self.versions.insert(key, Version {
            value: value.to_vec(),
            txn_id,
            lsn,
            visible: false, // Not visible until commit
        });

        // 3. Don't wait for flush here
        // Commit will wait, ensuring durability

        Ok(lsn)
    }

    pub async fn commit_version(&self, txn_id: TxnId, lsn: Lsn) -> Result<()> {
        // Wait for durability
        self.wal.wait_for_lsn(lsn).await?;

        // Mark versions as visible
        for version in self.versions.values_mut() {
            if version.txn_id == txn_id {
                version.visible = true;
            }
        }

        Ok(())
    }
}
```

Day 4 Afternoon: 2PC Integration (4 hours)
Task 4.3: Two-Phase Commit Integration (4 hours)
File: heliosdb-distributed/src/two_phase_commit.rs
```rust
impl TwoPhaseCoordinator {
    pub async fn commit_distributed(&self, txn: &DistributedTransaction) -> Result<()> {
        // Phase 1: PREPARE
        let prepare_lsns: Vec<_> = stream::iter(&txn.participants)
            .then(|p| async {
                let entry = WalEntry::prepare(txn.id, p.id);
                self.wal.append(entry)
            })
            .try_collect()
            .await?;

        // Wait for all PREPARE records to be durable
        // (They may be batched together for efficiency!)
        for lsn in prepare_lsns {
            self.wal.wait_for_lsn(lsn).await?;
        }

        // Phase 2: COMMIT
        let commit_lsns: Vec<_> = stream::iter(&txn.participants)
            .then(|p| async {
                let entry = WalEntry::commit(txn.id, p.id);
                self.wal.append(entry)
            })
            .try_collect()
            .await?;

        // Wait for all COMMIT records
        for lsn in commit_lsns {
            self.wal.wait_for_lsn(lsn).await?;
        }

        Ok(())
    }
}
```

Acceptance Criteria:
- 2PC protocol maintains correctness
- PREPARE and COMMIT phases batched
- Recovery handles partial 2PC correctly
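
For the last criterion, recovery has to classify each transaction by the latest 2PC record it finds: a PREPARE with no later COMMIT/ABORT leaves the transaction in doubt. A sketch of that pass (the `Abort` entry type and plain `u64` transaction IDs are assumptions here):

```rust
use std::collections::HashMap;

enum TxnOutcome {
    Committed,
    Aborted,
    /// PREPARE seen with no later COMMIT/ABORT: must be resolved with the
    /// coordinator (or aborted by policy) before serving traffic.
    InDoubt,
}

fn classify_2pc(entries: &[(Lsn, WalEntry)]) -> HashMap<u64, TxnOutcome> {
    let mut outcomes = HashMap::new();
    for (_, entry) in entries {
        let outcome = match entry.entry_type {
            WalEntryType::Prepare => TxnOutcome::InDoubt,
            WalEntryType::Commit => TxnOutcome::Committed,
            WalEntryType::Abort => TxnOutcome::Aborted,
            _ => continue,
        };
        // Entries arrive in LSN order, so later records override earlier ones
        outcomes.insert(entry.txn_id, outcome);
    }
    outcomes
}
```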
Phase 4: Performance Tuning (1 day)
Day 5 Morning: Benchmarking (4 hours)
Task 5.1: Create Benchmark Suite (2 hours)
File: benches/group_commit_benchmarks.rs
```rust
use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId};

fn benchmark_throughput(c: &mut Criterion) {
    let mut group = c.benchmark_group("throughput");

    for &batch_size in &[10, 50, 100, 500] {
        for &interval_ms in &[1, 5, 10, 20] {
            group.bench_with_input(
                BenchmarkId::from_parameter(format!("batch_{}_interval_{}", batch_size, interval_ms)),
                &(batch_size, interval_ms),
                |b, &(batch, interval)| {
                    let config = GroupCommitConfig {
                        max_batch_size: batch,
                        max_flush_interval_ms: interval,
                        ..Default::default()
                    };

                    let wal = GroupCommitWal::new("/tmp/bench", config).unwrap();

                    b.iter(|| {
                        let entry = create_test_entry();
                        black_box(wal.append(entry).unwrap());
                    });
                },
            );
        }
    }

    group.finish();
}

fn benchmark_latency(c: &mut Criterion) {
    let mut group = c.benchmark_group("latency");

    for &interval_ms in &[1, 5, 10, 20] {
        group.bench_with_input(
            BenchmarkId::from_parameter(interval_ms),
            &interval_ms,
            |b, &interval| {
                let config = GroupCommitConfig {
                    max_flush_interval_ms: interval,
                    ..Default::default()
                };

                let runtime = tokio::runtime::Runtime::new().unwrap();
                let wal = GroupCommitWal::new("/tmp/bench", config).unwrap();

                b.to_async(&runtime).iter(|| async {
                    let entry = create_test_entry();
                    let lsn = wal.append(entry).unwrap();
                    black_box(wal.wait_for_lsn(lsn).await.unwrap());
                });
            },
        );
    }

    group.finish();
}

criterion_group!(benches, benchmark_throughput, benchmark_latency);
criterion_main!(benches);
```

Task 5.2: Run Benchmarks & Analyze Results (2 hours)
Commands:
```bash
# Run benchmarks
cargo bench --bench group_commit_benchmarks

# Generate report (save baseline)
cargo bench --bench group_commit_benchmarks -- --save-baseline group_commit_v1

# Compare with baseline (no group commit)
cargo bench --bench wal_benchmarks -- --baseline no_group_commit
```

Expected Results:
| Metric | Baseline | Group Commit | Improvement |
|---|---|---|---|
| Throughput | 1,000/sec | 10,000/sec | 10x |
| Fsync calls | 1,000/sec | 100/sec | 10x reduction |
| P50 latency | 10ms | 15ms | +5ms |
| P99 latency | 15ms | 20ms | +5ms |
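
These targets follow from simple batching arithmetic: sustained throughput ≈ entries per batch × fsyncs per second. Cutting fsyncs from 1,000/sec to 100/sec while averaging ~100 entries per flush gives 100 × 100 = 10,000 commits/sec. The added latency is bounded by the flush interval: a commit waits on average about half an interval, so the +5 ms on P50/P99 is consistent with an assumed ~10 ms flush interval.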
Day 5 Afternoon: Documentation & Handoff (4 hours)
Task 5.3: Create User Documentation (2 hours)
File: docs/user-guides/GROUP_COMMIT_WAL_GUIDE.md
Contents:
- Configuration guide
- Tuning recommendations
- Monitoring guide
- Troubleshooting guide
Task 5.4: Code Review & Handoff (2 hours)
- Self-review all code
- Run clippy and fix warnings
- Run rustfmt
- Update CHANGELOG.md
- Create PR with detailed description
- Schedule knowledge transfer session
Success Criteria
Functional Requirements
- Group commit batching works correctly
- Durability guarantees maintained
- Recovery handles all failure modes
- Integration with transaction manager complete
Performance Requirements
- Throughput ≥ 10,000 commits/sec (HDD)
- Fsync reduction ≥ 90%
- P99 latency ≤ 20ms
- No performance regression in read path
Quality Requirements
- Unit test coverage ≥ 80%
- Integration tests cover key scenarios
- No clippy warnings
- Documentation complete
Production Readiness
- Metrics and monitoring in place
- Error handling robust
- Configuration flexible
- Logging comprehensive
Risk Mitigation
| Risk | Mitigation |
|---|---|
| Implementation complexity | Follow incremental plan, test each phase |
| Performance regression | Benchmark continuously, compare with baseline |
| Data loss | Comprehensive recovery testing, checksums |
| Integration issues | Coordinate with transaction team early |
| Timeline slippage | Focus on MVP first, defer optimizations |
Next Steps
- Review this roadmap with team
- Allocate resources (1-2 engineers)
- Set up development environment
- Begin Day 1 implementation
- Daily standup to track progress
Status: Ready for Implementation
Owner: TBD
Timeline: 3-5 days
Priority: P0 - Critical Performance Optimization