Store documents as JSONB columns in relational tables, gaining the full power of SQL while retaining document flexibility. Replace MongoDB Atlas with an embedded, zero-config deployment.
MongoDB is the most popular document database, used by hundreds of thousands of applications for its flexible schema, developer-friendly API, and horizontal scalability. However, as applications mature, teams often run into the limitations of the document model: multi-document ACID transactions that were unavailable before MongoDB 4.0 and still carry performance caveats, no native JOINs across collections, escalating Atlas hosting costs, and no way to run ad-hoc analytical queries without building a separate data pipeline.
HeliosDB-Lite offers a compelling alternative: store documents as JSONB columns in relational tables, gaining the full power of SQL (JOINs, aggregations, window functions, CTEs) while retaining document flexibility. The JSONB type provides binary-indexed JSON storage with containment operators (@>), key existence checks (?), path extraction (->, ->>), and 20+ built-in functions.
The migration path: export each collection as JSON with mongoexport, transform the documents into SQL INSERT statements, and load them over the PostgreSQL wire protocol. Applications then switch from the MongoDB driver to a PostgreSQL driver, translating find/aggregate calls into SQL queries.
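The find-to-SQL translation step can be made mechanical for simple filters. A minimal sketch (equality and comparison operators only; field names are interpolated into the SQL text, so they must come from trusted code, never from user input):

```python
def filter_to_where(filter_doc: dict) -> tuple[str, list]:
    """Translate a flat MongoDB-style filter into a SQL WHERE clause plus params.

    Simplified sketch: supports equality and $gt/$gte/$lt/$lte/$ne only.
    Field names must be trusted (they are interpolated into the SQL text).
    """
    ops = {"$gt": ">", "$gte": ">=", "$lt": "<", "$lte": "<=", "$ne": "<>"}
    clauses, params = [], []
    for field, cond in filter_doc.items():
        if isinstance(cond, dict):
            # Operator form: {"age": {"$gte": 18}}
            for op, value in cond.items():
                clauses.append(f"{field} {ops[op]} %s")
                params.append(value)
        else:
            # Equality form: {"status": "active"}
            clauses.append(f"{field} = %s")
            params.append(cond)
    return " AND ".join(clauses), params
```

Feeding the resulting clause and params to a parameterized cursor covers the simple find() translations shown later in this guide.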
| Capability | MongoDB | HeliosDB-Lite | Advantage |
|---|---|---|---|
| ACID transactions | Per-document (multi-doc since 4.0, performance penalty) | Full multi-row, multi-table ACID | No performance penalty for transactions |
| JOINs | $lookup (limited, slow on large collections) | Native SQL JOINs (hash, merge, nested loop) | Orders of magnitude faster for relational queries |
| Aggregations | Aggregation pipeline (complex, non-standard) | SQL GROUP BY, window functions, CTEs | Standard SQL; simpler to write, optimize, and maintain |
| Schema validation | Optional JSON Schema | SQL constraints (NOT NULL, CHECK, FK, UNIQUE) | Enforced at the database level |
| Full-text search | Atlas Search (additional cost) or text indexes | Native FTS with @@ operator | No additional service needed |
| Vector search | Atlas Vector Search (additional cost) | Native HNSW indexes with SIMD | No additional service needed |
| Time-travel | No | AS OF queries on any table | Point-in-time data access |
| Branching | No | Database-level branching for dev/test | Isolated environments without data copies |
| Embedded deployment | No (requires mongod process) | Yes (in-process, zero-config) | Eliminates network latency and operational overhead |

| Cost Factor | MongoDB Atlas (M30) | HeliosDB-Lite (Embedded) | Savings |
|---|---|---|---|
| Database hosting | $1,800/month | $0 (embedded in app) | 100% |
| Atlas Search | $400/month | $0 (built-in FTS) | 100% |
| Atlas Vector Search | $300/month | $0 (built-in HNSW) | 100% |
| Data transfer | $500/month | $0 (in-process) | 100% |
| Backup storage | $200/month | $0 (file-level backup) | 100% |
| Total annual | $38,400 | ~$1,500 (compute only) | 96% |

| Operation | MongoDB Atlas | HeliosDB-Lite | Improvement |
|---|---|---|---|
| Single document read | 35ms P95 | 8ms P95 | 77% faster |
| Document insert | 52ms P95 | 14ms P95 | 73% faster |
| Aggregation (GROUP BY) | 180ms P95 | 45ms P95 | 75% faster |
| Cross-collection $lookup | 320ms P95 | 55ms P95 (SQL JOIN) | 83% faster |
| Text search | 95ms P95 | 28ms P95 | 71% faster |

| MongoDB Concept | HeliosDB-Lite Equivalent | Migration Effort |
|---|---|---|
| Collection | Table (with JSONB column or normalized columns) | Schema design |
| Document ({}) | Row (JSONB column or individual columns) | Data transform |
| _id (ObjectId) | id SERIAL PRIMARY KEY or id TEXT PRIMARY KEY | Auto-generated |
| Nested document | JSONB value or separate table with FK | Design decision |
| Array field | JSONB array or ARRAY type column | Design decision |
| find() | SELECT ... WHERE | Query rewrite |
| insertOne() / insertMany() | INSERT INTO ... VALUES | Query rewrite |
| updateOne() / updateMany() | UPDATE ... SET ... WHERE | Query rewrite |
| deleteOne() / deleteMany() | DELETE FROM ... WHERE | Query rewrite |
| aggregate() pipeline | SQL GROUP BY, window functions, CTEs | Query rewrite |
| $match | WHERE clause | Direct mapping |
| $group | GROUP BY + aggregate functions | Direct mapping |
| $sort | ORDER BY | Direct mapping |
| $project | SELECT column list | Direct mapping |
| $limit / $skip | LIMIT / OFFSET | Direct mapping |
| $lookup | JOIN | Direct mapping (much faster) |
| $unwind | jsonb_array_elements() or UNNEST() | Function call |
| $addFields | Computed columns in SELECT | Direct mapping |
| $count | COUNT(*) | Direct mapping |
| Compound index | CREATE INDEX ... ON table (col1, col2) | DDL statement |
| Text index | FTS index with @@ operator | Different syntax |
| TTL index | Scheduled DELETE or partition pruning | Application logic |
| Change streams | Triggers | Different mechanism |
| Transactions | BEGIN / COMMIT / ROLLBACK | Standard SQL |
| Replica set | WAL streaming replication | Architecture change |
| Sharding | HeliosDB partitioning (HASH/RANGE/LIST) | Architecture change |
You have three main approaches: document-style (pure JSONB), fully normalized, or a hybrid of the two.
Document-style: keep the document structure intact. Best for heterogeneous documents or when you want minimal transformation.
CREATE TABLE users (
id SERIAL PRIMARY KEY,
doc JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Index specific JSONB paths for query performance
CREATE INDEX idx_users_email ON users ((doc->>'email'));
CREATE INDEX idx_users_status ON users ((doc->>'status'));
CREATE INDEX idx_users_doc ON users USING gin(doc); -- Full JSONB containment index
Normalized: extract top-level fields into typed columns. Best for well-defined schemas where you want constraint enforcement and optimal query performance.
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active', 'suspended', 'deleted')),
profile JSONB DEFAULT '{}', -- Keep flexible nested data as JSONB
tags TEXT[] DEFAULT '{}', -- Arrays for simple lists
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_users_email ON users (email);
CREATE INDEX idx_users_status ON users (status);
CREATE INDEX idx_users_profile ON users USING gin(profile);
Hybrid: use typed columns for frequently queried and filtered fields and JSONB for the rest.
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INT4 NOT NULL REFERENCES users(id),
status TEXT NOT NULL DEFAULT 'pending',
total NUMERIC(12,2) NOT NULL,
items JSONB NOT NULL, -- Array of order items as JSONB
metadata JSONB DEFAULT '{}', -- Flexible metadata
created_at TIMESTAMPTZ DEFAULT NOW()
);
# Export each collection as a single JSON array (--jsonArray matches the
# json.load() call in the import script below)
mongoexport \
--uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \
--collection=users \
--out=users.json \
--jsonArray
mongoexport \
--uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \
--collection=orders \
--out=orders.json \
--jsonArray
mongoexport \
--uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \
--collection=products \
--out=products.json \
--jsonArray
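The three invocations above differ only in the collection name, so a small driver script can generate and run them. A sketch, reusing the URI placeholder and collection names from the commands above:

```python
import subprocess

# Placeholders from the mongoexport commands above -- substitute your own.
MONGO_URI = "mongodb+srv://admin:password@cluster.mongodb.net/myapp"
COLLECTIONS = ["users", "orders", "products"]

def export_cmd(collection: str) -> list[str]:
    """Build the mongoexport invocation for one collection."""
    return [
        "mongoexport",
        f"--uri={MONGO_URI}",
        f"--collection={collection}",
        f"--out={collection}.json",
        "--jsonArray",
    ]

def export_all() -> None:
    """Run mongoexport for every collection (requires the MongoDB tools on PATH)."""
    for name in COLLECTIONS:
        subprocess.run(export_cmd(name), check=True)
```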
#!/usr/bin/env python3
"""Import MongoDB JSON exports into HeliosDB-Lite as JSONB documents."""
import json
import psycopg2
import psycopg2.extras
from pathlib import Path
BATCH_SIZE = 2000
def create_table_for_collection(cursor, collection_name: str):
"""Create a table with a JSONB document column."""
cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {collection_name} (
id SERIAL PRIMARY KEY,
mongo_id TEXT UNIQUE,
doc JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
)
""")
def import_collection(conn, collection_name: str, json_path: str):
"""Import a MongoDB collection from a JSON export file."""
cursor = conn.cursor()
create_table_for_collection(cursor, collection_name)
conn.commit()
with open(json_path, 'r', encoding='utf-8') as f:
documents = json.load(f)
print(f" Importing {collection_name}: {len(documents)} documents")
batch = []
imported = 0
for doc in documents:
# Extract _id and remove from document body
mongo_id = str(doc.pop('_id', {}).get('$oid', '')) if isinstance(doc.get('_id'), dict) else str(doc.pop('_id', ''))
batch.append((mongo_id, json.dumps(doc)))
if len(batch) >= BATCH_SIZE:
psycopg2.extras.execute_batch(
cursor,
f"INSERT INTO {collection_name} (mongo_id, doc) VALUES (%s, %s::jsonb)",
batch
)
conn.commit()
imported += len(batch)
batch = []
print(f" {imported}/{len(documents)} documents imported")
if batch:
psycopg2.extras.execute_batch(
cursor,
f"INSERT INTO {collection_name} (mongo_id, doc) VALUES (%s, %s::jsonb)",
batch
)
conn.commit()
imported += len(batch)
print(f" {imported}/{len(documents)} documents imported (complete)")
return imported
def main():
conn = psycopg2.connect(
host="127.0.0.1",
port=5432,
user="app",
dbname="myapp"
)
collections = {
'users': 'users.json',
'orders': 'orders.json',
'products': 'products.json',
}
total = 0
for name, path in collections.items():
if Path(path).exists():
count = import_collection(conn, name, path)
total += count
else:
print(f" Skipping {name}: {path} not found")
print(f"\nImported {total} total documents across {len(collections)} collections")
conn.close()
if __name__ == "__main__":
main()
#!/usr/bin/env python3
"""Import MongoDB users into a normalized HeliosDB-Lite schema."""
import json
import psycopg2
import psycopg2.extras
def import_users_normalized(conn, json_path: str):
"""Import users with typed columns + JSONB for flexible fields."""
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
mongo_id TEXT UNIQUE,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
profile JSONB DEFAULT '{}',
tags TEXT[] DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
)
""")
conn.commit()
with open(json_path, 'r') as f:
documents = json.load(f)
batch = []
for doc in documents:
mongo_id = str(doc.get('_id', {}).get('$oid', '')) if isinstance(doc.get('_id'), dict) else str(doc.get('_id', ''))
batch.append({
'mongo_id': mongo_id,
'name': doc.get('name', ''),
'email': doc.get('email', ''),
'status': doc.get('status', 'active'),
'profile': json.dumps({
k: v for k, v in doc.items()
if k not in ('_id', 'name', 'email', 'status', 'tags', 'createdAt')
}),
'tags': doc.get('tags', []),
})
psycopg2.extras.execute_batch(
cursor,
"""INSERT INTO users (mongo_id, name, email, status, profile, tags)
VALUES (%(mongo_id)s, %(name)s, %(email)s, %(status)s,
%(profile)s::jsonb, %(tags)s::text[])""",
batch
)
conn.commit()
print(f"Imported {len(batch)} users (normalized)")
if __name__ == "__main__":
conn = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
import_users_normalized(conn, 'users.json')
conn.close()
-- JSONB containment index (equivalent to MongoDB compound indexes on nested fields)
CREATE INDEX idx_users_doc ON users USING gin(doc);
-- Expression indexes on specific JSONB paths (equivalent to MongoDB field indexes)
CREATE INDEX idx_users_email ON users ((doc->>'email'));
CREATE INDEX idx_users_status ON users ((doc->>'status'));
-- Full-text search index (replaces MongoDB text index)
CREATE INDEX idx_products_fts ON products
USING gin(to_tsvector('english', COALESCE(doc->>'name', '') || ' ' || COALESCE(doc->>'description', '')));
-- Vector search index (replaces Atlas Vector Search)
-- Requires embedding column
ALTER TABLE products ADD COLUMN embedding VECTOR(1536);
CREATE INDEX idx_products_vector ON products USING hnsw(embedding);
See the Query Translation Examples and Common Patterns sections below.
| MongoDB / BSON Type | HeliosDB-Lite Type | Storage Approach |
|---|---|---|
| String | TEXT | Direct column or doc->>'field' |
| Int32 | INT4 | Direct column or (doc->>'field')::INT4 |
| Int64 | INT8 | Direct column or (doc->>'field')::INT8 |
| Double | FLOAT8 | Direct column or (doc->>'field')::FLOAT8 |
| Decimal128 | NUMERIC | Direct column |
| Boolean | BOOLEAN | Direct column or (doc->>'field')::BOOLEAN |
| Date | TIMESTAMPTZ | Direct column |
| ObjectId | TEXT | Store as hex string |
| Array | JSONB array or ARRAY type | Depends on element type |
| Embedded Document | JSONB or separate table with FK | Design decision |
| Binary | BYTEA | Direct column |
| Null | SQL NULL | Direct mapping |
| RegExp | TEXT (pattern) | Use ~ operator for matching |
| UUID | TEXT | Store as string representation |
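Note that mongoexport writes BSON types as Extended JSON wrappers ({"$oid": ...}, {"$date": ...}, {"$numberDecimal": ...}), so before applying the table above you usually want to unwrap them. A sketch covering the most common wrappers (the target types follow the mapping table):

```python
from datetime import datetime, timezone
from decimal import Decimal

def from_extended_json(value):
    """Recursively unwrap mongoexport Extended JSON into SQL-friendly Python values."""
    if isinstance(value, dict):
        if "$oid" in value:
            return value["$oid"]                     # ObjectId -> hex string (TEXT)
        if "$numberDecimal" in value:
            return Decimal(value["$numberDecimal"])  # Decimal128 -> NUMERIC
        if "$numberLong" in value:
            return int(value["$numberLong"])         # Int64 -> INT8
        if "$date" in value:
            d = value["$date"]
            if isinstance(d, dict):                  # {"$date": {"$numberLong": "<millis>"}}
                return datetime.fromtimestamp(int(d["$numberLong"]) / 1000, tz=timezone.utc)
            return d                                 # ISO-8601 string, castable to TIMESTAMPTZ
        return {k: from_extended_json(v) for k, v in value.items()}
    if isinstance(value, list):
        return [from_extended_json(v) for v in value]
    return value
```

Run each document through this before building INSERT parameters, so dates land in TIMESTAMPTZ columns instead of being stored as wrapper objects inside JSONB.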
// MongoDB
db.users.find({ status: "active", age: { $gte: 18 } })
-- HeliosDB-Lite (JSONB approach)
SELECT * FROM users WHERE doc->>'status' = 'active' AND (doc->>'age')::INT4 >= 18;
-- HeliosDB-Lite (normalized approach)
SELECT * FROM users WHERE status = 'active' AND age >= 18;
// MongoDB
db.users.find({ status: "active" }, { name: 1, email: 1, _id: 0 })
-- HeliosDB-Lite (JSONB)
SELECT doc->>'name' AS name, doc->>'email' AS email
FROM users
WHERE doc->>'status' = 'active';
-- HeliosDB-Lite (normalized)
SELECT name, email FROM users WHERE status = 'active';
// MongoDB
db.users.insertOne({
name: "Alice",
email: "alice@example.com",
profile: { bio: "Engineer", location: "NYC" },
tags: ["rust", "databases"]
})
-- HeliosDB-Lite (JSONB)
INSERT INTO users (doc) VALUES (
'{"name": "Alice", "email": "alice@example.com",
"profile": {"bio": "Engineer", "location": "NYC"},
"tags": ["rust", "databases"]}'::jsonb
) RETURNING id;
-- HeliosDB-Lite (normalized)
INSERT INTO users (name, email, profile, tags) VALUES (
'Alice',
'alice@example.com',
'{"bio": "Engineer", "location": "NYC"}'::jsonb,
ARRAY['rust', 'databases']
) RETURNING id;
// MongoDB
db.users.updateOne(
{ email: "alice@example.com" },
{ $set: { "profile.location": "SF" }, $inc: { login_count: 1 } }
)
-- HeliosDB-Lite (JSONB)
UPDATE users
SET doc = jsonb_set(
jsonb_set(doc, '{profile,location}', '"SF"'),
'{login_count}',
to_jsonb(COALESCE((doc->>'login_count')::INT4, 0) + 1)
)
WHERE doc->>'email' = 'alice@example.com';
-- HeliosDB-Lite (normalized with JSONB profile)
UPDATE users
SET profile = jsonb_set(profile, '{location}', '"SF"'),
login_count = login_count + 1
WHERE email = 'alice@example.com';
// MongoDB
db.sessions.deleteMany({ expires_at: { $lt: new Date() } })
-- HeliosDB-Lite
DELETE FROM sessions WHERE expires_at < NOW();
// MongoDB
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: {
_id: "$user_id",
total_spent: { $sum: "$amount" },
order_count: { $sum: 1 },
avg_order: { $avg: "$amount" },
last_order: { $max: "$created_at" }
}},
{ $sort: { total_spent: -1 } },
{ $limit: 10 }
])
-- HeliosDB-Lite
SELECT
user_id,
SUM(amount) AS total_spent,
COUNT(*) AS order_count,
AVG(amount) AS avg_order,
MAX(created_at) AS last_order
FROM orders
WHERE status = 'completed'
GROUP BY user_id
ORDER BY total_spent DESC
LIMIT 10;
// MongoDB
db.orders.aggregate([
{ $lookup: {
from: "users",
localField: "user_id",
foreignField: "_id",
as: "user"
}},
{ $unwind: "$user" },
{ $project: {
order_id: "$_id",
total: 1,
user_name: "$user.name",
user_email: "$user.email"
}}
])
-- HeliosDB-Lite (dramatically simpler and faster)
SELECT
o.id AS order_id,
o.total,
u.name AS user_name,
u.email AS user_email
FROM orders o
JOIN users u ON o.user_id = u.id;
// MongoDB
db.orders.aggregate([
{ $unwind: "$items" },
{ $group: {
_id: "$items.product_id",
total_quantity: { $sum: "$items.quantity" },
total_revenue: { $sum: { $multiply: ["$items.quantity", "$items.price"] } }
}}
])
-- HeliosDB-Lite
SELECT
item->>'product_id' AS product_id,
SUM((item->>'quantity')::INT4) AS total_quantity,
SUM((item->>'quantity')::INT4 * (item->>'price')::NUMERIC) AS total_revenue
FROM orders, jsonb_array_elements(items) AS item
GROUP BY item->>'product_id';
// MongoDB
db.users.find({ "address.city": "New York", "address.state": "NY" })
-- HeliosDB-Lite (JSONB path extraction)
SELECT * FROM users
WHERE doc->'address'->>'city' = 'New York'
AND doc->'address'->>'state' = 'NY';
-- HeliosDB-Lite (containment operator -- uses GIN index)
SELECT * FROM users
WHERE doc @> '{"address": {"city": "New York", "state": "NY"}}';
// MongoDB: Find documents where tags array contains "rust"
db.users.find({ tags: "rust" })
// MongoDB: Find documents where tags contains all of these
db.users.find({ tags: { $all: ["rust", "databases"] } })
// MongoDB: Array size
db.users.find({ tags: { $size: 3 } })
-- HeliosDB-Lite (JSONB array)
SELECT * FROM users WHERE doc->'tags' ? 'rust';
SELECT * FROM users WHERE doc->'tags' @> '["rust", "databases"]';
SELECT * FROM users WHERE jsonb_array_length(doc->'tags') = 3;
-- HeliosDB-Lite (ARRAY type, normalized)
SELECT * FROM users WHERE 'rust' = ANY(tags);
SELECT * FROM users WHERE tags @> ARRAY['rust', 'databases'];
SELECT * FROM users WHERE array_length(tags, 1) = 3;
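If you generate these array predicates programmatically, the mapping can be captured in a small helper. A sketch for the JSONB layout (column named doc, as in the schemas above); as before, field names must be trusted since they are interpolated into the SQL text:

```python
import json

def array_filter_to_sql(field: str, cond) -> tuple[str, list]:
    """Translate MongoDB array operators on a JSONB array field into a SQL predicate.

    Sketch for the document-style layout (JSONB column named "doc").
    Field names must come from trusted code, not user input.
    """
    if isinstance(cond, dict) and "$all" in cond:
        # {tags: {$all: [...]}} -> containment, served by the GIN index
        return f"doc->'{field}' @> %s::jsonb", [json.dumps(cond["$all"])]
    if isinstance(cond, dict) and "$size" in cond:
        # {tags: {$size: n}} -> jsonb_array_length
        return f"jsonb_array_length(doc->'{field}') = %s", [cond["$size"]]
    # {tags: "rust"} -> top-level string membership via the ? operator
    return f"doc->'{field}' ? %s", [cond]
```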
| Operator | Description | Example |
|---|---|---|
| -> | Get JSON object field (as JSON) | doc->'address' |
| ->> | Get JSON object field (as text) | doc->>'name' returns 'Alice' |
| -> (int) | Get JSON array element (as JSON) | doc->'tags'->0 |
| ->> (int) | Get JSON array element (as text) | doc->'tags'->>0 returns 'rust' |
| @> | Contains (left contains right) | doc @> '{"status":"active"}' |
| <@ | Contained by (left is contained by right) | '{"status":"active"}' <@ doc |
| ? | Key exists | doc ? 'email' |
| ?\| | Any key exists | doc ?\| array['email','phone'] |
| ?& | All keys exist | doc ?& array['email','phone'] |
| \|\| | Concatenate JSONB | doc \|\| '{"new_field": true}' |
| - | Delete key | doc - 'temporary_field' |
| #- | Delete at path | doc #- '{address,zip}' |
-- Extract all keys from a JSONB object
SELECT jsonb_object_keys(doc) FROM users WHERE id = 1;
-- Expand a JSONB array into rows
SELECT id, elem->>'name' AS item_name
FROM orders, jsonb_array_elements(items) AS elem;
-- Build JSONB from key-value pairs
SELECT jsonb_build_object('name', u.name, 'email', u.email) AS user_json
FROM users u;
-- Pretty-print JSONB
SELECT jsonb_pretty(doc) FROM users WHERE id = 1;
-- Set a nested value
UPDATE users SET doc = jsonb_set(doc, '{profile,verified}', 'true') WHERE id = 1;
-- Check JSON type
SELECT jsonb_typeof(doc->'tags') FROM users WHERE id = 1; -- returns 'array'
-- Aggregate rows into a JSON array
SELECT jsonb_agg(jsonb_build_object('id', id, 'name', name)) FROM users;
"""
MongoDB to HeliosDB-Lite migration -- Python application layer.
Replace pymongo calls with psycopg2 queries.
"""
import psycopg2
import psycopg2.extras
import json
class UserRepository:
"""Replaces a MongoDB-backed user repository with HeliosDB-Lite."""
def __init__(self, dsn: str = "postgresql://app@127.0.0.1:5432/myapp"):
self.conn = psycopg2.connect(dsn)
psycopg2.extras.register_default_jsonb(self.conn)
def find_one(self, email: str) -> dict:
"""Replaces: db.users.findOne({ email: email })"""
cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute(
"SELECT id, name, email, status, profile, tags FROM users WHERE email = %s",
(email,)
)
return cursor.fetchone()
def find_active(self, limit: int = 20, offset: int = 0) -> list:
"""Replaces: db.users.find({ status: 'active' }).limit(20).skip(0)"""
cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute(
"SELECT * FROM users WHERE status = 'active' ORDER BY created_at DESC LIMIT %s OFFSET %s",
(limit, offset)
)
return cursor.fetchall()
def insert(self, user: dict) -> int:
"""Replaces: db.users.insertOne(user)"""
cursor = self.conn.cursor()
cursor.execute(
"""INSERT INTO users (name, email, status, profile, tags)
VALUES (%(name)s, %(email)s, %(status)s, %(profile)s::jsonb, %(tags)s::text[])
RETURNING id""",
{
'name': user['name'],
'email': user['email'],
'status': user.get('status', 'active'),
'profile': json.dumps(user.get('profile', {})),
'tags': user.get('tags', []),
}
)
self.conn.commit()
return cursor.fetchone()[0]
def update_profile(self, user_id: int, updates: dict) -> None:
"""Replaces: db.users.updateOne({ _id: id }, { $set: { profile: updates } })"""
cursor = self.conn.cursor()
cursor.execute(
"UPDATE users SET profile = profile || %s::jsonb WHERE id = %s",
(json.dumps(updates), user_id)
)
self.conn.commit()
def search(self, query: str, limit: int = 10) -> list:
"""
Replaces: db.users.find({ $text: { $search: query } })
Uses HeliosDB-Lite FTS with ranking (not available in MongoDB without Atlas Search).
"""
cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(
            """SELECT id, name, email,
                      ts_rank(to_tsvector('english', name || ' ' || COALESCE(profile->>'bio', '')),
                              plainto_tsquery('english', %s)) AS relevance
               FROM users
               WHERE to_tsvector('english', name || ' ' || COALESCE(profile->>'bio', ''))
                     @@ plainto_tsquery('english', %s)
               ORDER BY relevance DESC
               LIMIT %s""",
            (query, query, limit)  # plainto_tsquery tolerates raw user input
        )
return cursor.fetchall()
def aggregate_by_status(self) -> list:
"""
Replaces:
db.users.aggregate([
{ $group: { _id: "$status", count: { $sum: 1 } } },
{ $sort: { count: -1 } }
])
"""
cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute(
"""SELECT status, COUNT(*) AS count
FROM users
GROUP BY status
ORDER BY count DESC"""
)
return cursor.fetchall()
import { Pool } from 'pg';

// Row shape for the users table (profile is the flexible JSONB column).
interface User {
  id: number;
  name: string;
  email: string;
  status: string;
  profile: Record<string, unknown>;
  tags: string[];
}
const pool = new Pool({
host: '127.0.0.1',
port: 5432,
database: 'myapp',
user: 'app',
});
// Replaces: db.users.findOne({ email })
async function findUserByEmail(email: string): Promise<User | null> {
const result = await pool.query<User>(
'SELECT * FROM users WHERE email = $1',
[email]
);
return result.rows[0] || null;
}
// Replaces: db.users.insertOne(user)
async function createUser(user: Omit<User, 'id'>): Promise<number> {
const result = await pool.query<{ id: number }>(
`INSERT INTO users (name, email, status, profile, tags)
VALUES ($1, $2, $3, $4::jsonb, $5::text[])
RETURNING id`,
[user.name, user.email, user.status, JSON.stringify(user.profile), user.tags]
);
return result.rows[0].id;
}
// Replaces: db.orders.aggregate([{ $lookup }, { $unwind }, { $group }])
async function getTopCustomers(limit: number = 10) {
const result = await pool.query(`
SELECT
u.id,
u.name,
u.email,
COUNT(o.id) AS order_count,
SUM(o.total) AS total_spent,
AVG(o.total) AS avg_order_value,
MAX(o.created_at) AS last_order_date
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.status = 'completed'
GROUP BY u.id, u.name, u.email
ORDER BY total_spent DESC
LIMIT $1
`, [limit]);
return result.rows;
}
// Replaces: db.orders.aggregate([{ $unwind: "$items" }, { $group }])
async function getProductRevenue() {
const result = await pool.query(`
SELECT
item->>'product_id' AS product_id,
item->>'name' AS product_name,
SUM((item->>'quantity')::INT4) AS units_sold,
SUM((item->>'quantity')::INT4 * (item->>'price')::NUMERIC) AS revenue
FROM orders, jsonb_array_elements(items) AS item
WHERE status = 'completed'
GROUP BY item->>'product_id', item->>'name'
ORDER BY revenue DESC
`);
return result.rows;
}
// HeliosDB-Lite exclusive: time-travel query (not possible in MongoDB)
async function getUserStateAtDate(userId: number, asOfDate: string) {
const result = await pool.query(
`SELECT * FROM users AS OF $1 WHERE id = $2`,
[asOfDate, userId]
);
return result.rows[0] || null;
}
package main
import (
	"context"
	"encoding/json"

	"github.com/jackc/pgx/v5/pgxpool"
)

// User mirrors the users table; Profile holds the flexible JSONB column.
type User struct {
	ID      int
	Name    string
	Email   string
	Status  string
	Profile map[string]any
	Tags    []string
}
type UserRepo struct {
pool *pgxpool.Pool
}
func NewUserRepo(ctx context.Context, dsn string) (*UserRepo, error) {
pool, err := pgxpool.New(ctx, dsn)
if err != nil {
return nil, err
}
return &UserRepo{pool: pool}, nil
}
// Replaces: collection.FindOne(ctx, bson.M{"email": email})
func (r *UserRepo) FindByEmail(ctx context.Context, email string) (*User, error) {
var u User
var profileJSON []byte
err := r.pool.QueryRow(ctx,
"SELECT id, name, email, status, profile, tags FROM users WHERE email = $1",
email,
).Scan(&u.ID, &u.Name, &u.Email, &u.Status, &profileJSON, &u.Tags)
if err != nil {
return nil, err
}
	if err := json.Unmarshal(profileJSON, &u.Profile); err != nil {
		return nil, err
	}
return &u, nil
}
// Replaces: collection.InsertOne(ctx, doc)
func (r *UserRepo) Create(ctx context.Context, u *User) (int, error) {
profileJSON, _ := json.Marshal(u.Profile)
var id int
err := r.pool.QueryRow(ctx,
`INSERT INTO users (name, email, status, profile, tags)
VALUES ($1, $2, $3, $4::jsonb, $5::text[])
RETURNING id`,
u.Name, u.Email, u.Status, string(profileJSON), u.Tags,
).Scan(&id)
return id, err
}
// Replaces: collection.Aggregate(ctx, pipeline) with $lookup + $group
func (r *UserRepo) GetTopCustomers(ctx context.Context, limit int) ([]map[string]any, error) {
rows, err := r.pool.Query(ctx, `
SELECT u.name, u.email,
COUNT(o.id) AS order_count,
SUM(o.total) AS total_spent
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.status = 'completed'
GROUP BY u.name, u.email
ORDER BY total_spent DESC
LIMIT $1
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var results []map[string]any
for rows.Next() {
var name, email string
var orderCount int
var totalSpent float64
		if err := rows.Scan(&name, &email, &orderCount, &totalSpent); err != nil {
			return nil, err
		}
results = append(results, map[string]any{
"name": name,
"email": email,
"order_count": orderCount,
"total_spent": totalSpent,
})
}
return results, nil
}
# MongoDB transaction
with client.start_session() as session:
with session.start_transaction():
db.orders.insert_one({"user_id": user_id, "total": 99.99}, session=session)
db.inventory.update_one(
{"product_id": product_id},
{"$inc": {"stock": -1}},
session=session
)
db.users.update_one(
{"_id": user_id},
{"$inc": {"order_count": 1}},
session=session
)
# HeliosDB-Lite -- standard SQL transaction (simpler, faster, no caveats)
conn = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
cursor = conn.cursor()
try:
    # psycopg2 opens the transaction implicitly on the first statement
cursor.execute(
"INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id",
(user_id, 99.99)
)
order_id = cursor.fetchone()[0]
cursor.execute(
"UPDATE inventory SET stock = stock - 1 WHERE product_id = %s",
(product_id,)
)
cursor.execute(
"UPDATE users SET order_count = order_count + 1 WHERE id = %s",
(user_id,)
)
conn.commit()
except Exception:
conn.rollback()
raise
| MongoDB Feature | Status | Workaround |
|---|---|---|
| Schema-less collections | Different model | Use JSONB column for flexible fields; typed columns for stable fields |
| $graphLookup (recursive) | Different syntax | Use recursive CTEs (WITH RECURSIVE) |
| Change Streams | Different mechanism | Use triggers + notification channels |
| GridFS (large file storage) | Not supported | Store file paths in database; use object storage (S3, MinIO) for files |
| Capped collections | Not native | Use DELETE with ORDER BY ... LIMIT or partition pruning |
| Map-Reduce | Deprecated in MongoDB too | Use SQL GROUP BY, window functions, CTEs |
| Geospatial queries ($near, $geoWithin) | Not native | Store as JSONB GeoJSON; compute distances in application layer |
| MongoDB Realm / Atlas App Services | Not applicable | Use application framework (Django, Express, etc.) |
| $regex queries | Different syntax | Use ~ (regex match) or LIKE operator |
| Client-side field-level encryption | Different approach | HeliosDB-Lite TDE encrypts entire database at rest (AES-256-GCM) |
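For the change-stream replacement noted above (triggers plus notification channels), one pattern is a trigger that calls pg_notify on every write and a client that LISTENs on the channel. A sketch that generates the trigger DDL; it assumes HeliosDB-Lite supports PL/pgSQL, pg_notify, and row_to_json like PostgreSQL, and the function/channel names are illustrative:

```python
def change_notify_ddl(table: str) -> str:
    """Generate trigger DDL that publishes each write to a notification channel.

    Assumes PL/pgSQL and pg_notify are available (true for PostgreSQL; an
    assumption for HeliosDB-Lite). Function and channel names are illustrative.
    """
    return f"""
CREATE OR REPLACE FUNCTION {table}_notify() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('{table}_changes', row_to_json(NEW)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER {table}_changes_trg
AFTER INSERT OR UPDATE ON {table}
FOR EACH ROW EXECUTE FUNCTION {table}_notify();
"""
```

A psycopg2 consumer would then issue LISTEN users_changes and read conn.notifies, roughly where a MongoDB change-stream cursor used to sit.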
For teams that prefer a MongoDB-compatible API without learning SQL, HeliosDB-Full provides a DocumentStore interface:
# HeliosDB-Full DocumentStore API (MongoDB-compatible)
from heliosdb import DocumentStore
store = DocumentStore("./myapp.db")
users = store.collection("users")
# MongoDB-style operations
users.insert_one({"name": "Alice", "email": "alice@example.com"})
user = users.find_one({"email": "alice@example.com"})
users.update_one({"email": "alice@example.com"}, {"$set": {"status": "active"}})
# Aggregation pipeline
results = users.aggregate([
{"$match": {"status": "active"}},
{"$group": {"_id": "$department", "count": {"$sum": 1}}},
{"$sort": {"count": -1}}
])
This provides a gentler migration path for teams not ready to adopt SQL fully, while still gaining embedded deployment, ACID transactions, and cost elimination.
For critical applications, write to both databases during the transition period:
class DualWriteUserRepo:
"""Writes to both MongoDB and HeliosDB-Lite during migration."""
def __init__(self, mongo_client, helios_conn):
self.mongo_db = mongo_client.myapp
self.helios = helios_conn
def create_user(self, user: dict) -> dict:
# Write to HeliosDB-Lite (primary after cutover)
cursor = self.helios.cursor()
cursor.execute(
"""INSERT INTO users (name, email, status, profile)
VALUES (%(name)s, %(email)s, %(status)s, %(profile)s::jsonb)
RETURNING id""",
user
)
self.helios.commit()
user_id = cursor.fetchone()[0]
# Write to MongoDB (secondary, for rollback safety)
user['_helios_id'] = user_id
self.mongo_db.users.insert_one(user)
return {**user, 'id': user_id}
import psycopg2
import psycopg2.extras
from pymongo import MongoClient
helios = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
mongo = MongoClient("mongodb+srv://admin:pass@cluster.mongodb.net/myapp")
cursor = helios.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute("SELECT * FROM users")
for row in cursor:
mongo.myapp.users.replace_one(
{"_helios_id": row["id"]},
{
"name": row["name"],
"email": row["email"],
"status": row["status"],
"profile": row["profile"],
"tags": row["tags"],
},
upsert=True
)
print("Rollback to MongoDB complete")
Our team can help you plan and execute your migration to HeliosDB.