HeliosDB Lakehouse Integration User Guide

Overview

HeliosDB provides unified integration with the three major open-source lakehouse formats: Apache Iceberg, Delta Lake, and Apache Hudi. This guide helps you understand, configure, and optimize lakehouse integration for your use cases.

What is a Lakehouse?

A lakehouse combines the benefits of data lakes and data warehouses:

Data Lake Advantages:

  • Schema flexibility with schema-on-read capabilities
  • Support for all data types (structured, semi-structured, unstructured)
  • Cost-effective storage on cloud object storage
  • Scalable to petabytes of data

Data Warehouse Advantages:

  • ACID transactions for data consistency
  • Optimized analytical query performance
  • Schema management and governance
  • Time-travel and versioning capabilities

Traditional data lakes lack transaction support and data quality guarantees. Traditional warehouses are expensive and inflexible. Lakehouse formats bridge this gap.

Why Use HeliosDB’s Lakehouse Integration?

  1. Multi-Format Support: Query Iceberg, Delta, and Hudi tables with a single unified interface
  2. Automatic Format Detection: HeliosDB automatically detects the table format and applies the appropriate handlers (see the sketch after this list)
  3. Unified Time-Travel: Access historical data across all formats with consistent syntax
  4. Cloud Native: Seamless integration with S3, Azure Blob Storage, and Google Cloud Storage
  5. Schema Evolution: Handle schema changes without data migration
  6. Performance: Optimized query execution with partition pruning and predicate pushdown
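
As a minimal sketch of what the unified interface and format detection can look like: the table names and locations below are hypothetical, and omitting the USING clause (letting HeliosDB infer the format from the table metadata) is an assumption rather than confirmed syntax.

-- Hypothetical sketch: register existing tables and let HeliosDB detect the format
-- (table names and locations are placeholders; explicit USING clauses appear later in this guide)
CREATE EXTERNAL TABLE sales_iceberg LOCATION 's3://my-bucket/iceberg/sales';
CREATE EXTERNAL TABLE sales_delta LOCATION 's3://my-bucket/delta/sales';
-- The same time-travel syntax then works against either table
SELECT COUNT(*) FROM sales_iceberg FOR SYSTEM_TIME AS OF TIMESTAMP '2024-12-20 10:00:00';
SELECT COUNT(*) FROM sales_delta FOR SYSTEM_TIME AS OF TIMESTAMP '2024-12-20 10:00:00';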

Format Overview Comparison

| Aspect            | Apache Iceberg        | Delta Lake               | Apache Hudi            |
|-------------------|-----------------------|--------------------------|------------------------|
| Maturity          | 2 years               | 3 years                  | 4 years                |
| Cloud Providers   | All major clouds      | Databricks-focused       | All major clouds       |
| Transaction Model | MVCC                  | Optimistic concurrency   | Optimistic concurrency |
| Time Travel       | Yes                   | Yes (VERSION/TIMESTAMP)  | Yes (incremental)      |
| Schema Evolution  | Full support          | Schema merging           | Limited                |
| Best For          | Enterprise data lakes | Databricks ecosystem     | Real-time ingestion    |

Core Lakehouse Concepts

ACID Transactions

Lakehouse formats provide ACID guarantees:

  • Atomicity: Updates either fully complete or fully roll back
  • Consistency: Data remains in valid state
  • Isolation: Concurrent readers don’t see partial writes
  • Durability: Committed data survives failures

This enables:

-- Traditional data lakes can't guarantee this completes fully
UPDATE lakehouse_table SET status = 'processed' WHERE id = 123;
-- If update fails midway, all changes roll back automatically

Time-Travel Queries

Access data as it existed at a previous point in time:

-- View data from 7 days ago
SELECT * FROM orders FOR SYSTEM_TIME AS OF TIMESTAMP '2024-12-23 10:00:00';
-- View data from specific version
SELECT * FROM products FOR VERSION AS OF 5;
-- Incremental query (Hudi-specific)
SELECT * FROM events FOR INCREMENTAL QUERY BETWEEN '20240101' AND '20240102';

Benefits:

  • Audit trails: See what changed and when
  • Compliance: Maintain historical records
  • Bug fixes: Identify when data corruption occurred
  • Recovery: Rewind to known-good state

Schema Evolution

Schema evolution allows changes without rewriting data:

  • Add columns with optional default values
  • Remove columns (they appear as NULL in old data)
  • Rename columns transparently
  • Change data types with validation

HeliosDB handles these transparently:

-- Originally: (id, name, email)
-- Add new column with default
ALTER TABLE customers ADD COLUMN phone_number VARCHAR;
-- Query works on all versions automatically
SELECT id, name, email, phone_number FROM customers;

Snapshots and Versions

Each format maintains a history of table states:

  • Iceberg Snapshots: Immutable point-in-time views
  • Delta Versions: Sequential transaction log entries
  • Hudi Commits: Timeline-based versioning

Snapshots enable safe concurrent access:

Time ────┬────────┬────────┬────────┬────────┬──────→
         V1       V2       V3       V4       V5 (latest)

Reader A: reads from V2 (snapshot isolated)
Reader B: reads from V5 (latest data)
Writer:   writes V6 (doesn't block readers)
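
Before time-travelling, it helps to see which snapshots or versions exist. A minimal sketch, assuming Iceberg-style metadata tables are exposed; the ".snapshots" suffix and its column names are assumptions, not confirmed HeliosDB syntax:

-- Assumed metadata table: list available snapshots for a table
SELECT snapshot_id, committed_at, operation
FROM customers.snapshots
ORDER BY committed_at DESC;
-- Pick a version from the history and read it with the unified syntax
SELECT * FROM customers FOR VERSION AS OF 42;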

Cloud Storage Setup

AWS S3 Configuration

Authentication Methods:

  1. IAM Role (recommended for EC2/Lambda):
No configuration needed - the IAM role is used automatically
  2. Access Key (for applications):
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_DEFAULT_REGION=us-east-1
  3. S3 Client Configuration:
let s3_config = S3StorageConfig {
    bucket: "my-data-lake".to_string(),
    region: "us-east-1".to_string(),
    endpoint: None,   // Use default S3
    access_key: None, // Use IAM role
    secret_key: None,
};

S3 Bucket Best Practices:

s3://my-data-lake/
├── iceberg/
│   └── warehouse/
│       ├── namespace1.db/
│       │   ├── table1/
│       │   └── table2/
│       └── namespace2.db/
├── delta/
│   ├── orders/
│   ├── customers/
│   └── products/
└── hudi/
    ├── events/
    ├── logs/
    └── transactions/

Azure Blob Storage Setup

Authentication:

  1. Managed Identity (recommended):
Automatically uses the Azure AD identity of your resource
  2. Storage Account Key:
export AZURE_STORAGE_ACCOUNT=myaccount
export AZURE_STORAGE_KEY=your_storage_key
  3. Configuration:
let azure_config = AzureStorageConfig {
    container: "datalake".to_string(),
    account: "myaccount".to_string(),
    auth_method: AuthMethod::ManagedIdentity,
};

Google Cloud Storage Setup

Authentication:

  1. Service Account (recommended):
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
  2. Configuration:
let gcs_config = GcsStorageConfig {
    bucket: "my-data-lake".to_string(),
    project: "my-project".to_string(),
    credentials_path: Some("/path/to/service-account.json".to_string()),
};

Apache Iceberg

What is Apache Iceberg?

Apache Iceberg is an open table format designed for large analytic datasets. It was created at Netflix and is now an Apache Software Foundation project.

Key Characteristics:

  • Copy-on-write semantics for metadata
  • Strict schema with evolution support
  • Partition evolution without rewriting data
  • Fast metadata operations (no need to list files)
  • Schema versioning for data compatibility

Setting Up Iceberg Tables

Create Iceberg Table via SQL:

-- Create new Iceberg table
CREATE TABLE IF NOT EXISTS customers (
id INT,
name VARCHAR,
email VARCHAR,
created_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (YEAR(created_at), BUCKET(4, id))
LOCATION 's3://my-bucket/iceberg/customers';
-- Or create external table from existing data
CREATE EXTERNAL TABLE customers_existing
USING iceberg
LOCATION 's3://my-bucket/iceberg/customers_legacy';

Table Properties:

CREATE TABLE logs (
timestamp TIMESTAMP,
level VARCHAR,
message VARCHAR
)
USING iceberg
LOCATION 's3://my-bucket/logs'
WITH (
target_file_size = '134217728', -- 128MB files
format_version = '2', -- Use v2 format
write_distribution_mode = 'range' -- Range-based distribution
);

Schema Evolution with Iceberg

Add, remove, or rename columns without data migration:

-- Add column with default
ALTER TABLE customers ADD COLUMN phone VARCHAR DEFAULT 'N/A';
-- Add column without default (new rows must provide)
ALTER TABLE customers ADD COLUMN vip_tier INT;
-- Rename column
ALTER TABLE customers RENAME COLUMN email TO contact_email;
-- Drop column (marked as deleted, not removed)
ALTER TABLE customers DROP COLUMN phone;
-- Change column type
ALTER TABLE customers ALTER COLUMN id TYPE BIGINT;

Time-Travel Queries with Iceberg

-- Query at specific timestamp
SELECT * FROM customers
FOR SYSTEM_TIME AS OF TIMESTAMP '2024-12-20 15:30:00';
-- Query at specific snapshot
SELECT * FROM customers
FOR VERSION AS OF 42;
-- Find differences between versions
SELECT t1.id, t1.name, t2.name as new_name
FROM customers FOR VERSION AS OF 41 t1
FULL OUTER JOIN customers FOR VERSION AS OF 42 t2 ON t1.id = t2.id
WHERE t1.name <> t2.name OR t2.name IS NULL;
-- Audit trail - show when each record changed
SELECT id, name,
CAST(__metadata.snapshot_id AS VARCHAR) as version,
__metadata.timestamp as change_time
FROM customers FOR ALL VERSIONS
ORDER BY id, version;

Partition Evolution

Change partition schemes without rewriting existing data:

-- Original partitioning: by year
-- Create table with annual partitioning
CREATE TABLE events (
timestamp TIMESTAMP,
event_type VARCHAR,
user_id INT
)
USING iceberg
PARTITIONED BY (YEAR(timestamp))
LOCATION 's3://my-bucket/events';
-- After time, you realize month-level partitioning is better
-- Add new partition spec (doesn't rewrite existing data)
ALTER TABLE events ADD PARTITION SPEC (MONTH(timestamp));
-- New data written with monthly partitions, old data stays yearly
-- Queries automatically handle both partition layouts
SELECT * FROM events WHERE timestamp > '2024-06-01';

Iceberg Performance Considerations

  1. Metadata Caching: Iceberg metadata is cached in memory
  2. Partition Pruning: Efficient filtering on partition columns
  3. File Statistics: Block-level statistics enable predicate pushdown
  4. Hidden Partitions: Partition columns are hidden from queries

-- Good: uses partition pruning on the hidden partition derived from timestamp
SELECT * FROM events WHERE YEAR(timestamp) = 2024; -- Fast
-- Also pruned: filtering directly on the source column
SELECT * FROM events WHERE timestamp >= '2024-01-01' AND timestamp < '2025-01-01';
-- Use EXPLAIN to see partition pruning
EXPLAIN SELECT * FROM events WHERE YEAR(timestamp) = 2024;

Delta Lake

What is Delta Lake?

Delta Lake is an open-source table format created by Databricks. It adds ACID transactions to Apache Spark workloads on cloud object storage.

Key Characteristics:

  • Transaction log tracks all changes
  • Optimized for OLAP queries on Spark
  • Unity Catalog integration for governance
  • OPTIMIZE and VACUUM operations
  • Z-order clustering for common access patterns

Setting Up Delta Tables

Create Delta Table:

CREATE TABLE IF NOT EXISTS orders (
order_id INT,
customer_id INT,
order_date DATE,
amount DECIMAL(10,2),
status VARCHAR
)
USING delta
PARTITIONED BY (order_date)
LOCATION 's3://my-bucket/delta/orders';
-- Create with properties
CREATE TABLE IF NOT EXISTS products (
product_id INT,
name VARCHAR,
category VARCHAR,
price DECIMAL(10,2)
)
USING delta
LOCATION 's3://my-bucket/delta/products'
TBLPROPERTIES (
'delta.dataSkippingNumIndexedCols' = '32',
'delta.enableChangeDataFeed' = 'true'
);

Schema Merging in Delta Lake

Delta Lake supports schema merging during writes:

-- Original schema: (id, name)
-- INSERT with extra columns (description, created_at)
INSERT INTO customers VALUES (1, 'Alice', 'Active customer', '2024-01-01');
-- Schema automatically expanded to (id, name, description, created_at)
-- Existing rows get NULL for new columns
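
In open-source Delta Lake, schema merging on write usually has to be enabled explicitly rather than being on by default. A hedged sketch of enabling it through a table property; the property name follows Delta Lake conventions and should be treated as an assumption for HeliosDB:

-- Assumed property name (Delta Lake convention); without auto-merge enabled,
-- an INSERT carrying unknown columns is rejected instead of widening the schema
ALTER TABLE customers SET TBLPROPERTIES (
    'delta.schema.autoMerge.enabled' = 'true'
);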

Time-Travel with Delta Lake

-- Using VERSION AS OF
SELECT * FROM orders FOR VERSION AS OF 5;
-- Using TIMESTAMP AS OF
SELECT * FROM orders FOR SYSTEM_TIME AS OF '2024-12-20 10:00:00';
-- Timeline: show what changed
SELECT * FROM orders VERSION HISTORY;
-- Shows: version, timestamp, user, operation, etc.
-- Calculate daily changes
WITH prev AS (
SELECT * FROM orders FOR VERSION AS OF 4
)
SELECT current.*, prev.amount as prev_amount
FROM (SELECT * FROM orders FOR VERSION AS OF 5) current
FULL OUTER JOIN prev ON current.order_id = prev.order_id;

OPTIMIZE and VACUUM

Maintain performance and manage storage:

-- Compact small files into larger ones
OPTIMIZE orders
USING ZORDER BY (customer_id, order_date);
-- This reorders data by customer_id then date for faster queries
-- Remove old versions (default: 30 days retention)
VACUUM orders;
-- Keep 90 days of history
VACUUM orders RETAIN 90 DAYS;
-- See what would be deleted (dry run)
VACUUM orders DRY RUN;
-- Specific retention
VACUUM orders RETAIN 7 DAYS; -- Aggressive: keep one week only

Z-Ordering for Performance

Cluster related data together for faster queries:

-- Before: random order
-- File 1: customer_id [1, 500, 1000, 50]
-- File 2: customer_id [100, 2000, 30, 800]
-- After Z-ordering
OPTIMIZE transactions USING ZORDER BY (customer_id);
-- File 1: customer_id [1-250]
-- File 2: customer_id [251-500]
-- Query benefit: skip files not matching predicate
SELECT * FROM transactions WHERE customer_id = 123; -- Only reads File 1

Apache Hudi

What is Apache Hudi?

Apache Hudi (Hadoop Upserts Deletes and Incrementals) enables incremental processing on data lakes.

Key Characteristics:

  • Copy-on-Write (COW) and Merge-on-Read (MOR) table types
  • Incremental queries for efficient processing
  • Upserts and bulk inserts with deduplication
  • Compaction to manage file size
  • Timeline for fine-grained versioning

COW vs MOR Tables

Copy-on-Write (COW) Tables:

  • New versions create completely new files
  • Fast reads, slower writes
  • Best for: Read-heavy workloads, data warehousing

Write: id=1, value=100
Read latest: (1, 100) - single file read
Write: id=1, value=200
Affected files are rewritten in full
Read latest: (1, 200) - single file read, but larger files

Merge-on-Read (MOR) Tables:

  • Updates stored in delta files
  • Base files + deltas = complete view
  • Compaction merges deltas into base files periodically
  • Fast writes, slightly slower reads

Write: id=1, value=100
Read latest: (1, 100) - base file
Write: id=1, value=200
Small delta file written
Read latest: merge base (1, 100) + delta (1, 200) = (1, 200)
Compaction later merges delta files into the base files
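
Query engines commonly expose a MOR table through two read views: a read-optimized view that scans only compacted base files, and a real-time view that merges the latest delta files on read. A hedged sketch using the _ro/_rt suffix convention from the Hudi ecosystem; the suffixed table names are assumptions, not confirmed HeliosDB behavior:

-- Read-optimized view: base files only; fastest reads, may lag recent writes
SELECT COUNT(*) FROM logs_ro WHERE level = 'ERROR';
-- Real-time view: merges base files with delta files; latest data, slower reads
SELECT COUNT(*) FROM logs_rt WHERE level = 'ERROR';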

Setting Up Hudi Tables

Create Hudi Table (COW):

CREATE TABLE IF NOT EXISTS events (
event_id VARCHAR,
timestamp TIMESTAMP,
user_id INT,
event_type VARCHAR,
properties VARCHAR
)
USING hudi
WITH (
type = 'cow',
primaryKey = 'event_id',
preCombineKey = 'timestamp'
)
LOCATION 's3://my-bucket/hudi/events';

Create Hudi Table (MOR):

CREATE TABLE IF NOT EXISTS logs (
log_id VARCHAR,
timestamp TIMESTAMP,
level VARCHAR,
message VARCHAR
)
USING hudi
WITH (
type = 'mor',
primaryKey = 'log_id',
preCombineKey = 'timestamp',
compactionStrategy = 'async'
)
LOCATION 's3://my-bucket/hudi/logs';
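
The primaryKey and preCombineKey settings drive upsert behavior: a write with an existing key replaces the earlier row, and the pre-combine column decides which duplicate wins within a batch. A hedged sketch of an upsert, assuming MERGE INTO works against Hudi tables the way it is used for the other formats in this guide (staged_events is a hypothetical staging table):

-- Hypothetical upsert into the COW events table above
MERGE INTO events t
USING staged_events s
ON t.event_id = s.event_id
WHEN MATCHED AND s.timestamp > t.timestamp THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;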

Incremental Queries with Hudi

Efficiently process only new/changed data:

-- Get data added/changed since specific commit
SELECT * FROM events
INCREMENTAL QUERY
WHERE _hoodie_commit_time > '20240101000000'
AND _hoodie_commit_time <= '20240102000000';
-- More efficient than reading entire table
-- Perfect for: ELT pipelines, streaming ingestion
-- Example: Daily reconciliation
-- Only process yesterday's changes
WITH yesterday_changes AS (
SELECT * FROM transactions
INCREMENTAL QUERY
WHERE _hoodie_commit_time >= '20240101000000'
AND _hoodie_commit_time < '20240102000000'
)
SELECT
customer_id,
COUNT(*) as txn_count,
SUM(amount) as daily_total
FROM yesterday_changes
GROUP BY customer_id;

Compaction in Hudi

Merge delta files into base files to optimize reads:

-- Run inline compaction (synchronous)
ALTER TABLE events COMPACT;
-- Run async compaction (happens in background)
CALL RUN_COMPACTION('events');
-- Compaction strategies:
-- - inline: During writes, slower writes but good reads
-- - async: Scheduled background task, independent from writes
-- - schedule: Time-based compaction
-- Monitor compaction
SELECT
_hoodie_commit_time,
_hoodie_file_name,
COUNT(*) as record_count
FROM events
GROUP BY _hoodie_commit_time, _hoodie_file_name
ORDER BY _hoodie_commit_time DESC
LIMIT 50;

Multi-Format Federation

Querying Across Formats

HeliosDB allows seamless queries across different lakehouse formats:

-- Join Iceberg and Delta tables
SELECT
i.customer_id,
i.name,
d.order_id,
d.amount
FROM iceberg_customers i
INNER JOIN delta_orders d ON i.customer_id = d.customer_id
WHERE d.order_date > '2024-01-01';
-- Combine all three formats
SELECT
'iceberg' as source,
COUNT(*) as record_count
FROM iceberg_table
UNION ALL
SELECT
'delta' as source,
COUNT(*) as record_count
FROM delta_table
UNION ALL
SELECT
'hudi' as source,
COUNT(*) as record_count
FROM hudi_table;

Format Conversion

Migrate data between formats:

-- Create target table
CREATE TABLE customers_iceberg (
id INT,
name VARCHAR,
email VARCHAR,
created_at TIMESTAMP
)
USING iceberg
LOCATION 's3://my-bucket/iceberg/customers';
-- Copy data from Delta
INSERT INTO customers_iceberg
SELECT * FROM customers_delta;
-- Verify counts match
SELECT COUNT(*) as iceberg_count FROM customers_iceberg
UNION ALL
SELECT COUNT(*) as delta_count FROM customers_delta;

Format Comparison

When to Use Each Format

Use Apache Iceberg when:

  • Building enterprise data lakes with strict schemas
  • Requiring partition evolution and advanced features
  • Operating in multi-cloud environments
  • Needing hidden partitions for query efficiency
  • Integrating with Trino, Spark, Flink

Use Delta Lake when:

  • Deeply integrated with Databricks ecosystem
  • Leveraging Unity Catalog for governance
  • Standardizing on Apache Spark
  • Needing OPTIMIZE/VACUUM simplicity
  • Wanting Z-order clustering

Use Apache Hudi when:

  • Processing real-time streaming data
  • Requiring frequent upserts/updates
  • Needing incremental query efficiency
  • Building near real-time analytics
  • Operating on tight budgets (MOR efficiency)

Feature Comparison

| Feature              | Iceberg        | Delta                 | Hudi         |
|----------------------|----------------|-----------------------|--------------|
| ACID Transactions    | Yes            | Yes                   | Yes          |
| Time Travel          | Full snapshots | Versions + timestamps | Incremental  |
| Schema Evolution     | Full           | Merging               | Limited      |
| Partition Evolution  | Yes            | No                    | No           |
| Incremental Queries  | No             | Change feed           | Yes (native) |
| Compaction Required  | No             | OPTIMIZE              | Yes          |
| Upserts/Deletes      | Yes            | Yes                   | Yes          |
| Hidden Partitions    | Yes            | No                    | No           |
| Multi-cloud Ready    | Yes            | Azure/GCP limited     | Yes          |
| Community Size       | Growing        | Large (Databricks)    | Enterprise   |

Real-World Examples

Example 1: Data Lake Migration from Traditional Warehouse

Scenario: Migrating from on-prem Teradata to cloud lakehouse

-- Step 1: Create lakehouse tables (Iceberg chosen for flexibility)
CREATE TABLE IF NOT EXISTS raw_sales (
sale_id BIGINT,
store_id INT,
product_id INT,
quantity INT,
amount DECIMAL(10,2),
sale_date DATE,
sale_timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (sale_date)
LOCATION 's3://my-datalake/raw/sales';
-- Step 2: Initial load with timestamp for auditing
INSERT INTO raw_sales
SELECT
sale_id,
store_id,
product_id,
quantity,
amount,
sale_date,
CURRENT_TIMESTAMP as sale_timestamp
FROM teradata_extract;
-- Step 3: Implement incremental sync
-- Daily job:
WITH source_data AS (
SELECT * FROM teradata_incremental
WHERE extraction_date = CURRENT_DATE
)
MERGE INTO raw_sales t
USING source_data s
ON t.sale_id = s.sale_id
WHEN MATCHED THEN UPDATE SET
quantity = s.quantity,
amount = s.amount,
sale_timestamp = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN INSERT *;
-- Step 4: Create curated layer
CREATE TABLE IF NOT EXISTS sales_by_store_day (
store_id INT,
sale_date DATE,
daily_sales DECIMAL(12,2),
transaction_count INT,
last_updated TIMESTAMP
)
USING iceberg
PARTITIONED BY (sale_date)
LOCATION 's3://my-datalake/curated/sales_by_store_day';
-- Step 5: Pipeline to populate curated layer
INSERT INTO sales_by_store_day
SELECT
store_id,
sale_date,
SUM(amount) as daily_sales,
COUNT(*) as transaction_count,
CURRENT_TIMESTAMP as last_updated
FROM raw_sales
WHERE sale_date = CURRENT_DATE - 1
GROUP BY store_id, sale_date;

Benefits realized:

  • Eliminated hardware costs (cloud storage is cheaper)
  • Improved query performance with partition pruning
  • Enabled time-travel for auditing
  • No vendor lock-in (can query from Trino, Spark, Flink)

Example 2: Multi-Cloud Lakehouse Setup

Scenario: Organization with multiple cloud providers wanting unified analytics

-- S3 Iceberg tables (AWS primary)
CREATE TABLE IF NOT EXISTS us_sales (
sale_id BIGINT,
region VARCHAR,
amount DECIMAL(10,2),
timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (YEAR(timestamp), MONTH(timestamp))
LOCATION 's3://us-datalake/sales';
-- Azure Blob Storage Delta tables
CREATE TABLE IF NOT EXISTS eu_sales (
sale_id BIGINT,
region VARCHAR,
amount DECIMAL(10,2),
timestamp TIMESTAMP
)
USING delta
PARTITIONED BY (timestamp)
LOCATION 'abfs://eu-datalake@storage.dfs.core.windows.net/sales';
-- GCS Hudi tables for streaming
CREATE TABLE IF NOT EXISTS apac_events (
event_id VARCHAR,
region VARCHAR,
event_type VARCHAR,
amount VARCHAR,
timestamp TIMESTAMP
)
USING hudi
WITH (
type = 'mor',
primaryKey = 'event_id'
)
LOCATION 'gs://apac-datalake/events';
-- Unified global reporting
CREATE TABLE global_sales_summary AS
SELECT
'us' as source,
region,
SUM(amount) as total_sales,
COUNT(*) as transaction_count,
MAX(timestamp) as latest_sale
FROM us_sales
WHERE YEAR(timestamp) = 2024
GROUP BY region
UNION ALL
SELECT
'eu' as source,
region,
SUM(amount) as total_sales,
COUNT(*) as transaction_count,
MAX(timestamp) as latest_sale
FROM eu_sales
WHERE YEAR(timestamp) = 2024
GROUP BY region
UNION ALL
SELECT
'apac' as source,
region,
SUM(CAST(amount AS DECIMAL(10,2))) as total_sales,
COUNT(*) as transaction_count,
MAX(timestamp) as latest_sale
FROM apac_events
WHERE YEAR(timestamp) = 2024
GROUP BY region;
-- Multi-cloud insights
SELECT
source,
region,
total_sales,
ROUND(100 * total_sales / SUM(total_sales) OVER (PARTITION BY source), 2) as pct_of_source_total
FROM global_sales_summary
ORDER BY source, total_sales DESC;

Benefits realized:

  • No vendor lock-in to single cloud
  • Regulatory compliance (data residency)
  • Cost optimization (use best pricing by region)
  • Unified analytics despite distributed architecture

Example 3: Time-Travel for Compliance Auditing

Scenario: Financial institution needs audit trail for regulatory compliance

-- Create transactions table with Iceberg
CREATE TABLE transactions (
txn_id VARCHAR,
account_id VARCHAR,
amount DECIMAL(15,2),
transaction_type VARCHAR,
status VARCHAR,
timestamp TIMESTAMP,
last_modified TIMESTAMP
)
USING iceberg
PARTITIONED BY (DATE(timestamp))
LOCATION 's3://financial-datalake/transactions';
-- Quarterly compliance check
-- "Show me all transactions that were rejected, and when they were marked as such"
WITH current_state AS (
SELECT
txn_id,
account_id,
amount,
status,
timestamp,
'current' as state_type
FROM transactions
WHERE status = 'REJECTED'
),
previous_states AS (
SELECT
txn_id,
account_id,
amount,
status,
'version_4' as state_type -- Compare against the previous version
FROM transactions FOR VERSION AS OF 4
WHERE status = 'REJECTED'
)
SELECT
current_state.*,
previous_states.status as previous_status
FROM current_state
FULL OUTER JOIN previous_states
ON current_state.txn_id = previous_states.txn_id
WHERE current_state.status <> previous_states.status OR previous_states.status IS NULL
ORDER BY current_state.timestamp DESC;
-- Audit log: show who changed what transaction
SELECT
txn_id,
CAST(__metadata.snapshot_id AS VARCHAR) as version,
__metadata.timestamp as version_timestamp,
status,
(SELECT COUNT(*) FROM transactions t2
WHERE t2.txn_id = t1.txn_id AND t2.__metadata.snapshot_id <= t1.__metadata.snapshot_id
) as record_count
FROM transactions FOR ALL VERSIONS t1
WHERE txn_id = 'TXN-2024-123456'
ORDER BY __metadata.snapshot_id;
-- Recovery: restore transaction to previous state
INSERT INTO transactions_recovery
SELECT * FROM transactions FOR VERSION AS OF 50
WHERE txn_id = 'TXN-2024-CORRUPTED';

Benefits realized:

  • Complete audit trail for regulators
  • Ability to prove historical accuracy
  • Recovery from accidental changes
  • No additional storage cost (versioning built-in)

Performance Optimization

Partitioning Strategies

Good Partitioning:

-- Partition by time (most common)
CREATE TABLE events (
event_id VARCHAR,
timestamp TIMESTAMP,
user_id INT,
event_type VARCHAR
)
USING iceberg
PARTITIONED BY (DATE(timestamp), event_type) -- Multi-dimensional partition
LOCATION 's3://datalake/events';
-- Example: Query only scans relevant partitions
SELECT COUNT(*) FROM events
WHERE DATE(timestamp) = '2024-12-20' -- Reads only Dec 20 partition
AND event_type = 'click'; -- Further filters partition
-- Avoid over-partitioning
-- Too many partitions = metadata overhead
-- Rule of thumb: 100-10000 files per partition

Iceberg Hidden Partitions:

-- Iceberg automatically manages partition columns
CREATE TABLE orders (
order_id INT,
timestamp TIMESTAMP,
customer_id INT,
amount DECIMAL(10,2)
)
USING iceberg
PARTITIONED BY (BUCKET(10, customer_id), MONTH(timestamp));
-- Partitions are hidden - you don't specify them in queries
SELECT * FROM orders
WHERE customer_id = 123
AND timestamp > '2024-06-01';
-- Iceberg handles partition pruning automatically

File Format Selection

-- Parquet (recommended for analytics)
-- Columnar format, good compression, predicate pushdown
CREATE TABLE analytics (
id INT,
name VARCHAR,
metrics STRUCT<views: INT, clicks: INT>
)
USING iceberg
WITH (
write_format = 'parquet'
)
LOCATION 's3://datalake/analytics';
-- ORC (Apache Hadoop ecosystem)
-- Slightly better compression than Parquet, less universal
CREATE TABLE logs (
timestamp TIMESTAMP,
message VARCHAR
)
USING hudi
WITH (
write_format = 'orc'
)
LOCATION 's3://datalake/logs';

Caching and Indexing

Column Statistics:

-- Enable data skipping by collecting statistics
ALTER TABLE large_table SET TBLPROPERTIES (
'delta.dataSkippingNumIndexedCols' = '32' -- Delta
);
-- Query with statistics (rows skipped automatically)
SELECT * FROM large_table
WHERE indexed_column = 'value';
-- System skips entire files not matching predicate

Partition Pruning:

-- Good: filtering on the partition column forces partition pruning
SELECT * FROM sales
WHERE sale_date = '2024-12-20';
-- With Iceberg hidden partitioning, filtering on the source column is enough;
-- the derived partition value is pruned automatically without naming it
-- Monitor with EXPLAIN
EXPLAIN SELECT * FROM sales WHERE sale_date = '2024-12-20';
-- Shows: Files scanned = 1 (pruned to a single partition)

Integration Patterns

ELT Pipeline Pattern

-- Bronze layer: Raw data as-is
CREATE TABLE bronze_raw_events (
raw_data VARCHAR,
ingestion_timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (DATE(ingestion_timestamp))
LOCATION 's3://datalake/bronze/raw_events';
-- Silver layer: Cleaned, deduplicated
CREATE TABLE silver_events (
event_id VARCHAR,
event_type VARCHAR,
user_id INT,
timestamp TIMESTAMP,
properties VARCHAR,
processed_timestamp TIMESTAMP
)
USING iceberg
PARTITIONED BY (DATE(timestamp), event_type)
LOCATION 's3://datalake/silver/events';
-- Gold layer: Business-ready analytics
CREATE TABLE gold_events_daily (
event_date DATE,
event_type VARCHAR,
unique_users INT,
total_events INT,
top_properties VARCHAR
)
USING delta
PARTITIONED BY (event_date)
LOCATION 's3://datalake/gold/events_daily';
-- Bronze to Silver transformation
INSERT INTO silver_events
SELECT
MD5(CONCAT(raw_data, ingestion_timestamp)) as event_id,
JSON_EXTRACT_SCALAR(raw_data, '$.type') as event_type,
CAST(JSON_EXTRACT_SCALAR(raw_data, '$.user_id') AS INT) as user_id,
CAST(JSON_EXTRACT_SCALAR(raw_data, '$.timestamp') AS TIMESTAMP) as timestamp,
JSON_EXTRACT_SCALAR(raw_data, '$.properties') as properties,
CURRENT_TIMESTAMP as processed_timestamp
FROM bronze_raw_events
WHERE DATE(ingestion_timestamp) = CURRENT_DATE;
-- Silver to Gold aggregation
INSERT INTO gold_events_daily
SELECT
CAST(timestamp AS DATE) as event_date,
event_type,
COUNT(DISTINCT user_id) as unique_users,
COUNT(*) as total_events,
JSON_EXTRACT_SCALAR(properties, '$.category') as top_properties
FROM silver_events
WHERE DATE(processed_timestamp) = CURRENT_DATE
GROUP BY event_date, event_type, top_properties;

Real-Time Streaming Pattern

-- Hudi MOR table for streaming ingestion
CREATE TABLE events_streaming (
event_id VARCHAR,
timestamp TIMESTAMP,
user_id INT,
event_type VARCHAR,
metadata VARCHAR
)
USING hudi
WITH (
type = 'mor',
primaryKey = 'event_id',
preCombineKey = 'timestamp',
compactionStrategy = 'async'
)
LOCATION 's3://datalake/events_stream';
-- Kafka consumer writes to Hudi
-- INSERT OR UPDATE based on event_id
-- Daily batch process new events efficiently
INSERT INTO events_daily
SELECT
DATE(timestamp) as event_date,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users
FROM events_streaming
INCREMENTAL QUERY -- Only read new data since last run
WHERE _hoodie_commit_time >= DATE_FORMAT(CURRENT_DATE - 1, 'yyyyMMddHHmmss')
GROUP BY DATE(timestamp);

Troubleshooting

Issue: Slow Queries on Large Tables

Symptoms: Queries take minutes even with filters

Solutions:

  1. Check partition pruning:
EXPLAIN SELECT * FROM large_table WHERE date = '2024-12-20';
-- Look for "Partitions scanned: 1" (good) vs "Partitions scanned: 10000" (bad)
  2. Add statistics:
-- For Iceberg
ALTER TABLE large_table SET TBLPROPERTIES (
'write.statistics.enabled' = 'true'
);
-- For Delta
ALTER TABLE large_table SET TBLPROPERTIES (
'delta.dataSkippingNumIndexedCols' = '32'
);
  3. Optimize file layout:
-- Iceberg: rewrite small files
ALTER TABLE large_table REWRITE DATA WHERE date = '2024-12-20';
-- Delta: compact files
OPTIMIZE large_table USING ZORDER BY (most_filtered_column);
-- Hudi: run compaction
CALL RUN_COMPACTION('large_table');

Issue: “File Not Found” Errors

Symptoms: Intermittent query failures, file list errors

Causes & Fixes:

  1. Concurrent writes causing file list inconsistency:
-- Coordinate writers (prefer a single writer per table) and tune read behavior
SET spark.sql.shuffle.partitions = 200; -- Adjust write parallelism
SET spark.sql.files.ignoreCorruptFiles = true; -- Skip corrupted files while investigating
  2. Clock skew in the distributed system:
Fix system time synchronization (NTP)
Verify that the cloud provider's clock is synchronized
  3. Permission issues:
# Check S3 access
aws s3 ls s3://my-bucket/iceberg/table --recursive
# Verify IAM role has required permissions:
# - s3:GetObject
# - s3:PutObject
# - s3:ListBucket
# - s3:DeleteObject

Issue: Schema Evolution Errors

Symptoms: “Column not found”, unexpected NULL values

Solutions:

  1. Check schema history:
-- For Iceberg
SELECT schema_id, schema FROM iceberg_metadata_tables
WHERE table = 'my_table'
ORDER BY schema_id DESC;
-- For Delta
SELECT add.schemaString FROM delta.`s3://bucket/table`
  2. Handle missing columns gracefully:
-- Old data missing new columns
SELECT
id,
name,
COALESCE(phone, 'N/A') as phone, -- Handle NULL
COALESCE(email, 'N/A') as email -- Handle NULL
FROM customers
WHERE phone IS NULL OR email IS NULL; -- Find incomplete records

Migration Guide

From Traditional OLAP to Lakehouse

Phase 1: Preparation

-- Audit existing warehouse
SELECT
table_name,
COUNT(*) as row_count,
SUM(bytes)/1024/1024/1024 as size_gb
FROM information_schema.tables
GROUP BY table_name;

Phase 2: Parallel Run

-- Keep both systems, sync data
-- DW → Lakehouse (daily job)
INSERT INTO lakehouse_table
SELECT * FROM dw_table
WHERE load_date = CURRENT_DATE;
-- Validate counts
SELECT COUNT(*) FROM dw_table
UNION ALL
SELECT COUNT(*) FROM lakehouse_table;

Phase 3: Cutover

-- Switch applications to lakehouse
-- Implement circuit breaker to revert if needed
-- Archive old data warehouse
INSERT INTO lakehouse_archive
SELECT * FROM dw_table WHERE created_date < '2024-01-01';

Phase 4: Optimization

-- Now that in lakehouse, optimize
OPTIMIZE lakehouse_table USING ZORDER BY (most_filtered_column);
-- Monitor performance
SELECT COUNT(*) FROM lakehouse_table; -- Should be instant after OPTIMIZE

Inter-Format Migration

-- Delta → Iceberg (gain partition evolution)
CREATE TABLE iceberg_customers
USING iceberg
PARTITIONED BY (YEAR(created_at), BUCKET(10, id))
LOCATION 's3://bucket/iceberg/customers';
INSERT INTO iceberg_customers
SELECT * FROM delta_customers;
-- Verify
SELECT COUNT(*) FROM delta_customers; -- 1000
SELECT COUNT(*) FROM iceberg_customers; -- 1000

Related Documentation

  • Lakehouse Architecture: See docs/architecture/storage/LAKEHOUSE_ARCHITECTURE.md
  • SQL Interface Guide: See docs/features/sql-interface/
  • Time-Travel Debugging: See docs/features/time-travel-debugging-examples.md
  • Performance Tuning: See docs/guides/user/PERFORMANCE_TUNING_GUIDE.md
  • Backup and Recovery: See docs/guides/user/BACKUP_RECOVERY_GUIDE.md
  • Security: See docs/guides/user/SECURITY_HARDENING_V7.md

Last Updated: December 2024 · HeliosDB Version: 6.0+ · License: Apache-2.0