Two-Phase Commit Testing Infrastructure Architecture
Document Version: 1.0
Created: November 28, 2025
Status: READY FOR IMPLEMENTATION
Phase: Phase 1, Weeks 8-19
Team: 2 Engineers + 3 AI Agents
Priority: P0 - PRODUCTION BLOCKER #4
Executive Summary
This document specifies the complete testing infrastructure for validating HeliosDB’s Two-Phase Commit (2PC) implementation at production scale. The design focuses on practical, automatable scenarios that validate correctness, performance, and fault tolerance without requiring massive infrastructure.
Problem Statement
Current State (Week 8):
- 2PC implementation exists (xa_coordinator.rs, xa_participant.rs, xa_log.rs)
- Basic tests exist (42 tests in xa_transactions_test.rs)
- User documentation complete (21_distributed_transactions_2pc.md)
- ❌ NO scale testing (>100 nodes)
- ❌ NO chaos engineering (failure injection)
- ❌ NO distributed correctness validation
- ❌ NO performance regression testing
Gap: Cannot certify production readiness without comprehensive testing infrastructure.
What This Infrastructure Provides
1. Simulation-Based Scale Testing
- Simulate 1000+ node scenarios on single VM
- Use virtual participants with configurable behavior
- Memory-efficient testing (100MB per 1000 nodes)
2. Chaos Engineering Framework
- 50+ failure scenario definitions
- Deterministic and random failure injection
- Network partition/latency simulation
- Crash/recovery testing
3. Correctness Validation
- Atomicity verification (all-or-nothing commits)
- Isolation verification (no dirty reads)
- Durability verification (WAL-based recovery)
- Serializability testing (conflict detection)
4. Performance Monitoring
- Real-time metrics collection
- Regression detection
- Bottleneck identification
- SLA validation
Success Criteria (Week 19)
- All 50+ failure scenarios pass with zero data corruption
- 1000-node distributed transactions validated
- <5% performance degradation under chaos
- 100% atomicity guarantee across all scenarios
- Recovery time <10s for all failure modes
- Automated CI/CD integration
Resource Requirements
Development (Weeks 8-11):
- 2 Engineers: Test framework implementation
- 3 AI Agents: Scenario generation, validation code
- Budget: $200K
Execution (Weeks 12-19):
- Single 16-core VM (64GB RAM)
- No multi-machine cluster required
- Total runtime: ~48 hours for full suite
Table of Contents
- Architecture Overview
- Component Specifications
- Failure Scenario Catalog
- Test Harness Design
- Validation Framework
- Implementation Roadmap
- Module Templates
- Integration Guide
1. Architecture Overview
1.1 System Architecture
The test stack is layered as follows (top to bottom):

2PC Test Orchestrator
- Test Runner (main) → Coordinator Simulator → Validator (results)

Scenario Generator / Virtual Participant Pool / Correctness Checker
- Scenario Generator: load from database, randomize, replay
- Virtual Participant Pool (1000+): in-memory, configurable behavior
- Correctness Checker: WAL validator, atomicity check, isolation check

Failure Injector
- Network faults, node crashes, timing delays

Metrics & Monitoring Layer
- Transaction throughput
- Commit latency (p50, p99, p99.9)
- Failure recovery time
- Data corruption detection

1.2 Virtual Participant Design
Key Innovation: Simulate 1000+ participants without 1000 processes.
/// Lightweight in-memory participant for scale testing
pub struct VirtualParticipant {
    pub id: ParticipantId,
    pub state: ParticipantState,
    pub config: ParticipantConfig,

    // Simulated storage (in-memory)
    pub data: HashMap<Key, Value>,
    pub wal: Vec<WalEntry>,
    pub prepared_txns: HashMap<Gid, PreparedTransaction>,

    // Behavior configuration
    pub failure_mode: Option<FailureMode>,
    pub network_latency_ms: u64,
    pub crash_probability: f64,
}

pub enum ParticipantState {
    Healthy,
    Slow,        // Responds with delay
    NetworkDown, // Doesn't respond
    Crashed,     // Permanently unavailable
    Byzantine,   // Sends incorrect responses
}

impl VirtualParticipant {
    /// Process PREPARE request
    pub fn prepare(&mut self, gid: &Gid, writes: &[(Key, Value)]) -> Result<PrepareVote> {
        // Simulate failure modes
        self.inject_failure()?;

        // Simulate network latency
        thread::sleep(Duration::from_millis(self.network_latency_ms));

        // Execute prepare logic
        if self.can_commit(writes) {
            self.prepared_txns.insert(gid.clone(), PreparedTransaction {
                gid: gid.clone(),
                writes: writes.to_vec(),
                prepared_at: Instant::now(),
            });

            // Write to WAL
            self.wal.push(WalEntry::Prepare(gid.clone()));

            Ok(PrepareVote::Yes)
        } else {
            Ok(PrepareVote::No)
        }
    }

    /// Process COMMIT request
    pub fn commit(&mut self, gid: &Gid) -> Result<()> {
        self.inject_failure()?;

        if let Some(prepared_txn) = self.prepared_txns.remove(gid) {
            // Apply writes
            for (key, value) in prepared_txn.writes {
                self.data.insert(key, value);
            }

            // Write to WAL
            self.wal.push(WalEntry::Commit(gid.clone()));

            Ok(())
        } else {
            Err(Error::TransactionNotPrepared(gid.clone()))
        }
    }

    /// Simulate crash and recovery
    pub fn crash_and_recover(&mut self) -> Result<()> {
        // Lose in-memory state
        self.data.clear();
        self.prepared_txns.clear();

        // Recover from WAL
        self.recover_from_wal()?;

        self.state = ParticipantState::Healthy;
        Ok(())
    }

    /// Inject configured failure
    fn inject_failure(&self) -> Result<()> {
        if let Some(ref failure_mode) = self.failure_mode {
            match failure_mode {
                FailureMode::AlwaysCrash => return Err(Error::NodeCrashed),
                FailureMode::RandomCrash if rand::random::<f64>() < self.crash_probability => {
                    return Err(Error::NodeCrashed)
                },
                FailureMode::NetworkPartition => return Err(Error::NetworkTimeout),
                FailureMode::SlowResponse => {
                    thread::sleep(Duration::from_secs(10));
                },
                _ => {}
            }
        }
        Ok(())
    }
}

1.3 Test Execution Model
Single-VM Testing Approach:
/// Test harness runs all components in a single process
pub struct TwoPhaseCommitTestHarness {
    /// Real XA coordinator (production code)
    coordinator: Arc<XaCoordinator>,

    /// Pool of virtual participants (1000+)
    participants: HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,

    /// Failure injection controller
    failure_injector: FailureInjector,

    /// Correctness validator
    validator: CorrectnessValidator,

    /// Metrics collector
    metrics: MetricsCollector,
}

impl TwoPhaseCommitTestHarness {
    /// Execute test scenario
    pub async fn run_scenario(&mut self, scenario: TestScenario) -> Result<TestResult> {
        // 1. Setup: Configure participants and failures
        self.setup_scenario(&scenario)?;

        // 2. Execute: Run distributed transactions
        let txn_results = self.execute_transactions(&scenario.transactions).await?;

        // 3. Inject failures at configured times
        self.failure_injector.inject_failures(&scenario.failures).await?;

        // 4. Wait for recovery/timeout
        self.wait_for_quiescence().await?;

        // 5. Validate correctness
        let validation_result = self.validator.validate(&txn_results)?;

        // 6. Collect metrics
        let metrics = self.metrics.collect();

        let passed = validation_result.is_ok() && metrics.meets_sla();

        Ok(TestResult {
            scenario_name: scenario.name,
            validation: validation_result,
            metrics,
            passed,
        })
    }
}

2. Component Specifications
2.1 Test Harness (2pc_test_harness.rs)
Purpose: Orchestrate test execution, coordinate components
LOC: ~1,200
Owner: Engineer 1
use std::collections::HashMap;use std::sync::{Arc, Mutex};use std::time::{Duration, Instant};use heliosdb_compute::xa_coordinator::XaCoordinator;use heliosdb_storage::xa_participant::XaParticipant;
/// Main test harness for 2PC testingpub struct TwoPhaseCommitTestHarness { /// Production XA coordinator coordinator: Arc<XaCoordinator>,
/// Virtual participant pool participants: HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
/// Failure injection engine failure_injector: FailureInjector,
/// Correctness validator validator: CorrectnessValidator,
/// Metrics collector metrics: Arc<Mutex<MetricsCollector>>,
/// Configuration config: TestHarnessConfig,}
#[derive(Clone)]pub struct TestHarnessConfig { /// Number of virtual participants pub num_participants: usize,
/// Base network latency (ms) pub base_latency_ms: u64,
/// Enable detailed logging pub verbose: bool,
/// Timeout for coordinator operations (ms) pub coordinator_timeout_ms: u64,
/// Maximum concurrent transactions pub max_concurrent_txns: usize,}
impl Default for TestHarnessConfig { fn default() -> Self { Self { num_participants: 100, base_latency_ms: 5, verbose: false, coordinator_timeout_ms: 5000, max_concurrent_txns: 1000, } }}
impl TwoPhaseCommitTestHarness { /// Create new test harness pub fn new(config: TestHarnessConfig) -> Result<Self> { // Initialize coordinator let coordinator_config = XaCoordinatorConfig { max_prepared_transactions: config.max_concurrent_txns, timeout_ms: config.coordinator_timeout_ms, ..Default::default() }; let coordinator = Arc::new(XaCoordinator::new(coordinator_config)?);
// Create virtual participant pool let mut participants = HashMap::new(); for i in 0..config.num_participants { let participant_id = ParticipantId::new(format!("participant_{}", i)); let participant = VirtualParticipant::new(participant_id.clone(), config.clone()); participants.insert(participant_id, Arc::new(Mutex::new(participant))); }
Ok(Self { coordinator, participants, failure_injector: FailureInjector::new(), validator: CorrectnessValidator::new(), metrics: Arc::new(Mutex::new(MetricsCollector::new())), config, }) }
/// Run single test scenario pub async fn run_scenario(&mut self, scenario: TestScenario) -> TestResult { println!("Running scenario: {}", scenario.name);
// Setup self.setup_scenario(&scenario)?;
// Start metrics collection let metrics_handle = self.metrics.clone();
// Execute transactions let txn_results = self.execute_transactions(&scenario).await?;
// Wait for quiescence tokio::time::sleep(Duration::from_millis(100)).await;
// Validate let validation = self.validator.validate(&txn_results, &self.participants)?;
// Collect metrics let metrics = metrics_handle.lock().unwrap().snapshot();
let passed = validation.all_checks_passed() && metrics.meets_sla(&scenario.sla);
Ok(TestResult { scenario_name: scenario.name.clone(), validation, metrics, passed, duration: Duration::from_secs(0), // Set properly }) }
/// Setup scenario configuration fn setup_scenario(&mut self, scenario: &TestScenario) -> Result<()> { // Configure failure modes for (participant_id, failure_config) in &scenario.failure_configs { if let Some(participant) = self.participants.get(participant_id) { let mut p = participant.lock().unwrap(); p.configure_failure(failure_config.clone()); } }
// Configure network conditions self.failure_injector.configure(&scenario.network_conditions);
Ok(()) }
/// Execute distributed transactions async fn execute_transactions(&mut self, scenario: &TestScenario) -> Result<Vec<TransactionResult>> { let mut results = Vec::new(); let mut handles = Vec::new();
for txn in &scenario.transactions { let coordinator = self.coordinator.clone(); let participants = self.get_participants_for_txn(txn); let metrics = self.metrics.clone();
let handle = tokio::spawn(async move { Self::execute_single_transaction( coordinator, participants, txn.clone(), metrics ).await });
handles.push(handle); }
// Await all transactions for handle in handles { results.push(handle.await??); }
Ok(results) }
/// Execute single distributed transaction async fn execute_single_transaction( coordinator: Arc<XaCoordinator>, participants: Vec<Arc<Mutex<VirtualParticipant>>>, txn: DistributedTransaction, metrics: Arc<Mutex<MetricsCollector>>, ) -> Result<TransactionResult> { let start = Instant::now();
// Begin transaction let gid = Gid::new(txn.id.clone());
// Phase 1: PREPARE let prepare_start = Instant::now(); let mut votes = Vec::new();
for participant in &participants { let mut p = participant.lock().unwrap(); let vote = p.prepare(&gid, &txn.writes)?; votes.push(vote); }
let prepare_duration = prepare_start.elapsed(); metrics.lock().unwrap().record_prepare_latency(prepare_duration);
// All must vote YES let all_yes = votes.iter().all(|v| matches!(v, PrepareVote::Yes));
// Phase 2: COMMIT or ABORT let commit_start = Instant::now(); let outcome = if all_yes { for participant in &participants { participant.lock().unwrap().commit(&gid)?; } TransactionOutcome::Committed } else { for participant in &participants { participant.lock().unwrap().abort(&gid)?; } TransactionOutcome::Aborted };
let commit_duration = commit_start.elapsed(); metrics.lock().unwrap().record_commit_latency(commit_duration);
let total_duration = start.elapsed();
Ok(TransactionResult { gid, outcome, prepare_duration, commit_duration, total_duration, }) }
fn get_participants_for_txn(&self, txn: &DistributedTransaction) -> Vec<Arc<Mutex<VirtualParticipant>>> { txn.participant_ids .iter() .filter_map(|id| self.participants.get(id).cloned()) .collect() }}
/// Test scenario definition#[derive(Clone)]pub struct TestScenario { pub name: String, pub description: String, pub transactions: Vec<DistributedTransaction>, pub failure_configs: HashMap<ParticipantId, FailureConfig>, pub network_conditions: NetworkConditions, pub sla: SlaRequirements,}
/// Distributed transaction definition#[derive(Clone)]pub struct DistributedTransaction { pub id: String, pub participant_ids: Vec<ParticipantId>, pub writes: Vec<(Key, Value)>,}
/// Transaction execution result#[derive(Debug)]pub struct TransactionResult { pub gid: Gid, pub outcome: TransactionOutcome, pub prepare_duration: Duration, pub commit_duration: Duration, pub total_duration: Duration,}
#[derive(Debug, PartialEq)]pub enum TransactionOutcome { Committed, Aborted, Failed(String),}
/// Test execution resultpub struct TestResult { pub scenario_name: String, pub validation: ValidationResult, pub metrics: MetricsSnapshot, pub passed: bool, pub duration: Duration,}
impl TestResult {
    pub fn print_summary(&self) {
        println!("\n=== Test Result: {} ===", self.scenario_name);
        println!("Status: {}", if self.passed { "✓ PASS" } else { "✗ FAIL" });
        println!("\nValidation:");
        self.validation.print();
        println!("\nMetrics:");
        self.metrics.print();
        println!("Duration: {:?}", self.duration);
    }
}

2.2 Failure Injector (failure_injector.rs)
Purpose: Inject failures at deterministic or random times
LOC: ~800
Owner: Engineer 1 + AI Agent
use std::collections::HashMap;use std::time::{Duration, Instant};use std::sync::{Arc, Mutex};use rand::Rng;
/// Failure injection enginepub struct FailureInjector { /// Scheduled failure events scheduled_failures: Vec<ScheduledFailure>,
/// Network fault simulator network_simulator: NetworkSimulator,
/// Active failures active_failures: HashMap<ParticipantId, FailureMode>,}
impl FailureInjector { pub fn new() -> Self { Self { scheduled_failures: Vec::new(), network_simulator: NetworkSimulator::new(), active_failures: HashMap::new(), } }
/// Configure network conditions pub fn configure(&mut self, conditions: &NetworkConditions) { self.network_simulator.configure(conditions); }
/// Schedule failure at specific time pub fn schedule_failure(&mut self, failure: ScheduledFailure) { self.scheduled_failures.push(failure); }
/// Inject failures according to schedule pub async fn inject_failures(&mut self, scenario_start: Instant) -> Result<()> { for failure in &self.scheduled_failures { let delay = failure.delay_from_start; tokio::time::sleep(delay).await;
self.activate_failure(&failure.participant_id, failure.mode.clone())?; }
Ok(()) }
/// Activate failure mode for participant fn activate_failure(&mut self, participant_id: &ParticipantId, mode: FailureMode) -> Result<()> { self.active_failures.insert(participant_id.clone(), mode); Ok(()) }
/// Clear all failures pub fn clear_failures(&mut self) { self.active_failures.clear(); self.scheduled_failures.clear(); }}
/// Scheduled failure event#[derive(Clone)]pub struct ScheduledFailure { pub participant_id: ParticipantId, pub mode: FailureMode, pub delay_from_start: Duration, pub duration: Option<Duration>, // None = permanent}
/// Failure modes#[derive(Clone, Debug)]pub enum FailureMode { /// Node crashes immediately Crash,
/// Node crashes with probability on each request RandomCrash { probability: f64 },
/// Network partition (can't communicate) NetworkPartition,
/// Slow responses (delay all requests) SlowResponse { delay_ms: u64 },
/// Intermittent failures (fail X% of requests) Intermittent { failure_rate: f64 },
/// Byzantine (send incorrect responses) Byzantine { behavior: ByzantineBehavior },
/// Out of memory OutOfMemory,
/// Disk full DiskFull,}
#[derive(Clone, Debug)]pub enum ByzantineBehavior { /// Vote YES in prepare but don't actually prepare FalseYesVote,
/// Commit without prepare CommitWithoutPrepare,
/// Send different votes to different coordinators InconsistentVotes,}
/// Network conditions simulatorpub struct NetworkSimulator { /// Base latency (ms) base_latency_ms: u64,
/// Latency variance (ms) latency_variance_ms: u64,
/// Packet loss probability packet_loss_rate: f64,
/// Bandwidth limit (bytes/sec) bandwidth_limit: Option<u64>,}
impl NetworkSimulator { pub fn new() -> Self { Self { base_latency_ms: 5, latency_variance_ms: 2, packet_loss_rate: 0.0, bandwidth_limit: None, } }
pub fn configure(&mut self, conditions: &NetworkConditions) { self.base_latency_ms = conditions.base_latency_ms; self.latency_variance_ms = conditions.latency_variance_ms; self.packet_loss_rate = conditions.packet_loss_rate; self.bandwidth_limit = conditions.bandwidth_limit; }
/// Simulate network delay pub fn simulate_delay(&self) -> Duration { let mut rng = rand::thread_rng(); let variance = rng.gen_range(0..=self.latency_variance_ms); let total_latency = self.base_latency_ms + variance; Duration::from_millis(total_latency) }
/// Check if packet should be dropped pub fn should_drop_packet(&self) -> bool { let mut rng = rand::thread_rng(); rng.gen::<f64>() < self.packet_loss_rate }}
#[derive(Clone)]pub struct NetworkConditions { pub base_latency_ms: u64, pub latency_variance_ms: u64, pub packet_loss_rate: f64, pub bandwidth_limit: Option<u64>,}
impl Default for NetworkConditions { fn default() -> Self { Self { base_latency_ms: 5, latency_variance_ms: 2, packet_loss_rate: 0.0, bandwidth_limit: None, } }}
/// Failure configuration for participant#[derive(Clone)]pub struct FailureConfig { pub mode: FailureMode, pub trigger_time: FailureTriggerTime,}
#[derive(Clone)]
pub enum FailureTriggerTime {
    /// Fail immediately at test start
    Immediate,

    /// Fail after delay
    Delayed(Duration),

    /// Fail at specific transaction number
    AtTransaction(usize),

    /// Fail randomly during test
    Random,
}

2.3 Correctness Validator (correctness_validator.rs)
Purpose: Verify ACID properties and correctness invariants
LOC: ~600
Owner: Engineer 2
use std::collections::{HashMap, HashSet};
/// Correctness validation enginepub struct CorrectnessValidator { /// Track all transaction outcomes transaction_log: Vec<TransactionRecord>,
/// Track data state across all participants global_state_tracker: GlobalStateTracker,}
impl CorrectnessValidator { pub fn new() -> Self { Self { transaction_log: Vec::new(), global_state_tracker: GlobalStateTracker::new(), } }
/// Validate test execution results pub fn validate( &mut self, txn_results: &[TransactionResult], participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>, ) -> Result<ValidationResult> { let mut result = ValidationResult::default();
// Check 1: Atomicity (all-or-nothing) result.atomicity = self.check_atomicity(txn_results, participants)?;
// Check 2: Consistency (invariants maintained) result.consistency = self.check_consistency(participants)?;
// Check 3: Isolation (no dirty reads) result.isolation = self.check_isolation(txn_results, participants)?;
// Check 4: Durability (WAL recovery) result.durability = self.check_durability(participants)?;
// Check 5: No data corruption result.data_integrity = self.check_data_integrity(participants)?;
Ok(result) }
/// Check atomicity: committed txns visible everywhere, aborted txns nowhere fn check_atomicity( &self, txn_results: &[TransactionResult], participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>, ) -> Result<CheckResult> { let mut failures = Vec::new();
for txn_result in txn_results { match txn_result.outcome { TransactionOutcome::Committed => { // All participants must have committed writes for (pid, participant) in participants { let p = participant.lock().unwrap();
// Check if writes are present let all_writes_present = self.check_writes_present( &p, &txn_result.gid )?;
if !all_writes_present { failures.push(format!( "Transaction {} committed but writes missing on {}", txn_result.gid, pid )); } } }, TransactionOutcome::Aborted => { // No participant should have committed writes for (pid, participant) in participants { let p = participant.lock().unwrap();
let any_writes_present = self.check_any_writes_present( &p, &txn_result.gid )?;
if any_writes_present { failures.push(format!( "Transaction {} aborted but writes present on {}", txn_result.gid, pid )); } } }, TransactionOutcome::Failed(_) => { // Same as aborted } } }
Ok(CheckResult { passed: failures.is_empty(), failures, }) }
/// Check consistency: application-level invariants fn check_consistency( &self, participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>, ) -> Result<CheckResult> { let mut failures = Vec::new();
// Example: Bank transfer invariant (total balance unchanged) let initial_total = self.calculate_total_balance_initial(); let final_total = self.calculate_total_balance(participants)?;
if initial_total != final_total { failures.push(format!( "Balance invariant violated: initial={}, final={}", initial_total, final_total )); }
Ok(CheckResult { passed: failures.is_empty(), failures, }) }
/// Check isolation: no dirty reads, no non-repeatable reads fn check_isolation( &self, txn_results: &[TransactionResult], participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>, ) -> Result<CheckResult> { let mut failures = Vec::new();
// Check for dirty reads (reads from uncommitted transactions) for txn_result in txn_results { // Implementation depends on read tracking // For now, check that all reads were from committed transactions }
Ok(CheckResult { passed: failures.is_empty(), failures, }) }
/// Check durability: WAL can recover all committed transactions fn check_durability( &self, participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>, ) -> Result<CheckResult> { let mut failures = Vec::new();
for (pid, participant) in participants { let mut p = participant.lock().unwrap();
// Simulate crash and recovery let data_before = p.data.clone(); p.crash_and_recover()?; let data_after = p.data.clone();
// Verify data is identical after recovery if data_before != data_after { failures.push(format!( "Participant {} lost data during recovery", pid )); } }
Ok(CheckResult { passed: failures.is_empty(), failures, }) }
/// Check data integrity: no corruption, checksums valid fn check_data_integrity( &self, participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>, ) -> Result<CheckResult> { let mut failures = Vec::new();
for (pid, participant) in participants { let p = participant.lock().unwrap();
// Verify WAL checksums for wal_entry in &p.wal { if !wal_entry.verify_checksum() { failures.push(format!( "WAL corruption detected on participant {}", pid )); } }
// Verify data checksums (if implemented) // ... }
Ok(CheckResult { passed: failures.is_empty(), failures, }) }
fn check_writes_present(&self, participant: &VirtualParticipant, gid: &Gid) -> Result<bool> { // Check if transaction's writes are in participant's data // Implementation specific to test scenario Ok(true) }
fn check_any_writes_present(&self, participant: &VirtualParticipant, gid: &Gid) -> Result<bool> { Ok(false) }
fn calculate_total_balance_initial(&self) -> i64 { // Return initial total from test setup 0 }
fn calculate_total_balance( &self, participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>, ) -> Result<i64> { let mut total = 0i64;
for participant in participants.values() { let p = participant.lock().unwrap(); // Sum balances from participant data // Implementation specific }
Ok(total) }}
/// Validation result#[derive(Default)]pub struct ValidationResult { pub atomicity: CheckResult, pub consistency: CheckResult, pub isolation: CheckResult, pub durability: CheckResult, pub data_integrity: CheckResult,}
impl ValidationResult { pub fn all_checks_passed(&self) -> bool { self.atomicity.passed && self.consistency.passed && self.isolation.passed && self.durability.passed && self.data_integrity.passed }
pub fn print(&self) { println!(" Atomicity: {}", self.atomicity.status()); println!(" Consistency: {}", self.consistency.status()); println!(" Isolation: {}", self.isolation.status()); println!(" Durability: {}", self.durability.status()); println!(" Data Integrity: {}", self.data_integrity.status());
if !self.all_checks_passed() { println!("\nFailures:"); self.print_failures(); } }
fn print_failures(&self) { for (name, check) in [ ("Atomicity", &self.atomicity), ("Consistency", &self.consistency), ("Isolation", &self.isolation), ("Durability", &self.durability), ("Data Integrity", &self.data_integrity), ] { if !check.passed { for failure in &check.failures { println!(" [{}] {}", name, failure); } } } }}
/// Individual check result#[derive(Default)]pub struct CheckResult { pub passed: bool, pub failures: Vec<String>,}
impl CheckResult { pub fn status(&self) -> &str { if self.passed { "✓ PASS" } else { "✗ FAIL" } }}
/// Global state tracker for consistency checks
struct GlobalStateTracker {
    // Track global invariants across all participants
}

impl GlobalStateTracker {
    fn new() -> Self {
        Self {}
    }
}

2.4 Scenario Generator (scenario_generator.rs)
Purpose: Generate and load test scenarios
LOC: ~500
Owner: AI Agent (Coder-1)
use std::collections::HashMap;use rand::Rng;
/// Scenario generatorpub struct ScenarioGenerator { /// Scenario database scenarios: Vec<TestScenario>,}
impl ScenarioGenerator { pub fn new() -> Self { let mut generator = Self { scenarios: Vec::new(), };
// Load all 50+ predefined scenarios generator.load_predefined_scenarios();
generator }
/// Load all predefined test scenarios fn load_predefined_scenarios(&mut self) { // Category 1: Node Crashes (6 scenarios) self.scenarios.push(self.scenario_single_participant_crash()); self.scenarios.push(self.scenario_coordinator_crash_before_prepare()); self.scenarios.push(self.scenario_coordinator_crash_after_prepare()); self.scenarios.push(self.scenario_coordinator_crash_during_commit()); self.scenarios.push(self.scenario_multiple_participant_crashes()); self.scenarios.push(self.scenario_cascading_crashes());
// Category 2: Network Partitions (8 scenarios) self.scenarios.push(self.scenario_network_partition_minority()); self.scenarios.push(self.scenario_network_partition_split_brain()); self.scenarios.push(self.scenario_asymmetric_partition()); self.scenarios.push(self.scenario_partition_during_prepare()); self.scenarios.push(self.scenario_partition_during_commit()); self.scenarios.push(self.scenario_partition_heal_during_recovery()); self.scenarios.push(self.scenario_multiple_partitions()); self.scenarios.push(self.scenario_flapping_network());
// Category 3: Timing Anomalies (10 scenarios) self.scenarios.push(self.scenario_slow_participant_prepare()); self.scenarios.push(self.scenario_slow_participant_commit()); self.scenarios.push(self.scenario_timeout_during_prepare()); self.scenarios.push(self.scenario_timeout_during_commit()); self.scenarios.push(self.scenario_variable_latency()); self.scenarios.push(self.scenario_packet_loss_5_percent()); self.scenarios.push(self.scenario_packet_loss_20_percent()); self.scenarios.push(self.scenario_high_latency_cross_region()); self.scenarios.push(self.scenario_bandwidth_constraint()); self.scenarios.push(self.scenario_clock_skew());
// Category 4: Coordinator Failures (8 scenarios) self.scenarios.push(self.scenario_coordinator_failover()); self.scenarios.push(self.scenario_coordinator_restart_with_recovery()); self.scenarios.push(self.scenario_coordinator_out_of_memory()); self.scenarios.push(self.scenario_coordinator_thread_exhaustion()); self.scenarios.push(self.scenario_coordinator_disk_full()); self.scenarios.push(self.scenario_multiple_coordinator_crashes()); self.scenarios.push(self.scenario_coordinator_byzantine()); self.scenarios.push(self.scenario_coordinator_stuck_in_prepare());
// Category 5: Lock Timeouts (6 scenarios) self.scenarios.push(self.scenario_lock_timeout_single_key()); self.scenarios.push(self.scenario_lock_timeout_multiple_keys()); self.scenarios.push(self.scenario_deadlock_detection()); self.scenarios.push(self.scenario_deadlock_resolution()); self.scenarios.push(self.scenario_livelock_prevention()); self.scenarios.push(self.scenario_long_running_transactions());
// Category 6: Combined Failures (6 scenarios) self.scenarios.push(self.scenario_network_partition_plus_crash()); self.scenarios.push(self.scenario_slow_network_plus_timeout()); self.scenarios.push(self.scenario_cascading_failures()); self.scenarios.push(self.scenario_recovery_during_new_failures()); self.scenarios.push(self.scenario_multiple_failure_modes()); self.scenarios.push(self.scenario_worst_case_chaos());
// Additional scenarios... }
/// Get all scenarios pub fn all_scenarios(&self) -> &[TestScenario] { &self.scenarios }
/// Get scenarios by category pub fn scenarios_by_category(&self, category: &str) -> Vec<&TestScenario> { self.scenarios .iter() .filter(|s| s.category == category) .collect() }
/// Generate random scenario pub fn generate_random_scenario(&self, num_participants: usize, num_txns: usize) -> TestScenario { let mut rng = rand::thread_rng();
// Generate random transactions let mut transactions = Vec::new(); for i in 0..num_txns { let num_parts = rng.gen_range(2..=10); let participant_ids: Vec<_> = (0..num_parts) .map(|j| ParticipantId::new(format!("participant_{}", j))) .collect();
transactions.push(DistributedTransaction { id: format!("txn_{}", i), participant_ids, writes: vec![], }); }
TestScenario { name: "Random Scenario".to_string(), description: "Randomly generated test scenario".to_string(), category: "Random".to_string(), transactions, failure_configs: HashMap::new(), network_conditions: NetworkConditions::default(), sla: SlaRequirements::default(), } }}
// Example scenario implementationsimpl ScenarioGenerator { /// Scenario: Single participant crashes during prepare fn scenario_single_participant_crash(&self) -> TestScenario { TestScenario { name: "Single Participant Crash During Prepare".to_string(), description: "One participant crashes after receiving PREPARE but before voting".to_string(), category: "Node Crashes".to_string(), transactions: vec![ DistributedTransaction { id: "txn_1".to_string(), participant_ids: vec![ ParticipantId::new("participant_0"), ParticipantId::new("participant_1"), // Will crash ParticipantId::new("participant_2"), ], writes: vec![ (Key::new("key1"), Value::new("value1")), (Key::new("key2"), Value::new("value2")), ], } ], failure_configs: { let mut configs = HashMap::new(); configs.insert( ParticipantId::new("participant_1"), FailureConfig { mode: FailureMode::Crash, trigger_time: FailureTriggerTime::AtTransaction(0), } ); configs }, network_conditions: NetworkConditions::default(), sla: SlaRequirements { max_abort_rate: 1.0, // Expect 100% abort max_prepare_latency_ms: 5000, max_commit_latency_ms: 5000, max_recovery_time_ms: 10000, }, } }
/// Scenario: Coordinator crash after PREPARE, before COMMIT fn scenario_coordinator_crash_after_prepare(&self) -> TestScenario { TestScenario { name: "Coordinator Crash After Prepare".to_string(), description: "Coordinator crashes after sending PREPARE but before COMMIT decision".to_string(), category: "Coordinator Failures".to_string(), transactions: vec![ DistributedTransaction { id: "txn_1".to_string(), participant_ids: vec![ ParticipantId::new("participant_0"), ParticipantId::new("participant_1"), ParticipantId::new("participant_2"), ], writes: vec![ (Key::new("key1"), Value::new("value1")), ], } ], failure_configs: HashMap::new(), // Coordinator crash handled separately network_conditions: NetworkConditions::default(), sla: SlaRequirements { max_abort_rate: 0.0, // Should recover and commit max_prepare_latency_ms: 5000, max_commit_latency_ms: 5000, max_recovery_time_ms: 10000, }, } }
/// Scenario: Network partition (split-brain) fn scenario_network_partition_split_brain(&self) -> TestScenario { TestScenario { name: "Network Partition - Split Brain".to_string(), description: "Cluster splits into two equal partitions".to_string(), category: "Network Partitions".to_string(), transactions: vec![ DistributedTransaction { id: "txn_1".to_string(), participant_ids: (0..10) .map(|i| ParticipantId::new(format!("participant_{}", i))) .collect(), writes: vec![ (Key::new("key1"), Value::new("value1")), ], } ], failure_configs: { let mut configs = HashMap::new(); // Participants 5-9 partitioned for i in 5..10 { configs.insert( ParticipantId::new(format!("participant_{}", i)), FailureConfig { mode: FailureMode::NetworkPartition, trigger_time: FailureTriggerTime::Delayed( Duration::from_millis(50) ), } ); } configs }, network_conditions: NetworkConditions::default(), sla: SlaRequirements { max_abort_rate: 1.0, // Transaction should abort max_prepare_latency_ms: 5000, max_commit_latency_ms: 5000, max_recovery_time_ms: 10000, }, } }
// ... 47+ more scenario implementations ...}
/// SLA requirements for test scenario#[derive(Clone)]pub struct SlaRequirements { /// Maximum acceptable abort rate (0.0 - 1.0) pub max_abort_rate: f64,
/// Maximum prepare latency (ms) pub max_prepare_latency_ms: u64,
/// Maximum commit latency (ms) pub max_commit_latency_ms: u64,
/// Maximum recovery time (ms) pub max_recovery_time_ms: u64,}
impl Default for SlaRequirements {
    fn default() -> Self {
        Self {
            max_abort_rate: 0.05, // 5% abort rate
            max_prepare_latency_ms: 100,
            max_commit_latency_ms: 100,
            max_recovery_time_ms: 10000,
        }
    }
}

3. Failure Scenario Catalog
This section defines all 50+ failure scenarios with expected outcomes.
3.1 Category 1: Node Crashes (6 Scenarios)
Scenario 1.1: Single Participant Crash During Prepare
Description: One participant crashes after receiving PREPARE message but before voting.
Setup:
- 3 participants
- 1 transaction writing to all 3 participants
- Participant 1 crashes immediately upon receiving PREPARE
Expected Outcome:
- Transaction ABORTS (coordinator timeout waiting for vote)
- Participants 0, 2 abort their prepared state
- Participant 1 recovers with no committed data
- Recovery time: <5s
Validation:
- No data written to any participant
- WAL shows PREPARE but no COMMIT
- Atomicity preserved
Scenario 1.2: Coordinator Crash Before Prepare
Description: Coordinator crashes before sending PREPARE to any participant.
Setup:
- 5 participants
- 1 transaction
- Coordinator crashes immediately
Expected Outcome:
- Transaction ABORTS
- No participant receives PREPARE
- No data modified
- Client receives error
Validation:
- All participants in IDLE state
- No WAL entries for this transaction
Scenario 1.3: Coordinator Crash After Prepare, Before Commit
Description: Coordinator crashes after all participants voted YES, before sending COMMIT.
Setup:
- 3 participants
- 1 transaction
- All participants vote YES
- Coordinator crashes before COMMIT
Expected Outcome:
- Participants remain in PREPARED state
- Coordinator recovery detects prepared transaction in WAL
- Recovery process sends COMMIT to all participants
- Transaction eventually COMMITS
- Recovery time: <10s
Validation:
- All participants have committed data
- WAL shows PREPARE + COMMIT
- Atomicity preserved
- Critical: This tests WAL-based recovery
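A minimal sketch of how this scenario's post-condition could be checked against the virtual participants from Section 1.2 follows. The helper is hypothetical, and it assumes the test types carry the usual PartialEq/Debug derives and that WalEntry records the transaction Gid as shown earlier.

/// Hypothetical check for Scenario 1.3: after coordinator recovery, every
/// participant must hold a PREPARE + COMMIT pair in its WAL for the transaction.
fn assert_recovered_commit(participants: &[&VirtualParticipant], gid: &Gid) -> Result<(), String> {
    for p in participants {
        let prepared = p.wal.iter().any(|e| matches!(e, WalEntry::Prepare(g) if g == gid));
        let committed = p.wal.iter().any(|e| matches!(e, WalEntry::Commit(g) if g == gid));
        if !(prepared && committed) {
            return Err(format!(
                "participant {:?} missing PREPARE/COMMIT record for {}",
                p.id, gid
            ));
        }
    }
    Ok(())
}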
Scenario 1.4: Coordinator Crash During Commit Phase
Description: Coordinator crashes after sending COMMIT to some (but not all) participants.
Setup:
- 5 participants
- 1 transaction
- Coordinator crashes after sending COMMIT to participants 0, 1
Expected Outcome:
- Participants 0, 1 commit
- Participants 2, 3, 4 remain in PREPARED state
- Coordinator recovery completes commit to remaining participants
- Recovery time: <10s
Validation:
- Eventually, ALL participants have committed data
- No participant left in PREPARED state indefinitely
- Atomicity eventually consistent
Scenario 1.5: Multiple Participant Crashes
Description: 40% of participants crash during prepare phase.
Setup:
- 10 participants
- 4 participants crash during PREPARE
Expected Outcome:
- Transaction ABORTS (coordinator cannot collect YES votes from all participants)
- Remaining 6 participants abort
- Crashed participants recover with no data
Validation:
- Zero data committed anywhere
- Recovery of crashed participants clean
Scenario 1.6: Cascading Crashes
Description: Participants crash sequentially during different phases.
Setup:
- 5 participants
- Participant 0 crashes during PREPARE
- Participant 1 crashes during COMMIT
Expected Outcome:
- Transaction ABORTS (Participant 0 failed to vote)
- Participant 1 crash has no effect (transaction already aborted)
Validation:
- No data committed
- Recovery of both participants clean
3.2 Category 2: Network Partitions (8 Scenarios)
Scenario 2.1: Network Partition - Minority Partition
Description: Network split isolates 20% of participants.
Setup:
- 10 participants
- 2 participants (minority) partitioned
- 1 transaction spanning all 10
Expected Outcome:
- Majority partition (8 nodes) stays mutually reachable, but 2PC still requires votes from all 10 participants
- Transaction ABORTS (cannot commit with 2 participants unreachable)
- Partition heals → participants sync
Validation:
- Eventual consistency after partition heal
- No split-brain data
Scenario 2.2: Split-Brain Network Partition
Description: Network splits cluster exactly in half.
Setup:
- 10 participants (5 + 5 split)
- 1 transaction
Expected Outcome:
- Neither partition has quorum
- Transaction ABORTS on both sides
- Partition heals → coordinator resolves to ABORT
Validation:
- No data committed on either side
- Atomicity preserved
Scenario 2.3: Asymmetric Partition
Description: Coordinator can reach every participant, but some participants can’t reach each other.
Setup:
- Coordinator → All participants (reachable)
- Participant 0 ↔ Participant 1 (unreachable)
Expected Outcome:
- Depends on 2PC implementation
- If participants need peer communication: ABORT
- If only coordinator communication: COMMIT (if all vote YES)
Validation:
- Consistency maintained
- No conflicting commits
Scenario 2.4: Partition During Prepare Phase
Description: Network partition occurs halfway through PREPARE voting.
Setup:
- 8 participants
- 4 vote YES
- Partition isolates remaining 4 before voting
Expected Outcome:
- Coordinator cannot collect all votes
- Transaction ABORTS
- Timeout triggers abort on all prepared participants
Validation:
- Participants that voted YES eventually abort
- No data committed
Scenario 2.5: Partition During Commit Phase
Description: Network partition during COMMIT message propagation.
Setup:
- All participants vote YES
- Partition occurs after COMMIT sent to 5/10 participants
Expected Outcome:
- 5 participants commit
- 5 participants remain PREPARED
- Partition heals → recovery completes commit
- Eventual atomicity
Validation:
- Eventually all 10 participants have committed data
- Data consistency across all participants
Scenario 2.6: Partition Heal During Recovery
Description: Network heals while coordinator is recovering a previous transaction.
Setup:
- Transaction prepared on all participants
- Coordinator crashes
- Network partitions
- Coordinator recovers
- Network heals
Expected Outcome:
- Recovery process completes successfully
- All participants eventually commit
- Recovery time: <15s (including partition time)
Validation:
- Atomicity eventually consistent
- No duplicate commits
Scenario 2.7: Multiple Overlapping Partitions
Description: Multiple partitions occur simultaneously, affecting different participant subsets.
Setup:
- 15 participants
- Partition 1: Isolates participants 0-4
- Partition 2: Isolates participants 10-14
Expected Outcome:
- Coordinator can only reach 5 participants (5-9)
- Transaction ABORTS (cannot reach all participants)
Validation:
- No data committed
- All partitions eventually heal
- Consistent state after heal
Scenario 2.8: Flapping Network
Description: Network connectivity flaps repeatedly (connect/disconnect).
Setup:
- 5 participants
- Network flaps every 100ms for 2 seconds
Expected Outcome:
- Coordinator retries failed connections
- Either transaction commits during stable window OR aborts after timeout
- No partial commits
Validation:
- Atomicity maintained despite flapping
- Performance degradation acceptable (<10x slowdown)
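One way the flapping schedule could be expressed with the ScheduledFailure type from Section 2.2 is sketched below; it assumes the injector clears a failure once its optional duration elapses.

use std::time::Duration;

/// Sketch: 2 seconds of flapping for one participant — partitioned for 100 ms,
/// healthy for 100 ms, repeated ten times.
fn schedule_flapping(injector: &mut FailureInjector, target: ParticipantId) {
    for i in 0u64..10 {
        injector.schedule_failure(ScheduledFailure {
            participant_id: target.clone(),
            mode: FailureMode::NetworkPartition,
            delay_from_start: Duration::from_millis(i * 200),
            duration: Some(Duration::from_millis(100)),
        });
    }
}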
3.3 Category 3: Timing Anomalies (10 Scenarios)
Scenario 3.1: Slow Participant Prepare (1s delay)
Description: One participant is slow to vote in PREPARE phase.
Setup:
- 5 participants
- Participant 2 delays PREPARE vote by 1s
Expected Outcome:
- If timeout > 1s: Transaction commits successfully
- If timeout < 1s: Transaction aborts (timeout)
Validation:
- No partial commits
- Timeout handling correct
Scenario 3.2: Slow Participant Commit (2s delay)
Description: One participant is slow to execute COMMIT.
Setup:
- 5 participants
- Participant 3 delays COMMIT execution by 2s
Expected Outcome:
- Transaction commits on all participants eventually
- Coordinator waits for slow participant
- Total latency: ~2s
Validation:
- All participants eventually commit
- No timeout during commit phase (commit must be durable)
Scenario 3.3: Timeout During Prepare
Description: Coordinator timeout expires before all votes collected.
Setup:
- 10 participants
- 2 participants never respond to PREPARE
- Timeout: 5s
Expected Outcome:
- After 5s, coordinator aborts transaction
- 8 participants that voted YES abort
- 2 non-responsive participants eventually timeout locally
Validation:
- No data committed anywhere
- All participants eventually idle
Scenario 3.4: Timeout During Commit
Description: Participant doesn’t acknowledge COMMIT within timeout.
Setup:
- 5 participants
- Participant 4 doesn’t ACK COMMIT (network issue)
Expected Outcome:
- Coordinator retries COMMIT indefinitely (commit must succeed)
- Participant 4 eventually receives COMMIT (retry succeeds)
- Transaction commits on all participants
Validation:
- Atomicity maintained
- No permanent PREPARED state
Scenario 3.5: Variable Latency (50ms - 500ms)
Description: Network latency varies randomly between participants.
Setup:
- 20 participants
- Latency range: 50ms - 500ms per participant
Expected Outcome:
- Transaction commits successfully
- Total latency: ~500ms (slowest participant)
- Performance degradation proportional to max latency
Validation:
- All participants commit
- Latency distribution tracked
Scenario 3.6: Packet Loss 5%
Description: 5% of network packets dropped randomly.
Setup:
- 10 participants
- 5% packet loss rate
- No network-level retries
Expected Outcome:
- Coordinator retries failed messages
- Transaction commits eventually
- Latency increase: ~20% (due to retries)
Validation:
- No data loss
- Retries handled correctly
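For reference, the 5% loss could be configured through the NetworkConditions type from Section 2.2, roughly as follows.

/// Network conditions for Scenario 3.6: default latency, 5% packet loss.
fn packet_loss_conditions() -> NetworkConditions {
    NetworkConditions {
        base_latency_ms: 5,
        latency_variance_ms: 2,
        packet_loss_rate: 0.05, // drop roughly 1 in 20 messages
        bandwidth_limit: None,
    }
}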
Scenario 3.7: Packet Loss 20%
Description: 20% packet loss (severe network degradation).
Setup:
- 10 participants
- 20% packet loss
- Timeout: 10s
Expected Outcome:
- High retry count
- Transaction commits OR aborts within timeout
- Performance severely degraded
Validation:
- Atomicity maintained despite severe packet loss
- No deadlocks from retries
Scenario 3.8: High Latency Cross-Region (200ms)
Description: Simulate cross-region deployment with high latency.
Setup:
- 6 participants (3 in region A, 3 in region B)
- Inter-region latency: 200ms
- Intra-region latency: 5ms
Expected Outcome:
- Transaction commits successfully
- Prepare phase: ~400ms (round-trip)
- Commit phase: ~400ms (round-trip)
- Total: ~800ms
Validation:
- Latency matches expectation
- No timeouts due to high latency
Scenario 3.9: Bandwidth Constraint (1 Mbps)
Description: Simulate low-bandwidth network (1 Mbps).
Setup:
- 10 participants
- Large transaction payload (1 MB per participant)
- Bandwidth: 1 Mbps
Expected Outcome:
- Transaction commits
- Latency dominated by bandwidth (≈10s when the ten 1 MB payloads transfer in parallel over 1 Mbps links; ≈80s if all 10 MB share a single 1 Mbps link)
Validation:
- No errors due to bandwidth constraint
- Throughput matches bandwidth limit
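A quick back-of-envelope helper for sanity-checking the expected latency, under the assumptions spelled out in the comments.

/// Seconds needed to move `total_bytes` over a link limited to `bits_per_sec`.
fn transfer_time_secs(total_bytes: u64, bits_per_sec: u64) -> f64 {
    (total_bytes as f64 * 8.0) / bits_per_sec as f64
}

// 1 MB per participant at 1 Mbps is about 8 s per link if the ten transfers run
// in parallel; if all 10 MB funnel through one 1 Mbps link, expect roughly 80 s.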
Scenario 3.10: Clock Skew (10s difference)
Description: Participant clocks differ by 10 seconds.
Setup:
- 5 participants
- Participant 2 clock is 10s ahead
Expected Outcome:
- Transaction commits successfully
- Logical timestamps prevent issues
- No impact on correctness
Validation:
- Clock skew doesn’t affect atomicity
- Timestamps monotonically increase
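The kind of monotonic logical clock that makes wall-clock skew irrelevant is sketched below; this is a generic illustration, not necessarily the timestamp mechanism HeliosDB actually uses.

use std::sync::atomic::{AtomicU64, Ordering};

/// Lamport-style logical clock: always advances past anything it has observed,
/// so a peer whose wall clock is 10 s ahead cannot push timestamps backwards.
pub struct LogicalClock {
    last: AtomicU64,
}

impl LogicalClock {
    pub fn new() -> Self {
        Self { last: AtomicU64::new(0) }
    }

    /// Merge an observed timestamp and return a strictly larger local one.
    pub fn tick(&self, observed: u64) -> u64 {
        let mut cur = self.last.load(Ordering::Relaxed);
        loop {
            let next = cur.max(observed) + 1;
            match self.last.compare_exchange(cur, next, Ordering::SeqCst, Ordering::Relaxed) {
                Ok(_) => return next,
                Err(actual) => cur = actual,
            }
        }
    }
}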
3.4 Category 4: Coordinator Failures (8 Scenarios)
Scenario 4.1: Coordinator Failover
Description: Primary coordinator fails, backup takes over.
Setup:
- Primary coordinator crashes
- Backup coordinator detects failure and takes over
Expected Outcome:
- Backup coordinator recovers in-flight transactions from WAL
- Prepared transactions completed
- New transactions routed to backup coordinator
- Failover time: <5s
Validation:
- No transaction loss
- Atomicity maintained across failover
Scenario 4.2: Coordinator Restart with Recovery
Description: Coordinator restarts and recovers prepared transactions.
Setup:
- 3 prepared transactions in coordinator WAL
- Coordinator restarts
Expected Outcome:
- Recovery process reads WAL
- Completes all 3 prepared transactions
- Recovery time: <10s
Validation:
- All 3 transactions committed
- WAL entries complete
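A sketch of the recovery pass this scenario exercises; the WAL-facing calls (prepared_gids_in_wal, resend_commit) are hypothetical stand-ins for whatever xa_coordinator.rs actually exposes.

/// Replay coordinator recovery: each transaction the WAL shows as prepared gets
/// its COMMIT re-driven, as the scenario expects. Recovery must be idempotent,
/// so resending COMMIT to a participant that already committed is a no-op.
/// NOTE: both coordinator methods below are assumed, not the real API.
fn recover_prepared_transactions(coordinator: &XaCoordinator) -> Result<usize> {
    let mut completed = 0;
    for gid in coordinator.prepared_gids_in_wal()? {
        coordinator.resend_commit(&gid)?;
        completed += 1;
    }
    Ok(completed)
}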
Scenario 4.3: Coordinator Out of Memory
Description: Coordinator runs out of memory during transaction.
Setup:
- Simulate OOM error during PREPARE processing
Expected Outcome:
- Coordinator crashes or rejects new transactions
- In-flight transactions may abort
- Recovery after restart
Validation:
- No data corruption
- Clean recovery
Scenario 4.4: Coordinator Thread Exhaustion
Description: Coordinator thread pool exhausted.
Setup:
- 1000 concurrent transactions
- Thread pool size: 100
Expected Outcome:
- Coordinator queues new transactions
- Throughput limited by thread pool
- No failures, just degraded performance
Validation:
- All transactions eventually complete
- No deadlocks
Scenario 4.5: Coordinator Disk Full
Description: Coordinator runs out of disk space for WAL.
Setup:
- Simulate disk full error during WAL write
Expected Outcome:
- Coordinator rejects new transactions
- Returns error to client
- No data loss for completed transactions
Validation:
- No corrupted WAL
- Graceful degradation
Scenario 4.6: Multiple Coordinator Crashes
Description: Coordinator crashes multiple times during recovery.
Setup:
- Coordinator crashes during initial recovery
- Crashes again after partial recovery
Expected Outcome:
- Each restart resumes recovery from WAL
- Eventually completes all prepared transactions
- Recovery idempotent
Validation:
- No duplicate commits
- Atomicity maintained
Scenario 4.7: Byzantine Coordinator
Description: Coordinator sends conflicting COMMIT/ABORT messages.
Setup:
- Coordinator sends COMMIT to participants 0-4
- Sends ABORT to participants 5-9
Expected Outcome:
- Depends on implementation
- Ideally: Participants detect inconsistency, escalate to admin
- Worst case: Split-brain data (manual resolution needed)
Validation:
- Byzantine behavior detected
- Alerting triggered
Scenario 4.8: Coordinator Stuck in Prepare
Description: Coordinator gets stuck waiting for PREPARE votes indefinitely.
Setup:
- All participants vote, but coordinator doesn’t process votes (bug simulation)
Expected Outcome:
- Participants timeout in PREPARED state
- Auto-abort after timeout
- Coordinator needs restart
Validation:
- Participants don’t stay PREPARED forever
- Timeout protection works
3.5 Category 5: Lock Timeouts (6 Scenarios)
Scenario 5.1: Lock Timeout on Single Key
Description: Transaction waits for lock longer than timeout.
Setup:
- Transaction T1 holds lock on key “x”
- Transaction T2 requests lock on “x”
- Lock timeout: 5s
Expected Outcome:
- T2 waits for 5s
- T2 aborts with lock timeout error
- T1 commits successfully
Validation:
- No deadlock
- Timeout enforced correctly
Scenario 5.2: Lock Timeout on Multiple Keys
Description: Transaction times out waiting for multiple locks.
Setup:
- T1 holds locks on keys {a, b, c}
- T2 requests locks on keys {a, b, c, d}
- T2 acquires lock on “d”, waits for {a, b, c}
Expected Outcome:
- T2 times out
- T2 releases lock on “d” and aborts
Validation:
- No partial locks held after timeout
- Clean lock release
Scenario 5.3: Deadlock Detection (2 transactions)
Description: Classic deadlock between two transactions.
Setup:
- T1: locks “a”, waits for “b”
- T2: locks “b”, waits for “a”
Expected Outcome:
- Deadlock detector identifies cycle
- One transaction aborted (youngest)
- Other transaction proceeds
Validation:
- Deadlock resolved within 1s
- No permanent deadlock
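The cycle check at the heart of this scenario can be sketched generically as a wait-for graph walk; this is an illustration only, and it assumes each transaction waits on at most one lock holder.

use std::collections::{HashMap, HashSet};

/// Returns true if the wait-for graph contains a cycle (a deadlock).
/// `wait_for` maps a transaction id to the transaction it is currently blocked on.
fn has_deadlock(wait_for: &HashMap<u64, u64>) -> bool {
    for &start in wait_for.keys() {
        let mut visited = HashSet::new();
        let mut current = start;
        while let Some(&next) = wait_for.get(&current) {
            if !visited.insert(current) {
                return true; // revisited a transaction: T1 -> T2 -> ... -> T1
            }
            current = next;
        }
    }
    false
}

For Scenario 5.3 the graph is {T1 → T2, T2 → T1}, which the walk flags on its second step; the detector would then abort the younger transaction to break the cycle.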
Scenario 5.4: Deadlock Resolution (3+ transactions)
Description: Complex deadlock involving 3 transactions.
Setup:
- T1: locks “a”, waits for “b”
- T2: locks “b”, waits for “c”
- T3: locks “c”, waits for “a”
Expected Outcome:
- Deadlock detector identifies 3-way cycle
- One transaction aborted (breaks cycle)
- Other 2 proceed
Validation:
- Deadlock resolved
- Minimal aborts (only 1 transaction)
Scenario 5.5: Livelock Prevention
Description: Two transactions repeatedly abort and retry, never making progress.
Setup:
- T1 and T2 both need same locks
- Both use immediate retry on abort
Expected Outcome:
- Exponential backoff prevents livelock
- Eventually one transaction succeeds
Validation:
- Progress within reasonable time (<10s)
- No infinite retry loop
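A minimal sketch of the backoff this scenario relies on; try_commit stands in for whatever the client-side retry path actually calls.

use std::thread;
use std::time::Duration;

/// Retry with exponential backoff so two conflicting clients stop colliding on
/// every attempt; returns true once the operation succeeds.
fn commit_with_backoff(mut try_commit: impl FnMut() -> bool, max_attempts: u32) -> bool {
    let mut delay = Duration::from_millis(10);
    for _ in 0..max_attempts {
        if try_commit() {
            return true;
        }
        thread::sleep(delay);
        delay = (delay * 2).min(Duration::from_secs(1)); // cap the backoff
    }
    false
}

In practice, adding random jitter to the delay further reduces the chance that the two retrying transactions collide again on the same schedule.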
Scenario 5.6: Long-Running Transactions
Description: Transaction runs for 60s (longer than typical timeout).
Setup:
- T1 runs for 60s (complex computation)
- T2 waits for locks held by T1
Expected Outcome:
- T1 configures longer timeout (or no timeout)
- T2 eventually proceeds after T1 commits
- No premature abort of T1
Validation:
- Long-running transactions supported
- Configurable timeouts work
3.6 Category 6: Combined Failures (6 Scenarios)
Scenario 6.1: Network Partition + Node Crash
Description: Network partition followed by node crash on minority side.
Setup:
- Network partitions (5 + 5 split)
- 2 nodes crash on minority side
Expected Outcome:
- Transaction aborts (no quorum on either side)
- Crashed nodes recover after partition heals
- No data loss
Validation:
- Complex failure handled correctly
- Recovery sequence correct
Scenario 6.2: Slow Network + Timeout
Description: High latency network triggers timeout before transaction completes.
Setup:
- Network latency: 8s
- Timeout: 5s
Expected Outcome:
- Coordinator times out during PREPARE
- Transaction aborts
- Participants eventually receive ABORT
Validation:
- Timeout correctly aborts transaction
- No orphaned PREPARED participants
Scenario 6.3: Cascading Failures
Description: Failure of one component triggers failures in others.
Setup:
- Participant 0 crashes → coordinator overload → coordinator crash
Expected Outcome:
- System handles cascading failures gracefully
- Recovery doesn’t trigger more failures
- Eventually stable
Validation:
- No failure cascade loop
- System self-stabilizes
Scenario 6.4: Recovery During New Failures
Description: New failures occur during recovery process.
Setup:
- Coordinator recovering prepared transactions
- Participant crashes during recovery
Expected Outcome:
- Recovery process handles new failures
- Retries failed recovery operations
- Eventually all transactions resolved
Validation:
- Recovery robust to new failures
- No permanent stuck state
Scenario 6.5: Multiple Failure Modes Simultaneously
Description: Network partition, slow participants, and crashes all at once.
Setup:
- 10 participants
- 3 partitioned
- 2 slow (1s delay)
- 1 crashed
Expected Outcome:
- Coordinator makes best effort
- Transaction likely aborts (too many failures)
- Clean recovery
Validation:
- No data corruption despite chaos
- System survives extreme conditions
Scenario 6.6: Worst-Case Chaos (Kitchen Sink)
Description: Every possible failure mode injected randomly.
Setup:
- 100 participants
- 100 concurrent transactions
- Random failures: crashes, partitions, delays, byzantine
- Duration: 5 minutes
Expected Outcome:
- Some transactions commit
- Some transactions abort
- Zero data corruption
- System remains stable
Validation:
- THIS IS THE FINAL BOSS TEST
- Atomicity: 100%
- Consistency: 100%
- Isolation: 100%
- Durability: 100%
- Performance degradation: <5x
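Putting the pieces together, the whole catalog (including this chaos scenario) could be driven through the harness roughly as follows; this assumes run_scenario returns the crate's Result<TestResult> (matching its internal use of ?) and that the suite fails fast on any non-passing scenario.

#[tokio::main]
async fn main() -> Result<()> {
    let mut harness = TwoPhaseCommitTestHarness::new(TestHarnessConfig::default())?;
    let generator = ScenarioGenerator::new();

    let mut failed = 0;
    for scenario in generator.all_scenarios() {
        let result = harness.run_scenario(scenario.clone()).await?;
        result.print_summary();
        if !result.passed {
            failed += 1;
        }
    }

    // The Week 19 bar: every scenario passes with zero data corruption.
    assert_eq!(failed, 0, "{failed} scenarios failed");
    Ok(())
}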
4. Test Harness Design
4.1 Test Execution Flow
1. Load Scenario
   - Read scenario definition
   - Validate configuration
   - Initialize test environment

2. Setup Phase
   - Create coordinator (production code)
   - Create virtual participants (N = 10-1000)
   - Configure failure injector
   - Configure network simulator
   - Initialize validator & metrics

3. Execution Phase
   - Start metrics collection
   - Execute transactions (concurrent)
     - BEGIN transaction
     - Coordinator sends PREPARE to participants
     - Participants vote YES/NO
     - Coordinator decides COMMIT/ABORT
     - Participants execute decision
   - Inject failures (according to schedule)
   - Wait for quiescence

4. Validation Phase
   - Check atomicity (all-or-nothing)
   - Check consistency (invariants)
   - Check isolation (no dirty reads)
   - Check durability (WAL recovery)
   - Check data integrity (checksums)

5. Metrics Collection
   - Transaction throughput
   - Latency (p50, p99, p99.9)
   - Failure recovery time
   - Resource utilization
   - Error rates

6. Report Generation
   - Test result (PASS/FAIL)
   - Detailed validation report
   - Performance metrics
   - Failure logs
   - Recommendations

4.2 Metrics Collection
/// Metrics collectorpub struct MetricsCollector { /// Transaction metrics pub txn_count: AtomicU64, pub txn_committed: AtomicU64, pub txn_aborted: AtomicU64,
/// Latency metrics pub prepare_latencies: Mutex<Vec<Duration>>, pub commit_latencies: Mutex<Vec<Duration>>, pub total_latencies: Mutex<Vec<Duration>>,
/// Failure metrics pub failures_detected: AtomicU64, pub recoveries_completed: AtomicU64, pub recovery_times: Mutex<Vec<Duration>>,
/// Resource metrics pub peak_memory_mb: AtomicU64, pub peak_cpu_percent: AtomicU64,}
impl MetricsCollector { pub fn snapshot(&self) -> MetricsSnapshot { let prepare_latencies = self.prepare_latencies.lock().unwrap(); let commit_latencies = self.commit_latencies.lock().unwrap(); let total_latencies = self.total_latencies.lock().unwrap();
MetricsSnapshot { txn_count: self.txn_count.load(Ordering::Relaxed), txn_committed: self.txn_committed.load(Ordering::Relaxed), txn_aborted: self.txn_aborted.load(Ordering::Relaxed),
prepare_p50: Self::percentile(&prepare_latencies, 0.50), prepare_p99: Self::percentile(&prepare_latencies, 0.99), prepare_p999: Self::percentile(&prepare_latencies, 0.999),
commit_p50: Self::percentile(&commit_latencies, 0.50), commit_p99: Self::percentile(&commit_latencies, 0.99), commit_p999: Self::percentile(&commit_latencies, 0.999),
total_p50: Self::percentile(&total_latencies, 0.50), total_p99: Self::percentile(&total_latencies, 0.99), total_p999: Self::percentile(&total_latencies, 0.999),
failures_detected: self.failures_detected.load(Ordering::Relaxed), recoveries_completed: self.recoveries_completed.load(Ordering::Relaxed),
peak_memory_mb: self.peak_memory_mb.load(Ordering::Relaxed), peak_cpu_percent: self.peak_cpu_percent.load(Ordering::Relaxed), } }
fn percentile(values: &[Duration], p: f64) -> Duration { if values.is_empty() { return Duration::from_millis(0); }
let mut sorted = values.to_vec(); sorted.sort();
let index = ((values.len() as f64) * p) as usize; sorted[index.min(values.len() - 1)] }
pub fn meets_sla(&self, sla: &SlaRequirements) -> bool { let snapshot = self.snapshot();
let abort_rate = snapshot.txn_aborted as f64 / snapshot.txn_count as f64;
abort_rate <= sla.max_abort_rate && snapshot.prepare_p99.as_millis() as u64 <= sla.max_prepare_latency_ms && snapshot.commit_p99.as_millis() as u64 <= sla.max_commit_latency_ms }}
pub struct MetricsSnapshot {
    pub txn_count: u64,
    pub txn_committed: u64,
    pub txn_aborted: u64,

    pub prepare_p50: Duration,
    pub prepare_p99: Duration,
    pub prepare_p999: Duration,

    pub commit_p50: Duration,
    pub commit_p99: Duration,
    pub commit_p999: Duration,

    pub total_p50: Duration,
    pub total_p99: Duration,
    pub total_p999: Duration,

    pub failures_detected: u64,
    pub recoveries_completed: u64,

    pub peak_memory_mb: u64,
    pub peak_cpu_percent: u64,
}

impl MetricsSnapshot {
    pub fn print(&self) {
        println!("  Transactions:");
        println!("    Total: {}", self.txn_count);
        println!("    Committed: {}", self.txn_committed);
        println!("    Aborted: {}", self.txn_aborted);
        println!("    Abort Rate: {:.2}%",
            (self.txn_aborted as f64 / self.txn_count as f64) * 100.0);

        println!("  Latency (Prepare):");
        println!("    P50: {:?}", self.prepare_p50);
        println!("    P99: {:?}", self.prepare_p99);
        println!("    P99.9: {:?}", self.prepare_p999);

        println!("  Latency (Commit):");
        println!("    P50: {:?}", self.commit_p50);
        println!("    P99: {:?}", self.commit_p99);
        println!("    P99.9: {:?}", self.commit_p999);

        println!("  Latency (Total):");
        println!("    P50: {:?}", self.total_p50);
        println!("    P99: {:?}", self.total_p99);
        println!("    P99.9: {:?}", self.total_p999);

        println!("  Failures:");
        println!("    Detected: {}", self.failures_detected);
        println!("    Recovered: {}", self.recoveries_completed);

        println!("  Resources:");
        println!("    Peak Memory: {} MB", self.peak_memory_mb);
        println!("    Peak CPU: {}%", self.peak_cpu_percent);
    }
}

5. Validation Framework
5.1 Atomicity Validation
Invariant: For any transaction T, either ALL participants commit T’s writes, or NONE do.
Validation Algorithm:
fn check_atomicity(
    txn_results: &[TransactionResult],
    participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
) -> Result<CheckResult> {
    let mut failures = Vec::new();

    for txn_result in txn_results {
        let gid = &txn_result.gid;

        // Collect commit status from all participants
        let mut commit_count = 0;
        let mut total_participants = 0;

        for (_pid, participant) in participants {
            let p = participant.lock().unwrap();

            // Check if this participant committed this transaction
            if p.has_committed(gid)? {
                commit_count += 1;
            }
            total_participants += 1;
        }

        // Atomicity check: commit_count must be 0 OR total_participants
        if commit_count != 0 && commit_count != total_participants {
            failures.push(format!(
                "Atomicity violation for {}: {} out of {} participants committed",
                gid, commit_count, total_participants
            ));
        }

        // Cross-check with expected outcome
        match txn_result.outcome {
            TransactionOutcome::Committed => {
                if commit_count != total_participants {
                    failures.push(format!(
                        "Transaction {} marked as committed but only {} of {} participants committed",
                        gid, commit_count, total_participants
                    ));
                }
            },
            TransactionOutcome::Aborted => {
                if commit_count != 0 {
                    failures.push(format!(
                        "Transaction {} marked as aborted but {} participants committed",
                        gid, commit_count
                    ));
                }
            },
            _ => {}
        }
    }

    Ok(CheckResult {
        passed: failures.is_empty(),
        failures,
    })
}

5.2 Consistency Validation
Invariant: Application-specific invariants (e.g., total balance unchanged in bank transfers).
Example (Bank Transfer):
fn check_consistency_bank_transfer(
    participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
    initial_total_balance: i64,
) -> Result<CheckResult> {
    let mut failures = Vec::new();

    // Calculate final total balance
    let mut final_total_balance = 0i64;

    for participant in participants.values() {
        let p = participant.lock().unwrap();

        for (key, value) in &p.data {
            if key.starts_with("account_") {
                let balance = value.parse::<i64>()?;
                final_total_balance += balance;
            }
        }
    }

    if initial_total_balance != final_total_balance {
        failures.push(format!(
            "Total balance changed: initial={}, final={}",
            initial_total_balance, final_total_balance
        ));
    }

    Ok(CheckResult {
        passed: failures.is_empty(),
        failures,
    })
}

5.3 Isolation Validation
Invariant: No transaction reads uncommitted data from another transaction.
Validation: Replay the transaction execution log and verify that every read observed only data from committed transactions (no dirty reads).
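No reference implementation is shown for this check yet; the sketch below assumes a hypothetical ReadEvent log captured by the harness during execution. The ReadEvent type and its fields are illustrative and not part of the existing test modules:

/// Hypothetical read-log entry captured by the harness during execution.
pub struct ReadEvent {
    pub reader_gid: Gid,         // transaction that performed the read
    pub key: Key,
    pub writer_gid: Option<Gid>, // transaction whose write was observed (None = initial value)
}

fn check_isolation(
    read_log: &[ReadEvent],
    txn_results: &[TransactionResult],
) -> CheckResult {
    use std::collections::HashSet;

    // Set of transactions that ultimately committed
    let committed: HashSet<&Gid> = txn_results
        .iter()
        .filter(|r| matches!(r.outcome, TransactionOutcome::Committed))
        .map(|r| &r.gid)
        .collect();

    let mut failures = Vec::new();
    for event in read_log {
        // A dirty read observed a write from a transaction that never committed
        // (it aborted or was still in flight when the read happened).
        if let Some(writer) = &event.writer_gid {
            if writer != &event.reader_gid && !committed.contains(writer) {
                failures.push(format!(
                    "Dirty read: {} read key {:?} written by uncommitted transaction {}",
                    event.reader_gid, event.key, writer
                ));
            }
        }
    }

    CheckResult { passed: failures.is_empty(), failures }
}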
5.4 Durability Validation
Invariant: All committed transactions survive crash and recovery.
Validation Algorithm:
fn check_durability(
    participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
) -> Result<CheckResult> {
    let mut failures = Vec::new();

    for (pid, participant) in participants {
        let mut p = participant.lock().unwrap();

        // Snapshot current data
        let data_before = p.data.clone();

        // Simulate crash and recovery
        p.crash_and_recover()?;

        // Check data after recovery
        let data_after = p.data.clone();

        // All committed data must survive
        for (key, value_before) in &data_before {
            match data_after.get(key) {
                Some(value_after) if value_after == value_before => {
                    // OK: Data survived
                },
                Some(value_after) => {
                    failures.push(format!(
                        "Participant {}: Key {} changed during recovery: {:?} -> {:?}",
                        pid, key, value_before, value_after
                    ));
                },
                None => {
                    failures.push(format!(
                        "Participant {}: Key {} lost during recovery",
                        pid, key
                    ));
                },
            }
        }

        // No new data should appear
        for key in data_after.keys() {
            if !data_before.contains_key(key) {
                failures.push(format!(
                    "Participant {}: Key {} appeared after recovery",
                    pid, key
                ));
            }
        }
    }

    Ok(CheckResult {
        passed: failures.is_empty(),
        failures,
    })
}

6. Implementation Roadmap
Weeks 8-11: Development (4 weeks, $200K)
Week 8: Foundation
- Module structure setup
- Virtual participant implementation
- Basic test harness
Week 9: Failure Injection
- Failure injector implementation
- Network simulator
- Scenario loader
Week 10: Validation
- Correctness validator
- Metrics collector
- WAL recovery testing
Week 11: Scenarios
- Implement all 50+ scenarios
- Integration testing
- Documentation
Deliverables (Week 11):
- Complete test framework (4 modules, 3,100 LOC)
- All 50+ scenarios defined
- CI/CD integration
- Developer documentation
Weeks 12-15: Basic Testing (4 weeks, $0 - automated)
Execute scenarios:
- Category 1: Node Crashes (6 scenarios)
- Category 2: Network Partitions (8 scenarios)
- Category 3: Timing Anomalies (10 scenarios)
Activities:
- Run tests on CI/CD
- Collect results
- File bugs for failures
- Iterate on fixes
Weeks 16-18: Advanced Testing (3 weeks, $0 - automated)
Execute scenarios:
- Category 4: Coordinator Failures (8 scenarios)
- Category 5: Lock Timeouts (6 scenarios)
- Category 6: Combined Failures (6 scenarios)
Activities:
- Scale testing (100, 500, 1000 nodes)
- Performance regression testing
- Stress testing
- Chaos testing
Week 19: Final Validation & Certification (1 week, $0 - automated)
Activities:
- Run full test suite (50+ scenarios × 3 repetitions)
- Aggregate results
- Generate final report
- Production certification sign-off
Success Criteria:
- All 50+ scenarios pass
- Zero data corruption
- <5% performance degradation
- Recovery time <10s (all scenarios)
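As a rough illustration of how this sign-off could be automated, the sketch below gates certification on the criteria above. The TestResult fields beyond passed (corruption flag, recovery time, throughput numbers) are assumptions and would need to exist on the real result type:

use std::time::Duration;

/// Illustrative certification gate for Week 19. The extra TestResult fields
/// (data_corruption_detected, max_recovery_time, chaos/baseline throughput)
/// are hypothetical, not the actual harness API.
fn certify_production_ready(results: &[TestResult]) -> bool {
    let all_passed = results.iter().all(|r| r.passed);
    let no_corruption = results.iter().all(|r| !r.data_corruption_detected);
    let recovery_ok = results
        .iter()
        .all(|r| r.max_recovery_time <= Duration::from_secs(10));
    // <5% throughput degradation of chaos runs vs. the no-fault baseline
    let perf_ok = results
        .iter()
        .all(|r| r.chaos_throughput >= 0.95 * r.baseline_throughput);

    all_passed && no_corruption && recovery_ok && perf_ok
}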
7. Module Templates
7.1 Main Test Runner
use std::time::{Duration, Instant};

// Note: Rust module names cannot start with a digit, so the harness module
// is named two_pc_test_harness rather than "2pc_test_harness".
mod two_pc_test_harness;
mod failure_injector;
mod correctness_validator;
mod scenario_generator;
mod virtual_participant;
mod metrics_collector;

use two_pc_test_harness::*;
use scenario_generator::ScenarioGenerator;

#[tokio::main]
async fn main() -> Result<()> {
    println!("=== HeliosDB 2PC Testing Infrastructure ===\n");

    // Initialize
    let config = TestHarnessConfig::default();
    let mut harness = TwoPhaseCommitTestHarness::new(config)?;
    let generator = ScenarioGenerator::new();

    // Get all scenarios
    let scenarios = generator.all_scenarios();

    println!("Loaded {} test scenarios\n", scenarios.len());

    // Run all scenarios
    let start = Instant::now();
    let mut results = Vec::new();

    for scenario in scenarios {
        println!("Running: {}", scenario.name);
        let result = harness.run_scenario(scenario.clone()).await?;
        result.print_summary();
        results.push(result);
    }

    let total_duration = start.elapsed();

    // Print summary
    print_final_summary(&results, total_duration);

    Ok(())
}

fn print_final_summary(results: &[TestResult], total_duration: Duration) {
    println!("\n=== FINAL SUMMARY ===");

    let total_tests = results.len();
    let passed_tests = results.iter().filter(|r| r.passed).count();
    let failed_tests = total_tests - passed_tests;

    println!("Total Tests: {}", total_tests);
    println!("Passed: {} ({}%)", passed_tests, (passed_tests * 100) / total_tests);
    println!("Failed: {} ({}%)", failed_tests, (failed_tests * 100) / total_tests);
    println!("Total Duration: {:?}", total_duration);

    if failed_tests > 0 {
        println!("\nFailed Tests:");
        for result in results.iter().filter(|r| !r.passed) {
            println!("  - {}", result.scenario_name);
        }
    }

    if passed_tests == total_tests {
        println!("\n✓ ALL TESTS PASSED - PRODUCTION READY");
    } else {
        println!("\n❌ SOME TESTS FAILED - NOT PRODUCTION READY");
    }
}

7.2 Virtual Participant (Complete Implementation)
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use heliosdb_storage::xa_participant::PrepareVote;
use rand::Rng;

/// Virtual participant for testing
pub struct VirtualParticipant {
    pub id: ParticipantId,
    pub state: ParticipantState,

    // Simulated storage
    pub data: HashMap<Key, Value>,
    pub wal: Vec<WalEntry>,
    pub prepared_txns: HashMap<Gid, PreparedTransaction>,

    // Behavior configuration
    pub failure_mode: Option<FailureMode>,
    pub network_latency_ms: u64,
    pub crash_probability: f64,

    // Metrics
    pub prepare_count: usize,
    pub commit_count: usize,
    pub abort_count: usize,
}

#[derive(Clone, PartialEq, Debug)]
pub enum ParticipantState {
    Healthy,
    Slow,
    NetworkDown,
    Crashed,
    Byzantine,
}

pub struct PreparedTransaction {
    pub gid: Gid,
    pub writes: Vec<(Key, Value)>,
    pub prepared_at: Instant,
}

#[derive(Clone, Debug)]
pub enum WalEntry {
    Prepare(Gid),
    Commit(Gid),
    Abort(Gid),
}

impl WalEntry {
    pub fn verify_checksum(&self) -> bool {
        // Simplified checksum verification
        true
    }
}

impl VirtualParticipant {
    pub fn new(id: ParticipantId, config: TestHarnessConfig) -> Self {
        Self {
            id,
            state: ParticipantState::Healthy,
            data: HashMap::new(),
            wal: Vec::new(),
            prepared_txns: HashMap::new(),
            failure_mode: None,
            network_latency_ms: config.base_latency_ms,
            crash_probability: 0.0,
            prepare_count: 0,
            commit_count: 0,
            abort_count: 0,
        }
    }

    pub fn configure_failure(&mut self, config: FailureConfig) {
        self.failure_mode = Some(config.mode);
    }

    pub fn prepare(&mut self, gid: &Gid, writes: &[(Key, Value)]) -> Result<PrepareVote> {
        self.inject_failure()?;
        self.simulate_network_delay();

        if self.state == ParticipantState::Crashed {
            return Err(Error::NodeCrashed);
        }

        // Check if we can prepare
        let can_prepare = self.can_prepare(writes)?;

        if can_prepare {
            // Record in WAL
            self.wal.push(WalEntry::Prepare(gid.clone()));

            // Store prepared transaction
            self.prepared_txns.insert(gid.clone(), PreparedTransaction {
                gid: gid.clone(),
                writes: writes.to_vec(),
                prepared_at: Instant::now(),
            });

            self.prepare_count += 1;

            Ok(PrepareVote::Yes)
        } else {
            Ok(PrepareVote::No)
        }
    }

    pub fn commit(&mut self, gid: &Gid) -> Result<()> {
        self.inject_failure()?;
        self.simulate_network_delay();

        if let Some(prepared_txn) = self.prepared_txns.remove(gid) {
            // Apply writes
            for (key, value) in prepared_txn.writes {
                self.data.insert(key, value);
            }

            // Record in WAL
            self.wal.push(WalEntry::Commit(gid.clone()));

            self.commit_count += 1;

            Ok(())
        } else {
            Err(Error::TransactionNotPrepared(gid.clone()))
        }
    }

    pub fn abort(&mut self, gid: &Gid) -> Result<()> {
        self.inject_failure()?;
        self.simulate_network_delay();

        if self.prepared_txns.remove(gid).is_some() {
            // Remove prepared state
            self.wal.push(WalEntry::Abort(gid.clone()));
            self.abort_count += 1;
            Ok(())
        } else {
            // Not prepared = already committed or never prepared
            Ok(())
        }
    }

    pub fn has_committed(&self, gid: &Gid) -> Result<bool> {
        // Check WAL for commit entry
        Ok(self.wal.iter().any(|entry| {
            matches!(entry, WalEntry::Commit(g) if g == gid)
        }))
    }

    pub fn crash_and_recover(&mut self) -> Result<()> {
        // Simulate crash: lose in-memory state
        self.data.clear();
        self.prepared_txns.clear();
        self.state = ParticipantState::Crashed;

        // Recover from WAL
        self.recover_from_wal()?;

        self.state = ParticipantState::Healthy;

        Ok(())
    }

    fn recover_from_wal(&mut self) -> Result<()> {
        // Replay WAL to restore state
        let mut committed_txns = HashMap::new();

        for entry in &self.wal {
            match entry {
                WalEntry::Prepare(_gid) => {
                    // Prepared but not committed = need to query coordinator
                    // For testing, we'll mark as uncertain
                },
                WalEntry::Commit(gid) => {
                    committed_txns.insert(gid.clone(), true);
                },
                WalEntry::Abort(gid) => {
                    committed_txns.insert(gid.clone(), false);
                },
            }
        }

        // Reconstruct data state
        // (Simplified: in reality, the WAL must also carry the write payloads so
        // committed writes can be replayed here; otherwise the durability check
        // in Section 5.4 cannot observe committed data surviving a crash.)

        Ok(())
    }

    fn can_prepare(&self, _writes: &[(Key, Value)]) -> Result<bool> {
        // Check if we can prepare (e.g., no conflicts)
        Ok(true)
    }

    fn inject_failure(&self) -> Result<()> {
        if let Some(ref failure_mode) = self.failure_mode {
            match failure_mode {
                FailureMode::Crash => {
                    return Err(Error::NodeCrashed);
                },
                FailureMode::RandomCrash { probability } => {
                    let mut rng = rand::thread_rng();
                    if rng.gen::<f64>() < *probability {
                        return Err(Error::NodeCrashed);
                    }
                },
                FailureMode::NetworkPartition => {
                    return Err(Error::NetworkTimeout);
                },
                FailureMode::SlowResponse { delay_ms } => {
                    std::thread::sleep(Duration::from_millis(*delay_ms));
                },
                _ => {}
            }
        }
        Ok(())
    }

    fn simulate_network_delay(&self) {
        std::thread::sleep(Duration::from_millis(self.network_latency_ms));
    }
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ParticipantId(String);

impl ParticipantId {
    pub fn new(id: String) -> Self {
        Self(id)
    }
}

impl std::fmt::Display for ParticipantId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Gid(String);

impl Gid {
    pub fn new(id: String) -> Self {
        Self(id)
    }
}

impl std::fmt::Display for Gid {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Key(String);

impl Key {
    pub fn new(key: &str) -> Self {
        Self(key.to_string())
    }

    pub fn starts_with(&self, prefix: &str) -> bool {
        self.0.starts_with(prefix)
    }
}

// Display is needed because the Section 5 validators format keys with "{}".
impl std::fmt::Display for Key {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Value(String);

impl Value {
    pub fn new(value: &str) -> Self {
        Self(value.to_string())
    }

    pub fn parse<T: std::str::FromStr>(&self) -> Result<T>
    where
        T::Err: std::fmt::Display,
    {
        self.0.parse().map_err(|e| Error::ParseError(format!("{}", e)))
    }
}

#[derive(Debug)]
pub enum Error {
    NodeCrashed,
    NetworkTimeout,
    TransactionNotPrepared(Gid),
    ParseError(String),
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::NodeCrashed => write!(f, "Node crashed"),
            Error::NetworkTimeout => write!(f, "Network timeout"),
            Error::TransactionNotPrepared(gid) => write!(f, "Transaction {} not prepared", gid),
            Error::ParseError(msg) => write!(f, "Parse error: {}", msg),
        }
    }
}

impl std::error::Error for Error {}

pub type Result<T> = std::result::Result<T, Error>;

8. Integration Guide
8.1 Adding to CI/CD Pipeline
name: 2PC Testing

on:
  pull_request:
    paths:
      - 'heliosdb-compute/src/xa_coordinator.rs'
      - 'heliosdb-storage/src/xa_participant.rs'
      - 'heliosdb-storage/src/xa_log.rs'
  schedule:
    - cron: '0 2 * * *'  # Daily at 2 AM

jobs:
  # GitHub Actions job IDs must start with a letter or underscore,
  # so the jobs are named basic-2pc-tests (etc.) rather than 2pc-basic-tests.
  basic-2pc-tests:
    runs-on: ubuntu-latest-16-cores
    timeout-minutes: 120

    steps:
      - uses: actions/checkout@v3

      - name: Setup Rust
        uses: actions-rs/toolchain@v1
        with:
          toolchain: stable

      - name: Run Basic 2PC Tests
        run: |
          cd heliosdb-storage/tests/distributed
          cargo test --release -- --test-threads=1

      - name: Upload Test Results
        uses: actions/upload-artifact@v3
        with:
          name: 2pc-test-results
          path: target/test-results/

  chaos-2pc-tests:
    runs-on: ubuntu-latest-16-cores
    timeout-minutes: 240

    steps:
      - uses: actions/checkout@v3

      - name: Run Chaos Tests
        run: |
          cd heliosdb-storage/tests/distributed
          cargo test --release chaos_ -- --test-threads=1

  scale-2pc-tests:
    runs-on: ubuntu-latest-32-cores
    timeout-minutes: 480

    steps:
      - uses: actions/checkout@v3

      - name: Run Scale Tests (1000 nodes)
        run: |
          cd heliosdb-storage/tests/distributed
          cargo test --release scale_1000_ -- --test-threads=1

8.2 Running Tests Locally
# Run all tests
cd heliosdb-storage/tests/distributed
cargo test --release

# Run specific category
cargo test --release node_crashes_

# Run single scenario
cargo test --release scenario_coordinator_crash_after_prepare

# Run with verbose output
RUST_LOG=debug cargo test --release -- --nocapture

# Generate coverage report
cargo tarpaulin --out Html --output-dir target/coverage

8.3 Interpreting Results
Test Output Example:
Running scenario: Coordinator Crash After Prepare
  Transactions: 1
  Duration: 12.3s

  Validation:
    Atomicity: ✓ PASS
    Consistency: ✓ PASS
    Isolation: ✓ PASS
    Durability: ✓ PASS
    Data Integrity: ✓ PASS

  Metrics:
    Prepare Latency (p99): 8.2ms
    Commit Latency (p99): 10.1s (includes recovery)
    Recovery Time: 9.8s

  Status: ✓ PASS

Conclusion
This 2PC Testing Infrastructure provides comprehensive, automated validation of HeliosDB’s distributed transaction capabilities. By Week 19, we will have:
- Validated Correctness: Zero data corruption across all 50+ failure scenarios
- Production Certification: Atomicity, Consistency, Isolation, Durability all verified
- Performance Validation: <5% degradation under chaos
- Automated CI/CD: Continuous testing prevents regressions
- Documentation: Complete runbooks for troubleshooting
This unblocks Production Blocker #4 and enables confident deployment of distributed transactions in production.
Next Steps:
- Week 8: Begin implementation (Engineer 1 + AI Agents)
- Week 12: Start test execution (automated)
- Week 19: Production certification
Document Version: 1.0
Last Updated: November 28, 2025
Maintained By: HeliosDB Testing Team
Related Documents: