HeliosDB Audit Logging
Production-grade audit logging system for HeliosDB with tamper-proof event tracking, compliance reporting, and flexible querying.
Features
- Comprehensive Event Tracking: Log queries, access control events, schema changes, and system events
- Tamper-Proof Logging: Blockchain-style SHA-256 hash chains make any modification of past events detectable
- Persistent Storage: RocksDB-based storage with efficient indexing by user, table, and timestamp
- Flexible Querying: Filter events by user, table, time range, or event type
- Export Capabilities: Export to JSON, CSV, JSON Lines, or compressed formats
- Retention Policies: Automatic log rotation and archival based on configurable retention periods
- High Performance: Asynchronous logging with buffered writes, targeting >10K events/sec
- Compliance Ready: Generate compliance reports and track access patterns
Installation
Add to your Cargo.toml:
    [dependencies]
    heliosdb-audit = { path = "../heliosdb-audit" }
Quick Start
    use heliosdb_audit::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        // Create audit logger
        let config = AuditConfig::new("./audit_logs")
            .retention_days(90)
            .enable_chain_verification(true);
        let logger = AuditLogger::new(config).await?;

        // Log a query event
        let event = AuditEvent::builder(
            EventType::Query(QueryType::Select),
            "user_123",
        )
        .query("SELECT * FROM users WHERE age > 18")
        .table("users")
        .affected_rows(150)
        .duration_ms(42)
        .ip_address("192.168.1.100")
        .build();
        logger.log_event(event).await?;

        // Query audit logs
        let query = AuditQuery::new()
            .user_id("user_123")
            .limit(100);
        let events = logger.query(query).await?;
        println!("Found {} events", events.len());

        // Verify chain integrity (the returned bool reports whether the chain is intact)
        let chain_valid = logger.verify_chain().await?;
        println!("Audit chain valid: {}", chain_valid);

        Ok(())
    }
Event Types
Query Events
Track database queries and operations:
    // SELECT query
    let event = AuditEvent::builder(EventType::Query(QueryType::Select), "alice")
        .query("SELECT * FROM users")
        .table("users")
        .affected_rows(10)
        .build();

    // INSERT operation
    let event = AuditEvent::builder(EventType::Query(QueryType::Insert), "bob")
        .query("INSERT INTO orders VALUES (...)")
        .table("orders")
        .affected_rows(1)
        .build();
Access Control Events
Track authentication and authorization:
    // Successful login
    let event = AuditEvent::builder(EventType::Access(AccessType::Login), "alice")
        .ip_address("192.168.1.100")
        .metadata("session_id", "sess_abc123")
        .build();

    // Failed authentication
    let event = AuditEvent::builder(EventType::Access(AccessType::FailedAuth), "unknown")
        .ip_address("203.0.113.42")
        .status(EventStatus::Failed)
        .build();

    // Permission denied
    let event = AuditEvent::new(
        EventType::Access(AccessType::PermissionDenied),
        "user",
        EventStatus::Denied,
    );
Schema Changes
Track DDL operations:
    let event = AuditEvent::builder(EventType::Schema(SchemaType::CreateTable), "admin")
        .query("CREATE TABLE products (id INT, name VARCHAR(255))")
        .table("products")
        .build();

    let event = AuditEvent::builder(EventType::Schema(SchemaType::AlterTable), "admin")
        .query("ALTER TABLE users ADD COLUMN email VARCHAR(255)")
        .table("users")
        .build();
System Events
Track system-level operations:
    let event = AuditEvent::new(
        EventType::System(SystemType::Startup),
        "system",
        EventStatus::Success,
    );

    let event = AuditEvent::builder(EventType::System(SystemType::Backup), "admin")
        .metadata("backup_id", "backup_20250112")
        .build();
Querying Audit Logs
Query by User
    let events = logger.get_events_by_user("alice", Some(100)).await?;
Query by Table
    let events = logger.get_events_by_table("users", Some(100)).await?;
Query by Time Range
    use chrono::{Duration, Utc};

    let start = Utc::now() - Duration::days(7);
    let end = Utc::now();
    let events = logger.get_events_by_time_range(start, end).await?;
Advanced Queries
    let query = AuditQuery::new()
        .user_id("alice")
        .table("users")
        .start_time(Utc::now() - Duration::days(30))
        .limit(1000);
    let events = logger.query(query).await?;
Chain Verification
Verify the integrity of the audit chain to detect tampering:
    match logger.verify_chain().await {
        Ok(true) => println!("Audit chain is valid"),
        Ok(false) => println!("WARNING: Audit chain has been tampered with!"),
        Err(e) => println!("Error verifying chain: {}", e),
    }
The audit system uses SHA-256 cryptographic hashing to create a blockchain-style chain in which each event includes the hash of the previous event. Changing a historical event changes its hash, which then no longer matches the value recorded in the next event, so verification fails. This makes it virtually impossible to modify historical events without detection.
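To make the chaining idea concrete, here is a minimal sketch of a hash chain and its verification. It is not HeliosDB's implementation: the `ChainedEvent`, `digest`, `append`, and `verify` names are hypothetical, and the standard library's `DefaultHasher` stands in for SHA-256 only so the example runs without extra crates. `DefaultHasher` is not cryptographically secure and should not be used for real tamper detection.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// One link in the chain: a payload plus the hash of the previous link.
/// (Hypothetical type, for illustration only.)
#[derive(Debug, Clone)]
struct ChainedEvent {
    payload: String,
    prev_hash: u64,
    hash: u64,
}

/// Stand-in digest. A real audit log would use SHA-256 here;
/// DefaultHasher is NOT cryptographically secure.
fn digest(prev_hash: u64, payload: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    prev_hash.hash(&mut hasher);
    payload.hash(&mut hasher);
    hasher.finish()
}

/// Append a payload, linking it to the hash of the last event (0 for the first).
fn append(chain: &mut Vec<ChainedEvent>, payload: &str) {
    let prev_hash = chain.last().map_or(0, |e| e.hash);
    let hash = digest(prev_hash, payload);
    chain.push(ChainedEvent { payload: payload.to_string(), prev_hash, hash });
}

/// Recompute every hash and check that each link points at its predecessor.
fn verify(chain: &[ChainedEvent]) -> bool {
    let mut expected_prev = 0u64;
    for event in chain {
        let recomputed = digest(event.prev_hash, &event.payload);
        if event.prev_hash != expected_prev || event.hash != recomputed {
            return false;
        }
        expected_prev = event.hash;
    }
    true
}

fn main() {
    let mut chain = Vec::new();
    append(&mut chain, "alice SELECT users");
    append(&mut chain, "bob INSERT orders");
    append(&mut chain, "admin ALTER users");
    assert!(verify(&chain));

    // Editing a historical payload invalidates its stored hash.
    chain[1].payload = "bob SELECT orders".to_string();
    assert!(!verify(&chain));
    println!("tampering detected");
}
```

An attacker who rewrites one event would also have to recompute every later hash, which is why the last hash is worth storing separately from the events themselves.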
Export and Compliance
Export to Different Formats
    let query = AuditQuery::new().limit(usize::MAX);
    let events = logger.query(query).await?;

    // Export to JSON
    AuditExporter::export_to_file(&events, "audit.json", ExportFormat::Json)?;

    // Export to CSV
    AuditExporter::export_to_file(&events, "audit.csv", ExportFormat::Csv)?;

    // Export to compressed JSON
    AuditExporter::export_to_file(&events, "audit.json.gz", ExportFormat::JsonGz)?;

    // Export to JSON Lines (one event per line)
    AuditExporter::export_to_file(&events, "audit.jsonl", ExportFormat::JsonLines)?;
Generate Compliance Reports
    let report = AuditExporter::generate_compliance_report(&events);

    println!("Total Events: {}", report.total_events);
    println!("Unique Users: {}", report.unique_users);
    println!("Failed Events: {}", report.failed_events);
    println!("Denied Events: {}", report.denied_events);

    // Print formatted report
    println!("{}", report.to_summary_string());
Filter Events
    // Filter by user
    let alice_events = AuditExporter::filter_by_user(&events, "alice");

    // Filter by table
    let users_events = AuditExporter::filter_by_table(&events, "users");

    // Filter by time range
    let recent_events = AuditExporter::filter_by_time_range(
        &events,
        Utc::now() - Duration::days(7),
        Utc::now(),
    );
Retention and Rotation
Configure automatic log rotation:
    let config = AuditConfig::new("./audit_logs")
        .retention_days(90) // Keep logs for 90 days
        .enable_rotation(true);
    let logger = AuditLogger::new(config).await?;

    // Manually trigger rotation
    let deleted_count = logger.rotate_logs().await?;
    println!("Deleted {} old events", deleted_count);
Statistics
Get audit log statistics:
    let stats = logger.get_statistics().await?;

    println!("Total Events: {}", stats.total_events);
    println!("Last Event Time: {:?}", stats.last_event_time);
    println!("Last Event User: {:?}", stats.last_event_user);
    println!("Retention Period: {} days", stats.retention_days);
    println!("Storage Path: {}", stats.storage_path);
Configuration
    let config = AuditConfig::new("./audit_logs")
        .buffer_size(100)                // Number of events to buffer before flush
        .retention_days(365)             // Keep logs for 1 year
        .enable_chain_verification(true) // Enable tamper detection
        .enable_rotation(true);          // Enable automatic log rotation
    let logger = AuditLogger::new(config).await?;
Performance
The audit logging system is designed for high performance:
- Asynchronous operations: Non-blocking I/O using tokio
- Buffered writes: Batch multiple events for efficient storage
- Efficient indexing: RocksDB column families for fast queries
- Lock-free reads: Concurrent read operations
- Target throughput: >10,000 events/second
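The buffered-write idea from the list above can be sketched as follows. This is a simplified, synchronous illustration rather than the crate's actual writer: `BufferedWriter` and its fields are hypothetical names, the `capacity` argument plays the role of a buffer size setting, and an in-memory `Vec` of batches stands in for the storage layer.

```rust
/// Hypothetical buffered writer: collects events and writes them in batches.
struct BufferedWriter {
    buffer: Vec<String>,
    capacity: usize,
    /// Stand-in for persistent storage; each inner Vec is one batch write.
    batches: Vec<Vec<String>>,
}

impl BufferedWriter {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self {
            buffer: Vec::with_capacity(capacity),
            capacity,
            batches: Vec::new(),
        }
    }

    /// Queue one event; flush automatically once the buffer is full.
    fn log(&mut self, event: &str) {
        self.buffer.push(event.to_string());
        if self.buffer.len() >= self.capacity {
            self.flush();
        }
    }

    /// Write all pending events as a single batch (no-op when empty).
    fn flush(&mut self) {
        if !self.buffer.is_empty() {
            let batch = std::mem::take(&mut self.buffer);
            self.batches.push(batch);
        }
    }
}

fn main() {
    let mut writer = BufferedWriter::new(3);
    for i in 0..7 {
        writer.log(&format!("event-{}", i));
    }
    // Two full batches have been written; one event is still pending.
    assert_eq!(writer.batches.len(), 2);
    assert_eq!(writer.buffer.len(), 1);

    // An explicit flush writes the remainder, e.g. on shutdown.
    writer.flush();
    assert_eq!(writer.batches.len(), 3);
    println!("wrote {} batches", writer.batches.len());
}
```

The trade-off in any such design is that events still in the buffer are lost if the process crashes before a flush, so a smaller buffer favors durability and a larger one favors throughput.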
Storage
Audit logs are stored in RocksDB with multiple column families for efficient indexing:
- events: Main event storage
- user_index: Index by user ID
- table_index: Index by table name
- timestamp_index: Index by timestamp
- metadata: Chain metadata and last event hash
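One common way to build such secondary indexes in an ordered key-value store is a composite key that sorts by user first and by time second. The sketch below illustrates that technique only. The key layout, the `user_index_key` and `events_for_user` helpers, and the assumption that user IDs never contain a zero byte are all hypothetical, and a `BTreeMap` stands in for an ordered RocksDB column family.

```rust
use std::collections::BTreeMap;

/// Build a composite index key: user id, a 0x00 separator, then the
/// timestamp and event id as big-endian bytes so byte order matches
/// numeric order. Assumes user ids contain no 0x00 byte.
fn user_index_key(user_id: &str, timestamp_ms: u64, event_id: u64) -> Vec<u8> {
    let mut key = Vec::with_capacity(user_id.len() + 17);
    key.extend_from_slice(user_id.as_bytes());
    key.push(0);
    key.extend_from_slice(&timestamp_ms.to_be_bytes());
    key.extend_from_slice(&event_id.to_be_bytes());
    key
}

/// Return event ids for one user, oldest first, via a prefix scan.
fn events_for_user(index: &BTreeMap<Vec<u8>, u64>, user_id: &str) -> Vec<u64> {
    let mut prefix = user_id.as_bytes().to_vec();
    prefix.push(0);
    index
        .range(prefix.clone()..)
        .take_while(|(key, _)| key.starts_with(&prefix))
        .map(|(_, id)| *id)
        .collect()
}

fn main() {
    // BTreeMap keeps keys sorted, like an ordered key-value store.
    let mut index: BTreeMap<Vec<u8>, u64> = BTreeMap::new();
    index.insert(user_index_key("alice", 2_000, 11), 11);
    index.insert(user_index_key("bob", 1_500, 12), 12);
    index.insert(user_index_key("alice", 1_000, 10), 10);
    index.insert(user_index_key("alice2", 500, 13), 13);

    // Only alice's events come back, ordered by timestamp.
    assert_eq!(events_for_user(&index, "alice"), vec![10, 11]);
    assert_eq!(events_for_user(&index, "bob"), vec![12]);
    assert!(events_for_user(&index, "carol").is_empty());
    println!("alice: {:?}", events_for_user(&index, "alice"));
}
```

The separator byte keeps `alice` from matching `alice2`, and the big-endian timestamp lets a time-range query on one user become a single contiguous scan.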
Storage path structure:
    ./audit_logs/
    ├── events/
    ├── user_index/
    ├── table_index/
    ├── timestamp_index/
    └── metadata/
Testing
Run tests:
    # Unit tests
    cargo test --lib

    # Integration tests
    cargo test --test integration_test

    # All tests
    cargo test

    # With output
    cargo test -- --nocapture
Run the example:
    cargo run --example basic_audit
Architecture
See ARCHITECTURE.md for detailed technical documentation including:
- System design and components
- Hash chain implementation
- Storage architecture
- Query optimization
- Performance characteristics
License
Apache-2.0
Contributing
Part of the HeliosDB project. See the main repository for contribution guidelines.