HeliosDB Lakehouse Integration User Guide
Table of Contents
- Overview
- Core Lakehouse Concepts
- Cloud Storage Setup
- Apache Iceberg
- Delta Lake
- Apache Hudi
- Multi-Format Federation
- Format Comparison
- Real-World Examples
- Performance Optimization
- Integration Patterns
- Troubleshooting
- Migration Guide
Overview
HeliosDB provides unified integration with the three major open-source lakehouse formats: Apache Iceberg, Delta Lake, and Apache Hudi. This guide helps you understand, configure, and optimize lakehouse integration for your use cases.
What is a Lakehouse?
A lakehouse combines the benefits of data lakes and data warehouses:
Data Lake Advantages:
- Schema flexibility with schema-on-read capabilities
- Support for all data types (structured, semi-structured, unstructured)
- Cost-effective storage on cloud object storage
- Scalable to petabytes of data
Data Warehouse Advantages:
- ACID transactions for data consistency
- Optimized analytical query performance
- Schema management and governance
- Time-travel and versioning capabilities
Traditional data lakes lack transaction support and data quality guarantees. Traditional warehouses are expensive and inflexible. Lakehouse formats bridge this gap.
Why Use HeliosDB’s Lakehouse Integration?
- Multi-Format Support: Query Iceberg, Delta, and Hudi tables with a single unified interface
- Automatic Format Detection: HeliosDB automatically detects table format and applies appropriate handlers
- Unified Time-Travel: Access historical data across all formats with consistent syntax (see the example after this list)
- Cloud Native: Seamless integration with S3, Azure Blob Storage, and Google Cloud Storage
- Schema Evolution: Handle schema changes without data migration
- Performance: Optimized query execution with partition pruning and predicate pushdown
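As a quick taste of the unified interface, the same time-travel syntax (covered in detail later in this guide) applies regardless of the underlying format; the table names below are illustrative:

```sql
-- One engine, three formats: HeliosDB detects each table's format from its
-- metadata, so no per-format syntax is needed
SELECT * FROM iceberg_customers FOR SYSTEM_TIME AS OF TIMESTAMP '2024-12-20 10:00:00';
SELECT * FROM delta_orders      FOR SYSTEM_TIME AS OF TIMESTAMP '2024-12-20 10:00:00';
SELECT * FROM hudi_events       FOR SYSTEM_TIME AS OF TIMESTAMP '2024-12-20 10:00:00';
```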
Format Overview Comparison
| Aspect | Apache Iceberg | Delta Lake | Apache Hudi |
|---|---|---|---|
| Origin | Netflix (2017) | Databricks (2019) | Uber (2016) |
| Cloud Providers | All major clouds | Databricks-focused | All major clouds |
| Transaction Model | MVCC | Optimistic concurrency | Optimistic concurrency |
| Time Travel | Yes | Yes (VERSION/TIMESTAMP) | Yes (incremental) |
| Schema Evolution | Full support | Schema merging | Limited |
| Best For | Enterprise data lakes | Databricks ecosystem | Real-time ingestion |
Core Lakehouse Concepts
ACID Transactions
Lakehouse formats provide ACID guarantees:
- Atomicity: Updates either fully complete or fully roll back
- Consistency: Data remains in valid state
- Isolation: Concurrent readers don’t see partial writes
- Durability: Committed data survives failures
This enables:
```sql
-- Traditional data lakes can't guarantee this completes fully
UPDATE lakehouse_table SET status = 'processed' WHERE id = 123;
-- If the update fails midway, all changes roll back automatically
```

Time-Travel Queries
Access data as it existed at a previous point in time:
```sql
-- View data from 7 days ago
SELECT * FROM orders FOR SYSTEM_TIME AS OF TIMESTAMP '2024-12-23 10:00:00';

-- View data from a specific version
SELECT * FROM products FOR VERSION AS OF 5;

-- Incremental query (Hudi-specific)
SELECT * FROM events FOR INCREMENTAL QUERY BETWEEN '20240101' AND '20240102';
```

Benefits:
- Audit trails: See what changed and when
- Compliance: Maintain historical records
- Bug fixes: Identify when data corruption occurred
- Recovery: Rewind to a known-good state (see the sketch below)
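A minimal recovery sketch using the version syntax above (the table name and version number are illustrative):

```sql
-- Rewind: materialize a known-good snapshot, then swap or merge it back
CREATE TABLE orders_restored AS
SELECT * FROM orders FOR VERSION AS OF 41; -- last version before the bad write
```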
Schema Evolution
Schema evolution allows changes without rewriting data:
- Add columns with optional default values (existing rows read the default or NULL)
- Drop columns without rewriting existing data files
- Rename columns transparently
- Change data types with validation
HeliosDB handles these transparently:
```sql
-- Originally: (id, name, email)
-- Add a new column (existing rows read it as NULL)
ALTER TABLE customers ADD COLUMN phone_number VARCHAR;

-- Queries work across all versions automatically
SELECT id, name, email, phone_number FROM customers;
```

Snapshots and Versions
Each format maintains a history of table states:
- Iceberg Snapshots: Immutable point-in-time views
- Delta Versions: Sequential transaction log entries
- Hudi Commits: Timeline-based versioning
Snapshots enable safe concurrent access:
```
Time ───────┬───────┬───────┬───────┬───────┬──────→
            V1      V2      V3      V4      V5 (latest)

Reader A: reads from V2 (snapshot isolated)
Reader B: reads from V5 (latest data)
Writer:   writing V6 (doesn't block readers)
```

Cloud Storage Setup
AWS S3 Configuration
Authentication Methods:
- IAM Role (recommended for EC2/Lambda):
  No configuration needed; the IAM role is used automatically.
- Access Key (for applications):

```bash
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_DEFAULT_REGION=us-east-1
```

- S3 Client Configuration:

```rust
let s3_config = S3StorageConfig {
    bucket: "my-data-lake".to_string(),
    region: "us-east-1".to_string(),
    endpoint: None,   // Use the default S3 endpoint
    access_key: None, // Use the IAM role
    secret_key: None,
};
```

S3 Bucket Best Practices:

```
s3://my-data-lake/
├── iceberg/
│   ├── warehouse/
│   │   ├── namespace1.db/
│   │   │   ├── table1/
│   │   │   └── table2/
│   │   └── namespace2.db/
├── delta/
│   ├── orders/
│   ├── customers/
│   └── products/
└── hudi/
    ├── events/
    ├── logs/
    └── transactions/
```

Azure Blob Storage Setup
Authentication:
- Managed Identity (recommended):
  Automatically uses the Azure AD identity of your resource.
- Storage Account Key:

```bash
export AZURE_STORAGE_ACCOUNT=myaccount
export AZURE_STORAGE_KEY=your_storage_key
```

- Configuration:

```rust
let azure_config = AzureStorageConfig {
    container: "datalake".to_string(),
    account: "myaccount".to_string(),
    auth_method: AuthMethod::ManagedIdentity,
};
```

Google Cloud Storage Setup
Authentication:
- Service Account (recommended):
```bash
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
```

- Configuration:

```rust
let gcs_config = GcsStorageConfig {
    bucket: "my-data-lake".to_string(),
    project: "my-project".to_string(),
    credentials_path: Some("/path/to/service-account.json".to_string()),
};
```

Apache Iceberg
What is Apache Iceberg?
Apache Iceberg is an open table format designed for large analytic datasets. It was created at Netflix and is now a top-level Apache Software Foundation project.
Key Characteristics:
- Copy-on-write semantics for metadata
- Strict schema with evolution support
- Partition evolution without rewriting data
- Fast metadata operations (no need to list files)
- Schema versioning for data compatibility (see the sketch below)
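To make these versioned-metadata characteristics concrete, here is a small sketch listing a table's snapshots, using the `FOR ALL VERSIONS` and `__metadata` constructs introduced later in this guide (table name illustrative):

```sql
-- Enumerate the versions a table has accumulated and when each was committed
SELECT DISTINCT
    CAST(__metadata.snapshot_id AS VARCHAR) AS snapshot_id,
    __metadata.timestamp AS committed_at
FROM customers FOR ALL VERSIONS
ORDER BY committed_at;
```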
Setting Up Iceberg Tables
Create Iceberg Table via SQL:
```sql
-- Create a new Iceberg table
CREATE TABLE IF NOT EXISTS customers (
    id INT,
    name VARCHAR,
    email VARCHAR,
    created_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (YEAR(created_at), BUCKET(4, id))
LOCATION 's3://my-bucket/iceberg/customers';

-- Or create an external table from existing data
CREATE EXTERNAL TABLE customers_existing
USING iceberg
LOCATION 's3://my-bucket/iceberg/customers_legacy';
```

Table Properties:

```sql
CREATE TABLE logs (
    timestamp TIMESTAMP,
    level VARCHAR,
    message VARCHAR
)
USING iceberg
LOCATION 's3://my-bucket/logs'
WITH (
    target_file_size = '134217728',   -- 128 MB files
    format_version = '2',             -- Use the v2 format
    write_distribution_mode = 'range' -- Range-based distribution
);
```

Schema Evolution with Iceberg
Add, remove, or rename columns without data migration:
```sql
-- Add a column with a default
ALTER TABLE customers ADD COLUMN phone VARCHAR DEFAULT 'N/A';

-- Add a column without a default (new rows must provide it)
ALTER TABLE customers ADD COLUMN vip_tier INT;

-- Rename a column
ALTER TABLE customers RENAME COLUMN email TO contact_email;

-- Drop a column (marked as deleted, not physically removed)
ALTER TABLE customers DROP COLUMN phone;

-- Change a column type
ALTER TABLE customers ALTER COLUMN id TYPE BIGINT;
```

Time-Travel Queries with Iceberg
```sql
-- Query at a specific timestamp
SELECT * FROM customers
FOR SYSTEM_TIME AS OF TIMESTAMP '2024-12-20 15:30:00';

-- Query at a specific snapshot
SELECT * FROM customers
FOR VERSION AS OF 42;

-- Find differences between versions
SELECT t1.id, t1.name, t2.name AS new_name
FROM customers FOR VERSION AS OF 41 t1
FULL OUTER JOIN customers FOR VERSION AS OF 42 t2 ON t1.id = t2.id
WHERE t1.name <> t2.name OR t2.name IS NULL;

-- Audit trail: show when each record changed
SELECT
    id,
    name,
    CAST(__metadata.snapshot_id AS VARCHAR) AS version,
    __metadata.timestamp AS change_time
FROM customers FOR ALL VERSIONS
ORDER BY id, version;
```

Partition Evolution
Change partition schemes without rewriting existing data:
```sql
-- Original partitioning: by year
CREATE TABLE events (
    timestamp TIMESTAMP,
    event_type VARCHAR,
    user_id INT
)
USING iceberg
PARTITIONED BY (YEAR(timestamp))
LOCATION 's3://my-bucket/events';

-- Later, you realize month-level partitioning is better.
-- Add a new partition spec (doesn't rewrite existing data)
ALTER TABLE events ADD PARTITION SPEC (MONTH(timestamp));

-- New data is written with monthly partitions; old data stays yearly.
-- Queries automatically handle both partition layouts
SELECT * FROM events WHERE timestamp > '2024-06-01';
```

Iceberg Performance Considerations
- Metadata Caching: Iceberg metadata is cached in memory
- Partition Pruning: Efficient filtering on partition columns
- File Statistics: Block-level statistics enable predicate pushdown
- Hidden Partitions: Partition columns are hidden from queries
```sql
-- Good: the predicate maps onto the YEAR(timestamp) partition transform,
-- so partition pruning applies
SELECT * FROM events WHERE YEAR(timestamp) = 2024;

-- Equivalent: a range predicate on the source column also prunes,
-- since the partition column itself is hidden
SELECT * FROM events
WHERE timestamp >= '2024-01-01' AND timestamp < '2025-01-01';

-- Use EXPLAIN to see partition pruning
EXPLAIN SELECT * FROM events WHERE YEAR(timestamp) = 2024;
```

Delta Lake
What is Delta Lake?
Delta Lake is an open-source table format created by Databricks. It adds ACID transactions to Apache Spark workloads on cloud object storage.
Key Characteristics:
- Transaction log tracks all changes
- Optimized for OLAP queries on Spark
- Unity Catalog integration for governance
- OPTIMIZE and VACUUM operations
- Z-order clustering for common access patterns
Setting Up Delta Tables
Create Delta Table:
```sql
CREATE TABLE IF NOT EXISTS orders (
    order_id INT,
    customer_id INT,
    order_date DATE,
    amount DECIMAL(10,2),
    status VARCHAR
)
USING delta
PARTITIONED BY (order_date)
LOCATION 's3://my-bucket/delta/orders';

-- Create with properties
CREATE TABLE IF NOT EXISTS products (
    product_id INT,
    name VARCHAR,
    category VARCHAR,
    price DECIMAL(10,2)
)
USING delta
LOCATION 's3://my-bucket/delta/products'
TBLPROPERTIES (
    'delta.dataSkippingNumIndexedCols' = '32',
    'delta.enableChangeDataFeed' = 'true'
);
```
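The `delta.enableChangeDataFeed` property above turns on Delta's change data feed. This guide doesn't specify how HeliosDB surfaces the feed, so the sketch below assumes a `table_changes` function in the style of Databricks SQL; the column names are Delta CDF's standard metadata columns:

```sql
-- Read row-level changes between table versions 2 and 5 (function name assumed)
SELECT product_id, name, _change_type, _commit_version, _commit_timestamp
FROM table_changes('products', 2, 5)
WHERE _change_type IN ('insert', 'update_postimage', 'delete');
```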
Schema Merging in Delta Lake
Delta Lake supports schema merging during writes:
```sql
-- Original schema: (id, name)
-- INSERT with extra columns (description, created_at)
INSERT INTO customers VALUES (1, 'Alice', 'Active customer', '2024-01-01');
-- The schema is automatically expanded to (id, name, description, created_at);
-- existing rows read NULL for the new columns
```

Time-Travel with Delta Lake
```sql
-- Using VERSION AS OF
SELECT * FROM orders FOR VERSION AS OF 5;

-- Using TIMESTAMP AS OF
SELECT * FROM orders FOR SYSTEM_TIME AS OF '2024-12-20 10:00:00';

-- Timeline: show what changed
SELECT * FROM orders VERSION HISTORY;
-- Shows: version, timestamp, user, operation, etc.

-- Calculate daily changes
WITH prev AS (
    SELECT * FROM orders FOR VERSION AS OF 4
)
SELECT current.*, prev.amount AS prev_amount
FROM (SELECT * FROM orders FOR VERSION AS OF 5) current
FULL OUTER JOIN prev ON current.order_id = prev.order_id;
```

OPTIMIZE and VACUUM
Maintain performance and manage storage:
```sql
-- Compact small files into larger ones
OPTIMIZE orders
USING ZORDER BY (customer_id, order_date);
-- Reorders data by customer_id, then order_date, for faster queries

-- Remove old versions (default: 30 days retention)
VACUUM orders;

-- Keep 90 days of history
VACUUM orders RETAIN 90 DAYS;

-- See what would be deleted (dry run)
VACUUM orders DRY RUN;

-- Aggressive retention: keep one week only
VACUUM orders RETAIN 7 DAYS;
```

Z-Ordering for Performance
Cluster related data together for faster queries:
```sql
-- Before: random order
-- File 1: customer_id [1, 500, 1000, 50]
-- File 2: customer_id [100, 2000, 30, 800]

-- After Z-ordering
OPTIMIZE transactions USING ZORDER BY (customer_id);
-- File 1: customer_id [1-250]
-- File 2: customer_id [251-500]

-- Query benefit: skip files not matching the predicate
SELECT * FROM transactions WHERE customer_id = 123; -- Only reads File 1
```

Apache Hudi
What is Apache Hudi?
Apache Hudi (Hadoop Upserts Deletes and Incrementals) enables incremental processing on data lakes.
Key Characteristics:
- Copy-on-Write (COW) and Merge-on-Read (MOR) table types
- Incremental queries for efficient processing
- Upserts and bulk inserts with deduplication
- Compaction to manage file size
- Timeline for fine-grained versioning (see the sketch after this list)
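A minimal sketch of inspecting that timeline through the `_hoodie_commit_time` metadata column used in the incremental-query examples later in this section (table name illustrative):

```sql
-- List the most recent commits on a Hudi table's timeline
SELECT DISTINCT _hoodie_commit_time
FROM events
ORDER BY _hoodie_commit_time DESC
LIMIT 10;
```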
COW vs MOR Tables
Copy-on-Write (COW) Tables:
- New versions create completely new files
- Fast reads, slower writes
- Best for: Read-heavy workloads, data warehousing
```
Write: id=1, value=100
  Read latest: 1-100 (single file read)

Write: id=1, value=200
  Affected files are rewritten
  Read latest: 1-200 (single file read, but bigger files)
```

Merge-on-Read (MOR) Tables:
- Updates stored in delta files
- Base files + deltas = complete view
- Compaction merges deltas into base files periodically
- Fast writes, slightly slower reads
```
Write: id=1, value=100
  Read latest: 1-100 (base file)

Write: id=1, value=200
  Delta file written (small)
  Read latest: merge base (1-100) + delta (1-200) = 1-200
  Lazy compaction merges deltas into base files later
```

Setting Up Hudi Tables
Create Hudi Table (COW):
```sql
CREATE TABLE IF NOT EXISTS events (
    event_id VARCHAR,
    timestamp TIMESTAMP,
    user_id INT,
    event_type VARCHAR,
    properties VARCHAR
)
USING hudi
WITH (
    type = 'cow',
    primaryKey = 'event_id',
    preCombineKey = 'timestamp'
)
LOCATION 's3://my-bucket/hudi/events';
```

Create Hudi Table (MOR):

```sql
CREATE TABLE IF NOT EXISTS logs (
    log_id VARCHAR,
    timestamp TIMESTAMP,
    level VARCHAR,
    message VARCHAR
)
USING hudi
WITH (
    type = 'mor',
    primaryKey = 'log_id',
    preCombineKey = 'timestamp',
    compactionStrategy = 'async'
)
LOCATION 's3://my-bucket/hudi/logs';
```

Incremental Queries with Hudi
Efficiently process only new/changed data:
```sql
-- Get data added/changed since a specific commit
SELECT * FROM events
INCREMENTAL QUERY
WHERE _hoodie_commit_time > '20240101000000'
  AND _hoodie_commit_time <= '20240102000000';

-- More efficient than reading the entire table.
-- Perfect for: ELT pipelines, streaming ingestion

-- Example: daily reconciliation, processing only yesterday's changes
WITH yesterday_changes AS (
    SELECT * FROM transactions
    INCREMENTAL QUERY
    WHERE _hoodie_commit_time >= '20240101000000'
      AND _hoodie_commit_time < '20240102000000'
)
SELECT
    customer_id,
    COUNT(*) AS txn_count,
    SUM(amount) AS daily_total
FROM yesterday_changes
GROUP BY customer_id;
```

Compaction in Hudi
Merge delta files into base files to optimize reads:
```sql
-- Run inline compaction (synchronous)
ALTER TABLE events COMPACT;

-- Run async compaction (happens in the background)
CALL RUN_COMPACTION('events');

-- Compaction strategies:
--   inline:   during writes; slower writes but good reads
--   async:    scheduled background task, independent of writes
--   schedule: time-based compaction

-- Monitor compaction
SELECT
    _hoodie_commit_time,
    _hoodie_file_name,
    COUNT(*) AS record_count
FROM events
GROUP BY _hoodie_commit_time, _hoodie_file_name
ORDER BY _hoodie_commit_time DESC
LIMIT 50;
```

Multi-Format Federation
Querying Across Formats
HeliosDB allows seamless queries across different lakehouse formats:
```sql
-- Join Iceberg and Delta tables
SELECT i.customer_id, i.name, d.order_id, d.amount
FROM iceberg_customers i
INNER JOIN delta_orders d ON i.customer_id = d.customer_id
WHERE d.order_date > '2024-01-01';

-- Combine all three formats
SELECT 'iceberg' AS source, COUNT(*) AS record_count FROM iceberg_table
UNION ALL
SELECT 'delta' AS source, COUNT(*) AS record_count FROM delta_table
UNION ALL
SELECT 'hudi' AS source, COUNT(*) AS record_count FROM hudi_table;
```

Format Conversion
Migrate data between formats:
```sql
-- Create the target table
CREATE TABLE customers_iceberg (
    id INT,
    name VARCHAR,
    email VARCHAR,
    created_at TIMESTAMP
)
USING iceberg
LOCATION 's3://my-bucket/iceberg/customers';

-- Copy data from Delta
INSERT INTO customers_iceberg
SELECT * FROM customers_delta;

-- Verify that counts match
SELECT COUNT(*) AS iceberg_count FROM customers_iceberg
UNION ALL
SELECT COUNT(*) AS delta_count FROM customers_delta;
```

Format Comparison
When to Use Each Format
Use Apache Iceberg when:
- Building enterprise data lakes with strict schemas
- Requiring partition evolution and advanced features
- Operating in multi-cloud environments
- Needing hidden partitions for query efficiency
- Integrating with Trino, Spark, Flink
Use Delta Lake when:
- Deeply integrated with Databricks ecosystem
- Leveraging Unity Catalog for governance
- Standardized on Apache Spark
- Needing OPTIMIZE/VACUUM simplicity
- Wanting Z-order clustering
Use Apache Hudi when:
- Processing real-time streaming data
- Requiring frequent upserts/updates
- Needing incremental query efficiency
- Building near real-time analytics
- Operating on tight budgets (MOR efficiency)
Feature Comparison
| Feature | Iceberg | Delta | Hudi |
|---|---|---|---|
| ACID Transactions | Yes | Yes | Yes |
| Time Travel | Full snapshots | Versions + timestamps | Incremental |
| Schema Evolution | Full | Merging | Limited |
| Partition Evolution | Yes | No | No |
| Incremental Queries | No | Change feed | Yes (native) |
| Compaction Required | No | OPTIMIZE | Yes |
| Upserts/Deletes | Yes | Yes | Yes |
| Hidden Partitions | Yes | No | No |
| Multi-cloud Ready | Yes | Azure/GCP limited | Yes |
| Community Size | Growing | Large (Databricks) | Enterprise |
Real-World Examples
Example 1: Data Lake Migration from Traditional Warehouse
Scenario: Migrating from on-prem Teradata to cloud lakehouse
```sql
-- Step 1: Create lakehouse tables (Iceberg chosen for flexibility)
CREATE TABLE IF NOT EXISTS raw_sales (
    sale_id BIGINT,
    store_id INT,
    product_id INT,
    quantity INT,
    amount DECIMAL(10,2),
    sale_date DATE,
    sale_timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (sale_date)
LOCATION 's3://my-datalake/raw/sales';

-- Step 2: Initial load with a timestamp for auditing
INSERT INTO raw_sales
SELECT
    sale_id, store_id, product_id, quantity, amount, sale_date,
    CURRENT_TIMESTAMP AS sale_timestamp
FROM teradata_extract;

-- Step 3: Implement incremental sync (daily job)
WITH source_data AS (
    SELECT * FROM teradata_incremental
    WHERE extraction_date = CURRENT_DATE
)
MERGE INTO raw_sales t
USING source_data s
ON t.sale_id = s.sale_id
WHEN MATCHED THEN UPDATE SET
    quantity = s.quantity,
    amount = s.amount,
    sale_timestamp = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN INSERT *;

-- Step 4: Create the curated layer
CREATE TABLE IF NOT EXISTS sales_by_store_day (
    store_id INT,
    sale_date DATE,
    daily_sales DECIMAL(12,2),
    transaction_count INT,
    last_updated TIMESTAMP
)
USING iceberg
PARTITIONED BY (sale_date)
LOCATION 's3://my-datalake/curated/sales_by_store_day';

-- Step 5: Pipeline to populate the curated layer
INSERT INTO sales_by_store_day
SELECT
    store_id,
    sale_date,
    SUM(amount) AS daily_sales,
    COUNT(*) AS transaction_count,
    CURRENT_TIMESTAMP AS last_updated
FROM raw_sales
WHERE sale_date = CURRENT_DATE - 1
GROUP BY store_id, sale_date;
```

Benefits realized:
- Eliminated hardware costs (cloud storage is cheaper)
- Improved query performance with partition pruning
- Enabled time-travel for auditing
- No vendor lock-in (can query from Trino, Spark, Flink)
Example 2: Multi-Cloud Lakehouse Setup
Scenario: Organization with multiple cloud providers wanting unified analytics
```sql
-- S3 Iceberg tables (AWS primary)
CREATE TABLE IF NOT EXISTS us_sales (
    sale_id BIGINT,
    region VARCHAR,
    amount DECIMAL(10,2),
    timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (YEAR(timestamp), MONTH(timestamp))
LOCATION 's3://us-datalake/sales';

-- Azure Blob Storage Delta tables
CREATE TABLE IF NOT EXISTS eu_sales (
    sale_id BIGINT,
    region VARCHAR,
    amount DECIMAL(10,2),
    timestamp TIMESTAMP
)
USING delta
PARTITIONED BY (timestamp)
LOCATION 'abfs://eu-datalake@storage.dfs.core.windows.net/sales';

-- GCS Hudi tables for streaming
CREATE TABLE IF NOT EXISTS apac_events (
    event_id VARCHAR,
    region VARCHAR,
    event_type VARCHAR,
    amount VARCHAR,  -- arrives as a string from the stream
    timestamp TIMESTAMP
)
USING hudi
WITH (
    type = 'mor',
    primaryKey = 'event_id'
)
LOCATION 'gs://apac-datalake/events';

-- Unified global reporting
CREATE TABLE global_sales_summary AS
SELECT
    'us' AS source, region,
    SUM(amount) AS total_sales,
    COUNT(*) AS transaction_count,
    MAX(timestamp) AS latest_sale
FROM us_sales
WHERE YEAR(timestamp) = 2024
GROUP BY region

UNION ALL

SELECT
    'eu' AS source, region,
    SUM(amount) AS total_sales,
    COUNT(*) AS transaction_count,
    MAX(timestamp) AS latest_sale
FROM eu_sales
WHERE YEAR(timestamp) = 2024
GROUP BY region

UNION ALL

SELECT
    'apac' AS source, region,
    SUM(CAST(amount AS DECIMAL(10,2))) AS total_sales,
    COUNT(*) AS transaction_count,
    MAX(timestamp) AS latest_sale
FROM apac_events
WHERE YEAR(timestamp) = 2024
GROUP BY region;

-- Multi-cloud insights
SELECT
    source,
    region,
    total_sales,
    ROUND(100 * total_sales / SUM(total_sales) OVER (PARTITION BY source), 2) AS pct_of_region
FROM global_sales_summary
ORDER BY source, total_sales DESC;
```

Benefits realized:
- No vendor lock-in to single cloud
- Regulatory compliance (data residency)
- Cost optimization (use best pricing by region)
- Unified analytics despite distributed architecture
Example 3: Time-Travel for Compliance Auditing
Scenario: Financial institution needs audit trail for regulatory compliance
```sql
-- Create the transactions table with Iceberg
CREATE TABLE transactions (
    txn_id VARCHAR,
    account_id VARCHAR,
    amount DECIMAL(15,2),
    transaction_type VARCHAR,
    status VARCHAR,
    timestamp TIMESTAMP,
    last_modified TIMESTAMP
)
USING iceberg
PARTITIONED BY (DATE(timestamp))
LOCATION 's3://financial-datalake/transactions';

-- Quarterly compliance check:
-- "Show me all transactions that were rejected, and when they were marked as such"
WITH current_state AS (
    SELECT txn_id, account_id, amount, status, 'current' AS state_type
    FROM transactions
    WHERE status = 'REJECTED'
),
previous_states AS (
    SELECT txn_id, account_id, amount, status, 'version_4' AS state_type
    FROM transactions FOR VERSION AS OF 4 -- Compare against a previous version
    WHERE status = 'REJECTED'
)
SELECT current_state.*, previous_states.status AS previous_status
FROM current_state
FULL OUTER JOIN previous_states ON current_state.txn_id = previous_states.txn_id
WHERE current_state.status <> previous_states.status
   OR previous_states.status IS NULL
ORDER BY current_state.txn_id;

-- Audit log: show how a transaction changed across versions
SELECT
    txn_id,
    CAST(__metadata.snapshot_id AS VARCHAR) AS version,
    __metadata.timestamp AS version_timestamp,
    status,
    (SELECT COUNT(*) FROM transactions t2
     WHERE t2.txn_id = t1.txn_id
       AND t2.__metadata.snapshot_id <= t1.__metadata.snapshot_id) AS record_count
FROM transactions FOR ALL VERSIONS t1
WHERE txn_id = 'TXN-2024-123456'
ORDER BY __metadata.snapshot_id;

-- Recovery: restore a transaction to a previous state
INSERT INTO transactions_recovery
SELECT * FROM transactions FOR VERSION AS OF 50
WHERE txn_id = 'TXN-2024-CORRUPTED';
```

Benefits realized:
- Complete audit trail for regulators
- Ability to prove historical accuracy
- Recovery from accidental changes
- No separate audit infrastructure to run (versioning is built into the table format)
Performance Optimization
Partitioning Strategies
Good Partitioning:
```sql
-- Partition by time (most common)
CREATE TABLE events (
    event_id VARCHAR,
    timestamp TIMESTAMP,
    user_id INT,
    event_type VARCHAR
)
USING iceberg
PARTITIONED BY (DATE(timestamp), event_type) -- Multi-dimensional partitioning
LOCATION 's3://datalake/events';

-- Example: the query scans only the relevant partitions
SELECT COUNT(*) FROM events
WHERE DATE(timestamp) = '2024-12-20' -- Reads only the Dec 20 partition
  AND event_type = 'click';          -- Further narrows within that partition

-- Avoid over-partitioning:
-- too many partitions = metadata overhead.
-- Rule of thumb: 100-10000 files per partition
```

Iceberg Hidden Partitions:

```sql
-- Iceberg automatically manages partition columns
CREATE TABLE orders (
    order_id INT,
    timestamp TIMESTAMP,
    customer_id INT,
    amount DECIMAL(10,2)
)
USING iceberg
PARTITIONED BY (BUCKET(10, customer_id), MONTH(timestamp));

-- Partitions are hidden: you don't reference them in queries
SELECT * FROM orders
WHERE customer_id = 123 AND timestamp > '2024-06-01';
-- Iceberg handles partition pruning automatically
```

File Format Selection
```sql
-- Parquet (recommended for analytics):
-- columnar format, good compression, predicate pushdown
CREATE TABLE analytics (
    id INT,
    name VARCHAR,
    metrics STRUCT<views: INT, clicks: INT>
)
USING iceberg
WITH (
    write_format = 'parquet'
)
LOCATION 's3://datalake/analytics';

-- ORC (Apache Hadoop ecosystem):
-- slightly better compression than Parquet, less universal
CREATE TABLE logs (
    timestamp TIMESTAMP,
    message VARCHAR
)
USING hudi
WITH (
    write_format = 'orc'
)
LOCATION 's3://datalake/logs';
```

Caching and Indexing
Column Statistics:
```sql
-- Enable data skipping by collecting statistics (Delta)
ALTER TABLE large_table SET TBLPROPERTIES (
    'delta.dataSkippingNumIndexedCols' = '32'
);

-- Query with statistics (files are skipped automatically)
SELECT * FROM large_table
WHERE indexed_column = 'value';
-- The system skips entire files that can't match the predicate
```

Partition Pruning:

```sql
-- Good: an equality filter on the partition column forces partition pruning
SELECT * FROM sales
WHERE sale_date = '2024-12-20';

-- With Iceberg hidden partitioning, filter the source column directly;
-- pruning still applies even though the partition column never appears
SELECT * FROM sales
WHERE sale_timestamp >= '2024-12-20' AND sale_timestamp < '2024-12-21';

-- Monitor with EXPLAIN
EXPLAIN SELECT * FROM sales WHERE sale_date = '2024-12-20';
-- Shows: Files scanned = 1 (pruned to a single partition)
```

Integration Patterns
ELT Pipeline Pattern
```sql
-- Bronze layer: raw data as-is
CREATE TABLE bronze_raw_events (
    raw_data VARCHAR,
    ingestion_timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (DATE(ingestion_timestamp))
LOCATION 's3://datalake/bronze/raw_events';

-- Silver layer: cleaned, deduplicated
CREATE TABLE silver_events (
    event_id VARCHAR,
    event_type VARCHAR,
    user_id INT,
    timestamp TIMESTAMP,
    properties VARCHAR,
    processed_timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (DATE(timestamp), event_type)
LOCATION 's3://datalake/silver/events';

-- Gold layer: business-ready analytics
CREATE TABLE gold_events_daily (
    event_date DATE,
    event_type VARCHAR,
    unique_users INT,
    total_events INT,
    top_properties VARCHAR
)
USING delta
PARTITIONED BY (event_date)
LOCATION 's3://datalake/gold/events_daily';

-- Bronze-to-Silver transformation
INSERT INTO silver_events
SELECT
    MD5(CONCAT(raw_data, ingestion_timestamp)) AS event_id,
    JSON_EXTRACT_SCALAR(raw_data, '$.type') AS event_type,
    CAST(JSON_EXTRACT_SCALAR(raw_data, '$.user_id') AS INT) AS user_id,
    CAST(JSON_EXTRACT_SCALAR(raw_data, '$.timestamp') AS TIMESTAMP) AS timestamp,
    JSON_EXTRACT_SCALAR(raw_data, '$.properties') AS properties,
    CURRENT_TIMESTAMP AS processed_timestamp
FROM bronze_raw_events
WHERE DATE(ingestion_timestamp) = CURRENT_DATE;

-- Silver-to-Gold aggregation
INSERT INTO gold_events_daily
SELECT
    CAST(timestamp AS DATE) AS event_date,
    event_type,
    COUNT(DISTINCT user_id) AS unique_users,
    COUNT(*) AS total_events,
    JSON_EXTRACT_SCALAR(properties, '$.category') AS top_properties
FROM silver_events
WHERE DATE(processed_timestamp) = CURRENT_DATE
GROUP BY event_date, event_type, top_properties;
```

Real-Time Streaming Pattern
```sql
-- Hudi MOR table for streaming ingestion
CREATE TABLE events_streaming (
    event_id VARCHAR,
    timestamp TIMESTAMP,
    user_id INT,
    event_type VARCHAR,
    metadata VARCHAR
)
USING hudi
WITH (
    type = 'mor',
    primaryKey = 'event_id',
    preCombineKey = 'timestamp',
    compactionStrategy = 'async'
)
LOCATION 's3://datalake/events_stream';
```
A Kafka consumer writes to this table, inserting or updating rows keyed on event_id; a sketch of that upsert follows.
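A hedged sketch of that upsert, assuming each consumed micro-batch lands in a staging table first (`kafka_events_batch` is hypothetical) and reusing the MERGE syntax shown earlier in this guide:

```sql
-- Upsert one micro-batch; preCombineKey = 'timestamp' resolves duplicate
-- event_ids within a batch in favor of the latest record
MERGE INTO events_streaming t
USING kafka_events_batch s
ON t.event_id = s.event_id
WHEN MATCHED THEN UPDATE SET
    timestamp = s.timestamp,
    user_id = s.user_id,
    event_type = s.event_type,
    metadata = s.metadata
WHEN NOT MATCHED THEN INSERT *;
```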
```sql
-- Daily batch: process new events efficiently
INSERT INTO events_daily
SELECT
    DATE(timestamp) AS event_date,
    COUNT(*) AS event_count,
    COUNT(DISTINCT user_id) AS unique_users
FROM events_streaming
INCREMENTAL QUERY -- Only read data added since the last run
WHERE _hoodie_commit_time >= DATE_FORMAT(CURRENT_DATE - 1, 'yyyyMMddHHmmss')
GROUP BY DATE(timestamp);
```

Troubleshooting
Issue: Slow Queries on Large Tables
Symptoms: Queries take minutes even with filters
Solutions:
- Check partition pruning:
```sql
EXPLAIN SELECT * FROM large_table WHERE date = '2024-12-20';
-- Look for "Partitions scanned: 1" (good) vs "Partitions scanned: 10000" (bad)
```

- Add statistics:
```sql
-- For Iceberg
ALTER TABLE large_table SET TBLPROPERTIES (
    'write.statistics.enabled' = 'true'
);

-- For Delta
ALTER TABLE large_table SET TBLPROPERTIES (
    'delta.dataSkippingNumIndexedCols' = '32'
);
```

- Optimize file layout:
```sql
-- Iceberg: rewrite small files
ALTER TABLE large_table REWRITE DATA WHERE date = '2024-12-20';

-- Delta: compact files
OPTIMIZE large_table USING ZORDER BY (most_filtered_column);

-- Hudi: run compaction
CALL RUN_COMPACTION('large_table');
```

Issue: “File Not Found” Errors
Symptoms: Intermittent query failures, file list errors
Causes & Fixes:
- Concurrent writes causing file-list inconsistency:

```sql
-- Ensure a single writer, or tune for safe parallelism
SET spark.sql.shuffle.partitions = 200;        -- Adjust for parallelism
SET spark.sql.files.ignoreCorruptFiles = true; -- Skip corrupted files
```

- Clock skew in the distributed system: fix system time synchronization (NTP) and verify that your cloud hosts' clocks are in sync (see the check after this list).
- Permission issues:
```bash
# Check S3 access
aws s3 ls s3://my-bucket/iceberg/table --recursive

# Verify the IAM role has the required permissions:
# - s3:GetObject
# - s3:PutObject
# - s3:ListBucket
# - s3:DeleteObject
```
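For the clock-skew case above, a quick way to confirm a host's clock is NTP-synchronized (assuming a systemd-based Linux host; use `chronyc` if you run chrony):

```bash
# Confirm the system clock is NTP-synchronized
timedatectl status | grep -i synchronized
# Or, with chrony installed:
chronyc tracking
```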
Issue: Schema Evolution Errors
Symptoms: “Column not found”, unexpected NULL values
Solutions:
- Check schema history:
```sql
-- For Iceberg
SELECT schema_id, schema FROM iceberg_metadata_tables
WHERE table = 'my_table'
ORDER BY schema_id DESC;

-- For Delta
SELECT add.schemaString FROM delta.`s3://bucket/table`;
```

- Handle missing columns gracefully:

```sql
-- Old data is missing the new columns
SELECT
    id,
    name,
    COALESCE(phone, 'N/A') AS phone, -- Handle NULL
    COALESCE(email, 'N/A') AS email  -- Handle NULL
FROM customers
WHERE phone IS NULL OR email IS NULL; -- Find incomplete records
```

Migration Guide
From Traditional OLAP to Lakehouse
Phase 1: Preparation
```sql
-- Audit the existing warehouse
-- (most warehouses expose row counts and sizes in their catalog views;
--  exact column names vary by vendor)
SELECT
    table_name,
    row_count,
    bytes / 1024 / 1024 / 1024 AS size_gb
FROM information_schema.tables;
```

Phase 2: Parallel Run
```sql
-- Keep both systems running and sync data
-- DW → Lakehouse (daily job)
INSERT INTO lakehouse_table
SELECT * FROM dw_table
WHERE load_date = CURRENT_DATE;

-- Validate counts
SELECT 'dw' AS source, COUNT(*) AS row_count FROM dw_table
UNION ALL
SELECT 'lakehouse' AS source, COUNT(*) AS row_count FROM lakehouse_table;
```

Phase 3: Cutover
```sql
-- Switch applications to the lakehouse.
-- Implement a circuit breaker to revert if needed.

-- Archive the old data warehouse
INSERT INTO lakehouse_archive
SELECT * FROM dw_table
WHERE created_date < '2024-01-01';
```

Phase 4: Optimization
```sql
-- Now that the data is in the lakehouse, optimize it
OPTIMIZE lakehouse_table USING ZORDER BY (most_filtered_column);

-- Monitor performance
SELECT COUNT(*) FROM lakehouse_table; -- Should return quickly after OPTIMIZE
```

Inter-Format Migration
```sql
-- Delta → Iceberg (gain partition evolution)
CREATE TABLE iceberg_customers
USING iceberg
PARTITIONED BY (YEAR(created_at), BUCKET(10, id))
LOCATION 's3://bucket/iceberg/customers';

INSERT INTO iceberg_customers
SELECT * FROM delta_customers;

-- Verify
SELECT COUNT(*) FROM delta_customers;   -- 1000
SELECT COUNT(*) FROM iceberg_customers; -- 1000
```

Related Documentation
- Lakehouse Architecture: See docs/architecture/storage/LAKEHOUSE_ARCHITECTURE.md
- SQL Interface Guide: See docs/features/sql-interface/
- Time-Travel Debugging: See docs/features/time-travel-debugging-examples.md
- Performance Tuning: See docs/guides/user/PERFORMANCE_TUNING_GUIDE.md
- Backup and Recovery: See docs/guides/user/BACKUP_RECOVERY_GUIDE.md
- Security: See docs/guides/user/SECURITY_HARDENING_V7.md
Last Updated: December 2024
HeliosDB Version: 6.0+
License: Apache-2.0