Overview

MongoDB is the most popular document database, used by hundreds of thousands of applications for its flexible schema, developer-friendly API, and horizontal scalability. As applications mature, however, teams often run into the limits of the document model: no multi-document ACID transactions before MongoDB 4.0 (and performance caveats on them since), no native JOINs across collections, escalating Atlas hosting costs, and no way to run ad-hoc analytical queries without building a separate data pipeline.

HeliosDB-Lite offers a compelling alternative: store documents as JSONB columns in relational tables, gaining the full power of SQL (JOINs, aggregations, window functions, CTEs) while retaining document flexibility. The JSONB type stores JSON in a binary format and provides containment operators (@>), key-existence checks (?), path extraction (-> and ->>), and more than 20 built-in functions.

The migration path is: export collections with mongoexport (JSON), transform into SQL INSERT statements, and import via the PostgreSQL wire protocol. Applications switch from the MongoDB driver to a PostgreSQL driver, translating find/aggregate calls into SQL queries.
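The transform step amounts to popping MongoDB's `_id` and serializing the rest of the document for a parameterized INSERT. A minimal sketch of that transform (illustrative only; complete import scripts appear in Step 3):

```python
import json

def doc_to_row(doc: dict) -> tuple[str, str]:
    """Turn one mongoexport document into (mongo_id, jsonb_text) INSERT parameters."""
    raw_id = doc.pop("_id", "")
    # ObjectId values appear as {"$oid": "..."} in mongoexport's extended JSON
    mongo_id = raw_id.get("$oid", "") if isinstance(raw_id, dict) else str(raw_id)
    return mongo_id, json.dumps(doc)

row = doc_to_row({"_id": {"$oid": "663a"}, "name": "Alice"})
# row == ("663a", '{"name": "Alice"}')
```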


Why Migrate?

Architectural Advantages

| Capability | MongoDB | HeliosDB-Lite | Advantage |
|---|---|---|---|
| ACID transactions | Per-document (multi-doc since 4.0, performance penalty) | Full multi-row, multi-table ACID | No performance penalty for transactions |
| JOINs | $lookup (limited, slow on large collections) | Native SQL JOINs (hash, merge, nested loop) | Orders of magnitude faster for relational queries |
| Aggregations | Aggregation pipeline (complex, non-standard) | SQL GROUP BY, window functions, CTEs | Standard SQL; simpler to write, optimize, and maintain |
| Schema validation | Optional JSON Schema | SQL constraints (NOT NULL, CHECK, FK, UNIQUE) | Enforced at the database level |
| Full-text search | Atlas Search (additional cost) or text indexes | Native FTS with @@ operator | No additional service needed |
| Vector search | Atlas Vector Search (additional cost) | Native HNSW indexes with SIMD | No additional service needed |
| Time-travel | No | AS OF queries on any table | Point-in-time data access |
| Branching | No | Database-level branching for dev/test | Isolated environments without data copies |
| Embedded deployment | No (requires mongod process) | Yes (in-process, zero-config) | Eliminates network latency and operational overhead |

Cost Comparison

| Cost Factor | MongoDB Atlas (M30) | HeliosDB-Lite (Embedded) | Savings |
|---|---|---|---|
| Database hosting | $1,800/month | $0 (embedded in app) | 100% |
| Atlas Search | $400/month | $0 (built-in FTS) | 100% |
| Atlas Vector Search | $300/month | $0 (built-in HNSW) | 100% |
| Data transfer | $500/month | $0 (in-process) | 100% |
| Backup storage | $200/month | $0 (file-level backup) | 100% |
| Total annual | $38,400 | ~$1,500 (compute only) | 96% |
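The annual total follows directly from the monthly line items:

```python
# Monthly Atlas line items from the table above (USD)
monthly = {
    "hosting": 1_800,
    "atlas_search": 400,
    "vector_search": 300,
    "data_transfer": 500,
    "backup": 200,
}
annual_atlas = sum(monthly.values()) * 12
print(annual_atlas)  # 38400

# Savings versus ~$1,500/year of compute for the embedded deployment
savings = 1 - 1_500 / annual_atlas
print(f"{savings:.0%}")  # 96%
```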

Performance Gains

| Operation | MongoDB Atlas | HeliosDB-Lite | Improvement |
|---|---|---|---|
| Single document read | 35ms P95 | 8ms P95 | 77% faster |
| Document insert | 52ms P95 | 14ms P95 | 73% faster |
| Aggregation (GROUP BY) | 180ms P95 | 45ms P95 | 75% faster |
| Cross-collection $lookup | 320ms P95 | 55ms P95 (SQL JOIN) | 83% faster |
| Text search | 95ms P95 | 28ms P95 | 71% faster |

Compatibility Matrix

| MongoDB Concept | HeliosDB-Lite Equivalent | Migration Effort |
|---|---|---|
| Collection | Table (with JSONB column or normalized columns) | Schema design |
| Document ({}) | Row (JSONB column or individual columns) | Data transform |
| _id (ObjectId) | id SERIAL PRIMARY KEY or id TEXT PRIMARY KEY | Auto-generated |
| Nested document | JSONB value or separate table with FK | Design decision |
| Array field | JSONB array or ARRAY type column | Design decision |
| find() | SELECT ... WHERE | Query rewrite |
| insertOne() / insertMany() | INSERT INTO ... VALUES | Query rewrite |
| updateOne() / updateMany() | UPDATE ... SET ... WHERE | Query rewrite |
| deleteOne() / deleteMany() | DELETE FROM ... WHERE | Query rewrite |
| aggregate() pipeline | SQL GROUP BY, window functions, CTEs | Query rewrite |
| $match | WHERE clause | Direct mapping |
| $group | GROUP BY + aggregate functions | Direct mapping |
| $sort | ORDER BY | Direct mapping |
| $project | SELECT column list | Direct mapping |
| $limit / $skip | LIMIT / OFFSET | Direct mapping |
| $lookup | JOIN | Direct mapping (much faster) |
| $unwind | jsonb_array_elements() or UNNEST() | Function call |
| $addFields | Computed columns in SELECT | Direct mapping |
| $count | COUNT(*) | Direct mapping |
| Compound index | CREATE INDEX ... ON table (col1, col2) | DDL statement |
| Text index | FTS index with @@ operator | Different syntax |
| TTL index | Scheduled DELETE or partition pruning | Application logic |
| Change streams | Triggers | Different mechanism |
| Transactions | BEGIN / COMMIT / ROLLBACK | Standard SQL |
| Replica set | WAL streaming replication | Architecture change |
| Sharding | HeliosDB partitioning (HASH/RANGE/LIST) | Architecture change |
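The TTL-index row above maps to a periodic DELETE driven by the application. A minimal scheduler sketch (the function name and interval are illustrative; assumes a psycopg2-style execute callable):

```python
import threading

def start_ttl_cleanup(execute, table: str, ts_column: str,
                      ttl_seconds: int, every_seconds: float = 60.0) -> None:
    """Emulate a MongoDB TTL index: periodically delete expired rows."""
    def tick() -> None:
        execute(
            f"DELETE FROM {table} "
            f"WHERE {ts_column} < NOW() - INTERVAL '{ttl_seconds} seconds'"
        )
        timer = threading.Timer(every_seconds, tick)
        timer.daemon = True  # don't block interpreter shutdown
        timer.start()
    tick()  # run once immediately, then reschedule

# e.g. start_ttl_cleanup(cursor.execute, "sessions", "created_at", ttl_seconds=3600)
```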

Migration Steps

Step 1: Choose Your Schema Strategy

You have two main approaches:

Approach A: Document-Preserving (JSONB Column)

Keep the document structure intact. Best for heterogeneous documents or when you want minimal transformation.

CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  doc JSONB NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Index specific JSONB paths for query performance
CREATE INDEX idx_users_email ON users ((doc->>'email'));
CREATE INDEX idx_users_status ON users ((doc->>'status'));
CREATE INDEX idx_users_doc ON users USING gin(doc);  -- Full JSONB containment index

Approach B: Normalized (Relational Columns)

Extract top-level fields into typed columns. Best for well-defined schemas where you want constraint enforcement and optimal query performance.

CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  name TEXT NOT NULL,
  email TEXT UNIQUE NOT NULL,
  status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active', 'suspended', 'deleted')),
  profile JSONB DEFAULT '{}',     -- Keep flexible nested data as JSONB
  tags TEXT[] DEFAULT '{}',       -- Arrays for simple lists
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_users_email ON users (email);
CREATE INDEX idx_users_status ON users (status);
CREATE INDEX idx_users_profile ON users USING gin(profile);

Approach C: Hybrid

Use typed columns for frequently queried/filtered fields and JSONB for the rest.

CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  user_id INT4 NOT NULL REFERENCES users(id),
  status TEXT NOT NULL DEFAULT 'pending',
  total NUMERIC(12,2) NOT NULL,
  items JSONB NOT NULL,          -- Array of order items as JSONB
  metadata JSONB DEFAULT '{}',   -- Flexible metadata
  created_at TIMESTAMPTZ DEFAULT NOW()
);

Step 2: Export from MongoDB

# Export each collection as a single JSON array (--jsonArray), which the
# import scripts below read with json.load()
mongoexport \
  --uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \
  --collection=users \
  --out=users.json \
  --jsonArray

mongoexport \
  --uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \
  --collection=orders \
  --out=orders.json \
  --jsonArray

mongoexport \
  --uri="mongodb+srv://admin:password@cluster.mongodb.net/myapp" \
  --collection=products \
  --out=products.json \
  --jsonArray

Step 3: Transform and Import

Python import script (document-preserving approach)

#!/usr/bin/env python3
"""Import MongoDB JSON exports into HeliosDB-Lite as JSONB documents."""

import json
import psycopg2
import psycopg2.extras
from pathlib import Path

BATCH_SIZE = 2000

def create_table_for_collection(cursor, collection_name: str):
    """Create a table with a JSONB document column."""
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {collection_name} (
            id SERIAL PRIMARY KEY,
            mongo_id TEXT UNIQUE,
            doc JSONB NOT NULL,
            created_at TIMESTAMPTZ DEFAULT NOW()
        )
    """)

def import_collection(conn, collection_name: str, json_path: str):
    """Import a MongoDB collection from a JSON export file."""
    cursor = conn.cursor()

    create_table_for_collection(cursor, collection_name)
    conn.commit()

    with open(json_path, 'r', encoding='utf-8') as f:
        documents = json.load(f)

    print(f"  Importing {collection_name}: {len(documents)} documents")

    batch = []
    imported = 0

    for doc in documents:
        # Extract _id (ObjectId exports as {"$oid": "..."}) and drop it from the body
        raw_id = doc.pop('_id', '')
        mongo_id = raw_id.get('$oid', '') if isinstance(raw_id, dict) else str(raw_id)
        batch.append((mongo_id, json.dumps(doc)))

        if len(batch) >= BATCH_SIZE:
            psycopg2.extras.execute_batch(
                cursor,
                f"INSERT INTO {collection_name} (mongo_id, doc) VALUES (%s, %s::jsonb)",
                batch
            )
            conn.commit()
            imported += len(batch)
            batch = []
            print(f"    {imported}/{len(documents)} documents imported")

    if batch:
        psycopg2.extras.execute_batch(
            cursor,
            f"INSERT INTO {collection_name} (mongo_id, doc) VALUES (%s, %s::jsonb)",
            batch
        )
        conn.commit()
        imported += len(batch)

    print(f"    {imported}/{len(documents)} documents imported (complete)")
    return imported

def main():
    conn = psycopg2.connect(
        host="127.0.0.1",
        port=5432,
        user="app",
        dbname="myapp"
    )

    collections = {
        'users': 'users.json',
        'orders': 'orders.json',
        'products': 'products.json',
    }

    total = 0
    for name, path in collections.items():
        if Path(path).exists():
            count = import_collection(conn, name, path)
            total += count
        else:
            print(f"  Skipping {name}: {path} not found")

    print(f"\nImported {total} total documents across {len(collections)} collections")
    conn.close()

if __name__ == "__main__":
    main()

Python import script (normalized approach)

#!/usr/bin/env python3
"""Import MongoDB users into a normalized HeliosDB-Lite schema."""

import json
import psycopg2
import psycopg2.extras

def import_users_normalized(conn, json_path: str):
    """Import users with typed columns + JSONB for flexible fields."""
    cursor = conn.cursor()

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            mongo_id TEXT UNIQUE,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            status TEXT NOT NULL DEFAULT 'active',
            profile JSONB DEFAULT '{}',
            tags TEXT[] DEFAULT '{}',
            created_at TIMESTAMPTZ DEFAULT NOW()
        )
    """)
    conn.commit()

    with open(json_path, 'r') as f:
        documents = json.load(f)

    batch = []
    for doc in documents:
        raw_id = doc.get('_id', '')
        mongo_id = raw_id.get('$oid', '') if isinstance(raw_id, dict) else str(raw_id)
        batch.append({
            'mongo_id': mongo_id,
            'name': doc.get('name', ''),
            'email': doc.get('email', ''),
            'status': doc.get('status', 'active'),
            'profile': json.dumps({
                k: v for k, v in doc.items()
                if k not in ('_id', 'name', 'email', 'status', 'tags', 'createdAt')
            }),
            'tags': doc.get('tags', []),
        })

    psycopg2.extras.execute_batch(
        cursor,
        """INSERT INTO users (mongo_id, name, email, status, profile, tags)
           VALUES (%(mongo_id)s, %(name)s, %(email)s, %(status)s,
                   %(profile)s::jsonb, %(tags)s::text[])""",
        batch
    )
    conn.commit()
    print(f"Imported {len(batch)} users (normalized)")

if __name__ == "__main__":
    conn = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
    import_users_normalized(conn, 'users.json')
    conn.close()

Step 4: Create Indexes

-- JSONB containment index (equivalent to MongoDB compound indexes on nested fields)
CREATE INDEX idx_users_doc ON users USING gin(doc);

-- Expression indexes on specific JSONB paths (equivalent to MongoDB field indexes)
CREATE INDEX idx_users_email ON users ((doc->>'email'));
CREATE INDEX idx_users_status ON users ((doc->>'status'));

-- Full-text search index (replaces MongoDB text index)
CREATE INDEX idx_products_fts ON products
  USING gin(to_tsvector('english',
    COALESCE(doc->>'name', '') || ' ' || COALESCE(doc->>'description', '')));

-- Vector search index (replaces Atlas Vector Search)
-- Requires embedding column
ALTER TABLE products ADD COLUMN embedding VECTOR(1536);
CREATE INDEX idx_products_vector ON products USING hnsw(embedding);

Step 5: Update Application Code

See the Query Translation Examples and Common Patterns sections below.


Data Type Mapping

| MongoDB / BSON Type | HeliosDB-Lite Type | Storage Approach |
|---|---|---|
| String | TEXT | Direct column or doc->>'field' |
| Int32 | INT4 | Direct column or (doc->>'field')::INT4 |
| Int64 | INT8 | Direct column or (doc->>'field')::INT8 |
| Double | FLOAT8 | Direct column or (doc->>'field')::FLOAT8 |
| Decimal128 | NUMERIC | Direct column |
| Boolean | BOOLEAN | Direct column or (doc->>'field')::BOOLEAN |
| Date | TIMESTAMPTZ | Direct column |
| ObjectId | TEXT | Store as hex string |
| Array | JSONB array or ARRAY type | Depends on element type |
| Embedded Document | JSONB or separate table with FK | Design decision |
| Binary | BYTEA | Direct column |
| Null | SQL NULL | Direct mapping |
| RegExp | TEXT (pattern) | Use ~ operator for matching |
| UUID | TEXT | Store as string representation |
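mongoexport emits extended JSON, so typed values arrive as wrapper objects ({"$oid": ...}, {"$date": ...}, and so on) that should be unwrapped before they land in typed columns. A hedged sketch covering the common wrappers (the function name is ours; extend as needed):

```python
def unwrap_extended_json(value):
    """Recursively unwrap common mongoexport extended-JSON wrappers."""
    if isinstance(value, dict):
        if set(value) == {"$oid"}:
            return value["$oid"]              # ObjectId -> hex string (TEXT)
        if set(value) == {"$date"}:
            inner = value["$date"]
            # either an ISO-8601 string or {"$numberLong": "<epoch millis>"}
            return inner if isinstance(inner, str) else inner.get("$numberLong")
        if set(value) == {"$numberDecimal"}:
            return value["$numberDecimal"]    # Decimal128 -> string for NUMERIC
        if set(value) == {"$numberLong"}:
            return int(value["$numberLong"])  # Int64 -> INT8
        return {k: unwrap_extended_json(v) for k, v in value.items()}
    if isinstance(value, list):
        return [unwrap_extended_json(v) for v in value]
    return value
```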

Query Translation Examples

find() --> SELECT

// MongoDB
db.users.find({ status: "active", age: { $gte: 18 } })
-- HeliosDB-Lite (JSONB approach)
SELECT * FROM users WHERE doc->>'status' = 'active' AND (doc->>'age')::INT4 >= 18;

-- HeliosDB-Lite (normalized approach)
SELECT * FROM users WHERE status = 'active' AND age >= 18;

find() with Projection --> SELECT columns

// MongoDB
db.users.find({ status: "active" }, { name: 1, email: 1, _id: 0 })
-- HeliosDB-Lite (JSONB)
SELECT doc->>'name' AS name, doc->>'email' AS email
FROM users
WHERE doc->>'status' = 'active';

-- HeliosDB-Lite (normalized)
SELECT name, email FROM users WHERE status = 'active';

insertOne() --> INSERT

// MongoDB
db.users.insertOne({
  name: "Alice",
  email: "alice@example.com",
  profile: { bio: "Engineer", location: "NYC" },
  tags: ["rust", "databases"]
})
-- HeliosDB-Lite (JSONB)
INSERT INTO users (doc) VALUES (
  '{"name": "Alice", "email": "alice@example.com",
    "profile": {"bio": "Engineer", "location": "NYC"},
    "tags": ["rust", "databases"]}'::jsonb
) RETURNING id;

-- HeliosDB-Lite (normalized)
INSERT INTO users (name, email, profile, tags) VALUES (
  'Alice',
  'alice@example.com',
  '{"bio": "Engineer", "location": "NYC"}'::jsonb,
  ARRAY['rust', 'databases']
) RETURNING id;

updateOne() --> UPDATE

// MongoDB
db.users.updateOne(
  { email: "alice@example.com" },
  { $set: { "profile.location": "SF" }, $inc: { login_count: 1 } }
)
-- HeliosDB-Lite (JSONB)
UPDATE users
SET doc = jsonb_set(
  jsonb_set(doc, '{profile,location}', '"SF"'),
  '{login_count}',
  to_jsonb(COALESCE((doc->>'login_count')::INT4, 0) + 1)
)
WHERE doc->>'email' = 'alice@example.com';

-- HeliosDB-Lite (normalized with JSONB profile)
UPDATE users
SET profile = jsonb_set(profile, '{location}', '"SF"'),
    login_count = login_count + 1
WHERE email = 'alice@example.com';

deleteMany() --> DELETE

// MongoDB
db.sessions.deleteMany({ expires_at: { $lt: new Date() } })
-- HeliosDB-Lite
DELETE FROM sessions WHERE expires_at < NOW();

aggregate() with $group --> GROUP BY

// MongoDB
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $group: {
      _id: "$user_id",
      total_spent: { $sum: "$amount" },
      order_count: { $sum: 1 },
      avg_order: { $avg: "$amount" },
      last_order: { $max: "$created_at" }
  }},
  { $sort: { total_spent: -1 } },
  { $limit: 10 }
])
-- HeliosDB-Lite
SELECT
  user_id,
  SUM(amount) AS total_spent,
  COUNT(*) AS order_count,
  AVG(amount) AS avg_order,
  MAX(created_at) AS last_order
FROM orders
WHERE status = 'completed'
GROUP BY user_id
ORDER BY total_spent DESC
LIMIT 10;

aggregate() with $lookup --> JOIN

// MongoDB
db.orders.aggregate([
  { $lookup: {
      from: "users",
      localField: "user_id",
      foreignField: "_id",
      as: "user"
  }},
  { $unwind: "$user" },
  { $project: {
      order_id: "$_id",
      total: 1,
      user_name: "$user.name",
      user_email: "$user.email"
  }}
])
-- HeliosDB-Lite (dramatically simpler and faster)
SELECT
  o.id AS order_id,
  o.total,
  u.name AS user_name,
  u.email AS user_email
FROM orders o
JOIN users u ON o.user_id = u.id;

aggregate() with $unwind --> jsonb_array_elements

// MongoDB
db.orders.aggregate([
  { $unwind: "$items" },
  { $group: {
      _id: "$items.product_id",
      total_quantity: { $sum: "$items.quantity" },
      total_revenue: { $sum: { $multiply: ["$items.quantity", "$items.price"] } }
  }}
])
-- HeliosDB-Lite
SELECT
  item->>'product_id' AS product_id,
  SUM((item->>'quantity')::INT4) AS total_quantity,
  SUM((item->>'quantity')::INT4 * (item->>'price')::NUMERIC) AS total_revenue
FROM orders, jsonb_array_elements(items) AS item
GROUP BY item->>'product_id';

Nested Document Queries

// MongoDB
db.users.find({ "address.city": "New York", "address.state": "NY" })
-- HeliosDB-Lite (JSONB path extraction)
SELECT * FROM users
WHERE doc->'address'->>'city' = 'New York'
  AND doc->'address'->>'state' = 'NY';

-- HeliosDB-Lite (containment operator -- uses GIN index)
SELECT * FROM users
WHERE doc @> '{"address": {"city": "New York", "state": "NY"}}';

Array Queries

// MongoDB: Find documents where tags array contains "rust"
db.users.find({ tags: "rust" })

// MongoDB: Find documents where tags contains all of these
db.users.find({ tags: { $all: ["rust", "databases"] } })

// MongoDB: Array size
db.users.find({ tags: { $size: 3 } })
-- HeliosDB-Lite (JSONB array)
SELECT * FROM users WHERE doc->'tags' ? 'rust';
SELECT * FROM users WHERE doc->'tags' @> '["rust", "databases"]';
SELECT * FROM users WHERE jsonb_array_length(doc->'tags') = 3;

-- HeliosDB-Lite (ARRAY type, normalized)
SELECT * FROM users WHERE 'rust' = ANY(tags);
SELECT * FROM users WHERE tags @> ARRAY['rust', 'databases'];
SELECT * FROM users WHERE array_length(tags, 1) = 3;

JSONB Operators Quick Reference

| Operator | Description | Example |
|---|---|---|
| -> | Get JSON object field (as JSON) | doc->'address' |
| ->> | Get JSON object field (as text) | doc->>'name' returns 'Alice' |
| -> (int) | Get JSON array element (as JSON) | doc->'tags'->0 |
| ->> (int) | Get JSON array element (as text) | doc->'tags'->>0 returns 'rust' |
| @> | Contains (left contains right) | doc @> '{"status":"active"}' |
| <@ | Contained by (left is contained by right) | '{"status":"active"}' <@ doc |
| ? | Key exists | doc ? 'email' |
| ?\| | Any key exists | doc ?\| array['email','phone'] |
| ?& | All keys exist | doc ?& array['email','phone'] |
| \|\| | Concatenate JSONB | doc \|\| '{"new_field": true}' |
| - | Delete key | doc - 'temporary_field' |
| #- | Delete at path | doc #- '{address,zip}' |

Useful JSONB Functions

-- Extract all keys from a JSONB object
SELECT jsonb_object_keys(doc) FROM users WHERE id = 1;

-- Expand a JSONB array into rows
SELECT id, elem->>'name' AS item_name
FROM orders, jsonb_array_elements(items) AS elem;

-- Build JSONB from key-value pairs
SELECT jsonb_build_object('name', u.name, 'email', u.email) AS user_json
FROM users u;

-- Pretty-print JSONB
SELECT jsonb_pretty(doc) FROM users WHERE id = 1;

-- Set a nested value
UPDATE users SET doc = jsonb_set(doc, '{profile,verified}', 'true') WHERE id = 1;

-- Check JSON type
SELECT jsonb_typeof(doc->'tags') FROM users WHERE id = 1;  -- returns 'array'

-- Aggregate rows into a JSON array
SELECT jsonb_agg(jsonb_build_object('id', id, 'name', name)) FROM users;

Common Patterns

Pattern 1: Python Application Migration

"""
MongoDB to HeliosDB-Lite migration -- Python application layer.
Replace pymongo calls with psycopg2 queries.
"""

import psycopg2
import psycopg2.extras
import json

class UserRepository:
    """Replaces a MongoDB-backed user repository with HeliosDB-Lite."""

    def __init__(self, dsn: str = "postgresql://app@127.0.0.1:5432/myapp"):
        self.conn = psycopg2.connect(dsn)
        psycopg2.extras.register_default_jsonb(self.conn)

    def find_one(self, email: str) -> dict:
        """Replaces: db.users.findOne({ email: email })"""
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(
            "SELECT id, name, email, status, profile, tags FROM users WHERE email = %s",
            (email,)
        )
        return cursor.fetchone()

    def find_active(self, limit: int = 20, offset: int = 0) -> list:
        """Replaces: db.users.find({ status: 'active' }).limit(20).skip(0)"""
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(
            "SELECT * FROM users WHERE status = 'active' ORDER BY created_at DESC LIMIT %s OFFSET %s",
            (limit, offset)
        )
        return cursor.fetchall()

    def insert(self, user: dict) -> int:
        """Replaces: db.users.insertOne(user)"""
        cursor = self.conn.cursor()
        cursor.execute(
            """INSERT INTO users (name, email, status, profile, tags)
               VALUES (%(name)s, %(email)s, %(status)s, %(profile)s::jsonb, %(tags)s::text[])
               RETURNING id""",
            {
                'name': user['name'],
                'email': user['email'],
                'status': user.get('status', 'active'),
                'profile': json.dumps(user.get('profile', {})),
                'tags': user.get('tags', []),
            }
        )
        self.conn.commit()
        return cursor.fetchone()[0]

    def update_profile(self, user_id: int, updates: dict) -> None:
        """Replaces: db.users.updateOne({ _id: id }, { $set: { profile: updates } })"""
        cursor = self.conn.cursor()
        cursor.execute(
            "UPDATE users SET profile = profile || %s::jsonb WHERE id = %s",
            (json.dumps(updates), user_id)
        )
        self.conn.commit()

    def search(self, query: str, limit: int = 10) -> list:
        """
        Replaces: db.users.find({ $text: { $search: query } })
        Uses HeliosDB-Lite FTS with ranking (not available in MongoDB without Atlas Search).
        """
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(
            """SELECT id, name, email,
                      ts_rank(to_tsvector('english', name || ' ' || COALESCE(profile->>'bio', '')),
                              to_tsquery('english', %s)) AS relevance
               FROM users
               WHERE to_tsvector('english', name || ' ' || COALESCE(profile->>'bio', ''))
                     @@ to_tsquery('english', %s)
               ORDER BY relevance DESC
               LIMIT %s""",
            (query, query, limit)
        )
        return cursor.fetchall()

    def aggregate_by_status(self) -> list:
        """
        Replaces:
        db.users.aggregate([
            { $group: { _id: "$status", count: { $sum: 1 } } },
            { $sort: { count: -1 } }
        ])
        """
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(
            """SELECT status, COUNT(*) AS count
               FROM users
               GROUP BY status
               ORDER BY count DESC"""
        )
        return cursor.fetchall()

Pattern 2: TypeScript Application Migration

import { Pool, QueryResult } from 'pg';

const pool = new Pool({
  host: '127.0.0.1',
  port: 5432,
  database: 'myapp',
  user: 'app',
});

// Replaces: db.users.findOne({ email })
async function findUserByEmail(email: string): Promise<User | null> {
  const result = await pool.query<User>(
    'SELECT * FROM users WHERE email = $1',
    [email]
  );
  return result.rows[0] || null;
}

// Replaces: db.users.insertOne(user)
async function createUser(user: Omit<User, 'id'>): Promise<number> {
  const result = await pool.query<{ id: number }>(
    `INSERT INTO users (name, email, status, profile, tags)
     VALUES ($1, $2, $3, $4::jsonb, $5::text[])
     RETURNING id`,
    [user.name, user.email, user.status, JSON.stringify(user.profile), user.tags]
  );
  return result.rows[0].id;
}

// Replaces: db.orders.aggregate([{ $lookup }, { $unwind }, { $group }])
async function getTopCustomers(limit: number = 10) {
  const result = await pool.query(`
    SELECT
      u.id,
      u.name,
      u.email,
      COUNT(o.id) AS order_count,
      SUM(o.total) AS total_spent,
      AVG(o.total) AS avg_order_value,
      MAX(o.created_at) AS last_order_date
    FROM users u
    JOIN orders o ON u.id = o.user_id
    WHERE o.status = 'completed'
    GROUP BY u.id, u.name, u.email
    ORDER BY total_spent DESC
    LIMIT $1
  `, [limit]);
  return result.rows;
}

// Replaces: db.orders.aggregate([{ $unwind: "$items" }, { $group }])
async function getProductRevenue() {
  const result = await pool.query(`
    SELECT
      item->>'product_id' AS product_id,
      item->>'name' AS product_name,
      SUM((item->>'quantity')::INT4) AS units_sold,
      SUM((item->>'quantity')::INT4 * (item->>'price')::NUMERIC) AS revenue
    FROM orders, jsonb_array_elements(items) AS item
    WHERE status = 'completed'
    GROUP BY item->>'product_id', item->>'name'
    ORDER BY revenue DESC
  `);
  return result.rows;
}

// HeliosDB-Lite exclusive: time-travel query (not possible in MongoDB)
async function getUserStateAtDate(userId: number, asOfDate: string) {
  const result = await pool.query(
    `SELECT * FROM users AS OF $1 WHERE id = $2`,
    [asOfDate, userId]
  );
  return result.rows[0] || null;
}

Pattern 3: Go Application Migration

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/jackc/pgx/v5/pgxpool"
)

type UserRepo struct {
    pool *pgxpool.Pool
}

func NewUserRepo(ctx context.Context, dsn string) (*UserRepo, error) {
    pool, err := pgxpool.New(ctx, dsn)
    if err != nil {
        return nil, err
    }
    return &UserRepo{pool: pool}, nil
}

// Replaces: collection.FindOne(ctx, bson.M{"email": email})
func (r *UserRepo) FindByEmail(ctx context.Context, email string) (*User, error) {
    var u User
    var profileJSON []byte
    err := r.pool.QueryRow(ctx,
        "SELECT id, name, email, status, profile, tags FROM users WHERE email = $1",
        email,
    ).Scan(&u.ID, &u.Name, &u.Email, &u.Status, &profileJSON, &u.Tags)
    if err != nil {
        return nil, err
    }
    if err := json.Unmarshal(profileJSON, &u.Profile); err != nil {
        return nil, err
    }
    return &u, nil
}

// Replaces: collection.InsertOne(ctx, doc)
func (r *UserRepo) Create(ctx context.Context, u *User) (int, error) {
    profileJSON, _ := json.Marshal(u.Profile)
    var id int
    err := r.pool.QueryRow(ctx,
        `INSERT INTO users (name, email, status, profile, tags)
         VALUES ($1, $2, $3, $4::jsonb, $5::text[])
         RETURNING id`,
        u.Name, u.Email, u.Status, string(profileJSON), u.Tags,
    ).Scan(&id)
    return id, err
}

// Replaces: collection.Aggregate(ctx, pipeline) with $lookup + $group
func (r *UserRepo) GetTopCustomers(ctx context.Context, limit int) ([]map[string]any, error) {
    rows, err := r.pool.Query(ctx, `
        SELECT u.name, u.email,
               COUNT(o.id) AS order_count,
               SUM(o.total) AS total_spent
        FROM users u
        JOIN orders o ON u.id = o.user_id
        WHERE o.status = 'completed'
        GROUP BY u.name, u.email
        ORDER BY total_spent DESC
        LIMIT $1
    `, limit)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var results []map[string]any
    for rows.Next() {
        var name, email string
        var orderCount int
        var totalSpent float64
        if err := rows.Scan(&name, &email, &orderCount, &totalSpent); err != nil {
            return nil, err
        }
        results = append(results, map[string]any{
            "name":        name,
            "email":       email,
            "order_count": orderCount,
            "total_spent": totalSpent,
        })
    }
    return results, nil
}

Pattern 4: Migrating MongoDB Transactions

# MongoDB transaction
with client.start_session() as session:
    with session.start_transaction():
        db.orders.insert_one({"user_id": user_id, "total": 99.99}, session=session)
        db.inventory.update_one(
            {"product_id": product_id},
            {"$inc": {"stock": -1}},
            session=session
        )
        db.users.update_one(
            {"_id": user_id},
            {"$inc": {"order_count": 1}},
            session=session
        )
# HeliosDB-Lite -- standard SQL transaction (simpler, faster, no caveats)
conn = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
cursor = conn.cursor()

try:
    # No explicit BEGIN needed: psycopg2 opens a transaction on the first execute()
    cursor.execute(
        "INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id",
        (user_id, 99.99)
    )
    order_id = cursor.fetchone()[0]

    cursor.execute(
        "UPDATE inventory SET stock = stock - 1 WHERE product_id = %s",
        (product_id,)
    )

    cursor.execute(
        "UPDATE users SET order_count = order_count + 1 WHERE id = %s",
        (user_id,)
    )

    conn.commit()
except Exception:
    conn.rollback()
    raise

Limitations & Workarounds

MongoDB Features Not Directly Supported

| MongoDB Feature | Status | Workaround |
|---|---|---|
| Schema-less collections | Different model | Use JSONB column for flexible fields; typed columns for stable fields |
| $graphLookup (recursive) | Different syntax | Use recursive CTEs (WITH RECURSIVE) |
| Change Streams | Different mechanism | Use triggers + notification channels |
| GridFS (large file storage) | Not supported | Store file paths in database; use object storage (S3, MinIO) for files |
| Capped collections | Not native | Use DELETE with ORDER BY ... LIMIT or partition pruning |
| Map-Reduce | Deprecated in MongoDB too | Use SQL GROUP BY, window functions, CTEs |
| Geospatial queries ($near, $geoWithin) | Not native | Store as JSONB GeoJSON; compute distances in application layer |
| MongoDB Realm / Atlas App Services | Not applicable | Use application framework (Django, Express, etc.) |
| $regex queries | Different syntax | Use ~ (regex match) or LIKE operator |
| Client-side field-level encryption | Different approach | HeliosDB-Lite TDE encrypts entire database at rest (AES-256-GCM) |
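For the geospatial row above, distance filtering can move to the application layer. A haversine sketch over GeoJSON [lon, lat] coordinate pairs (the function name is ours):

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in meters

def haversine_m(a: list[float], b: list[float]) -> float:
    """Great-circle distance in meters between two GeoJSON [lon, lat] points."""
    lon1, lat1, lon2, lat2 = map(radians, (*a, *b))
    h = sin((lat2 - lat1) / 2) ** 2 \
        + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * EARTH_RADIUS_M * asin(sqrt(h))

# Replaces db.places.find({loc: {$near: ...}}): fetch candidate rows by
# bounding box, then filter in Python:
#   nearby = [p for p in places if haversine_m(p["loc"], center) < 5_000]
```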

HeliosDB-Full Alternative: DocumentStore

For teams that prefer a MongoDB-compatible API without learning SQL, HeliosDB-Full provides a DocumentStore interface:

# HeliosDB-Full DocumentStore API (MongoDB-compatible)
from heliosdb import DocumentStore

store = DocumentStore("./myapp.db")
users = store.collection("users")

# MongoDB-style operations
users.insert_one({"name": "Alice", "email": "alice@example.com"})
user = users.find_one({"email": "alice@example.com"})
users.update_one({"email": "alice@example.com"}, {"$set": {"status": "active"}})

# Aggregation pipeline
results = users.aggregate([
    {"$match": {"status": "active"}},
    {"$group": {"_id": "$department", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}}
])

This provides a gentler migration path for teams not ready to adopt SQL fully, while still gaining embedded deployment, ACID transactions, and cost elimination.


Rollback Strategy

Pre-Migration

  1. Keep MongoDB Atlas running for 2-4 weeks after cutover
  2. Document all query translations for easy reversal
  3. Export HeliosDB-Lite data periodically during the transition

Dual-Write Approach

For critical applications, write to both databases during the transition period:

class DualWriteUserRepo:
    """Writes to both MongoDB and HeliosDB-Lite during migration."""

    def __init__(self, mongo_client, helios_conn):
        self.mongo_db = mongo_client.myapp
        self.helios = helios_conn

    def create_user(self, user: dict) -> dict:
        # Write to HeliosDB-Lite (primary after cutover)
        cursor = self.helios.cursor()
        cursor.execute(
            """INSERT INTO users (name, email, status, profile)
               VALUES (%(name)s, %(email)s, %(status)s, %(profile)s::jsonb)
               RETURNING id""",
            # psycopg2 cannot adapt a plain dict; serialize the profile first
            {**user, 'profile': json.dumps(user.get('profile', {}))}
        )
        self.helios.commit()
        user_id = cursor.fetchone()[0]

        # Write to MongoDB (secondary, for rollback safety)
        user['_helios_id'] = user_id
        self.mongo_db.users.insert_one(user)

        return {**user, 'id': user_id}
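While dual-write is active, a periodic parity check catches drift between the two stores before rollback would ever be needed. A minimal sketch over id sets (the helper is hypothetical; feed it ids fetched from each database):

```python
def parity_report(mongo_ids: set, helios_ids: set) -> dict:
    """Compare id sets from both stores and report drift during dual-write."""
    return {
        "in_sync": mongo_ids == helios_ids,
        "missing_in_helios": sorted(mongo_ids - helios_ids),
        "missing_in_mongo": sorted(helios_ids - mongo_ids),
    }

# mongo_ids  = {d["_helios_id"] for d in mongo_db.users.find({}, {"_helios_id": 1})}
# cursor.execute("SELECT id FROM users"); helios_ids = {r[0] for r in cursor}
```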

Rollback Procedure

  1. Revert application code to use MongoDB driver instead of PostgreSQL driver
  2. Verify MongoDB data is current (if dual-write was active, it should be)
  3. If dual-write was not active, export from HeliosDB-Lite and import to MongoDB:
import psycopg2
import psycopg2.extras
from pymongo import MongoClient

helios = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
mongo = MongoClient("mongodb+srv://admin:pass@cluster.mongodb.net/myapp")

cursor = helios.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute("SELECT * FROM users")

for row in cursor:
    mongo.myapp.users.replace_one(
        {"_helios_id": row["id"]},
        {
            "name": row["name"],
            "email": row["email"],
            "status": row["status"],
            "profile": row["profile"],
            "tags": row["tags"],
        },
        upsert=True
    )
print("Rollback to MongoDB complete")
  4. Decommission the HeliosDB-Lite instance
  5. Post-mortem -- document issues and plan remediation before next attempt

References

  1. MongoDB Manual: CRUD Operations and Aggregation Pipeline (2025)
  2. PostgreSQL Documentation: JSONB Types and Functions (2025)
  3. HeliosDB-Lite Documentation: JSONB Operators and Full-Text Search (2026)
  4. mongoexport Documentation: Data Export Reference (2025)
  5. psycopg2 Documentation: Python PostgreSQL Adapter (2025)
  6. node-postgres (pg): TypeScript/JavaScript PostgreSQL Client (2025)
  7. pgx: Go PostgreSQL Driver and Toolkit (2025)
  8. MongoDB Atlas Pricing: Cluster Tier Comparison (2025)

Need help migrating?

Our team can help you plan and execute your migration to HeliosDB.
