Branch Storage Architecture - Part 2 of 3
Branch Storage Architecture - Part 2 of 3
API & Implementation
Navigation: Index | ← Part 1 | Part 2 | Part 3 →
// 2. Find merge base (common ancestor)let merge_base = find_merge_base(storage, source.branch_id, target.branch_id)?;
// 3. Three-way merge: base -> source, base -> targetlet conflicts = perform_three_way_merge( storage, merge_base, source.branch_id, target.branch_id, &options,)?;
// 4. Handle conflicts based on resolution strategyif !conflicts.is_empty() { match options.conflict_resolution { ConflictResolution::Fail => { return Err(Error::merge_conflicts(conflicts)); } ConflictResolution::BranchWins => { // Source wins - already applied } ConflictResolution::TargetWins => { // Revert to target versions revert_conflicts_to_target(storage, &conflicts)?; } }}
// 5. Update target's merge_baselet mut target_meta = target.clone();target_meta.merge_base = Some(source.created_from_snapshot);save_branch_metadata(storage, &target_meta)?;
// 6. Mark source as mergedif options.delete_branch_after { let mut source_meta = source.clone(); source_meta.state = BranchState::Merged { into_branch: target.branch_id, at_timestamp: storage.current_timestamp(), }; save_branch_metadata(storage, &source_meta)?;}
Ok(MergeResult { conflicts_count: conflicts.len(), keys_merged: 0, // TODO: track resolution: options.conflict_resolution,})}
### 6.2 Branch-Aware Transaction API
```rustimpl StorageEngine { /// Begin transaction on a specific branch pub fn begin_branch_transaction( &self, branch_name: &str, ) -> Result<BranchTransaction> { let branch_mgr = BranchManager::new(Arc::clone(&self.db))?; let branch = branch_mgr.get_branch(branch_name)?;
// Create MVCC snapshot let snapshot_id = self.next_timestamp(); let tx = Transaction::new(Arc::clone(&self.db), snapshot_id)?;
// Build parent chain for reads let parent_chain = build_parent_chain(&branch)?;
Ok(BranchTransaction { tx, branch_id: branch.branch_id, branch_meta: branch, parent_chain, }) }}
impl BranchTransaction { /// Get value with branch hierarchy resolution pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> { // Try current branch let branch_key = encode_branch_data_key( self.branch_id, &String::from_utf8_lossy(key), self.tx.snapshot_id(), );
if let Some(value) = self.tx.get(&branch_key)? { return Ok(Some(value)); }
// Walk parent chain for (parent_id, parent_snapshot) in &self.parent_chain { let parent_key = encode_branch_data_key( *parent_id, &String::from_utf8_lossy(key), *parent_snapshot, );
if let Some(value) = self.tx.get(&parent_key)? { return Ok(Some(value)); } }
Ok(None) }
/// Put value with copy-on-write pub fn put(&mut self, key: Key, value: Vec<u8>) -> Result<()> { let branch_key = encode_branch_data_key( self.branch_id, &String::from_utf8_lossy(&key), self.tx.snapshot_id(), );
self.tx.put(branch_key, value) }}7. Branch Isolation
7.1 Isolation Guarantees
Each branch provides snapshot isolation combined with branch isolation:
Isolation Properties:
1. Read Isolation: Reads in branch A don't see uncommitted changes in branch B
2. Write Isolation: Writes in branch A don't affect branch B
3. MVCC Snapshots: Transactions see a consistent snapshot within a branch
4. Parent Visibility: Child branches see parent state as of the branch point
5. No Phantoms: Creating new branches doesn't affect existing transactions

7.2 Visibility Rules
/// Check if a version is visible to a branch transactionfn is_version_visible( version_branch: BranchId, version_timestamp: Timestamp, tx_branch: BranchId, tx_snapshot: SnapshotId, branch_hierarchy: &[(BranchId, SnapshotId)],) -> bool { // 1. Same branch: standard MVCC visibility if version_branch == tx_branch { return version_timestamp <= tx_snapshot; }
// 2. Check if version is in parent chain for (parent_id, parent_snapshot) in branch_hierarchy { if version_branch == *parent_id { // Version must be before parent snapshot return version_timestamp <= *parent_snapshot; } }
// 3. Version is in unrelated branch - not visible false}7.3 Isolation Example
-- Time T0: Create branches
CREATE DATABASE BRANCH dev FROM main AS OF NOW;
CREATE DATABASE BRANCH staging FROM main AS OF NOW;

-- Time T1: Insert in main
INSERT INTO users (name) VALUES ('Alice'); -- main only

-- Time T2: Switch to dev
SET branch = dev;
SELECT * FROM users; -- Empty (dev branched before insert)

-- Time T3: Insert in dev
INSERT INTO users (name) VALUES ('Bob'); -- dev only

-- Time T4: Switch to staging
SET branch = staging;
SELECT * FROM users; -- Empty (isolated from dev)

-- Time T5: Switch to main
SET branch = main;
SELECT * FROM users; -- Returns 'Alice' (isolated from dev)

7.4 Concurrent Operations
Multiple transactions can operate on different branches concurrently:
Timeline:
T0: TxA begins on main
T1: TxB begins on dev
T2: TxA writes key1='v1' on main
T3: TxB writes key1='v2' on dev
T4: TxA commits (main:key1='v1')
T5: TxB commits (dev:key1='v2')

Result:
main: key1='v1' (TxA's write)
dev: key1='v2' (TxB's write)

No conflict - branches are isolated

8. Merge Strategies
8.1 Three-Way Merge Algorithm
/// Perform three-way mergefn perform_three_way_merge( storage: &StorageEngine, base: SnapshotId, source_branch: BranchId, target_branch: BranchId, options: &MergeOptions,) -> Result<Vec<MergeConflict>> { let mut conflicts = Vec::new();
// 1. Find all keys modified in source since base let source_changes = find_changes_since(storage, source_branch, base)?;
// 2. Find all keys modified in target since base let target_changes = find_changes_since(storage, target_branch, base)?;
// 3. For each changed key, determine merge action let all_keys: HashSet<_> = source_changes.keys() .chain(target_changes.keys()) .collect();
for key in all_keys { let base_value = get_at_snapshot(storage, key, base)?; let source_value = source_changes.get(key); let target_value = target_changes.get(key);
match (source_value, target_value) { // Only source changed - apply source (Some(sv), None) => { apply_to_target(storage, target_branch, key, sv.clone())?; }
// Only target changed - keep target (already applied) (None, Some(_)) => { // No action needed }
// Both changed - potential conflict (Some(sv), Some(tv)) => { if sv == tv { // Same change - no conflict continue; }
// Conflict detected conflicts.push(MergeConflict { key: key.clone(), base_value: base_value.clone(), source_value: sv.clone(), target_value: tv.clone(), });
// Apply resolution strategy match options.conflict_resolution { ConflictResolution::BranchWins => { apply_to_target(storage, target_branch, key, sv.clone())?; } ConflictResolution::TargetWins => { // Keep target value (no action) } ConflictResolution::Fail => { // Conflicts will be returned } } }
// Neither changed - no action (None, None) => {} } }
Ok(conflicts)}8.2 Merge Conflict Representation
/// Merge conflict information: the three values seen for one key that was
/// changed differently on both sides of a merge.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MergeConflict {
    /// Conflicting key
    pub key: String,

    /// Value at merge base
    pub base_value: Option<Vec<u8>>,

    /// Value in source branch
    pub source_value: Option<Vec<u8>>,

    /// Value in target branch
    pub target_value: Option<Vec<u8>>,
}
/// Merge result#[derive(Debug)]pub struct MergeResult { /// Number of conflicts detected pub conflicts_count: usize,
/// Number of keys merged pub keys_merged: usize,
/// Resolution strategy used pub resolution: ConflictResolution,}
/// Merge options#[derive(Debug, Clone)]pub struct MergeOptions { /// Conflict resolution strategy pub conflict_resolution: ConflictResolution,
/// Delete source branch after merge pub delete_branch_after: bool,}8.3 Conflict Resolution Strategies
#[derive(Debug, Clone, Copy, PartialEq, Eq)]pub enum ConflictResolution { /// Source branch wins (default) BranchWins,
/// Target branch wins TargetWins,
/// Fail on conflict (manual resolution required) Fail,}8.4 Finding Merge Base
/// Find common ancestor (merge base) of two branchesfn find_merge_base( storage: &StorageEngine, branch_a: BranchId, branch_b: BranchId,) -> Result<SnapshotId> { // Build ancestry chains let ancestors_a = build_ancestry_chain(storage, branch_a)?; let ancestors_b = build_ancestry_chain(storage, branch_b)?;
// Find lowest common ancestor (LCA) for (id_a, snapshot_a) in &ancestors_a { for (id_b, snapshot_b) in &ancestors_b { if id_a == id_b { // Found common ancestor // Use earlier snapshot as merge base return Ok(std::cmp::min(*snapshot_a, *snapshot_b)); } } }
// No common ancestor - use epoch (snapshot 0) Ok(0)}
/// Build ancestry chain from branch to rootfn build_ancestry_chain( storage: &StorageEngine, branch_id: BranchId,) -> Result<Vec<(BranchId, SnapshotId)>> { let mut chain = Vec::new(); let mut current_id = branch_id;
loop { let metadata = get_branch_metadata(storage, current_id)?; chain.push((current_id, metadata.created_from_snapshot));
match metadata.parent_id { Some(parent) => current_id = parent, None => break, // Reached root } }
Ok(chain)}9. Garbage Collection
9.1 GC Strategy
Garbage collection removes unreachable versions:
GC Targets:
1. Dropped branches (soft-deleted)
2. Merged branches (optionally)
3. Old versions no longer visible to any branch
4. Compacted MVCC versions

9.2 Reference Counting
Track references to each version:
/// Version reference tracking#[derive(Debug, Clone, Serialize, Deserialize)]pub struct VersionRefs { /// Version key pub key: String,
/// Branches referencing this version pub branches: HashSet<BranchId>,
/// Active snapshots referencing this version pub snapshots: HashSet<SnapshotId>,}
/// Check if version is reachablefn is_version_reachable( storage: &StorageEngine, version_key: &str,) -> Result<bool> { let refs_key = format!("gc:refs:{}", version_key); let refs_data = storage.get(refs_key.as_bytes())?;
match refs_data { None => Ok(false), // No refs = unreachable Some(data) => { let refs: VersionRefs = bincode::deserialize(&data)?;
// Check if any referencing branch is still active for branch_id in &refs.branches { let meta = get_branch_metadata(storage, *branch_id)?; if meta.state == BranchState::Active { return Ok(true); // Reachable } }
// Check if any snapshot is still active // TODO: Track active snapshots
Ok(false) // Unreachable } }}9.3 GC Process
/// Run garbage collectionpub fn run_gc(storage: &StorageEngine) -> Result<GcStats> { let mut stats = GcStats::default();
// 1. Find all dropped branches let dropped_branches = find_dropped_branches(storage)?;
for branch_id in dropped_branches { // 2. Find all versions owned by this branch let versions = find_branch_versions(storage, branch_id)?;
for version_key in versions { // 3. Check if version is reachable from other branches if !is_version_reachable(storage, &version_key)? { // 4. Delete unreachable version storage.delete(version_key.as_bytes())?; stats.versions_deleted += 1; stats.bytes_freed += version_key.len() as u64; } }
// 5. Delete branch metadata delete_branch_metadata(storage, branch_id)?; stats.branches_deleted += 1; }
// 6. Compact RocksDB to reclaim space storage.db.compact_range(None::<&[u8]>, None::<&[u8]>);
Ok(stats)}
/// Counters reported by one garbage-collection run; all start at zero.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct GcStats {
    /// Number of branches whose metadata was deleted
    pub branches_deleted: usize,
    /// Number of versions deleted
    pub versions_deleted: usize,
    /// Bytes accounted as freed
    pub bytes_freed: u64,
}

9.4 GC Scheduling
/// Background GC configuration
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GcConfig {
    /// Enable automatic GC
    pub enabled: bool,

    /// GC interval (seconds)
    pub interval_secs: u64,

    /// Minimum age before GC (seconds)
    pub min_age_secs: u64,
}

impl Default for GcConfig {
    /// Enabled, running every hour, with a 24-hour minimum age.
    fn default() -> Self {
        Self {
            enabled: true,
            interval_secs: 3600, // 1 hour
            min_age_secs: 86400, // 24 hours
        }
    }
}
/// Start background GC threadpub fn start_gc_thread( storage: Arc<StorageEngine>, config: GcConfig,) -> std::thread::JoinHandle<()> { std::thread::spawn(move || { loop { std::thread::sleep(Duration::from_secs(config.interval_secs));
if config.enabled { match run_gc(&storage) { Ok(stats) => { log::info!("GC completed: {:?}", stats); } Err(e) => { log::error!("GC failed: {}", e); } } } } })}10. MVCC Integration
10.1 Branch-Aware MVCC
Branches integrate with existing MVCC by extending snapshot visibility:
/// Extended snapshot for branch-aware transactions#[derive(Debug, Clone)]pub struct BranchSnapshot { /// Base MVCC snapshot pub base_snapshot: Snapshot,
/// Branch context pub branch_id: BranchId,
/// Parent chain (for visibility checks) pub parent_chain: Vec<(BranchId, SnapshotId)>,}
impl BranchSnapshot { /// Check if version is visible pub fn is_visible( &self, version_branch: BranchId, version_timestamp: u64, ) -> bool { // Current branch: standard MVCC if version_branch == self.branch_id { return self.base_snapshot.is_visible(version_timestamp); }
// Parent chain: check against parent snapshot for (parent_id, parent_snapshot) in &self.parent_chain { if version_branch == *parent_id { return version_timestamp <= *parent_snapshot; } }
false }}10.2 Transaction Isolation Levels
Branches maintain standard isolation levels:
/// Transaction isolation levels.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// Read committed (not supported yet)
    ReadCommitted,

    /// Snapshot isolation (default)
    SnapshotIsolation,

    /// Serializable (not supported yet)
    Serializable,
}

impl Default for IsolationLevel {
    /// Returns `SnapshotIsolation`, the variant documented as the default.
    fn default() -> Self {
        IsolationLevel::SnapshotIsolation
    }
}
impl BranchTransaction { /// Begin with specific isolation level pub fn with_isolation_level( storage: &StorageEngine, branch_name: &str, level: IsolationLevel, ) -> Result<Self> { match level { IsolationLevel::SnapshotIsolation => { storage.begin_branch_transaction(branch_name) } _ => { Err(Error::unsupported_isolation_level(level)) } } }}10.3 Timestamp Allocation
Branch transactions use global timestamp ordering:
impl StorageEngine { /// Get next timestamp (atomic increment) pub fn next_timestamp(&self) -> Timestamp { let mut ts = self.timestamp.write(); *ts += 1; *ts }
/// Get current timestamp (read-only) pub fn current_timestamp(&self) -> Timestamp { *self.timestamp.read() }}10.4 Commit Protocol
impl BranchTransaction { /// Commit with validation pub fn commit(mut self) -> Result<()> { // 1. Validate no conflicts with concurrent commits self.validate_no_conflicts()?;
// 2. Assign commit timestamp let commit_ts = self.storage.next_timestamp();
// 3. Write all buffered changes with commit timestamp for (key, value) in &self.write_set { let versioned_key = encode_branch_data_key( self.branch_id, &String::from_utf8_lossy(key), commit_ts, );
self.tx.put(versioned_key, value.clone())?; }
// 4. Commit underlying transaction self.tx.commit()?;
// 5. Update branch statistics self.update_branch_stats()?;
Ok(()) }
fn validate_no_conflicts(&self) -> Result<()> { // Check for write-write conflicts // (Simplified - full implementation would track write sets) Ok(()) }}11. Performance Considerations
11.1 Performance Targets
Operation               Target Latency        Target Throughput
────────────────────────────────────────────────────────────────
CREATE BRANCH           < 10ms                1000 ops/sec
DROP BRANCH             < 50ms                200 ops/sec
MERGE BRANCH (small)    < 100ms               10 ops/sec
MERGE BRANCH (large)    < 10s                 1 op/10sec

Branch Read (hit)       < 0.1ms overhead      Same as non-branch
Branch Read (parent)    < 0.5ms overhead      90% of non-branch
Branch Write (CoW)      < 0.2ms overhead      95% of non-branch

Storage Overhead        < 2% per branch       -
Memory Overhead         < 100KB per branch    -

11.2 Optimization Techniques
11.2.1 Metadata Caching
/// Branch metadata cachepub struct BranchMetadataCache { /// LRU cache of branch metadata cache: Arc<RwLock<LruCache<BranchId, BranchMetadata>>>,
/// Cache size capacity: usize,}