Data Integrity Test Suite

Overview

This document defines comprehensive tests for verifying data integrity in HeliosDB’s LSM-tree storage engine, including compaction, tombstone handling, gc_grace_seconds, and multi-version concurrency control (MVCC).
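The tests construct engines through a small configuration surface. The sketch below collects the knobs they exercise in one place; the types and field names are taken from the test code and assume the hypothetical heliosdb_storage API rather than documenting a finalized interface.

```rust
// Sketch only: configuration knobs exercised by this suite, as used in the tests below.
use heliosdb_storage::hcc::*;
use heliosdb_storage::lsm::compaction::*;
use heliosdb_storage::lsm::*;

fn example_configs() -> (LsmConfig, HccConfig) {
    let mut lsm = LsmConfig::default();
    lsm.memtable_size_threshold = 1024 * 1024; // flush the memtable after ~1 MB (LSM-002)
    lsm.compaction_strategy = CompactionStrategy::Leveled {
        level_size_multiplier: 10,
        max_level_0_files: 4,
    }; // or CompactionStrategy::SizeTiered { min_sstables: 4, size_ratio: 2.0 } (COMPACT-001)
    lsm.gc_grace_seconds = 864_000; // how long tombstones survive compaction (GC-001..GC-004)

    let mut hcc = HccConfig::default();
    hcc.compression_mode = HccMode::WarehouseOptimized; // targets >6x compression (HCC-001)
    hcc.compression_unit_size = 10_000; // rows per compression unit

    (lsm, hcc)
}
```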

Test Categories

1. LSM-Tree Write Path Tests

2. Compaction Correctness Tests

3. Tombstone and Deletion Tests

4. gc_grace_seconds Tests

5. MVCC and Version Management Tests

6. HCC (Hybrid Columnar Compression) Tests

1. LSM-Tree Write Path Tests

tests/data_integrity/lsm_write_path_test.rs
```rust
use heliosdb_storage::lsm::*;
use std::time::Duration;

#[tokio::test]
async fn test_write_commit_log_durability() {
    // LSM-001: Writes are durable after commit log write
    let engine = LsmStorageEngine::new_test().await;

    // Write data
    for i in 0..100 {
        engine.write(
            format!("key_{}", i).into_bytes(),
            format!("value_{}", i).into_bytes(),
        ).await.unwrap();
    }

    // Force crash before memtable flush
    let engine_path = engine.path().clone();
    drop(engine);

    // Restart engine
    let engine = LsmStorageEngine::open(engine_path).await.unwrap();

    // Verify all writes recovered from commit log
    for i in 0..100 {
        let key = format!("key_{}", i).into_bytes();
        let value = engine.read(&key).await.unwrap();
        assert_eq!(
            value,
            Some(format!("value_{}", i).into_bytes()),
            "Key key_{} not recovered from commit log",
            i
        );
    }
}

#[tokio::test]
async fn test_memtable_flush() {
    // LSM-002: Memtable flushes to SSTable correctly
    let mut config = LsmConfig::default();
    config.memtable_size_threshold = 1024 * 1024; // 1 MB
    let engine = LsmStorageEngine::with_config(config).await;

    // Write enough data to trigger flush
    let value = vec![0u8; 1024]; // 1 KB per value
    for i in 0..2000 {
        engine.write(
            format!("key_{:06}", i).into_bytes(),
            value.clone(),
        ).await.unwrap();
    }

    // Wait for flush
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Verify SSTable was created
    let sstables = engine.list_sstables().await;
    assert!(!sstables.is_empty(), "No SSTables created");

    // Verify data is readable
    for i in 0..2000 {
        let key = format!("key_{:06}", i).into_bytes();
        let result = engine.read(&key).await.unwrap();
        assert!(result.is_some(), "Key key_{:06} missing after flush", i);
    }
}

#[tokio::test]
async fn test_bloom_filter_effectiveness() {
    // LSM-003: Bloom filters reduce unnecessary SSTable reads
    let engine = LsmStorageEngine::new_test().await;

    // Insert 10,000 keys and flush
    for i in 0..10_000 {
        engine.write(
            format!("exists_{}", i).into_bytes(),
            b"value".to_vec(),
        ).await.unwrap();
    }
    engine.flush_memtable().await.unwrap();

    // Query non-existent keys
    let mut false_positive_count = 0;
    let mut bloom_filter_hits = 0;
    for i in 0..10_000 {
        let key = format!("nonexistent_{}", i).into_bytes();
        let metrics = engine.read_with_metrics(&key).await;
        if metrics.bloom_filter_rejected {
            bloom_filter_hits += 1;
        }
        if metrics.sstable_reads > 0 && metrics.result.is_none() {
            false_positive_count += 1;
        }
    }

    // Bloom filter should reject most queries
    let rejection_rate = bloom_filter_hits as f64 / 10_000.0;
    println!("Bloom filter rejection rate: {:.2}%", rejection_rate * 100.0);
    assert!(rejection_rate > 0.95, "Bloom filter rejection rate too low");

    // False positive rate should be low (< 2%)
    let fp_rate = false_positive_count as f64 / 10_000.0;
    println!("False positive rate: {:.2}%", fp_rate * 100.0);
    assert!(fp_rate < 0.02, "Bloom filter false positive rate too high");
}

#[tokio::test]
async fn test_write_amplification() {
    // LSM-004: Write amplification is within acceptable bounds
    let engine = LsmStorageEngine::new_test().await;

    // Write 100 MB of data
    let value = vec![0u8; 1024]; // 1 KB
    for i in 0..100_000 {
        engine.write(
            format!("key_{}", i).into_bytes(),
            value.clone(),
        ).await.unwrap();
    }

    // Trigger compaction
    engine.compact_all().await.unwrap();

    let metrics = engine.get_metrics().await;
    let write_amp = metrics.total_bytes_written as f64 / metrics.user_bytes_written as f64;
    println!("Write amplification: {:.2}x", write_amp);

    // Write amplification should be < 10x for an LSM with good compaction
    assert!(write_amp < 10.0, "Write amplification too high: {:.2}x", write_amp);
}
```

2. Compaction Correctness Tests

tests/data_integrity/compaction_test.rs
```rust
use heliosdb_storage::lsm::*;
use heliosdb_storage::lsm::compaction::*;

#[tokio::test]
async fn test_size_tiered_compaction() {
    // COMPACT-001: Size-tiered compaction merges similar-sized SSTables
    let mut config = LsmConfig::default();
    config.compaction_strategy = CompactionStrategy::SizeTiered {
        min_sstables: 4,
        size_ratio: 2.0,
    };
    let engine = LsmStorageEngine::with_config(config).await;

    // Create multiple SSTables of similar size
    for batch in 0..8 {
        for i in 0..1000 {
            engine.write(
                format!("key_{}_{}", batch, i).into_bytes(),
                b"value".to_vec(),
            ).await.unwrap();
        }
        engine.flush_memtable().await.unwrap();
    }

    let initial_count = engine.list_sstables().await.len();
    assert_eq!(initial_count, 8);

    // Trigger compaction
    engine.compact().await.unwrap();

    // Should have fewer SSTables
    let final_count = engine.list_sstables().await.len();
    assert!(final_count < initial_count, "Compaction did not reduce SSTable count");

    // Verify all data still readable
    for batch in 0..8 {
        for i in 0..1000 {
            let key = format!("key_{}_{}", batch, i).into_bytes();
            assert!(engine.read(&key).await.unwrap().is_some());
        }
    }
}

#[tokio::test]
async fn test_leveled_compaction() {
    // COMPACT-002: Leveled compaction maintains sorted runs
    let mut config = LsmConfig::default();
    config.compaction_strategy = CompactionStrategy::Leveled {
        level_size_multiplier: 10,
        max_level_0_files: 4,
    };
    let engine = LsmStorageEngine::with_config(config).await;

    // Write data with overlapping key ranges
    for round in 0..10 {
        for i in 0..1000 {
            engine.write(
                format!("key_{:06}", i).into_bytes(),
                format!("value_round_{}", round).into_bytes(),
            ).await.unwrap();
        }
        engine.flush_memtable().await.unwrap();
    }

    // Trigger compaction
    engine.compact_all().await.unwrap();

    // Verify data organized into levels
    let levels = engine.get_level_info().await;
    println!("Level structure: {:?}", levels);

    // Level 0 should have few files
    assert!(levels[0].file_count <= 4, "Too many L0 files");

    // Each non-empty level should be at least as large as the previous one
    for i in 1..levels.len() {
        if levels[i].file_count > 0 {
            assert!(
                levels[i].total_size >= levels[i - 1].total_size,
                "Level {} smaller than level {}",
                i, i - 1
            );
        }
    }

    // Verify latest values
    for i in 0..1000 {
        let key = format!("key_{:06}", i).into_bytes();
        let value = engine.read(&key).await.unwrap().unwrap();
        assert_eq!(value, b"value_round_9");
    }
}

#[tokio::test]
async fn test_compaction_removes_old_versions() {
    // COMPACT-003: Compaction removes old versions of keys
    let engine = LsmStorageEngine::new_test().await;

    // Write multiple versions
    for version in 0..10 {
        for i in 0..100 {
            engine.write_with_timestamp(
                format!("key_{}", i).into_bytes(),
                format!("value_v{}", version).into_bytes(),
                version * 1000,
            ).await.unwrap();
        }
        engine.flush_memtable().await.unwrap();
    }

    let size_before = engine.total_size().await;

    // Compact all
    engine.compact_all().await.unwrap();

    let size_after = engine.total_size().await;

    // Size should be much smaller (old versions removed)
    let reduction = (size_before - size_after) as f64 / size_before as f64;
    println!("Size reduction: {:.2}%", reduction * 100.0);
    assert!(reduction > 0.8, "Compaction did not remove old versions");

    // Verify latest versions still accessible
    for i in 0..100 {
        let key = format!("key_{}", i).into_bytes();
        let value = engine.read(&key).await.unwrap().unwrap();
        assert_eq!(value, b"value_v9");
    }
}
```

3. Tombstone and Deletion Tests

tests/data_integrity/tombstone_test.rs
```rust
use heliosdb_storage::lsm::*;
use heliosdb_storage::lsm::tombstone::*;

#[tokio::test]
async fn test_delete_creates_tombstone() {
    // TOMB-001: DELETE operation creates tombstone marker
    let engine = LsmStorageEngine::new_test().await;

    // Write and delete
    engine.write(b"key1".to_vec(), b"value1".to_vec()).await.unwrap();
    engine.delete(b"key1").await.unwrap();

    // Read should return None
    let result = engine.read(b"key1").await.unwrap();
    assert_eq!(result, None);

    // Tombstone should exist in memtable
    assert!(engine.tombstone_exists(b"key1").await);
}

#[tokio::test]
async fn test_tombstone_shadows_older_writes() {
    // TOMB-002: Tombstone shadows all older versions
    let engine = LsmStorageEngine::new_test().await;

    // Write v1 and flush
    engine.write_with_timestamp(b"key".to_vec(), b"v1".to_vec(), 100).await.unwrap();
    engine.flush_memtable().await.unwrap();

    // Write v2 and flush
    engine.write_with_timestamp(b"key".to_vec(), b"v2".to_vec(), 200).await.unwrap();
    engine.flush_memtable().await.unwrap();

    // Delete (tombstone at t=300)
    engine.delete_with_timestamp(b"key", 300).await.unwrap();

    // Read should return None (tombstone shadows all older versions)
    let result = engine.read(b"key").await.unwrap();
    assert_eq!(result, None);

    // Older versions remain in SSTables, but the tombstone is the newest entry
    let internal_versions = engine.get_all_versions(b"key").await;
    assert_eq!(internal_versions.len(), 3); // v1, v2, tombstone
    let latest = &internal_versions[0];
    assert!(latest.is_tombstone);
}

#[tokio::test]
async fn test_multiple_deletes() {
    // TOMB-003: Multiple deletes handled correctly
    let engine = LsmStorageEngine::new_test().await;

    engine.write(b"key".to_vec(), b"value".to_vec()).await.unwrap();
    engine.delete(b"key").await.unwrap();
    engine.delete(b"key").await.unwrap(); // Delete again

    let result = engine.read(b"key").await.unwrap();
    assert_eq!(result, None);

    // Should have tombstones but not error
    assert!(engine.tombstone_exists(b"key").await);
}

#[tokio::test]
async fn test_write_after_delete() {
    // TOMB-004: Write after delete resurrects key
    let engine = LsmStorageEngine::new_test().await;

    // Write, delete, write
    engine.write_with_timestamp(b"key".to_vec(), b"v1".to_vec(), 100).await.unwrap();
    engine.delete_with_timestamp(b"key", 200).await.unwrap();
    engine.write_with_timestamp(b"key".to_vec(), b"v2".to_vec(), 300).await.unwrap();

    // Should return latest write
    let result = engine.read(b"key").await.unwrap();
    assert_eq!(result, Some(b"v2".to_vec()));
}
```

4. gc_grace_seconds Tests

tests/data_integrity/gc_grace_test.rs
```rust
use heliosdb_storage::lsm::*;
use heliosdb_storage::lsm::gc::*;
use std::time::Duration;

#[tokio::test]
async fn test_tombstone_retained_during_grace_period() {
    // GC-001: Tombstones retained during gc_grace_seconds
    let mut config = LsmConfig::default();
    config.gc_grace_seconds = 10; // 10 seconds
    let engine = LsmStorageEngine::with_config(config).await;

    // Write and delete
    engine.write(b"key1".to_vec(), b"value1".to_vec()).await.unwrap();
    engine.delete(b"key1").await.unwrap();
    engine.flush_memtable().await.unwrap();

    // Compact immediately (within grace period)
    engine.compact_all().await.unwrap();

    // Tombstone should still exist
    assert!(
        engine.tombstone_exists_in_sstables(b"key1").await,
        "Tombstone removed before gc_grace_seconds elapsed"
    );
}

#[tokio::test]
async fn test_tombstone_gc_after_grace_period() {
    // GC-002: Tombstones garbage collected after gc_grace_seconds
    let mut config = LsmConfig::default();
    config.gc_grace_seconds = 2; // 2 seconds for faster testing
    let engine = LsmStorageEngine::with_config(config).await;

    // Write and delete
    engine.write(b"key1".to_vec(), b"value1".to_vec()).await.unwrap();
    engine.delete(b"key1").await.unwrap();
    engine.flush_memtable().await.unwrap();

    // Wait past gc_grace_seconds
    tokio::time::sleep(Duration::from_secs(3)).await;

    // Compact
    engine.compact_all().await.unwrap();

    // Tombstone should be removed
    assert!(
        !engine.tombstone_exists_in_sstables(b"key1").await,
        "Tombstone not garbage collected after gc_grace_seconds"
    );
}

#[tokio::test]
async fn test_delete_consistency_with_gc_grace() {
    // GC-003: gc_grace_seconds prevents zombie records in a distributed system
    let mut config = LsmConfig::default();
    config.gc_grace_seconds = 30;

    // Simulate a distributed scenario with two replicas
    let primary = LsmStorageEngine::with_config(config.clone()).await;
    let mirror = LsmStorageEngine::with_config(config.clone()).await;

    // Write to both
    primary.write(b"key".to_vec(), b"value".to_vec()).await.unwrap();
    mirror.write(b"key".to_vec(), b"value".to_vec()).await.unwrap();

    // Delete on primary
    primary.delete(b"key").await.unwrap();
    primary.flush_memtable().await.unwrap();

    // Mirror is temporarily offline and misses the delete
    // (simulated by not replicating it)

    // Primary compacts (but keeps the tombstone due to gc_grace)
    primary.compact_all().await.unwrap();
    assert!(primary.tombstone_exists_in_sstables(b"key").await);

    // Mirror comes back online and receives the tombstone
    mirror.delete(b"key").await.unwrap();
    mirror.flush_memtable().await.unwrap();

    // Both should return None
    assert_eq!(primary.read(b"key").await.unwrap(), None);
    assert_eq!(mirror.read(b"key").await.unwrap(), None);

    // Wait past gc_grace
    tokio::time::sleep(Duration::from_secs(31)).await;

    // Now both can safely remove the tombstone
    primary.compact_all().await.unwrap();
    mirror.compact_all().await.unwrap();
    assert!(!primary.tombstone_exists_in_sstables(b"key").await);
    assert!(!mirror.tombstone_exists_in_sstables(b"key").await);
}

#[tokio::test]
async fn test_gc_grace_configurable_per_table() {
    // GC-004: gc_grace_seconds configurable per table
    let engine = LsmStorageEngine::new_test().await;

    // Table with short gc_grace (archival data)
    engine.create_table("archive", TableConfig {
        gc_grace_seconds: 1,
        ..Default::default()
    }).await.unwrap();

    // Table with long gc_grace (critical data)
    engine.create_table("critical", TableConfig {
        gc_grace_seconds: 864_000, // 10 days
        ..Default::default()
    }).await.unwrap();

    // Write to and delete from both tables
    engine.write_to_table("archive", b"key1", b"val").await.unwrap();
    engine.delete_from_table("archive", b"key1").await.unwrap();
    engine.write_to_table("critical", b"key2", b"val").await.unwrap();
    engine.delete_from_table("critical", b"key2").await.unwrap();
    engine.flush_all().await.unwrap();

    // Wait 2 seconds
    tokio::time::sleep(Duration::from_secs(2)).await;
    engine.compact_all().await.unwrap();

    // Archive tombstone should be GC'd
    assert!(!engine.tombstone_exists_in_table("archive", b"key1").await);

    // Critical tombstone should be retained
    assert!(engine.tombstone_exists_in_table("critical", b"key2").await);
}
```

5. MVCC and Version Management Tests

tests/data_integrity/mvcc_test.rs
```rust
use heliosdb_storage::lsm::*;
use heliosdb_storage::lsm::mvcc::*;

#[tokio::test]
async fn test_timestamp_ordering() {
    // MVCC-001: Timestamps determine value visibility
    let engine = LsmStorageEngine::new_test().await;

    // Write versions with explicit timestamps
    engine.write_with_timestamp(b"key".to_vec(), b"v1".to_vec(), 1000).await.unwrap();
    engine.write_with_timestamp(b"key".to_vec(), b"v2".to_vec(), 2000).await.unwrap();
    engine.write_with_timestamp(b"key".to_vec(), b"v3".to_vec(), 3000).await.unwrap();

    // Read at different timestamps
    let v1 = engine.read_at_timestamp(b"key", 1500).await.unwrap();
    let v2 = engine.read_at_timestamp(b"key", 2500).await.unwrap();
    let v3 = engine.read_at_timestamp(b"key", 3500).await.unwrap();

    assert_eq!(v1, Some(b"v1".to_vec()));
    assert_eq!(v2, Some(b"v2".to_vec()));
    assert_eq!(v3, Some(b"v3".to_vec()));
}

#[tokio::test]
async fn test_snapshot_isolation() {
    // MVCC-002: Snapshot isolation for consistent reads
    let engine = LsmStorageEngine::new_test().await;

    // Insert initial data
    for i in 0..100 {
        engine.write_with_timestamp(
            format!("key_{}", i).into_bytes(),
            b"v1".to_vec(),
            1000,
        ).await.unwrap();
    }

    // Create snapshot
    let snapshot = engine.create_snapshot(1500).await;

    // Update data after snapshot
    for i in 0..100 {
        engine.write_with_timestamp(
            format!("key_{}", i).into_bytes(),
            b"v2".to_vec(),
            2000,
        ).await.unwrap();
    }

    // Reads from the snapshot should see old values
    for i in 0..100 {
        let key = format!("key_{}", i).into_bytes();
        let value = snapshot.read(&key).await.unwrap();
        assert_eq!(value, Some(b"v1".to_vec()));
    }

    // Normal reads should see new values
    for i in 0..100 {
        let key = format!("key_{}", i).into_bytes();
        let value = engine.read(&key).await.unwrap();
        assert_eq!(value, Some(b"v2".to_vec()));
    }
}

#[tokio::test]
async fn test_version_garbage_collection() {
    // MVCC-003: Old versions removed when no longer needed
    let engine = LsmStorageEngine::new_test().await;

    // Write many versions
    for v in 0..100 {
        engine.write_with_timestamp(b"key".to_vec(), format!("v{}", v).into_bytes(), v * 100)
            .await
            .unwrap();
    }
    engine.flush_memtable().await.unwrap();

    // Create snapshot at t=5000
    let snapshot = engine.create_snapshot(5000).await;

    // Compact (should keep versions still needed by the snapshot)
    engine.compact_all().await.unwrap();

    let versions = engine.get_all_versions(b"key").await;
    // Versions not visible to any snapshot should have been removed
    assert!(versions.len() < 100, "Old versions not removed");

    // Snapshot should still work
    let value = snapshot.read(b"key").await.unwrap();
    assert_eq!(value, Some(b"v50".to_vec()));

    // Drop snapshot
    drop(snapshot);

    // Compact again
    engine.compact_all().await.unwrap();

    // Now only the latest version should remain
    let versions = engine.get_all_versions(b"key").await;
    assert_eq!(versions.len(), 1);
    assert_eq!(versions[0].value, b"v99");
}
```

6. HCC (Hybrid Columnar Compression) Tests

tests/data_integrity/hcc_test.rs
```rust
use heliosdb_storage::hcc::*;
use std::time::Duration;

#[tokio::test]
async fn test_hcc_compression_ratio() {
    // HCC-001: Verify high compression ratio for columnar data
    let mut config = HccConfig::default();
    config.compression_mode = HccMode::WarehouseOptimized;
    config.compression_unit_size = 10_000; // 10k rows per CU
    let storage = HccStorage::with_config(config).await;

    // Insert low-cardinality data (good compression)
    let statuses = vec!["active", "inactive", "pending"];
    for i in 0..100_000 {
        storage.insert(vec![
            ("id", Value::BigInt(i)),
            ("status", Value::VarChar(statuses[i as usize % 3].to_string())),
            ("value", Value::Integer(i % 100)),
        ]).await.unwrap();
    }

    let raw_size = storage.estimate_raw_size().await;
    let compressed_size = storage.actual_size().await;
    let ratio = raw_size as f64 / compressed_size as f64;
    println!("HCC compression ratio: {:.2}x", ratio);
    assert!(ratio > 6.0, "Compression ratio below 6x for warehouse mode");
}

#[tokio::test]
async fn test_hcc_column_projection() {
    // HCC-002: Only decompress projected columns
    let storage = HccStorage::new_test().await;

    // Insert wide rows (20 columns)
    for i in 0..10_000 {
        let mut row = vec![("id".to_string(), Value::BigInt(i))];
        for col in 0..19 {
            row.push((
                format!("col_{}", col),
                Value::VarChar(format!("data_{}", i)),
            ));
        }
        storage.insert(row).await.unwrap();
    }

    // Query with column projection (only 2 columns)
    let metrics = storage.query_with_metrics(
        "SELECT id, col_5 FROM table WHERE id < 1000",
        &[],
    ).await.unwrap();

    // Verify only 2 columns decompressed
    assert_eq!(metrics.columns_decompressed, 2);

    // Should scan much less data than a full table scan
    let full_scan_bytes = storage.estimate_scan_size(10_000, 20).await;
    let projected_scan_bytes = metrics.bytes_decompressed;
    let savings = 1.0 - (projected_scan_bytes as f64 / full_scan_bytes as f64);
    println!("I/O savings from projection: {:.1}%", savings * 100.0);
    assert!(savings > 0.8, "Column projection not effective");
}

#[tokio::test]
async fn test_hcc_update_overhead() {
    // HCC-003: Updates on HCC data require CU-level rewrite
    let storage = HccStorage::new_test().await;

    // Insert data
    for i in 0..10_000 {
        storage.insert(vec![
            ("id", Value::BigInt(i)),
            ("value", Value::Integer(i * 2)),
        ]).await.unwrap();
    }

    // Flush to HCC format
    storage.flush_to_hcc().await.unwrap();

    // Update a single row
    let start = std::time::Instant::now();
    storage.execute("UPDATE table SET value = 999 WHERE id = 5000").await.unwrap();
    let update_duration = start.elapsed();
    println!("HCC update latency: {:?}", update_duration);

    // Update should be relatively slow (decompress + modify + recompress CU)
    assert!(
        update_duration > Duration::from_millis(10),
        "HCC update unexpectedly fast"
    );

    // Verify update succeeded
    let result = storage.query("SELECT value FROM table WHERE id = 5000", &[])
        .await
        .unwrap();
    assert_eq!(result.rows[0].get::<i32>("value"), 999);
}
```

Test Execution

```bash
# Run all data integrity tests
cargo test --test data_integrity

# Run specific categories
cargo test --test lsm_write_path
cargo test --test compaction
cargo test --test tombstone
cargo test --test gc_grace
cargo test --test mvcc
cargo test --test hcc

# Run with detailed metrics
RUST_LOG=info cargo test --test data_integrity -- --nocapture --test-threads=1
```

Quality Metrics

Success Criteria

  • All writes recoverable from commit log
  • Compaction reduces size by >80% for multi-version data
  • Bloom filter false positive rate < 2%
  • Tombstones respect gc_grace_seconds
  • MVCC provides snapshot isolation
  • HCC compression ratio > 6x for warehouse mode

Performance Targets

  • Write amplification: < 10x
  • Compaction throughput: > 100 MB/s
  • HCC compression ratio: > 6x (warehouse), > 10x (archive)
  • Column projection I/O savings: > 80%
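The write-amplification and compression-ratio targets can be checked directly from the metrics already collected in LSM-004 and HCC-001. A minimal sketch, assuming the same hypothetical get_metrics, estimate_raw_size, and actual_size calls used in those tests:

```rust
use heliosdb_storage::hcc::*;
use heliosdb_storage::lsm::*;

// Sketch: assert two of the performance targets above from collected metrics.
async fn check_performance_targets(engine: &LsmStorageEngine, storage: &HccStorage) {
    // Write amplification = total bytes written to disk / bytes written by the user
    let m = engine.get_metrics().await;
    let write_amp = m.total_bytes_written as f64 / m.user_bytes_written as f64;
    assert!(write_amp < 10.0, "Write amplification target missed: {:.2}x", write_amp);

    // Compression ratio = estimated raw size / actual on-disk size
    let ratio = storage.estimate_raw_size().await as f64 / storage.actual_size().await as f64;
    assert!(ratio > 6.0, "Warehouse compression target missed: {:.2}x", ratio);
}
```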