Skip to content

Time-Travel Debugging Examples

Time-Travel Debugging Examples

Quick Start

Basic Setup

use heliosdb_storage::temporal::{
TemporalEngine, TemporalTable, TemporalQuery, RetentionPolicy
};
use chrono::{Utc, Duration};
use bytes::Bytes;
use std::collections::HashMap;
// Create temporal engine
let engine = TemporalEngine::new("./data/temporal").await?;
// Create temporal table
let table = TemporalTable::new("orders".to_string(), None, None);
engine.create_temporal_table(table).await?;

Example 1: Basic Time Travel

// Insert initial order
let mut columns = HashMap::new();
columns.insert("customer".to_string(), Bytes::from("John Doe"));
columns.insert("amount".to_string(), Bytes::from("100.00"));
columns.insert("status".to_string(), Bytes::from("pending"));
let row = Row::new(Bytes::from("order_123"), columns);
engine.insert("orders", row).await?;
// Save timestamp
let timestamp_v1 = Utc::now();
// Update order status
let mut columns = HashMap::new();
columns.insert("customer".to_string(), Bytes::from("John Doe"));
columns.insert("amount".to_string(), Bytes::from("100.00"));
columns.insert("status".to_string(), Bytes::from("shipped"));
let row = Row::new(Bytes::from("order_123"), columns);
engine.update("orders", &Bytes::from("order_123"), row).await?;
// Query as of first timestamp - see original state
let results = engine.query(
"orders",
TemporalQuery::KeyAsOf(Bytes::from("order_123"), timestamp_v1)
).await?;
assert_eq!(
results[0].columns.get("status").unwrap(),
&Bytes::from("pending")
);

Example 2: Audit Trail

// Track all changes to an account balance
let account_key = Bytes::from("account_456");
// Initial deposit
let mut columns = HashMap::new();
columns.insert("balance".to_string(), Bytes::from("1000.00"));
columns.insert("last_change".to_string(), Bytes::from("deposit"));
let row = Row::new(account_key.clone(), columns);
engine.insert("accounts", row).await?;
// Withdrawal
let mut columns = HashMap::new();
columns.insert("balance".to_string(), Bytes::from("750.00"));
columns.insert("last_change".to_string(), Bytes::from("withdrawal"));
let row = Row::new(account_key.clone(), columns);
engine.update("accounts", &account_key, row).await?;
// Another withdrawal
let mut columns = HashMap::new();
columns.insert("balance".to_string(), Bytes::from("500.00"));
columns.insert("last_change".to_string(), Bytes::from("withdrawal"));
let row = Row::new(account_key.clone(), columns);
engine.update("accounts", &account_key, row).await?;
// Get complete history
let history = engine.query(
"accounts",
TemporalQuery::KeyHistory(account_key)
).await?;
// history.len() == 3 (all versions)
for (i, version) in history.iter().enumerate() {
println!(
"Version {}: balance={}, change={}",
i + 1,
String::from_utf8_lossy(version.columns.get("balance").unwrap()),
String::from_utf8_lossy(version.columns.get("last_change").unwrap())
);
}

Example 3: Point-in-Time Recovery

// Simulate data corruption at current time
let corrupt_time = Utc::now();
// Recover to state from 1 hour ago
let recovery_point = corrupt_time - Duration::hours(1);
// Query all data as of recovery point
let recovered_data = engine.query(
"critical_data",
TemporalQuery::AsOf(recovery_point)
).await?;
// Restore each row
for row in recovered_data {
engine.update("critical_data", &row.key, row).await?;
}
println!("Recovery complete: restored {} rows", recovered_data.len());

Example 4: Change Tracking

// Track changes in a specific time window
let start_of_day = Utc::now().date().and_hms(0, 0, 0);
let end_of_day = start_of_day + Duration::days(1);
// Get all versions of a record during the day
let changes = engine.query(
"inventory",
TemporalQuery::KeyBetween(
Bytes::from("product_789"),
start_of_day,
end_of_day
)
).await?;
println!("Product had {} changes today:", changes.len());
for change in changes {
println!(
"At {}: quantity = {}",
change.valid_from,
String::from_utf8_lossy(change.columns.get("quantity").unwrap())
);
}

Example 5: Flashback Queries (Oracle-Compatible)

// Record SCN for transaction
let scn = engine.allocate_scn();
let timestamp = Utc::now();
engine.record_scn(scn, timestamp, Some(transaction_id))?;
// Perform transaction
// ... (inserts/updates)
// Later, query using SCN
let query = FlashbackQuery::AsOfScn {
scn,
table_name: "transactions".to_string(),
filter_key: None,
};
let result = engine.execute_flashback("transactions", query).await?;
println!("Found {} rows at SCN {}", result.rows.len(), scn);
// Query version history between two SCNs
let start_scn = 10000;
let end_scn = 20000;
let query = FlashbackQuery::VersionsBetweenScn {
start_scn,
end_scn,
table_name: "transactions".to_string(),
filter_key: Some(Bytes::from("txn_key")),
};
let versions = engine.execute_versions("transactions", query).await?;
for version in versions.versions {
println!(
"SCN {}: operation={}",
version.version_start_scn,
version.version_operation.code() // "I", "U", or "D"
);
}

Example 6: Retention Management

// Create table with automatic cleanup
let retention = RetentionPolicy::new(
Duration::days(30), // Keep 30 days
Duration::hours(6), // Cleanup every 6 hours
)
.with_size_limit(10_000_000_000) // 10GB max
.with_entry_limit(10_000_000) // 10M entries max
.with_cleanup_strategy(CleanupStrategy::TimeBasedOldest);
let table = TemporalTable::new("logs".to_string(), None, None)
.with_retention_policy(retention);
engine.create_temporal_table(table).await?;
// Manual cleanup if needed
let stats_before = engine.get_stats("logs").await?;
println!("Before cleanup: {} history entries", stats_before.history_rows);
let deleted = engine.cleanup_history("logs").await?;
println!("Deleted {} old entries", deleted);
let stats_after = engine.get_stats("logs").await?;
println!("After cleanup: {} history entries", stats_after.history_rows);

Example 7: Debugging Transaction Issues

// Developer reports a bug: transaction 42 corrupted data
let transaction_id = 42;
// Use flashback transaction query to see all changes
let query = FlashbackQuery::FlashbackTransaction {
transaction_id,
table_name: "users".to_string(),
};
let result = engine.execute_versions("users", query).await?;
println!("Transaction {} made {} changes:", transaction_id, result.versions.len());
for version in result.versions {
println!(
" {} {} at {}",
version.version_operation.code(),
String::from_utf8_lossy(&version.row.key),
version.version_start_time
);
// Inspect changes
for (col, val) in &version.row.columns {
println!(" {}: {:?}", col, val);
}
}

Example 8: Temporal Analytics

// Analyze user activity over the past week
let week_ago = Utc::now() - Duration::weeks(1);
let now = Utc::now();
// Get all user state changes in the past week
let changes = engine.query(
"user_sessions",
TemporalQuery::Between(week_ago, now)
).await?;
// Group by day
let mut daily_activity = HashMap::new();
for change in changes {
let day = change.valid_from.date();
*daily_activity.entry(day).or_insert(0) += 1;
}
println!("Daily activity for past week:");
for (day, count) in daily_activity {
println!(" {}: {} changes", day, count);
}

Example 9: Concurrent Time Travel

use tokio::task;
// Multiple users querying different timestamps concurrently
let handles: Vec<_> = (0..10)
.map(|i| {
let engine = engine.clone();
let timestamp = Utc::now() - Duration::minutes(i * 10);
task::spawn(async move {
let results = engine.query(
"shared_data",
TemporalQuery::AsOf(timestamp)
).await?;
Ok::<_, HeliosError>((i, results.len()))
})
})
.collect();
// Wait for all queries
for handle in handles {
let (user, count) = handle.await??;
println!("User {} saw {} rows at their timestamp", user, count);
}

Example 10: Range Queries

// FROM query: Get all changes from last week onwards
let last_week = Utc::now() - Duration::weeks(1);
let recent_changes = engine.query(
"products",
TemporalQuery::From(last_week)
).await?;
println!("Recent changes: {}", recent_changes.len());
// TO query: Get all historical data up to yesterday
let yesterday = Utc::now() - Duration::days(1);
let historical_data = engine.query(
"products",
TemporalQuery::To(yesterday)
).await?;
println!("Historical snapshot: {}", historical_data.len());
// BETWEEN query: Get changes in specific window
let start = Utc::now() - Duration::hours(24);
let end = Utc::now() - Duration::hours(12);
let window_changes = engine.query(
"products",
TemporalQuery::Between(start, end)
).await?;
println!("Changes in 12-hour window: {}", window_changes.len());

Performance Tips

1. Use Key-Specific Queries

// Good: Query specific key
let result = engine.query(
"large_table",
TemporalQuery::KeyAsOf(key, timestamp)
).await?;
// Avoid: Full table temporal scan
let result = engine.query(
"large_table",
TemporalQuery::AsOf(timestamp)
).await?;

2. Batch Updates

// Use batch operations for better throughput
let entries: Vec<VersionEntry> = /* ... */;
version_control.insert_batch("table", entries).await?;

3. Set Appropriate Retention

// Balance storage vs historical depth
let retention = RetentionPolicy::new(
Duration::days(7), // Adjust based on needs
Duration::hours(1), // Frequent cleanup for active tables
);

4. Use SCN for Exact Snapshots

// SCN is faster than timestamp for repeated queries
let scn = engine.current_scn();
engine.record_scn(scn, Utc::now(), None)?;
// Later queries use cached SCN mapping
let query = FlashbackQuery::AsOfScn { scn, ... };

Error Handling

use heliosdb_common::HeliosError;
match engine.query("table", TemporalQuery::AsOf(timestamp)).await {
Ok(results) => {
// Process results
}
Err(HeliosError::KeyNotFound(msg)) => {
// Key doesn't exist at that timestamp
println!("Not found: {}", msg);
}
Err(HeliosError::Storage(msg)) => {
// Storage layer error
eprintln!("Storage error: {}", msg);
}
Err(e) => {
eprintln!("Unexpected error: {}", e);
}
}

Testing Your Implementation

#[tokio::test]
async fn test_time_travel() {
let engine = TemporalEngine::new("./test_data").await.unwrap();
// Your test logic here
assert!(/* your assertions */);
}