Automated ETL Examples
Last Updated: January 4, 2026
This document provides practical examples of using HeliosDB’s Automated ETL feature for common data integration scenarios.
Table of Contents
- CSV Import with Schema Inference
- Database Migration
- Custom Transformations
- Data Quality Validation
- Anomaly Detection
- Change Data Capture (CDC)
- Streaming ETL Pipeline
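
Each example below is an async Rust function. A minimal driver for the first example might look like the following sketch; it assumes a `tokio` runtime and that the `heliosdb_etl`, `csv`, `tokio`, and `uuid` crates are declared in `Cargo.toml` ("customers.csv" is just a placeholder path):

```rust
// Hypothetical entry point for the CSV import example below.
// "customers.csv" is a placeholder file name, not part of the HeliosDB API.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    import_csv("customers.csv").await?;
    Ok(())
}
```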
CSV Import with Schema Inference
Import CSV files with automatic type detection:
```rust
use heliosdb_etl::{AutomatedETLEngine, SchemaInferenceConfig};
use csv::Reader;
use std::collections::HashMap;

async fn import_csv(file_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Read CSV file
    let mut reader = Reader::from_path(file_path)?;
    let headers: Vec<String> = reader.headers()?.iter().map(|s| s.to_string()).collect();

    let mut data = Vec::new();
    for result in reader.records() {
        let record = result?;
        let mut row = HashMap::new();
        for (i, field) in record.iter().enumerate() {
            row.insert(headers[i].clone(), field.to_string());
        }
        data.push(row);
    }

    // Create ETL engine
    let engine = AutomatedETLEngine::new(SchemaInferenceConfig::default()).await?;

    // Infer schema from CSV data
    let schema = engine.infer_schema("csv_import", &data).await?;

    println!("Detected schema:");
    for col in &schema.columns {
        println!(" {} : {:?} (nullable: {})", col.name, col.data_type, col.nullable);
    }

    // Build and execute pipeline
    let pipeline = engine.build_pipeline(schema.clone(), schema).await?;
    let result = pipeline.execute(data).await?;

    println!("\nImport complete: {} rows processed", result.rows_processed);

    Ok(())
}
```

Database Migration
Migrate data between different databases with automatic schema mapping:
```rust
use heliosdb_etl::{AutomatedETLEngine, SchemaInferenceConfig, SchemaMapper};

async fn migrate_database(
    source_connection: &str,
    target_connection: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let engine = AutomatedETLEngine::new(SchemaInferenceConfig {
        sample_size: 50_000,
        confidence_threshold: 0.85,
        infer_relationships: true,
        ..Default::default()
    }).await?;

    // Fetch source data (pseudocode)
    let source_data = fetch_from_database(source_connection, "SELECT * FROM users").await?;

    // Infer source schema
    let source_schema = engine.infer_schema("users", &source_data).await?;

    // Define target schema (or infer from existing table)
    let target_schema = get_target_schema(target_connection, "users").await?;

    // Create schema mapper with fuzzy matching
    let mapper = SchemaMapper::new(0.7); // 70% similarity threshold
    let mappings = mapper.map_schemas(&source_schema, &target_schema).await?;

    println!("Schema mappings:");
    for mapping in &mappings {
        println!(" {} -> {} (confidence: {:.2})",
            mapping.source_column, mapping.target_column, mapping.confidence);
    }

    // Build pipeline with mappings
    let pipeline = engine.build_pipeline_with_mappings(
        source_schema,
        target_schema,
        mappings,
    ).await?;

    // Execute migration
    let result = pipeline.execute(source_data).await?;

    // Write to target (pseudocode)
    write_to_database(target_connection, &result.transformed_data).await?;

    println!("Migration complete: {} rows migrated", result.rows_processed);

    Ok(())
}
```

Custom Transformations
Apply custom transformation rules to your data:
```rust
use heliosdb_etl::*;
use std::collections::HashMap;
use uuid::Uuid;

async fn custom_transformations() -> Result<(), Box<dyn std::error::Error>> {
    let engine = TransformationEngine::default();

    // Define transformation rules
    let rules = vec![
        // Trim and uppercase names
        TransformationRule {
            id: Uuid::new_v4().to_string(),
            name: "normalize_name".to_string(),
            source_column: "name".to_string(),
            target_column: "name".to_string(),
            operations: vec![
                TransformationOp::Normalize {
                    operation: NormalizeOp::Trim,
                    params: HashMap::new(),
                },
                TransformationOp::Normalize {
                    operation: NormalizeOp::Uppercase,
                    params: HashMap::new(),
                },
            ],
            confidence: 1.0,
            priority: 0,
        },
        // Convert string to integer
        TransformationRule {
            id: Uuid::new_v4().to_string(),
            name: "convert_age".to_string(),
            source_column: "age".to_string(),
            target_column: "age".to_string(),
            operations: vec![
                TransformationOp::TypeConversion {
                    from_type: DataType::String,
                    to_type: DataType::Integer,
                    format: None,
                },
            ],
            confidence: 1.0,
            priority: 1,
        },
        // Parse date from string
        TransformationRule {
            id: Uuid::new_v4().to_string(),
            name: "parse_date".to_string(),
            source_column: "birth_date".to_string(),
            target_column: "birth_date".to_string(),
            operations: vec![
                TransformationOp::TypeConversion {
                    from_type: DataType::String,
                    to_type: DataType::Date,
                    format: Some("%Y-%m-%d".to_string()),
                },
            ],
            confidence: 1.0,
            priority: 2,
        },
    ];

    // Apply transformations
    let source_data = vec![
        HashMap::from([
            ("name".to_string(), " john doe ".to_string()),
            ("age".to_string(), "30".to_string()),
            ("birth_date".to_string(), "1994-05-15".to_string()),
        ]),
    ];

    let transformed = engine.transform_batch(source_data, &rules).await?;

    println!("Transformed data:");
    for row in &transformed {
        println!(" {:?}", row);
    }

    Ok(())
}
```

Data Quality Validation
Validate data quality with comprehensive metrics:
```rust
use heliosdb_etl::{DataQualityValidator, QualityConfig};

async fn validate_data_quality() -> Result<(), Box<dyn std::error::Error>> {
    let validator = DataQualityValidator::new();

    // Configure quality thresholds
    let config = QualityConfig {
        min_completeness: 0.95, // 95% non-null
        min_accuracy: 0.98,     // 98% valid values
        min_consistency: 0.99,  // 99% consistent
        min_uniqueness: 1.0,    // 100% unique for primary keys
        check_duplicates: true,
        check_referential: true,
    };

    // Validate dataset (`data` and `schema` come from an earlier step,
    // e.g. the schema inference shown in the CSV import example)
    let metrics = validator.validate_with_config(&data, &schema, &config).await?;

    println!("Data Quality Report:");
    println!("=====================");
    println!("Completeness: {:.1}% (threshold: {:.1}%)",
        metrics.completeness * 100.0, config.min_completeness * 100.0);
    println!("Accuracy: {:.1}% (threshold: {:.1}%)",
        metrics.accuracy * 100.0, config.min_accuracy * 100.0);
    println!("Consistency: {:.1}% (threshold: {:.1}%)",
        metrics.consistency * 100.0, config.min_consistency * 100.0);
    println!("Uniqueness: {:.1}% (threshold: {:.1}%)",
        metrics.uniqueness * 100.0, config.min_uniqueness * 100.0);
    println!("Timeliness: {:.1}%", metrics.timeliness * 100.0);
    println!("---------------------");
    println!("Overall Score: {:.1}%", metrics.overall_score * 100.0);

    // Check if quality meets thresholds
    if metrics.passes_thresholds(&config) {
        println!("\n[PASS] Data quality meets all thresholds");
    } else {
        println!("\n[FAIL] Data quality below thresholds");
        for issue in &metrics.issues {
            println!(" - {}", issue);
        }
    }

    Ok(())
}
```

Anomaly Detection
Detect and handle data anomalies:
```rust
use heliosdb_etl::{AnomalyDetector, AnomalyType};

async fn detect_anomalies() -> Result<(), Box<dyn std::error::Error>> {
    // Create detector with 80% sensitivity
    let detector = AnomalyDetector::new(0.8);

    // Detect anomalies in dataset (`data` and `schema` come from an earlier
    // step, e.g. the schema inference shown in the CSV import example)
    let anomalies = detector.detect(&data, &schema).await?;

    println!("Detected {} anomalies:", anomalies.len());

    for anomaly in &anomalies {
        let severity = match anomaly.anomaly_type {
            AnomalyType::TypeMismatch => "HIGH",
            AnomalyType::UnexpectedNull => "MEDIUM",
            AnomalyType::RangeViolation => "MEDIUM",
            AnomalyType::FormatViolation => "LOW",
            AnomalyType::Outlier => "LOW",
        };

        println!(" [{severity}] Row {}: {} - {} (confidence: {:.2})",
            anomaly.row_id, anomaly.column, anomaly.explanation, anomaly.confidence);
    }

    // Optionally: filter out anomalous rows
    let clean_data: Vec<_> = data.iter()
        .enumerate()
        .filter(|(i, _)| !anomalies.iter().any(|a| a.row_id == *i))
        .map(|(_, row)| row.clone())
        .collect();

    println!("\nClean data: {} rows (removed {} anomalous rows)",
        clean_data.len(), data.len() - clean_data.len());

    Ok(())
}
```

Change Data Capture (CDC)
Set up real-time data synchronization with CDC:
```rust
use heliosdb_etl::{CDCProcessor, ChangeOperation};
use std::collections::HashMap;
use tokio::sync::mpsc;

async fn setup_cdc() -> Result<(), Box<dyn std::error::Error>> {
    // Create CDC processor
    let processor = CDCProcessor::default();

    // Create event channel
    let (tx, rx) = mpsc::channel(1000);

    // Spawn processor task
    let processor_handle = tokio::spawn(async move {
        processor.process_events(rx).await
    });

    // Simulate database changes
    for i in 1..=100 {
        // INSERT event
        let insert_event = CDCProcessor::create_event(
            "users".to_string(),
            ChangeOperation::Insert,
            None, // No old value for insert
            Some(HashMap::from([
                ("id".to_string(), i.to_string()),
                ("name".to_string(), format!("User {}", i)),
            ])),
            HashMap::from([("id".to_string(), i.to_string())]),
        );
        tx.send(insert_event).await?;

        // UPDATE event
        if i % 10 == 0 {
            let update_event = CDCProcessor::create_event(
                "users".to_string(),
                ChangeOperation::Update,
                Some(HashMap::from([
                    ("id".to_string(), i.to_string()),
                    ("name".to_string(), format!("User {}", i)),
                ])),
                Some(HashMap::from([
                    ("id".to_string(), i.to_string()),
                    ("name".to_string(), format!("Updated User {}", i)),
                ])),
                HashMap::from([("id".to_string(), i.to_string())]),
            );
            tx.send(update_event).await?;
        }
    }

    // Close channel to signal completion
    drop(tx);

    // Wait for processing to complete
    let stats = processor_handle.await??;

    println!("CDC Processing Complete:");
    println!(" Inserts: {}", stats.inserts);
    println!(" Updates: {}", stats.updates);
    println!(" Deletes: {}", stats.deletes);
    println!(" Latency (P50): {:.2}ms", stats.latency_p50_ms);
    println!(" Latency (P99): {:.2}ms", stats.latency_p99_ms);

    Ok(())
}
```

Streaming ETL Pipeline
Process large datasets with streaming/chunked processing:
```rust
use heliosdb_etl::{AutomatedETLEngine, PipelineConfig, SchemaInferenceConfig};
use std::collections::HashMap;

async fn streaming_etl(file_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let engine = AutomatedETLEngine::new(SchemaInferenceConfig {
        sample_size: 1_000, // Sample for schema
        ..Default::default()
    }).await?;

    // Sample first chunk for schema inference
    let sample_data = read_csv_chunk(file_path, 0, 1000)?;
    let schema = engine.infer_schema("stream", &sample_data).await?;

    // Configure pipeline for streaming
    let pipeline_config = PipelineConfig {
        name: "streaming_pipeline".to_string(),
        source_schema: schema.clone(),
        target_schema: schema,
        batch_size: 10_000, // Process 10K rows at a time
        parallelism: 8,
        quality_checks: false, // Skip for speed
        anomaly_detection: false,
        ..Default::default()
    };

    let pipeline = engine.build_pipeline_with_config(pipeline_config).await?;

    // Stream process file in chunks
    let chunk_size = 10_000;
    let mut offset = 0;
    let mut total_processed = 0;

    loop {
        let chunk = read_csv_chunk(file_path, offset, chunk_size)?;
        if chunk.is_empty() {
            break;
        }

        let result = pipeline.execute(chunk).await?;
        total_processed += result.rows_processed;
        offset += chunk_size;

        println!("Processed {} rows (total: {})", result.rows_processed, total_processed);
    }

    println!("\nStreaming ETL complete: {} total rows", total_processed);

    Ok(())
}

fn read_csv_chunk(
    path: &str,
    offset: usize,
    limit: usize,
) -> Result<Vec<HashMap<String, String>>, Box<dyn std::error::Error>> {
    // Simple illustration: re-opens the file and skips `offset` records on each
    // call. A production version would keep the reader open between chunks.
    let mut reader = csv::Reader::from_path(path)?;
    let headers: Vec<String> = reader.headers()?.iter().map(|s| s.to_string()).collect();
    let mut rows = Vec::new();
    for record in reader.records().skip(offset).take(limit) {
        let record = record?;
        rows.push(headers.iter().cloned().zip(record.iter().map(|f| f.to_string())).collect());
    }
    Ok(rows)
}
```

Performance Comparison
| Scenario | Rows | Time | Throughput |
|---|---|---|---|
| CSV Import (small) | 10,000 | 0.1s | 100K rows/s |
| CSV Import (large) | 1,000,000 | 7.2s | 139K rows/s |
| Database Migration | 500,000 | 4.5s | 111K rows/s |
| Streaming ETL | 10,000,000 | 72s | 139K rows/s |
| With Quality Checks | 100,000 | 1.1s | 91K rows/s |
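
The throughput column is simply rows divided by wall-clock time. A quick sanity check of the table's own numbers (the helper below is illustrative, not part of the HeliosDB API):

```rust
use std::time::Duration;

// Throughput in thousands of rows per second, as reported in the table above.
fn throughput_krows_per_sec(rows: u64, elapsed: Duration) -> f64 {
    rows as f64 / elapsed.as_secs_f64() / 1_000.0
}

fn main() {
    // "CSV Import (large)": 1,000,000 rows in 7.2s ≈ 139K rows/s
    let t = throughput_krows_per_sec(1_000_000, Duration::from_secs_f64(7.2));
    println!("{t:.0}K rows/s");
}
```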
Related Documentation
- README.md - Feature overview
- QUICK_START.md - Getting started guide
- User Guide - Complete API reference