Overview

Amazon DynamoDB is a fully managed NoSQL key-value and document database that delivers single-digit millisecond performance at any scale. While DynamoDB excels at high-throughput, low-latency access patterns with predictable key-based lookups, its rigid query model, opaque pricing, and severe analytical limitations push teams toward costly workarounds: maintaining separate OpenSearch clusters for search, building ETL pipelines to Redshift for reporting, and implementing complex single-table designs to simulate relational queries.

HeliosDB-Lite offers a fundamentally different approach: a full SQL engine with ACID transactions, JOINs, aggregations, window functions, vector search, full-text search, and time-travel -- all in an embedded, zero-config deployment. By migrating from DynamoDB, organizations eliminate capacity planning (RCU/WCU), throttling anxiety, and the constellation of auxiliary services required to fill DynamoDB's feature gaps.

The migration path is: export DynamoDB tables (via Data Pipeline, S3 export, or scan), transform the items into SQL INSERT statements, and import through the PostgreSQL wire protocol. Applications replace AWS SDK calls with standard SQL queries via any PostgreSQL client library.


Why Migrate?

Feature Comparison

DynamoDB is a key-value store with document capabilities. HeliosDB-Lite is a full relational database engine. The feature gap is significant:

| Capability | DynamoDB | HeliosDB-Lite | Impact |
| --- | --- | --- | --- |
| JOINs | Not supported | Full SQL JOINs (INNER, LEFT, RIGHT, FULL, CROSS, LATERAL) | Eliminate denormalization and data duplication |
| Aggregations (SUM, AVG, COUNT, MIN, MAX) | Not supported (scan + application code) | Native SQL aggregations | Remove application-layer computation |
| GROUP BY | Not supported | Full GROUP BY with HAVING | Inline analytics without ETL |
| Window functions | Not supported | ROW_NUMBER, RANK, LAG, LEAD, NTILE, etc. | Running totals, rankings, time-series analysis |
| CTEs | Not supported | WITH / WITH RECURSIVE | Simplify complex queries, traverse hierarchies |
| Subqueries | Not supported | Scalar, correlated, EXISTS, IN, ANY/ALL | Flexible filtering and computation |
| Full-text search | Not supported (requires OpenSearch) | Native @@ operator with ranking | Eliminate OpenSearch cluster ($500+/month) |
| Vector search | Not supported (requires separate service) | Native HNSW indexes with SIMD | Eliminate vector DB ($300+/month) |
| Complex WHERE clauses | Limited (partition key required, filter expressions) | Arbitrary SQL WHERE with AND/OR/NOT, BETWEEN, LIKE, IN | Query any column combination without index constraints |
| ACID transactions | Limited (100 items, 4MB, single-region) | Full multi-row, multi-table, no size limit | True transactional integrity |
| Time-travel | Not supported (requires DynamoDB Streams + Lambda) | Native AS OF queries | Point-in-time data access, built-in |
| Branching | Not supported | Database-level branching | Isolated dev/test environments |
| Secondary indexes | GSI (20 by default), LSI (5 max, fixed at table creation) | Unlimited indexes on any column or expression | No index limits or creation-time constraints |
| Ad-hoc queries | Extremely limited (must use partition key) | Any SQL query on any column | True exploratory data access |
| Stored procedures | Not supported | PL/pgSQL with variables, loops, cursors, exceptions | Server-side business logic |
| Triggers | DynamoDB Streams + Lambda (eventually consistent) | BEFORE/AFTER triggers (synchronous, transactional) | Guaranteed consistency |

Pricing Model Comparison

DynamoDB pricing is notoriously difficult to predict. Costs scale with read/write capacity units (RCU/WCU) and can spike dramatically with unexpected traffic patterns.

DynamoDB Pricing Breakdown (typical mid-sized application)

| Cost Component | On-Demand Mode | Provisioned Mode | Notes |
| --- | --- | --- | --- |
| Write requests (WCU) | $1.25 per million | $0.00065 per WCU/hour | 1 WCU = 1KB write/second |
| Read requests (RCU) | $0.25 per million | $0.00013 per RCU/hour | 1 RCU = 4KB strongly consistent read/second |
| Storage | $0.25/GB/month | $0.25/GB/month | First 25GB free |
| GSI writes | Same as base table | Same as base table | Each GSI doubles write costs |
| GSI storage | $0.25/GB/month | $0.25/GB/month | Each GSI adds storage costs |
| DynamoDB Streams | $0.02 per 100K reads | $0.02 per 100K reads | Required for triggers/CDC |
| Data export to S3 | $0.10/GB | $0.10/GB | For analytics pipeline |
| Backup (on-demand) | $0.10/GB/month | $0.10/GB/month | PITR: $0.20/GB/month |

Cost Comparison Scenario: 50GB database, 5,000 writes/sec, 20,000 reads/sec

| Component | DynamoDB (Provisioned + Auto-Scaling) | HeliosDB-Lite (Embedded) |
| --- | --- | --- |
| Compute (WCU/RCU) | $2,850/month | $0 |
| Storage (50GB) | $12.50/month | $0 (disk on app server) |
| GSI overhead (3 GSIs) | $1,200/month | $0 |
| DynamoDB Streams | $150/month | $0 (triggers built-in) |
| OpenSearch (for search) | $650/month | $0 (FTS built-in) |
| Backup (PITR) | $10/month | $0 (file-level backup) |
| Data transfer | $200/month | $0 (in-process) |
| Application server | $300/month | $300/month |
| Monthly total | $5,372.50 | $300 |
| Annual total | $64,470 | $3,600 |
| Savings | -- | $60,870/year (94%) |

For on-demand pricing with variable traffic, DynamoDB costs can be 2-5x higher during peak periods. HeliosDB-Lite costs are fixed regardless of query volume.
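
As a sanity check on the compute line above, here is a back-of-envelope estimator built from the provisioned per-hour rates in the pricing table. The capacity factor (how far auto-scaling trims time-averaged provisioned capacity below peak) is our assumption, not an AWS-published figure; tune it to your traffic profile.

# Back-of-envelope DynamoDB provisioned-compute estimator.
# Rates are the per-hour provisioned prices from the table above;
# assumes 1KB writes (1 WCU each) and 4KB strongly consistent reads (1 RCU each).

HOURS_PER_MONTH = 730
WCU_RATE = 0.00065  # $/WCU/hour
RCU_RATE = 0.00013  # $/RCU/hour

def monthly_compute_cost(writes_per_sec: float, reads_per_sec: float,
                         capacity_factor: float = 1.0) -> float:
    """capacity_factor < 1 models auto-scaling provisioning less than
    peak capacity on average -- an assumption, not an AWS number."""
    wcu = writes_per_sec * capacity_factor
    rcu = reads_per_sec * capacity_factor
    return (wcu * WCU_RATE + rcu * RCU_RATE) * HOURS_PER_MONTH

print(monthly_compute_cost(5000, 20000))        # ~$4,270 if provisioned at peak 24/7
print(monthly_compute_cost(5000, 20000, 0.67))  # ~$2,860, close to the table's $2,850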

Operational Simplification

| Operational Concern | DynamoDB | HeliosDB-Lite |
| --- | --- | --- |
| Capacity planning | Required (RCU/WCU provisioning or on-demand cost monitoring) | Not needed (embedded, uses app server resources) |
| Throttling | Yes (ProvisionedThroughputExceededException) | No (no artificial limits) |
| Hot partition management | Manual (choose good partition keys, add GSIs) | Not applicable (B-tree indexes, no partitioning constraints) |
| GSI backfill | Slow (can take hours for large tables) | CREATE INDEX completes in seconds/minutes |
| Item size limit | 400KB per item | No per-row limit |
| Transaction limit | 100 items, 4MB total | No limit |
| Query result limit | 1MB per page (requires pagination) | No artificial limit |
| Region availability | Must configure multi-region explicitly (Global Tables) | Embedded (runs wherever your app runs) |
| Vendor lock-in | Complete (AWS proprietary API) | None (standard SQL, PostgreSQL wire protocol) |

Compatibility Matrix

| DynamoDB Concept | HeliosDB-Lite Equivalent | Notes |
| --- | --- | --- |
| Table | CREATE TABLE | Standard SQL DDL |
| Partition key | PRIMARY KEY (single column) | Any data type |
| Partition key + sort key | Composite PRIMARY KEY (pk, sk) | Ordered by sort key |
| Global Secondary Index (GSI) | CREATE INDEX on any column(s) | No limit on number of indexes |
| Local Secondary Index (LSI) | CREATE INDEX including primary key columns | Can be added any time |
| Item (document) | Row (typed columns and/or JSONB) | Schema-enforced |
| Attribute | Column | Typed (INT4, TEXT, JSONB, etc.) |
| String (S) | TEXT | Direct mapping |
| Number (N) | INT4, INT8, NUMERIC, or FLOAT8 | Choose appropriate type |
| Binary (B) | BYTEA | Binary data |
| Boolean (BOOL) | BOOLEAN | Direct mapping |
| Null (NULL) | SQL NULL | Direct mapping |
| List (L) | JSONB array or ARRAY type | Design choice |
| Map (M) | JSONB | Binary-indexed JSON |
| String Set (SS) | TEXT[] | PostgreSQL array |
| Number Set (NS) | INT4[] or FLOAT8[] | PostgreSQL array |
| Binary Set (BS) | BYTEA[] | PostgreSQL array |
| TTL attribute | Scheduled DELETE or partition-based pruning | Application logic or cron |
| DynamoDB Streams | Triggers | Synchronous, transactional |
| Conditional expressions | WHERE clause / CHECK constraints | Standard SQL |
| UpdateExpression | UPDATE ... SET | Standard SQL |
| ProjectionExpression | SELECT col1, col2 | Standard SQL |
| FilterExpression | WHERE clause | Applied at query time, not post-scan |
| PartiQL | SQL (native) | Full SQL support |

Migration Steps

Step 1: Export Data from DynamoDB

Option A: DynamoDB Export to S3 (recommended for large tables)

# Export via AWS CLI (outputs JSON to S3; the table must have
# point-in-time recovery enabled)
aws dynamodb export-table-to-point-in-time \
  --table-arn arn:aws:dynamodb:us-east-1:123456789:table/Users \
  --s3-bucket my-migration-bucket \
  --s3-prefix dynamodb-export/ \
  --export-format DYNAMODB_JSON

# Download the export
aws s3 sync s3://my-migration-bucket/dynamodb-export/ ./dynamodb-export/
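
Note the format mismatch: a DYNAMODB_JSON export wraps every attribute in a type descriptor ({"S": ...}, {"N": ...}), while the Step 3 import script expects the plain JSON that Option B produces. The sketch below unmarshals the export with boto3's TypeDeserializer; the gzipped, newline-delimited layout under data/ follows the S3 export format, but adjust the paths to your bucket prefix.

#!/usr/bin/env python3
"""Convert a DYNAMODB_JSON S3 export into the plain-JSON list that the
Step 3 import script expects."""

import gzip
import json
from decimal import Decimal
from pathlib import Path

from boto3.dynamodb.types import TypeDeserializer

deserializer = TypeDeserializer()

def jsonable(value):
    """Make deserialized values JSON-safe (Decimal -> float, set -> list)."""
    if isinstance(value, Decimal):
        return float(value)
    if isinstance(value, (set, frozenset)):
        return list(value)
    return str(value)  # Binary and anything else

def convert_export(export_dir: str, output_path: str):
    items = []
    # S3 exports place gzipped, newline-delimited files under .../data/
    for path in Path(export_dir).rglob('*.json.gz'):
        with gzip.open(path, 'rt') as f:
            for line in f:
                typed = json.loads(line)['Item']
                # Unmarshal {"S": ...} / {"N": ...} typed attribute values
                items.append({k: deserializer.deserialize(v)
                              for k, v in typed.items()})
    with open(output_path, 'w') as f:
        json.dump(items, f, default=jsonable, indent=2)
    print(f"Wrote {len(items)} items to {output_path}")

if __name__ == "__main__":
    convert_export('./dynamodb-export', 'dynamodb_export.json')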

Option B: Scan and export (for smaller tables)

#!/usr/bin/env python3
"""Export DynamoDB table to JSON file."""

import boto3
import json
from decimal import Decimal

class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)

def export_table(table_name: str, output_path: str):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)

    items = []
    scan_kwargs = {}
    total = 0

    print(f"Exporting {table_name}...")

    while True:
        response = table.scan(**scan_kwargs)
        items.extend(response['Items'])
        total += response['Count']
        print(f"  Scanned {total} items...")

        if 'LastEvaluatedKey' not in response:
            break
        scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

    with open(output_path, 'w') as f:
        json.dump(items, f, cls=DecimalEncoder, indent=2)

    print(f"Exported {total} items to {output_path}")

if __name__ == "__main__":
    export_table('Users', 'users.json')
    export_table('Orders', 'orders.json')
    export_table('Products', 'products.json')

Step 2: Design the HeliosDB-Lite Schema

DynamoDB's single-table design pattern (where all entity types share one table with overloaded keys) should typically be normalized into separate tables during migration. This is one of the primary advantages of migrating to a relational database.

DynamoDB single-table design

PK              SK              Attributes
USER#123        PROFILE         { name: "Alice", email: "alice@..." }
USER#123        ORDER#001       { total: 99.99, status: "shipped" }
USER#123        ORDER#002       { total: 149.50, status: "pending" }
PRODUCT#456     METADATA        { name: "Widget", price: 29.99 }
PRODUCT#456     REVIEW#789      { rating: 5, text: "Great product" }

HeliosDB-Lite normalized schema

-- Users table
CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  dynamo_pk TEXT UNIQUE,          -- Preserve original key for reference
  name TEXT NOT NULL,
  email TEXT UNIQUE NOT NULL,
  status TEXT NOT NULL DEFAULT 'active',
  profile JSONB DEFAULT '{}',     -- Flexible attributes as JSONB
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Orders table (separate from users, with foreign key)
CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  dynamo_sk TEXT,                  -- Preserve original sort key
  user_id INT4 NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  total NUMERIC(12,2) NOT NULL,
  status TEXT NOT NULL DEFAULT 'pending'
    CHECK (status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')),
  items JSONB NOT NULL DEFAULT '[]',
  shipping_address JSONB,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_orders_created ON orders(created_at);

-- Products table
CREATE TABLE products (
  id SERIAL PRIMARY KEY,
  dynamo_pk TEXT UNIQUE,
  name TEXT NOT NULL,
  description TEXT,
  price NUMERIC(10,2) NOT NULL,
  category TEXT,
  attributes JSONB DEFAULT '{}',
  embedding VECTOR(1536),          -- For vector similarity search
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_products_category ON products(category);
CREATE INDEX idx_products_price ON products(price);
CREATE INDEX idx_products_attrs ON products USING gin(attributes);

-- Full-text search index on products (used by the catalog search in Pattern 1)
CREATE INDEX idx_products_fts ON products
  USING gin(to_tsvector('english', name || ' ' || COALESCE(description, '')));

-- Reviews table (was embedded in DynamoDB single-table)
CREATE TABLE reviews (
  id SERIAL PRIMARY KEY,
  product_id INT4 NOT NULL REFERENCES products(id) ON DELETE CASCADE,
  user_id INT4 NOT NULL REFERENCES users(id),
  rating INT2 NOT NULL CHECK (rating BETWEEN 1 AND 5),
  title TEXT,
  body TEXT,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_reviews_product ON reviews(product_id);
CREATE INDEX idx_reviews_user ON reviews(user_id);

-- Full-text search index on reviews
CREATE INDEX idx_reviews_fts ON reviews
  USING gin(to_tsvector('english', COALESCE(title, '') || ' ' || COALESCE(body, '')));

Step 3: Transform and Import Data

#!/usr/bin/env python3
"""Import DynamoDB JSON exports into HeliosDB-Lite normalized schema."""

import json
import psycopg2
import psycopg2.extras
from typing import Dict, List

def connect():
    return psycopg2.connect(
        host="127.0.0.1",
        port=5432,
        user="app",
        dbname="myapp"
    )

def import_single_table_design(conn, json_path: str):
    """
    Parse a DynamoDB single-table export and route items
    to the correct normalized tables based on PK/SK patterns.
    """
    with open(json_path, 'r') as f:
        items = json.load(f)

    cursor = conn.cursor()
    user_map = {}  # dynamo_pk -> heliosdb id

    # First pass: import users (PK=USER#*, SK=PROFILE)
    users = [item for item in items
             if item.get('PK', '').startswith('USER#')
             and item.get('SK') == 'PROFILE']
    for item in users:
        pk = item['PK']
        cursor.execute(
            """INSERT INTO users (dynamo_pk, name, email, status, profile)
               VALUES (%s, %s, %s, %s, %s::jsonb)
               RETURNING id""",
            (
                pk,
                item.get('name', ''),
                item.get('email', ''),
                item.get('status', 'active'),
                json.dumps({k: v for k, v in item.items()
                           if k not in ('PK', 'SK', 'name', 'email', 'status')}),
            )
        )
        user_map[pk] = cursor.fetchone()[0]
    conn.commit()
    print(f"  Imported {len(users)} users")

    # Second pass: import products (PK=PRODUCT#*, SK=METADATA)
    product_map = {}
    products = [item for item in items
                if item.get('PK', '').startswith('PRODUCT#')
                and item.get('SK') == 'METADATA']
    for item in products:
        pk = item['PK']
        cursor.execute(
            """INSERT INTO products (dynamo_pk, name, description, price, category, attributes)
               VALUES (%s, %s, %s, %s, %s, %s::jsonb)
               RETURNING id""",
            (
                pk,
                item.get('name', ''),
                item.get('description', ''),
                item.get('price', 0),
                item.get('category', ''),
                json.dumps({k: v for k, v in item.items()
                           if k not in ('PK', 'SK', 'name', 'description', 'price', 'category')}),
            )
        )
        product_map[pk] = cursor.fetchone()[0]
    conn.commit()
    print(f"  Imported {len(products)} products")

    # Third pass: import orders (PK=USER#*, SK=ORDER#*)
    orders = [item for item in items if item.get('SK', '').startswith('ORDER#')]
    for item in orders:
        user_id = user_map.get(item['PK'])
        if user_id:
            cursor.execute(
                """INSERT INTO orders (dynamo_sk, user_id, total, status, items)
                   VALUES (%s, %s, %s, %s, %s::jsonb)""",
                (
                    item['SK'],
                    user_id,
                    item.get('total', 0),
                    item.get('status', 'pending'),
                    json.dumps(item.get('items', [])),
                )
            )
    conn.commit()
    print(f"  Imported {len(orders)} orders")

    # Fourth pass: import reviews (PK=PRODUCT#*, SK=REVIEW#*)
    reviews = [item for item in items if item.get('SK', '').startswith('REVIEW#')]
    for item in reviews:
        product_id = product_map.get(item['PK'])
        user_pk = item.get('user_pk', '')
        user_id = user_map.get(user_pk)
        if product_id and user_id:
            cursor.execute(
                """INSERT INTO reviews (product_id, user_id, rating, title, body)
                   VALUES (%s, %s, %s, %s, %s)""",
                (
                    product_id,
                    user_id,
                    item.get('rating', 3),
                    item.get('title', ''),
                    item.get('text', ''),
                )
            )
    conn.commit()
    print(f"  Imported {len(reviews)} reviews")

if __name__ == "__main__":
    conn = connect()
    import_single_table_design(conn, 'dynamodb_export.json')
    conn.close()
    print("\nMigration complete.")

Step 4: Update Application Code

Replace AWS SDK DynamoDB calls with standard SQL queries. See the Query Translation Examples section below for detailed mappings.

Step 5: Validate

-- Verify row counts
SELECT 'users' AS entity, COUNT(*) AS count FROM users
UNION ALL SELECT 'orders', COUNT(*) FROM orders
UNION ALL SELECT 'products', COUNT(*) FROM products
UNION ALL SELECT 'reviews', COUNT(*) FROM reviews;

-- Verify referential integrity (DynamoDB had none)
SELECT o.id AS orphaned_order
FROM orders o
LEFT JOIN users u ON o.user_id = u.id
WHERE u.id IS NULL;

-- Test a query that was impossible in DynamoDB
SELECT
  u.name,
  COUNT(o.id) AS order_count,
  SUM(o.total) AS lifetime_value,
  AVG(o.total) AS avg_order,
  MAX(o.created_at) AS last_order
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.status IN ('shipped', 'delivered')
GROUP BY u.name
HAVING SUM(o.total) > 500
ORDER BY lifetime_value DESC
LIMIT 20;
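
Row counts can also be cross-checked against the source. A quick sketch follows; the table-name pairs are assumptions for this guide's schema. Note that describe_table's ItemCount is only refreshed roughly every six hours, so for an exact figure, count the exported items instead.

#!/usr/bin/env python3
"""Cross-check HeliosDB-Lite row counts against the DynamoDB source."""

import boto3
import psycopg2

# (source table, destination table) pairs -- adjust to your schema
PAIRS = [('Users', 'users'), ('Orders', 'orders'), ('Products', 'products')]

def main():
    ddb = boto3.client('dynamodb')
    conn = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
    for src, dst in PAIRS:
        # ItemCount is approximate (refreshed ~every 6 hours)
        approx = ddb.describe_table(TableName=src)['Table']['ItemCount']
        with conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) FROM {dst}")
            exact = cur.fetchone()[0]
        print(f"{src}: dynamodb~{approx}  heliosdb={exact}")

if __name__ == "__main__":
    main()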

Data Type Mapping

| DynamoDB Type | Type Code | HeliosDB-Lite Type | Notes |
| --- | --- | --- | --- |
| String | S | TEXT | Direct mapping |
| Number | N | INT4, INT8, NUMERIC, or FLOAT8 | Choose based on data range and precision |
| Binary | B | BYTEA | Base64-decoded |
| Boolean | BOOL | BOOLEAN | Direct mapping |
| Null | NULL | SQL NULL | Direct mapping |
| List | L | JSONB array | '[1, "two", true]'::jsonb |
| Map | M | JSONB | '{"key": "value"}'::jsonb |
| String Set | SS | TEXT[] | ARRAY['a', 'b', 'c'] |
| Number Set | NS | INT4[] or FLOAT8[] | ARRAY[1, 2, 3] |
| Binary Set | BS | BYTEA[] | Array of binary values |

Partition Key / Sort Key Mapping

| DynamoDB Key Structure | HeliosDB-Lite Equivalent |
| --- | --- |
| Partition key only (hash) | PRIMARY KEY (id) |
| Partition key + sort key | PRIMARY KEY (partition_col, sort_col) |
| GSI (partition key only) | CREATE INDEX idx_name ON table(column) |
| GSI (partition + sort) | CREATE INDEX idx_name ON table(col1, col2) |
| LSI (same partition, different sort) | CREATE INDEX idx_name ON table(partition_col, alt_sort_col) |

Query Translation Examples

GetItem --> SELECT WHERE

# DynamoDB
response = table.get_item(Key={'user_id': '123'})
item = response.get('Item')

# HeliosDB-Lite
cursor.execute("SELECT * FROM users WHERE id = %s", (123,))
row = cursor.fetchone()

PutItem --> INSERT / UPSERT

# DynamoDB
table.put_item(Item={
    'user_id': '123',
    'name': 'Alice',
    'email': 'alice@example.com',
    'status': 'active'
})

# HeliosDB-Lite (INSERT)
cursor.execute(
    "INSERT INTO users (id, name, email, status) VALUES (%s, %s, %s, %s)",
    (123, 'Alice', 'alice@example.com', 'active')
)

# HeliosDB-Lite (UPSERT -- equivalent to PutItem which overwrites)
cursor.execute("""
    INSERT INTO users (id, name, email, status)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (id) DO UPDATE SET
      name = EXCLUDED.name,
      email = EXCLUDED.email,
      status = EXCLUDED.status
""", (123, 'Alice', 'alice@example.com', 'active'))

Query (partition key + sort key) --> SELECT WHERE

# DynamoDB: Get all orders for a user, sorted by date
from boto3.dynamodb.conditions import Key, Attr  # Attr is used in later snippets

response = table.query(
    KeyConditionExpression=Key('PK').eq('USER#123') & Key('SK').begins_with('ORDER#'),
    ScanIndexForward=False  # Descending
)

# HeliosDB-Lite: Simple JOIN (no single-table design needed)
cursor.execute("""
    SELECT o.id, o.total, o.status, o.created_at
    FROM orders o
    WHERE o.user_id = %s
    ORDER BY o.created_at DESC
""", (123,))

Query with FilterExpression --> SELECT WHERE

# DynamoDB: Filter is applied AFTER reading from disk (inefficient)
response = table.query(
    KeyConditionExpression=Key('PK').eq('USER#123') & Key('SK').begins_with('ORDER#'),
    FilterExpression=Attr('status').eq('shipped') & Attr('total').gte(50)
)

# HeliosDB-Lite: the planner pushes WHERE predicates into index scans (efficient)
cursor.execute("""
    SELECT * FROM orders
    WHERE user_id = %s
      AND status = 'shipped'
      AND total >= 50
    ORDER BY created_at DESC
""", (123,))

Scan --> SELECT (full table scan)

# DynamoDB: Expensive, paginated, 1MB limit per call
items = []
scan_kwargs = {'FilterExpression': Attr('status').eq('active')}
while True:
    response = table.scan(**scan_kwargs)
    items.extend(response['Items'])
    if 'LastEvaluatedKey' not in response:
        break
    scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

# HeliosDB-Lite: Single query, no pagination needed
cursor.execute("SELECT * FROM users WHERE status = 'active'")
items = cursor.fetchall()
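
For very large result sets you may still want to stream rather than buffer everything in memory; psycopg2's server-side (named) cursors do exactly that. A sketch, where process() is a placeholder for your own handler:

# Named cursor => server-side cursor: rows are fetched in batches of
# itersize instead of being buffered client-side all at once
cursor = conn.cursor(name='active_users_stream')
cursor.itersize = 10_000
cursor.execute("SELECT * FROM users WHERE status = 'active'")
for row in cursor:
    process(row)  # placeholder for your per-row handler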

UpdateItem with UpdateExpression --> UPDATE SET

# DynamoDB
table.update_item(
    Key={'user_id': '123'},
    UpdateExpression='SET #n = :name, login_count = login_count + :inc, updated_at = :now',
    ExpressionAttributeNames={'#n': 'name'},
    ExpressionAttributeValues={
        ':name': 'Alice Smith',
        ':inc': 1,
        ':now': '2026-01-15T10:00:00Z'
    }
)

# HeliosDB-Lite (dramatically simpler)
cursor.execute("""
    UPDATE users
    SET name = %s,
        login_count = login_count + 1,
        updated_at = NOW()
    WHERE id = %s
""", ('Alice Smith', 123))

DeleteItem with ConditionExpression --> DELETE WHERE

# DynamoDB (conditional delete)
table.delete_item(
    Key={'user_id': '123'},
    ConditionExpression=Attr('status').eq('inactive')
)

# HeliosDB-Lite
cursor.execute(
    "DELETE FROM users WHERE id = %s AND status = 'inactive'",
    (123,)
)
# Check cursor.rowcount to see if a row was actually deleted
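
DynamoDB surfaces a failed ConditionExpression as a ConditionalCheckFailedException; the SQL analogue is a zero rowcount, which you can turn back into an error if the caller expects one:

# Equivalent of catching ConditionalCheckFailedException
if cursor.rowcount == 0:
    raise RuntimeError("conditional delete failed: user not inactive or not found")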

BatchGetItem --> SELECT WHERE IN

# DynamoDB: BatchGetItem (max 100 items, 16MB limit)
response = dynamodb.batch_get_item(
    RequestItems={
        'Users': {
            'Keys': [{'user_id': {'S': '1'}}, {'user_id': {'S': '2'}}, {'user_id': {'S': '3'}}]
        }
    }
)

# HeliosDB-Lite: Single query, no limits
cursor.execute("SELECT * FROM users WHERE id = ANY(%s)", ([1, 2, 3],))

TransactWriteItems --> SQL Transaction

# DynamoDB (100-item limit, 4MB limit, complex API)
dynamodb.transact_write_items(
    TransactItems=[
        {'Put': {'TableName': 'Orders', 'Item': {'order_id': {'S': 'ORD-001'}, 'total': {'N': '99.99'}}}},
        {'Update': {'TableName': 'Users', 'Key': {'user_id': {'S': '123'}},
                    'UpdateExpression': 'SET order_count = order_count + :inc',
                    'ExpressionAttributeValues': {':inc': {'N': '1'}}}},
        {'Update': {'TableName': 'Inventory', 'Key': {'product_id': {'S': '456'}},
                    'UpdateExpression': 'SET stock = stock - :qty',
                    'ExpressionAttributeValues': {':qty': {'N': '2'}},
                    'ConditionExpression': 'stock >= :qty'}}
    ]
)

# HeliosDB-Lite (standard SQL, no limits, much simpler)
# psycopg2 opens a transaction implicitly on the first execute;
# conn.commit() / conn.rollback() end it, so no explicit BEGIN is needed
cursor.execute(
    "INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id",
    (123, 99.99)
)
order_id = cursor.fetchone()[0]
cursor.execute(
    "UPDATE users SET order_count = order_count + 1 WHERE id = %s",
    (123,)
)
cursor.execute(
    "UPDATE inventory SET stock = stock - 2 WHERE product_id = %s AND stock >= 2",
    (456,)
)
if cursor.rowcount == 0:
    conn.rollback()
    raise Exception("Insufficient inventory")
conn.commit()
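
psycopg2's context managers give the same commit-or-rollback shape with less ceremony. A sketch of the same transaction (the connection block commits on success and rolls back on any exception):

with conn:
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id",
            (123, 99.99))
        order_id = cur.fetchone()[0]
        cur.execute(
            "UPDATE users SET order_count = order_count + 1 WHERE id = %s",
            (123,))
        cur.execute(
            "UPDATE inventory SET stock = stock - 2 "
            "WHERE product_id = %s AND stock >= 2",
            (456,))
        if cur.rowcount == 0:
            raise RuntimeError("Insufficient inventory")  # triggers rollback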

Common Patterns

Pattern 1: Python Application Migration

"""
DynamoDB to HeliosDB-Lite migration -- Python application layer.
Replace boto3 DynamoDB calls with psycopg2 SQL queries.
"""

import psycopg2
import psycopg2.extras
import json

class ProductCatalog:
    """
    Replaces a DynamoDB-backed product catalog.

    DynamoDB limitations this migration eliminates:
    - No JOINs (required denormalized data)
    - No aggregations (required scan + application code)
    - No full-text search (required OpenSearch)
    - No vector search (required separate service)
    - 400KB item size limit
    - 1MB query result limit
    """

    def __init__(self, dsn: str = "postgresql://app@127.0.0.1:5432/myapp"):
        self.conn = psycopg2.connect(dsn)

    def get_product(self, product_id: int) -> dict:
        """Replaces: table.get_item(Key={'product_id': product_id})"""
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,))
        return cursor.fetchone()

    def search_products(self, query: str, category: str = None, limit: int = 20) -> list:
        """
        Replaces: OpenSearch cluster ($500+/month) that was required because
        DynamoDB cannot do full-text search.
        """
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        sql = """
            SELECT id, name, description, price, category,
                   ts_rank(to_tsvector('english', name || ' ' || COALESCE(description, '')),
                           to_tsquery('english', %s)) AS relevance
            FROM products
            WHERE to_tsvector('english', name || ' ' || COALESCE(description, ''))
                  @@ to_tsquery('english', %s)
        """
        params = [query, query]

        if category:
            sql += " AND category = %s"
            params.append(category)

        sql += " ORDER BY relevance DESC LIMIT %s"
        params.append(limit)

        cursor.execute(sql, params)
        return cursor.fetchall()

    def get_category_stats(self) -> list:
        """
        Replaces: DynamoDB scan + application-layer aggregation.
        In DynamoDB, this required scanning the entire table and computing
        aggregations in Lambda/application code.
        """
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute("""
            SELECT
                category,
                COUNT(*) AS product_count,
                AVG(price) AS avg_price,
                MIN(price) AS min_price,
                MAX(price) AS max_price,
                PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price) AS median_price
            FROM products
            GROUP BY category
            ORDER BY product_count DESC
        """)
        return cursor.fetchall()

    def get_top_rated_by_category(self, min_reviews: int = 5) -> list:
        """
        Replaces: a JOIN-plus-ranking query that DynamoDB cannot express
        without single-table design tricks, and even then only in a very
        limited form.
        """
        cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute("""
            WITH product_ratings AS (
                SELECT
                    p.id, p.name, p.category, p.price,
                    AVG(r.rating) AS avg_rating,
                    COUNT(r.id) AS review_count,
                    RANK() OVER (PARTITION BY p.category ORDER BY AVG(r.rating) DESC) AS category_rank
                FROM products p
                JOIN reviews r ON p.id = r.product_id
                GROUP BY p.id, p.name, p.category, p.price
                HAVING COUNT(r.id) >= %s
            )
            SELECT * FROM product_ratings
            WHERE category_rank <= 3
            ORDER BY category, category_rank
        """, (min_reviews,))
        return cursor.fetchall()
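
A brief usage sketch (the DSN is the class's assumed default; the search terms and category are illustrative):

catalog = ProductCatalog()
for hit in catalog.search_products('wireless widget', category='gadgets'):
    print(hit['name'], hit['price'], round(hit['relevance'], 3))
for row in catalog.get_top_rated_by_category(min_reviews=10):
    print(row['category'], row['category_rank'], row['name'])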

Pattern 2: TypeScript Application Migration

import { Pool } from 'pg';

const pool = new Pool({
  host: '127.0.0.1',
  port: 5432,
  database: 'myapp',
  user: 'app',
});

// Replaces: DynamoDB GetItem
async function getUser(userId: number) {
  const result = await pool.query('SELECT * FROM users WHERE id = $1', [userId]);
  return result.rows[0] || null;
}

// Replaces: DynamoDB PutItem
async function createUser(name: string, email: string, profile: object = {}) {
  const result = await pool.query(
    `INSERT INTO users (name, email, profile)
     VALUES ($1, $2, $3::jsonb)
     RETURNING id`,
    [name, email, JSON.stringify(profile)]
  );
  return result.rows[0].id;
}

// Replaces: DynamoDB Query (pk + sk condition) plus application-side assembly of item details
async function getUserOrderHistory(userId: number) {
  const result = await pool.query(`
    SELECT
      o.id AS order_id,
      o.total,
      o.status,
      o.created_at,
      jsonb_agg(
        jsonb_build_object(
          'product_id', item->>'product_id',
          'name', item->>'name',
          'quantity', item->>'quantity',
          'price', item->>'price'
        )
      ) AS items
    FROM orders o,
         jsonb_array_elements(o.items) AS item
    WHERE o.user_id = $1
    GROUP BY o.id
    ORDER BY o.created_at DESC
  `, [userId]);
  return result.rows;
}

// Replaces: DynamoDB Scan + application-layer aggregation + separate analytics pipeline
async function getDashboardMetrics(days: number = 30) {
  const result = await pool.query(`
    SELECT
      COUNT(DISTINCT o.user_id) AS active_customers,
      COUNT(o.id) AS total_orders,
      SUM(o.total) AS total_revenue,
      AVG(o.total) AS avg_order_value,
      COUNT(CASE WHEN o.status = 'cancelled' THEN 1 END)::FLOAT
        / NULLIF(COUNT(o.id), 0) AS cancellation_rate
    FROM orders o
    WHERE o.created_at >= NOW() - make_interval(days => $1::int)
  `, [days]);
  return result.rows[0];
}

// Replaces: DynamoDB TransactWriteItems (100-item limit, 4MB limit)
async function placeOrder(userId: number, items: Array<{productId: number, quantity: number, price: number}>) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');

    const total = items.reduce((sum, i) => sum + i.quantity * i.price, 0);
    const orderResult = await client.query(
      `INSERT INTO orders (user_id, total, status, items)
       VALUES ($1, $2, 'pending', $3::jsonb)
       RETURNING id`,
      [userId, total, JSON.stringify(items)]
    );
    const orderId = orderResult.rows[0].id;

    for (const item of items) {
      const invResult = await client.query(
        `UPDATE inventory
         SET stock = stock - $1
         WHERE product_id = $2 AND stock >= $1
         RETURNING stock`,
        [item.quantity, item.productId]
      );
      if (invResult.rowCount === 0) {
        throw new Error(`Insufficient stock for product ${item.productId}`);
      }
    }

    await client.query(
      'UPDATE users SET order_count = order_count + 1 WHERE id = $1',
      [userId]
    );

    await client.query('COMMIT');
    return orderId;
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}

// HeliosDB-Lite exclusive: time-travel for audit/compliance
async function getOrderStateAtTime(orderId: number, timestamp: string) {
  const result = await pool.query(
    `SELECT * FROM orders AS OF $1 WHERE id = $2`,
    [timestamp, orderId]
  );
  return result.rows[0] || null;
}

Pattern 3: Replacing DynamoDB Streams with Triggers

-- DynamoDB required: DynamoDB Streams + Lambda function (eventually consistent)
-- HeliosDB-Lite: Synchronous, transactional trigger

-- Audit log trigger (replaces Streams + Lambda audit function)
CREATE TABLE audit_log (
  id SERIAL PRIMARY KEY,
  table_name TEXT NOT NULL,
  operation TEXT NOT NULL,   -- INSERT, UPDATE, DELETE
  row_id INT4,
  old_data JSONB,
  new_data JSONB,
  changed_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE OR REPLACE FUNCTION audit_trigger_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
  IF TG_OP = 'INSERT' THEN
    INSERT INTO audit_log (table_name, operation, row_id, new_data)
    VALUES (TG_TABLE_NAME, 'INSERT', NEW.id, row_to_json(NEW)::jsonb);
    RETURN NEW;
  ELSIF TG_OP = 'UPDATE' THEN
    INSERT INTO audit_log (table_name, operation, row_id, old_data, new_data)
    VALUES (TG_TABLE_NAME, 'UPDATE', NEW.id, row_to_json(OLD)::jsonb, row_to_json(NEW)::jsonb);
    RETURN NEW;
  ELSIF TG_OP = 'DELETE' THEN
    INSERT INTO audit_log (table_name, operation, row_id, old_data)
    VALUES (TG_TABLE_NAME, 'DELETE', OLD.id, row_to_json(OLD)::jsonb);
    RETURN OLD;
  END IF;
END;
$$;

-- Apply to orders table
CREATE TRIGGER orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();

-- Apply to users table
CREATE TRIGGER users_audit
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();
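
If anything downstream still needs a change feed (say, cache invalidation that a Streams consumer used to drive), the audit table itself can be tailed by id. A minimal polling sketch, assuming the audit_log schema above:

import time
import psycopg2

def follow_audit_log(dsn: str = "postgresql://app@127.0.0.1:5432/myapp",
                     poll_seconds: float = 1.0):
    """Tail audit_log -- a stand-in for a DynamoDB Streams consumer.
    Unlike Streams, entries commit atomically with the writes themselves."""
    conn = psycopg2.connect(dsn)
    last_id = 0
    while True:
        with conn.cursor() as cur:
            cur.execute(
                """SELECT id, table_name, operation, row_id, changed_at
                   FROM audit_log WHERE id > %s ORDER BY id""",
                (last_id,)
            )
            for entry_id, table_name, op, row_id, ts in cur.fetchall():
                print(f"{ts} {op} {table_name} row {row_id}")
                last_id = entry_id
        conn.commit()  # close the snapshot so newer rows become visible
        time.sleep(poll_seconds)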

Pattern 4: Replacing DynamoDB TTL

-- DynamoDB TTL: Set a numeric attribute, DynamoDB deletes expired items
-- (within 48 hours, not precise)

-- HeliosDB-Lite Option A: Scheduled cleanup with precise timing
DELETE FROM sessions WHERE expires_at < NOW();

-- HeliosDB-Lite Option B: Use partitioning for efficient bulk deletion
CREATE TABLE sessions (
  id SERIAL,
  user_id INT4 NOT NULL,
  token TEXT NOT NULL,
  data JSONB,
  expires_at TIMESTAMPTZ NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW()
) PARTITION BY RANGE (expires_at);

-- Create monthly partitions
CREATE TABLE sessions_2026_01 PARTITION OF sessions
  FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE sessions_2026_02 PARTITION OF sessions
  FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');

-- Drop expired partitions (instant, no per-row delete)
DROP TABLE sessions_2025_12;

Limitations & Workarounds

DynamoDB Features Not Directly Supported

| DynamoDB Feature | Status | Workaround |
| --- | --- | --- |
| Auto-scaling (read/write capacity) | Not applicable | Embedded database uses application server resources; scale the server |
| Global Tables (multi-region) | Different architecture | Use HeliosDB-Lite replication (WAL streaming) or deploy per-region |
| DynamoDB Accelerator (DAX) | Not applicable | HeliosDB-Lite is already in-process (no network hop); use built-in query cache |
| PartiQL (limited SQL) | Full SQL | HeliosDB-Lite supports complete SQL (PartiQL queries migrate trivially) |
| On-demand capacity | Not applicable | No capacity planning needed; embedded database has no artificial throughput limits |
| Point-in-time recovery (PITR) | Built-in (better) | HeliosDB-Lite time-travel queries (AS OF) provide row-level point-in-time access |
| AWS IAM authentication | Different auth model | HeliosDB-Lite uses SCRAM-SHA-256 or trust authentication |
| VPC endpoints | Not applicable | Embedded database has no network exposure |
| DynamoDB Streams ordering | Different guarantee | Triggers execute synchronously within the transaction (stronger guarantee) |
| Item-level encryption | Different approach | HeliosDB-Lite TDE encrypts the entire database at rest (AES-256-GCM) |
| 400KB item limit | No limit | No per-row size restriction |
| Eventual consistency reads | Not applicable | All reads are strongly consistent |

When DynamoDB Might Still Be Better

  • Massive write throughput (100K+ writes/second): DynamoDB's distributed architecture scales horizontally. For extreme write volumes, consider HeliosDB-Lite's sharding or multi-primary replication.
  • Multi-region active-active: DynamoDB Global Tables provide turn-key multi-region replication. HeliosDB-Lite supports replication but requires more configuration.
  • Serverless Lambda integration: If your entire stack is AWS Lambda + DynamoDB and you have no desire to manage infrastructure, the serverless model may still make sense (at higher cost).

Rollback Strategy

Pre-Migration

  1. Keep DynamoDB tables active for at least 2-4 weeks after cutover
  2. Enable DynamoDB PITR on all tables as a safety net
  3. Tag DynamoDB resources as "migration-pending" (do not delete)

Dual-Write Approach

During the transition, write to both databases:

import boto3
import psycopg2
import json
from decimal import Decimal

class DualWriteService:
    def __init__(self):
        self.dynamodb = boto3.resource('dynamodb')
        self.ddb_table = self.dynamodb.Table('Orders')
        self.helios = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")

    def create_order(self, user_id: int, total: float, items: list) -> int:
        # Primary: HeliosDB-Lite
        cursor = self.helios.cursor()
        cursor.execute(
            """INSERT INTO orders (user_id, total, status, items)
               VALUES (%s, %s, 'pending', %s::jsonb) RETURNING id""",
            (user_id, total, json.dumps(items))
        )
        order_id = cursor.fetchone()[0]
        self.helios.commit()

        # Secondary: DynamoDB (for rollback safety)
        self.ddb_table.put_item(Item={
            'PK': f'USER#{user_id}',
            'SK': f'ORDER#{order_id}',
            'total': str(total),
            'status': 'pending',
            'items': json.loads(json.dumps(items), parse_float=Decimal),  # DynamoDB rejects floats
            'helios_id': order_id,
        })

        return order_id
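
While dual-writing, it pays to spot-check that the two stores agree. A small helper, sketched against the class above (create_order stored totals in DynamoDB as strings, so the comparison is string-based):

def verify_order(svc: DualWriteService, order_id: int) -> bool:
    """Confirm an order exists in both stores with a matching total."""
    cursor = svc.helios.cursor()
    cursor.execute("SELECT user_id, total FROM orders WHERE id = %s", (order_id,))
    row = cursor.fetchone()
    if row is None:
        return False
    user_id, total = row
    item = svc.ddb_table.get_item(
        Key={'PK': f'USER#{user_id}', 'SK': f'ORDER#{order_id}'}
    ).get('Item')
    # create_order wrote the total as a string, so compare as strings
    return item is not None and item.get('total') == str(total)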

Rollback Procedure

  1. Revert application code to use AWS SDK DynamoDB calls
  2. Verify DynamoDB data is current (if dual-write was active)
  3. If dual-write was not active, export from HeliosDB-Lite and reimport:
import psycopg2
import psycopg2.extras
import boto3
import json
from decimal import Decimal

def rollback_to_dynamodb():
    helios = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('Orders')

    cursor = helios.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cursor.execute("SELECT * FROM orders")

    with table.batch_writer() as batch:
        for row in cursor:
            batch.put_item(Item={
                'PK': f"USER#{row['user_id']}",
                'SK': f"ORDER#{row['id']}",
                'total': Decimal(str(row['total'])),
                'status': row['status'],
                'items': json.loads(json.dumps(row['items']), parse_float=Decimal),  # DynamoDB rejects floats
                'created_at': row['created_at'].isoformat(),
            })

    print("Rollback to DynamoDB complete")

rollback_to_dynamodb()
  4. Re-enable DynamoDB auto-scaling and verify capacity is sufficient
  5. Post-mortem -- document issues and plan remediation

