HeliosDB Foreign Data Wrappers (FDW)
Foreign Data Wrappers for HeliosDB v3.0 - Query external data sources seamlessly.
Overview
HeliosDB FDW provides a comprehensive system for querying external data sources as if they were native tables. This enables federated queries across multiple data sources, data lake analytics, and unified data access.
Supported Data Sources
- S3 - Query Parquet and CSV files on Amazon S3
- PostgreSQL - Full SQL query passthrough with connection pooling
- MySQL - MySQL database queries with type mapping
- MongoDB - NoSQL document queries with BSON mapping
- REST APIs - HTTP-based data sources with JSON parsing
Features
Core Capabilities
- Unified API - Single interface for all data sources
- Type Mapping - Automatic type conversion between source and Arrow types
- Predicate Pushdown - Push filters to remote sources for performance
- Projection Pushdown - Read only required columns
- Connection Pooling - Efficient connection management for SQL databases
- Schema Inference - Automatic schema detection for CSV and JSON
- Partitioned Data - Support for partitioned S3 data (year=2025/month=01/)
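For a SQL source, projection pushdown means listing only the requested columns in the remote query. The sketch below is a standalone illustration, not HeliosDB's code. `build_select` and `quote_ident` are hypothetical helpers. Treating an empty column list as `*` is an assumption suggested by the `scan(table, &[], &[])` calls in the examples.

```rust
/// Quote a SQL identifier PostgreSQL-style: wrap it in double quotes and
/// double any embedded double quote.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Build a SELECT that fetches only the requested columns.
/// Hypothetical convention for this sketch: an empty list means all columns.
fn build_select(schema: &str, table: &str, columns: &[&str]) -> String {
    let projection = if columns.is_empty() {
        "*".to_string()
    } else {
        columns
            .iter()
            .map(|c| quote_ident(c))
            .collect::<Vec<_>>()
            .join(", ")
    };
    format!(
        "SELECT {} FROM {}.{}",
        projection,
        quote_ident(schema),
        quote_ident(table)
    )
}
```

Quoting every identifier keeps mixed-case or reserved column names intact when they are sent to the remote database.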
Performance Optimizations
- Predicate pushdown to reduce data transfer
- Column projection to minimize I/O
- Connection pooling for SQL databases
- Parallel file reading for S3
- Arrow columnar format for efficient data handling
Quick Start
Installation
Add to your Cargo.toml:
```toml
[dependencies]
heliosdb-fdw = "3.0.0"
# The examples below use the Tokio runtime.
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```
Basic Usage
```rust
use heliosdb_fdw::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create server configuration
    let config = ServerConfig::new("pg_server", "postgres")
        .with_option("host", "localhost")
        .with_option("database", "mydb")
        .with_option("username", "user")
        .with_option("password", "pass");

    // Create FDW instance
    let fdw = postgres_fdw::PostgresFdw::new(&config).await?;

    // Query data
    let batch = fdw.scan("users", &[], &[]).await?;
    println!("Retrieved {} rows", batch.num_rows());

    Ok(())
}
```
Data Source Configuration
S3 FDW
```rust
let config = ServerConfig::new("s3_server", "s3")
    .with_option("bucket", "my-data-bucket")
    .with_option("region", "us-west-2");

let fdw = s3_fdw::S3Fdw::new(&config).await?;

// Query Parquet file
let batch = fdw.scan("data/events.parquet", &[], &[]).await?;

// Query multiple files
let batch = fdw.scan("logs/*.parquet", &[], &[]).await?;

// Query CSV file
let batch = fdw.scan("data/users.csv", &[], &[]).await?;
```
Configuration Options:
- bucket (required) - S3 bucket name
- region (optional) - AWS region (default: us-east-1)
Supported Formats:
- Parquet (.parquet)
- CSV (.csv)
Features:
- Column projection
- Wildcard file matching
- Schema inference for CSV
- Partitioned data support
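Wildcard file matching amounts to listing the bucket's object keys and keeping the ones that match the pattern. This is a minimal sketch with a hypothetical `glob_match` helper. It assumes `*` stops at `/`, as in a shell, which may differ from how the S3 FDW actually expands patterns.

```rust
/// Returns true if `key` matches `pattern`, where `*` matches any run of
/// characters except `/` (so `logs/*.parquet` does not descend into subfolders).
fn glob_match(pattern: &str, key: &str) -> bool {
    let p: Vec<char> = pattern.chars().collect();
    let k: Vec<char> = key.chars().collect();
    matches(&p, &k)
}

fn matches(p: &[char], k: &[char]) -> bool {
    match p.first() {
        None => k.is_empty(),
        // `*` either matches nothing, or consumes one non-'/' char and stays active.
        Some('*') => matches(&p[1..], k) || (!k.is_empty() && k[0] != '/' && matches(p, &k[1..])),
        // Any other character must match literally.
        Some(c) => !k.is_empty() && k[0] == *c && matches(&p[1..], &k[1..]),
    }
}

/// Filter an object listing down to the keys that match the pattern.
fn filter_keys<'a>(pattern: &str, keys: &[&'a str]) -> Vec<&'a str> {
    keys.iter().copied().filter(|k| glob_match(pattern, k)).collect()
}
```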
PostgreSQL FDW
```rust
let config = ServerConfig::new("pg_server", "postgres")
    .with_option("host", "localhost")
    .with_option("port", "5432")
    .with_option("database", "mydb")
    .with_option("username", "postgres")
    .with_option("password", "secret")
    .with_option("schema", "public");

let fdw = postgres_fdw::PostgresFdw::new(&config).await?;

// Query with predicates
let predicates = vec![
    Predicate::Gt("age".to_string(), Value::Int32(18)),
    Predicate::Eq("status".to_string(), Value::String("active".to_string())),
];

let batch = fdw.scan("users", &[], &predicates).await?;
```
Configuration Options:
- host (optional) - Database host (default: localhost)
- port (optional) - Port (default: 5432)
- database (required) - Database name
- username (required) - Username
- password (optional) - Password
- schema (optional) - Schema name (default: public)
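These options map naturally onto a libpq keyword/value connection string. This is a minimal sketch that assumes the options arrive as a plain `HashMap`; the real `ServerConfig` storage is not shown here. `pg_conn_string` is a hypothetical helper that applies the documented defaults and rejects a missing `database` or `username`.

```rust
use std::collections::HashMap;

/// Build a libpq-style connection string from FDW server options.
/// Values containing spaces or quotes would need libpq quoting, omitted here.
fn pg_conn_string(opts: &HashMap<&str, &str>) -> Result<String, String> {
    // Look up a required option or report which one is missing.
    let required = |key: &str| {
        opts.get(key)
            .copied()
            .ok_or_else(|| format!("missing required option `{}`", key))
    };
    let database = required("database")?;
    let username = required("username")?;
    // Documented defaults for the optional settings.
    let host = opts.get("host").copied().unwrap_or("localhost");
    let port = opts.get("port").copied().unwrap_or("5432");

    let mut parts = vec![
        format!("host={}", host),
        format!("port={}", port),
        format!("dbname={}", database),
        format!("user={}", username),
    ];
    if let Some(pw) = opts.get("password") {
        parts.push(format!("password={}", pw));
    }
    Ok(parts.join(" "))
}
```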
Features:
- Full SQL predicate pushdown
- Connection pooling (10 connections)
- Transaction support
- INSERT/UPDATE/DELETE operations
- Type mapping for all PostgreSQL types
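Type mapping pairs each PostgreSQL column type with an Arrow type. The table in this sketch follows common conventions (for example, PostgreSQL timestamps have microsecond resolution) and is an assumption, not the crate's actual mapping. `pg_to_arrow` is a hypothetical helper that returns Arrow type names as strings.

```rust
/// Map a PostgreSQL type name to the Arrow type it would most naturally become.
/// Returns None for types this sketch does not cover (numeric, uuid, json, ...).
fn pg_to_arrow(pg_type: &str) -> Option<&'static str> {
    let t = pg_type.to_ascii_lowercase();
    let arrow = match t.as_str() {
        "smallint" | "int2" => "Int16",
        "integer" | "int" | "int4" => "Int32",
        "bigint" | "int8" => "Int64",
        "real" | "float4" => "Float32",
        "double precision" | "float8" => "Float64",
        "boolean" | "bool" => "Boolean",
        "text" | "varchar" | "character varying" | "char" | "bpchar" => "Utf8",
        "bytea" => "Binary",
        "date" => "Date32",
        // PostgreSQL stores timestamps with microsecond resolution.
        "timestamp" | "timestamp without time zone" => "Timestamp(Microsecond, None)",
        "timestamptz" | "timestamp with time zone" => "Timestamp(Microsecond, Some(\"UTC\"))",
        _ => return None,
    };
    Some(arrow)
}
```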
MySQL FDW
```rust
let config = ServerConfig::new("mysql_server", "mysql")
    .with_option("host", "localhost")
    .with_option("port", "3306")
    .with_option("database", "mydb")
    .with_option("username", "root")
    .with_option("password", "secret");

let fdw = mysql_fdw::MySqlFdw::new(&config).await?;
```
Configuration Options:
- host (optional) - Database host (default: localhost)
- port (optional) - Port (default: 3306)
- database (required) - Database name
- username (required) - Username
- password (optional) - Password
Features:
- SQL predicate pushdown
- Connection pooling
- Full CRUD operations
- Type mapping for MySQL types
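Both SQL FDWs list connection pooling: a fixed set of open connections is lent out and returned instead of opening one per query. This sketch is synchronous and uses only `std::sync` for illustration. The hypothetical `Pool` type blocks on a `Condvar` when every connection is busy. An async FDW would more likely await an async pool instead.

```rust
use std::sync::{Condvar, Mutex};

/// Fixed-size pool: connections are created up front and lent out one at a time.
struct Pool<C> {
    idle: Mutex<Vec<C>>,
    available: Condvar,
}

/// A borrowed connection that goes back to the pool when dropped.
struct PooledConn<'a, C> {
    pool: &'a Pool<C>,
    conn: Option<C>,
}

impl<C> Pool<C> {
    fn new(size: usize, mut connect: impl FnMut() -> C) -> Self {
        let idle: Vec<C> = (0..size).map(|_| connect()).collect();
        Pool { idle: Mutex::new(idle), available: Condvar::new() }
    }

    fn get(&self) -> PooledConn<'_, C> {
        let mut idle = self.idle.lock().unwrap();
        // Wait (releasing the lock) until another caller returns a connection.
        while idle.is_empty() {
            idle = self.available.wait(idle).unwrap();
        }
        let conn = idle.pop();
        PooledConn { pool: self, conn }
    }

    fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl<C> std::ops::Deref for PooledConn<'_, C> {
    type Target = C;
    fn deref(&self) -> &C {
        self.conn.as_ref().expect("connection present until drop")
    }
}

impl<C> Drop for PooledConn<'_, C> {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            self.pool.idle.lock().unwrap().push(conn);
            // Wake one caller blocked in `get`.
            self.pool.available.notify_one();
        }
    }
}
```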
MongoDB FDW
```rust
let config = ServerConfig::new("mongo_server", "mongodb")
    .with_option("host", "localhost")
    .with_option("port", "27017")
    .with_option("database", "mydb")
    .with_option("username", "user")
    .with_option("password", "pass");

let fdw = mongodb_fdw::MongoDbFdw::new(&config).await?;

// Query with MongoDB-style predicates
let predicates = vec![
    Predicate::Gt("age".to_string(), Value::Int32(18)),
];

let batch = fdw.scan("users", &[], &predicates).await?;
```
Configuration Options:
- host (optional) - MongoDB host (default: localhost)
- port (optional) - Port (default: 27017)
- database (required) - Database name
- username (optional) - Username
- password (optional) - Password
Features:
- MongoDB query translation
- BSON to Arrow mapping
- Schema inference from documents
- Nested document flattening to JSON
- Full CRUD operations
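MongoDB query translation turns the shared predicate tree into a filter document. The sketch below uses hypothetical stand-ins for a subset of `Predicate` and `Value` and emits JSON text rather than BSON. It shows the shape of the translation, not the crate's implementation. MongoDB's `$not` applies only to a single field's operator expression, so this sketch writes a top-level negation with `$nor`.

```rust
/// Hypothetical stand-ins for a subset of the crate's `Value` and `Predicate`.
enum Value { Int32(i32), String(String), Boolean(bool) }

enum Predicate {
    Eq(String, Value),
    Gt(String, Value),
    Lt(String, Value),
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
    Not(Box<Predicate>),
}

fn value_json(v: &Value) -> String {
    match v {
        Value::Int32(i) => i.to_string(),
        Value::Boolean(b) => b.to_string(),
        // Minimal escaping: backslash and double quote only.
        Value::String(s) => format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\"")),
    }
}

/// `{"field": {"$op": value}}`
fn field_op(field: &str, op: &str, v: &Value) -> String {
    format!("{{\"{}\": {{\"{}\": {}}}}}", field, op, value_json(v))
}

fn clause_list(ps: &[Predicate]) -> String {
    ps.iter().map(to_mongo_filter).collect::<Vec<_>>().join(", ")
}

/// Translate a predicate into a MongoDB filter document (as JSON text).
fn to_mongo_filter(p: &Predicate) -> String {
    match p {
        Predicate::Eq(f, v) => field_op(f, "$eq", v),
        Predicate::Gt(f, v) => field_op(f, "$gt", v),
        Predicate::Lt(f, v) => field_op(f, "$lt", v),
        Predicate::And(ps) => format!("{{\"$and\": [{}]}}", clause_list(ps)),
        Predicate::Or(ps) => format!("{{\"$or\": [{}]}}", clause_list(ps)),
        // Top-level negation: "none of these clauses match".
        Predicate::Not(inner) => format!("{{\"$nor\": [{}]}}", to_mongo_filter(inner)),
    }
}
```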
REST API FDW
```rust
let config = ServerConfig::new("api_server", "rest")
    .with_option("base_url", "https://api.example.com")
    .with_option("auth_type", "bearer")
    .with_option("token", "your-api-token");

let fdw = rest_fdw::RestFdw::new(&config).await?;

// Query API endpoint
let batch = fdw.scan("users", &[], &[]).await?;
```
Configuration Options:
- base_url (required) - Base API URL
- auth_type (optional) - Authentication type: none, bearer, api_key, basic
- token (for bearer auth) - Bearer token
- auth_header (for api_key) - Header name for the API key
- api_key (for api_key) - API key value
- username (for basic) - Username
- password (for basic) - Password
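Each auth option set reduces to one HTTP header per request. This is a minimal sketch with a hypothetical `auth_header` helper. It assumes `auth_type` defaults to `none` and that `auth_header` must be supplied for `api_key`; neither default is stated above. Basic auth is the standard `Authorization: Basic` header carrying base64 of `username:password` (RFC 7617), encoded here with a small std-only encoder.

```rust
use std::collections::HashMap;

/// Standard base64 (RFC 4648) with `=` padding.
fn base64(input: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::new();
    for chunk in input.chunks(3) {
        let b = [chunk[0], *chunk.get(1).unwrap_or(&0), *chunk.get(2).unwrap_or(&0)];
        let n = (u32::from(b[0]) << 16) | (u32::from(b[1]) << 8) | u32::from(b[2]);
        // A chunk of len bytes yields len + 1 output characters, then padding.
        for i in 0..4 {
            if i <= chunk.len() {
                out.push(ALPHABET[((n >> (18 - 6 * i)) & 0x3F) as usize] as char);
            } else {
                out.push('=');
            }
        }
    }
    out
}

/// (header name, header value) implied by the server options, if any.
fn auth_header(opts: &HashMap<&str, &str>) -> Result<Option<(String, String)>, String> {
    let get = |k: &str| opts.get(k).copied().ok_or_else(|| format!("auth option `{}` is required", k));
    match opts.get("auth_type").copied().unwrap_or("none") {
        "none" => Ok(None),
        "bearer" => Ok(Some(("Authorization".to_string(), format!("Bearer {}", get("token")?)))),
        "api_key" => Ok(Some((get("auth_header")?.to_string(), get("api_key")?.to_string()))),
        "basic" => {
            let creds = format!("{}:{}", get("username")?, get("password")?);
            Ok(Some(("Authorization".to_string(), format!("Basic {}", base64(creds.as_bytes())))))
        }
        other => Err(format!("unknown auth_type `{}`", other)),
    }
}
```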
Supported Auth Types:
- None (no authentication)
- Bearer token
- API key in header
- Basic authentication
Features:
- JSON response parsing
- Schema inference
- POST for inserts
- DELETE by ID
- Automatic pagination (coming soon)
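Schema inference (listed for REST responses and for CSV on S3) can work by guessing a type for each sample value and widening across the column. This sketch describes an assumed approach, not the crate's algorithm. `ColType`, `infer_value`, `widen`, and `infer_column` are hypothetical, empty strings count as NULL, and any conflicting mix falls back to text.

```rust
/// Candidate column types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColType { Boolean, Int64, Float64, Utf8 }

/// Most specific type a single non-empty value parses as.
/// (Rust's f64 parser also accepts forms such as "inf" and "NaN".)
fn infer_value(s: &str) -> ColType {
    if s.eq_ignore_ascii_case("true") || s.eq_ignore_ascii_case("false") {
        ColType::Boolean
    } else if s.parse::<i64>().is_ok() {
        ColType::Int64
    } else if s.parse::<f64>().is_ok() {
        ColType::Float64
    } else {
        ColType::Utf8
    }
}

/// Combine two observed types: integers widen to floats, other mixes become text.
fn widen(a: ColType, b: ColType) -> ColType {
    use ColType::*;
    match (a, b) {
        (x, y) if x == y => x,
        (Int64, Float64) | (Float64, Int64) => Float64,
        _ => Utf8,
    }
}

/// Returns (type, nullable). An all-empty column defaults to Utf8.
fn infer_column(samples: &[&str]) -> (ColType, bool) {
    let nullable = samples.iter().any(|s| s.is_empty());
    let ty = samples
        .iter()
        .filter(|s| !s.is_empty())
        .map(|s| infer_value(s))
        .reduce(widen)
        .unwrap_or(ColType::Utf8);
    (ty, nullable)
}
```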
Predicates and Filtering
The FDW system supports rich predicate pushdown for optimal performance:
```rust
use heliosdb_fdw::*;

// Simple equality
let pred = Predicate::Eq("id".to_string(), Value::Int32(42));

// Comparison operators
let pred = Predicate::Gt("age".to_string(), Value::Int32(18));
let pred = Predicate::Gte("score".to_string(), Value::Float64(80.0));
let pred = Predicate::Lt("price".to_string(), Value::Float64(100.0));
let pred = Predicate::Lte("quantity".to_string(), Value::Int32(10));

// LIKE pattern matching
let pred = Predicate::Like("name".to_string(), "John%".to_string());

// IN clause
let pred = Predicate::In(
    "status".to_string(),
    vec![
        Value::String("active".to_string()),
        Value::String("pending".to_string()),
    ],
);

// NULL checks
let pred = Predicate::IsNull("deleted_at".to_string());
let pred = Predicate::IsNotNull("email".to_string());

// Logical operators
let pred = Predicate::And(vec![
    Predicate::Gt("age".to_string(), Value::Int32(18)),
    Predicate::Lt("age".to_string(), Value::Int32(65)),
]);

let pred = Predicate::Or(vec![
    Predicate::Eq("status".to_string(), Value::String("active".to_string())),
    Predicate::Eq("status".to_string(), Value::String("pending".to_string())),
]);

let pred = Predicate::Not(Box::new(
    Predicate::Eq("deleted".to_string(), Value::Boolean(true)),
));
```
Pushdown Capabilities
Different FDWs support different optimization capabilities:
| FDW | Predicates | Aggregations | Limit | Joins | Order By | Projection |
|---|---|---|---|---|---|---|
| S3 | ⚠ Partial | ❌ | ❌ | ❌ | ❌ | |
| PostgreSQL | ||||||
| MySQL | ||||||
| MongoDB | ❌ | ❌ | ❌ | |||
| REST | ❌ | ❌ | ❌ | ❌ | ❌ |
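For the SQL sources, predicate pushdown amounts to rendering the predicate tree as a `WHERE` clause. This sketch mirrors the `Predicate`/`Value` variants from the filtering examples with hypothetical stand-in types. It binds literals as PostgreSQL `$n` parameters rather than splicing them into the SQL text; MySQL would use `?` placeholders and backtick-quoted identifiers instead. Empty `AND`/`OR`/`IN` lists render as `TRUE`/`FALSE` so the output stays valid SQL.

```rust
/// Hypothetical stand-ins mirroring the `Value`/`Predicate` variants shown above.
#[derive(Debug, Clone, PartialEq)]
enum Value { Int32(i32), Float64(f64), String(String), Boolean(bool) }

enum Predicate {
    Eq(String, Value), Gt(String, Value), Gte(String, Value),
    Lt(String, Value), Lte(String, Value),
    Like(String, String),
    In(String, Vec<Value>),
    IsNull(String), IsNotNull(String),
    And(Vec<Predicate>), Or(Vec<Predicate>), Not(Box<Predicate>),
}

fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Store a literal as a bind parameter and return its `$n` placeholder.
fn bind(v: &Value, params: &mut Vec<Value>) -> String {
    params.push(v.clone());
    format!("${}", params.len())
}

fn cmp(col: &str, op: &str, v: &Value, params: &mut Vec<Value>) -> String {
    format!("{} {} {}", quote_ident(col), op, bind(v, params))
}

fn join_clauses(ps: &[Predicate], sep: &str, empty: &str, params: &mut Vec<Value>) -> String {
    if ps.is_empty() {
        return empty.to_string();
    }
    let parts: Vec<String> = ps.iter().map(|q| to_sql(q, params)).collect();
    format!("({})", parts.join(sep))
}

/// Render `p` as SQL, appending literal values to `params` in placeholder order.
fn to_sql(p: &Predicate, params: &mut Vec<Value>) -> String {
    match p {
        Predicate::Eq(c, v) => cmp(c, "=", v, params),
        Predicate::Gt(c, v) => cmp(c, ">", v, params),
        Predicate::Gte(c, v) => cmp(c, ">=", v, params),
        Predicate::Lt(c, v) => cmp(c, "<", v, params),
        Predicate::Lte(c, v) => cmp(c, "<=", v, params),
        Predicate::Like(c, pat) => cmp(c, "LIKE", &Value::String(pat.clone()), params),
        Predicate::In(c, vs) if vs.is_empty() => { let _ = c; "FALSE".to_string() }
        Predicate::In(c, vs) => {
            let ph: Vec<String> = vs.iter().map(|v| bind(v, params)).collect();
            format!("{} IN ({})", quote_ident(c), ph.join(", "))
        }
        Predicate::IsNull(c) => format!("{} IS NULL", quote_ident(c)),
        Predicate::IsNotNull(c) => format!("{} IS NOT NULL", quote_ident(c)),
        Predicate::And(ps) => join_clauses(ps, " AND ", "TRUE", params),
        Predicate::Or(ps) => join_clauses(ps, " OR ", "FALSE", params),
        Predicate::Not(inner) => format!("NOT ({})", to_sql(inner, params)),
    }
}
```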
Registry and Server Management
Use the global registry for managing FDW instances:
```rust
use heliosdb_fdw::*;

// Initialize built-in FDWs
init_builtin_fdws()?;

// Get the global registry
let registry = global_registry();

// Create a server
let config = ServerConfig::new("my_pg", "postgres")
    .with_option("host", "localhost")
    .with_option("database", "mydb")
    .with_option("username", "user");

registry.create_server(config)?;

// Get a server instance
let fdw = registry.get_server("my_pg")?;

// List all servers
let servers = registry.list_servers()?;

// Drop a server
registry.drop_server("my_pg")?;
```
Examples
See the examples/ directory for complete working examples:
- s3_query.rs - Query Parquet and CSV files on S3
- postgres_fdw.rs - Complete PostgreSQL integration example
Run examples:
```shell
cargo run --example s3_query
cargo run --example postgres_fdw
```
Testing
Run the test suite:
```shell
# Run all tests
cargo test

# Run with output
cargo test -- --nocapture

# Run integration tests only
cargo test --test integration_test

# Run with coverage (requires cargo-tarpaulin)
cargo tarpaulin --out Html
```
Architecture
See ARCHITECTURE.md for detailed design documentation.
Performance Tips
- Use projection - Only request columns you need
- Push down predicates - Filter at the source when possible
- Batch operations - Use Arrow RecordBatch for bulk operations
- Connection pooling - Reuse connections for SQL databases
- Partition pruning - Use partitioned S3 data for better performance
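Partition pruning works on Hive-style paths such as `year=2025/month=01/`. Files whose partition values contradict an equality filter are skipped before any data is read. This is a minimal sketch with hypothetical `partition_values` and `prune` helpers; the source does not say whether the S3 FDW prunes in exactly this way.

```rust
use std::collections::HashMap;

/// Extract `key=value` path segments, e.g. from `events/year=2025/month=01/a.parquet`.
fn partition_values(key: &str) -> HashMap<&str, &str> {
    key.split('/').filter_map(|seg| seg.split_once('=')).collect()
}

/// Keep files whose partition values satisfy every equality filter.
/// Files that lack a filtered partition key are kept, since they cannot be ruled out.
fn prune<'a>(keys: &[&'a str], filters: &[(&str, &str)]) -> Vec<&'a str> {
    keys.iter()
        .copied()
        .filter(|key| {
            let parts = partition_values(key);
            filters
                .iter()
                .all(|(col, want)| parts.get(col).map_or(true, |have| have == want))
        })
        .collect()
}
```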
Limitations
- S3 FDW is read-only (no INSERT/UPDATE/DELETE)
- REST FDW supports limited UPDATE operations
- MongoDB aggregations not yet implemented
- S3 predicate pushdown limited to Parquet statistics
Future Enhancements
- Delta Lake support for S3
- Apache Iceberg table format
- Snowflake connector
- BigQuery connector
- More advanced aggregation pushdown
- Cost-based query optimization
- Parallel query execution
License
MIT OR Apache-2.0
Contributing
Contributions welcome! Please see CONTRIBUTING.md for guidelines.