HeliosDB Test Framework Design

Overview

This document defines the comprehensive testing strategy for HeliosDB, a next-generation distributed HTAP database. The testing framework is designed to validate protocol compatibility, distributed correctness, data integrity, vector search accuracy, and transactional semantics.

Testing Architecture

Test Pyramid

                /\
               /E2E\           <- Few, high-value end-to-end tests
              /------\
             / Chaos  \        <- Fault injection and resilience tests
            /----------\
           /Integration \      <- Multi-node distributed scenarios
          /--------------\
         /      Unit      \    <- Many, fast, focused component tests
        /------------------\

Language Strategy

  • Core Test Framework: Rust (for performance-critical tests)
  • Protocol Compatibility Tests: Python (for client driver validation)
  • Chaos Tests: Python/Rust hybrid
  • CI/CD Integration: GitHub Actions / GitLab CI

Test Categories

1. Unit Tests (Rust)

Location: */tests/ and */src/ (inline)

Coverage Requirements:

  • Statements: >85%
  • Branches: >80%
  • Functions: >85%

Key Components:

heliosdb-storage/tests/lsm_engine_test.rs
#[cfg(test)]
mod tests {
    use heliosdb_storage::lsm::LsmStorageEngine;

    #[tokio::test]
    async fn test_write_read_cycle() {
        // Test LSM write path: Commit Log -> Memtable
        let engine = LsmStorageEngine::new_test().await;
        let key = b"test_key".to_vec();
        let value = b"test_value".to_vec();
        engine.write(key.clone(), value.clone()).await.unwrap();
        let result = engine.read(&key).await.unwrap();
        assert_eq!(result, Some(value));
    }

    #[tokio::test]
    async fn test_tombstone_handling() {
        // Verify DELETE creates a tombstone that masks the prior value
        let engine = LsmStorageEngine::new_test().await;
        let key = b"delete_me".to_vec();
        engine.write(key.clone(), b"value".to_vec()).await.unwrap();
        engine.delete(&key).await.unwrap();
        let result = engine.read(&key).await.unwrap();
        assert_eq!(result, None); // Should return None due to tombstone
    }
}
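The tombstone semantics these unit tests exercise can be sketched in a few lines of Python. `MiniLsm` below is a hypothetical toy model for illustration, not the HeliosDB engine: a delete is just a write of a sentinel, and reads search newest-to-oldest so the sentinel masks any older value.

```python
# Toy model of LSM tombstone semantics (hypothetical MiniLsm, not the real engine).
TOMBSTONE = object()  # sentinel marking a deleted key

class MiniLsm:
    def __init__(self):
        self.memtable = {}   # newest writes
        self.sstables = []   # older, flushed levels (newest first)

    def write(self, key, value):
        self.memtable[key] = value

    def delete(self, key):
        # A delete is just a write of a tombstone marker
        self.memtable[key] = TOMBSTONE

    def flush(self):
        # Move the memtable onto the SSTable stack
        self.sstables.insert(0, self.memtable)
        self.memtable = {}

    def read(self, key):
        # Search newest-to-oldest; a tombstone masks older values
        for level in [self.memtable, *self.sstables]:
            if key in level:
                return None if level[key] is TOMBSTONE else level[key]
        return None

db = MiniLsm()
db.write("k", "v1")
db.flush()        # "k" -> "v1" now lives in an SSTable
db.delete("k")    # tombstone in the memtable masks the older value
assert db.read("k") is None
```

This is why `test_tombstone_handling` expects `None` even though an older version of the key still physically exists in a lower level.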

2. Integration Tests (Multi-Node)

Location: tests/integration/

Purpose: Validate distributed behavior across compute and storage nodes

Key Scenarios:

tests/integration/distributed_query_test.rs
#[tokio::test]
async fn test_cross_shard_query() {
    // Setup: 3 storage nodes, 2 compute nodes
    let cluster = TestCluster::builder()
        .storage_nodes(3)
        .compute_nodes(2)
        .build()
        .await;
    // Insert data across shards
    cluster.insert_sharded_data("users", 10_000).await;
    // Execute a cross-shard join
    let result = cluster
        .query("SELECT u.name, o.total FROM users u JOIN orders o ON u.id = o.user_id")
        .await
        .unwrap();
    assert!(!result.rows.is_empty());
}

#[tokio::test]
async fn test_predicate_pushdown() {
    let cluster = TestCluster::new().await;
    // Insert 1M rows
    cluster.insert_test_data(1_000_000).await;
    // Query with a selective predicate
    let (_result, metrics) = cluster
        .query_with_metrics("SELECT * FROM data WHERE value > 500000")
        .await;
    // Verify pushdown occurred (should scan far fewer than 1M rows)
    assert!(metrics.rows_scanned < 600_000);
    assert!(metrics.bytes_transferred < cluster.total_bytes() / 2);
}
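The intuition behind the `rows_scanned` and `bytes_transferred` assertions can be made concrete with a toy model (hypothetical helper names, not the HeliosDB planner): when the predicate is evaluated at the storage node, only matching rows cross the network; without pushdown, every row does.

```python
# Toy model: evaluating a predicate at the storage node vs. the compute node.
def scan_without_pushdown(shard_rows, predicate):
    # Storage ships every row; compute filters afterwards.
    transferred = [r for shard in shard_rows for r in shard]
    return [r for r in transferred if predicate(r)], len(transferred)

def scan_with_pushdown(shard_rows, predicate):
    # Each shard filters locally; only matches cross the network.
    transferred = [r for shard in shard_rows for r in shard if predicate(r)]
    return transferred, len(transferred)

shards = [list(range(i, 1000, 4)) for i in range(4)]  # 1000 rows over 4 shards
pred = lambda v: v > 900
full, moved_full = scan_without_pushdown(shards, pred)
pushed, moved_pushed = scan_with_pushdown(shards, pred)
assert sorted(full) == sorted(pushed)   # same answer either way
assert moved_pushed < moved_full        # far fewer rows cross the network
```

Same result set, radically different transfer cost, which is exactly what `query_with_metrics` is designed to catch.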

3. Protocol Compatibility Tests (Python)

Location: tests/protocol/

Must-Pass CI Tests: All Python clients from 01_PROTOCOL_TEST_MATRIX.md

tests/protocol/test_postgresql.py
import psycopg2
import pytest

DSN = "host=localhost port=5432 dbname=test user=test password=test sslmode=require"

@pytest.mark.protocol
def test_psycopg2_connect():
    """PostgreSQL protocol: connect via psycopg2"""
    conn = psycopg2.connect(
        host="localhost",
        port=5432,
        dbname="test",
        user="test",
        password="test",
        sslmode="require",
    )
    cur = conn.cursor()
    cur.execute("SELECT 1")
    assert cur.fetchone()[0] == 1
    conn.close()

@pytest.mark.protocol
def test_psycopg2_prepared_statement():
    """PostgreSQL protocol: prepared statements with parameters"""
    conn = psycopg2.connect(DSN)
    cur = conn.cursor()
    cur.execute("INSERT INTO users (id, name) VALUES (%s, %s)", (1, "Alice"))
    conn.commit()
    cur.execute("SELECT name FROM users WHERE id = %s", (1,))
    assert cur.fetchone()[0] == "Alice"
    conn.close()

@pytest.mark.protocol
def test_psycopg2_transaction():
    """PostgreSQL protocol: BEGIN/COMMIT/ROLLBACK"""
    conn = psycopg2.connect(DSN)
    conn.autocommit = False  # must be set before any statement runs
    cur = conn.cursor()
    cur.execute("INSERT INTO test (id) VALUES (100)")
    conn.rollback()
    cur.execute("SELECT COUNT(*) FROM test WHERE id = 100")
    assert cur.fetchone()[0] == 0
    conn.close()

@pytest.mark.protocol
def test_psycopg2_cursor_iteration():
    """PostgreSQL protocol: server-side cursors"""
    conn = psycopg2.connect(DSN)
    cur = conn.cursor(name="server_cursor")
    cur.execute("SELECT id FROM large_table")
    batch = cur.fetchmany(100)
    assert len(batch) == 100  # assumes large_table holds at least 100 rows
    conn.close()

4. Distributed Correctness Tests

Location: tests/distributed/

Focus Areas:

  • Raft consensus correctness
  • Shard rebalancing without data loss
  • Primary-mirror synchronous replication
  • Witness-based quorum and split-brain prevention
  • Cache invalidation across compute nodes
tests/distributed/raft_consensus_test.rs
use std::time::Duration;

#[tokio::test]
async fn test_raft_leader_election() {
    let metadata_cluster = MetadataCluster::new(3).await;
    // Kill the current leader
    let leader_id = metadata_cluster.current_leader().await;
    metadata_cluster.kill_node(leader_id).await;
    // Wait for a new leader to be elected
    tokio::time::sleep(Duration::from_secs(5)).await;
    let new_leader = metadata_cluster.current_leader().await;
    assert_ne!(leader_id, new_leader);
    // Verify state machine consistency
    let state = metadata_cluster.get_state(new_leader).await;
    assert!(state.is_consistent());
}

#[tokio::test]
async fn test_synchronous_replication() {
    let cluster = TestCluster::new().await;
    // Write data to the primary
    cluster.write_to_shard(0, "key1", "value1").await.unwrap();
    // Immediately kill the primary (no window for async catch-up)
    cluster.kill_primary(0).await;
    // Promote the mirror
    cluster.promote_mirror(0).await;
    // Data must exist on the new primary (it was synchronously replicated)
    let value = cluster.read_from_shard(0, "key1").await.unwrap();
    assert_eq!(value, "value1");
}

#[tokio::test]
async fn test_split_brain_prevention() {
    let cluster = TestCluster::with_witness(2, 1).await;
    // Partition the network between primary (0) and mirror (1)
    cluster.partition_network(vec![0], vec![1]).await;
    // The mirror must not self-promote (no witness quorum)
    tokio::time::sleep(Duration::from_secs(10)).await;
    assert!(!cluster.is_primary(1).await);
    // Partition the mirror from the witness (2) as well
    cluster.partition_network(vec![0, 2], vec![1]).await;
    // Primary + witness hold quorum, so the primary stays active
    assert!(cluster.is_primary(0).await);
}
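The quorum rule behind `test_split_brain_prevention` reduces to a simple majority check over the full membership (primary + mirror + witness). The helper below is a hypothetical sketch of that rule, not HeliosDB's membership code:

```python
# Sketch: a replica may act as primary only if it can reach a strict majority
# of the full membership (primary + mirror + witness), counting itself.
def may_act_as_primary(reachable_peers, cluster_size=3):
    votes = len(reachable_peers) + 1   # self + peers it can still reach
    return votes > cluster_size // 2   # strict majority required

# Primary partitioned together with the witness: 2 of 3 votes -> keeps serving.
assert may_act_as_primary({"witness"})
# Mirror isolated from both primary and witness: 1 of 3 votes -> must not promote.
assert not may_act_as_primary(set())
```

Because two disjoint partitions can never both hold a strict majority, at most one side can ever act as primary, which is exactly the split-brain property the test asserts.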

5. Data Integrity Tests

Location: tests/data_integrity/

Focus: LSM-tree compaction, tombstone handling, gc_grace_seconds

tests/data_integrity/compaction_test.rs
use std::time::Duration;

#[tokio::test]
async fn test_tombstone_gc_grace_period() {
    let mut config = LsmConfig::default();
    config.gc_grace_seconds = 10; // shortened to 10 seconds for testing
    let engine = LsmStorageEngine::with_config(config).await;
    // Write, then delete
    engine.write(b"key1".to_vec(), b"value1".to_vec()).await.unwrap();
    engine.delete(b"key1").await.unwrap();
    // Trigger compaction immediately
    engine.compact().await.unwrap();
    // Tombstone must survive: we are still within gc_grace_seconds
    assert!(engine.tombstone_exists(b"key1").await);
    // Wait past gc_grace_seconds, then compact again
    tokio::time::sleep(Duration::from_secs(11)).await;
    engine.compact().await.unwrap();
    // Now the tombstone is garbage collected
    assert!(!engine.tombstone_exists(b"key1").await);
}

#[tokio::test]
async fn test_multi_version_read() {
    let engine = LsmStorageEngine::new_test().await;
    // Write two versions of the same key
    engine.write_with_timestamp(b"key", b"v1", 100).await.unwrap();
    engine.write_with_timestamp(b"key", b"v2", 200).await.unwrap();
    // Flush to create an SSTable containing both versions
    engine.flush_memtable().await.unwrap();
    // The read path must return the newest version
    let latest = engine.read(b"key").await.unwrap();
    assert_eq!(latest, Some(b"v2".to_vec()));
    // After compaction, only v2 should remain
    engine.compact().await.unwrap();
    let post_compact = engine.read(b"key").await.unwrap();
    assert_eq!(post_compact, Some(b"v2".to_vec()));
}
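The `gc_grace_seconds` rule the first test checks can be stated compactly: compaction may drop a tombstone only after the grace period has elapsed, so replicas that missed the delete can still learn about it during repair. A hypothetical sketch of that rule:

```python
# Sketch: compaction keeps a tombstone while it is inside the grace window,
# and drops it for good once the window has passed.
def compact(entries, now, gc_grace_seconds):
    surviving = []
    for key, value, deleted_at in entries:
        if deleted_at is not None:  # this entry is a tombstone
            if now - deleted_at < gc_grace_seconds:
                surviving.append((key, value, deleted_at))  # keep: still in grace
            # else: drop the tombstone entirely
        else:
            surviving.append((key, value, deleted_at))
    return surviving

entries = [("k1", None, 100), ("k2", "v2", None)]  # k1 was deleted at t=100
assert len(compact(entries, now=105, gc_grace_seconds=10)) == 2  # tombstone kept
assert len(compact(entries, now=120, gc_grace_seconds=10)) == 1  # tombstone GC'd
```

Dropping a tombstone too early is dangerous: a lagging replica that still holds the old value would "resurrect" the deleted key on the next repair, which is why the test compacts twice, once inside and once outside the window.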

6. Vector Search Accuracy Tests

Location: tests/vector/

Metrics: Recall@K, precision, latency

tests/vector/filtered_ann_test.rs
#[tokio::test]
async fn test_filtered_hnsw_recall() {
    // dims = 384, M = 16, ef_construction = 200
    let index = HNSWIndex::new(384, 16, 200).await;
    // Insert 100k vectors with metadata
    let dataset = generate_test_vectors(100_000, 384);
    for (i, vec) in dataset.iter().enumerate() {
        index.insert(i as u64, vec, vec![
            ("category", format!("cat_{}", i % 10)),
            ("price", (i % 1000).to_string()),
        ]).await.unwrap();
    }
    // Query with filters
    let query_vec = dataset[0].clone();
    let filters = vec![
        Filter::Equals("category", "cat_0"),
        Filter::LessThan("price", "500"),
    ];
    let results = index
        .search(&query_vec, 10 /* top_k */, Some(filters.clone()))
        .await
        .unwrap();
    // Calculate recall against a brute-force scan under the same filters
    let ground_truth = brute_force_search(&dataset, &query_vec, 10, &filters);
    let recall = calculate_recall(&results, &ground_truth);
    assert!(recall > 0.95, "Recall too low: {}", recall);
}

#[tokio::test]
async fn test_vector_storage_toast() {
    let storage = VectorStorage::new().await;
    // Small vector (512 bytes) should be stored inline
    let small_vec = vec![0.1f32; 128];
    storage.insert(1, &small_vec, StorageHint::Plain).await.unwrap();
    assert!(storage.is_inline(1).await);
    // Large vector (6144 bytes) should be stored out-of-line (EXTERNAL)
    let large_vec = vec![0.1f32; 1536];
    storage.insert(2, &large_vec, StorageHint::External).await.unwrap();
    assert!(!storage.is_inline(2).await);
    // Verify retrieval correctness
    let retrieved = storage.get(2).await.unwrap();
    assert_eq!(retrieved, large_vec);
}
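Recall@K, the metric asserted above, is the fraction of the true K nearest neighbors (found by exhaustive scan) that the ANN index actually returned. A minimal Python version, with hypothetical helper names:

```python
import math

def euclidean(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def brute_force_top_k(dataset, query, k):
    # Exact nearest neighbors by exhaustive scan over every vector
    ranked = sorted(range(len(dataset)), key=lambda i: euclidean(dataset[i], query))
    return set(ranked[:k])

def recall_at_k(approx_ids, exact_ids):
    # Fraction of the true neighbors that the ANN index returned
    return len(set(approx_ids) & exact_ids) / len(exact_ids)

data = [[0.0], [1.0], [2.0], [10.0], [11.0]]
exact = brute_force_top_k(data, [0.5], k=3)      # ids {0, 1, 2}
assert recall_at_k([0, 1, 4], exact) == 2 / 3    # the index missed one true neighbor
```

For filtered search, the brute-force baseline must apply the same metadata filters before ranking; otherwise the ground truth and the ANN result are answering different queries.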

7. ACID Compliance Tests

Location: tests/transactions/

Focus: Atomicity, Consistency, Isolation, Durability

tests/transactions/acid_test.rs
#[tokio::test]
async fn test_atomicity_distributed() {
    let cluster = TestCluster::new().await;
    // Multi-shard transaction
    let tx = cluster.begin_transaction().await;
    tx.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')").await.unwrap();
    tx.execute("INSERT INTO orders (user_id, total) VALUES (1, 100)").await.unwrap();
    // Simulate a node failure mid-transaction
    cluster.kill_compute_node(0).await;
    // The transaction must be rolled back on every shard
    let users = cluster.query("SELECT * FROM users WHERE id = 1").await.unwrap();
    let orders = cluster.query("SELECT * FROM orders WHERE user_id = 1").await.unwrap();
    assert_eq!(users.rows.len(), 0);
    assert_eq!(orders.rows.len(), 0);
}

#[tokio::test]
async fn test_isolation_read_committed() {
    let cluster = TestCluster::new().await;
    // TX1: begin and write, but do not commit yet
    let tx1 = cluster.begin_transaction().await;
    tx1.execute("INSERT INTO test (id, value) VALUES (1, 100)").await.unwrap();
    // TX2: must not see uncommitted data
    let tx2 = cluster.begin_transaction().await;
    let result = tx2.query("SELECT value FROM test WHERE id = 1").await.unwrap();
    assert_eq!(result.rows.len(), 0);
    // TX1: commit
    tx1.commit().await.unwrap();
    // TX2: under READ COMMITTED, the row is now visible
    let result = tx2.query("SELECT value FROM test WHERE id = 1").await.unwrap();
    assert_eq!(result.rows.len(), 1);
}

#[tokio::test]
async fn test_durability() {
    let cluster = TestCluster::new().await;
    cluster.execute("INSERT INTO durable (id, data) VALUES (1, 'critical')").await.unwrap();
    // Crash the entire cluster immediately (all nodes)
    cluster.crash_all().await;
    // Restart and recover from the commit log
    let cluster = TestCluster::restart(cluster.id).await;
    let result = cluster.query("SELECT data FROM durable WHERE id = 1").await.unwrap();
    assert_eq!(result.rows[0].get::<String>("data"), "critical");
}
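Cross-shard atomicity of the kind `test_atomicity_distributed` checks is typically enforced with two-phase commit: the transaction commits only if every participating shard votes yes in the prepare phase, and any failure aborts it everywhere. The toy coordinator below is a hypothetical sketch of that pattern, not HeliosDB's actual commit protocol:

```python
# Toy two-phase commit: all shards must vote yes in prepare, or all abort.
class Shard:
    def __init__(self, healthy=True):
        self.healthy = healthy
        self.committed = []
        self.staged = []

    def prepare(self, op):
        if not self.healthy:
            return False        # vote "no" (e.g. the node died mid-transaction)
        self.staged.append(op)
        return True

    def commit(self):
        self.committed.extend(self.staged)
        self.staged = []

    def abort(self):
        self.staged = []        # roll back staged writes

def two_phase_commit(shards, ops):
    votes = [shard.prepare(op) for shard, op in zip(shards, ops)]
    if all(votes):
        for s in shards:
            s.commit()
        return True
    for s in shards:
        s.abort()
    return False

a, b = Shard(), Shard(healthy=False)        # one participant fails
assert not two_phase_commit([a, b], ["insert users", "insert orders"])
assert a.committed == [] and b.committed == []  # nothing applied anywhere
```

The test's post-failure queries against both `users` and `orders` are checking exactly this all-or-nothing outcome across shards.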

8. Performance Regression Tests

Location: tests/benchmarks/

Metrics: Throughput (ops/sec), Latency (p50, p95, p99), CPU/Memory usage

tests/benchmarks/ingestion_bench.rs
use std::time::{Duration, Instant};

#[bench]
fn bench_lsm_write_throughput(b: &mut Bencher) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let engine = rt.block_on(LsmStorageEngine::new_test());
    b.iter(|| {
        rt.block_on(async {
            for i in 0..10_000 {
                let key = format!("key_{}", i).into_bytes();
                let value = format!("value_{}", i).into_bytes();
                engine.write(key, value).await.unwrap();
            }
        });
    });
}

#[tokio::test]
async fn test_query_latency_sla() {
    let cluster = TestCluster::new().await;
    cluster.insert_test_data(1_000_000).await;
    let mut latencies = Vec::new();
    for _ in 0..1000 {
        let id = rand::random::<u64>() % 1_000_000;
        let start = Instant::now();
        cluster
            .query(&format!("SELECT * FROM data WHERE id = {}", id))
            .await
            .unwrap();
        latencies.push(start.elapsed());
    }
    latencies.sort();
    let p95 = latencies[(latencies.len() as f64 * 0.95) as usize];
    let p99 = latencies[(latencies.len() as f64 * 0.99) as usize];
    assert!(p95 < Duration::from_millis(10), "P95 latency too high: {:?}", p95);
    assert!(p99 < Duration::from_millis(50), "P99 latency too high: {:?}", p99);
}
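The p95/p99 lookup used above is the nearest-rank style: sort the samples, then index at `floor(len * q)`. A small helper makes the rule (and the edge case at `q = 1.0`) explicit; `percentile` is a hypothetical name mirroring the test's arithmetic:

```python
def percentile(samples, q):
    # Nearest-rank style percentile, matching the test's indexing:
    # sort, then take the element at floor(len * q), clamped in range.
    ordered = sorted(samples)
    idx = min(int(len(ordered) * q), len(ordered) - 1)
    return ordered[idx]

latencies_ms = list(range(1, 101))         # 100 samples: 1..100 ms
assert percentile(latencies_ms, 0.95) == 96
assert percentile(latencies_ms, 0.99) == 100
```

Note the clamp: without it, `q = 1.0` indexes one past the end, a classic off-by-one in hand-rolled percentile code.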

9. Chaos Engineering Tests

Location: tests/chaos/

Scenarios: Network partitions, node failures, disk corruption, clock skew

tests/chaos/network_partition_test.py
import pytest
from chaoslib.experiment import run_experiment

@pytest.mark.chaos
def test_network_partition_resilience():
    experiment = {
        "title": "Network partition between storage nodes",
        "steady-state-hypothesis": {
            "title": "Cluster is healthy",
            "probes": [
                {
                    "type": "probe",
                    "name": "all-nodes-responding",
                    "tolerance": True,
                    "provider": {
                        "type": "python",
                        "module": "heliosdb.chaos.probes",
                        "func": "all_nodes_healthy",
                    },
                }
            ],
        },
        "method": [
            {
                "type": "action",
                "name": "partition-storage-nodes",
                "provider": {
                    "type": "python",
                    "module": "heliosdb.chaos.actions",
                    "func": "partition_network",
                    "arguments": {
                        "group_a": ["storage-0", "storage-1"],
                        "group_b": ["storage-2"],
                    },
                },
                "pauses": {"after": 30},
            },
            {
                "type": "probe",
                "name": "writes-still-succeed",
                "tolerance": True,
                "provider": {
                    "type": "python",
                    "module": "heliosdb.chaos.probes",
                    "func": "can_write_to_quorum",
                },
            },
            {
                "type": "action",
                "name": "heal-partition",
                "provider": {
                    "type": "python",
                    "module": "heliosdb.chaos.actions",
                    "func": "heal_network",
                },
            },
        ],
        "rollbacks": [
            {
                "type": "action",
                "name": "heal-network",
                "provider": {
                    "type": "python",
                    "module": "heliosdb.chaos.actions",
                    "func": "heal_network",
                },
            }
        ],
    }
    run_experiment(experiment)

Test Execution Strategy

Local Development

# Unit tests (fast feedback)
cargo test --workspace
# Integration tests (requires Docker)
cargo test --test integration -- --test-threads=1
# Protocol tests
python -m pytest tests/protocol/ -v
# Benchmarks
cargo bench

CI/CD Pipeline

.github/workflows/test.yml
name: HeliosDB Test Suite
on: [push, pull_request]
jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions-rs/toolchain@v1
      - run: cargo test --workspace --lib
  protocol-compatibility:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        client:
          - psycopg2
          - pymysql
          - snowflake-connector-python
          - databricks-sql-connector
          - pinecone-client
    steps:
      - uses: actions/checkout@v3
      - uses: docker/setup-buildx-action@v2
      - run: docker-compose up -d heliosdb
      - run: pytest tests/protocol/test_${{ matrix.client }}.py -v
  distributed-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - run: docker-compose -f docker-compose.cluster.yml up -d
      - run: cargo test --test distributed -- --test-threads=1
  chaos-tests:
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - run: pip install chaostoolkit
      - run: pytest tests/chaos/ -v -m chaos

Test Data Management

Test Fixtures

tests/common/fixtures.rs
pub struct TestCluster {
    pub storage_nodes: Vec<StorageNode>,
    pub compute_nodes: Vec<ComputeNode>,
    pub metadata_cluster: MetadataCluster,
}

impl TestCluster {
    pub fn builder() -> TestClusterBuilder {
        TestClusterBuilder::default()
    }

    pub async fn insert_test_data(&self, rows: usize) {
        // Generate deterministic test data
        for i in 0..rows {
            self.execute(
                "INSERT INTO test (id, value) VALUES (?, ?)",
                &[i, i * 2],
            ).await.unwrap();
        }
    }
}

pub fn generate_test_vectors(count: usize, dims: usize) -> Vec<Vec<f32>> {
    // Reproducible random vectors: a fixed seed keeps test runs deterministic
    let mut rng = StdRng::seed_from_u64(42);
    (0..count)
        .map(|_| (0..dims).map(|_| rng.gen_range(-1.0..1.0)).collect())
        .collect()
}

Quality Gates

Merge Requirements

  • All unit tests pass
  • All protocol compatibility tests pass
  • Code coverage > 80%
  • No performance regression (no benchmark more than 5% slower than baseline)
  • All distributed correctness tests pass

Release Requirements

  • All tests pass
  • Chaos tests pass
  • Security audit complete
  • Performance benchmarks meet SLA
  • Protocol compatibility verified for all clients

Monitoring and Observability

Test Metrics Collection

tests/common/metrics.rs
pub struct TestMetrics {
    pub rows_scanned: u64,
    pub bytes_transferred: u64,
    pub cache_hits: u64,
    pub cache_misses: u64,
    pub network_round_trips: u64,
}

impl TestCluster {
    pub async fn query_with_metrics(&self, sql: &str) -> (QueryResult, TestMetrics) {
        // Snapshot cluster counters before and after the query, then diff
        let start_metrics = self.collect_metrics().await;
        let result = self.query(sql).await.unwrap();
        let end_metrics = self.collect_metrics().await;
        (result, end_metrics - start_metrics)
    }
}
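The `end_metrics - start_metrics` diff works because the counters are monotonic and subtraction is defined field-by-field. The same idea in Python is a dataclass with `__sub__` (a sketch with a hypothetical, reduced field set):

```python
from dataclasses import dataclass

@dataclass
class TestMetrics:
    rows_scanned: int = 0
    bytes_transferred: int = 0
    cache_hits: int = 0

    def __sub__(self, other):
        # Field-by-field delta: counters only ever grow, so end - start
        # isolates the cost of a single query.
        return TestMetrics(
            self.rows_scanned - other.rows_scanned,
            self.bytes_transferred - other.bytes_transferred,
            self.cache_hits - other.cache_hits,
        )

before = TestMetrics(rows_scanned=1_000, bytes_transferred=4_096, cache_hits=10)
after = TestMetrics(rows_scanned=1_500, bytes_transferred=8_192, cache_hits=12)
delta = after - before
assert delta.rows_scanned == 500 and delta.cache_hits == 2
```

Snapshot-and-diff keeps the metrics plumbing out of the query path itself: tests attribute cluster-wide counter movement to the single query they just ran, which is valid as long as tests run serially (hence `--test-threads=1` for the integration suites).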

Future Enhancements

  1. Property-Based Testing: Use proptest for fuzz testing
  2. Mutation Testing: Use cargo-mutants to verify test quality
  3. Load Testing: Integrate k6 or Gatling for stress testing
  4. Security Testing: OWASP ZAP, SQL injection fuzzing
  5. Compliance Testing: TPC-H, TPC-C benchmark suites