Audit Logging Tutorial for HeliosDB Nano
Version: 1.0
Last Updated: 2025-12-01
Target Audience: Developers, DevOps Engineers, Compliance Teams
Estimated Time: 45-60 minutes
Table of Contents
- Introduction
- Configuration
- Event Types
- Querying Audit Logs
- Log Management
- Compliance Scenarios
- Integration
- Tamper Detection
1. Introduction
What is Audit Logging?
Audit logging is a comprehensive tracking mechanism that records all significant database operations, providing a tamper-proof trail of who did what, when, and whether it succeeded. In HeliosDB Nano, audit logging is a built-in, high-performance feature that operates with sub-millisecond overhead.
Key characteristics of HeliosDB Nano audit logging:
- Tamper-Proof: Append-only logs with cryptographic SHA-256 checksums
- High-Performance: Asynchronous buffered logging (<0.1ms overhead)
- Configurable: Fine-grained control over what operations are logged
- Queryable: Standard SQL interface for audit log analysis
- Embedded: Zero external dependencies or infrastructure
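The asynchronous buffered model behind the sub-millisecond claim can be sketched with a bounded channel and a background writer thread. This is an illustrative stand-in, not HeliosDB Nano's actual implementation: the `AuditEvent` struct and `buffered_log` helper are hypothetical, and the real engine persists to its internal audit table rather than printing.

```rust
use std::sync::mpsc;
use std::thread;

// Illustrative event record; the real logger's internal types differ.
#[derive(Debug)]
struct AuditEvent {
    user: String,
    operation: String,
    target: String,
}

// Hypothetical helper: push events through a bounded buffer to a writer thread,
// returning how many were persisted.
fn buffered_log(events: Vec<AuditEvent>) -> usize {
    // Bounded channel acts as the async buffer (mirrors async_buffer_size = 100).
    let (tx, rx) = mpsc::sync_channel::<AuditEvent>(100);

    // Background writer drains the buffer and persists events.
    let writer = thread::spawn(move || {
        let mut written = 0usize;
        for event in rx {
            // In the real engine this would append to the __audit_log table.
            println!("persisted: {} {} on {}", event.user, event.operation, event.target);
            written += 1;
        }
        written
    });

    // The hot path only pays for a channel send.
    for event in events {
        tx.send(event).unwrap();
    }
    drop(tx); // closing the channel lets the writer drain and exit

    writer.join().unwrap()
}

fn main() {
    let n = buffered_log(vec![AuditEvent {
        user: "admin".into(),
        operation: "INSERT".into(),
        target: "users".into(),
    }]);
    println!("{} events persisted", n);
}
```

The key design point is that callers block only on the buffer send, never on storage I/O; durability of buffered-but-unwritten events on crash is a separate concern the real engine must handle.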
Compliance Requirements
Audit logging is critical for meeting regulatory compliance requirements:
| Regulation | Requirements | HeliosDB Nano Support |
|---|---|---|
| SOC2 | Track who, what, when, and result of all data access | ✅ User, operation, timestamp, success tracking |
| HIPAA | 7-year retention of all PHI access and modifications | ✅ Configurable retention up to 7+ years |
| GDPR | Log all data processing activities, right to erasure | ✅ Comprehensive DML logging with metadata |
| PCI-DSS | Audit trail for all access to cardholder data | ✅ Tamper-proof logs with cryptographic integrity |
| FDA 21 CFR Part 11 | Electronic records with secure timestamps | ✅ Immutable timestamps with checksums |
Audit Trail Importance
Audit trails serve multiple critical business functions:
- Compliance: Meet regulatory requirements and pass audits
- Security: Detect and investigate unauthorized access or data breaches
- Forensics: Reconstruct events leading to data corruption or loss
- Debugging: Trace application behavior and identify root causes
- Accountability: Create a record of who made changes to sensitive data
- Legal Protection: Provide evidence for legal proceedings or disputes
Real-world example: A healthcare provider using HeliosDB Nano reduced compliance audit preparation time from 40 hours to 8 hours by leveraging SQL-queryable audit logs with built-in tamper detection.
2. Configuration
Enabling Audit Logging
Audit logging can be enabled through configuration files or programmatically in your application code.
TOML Configuration File
Create a configuration file heliosdb.toml:
```toml
[database]
path = "/var/lib/heliosdb/myapp.db"
memory_limit_mb = 512
enable_wal = true

[audit]
enabled = true
log_ddl = true
log_dml = true
log_select = false
log_transactions = false
log_auth = true
retention_days = 90
async_buffer_size = 100
enable_checksums = true
max_query_length = 10000

[audit.capture_metadata]
capture_client_ip = true
capture_application_name = true
capture_database_name = true
capture_execution_time = true
capture_custom_fields = false
```
Programmatic Configuration (Rust)
```rust
use heliosdb_nano::{EmbeddedDatabase, Config};
use heliosdb_nano::audit::{AuditLogger, AuditConfig, MetadataCapture};
use std::sync::Arc;

fn setup_audit_logging() -> Result<AuditLogger, Box<dyn std::error::Error>> {
    // Create database
    let config = Config::in_memory();
    let db = EmbeddedDatabase::new_in_memory()?;

    // Get storage reference
    let storage = Arc::new(
        heliosdb_nano::storage::StorageEngine::open_in_memory(&config)?
    );

    // Custom audit configuration
    let audit_config = AuditConfig {
        enabled: true,
        log_ddl: true,
        log_dml: true,
        log_select: false,
        log_transactions: true,
        log_auth: true,
        retention_days: 90,
        async_buffer_size: 100,
        enable_checksums: true,
        max_query_length: 10000,
        capture_metadata: MetadataCapture {
            capture_client_ip: true,
            capture_application_name: true,
            capture_database_name: true,
            capture_execution_time: true,
            capture_custom_fields: false,
        },
    };

    // Initialize audit logger
    let mut logger = AuditLogger::new(storage, audit_config)?;

    // Set session context
    logger.set_user("admin".to_string());
    logger.set_session_id(uuid::Uuid::new_v4().to_string());

    Ok(logger)
}
```
Log Destinations
HeliosDB Nano stores audit logs in an internal table and supports multiple output formats:
Internal Table Storage (Default)
All audit events are stored in the __audit_log system table:
```sql
-- View audit log table schema
SELECT * FROM __audit_log LIMIT 1;
```
The __audit_log table is automatically created when audit logging is initialized.
File-Based Log Export
You can export audit logs to files for archival or external processing:
```rust
use heliosdb_nano::audit::AuditLogger;
use std::fs::File;
use std::io::Write;

fn export_audit_logs(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Query all audit events
    let events = logger.query_audit_log("")?;

    // Write to JSON file
    let mut file = File::create("/var/log/heliosdb/audit.json")?;

    for event in events {
        // Convert tuple to JSON (pseudo-code)
        let json = format_event_as_json(&event);
        writeln!(file, "{}", json)?;
    }

    Ok(())
}
```
Database Table Export
Export audit logs to a separate archive table:
```sql
-- Create archive table
CREATE TABLE audit_log_archive (
    id INT8 PRIMARY KEY,
    timestamp TIMESTAMP,
    session_id TEXT,
    user TEXT,
    operation TEXT,
    target TEXT,
    query TEXT,
    affected_rows INT8,
    success BOOLEAN,
    error TEXT,
    checksum TEXT
);

-- Copy old audit logs to archive
INSERT INTO audit_log_archive
SELECT * FROM __audit_log
WHERE timestamp < NOW() - INTERVAL '90 days';
```
Log Format Options
JSON Format
Export audit events as JSON for integration with external systems:
```json
{
  "id": 12345,
  "timestamp": "2025-12-01T10:30:45.123Z",
  "session_id": "550e8400-e29b-41d4-a716-446655440000",
  "user": "admin",
  "operation": "INSERT",
  "target": "users",
  "query": "INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'alice@example.com')",
  "affected_rows": 1,
  "success": true,
  "error": null,
  "checksum": "a8f5f167f44f4964e6c998dee827110c062df690bcaf8e94e0c0e67c03c3e2fa"
}
```
CSV Format
Export audit events as CSV for spreadsheet analysis:
```csv
id,timestamp,user,operation,target,affected_rows,success
12345,2025-12-01T10:30:45.123Z,admin,INSERT,users,1,true
12346,2025-12-01T10:31:12.456Z,admin,UPDATE,users,1,true
12347,2025-12-01T10:32:05.789Z,admin,DELETE,users,1,true
```
Configuration Presets
HeliosDB Nano provides pre-configured audit profiles for common use cases:
Default Configuration
Balanced configuration for most applications:
```rust
let config = AuditConfig::default();
// Logs: DDL, DML (not SELECT), Auth
// Retention: 90 days
// Checksums: Enabled
```
Minimal Configuration
Lightweight configuration for low-overhead environments:
```rust
let config = AuditConfig::minimal();
// Logs: DDL only
// Retention: 30 days
// Checksums: Disabled
```
Verbose Configuration
Comprehensive logging for debugging and forensics:
```rust
let config = AuditConfig::verbose();
// Logs: DDL, DML, SELECT, Transactions, Auth
// Retention: 365 days
// Checksums: Enabled
```
Compliance Configuration
Strict configuration for regulatory compliance:
```rust
let config = AuditConfig::compliance();
// Logs: DDL, DML, Transactions, Auth
// Retention: 2555 days (7 years)
// Checksums: Enabled (mandatory)
```
Configuration Comparison Table:
| Feature | Minimal | Default | Verbose | Compliance |
|---|---|---|---|---|
| DDL Logging | ✅ | ✅ | ✅ | ✅ |
| DML Logging | ❌ | ✅ | ✅ | ✅ |
| SELECT Logging | ❌ | ❌ | ✅ | ❌ |
| Transaction Logging | ❌ | ❌ | ✅ | ✅ |
| Auth Logging | ❌ | ✅ | ✅ | ✅ |
| Retention (days) | 30 | 90 | 365 | 2555 |
| Checksums | ❌ | ✅ | ✅ | ✅ |
| Overhead | <0.05ms | <0.1ms | <0.2ms | <0.1ms |
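The comparison table above reduces naturally to a small lookup keyed on a profile name (for example, taken from an environment variable). The sketch below is self-contained and illustrative: `PresetSummary` and `preset_for` are stand-ins carrying only the table's values, not the real `AuditConfig` type.

```rust
// Simplified stand-in for AuditConfig, carrying only fields
// from the comparison table above.
#[derive(Debug, PartialEq)]
struct PresetSummary {
    log_dml: bool,
    log_select: bool,
    retention_days: u32,
    checksums: bool,
}

// Map a profile name to its preset; unknown names fall back to the default profile.
fn preset_for(name: &str) -> PresetSummary {
    match name {
        "minimal" => PresetSummary {
            log_dml: false, log_select: false, retention_days: 30, checksums: false,
        },
        "verbose" => PresetSummary {
            log_dml: true, log_select: true, retention_days: 365, checksums: true,
        },
        "compliance" => PresetSummary {
            log_dml: true, log_select: false, retention_days: 2555, checksums: true,
        },
        _ => PresetSummary {
            log_dml: true, log_select: false, retention_days: 90, checksums: true,
        },
    }
}

fn main() {
    // e.g. let profile = std::env::var("AUDIT_PROFILE").unwrap_or_default();
    let preset = preset_for("compliance");
    println!("{:?}", preset);
}
```

In a real deployment you would call the corresponding constructor (`AuditConfig::minimal()`, `AuditConfig::compliance()`, and so on) instead of building the struct by hand.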
3. Event Types
HeliosDB Nano audit logging tracks multiple categories of database operations.
DDL Events (Data Definition Language)
DDL events track changes to database schema:
| Operation | Description | Example |
|---|---|---|
| CREATE_TABLE | New table creation | CREATE TABLE users (id INT, name TEXT) |
| DROP_TABLE | Table deletion | DROP TABLE users |
| ALTER_TABLE | Table schema modification | ALTER TABLE users ADD COLUMN email TEXT |
| CREATE_INDEX | Index creation | CREATE INDEX idx_users_name ON users(name) |
| DROP_INDEX | Index deletion | DROP INDEX idx_users_name |
Example: Logging DDL Operations
```rust
use heliosdb_nano::audit::AuditLogger;

fn log_schema_change(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Log successful table creation
    logger.log_ddl(
        "CREATE TABLE",
        "users",
        "CREATE TABLE users (id INT PRIMARY KEY, name TEXT, email TEXT)",
        true, // success
        None, // no error
    )?;

    // Log failed index creation
    logger.log_ddl(
        "CREATE INDEX",
        "users",
        "CREATE INDEX idx_invalid ON nonexistent_table(name)",
        false, // failed
        Some("Table does not exist"),
    )?;

    Ok(())
}
```
DML Events (Data Manipulation Language)
DML events track data modifications:
| Operation | Description | Example |
|---|---|---|
| INSERT | New row insertion | INSERT INTO users VALUES (1, 'Alice') |
| UPDATE | Row modification | UPDATE users SET name='Bob' WHERE id=1 |
| DELETE | Row deletion | DELETE FROM users WHERE id=1 |
Example: Logging DML Operations
```rust
fn log_data_changes(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Log INSERT operation
    logger.log_dml(
        "INSERT",
        "users",
        "INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'alice@example.com')",
        1,    // affected rows
        true, // success
        None, // no error
    )?;

    // Log UPDATE operation
    logger.log_dml(
        "UPDATE",
        "users",
        "UPDATE users SET email='alice.smith@example.com' WHERE id=1",
        1,
        true,
        None,
    )?;

    // Log DELETE operation
    logger.log_dml(
        "DELETE",
        "users",
        "DELETE FROM users WHERE id=1",
        1,
        true,
        None,
    )?;

    Ok(())
}
```
Security Events
Security events track authentication and authorization:
| Operation | Description | Example |
|---|---|---|
| LOGIN | User authentication | User login successful |
| LOGOUT | User session termination | User logout |
| GRANT | Permission granted | GRANT SELECT ON users TO analyst |
| REVOKE | Permission revoked | REVOKE DELETE ON users FROM analyst |
Example: Logging Security Events
```rust
fn log_auth_events(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    use heliosdb_nano::audit::OperationType;
    use heliosdb_nano::audit::AuditMetadata;

    // Log successful login
    logger.log_operation(
        OperationType::Login,
        None, // no specific target
        "User 'admin' logged in from 192.168.1.100",
        0,    // no rows affected
        true, // success
        None, // no error
        AuditMetadata::default(),
    )?;

    // Log failed login attempt
    logger.log_operation(
        OperationType::Login,
        None,
        "User 'hacker' failed login attempt",
        0,
        false, // failed
        Some("Invalid credentials"),
        AuditMetadata::default(),
    )?;

    Ok(())
}
```
Query Events (SELECT)
Query events track data access (disabled by default due to verbosity):
Example: Logging SELECT Queries
```rust
fn log_query(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Enable SELECT logging first in config
    logger.log_select(
        "users",
        "SELECT * FROM users WHERE email LIKE '%@example.com'",
        15,       // row count
        Some(12), // execution time in milliseconds
    )?;

    Ok(())
}
```
Note: SELECT logging is disabled by default because it generates high log volume. Enable it only when required for compliance or debugging.
Transaction Events
Transaction events track transaction boundaries:
| Operation | Description | Example |
|---|---|---|
| BEGIN | Transaction start | BEGIN TRANSACTION |
| COMMIT | Transaction commit | COMMIT |
| ROLLBACK | Transaction abort | ROLLBACK |
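When log_transactions is enabled, each boundary above is recorded as its own audit event. The self-contained sketch below illustrates the BEGIN/COMMIT/ROLLBACK pattern; the `TxEvent` struct and `with_transaction` helper are illustrative stand-ins, not part of the HeliosDB Nano API.

```rust
// Illustrative stand-in for a transaction-boundary audit record.
#[derive(Debug)]
struct TxEvent {
    operation: &'static str, // "BEGIN", "COMMIT", or "ROLLBACK"
    success: bool,
}

// Run `work` inside a logical transaction, logging BEGIN first,
// then COMMIT on success or ROLLBACK on failure.
fn with_transaction<T, E>(
    log: &mut Vec<TxEvent>,
    work: impl FnOnce() -> Result<T, E>,
) -> Result<T, E> {
    log.push(TxEvent { operation: "BEGIN", success: true });
    match work() {
        Ok(value) => {
            log.push(TxEvent { operation: "COMMIT", success: true });
            Ok(value)
        }
        Err(err) => {
            log.push(TxEvent { operation: "ROLLBACK", success: true });
            Err(err)
        }
    }
}

fn main() {
    let mut log = Vec::new();
    let _ = with_transaction(&mut log, || Ok::<_, ()>(42));
    let _ = with_transaction(&mut log, || Err::<(), _>("constraint violation"));
    for e in &log {
        println!("{} success={}", e.operation, e.success);
    }
}
```

Note that a ROLLBACK event itself succeeds (the rollback was performed); the failure of the underlying work is captured by the earlier DML events' success flags.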
4. Querying Audit Logs
The __audit_log Table
All audit events are stored in the __audit_log system table with the following schema:
```sql
CREATE TABLE __audit_log (
    id INT8 PRIMARY KEY,  -- Unique event ID
    timestamp TIMESTAMP,  -- When operation occurred
    session_id TEXT,      -- Session identifier
    user TEXT,            -- User who performed operation
    operation TEXT,       -- Operation type (INSERT, UPDATE, etc.)
    target TEXT,          -- Target object (table name, etc.)
    query TEXT,           -- Full SQL query
    affected_rows INT8,   -- Number of rows affected
    success BOOLEAN,      -- Whether operation succeeded
    error TEXT,           -- Error message (if failed)
    checksum TEXT         -- SHA-256 tamper detection checksum
);
```
Basic Queries
View Recent Audit Events
```sql
-- Get last 100 audit events
SELECT * FROM __audit_log
ORDER BY id DESC
LIMIT 100;
```
View All Operations by a Specific User
```sql
-- Get all operations by user 'admin'
SELECT timestamp, operation, target, affected_rows, success
FROM __audit_log
WHERE user = 'admin'
ORDER BY timestamp DESC;
```
View Failed Operations
```sql
-- Get all failed operations
SELECT timestamp, user, operation, target, query, error
FROM __audit_log
WHERE success = false
ORDER BY timestamp DESC;
```
Filtering by Event Type
```sql
-- Get all INSERT operations
SELECT * FROM __audit_log
WHERE operation = 'INSERT';

-- Get all DDL operations (schema changes)
SELECT * FROM __audit_log
WHERE operation IN ('CREATE_TABLE', 'DROP_TABLE', 'ALTER_TABLE', 'CREATE_INDEX', 'DROP_INDEX');

-- Get all DML operations (data changes)
SELECT * FROM __audit_log
WHERE operation IN ('INSERT', 'UPDATE', 'DELETE');

-- Get all authentication events
SELECT * FROM __audit_log
WHERE operation IN ('LOGIN', 'LOGOUT', 'GRANT', 'REVOKE');
```
Time-Based Queries
```sql
-- Events from the last 24 hours
SELECT * FROM __audit_log
WHERE timestamp >= NOW() - INTERVAL '24 hours'
ORDER BY timestamp DESC;

-- Events within a specific date range
SELECT * FROM __audit_log
WHERE timestamp >= '2025-12-01T00:00:00Z'
  AND timestamp <= '2025-12-31T23:59:59Z'
ORDER BY timestamp;

-- Events grouped by hour
SELECT
    DATE_TRUNC('hour', timestamp) as hour,
    COUNT(*) as event_count,
    SUM(CASE WHEN success THEN 1 ELSE 0 END) as successful,
    SUM(CASE WHEN NOT success THEN 1 ELSE 0 END) as failed
FROM __audit_log
GROUP BY hour
ORDER BY hour DESC;
```
Advanced Queries
Most Active Users
```sql
SELECT
    user,
    COUNT(*) as total_operations,
    SUM(CASE WHEN operation IN ('INSERT', 'UPDATE', 'DELETE') THEN 1 ELSE 0 END) as data_modifications,
    SUM(CASE WHEN NOT success THEN 1 ELSE 0 END) as failed_operations
FROM __audit_log
GROUP BY user
ORDER BY total_operations DESC;
```
Most Modified Tables
```sql
SELECT
    target as table_name,
    COUNT(*) as modifications,
    SUM(affected_rows) as total_rows_affected
FROM __audit_log
WHERE operation IN ('INSERT', 'UPDATE', 'DELETE')
  AND target IS NOT NULL
GROUP BY target
ORDER BY modifications DESC;
```
Operations by Time of Day
```sql
SELECT
    EXTRACT(HOUR FROM timestamp) as hour_of_day,
    COUNT(*) as operation_count
FROM __audit_log
GROUP BY hour_of_day
ORDER BY hour_of_day;
```
Programmatic Query Interface (Rust)
```rust
use heliosdb_nano::audit::{AuditQuery, AuditFilter, OperationType};

fn query_audit_logs(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Build query using filter API
    let query = AuditQuery::new()
        .with_operation(OperationType::Insert)
        .with_user("admin".to_string())
        .with_success(true)
        .limit(100)
        .offset(0);

    // Build SQL and execute
    let sql = query.build_sql();
    let tuples = logger.query_audit_log(&sql)?;

    // Parse events
    let events = AuditQuery::parse_events(tuples)?;

    // Process results
    for event in events {
        // Clone target so the event can still be used afterwards
        println!("Event {}: {} on {} by {} at {}",
            event.id,
            event.operation,
            event.target.clone().unwrap_or_default(),
            event.user,
            event.timestamp
        );

        // Verify checksum
        if !event.verify_checksum() {
            eprintln!("WARNING: Event {} has invalid checksum!", event.id);
        }
    }

    Ok(())
}
```
5. Log Management
Log Rotation
HeliosDB Nano uses an append-only audit log model. For long-running systems, implement log rotation to manage storage:
Manual Rotation Script
```bash
#!/bin/bash
# rotate_audit_logs.sh - Daily audit log rotation script

DB_PATH="/var/lib/heliosdb/myapp.db"
ARCHIVE_DIR="/var/log/heliosdb/audit_archive"
RETENTION_DAYS=90

# Export old audit logs
heliosdb-cli -d "$DB_PATH" -e "
    COPY (
        SELECT * FROM __audit_log
        WHERE timestamp < NOW() - INTERVAL '$RETENTION_DAYS days'
    ) TO '$ARCHIVE_DIR/audit_$(date +%Y%m%d).csv' WITH CSV HEADER;
"

# Delete old logs from active table
heliosdb-cli -d "$DB_PATH" -e "
    DELETE FROM __audit_log
    WHERE timestamp < NOW() - INTERVAL '$RETENTION_DAYS days';
"

echo "Audit log rotation complete: $(date)"
```
Programmatic Rotation (Rust)
```rust
use heliosdb_nano::EmbeddedDatabase;
use chrono::{Utc, Duration};

fn rotate_audit_logs(db: &EmbeddedDatabase, retention_days: i64) -> Result<(), Box<dyn std::error::Error>> {
    let cutoff_date = Utc::now() - Duration::days(retention_days);

    // Archive old logs to separate table
    let archive_query = format!(
        "INSERT INTO audit_log_archive \
         SELECT * FROM __audit_log \
         WHERE timestamp < '{}'",
        cutoff_date.to_rfc3339()
    );
    db.execute(&archive_query)?;

    // Delete archived logs from active table
    let delete_query = format!(
        "DELETE FROM __audit_log \
         WHERE timestamp < '{}'",
        cutoff_date.to_rfc3339()
    );
    db.execute(&delete_query)?;

    println!("Rotated audit logs older than {} days", retention_days);
    Ok(())
}
```
Archival Strategies
Compressed File Archive
```rust
use std::fs::File;
use std::io::Write;
use flate2::Compression;
use flate2::write::GzEncoder;

fn archive_to_compressed_file(
    logger: &AuditLogger,
    output_path: &str
) -> Result<(), Box<dyn std::error::Error>> {
    // Query old audit events
    let events = logger.query_audit_log(
        "timestamp < NOW() - INTERVAL '90 days'"
    )?;

    // Create compressed file
    let file = File::create(output_path)?;
    let mut encoder = GzEncoder::new(file, Compression::default());

    // Write events as JSON lines
    for event in events {
        let json = serde_json::to_string(&event)?;
        writeln!(encoder, "{}", json)?;
    }

    encoder.finish()?;
    Ok(())
}
```
Database Partition Archive
```sql
-- Create partitioned archive tables by year
CREATE TABLE audit_log_2024 (LIKE __audit_log);
CREATE TABLE audit_log_2025 (LIKE __audit_log);

-- Move 2024 logs to archive partition
INSERT INTO audit_log_2024
SELECT * FROM __audit_log
WHERE EXTRACT(YEAR FROM timestamp) = 2024;

-- Delete from active log
DELETE FROM __audit_log
WHERE EXTRACT(YEAR FROM timestamp) = 2024;
```
Cloud Storage Archive (AWS S3)
```rust
use rusoto_s3::{S3Client, PutObjectRequest, S3};
use rusoto_core::Region;

async fn archive_to_s3(
    logger: &AuditLogger,
    bucket: &str,
    key: &str
) -> Result<(), Box<dyn std::error::Error>> {
    let s3_client = S3Client::new(Region::UsEast1);

    // Export audit logs to JSON
    let events = logger.query_audit_log("")?;
    let json_data = serde_json::to_vec(&events)?;

    // Upload to S3
    let put_request = PutObjectRequest {
        bucket: bucket.to_string(),
        key: key.to_string(),
        body: Some(json_data.into()),
        ..Default::default()
    };

    s3_client.put_object(put_request).await?;
    Ok(())
}
```
Retention Policies
Implement automated retention policies based on compliance requirements:
Policy Definition
```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct RetentionPolicy {
    // Active log retention
    active_retention_days: u32,

    // Archive retention (before permanent deletion)
    archive_retention_days: u32,

    // Operation-specific overrides
    ddl_retention_days: Option<u32>,
    security_event_retention_days: Option<u32>,

    // Archive destination
    archive_destination: ArchiveDestination,
}

#[derive(Debug, Serialize, Deserialize)]
enum ArchiveDestination {
    LocalFile { path: String },
    S3 { bucket: String, prefix: String },
    Database { table: String },
}
```
Policy Enforcement
```rust
fn enforce_retention_policy(
    logger: &AuditLogger,
    policy: &RetentionPolicy
) -> Result<(), Box<dyn std::error::Error>> {
    use chrono::{Utc, Duration};

    let active_cutoff = Utc::now() - Duration::days(policy.active_retention_days as i64);
    let archive_cutoff = Utc::now() - Duration::days(
        (policy.active_retention_days + policy.archive_retention_days) as i64
    );

    // Archive logs older than active retention
    let archive_query = format!(
        "SELECT * FROM __audit_log WHERE timestamp < '{}'",
        active_cutoff.to_rfc3339()
    );
    let events_to_archive = logger.query_audit_log(&archive_query)?;

    // Write to archive destination
    match &policy.archive_destination {
        ArchiveDestination::Database { table } => {
            // Move to archive table
            println!("Archiving {} events to {}", events_to_archive.len(), table);
        },
        ArchiveDestination::S3 { bucket, prefix } => {
            // Upload to S3
            println!("Archiving {} events to s3://{}/{}", events_to_archive.len(), bucket, prefix);
        },
        _ => {}
    }

    // Delete archived logs from active table
    let delete_query = format!(
        "DELETE FROM __audit_log WHERE timestamp < '{}'",
        active_cutoff.to_rfc3339()
    );
    // Execute delete (implementation omitted)

    Ok(())
}
```
Compliance-Specific Retention
```rust
fn get_compliance_retention_policy(compliance_type: &str) -> RetentionPolicy {
    match compliance_type {
        "SOC2" => RetentionPolicy {
            active_retention_days: 90,
            archive_retention_days: 275, // 1 year total
            ddl_retention_days: Some(365),
            security_event_retention_days: Some(730), // 2 years
            archive_destination: ArchiveDestination::S3 {
                bucket: "compliance-audit-logs".to_string(),
                prefix: "soc2/".to_string(),
            },
        },
        "HIPAA" => RetentionPolicy {
            active_retention_days: 365,
            archive_retention_days: 2190, // 7 years total (2555 days)
            ddl_retention_days: Some(2555),
            security_event_retention_days: Some(2555),
            archive_destination: ArchiveDestination::S3 {
                bucket: "hipaa-audit-logs".to_string(),
                prefix: "phi-access/".to_string(),
            },
        },
        "GDPR" => RetentionPolicy {
            active_retention_days: 180,
            archive_retention_days: 910, // 3 years total
            ddl_retention_days: Some(365),
            security_event_retention_days: Some(1095), // 3 years
            archive_destination: ArchiveDestination::Database {
                table: "audit_log_gdpr_archive".to_string(),
            },
        },
        _ => RetentionPolicy {
            active_retention_days: 90,
            archive_retention_days: 275,
            ddl_retention_days: None,
            security_event_retention_days: None,
            archive_destination: ArchiveDestination::LocalFile {
                path: "/var/log/heliosdb/archive".to_string(),
            },
        },
    }
}
```
6. Compliance Scenarios
SOC2 Audit Trails
SOC2 compliance requires tracking who accessed what data, when, and whether it succeeded.
Configuration for SOC2
```toml
[audit]
enabled = true
log_ddl = true          # Track schema changes
log_dml = true          # Track data modifications
log_select = false      # Optional (increases volume)
log_auth = true         # Track authentication
retention_days = 90     # Minimum 90 days
enable_checksums = true # Tamper detection
```
SOC2 Audit Queries
```sql
-- 1. User access report
SELECT
    user,
    COUNT(*) as operations,
    MIN(timestamp) as first_access,
    MAX(timestamp) as last_access
FROM __audit_log
WHERE timestamp >= NOW() - INTERVAL '90 days'
GROUP BY user;

-- 2. Failed operations (security incidents)
SELECT timestamp, user, operation, target, error
FROM __audit_log
WHERE success = false
  AND timestamp >= NOW() - INTERVAL '90 days'
ORDER BY timestamp DESC;

-- 3. Data modifications by table
SELECT
    target,
    operation,
    COUNT(*) as modifications,
    SUM(affected_rows) as rows_affected
FROM __audit_log
WHERE operation IN ('INSERT', 'UPDATE', 'DELETE')
  AND timestamp >= NOW() - INTERVAL '90 days'
GROUP BY target, operation
ORDER BY modifications DESC;

-- 4. Administrative changes (DDL)
SELECT timestamp, user, operation, target, query
FROM __audit_log
WHERE operation IN ('CREATE_TABLE', 'DROP_TABLE', 'ALTER_TABLE')
  AND timestamp >= NOW() - INTERVAL '90 days'
ORDER BY timestamp DESC;
```
SOC2 Compliance Report Generation
```rust
use heliosdb_nano::audit::AuditLogger;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct SOC2Report {
    report_period: String,
    total_operations: u64,
    unique_users: u64,
    failed_operations: u64,
    schema_changes: u64,
    data_modifications: u64,
    user_activity: Vec<UserActivity>,
}

#[derive(Debug, Serialize, Deserialize)]
struct UserActivity {
    user: String,
    operations: u64,
    first_access: String,
    last_access: String,
}

fn generate_soc2_report(logger: &AuditLogger) -> Result<SOC2Report, Box<dyn std::error::Error>> {
    // Query audit logs for last 90 days
    let events = logger.query_audit_log(
        "timestamp >= NOW() - INTERVAL '90 days'"
    )?;

    // Parse and aggregate data
    let parsed_events = AuditQuery::parse_events(events)?;

    let total_operations = parsed_events.len() as u64;
    let failed_operations = parsed_events.iter()
        .filter(|e| !e.success)
        .count() as u64;

    // Generate report
    let report = SOC2Report {
        report_period: "Last 90 days".to_string(),
        total_operations,
        unique_users: 0,       // Calculate from events
        failed_operations,
        schema_changes: 0,     // Calculate DDL events
        data_modifications: 0, // Calculate DML events
        user_activity: vec![],
    };

    // Export to JSON
    let json = serde_json::to_string_pretty(&report)?;
    std::fs::write("/tmp/soc2_audit_report.json", json)?;

    Ok(report)
}
```
HIPAA Access Logging
HIPAA requires 7-year retention of all access to Protected Health Information (PHI).
Configuration for HIPAA
```toml
[audit]
enabled = true
log_ddl = true
log_dml = true
log_select = true       # Required for PHI access tracking
log_auth = true
retention_days = 2555   # 7 years
enable_checksums = true # Mandatory tamper detection

[audit.capture_metadata]
capture_client_ip = true # Track access origin
capture_application_name = true
capture_execution_time = true
```
HIPAA-Specific Queries
```sql
-- Track all access to patient records table
SELECT timestamp, user, operation, query, affected_rows
FROM __audit_log
WHERE target = 'patient_records'
  AND timestamp >= NOW() - INTERVAL '7 years'
ORDER BY timestamp DESC;

-- Audit trail for specific patient
SELECT timestamp, user, operation, query
FROM __audit_log
WHERE query LIKE '%patient_id=12345%'
ORDER BY timestamp;

-- Daily access summary for PHI tables
SELECT
    DATE(timestamp) as access_date,
    COUNT(*) as total_access,
    COUNT(DISTINCT user) as unique_users
FROM __audit_log
WHERE target IN ('patient_records', 'medical_history', 'prescriptions')
GROUP BY access_date
ORDER BY access_date DESC;
```
GDPR Data Access Records
GDPR requires logging all data processing activities and providing access logs to data subjects.
Configuration for GDPR
```toml
[audit]
enabled = true
log_ddl = true
log_dml = true
log_select = true       # Track data access for subject requests
retention_days = 1095   # 3 years typical
enable_checksums = true

[audit.capture_metadata]
capture_client_ip = true     # Track processing location
capture_custom_fields = true # Legal basis, purpose
```
GDPR Subject Access Request
```sql
-- Generate access log for data subject (user email)
SELECT
    timestamp as access_time,
    user as accessor,
    operation as activity,
    query as details
FROM __audit_log
WHERE query LIKE '%user@example.com%'
   OR query LIKE '%user_id=67890%'
ORDER BY timestamp DESC;

-- Data processing activities summary
SELECT
    operation,
    COUNT(*) as occurrences,
    MIN(timestamp) as first_processed,
    MAX(timestamp) as last_processed
FROM __audit_log
WHERE target IN ('users', 'user_preferences', 'user_data')
  AND query LIKE '%user_id=67890%'
GROUP BY operation;
```
GDPR Right to Erasure Audit
```sql
-- Log data deletion for GDPR compliance
-- (Would be logged automatically by audit system)

-- Verify data deletion was logged
SELECT timestamp, user, operation, target, query, affected_rows, checksum
FROM __audit_log
WHERE operation = 'DELETE'
  AND query LIKE '%user_id=67890%'
ORDER BY timestamp DESC;
```
7. Integration
SIEM Integration
Integrate HeliosDB Nano audit logs with Security Information and Event Management (SIEM) systems.
Splunk Integration
Export audit logs to Splunk using HTTP Event Collector:
```rust
use reqwest;
use serde_json::json;

async fn export_to_splunk(
    logger: &AuditLogger,
    splunk_url: &str,
    splunk_token: &str
) -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();

    // Query recent audit events
    let events = logger.query_audit_log(
        "timestamp >= NOW() - INTERVAL '5 minutes'"
    )?;

    let parsed_events = AuditQuery::parse_events(events)?;

    // Send each event to Splunk HEC
    for event in parsed_events {
        let payload = json!({
            "event": {
                "timestamp": event.timestamp,
                "user": event.user,
                "operation": event.operation.to_string(),
                "target": event.target,
                "query": event.query,
                "success": event.success,
                "checksum": event.checksum,
            },
            "sourcetype": "heliosdb:audit",
            "index": "database_audit",
        });

        client.post(format!("{}/services/collector", splunk_url))
            .header("Authorization", format!("Splunk {}", splunk_token))
            .json(&payload)
            .send()
            .await?;
    }

    Ok(())
}
```
Elastic Stack (ELK) Integration
Send audit logs to Elasticsearch:
```rust
use elasticsearch::{Elasticsearch, http::transport::Transport};
use elasticsearch::IndexParts;

async fn export_to_elasticsearch(
    logger: &AuditLogger,
    es_url: &str
) -> Result<(), Box<dyn std::error::Error>> {
    let transport = Transport::single_node(es_url)?;
    let client = Elasticsearch::new(transport);

    // Query audit events
    let events = logger.query_audit_log(
        "timestamp >= NOW() - INTERVAL '1 hour'"
    )?;

    let parsed_events = AuditQuery::parse_events(events)?;

    // Index each event
    for event in parsed_events {
        let body = serde_json::json!({
            "timestamp": event.timestamp,
            "user": event.user,
            "operation": event.operation.to_string(),
            "target": event.target,
            "query": event.query,
            "affected_rows": event.affected_rows,
            "success": event.success,
            "error": event.error,
            "checksum": event.checksum,
        });

        client.index(IndexParts::Index("heliosdb-audit"))
            .body(body)
            .send()
            .await?;
    }

    Ok(())
}
```
Log Forwarding
Forward audit logs to external systems in real-time.
Syslog Forwarding
```rust
use syslog::{Facility, Severity};

fn forward_to_syslog(event: &AuditEvent) -> Result<(), Box<dyn std::error::Error>> {
    let formatter = syslog::Formatter3164 {
        facility: Facility::LOG_AUDIT,
        hostname: None,
        process: "heliosdb".to_string(),
        pid: std::process::id(),
    };

    let mut writer = syslog::unix(formatter)?;

    let message = format!(
        "user={} operation={} target={} success={} checksum={}",
        event.user,
        event.operation,
        event.target.as_deref().unwrap_or("N/A"),
        event.success,
        event.checksum
    );

    writer.send(Severity::LOG_INFO, &message)?;
    Ok(())
}
```
Kafka Streaming
```rust
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::config::ClientConfig;

async fn stream_to_kafka(
    logger: &AuditLogger,
    brokers: &str,
    topic: &str
) -> Result<(), Box<dyn std::error::Error>> {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .create()?;

    // Continuously stream audit events
    loop {
        let events = logger.query_audit_log(
            "id > (SELECT COALESCE(MAX(id), 0) FROM kafka_audit_offset)"
        )?;

        let parsed_events = AuditQuery::parse_events(events)?;

        for event in parsed_events {
            let payload = serde_json::to_string(&event)?;

            let record = FutureRecord::to(topic)
                .key(&event.id.to_string())
                .payload(&payload);

            // rdkafka returns the failed message alongside the error; drop it
            producer.send(record, std::time::Duration::from_secs(0))
                .await
                .map_err(|(err, _msg)| err)?;
        }

        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}
```
Alert Configuration
Configure real-time alerts based on audit events.
Failed Login Alerts
```rust
use lettre::transport::smtp::authentication::Credentials;
use lettre::{Message, SmtpTransport, Transport};

fn setup_failed_login_alert(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Monitor for failed login attempts
    let failed_logins = logger.query_audit_log(
        "operation = 'LOGIN' AND success = false AND timestamp >= NOW() - INTERVAL '5 minutes'"
    )?;

    let parsed_events = AuditQuery::parse_events(failed_logins)?;

    if parsed_events.len() >= 5 {
        // Send alert email
        send_alert_email(
            "security@example.com",
            "Multiple Failed Login Attempts Detected",
            &format!("{} failed login attempts in last 5 minutes", parsed_events.len())
        )?;
    }

    Ok(())
}

fn send_alert_email(
    to: &str,
    subject: &str,
    body: &str
) -> Result<(), Box<dyn std::error::Error>> {
    let email = Message::builder()
        .from("heliosdb@example.com".parse()?)
        .to(to.parse()?)
        .subject(subject)
        .body(body.to_string())?;

    let creds = Credentials::new(
        "smtp_username".to_string(),
        "smtp_password".to_string()
    );

    let mailer = SmtpTransport::relay("smtp.example.com")?
        .credentials(creds)
        .build();

    mailer.send(&email)?;
    Ok(())
}
```
Suspicious Activity Detection
```rust
fn detect_suspicious_activity(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Pattern 1: Mass deletion
    let mass_deletes = logger.query_audit_log(
        "operation = 'DELETE' AND affected_rows > 1000 AND timestamp >= NOW() - INTERVAL '1 hour'"
    )?;

    if !mass_deletes.is_empty() {
        alert_security_team("Mass deletion detected", &mass_deletes)?;
    }

    // Pattern 2: After-hours access
    let after_hours = logger.query_audit_log(
        "EXTRACT(HOUR FROM timestamp) NOT BETWEEN 6 AND 22"
    )?;

    if !after_hours.is_empty() {
        alert_security_team("After-hours database access", &after_hours)?;
    }

    // Pattern 3: Schema changes in production
    let ddl_changes = logger.query_audit_log(
        "operation IN ('DROP_TABLE', 'ALTER_TABLE') AND timestamp >= NOW() - INTERVAL '1 hour'"
    )?;

    if !ddl_changes.is_empty() {
        alert_security_team("Schema changes detected", &ddl_changes)?;
    }

    Ok(())
}

fn alert_security_team(
    alert_type: &str,
    events: &[Tuple]
) -> Result<(), Box<dyn std::error::Error>> {
    println!("SECURITY ALERT: {}", alert_type);
    println!("Events: {:?}", events.len());
    // Send notification (email, Slack, PagerDuty, etc.)
    Ok(())
}
```
8. Tamper Detection
Checksum Verification
Every audit event includes a SHA-256 checksum for tamper detection.
Understanding Checksums
The checksum is computed over the following event fields:
- Event ID
- Timestamp
- Session ID
- User
- Operation
- Target
- Query
- Affected rows
- Success status
- Error message
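Because the digest covers every one of these fields, altering any single field after the fact invalidates the stored checksum. The following is a minimal, dependency-free sketch of that tamper-evidence property — it uses std's `DefaultHasher` as a stand-in for SHA-256 and a hypothetical trimmed-down event with only three of the fields above, not HeliosDB Nano's actual event type:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical, simplified event: just three of the fields listed above.
struct Event {
    id: u64,
    user: String,
    success: bool,
}

// Stand-in digest: std's DefaultHasher instead of SHA-256, so the sketch
// runs without external crates. The principle is identical: hash every
// field, store the result alongside the event.
fn digest(e: &Event) -> u64 {
    let mut h = DefaultHasher::new();
    e.id.hash(&mut h);
    e.user.hash(&mut h);
    e.success.hash(&mut h);
    h.finish()
}

fn main() {
    let original = Event { id: 42, user: "alice".into(), success: true };
    let stored = digest(&original);

    // An attacker flips one field after the fact...
    let tampered = Event { success: false, ..original };

    // ...and the recomputed digest no longer matches the stored one.
    assert_ne!(digest(&tampered), stored);
    println!("tamper detected");
}
```

The same recompute-and-compare check, with SHA-256 over the full field set, is what the real implementation performs.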
Example checksum calculation:
```rust
use sha2::{Sha256, Digest};

fn calculate_checksum(event: &AuditEvent) -> String {
    let mut hasher = Sha256::new();

    // Add all event fields to hash
    hasher.update(event.id.to_string());
    hasher.update(event.timestamp.to_rfc3339());
    hasher.update(&event.session_id);
    hasher.update(&event.user);
    hasher.update(event.operation.to_string());

    if let Some(target) = &event.target {
        hasher.update(target);
    }

    hasher.update(&event.query);
    hasher.update(event.affected_rows.to_string());
    hasher.update(event.success.to_string());

    if let Some(error) = &event.error {
        hasher.update(error);
    }

    // Return hex-encoded hash
    format!("{:x}", hasher.finalize())
}
```

Verifying Event Integrity
```rust
fn verify_audit_log_integrity(logger: &AuditLogger) -> Result<(), Box<dyn std::error::Error>> {
    // Retrieve all audit events
    let events = logger.query_audit_log("")?;
    let parsed_events = AuditQuery::parse_events(events)?;

    let mut tampered_count = 0;

    for event in parsed_events {
        if !event.verify_checksum() {
            eprintln!("TAMPER DETECTED: Event {} has invalid checksum", event.id);
            eprintln!("  User: {}", event.user);
            eprintln!("  Operation: {}", event.operation);
            eprintln!("  Timestamp: {}", event.timestamp);
            eprintln!("  Stored checksum: {}", event.checksum);

            tampered_count += 1;
        }
    }

    if tampered_count > 0 {
        eprintln!("WARNING: {} tampered audit events detected!", tampered_count);
    } else {
        println!("Audit log integrity verified: All checksums valid");
    }

    Ok(())
}
```

Batch Verification Query
```sql
-- Find events with potential tampering
-- (This requires implementing checksum verification in SQL)
SELECT id, timestamp, user, operation, checksum
FROM __audit_log
WHERE LENGTH(checksum) != 64          -- SHA-256 is always 64 hex chars
   OR checksum NOT GLOB '[0-9a-f]*';  -- Invalid hex characters
```

Chain of Custody
Implement a chain-of-custody model where each event’s checksum includes the previous event’s checksum:
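The chaining idea itself can be shown in a dependency-free sketch. Here std's `DefaultHasher` stands in for SHA-256 and a plain string stands in for the serialized event fields — both simplifications for illustration, not HeliosDB Nano's actual format:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Stand-in digest: DefaultHasher instead of SHA-256, so the sketch runs
// without external crates.
fn link(prev: u64, payload: &str) -> u64 {
    let mut h = DefaultHasher::new();
    prev.hash(&mut h);    // each checksum covers the previous checksum...
    payload.hash(&mut h); // ...plus the current event's fields
    h.finish()
}

fn main() {
    let events = ["CREATE_TABLE users", "INSERT users", "DELETE users"];

    // Build the chain: checksum[i] depends on checksum[i - 1].
    let mut chain = Vec::new();
    let mut prev = 0u64; // genesis value for the first event
    for e in &events {
        prev = link(prev, e);
        chain.push(prev);
    }

    // Verify: recompute every link and compare against the stored chain.
    let mut prev = 0u64;
    for (e, stored) in events.iter().zip(&chain) {
        let recomputed = link(prev, e);
        assert_eq!(recomputed, *stored);
        prev = recomputed;
    }

    // Rewriting event 1 breaks its link, and every later link transitively
    // depends on it, so the tampering cannot be hidden without rewriting
    // the entire tail of the log.
    let tampered = link(chain[0], "INSERT users_evil");
    assert_ne!(tampered, chain[1]);
    println!("chain verified; tampering detected");
}
```

This is why chained checksums are stronger than per-event checksums: an attacker who can recompute one event's hash still cannot splice it into the log undetected.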
```rust
use sha2::{Sha256, Digest};

fn calculate_chained_checksum(event: &AuditEvent, previous_checksum: &str) -> String {
    let mut hasher = Sha256::new();

    // Include previous checksum in hash
    hasher.update(previous_checksum);

    // Add current event fields
    hasher.update(event.id.to_string());
    hasher.update(event.timestamp.to_rfc3339());
    hasher.update(&event.session_id);
    hasher.update(&event.user);
    hasher.update(event.operation.to_string());

    if let Some(target) = &event.target {
        hasher.update(target);
    }

    hasher.update(&event.query);
    hasher.update(event.affected_rows.to_string());
    hasher.update(event.success.to_string());

    if let Some(error) = &event.error {
        hasher.update(error);
    }

    format!("{:x}", hasher.finalize())
}
```
```rust
fn verify_chain_of_custody(logger: &AuditLogger) -> Result<bool, Box<dyn std::error::Error>> {
    let events = logger.query_audit_log("")?;
    let parsed_events = AuditQuery::parse_events(events)?;

    if parsed_events.is_empty() {
        return Ok(true);
    }

    // Verify first event
    if !parsed_events[0].verify_checksum() {
        eprintln!("Chain broken: First event checksum invalid");
        return Ok(false);
    }

    // Verify chain integrity
    for i in 1..parsed_events.len() {
        let prev_checksum = &parsed_events[i - 1].checksum;
        let current_event = &parsed_events[i];

        let expected_checksum = calculate_chained_checksum(current_event, prev_checksum);

        if expected_checksum != current_event.checksum {
            eprintln!("Chain broken at event {}: Checksum mismatch", current_event.id);
            eprintln!("  Expected: {}", expected_checksum);
            eprintln!("  Found: {}", current_event.checksum);
            return Ok(false);
        }
    }

    println!("Chain of custody verified: All {} events linked correctly", parsed_events.len());
    Ok(true)
}
```

Forensic Analysis
Use audit logs and checksums for forensic investigation:
Identify Tampering Window
```sql
-- Find the time range where tampering might have occurred
-- by identifying gaps in event IDs or timestamps
SELECT
    id,
    timestamp,
    LAG(id) OVER (ORDER BY id) AS prev_id,
    LAG(timestamp) OVER (ORDER BY timestamp) AS prev_timestamp,
    (id - LAG(id) OVER (ORDER BY id)) AS id_gap,
    EXTRACT(EPOCH FROM (timestamp - LAG(timestamp) OVER (ORDER BY timestamp))) AS time_gap_seconds
FROM __audit_log
WHERE (id - LAG(id) OVER (ORDER BY id)) > 1  -- Missing IDs
   OR EXTRACT(EPOCH FROM (timestamp - LAG(timestamp) OVER (ORDER BY timestamp))) < 0  -- Time went backwards
ORDER BY id;
```

Reconstruct Missing Events
```rust
fn detect_missing_events(logger: &AuditLogger) -> Result<Vec<u64>, Box<dyn std::error::Error>> {
    let events = logger.query_audit_log("ORDER BY id")?;
    let parsed_events = AuditQuery::parse_events(events)?;

    let mut missing_ids = Vec::new();

    for i in 1..parsed_events.len() {
        let prev_id = parsed_events[i - 1].id;
        let curr_id = parsed_events[i].id;

        // Check for gaps in ID sequence
        if curr_id - prev_id > 1 {
            for missing_id in (prev_id + 1)..curr_id {
                eprintln!("WARNING: Event ID {} is missing from audit log", missing_id);
                missing_ids.push(missing_id);
            }
        }
    }

    if !missing_ids.is_empty() {
        eprintln!("Forensic analysis: {} events missing or deleted", missing_ids.len());
    }

    Ok(missing_ids)
}
```

Timeline Reconstruction
```rust
use chrono::{DateTime, Utc};

#[derive(Debug)]
struct AuditTimeline {
    events: Vec<TimelineEvent>,
    anomalies: Vec<String>,
}

#[derive(Debug)]
struct TimelineEvent {
    id: u64,
    timestamp: DateTime<Utc>,
    user: String,
    operation: String,
    target: Option<String>,
    integrity_verified: bool,
}

fn reconstruct_timeline(logger: &AuditLogger) -> Result<AuditTimeline, Box<dyn std::error::Error>> {
    let events = logger.query_audit_log("ORDER BY timestamp")?;
    let parsed_events = AuditQuery::parse_events(events)?;

    let mut timeline = AuditTimeline {
        events: Vec::new(),
        anomalies: Vec::new(),
    };

    for event in parsed_events {
        // Verify integrity
        let integrity_verified = event.verify_checksum();

        if !integrity_verified {
            timeline.anomalies.push(
                format!("Event {} has invalid checksum at {}", event.id, event.timestamp)
            );
        }

        timeline.events.push(TimelineEvent {
            id: event.id,
            timestamp: event.timestamp,
            user: event.user.clone(),
            operation: event.operation.to_string(),
            target: event.target.clone(),
            integrity_verified,
        });
    }

    // Detect time anomalies
    for i in 1..timeline.events.len() {
        if timeline.events[i].timestamp < timeline.events[i - 1].timestamp {
            timeline.anomalies.push(
                format!("Time went backwards between events {} and {}",
                    timeline.events[i - 1].id, timeline.events[i].id)
            );
        }
    }

    Ok(timeline)
}
```

Summary
This tutorial covered:
- Introduction - Understanding audit logging, compliance requirements, and importance
- Configuration - Enabling and customizing audit logging with presets and TOML
- Event Types - DDL, DML, security, and query events
- Querying Audit Logs - SQL queries and programmatic interfaces
- Log Management - Rotation, archival, and retention policies
- Compliance Scenarios - SOC2, HIPAA, and GDPR implementations
- Integration - SIEM systems, log forwarding, and alerting
- Tamper Detection - Checksum verification, chain of custody, and forensic analysis
Next Steps
- Explore Examples: Run `cargo run --example audit_demo` to see audit logging in action
- Review Documentation: Read `/docs/features/AUDIT_LOGGING.md` for detailed API reference
- Run Tests: Execute `cargo test --test audit_tests` to understand test patterns
- Implement Compliance: Use configuration presets for your regulatory requirements
- Set Up Monitoring: Configure alerts and SIEM integration for your environment
Additional Resources
Questions or Feedback? Open an issue on the HeliosDB Nano repository or consult the community forum.