Database Source Connector Specification

Feature: F1.3 Flink Streaming - Database Source Connector
Version: 1.0
Date: October 29, 2025
Status: APPROVED
Priority: P1 CRITICAL
Implementation Timeline: Week 1 (Nov 4-10, 2025)


Executive Summary

The Database Source Connector enables HeliosDB Flink Streaming to read data from relational databases (PostgreSQL, MySQL, Oracle) in a resumable, fault-tolerant manner. This connector supports three read modes: snapshot (full table scan), incremental (poll for changes), and CDC (change data capture using WAL/binlog).

Key Features

  • Resumable reads with checkpoint coordination
  • Change data capture (CDC) using PostgreSQL WAL, MySQL binlog
  • Parallel reading with configurable parallelism
  • Schema detection and automatic type mapping
  • Connection pooling for efficient resource usage
  • Backpressure support for downstream flow control

Requirements

Functional Requirements

| ID | Requirement | Priority |
|-------|------------------------------------------------------------|----|
| FR-1 | Support PostgreSQL, MySQL, Oracle source databases | P0 |
| FR-2 | Support snapshot read (full table scan) | P0 |
| FR-3 | Support incremental read (polling for new/updated rows) | P1 |
| FR-4 | Support CDC (change data capture) for real-time streaming | P1 |
| FR-5 | Resumable reads from last checkpoint | P0 |
| FR-6 | Parallel reading with configurable parallelism | P1 |
| FR-7 | Schema detection and validation | P0 |
| FR-8 | Data type mapping (database → stream) | P0 |
| FR-9 | Connection pooling and management | P1 |
| FR-10 | Backpressure integration | P0 |
| FR-11 | Error handling and retry logic | P0 |
| FR-12 | Monitoring and metrics | P1 |

Non-Functional Requirements

| ID | Requirement | Target |
|-------|----------------------|-------------------|
| NFR-1 | Read throughput | >10K rows/sec |
| NFR-2 | Latency (CDC mode) | <100ms |
| NFR-3 | Memory usage | <100MB per source |
| NFR-4 | Connection pool size | 1-100 connections |
| NFR-5 | Checkpoint overhead | <5% |
| NFR-6 | Test coverage | >90% |

๐Ÿ— Architecture

High-Level Design

┌─────────────────────────────────────────────────────────────┐
│                   DatabaseSourceConnector                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐     │
│  │    Config    │   │  Connection  │   │  Checkpoint  │     │
│  │    Parser    │   │     Pool     │   │   Manager    │     │
│  └──────────────┘   └──────────────┘   └──────────────┘     │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                  Read Mode Selector                   │  │
│  ├───────────────────────────────────────────────────────┤  │
│  │                                                       │  │
│  │  ┌──────────┐   ┌───────────┐   ┌──────────────┐      │  │
│  │  │ Snapshot │   │Incremental│   │     CDC      │      │  │
│  │  │  Reader  │   │  Reader   │   │ (WAL/Binlog) │      │  │
│  │  └──────────┘   └───────────┘   └──────────────┘      │  │
│  │                                                       │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐     │
│  │    Schema    │   │ Type Mapper  │   │   Metrics    │     │
│  │   Detector   │   │              │   │  Collector   │     │
│  └──────────────┘   └──────────────┘   └──────────────┘     │
│                                                             │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
                    ┌─────────────────────┐
                    │   Stream Pipeline   │
                    │  (Downstream Ops)   │
                    └─────────────────────┘

Read Modes

1. Snapshot Mode (Full Table Scan)

  • Use Case: Initial bulk load, historical data ingestion
  • Mechanism:
    • Execute SELECT * FROM table with chunking
    • Divide table into N chunks (by primary key ranges)
    • Read chunks in parallel
    • Track progress with checkpoint offsets
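
For example, primary-key chunk boundaries can be derived from the table's MIN/MAX key values. The sketch below assumes a dense, non-negative integer key; `chunk_ranges` is an illustrative helper, not part of the connector API:

/// Split the key space [min_id, max_id] into half-open ranges of
/// `chunk_size` keys; each range becomes one independently readable
/// (and checkpointable) chunk.
fn chunk_ranges(min_id: u64, max_id: u64, chunk_size: u64) -> Vec<(u64, u64)> {
    let mut ranges = Vec::new();
    let mut start = min_id;
    while start <= max_id {
        let end = start.saturating_add(chunk_size); // half-open: [start, end)
        ranges.push((start, end));
        start = end;
    }
    ranges
}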

2. Incremental Mode (Polling)

  • Use Case: Periodic updates, append-only data
  • Mechanism:
    • Execute SELECT * FROM table WHERE modified_at > ?
    • Track last read timestamp/ID
    • Poll at configurable interval (1s - 1h)
    • Resume from last checkpoint

3. CDC Mode (Change Data Capture)

  • Use Case: Real-time streaming, sub-second latency
  • Mechanism:
    • PostgreSQL: Read from WAL using logical replication
    • MySQL: Read from binlog
    • Oracle: Use Oracle GoldenGate or LogMiner
    • Stream INSERT, UPDATE, DELETE operations
    • Resume from LSN (Log Sequence Number)
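
Each mode corresponds to a `ReadMode` variant in the configuration (see API Design below). Illustrative values follow; the `ChunkStrategy::ByPrimaryKey` shape matches the SnapshotReader code later in this document, while `CdcProvider::Wal` is a variant name assumed from the field docs:

// Snapshot: full scan in 10K-row chunks split on the primary key.
let snapshot = ReadMode::Snapshot {
    chunk_strategy: ChunkStrategy::ByPrimaryKey { column: "id".into() },
    chunk_size: 10_000,
};

// Incremental: poll every 30s for rows with a newer modified_at.
let incremental = ReadMode::Incremental {
    tracking_column: "modified_at".into(),
    poll_interval: Duration::from_secs(30),
};

// CDC: stream changes from the PostgreSQL WAL, resuming at a saved LSN.
let cdc = ReadMode::Cdc {
    provider: CdcProvider::Wal,
    start_position: Some("0/16B3748".into()),
};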

API Design

Configuration

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseSourceConfig {
    /// Database connection string
    /// Format: postgresql://user:password@host:port/database
    pub connection_string: String,
    /// Source table name
    pub table_name: String,
    /// Read mode: snapshot, incremental, cdc
    pub read_mode: ReadMode,
    /// Parallelism (number of concurrent readers)
    #[serde(default = "default_parallelism")]
    pub parallelism: usize,
    /// Connection pool configuration
    pub connection_pool: ConnectionPoolConfig,
    /// Checkpoint configuration
    pub checkpoint: CheckpointConfig,
    /// Schema configuration
    pub schema: Option<SchemaConfig>,
    /// Incremental mode settings
    pub incremental: Option<IncrementalConfig>,
    /// CDC mode settings
    pub cdc: Option<CdcConfig>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReadMode {
    Snapshot {
        /// Chunking strategy
        chunk_strategy: ChunkStrategy,
        /// Chunk size (rows per chunk)
        chunk_size: usize,
    },
    Incremental {
        /// Column to track (e.g., modified_at, id)
        tracking_column: String,
        /// Poll interval
        poll_interval: Duration,
    },
    Cdc {
        /// CDC provider (wal, binlog, goldengate)
        provider: CdcProvider,
        /// Start position (LSN, position, SCN)
        start_position: Option<String>,
    },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionPoolConfig {
    /// Minimum connections
    pub min_connections: usize,
    /// Maximum connections
    pub max_connections: usize,
    /// Connection timeout (seconds)
    pub connect_timeout: u64,
    /// Idle timeout (seconds)
    pub idle_timeout: u64,
    /// Max lifetime (seconds)
    pub max_lifetime: u64,
}

fn default_parallelism() -> usize {
    4
}
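
Putting it together, a CDC-mode configuration might be built as follows. This is illustrative only: `CheckpointConfig` is not specified in this document, so a default constructor is assumed, as is the `CdcProvider::Wal` variant.

// Illustrative values; replace with real connection details.
let config = DatabaseSourceConfig {
    connection_string: "postgresql://app:secret@db.example.com:5432/orders".into(),
    table_name: "orders".into(),
    read_mode: ReadMode::Cdc {
        provider: CdcProvider::Wal,
        start_position: None, // start from the slot's current position
    },
    parallelism: default_parallelism(),
    connection_pool: ConnectionPoolConfig {
        min_connections: 2,
        max_connections: 16,
        connect_timeout: 10,
        idle_timeout: 300,
        max_lifetime: 1800,
    },
    checkpoint: CheckpointConfig::default(), // fields assumed, not specified here
    schema: None,
    incremental: None,
    cdc: None,
};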

Connector Interface

pub struct DatabaseSourceConnector {
    config: DatabaseSourceConfig,
    connection_pool: Arc<ConnectionPool>,
    checkpoint_coordinator: Arc<CheckpointCoordinator>,
    schema: Arc<Schema>,
    metrics: Arc<MetricsCollector>,
    reader: Box<dyn DatabaseReader>,
}

impl DatabaseSourceConnector {
    /// Create new database source connector
    pub async fn new(config: DatabaseSourceConfig) -> Result<Self> {
        // 1. Validate configuration
        config.validate()?;
        // 2. Create connection pool (Arc so readers can share it)
        let connection_pool = Arc::new(ConnectionPool::new(config.connection_pool.clone()).await?);
        // 3. Detect schema
        let schema = Arc::new(SchemaDetector::detect(&connection_pool, &config.table_name).await?);
        // 4. Create reader based on mode
        let reader = Self::create_reader(&config, &connection_pool, &schema).await?;
        // 5. Initialize checkpoint coordinator
        let checkpoint_coordinator = Arc::new(CheckpointCoordinator::new(config.checkpoint.clone()));
        // 6. Initialize metrics
        let metrics = Arc::new(MetricsCollector::new("database_source"));
        Ok(Self {
            config,
            connection_pool,
            checkpoint_coordinator,
            schema,
            metrics,
            reader,
        })
    }

    /// Start reading from database
    pub async fn read(&mut self) -> Result<Vec<Row>> {
        self.reader.read().await
    }

    /// Checkpoint current progress
    pub async fn checkpoint(&self) -> Result<CheckpointMetadata> {
        self.reader.checkpoint().await
    }

    /// Resume from checkpoint
    pub async fn resume_from_checkpoint(&mut self, checkpoint: CheckpointMetadata) -> Result<()> {
        self.reader.resume(checkpoint).await
    }

    /// Get current metrics
    pub fn metrics(&self) -> Metrics {
        self.metrics.snapshot()
    }
}
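
A minimal driver loop over this interface might look like the sketch below; the loop structure and checkpoint cadence are illustrative, not part of the API.

// Hypothetical driver: read batches until EOF (empty batch in snapshot
// mode) and checkpoint every 10 batches.
async fn drive(mut source: DatabaseSourceConnector) -> Result<()> {
    let mut batches = 0usize;
    loop {
        let rows = source.read().await?;
        if rows.is_empty() {
            break; // snapshot EOF; incremental/CDC callers would keep polling
        }
        // ... hand rows to the downstream stream pipeline ...
        batches += 1;
        if batches % 10 == 0 {
            let meta = source.checkpoint().await?;
            // persist `meta` via the checkpoint coordinator / durable store
            let _ = meta;
        }
    }
    Ok(())
}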

Reader Trait

#[async_trait]
pub trait DatabaseReader: Send + Sync {
    /// Read next batch of rows
    async fn read(&mut self) -> Result<Vec<Row>>;
    /// Create checkpoint
    async fn checkpoint(&self) -> Result<CheckpointMetadata>;
    /// Resume from checkpoint
    async fn resume(&mut self, checkpoint: CheckpointMetadata) -> Result<()>;
    /// Get schema
    fn schema(&self) -> &Schema;
    /// Close reader
    async fn close(&mut self) -> Result<()>;
}

Snapshot Reader

pub struct SnapshotReader {
    connection_pool: Arc<ConnectionPool>,
    table_name: String,
    schema: Arc<Schema>,
    chunk_strategy: ChunkStrategy,
    chunk_size: usize,
    current_chunk: usize,
    total_chunks: usize,
    parallelism: usize,
}

impl SnapshotReader {
    pub async fn new(
        connection_pool: Arc<ConnectionPool>,
        table_name: String,
        schema: Arc<Schema>,
        chunk_strategy: ChunkStrategy,
        chunk_size: usize,
        parallelism: usize,
    ) -> Result<Self> {
        // Calculate total chunks (ceiling division)
        let total_rows = Self::count_rows(&connection_pool, &table_name).await?;
        let total_chunks = (total_rows + chunk_size - 1) / chunk_size;
        Ok(Self {
            connection_pool,
            table_name,
            schema,
            chunk_strategy,
            chunk_size,
            current_chunk: 0,
            total_chunks,
            parallelism,
        })
    }

    async fn read_chunk(&self, chunk_id: usize) -> Result<Vec<Row>> {
        let conn = self.connection_pool.get().await?;
        // Build the chunk query in a single match so each strategy's range
        // arithmetic and SQL stay together.
        // NOTE: identifiers and bounds are interpolated directly for brevity;
        // production code should validate identifiers and bind values.
        let query = match &self.chunk_strategy {
            ChunkStrategy::ByPrimaryKey { column } => {
                // Primary-key range for this chunk (assumes dense keys from 0)
                let start_id = chunk_id * self.chunk_size;
                let end_id = (chunk_id + 1) * self.chunk_size;
                format!(
                    "SELECT * FROM {} WHERE {} >= {} AND {} < {} ORDER BY {}",
                    self.table_name, column, start_id, column, end_id, column
                )
            }
            ChunkStrategy::ByOffset => {
                let offset = chunk_id * self.chunk_size;
                format!(
                    "SELECT * FROM {} LIMIT {} OFFSET {}",
                    self.table_name, self.chunk_size, offset
                )
            }
        };
        let rows = conn.query(&query).await?;
        // Convert to stream rows
        rows.into_iter()
            .map(|db_row| self.convert_row(db_row))
            .collect()
    }

    fn convert_row(&self, db_row: DatabaseRow) -> Result<Row> {
        // Convert database row to stream row using schema
        // Map data types appropriately
        todo!("Implement row conversion")
    }
}
#[async_trait]
impl DatabaseReader for SnapshotReader {
    async fn read(&mut self) -> Result<Vec<Row>> {
        if self.current_chunk >= self.total_chunks {
            return Ok(vec![]); // EOF
        }
        // Read current chunk
        let rows = self.read_chunk(self.current_chunk).await?;
        self.current_chunk += 1;
        Ok(rows)
    }

    async fn checkpoint(&self) -> Result<CheckpointMetadata> {
        Ok(CheckpointMetadata {
            read_mode: "snapshot".to_string(),
            position: format!("chunk_{}", self.current_chunk),
            timestamp: SystemTime::now(),
        })
    }

    async fn resume(&mut self, checkpoint: CheckpointMetadata) -> Result<()> {
        // Parse checkpoint position
        let chunk_str = checkpoint.position.strip_prefix("chunk_")
            .ok_or(Error::InvalidCheckpoint)?;
        self.current_chunk = chunk_str.parse()?;
        Ok(())
    }

    fn schema(&self) -> &Schema {
        &self.schema
    }

    async fn close(&mut self) -> Result<()> {
        // Cleanup if needed
        Ok(())
    }
}
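
Note that the sequential `read()` above does not yet exercise the `parallelism` field. One way to fan chunks out, sketched here with `futures::StreamExt` (not part of the spec), is to keep up to `parallelism` chunk queries in flight while preserving chunk order so checkpoint offsets stay monotonic:

use futures::stream::{self, StreamExt};

/// Read all remaining chunks with up to `parallelism` concurrent queries.
/// `buffered` preserves chunk order in the output.
async fn read_all_chunks(reader: &SnapshotReader) -> Result<Vec<Vec<Row>>> {
    stream::iter(reader.current_chunk..reader.total_chunks)
        .map(|chunk_id| reader.read_chunk(chunk_id))
        .buffered(reader.parallelism) // at most `parallelism` in flight
        .collect::<Vec<Result<Vec<Row>>>>()
        .await
        .into_iter()
        .collect()
}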

Incremental Reader

pub struct IncrementalReader {
    connection_pool: Arc<ConnectionPool>,
    table_name: String,
    schema: Arc<Schema>,
    tracking_column: String,
    poll_interval: Duration,
    last_value: Option<Value>,
}

impl IncrementalReader {
    fn convert_row(&self, db_row: DatabaseRow) -> Result<Row> {
        // Schema-driven conversion, same contract as SnapshotReader::convert_row
        todo!("Implement row conversion")
    }
}

#[async_trait]
impl DatabaseReader for IncrementalReader {
    async fn read(&mut self) -> Result<Vec<Row>> {
        let conn = self.connection_pool.get().await?;
        // NOTE: values are interpolated directly for brevity; production code
        // should use bind parameters.
        let query = if let Some(last_value) = &self.last_value {
            format!(
                "SELECT * FROM {} WHERE {} > {} ORDER BY {} LIMIT 1000",
                self.table_name, self.tracking_column, last_value, self.tracking_column
            )
        } else {
            format!(
                "SELECT * FROM {} ORDER BY {} LIMIT 1000",
                self.table_name, self.tracking_column
            )
        };
        let rows = conn.query(&query).await?;
        if let Some(last_row) = rows.last() {
            self.last_value = Some(last_row.get(&self.tracking_column)?);
        }
        if rows.is_empty() {
            // Nothing new: wait for the next poll. Sleeping only on empty
            // results avoids delaying batches we have already fetched.
            tokio::time::sleep(self.poll_interval).await;
        }
        // Convert rows
        rows.into_iter()
            .map(|db_row| self.convert_row(db_row))
            .collect()
    }

    async fn checkpoint(&self) -> Result<CheckpointMetadata> {
        // Serialize the last value as JSON so `resume` can parse it back
        // (the Debug format is not round-trippable).
        let position = match &self.last_value {
            Some(v) => format!("value_{}", serde_json::to_string(v)?),
            None => "value_null".to_string(),
        };
        Ok(CheckpointMetadata {
            read_mode: "incremental".to_string(),
            position,
            timestamp: SystemTime::now(),
        })
    }

    async fn resume(&mut self, checkpoint: CheckpointMetadata) -> Result<()> {
        // Parse last value from checkpoint
        let value_str = checkpoint.position.strip_prefix("value_")
            .ok_or(Error::InvalidCheckpoint)?;
        self.last_value = match value_str {
            "null" => None,
            s => Some(serde_json::from_str(s)?),
        };
        Ok(())
    }

    fn schema(&self) -> &Schema {
        &self.schema
    }

    async fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
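
Incremental mode assumes the tracking column is monotonically non-decreasing and, for acceptable poll cost, indexed. Rows that commit out of order relative to the tracking column can be missed by the strict `>` comparison, which is one reason CDC mode is preferred when completeness matters.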

CDC Reader (PostgreSQL WAL)

pub struct PostgresCdcReader {
    connection_pool: Arc<ConnectionPool>,
    replication_slot: String,
    table_name: String,
    schema: Arc<Schema>,
    current_lsn: Option<Lsn>,
}

impl PostgresCdcReader {
    pub async fn new(
        connection_pool: Arc<ConnectionPool>,
        table_name: String,
        schema: Arc<Schema>,
        start_lsn: Option<Lsn>,
    ) -> Result<Self> {
        // Create the replication slot. `test_decoding` is used because the
        // parser below consumes the textual change format; the binary
        // `pgoutput` plugin requires the streaming replication protocol and a
        // publication, which this SQL-level reader does not use.
        // (Creation fails if the slot already exists; a production
        // implementation would treat that as "resume".)
        let replication_slot = format!("heliosdb_slot_{}", table_name);
        let conn = connection_pool.get().await?;
        conn.execute(&format!(
            "SELECT pg_create_logical_replication_slot('{}', 'test_decoding')",
            replication_slot
        )).await?;
        Ok(Self {
            connection_pool,
            replication_slot,
            table_name,
            schema,
            current_lsn: start_lsn,
        })
    }

    fn parse_wal_message(&self, message: &str) -> Result<Option<Row>> {
        // Parse the textual logical decoding output
        // Format: table_name INSERT/UPDATE/DELETE (column1, column2, ...)
        // Example: users INSERT (id: 1, name: 'Alice', email: 'alice@example.com')
        if !message.contains(&self.table_name) {
            return Ok(None); // Not our table
        }
        // Parse operation type
        let operation = if message.contains("INSERT") {
            ChangeOperation::Insert
        } else if message.contains("UPDATE") {
            ChangeOperation::Update
        } else if message.contains("DELETE") {
            ChangeOperation::Delete
        } else {
            return Ok(None);
        };
        // Parse column values
        // (simplified; actual parsing would be more complex)
        let values = self.parse_column_values(message)?;
        let mut row = Row::new(values);
        row.set_metadata("operation", Value::String(operation.to_string()));
        Ok(Some(row))
    }
}

#[async_trait]
impl DatabaseReader for PostgresCdcReader {
    async fn read(&mut self) -> Result<Vec<Row>> {
        let conn = self.connection_pool.get().await?;
        // Consume pending changes. The slot itself tracks the confirmed
        // position, so each call resumes where the previous one left off;
        // the NULL arguments mean "no upper LSN bound, no row limit".
        let query = format!(
            "SELECT lsn, data FROM pg_logical_slot_get_changes('{}', NULL, NULL)",
            self.replication_slot
        );
        let changes = conn.query(&query).await?;
        // Parse WAL changes
        let mut rows = Vec::new();
        for change in changes {
            let lsn: Lsn = change.get("lsn")?;
            let data: String = change.get("data")?;
            // Parse logical decoding message
            if let Some(row) = self.parse_wal_message(&data)? {
                rows.push(row);
            }
            self.current_lsn = Some(lsn);
        }
        Ok(rows)
    }

    async fn checkpoint(&self) -> Result<CheckpointMetadata> {
        // Store the LSN in its display form (e.g., "0/16B3748") so `resume`
        // can parse it back; the Debug format of an Option is not parseable.
        let position = match &self.current_lsn {
            Some(lsn) => format!("lsn_{}", lsn),
            None => "lsn_".to_string(),
        };
        Ok(CheckpointMetadata {
            read_mode: "cdc".to_string(),
            position,
            timestamp: SystemTime::now(),
        })
    }

    async fn resume(&mut self, checkpoint: CheckpointMetadata) -> Result<()> {
        let lsn_str = checkpoint.position.strip_prefix("lsn_")
            .ok_or(Error::InvalidCheckpoint)?;
        let lsn: Lsn = lsn_str.parse()?;
        // Fast-forward the slot past changes processed before the checkpoint
        // (pg_replication_slot_advance is available in PostgreSQL 11+).
        let conn = self.connection_pool.get().await?;
        conn.execute(&format!(
            "SELECT pg_replication_slot_advance('{}', '{}')",
            self.replication_slot, lsn
        )).await?;
        self.current_lsn = Some(lsn);
        Ok(())
    }

    fn schema(&self) -> &Schema {
        &self.schema
    }

    async fn close(&mut self) -> Result<()> {
        // Drop the replication slot so it stops retaining WAL
        let conn = self.connection_pool.get().await?;
        conn.execute(&format!(
            "SELECT pg_drop_replication_slot('{}')",
            self.replication_slot
        )).await?;
        Ok(())
    }
}
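
Two operational caveats for CDC mode: PostgreSQL logical replication requires `wal_level = logical` on the source server, and a replication slot retains WAL until its changes are consumed, so abandoned slots must be dropped (as `close()` does above) to avoid unbounded disk growth on the source database.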

🧪 Testing Strategy

Unit Tests (10 tests)

  1. Configuration validation (see the example test after this list)

    • Valid configurations
    • Invalid configurations (missing fields, wrong types)
  2. Connection pool

    • Pool creation
    • Connection acquisition
    • Connection release
    • Pool exhaustion
  3. Schema detection

    • PostgreSQL schema
    • MySQL schema
    • Type mapping
  4. Row conversion

    • Database row → Stream row
    • Type conversions
    • Null handling
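
As a sketch of the first group, a config-validation test might look like the following. It assumes `Result`'s error type is `DatabaseSourceError` and uses a hypothetical `sample_config()` test fixture:

#[tokio::test]
async fn rejects_empty_table_name() {
    // Hypothetical invalid config: table_name is required.
    let mut config = sample_config(); // test fixture, assumed to exist
    config.table_name = String::new();
    let result = DatabaseSourceConnector::new(config).await;
    assert!(matches!(result, Err(DatabaseSourceError::InvalidConfig(_))));
}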

Integration Tests (8 tests)

  1. Snapshot read

    • Full table scan
    • Chunked reading
    • Parallel reading
    • Resume from checkpoint
  2. Incremental read

    • First read (no last value)
    • Subsequent reads
    • Resume from checkpoint
    • Empty result handling
  3. CDC read

    • INSERT operations
    • UPDATE operations
    • DELETE operations
    • Resume from LSN
  4. Error handling

    • Connection failures
    • Invalid checkpoint
    • Schema changes
    • Retry logic

Performance Tests (2 tests)

  1. Throughput test

    • Measure rows/second for 1M row table
    • Target: >10K rows/sec
  2. Latency test

    • Measure end-to-end latency (CDC mode)
    • Target: <100ms

Metrics

Runtime Metrics

pub struct DatabaseSourceMetrics {
    /// Total rows read
    pub rows_read: Counter,
    /// Read throughput (rows/sec)
    pub read_throughput: Gauge,
    /// Read latency (milliseconds)
    pub read_latency: Histogram,
    /// Active connections
    pub active_connections: Gauge,
    /// Checkpoint count
    pub checkpoints: Counter,
    /// Errors
    pub errors: Counter,
}
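
How the read path feeds these metrics is sketched below; the `inc_by`/`inc`/`observe` method names follow the style of the prometheus crate and are assumptions here:

// Hypothetical instrumentation wrapper around a reader's read() call.
async fn instrumented_read(
    reader: &mut dyn DatabaseReader,
    metrics: &DatabaseSourceMetrics,
) -> Result<Vec<Row>> {
    let start = std::time::Instant::now();
    match reader.read().await {
        Ok(rows) => {
            metrics.rows_read.inc_by(rows.len() as u64);
            // read_latency is tracked in milliseconds per the struct docs
            metrics.read_latency.observe(start.elapsed().as_millis() as f64);
            Ok(rows)
        }
        Err(e) => {
            metrics.errors.inc();
            Err(e)
        }
    }
}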

Prometheus Metrics

  • heliosdb_database_source_rows_read_total
  • heliosdb_database_source_read_throughput
  • heliosdb_database_source_read_latency_seconds
  • heliosdb_database_source_active_connections
  • heliosdb_database_source_checkpoints_total
  • heliosdb_database_source_errors_total

🚨 Error Handling

Error Types

#[derive(Debug, thiserror::Error)]
pub enum DatabaseSourceError {
    #[error("Invalid configuration: {0}")]
    InvalidConfig(String),
    #[error("Connection failed: {0}")]
    ConnectionFailed(String),
    #[error("Schema detection failed: {0}")]
    SchemaDetectionFailed(String),
    #[error("Read failed: {0}")]
    ReadFailed(String),
    #[error("Checkpoint invalid: {0}")]
    InvalidCheckpoint(String),
    #[error("Type conversion failed: {0}")]
    TypeConversionFailed(String),
}

Retry Strategy

  • Connection failures: Exponential backoff (1s, 2s, 4s, 8s, max 60s)
  • Read failures: 3 retries with 5s delay
  • Schema changes: Fail fast, require manual intervention
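
A minimal sketch of the connection-failure policy above (1s doubling to a 60s cap); the generic `retry_with_backoff` helper is illustrative, not an existing API:

use std::time::Duration;

/// Retry an async operation with exponential backoff: 1s, 2s, 4s, 8s, ...,
/// capped at 60s, until it succeeds or `max_attempts` is exhausted.
async fn retry_with_backoff<T, F, Fut>(max_attempts: u32, mut op: F) -> Result<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T>>,
{
    let mut delay = Duration::from_secs(1);
    let mut last_err = None;
    for _ in 0..max_attempts {
        match op().await {
            Ok(value) => return Ok(value),
            Err(e) => {
                last_err = Some(e);
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(Duration::from_secs(60));
            }
        }
    }
    Err(last_err.expect("max_attempts must be > 0"))
}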

Documentation

User Guide Topics

  1. Getting Started

    • Installation
    • Configuration
  2. Read Modes

    • Snapshot mode
    • Incremental mode
    • CDC mode
  3. Configuration Reference

    • All configuration options
    • Examples
  4. Advanced Topics

    • Parallel reading
    • Checkpoint management
    • Performance tuning
  5. Troubleshooting

    • Common errors
    • Debug tips

Acceptance Criteria

  • All 18 tests passing (unit + integration)
  • Code coverage >90%
  • Throughput >10K rows/sec (benchmark)
  • Latency <100ms (CDC mode)
  • Documentation complete
  • Code reviewed and approved
  • Security reviewed (if applicable)

Document Version: 1.0
Last Updated: October 29, 2025
Status: APPROVED
Implementation: Week 1 (Nov 4-10, 2025)