Data Integrity Test Suite
Overview
This document defines comprehensive tests for verifying data integrity in HeliosDB’s LSM-tree storage engine, including compaction, tombstone handling, gc_grace_seconds, and multi-version concurrency control (MVCC).
Test Categories
1. LSM-Tree Write Path Tests
2. Compaction Correctness Tests
3. Tombstone and Deletion Tests
4. gc_grace_seconds Tests
5. MVCC and Version Management Tests
6. HCC (Hybrid Columnar Compression) Tests
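The categories below share a common configuration surface. For orientation, here is a minimal sketch of the `LsmConfig` fields exercised by these tests; the field and variant names are taken from the tests themselves (LSM-002, COMPACT-001/002, GC-001..004), while the helper `example_test_config` and the values shown are illustrative assumptions, not documented HeliosDB defaults.

```rust
use heliosdb_storage::lsm::*;
use heliosdb_storage::lsm::compaction::CompactionStrategy;

// Illustrative only: field names mirror their usage in the tests below,
// but the values are assumptions, not real HeliosDB defaults.
fn example_test_config() -> LsmConfig {
    let mut config = LsmConfig::default();

    // Memtable flush threshold (see LSM-002)
    config.memtable_size_threshold = 64 * 1024 * 1024; // 64 MB

    // Compaction strategy (see COMPACT-001 / COMPACT-002)
    config.compaction_strategy = CompactionStrategy::SizeTiered {
        min_sstables: 4,
        size_ratio: 2.0,
    };

    // Tombstone GC delay (see GC-001..GC-004)
    config.gc_grace_seconds = 864_000; // 10 days

    config
}
```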
1. LSM-Tree Write Path Tests
```rust
use heliosdb_storage::lsm::*;
use std::time::Duration;

#[tokio::test]
async fn test_write_commit_log_durability() {
    // LSM-001: Writes are durable after commit log write
    let engine = LsmStorageEngine::new_test().await;

    // Write data
    for i in 0..100 {
        engine.write(
            format!("key_{}", i).into_bytes(),
            format!("value_{}", i).into_bytes(),
        ).await.unwrap();
    }

    // Force crash before memtable flush
    let engine_path = engine.path().clone();
    drop(engine);

    // Restart engine
    let engine = LsmStorageEngine::open(engine_path).await.unwrap();

    // Verify all writes recovered from commit log
    for i in 0..100 {
        let key = format!("key_{}", i).into_bytes();
        let value = engine.read(&key).await.unwrap();
        assert_eq!(
            value,
            Some(format!("value_{}", i).into_bytes()),
            "Key key_{} not recovered from commit log",
            i
        );
    }
}

#[tokio::test]
async fn test_memtable_flush() {
    // LSM-002: Memtable flushes to SSTable correctly
    let mut config = LsmConfig::default();
    config.memtable_size_threshold = 1024 * 1024; // 1 MB

    let engine = LsmStorageEngine::with_config(config).await;

    // Write enough data to trigger flush
    let value = vec![0u8; 1024]; // 1 KB per value
    for i in 0..2000 {
        engine.write(
            format!("key_{:06}", i).into_bytes(),
            value.clone(),
        ).await.unwrap();
    }

    // Wait for flush
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Verify an SSTable was created
    let sstables = engine.list_sstables().await;
    assert!(!sstables.is_empty(), "No SSTables created");

    // Verify data is readable
    for i in 0..2000 {
        let key = format!("key_{:06}", i).into_bytes();
        let result = engine.read(&key).await.unwrap();
        assert!(result.is_some(), "Key key_{:06} missing after flush", i);
    }
}

#[tokio::test]
async fn test_bloom_filter_effectiveness() {
    // LSM-003: Bloom filters reduce unnecessary SSTable reads
    let engine = LsmStorageEngine::new_test().await;

    // Insert 10,000 keys and flush
    for i in 0..10_000 {
        engine.write(
            format!("exists_{}", i).into_bytes(),
            b"value".to_vec(),
        ).await.unwrap();
    }
    engine.flush_memtable().await.unwrap();

    // Query non-existent keys
    let mut false_positive_count = 0;
    let mut bloom_filter_hits = 0;

    for i in 0..10_000 {
        let key = format!("nonexistent_{}", i).into_bytes();
        let metrics = engine.read_with_metrics(&key).await;

        if metrics.bloom_filter_rejected {
            bloom_filter_hits += 1;
        }
        if metrics.sstable_reads > 0 && metrics.result.is_none() {
            false_positive_count += 1;
        }
    }

    // Bloom filter should reject most queries
    let rejection_rate = bloom_filter_hits as f64 / 10_000.0;
    println!("Bloom filter rejection rate: {:.2}%", rejection_rate * 100.0);
    assert!(rejection_rate > 0.95, "Bloom filter rejection rate too low");

    // False positive rate should be low (< 2%)
    let fp_rate = false_positive_count as f64 / 10_000.0;
    println!("False positive rate: {:.2}%", fp_rate * 100.0);
    assert!(fp_rate < 0.02, "Bloom filter false positive rate too high");
}

#[tokio::test]
async fn test_write_amplification() {
    // LSM-004: Write amplification is within acceptable bounds
    let engine = LsmStorageEngine::new_test().await;

    // Write ~100 MB of data
    let value = vec![0u8; 1024]; // 1 KB
    for i in 0..100_000 {
        engine.write(
            format!("key_{}", i).into_bytes(),
            value.clone(),
        ).await.unwrap();
    }

    // Trigger compaction
    engine.compact_all().await.unwrap();

    let metrics = engine.get_metrics().await;
    let write_amp = metrics.total_bytes_written as f64 / metrics.user_bytes_written as f64;

    println!("Write amplification: {:.2}x", write_amp);

    // Write amplification should be < 10x for LSM with good compaction
    assert!(write_amp < 10.0, "Write amplification too high: {:.2}x", write_amp);
}
```

2. Compaction Correctness Tests
```rust
use heliosdb_storage::lsm::compaction::*;

#[tokio::test]
async fn test_size_tiered_compaction() {
    // COMPACT-001: Size-tiered compaction merges similar-sized SSTables
    let mut config = LsmConfig::default();
    config.compaction_strategy = CompactionStrategy::SizeTiered {
        min_sstables: 4,
        size_ratio: 2.0,
    };

    let engine = LsmStorageEngine::with_config(config).await;

    // Create multiple SSTables of similar size
    for batch in 0..8 {
        for i in 0..1000 {
            engine.write(
                format!("key_{}_{}", batch, i).into_bytes(),
                b"value".to_vec(),
            ).await.unwrap();
        }
        engine.flush_memtable().await.unwrap();
    }

    let initial_count = engine.list_sstables().await.len();
    assert_eq!(initial_count, 8);

    // Trigger compaction
    engine.compact().await.unwrap();

    // Should have fewer SSTables
    let final_count = engine.list_sstables().await.len();
    assert!(final_count < initial_count, "Compaction did not reduce SSTable count");

    // Verify all data still readable
    for batch in 0..8 {
        for i in 0..1000 {
            let key = format!("key_{}_{}", batch, i).into_bytes();
            assert!(engine.read(&key).await.unwrap().is_some());
        }
    }
}

#[tokio::test]
async fn test_leveled_compaction() {
    // COMPACT-002: Leveled compaction maintains sorted runs
    let mut config = LsmConfig::default();
    config.compaction_strategy = CompactionStrategy::Leveled {
        level_size_multiplier: 10,
        max_level_0_files: 4,
    };

    let engine = LsmStorageEngine::with_config(config).await;

    // Write data with overlapping key ranges
    for round in 0..10 {
        for i in 0..1000 {
            engine.write(
                format!("key_{:06}", i).into_bytes(),
                format!("value_round_{}", round).into_bytes(),
            ).await.unwrap();
        }
        engine.flush_memtable().await.unwrap();
    }

    // Trigger compaction
    engine.compact_all().await.unwrap();

    // Verify data organized into levels
    let levels = engine.get_level_info().await;
    println!("Level structure: {:?}", levels);

    // Level 0 should have few files
    assert!(levels[0].file_count <= 4, "Too many L0 files");

    // Each level should be larger than previous
    for i in 1..levels.len() {
        if levels[i].file_count > 0 {
            assert!(
                levels[i].total_size >= levels[i - 1].total_size,
                "Level {} smaller than level {}",
                i,
                i - 1
            );
        }
    }

    // Verify latest values
    for i in 0..1000 {
        let key = format!("key_{:06}", i).into_bytes();
        let value = engine.read(&key).await.unwrap().unwrap();
        assert_eq!(value, b"value_round_9");
    }
}

#[tokio::test]
async fn test_compaction_removes_old_versions() {
    // COMPACT-003: Compaction removes old versions of keys
    let engine = LsmStorageEngine::new_test().await;

    // Write multiple versions
    for version in 0..10 {
        for i in 0..100 {
            engine.write_with_timestamp(
                format!("key_{}", i).into_bytes(),
                format!("value_v{}", version).into_bytes(),
                version * 1000,
            ).await.unwrap();
        }
        engine.flush_memtable().await.unwrap();
    }

    let size_before = engine.total_size().await;

    // Compact all
    engine.compact_all().await.unwrap();

    let size_after = engine.total_size().await;

    // Size should be much smaller (old versions removed)
    let reduction = (size_before - size_after) as f64 / size_before as f64;
    println!("Size reduction: {:.2}%", reduction * 100.0);
    assert!(reduction > 0.8, "Compaction did not remove old versions");

    // Verify latest versions still accessible
    for i in 0..100 {
        let key = format!("key_{}", i).into_bytes();
        let value = engine.read(&key).await.unwrap().unwrap();
        assert_eq!(value, b"value_v9");
    }
}
```

3. Tombstone and Deletion Tests
```rust
use heliosdb_storage::lsm::tombstone::*;

#[tokio::test]
async fn test_delete_creates_tombstone() {
    // TOMB-001: DELETE operation creates tombstone marker
    let engine = LsmStorageEngine::new_test().await;

    // Write and delete
    engine.write(b"key1", b"value1").await.unwrap();
    engine.delete(b"key1").await.unwrap();

    // Read should return None
    let result = engine.read(b"key1").await.unwrap();
    assert_eq!(result, None);

    // Tombstone should exist in memtable
    assert!(engine.tombstone_exists(b"key1").await);
}

#[tokio::test]
async fn test_tombstone_shadows_older_writes() {
    // TOMB-002: Tombstone shadows all older versions
    let engine = LsmStorageEngine::new_test().await;

    // Write v1 and flush
    engine.write_with_timestamp(b"key", b"v1", 100).await.unwrap();
    engine.flush_memtable().await.unwrap();

    // Write v2 and flush
    engine.write_with_timestamp(b"key", b"v2", 200).await.unwrap();
    engine.flush_memtable().await.unwrap();

    // Delete (tombstone at t=300)
    engine.delete_with_timestamp(b"key", 300).await.unwrap();

    // Read should return None (tombstone shadows all older versions)
    let result = engine.read(b"key").await.unwrap();
    assert_eq!(result, None);

    // The older versions still exist in SSTables, but the tombstone is newest
    let internal_versions = engine.get_all_versions(b"key").await;
    assert_eq!(internal_versions.len(), 3); // v1, v2, tombstone

    let latest = &internal_versions[0];
    assert!(latest.is_tombstone);
}

#[tokio::test]
async fn test_multiple_deletes() {
    // TOMB-003: Multiple deletes handled correctly
    let engine = LsmStorageEngine::new_test().await;

    engine.write(b"key", b"value").await.unwrap();
    engine.delete(b"key").await.unwrap();
    engine.delete(b"key").await.unwrap(); // Delete again

    let result = engine.read(b"key").await.unwrap();
    assert_eq!(result, None);

    // Should have tombstones but not error
    assert!(engine.tombstone_exists(b"key").await);
}

#[tokio::test]
async fn test_write_after_delete() {
    // TOMB-004: Write after delete resurrects key
    let engine = LsmStorageEngine::new_test().await;

    // Write, delete, write
    engine.write_with_timestamp(b"key", b"v1", 100).await.unwrap();
    engine.delete_with_timestamp(b"key", 200).await.unwrap();
    engine.write_with_timestamp(b"key", b"v2", 300).await.unwrap();

    // Should return latest write
    let result = engine.read(b"key").await.unwrap();
    assert_eq!(result, Some(b"v2".to_vec()));
}
```

4. gc_grace_seconds Tests
```rust
use heliosdb_storage::lsm::gc::*;
use std::time::Duration;

#[tokio::test]
async fn test_tombstone_retained_during_grace_period() {
    // GC-001: Tombstones retained during gc_grace_seconds
    let mut config = LsmConfig::default();
    config.gc_grace_seconds = 10; // 10 seconds

    let engine = LsmStorageEngine::with_config(config).await;

    // Write and delete
    engine.write(b"key1", b"value1").await.unwrap();
    engine.delete(b"key1").await.unwrap();
    engine.flush_memtable().await.unwrap();

    // Compact immediately (within grace period)
    engine.compact_all().await.unwrap();

    // Tombstone should still exist
    assert!(
        engine.tombstone_exists_in_sstables(b"key1").await,
        "Tombstone removed before gc_grace_seconds elapsed"
    );
}

#[tokio::test]
async fn test_tombstone_gc_after_grace_period() {
    // GC-002: Tombstones garbage collected after gc_grace_seconds
    let mut config = LsmConfig::default();
    config.gc_grace_seconds = 2; // 2 seconds for faster testing

    let engine = LsmStorageEngine::with_config(config).await;

    // Write and delete
    engine.write(b"key1", b"value1").await.unwrap();
    engine.delete(b"key1").await.unwrap();
    engine.flush_memtable().await.unwrap();

    // Wait past gc_grace_seconds
    tokio::time::sleep(Duration::from_secs(3)).await;

    // Compact
    engine.compact_all().await.unwrap();

    // Tombstone should be removed
    assert!(
        !engine.tombstone_exists_in_sstables(b"key1").await,
        "Tombstone not garbage collected after gc_grace_seconds"
    );
}

#[tokio::test]
async fn test_delete_consistency_with_gc_grace() {
    // GC-003: gc_grace_seconds prevents zombie records in distributed system
    let mut config = LsmConfig::default();
    config.gc_grace_seconds = 30;

    // Simulate distributed scenario
    let primary = LsmStorageEngine::with_config(config.clone()).await;
    let mirror = LsmStorageEngine::with_config(config.clone()).await;

    // Write to both
    primary.write(b"key", b"value").await.unwrap();
    mirror.write(b"key", b"value").await.unwrap();

    // Delete on primary
    primary.delete(b"key").await.unwrap();
    primary.flush_memtable().await.unwrap();

    // Mirror is temporarily offline (misses delete)
    // Simulate by not replicating delete

    // Primary compacts (but keeps tombstone due to gc_grace)
    primary.compact_all().await.unwrap();
    assert!(primary.tombstone_exists_in_sstables(b"key").await);

    // Mirror comes back online and receives tombstone
    mirror.delete(b"key").await.unwrap();
    mirror.flush_memtable().await.unwrap();

    // Both should return None
    assert_eq!(primary.read(b"key").await.unwrap(), None);
    assert_eq!(mirror.read(b"key").await.unwrap(), None);

    // Wait past gc_grace
    tokio::time::sleep(Duration::from_secs(31)).await;

    // Now both can safely remove tombstone
    primary.compact_all().await.unwrap();
    mirror.compact_all().await.unwrap();

    assert!(!primary.tombstone_exists_in_sstables(b"key").await);
    assert!(!mirror.tombstone_exists_in_sstables(b"key").await);
}

#[tokio::test]
async fn test_gc_grace_configurable_per_table() {
    // GC-004: gc_grace_seconds configurable per table
    let engine = LsmStorageEngine::new_test().await;

    // Table with short gc_grace (archival data)
    engine.create_table("archive", TableConfig {
        gc_grace_seconds: 1,
        ..Default::default()
    }).await.unwrap();

    // Table with long gc_grace (critical data)
    engine.create_table("critical", TableConfig {
        gc_grace_seconds: 864000, // 10 days
        ..Default::default()
    }).await.unwrap();

    // Delete from both
    engine.write_to_table("archive", b"key1", b"val").await.unwrap();
    engine.delete_from_table("archive", b"key1").await.unwrap();

    engine.write_to_table("critical", b"key2", b"val").await.unwrap();
    engine.delete_from_table("critical", b"key2").await.unwrap();

    engine.flush_all().await.unwrap();

    // Wait 2 seconds
    tokio::time::sleep(Duration::from_secs(2)).await;
    engine.compact_all().await.unwrap();

    // Archive tombstone should be GC'd
    assert!(!engine.tombstone_exists_in_table("archive", b"key1").await);

    // Critical tombstone should be retained
    assert!(engine.tombstone_exists_in_table("critical", b"key2").await);
}
```

5. MVCC and Version Management Tests
```rust
use heliosdb_storage::lsm::mvcc::*;

#[tokio::test]
async fn test_timestamp_ordering() {
    // MVCC-001: Timestamps determine value visibility
    let engine = LsmStorageEngine::new_test().await;

    // Write versions with explicit timestamps
    engine.write_with_timestamp(b"key", b"v1", 1000).await.unwrap();
    engine.write_with_timestamp(b"key", b"v2", 2000).await.unwrap();
    engine.write_with_timestamp(b"key", b"v3", 3000).await.unwrap();

    // Read at different timestamps
    let v1 = engine.read_at_timestamp(b"key", 1500).await.unwrap();
    let v2 = engine.read_at_timestamp(b"key", 2500).await.unwrap();
    let v3 = engine.read_at_timestamp(b"key", 3500).await.unwrap();

    assert_eq!(v1, Some(b"v1".to_vec()));
    assert_eq!(v2, Some(b"v2".to_vec()));
    assert_eq!(v3, Some(b"v3".to_vec()));
}

#[tokio::test]
async fn test_snapshot_isolation() {
    // MVCC-002: Snapshot isolation for consistent reads
    let engine = LsmStorageEngine::new_test().await;

    // Insert initial data
    for i in 0..100 {
        engine.write_with_timestamp(
            format!("key_{}", i).into_bytes(),
            b"v1".to_vec(),
            1000,
        ).await.unwrap();
    }

    // Create snapshot
    let snapshot = engine.create_snapshot(1500).await;

    // Update data after snapshot
    for i in 0..100 {
        engine.write_with_timestamp(
            format!("key_{}", i).into_bytes(),
            b"v2".to_vec(),
            2000,
        ).await.unwrap();
    }

    // Reads from snapshot should see old values
    for i in 0..100 {
        let key = format!("key_{}", i).into_bytes();
        let value = snapshot.read(&key).await.unwrap();
        assert_eq!(value, Some(b"v1".to_vec()));
    }

    // Normal reads should see new values
    for i in 0..100 {
        let key = format!("key_{}", i).into_bytes();
        let value = engine.read(&key).await.unwrap();
        assert_eq!(value, Some(b"v2".to_vec()));
    }
}

#[tokio::test]
async fn test_version_garbage_collection() {
    // MVCC-003: Old versions removed when no longer needed
    let engine = LsmStorageEngine::new_test().await;

    // Write many versions
    for v in 0..100 {
        engine.write_with_timestamp(b"key", format!("v{}", v).into_bytes(), v * 100)
            .await
            .unwrap();
    }

    engine.flush_memtable().await.unwrap();

    // Create snapshot at t=5000
    let snapshot = engine.create_snapshot(5000).await;

    // Compact (should keep versions needed by snapshot)
    engine.compact_all().await.unwrap();

    let versions = engine.get_all_versions(b"key").await;

    // Should have versions from t=5000 onwards
    assert!(versions.len() < 100, "Old versions not removed");

    // Snapshot should still work
    let value = snapshot.read(b"key").await.unwrap();
    assert_eq!(value, Some(b"v50".to_vec()));

    // Drop snapshot
    drop(snapshot);

    // Compact again
    engine.compact_all().await.unwrap();

    // Now should only have latest version
    let versions = engine.get_all_versions(b"key").await;
    assert_eq!(versions.len(), 1);
    assert_eq!(versions[0].value, b"v99");
}
```

6. HCC (Hybrid Columnar Compression) Tests
```rust
use heliosdb_storage::hcc::*;
use std::time::Duration;

#[tokio::test]
async fn test_hcc_compression_ratio() {
    // HCC-001: Verify high compression ratio for columnar data
    let mut config = HccConfig::default();
    config.compression_mode = HccMode::WarehouseOptimized;
    config.compression_unit_size = 10_000; // 10k rows per CU

    let storage = HccStorage::with_config(config).await;

    // Insert low-cardinality data (good compression)
    let statuses = vec!["active", "inactive", "pending"];
    for i in 0..100_000 {
        storage.insert(vec![
            ("id", Value::BigInt(i)),
            ("status", Value::VarChar(statuses[i as usize % 3].to_string())),
            ("value", Value::Integer(i % 100)),
        ]).await.unwrap();
    }

    let raw_size = storage.estimate_raw_size().await;
    let compressed_size = storage.actual_size().await;
    let ratio = raw_size as f64 / compressed_size as f64;

    println!("HCC compression ratio: {:.2}x", ratio);
    assert!(ratio > 6.0, "Compression ratio below 6x for warehouse mode");
}

#[tokio::test]
async fn test_hcc_column_projection() {
    // HCC-002: Only decompress projected columns
    let storage = HccStorage::new_test().await;

    // Insert wide rows (20 columns); column names are built up front
    // so the row tuples can borrow them
    let col_names: Vec<String> = (0..19).map(|col| format!("col_{}", col)).collect();
    for i in 0..10_000 {
        let mut row = vec![("id", Value::BigInt(i))];
        for col in 0..19 {
            row.push((
                col_names[col].as_str(),
                Value::VarChar(format!("data_{}", i)),
            ));
        }
        storage.insert(row).await.unwrap();
    }

    // Query with column projection (only 2 columns)
    let metrics = storage.query_with_metrics(
        "SELECT id, col_5 FROM table WHERE id < 1000",
        &[],
    ).await.unwrap();

    // Verify only 2 columns decompressed
    assert_eq!(metrics.columns_decompressed, 2);

    // Should scan much less data than a full table scan
    let full_scan_bytes = storage.estimate_scan_size(10_000, 20).await;
    let projected_scan_bytes = metrics.bytes_decompressed;

    let savings = 1.0 - (projected_scan_bytes as f64 / full_scan_bytes as f64);
    println!("I/O savings from projection: {:.1}%", savings * 100.0);
    assert!(savings > 0.8, "Column projection not effective");
}

#[tokio::test]
async fn test_hcc_update_overhead() {
    // HCC-003: Updates on HCC data require CU-level rewrite
    let storage = HccStorage::new_test().await;

    // Insert data
    for i in 0..10_000 {
        storage.insert(vec![
            ("id", Value::BigInt(i)),
            ("value", Value::Integer(i * 2)),
        ]).await.unwrap();
    }

    // Flush to HCC format
    storage.flush_to_hcc().await.unwrap();

    // Update a single row
    let start = std::time::Instant::now();
    storage.execute("UPDATE table SET value = 999 WHERE id = 5000").await.unwrap();
    let update_duration = start.elapsed();

    println!("HCC update latency: {:?}", update_duration);

    // Update should be slow (decompress + modify + recompress CU)
    assert!(
        update_duration > Duration::from_millis(10),
        "HCC update unexpectedly fast"
    );

    // Verify the update succeeded
    let result = storage.query("SELECT value FROM table WHERE id = 5000", &[])
        .await
        .unwrap();
    assert_eq!(result.rows[0].get::<i32>("value"), 999);
}
```

Test Execution
```bash
# Run all data integrity tests
cargo test --test data_integrity

# Run specific categories
cargo test --test lsm_write_path
cargo test --test compaction
cargo test --test tombstone
cargo test --test gc_grace
cargo test --test mvcc
cargo test --test hcc

# Run with detailed metrics
RUST_LOG=info cargo test --test data_integrity -- --nocapture --test-threads=1
```

Quality Metrics
Success Criteria
- All writes recoverable from commit log
- Compaction reduces size by >80% for multi-version data
- Bloom filter false positive rate < 2%
- Tombstones respect gc_grace_seconds
- MVCC provides snapshot isolation
- HCC compression ratio > 6x for warehouse mode
Performance Targets
- Write amplification: < 10x
- Compaction throughput: > 100 MB/s
- HCC compression ratio: > 6x (warehouse), > 10x (archive)
- Column projection I/O savings: > 80%
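The targets above map directly onto metrics already computed in the tests: write amplification in LSM-004 (total bytes written / user bytes written), compression ratio in HCC-001 (raw size / compressed size), and projection savings in HCC-002. As a rough sketch, a shared helper could assert them in one place; the function and its parameters below are hypothetical, not part of the HeliosDB API.

```rust
/// Hypothetical helper, not part of the HeliosDB API: asserts the performance
/// targets above, given values computed as in LSM-004, HCC-001, and HCC-002.
fn assert_performance_targets(write_amp: f64, warehouse_ratio: f64, projection_savings: f64) {
    // Write amplification: total bytes written / user bytes written (LSM-004)
    assert!(write_amp < 10.0, "write amplification too high: {:.2}x", write_amp);
    // Warehouse-mode compression: raw size / compressed size (HCC-001)
    assert!(warehouse_ratio > 6.0, "warehouse compression ratio too low: {:.2}x", warehouse_ratio);
    // Column projection I/O savings as a fraction (HCC-002)
    assert!(projection_savings > 0.8, "projection savings too low: {:.1}%", projection_savings * 100.0);
}
```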