HeliosDB Sharding Strategy Decision Guide
Table of Contents
- What is Sharding?
- Sharding vs. Partitioning vs. Replication
- Sharding Strategies in HeliosDB
- Sharding Key Selection
- Cross-Shard Operations
- Operational Procedures
- Sharding + Replication Combined
- Real-World Examples
- Capacity Planning
- When NOT to Shard
- Migration Strategies
- Troubleshooting
What is Sharding?
Sharding is a horizontal data distribution technique that divides data across multiple independent storage nodes based on a shard key. Each shard holds a subset of the data, allowing the database to scale beyond the capacity of a single node.
Core Concept
```
┌─────────────────────────────────┐
│ Logical Table: users (1M rows)  │
└──────────────────┬──────────────┘
                   │
        ┌──────────┼──────────┐
        ▼          ▼          ▼
   ┌────────┐ ┌────────┐ ┌────────┐
   │ Shard 0│ │ Shard 1│ │ Shard 2│
   │ Node A │ │ Node B │ │ Node C │
   │ ~333K  │ │ ~333K  │ │ ~333K  │
   └────────┘ └────────┘ └────────┘
```
Key Benefits
- Scalability: Distribute data across nodes to exceed single-node limits
- Performance: Parallel query execution across multiple shards
- Availability: Shard-level replication enables fault tolerance
- Cost: Horizontal scaling with commodity hardware
- Elasticity: Add/remove nodes dynamically with minimal disruption
When You Need Sharding
- Data Size: Dataset exceeds single node capacity (>1TB)
- Throughput: Single node cannot handle write rate (>10K req/sec)
- Query Latency: Need sub-second response times for large datasets
- Geographic Distribution: Data must reside in multiple regions
- Multi-Tenancy: Isolate tenant data across shards
Sharding vs. Partitioning vs. Replication
These three techniques are often confused but serve different purposes:
| Aspect | Sharding | Partitioning | Replication |
|---|---|---|---|
| Purpose | Distribute data across nodes for scalability | Logically segment data within a node | Create redundant copies for fault tolerance |
| Scope | Multiple physical nodes | Single physical node | Multiple physical nodes |
| Automatic | Yes (HeliosDB handles placement) | No (manual definition required) | Yes (configurable) |
| Query Impact | Requires routing logic; may hit multiple shards | Enables partition pruning within shard | No impact (one copy used) |
| Capacity Impact | Increases total capacity | No impact on capacity | Reduces effective capacity (3x replication = 1/3 capacity) |
| Use Case | Datasets > 1TB; high write throughput | Date-based retention; archival | High availability; disaster recovery |
| Cost | Increases complexity; improves scalability | Improves query performance | Increases storage requirements |
Example: Combined Approach
```sql
-- Shard by customer_id (horizontal distribution)
-- Partition by date (logical organization within shard)
-- Replicate across 3 nodes (fault tolerance)
CREATE TABLE events (
    event_id BIGINT,
    customer_id BIGINT NOT NULL,
    event_time TIMESTAMP NOT NULL,
    event_data JSONB
) SHARD BY (customer_id)
  PARTITION BY RANGE (event_time) (
    PARTITION p2024_q1 VALUES LESS THAN ('2024-04-01'),
    PARTITION p2024_q2 VALUES LESS THAN ('2024-07-01'),
    PARTITION p2024_q3 VALUES LESS THAN ('2024-10-01'),
    PARTITION p2024_q4 VALUES LESS THAN ('2025-01-01')
  ) WITH (replication_factor = 3);
```

Sharding Strategies in HeliosDB
HeliosDB supports four primary sharding strategies:
Strategy Comparison Matrix
| Strategy | Best For | Distribution | Rebalancing | Hotspot Risk | Implementation |
|---|---|---|---|---|---|
| Hash | General purpose; ID-based access | Excellent (uniform) | Virtual nodes minimize movement | Low | Consistent hashing with virtual nodes |
| Range | Time-series; Sequential IDs; Range queries | Good | May require redistribution | High (time-based) | Key range mapping |
| Geo-Distributed | Multi-region; Latency-sensitive | Configurable | Zone-aware rebalancing | Region-specific | Hash + geographic placement |
| Directory | Multi-tenant; Custom mapping | Flexible | Metadata table-based | Depends on mapping | Explicit shard mapping table |
1. Hash-Based Sharding
Strategy: Distribute keys uniformly using consistent hashing with virtual nodes.
When to Use
- General-purpose databases with diverse access patterns
- User IDs, transaction IDs, order IDs as shard keys
- Uniform data distribution is critical
- Minimal data movement during scaling
Benefits
✓ Excellent load balancing (< 5% variance)
✓ Minimal data movement (only ~1/N of keys move when adding the Nth shard)
✓ No single point of failure in routing
✓ Automatic rebalancing with virtual nodes
✓ Deterministic key-to-shard mapping

Drawbacks
✗ Range queries require hitting multiple shards
✗ No locality optimization (adjacent keys land on different shards)
✗ Hot keys still cause uneven load within a shard

Configuration Example
```sql
-- Basic hash sharding by user_id
CREATE TABLE users (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    created_at TIMESTAMP
) SHARD BY HASH(id);

-- Hash sharding by composite key
CREATE TABLE user_activity (
    user_id BIGINT,
    activity_id BIGINT,
    activity_type VARCHAR(50),
    timestamp TIMESTAMP,
    PRIMARY KEY (user_id, activity_id)
) SHARD BY HASH(user_id);

-- Equivalent explicit hash function
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    total DECIMAL(10,2)
) SHARD BY (HASH(customer_id) % 64);
```

Performance Characteristics
- Hash Lookup: O(log N) where N = virtual nodes (~500ns per lookup)
- Throughput: 2M+ hash operations per second on modern hardware
- Virtual Nodes: 150 per physical shard (default); configurable 50-300
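To make the ring mechanics concrete, here is a minimal, self-contained Python sketch of consistent hashing with virtual nodes. It illustrates the technique only; it is not HeliosDB's internal implementation, and the class name, MD5 choice, and vnode count are assumptions for the example.

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Toy consistent-hash ring with virtual nodes (vnodes)."""

    def __init__(self, shards, vnodes=150):
        self.vnodes = vnodes
        self.ring = []  # sorted list of (hash_position, shard_name)
        for shard in shards:
            self.add_shard(shard)

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_shard(self, shard: str) -> None:
        # Each shard appears at `vnodes` positions on the ring.
        for i in range(self.vnodes):
            bisect.insort(self.ring, (self._hash(f"{shard}#{i}"), shard))

    def lookup(self, key) -> str:
        # Binary search for the first vnode clockwise from the key's
        # position, wrapping around the ring: the O(log N) lookup above.
        pos = bisect.bisect(self.ring, (self._hash(str(key)), ""))
        return self.ring[pos % len(self.ring)][1]

ring = ConsistentHashRing(["shard_0", "shard_1", "shard_2", "shard_3"])
before = {k: ring.lookup(k) for k in range(100_000)}
ring.add_shard("shard_4")  # grow the cluster by one shard
moved = sum(1 for k in range(100_000) if ring.lookup(k) != before[k])
print(f"{moved / 100_000:.1%} of keys moved")  # roughly 1/5, i.e. ~1/N
```

Running it shows roughly 20% of keys moving when the fifth shard joins, matching the ~1/N movement claim above.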
Rebalancing Approach
1. Monitor load across all shards
2. Identify hot shards (CPU, memory, or storage above threshold)
3. Split hot shards into two child shards
4. Identify cold shard pairs
5. Merge cold shards
6. Update the hash ring atomically (zero downtime)
7. Redirect traffic incrementally to the new shards

Hotspot Handling
```sql
-- Problem: user ID 12345 has 100M records (hotspot)
-- Solution: split the shard containing this key

-- Monitor hotspots
SELECT shard_id, record_count, max_key_frequency
FROM system.shard_hotspot_analysis
WHERE max_key_frequency > 0.1;  -- key holds >10% of the shard's data

-- Automatic split is triggered by HeliosDB when:
-- - A single key accounts for >5% of shard writes
-- - Shard CPU > 80%
-- - Shard storage > configured threshold
```

2. Range-Based Sharding
Strategy: Distribute keys based on value ranges. Keys in range [A, B) go to one shard.
When to Use
- Time-series data (events, logs, metrics)
- Sequential IDs where ordering matters
- Range query patterns are common
- Data archival/retention policies by range
- Geographic distribution with explicit boundaries
Benefits
✓ Excellent for range queries (e.g., "last 7 days")
✓ Natural fit for time-series data
✓ Easy to implement data retention policies
✓ Predictable data locality
✓ Better for sequential access patterns

Drawbacks
✗ Hotspot risk: all new time-series data goes to the latest shard
✗ Uneven distribution if data arrives non-uniformly
✗ Rebalancing requires data redistribution
✗ Range boundaries must be managed manually
✗ Difficult to add/remove shards dynamically

Configuration Example
```sql
-- Time-based range sharding
CREATE TABLE events (
    event_id BIGINT PRIMARY KEY,
    user_id BIGINT NOT NULL,
    event_time TIMESTAMP NOT NULL,
    event_data JSONB
) SHARD BY RANGE (event_time) (
    SHARD 'shard_2024_01' VALUES FROM '2024-01-01' TO '2024-02-01',
    SHARD 'shard_2024_02' VALUES FROM '2024-02-01' TO '2024-03-01',
    SHARD 'shard_2024_03' VALUES FROM '2024-03-01' TO '2024-04-01',
    SHARD 'shard_2024_04' VALUES FROM '2024-04-01' TO '2024-05-01'
);

-- ID-based range sharding
CREATE TABLE products (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    price DECIMAL(10,2)
) SHARD BY RANGE (id) (
    SHARD 'shard_0' VALUES FROM 0 TO 1000000,
    SHARD 'shard_1' VALUES FROM 1000000 TO 2000000,
    SHARD 'shard_2' VALUES FROM 2000000 TO 3000000,
    SHARD 'shard_3' VALUES FROM 3000000 TO UNBOUNDED
);

-- Composite range sharding
CREATE TABLE transactions (
    tx_id BIGINT,
    customer_id BIGINT,
    tx_date DATE,
    amount DECIMAL(10,2),
    PRIMARY KEY (tx_id, tx_date)
) SHARD BY RANGE (tx_date, customer_id) (
    -- Shard by month, then by customer
    SHARD 's_2024_01_a' VALUES FROM ('2024-01-01', 0) TO ('2024-02-01', 500000),
    SHARD 's_2024_01_b' VALUES FROM ('2024-02-01', 0) TO ('2024-03-01', 500000)
);
```

Performance Characteristics
- Range Scan: Can be efficient if query range aligns with shard boundaries
- Hot Shard Problem: Latest time-series shard receives all new writes
- Boundary Overhead: Finding correct shard requires B-tree lookup O(log S) where S = number of shards
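The boundary lookup described above is just an ordered search over shard lower bounds. The Python sketch below (shard names borrowed from the earlier example; the routing table itself is a hypothetical client-side mirror) shows single-key routing and shard pruning for a range query:

```python
import bisect
from datetime import date

# Each shard owns keys in [lower_bound, next_lower_bound).
BOUNDS = [date(2024, 1, 1), date(2024, 2, 1), date(2024, 3, 1), date(2024, 4, 1)]
SHARDS = ["shard_2024_01", "shard_2024_02", "shard_2024_03", "shard_2024_04"]

def route(key: date) -> str:
    # bisect_right counts boundaries <= key: the O(log S) lookup above.
    idx = bisect.bisect_right(BOUNDS, key) - 1
    if idx < 0:
        raise ValueError(f"{key} precedes the first shard boundary")
    return SHARDS[idx]

def route_range(lo: date, hi: date) -> list:
    # A range query only touches shards whose ranges overlap [lo, hi).
    first = max(bisect.bisect_right(BOUNDS, lo) - 1, 0)
    last = bisect.bisect_left(BOUNDS, hi)
    return SHARDS[first:last]

print(route(date(2024, 2, 15)))                          # shard_2024_02
print(route_range(date(2024, 1, 20), date(2024, 3, 5)))  # first three shards
```

Note how the range query prunes to exactly the overlapping shards, which is why range sharding excels when query ranges align with shard boundaries.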
Rebalancing Approach
1. Define new range boundaries based on the data distribution
2. Create new shards with the new ranges
3. Migrate data from the old shards to the new shards
4. Update routing metadata
5. Decommission the old shards
6. Risk: can take hours for large datasets

Hotspot Handling
```sql
-- Problem: all recent events go to the latest time shard
-- Solution: sub-shard by hash after range

CREATE TABLE events (
    event_id BIGINT PRIMARY KEY,
    user_id BIGINT,
    event_time TIMESTAMP,
    event_type VARCHAR(50)
) SHARD BY RANGE (YEAR(event_time), MONTH(event_time))
  AND HASH(user_id);  -- sub-shard by user_id

-- This distributes writes uniformly while maintaining time-series organization
```

3. Geo-Distributed Sharding
Strategy: Combine hash-based sharding with geographic node placement to optimize latency and compliance.
When to Use
- Multi-region deployments (US, EU, APAC)
- Latency-sensitive applications where sub-10ms is critical
- Data residency compliance (GDPR, data sovereignty)
- Global scale with regional preferences
- Disaster recovery with cross-region backup
Benefits
✓ Optimized read latency (local in-region reads)
✓ Regulatory compliance (data stays in region)
✓ Disaster recovery (automatic failover to a backup region)
✓ Cost optimization (reads served from the local, cheaper region)
✓ Geographic load balancing

Drawbacks
✗ Increased complexity (zone-aware routing)
✗ Cross-region writes have higher latency
✗ Data must be accessible from multiple regions
✗ Synchronization between regions adds overhead
✗ Failover recovery time (RTO) depends on data volume

Configuration Example
```sql
-- Geo-distributed sharding with region affinity
CREATE TABLE users (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    region VARCHAR(10) NOT NULL,  -- 'US', 'EU', 'APAC'
    email VARCHAR(100)
) SHARD BY HASH(id) WITH (
    region_affinity = true,  -- primary replicas in the user's region,
                             -- secondary replicas in the backup region
    replica_placement = {
        'US':   ['us-east-1a', 'us-east-1b', 'us-west-1a'],
        'EU':   ['eu-west-1a', 'eu-central-1a'],
        'APAC': ['ap-southeast-1a', 'ap-northeast-1a']
    }
);

-- Multi-region with read preferences
SELECT * FROM users
WHERE id = 12345
WITH (read_preference = 'nearest');  -- read from the nearest region

-- Cross-region replication with latency awareness
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT,
    region VARCHAR(10)
) SHARD BY HASH(customer_id) WITH (
    replication_factor = 3,
    replication_zones = ['primary_region', 'backup_region', 'archive_region']
);
```

Performance Characteristics
- Local Reads: 1-5ms latency within region
- Cross-Region Writes: 50-200ms latency (network dependent)
- Failover Time: 5-30 seconds (depending on detection mechanism)
- Zone-Aware Routing: ~1% latency overhead for routing decision
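Zone-aware routing reduces to a placement map plus a preference rule. The sketch below is illustrative only: the `REPLICAS` map and `pick_replica` helper are hypothetical, and a real router would weigh measured latencies and replica health rather than choosing randomly.

```python
import random

# Hypothetical replica placement, mirroring the replica_placement map above.
REPLICAS = {
    "US": ["us-east-1a", "us-east-1b", "us-west-1a"],
    "EU": ["eu-west-1a", "eu-central-1a"],
    "APAC": ["ap-southeast-1a", "ap-northeast-1a"],
}

def pick_replica(client_region: str, read_preference: str = "nearest") -> str:
    """Prefer a replica in the client's region; fall back to any region."""
    if read_preference == "nearest" and client_region in REPLICAS:
        return random.choice(REPLICAS[client_region])  # 1-5 ms local read
    all_zones = [z for zones in REPLICAS.values() for z in zones]
    return random.choice(all_zones)                    # cross-region: 50-200 ms

print(pick_replica("EU"))  # e.g. eu-west-1a
```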
Rebalancing Approach
1. Monitor load per region
2. For intra-region imbalance: normal hash-based rebalancing
3. For inter-region imbalance: trigger a region migration
4. Data migration honors region boundaries (don't move data unnecessarily)
5. Shift traffic gradually to the new regions (canary approach)

Hotspot Handling
For geographic hotspots:
1. Identify the region with excessive load
2. Add shards specifically in that region
3. Re-hash a subset of data to the new region-local shards
4. Gradually migrate traffic

4. Directory-Based (Explicit Mapping) Sharding
Strategy: Maintain explicit mapping of shard key to shard in a metadata table.
When to Use
- Multi-tenant SaaS where tenant → shard mapping is explicit
- Custom sharding logic not fitting standard patterns
- Tenant isolation requirements
- Fine-grained control over shard assignment
- Gradual migration from non-sharded to sharded system
Benefits
✓ Flexible mapping (any shard key to any shard)
✓ Easy tenant isolation (one tenant per shard)
✓ Explicit control over data placement
✓ Supports custom business logic
✓ Gradual migration (sharded and non-sharded can coexist)

Drawbacks
✗ Metadata lookup on every query (+1-2ms routing latency)
✗ Single point of failure (directory service)
✗ Directory maintenance overhead
✗ Limited to < 10K shards (the lookup becomes a bottleneck)
✗ Manual rebalancing required

Configuration Example
```sql
-- Directory table mapping tenant to shard
CREATE TABLE shard_directory (
    tenant_id BIGINT PRIMARY KEY,
    shard_id VARCHAR(50) NOT NULL,
    shard_node VARCHAR(100) NOT NULL,
    replication_nodes TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_rebalanced TIMESTAMP
);

-- Example data
INSERT INTO shard_directory VALUES
    (1001, 'shard_0', 'node-a', 'node-b,node-c', NOW(), NOW()),
    (1002, 'shard_1', 'node-b', 'node-c,node-a', NOW(), NOW()),
    (1003, 'shard_2', 'node-c', 'node-a,node-b', NOW(), NOW());

-- Queries use the directory for routing
PREPARE route_query AS
    SELECT shard_node FROM shard_directory WHERE tenant_id = $1;

-- Application logic:
-- 1. Look up the tenant's shard: SELECT shard_node FROM shard_directory WHERE tenant_id = ?
-- 2. Route the query to shard_node
-- 3. Execute the query with a tenant filter

CREATE TABLE tenant_data (
    record_id BIGINT PRIMARY KEY,
    tenant_id BIGINT NOT NULL,
    data JSONB,
    created_at TIMESTAMP
) SHARD BY (tenant_id);
```

Performance Characteristics
- Directory Lookup: O(1) hash table lookup in-memory; ~100 microseconds
- Directory Replication: 3-way replication for fault tolerance
- Maximum Shards: Effective limit ~10K before routing becomes bottleneck
- Cache Directory: Application should cache directory locally (TTL: 5-60 minutes)
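Because every query pays the directory lookup, client-side caching is what keeps directory-based routing cheap. Below is a minimal sketch of that pattern, assuming a `fetch_fn` that queries the `shard_directory` table shown above (the class and its names are hypothetical):

```python
import time

class ShardDirectoryCache:
    """Client-side TTL cache over the shard directory."""

    def __init__(self, fetch_fn, ttl_seconds=300):
        self.fetch_fn = fetch_fn  # hits the directory service on a miss
        self.ttl = ttl_seconds    # the doc suggests 5-60 minute TTLs
        self._cache = {}          # tenant_id -> (shard_node, expires_at)

    def shard_for(self, tenant_id: int) -> str:
        hit = self._cache.get(tenant_id)
        if hit and hit[1] > time.monotonic():
            return hit[0]                    # in-memory path, ~100 us
        node = self.fetch_fn(tenant_id)      # +1-2 ms routing lookup on miss
        self._cache[tenant_id] = (node, time.monotonic() + self.ttl)
        return node

directory = ShardDirectoryCache(lambda t: {1001: "node-a", 1002: "node-b"}[t])
print(directory.shard_for(1001))  # miss -> fetch; later calls hit the cache
```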
Rebalancing Approach
1. Add the new shard to the directory
2. Update the directory for a subset of tenants
3. Migrate tenant data to the new shard
4. Update the directory entry once migration completes
5. Remove the old shard from the directory

Sharding Key Selection
The shard key is the most critical decision in sharding architecture. Poor shard key choice cannot be easily fixed.
Good Shard Keys
1. High Cardinality
Cardinality: Number of distinct values in the column.
```sql
-- GOOD: user_id has millions of distinct values
CREATE TABLE user_activity (
    activity_id BIGINT PRIMARY KEY,
    user_id BIGINT NOT NULL,
    activity_type VARCHAR(50),
    timestamp TIMESTAMP
) SHARD BY (user_id);
-- Distribution: 1M users across 4 shards = ~250K records per shard ✓

-- AVOID: status has only 5 distinct values
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    status VARCHAR(20)  -- 'pending', 'confirmed', 'shipped', 'delivered', 'cancelled'
) SHARD BY (status);
-- Distribution: 1M orders / 5 values = 200K records per status
-- But only 5 shards are possible, after which the shards become overloaded ✗
```

Cardinality Rule: cardinality ≥ 100 × number_of_shards
For 16 shards: Need ≥ 1,600 distinct values
2. Uniform Distribution
Distribution: Data spread evenly across shard key values.
```sql
-- GOOD: auto-increment IDs distribute uniformly
CREATE TABLE orders (
    order_id BIGINT AUTO_INCREMENT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    total DECIMAL(10,2)
) SHARD BY (order_id);
-- Distribution: sequential IDs hash evenly ✓

-- AVOID: user-supplied codes may cluster
CREATE TABLE items (
    item_code VARCHAR(20) PRIMARY KEY,  -- e.g., "PROD-0001", "PROD-0002"
    name VARCHAR(100)
) SHARD BY (item_code);
-- Distribution: sequential codes map unevenly (shared leading characters) ✗
-- Solution: use HASH(item_code) or add a random prefix
```

3. Query Pattern Alignment
Alignment: Shard key matches common WHERE clause predicates.
```sql
-- Query pattern 1: filter by customer
-- SELECT * FROM orders WHERE customer_id = ? AND order_date > ?

CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    order_date DATE NOT NULL,
    total DECIMAL(10,2)
) SHARD BY (customer_id);  -- ✓ single-shard query

-- Query pattern 2: filter by product
-- SELECT * FROM orders WHERE product_id = ? AND order_date > ?

CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    order_date DATE NOT NULL,
    total DECIMAL(10,2)
) SHARD BY (product_id);  -- ✓ single-shard query (for a different pattern)
```

Trade-off: choose the shard key matching the most frequent query pattern.
4. Immutable or Rarely Changing
Immutability: Value shouldn’t change after record creation.
```sql
-- GOOD: user ID never changes
CREATE TABLE users (
    id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100)
) SHARD BY (id);

-- PROBLEMATIC: department changes (requires record migration)
CREATE TABLE employees (
    employee_id BIGINT PRIMARY KEY,
    department_id BIGINT NOT NULL,  -- changes when an employee transfers
    salary DECIMAL(10,2)
) SHARD BY (department_id);
-- When department_id changes, the record must be re-hashed and potentially moved,
-- creating rebalancing overhead
```

Bad Shard Keys
1. Timestamp-Based
Problem: Creates hotspots; all recent writes go to one shard.
```sql
-- AVOID: timestamp shard key
CREATE TABLE events (
    event_id BIGINT PRIMARY KEY,
    event_time TIMESTAMP NOT NULL,
    user_id BIGINT,
    event_type VARCHAR(50)
) SHARD BY (event_time);

-- All events from the last hour → a single shard ✗
-- That shard becomes a bottleneck while the others idle
-- Distribution over time: hot → cold as time progresses

-- SOLUTION: hash-based sharding with time as a secondary partition
CREATE TABLE events (
    event_id BIGINT PRIMARY KEY,
    event_time TIMESTAMP NOT NULL,
    user_id BIGINT NOT NULL,
    event_type VARCHAR(50)
) SHARD BY HASH(user_id)
  PARTITION BY RANGE (event_time);
-- Now events are distributed across shards by user and organized by time
```

2. Low Cardinality
Problem: Too few distinct values leads to uneven load.
```sql
-- AVOID: category has only 10 values
CREATE TABLE products (
    id BIGINT PRIMARY KEY,
    category VARCHAR(50),  -- 'electronics', 'clothing', etc.
    price DECIMAL(10,2)
) SHARD BY (category);
-- With 10 categories and 64 shards, most shards stay empty ✗
-- Data clusters in a few shards
-- Scaling is ineffective (adding shards doesn't help)

-- SOLUTION: use a composite key
CREATE TABLE products (
    id BIGINT PRIMARY KEY,
    category VARCHAR(50),
    price DECIMAL(10,2)
) SHARD BY HASH(category || '|' || id);
-- Distribution: across the full shard space regardless of category
```

3. Nullable Columns
Problem: NULL values have undefined shard mapping.
```sql
-- AVOID: nullable shard key
CREATE TABLE records (
    id BIGINT PRIMARY KEY,
    optional_field VARCHAR(50),
    data JSONB
) SHARD BY (optional_field);
-- What happens when optional_field IS NULL? ✗
-- The shard mapping is undefined

-- SOLUTION: use NOT NULL or provide a default
CREATE TABLE records (
    id BIGINT PRIMARY KEY,
    optional_field VARCHAR(50) NOT NULL DEFAULT 'unknown',
    data JSONB
) SHARD BY (optional_field);

-- BETTER: use the non-nullable primary key
CREATE TABLE records (
    id BIGINT PRIMARY KEY,
    optional_field VARCHAR(50),
    data JSONB
) SHARD BY (id);  -- always has a value
```

4. Rapidly Changing Values
Problem: Record must move when shard key changes.
```sql
-- AVOID: status changes frequently
CREATE TABLE tasks (
    task_id BIGINT PRIMARY KEY,
    status VARCHAR(20),  -- changes: pending → running → completed
    assigned_to BIGINT
) SHARD BY (status);
-- Each status change requires record migration ✗
-- Rebalancing overhead

-- SOLUTION: use a stable identifier
CREATE TABLE tasks (
    task_id BIGINT PRIMARY KEY,
    status VARCHAR(20),
    assigned_to BIGINT
) SHARD BY (task_id);  -- immutable
```

Cardinality Guidelines
| Cardinality | Max Shards | Example |
|---|---|---|
| < 10 | 1 | Gender, region (5-10 values) |
| 10-100 | 1-2 | Country, category (50 values) |
| 100-1,000 | 2-10 | Department, team |
| 1,000-1M | 10-1,000 | User ID, Customer ID |
| > 1M | 100-10,000+ | Session ID, Event ID |
Temporal Data Handling
Challenge: Time-series data creates natural hotspots (latest data).
Approach 1: Hash + Time Partition
```sql
CREATE TABLE metrics (
    metric_id BIGINT,
    host_id BIGINT NOT NULL,
    collected_at TIMESTAMP NOT NULL,
    cpu_usage DECIMAL(5,2),
    memory_usage DECIMAL(5,2)
) SHARD BY HASH(host_id)
  PARTITION BY RANGE (collected_at) (
    PARTITION p2024_12_01 VALUES LESS THAN ('2024-12-02'),
    PARTITION p2024_12_02 VALUES LESS THAN ('2024-12-03'),
    PARTITION p2024_12_03 VALUES LESS THAN ('2024-12-04')
  );

-- Benefits:
-- ✓ Metrics for the same host → same shard (co-located queries)
-- ✓ Old partitions can be archived/deleted
-- ✓ Time-range queries benefit from partition pruning
```

Approach 2: Time Bucketing + Hash
```sql
CREATE TABLE events (
    event_id BIGINT,
    user_id BIGINT NOT NULL,
    event_time TIMESTAMP NOT NULL,
    bucket_hour BIGINT GENERATED ALWAYS AS (EXTRACT(EPOCH FROM event_time) / 3600),
    event_type VARCHAR(50)
) SHARD BY HASH(user_id || ':' || bucket_hour);

-- Benefits:
-- ✓ Distributes writes across shards within the current hour
-- ✓ Historical data (different hours) is still sharded consistently
-- ✓ A query for "user_id = X, last 24 hours" hits multiple shards
```
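For reference, the composite key above can also be computed client-side. A small Python sketch, assuming a hypothetical 64-shard layout and MD5-based placement (HeliosDB's actual hash function is not specified here):

```python
import hashlib
from datetime import datetime, timezone

NUM_SHARDS = 64  # assumption for the example

def shard_for_event(user_id: int, event_time: datetime) -> int:
    """Same composite key as the bucket_hour column: the hour bucket keeps
    historical placement stable, the hash spreads current-hour writes."""
    bucket_hour = int(event_time.timestamp()) // 3600
    key = f"{user_id}:{bucket_hour}".encode()
    return int(hashlib.md5(key).hexdigest(), 16) % NUM_SHARDS

now = datetime.now(timezone.utc)
# Concurrent writers land on different shards even within the same hour:
print({shard_for_event(uid, now) for uid in range(1, 6)})
```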
Cross-Shard Operations
Most distributed databases struggle with operations that span multiple shards. HeliosDB provides optimized support for the common patterns.
Handling Joins Across Shards
Pattern 1: Co-located Join (Best)
Prerequisite: Both tables sharded by same key.
```sql
-- Tables sharded by customer_id
CREATE TABLE customers (
    customer_id BIGINT PRIMARY KEY,
    name VARCHAR(100)
) SHARD BY (customer_id);

CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    total DECIMAL(10,2)
) SHARD BY (customer_id);

-- Query: single-shard join (all data on one shard)
SELECT c.name, SUM(o.total) AS customer_total
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
WHERE c.customer_id = 12345
GROUP BY c.name;

-- Execution:
-- 1. Route by customer_id = 12345 → shard 7
-- 2. Both tables have data on shard 7
-- 3. The join executes locally (no network traffic)
-- 4. Latency: 1-5ms
```

Performance: ~1ms per join (local execution)
Pattern 2: Reference Table Join
Prerequisite: One table is small, replicated on all shards.
```sql
-- Small reference table (replicated)
CREATE TABLE categories (
    category_id INT PRIMARY KEY,
    category_name VARCHAR(50)
) WITH (table_type = 'reference');  -- replicated to all shards

-- Large table sharded by product_id
CREATE TABLE products (
    product_id BIGINT PRIMARY KEY,
    category_id INT NOT NULL,
    name VARCHAR(100)
) SHARD BY (product_id);

-- Query: reference join (no network traffic)
SELECT p.name, c.category_name
FROM products p
JOIN categories c ON p.category_id = c.category_id
WHERE p.product_id = 999;

-- Execution:
-- 1. Route products by product_id → shard 2
-- 2. Categories are available locally on every shard
-- 3. The join executes locally
-- 4. Latency: 1-5ms
```

Performance: ~2ms per join
Constraints: Reference table must fit in memory; max size ~1GB per shard
Pattern 3: Broadcast Join
Prerequisite: One table is small enough to broadcast.
```sql
-- Small table: department info
CREATE TABLE departments (
    dept_id INT PRIMARY KEY,
    dept_name VARCHAR(100)
);  -- not sharded; small

-- Large table: employees
CREATE TABLE employees (
    emp_id BIGINT PRIMARY KEY,
    dept_id INT NOT NULL,
    name VARCHAR(100),
    salary DECIMAL(10,2)  -- needed for the filter below
) SHARD BY (emp_id);

-- Query: broadcast join
SELECT e.name, d.dept_name
FROM employees e
JOIN departments d ON e.dept_id = d.dept_id
WHERE e.salary > 100000;

-- Execution:
-- 1. Broadcast the departments table to all shards
-- 2. Execute the join on each shard in parallel
-- 3. Gather the results
-- 4. Latency: 5-50ms (depends on the size of departments)
```

Performance: ~20ms for a typical broadcast join
Pattern 4: Repartition Join (Expensive)
Prerequisite: Tables sharded by different keys; no better option.
```sql
-- Table 1: orders, sharded by customer_id
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    quantity INT
) SHARD BY (customer_id);

-- Table 2: products, sharded by product_id
CREATE TABLE products (
    product_id BIGINT PRIMARY KEY,
    supplier_id BIGINT NOT NULL,
    name VARCHAR(100)
) SHARD BY (product_id);

-- Query: must repartition to join
SELECT o.order_id, p.name, o.quantity
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.customer_id = 12345;

-- Execution (HeliosDB optimizes):
-- 1. Fetch orders where customer_id = 12345 (single shard)  [1ms]
-- 2. Group by product_id                                    [1ms]
-- 3. Broadcast the product_ids to all product shards        [10ms]
-- 4. Fetch matching products from the product shards        [20ms]
-- 5. Stream results to the coordinator; perform the join    [5ms]
-- 6. Total: ~40ms (vs ~1ms for a co-located join)
```

Performance: 20-100ms depending on result set size
Cost: 10-100x slower than co-located join; avoid if possible.
Aggregations Across Shards
Single-Shard Aggregation (Fast)
```sql
-- Aggregate on a single shard (shard key in WHERE)
SELECT customer_id, COUNT(*) AS order_count, SUM(total) AS revenue
FROM orders
WHERE customer_id = 12345
GROUP BY customer_id;

-- Execution:
-- 1. Route to the customer's shard  → 1ms
-- 2. Compute the aggregate locally  → 2ms
-- 3. Return the result              → 1ms
-- Total: ~5ms
```

Multi-Shard Aggregation (Slower)
```sql
-- Aggregate across all shards (no shard key in WHERE)
SELECT customer_id, COUNT(*) AS order_count, SUM(total) AS revenue
FROM orders
GROUP BY customer_id;

-- Execution (two-phase):
-- Phase 1: partial aggregation on each shard    → 50ms (parallel)
-- Phase 2: final aggregation of partial results → 5ms
-- Total: ~55ms
```

Optimized Two-Phase Aggregation
```sql
-- HeliosDB automatically optimizes aggregations:
-- 1. Send the aggregation query to all shards
-- 2. Each shard computes a partial result (COUNT, SUM, etc.)
-- 3. The coordinator combines the partial results
-- 4. The coordinator computes the final result

-- For COUNT, SUM, AVG: cost grows linearly with shard count
-- For MIN, MAX: near-constant cost (early exit)
-- For DISTINCT: far higher cost (all distinct values must be collected at the coordinator)
```
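The coordinator's merge step for distributive aggregates is simple to sketch. Below is an illustrative Python version of phase 2, assuming each shard returns `(count, sum)` partials per group; note that AVG must be recomputed from the merged totals, never by averaging the per-shard averages:

```python
from collections import defaultdict

# Phase 1 output: each shard returns customer_id -> (count, sum) partials.
shard_partials = [
    {12345: (3, 120.00), 777: (1, 15.50)},  # shard 0
    {12345: (2, 80.00)},                    # shard 1
    {777: (4, 60.00), 999: (1, 9.99)},      # shard 2
]

def merge_partials(partials):
    """Phase 2: the coordinator combines partial COUNT/SUM results."""
    totals = defaultdict(lambda: [0, 0.0])
    for shard_result in partials:
        for customer_id, (cnt, subtotal) in shard_result.items():
            totals[customer_id][0] += cnt
            totals[customer_id][1] += subtotal
    return {c: {"count": n, "sum": s, "avg": s / n}
            for c, (n, s) in totals.items()}

print(merge_partials(shard_partials)[12345])
# {'count': 5, 'sum': 200.0, 'avg': 40.0}
```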
Transactions Spanning Shards
Single-Shard Transaction (Atomic)
```sql
-- All data on one shard (full ACID guarantees)
BEGIN;
    INSERT INTO orders (customer_id, total) VALUES (12345, 99.99);
    INSERT INTO order_items (order_id, product_id) VALUES (NEW_ORDER_ID, 42);
COMMIT;

-- Guarantees:
-- ✓ Atomicity: all or nothing
-- ✓ Consistency: constraints validated
-- ✓ Isolation: no partial reads
-- ✓ Durability: written to disk
```

Multi-Shard Transaction (Two-Phase Commit)
```sql
-- Data spans multiple shards (requires coordination)
BEGIN;
    UPDATE accounts SET balance = balance - 100 WHERE customer_id = 111;  -- shard A
    UPDATE accounts SET balance = balance + 100 WHERE customer_id = 222;  -- shard B
COMMIT;

-- Execution (two-phase commit):
-- Phase 1 (Prepare):
--   - Lock the affected rows on each shard
--   - Validate constraints
--   - Return the prepared state
-- Phase 2 (Commit):
--   - Write the changes on all shards
--   - If any shard fails, roll back all of them
--   - Release the locks
-- Latency: ~50-200ms (network overhead)
```
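For intuition, here is a toy two-phase commit coordinator in Python. It sketches the protocol's control flow only; the `Shard` stand-in is hypothetical, and a real implementation would persist prepare records and handle coordinator failure.

```python
class Shard:
    """Stand-in participant; a real shard would persist a prepare record."""
    def __init__(self, name):
        self.name, self.prepared = name, False
    def prepare(self, op) -> bool:   # Phase 1: lock rows + validate
        self.prepared = True
        return True
    def commit(self):                # Phase 2: make durable, release locks
        self.prepared = False
    def rollback(self):
        self.prepared = False

def two_phase_commit(ops_by_shard: dict) -> bool:
    shards = list(ops_by_shard)
    # Phase 1 (Prepare): every participant must vote yes.
    if not all(shard.prepare(op) for shard, op in ops_by_shard.items()):
        for shard in shards:
            shard.rollback()         # any "no" vote aborts all shards
        return False
    # Phase 2 (Commit): only reached once all participants are prepared.
    for shard in shards:
        shard.commit()
    return True

a, b = Shard("A"), Shard("B")
ok = two_phase_commit({a: "debit 111 by 100", b: "credit 222 by 100"})
print("committed" if ok else "rolled back")
```

The extra round trip and locking across shards is where the 50-200ms latency in the table below comes from.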
Cost Implications

| Scenario | Latency | Throughput | Recommended |
|---|---|---|---|
| Single-shard transaction | 1-5ms | 10K+ TPS | Yes, design for this |
| 2-shard transaction | 50-100ms | 100 TPS | Acceptable if rare |
| 3+ shard transaction | 100-500ms | 10 TPS | Avoid; redesign schema |
Best Practice: Design schemas to maximize single-shard transactions.
Operational Procedures
Adding a New Shard
Scenario: Cluster growing from 4 to 8 shards
```sql
-- Step 1: Create new physical nodes
-- (the operations team provisions the hardware)

-- Step 2: Initiate shard addition (admin command)
-- CLI: heliosdb shard add --count 4 --nodes [new-node-1, new-node-2, new-node-3, new-node-4]

-- Step 3: Monitor rebalancing progress
SELECT shard_id,
       source_shard,
       rebalance_status,
       progress_pct,
       bytes_transferred,
       estimated_completion
FROM system.rebalance_operations
ORDER BY estimated_completion;

-- Example output:
-- shard_id | status      | progress | bytes       | eta
-- ---------|-------------|----------|-------------|--------------------
-- shard_4  | in_progress | 45%      | 450GB / 1TB | 2024-12-31 14:30:00
-- shard_5  | in_progress | 23%      | 230GB / 1TB | 2024-12-31 15:15:00
-- shard_6  | queued      | 0%       | 0GB / 1TB   | 2024-12-31 16:00:00

-- Step 4: Automatic load balancing
-- HeliosDB automatically:
-- 1. Splits hot shards across the new shards
-- 2. Migrates data with zero downtime
-- 3. Updates the hash ring atomically
-- 4. Redirects traffic gradually (canary approach)

-- Step 5: Verify completion
SELECT COUNT(*) AS active_shards FROM system.shards WHERE status = 'active';
-- Result: 8 shards once the operation completes
```

Time Estimate: 1-4 hours per TB of data (depends on network bandwidth)
Zero-Downtime: Application continues running; no client-side changes needed.
Removing a Shard
Scenario: Decommissioning an old node
```sql
-- Step 1: Mark the shard for decommissioning
-- CLI: heliosdb shard decommission --shard shard_3

-- Step 2: Monitor drain progress
SELECT shard_id,
       remaining_bytes,
       records_remaining,
       estimated_completion
FROM system.shard_decommission_status
WHERE shard_id = 'shard_3';

-- Step 3: Automatic data migration
-- HeliosDB redistributes data from shard_3 to the active shards
-- Migration happens in the background with no client impact

-- Step 4: Hardware decommission (after data migration completes)
-- The operations team powers down the node
```

Precaution: ensure 2x replication before removal (don't lose data).
Time Estimate: 2-6 hours per TB being removed.
Rebalancing Shards
Scenario: Cluster has become imbalanced (some shards hot, others cold)
```sql
-- Step 1: Analyze the current load distribution
SELECT shard_id,
       record_count,
       size_mb,
       cpu_usage,
       memory_usage,
       request_rate_rps,
       load_score
FROM system.shard_metrics
ORDER BY load_score DESC;

-- Example (imbalanced):
-- shard_id | records   | size | cpu  | memory | rps    | load
-- ---------|-----------|------|------|--------|--------|------
-- shard_0  | 5,000,000 | 500  | 0.85 | 0.78   | 50,000 | 0.81  ← hot
-- shard_1  | 2,000,000 | 200  | 0.40 | 0.35   | 15,000 | 0.38
-- shard_2  | 100,000   | 10   | 0.02 | 0.01   | 500    | 0.01  ← cold

-- Step 2: Generate a rebalance plan
SELECT operation_type,
       source_shard,
       target_shard,
       estimated_impact,
       estimated_duration_minutes
FROM system.rebalance_plan
WHERE plan_id = (SELECT latest_plan_id FROM system.rebalance_plans);

-- Example output:
-- operation | source  | target   | impact                 | duration
-- ----------|---------|----------|------------------------|---------
-- split     | shard_0 | shard_0a | reduce cpu 0.85 → 0.45 | 120
-- split     | shard_0 | shard_0b | reduce cpu 0.85 → 0.45 | 120
-- merge     | shard_2 | shard_1  | consolidate cold shard | 30

-- Step 3: Execute the rebalance plan
-- CLI: heliosdb cluster rebalance --plan-id <plan-id> --execute

-- Step 4: Monitor rebalancing
SELECT * FROM system.rebalance_operations
ORDER BY start_time DESC;

-- Step 5: Verify the balanced state
SELECT AVG(load_score)    AS avg_load,
       STDDEV(load_score) AS load_stddev,
       MAX(load_score)    AS max_load
FROM system.shard_metrics;
-- Target: load_stddev < 0.1 (< 10% variance)
```

Monitoring Shard Health
Key Metrics to Monitor
```sql
-- Shard size distribution
SELECT shard_id,
       ROUND(size_mb / 1024.0, 2) AS size_gb,
       record_count,
       ROUND(100.0 * size_mb / (SELECT SUM(size_mb) FROM system.shard_stats), 2) AS pct_total
FROM system.shard_stats
ORDER BY size_mb DESC;

-- Load distribution (check for hotspots)
SELECT shard_id,
       ROUND(cpu_usage * 100, 2)    AS cpu_pct,
       ROUND(memory_usage * 100, 2) AS mem_pct,
       requests_per_sec,
       p99_latency_ms
FROM system.shard_metrics
WHERE requests_per_sec > 0
ORDER BY cpu_usage DESC;

-- Replication status
SELECT shard_id,
       primary_node,
       replication_lag_ms,
       replica_status,
       mirror_node_1,
       mirror_node_2
FROM system.shard_replication_status
WHERE replica_status != 'healthy';  -- show problems only

-- Rebalancing activity
SELECT shard_id,
       operation_type,
       start_time,
       estimated_completion,
       progress_pct,
       status
FROM system.rebalance_operations
WHERE status IN ('in_progress', 'queued');

-- Hotspot detection
SELECT shard_id,
       top_key,
       key_frequency,
       ROUND(100.0 * key_frequency / total_keys, 2) AS pct_of_shard
FROM system.shard_hotspot_analysis
WHERE key_frequency / total_keys > 0.05  -- key holds > 5% of the shard
ORDER BY key_frequency DESC;
```

Alerting Rules
```yaml
# Example Prometheus alerting rules
- alert: ShardHotspot
  expr: heliosdb_shard_cpu_usage > 0.80
  for: 5m
  annotations:
    summary: "Shard {{ $labels.shard_id }} CPU > 80%"
    action: "Consider splitting shard {{ $labels.shard_id }}"

- alert: ShardImbalance
  expr: stddev(heliosdb_shard_load_score) > 0.15
  for: 10m
  annotations:
    summary: "Cluster load imbalance > 15%"
    action: "Run cluster rebalance"

- alert: ReplicationLag
  expr: heliosdb_shard_replication_lag_ms > 5000
  for: 2m
  annotations:
    summary: "Shard {{ $labels.shard_id }} replication lag > 5s"
    action: "Check the network; more bandwidth may be needed"
```

Sharding + Replication Combined
Sharding (horizontal distribution) and replication (fault tolerance) are complementary techniques commonly used together.
Architecture
```
Cluster: 4 nodes, 8 shards, 2x replication

┌──────────────┬──────────────┬──────────────┬──────────────┐
│    Node A    │    Node B    │    Node C    │    Node D    │
│ Shard 0 (P)  │ Shard 1 (P)  │ Shard 2 (P)  │ Shard 3 (P)  │
│ Shard 4 (M)  │ Shard 5 (M)  │ Shard 6 (M)  │ Shard 7 (M)  │
└──────────────┴──────────────┴──────────────┴──────────────┘
P = primary replica, M = mirror replica
```
Layout:
- Node A: Shard 0 (primary), Shard 4 (mirror)
- Node B: Shard 1 (primary), Shard 5 (mirror)
- Node C: Shard 2 (primary), Shard 6 (mirror)
- Node D: Shard 3 (primary), Shard 7 (mirror)
Each shard has 1 primary and 1 mirror replica.
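This layout can be thought of as a shard → (primary, mirror) map that failover simply rotates. The Python sketch below uses a simplified, hypothetical placement (it does not reproduce the exact mirror assignments above) to show the promotion step described in the failover scenario later in this section:

```python
# Hypothetical shard -> [primary, mirror] placement map.
placement = {
    "shard_0": ["node-a", "node-b"],
    "shard_1": ["node-b", "node-c"],
    "shard_2": ["node-c", "node-d"],
    "shard_3": ["node-d", "node-a"],
}

def fail_node(node: str) -> None:
    """On node failure, promote each affected shard's mirror to primary."""
    for shard, replicas in placement.items():
        if replicas[0] == node:
            replicas.reverse()  # mirror becomes the new primary
            print(f"{shard}: promoted {replicas[0]} to primary")

fail_node("node-a")  # shard_0: promoted node-b to primary
```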
Write Path (Consistency)
```
Application write request
        │
        ▼
Route by shard key → Shard 0
        │
        ▼
Write to primary (Node A)
        │
        ├──► Replicate to mirror (Node B), async in the background
        │
        ▼
Primary completes the write → return to client
        │
        ▼
Mirror ACKs replication (async)
```
Latency:
- Write to primary: 1-2ms
- Return to client: 2-5ms (doesn’t wait for mirror ACK)
- Mirror replication: 5-50ms asynchronously
Durability:
- Immediate: Stored on primary node
- Replicated: Mirrored to backup node within 100ms
Read Path (Availability)
```
Application read request
        │
        ▼
Route by shard key → Shard 0
        │
        ▼
Default: read from primary (strong consistency)
        │
        ▼
Return to client (1-5ms)

Alternative: WITH (read_from = 'replica')
        │
        ▼
Read from mirror on Node B (eventual consistency)
        │
        ▼
Return to client (1-5ms)
```
Failover Scenario
```
Node A (primary for Shard 0) fails
        │
        ▼
Health checker detects it offline (10s)
        │
        ▼
Promote mirror (Node B) to primary
        │
        ▼
Update the routing table
        │
        ▼
New primary on Node B
- Client requests are now routed to Node B
- Previous client connections may time out (graceful reconnect)
- RTO (Recovery Time Objective): ~15s
- RPO (Recovery Point Objective): ~100ms
```
Configuration
```sql
-- Table with sharding and replication
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    total DECIMAL(10,2)
) SHARD BY (customer_id) WITH (
    -- Sharding configuration
    shard_count = 16,
    virtual_nodes_per_shard = 150,
    auto_rebalance = true,
    split_threshold = 0.80,
    merge_threshold = 0.20,

    -- Replication configuration
    replication_factor = 2,
    replication_strategy = 'balanced',
    consistency_level = 'strong'  -- wait for replica confirmation
);

-- Read with replica preference
SELECT * FROM orders
WHERE customer_id = 12345
WITH (
    read_preference = 'nearest',  -- read from the nearest node
    consistency = 'eventual'      -- slightly stale data is acceptable
);
```

Combined Benefits
```
Sharding benefits                 Replication benefits
├─ 4x capacity (4 nodes)          ├─ 2x availability (survive 1 failure)
├─ 4x throughput                  ├─ Read scaling (2 replicas)
└─ Low latency (local shard)      └─ Data durability

Combined: 4x capacity + 2x availability + read distribution
```

Real-World Examples
Example 1: E-Commerce Order System
Requirements
- 100 million customers
- 1 billion orders/year (~2.7M/day)
- Need < 50ms latency for customer queries
- Multi-region deployment (US, EU, APAC)
Design
```sql
-- Customers table (sharded by customer_id)
CREATE TABLE customers (
    customer_id BIGINT PRIMARY KEY,
    email VARCHAR(100) NOT NULL UNIQUE,
    name VARCHAR(100),
    region VARCHAR(10),  -- 'US', 'EU', 'APAC'
    created_at TIMESTAMP
) SHARD BY HASH(customer_id) WITH (
    shard_count = 128,       -- room to grow to 128+ nodes
    replication_factor = 2,
    region_affinity = true   -- replicas in the same region
);

-- Orders table (co-located with customers)
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    order_date DATE NOT NULL,
    total DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) NOT NULL
) SHARD BY (customer_id)              -- same shard key as customers
  PARTITION BY RANGE (order_date) (   -- partition for retention
    PARTITION p2024_q4 VALUES LESS THAN ('2025-01-01'),
    PARTITION p2025_q1 VALUES LESS THAN ('2025-04-01'),
    PARTITION archive_old VALUES LESS THAN (MAXVALUE)
  ) WITH (
    shard_count = 128,       -- match the customer shards
    replication_factor = 2,
    region_affinity = true
);

-- Order items (co-located with orders by order_id)
CREATE TABLE order_items (
    order_id BIGINT NOT NULL,
    item_seq INT NOT NULL,
    product_id BIGINT NOT NULL,
    quantity INT,
    price DECIMAL(10,2),
    PRIMARY KEY (order_id, item_seq)
) SHARD BY (order_id);  -- distribute by order

-- Products (reference table; replicated everywhere)
CREATE TABLE products (
    product_id BIGINT PRIMARY KEY,
    name VARCHAR(100),
    category VARCHAR(50),
    price DECIMAL(10,2)
) WITH (table_type = 'reference');  -- replicated to all shards
```

Query Examples
```sql
-- Query 1: Get a customer's orders (single shard)
SELECT o.order_id, o.order_date, o.total, o.status
FROM orders o
WHERE o.customer_id = 12345
ORDER BY o.order_date DESC;
-- Execution: single shard → ~5ms

-- Query 2: Enrich orders with customer info (co-located join)
SELECT c.name, o.order_id, o.total, o.order_date
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
WHERE c.customer_id = 12345;
-- Execution: single shard (both tables sharded by customer_id) → ~5ms

-- Query 3: Order with items and products (broadcast join)
SELECT o.order_id, p.name, oi.quantity, oi.price
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
WHERE o.customer_id = 12345;
-- Execution:
-- 1. Fetch from the orders shard (1ms)
-- 2. Products table is locally available (reference table)
-- 3. Join on the shard (2ms)
-- Total: ~5ms

-- Query 4: Top customers (multi-shard aggregation)
SELECT customer_id, COUNT(*) AS order_count, SUM(total) AS revenue
FROM orders
WHERE order_date >= '2024-01-01'
GROUP BY customer_id
ORDER BY revenue DESC
LIMIT 100;
-- Execution:
-- 1. Parallel: aggregate on each of the 128 shards (50ms)
-- 2. Merge the partial results (5ms)
-- Total: ~55ms
```

Capacity Planning
```
Assumptions:
- 100M customers
- 1B orders/year ≈ 2.7M/day ≈ 32/second
- Average order size: 1KB

Storage:
- Customers: 100M * 500B = 50GB
- Orders: 1B * 1KB = 1TB
- Total data: ~1TB

With 128 shards:
- Data per shard: 1TB / 128 ≈ 8GB
- Fits easily in cache + hot data

Shards needed:
- Data distribution: 128 shards for 1TB (8GB/shard)
- Write throughput: 32 writes/sec cluster-wide (0.25/shard)
- Read throughput: 1,000 reads/sec cluster-wide (8/shard)
- CPU: < 5% per shard

Nodes needed:
- Start: 4 nodes (32 shards per node)
- Growth: add nodes dynamically; rebalancing is automatic
- Final: 16-32 nodes for redundancy + read replicas
```

Example 2: Time-Series Metrics System
Requirements
- 10,000 monitored hosts
- 1 million metrics/second from all hosts
- Metrics: CPU, memory, disk, network (100+ metrics/host)
- Retention: 30 days hot, 1 year cold
- Queries: Last 7 days, last 24 hours, hour-level aggregates
Design
```sql
-- Metrics table (sharded by host_id, partitioned by time)
CREATE TABLE metrics (
    host_id BIGINT NOT NULL,
    metric_name VARCHAR(100) NOT NULL,
    collected_at TIMESTAMP NOT NULL,
    value DECIMAL(10,2) NOT NULL,
    tags JSONB,
    PRIMARY KEY (host_id, metric_name, collected_at)
) SHARD BY HASH(host_id)               -- distribute across hosts
  PARTITION BY RANGE (collected_at) (  -- time-based partitions
    PARTITION p2024_12_25 VALUES LESS THAN ('2024-12-26'),
    PARTITION p2024_12_26 VALUES LESS THAN ('2024-12-27'),
    PARTITION p2024_12_27 VALUES LESS THAN ('2024-12-28'),
    PARTITION p2024_12_28 VALUES LESS THAN ('2024-12-29'),
    -- ... one partition per day for 30 days
    PARTITION p_archive VALUES LESS THAN (MAXVALUE)  -- older data
  ) WITH (
    shard_count = 64,        -- ~156 hosts per shard
    replication_factor = 2,
    ttl = '30 days',         -- auto-delete after 30 days
    compression = 'zstd'     -- compress old partitions
);

-- Host metadata (reference table)
CREATE TABLE hosts (
    host_id BIGINT PRIMARY KEY,
    hostname VARCHAR(255),
    region VARCHAR(10),
    environment VARCHAR(20),  -- 'prod', 'staging', 'dev'
    cpu_cores INT,
    memory_gb INT
) WITH (table_type = 'reference');

-- Metric definitions (reference table)
CREATE TABLE metric_definitions (
    metric_name VARCHAR(100) PRIMARY KEY,
    description VARCHAR(500),
    unit VARCHAR(50),
    aggregation_allowed VARCHAR(50)  -- 'sum', 'avg', 'min', 'max'
) WITH (table_type = 'reference');
```

Query Examples
```sql
-- Query 1: Single host, last 24 hours
SELECT collected_at, value
FROM metrics
WHERE host_id = 1001
  AND metric_name = 'cpu_usage'
  AND collected_at >= NOW() - INTERVAL '24 hours'
ORDER BY collected_at DESC;
-- Execution: single shard, partition pruning → ~10ms
-- Data accessed: ~24 * 60 points ≈ 1,440 points

-- Query 2: All hosts, last hour (aggregated)
SELECT host_id,
       metric_name,
       DATE_TRUNC('minute', collected_at) AS minute,
       AVG(value) AS avg_value,
       MIN(value) AS min_value,
       MAX(value) AS max_value
FROM metrics
WHERE collected_at >= NOW() - INTERVAL '1 hour'
GROUP BY host_id, metric_name, minute
ORDER BY minute DESC;
-- Execution:
-- 1. Parallel: 64 shards scan the latest partition (50ms)
-- 2. Streaming aggregation on each shard (20ms)
-- 3. Final aggregation (10ms)
-- Total: ~80ms
-- Data returned: ~10K hosts * 100 metrics * 60 minutes = 60M points

-- Query 3: Alert detection (real-time anomaly)
SELECT host_id, metric_name, collected_at, value
FROM metrics
WHERE collected_at >= NOW() - INTERVAL '5 minutes'
  AND metric_name IN ('cpu_usage', 'memory_pct')
  AND (
       (metric_name = 'cpu_usage'  AND value > 0.90)   -- CPU > 90%
    OR (metric_name = 'memory_pct' AND value > 0.85)   -- memory > 85%
  );
-- Execution: parallel scan of the active partitions (~50ms)
-- The filter is applied on each shard
-- Anomalies are streamed to the alerting system
```

Capacity Planning
```
Assumptions:
- 10,000 hosts
- 100 metrics/host/second = 1M metric points/second
- Data size: 100 bytes per metric point

Storage:
- Daily ingestion: 1M * 86,400 = 86.4B points ≈ 8.6TB/day
- 30-day hot retention: 30 * 8.6TB = 258TB
- 1-year archive: 365 * 8.6TB ≈ 3.1PB

Shards and nodes:
- Hot data: 258TB / 64 shards ≈ 4TB/shard
- Hot working set fits in SSD cache (100-200GB per shard)
- Nodes: 64 shards / 16 shards per node = 4 nodes

Write throughput:
- 1M metric points/second
- Across 64 shards: 15,625 writes/second per shard
- Sustainable with proper batch insertion

Data flow:
- Metrics agents run on each host
- They send to an aggregation tier (load-balanced)
- The tier batches writes (1,000 metrics per batch)
- Writes are routed by shard key (host_id)
- Replication to the backup node
```
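The batching step in the data flow is what makes the per-shard write rate sustainable. Below is a simplified Python sketch of an aggregation-tier writer; `NUM_SHARDS`, `BATCH_SIZE`, and the hash routing are all stand-in assumptions, not HeliosDB client APIs.

```python
from collections import defaultdict

NUM_SHARDS = 64
BATCH_SIZE = 1000  # ~1,000 metrics per batch, as in the data flow above

def shard_for(host_id: int) -> int:
    return hash(host_id) % NUM_SHARDS  # stand-in for the real hash routing

class BatchingWriter:
    """Buffer points per shard and flush in batches, so each shard sees
    large inserts instead of ~15K tiny writes per second."""
    def __init__(self, flush_fn):
        self.flush_fn = flush_fn
        self.buffers = defaultdict(list)

    def write(self, host_id, metric_name, value):
        shard_id = shard_for(host_id)
        buf = self.buffers[shard_id]
        buf.append((host_id, metric_name, value))
        if len(buf) >= BATCH_SIZE:
            self.flush(shard_id)

    def flush(self, shard_id):
        rows = self.buffers.pop(shard_id, [])
        if rows:
            self.flush_fn(shard_id, rows)

writer = BatchingWriter(lambda s, rows: print(f"shard {s}: {len(rows)} rows"))
for i in range(70_000):
    writer.write(i % 10_000, "cpu_usage", 0.42)
for shard_id in list(writer.buffers):
    writer.flush(shard_id)  # flush any remaining partial batches
```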
Example 3: Multi-Tenant SaaS Platform
Requirements
- 10,000 customer tenants
- Variable data per tenant (1GB - 100GB)
- Complete tenant isolation
- Tenant → shard mapping explicit
Design
```sql
-- Shard directory (explicit tenant → shard mapping)
CREATE TABLE tenant_shard_directory (
    tenant_id BIGINT PRIMARY KEY,
    shard_id VARCHAR(50) NOT NULL,
    primary_node VARCHAR(100) NOT NULL,
    replica_nodes TEXT NOT NULL,  -- CSV: node1,node2
    shard_status VARCHAR(20) NOT NULL DEFAULT 'active',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_migrated TIMESTAMP,
    UNIQUE (shard_id, primary_node)
) WITH (
    replication_factor = 3,  -- directory replicated for availability
    shard_count = 4          -- the directory has its own shards
);

-- Tenant metadata
CREATE TABLE tenants (
    tenant_id BIGINT PRIMARY KEY,
    company_name VARCHAR(255) NOT NULL,
    plan VARCHAR(20) NOT NULL,    -- 'starter', 'pro', 'enterprise'
    status VARCHAR(20) NOT NULL,  -- 'active', 'paused', 'deleted'
    created_at TIMESTAMP
) WITH (table_type = 'reference');  -- replicated everywhere

-- Tenant-specific data (example: projects table)
CREATE TABLE tenant_projects (
    tenant_id BIGINT NOT NULL,
    project_id BIGINT NOT NULL,
    project_name VARCHAR(255),
    created_at TIMESTAMP,
    PRIMARY KEY (tenant_id, project_id)
) SHARD BY (tenant_id) WITH (
    shard_count = 100,  -- 100 shards can serve 10K tenants
    replication_factor = 2
);

-- All other tenant tables are sharded by tenant_id in the same way
CREATE TABLE tenant_tasks (
    tenant_id BIGINT NOT NULL,
    task_id BIGINT NOT NULL,
    project_id BIGINT,
    title VARCHAR(255),
    PRIMARY KEY (tenant_id, task_id)
) SHARD BY (tenant_id);
```

Routing Logic
Application layer (pseudocode):
```
GET /api/v1/projects?tenant_id=1234

1. Look up the tenant's shard:
   shard_info = cache.get('tenant:1234:shard')
   if not shard_info:
       shard_info = db.query('SELECT shard_id, primary_node
                              FROM tenant_shard_directory
                              WHERE tenant_id = 1234')
       cache.set('tenant:1234:shard', shard_info, TTL=3600s)

2. Route to the shard:
   projects = db.query_shard(shard_info.primary_node,
                             'SELECT * FROM tenant_projects WHERE tenant_id = 1234')

3. Return to the client
```

Advantages:
✓ Explicit control over data placement
✓ Tenant isolation enforced at the shard level
✓ Easy to move a tenant between shards (update the directory)
✓ Directory caching minimizes lookup overhead

Tenant Migration (Move to a Different Shard)
```sql
-- Scenario: moving tenant 1234 from shard_0 to shard_1 due to growth

-- Step 1: Update the directory entry (transactional)
BEGIN;
    UPDATE tenant_shard_directory
    SET shard_id = 'shard_1_new',
        primary_node = 'node-x',
        shard_status = 'migrating'
    WHERE tenant_id = 1234;
COMMIT;

-- Step 2: Background migration job
-- - Copy all tenant data from shard_0 to shard_1
-- - Dual-write to both shards during the migration window
-- - Verify consistency (checksums)
-- - Cutover: redirect reads/writes to the new shard

-- Step 3: Update the directory (commit the migration)
UPDATE tenant_shard_directory
SET shard_status = 'active',
    last_migrated = NOW()
WHERE tenant_id = 1234;

-- Step 4: Clean up the old shard
DELETE FROM tenant_projects
WHERE tenant_id = 1234 AND shard_id = 'shard_0';
```

Capacity Planning
Methodology
- Estimate data volume (total dataset size)
- Determine shard capacity (data per shard)
- Calculate shard count (volume / capacity)
- Plan node count (consider shards per node + replication)
- Add headroom (growth buffer)
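To make this methodology concrete, the small helper below mechanizes the arithmetic. All inputs are planning assumptions, and the power-of-two rounding mirrors the worked example that follows:

```python
import math

def plan_shards(total_bytes: float, shard_target_bytes: float,
                shards_per_node: int, replication_factor: int,
                growth_factor: float = 1.0) -> dict:
    """Shard/node count from data volume and per-shard capacity targets."""
    raw = (total_bytes * growth_factor) / shard_target_bytes
    shards = 2 ** math.ceil(math.log2(raw))  # round up to a power of 2
    nodes = math.ceil(shards / shards_per_node) * replication_factor
    return {"shards": shards, "physical_nodes": nodes}

GB, TB = 1024**3, 1024**4
# The worked example below: 1TB data, 20GB shards, 16 shards/node, 2x replication.
print(plan_shards(1 * TB, 20 * GB, shards_per_node=16, replication_factor=2))
# -> {'shards': 64, 'physical_nodes': 8}
```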
Example Calculation
Scenario: 10 million users, 100KB average per user
```
1. Data volume
   Total: 10M users * 100KB/user = 1TB

2. Shard capacity
   Target: 10-50GB per shard (fits in hot storage)
   Choose: 20GB per shard

3. Shard count
   Shards needed: 1TB / 20GB = 50 shards
   Round up: 64 shards (power of 2 for hashing)

4. Node count
   Shards per node: 16 (balanced)
   Nodes: 64 / 16 = 4 nodes
   With 2x replication: 8 physical nodes (4 primary + 4 replica)

5. Growth buffer
   Plan for 5x growth: 4 nodes → 20 nodes
   Re-shard gradually as load increases
```

Shard Size Recommendations
| Dataset Size | Shard Count | Shard Size | Nodes | Notes |
|---|---|---|---|---|
| 10GB | 1 | 10GB | 1 | Single shard; no distribution |
| 100GB | 4 | 25GB | 2-4 | Starter cluster |
| 1TB | 16 | 64GB | 4-8 | Small production |
| 10TB | 64 | 156GB | 16-32 | Medium production |
| 100TB | 256 | 390GB | 64-128 | Large cluster |
| 1PB+ | 1024+ | 1TB | 256+ | Mega-cluster |
When NOT to Shard
Small Datasets
```
Dataset < 500GB
  ↓
Fits on a single node
  ↓
Sharding adds complexity with minimal benefit
  ↓
Recommendation: use a single node or simple replication
```
Low Throughput
```
Throughput < 5K req/sec
  ↓
A single node can handle it
  ↓
The network latency of sharding outweighs the benefit
  ↓
Recommendation: vertical scaling (a bigger node) is sufficient
```
Complex Cross-Shard Operations
```
Most queries require joins across different shard keys
  ↓
Repartition joins have high latency
  ↓
Network traffic dominates the query cost
  ↓
Recommendation: redesign the schema to enable co-located joins
```
Strong ACID Guarantees Required
```
Frequent multi-shard transactions
  ↓
Distributed two-phase commit overhead (50-500ms)
  ↓
Single-node transactions are one to two orders of magnitude faster (1-5ms)
  ↓
Recommendation: a single node; replicate for availability
```
Operational Complexity Unacceptable
```
No experienced DevOps team
  ↓
Sharding requires specialized operational expertise
  ↓
Failure scenarios are complex
  ↓
Recommendation: use a managed database service OR a single node
```
Migration Strategies
Strategy 1: Rolling Migration (Recommended)
Approach: Gradually migrate data from single node to sharded cluster.
```
Phase 1: Dual-write
├─ New writes go to BOTH the old (single node) and the new (sharded cluster)
├─ Reads come from the old node (single source of truth)
└─ Duration: 1-7 days (until the new cluster is populated)

Phase 2: Shadow reads
├─ Begin shadow reads from the new cluster
├─ Compare results with the old cluster
├─ Monitor discrepancies
└─ Duration: 1-7 days (until confident)

Phase 3: Cutover
├─ Redirect reads to the new cluster
├─ Keep writing to both during a safety window
├─ Monitor for issues
└─ Duration: 1 day

Phase 4: Cleanup
├─ Stop writing to the old node
├─ Archive the old data
├─ Decommission the hardware
└─ Duration: 1-2 days
```
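Phases 1 and 2 are usually implemented as a thin wrapper in the application's data-access layer. A minimal sketch, assuming a `MemoryDB` stand-in with an `execute` interface (both hypothetical):

```python
class MemoryDB:
    """Minimal stand-in for a database client."""
    def __init__(self):
        self.data = {}
    def execute(self, stmt, params):
        if stmt == "put":
            key, value = params
            self.data[key] = value
        return self.data.get(params[0])

class DualWriter:
    """Mirror writes to both clusters; reads stay on the old cluster,
    with shadow reads comparing the new cluster's answers."""
    def __init__(self, old_db, new_db, on_mismatch=print):
        self.old, self.new, self.on_mismatch = old_db, new_db, on_mismatch

    def write(self, stmt, params):
        self.old.execute(stmt, params)      # source of truth: must succeed
        try:
            self.new.execute(stmt, params)  # best-effort while backfilling
        except Exception as exc:
            self.on_mismatch(f"new-cluster write failed, replay later: {exc}")

    def shadow_read(self, stmt, params):
        rows = self.old.execute(stmt, params)
        if self.new.execute(stmt, params) != rows:  # Phase 2 comparison
            self.on_mismatch(f"shadow-read mismatch for params={params}")
        return rows                                  # the old cluster still wins

old, new = MemoryDB(), MemoryDB()
db = DualWriter(old, new)
db.write("put", (1, "order-123"))
print(db.shadow_read("get", (1,)))  # order-123; no mismatch reported
```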
Advantages:
- Zero downtime
- Easy rollback if issues discovered
- Safe; can revert to old cluster
Disadvantages:
- Long migration duration
- Operational complexity (dual-write maintenance)
- Disk space required for old + new cluster
Strategy 2: Bulk Rebalance
Approach: Stop application, migrate data, restart.
```
1. Schedule a maintenance window (e.g., 2:00 AM Sunday)
2. Drain application connections (graceful shutdown)
3. Perform a bulk data export from the single node
4. Import into the sharded cluster (map rows to shards)
5. Verify data integrity
6. Update the application to connect to the new cluster
7. Restart the application
8. Monitor for issues

Downtime: 30 min - 2 hours (depends on data volume)
```
Advantages:
- Simple; fewer code changes
- Fast (no dual-write overhead)
Disadvantages:
- Requires downtime
- Riskier (less time for verification)
- Harder to rollback
Strategy 3: Hybrid (Recommended for Large Datasets)
Approach: Combination of above.
```
1. Use rolling migration for 80% of the data (parallel processing)
2. Move the final 20% via bulk import during a brief maintenance window
3. Verify during the migration
4. Quick cutover during the maintenance window

Downtime: < 30 minutes (just the cutover)
Duration: 2-4 weeks for the complete migration
```
Troubleshooting
Problem: Uneven Shard Distribution
Symptom: SELECT shard_id, size_mb FROM system.shard_stats shows wide variance.
```
Example (bad):
shard_id | size_mb
---------|--------
shard_0  | 500
shard_1  | 50
shard_2  | 480
shard_3  | 60

Standard deviation ≈ 218 (very high imbalance)
```
Diagnosis:
```sql
-- Check shard key cardinality
SELECT COUNT(DISTINCT shard_key_value) AS cardinality,
       COUNT(*) AS total_records
FROM table_name;

-- Expected: cardinality should be >> shard_count
-- If cardinality < shard_count * 10, the shard key is a poor choice

-- Identify hotspot keys
SELECT shard_key_value, COUNT(*) AS frequency
FROM table_name
GROUP BY shard_key_value
ORDER BY frequency DESC
LIMIT 10;

-- If one key holds > 5% of the records, that's a hotspot
```
Solutions:
- Better shard key: change to a higher-cardinality column
- Hash the key: use `HASH(current_key)` instead of the raw value
- Composite key: combine multiple columns
- Rebalancing: use HeliosDB's automatic split/merge
```sql
-- Change the shard key (requires a table rebuild)
CREATE TABLE table_name_new (...)
SHARD BY HASH(better_key_column);

-- Migrate the data
INSERT INTO table_name_new SELECT * FROM table_name;

-- Swap the tables
ALTER TABLE table_name RENAME TO table_name_backup;
ALTER TABLE table_name_new RENAME TO table_name;
DROP TABLE table_name_backup;
```

Problem: High Cross-Shard Join Latency
Symptom: Queries joining two tables take 100ms+ (much slower than expected).
Diagnosis:
```sql
-- Check the shard key for each table
SELECT table_name, shard_key FROM system.table_metadata;

-- If the shard keys differ:
EXPLAIN SELECT * FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id;
-- Shows: RepartitionJoin (high cost)
```
Solutions:
1. Co-locate the tables: use the same shard key

   ```sql
   CREATE TABLE table2 (...) SHARD BY (table1_shard_key);
   ```

2. Reference table: if table2 is small

   ```sql
   CREATE TABLE table2 (...) WITH (table_type = 'reference');
   ```

3. Denormalize: combine the tables

   ```sql
   CREATE TABLE combined (...) SHARD BY (shard_key);
   ```

4. Accept the latency: some cross-shard joins are unavoidable; optimize elsewhere
Problem: Hotspot in Shard (High CPU/Memory)
Symptom: One shard has 80%+ CPU while others idle.
Diagnosis:
```sql
-- Identify the hot shard
SELECT shard_id, cpu_usage, memory_usage, request_count
FROM system.shard_metrics
ORDER BY cpu_usage DESC;

-- Identify hot keys within the shard
SELECT shard_key_value, request_count, frequency
FROM system.shard_hotspot_analysis
WHERE shard_id = 'shard_0'
ORDER BY request_count DESC
LIMIT 20;
```
Solutions:
1. Automatic split: HeliosDB splits the shard automatically

   ```
   Monitoring: detect cpu_usage > 0.80 for 5 minutes
   Action: split the shard into two child shards
   Result: load is distributed
   ```

2. Manual split:

   ```sql
   -- CLI: heliosdb shard split --shard shard_0 --target 2
   -- Splits shard_0 into shard_0a and shard_0b
   ```

3. Read from replicas:

   ```sql
   SELECT * FROM table_name
   WHERE shard_key = hotspot_key
   WITH (read_preference = 'replica');  -- read from a replica, not the primary
   ```

4. Cache in the application: if the queries are cacheable

   ```
   The application caches results locally,
   reducing load on the shard
   ```
Problem: Rebalancing Takes Too Long
Symptom: SELECT * FROM system.rebalance_operations shows operations stuck in progress for hours.
Diagnosis:
```sql
-- Check rebalance progress
SELECT operation_id,
       operation_type,
       start_time,
       progress_pct,
       estimated_completion,
       CURRENT_TIMESTAMP - start_time AS elapsed
FROM system.rebalance_operations
WHERE status = 'in_progress';
```
Causes:
- Low network bandwidth between nodes
- Slow storage (disk I/O bottleneck)
- Too many concurrent operations (default limit: 2)
- Large shard size (> 100GB)
Solutions:
```sql
-- Reduce concurrent operations (less network contention)
ALTER SYSTEM SET sharding.max_concurrent_rebalance = 1;

-- Adjust throttling (slower migration, less impact)
ALTER SYSTEM SET sharding.migration_batch_size = 500;  -- default: 1000
ALTER SYSTEM SET sharding.migration_sleep_ms = 100;    -- delay between batches

-- Monitor the network and disks
-- Check inter-node bandwidth: ethtool, iperf
-- Check disk I/O: iostat, vmstat
-- Upgrade hardware if a bottleneck is identified

-- Increase available bandwidth
-- Upgrade the network (10G → 40G)
-- Add storage nodes
-- Upgrade the SSDs
```

Summary: Decision Framework
Use Sharding When:
- Dataset > 500GB
- Throughput > 10K req/sec
- Multi-region deployment
- Need high availability + scalability
Choose Hash Sharding If:
- Diverse access patterns
- Even distribution critical
- Minimal data movement during scaling
Choose Range Sharding If:
- Time-series or sequential data
- Range queries common
- Data retention/archival policy exists
Choose Geo-Distributed If:
- Multi-region deployment
- Latency-sensitive
- Data residency compliance required
Choose Directory-Based If:
- Multi-tenant SaaS
- Fine-grained control needed
- < 10K shards
Avoid Sharding If:
- Dataset < 500GB
- Throughput < 5K req/sec
- Complex ACID requirements
- Limited operational expertise
Next Steps
- Deployment Guide: Multi-node cluster setup
- Monitoring Guide: Track shard health
- Performance Tuning: Optimize sharded queries
- API Reference: Sharding configuration options