Replace DynamoDB single-table designs with normalized SQL schemas. Eliminate RCU/WCU capacity planning, throttling, and auxiliary services like OpenSearch and Redshift.
Amazon DynamoDB is a fully managed NoSQL key-value and document database that delivers single-digit millisecond performance at any scale. While DynamoDB excels at high-throughput, low-latency access patterns with predictable key-based lookups, its rigid query model, opaque pricing, and severe analytical limitations push teams toward costly workarounds: maintaining separate Elasticsearch clusters for search, building ETL pipelines to Redshift for reporting, and implementing complex single-table designs to simulate relational queries.
HeliosDB-Lite offers a fundamentally different approach: a full SQL engine with ACID transactions, JOINs, aggregations, window functions, vector search, full-text search, and time-travel -- all in an embedded, zero-config deployment. By migrating from DynamoDB, organizations eliminate capacity planning (RCU/WCU), throttling anxiety, and the constellation of auxiliary services required to fill DynamoDB's feature gaps.
The migration path is: export DynamoDB tables (via Data Pipeline, S3 export, or scan), transform the items into SQL INSERT statements, and import through the PostgreSQL wire protocol. Applications replace AWS SDK calls with standard SQL queries via any PostgreSQL client library.
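The transform step is plain data reshaping. A minimal sketch of turning one exported item into a parameterized INSERT -- the table name and the JSONB-serialization policy here are illustrative assumptions, not a fixed schema:

```python
# Sketch of the DynamoDB-item -> SQL transform step. The target table
# and the "dicts/lists become JSONB text" policy are illustrative.
import json

def item_to_insert(table: str, item: dict) -> tuple:
    """Build a parameterized INSERT for one exported DynamoDB item.

    Dict/list attributes are serialized to JSON text so they can land
    in a JSONB column; scalar attributes pass through as bind params.
    """
    cols = sorted(item.keys())
    values = tuple(
        json.dumps(v) if isinstance(v, (dict, list)) else v
        for (_k, v) in sorted(item.items())
    )
    placeholders = ", ".join(["%s"] * len(cols))
    sql = f"INSERT INTO {table} ({', '.join(cols)}) VALUES ({placeholders})"
    return sql, values

sql, params = item_to_insert("users", {"id": "123", "name": "Alice",
                                       "profile": {"tier": "gold"}})
```

The returned `(sql, params)` pair feeds directly into any PostgreSQL client's `execute` call.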
DynamoDB is a key-value store with document capabilities. HeliosDB-Lite is a full relational database engine. The feature gap is significant:
| Capability | DynamoDB | HeliosDB-Lite | Impact |
|---|---|---|---|
| JOINs | Not supported | Full SQL JOINs (INNER, LEFT, RIGHT, FULL, CROSS, LATERAL) | Eliminate denormalization and data duplication |
| Aggregations (SUM, AVG, COUNT, MIN, MAX) | Not supported (scan + application code) | Native SQL aggregations | Remove application-layer computation |
| GROUP BY | Not supported | Full GROUP BY with HAVING | Inline analytics without ETL |
| Window functions | Not supported | ROW_NUMBER, RANK, LAG, LEAD, NTILE, etc. | Running totals, rankings, time-series analysis |
| CTEs | Not supported | WITH / WITH RECURSIVE | Simplify complex queries, traverse hierarchies |
| Subqueries | Not supported | Scalar, correlated, EXISTS, IN, ANY/ALL | Flexible filtering and computation |
| Full-text search | Not supported (requires OpenSearch) | Native @@ operator with ranking | Eliminate OpenSearch cluster ($500+/month) |
| Vector search | Not supported (requires separate service) | Native HNSW indexes with SIMD | Eliminate vector DB ($300+/month) |
| Complex WHERE clauses | Limited (partition key required, filter expressions) | Arbitrary SQL WHERE with AND/OR/NOT, BETWEEN, LIKE, IN | Query any column combination without index constraints |
| ACID transactions | Limited (100 items, 4MB, single-region) | Full multi-row, multi-table, no size limit | True transactional integrity |
| Time-travel | Not supported (requires DynamoDB Streams + Lambda) | Native AS OF queries | Point-in-time data access, built-in |
| Branching | Not supported | Database-level branching | Isolated dev/test environments |
| Secondary indexes | GSI (20 max), LSI (5 max, must be created at table creation) | Unlimited indexes on any column or expression | No index limits or creation-time constraints |
| Ad-hoc queries | Extremely limited (must use partition key) | Any SQL query on any column | True exploratory data access |
| Stored procedures | Not supported | PL/pgSQL with variables, loops, cursors, exceptions | Server-side business logic |
| Triggers | DynamoDB Streams + Lambda (eventually consistent) | BEFORE/AFTER triggers (synchronous, transactional) | Guaranteed consistency |
DynamoDB pricing is notoriously difficult to predict. Costs scale with read/write capacity units (RCU/WCU) and can spike dramatically with unexpected traffic patterns.
| Cost Component | On-Demand Mode | Provisioned Mode | Notes |
|---|---|---|---|
| Write requests (WCU) | $1.25 per million | $0.00065 per WCU/hour | 1 WCU = 1KB write/second |
| Read requests (RCU) | $0.25 per million | $0.00013 per RCU/hour | 1 RCU = 4KB strongly consistent read/second |
| Storage | $0.25/GB/month | $0.25/GB/month | First 25GB free |
| GSI writes | Same as base table | Same as base table | Each GSI doubles write costs |
| GSI storage | $0.25/GB/month | $0.25/GB/month | Each GSI adds storage costs |
| DynamoDB Streams | $0.02 per 100K reads | $0.02 per 100K reads | Required for triggers/CDC |
| Data export to S3 | $0.10/GB | $0.10/GB | For analytics pipeline |
| Backup (on-demand) | $0.10/GB/month | $0.10/GB/month | PITR: $0.20/GB/month |
| Component | DynamoDB (Provisioned + Auto-Scaling) | HeliosDB-Lite (Embedded) |
|---|---|---|
| Compute (WCU/RCU) | $2,850/month | $0 |
| Storage (50GB) | $12.50/month | $0 (disk on app server) |
| GSI overhead (3 GSIs) | $1,200/month | $0 |
| DynamoDB Streams | $150/month | $0 (triggers built-in) |
| OpenSearch (for search) | $650/month | $0 (FTS built-in) |
| Backup (PITR) | $10/month | $0 (file-level backup) |
| Data transfer | $200/month | $0 (in-process) |
| Application server | $300/month | $300/month |
| Monthly total | $5,372 | $300 |
| Annual total | $64,470 | $3,600 |
| Savings | -- | $60,870/year (94%) |
For on-demand pricing with variable traffic, DynamoDB costs can be 2-5x higher during peak periods. HeliosDB-Lite costs are fixed regardless of query volume.
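As a back-of-envelope check on the on-demand figures, the per-million rates from the pricing table compose into a simple cost model; the traffic volumes below are hypothetical:

```python
# Back-of-envelope on-demand cost model using the per-million rates
# from the pricing table above; the traffic volumes are hypothetical.
WRITE_PER_M = 1.25   # $ per million write request units
READ_PER_M = 0.25    # $ per million read request units
STORAGE_GB = 0.25    # $ per GB-month

def monthly_on_demand(writes_m: float, reads_m: float, gb: float,
                      gsi_count: int = 0) -> float:
    base_writes = writes_m * WRITE_PER_M
    # Every GSI replays each base-table write, multiplying write cost
    gsi_writes = gsi_count * base_writes
    return base_writes + reads_m * READ_PER_M + gb * STORAGE_GB + gsi_writes

# 200M writes, 800M reads, 50GB of storage, 3 GSIs
cost = monthly_on_demand(200, 800, 50, gsi_count=3)
```

Note how the GSI term dominates: with 3 GSIs, write costs quadruple before a single query is served.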
| Operational Concern | DynamoDB | HeliosDB-Lite |
|---|---|---|
| Capacity planning | Required (RCU/WCU provisioning or on-demand cost monitoring) | Not needed (embedded, uses app server resources) |
| Throttling | Yes (ProvisionedThroughputExceededException) | No (no artificial limits) |
| Hot partition management | Manual (choose good partition keys, add GSIs) | Not applicable (B-tree indexes, no partitioning constraints) |
| GSI backfill | Slow (can take hours for large tables) | CREATE INDEX completes in seconds/minutes |
| Item size limit | 400KB per item | No per-row limit |
| Transaction limit | 100 items, 4MB total | No limit |
| Query result limit | 1MB per query (requires pagination) | No artificial limit |
| Region availability | Must configure multi-region explicitly (Global Tables) | Embedded (runs wherever your app runs) |
| Vendor lock-in | Complete (AWS proprietary API) | None (standard SQL, PostgreSQL wire protocol) |
| DynamoDB Concept | HeliosDB-Lite Equivalent | Notes |
|---|---|---|
| Table | CREATE TABLE | Standard SQL DDL |
| Partition key | PRIMARY KEY (single column) | Any data type |
| Partition key + sort key | Composite PRIMARY KEY (pk, sk) | Ordered by sort key |
| Global Secondary Index (GSI) | CREATE INDEX on any column(s) | No limit on number of indexes |
| Local Secondary Index (LSI) | CREATE INDEX including primary key columns | Can be added any time |
| Item (document) | Row (typed columns and/or JSONB) | Schema-enforced |
| Attribute | Column | Typed (INT4, TEXT, JSONB, etc.) |
| String (S) | TEXT | Direct mapping |
| Number (N) | INT4, INT8, NUMERIC, or FLOAT8 | Choose appropriate type |
| Binary (B) | BYTEA | Binary data |
| Boolean (BOOL) | BOOLEAN | Direct mapping |
| Null (NULL) | SQL NULL | Direct mapping |
| List (L) | JSONB array or ARRAY type | Design choice |
| Map (M) | JSONB | Binary-indexed JSON |
| String Set (SS) | TEXT[] | PostgreSQL array |
| Number Set (NS) | INT4[] or FLOAT8[] | PostgreSQL array |
| Binary Set (BS) | BYTEA[] | PostgreSQL array |
| TTL attribute | Scheduled DELETE or partition-based pruning | Application logic or cron |
| DynamoDB Streams | Triggers | Synchronous, transactional |
| Conditional expressions | WHERE clause / CHECK constraints | Standard SQL |
| UpdateExpression | UPDATE ... SET | Standard SQL |
| ProjectionExpression | SELECT col1, col2 | Standard SQL |
| FilterExpression | WHERE clause | Applied at query time, not post-scan |
| PartiQL | SQL (native) | Full SQL support |
# Export via AWS CLI (outputs JSON to S3)
aws dynamodb export-table-to-point-in-time \
--table-arn arn:aws:dynamodb:us-east-1:123456789:table/Users \
--s3-bucket my-migration-bucket \
--s3-prefix dynamodb-export/ \
--export-format DYNAMODB_JSON
# Download the export
aws s3 sync s3://my-migration-bucket/dynamodb-export/ ./dynamodb-export/
#!/usr/bin/env python3
"""Export DynamoDB table to JSON file."""
import boto3
import json
from decimal import Decimal
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Decimal):
return float(obj)
return super().default(obj)
def export_table(table_name: str, output_path: str):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_name)
items = []
scan_kwargs = {}
total = 0
print(f"Exporting {table_name}...")
while True:
response = table.scan(**scan_kwargs)
items.extend(response['Items'])
total += response['Count']
print(f" Scanned {total} items...")
if 'LastEvaluatedKey' not in response:
break
scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
with open(output_path, 'w') as f:
json.dump(items, f, cls=DecimalEncoder, indent=2)
print(f"Exported {total} items to {output_path}")
if __name__ == "__main__":
export_table('Users', 'users.json')
export_table('Orders', 'orders.json')
export_table('Products', 'products.json')
DynamoDB's single-table design pattern (where all entity types share one table with overloaded keys) should typically be normalized into separate tables during migration. This is one of the primary advantages of migrating to a relational database.
| PK | SK | Attributes |
|---|---|---|
| USER#123 | PROFILE | { name: "Alice", email: "alice@..." } |
| USER#123 | ORDER#001 | { total: 99.99, status: "shipped" } |
| USER#123 | ORDER#002 | { total: 149.50, status: "pending" } |
| PRODUCT#456 | METADATA | { name: "Widget", price: 29.99 } |
| PRODUCT#456 | REVIEW#789 | { rating: 5, text: "Great product" } |
-- Users table
CREATE TABLE users (
id SERIAL PRIMARY KEY,
dynamo_pk TEXT UNIQUE, -- Preserve original key for reference
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
profile JSONB DEFAULT '{}', -- Flexible attributes as JSONB
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Orders table (separate from users, with foreign key)
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
dynamo_sk TEXT, -- Preserve original sort key
user_id INT4 NOT NULL REFERENCES users(id) ON DELETE CASCADE,
total NUMERIC(12,2) NOT NULL,
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')),
items JSONB NOT NULL DEFAULT '[]',
shipping_address JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_orders_user_id ON orders(user_id);
CREATE INDEX idx_orders_status ON orders(status);
CREATE INDEX idx_orders_created ON orders(created_at);
-- Products table
CREATE TABLE products (
id SERIAL PRIMARY KEY,
dynamo_pk TEXT UNIQUE,
name TEXT NOT NULL,
description TEXT,
price NUMERIC(10,2) NOT NULL,
category TEXT,
attributes JSONB DEFAULT '{}',
embedding VECTOR(1536), -- For vector similarity search
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_products_category ON products(category);
CREATE INDEX idx_products_price ON products(price);
CREATE INDEX idx_products_attrs ON products USING gin(attributes);
-- Reviews table (was embedded in DynamoDB single-table)
CREATE TABLE reviews (
id SERIAL PRIMARY KEY,
product_id INT4 NOT NULL REFERENCES products(id) ON DELETE CASCADE,
user_id INT4 NOT NULL REFERENCES users(id),
rating INT2 NOT NULL CHECK (rating BETWEEN 1 AND 5),
title TEXT,
body TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_reviews_product ON reviews(product_id);
CREATE INDEX idx_reviews_user ON reviews(user_id);
-- Full-text search index on reviews
CREATE INDEX idx_reviews_fts ON reviews
USING gin(to_tsvector('english', COALESCE(title, '') || ' ' || COALESCE(body, '')));
#!/usr/bin/env python3
"""Import DynamoDB JSON exports into HeliosDB-Lite normalized schema."""
import json
import psycopg2
import psycopg2.extras
from typing import Dict, List
def connect():
return psycopg2.connect(
host="127.0.0.1",
port=5432,
user="app",
dbname="myapp"
)
def import_single_table_design(conn, json_path: str):
"""
Parse a DynamoDB single-table export and route items
to the correct normalized tables based on PK/SK patterns.
"""
with open(json_path, 'r') as f:
items = json.load(f)
cursor = conn.cursor()
user_map = {} # dynamo_pk -> heliosdb id
# First pass: import users (PK=USER#*, SK=PROFILE)
    users = [item for item in items
             if item.get('PK', '').startswith('USER#') and item.get('SK') == 'PROFILE']
for item in users:
pk = item['PK']
cursor.execute(
"""INSERT INTO users (dynamo_pk, name, email, status, profile)
VALUES (%s, %s, %s, %s, %s::jsonb)
RETURNING id""",
(
pk,
item.get('name', ''),
item.get('email', ''),
item.get('status', 'active'),
json.dumps({k: v for k, v in item.items()
if k not in ('PK', 'SK', 'name', 'email', 'status')}),
)
)
user_map[pk] = cursor.fetchone()[0]
conn.commit()
print(f" Imported {len(users)} users")
# Second pass: import products (PK=PRODUCT#*, SK=METADATA)
product_map = {}
    products = [item for item in items
                if item.get('PK', '').startswith('PRODUCT#') and item.get('SK') == 'METADATA']
for item in products:
pk = item['PK']
cursor.execute(
"""INSERT INTO products (dynamo_pk, name, description, price, category, attributes)
VALUES (%s, %s, %s, %s, %s, %s::jsonb)
RETURNING id""",
(
pk,
item.get('name', ''),
item.get('description', ''),
item.get('price', 0),
item.get('category', ''),
json.dumps({k: v for k, v in item.items()
if k not in ('PK', 'SK', 'name', 'description', 'price', 'category')}),
)
)
product_map[pk] = cursor.fetchone()[0]
conn.commit()
print(f" Imported {len(products)} products")
# Third pass: import orders (PK=USER#*, SK=ORDER#*)
orders = [item for item in items if item.get('SK', '').startswith('ORDER#')]
for item in orders:
user_id = user_map.get(item['PK'])
if user_id:
cursor.execute(
"""INSERT INTO orders (dynamo_sk, user_id, total, status, items)
VALUES (%s, %s, %s, %s, %s::jsonb)""",
(
item['SK'],
user_id,
item.get('total', 0),
item.get('status', 'pending'),
json.dumps(item.get('items', [])),
)
)
conn.commit()
print(f" Imported {len(orders)} orders")
# Fourth pass: import reviews (PK=PRODUCT#*, SK=REVIEW#*)
reviews = [item for item in items if item.get('SK', '').startswith('REVIEW#')]
for item in reviews:
product_id = product_map.get(item['PK'])
user_pk = item.get('user_pk', '')
user_id = user_map.get(user_pk)
if product_id and user_id:
cursor.execute(
"""INSERT INTO reviews (product_id, user_id, rating, title, body)
VALUES (%s, %s, %s, %s, %s)""",
(
product_id,
user_id,
item.get('rating', 3),
item.get('title', ''),
item.get('text', ''),
)
)
conn.commit()
print(f" Imported {len(reviews)} reviews")
if __name__ == "__main__":
conn = connect()
import_single_table_design(conn, 'dynamodb_export.json')
conn.close()
print("\nMigration complete.")
Replace AWS SDK DynamoDB calls with standard SQL queries. See the Query Translation Examples section below for detailed mappings.
-- Verify row counts
SELECT 'users' AS entity, COUNT(*) AS count FROM users
UNION ALL SELECT 'orders', COUNT(*) FROM orders
UNION ALL SELECT 'products', COUNT(*) FROM products
UNION ALL SELECT 'reviews', COUNT(*) FROM reviews;
-- Verify referential integrity (DynamoDB had none)
SELECT o.id AS orphaned_order
FROM orders o
LEFT JOIN users u ON o.user_id = u.id
WHERE u.id IS NULL;
-- Test a query that was impossible in DynamoDB
SELECT
u.name,
COUNT(o.id) AS order_count,
SUM(o.total) AS lifetime_value,
AVG(o.total) AS avg_order,
MAX(o.created_at) AS last_order
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.status IN ('shipped', 'delivered')
GROUP BY u.name
HAVING SUM(o.total) > 500
ORDER BY lifetime_value DESC
LIMIT 20;
| DynamoDB Type | Type Code | HeliosDB-Lite Type | Notes |
|---|---|---|---|
| String | S | TEXT | Direct mapping |
| Number | N | INT4, INT8, NUMERIC, or FLOAT8 | Choose based on data range and precision |
| Binary | B | BYTEA | Base64-decoded |
| Boolean | BOOL | BOOLEAN | Direct mapping |
| Null | NULL | SQL NULL | Direct mapping |
| List | L | JSONB array | '[1, "two", true]'::jsonb |
| Map | M | JSONB | '{"key": "value"}'::jsonb |
| String Set | SS | TEXT[] | ARRAY['a', 'b', 'c'] |
| Number Set | NS | INT4[] or FLOAT8[] | ARRAY[1, 2, 3] |
| Binary Set | BS | BYTEA[] | Array of binary values |
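Exports produced with `--export-format DYNAMODB_JSON` wrap every attribute in a type tag (`{"S": "..."}`, `{"N": "..."}`, and so on). A minimal decoder following the mapping table above -- boto3's `TypeDeserializer` does the same job, but the logic is simple enough to sketch:

```python
# Decode DynamoDB's typed-attribute JSON ({"S": ...}, {"N": ...}, ...)
# into plain Python values, following the type-mapping table above.
from decimal import Decimal

def decode_attr(attr: dict):
    (tag, val), = attr.items()          # each attribute has exactly one tag
    if tag in ("S", "B", "BOOL"):
        return val                      # B stays base64 text until import
    if tag == "N":
        return Decimal(val)             # exact; cast to INT/NUMERIC on import
    if tag == "NULL":
        return None
    if tag == "L":
        return [decode_attr(v) for v in val]
    if tag == "M":
        return {k: decode_attr(v) for k, v in val.items()}
    if tag in ("SS", "NS", "BS"):
        return list(val)                # sets map to PostgreSQL arrays
    raise ValueError(f"unhandled type tag: {tag}")

row = {k: decode_attr(v) for k, v in {
    "name": {"S": "Widget"},
    "price": {"N": "29.99"},
    "tags": {"SS": ["a", "b"]},
}.items()}
```

Numbers decode to `Decimal` deliberately: DynamoDB's N type is arbitrary-precision, so converting through `float` could silently lose digits before the NUMERIC cast.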
| DynamoDB Key Structure | HeliosDB-Lite Equivalent |
|---|---|
| Partition key only (hash) | PRIMARY KEY (id) |
| Partition key + sort key | PRIMARY KEY (partition_col, sort_col) |
| GSI (partition key only) | CREATE INDEX idx_name ON table(column) |
| GSI (partition + sort) | CREATE INDEX idx_name ON table(col1, col2) |
| LSI (same partition, different sort) | CREATE INDEX idx_name ON table(partition_col, alt_sort_col) |
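Because each row of the mapping above is mechanical, GSI definitions can be translated to DDL in bulk. A hypothetical helper (the `idx_<table>_<cols>` naming convention is an assumption, not a HeliosDB-Lite requirement):

```python
# Emit a CREATE INDEX statement for a GSI-style key definition.
# The idx_<table>_<cols> naming convention is an assumption.
def gsi_to_create_index(table: str, partition_key: str,
                        sort_key: str = "") -> str:
    cols = [partition_key] + ([sort_key] if sort_key else [])
    name = f"idx_{table}_{'_'.join(cols)}"
    return f"CREATE INDEX {name} ON {table}({', '.join(cols)});"

stmt = gsi_to_create_index("orders", "status", "created_at")
```

Running this over every GSI in a table definition yields the full set of replacement indexes in one pass.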
# DynamoDB
response = table.get_item(Key={'user_id': '123'})
item = response.get('Item')
# HeliosDB-Lite
cursor.execute("SELECT * FROM users WHERE id = %s", (123,))
row = cursor.fetchone()
# DynamoDB
table.put_item(Item={
'user_id': '123',
'name': 'Alice',
'email': 'alice@example.com',
'status': 'active'
})
# HeliosDB-Lite (INSERT)
cursor.execute(
"INSERT INTO users (id, name, email, status) VALUES (%s, %s, %s, %s)",
(123, 'Alice', 'alice@example.com', 'active')
)
# HeliosDB-Lite (UPSERT -- equivalent to PutItem which overwrites)
cursor.execute("""
INSERT INTO users (id, name, email, status)
VALUES (%s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
email = EXCLUDED.email,
status = EXCLUDED.status
""", (123, 'Alice', 'alice@example.com', 'active'))
# DynamoDB: Get all orders for a user, sorted by date
response = table.query(
KeyConditionExpression=Key('PK').eq('USER#123') & Key('SK').begins_with('ORDER#'),
ScanIndexForward=False # Descending
)
# HeliosDB-Lite: Simple JOIN (no single-table design needed)
cursor.execute("""
SELECT o.id, o.total, o.status, o.created_at
FROM orders o
WHERE o.user_id = %s
ORDER BY o.created_at DESC
""", (123,))
# DynamoDB: Filter is applied AFTER reading from disk (inefficient)
response = table.query(
KeyConditionExpression=Key('PK').eq('USER#123') & Key('SK').begins_with('ORDER#'),
FilterExpression=Attr('status').eq('shipped') & Attr('total').gte(50)
)
# HeliosDB-Lite: WHERE predicates can use indexes, so non-matching rows are never read
cursor.execute("""
SELECT * FROM orders
WHERE user_id = %s
AND status = 'shipped'
AND total >= 50
ORDER BY created_at DESC
""", (123,))
# DynamoDB: Expensive, paginated, 1MB limit per call
items = []
scan_kwargs = {'FilterExpression': Attr('status').eq('active')}
while True:
response = table.scan(**scan_kwargs)
items.extend(response['Items'])
if 'LastEvaluatedKey' not in response:
break
scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
# HeliosDB-Lite: Single query, no pagination needed
cursor.execute("SELECT * FROM users WHERE status = 'active'")
items = cursor.fetchall()
# DynamoDB
table.update_item(
Key={'user_id': '123'},
UpdateExpression='SET #n = :name, login_count = login_count + :inc, updated_at = :now',
ExpressionAttributeNames={'#n': 'name'},
ExpressionAttributeValues={
':name': 'Alice Smith',
':inc': 1,
':now': '2026-01-15T10:00:00Z'
}
)
# HeliosDB-Lite (dramatically simpler)
cursor.execute("""
UPDATE users
SET name = %s,
login_count = login_count + 1,
updated_at = NOW()
WHERE id = %s
""", ('Alice Smith', 123))
# DynamoDB (conditional delete)
table.delete_item(
Key={'user_id': '123'},
ConditionExpression=Attr('status').eq('inactive')
)
# HeliosDB-Lite
cursor.execute(
"DELETE FROM users WHERE id = %s AND status = 'inactive'",
(123,)
)
# Check cursor.rowcount to see if a row was actually deleted
# DynamoDB: BatchGetItem (max 100 items, 16MB limit)
response = dynamodb.batch_get_item(
RequestItems={
'Users': {
'Keys': [{'user_id': {'S': '1'}}, {'user_id': {'S': '2'}}, {'user_id': {'S': '3'}}]
}
}
)
# HeliosDB-Lite: Single query, no limits
cursor.execute("SELECT * FROM users WHERE id = ANY(%s)", ([1, 2, 3],))
# DynamoDB (100-item limit, 4MB limit, complex API)
dynamodb.transact_write_items(
TransactItems=[
{'Put': {'TableName': 'Orders', 'Item': {'order_id': {'S': 'ORD-001'}, 'total': {'N': '99.99'}}}},
{'Update': {'TableName': 'Users', 'Key': {'user_id': {'S': '123'}},
'UpdateExpression': 'SET order_count = order_count + :inc',
'ExpressionAttributeValues': {':inc': {'N': '1'}}}},
{'Update': {'TableName': 'Inventory', 'Key': {'product_id': {'S': '456'}},
'UpdateExpression': 'SET stock = stock - :qty',
'ExpressionAttributeValues': {':qty': {'N': '2'}},
'ConditionExpression': 'stock >= :qty'}}
]
)
# HeliosDB-Lite (standard SQL, no limits, much simpler)
# psycopg2 (autocommit off) opens a transaction on the first execute;
# no explicit BEGIN is needed
cursor.execute(
"INSERT INTO orders (user_id, total) VALUES (%s, %s) RETURNING id",
(123, 99.99)
)
order_id = cursor.fetchone()[0]
cursor.execute(
"UPDATE users SET order_count = order_count + 1 WHERE id = %s",
(123,)
)
cursor.execute(
"UPDATE inventory SET stock = stock - 2 WHERE product_id = %s AND stock >= 2",
(456,)
)
if cursor.rowcount == 0:
conn.rollback()
raise Exception("Insufficient inventory")
conn.commit()
"""
DynamoDB to HeliosDB-Lite migration -- Python application layer.
Replace boto3 DynamoDB calls with psycopg2 SQL queries.
"""
import psycopg2
import psycopg2.extras
import json
class ProductCatalog:
"""
Replaces a DynamoDB-backed product catalog.
DynamoDB limitations this migration eliminates:
- No JOINs (required denormalized data)
- No aggregations (required scan + application code)
- No full-text search (required OpenSearch)
- No vector search (required separate service)
- 400KB item size limit
- 1MB query result limit
"""
def __init__(self, dsn: str = "postgresql://app@127.0.0.1:5432/myapp"):
self.conn = psycopg2.connect(dsn)
def get_product(self, product_id: int) -> dict:
"""Replaces: table.get_item(Key={'product_id': product_id})"""
cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,))
return cursor.fetchone()
def search_products(self, query: str, category: str = None, limit: int = 20) -> list:
"""
Replaces: OpenSearch cluster ($500+/month) that was required because
DynamoDB cannot do full-text search.
"""
cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
sql = """
SELECT id, name, description, price, category,
ts_rank(to_tsvector('english', name || ' ' || COALESCE(description, '')),
                   plainto_tsquery('english', %s)) AS relevance
            FROM products
            WHERE to_tsvector('english', name || ' ' || COALESCE(description, ''))
                  @@ plainto_tsquery('english', %s)
"""
params = [query, query]
if category:
sql += " AND category = %s"
params.append(category)
sql += " ORDER BY relevance DESC LIMIT %s"
params.append(limit)
cursor.execute(sql, params)
return cursor.fetchall()
def get_category_stats(self) -> list:
"""
Replaces: DynamoDB scan + application-layer aggregation.
In DynamoDB, this required scanning the entire table and computing
aggregations in Lambda/application code.
"""
cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute("""
SELECT
category,
COUNT(*) AS product_count,
AVG(price) AS avg_price,
MIN(price) AS min_price,
MAX(price) AS max_price,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price) AS median_price
FROM products
GROUP BY category
ORDER BY product_count DESC
""")
return cursor.fetchall()
def get_top_rated_by_category(self, min_reviews: int = 5) -> list:
"""
Replaces: a relational JOIN with grouping and window-function ranking --
impossible in DynamoDB even with single-table design tricks.
"""
cursor = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute("""
WITH product_ratings AS (
SELECT
p.id, p.name, p.category, p.price,
AVG(r.rating) AS avg_rating,
COUNT(r.id) AS review_count,
RANK() OVER (PARTITION BY p.category ORDER BY AVG(r.rating) DESC) AS category_rank
FROM products p
JOIN reviews r ON p.id = r.product_id
GROUP BY p.id, p.name, p.category, p.price
HAVING COUNT(r.id) >= %s
)
SELECT * FROM product_ratings
WHERE category_rank <= 3
ORDER BY category, category_rank
""", (min_reviews,))
return cursor.fetchall()
import { Pool } from 'pg';
const pool = new Pool({
host: '127.0.0.1',
port: 5432,
database: 'myapp',
user: 'app',
});
// Replaces: DynamoDB GetItem
async function getUser(userId: number) {
const result = await pool.query('SELECT * FROM users WHERE id = $1', [userId]);
return result.rows[0] || null;
}
// Replaces: DynamoDB PutItem
async function createUser(name: string, email: string, profile: object = {}) {
const result = await pool.query(
`INSERT INTO users (name, email, profile)
VALUES ($1, $2, $3::jsonb)
RETURNING id`,
[name, email, JSON.stringify(profile)]
);
return result.rows[0].id;
}
// Replaces: DynamoDB Query (pk + sk condition) plus application-side joins
async function getUserOrderHistory(userId: number) {
const result = await pool.query(`
SELECT
o.id AS order_id,
o.total,
o.status,
o.created_at,
jsonb_agg(
jsonb_build_object(
'product_id', item->>'product_id',
'name', item->>'name',
'quantity', item->>'quantity',
'price', item->>'price'
)
) AS items
FROM orders o,
jsonb_array_elements(o.items) AS item
WHERE o.user_id = $1
GROUP BY o.id
ORDER BY o.created_at DESC
`, [userId]);
return result.rows;
}
// Replaces: DynamoDB Scan + application-layer aggregation + separate analytics pipeline
async function getDashboardMetrics(days: number = 30) {
const result = await pool.query(`
SELECT
COUNT(DISTINCT o.user_id) AS active_customers,
COUNT(o.id) AS total_orders,
SUM(o.total) AS total_revenue,
AVG(o.total) AS avg_order_value,
COUNT(CASE WHEN o.status = 'cancelled' THEN 1 END)::FLOAT
/ NULLIF(COUNT(o.id), 0) AS cancellation_rate
FROM orders o
WHERE o.created_at >= NOW() - ($1 || ' days')::INTERVAL
`, [days]);
return result.rows[0];
}
// Replaces: DynamoDB TransactWriteItems (100-item limit, 4MB limit)
async function placeOrder(userId: number, items: Array<{productId: number, quantity: number, price: number}>) {
const client = await pool.connect();
try {
await client.query('BEGIN');
const total = items.reduce((sum, i) => sum + i.quantity * i.price, 0);
const orderResult = await client.query(
`INSERT INTO orders (user_id, total, status, items)
VALUES ($1, $2, 'pending', $3::jsonb)
RETURNING id`,
[userId, total, JSON.stringify(items)]
);
const orderId = orderResult.rows[0].id;
for (const item of items) {
const invResult = await client.query(
`UPDATE inventory
SET stock = stock - $1
WHERE product_id = $2 AND stock >= $1
RETURNING stock`,
[item.quantity, item.productId]
);
if (invResult.rowCount === 0) {
throw new Error(`Insufficient stock for product ${item.productId}`);
}
}
await client.query(
'UPDATE users SET order_count = order_count + 1 WHERE id = $1',
[userId]
);
await client.query('COMMIT');
return orderId;
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
// HeliosDB-Lite exclusive: time-travel for audit/compliance
async function getOrderStateAtTime(orderId: number, timestamp: string) {
const result = await pool.query(
`SELECT * FROM orders AS OF $1 WHERE id = $2`,
[timestamp, orderId]
);
return result.rows[0] || null;
}
-- DynamoDB required: DynamoDB Streams + Lambda function (eventually consistent)
-- HeliosDB-Lite: Synchronous, transactional trigger
-- Audit log trigger (replaces Streams + Lambda audit function)
CREATE TABLE audit_log (
id SERIAL PRIMARY KEY,
table_name TEXT NOT NULL,
operation TEXT NOT NULL, -- INSERT, UPDATE, DELETE
row_id INT4,
old_data JSONB,
new_data JSONB,
changed_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION audit_trigger_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO audit_log (table_name, operation, row_id, new_data)
VALUES (TG_TABLE_NAME, 'INSERT', NEW.id, row_to_json(NEW)::jsonb);
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO audit_log (table_name, operation, row_id, old_data, new_data)
VALUES (TG_TABLE_NAME, 'UPDATE', NEW.id, row_to_json(OLD)::jsonb, row_to_json(NEW)::jsonb);
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
INSERT INTO audit_log (table_name, operation, row_id, old_data)
VALUES (TG_TABLE_NAME, 'DELETE', OLD.id, row_to_json(OLD)::jsonb);
RETURN OLD;
END IF;
END;
$$;
-- Apply to orders table
CREATE TRIGGER orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();
-- Apply to users table
CREATE TRIGGER users_audit
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();
-- DynamoDB TTL: Set a numeric attribute, DynamoDB deletes expired items
-- (within 48 hours, not precise)
-- HeliosDB-Lite Option A: Scheduled cleanup with precise timing
DELETE FROM sessions WHERE expires_at < NOW();
-- HeliosDB-Lite Option B: Use partitioning for efficient bulk deletion
CREATE TABLE sessions (
id SERIAL,
user_id INT4 NOT NULL,
token TEXT NOT NULL,
data JSONB,
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
) PARTITION BY RANGE (expires_at);
-- Create monthly partitions
CREATE TABLE sessions_2026_01 PARTITION OF sessions
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE sessions_2026_02 PARTITION OF sessions
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
-- Drop expired partitions (instant, no per-row delete)
DROP TABLE sessions_2025_12;
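Deciding which monthly partitions are safe to drop is simple date arithmetic; a sketch, assuming the `sessions_YYYY_MM` naming pattern shown above:

```python
# Given "now", list DROP TABLE statements for monthly partitions whose
# entire range is already expired (naming follows sessions_YYYY_MM).
from datetime import date

def droppable_partitions(oldest: date, now: date,
                         table: str = "sessions") -> list:
    stmts = []
    y, m = oldest.year, oldest.month
    while True:
        # upper bound of this partition = first day of the following month;
        # droppable only once that bound has passed (every row expired)
        ny, nm = (y + 1, 1) if m == 12 else (y, m + 1)
        if date(ny, nm, 1) > now:
            break
        stmts.append(f"DROP TABLE {table}_{y}_{m:02d};")
        y, m = ny, nm
    return stmts

stmts = droppable_partitions(date(2025, 11, 1), date(2026, 1, 15))
```

A cron job can execute the returned statements nightly; the current month's partition is never touched, even though some of its rows are already expired.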
| DynamoDB Feature | Status | Workaround |
|---|---|---|
| Auto-scaling (read/write capacity) | Not applicable | Embedded database uses application server resources; scale the server |
| Global Tables (multi-region) | Different architecture | Use HeliosDB-Lite replication (WAL streaming) or deploy per-region |
| DynamoDB Accelerator (DAX) | Not applicable | HeliosDB-Lite is already in-process (0ms latency); use built-in query cache |
| PartiQL (limited SQL) | Full SQL | HeliosDB-Lite supports complete SQL (PartiQL queries migrate trivially) |
| On-demand capacity | Not applicable | No capacity planning needed; embedded database has no artificial throughput limits |
| Point-in-time recovery (PITR) | Built-in (better) | HeliosDB-Lite time-travel queries (AS OF) provide row-level point-in-time access |
| AWS IAM authentication | Different auth model | HeliosDB-Lite uses SCRAM-SHA-256 or trust authentication |
| VPC endpoints | Not applicable | Embedded database has no network exposure |
| DynamoDB Streams ordering | Different guarantee | Triggers execute synchronously within the transaction (stronger guarantee) |
| Item-level encryption | Different approach | HeliosDB-Lite TDE encrypts entire database at rest (AES-256-GCM) |
| 400KB item limit | No limit | No per-row size restriction |
| Eventual consistency reads | Not applicable | All reads are strongly consistent |
During the transition, write to both databases:
import boto3
import psycopg2
import json
class DualWriteService:
def __init__(self):
self.dynamodb = boto3.resource('dynamodb')
self.ddb_table = self.dynamodb.Table('Orders')
self.helios = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
def create_order(self, user_id: int, total: float, items: list) -> int:
# Primary: HeliosDB-Lite
cursor = self.helios.cursor()
cursor.execute(
"""INSERT INTO orders (user_id, total, status, items)
VALUES (%s, %s, 'pending', %s::jsonb) RETURNING id""",
(user_id, total, json.dumps(items))
)
        order_id = cursor.fetchone()[0]
        self.helios.commit()
        # Secondary: DynamoDB (for rollback safety).
        # boto3's resource API rejects Python floats, so numeric values
        # are written as strings / JSON text in the shadow copy.
        self.ddb_table.put_item(Item={
            'PK': f'USER#{user_id}',
            'SK': f'ORDER#{order_id}',
            'total': str(total),
            'status': 'pending',
            'items': json.dumps(items),
            'helios_id': order_id,
        })
return order_id
import psycopg2
import psycopg2.extras
import boto3
import json
from decimal import Decimal
def rollback_to_dynamodb():
helios = psycopg2.connect("postgresql://app@127.0.0.1:5432/myapp")
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Orders')
cursor = helios.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cursor.execute("SELECT * FROM orders")
with table.batch_writer() as batch:
for row in cursor:
batch.put_item(Item={
'PK': f"USER#{row['user_id']}",
'SK': f"ORDER#{row['id']}",
'total': Decimal(str(row['total'])),
'status': row['status'],
'items': json.loads(json.dumps(row['items']), parse_float=Decimal),  # boto3 rejects floats
'created_at': row['created_at'].isoformat(),
})
print("Rollback to DynamoDB complete")
rollback_to_dynamodb()
Our team can help you plan and execute your migration to HeliosDB.