# Distributed Correctness Test Suite

Overview

This document defines tests for verifying the correctness of HeliosDB’s distributed systems components: Raft consensus, sharding/rebalancing, replication, and distributed query execution.

Test Categories

1. Raft Consensus Tests

2. Sharding and Rebalancing Tests

3. Replication and Failover Tests

4. Cache Invalidation Tests

5. Distributed Transaction Tests

1. Raft Consensus Correctness

Test Infrastructure

tests/distributed/raft/test_cluster.rs
use heliosdb_metadata::raft::MetadataNode;
use std::time::Duration;
/// Harness for Raft consensus tests: a set of in-process metadata nodes
/// wired together through a controllable network simulator.
pub struct RaftTestCluster {
    /// All cluster members. NOTE(review): the helper methods below index
    /// this Vec by node id, so ids are assumed to equal vec positions —
    /// confirm `MetadataNode::new_test(i)` assigns ids 0..size.
    pub nodes: Vec<MetadataNode>,
    /// Simulated network used to inject disconnects and partitions.
    pub network: NetworkSimulator,
}
impl RaftTestCluster {
    /// Spins up `size` test nodes (ids 0..size) and connects them all
    /// through the network simulator.
    pub async fn new(size: usize) -> Self {
        let mut nodes = Vec::with_capacity(size);
        for i in 0..size {
            nodes.push(MetadataNode::new_test(i as u64).await);
        }
        // Connect every node to every other node.
        let network = NetworkSimulator::connect_all(&nodes).await;
        Self { nodes, network }
    }

    /// Returns the id of the node that currently believes it is leader,
    /// or `None` if no node does (e.g. mid-election).
    pub async fn current_leader(&self) -> Option<u64> {
        for node in &self.nodes {
            if node.is_leader().await {
                return Some(node.id());
            }
        }
        None
    }

    /// Disconnects `node_id` from the simulated network and shuts it down.
    pub async fn kill_node(&mut self, node_id: u64) {
        self.network.disconnect(node_id).await;
        // Look the node up by its id instead of assuming `id == vec index`
        // (the original `self.nodes[node_id as usize]` would target the
        // wrong node, or panic, if ids were ever assigned differently).
        if let Some(node) = self.nodes.iter_mut().find(|n| n.id() == node_id) {
            node.shutdown().await;
        }
    }

    /// Splits the network so `group_a` and `group_b` cannot reach each other.
    pub async fn partition(&mut self, group_a: Vec<u64>, group_b: Vec<u64>) {
        self.network.partition(group_a, group_b).await;
    }

    /// Removes all partitions and disconnects.
    pub async fn heal(&mut self) {
        self.network.heal_all().await;
    }
}

Consensus Tests

tests/distributed/raft/consensus_test.rs
use heliosdb_metadata::raft::*;
use std::time::Duration;
#[tokio::test]
async fn test_leader_election() {
    // RAFT-001: Exactly one leader is elected on startup.
    let cluster = RaftTestCluster::new(3).await;
    // Give the cluster time to complete its first election.
    tokio::time::sleep(Duration::from_secs(2)).await;
    let leader_id = cluster.current_leader().await;
    assert!(leader_id.is_some(), "No leader elected");
    // `is_leader()` is async (see RaftTestCluster::current_leader), so it
    // cannot be called inside a sync `filter` closure as the original did —
    // count leaders with an awaited loop instead.
    let mut leader_count = 0;
    for node in &cluster.nodes {
        if node.is_leader().await {
            leader_count += 1;
        }
    }
    assert_eq!(leader_count, 1, "Multiple leaders detected");
}
#[tokio::test]
async fn test_leader_re_election_on_failure() {
    // RAFT-002: A new leader is elected when the current leader fails.
    let mut cluster = RaftTestCluster::new(5).await;
    tokio::time::sleep(Duration::from_secs(2)).await;
    let initial_leader = cluster.current_leader().await.unwrap();
    // Kill the leader.
    cluster.kill_node(initial_leader).await;
    // Wait for re-election (should complete well within this window).
    tokio::time::sleep(Duration::from_secs(3)).await;
    // Bind the new leader id once instead of unwrapping the Option three
    // times as the original did.
    let new_leader = cluster
        .current_leader()
        .await
        .expect("No new leader elected");
    assert_ne!(new_leader, initial_leader, "Same leader re-elected");
    // Verify the new leader can actually make progress.
    // NOTE(review): indexing by id assumes node ids equal vec positions —
    // consistent with the rest of this harness, but worth confirming.
    let result = cluster.nodes[new_leader as usize]
        .propose_change("test_key", "test_value")
        .await;
    assert!(result.is_ok(), "New leader cannot process proposals");
}
#[tokio::test]
async fn test_log_replication() {
    // RAFT-003: Entries proposed on the leader appear, in order, on every follower.
    let cluster = RaftTestCluster::new(3).await;
    tokio::time::sleep(Duration::from_secs(2)).await;
    let leader_id = cluster.current_leader().await.unwrap();
    let leader = &cluster.nodes[leader_id as usize];
    // Propose a batch of changes through the leader.
    for i in 0..10 {
        let key = format!("key_{}", i);
        let value = format!("value_{}", i);
        leader.propose_change(&key, &value).await.unwrap();
    }
    // Give the followers time to catch up.
    tokio::time::sleep(Duration::from_secs(1)).await;
    // Each follower's log must match the leader's entry-for-entry.
    let leader_log = leader.get_log().await;
    for node in cluster.nodes.iter().filter(|n| n.id() != leader_id) {
        let follower_log = node.get_log().await;
        assert_eq!(
            follower_log.len(),
            leader_log.len(),
            "Node {} log diverged",
            node.id()
        );
        for (idx, (theirs, ours)) in follower_log.iter().zip(leader_log.iter()).enumerate() {
            assert_eq!(
                theirs,
                ours,
                "Log entry {} mismatch on node {}",
                idx,
                node.id()
            );
        }
    }
}
#[tokio::test]
async fn test_network_partition_recovery() {
    // RAFT-004: The cluster recovers and converges after a network partition.
    let mut cluster = RaftTestCluster::new(5).await;
    tokio::time::sleep(Duration::from_secs(2)).await;
    // Wait for a steady state before partitioning; the pre-partition
    // leader id itself is not needed afterwards (it was unused).
    let _initial_leader = cluster.current_leader().await.unwrap();
    // Partition: {0, 1} vs {2, 3, 4}.
    cluster.partition(vec![0, 1], vec![2, 3, 4]).await;
    // Wait for a new leader in the majority partition.
    tokio::time::sleep(Duration::from_secs(3)).await;
    // `is_leader()` is async, so it cannot be called inside the sync
    // `any`/`filter`/`find` closures the original used — evaluate it with
    // awaited loops instead.
    let mut majority_has_leader = false;
    for node in &cluster.nodes[2..5] {
        if node.is_leader().await {
            majority_has_leader = true;
        }
    }
    assert!(majority_has_leader, "Majority partition has no leader");
    let mut minority_has_leader = false;
    for node in &cluster.nodes[0..2] {
        if node.is_leader().await {
            minority_has_leader = true;
        }
    }
    assert!(!minority_has_leader, "Minority partition elected leader");
    // Heal the partition; the cluster must converge on a single leader.
    cluster.heal().await;
    tokio::time::sleep(Duration::from_secs(2)).await;
    let mut leaders = Vec::new();
    for node in &cluster.nodes {
        if node.is_leader().await {
            leaders.push(node.id());
        }
    }
    assert_eq!(leaders.len(), 1, "Multiple leaders after healing");
    // All followers must converge on the leader's state.
    let leader_id = leaders[0];
    let leader_state = cluster.nodes[leader_id as usize].get_state().await;
    for node in &cluster.nodes {
        if node.id() != leader_id {
            let state = node.get_state().await;
            assert_eq!(state, leader_state, "Node {} state diverged", node.id());
        }
    }
}
#[tokio::test]
async fn test_committed_entries_never_lost() {
    // RAFT-005: Entries committed under the old leader survive its failure.
    let mut cluster = RaftTestCluster::new(5).await;
    tokio::time::sleep(Duration::from_secs(2)).await;
    let old_leader_id = cluster.current_leader().await.unwrap();
    // Commit a batch of entries through the current leader.
    let keys: Vec<String> = (0..20).map(|i| format!("persistent_key_{}", i)).collect();
    {
        let leader = &cluster.nodes[old_leader_id as usize];
        for key in &keys {
            let value = format!("value_for_{}", key);
            leader.propose_change(key, &value).await.unwrap();
        }
    }
    // Allow the entries to commit before failing nodes.
    tokio::time::sleep(Duration::from_secs(1)).await;
    // Fail the leader plus one follower — three of five nodes remain,
    // which is still a quorum.
    cluster.kill_node(old_leader_id).await;
    cluster.kill_node((old_leader_id + 1) % 5).await;
    // Wait for a replacement leader.
    tokio::time::sleep(Duration::from_secs(3)).await;
    let new_leader_id = cluster.current_leader().await.unwrap();
    let new_leader = &cluster.nodes[new_leader_id as usize];
    // Every committed key must still resolve to its original value.
    for key in &keys {
        let expected = format!("value_for_{}", key);
        match new_leader.get(key).await {
            Some(actual) => assert_eq!(actual, expected),
            None => panic!("Committed key {} was lost", key),
        }
    }
}

2. Sharding and Rebalancing Tests

tests/distributed/sharding/rebalancing_test.rs
use heliosdb_metadata::sharding::*;
use heliosdb_storage::StorageNode;
#[tokio::test]
async fn test_shard_assignment() {
    // SHARD-001: Consistent hashing distributes keys evenly across shards.
    let metadata = MetadataService::new_test(3).await;
    // Create a table sharded on user_id.
    metadata.create_table(CreateTableRequest {
        name: "users".to_string(),
        shard_key: "user_id".to_string(),
        schema: vec![
            Column { name: "user_id".to_string(), dtype: DataType::BigInt },
            Column { name: "name".to_string(), dtype: DataType::VarChar(255) },
        ],
    }).await.unwrap();
    // Register 3 storage nodes. Only the registration side effect matters;
    // the original bound the returned handles to an unused `nodes` variable.
    for (name, addr) in [
        ("node_0", "127.0.0.1:7000"),
        ("node_1", "127.0.0.1:7001"),
        ("node_2", "127.0.0.1:7002"),
    ] {
        metadata.add_storage_node(name, addr).await.unwrap();
    }
    // Route 10,000 keys and record how many land on each shard.
    let mut distribution = std::collections::HashMap::new();
    for i in 0..10_000 {
        let shard_id = metadata.get_shard_for_key("users", &i).await.unwrap();
        *distribution.entry(shard_id).or_insert(0) += 1;
    }
    // Each shard should hold within 10% of the ideal (uniform) share.
    let ideal = 10_000 / 3;
    for (shard, count) in distribution {
        let deviation = ((count as f64 - ideal as f64) / ideal as f64).abs();
        assert!(
            deviation < 0.1,
            "Shard {} has {} items ({}% deviation)",
            shard,
            count,
            deviation * 100.0
        );
    }
}
#[tokio::test]
async fn test_rebalancing_on_node_add() {
    // SHARD-002: Adding a storage node triggers rebalancing without data loss.
    // NOTE(review): the original bound `MetadataService::new_test(3)` to an
    // unused variable; the storage cluster drives everything here, so the
    // dead binding has been removed.
    let mut cluster = StorageCluster::new_test(3).await;
    // Create a table and load 100k rows across the initial 3 nodes.
    cluster.create_table("data", "id").await;
    for i in 0..100_000 {
        cluster.insert("data", vec![("id", i), ("value", i * 2)]).await.unwrap();
    }
    // Snapshot per-node row counts before the topology change.
    let initial_dist = cluster.get_data_distribution().await;
    // Add a 4th node, which should kick off rebalancing.
    let new_node = cluster.add_node("node_3", "127.0.0.1:7003").await.unwrap();
    // Wait for rebalancing to run.
    tokio::time::sleep(Duration::from_secs(10)).await;
    let final_dist = cluster.get_data_distribution().await;
    // The new node must have received some data.
    assert!(
        final_dist.get(&new_node.id()).unwrap() > &0,
        "New node has no data after rebalancing"
    );
    // Rebalancing moves rows; it must not create or destroy them.
    let initial_total: usize = initial_dist.values().sum();
    let final_total: usize = final_dist.values().sum();
    assert_eq!(initial_total, final_total, "Data lost during rebalancing");
    // Each of the 4 nodes should now be within 15% of the ideal share.
    let final_ideal = 100_000 / 4;
    for (node_id, count) in final_dist {
        let deviation = ((count as f64 - final_ideal as f64) / final_ideal as f64).abs();
        assert!(
            deviation < 0.15,
            "Node {} has {} items after rebalancing ({}% deviation)",
            node_id,
            count,
            deviation * 100.0
        );
    }
}
#[tokio::test]
async fn test_no_data_loss_during_rebalancing() {
    // SHARD-003: Data stays continuously readable while rebalancing runs.
    let mut cluster = StorageCluster::new_test(3).await;
    cluster.create_table("critical", "id").await;
    // Seed 50k rows.
    let test_keys: Vec<_> = (0..50_000).collect();
    for id in &test_keys {
        cluster
            .insert("critical", vec![("id", id), ("data", format!("value_{}", id))])
            .await
            .unwrap();
    }
    // Adding a node starts the rebalance in the background.
    cluster.add_node("node_3", "127.0.0.1:7003").await.unwrap();
    // While rebalancing is in flight, spot-check 100 random keys every 500ms.
    for _ in 0..20 {
        tokio::time::sleep(Duration::from_millis(500)).await;
        for _ in 0..100 {
            let key = test_keys[rand::random::<usize>() % test_keys.len()];
            let result = cluster
                .query("SELECT data FROM critical WHERE id = ?", &[key])
                .await;
            // Require Ok AND exactly one row; `map_or` replaces the
            // original's is_ok()-then-unwrap() double inspection.
            let present = result.map_or(false, |r| r.rows.len() == 1);
            assert!(present, "Key {} missing during rebalancing", key);
        }
    }
    // After rebalancing settles, every key must still resolve to its value.
    cluster.wait_for_rebalancing_complete().await;
    for id in &test_keys {
        let result = cluster
            .query("SELECT data FROM critical WHERE id = ?", &[id])
            .await
            .unwrap();
        assert_eq!(result.rows.len(), 1, "Key {} lost after rebalancing", id);
        assert_eq!(
            result.rows[0].get::<String>("data"),
            format!("value_{}", id)
        );
    }
}
#[tokio::test]
async fn test_rebalancing_with_concurrent_writes() {
    // SHARD-004: Rebalancing does not drop writes that race with it.
    let mut cluster = StorageCluster::new_test(3).await;
    cluster.create_table("concurrent", "id").await;
    // Spawn a background task that streams inserts while we rebalance.
    let writer = {
        let cluster = cluster.clone();
        tokio::spawn(async move {
            for i in 0..100_000 {
                let row = vec![("id", i), ("value", i * 3)];
                cluster.insert("concurrent", row).await.unwrap();
                tokio::time::sleep(Duration::from_micros(100)).await;
            }
        })
    };
    // Let some writes land, then add a node mid-stream to trigger rebalancing.
    tokio::time::sleep(Duration::from_secs(2)).await;
    cluster.add_node("node_3", "127.0.0.1:7003").await.unwrap();
    // Drain the writer, then let rebalancing finish.
    writer.await.unwrap();
    cluster.wait_for_rebalancing_complete().await;
    // Every insert must be visible afterwards.
    let result = cluster
        .query("SELECT COUNT(*) as cnt FROM concurrent", &[])
        .await
        .unwrap();
    let count = result.rows[0].get::<i64>("cnt");
    assert_eq!(count, 100_000, "Writes lost during rebalancing");
}

3. Replication and Failover Tests

tests/distributed/replication/failover_test.rs
use heliosdb_storage::replication::*;
#[tokio::test]
async fn test_synchronous_replication() {
    // REP-001: A write acknowledged by the primary is already on the mirror.
    // Cluster shape: 1 primary, 1 mirror.
    let cluster = StorageCluster::with_replication(1, 1).await;
    let primary = cluster.get_primary(0).await;
    let mirror = cluster.get_mirror(0).await;
    // Commit a write on the primary...
    primary.write(b"key1", b"value1").await.unwrap();
    // ...and read it back from the mirror with no delay: with synchronous
    // replication the value must already be present.
    let replicated = mirror.read(b"key1").await.unwrap();
    assert_eq!(replicated, Some(b"value1".to_vec()));
}
#[tokio::test]
async fn test_rpo_zero_on_primary_failure() {
    // REP-002: RPO = 0 — no acknowledged write is lost when the primary dies.
    let cluster = StorageCluster::with_replication(1, 1).await;
    // Write 1000 records through the primary.
    {
        let primary = cluster.get_primary(0).await;
        for i in 0..1000 {
            let key = format!("key_{}", i);
            let value = format!("value_{}", i);
            primary.write(key.as_bytes(), value.as_bytes()).await.unwrap();
        }
    }
    // Kill the primary immediately — deliberately no grace period for any
    // asynchronous catch-up — then promote the mirror in its place.
    cluster.kill_primary(0).await;
    cluster.promote_mirror_to_primary(0).await;
    let new_primary = cluster.get_primary(0).await;
    // Every acknowledged record must survive the failover intact.
    for i in 0..1000 {
        let key = format!("key_{}", i);
        let expected = format!("value_{}", i).into_bytes();
        match new_primary.read(key.as_bytes()).await.unwrap() {
            Some(bytes) => assert_eq!(bytes, expected),
            None => panic!("Key key_{} missing after failover", i),
        }
    }
}
#[tokio::test]
async fn test_witness_quorum_failover() {
    // REP-003: The witness supplies the quorum that drives failover when the
    // primary is cut off from the mirror + witness side.
    let cluster = ReplicationCluster::with_witness(1).await; // one shard, with witness
    let primary_id = cluster.get_primary_id(0).await;
    let mirror_id = cluster.get_mirror_id(0).await;
    let witness_id = cluster.get_witness_id(0).await;
    // Isolate the primary on its own side of the partition.
    let isolated = vec![primary_id];
    let quorum_side = vec![mirror_id, witness_id];
    cluster.partition_network(isolated, quorum_side).await;
    // Give the cluster time to reach a failover decision.
    tokio::time::sleep(Duration::from_secs(5)).await;
    // The mirror (which can still reach the witness) must take over.
    assert!(cluster.is_primary(mirror_id).await);
    assert!(!cluster.is_primary(primary_id).await);
    // After the partition heals, the demotion must stick: the old primary
    // steps down to follower rather than reclaiming leadership.
    cluster.heal_network().await;
    tokio::time::sleep(Duration::from_secs(2)).await;
    assert!(!cluster.is_primary(primary_id).await);
    assert!(cluster.is_primary(mirror_id).await);
}
#[tokio::test]
async fn test_no_split_brain() {
    // REP-004: Split-brain prevention via witness quorum.
    let cluster = ReplicationCluster::with_witness(1).await;
    let primary_id = cluster.get_primary_id(0).await;
    let mirror_id = cluster.get_mirror_id(0).await;
    let witness_id = cluster.get_witness_id(0).await;
    // Partition: mirror isolated on its own (no quorum on its side).
    cluster.partition_network(
        vec![primary_id, witness_id],
        vec![mirror_id]
    ).await;
    tokio::time::sleep(Duration::from_secs(5)).await;
    // Primary should remain active (it still holds quorum with the witness).
    assert!(cluster.is_primary(primary_id).await);
    // Mirror should NOT self-promote (no quorum).
    assert!(!cluster.is_primary(mirror_id).await);
    // Only the quorum-holding primary may accept writes; the isolated mirror
    // must reject them — this is the split-brain guard. (The original
    // comment here claimed both should accept writes, contradicting the
    // asserts below.)
    let primary_write = cluster.write_to_node(primary_id, b"key", b"value1").await;
    assert!(primary_write.is_ok(), "Primary cannot accept writes");
    let mirror_write = cluster.write_to_node(mirror_id, b"key", b"value2").await;
    assert!(mirror_write.is_err(), "Mirror accepted write without being primary");
}
#[tokio::test]
async fn test_automatic_failover_latency() {
    // REP-005: Mirror promotion finishes within the failover SLA.
    let cluster = ReplicationCluster::with_witness(1).await;
    let clock = std::time::Instant::now();
    // Take down the current primary.
    let primary_id = cluster.get_primary_id(0).await;
    cluster.kill_node(primary_id).await;
    // Poll until a new primary is active, bailing out after 30 seconds.
    while !cluster.has_active_primary(0).await {
        if clock.elapsed() > Duration::from_secs(30) {
            panic!("Failover did not complete within 30 seconds");
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    let failover_duration = clock.elapsed();
    println!("Failover completed in {:?}", failover_duration);
    // SLA: failover must complete in under 10 seconds.
    assert!(
        failover_duration < Duration::from_secs(10),
        "Failover took too long: {:?}",
        failover_duration
    );
}

4. Cache Invalidation Tests

tests/distributed/cache/invalidation_test.rs
use heliosdb_compute::cache::*;
#[tokio::test]
async fn test_cache_invalidation_on_write() {
    // CACHE-001: A write via one compute node invalidates cached entries on others.
    let cluster = TestCluster::builder()
        .storage_nodes(2)
        .compute_nodes(2)
        .build()
        .await;
    // Read on compute-0 so the row lands in its cache. The query result
    // itself is not needed (the original bound it to an unused variable).
    let compute_0 = cluster.get_compute_node(0).await;
    compute_0.query("SELECT * FROM data WHERE id = 1").await.unwrap();
    assert!(compute_0.cache_contains(1).await);
    // Update the same row through compute-1.
    let compute_1 = cluster.get_compute_node(1).await;
    compute_1.execute("UPDATE data SET value = 999 WHERE id = 1").await.unwrap();
    // Give the invalidation broadcast time to arrive.
    tokio::time::sleep(Duration::from_millis(100)).await;
    // compute-0's cached entry must be gone...
    assert!(!compute_0.cache_contains(1).await);
    // ...and the next read must return the fresh value.
    let result = compute_0.query("SELECT value FROM data WHERE id = 1").await.unwrap();
    assert_eq!(result.rows[0].get::<i64>("value"), 999);
}
#[tokio::test]
async fn test_distributed_cache_coherence() {
    // CACHE-002: An update invalidates the cached entry on every compute node.
    let cluster = TestCluster::builder()
        .storage_nodes(1)
        .compute_nodes(5)
        .build()
        .await;
    // Warm the cache for id=100 on each of the 5 compute nodes.
    for i in 0..5 {
        let node = cluster.get_compute_node(i).await;
        node.query("SELECT * FROM data WHERE id = 100").await.unwrap();
        assert!(node.cache_contains(100).await);
    }
    // Mutate the row through node 0.
    cluster
        .get_compute_node(0)
        .await
        .execute("UPDATE data SET value = 777 WHERE id = 100")
        .await
        .unwrap();
    // Allow the invalidation to propagate.
    tokio::time::sleep(Duration::from_millis(200)).await;
    // No node may still hold the stale cached entry.
    for i in 0..5 {
        let node = cluster.get_compute_node(i).await;
        assert!(
            !node.cache_contains(100).await,
            "Compute-{} cache not invalidated",
            i
        );
    }
}

5. Distributed Transaction Tests

tests/distributed/transactions/distributed_tx_test.rs
use heliosdb_compute::transactions::*;
#[tokio::test]
async fn test_cross_shard_transaction_commit() {
    // TX-001: A transaction spanning three shards commits atomically.
    let cluster = TestCluster::builder()
        .storage_nodes(3)
        .compute_nodes(1)
        .build()
        .await;
    let compute = cluster.get_compute_node(0).await;
    // One write per shard inside a single distributed transaction.
    let tx = compute.begin_transaction().await;
    let inserts = [
        "INSERT INTO shard_0_table (id, value) VALUES (1, 100)",
        "INSERT INTO shard_1_table (id, value) VALUES (2, 200)",
        "INSERT INTO shard_2_table (id, value) VALUES (3, 300)",
    ];
    for stmt in inserts {
        tx.execute(stmt).await.unwrap();
    }
    tx.commit().await.unwrap();
    // After commit, each shard must expose its write.
    let checks = [
        ("SELECT value FROM shard_0_table WHERE id = 1", 100),
        ("SELECT value FROM shard_1_table WHERE id = 2", 200),
        ("SELECT value FROM shard_2_table WHERE id = 3", 300),
    ];
    for (sql, expected) in checks {
        let result = compute.query(sql).await.unwrap();
        assert_eq!(result.rows[0].get::<i64>("value"), expected);
    }
}
#[tokio::test]
async fn test_cross_shard_transaction_rollback() {
    // TX-002: Rolling back a cross-shard transaction leaves no trace on any shard.
    let cluster = TestCluster::builder()
        .storage_nodes(3)
        .compute_nodes(1)
        .build()
        .await;
    let compute = cluster.get_compute_node(0).await;
    // Stage writes on two different shards, then abort.
    let tx = compute.begin_transaction().await;
    tx.execute("INSERT INTO shard_0_table (id, value) VALUES (10, 1000)").await.unwrap();
    tx.execute("INSERT INTO shard_1_table (id, value) VALUES (20, 2000)").await.unwrap();
    tx.rollback().await.unwrap();
    // Neither shard may contain the aborted rows.
    for sql in [
        "SELECT COUNT(*) as cnt FROM shard_0_table WHERE id = 10",
        "SELECT COUNT(*) as cnt FROM shard_1_table WHERE id = 20",
    ] {
        let result = compute.query(sql).await.unwrap();
        assert_eq!(result.rows[0].get::<i64>("cnt"), 0);
    }
}
#[tokio::test]
async fn test_transaction_isolation() {
    // TX-003: Read Committed — uncommitted writes are invisible to other
    // transactions; committed writes become visible on subsequent reads.
    let cluster = TestCluster::new_test().await;
    let compute1 = cluster.get_compute_node(0).await;
    let compute2 = cluster.get_compute_node(1).await;
    // TX1 writes a row but does not commit yet.
    let tx1 = compute1.begin_transaction().await;
    tx1.execute("INSERT INTO test (id, value) VALUES (1, 100)").await.unwrap();
    // TX2 must not observe the uncommitted row.
    let tx2 = compute2.begin_transaction().await;
    let before_commit = tx2.query("SELECT value FROM test WHERE id = 1").await.unwrap();
    assert!(before_commit.rows.is_empty());
    // Once TX1 commits, a fresh read in TX2 sees the row (Read Committed,
    // not snapshot isolation).
    tx1.commit().await.unwrap();
    let after_commit = tx2.query("SELECT value FROM test WHERE id = 1").await.unwrap();
    assert_eq!(after_commit.rows.len(), 1);
    assert_eq!(after_commit.rows[0].get::<i64>("value"), 100);
}

Test Execution

Run the tests from a terminal:
# Run all distributed tests
cargo test --test distributed
# Run specific category
cargo test --test raft_consensus
cargo test --test sharding
cargo test --test replication
cargo test --test cache_invalidation
# Run with detailed logging
RUST_LOG=debug cargo test --test distributed -- --nocapture
# Run chaos scenarios (requires Docker)
cargo test --test chaos -- --ignored

Quality Metrics

Success Criteria

  • All Raft consensus tests pass (5-node cluster)
  • Rebalancing completes without data loss
  • Failover latency < 10 seconds
  • Cache invalidation latency < 100ms
  • Cross-shard transactions maintain ACID properties

Performance Targets

  • Leader election: < 3 seconds
  • Rebalancing throughput: > 100 MB/s
  • Failover RTO: < 10 seconds
  • Cache invalidation propagation: < 100ms