Automated ETL Examples

Last Updated: January 4, 2026


This document provides practical examples of using HeliosDB’s Automated ETL feature for common data integration scenarios.

Table of Contents

  1. CSV Import with Schema Inference
  2. Database Migration
  3. Custom Transformations
  4. Data Quality Validation
  5. Anomaly Detection
  6. Change Data Capture (CDC)
  7. Streaming ETL Pipeline

CSV Import with Schema Inference

Import CSV files with automatic type detection:

use heliosdb_etl::{AutomatedETLEngine, SchemaInferenceConfig};
use csv::Reader;
use std::collections::HashMap;

async fn import_csv(file_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Read CSV file
    let mut reader = Reader::from_path(file_path)?;
    let headers: Vec<String> = reader.headers()?.iter().map(|s| s.to_string()).collect();
    let mut data = Vec::new();
    for result in reader.records() {
        let record = result?;
        let mut row = HashMap::new();
        for (i, field) in record.iter().enumerate() {
            row.insert(headers[i].clone(), field.to_string());
        }
        data.push(row);
    }

    // Create ETL engine
    let engine = AutomatedETLEngine::new(SchemaInferenceConfig::default()).await?;

    // Infer schema from CSV data
    let schema = engine.infer_schema("csv_import", &data).await?;
    println!("Detected schema:");
    for col in &schema.columns {
        println!(" {} : {:?} (nullable: {})", col.name, col.data_type, col.nullable);
    }

    // Build and execute pipeline
    let pipeline = engine.build_pipeline(schema.clone(), schema).await?;
    let result = pipeline.execute(data).await?;
    println!("\nImport complete: {} rows processed", result.rows_processed);
    Ok(())
}
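
For completeness, a minimal driver for the function above could look like the following sketch. The tokio runtime attribute and the sample.csv path are assumptions for illustration, not part of the HeliosDB API.

// Minimal driver sketch: #[tokio::main] and the "sample.csv" path are assumptions.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    import_csv("sample.csv").await?;
    Ok(())
}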

Database Migration

Migrate data between different databases with automatic schema mapping:

use heliosdb_etl::{AutomatedETLEngine, SchemaInferenceConfig, SchemaMapper};

async fn migrate_database(
    source_connection: &str,
    target_connection: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let engine = AutomatedETLEngine::new(SchemaInferenceConfig {
        sample_size: 50_000,
        confidence_threshold: 0.85,
        infer_relationships: true,
        ..Default::default()
    }).await?;

    // Fetch source data (pseudocode)
    let source_data = fetch_from_database(source_connection, "SELECT * FROM users").await?;

    // Infer source schema
    let source_schema = engine.infer_schema("users", &source_data).await?;

    // Define target schema (or infer from existing table)
    let target_schema = get_target_schema(target_connection, "users").await?;

    // Create schema mapper with fuzzy matching
    let mapper = SchemaMapper::new(0.7); // 70% similarity threshold
    let mappings = mapper.map_schemas(&source_schema, &target_schema).await?;
    println!("Schema mappings:");
    for mapping in &mappings {
        println!(" {} -> {} (confidence: {:.2})",
            mapping.source_column,
            mapping.target_column,
            mapping.confidence);
    }

    // Build pipeline with mappings
    let pipeline = engine.build_pipeline_with_mappings(
        source_schema,
        target_schema,
        mappings,
    ).await?;

    // Execute migration
    let result = pipeline.execute(source_data).await?;

    // Write to target (pseudocode)
    write_to_database(target_connection, &result.transformed_data).await?;
    println!("Migration complete: {} rows migrated", result.rows_processed);
    Ok(())
}
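
Low-confidence mappings are often worth a manual review before the migration runs. The fragment below is a sketch that would slot into migrate_database right after map_schemas; the 0.8 review cutoff is an arbitrary illustration, not a library default.

// Sketch: flag mappings below an arbitrary review cutoff so they can be checked
// before the pipeline executes.
let review_cutoff = 0.8;
for mapping in mappings.iter().filter(|m| m.confidence < review_cutoff) {
    eprintln!(
        "review mapping: {} -> {} (confidence {:.2})",
        mapping.source_column, mapping.target_column, mapping.confidence
    );
}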

Custom Transformations

Apply custom transformation rules to your data:

use heliosdb_etl::*;
use std::collections::HashMap;
use uuid::Uuid;

async fn custom_transformations() -> Result<(), Box<dyn std::error::Error>> {
    let engine = TransformationEngine::default();

    // Define transformation rules
    let rules = vec![
        // Trim and uppercase names
        TransformationRule {
            id: Uuid::new_v4().to_string(),
            name: "normalize_name".to_string(),
            source_column: "name".to_string(),
            target_column: "name".to_string(),
            operations: vec![
                TransformationOp::Normalize {
                    operation: NormalizeOp::Trim,
                    params: HashMap::new(),
                },
                TransformationOp::Normalize {
                    operation: NormalizeOp::Uppercase,
                    params: HashMap::new(),
                },
            ],
            confidence: 1.0,
            priority: 0,
        },
        // Convert string to integer
        TransformationRule {
            id: Uuid::new_v4().to_string(),
            name: "convert_age".to_string(),
            source_column: "age".to_string(),
            target_column: "age".to_string(),
            operations: vec![
                TransformationOp::TypeConversion {
                    from_type: DataType::String,
                    to_type: DataType::Integer,
                    format: None,
                },
            ],
            confidence: 1.0,
            priority: 1,
        },
        // Parse date from string
        TransformationRule {
            id: Uuid::new_v4().to_string(),
            name: "parse_date".to_string(),
            source_column: "birth_date".to_string(),
            target_column: "birth_date".to_string(),
            operations: vec![
                TransformationOp::TypeConversion {
                    from_type: DataType::String,
                    to_type: DataType::Date,
                    format: Some("%Y-%m-%d".to_string()),
                },
            ],
            confidence: 1.0,
            priority: 2,
        },
    ];

    // Apply transformations
    let source_data = vec![
        HashMap::from([
            ("name".to_string(), " john doe ".to_string()),
            ("age".to_string(), "30".to_string()),
            ("birth_date".to_string(), "1994-05-15".to_string()),
        ]),
    ];
    let transformed = engine.transform_batch(source_data, &rules).await?;
    println!("Transformed data:");
    for row in &transformed {
        println!(" {:?}", row);
    }
    Ok(())
}
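
Each rule carries an explicit priority field. When a rule set grows, a quick way to review it is to list the rules sorted by that field. A small fragment, assuming the same rules Vec as in the example above (the listing itself is not part of the heliosdb_etl API):

// Fragment: inspect the rule set sorted by its priority field.
let mut ordered: Vec<_> = rules.iter().collect();
ordered.sort_by_key(|rule| rule.priority);
for rule in &ordered {
    println!("{} (priority {}): {} -> {}",
        rule.name, rule.priority, rule.source_column, rule.target_column);
}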

Data Quality Validation

Validate data quality with comprehensive metrics:

use heliosdb_etl::{DataQualityValidator, QualityConfig};

async fn validate_data_quality() -> Result<(), Box<dyn std::error::Error>> {
    let validator = DataQualityValidator::new();

    // Configure quality thresholds
    let config = QualityConfig {
        min_completeness: 0.95, // 95% non-null
        min_accuracy: 0.98,     // 98% valid values
        min_consistency: 0.99,  // 99% consistent
        min_uniqueness: 1.0,    // 100% unique for primary keys
        check_duplicates: true,
        check_referential: true,
    };

    // Validate the dataset; `data` and `schema` are assumed to come from earlier
    // steps (e.g. the CSV load and schema inference shown above).
    let metrics = validator.validate_with_config(&data, &schema, &config).await?;

    println!("Data Quality Report:");
    println!("=====================");
    println!("Completeness: {:.1}% (threshold: {:.1}%)",
        metrics.completeness * 100.0, config.min_completeness * 100.0);
    println!("Accuracy: {:.1}% (threshold: {:.1}%)",
        metrics.accuracy * 100.0, config.min_accuracy * 100.0);
    println!("Consistency: {:.1}% (threshold: {:.1}%)",
        metrics.consistency * 100.0, config.min_consistency * 100.0);
    println!("Uniqueness: {:.1}% (threshold: {:.1}%)",
        metrics.uniqueness * 100.0, config.min_uniqueness * 100.0);
    println!("Timeliness: {:.1}%", metrics.timeliness * 100.0);
    println!("---------------------");
    println!("Overall Score: {:.1}%", metrics.overall_score * 100.0);

    // Check if quality meets thresholds
    if metrics.passes_thresholds(&config) {
        println!("\n[PASS] Data quality meets all thresholds");
    } else {
        println!("\n[FAIL] Data quality below thresholds");
        for issue in &metrics.issues {
            println!(" - {}", issue);
        }
    }
    Ok(())
}

Anomaly Detection

Detect and handle data anomalies:

use heliosdb_etl::{AnomalyDetector, AnomalyType};

async fn detect_anomalies() -> Result<(), Box<dyn std::error::Error>> {
    // Create detector with 80% sensitivity
    let detector = AnomalyDetector::new(0.8);

    // Detect anomalies in the dataset; `data` and `schema` are assumed to come
    // from earlier steps (e.g. the CSV load and schema inference shown above).
    let anomalies = detector.detect(&data, &schema).await?;
    println!("Detected {} anomalies:", anomalies.len());
    for anomaly in &anomalies {
        let severity = match anomaly.anomaly_type {
            AnomalyType::TypeMismatch => "HIGH",
            AnomalyType::UnexpectedNull => "MEDIUM",
            AnomalyType::RangeViolation => "MEDIUM",
            AnomalyType::FormatViolation => "LOW",
            AnomalyType::Outlier => "LOW",
        };
        println!(" [{severity}] Row {}: {} - {} (confidence: {:.2})",
            anomaly.row_id,
            anomaly.column,
            anomaly.explanation,
            anomaly.confidence);
    }

    // Optionally: filter out anomalous rows
    let clean_data: Vec<_> = data.iter()
        .enumerate()
        .filter(|(i, _)| !anomalies.iter().any(|a| a.row_id == *i))
        .map(|(_, row)| row.clone())
        .collect();
    println!("\nClean data: {} rows (removed {} anomalous rows)",
        clean_data.len(), data.len() - clean_data.len());
    Ok(())
}
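
Before deciding on a cleanup strategy, it can help to see which columns produce the most anomalies. The fragment below is a sketch that reuses the anomalies Vec from the example above; the per-column tally itself is not part of the HeliosDB API.

// Fragment: tally anomalies per column to see which fields are the noisiest.
use std::collections::HashMap;

let mut per_column: HashMap<String, usize> = HashMap::new();
for anomaly in &anomalies {
    *per_column.entry(anomaly.column.to_string()).or_insert(0) += 1;
}
for (column, count) in &per_column {
    println!("{column}: {count} anomalies");
}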

Change Data Capture (CDC)

Set up real-time data synchronization with CDC:

use heliosdb_etl::{CDCProcessor, ChangeOperation};
use std::collections::HashMap;
use tokio::sync::mpsc;

async fn setup_cdc() -> Result<(), Box<dyn std::error::Error>> {
    // Create CDC processor
    let processor = CDCProcessor::default();

    // Create event channel
    let (tx, rx) = mpsc::channel(1000);

    // Spawn processor task
    let processor_handle = tokio::spawn(async move {
        processor.process_events(rx).await
    });

    // Simulate database changes
    for i in 1..=100 {
        // INSERT event
        let insert_event = CDCProcessor::create_event(
            "users".to_string(),
            ChangeOperation::Insert,
            None, // No old value for insert
            Some(HashMap::from([
                ("id".to_string(), i.to_string()),
                ("name".to_string(), format!("User {}", i)),
            ])),
            HashMap::from([("id".to_string(), i.to_string())]),
        );
        tx.send(insert_event).await?;

        // UPDATE event
        if i % 10 == 0 {
            let update_event = CDCProcessor::create_event(
                "users".to_string(),
                ChangeOperation::Update,
                Some(HashMap::from([
                    ("id".to_string(), i.to_string()),
                    ("name".to_string(), format!("User {}", i)),
                ])),
                Some(HashMap::from([
                    ("id".to_string(), i.to_string()),
                    ("name".to_string(), format!("Updated User {}", i)),
                ])),
                HashMap::from([("id".to_string(), i.to_string())]),
            );
            tx.send(update_event).await?;
        }
    }

    // Close channel to signal completion
    drop(tx);

    // Wait for processing to complete
    let stats = processor_handle.await??;
    println!("CDC Processing Complete:");
    println!(" Inserts: {}", stats.inserts);
    println!(" Updates: {}", stats.updates);
    println!(" Deletes: {}", stats.deletes);
    println!(" Latency (P50): {:.2}ms", stats.latency_p50_ms);
    println!(" Latency (P99): {:.2}ms", stats.latency_p99_ms);
    Ok(())
}

Streaming ETL Pipeline

Process large datasets with streaming/chunked processing:

use heliosdb_etl::{AutomatedETLEngine, PipelineConfig, SchemaInferenceConfig};
use std::collections::HashMap;

async fn streaming_etl(file_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let engine = AutomatedETLEngine::new(SchemaInferenceConfig {
        sample_size: 1_000, // Sample for schema
        ..Default::default()
    }).await?;

    // Sample first chunk for schema inference
    let sample_data = read_csv_chunk(file_path, 0, 1000)?;
    let schema = engine.infer_schema("stream", &sample_data).await?;

    // Configure pipeline for streaming
    let pipeline_config = PipelineConfig {
        name: "streaming_pipeline".to_string(),
        source_schema: schema.clone(),
        target_schema: schema,
        batch_size: 10_000, // Process 10K rows at a time
        parallelism: 8,
        quality_checks: false, // Skip for speed
        anomaly_detection: false,
        ..Default::default()
    };
    let pipeline = engine.build_pipeline_with_config(pipeline_config).await?;

    // Stream process file in chunks
    let chunk_size = 10_000;
    let mut offset = 0;
    let mut total_processed = 0;
    loop {
        let chunk = read_csv_chunk(file_path, offset, chunk_size)?;
        if chunk.is_empty() {
            break;
        }
        let result = pipeline.execute(chunk).await?;
        total_processed += result.rows_processed;
        offset += chunk_size;
        println!("Processed {} rows (total: {})",
            result.rows_processed, total_processed);
    }
    println!("\nStreaming ETL complete: {} total rows", total_processed);
    Ok(())
}

fn read_csv_chunk(
    path: &str,
    offset: usize,
    limit: usize,
) -> Result<Vec<HashMap<String, String>>, Box<dyn std::error::Error>> {
    // Implementation: read CSV with offset and limit (see the sketch below)
    todo!()
}
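
One possible body for the read_csv_chunk stub, reusing the csv crate from the CSV import example. This is a sketch: it re-opens the file and skips offset records on every call, which is simple but O(offset) per chunk; a production reader would keep the reader open between calls.

// Sketch of read_csv_chunk using the csv crate; re-opening the file per chunk is
// simple but not efficient for very large files.
fn read_csv_chunk(
    path: &str,
    offset: usize,
    limit: usize,
) -> Result<Vec<HashMap<String, String>>, Box<dyn std::error::Error>> {
    let mut reader = csv::Reader::from_path(path)?;
    let headers: Vec<String> = reader.headers()?.iter().map(|s| s.to_string()).collect();
    let mut rows = Vec::new();
    for result in reader.records().skip(offset).take(limit) {
        let record = result?;
        let mut row = HashMap::new();
        for (i, field) in record.iter().enumerate() {
            row.insert(headers[i].clone(), field.to_string());
        }
        rows.push(row);
    }
    Ok(rows)
}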

Performance Comparison

Scenario              Rows         Time   Throughput
CSV Import (small)    10,000       0.1s   100K rows/s
CSV Import (large)    1,000,000    7.2s   139K rows/s
Database Migration    500,000      4.5s   111K rows/s
Streaming ETL         10,000,000   72s    139K rows/s
With Quality Checks   100,000      1.1s   91K rows/s