Database Source Connector Specification
Feature: F1.3 Flink Streaming - Database Source Connector
Version: 1.0
Date: October 29, 2025
Status: APPROVED
Priority: P1 CRITICAL
Implementation Timeline: Week 1 (Nov 4-10, 2025)
Executive Summary
The Database Source Connector enables HeliosDB Flink Streaming to read data from relational databases (PostgreSQL, MySQL, Oracle) in a resumable, fault-tolerant manner. This connector supports three read modes: snapshot (full table scan), incremental (poll for changes), and CDC (change data capture using WAL/binlog).
Key Features
- Resumable reads with checkpoint coordination
- Change data capture (CDC) using PostgreSQL WAL, MySQL binlog
- Parallel reading with configurable parallelism
- Schema detection and automatic type mapping
- Connection pooling for efficient resource usage
- Backpressure support for downstream flow control
Requirements
Functional Requirements
| ID | Requirement | Priority |
|---|---|---|
| FR-1 | Support PostgreSQL, MySQL, Oracle source databases | P0 |
| FR-2 | Support snapshot read (full table scan) | P0 |
| FR-3 | Support incremental read (polling for new/updated rows) | P1 |
| FR-4 | Support CDC (change data capture) for real-time streaming | P1 |
| FR-5 | Resumable reads from last checkpoint | P0 |
| FR-6 | Parallel reading with configurable parallelism | P1 |
| FR-7 | Schema detection and validation | P0 |
| FR-8 | Data type mapping (database → stream) | P0 |
| FR-9 | Connection pooling and management | P1 |
| FR-10 | Backpressure integration | P0 |
| FR-11 | Error handling and retry logic | P0 |
| FR-12 | Monitoring and metrics | P1 |
Non-Functional Requirements
| ID | Requirement | Target |
|---|---|---|
| NFR-1 | Read throughput | >10K rows/sec |
| NFR-2 | Latency (CDC mode) | <100ms |
| NFR-3 | Memory usage | <100MB per source |
| NFR-4 | Connection pool size | 1-100 connections |
| NFR-5 | Checkpoint overhead | <5% |
| NFR-6 | Test coverage | >90% |
Architecture
High-Level Design
```
┌─────────────────────────────────────────────────────────┐
│                 DatabaseSourceConnector                 │
├─────────────────────────────────────────────────────────┤
│  ┌────────────┐  ┌────────────┐  ┌────────────┐         │
│  │   Config   │  │ Connection │  │ Checkpoint │         │
│  │   Parser   │  │    Pool    │  │  Manager   │         │
│  └────────────┘  └────────────┘  └────────────┘         │
│                                                         │
│  ┌─────────────────────────────────────────────────┐    │
│  │               Read Mode Selector                │    │
│  ├─────────────────────────────────────────────────┤    │
│  │ ┌──────────┐  ┌─────────────┐  ┌──────────────┐ │    │
│  │ │ Snapshot │  │ Incremental │  │     CDC      │ │    │
│  │ │  Reader  │  │   Reader    │  │ (WAL/Binlog) │ │    │
│  │ └──────────┘  └─────────────┘  └──────────────┘ │    │
│  └─────────────────────────────────────────────────┘    │
│                                                         │
│  ┌────────────┐  ┌─────────────┐  ┌────────────┐        │
│  │   Schema   │  │ Type Mapper │  │  Metrics   │        │
│  │  Detector  │  │             │  │ Collector  │        │
│  └────────────┘  └─────────────┘  └────────────┘        │
└─────────────────────────────────────────────────────────┘
                            │
                            ▼
                 ┌─────────────────────┐
                 │   Stream Pipeline   │
                 │  (Downstream Ops)   │
                 └─────────────────────┘
```
Read Modes
1. Snapshot Mode (Full Table Scan)
- Use Case: Initial bulk load, historical data ingestion
- Mechanism:
  - Execute `SELECT * FROM table` with chunking
  - Divide the table into N chunks (by primary key ranges; see the range sketch after this list)
  - Read chunks in parallel
  - Track progress with checkpoint offsets
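Chunk boundaries can be derived from the table's actual key span rather than from raw chunk indices. A minimal sketch, assuming a numeric primary key whose `min` and `max` were fetched beforehand (e.g., via `SELECT MIN(pk), MAX(pk) FROM table`):

```rust
// Sketch: derive half-open primary-key ranges [start, end) that
// match the reader's `pk >= start AND pk < end` predicate.
fn chunk_ranges(min: i64, max: i64, chunk_size: i64) -> Vec<(i64, i64)> {
    let mut ranges = Vec::new();
    let mut start = min;
    while start <= max {
        let end = start.saturating_add(chunk_size);
        ranges.push((start, end));
        start = end;
    }
    ranges
}

fn main() {
    // A table with keys 100..=1050 split into chunks of 250 keys each.
    for (start, end) in chunk_ranges(100, 1050, 250) {
        println!("WHERE pk >= {start} AND pk < {end}");
    }
}
```

This tolerates sparse or non-zero-based keys, unlike index-based math.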
2. Incremental Mode (Polling)
- Use Case: Periodic updates, append-only data
- Mechanism:
  - Execute `SELECT * FROM table WHERE modified_at > ?`
  - Track the last read timestamp/ID
  - Poll at a configurable interval (1s - 1h)
  - Resume from the last checkpoint
3. CDC Mode (Change Data Capture)
- Use Case: Real-time streaming, sub-second latency
- Mechanism:
  - PostgreSQL: Read from the WAL using logical replication
  - MySQL: Read from the binlog
  - Oracle: Use Oracle GoldenGate or LogMiner
  - Stream INSERT, UPDATE, DELETE operations
  - Resume from the LSN (Log Sequence Number)
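For concreteness, the three modes map onto the `ReadMode` enum specified under API Design below. A hedged sketch of selecting each one (values are illustrative; the `CdcProvider::Wal` variant name is an assumption, since this document only lists the providers informally):

```rust
use std::time::Duration;

// Illustrative only: ReadMode and ChunkStrategy are specified under
// "API Design" below; CdcProvider::Wal is an assumed variant name.
fn example_modes() -> (ReadMode, ReadMode, ReadMode) {
    let snapshot = ReadMode::Snapshot {
        chunk_strategy: ChunkStrategy::ByPrimaryKey { column: "id".into() },
        chunk_size: 10_000,
    };
    let incremental = ReadMode::Incremental {
        tracking_column: "modified_at".into(),
        poll_interval: Duration::from_secs(30),
    };
    let cdc = ReadMode::Cdc {
        provider: CdcProvider::Wal,
        start_position: None, // begin from the slot's current position
    };
    (snapshot, incremental, cdc)
}
```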
API Design
Configuration
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseSourceConfig {
    /// Database connection string
    /// Format: postgresql://user:password@host:port/database
    pub connection_string: String,

    /// Source table name
    pub table_name: String,

    /// Read mode: snapshot, incremental, cdc
    pub read_mode: ReadMode,

    /// Parallelism (number of concurrent readers)
    #[serde(default = "default_parallelism")]
    pub parallelism: usize,

    /// Connection pool configuration
    pub connection_pool: ConnectionPoolConfig,

    /// Checkpoint configuration
    pub checkpoint: CheckpointConfig,

    /// Schema configuration
    pub schema: Option<SchemaConfig>,

    /// Incremental mode settings
    pub incremental: Option<IncrementalConfig>,

    /// CDC mode settings
    pub cdc: Option<CdcConfig>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReadMode {
    Snapshot {
        /// Chunking strategy
        chunk_strategy: ChunkStrategy,
        /// Chunk size (rows per chunk)
        chunk_size: usize,
    },
    Incremental {
        /// Column to track (e.g., modified_at, id)
        tracking_column: String,
        /// Poll interval
        poll_interval: Duration,
    },
    Cdc {
        /// CDC provider (wal, binlog, goldengate)
        provider: CdcProvider,
        /// Start position (LSN, position, SCN)
        start_position: Option<String>,
    },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionPoolConfig {
    /// Minimum connections
    pub min_connections: usize,
    /// Maximum connections
    pub max_connections: usize,
    /// Connection timeout (seconds)
    pub connect_timeout: u64,
    /// Idle timeout (seconds)
    pub idle_timeout: u64,
    /// Max lifetime (seconds)
    pub max_lifetime: u64,
}

fn default_parallelism() -> usize {
    4
}
```
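Putting the pieces together, an incremental-mode configuration might look like the following. This is a sketch: the connection string is a placeholder, and `CheckpointConfig` is not specified in this document, so its construction is left to the caller.

```rust
use std::time::Duration;

// Sketch only: builds on the structs above. CheckpointConfig's shape
// is unspecified here, so it is passed in rather than constructed.
fn example_config(checkpoint: CheckpointConfig) -> DatabaseSourceConfig {
    DatabaseSourceConfig {
        connection_string: "postgresql://user:password@localhost:5432/appdb".into(),
        table_name: "orders".into(),
        read_mode: ReadMode::Incremental {
            tracking_column: "modified_at".into(),
            poll_interval: Duration::from_secs(30),
        },
        parallelism: default_parallelism(),
        connection_pool: ConnectionPoolConfig {
            min_connections: 1,
            max_connections: 10,
            connect_timeout: 30,
            idle_timeout: 600,
            max_lifetime: 1800,
        },
        checkpoint,
        schema: None,
        incremental: None,
        cdc: None,
    }
}
```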
Connector Interface
```rust
pub struct DatabaseSourceConnector {
    config: DatabaseSourceConfig,
    connection_pool: Arc<ConnectionPool>,
    checkpoint_coordinator: Arc<CheckpointCoordinator>,
    schema: Arc<Schema>,
    metrics: Arc<MetricsCollector>,
    reader: Box<dyn DatabaseReader>,
}

impl DatabaseSourceConnector {
    /// Create new database source connector
    pub async fn new(config: DatabaseSourceConfig) -> Result<Self> {
        // 1. Validate configuration
        config.validate()?;

        // 2. Create connection pool
        let connection_pool = ConnectionPool::new(config.connection_pool.clone()).await?;

        // 3. Detect schema
        let schema = SchemaDetector::detect(&connection_pool, &config.table_name).await?;

        // 4. Create reader based on mode
        let reader = Self::create_reader(&config, &connection_pool, &schema).await?;

        // 5. Initialize checkpoint coordinator
        let checkpoint_coordinator = CheckpointCoordinator::new(config.checkpoint.clone());

        // 6. Initialize metrics
        let metrics = MetricsCollector::new("database_source");

        Ok(Self {
            config,
            connection_pool,
            checkpoint_coordinator,
            schema,
            metrics,
            reader,
        })
    }

    /// Start reading from database
    pub async fn read(&mut self) -> Result<Vec<Row>> {
        self.reader.read().await
    }

    /// Checkpoint current progress
    pub async fn checkpoint(&self) -> Result<CheckpointMetadata> {
        self.reader.checkpoint().await
    }

    /// Resume from checkpoint
    pub async fn resume_from_checkpoint(&mut self, checkpoint: CheckpointMetadata) -> Result<()> {
        self.reader.resume(checkpoint).await
    }

    /// Get current metrics
    pub fn metrics(&self) -> Metrics {
        self.metrics.snapshot()
    }
}
```
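A typical consumption loop against this interface, sketched below. Error handling and the downstream pipeline are elided; `process` and `persist_checkpoint` are hypothetical, and the checkpoint cadence is illustrative.

```rust
// Sketch: drive the connector until the source is exhausted,
// checkpointing every 10 batches. Assumes a tokio runtime.
async fn run(config: DatabaseSourceConfig) -> Result<()> {
    let mut connector = DatabaseSourceConnector::new(config).await?;
    let mut batches = 0usize;

    loop {
        let rows = connector.read().await?;
        if rows.is_empty() {
            break; // snapshot mode signals EOF with an empty batch
        }

        // Hand rows to the downstream pipeline (hypothetical).
        process(rows).await?;

        batches += 1;
        if batches % 10 == 0 {
            let meta = connector.checkpoint().await?;
            persist_checkpoint(meta).await?; // hypothetical durable store
        }
    }
    Ok(())
}
```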
Reader Trait
```rust
#[async_trait]
pub trait DatabaseReader: Send + Sync {
    /// Read next batch of rows
    async fn read(&mut self) -> Result<Vec<Row>>;

    /// Create checkpoint
    async fn checkpoint(&self) -> Result<CheckpointMetadata>;

    /// Resume from checkpoint
    async fn resume(&mut self, checkpoint: CheckpointMetadata) -> Result<()>;

    /// Get schema
    fn schema(&self) -> &Schema;

    /// Close reader
    async fn close(&mut self) -> Result<()>;
}
```
Snapshot Reader
```rust
pub struct SnapshotReader {
    connection_pool: Arc<ConnectionPool>,
    table_name: String,
    schema: Arc<Schema>,
    chunk_strategy: ChunkStrategy,
    chunk_size: usize,
    current_chunk: usize,
    total_chunks: usize,
    parallelism: usize,
}

impl SnapshotReader {
    pub async fn new(
        connection_pool: Arc<ConnectionPool>,
        table_name: String,
        schema: Arc<Schema>,
        chunk_strategy: ChunkStrategy,
        chunk_size: usize,
        parallelism: usize,
    ) -> Result<Self> {
        // Calculate total chunks (ceiling division)
        let total_rows = Self::count_rows(&connection_pool, &table_name).await?;
        let total_chunks = (total_rows + chunk_size - 1) / chunk_size;

        Ok(Self {
            connection_pool,
            table_name,
            schema,
            chunk_strategy,
            chunk_size,
            current_chunk: 0,
            total_chunks,
            parallelism,
        })
    }

    async fn read_chunk(&self, chunk_id: usize) -> Result<Vec<Row>> {
        let conn = self.connection_pool.get().await?;

        // NOTE: simplified boundary math; this assumes primary keys are
        // dense and start at 0. A production version would derive ranges
        // from MIN/MAX of the key column (see the sketch under Read Modes).
        let (start, end) = match &self.chunk_strategy {
            ChunkStrategy::ByPrimaryKey { .. } => {
                let start_id = chunk_id * self.chunk_size;
                let end_id = (chunk_id + 1) * self.chunk_size;
                (start_id, end_id)
            }
            ChunkStrategy::ByOffset => {
                let offset = chunk_id * self.chunk_size;
                (offset, offset + self.chunk_size)
            }
        };

        // Execute query. Table and column names come from validated
        // configuration, never from user input.
        let query = match &self.chunk_strategy {
            ChunkStrategy::ByPrimaryKey { column } => {
                format!(
                    "SELECT * FROM {} WHERE {} >= {} AND {} < {} ORDER BY {}",
                    self.table_name, column, start, column, end, column
                )
            }
            ChunkStrategy::ByOffset => {
                format!(
                    "SELECT * FROM {} LIMIT {} OFFSET {}",
                    self.table_name, self.chunk_size, start
                )
            }
        };

        let rows = conn.query(&query).await?;

        // Convert to stream rows
        rows.into_iter()
            .map(|db_row| self.convert_row(db_row))
            .collect()
    }

    fn convert_row(&self, db_row: DatabaseRow) -> Result<Row> {
        // Convert database row to stream row using schema
        // Map data types appropriately
        todo!("Implement row conversion")
    }
}

#[async_trait]
impl DatabaseReader for SnapshotReader {
    async fn read(&mut self) -> Result<Vec<Row>> {
        if self.current_chunk >= self.total_chunks {
            return Ok(vec![]); // EOF
        }

        // Read current chunk
        let rows = self.read_chunk(self.current_chunk).await?;
        self.current_chunk += 1;

        Ok(rows)
    }

    async fn checkpoint(&self) -> Result<CheckpointMetadata> {
        Ok(CheckpointMetadata {
            read_mode: "snapshot".to_string(),
            position: format!("chunk_{}", self.current_chunk),
            timestamp: SystemTime::now(),
        })
    }

    async fn resume(&mut self, checkpoint: CheckpointMetadata) -> Result<()> {
        // Parse checkpoint position
        let chunk_str = checkpoint.position.strip_prefix("chunk_")
            .ok_or(Error::InvalidCheckpoint)?;
        self.current_chunk = chunk_str.parse()?;
        Ok(())
    }

    fn schema(&self) -> &Schema {
        &self.schema
    }

    async fn close(&mut self) -> Result<()> {
        // Cleanup if needed
        Ok(())
    }
}
```
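The `convert_row` stub above is where type mapping happens. A minimal sketch, assuming `Schema` exposes an ordered `fields()` list, `DatabaseRow` exposes positional access via `value_at`, and a `TypeMapper` helper performs the per-type conversion; none of these shapes are settled API:

```rust
impl SnapshotReader {
    // Sketch under the assumptions stated above: walk the detected
    // schema in order, pull each raw value, and map it to a stream Value.
    fn convert_row(&self, db_row: DatabaseRow) -> Result<Row> {
        let mut values = Vec::with_capacity(self.schema.fields().len());

        for (i, field) in self.schema.fields().iter().enumerate() {
            let raw = db_row.value_at(i)?;

            // Nulls pass through; everything else goes via the mapper.
            let value = match raw {
                None => Value::Null,
                Some(v) => TypeMapper::map(v, field.data_type())
                    .map_err(|e| DatabaseSourceError::TypeConversionFailed(e.to_string()))?,
            };
            values.push(value);
        }

        Ok(Row::new(values))
    }
}
```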
Incremental Reader
```rust
pub struct IncrementalReader {
    connection_pool: Arc<ConnectionPool>,
    table_name: String,
    schema: Arc<Schema>,
    tracking_column: String,
    poll_interval: Duration,
    last_value: Option<Value>,
}

#[async_trait]
impl DatabaseReader for IncrementalReader {
    async fn read(&mut self) -> Result<Vec<Row>> {
        let conn = self.connection_pool.get().await?;

        // NOTE: simplified; non-numeric tracking values (timestamps,
        // strings) need proper quoting or parameter binding.
        let query = if let Some(last_value) = &self.last_value {
            format!(
                "SELECT * FROM {} WHERE {} > {} ORDER BY {} LIMIT 1000",
                self.table_name, self.tracking_column, last_value, self.tracking_column
            )
        } else {
            format!(
                "SELECT * FROM {} ORDER BY {} LIMIT 1000",
                self.table_name, self.tracking_column
            )
        };

        let rows = conn.query(&query).await?;

        if let Some(last_row) = rows.last() {
            self.last_value = Some(last_row.get(&self.tracking_column)?);
        }

        // Convert rows (same schema-driven conversion as SnapshotReader)
        let stream_rows = rows.into_iter()
            .map(|db_row| self.convert_row(db_row))
            .collect::<Result<Vec<_>>>()?;

        // Back off only when there is nothing new, so catch-up reads
        // are not throttled by the poll interval.
        if stream_rows.is_empty() {
            tokio::time::sleep(self.poll_interval).await;
        }

        Ok(stream_rows)
    }

    async fn checkpoint(&self) -> Result<CheckpointMetadata> {
        // Serialize the tracked value so resume() can round-trip it.
        let encoded = serde_json::to_string(&self.last_value)?;
        Ok(CheckpointMetadata {
            read_mode: "incremental".to_string(),
            position: format!("value_{}", encoded),
            timestamp: SystemTime::now(),
        })
    }

    async fn resume(&mut self, checkpoint: CheckpointMetadata) -> Result<()> {
        // Parse last value from checkpoint
        let value_str = checkpoint.position.strip_prefix("value_")
            .ok_or(Error::InvalidCheckpoint)?;
        self.last_value = serde_json::from_str(value_str)?;
        Ok(())
    }

    fn schema(&self) -> &Schema {
        &self.schema
    }

    async fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
```
CDC Reader (PostgreSQL WAL)
```rust
pub struct PostgresCdcReader {
    connection_pool: Arc<ConnectionPool>,
    replication_slot: String,
    table_name: String,
    schema: Arc<Schema>,
    current_lsn: Option<Lsn>,
}

impl PostgresCdcReader {
    pub async fn new(
        connection_pool: Arc<ConnectionPool>,
        table_name: String,
        schema: Arc<Schema>,
        start_lsn: Option<Lsn>,
    ) -> Result<Self> {
        // Create replication slot. test_decoding emits textual output
        // that the SQL-level pg_logical_slot_*_changes() functions can
        // return; pgoutput is a binary plugin intended for the streaming
        // replication protocol, not the SQL interface. In practice the
        // slot should be created only if it does not already exist.
        let replication_slot = format!("heliosdb_slot_{}", table_name);

        let conn = connection_pool.get().await?;
        conn.execute(&format!(
            "SELECT pg_create_logical_replication_slot('{}', 'test_decoding')",
            replication_slot
        )).await?;

        Ok(Self {
            connection_pool,
            replication_slot,
            table_name,
            schema,
            current_lsn: start_lsn,
        })
    }

    fn parse_wal_message(&self, message: &str) -> Result<Option<Row>> {
        // Parse test_decoding's textual output (simplified). Example:
        //   table public.users: INSERT: id[integer]:1 name[text]:'Alice'

        if !message.contains(&self.table_name) {
            return Ok(None); // Not our table
        }

        // Parse operation type
        let operation = if message.contains("INSERT") {
            ChangeOperation::Insert
        } else if message.contains("UPDATE") {
            ChangeOperation::Update
        } else if message.contains("DELETE") {
            ChangeOperation::Delete
        } else {
            return Ok(None);
        };

        // Parse column values
        // (simplified, actual parsing would be more complex)
        let values = self.parse_column_values(message)?;

        let mut row = Row::new(values);
        row.set_metadata("operation", Value::String(operation.to_string()));

        Ok(Some(row))
    }
}

#[async_trait]
impl DatabaseReader for PostgresCdcReader {
    async fn read(&mut self) -> Result<Vec<Row>> {
        let conn = self.connection_pool.get().await?;

        // Consume changes. The slot itself tracks the consume position
        // (confirmed_flush_lsn): get_changes() advances it, while
        // peek_changes() does not. current_lsn is kept for checkpoints.
        let query = format!(
            "SELECT * FROM pg_logical_slot_get_changes('{}', NULL, NULL)",
            self.replication_slot
        );

        let changes = conn.query(&query).await?;

        // Parse WAL changes
        let mut rows = Vec::new();
        for change in changes {
            let lsn: Lsn = change.get("lsn")?;
            let data: String = change.get("data")?;

            // Parse logical replication message
            if let Some(row) = self.parse_wal_message(&data)? {
                rows.push(row);
            }

            self.current_lsn = Some(lsn);
        }

        Ok(rows)
    }

    async fn checkpoint(&self) -> Result<CheckpointMetadata> {
        let position = match &self.current_lsn {
            Some(lsn) => format!("lsn_{}", lsn),
            None => "lsn_none".to_string(),
        };
        Ok(CheckpointMetadata {
            read_mode: "cdc".to_string(),
            position,
            timestamp: SystemTime::now(),
        })
    }

    async fn resume(&mut self, checkpoint: CheckpointMetadata) -> Result<()> {
        // The slot's confirmed position normally survives restarts; the
        // checkpointed LSN is restored for metadata and deduplication.
        let lsn_str = checkpoint.position.strip_prefix("lsn_")
            .ok_or(Error::InvalidCheckpoint)?;
        self.current_lsn = if lsn_str == "none" {
            None
        } else {
            Some(lsn_str.parse()?)
        };
        Ok(())
    }

    fn schema(&self) -> &Schema {
        &self.schema
    }

    async fn close(&mut self) -> Result<()> {
        // Drop replication slot
        let conn = self.connection_pool.get().await?;
        conn.execute(&format!(
            "SELECT pg_drop_replication_slot('{}')",
            self.replication_slot
        )).await?;
        Ok(())
    }
}
```
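`parse_column_values` is referenced above but not specified. A minimal sketch against test_decoding's `name[type]:value` pairs, using the `regex` crate; a real parser must handle quoting, escapes, and NULLs, and the pattern here is an assumption, not a complete grammar:

```rust
use regex::Regex;

// Sketch only: tokenizes test_decoding's "name[type]:value" pairs.
// Illustrative, not production parsing.
fn parse_column_values(message: &str) -> Result<Vec<Value>> {
    // e.g. `table public.users: INSERT: id[integer]:1 name[text]:'Alice'`
    let re = Regex::new(r"(\w+)\[([\w ]+)\]:('[^']*'|\S+)")
        .expect("static pattern compiles");

    let mut values = Vec::new();
    for cap in re.captures_iter(message) {
        let raw = cap[3].trim_matches('\'');
        // Type-aware conversion would consult the detected schema; here
        // everything becomes a string value for illustration.
        values.push(Value::String(raw.to_string()));
    }
    Ok(values)
}
```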
Testing Strategy
Unit Tests (10 tests)
1. Configuration validation (sketched below)
   - Valid configurations
   - Invalid configurations (missing fields, wrong types)
2. Connection pool
   - Pool creation
   - Connection acquisition
   - Connection release
   - Pool exhaustion
3. Schema detection
   - PostgreSQL schema
   - MySQL schema
   - Type mapping
4. Row conversion
   - Database row → Stream row
   - Type conversions
   - Null handling
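For the configuration-validation group, tests might take the following shape. This is a sketch: `valid_test_config()` is a hypothetical fixture, and the exact error variant returned by `validate()` is an assumption.

```rust
#[cfg(test)]
mod tests {
    use super::*;

    // Sketch: asserts that an empty table name is rejected. The
    // validate() contract and error variant are assumptions.
    #[test]
    fn rejects_empty_table_name() {
        let mut config = valid_test_config(); // hypothetical fixture
        config.table_name = String::new();

        let err = config.validate().expect_err("empty table must fail");
        assert!(matches!(err, DatabaseSourceError::InvalidConfig(_)));
    }

    #[test]
    fn accepts_valid_config() {
        assert!(valid_test_config().validate().is_ok());
    }
}
```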
Integration Tests (8 tests)
1. Snapshot read
   - Full table scan
   - Chunked reading
   - Parallel reading
   - Resume from checkpoint
2. Incremental read
   - First read (no last value)
   - Subsequent reads
   - Resume from checkpoint
   - Empty result handling
3. CDC read
   - INSERT operations
   - UPDATE operations
   - DELETE operations
   - Resume from LSN
4. Error handling
   - Connection failures
   - Invalid checkpoint
   - Schema changes
   - Retry logic
Performance Tests (2 tests)
1. Throughput test (harness sketched below)
   - Measure rows/second for a 1M-row table
   - Target: >10K rows/sec
2. Latency test
   - Measure end-to-end latency (CDC mode)
   - Target: <100ms
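A sketch of the throughput harness, assuming a populated 1M-row test table behind `config`:

```rust
use std::time::Instant;

// Sketch: drain the connector and report rows/sec against NFR-1.
async fn measure_throughput(config: DatabaseSourceConfig) -> Result<f64> {
    let mut connector = DatabaseSourceConnector::new(config).await?;
    let mut total_rows = 0usize;
    let start = Instant::now();

    loop {
        let rows = connector.read().await?;
        if rows.is_empty() {
            break;
        }
        total_rows += rows.len();
    }

    let throughput = total_rows as f64 / start.elapsed().as_secs_f64();
    assert!(throughput > 10_000.0, "below NFR-1 target: {throughput:.0} rows/sec");
    Ok(throughput)
}
```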
Metrics
Runtime Metrics
```rust
pub struct DatabaseSourceMetrics {
    /// Total rows read
    pub rows_read: Counter,
    /// Read throughput (rows/sec)
    pub read_throughput: Gauge,
    /// Read latency (milliseconds)
    pub read_latency: Histogram,
    /// Active connections
    pub active_connections: Gauge,
    /// Checkpoint count
    pub checkpoints: Counter,
    /// Errors
    pub errors: Counter,
}
```
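How these counters are driven from the read path, sketched below; the `inc`/`inc_by`/`observe` method names follow common metrics-crate conventions and are assumptions here, not settled API.

```rust
use std::time::Instant;

// Sketch: instrument a single read() call.
async fn read_instrumented(
    reader: &mut dyn DatabaseReader,
    metrics: &DatabaseSourceMetrics,
) -> Result<Vec<Row>> {
    let start = Instant::now();
    match reader.read().await {
        Ok(rows) => {
            metrics.rows_read.inc_by(rows.len() as u64);
            metrics.read_latency.observe(start.elapsed().as_millis() as f64);
            Ok(rows)
        }
        Err(e) => {
            metrics.errors.inc();
            Err(e)
        }
    }
}
```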
Prometheus Metrics
- heliosdb_database_source_rows_read_total
- heliosdb_database_source_read_throughput
- heliosdb_database_source_read_latency_seconds
- heliosdb_database_source_active_connections
- heliosdb_database_source_checkpoints_total
- heliosdb_database_source_errors_total
Error Handling
Error Types
```rust
#[derive(Debug, thiserror::Error)]
pub enum DatabaseSourceError {
    #[error("Invalid configuration: {0}")]
    InvalidConfig(String),

    #[error("Connection failed: {0}")]
    ConnectionFailed(String),

    #[error("Schema detection failed: {0}")]
    SchemaDetectionFailed(String),

    #[error("Read failed: {0}")]
    ReadFailed(String),

    #[error("Checkpoint invalid: {0}")]
    InvalidCheckpoint(String),

    #[error("Type conversion failed: {0}")]
    TypeConversionFailed(String),
}
```
Retry Strategy
- Connection failures: Exponential backoff (1s, 2s, 4s, 8s, max 60s), as sketched after this list
- Read failures: 3 retries with 5s delay
- Schema changes: Fail fast, require manual intervention
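Both policies above are mechanical; a minimal sketch of each:

```rust
use std::time::Duration;

// Sketch of the connection retry policy: exponential backoff
// 1s, 2s, 4s, 8s, ... capped at 60s.
fn connect_backoff(attempt: u32) -> Duration {
    let secs = 1u64.checked_shl(attempt).unwrap_or(u64::MAX).min(60);
    Duration::from_secs(secs)
}

// Sketch of the read retry policy: 3 retries with a fixed 5s delay
// (so 4 attempts total before giving up).
async fn read_with_retry(reader: &mut dyn DatabaseReader) -> Result<Vec<Row>> {
    let mut last_err = None;
    for _ in 0..=3 {
        match reader.read().await {
            Ok(rows) => return Ok(rows),
            Err(e) => {
                last_err = Some(e);
                tokio::time::sleep(Duration::from_secs(5)).await;
            }
        }
    }
    Err(last_err.expect("at least one attempt ran"))
}
```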
Documentation
User Guide Topics
1. Getting Started
   - Installation
   - Configuration
2. Read Modes
   - Snapshot mode
   - Incremental mode
   - CDC mode
3. Configuration Reference
   - All configuration options
   - Examples
4. Advanced Topics
   - Parallel reading
   - Checkpoint management
   - Performance tuning
5. Troubleshooting
   - Common errors
   - Debug tips
Acceptance Criteria
- All 18 tests passing (10 unit + 8 integration)
- Code coverage >90%
- Throughput >10K rows/sec (benchmark)
- Latency <100ms (CDC mode)
- Documentation complete
- Code reviewed and approved
- Security reviewed (if applicable)
Document Version: 1.0
Last Updated: October 29, 2025
Status: APPROVED
Implementation: Week 1 (Nov 4-10, 2025)