
SSI Implementation - Rust Module Templates


Document Version: 1.0
Created: November 28, 2025
Status: READY FOR IMPLEMENTATION
Purpose: Concrete Rust code templates for immediate implementation


Overview

This document provides implementable Rust code templates for the four SSI modules, plus the crate root (lib.rs) and Cargo.toml. Engineers can copy these templates and fill in the implementation logic by following the TODOs.

Module Structure:

heliosdb-ssi/
├── Cargo.toml
└── src/
    ├── lib.rs                   (Module 5)
    ├── ssi_lock_manager.rs      (Module 1)
    ├── conflict_detector.rs     (Module 2)
    ├── write_skew_validator.rs  (Module 3)
    └── ssi_types.rs             (Module 4)

Module 1: ssi_lock_manager.rs (1,200 LOC)

//! SIREAD Lock Manager for Serializable Snapshot Isolation
//!
//! This module manages SIREAD locks (Serializable Read locks) that are used
//! to detect read-write conflicts in serializable transactions.
//!
//! Key features:
//! - Non-blocking SIREAD lock acquisition
//! - Predicate locks for range queries (phantom read prevention)
//! - Efficient lock lookup and conflict detection
//! - Lock-free concurrent operations using DashMap
use bytes::Bytes;
use dashmap::DashMap;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use crate::ssi_types::{TransactionId, Timestamp, PredicateLockId, Result, SsiError};
// ============================================================================
// Data Structures
// ============================================================================
/// Predicate for range-based SIREAD locks
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Predicate {
/// Range lock: start_key <= key < end_key
Range {
start_key: Bytes,
end_key: Bytes,
},
/// Table scan: all keys in table
TableScan {
table_name: String,
},
/// Index scan with condition
IndexScan {
index_name: String,
condition: String,
},
}
impl Predicate {
/// Check if a key falls within this predicate
pub fn matches_key(&self, key: &Bytes) -> bool {
match self {
Predicate::Range { start_key, end_key } => {
key >= start_key && key < end_key
}
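Predicate::TableScan { .. } => {
// TODO: Extract the table name from the key encoding
// Until then, match all keys (conservative: may report extra conflicts, never misses one)
true
}
Predicate::IndexScan { .. } => {
// TODO: Evaluate `condition` against the key
// Must stay conservative (true) until then: returning false would hide
// rw-conflicts and let non-serializable schedules commit
true
}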
}
}
}
/// Predicate lock metadata
#[derive(Debug, Clone)]
pub struct PredicateLock {
pub id: PredicateLockId,
pub txn_id: TransactionId,
pub predicate: Predicate,
pub created_at: Timestamp,
}
/// SIREAD lock manager metrics
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SireadMetrics {
pub total_locks_acquired: u64,
pub total_locks_released: u64,
pub active_key_locks: usize,
pub active_predicate_locks: usize,
pub conflicts_detected: u64,
}
// ============================================================================
// SIREAD Lock Manager
// ============================================================================
/// Manages SIREAD locks for SSI conflict detection
pub struct SireadLockManager {
/// Map: Key -> Set of transactions holding SIREAD locks
/// Uses DashMap for lock-free concurrent access
key_locks: Arc<DashMap<Bytes, HashSet<TransactionId>>>,
/// Map: Transaction -> Set of keys it has SIREAD locked
txn_locks: Arc<DashMap<TransactionId, HashSet<Bytes>>>,
/// Predicate locks (protected by RwLock for now)
/// TODO: Optimize with interval tree for O(log n) overlap detection
predicate_locks: Arc<RwLock<Vec<PredicateLock>>>,
/// Next predicate lock ID
next_predicate_id: AtomicU64,
/// Metrics
metrics: Arc<RwLock<SireadMetrics>>,
}
impl SireadLockManager {
/// Create new SIREAD lock manager
pub fn new() -> Self {
Self {
key_locks: Arc::new(DashMap::new()),
txn_locks: Arc::new(DashMap::new()),
predicate_locks: Arc::new(RwLock::new(Vec::new())),
next_predicate_id: AtomicU64::new(1),
metrics: Arc::new(RwLock::new(SireadMetrics::default())),
}
}
/// Acquire SIREAD lock on a specific key
///
/// SIREAD locks are non-blocking and used only for conflict detection.
/// Multiple transactions can hold SIREAD locks on the same key.
pub fn acquire_siread_lock(&self, txn_id: TransactionId, key: &Bytes) -> Result<()> {
// 1. Add to key_locks: key -> {txn_id}; `insert` returns false if already held
let newly_acquired = self
.key_locks
.entry(key.clone())
.or_insert_with(HashSet::new)
.insert(txn_id);
// 2. Add to txn_locks: txn_id -> {key}
self.txn_locks
.entry(txn_id)
.or_insert_with(HashSet::new)
.insert(key.clone());
// 3. Update metrics (re-reading the same key is not counted twice)
let mut metrics = self.metrics.write();
if newly_acquired {
metrics.total_locks_acquired += 1;
}
metrics.active_key_locks = self.key_locks.len();
Ok(())
}
/// Acquire predicate lock for range query
///
/// Used to prevent phantom reads in SERIALIZABLE transactions.
pub fn acquire_predicate_lock(
&self,
txn_id: TransactionId,
predicate: Predicate,
) -> Result<PredicateLockId> {
let predicate_id = self.next_predicate_id.fetch_add(1, Ordering::SeqCst);
let lock = PredicateLock {
id: predicate_id,
txn_id,
predicate,
created_at: Self::current_timestamp(),
};
// Add to predicate_locks
self.predicate_locks.write().push(lock);
// Update metrics
let mut metrics = self.metrics.write();
metrics.total_locks_acquired += 1;
metrics.active_predicate_locks = self.predicate_locks.read().len();
Ok(predicate_id)
}
/// Get all transactions with SIREAD locks on a key
///
/// Used by conflict detector to find rw-antidependencies.
pub fn get_locks_on_key(&self, key: &Bytes) -> Vec<TransactionId> {
self.key_locks
.get(key)
.map(|entry| entry.value().iter().copied().collect())
.unwrap_or_default()
}
/// Get all transactions with predicate locks overlapping a key
///
/// Checks if any predicate lock matches the given key.
pub fn get_predicate_locks_on_key(&self, key: &Bytes) -> Vec<TransactionId> {
let predicate_locks = self.predicate_locks.read();
predicate_locks
.iter()
.filter(|lock| lock.predicate.matches_key(key))
.map(|lock| lock.txn_id)
.collect()
}
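/// Release all SIREAD locks for a transaction
///
/// Call this when the transaction aborts. After a commit, SSI must keep the
/// transaction's SIREAD locks until every transaction that overlapped it has
/// finished; releasing them earlier hides rw-conflicts against the committed reader.
pub fn release_locks(&self, txn_id: TransactionId) -> Result<()> {
let mut released: u64 = 0;
// 1. Take ownership of all keys locked by this transaction
if let Some((_, keys)) = self.txn_locks.remove(&txn_id) {
released += keys.len() as u64;
for key in keys {
// 2. Remove transaction from the key's lock set
if let Some(mut txn_set) = self.key_locks.get_mut(&key) {
txn_set.remove(&txn_id);
}
// 3. Clean up empty entries; remove_if checks and removes under one shard
// lock, so a concurrent acquire on the same key cannot be lost
self.key_locks.remove_if(&key, |_, txn_set| txn_set.is_empty());
}
}
// 4. Remove predicate locks for this transaction
{
let mut predicate_locks = self.predicate_locks.write();
let before = predicate_locks.len();
predicate_locks.retain(|lock| lock.txn_id != txn_id);
released += (before - predicate_locks.len()) as u64;
}
// 5. Update metrics (counts individual locks, matching total_locks_acquired)
let mut metrics = self.metrics.write();
metrics.total_locks_released += released;
metrics.active_key_locks = self.key_locks.len();
metrics.active_predicate_locks = self.predicate_locks.read().len();
Ok(())
}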
/// Get current metrics
pub fn get_metrics(&self) -> SireadMetrics {
self.metrics.read().clone()
}
/// Get current timestamp
fn current_timestamp() -> Timestamp {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_micros() as Timestamp
}
}
impl Default for SireadLockManager {
fn default() -> Self {
Self::new()
}
}
// ============================================================================
// Tests
// ============================================================================
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_acquire_siread_lock() {
let lock_manager = SireadLockManager::new();
let key = Bytes::from("test_key");
// Acquire lock
lock_manager.acquire_siread_lock(1, &key).unwrap();
// Verify lock registered
let locks = lock_manager.get_locks_on_key(&key);
assert_eq!(locks, vec![1]);
}
#[test]
fn test_multiple_transactions_same_key() {
let lock_manager = SireadLockManager::new();
let key = Bytes::from("test_key");
// Multiple transactions can hold SIREAD locks
lock_manager.acquire_siread_lock(1, &key).unwrap();
lock_manager.acquire_siread_lock(2, &key).unwrap();
lock_manager.acquire_siread_lock(3, &key).unwrap();
let mut locks = lock_manager.get_locks_on_key(&key);
locks.sort();
assert_eq!(locks, vec![1, 2, 3]);
}
#[test]
fn test_release_locks() {
let lock_manager = SireadLockManager::new();
let key1 = Bytes::from("key1");
let key2 = Bytes::from("key2");
// Acquire locks
lock_manager.acquire_siread_lock(1, &key1).unwrap();
lock_manager.acquire_siread_lock(1, &key2).unwrap();
// Release all locks for txn 1
lock_manager.release_locks(1).unwrap();
// Verify locks removed
assert!(lock_manager.get_locks_on_key(&key1).is_empty());
assert!(lock_manager.get_locks_on_key(&key2).is_empty());
}
#[test]
fn test_predicate_lock_range() {
let lock_manager = SireadLockManager::new();
let predicate = Predicate::Range {
start_key: Bytes::from("10"),
end_key: Bytes::from("20"),
};
lock_manager.acquire_predicate_lock(1, predicate).unwrap();
// Key 15 is in range
let locks = lock_manager.get_predicate_locks_on_key(&Bytes::from("15"));
assert_eq!(locks, vec![1]);
// Key 25 is NOT in range
let locks = lock_manager.get_predicate_locks_on_key(&Bytes::from("25"));
assert!(locks.is_empty());
}
#[test]
fn test_concurrent_lock_acquisition() {
use std::sync::Arc;
let lock_manager = Arc::new(SireadLockManager::new());
let key = Bytes::from("test_key");
// Spawn 100 threads acquiring locks
let handles: Vec<_> = (0..100)
.map(|i| {
let lm = Arc::clone(&lock_manager);
let k = key.clone();
std::thread::spawn(move || {
lm.acquire_siread_lock(i, &k).unwrap();
})
})
.collect();
// Wait for all threads
for handle in handles {
handle.join().unwrap();
}
// Verify all 100 locks acquired
let locks = lock_manager.get_locks_on_key(&key);
assert_eq!(locks.len(), 100);
}
// TODO: Add more tests
// - Predicate lock overlap detection
// - Metrics tracking
// - Edge cases (empty keys, large transaction sets)
}
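`get_predicate_locks_on_key` scans the `predicate_locks` vector linearly, and its TODO suggests an interval tree. A smaller first step is sketched below under stated assumptions: a std-only `RangeLockIndex` (a hypothetical name, not part of the template) that keys half-open `Range` predicates by `start_key` in a `BTreeMap`, so a point lookup visits only ranges that start at or before the key. Keys are `Vec<u8>` rather than `Bytes` to keep the block dependency-free, and synchronization is left to the caller.

```rust
use std::collections::BTreeMap;

/// Transaction identifier (u64, as in `ssi_types.rs`).
type TxnId = u64;

/// Hypothetical range-lock index: ranges `[start, end)` grouped by their start key.
#[derive(Default)]
struct RangeLockIndex {
    by_start: BTreeMap<Vec<u8>, Vec<(Vec<u8>, TxnId)>>,
}

impl RangeLockIndex {
    /// Record that `txn` holds a range lock on `[start, end)`.
    fn insert(&mut self, start: &[u8], end: &[u8], txn: TxnId) {
        self.by_start
            .entry(start.to_vec())
            .or_default()
            .push((end.to_vec(), txn));
    }

    /// Return every transaction whose range contains `key`.
    /// Only ranges with `start <= key` are visited; each is then checked for `key < end`.
    fn readers_of(&self, key: &[u8]) -> Vec<TxnId> {
        let mut out = Vec::new();
        for (_start, ranges) in self.by_start.range(..=key.to_vec()) {
            for (end, txn) in ranges {
                if key < end.as_slice() {
                    out.push(*txn);
                }
            }
        }
        out
    }

    /// Drop all ranges held by `txn` and remove start keys left with no ranges.
    fn remove_txn(&mut self, txn: TxnId) {
        for ranges in self.by_start.values_mut() {
            ranges.retain(|(_, t)| *t != txn);
        }
        self.by_start.retain(|_, ranges| !ranges.is_empty());
    }
}
```

This prunes ranges that start after the key but still checks every earlier one, so the worst case stays linear. An augmented interval tree (each node storing the maximum `end_key` in its subtree) is what provides the O(log n + k) lookup the TODO has in mind.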

Module 2: conflict_detector.rs (800 LOC)

//! Conflict Detector for SSI
//!
//! Detects and tracks rw-antidependencies (read-write conflicts) between
//! serializable transactions.
use bytes::Bytes;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use crate::ssi_lock_manager::SireadLockManager;
use crate::ssi_types::{TransactionId, Timestamp, IsolationLevel, TransactionState, Result, SsiError};
// ============================================================================
// Data Structures
// ============================================================================
/// Transaction metadata for conflict tracking
#[derive(Debug, Clone)]
pub struct TransactionMetadata {
pub txn_id: TransactionId,
pub snapshot_ts: Timestamp,
pub isolation_level: IsolationLevel,
pub state: TransactionState,
/// Transactions that read a key we wrote without seeing our write
/// (edges reader --rw--> us). This is the "in" part of the conflict graph
pub in_conflict: HashSet<TransactionId>,
/// Transactions that wrote a key we read without our seeing their write
/// (edges us --rw--> writer). This is the "out" part of the conflict graph
pub out_conflict: HashSet<TransactionId>,
/// Timestamp when transaction was created
pub created_at: Timestamp,
}
/// Conflict between two transactions
#[derive(Debug, Clone)]
pub struct Conflict {
pub reader_txn_id: TransactionId,
pub writer_txn_id: TransactionId,
pub key: Bytes,
pub conflict_type: ConflictType,
pub detected_at: Timestamp,
}
/// Type of conflict
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictType {
ReadWrite, // rw-antidependency
WriteWrite, // ww-dependency
}
/// Conflict information for a transaction
#[derive(Debug, Clone)]
pub struct ConflictInfo {
pub in_conflict: HashSet<TransactionId>,
pub out_conflict: HashSet<TransactionId>,
}
/// Conflict detector metrics
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ConflictMetrics {
pub total_rw_conflicts: u64,
pub total_ww_conflicts: u64,
pub active_transactions: usize,
pub avg_conflicts_per_txn: f64,
}
// ============================================================================
// Conflict Detector
// ============================================================================
/// Detects and tracks conflicts between serializable transactions
pub struct ConflictDetector {
/// All active transactions
/// Fine-grained locking: Lock per transaction, not global
transactions: Arc<RwLock<HashMap<TransactionId, Arc<RwLock<TransactionMetadata>>>>>,
/// SIREAD lock manager (dependency)
siread_lock_manager: Arc<SireadLockManager>,
/// Metrics
metrics: Arc<RwLock<ConflictMetrics>>,
}
impl ConflictDetector {
/// Create new conflict detector
pub fn new(siread_lock_manager: Arc<SireadLockManager>) -> Self {
Self {
transactions: Arc::new(RwLock::new(HashMap::new())),
siread_lock_manager,
metrics: Arc::new(RwLock::new(ConflictMetrics::default())),
}
}
/// Register a new transaction
pub fn register_transaction(
&self,
txn_id: TransactionId,
snapshot_ts: Timestamp,
isolation_level: IsolationLevel,
) -> Result<()> {
let metadata = TransactionMetadata {
txn_id,
snapshot_ts,
isolation_level,
state: TransactionState::Active,
in_conflict: HashSet::new(),
out_conflict: HashSet::new(),
created_at: Self::current_timestamp(),
};
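// Insert and read the new count under one lock, then release it before
// touching metrics. remove_transaction takes transactions -> metrics, so
// taking them in the opposite order here could deadlock.
let active = {
let mut transactions = self.transactions.write();
transactions.insert(txn_id, Arc::new(RwLock::new(metadata)));
transactions.len()
};
// Update metrics
self.metrics.write().active_transactions = active;
Ok(())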
}
/// Check for SIREAD conflicts when writing a key
///
/// Returns the rw-conflicts detected (reader_txn --rw--> writer_txn).
/// They are only reported; the caller must record each one with `register_rw_conflict`.
pub fn check_write_conflicts(
&self,
writer_txn_id: TransactionId,
key: &Bytes,
) -> Result<Vec<Conflict>> {
let mut conflicts = Vec::new();
// 1. Get all transactions with SIREAD locks on this key
let reader_txns = self.siread_lock_manager.get_locks_on_key(key);
// 2. Get all transactions with predicate locks overlapping this key
let predicate_reader_txns = self.siread_lock_manager.get_predicate_locks_on_key(key);
// 3. Combine both types of locks
let all_readers: HashSet<TransactionId> = reader_txns
.into_iter()
.chain(predicate_reader_txns)
.collect();
// 4. For each reader, register rw-conflict
for reader_txn_id in all_readers {
if reader_txn_id != writer_txn_id {
conflicts.push(Conflict {
reader_txn_id,
writer_txn_id,
key: key.clone(),
conflict_type: ConflictType::ReadWrite,
detected_at: Self::current_timestamp(),
});
}
}
Ok(conflicts)
}
/// Register a rw-antidependency between transactions
///
/// reader_txn --rw--> writer_txn
pub fn register_rw_conflict(
&self,
reader_txn_id: TransactionId,
writer_txn_id: TransactionId,
) -> Result<()> {
let transactions = self.transactions.read();
// 1. Update reader's out_conflict
if let Some(reader_meta) = transactions.get(&reader_txn_id) {
reader_meta.write().out_conflict.insert(writer_txn_id);
}
// 2. Update writer's in_conflict
if let Some(writer_meta) = transactions.get(&writer_txn_id) {
writer_meta.write().in_conflict.insert(reader_txn_id);
}
// 3. Update metrics
let mut metrics = self.metrics.write();
metrics.total_rw_conflicts += 1;
Ok(())
}
/// Get conflict info for a transaction
pub fn get_conflicts(&self, txn_id: TransactionId) -> ConflictInfo {
let transactions = self.transactions.read();
if let Some(meta) = transactions.get(&txn_id) {
let meta_read = meta.read();
ConflictInfo {
in_conflict: meta_read.in_conflict.clone(),
out_conflict: meta_read.out_conflict.clone(),
}
} else {
ConflictInfo {
in_conflict: HashSet::new(),
out_conflict: HashSet::new(),
}
}
}
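/// Remove transaction from conflict graph
///
/// Call this when the transaction aborts. A committed transaction's edges are
/// still needed for cycle detection until every transaction that overlapped it
/// has finished, so remove it only after that point.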
pub fn remove_transaction(&self, txn_id: TransactionId) -> Result<()> {
let mut transactions = self.transactions.write();
// 1. Remove from transactions map
if let Some(meta) = transactions.remove(&txn_id) {
let meta_read = meta.read();
// 2. Remove from other transactions' conflict sets
for other_txn_id in &meta_read.in_conflict {
if let Some(other_meta) = transactions.get(other_txn_id) {
other_meta.write().out_conflict.remove(&txn_id);
}
}
for other_txn_id in &meta_read.out_conflict {
if let Some(other_meta) = transactions.get(other_txn_id) {
other_meta.write().in_conflict.remove(&txn_id);
}
}
}
// 3. Update metrics
let mut metrics = self.metrics.write();
metrics.active_transactions = transactions.len();
Ok(())
}
/// Get current metrics
pub fn get_metrics(&self) -> ConflictMetrics {
let mut metrics = self.metrics.read().clone();
// Calculate average conflicts per transaction
let transactions = self.transactions.read();
if !transactions.is_empty() {
let total_conflicts: usize = transactions
.values()
.map(|meta| meta.read().out_conflict.len())
.sum();
metrics.avg_conflicts_per_txn = total_conflicts as f64 / transactions.len() as f64;
}
metrics
}
/// Get current timestamp
fn current_timestamp() -> Timestamp {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_micros() as Timestamp
}
/// Get transaction metadata (internal use)
pub(crate) fn get_transaction_metadata(&self, txn_id: TransactionId) -> Option<Arc<RwLock<TransactionMetadata>>> {
self.transactions.read().get(&txn_id).cloned()
}
}
// ============================================================================
// Tests
// ============================================================================
#[cfg(test)]
mod tests {
use super::*;
fn setup_detector() -> (Arc<SireadLockManager>, ConflictDetector) {
let lock_manager = Arc::new(SireadLockManager::new());
let detector = ConflictDetector::new(Arc::clone(&lock_manager));
(lock_manager, detector)
}
#[test]
fn test_register_transaction() {
let (_, detector) = setup_detector();
detector
.register_transaction(1, 100, IsolationLevel::Serializable)
.unwrap();
let conflicts = detector.get_conflicts(1);
assert!(conflicts.in_conflict.is_empty());
assert!(conflicts.out_conflict.is_empty());
}
#[test]
fn test_register_rw_conflict() {
let (_, detector) = setup_detector();
// Register transactions
detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
// Register conflict: T1 --rw--> T2
detector.register_rw_conflict(1, 2).unwrap();
// Verify T1's out_conflict contains T2
let t1_conflicts = detector.get_conflicts(1);
assert!(t1_conflicts.out_conflict.contains(&2));
// Verify T2's in_conflict contains T1
let t2_conflicts = detector.get_conflicts(2);
assert!(t2_conflicts.in_conflict.contains(&1));
}
#[test]
fn test_check_write_conflicts() {
let (lock_manager, detector) = setup_detector();
let key = Bytes::from("test_key");
// T1 reads key (acquires SIREAD lock)
detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
lock_manager.acquire_siread_lock(1, &key).unwrap();
// T2 writes key
detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
let conflicts = detector.check_write_conflicts(2, &key).unwrap();
// Should detect rw-conflict
assert_eq!(conflicts.len(), 1);
assert_eq!(conflicts[0].reader_txn_id, 1);
assert_eq!(conflicts[0].writer_txn_id, 2);
assert_eq!(conflicts[0].conflict_type, ConflictType::ReadWrite);
}
#[test]
fn test_remove_transaction() {
let (_, detector) = setup_detector();
detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
detector.register_rw_conflict(1, 2).unwrap();
// Remove T1
detector.remove_transaction(1).unwrap();
// T2's in_conflict should be empty
let t2_conflicts = detector.get_conflicts(2);
assert!(t2_conflicts.in_conflict.is_empty());
}
// TODO: Add more tests
// - Multiple conflicts
// - Concurrent conflict registration
// - Metrics tracking
}
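The edge direction is easy to get backwards, so here is a minimal std-only sketch of the bookkeeping `register_rw_conflict` performs: for `reader --rw--> writer`, the writer goes into the reader's `out_conflict` and the reader into the writer's `in_conflict`. `ConflictGraph`, `add_rw`, and `is_pivot` are hypothetical names for illustration only. `is_pivot` is the same test as the fast path in `has_dangerous_structure`; it rests on the result from Fekete et al. that SSI builds on, namely that every non-serializable execution under snapshot isolation contains a transaction with both an incoming and an outgoing rw-edge.

```rust
use std::collections::{HashMap, HashSet};

/// Transaction identifier (u64, as in `ssi_types.rs`).
type TxnId = u64;

/// Rw-antidependency edges of one transaction, mirroring `TransactionMetadata`.
#[derive(Debug, Default)]
struct Edges {
    /// Readers of keys this transaction wrote: edges `reader --rw--> self`.
    in_conflict: HashSet<TxnId>,
    /// Writers of keys this transaction read: edges `self --rw--> writer`.
    out_conflict: HashSet<TxnId>,
}

/// Hypothetical, single-threaded stand-in for `ConflictDetector`'s graph.
#[derive(Default)]
struct ConflictGraph {
    txns: HashMap<TxnId, Edges>,
}

impl ConflictGraph {
    /// Record `reader --rw--> writer` on both endpoints (creating entries on demand).
    fn add_rw(&mut self, reader: TxnId, writer: TxnId) {
        self.txns.entry(reader).or_default().out_conflict.insert(writer);
        self.txns.entry(writer).or_default().in_conflict.insert(reader);
    }

    /// True when `txn` has both an incoming and an outgoing rw-edge (a "pivot").
    fn is_pivot(&self, txn: TxnId) -> bool {
        self.txns
            .get(&txn)
            .map_or(false, |e| !e.in_conflict.is_empty() && !e.out_conflict.is_empty())
    }
}
```

A chain `T1 --rw--> T2` alone makes no pivot; adding `T2 --rw--> T3` turns T2 into one, which is the structure the validator then examines more closely.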

Module 3: write_skew_validator.rs (600 LOC)

//! Write Skew Validator for SSI
//!
//! Detects dangerous structures (cycles in dependency graph) that would
//! violate serializability.
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use crate::conflict_detector::{ConflictDetector, ConflictInfo};
use crate::ssi_types::{TransactionId, Result};
// ============================================================================
// Data Structures
// ============================================================================
/// Validator metrics
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ValidatorMetrics {
pub validations_performed: u64,
pub cycles_detected: u64,
pub avg_validation_time_us: f64,
pub max_cycle_length: usize,
}
// ============================================================================
// Write Skew Validator
// ============================================================================
/// Validates transactions for dangerous structures (serialization cycles)
pub struct WriteSkewValidator {
/// Conflict detector (dependency)
conflict_detector: Arc<ConflictDetector>,
/// Metrics
metrics: Arc<RwLock<ValidatorMetrics>>,
}
impl WriteSkewValidator {
/// Create new validator
pub fn new(conflict_detector: Arc<ConflictDetector>) -> Self {
Self {
conflict_detector,
metrics: Arc::new(RwLock::new(ValidatorMetrics::default())),
}
}
/// Check if transaction is in a dangerous structure
///
/// A dangerous structure exists when:
/// 1. Transaction has both in_conflict and out_conflict (not empty)
/// 2. There exists a path from out_conflict back to in_conflict (cycle)
///
/// Returns true if cycle detected (transaction should abort).
pub fn has_dangerous_structure(&self, txn_id: TransactionId) -> Result<bool> {
let start_time = std::time::Instant::now();
// Get conflict info for this transaction
let conflicts = self.conflict_detector.get_conflicts(txn_id);
// Fast path: a transaction lacking either an incoming or an outgoing rw-edge cannot lie on a cycle
if conflicts.in_conflict.is_empty() || conflicts.out_conflict.is_empty() {
self.update_metrics(start_time, false, 0);
return Ok(false);
}
// Slow path: BFS to detect cycle
for out_txn in &conflicts.out_conflict {
if let Some(cycle) = self.detect_cycle(*out_txn, &conflicts.in_conflict) {
// The path runs out_txn .. in_txn; add 1 for this transaction (the pivot)
self.update_metrics(start_time, true, cycle.len() + 1);
return Ok(true); // Cycle found!
}
}
self.update_metrics(start_time, false, 0);
Ok(false)
}
/// Detect cycle from start transaction to any target transaction
///
/// Uses BFS to find if there's a path from start to any transaction in targets.
/// Returns Some(path) if cycle found, None otherwise.
pub fn detect_cycle(
&self,
start: TransactionId,
targets: &HashSet<TransactionId>,
) -> Option<Vec<TransactionId>> {
let mut visited = HashSet::new();
let mut queue = VecDeque::new();
let mut parent: std::collections::HashMap<TransactionId, TransactionId> =
std::collections::HashMap::new();
queue.push_back(start);
while let Some(current) = queue.pop_front() {
// Found target!
if targets.contains(&current) {
// Reconstruct path
let mut path = vec![current];
let mut node = current;
while let Some(&prev) = parent.get(&node) {
path.push(prev);
node = prev;
}
path.reverse();
return Some(path);
}
if visited.contains(&current) {
continue;
}
visited.insert(current);
// Get current transaction's out_conflict
let current_conflicts = self.conflict_detector.get_conflicts(current);
for &next in &current_conflicts.out_conflict {
if !visited.contains(&next) {
parent.insert(next, current);
queue.push_back(next);
}
}
}
None // No cycle found
}
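/// Choose victim transaction to abort when cycle detected
///
/// Simple heuristic: abort the youngest transaction (highest txn_id, assuming
/// IDs are assigned in increasing order), which has typically done the least work.
/// The caller must pass only transactions that can still abort: committed
/// members of the cycle cannot be rolled back.
///
/// # Panics
/// Panics if `cycle` is empty.
pub fn choose_victim(&self, cycle: &[TransactionId]) -> TransactionId {
// TODO: Implement smarter victim selection strategies:
// - Youngest transaction (current)
// - Transaction with least work done
// - Transaction with most conflicts
// - Random selection
*cycle
.iter()
.max()
.expect("choose_victim called with an empty cycle")
}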
/// Get current metrics
pub fn get_metrics(&self) -> ValidatorMetrics {
self.metrics.read().clone()
}
/// Update metrics after validation
fn update_metrics(&self, start_time: std::time::Instant, cycle_detected: bool, cycle_length: usize) {
let mut metrics = self.metrics.write();
metrics.validations_performed += 1;
if cycle_detected {
metrics.cycles_detected += 1;
if cycle_length > metrics.max_cycle_length {
metrics.max_cycle_length = cycle_length;
}
}
let elapsed_us = start_time.elapsed().as_micros() as f64;
metrics.avg_validation_time_us = (metrics.avg_validation_time_us
* (metrics.validations_performed - 1) as f64
+ elapsed_us)
/ metrics.validations_performed as f64;
}
}
// ============================================================================
// Tests
// ============================================================================
#[cfg(test)]
mod tests {
use super::*;
use crate::ssi_lock_manager::SireadLockManager;
use crate::ssi_types::IsolationLevel;
fn setup_validator() -> (Arc<ConflictDetector>, WriteSkewValidator) {
let lock_manager = Arc::new(SireadLockManager::new());
let detector = Arc::new(ConflictDetector::new(lock_manager));
let validator = WriteSkewValidator::new(Arc::clone(&detector));
(detector, validator)
}
#[test]
fn test_no_cycle_no_conflicts() {
let (detector, validator) = setup_validator();
detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
// No conflicts = no cycle
let has_cycle = validator.has_dangerous_structure(1).unwrap();
assert!(!has_cycle);
}
#[test]
fn test_no_cycle_linear_chain() {
let (detector, validator) = setup_validator();
// T1 --rw--> T2 (no cycle)
detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
detector.register_rw_conflict(1, 2).unwrap();
// T1 has out_conflict but no in_conflict = no cycle
let has_cycle = validator.has_dangerous_structure(1).unwrap();
assert!(!has_cycle);
}
#[test]
fn test_cycle_detection() {
let (detector, validator) = setup_validator();
// Create cycle: T1 --rw--> T2 --rw--> T3 --rw--> T1
detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
detector.register_transaction(3, 102, IsolationLevel::Serializable).unwrap();
detector.register_rw_conflict(1, 2).unwrap(); // T1 --rw--> T2
detector.register_rw_conflict(2, 3).unwrap(); // T2 --rw--> T3
detector.register_rw_conflict(3, 1).unwrap(); // T3 --rw--> T1
// All transactions are in a cycle
assert!(validator.has_dangerous_structure(1).unwrap());
assert!(validator.has_dangerous_structure(2).unwrap());
assert!(validator.has_dangerous_structure(3).unwrap());
}
#[test]
fn test_detect_cycle_bfs() {
let (detector, validator) = setup_validator();
// T1 -> T2 -> T3
detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
detector.register_transaction(3, 102, IsolationLevel::Serializable).unwrap();
detector.register_rw_conflict(1, 2).unwrap();
detector.register_rw_conflict(2, 3).unwrap();
let targets = vec![3].into_iter().collect();
let cycle = validator.detect_cycle(1, &targets);
assert!(cycle.is_some());
assert_eq!(cycle.unwrap(), vec![1, 2, 3]);
}
#[test]
fn test_victim_selection() {
let (_, validator) = setup_validator();
let cycle = vec![10, 20, 30, 15];
// Youngest transaction (highest ID)
let victim = validator.choose_victim(&cycle);
assert_eq!(victim, 30);
}
// TODO: Add more tests
// - Complex graphs with multiple cycles
// - Performance tests (large graphs)
// - Metrics validation
}
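As a worked example of what `WriteSkewValidator` is meant to catch, take the classic on-call case: two doctors each check that the other is still on call, then take themselves off. T1 reads `alice` and `bob` and writes `alice`; T2 reads both and writes `bob`. Under plain snapshot isolation both commit. The std-only sketch below assumes the two transactions are concurrent, derives rw-edges directly from their read and write sets (hypothetical helpers `TxnAccess`, `rw_edges`, `on_cycle`), and finds the two-edge cycle T1 --rw--> T2 --rw--> T1.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Transaction identifier (u64, as in `ssi_types.rs`).
type TxnId = u64;

/// Hypothetical read/write sets of one transaction, for this example only.
struct TxnAccess {
    id: TxnId,
    reads: HashSet<&'static str>,
    writes: HashSet<&'static str>,
}

/// Build rw-edges between concurrent transactions:
/// `a --rw--> b` whenever `a` read a key that `b` wrote.
fn rw_edges(txns: &[TxnAccess]) -> HashMap<TxnId, HashSet<TxnId>> {
    let mut edges: HashMap<TxnId, HashSet<TxnId>> = HashMap::new();
    for a in txns {
        for b in txns {
            if a.id != b.id && !a.reads.is_disjoint(&b.writes) {
                edges.entry(a.id).or_default().insert(b.id);
            }
        }
    }
    edges
}

/// Breadth-first search along rw-edges: is there a path from `txn` back to itself?
fn on_cycle(edges: &HashMap<TxnId, HashSet<TxnId>>, txn: TxnId) -> bool {
    let mut visited = HashSet::new();
    // Start from the transaction's direct successors
    let mut queue: VecDeque<TxnId> = edges.get(&txn).into_iter().flatten().copied().collect();
    while let Some(cur) = queue.pop_front() {
        if cur == txn {
            return true; // returned to the start: cycle
        }
        // `insert` returns false if `cur` was already expanded
        if visited.insert(cur) {
            queue.extend(edges.get(&cur).into_iter().flatten().copied());
        }
    }
    false
}
```

`on_cycle` looks for a path from a transaction back to itself, which is equivalent to the template's `detect_cycle` search from an `out_conflict` member to an `in_conflict` member of the same transaction.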

Module 4: ssi_types.rs (300 LOC)

//! Shared types for SSI implementation
use serde::{Deserialize, Serialize};
use std::fmt;
// ============================================================================
// Type Aliases
// ============================================================================
/// Transaction identifier
pub type TransactionId = u64;
/// Timestamp (microseconds since UNIX epoch)
pub type Timestamp = u64;
/// Predicate lock identifier
pub type PredicateLockId = u64;
// ============================================================================
// Enums
// ============================================================================
/// Transaction isolation level
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IsolationLevel {
ReadUncommitted,
ReadCommitted,
RepeatableRead,
Serializable,
}
/// Transaction state
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransactionState {
Active,
Preparing,
Prepared,
Committing,
Committed,
Aborted,
}
// ============================================================================
// Error Types
// ============================================================================
/// SSI-specific errors
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SsiError {
/// Serialization failure (SQLSTATE 40001)
SerializationFailure(String),
/// Deadlock detected (SQLSTATE 40P01)
DeadlockDetected(TransactionId),
/// Lock timeout
LockTimeout(TransactionId),
/// Transaction not found
TransactionNotFound(TransactionId),
/// Invalid state
InvalidState(String),
/// Internal error
InternalError(String),
}
impl fmt::Display for SsiError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SsiError::SerializationFailure(msg) => {
write!(f, "Serialization failure: {}", msg)
}
SsiError::DeadlockDetected(txn_id) => {
write!(f, "Deadlock detected for transaction {}", txn_id)
}
SsiError::LockTimeout(txn_id) => {
write!(f, "Lock timeout for transaction {}", txn_id)
}
SsiError::TransactionNotFound(txn_id) => {
write!(f, "Transaction {} not found", txn_id)
}
SsiError::InvalidState(msg) => {
write!(f, "Invalid state: {}", msg)
}
SsiError::InternalError(msg) => {
write!(f, "Internal error: {}", msg)
}
}
}
}
impl std::error::Error for SsiError {}
/// Result type for SSI operations
pub type Result<T> = std::result::Result<T, SsiError>;
// ============================================================================
// Conversion Implementations
// ============================================================================
impl From<SsiError> for String {
fn from(err: SsiError) -> String {
err.to_string()
}
}
// TODO: Implement conversions to database-specific error types
// impl From<SsiError> for PgError { ... }
// impl From<SsiError> for MySqlError { ... }
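The TODO above asks for conversions to database-specific error types. A minimal sketch for a PostgreSQL-compatible front end is to map each variant to a SQLSTATE code. Only 40001 and 40P01 come from this document; 55P03 (`lock_not_available`), 25000 (`invalid_transaction_state`), and XX000 (`internal_error`) are assumed mappings onto PostgreSQL's error codes, and `sqlstate` / `is_retryable` are hypothetical method names. The enum is copied here so the block compiles on its own.

```rust
/// Transaction identifier (u64, as in `ssi_types.rs`).
type TransactionId = u64;

/// Copy of the template's `SsiError` so this sketch compiles on its own.
#[allow(dead_code)]
#[derive(Debug)]
enum SsiError {
    SerializationFailure(String),
    DeadlockDetected(TransactionId),
    LockTimeout(TransactionId),
    TransactionNotFound(TransactionId),
    InvalidState(String),
    InternalError(String),
}

impl SsiError {
    /// SQLSTATE a PostgreSQL-compatible front end might report.
    /// 40001 and 40P01 come from the `SsiError` doc comments; the rest are assumed mappings.
    fn sqlstate(&self) -> &'static str {
        match self {
            SsiError::SerializationFailure(_) => "40001", // serialization_failure
            SsiError::DeadlockDetected(_) => "40P01",     // deadlock_detected
            SsiError::LockTimeout(_) => "55P03",          // lock_not_available (assumed)
            SsiError::TransactionNotFound(_) | SsiError::InvalidState(_) => "25000", // invalid_transaction_state (assumed)
            SsiError::InternalError(_) => "XX000",        // internal_error (assumed)
        }
    }

    /// Class 40 (transaction rollback); in this mapping only 40001 and 40P01,
    /// which clients can usually handle by retrying the whole transaction.
    fn is_retryable(&self) -> bool {
        self.sqlstate().starts_with("40")
    }
}
```

Reporting 40001 for serialization failures matters because PostgreSQL clients conventionally treat it as a signal to retry the transaction from the start.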

Module 5: lib.rs (200 LOC)

//! HeliosDB SSI (Serializable Snapshot Isolation) Implementation
//!
//! This crate provides the SSI layer for HeliosDB, implementing true
//! serializability on top of snapshot isolation using the PostgreSQL
//! SSI algorithm.
//!
//! # Architecture
//!
//! ```text
//! ┌──────────────────────────────────────────────────┐
//! │             Transaction Coordinator              │
//! └────────────────────────┬─────────────────────────┘
//!                          │
//!                          ↓
//! ┌──────────────────────────────────────────────────┐
//! │                    SSI Layer                     │
//! ├──────────────────────────────────────────────────┤
//! │ - SireadLockManager: SIREAD lock acquisition     │
//! │ - ConflictDetector: rw-conflict tracking         │
//! │ - WriteSkewValidator: Cycle detection            │
//! └────────────────────────┬─────────────────────────┘
//!                          │
//!                          ↓
//! ┌──────────────────────────────────────────────────┐
//! │                MVCC Storage Layer                │
//! └──────────────────────────────────────────────────┘
//! ```
//!
//! # Usage
//!
//! ```rust
//! use heliosdb_ssi::{SireadLockManager, ConflictDetector, WriteSkewValidator};
//! use std::sync::Arc;
//!
//! // Initialize SSI components
//! let lock_manager = Arc::new(SireadLockManager::new());
//! let conflict_detector = Arc::new(ConflictDetector::new(Arc::clone(&lock_manager)));
//! let validator = WriteSkewValidator::new(Arc::clone(&conflict_detector));
//!
//! // Use in transaction coordinator
//! // (see SSI_SERIALIZABLE_SNAPSHOT_ISOLATION_ARCHITECTURE.md for details)
//! ```
// Needed by initialize() below
use std::sync::Arc;
// Public modules
pub mod ssi_lock_manager;
pub mod conflict_detector;
pub mod write_skew_validator;
pub mod ssi_types;
// Re-export main types
pub use ssi_lock_manager::{SireadLockManager, Predicate, SireadMetrics};
pub use conflict_detector::{ConflictDetector, Conflict, ConflictType, ConflictInfo};
pub use write_skew_validator::{WriteSkewValidator, ValidatorMetrics};
pub use ssi_types::{TransactionId, Timestamp, IsolationLevel, TransactionState, SsiError, Result};
// Version information
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
pub const SSI_ALGORITHM_VERSION: &str = "PostgreSQL-SSI-1.0";
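/// Initialize the SSI subsystem.
///
/// Builds the three SSI components and wires them together: the conflict
/// detector shares the lock manager, and the validator shares the conflict
/// detector, both through `Arc`.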
pub fn initialize() -> (
Arc<SireadLockManager>,
Arc<ConflictDetector>,
WriteSkewValidator,
) {
let lock_manager = Arc::new(SireadLockManager::new());
let conflict_detector = Arc::new(ConflictDetector::new(Arc::clone(&lock_manager)));
let validator = WriteSkewValidator::new(Arc::clone(&conflict_detector));
(lock_manager, conflict_detector, validator)
}
#[cfg(test)]
mod integration_tests {
use super::*;
use bytes::Bytes;
#[test]
fn test_ssi_initialization() {
let (lock_manager, conflict_detector, validator) = initialize();
// Freshly initialized components should report empty metrics
assert_eq!(lock_manager.get_metrics().active_key_locks, 0);
assert_eq!(conflict_detector.get_metrics().active_transactions, 0);
assert_eq!(validator.get_metrics().validations_performed, 0);
}
// TODO: Add end-to-end integration tests
// - Full transaction lifecycle with SSI
// - Write skew prevention
// - Phantom read prevention
// - Distributed SSI (2PC integration)
}
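The write-skew TODO above will eventually exercise the real `SireadLockManager` and `ConflictDetector`, whose APIs are only templated here. As a self-contained sketch of the idea such a test would check, the toy model below (all names hypothetical, not the crate's API) records non-blocking SIREAD markers, adds an rw-antidependency edge reader → writer whenever a transaction's read misses a concurrent transaction's write, and flags a transaction as a pivot when it has both an incoming and an outgoing rw-edge (T_in → pivot → T_out). It assumes every transaction is concurrent with every other and ignores commit order, so, like basic SSI, it is conservative and can flag false positives.

```rust
use std::collections::{HashMap, HashSet};

type TxnId = u64;

/// Toy SSI tracker (hypothetical). All transactions are assumed concurrent.
#[derive(Default)]
struct ToySsi {
    /// key -> transactions holding a SIREAD marker on that key
    siread: HashMap<String, HashSet<TxnId>>,
    /// key -> transactions that wrote that key
    writers: HashMap<String, HashSet<TxnId>>,
    /// (reader, writer) pairs: the reader did not see the writer's version
    rw_edges: HashSet<(TxnId, TxnId)>,
}

impl ToySsi {
    /// Record a read. SIREAD markers never block; they only leave a trace.
    /// Reading a key already written by another (concurrent) txn adds reader -> writer.
    fn read(&mut self, txn: TxnId, key: &str) {
        self.siread.entry(key.to_string()).or_default().insert(txn);
        if let Some(ws) = self.writers.get(key) {
            for &w in ws {
                if w != txn {
                    self.rw_edges.insert((txn, w));
                }
            }
        }
    }

    /// Record a write. Every other SIREAD holder on the key gains an edge to `txn`.
    fn write(&mut self, txn: TxnId, key: &str) {
        self.writers.entry(key.to_string()).or_default().insert(txn);
        if let Some(rs) = self.siread.get(key) {
            for &r in rs {
                if r != txn {
                    self.rw_edges.insert((r, txn));
                }
            }
        }
    }

    /// Dangerous-structure check: a pivot has both an incoming and an outgoing
    /// rw-edge. SSI would abort a transaction in such a structure.
    fn is_pivot(&self, txn: TxnId) -> bool {
        let has_in = self.rw_edges.iter().any(|&(_, w)| w == txn);
        let has_out = self.rw_edges.iter().any(|&(r, _)| r == txn);
        has_in && has_out
    }
}
```

In the classic on-call example, two transactions each read both doctors' rows and then each updates a different row; the toy model produces edges in both directions, so both transactions are pivots and one would have to abort.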

Cargo.toml

[package]
name = "heliosdb-ssi"
version = "0.1.0"
edition = "2021"
authors = ["HeliosDB Team"]
description = "Serializable Snapshot Isolation implementation for HeliosDB"
license = "Apache-2.0"
[dependencies]
# Concurrency
dashmap = "5.5"
parking_lot = "0.12"
# Data structures
bytes = "1.5"
# Serialization
serde = { version = "1.0", features = ["derive"] }
# Async runtime (optional; enabled by the `async` feature)
tokio = { version = "1.35", features = ["sync"], optional = true }
[dev-dependencies]
# Testing
tokio = { version = "1.35", features = ["full"] }
criterion = "0.5"
[features]
default = []
async = ["tokio"]
[[bench]]
name = "ssi_benchmarks"
harness = false
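The `async` feature above pulls in `tokio` only when requested. The sketch below shows how crate code could react to that flag. It assumes, hypothetically, that the crate wants an async-aware lock when the feature is on and a blocking lock otherwise. `SharedLock` and `ASYNC_ENABLED` are illustrative names. `std::sync::RwLock` stands in for the crate's `parking_lot` dependency to keep the sketch dependency-free.

```rust
/// True only when the crate is built with `--features async`.
pub const ASYNC_ENABLED: bool = cfg!(feature = "async");

/// With the `async` feature: tokio's RwLock, acquired with `.await`
/// instead of blocking the calling thread.
#[cfg(feature = "async")]
pub type SharedLock<T> = tokio::sync::RwLock<T>;

/// Without it: a plain blocking lock (std stand-in for parking_lot here).
#[cfg(not(feature = "async"))]
pub type SharedLock<T> = std::sync::RwLock<T>;
```

Because items under an inactive `cfg` are dropped before name resolution, a default build compiles without `tokio` at all.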

README.md for heliosdb-ssi

# HeliosDB SSI (Serializable Snapshot Isolation)
Implementation of Serializable Snapshot Isolation for HeliosDB, based on the PostgreSQL SSI algorithm.
## Features
- Non-blocking SIREAD locks
- Predicate locks for phantom read prevention
- Efficient rw-conflict detection
- Cycle detection for dangerous structures
- Low overhead (target: <10%)
## Architecture
See [SSI_SERIALIZABLE_SNAPSHOT_ISOLATION_ARCHITECTURE.md](../../docs/architecture/SSI_SERIALIZABLE_SNAPSHOT_ISOLATION_ARCHITECTURE.md) for the complete architecture.
## Module Overview
- **ssi_lock_manager**: SIREAD lock acquisition and management
- **conflict_detector**: rw-conflict detection and tracking
- **write_skew_validator**: Dangerous structure (cycle) detection
- **ssi_types**: Shared types and error definitions
## Usage
```rust
use heliosdb_ssi::{initialize, IsolationLevel};
// Initialize SSI
let (lock_manager, conflict_detector, validator) = initialize();
// Integrate with transaction coordinator
// (see architecture document for details)
```

Testing

# Run all tests
cargo test
# Run benchmarks
cargo bench
# Generate coverage report (requires cargo-tarpaulin)
cargo tarpaulin --out Html

Performance Targets

  • SIREAD lock acquisition: <100ns (p99)
  • Conflict detection: <500ns (p99)
  • Cycle detection: <1ms (p99)
  • TPC-C throughput: >5,000 tpmC
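The first three targets are p99 latencies. The crate's benchmarks use Criterion, which reports its own statistics. As a separate, std-only sketch of how a p99 figure could be derived from raw per-call timings, the helpers below (`percentile` and `p99_nanos` are hypothetical names) use the nearest-rank method.

```rust
use std::time::Instant;

/// Nearest-rank percentile: the smallest sample such that at least
/// `pct` percent of all samples are less than or equal to it.
fn percentile(samples: &mut [u128], pct: f64) -> Option<u128> {
    if samples.is_empty() {
        return None;
    }
    samples.sort_unstable();
    // 1-based rank, clamped so pct <= 0 gives the minimum and pct >= 100 the maximum.
    let rank = ((pct / 100.0) * samples.len() as f64).ceil() as usize;
    Some(samples[rank.clamp(1, samples.len()) - 1])
}

/// Time `iterations` calls of `op`, one sample per call, and return the p99 in ns.
fn p99_nanos<F: FnMut()>(iterations: usize, mut op: F) -> Option<u128> {
    let mut samples = Vec::with_capacity(iterations);
    for _ in 0..iterations {
        let start = Instant::now();
        op();
        samples.push(start.elapsed().as_nanos());
    }
    percentile(&mut samples, 99.0)
}
```

Timing each call individually includes the cost of `Instant::now()`, which can be a noticeable fraction of a 100 ns budget. For the SIREAD target, timing batches of operations (as Criterion does) gives more trustworthy numbers.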

Implementation Timeline

  • Weeks 1-4: Algorithm design & module specifications
  • Weeks 5-10: Core implementation
  • Weeks 11-13: Integration & testing
  • Weeks 14-15: Production hardening


Next Steps for Engineers

  1. Week 1: Review the architecture document and set up the module scaffolding
  2. Week 2: Implement ssi_lock_manager.rs (Engineer 1)
  3. Week 3: Implement conflict_detector.rs (Engineer 2)
  4. Week 4: Implement write_skew_validator.rs (Engineer 3)
  5. Week 5+: Integration testing and production hardening

All TODO comments in the code mark areas that still require implementation logic.

END OF MODULE TEMPLATES