# HeliosDB Test Framework Design
## Overview
This document defines the comprehensive testing strategy for HeliosDB, a next-generation distributed HTAP database. The testing framework is designed to validate protocol compatibility, distributed correctness, data integrity, vector search accuracy, and transactional semantics.
## Testing Architecture

### Test Pyramid
```
           /\
          /E2E\         <- Few, high-value end-to-end tests
         /------\
        / Chaos  \      <- Fault injection and resilience tests
       /----------\
      /Integration \    <- Multi-node distributed scenarios
     /--------------\
    /      Unit      \  <- Many, fast, focused component tests
   /------------------\
```

### Language Strategy
- Core Test Framework: Rust (for performance-critical tests)
- Protocol Compatibility Tests: Python (for client driver validation)
- Chaos Tests: Python/Rust hybrid
- CI/CD Integration: GitHub Actions / GitLab CI
## Test Categories

### 1. Unit Tests (Rust)
Location: `*/tests/` and `*/src/` (inline)
Coverage Requirements:
- Statements: >85%
- Branches: >80%
- Functions: >85%
Key Components:
```rust
#[cfg(test)]
mod tests {
    use heliosdb_storage::lsm::LsmStorageEngine;

    #[tokio::test]
    async fn test_write_read_cycle() {
        // Test LSM write path: Commit Log -> Memtable
        let engine = LsmStorageEngine::new_test().await;
        let key = b"test_key".to_vec();
        let value = b"test_value".to_vec();

        engine.write(key.clone(), value.clone()).await.unwrap();
        let result = engine.read(&key).await.unwrap();

        assert_eq!(result, Some(value));
    }

    #[tokio::test]
    async fn test_tombstone_handling() {
        // Verify DELETE creates a tombstone
        let engine = LsmStorageEngine::new_test().await;
        let key = b"delete_me".to_vec();

        engine.write(key.clone(), b"value".to_vec()).await.unwrap();
        engine.delete(&key).await.unwrap();

        let result = engine.read(&key).await.unwrap();
        assert_eq!(result, None); // Should return None due to tombstone
    }
}
```

### 2. Integration Tests (Multi-Node)
Location: tests/integration/
Purpose: Validate distributed behavior across compute and storage nodes
Key Scenarios:
```rust
#[tokio::test]
async fn test_cross_shard_query() {
    // Setup: 3 storage nodes, 2 compute nodes
    let cluster = TestCluster::builder()
        .storage_nodes(3)
        .compute_nodes(2)
        .build()
        .await;

    // Insert data across shards
    cluster.insert_sharded_data("users", 10_000).await;

    // Execute cross-shard join
    let result = cluster
        .query("SELECT u.name, o.total FROM users u JOIN orders o ON u.id = o.user_id")
        .await
        .unwrap();

    assert!(result.rows.len() > 0);
}

#[tokio::test]
async fn test_predicate_pushdown() {
    let cluster = TestCluster::new().await;

    // Insert 1M rows
    cluster.insert_test_data(1_000_000).await;

    // Query with predicate; metrics are collected alongside the result
    let (_result, metrics) = cluster
        .query_with_metrics("SELECT * FROM data WHERE value > 500000")
        .await
        .unwrap();

    // Verify pushdown occurred (should scan much less than 1M rows)
    assert!(metrics.rows_scanned < 600_000);
    assert!(metrics.bytes_transferred < cluster.total_bytes() / 2);
}
```

### 3. Protocol Compatibility Tests (Python)
Location: tests/protocol/
Must-Pass CI Tests: All Python clients from 01_PROTOCOL_TEST_MATRIX.md
```python
import psycopg2
import pytest

# Connection string shared by the tests below
DSN = "host=localhost port=5432 dbname=test user=test password=test sslmode=require"


@pytest.mark.protocol
def test_psycopg2_connect():
    """PostgreSQL protocol: Connect via psycopg2"""
    conn = psycopg2.connect(
        host="localhost",
        port=5432,
        dbname="test",
        user="test",
        password="test",
        sslmode="require",
    )
    cur = conn.cursor()
    cur.execute("SELECT 1")
    assert cur.fetchone()[0] == 1
    conn.close()


@pytest.mark.protocol
def test_psycopg2_prepared_statement():
    """PostgreSQL protocol: Prepared statements with parameters"""
    conn = psycopg2.connect(DSN)
    cur = conn.cursor()

    cur.execute("INSERT INTO users (id, name) VALUES (%s, %s)", (1, "Alice"))
    conn.commit()

    cur.execute("SELECT name FROM users WHERE id = %s", (1,))
    assert cur.fetchone()[0] == "Alice"
    conn.close()


@pytest.mark.protocol
def test_psycopg2_transaction():
    """PostgreSQL protocol: BEGIN/COMMIT/ROLLBACK"""
    conn = psycopg2.connect(DSN)
    cur = conn.cursor()

    conn.autocommit = False
    cur.execute("INSERT INTO test (id) VALUES (100)")
    conn.rollback()

    cur.execute("SELECT COUNT(*) FROM test WHERE id = 100")
    assert cur.fetchone()[0] == 0
    conn.close()


@pytest.mark.protocol
def test_psycopg2_cursor_iteration():
    """PostgreSQL protocol: Server-side cursors"""
    conn = psycopg2.connect(DSN)
    cur = conn.cursor(name="server_cursor")

    cur.execute("SELECT id FROM large_table")
    batch = cur.fetchmany(100)
    assert len(batch) == 100
    conn.close()
```

### 4. Distributed Correctness Tests
Location: tests/distributed/
Focus Areas:
- Raft consensus correctness
- Shard rebalancing without data loss (see the sketch after the examples below)
- Primary-mirror synchronous replication
- Witness-based quorum and split-brain prevention
- Cache invalidation across compute nodes (also sketched below)
```rust
#[tokio::test]
async fn test_raft_leader_election() {
    let metadata_cluster = MetadataCluster::new(3).await;

    // Kill the current leader
    let leader_id = metadata_cluster.current_leader().await;
    metadata_cluster.kill_node(leader_id).await;

    // Wait for a new leader election
    tokio::time::sleep(Duration::from_secs(5)).await;

    let new_leader = metadata_cluster.current_leader().await;
    assert_ne!(leader_id, new_leader);

    // Verify state machine consistency
    let state = metadata_cluster.get_state(new_leader).await;
    assert!(state.is_consistent());
}

#[tokio::test]
async fn test_synchronous_replication() {
    let cluster = TestCluster::new().await;

    // Write data to the primary
    cluster.write_to_shard(0, "key1", "value1").await.unwrap();

    // Immediately kill the primary (before an async mirror could have caught up)
    cluster.kill_primary(0).await;

    // Promote the mirror
    cluster.promote_mirror(0).await;

    // Verify the data exists on the new primary (it was synchronously replicated)
    let value = cluster.read_from_shard(0, "key1").await.unwrap();
    assert_eq!(value, "value1");
}

#[tokio::test]
async fn test_split_brain_prevention() {
    let cluster = TestCluster::with_witness(2, 1).await;

    // Partition the network between primary and mirror
    cluster.partition_network(vec![0], vec![1]).await;

    // Mirror should not self-promote (no witness quorum)
    tokio::time::sleep(Duration::from_secs(10)).await;
    assert!(!cluster.is_primary(1).await);

    // Partition the mirror from the witness too
    cluster.partition_network(vec![0, 2], vec![1]).await;

    // Primary + witness still have quorum, so the primary stays active
    assert!(cluster.is_primary(0).await);
}
```
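The remaining focus areas, shard rebalancing and cache invalidation, can be exercised in the same style. A minimal sketch, assuming hypothetical `TestCluster` helpers (`checksum_all_rows`, `add_storage_node`, `wait_for_rebalance`, `query_on`, and `execute_on` are illustrative, not existing APIs):

```rust
#[tokio::test]
async fn test_rebalance_preserves_data() {
    // Hypothetical sketch: the helpers below are illustrative, not part of
    // the current TestCluster API.
    let cluster = TestCluster::builder().storage_nodes(3).build().await;
    cluster.insert_test_data(100_000).await;
    let before = cluster.checksum_all_rows().await;

    // Adding a node triggers shard rebalancing
    cluster.add_storage_node().await;
    cluster.wait_for_rebalance().await;

    // Every row must survive the move, byte for byte
    let after = cluster.checksum_all_rows().await;
    assert_eq!(before, after);
}

#[tokio::test]
async fn test_cross_node_cache_invalidation() {
    let cluster = TestCluster::builder().compute_nodes(2).build().await;

    // Warm compute node 1's cache, then write through compute node 0
    cluster.query_on(1, "SELECT value FROM test WHERE id = 1").await.unwrap();
    cluster.execute_on(0, "UPDATE test SET value = 42 WHERE id = 1").await.unwrap();

    // Node 1 must not serve the stale cached value
    let result = cluster.query_on(1, "SELECT value FROM test WHERE id = 1").await.unwrap();
    assert_eq!(result.rows[0].get::<i64>("value"), 42);
}
```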
### 5. Data Integrity Tests

Location: tests/data_integrity/
Focus: LSM-tree compaction, tombstone handling, gc_grace_seconds
```rust
#[tokio::test]
async fn test_tombstone_gc_grace_period() {
    let mut config = LsmConfig::default();
    config.gc_grace_seconds = 10; // shortened to 10 seconds for testing

    let engine = LsmStorageEngine::with_config(config).await;

    // Write and delete
    engine.write(b"key1".to_vec(), b"value1".to_vec()).await.unwrap();
    engine.delete(b"key1").await.unwrap();

    // Trigger compaction immediately
    engine.compact().await.unwrap();

    // Tombstone should still exist (within gc_grace_seconds)
    assert!(engine.tombstone_exists(b"key1").await);

    // Wait past gc_grace_seconds
    tokio::time::sleep(Duration::from_secs(11)).await;
    engine.compact().await.unwrap();

    // Tombstone should now be garbage collected
    assert!(!engine.tombstone_exists(b"key1").await);
}

#[tokio::test]
async fn test_multi_version_read() {
    let engine = LsmStorageEngine::new_test().await;

    // Write v1, then v2 at a later timestamp
    engine.write_with_timestamp(b"key".to_vec(), b"v1".to_vec(), 100).await.unwrap();
    engine.write_with_timestamp(b"key".to_vec(), b"v2".to_vec(), 200).await.unwrap();

    // Flush to create an SSTable containing both versions
    engine.flush_memtable().await.unwrap();

    // Read latest
    let latest = engine.read(b"key").await.unwrap();
    assert_eq!(latest, Some(b"v2".to_vec()));

    // Trigger compaction
    engine.compact().await.unwrap();

    // Only v2 should remain after compaction
    let post_compact = engine.read(b"key").await.unwrap();
    assert_eq!(post_compact, Some(b"v2".to_vec()));
}
```

### 6. Vector Search Accuracy Tests
Location: tests/vector/
Metrics: Recall@K, precision, latency
```rust
#[tokio::test]
async fn test_filtered_hnsw_recall() {
    let index = HNSWIndex::new(384, 16, 200).await; // dims=384, M=16, efConstruction=200

    // Insert 100k vectors with metadata
    let dataset = generate_test_vectors(100_000, 384);
    for (i, vec) in dataset.iter().enumerate() {
        index.insert(i as u64, vec, vec![
            ("category", format!("cat_{}", i % 10)),
            ("price", (i % 1000).to_string()),
        ]).await.unwrap();
    }

    // Query with a metadata filter
    let query_vec = dataset[0].clone();
    let filters = vec![
        Filter::Equals("category", "cat_0"),
        Filter::LessThan("price", "500"),
    ];
    let results = index
        .search(&query_vec, 10 /* top_k */, Some(filters.clone()))
        .await
        .unwrap();

    // Calculate recall vs. brute force under the same filters
    let ground_truth = brute_force_search(&dataset, &query_vec, 10, &filters);
    let recall = calculate_recall(&results, &ground_truth);

    assert!(recall > 0.95, "Recall too low: {}", recall);
}

#[tokio::test]
async fn test_vector_storage_toast() {
    let storage = VectorStorage::new().await;

    // Small vector (should be stored inline)
    let small_vec = vec![0.1f32; 128]; // 512 bytes
    storage.insert(1, &small_vec, StorageHint::Plain).await.unwrap();
    assert!(storage.is_inline(1).await);

    // Large vector (should be stored out-of-line with EXTERNAL)
    let large_vec = vec![0.1f32; 1536]; // 6144 bytes
    storage.insert(2, &large_vec, StorageHint::External).await.unwrap();
    assert!(!storage.is_inline(2).await);

    // Verify retrieval correctness
    let retrieved = storage.get(2).await.unwrap();
    assert_eq!(retrieved, large_vec);
}
```
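Recall@K here is simply the overlap between the index's top-K and the brute-force top-K computed under the same filters. A minimal sketch of `calculate_recall`, assuming result entries expose a vector `id` field (the `SearchResult` shape is an assumption):

```rust
use std::collections::HashSet;

/// Recall@K = |approximate results ∩ ground truth| / K.
/// Assumes both slices are top-K result lists exposing an `id` field.
fn calculate_recall(results: &[SearchResult], ground_truth: &[SearchResult]) -> f64 {
    let truth_ids: HashSet<u64> = ground_truth.iter().map(|r| r.id).collect();
    let hits = results.iter().filter(|r| truth_ids.contains(&r.id)).count();
    hits as f64 / ground_truth.len() as f64
}
```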
### 7. ACID Compliance Tests

Location: tests/transactions/
Focus: Atomicity, Consistency, Isolation, Durability
```rust
#[tokio::test]
async fn test_atomicity_distributed() {
    let cluster = TestCluster::new().await;

    // Multi-shard transaction
    let tx = cluster.begin_transaction().await;

    tx.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')").await.unwrap();
    tx.execute("INSERT INTO orders (user_id, total) VALUES (1, 100)").await.unwrap();

    // Simulate node failure mid-transaction
    cluster.kill_compute_node(0).await;

    // Transaction should be rolled back on both shards
    let users = cluster.query("SELECT * FROM users WHERE id = 1").await.unwrap();
    let orders = cluster.query("SELECT * FROM orders WHERE user_id = 1").await.unwrap();

    assert_eq!(users.rows.len(), 0);
    assert_eq!(orders.rows.len(), 0);
}

#[tokio::test]
async fn test_isolation_read_committed() {
    let cluster = TestCluster::new().await;

    // TX1: Begin and write, but do not commit yet
    let tx1 = cluster.begin_transaction().await;
    tx1.execute("INSERT INTO test (id, value) VALUES (1, 100)").await.unwrap();

    // TX2: Should not see uncommitted data
    let tx2 = cluster.begin_transaction().await;
    let result = tx2.query("SELECT value FROM test WHERE id = 1").await.unwrap();
    assert_eq!(result.rows.len(), 0);

    // TX1: Commit
    tx1.commit().await.unwrap();

    // TX2: Under READ COMMITTED, the next statement sees the committed row
    let result = tx2.query("SELECT value FROM test WHERE id = 1").await.unwrap();
    assert_eq!(result.rows.len(), 1);
}

#[tokio::test]
async fn test_durability() {
    let cluster = TestCluster::new().await;

    cluster.execute("INSERT INTO durable (id, data) VALUES (1, 'critical')").await.unwrap();

    // Immediate cluster crash (all nodes)
    cluster.crash_all().await;

    // Restart cluster
    let cluster = TestCluster::restart(cluster.id).await;

    // Data should be recovered from the commit log
    let result = cluster.query("SELECT data FROM durable WHERE id = 1").await.unwrap();
    assert_eq!(result.rows[0].get::<String>("data"), "critical");
}
```
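The examples above cover atomicity, isolation, and durability; consistency can be checked by asserting that declared constraints hold and that rejected writes leave committed state intact. A minimal sketch, assuming CHECK constraints are enforced (the schema is illustrative; the helpers mirror the fixtures above):

```rust
#[tokio::test]
async fn test_consistency_constraint_enforcement() {
    let cluster = TestCluster::new().await;
    cluster
        .execute("CREATE TABLE accounts (id INT PRIMARY KEY, balance INT CHECK (balance >= 0))")
        .await
        .unwrap();
    cluster.execute("INSERT INTO accounts (id, balance) VALUES (1, 100)").await.unwrap();

    // A write that violates the CHECK constraint must be rejected...
    let result = cluster.execute("UPDATE accounts SET balance = -50 WHERE id = 1").await;
    assert!(result.is_err());

    // ...leaving the previously committed state intact
    let rows = cluster.query("SELECT balance FROM accounts WHERE id = 1").await.unwrap();
    assert_eq!(rows.rows[0].get::<i64>("balance"), 100);
}
```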
### 8. Performance Regression Tests

Location: tests/benchmarks/
Metrics: Throughput (ops/sec), Latency (p50, p95, p99), CPU/Memory usage
```rust
#[bench]
fn bench_lsm_write_throughput(b: &mut Bencher) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let engine = rt.block_on(LsmStorageEngine::new_test());

    b.iter(|| {
        rt.block_on(async {
            for i in 0..10_000 {
                let key = format!("key_{}", i).into_bytes();
                let value = format!("value_{}", i).into_bytes();
                engine.write(key, value).await.unwrap();
            }
        });
    });
}

#[tokio::test]
async fn test_query_latency_sla() {
    let cluster = TestCluster::new().await;
    cluster.insert_test_data(1_000_000).await;

    let mut latencies = Vec::new();

    for _ in 0..1000 {
        let start = Instant::now();
        cluster
            .query(&format!("SELECT * FROM data WHERE id = {}", rand::random::<u32>()))
            .await
            .unwrap();
        latencies.push(start.elapsed());
    }

    latencies.sort();
    let p95 = latencies[(latencies.len() as f64 * 0.95) as usize];
    let p99 = latencies[(latencies.len() as f64 * 0.99) as usize];

    assert!(p95 < Duration::from_millis(10), "P95 latency too high");
    assert!(p99 < Duration::from_millis(50), "P99 latency too high");
}
```

### 9. Chaos Engineering Tests
Location: tests/chaos/
Scenarios: Network partitions, node failures, disk corruption, clock skew
```python
import pytest
from chaoslib.experiment import run_experiment


@pytest.mark.chaos
def test_network_partition_resilience():
    experiment = {
        "title": "Network partition between storage nodes",
        "steady-state-hypothesis": {
            "title": "Cluster is healthy",
            "probes": [
                {
                    "type": "probe",
                    "name": "all-nodes-responding",
                    "tolerance": True,
                    "provider": {
                        "type": "python",
                        "module": "heliosdb.chaos.probes",
                        "func": "all_nodes_healthy"
                    }
                }
            ]
        },
        "method": [
            {
                "type": "action",
                "name": "partition-storage-nodes",
                "provider": {
                    "type": "python",
                    "module": "heliosdb.chaos.actions",
                    "func": "partition_network",
                    "arguments": {
                        "group_a": ["storage-0", "storage-1"],
                        "group_b": ["storage-2"]
                    }
                },
                "pauses": {"after": 30}
            },
            {
                "type": "probe",
                "name": "writes-still-succeed",
                "tolerance": True,
                "provider": {
                    "type": "python",
                    "module": "heliosdb.chaos.probes",
                    "func": "can_write_to_quorum"
                }
            },
            {
                "type": "action",
                "name": "heal-partition",
                "provider": {
                    "type": "python",
                    "module": "heliosdb.chaos.actions",
                    "func": "heal_network"
                }
            }
        ],
        "rollbacks": [
            {
                "type": "action",
                "name": "heal-network",
                "provider": {
                    "type": "python",
                    "module": "heliosdb.chaos.actions",
                    "func": "heal_network"
                }
            }
        ]
    }

    run_experiment(experiment)
```
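Of the listed scenarios, the partition case is shown above; clock skew is easier to drive from the Rust harness. A sketch, assuming a hypothetical `set_clock_offset` fault-injection helper (not an existing `TestCluster` API):

```rust
#[tokio::test]
async fn test_clock_skew_tolerance() {
    // Hypothetical sketch: set_clock_offset is an illustrative fault-injection
    // helper, not part of the current TestCluster API.
    let cluster = TestCluster::new().await;

    // Skew one storage node's clock forward by 30 seconds
    cluster.set_clock_offset(/* node */ 0, Duration::from_secs(30)).await;

    // Writes routed through the skewed node must still commit in order
    cluster.execute("INSERT INTO test (id, value) VALUES (1, 'a')").await.unwrap();
    cluster.execute("UPDATE test SET value = 'b' WHERE id = 1").await.unwrap();

    // The later write must win despite the skew
    let result = cluster.query("SELECT value FROM test WHERE id = 1").await.unwrap();
    assert_eq!(result.rows[0].get::<String>("value"), "b");
}
```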
## Test Execution Strategy

### Local Development
```bash
# Unit tests (fast feedback)
cargo test --workspace

# Integration tests (requires Docker)
cargo test --test integration -- --test-threads=1

# Protocol tests
python -m pytest tests/protocol/ -v

# Benchmarks
cargo bench
```

### CI/CD Pipeline
```yaml
name: HeliosDB Test Suite

on: [push, pull_request]

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions-rs/toolchain@v1
      - run: cargo test --workspace --lib

  protocol-compatibility:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        client:
          - psycopg2
          - pymysql
          - snowflake-connector-python
          - databricks-sql-connector
          - pinecone-client
    steps:
      - uses: actions/checkout@v3
      - uses: docker/setup-buildx-action@v2
      - run: docker-compose up -d heliosdb
      - run: pytest tests/protocol/test_${{ matrix.client }}.py -v

  distributed-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - run: docker-compose -f docker-compose.cluster.yml up -d
      - run: cargo test --test distributed -- --test-threads=1

  chaos-tests:
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - run: pip install chaostoolkit
      - run: pytest tests/chaos/ -v -m chaos
```

## Test Data Management
### Test Fixtures
```rust
pub struct TestCluster {
    pub storage_nodes: Vec<StorageNode>,
    pub compute_nodes: Vec<ComputeNode>,
    pub metadata_cluster: MetadataCluster,
}

impl TestCluster {
    // Not async, so callers can chain builder methods before .build().await
    pub fn builder() -> TestClusterBuilder {
        TestClusterBuilder::default()
    }

    pub async fn insert_test_data(&self, rows: usize) {
        // Generate deterministic test data
        for i in 0..rows {
            self.execute(
                "INSERT INTO test (id, value) VALUES (?, ?)",
                &[i, i * 2],
            ).await.unwrap();
        }
    }
}

pub fn generate_test_vectors(count: usize, dims: usize) -> Vec<Vec<f32>> {
    // Reproducible random vectors for testing
    let mut rng = StdRng::seed_from_u64(42);
    (0..count)
        .map(|_| (0..dims).map(|_| rng.gen_range(-1.0..1.0)).collect())
        .collect()
}
```

## Quality Gates
### Merge Requirements
- All unit tests pass
- All protocol compatibility tests pass
- Code coverage > 80%
- No performance regression (>5% slowdown vs. stored baseline; see the sketch below)
- All distributed correctness tests pass
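The regression gate can be enforced by comparing fresh benchmark numbers against a checked-in baseline. A minimal sketch, where the `Baseline` record and its file format are assumptions:

```rust
use std::time::Duration;

/// Hypothetical baseline record loaded from a checked-in file.
struct Baseline {
    name: String,
    median: Duration,
}

/// Fails the gate if any benchmark is more than 5% slower than its baseline.
fn check_regressions(
    current: &[(String, Duration)],
    baselines: &[Baseline],
) -> Result<(), String> {
    for (name, measured) in current {
        if let Some(base) = baselines.iter().find(|b| &b.name == name) {
            let limit = base.median.mul_f64(1.05); // 5% tolerance
            if *measured > limit {
                return Err(format!(
                    "{}: {:?} exceeds baseline {:?} by >5%",
                    name, measured, base.median
                ));
            }
        }
    }
    Ok(())
}
```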
### Release Requirements
- All tests pass
- Chaos tests pass
- Security audit complete
- Performance benchmarks meet SLA
- Protocol compatibility verified for all clients
## Monitoring and Observability

### Test Metrics Collection
```rust
pub struct TestMetrics {
    pub rows_scanned: u64,
    pub bytes_transferred: u64,
    pub cache_hits: u64,
    pub cache_misses: u64,
    pub network_round_trips: u64,
}

impl TestCluster {
    pub async fn query_with_metrics(&self, sql: &str) -> Result<(QueryResult, TestMetrics)> {
        let start_metrics = self.collect_metrics().await;
        let result = self.query(sql).await?;
        let end_metrics = self.collect_metrics().await;

        Ok((result, end_metrics - start_metrics))
    }
}
```
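The `end_metrics - start_metrics` delta above assumes `TestMetrics` implements `Sub`; a minimal field-wise sketch:

```rust
use std::ops::Sub;

impl Sub for TestMetrics {
    type Output = TestMetrics;

    // Field-wise delta between two snapshots taken around a query
    fn sub(self, rhs: TestMetrics) -> TestMetrics {
        TestMetrics {
            rows_scanned: self.rows_scanned - rhs.rows_scanned,
            bytes_transferred: self.bytes_transferred - rhs.bytes_transferred,
            cache_hits: self.cache_hits - rhs.cache_hits,
            cache_misses: self.cache_misses - rhs.cache_misses,
            network_round_trips: self.network_round_trips - rhs.network_round_trips,
        }
    }
}
```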
## Future Enhancements

- Property-Based Testing: Use `proptest` for fuzz testing (see the sketch below)
- Mutation Testing: Use `cargo-mutants` to verify test quality
- Load Testing: Integrate k6 or Gatling for stress testing
- Security Testing: OWASP ZAP, SQL injection fuzzing
- Compliance Testing: TPC-H, TPC-C benchmark suites
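For the property-based item, a minimal sketch using `proptest` to check the LSM write/read round trip from the unit-test section over arbitrary keys and values (strategy sizes here are illustrative):

```rust
use proptest::prelude::*;

proptest! {
    // Property: any written key/value pair must read back unchanged.
    #[test]
    fn prop_write_read_roundtrip(
        key in proptest::collection::vec(any::<u8>(), 1..64),
        value in proptest::collection::vec(any::<u8>(), 0..1024),
    ) {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            let engine = LsmStorageEngine::new_test().await;
            engine.write(key.clone(), value.clone()).await.unwrap();
            let read = engine.read(&key).await.unwrap();
            assert_eq!(read, Some(value));
        });
    }
}
```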