MongoDB to HeliosDB-Lite Migration Guide
MongoDB to HeliosDB-Lite Migration Guide
Category: Database Migration HeliosDB-Lite Version: 3.5+
Overview
MongoDB is the most popular document database, used by hundreds of thousands of applications for its flexible schema, developer-friendly API, and horizontal scalability. However, as applications mature, teams often discover the limitations of the document model: lack of true multi-document ACID transactions (prior to 4.0, and still with performance caveats), no native JOINs across collections, escalating Atlas hosting costs, and the inability to perform ad-hoc analytical queries without building a separate data pipeline.
HeliosDB-Lite offers a compelling alternative: store documents as JSONB columns in relational tables, gaining the full power of SQL (JOINs, aggregations, window functions, CTEs) while retaining document flexibility. The JSONB type provides binary-indexed JSON storage with containment operators (@>), key existence checks (?), path extraction (->, ->>), and 20+ built-in functions.
The migration path is: export collections with mongoexport (JSON), transform into SQL INSERT statements, and import via the PostgreSQL wire protocol. Applications switch from the MongoDB driver to a PostgreSQL driver, translating find/aggregate calls into SQL queries.
Why Migrate?
Architectural Advantages
| Capability | MongoDB | HeliosDB-Lite | Advantage |
|---|---|---|---|
| ACID transactions | Per-document (multi-doc since 4.0, performance penalty) | Full multi-row, multi-table ACID | No performance penalty for transactions |
| JOINs | $lookup (limited, slow on large collections) | Native SQL JOINs (hash, merge, nested loop) | Orders of magnitude faster for relational queries |
| Aggregations | Aggregation pipeline (complex, non-standard) | SQL GROUP BY, window functions, CTEs | Standard SQL; simpler to write, optimize, and maintain |
| Schema validation | Optional JSON Schema | SQL constraints (NOT NULL, CHECK, FK, UNIQUE) | Enforced at the database level |
| Full-text search | Atlas Search (additional cost) or text indexes | Native FTS with @@ operator | No additional service needed |
| Vector search | Atlas Vector Search (additional cost) | Native HNSW indexes with SIMD | No additional service needed |
| Time-travel | No | AS OF queries on any table | Point-in-time data access |
| Branching | No | Database-level branching for dev/test | Isolated environments without data copies |
| Embedded deployment | No (requires mongod process) | Yes (in-process, zero-config) | Eliminates network latency and operational overhead |
Cost Comparison
| Cost Factor | MongoDB Atlas (M30) | HeliosDB-Lite (Embedded) | Savings |
|---|---|---|---|
| Database hosting | $1,800/month | $0 (embedded in app) | 100% |
| Atlas Search | $400/month | $0 (built-in FTS) | 100% |
| Atlas Vector Search | $300/month | $0 (built-in HNSW) | 100% |
| Data transfer | $500/month | $0 (in-process) | 100% |
| Backup storage | $200/month | $0 (file-level backup) | 100% |
| Total annual | $38,400 | ~$1,500 (compute only) | 96% |
Performance Gains
| Operation | MongoDB Atlas | HeliosDB-Lite | Improvement |
|---|---|---|---|
| Single document read | 35ms P95 | 8ms P95 | 77% faster |
| Document insert | 52ms P95 | 14ms P95 | 73% faster |
| Aggregation (GROUP BY) | 180ms P95 | 45ms P95 | 75% faster |
| Cross-collection $lookup | 320ms P95 | 55ms P95 (SQL JOIN) | 83% faster |
| Text search | 95ms P95 | 28ms P95 | 71% faster |
Compatibility Matrix
| MongoDB Concept | HeliosDB-Lite Equivalent | Migration Effort |
|---|---|---|
| Collection | Table (with JSONB column or normalized columns) | Schema design |
Document ({}) | Row (JSONB column or individual columns) | Data transform |
_id (ObjectId) | id SERIAL PRIMARY KEY or id TEXT PRIMARY KEY | Auto-generated |
| Nested document | JSONB value or separate table with FK | Design decision |
| Array field | JSONB array or ARRAY type column | Design decision |
find() | SELECT ... WHERE | Query rewrite |
insertOne() / insertMany() | INSERT INTO ... VALUES | Query rewrite |
updateOne() / updateMany() | UPDATE ... SET ... WHERE | Query rewrite |
deleteOne() / deleteMany() | DELETE FROM ... WHERE | Query rewrite |
aggregate() pipeline | SQL GROUP BY, window functions, CTEs | Query rewrite |
$match | WHERE clause | Direct mapping |
$group | GROUP BY + aggregate functions | Direct mapping |
$sort | ORDER BY | Direct mapping |
$project | SELECT column list | Direct mapping |
$limit / $skip | LIMIT / OFFSET | Direct mapping |
$lookup | JOIN | Direct mapping (much faster) |
$unwind | jsonb_array_elements() or UNNEST() | Function call |
$addFields | Computed columns in SELECT | Direct mapping |
$count | COUNT(*) | Direct mapping |
| Compound index | CREATE INDEX ... ON table (col1, col2) | DDL statement |
| Text index | FTS index with @@ operator | Different syntax |
| TTL index | Scheduled DELETE or partition pruning | Application logic |
| Change streams | Triggers | Different mechanism |
| Transactions | BEGIN / COMMIT / ROLLBACK | Standard SQL |
| Replica set | WAL streaming replication | Architecture change |
| Sharding | HeliosDB partitioning (HASH/RANGE/LIST) | Architecture change |
Migration Steps
Step 1: Choose Your Schema Strategy
You have two main approaches:
Approach A: Document-Preserving (JSONB Column)
Keep the document structure intact. Best for heterogeneous documents or when you want minimal transformation.
CREATE TABLE users ( id SERIAL PRIMARY KEY, doc JSONB NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW());
-- Index specific JSONB paths for query performanceCREATE INDEX idx_users_email ON users ((doc->>'email'));CREATE INDEX idx_users_status ON users ((doc->>'status'));CREATE INDEX idx_users_doc ON users USING gin(doc); -- Full JSONB containment indexApproach B: Normalized (Relational Columns)
Extract top-level fields into typed columns. Best for well-defined schemas where you want constraint enforcement and optimal query performance.
CREATE TABLE users ( id SERIAL PRIMARY KEY, name TEXT NOT NULL, email TEXT UNIQUE NOT NULL, status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active', 'suspended', 'deleted')), profile JSONB DEFAULT '{}', -- Keep flexible nested data as JSONB tags TEXT[] DEFAULT '{}', -- Arrays for simple lists created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW());
CREATE INDEX idx_users_email ON users (email);CREATE INDEX idx_users_status ON users (status);CREATE INDEX idx_users_profile ON users USING gin(profile);Approach C: Hybrid
Use typed columns for frequently queried/filtered fields and JSONB for the rest.
CREATE TABLE orders ( id SERIAL PRIMARY KEY, user_id INT4 NOT NULL REFERENCES users(id), status TEXT NOT NULL DEFAULT 'pending', total NUMERIC(12,2) NOT NULL, items JSONB NOT NULL, -- Array of order items as JSONB metadata JSONB DEFAULT '{}', -- Flexible metadata created_at TIMESTAMPTZ DEFAULT NOW());Step 2: Export from MongoDB
# Export each collection as JSON (one document per line)mongoexport \ --uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \ --collection=users \ --out=users.json \ --jsonArray
mongoexport \ --uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \ --collection=orders \ --out=orders.json \ --jsonArray
mongoexport \ --uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \ --collection=products \ --out=products.json \ --jsonArrayStep 3: Transform and Import
Python import script (document-preserving approach):
#!/usr/bin/env python3"""Import MongoDB JSON exports into HeliosDB-Lite as JSONB documents."""
import jsonimport psycopg2import psycopg2.extrasfrom pathlib import Path
BATCH_SIZE = 2000
def create_table_for_collection(cursor, collection_name: str): """Create a table with a JSONB document column.""" cursor.execute(f""" CREATE TABLE IF NOT EXISTS {collection_name} ( id SERIAL PRIMARY KEY, mongo_id TEXT UNIQUE, doc JSONB NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW() ) """)
def import_collection(conn, collection_name: str, json_path: str): """Import a MongoDB collection from a JSON export file.""" cursor = conn.cursor()
create_table_for_collection(cursor, collection_name) conn.commit()
with open(json_path, 'r', encoding='utf-8') as f: documents = json.load(f)
print(f" Importing {collection_name}: {len(documents)} documents")
batch = [] imported = 0
for doc in documents: # Extract _id and remove from document body mongo_id = str(doc.pop('_id', {}).get('$oid', '')) if isinstance(doc.get('_id'), dict) else str(doc.pop('_id', '')) batch.append((mongo_id, json.dumps(doc)))
if len(batch) >= BATCH_SIZE: psycopg2.extras.execute_batch( cursor, f"INSERT INTO {collection_name} (mongo_id, doc) VALUES (%s, %s::jsonb)", batch ) conn.commit() imported += len(batch) batch = [] print(f" {imported}/{len(documents)} documents imported")
if batch: psycopg2.extras.execute_batch( cursor, f"INSERT INTO {collection_name} (mongo_id, doc) VALUES (%s, %s::jsonb)", batch ) conn.commit() imported += len(batch)
print(f" {imported}/{len(documents)} documents imported (complete)") return imported
def main(): conn = psycopg2.connect( host="127.0.0.1", port=5432, user="app", dbname="myapp" )
collections = { 'users': 'users.json', 'orders': 'orders.json', 'products': 'products.json', }
total = 0 for name, path in collections.items(): if Path(path).exists(): count = import_collection(conn, name, path) total += count else: print(f" Skipping {name}: {path} not found")
print(f"\nImported {total} total documents across {len(collections)} collections") conn.close()
if __name__ == "__main__": main()Python import script (normalized approach):
#!/usr/bin/env python3"""Import MongoDB users into a normalized HeliosDB-Lite schema."""
import jsonimport psycopg2import psycopg2.extras
def import_users_normalized(conn, json_path: str): """Import users with typed columns + JSONB for flexible fields.""" cursor = conn.cursor()
cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, mongo_id TEXT UNIQUE, name TEXT NOT NULL, email TEXT UNIQUE NOT NULL, status TEXT NOT NULL DEFAULT 'active', profile JSONB DEFAULT '{}', tags TEXT[] DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT NOW() ) """) conn.commit()
with open(json_path, 'r') as f: documents = json.load(f)
batch = [] for doc in documents: mongo_id = str(doc.get('_id', {}).get('$oid', '')) if isinstance(doc.get('_id'), dict) else str(doc.get('_id', '')) batch.append({ 'mongo_id': mongo_id, 'name': doc.get('name', ''), 'email': doc.get('email', ''), 'status': doc.get('status', 'active'), 'profile': json.dumps({ k: v for k, v in doc.items() if k not in ('_id', 'name', 'email', 'status', 'tags', 'createdAt') }), 'tags': doc.get('tags', []), })
psycopg2.extras.execute_batch( cursor, """INSERT INTO users (mongo_id, name, email, status, profile, tags) VALUES (%(mongo_id)s, %(name)s, %(email)s, %(status)s, %(profile)s::jsonb, %(tags)s::text[])""", batch ) conn.commit() print(f"Imported {len(batch)} users (normalized)")
if __name__ == "__main__": conn = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp") import_users_normalized(conn, 'users.json') conn.close()Step 4: Create Indexes
-- JSONB containment index (equivalent to MongoDB compound indexes on nested fields)CREATE INDEX idx_users_doc ON users USING gin(doc);
-- Expression indexes on specific JSONB paths (equivalent to MongoDB field indexes)CREATE INDEX idx_users_email ON users ((doc->>'email'));CREATE INDEX idx_users_status ON users ((doc->>'status'));
-- Full-text search index (replaces MongoDB text index)CREATE INDEX idx_products_fts ON products USING gin(to_tsvector('english', doc->>'name' || ' ' || doc->>'description'));
-- Vector search index (replaces Atlas Vector Search)-- Requires embedding columnALTER TABLE products ADD COLUMN embedding VECTOR(1536);CREATE INDEX idx_products_vector ON products USING hnsw(embedding);Step 5: Update Application Code
See the Query Translation Examples and Common Patterns sections below.
Data Type Mapping
| MongoDB / BSON Type | HeliosDB-Lite Type | Storage Approach |
|---|---|---|
String | TEXT | Direct column or doc->>'field' |
Int32 | INT4 | Direct column or (doc->>'field')::INT4 |
Int64 | INT8 | Direct column or (doc->>'field')::INT8 |
Double | FLOAT8 | Direct column or (doc->>'field')::FLOAT8 |
Decimal128 | NUMERIC | Direct column |
Boolean | BOOLEAN | Direct column or (doc->>'field')::BOOLEAN |
Date | TIMESTAMPTZ | Direct column |
ObjectId | TEXT | Store as hex string |
Array | JSONB array or ARRAY type | Depends on element type |
Embedded Document | JSONB or separate table with FK | Design decision |
Binary | BYTEA | Direct column |
Null | SQL NULL | Direct mapping |
RegExp | TEXT (pattern) | Use ~ operator for matching |
UUID | TEXT | Store as string representation |
Query Translation Examples
find() —> SELECT
// MongoDBdb.users.find({ status: "active", age: { $gte: 18 } })-- HeliosDB-Lite (JSONB approach)SELECT * FROM users WHERE doc->>'status' = 'active' AND (doc->>'age')::INT4 >= 18;
-- HeliosDB-Lite (normalized approach)SELECT * FROM users WHERE status = 'active' AND age >= 18;find() with Projection —> SELECT columns
// MongoDBdb.users.find({ status: "active" }, { name: 1, email: 1, _id: 0 })-- HeliosDB-Lite (JSONB)SELECT doc->>'name' AS name, doc->>'email' AS emailFROM usersWHERE doc->>'status' = 'active';
-- HeliosDB-Lite (normalized)SELECT name, email FROM users WHERE status = 'active';insertOne() —> INSERT
// MongoDBdb.users.insertOne({ name: "Alice", email: "alice@example.com", profile: { bio: "Engineer", location: "NYC" }, tags: ["rust", "databases"]})-- HeliosDB-Lite (JSONB)INSERT INTO users (doc) VALUES ( '{"name": "Alice", "email": "alice@example.com", "profile": {"bio": "Engineer", "location": "NYC"}, "tags": ["rust", "databases"]}'::jsonb) RETURNING id;
-- HeliosDB-Lite (normalized)INSERT INTO users (name, email, profile, tags) VALUES ( 'Alice', 'alice@example.com', '{"bio": "Engineer", "location": "NYC"}'::jsonb, ARRAY['rust', 'databases']) RETURNING id;updateOne() —> UPDATE
// MongoDBdb.users.updateOne( { email: "alice@example.com" }, { $set: { "profile.location": "SF" }, $inc: { login_count: 1 } })-- HeliosDB-Lite (JSONB)UPDATE usersSET doc = jsonb_set( jsonb_set(doc, '{profile,location}', '"SF"'), '{login_count}', to_jsonb(COALESCE((doc->>'login_count')::INT4, 0) + 1))WHERE doc->>'email' = 'alice@example.com';
-- HeliosDB-Lite (normalized with JSONB profile)UPDATE usersSET profile = jsonb_set(profile, '{location}', '"SF"'), login_count = login_count + 1WHERE email = 'alice@example.com';deleteMany() —> DELETE
// MongoDBdb.sessions.deleteMany({ expires_at: { $lt: new Date() } })-- HeliosDB-LiteDELETE FROM sessions WHERE expires_at < NOW();aggregate() with $group —> GROUP BY
// MongoDBdb.orders.aggregate([ { $match: { status: "completed" } }, { $group: { _id: "$user_id", total_spent: { $sum: "$amount" }, order_count: { $sum: 1 }, avg_order: { $avg: "$amount" }, last_order: { $max: "$created_at" } }}, { $sort: { total_spent: -1 } }, { $limit: 10 }])-- HeliosDB-LiteSELECT user_id, SUM(amount) AS total_spent, COUNT(*) AS order_count, AVG(amount) AS avg_order, MAX(created_at) AS last_orderFROM ordersWHERE status = 'completed'GROUP BY user_idORDER BY total_spent DESCLIMIT 10;aggregate() with $lookup —> JOIN
// MongoDBdb.orders.aggregate([ { $lookup: { from: "users", localField: "user_id", foreignField: "_id", as: "user" }}, { $unwind: "$user" }, { $project: { order_id: "$_id", total: 1, user_name: "$user.name", user_email: "$user.email" }}])-- HeliosDB-Lite (dramatically simpler and faster)SELECT o.id AS order_id, o.total, u.name AS user_name, u.email AS user_emailFROM orders oJOIN users u ON o.user_id = u.id;aggregate() with $unwind —> jsonb_array_elements
// MongoDBdb.orders.aggregate([ { $unwind: "$items" }, { $group: { _id: "$items.product_id", total_quantity: { $sum: "$items.quantity" }, total_revenue: { $sum: { $multiply: ["$items.quantity", "$items.price"] } } }}])-- HeliosDB-LiteSELECT item->>'product_id' AS product_id, SUM((item->>'quantity')::INT4) AS total_quantity, SUM((item->>'quantity')::INT4 * (item->>'price')::NUMERIC) AS total_revenueFROM orders, jsonb_array_elements(items) AS itemGROUP BY item->>'product_id';Nested Document Queries
// MongoDBdb.users.find({ "address.city": "New York", "address.state": "NY" })-- HeliosDB-Lite (JSONB path extraction)SELECT * FROM usersWHERE doc->'address'->>'city' = 'New York' AND doc->'address'->>'state' = 'NY';
-- HeliosDB-Lite (containment operator -- uses GIN index)SELECT * FROM usersWHERE doc @> '{"address": {"city": "New York", "state": "NY"}}';Array Queries
// MongoDB: Find documents where tags array contains "rust"db.users.find({ tags: "rust" })
// MongoDB: Find documents where tags contains all of thesedb.users.find({ tags: { $all: ["rust", "databases"] } })
// MongoDB: Array sizedb.users.find({ tags: { $size: 3 } })-- HeliosDB-Lite (JSONB array)SELECT * FROM users WHERE doc->'tags' ? 'rust';SELECT * FROM users WHERE doc->'tags' @> '["rust", "databases"]';SELECT * FROM users WHERE jsonb_array_length(doc->'tags') = 3;
-- HeliosDB-Lite (ARRAY type, normalized)SELECT * FROM users WHERE 'rust' = ANY(tags);SELECT * FROM users WHERE tags @> ARRAY['rust', 'databases'];SELECT * FROM users WHERE array_length(tags, 1) = 3;JSONB Operators Quick Reference
| Operator | Description | Example |
|---|---|---|
-> | Get JSON object field (as JSON) | doc->'address' |
->> | Get JSON object field (as text) | doc->>'name' returns 'Alice' |
-> (int) | Get JSON array element (as JSON) | doc->'tags'->0 |
->> (int) | Get JSON array element (as text) | doc->'tags'->>0 returns 'rust' |
@> | Contains (left contains right) | doc @> '{"status":"active"}' |
<@ | Contained by (left is contained by right) | '{"status":"active"}' <@ doc |
? | Key exists | doc ? 'email' |
?| | Any key exists | doc ?| array[‘email’,‘phone’] |
?& | All keys exist | doc ?& array['email','phone'] |
|| | Concatenate JSONB | doc || ’{“new_field”: true}‘ |
- | Delete key | doc - 'temporary_field' |
#- | Delete at path | doc #- '{address,zip}' |
Useful JSONB Functions
-- Extract all keys from a JSONB objectSELECT jsonb_object_keys(doc) FROM users WHERE id = 1;
-- Expand a JSONB array into rowsSELECT id, elem->>'name' AS item_nameFROM orders, jsonb_array_elements(items) AS elem;
-- Build JSONB from key-value pairsSELECT jsonb_build_object('name', u.name, 'email', u.email) AS user_jsonFROM users u;
-- Pretty-print JSONBSELECT jsonb_pretty(doc) FROM users WHERE id = 1;
-- Set a nested valueUPDATE users SET doc = jsonb_set(doc, '{profile,verified}', 'true') WHERE id = 1;
-- Check JSON typeSELECT jsonb_typeof(doc->'tags') FROM users WHERE id = 1; -- returns 'array'
-- Aggregate rows into a JSON arraySELECT jsonb_agg(jsonb_build_object('id', id, 'name', name)) FROM users;Common Patterns
Pattern 1: Python Application Migration
"""MongoDB to HeliosDB-Lite migration -- Python application layer.Replace pymongo calls with psycopg2 queries."""
import psycopg2import psycopg2.extrasimport json
class UserRepository: """Replaces a MongoDB-backed user repository with HeliosDB-Lite."""
def __init__(self, dsn: str = "postgresql://app@127.0.0.1:5432/myapp"): self.conn = psycopg2.connect(dsn) # Register JSONB adapter for automatic serialization psycopg2.extras.register_default_jsonb(self.conn)
def find_one(self, email: str) -> dict: """Replaces: db.users.findOne({ email: email })""" cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute( "SELECT id, name, email, status, profile, tags FROM users WHERE email = %s", (email,) ) return cursor.fetchone()
def find_active(self, limit: int = 20, offset: int = 0) -> list: """Replaces: db.users.find({ status: 'active' }).limit(20).skip(0)""" cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute( "SELECT * FROM users WHERE status = 'active' ORDER BY created_at DESC LIMIT %s OFFSET %s", (limit, offset) ) return cursor.fetchall()
def insert(self, user: dict) -> int: """Replaces: db.users.insertOne(user)""" cursor = self.conn.cursor() cursor.execute( """INSERT INTO users (name, email, status, profile, tags) VALUES (%(name)s, %(email)s, %(status)s, %(profile)s::jsonb, %(tags)s::text[]) RETURNING id""", { 'name': user['name'], 'email': user['email'], 'status': user.get('status', 'active'), 'profile': json.dumps(user.get('profile', {})), 'tags': user.get('tags', []), } ) self.conn.commit() return cursor.fetchone()[0]
def update_profile(self, user_id: int, updates: dict) -> None: """Replaces: db.users.updateOne({ _id: id }, { $set: { profile: updates } })""" cursor = self.conn.cursor() cursor.execute( "UPDATE users SET profile = profile || %s::jsonb WHERE id = %s", (json.dumps(updates), user_id) ) self.conn.commit()
def search(self, query: str, limit: int = 10) -> list: """ Replaces: db.users.find({ $text: { $search: query } }) Uses HeliosDB-Lite FTS with ranking (not available in MongoDB without Atlas Search). """ cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute( """SELECT id, name, email, ts_rank(to_tsvector('english', name || ' ' || COALESCE(profile->>'bio', '')), to_tsquery('english', %s)) AS relevance FROM users WHERE to_tsvector('english', name || ' ' || COALESCE(profile->>'bio', '')) @@ to_tsquery('english', %s) ORDER BY relevance DESC LIMIT %s""", (query, query, limit) ) return cursor.fetchall()
def aggregate_by_status(self) -> list: """ Replaces: db.users.aggregate([ { $group: { _id: "$status", count: { $sum: 1 } } }, { $sort: { count: -1 } } ]) """ cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) cursor.execute( """SELECT status, COUNT(*) AS count FROM users GROUP BY status ORDER BY count DESC""" ) return cursor.fetchall()Pattern 2: TypeScript Application Migration
import { Pool, QueryResult } from 'pg';
const pool = new Pool({ host: '127.0.0.1', port: 5432, database: 'myapp', user: 'app',});
interface User { id: number; name: string; email: string; status: string; profile: Record<string, any>; tags: string[];}
interface Order { id: number; user_id: number; status: string; total: number; items: any[];}
// Replaces: db.users.findOne({ email })async function findUserByEmail(email: string): Promise<User | null> { const result = await pool.query<User>( 'SELECT * FROM users WHERE email = $1', [email] ); return result.rows[0] || null;}
// Replaces: db.users.insertOne(user)async function createUser(user: Omit<User, 'id'>): Promise<number> { const result = await pool.query<{ id: number }>( `INSERT INTO users (name, email, status, profile, tags) VALUES ($1, $2, $3, $4::jsonb, $5::text[]) RETURNING id`, [user.name, user.email, user.status, JSON.stringify(user.profile), user.tags] ); return result.rows[0].id;}
// Replaces: db.users.updateOne({ _id: id }, { $set: { "profile.location": city } })async function updateUserLocation(userId: number, city: string): Promise<void> { await pool.query( `UPDATE users SET profile = jsonb_set(profile, '{location}', $1::jsonb) WHERE id = $2`, [JSON.stringify(city), userId] );}
// Replaces: db.orders.aggregate([{ $lookup }, { $unwind }, { $group }])async function getTopCustomers(limit: number = 10) { const result = await pool.query(` SELECT u.id, u.name, u.email, COUNT(o.id) AS order_count, SUM(o.total) AS total_spent, AVG(o.total) AS avg_order_value, MAX(o.created_at) AS last_order_date FROM users u JOIN orders o ON u.id = o.user_id WHERE o.status = 'completed' GROUP BY u.id, u.name, u.email ORDER BY total_spent DESC LIMIT $1 `, [limit]); return result.rows;}
// Replaces: db.orders.aggregate([{ $unwind: "$items" }, { $group }])async function getProductRevenue() { const result = await pool.query(` SELECT item->>'product_id' AS product_id, item->>'name' AS product_name, SUM((item->>'quantity')::INT4) AS units_sold, SUM((item->>'quantity')::INT4 * (item->>'price')::NUMERIC) AS revenue FROM orders, jsonb_array_elements(items) AS item WHERE status = 'completed' GROUP BY item->>'product_id', item->>'name' ORDER BY revenue DESC `); return result.rows;}
// HeliosDB-Lite exclusive: time-travel query (not possible in MongoDB)async function getUserStateAtDate(userId: number, asOfDate: string) { const result = await pool.query( `SELECT * FROM users AS OF $1 WHERE id = $2`, [asOfDate, userId] ); return result.rows[0] || null;}
// HeliosDB-Lite exclusive: vector similarity searchasync function findSimilarProducts(embedding: number[], limit: number = 5) { const result = await pool.query(` SELECT id, name, description, embedding <-> $1::vector AS distance FROM products ORDER BY embedding <-> $1::vector LIMIT $2 `, [JSON.stringify(embedding), limit]); return result.rows;}Pattern 3: Go Application Migration
package main
import ( "context" "encoding/json" "fmt" "log"
"github.com/jackc/pgx/v5/pgxpool")
type User struct { ID int `json:"id"` Name string `json:"name"` Email string `json:"email"` Status string `json:"status"` Profile map[string]any `json:"profile"` Tags []string `json:"tags"`}
type UserRepo struct { pool *pgxpool.Pool}
func NewUserRepo(ctx context.Context, dsn string) (*UserRepo, error) { pool, err := pgxpool.New(ctx, dsn) if err != nil { return nil, err } return &UserRepo{pool: pool}, nil}
// Replaces: collection.FindOne(ctx, bson.M{"email": email})func (r *UserRepo) FindByEmail(ctx context.Context, email string) (*User, error) { var u User var profileJSON []byte err := r.pool.QueryRow(ctx, "SELECT id, name, email, status, profile, tags FROM users WHERE email = $1", email, ).Scan(&u.ID, &u.Name, &u.Email, &u.Status, &profileJSON, &u.Tags) if err != nil { return nil, err } json.Unmarshal(profileJSON, &u.Profile) return &u, nil}
// Replaces: collection.InsertOne(ctx, doc)func (r *UserRepo) Create(ctx context.Context, u *User) (int, error) { profileJSON, _ := json.Marshal(u.Profile) var id int err := r.pool.QueryRow(ctx, `INSERT INTO users (name, email, status, profile, tags) VALUES ($1, $2, $3, $4::jsonb, $5::text[]) RETURNING id`, u.Name, u.Email, u.Status, string(profileJSON), u.Tags, ).Scan(&id) return id, err}
// Replaces: collection.Aggregate(ctx, pipeline) with $lookup + $groupfunc (r *UserRepo) GetTopCustomers(ctx context.Context, limit int) ([]map[string]any, error) { rows, err := r.pool.Query(ctx, ` SELECT u.name, u.email, COUNT(o.id) AS order_count, SUM(o.total) AS total_spent FROM users u JOIN orders o ON u.id = o.user_id WHERE o.status = 'completed' GROUP BY u.name, u.email ORDER BY total_spent DESC LIMIT $1 `, limit) if err != nil { return nil, err } defer rows.Close()
var results []map[string]any for rows.Next() { var name, email string var orderCount int var totalSpent float64 rows.Scan(&name, &email, &orderCount, &totalSpent) results = append(results, map[string]any{ "name": name, "email": email, "order_count": orderCount, "total_spent": totalSpent, }) } return results, nil}
func main() { ctx := context.Background() repo, err := NewUserRepo(ctx, "postgresql://app@127.0.0.1:5432/myapp") if err != nil { log.Fatal(err) }
// Create a user (replaces MongoDB insertOne) id, err := repo.Create(ctx, &User{ Name: "Alice", Email: "alice@example.com", Status: "active", Profile: map[string]any{"bio": "Engineer", "location": "NYC"}, Tags: []string{"rust", "databases"}, }) if err != nil { log.Fatal(err) } fmt.Printf("Created user ID: %d\n", id)
// Find by email (replaces MongoDB findOne) user, err := repo.FindByEmail(ctx, "alice@example.com") if err != nil { log.Fatal(err) } fmt.Printf("Found user: %s (%s)\n", user.Name, user.Email)
// Top customers (replaces MongoDB aggregate with $lookup + $group) top, err := repo.GetTopCustomers(ctx, 5) if err != nil { log.Fatal(err) } for _, c := range top { fmt.Printf(" %s: %d orders, $%.2f spent\n", c["name"], c["order_count"], c["total_spent"]) }}Pattern 4: Migrating MongoDB Transactions
# MongoDB transactionwith client.start_session() as session: with session.start_transaction(): db.orders.insert_one({"user_id": user_id, "total": 99.99}, session=session) db.inventory.update_one( {"product_id": product_id}, {"$inc": {"stock": -1}}, session=session ) db.users.update_one( {"_id": user_id}, {"$inc": {"order_count": 1}}, session=session )# HeliosDB-Lite -- standard SQL transaction (simpler, faster, no caveats)conn = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")cursor = conn.cursor()
try: cursor.execute("BEGIN") cursor.execute( "INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id", (user_id, 99.99) ) order_id = cursor.fetchone()[0]
cursor.execute( "UPDATE inventory SET stock = stock - 1 WHERE product_id = %s", (product_id,) )
cursor.execute( "UPDATE users SET order_count = order_count + 1 WHERE id = %s", (user_id,) )
conn.commit()except Exception: conn.rollback() raiseLimitations & Workarounds
MongoDB Features Not Directly Supported
| MongoDB Feature | Status | Workaround |
|---|---|---|
| Schema-less collections | Different model | Use JSONB column for flexible fields; typed columns for stable fields |
$graphLookup (recursive) | Different syntax | Use recursive CTEs (WITH RECURSIVE) |
| Change Streams | Different mechanism | Use triggers + notification channels |
| GridFS (large file storage) | Not supported | Store file paths in database; use object storage (S3, MinIO) for files |
| Capped collections | Not native | Use DELETE with ORDER BY ... LIMIT or partition pruning |
| Map-Reduce | Deprecated in MongoDB too | Use SQL GROUP BY, window functions, CTEs |
Geospatial queries ($near, $geoWithin) | Not native | Store as JSONB GeoJSON; compute distances in application layer |
| MongoDB Realm / Atlas App Services | Not applicable | Use application framework (Django, Express, etc.) |
$regex queries | Different syntax | Use ~ (regex match) or LIKE operator |
| Client-side field-level encryption | Different approach | HeliosDB-Lite TDE encrypts entire database at rest (AES-256-GCM) |
HeliosDB-Full Alternative: DocumentStore
For teams that prefer a MongoDB-compatible API without learning SQL, HeliosDB-Full provides a DocumentStore interface:
# HeliosDB-Full DocumentStore API (MongoDB-compatible)from heliosdb import DocumentStore
store = DocumentStore("./myapp.db")users = store.collection("users")
# MongoDB-style operationsusers.insert_one({"name": "Alice", "email": "alice@example.com"})user = users.find_one({"email": "alice@example.com"})users.update_one({"email": "alice@example.com"}, {"$set": {"status": "active"}})
# Aggregation pipelineresults = users.aggregate([ {"$match": {"status": "active"}}, {"$group": {"_id": "$department", "count": {"$sum": 1}}}, {"$sort": {"count": -1}}])This provides a gentler migration path for teams not ready to adopt SQL fully, while still gaining embedded deployment, ACID transactions, and cost elimination.
Rollback Strategy
Pre-Migration
- Keep MongoDB Atlas running for at least 2-4 weeks after cutover
- Document all query translations for easy reversal
- Export HeliosDB-Lite data periodically during the transition
Dual-Write Approach
For critical applications, write to both databases during the transition period:
class DualWriteUserRepo: """Writes to both MongoDB and HeliosDB-Lite during migration."""
def __init__(self, mongo_client, helios_conn): self.mongo_db = mongo_client.myapp self.helios = helios_conn
def create_user(self, user: dict) -> dict: # Write to HeliosDB-Lite (primary after cutover) cursor = self.helios.cursor() cursor.execute( """INSERT INTO users (name, email, status, profile) VALUES (%(name)s, %(email)s, %(status)s, %(profile)s::jsonb) RETURNING id""", user ) self.helios.commit() user_id = cursor.fetchone()[0]
# Write to MongoDB (secondary, for rollback safety) user['_helios_id'] = user_id self.mongo_db.users.insert_one(user)
return {**user, 'id': user_id}Rollback Procedure
- Revert application code to use MongoDB driver instead of PostgreSQL driver
- Verify MongoDB data is current (if dual-write was active, it should be)
- If dual-write was not active, export from HeliosDB-Lite and import to MongoDB:
import psycopg2import psycopg2.extrasfrom pymongo import MongoClient
helios = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")mongo = MongoClient("mongodb+srv://admin:pass@cluster.mongodb.net/myapp")
cursor = helios.cursor(cursor_factory=psycopg2.extras.RealDictCursor)cursor.execute("SELECT * FROM users")
for row in cursor: mongo.myapp.users.replace_one( {"_helios_id": row["id"]}, { "name": row["name"], "email": row["email"], "status": row["status"], "profile": row["profile"], "tags": row["tags"], }, upsert=True )print("Rollback to MongoDB complete")- Decommission HeliosDB-Lite instance
- Post-mortem — document issues and plan remediation before next attempt
References
- MongoDB Manual: CRUD Operations and Aggregation Pipeline (2025)
- PostgreSQL Documentation: JSONB Types and Functions (2025)
- HeliosDB-Lite Documentation: JSONB Operators and Full-Text Search (2026)
- mongoexport Documentation: Data Export Reference (2025)
- psycopg2 Documentation: Python PostgreSQL Adapter (2025)
- node-postgres (pg): TypeScript/JavaScript PostgreSQL Client (2025)
- pgx: Go PostgreSQL Driver and Toolkit (2025)
- MongoDB Atlas Pricing: Cluster Tier Comparison (2025)