F6.1 Apache Iceberg Connector Architecture


heliosdb-streaming Integration

Date: October 31, 2025
Priority: P0 (CRITICAL)
ARR Impact: $30M
Target: 8,500 LOC, 200 tests, 4 weeks
Status: Architecture Design


EXECUTIVE SUMMARY

Design and implement Apache Iceberg connector for heliosdb-streaming to enable:

  • Reading from Iceberg tables as streaming sources
  • Writing streaming data to Iceberg tables as sinks
  • Time travel queries for historical streaming data
  • Schema evolution without pipeline downtime
  • Integration with data lake ecosystems (S3, ADLS, GCS)

Why heliosdb-streaming?

The streaming crate already has:

  • Arrow/Parquet/DataFusion (lines 27-29, Cargo.toml)
  • Connector trait architecture (SourceConnector, SinkConnector)
  • Database source/sink patterns (connectors/database/)
  • Kafka/Pulsar/File connectors as reference implementations

This makes it the ideal location for Iceberg integration.


📋 REQUIREMENTS

Functional Requirements

  1. Iceberg Source Connector

    • Read Iceberg tables as streaming sources
    • Support incremental reads (CDC-style)
    • Snapshot-based reads
    • Time travel queries (AS OF TIMESTAMP)
    • Partition pruning and predicate pushdown
  2. Iceberg Sink Connector

    • Write streaming data to Iceberg tables
    • Exactly-once semantics with checkpointing
    • Automatic snapshot creation
    • Schema evolution (add/rename/drop columns)
    • Partition management
  3. Catalog Integration

    • Support Hive Metastore
    • AWS Glue Catalog
    • REST Catalog
    • Nessie Catalog (optional)
  4. Storage Integration

    • S3 (AWS)
    • ADLS (Azure)
    • GCS (Google Cloud)
    • Local filesystem (development)

Non-Functional Requirements

  1. Performance

    • Read throughput: 100K+ rows/sec
    • Write throughput: 50K+ rows/sec
    • Snapshot creation: <5 seconds
    • Catalog query latency: <100ms
  2. Reliability

    • Exactly-once write semantics
    • Automatic retry on transient failures
    • Checkpoint recovery
  3. Scalability

    • Support tables with 1000+ partitions
    • Handle files up to 1GB
    • Support parallel reads (multiple workers)

πŸ— ARCHITECTURE OVERVIEW

Module Structure

heliosdb-streaming/src/connectors/
├── iceberg/
│   ├── mod.rs              (200 LOC)   - Public API
│   ├── source.rs           (1,800 LOC) - IcebergSourceConnector
│   ├── sink.rs             (1,600 LOC) - IcebergSinkConnector
│   ├── catalog.rs          (1,200 LOC) - Catalog adapters
│   ├── snapshot.rs         (800 LOC)   - Snapshot management
│   ├── schema.rs           (600 LOC)   - Schema evolution
│   ├── partition.rs        (500 LOC)   - Partition handling
│   ├── time_travel.rs      (400 LOC)   - Time travel queries
│   ├── arrow_conversion.rs (600 LOC)   - Arrow ↔ Iceberg types
│   └── config.rs           (500 LOC)   - Configuration types
└── database/ (existing)
    └── ...

Total: ~8,200 LOC

Cargo.toml Dependencies

Add to heliosdb-streaming/Cargo.toml:

# Apache Iceberg
iceberg = "0.3" # Core Iceberg library
iceberg-rust = "0.3" # Rust Iceberg implementation
object_store = "0.10" # S3/ADLS/GCS abstraction
# Already have:
# arrow = "53.0"
# parquet = "53.0"
# datafusion = "43.0"

CONNECTOR ARCHITECTURE

1. IcebergSourceConnector

Purpose: Read Iceberg tables as streaming sources

heliosdb-streaming/src/connectors/iceberg/source.rs
pub struct IcebergSourceConnector {
    config: IcebergSourceConfig,
    catalog: Arc<dyn IcebergCatalog>,
    table: Arc<IcebergTable>,
    snapshot_id: Option<i64>,
    read_mode: IcebergReadMode,
    status: Arc<RwLock<ConnectorStatus>>,
    stop_signal: Arc<RwLock<bool>>,
}

pub struct IcebergSourceConfig {
    pub catalog_type: CatalogType,          // Hive, Glue, REST
    pub catalog_config: CatalogConfig,      // Catalog-specific config
    pub table_name: String,                 // namespace.table
    pub read_mode: IcebergReadMode,         // Batch, Incremental, Snapshot
    pub snapshot_id: Option<i64>,           // For time travel
    pub timestamp: Option<DateTime<Utc>>,   // For time travel
    pub partition_filter: Option<String>,   // Optional partition filter
    pub batch_size: usize,                  // Rows per batch (default: 10000)
}

pub enum IcebergReadMode {
    /// Read all data once
    Batch,
    /// Read incrementally (CDC-style, from snapshots)
    Incremental {
        /// Poll interval for new snapshots
        poll_interval: Duration,
        /// Starting snapshot (None = current)
        start_snapshot: Option<i64>,
    },
    /// Read specific snapshot (time travel)
    Snapshot(i64),
}

#[async_trait]
impl SourceConnector for IcebergSourceConnector {
    async fn start(&self) -> Result<Stream>;
    async fn stop(&self) -> Result<()>;
    fn status(&self) -> ConnectorStatus;
}

Key Features:

  1. Batch Read: Read entire table once
  2. Incremental Read: Poll for new snapshots, emit changes
  3. Time Travel: Read from historical snapshot
  4. Partition Filtering: Prune partitions early
  5. Arrow Integration: Convert Iceberg → Arrow → Row
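To make partition filtering concrete, here is a minimal, std-only sketch of pruning partitions against a simple equality filter like the `partition_filter` string in the config. `PartitionValues`, `parse_filter`, and `prune_partitions` are illustrative names, not APIs from the iceberg crate; a real scan planner would evaluate the Iceberg expression model against manifest partition summaries instead.

```rust
use std::collections::HashMap;

type PartitionValues = HashMap<String, String>;

/// Parse a simple equality filter like "year=2024 AND month=10".
fn parse_filter(filter: &str) -> Vec<(String, String)> {
    filter
        .split(" AND ")
        .filter_map(|clause| {
            let (k, v) = clause.split_once('=')?;
            Some((k.trim().to_string(), v.trim().to_string()))
        })
        .collect()
}

/// Keep only partitions whose values satisfy every equality predicate.
fn prune_partitions<'a>(
    partitions: &'a [PartitionValues],
    filter: &str,
) -> Vec<&'a PartitionValues> {
    let predicates = parse_filter(filter);
    partitions
        .iter()
        .filter(|p| predicates.iter().all(|(k, v)| p.get(k) == Some(v)))
        .collect()
}

fn main() {
    let mk = |year: &str, month: &str| {
        HashMap::from([
            ("year".to_string(), year.to_string()),
            ("month".to_string(), month.to_string()),
        ])
    };
    let parts = vec![mk("2024", "09"), mk("2024", "10"), mk("2023", "10")];
    let kept = prune_partitions(&parts, "year=2024 AND month=10");
    assert_eq!(kept.len(), 1);
    println!("pruned {} -> {} partitions", parts.len(), kept.len());
}
```

The point of pruning this early is that every eliminated partition skips both manifest and Parquet reads entirely.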

Implementation Flow:

1. Connect to catalog (Hive/Glue/REST)
2. Load table metadata
3. Select snapshot (current or historical)
4. Build scan plan with partition filters
5. Read Parquet files using Arrow
6. Convert Arrow RecordBatch → Row
7. Emit to Stream
8. (Incremental) Poll for new snapshots
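Step 8, the incremental poll, can be sketched with a cursor over the snapshot log: the connector remembers the last snapshot it emitted and forwards only snapshots committed after it. `SnapshotRef` and `IncrementalReader` are hypothetical stand-ins for the real connector state, with the catalog round trip replaced by a slice.

```rust
#[derive(Debug, Clone, PartialEq)]
struct SnapshotRef {
    snapshot_id: i64,
    timestamp_ms: i64,
}

struct IncrementalReader {
    last_emitted: Option<i64>,
}

impl IncrementalReader {
    /// Return snapshots newer than the last one emitted, in commit order,
    /// and advance the cursor. `history` must be sorted by timestamp_ms.
    fn poll(&mut self, history: &[SnapshotRef]) -> Vec<SnapshotRef> {
        let start = match self.last_emitted {
            Some(id) => history
                .iter()
                .position(|s| s.snapshot_id == id)
                .map(|i| i + 1)
                .unwrap_or(0),
            None => 0,
        };
        let fresh: Vec<_> = history[start..].to_vec();
        if let Some(last) = fresh.last() {
            self.last_emitted = Some(last.snapshot_id);
        }
        fresh
    }
}

fn main() {
    let mut reader = IncrementalReader { last_emitted: None };
    let mut history = vec![SnapshotRef { snapshot_id: 1, timestamp_ms: 100 }];
    assert_eq!(reader.poll(&history).len(), 1);
    // Nothing new committed: the poll is empty.
    assert_eq!(reader.poll(&history).len(), 0);
    // A new commit appears and is picked up on the next poll.
    history.push(SnapshotRef { snapshot_id: 2, timestamp_ms: 200 });
    assert_eq!(reader.poll(&history).len(), 1);
    println!("cursor now at snapshot {:?}", reader.last_emitted);
}
```

In the real connector the cursor would be persisted with the checkpoint so a restart resumes from the last emitted snapshot.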

2. IcebergSinkConnector

Purpose: Write streaming data to Iceberg tables

heliosdb-streaming/src/connectors/iceberg/sink.rs
pub struct IcebergSinkConnector {
    config: IcebergSinkConfig,
    catalog: Arc<dyn IcebergCatalog>,
    table: Arc<IcebergTable>,
    writer: Arc<Mutex<IcebergWriter>>,
    pending_files: Arc<Mutex<Vec<DataFile>>>,
    checkpoint_interval: Duration,
    status: Arc<RwLock<ConnectorStatus>>,
}

pub struct IcebergSinkConfig {
    pub catalog_type: CatalogType,
    pub catalog_config: CatalogConfig,
    pub table_name: String,
    pub write_mode: IcebergWriteMode,
    pub partition_spec: Option<PartitionSpec>,
    pub snapshot_properties: HashMap<String, String>,
    pub checkpoint_interval: Duration,  // Default: 60s
    pub target_file_size_mb: usize,     // Default: 128MB
}

pub enum IcebergWriteMode {
    /// Append data
    Append,
    /// Overwrite entire table
    Overwrite,
    /// Overwrite specific partitions
    OverwritePartitions(Vec<String>),
}

#[async_trait]
impl SinkConnector for IcebergSinkConnector {
    async fn write(&self, rows: Vec<Row>) -> Result<()>;
    async fn flush(&self) -> Result<()>;
    fn status(&self) -> ConnectorStatus;
}

impl IcebergSinkConnector {
    /// Create snapshot from pending files
    async fn commit_snapshot(&self) -> Result<i64>;
    /// Handle schema evolution
    async fn evolve_schema(&self, new_fields: Vec<Field>) -> Result<()>;
}

Key Features:

  1. Exactly-Once Writes: Two-phase commit with checkpointing
  2. Schema Evolution: Automatic schema updates
  3. Partition Management: Automatic partition creation
  4. File Optimization: Target file size, small file consolidation
  5. Snapshot Creation: Atomic commits with metadata

Write Flow:

1. Buffer rows in memory (up to target_file_size_mb)
2. Convert Row → Arrow RecordBatch
3. Write Parquet file to object store (S3/ADLS/GCS)
4. Accumulate DataFile metadata
5. On checkpoint_interval:
a. Create manifest file listing data files
b. Commit snapshot to catalog
c. Clear pending_files
6. Handle schema evolution if needed
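The buffering-and-commit cycle above can be sketched with the I/O stubbed out. `SinkBuffer` and `DataFile` here are simplified stand-ins (rows as strings, byte counting instead of Parquet sizing, a synchronous commit); the real sink would write Parquet via Arrow and commit the snapshot through the catalog.

```rust
#[derive(Debug)]
struct DataFile {
    path: String,
    row_count: usize,
}

struct SinkBuffer {
    rows: Vec<String>,           // stand-in for Row
    bytes: usize,
    target_file_size: usize,     // e.g. 128 MB in production
    pending_files: Vec<DataFile>,
    file_seq: usize,
}

impl SinkBuffer {
    fn write(&mut self, row: String) {
        self.bytes += row.len();
        self.rows.push(row);
        if self.bytes >= self.target_file_size {
            self.roll_file();
        }
    }

    /// Steps 3-4: flush the buffer as one data file, record its metadata.
    fn roll_file(&mut self) {
        if self.rows.is_empty() {
            return;
        }
        self.file_seq += 1;
        self.pending_files.push(DataFile {
            path: format!("data-{:05}.parquet", self.file_seq),
            row_count: self.rows.len(),
        });
        self.rows.clear();
        self.bytes = 0;
    }

    /// Step 5: commit all pending files as one snapshot; the pending list
    /// is cleared only after the (stubbed) commit succeeds.
    fn commit_snapshot(&mut self) -> usize {
        self.roll_file();
        let committed = self.pending_files.len();
        self.pending_files.clear();
        committed
    }
}

fn main() {
    let mut sink = SinkBuffer {
        rows: vec![], bytes: 0, target_file_size: 10,
        pending_files: vec![], file_seq: 0,
    };
    sink.write("0123456789".to_string()); // hits target size, rolls a file
    sink.write("abc".to_string());        // stays buffered
    let files = sink.commit_snapshot();   // rolls the remainder, commits
    assert_eq!(files, 2);
    println!("committed {} data files in one snapshot", files);
}
```

Keeping data files and the catalog commit in separate phases is what makes exactly-once possible: files written before a crash are simply never committed.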

3. Catalog Integration

Purpose: Support multiple Iceberg catalog backends

heliosdb-streaming/src/connectors/iceberg/catalog.rs
#[async_trait]
pub trait IcebergCatalog: Send + Sync {
    async fn load_table(&self, name: &str) -> Result<Arc<IcebergTable>>;
    async fn create_table(&self, name: &str, schema: Schema, spec: PartitionSpec) -> Result<()>;
    async fn list_tables(&self, namespace: &str) -> Result<Vec<String>>;
    async fn table_exists(&self, name: &str) -> Result<bool>;
}

pub enum CatalogType {
    Hive,
    Glue,
    Rest,
    Nessie,
}

pub enum CatalogConfig {
    Hive(HiveConfig),
    Glue(GlueConfig),
    Rest(RestConfig),
    Nessie(NessieConfig),
}

// Hive Metastore
pub struct HiveCatalog {
    thrift_uri: String,
    warehouse: String,
}

// AWS Glue
pub struct GlueCatalog {
    region: String,
    database: String,
    warehouse: String,
}

// REST Catalog
pub struct RestCatalog {
    uri: String,
    credentials: Option<(String, String)>,
    warehouse: String,
}

// Nessie (optional)
pub struct NessieCatalog {
    uri: String,
    branch: String,
    warehouse: String,
}

Implementation:

  1. HiveCatalog: Use thrift crate for Hive Metastore
  2. GlueCatalog: Use aws-sdk-glue (already in deps)
  3. RestCatalog: HTTP client with Iceberg REST spec
  4. NessieCatalog: REST client for Nessie
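A simplified, synchronous in-memory catalog mirroring the shape of the `IcebergCatalog` trait (async, schemas, and error types dropped for brevity) can serve as a test double while the real Hive/Glue/REST adapters land. Everything here, including `MemoryCatalog` and the trimmed `Catalog` trait, is an illustrative sketch, not crate API.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct TableMetadata {
    namespace: String,
    name: String,
}

trait Catalog {
    fn create_table(&mut self, name: &str) -> Result<(), String>;
    fn table_exists(&self, name: &str) -> bool;
    fn list_tables(&self, namespace: &str) -> Vec<String>;
}

#[derive(Default)]
struct MemoryCatalog {
    tables: HashMap<String, TableMetadata>,
}

impl Catalog for MemoryCatalog {
    fn create_table(&mut self, name: &str) -> Result<(), String> {
        // Names follow the "namespace.table" convention used in the configs.
        let (ns, table) = name
            .split_once('.')
            .ok_or_else(|| format!("expected namespace.table, got {name}"))?;
        if self.tables.contains_key(name) {
            return Err(format!("table {name} already exists"));
        }
        self.tables.insert(
            name.to_string(),
            TableMetadata { namespace: ns.to_string(), name: table.to_string() },
        );
        Ok(())
    }

    fn table_exists(&self, name: &str) -> bool {
        self.tables.contains_key(name)
    }

    fn list_tables(&self, namespace: &str) -> Vec<String> {
        self.tables
            .values()
            .filter(|t| t.namespace == namespace)
            .map(|t| t.name.clone())
            .collect()
    }
}

fn main() {
    let mut catalog = MemoryCatalog::default();
    catalog.create_table("sales.transactions").unwrap();
    assert!(catalog.table_exists("sales.transactions"));
    assert!(!catalog.table_exists("sales.missing"));
    assert_eq!(catalog.list_tables("sales"), vec!["transactions".to_string()]);
    println!("catalog holds {} table(s)", catalog.tables.len());
}
```

Unit tests for the source and sink can then run against `MemoryCatalog` without Docker containers or cloud credentials.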

4. Snapshot Management

Purpose: Handle Iceberg snapshot operations

heliosdb-streaming/src/connectors/iceberg/snapshot.rs
pub struct SnapshotManager {
    table: Arc<IcebergTable>,
    catalog: Arc<dyn IcebergCatalog>,
}

impl SnapshotManager {
    /// Get current snapshot
    pub async fn current_snapshot(&self) -> Result<Snapshot>;
    /// Get snapshot by ID
    pub async fn snapshot_by_id(&self, id: i64) -> Result<Snapshot>;
    /// Get snapshot at timestamp (time travel)
    pub async fn snapshot_at_timestamp(&self, ts: DateTime<Utc>) -> Result<Snapshot>;
    /// List all snapshots
    pub async fn list_snapshots(&self) -> Result<Vec<Snapshot>>;
    /// Create new snapshot
    pub async fn create_snapshot(
        &self,
        data_files: Vec<DataFile>,
        operation: SnapshotOperation,
    ) -> Result<i64>;
    /// Expire old snapshots
    pub async fn expire_snapshots(&self, older_than: DateTime<Utc>) -> Result<()>;
}

pub enum SnapshotOperation {
    Append,
    Overwrite,
    Delete,
}

pub struct Snapshot {
    pub snapshot_id: i64,
    pub parent_snapshot_id: Option<i64>,
    pub timestamp_ms: i64,
    pub manifest_list: String,
    pub summary: HashMap<String, String>,
}
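The core of `snapshot_at_timestamp` is a lookup over the snapshot log: time travel resolves to the latest snapshot committed at or before the requested instant. A minimal sketch, using a trimmed `Snapshot` with the same field names as above and a plain `i64` millisecond timestamp instead of `DateTime<Utc>`:

```rust
struct Snapshot {
    snapshot_id: i64,
    timestamp_ms: i64,
}

/// Latest snapshot with timestamp_ms <= ts_ms, or None if the table did not
/// exist yet at that instant. `log` must be sorted ascending by timestamp_ms.
fn snapshot_at_timestamp(log: &[Snapshot], ts_ms: i64) -> Option<&Snapshot> {
    log.iter().rev().find(|s| s.timestamp_ms <= ts_ms)
}

fn main() {
    let log = vec![
        Snapshot { snapshot_id: 10, timestamp_ms: 1_000 },
        Snapshot { snapshot_id: 11, timestamp_ms: 2_000 },
        Snapshot { snapshot_id: 12, timestamp_ms: 3_000 },
    ];
    // A query "AS OF" t=2_500 resolves to the snapshot committed at t=2_000.
    assert_eq!(snapshot_at_timestamp(&log, 2_500).unwrap().snapshot_id, 11);
    // Before the first commit there is nothing to read.
    assert!(snapshot_at_timestamp(&log, 500).is_none());
    println!("time travel lookup ok");
}
```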

5. Schema Evolution

Purpose: Handle schema changes without downtime

heliosdb-streaming/src/connectors/iceberg/schema.rs
pub struct SchemaEvolution {
    current_schema: Schema,
    pending_changes: Vec<SchemaChange>,
}

pub enum SchemaChange {
    AddColumn { name: String, field_type: IcebergType },
    DropColumn { name: String },
    RenameColumn { old_name: String, new_name: String },
    UpdateType { name: String, new_type: IcebergType },
}

impl SchemaEvolution {
    /// Check if schema evolution is compatible
    pub fn is_compatible(&self, new_schema: &Schema) -> Result<bool>;
    /// Apply schema changes
    pub async fn apply(&self, table: &mut IcebergTable) -> Result<()>;
    /// Generate migration SQL
    pub fn to_sql(&self) -> String;
}

// Arrow <-> Iceberg type conversion
pub struct TypeConverter;

impl TypeConverter {
    pub fn arrow_to_iceberg(arrow_type: &DataType) -> Result<IcebergType>;
    pub fn iceberg_to_arrow(iceberg_type: &IcebergType) -> Result<DataType>;
}
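An illustrative sketch of the `TypeConverter` mapping: both enums below are simplified stand-ins for arrow's `DataType` and the Iceberg type model, covering only a handful of primitives to show the round-trip shape the real converter must preserve.

```rust
#[derive(Debug, Clone, PartialEq)]
enum ArrowType { Int32, Int64, Utf8, Float64, TimestampMicros }

#[derive(Debug, Clone, PartialEq)]
enum IcebergType { Int, Long, String, Double, Timestamp }

fn arrow_to_iceberg(t: &ArrowType) -> Result<IcebergType, String> {
    Ok(match t {
        ArrowType::Int32 => IcebergType::Int,
        ArrowType::Int64 => IcebergType::Long,
        ArrowType::Utf8 => IcebergType::String,
        ArrowType::Float64 => IcebergType::Double,
        ArrowType::TimestampMicros => IcebergType::Timestamp,
    })
}

fn iceberg_to_arrow(t: &IcebergType) -> Result<ArrowType, String> {
    Ok(match t {
        IcebergType::Int => ArrowType::Int32,
        IcebergType::Long => ArrowType::Int64,
        IcebergType::String => ArrowType::Utf8,
        IcebergType::Double => ArrowType::Float64,
        IcebergType::Timestamp => ArrowType::TimestampMicros,
    })
}

fn main() {
    // Every supported type must survive a round trip unchanged; the unit
    // tests for arrow_conversion.rs would assert exactly this property.
    for t in [ArrowType::Int32, ArrowType::Int64, ArrowType::Utf8,
              ArrowType::Float64, ArrowType::TimestampMicros] {
        let back = iceberg_to_arrow(&arrow_to_iceberg(&t).unwrap()).unwrap();
        assert_eq!(back, t);
    }
    println!("type round-trip ok");
}
```

The `Result` return matters because the real mapping is partial: nested and parameterized types (decimals, lists, maps) have edge cases that must fail loudly rather than convert lossily.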

6. Time Travel Support

Purpose: Enable time travel queries

heliosdb-streaming/src/connectors/iceberg/time_travel.rs
pub struct TimeTravelQuery {
    table: Arc<IcebergTable>,
    snapshot_manager: SnapshotManager,
}

impl TimeTravelQuery {
    /// Query as of timestamp
    pub async fn as_of_timestamp(
        &self,
        timestamp: DateTime<Utc>,
    ) -> Result<IcebergSourceConnector>;
    /// Query as of version (snapshot ID)
    pub async fn as_of_version(
        &self,
        snapshot_id: i64,
    ) -> Result<IcebergSourceConnector>;
    /// Get changes between two snapshots (CDC)
    pub async fn changes_between(
        &self,
        from_snapshot: i64,
        to_snapshot: i64,
    ) -> Result<Vec<Change>>;
}

pub enum Change {
    Insert(Row),
    Update { before: Row, after: Row },
    Delete(Row),
}
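The classification logic behind `changes_between` can be sketched by diffing two snapshots' row sets keyed by primary key. This is a sketch under simplifying assumptions (rows as `String`, `i64` keys, whole row sets in memory); the real implementation would walk the snapshots' data and delete files instead of materializing maps.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Change {
    Insert(String),
    Update { before: String, after: String },
    Delete(String),
}

fn changes_between(
    from: &HashMap<i64, String>,
    to: &HashMap<i64, String>,
) -> Vec<Change> {
    let mut changes = Vec::new();
    // Rows present in `to`: new keys are inserts, changed values are updates.
    for (key, after) in to {
        match from.get(key) {
            None => changes.push(Change::Insert(after.clone())),
            Some(before) if before != after => changes.push(Change::Update {
                before: before.clone(),
                after: after.clone(),
            }),
            _ => {} // unchanged
        }
    }
    // Rows only in `from` were deleted between the two snapshots.
    for (key, before) in from {
        if !to.contains_key(key) {
            changes.push(Change::Delete(before.clone()));
        }
    }
    changes
}

fn main() {
    let from = HashMap::from([(1, "a".to_string()), (2, "b".to_string())]);
    let to = HashMap::from([(2, "b2".to_string()), (3, "c".to_string())]);
    let changes = changes_between(&from, &to);
    assert_eq!(changes.len(), 3); // one insert, one update, one delete
    assert!(changes.contains(&Change::Update {
        before: "b".to_string(),
        after: "b2".to_string(),
    }));
    println!("{changes:?}");
}
```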

🔄 DATA FLOW

Read Flow (Source Connector)

┌─────────────────┐
│  Iceberg Table  │
│  (S3/ADLS/GCS)  │
└────────┬────────┘
         │ 1. Load metadata
         ▼
┌─────────────────┐
│     Catalog     │ (Hive/Glue/REST)
│ (metadata.json) │
└────────┬────────┘
         │ 2. Get current/historical snapshot
         ▼
┌─────────────────┐
│    Snapshot     │
│ (manifest list) │
└────────┬────────┘
         │ 3. Read manifests
         ▼
┌─────────────────┐
│ Manifest Files  │
│(list data files)│
└────────┬────────┘
         │ 4. Filter partitions
         ▼
┌─────────────────┐
│  Parquet Files  │ (S3/ADLS/GCS)
└────────┬────────┘
         │ 5. Read with Arrow
         ▼
┌──────────────────┐
│ Arrow RecordBatch│
└────────┬─────────┘
         │ 6. Convert to Row
         ▼
┌─────────────────┐
│   Stream<Row>   │ → Consumer
└─────────────────┘

Write Flow (Sink Connector)

┌─────────────────┐
│   Stream<Row>   │ ← Producer
└────────┬────────┘
         │ 1. Buffer rows
         ▼
┌─────────────────┐
│   Row Buffer    │ (in-memory, 128MB)
└────────┬────────┘
         │ 2. Convert to Arrow
         ▼
┌──────────────────┐
│ Arrow RecordBatch│
└────────┬─────────┘
         │ 3. Write Parquet
         ▼
┌────────────────────┐
│    Parquet File    │ (S3/ADLS/GCS)
│ (data-xxx.parquet) │
└────────┬───────────┘
         │ 4. Create DataFile metadata
         ▼
┌─────────────────┐
│    Manifest     │
│(list data files)│
└────────┬────────┘
         │ 5. Commit snapshot
         ▼
┌─────────────────┐
│ Catalog Update  │ (atomic commit)
│ (new snapshot)  │
└─────────────────┘

🧪 TESTING STRATEGY

Test Breakdown (200 tests)

1. Unit Tests (80 tests)

Module: heliosdb-streaming/src/connectors/iceberg/*.rs

  • source.rs (20 tests):

    • Batch read mode
    • Incremental read mode
    • Snapshot read mode
    • Partition filtering
    • Time travel queries
    • Error handling
  • sink.rs (20 tests):

    • Append mode
    • Overwrite mode
    • Partition overwrite
    • Schema evolution
    • Checkpoint/recovery
    • File size targeting
  • catalog.rs (15 tests):

    • Hive catalog CRUD
    • Glue catalog CRUD
    • REST catalog CRUD
    • Error handling
    • Connection pooling
  • snapshot.rs (10 tests):

    • Snapshot creation
    • Snapshot expiration
    • Time travel lookup
    • Snapshot metadata
  • schema.rs (10 tests):

    • Type conversion (Arrow ↔ Iceberg)
    • Schema compatibility checks
    • Column add/drop/rename
    • Schema evolution
  • partition.rs (5 tests):

    • Partition spec creation
    • Partition pruning
    • Dynamic partitioning

2. Integration Tests (60 tests)

Location: heliosdb-streaming/tests/iceberg_integration.rs

  • End-to-end workflows (20 tests):

    • Write → Read → Verify
    • Incremental updates
    • Schema evolution during write
    • Time travel queries
    • Partition management
  • Catalog integration (15 tests):

    • Hive Metastore (Docker container)
    • AWS Glue (LocalStack)
    • REST Catalog (test server)
    • Catalog failover
  • Storage integration (15 tests):

    • S3 (MinIO)
    • ADLS (Azurite)
    • GCS (fake-gcs-server)
    • Local filesystem
  • Performance tests (10 tests):

    • 100K+ rows/sec read
    • 50K+ rows/sec write
    • Large file handling (1GB+)
    • 1000+ partition tables

3. Streaming Integration Tests (40 tests)

Location: heliosdb-streaming/tests/iceberg_streaming.rs

  • Continuous ingestion (15 tests):

    • Kafka → Iceberg
    • CDC → Iceberg
    • Webhook → Iceberg
    • Exactly-once semantics
  • Streaming queries (10 tests):

    • Windowed aggregation over Iceberg
    • Stream-Iceberg joins
    • Incremental materialized views
    • Time-series analytics
  • Failure scenarios (10 tests):

    • Writer crash recovery
    • Network partition
    • Catalog unavailable
    • Corrupt files
  • Schema evolution (5 tests):

    • Add column mid-stream
    • Type promotion
    • Backward compatibility

4. Benchmark Tests (20 tests)

Location: heliosdb-streaming/benches/iceberg_bench.rs

#[bench]
fn bench_read_throughput_10k_rows(b: &mut Bencher);
#[bench]
fn bench_read_throughput_100k_rows(b: &mut Bencher);
#[bench]
fn bench_write_throughput_10k_rows(b: &mut Bencher);
#[bench]
fn bench_snapshot_creation(b: &mut Bencher);
#[bench]
fn bench_partition_pruning_1000_partitions(b: &mut Bencher);
// ... 15 more benchmarks

Performance Targets:

| Metric | Target | Measurement |
| --- | --- | --- |
| Read throughput | 100K rows/sec | bench_read_throughput_100k_rows |
| Write throughput | 50K rows/sec | bench_write_throughput_100k_rows |
| Snapshot creation | <5 seconds | bench_snapshot_creation |
| Catalog query | <100ms | bench_catalog_load_table |
| Partition pruning | <50ms | bench_partition_pruning_1000_partitions |

📅 IMPLEMENTATION TIMELINE (4 weeks)

Week 1: Foundation (Days 1-7)

Goals: Catalog integration, basic read path

Tasks:

  1. Add Iceberg dependencies to Cargo.toml
  2. Implement IcebergCatalog trait
  3. Implement HiveCatalog (primary)
  4. Implement GlueCatalog (AWS)
  5. Implement RestCatalog (generic)
  6. Basic IcebergSourceConnector (batch read)
  7. Arrow ↔ Iceberg type conversion
  8. Write 30 unit tests

Deliverables:

  • Catalog abstraction complete
  • Basic read path functional
  • 30 tests passing

LOC: ~2,500

Week 2: Source Connector (Days 8-14)

Goals: Complete read functionality

Tasks:

  1. Incremental read mode (CDC-style)
  2. Snapshot read mode (time travel)
  3. Partition filtering optimization
  4. SnapshotManager implementation
  5. TimeTravelQuery implementation
  6. Integration with StreamingEngine
  7. Write 40 unit tests + 20 integration tests

Deliverables:

  • Full source connector complete
  • Time travel working
  • Partition pruning optimized
  • 60 tests passing

LOC: ~2,800

Week 3: Sink Connector (Days 15-21)

Goals: Complete write functionality

Tasks:

  1. IcebergSinkConnector implementation
  2. Parquet file writing with Arrow
  3. Manifest file creation
  4. Snapshot commit logic
  5. Schema evolution support
  6. Checkpointing and recovery
  7. Write 40 unit tests + 25 integration tests

Deliverables:

  • Full sink connector complete
  • Exactly-once writes working
  • Schema evolution functional
  • 65 tests passing

LOC: ~2,400

Week 4: Testing & Optimization (Days 22-28)

Goals: Production readiness

Tasks:

  1. Complete integration test suite
  2. Streaming integration tests (Kafka → Iceberg)
  3. Performance benchmarking
  4. Optimization (parallel reads, file coalescing)
  5. Documentation (API docs, examples)
  6. Write 15 streaming tests + 20 benchmarks

Deliverables:

  • 200 tests passing
  • Performance targets met
  • Complete documentation
  • Production-ready

LOC: ~500 (tests/docs)

Total: 8,200 LOC over 4 weeks


INTEGRATION WITH EXISTING CODE

1. Update lib.rs

heliosdb-streaming/src/lib.rs
pub mod connectors;

pub use connectors::{
    // ... existing
    IcebergSourceConnector, IcebergSinkConnector,
    IcebergSourceConfig, IcebergSinkConfig,
    IcebergReadMode, IcebergWriteMode,
    CatalogType, CatalogConfig,
};

2. Update connectors.rs

heliosdb-streaming/src/connectors.rs
pub mod database;
pub mod iceberg; // NEW

pub use iceberg::{
    IcebergSourceConnector, IcebergSinkConnector,
    IcebergSourceConfig, IcebergSinkConfig,
    IcebergCatalog, HiveCatalog, GlueCatalog, RestCatalog,
};

3. Example Usage

// Example: Read from Iceberg table
use heliosdb_streaming::*;

let config = IcebergSourceConfig {
    catalog_type: CatalogType::Glue,
    catalog_config: CatalogConfig::Glue(GlueConfig {
        region: "us-east-1".to_string(),
        database: "prod_db".to_string(),
        warehouse: "s3://my-bucket/warehouse".to_string(),
    }),
    table_name: "sales.transactions".to_string(),
    read_mode: IcebergReadMode::Incremental {
        poll_interval: Duration::from_secs(60),
        start_snapshot: None,
    },
    snapshot_id: None,
    timestamp: None,
    partition_filter: Some("year=2024 AND month=10".to_string()),
    batch_size: 10000,
};

let connector = IcebergSourceConnector::new(config).await?;
let mut stream = connector.start().await?;

while let Some(event) = stream.recv().await {
    match event {
        StreamEvent::Data(row) => {
            println!("Received: {:?}", row);
        }
        StreamEvent::EndOfStream => break,
        _ => {}
    }
}

// Example: Write to Iceberg table
let config = IcebergSinkConfig {
    catalog_type: CatalogType::Glue,
    catalog_config: CatalogConfig::Glue(GlueConfig {
        region: "us-east-1".to_string(),
        database: "prod_db".to_string(),
        warehouse: "s3://my-bucket/warehouse".to_string(),
    }),
    table_name: "sales.aggregated".to_string(),
    write_mode: IcebergWriteMode::Append,
    partition_spec: Some(PartitionSpec::new(vec![
        PartitionField::new("year", Transform::Identity),
        PartitionField::new("month", Transform::Identity),
    ])),
    snapshot_properties: HashMap::new(),
    checkpoint_interval: Duration::from_secs(60),
    target_file_size_mb: 128,
};

let connector = IcebergSinkConnector::new(config).await?;

// Write rows
connector.write(rows).await?;

// Flush and create snapshot
connector.flush().await?;
SUCCESS CRITERIA

Functional Criteria

  • Read from Iceberg tables (batch, incremental, snapshot)
  • Write to Iceberg tables (append, overwrite)
  • Time travel queries work
  • Schema evolution handled automatically
  • Partition pruning optimized
  • Exactly-once write semantics
  • All 3 catalog types supported (Hive, Glue, REST)
  • All 3 storage types supported (S3, ADLS, GCS)

Performance Criteria

  • Read throughput β‰₯ 100K rows/sec
  • Write throughput β‰₯ 50K rows/sec
  • Snapshot creation < 5 seconds
  • Catalog queries < 100ms
  • Partition pruning < 50ms for 1000+ partitions

Quality Criteria

  • 200 tests passing
  • 90%+ code coverage
  • Zero critical bugs
  • Complete API documentation
  • 5+ working examples

Integration Criteria

  • Works with Kafka streaming
  • Works with CDC connector
  • Integrates with StreamingEngine
  • Compatible with existing connectors
  • No breaking changes to heliosdb-streaming API

🚨 RISKS & MITIGATION

Technical Risks

| Risk | Impact | Probability | Mitigation |
| --- | --- | --- | --- |
| iceberg-rust crate maturity | HIGH | MEDIUM | Contribute fixes upstream, vendor if needed |
| Arrow type conversion edge cases | MEDIUM | HIGH | Comprehensive unit tests, fuzz testing |
| Catalog API compatibility | MEDIUM | MEDIUM | Test against real Hive/Glue/REST catalogs |
| Large file handling (>1GB) | MEDIUM | LOW | Streaming reads, chunked processing |
| Concurrent write conflicts | HIGH | MEDIUM | Optimistic locking, retry logic |

Performance Risks

| Risk | Impact | Probability | Mitigation |
| --- | --- | --- | --- |
| Read throughput < 100K/sec | MEDIUM | LOW | Parallel file reads, prefetching |
| Write latency > 5 sec | MEDIUM | MEDIUM | Async writes, batch commits |
| Memory usage for large tables | MEDIUM | MEDIUM | Streaming API, pagination |

Schedule Risks

| Risk | Impact | Probability | Mitigation |
| --- | --- | --- | --- |
| Week 1-2 catalog integration delays | HIGH | MEDIUM | Start with REST catalog (simplest) |
| Week 3 sink connector complexity | MEDIUM | MEDIUM | Reuse database sink patterns |
| Testing bottleneck in Week 4 | MEDIUM | LOW | Write tests incrementally from Week 1 |

📚 REFERENCES

Existing Code

  • heliosdb-streaming/src/connectors/database/ - Reference implementation
  • heliosdb-streaming/src/connectors.rs - Connector traits
  • heliosdb-streaming/Cargo.toml - Dependencies

External Resources

  • AWS Glue Catalog API
  • Hive Metastore Thrift API
  • Iceberg REST Catalog Spec

NEXT STEPS (Week 1, Day 1)

  1. Add dependencies to heliosdb-streaming/Cargo.toml:

    iceberg = "0.3"
    iceberg-rust = "0.3"
    object_store = "0.10"
  2. Create module structure:

    mkdir -p heliosdb-streaming/src/connectors/iceberg
    touch heliosdb-streaming/src/connectors/iceberg/{mod.rs,catalog.rs,config.rs}
  3. Implement IcebergCatalog trait (Day 1-2)

  4. Implement RestCatalog (simplest, Day 3)

  5. Begin IcebergSourceConnector (Day 4-7)


Document Version: 1.0
Created: October 31, 2025
Owner: F6.1 Implementation Team
Status: ARCHITECTURE APPROVED - READY FOR IMPLEMENTATION
Next Review: Week 1 checkpoint (Day 7)


Apache Iceberg Connector for heliosdb-streaming: "From Data Lakes to Streaming Analytics in Real-Time"