# Branch Storage Architecture - Part 3 of 3
Operations & Testing
```rust
impl BranchMetadataCache {
    pub fn new(capacity: usize) -> Self {
        Self {
            cache: Arc::new(RwLock::new(LruCache::new(capacity))),
            capacity,
        }
    }

    pub fn get(&self, branch_id: BranchId) -> Option<BranchMetadata> {
        // LruCache::get updates recency order, so a write lock is required
        self.cache.write().get(&branch_id).cloned()
    }

    pub fn put(&self, metadata: BranchMetadata) {
        self.cache.write().put(metadata.branch_id, metadata);
    }
}
```
#### 11.2.2 Parent Chain Caching
```rust
impl BranchTransaction {
    /// Build and cache parent chain
    fn build_parent_chain(
        branch: &BranchMetadata,
        storage: &StorageEngine,
    ) -> Result<Vec<(BranchId, SnapshotId)>> {
        let mut chain = Vec::new();
        let mut current_id = branch.parent_id;

        while let Some(parent_id) = current_id {
            let parent = get_branch_metadata(storage, parent_id)?;
            chain.push((parent_id, parent.created_from_snapshot));
            current_id = parent.parent_id;
        }

        Ok(chain)
    }
}
```

#### 11.2.3 Bloom Filters for Modified Keys
```rust
/// Track modified keys per branch with a bloom filter
pub struct BranchBloomFilter {
    /// Bloom filter for modified keys
    filter: BloomFilter,

    /// False positive rate
    fpr: f64,
}

impl BranchTransaction {
    /// Check if key might be modified in this branch
    fn key_might_exist_in_branch(&self, key: &str) -> bool {
        if let Some(filter) = &self.bloom_filter {
            filter.contains(key)
        } else {
            true // Conservative: assume it might exist
        }
    }

    pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> {
        // Fast path: check bloom filter
        let key_str = String::from_utf8_lossy(key);

        if !self.key_might_exist_in_branch(&key_str) {
            // Definitely not in this branch - go to parent
            return self.get_from_parent_chain(key);
        }

        // Slow path: actual lookup
        // ... rest of implementation
    }
}
```

#### 11.2.4 Lazy Parent Reads
```rust
/// Lazy iterator over parent chain
struct ParentChainIterator<'a> {
    storage: &'a StorageEngine,
    current_branch: Option<BranchId>,
    current_snapshot: SnapshotId,
}

impl<'a> Iterator for ParentChainIterator<'a> {
    type Item = (BranchId, SnapshotId);

    fn next(&mut self) -> Option<Self::Item> {
        let branch_id = self.current_branch?;
        let snapshot = self.current_snapshot;

        // Load next parent lazily
        if let Ok(metadata) = get_branch_metadata(self.storage, branch_id) {
            self.current_branch = metadata.parent_id;
            self.current_snapshot = metadata.created_from_snapshot;
        } else {
            self.current_branch = None;
        }

        Some((branch_id, snapshot))
    }
}
```

### 11.3 Space Efficiency
```
Storage Breakdown (per branch):
────────────────────────────────────────────
Metadata:            ~500 bytes
Registry entry:      ~50 bytes
Parent/child index:  ~100 bytes
Bloom filter:        ~10KB (optional)
────────────────────────────────────────────
Total overhead:      ~11KB per branch

Data storage: Only modified data (copy-on-write)
```

Example:
- 1GB database
- 5 branches
- 10% data modified per branch

Total storage:
- Original: 1GB
- Branches: 5 × (11KB + 100MB) ≈ 500MB
- Total: 1.5GB (50% overhead)
- Per branch: 100MB / 1GB = 10% (as expected)

### 11.4 Compaction Impact
```rust
/// Custom compaction filter for branch GC
struct BranchCompactionFilter {
    active_branches: HashSet<BranchId>,
}

impl rocksdb::CompactionFilter for BranchCompactionFilter {
    // Returning `true` removes the entry during compaction
    fn filter(&mut self, _level: u32, key: &[u8], _value: &[u8]) -> bool {
        // Parse branch ID from key
        if let Some(branch_id) = extract_branch_id(key) {
            // Keep entries of active branches; drop entries of dead ones
            return !self.active_branches.contains(&branch_id);
        }
        false // Keep non-branch keys
    }
}
```

## 12. Testing Strategy
### 12.1 Unit Tests
```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_branch_metadata_only() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Create branch
        let branch_id = create_branch(
            &storage,
            "dev",
            None,
            AsOfClause::Now,
            BranchOptions::default(),
        ).unwrap();

        // Verify metadata exists
        let metadata = get_branch_metadata(&storage, branch_id).unwrap();
        assert_eq!(metadata.name, "dev");
        assert_eq!(metadata.state, BranchState::Active);
    }

    #[test]
    fn test_copy_on_write_first_write() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Insert in main
        storage.put(b"key1", b"value1").unwrap();

        // Create branch
        create_branch(&storage, "dev", None, AsOfClause::Now, Default::default()).unwrap();

        // Read from dev (should see main's value)
        let mut tx = storage.begin_branch_transaction("dev").unwrap();
        assert_eq!(tx.get(b"key1").unwrap(), Some(b"value1".to_vec()));

        // Write to dev (copy-on-write)
        tx.put(b"key1".to_vec(), b"value2".to_vec()).unwrap();
        tx.commit().unwrap();

        // Verify isolation
        assert_eq!(
            storage.get(b"key1").unwrap(),
            Some(b"value1".to_vec())
        ); // main unchanged

        let tx_dev = storage.begin_branch_transaction("dev").unwrap();
        assert_eq!(
            tx_dev.get(b"key1").unwrap(),
            Some(b"value2".to_vec())
        ); // dev has new value
    }

    #[test]
    fn test_branch_isolation() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Create two branches
        create_branch(&storage, "branch_a", None, AsOfClause::Now, Default::default()).unwrap();
        create_branch(&storage, "branch_b", None, AsOfClause::Now, Default::default()).unwrap();

        // Write to branch_a
        let mut tx_a = storage.begin_branch_transaction("branch_a").unwrap();
        tx_a.put(b"key1".to_vec(), b"value_a".to_vec()).unwrap();
        tx_a.commit().unwrap();

        // Write to branch_b
        let mut tx_b = storage.begin_branch_transaction("branch_b").unwrap();
        tx_b.put(b"key1".to_vec(), b"value_b".to_vec()).unwrap();
        tx_b.commit().unwrap();

        // Verify isolation
        let tx_a = storage.begin_branch_transaction("branch_a").unwrap();
        assert_eq!(tx_a.get(b"key1").unwrap(), Some(b"value_a".to_vec()));

        let tx_b = storage.begin_branch_transaction("branch_b").unwrap();
        assert_eq!(tx_b.get(b"key1").unwrap(), Some(b"value_b".to_vec()));
    }

    #[test]
    fn test_parent_chain_resolution() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Insert in main
        storage.put(b"key1", b"value1").unwrap();

        // Create child branch
        create_branch(&storage, "level1", None, AsOfClause::Now, Default::default()).unwrap();

        // Create grandchild branch
        create_branch(&storage, "level2", Some("level1"), AsOfClause::Now, Default::default()).unwrap();

        // Read from grandchild (should see main's value)
        let tx = storage.begin_branch_transaction("level2").unwrap();
        assert_eq!(tx.get(b"key1").unwrap(), Some(b"value1".to_vec()));
    }

    #[test]
    fn test_three_way_merge_no_conflicts() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Setup: main has key1=v1
        storage.put(b"key1", b"v1").unwrap();

        // Create branch
        create_branch(&storage, "dev", None, AsOfClause::Now, Default::default()).unwrap();

        // Modify in dev: key1=v2
        let mut tx = storage.begin_branch_transaction("dev").unwrap();
        tx.put(b"key1".to_vec(), b"v2".to_vec()).unwrap();
        tx.commit().unwrap();

        // Merge dev into main
        let result = merge_branch(
            &storage,
            "dev",
            "main",
            MergeOptions {
                conflict_resolution: ConflictResolution::BranchWins,
                delete_branch_after: false,
            },
        ).unwrap();

        assert_eq!(result.conflicts_count, 0);
        assert_eq!(storage.get(b"key1").unwrap(), Some(b"v2".to_vec()));
    }

    #[test]
    fn test_merge_with_conflicts() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Setup: main has key1=v1
        storage.put(b"key1", b"v1").unwrap();

        // Create branch
        create_branch(&storage, "dev", None, AsOfClause::Now, Default::default()).unwrap();

        // Modify same key in both branches
        storage.put(b"key1", b"v2_main").unwrap();

        let mut tx = storage.begin_branch_transaction("dev").unwrap();
        tx.put(b"key1".to_vec(), b"v2_dev".to_vec()).unwrap();
        tx.commit().unwrap();

        // Merge with FAIL resolution
        let result = merge_branch(
            &storage,
            "dev",
            "main",
            MergeOptions {
                conflict_resolution: ConflictResolution::Fail,
                delete_branch_after: false,
            },
        );

        assert!(result.is_err()); // Should fail due to conflict
    }
}
```

### 12.2 Integration Tests
```rust
#[cfg(test)]
mod integration_tests {
    use super::*;

    #[test]
    fn test_branching_sql_integration() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Execute SQL commands
        execute_sql(&storage, "CREATE TABLE users (id INT, name TEXT)").unwrap();
        execute_sql(&storage, "INSERT INTO users VALUES (1, 'Alice')").unwrap();

        // Create branch via SQL
        execute_sql(&storage, "CREATE DATABASE BRANCH dev FROM CURRENT AS OF NOW").unwrap();

        // Insert in dev
        execute_sql(&storage, "SET branch = dev").unwrap();
        execute_sql(&storage, "INSERT INTO users VALUES (2, 'Bob')").unwrap();

        // Verify isolation
        execute_sql(&storage, "SET branch = main").unwrap();
        let result = query_sql(&storage, "SELECT * FROM users").unwrap();
        assert_eq!(result.len(), 1); // Only Alice

        execute_sql(&storage, "SET branch = dev").unwrap();
        let result = query_sql(&storage, "SELECT * FROM users").unwrap();
        assert_eq!(result.len(), 2); // Alice and Bob
    }

    #[test]
    fn test_concurrent_branch_transactions() {
        use std::sync::Arc;
        use std::thread;

        let config = Config::in_memory();
        let storage = Arc::new(StorageEngine::open_in_memory(&config).unwrap());

        // Create branches
        create_branch(&storage, "branch1", None, AsOfClause::Now, Default::default()).unwrap();
        create_branch(&storage, "branch2", None, AsOfClause::Now, Default::default()).unwrap();

        let storage1 = Arc::clone(&storage);
        let storage2 = Arc::clone(&storage);

        // Concurrent writes to different branches
        let t1 = thread::spawn(move || {
            for i in 0..100 {
                let mut tx = storage1.begin_branch_transaction("branch1").unwrap();
                tx.put(format!("key{}", i).into_bytes(), b"value1".to_vec()).unwrap();
                tx.commit().unwrap();
            }
        });

        let t2 = thread::spawn(move || {
            for i in 0..100 {
                let mut tx = storage2.begin_branch_transaction("branch2").unwrap();
                tx.put(format!("key{}", i).into_bytes(), b"value2".to_vec()).unwrap();
                tx.commit().unwrap();
            }
        });

        t1.join().unwrap();
        t2.join().unwrap();

        // Verify isolation
        let tx1 = storage.begin_branch_transaction("branch1").unwrap();
        assert_eq!(tx1.get(b"key50").unwrap(), Some(b"value1".to_vec()));

        let tx2 = storage.begin_branch_transaction("branch2").unwrap();
        assert_eq!(tx2.get(b"key50").unwrap(), Some(b"value2".to_vec()));
    }
}
```

### 12.3 Performance Tests
```rust
#[cfg(test)]
mod perf_tests {
    use super::*;
    use std::time::Instant;

    #[test]
    fn bench_branch_creation() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Insert 1M rows
        for i in 0..1_000_000 {
            storage.put(format!("key{}", i).as_bytes(), b"value").unwrap();
        }

        // Benchmark branch creation
        let start = Instant::now();
        create_branch(&storage, "bench", None, AsOfClause::Now, Default::default()).unwrap();
        let elapsed = start.elapsed();

        println!("Branch creation: {:?}", elapsed);
        assert!(elapsed.as_millis() < 10, "Branch creation too slow: {:?}", elapsed);
    }

    #[test]
    fn bench_branch_read_overhead() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Insert 100K rows
        for i in 0..100_000 {
            storage.put(format!("key{}", i).as_bytes(), b"value").unwrap();
        }

        // Create branch
        create_branch(&storage, "bench", None, AsOfClause::Now, Default::default()).unwrap();

        // Benchmark main reads
        let start = Instant::now();
        for i in 0..10_000 {
            storage.get(format!("key{}", i).as_bytes()).unwrap();
        }
        let main_time = start.elapsed();

        // Benchmark branch reads; reuse one transaction so we measure
        // read overhead, not transaction setup
        let tx = storage.begin_branch_transaction("bench").unwrap();
        let start = Instant::now();
        for i in 0..10_000 {
            tx.get(format!("key{}", i).as_bytes()).unwrap();
        }
        let branch_time = start.elapsed();

        let overhead = (branch_time.as_micros() as f64 / main_time.as_micros() as f64 - 1.0) * 100.0;
        println!("Branch read overhead: {:.2}%", overhead);
        assert!(overhead < 10.0, "Branch read overhead too high: {:.2}%", overhead);
    }
}
```

### 12.4 Stress Tests
```rust
#[cfg(test)]
mod stress_tests {
    use super::*;

    #[test]
    fn stress_test_many_branches() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Create 100 branches
        for i in 0..100 {
            create_branch(
                &storage,
                &format!("branch{}", i),
                None,
                AsOfClause::Now,
                Default::default(),
            ).unwrap();
        }

        // Verify all branches are accessible
        for i in 0..100 {
            let tx = storage.begin_branch_transaction(&format!("branch{}", i)).unwrap();
            assert!(tx.get(b"key").unwrap().is_none());
        }
    }

    #[test]
    fn stress_test_deep_hierarchy() {
        let config = Config::in_memory();
        let storage = StorageEngine::open_in_memory(&config).unwrap();

        // Insert at root first, so every AS OF NOW snapshot includes the key
        storage.put(b"key", b"value").unwrap();

        // Create 50-level deep hierarchy
        let mut parent = None;
        for i in 0..50 {
            let branch_name = format!("level{}", i);
            create_branch(
                &storage,
                &branch_name,
                parent.as_deref(),
                AsOfClause::Now,
                Default::default(),
            ).unwrap();
            parent = Some(branch_name);
        }

        // Read from deepest branch (should traverse 50 parents)
        let tx = storage.begin_branch_transaction("level49").unwrap();
        assert_eq!(tx.get(b"key").unwrap(), Some(b"value".to_vec()));
    }
}
```

## Appendix A: SQL Examples
```sql
-- Create a development branch from current state
CREATE DATABASE BRANCH dev FROM CURRENT AS OF NOW;

-- Create a branch from a specific timestamp
CREATE DATABASE BRANCH staging
FROM main
AS OF TIMESTAMP '2025-11-18 10:00:00';

-- Create a branch from a transaction ID
CREATE DATABASE BRANCH hotfix
FROM production
AS OF TRANSACTION 123456;

-- Create a branch with options
CREATE DATABASE BRANCH experiment
FROM main
AS OF NOW
WITH (
    replication_factor = 3,
    region = 'us-west'
);

-- Switch to a branch (session-level)
SET branch = dev;

-- Insert data in branch
INSERT INTO users (name) VALUES ('Alice');

-- Query branch
SELECT * FROM users;

-- Merge branch back to main
MERGE DATABASE BRANCH dev INTO main
WITH (
    conflict_resolution = 'branch_wins',
    delete_branch_after = true
);

-- Drop a branch
DROP DATABASE BRANCH IF EXISTS experiment;

-- List all branches (system view)
SELECT * FROM pg_database_branches();

-- View branch hierarchy
SELECT name, parent_name, created_at, state, modified_keys
FROM pg_database_branches()
ORDER BY created_at;
```

## Appendix B: Architecture Decision Records
### ADR-1: Use RocksDB Native MVCC
Decision: Leverage RocksDB’s existing MVCC capabilities rather than implementing a separate versioning layer.
Rationale:
- RocksDB provides efficient multi-version storage
- Reduces implementation complexity
- Better integration with existing storage layer
Consequences:
- Branch versioning builds on RocksDB timestamps
- Natural alignment with existing transaction system
- Minor overhead for branch metadata
### ADR-2: Branch ID in Physical Key
Decision: Encode branch ID as part of the physical key rather than using column families.
Rationale:
- Simpler key space management
- Efficient range scans within a branch
- Natural clustering for compaction
Alternatives Considered:
- Column families per branch: Complex management, limited to ~10K branches
- Separate RocksDB instances: Too much overhead
Consequences:
- Keys are slightly larger (8 bytes for branch ID)
- Efficient branch isolation and GC
### ADR-3: Lazy Parent Resolution
Decision: Resolve parent values on-demand during reads rather than eagerly copying.
Rationale:
- Instant branch creation
- Minimal storage overhead
- Most data is never modified in branches
Consequences:
- Slight read overhead for unchanged data
- Complex read path with parent chain traversal
- Excellent storage efficiency
### ADR-4: Three-Way Merge Algorithm
Decision: Use three-way merge with configurable conflict resolution.
Rationale:
- Standard approach in version control systems
- Handles complex merge scenarios
- Supports automatic conflict resolution
Alternatives Considered:
- Two-way merge: Cannot detect concurrent modifications
- CRDTs: Too complex for general key-value storage
Consequences:
- Requires merge base tracking
- Supports flexible conflict resolution strategies
## Appendix C: Future Enhancements
### Distributed Branching
Support branches across distributed HeliosDB clusters:
```rust
pub struct DistributedBranchOptions {
    /// Replication factor
    pub replication_factor: usize,

    /// Preferred region
    pub region: String,

    /// Cross-region replication
    pub cross_region: bool,
}
```

### Branch Snapshots
Create lightweight snapshots within branches:
```sql
CREATE SNAPSHOT dev_snapshot_1 FROM BRANCH dev AS OF NOW;
```

### Branch Permissions
Fine-grained access control per branch:
```sql
GRANT SELECT, INSERT ON BRANCH dev TO user_alice;
REVOKE DELETE ON BRANCH production FROM user_bob;
```

### Branch Triggers
Execute triggers on branch events:
```sql
CREATE TRIGGER on_branch_merge
AFTER MERGE ON BRANCH dev
EXECUTE PROCEDURE notify_deployment();
```

## Summary
This architecture provides:
- Efficient Copy-on-Write: Zero-copy branch creation, minimal storage overhead
- Strong Isolation: Branch-aware MVCC with snapshot isolation
- Flexible Merging: Three-way merge with conflict detection and resolution
- Automatic GC: Reference-counting garbage collection for dropped branches
- MVCC Integration: Seamless integration with existing transaction system
Next Steps:
- Implement core data structures (BranchMetadata, BranchManager)
- Extend StorageEngine with branch-aware transactions
- Implement copy-on-write read/write paths
- Implement three-way merge algorithm
- Add garbage collection
- Integration with SQL parser and executor
- Comprehensive testing (unit, integration, performance)
Implementation Effort: 6-8 weeks
- Week 1-2: Core structures and metadata management
- Week 3-4: Copy-on-write read/write paths
- Week 5: Merge algorithm
- Week 6: Garbage collection
- Week 7: SQL integration
- Week 8: Testing and optimization