Amazon DynamoDB to HeliosDB-Lite Migration Guide
Category: Database Migration
HeliosDB-Lite Version: 3.5+
Overview
Amazon DynamoDB is a fully managed NoSQL key-value and document database that delivers single-digit millisecond performance at any scale. While DynamoDB excels at high-throughput, low-latency access patterns with predictable key-based lookups, its rigid query model, opaque pricing, and severe analytical limitations push teams toward costly workarounds: maintaining separate Elasticsearch clusters for search, building ETL pipelines to Redshift for reporting, and implementing complex single-table designs to simulate relational queries.
HeliosDB-Lite offers a fundamentally different approach: a full SQL engine with ACID transactions, JOINs, aggregations, window functions, vector search, full-text search, and time-travel — all in an embedded, zero-config deployment. By migrating from DynamoDB, organizations eliminate capacity planning (RCU/WCU), throttling anxiety, and the constellation of auxiliary services required to fill DynamoDB’s feature gaps.
The migration path is: export DynamoDB tables (via Data Pipeline, S3 export, or scan), transform the items into SQL INSERT statements, and import through the PostgreSQL wire protocol. Applications replace AWS SDK calls with standard SQL queries via any PostgreSQL client library.
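The "transform" step in that path can be sketched as a small function that turns one exported item into a parameterized INSERT. This is a minimal sketch, not the guide's import script: it assumes the item has already been reduced to plain JSON values, and the helper names `quote_ident` and `item_to_insert` are hypothetical. The `%s` placeholders follow the psycopg2 convention used elsewhere in this guide.

```python
import json


def quote_ident(name: str) -> str:
    """Quote an SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def item_to_insert(table: str, item: dict) -> tuple[str, list]:
    """Build a parameterized INSERT for one plain-JSON item.

    Nested maps and lists are serialized to JSON text so they can be
    stored in a JSONB column; scalar values are passed through as-is.
    """
    columns = sorted(item)  # stable column order for repeatable output
    params = [
        json.dumps(item[c]) if isinstance(item[c], (dict, list)) else item[c]
        for c in columns
    ]
    column_list = ", ".join(quote_ident(c) for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    sql = f"INSERT INTO {quote_ident(table)} ({column_list}) VALUES ({placeholders})"
    return sql, params


sql, params = item_to_insert(
    "users",
    {"name": "Alice", "email": "alice@example.com", "profile": {"plan": "pro"}},
)
print(sql)
print(params)
```

Keeping values out of the SQL text and passing them as parameters avoids quoting bugs when attribute values contain apostrophes or other special characters.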
Why Migrate?
Feature Comparison
DynamoDB is a key-value store with document capabilities. HeliosDB-Lite is a full relational database engine. The feature gap is significant:
| Capability | DynamoDB | HeliosDB-Lite | Impact |
|---|---|---|---|
| JOINs | Not supported | Full SQL JOINs (INNER, LEFT, RIGHT, FULL, CROSS, LATERAL) | Eliminate denormalization and data duplication |
| Aggregations (SUM, AVG, COUNT, MIN, MAX) | Not supported (scan + application code) | Native SQL aggregations | Remove application-layer computation |
| GROUP BY | Not supported | Full GROUP BY with HAVING | Inline analytics without ETL |
| Window functions | Not supported | ROW_NUMBER, RANK, LAG, LEAD, NTILE, etc. | Running totals, rankings, time-series analysis |
| CTEs | Not supported | WITH / WITH RECURSIVE | Simplify complex queries, traverse hierarchies |
| Subqueries | Not supported | Scalar, correlated, EXISTS, IN, ANY/ALL | Flexible filtering and computation |
| Full-text search | Not supported (requires OpenSearch) | Native @@ operator with ranking | Eliminate OpenSearch cluster ($500+/month) |
| Vector search | Not supported (requires separate service) | Native HNSW indexes with SIMD | Eliminate vector DB ($300+/month) |
| Complex WHERE clauses | Limited (partition key required, filter expressions) | Arbitrary SQL WHERE with AND/OR/NOT, BETWEEN, LIKE, IN | Query any column combination without index constraints |
| ACID transactions | Limited (100 actions and 4MB per transaction, single-region) | Full multi-row, multi-table, no size limit | True transactional integrity |
| Time-travel | Not supported (requires DynamoDB Streams + Lambda) | Native AS OF queries | Point-in-time data access, built-in |
| Branching | Not supported | Database-level branching | Isolated dev/test environments |
| Secondary indexes | GSI (20 max), LSI (5 max, must be created at table creation) | Unlimited indexes on any column or expression | No index limits or creation-time constraints |
| Ad-hoc queries | Extremely limited (must use partition key) | Any SQL query on any column | True exploratory data access |
| Stored procedures | Not supported | PL/pgSQL with variables, loops, cursors, exceptions | Server-side business logic |
| Triggers | DynamoDB Streams + Lambda (eventually consistent) | BEFORE/AFTER triggers (synchronous, transactional) | Guaranteed consistency |
Pricing Model Comparison
DynamoDB pricing is notoriously difficult to predict. Costs scale with read/write capacity units (RCU/WCU) and can spike dramatically with unexpected traffic patterns.
DynamoDB Pricing Breakdown (typical mid-sized application):
| Cost Component | On-Demand Mode | Provisioned Mode | Notes |
|---|---|---|---|
| Write requests (WCU) | $1.25 per million | $0.00065 per WCU/hour | 1 WCU = 1KB write/second |
| Read requests (RCU) | $0.25 per million | $0.00013 per RCU/hour | 1 RCU = 4KB strongly consistent read/second |
| Storage | $0.25/GB/month | $0.25/GB/month | First 25GB free |
| GSI writes | Same as base table | Same as base table | Each GSI adds a billed write whenever a key or projected attribute of that index changes |
| GSI storage | $0.25/GB/month | $0.25/GB/month | Each GSI adds storage costs |
| DynamoDB Streams | $0.02 per 100K reads | $0.02 per 100K reads | Required for triggers/CDC |
| Data export to S3 | $0.10/GB | $0.10/GB | For analytics pipeline |
| Backup (on-demand) | $0.10/GB/month | $0.10/GB/month | PITR: $0.20/GB/month |
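To make the capacity-unit definitions in the table concrete, the following sketch estimates provisioned cost from item size and request rate. It uses the hourly rates listed above and assumes 730 hours per month, capacity provisioned exactly at the steady request rate, and no GSIs. The function names are illustrative, not part of any AWS SDK.

```python
import math

WCU_HOURLY_USD = 0.00065   # provisioned rate from the table above
RCU_HOURLY_USD = 0.00013   # provisioned rate from the table above
HOURS_PER_MONTH = 730      # assumption: average month length


def write_units(item_bytes: int) -> int:
    """One WCU covers a write of up to 1KB; larger items round up."""
    return max(1, math.ceil(item_bytes / 1024))


def read_units(item_bytes: int, strongly_consistent: bool = True) -> float:
    """One RCU covers a strongly consistent read of up to 4KB.

    Eventually consistent reads consume half as much.
    """
    units = max(1, math.ceil(item_bytes / 4096))
    return units if strongly_consistent else units / 2


def provisioned_monthly_cost(writes_per_sec: int, reads_per_sec: int,
                             item_bytes: int,
                             strongly_consistent: bool = True) -> float:
    """Monthly cost when capacity exactly matches a steady request rate."""
    wcu = writes_per_sec * write_units(item_bytes)
    rcu = reads_per_sec * read_units(item_bytes, strongly_consistent)
    return (wcu * WCU_HOURLY_USD + rcu * RCU_HOURLY_USD) * HOURS_PER_MONTH


# A 1.5KB item costs 2 WCU per write but only 1 RCU per read
print(write_units(1500), read_units(1500))
print(round(provisioned_monthly_cost(100, 400, 1024), 2))
```

Real bills differ because auto-scaling tracks actual load and each GSI adds its own write capacity, so treat the result as a rough lower bound for a steady workload.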
Cost Comparison Scenario: 50GB database, 5,000 writes/sec, 20,000 reads/sec:
| Component | DynamoDB (Provisioned + Auto-Scaling) | HeliosDB-Lite (Embedded) |
|---|---|---|
| Compute (WCU/RCU) | $2,850/month | $0 |
| Storage (50GB) | $12.50/month | $0 (disk on app server) |
| GSI overhead (3 GSIs) | $1,200/month | $0 |
| DynamoDB Streams | $150/month | $0 (triggers built-in) |
| OpenSearch (for search) | $650/month | $0 (FTS built-in) |
| Backup (PITR) | $10/month | $0 (file-level backup) |
| Data transfer | $200/month | $0 (in-process) |
| Application server | $300/month | $300/month |
| Monthly total | $5,372 | $300 |
| Annual total | $64,470 | $3,600 |
| Savings | — | $60,870/year (94%) |
For on-demand pricing with variable traffic, DynamoDB costs can be 2-5x higher during peak periods. HeliosDB-Lite costs are fixed regardless of query volume.
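The totals in the scenario table can be re-derived with a few lines of arithmetic, which also makes it easy to substitute your own line items. The figures below are copied from the table; the dictionary keys are just labels chosen for this sketch.

```python
# Monthly line items in USD, copied from the scenario table above
dynamodb_monthly = {
    "compute": 2850.00,
    "storage": 12.50,
    "gsi_overhead": 1200.00,
    "streams": 150.00,
    "opensearch": 650.00,
    "backup_pitr": 10.00,
    "data_transfer": 200.00,
    "app_server": 300.00,
}
heliosdb_monthly = {"app_server": 300.00}


def annual_total(line_items: dict) -> float:
    """Sum the monthly line items and scale to twelve months."""
    return 12 * sum(line_items.values())


savings = annual_total(dynamodb_monthly) - annual_total(heliosdb_monthly)
savings_pct = savings / annual_total(dynamodb_monthly) * 100

print(f"DynamoDB annual: ${annual_total(dynamodb_monthly):,.0f}")
print(f"HeliosDB-Lite annual: ${annual_total(heliosdb_monthly):,.0f}")
print(f"Savings: ${savings:,.0f} ({savings_pct:.0f}%)")
```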
Operational Simplification
| Operational Concern | DynamoDB | HeliosDB-Lite |
|---|---|---|
| Capacity planning | Required (RCU/WCU provisioning or on-demand cost monitoring) | Not needed (embedded, uses app server resources) |
| Throttling | Yes (ProvisionedThroughputExceededException) | No (no artificial limits) |
| Hot partition management | Manual (choose good partition keys, add GSIs) | Not applicable (B-tree indexes, no partitioning constraints) |
| GSI backfill | Slow (can take hours for large tables) | CREATE INDEX completes in seconds/minutes |
| Item size limit | 400KB per item | No per-row limit |
| Transaction limit | 100 actions, 4MB total | No limit |
| Query result limit | 1MB per query (requires pagination) | No artificial limit |
| Region availability | Must configure multi-region explicitly (Global Tables) | Embedded (runs wherever your app runs) |
| Vendor lock-in | Complete (AWS proprietary API) | None (standard SQL, PostgreSQL wire protocol) |
Compatibility Matrix
| DynamoDB Concept | HeliosDB-Lite Equivalent | Notes |
|---|---|---|
| Table | CREATE TABLE | Standard SQL DDL |
| Partition key | PRIMARY KEY (single column) | Any data type |
| Partition key + sort key | Composite PRIMARY KEY (pk, sk) | Ordered by sort key |
| Global Secondary Index (GSI) | CREATE INDEX on any column(s) | No limit on number of indexes |
| Local Secondary Index (LSI) | CREATE INDEX including primary key columns | Can be added any time |
| Item (document) | Row (typed columns and/or JSONB) | Schema-enforced |
| Attribute | Column | Typed (INT4, TEXT, JSONB, etc.) |
| String (S) | TEXT | Direct mapping |
| Number (N) | INT4, INT8, NUMERIC, or FLOAT8 | Choose appropriate type |
| Binary (B) | BYTEA | Binary data |
| Boolean (BOOL) | BOOLEAN | Direct mapping |
| Null (NULL) | SQL NULL | Direct mapping |
| List (L) | JSONB array or ARRAY type | Design choice |
| Map (M) | JSONB | Binary-indexed JSON |
| String Set (SS) | TEXT[] | PostgreSQL array |
| Number Set (NS) | INT4[] or FLOAT8[] | PostgreSQL array |
| Binary Set (BS) | BYTEA[] | PostgreSQL array |
| TTL attribute | Scheduled DELETE or partition-based pruning | Application logic or cron |
| DynamoDB Streams | Triggers | Synchronous, transactional |
| Conditional expressions | WHERE clause / CHECK constraints | Standard SQL |
| UpdateExpression | UPDATE ... SET | Standard SQL |
| ProjectionExpression | SELECT col1, col2 | Standard SQL |
| FilterExpression | WHERE clause | Applied at query time, not post-scan |
| PartiQL | SQL (native) | Full SQL support |
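The TTL row deserves a concrete illustration, because DynamoDB stores TTL values as a Number holding epoch seconds while a SQL schema would normally use a timestamp column. The sketch below converts the value during import and builds the pruning statement a scheduler could run. The table name `sessions` and column name `expires_at` are hypothetical, and the identifier check is a deliberately strict assumption for this example.

```python
import re
from datetime import datetime, timezone

_IDENT = re.compile(r"^[a-z_][a-z0-9_]*$")


def ttl_to_timestamp(epoch_seconds) -> datetime:
    """Convert a DynamoDB TTL attribute (epoch seconds) to an aware datetime."""
    return datetime.fromtimestamp(int(epoch_seconds), tz=timezone.utc)


def build_prune_statement(table: str, column: str) -> str:
    """Return a parameterized DELETE for rows whose expiry has passed.

    Identifiers are validated rather than quoted, so only simple
    lower-case names are accepted.
    """
    for name in (table, column):
        if not _IDENT.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    return f"DELETE FROM {table} WHERE {column} < %s"


print(ttl_to_timestamp(1767225600).isoformat())
print(build_prune_statement("sessions", "expires_at"))
```

A cron job or application timer would execute the statement with the current UTC time as its single parameter. Unlike DynamoDB TTL, which removes items at some point after expiry, the rows disappear as soon as the job runs.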
Migration Steps
Step 1: Export Data from DynamoDB
Option A: DynamoDB Export to S3 (recommended for large tables)
    # Export via AWS CLI (outputs JSON to S3)
    aws dynamodb export-table-to-point-in-time \
      --table-arn arn:aws:dynamodb:us-east-1:123456789:table/Users \
      --s3-bucket my-migration-bucket \
      --s3-prefix dynamodb-export/ \
      --export-format DYNAMODB_JSON

    # Download the export
    aws s3 sync s3://my-migration-bucket/dynamodb-export/ ./dynamodb-export/

Option B: Scan and export (for smaller tables)
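    #!/usr/bin/env python3
    """Export DynamoDB table to JSON file."""

    import base64
    import json
    from decimal import Decimal

    import boto3
    from boto3.dynamodb.types import Binary


    class DecimalEncoder(json.JSONEncoder):
        """Serialize the boto3 types that the json module cannot handle."""

        def default(self, obj):
            if isinstance(obj, Decimal):
                # Keep whole numbers as int; float can lose precision for
                # very large or high-scale values.
                return int(obj) if obj == obj.to_integral_value() else float(obj)
            if isinstance(obj, set):
                # String, Number, and Binary sets become JSON arrays
                return list(obj)
            if isinstance(obj, Binary):
                # Binary attributes are stored as base64 text
                return base64.b64encode(obj.value).decode('ascii')
            return super().default(obj)


    def export_table(table_name: str, output_path: str):
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table(table_name)

        items = []  # Held in memory, so this suits smaller tables only
        scan_kwargs = {}
        total = 0

        print(f"Exporting {table_name}...")

        while True:
            response = table.scan(**scan_kwargs)
            items.extend(response['Items'])
            total += response['Count']
            print(f"  Scanned {total} items...")

            # Scan returns at most 1MB per call; follow the pagination key
            if 'LastEvaluatedKey' not in response:
                break
            scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

        with open(output_path, 'w') as f:
            json.dump(items, f, cls=DecimalEncoder, indent=2)

        print(f"Exported {total} items to {output_path}")


    if __name__ == "__main__":
        export_table('Users', 'users.json')
        export_table('Orders', 'orders.json')
        export_table('Products', 'products.json')

Step 2: Design the HeliosDB-Lite Schema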
DynamoDB’s single-table design pattern (where all entity types share one table with overloaded keys) should typically be normalized into separate tables during migration. This is one of the primary advantages of migrating to a relational database.
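Normalizing a single-table design starts with deciding which entity each item represents, usually from the prefixes of its partition and sort keys. The sketch below assumes keys named `PK` and `SK` with prefixes such as `USER#`, `ORDER#`, `PRODUCT#`, and `REVIEW#`; your table's conventions will differ, and the function names are hypothetical.

```python
from collections import defaultdict


def classify(item: dict) -> str:
    """Map one single-table item to the normalized table it belongs in."""
    pk = item.get("PK", "")
    sk = item.get("SK", "")
    if pk.startswith("USER#") and sk == "PROFILE":
        return "users"
    if pk.startswith("USER#") and sk.startswith("ORDER#"):
        return "orders"
    if pk.startswith("PRODUCT#") and sk == "METADATA":
        return "products"
    if pk.startswith("PRODUCT#") and sk.startswith("REVIEW#"):
        return "reviews"
    return "unknown"


def route(items: list) -> dict:
    """Group items by target table so each group can be imported in order."""
    groups = defaultdict(list)
    for item in items:
        groups[classify(item)].append(item)
    return dict(groups)


sample = [
    {"PK": "USER#123", "SK": "PROFILE", "name": "Alice"},
    {"PK": "USER#123", "SK": "ORDER#001", "total": 99.99},
    {"PK": "PRODUCT#456", "SK": "METADATA", "name": "Widget"},
    {"PK": "PRODUCT#456", "SK": "REVIEW#789", "rating": 5},
    {"PK": "CONFIG", "SK": "GLOBAL"},
]
for table, rows in route(sample).items():
    print(table, len(rows))
```

Collecting unmatched items under `unknown` rather than dropping them makes it easy to spot key patterns the mapping has missed before any data is loaded.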
DynamoDB single-table design:
    PK            SK           Attributes
    USER#123      PROFILE      { name: "Alice", email: "alice@..." }
    USER#123      ORDER#001    { total: 99.99, status: "shipped" }
    USER#123      ORDER#002    { total: 149.50, status: "pending" }
    PRODUCT#456   METADATA     { name: "Widget", price: 29.99 }
    PRODUCT#456   REVIEW#789   { rating: 5, text: "Great product" }

HeliosDB-Lite normalized schema:
    -- Users table
    CREATE TABLE users (
        id SERIAL PRIMARY KEY,
        dynamo_pk TEXT UNIQUE,              -- Preserve original key for reference
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        status TEXT NOT NULL DEFAULT 'active',
        profile JSONB DEFAULT '{}',         -- Flexible attributes as JSONB
        created_at TIMESTAMPTZ DEFAULT NOW(),
        updated_at TIMESTAMPTZ DEFAULT NOW()
    );

    -- Orders table (separate from users, with foreign key)
    CREATE TABLE orders (
        id SERIAL PRIMARY KEY,
        dynamo_sk TEXT,                     -- Preserve original sort key
        user_id INT4 NOT NULL REFERENCES users(id) ON DELETE CASCADE,
        total NUMERIC(12,2) NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending'
            CHECK (status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')),
        items JSONB NOT NULL DEFAULT '[]',
        shipping_address JSONB,
        created_at TIMESTAMPTZ DEFAULT NOW(),
        updated_at TIMESTAMPTZ DEFAULT NOW()
    );

    CREATE INDEX idx_orders_user_id ON orders(user_id);
    CREATE INDEX idx_orders_status ON orders(status);
    CREATE INDEX idx_orders_created ON orders(created_at);

    -- Products table
    CREATE TABLE products (
        id SERIAL PRIMARY KEY,
        dynamo_pk TEXT UNIQUE,
        name TEXT NOT NULL,
        description TEXT,
        price NUMERIC(10,2) NOT NULL,
        category TEXT,
        attributes JSONB DEFAULT '{}',
        embedding VECTOR(1536),             -- For vector similarity search
        created_at TIMESTAMPTZ DEFAULT NOW()
    );

    CREATE INDEX idx_products_category ON products(category);
    CREATE INDEX idx_products_price ON products(price);
    CREATE INDEX idx_products_attrs ON products USING gin(attributes);

    -- Reviews table (was embedded in DynamoDB single-table)
    CREATE TABLE reviews (
        id SERIAL PRIMARY KEY,
        product_id INT4 NOT NULL REFERENCES products(id) ON DELETE CASCADE,
        user_id INT4 NOT NULL REFERENCES users(id),
        rating INT2 NOT NULL CHECK (rating BETWEEN 1 AND 5),
        title TEXT,
        body TEXT,
        created_at TIMESTAMPTZ DEFAULT NOW()
    );

    CREATE INDEX idx_reviews_product ON reviews(product_id);
    CREATE INDEX idx_reviews_user ON reviews(user_id);

    -- Full-text search index on reviews
    CREATE INDEX idx_reviews_fts ON reviews
        USING gin(to_tsvector('english', COALESCE(title, '') || ' ' || COALESCE(body, '')));

Step 3: Transform and Import Data
    #!/usr/bin/env python3
    """Import DynamoDB JSON exports into HeliosDB-Lite normalized schema.

    Expects plain JSON items as written by the scan export in Step 1,
    not the typed DYNAMODB_JSON produced by Export to S3.
    """

    import json
    from typing import Dict

    import psycopg2
    import psycopg2.extras
    from psycopg2 import sql


    def connect():
        return psycopg2.connect(
            host="127.0.0.1", port=5432, user="app", dbname="myapp"
        )


    def import_single_table_design(conn, json_path: str):
        """
        Parse a DynamoDB single-table export and route items to the
        correct normalized tables based on PK/SK patterns.
        """
        with open(json_path, 'r') as f:
            items = json.load(f)

        cursor = conn.cursor()
        user_map = {}  # dynamo_pk -> heliosdb id

        # First pass: import users (PK=USER#*, SK=PROFILE)
        users = [item for item in items if item.get('SK', '').startswith('PROFILE')]
        for item in users:
            pk = item['PK']
            cursor.execute(
                """INSERT INTO users (dynamo_pk, name, email, status, profile)
                   VALUES (%s, %s, %s, %s, %s::jsonb) RETURNING id""",
                (
                    pk,
                    item.get('name', ''),
                    item.get('email', ''),
                    item.get('status', 'active'),
                    json.dumps({k: v for k, v in item.items()
                                if k not in ('PK', 'SK', 'name', 'email', 'status')}),
                )
            )
            user_map[pk] = cursor.fetchone()[0]
        conn.commit()
        print(f"  Imported {len(users)} users")

        # Second pass: import products (PK=PRODUCT#*, SK=METADATA)
        product_map = {}
        products = [item for item in items if item.get('SK', '').startswith('METADATA')]
        for item in products:
            pk = item['PK']
            cursor.execute(
                """INSERT INTO products (dynamo_pk, name, description, price, category, attributes)
                   VALUES (%s, %s, %s, %s, %s, %s::jsonb) RETURNING id""",
                (
                    pk,
                    item.get('name', ''),
                    item.get('description', ''),
                    item.get('price', 0),
                    item.get('category', ''),
                    json.dumps({k: v for k, v in item.items()
                                if k not in ('PK', 'SK', 'name', 'description', 'price', 'category')}),
                )
            )
            product_map[pk] = cursor.fetchone()[0]
        conn.commit()
        print(f"  Imported {len(products)} products")

        # Third pass: import orders (PK=USER#*, SK=ORDER#*)
        orders = [item for item in items if item.get('SK', '').startswith('ORDER#')]
        for item in orders:
            user_id = user_map.get(item['PK'])
            if user_id:  # Orders without a matching user are skipped
                cursor.execute(
                    """INSERT INTO orders (dynamo_sk, user_id, total, status, items)
                       VALUES (%s, %s, %s, %s, %s::jsonb)""",
                    (
                        item['SK'],
                        user_id,
                        item.get('total', 0),
                        item.get('status', 'pending'),
                        json.dumps(item.get('items', [])),
                    )
                )
        conn.commit()
        print(f"  Imported {len(orders)} orders")

        # Fourth pass: import reviews (PK=PRODUCT#*, SK=REVIEW#*)
        reviews = [item for item in items if item.get('SK', '').startswith('REVIEW#')]
        for item in reviews:
            product_id = product_map.get(item['PK'])
            user_pk = item.get('user_pk', '')
            user_id = user_map.get(user_pk)
            if product_id and user_id:  # Both parents must exist
                cursor.execute(
                    """INSERT INTO reviews (product_id, user_id, rating, title, body)
                       VALUES (%s, %s, %s, %s, %s)""",
                    (
                        product_id,
                        user_id,
                        item.get('rating', 3),
                        item.get('title', ''),
                        item.get('text', ''),
                    )
                )
        conn.commit()
        print(f"  Imported {len(reviews)} reviews")


    def import_separate_tables(conn, exports: Dict[str, str]):
        """Import from per-table DynamoDB exports (non-single-table design)."""
        cursor = conn.cursor()

        for table_name, json_path in exports.items():
            with open(json_path, 'r') as f:
                items = json.load(f)

            print(f"  Importing {table_name}: {len(items)} items")

            for item in items:
                columns = list(item.keys())

                # Serialize dicts/lists as JSONB
                serialized_values = [
                    json.dumps(v) if isinstance(v, (dict, list)) else v
                    for v in item.values()
                ]

                # Compose identifiers with psycopg2.sql instead of string
                # formatting so attribute names cannot inject SQL. Quoted
                # identifiers are case-sensitive and must match the schema.
                query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
                    sql.Identifier(table_name),
                    sql.SQL(', ').join(map(sql.Identifier, columns)),
                    sql.SQL(', ').join(sql.Placeholder() * len(columns)),
                )
                cursor.execute(query, serialized_values)

            conn.commit()
            print(f"  Done ({len(items)} rows)")


    if __name__ == "__main__":
        conn = connect()
        import_single_table_design(conn, 'dynamodb_export.json')
        conn.close()
        print("\nMigration complete.")

Step 4: Update Application Code
Replace AWS SDK DynamoDB calls with standard SQL queries. See the Query Translation Examples section below for detailed mappings.
Step 5: Validate
    -- Verify row counts
    SELECT 'users' AS entity, COUNT(*) AS count FROM users
    UNION ALL SELECT 'orders', COUNT(*) FROM orders
    UNION ALL SELECT 'products', COUNT(*) FROM products
    UNION ALL SELECT 'reviews', COUNT(*) FROM reviews;

    -- Verify referential integrity (DynamoDB had none)
    SELECT o.id AS orphaned_order
    FROM orders o
    LEFT JOIN users u ON o.user_id = u.id
    WHERE u.id IS NULL;

    -- Test a query that was impossible in DynamoDB
    -- (grouping by u.id keeps users who share a name separate)
    SELECT u.name,
           COUNT(o.id) AS order_count,
           SUM(o.total) AS lifetime_value,
           AVG(o.total) AS avg_order,
           MAX(o.created_at) AS last_order
    FROM users u
    JOIN orders o ON u.id = o.user_id
    WHERE o.status IN ('shipped', 'delivered')
    GROUP BY u.id, u.name
    HAVING SUM(o.total) > 500
    ORDER BY lifetime_value DESC
    LIMIT 20;

Data Type Mapping
| DynamoDB Type | Type Code | HeliosDB-Lite Type | Notes |
|---|---|---|---|
| String | S | TEXT | Direct mapping |
| Number | N | INT4, INT8, NUMERIC, or FLOAT8 | Choose based on data range and precision |
| Binary | B | BYTEA | Base64-decoded |
| Boolean | BOOL | BOOLEAN | Direct mapping |
| Null | NULL | SQL NULL | Direct mapping |
| List | L | JSONB array | '[1, "two", true]'::jsonb |
| Map | M | JSONB | '{"key": "value"}'::jsonb |
| String Set | SS | TEXT[] | ARRAY['a', 'b', 'c'] |
| Number Set | NS | INT4[] or FLOAT8[] | ARRAY[1, 2, 3] |
| Binary Set | BS | BYTEA[] | Array of binary values |
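The type codes in this table appear literally in DYNAMODB_JSON exports, where every attribute is wrapped as `{"<code>": <value>}` and each exported line holds one `{"Item": {...}}` object. A minimal sketch of unwrapping those values into Python types that a PostgreSQL driver can bind is shown below. The function names are hypothetical, and sets are returned as sorted lists as one reasonable choice for array columns.

```python
import base64
import json
from decimal import Decimal


def unwrap(attr: dict):
    """Convert one typed DynamoDB attribute into a plain Python value."""
    (type_code, value), = attr.items()  # exactly one type code per attribute
    if type_code == "S":
        return value
    if type_code == "N":
        return Decimal(value)            # numbers arrive as strings
    if type_code == "B":
        return base64.b64decode(value)   # binary arrives base64-encoded
    if type_code == "BOOL":
        return bool(value)
    if type_code == "NULL":
        return None
    if type_code == "L":
        return [unwrap(v) for v in value]
    if type_code == "M":
        return {k: unwrap(v) for k, v in value.items()}
    if type_code == "SS":
        return sorted(value)
    if type_code == "NS":
        return sorted(Decimal(v) for v in value)
    if type_code == "BS":
        return sorted(base64.b64decode(v) for v in value)
    raise ValueError(f"unknown DynamoDB type code: {type_code}")


def unwrap_export_line(line: str) -> dict:
    """Parse one line of a DYNAMODB_JSON export into a flat dict."""
    record = json.loads(line)
    return {name: unwrap(attr) for name, attr in record["Item"].items()}


line = ('{"Item": {"PK": {"S": "USER#123"}, "age": {"N": "42"}, '
        '"tags": {"SS": ["b", "a"]}, "active": {"BOOL": true}, '
        '"address": {"M": {"city": {"S": "Oslo"}, "zip": {"NULL": true}}}}}')
print(unwrap_export_line(line))
```

Using `Decimal` for the N type preserves DynamoDB's numeric precision, so the decision between INT4, INT8, NUMERIC, and FLOAT8 can be made per column rather than forced by the parser.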
Partition Key / Sort Key Mapping
| DynamoDB Key Structure | HeliosDB-Lite Equivalent |
|---|---|
| Partition key only (hash) | PRIMARY KEY (id) |
| Partition key + sort key | PRIMARY KEY (partition_col, sort_col) |
| GSI (partition key only) | CREATE INDEX idx_name ON table(column) |
| GSI (partition + sort) | CREATE INDEX idx_name ON table(col1, col2) |
| LSI (same partition, different sort) | CREATE INDEX idx_name ON table(partition_col, alt_sort_col) |
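The key mapping above is mechanical enough to script. The sketch below takes a dictionary shaped like the `Table` section of a DynamoDB DescribeTable response (`TableName`, `AttributeDefinitions`, `KeySchema`, and optional `GlobalSecondaryIndexes` / `LocalSecondaryIndexes`) and emits starter DDL. It is illustrative only: it assumes simple unquoted identifiers, maps N to NUMERIC as a conservative default, and covers only key attributes, since DynamoDB does not declare the rest.

```python
TYPE_MAP = {"S": "TEXT", "N": "NUMERIC", "B": "BYTEA"}


def _key_columns(key_schema: list) -> list:
    """Return key attribute names with the HASH key first, then RANGE."""
    ordered = sorted(key_schema, key=lambda k: 0 if k["KeyType"] == "HASH" else 1)
    return [k["AttributeName"] for k in ordered]


def ddl_from_description(desc: dict) -> list:
    """Generate CREATE TABLE and CREATE INDEX statements for key attributes."""
    table = desc["TableName"].lower()
    pk_cols = _key_columns(desc["KeySchema"])

    col_defs = []
    for attr in desc["AttributeDefinitions"]:
        name = attr["AttributeName"]
        not_null = " NOT NULL" if name in pk_cols else ""
        col_defs.append(f"{name} {TYPE_MAP[attr['AttributeType']]}{not_null}")
    col_defs.append(f"PRIMARY KEY ({', '.join(pk_cols)})")

    statements = [f"CREATE TABLE {table} ({', '.join(col_defs)});"]

    # GSIs and LSIs both become ordinary secondary indexes
    indexes = (desc.get("GlobalSecondaryIndexes", [])
               + desc.get("LocalSecondaryIndexes", []))
    for index in indexes:
        cols = ", ".join(_key_columns(index["KeySchema"]))
        name = f"idx_{table}_{index['IndexName'].lower()}"
        statements.append(f"CREATE INDEX {name} ON {table} ({cols});")
    return statements


description = {
    "TableName": "Orders",
    "AttributeDefinitions": [
        {"AttributeName": "customer_id", "AttributeType": "S"},
        {"AttributeName": "order_date", "AttributeType": "S"},
        {"AttributeName": "status", "AttributeType": "S"},
    ],
    "KeySchema": [
        {"AttributeName": "customer_id", "KeyType": "HASH"},
        {"AttributeName": "order_date", "KeyType": "RANGE"},
    ],
    "GlobalSecondaryIndexes": [
        {"IndexName": "StatusIndex", "KeySchema": [
            {"AttributeName": "status", "KeyType": "HASH"},
            {"AttributeName": "order_date", "KeyType": "RANGE"},
        ]},
    ],
}
for statement in ddl_from_description(description):
    print(statement)
```

The output is a starting point: non-key attributes still need to be added by hand as typed columns or folded into a JSONB column.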
Query Translation Examples
GetItem -> SELECT WHERE
    # DynamoDB
    response = table.get_item(Key={'user_id': '123'})
    item = response.get('Item')

    # HeliosDB-Lite
    cursor.execute("SELECT * FROM users WHERE id = %s", (123,))
    row = cursor.fetchone()

PutItem -> INSERT / UPSERT
    # DynamoDB
    table.put_item(Item={
        'user_id': '123',
        'name': 'Alice',
        'email': 'alice@example.com',
        'status': 'active'
    })

    # HeliosDB-Lite (INSERT)
    cursor.execute(
        "INSERT INTO users (id, name, email, status) VALUES (%s, %s, %s, %s)",
        (123, 'Alice', 'alice@example.com', 'active')
    )

    # HeliosDB-Lite (UPSERT -- equivalent to PutItem which overwrites)
    cursor.execute("""
        INSERT INTO users (id, name, email, status)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET
            name = EXCLUDED.name,
            email = EXCLUDED.email,
            status = EXCLUDED.status
    """, (123, 'Alice', 'alice@example.com', 'active'))

Query (partition key + sort key) -> SELECT WHERE
    # DynamoDB: Get all orders for a user, sorted by sort key
    response = table.query(
        KeyConditionExpression=Key('PK').eq('USER#123') & Key('SK').begins_with('ORDER#'),
        ScanIndexForward=False  # Descending
    )

    # HeliosDB-Lite: Plain indexed lookup (no single-table design needed)
    cursor.execute("""
        SELECT o.id, o.total, o.status, o.created_at
        FROM orders o
        WHERE o.user_id = %s
        ORDER BY o.created_at DESC
    """, (123,))

Query with FilterExpression -> SELECT WHERE
    # DynamoDB: Filter is applied AFTER items are read, so the filtered-out
    # items still consume read capacity (inefficient)
    response = table.query(
        KeyConditionExpression=Key('PK').eq('USER#123') & Key('SK').begins_with('ORDER#'),
        FilterExpression=Attr('status').eq('shipped') & Attr('total').gte(50)
    )

    # HeliosDB-Lite: WHERE conditions are part of the query plan and can use indexes
    cursor.execute("""
        SELECT * FROM orders
        WHERE user_id = %s AND status = 'shipped' AND total >= 50
        ORDER BY created_at DESC
    """, (123,))

Scan -> SELECT (full table scan)
    # DynamoDB: Expensive, paginated, 1MB limit per call
    items = []
    scan_kwargs = {'FilterExpression': Attr('status').eq('active')}
    while True:
        response = table.scan(**scan_kwargs)
        items.extend(response['Items'])
        if 'LastEvaluatedKey' not in response:
            break
        scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

    # HeliosDB-Lite: Single query, no pagination needed
    cursor.execute("SELECT * FROM users WHERE status = 'active'")
    items = cursor.fetchall()

UpdateItem with UpdateExpression -> UPDATE SET
    # DynamoDB
    table.update_item(
        Key={'user_id': '123'},
        UpdateExpression='SET #n = :name, login_count = login_count + :inc, updated_at = :now',
        ExpressionAttributeNames={'#n': 'name'},
        ExpressionAttributeValues={
            ':name': 'Alice Smith',
            ':inc': 1,
            ':now': '2026-01-15T10:00:00Z'
        }
    )

    # HeliosDB-Lite (dramatically simpler)
    cursor.execute("""
        UPDATE users
        SET name = %s, login_count = login_count + 1, updated_at = NOW()
        WHERE id = %s
    """, ('Alice Smith', 123))

DeleteItem with ConditionExpression -> DELETE WHERE
    # DynamoDB (conditional delete)
    table.delete_item(
        Key={'user_id': '123'},
        ConditionExpression=Attr('status').eq('inactive')
    )

    # HeliosDB-Lite
    cursor.execute(
        "DELETE FROM users WHERE id = %s AND status = 'inactive'",
        (123,)
    )
    # Check cursor.rowcount to see if a row was actually deleted

BatchGetItem -> SELECT WHERE IN
    # DynamoDB: BatchGetItem (max 100 items, 16MB limit)
    response = dynamodb.batch_get_item(
        RequestItems={
            'Users': {
                'Keys': [
                    {'user_id': {'S': '1'}},
                    {'user_id': {'S': '2'}},
                    {'user_id': {'S': '3'}}
                ]
            }
        }
    )

    # HeliosDB-Lite: Single query, no limits
    cursor.execute("SELECT * FROM users WHERE id = ANY(%s)", ([1, 2, 3],))

TransactWriteItems -> SQL Transaction
    # DynamoDB (100-action limit, 4MB limit, complex API)
    dynamodb.transact_write_items(
        TransactItems=[
            {'Put': {'TableName': 'Orders',
                     'Item': {'order_id': {'S': 'ORD-001'}, 'total': {'N': '99.99'}}}},
            {'Update': {'TableName': 'Users',
                        'Key': {'user_id': {'S': '123'}},
                        'UpdateExpression': 'SET order_count = order_count + :inc',
                        'ExpressionAttributeValues': {':inc': {'N': '1'}}}},
            {'Update': {'TableName': 'Inventory',
                        'Key': {'product_id': {'S': '456'}},
                        'UpdateExpression': 'SET stock = stock - :qty',
                        'ExpressionAttributeValues': {':qty': {'N': '2'}},
                        'ConditionExpression': 'stock >= :qty'}}
        ]
    )

    # HeliosDB-Lite (standard SQL, no limits, much simpler)
    # psycopg2 opens a transaction implicitly on the first statement,
    # so no explicit BEGIN is needed.
    cursor.execute(
        "INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id",
        (123, 99.99)
    )
    order_id = cursor.fetchone()[0]
    cursor.execute(
        "UPDATE users SET order_count = order_count + 1 WHERE id = %s",
        (123,)
    )
    cursor.execute(
        "UPDATE inventory SET stock = stock - 2 WHERE product_id = %s AND stock >= 2",
        (456,)
    )
    if cursor.rowcount == 0:
        # No row matched the stock condition: undo the order and the counter
        conn.rollback()
        raise Exception("Insufficient inventory")
    conn.commit()

Common Patterns
Pattern 1: Python Application Migration

    """DynamoDB to HeliosDB-Lite migration -- Python application layer.
    Replace boto3 DynamoDB calls with psycopg2 SQL queries."""

    import json
    from typing import Optional

    import psycopg2
    import psycopg2.extras


    class ProductCatalog:
        """
        Replaces a DynamoDB-backed product catalog.

        DynamoDB limitations this migration eliminates:
        - No JOINs (required denormalized data)
        - No aggregations (required scan + application code)
        - No full-text search (required OpenSearch)
        - No vector search (required separate service)
        - 400KB item size limit
        - 1MB query result limit
        """

        def __init__(self, dsn: str = "postgresql://app@127.0.0.1:5432/myapp"):
            self.conn = psycopg2.connect(dsn)

        def get_product(self, product_id: int) -> dict:
            """Replaces: table.get_item(Key={'product_id': product_id})"""
            cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,))
            return cursor.fetchone()

        def search_products(self, query: str, category: Optional[str] = None,
                            limit: int = 20) -> list:
            """
            Replaces: OpenSearch cluster ($500+/month) that was required
            because DynamoDB cannot do full-text search.

            Note: to_tsquery expects tsquery syntax (for example
            'red & widget'), so convert free-form user input first.
            """
            cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            sql = """
                SELECT id, name, description, price, category,
                       ts_rank(to_tsvector('english', name || ' ' || COALESCE(description, '')),
                               to_tsquery('english', %s)) AS relevance
                FROM products
                WHERE to_tsvector('english', name || ' ' || COALESCE(description, ''))
                      @@ to_tsquery('english', %s)
            """
            params = [query, query]

            if category:
                sql += " AND category = %s"
                params.append(category)

            sql += " ORDER BY relevance DESC LIMIT %s"
            params.append(limit)

            cursor.execute(sql, params)
            return cursor.fetchall()

        def find_similar(self, product_id: int, limit: int = 5) -> list:
            """
            Replaces: Separate vector database service that was required
            because DynamoDB has no vector search capability.
            """
            cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            cursor.execute("""
                SELECT p2.id, p2.name, p2.price, p2.category,
                       p2.embedding <-> p1.embedding AS distance
                FROM products p1, products p2
                WHERE p1.id = %s AND p2.id != %s
                ORDER BY p2.embedding <-> p1.embedding
                LIMIT %s
            """, (product_id, product_id, limit))
            return cursor.fetchall()

        def get_category_stats(self) -> list:
            """
            Replaces: DynamoDB scan + application-layer aggregation.
            In DynamoDB, this required scanning the entire table and
            computing aggregations in Lambda/application code.
            """
            cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            cursor.execute("""
                SELECT category,
                       COUNT(*) AS product_count,
                       AVG(price) AS avg_price,
                       MIN(price) AS min_price,
                       MAX(price) AS max_price,
                       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price) AS median_price
                FROM products
                GROUP BY category
                ORDER BY product_count DESC
            """)
            return cursor.fetchall()

        def get_top_rated_by_category(self, min_reviews: int = 5) -> list:
            """
            Replaces: Client-side joins between products and reviews
            (impossible in DynamoDB without single-table design tricks,
            and even then very limited).
            """
            cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            cursor.execute("""
                WITH product_ratings AS (
                    SELECT p.id, p.name, p.category, p.price,
                           AVG(r.rating) AS avg_rating,
                           COUNT(r.id) AS review_count,
                           RANK() OVER (PARTITION BY p.category
                                        ORDER BY AVG(r.rating) DESC) AS category_rank
                    FROM products p
                    JOIN reviews r ON p.id = r.product_id
                    GROUP BY p.id, p.name, p.category, p.price
                    HAVING COUNT(r.id) >= %s
                )
                SELECT * FROM product_ratings
                WHERE category_rank <= 3
                ORDER BY category, category_rank
            """, (min_reviews,))
            return cursor.fetchall()

Pattern 2: TypeScript Application Migration
    import { Pool } from 'pg';

    const pool = new Pool({
      host: '127.0.0.1',
      port: 5432,
      database: 'myapp',
      user: 'app',
    });

    // Replaces: DynamoDB GetItem
    async function getUser(userId: number) {
      const result = await pool.query('SELECT * FROM users WHERE id = $1', [userId]);
      return result.rows[0] || null;
    }

    // Replaces: DynamoDB PutItem
    async function createUser(name: string, email: string, profile: object = {}) {
      const result = await pool.query(
        `INSERT INTO users (name, email, profile) VALUES ($1, $2, $3::jsonb) RETURNING id`,
        [name, email, JSON.stringify(profile)]
      );
      return result.rows[0].id;
    }

    // Replaces: DynamoDB Query (pk + sk condition) plus client-side joins.
    // Note: orders whose items array is empty are omitted by this join.
    async function getUserOrderHistory(userId: number) {
      const result = await pool.query(`
        SELECT o.id AS order_id, o.total, o.status, o.created_at,
               jsonb_agg(
                 jsonb_build_object(
                   'product_id', item->>'product_id',
                   'name', item->>'name',
                   'quantity', item->>'quantity',
                   'price', item->>'price'
                 )
               ) AS items
        FROM orders o, jsonb_array_elements(o.items) AS item
        WHERE o.user_id = $1
        GROUP BY o.id
        ORDER BY o.created_at DESC
      `, [userId]);
      return result.rows;
    }

    // Replaces: DynamoDB Scan + application-layer aggregation + separate analytics pipeline
    async function getDashboardMetrics(days: number = 30) {
      const result = await pool.query(`
        SELECT COUNT(DISTINCT o.user_id) AS active_customers,
               COUNT(o.id) AS total_orders,
               SUM(o.total) AS total_revenue,
               AVG(o.total) AS avg_order_value,
               COUNT(CASE WHEN o.status = 'cancelled' THEN 1 END)::FLOAT
                 / NULLIF(COUNT(o.id), 0) AS cancellation_rate
        FROM orders o
        WHERE o.created_at >= NOW() - ($1 || ' days')::INTERVAL
      `, [days]);
      return result.rows[0];
    }

    // Replaces: DynamoDB TransactWriteItems (100-action limit, 4MB limit)
    async function placeOrder(
      userId: number,
      items: Array<{productId: number, quantity: number, price: number}>
    ) {
      const client = await pool.connect();
      try {
        await client.query('BEGIN');

        // Insert order
        const total = items.reduce((sum, i) => sum + i.quantity * i.price, 0);
        const orderResult = await client.query(
          `INSERT INTO orders (user_id, total, status, items)
           VALUES ($1, $2, 'pending', $3::jsonb) RETURNING id`,
          [userId, total, JSON.stringify(items)]
        );
        const orderId = orderResult.rows[0].id;

        // Update inventory for each item (DynamoDB required individual ConditionExpressions)
        for (const item of items) {
          const invResult = await client.query(
            `UPDATE inventory SET stock = stock - $1
             WHERE product_id = $2 AND stock >= $1 RETURNING stock`,
            [item.quantity, item.productId]
          );
          if (invResult.rowCount === 0) {
            throw new Error(`Insufficient stock for product ${item.productId}`);
          }
        }

        // Update user order count
        await client.query(
          'UPDATE users SET order_count = order_count + 1 WHERE id = $1',
          [userId]
        );

        await client.query('COMMIT');
        return orderId;
      } catch (err) {
        // Any failure undoes the order, inventory, and counter changes together
        await client.query('ROLLBACK');
        throw err;
      } finally {
        client.release();
      }
    }

    // HeliosDB-Lite exclusive: time-travel for audit/compliance
    async function getOrderStateAtTime(orderId: number, timestamp: string) {
      const result = await pool.query(
        `SELECT * FROM orders AS OF $1 WHERE id = $2`,
        [timestamp, orderId]
      );
      return result.rows[0] || null;
    }

Pattern 3: Go Application Migration
package main
import ( "context" "encoding/json" "fmt" "log" "time"
"github.com/jackc/pgx/v5/pgxpool")
type OrderService struct { pool *pgxpool.Pool}
func NewOrderService(ctx context.Context) (*OrderService, error) { pool, err := pgxpool.New(ctx, "postgresql://app@127.0.0.1:5432/myapp") if err != nil { return nil, err } return &OrderService{pool: pool}, nil}
// Replaces: DynamoDB GetItemfunc (s *OrderService) GetOrder(ctx context.Context, orderID int) (map[string]any, error) { var id int var userID int var total float64 var status string var itemsJSON []byte var createdAt time.Time
err := s.pool.QueryRow(ctx, "SELECT id, user_id, total, status, items, created_at FROM orders WHERE id = $1", orderID, ).Scan(&id, &userID, &total, &status, &itemsJSON, &createdAt) if err != nil { return nil, err }
var items []any json.Unmarshal(itemsJSON, &items)
return map[string]any{ "id": id, "user_id": userID, "total": total, "status": status, "items": items, "created_at": createdAt, }, nil}
// Replaces: DynamoDB Query (impossible to do JOINs)func (s *OrderService) GetOrdersWithUserInfo(ctx context.Context, status string, limit int) ([]map[string]any, error) { rows, err := s.pool.Query(ctx, ` SELECT o.id AS order_id, o.total, o.status, o.created_at, u.name AS customer_name, u.email AS customer_email FROM orders o JOIN users u ON o.user_id = u.id WHERE o.status = $1 ORDER BY o.created_at DESC LIMIT $2 `, status, limit) if err != nil { return nil, err } defer rows.Close()
var results []map[string]any for rows.Next() { var orderID int var total float64 var orderStatus, customerName, customerEmail string var createdAt time.Time rows.Scan(&orderID, &total, &orderStatus, &createdAt, &customerName, &customerEmail) results = append(results, map[string]any{ "order_id": orderID, "total": total, "status": orderStatus, "created_at": createdAt, "customer_name": customerName, "customer_email": customerEmail, }) } return results, nil}
// Replaces: DynamoDB Scan + Lambda aggregation (was a separate analytics pipeline)
func (s *OrderService) GetRevenueByDay(ctx context.Context, days int) ([]map[string]any, error) {
    // The day count is concatenated into an interval string, so it is passed
    // as text; some pgx versions do not encode a Go int into a text parameter.
    rows, err := s.pool.Query(ctx, `
        SELECT
            DATE(created_at) AS order_date,
            COUNT(*) AS order_count,
            SUM(total) AS revenue,
            AVG(total) AS avg_order_value,
            COUNT(DISTINCT user_id) AS unique_customers
        FROM orders
        WHERE created_at >= NOW() - ($1 || ' days')::INTERVAL
          AND status NOT IN ('cancelled')
        GROUP BY DATE(created_at)
        ORDER BY order_date DESC
    `, fmt.Sprint(days))
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var results []map[string]any
    for rows.Next() {
        var orderDate time.Time
        var orderCount, uniqueCustomers int
        var revenue, avgValue float64
        if err := rows.Scan(&orderDate, &orderCount, &revenue, &avgValue, &uniqueCustomers); err != nil {
            return nil, err
        }
        results = append(results, map[string]any{
            "date":             orderDate.Format("2006-01-02"),
            "order_count":      orderCount,
            "revenue":          revenue,
            "avg_order_value":  avgValue,
            "unique_customers": uniqueCustomers,
        })
    }
    // Report any error that ended the iteration early.
    if err := rows.Err(); err != nil {
        return nil, err
    }
    return results, nil
}
func main() {
    ctx := context.Background()
    svc, err := NewOrderService(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Query with no DynamoDB equivalent: CTE + aggregation + window function
    rows, err := svc.pool.Query(ctx, `
        WITH monthly_revenue AS (
            SELECT
                DATE_TRUNC('month', o.created_at) AS month,
                SUM(o.total) AS revenue,
                COUNT(DISTINCT o.user_id) AS customers
            FROM orders o
            WHERE o.status = 'delivered'
            GROUP BY DATE_TRUNC('month', o.created_at)
        )
        SELECT
            month,
            revenue,
            customers,
            LAG(revenue) OVER (ORDER BY month) AS prev_month_revenue,
            ROUND((revenue - LAG(revenue) OVER (ORDER BY month))
                / NULLIF(LAG(revenue) OVER (ORDER BY month), 0) * 100, 1) AS growth_pct
        FROM monthly_revenue
        ORDER BY month DESC
        LIMIT 12
    `)
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    fmt.Println("Monthly Revenue Report (impossible in DynamoDB):")
    fmt.Printf("%-12s %12s %10s %12s %8s\n", "Month", "Revenue", "Customers", "Prev Month", "Growth")
    for rows.Next() {
        var month time.Time
        // revenue is scanned as a plain float64, assuming orders.total is never NULL;
        // the LAG-based columns are NULL for the earliest month, so they stay pointers.
        var revenue float64
        var prevRevenue, growthPct *float64
        var customers int
        if err := rows.Scan(&month, &revenue, &customers, &prevRevenue, &growthPct); err != nil {
            log.Fatal(err)
        }
        fmt.Printf("%-12s $%11.2f %10d", month.Format("2006-01"), revenue, customers)
        if prevRevenue != nil {
            fmt.Printf(" $%11.2f", *prevRevenue)
        } else {
            fmt.Printf(" %12s", "N/A")
        }
        if growthPct != nil {
            fmt.Printf(" %7.1f%%", *growthPct)
        } else {
            fmt.Printf(" %8s", "N/A")
        }
        fmt.Println()
    }
    if err := rows.Err(); err != nil {
        log.Fatal(err)
    }
}

Pattern 4: Replacing DynamoDB Streams with Triggers
-- DynamoDB required: DynamoDB Streams + Lambda function (eventually consistent)
-- HeliosDB-Lite: Synchronous, transactional trigger

-- Audit log trigger (replaces Streams + Lambda audit function)
CREATE TABLE audit_log (
    id SERIAL PRIMARY KEY,
    table_name TEXT NOT NULL,
    operation TEXT NOT NULL,  -- INSERT, UPDATE, DELETE
    row_id INT4,
    old_data JSONB,
    new_data JSONB,
    changed_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE OR REPLACE FUNCTION audit_trigger_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        INSERT INTO audit_log (table_name, operation, row_id, new_data)
        VALUES (TG_TABLE_NAME, 'INSERT', NEW.id, row_to_json(NEW)::jsonb);
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO audit_log (table_name, operation, row_id, old_data, new_data)
        VALUES (TG_TABLE_NAME, 'UPDATE', NEW.id, row_to_json(OLD)::jsonb, row_to_json(NEW)::jsonb);
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        INSERT INTO audit_log (table_name, operation, row_id, old_data)
        VALUES (TG_TABLE_NAME, 'DELETE', OLD.id, row_to_json(OLD)::jsonb);
        RETURN OLD;
    END IF;
END;
$$;

-- Apply to orders table
CREATE TRIGGER orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();

-- Apply to users table
CREATE TRIGGER users_audit
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();

Pattern 5: Replacing DynamoDB TTL
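DynamoDB TTL stores the expiry as a Unix epoch timestamp in seconds inside a numeric attribute, while the options in this pattern filter on a `TIMESTAMPTZ` column named `expires_at`. An import script therefore has to convert one into the other. The following is a minimal Python sketch of that conversion. The function name `ttl_to_expires_at` and the choice to map items without a TTL to `None` are assumptions made for illustration, not part of any HeliosDB-Lite or AWS tooling.

```python
from datetime import datetime, timezone
from decimal import Decimal
from typing import Optional, Union


def ttl_to_expires_at(ttl: Optional[Union[int, float, Decimal]]) -> Optional[datetime]:
    """Convert a DynamoDB TTL attribute (epoch seconds) to a UTC datetime.

    boto3 returns DynamoDB numbers as Decimal, so that type is accepted too.
    Items without a TTL attribute map to None (hypothetical convention).
    """
    if ttl is None:
        return None
    # int() drops any fractional part; TTL values are whole seconds.
    return datetime.fromtimestamp(int(ttl), tz=timezone.utc)
```

The timezone-aware result can be passed as the `expires_at` parameter of an `INSERT` issued through psycopg2, which adapts it to a timestamp with time zone.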
-- DynamoDB TTL: Set a numeric attribute (epoch seconds); DynamoDB deletes expired items
-- in the background, typically within a few days of expiry, not at a precise time

-- HeliosDB-Lite Option A: Scheduled cleanup with precise timing
DELETE FROM sessions WHERE expires_at < NOW();

-- HeliosDB-Lite Option B: Use partitioning for efficient bulk deletion
CREATE TABLE sessions (
    id SERIAL,
    user_id INT4 NOT NULL,
    token TEXT NOT NULL,
    data JSONB,
    expires_at TIMESTAMPTZ NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
) PARTITION BY RANGE (expires_at);

-- Create monthly partitions
CREATE TABLE sessions_2026_01 PARTITION OF sessions
    FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE sessions_2026_02 PARTITION OF sessions
    FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');

-- Drop an expired partition from an earlier month (instant, no per-row delete)
DROP TABLE sessions_2025_12;

Limitations & Workarounds
DynamoDB Features Not Directly Supported
| DynamoDB Feature | Status | Workaround |
|---|---|---|
| Auto-scaling (read/write capacity) | Not applicable | Embedded database uses application server resources; scale the server |
| Global Tables (multi-region) | Different architecture | Use HeliosDB-Lite replication (WAL streaming) or deploy per-region |
| DynamoDB Accelerator (DAX) | Not applicable | HeliosDB-Lite runs in-process, so reads involve no network round trip; use the built-in query cache |
| PartiQL (limited SQL) | Full SQL | HeliosDB-Lite supports complete SQL (PartiQL queries migrate trivially) |
| On-demand capacity | Not applicable | No capacity planning needed; embedded database has no artificial throughput limits |
| Point-in-time recovery (PITR) | Built-in (better) | HeliosDB-Lite time-travel queries (AS OF) provide row-level point-in-time access |
| AWS IAM authentication | Different auth model | HeliosDB-Lite uses SCRAM-SHA-256 or trust authentication |
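| VPC endpoints | Not applicable | In embedded mode the database has no network exposure; if the PostgreSQL wire protocol listener is enabled, restrict it to localhost or a private interface |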
| DynamoDB Streams ordering | Different guarantee | Triggers execute synchronously within the transaction (stronger guarantee) |
| Item-level encryption | Different approach | HeliosDB-Lite TDE encrypts entire database at rest (AES-256-GCM) |
| 400KB item limit | No limit | No per-row size restriction |
| Eventual consistency reads | Not applicable | All reads are strongly consistent |
When DynamoDB Might Still Be Better
- Massive write throughput (100K+ writes/second): DynamoDB’s distributed architecture scales horizontally. For extreme write volumes, consider HeliosDB-Lite’s sharding or multi-primary replication.
- Multi-region active-active: DynamoDB Global Tables provide turn-key multi-region replication. HeliosDB-Lite supports replication but requires more configuration.
- Serverless Lambda integration: If your entire stack is AWS Lambda + DynamoDB and you have no desire to manage infrastructure, the serverless model may still make sense (at higher cost).
Rollback Strategy
Pre-Migration
- Keep DynamoDB tables active for at least two weeks after cutover, ideally four
- Enable DynamoDB PITR on all tables as a safety net
- Tag DynamoDB resources as “migration-pending” (do not delete)
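The PITR and tagging steps above can be scripted. The sketch below is one possible way to do it, assuming a client that behaves like `boto3.client("dynamodb")`. The function name `protect_tables_before_cutover` and the tag key `status` are hypothetical choices for illustration. The client is passed in as a parameter so the function can be exercised against a stub before it touches real tables.

```python
from typing import Any, Iterable, List


def protect_tables_before_cutover(
    client: Any,
    table_names: Iterable[str],
    tag_value: str = "migration-pending",
) -> List[str]:
    """Enable PITR and tag each table; return the ARNs that were tagged.

    `client` is expected to behave like boto3.client("dynamodb").
    The tag key "status" is an assumption; use your own tagging scheme.
    """
    tagged = []
    for name in table_names:
        # Turn on point-in-time recovery as a safety net.
        client.update_continuous_backups(
            TableName=name,
            PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
        )
        # Tagging operates on the table ARN, so look it up first.
        arn = client.describe_table(TableName=name)["Table"]["TableArn"]
        client.tag_resource(
            ResourceArn=arn,
            Tags=[{"Key": "status", "Value": tag_value}],
        )
        tagged.append(arn)
    return tagged
```

A typical call would be `protect_tables_before_cutover(boto3.client("dynamodb"), ["Orders"])`, run once before cutover with credentials that are allowed to update backups and tags.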
Dual-Write Approach
During the transition, write to both databases:
import json
from decimal import Decimal

import boto3
import psycopg2


class DualWriteService:
    def __init__(self):
        self.dynamodb = boto3.resource('dynamodb')
        self.ddb_table = self.dynamodb.Table('Orders')
        self.helios = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")

    def create_order(self, user_id: int, total: float, items: list) -> int:
        # Primary: HeliosDB-Lite
        with self.helios.cursor() as cursor:
            cursor.execute(
                """INSERT INTO orders (user_id, total, status, items)
                   VALUES (%s, %s, 'pending', %s::jsonb)
                   RETURNING id""",
                (user_id, total, json.dumps(items))
            )
            order_id = cursor.fetchone()[0]  # read the new id before committing
        self.helios.commit()

        # Secondary: DynamoDB (for rollback safety).
        # boto3 rejects Python floats, so numbers are sent as Decimal,
        # matching the type the rollback script writes.
        self.ddb_table.put_item(Item={
            'PK': f'USER#{user_id}',
            'SK': f'ORDER#{order_id}',
            'total': Decimal(str(total)),
            'status': 'pending',
            'items': json.loads(json.dumps(items), parse_float=Decimal),
            'helios_id': order_id,
        })

        return order_id

Rollback Procedure
- Revert application code to use AWS SDK DynamoDB calls
- Verify DynamoDB data is current (if dual-write was active)
- If dual-write was not active, export from HeliosDB-Lite and reimport:
import json
from decimal import Decimal

import boto3
import psycopg2
import psycopg2.extras


def rollback_to_dynamodb():
    helios = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('Orders')

    cursor = helios.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    cursor.execute("SELECT * FROM orders")

    # batch_writer buffers the puts and sends them to DynamoDB in batches
    with table.batch_writer() as batch:
        for row in cursor:
            batch.put_item(Item={
                'PK': f"USER#{row['user_id']}",
                'SK': f"ORDER#{row['id']}",
                'total': Decimal(str(row['total'])),
                'status': row['status'],
                # Round-trip through JSON so nested floats become Decimal,
                # which boto3 requires for DynamoDB numbers
                'items': json.loads(json.dumps(row['items']), parse_float=Decimal),
                'created_at': row['created_at'].isoformat(),
            })

    cursor.close()
    helios.close()
    print("Rollback to DynamoDB complete")


rollback_to_dynamodb()

- Re-enable DynamoDB auto-scaling and verify capacity is sufficient
- Post-mortem — document issues and plan remediation
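The "verify DynamoDB data is current" step in the procedure above can be approximated by comparing the two stores order by order. The sketch below works on rows and items that have already been fetched, so it needs no database connection. The function name `find_drift` is hypothetical, and it assumes the `ORDER#<id>` sort key format and the `total` and `status` attributes used in the dual-write example.

```python
from decimal import Decimal
from typing import Any, Dict, Iterable, List, Mapping


def find_drift(
    helios_rows: Iterable[Mapping[str, Any]],
    dynamo_items: Iterable[Mapping[str, Any]],
) -> Dict[str, List[int]]:
    """Report order ids that DynamoDB is missing or holds with different values.

    helios_rows: dicts with "id", "total", "status" (e.g. from a RealDictCursor).
    dynamo_items: items with "SK" like "ORDER#42", plus "total" and "status".
    """
    # Index the DynamoDB items by the numeric order id embedded in the sort key.
    dynamo_by_id = {}
    for item in dynamo_items:
        order_id = int(str(item["SK"]).split("#", 1)[1])
        dynamo_by_id[order_id] = item

    missing, mismatched = [], []
    for row in helios_rows:
        item = dynamo_by_id.get(row["id"])
        if item is None:
            missing.append(row["id"])
        # Compare totals as Decimal so "19.99", 19.99 and Decimal("19.99") agree.
        elif (Decimal(str(item["total"])) != Decimal(str(row["total"]))
              or item["status"] != row["status"]):
            mismatched.append(row["id"])
    return {"missing": sorted(missing), "mismatched": sorted(mismatched)}
```

An empty result in both lists suggests the DynamoDB copy is usable as a rollback target. Any ids that are reported can be re-exported from HeliosDB-Lite before traffic is switched back.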