
Two-Phase Commit Testing Infrastructure Architecture

Document Version: 1.0
Created: November 28, 2025
Status: READY FOR IMPLEMENTATION
Phase: Phase 1, Weeks 8-19
Team: 2 Engineers + 3 AI Agents
Priority: P0 - PRODUCTION BLOCKER #4


Executive Summary

This document specifies the complete testing infrastructure for validating HeliosDB’s Two-Phase Commit (2PC) implementation at production scale. The design focuses on practical, automatable scenarios that validate correctness, performance, and fault tolerance without requiring massive infrastructure.

Problem Statement

Current State (Week 8):

  • 2PC implementation exists (xa_coordinator.rs, xa_participant.rs, xa_log.rs)
  • Basic tests exist (42 tests in xa_transactions_test.rs)
  • User documentation complete (21_distributed_transactions_2pc.md)
  • NO scale testing (>100 nodes)
  • NO chaos engineering (failure injection)
  • NO distributed correctness validation
  • NO performance regression testing

Gap: Cannot certify production readiness without comprehensive testing infrastructure.

What This Infrastructure Provides

1. Simulation-Based Scale Testing

  • Simulate 1000+ node scenarios on single VM
  • Use virtual participants with configurable behavior
  • Memory-efficient testing (100MB per 1000 nodes)

2. Chaos Engineering Framework

  • 50+ failure scenario definitions
  • Deterministic and random failure injection
  • Network partition/latency simulation
  • Crash/recovery testing

3. Correctness Validation

  • Atomicity verification (all-or-nothing commits)
  • Isolation verification (no dirty reads)
  • Durability verification (WAL-based recovery)
  • Serializability testing (conflict detection)

4. Performance Monitoring

  • Real-time metrics collection
  • Regression detection
  • Bottleneck identification
  • SLA validation

Success Criteria (Week 19)

  • All 50+ failure scenarios pass with zero data corruption
  • 1000-node distributed transactions validated
  • <5% performance degradation under chaos
  • 100% atomicity guarantee across all scenarios
  • Recovery time <10s for all failure modes
  • Automated CI/CD integration
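
As a sketch of how these criteria could be enforced automatically in CI, the gate below aggregates the per-scenario results produced by the harness in Section 2.1. The `certify_release` name and the pass/fail rule are illustrative, not part of the existing codebase.

/// Hypothetical release gate: every catalogued scenario must pass (sketch).
/// `TestResult` is the type defined in Section 2.1.
pub fn certify_release(results: &[TestResult]) -> bool {
    let passed = results.iter().filter(|r| r.passed).count();
    println!("Scenarios passed: {}/{}", passed, results.len());
    // Week 19 bar: the full 50+ scenario catalog runs, and nothing fails.
    results.len() >= 50 && passed == results.len()
}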

Resource Requirements

Development (Weeks 8-11):

  • 2 Engineers: Test framework implementation
  • 3 AI Agents: Scenario generation, validation code
  • Budget: $200K

Execution (Weeks 12-19):

  • Single 16-core VM (64GB RAM)
  • No multi-machine cluster required
  • Total runtime: ~48 hours for full suite

Table of Contents

  1. Architecture Overview
  2. Component Specifications
  3. Failure Scenario Catalog
  4. Test Harness Design
  5. Validation Framework
  6. Implementation Roadmap
  7. Module Templates
  8. Integration Guide

1. Architecture Overview

1.1 System Architecture

┌─────────────────────────────────────────────────────────────────┐
│ 2PC Test Orchestrator │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Test Runner │─▶│ Coordinator │─▶│ Validator │ │
│ │ (Main) │ │ Simulator │ │ (Results) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌───────────────────┐
│ Scenario │ │ Virtual │ │ Correctness │
│ Generator │ │ Participant │ │ Checker │
│ │ │ Pool (1000+) │ │ │
│ • Load from DB │ │ │ │ • WAL validator │
│ • Randomize │ │ • In-memory │ │ • Atomicity check │
│ • Replay │ │ • Configurable │ │ • Isolation check │
└─────────────────┘ └──────────────────┘ └───────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Failure Injector │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Network │ │ Node │ │ Timing │ │
│ │ Faults │ │ Crashes │ │ Delays │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Metrics & Monitoring Layer │
│ • Transaction throughput │
│ • Commit latency (p50, p99, p99.9) │
│ • Failure recovery time │
│ • Data corruption detection │
└─────────────────────────────────────────────────────────────────┘

1.2 Virtual Participant Design

Key Innovation: Simulate 1000+ participants without 1000 processes.

/// Lightweight in-memory participant for scale testing
pub struct VirtualParticipant {
pub id: ParticipantId,
pub state: ParticipantState,
pub config: ParticipantConfig,
// Simulated storage (in-memory)
pub data: HashMap<Key, Value>,
pub wal: Vec<WalEntry>,
pub prepared_txns: HashMap<Gid, PreparedTransaction>,
// Behavior configuration
pub failure_mode: Option<FailureMode>,
pub network_latency_ms: u64,
pub crash_probability: f64,
}
pub enum ParticipantState {
Healthy,
Slow, // Responds with delay
NetworkDown, // Doesn't respond
Crashed, // Permanently unavailable
Byzantine, // Sends incorrect responses
}
impl VirtualParticipant {
/// Process PREPARE request
pub fn prepare(&mut self, gid: &Gid, writes: &[(Key, Value)])
-> Result<PrepareVote>
{
// Simulate failure modes
self.inject_failure()?;
// Simulate network latency
thread::sleep(Duration::from_millis(self.network_latency_ms));
// Execute prepare logic
if self.can_commit(writes) {
self.prepared_txns.insert(gid.clone(), PreparedTransaction {
gid: gid.clone(),
writes: writes.to_vec(),
prepared_at: Instant::now(),
});
// Write to WAL
self.wal.push(WalEntry::Prepare(gid.clone()));
Ok(PrepareVote::Yes)
} else {
Ok(PrepareVote::No)
}
}
/// Process COMMIT request
pub fn commit(&mut self, gid: &Gid) -> Result<()> {
self.inject_failure()?;
if let Some(prepared_txn) = self.prepared_txns.remove(gid) {
// Apply writes
for (key, value) in prepared_txn.writes {
self.data.insert(key, value);
}
// Write to WAL
self.wal.push(WalEntry::Commit(gid.clone()));
Ok(())
} else {
Err(Error::TransactionNotPrepared(gid.clone()))
}
}
/// Simulate crash and recovery
pub fn crash_and_recover(&mut self) -> Result<()> {
// Lose in-memory state
self.data.clear();
self.prepared_txns.clear();
// Recover from WAL
self.recover_from_wal()?;
self.state = ParticipantState::Healthy;
Ok(())
}
/// Inject configured failure
fn inject_failure(&self) -> Result<()> {
if let Some(ref failure_mode) = self.failure_mode {
match failure_mode {
FailureMode::AlwaysCrash => return Err(Error::NodeCrashed),
FailureMode::RandomCrash if rand::random::<f64>() < self.crash_probability => {
return Err(Error::NodeCrashed)
},
FailureMode::NetworkPartition => return Err(Error::NetworkTimeout),
FailureMode::SlowResponse => {
thread::sleep(Duration::from_secs(10));
},
_ => {}
}
}
Ok(())
}
}

1.3 Test Execution Model

Single-VM Testing Approach:

/// Test harness runs all components in single process
pub struct TwoPhaseCommitTestHarness {
/// Real XA coordinator (production code)
coordinator: Arc<XaCoordinator>,
/// Pool of virtual participants (1000+)
participants: HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
/// Failure injection controller
failure_injector: FailureInjector,
/// Correctness validator
validator: CorrectnessValidator,
/// Metrics collector
metrics: MetricsCollector,
}
impl TwoPhaseCommitTestHarness {
/// Execute test scenario
pub async fn run_scenario(&mut self, scenario: TestScenario)
-> Result<TestResult>
{
// 1. Setup: Configure participants and failures
self.setup_scenario(&scenario)?;
// 2. Execute: Run distributed transactions
let txn_results = self.execute_transactions(&scenario.transactions).await?;
// 3. Inject failures at configured times
self.failure_injector.inject_failures(&scenario.failures).await?;
// 4. Wait for recovery/timeout
self.wait_for_quiescence().await?;
// 5. Validate correctness
let validation_result = self.validator.validate(&txn_results)?;
// 6. Collect metrics
let metrics = self.metrics.collect();
// 7. Evaluate pass/fail before moving fields into the result
let passed = validation_result.all_checks_passed() && metrics.meets_sla(&scenario.sla);
Ok(TestResult {
scenario_name: scenario.name,
validation: validation_result,
metrics,
passed,
})
}
}

2. Component Specifications

2.1 Test Harness (2pc_test_harness.rs)

Purpose: Orchestrate test execution, coordinate components
LOC: ~1,200
Owner: Engineer 1

// heliosdb-storage/tests/distributed/2pc_test_harness.rs
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use heliosdb_compute::xa_coordinator::XaCoordinator;
use heliosdb_storage::xa_participant::XaParticipant;
/// Main test harness for 2PC testing
pub struct TwoPhaseCommitTestHarness {
/// Production XA coordinator
coordinator: Arc<XaCoordinator>,
/// Virtual participant pool
participants: HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
/// Failure injection engine
failure_injector: FailureInjector,
/// Correctness validator
validator: CorrectnessValidator,
/// Metrics collector
metrics: Arc<Mutex<MetricsCollector>>,
/// Configuration
config: TestHarnessConfig,
}
#[derive(Clone)]
pub struct TestHarnessConfig {
/// Number of virtual participants
pub num_participants: usize,
/// Base network latency (ms)
pub base_latency_ms: u64,
/// Enable detailed logging
pub verbose: bool,
/// Timeout for coordinator operations (ms)
pub coordinator_timeout_ms: u64,
/// Maximum concurrent transactions
pub max_concurrent_txns: usize,
}
impl Default for TestHarnessConfig {
fn default() -> Self {
Self {
num_participants: 100,
base_latency_ms: 5,
verbose: false,
coordinator_timeout_ms: 5000,
max_concurrent_txns: 1000,
}
}
}
impl TwoPhaseCommitTestHarness {
/// Create new test harness
pub fn new(config: TestHarnessConfig) -> Result<Self> {
// Initialize coordinator
let coordinator_config = XaCoordinatorConfig {
max_prepared_transactions: config.max_concurrent_txns,
timeout_ms: config.coordinator_timeout_ms,
..Default::default()
};
let coordinator = Arc::new(XaCoordinator::new(coordinator_config)?);
// Create virtual participant pool
let mut participants = HashMap::new();
for i in 0..config.num_participants {
let participant_id = ParticipantId::new(format!("participant_{}", i));
let participant = VirtualParticipant::new(participant_id.clone(), config.clone());
participants.insert(participant_id, Arc::new(Mutex::new(participant)));
}
Ok(Self {
coordinator,
participants,
failure_injector: FailureInjector::new(),
validator: CorrectnessValidator::new(),
metrics: Arc::new(Mutex::new(MetricsCollector::new())),
config,
})
}
/// Run single test scenario
pub async fn run_scenario(&mut self, scenario: TestScenario) -> Result<TestResult> {
println!("Running scenario: {}", scenario.name);
let start = Instant::now();
// Setup
self.setup_scenario(&scenario)?;
// Start metrics collection
let metrics_handle = self.metrics.clone();
// Execute transactions
let txn_results = self.execute_transactions(&scenario).await?;
// Wait for quiescence
tokio::time::sleep(Duration::from_millis(100)).await;
// Validate
let validation = self.validator.validate(&txn_results, &self.participants)?;
// Collect metrics
let metrics = metrics_handle.lock().unwrap().snapshot();
let passed = validation.all_checks_passed() && metrics.meets_sla(&scenario.sla);
Ok(TestResult {
scenario_name: scenario.name.clone(),
validation,
metrics,
passed,
duration: start.elapsed(),
})
}
/// Setup scenario configuration
fn setup_scenario(&mut self, scenario: &TestScenario) -> Result<()> {
// Configure failure modes
for (participant_id, failure_config) in &scenario.failure_configs {
if let Some(participant) = self.participants.get(participant_id) {
let mut p = participant.lock().unwrap();
p.configure_failure(failure_config.clone());
}
}
// Configure network conditions
self.failure_injector.configure(&scenario.network_conditions);
Ok(())
}
/// Execute distributed transactions
async fn execute_transactions(&mut self, scenario: &TestScenario)
-> Result<Vec<TransactionResult>>
{
let mut results = Vec::new();
let mut handles = Vec::new();
for txn in &scenario.transactions {
// Clone the transaction so the spawned task owns its data ('static requirement)
let txn = txn.clone();
let coordinator = self.coordinator.clone();
let participants = self.get_participants_for_txn(&txn);
let metrics = self.metrics.clone();
let handle = tokio::spawn(async move {
Self::execute_single_transaction(
coordinator,
participants,
txn,
metrics
).await
});
handles.push(handle);
}
// Await all transactions
for handle in handles {
results.push(handle.await??);
}
Ok(results)
}
/// Execute single distributed transaction
async fn execute_single_transaction(
coordinator: Arc<XaCoordinator>,
participants: Vec<Arc<Mutex<VirtualParticipant>>>,
txn: DistributedTransaction,
metrics: Arc<Mutex<MetricsCollector>>,
) -> Result<TransactionResult> {
let start = Instant::now();
// Begin transaction
let gid = Gid::new(txn.id.clone());
// Phase 1: PREPARE
let prepare_start = Instant::now();
let mut votes = Vec::new();
for participant in &participants {
let mut p = participant.lock().unwrap();
let vote = p.prepare(&gid, &txn.writes)?;
votes.push(vote);
}
let prepare_duration = prepare_start.elapsed();
metrics.lock().unwrap().record_prepare_latency(prepare_duration);
// All must vote YES
let all_yes = votes.iter().all(|v| matches!(v, PrepareVote::Yes));
// Phase 2: COMMIT or ABORT
let commit_start = Instant::now();
let outcome = if all_yes {
for participant in &participants {
participant.lock().unwrap().commit(&gid)?;
}
TransactionOutcome::Committed
} else {
for participant in &participants {
participant.lock().unwrap().abort(&gid)?;
}
TransactionOutcome::Aborted
};
let commit_duration = commit_start.elapsed();
metrics.lock().unwrap().record_commit_latency(commit_duration);
let total_duration = start.elapsed();
Ok(TransactionResult {
gid,
outcome,
prepare_duration,
commit_duration,
total_duration,
})
}
fn get_participants_for_txn(&self, txn: &DistributedTransaction)
-> Vec<Arc<Mutex<VirtualParticipant>>>
{
txn.participant_ids
.iter()
.filter_map(|id| self.participants.get(id).cloned())
.collect()
}
}
/// Test scenario definition
#[derive(Clone)]
pub struct TestScenario {
pub name: String,
pub description: String,
pub category: String,
pub transactions: Vec<DistributedTransaction>,
pub failure_configs: HashMap<ParticipantId, FailureConfig>,
pub network_conditions: NetworkConditions,
pub sla: SlaRequirements,
}
/// Distributed transaction definition
#[derive(Clone)]
pub struct DistributedTransaction {
pub id: String,
pub participant_ids: Vec<ParticipantId>,
pub writes: Vec<(Key, Value)>,
}
/// Transaction execution result
#[derive(Debug)]
pub struct TransactionResult {
pub gid: Gid,
pub outcome: TransactionOutcome,
pub prepare_duration: Duration,
pub commit_duration: Duration,
pub total_duration: Duration,
}
#[derive(Debug, PartialEq)]
pub enum TransactionOutcome {
Committed,
Aborted,
Failed(String),
}
/// Test execution result
pub struct TestResult {
pub scenario_name: String,
pub validation: ValidationResult,
pub metrics: MetricsSnapshot,
pub passed: bool,
pub duration: Duration,
}
impl TestResult {
pub fn print_summary(&self) {
println!("\n=== Test Result: {} ===", self.scenario_name);
println!("Status: {}", if self.passed { "✓ PASS" } else { "✗ FAIL" });
println!("\nValidation:");
self.validation.print();
println!("\nMetrics:");
self.metrics.print();
println!("Duration: {:?}", self.duration);
}
}

2.2 Failure Injector (failure_injector.rs)

Purpose: Inject failures at deterministic or random times
LOC: ~800
Owner: Engineer 1 + AI Agent

// heliosdb-storage/tests/distributed/failure_injector.rs
use std::collections::HashMap;
use std::time::{Duration, Instant};
use std::sync::{Arc, Mutex};
use rand::Rng;
/// Failure injection engine
pub struct FailureInjector {
/// Scheduled failure events
scheduled_failures: Vec<ScheduledFailure>,
/// Network fault simulator
network_simulator: NetworkSimulator,
/// Active failures
active_failures: HashMap<ParticipantId, FailureMode>,
}
impl FailureInjector {
pub fn new() -> Self {
Self {
scheduled_failures: Vec::new(),
network_simulator: NetworkSimulator::new(),
active_failures: HashMap::new(),
}
}
/// Configure network conditions
pub fn configure(&mut self, conditions: &NetworkConditions) {
self.network_simulator.configure(conditions);
}
/// Schedule failure at specific time
pub fn schedule_failure(&mut self, failure: ScheduledFailure) {
self.scheduled_failures.push(failure);
}
/// Inject failures according to schedule
pub async fn inject_failures(&mut self, scenario_start: Instant) -> Result<()> {
// Sort by offset so each sleep covers only the remaining time until that event
let mut failures = self.scheduled_failures.clone();
failures.sort_by_key(|f| f.delay_from_start);
for failure in failures {
let target = scenario_start + failure.delay_from_start;
if let Some(remaining) = target.checked_duration_since(Instant::now()) {
tokio::time::sleep(remaining).await;
}
self.activate_failure(&failure.participant_id, failure.mode.clone())?;
}
Ok(())
}
/// Activate failure mode for participant
fn activate_failure(&mut self, participant_id: &ParticipantId, mode: FailureMode)
-> Result<()>
{
self.active_failures.insert(participant_id.clone(), mode);
Ok(())
}
/// Clear all failures
pub fn clear_failures(&mut self) {
self.active_failures.clear();
self.scheduled_failures.clear();
}
}
/// Scheduled failure event
#[derive(Clone)]
pub struct ScheduledFailure {
pub participant_id: ParticipantId,
pub mode: FailureMode,
pub delay_from_start: Duration,
pub duration: Option<Duration>, // None = permanent
}
/// Failure modes
#[derive(Clone, Debug)]
pub enum FailureMode {
/// Node crashes immediately
Crash,
/// Node crashes with probability on each request
RandomCrash { probability: f64 },
/// Network partition (can't communicate)
NetworkPartition,
/// Slow responses (delay all requests)
SlowResponse { delay_ms: u64 },
/// Intermittent failures (fail X% of requests)
Intermittent { failure_rate: f64 },
/// Byzantine (send incorrect responses)
Byzantine { behavior: ByzantineBehavior },
/// Out of memory
OutOfMemory,
/// Disk full
DiskFull,
}
#[derive(Clone, Debug)]
pub enum ByzantineBehavior {
/// Vote YES in prepare but don't actually prepare
FalseYesVote,
/// Commit without prepare
CommitWithoutPrepare,
/// Send different votes to different coordinators
InconsistentVotes,
}
/// Network conditions simulator
pub struct NetworkSimulator {
/// Base latency (ms)
base_latency_ms: u64,
/// Latency variance (ms)
latency_variance_ms: u64,
/// Packet loss probability
packet_loss_rate: f64,
/// Bandwidth limit (bytes/sec)
bandwidth_limit: Option<u64>,
}
impl NetworkSimulator {
pub fn new() -> Self {
Self {
base_latency_ms: 5,
latency_variance_ms: 2,
packet_loss_rate: 0.0,
bandwidth_limit: None,
}
}
pub fn configure(&mut self, conditions: &NetworkConditions) {
self.base_latency_ms = conditions.base_latency_ms;
self.latency_variance_ms = conditions.latency_variance_ms;
self.packet_loss_rate = conditions.packet_loss_rate;
self.bandwidth_limit = conditions.bandwidth_limit;
}
/// Simulate network delay
pub fn simulate_delay(&self) -> Duration {
let mut rng = rand::thread_rng();
let variance = rng.gen_range(0..=self.latency_variance_ms);
let total_latency = self.base_latency_ms + variance;
Duration::from_millis(total_latency)
}
/// Check if packet should be dropped
pub fn should_drop_packet(&self) -> bool {
let mut rng = rand::thread_rng();
rng.gen::<f64>() < self.packet_loss_rate
}
}
#[derive(Clone)]
pub struct NetworkConditions {
pub base_latency_ms: u64,
pub latency_variance_ms: u64,
pub packet_loss_rate: f64,
pub bandwidth_limit: Option<u64>,
}
impl Default for NetworkConditions {
fn default() -> Self {
Self {
base_latency_ms: 5,
latency_variance_ms: 2,
packet_loss_rate: 0.0,
bandwidth_limit: None,
}
}
}
/// Failure configuration for participant
#[derive(Clone)]
pub struct FailureConfig {
pub mode: FailureMode,
pub trigger_time: FailureTriggerTime,
}
#[derive(Clone)]
pub enum FailureTriggerTime {
/// Fail immediately at test start
Immediate,
/// Fail after delay
Delayed(Duration),
/// Fail at specific transaction number
AtTransaction(usize),
/// Fail randomly during test
Random,
}

2.3 Correctness Validator (correctness_validator.rs)

Purpose: Verify ACID properties and correctness invariants
LOC: ~600
Owner: Engineer 2

// heliosdb-storage/tests/distributed/correctness_validator.rs
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
/// Correctness validation engine
pub struct CorrectnessValidator {
/// Track all transaction outcomes
transaction_log: Vec<TransactionRecord>,
/// Track data state across all participants
global_state_tracker: GlobalStateTracker,
}
impl CorrectnessValidator {
pub fn new() -> Self {
Self {
transaction_log: Vec::new(),
global_state_tracker: GlobalStateTracker::new(),
}
}
/// Validate test execution results
pub fn validate(
&mut self,
txn_results: &[TransactionResult],
participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
) -> Result<ValidationResult> {
let mut result = ValidationResult::default();
// Check 1: Atomicity (all-or-nothing)
result.atomicity = self.check_atomicity(txn_results, participants)?;
// Check 2: Consistency (invariants maintained)
result.consistency = self.check_consistency(participants)?;
// Check 3: Isolation (no dirty reads)
result.isolation = self.check_isolation(txn_results, participants)?;
// Check 4: Durability (WAL recovery)
result.durability = self.check_durability(participants)?;
// Check 5: No data corruption
result.data_integrity = self.check_data_integrity(participants)?;
Ok(result)
}
/// Check atomicity: committed txns visible everywhere, aborted txns nowhere
fn check_atomicity(
&self,
txn_results: &[TransactionResult],
participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
) -> Result<CheckResult> {
let mut failures = Vec::new();
for txn_result in txn_results {
match txn_result.outcome {
TransactionOutcome::Committed => {
// All participants must have committed writes
for (pid, participant) in participants {
let p = participant.lock().unwrap();
// Check if writes are present
let all_writes_present = self.check_writes_present(
&p,
&txn_result.gid
)?;
if !all_writes_present {
failures.push(format!(
"Transaction {} committed but writes missing on {}",
txn_result.gid, pid
));
}
}
},
TransactionOutcome::Aborted => {
// No participant should have committed writes
for (pid, participant) in participants {
let p = participant.lock().unwrap();
let any_writes_present = self.check_any_writes_present(
&p,
&txn_result.gid
)?;
if any_writes_present {
failures.push(format!(
"Transaction {} aborted but writes present on {}",
txn_result.gid, pid
));
}
}
},
TransactionOutcome::Failed(_) => {
// Same as aborted
}
}
}
Ok(CheckResult {
passed: failures.is_empty(),
failures,
})
}
/// Check consistency: application-level invariants
fn check_consistency(
&self,
participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
) -> Result<CheckResult> {
let mut failures = Vec::new();
// Example: Bank transfer invariant (total balance unchanged)
let initial_total = self.calculate_total_balance_initial();
let final_total = self.calculate_total_balance(participants)?;
if initial_total != final_total {
failures.push(format!(
"Balance invariant violated: initial={}, final={}",
initial_total, final_total
));
}
Ok(CheckResult {
passed: failures.is_empty(),
failures,
})
}
/// Check isolation: no dirty reads, no non-repeatable reads
fn check_isolation(
&self,
txn_results: &[TransactionResult],
participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
) -> Result<CheckResult> {
let mut failures = Vec::new();
// Check for dirty reads (reads from uncommitted transactions)
for txn_result in txn_results {
// Implementation depends on read tracking
// For now, check that all reads were from committed transactions
}
Ok(CheckResult {
passed: failures.is_empty(),
failures,
})
}
/// Check durability: WAL can recover all committed transactions
fn check_durability(
&self,
participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
) -> Result<CheckResult> {
let mut failures = Vec::new();
for (pid, participant) in participants {
let mut p = participant.lock().unwrap();
// Simulate crash and recovery
let data_before = p.data.clone();
p.crash_and_recover()?;
let data_after = p.data.clone();
// Verify data is identical after recovery
if data_before != data_after {
failures.push(format!(
"Participant {} lost data during recovery",
pid
));
}
}
Ok(CheckResult {
passed: failures.is_empty(),
failures,
})
}
/// Check data integrity: no corruption, checksums valid
fn check_data_integrity(
&self,
participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
) -> Result<CheckResult> {
let mut failures = Vec::new();
for (pid, participant) in participants {
let p = participant.lock().unwrap();
// Verify WAL checksums
for wal_entry in &p.wal {
if !wal_entry.verify_checksum() {
failures.push(format!(
"WAL corruption detected on participant {}",
pid
));
}
}
// Verify data checksums (if implemented)
// ...
}
Ok(CheckResult {
passed: failures.is_empty(),
failures,
})
}
fn check_writes_present(&self, participant: &VirtualParticipant, gid: &Gid)
-> Result<bool>
{
// Check if transaction's writes are in participant's data
// Implementation specific to test scenario
Ok(true)
}
fn check_any_writes_present(&self, participant: &VirtualParticipant, gid: &Gid)
-> Result<bool>
{
Ok(false)
}
fn calculate_total_balance_initial(&self) -> i64 {
// Return initial total from test setup
0
}
fn calculate_total_balance(
&self,
participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
) -> Result<i64> {
let mut total = 0i64;
for participant in participants.values() {
let p = participant.lock().unwrap();
// Sum balances from participant data
// Implementation specific
}
Ok(total)
}
}
/// Validation result
#[derive(Default)]
pub struct ValidationResult {
pub atomicity: CheckResult,
pub consistency: CheckResult,
pub isolation: CheckResult,
pub durability: CheckResult,
pub data_integrity: CheckResult,
}
impl ValidationResult {
pub fn all_checks_passed(&self) -> bool {
self.atomicity.passed
&& self.consistency.passed
&& self.isolation.passed
&& self.durability.passed
&& self.data_integrity.passed
}
pub fn print(&self) {
println!(" Atomicity: {}", self.atomicity.status());
println!(" Consistency: {}", self.consistency.status());
println!(" Isolation: {}", self.isolation.status());
println!(" Durability: {}", self.durability.status());
println!(" Data Integrity: {}", self.data_integrity.status());
if !self.all_checks_passed() {
println!("\nFailures:");
self.print_failures();
}
}
fn print_failures(&self) {
for (name, check) in [
("Atomicity", &self.atomicity),
("Consistency", &self.consistency),
("Isolation", &self.isolation),
("Durability", &self.durability),
("Data Integrity", &self.data_integrity),
] {
if !check.passed {
for failure in &check.failures {
println!(" [{}] {}", name, failure);
}
}
}
}
}
/// Individual check result
#[derive(Default)]
pub struct CheckResult {
pub passed: bool,
pub failures: Vec<String>,
}
impl CheckResult {
pub fn status(&self) -> &str {
if self.passed { "✓ PASS" } else { "✗ FAIL" }
}
}
/// Global state tracker for consistency checks
struct GlobalStateTracker {
// Track global invariants across all participants
}
impl GlobalStateTracker {
fn new() -> Self {
Self {}
}
}

2.4 Scenario Generator (scenario_generator.rs)

Purpose: Generate and load test scenarios
LOC: ~500
Owner: AI Agent (Coder-1)

// heliosdb-storage/tests/distributed/scenario_generator.rs
use std::collections::HashMap;
use std::time::Duration;
use rand::Rng;
/// Scenario generator
pub struct ScenarioGenerator {
/// Scenario database
scenarios: Vec<TestScenario>,
}
impl ScenarioGenerator {
pub fn new() -> Self {
let mut generator = Self {
scenarios: Vec::new(),
};
// Load all 50+ predefined scenarios
generator.load_predefined_scenarios();
generator
}
/// Load all predefined test scenarios
fn load_predefined_scenarios(&mut self) {
// Category 1: Node Crashes (6 scenarios)
self.scenarios.push(self.scenario_single_participant_crash());
self.scenarios.push(self.scenario_coordinator_crash_before_prepare());
self.scenarios.push(self.scenario_coordinator_crash_after_prepare());
self.scenarios.push(self.scenario_coordinator_crash_during_commit());
self.scenarios.push(self.scenario_multiple_participant_crashes());
self.scenarios.push(self.scenario_cascading_crashes());
// Category 2: Network Partitions (8 scenarios)
self.scenarios.push(self.scenario_network_partition_minority());
self.scenarios.push(self.scenario_network_partition_split_brain());
self.scenarios.push(self.scenario_asymmetric_partition());
self.scenarios.push(self.scenario_partition_during_prepare());
self.scenarios.push(self.scenario_partition_during_commit());
self.scenarios.push(self.scenario_partition_heal_during_recovery());
self.scenarios.push(self.scenario_multiple_partitions());
self.scenarios.push(self.scenario_flapping_network());
// Category 3: Timing Anomalies (10 scenarios)
self.scenarios.push(self.scenario_slow_participant_prepare());
self.scenarios.push(self.scenario_slow_participant_commit());
self.scenarios.push(self.scenario_timeout_during_prepare());
self.scenarios.push(self.scenario_timeout_during_commit());
self.scenarios.push(self.scenario_variable_latency());
self.scenarios.push(self.scenario_packet_loss_5_percent());
self.scenarios.push(self.scenario_packet_loss_20_percent());
self.scenarios.push(self.scenario_high_latency_cross_region());
self.scenarios.push(self.scenario_bandwidth_constraint());
self.scenarios.push(self.scenario_clock_skew());
// Category 4: Coordinator Failures (8 scenarios)
self.scenarios.push(self.scenario_coordinator_failover());
self.scenarios.push(self.scenario_coordinator_restart_with_recovery());
self.scenarios.push(self.scenario_coordinator_out_of_memory());
self.scenarios.push(self.scenario_coordinator_thread_exhaustion());
self.scenarios.push(self.scenario_coordinator_disk_full());
self.scenarios.push(self.scenario_multiple_coordinator_crashes());
self.scenarios.push(self.scenario_coordinator_byzantine());
self.scenarios.push(self.scenario_coordinator_stuck_in_prepare());
// Category 5: Lock Timeouts (6 scenarios)
self.scenarios.push(self.scenario_lock_timeout_single_key());
self.scenarios.push(self.scenario_lock_timeout_multiple_keys());
self.scenarios.push(self.scenario_deadlock_detection());
self.scenarios.push(self.scenario_deadlock_resolution());
self.scenarios.push(self.scenario_livelock_prevention());
self.scenarios.push(self.scenario_long_running_transactions());
// Category 6: Combined Failures (6 scenarios)
self.scenarios.push(self.scenario_network_partition_plus_crash());
self.scenarios.push(self.scenario_slow_network_plus_timeout());
self.scenarios.push(self.scenario_cascading_failures());
self.scenarios.push(self.scenario_recovery_during_new_failures());
self.scenarios.push(self.scenario_multiple_failure_modes());
self.scenarios.push(self.scenario_worst_case_chaos());
// Additional scenarios...
}
/// Get all scenarios
pub fn all_scenarios(&self) -> &[TestScenario] {
&self.scenarios
}
/// Get scenarios by category
pub fn scenarios_by_category(&self, category: &str) -> Vec<&TestScenario> {
self.scenarios
.iter()
.filter(|s| s.category == category)
.collect()
}
/// Generate random scenario
pub fn generate_random_scenario(&self, num_participants: usize, num_txns: usize)
-> TestScenario
{
let mut rng = rand::thread_rng();
// Generate random transactions
let mut transactions = Vec::new();
for i in 0..num_txns {
// Pick a contiguous slice of the pool so `num_participants` is honored
let num_parts = rng.gen_range(2..=num_participants.min(10));
let first = rng.gen_range(0..=num_participants - num_parts);
let participant_ids: Vec<_> = (first..first + num_parts)
.map(|j| ParticipantId::new(format!("participant_{}", j)))
.collect();
transactions.push(DistributedTransaction {
id: format!("txn_{}", i),
participant_ids,
writes: vec![],
});
}
TestScenario {
name: "Random Scenario".to_string(),
description: "Randomly generated test scenario".to_string(),
category: "Random".to_string(),
transactions,
failure_configs: HashMap::new(),
network_conditions: NetworkConditions::default(),
sla: SlaRequirements::default(),
}
}
}
// Example scenario implementations
impl ScenarioGenerator {
/// Scenario: Single participant crashes during prepare
fn scenario_single_participant_crash(&self) -> TestScenario {
TestScenario {
name: "Single Participant Crash During Prepare".to_string(),
description: "One participant crashes after receiving PREPARE but before voting".to_string(),
category: "Node Crashes".to_string(),
transactions: vec![
DistributedTransaction {
id: "txn_1".to_string(),
participant_ids: vec![
ParticipantId::new("participant_0"),
ParticipantId::new("participant_1"), // Will crash
ParticipantId::new("participant_2"),
],
writes: vec![
(Key::new("key1"), Value::new("value1")),
(Key::new("key2"), Value::new("value2")),
],
}
],
failure_configs: {
let mut configs = HashMap::new();
configs.insert(
ParticipantId::new("participant_1"),
FailureConfig {
mode: FailureMode::Crash,
trigger_time: FailureTriggerTime::AtTransaction(0),
}
);
configs
},
network_conditions: NetworkConditions::default(),
sla: SlaRequirements {
max_abort_rate: 1.0, // Expect 100% abort
max_prepare_latency_ms: 5000,
max_commit_latency_ms: 5000,
max_recovery_time_ms: 10000,
},
}
}
/// Scenario: Coordinator crash after PREPARE, before COMMIT
fn scenario_coordinator_crash_after_prepare(&self) -> TestScenario {
TestScenario {
name: "Coordinator Crash After Prepare".to_string(),
description: "Coordinator crashes after sending PREPARE but before COMMIT decision".to_string(),
category: "Coordinator Failures".to_string(),
transactions: vec![
DistributedTransaction {
id: "txn_1".to_string(),
participant_ids: vec![
ParticipantId::new("participant_0"),
ParticipantId::new("participant_1"),
ParticipantId::new("participant_2"),
],
writes: vec![
(Key::new("key1"), Value::new("value1")),
],
}
],
failure_configs: HashMap::new(), // Coordinator crash handled separately
network_conditions: NetworkConditions::default(),
sla: SlaRequirements {
max_abort_rate: 0.0, // Should recover and commit
max_prepare_latency_ms: 5000,
max_commit_latency_ms: 5000,
max_recovery_time_ms: 10000,
},
}
}
/// Scenario: Network partition (split-brain)
fn scenario_network_partition_split_brain(&self) -> TestScenario {
TestScenario {
name: "Network Partition - Split Brain".to_string(),
description: "Cluster splits into two equal partitions".to_string(),
category: "Network Partitions".to_string(),
transactions: vec![
DistributedTransaction {
id: "txn_1".to_string(),
participant_ids: (0..10)
.map(|i| ParticipantId::new(format!("participant_{}", i)))
.collect(),
writes: vec![
(Key::new("key1"), Value::new("value1")),
],
}
],
failure_configs: {
let mut configs = HashMap::new();
// Participants 5-9 partitioned
for i in 5..10 {
configs.insert(
ParticipantId::new(format!("participant_{}", i)),
FailureConfig {
mode: FailureMode::NetworkPartition,
trigger_time: FailureTriggerTime::Delayed(
Duration::from_millis(50)
),
}
);
}
configs
},
network_conditions: NetworkConditions::default(),
sla: SlaRequirements {
max_abort_rate: 1.0, // Transaction should abort
max_prepare_latency_ms: 5000,
max_commit_latency_ms: 5000,
max_recovery_time_ms: 10000,
},
}
}
// ... 47+ more scenario implementations ...
}
/// SLA requirements for test scenario
#[derive(Clone)]
pub struct SlaRequirements {
/// Maximum acceptable abort rate (0.0 - 1.0)
pub max_abort_rate: f64,
/// Maximum prepare latency (ms)
pub max_prepare_latency_ms: u64,
/// Maximum commit latency (ms)
pub max_commit_latency_ms: u64,
/// Maximum recovery time (ms)
pub max_recovery_time_ms: u64,
}
impl Default for SlaRequirements {
fn default() -> Self {
Self {
max_abort_rate: 0.05, // 5% abort rate
max_prepare_latency_ms: 100,
max_commit_latency_ms: 100,
max_recovery_time_ms: 10000,
}
}
}

3. Failure Scenario Catalog

This section defines all 50+ failure scenarios with expected outcomes.

3.1 Category 1: Node Crashes (6 Scenarios)

Scenario 1.1: Single Participant Crash During Prepare

Description: One participant crashes after receiving PREPARE message but before voting.

Setup:

  • 3 participants
  • 1 transaction writing to all 3 participants
  • Participant 1 crashes immediately upon receiving PREPARE

Expected Outcome:

  • Transaction ABORTS (coordinator timeout waiting for vote)
  • Participants 0, 2 abort their prepared state
  • Participant 1 recovers with no committed data
  • Recovery time: <5s

Validation:

  • No data written to any participant
  • WAL shows PREPARE but no COMMIT
  • Atomicity preserved

Scenario 1.2: Coordinator Crash Before Prepare

Description: Coordinator crashes before sending PREPARE to any participant.

Setup:

  • 5 participants
  • 1 transaction
  • Coordinator crashes immediately

Expected Outcome:

  • Transaction ABORTS
  • No participant receives PREPARE
  • No data modified
  • Client receives error

Validation:

  • All participants in IDLE state
  • No WAL entries for this transaction

Scenario 1.3: Coordinator Crash After Prepare, Before Commit

Description: Coordinator crashes after all participants voted YES, before sending COMMIT.

Setup:

  • 3 participants
  • 1 transaction
  • All participants vote YES
  • Coordinator crashes before COMMIT

Expected Outcome:

  • Participants remain in PREPARED state
  • Coordinator recovery detects prepared transaction in WAL
  • Recovery process sends COMMIT to all participants
  • Transaction eventually COMMITS
  • Recovery time: <10s

Validation:

  • All participants have committed data
  • WAL shows PREPARE + COMMIT
  • Atomicity preserved
  • Critical: This tests WAL-based recovery
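
A minimal sketch of how this scenario could be asserted through the harness from Section 2, assuming the scenario name matches the generator entry in Section 2.4 and that `run_scenario` returns `Result<TestResult>`:

#[tokio::test]
async fn coordinator_crash_after_prepare_recovers_to_commit() -> Result<()> {
    // Small cluster: three virtual participants, default network conditions.
    let mut harness = TwoPhaseCommitTestHarness::new(TestHarnessConfig {
        num_participants: 3,
        ..Default::default()
    })?;
    // Pull the predefined scenario from the generator (Section 2.4).
    let scenario = ScenarioGenerator::new()
        .all_scenarios()
        .iter()
        .find(|s| s.name == "Coordinator Crash After Prepare")
        .cloned()
        .expect("scenario is part of the predefined catalog");
    let result = harness.run_scenario(scenario).await?;
    // WAL-based recovery must drive every participant to COMMIT within the SLA.
    assert!(result.validation.atomicity.passed, "writes missing on some participant");
    assert!(result.duration < Duration::from_secs(10), "recovery exceeded 10s budget");
    Ok(())
}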

Scenario 1.4: Coordinator Crash During Commit Phase

Description: Coordinator crashes after sending COMMIT to some (but not all) participants.

Setup:

  • 5 participants
  • 1 transaction
  • Coordinator crashes after sending COMMIT to participants 0, 1

Expected Outcome:

  • Participants 0, 1 commit
  • Participants 2, 3, 4 remain in PREPARED state
  • Coordinator recovery completes commit to remaining participants
  • Recovery time: <10s

Validation:

  • Eventually, ALL participants have committed data
  • No participant left in PREPARED state indefinitely
  • Atomicity preserved once recovery completes

Scenario 1.5: Multiple Participant Crashes

Description: 40% of participants crash during prepare phase.

Setup:

  • 10 participants
  • 4 participants crash during PREPARE

Expected Outcome:

  • Transaction ABORTS (cannot reach quorum)
  • Remaining 6 participants abort
  • Crashed participants recover with no data

Validation:

  • Zero data committed anywhere
  • Recovery of crashed participants clean

Scenario 1.6: Cascading Crashes

Description: Participants crash sequentially during different phases.

Setup:

  • 5 participants
  • Participant 0 crashes during PREPARE
  • Participant 1 crashes during COMMIT

Expected Outcome:

  • Transaction ABORTS (Participant 0 failed to vote)
  • Participant 1 crash has no effect (transaction already aborted)

Validation:

  • No data committed
  • Recovery of both participants clean

3.2 Category 2: Network Partitions (8 Scenarios)

Scenario 2.1: Network Partition - Minority Partition

Description: Network split isolates 20% of participants.

Setup:

  • 10 participants
  • 2 participants (minority) partitioned
  • 1 transaction spanning all 10

Expected Outcome:

  • Majority partition (8 nodes) can reach quorum
  • Transaction ABORTS (cannot commit with 2 participants unreachable)
  • Partition heals → participants sync

Validation:

  • Eventual consistency after partition heal
  • No split-brain data

Scenario 2.2: Split-Brain Network Partition

Description: Network splits cluster exactly in half.

Setup:

  • 10 participants (5 + 5 split)
  • 1 transaction

Expected Outcome:

  • Neither partition has quorum
  • Transaction ABORTS on both sides
  • Partition heals → coordinator resolves to ABORT

Validation:

  • No data committed on either side
  • Atomicity preserved

Scenario 2.3: Asymmetric Partition

Description: Coordinator can reach some participants, but participants can’t reach each other.

Setup:

  • Coordinator → All participants (reachable)
  • Participant 0 ↔ Participant 1 (unreachable)

Expected Outcome:

  • Depends on 2PC implementation
  • If participants need peer communication: ABORT
  • If only coordinator communication: COMMIT (if all vote YES)

Validation:

  • Consistency maintained
  • No conflicting commits

Scenario 2.4: Partition During Prepare Phase

Description: Network partition occurs halfway through PREPARE voting.

Setup:

  • 8 participants
  • 4 vote YES
  • Partition isolates remaining 4 before voting

Expected Outcome:

  • Coordinator cannot collect all votes
  • Transaction ABORTS
  • Timeout triggers abort on all prepared participants

Validation:

  • Participants that voted YES eventually abort
  • No data committed

Scenario 2.5: Partition During Commit Phase

Description: Network partition during COMMIT message propagation.

Setup:

  • All participants vote YES
  • Partition occurs after COMMIT sent to 5/10 participants

Expected Outcome:

  • 5 participants commit
  • 5 participants remain PREPARED
  • Partition heals → recovery completes commit
  • Eventual atomicity

Validation:

  • Eventually all 10 participants have committed data
  • Data consistency across all participants
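
A small sketch of the "eventual atomicity" check this scenario needs: poll the participant pool until every participant has applied the committed writes or a deadline expires. `has_committed` is an assumed helper; the atomicity check in Section 5.1 references the same method.

/// Poll until all participants report the transaction as committed (sketch).
async fn await_eventual_commit(
    participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
    gid: &Gid,
    deadline: Duration,
) -> bool {
    let start = Instant::now();
    while start.elapsed() < deadline {
        let all_committed = participants
            .values()
            .all(|p| p.lock().unwrap().has_committed(gid).unwrap_or(false));
        if all_committed {
            return true;
        }
        // Give the recovery path time to resend COMMIT after the partition heals.
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
    false
}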

Scenario 2.6: Partition Heal During Recovery

Description: Network heals while coordinator is recovering a previous transaction.

Setup:

  • Transaction prepared on all participants
  • Coordinator crashes
  • Network partitions
  • Coordinator recovers
  • Network heals

Expected Outcome:

  • Recovery process completes successfully
  • All participants eventually commit
  • Recovery time: <15s (including partition time)

Validation:

  • Atomicity preserved once recovery completes
  • No duplicate commits

Scenario 2.7: Multiple Overlapping Partitions

Description: Multiple partitions occur simultaneously, affecting different participant subsets.

Setup:

  • 15 participants
  • Partition 1: Isolates participants 0-4
  • Partition 2: Isolates participants 10-14

Expected Outcome:

  • Coordinator can only reach 5 participants (5-9)
  • Transaction ABORTS (cannot reach all participants)

Validation:

  • No data committed
  • All partitions eventually heal
  • Consistent state after heal

Scenario 2.8: Flapping Network

Description: Network connectivity flaps repeatedly (connect/disconnect).

Setup:

  • 5 participants
  • Network flaps every 100ms for 2 seconds

Expected Outcome:

  • Coordinator retries failed connections
  • Either transaction commits during stable window OR aborts after timeout
  • No partial commits

Validation:

  • Atomicity maintained despite flapping
  • Performance degradation acceptable (<10x slowdown)

3.3 Category 3: Timing Anomalies (10 Scenarios)

Scenario 3.1: Slow Participant Prepare (1s delay)

Description: One participant is slow to vote in PREPARE phase.

Setup:

  • 5 participants
  • Participant 2 delays PREPARE vote by 1s

Expected Outcome:

  • If timeout > 1s: Transaction commits successfully
  • If timeout < 1s: Transaction aborts (timeout)

Validation:

  • No partial commits
  • Timeout handling correct

Scenario 3.2: Slow Participant Commit (2s delay)

Description: One participant is slow to execute COMMIT.

Setup:

  • 5 participants
  • Participant 3 delays COMMIT execution by 2s

Expected Outcome:

  • Transaction commits on all participants eventually
  • Coordinator waits for slow participant
  • Total latency: ~2s

Validation:

  • All participants eventually commit
  • No timeout during commit phase (commit must be durable)

Scenario 3.3: Timeout During Prepare

Description: Coordinator timeout expires before all votes collected.

Setup:

  • 10 participants
  • 2 participants never respond to PREPARE
  • Timeout: 5s

Expected Outcome:

  • After 5s, coordinator aborts transaction
  • 8 participants that voted YES abort
  • 2 non-responsive participants eventually timeout locally

Validation:

  • No data committed anywhere
  • All participants eventually idle

Scenario 3.4: Timeout During Commit

Description: Participant doesn’t acknowledge COMMIT within timeout.

Setup:

  • 5 participants
  • Participant 4 doesn’t ACK COMMIT (network issue)

Expected Outcome:

  • Coordinator retries COMMIT indefinitely (commit must succeed)
  • Participant 4 eventually receives COMMIT (retry succeeds)
  • Transaction commits on all participants

Validation:

  • Atomicity maintained
  • No permanent PREPARED state

Scenario 3.5: Variable Latency (50ms - 500ms)

Description: Network latency varies randomly between participants.

Setup:

  • 20 participants
  • Latency range: 50ms - 500ms per participant

Expected Outcome:

  • Transaction commits successfully
  • Total latency: ~500ms (slowest participant)
  • Performance degradation proportional to max latency

Validation:

  • All participants commit
  • Latency distribution tracked

Scenario 3.6: Packet Loss 5%

Description: 5% of network packets dropped randomly.

Setup:

  • 10 participants
  • 5% packet loss rate
  • No network-level retries

Expected Outcome:

  • Coordinator retries failed messages
  • Transaction commits eventually
  • Latency increase: ~20% (due to retries)

Validation:

  • No data loss
  • Retries handled correctly
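
The scenario assumes the coordinator retries dropped messages. A sketch of that behaviour, built on the `NetworkSimulator` from Section 2.2, is shown below; the closure-based `send` parameter and the `max_attempts` bound are illustrative.

/// Retry a message send when the simulated network drops the packet (sketch).
fn send_with_retries<T>(
    net: &NetworkSimulator,
    max_attempts: usize,
    mut send: impl FnMut() -> Result<T>,
) -> Result<T> {
    let mut last_err = Error::NetworkTimeout;
    for _ in 0..max_attempts {
        // A simulated drop counts as a failed attempt; retry immediately.
        if net.should_drop_packet() {
            last_err = Error::NetworkTimeout;
            continue;
        }
        match send() {
            Ok(reply) => return Ok(reply),
            Err(e) => last_err = e,
        }
    }
    Err(last_err)
}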

Scenario 3.7: Packet Loss 20%

Description: 20% packet loss (severe network degradation).

Setup:

  • 10 participants
  • 20% packet loss
  • Timeout: 10s

Expected Outcome:

  • High retry count
  • Transaction commits OR aborts within timeout
  • Performance severely degraded

Validation:

  • Atomicity maintained despite severe packet loss
  • No deadlocks from retries

Scenario 3.8: High Latency Cross-Region (200ms)

Description: Simulate cross-region deployment with high latency.

Setup:

  • 6 participants (3 in region A, 3 in region B)
  • Inter-region latency: 200ms
  • Intra-region latency: 5ms

Expected Outcome:

  • Transaction commits successfully
  • Prepare phase: ~400ms (round-trip)
  • Commit phase: ~400ms (round-trip)
  • Total: ~800ms

Validation:

  • Latency matches expectation
  • No timeouts due to high latency

Scenario 3.9: Bandwidth Constraint (1 Mbps)

Description: Simulate low-bandwidth network (1 Mbps).

Setup:

  • 10 participants
  • Large transaction payload (1 MB per participant)
  • Bandwidth: 1 Mbps

Expected Outcome:

  • Transaction commits
  • Latency dominated by bandwidth (~80s to move 10 MB at 1 Mbps)

Validation:

  • No errors due to bandwidth constraint
  • Throughput matches bandwidth limit

Scenario 3.10: Clock Skew (10s difference)

Description: Participant clocks differ by 10 seconds.

Setup:

  • 5 participants
  • Participant 2 clock is 10s ahead

Expected Outcome:

  • Transaction commits successfully
  • Logical timestamps prevent issues
  • No impact on correctness

Validation:

  • Clock skew doesn’t affect atomicity
  • Timestamps monotonically increase

3.4 Category 4: Coordinator Failures (8 Scenarios)

Scenario 4.1: Coordinator Failover

Description: Primary coordinator fails, backup takes over.

Setup:

  • Primary coordinator crashes
  • Backup coordinator detects failure and takes over

Expected Outcome:

  • Backup coordinator recovers in-flight transactions from WAL
  • Prepared transactions completed
  • New transactions routed to backup coordinator
  • Failover time: <5s

Validation:

  • No transaction loss
  • Atomicity maintained across failover

Scenario 4.2: Coordinator Restart with Recovery

Description: Coordinator restarts and recovers prepared transactions.

Setup:

  • 3 prepared transactions in coordinator WAL
  • Coordinator restarts

Expected Outcome:

  • Recovery process reads WAL
  • Completes all 3 prepared transactions
  • Recovery time: <10s

Validation:

  • All 3 transactions committed
  • WAL entries complete
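
A sketch of the WAL replay this scenario exercises: scan the coordinator log and collect every transaction that was prepared but never resolved, so recovery can drive it to COMMIT. The `WalEntry::Abort` variant is assumed here; Section 1.2 only shows `Prepare` and `Commit`.

use std::collections::HashSet;

/// Return the GIDs that must still be committed after a coordinator restart (sketch).
fn unresolved_prepared(wal: &[WalEntry]) -> Vec<Gid> {
    let mut prepared: HashSet<Gid> = HashSet::new();
    for entry in wal {
        match entry {
            WalEntry::Prepare(gid) => { prepared.insert(gid.clone()); }
            // A later Commit or Abort record resolves the transaction.
            WalEntry::Commit(gid) | WalEntry::Abort(gid) => { prepared.remove(gid); }
        }
    }
    prepared.into_iter().collect()
}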

Scenario 4.3: Coordinator Out of Memory

Description: Coordinator runs out of memory during transaction.

Setup:

  • Simulate OOM error during PREPARE processing

Expected Outcome:

  • Coordinator crashes or rejects new transactions
  • In-flight transactions may abort
  • Recovery after restart

Validation:

  • No data corruption
  • Clean recovery

Scenario 4.4: Coordinator Thread Exhaustion

Description: Coordinator thread pool exhausted.

Setup:

  • 1000 concurrent transactions
  • Thread pool size: 100

Expected Outcome:

  • Coordinator queues new transactions
  • Throughput limited by thread pool
  • No failures, just degraded performance

Validation:

  • All transactions eventually complete
  • No deadlocks

Scenario 4.5: Coordinator Disk Full

Description: Coordinator runs out of disk space for WAL.

Setup:

  • Simulate disk full error during WAL write

Expected Outcome:

  • Coordinator rejects new transactions
  • Returns error to client
  • No data loss for completed transactions

Validation:

  • No corrupted WAL
  • Graceful degradation

Scenario 4.6: Multiple Coordinator Crashes

Description: Coordinator crashes multiple times during recovery.

Setup:

  • Coordinator crashes during initial recovery
  • Crashes again after partial recovery

Expected Outcome:

  • Each restart resumes recovery from WAL
  • Eventually completes all prepared transactions
  • Recovery idempotent

Validation:

  • No duplicate commits
  • Atomicity maintained

Scenario 4.7: Byzantine Coordinator

Description: Coordinator sends conflicting COMMIT/ABORT messages.

Setup:

  • Coordinator sends COMMIT to participants 0-4
  • Sends ABORT to participants 5-9

Expected Outcome:

  • Depends on implementation
  • Ideally: Participants detect inconsistency, escalate to admin
  • Worst case: Split-brain data (manual resolution needed)

Validation:

  • Byzantine behavior detected
  • Alerting triggered

Scenario 4.8: Coordinator Stuck in Prepare

Description: Coordinator gets stuck waiting for PREPARE votes indefinitely.

Setup:

  • All participants vote, but coordinator doesn’t process votes (bug simulation)

Expected Outcome:

  • Participants timeout in PREPARED state
  • Auto-abort after timeout
  • Coordinator needs restart

Validation:

  • Participants don’t stay PREPARED forever
  • Timeout protection works

3.5 Category 5: Lock Timeouts (6 Scenarios)

Scenario 5.1: Lock Timeout on Single Key

Description: Transaction waits for lock longer than timeout.

Setup:

  • Transaction T1 holds lock on key “x”
  • Transaction T2 requests lock on “x”
  • Lock timeout: 5s

Expected Outcome:

  • T2 waits for 5s
  • T2 aborts with lock timeout error
  • T1 commits successfully

Validation:

  • No deadlock
  • Timeout enforced correctly

Scenario 5.2: Lock Timeout on Multiple Keys

Description: Transaction times out waiting for multiple locks.

Setup:

  • T1 holds locks on keys {a, b, c}
  • T2 requests locks on keys {a, b, c, d}
  • T2 acquires lock on “d”, waits for {a, b, c}

Expected Outcome:

  • T2 times out
  • T2 releases lock on “d” and aborts

Validation:

  • No partial locks held after timeout
  • Clean lock release

Scenario 5.3: Deadlock Detection (2 transactions)

Description: Classic deadlock between two transactions.

Setup:

  • T1: locks “a”, waits for “b”
  • T2: locks “b”, waits for “a”

Expected Outcome:

  • Deadlock detector identifies cycle
  • One transaction aborted (youngest)
  • Other transaction proceeds

Validation:

  • Deadlock resolved within 1s
  • No permanent deadlock
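
A self-contained sketch of the cycle check a wait-for-graph deadlock detector performs for this scenario; transaction ids are plain integers here for illustration.

use std::collections::{HashMap, HashSet};

/// Depth-first search over the wait-for graph: an edge T1 -> T2 means "T1 waits for T2".
fn has_deadlock(wait_for: &HashMap<u64, Vec<u64>>) -> bool {
    fn visit(
        node: u64,
        wait_for: &HashMap<u64, Vec<u64>>,
        on_stack: &mut HashSet<u64>,
        done: &mut HashSet<u64>,
    ) -> bool {
        if done.contains(&node) {
            return false;
        }
        if !on_stack.insert(node) {
            return true; // back edge: reached a transaction already on the DFS stack
        }
        for &next in wait_for.get(&node).map(Vec::as_slice).unwrap_or(&[]) {
            if visit(next, wait_for, on_stack, done) {
                return true;
            }
        }
        on_stack.remove(&node);
        done.insert(node);
        false
    }
    let mut on_stack = HashSet::new();
    let mut done = HashSet::new();
    wait_for.keys().any(|&t| visit(t, wait_for, &mut on_stack, &mut done))
}

// Example: T1 waits for T2 and T2 waits for T1 => deadlock detected.
// has_deadlock(&HashMap::from([(1, vec![2]), (2, vec![1])])) == true

Aborting the youngest transaction in the detected cycle, as the scenario expects, can then be layered on top of this check.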

Scenario 5.4: Deadlock Resolution (3+ transactions)

Description: Complex deadlock involving 3 transactions.

Setup:

  • T1: locks “a”, waits for “b”
  • T2: locks “b”, waits for “c”
  • T3: locks “c”, waits for “a”

Expected Outcome:

  • Deadlock detector identifies 3-way cycle
  • One transaction aborted (breaks cycle)
  • Other 2 proceed

Validation:

  • Deadlock resolved
  • Minimal aborts (only 1 transaction)

Scenario 5.5: Livelock Prevention

Description: Two transactions repeatedly abort and retry, never making progress.

Setup:

  • T1 and T2 both need same locks
  • Both use immediate retry on abort

Expected Outcome:

  • Exponential backoff prevents livelock
  • Eventually one transaction succeeds

Validation:

  • Progress within reasonable time (<10s)
  • No infinite retry loop
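
A sketch of the exponential backoff with jitter that the scenario relies on; `run_txn` stands in for one attempt at the conflicting distributed transaction.

use rand::Rng;

/// Retry with exponential backoff plus jitter so conflicting retries de-synchronize (sketch).
async fn retry_with_backoff<F, Fut>(mut run_txn: F, max_attempts: u32) -> Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<()>>,
{
    let mut backoff = Duration::from_millis(10);
    let mut attempt = 0;
    loop {
        match run_txn().await {
            Ok(()) => return Ok(()),
            Err(e) => {
                attempt += 1;
                if attempt >= max_attempts {
                    return Err(e);
                }
                // Random jitter prevents two aborted transactions from retrying in lockstep.
                let jitter = rand::thread_rng().gen_range(0..=backoff.as_millis() as u64);
                tokio::time::sleep(backoff + Duration::from_millis(jitter)).await;
                backoff = (backoff * 2).min(Duration::from_secs(1));
            }
        }
    }
}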

Scenario 5.6: Long-Running Transactions

Description: Transaction runs for 60s (longer than typical timeout).

Setup:

  • T1 runs for 60s (complex computation)
  • T2 waits for locks held by T1

Expected Outcome:

  • T1 configures longer timeout (or no timeout)
  • T2 eventually proceeds after T1 commits
  • No premature abort of T1

Validation:

  • Long-running transactions supported
  • Configurable timeouts work

3.6 Category 6: Combined Failures (6 Scenarios)

Scenario 6.1: Network Partition + Node Crash

Description: Network partition followed by node crash on minority side.

Setup:

  • Network partitions (5 + 5 split)
  • 2 nodes crash on minority side

Expected Outcome:

  • Transaction aborts (no quorum on either side)
  • Crashed nodes recover after partition heals
  • No data loss

Validation:

  • Complex failure handled correctly
  • Recovery sequence correct

Scenario 6.2: Slow Network + Timeout

Description: High latency network triggers timeout before transaction completes.

Setup:

  • Network latency: 8s
  • Timeout: 5s

Expected Outcome:

  • Coordinator times out during PREPARE
  • Transaction aborts
  • Participants eventually receive ABORT

Validation:

  • Timeout correctly aborts transaction
  • No orphaned PREPARED participants

Scenario 6.3: Cascading Failures

Description: Failure of one component triggers failures in others.

Setup:

  • Participant 0 crashes → coordinator overload → coordinator crash

Expected Outcome:

  • System handles cascading failures gracefully
  • Recovery doesn’t trigger more failures
  • Eventually stable

Validation:

  • No failure cascade loop
  • System self-stabilizes

Scenario 6.4: Recovery During New Failures

Description: New failures occur during recovery process.

Setup:

  • Coordinator recovering prepared transactions
  • Participant crashes during recovery

Expected Outcome:

  • Recovery process handles new failures
  • Retries failed recovery operations
  • Eventually all transactions resolved

Validation:

  • Recovery robust to new failures
  • No permanent stuck state

Scenario 6.5: Multiple Failure Modes Simultaneously

Description: Network partition, slow participants, and crashes all at once.

Setup:

  • 10 participants
  • 3 partitioned
  • 2 slow (1s delay)
  • 1 crashed

Expected Outcome:

  • Coordinator makes best effort
  • Transaction likely aborts (too many failures)
  • Clean recovery

Validation:

  • No data corruption despite chaos
  • System survives extreme conditions

Scenario 6.6: Worst-Case Chaos (Kitchen Sink)

Description: Every possible failure mode injected randomly.

Setup:

  • 100 participants
  • 100 concurrent transactions
  • Random failures: crashes, partitions, delays, byzantine
  • Duration: 5 minutes

Expected Outcome:

  • Some transactions commit
  • Some transactions abort
  • Zero data corruption
  • System remains stable

Validation:

  • THIS IS THE FINAL BOSS TEST
  • Atomicity: 100%
  • Consistency: 100%
  • Isolation: 100%
  • Durability: 100%
  • Performance degradation: <5x
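
A sketch of how the random failure mix for this scenario could be produced with the `FailureMode` and `FailureConfig` types from Section 2.2; the 70/30 healthy split and the per-mode probabilities are illustrative.

use std::collections::HashMap;
use rand::Rng;

/// Randomly assign failure modes to roughly 30% of the participant pool (sketch).
fn random_failure_configs(num_participants: usize) -> HashMap<ParticipantId, FailureConfig> {
    let mut rng = rand::thread_rng();
    let mut configs = HashMap::new();
    for i in 0..num_participants {
        // Keep roughly 70% of participants healthy.
        if rng.gen::<f64>() < 0.7 {
            continue;
        }
        let mode = match rng.gen_range(0..4) {
            0 => FailureMode::Crash,
            1 => FailureMode::NetworkPartition,
            2 => FailureMode::SlowResponse { delay_ms: rng.gen_range(100..=2_000) },
            _ => FailureMode::Intermittent { failure_rate: 0.2 },
        };
        configs.insert(
            ParticipantId::new(format!("participant_{}", i)),
            FailureConfig { mode, trigger_time: FailureTriggerTime::Random },
        );
    }
    configs
}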

4. Test Harness Design

4.1 Test Execution Flow

┌─────────────────────────────────────────────────────────────────┐
│ Test Execution Flow │
└─────────────────────────────────────────────────────────────────┘
1. Load Scenario
├─ Read scenario definition
├─ Validate configuration
└─ Initialize test environment
2. Setup Phase
├─ Create coordinator (production code)
├─ Create virtual participants (N = 10-1000)
├─ Configure failure injector
├─ Configure network simulator
└─ Initialize validator & metrics
3. Execution Phase
├─ Start metrics collection
├─ Execute transactions (concurrent)
│ ├─ BEGIN transaction
│ ├─ Coordinator sends PREPARE to participants
│ ├─ Participants vote YES/NO
│ ├─ Coordinator decides COMMIT/ABORT
│ └─ Participants execute decision
├─ Inject failures (according to schedule)
└─ Wait for quiescence
4. Validation Phase
├─ Check atomicity (all-or-nothing)
├─ Check consistency (invariants)
├─ Check isolation (no dirty reads)
├─ Check durability (WAL recovery)
└─ Check data integrity (checksums)
5. Metrics Collection
├─ Transaction throughput
├─ Latency (p50, p99, p99.9)
├─ Failure recovery time
├─ Resource utilization
└─ Error rates
6. Report Generation
├─ Test result (PASS/FAIL)
├─ Detailed validation report
├─ Performance metrics
├─ Failure logs
└─ Recommendations
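
Tying the flow above together, a sketch of the top-level runner: iterate every catalogued scenario through the harness, print each summary, and fail the process if anything regresses. It assumes the `ScenarioGenerator`, `TwoPhaseCommitTestHarness`, and `TestResult` types from Section 2.

#[tokio::main]
async fn main() -> Result<()> {
    let generator = ScenarioGenerator::new();
    let mut failed = Vec::new();
    for scenario in generator.all_scenarios().iter().cloned() {
        // Fresh harness per scenario so failure configs never leak between runs.
        let mut harness = TwoPhaseCommitTestHarness::new(TestHarnessConfig::default())?;
        let result = harness.run_scenario(scenario).await?;
        result.print_summary();
        if !result.passed {
            failed.push(result.scenario_name.clone());
        }
    }
    if !failed.is_empty() {
        println!("\nFailed scenarios: {:?}", failed);
        std::process::exit(1);
    }
    println!("\nAll {} scenarios passed.", generator.all_scenarios().len());
    Ok(())
}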

4.2 Metrics Collection

use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
/// Metrics collector
pub struct MetricsCollector {
/// Transaction metrics
pub txn_count: AtomicU64,
pub txn_committed: AtomicU64,
pub txn_aborted: AtomicU64,
/// Latency metrics
pub prepare_latencies: Mutex<Vec<Duration>>,
pub commit_latencies: Mutex<Vec<Duration>>,
pub total_latencies: Mutex<Vec<Duration>>,
/// Failure metrics
pub failures_detected: AtomicU64,
pub recoveries_completed: AtomicU64,
pub recovery_times: Mutex<Vec<Duration>>,
/// Resource metrics
pub peak_memory_mb: AtomicU64,
pub peak_cpu_percent: AtomicU64,
}
impl MetricsCollector {
pub fn snapshot(&self) -> MetricsSnapshot {
let prepare_latencies = self.prepare_latencies.lock().unwrap();
let commit_latencies = self.commit_latencies.lock().unwrap();
let total_latencies = self.total_latencies.lock().unwrap();
MetricsSnapshot {
txn_count: self.txn_count.load(Ordering::Relaxed),
txn_committed: self.txn_committed.load(Ordering::Relaxed),
txn_aborted: self.txn_aborted.load(Ordering::Relaxed),
prepare_p50: Self::percentile(&prepare_latencies, 0.50),
prepare_p99: Self::percentile(&prepare_latencies, 0.99),
prepare_p999: Self::percentile(&prepare_latencies, 0.999),
commit_p50: Self::percentile(&commit_latencies, 0.50),
commit_p99: Self::percentile(&commit_latencies, 0.99),
commit_p999: Self::percentile(&commit_latencies, 0.999),
total_p50: Self::percentile(&total_latencies, 0.50),
total_p99: Self::percentile(&total_latencies, 0.99),
total_p999: Self::percentile(&total_latencies, 0.999),
failures_detected: self.failures_detected.load(Ordering::Relaxed),
recoveries_completed: self.recoveries_completed.load(Ordering::Relaxed),
peak_memory_mb: self.peak_memory_mb.load(Ordering::Relaxed),
peak_cpu_percent: self.peak_cpu_percent.load(Ordering::Relaxed),
}
}
fn percentile(values: &[Duration], p: f64) -> Duration {
if values.is_empty() {
return Duration::from_millis(0);
}
let mut sorted = values.to_vec();
sorted.sort();
let index = ((values.len() as f64) * p) as usize;
sorted[index.min(values.len() - 1)]
}
pub fn meets_sla(&self, sla: &SlaRequirements) -> bool {
let snapshot = self.snapshot();
let abort_rate = snapshot.txn_aborted as f64 / snapshot.txn_count as f64;
abort_rate <= sla.max_abort_rate
&& snapshot.prepare_p99.as_millis() as u64 <= sla.max_prepare_latency_ms
&& snapshot.commit_p99.as_millis() as u64 <= sla.max_commit_latency_ms
}
}
pub struct MetricsSnapshot {
pub txn_count: u64,
pub txn_committed: u64,
pub txn_aborted: u64,
pub prepare_p50: Duration,
pub prepare_p99: Duration,
pub prepare_p999: Duration,
pub commit_p50: Duration,
pub commit_p99: Duration,
pub commit_p999: Duration,
pub total_p50: Duration,
pub total_p99: Duration,
pub total_p999: Duration,
pub failures_detected: u64,
pub recoveries_completed: u64,
pub peak_memory_mb: u64,
pub peak_cpu_percent: u64,
}
impl MetricsSnapshot {
pub fn print(&self) {
println!(" Transactions:");
println!(" Total: {}", self.txn_count);
println!(" Committed: {}", self.txn_committed);
println!(" Aborted: {}", self.txn_aborted);
println!(" Abort Rate: {:.2}%",
(self.txn_aborted as f64 / self.txn_count as f64) * 100.0);
println!(" Latency (Prepare):");
println!(" P50: {:?}", self.prepare_p50);
println!(" P99: {:?}", self.prepare_p99);
println!(" P99.9: {:?}", self.prepare_p999);
println!(" Latency (Commit):");
println!(" P50: {:?}", self.commit_p50);
println!(" P99: {:?}", self.commit_p99);
println!(" P99.9: {:?}", self.commit_p999);
println!(" Latency (Total):");
println!(" P50: {:?}", self.total_p50);
println!(" P99: {:?}", self.total_p99);
println!(" P99.9: {:?}", self.total_p999);
println!(" Failures:");
println!(" Detected: {}", self.failures_detected);
println!(" Recovered: {}", self.recoveries_completed);
println!(" Resources:");
println!(" Peak Memory: {} MB", self.peak_memory_mb);
println!(" Peak CPU: {}%", self.peak_cpu_percent);
}
}
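
`SlaRequirements`, referenced by `meets_sla` above, is not defined in this section; a minimal definition consistent with the fields it reads would be the following (the threshold values are illustrative only):

/// SLA thresholds checked by MetricsCollector::meets_sla.
/// Field names mirror the accesses in meets_sla; defaults are illustrative.
pub struct SlaRequirements {
    /// Maximum acceptable fraction of aborted transactions (0.0 - 1.0)
    pub max_abort_rate: f64,
    /// Maximum acceptable p99 prepare latency, in milliseconds
    pub max_prepare_latency_ms: u64,
    /// Maximum acceptable p99 commit latency, in milliseconds
    pub max_commit_latency_ms: u64,
}

impl Default for SlaRequirements {
    fn default() -> Self {
        Self {
            max_abort_rate: 0.05,        // illustrative: 5% abort budget
            max_prepare_latency_ms: 50,  // illustrative threshold
            max_commit_latency_ms: 100,  // illustrative threshold
        }
    }
}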

5. Validation Framework

5.1 Atomicity Validation

Invariant: For any transaction T, either ALL participants commit T’s writes, or NONE do.

Validation Algorithm:

fn check_atomicity(
    txn_results: &[TransactionResult],
    participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
) -> Result<CheckResult> {
    let mut failures = Vec::new();
    for txn_result in txn_results {
        let gid = &txn_result.gid;
        // Collect commit status from all participants
        let mut commit_count = 0;
        let mut total_participants = 0;
        for (_pid, participant) in participants {
            let p = participant.lock().unwrap();
            // Check if this participant committed this transaction
            if p.has_committed(gid)? {
                commit_count += 1;
            }
            total_participants += 1;
        }
        // Atomicity check: commit_count must be 0 OR total_participants
        if commit_count != 0 && commit_count != total_participants {
            failures.push(format!(
                "Atomicity violation for {}: {} out of {} participants committed",
                gid, commit_count, total_participants
            ));
        }
        // Cross-check with expected outcome
        match txn_result.outcome {
            TransactionOutcome::Committed => {
                if commit_count != total_participants {
                    failures.push(format!(
                        "Transaction {} marked as committed but only {} of {} participants committed",
                        gid, commit_count, total_participants
                    ));
                }
            },
            TransactionOutcome::Aborted => {
                if commit_count != 0 {
                    failures.push(format!(
                        "Transaction {} marked as aborted but {} participants committed",
                        gid, commit_count
                    ));
                }
            },
            _ => {}
        }
    }
    Ok(CheckResult {
        passed: failures.is_empty(),
        failures,
    })
}

5.2 Consistency Validation

Invariant: Application-specific invariants hold (e.g., the total balance across all accounts is unchanged by a set of bank transfers).

Example (Bank Transfer):

fn check_consistency_bank_transfer(
    participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
    initial_total_balance: i64,
) -> Result<CheckResult> {
    let mut failures = Vec::new();
    // Calculate final total balance
    let mut final_total_balance = 0i64;
    for participant in participants.values() {
        let p = participant.lock().unwrap();
        for (key, value) in &p.data {
            if key.starts_with("account_") {
                let balance = value.parse::<i64>()?;
                final_total_balance += balance;
            }
        }
    }
    if initial_total_balance != final_total_balance {
        failures.push(format!(
            "Total balance changed: initial={}, final={}",
            initial_total_balance, final_total_balance
        ));
    }
    Ok(CheckResult {
        passed: failures.is_empty(),
        failures,
    })
}

5.3 Isolation Validation

Invariant: No transaction reads uncommitted data from another transaction.

Validation: Replay the transaction execution log and verify that every value read was written by a transaction that had already committed at the time of the read (see the sketch below).
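
A minimal sketch of that check, assuming the harness records each read as a read-log event carrying the writer's identity and commit status; the `ReadEvent` type below is an assumption, not an existing harness type, and transaction ids are plain strings for brevity.

use std::collections::HashSet;

// Hypothetical read-log entry recorded by the harness during execution.
pub struct ReadEvent {
    pub reader_gid: String,     // transaction that performed the read
    pub writer_gid: String,     // transaction that produced the value read
    pub writer_committed: bool, // was the writer committed at read time?
}

fn check_isolation(read_log: &[ReadEvent], committed: &HashSet<String>) -> Vec<String> {
    let mut failures = Vec::new();
    for event in read_log {
        // A dirty read: the writer was not committed at read time,
        // or never committed at all.
        if !event.writer_committed || !committed.contains(&event.writer_gid) {
            failures.push(format!(
                "Dirty read: {} read a value written by uncommitted {}",
                event.reader_gid, event.writer_gid
            ));
        }
    }
    failures
}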

5.4 Durability Validation

Invariant: All committed transactions survive crash and recovery.

Validation Algorithm:

fn check_durability(
    participants: &HashMap<ParticipantId, Arc<Mutex<VirtualParticipant>>>,
) -> Result<CheckResult> {
    let mut failures = Vec::new();
    for (pid, participant) in participants {
        let mut p = participant.lock().unwrap();
        // Snapshot current data
        let data_before = p.data.clone();
        // Simulate crash and recovery
        p.crash_and_recover()?;
        // Check data after recovery
        let data_after = p.data.clone();
        // All committed data must survive
        for (key, value_before) in &data_before {
            match data_after.get(key) {
                Some(value_after) if value_after == value_before => {
                    // OK: Data survived
                },
                Some(value_after) => {
                    failures.push(format!(
                        "Participant {}: Key {:?} changed during recovery: {:?} -> {:?}",
                        pid, key, value_before, value_after
                    ));
                },
                None => {
                    failures.push(format!(
                        "Participant {}: Key {:?} lost during recovery",
                        pid, key
                    ));
                },
            }
        }
        // No new data should appear
        for key in data_after.keys() {
            if !data_before.contains_key(key) {
                failures.push(format!(
                    "Participant {}: Key {:?} appeared after recovery",
                    pid, key
                ));
            }
        }
    }
    Ok(CheckResult {
        passed: failures.is_empty(),
        failures,
    })
}

6. Implementation Roadmap

Week 8-11: Development (4 weeks, $200K)

Week 8: Foundation

  • Module structure setup
  • Virtual participant implementation
  • Basic test harness

Week 9: Failure Injection

  • Failure injector implementation
  • Network simulator
  • Scenario loader

Week 10: Validation

  • Correctness validator
  • Metrics collector
  • WAL recovery testing

Week 11: Scenarios

  • Implement all 50+ scenarios
  • Integration testing
  • Documentation

Deliverables (Week 11):

  • Complete test framework (4 modules, 3,100 LOC)
  • All 50+ scenarios defined
  • CI/CD integration
  • Developer documentation

Week 12-15: Basic Testing (4 weeks, $0 - automated)

Execute scenarios:

  • Category 1: Node Crashes (6 scenarios)
  • Category 2: Network Partitions (8 scenarios)
  • Category 3: Timing Anomalies (10 scenarios)

Activities:

  • Run tests on CI/CD
  • Collect results
  • File bugs for failures
  • Iterate on fixes

Week 16-18: Advanced Testing (3 weeks, $0 - automated)

Execute scenarios:

  • Category 4: Coordinator Failures (8 scenarios)
  • Category 5: Lock Timeouts (6 scenarios)
  • Category 6: Combined Failures (6 scenarios)

Activities:

  • Scale testing (100, 500, 1000 nodes)
  • Performance regression testing
  • Stress testing
  • Chaos testing

Week 19: Final Validation & Certification (1 week, $0 - automated)

Activities:

  • Run full test suite (50+ scenarios × 3 repetitions)
  • Aggregate results
  • Generate final report
  • Production certification sign-off

Success Criteria:

  • All 50+ scenarios pass
  • Zero data corruption
  • <5% performance degradation
  • Recovery time <10s (all scenarios)

7. Module Templates

7.1 Main Test Runner

heliosdb-storage/tests/distributed/main.rs
use std::time::{Duration, Instant};

// Rust module names cannot start with a digit, so mount the documented
// file 2pc_test_harness.rs under a valid module name via #[path].
#[path = "2pc_test_harness.rs"]
mod two_pc_test_harness;
mod failure_injector;
mod correctness_validator;
mod scenario_generator;
mod virtual_participant;
mod metrics_collector;

use two_pc_test_harness::*;
use scenario_generator::ScenarioGenerator;

#[tokio::main]
async fn main() -> Result<()> {
    println!("=== HeliosDB 2PC Testing Infrastructure ===\n");

    // Initialize
    let config = TestHarnessConfig::default();
    let mut harness = TwoPhaseCommitTestHarness::new(config)?;
    let generator = ScenarioGenerator::new();

    // Get all scenarios
    let scenarios = generator.all_scenarios();
    println!("Loaded {} test scenarios\n", scenarios.len());

    // Run all scenarios
    let start = Instant::now();
    let mut results = Vec::new();
    for scenario in scenarios {
        println!("Running: {}", scenario.name);
        let result = harness.run_scenario(scenario.clone()).await?;
        result.print_summary();
        results.push(result);
    }
    let total_duration = start.elapsed();

    // Print summary
    print_final_summary(&results, total_duration);
    Ok(())
}

fn print_final_summary(results: &[TestResult], total_duration: Duration) {
    println!("\n=== FINAL SUMMARY ===");
    let total_tests = results.len();
    let passed_tests = results.iter().filter(|r| r.passed).count();
    let failed_tests = total_tests - passed_tests;
    println!("Total Tests: {}", total_tests);
    println!("Passed: {} ({}%)", passed_tests, (passed_tests * 100) / total_tests);
    println!("Failed: {} ({}%)", failed_tests, (failed_tests * 100) / total_tests);
    println!("Total Duration: {:?}", total_duration);
    if failed_tests > 0 {
        println!("\nFailed Tests:");
        for result in results.iter().filter(|r| !r.passed) {
            println!("  - {}", result.scenario_name);
        }
    }
    if passed_tests == total_tests {
        println!("\n✅ ALL TESTS PASSED - PRODUCTION READY");
    } else {
        println!("\n❌ SOME TESTS FAILED - NOT PRODUCTION READY");
    }
}
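
The runner above only relies on `TestResult` exposing `passed`, `scenario_name`, and `print_summary()`. The authoritative definition lives in the harness module; a minimal shape consistent with this usage (the extra fields here are assumptions) would be:

use std::time::Duration;

// Minimal shape inferred from how main.rs uses TestResult; the real
// definition in 2pc_test_harness.rs may carry additional fields.
pub struct TestResult {
    pub scenario_name: String,
    pub passed: bool,
    pub duration: Duration,
    pub failures: Vec<String>,
}

impl TestResult {
    pub fn print_summary(&self) {
        let status = if self.passed { "PASS" } else { "FAIL" };
        println!("  {} [{}] ({:?})", self.scenario_name, status, self.duration);
        for failure in &self.failures {
            println!("    - {}", failure);
        }
    }
}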

7.2 Virtual Participant (Complete Implementation)

heliosdb-storage/tests/distributed/virtual_participant.rs
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use heliosdb_storage::xa_participant::PrepareVote;
use rand::Rng;

// TestHarnessConfig, FailureConfig, and FailureMode are defined in the
// sibling harness and failure_injector modules.

/// Virtual participant for testing
pub struct VirtualParticipant {
    pub id: ParticipantId,
    pub state: ParticipantState,
    // Simulated storage
    pub data: HashMap<Key, Value>,
    pub wal: Vec<WalEntry>,
    pub prepared_txns: HashMap<Gid, PreparedTransaction>,
    // Behavior configuration
    pub failure_mode: Option<FailureMode>,
    pub network_latency_ms: u64,
    pub crash_probability: f64,
    // Metrics
    pub prepare_count: usize,
    pub commit_count: usize,
    pub abort_count: usize,
}

#[derive(Clone, PartialEq, Debug)]
pub enum ParticipantState {
    Healthy,
    Slow,
    NetworkDown,
    Crashed,
    Byzantine,
}

pub struct PreparedTransaction {
    pub gid: Gid,
    pub writes: Vec<(Key, Value)>,
    pub prepared_at: Instant,
}

#[derive(Clone, Debug)]
pub enum WalEntry {
    Prepare(Gid),
    Commit(Gid),
    Abort(Gid),
}

impl WalEntry {
    pub fn verify_checksum(&self) -> bool {
        // Simplified checksum verification
        true
    }
}

impl VirtualParticipant {
    pub fn new(id: ParticipantId, config: TestHarnessConfig) -> Self {
        Self {
            id,
            state: ParticipantState::Healthy,
            data: HashMap::new(),
            wal: Vec::new(),
            prepared_txns: HashMap::new(),
            failure_mode: None,
            network_latency_ms: config.base_latency_ms,
            crash_probability: 0.0,
            prepare_count: 0,
            commit_count: 0,
            abort_count: 0,
        }
    }

    pub fn configure_failure(&mut self, config: FailureConfig) {
        self.failure_mode = Some(config.mode);
    }

    pub fn prepare(&mut self, gid: &Gid, writes: &[(Key, Value)]) -> Result<PrepareVote> {
        self.inject_failure()?;
        self.simulate_network_delay();
        if self.state == ParticipantState::Crashed {
            return Err(Error::NodeCrashed);
        }
        // Check if we can prepare
        let can_prepare = self.can_prepare(writes)?;
        if can_prepare {
            // Record in WAL
            self.wal.push(WalEntry::Prepare(gid.clone()));
            // Store prepared transaction
            self.prepared_txns.insert(gid.clone(), PreparedTransaction {
                gid: gid.clone(),
                writes: writes.to_vec(),
                prepared_at: Instant::now(),
            });
            self.prepare_count += 1;
            Ok(PrepareVote::Yes)
        } else {
            Ok(PrepareVote::No)
        }
    }

    pub fn commit(&mut self, gid: &Gid) -> Result<()> {
        self.inject_failure()?;
        self.simulate_network_delay();
        if let Some(prepared_txn) = self.prepared_txns.remove(gid) {
            // Apply writes
            for (key, value) in prepared_txn.writes {
                self.data.insert(key, value);
            }
            // Record in WAL
            self.wal.push(WalEntry::Commit(gid.clone()));
            self.commit_count += 1;
            Ok(())
        } else {
            Err(Error::TransactionNotPrepared(gid.clone()))
        }
    }

    pub fn abort(&mut self, gid: &Gid) -> Result<()> {
        self.inject_failure()?;
        self.simulate_network_delay();
        if self.prepared_txns.remove(gid).is_some() {
            // Remove prepared state
            self.wal.push(WalEntry::Abort(gid.clone()));
            self.abort_count += 1;
            Ok(())
        } else {
            // Not prepared = already committed or never prepared
            Ok(())
        }
    }

    pub fn has_committed(&self, gid: &Gid) -> Result<bool> {
        // Check WAL for commit entry
        Ok(self.wal.iter().any(|entry| {
            matches!(entry, WalEntry::Commit(g) if g == gid)
        }))
    }

    pub fn crash_and_recover(&mut self) -> Result<()> {
        // Simulate crash: lose in-memory state
        self.data.clear();
        self.prepared_txns.clear();
        self.state = ParticipantState::Crashed;
        // Recover from WAL
        self.recover_from_wal()?;
        self.state = ParticipantState::Healthy;
        Ok(())
    }

    fn recover_from_wal(&mut self) -> Result<()> {
        // Replay WAL to restore state
        let mut committed_txns = HashMap::new();
        for entry in &self.wal {
            match entry {
                WalEntry::Prepare(_gid) => {
                    // Prepared but not committed = need to query coordinator
                    // For testing, we'll mark as uncertain
                },
                WalEntry::Commit(gid) => {
                    committed_txns.insert(gid.clone(), true);
                },
                WalEntry::Abort(gid) => {
                    committed_txns.insert(gid.clone(), false);
                },
            }
        }
        // Reconstruct data state
        // (Simplified: in reality, need to replay write operations)
        Ok(())
    }

    fn can_prepare(&self, _writes: &[(Key, Value)]) -> Result<bool> {
        // Check if we can prepare (e.g., no conflicts)
        Ok(true)
    }

    fn inject_failure(&self) -> Result<()> {
        if let Some(ref failure_mode) = self.failure_mode {
            match failure_mode {
                FailureMode::Crash => {
                    return Err(Error::NodeCrashed);
                },
                FailureMode::RandomCrash { probability } => {
                    let mut rng = rand::thread_rng();
                    if rng.gen::<f64>() < *probability {
                        return Err(Error::NodeCrashed);
                    }
                },
                FailureMode::NetworkPartition => {
                    return Err(Error::NetworkTimeout);
                },
                FailureMode::SlowResponse { delay_ms } => {
                    std::thread::sleep(Duration::from_millis(*delay_ms));
                },
                _ => {}
            }
        }
        Ok(())
    }

    fn simulate_network_delay(&self) {
        std::thread::sleep(Duration::from_millis(self.network_latency_ms));
    }
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ParticipantId(String);

impl ParticipantId {
    pub fn new(id: String) -> Self {
        Self(id)
    }
}

impl std::fmt::Display for ParticipantId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Gid(String);

impl Gid {
    pub fn new(id: String) -> Self {
        Self(id)
    }
}

impl std::fmt::Display for Gid {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Key(String);

impl Key {
    pub fn new(key: &str) -> Self {
        Self(key.to_string())
    }

    pub fn starts_with(&self, prefix: &str) -> bool {
        self.0.starts_with(prefix)
    }
}

#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Value(String);

impl Value {
    pub fn new(value: &str) -> Self {
        Self(value.to_string())
    }

    pub fn parse<T: std::str::FromStr>(&self) -> Result<T>
    where
        T::Err: std::fmt::Display,
    {
        self.0.parse().map_err(|e| Error::ParseError(format!("{}", e)))
    }
}

#[derive(Debug)]
pub enum Error {
    NodeCrashed,
    NetworkTimeout,
    TransactionNotPrepared(Gid),
    ParseError(String),
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::NodeCrashed => write!(f, "Node crashed"),
            Error::NetworkTimeout => write!(f, "Network timeout"),
            Error::TransactionNotPrepared(gid) => write!(f, "Transaction {} not prepared", gid),
            Error::ParseError(msg) => write!(f, "Parse error: {}", msg),
        }
    }
}

impl std::error::Error for Error {}

pub type Result<T> = std::result::Result<T, Error>;
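
A short usage example of the virtual participant, written as a unit test in the same module; the transaction and key names are arbitrary, and `TestHarnessConfig::default()` is the same constructor used by the main runner.

#[test]
fn virtual_participant_commit_survives_crash() -> Result<()> {
    let mut p = VirtualParticipant::new(
        ParticipantId::new("p1".to_string()),
        TestHarnessConfig::default(),
    );
    let gid = Gid::new("txn-001".to_string());
    let writes = vec![(Key::new("account_1"), Value::new("100"))];

    // Phase 1 + Phase 2 on a single healthy participant
    assert!(matches!(p.prepare(&gid, &writes)?, PrepareVote::Yes));
    p.commit(&gid)?;
    assert!(p.has_committed(&gid)?);

    // Crash and recover; the commit record must still be present in the WAL
    p.crash_and_recover()?;
    assert!(p.has_committed(&gid)?);
    Ok(())
}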

8. Integration Guide

8.1 Adding to CI/CD Pipeline

.github/workflows/2pc-testing.yml
name: 2PC Testing

on:
  pull_request:
    paths:
      - 'heliosdb-compute/src/xa_coordinator.rs'
      - 'heliosdb-storage/src/xa_participant.rs'
      - 'heliosdb-storage/src/xa_log.rs'
  schedule:
    - cron: '0 2 * * *'  # Daily at 2 AM

jobs:
  basic-2pc-tests:  # job IDs must start with a letter or underscore
    runs-on: ubuntu-latest-16-cores
    timeout-minutes: 120
    steps:
      - uses: actions/checkout@v3
      - name: Setup Rust
        uses: actions-rs/toolchain@v1
        with:
          toolchain: stable
      - name: Run Basic 2PC Tests
        run: |
          cd heliosdb-storage/tests/distributed
          cargo test --release -- --test-threads=1
      - name: Upload Test Results
        uses: actions/upload-artifact@v3
        with:
          name: 2pc-test-results
          path: target/test-results/

  chaos-2pc-tests:
    runs-on: ubuntu-latest-16-cores
    timeout-minutes: 240
    steps:
      - uses: actions/checkout@v3
      - name: Run Chaos Tests
        run: |
          cd heliosdb-storage/tests/distributed
          cargo test --release chaos_ -- --test-threads=1

  scale-2pc-tests:
    runs-on: ubuntu-latest-32-cores
    timeout-minutes: 480
    steps:
      - uses: actions/checkout@v3
      - name: Run Scale Tests (1000 nodes)
        run: |
          cd heliosdb-storage/tests/distributed
          cargo test --release scale_1000_ -- --test-threads=1

8.2 Running Tests Locally

# Run all tests
cd heliosdb-storage/tests/distributed
cargo test --release
# Run specific category
cargo test --release node_crashes_
# Run single scenario
cargo test --release scenario_coordinator_crash_after_prepare
# Run with verbose output
RUST_LOG=debug cargo test --release -- --nocapture
# Generate coverage report
cargo tarpaulin --out Html --output-dir target/coverage

8.3 Interpreting Results

Test Output Example:

Running scenario: Coordinator Crash After Prepare
  Transactions: 1
  Duration: 12.3s
  Validation:
    Atomicity:      ✓ PASS
    Consistency:    ✓ PASS
    Isolation:      ✓ PASS
    Durability:     ✓ PASS
    Data Integrity: ✓ PASS
  Metrics:
    Prepare Latency (p99): 8.2ms
    Commit Latency (p99):  10.1s (includes recovery)
    Recovery Time:         9.8s
  Status: ✓ PASS

Conclusion

This 2PC Testing Infrastructure provides comprehensive, automated validation of HeliosDB’s distributed transaction capabilities. By Week 19, we will have:

  1. Validated Correctness: Zero data corruption across all 50+ failure scenarios
  2. Production Certification: Atomicity, Consistency, Isolation, Durability all verified
  3. Performance Validation: <5% degradation under chaos
  4. Automated CI/CD: Continuous testing prevents regressions
  5. Documentation: Complete runbooks for troubleshooting

This unblocks Production Blocker #4 and enables confident deployment of distributed transactions in production.


Next Steps:

  • Week 8: Begin implementation (Engineer 1 + AI Agents)
  • Week 12: Start test execution (automated)
  • Week 19: Production certification

Document Version: 1.0 Last Updated: November 28, 2025 Maintained By: HeliosDB Testing Team Related Documents: