Serializable Snapshot Isolation (SSI) - Complete Architecture Specification


Document Version: 1.0 Created: November 28, 2025 Status: READY FOR IMPLEMENTATION Phase: Phase 1, Weeks 1-15 (Critical Path) Team: 3 Engineers Priority: P0 - PRODUCTION BLOCKER #3


Executive Summary

This document provides the complete, implementable architecture for Serializable Snapshot Isolation (SSI) in HeliosDB. This is Production Blocker #3 and represents the critical path for Phase 1 production readiness.

What is SSI?

Serializable Snapshot Isolation is the highest transaction isolation level, providing true serializability while maintaining the performance characteristics of snapshot isolation. Unlike traditional serializable isolation that uses strict 2PL (two-phase locking), SSI:

  1. Allows concurrent reads without blocking
  2. Detects conflicts at commit time rather than during execution
  3. Uses read-write dependency tracking instead of predicate locks
  4. Achieves serializability by detecting “dangerous structures” (rw-antidependency cycles)

Why HeliosDB Needs SSI

Current State:

  • Snapshot isolation (SI) implemented
  • Prevents write-write conflicts (first-committer-wins)
  • ❌ SERIALIZABLE isolation level declared but NOT enforced
  • ❌ Write skew anomalies possible
  • ❌ Read-write conflicts not detected

Problem:

-- Transaction T1 (SERIALIZABLE)
BEGIN;
SELECT balance FROM accounts WHERE id = 1; -- Returns $500
-- Decision: Balance >= $500, allow withdrawal
-- Transaction T2 (SERIALIZABLE, concurrent)
BEGIN;
SELECT balance FROM accounts WHERE id = 1; -- Returns $500
UPDATE accounts SET balance = balance - 400 WHERE id = 1;
COMMIT; -- SUCCESS (balance now $100)
-- Transaction T1 continues
UPDATE accounts SET balance = balance - 300 WHERE id = 1;
COMMIT; -- ❌ BUG: Should ABORT (serialization failure)
-- Should detect: T1 read was invalidated by T2's write

Result: Balance = -$200 (constraint violation!). True SERIALIZABLE would prevent this.
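The commit-time check that catches this anomaly can be sketched as a toy model (illustrative only, not HeliosDB code): T1's read of `accounts:1` is invalidated by T2's write, which committed after T1's snapshot, so validation must fail T1 with a serialization error.

```rust
use std::collections::HashSet;

struct Txn {
    snapshot_ts: u64,
    read_set: HashSet<&'static str>,
}

/// Fail if any key in the read set was overwritten by a commit newer
/// than this transaction's snapshot.
fn commit_check(txn: &Txn, committed_writes: &[(&str, u64)]) -> Result<(), String> {
    for (key, commit_ts) in committed_writes {
        if *commit_ts > txn.snapshot_ts && txn.read_set.contains(key) {
            return Err(format!("serialization failure: read of {key} invalidated"));
        }
    }
    Ok(())
}

fn main() {
    // T1 took its snapshot at ts 100 and read accounts:1
    let t1 = Txn { snapshot_ts: 100, read_set: HashSet::from(["accounts:1"]) };
    // T2 committed a write to accounts:1 at ts 102, after T1's snapshot
    let committed = [("accounts:1", 102)];
    assert!(commit_check(&t1, &committed).is_err()); // T1 must abort
}
```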

Implementation Scope

Timeline: 15 weeks (Weeks 1-15)

  • Weeks 1-4: Algorithm design & module specifications (THIS DOCUMENT)
  • Weeks 5-10: Core implementation
  • Weeks 11-13: Integration & testing
  • Weeks 14-15: Production hardening

Team Structure:

  • Engineer 1: SIREAD lock manager & conflict detector
  • Engineer 2: Write skew validator & integration
  • Engineer 3: Testing, benchmarks, documentation

Deliverable: Production-ready SSI implementation that prevents all serialization anomalies while maintaining high concurrency.


Table of Contents

  1. SSI Algorithm Design
  2. Module Specifications
  3. Data Structure Design
  4. Integration Contract
  5. API Specifications
  6. Implementation Roadmap
  7. Testing Strategy
  8. Performance Targets

1. SSI Algorithm Design

1.1 Core Concept: Dangerous Structures

SSI prevents serialization anomalies by detecting dangerous structures - patterns of dependencies between transactions that would violate serializability.

Three types of dependencies:

  • wr-dependency (write-read): T1 writes X, T2 reads X
  • ww-dependency (write-write): T1 writes X, T2 writes X
  • rw-antidependency (read-write): T1 reads X, T2 writes X ← This is the key!

Dangerous structure = a cycle containing two consecutive rw-antidependencies (every serialization cycle under snapshot isolation must contain such a pair):

T1 --rw--> T2 --rw--> T3 --ww--> T1 (CYCLE = ABORT)
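The diagram above can be modeled as a small reachability check (an illustrative sketch, not HeliosDB code): represent dependency edges as adjacency sets, and a dangerous structure exists when the graph can loop back on itself.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

type TxnId = u32;

/// Returns true if `to` is reachable from `from` by following dependency edges.
fn reachable(edges: &HashMap<TxnId, HashSet<TxnId>>, from: TxnId, to: TxnId) -> bool {
    let mut visited = HashSet::new();
    let mut queue = VecDeque::from([from]);
    while let Some(cur) = queue.pop_front() {
        if cur == to {
            return true;
        }
        if visited.insert(cur) {
            if let Some(next) = edges.get(&cur) {
                queue.extend(next);
            }
        }
    }
    false
}

fn main() {
    // T1 --rw--> T2 --rw--> T3 --ww--> T1 from the diagram above
    let mut edges: HashMap<TxnId, HashSet<TxnId>> = HashMap::new();
    edges.entry(1).or_default().insert(2); // rw
    edges.entry(2).or_default().insert(3); // rw
    edges.entry(3).or_default().insert(1); // ww
    // T1's successor can reach T1 again: the schedule is not serializable
    assert!(reachable(&edges, 2, 1));
}
```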

1.2 PostgreSQL-Inspired SSI Algorithm

HeliosDB implements the algorithm from “Serializable Snapshot Isolation in PostgreSQL” (Ports et al., VLDB 2012).

Key insight: Track rw-antidependencies using SIREAD locks.

Algorithm steps:

Phase 1: Transaction Start

fn begin_transaction(isolation_level: IsolationLevel) -> TransactionId {
    let txn_id = allocate_transaction_id();
    let snapshot_ts = current_version();
    // Create transaction metadata
    let txn = Transaction {
        txn_id,
        snapshot_ts,
        isolation_level,
        read_set: HashSet::new(),     // Keys read by this transaction
        write_set: HashMap::new(),    // Keys written by this transaction
        in_conflict: HashSet::new(),  // Transactions with rw-edges into this one
        out_conflict: HashSet::new(), // Transactions this one has rw-edges to
    };
    register_transaction(txn);
    txn_id
}

Phase 2: Read Operations (Acquire SIREAD Locks)

fn read(txn_id: TransactionId, key: &Key) -> Result<Value> {
    let txn = get_transaction(txn_id);
    // 1. Read from snapshot
    let value = storage.read_at_snapshot(key, txn.snapshot_ts)?;
    // 2. Record read in read_set
    txn.read_set.insert(key.clone());
    // 3. Acquire SIREAD lock (for conflict detection)
    if txn.isolation_level == IsolationLevel::Serializable {
        siread_lock_manager.acquire_siread_lock(txn_id, key);
    }
    Ok(value)
}

Phase 3: Write Operations (Track Write Set)

fn write(txn_id: TransactionId, key: &Key, value: &Value) -> Result<()> {
    let txn = get_transaction(txn_id);
    // 1. Acquire write lock (standard 2PL)
    lock_manager.acquire_write_lock(txn_id, key)?;
    // 2. Record write in write_set
    txn.write_set.insert(key.clone(), value.clone());
    // 3. Check for rw-conflicts with SIREAD locks
    if txn.isolation_level == IsolationLevel::Serializable {
        conflict_detector.check_siread_conflicts(txn_id, key)?;
    }
    Ok(())
}

Phase 4: Commit (Detect Dangerous Structures)

fn commit(txn_id: TransactionId) -> Result<()> {
    let txn = get_transaction(txn_id);
    if txn.isolation_level == IsolationLevel::Serializable {
        // Check for dangerous structures
        if write_skew_validator.has_dangerous_structure(txn_id)? {
            abort(txn_id);
            return Err(SerializationFailure);
        }
    }
    // Allocate commit version
    let commit_ts = allocate_version();
    // Apply writes to storage
    for (key, value) in &txn.write_set {
        storage.write(key, value, commit_ts, txn_id)?;
    }
    // Release locks
    lock_manager.release_locks(txn_id);
    siread_lock_manager.release_siread_locks(txn_id);
    Ok(())
}

1.3 SIREAD Lock Mechanism

SIREAD locks (Serializable Read locks) are lightweight markers that track which transactions have read which data.

Key properties:

  • Never block: Acquired without waiting
  • Conflict detection only: Used to detect rw-antidependencies
  • Short-lived: Released at commit/abort

Example:

-- Transaction T1 (snapshot_ts = 100)
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SELECT balance FROM accounts WHERE id = 1;
-- Acquires: SIREAD lock on accounts:1 for T1
-- Transaction T2 (snapshot_ts = 101)
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
UPDATE accounts SET balance = 500 WHERE id = 1;
-- Detects: SIREAD lock held by T1 → rw-antidependency (T1 --rw--> T2)
COMMIT; -- Success (commit_ts = 102)
-- Transaction T1 continues
UPDATE accounts SET balance = balance + 100 WHERE id = 1;
-- Validator checks: T1 has out_conflict={T2}, T2 has in_conflict={T1}
-- Dangerous structure? Check for cycles...
COMMIT; -- ABORT with serialization_failure if cycle detected

1.4 Read-Write Conflict Detection

When to detect rw-antidependencies:

  1. On WRITE: Check if any SIREAD locks exist on the key being written
fn check_siread_conflicts(writer_txn_id: TransactionId, key: &Key) -> Result<()> {
    let siread_locks = siread_lock_manager.get_locks_on_key(key);
    for reader_txn_id in siread_locks {
        if reader_txn_id != writer_txn_id {
            // rw-antidependency: reader --rw--> writer
            register_conflict(reader_txn_id, writer_txn_id);
        }
    }
    Ok(())
}
  2. On COMMIT: Check if this transaction’s reads were invalidated by concurrent writes
fn validate_read_set(txn: &Transaction) -> Result<()> {
    for key in &txn.read_set {
        let latest_version = storage.get_latest_version(key);
        if latest_version.commit_ts > txn.snapshot_ts {
            // This key was modified after our snapshot
            let writer_txn_id = latest_version.txn_id;
            register_conflict(txn.txn_id, writer_txn_id);
        }
    }
    Ok(())
}

1.5 Dangerous Structure Detection

Cycle detection algorithm:

fn has_dangerous_structure(txn_id: TransactionId) -> bool {
    // A transaction is in a dangerous structure if:
    // 1. It has both in_conflict and out_conflict edges
    // 2. There exists a path from out_conflict back to in_conflict
    let txn = get_transaction(txn_id);
    if txn.in_conflict.is_empty() || txn.out_conflict.is_empty() {
        return false; // No cycle possible
    }
    // Check if any out_conflict transaction can reach any in_conflict transaction
    for out_txn_id in &txn.out_conflict {
        if can_reach_any(*out_txn_id, &txn.in_conflict) {
            return true; // Cycle detected!
        }
    }
    false
}

fn can_reach_any(start: TransactionId, targets: &HashSet<TransactionId>) -> bool {
    let mut visited = HashSet::new();
    let mut queue = VecDeque::new();
    queue.push_back(start);
    while let Some(current) = queue.pop_front() {
        if targets.contains(&current) {
            return true; // Found path to target
        }
        if visited.contains(&current) {
            continue;
        }
        visited.insert(current);
        let current_txn = get_transaction(current);
        for next in &current_txn.out_conflict {
            queue.push_back(*next);
        }
    }
    false
}

1.6 Integration with Existing MVCC

Existing HeliosDB Components (from analysis):

// From heliosdb-multi-model/src/transaction.rs
pub struct Transaction {
pub id: TransactionId,
pub isolation_level: IsolationLevel,
pub state: TransactionState,
pub start_version: u64, // ← This is snapshot_ts
pub commit_version: Option<u64>,
pub read_set: HashSet<StorageKey>, // Already exists!
pub write_set: HashMap<StorageKey, Bytes>, // Already exists!
// ...
}
// From heliosdb-multi-model/src/mvcc.rs
pub struct MvccStore {
chains: Arc<RwLock<HashMap<Bytes, VersionChain>>>,
active_snapshots: Arc<RwLock<HashMap<TxnId, Timestamp>>>,
// ...
}

SSI Enhancement:

// Add to Transaction struct:
pub struct Transaction {
// ... existing fields ...
// NEW for SSI:
pub in_conflict: HashSet<TransactionId>, // Transactions we conflict with (inConflict)
pub out_conflict: HashSet<TransactionId>, // Transactions that conflict with us (outConflict)
}

2. Module Specifications

2.1 Module: ssi_lock_manager.rs (1,200 LOC)

Purpose: Manage SIREAD locks for detecting read-write conflicts.

Responsibilities:

  • Acquire/release SIREAD locks
  • Track which transactions hold locks on which keys
  • Support both key-level and predicate (range) locks
  • Efficient lock lookup and conflict checking

Key APIs:

pub struct SireadLockManager {
    /// Map: Key -> Set of transactions holding SIREAD locks
    key_locks: Arc<RwLock<HashMap<Bytes, HashSet<TransactionId>>>>,
    /// Map: Transaction -> Set of keys it has SIREAD locked
    txn_locks: Arc<RwLock<HashMap<TransactionId, HashSet<Bytes>>>>,
    /// Predicate locks for range queries
    predicate_locks: Arc<RwLock<Vec<PredicateLock>>>,
    /// Metrics
    metrics: Arc<RwLock<SireadMetrics>>,
}

impl SireadLockManager {
    pub fn new() -> Self;

    /// Acquire SIREAD lock on a key
    pub fn acquire_siread_lock(&self, txn_id: TransactionId, key: &Bytes) -> Result<()>;

    /// Acquire SIREAD lock on a range (predicate lock)
    pub fn acquire_predicate_lock(
        &self,
        txn_id: TransactionId,
        predicate: Predicate,
    ) -> Result<PredicateLockId>;

    /// Get all transactions with SIREAD locks on a key
    pub fn get_locks_on_key(&self, key: &Bytes) -> Vec<TransactionId>;

    /// Get all transactions with predicate locks overlapping a key
    pub fn get_predicate_locks_on_key(&self, key: &Bytes) -> Vec<TransactionId>;

    /// Release all SIREAD locks for a transaction
    pub fn release_locks(&self, txn_id: TransactionId) -> Result<()>;

    /// Get metrics
    pub fn get_metrics(&self) -> SireadMetrics;
}

Data Structures:

#[derive(Debug, Clone)] // Hash/Eq omitted: Predicate (and Condition) do not derive them
pub struct PredicateLock {
    pub txn_id: TransactionId,
    pub predicate: Predicate,
    pub created_at: Timestamp,
}

#[derive(Debug, Clone)]
pub enum Predicate {
    /// Range: start_key <= key < end_key
    Range { start_key: Bytes, end_key: Bytes },
    /// Table scan: all keys in table
    TableScan { table_name: String },
    /// Index scan: all keys matching condition
    IndexScan { index_name: String, condition: Condition },
}

#[derive(Debug, Clone)]
pub struct SireadMetrics {
    pub total_locks_acquired: u64,
    pub total_locks_released: u64,
    pub active_key_locks: usize,
    pub active_predicate_locks: usize,
    pub conflicts_detected: u64,
}

Implementation Notes:

  • Use DashMap from the dashmap crate for sharded concurrent hash maps (low contention, though not strictly lock-free)
  • SIREAD locks never block (unlike write locks)
  • Predicate locks use interval tree for efficient overlap detection
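The two-index table behind these notes can be sketched as follows. The spec proposes DashMap; this sketch uses `std` `Mutex<HashMap>` so it stands alone, and the names (`SireadTable`, `acquire`, `holders`, `release_all`) are illustrative rather than the final API.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

type TransactionId = u64;
type Key = Vec<u8>;

#[derive(Default)]
pub struct SireadTable {
    key_locks: Mutex<HashMap<Key, HashSet<TransactionId>>>,
    txn_locks: Mutex<HashMap<TransactionId, HashSet<Key>>>,
}

impl SireadTable {
    /// Never waits on other transactions: just records the read in both indexes.
    pub fn acquire(&self, txn: TransactionId, key: &[u8]) {
        self.key_locks.lock().unwrap().entry(key.to_vec()).or_default().insert(txn);
        self.txn_locks.lock().unwrap().entry(txn).or_default().insert(key.to_vec());
    }

    /// Readers a writer must check against for rw-antidependencies.
    pub fn holders(&self, key: &[u8]) -> Vec<TransactionId> {
        self.key_locks
            .lock()
            .unwrap()
            .get(key)
            .map(|s| s.iter().copied().collect())
            .unwrap_or_default()
    }

    /// O(m) cleanup at commit/abort, m = locks held by the transaction.
    pub fn release_all(&self, txn: TransactionId) {
        if let Some(keys) = self.txn_locks.lock().unwrap().remove(&txn) {
            let mut by_key = self.key_locks.lock().unwrap();
            for k in keys {
                let now_empty = if let Some(h) = by_key.get_mut(&k) {
                    h.remove(&txn);
                    h.is_empty()
                } else {
                    false
                };
                if now_empty {
                    by_key.remove(&k); // drop empty entries so the table does not leak
                }
            }
        }
    }
}
```

Swapping the two `Mutex<HashMap>` fields for `DashMap` as the notes suggest removes the global locks without changing the interface.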

Testing:

  • 50+ unit tests covering all API methods
  • Concurrent acquisition/release (1000 threads)
  • Predicate lock overlap detection correctness
  • Memory leak detection (no leaked locks after transaction cleanup)

Estimated LOC: 1,200 (including tests)


2.2 Module: conflict_detector.rs (800 LOC)

Purpose: Detect rw-antidependencies and maintain conflict graph.

Responsibilities:

  • Detect rw-conflicts when writes occur
  • Maintain in/outConflict sets for each transaction
  • Provide conflict graph traversal APIs
  • Optimize conflict detection for hot keys

Key APIs:

pub struct ConflictDetector {
    /// Global transaction registry
    transactions: Arc<RwLock<HashMap<TransactionId, TransactionMetadata>>>,
    /// SIREAD lock manager (dependency)
    siread_lock_manager: Arc<SireadLockManager>,
    /// Metrics
    metrics: Arc<RwLock<ConflictMetrics>>,
}

impl ConflictDetector {
    pub fn new(siread_lock_manager: Arc<SireadLockManager>) -> Self;

    /// Check for SIREAD conflicts when writing a key
    pub fn check_write_conflicts(
        &self,
        writer_txn_id: TransactionId,
        key: &Bytes,
    ) -> Result<Vec<Conflict>>;

    /// Register a rw-antidependency: reader --rw--> writer
    pub fn register_rw_conflict(
        &self,
        reader_txn_id: TransactionId,
        writer_txn_id: TransactionId,
    ) -> Result<()>;

    /// Get all transactions in conflict with given transaction
    pub fn get_conflicts(&self, txn_id: TransactionId) -> ConflictInfo;

    /// Remove transaction from conflict graph (cleanup on commit/abort)
    pub fn remove_transaction(&self, txn_id: TransactionId) -> Result<()>;
}

pub struct TransactionMetadata {
    pub txn_id: TransactionId,
    pub snapshot_ts: Timestamp,
    pub isolation_level: IsolationLevel,
    pub in_conflict: HashSet<TransactionId>,  // rw-edges into this txn (they read, it wrote)
    pub out_conflict: HashSet<TransactionId>, // rw-edges out of this txn (it read, they wrote)
    pub state: TransactionState,
}

#[derive(Debug, Clone)]
pub struct Conflict {
    pub reader_txn_id: TransactionId,
    pub writer_txn_id: TransactionId,
    pub key: Bytes,
    pub conflict_type: ConflictType,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictType {
    ReadWrite,  // rw-antidependency
    WriteWrite, // ww-dependency
}

#[derive(Debug, Clone)]
pub struct ConflictInfo {
    pub in_conflict: HashSet<TransactionId>,
    pub out_conflict: HashSet<TransactionId>,
}

Implementation Notes:

  • Store TransactionMetadata in shared Arc<RwLock<HashMap>> for concurrent access
  • Use fine-grained locking per transaction (not global lock)
  • Optimize hot key detection (if 100+ transactions read same key, use bloom filter)

Testing:

  • Conflict registration correctness
  • Concurrent write conflict detection (100 threads, 10K writes)
  • Cleanup verification (no leaked metadata after transaction end)

Estimated LOC: 800


2.3 Module: write_skew_validator.rs (600 LOC)

Purpose: Detect dangerous structures (serialization cycles) at commit time.

Responsibilities:

  • Cycle detection in conflict graph
  • Determine which transaction to abort when cycle detected
  • Optimize for common case (no cycles)

Key APIs:

pub struct WriteSkewValidator {
    /// Conflict detector (dependency)
    conflict_detector: Arc<ConflictDetector>,
    /// Metrics
    metrics: Arc<RwLock<ValidatorMetrics>>,
}

impl WriteSkewValidator {
    pub fn new(conflict_detector: Arc<ConflictDetector>) -> Self;

    /// Check if transaction is in a dangerous structure
    pub fn has_dangerous_structure(&self, txn_id: TransactionId) -> Result<bool>;

    /// Detect cycle using BFS
    pub fn detect_cycle(
        &self,
        start_txn_id: TransactionId,
        targets: &HashSet<TransactionId>,
    ) -> Option<Vec<TransactionId>>;

    /// Choose victim transaction to abort (when cycle detected)
    pub fn choose_victim(&self, cycle: &[TransactionId]) -> TransactionId;
}

pub struct ValidatorMetrics {
    pub validations_performed: u64,
    pub cycles_detected: u64,
    pub avg_validation_time_us: f64,
    pub max_cycle_length: usize,
}

Implementation Details:

impl WriteSkewValidator {
    /// Optimized dangerous structure detection
    pub fn has_dangerous_structure(&self, txn_id: TransactionId) -> Result<bool> {
        let conflicts = self.conflict_detector.get_conflicts(txn_id);
        // Fast path: No conflicts = no cycle
        if conflicts.in_conflict.is_empty() || conflicts.out_conflict.is_empty() {
            return Ok(false);
        }
        // Slow path: BFS to detect cycle
        for out_txn in &conflicts.out_conflict {
            if self.can_reach_any(*out_txn, &conflicts.in_conflict) {
                return Ok(true); // Cycle found!
            }
        }
        Ok(false)
    }

    /// BFS reachability check
    fn can_reach_any(&self, start: TransactionId, targets: &HashSet<TransactionId>) -> bool {
        let mut visited = HashSet::new();
        let mut queue = VecDeque::new();
        queue.push_back(start);
        while let Some(current) = queue.pop_front() {
            if targets.contains(&current) {
                return true; // Reached target!
            }
            if visited.contains(&current) {
                continue;
            }
            visited.insert(current);
            let current_conflicts = self.conflict_detector.get_conflicts(current);
            for next in &current_conflicts.out_conflict {
                queue.push_back(*next);
            }
        }
        false
    }

    /// Victim selection: Abort youngest transaction (heuristic)
    pub fn choose_victim(&self, cycle: &[TransactionId]) -> TransactionId {
        // Simple heuristic: Abort transaction with highest txn_id (youngest)
        *cycle.iter().max().unwrap()
    }
}

Testing:

  • Cycle detection correctness (various graph topologies)
  • Performance on large conflict graphs (1000+ transactions)
  • Victim selection fairness (no starvation)

Estimated LOC: 600


2.4 Module: ssi_types.rs (300 LOC)

Purpose: Shared data types for SSI implementation.

Contents:

use bytes::Bytes;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use serde::{Serialize, Deserialize};

/// Transaction ID (reuse existing type)
pub type TransactionId = u64;
/// Timestamp (reuse existing MVCC timestamp)
pub type Timestamp = u64;
/// SIREAD lock identifier
pub type SireadLockId = u64;
/// Predicate lock identifier
pub type PredicateLockId = u64;

/// Transaction isolation level (extend existing enum)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable, // ← SSI implementation
}

/// Transaction state
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransactionState {
    Active,
    Preparing,
    Prepared,
    Committing,
    Committed,
    Aborted,
}

/// Serialization error type
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SsiError {
    SerializationFailure(String),
    DeadlockDetected(TransactionId),
    LockTimeout(TransactionId),
    TransactionNotFound(TransactionId),
    InvalidState(String),
}

impl std::fmt::Display for SsiError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            SsiError::SerializationFailure(msg) => {
                write!(f, "Serialization failure: {}", msg)
            }
            SsiError::DeadlockDetected(txn_id) => {
                write!(f, "Deadlock detected for transaction {}", txn_id)
            }
            SsiError::LockTimeout(txn_id) => {
                write!(f, "Lock timeout for transaction {}", txn_id)
            }
            SsiError::TransactionNotFound(txn_id) => {
                write!(f, "Transaction {} not found", txn_id)
            }
            SsiError::InvalidState(msg) => {
                write!(f, "Invalid state: {}", msg)
            }
        }
    }
}

impl std::error::Error for SsiError {}

/// Result type for SSI operations
pub type Result<T> = std::result::Result<T, SsiError>;

Estimated LOC: 300


3. Data Structure Design

3.1 Transaction Dependency Graph

Representation:

/// Global transaction dependency graph
pub struct TransactionGraph {
/// All active transactions
transactions: Arc<RwLock<HashMap<TransactionId, TransactionNode>>>,
/// Next transaction ID
next_txn_id: AtomicU64,
}
/// Node in transaction graph
pub struct TransactionNode {
pub txn_id: TransactionId,
pub snapshot_ts: Timestamp,
pub isolation_level: IsolationLevel,
pub state: TransactionState,
/// Transactions this depends on (T --rw--> other)
pub in_conflict: HashSet<TransactionId>,
/// Transactions that depend on this (other --rw--> T)
pub out_conflict: HashSet<TransactionId>,
/// Read set (for validation)
pub read_set: HashSet<Bytes>,
/// Write set
pub write_set: HashMap<Bytes, Bytes>,
}

Graph Operations:

  • Add edge: O(1) (HashSet insert)
  • Remove node: O(d) where d = degree (number of conflicts)
  • Cycle detection: O(V + E) BFS/DFS

Memory Overhead:

  • Per transaction: ~200 bytes base + read/write set size
  • For 1000 concurrent transactions: ~200KB + data

3.2 SIREAD Lock Table

Key-Level Locks:

pub struct SireadLockTable {
    /// Map: Key -> Transactions holding SIREAD locks
    key_locks: DashMap<Bytes, HashSet<TransactionId>>,
    /// Map: Transaction -> Keys it has locked
    txn_locks: DashMap<TransactionId, HashSet<Bytes>>,
}

Predicate Locks (for range queries):

pub struct PredicateLockTable {
    /// Interval tree for efficient overlap detection
    locks: Arc<RwLock<IntervalTree<Bytes, PredicateLock>>>,
}

pub struct PredicateLock {
    pub txn_id: TransactionId,
    pub start_key: Bytes,
    pub end_key: Bytes,
    pub created_at: Timestamp,
}

Operations:

  • Acquire lock: O(1) for key locks, O(log n) for predicate locks
  • Check conflict: O(k) where k = number of locks on key
  • Release locks: O(m) where m = number of locks held by transaction
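The containment test behind predicate-lock conflict checking can be sketched as a linear scan (illustrative only; the spec's interval tree makes the lookup O(log n), but the per-lock test is the same):

```rust
#[derive(Debug, Clone)]
struct PredicateLock {
    txn_id: u64,
    start_key: Vec<u8>, // inclusive
    end_key: Vec<u8>,   // exclusive
}

/// All transactions whose range predicate covers `key`
/// (i.e. a later insert of `key` would be a phantom for them).
fn predicate_holders(locks: &[PredicateLock], key: &[u8]) -> Vec<u64> {
    locks
        .iter()
        // Byte slices compare lexicographically, matching the spec's
        // "start_key <= key < end_key" convention.
        .filter(|l| l.start_key.as_slice() <= key && key < l.end_key.as_slice())
        .map(|l| l.txn_id)
        .collect()
}

fn main() {
    let locks = vec![
        PredicateLock { txn_id: 7, start_key: b"a".to_vec(), end_key: b"m".to_vec() },
        PredicateLock { txn_id: 9, start_key: b"p".to_vec(), end_key: b"z".to_vec() },
    ];
    assert_eq!(predicate_holders(&locks, b"c"), vec![7]);
    assert!(predicate_holders(&locks, b"n").is_empty());
}
```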

3.3 Conflict Information Tracking

Per-Transaction Conflict Metadata:

pub struct ConflictMetadata {
    /// Transactions with rw-edges into us (they read, we wrote)
    pub in_conflict: HashSet<TransactionId>,
    /// Transactions we have rw-edges to (we read, they wrote)
    pub out_conflict: HashSet<TransactionId>,
    /// Conflict timestamps (for debugging/logging)
    pub conflict_timestamps: Vec<(TransactionId, Timestamp, ConflictType)>,
}

Global Conflict Statistics:

pub struct ConflictStats {
    pub total_rw_conflicts: AtomicU64,
    pub total_ww_conflicts: AtomicU64,
    pub serialization_failures: AtomicU64,
    /// std has no AtomicF64; store the f64 bits in an AtomicU64
    /// (or use a crate such as portable-atomic)
    pub avg_conflicts_per_txn: AtomicU64,
}

3.4 Serialization Conflict Types

#[derive(Debug, Clone)] // not Copy: Bytes and Predicate are not Copy
pub enum SerializationConflict {
    /// Write skew: Two transactions read overlapping sets and write disjoint sets
    WriteSkew {
        txn1_id: TransactionId,
        txn2_id: TransactionId,
        read_key: Bytes,
        write_key: Bytes,
    },
    /// Read-only anomaly: Read-only transaction sees inconsistent state
    ReadOnlyAnomaly {
        reader_id: TransactionId,
        writer1_id: TransactionId,
        writer2_id: TransactionId,
    },
    /// Phantom read: Transaction reads range, concurrent insert in range
    PhantomRead {
        reader_id: TransactionId,
        writer_id: TransactionId,
        predicate: Predicate,
    },
}

4. Integration Contract

4.1 Transaction Coordinator Integration

File: heliosdb-multi-model/src/coordinator.rs

Required Changes:

pub struct CoordinatorManager {
    // ... existing fields ...
    // NEW: SSI components
    ssi_lock_manager: Arc<SireadLockManager>,
    conflict_detector: Arc<ConflictDetector>,
    write_skew_validator: Arc<WriteSkewValidator>,
}

impl CoordinatorManager {
    pub async fn begin_transaction(&self, isolation_level: IsolationLevel) -> Result<TransactionId> {
        let txn_id = self.allocate_transaction_id().await;
        let snapshot_ts = *self.next_version.read().await;
        // Register with conflict detector if SERIALIZABLE
        if isolation_level == IsolationLevel::Serializable {
            self.conflict_detector.register_transaction(txn_id, snapshot_ts, isolation_level)?;
        }
        // ... existing logic ...
        Ok(txn_id)
    }

    pub async fn read(&self, txn_id: TransactionId, key: &Bytes) -> Result<Option<Bytes>> {
        let txn = self.get_transaction(txn_id)?;
        // 1. Read from MVCC snapshot
        let value = self.storage.read_at_snapshot(key, txn.snapshot_ts).await?;
        // 2. Track read in read_set
        txn.read_set.insert(key.clone());
        // 3. Acquire SIREAD lock if SERIALIZABLE
        if txn.isolation_level == IsolationLevel::Serializable {
            self.ssi_lock_manager.acquire_siread_lock(txn_id, key)?;
        }
        Ok(value)
    }

    pub async fn write(&self, txn_id: TransactionId, key: Bytes, value: Bytes) -> Result<()> {
        let txn = self.get_transaction(txn_id)?;
        // 1. Acquire write lock (existing 2PL)
        self.lock_manager.acquire_write_lock(txn_id, &key, self.lock_timeout).await?;
        // 2. Check SIREAD conflicts if SERIALIZABLE
        if txn.isolation_level == IsolationLevel::Serializable {
            let conflicts = self.conflict_detector.check_write_conflicts(txn_id, &key)?;
            for conflict in conflicts {
                self.conflict_detector.register_rw_conflict(
                    conflict.reader_txn_id,
                    txn_id,
                )?;
            }
        }
        // 3. Track write
        txn.write_set.insert(key, value);
        Ok(())
    }

    pub async fn commit(&self, txn_id: TransactionId) -> Result<()> {
        let txn = self.get_transaction(txn_id)?;
        // 1. Validate SERIALIZABLE transactions
        if txn.isolation_level == IsolationLevel::Serializable {
            if self.write_skew_validator.has_dangerous_structure(txn_id)? {
                self.abort(txn_id).await?;
                return Err(SsiError::SerializationFailure(format!(
                    "Transaction {} aborted due to serialization conflict",
                    txn_id
                )));
            }
        }
        // 2. Commit (existing logic)
        // ... existing commit logic ...
        // 3. Release SIREAD locks
        if txn.isolation_level == IsolationLevel::Serializable {
            self.ssi_lock_manager.release_locks(txn_id)?;
            self.conflict_detector.remove_transaction(txn_id)?;
        }
        Ok(())
    }
}

4.2 MVCC Compatibility Points

File: heliosdb-multi-model/src/mvcc.rs

Integration points:

impl MvccStore {
    /// Read with SIREAD lock acquisition
    pub async fn read_with_siread_lock(
        &self,
        key: &StorageKey,
        snapshot_ts: Timestamp,
        txn_id: TransactionId,
        ssi_lock_manager: &SireadLockManager,
    ) -> Result<Option<Bytes>> {
        // 1. MVCC read
        let value = self.read(key, snapshot_ts).await?;
        // 2. Acquire SIREAD lock
        ssi_lock_manager.acquire_siread_lock(txn_id, &key.encode())?;
        Ok(value)
    }

    /// Check conflicts during write
    pub async fn write_with_conflict_check(
        &self,
        key: StorageKey,
        value: Bytes,
        version: Timestamp,
        txn_id: TransactionId,
        conflict_detector: &ConflictDetector,
    ) -> Result<()> {
        // 1. Check SIREAD conflicts
        let conflicts = conflict_detector.check_write_conflicts(txn_id, &key.encode())?;
        for conflict in conflicts {
            conflict_detector.register_rw_conflict(conflict.reader_txn_id, txn_id)?;
        }
        // 2. MVCC write
        self.write(key, value, version, txn_id).await?;
        Ok(())
    }
}

4.3 2PC Integration Approach

Distributed SSI (for multi-node deployments):

pub struct Distributed2PCCoordinator {
    // ... existing fields ...
    // SSI components (global coordination)
    global_ssi_lock_manager: Arc<SireadLockManager>,
    global_conflict_detector: Arc<ConflictDetector>,
    write_skew_validator: Arc<WriteSkewValidator>,
}

impl Distributed2PCCoordinator {
    pub async fn prepare(&self, txn_id: TransactionId) -> Result<bool> {
        // 1. Local validation
        if self.write_skew_validator.has_dangerous_structure(txn_id)? {
            return Ok(false); // Vote ABORT
        }
        // 2. Send PREPARE to all participants
        let votes = self.send_prepare_to_participants(txn_id).await?;
        // 3. All participants must vote COMMIT
        Ok(votes.iter().all(|vote| *vote == Vote::VoteCommit))
    }

    pub async fn commit(&self, txn_id: TransactionId) -> Result<()> {
        // 1. Send COMMIT to all participants
        self.send_commit_to_participants(txn_id).await?;
        // 2. Release global SIREAD locks
        self.global_ssi_lock_manager.release_locks(txn_id)?;
        self.global_conflict_detector.remove_transaction(txn_id)?;
        Ok(())
    }
}

Key insight: SSI validation happens before 2PC PREPARE phase. If dangerous structure detected, transaction aborts before distributed coordination.

4.4 Error Handling Strategy

Error Types:

pub enum SsiError {
    /// Serialization failure (SQLSTATE 40001)
    SerializationFailure(String),
    /// Deadlock detected (SQLSTATE 40P01)
    DeadlockDetected(TransactionId),
    /// Lock timeout
    LockTimeout(TransactionId),
    /// Internal error
    InternalError(String),
}

Error Propagation:

impl From<SsiError> for PgError {
    fn from(err: SsiError) -> PgError {
        match err {
            SsiError::SerializationFailure(msg) => {
                PgError::new(PgErrorCode::SerializationFailure, msg)
            }
            SsiError::DeadlockDetected(txn_id) => {
                PgError::new(
                    PgErrorCode::DeadlockDetected,
                    format!("Deadlock detected for transaction {}", txn_id),
                )
            }
            SsiError::LockTimeout(txn_id) => {
                PgError::new(
                    PgErrorCode::LockNotAvailable,
                    format!("Lock timeout for transaction {}", txn_id),
                )
            }
            SsiError::InternalError(msg) => {
                PgError::new(PgErrorCode::InternalError, msg)
            }
        }
    }
}

Client Retry Logic:

-- PL/pgSQL-style pseudocode for the client-side retry pattern
-- (real clients typically implement the retry loop in application code)
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
LOOP
    BEGIN
        -- Your transaction logic here
        UPDATE accounts SET balance = balance - 100 WHERE id = 1;
        UPDATE accounts SET balance = balance + 100 WHERE id = 2;
        COMMIT;
        EXIT; -- Success!
    EXCEPTION
        WHEN SQLSTATE '40001' THEN -- serialization_failure
            ROLLBACK;
            -- Retry transaction
    END;
END LOOP;

5. API Specifications

5.1 Public Transaction API

High-level API for application developers:

// BEGIN TRANSACTION
pub async fn begin_transaction(
    &self,
    isolation_level: IsolationLevel,
) -> Result<TransactionId>;

// READ
pub async fn read(
    &self,
    txn_id: TransactionId,
    table: &str,
    key: &Bytes,
) -> Result<Option<Bytes>>;

// RANGE READ (with predicate lock for SERIALIZABLE)
pub async fn range_read(
    &self,
    txn_id: TransactionId,
    table: &str,
    start_key: &Bytes,
    end_key: &Bytes,
) -> Result<Vec<(Bytes, Bytes)>>;

// WRITE
pub async fn write(
    &self,
    txn_id: TransactionId,
    table: &str,
    key: Bytes,
    value: Bytes,
) -> Result<()>;

// DELETE
pub async fn delete(
    &self,
    txn_id: TransactionId,
    table: &str,
    key: &Bytes,
) -> Result<()>;

// COMMIT
pub async fn commit(&self, txn_id: TransactionId) -> Result<()>;

// ABORT
pub async fn abort(&self, txn_id: TransactionId) -> Result<()>;

5.2 Internal SSI APIs

SIREAD Lock Manager:

pub trait SireadLockManager: Send + Sync {
    fn acquire_siread_lock(&self, txn_id: TransactionId, key: &Bytes) -> Result<()>;
    fn acquire_predicate_lock(&self, txn_id: TransactionId, predicate: Predicate) -> Result<PredicateLockId>;
    fn get_locks_on_key(&self, key: &Bytes) -> Vec<TransactionId>;
    fn get_predicate_locks_on_key(&self, key: &Bytes) -> Vec<TransactionId>;
    fn release_locks(&self, txn_id: TransactionId) -> Result<()>;
    fn get_metrics(&self) -> SireadMetrics;
}

Conflict Detector:

pub trait ConflictDetector: Send + Sync {
    fn register_transaction(
        &self,
        txn_id: TransactionId,
        snapshot_ts: Timestamp,
        isolation_level: IsolationLevel,
    ) -> Result<()>;
    fn check_write_conflicts(
        &self,
        writer_txn_id: TransactionId,
        key: &Bytes,
    ) -> Result<Vec<Conflict>>;
    fn register_rw_conflict(
        &self,
        reader_txn_id: TransactionId,
        writer_txn_id: TransactionId,
    ) -> Result<()>;
    fn get_conflicts(&self, txn_id: TransactionId) -> ConflictInfo;
    fn remove_transaction(&self, txn_id: TransactionId) -> Result<()>;
}

Write Skew Validator:

pub trait WriteSkewValidator: Send + Sync {
    fn has_dangerous_structure(&self, txn_id: TransactionId) -> Result<bool>;
    fn detect_cycle(&self, start_txn_id: TransactionId, targets: &HashSet<TransactionId>) -> Option<Vec<TransactionId>>;
    fn choose_victim(&self, cycle: &[TransactionId]) -> TransactionId;
    fn get_metrics(&self) -> ValidatorMetrics;
}

5.3 Thread-Safety Guarantees

All SSI components are thread-safe:

  • SireadLockManager: Uses DashMap (sharded concurrent hash map)
  • ConflictDetector: Uses Arc<RwLock<HashMap>> with fine-grained locking
  • WriteSkewValidator: Read-only access to conflict graph (no mutations)

Concurrency Guarantees:

  • SIREAD lock acquisition: Non-blocking — never waits on other transactions
  • Conflict detection: Multiple writers can register conflicts concurrently
  • Cycle detection: Read-only operation, safe for concurrent validation

Deadlock Prevention:

  • SIREAD locks never wait (acquired optimistically)
  • Write locks use timeout-based deadlock detection (existing)
  • Victim selection breaks cycles deterministically

5.4 Error Types and Handling

Error Hierarchy:

pub enum SsiError {
    SerializationFailure(String),
    DeadlockDetected(TransactionId),
    LockTimeout(TransactionId),
    TransactionNotFound(TransactionId),
    InvalidState(String),
    InternalError(String),
}

impl From<SsiError> for DatabaseError {
    fn from(err: SsiError) -> DatabaseError {
        match err {
            SsiError::SerializationFailure(msg) => {
                DatabaseError::SerializationFailure { message: msg }
            }
            SsiError::DeadlockDetected(txn_id) => {
                DatabaseError::DeadlockDetected { txn_id }
            }
            // ... other conversions ...
        }
    }
}

Recovery Actions:

  • SerializationFailure: Client retries transaction
  • DeadlockDetected: Client retries after backoff
  • LockTimeout: Client retries with increased timeout
  • InternalError: Log error, abort transaction, alert operator
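The retry-based recovery actions above are typically driven by a small client-side helper. A minimal sketch (the helper name, error variants, and backoff parameters are illustrative, not part of the spec's API):

```rust
use std::{thread, time::Duration};

#[derive(Debug)]
enum SsiError {
    SerializationFailure(String),
    DeadlockDetected(u64),
    #[allow(dead_code)]
    Fatal(String),
}

/// Retry a transactional closure on retryable SSI errors, with exponential
/// backoff between attempts. Non-retryable errors propagate immediately.
fn run_with_retries<T>(
    max_attempts: u32,
    mut txn_body: impl FnMut() -> Result<T, SsiError>,
) -> Result<T, SsiError> {
    let mut backoff = Duration::from_millis(1);
    for attempt in 1..=max_attempts {
        match txn_body() {
            Ok(v) => return Ok(v),
            Err(SsiError::SerializationFailure(_)) | Err(SsiError::DeadlockDetected(_))
                if attempt < max_attempts =>
            {
                thread::sleep(backoff);
                backoff *= 2; // exponential backoff before the next attempt
            }
            Err(e) => return Err(e),
        }
    }
    unreachable!("loop either returns Ok or the final Err")
}

fn main() {
    // Simulate a body that hits two serialization failures, then commits.
    let mut calls = 0;
    let result = run_with_retries(5, || {
        calls += 1;
        if calls < 3 {
            Err(SsiError::SerializationFailure("rw-cycle".into()))
        } else {
            Ok("committed")
        }
    });
    assert_eq!(result.unwrap(), "committed");
    assert_eq!(calls, 3);
}
```

The closure must re-run the whole transaction body (including reads), since a serialization failure means the previous snapshot's reads can no longer be trusted.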

6. Implementation Roadmap

Week 1-2: Foundation & Design

Owner: All engineers

Tasks:

  • Finalize data structure designs
  • Set up test infrastructure
  • Create module scaffolding
  • Design API contracts
  • Write integration test cases

Deliverables:

  • Empty module files with documented APIs
  • Test harness for SSI testing
  • Performance benchmark framework
  • Design review document

Week 3-4: SIREAD Lock Manager

Owner: Engineer 1

Tasks:

  • Implement key-level SIREAD locks
  • Implement predicate (range) locks
  • Write unit tests (50+ tests)
  • Benchmark lock acquisition performance
  • Integration with transaction coordinator

Deliverables:

  • ssi_lock_manager.rs (1,200 LOC)
  • Unit tests (200 LOC)
  • Performance benchmarks
  • Documentation

Acceptance Criteria:

  • All unit tests pass
  • Lock acquisition: <100ns (p99)
  • No memory leaks under load

Week 5-6: Conflict Detector

Owner: Engineer 2

Tasks:

  • Implement conflict graph management
  • Write conflict registration logic
  • Integrate with SIREAD lock manager
  • Write unit tests (40+ tests)
  • Optimize for hot keys

Deliverables:

  • conflict_detector.rs (800 LOC)
  • Unit tests (150 LOC)
  • Conflict graph visualization tool (optional)
  • Documentation

Acceptance Criteria:

  • Correct conflict detection (validated by tests)
  • Support 10,000+ concurrent transactions
  • Graph cleanup on transaction end

Week 7-8: Write Skew Validator

Owner: Engineer 3

Tasks:

  • Implement cycle detection algorithm
  • Write victim selection logic
  • Benchmark cycle detection performance
  • Write unit tests (30+ tests)
  • Integration testing

Deliverables:

  • write_skew_validator.rs (600 LOC)
  • Unit tests (100 LOC)
  • Cycle detection benchmarks
  • Documentation

Acceptance Criteria:

  • Cycle detection correctness (100% accuracy)
  • Validation time: <1ms for typical graphs
  • No false positives
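Victim selection must be deterministic so that repeated validation of the same cycle always aborts the same transaction. One plausible policy (an assumption for illustration, not the policy mandated by this spec) is to abort the youngest transaction, i.e. the highest `TransactionId` in the cycle:

```rust
type TransactionId = u64;

/// Deterministic victim selection: abort the youngest transaction in the
/// cycle (highest id). Older transactions are favored so long-running
/// work makes progress, and the choice is stable across re-validation.
fn choose_victim(cycle: &[TransactionId]) -> TransactionId {
    *cycle.iter().max().expect("cycle is non-empty")
}

fn main() {
    // Cycle T1 -> T2 -> T3 -> T1: the youngest (T3) is aborted.
    assert_eq!(choose_victim(&[1, 2, 3]), 3);
    // Order of discovery doesn't change the outcome.
    assert_eq!(choose_victim(&[3, 1, 2]), 3);
}
```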

Week 9-10: Transaction Coordinator Integration

Owner: All engineers

Tasks:

  • Modify begin_transaction() to initialize SSI
  • Update read() to acquire SIREAD locks
  • Update write() to detect conflicts
  • Update commit() to validate serializability
  • Update abort() to cleanup SSI state

Deliverables:

  • Updated coordinator.rs (+300 LOC)
  • Integration tests (200 LOC)
  • End-to-end SSI validation

Acceptance Criteria:

  • TPC-C runs with SERIALIZABLE isolation
  • No serialization anomalies detected
  • Correct abort on dangerous structures

Week 11-12: MVCC & 2PC Integration

Owner: Engineer 1 & 2

Tasks:

  • Integrate SSI with existing MVCC snapshot isolation
  • Add SIREAD lock support to range scans
  • Integrate with 2PC coordinator
  • Test distributed SSI scenarios
  • Write distributed conflict detection tests

Deliverables:

  • Updated mvcc.rs (+200 LOC)
  • Updated coordinator.rs (2PC) (+150 LOC)
  • Distributed SSI tests (150 LOC)

Acceptance Criteria:

  • Snapshot isolation + SSI work together
  • Distributed transactions validated correctly
  • No conflicts missed across nodes

Week 13: Testing & Validation

Owner: Engineer 3 + QA

Tasks:

  • Run TPC-C with SERIALIZABLE isolation
  • Run PostgreSQL regression tests
  • Write anomaly detection tests
  • Performance regression testing
  • Chaos engineering (random aborts, network failures)

Deliverables:

  • TPC-C results (target: >5,000 tpmC)
  • PostgreSQL compatibility report
  • Anomaly test suite (50+ tests)
  • Performance benchmark results

Acceptance Criteria:

  • Zero serialization anomalies detected
  • TPC-C throughput >= 5,000 tpmC
  • PostgreSQL regression tests: 90%+ pass

Week 14: Production Hardening

Owner: All engineers

Tasks:

  • Memory leak detection (Valgrind, ASAN)
  • Stress testing (10K+ concurrent transactions)
  • Monitoring & metrics integration
  • Error handling hardening
  • Documentation finalization

Deliverables:

  • Production deployment guide
  • Monitoring dashboard
  • Operations runbook
  • Complete API documentation

Acceptance Criteria:

  • No memory leaks under 24hr load test
  • All error paths tested
  • Metrics exported to Prometheus
  • Documentation complete

Week 15: Beta Deployment & Feedback

Owner: All engineers + DevOps

Tasks:

  • Deploy to beta environment
  • Run production-like workloads
  • Collect performance metrics
  • Fix critical bugs
  • Create rollback plan

Deliverables:

  • Beta deployment report
  • Performance analysis
  • Bug fix patches
  • Rollback procedure

Acceptance Criteria:

  • Beta environment stable for 1 week
  • No critical bugs found
  • Performance meets targets

7. Testing Strategy

7.1 Unit Tests

Module-level tests:

#[cfg(test)]
mod ssi_lock_manager_tests {
    use super::*;

    #[test]
    fn test_acquire_siread_lock() {
        let lock_manager = SireadLockManager::new();
        let key = Bytes::from("test_key");

        // Acquire lock
        lock_manager.acquire_siread_lock(1, &key).unwrap();

        // Verify lock registered
        let locks = lock_manager.get_locks_on_key(&key);
        assert_eq!(locks, vec![1]);
    }

    #[test]
    fn test_concurrent_siread_lock_acquisition() {
        let lock_manager = Arc::new(SireadLockManager::new());
        let key = Bytes::from("test_key");

        // Spawn 100 threads acquiring locks
        let handles: Vec<_> = (0..100)
            .map(|i| {
                let lm = Arc::clone(&lock_manager);
                let k = key.clone();
                std::thread::spawn(move || {
                    lm.acquire_siread_lock(i, &k).unwrap();
                })
            })
            .collect();

        // Wait for all threads
        for handle in handles {
            handle.join().unwrap();
        }

        // Verify all 100 locks acquired
        let locks = lock_manager.get_locks_on_key(&key);
        assert_eq!(locks.len(), 100);
    }

    #[test]
    fn test_predicate_lock_overlap_detection() {
        let lock_manager = SireadLockManager::new();

        // Acquire predicate lock on range [10, 20)
        let predicate1 = Predicate::Range {
            start_key: Bytes::from("10"),
            end_key: Bytes::from("20"),
        };
        lock_manager.acquire_predicate_lock(1, predicate1).unwrap();

        // Key 15 overlaps the range
        let key = Bytes::from("15");
        let locks = lock_manager.get_predicate_locks_on_key(&key);
        assert_eq!(locks, vec![1]);

        // Key 25 does NOT overlap
        let key = Bytes::from("25");
        let locks = lock_manager.get_predicate_locks_on_key(&key);
        assert_eq!(locks, Vec::<TransactionId>::new());
    }
}
#[cfg(test)]
mod conflict_detector_tests {
    use super::*;

    #[test]
    fn test_register_rw_conflict() {
        let lock_manager = Arc::new(SireadLockManager::new());
        let detector = ConflictDetector::new(lock_manager);

        // Register transactions
        detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
        detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();

        // Register rw-conflict: T1 --rw--> T2
        detector.register_rw_conflict(1, 2).unwrap();

        // Verify conflict recorded on both endpoints
        let conflicts = detector.get_conflicts(1);
        assert!(conflicts.out_conflict.contains(&2));
        let conflicts = detector.get_conflicts(2);
        assert!(conflicts.in_conflict.contains(&1));
    }

    #[test]
    fn test_check_write_conflicts() {
        let lock_manager = Arc::new(SireadLockManager::new());
        let detector = ConflictDetector::new(Arc::clone(&lock_manager));
        let key = Bytes::from("test_key");

        // T1 reads key (acquires SIREAD lock)
        detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
        lock_manager.acquire_siread_lock(1, &key).unwrap();

        // T2 writes key (should detect conflict with T1)
        detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
        let conflicts = detector.check_write_conflicts(2, &key).unwrap();

        // Should detect a single rw-conflict
        assert_eq!(conflicts.len(), 1);
        assert_eq!(conflicts[0].reader_txn_id, 1);
        assert_eq!(conflicts[0].writer_txn_id, 2);
    }
}
#[cfg(test)]
mod write_skew_validator_tests {
    use super::*;

    #[test]
    fn test_cycle_detection() {
        let lock_manager = Arc::new(SireadLockManager::new());
        let detector = Arc::new(ConflictDetector::new(lock_manager));
        let validator = WriteSkewValidator::new(detector.clone());

        // Create cycle: T1 --rw--> T2 --rw--> T3 --ww--> T1
        detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
        detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
        detector.register_transaction(3, 102, IsolationLevel::Serializable).unwrap();
        detector.register_rw_conflict(1, 2).unwrap(); // T1 --rw--> T2
        detector.register_rw_conflict(2, 3).unwrap(); // T2 --rw--> T3

        // Simulate T3 writing a key that T1 wrote (ww-dependency)
        detector.register_transaction_metadata(3, |meta| {
            meta.out_conflict.insert(1); // T3 --> T1
        });
        detector.register_transaction_metadata(1, |meta| {
            meta.in_conflict.insert(3); // T1 <-- T3
        });

        // Validate T1: should detect the cycle
        let has_cycle = validator.has_dangerous_structure(1).unwrap();
        assert!(has_cycle);
    }

    #[test]
    fn test_no_cycle_no_abort() {
        let lock_manager = Arc::new(SireadLockManager::new());
        let detector = Arc::new(ConflictDetector::new(lock_manager));
        let validator = WriteSkewValidator::new(detector.clone());

        // T1 --rw--> T2 (no cycle)
        detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
        detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
        detector.register_rw_conflict(1, 2).unwrap();

        // Validate T1: no cycle, transaction may commit
        let has_cycle = validator.has_dangerous_structure(1).unwrap();
        assert!(!has_cycle);
    }
}

Total Unit Tests: 100+ tests across all modules

7.2 Integration Tests

End-to-end SSI validation:

#[tokio::test]
async fn test_write_skew_prevention() {
    let coordinator = setup_coordinator().await;

    // Initialize accounts
    coordinator
        .execute("INSERT INTO accounts (id, balance) VALUES (1, 500), (2, 500)")
        .await
        .unwrap();

    // Transaction T1
    let txn1 = coordinator.begin_transaction(IsolationLevel::Serializable).await.unwrap();
    let balance1 = coordinator.read(txn1, "accounts", &Bytes::from("1")).await.unwrap();
    assert_eq!(balance1, Some(Bytes::from("500")));

    // Transaction T2 (concurrent)
    let txn2 = coordinator.begin_transaction(IsolationLevel::Serializable).await.unwrap();
    let balance2 = coordinator.read(txn2, "accounts", &Bytes::from("2")).await.unwrap();
    assert_eq!(balance2, Some(Bytes::from("500")));

    // T2 writes to account 1
    coordinator.write(txn2, "accounts", Bytes::from("1"), Bytes::from("100")).await.unwrap();
    coordinator.commit(txn2).await.unwrap(); // SUCCESS

    // T1 writes to account 2 (based on stale read of account 1)
    coordinator.write(txn1, "accounts", Bytes::from("2"), Bytes::from("100")).await.unwrap();

    // T1 commit should FAIL (serialization failure)
    let result = coordinator.commit(txn1).await;
    assert!(result.is_err());
    assert!(matches!(result.unwrap_err(), SsiError::SerializationFailure(_)));
}

#[tokio::test]
async fn test_phantom_read_prevention() {
    let coordinator = setup_coordinator().await;

    // T1: Range read
    let txn1 = coordinator.begin_transaction(IsolationLevel::Serializable).await.unwrap();
    let rows = coordinator
        .range_read(txn1, "accounts", &Bytes::from("1"), &Bytes::from("100"))
        .await
        .unwrap();
    assert_eq!(rows.len(), 50);

    // T2: Insert into range
    let txn2 = coordinator.begin_transaction(IsolationLevel::Serializable).await.unwrap();
    coordinator.write(txn2, "accounts", Bytes::from("25"), Bytes::from("new_value")).await.unwrap();
    coordinator.commit(txn2).await.unwrap();

    // T1: Read again (should still see 50 rows, not 51)
    let rows = coordinator
        .range_read(txn1, "accounts", &Bytes::from("1"), &Bytes::from("100"))
        .await
        .unwrap();
    assert_eq!(rows.len(), 50); // Snapshot isolation

    // T1: Commit should FAIL (phantom read detected)
    let result = coordinator.commit(txn1).await;
    assert!(result.is_err());
}

#[tokio::test]
async fn test_read_only_serializable_transaction() {
    let coordinator = setup_coordinator().await;

    // T1: Read-only transaction
    let txn1 = coordinator.begin_transaction(IsolationLevel::Serializable).await.unwrap();
    let balance1 = coordinator.read(txn1, "accounts", &Bytes::from("1")).await.unwrap();
    let balance2 = coordinator.read(txn1, "accounts", &Bytes::from("2")).await.unwrap();

    // T2: Write to account 1
    let txn2 = coordinator.begin_transaction(IsolationLevel::Serializable).await.unwrap();
    coordinator.write(txn2, "accounts", Bytes::from("1"), Bytes::from("600")).await.unwrap();
    coordinator.commit(txn2).await.unwrap();

    // T3: Write to account 2
    let txn3 = coordinator.begin_transaction(IsolationLevel::Serializable).await.unwrap();
    coordinator.write(txn3, "accounts", Bytes::from("2"), Bytes::from("400")).await.unwrap();
    coordinator.commit(txn3).await.unwrap();

    // T1: Commit should SUCCEED (read-only, no write skew)
    coordinator.commit(txn1).await.unwrap();
}

#[tokio::test]
async fn test_distributed_ssi_with_2pc() {
    let coordinator = setup_distributed_coordinator().await;

    // Distributed transaction across 2 nodes
    let txn = coordinator
        .begin_distributed_transaction(IsolationLevel::Serializable, vec![node1, node2])
        .await
        .unwrap();

    // Write to node1
    coordinator.write_to_node(txn, node1, "key1", "value1").await.unwrap();

    // Write to node2
    coordinator.write_to_node(txn, node2, "key2", "value2").await.unwrap();

    // Commit (2PC + SSI validation)
    coordinator.commit_distributed(txn).await.unwrap();

    // Verify both writes committed
    assert_eq!(coordinator.read_from_node(node1, "key1").await.unwrap(), "value1");
    assert_eq!(coordinator.read_from_node(node2, "key2").await.unwrap(), "value2");
}

Total Integration Tests: 50+ tests

7.3 Anomaly Detection Tests

Test for known serialization anomalies:

// Write skew
#[tokio::test]
async fn test_write_skew_anomaly();
// Lost update
#[tokio::test]
async fn test_lost_update_anomaly();
// Phantom read
#[tokio::test]
async fn test_phantom_read_anomaly();
// Read-only anomaly
#[tokio::test]
async fn test_read_only_anomaly();
// Serialization graph cycle
#[tokio::test]
async fn test_serialization_graph_cycle();

Total Anomaly Tests: 20+ tests

7.4 Performance Tests

Benchmarks:

#[bench]
fn bench_siread_lock_acquisition(b: &mut Bencher) {
    let lock_manager = SireadLockManager::new();
    let key = Bytes::from("test_key");
    b.iter(|| {
        lock_manager.acquire_siread_lock(1, &key).unwrap();
    });
}

#[bench]
fn bench_conflict_detection(b: &mut Bencher) {
    let detector = setup_conflict_detector();
    let key = Bytes::from("test_key");
    b.iter(|| {
        detector.check_write_conflicts(1, &key).unwrap();
    });
}

#[bench]
fn bench_cycle_detection(b: &mut Bencher) {
    let validator = setup_validator_with_graph();
    b.iter(|| {
        validator.has_dangerous_structure(1).unwrap();
    });
}

#[bench]
fn bench_serializable_transaction_commit(b: &mut Bencher) {
    let coordinator = setup_coordinator();
    b.iter(|| {
        let txn = coordinator.begin_transaction(IsolationLevel::Serializable).unwrap();
        coordinator.write(txn, "key", "value").unwrap();
        coordinator.commit(txn).unwrap();
    });
}

Target Performance:

  • SIREAD lock acquisition: <100ns (p99)
  • Conflict detection: <500ns (p99)
  • Cycle detection: <1ms (p99)
  • Serializable commit: <10ms (p99)

8. Performance Targets

8.1 Throughput Targets

TPC-C Benchmark:

  • Current (Snapshot Isolation): ~2,000 tpmC
  • Target (SSI): >5,000 tpmC
  • Acceptable: >4,000 tpmC (2x current)

Overhead Analysis:

  • SIREAD lock overhead: <5%
  • Conflict detection overhead: <3%
  • Cycle detection overhead: <2%
  • Total SSI overhead: <10%

8.2 Latency Targets

Operation Latencies (p99):

  • BEGIN TRANSACTION: <100us
  • READ (with SIREAD lock): <500us
  • WRITE (with conflict check): <1ms
  • COMMIT (with validation): <10ms
  • ABORT: <1ms

Validation Latency:

  • No conflicts: <100us
  • With conflicts (no cycle): <500us
  • Cycle detection: <1ms

8.3 Scalability Targets

Concurrent Transactions:

  • Support: 10,000+ concurrent transactions
  • Memory: <1MB per 1,000 transactions
  • CPU: Linear scaling up to 64 cores

Conflict Graph Size:

  • Nodes: 10,000+ active transactions
  • Edges: 100,000+ rw-dependencies
  • Cycle detection: O(V + E) with early termination
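The O(V + E) bound corresponds to a depth-first search over out-conflict edges that visits each transaction at most once and terminates as soon as it loops back to the transaction being validated. A self-contained sketch over a plain adjacency map (the graph representation and names are illustrative):

```rust
use std::collections::{HashMap, HashSet};

type TransactionId = u64;

/// DFS over rw-dependency out-edges, returning the cycle through `start`
/// (if any) as a path of transaction ids. Each node and edge is visited
/// at most once, so the cost is O(V + E), with early termination as soon
/// as the search reaches `start` again.
fn detect_cycle(
    graph: &HashMap<TransactionId, HashSet<TransactionId>>,
    start: TransactionId,
) -> Option<Vec<TransactionId>> {
    let mut visited = HashSet::new();
    let mut path = vec![start];
    dfs(graph, start, start, &mut visited, &mut path).then_some(path)
}

fn dfs(
    graph: &HashMap<TransactionId, HashSet<TransactionId>>,
    start: TransactionId,
    current: TransactionId,
    visited: &mut HashSet<TransactionId>,
    path: &mut Vec<TransactionId>,
) -> bool {
    for &next in graph.get(&current).into_iter().flatten() {
        if next == start {
            return true; // closed the loop back to the validating txn
        }
        if visited.insert(next) {
            path.push(next);
            if dfs(graph, start, next, visited, path) {
                return true;
            }
            path.pop();
        }
    }
    false
}

fn main() {
    // T1 --rw--> T2 --rw--> T3 --ww--> T1: a dangerous structure.
    let mut g: HashMap<TransactionId, HashSet<TransactionId>> = HashMap::new();
    g.entry(1).or_default().insert(2);
    g.entry(2).or_default().insert(3);
    g.entry(3).or_default().insert(1);
    assert_eq!(detect_cycle(&g, 1), Some(vec![1, 2, 3]));
    // Remove the back-edge: no cycle, validation passes.
    g.get_mut(&3).unwrap().remove(&1);
    assert_eq!(detect_cycle(&g, 1), None);
}
```

The returned path feeds directly into victim selection, which must pick one member of the cycle to abort.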

8.4 Comparison with PostgreSQL SSI

PostgreSQL 16 SSI Performance:

  • TPC-C: ~8,000 tpmC (single node)
  • Validation overhead: <15%
  • False positive abort rate: <5%

HeliosDB SSI Target:

  • TPC-C: >5,000 tpmC (~63% of PostgreSQL)
  • Validation overhead: <10% (better than PostgreSQL)
  • False positive abort rate: <3% (better than PostgreSQL)

Rationale: HeliosDB uses more aggressive optimizations (lock-free SIREAD locks, bloom filters for hot keys).
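The "bloom filters for hot keys" optimization amounts to a fast negative check before touching the lock table: if the filter reports that no active serializable transaction has read a key, the writer skips the conflict scan entirely; a false positive just falls through to the exact lock-table check, so correctness is preserved. A minimal std-only sketch (filter sizing, hashing scheme, and names are illustrative assumptions):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Minimal bloom filter over keys read under SERIALIZABLE. A negative
/// answer is exact ("no SIREAD lock can exist, skip the scan"); a
/// positive answer may be a false positive and falls through to the
/// precise lock-table check.
struct ReadKeyFilter {
    bits: Vec<bool>,
}

impl ReadKeyFilter {
    fn new(nbits: usize) -> Self {
        ReadKeyFilter { bits: vec![false; nbits] }
    }

    /// Two hash slots per key, derived by seeding one hasher differently.
    fn slots(&self, key: &[u8]) -> [usize; 2] {
        [0u64, 1u64].map(|seed| {
            let mut h = DefaultHasher::new();
            seed.hash(&mut h);
            key.hash(&mut h);
            (h.finish() as usize) % self.bits.len()
        })
    }

    fn record_read(&mut self, key: &[u8]) {
        for slot in self.slots(key) {
            self.bits[slot] = true;
        }
    }

    /// `false` means: definitely no reader, conflict detection can be skipped.
    fn maybe_read(&self, key: &[u8]) -> bool {
        self.slots(key).iter().all(|&slot| self.bits[slot])
    }
}

fn main() {
    let mut filter = ReadKeyFilter::new(1024);
    filter.record_read(b"hot_key");
    assert!(filter.maybe_read(b"hot_key")); // recorded reads always hit
    // Unrecorded keys usually miss (modulo rare false positives).
}
```

A production variant would use atomic bit words and periodic rebuilds as transactions retire, since a plain bloom filter does not support deletion.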


9. Appendix

9.1 References

Academic Papers:

Implementation References:

9.2 Glossary

  • SSI: Serializable Snapshot Isolation
  • SIREAD lock: Serializable Read lock (lightweight, non-blocking)
  • rw-antidependency: Read-write antidependency (T1 reads X; T2 writes a newer version of X that T1 did not see, forcing T1 before T2 in any serial order)
  • ww-dependency: Write-write dependency (T1 writes X, T2 writes X)
  • Dangerous structure: Cycle in dependency graph containing rw-antidependencies
  • Write skew: Serialization anomaly where two transactions read overlapping sets and write disjoint sets
  • Predicate lock: Lock on a range of keys (for phantom read prevention)

9.3 Code Structure

Final directory structure:

heliosdb-ssi/
├── Cargo.toml
├── src/
│   ├── lib.rs
│   ├── ssi_lock_manager.rs      (1,200 LOC)
│   ├── conflict_detector.rs     (800 LOC)
│   ├── write_skew_validator.rs  (600 LOC)
│   ├── ssi_types.rs             (300 LOC)
│   └── error.rs                 (100 LOC)
├── tests/
│   ├── unit/
│   │   ├── ssi_lock_manager_tests.rs
│   │   ├── conflict_detector_tests.rs
│   │   └── write_skew_validator_tests.rs
│   ├── integration/
│   │   ├── write_skew_tests.rs
│   │   ├── phantom_read_tests.rs
│   │   └── distributed_ssi_tests.rs
│   └── benchmarks/
│       ├── lock_benchmarks.rs
│       ├── conflict_benchmarks.rs
│       └── validation_benchmarks.rs
└── benches/
    └── ssi_benchmarks.rs

Total LOC: ~4,500 LOC (implementation) + 1,500 LOC (tests) = 6,000 LOC


Document Control

Version History:

  • v1.0 (2025-11-28): Initial architecture specification

Approval:

  • Architecture Review: PENDING
  • Engineering Lead: PENDING
  • Product Owner: PENDING

Next Steps:

  1. Architecture review with 3-engineer team (Week 0)
  2. Begin Week 1 implementation (foundation & design)
  3. Weekly progress reviews
  4. Mid-point review (Week 8)
  5. Final acceptance testing (Week 13-15)

END OF DOCUMENT