SSI Implementation - Rust Module Templates
Document Version: 1.0
Created: November 28, 2025
Status: READY FOR IMPLEMENTATION
Purpose: Concrete Rust code templates for immediate implementation
Overview
This document provides Rust code templates for the four SSI modules, plus the crate root (`lib.rs`), `Cargo.toml`, and a README. Engineers can copy these templates and fill in the implementation logic at the TODO comments.
Module Structure:
```text
heliosdb-ssi/
├── Cargo.toml
├── src/
│   ├── lib.rs
│   ├── ssi_lock_manager.rs      (Module 1)
│   ├── conflict_detector.rs     (Module 2)
│   ├── write_skew_validator.rs  (Module 3)
│   └── ssi_types.rs             (Module 4)
```

Module 1: ssi_lock_manager.rs (1,200 LOC)
```rust
//! SIREAD Lock Manager for Serializable Snapshot Isolation
//!
//! This module manages SIREAD locks (Serializable Read locks) that are used
//! to detect read-write conflicts in serializable transactions.
//!
//! Key features:
//! - Non-blocking SIREAD lock acquisition
//! - Predicate locks for range queries (phantom read prevention)
//! - Efficient lock lookup and conflict detection
//! - Concurrent operations using DashMap (sharded locking, low contention)

use bytes::Bytes;
use dashmap::DashMap;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};

use crate::ssi_types::{TransactionId, Timestamp, PredicateLockId, Result, SsiError};

// ============================================================================
// Data Structures
// ============================================================================

/// Predicate for range-based SIREAD locks
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Predicate {
    /// Range lock: start_key <= key < end_key
    Range {
        start_key: Bytes,
        end_key: Bytes,
    },

    /// Table scan: all keys in table
    TableScan {
        table_name: String,
    },

    /// Index scan with condition
    IndexScan {
        index_name: String,
        condition: String,
    },
}

impl Predicate {
    /// Check if a key falls within this predicate
    pub fn matches_key(&self, key: &Bytes) -> bool {
        match self {
            Predicate::Range { start_key, end_key } => {
                // Bytewise (lexicographic) comparison, half-open range
                key >= start_key && key < end_key
            }
            Predicate::TableScan { .. } => {
                // TODO: Implement table name extraction from key
                // For now, match all keys
                true
            }
            Predicate::IndexScan { .. } => {
                // TODO: Implement condition evaluation
                // NOTE: returning false means index-scan predicates never
                // conflict, so phantoms can be missed until this is implemented
                false
            }
        }
    }
}

/// Predicate lock metadata
#[derive(Debug, Clone)]
pub struct PredicateLock {
    pub id: PredicateLockId,
    pub txn_id: TransactionId,
    pub predicate: Predicate,
    pub created_at: Timestamp,
}

/// SIREAD lock manager metrics
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SireadMetrics {
    pub total_locks_acquired: u64,
    pub total_locks_released: u64,
    pub active_key_locks: usize,
    pub active_predicate_locks: usize,
    pub conflicts_detected: u64,
}
```
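`Predicate::Range` compares keys bytewise, so the meaning of a range depends on how keys are encoded. The sketch below uses only the standard library and two hypothetical helpers (`in_range`, `encode_u64`) that are not part of the templates. It shows the half-open check and why numeric keys would need a fixed-width big-endian encoding for bytewise order to agree with numeric order.

```rust
/// Half-open range check on raw byte keys: start <= key < end.
/// Byte slices compare lexicographically, the same ordering `Bytes` uses.
/// (`in_range` is a hypothetical helper for illustration.)
pub fn in_range(key: &[u8], start: &[u8], end: &[u8]) -> bool {
    key >= start && key < end
}

/// Encode a numeric key as 8 big-endian bytes so that bytewise order
/// matches numeric order. (Hypothetical encoding; the templates do not
/// prescribe a key format.)
pub fn encode_u64(n: u64) -> [u8; 8] {
    n.to_be_bytes()
}
```

With decimal strings, `"100"` sorts between `"10"` and `"20"`, so a range lock on `["10", "20")` would also cover key `"100"`. With the fixed-width encoding, 100 falls outside the range 10..20 as expected.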
```rust
// ============================================================================
// SIREAD Lock Manager
// ============================================================================

/// Manages SIREAD locks for SSI conflict detection
pub struct SireadLockManager {
    /// Map: Key -> Set of transactions holding SIREAD locks
    /// Uses DashMap (sharded locking) for concurrent access
    key_locks: Arc<DashMap<Bytes, HashSet<TransactionId>>>,

    /// Map: Transaction -> Set of keys it has SIREAD locked
    txn_locks: Arc<DashMap<TransactionId, HashSet<Bytes>>>,

    /// Predicate locks (protected by RwLock for now)
    /// TODO: Optimize with interval tree for O(log n) overlap detection
    predicate_locks: Arc<RwLock<Vec<PredicateLock>>>,

    /// Next predicate lock ID
    next_predicate_id: AtomicU64,

    /// Metrics
    metrics: Arc<RwLock<SireadMetrics>>,
}

impl SireadLockManager {
    /// Create new SIREAD lock manager
    pub fn new() -> Self {
        Self {
            key_locks: Arc::new(DashMap::new()),
            txn_locks: Arc::new(DashMap::new()),
            predicate_locks: Arc::new(RwLock::new(Vec::new())),
            next_predicate_id: AtomicU64::new(1),
            metrics: Arc::new(RwLock::new(SireadMetrics::default())),
        }
    }

    /// Acquire SIREAD lock on a specific key
    ///
    /// SIREAD locks are non-blocking and used only for conflict detection.
    /// Multiple transactions can hold SIREAD locks on the same key.
    pub fn acquire_siread_lock(&self, txn_id: TransactionId, key: &Bytes) -> Result<()> {
        // 1. Add to key_locks: key -> {txn_id}
        self.key_locks
            .entry(key.clone())
            .or_insert_with(HashSet::new)
            .insert(txn_id);

        // 2. Add to txn_locks: txn_id -> {key}
        self.txn_locks
            .entry(txn_id)
            .or_insert_with(HashSet::new)
            .insert(key.clone());

        // 3. Update metrics
        let mut metrics = self.metrics.write();
        metrics.total_locks_acquired += 1;
        metrics.active_key_locks = self.key_locks.len();

        Ok(())
    }

    /// Acquire predicate lock for range query
    ///
    /// Used to prevent phantom reads in SERIALIZABLE transactions.
    pub fn acquire_predicate_lock(
        &self,
        txn_id: TransactionId,
        predicate: Predicate,
    ) -> Result<PredicateLockId> {
        let predicate_id = self.next_predicate_id.fetch_add(1, Ordering::SeqCst);

        let lock = PredicateLock {
            id: predicate_id,
            txn_id,
            predicate,
            created_at: Self::current_timestamp(),
        };

        // Add to predicate_locks
        self.predicate_locks.write().push(lock);

        // Update metrics
        let mut metrics = self.metrics.write();
        metrics.total_locks_acquired += 1;
        metrics.active_predicate_locks = self.predicate_locks.read().len();

        Ok(predicate_id)
    }

    /// Get all transactions with SIREAD locks on a key
    ///
    /// Used by conflict detector to find rw-antidependencies.
    pub fn get_locks_on_key(&self, key: &Bytes) -> Vec<TransactionId> {
        self.key_locks
            .get(key)
            .map(|entry| entry.value().iter().copied().collect())
            .unwrap_or_default()
    }

    /// Get all transactions with predicate locks overlapping a key
    ///
    /// Checks if any predicate lock matches the given key.
    pub fn get_predicate_locks_on_key(&self, key: &Bytes) -> Vec<TransactionId> {
        let predicate_locks = self.predicate_locks.read();

        predicate_locks
            .iter()
            .filter(|lock| lock.predicate.matches_key(key))
            .map(|lock| lock.txn_id)
            .collect()
    }

    /// Release all SIREAD locks for a transaction
    ///
    /// Called when transaction commits or aborts.
    pub fn release_locks(&self, txn_id: TransactionId) -> Result<()> {
        // 1. Get all keys locked by this transaction
        let keys = self.txn_locks.remove(&txn_id).map(|(_, keys)| keys);

        if let Some(keys) = keys {
            for key in keys {
                // 2. Remove transaction from this key's lock set
                if let Some(mut txn_set) = self.key_locks.get_mut(&key) {
                    txn_set.remove(&txn_id);
                }

                // 3. Clean up the entry if it is now empty. remove_if checks
                //    and removes under one shard lock, so a lock added
                //    concurrently by another transaction is not discarded.
                self.key_locks.remove_if(&key, |_, txn_set| txn_set.is_empty());
            }
        }

        // 4. Remove predicate locks for this transaction
        self.predicate_locks.write().retain(|lock| lock.txn_id != txn_id);

        // 5. Update metrics
        let mut metrics = self.metrics.write();
        metrics.total_locks_released += 1;
        metrics.active_key_locks = self.key_locks.len();
        metrics.active_predicate_locks = self.predicate_locks.read().len();

        Ok(())
    }

    /// Get current metrics
    pub fn get_metrics(&self) -> SireadMetrics {
        self.metrics.read().clone()
    }

    /// Get current timestamp
    fn current_timestamp() -> Timestamp {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_micros() as Timestamp
    }
}

impl Default for SireadLockManager {
    fn default() -> Self {
        Self::new()
    }
}
```
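The lock manager keeps two maps that mirror each other: key to readers, and transaction to keys. A minimal single-threaded sketch of that bookkeeping, using only the standard library, may make the release path easier to follow. `LockIndex` and its method names are hypothetical and stand in for the DashMap-based template.

```rust
use std::collections::{HashMap, HashSet};

/// Single-threaded model of the two maps kept by the lock manager.
/// (`LockIndex` is a hypothetical name; the template uses DashMap.)
#[derive(Default)]
pub struct LockIndex {
    key_locks: HashMap<Vec<u8>, HashSet<u64>>, // key -> readers
    txn_locks: HashMap<u64, HashSet<Vec<u8>>>, // txn -> keys it read
}

impl LockIndex {
    /// Record that `txn` read `key`; never blocks, never fails.
    pub fn acquire(&mut self, txn: u64, key: &[u8]) {
        self.key_locks.entry(key.to_vec()).or_default().insert(txn);
        self.txn_locks.entry(txn).or_default().insert(key.to_vec());
    }

    /// Readers of `key`, sorted for deterministic output.
    pub fn readers(&self, key: &[u8]) -> Vec<u64> {
        let mut out: Vec<u64> = self
            .key_locks
            .get(key)
            .map(|set| set.iter().copied().collect())
            .unwrap_or_default();
        out.sort_unstable();
        out
    }

    /// Drop every lock held by `txn`, visiting only the keys it touched.
    pub fn release(&mut self, txn: u64) {
        for key in self.txn_locks.remove(&txn).unwrap_or_default() {
            let now_empty = match self.key_locks.get_mut(&key) {
                Some(set) => {
                    set.remove(&txn);
                    set.is_empty()
                }
                None => false,
            };
            // Drop empty entries so the map does not grow without bound.
            if now_empty {
                self.key_locks.remove(&key);
            }
        }
    }

    /// Number of keys that currently have at least one reader.
    pub fn tracked_keys(&self) -> usize {
        self.key_locks.len()
    }
}
```

The reverse map is what keeps release proportional to the number of keys a transaction read rather than to the total number of locked keys.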
```rust
// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_acquire_siread_lock() {
        let lock_manager = SireadLockManager::new();
        let key = Bytes::from("test_key");

        // Acquire lock
        lock_manager.acquire_siread_lock(1, &key).unwrap();

        // Verify lock registered
        let locks = lock_manager.get_locks_on_key(&key);
        assert_eq!(locks, vec![1]);
    }

    #[test]
    fn test_multiple_transactions_same_key() {
        let lock_manager = SireadLockManager::new();
        let key = Bytes::from("test_key");

        // Multiple transactions can hold SIREAD locks
        lock_manager.acquire_siread_lock(1, &key).unwrap();
        lock_manager.acquire_siread_lock(2, &key).unwrap();
        lock_manager.acquire_siread_lock(3, &key).unwrap();

        let mut locks = lock_manager.get_locks_on_key(&key);
        locks.sort();
        assert_eq!(locks, vec![1, 2, 3]);
    }

    #[test]
    fn test_release_locks() {
        let lock_manager = SireadLockManager::new();
        let key1 = Bytes::from("key1");
        let key2 = Bytes::from("key2");

        // Acquire locks
        lock_manager.acquire_siread_lock(1, &key1).unwrap();
        lock_manager.acquire_siread_lock(1, &key2).unwrap();

        // Release all locks for txn 1
        lock_manager.release_locks(1).unwrap();

        // Verify locks removed
        assert!(lock_manager.get_locks_on_key(&key1).is_empty());
        assert!(lock_manager.get_locks_on_key(&key2).is_empty());
    }

    #[test]
    fn test_predicate_lock_range() {
        let lock_manager = SireadLockManager::new();

        let predicate = Predicate::Range {
            start_key: Bytes::from("10"),
            end_key: Bytes::from("20"),
        };

        lock_manager.acquire_predicate_lock(1, predicate).unwrap();

        // Key 15 is in range
        let locks = lock_manager.get_predicate_locks_on_key(&Bytes::from("15"));
        assert_eq!(locks, vec![1]);

        // Key 25 is NOT in range
        let locks = lock_manager.get_predicate_locks_on_key(&Bytes::from("25"));
        assert!(locks.is_empty());
    }

    #[test]
    fn test_concurrent_lock_acquisition() {
        use std::sync::Arc;

        let lock_manager = Arc::new(SireadLockManager::new());
        let key = Bytes::from("test_key");

        // Spawn 100 threads acquiring locks
        let handles: Vec<_> = (0..100)
            .map(|i| {
                let lm = Arc::clone(&lock_manager);
                let k = key.clone();
                std::thread::spawn(move || {
                    lm.acquire_siread_lock(i, &k).unwrap();
                })
            })
            .collect();

        // Wait for all threads
        for handle in handles {
            handle.join().unwrap();
        }

        // Verify all 100 locks acquired
        let locks = lock_manager.get_locks_on_key(&key);
        assert_eq!(locks.len(), 100);
    }

    // TODO: Add more tests
    // - Predicate lock overlap detection
    // - Metrics tracking
    // - Edge cases (empty keys, large transaction sets)
}
```

Module 2: conflict_detector.rs (800 LOC)
```rust
//! Conflict Detector for SSI
//!
//! Detects and tracks rw-antidependencies (read-write conflicts) between
//! serializable transactions.

use bytes::Bytes;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};

use crate::ssi_lock_manager::SireadLockManager;
use crate::ssi_types::{TransactionId, Timestamp, IsolationLevel, TransactionState, Result, SsiError};

// ============================================================================
// Data Structures
// ============================================================================

/// Transaction metadata for conflict tracking
#[derive(Debug, Clone)]
pub struct TransactionMetadata {
    pub txn_id: TransactionId,
    pub snapshot_ts: Timestamp,
    pub isolation_level: IsolationLevel,
    pub state: TransactionState,

    /// Transactions with an rw-antidependency into us (they read, we wrote).
    /// These are the incoming edges of the conflict graph.
    pub in_conflict: HashSet<TransactionId>,

    /// Transactions we have an rw-antidependency out to (we read, they wrote).
    /// These are the outgoing edges of the conflict graph.
    pub out_conflict: HashSet<TransactionId>,

    /// Timestamp when transaction was created
    pub created_at: Timestamp,
}

/// Conflict between two transactions
#[derive(Debug, Clone)]
pub struct Conflict {
    pub reader_txn_id: TransactionId,
    pub writer_txn_id: TransactionId,
    pub key: Bytes,
    pub conflict_type: ConflictType,
    pub detected_at: Timestamp,
}

/// Type of conflict
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictType {
    ReadWrite,  // rw-antidependency
    WriteWrite, // ww-dependency
}

/// Conflict information for a transaction
#[derive(Debug, Clone)]
pub struct ConflictInfo {
    pub in_conflict: HashSet<TransactionId>,
    pub out_conflict: HashSet<TransactionId>,
}

/// Conflict detector metrics
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ConflictMetrics {
    pub total_rw_conflicts: u64,
    pub total_ww_conflicts: u64,
    pub active_transactions: usize,
    pub avg_conflicts_per_txn: f64,
}
```
```rust
// ============================================================================
// Conflict Detector
// ============================================================================

/// Detects and tracks conflicts between serializable transactions
pub struct ConflictDetector {
    /// All active transactions
    /// Each transaction's metadata has its own RwLock; the map itself is
    /// guarded by a single outer RwLock
    transactions: Arc<RwLock<HashMap<TransactionId, Arc<RwLock<TransactionMetadata>>>>>,

    /// SIREAD lock manager (dependency)
    siread_lock_manager: Arc<SireadLockManager>,

    /// Metrics
    metrics: Arc<RwLock<ConflictMetrics>>,
}

impl ConflictDetector {
    /// Create new conflict detector
    pub fn new(siread_lock_manager: Arc<SireadLockManager>) -> Self {
        Self {
            transactions: Arc::new(RwLock::new(HashMap::new())),
            siread_lock_manager,
            metrics: Arc::new(RwLock::new(ConflictMetrics::default())),
        }
    }

    /// Register a new transaction
    pub fn register_transaction(
        &self,
        txn_id: TransactionId,
        snapshot_ts: Timestamp,
        isolation_level: IsolationLevel,
    ) -> Result<()> {
        let metadata = TransactionMetadata {
            txn_id,
            snapshot_ts,
            isolation_level,
            state: TransactionState::Active,
            in_conflict: HashSet::new(),
            out_conflict: HashSet::new(),
            created_at: Self::current_timestamp(),
        };

        self.transactions
            .write()
            .insert(txn_id, Arc::new(RwLock::new(metadata)));

        // Update metrics. Read the map length before taking the metrics lock,
        // so this path never holds `metrics` while waiting on `transactions`
        // (remove_transaction acquires `transactions` first, then `metrics`).
        let active = self.transactions.read().len();
        self.metrics.write().active_transactions = active;

        Ok(())
    }

    /// Check for SIREAD conflicts when writing a key
    ///
    /// Returns list of conflicts detected (reader_txn -> writer_txn).
    /// The conflicts are only reported here; callers record them with
    /// `register_rw_conflict`.
    pub fn check_write_conflicts(
        &self,
        writer_txn_id: TransactionId,
        key: &Bytes,
    ) -> Result<Vec<Conflict>> {
        let mut conflicts = Vec::new();

        // 1. Get all transactions with SIREAD locks on this key
        let reader_txns = self.siread_lock_manager.get_locks_on_key(key);

        // 2. Get all transactions with predicate locks overlapping this key
        let predicate_reader_txns = self.siread_lock_manager.get_predicate_locks_on_key(key);

        // 3. Combine both types of locks
        let all_readers: HashSet<TransactionId> = reader_txns
            .into_iter()
            .chain(predicate_reader_txns)
            .collect();

        // 4. Report an rw-conflict for each reader other than the writer
        for reader_txn_id in all_readers {
            if reader_txn_id != writer_txn_id {
                conflicts.push(Conflict {
                    reader_txn_id,
                    writer_txn_id,
                    key: key.clone(),
                    conflict_type: ConflictType::ReadWrite,
                    detected_at: Self::current_timestamp(),
                });
            }
        }

        Ok(conflicts)
    }

    /// Register a rw-antidependency between transactions
    ///
    /// reader_txn --rw--> writer_txn
    pub fn register_rw_conflict(
        &self,
        reader_txn_id: TransactionId,
        writer_txn_id: TransactionId,
    ) -> Result<()> {
        let transactions = self.transactions.read();

        // 1. Update reader's out_conflict
        if let Some(reader_meta) = transactions.get(&reader_txn_id) {
            reader_meta.write().out_conflict.insert(writer_txn_id);
        }

        // 2. Update writer's in_conflict
        if let Some(writer_meta) = transactions.get(&writer_txn_id) {
            writer_meta.write().in_conflict.insert(reader_txn_id);
        }

        // 3. Update metrics
        let mut metrics = self.metrics.write();
        metrics.total_rw_conflicts += 1;

        Ok(())
    }

    /// Get conflict info for a transaction
    pub fn get_conflicts(&self, txn_id: TransactionId) -> ConflictInfo {
        let transactions = self.transactions.read();

        if let Some(meta) = transactions.get(&txn_id) {
            let meta_read = meta.read();
            ConflictInfo {
                in_conflict: meta_read.in_conflict.clone(),
                out_conflict: meta_read.out_conflict.clone(),
            }
        } else {
            ConflictInfo {
                in_conflict: HashSet::new(),
                out_conflict: HashSet::new(),
            }
        }
    }

    /// Remove transaction from conflict graph
    ///
    /// Called when transaction commits or aborts.
    pub fn remove_transaction(&self, txn_id: TransactionId) -> Result<()> {
        let mut transactions = self.transactions.write();

        // 1. Remove from transactions map
        if let Some(meta) = transactions.remove(&txn_id) {
            let meta_read = meta.read();

            // 2. Remove from other transactions' conflict sets
            for other_txn_id in &meta_read.in_conflict {
                if let Some(other_meta) = transactions.get(other_txn_id) {
                    other_meta.write().out_conflict.remove(&txn_id);
                }
            }

            for other_txn_id in &meta_read.out_conflict {
                if let Some(other_meta) = transactions.get(other_txn_id) {
                    other_meta.write().in_conflict.remove(&txn_id);
                }
            }
        }

        // 3. Update metrics
        let mut metrics = self.metrics.write();
        metrics.active_transactions = transactions.len();

        Ok(())
    }

    /// Get current metrics
    pub fn get_metrics(&self) -> ConflictMetrics {
        let mut metrics = self.metrics.read().clone();

        // Calculate average conflicts per transaction
        let transactions = self.transactions.read();
        if !transactions.is_empty() {
            let total_conflicts: usize = transactions
                .values()
                .map(|meta| meta.read().out_conflict.len())
                .sum();

            metrics.avg_conflicts_per_txn = total_conflicts as f64 / transactions.len() as f64;
        }

        metrics
    }

    /// Get current timestamp
    fn current_timestamp() -> Timestamp {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_micros() as Timestamp
    }

    /// Get transaction metadata (internal use)
    pub(crate) fn get_transaction_metadata(&self, txn_id: TransactionId) -> Option<Arc<RwLock<TransactionMetadata>>> {
        self.transactions.read().get(&txn_id).cloned()
    }
}
```
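Each rw-antidependency `reader --rw--> writer` is stored on both endpoints: as an outgoing edge on the reader and an incoming edge on the writer. The following standard-library sketch models that bookkeeping without any locking. `ConflictGraph`, `Edges`, and the method names are hypothetical, and unlike the template this sketch ignores edges whose endpoints are not registered.

```rust
use std::collections::{HashMap, HashSet};

/// Edge sets of one transaction.
#[derive(Default)]
struct Edges {
    in_conflict: HashSet<u64>,  // transactions that read what we wrote
    out_conflict: HashSet<u64>, // transactions that wrote what we read
}

/// Single-threaded model of the conflict graph (hypothetical type).
#[derive(Default)]
pub struct ConflictGraph {
    txns: HashMap<u64, Edges>,
}

impl ConflictGraph {
    pub fn register(&mut self, txn: u64) {
        self.txns.entry(txn).or_default();
    }

    /// Record reader --rw--> writer on both endpoints.
    /// Self-edges and unregistered endpoints are ignored (sketch-only choice).
    pub fn add_rw(&mut self, reader: u64, writer: u64) {
        if reader == writer
            || !self.txns.contains_key(&reader)
            || !self.txns.contains_key(&writer)
        {
            return;
        }
        if let Some(e) = self.txns.get_mut(&reader) {
            e.out_conflict.insert(writer);
        }
        if let Some(e) = self.txns.get_mut(&writer) {
            e.in_conflict.insert(reader);
        }
    }

    /// Remove a finished transaction and every edge that mentions it.
    pub fn remove(&mut self, txn: u64) {
        if let Some(edges) = self.txns.remove(&txn) {
            for other in edges.in_conflict {
                if let Some(e) = self.txns.get_mut(&other) {
                    e.out_conflict.remove(&txn);
                }
            }
            for other in edges.out_conflict {
                if let Some(e) = self.txns.get_mut(&other) {
                    e.in_conflict.remove(&txn);
                }
            }
        }
    }

    /// True if `txn` has both incoming and outgoing rw-edges.
    pub fn is_pivot(&self, txn: u64) -> bool {
        self.txns
            .get(&txn)
            .map_or(false, |e| !e.in_conflict.is_empty() && !e.out_conflict.is_empty())
    }

    pub fn out_of(&self, txn: u64) -> Vec<u64> {
        self.txns.get(&txn).map_or(Vec::new(), |e| e.out_conflict.iter().copied().collect())
    }

    pub fn in_of(&self, txn: u64) -> Vec<u64> {
        self.txns.get(&txn).map_or(Vec::new(), |e| e.in_conflict.iter().copied().collect())
    }
}
```

`is_pivot` corresponds to the fast-path test in `has_dangerous_structure`: a transaction with an empty `in_conflict` or `out_conflict` set cannot lie on a cycle of rw-edges.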
```rust
// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;

    fn setup_detector() -> (Arc<SireadLockManager>, ConflictDetector) {
        let lock_manager = Arc::new(SireadLockManager::new());
        let detector = ConflictDetector::new(Arc::clone(&lock_manager));
        (lock_manager, detector)
    }

    #[test]
    fn test_register_transaction() {
        let (_, detector) = setup_detector();

        detector
            .register_transaction(1, 100, IsolationLevel::Serializable)
            .unwrap();

        let conflicts = detector.get_conflicts(1);
        assert!(conflicts.in_conflict.is_empty());
        assert!(conflicts.out_conflict.is_empty());
    }

    #[test]
    fn test_register_rw_conflict() {
        let (_, detector) = setup_detector();

        // Register transactions
        detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
        detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();

        // Register conflict: T1 --rw--> T2
        detector.register_rw_conflict(1, 2).unwrap();

        // Verify T1's out_conflict contains T2
        let t1_conflicts = detector.get_conflicts(1);
        assert!(t1_conflicts.out_conflict.contains(&2));

        // Verify T2's in_conflict contains T1
        let t2_conflicts = detector.get_conflicts(2);
        assert!(t2_conflicts.in_conflict.contains(&1));
    }

    #[test]
    fn test_check_write_conflicts() {
        let (lock_manager, detector) = setup_detector();

        let key = Bytes::from("test_key");

        // T1 reads key (acquires SIREAD lock)
        detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
        lock_manager.acquire_siread_lock(1, &key).unwrap();

        // T2 writes key
        detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
        let conflicts = detector.check_write_conflicts(2, &key).unwrap();

        // Should detect rw-conflict
        assert_eq!(conflicts.len(), 1);
        assert_eq!(conflicts[0].reader_txn_id, 1);
        assert_eq!(conflicts[0].writer_txn_id, 2);
        assert_eq!(conflicts[0].conflict_type, ConflictType::ReadWrite);
    }

    #[test]
    fn test_remove_transaction() {
        let (_, detector) = setup_detector();

        detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
        detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
        detector.register_rw_conflict(1, 2).unwrap();

        // Remove T1
        detector.remove_transaction(1).unwrap();

        // T2's in_conflict should be empty
        let t2_conflicts = detector.get_conflicts(2);
        assert!(t2_conflicts.in_conflict.is_empty());
    }

    // TODO: Add more tests
    // - Multiple conflicts
    // - Concurrent conflict registration
    // - Metrics tracking
}
```

Module 3: write_skew_validator.rs (600 LOC)
```rust
//! Write Skew Validator for SSI
//!
//! Detects dangerous structures (cycles in dependency graph) that would
//! violate serializability.

use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};

use crate::conflict_detector::{ConflictDetector, ConflictInfo};
use crate::ssi_types::{TransactionId, Result};

// ============================================================================
// Data Structures
// ============================================================================

/// Validator metrics
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ValidatorMetrics {
    pub validations_performed: u64,
    pub cycles_detected: u64,
    pub avg_validation_time_us: f64,
    pub max_cycle_length: usize,
}

// ============================================================================
// Write Skew Validator
// ============================================================================

/// Validates transactions for dangerous structures (serialization cycles)
pub struct WriteSkewValidator {
    /// Conflict detector (dependency)
    conflict_detector: Arc<ConflictDetector>,

    /// Metrics
    metrics: Arc<RwLock<ValidatorMetrics>>,
}

impl WriteSkewValidator {
    /// Create new validator
    pub fn new(conflict_detector: Arc<ConflictDetector>) -> Self {
        Self {
            conflict_detector,
            metrics: Arc::new(RwLock::new(ValidatorMetrics::default())),
        }
    }

    /// Check if transaction is in a dangerous structure
    ///
    /// A dangerous structure exists when:
    /// 1. Transaction has both in_conflict and out_conflict (not empty)
    /// 2. There exists a path from out_conflict back to in_conflict (cycle)
    ///
    /// Returns true if cycle detected (transaction should abort).
    pub fn has_dangerous_structure(&self, txn_id: TransactionId) -> Result<bool> {
        let start_time = std::time::Instant::now();

        // Get conflict info for this transaction
        let conflicts = self.conflict_detector.get_conflicts(txn_id);

        // Fast path: No conflicts = no cycle possible
        if conflicts.in_conflict.is_empty() || conflicts.out_conflict.is_empty() {
            self.update_metrics(start_time, false, 0);
            return Ok(false);
        }

        // Slow path: BFS to detect cycle
        for out_txn in &conflicts.out_conflict {
            if let Some(cycle) = self.detect_cycle(*out_txn, &conflicts.in_conflict) {
                self.update_metrics(start_time, true, cycle.len());
                return Ok(true); // Cycle found!
            }
        }

        self.update_metrics(start_time, false, 0);
        Ok(false)
    }

    /// Detect cycle from start transaction to any target transaction
    ///
    /// Uses BFS to find if there's a path from start to any transaction in targets.
    /// Returns Some(path) if cycle found, None otherwise.
    pub fn detect_cycle(
        &self,
        start: TransactionId,
        targets: &HashSet<TransactionId>,
    ) -> Option<Vec<TransactionId>> {
        let mut visited = HashSet::new();
        let mut queue = VecDeque::new();
        let mut parent: std::collections::HashMap<TransactionId, TransactionId> =
            std::collections::HashMap::new();

        queue.push_back(start);

        while let Some(current) = queue.pop_front() {
            // Found target!
            if targets.contains(&current) {
                // Reconstruct path
                let mut path = vec![current];
                let mut node = current;

                while let Some(&prev) = parent.get(&node) {
                    path.push(prev);
                    node = prev;
                }

                path.reverse();
                return Some(path);
            }

            if visited.contains(&current) {
                continue;
            }
            visited.insert(current);

            // Get current transaction's out_conflict
            let current_conflicts = self.conflict_detector.get_conflicts(current);

            for &next in &current_conflicts.out_conflict {
                if !visited.contains(&next) {
                    parent.insert(next, current);
                    queue.push_back(next);
                }
            }
        }

        None // No cycle found
    }

    /// Choose victim transaction to abort when cycle detected
    ///
    /// Simple heuristic: Abort youngest transaction (highest txn_id).
    /// This reduces cascading aborts.
    pub fn choose_victim(&self, cycle: &[TransactionId]) -> TransactionId {
        // TODO: Implement smarter victim selection strategies:
        // - Youngest transaction (current)
        // - Transaction with least work done
        // - Transaction with most conflicts
        // - Random selection

        *cycle.iter().max().unwrap()
    }

    /// Get current metrics
    pub fn get_metrics(&self) -> ValidatorMetrics {
        self.metrics.read().clone()
    }

    /// Update metrics after validation
    fn update_metrics(&self, start_time: std::time::Instant, cycle_detected: bool, cycle_length: usize) {
        let mut metrics = self.metrics.write();

        metrics.validations_performed += 1;

        if cycle_detected {
            metrics.cycles_detected += 1;
            if cycle_length > metrics.max_cycle_length {
                metrics.max_cycle_length = cycle_length;
            }
        }

        let elapsed_us = start_time.elapsed().as_micros() as f64;
        metrics.avg_validation_time_us =
            (metrics.avg_validation_time_us * (metrics.validations_performed - 1) as f64 + elapsed_us)
            / metrics.validations_performed as f64;
    }
}
```
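The breadth-first search in `detect_cycle` can be tried in isolation on a plain adjacency map. The sketch below is a standard-library illustration, not the validator itself: `find_path` and the `HashMap<u64, Vec<u64>>` graph representation are assumptions, and it marks nodes as visited when they are enqueued rather than when they are dequeued, so each node is queued at most once.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Breadth-first search over outgoing rw-edges.
/// Returns the path from `start` to the first target reached, if any.
/// (`find_path` and the adjacency-map input are hypothetical.)
pub fn find_path(
    out_edges: &HashMap<u64, Vec<u64>>,
    start: u64,
    targets: &HashSet<u64>,
) -> Option<Vec<u64>> {
    let mut visited = HashSet::from([start]);
    let mut parent: HashMap<u64, u64> = HashMap::new();
    let mut queue = VecDeque::from([start]);

    while let Some(current) = queue.pop_front() {
        if targets.contains(&current) {
            // Walk the parent links back to `start`, then reverse.
            let mut path = vec![current];
            let mut node = current;
            while let Some(&prev) = parent.get(&node) {
                path.push(prev);
                node = prev;
            }
            path.reverse();
            return Some(path);
        }

        // A missing entry means the node has no outgoing edges.
        for &next in out_edges.get(&current).into_iter().flatten() {
            // `insert` returns false if `next` was already seen.
            if visited.insert(next) {
                parent.insert(next, current);
                queue.push_back(next);
            }
        }
    }

    None
}
```

For a transaction T, `has_dangerous_structure` runs this kind of search from each member of T's `out_conflict` set with T's `in_conflict` set as the targets; a hit means the edges close a loop through T.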
```rust
// ============================================================================
// Tests
// ============================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use crate::ssi_lock_manager::SireadLockManager;
    use crate::ssi_types::IsolationLevel;

    fn setup_validator() -> (Arc<ConflictDetector>, WriteSkewValidator) {
        let lock_manager = Arc::new(SireadLockManager::new());
        let detector = Arc::new(ConflictDetector::new(lock_manager));
        let validator = WriteSkewValidator::new(Arc::clone(&detector));
        (detector, validator)
    }

    #[test]
    fn test_no_cycle_no_conflicts() {
        let (detector, validator) = setup_validator();

        detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();

        // No conflicts = no cycle
        let has_cycle = validator.has_dangerous_structure(1).unwrap();
        assert!(!has_cycle);
    }

    #[test]
    fn test_no_cycle_linear_chain() {
        let (detector, validator) = setup_validator();

        // T1 --rw--> T2 (no cycle)
        detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
        detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
        detector.register_rw_conflict(1, 2).unwrap();

        // T1 has out_conflict but no in_conflict = no cycle
        let has_cycle = validator.has_dangerous_structure(1).unwrap();
        assert!(!has_cycle);
    }

    #[test]
    fn test_cycle_detection() {
        let (detector, validator) = setup_validator();

        // Create cycle: T1 --rw--> T2 --rw--> T3 --rw--> T1
        detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
        detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
        detector.register_transaction(3, 102, IsolationLevel::Serializable).unwrap();

        detector.register_rw_conflict(1, 2).unwrap(); // T1 --rw--> T2
        detector.register_rw_conflict(2, 3).unwrap(); // T2 --rw--> T3
        detector.register_rw_conflict(3, 1).unwrap(); // T3 --rw--> T1

        // All transactions are in a cycle
        assert!(validator.has_dangerous_structure(1).unwrap());
        assert!(validator.has_dangerous_structure(2).unwrap());
        assert!(validator.has_dangerous_structure(3).unwrap());
    }

    #[test]
    fn test_detect_cycle_bfs() {
        let (detector, validator) = setup_validator();

        // T1 -> T2 -> T3
        detector.register_transaction(1, 100, IsolationLevel::Serializable).unwrap();
        detector.register_transaction(2, 101, IsolationLevel::Serializable).unwrap();
        detector.register_transaction(3, 102, IsolationLevel::Serializable).unwrap();

        detector.register_rw_conflict(1, 2).unwrap();
        detector.register_rw_conflict(2, 3).unwrap();

        let targets = vec![3].into_iter().collect();
        let cycle = validator.detect_cycle(1, &targets);
        assert!(cycle.is_some());
        assert_eq!(cycle.unwrap(), vec![1, 2, 3]);
    }

    #[test]
    fn test_victim_selection() {
        let (_, validator) = setup_validator();

        let cycle = vec![10, 20, 30, 15];

        // Youngest transaction (highest ID)
        let victim = validator.choose_victim(&cycle);
        assert_eq!(victim, 30);
    }

    // TODO: Add more tests
    // - Complex graphs with multiple cycles
    // - Performance tests (large graphs)
    // - Metrics validation
}
```

Module 4: ssi_types.rs (300 LOC)
```rust
//! Shared types for SSI implementation

use serde::{Deserialize, Serialize};
use std::fmt;

// ============================================================================
// Type Aliases
// ============================================================================

/// Transaction identifier
pub type TransactionId = u64;

/// Timestamp (microseconds since UNIX epoch)
pub type Timestamp = u64;

/// Predicate lock identifier
pub type PredicateLockId = u64;

// ============================================================================
// Enums
// ============================================================================

/// Transaction isolation level
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
}

/// Transaction state
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransactionState {
    Active,
    Preparing,
    Prepared,
    Committing,
    Committed,
    Aborted,
}
```
```rust
// ============================================================================
// Error Types
// ============================================================================

/// SSI-specific errors
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SsiError {
    /// Serialization failure (SQLSTATE 40001)
    SerializationFailure(String),

    /// Deadlock detected (SQLSTATE 40P01)
    DeadlockDetected(TransactionId),

    /// Lock timeout
    LockTimeout(TransactionId),

    /// Transaction not found
    TransactionNotFound(TransactionId),

    /// Invalid state
    InvalidState(String),

    /// Internal error
    InternalError(String),
}

impl fmt::Display for SsiError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            SsiError::SerializationFailure(msg) => {
                write!(f, "Serialization failure: {}", msg)
            }
            SsiError::DeadlockDetected(txn_id) => {
                write!(f, "Deadlock detected for transaction {}", txn_id)
            }
            SsiError::LockTimeout(txn_id) => {
                write!(f, "Lock timeout for transaction {}", txn_id)
            }
            SsiError::TransactionNotFound(txn_id) => {
                write!(f, "Transaction {} not found", txn_id)
            }
            SsiError::InvalidState(msg) => {
                write!(f, "Invalid state: {}", msg)
            }
            SsiError::InternalError(msg) => {
                write!(f, "Internal error: {}", msg)
            }
        }
    }
}

impl std::error::Error for SsiError {}

/// Result type for SSI operations
pub type Result<T> = std::result::Result<T, SsiError>;

// ============================================================================
// Conversion Implementations
// ============================================================================

impl From<SsiError> for String {
    fn from(err: SsiError) -> String {
        err.to_string()
    }
}
```
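The error comments attach SQLSTATE codes to two variants. One way a caller might use them is sketched below with a trimmed, standalone copy of the enum. The `sqlstate`, `is_retryable`, and `run_with_retry` helpers are hypothetical additions, and treating exactly those two variants as retryable is an assumption of this sketch rather than something the templates specify.

```rust
/// Trimmed copy of the template's error enum so the sketch compiles alone.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SsiError {
    SerializationFailure(String),
    DeadlockDetected(u64),
    LockTimeout(u64),
    InternalError(String),
}

impl SsiError {
    /// SQLSTATE for the variants whose code appears in the template comments.
    /// Other variants return None; choosing codes for them is left open.
    pub fn sqlstate(&self) -> Option<&'static str> {
        match self {
            SsiError::SerializationFailure(_) => Some("40001"),
            SsiError::DeadlockDetected(_) => Some("40P01"),
            _ => None,
        }
    }

    /// Assumption: only the two SQLSTATE-mapped variants are worth retrying.
    pub fn is_retryable(&self) -> bool {
        self.sqlstate().is_some()
    }
}

/// Call `attempt` (passing the 1-based attempt number) until it succeeds,
/// fails with a non-retryable error, or `max_attempts` is reached.
/// At least one attempt is always made.
pub fn run_with_retry<T>(
    max_attempts: u32,
    mut attempt: impl FnMut(u32) -> Result<T, SsiError>,
) -> Result<T, SsiError> {
    let mut n = 0;
    loop {
        n += 1;
        match attempt(n) {
            Err(e) if e.is_retryable() && n < max_attempts => continue,
            other => return other,
        }
    }
}
```

A coordinator built this way would rerun the whole transaction body inside `attempt`, since a serialization failure invalidates everything the transaction read.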
```rust
// TODO: Implement conversions to database-specific error types
// impl From<SsiError> for PgError { ... }
// impl From<SsiError> for MySqlError { ... }
```

Module 5: lib.rs (200 LOC)
````rust
//! HeliosDB SSI (Serializable Snapshot Isolation) Implementation
//!
//! This crate provides the SSI layer for HeliosDB, implementing true
//! serializability on top of snapshot isolation using the PostgreSQL
//! SSI algorithm.
//!
//! # Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │                   Transaction Coordinator                   │
//! └─────────────────────┬───────────────────────────────────────┘
//!                       │
//!                       ↓
//! ┌─────────────────────────────────────────────────────────────┐
//! │                          SSI Layer                          │
//! ├─────────────────────────────────────────────────────────────┤
//! │  - SireadLockManager: SIREAD lock acquisition               │
//! │  - ConflictDetector: rw-conflict tracking                   │
//! │  - WriteSkewValidator: Cycle detection                      │
//! └─────────────────────┬───────────────────────────────────────┘
//!                       │
//!                       ↓
//! ┌─────────────────────────────────────────────────────────────┐
//! │                     MVCC Storage Layer                      │
//! └─────────────────────────────────────────────────────────────┘
//! ```
//!
//! # Usage
//!
//! ```rust
//! use heliosdb_ssi::{SireadLockManager, ConflictDetector, WriteSkewValidator};
//! use std::sync::Arc;
//!
//! // Initialize SSI components
//! let lock_manager = Arc::new(SireadLockManager::new());
//! let conflict_detector = Arc::new(ConflictDetector::new(Arc::clone(&lock_manager)));
//! let validator = WriteSkewValidator::new(Arc::clone(&conflict_detector));
//!
//! // Use in transaction coordinator
//! // (see SSI_SERIALIZABLE_SNAPSHOT_ISOLATION_ARCHITECTURE.md for details)
//! ```

// Needed by `initialize` below
use std::sync::Arc;

// Public modules
pub mod ssi_lock_manager;
pub mod conflict_detector;
pub mod write_skew_validator;
pub mod ssi_types;

// Re-export main types
pub use ssi_lock_manager::{SireadLockManager, Predicate, SireadMetrics};
pub use conflict_detector::{ConflictDetector, Conflict, ConflictType, ConflictInfo};
pub use write_skew_validator::{WriteSkewValidator, ValidatorMetrics};
pub use ssi_types::{TransactionId, Timestamp, IsolationLevel, TransactionState, SsiError, Result};

// Version information
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
pub const SSI_ALGORITHM_VERSION: &str = "PostgreSQL-SSI-1.0";

/// Initialize SSI subsystem
///
/// Returns configured SSI components ready for use.
pub fn initialize() -> (
    Arc<SireadLockManager>,
    Arc<ConflictDetector>,
    WriteSkewValidator,
) {
    let lock_manager = Arc::new(SireadLockManager::new());
    let conflict_detector = Arc::new(ConflictDetector::new(Arc::clone(&lock_manager)));
    let validator = WriteSkewValidator::new(Arc::clone(&conflict_detector));

    (lock_manager, conflict_detector, validator)
}
````
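How the three components cooperate on a read, a write, and a commit check can be illustrated with a small self-contained model of the classic write-skew schedule. Everything here is hypothetical (`MiniSsi`, its methods, string keys) and single-threaded; the real coordinator integration is described in the architecture document and is not reproduced here.

```rust
use std::collections::{HashMap, HashSet};

/// Tiny single-threaded model of the read / write / commit-check path.
/// (`MiniSsi` is a hypothetical type, not the crate's API.)
#[derive(Default)]
pub struct MiniSsi {
    readers: HashMap<&'static str, HashSet<u64>>, // key -> txns that read it
    out_edges: HashMap<u64, HashSet<u64>>,        // reader -> writers
    in_edges: HashMap<u64, HashSet<u64>>,         // writer -> readers
}

impl MiniSsi {
    /// A read only records a SIREAD lock; it never blocks.
    pub fn read(&mut self, txn: u64, key: &'static str) {
        self.readers.entry(key).or_default().insert(txn);
    }

    /// A write adds reader --rw--> writer for every other reader of `key`.
    pub fn write(&mut self, txn: u64, key: &'static str) {
        let readers: Vec<u64> =
            self.readers.get(key).into_iter().flatten().copied().collect();
        for reader in readers {
            if reader != txn {
                self.out_edges.entry(reader).or_default().insert(txn);
                self.in_edges.entry(txn).or_default().insert(reader);
            }
        }
    }

    /// Commit check: false if following outgoing edges from `txn` reaches
    /// a transaction that has an edge back into `txn` (a cycle).
    pub fn can_commit(&self, txn: u64) -> bool {
        let targets = match self.in_edges.get(&txn) {
            Some(t) => t,
            None => return true,
        };
        let mut stack: Vec<u64> =
            self.out_edges.get(&txn).into_iter().flatten().copied().collect();
        let mut seen = HashSet::new();
        while let Some(t) = stack.pop() {
            if targets.contains(&t) {
                return false;
            }
            if seen.insert(t) {
                stack.extend(self.out_edges.get(&t).into_iter().flatten().copied());
            }
        }
        true
    }

    /// Forget an aborted transaction: its locks and every edge naming it.
    pub fn abort(&mut self, txn: u64) {
        for set in self.readers.values_mut() {
            set.remove(&txn);
        }
        self.out_edges.remove(&txn);
        self.in_edges.remove(&txn);
        for set in self.out_edges.values_mut().chain(self.in_edges.values_mut()) {
            set.remove(&txn);
        }
    }
}
```

In the write-skew schedule, T1 and T2 both read `x` and `y`, then T1 writes `x` and T2 writes `y`. That produces the edges T2 → T1 and T1 → T2, so neither passes the commit check until one of them is aborted.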
```rust
#[cfg(test)]
mod integration_tests {
    use super::*;
    use bytes::Bytes;

    #[test]
    fn test_ssi_initialization() {
        let (lock_manager, conflict_detector, validator) = initialize();

        // Verify components initialized
        assert_eq!(lock_manager.get_metrics().active_key_locks, 0);
        assert_eq!(conflict_detector.get_metrics().active_transactions, 0);
        assert_eq!(validator.get_metrics().validations_performed, 0);
    }

    // TODO: Add end-to-end integration tests
    // - Full transaction lifecycle with SSI
    // - Write skew prevention
    // - Phantom read prevention
    // - Distributed SSI (2PC integration)
}
```

Cargo.toml
```toml
[package]
name = "heliosdb-ssi"
version = "0.1.0"
edition = "2021"
authors = ["HeliosDB Team"]
description = "Serializable Snapshot Isolation implementation for HeliosDB"
license = "Apache-2.0"

[dependencies]
# Concurrency
dashmap = "5.5"
parking_lot = "0.12"

# Data structures
# The "serde" feature is required because Predicate derives
# Serialize/Deserialize over Bytes fields
bytes = { version = "1.5", features = ["serde"] }

# Serialization
serde = { version = "1.0", features = ["derive"] }

# Async runtime (if needed)
tokio = { version = "1.35", features = ["sync"], optional = true }

[dev-dependencies]
# Testing
tokio = { version = "1.35", features = ["full"] }
criterion = "0.5"

[features]
default = []
async = ["tokio"]

[[bench]]
name = "ssi_benchmarks"
harness = false
```

README.md for heliosdb-ssi
# HeliosDB SSI (Serializable Snapshot Isolation)
Implementation of Serializable Snapshot Isolation for HeliosDB, based on the PostgreSQL SSI algorithm.
## Features
- Non-blocking SIREAD locks
- Predicate locks for phantom read prevention
- Efficient rw-conflict detection
- Cycle detection for dangerous structures
- Performance target: <10% overhead
## Architecture
See [SSI_SERIALIZABLE_SNAPSHOT_ISOLATION_ARCHITECTURE.md](../../docs/architecture/SSI_SERIALIZABLE_SNAPSHOT_ISOLATION_ARCHITECTURE.md) for complete architecture.
## Module Overview
- **ssi_lock_manager**: SIREAD lock acquisition and management
- **conflict_detector**: rw-conflict detection and tracking
- **write_skew_validator**: Dangerous structure (cycle) detection
- **ssi_types**: Shared types and error definitions
## Usage
```rust
use heliosdb_ssi::{initialize, IsolationLevel};

// Initialize SSI
let (lock_manager, conflict_detector, validator) = initialize();

// Integrate with transaction coordinator
// (see architecture document for details)
```

## Testing

```bash
# Run all tests
cargo test

# Run benchmarks
cargo bench

# Generate coverage report
cargo tarpaulin --out Html
```

## Performance Targets

- SIREAD lock acquisition: <100ns (p99)
- Conflict detection: <500ns (p99)
- Cycle detection: <1ms (p99)
- TPC-C throughput: >5,000 tpmC
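
The latency targets above are stated at p99. As a rough illustration of what that means, the sketch below computes a nearest-rank percentile over individually timed calls using only the standard library. `percentile` and `measure_p99` are hypothetical helpers; for operations in the 100 ns range the cost of reading the clock is comparable to the operation itself, so a benchmark harness such as the Criterion dev-dependency is likely the better tool for real measurements.

```rust
use std::time::{Duration, Instant};

/// Nearest-rank percentile of a set of samples (`pct` in 1..=100).
/// Sorts the slice in place; returns None for empty input or invalid `pct`.
pub fn percentile(samples: &mut [Duration], pct: usize) -> Option<Duration> {
    if samples.is_empty() || pct == 0 || pct > 100 {
        return None;
    }
    samples.sort_unstable();
    // rank = ceil(pct / 100 * n), computed in integer arithmetic
    let rank = (pct * samples.len() + 99) / 100;
    Some(samples[rank - 1])
}

/// Time `iterations` calls of `op` one by one and return the p99 latency.
pub fn measure_p99(iterations: usize, mut op: impl FnMut()) -> Option<Duration> {
    let mut samples = Vec::with_capacity(iterations);
    for _ in 0..iterations {
        let start = Instant::now();
        op();
        samples.push(start.elapsed());
    }
    percentile(&mut samples, 99)
}
```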

## Implementation Timeline

- Weeks 1-4: Algorithm design & module specifications
- Weeks 5-10: Core implementation
- Weeks 11-13: Integration & testing
- Weeks 14-15: Production hardening

## References

- Serializable Snapshot Isolation in PostgreSQL - Ports et al., VLDB 2012
- PostgreSQL source: `src/backend/storage/lmgr/predicate.c`

---
## Next Steps for Engineers
1. **Week 1:** Review architecture document, set up module scaffolding
2. **Week 2:** Implement `ssi_lock_manager.rs` (Engineer 1)
3. **Week 3:** Implement `conflict_detector.rs` (Engineer 2)
4. **Week 4:** Implement `write_skew_validator.rs` (Engineer 3)
5. **Week 5+:** Integration testing and production hardening
All TODO comments in the code indicate areas requiring implementation logic.
**END OF MODULE TEMPLATES**