HeliosDB Foreign Data Wrappers (FDW)

Foreign Data Wrappers for HeliosDB v3.0 - Query external data sources seamlessly.

Overview

HeliosDB FDW lets you query external data sources as if they were native tables. This enables federated queries across multiple sources, analytics over data lakes, and a single access layer for heterogeneous data.

Supported Data Sources

  • S3 - Query Parquet and CSV files on Amazon S3
  • PostgreSQL - Full SQL query passthrough with connection pooling
  • MySQL - MySQL database queries with type mapping
  • MongoDB - NoSQL document queries with BSON mapping
  • REST APIs - HTTP-based data sources with JSON parsing

Features

Core Capabilities

  • Unified API - Single interface for all data sources
  • Type Mapping - Automatic type conversion between source and Arrow types
  • Predicate Pushdown - Push filters to remote sources for performance
  • Projection Pushdown - Read only required columns
  • Connection Pooling - Efficient connection management for SQL databases
  • Schema Inference - Automatic schema detection for CSV and JSON
  • Partitioned Data - Support for partitioned S3 data (year=2025/month=01/)
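
To make the schema inference capability concrete, here is a minimal sketch of how column types might be inferred from CSV text. It is not the heliosdb-fdw implementation: the `InferredType` enum and the `infer_csv_schema` function are hypothetical names, the type set is deliberately small, and quoted CSV fields are out of scope.

```rust
/// Column types this sketch can infer (hypothetical; not the heliosdb-fdw type set).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InferredType {
    Boolean,
    Int64,
    Float64,
    Utf8,
}

/// Infer the narrowest type that fits a single non-empty cell.
fn infer_cell(cell: &str) -> InferredType {
    let cell = cell.trim();
    if cell.eq_ignore_ascii_case("true") || cell.eq_ignore_ascii_case("false") {
        InferredType::Boolean
    } else if cell.parse::<i64>().is_ok() {
        InferredType::Int64
    } else if cell.parse::<f64>().is_ok() {
        InferredType::Float64
    } else {
        InferredType::Utf8
    }
}

/// Merge two observations: equal types stay, Int64 and Float64 widen to Float64,
/// and any other mix falls back to Utf8.
fn widen(a: InferredType, b: InferredType) -> InferredType {
    use InferredType::*;
    match (a, b) {
        (x, y) if x == y => x,
        (Int64, Float64) | (Float64, Int64) => Float64,
        _ => Utf8,
    }
}

/// Infer (column name, type) pairs from CSV text whose first line is a header.
/// Splits on commas only; empty cells are treated as NULL and skipped.
pub fn infer_csv_schema(csv: &str) -> Vec<(String, InferredType)> {
    let mut lines = csv.lines().filter(|l| !l.trim().is_empty());
    let header: Vec<String> = match lines.next() {
        Some(h) => h.split(',').map(|s| s.trim().to_string()).collect(),
        None => return Vec::new(),
    };
    let mut types: Vec<Option<InferredType>> = vec![None; header.len()];
    for line in lines {
        for (i, cell) in line.split(',').enumerate().take(header.len()) {
            if cell.trim().is_empty() {
                continue;
            }
            let t = infer_cell(cell);
            types[i] = Some(match types[i] {
                Some(prev) => widen(prev, t),
                None => t,
            });
        }
    }
    // Columns with no observed values default to Utf8.
    header
        .into_iter()
        .zip(types)
        .map(|(name, t)| (name, t.unwrap_or(InferredType::Utf8)))
        .collect()
}
```

Calling `infer_csv_schema` on a small sample of the file, rather than the whole object, keeps inference cheap; the trade-off is that a later row can contain a value the inferred type does not fit.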

Performance Optimizations

  • Predicate pushdown to reduce data transfer
  • Column projection to minimize I/O
  • Connection pooling for SQL databases
  • Parallel file reading for S3
  • Arrow columnar format for efficient data handling

Quick Start

Installation

Add to your Cargo.toml:

[dependencies]
heliosdb-fdw = "3.0.0"

Basic Usage

use heliosdb_fdw::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create server configuration
    let config = ServerConfig::new("pg_server", "postgres")
        .with_option("host", "localhost")
        .with_option("database", "mydb")
        .with_option("username", "user")
        .with_option("password", "pass");

    // Create FDW instance
    let fdw = postgres_fdw::PostgresFdw::new(&config).await?;

    // Query data
    let batch = fdw.scan("users", &[], &[]).await?;
    println!("Retrieved {} rows", batch.num_rows());

    Ok(())
}

Data Source Configuration

S3 FDW

let config = ServerConfig::new("s3_server", "s3")
    .with_option("bucket", "my-data-bucket")
    .with_option("region", "us-west-2");
let fdw = s3_fdw::S3Fdw::new(&config).await?;

// Query Parquet file
let batch = fdw.scan("data/events.parquet", &[], &[]).await?;

// Query multiple files
let batch = fdw.scan("logs/*.parquet", &[], &[]).await?;

// Query CSV file
let batch = fdw.scan("data/users.csv", &[], &[]).await?;

Configuration Options:

  • bucket (required) - S3 bucket name
  • region (optional) - AWS region (default: us-east-1)

Supported Formats:

  • Parquet (.parquet)
  • CSV (.csv)

Features:

  • Column projection
  • Wildcard file matching
  • Schema inference for CSV
  • Partitioned data support
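
The wildcard file matching listed above can be pictured with a small glob matcher. This is a sketch under assumptions, not the S3 FDW's actual matcher: `glob_match` and `matching_keys` are hypothetical names, only `*` is supported, and `*` is assumed to match any run of characters, including `/`.

```rust
/// Return true if `text` matches `pattern`, where `*` matches any run of
/// characters (possibly empty). All other characters match literally.
pub fn glob_match(pattern: &str, text: &str) -> bool {
    let p: Vec<char> = pattern.chars().collect();
    let t: Vec<char> = text.chars().collect();
    let (mut pi, mut ti) = (0usize, 0usize);
    // Position of the most recent `*` in the pattern and the text index
    // where it started matching, used for backtracking.
    let mut star: Option<(usize, usize)> = None;

    while ti < t.len() {
        if pi < p.len() && p[pi] == '*' {
            star = Some((pi, ti));
            pi += 1;
        } else if pi < p.len() && p[pi] == t[ti] {
            pi += 1;
            ti += 1;
        } else if let Some((sp, st)) = star {
            // Let the last `*` absorb one more character and retry.
            pi = sp + 1;
            ti = st + 1;
            star = Some((sp, st + 1));
        } else {
            return false;
        }
    }
    // Any pattern characters left over must all be `*`.
    p[pi..].iter().all(|&c| c == '*')
}

/// Keep only the object keys that match the pattern.
pub fn matching_keys<'a>(pattern: &str, keys: &[&'a str]) -> Vec<&'a str> {
    keys.iter().copied().filter(|k| glob_match(pattern, k)).collect()
}
```

With a listing of `logs/2025-01.parquet`, `logs/notes.csv`, and `data/events.parquet`, the pattern `logs/*.parquet` selects only the first key.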

PostgreSQL FDW

let config = ServerConfig::new("pg_server", "postgres")
    .with_option("host", "localhost")
    .with_option("port", "5432")
    .with_option("database", "mydb")
    .with_option("username", "postgres")
    .with_option("password", "secret")
    .with_option("schema", "public");
let fdw = postgres_fdw::PostgresFdw::new(&config).await?;

// Query with predicates
let predicates = vec![
    Predicate::Gt("age".to_string(), Value::Int32(18)),
    Predicate::Eq("status".to_string(), Value::String("active".to_string())),
];
let batch = fdw.scan("users", &[], &predicates).await?;

Configuration Options:

  • host (optional) - Database host (default: localhost)
  • port (optional) - Port (default: 5432)
  • database (required) - Database name
  • username (required) - Username
  • password (optional) - Password
  • schema (optional) - Schema name (default: public)

Features:

  • Full SQL predicate pushdown
  • Connection pooling (10 connections)
  • Transaction support
  • INSERT/UPDATE/DELETE operations
  • Type mapping for all PostgreSQL types
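
The required and optional options above can be illustrated by a function that resolves them into a libpq-style keyword/value connection string. This is a sketch, not how heliosdb-fdw builds its connections: `postgres_conn_string` is a hypothetical helper, the options are modeled as a plain `HashMap`, the `schema` option is ignored, and values containing spaces or quotes would need extra quoting.

```rust
use std::collections::HashMap;

/// Build a `host=... port=... dbname=... user=...` connection string from
/// FDW-style options, applying the documented defaults for host and port.
pub fn postgres_conn_string(options: &HashMap<String, String>) -> Result<String, String> {
    // Look up an option that must be present.
    let required = |key: &str| {
        options
            .get(key)
            .cloned()
            .ok_or_else(|| format!("missing required option `{key}`"))
    };

    let host = options.get("host").map(String::as_str).unwrap_or("localhost");
    let port: u16 = options
        .get("port")
        .map(String::as_str)
        .unwrap_or("5432")
        .parse()
        .map_err(|e| format!("invalid port: {e}"))?;
    let database = required("database")?;
    let username = required("username")?;

    let mut conn = format!("host={host} port={port} dbname={database} user={username}");
    // The password is optional, so it is appended only when supplied.
    if let Some(password) = options.get("password") {
        conn.push_str(&format!(" password={password}"));
    }
    Ok(conn)
}
```

Validating options once, at server creation time, means a missing `database` or a malformed `port` surfaces as a configuration error rather than as a failed query later.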

MySQL FDW

let config = ServerConfig::new("mysql_server", "mysql")
    .with_option("host", "localhost")
    .with_option("port", "3306")
    .with_option("database", "mydb")
    .with_option("username", "root")
    .with_option("password", "secret");
let fdw = mysql_fdw::MySqlFdw::new(&config).await?;

Configuration Options:

  • host (optional) - Database host (default: localhost)
  • port (optional) - Port (default: 3306)
  • database (required) - Database name
  • username (required) - Username
  • password (optional) - Password

Features:

  • SQL predicate pushdown
  • Connection pooling
  • Full CRUD operations
  • Type mapping for MySQL types

MongoDB FDW

let config = ServerConfig::new("mongo_server", "mongodb")
    .with_option("host", "localhost")
    .with_option("port", "27017")
    .with_option("database", "mydb")
    .with_option("username", "user")
    .with_option("password", "pass");
let fdw = mongodb_fdw::MongoDbFdw::new(&config).await?;

// Query with MongoDB-style predicates
let predicates = vec![
    Predicate::Gt("age".to_string(), Value::Int32(18)),
];
let batch = fdw.scan("users", &[], &predicates).await?;

Configuration Options:

  • host (optional) - MongoDB host (default: localhost)
  • port (optional) - Port (default: 27017)
  • database (required) - Database name
  • username (optional) - Username
  • password (optional) - Password

Features:

  • MongoDB query translation
  • BSON to Arrow mapping
  • Schema inference from documents
  • Nested document flattening to JSON
  • Full CRUD operations
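
To show what MongoDB query translation could look like, the sketch below turns a predicate tree into a MongoDB filter document rendered as JSON text. The `Predicate` and `Value` enums here are local stand-ins that mirror a subset of the variant names used in this document, and `to_mongo_filter` is a hypothetical function; the real crate may translate differently (for example, into BSON directly). The operators `$eq`, `$gt`, `$lt`, `$in`, `$and`, and `$or` are standard MongoDB query operators.

```rust
/// Local stand-ins for a subset of the predicate types shown in this document.
#[derive(Debug, Clone)]
pub enum Value {
    Int32(i32),
    String(String),
    Boolean(bool),
}

#[derive(Debug, Clone)]
pub enum Predicate {
    Eq(String, Value),
    Gt(String, Value),
    Lt(String, Value),
    In(String, Vec<Value>),
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
}

/// Quote and escape a string as a JSON string literal.
fn json_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

fn value_json(v: &Value) -> String {
    match v {
        Value::Int32(n) => n.to_string(),
        Value::Boolean(b) => b.to_string(),
        Value::String(s) => json_string(s),
    }
}

/// Render `{"field": {"$op": value}}`.
fn field_op(field: &str, op: &str, v: &Value) -> String {
    format!("{{{}:{{\"{}\":{}}}}}", json_string(field), op, value_json(v))
}

/// Render `{"$op": [filter, filter, ...]}`. MongoDB rejects an empty array
/// here, so callers should not pass an empty And/Or.
fn logical(op: &str, ps: &[Predicate]) -> String {
    let parts: Vec<String> = ps.iter().map(to_mongo_filter).collect();
    format!("{{\"{}\":[{}]}}", op, parts.join(","))
}

/// Translate a predicate tree into a MongoDB filter document (as JSON text).
pub fn to_mongo_filter(p: &Predicate) -> String {
    match p {
        Predicate::Eq(f, v) => field_op(f, "$eq", v),
        Predicate::Gt(f, v) => field_op(f, "$gt", v),
        Predicate::Lt(f, v) => field_op(f, "$lt", v),
        Predicate::In(f, vs) => {
            let items: Vec<String> = vs.iter().map(value_json).collect();
            format!("{{{}:{{\"$in\":[{}]}}}}", json_string(f), items.join(","))
        }
        Predicate::And(ps) => logical("$and", ps),
        Predicate::Or(ps) => logical("$or", ps),
    }
}
```

For instance, `Predicate::Gt("age".to_string(), Value::Int32(18))` renders as `{"age":{"$gt":18}}`, which is the shape a `find` call expects as its filter.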

REST API FDW

let config = ServerConfig::new("api_server", "rest")
    .with_option("base_url", "https://api.example.com")
    .with_option("auth_type", "bearer")
    .with_option("token", "your-api-token");
let fdw = rest_fdw::RestFdw::new(&config).await?;

// Query API endpoint
let batch = fdw.scan("users", &[], &[]).await?;

Configuration Options:

  • base_url (required) - Base API URL
  • auth_type (optional) - Authentication type: none, bearer, api_key, basic
  • token (for bearer auth) - Bearer token
  • auth_header (for api_key) - Header name for API key
  • api_key (for api_key) - API key value
  • username (for basic) - Username
  • password (for basic) - Password

Supported Auth Types:

  • None (no authentication)
  • Bearer token
  • API key in header
  • Basic authentication
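
The four auth types map naturally onto a single HTTP request header. The sketch below shows one way that mapping could be written; it is not the REST FDW's code. `auth_header` is a hypothetical helper, options are modeled as a `HashMap`, a missing `auth_type` is assumed to mean `none`, and a small Base64 encoder is included so the example has no dependencies.

```rust
use std::collections::HashMap;

/// Standard Base64 (RFC 4648) with padding, needed for HTTP Basic auth.
fn base64_encode(input: &[u8]) -> String {
    const TABLE: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        let b1 = *chunk.get(1).unwrap_or(&0);
        let b2 = *chunk.get(2).unwrap_or(&0);
        let n = (u32::from(chunk[0]) << 16) | (u32::from(b1) << 8) | u32::from(b2);
        out.push(TABLE[(n >> 18) as usize & 63] as char);
        out.push(TABLE[(n >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 { TABLE[(n >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { TABLE[n as usize & 63] as char } else { '=' });
    }
    out
}

/// Resolve the auth options into an optional (header name, header value) pair.
pub fn auth_header(
    options: &HashMap<String, String>,
) -> Result<Option<(String, String)>, String> {
    let get = |key: &str| {
        options
            .get(key)
            .cloned()
            .ok_or_else(|| format!("missing option `{key}`"))
    };
    // Assumption: an absent auth_type means no authentication.
    match options.get("auth_type").map(String::as_str).unwrap_or("none") {
        "none" => Ok(None),
        "bearer" => Ok(Some((
            "Authorization".to_string(),
            format!("Bearer {}", get("token")?),
        ))),
        "api_key" => Ok(Some((get("auth_header")?, get("api_key")?))),
        "basic" => {
            let credentials = format!("{}:{}", get("username")?, get("password")?);
            Ok(Some((
                "Authorization".to_string(),
                format!("Basic {}", base64_encode(credentials.as_bytes())),
            )))
        }
        other => Err(format!("unsupported auth_type `{other}`")),
    }
}
```

Returning the header as data, instead of attaching it to a request inside the function, keeps the sketch independent of any particular HTTP client.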

Features:

  • JSON response parsing
  • Schema inference
  • POST for inserts
  • DELETE by ID
  • Automatic pagination (coming soon)

Predicates and Filtering

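Predicates passed to scan are pushed down to the remote source where the FDW supports it, so rows are filtered before they are transferred. The following predicate forms are available: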

use heliosdb_fdw::*;

// Simple equality
let pred = Predicate::Eq("id".to_string(), Value::Int32(42));

// Comparison operators
let pred = Predicate::Gt("age".to_string(), Value::Int32(18));
let pred = Predicate::Gte("score".to_string(), Value::Float64(80.0));
let pred = Predicate::Lt("price".to_string(), Value::Float64(100.0));
let pred = Predicate::Lte("quantity".to_string(), Value::Int32(10));

// LIKE pattern matching
let pred = Predicate::Like("name".to_string(), "John%".to_string());

// IN clause
let pred = Predicate::In(
    "status".to_string(),
    vec![
        Value::String("active".to_string()),
        Value::String("pending".to_string()),
    ],
);

// NULL checks
let pred = Predicate::IsNull("deleted_at".to_string());
let pred = Predicate::IsNotNull("email".to_string());

// Logical operators
let pred = Predicate::And(vec![
    Predicate::Gt("age".to_string(), Value::Int32(18)),
    Predicate::Lt("age".to_string(), Value::Int32(65)),
]);
let pred = Predicate::Or(vec![
    Predicate::Eq("status".to_string(), Value::String("active".to_string())),
    Predicate::Eq("status".to_string(), Value::String("pending".to_string())),
]);
let pred = Predicate::Not(Box::new(
    Predicate::Eq("deleted".to_string(), Value::Boolean(true))
));
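
For a SQL source, pushing a predicate down amounts to rendering the predicate tree as a WHERE clause. The sketch below illustrates the idea with local stand-in `Predicate` and `Value` enums that mirror a subset of the variants above; `to_sql` is a hypothetical function and not the crate's translator. Identifiers are quoted PostgreSQL/ANSI style with double quotes (MySQL would normally use backticks), and literals are inlined only for readability: a production translator would more likely emit bind parameters.

```rust
/// Local stand-ins for a subset of the predicate types shown above.
#[derive(Debug, Clone)]
pub enum Value {
    Int32(i32),
    String(String),
    Boolean(bool),
}

#[derive(Debug, Clone)]
pub enum Predicate {
    Eq(String, Value),
    Gt(String, Value),
    Lt(String, Value),
    Like(String, String),
    In(String, Vec<Value>),
    IsNull(String),
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
    Not(Box<Predicate>),
}

/// Quote a column name, doubling any embedded double quotes.
fn ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Quote a text literal, doubling any embedded single quotes.
fn text(s: &str) -> String {
    format!("'{}'", s.replace('\'', "''"))
}

fn literal(v: &Value) -> String {
    match v {
        Value::Int32(n) => n.to_string(),
        Value::String(s) => text(s),
        Value::Boolean(true) => "TRUE".to_string(),
        Value::Boolean(false) => "FALSE".to_string(),
    }
}

/// Join sub-predicates with AND/OR; an empty list collapses to its identity.
fn join(ps: &[Predicate], sep: &str, empty: &str) -> String {
    if ps.is_empty() {
        return empty.to_string();
    }
    let parts: Vec<String> = ps.iter().map(|p| format!("({})", to_sql(p))).collect();
    parts.join(sep)
}

/// Render a predicate tree as the body of a SQL WHERE clause.
pub fn to_sql(p: &Predicate) -> String {
    match p {
        Predicate::Eq(c, v) => format!("{} = {}", ident(c), literal(v)),
        Predicate::Gt(c, v) => format!("{} > {}", ident(c), literal(v)),
        Predicate::Lt(c, v) => format!("{} < {}", ident(c), literal(v)),
        Predicate::Like(c, pat) => format!("{} LIKE {}", ident(c), text(pat)),
        // `IN ()` is not valid SQL, so an empty list becomes FALSE.
        Predicate::In(_, vs) if vs.is_empty() => "FALSE".to_string(),
        Predicate::In(c, vs) => {
            let items: Vec<String> = vs.iter().map(literal).collect();
            format!("{} IN ({})", ident(c), items.join(", "))
        }
        Predicate::IsNull(c) => format!("{} IS NULL", ident(c)),
        Predicate::And(ps) => join(ps, " AND ", "TRUE"),
        Predicate::Or(ps) => join(ps, " OR ", "FALSE"),
        Predicate::Not(inner) => format!("NOT ({})", to_sql(inner)),
    }
}
```

Parenthesizing every operand of AND, OR, and NOT is more verbose than necessary, but it removes any dependence on operator precedence in the target database.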

Pushdown Capabilities

Different FDWs support different optimization capabilities:

FDW         Predicates  Aggregations  Limit  Joins  Order By  Projection
S3          ⚠ Partial
PostgreSQL
MySQL
MongoDB
REST

Registry and Server Management

Use the global registry for managing FDW instances:

use heliosdb_fdw::*;

// Initialize built-in FDWs
init_builtin_fdws()?;

// Get the global registry
let registry = global_registry();

// Create a server
let config = ServerConfig::new("my_pg", "postgres")
    .with_option("host", "localhost")
    .with_option("database", "mydb")
    .with_option("username", "user");
registry.create_server(config)?;

// Get a server instance
let fdw = registry.get_server("my_pg")?;

// List all servers
let servers = registry.list_servers()?;

// Drop a server
registry.drop_server("my_pg")?;
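
As a rough mental model of what sits behind these calls, a process-wide registry can be a mutex-guarded map from server name to definition. The sketch below is an assumption about the design, not the crate's code: `ServerDef` and `Registry` are hypothetical types, the method names simply echo the ones used above, and the real `get_server` presumably returns a live FDW instance instead of a stored definition.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

/// Hypothetical stored definition of a foreign server.
#[derive(Debug, Clone, PartialEq)]
pub struct ServerDef {
    pub name: String,
    pub fdw_type: String,
    pub options: HashMap<String, String>,
}

/// Thread-safe, in-memory registry keyed by server name.
#[derive(Default)]
pub struct Registry {
    servers: Mutex<HashMap<String, ServerDef>>,
}

impl Registry {
    /// Register a server; fails if the name is already taken.
    pub fn create_server(&self, def: ServerDef) -> Result<(), String> {
        let mut servers = self.servers.lock().map_err(|e| e.to_string())?;
        if servers.contains_key(&def.name) {
            return Err(format!("server `{}` already exists", def.name));
        }
        servers.insert(def.name.clone(), def);
        Ok(())
    }

    /// Return a copy of the stored definition.
    pub fn get_server(&self, name: &str) -> Result<ServerDef, String> {
        let servers = self.servers.lock().map_err(|e| e.to_string())?;
        servers
            .get(name)
            .cloned()
            .ok_or_else(|| format!("server `{name}` not found"))
    }

    /// Return all server names in sorted order.
    pub fn list_servers(&self) -> Result<Vec<String>, String> {
        let servers = self.servers.lock().map_err(|e| e.to_string())?;
        let mut names: Vec<String> = servers.keys().cloned().collect();
        names.sort();
        Ok(names)
    }

    /// Remove a server; fails if it does not exist.
    pub fn drop_server(&self, name: &str) -> Result<(), String> {
        let mut servers = self.servers.lock().map_err(|e| e.to_string())?;
        servers
            .remove(name)
            .map(|_| ())
            .ok_or_else(|| format!("server `{name}` not found"))
    }
}

/// Lazily initialized process-wide registry.
pub fn global_registry() -> &'static Registry {
    static REGISTRY: OnceLock<Registry> = OnceLock::new();
    REGISTRY.get_or_init(Registry::default)
}
```

A global registry is convenient for SQL-style `CREATE SERVER` semantics, where names must be unique across the process; tests that need isolation can construct a separate `Registry::default()` instead.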

Examples

See the examples/ directory for complete working examples:

  • s3_query.rs - Query Parquet and CSV files on S3
  • postgres_fdw.rs - Complete PostgreSQL integration example

Run examples:

cargo run --example s3_query
cargo run --example postgres_fdw

Testing

Run the test suite:

# Run all tests
cargo test
# Run with output
cargo test -- --nocapture
# Run integration tests only
cargo test --test integration_test
# Run with coverage
cargo tarpaulin --out Html

Architecture

See ARCHITECTURE.md for detailed design documentation.

Performance Tips

  1. Use projection - Only request columns you need
  2. Push down predicates - Filter at the source when possible
  3. Batch operations - Use Arrow RecordBatch for bulk operations
  4. Connection pooling - Reuse connections for SQL databases
  5. Partition pruning - Use partitioned S3 data for better performance
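
Tip 5 relies on Hive-style paths such as `year=2025/month=01/`, where partition values are encoded in the object key. The sketch below shows how such keys could be pruned before any file is opened. `partition_values` and `prune` are hypothetical helpers, the example keys are made up, and only exact-match filters are handled.

```rust
use std::collections::BTreeMap;

/// Extract `key=value` path segments from an object key such as
/// `events/year=2025/month=01/part-0.parquet`.
pub fn partition_values(key: &str) -> BTreeMap<String, String> {
    key.split('/')
        .filter_map(|segment| segment.split_once('='))
        .filter(|(k, _)| !k.is_empty())
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

/// Keep only the keys whose partition values equal every (column, value) filter.
/// A key that lacks one of the filtered columns is dropped.
pub fn prune<'a>(keys: &[&'a str], filters: &[(&str, &str)]) -> Vec<&'a str> {
    keys.iter()
        .copied()
        .filter(|key| {
            let parts = partition_values(key);
            filters
                .iter()
                .all(|(col, want)| parts.get(*col).map(String::as_str) == Some(*want))
        })
        .collect()
}
```

Because pruning works on key names alone, a filter on `year` and `month` can discard whole directories of objects without issuing a single read against them.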

Limitations

  • S3 FDW is read-only (no INSERT/UPDATE/DELETE)
  • REST FDW supports limited UPDATE operations
  • MongoDB aggregations not yet implemented
  • S3 predicate pushdown limited to Parquet statistics

Future Enhancements

  • Delta Lake support for S3
  • Apache Iceberg table format
  • Snowflake connector
  • BigQuery connector
  • More advanced aggregation pushdown
  • Cost-based query optimization
  • Parallel query execution

License

MIT OR Apache-2.0

Contributing

Contributions welcome! Please see CONTRIBUTING.md for guidelines.