MongoDB to HeliosDB-Lite Migration Guide

Category: Database Migration
HeliosDB-Lite Version: 3.5+


Overview

MongoDB is the most popular document database, used by hundreds of thousands of applications for its flexible schema, developer-friendly API, and horizontal scalability. However, as applications mature, teams often run into the limits of the document model: multi-document ACID transactions arrived only in MongoDB 4.0 and still carry performance caveats, cross-collection JOINs are limited to $lookup, Atlas hosting costs escalate, and ad-hoc analytical queries usually require a separate data pipeline.

HeliosDB-Lite offers a compelling alternative: store documents as JSONB columns in relational tables, gaining the full power of SQL (JOINs, aggregations, window functions, CTEs) while retaining document flexibility. The JSONB type provides binary-indexed JSON storage with containment operators (@>), key existence checks (?), path extraction (->, ->>), and 20+ built-in functions.

The migration path is: export collections with mongoexport (JSON), transform into SQL INSERT statements, and import via the PostgreSQL wire protocol. Applications switch from the MongoDB driver to a PostgreSQL driver, translating find/aggregate calls into SQL queries.


Why Migrate?

Architectural Advantages

| Capability | MongoDB | HeliosDB-Lite | Advantage |
| --- | --- | --- | --- |
| ACID transactions | Per-document (multi-doc since 4.0, performance penalty) | Full multi-row, multi-table ACID | No performance penalty for transactions |
| JOINs | $lookup (limited, slow on large collections) | Native SQL JOINs (hash, merge, nested loop) | Orders of magnitude faster for relational queries |
| Aggregations | Aggregation pipeline (complex, non-standard) | SQL GROUP BY, window functions, CTEs | Standard SQL; simpler to write, optimize, and maintain |
| Schema validation | Optional JSON Schema | SQL constraints (NOT NULL, CHECK, FK, UNIQUE) | Enforced at the database level |
| Full-text search | Atlas Search (additional cost) or text indexes | Native FTS with @@ operator | No additional service needed |
| Vector search | Atlas Vector Search (additional cost) | Native HNSW indexes with SIMD | No additional service needed |
| Time-travel | No | AS OF queries on any table | Point-in-time data access |
| Branching | No | Database-level branching for dev/test | Isolated environments without data copies |
| Embedded deployment | No (requires mongod process) | Yes (in-process, zero-config) | Eliminates network latency and operational overhead |

Cost Comparison

| Cost Factor | MongoDB Atlas (M30) | HeliosDB-Lite (Embedded) | Savings |
| --- | --- | --- | --- |
| Database hosting | $1,800/month | $0 (embedded in app) | 100% |
| Atlas Search | $400/month | $0 (built-in FTS) | 100% |
| Atlas Vector Search | $300/month | $0 (built-in HNSW) | 100% |
| Data transfer | $500/month | $0 (in-process) | 100% |
| Backup storage | $200/month | $0 (file-level backup) | 100% |
| Total annual | $38,400 | ~$1,500 (compute only) | 96% |
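
The totals in the table follow from simple arithmetic on the monthly line items. A short check, using only the figures quoted above (the variable names are illustrative):

```python
# Monthly Atlas line items from the cost table (USD).
atlas_monthly = {
    "database_hosting": 1800,
    "atlas_search": 400,
    "atlas_vector_search": 300,
    "data_transfer": 500,
    "backup_storage": 200,
}
helios_annual = 1500  # approximate compute-only figure from the table

# Annualize the Atlas bill and compute the relative saving.
atlas_annual = sum(atlas_monthly.values()) * 12
savings_pct = (atlas_annual - helios_annual) / atlas_annual * 100

print(f"Atlas annual: ${atlas_annual:,}")
print(f"Savings: {savings_pct:.0f}%")
```

Substituting your own Atlas invoice lines and compute estimate gives a comparable figure for your deployment.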

Performance Gains

| Operation | MongoDB Atlas | HeliosDB-Lite | Improvement |
| --- | --- | --- | --- |
| Single document read | 35ms P95 | 8ms P95 | 77% faster |
| Document insert | 52ms P95 | 14ms P95 | 73% faster |
| Aggregation (GROUP BY) | 180ms P95 | 45ms P95 | 75% faster |
| Cross-collection $lookup | 320ms P95 | 55ms P95 (SQL JOIN) | 83% faster |
| Text search | 95ms P95 | 28ms P95 | 71% faster |

Compatibility Matrix

| MongoDB Concept | HeliosDB-Lite Equivalent | Migration Effort |
| --- | --- | --- |
| Collection | Table (with JSONB column or normalized columns) | Schema design |
| Document ({}) | Row (JSONB column or individual columns) | Data transform |
| _id (ObjectId) | id SERIAL PRIMARY KEY or id TEXT PRIMARY KEY | Auto-generated |
| Nested document | JSONB value or separate table with FK | Design decision |
| Array field | JSONB array or ARRAY type column | Design decision |
| find() | SELECT ... WHERE | Query rewrite |
| insertOne() / insertMany() | INSERT INTO ... VALUES | Query rewrite |
| updateOne() / updateMany() | UPDATE ... SET ... WHERE | Query rewrite |
| deleteOne() / deleteMany() | DELETE FROM ... WHERE | Query rewrite |
| aggregate() pipeline | SQL GROUP BY, window functions, CTEs | Query rewrite |
| $match | WHERE clause | Direct mapping |
| $group | GROUP BY + aggregate functions | Direct mapping |
| $sort | ORDER BY | Direct mapping |
| $project | SELECT column list | Direct mapping |
| $limit / $skip | LIMIT / OFFSET | Direct mapping |
| $lookup | JOIN | Direct mapping (much faster) |
| $unwind | jsonb_array_elements() or UNNEST() | Function call |
| $addFields | Computed columns in SELECT | Direct mapping |
| $count | COUNT(*) | Direct mapping |
| Compound index | CREATE INDEX ... ON table (col1, col2) | DDL statement |
| Text index | FTS index with @@ operator | Different syntax |
| TTL index | Scheduled DELETE or partition pruning | Application logic |
| Change streams | Triggers | Different mechanism |
| Transactions | BEGIN / COMMIT / ROLLBACK | Standard SQL |
| Replica set | WAL streaming replication | Architecture change |
| Sharding | HeliosDB partitioning (HASH/RANGE/LIST) | Architecture change |

Migration Steps

Step 1: Choose Your Schema Strategy

You have three main approaches:

Approach A: Document-Preserving (JSONB Column)

Keep the document structure intact. Best for heterogeneous documents or when you want minimal transformation.

CREATE TABLE users (
id SERIAL PRIMARY KEY,
doc JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Index specific JSONB paths for query performance
CREATE INDEX idx_users_email ON users ((doc->>'email'));
CREATE INDEX idx_users_status ON users ((doc->>'status'));
CREATE INDEX idx_users_doc ON users USING gin(doc); -- Full JSONB containment index

Approach B: Normalized (Relational Columns)

Extract top-level fields into typed columns. Best for well-defined schemas where you want constraint enforcement and optimal query performance.

CREATE TABLE users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active', 'suspended', 'deleted')),
profile JSONB DEFAULT '{}', -- Keep flexible nested data as JSONB
tags TEXT[] DEFAULT '{}', -- Arrays for simple lists
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_users_email ON users (email);
CREATE INDEX idx_users_status ON users (status);
CREATE INDEX idx_users_profile ON users USING gin(profile);

Approach C: Hybrid

Use typed columns for frequently queried/filtered fields and JSONB for the rest.

CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INT4 NOT NULL REFERENCES users(id),
status TEXT NOT NULL DEFAULT 'pending',
total NUMERIC(12,2) NOT NULL,
items JSONB NOT NULL, -- Array of order items as JSONB
metadata JSONB DEFAULT '{}', -- Flexible metadata
created_at TIMESTAMPTZ DEFAULT NOW()
);
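
One way to populate the hybrid table is to split each exported order document into typed column values plus a JSONB remainder. The sketch below assumes the MongoDB documents use the field names user_id, status, total, and items, and that user_id is already an integer matching users.id; split_order and TYPED_FIELDS are hypothetical helpers, not part of any HeliosDB-Lite API.

```python
import json
from decimal import Decimal

# Fields promoted to typed columns in the hybrid orders table (assumed field names).
TYPED_FIELDS = ("user_id", "status", "total", "items")


def split_order(doc: dict) -> dict:
    """Split a MongoDB order document into typed values plus a JSONB remainder."""
    # Everything that is not a typed column (or the Mongo _id) goes into metadata.
    metadata = {k: v for k, v in doc.items() if k not in TYPED_FIELDS and k != "_id"}
    return {
        "user_id": int(doc["user_id"]),
        "status": doc.get("status", "pending"),
        # Convert via str() so a float such as 99.99 does not carry binary noise.
        "total": Decimal(str(doc["total"])),
        "items": json.dumps(doc.get("items", [])),
        "metadata": json.dumps(metadata),
    }


row = split_order({
    "_id": {"$oid": "65a0c0ffee0000000000abcd"},
    "user_id": 7,
    "total": 99.99,
    "items": [{"product_id": "p1", "quantity": 2, "price": 49.995}],
    "coupon": "SPRING",
})
print(row["status"], row["total"], row["metadata"])
```

The resulting dict can be passed as named parameters to an INSERT with ::jsonb casts on items and metadata. If user_id holds an ObjectId in your data, it would first need to be resolved to the new integer key.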

Step 2: Export from MongoDB

# Export each collection as a single JSON array (--jsonArray), which the import scripts in Step 3 read with json.load()
mongoexport \
--uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \
--collection=users \
--out=users.json \
--jsonArray
mongoexport \
--uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \
--collection=orders \
--out=orders.json \
--jsonArray
mongoexport \
--uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \
--collection=products \
--out=products.json \
--jsonArray
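
mongoexport writes BSON-specific types as MongoDB Extended JSON wrappers such as {"$oid": ...} and {"$date": ...}. If you prefer plain values inside the JSONB documents, a recursive unwrapping pass before import is one option. The sketch below handles only a handful of common wrappers, and the helper name unwrap_extended_json is hypothetical.

```python
def unwrap_extended_json(value):
    """Recursively replace common Extended JSON wrappers with plain values."""
    if isinstance(value, list):
        return [unwrap_extended_json(v) for v in value]
    if not isinstance(value, dict):
        return value
    if len(value) == 1:
        key, inner = next(iter(value.items()))
        if key == "$oid":
            return inner  # ObjectId as its hex string
        if key == "$date":
            # Relaxed mode gives an ISO-8601 string; canonical mode gives
            # {"$numberLong": "<epoch ms>"}, which unwraps to an int.
            return unwrap_extended_json(inner)
        if key in ("$numberInt", "$numberLong"):
            return int(inner)
        if key == "$numberDouble":
            return float(inner)
        if key == "$numberDecimal":
            return str(inner)  # keep exact text; cast to NUMERIC in SQL
    # Ordinary sub-document: unwrap each value.
    return {k: unwrap_extended_json(v) for k, v in value.items()}


exported = {
    "_id": {"$oid": "65a0c0ffee0000000000abcd"},
    "createdAt": {"$date": "2024-01-15T10:30:00Z"},
    "views": {"$numberLong": "42"},
    "scores": [1, {"$numberInt": "2"}],
}
plain = unwrap_extended_json(exported)
print(plain)
```

Running this on each document before json.dumps() keeps the stored JSONB free of wrapper objects, so expressions like (doc->>'views')::INT8 work directly.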

Step 3: Transform and Import

Python import script (document-preserving approach):

#!/usr/bin/env python3
"""Import MongoDB JSON exports into HeliosDB-Lite as JSONB documents."""
import json
from pathlib import Path

import psycopg2
import psycopg2.extras

BATCH_SIZE = 2000


def create_table_for_collection(cursor, collection_name: str):
    """Create a table with a JSONB document column."""
    # collection_name is interpolated into the SQL text, so pass trusted names only.
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {collection_name} (
            id SERIAL PRIMARY KEY,
            mongo_id TEXT UNIQUE,
            doc JSONB NOT NULL,
            created_at TIMESTAMPTZ DEFAULT NOW()
        )
    """)


def extract_mongo_id(doc: dict):
    """Remove _id from the document and return it as a string (None if absent)."""
    raw_id = doc.pop('_id', None)
    if isinstance(raw_id, dict):
        # Extended JSON ObjectId is {"$oid": "..."}; serialize any other compound _id.
        return raw_id.get('$oid') or json.dumps(raw_id, sort_keys=True)
    return str(raw_id) if raw_id is not None else None


def import_collection(conn, collection_name: str, json_path: str):
    """Import a MongoDB collection from a JSON export file."""
    cursor = conn.cursor()
    create_table_for_collection(cursor, collection_name)
    conn.commit()

    # mongoexport --jsonArray writes the whole collection as one JSON array.
    with open(json_path, 'r', encoding='utf-8') as f:
        documents = json.load(f)
    print(f" Importing {collection_name}: {len(documents)} documents")

    insert_sql = f"INSERT INTO {collection_name} (mongo_id, doc) VALUES (%s, %s::jsonb)"
    imported = 0
    for start in range(0, len(documents), BATCH_SIZE):
        # extract_mongo_id() runs first, so _id is gone before the body is serialized.
        batch = [
            (extract_mongo_id(doc), json.dumps(doc))
            for doc in documents[start:start + BATCH_SIZE]
        ]
        psycopg2.extras.execute_batch(cursor, insert_sql, batch)
        conn.commit()  # one commit per batch keeps transactions small
        imported += len(batch)
        print(f" {imported}/{len(documents)} documents imported")
    return imported


def main():
    conn = psycopg2.connect(
        host="127.0.0.1",
        port=5432,
        user="app",
        dbname="myapp"
    )
    collections = {
        'users': 'users.json',
        'orders': 'orders.json',
        'products': 'products.json',
    }
    total = 0
    imported_collections = 0
    try:
        for name, path in collections.items():
            if Path(path).exists():
                total += import_collection(conn, name, path)
                imported_collections += 1
            else:
                print(f" Skipping {name}: {path} not found")
    finally:
        conn.close()
    print(f"\nImported {total} total documents across {imported_collections} collections")


if __name__ == "__main__":
    main()

Python import script (normalized approach):

#!/usr/bin/env python3
"""Import MongoDB users into a normalized HeliosDB-Lite schema."""
import json

import psycopg2
import psycopg2.extras

# Fields that map to typed columns (or are handled separately) stay out of profile.
# Note: createdAt is excluded here but not imported, so created_at defaults to import time.
TYPED_FIELDS = ('_id', 'name', 'email', 'status', 'tags', 'createdAt')


def import_users_normalized(conn, json_path: str):
    """Import users with typed columns + JSONB for flexible fields."""
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            mongo_id TEXT UNIQUE,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            status TEXT NOT NULL DEFAULT 'active',
            profile JSONB DEFAULT '{}',
            tags TEXT[] DEFAULT '{}',
            created_at TIMESTAMPTZ DEFAULT NOW()
        )
    """)
    conn.commit()

    with open(json_path, 'r', encoding='utf-8') as f:
        documents = json.load(f)

    batch = []
    for doc in documents:
        raw_id = doc.get('_id')
        if isinstance(raw_id, dict):
            raw_id = raw_id.get('$oid')  # Extended JSON ObjectId
        batch.append({
            'mongo_id': str(raw_id) if raw_id is not None else None,
            # email is UNIQUE: more than one document without an email would
            # collide on the '' default, so clean such documents up beforehand.
            'name': doc.get('name', ''),
            'email': doc.get('email', ''),
            'status': doc.get('status', 'active'),
            'profile': json.dumps({
                k: v for k, v in doc.items() if k not in TYPED_FIELDS
            }),
            'tags': doc.get('tags', []),
        })

    psycopg2.extras.execute_batch(
        cursor,
        """INSERT INTO users (mongo_id, name, email, status, profile, tags)
           VALUES (%(mongo_id)s, %(name)s, %(email)s, %(status)s,
                   %(profile)s::jsonb, %(tags)s::text[])""",
        batch
    )
    conn.commit()
    print(f"Imported {len(batch)} users (normalized)")


if __name__ == "__main__":
    conn = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
    try:
        import_users_normalized(conn, 'users.json')
    finally:
        conn.close()

Step 4: Create Indexes

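-- Skip any index below that you already created in Step 1
-- JSONB containment index (serves @> queries on any path; closest to a MongoDB wildcard index)
CREATE INDEX idx_users_doc ON users USING gin(doc);
-- Expression indexes on specific JSONB paths (equivalent to MongoDB field indexes)
CREATE INDEX idx_users_email ON users ((doc->>'email'));
CREATE INDEX idx_users_status ON users ((doc->>'status'));
-- Full-text search index (replaces MongoDB text index)
-- COALESCE keeps the expression non-NULL when a field is missing, and wrapping each
-- extraction in a function call avoids precedence surprises between ->> and ||
CREATE INDEX idx_products_fts ON products
USING gin(to_tsvector('english', COALESCE(doc->>'name', '') || ' ' || COALESCE(doc->>'description', '')));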
-- Vector search index (replaces Atlas Vector Search)
-- Requires embedding column
ALTER TABLE products ADD COLUMN embedding VECTOR(1536);
CREATE INDEX idx_products_vector ON products USING hnsw(embedding);

Step 5: Update Application Code

See the Query Translation Examples and Common Patterns sections below.


Data Type Mapping

| MongoDB / BSON Type | HeliosDB-Lite Type | Storage Approach |
| --- | --- | --- |
| String | TEXT | Direct column or doc->>'field' |
| Int32 | INT4 | Direct column or (doc->>'field')::INT4 |
| Int64 | INT8 | Direct column or (doc->>'field')::INT8 |
| Double | FLOAT8 | Direct column or (doc->>'field')::FLOAT8 |
| Decimal128 | NUMERIC | Direct column |
| Boolean | BOOLEAN | Direct column or (doc->>'field')::BOOLEAN |
| Date | TIMESTAMPTZ | Direct column |
| ObjectId | TEXT | Store as hex string |
| Array | JSONB array or ARRAY type | Depends on element type |
| Embedded Document | JSONB or separate table with FK | Design decision |
| Binary | BYTEA | Direct column |
| Null | SQL NULL | Direct mapping |
| RegExp | TEXT (pattern) | Use ~ operator for matching |
| UUID | TEXT | Store as string representation |

Query Translation Examples

find() --> SELECT

// MongoDB
db.users.find({ status: "active", age: { $gte: 18 } })
-- HeliosDB-Lite (JSONB approach)
SELECT * FROM users WHERE doc->>'status' = 'active' AND (doc->>'age')::INT4 >= 18;
-- HeliosDB-Lite (normalized approach; assumes age was extracted into a typed column)
SELECT * FROM users WHERE status = 'active' AND age >= 18;
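
For simple equality and comparison filters, the rewrite is mechanical enough to sketch as a function. The following is an illustrative translator for the JSONB approach, not a complete one: it covers only top-level fields, equality, and $gt/$gte/$lt/$lte on numeric values, and the name mongo_filter_to_where is hypothetical.

```python
import json
import re

_COMPARISONS = {"$gt": ">", "$gte": ">=", "$lt": "<", "$lte": "<="}
_FIELD_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def mongo_filter_to_where(filter_doc: dict, column: str = "doc"):
    """Return (where_sql, params) for a flat MongoDB filter on a JSONB column."""
    clauses, params = [], []
    for field, condition in filter_doc.items():
        # Field names end up in the SQL text, so accept plain identifiers only.
        if not _FIELD_NAME.fullmatch(field):
            raise ValueError(f"unsupported field name: {field!r}")
        if isinstance(condition, dict):
            for op, operand in condition.items():
                if op not in _COMPARISONS:
                    raise ValueError(f"unsupported operator: {op}")
                clauses.append(f"({column}->>'{field}')::NUMERIC {_COMPARISONS[op]} %s")
                params.append(operand)
        else:
            # ->> yields text, so compare against the JSON text form of the value.
            text = json.dumps(condition) if isinstance(condition, bool) else str(condition)
            clauses.append(f"{column}->>'{field}' = %s")
            params.append(text)
    return " AND ".join(clauses), params


where, params = mongo_filter_to_where({"status": "active", "age": {"$gte": 18}})
print(where)
print(params)
```

The values travel as query parameters rather than being spliced into the SQL, so the result can be passed straight to cursor.execute("SELECT * FROM users WHERE " + where, params).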

find() with Projection --> SELECT columns

// MongoDB
db.users.find({ status: "active" }, { name: 1, email: 1, _id: 0 })
-- HeliosDB-Lite (JSONB)
SELECT doc->>'name' AS name, doc->>'email' AS email
FROM users
WHERE doc->>'status' = 'active';
-- HeliosDB-Lite (normalized)
SELECT name, email FROM users WHERE status = 'active';

insertOne() --> INSERT

// MongoDB
db.users.insertOne({
name: "Alice",
email: "alice@example.com",
profile: { bio: "Engineer", location: "NYC" },
tags: ["rust", "databases"]
})
-- HeliosDB-Lite (JSONB)
INSERT INTO users (doc) VALUES (
'{"name": "Alice", "email": "alice@example.com",
"profile": {"bio": "Engineer", "location": "NYC"},
"tags": ["rust", "databases"]}'::jsonb
) RETURNING id;
-- HeliosDB-Lite (normalized)
INSERT INTO users (name, email, profile, tags) VALUES (
'Alice',
'alice@example.com',
'{"bio": "Engineer", "location": "NYC"}'::jsonb,
ARRAY['rust', 'databases']
) RETURNING id;

updateOne() --> UPDATE

// MongoDB
db.users.updateOne(
{ email: "alice@example.com" },
{ $set: { "profile.location": "SF" }, $inc: { login_count: 1 } }
)
-- HeliosDB-Lite (JSONB)
UPDATE users
SET doc = jsonb_set(
jsonb_set(doc, '{profile,location}', '"SF"'),
'{login_count}',
to_jsonb(COALESCE((doc->>'login_count')::INT4, 0) + 1)
)
WHERE doc->>'email' = 'alice@example.com';
-- HeliosDB-Lite (normalized with JSONB profile; assumes a login_count INT4 column)
UPDATE users
SET profile = jsonb_set(profile, '{location}', '"SF"'),
login_count = login_count + 1
WHERE email = 'alice@example.com';

deleteMany() --> DELETE

// MongoDB
db.sessions.deleteMany({ expires_at: { $lt: new Date() } })
-- HeliosDB-Lite
DELETE FROM sessions WHERE expires_at < NOW();

aggregate() with $group --> GROUP BY

// MongoDB
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: {
_id: "$user_id",
total_spent: { $sum: "$amount" },
order_count: { $sum: 1 },
avg_order: { $avg: "$amount" },
last_order: { $max: "$created_at" }
}},
{ $sort: { total_spent: -1 } },
{ $limit: 10 }
])
-- HeliosDB-Lite
SELECT
user_id,
SUM(amount) AS total_spent,
COUNT(*) AS order_count,
AVG(amount) AS avg_order,
MAX(created_at) AS last_order
FROM orders
WHERE status = 'completed'
GROUP BY user_id
ORDER BY total_spent DESC
LIMIT 10;

aggregate() with $lookup --> JOIN

// MongoDB
db.orders.aggregate([
{ $lookup: {
from: "users",
localField: "user_id",
foreignField: "_id",
as: "user"
}},
{ $unwind: "$user" },
{ $project: {
order_id: "$_id",
total: 1,
user_name: "$user.name",
user_email: "$user.email"
}}
])
-- HeliosDB-Lite (dramatically simpler and faster)
SELECT
o.id AS order_id,
o.total,
u.name AS user_name,
u.email AS user_email
FROM orders o
JOIN users u ON o.user_id = u.id;

aggregate() with $unwind --> jsonb_array_elements

// MongoDB
db.orders.aggregate([
{ $unwind: "$items" },
{ $group: {
_id: "$items.product_id",
total_quantity: { $sum: "$items.quantity" },
total_revenue: { $sum: { $multiply: ["$items.quantity", "$items.price"] } }
}}
])
-- HeliosDB-Lite
SELECT
item->>'product_id' AS product_id,
SUM((item->>'quantity')::INT4) AS total_quantity,
SUM((item->>'quantity')::INT4 * (item->>'price')::NUMERIC) AS total_revenue
FROM orders, jsonb_array_elements(items) AS item
GROUP BY item->>'product_id';

Nested Document Queries

// MongoDB
db.users.find({ "address.city": "New York", "address.state": "NY" })
-- HeliosDB-Lite (JSONB path extraction)
SELECT * FROM users
WHERE doc->'address'->>'city' = 'New York'
AND doc->'address'->>'state' = 'NY';
-- HeliosDB-Lite (containment operator -- uses GIN index)
SELECT * FROM users
WHERE doc @> '{"address": {"city": "New York", "state": "NY"}}';
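
Dotted MongoDB paths can be folded into the nested object that @> expects. A small sketch (the helper name dotted_to_containment is hypothetical; it assumes equality-only filters and does not handle array indexes in paths):

```python
import json


def dotted_to_containment(filter_doc: dict) -> str:
    """Turn {"address.city": "New York"} into nested JSON text for doc @> %s::jsonb."""
    nested: dict = {}
    for path, value in filter_doc.items():
        node = nested
        *parents, leaf = path.split(".")
        # Walk (and create) one nested object per parent segment.
        for key in parents:
            node = node.setdefault(key, {})
        node[leaf] = value
    return json.dumps(nested)


payload = dotted_to_containment({"address.city": "New York", "address.state": "NY"})
print(payload)
# Pass it as a parameter:
# cursor.execute("SELECT * FROM users WHERE doc @> %s::jsonb", (payload,))
```

Because the whole filter becomes a single containment test, it can use the GIN index on doc rather than one expression index per path.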

Array Queries

// MongoDB: Find documents where tags array contains "rust"
db.users.find({ tags: "rust" })
// MongoDB: Find documents where tags contains all of these
db.users.find({ tags: { $all: ["rust", "databases"] } })
// MongoDB: Array size
db.users.find({ tags: { $size: 3 } })
-- HeliosDB-Lite (JSONB array)
SELECT * FROM users WHERE doc->'tags' ? 'rust';
SELECT * FROM users WHERE doc->'tags' @> '["rust", "databases"]';
SELECT * FROM users WHERE jsonb_array_length(doc->'tags') = 3;
-- HeliosDB-Lite (ARRAY type, normalized)
SELECT * FROM users WHERE 'rust' = ANY(tags);
SELECT * FROM users WHERE tags @> ARRAY['rust', 'databases'];
SELECT * FROM users WHERE array_length(tags, 1) = 3;

JSONB Operators Quick Reference

| Operator | Description | Example |
| --- | --- | --- |
| -> | Get JSON object field (as JSON) | doc->'address' |
| ->> | Get JSON object field (as text) | doc->>'name' returns 'Alice' |
| -> (int) | Get JSON array element (as JSON) | doc->'tags'->0 |
| ->> (int) | Get JSON array element (as text) | doc->'tags'->>0 returns 'rust' |
| @> | Contains (left contains right) | doc @> '{"status":"active"}' |
| <@ | Contained by (left is contained by right) | '{"status":"active"}' <@ doc |
| ? | Key exists | doc ? 'email' |
| ?\| | Any key exists | doc ?\| array['email','phone'] |
| ?& | All keys exist | doc ?& array['email','phone'] |
| \|\| | Concatenate JSONB | doc \|\| '{"new_field": true}' |
| - | Delete key | doc - 'temporary_field' |
| #- | Delete at path | doc #- '{address,zip}' |

Useful JSONB Functions

-- Extract all keys from a JSONB object
SELECT jsonb_object_keys(doc) FROM users WHERE id = 1;
-- Expand a JSONB array into rows
SELECT id, elem->>'name' AS item_name
FROM orders, jsonb_array_elements(items) AS elem;
-- Build JSONB from key-value pairs
SELECT jsonb_build_object('name', u.name, 'email', u.email) AS user_json
FROM users u;
-- Pretty-print JSONB
SELECT jsonb_pretty(doc) FROM users WHERE id = 1;
-- Set a nested value
UPDATE users SET doc = jsonb_set(doc, '{profile,verified}', 'true') WHERE id = 1;
-- Check JSON type
SELECT jsonb_typeof(doc->'tags') FROM users WHERE id = 1; -- returns 'array'
-- Aggregate rows into a JSON array
SELECT jsonb_agg(jsonb_build_object('id', id, 'name', name)) FROM users;

Common Patterns

Pattern 1: Python Application Migration

"""
MongoDB to HeliosDB-Lite migration -- Python application layer.
Replace pymongo calls with psycopg2 queries.
"""
import json
from typing import Optional

import psycopg2
import psycopg2.extras


class UserRepository:
    """Replaces a MongoDB-backed user repository with HeliosDB-Lite."""

    def __init__(self, dsn: str = "postgresql://app@127.0.0.1:5432/myapp"):
        self.conn = psycopg2.connect(dsn)
        # Parse jsonb values into Python objects on read. psycopg2 >= 2.5.4 already
        # does this by default; the call makes it explicit. Writes still need
        # json.dumps() (or psycopg2.extras.Json), as in insert() below.
        psycopg2.extras.register_default_jsonb(self.conn)

    def find_one(self, email: str) -> Optional[dict]:
        """Replaces: db.users.findOne({ email: email })"""
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(
            "SELECT id, name, email, status, profile, tags FROM users WHERE email = %s",
            (email,)
        )
        return cursor.fetchone()  # None when no row matches

    def find_active(self, limit: int = 20, offset: int = 0) -> list:
        """Replaces: db.users.find({ status: 'active' }).limit(20).skip(0)"""
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(
            "SELECT * FROM users WHERE status = 'active' ORDER BY created_at DESC LIMIT %s OFFSET %s",
            (limit, offset)
        )
        return cursor.fetchall()

    def insert(self, user: dict) -> int:
        """Replaces: db.users.insertOne(user)"""
        cursor = self.conn.cursor()
        cursor.execute(
            """INSERT INTO users (name, email, status, profile, tags)
               VALUES (%(name)s, %(email)s, %(status)s, %(profile)s::jsonb, %(tags)s::text[])
               RETURNING id""",
            {
                'name': user['name'],
                'email': user['email'],
                'status': user.get('status', 'active'),
                'profile': json.dumps(user.get('profile', {})),
                'tags': user.get('tags', []),
            }
        )
        new_id = cursor.fetchone()[0]  # read the RETURNING row before committing
        self.conn.commit()
        return new_id

    def update_profile(self, user_id: int, updates: dict) -> None:
        """
        Replaces: db.users.updateOne({ _id: id }, { $set: { "profile.<key>": value, ... } })
        The || operator performs a shallow merge: top-level keys in updates overwrite
        the same keys in profile; other keys are kept.
        """
        cursor = self.conn.cursor()
        cursor.execute(
            "UPDATE users SET profile = profile || %s::jsonb WHERE id = %s",
            (json.dumps(updates), user_id)
        )
        self.conn.commit()

    def search(self, query: str, limit: int = 10) -> list:
        """
        Replaces: db.users.find({ $text: { $search: query } })
        Ranks results with ts_rank (MongoDB's counterpart is { $meta: "textScore" }).
        to_tsquery expects operator syntax such as 'rust & databases'; free-form
        user input needs to be converted to that form first.
        """
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(
            """SELECT id, name, email,
                      ts_rank(to_tsvector('english', name || ' ' || COALESCE(profile->>'bio', '')),
                              to_tsquery('english', %s)) AS relevance
               FROM users
               WHERE to_tsvector('english', name || ' ' || COALESCE(profile->>'bio', ''))
                     @@ to_tsquery('english', %s)
               ORDER BY relevance DESC
               LIMIT %s""",
            (query, query, limit)
        )
        return cursor.fetchall()

    def aggregate_by_status(self) -> list:
        """
        Replaces:
        db.users.aggregate([
            { $group: { _id: "$status", count: { $sum: 1 } } },
            { $sort: { count: -1 } }
        ])
        """
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(
            """SELECT status, COUNT(*) AS count
               FROM users
               GROUP BY status
               ORDER BY count DESC"""
        )
        return cursor.fetchall()

Pattern 2: TypeScript Application Migration

import { Pool } from 'pg';

const pool = new Pool({
  host: '127.0.0.1',
  port: 5432,
  database: 'myapp',
  user: 'app',
});

interface User {
  id: number;
  name: string;
  email: string;
  status: string;
  profile: Record<string, any>;
  tags: string[];
}

interface Order {
  id: number;
  user_id: number;
  status: string;
  // node-postgres returns NUMERIC columns as strings by default
  total: string;
  items: any[];
}

// Replaces: db.users.findOne({ email })
async function findUserByEmail(email: string): Promise<User | null> {
  const result = await pool.query<User>(
    'SELECT * FROM users WHERE email = $1',
    [email]
  );
  return result.rows[0] ?? null;
}

// Replaces: db.users.insertOne(user)
async function createUser(user: Omit<User, 'id'>): Promise<number> {
  const result = await pool.query<{ id: number }>(
    `INSERT INTO users (name, email, status, profile, tags)
     VALUES ($1, $2, $3, $4::jsonb, $5::text[])
     RETURNING id`,
    [user.name, user.email, user.status, JSON.stringify(user.profile), user.tags]
  );
  return result.rows[0].id;
}

// Replaces: db.users.updateOne({ _id: id }, { $set: { "profile.location": city } })
async function updateUserLocation(userId: number, city: string): Promise<void> {
  await pool.query(
    `UPDATE users
     SET profile = jsonb_set(profile, '{location}', $1::jsonb)
     WHERE id = $2`,
    [JSON.stringify(city), userId]
  );
}

// Replaces: db.orders.aggregate([{ $lookup }, { $unwind }, { $group }])
// Note: with node-postgres defaults, COUNT/SUM/AVG values (BIGINT/NUMERIC) arrive as strings.
async function getTopCustomers(limit: number = 10) {
  const result = await pool.query(`
    SELECT
      u.id,
      u.name,
      u.email,
      COUNT(o.id) AS order_count,
      SUM(o.total) AS total_spent,
      AVG(o.total) AS avg_order_value,
      MAX(o.created_at) AS last_order_date
    FROM users u
    JOIN orders o ON u.id = o.user_id
    WHERE o.status = 'completed'
    GROUP BY u.id, u.name, u.email
    ORDER BY total_spent DESC
    LIMIT $1
  `, [limit]);
  return result.rows;
}

// Replaces: db.orders.aggregate([{ $unwind: "$items" }, { $group }])
async function getProductRevenue() {
  const result = await pool.query(`
    SELECT
      item->>'product_id' AS product_id,
      item->>'name' AS product_name,
      SUM((item->>'quantity')::INT4) AS units_sold,
      SUM((item->>'quantity')::INT4 * (item->>'price')::NUMERIC) AS revenue
    FROM orders, jsonb_array_elements(items) AS item
    WHERE status = 'completed'
    GROUP BY item->>'product_id', item->>'name'
    ORDER BY revenue DESC
  `);
  return result.rows;
}

// HeliosDB-Lite exclusive: time-travel query (not possible in MongoDB)
async function getUserStateAtDate(userId: number, asOfDate: string) {
  const result = await pool.query(
    `SELECT * FROM users AS OF $1 WHERE id = $2`,
    [asOfDate, userId]
  );
  return result.rows[0] ?? null;
}

// HeliosDB-Lite exclusive: vector similarity search
async function findSimilarProducts(embedding: number[], limit: number = 5) {
  // JSON.stringify([1, 2, 3]) yields "[1,2,3]", which is cast to vector in the query
  const result = await pool.query(`
    SELECT id, name, description,
           embedding <-> $1::vector AS distance
    FROM products
    ORDER BY embedding <-> $1::vector
    LIMIT $2
  `, [JSON.stringify(embedding), limit]);
  return result.rows;
}

Pattern 3: Go Application Migration

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"
)

type User struct {
	ID      int            `json:"id"`
	Name    string         `json:"name"`
	Email   string         `json:"email"`
	Status  string         `json:"status"`
	Profile map[string]any `json:"profile"`
	Tags    []string       `json:"tags"`
}

type UserRepo struct {
	pool *pgxpool.Pool
}

func NewUserRepo(ctx context.Context, dsn string) (*UserRepo, error) {
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		return nil, err
	}
	return &UserRepo{pool: pool}, nil
}

// Close releases the underlying connection pool.
func (r *UserRepo) Close() {
	r.pool.Close()
}

// Replaces: collection.FindOne(ctx, bson.M{"email": email})
func (r *UserRepo) FindByEmail(ctx context.Context, email string) (*User, error) {
	var u User
	var profileJSON []byte
	err := r.pool.QueryRow(ctx,
		"SELECT id, name, email, status, profile, tags FROM users WHERE email = $1",
		email,
	).Scan(&u.ID, &u.Name, &u.Email, &u.Status, &profileJSON, &u.Tags)
	if err != nil {
		return nil, err
	}
	// profile may be NULL; only decode when there is something to decode.
	if len(profileJSON) > 0 {
		if err := json.Unmarshal(profileJSON, &u.Profile); err != nil {
			return nil, fmt.Errorf("decode profile: %w", err)
		}
	}
	return &u, nil
}

// Replaces: collection.InsertOne(ctx, doc)
func (r *UserRepo) Create(ctx context.Context, u *User) (int, error) {
	profileJSON, err := json.Marshal(u.Profile)
	if err != nil {
		return 0, fmt.Errorf("encode profile: %w", err)
	}
	var id int
	err = r.pool.QueryRow(ctx,
		`INSERT INTO users (name, email, status, profile, tags)
		 VALUES ($1, $2, $3, $4::jsonb, $5::text[])
		 RETURNING id`,
		u.Name, u.Email, u.Status, string(profileJSON), u.Tags,
	).Scan(&id)
	return id, err
}

// Replaces: collection.Aggregate(ctx, pipeline) with $lookup + $group
func (r *UserRepo) GetTopCustomers(ctx context.Context, limit int) ([]map[string]any, error) {
	rows, err := r.pool.Query(ctx, `
		SELECT u.name, u.email,
		       COUNT(o.id) AS order_count,
		       SUM(o.total) AS total_spent
		FROM users u
		JOIN orders o ON u.id = o.user_id
		WHERE o.status = 'completed'
		GROUP BY u.name, u.email
		ORDER BY total_spent DESC
		LIMIT $1
	`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var results []map[string]any
	for rows.Next() {
		var name, email string
		var orderCount int
		var totalSpent float64
		if err := rows.Scan(&name, &email, &orderCount, &totalSpent); err != nil {
			return nil, err
		}
		results = append(results, map[string]any{
			"name":        name,
			"email":       email,
			"order_count": orderCount,
			"total_spent": totalSpent,
		})
	}
	// rows.Err reports any error that ended the iteration early.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return results, nil
}

func main() {
	ctx := context.Background()
	repo, err := NewUserRepo(ctx, "postgresql://app@127.0.0.1:5432/myapp")
	if err != nil {
		log.Fatal(err)
	}
	defer repo.Close()

	// Create a user (replaces MongoDB insertOne)
	id, err := repo.Create(ctx, &User{
		Name:    "Alice",
		Email:   "alice@example.com",
		Status:  "active",
		Profile: map[string]any{"bio": "Engineer", "location": "NYC"},
		Tags:    []string{"rust", "databases"},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Created user ID: %d\n", id)

	// Find by email (replaces MongoDB findOne)
	user, err := repo.FindByEmail(ctx, "alice@example.com")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Found user: %s (%s)\n", user.Name, user.Email)

	// Top customers (replaces MongoDB aggregate with $lookup + $group)
	top, err := repo.GetTopCustomers(ctx, 5)
	if err != nil {
		log.Fatal(err)
	}
	for _, c := range top {
		fmt.Printf(" %s: %d orders, $%.2f spent\n",
			c["name"], c["order_count"], c["total_spent"])
	}
}

Pattern 4: Migrating MongoDB Transactions

# MongoDB transaction
with client.start_session() as session:
    with session.start_transaction():
        db.orders.insert_one({"user_id": user_id, "total": 99.99}, session=session)
        db.inventory.update_one(
            {"product_id": product_id},
            {"$inc": {"stock": -1}},
            session=session
        )
        db.users.update_one(
            {"_id": user_id},
            {"$inc": {"order_count": 1}},
            session=session
        )

# HeliosDB-Lite -- standard SQL transaction (simpler, faster, no caveats)
conn = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
cursor = conn.cursor()
try:
    # psycopg2 opens a transaction implicitly on the first execute(),
    # so no explicit BEGIN is needed.
    cursor.execute(
        "INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id",
        (user_id, 99.99)
    )
    order_id = cursor.fetchone()[0]
    cursor.execute(
        "UPDATE inventory SET stock = stock - 1 WHERE product_id = %s",
        (product_id,)
    )
    cursor.execute(
        "UPDATE users SET order_count = order_count + 1 WHERE id = %s",
        (user_id,)
    )
    conn.commit()
except Exception:
    conn.rollback()
    raise
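
The commit-or-rollback pattern above can be wrapped in a small context manager so each unit of work reads as one block. This sketch relies only on the DB-API cursor(), commit(), and rollback() methods that psycopg2 connections provide; the name transaction is hypothetical. (A psycopg2 connection can also be used directly in a with statement, which commits or rolls back in the same way.)

```python
from contextlib import contextmanager


@contextmanager
def transaction(conn):
    """Yield a cursor; commit if the block succeeds, roll back if it raises."""
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()


# Usage with a psycopg2 connection (tables as in the example above):
#
# with transaction(conn) as cur:
#     cur.execute("UPDATE inventory SET stock = stock - 1 WHERE product_id = %s", (product_id,))
#     cur.execute("UPDATE users SET order_count = order_count + 1 WHERE id = %s", (user_id,))
```

Keeping the commit in one place makes it harder to forget a rollback on an error path when several repositories share a connection.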

Limitations & Workarounds

MongoDB Features Not Directly Supported

| MongoDB Feature | Status | Workaround |
| --- | --- | --- |
| Schema-less collections | Different model | Use JSONB column for flexible fields; typed columns for stable fields |
| $graphLookup (recursive) | Different syntax | Use recursive CTEs (WITH RECURSIVE) |
| Change Streams | Different mechanism | Use triggers + notification channels |
| GridFS (large file storage) | Not supported | Store file paths in database; use object storage (S3, MinIO) for files |
| Capped collections | Not native | Use DELETE with ORDER BY ... LIMIT or partition pruning |
| Map-Reduce | Deprecated in MongoDB too | Use SQL GROUP BY, window functions, CTEs |
| Geospatial queries ($near, $geoWithin) | Not native | Store as JSONB GeoJSON; compute distances in application layer |
| MongoDB Realm / Atlas App Services | Not applicable | Use application framework (Django, Express, etc.) |
| $regex queries | Different syntax | Use ~ (regex match) or LIKE operator |
| Client-side field-level encryption | Different approach | HeliosDB-Lite TDE encrypts entire database at rest (AES-256-GCM) |
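
For the geospatial workaround, computing distances in the application layer can be as simple as a haversine function applied to coordinates read from the stored GeoJSON. A minimal sketch, assuming GeoJSON Point values in [longitude, latitude] order and a spherical Earth of radius 6371 km; haversine_km and within_radius are hypothetical names, and the sample rows are made up for illustration.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean radius; a spherical approximation


def haversine_km(lon1, lat1, lon2, lat2):
    """Great-circle distance in kilometers between two lon/lat points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def within_radius(rows, center, radius_km):
    """Filter (id, geojson_point) rows to those near center, nearest first."""
    hits = []
    for row_id, point in rows:
        lon, lat = point["coordinates"]
        distance = haversine_km(center[0], center[1], lon, lat)
        if distance <= radius_km:
            hits.append((row_id, distance))
    return sorted(hits, key=lambda hit: hit[1])


stores = [
    ("nyc", {"type": "Point", "coordinates": [-74.0060, 40.7128]}),
    ("la", {"type": "Point", "coordinates": [-118.2437, 34.0522]}),
]
nearby = within_radius(stores, center=(-73.9857, 40.7484), radius_km=50)
print([store_id for store_id, _ in nearby])
```

This scans every candidate row, so for large tables it is worth narrowing the candidates in SQL first, for example with a bounding-box condition on extracted longitude and latitude.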

HeliosDB-Full Alternative: DocumentStore

For teams that prefer a MongoDB-compatible API without learning SQL, HeliosDB-Full provides a DocumentStore interface:

# HeliosDB-Full DocumentStore API (MongoDB-compatible)
from heliosdb import DocumentStore
store = DocumentStore("./myapp.db")
users = store.collection("users")
# MongoDB-style operations
users.insert_one({"name": "Alice", "email": "alice@example.com"})
user = users.find_one({"email": "alice@example.com"})
users.update_one({"email": "alice@example.com"}, {"$set": {"status": "active"}})
# Aggregation pipeline
results = users.aggregate([
{"$match": {"status": "active"}},
{"$group": {"_id": "$department", "count": {"$sum": 1}}},
{"$sort": {"count": -1}}
])

This provides a gentler migration path for teams not ready to adopt SQL fully, while still gaining embedded deployment, ACID transactions, and cost elimination.


Rollback Strategy

Pre-Migration

  1. Keep MongoDB Atlas running for at least 2-4 weeks after cutover
  2. Document all query translations for easy reversal
  3. Export HeliosDB-Lite data periodically during the transition

Dual-Write Approach

For critical applications, write to both databases during the transition period:

import json


class DualWriteUserRepo:
    """Writes to both MongoDB and HeliosDB-Lite during migration."""

    def __init__(self, mongo_client, helios_conn):
        self.mongo_db = mongo_client.myapp
        self.helios = helios_conn

    def create_user(self, user: dict) -> dict:
        # Write to HeliosDB-Lite (primary after cutover)
        cursor = self.helios.cursor()
        cursor.execute(
            """INSERT INTO users (name, email, status, profile)
               VALUES (%(name)s, %(email)s, %(status)s, %(profile)s::jsonb)
               RETURNING id""",
            {
                'name': user['name'],
                'email': user['email'],
                'status': user.get('status', 'active'),
                # psycopg2 cannot adapt a dict directly; serialize it for the ::jsonb cast
                'profile': json.dumps(user.get('profile', {})),
            }
        )
        user_id = cursor.fetchone()[0]
        self.helios.commit()
        # Write to MongoDB (secondary, for rollback safety). Insert a copy because
        # pymongo adds an _id field to the dict it is given.
        self.mongo_db.users.insert_one({**user, '_helios_id': user_id})
        return {**user, 'id': user_id}
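
During the dual-write window it helps to check periodically that the two stores agree. The sketch below compares already-fetched records by email and reports drift; it assumes email is unique in both stores and compares only the fields written by create_user above. The function name find_drift and the sample records are hypothetical.

```python
COMPARED_FIELDS = ("name", "email", "status", "profile")


def find_drift(mongo_docs, helios_rows):
    """Report emails missing on either side and emails whose compared fields differ."""
    mongo_by_email = {doc["email"]: doc for doc in mongo_docs}
    helios_by_email = {row["email"]: row for row in helios_rows}
    shared = mongo_by_email.keys() & helios_by_email.keys()
    return {
        "missing_in_mongo": sorted(helios_by_email.keys() - mongo_by_email.keys()),
        "missing_in_helios": sorted(mongo_by_email.keys() - helios_by_email.keys()),
        "mismatched": sorted(
            email for email in shared
            if any(
                mongo_by_email[email].get(field) != helios_by_email[email].get(field)
                for field in COMPARED_FIELDS
            )
        ),
    }


report = find_drift(
    mongo_docs=[
        {"email": "a@example.com", "name": "A", "status": "active", "profile": {}},
        {"email": "b@example.com", "name": "B", "status": "active", "profile": {}},
    ],
    helios_rows=[
        {"email": "a@example.com", "name": "A", "status": "suspended", "profile": {}},
        {"email": "c@example.com", "name": "C", "status": "active", "profile": {}},
    ],
)
print(report)
```

In practice mongo_docs could come from a find() on the users collection and helios_rows from a RealDictCursor SELECT; an empty report on all three keys is a reasonable precondition for cutover.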

Rollback Procedure

  1. Revert application code to use MongoDB driver instead of PostgreSQL driver
  2. Verify MongoDB data is current (if dual-write was active, it should be)
  3. If dual-write was not active, export from HeliosDB-Lite and import to MongoDB:
import psycopg2
import psycopg2.extras
from pymongo import MongoClient

helios = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
mongo = MongoClient("mongodb+srv://admin:pass@cluster.mongodb.net/myapp")
cursor = helios.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute("SELECT * FROM users")
for row in cursor:
    # Keep _helios_id in the replacement document so later runs still match it.
    # Documents that predate dual-write have no _helios_id; match those on their
    # original ObjectId (stored as mongo_id during import) to avoid duplicates.
    mongo.myapp.users.replace_one(
        {"_helios_id": row["id"]},
        {
            "_helios_id": row["id"],
            "name": row["name"],
            "email": row["email"],
            "status": row["status"],
            "profile": row["profile"],
            "tags": row["tags"],
        },
        upsert=True
    )
print("Rollback to MongoDB complete")
  4. Decommission HeliosDB-Lite instance
  5. Post-mortem: document issues and plan remediation before next attempt
