Distributed Correctness Test Suite
Overview
This document defines tests that verify the correctness of HeliosDB’s distributed-systems components: Raft consensus, sharding and rebalancing, replication and failover, cache invalidation, and distributed transaction execution.
Test Categories
1. Raft Consensus Tests
2. Sharding and Rebalancing Tests
3. Replication and Failover Tests
4. Cache Invalidation Tests
5. Distributed Transaction Tests
1. Raft Consensus Correctness
Test Infrastructure
use heliosdb_metadata::raft::MetadataNode;
use std::time::Duration;

pub struct RaftTestCluster {
    pub nodes: Vec<MetadataNode>,
    pub network: NetworkSimulator,
}

impl RaftTestCluster {
    pub async fn new(size: usize) -> Self {
        let mut nodes = Vec::new();
        for i in 0..size {
            let node = MetadataNode::new_test(i as u64).await;
            nodes.push(node);
        }

        // Connect nodes
        let network = NetworkSimulator::connect_all(&nodes).await;

        Self { nodes, network }
    }

    pub async fn current_leader(&self) -> Option<u64> {
        for node in &self.nodes {
            if node.is_leader().await {
                return Some(node.id());
            }
        }
        None
    }

    pub async fn kill_node(&mut self, node_id: u64) {
        self.network.disconnect(node_id).await;
        self.nodes[node_id as usize].shutdown().await;
    }

    pub async fn partition(&mut self, group_a: Vec<u64>, group_b: Vec<u64>) {
        self.network.partition(group_a, group_b).await;
    }

    pub async fn heal(&mut self) {
        self.network.heal_all().await;
    }
}
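The harness above relies on a NetworkSimulator that this document does not define. The following is a minimal sketch of the interface it is assumed to provide, tracking only reachability between node ids (message delay and loss would be layered on top). The struct, its method signatures, and the synchronous style are illustrative assumptions, not the actual HeliosDB test API, which takes node handles and is async.

// Minimal sketch of the NetworkSimulator shape assumed by RaftTestCluster.
// It records which node pairs are partitioned and which nodes are fully down.
use std::collections::HashSet;

#[derive(Default)]
pub struct NetworkSimulator {
    // Ordered pairs of node ids whose traffic is currently blocked.
    blocked: HashSet<(u64, u64)>,
    // Nodes that have been disconnected entirely (e.g. killed).
    down: HashSet<u64>,
}

impl NetworkSimulator {
    // All pairs are reachable by default; node_ids is kept for symmetry
    // with the harness call site.
    pub fn connect_all(node_ids: &[u64]) -> Self {
        let _ = node_ids;
        Self::default()
    }

    pub fn disconnect(&mut self, node_id: u64) {
        self.down.insert(node_id);
    }

    // Block traffic in both directions between the two groups.
    pub fn partition(&mut self, group_a: &[u64], group_b: &[u64]) {
        for &a in group_a {
            for &b in group_b {
                self.blocked.insert((a, b));
                self.blocked.insert((b, a));
            }
        }
    }

    pub fn heal_all(&mut self) {
        self.blocked.clear();
        self.down.clear();
    }

    // Delivery check a simulated transport would consult per message.
    pub fn can_reach(&self, from: u64, to: u64) -> bool {
        !self.down.contains(&from)
            && !self.down.contains(&to)
            && !self.blocked.contains(&(from, to))
    }
}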
Consensus Tests

use heliosdb_metadata::raft::*;
use std::time::Duration;

#[tokio::test]
async fn test_leader_election() {
    // RAFT-001: Leader election on startup
    let cluster = RaftTestCluster::new(3).await;

    // Wait for leader election
    tokio::time::sleep(Duration::from_secs(2)).await;

    let leader_id = cluster.current_leader().await;
    assert!(leader_id.is_some(), "No leader elected");
    // Verify only one leader (is_leader() is async, so count with a loop)
    let mut leader_count = 0;
    for node in &cluster.nodes {
        if node.is_leader().await {
            leader_count += 1;
        }
    }
    assert_eq!(leader_count, 1, "Multiple leaders detected");
}
#[tokio::test]
async fn test_leader_re_election_on_failure() {
    // RAFT-002: New leader elected when current leader fails
    let mut cluster = RaftTestCluster::new(5).await;

    tokio::time::sleep(Duration::from_secs(2)).await;
    let initial_leader = cluster.current_leader().await.unwrap();

    // Kill the leader
    cluster.kill_node(initial_leader).await;

    // Wait for re-election (should be < election timeout)
    tokio::time::sleep(Duration::from_secs(3)).await;

    let new_leader = cluster.current_leader().await;
    assert!(new_leader.is_some(), "No new leader elected");
    assert_ne!(new_leader.unwrap(), initial_leader, "Same leader re-elected");

    // Verify new leader is functional
    let result = cluster.nodes[new_leader.unwrap() as usize]
        .propose_change("test_key", "test_value")
        .await;
    assert!(result.is_ok(), "New leader cannot process proposals");
}
#[tokio::test]
async fn test_log_replication() {
    // RAFT-003: Log entries replicated to all nodes
    let cluster = RaftTestCluster::new(3).await;
    tokio::time::sleep(Duration::from_secs(2)).await;

    let leader_id = cluster.current_leader().await.unwrap();
    let leader = &cluster.nodes[leader_id as usize];

    // Propose multiple changes
    for i in 0..10 {
        leader.propose_change(
            &format!("key_{}", i),
            &format!("value_{}", i)
        ).await.unwrap();
    }

    // Wait for replication
    tokio::time::sleep(Duration::from_secs(1)).await;

    // Verify all nodes have same log
    let leader_log = leader.get_log().await;
    for node in &cluster.nodes {
        if node.id() != leader_id {
            let follower_log = node.get_log().await;
            assert_eq!(
                follower_log.len(),
                leader_log.len(),
                "Node {} log diverged",
                node.id()
            );

            for (idx, entry) in leader_log.iter().enumerate() {
                assert_eq!(
                    follower_log[idx],
                    *entry,
                    "Log entry {} mismatch on node {}",
                    idx,
                    node.id()
                );
            }
        }
    }
}
#[tokio::test]
async fn test_network_partition_recovery() {
    // RAFT-004: Cluster recovers from network partition
    let mut cluster = RaftTestCluster::new(5).await;
    tokio::time::sleep(Duration::from_secs(2)).await;

    let initial_leader = cluster.current_leader().await.unwrap();

    // Partition: {0, 1} vs {2, 3, 4}
    cluster.partition(vec![0, 1], vec![2, 3, 4]).await;

    // Wait for new leader in majority partition
    tokio::time::sleep(Duration::from_secs(3)).await;
    // Majority partition should have a leader (is_leader() is async, so loop)
    let mut majority_has_leader = false;
    for node in &cluster.nodes[2..5] {
        if node.is_leader().await {
            majority_has_leader = true;
        }
    }
    assert!(majority_has_leader, "Majority partition has no leader");

    // Minority partition should have no leader
    for node in &cluster.nodes[0..2] {
        assert!(
            !node.is_leader().await,
            "Minority partition elected leader (node {})",
            node.id()
        );
    }

    // Heal partition
    cluster.heal().await;
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Should converge to a single leader
    let mut leader_count = 0;
    let mut leader_id = None;
    for node in &cluster.nodes {
        if node.is_leader().await {
            leader_count += 1;
            leader_id = Some(node.id());
        }
    }
    assert_eq!(leader_count, 1, "Multiple leaders after healing");

    // All nodes should have consistent state
    let leader_id = leader_id.unwrap();
    let leader_state = cluster.nodes[leader_id as usize].get_state().await;

    for node in &cluster.nodes {
        if node.id() != leader_id {
            let state = node.get_state().await;
            assert_eq!(state, leader_state, "Node {} state diverged", node.id());
        }
    }
}
#[tokio::test]
async fn test_committed_entries_never_lost() {
    // RAFT-005: Committed entries are never lost
    let mut cluster = RaftTestCluster::new(5).await;
    tokio::time::sleep(Duration::from_secs(2)).await;

    let leader_id = cluster.current_leader().await.unwrap();
    let leader = &cluster.nodes[leader_id as usize];

    // Commit some entries
    let keys: Vec<_> = (0..20)
        .map(|i| format!("persistent_key_{}", i))
        .collect();

    for key in &keys {
        leader.propose_change(key, &format!("value_for_{}", key))
            .await
            .unwrap();
    }

    // Wait for commit
    tokio::time::sleep(Duration::from_secs(1)).await;

    // Kill leader and 1 follower (but keep majority)
    cluster.kill_node(leader_id).await;
    cluster.kill_node((leader_id + 1) % 5).await;

    // Wait for new leader
    tokio::time::sleep(Duration::from_secs(3)).await;

    let new_leader_id = cluster.current_leader().await.unwrap();
    let new_leader = &cluster.nodes[new_leader_id as usize];

    // All committed keys should still be present
    for key in &keys {
        let value = new_leader.get(key).await;
        assert!(
            value.is_some(),
            "Committed key {} was lost",
            key
        );
        assert_eq!(
            value.unwrap(),
            format!("value_for_{}", key)
        );
    }
}

2. Sharding and Rebalancing Tests
use heliosdb_metadata::sharding::*;
use heliosdb_storage::StorageNode;

#[tokio::test]
async fn test_shard_assignment() {
    // SHARD-001: Consistent hashing distributes data evenly
    let metadata = MetadataService::new_test(3).await;

    // Create table with sharding
    metadata.create_table(CreateTableRequest {
        name: "users".to_string(),
        shard_key: "user_id".to_string(),
        schema: vec![
            Column { name: "user_id".to_string(), dtype: DataType::BigInt },
            Column { name: "name".to_string(), dtype: DataType::VarChar(255) },
        ],
    }).await.unwrap();

    // Add 3 storage nodes
    let nodes = vec![
        metadata.add_storage_node("node_0", "127.0.0.1:7000").await.unwrap(),
        metadata.add_storage_node("node_1", "127.0.0.1:7001").await.unwrap(),
        metadata.add_storage_node("node_2", "127.0.0.1:7002").await.unwrap(),
    ];

    // Insert 10,000 keys and track distribution
    let mut distribution = std::collections::HashMap::new();
    for i in 0..10_000 {
        let shard_id = metadata.get_shard_for_key("users", &i).await.unwrap();
        *distribution.entry(shard_id).or_insert(0) += 1;
    }

    // Verify even distribution (within 10% of ideal)
    let ideal = 10_000 / 3;
    for (shard, count) in distribution {
        let deviation = ((count as f64 - ideal as f64) / ideal as f64).abs();
        assert!(
            deviation < 0.1,
            "Shard {} has {} items ({}% deviation)",
            shard,
            count,
            deviation * 100.0
        );
    }
}
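SHARD-001 assumes a consistent-hashing placement scheme. The following is an illustrative sketch of such a ring lookup with virtual nodes, which is what makes the ±10% evenness bound realistic; the hash function, vnode count, and type names are assumptions for illustration, not HeliosDB internals.

// Sketch of a consistent-hashing ring: each shard owns several virtual nodes
// on the ring, and a key maps to the first vnode clockwise from its hash.
use std::collections::BTreeMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

pub struct HashRing {
    // hash point on the ring -> shard id
    ring: BTreeMap<u64, u32>,
}

impl HashRing {
    pub fn new(shards: &[u32], vnodes_per_shard: u32) -> Self {
        let mut ring = BTreeMap::new();
        for &shard in shards {
            for v in 0..vnodes_per_shard {
                ring.insert(hash_of(&(shard, v)), shard);
            }
        }
        Self { ring }
    }

    // Walk clockwise to the first vnode at or after the key's hash,
    // wrapping around to the start of the ring if necessary.
    pub fn shard_for_key<K: Hash>(&self, key: &K) -> u32 {
        let h = hash_of(key);
        self.ring
            .range(h..)
            .next()
            .or_else(|| self.ring.iter().next())
            .map(|(_, &shard)| shard)
            .expect("ring is non-empty")
    }
}

fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}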
#[tokio::test]
async fn test_rebalancing_on_node_add() {
    // SHARD-002: Adding node triggers rebalancing
    let metadata = MetadataService::new_test(3).await;
    let mut cluster = StorageCluster::new_test(3).await;

    // Create table and insert data
    cluster.create_table("data", "id").await;
    for i in 0..100_000 {
        cluster.insert("data", vec![("id", i), ("value", i * 2)]).await.unwrap();
    }

    // Capture initial distribution
    let initial_dist = cluster.get_data_distribution().await;

    // Add 4th node
    let new_node = cluster.add_node("node_3", "127.0.0.1:7003").await.unwrap();

    // Wait for rebalancing
    tokio::time::sleep(Duration::from_secs(10)).await;

    // Verify rebalancing occurred
    let final_dist = cluster.get_data_distribution().await;

    // New node should have data
    assert!(
        final_dist.get(&new_node.id()).unwrap() > &0,
        "New node has no data after rebalancing"
    );

    // Total data should be unchanged
    let initial_total: usize = initial_dist.values().sum();
    let final_total: usize = final_dist.values().sum();
    assert_eq!(initial_total, final_total, "Data lost during rebalancing");

    // Distribution should be more even
    let final_ideal = 100_000 / 4;
    for (node_id, count) in final_dist {
        let deviation = ((count as f64 - final_ideal as f64) / final_ideal as f64).abs();
        assert!(
            deviation < 0.15,
            "Node {} has {} items after rebalancing ({}% deviation)",
            node_id,
            count,
            deviation * 100.0
        );
    }
}
#[tokio::test]
async fn test_no_data_loss_during_rebalancing() {
    // SHARD-003: No data lost during rebalancing
    let mut cluster = StorageCluster::new_test(3).await;

    cluster.create_table("critical", "id").await;

    // Insert test data
    let test_keys: Vec<_> = (0..50_000).collect();
    for id in &test_keys {
        cluster.insert("critical", vec![("id", id), ("data", format!("value_{}", id))])
            .await
            .unwrap();
    }

    // Start rebalancing by adding node
    cluster.add_node("node_3", "127.0.0.1:7003").await.unwrap();

    // Continuously verify data presence during rebalancing
    for _ in 0..20 {
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Sample random keys
        for _ in 0..100 {
            let key = test_keys[rand::random::<usize>() % test_keys.len()];
            let result = cluster.query(
                "SELECT data FROM critical WHERE id = ?",
                &[key]
            ).await;

            assert!(
                result.is_ok() && result.unwrap().rows.len() == 1,
                "Key {} missing during rebalancing",
                key
            );
        }
    }

    // Wait for rebalancing to complete
    cluster.wait_for_rebalancing_complete().await;

    // Verify all keys still exist
    for id in &test_keys {
        let result = cluster.query(
            "SELECT data FROM critical WHERE id = ?",
            &[id]
        ).await
        .unwrap();

        assert_eq!(result.rows.len(), 1, "Key {} lost after rebalancing", id);
        assert_eq!(
            result.rows[0].get::<String>("data"),
            format!("value_{}", id)
        );
    }
}
#[tokio::test]
async fn test_rebalancing_with_concurrent_writes() {
    // SHARD-004: Rebalancing works correctly with concurrent writes
    let mut cluster = StorageCluster::new_test(3).await;
    cluster.create_table("concurrent", "id").await;

    // Start background writer
    let cluster_clone = cluster.clone();
    let writer_handle = tokio::spawn(async move {
        for i in 0..100_000 {
            cluster_clone.insert(
                "concurrent",
                vec![("id", i), ("value", i * 3)]
            ).await.unwrap();
            tokio::time::sleep(Duration::from_micros(100)).await;
        }
    });

    // Trigger rebalancing mid-writes
    tokio::time::sleep(Duration::from_secs(2)).await;
    cluster.add_node("node_3", "127.0.0.1:7003").await.unwrap();

    // Wait for writer to complete
    writer_handle.await.unwrap();

    // Wait for rebalancing
    cluster.wait_for_rebalancing_complete().await;

    // Verify all writes succeeded and are present
    let count = cluster.query("SELECT COUNT(*) as cnt FROM concurrent", &[])
        .await
        .unwrap()
        .rows[0]
        .get::<i64>("cnt");

    assert_eq!(count, 100_000, "Writes lost during rebalancing");
}

3. Replication and Failover Tests
use heliosdb_storage::replication::*;
#[tokio::test]
async fn test_synchronous_replication() {
    // REP-001: Writes are synchronously replicated to mirror
    let cluster = StorageCluster::with_replication(1, 1).await; // 1 primary, 1 mirror

    let primary = cluster.get_primary(0).await;
    let mirror = cluster.get_mirror(0).await;

    // Write to primary
    primary.write(b"key1", b"value1").await.unwrap();

    // Immediately read from mirror (should be present due to sync replication)
    let value = mirror.read(b"key1").await.unwrap();
    assert_eq!(value, Some(b"value1".to_vec()));
}
#[tokio::test]
async fn test_rpo_zero_on_primary_failure() {
    // REP-002: RPO=0 (no data loss) when primary fails
    let cluster = StorageCluster::with_replication(1, 1).await;

    let primary = cluster.get_primary(0).await;

    // Write 1000 records
    for i in 0..1000 {
        primary.write(
            &format!("key_{}", i).into_bytes(),
            &format!("value_{}", i).into_bytes()
        ).await.unwrap();
    }

    // Immediately kill primary (no delay for async replication)
    cluster.kill_primary(0).await;

    // Promote mirror
    cluster.promote_mirror_to_primary(0).await;
    let new_primary = cluster.get_primary(0).await;

    // Verify all 1000 records present
    for i in 0..1000 {
        let key = format!("key_{}", i).into_bytes();
        let value = new_primary.read(&key).await.unwrap();
        assert!(
            value.is_some(),
            "Key key_{} missing after failover",
            i
        );
        assert_eq!(
            value.unwrap(),
            format!("value_{}", i).into_bytes()
        );
    }
}
#[tokio::test]
async fn test_witness_quorum_failover() {
    // REP-003: Witness provides quorum for split-brain prevention
    let cluster = ReplicationCluster::with_witness(1).await; // 1 shard with witness

    let primary_id = cluster.get_primary_id(0).await;
    let mirror_id = cluster.get_mirror_id(0).await;
    let witness_id = cluster.get_witness_id(0).await;

    // Partition: primary isolated from mirror+witness
    cluster.partition_network(
        vec![primary_id],
        vec![mirror_id, witness_id]
    ).await;

    // Wait for failover decision
    tokio::time::sleep(Duration::from_secs(5)).await;

    // Mirror should be promoted (has quorum with witness)
    assert!(cluster.is_primary(mirror_id).await);
    assert!(!cluster.is_primary(primary_id).await);

    // Heal network
    cluster.heal_network().await;
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Old primary should recognize new primary and become follower
    assert!(!cluster.is_primary(primary_id).await);
    assert!(cluster.is_primary(mirror_id).await);
}
#[tokio::test]
async fn test_no_split_brain() {
    // REP-004: Split-brain prevention via witness quorum
    let cluster = ReplicationCluster::with_witness(1).await;

    let primary_id = cluster.get_primary_id(0).await;
    let mirror_id = cluster.get_mirror_id(0).await;
    let witness_id = cluster.get_witness_id(0).await;

    // Partition: mirror isolated (no quorum)
    cluster.partition_network(
        vec![primary_id, witness_id],
        vec![mirror_id]
    ).await;

    tokio::time::sleep(Duration::from_secs(5)).await;

    // Primary should remain active (has witness quorum)
    assert!(cluster.is_primary(primary_id).await);

    // Mirror should NOT self-promote (no quorum)
    assert!(!cluster.is_primary(mirror_id).await);
    // Only the primary should accept writes; the isolated mirror must reject them
    let primary_write = cluster.write_to_node(primary_id, b"key", b"value1").await;
    assert!(primary_write.is_ok(), "Primary cannot accept writes");

    let mirror_write = cluster.write_to_node(mirror_id, b"key", b"value2").await;
    assert!(mirror_write.is_err(), "Mirror accepted write without being primary");
}
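REP-003 and REP-004 hinge on the same quorum rule: a replica may serve (or keep serving) as primary only while it can reach a majority of the {primary, mirror, witness} set. The sketch below spells that decision out; the Role/PeerView types and the may_serve_as_primary helper are illustrative assumptions, not the HeliosDB failover implementation.

// Quorum rule sketch for a three-member replica set (primary, mirror, witness).
#[derive(Clone, Copy, PartialEq)]
pub enum Role {
    Primary,
    Mirror,
    Witness,
}

pub struct PeerView {
    pub role: Role,
    pub reachable: bool,
}

// `peers` describes the other two members as seen from the local node.
// The local node plus one reachable peer is a majority of three.
pub fn may_serve_as_primary(local_role: Role, peers: &[PeerView; 2]) -> bool {
    let reachable_peers = peers.iter().filter(|p| p.reachable).count();
    let has_quorum = 1 + reachable_peers >= 2;

    match local_role {
        // The current primary keeps serving only while it holds quorum
        // (REP-003: the isolated old primary steps down).
        Role::Primary => has_quorum,
        // The mirror may self-promote only with quorum (e.g. the witness is
        // reachable) AND only when it cannot reach the old primary
        // (REP-004: the isolated mirror never promotes itself).
        Role::Mirror => {
            has_quorum
                && peers
                    .iter()
                    .all(|p| p.role != Role::Primary || !p.reachable)
        }
        // The witness only votes; it never serves data.
        Role::Witness => false,
    }
}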
#[tokio::test]
async fn test_automatic_failover_latency() {
    // REP-005: Failover completes within acceptable time
    let cluster = ReplicationCluster::with_witness(1).await;

    let start = std::time::Instant::now();

    // Kill primary
    let primary_id = cluster.get_primary_id(0).await;
    cluster.kill_node(primary_id).await;

    // Wait for mirror promotion
    loop {
        if cluster.has_active_primary(0).await {
            break;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;

        if start.elapsed() > Duration::from_secs(30) {
            panic!("Failover did not complete within 30 seconds");
        }
    }

    let failover_duration = start.elapsed();
    println!("Failover completed in {:?}", failover_duration);

    // Failover should complete in < 10 seconds
    assert!(
        failover_duration < Duration::from_secs(10),
        "Failover took too long: {:?}",
        failover_duration
    );
}

4. Cache Invalidation Tests
use heliosdb_compute::cache::*;
#[tokio::test]
async fn test_cache_invalidation_on_write() {
    // CACHE-001: Compute node cache invalidated on write
    let cluster = TestCluster::builder()
        .storage_nodes(2)
        .compute_nodes(2)
        .build()
        .await;

    // Read from compute-0 (caches data)
    let compute_0 = cluster.get_compute_node(0).await;
    let result = compute_0.query("SELECT * FROM data WHERE id = 1").await.unwrap();
    assert!(compute_0.cache_contains(1).await);

    // Write via compute-1
    let compute_1 = cluster.get_compute_node(1).await;
    compute_1.execute("UPDATE data SET value = 999 WHERE id = 1").await.unwrap();

    // Wait for invalidation broadcast
    tokio::time::sleep(Duration::from_millis(100)).await;

    // compute-0 cache should be invalidated
    assert!(!compute_0.cache_contains(1).await);

    // Next read should fetch fresh data
    let result = compute_0.query("SELECT value FROM data WHERE id = 1").await.unwrap();
    assert_eq!(result.rows[0].get::<i64>("value"), 999);
}
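These tests assume that a write on any compute node broadcasts invalidations for the affected keys and that every peer drops those keys from its local cache. A minimal sketch of that receive-side behavior follows; the LocalCache type, the RowKey alias, and the on_invalidation handler are illustrative assumptions rather than the HeliosDB compute-cache API.

// Receive-side sketch: what a compute node's cache does when an
// invalidation message arrives from a peer.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

pub type RowKey = i64;

#[derive(Clone, Default)]
pub struct LocalCache {
    entries: Arc<Mutex<HashMap<RowKey, Vec<u8>>>>,
}

impl LocalCache {
    pub fn contains(&self, key: RowKey) -> bool {
        self.entries.lock().unwrap().contains_key(&key)
    }

    pub fn insert(&self, key: RowKey, row: Vec<u8>) {
        self.entries.lock().unwrap().insert(key, row);
    }

    // Called for each invalidation broadcast; stale entries are simply
    // dropped and refetched on the next read.
    pub fn on_invalidation(&self, keys: &[RowKey]) {
        let mut entries = self.entries.lock().unwrap();
        for key in keys {
            entries.remove(key);
        }
    }
}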
#[tokio::test]
async fn test_distributed_cache_coherence() {
    // CACHE-002: Cache coherent across all compute nodes
    let cluster = TestCluster::builder()
        .storage_nodes(1)
        .compute_nodes(5)
        .build()
        .await;

    // All compute nodes read same key (cache it)
    for i in 0..5 {
        let compute = cluster.get_compute_node(i).await;
        compute.query("SELECT * FROM data WHERE id = 100").await.unwrap();
        assert!(compute.cache_contains(100).await);
    }

    // Update via compute-0
    let compute_0 = cluster.get_compute_node(0).await;
    compute_0.execute("UPDATE data SET value = 777 WHERE id = 100").await.unwrap();

    // Wait for invalidation
    tokio::time::sleep(Duration::from_millis(200)).await;

    // All compute nodes should have invalidated cache
    for i in 0..5 {
        let compute = cluster.get_compute_node(i).await;
        assert!(
            !compute.cache_contains(100).await,
            "Compute-{} cache not invalidated",
            i
        );
    }
}

5. Distributed Transaction Tests
use heliosdb_compute::transactions::*;
#[tokio::test]
async fn test_cross_shard_transaction_commit() {
    // TX-001: Cross-shard transaction commits atomically
    let cluster = TestCluster::builder()
        .storage_nodes(3)
        .compute_nodes(1)
        .build()
        .await;

    let compute = cluster.get_compute_node(0).await;

    // Start distributed transaction
    let tx = compute.begin_transaction().await;

    // Write to multiple shards
    tx.execute("INSERT INTO shard_0_table (id, value) VALUES (1, 100)").await.unwrap();
    tx.execute("INSERT INTO shard_1_table (id, value) VALUES (2, 200)").await.unwrap();
    tx.execute("INSERT INTO shard_2_table (id, value) VALUES (3, 300)").await.unwrap();

    // Commit
    tx.commit().await.unwrap();

    // Verify all writes visible
    let r0 = compute.query("SELECT value FROM shard_0_table WHERE id = 1").await.unwrap();
    let r1 = compute.query("SELECT value FROM shard_1_table WHERE id = 2").await.unwrap();
    let r2 = compute.query("SELECT value FROM shard_2_table WHERE id = 3").await.unwrap();

    assert_eq!(r0.rows[0].get::<i64>("value"), 100);
    assert_eq!(r1.rows[0].get::<i64>("value"), 200);
    assert_eq!(r2.rows[0].get::<i64>("value"), 300);
}
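The atomicity these transaction tests assert is what a two-phase commit across the participating shards provides: no shard makes its writes visible until every shard has voted to prepare, and a single "no" vote aborts everywhere. The sketch below shows that commit shape in simplified form; the Participant trait and run_two_phase_commit function are illustrative assumptions, not the actual HeliosDB transaction coordinator.

// Simplified two-phase-commit shape underlying TX-001/TX-002.
pub trait Participant {
    // Phase 1: durably stage the shard-local writes and vote.
    fn prepare(&mut self, tx_id: u64) -> Result<(), String>;
    // Phase 2: make staged writes visible, or discard them.
    fn commit(&mut self, tx_id: u64);
    fn abort(&mut self, tx_id: u64);
}

pub fn run_two_phase_commit(
    tx_id: u64,
    participants: &mut [Box<dyn Participant>],
) -> Result<(), String> {
    // Phase 1: every shard must vote yes before anything becomes visible.
    let mut failure: Option<String> = None;
    for (i, p) in participants.iter_mut().enumerate() {
        if let Err(e) = p.prepare(tx_id) {
            failure = Some(format!("participant {} refused to prepare: {}", i, e));
            break;
        }
    }

    if let Some(reason) = failure {
        // Any "no" vote aborts the transaction on all shards.
        for p in participants.iter_mut() {
            p.abort(tx_id);
        }
        return Err(reason);
    }

    // Phase 2: all voted yes, so the commit decision is final on every shard.
    for p in participants.iter_mut() {
        p.commit(tx_id);
    }
    Ok(())
}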
#[tokio::test]
async fn test_cross_shard_transaction_rollback() {
    // TX-002: Cross-shard transaction rolls back atomically
    let cluster = TestCluster::builder()
        .storage_nodes(3)
        .compute_nodes(1)
        .build()
        .await;

    let compute = cluster.get_compute_node(0).await;
    let tx = compute.begin_transaction().await;

    // Write to multiple shards
    tx.execute("INSERT INTO shard_0_table (id, value) VALUES (10, 1000)").await.unwrap();
    tx.execute("INSERT INTO shard_1_table (id, value) VALUES (20, 2000)").await.unwrap();

    // Rollback
    tx.rollback().await.unwrap();

    // Verify no writes are visible
    let r0 = compute.query("SELECT COUNT(*) as cnt FROM shard_0_table WHERE id = 10").await.unwrap();
    let r1 = compute.query("SELECT COUNT(*) as cnt FROM shard_1_table WHERE id = 20").await.unwrap();

    assert_eq!(r0.rows[0].get::<i64>("cnt"), 0);
    assert_eq!(r1.rows[0].get::<i64>("cnt"), 0);
}
#[tokio::test]
async fn test_transaction_isolation() {
    // TX-003: Read Committed isolation level
    let cluster = TestCluster::new_test().await;

    let compute1 = cluster.get_compute_node(0).await;
    let compute2 = cluster.get_compute_node(1).await;

    // TX1: Begin and write (uncommitted)
    let tx1 = compute1.begin_transaction().await;
    tx1.execute("INSERT INTO test (id, value) VALUES (1, 100)").await.unwrap();

    // TX2: Should not see uncommitted write
    let tx2 = compute2.begin_transaction().await;
    let result = tx2.query("SELECT value FROM test WHERE id = 1").await.unwrap();
    assert_eq!(result.rows.len(), 0);

    // TX1: Commit
    tx1.commit().await.unwrap();

    // TX2: Now should see committed write (with new query)
    let result = tx2.query("SELECT value FROM test WHERE id = 1").await.unwrap();
    assert_eq!(result.rows.len(), 1);
    assert_eq!(result.rows[0].get::<i64>("value"), 100);
}

Test Execution
# Run all distributed tests
cargo test --test distributed

# Run specific category
cargo test --test raft_consensus
cargo test --test sharding
cargo test --test replication
cargo test --test cache_invalidation

# Run with detailed logging
RUST_LOG=debug cargo test --test distributed -- --nocapture

# Run chaos scenarios (requires Docker)
cargo test --test chaos -- --ignored

Quality Metrics
Success Criteria
- All Raft consensus tests pass (5-node cluster)
- Rebalancing completes without data loss
- Failover latency < 10 seconds
- Cache invalidation latency < 100ms
- Cross-shard transactions maintain ACID properties
Performance Targets
- Leader election: < 3 seconds
- Rebalancing throughput: > 100 MB/s
- Failover RTO: < 10 seconds
- Cache invalidation propagation: < 100ms
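Most of the tests above verify these targets indirectly with fixed sleeps. Where a target is a hard latency bound, it can also be measured directly by polling; the sketch below does this for the cache-invalidation target, reusing the hypothetical TestCluster API from the cache tests. The 500ms polling cutoff is an assumption added here as slack for test-environment jitter.

// Measure invalidation propagation directly instead of sleeping a fixed 100ms.
use std::time::{Duration, Instant};

#[tokio::test]
async fn measure_cache_invalidation_propagation() {
    let cluster = TestCluster::builder()
        .storage_nodes(1)
        .compute_nodes(2)
        .build()
        .await;

    let compute_0 = cluster.get_compute_node(0).await;
    let compute_1 = cluster.get_compute_node(1).await;

    // Warm compute-0's cache, then invalidate via a write on compute-1.
    compute_0.query("SELECT * FROM data WHERE id = 1").await.unwrap();
    assert!(compute_0.cache_contains(1).await);

    let start = Instant::now();
    compute_1.execute("UPDATE data SET value = 42 WHERE id = 1").await.unwrap();

    // Poll until the stale entry disappears, failing outright past 500ms.
    while compute_0.cache_contains(1).await {
        assert!(
            start.elapsed() < Duration::from_millis(500),
            "invalidation not observed within 500ms"
        );
        tokio::time::sleep(Duration::from_millis(1)).await;
    }

    println!("invalidation observed after {:?}", start.elapsed());

    // Assert the documented < 100ms propagation target.
    assert!(
        start.elapsed() < Duration::from_millis(100),
        "propagation exceeded 100ms target: {:?}",
        start.elapsed()
    );
}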