Time-Travel Debugging Examples
Time-Travel Debugging Examples
Quick Start
Basic Setup
use heliosdb_storage::temporal::{ TemporalEngine, TemporalTable, TemporalQuery, RetentionPolicy};use chrono::{Utc, Duration};use bytes::Bytes;use std::collections::HashMap;
// Create temporal enginelet engine = TemporalEngine::new("./data/temporal").await?;
// Create temporal tablelet table = TemporalTable::new("orders".to_string(), None, None);engine.create_temporal_table(table).await?;Example 1: Basic Time Travel
// Insert initial orderlet mut columns = HashMap::new();columns.insert("customer".to_string(), Bytes::from("John Doe"));columns.insert("amount".to_string(), Bytes::from("100.00"));columns.insert("status".to_string(), Bytes::from("pending"));
let row = Row::new(Bytes::from("order_123"), columns);engine.insert("orders", row).await?;
// Save timestamplet timestamp_v1 = Utc::now();
// Update order statuslet mut columns = HashMap::new();columns.insert("customer".to_string(), Bytes::from("John Doe"));columns.insert("amount".to_string(), Bytes::from("100.00"));columns.insert("status".to_string(), Bytes::from("shipped"));
let row = Row::new(Bytes::from("order_123"), columns);engine.update("orders", &Bytes::from("order_123"), row).await?;
// Query as of first timestamp - see original statelet results = engine.query( "orders", TemporalQuery::KeyAsOf(Bytes::from("order_123"), timestamp_v1)).await?;
assert_eq!( results[0].columns.get("status").unwrap(), &Bytes::from("pending"));Example 2: Audit Trail
// Track all changes to an account balancelet account_key = Bytes::from("account_456");
// Initial depositlet mut columns = HashMap::new();columns.insert("balance".to_string(), Bytes::from("1000.00"));columns.insert("last_change".to_string(), Bytes::from("deposit"));
let row = Row::new(account_key.clone(), columns);engine.insert("accounts", row).await?;
// Withdrawallet mut columns = HashMap::new();columns.insert("balance".to_string(), Bytes::from("750.00"));columns.insert("last_change".to_string(), Bytes::from("withdrawal"));
let row = Row::new(account_key.clone(), columns);engine.update("accounts", &account_key, row).await?;
// Another withdrawallet mut columns = HashMap::new();columns.insert("balance".to_string(), Bytes::from("500.00"));columns.insert("last_change".to_string(), Bytes::from("withdrawal"));
let row = Row::new(account_key.clone(), columns);engine.update("accounts", &account_key, row).await?;
// Get complete historylet history = engine.query( "accounts", TemporalQuery::KeyHistory(account_key)).await?;
// history.len() == 3 (all versions)for (i, version) in history.iter().enumerate() { println!( "Version {}: balance={}, change={}", i + 1, String::from_utf8_lossy(version.columns.get("balance").unwrap()), String::from_utf8_lossy(version.columns.get("last_change").unwrap()) );}Example 3: Point-in-Time Recovery
// Simulate data corruption at current timelet corrupt_time = Utc::now();
// Recover to state from 1 hour agolet recovery_point = corrupt_time - Duration::hours(1);
// Query all data as of recovery pointlet recovered_data = engine.query( "critical_data", TemporalQuery::AsOf(recovery_point)).await?;
// Restore each rowfor row in recovered_data { engine.update("critical_data", &row.key, row).await?;}
println!("Recovery complete: restored {} rows", recovered_data.len());Example 4: Change Tracking
// Track changes in a specific time windowlet start_of_day = Utc::now().date().and_hms(0, 0, 0);let end_of_day = start_of_day + Duration::days(1);
// Get all versions of a record during the daylet changes = engine.query( "inventory", TemporalQuery::KeyBetween( Bytes::from("product_789"), start_of_day, end_of_day )).await?;
println!("Product had {} changes today:", changes.len());for change in changes { println!( "At {}: quantity = {}", change.valid_from, String::from_utf8_lossy(change.columns.get("quantity").unwrap()) );}Example 5: Flashback Queries (Oracle-Compatible)
// Record SCN for transactionlet scn = engine.allocate_scn();let timestamp = Utc::now();engine.record_scn(scn, timestamp, Some(transaction_id))?;
// Perform transaction// ... (inserts/updates)
// Later, query using SCNlet query = FlashbackQuery::AsOfScn { scn, table_name: "transactions".to_string(), filter_key: None,};
let result = engine.execute_flashback("transactions", query).await?;println!("Found {} rows at SCN {}", result.rows.len(), scn);
// Query version history between two SCNslet start_scn = 10000;let end_scn = 20000;
let query = FlashbackQuery::VersionsBetweenScn { start_scn, end_scn, table_name: "transactions".to_string(), filter_key: Some(Bytes::from("txn_key")),};
let versions = engine.execute_versions("transactions", query).await?;for version in versions.versions { println!( "SCN {}: operation={}", version.version_start_scn, version.version_operation.code() // "I", "U", or "D" );}Example 6: Retention Management
// Create table with automatic cleanuplet retention = RetentionPolicy::new( Duration::days(30), // Keep 30 days Duration::hours(6), // Cleanup every 6 hours).with_size_limit(10_000_000_000) // 10GB max.with_entry_limit(10_000_000) // 10M entries max.with_cleanup_strategy(CleanupStrategy::TimeBasedOldest);
let table = TemporalTable::new("logs".to_string(), None, None) .with_retention_policy(retention);
engine.create_temporal_table(table).await?;
// Manual cleanup if neededlet stats_before = engine.get_stats("logs").await?;println!("Before cleanup: {} history entries", stats_before.history_rows);
let deleted = engine.cleanup_history("logs").await?;println!("Deleted {} old entries", deleted);
let stats_after = engine.get_stats("logs").await?;println!("After cleanup: {} history entries", stats_after.history_rows);Example 7: Debugging Transaction Issues
// Developer reports a bug: transaction 42 corrupted datalet transaction_id = 42;
// Use flashback transaction query to see all changeslet query = FlashbackQuery::FlashbackTransaction { transaction_id, table_name: "users".to_string(),};
let result = engine.execute_versions("users", query).await?;
println!("Transaction {} made {} changes:", transaction_id, result.versions.len());for version in result.versions { println!( " {} {} at {}", version.version_operation.code(), String::from_utf8_lossy(&version.row.key), version.version_start_time );
// Inspect changes for (col, val) in &version.row.columns { println!(" {}: {:?}", col, val); }}Example 8: Temporal Analytics
// Analyze user activity over the past weeklet week_ago = Utc::now() - Duration::weeks(1);let now = Utc::now();
// Get all user state changes in the past weeklet changes = engine.query( "user_sessions", TemporalQuery::Between(week_ago, now)).await?;
// Group by daylet mut daily_activity = HashMap::new();for change in changes { let day = change.valid_from.date(); *daily_activity.entry(day).or_insert(0) += 1;}
println!("Daily activity for past week:");for (day, count) in daily_activity { println!(" {}: {} changes", day, count);}Example 9: Concurrent Time Travel
use tokio::task;
// Multiple users querying different timestamps concurrentlylet handles: Vec<_> = (0..10) .map(|i| { let engine = engine.clone(); let timestamp = Utc::now() - Duration::minutes(i * 10);
task::spawn(async move { let results = engine.query( "shared_data", TemporalQuery::AsOf(timestamp) ).await?;
Ok::<_, HeliosError>((i, results.len())) }) }) .collect();
// Wait for all queriesfor handle in handles { let (user, count) = handle.await??; println!("User {} saw {} rows at their timestamp", user, count);}Example 10: Range Queries
// FROM query: Get all changes from last week onwardslet last_week = Utc::now() - Duration::weeks(1);let recent_changes = engine.query( "products", TemporalQuery::From(last_week)).await?;
println!("Recent changes: {}", recent_changes.len());
// TO query: Get all historical data up to yesterdaylet yesterday = Utc::now() - Duration::days(1);let historical_data = engine.query( "products", TemporalQuery::To(yesterday)).await?;
println!("Historical snapshot: {}", historical_data.len());
// BETWEEN query: Get changes in specific windowlet start = Utc::now() - Duration::hours(24);let end = Utc::now() - Duration::hours(12);let window_changes = engine.query( "products", TemporalQuery::Between(start, end)).await?;
println!("Changes in 12-hour window: {}", window_changes.len());Performance Tips
1. Use Key-Specific Queries
// Good: Query specific keylet result = engine.query( "large_table", TemporalQuery::KeyAsOf(key, timestamp)).await?;
// Avoid: Full table temporal scanlet result = engine.query( "large_table", TemporalQuery::AsOf(timestamp)).await?;2. Batch Updates
// Use batch operations for better throughputlet entries: Vec<VersionEntry> = /* ... */;version_control.insert_batch("table", entries).await?;3. Set Appropriate Retention
// Balance storage vs historical depthlet retention = RetentionPolicy::new( Duration::days(7), // Adjust based on needs Duration::hours(1), // Frequent cleanup for active tables);4. Use SCN for Exact Snapshots
// SCN is faster than timestamp for repeated querieslet scn = engine.current_scn();engine.record_scn(scn, Utc::now(), None)?;
// Later queries use cached SCN mappinglet query = FlashbackQuery::AsOfScn { scn, ... };Error Handling
use heliosdb_common::HeliosError;
match engine.query("table", TemporalQuery::AsOf(timestamp)).await { Ok(results) => { // Process results } Err(HeliosError::KeyNotFound(msg)) => { // Key doesn't exist at that timestamp println!("Not found: {}", msg); } Err(HeliosError::Storage(msg)) => { // Storage layer error eprintln!("Storage error: {}", msg); } Err(e) => { eprintln!("Unexpected error: {}", e); }}Testing Your Implementation
#[tokio::test]async fn test_time_travel() { let engine = TemporalEngine::new("./test_data").await.unwrap();
// Your test logic here
assert!(/* your assertions */);}