
Sharded Memtable: Implementation Roadmap

This document provides a detailed, day-by-day implementation plan with specific tasks, acceptance criteria, and risk mitigations.


Timeline Overview

Total Duration: 3.5 development days + 0.5 buffer = 4 days

Day 1: Core Implementation (Phases 1-2)
├─ Morning: Basic structure, insert/get/remove
└─ Afternoon: Memtable trait, backward compatibility
Day 2: Advanced Operations (Phases 2-3)
├─ Morning: Range scans with k-way merge
└─ Afternoon: Flush to SSTable
Day 3: Optimization & Integration (Phases 3-4)
├─ Morning: LSM integration, testing
└─ Afternoon: Bloom filters, metrics
Day 4: Polish & Validation
├─ Morning: Performance benchmarks, tuning
└─ Afternoon: Documentation, review

Phase 1: Core Sharded Implementation (Day 1)

Day 1 - Morning Session (4 hours)

Task 1.1: Project Setup (30 minutes)

Objective: Create module structure and dependencies

Actions:

# Create module structure
mkdir -p heliosdb-storage/src/memtable
touch heliosdb-storage/src/memtable/mod.rs
touch heliosdb-storage/src/memtable/sharded.rs
touch heliosdb-storage/src/memtable/config.rs
touch heliosdb-storage/src/memtable/metrics.rs

Update Cargo.toml:

[dependencies]
seahash = "4.1"
rayon = "1.7" # For parallel operations

Acceptance Criteria:

  • Module structure created
  • Dependencies added and compiling
  • Imports working

Task 1.2: Configuration Structures (30 minutes)

File: heliosdb-storage/src/memtable/config.rs

Implementation:

/// Configuration for sharded memtable
#[derive(Clone, Debug)]
pub struct MemtableConfig {
    /// Number of shards (typically 32)
    pub shard_count: usize,
    /// Maximum size before flush (bytes)
    pub max_size_bytes: usize,
    /// Enable bloom filters for read optimization
    pub enable_bloom: bool,
    /// Bloom filter false positive rate
    pub bloom_fpr: f64,
}

impl Default for MemtableConfig {
    fn default() -> Self {
        Self {
            shard_count: 32,
            max_size_bytes: 64 * 1024 * 1024, // 64MB
            enable_bloom: false, // Disable initially, add in Phase 4
            bloom_fpr: 0.01,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_config() {
        let config = MemtableConfig::default();
        assert_eq!(config.shard_count, 32);
        assert_eq!(config.max_size_bytes, 64 * 1024 * 1024);
    }
}

Acceptance Criteria:

  • MemtableConfig struct defined
  • Default implementation provided
  • Basic test passing

Task 1.3: Core ShardedMemtable Structure (1 hour)

File: heliosdb-storage/src/memtable/sharded.rs

Implementation:

use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::BTreeMap;

use super::config::MemtableConfig;

pub struct ShardedMemtable {
    /// Individual shards with independent locks
    shards: Vec<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
    /// Number of shards
    shard_count: usize,
    /// Total size across all shards (lock-free)
    size_bytes: AtomicUsize,
    /// Per-shard size tracking
    shard_sizes: Vec<AtomicUsize>,
    /// Configuration
    config: MemtableConfig,
}

impl ShardedMemtable {
    /// Create new sharded memtable
    pub fn new(config: MemtableConfig) -> Self {
        let mut shards = Vec::with_capacity(config.shard_count);
        let mut shard_sizes = Vec::with_capacity(config.shard_count);
        for _ in 0..config.shard_count {
            shards.push(RwLock::new(BTreeMap::new()));
            shard_sizes.push(AtomicUsize::new(0));
        }
        Self {
            shards,
            shard_count: config.shard_count,
            size_bytes: AtomicUsize::new(0),
            shard_sizes,
            config,
        }
    }

    /// Calculate shard index for key
    #[inline]
    fn shard_index(&self, key: &[u8]) -> usize {
        use seahash::hash;
        (hash(key) as usize) % self.shard_count
    }

    /// Get total size
    #[inline]
    pub fn size(&self) -> usize {
        self.size_bytes.load(Ordering::Relaxed)
    }

    /// Check if flush needed
    #[inline]
    pub fn should_flush(&self) -> bool {
        self.size() >= self.config.max_size_bytes
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_new_memtable() {
        let config = MemtableConfig::default();
        let memtable = ShardedMemtable::new(config);
        assert_eq!(memtable.shard_count, 32);
        assert_eq!(memtable.shards.len(), 32);
        assert_eq!(memtable.shard_sizes.len(), 32);
        assert_eq!(memtable.size(), 0);
    }

    #[test]
    fn test_shard_index_distribution() {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        let mut counts = vec![0; 32];
        for i in 0..10_000 {
            let key = format!("key{}", i);
            let idx = memtable.shard_index(key.as_bytes());
            counts[idx] += 1;
        }
        // Verify reasonable distribution (within 2x of mean)
        let mean = 10_000 / 32;
        for count in counts {
            assert!(
                count > mean / 2 && count < mean * 2,
                "Poor distribution: {} (mean: {})",
                count,
                mean
            );
        }
    }
}

Acceptance Criteria:

  • ShardedMemtable struct compiles
  • Constructor works
  • Shard index calculation works
  • Hash distribution test passes

Task 1.4: Insert Operation (1 hour)

Add to: heliosdb-storage/src/memtable/sharded.rs

Implementation:

use crate::error::{Result, MemtableError};

impl ShardedMemtable {
    /// Insert key-value pair
    pub fn insert(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
        let shard_idx = self.shard_index(&key);
        let key_len = key.len();
        let entry_size = key.len() + value.len();

        // Acquire write lock for the target shard only
        let mut shard = self.shards[shard_idx]
            .write()
            .map_err(|e| MemtableError::LockPoisoned(format!("Shard {} poisoned: {}", shard_idx, e)))?;

        // Insert and track the size change
        let size_delta = if let Some(old_value) = shard.insert(key, value) {
            // Replaced existing key: only the value size changes
            entry_size as isize - (key_len + old_value.len()) as isize
        } else {
            // New key
            entry_size as isize
        };

        // Update atomic counters
        if size_delta > 0 {
            self.size_bytes.fetch_add(size_delta as usize, Ordering::Relaxed);
            self.shard_sizes[shard_idx].fetch_add(size_delta as usize, Ordering::Relaxed);
        } else if size_delta < 0 {
            self.size_bytes.fetch_sub((-size_delta) as usize, Ordering::Relaxed);
            self.shard_sizes[shard_idx].fetch_sub((-size_delta) as usize, Ordering::Relaxed);
        }
        Ok(())
    }
}

#[cfg(test)]
mod insert_tests {
    use super::*;

    #[test]
    fn test_insert() {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        memtable.insert(b"key1".to_vec(), b"value1".to_vec()).unwrap();
        assert_eq!(memtable.size(), 10); // 4 + 6 bytes
    }

    #[test]
    fn test_insert_update() {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        memtable.insert(b"key1".to_vec(), b"value1".to_vec()).unwrap();
        assert_eq!(memtable.size(), 10);
        // Update with a longer value
        memtable.insert(b"key1".to_vec(), b"longer_value".to_vec()).unwrap();
        assert_eq!(memtable.size(), 16); // 4 + 12 bytes
    }

    #[test]
    fn test_insert_multiple() {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        for i in 0..100 {
            let key = format!("key{}", i);
            memtable.insert(key.into_bytes(), b"value".to_vec()).unwrap();
        }
        // 100 keys: "key0".."key99" are 4-5 bytes each, values are 5 bytes
        let expected_size = 100 * (5 + 5); // Approximate
        assert!(
            (memtable.size() as isize - expected_size as isize).abs() < 200,
            "Size mismatch: {} vs {}",
            memtable.size(),
            expected_size
        );
    }
}

Acceptance Criteria:

  • Insert operation compiles
  • Single insert works
  • Update works correctly
  • Size tracking accurate
  • Tests pass
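
Task 1.4 assumes a crate-level error module exposing Result and a MemtableError with a LockPoisoned variant; this plan does not pin down its shape. One minimal sketch of heliosdb-storage/src/error.rs that satisfies the calls above (the Io variant is an assumption, added with the later flush work in mind):

use std::fmt;

/// Crate-wide result alias used throughout the memtable code
pub type Result<T> = std::result::Result<T, MemtableError>;

/// Errors surfaced by memtable operations
#[derive(Debug)]
pub enum MemtableError {
    /// A shard lock was poisoned by a panicking writer
    LockPoisoned(String),
    /// I/O failure while flushing to an SSTable (assumed variant)
    Io(std::io::Error),
}

impl fmt::Display for MemtableError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            MemtableError::LockPoisoned(msg) => write!(f, "lock poisoned: {}", msg),
            MemtableError::Io(e) => write!(f, "io error: {}", e),
        }
    }
}

impl std::error::Error for MemtableError {}

impl From<std::io::Error> for MemtableError {
    fn from(e: std::io::Error) -> Self {
        MemtableError::Io(e)
    }
}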

Task 1.5: Get and Remove Operations (1 hour)

Add to: heliosdb-storage/src/memtable/sharded.rs

Implementation:

impl ShardedMemtable {
    /// Get value for key
    pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        let shard_idx = self.shard_index(key);
        // Acquire read lock (allows concurrent readers)
        let shard = self.shards[shard_idx].read().ok()?;
        shard.get(key).cloned()
    }

    /// Remove key
    pub fn remove(&self, key: &[u8]) -> Option<Vec<u8>> {
        let shard_idx = self.shard_index(key);
        let mut shard = self.shards[shard_idx].write().ok()?;
        if let Some(value) = shard.remove(key) {
            let size_delta = key.len() + value.len();
            self.size_bytes.fetch_sub(size_delta, Ordering::Relaxed);
            self.shard_sizes[shard_idx].fetch_sub(size_delta, Ordering::Relaxed);
            Some(value)
        } else {
            None
        }
    }
}

#[cfg(test)]
mod get_remove_tests {
    use super::*;

    #[test]
    fn test_get() {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        memtable.insert(b"key1".to_vec(), b"value1".to_vec()).unwrap();
        assert_eq!(memtable.get(b"key1"), Some(b"value1".to_vec()));
        assert_eq!(memtable.get(b"key2"), None);
    }

    #[test]
    fn test_remove() {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        memtable.insert(b"key1".to_vec(), b"value1".to_vec()).unwrap();
        assert_eq!(memtable.size(), 10);
        assert_eq!(memtable.remove(b"key1"), Some(b"value1".to_vec()));
        assert_eq!(memtable.size(), 0);
        assert_eq!(memtable.get(b"key1"), None);
    }

    #[test]
    fn test_remove_nonexistent() {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        assert_eq!(memtable.remove(b"key1"), None);
        assert_eq!(memtable.size(), 0);
    }
}

Acceptance Criteria:

  • Get operation works
  • Remove operation works
  • Size tracking correct after remove
  • Tests pass

Day 1 - Afternoon Session (4 hours)

Task 1.6: Memtable Trait Definition (1 hour)

File: heliosdb-storage/src/memtable/mod.rs

Implementation:

use std::path::Path;
use crate::error::Result;

pub mod config;
pub mod sharded;
pub mod metrics;

pub use config::MemtableConfig;
pub use sharded::ShardedMemtable;

/// Abstract memtable interface for backward compatibility
pub trait Memtable: Send + Sync {
    /// Insert key-value pair
    fn insert(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()>;
    /// Get value for key
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
    /// Remove key
    fn remove(&self, key: &[u8]) -> Option<Vec<u8>>;
    /// Range scan
    fn scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>>;
    /// Total size in bytes
    fn size(&self) -> usize;
    /// Check if flush needed
    fn should_flush(&self) -> bool;
    /// Flush to SSTable
    fn flush_to_sstable(&self, path: &Path) -> Result<()>;
}

Acceptance Criteria:

  • Trait defined
  • Compiles without errors
  • Methods match requirements

Task 1.7: Implement Trait for ShardedMemtable (1 hour)

Add to: heliosdb-storage/src/memtable/sharded.rs

Implementation:

use crate::memtable::Memtable;
use std::path::Path;

impl Memtable for ShardedMemtable {
    fn insert(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
        ShardedMemtable::insert(self, key, value)
    }

    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        ShardedMemtable::get(self, key)
    }

    fn remove(&self, key: &[u8]) -> Option<Vec<u8>> {
        ShardedMemtable::remove(self, key)
    }

    fn scan(&self, _start: &[u8], _end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        // Placeholder: delegates to the inherent scan once Phase 2 lands
        todo!("Implement in Phase 2")
    }

    fn size(&self) -> usize {
        ShardedMemtable::size(self)
    }

    fn should_flush(&self) -> bool {
        ShardedMemtable::should_flush(self)
    }

    fn flush_to_sstable(&self, _path: &Path) -> Result<()> {
        // Placeholder: delegates to the inherent flush once Phase 3 lands
        todo!("Implement in Phase 3")
    }
}

Acceptance Criteria:

  • Trait implemented
  • Basic methods work
  • Placeholders for scan/flush

Task 1.8: Concurrent Write Tests (1 hour)

File: heliosdb-storage/src/memtable/sharded.rs (add to tests module)

Implementation:

#[cfg(test)]
mod concurrent_tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_concurrent_writes() {
        let memtable = Arc::new(ShardedMemtable::new(MemtableConfig::default()));
        let num_threads: usize = 32;
        let num_ops: usize = 1_000;
        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let memtable = Arc::clone(&memtable);
                thread::spawn(move || {
                    for i in 0..num_ops {
                        let key = format!("thread{}_key{}", thread_id, i);
                        let value = format!("value{}", i);
                        memtable.insert(key.into_bytes(), value.into_bytes()).unwrap();
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
        // Verify all writes succeeded: each entry is roughly 18-23 bytes
        // (e.g. "thread0_key0" = 12 bytes + "value0" = 6 bytes), well above 10 bytes/op
        let total_ops = num_threads * num_ops;
        assert!(memtable.size() > total_ops * 10, "Size too small: {}", memtable.size());
    }

    #[test]
    fn test_concurrent_read_write() {
        let memtable = Arc::new(ShardedMemtable::new(MemtableConfig::default()));
        // Pre-populate
        for i in 0..1000 {
            memtable.insert(format!("key{}", i).into_bytes(), b"value".to_vec()).unwrap();
        }
        let writers: Vec<_> = (0..8)
            .map(|_| {
                let memtable = Arc::clone(&memtable);
                thread::spawn(move || {
                    for i in 0..1000 {
                        memtable.insert(format!("wkey{}", i).into_bytes(), b"value".to_vec()).unwrap();
                    }
                })
            })
            .collect();
        let readers: Vec<_> = (0..8)
            .map(|_| {
                let memtable = Arc::clone(&memtable);
                thread::spawn(move || {
                    for i in 0..1000 {
                        let _ = memtable.get(format!("key{}", i % 1000).as_bytes());
                    }
                })
            })
            .collect();
        for handle in writers.into_iter().chain(readers.into_iter()) {
            handle.join().unwrap();
        }
    }
}

Acceptance Criteria:

  • Concurrent write test passes
  • Concurrent read-write test passes
  • No data races or panics

Task 1.9: Basic Benchmark (30 minutes)

File: heliosdb-storage/benches/sharded_memtable_bench.rs

Implementation:

use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use heliosdb_storage::memtable::{ShardedMemtable, MemtableConfig};
use std::sync::Arc;
use std::thread;

fn bench_single_threaded(c: &mut Criterion) {
    let mut group = c.benchmark_group("sharded_memtable");
    // One insert per iteration
    group.throughput(Throughput::Elements(1));
    group.bench_function("insert_single_thread", |b| {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        let mut i = 0;
        b.iter(|| {
            let key = format!("key{}", i);
            memtable.insert(key.into_bytes(), b"value".to_vec()).unwrap();
            i += 1;
        });
    });
    group.finish();
}

fn bench_multi_threaded(c: &mut Criterion) {
    let mut group = c.benchmark_group("sharded_memtable");
    // 64 threads x 1,000 inserts per iteration
    group.throughput(Throughput::Elements(64_000));
    group.bench_function("insert_64_threads", |b| {
        b.iter(|| {
            let memtable = Arc::new(ShardedMemtable::new(MemtableConfig::default()));
            let handles: Vec<_> = (0..64)
                .map(|thread_id| {
                    let memtable = Arc::clone(&memtable);
                    thread::spawn(move || {
                        for i in 0..1_000 {
                            let key = format!("thread{}_key{}", thread_id, i);
                            memtable.insert(key.into_bytes(), b"value".to_vec()).unwrap();
                        }
                    })
                })
                .collect();
            for handle in handles {
                handle.join().unwrap();
            }
        });
    });
    group.finish();
}

criterion_group!(benches, bench_single_threaded, bench_multi_threaded);
criterion_main!(benches);

Acceptance Criteria:

  • Benchmark compiles
  • Single-threaded throughput ≥ 8M ops/sec
  • Multi-threaded shows improvement over baseline

Day 1 - End of Day Checkpoint

Deliverables:

  • ShardedMemtable struct implemented
  • Insert, get, remove operations working
  • Memtable trait defined
  • Concurrent tests passing
  • Basic benchmarks running

Review Points:

  1. Code review: Structure, error handling, tests
  2. Performance check: Single-threaded within 10% of baseline? (comparison workflow sketched below)
  3. Concurrent tests: No deadlocks or data races?

Go/No-Go Decision:

  • GO: If all tests pass and single-threaded performance within 20% of baseline
  • NO-GO: If correctness issues or >30% performance regression → Investigate root cause
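
For the performance check and the go/no-go regression thresholds, criterion's built-in baseline flags are one way to track numbers across changes; a sketch of the terminal workflow (the baseline name is illustrative):

# Record current numbers as a named baseline
cargo bench -- --save-baseline before

# After changes, compare against the saved baseline
cargo bench -- --baseline before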

Phase 2: Range Scan Implementation (Day 2 Morning)

Task 2.1: K-Way Merge Algorithm (2 hours)

Add to: heliosdb-storage/src/memtable/sharded.rs

Implementation:

use std::collections::BinaryHeap;
use std::cmp::Reverse;
use rayon::prelude::*;

#[derive(Eq, PartialEq)]
struct HeapEntry {
    key: Vec<u8>,
    value: Vec<u8>,
    shard_idx: usize,
    pos: usize,
}

impl Ord for HeapEntry {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.key.cmp(&other.key)
    }
}

impl PartialOrd for HeapEntry {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl ShardedMemtable {
    /// Range scan with parallel per-shard scans and a k-way merge
    pub fn scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        // Step 1: Scan each shard in parallel
        let shard_results: Vec<Vec<(Vec<u8>, Vec<u8>)>> = self
            .shards
            .par_iter()
            .map(|shard| {
                let guard = shard
                    .read()
                    .map_err(|e| MemtableError::LockPoisoned(e.to_string()))?;
                let results: Vec<_> = guard
                    .range(start.to_vec()..end.to_vec())
                    .map(|(k, v)| (k.clone(), v.clone()))
                    .collect();
                Ok(results)
            })
            .collect::<Result<Vec<_>>>()?;

        // Step 2: K-way merge of the sorted per-shard results
        let mut heap: BinaryHeap<Reverse<HeapEntry>> = BinaryHeap::new();
        let mut merged = Vec::new();

        // Initialize heap with the first entry of each shard
        for (shard_idx, results) in shard_results.iter().enumerate() {
            if let Some((key, value)) = results.first() {
                heap.push(Reverse(HeapEntry {
                    key: key.clone(),
                    value: value.clone(),
                    shard_idx,
                    pos: 0,
                }));
            }
        }

        // Merge: pop the smallest key, then push that shard's next entry
        while let Some(Reverse(entry)) = heap.pop() {
            merged.push((entry.key.clone(), entry.value.clone()));
            let next_pos = entry.pos + 1;
            if next_pos < shard_results[entry.shard_idx].len() {
                let (key, value) = &shard_results[entry.shard_idx][next_pos];
                heap.push(Reverse(HeapEntry {
                    key: key.clone(),
                    value: value.clone(),
                    shard_idx: entry.shard_idx,
                    pos: next_pos,
                }));
            }
        }
        Ok(merged)
    }
}

#[cfg(test)]
mod scan_tests {
    use super::*;

    #[test]
    fn test_scan_ordering() {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        // Insert out of order
        memtable.insert(b"key3".to_vec(), b"value3".to_vec()).unwrap();
        memtable.insert(b"key1".to_vec(), b"value1".to_vec()).unwrap();
        memtable.insert(b"key5".to_vec(), b"value5".to_vec()).unwrap();
        memtable.insert(b"key2".to_vec(), b"value2".to_vec()).unwrap();
        memtable.insert(b"key4".to_vec(), b"value4".to_vec()).unwrap();
        let results = memtable.scan(b"key0", b"key9").unwrap();
        // Verify sorted
        assert_eq!(results.len(), 5);
        assert_eq!(results[0].0, b"key1");
        assert_eq!(results[1].0, b"key2");
        assert_eq!(results[2].0, b"key3");
        assert_eq!(results[3].0, b"key4");
        assert_eq!(results[4].0, b"key5");
    }

    #[test]
    fn test_scan_range() {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        for i in 0..100 {
            memtable.insert(format!("key{:03}", i).into_bytes(), b"value".to_vec()).unwrap();
        }
        let results = memtable.scan(b"key020", b"key030").unwrap();
        assert_eq!(results.len(), 10); // key020..key029 (end is exclusive)
        assert_eq!(results[0].0, b"key020");
        assert_eq!(results[9].0, b"key029");
    }

    #[test]
    fn test_scan_empty_range() {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        memtable.insert(b"key1".to_vec(), b"value1".to_vec()).unwrap();
        memtable.insert(b"key5".to_vec(), b"value5".to_vec()).unwrap();
        // Range with no matching keys
        let results = memtable.scan(b"key2", b"key4").unwrap();
        assert_eq!(results.len(), 0);
    }

    #[test]
    fn test_scan_large() {
        let memtable = ShardedMemtable::new(MemtableConfig::default());
        // Insert 10K keys
        for i in 0..10_000 {
            memtable.insert(format!("key{:06}", i).into_bytes(), b"value".to_vec()).unwrap();
        }
        let results = memtable.scan(b"key000000", b"key010000").unwrap();
        assert_eq!(results.len(), 10_000);
        // Verify ordering
        for i in 0..results.len() - 1 {
            assert!(results[i].0 < results[i + 1].0, "Not sorted at index {}", i);
        }
    }
}

Acceptance Criteria:

  • Scan returns sorted results
  • Range filtering works correctly
  • Empty ranges handled
  • Large scans (10K keys) work
  • All tests pass

Task 2.2: Scan Benchmark (1 hour)

Add to: heliosdb-storage/benches/sharded_memtable_bench.rs

Implementation:

fn bench_scan(c: &mut Criterion) {
    let mut group = c.benchmark_group("sharded_scan");
    for size in [10, 100, 1_000, 10_000].iter() {
        group.bench_function(format!("scan_{}_keys", size), |b| {
            let memtable = ShardedMemtable::new(MemtableConfig::default());
            // Pre-populate
            for i in 0..*size {
                memtable.insert(format!("key{:06}", i).into_bytes(), b"value".to_vec()).unwrap();
            }
            b.iter(|| {
                let results = memtable.scan(b"key000000", b"key999999").unwrap();
                black_box(results);
            });
        });
    }
    group.finish();
}

Acceptance Criteria:

  • Benchmark runs successfully
  • Performance reasonable (< 2x slower than baseline for large scans)

Phase 3: Flush Implementation (Day 2 Afternoon)

Task 3.1: Flush to SSTable (2 hours)

Add to: heliosdb-storage/src/memtable/sharded.rs

Implementation:

use crate::sstable::SSTableWriter;

impl ShardedMemtable {
    /// Flush to SSTable with parallel per-shard snapshots
    /// (assumes writes are quiesced during flush; otherwise size counters can drift)
    pub fn flush_to_sstable(&self, path: &Path) -> Result<()> {
        // Step 1: Take a snapshot of every shard in parallel
        let snapshots: Vec<_> = self
            .shards
            .par_iter()
            .map(|shard| {
                let mut guard = shard
                    .write()
                    .map_err(|e| MemtableError::LockPoisoned(e.to_string()))?;
                // Swap in an empty map while holding the write lock
                let snapshot = std::mem::replace(&mut *guard, BTreeMap::new());
                Ok(snapshot)
            })
            .collect::<Result<Vec<_>>>()?;

        // Reset counters
        self.size_bytes.store(0, Ordering::Relaxed);
        for size in &self.shard_sizes {
            size.store(0, Ordering::Relaxed);
        }

        // Step 2: K-way merge the snapshots into the SSTable
        let mut writer = SSTableWriter::new(path)?;
        let mut heap: BinaryHeap<Reverse<MergeEntry>> = BinaryHeap::new();
        let mut iters: Vec<_> = snapshots.into_iter().map(|map| map.into_iter()).collect();

        // Initialize heap with the first entry of each shard
        for (shard_idx, iter) in iters.iter_mut().enumerate() {
            if let Some((key, value)) = iter.next() {
                heap.push(Reverse(MergeEntry { key, value, shard_idx }));
            }
        }

        // Merge and write in key order
        while let Some(Reverse(entry)) = heap.pop() {
            writer.append(&entry.key, &entry.value)?;
            if let Some((key, value)) = iters[entry.shard_idx].next() {
                heap.push(Reverse(MergeEntry {
                    key,
                    value,
                    shard_idx: entry.shard_idx,
                }));
            }
        }
        writer.finish()?;
        Ok(())
    }
}

#[derive(Eq, PartialEq)]
struct MergeEntry {
    key: Vec<u8>,
    value: Vec<u8>,
    shard_idx: usize,
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.key.cmp(&other.key)
    }
}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

Acceptance Criteria:

  • Flush completes without errors
  • SSTable is correctly sorted
  • Memtable cleared after flush
  • Size counters reset

Task 3.2: LSM Integration (1.5 hours)

Update: heliosdb-storage/src/lsm.rs

Implementation:

use crate::memtable::{Memtable, ShardedMemtable, MemtableConfig};

pub struct LSMTree {
    memtable: Arc<dyn Memtable>,
    // ... other fields
}

impl LSMTree {
    pub fn new_with_sharding(config: LSMConfig) -> Self {
        let memtable = Arc::new(ShardedMemtable::new(config.memtable_config)) as Arc<dyn Memtable>;
        Self {
            memtable,
            // ... initialize other fields
        }
    }

    // Existing methods go through the trait, so no changes needed
    pub fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
        self.memtable.insert(key, value)
    }

    pub fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.memtable.get(key)
    }
}

Acceptance Criteria:

  • LSMTree compiles with sharded memtable
  • Existing tests pass
  • Feature flag for easy toggle (sketched below)
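
The roadmap does not specify how the feature flag is wired. One possible sketch, assuming the legacy memtable also implements the Memtable trait and LSMConfig grows a boolean (use_sharded_memtable and LegacyMemtable are placeholders, not existing names):

impl LSMTree {
    /// Choose the memtable implementation from configuration.
    pub fn new(config: LSMConfig) -> Self {
        let memtable: Arc<dyn Memtable> = if config.use_sharded_memtable {
            Arc::new(ShardedMemtable::new(config.memtable_config.clone()))
        } else {
            Arc::new(LegacyMemtable::new(config.memtable_config.clone()))
        };
        Self {
            memtable,
            // ... initialize other fields
        }
    }
}

A config-driven toggle like this also doubles as the rollback path called out in the sign-off checklist.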

Phase 4: Advanced Features (Day 3)

Task 4.1: Bloom Filters (2 hours)

Implementation: Add bloom filter support (skip if time-constrained, can be Phase 5)
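
No crate has been chosen for the filters yet; a minimal per-shard sketch using the existing seahash dependency (double hashing over a fixed bit array) to show where a filter would hook in. Sizing from MemtableConfig::bloom_fpr and the exact integration point are left open:

/// Minimal per-shard bloom filter (illustrative; a dedicated crate may replace this).
struct ShardBloom {
    bits: Vec<u64>,
    num_hashes: u32,
}

impl ShardBloom {
    fn new(num_bits: usize, num_hashes: u32) -> Self {
        Self {
            bits: vec![0; (num_bits + 63) / 64],
            num_hashes,
        }
    }

    /// Derive k bit positions from two seahash values (double hashing)
    fn positions(&self, key: &[u8]) -> impl Iterator<Item = usize> + '_ {
        let h1 = seahash::hash(key);
        let h2 = seahash::hash_seeded(key, 0x9E37, 0x79B9, 0x7F4A, 0x7C15);
        let num_bits = self.bits.len() * 64;
        (0..self.num_hashes).map(move |i| {
            (h1.wrapping_add((i as u64).wrapping_mul(h2)) % num_bits as u64) as usize
        })
    }

    fn insert(&mut self, key: &[u8]) {
        let positions: Vec<usize> = self.positions(key).collect();
        for pos in positions {
            self.bits[pos / 64] |= 1 << (pos % 64);
        }
    }

    fn may_contain(&self, key: &[u8]) -> bool {
        self.positions(key).all(|pos| self.bits[pos / 64] & (1 << (pos % 64)) != 0)
    }
}

Each shard would carry one of these alongside its BTreeMap (for example inside the same RwLock), updated on insert and consulted in get before searching the tree.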

Task 4.2: Metrics & Monitoring (2 hours)

File: heliosdb-storage/src/memtable/metrics.rs

Implementation: Shard health metrics, Prometheus export
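
metrics.rs is only named here; one plausible shape uses plain atomics and emits Prometheus text exposition format directly rather than committing to a specific client crate (all names below are placeholders):

use std::sync::atomic::{AtomicU64, Ordering};

/// Counters describing memtable health; updated from the hot path with Relaxed stores.
#[derive(Default)]
pub struct MemtableMetrics {
    pub inserts: AtomicU64,
    pub gets: AtomicU64,
    pub removes: AtomicU64,
    pub flushes: AtomicU64,
}

impl MemtableMetrics {
    /// Render the counters plus per-shard sizes in Prometheus text exposition format.
    pub fn to_prometheus(&self, shard_sizes: &[usize]) -> String {
        let mut out = String::new();
        out.push_str(&format!(
            "heliosdb_memtable_inserts_total {}\n",
            self.inserts.load(Ordering::Relaxed)
        ));
        out.push_str(&format!(
            "heliosdb_memtable_flushes_total {}\n",
            self.flushes.load(Ordering::Relaxed)
        ));
        for (idx, size) in shard_sizes.iter().enumerate() {
            out.push_str(&format!(
                "heliosdb_memtable_shard_size_bytes{{shard=\"{}\"}} {}\n",
                idx, size
            ));
        }
        out
    }
}

Per-shard size gauges make skew visible, which is the main health signal for a hash-sharded structure.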

Task 4.3: Performance Tuning (2 hours)

  • Run comprehensive benchmarks
  • Compare with baseline
  • Tune shard count if needed (see the sketch below)
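
One way to ground the shard-count decision is to parameterize the multi-threaded benchmark over candidate counts. A sketch in the same style as Task 1.9, added to the existing bench file (the counts tried here are arbitrary):

fn bench_shard_counts(c: &mut Criterion) {
    let mut group = c.benchmark_group("shard_count_tuning");
    // 64 threads x 1,000 inserts per iteration
    group.throughput(Throughput::Elements(64_000));
    for &shard_count in &[8, 16, 32, 64, 128] {
        group.bench_function(format!("insert_64_threads_{}_shards", shard_count), |b| {
            b.iter(|| {
                let config = MemtableConfig { shard_count, ..MemtableConfig::default() };
                let memtable = Arc::new(ShardedMemtable::new(config));
                let handles: Vec<_> = (0..64)
                    .map(|thread_id| {
                        let memtable = Arc::clone(&memtable);
                        thread::spawn(move || {
                            for i in 0..1_000 {
                                let key = format!("t{}_k{}", thread_id, i);
                                memtable.insert(key.into_bytes(), b"value".to_vec()).unwrap();
                            }
                        })
                    })
                    .collect();
                for handle in handles {
                    handle.join().unwrap();
                }
            });
        });
    }
    group.finish();
}

The function would also need to be registered in the criterion_group! macro at the bottom of the bench file.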

Acceptance & Sign-Off

Final Checklist

Functionality:

  • All operations implemented (insert, get, remove, scan, flush)
  • Memtable trait working
  • LSM integration complete
  • All unit tests passing
  • All integration tests passing

Performance:

  • Single-threaded: Within 10% of baseline
  • Multi-threaded (64 threads): ≥3x improvement
  • Scan performance: Acceptable overhead (<50%)

Code Quality:

  • Code reviewed
  • Documentation complete
  • No clippy warnings
  • Formatted with rustfmt

Operational:

  • Metrics exportable
  • Monitoring dashboard ready
  • Feature flag implemented
  • Rollback plan documented

Document Status: Ready for Implementation
Owner: Phase 2 Implementation Team
Timeline: 4 days