HeliosDB FDW Architecture

This document describes the architecture and design of the HeliosDB Foreign Data Wrapper system.

Overview

The FDW system enables HeliosDB to query external data sources through a unified interface. It provides abstraction over heterogeneous data sources while optimizing query execution through predicate and projection pushdown.

Design Principles

  1. Unified Interface - Single API for all data sources
  2. Type Safety - Strong typing with Arrow as the common format
  3. Performance - Aggressive pushdown optimization
  4. Extensibility - Easy to add new FDW implementations
  5. Async First - All operations are asynchronous for scalability

Architecture Diagram

┌─────────────────────────────────────────────────────────────┐
│ HeliosDB Query Engine                                       │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────────────────────┐
│ FDW Registry                                                │
│ - Factory management                                        │
│ - Server instance cache                                     │
│ - Connection validation                                     │
└──────────────────────┬──────────────────────────────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────────────────────┐
│ ForeignDataWrapper Trait                                    │
│ - scan()                                                    │
│ - get_schema()                                              │
│ - insert() / update() / delete()                            │
│ - supports_pushdown()                                       │
└──────────────────────┬──────────────────────────────────────┘
       ┌───────────────┼───────────────┬───────────────┐
       ▼               ▼               ▼               ▼
┌─────────────┐ ┌──────────┐ ┌─────────────┐ ┌──────────────┐
│ S3 FDW      │ │ Postgres │ │ MySQL FDW   │ │ MongoDB FDW  │
│             │ │ FDW      │ │             │ │              │
└─────────────┘ └──────────┘ └─────────────┘ └──────────────┘
       │               │               │               │
       ▼               ▼               ▼               ▼
┌─────────────┐ ┌──────────┐ ┌─────────────┐ ┌──────────────┐
│ AWS S3      │ │PostgreSQL│ │ MySQL       │ │ MongoDB      │
└─────────────┘ └──────────┘ └─────────────┘ └──────────────┘

Core Components

1. ForeignDataWrapper Trait

The central abstraction that all FDW implementations must satisfy.

#[async_trait]
pub trait ForeignDataWrapper: Send + Sync {
    async fn scan(&self, table: &str, projection: &[String],
                  predicates: &[Predicate]) -> FdwResult<RecordBatch>;
    async fn get_schema(&self, table: &str) -> FdwResult<Arc<Schema>>;
    async fn insert(&self, table: &str, batch: RecordBatch) -> FdwResult<usize>;
    async fn update(&self, table: &str, predicates: &[Predicate],
                    values: &[(String, Value)]) -> FdwResult<usize>;
    async fn delete(&self, table: &str, predicates: &[Predicate]) -> FdwResult<usize>;
    fn supports_pushdown(&self) -> PushdownCapabilities;
    async fn validate_connection(&self) -> FdwResult<()>;
    async fn get_stats(&self) -> Option<QueryStats>;
    async fn list_tables(&self) -> FdwResult<Vec<String>>;
}

Key Methods:

  • scan() - Main query method, returns Arrow RecordBatch
  • get_schema() - Returns table schema for planning
  • insert/update/delete() - Write operations (optional)
  • supports_pushdown() - Declares optimization capabilities
  • validate_connection() - Health check

2. Registry System

Manages FDW factories and server instances.

pub struct FdwRegistry {
    factories: RwLock<HashMap<String, FdwFactory>>,
    servers: RwLock<HashMap<String, (ServerConfig, FdwInstance)>>,
}

Responsibilities:

  • Register FDW factories by type
  • Create and cache server instances
  • Validate connections on creation
  • Thread-safe access with RwLock

Usage Pattern:

// Register a factory
registry.register_factory("postgres", factory);
// Create a server
let config = ServerConfig::new("pg1", "postgres")
    .with_option("host", "localhost");
registry.create_server(config)?;
// Get instance
let fdw = registry.get_server("pg1")?;
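
The usage pattern above can be sketched with only the standard library. This is a simplified illustration, not HeliosDB's actual code: the `Fdw` trait, the `EchoFdw` type, the `echo_factory` helper, the `String` error type, and the definitions of `FdwFactory` and `FdwInstance` are all assumptions made for the example, and the sketch omits connection validation and async.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the real (async) `ForeignDataWrapper` trait.
pub trait Fdw: Send + Sync {
    fn describe(&self) -> String;
}

/// Server configuration: a name, an FDW type, and free-form options.
#[derive(Clone, Debug)]
pub struct ServerConfig {
    pub name: String,
    pub fdw_type: String,
    pub options: HashMap<String, String>,
}

impl ServerConfig {
    pub fn new(name: &str, fdw_type: &str) -> Self {
        Self { name: name.into(), fdw_type: fdw_type.into(), options: HashMap::new() }
    }

    pub fn with_option(mut self, key: &str, value: &str) -> Self {
        self.options.insert(key.into(), value.into());
        self
    }
}

// Assumed shapes for the two type names used by the registry struct.
pub type FdwInstance = Arc<dyn Fdw>;
pub type FdwFactory =
    Box<dyn Fn(&ServerConfig) -> Result<FdwInstance, String> + Send + Sync>;

#[derive(Default)]
pub struct FdwRegistry {
    factories: RwLock<HashMap<String, FdwFactory>>,
    servers: RwLock<HashMap<String, (ServerConfig, FdwInstance)>>,
}

impl FdwRegistry {
    /// Register a factory under an FDW type name such as "postgres".
    pub fn register_factory(&self, fdw_type: &str, factory: FdwFactory) {
        self.factories.write().unwrap().insert(fdw_type.to_string(), factory);
    }

    /// Build an instance with the matching factory and cache it by server name.
    pub fn create_server(&self, config: ServerConfig) -> Result<(), String> {
        let factories = self.factories.read().unwrap();
        let factory = factories
            .get(&config.fdw_type)
            .ok_or_else(|| format!("unknown FDW type: {}", config.fdw_type))?;
        let instance = factory(&config)?;
        self.servers
            .write()
            .unwrap()
            .insert(config.name.clone(), (config, instance));
        Ok(())
    }

    /// Return a shared handle to a previously created server.
    pub fn get_server(&self, name: &str) -> Result<FdwInstance, String> {
        self.servers
            .read()
            .unwrap()
            .get(name)
            .map(|(_, fdw)| Arc::clone(fdw))
            .ok_or_else(|| format!("unknown server: {name}"))
    }
}

/// Toy FDW that only remembers the host it was configured with.
struct EchoFdw {
    host: String,
}

impl Fdw for EchoFdw {
    fn describe(&self) -> String {
        format!("echo@{}", self.host)
    }
}

/// Hypothetical factory that reads the "host" option from the config.
pub fn echo_factory() -> FdwFactory {
    Box::new(|cfg: &ServerConfig| -> Result<FdwInstance, String> {
        let host = cfg.options.get("host").cloned().unwrap_or_default();
        Ok(Arc::new(EchoFdw { host }) as FdwInstance)
    })
}
```

Handing out `Arc` clones lets callers keep using an instance after the read lock is released, so lookups do not block each other or hold the lock during a query.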

3. Type System

Value Enum

Common value representation across all data sources:

pub enum Value {
    Null,
    Boolean(bool),
    Int32(i32),
    Int64(i64),
    Float32(f32),
    Float64(f64),
    String(String),
    Binary(Vec<u8>),
    Timestamp(i64),
    Date(i32),
    Json(String),
}

Schema Mapping

Type conversion between source-specific types and Arrow types:

  • SQL Types → Arrow DataType
  • BSON Types → Arrow DataType
  • JSON Inference → Arrow DataType
  • CSV Inference → Arrow DataType

4. Predicate System

Tree-based predicate representation for filter pushdown:

pub enum Predicate {
    Eq(String, Value),
    NotEq(String, Value),
    Gt(String, Value),
    Gte(String, Value),
    Lt(String, Value),
    Lte(String, Value),
    Like(String, String),
    In(String, Vec<Value>),
    IsNull(String),
    IsNotNull(String),
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
    Not(Box<Predicate>),
}

Translation:

  • SQL WHERE clause generation
  • MongoDB query document generation
  • Parquet row group filtering (future)
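
As an illustration of the SQL translation, here is a minimal sketch that walks the predicate tree and emits a parameterized WHERE clause. It uses a reduced copy of `Value` and `Predicate`; the function names (`to_sql`, `bind`, `quote_ident`, `join`), the PostgreSQL-style `$n` placeholders, and the treatment of empty `And`/`Or`/`In` lists are assumptions for the example rather than the project's actual translator.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Null,
    Boolean(bool),
    Int64(i64),
    Float64(f64),
    String(String),
}

/// Reduced copy of the predicate enum, enough to show the recursion.
#[derive(Debug, Clone, PartialEq)]
pub enum Predicate {
    Eq(String, Value),
    Gt(String, Value),
    Like(String, String),
    In(String, Vec<Value>),
    IsNull(String),
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
    Not(Box<Predicate>),
}

/// Quote an identifier, doubling any embedded double quotes.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Record a bound parameter and return its `$n` placeholder.
fn bind(params: &mut Vec<Value>, v: Value) -> String {
    params.push(v);
    format!("${}", params.len())
}

/// Join child predicates with `op`; an empty list collapses to `empty`.
fn join(ps: &[Predicate], op: &str, empty: &str, params: &mut Vec<Value>) -> String {
    if ps.is_empty() {
        return empty.to_string();
    }
    let parts: Vec<String> = ps
        .iter()
        .map(|p| format!("({})", to_sql(p, params)))
        .collect();
    parts.join(op)
}

/// Translate a predicate tree into SQL text plus an ordered parameter list.
pub fn to_sql(pred: &Predicate, params: &mut Vec<Value>) -> String {
    match pred {
        Predicate::Eq(c, v) => format!("{} = {}", quote_ident(c), bind(params, v.clone())),
        Predicate::Gt(c, v) => format!("{} > {}", quote_ident(c), bind(params, v.clone())),
        Predicate::Like(c, pat) => {
            format!("{} LIKE {}", quote_ident(c), bind(params, Value::String(pat.clone())))
        }
        // `x IN ()` is not valid SQL, so an empty list becomes a constant.
        Predicate::In(_, vs) if vs.is_empty() => "FALSE".to_string(),
        Predicate::In(c, vs) => {
            let placeholders: Vec<String> =
                vs.iter().map(|v| bind(params, v.clone())).collect();
            format!("{} IN ({})", quote_ident(c), placeholders.join(", "))
        }
        Predicate::IsNull(c) => format!("{} IS NULL", quote_ident(c)),
        Predicate::And(ps) => join(ps, " AND ", "TRUE", params),
        Predicate::Or(ps) => join(ps, " OR ", "FALSE", params),
        Predicate::Not(p) => format!("NOT ({})", to_sql(p, params)),
    }
}
```

For example, `And([Gt("age", 30), Or([Eq("country", "NZ"), IsNull("country")])])` yields `("age" > $1) AND (("country" = $2) OR ("country" IS NULL))` with parameters `[30, "NZ"]`. Values never appear in the SQL text, which keeps user-supplied data out of the statement itself.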

5. Pushdown Optimization

pub struct PushdownCapabilities {
    pub predicates: bool,   // WHERE clause
    pub aggregations: bool, // GROUP BY, COUNT, SUM
    pub limit: bool,        // LIMIT/OFFSET
    pub joins: bool,        // JOIN operations
    pub order_by: bool,     // ORDER BY
    pub projection: bool,   // Column selection
}

Optimization Strategy:

  1. Query planner checks supports_pushdown()
  2. Constructs optimal query for remote source
  3. Minimizes data transfer
  4. Leverages source-specific optimizations
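
One way a planner might act on these flags is sketched below. The `Filter`, `LogicalScan`, and `ScanPlan` types and the `plan_scan` function are hypothetical names invented for this example, and the sketch assumes that an empty column list means "all columns". It illustrates two details that are easy to miss: a LIMIT can only be pushed down when every filter is pushed too, and columns needed by local filters must still be fetched.

```rust
#[derive(Debug, Clone, Copy, Default)]
pub struct PushdownCapabilities {
    pub predicates: bool,
    pub aggregations: bool,
    pub limit: bool,
    pub joins: bool,
    pub order_by: bool,
    pub projection: bool,
}

/// Hypothetical filter: the column it reads plus an opaque expression.
#[derive(Debug, Clone, PartialEq)]
pub struct Filter {
    pub column: String,
    pub expr: String,
}

/// Hypothetical logical scan request produced by the planner.
#[derive(Debug, Clone)]
pub struct LogicalScan {
    pub columns: Vec<String>, // empty = all columns (assumption)
    pub filters: Vec<Filter>,
    pub limit: Option<usize>,
}

/// What is sent to the remote source versus what is evaluated locally.
#[derive(Debug, Clone)]
pub struct ScanPlan {
    pub remote_columns: Vec<String>,
    pub remote_filters: Vec<Filter>,
    pub remote_limit: Option<usize>,
    pub local_filters: Vec<Filter>,
    pub local_limit: Option<usize>,
}

pub fn plan_scan(caps: PushdownCapabilities, scan: &LogicalScan) -> ScanPlan {
    // Filters go to the source only if it can evaluate them.
    let (remote_filters, local_filters) = if caps.predicates {
        (scan.filters.clone(), Vec::new())
    } else {
        (Vec::new(), scan.filters.clone())
    };

    // Request only the needed columns, including those read by local filters.
    let remote_columns = if caps.projection && !scan.columns.is_empty() {
        let mut cols = scan.columns.clone();
        for f in &local_filters {
            if !cols.contains(&f.column) {
                cols.push(f.column.clone());
            }
        }
        cols
    } else {
        Vec::new()
    };

    // Limiting before a local filter would drop rows that should match,
    // so the limit is pushed only when no filter remains local.
    let push_limit = caps.limit && local_filters.is_empty();

    ScanPlan {
        remote_columns,
        remote_filters,
        remote_limit: if push_limit { scan.limit } else { None },
        local_filters,
        local_limit: if push_limit { None } else { scan.limit },
    }
}
```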

FDW Implementations

S3 FDW

Architecture:

┌──────────────┐
│ S3 Client    │
└──────┬───────┘
       │
       ▼
┌──────────────┐      ┌────────────────┐
│ Parquet      │◄─────┤ File Reader    │
│ Reader       │      └────────────────┘
└──────────────┘
┌──────────────┐
│ Arrow Batch  │
└──────────────┘

Features:

  • Parquet column projection
  • CSV schema inference
  • Wildcard file matching
  • Partitioned data support
  • Future: Predicate pushdown via Parquet statistics

Data Flow:

  1. List objects with prefix
  2. Read file(s) into memory
  3. Parse with Parquet/CSV reader
  4. Convert to Arrow RecordBatch
  5. Apply client-side filters (if needed)
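
Step 5 can be pictured as evaluating the predicate tree against each row after it has been read. The sketch below works row by row on a `HashMap` for clarity; a real implementation would more likely filter whole Arrow batches with vectorized kernels. `Row`, `compare`, and `row_matches` are hypothetical names, the enums are reduced copies, and the two-valued treatment of NULL is a simplification (SQL uses three-valued logic, so `NOT (age > 30)` on a NULL age would not match in SQL but does here).

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Null,
    Int64(i64),
    Float64(f64),
    String(String),
}

#[derive(Debug, Clone)]
pub enum Predicate {
    Eq(String, Value),
    Gt(String, Value),
    Lt(String, Value),
    IsNull(String),
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
    Not(Box<Predicate>),
}

/// Hypothetical row representation: column name to value.
pub type Row = HashMap<String, Value>;

/// Compare two values of the same variant; NULLs and mixed types yield `None`.
fn compare(a: &Value, b: &Value) -> Option<Ordering> {
    match (a, b) {
        (Value::Int64(x), Value::Int64(y)) => Some(x.cmp(y)),
        (Value::Float64(x), Value::Float64(y)) => x.partial_cmp(y),
        (Value::String(x), Value::String(y)) => Some(x.cmp(y)),
        _ => None,
    }
}

/// Compare the cell in `col` with `v`; a missing column yields `None`.
fn cell_cmp(row: &Row, col: &str, v: &Value) -> Option<Ordering> {
    row.get(col).and_then(|cell| compare(cell, v))
}

/// Return true if the row satisfies the predicate.
pub fn row_matches(pred: &Predicate, row: &Row) -> bool {
    match pred {
        Predicate::Eq(c, v) => cell_cmp(row, c, v) == Some(Ordering::Equal),
        Predicate::Gt(c, v) => cell_cmp(row, c, v) == Some(Ordering::Greater),
        Predicate::Lt(c, v) => cell_cmp(row, c, v) == Some(Ordering::Less),
        // A missing column is treated like an explicit NULL.
        Predicate::IsNull(c) => matches!(row.get(c), None | Some(Value::Null)),
        Predicate::And(ps) => ps.iter().all(|p| row_matches(p, row)),
        Predicate::Or(ps) => ps.iter().any(|p| row_matches(p, row)),
        Predicate::Not(p) => !row_matches(p, row),
    }
}

/// Keep only the rows that satisfy the predicate.
pub fn filter_rows(rows: Vec<Row>, pred: &Predicate) -> Vec<Row> {
    rows.into_iter().filter(|r| row_matches(pred, r)).collect()
}
```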

PostgreSQL/MySQL FDW

Architecture:

┌──────────────┐
│Connection    │
│Pool (10)     │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│SQL Query     │
│Builder       │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│Result Set    │
│→ Arrow       │
└──────────────┘

Features:

  • Connection pooling (sqlx)
  • Full predicate pushdown
  • Transaction support
  • Prepared statements
  • Type mapping

Query Flow:

  1. Build SQL with pushdown
  2. Acquire connection from pool
  3. Execute query
  4. Convert rows to Arrow columns
  5. Return RecordBatch

Type Mapping:

| SQL Type      | Arrow Type |
|---------------|------------|
| BOOLEAN       | Boolean    |
| INT, INTEGER  | Int32      |
| BIGINT        | Int64      |
| REAL          | Float32    |
| DOUBLE        | Float64    |
| VARCHAR, TEXT | Utf8       |
| BYTEA         | Binary     |
| TIMESTAMP     | Timestamp  |
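
The table above could be implemented as a simple lookup. In this sketch `ArrowType` is a local stand-in for the corresponding Arrow `DataType` variants (the real Arrow timestamp type also carries a time unit and optional time zone), and `sql_type_to_arrow` is a hypothetical helper name. Types not listed in the table return `None` rather than guessing.

```rust
/// Local stand-in for the Arrow data types named in the mapping table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArrowType {
    Boolean,
    Int32,
    Int64,
    Float32,
    Float64,
    Utf8,
    Binary,
    Timestamp,
}

/// Map a SQL type name to its Arrow type, following the table row by row.
pub fn sql_type_to_arrow(sql_type: &str) -> Option<ArrowType> {
    // Normalize case and drop any length/precision suffix, e.g. VARCHAR(255).
    let upper = sql_type.trim().to_ascii_uppercase();
    let base = upper.split('(').next().unwrap_or("").trim();
    match base {
        "BOOLEAN" => Some(ArrowType::Boolean),
        "INT" | "INTEGER" => Some(ArrowType::Int32),
        "BIGINT" => Some(ArrowType::Int64),
        "REAL" => Some(ArrowType::Float32),
        "DOUBLE" => Some(ArrowType::Float64),
        "VARCHAR" | "TEXT" => Some(ArrowType::Utf8),
        "BYTEA" => Some(ArrowType::Binary),
        "TIMESTAMP" => Some(ArrowType::Timestamp),
        // Unknown types are reported to the caller instead of being coerced.
        _ => None,
    }
}
```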

MongoDB FDW

Architecture:

┌──────────────┐
│MongoDB       │
│Client        │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│Query Doc     │
│Builder       │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│BSON → Arrow  │
│Converter     │
└──────────────┘

Features:

  • MongoDB query translation
  • BSON document mapping
  • Nested document flattening
  • Schema inference from samples
  • Filter and projection pushdown

Query Flow:

  1. Translate predicates to MongoDB query doc
  2. Build projection document
  3. Execute find() with cursor
  4. Convert BSON → Value → Arrow
  5. Handle nested documents as JSON strings
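
Step 1 of this flow might look like the sketch below, which renders a predicate as the JSON text of a MongoDB filter using standard query operators (`$gt`, `$in`, `$and`, and so on). It is an illustration only: a real implementation would probably build a BSON document through the MongoDB driver instead of a string, the enums are reduced copies, and `to_mongo` and its helpers are hypothetical names. Empty `And`/`Or` lists and field names beginning with `$` are not handled here and would need validation.

```rust
#[derive(Debug, Clone)]
pub enum Value {
    Null,
    Boolean(bool),
    Int64(i64),
    String(String),
}

#[derive(Debug, Clone)]
pub enum Predicate {
    Eq(String, Value),
    NotEq(String, Value),
    Gt(String, Value),
    Lt(String, Value),
    In(String, Vec<Value>),
    IsNull(String),
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
    Not(Box<Predicate>),
}

/// Render a JSON string literal, escaping quotes, backslashes, control chars.
fn json_str(s: &str) -> String {
    let mut out = String::from("\"");
    for ch in s.chars() {
        match ch {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

fn json_value(v: &Value) -> String {
    match v {
        Value::Null => "null".to_string(),
        Value::Boolean(b) => b.to_string(),
        Value::Int64(i) => i.to_string(),
        Value::String(s) => json_str(s),
    }
}

/// Render a single-key JSON object: {"key": value}.
fn obj(key: &str, value: &str) -> String {
    format!("{{{}: {}}}", json_str(key), value)
}

/// Render a JSON array from already-rendered items.
fn arr(items: Vec<String>) -> String {
    format!("[{}]", items.join(", "))
}

fn many(ps: &[Predicate]) -> String {
    arr(ps.iter().map(to_mongo).collect())
}

/// Translate a predicate tree into MongoDB filter text.
pub fn to_mongo(pred: &Predicate) -> String {
    match pred {
        Predicate::Eq(c, v) => obj(c, &json_value(v)),
        Predicate::NotEq(c, v) => obj(c, &obj("$ne", &json_value(v))),
        Predicate::Gt(c, v) => obj(c, &obj("$gt", &json_value(v))),
        Predicate::Lt(c, v) => obj(c, &obj("$lt", &json_value(v))),
        Predicate::In(c, vs) => {
            obj(c, &obj("$in", &arr(vs.iter().map(json_value).collect())))
        }
        // In MongoDB, {field: null} matches both null and missing fields.
        Predicate::IsNull(c) => obj(c, "null"),
        Predicate::And(ps) => obj("$and", &many(ps)),
        Predicate::Or(ps) => obj("$or", &many(ps)),
        // `$not` only applies to field operators, so general negation
        // is expressed as `$nor` with a single clause.
        Predicate::Not(p) => obj("$nor", &arr(vec![to_mongo(p)])),
    }
}
```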

BSON Mapping:

| BSON Type | Arrow Type  |
|-----------|-------------|
| bool      | Boolean     |
| int       | Int32       |
| long      | Int64       |
| double    | Float64     |
| string    | Utf8        |
| binData   | Binary      |
| date      | Timestamp   |
| object    | Utf8 (JSON) |
| array     | Utf8 (JSON) |

REST API FDW

Architecture:

┌──────────────┐
│HTTP Client   │
│(reqwest)     │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│Auth Handler  │
│- Bearer      │
│- API Key     │
│- Basic       │
└──────┬───────┘
       │
       ▼
┌──────────────┐
│JSON Parser   │
│→ Arrow       │
└──────────────┘

Features:

  • Multiple auth types
  • JSON schema inference
  • POST for inserts
  • DELETE by ID
  • Rate limiting (future)

Request Flow:

  1. Build HTTP request
  2. Add authentication
  3. Send request
  4. Parse JSON response
  5. Infer schema from first object
  6. Convert JSON → Arrow

Error Handling

pub enum FdwError {
    Connection(String),
    Query(String),
    Schema(String),
    TypeConversion(String),
    Authentication(String),
    Io(std::io::Error),
    NotSupported(String),
    Configuration(String),
    Arrow(arrow::error::ArrowError),
    Other(anyhow::Error),
}

Error Strategy:

  • Structured errors with context
  • Propagate source errors with ?
  • Wrap in FdwError for consistency
  • Client can inspect error type
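
The strategy above can be sketched with a reduced version of the enum and only the standard library. The `From` conversion is what lets `?` wrap a source error automatically. The `read_config` and `is_retryable` helpers, the message formats, and the choice of which variants count as retryable are assumptions for illustration; the actual crate may derive these implementations differently.

```rust
use std::fmt;
use std::fs;

/// Reduced copy of the error enum, enough to show wrapping and inspection.
#[derive(Debug)]
pub enum FdwError {
    Connection(String),
    Configuration(String),
    Io(std::io::Error),
    NotSupported(String),
}

impl fmt::Display for FdwError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FdwError::Connection(m) => write!(f, "connection error: {m}"),
            FdwError::Configuration(m) => write!(f, "configuration error: {m}"),
            FdwError::Io(e) => write!(f, "I/O error: {e}"),
            FdwError::NotSupported(m) => write!(f, "not supported: {m}"),
        }
    }
}

impl std::error::Error for FdwError {
    /// Expose the wrapped source error so callers can walk the chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            FdwError::Io(e) => Some(e),
            _ => None,
        }
    }
}

/// Lets `?` convert `std::io::Error` into `FdwError::Io`.
impl From<std::io::Error> for FdwError {
    fn from(e: std::io::Error) -> Self {
        FdwError::Io(e)
    }
}

pub type FdwResult<T> = Result<T, FdwError>;

/// Hypothetical helper: read a config file, wrapping I/O failures via `?`.
pub fn read_config(path: &str) -> FdwResult<String> {
    let text = fs::read_to_string(path)?;
    if text.trim().is_empty() {
        return Err(FdwError::Configuration(format!("{path} is empty")));
    }
    Ok(text)
}

/// Hypothetical policy: only connection failures are worth retrying.
pub fn is_retryable(err: &FdwError) -> bool {
    matches!(err, FdwError::Connection(_))
}
```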

Performance Considerations

1. Connection Pooling

SQL databases use connection pools:

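use sqlx::postgres::PgPoolOptions;
use std::time::Duration;

// Up to 10 connections; acquiring one waits at most 30 seconds.
let pool = PgPoolOptions::new()
    .max_connections(10)
    .acquire_timeout(Duration::from_secs(30))
    .connect(&connection_string)
    .await?;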

2. Batch Processing

Use Arrow RecordBatch for bulk operations:

  • Read/write in batches, not row-by-row
  • Minimize network round trips
  • Leverage columnar format

3. Predicate Pushdown

Always push predicates when supported:

// Instead of this:
let all_data = fdw.scan("users", &[], &[]).await?;
let filtered = apply_filter(all_data, predicates);
// Do this:
let filtered_data = fdw.scan("users", &[], &predicates).await?;

4. Projection Pushdown

Only request needed columns:

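// Instead of:
let all_cols = fdw.scan("users", &[], &[]).await?;
// Do this (projection is &[String], so pass owned column names):
let columns = vec!["id".to_string(), "name".to_string()];
let needed_cols = fdw.scan("users", &columns, &[]).await?;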

5. Parallel Processing

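A future version of the S3 FDW could read multiple files in parallel:

// Read multiple Parquet files concurrently with rayon
// (assumes read_parquet is a blocking function returning FdwResult<RecordBatch>)
let files = list_objects(prefix).await?;
let batches: Vec<RecordBatch> = files
    .par_iter()
    .map(|f| read_parquet(f))
    .collect::<FdwResult<Vec<_>>>()?;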

Testing Strategy

Unit Tests

Each module has comprehensive unit tests:

  • Type conversion correctness
  • Predicate translation
  • Schema mapping
  • Error handling

Integration Tests

Mock-based integration tests:

  • Registry operations
  • Server lifecycle
  • Query construction
  • End-to-end flows

Property Tests

Future: QuickCheck-style property tests:

  • Predicate commutativity
  • Type conversion roundtrips
  • Schema equivalence

Future Enhancements

1. Advanced Pushdown

  • Aggregation pushdown (COUNT, SUM, AVG)
  • JOIN pushdown to PostgreSQL
  • LIMIT/OFFSET pushdown

2. Caching Layer

  • Query result caching
  • Schema caching
  • Connection caching

3. Statistics

  • Track query metrics
  • Cost-based optimization
  • Adaptive query planning

4. More Data Sources

  • Snowflake
  • BigQuery
  • Delta Lake
  • Apache Iceberg
  • Elasticsearch

5. Federation

  • Cross-source JOINs
  • Query splitting
  • Result merging

Security Considerations

1. Credentials

  • Never log credentials
  • Support environment variables
  • Use IAM roles when possible
  • Secure credential storage
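
One common way to make "never log credentials" hard to violate is a `Debug` implementation that redacts sensitive option values, so that debug-formatting a config never prints a secret. The sketch below is an assumption about how this could be done, not the project's code: `ServerOptions`, `is_sensitive`, and the list of sensitive key fragments are hypothetical.

```rust
use std::collections::BTreeMap;
use std::fmt;

/// Hypothetical wrapper around a server's connection options.
pub struct ServerOptions(pub BTreeMap<String, String>);

/// Key fragments treated as secret (an illustrative list, not exhaustive).
const SENSITIVE: [&str; 4] = ["password", "secret", "token", "api_key"];

fn is_sensitive(key: &str) -> bool {
    let k = key.to_ascii_lowercase();
    SENSITIVE.iter().any(|s| k.contains(*s))
}

impl fmt::Debug for ServerOptions {
    /// Print every key, but replace the values of sensitive keys.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut map = f.debug_map();
        for (k, v) in &self.0 {
            if is_sensitive(k) {
                map.entry(k, &"<redacted>");
            } else {
                map.entry(k, v);
            }
        }
        map.finish()
    }
}
```

With this in place, formatting the options with `{:?}` shows the host but prints `<redacted>` in place of the password.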

2. SQL Injection

  • Use parameterized queries
  • Escape identifiers
  • Validate user input

3. Network

  • Support SSL/TLS
  • Validate certificates
  • Connection timeouts

4. Access Control

  • Honor source permissions
  • Row-level security (future)
  • Column-level security (future)

Monitoring and Observability

Metrics

  • Query count by FDW type
  • Query latency percentiles
  • Connection pool stats
  • Error rates

Logging

Logging uses the tracing crate:

tracing::info!("Executing query on {}", table);
tracing::debug!("Predicates: {:?}", predicates);

Query Stats

pub struct QueryStats {
    pub rows_scanned: usize,
    pub rows_returned: usize,
    pub execution_time_ms: u64,
    pub bytes_transferred: usize,
    pub predicates_pushed: bool,
}

Contributing

To add a new FDW:

  1. Implement ForeignDataWrapper trait
  2. Add comprehensive tests
  3. Update registry initialization
  4. Add documentation
  5. Provide example usage
