F6.1 Apache Iceberg Connector Architecture
heliosdb-streaming Integration
Date: October 31, 2025
Priority: P0 (CRITICAL)
ARR Impact: $30M
Target: 8,500 LOC, 200 tests, 4 weeks
Status: Architecture Design
EXECUTIVE SUMMARY
Design and implement Apache Iceberg connector for heliosdb-streaming to enable:
- Reading from Iceberg tables as streaming sources
- Writing streaming data to Iceberg tables as sinks
- Time travel queries for historical streaming data
- Schema evolution without pipeline downtime
- Integration with data lake ecosystems (S3, ADLS, GCS)
Why heliosdb-streaming?
The streaming crate already has:
- Arrow/Parquet/DataFusion (lines 27-29, Cargo.toml)
- Connector trait architecture (SourceConnector, SinkConnector)
- Database source/sink patterns (connectors/database/)
- Kafka/Pulsar/File connectors as reference implementations
This makes it the ideal location for Iceberg integration.
REQUIREMENTS
Functional Requirements
- Iceberg Source Connector
- Read Iceberg tables as streaming sources
- Support incremental reads (CDC-style)
- Snapshot-based reads
- Time travel queries (AS OF TIMESTAMP)
- Partition pruning and predicate pushdown
- Iceberg Sink Connector
- Write streaming data to Iceberg tables
- Exactly-once semantics with checkpointing
- Automatic snapshot creation
- Schema evolution (add/rename/drop columns)
- Partition management
- Catalog Integration
- Support Hive Metastore
- AWS Glue Catalog
- REST Catalog
- Nessie Catalog (optional)
- Storage Integration
- S3 (AWS)
- ADLS (Azure)
- GCS (Google Cloud)
- Local filesystem (development)
Non-Functional Requirements
- Performance
- Read throughput: 100K+ rows/sec
- Write throughput: 50K+ rows/sec
- Snapshot creation: <5 seconds
- Catalog query latency: <100ms
- Reliability
- Exactly-once write semantics
- Automatic retry on transient failures
- Checkpoint recovery
- Scalability
- Support tables with 1000+ partitions
- Handle files up to 1GB
- Support parallel reads (multiple workers)
ARCHITECTURE OVERVIEW
Module Structure
```
heliosdb-streaming/src/connectors/
├── iceberg/
│   ├── mod.rs              (200 LOC)   - Public API
│   ├── source.rs           (1,800 LOC) - IcebergSourceConnector
│   ├── sink.rs             (1,600 LOC) - IcebergSinkConnector
│   ├── catalog.rs          (1,200 LOC) - Catalog adapters
│   ├── snapshot.rs         (800 LOC)   - Snapshot management
│   ├── schema.rs           (600 LOC)   - Schema evolution
│   ├── partition.rs        (500 LOC)   - Partition handling
│   ├── time_travel.rs      (400 LOC)   - Time travel queries
│   ├── arrow_conversion.rs (600 LOC)   - Arrow <-> Iceberg types
│   └── config.rs           (500 LOC)   - Configuration types
└── database/ (existing)
    └── ...
```
Total: ~8,200 LOC

Cargo.toml Dependencies
Add to heliosdb-streaming/Cargo.toml:
```toml
# Apache Iceberg
iceberg = "0.3"        # Core Iceberg library
iceberg-rust = "0.3"   # Rust Iceberg implementation
object_store = "0.10"  # S3/ADLS/GCS abstraction
```
```toml
# Already have:
# arrow = "53.0"
# parquet = "53.0"
# datafusion = "43.0"
```
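To illustrate how the object_store abstraction could back the storage integration, here is a minimal sketch of choosing a store from the warehouse URI. AmazonS3Builder and LocalFileSystem are real object_store APIs; the build_object_store helper and its environment-based credential handling are assumptions of this sketch, not part of the design.

```rust
use std::sync::Arc;
use object_store::{aws::AmazonS3Builder, local::LocalFileSystem, ObjectStore};

/// Hypothetical helper: pick an ObjectStore backend from the warehouse URI.
/// Only the S3 and local-filesystem cases are sketched; ADLS and GCS would
/// follow the same pattern with their respective builders.
fn build_object_store(warehouse: &str) -> object_store::Result<Arc<dyn ObjectStore>> {
    if let Some(rest) = warehouse.strip_prefix("s3://") {
        let bucket = rest.split('/').next().unwrap_or(rest);
        // Region and credentials are read from the environment (AWS_* variables).
        let s3 = AmazonS3Builder::from_env()
            .with_bucket_name(bucket)
            .build()?;
        Ok(Arc::new(s3))
    } else {
        // Local filesystem fallback for development.
        Ok(Arc::new(LocalFileSystem::new()))
    }
}
```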
CONNECTOR ARCHITECTURE
1. IcebergSourceConnector
Purpose: Read Iceberg tables as streaming sources
```rust
pub struct IcebergSourceConnector {
    config: IcebergSourceConfig,
    catalog: Arc<dyn IcebergCatalog>,
    table: Arc<IcebergTable>,
    snapshot_id: Option<i64>,
    read_mode: IcebergReadMode,
    status: Arc<RwLock<ConnectorStatus>>,
    stop_signal: Arc<RwLock<bool>>,
}
```
```rust
pub struct IcebergSourceConfig {
    pub catalog_type: CatalogType,         // Hive, Glue, REST
    pub catalog_config: CatalogConfig,     // Catalog-specific config
    pub table_name: String,                // namespace.table
    pub read_mode: IcebergReadMode,        // Batch, Incremental, Snapshot
    pub snapshot_id: Option<i64>,          // For time travel
    pub timestamp: Option<DateTime<Utc>>,  // For time travel
    pub partition_filter: Option<String>,  // Optional partition filter
    pub batch_size: usize,                 // Rows per batch (default: 10000)
}
```
```rust
pub enum IcebergReadMode {
    /// Read all data once
    Batch,
    /// Read incrementally (CDC-style, from snapshots)
    Incremental {
        /// Poll interval for new snapshots
        poll_interval: Duration,
        /// Starting snapshot (None = current)
        start_snapshot: Option<i64>,
    },
    /// Read specific snapshot (time travel)
    Snapshot(i64),
}
```
```rust
#[async_trait]
impl SourceConnector for IcebergSourceConnector {
    async fn start(&self) -> Result<Stream>;
    async fn stop(&self) -> Result<()>;
    fn status(&self) -> ConnectorStatus;
}
```

Key Features:
- Batch Read: Read entire table once
- Incremental Read: Poll for new snapshots, emit changes
- Time Travel: Read from historical snapshot
- Partition Filtering: Prune partitions early
- Arrow Integration: Convert Iceberg → Arrow → Row
Implementation Flow:
1. Connect to catalog (Hive/Glue/REST)
2. Load table metadata
3. Select snapshot (current or historical)
4. Build scan plan with partition filters
5. Read Parquet files using Arrow
6. Convert Arrow RecordBatch → Row
7. Emit to Stream
8. (Incremental) Poll for new snapshots
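To make step 8 concrete, below is a minimal sketch of the incremental poll loop, written against the SnapshotManager described in section 4; the read_snapshot_rows helper and the Row channel are hypothetical placeholders, and the real connector would also honor stop_signal.

```rust
/// Sketch only: poll for new snapshots and emit the rows they added.
/// `read_snapshot_rows` is a hypothetical helper that scans the data files
/// added between two snapshot IDs and yields their rows.
async fn run_incremental(
    snapshots: &SnapshotManager,
    poll_interval: std::time::Duration,
    start_snapshot: Option<i64>,
    tx: tokio::sync::mpsc::Sender<Row>,
) -> anyhow::Result<()> {
    // Start from the requested snapshot, or from the table's current snapshot.
    let mut last_seen = match start_snapshot {
        Some(id) => id,
        None => snapshots.current_snapshot().await?.snapshot_id,
    };

    loop {
        let current = snapshots.current_snapshot().await?;
        if current.snapshot_id != last_seen {
            // New commits since the last poll: emit only the newly added rows.
            for row in read_snapshot_rows(last_seen, current.snapshot_id).await? {
                if tx.send(row).await.is_err() {
                    return Ok(()); // consumer dropped, stop polling
                }
            }
            last_seen = current.snapshot_id;
        }
        tokio::time::sleep(poll_interval).await;
    }
}
```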
2. IcebergSinkConnector
Purpose: Write streaming data to Iceberg tables
```rust
pub struct IcebergSinkConnector {
    config: IcebergSinkConfig,
    catalog: Arc<dyn IcebergCatalog>,
    table: Arc<IcebergTable>,
    writer: Arc<Mutex<IcebergWriter>>,
    pending_files: Arc<Mutex<Vec<DataFile>>>,
    checkpoint_interval: Duration,
    status: Arc<RwLock<ConnectorStatus>>,
}
```
```rust
pub struct IcebergSinkConfig {
    pub catalog_type: CatalogType,
    pub catalog_config: CatalogConfig,
    pub table_name: String,
    pub write_mode: IcebergWriteMode,
    pub partition_spec: Option<PartitionSpec>,
    pub snapshot_properties: HashMap<String, String>,
    pub checkpoint_interval: Duration,  // Default: 60s
    pub target_file_size_mb: usize,     // Default: 128MB
}
```
```rust
pub enum IcebergWriteMode {
    /// Append data
    Append,
    /// Overwrite entire table
    Overwrite,
    /// Overwrite specific partitions
    OverwritePartitions(Vec<String>),
}
```
```rust
#[async_trait]
impl SinkConnector for IcebergSinkConnector {
    async fn write(&self, rows: Vec<Row>) -> Result<()>;
    async fn flush(&self) -> Result<()>;
    fn status(&self) -> ConnectorStatus;
}
```
```rust
impl IcebergSinkConnector {
    /// Create snapshot from pending files
    async fn commit_snapshot(&self) -> Result<i64>;

    /// Handle schema evolution
    async fn evolve_schema(&self, new_fields: Vec<Field>) -> Result<()>;
}
```

Key Features:
- Exactly-Once Writes: Two-phase commit with checkpointing
- Schema Evolution: Automatic schema updates
- Partition Management: Automatic partition creation
- File Optimization: Target file size, small file consolidation
- Snapshot Creation: Atomic commits with metadata
Write Flow:
1. Buffer rows in memory (up to target_file_size_mb)
2. Convert Row → Arrow RecordBatch
3. Write Parquet file to object store (S3/ADLS/GCS)
4. Accumulate DataFile metadata
5. On checkpoint_interval:
   a. Create manifest file listing data files
   b. Commit snapshot to catalog
   c. Clear pending_files
6. Handle schema evolution if needed
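A compressed sketch of that loop against the IcebergSinkConnector fields above; write_parquet_file and checkpoint_due are hypothetical helpers, and pending_files is assumed to be a tokio::sync::Mutex.

```rust
impl IcebergSinkConnector {
    /// Sketch of the write path: rows become Parquet data files, and every
    /// checkpoint interval the accumulated files are committed as one snapshot.
    async fn write_and_maybe_commit(&self, rows: Vec<Row>) -> anyhow::Result<()> {
        // Steps 1-4: buffer rows, convert to Arrow, write a Parquet file to the
        // object store, and remember its DataFile metadata (hypothetical helper).
        let data_file = self.write_parquet_file(rows).await?;
        self.pending_files.lock().await.push(data_file);

        // Step 5: on the checkpoint interval, commit atomically. Either the new
        // snapshot (and all of its files) becomes visible or none of it does,
        // which is what the exactly-once guarantee is built on.
        if self.checkpoint_due() {
            let snapshot_id = self.commit_snapshot().await?;
            tracing::info!(snapshot_id, "committed Iceberg snapshot");
        }
        Ok(())
    }
}
```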
3. Catalog Integration
Purpose: Support multiple Iceberg catalog backends
```rust
#[async_trait]
pub trait IcebergCatalog: Send + Sync {
    async fn load_table(&self, name: &str) -> Result<Arc<IcebergTable>>;
    async fn create_table(&self, name: &str, schema: Schema, spec: PartitionSpec) -> Result<()>;
    async fn list_tables(&self, namespace: &str) -> Result<Vec<String>>;
    async fn table_exists(&self, name: &str) -> Result<bool>;
}
```
```rust
pub enum CatalogType {
    Hive,
    Glue,
    Rest,
    Nessie,
}
```
```rust
pub enum CatalogConfig {
    Hive(HiveConfig),
    Glue(GlueConfig),
    Rest(RestConfig),
    Nessie(NessieConfig),
}
```
```rust
// Hive Metastore
pub struct HiveCatalog {
    thrift_uri: String,
    warehouse: String,
}

// AWS Glue
pub struct GlueCatalog {
    region: String,
    database: String,
    warehouse: String,
}

// REST Catalog
pub struct RestCatalog {
    uri: String,
    credentials: Option<(String, String)>,
    warehouse: String,
}

// Nessie (optional)
pub struct NessieCatalog {
    uri: String,
    branch: String,
    warehouse: String,
}
```

Implementation:
- HiveCatalog: Use the thrift crate for Hive Metastore
- GlueCatalog: Use aws-sdk-glue (already in deps)
- RestCatalog: HTTP client implementing the Iceberg REST spec (see the sketch below)
- NessieCatalog: REST client for Nessie
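As a flavor of the REST path, load_table is essentially one GET against the Iceberg REST catalog spec endpoint /v1/namespaces/{namespace}/tables/{table}. The sketch below uses reqwest with a trimmed-down LoadTableResult; auth headers, catalog prefixes, and retries are omitted, and the struct fields shown are assumptions for illustration.

```rust
use serde::Deserialize;

/// Trimmed-down stand-in for the LoadTableResult payload defined by the
/// Iceberg REST spec; only the fields this sketch needs are modeled.
#[derive(Deserialize)]
struct LoadTableResult {
    #[serde(rename = "metadata-location")]
    metadata_location: Option<String>,
    metadata: serde_json::Value,
}

/// Sketch: RestCatalog::load_table as a single GET against the REST catalog.
/// `name` is "namespace.table"; authentication and retries are omitted.
async fn rest_load_table(base_uri: &str, name: &str) -> anyhow::Result<LoadTableResult> {
    let (namespace, table) = name
        .split_once('.')
        .ok_or_else(|| anyhow::anyhow!("expected namespace.table, got {name}"))?;

    let url = format!("{base_uri}/v1/namespaces/{namespace}/tables/{table}");
    let resp = reqwest::get(&url).await?.error_for_status()?;
    Ok(resp.json::<LoadTableResult>().await?)
}
```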
4. Snapshot Management
Purpose: Handle Iceberg snapshot operations
```rust
pub struct SnapshotManager {
    table: Arc<IcebergTable>,
    catalog: Arc<dyn IcebergCatalog>,
}
```
```rust
impl SnapshotManager {
    /// Get current snapshot
    pub async fn current_snapshot(&self) -> Result<Snapshot>;

    /// Get snapshot by ID
    pub async fn snapshot_by_id(&self, id: i64) -> Result<Snapshot>;

    /// Get snapshot at timestamp (time travel)
    pub async fn snapshot_at_timestamp(&self, ts: DateTime<Utc>) -> Result<Snapshot>;

    /// List all snapshots
    pub async fn list_snapshots(&self) -> Result<Vec<Snapshot>>;

    /// Create new snapshot
    pub async fn create_snapshot(
        &self,
        data_files: Vec<DataFile>,
        operation: SnapshotOperation,
    ) -> Result<i64>;

    /// Expire old snapshots
    pub async fn expire_snapshots(&self, older_than: DateTime<Utc>) -> Result<()>;
}
```
```rust
pub enum SnapshotOperation {
    Append,
    Overwrite,
    Delete,
}
```
```rust
pub struct Snapshot {
    pub snapshot_id: i64,
    pub parent_snapshot_id: Option<i64>,
    pub timestamp_ms: i64,
    pub manifest_list: String,
    pub summary: HashMap<String, String>,
}
```
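The time-travel lookup in snapshot_at_timestamp can be a simple scan over the snapshot log: pick the newest snapshot committed at or before the requested time. A minimal sketch against the Snapshot struct above:

```rust
use chrono::{DateTime, Utc};

/// Sketch: resolve "AS OF TIMESTAMP" to a concrete snapshot by choosing the
/// most recent snapshot whose commit time is <= the requested timestamp.
fn snapshot_at_timestamp(snapshots: &[Snapshot], ts: DateTime<Utc>) -> Option<&Snapshot> {
    let cutoff_ms = ts.timestamp_millis();
    snapshots
        .iter()
        .filter(|s| s.timestamp_ms <= cutoff_ms)
        .max_by_key(|s| s.timestamp_ms)
}
```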
5. Schema Evolution
Purpose: Handle schema changes without downtime
```rust
pub struct SchemaEvolution {
    current_schema: Schema,
    pending_changes: Vec<SchemaChange>,
}
```
```rust
pub enum SchemaChange {
    AddColumn { name: String, field_type: IcebergType },
    DropColumn { name: String },
    RenameColumn { old_name: String, new_name: String },
    UpdateType { name: String, new_type: IcebergType },
}
```
```rust
impl SchemaEvolution {
    /// Check if schema evolution is compatible
    pub fn is_compatible(&self, new_schema: &Schema) -> Result<bool>;

    /// Apply schema changes
    pub async fn apply(&self, table: &mut IcebergTable) -> Result<()>;

    /// Generate migration SQL
    pub fn to_sql(&self) -> String;
}
```
```rust
// Arrow <-> Iceberg type conversion
pub struct TypeConverter;
```
```rust
impl TypeConverter {
    pub fn arrow_to_iceberg(arrow_type: &DataType) -> Result<IcebergType>;
    pub fn iceberg_to_arrow(iceberg_type: &IcebergType) -> Result<DataType>;
}
```
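To give a flavor of arrow_conversion.rs, the sketch below maps a few Arrow primitive types onto Iceberg types. DataType and TimeUnit are real arrow types; the IcebergType variants shown are an assumed subset, and nested, decimal, and map types are deliberately left out.

```rust
use arrow::datatypes::{DataType, TimeUnit};

/// Assumed subset of the Iceberg-side type enum, for illustration only.
#[derive(Debug)]
enum IcebergType { Boolean, Int, Long, Float, Double, Date, Timestamp, String, Binary }

/// Sketch: map a handful of Arrow primitive types onto Iceberg types.
fn arrow_to_iceberg(dt: &DataType) -> Option<IcebergType> {
    Some(match dt {
        DataType::Boolean => IcebergType::Boolean,
        DataType::Int32 => IcebergType::Int,
        DataType::Int64 => IcebergType::Long,
        DataType::Float32 => IcebergType::Float,
        DataType::Float64 => IcebergType::Double,
        DataType::Date32 => IcebergType::Date,
        DataType::Timestamp(TimeUnit::Microsecond, _) => IcebergType::Timestamp,
        DataType::Utf8 => IcebergType::String,
        DataType::Binary => IcebergType::Binary,
        _ => return None, // nested, decimal, and map types omitted in this sketch
    })
}
```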
6. Time Travel Support
Purpose: Enable time travel queries
```rust
pub struct TimeTravelQuery {
    table: Arc<IcebergTable>,
    snapshot_manager: SnapshotManager,
}
```
```rust
impl TimeTravelQuery {
    /// Query as of timestamp
    pub async fn as_of_timestamp(
        &self,
        timestamp: DateTime<Utc>,
    ) -> Result<IcebergSourceConnector>;

    /// Query as of version (snapshot ID)
    pub async fn as_of_version(
        &self,
        snapshot_id: i64,
    ) -> Result<IcebergSourceConnector>;

    /// Get changes between two snapshots (CDC)
    pub async fn changes_between(
        &self,
        from_snapshot: i64,
        to_snapshot: i64,
    ) -> Result<Vec<Change>>;
}
```
```rust
pub enum Change {
    Insert(Row),
    Update { before: Row, after: Row },
    Delete(Row),
}
```
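Conceptually, changes_between falls out of the manifests: data files added between the two snapshots yield inserts, removed files yield deletes (an update surfaces as a delete plus an insert). A rough sketch, with data_file_paths as a hypothetical helper returning the file paths reachable from a snapshot:

```rust
use std::collections::HashSet;

/// Sketch: classify data files added or removed between two snapshots.
/// Turning these file-level diffs into row-level Change::Insert/Delete
/// events then requires reading the affected Parquet files.
async fn changed_files(
    snapshots: &SnapshotManager,
    from_snapshot: i64,
    to_snapshot: i64,
) -> anyhow::Result<(Vec<String>, Vec<String>)> {
    // Hypothetical helper: all data file paths reachable from a snapshot's manifest list.
    let before: HashSet<String> = data_file_paths(snapshots, from_snapshot).await?;
    let after: HashSet<String> = data_file_paths(snapshots, to_snapshot).await?;

    let added = after.difference(&before).cloned().collect();   // -> inserts
    let removed = before.difference(&after).cloned().collect(); // -> deletes
    Ok((added, removed))
}
```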
DATA FLOW
Read Flow (Source Connector)
```
Iceberg Table (S3/ADLS/GCS)
        | 1. Load metadata
        v
Catalog (Hive/Glue/REST, metadata.json)
        | 2. Get current/historical snapshot
        v
Snapshot (manifest list)
        | 3. Read manifests
        v
Manifest Files (list data files)
        | 4. Filter partitions
        v
Parquet Files (S3/ADLS/GCS)
        | 5. Read with Arrow
        v
Arrow RecordBatch
        | 6. Convert to Row
        v
Stream<Row>  ->  Consumer
```

Write Flow (Sink Connector)
```
Stream<Row>  <-  Producer
        | 1. Buffer rows
        v
Row Buffer (in-memory, 128MB)
        | 2. Convert to Arrow
        v
Arrow RecordBatch
        | 3. Write Parquet
        v
Parquet File (S3/ADLS/GCS, data-xxx.parquet)
        | 4. Create DataFile metadata
        v
Manifest (list data files)
        | 5. Commit snapshot
        v
Catalog Update (atomic commit, new snapshot)
```

TESTING STRATEGY
Test Breakdown (200 tests)
1. Unit Tests (80 tests)
Module: heliosdb-streaming/src/connectors/iceberg/*.rs
- source.rs (20 tests):
- Batch read mode
- Incremental read mode
- Snapshot read mode
- Partition filtering
- Time travel queries
- Error handling
- sink.rs (20 tests):
- Append mode
- Overwrite mode
- Partition overwrite
- Schema evolution
- Checkpoint/recovery
- File size targeting
- catalog.rs (15 tests):
- Hive catalog CRUD
- Glue catalog CRUD
- REST catalog CRUD
- Error handling
- Connection pooling
- snapshot.rs (10 tests):
- Snapshot creation
- Snapshot expiration
- Time travel lookup
- Snapshot metadata
- schema.rs (10 tests):
- Type conversion (Arrow ↔ Iceberg)
- Schema compatibility checks
- Column add/drop/rename
- Schema evolution
- partition.rs (5 tests):
- Partition spec creation
- Partition pruning
- Dynamic partitioning
2. Integration Tests (60 tests)
Location: heliosdb-streaming/tests/iceberg_integration.rs
- End-to-end workflows (20 tests):
- Write → Read → Verify (see the test skeleton after this list)
- Incremental updates
- Schema evolution during write
- Time travel queries
- Partition management
- Catalog integration (15 tests):
- Hive Metastore (Docker container)
- AWS Glue (LocalStack)
- REST Catalog (test server)
- Catalog failover
- Storage integration (15 tests):
- S3 (MinIO)
- ADLS (Azurite)
- GCS (fake-gcs-server)
- Local filesystem
- Performance tests (10 tests):
- 100K+ rows/sec read
- 50K+ rows/sec write
- Large file handling (1GB+)
- 1000+ partition tables
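For the write-read-verify workflow called out above, a test skeleton along these lines is the intent (tokio test against a local-filesystem warehouse; local_fs_sink_config, local_fs_source_config, sample_rows, and collect_all_rows are hypothetical helpers):

```rust
#[tokio::test]
async fn write_then_read_roundtrip() -> anyhow::Result<()> {
    // Hypothetical helpers building configs for a local-filesystem warehouse.
    let warehouse = tempfile::tempdir()?;
    let sink_cfg = local_fs_sink_config(warehouse.path(), "test.events");
    let source_cfg = local_fs_source_config(warehouse.path(), "test.events");

    // Write a batch of rows and force a snapshot commit.
    let sink = IcebergSinkConnector::new(sink_cfg).await?;
    let rows = sample_rows(1_000);
    sink.write(rows.clone()).await?;
    sink.flush().await?;

    // Read the table back in batch mode and compare.
    let source = IcebergSourceConnector::new(source_cfg).await?;
    let read_back = collect_all_rows(source.start().await?).await?;
    assert_eq!(read_back.len(), rows.len());
    Ok(())
}
```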
3. Streaming Integration Tests (40 tests)
Location: heliosdb-streaming/tests/iceberg_streaming.rs
- Continuous ingestion (15 tests):
- Kafka → Iceberg
- CDC → Iceberg
- Webhook → Iceberg
- Exactly-once semantics
- Streaming queries (10 tests):
- Windowed aggregation over Iceberg
- Stream-Iceberg joins
- Incremental materialized views
- Time-series analytics
- Failure scenarios (10 tests):
- Writer crash recovery
- Network partition
- Catalog unavailable
- Corrupt files
- Schema evolution (5 tests):
- Add column mid-stream
- Type promotion
- Backward compatibility
4. Benchmark Tests (20 tests)
Location: heliosdb-streaming/benches/iceberg_bench.rs
```rust
#[bench]
fn bench_read_throughput_10k_rows(b: &mut Bencher);

#[bench]
fn bench_read_throughput_100k_rows(b: &mut Bencher);

#[bench]
fn bench_write_throughput_10k_rows(b: &mut Bencher);

#[bench]
fn bench_snapshot_creation(b: &mut Bencher);

#[bench]
fn bench_partition_pruning_1000_partitions(b: &mut Bencher);

// ... 15 more benchmarks
```

Performance Targets:
| Metric | Target | Measurement |
|---|---|---|
| Read throughput | 100K rows/sec | bench_read_throughput_100k_rows |
| Write throughput | 50K rows/sec | bench_write_throughput_100k_rows |
| Snapshot creation | <5 seconds | bench_snapshot_creation |
| Catalog query | <100ms | bench_catalog_load_table |
| Partition pruning | <50ms | bench_partition_pruning_1000_partitions |
IMPLEMENTATION TIMELINE (4 weeks)
Week 1: Foundation (Days 1-7)
Goals: Catalog integration, basic read path
Tasks:
- Add Iceberg dependencies to Cargo.toml
- Implement IcebergCatalog trait
- Implement HiveCatalog (primary)
- Implement GlueCatalog (AWS)
- Implement RestCatalog (generic)
- Basic IcebergSourceConnector (batch read)
- Arrow ↔ Iceberg type conversion
- Write 30 unit tests
Deliverables:
- Catalog abstraction complete
- Basic read path functional
- 30 tests passing
LOC: ~2,500
Week 2: Source Connector (Days 8-14)
Goals: Complete read functionality
Tasks:
- Incremental read mode (CDC-style)
- Snapshot read mode (time travel)
- Partition filtering optimization
- SnapshotManager implementation
- TimeTravelQuery implementation
- Integration with StreamingEngine
- Write 40 unit tests + 20 integration tests
Deliverables:
- Full source connector complete
- Time travel working
- Partition pruning optimized
- 60 tests passing
LOC: ~2,800
Week 3: Sink Connector (Days 15-21)
Goals: Complete write functionality
Tasks:
- IcebergSinkConnector implementation
- Parquet file writing with Arrow
- Manifest file creation
- Snapshot commit logic
- Schema evolution support
- Checkpointing and recovery
- Write 40 unit tests + 25 integration tests
Deliverables:
- Full sink connector complete
- Exactly-once writes working
- Schema evolution functional
- 65 tests passing
LOC: ~2,400
Week 4: Testing & Optimization (Days 22-28)
Goals: Production readiness
Tasks:
- Complete integration test suite
- Streaming integration tests (Kafka → Iceberg)
- Performance benchmarking
- Optimization (parallel reads, file coalescing)
- Documentation (API docs, examples)
- Write 15 streaming tests + 20 benchmarks
Deliverables:
- 200 tests passing
- Performance targets met
- Complete documentation
- Production-ready
LOC: ~500 (tests/docs)
Total: 8,200 LOC over 4 weeks
INTEGRATION WITH EXISTING CODE
1. Update lib.rs
```rust
pub mod connectors;

pub use connectors::{
    // ... existing
    IcebergSourceConnector, IcebergSinkConnector,
    IcebergSourceConfig, IcebergSinkConfig,
    IcebergReadMode, IcebergWriteMode,
    CatalogType, CatalogConfig,
};
```

2. Update connectors.rs
```rust
pub mod database;
pub mod iceberg; // NEW

pub use iceberg::{
    IcebergSourceConnector, IcebergSinkConnector,
    IcebergSourceConfig, IcebergSinkConfig,
    IcebergCatalog, HiveCatalog, GlueCatalog, RestCatalog,
};
```

3. Example Usage
```rust
// Example: Read from Iceberg table
use heliosdb_streaming::*;

let config = IcebergSourceConfig {
    catalog_type: CatalogType::Glue,
    catalog_config: CatalogConfig::Glue(GlueConfig {
        region: "us-east-1".to_string(),
        database: "prod_db".to_string(),
        warehouse: "s3://my-bucket/warehouse".to_string(),
    }),
    table_name: "sales.transactions".to_string(),
    read_mode: IcebergReadMode::Incremental {
        poll_interval: Duration::from_secs(60),
        start_snapshot: None,
    },
    snapshot_id: None,
    timestamp: None,
    partition_filter: Some("year=2024 AND month=10".to_string()),
    batch_size: 10000,
};

let connector = IcebergSourceConnector::new(config).await?;
let stream = connector.start().await?;

while let Some(event) = stream.recv().await {
    match event {
        StreamEvent::Data(row) => {
            println!("Received: {:?}", row);
        }
        StreamEvent::EndOfStream => break,
        _ => {}
    }
}

// Example: Write to Iceberg table
let config = IcebergSinkConfig {
    catalog_type: CatalogType::Glue,
    catalog_config: CatalogConfig::Glue(GlueConfig {
        region: "us-east-1".to_string(),
        database: "prod_db".to_string(),
        warehouse: "s3://my-bucket/warehouse".to_string(),
    }),
    table_name: "sales.aggregated".to_string(),
    write_mode: IcebergWriteMode::Append,
    partition_spec: Some(PartitionSpec::new(vec![
        PartitionField::new("year", Transform::Identity),
        PartitionField::new("month", Transform::Identity),
    ])),
    snapshot_properties: HashMap::new(),
    checkpoint_interval: Duration::from_secs(60),
    target_file_size_mb: 128,
};

let connector = IcebergSinkConnector::new(config).await?;

// Write rows
connector.write(rows).await?;

// Flush and create snapshot
connector.flush().await?;
```

SUCCESS CRITERIA
Functional Criteria
- Read from Iceberg tables (batch, incremental, snapshot)
- Write to Iceberg tables (append, overwrite)
- Time travel queries work
- Schema evolution handled automatically
- Partition pruning optimized
- Exactly-once write semantics
- All 3 catalog types supported (Hive, Glue, REST)
- All 3 storage types supported (S3, ADLS, GCS)
Performance Criteria
- Read throughput β₯ 100K rows/sec
- Write throughput β₯ 50K rows/sec
- Snapshot creation < 5 seconds
- Catalog queries < 100ms
- Partition pruning < 50ms for 1000+ partitions
Quality Criteria
- 200 tests passing
- 90%+ code coverage
- Zero critical bugs
- Complete API documentation
- 5+ working examples
Integration Criteria
- Works with Kafka streaming
- Works with CDC connector
- Integrates with StreamingEngine
- Compatible with existing connectors
- No breaking changes to heliosdb-streaming API
RISKS & MITIGATION
Technical Risks
| Risk | Impact | Probability | Mitigation |
|---|---|---|---|
| iceberg-rust crate maturity | HIGH | MEDIUM | Contribute fixes upstream, vendor if needed |
| Arrow type conversion edge cases | MEDIUM | HIGH | Comprehensive unit tests, fuzz testing |
| Catalog API compatibility | MEDIUM | MEDIUM | Test against real Hive/Glue/REST catalogs |
| Large file handling (>1GB) | MEDIUM | LOW | Streaming reads, chunked processing |
| Concurrent write conflicts | HIGH | MEDIUM | Optimistic locking, retry logic |
Performance Risks
| Risk | Impact | Probability | Mitigation |
|---|---|---|---|
| Read throughput < 100K/sec | MEDIUM | LOW | Parallel file reads, prefetching |
| Write latency > 5 sec | MEDIUM | MEDIUM | Async writes, batch commits |
| Memory usage for large tables | MEDIUM | MEDIUM | Streaming API, pagination |
Schedule Risks
| Risk | Impact | Probability | Mitigation |
|---|---|---|---|
| Week 1-2 catalog integration delays | HIGH | MEDIUM | Start with REST catalog (simplest) |
| Week 3 sink connector complexity | MEDIUM | MEDIUM | Reuse database sink patterns |
| Testing bottleneck in Week 4 | MEDIUM | LOW | Write tests incrementally from Week 1 |
REFERENCES
Documentation
Existing Code
- heliosdb-streaming/src/connectors/database/ - Reference implementation
- heliosdb-streaming/src/connectors.rs - Connector traits
- heliosdb-streaming/Cargo.toml - Dependencies
External Resources
- AWS Glue Catalog API
- Hive Metastore Thrift API
- Iceberg REST Catalog Spec
NEXT STEPS (Week 1, Day 1)
- Add dependencies to heliosdb-streaming/Cargo.toml:

  ```toml
  iceberg = "0.3"
  iceberg-rust = "0.3"
  object_store = "0.10"
  ```

- Create module structure:

  ```sh
  mkdir -p heliosdb-streaming/src/connectors/iceberg
  touch heliosdb-streaming/src/connectors/iceberg/{mod.rs,catalog.rs,config.rs}
  ```

- Implement IcebergCatalog trait (Day 1-2)
- Implement RestCatalog (simplest, Day 3)
- Begin IcebergSourceConnector (Day 4-7)
Document Version: 1.0
Created: October 31, 2025
Owner: F6.1 Implementation Team
Status: ARCHITECTURE APPROVED - READY FOR IMPLEMENTATION
Next Review: Week 1 checkpoint (Day 7)
Apache Iceberg Connector for heliosdb-streaming: "From Data Lakes to Streaming Analytics in Real-Time"