HeliosDB FDW Architecture
This document describes the architecture and design of the HeliosDB Foreign Data Wrapper system.
Overview
The FDW system enables HeliosDB to query external data sources through a unified interface. It provides abstraction over heterogeneous data sources while optimizing query execution through predicate and projection pushdown.
Design Principles
- Unified Interface - Single API for all data sources
- Type Safety - Strong typing with Arrow as the common format
- Performance - Aggressive pushdown optimization
- Extensibility - Easy to add new FDW implementations
- Async First - All operations are asynchronous for scalability
Architecture Diagram
```text
┌─────────────────────────────────────────────────────────────┐
│                    HeliosDB Query Engine                    │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────────────────────┐
│                        FDW Registry                         │
│  - Factory management                                       │
│  - Server instance cache                                    │
│  - Connection validation                                    │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────────────────────┐
│                  ForeignDataWrapper Trait                   │
│  - scan()                                                   │
│  - get_schema()                                             │
│  - insert() / update() / delete()                           │
│  - supports_pushdown()                                      │
└──────────────────────┬──────────────────────────────────────┘
                       │
       ┌───────────────┼───────────────┬───────────────┐
       ▼               ▼               ▼               ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│   S3 FDW    │ │Postgres FDW │ │  MySQL FDW  │ │ MongoDB FDW │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
       │               │               │               │
       ▼               ▼               ▼               ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│   AWS S3    │ │ PostgreSQL  │ │    MySQL    │ │   MongoDB   │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
```
Core Components
1. ForeignDataWrapper Trait
The central abstraction that all FDW implementations must satisfy.
```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;

// Predicate, Value, PushdownCapabilities, QueryStats and FdwResult
// are defined elsewhere in this crate.
#[async_trait]
pub trait ForeignDataWrapper: Send + Sync {
    /// Reads `table`, returning the `projection` columns of rows matching `predicates`.
    async fn scan(&self, table: &str, projection: &[String], predicates: &[Predicate]) -> FdwResult<RecordBatch>;
    async fn get_schema(&self, table: &str) -> FdwResult<Arc<Schema>>;
    async fn insert(&self, table: &str, batch: RecordBatch) -> FdwResult<usize>;
    async fn update(&self, table: &str, predicates: &[Predicate], values: &[(String, Value)]) -> FdwResult<usize>;
    async fn delete(&self, table: &str, predicates: &[Predicate]) -> FdwResult<usize>;
    fn supports_pushdown(&self) -> PushdownCapabilities;
    async fn validate_connection(&self) -> FdwResult<()>;
    async fn get_stats(&self) -> Option<QueryStats>;
    async fn list_tables(&self) -> FdwResult<Vec<String>>;
}
```
Key Methods:
- scan() - Main query method; returns an Arrow RecordBatch
- get_schema() - Returns the table schema for planning
- insert() / update() / delete() - Write operations (optional; read-only sources can return FdwError::NotSupported)
- supports_pushdown() - Declares optimization capabilities
- validate_connection() - Health check
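To make the contract concrete, here is a std-only sketch that assumes a simplified, synchronous version of the trait. In it, rows of (column, value) string pairs stand in for Arrow RecordBatch, and InMemoryFdw, Row, and the reduced method set are hypothetical. The trait above declares insert without a body. This sketch instead gives it a default body that returns NotSupported, which is one possible way to make writes optional. An empty projection is assumed to mean "all columns".

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an Arrow row: (column, value) pairs.
type Row = Vec<(String, String)>;

#[derive(Debug, PartialEq)]
enum FdwError {
    NotSupported(String),
    Schema(String),
}

type FdwResult<T> = Result<T, FdwError>;

/// Reduced, synchronous version of the ForeignDataWrapper contract.
trait ForeignDataWrapper: Send + Sync {
    fn scan(&self, table: &str, projection: &[String]) -> FdwResult<Vec<Row>>;
    fn list_tables(&self) -> FdwResult<Vec<String>>;

    /// Default body: read-only sources inherit a NotSupported error.
    fn insert(&self, table: &str, _rows: Vec<Row>) -> FdwResult<usize> {
        Err(FdwError::NotSupported(format!("insert into {table}")))
    }
}

/// Hypothetical read-only FDW backed by an in-memory map.
struct InMemoryFdw {
    tables: HashMap<String, Vec<Row>>,
}

impl ForeignDataWrapper for InMemoryFdw {
    fn scan(&self, table: &str, projection: &[String]) -> FdwResult<Vec<Row>> {
        let rows = self
            .tables
            .get(table)
            .ok_or_else(|| FdwError::Schema(format!("unknown table {table}")))?;
        // Empty projection = all columns; otherwise keep only the requested ones.
        Ok(rows
            .iter()
            .map(|row| {
                row.iter()
                    .filter(|(col, _)| projection.is_empty() || projection.contains(col))
                    .cloned()
                    .collect::<Row>()
            })
            .collect())
    }

    fn list_tables(&self) -> FdwResult<Vec<String>> {
        let mut names: Vec<String> = self.tables.keys().cloned().collect();
        names.sort();
        Ok(names)
    }
}
```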
2. Registry System
Manages FDW factories and server instances.
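```rust
use std::collections::HashMap;
use std::sync::RwLock; // assumes std's lock

pub struct FdwRegistry {
    factories: RwLock<HashMap<String, FdwFactory>>, // keyed by FDW type, e.g. "postgres"
    servers: RwLock<HashMap<String, (ServerConfig, FdwInstance)>>, // keyed by server name
}
```
Responsibilities: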
- Register FDW factories by type
- Create and cache server instances
- Validate connections on creation
- Thread-safe access with RwLock
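The responsibilities above can be sketched with std types only. This is a simplified model and not the HeliosDB API. The Fdw trait, StaticFdw, and the closure-based FdwFactory are hypothetical, and errors are plain Strings. In this sketch, create_server releases the factory read lock before validating, so a slow health check does not block factory registration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Minimal instance interface (hypothetical; the real one is ForeignDataWrapper).
trait Fdw: Send + Sync {
    fn validate_connection(&self) -> Result<(), String>;
}

type FdwInstance = Arc<dyn Fdw>;
/// A factory turns server options into a ready-to-use instance.
type FdwFactory = Box<dyn Fn(&HashMap<String, String>) -> FdwInstance + Send + Sync>;

struct ServerConfig {
    name: String,
    fdw_type: String,
    options: HashMap<String, String>,
}

#[derive(Default)]
struct FdwRegistry {
    factories: RwLock<HashMap<String, FdwFactory>>,
    servers: RwLock<HashMap<String, (ServerConfig, FdwInstance)>>,
}

impl FdwRegistry {
    fn register_factory(&self, fdw_type: &str, factory: FdwFactory) {
        self.factories.write().unwrap().insert(fdw_type.to_string(), factory);
    }

    /// Builds an instance, validates it, and only then caches it by server name.
    fn create_server(&self, config: ServerConfig) -> Result<(), String> {
        let instance = {
            let factories = self.factories.read().unwrap();
            let factory = factories
                .get(&config.fdw_type)
                .ok_or_else(|| format!("no factory for type {}", config.fdw_type))?;
            factory(&config.options)
        }; // factory read lock is released here
        instance.validate_connection()?;
        let name = config.name.clone();
        self.servers.write().unwrap().insert(name, (config, instance));
        Ok(())
    }

    /// Returns a shared handle to a cached instance.
    fn get_server(&self, name: &str) -> Result<FdwInstance, String> {
        self.servers
            .read()
            .unwrap()
            .get(name)
            .map(|(_, fdw)| Arc::clone(fdw))
            .ok_or_else(|| format!("unknown server {name}"))
    }
}

/// Hypothetical FDW whose health is fixed when it is built.
struct StaticFdw {
    healthy: bool,
}

impl Fdw for StaticFdw {
    fn validate_connection(&self) -> Result<(), String> {
        if self.healthy {
            Ok(())
        } else {
            Err("connection refused".to_string())
        }
    }
}
```

Storing instances as Arc lets every query share one cached instance, and with it a connection pool, instead of reconnecting per lookup.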
Usage Pattern:
```rust
// Register a factory
registry.register_factory("postgres", factory);

// Create a server
let config = ServerConfig::new("pg1", "postgres")
    .with_option("host", "localhost");
registry.create_server(config)?;

// Get instance
let fdw = registry.get_server("pg1")?;
```
3. Type System
Value Enum
Common value representation across all data sources:
```rust
pub enum Value {
    Null,
    Boolean(bool),
    Int32(i32),
    Int64(i64),
    Float32(f32),
    Float64(f64),
    String(String),
    Binary(Vec<u8>),
    Timestamp(i64),
    Date(i32),
    Json(String), // nested or semi-structured data as JSON text
}
```
Schema Mapping
Type conversion between source-specific types and Arrow types:
- SQL Types → Arrow DataType
- BSON Types → Arrow DataType
- JSON Inference → Arrow DataType
- CSV Inference → Arrow DataType
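CSV files carry no types, so one common approach is to pick the narrowest type that every sampled value parses as. The sketch below assumes that rule. It also uses a local DataType enum that mirrors a few Arrow type names (it is not the arrow crate), and it treats empty cells as nulls.

```rust
/// Subset of Arrow DataType names, redefined locally for illustration.
#[derive(Debug, PartialEq, Clone, Copy)]
enum DataType {
    Boolean,
    Int64,
    Float64,
    Utf8,
}

/// Infers a column type from sample CSV values: the narrowest type that every
/// non-empty sample parses as, falling back to Utf8.
fn infer_csv_type(samples: &[&str]) -> DataType {
    let values: Vec<&str> = samples
        .iter()
        .map(|s| s.trim())
        .filter(|s| !s.is_empty()) // empty cells are treated as nulls
        .collect();
    if values.is_empty() {
        return DataType::Utf8; // no evidence: keep as text
    }
    if values.iter().all(|v| v.eq_ignore_ascii_case("true") || v.eq_ignore_ascii_case("false")) {
        DataType::Boolean
    } else if values.iter().all(|v| v.parse::<i64>().is_ok()) {
        DataType::Int64
    } else if values.iter().all(|v| v.parse::<f64>().is_ok()) {
        DataType::Float64
    } else {
        DataType::Utf8
    }
}
```

Sampling only the first N rows is cheaper but can misclassify a column whose later rows are wider (for example, an integer column that later contains "1.5"), so the sample size is a trade-off.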
4. Predicate System
Tree-based predicate representation for filter pushdown:
```rust
pub enum Predicate {
    Eq(String, Value),
    NotEq(String, Value),
    Gt(String, Value),
    Gte(String, Value),
    Lt(String, Value),
    Lte(String, Value),
    Like(String, String), // column, SQL LIKE pattern
    In(String, Vec<Value>),
    IsNull(String),
    IsNotNull(String),
    // Boolean combinators form the tree
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
    Not(Box<Predicate>),
}
```
Translation:
- SQL WHERE clause generation
- MongoDB query document generation
- Parquet row group filtering (future)
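Here is a minimal sketch of SQL WHERE generation. It assumes PostgreSQL-style $n placeholders (MySQL drivers use ?) and a trimmed Value/Predicate set. Values are collected into a parameter list instead of being spliced into the SQL text. Column names are double-quoted, which also follows the SQL injection guidance under Security Considerations. The helper names are illustrative.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int64(i64),
    String(String),
}

/// Trimmed Predicate tree (subset of the enum above).
#[derive(Debug)]
enum Predicate {
    Eq(String, Value),
    Gt(String, Value),
    Like(String, String),
    In(String, Vec<Value>),
    IsNull(String),
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
    Not(Box<Predicate>),
}

/// Double-quotes an identifier, doubling any embedded quote (PostgreSQL style).
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Records a bound value and returns its 1-based placeholder.
fn bind(params: &mut Vec<Value>, v: Value) -> String {
    params.push(v);
    format!("${}", params.len())
}

/// Translates a predicate into a WHERE fragment; values go into `params`.
fn to_sql(pred: &Predicate, params: &mut Vec<Value>) -> String {
    match pred {
        Predicate::Eq(c, v) => format!("{} = {}", quote_ident(c), bind(params, v.clone())),
        Predicate::Gt(c, v) => format!("{} > {}", quote_ident(c), bind(params, v.clone())),
        Predicate::Like(c, pat) => {
            format!("{} LIKE {}", quote_ident(c), bind(params, Value::String(pat.clone())))
        }
        // `x IN ()` is not valid SQL, and an empty list can never match.
        Predicate::In(_, vs) if vs.is_empty() => "FALSE".to_string(),
        Predicate::In(c, vs) => {
            let slots: Vec<String> = vs.iter().map(|v| bind(params, v.clone())).collect();
            format!("{} IN ({})", quote_ident(c), slots.join(", "))
        }
        Predicate::IsNull(c) => format!("{} IS NULL", quote_ident(c)),
        Predicate::And(ps) => join(ps, " AND ", "TRUE", params),
        Predicate::Or(ps) => join(ps, " OR ", "FALSE", params),
        Predicate::Not(p) => format!("NOT ({})", to_sql(p, params)),
    }
}

/// Joins parenthesized children; an empty list becomes the operator's identity.
fn join(ps: &[Predicate], sep: &str, empty: &str, params: &mut Vec<Value>) -> String {
    if ps.is_empty() {
        return empty.to_string();
    }
    let parts: Vec<String> = ps.iter().map(|p| format!("({})", to_sql(p, params))).collect();
    parts.join(sep)
}
```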
5. Pushdown Optimization
```rust
pub struct PushdownCapabilities {
    pub predicates: bool,   // WHERE clause
    pub aggregations: bool, // GROUP BY, COUNT, SUM
    pub limit: bool,        // LIMIT/OFFSET
    pub joins: bool,        // JOIN operations
    pub order_by: bool,     // ORDER BY
    pub projection: bool,   // Column selection
}
```
Optimization Strategy:
- Query planner checks supports_pushdown()
- Constructs the optimal query for the remote source
- Minimizes data transfer
- Leverages source-specific optimizations
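The planner's decision can be modeled as splitting the work between the remote source and local execution. The sketch below is hypothetical: ScanPlan, plan_scan, and the string-typed filters are illustrative only. It assumes predicate pushdown is all-or-nothing. It also shows why LIMIT and projection are pushed only when no filter has to run locally.

```rust
/// Subset of PushdownCapabilities relevant here.
#[derive(Clone, Copy, Default)]
struct PushdownCapabilities {
    predicates: bool,
    projection: bool,
    limit: bool,
}

/// What goes to the remote source vs. what stays for local execution.
#[derive(Debug, PartialEq)]
struct ScanPlan {
    remote_filters: Vec<String>,
    local_filters: Vec<String>,
    remote_columns: Vec<String>, // empty = fetch all columns
    remote_limit: Option<usize>,
}

/// Pushes whatever the FDW declares it supports and keeps the rest local.
fn plan_scan(
    caps: PushdownCapabilities,
    filters: &[String],
    columns: &[String],
    limit: Option<usize>,
) -> ScanPlan {
    let (remote_filters, local_filters) = if caps.predicates {
        (filters.to_vec(), Vec::new())
    } else {
        (Vec::new(), filters.to_vec())
    };
    ScanPlan {
        remote_filters,
        // Local filters may need columns outside the projection, so only
        // narrow the fetched columns when nothing is filtered locally.
        remote_columns: if caps.projection && local_filters.is_empty() {
            columns.to_vec()
        } else {
            Vec::new()
        },
        // Pushing LIMIT is only safe if no rows will be dropped locally afterwards.
        remote_limit: if caps.limit && local_filters.is_empty() { limit } else { None },
        local_filters,
    }
}
```

A finer-grained planner would push each conjunct separately and fetch the union of projected and filter-referenced columns. The all-or-nothing rule just keeps the sketch short.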
FDW Implementations
S3 FDW
Architecture:
```text
┌──────────────┐
│  S3 Client   │
└──────┬───────┘
       │
       ▼
┌──────────────┐      ┌────────────────┐
│   Parquet    │◄─────┤  File Reader   │
│    Reader    │      └────────────────┘
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ Arrow Batch  │
└──────────────┘
```
Features:
- Parquet column projection
- CSV schema inference
- Wildcard file matching
- Partitioned data support
- Future: Predicate pushdown via Parquet statistics
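Partitioned data support usually means reading partition values from object keys. Assuming a Hive-style layout (key=value path segments such as year=2024/month=03/), extraction and partition pruning could be sketched as follows. The layout and both function names are assumptions.

```rust
use std::collections::BTreeMap;

/// Extracts `key=value` partition columns from the directory part of an
/// object key, e.g. "sales/year=2024/month=03/part-0.parquet".
fn partition_values(object_key: &str) -> BTreeMap<String, String> {
    let mut parts = BTreeMap::new();
    let segments: Vec<&str> = object_key.split('/').collect();
    // Skip the last segment: it is the file name, not a partition directory.
    for segment in &segments[..segments.len().saturating_sub(1)] {
        if let Some((key, value)) = segment.split_once('=') {
            if !key.is_empty() {
                parts.insert(key.to_string(), value.to_string());
            }
        }
    }
    parts
}

/// Partition pruning: keep only files whose partition values match every filter,
/// so non-matching files are never downloaded.
fn prune<'a>(keys: &[&'a str], filters: &[(&str, &str)]) -> Vec<&'a str> {
    keys.iter()
        .copied()
        .filter(|k| {
            let parts = partition_values(k);
            filters
                .iter()
                .all(|(col, val)| parts.get(*col).map(String::as_str) == Some(*val))
        })
        .collect()
}
```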
Data Flow:
- List objects with prefix
- Read file(s) into memory
- Parse with Parquet/CSV reader
- Convert to Arrow RecordBatch
- Apply client-side filters (if needed)
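Wildcard file matching can be handled by listing with the literal prefix first and then filtering the returned keys against the pattern. The matcher below is a standard greedy backtracking algorithm. Its semantics are assumptions. Because S3 keys are flat strings, * matches any run of characters including /. The ? wildcard matches exactly one character.

```rust
/// Matches a key against a pattern with `*` (any run of characters) and `?`
/// (exactly one character), backtracking to the most recent `*` on mismatch.
fn wildcard_match(pattern: &str, key: &str) -> bool {
    let p: Vec<char> = pattern.chars().collect();
    let k: Vec<char> = key.chars().collect();
    let (mut pi, mut ki) = (0, 0);
    // (pattern index just after the last '*', key index where that '*' started)
    let mut star: Option<(usize, usize)> = None;
    while ki < k.len() {
        if pi < p.len() && p[pi] == '*' {
            star = Some((pi + 1, ki));
            pi += 1;
        } else if pi < p.len() && (p[pi] == '?' || p[pi] == k[ki]) {
            pi += 1;
            ki += 1;
        } else if let Some((sp, sk)) = star {
            // Let the last '*' absorb one more character and retry.
            pi = sp;
            ki = sk + 1;
            star = Some((sp, sk + 1));
        } else {
            return false;
        }
    }
    // Whatever is left of the pattern must be '*' only.
    p[pi..].iter().all(|&c| c == '*')
}

/// Keeps the keys from a prefix listing that match the wildcard pattern.
fn select_files<'a>(listed: &[&'a str], pattern: &str) -> Vec<&'a str> {
    listed.iter().copied().filter(|key| wildcard_match(pattern, key)).collect()
}
```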
PostgreSQL/MySQL FDW
Architecture:
```text
┌──────────────┐
│  Connection  │
│  Pool (10)   │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│  SQL Query   │
│   Builder    │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│  Result Set  │
│   → Arrow    │
└──────────────┘
```
Features:
- Connection pooling (sqlx)
- Full predicate pushdown
- Transaction support
- Prepared statements
- Type mapping
Query Flow:
- Build SQL with pushdown
- Acquire connection from pool
- Execute query
- Convert rows to Arrow columns
- Return RecordBatch
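The "convert rows to Arrow columns" step is essentially a transpose from row-major results into one typed buffer per column. This std-only sketch uses Vec<Option<T>> buffers in place of Arrow array builders. Value, Column, and ColumnType are hypothetical stand-ins.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Int64(i64),
    String(String),
}

#[derive(Debug, Clone, Copy)]
enum ColumnType {
    Int64,
    Utf8,
}

/// Column-major buffers; `None` marks a null slot.
#[derive(Debug, PartialEq)]
enum Column {
    Int64(Vec<Option<i64>>),
    Utf8(Vec<Option<String>>),
}

/// Transposes rows into one typed buffer per column, using the declared
/// schema to pick each column's type and rejecting mismatched values.
fn rows_to_columns(schema: &[ColumnType], rows: &[Vec<Value>]) -> Result<Vec<Column>, String> {
    let mut columns: Vec<Column> = schema
        .iter()
        .map(|t| match t {
            ColumnType::Int64 => Column::Int64(Vec::with_capacity(rows.len())),
            ColumnType::Utf8 => Column::Utf8(Vec::with_capacity(rows.len())),
        })
        .collect();
    for (r, row) in rows.iter().enumerate() {
        if row.len() != columns.len() {
            return Err(format!("row {r}: expected {} values, got {}", columns.len(), row.len()));
        }
        for (col, value) in columns.iter_mut().zip(row) {
            match (col, value) {
                (Column::Int64(buf), Value::Int64(v)) => buf.push(Some(*v)),
                (Column::Utf8(buf), Value::String(s)) => buf.push(Some(s.clone())),
                (Column::Int64(buf), Value::Null) => buf.push(None),
                (Column::Utf8(buf), Value::Null) => buf.push(None),
                (_, other) => return Err(format!("row {r}: unexpected value {other:?}")),
            }
        }
    }
    Ok(columns)
}
```

Pre-sizing each buffer with the row count avoids repeated reallocation, which matters for large result sets.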
Type Mapping:
| SQL Type | Arrow Type |
|---|---|
| BOOLEAN | Boolean |
| INT, INTEGER | Int32 |
| BIGINT | Int64 |
| REAL | Float32 |
| DOUBLE | Float64 |
| VARCHAR, TEXT | Utf8 |
| BYTEA | Binary |
| TIMESTAMP | Timestamp |
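A lookup that follows the table above could normalize case and strip length or precision modifiers such as VARCHAR(255) before matching. ArrowType here is a local stand-in for Arrow's DataType. Returning None for unlisted types, instead of guessing, is a design assumption.

```rust
/// Local stand-in for the Arrow types named in the table.
#[derive(Debug, PartialEq)]
enum ArrowType {
    Boolean,
    Int32,
    Int64,
    Float32,
    Float64,
    Utf8,
    Binary,
    Timestamp,
}

/// Maps a source column type name to an Arrow type per the table above.
/// Unknown types yield None so the caller can error out or fall back to Utf8.
fn sql_to_arrow(type_name: &str) -> Option<ArrowType> {
    let upper = type_name.trim().to_ascii_uppercase();
    // Drop modifiers such as "(255)" or "(10, 2)".
    let base = upper.split('(').next().unwrap_or("").trim();
    match base {
        "BOOLEAN" => Some(ArrowType::Boolean),
        "INT" | "INTEGER" => Some(ArrowType::Int32),
        "BIGINT" => Some(ArrowType::Int64),
        "REAL" => Some(ArrowType::Float32),
        "DOUBLE" => Some(ArrowType::Float64),
        "VARCHAR" | "TEXT" => Some(ArrowType::Utf8),
        "BYTEA" => Some(ArrowType::Binary),
        "TIMESTAMP" => Some(ArrowType::Timestamp),
        _ => None,
    }
}
```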
MongoDB FDW
Architecture:
```text
┌──────────────┐
│   MongoDB    │
│    Client    │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│  Query Doc   │
│   Builder    │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ BSON → Arrow │
│  Converter   │
└──────────────┘
```
Features:
- MongoDB query translation
- BSON document mapping
- Nested document flattening
- Schema inference from samples
- Filter and projection pushdown
Query Flow:
- Translate predicates to MongoDB query doc
- Build projection document
- Execute find() with cursor
- Convert BSON → Value → Arrow
- Handle nested documents as JSON strings
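The predicate translation step can be sketched as building a filter document. To stay dependency-free, this version emits JSON text. A real implementation would more likely build a BSON document with the bson crate. The operators used ($gt, $in, $and, $or, and $nor for top-level negation) are standard MongoDB query operators. The Value/Predicate subset is assumed.

```rust
#[derive(Debug)]
enum Value {
    Int64(i64),
    String(String),
    Null,
}

/// Trimmed Predicate tree (subset of the enum above).
enum Predicate {
    Eq(String, Value),
    Gt(String, Value),
    In(String, Vec<Value>),
    IsNull(String),
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
    Not(Box<Predicate>),
}

/// Encodes a string as a JSON string literal.
fn json_str(s: &str) -> String {
    let mut out = String::from("\"");
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

fn json_value(v: &Value) -> String {
    match v {
        Value::Int64(i) => i.to_string(),
        Value::String(s) => json_str(s),
        Value::Null => "null".to_string(),
    }
}

/// Comma-separated filter documents for $and / $or arrays.
fn clauses(ps: &[Predicate]) -> String {
    ps.iter().map(to_mongo).collect::<Vec<_>>().join(", ")
}

/// Builds a MongoDB filter document (as JSON text) from a predicate tree.
fn to_mongo(p: &Predicate) -> String {
    match p {
        Predicate::Eq(f, v) => format!("{{{}: {}}}", json_str(f), json_value(v)),
        Predicate::Gt(f, v) => format!("{{{}: {{\"$gt\": {}}}}}", json_str(f), json_value(v)),
        Predicate::In(f, vs) => {
            let items: Vec<String> = vs.iter().map(json_value).collect();
            format!("{{{}: {{\"$in\": [{}]}}}}", json_str(f), items.join(", "))
        }
        // {field: null} matches documents where the field is null or missing.
        Predicate::IsNull(f) => format!("{{{}: null}}", json_str(f)),
        Predicate::And(ps) => format!("{{\"$and\": [{}]}}", clauses(ps)),
        Predicate::Or(ps) => format!("{{\"$or\": [{}]}}", clauses(ps)),
        // $nor with a single clause matches documents that fail that clause.
        Predicate::Not(inner) => format!("{{\"$nor\": [{}]}}", to_mongo(inner)),
    }
}
```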
BSON Mapping:
| BSON Type | Arrow Type |
|---|---|
| bool | Boolean |
| int | Int32 |
| long | Int64 |
| double | Float64 |
| string | Utf8 |
| binData | Binary |
| date | Timestamp |
| object | Utf8 (JSON) |
| array | Utf8 (JSON) |
REST API FDW
Architecture:
```text
┌──────────────┐
│ HTTP Client  │
│  (reqwest)   │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ Auth Handler │
│  - Bearer    │
│  - API Key   │
│  - Basic     │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│ JSON Parser  │
│   → Arrow    │
└──────────────┘
```
Features:
- Multiple auth types
- JSON schema inference
- POST for inserts
- DELETE by ID
- Rate limiting (future)
Request Flow:
- Build HTTP request
- Add authentication
- Send request
- Parse JSON response
- Infer schema from first object
- Convert JSON → Arrow
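Adding authentication amounts to producing one header per request. The sketch below makes three assumptions. First, an Auth enum modeled on the three modes in the diagram. Second, a caller-chosen header name for API keys; X-API-Key, for example, is a convention, not a standard. Third, a small RFC 4648 base64 encoder for HTTP Basic credentials. With reqwest itself, RequestBuilder::bearer_auth and RequestBuilder::basic_auth cover the Bearer and Basic cases.

```rust
/// Auth modes from the diagram; field names are assumptions.
enum Auth {
    Bearer { token: String },
    ApiKey { header: String, key: String },
    Basic { username: String, password: String },
}

/// Standard base64 (RFC 4648) with '=' padding.
fn base64(input: &[u8]) -> String {
    const TABLE: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to 3 bytes into a 24-bit group (missing bytes are zero).
        let b = [chunk[0], *chunk.get(1).unwrap_or(&0), *chunk.get(2).unwrap_or(&0)];
        let n = (u32::from(b[0]) << 16) | (u32::from(b[1]) << 8) | u32::from(b[2]);
        // A chunk of k bytes yields k + 1 sextets; the rest are padding.
        for i in 0..4 {
            if i <= chunk.len() {
                out.push(TABLE[((n >> (18 - 6 * i)) & 0x3f) as usize] as char);
            } else {
                out.push('=');
            }
        }
    }
    out
}

impl Auth {
    /// Returns the (header name, header value) pair to attach to a request.
    fn header(&self) -> (String, String) {
        match self {
            Auth::Bearer { token } => ("Authorization".to_string(), format!("Bearer {token}")),
            Auth::ApiKey { header, key } => (header.clone(), key.clone()),
            Auth::Basic { username, password } => {
                let creds = format!("{username}:{password}");
                ("Authorization".to_string(), format!("Basic {}", base64(creds.as_bytes())))
            }
        }
    }
}
```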
Error Handling
```rust
pub enum FdwError {
    Connection(String),
    Query(String),
    Schema(String),
    TypeConversion(String),
    Authentication(String),
    Io(std::io::Error),
    NotSupported(String),
    Configuration(String),
    Arrow(arrow::error::ArrowError),
    Other(anyhow::Error),
}
```
Error Strategy:
- Structured errors with context
- Propagate source errors with the ? operator
- Wrap in FdwError for consistency
- Client can inspect error type
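The ? operator can propagate a source error because of a From conversion into FdwError. Here is a trimmed, std-only sketch with two of the variants above. read_option_file is a hypothetical helper. A derive crate such as thiserror could generate the same Display and From impls with its #[error] and #[from] attributes.

```rust
use std::fmt;

/// Trimmed FdwError with two of the variants above.
#[derive(Debug)]
enum FdwError {
    Io(std::io::Error),
    Configuration(String),
}

impl fmt::Display for FdwError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FdwError::Io(e) => write!(f, "I/O error: {e}"),
            FdwError::Configuration(msg) => write!(f, "configuration error: {msg}"),
        }
    }
}

impl std::error::Error for FdwError {
    // Exposes the underlying cause so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            FdwError::Io(e) => Some(e),
            FdwError::Configuration(_) => None,
        }
    }
}

/// This conversion is what lets `?` turn an io::Error into FdwError::Io.
impl From<std::io::Error> for FdwError {
    fn from(e: std::io::Error) -> Self {
        FdwError::Io(e)
    }
}

type FdwResult<T> = Result<T, FdwError>;

/// Hypothetical helper: reads an option file, propagating I/O errors with `?`.
fn read_option_file(path: &str) -> FdwResult<String> {
    if path.is_empty() {
        return Err(FdwError::Configuration("empty path".to_string()));
    }
    let contents = std::fs::read_to_string(path)?; // io::Error -> FdwError::Io
    Ok(contents)
}
```

A caller can then match on the variant (for example, retrying only on Io) instead of parsing message strings.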
Performance Considerations
1. Connection Pooling
SQL databases use connection pools:
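```rust
use sqlx::postgres::PgPoolOptions;
use std::time::Duration;

let pool = PgPoolOptions::new()
    .max_connections(10)                      // pool size shown in the diagram
    .acquire_timeout(Duration::from_secs(30)) // max wait for a free connection
    .connect(&connection_string)
    .await?;
```
2. Batch Processing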
Use Arrow RecordBatch for bulk operations:
- Read/write in batches, not row-by-row
- Minimize network round trips
- Leverage columnar format
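For writes, batching typically means one multi-row INSERT per chunk of rows, instead of one statement per row. This sketch only generates the statements; the values would be bound separately. It assumes PostgreSQL $n placeholders and identifiers that were already validated, and batched_inserts is a hypothetical helper. Drivers cap the number of bind parameters per statement, so batch_size times the column count has to stay under that limit.

```rust
/// Builds one parameterized multi-row INSERT per chunk of `batch_size` rows,
/// so N rows take about N / batch_size round trips instead of N.
fn batched_inserts(table: &str, columns: &[&str], row_count: usize, batch_size: usize) -> Vec<String> {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut statements = Vec::new();
    let mut start = 0;
    while start < row_count {
        let end = (start + batch_size).min(row_count);
        let mut param = 0; // placeholders restart at $1 in each statement
        let tuples: Vec<String> = (start..end)
            .map(|_| {
                let slots: Vec<String> = columns
                    .iter()
                    .map(|_| {
                        param += 1;
                        format!("${param}")
                    })
                    .collect();
                format!("({})", slots.join(", "))
            })
            .collect();
        statements.push(format!(
            "INSERT INTO {} ({}) VALUES {}",
            table,
            columns.join(", "),
            tuples.join(", ")
        ));
        start = end;
    }
    statements
}
```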
3. Predicate Pushdown
Always push predicates when supported:
```rust
// Instead of this: fetch everything, then filter locally
let all_data = fdw.scan("users", &[], &[]).await?;
let filtered = apply_filter(all_data, predicates);

// Do this: let the source filter before sending rows
let filtered_data = fdw.scan("users", &[], &predicates).await?;
```
4. Projection Pushdown
Only request needed columns:
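```rust
// Instead of: fetch every column
let all_cols = fdw.scan("users", &[], &[]).await?;

// Do this: projection is &[String], so pass owned column names
let needed_cols = fdw.scan("users", &["id".to_string(), "name".to_string()], &[]).await?;
```
5. Parallel Processing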
S3 FDW can read multiple files in parallel (future):
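```rust
use rayon::prelude::*;

// Read multiple Parquet files concurrently (read_parquet is assumed to be
// synchronous, e.g. decoding bytes already fetched from S3)
let files = list_objects(prefix).await?;
let batches = files
    .par_iter()
    .map(|f| read_parquet(f))
    .collect::<Result<Vec<_>, _>>()?;
```
Testing Strategy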
Unit Tests
Each module has comprehensive unit tests:
- Type conversion correctness
- Predicate translation
- Schema mapping
- Error handling
Integration Tests
Mock-based integration tests:
- Registry operations
- Server lifecycle
- Query construction
- End-to-end flows
Property Tests
Future: QuickCheck-style property tests:
- Predicate commutativity
- Type conversion roundtrips
- Schema equivalence
Future Enhancements
1. Advanced Pushdown
- Aggregation pushdown (COUNT, SUM, AVG)
- JOIN pushdown to PostgreSQL
- LIMIT/OFFSET pushdown
2. Caching Layer
- Query result caching
- Schema caching
- Connection caching
3. Statistics
- Track query metrics
- Cost-based optimization
- Adaptive query planning
4. More Data Sources
- Snowflake
- BigQuery
- Delta Lake
- Apache Iceberg
- Elasticsearch
5. Federation
- Cross-source JOINs
- Query splitting
- Result merging
Security Considerations
1. Credentials
- Never log credentials
- Support environment variables
- Use IAM roles when possible
- Secure credential storage
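One way to keep credentials out of logs, even after an accidental {:?}, is a Debug impl that masks secret options. The sketch pairs that with an environment-variable lookup. ServerOptions, SECRET_KEYS, and the environment variable names are hypothetical.

```rust
use std::collections::BTreeMap;
use std::fmt;

/// Option keys treated as secrets; the list is an assumption.
const SECRET_KEYS: &[&str] = &["password", "secret_access_key", "api_key", "token"];

/// Server options whose Debug output masks secret values.
struct ServerOptions(BTreeMap<String, String>);

impl fmt::Debug for ServerOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut map = f.debug_map();
        for (k, v) in &self.0 {
            if SECRET_KEYS.contains(&k.to_ascii_lowercase().as_str()) {
                map.entry(k, &"***"); // never print the real value
            } else {
                map.entry(k, v);
            }
        }
        map.finish()
    }
}

impl ServerOptions {
    /// Prefers an environment variable (e.g. a hypothetical HELIOS_PG_PASSWORD)
    /// over the inline option value.
    fn resolve(&self, key: &str, env_var: &str) -> Option<String> {
        std::env::var(env_var).ok().or_else(|| self.0.get(key).cloned())
    }
}
```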
2. SQL Injection
- Use parameterized queries
- Escape identifiers
- Validate user input
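Parameterized queries protect values, but identifiers cannot be bound as parameters. The sketch covers the other two items with two steps. It first checks requested columns against an allowlist taken from the table schema. It then applies dialect-specific quoting: PostgreSQL uses double quotes and MySQL uses backticks, and each escapes an embedded quote by doubling it. Dialect and both functions are hypothetical.

```rust
#[derive(Clone, Copy)]
enum Dialect {
    Postgres,
    MySql,
}

/// Quotes an identifier for the dialect, doubling the quote character if it
/// appears inside the name.
fn quote_identifier(name: &str, dialect: Dialect) -> String {
    let q = match dialect {
        Dialect::Postgres => '"',
        Dialect::MySql => '`',
    };
    let doubled: String = [q, q].iter().collect();
    format!("{q}{}{q}", name.replace(q, &doubled))
}

/// Rejects requested column names that are not in the table schema,
/// before any SQL text is built.
fn validate_columns<'a>(requested: &[&'a str], schema: &[&str]) -> Result<Vec<&'a str>, String> {
    requested
        .iter()
        .map(|c| {
            if schema.contains(c) {
                Ok(*c)
            } else {
                Err(format!("unknown column: {c}"))
            }
        })
        .collect()
}
```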
3. Network
- Support SSL/TLS
- Validate certificates
- Connection timeouts
4. Access Control
- Honor source permissions
- Row-level security (future)
- Column-level security (future)
Monitoring and Observability
Metrics
- Query count by FDW type
- Query latency percentiles
- Connection pool stats
- Error rates
Logging
Using the tracing crate:
```rust
tracing::info!("Executing query on {}", table);
tracing::debug!("Predicates: {:?}", predicates);
```
Query Stats
```rust
pub struct QueryStats {
    pub rows_scanned: usize,
    pub rows_returned: usize,
    pub execution_time_ms: u64,
    pub bytes_transferred: usize,
    pub predicates_pushed: bool,
}
```
Contributing
To add a new FDW:
- Implement the ForeignDataWrapper trait
- Add comprehensive tests
- Update registry initialization
- Add documentation
- Provide example usage