Sharding & Rebalancing — Scale to Petabytes Without Downtime
Crate: heliosdb-cluster/crates/sharding (elastic shard manager) + heliosdb-cluster/crates/rebalancer
Status: Production
Sub-docs in source: ARCHITECTURE.md, REBALANCING_GUIDE.md, SCHEMA_BASED_SHARDING.md, SPLIT_MERGE_GUIDE.md
UVP
When a single Raft group can’t hold your dataset, you shard. Most databases force you to pick a fixed sharding scheme up front and live with it forever. HeliosDB ships elastic sharding: hot shards split themselves, cold shards merge themselves, and the rebalancer moves data between nodes online — all while serving queries. Pick hash, range, or composite. Pick virtual-node count for distribution. Tune split/merge thresholds. The system observes load and reshapes the cluster while you sleep — with <10% data movement guaranteed during any rebalance.
Prerequisites
- A multi-node HeliosDB Full cluster (see raft-setup.md).
- A workload that doesn’t fit comfortably on a single shard (or a forecast that says it won’t soon).
- About 25 minutes.
1. The Three Strategies
From the sharding crate’s README:
| Strategy | Picks shards by | Best for |
|---|---|---|
| Hash (default) | hash(key) % N with virtual nodes | Uniform distribution, point lookups |
| Range | (low_key, high_key) per shard | Range scans, time-series |
| Composite | Hash bucket + range within bucket | Hot keys spread across buckets, but range scans within |
```rust
// Hash-based
let shard = HashStrategy::get_shard("key", 10);

// Range-based
let ranges = vec![
    ("a".to_string(), "m".to_string()),
    ("m".to_string(), "z".to_string()),
];
let shard = RangeStrategy::get_shard("key", &ranges);

// Composite
let shard = CompositeStrategy::get_shard("key", 4, &ranges_per_bucket);
```

Hash is the safest default. Range becomes attractive when most queries are WHERE created_at BETWEEN .... Composite is a tuning knob, not a starting choice.
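For time-series workloads the range boundaries usually follow the clock. A minimal sketch, assuming keys carry an ISO month prefix such as 2024-06:evt-98f1 (the key scheme is an assumption, not something the crate requires), reusing the RangeStrategy::get_shard call from above:

```rust
// Assumption: keys are prefixed "YYYY-MM:", so lexicographic ranges line up
// with calendar time. One shard per quarter in this sketch.
let ranges = vec![
    ("2024-01".to_string(), "2024-04".to_string()),
    ("2024-04".to_string(), "2024-07".to_string()),
    ("2024-07".to_string(), "2024-10".to_string()),
    ("2024-10".to_string(), "2025-01".to_string()),
];

// A June key lands on the second shard, and a month-long scan stays on it too.
let shard = RangeStrategy::get_shard("2024-06:evt-98f1", &ranges);
```

A WHERE created_at BETWEEN query covering one month then touches a single shard instead of all of them.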
2. Configure the Elastic Shard Manager
```rust
use heliosdb_sharding::{ElasticShardManager, ShardConfig, ShardDataOps};
use std::sync::Arc;

let config = ShardConfig {
    initial_shards: 4,
    virtual_nodes_per_shard: 150,
    split_threshold: 0.8,
    merge_threshold: 0.3,
    auto_rebalance: true,
    ..Default::default()
};

let data_ops: Arc<dyn ShardDataOps> = Arc::new(MyDataOps::new());
let manager = ElasticShardManager::new(config, data_ops).await?;
```

Two important defaults from the source:

- virtual_nodes_per_shard: 150 — consistent hashing uses virtual nodes so that every physical shard owns roughly equal slices of the ring. The README recommends 100-200 for production. Lower → uneven distribution. Higher → more memory in the ring map.
- split_threshold: 0.8 / merge_threshold: 0.3 — fractions of CPU/memory utilization that trigger automatic split (hot) or merge (cold).
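To see why the virtual-node count matters, here is a toy consistent-hash ring. It is purely illustrative and not the crate's internal data structure: each shard claims several points on the ring, and more points per shard smooths out how many keys each one owns.

```rust
// Illustrative toy only; this is not the crate's internal implementation.
use std::collections::BTreeMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn hash64(s: &str) -> u64 {
    let mut h = DefaultHasher::new();
    s.hash(&mut h);
    h.finish()
}

// Each shard claims `vnodes` points on the hash circle.
fn build_ring(shards: &[&str], vnodes: usize) -> BTreeMap<u64, String> {
    let mut ring = BTreeMap::new();
    for shard in shards {
        for v in 0..vnodes {
            ring.insert(hash64(&format!("{shard}-vn{v}")), shard.to_string());
        }
    }
    ring
}

// A key routes to the first ring point at or after its hash, wrapping to the
// start of the ring if none exists.
fn route<'a>(ring: &'a BTreeMap<u64, String>, key: &str) -> &'a str {
    let h = hash64(key);
    ring.range(h..)
        .chain(ring.range(..))
        .next()
        .map(|(_, shard)| shard.as_str())
        .expect("ring must not be empty")
}
```

With the default of 150 virtual nodes, ten shards put 1,500 points on the ring; adding or splitting a shard then only remaps the slices adjacent to its own points, which is what keeps data movement small.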
3. Route a Key
```rust
let shard = manager.get_shard_for_key("user:1234").await?;
println!("user:1234 lives on {}", shard);
```

get_shard_for_key is the workhorse — every read and write path on top of the manager funnels through it. Latency target from the README: ~500 ns per call.
4. Manually Split a Hot Shard
```rust
let (left, right) = manager.split_shard("shard-1").await?;
println!("Split into {} and {}", left, right);
```

What happens (from the SPLIT_MERGE_GUIDE flow):
1. Identify hot shard (high CPU/memory/storage).
2. Find optimal split point (median key or hash midpoint).
3. Create two child shards.
4. Copy data incrementally to the children.
5. Atomic ring update.
6. Redirect traffic.
7. Delete parent shard.
The whole sequence is online. Reads and writes against shard-1 succeed throughout — the planner just reroutes them at step 5.
Benchmark from the README: ~100ms for 10K keys.
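Tying the threshold back to a manual pass: a sketch that finds the hottest shard and splits it, assuming the caller knows its shard IDs and that cpu_usage (from get_shard_statistics, step 11 below) is a 0.0-1.0 fraction; the 0.8 cutoff mirrors split_threshold above.

```rust
// Assumptions: caller-known shard IDs; cpu_usage is a 0.0-1.0 fraction.
let shard_ids = ["shard-1", "shard-2", "shard-3", "shard-4"];
let mut hottest: Option<(&str, f64)> = None;

for id in shard_ids {
    let stats = manager.get_shard_statistics(id).await?;
    let cpu = stats.metrics.cpu_usage;
    if cpu > 0.8 && hottest.map_or(true, |(_, best)| cpu > best) {
        hottest = Some((id, cpu));
    }
}

if let Some((id, cpu)) = hottest {
    println!("splitting {id} at {:.0}% CPU", cpu * 100.0);
    let (left, right) = manager.split_shard(id).await?;
    println!("-> children {left} and {right}");
}
```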
5. Merge Cold Shards
```rust
let merged = manager.merge_shards(&["shard-2", "shard-3"]).await?;
```

Same shape as split, in reverse. Constraint: shards must be adjacent in key space. The merger refuses non-adjacent merges to keep range queries fast.
Benchmark: ~80ms for 10K keys.
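The mirror-image sketch for merging: shard-2 and shard-3 are assumed to be adjacent in key space (the merger would reject the call otherwise), and the 0.3 cutoff mirrors merge_threshold from the ShardConfig.

```rust
// Assumptions: shard-2 / shard-3 are adjacent; cpu_usage is a 0.0-1.0 fraction.
let a = manager.get_shard_statistics("shard-2").await?;
let b = manager.get_shard_statistics("shard-3").await?;

if a.metrics.cpu_usage < 0.3 && b.metrics.cpu_usage < 0.3 {
    let merged = manager.merge_shards(&["shard-2", "shard-3"]).await?;
    println!("merged into {merged}");
}
```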
6. Configure Auto-Rebalancing
The rebalancer runs in the background when auto_rebalance: true. Its config (see REBALANCING_GUIDE):
```rust
use heliosdb_sharding::RebalanceConfig;

let config = RebalanceConfig {
    target_cpu: 0.6,
    target_memory: 0.6,
    max_variance: 0.3,
    min_improvement: 0.1,
    auto_rebalance: true,
    check_interval_secs: 300, // 5 minutes
};
```

| Knob | Default | What it does |
|---|---|---|
| target_cpu / target_memory | 0.6 | Per-shard utilization the rebalancer aims for. Lower = more headroom. |
| max_variance | 0.3 | Acceptable load spread across shards. Lower = more aggressive rebalances. |
| min_improvement | 0.1 | Plan must improve load by ≥10% to be executed. Higher = fewer no-op rebalances. |
| check_interval_secs | 300 | How often to evaluate. Lower = faster reaction; higher = more stability. |
7. Inspect a Rebalance Plan Before Executing
```rust
let plan = manager.rebalance().await?;

println!("Data movement: {} bytes", plan.impact.data_movement_bytes);
println!("Shards affected: {}", plan.impact.shards_affected);
println!("Estimated time: {}s", plan.impact.estimated_duration_secs);
println!("Load improvement: {:.1}%", plan.impact.load_improvement * 100.0);

if !plan.is_empty() {
    manager.execute_rebalance(plan).await?;
}
```

Key invariant from REBALANCING_GUIDE: “Guarantees <10% data movement relative to total cluster size”. The planner refuses any plan that would shuffle more.
If a step fails mid-execution, the rebalancer rolls back automatically.
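If you want an extra guard on top of the planner's own limit, you can gate execution on the impact fields shown above. A sketch; the 50 GiB budget is a local policy choice, not a crate default, and data_movement_bytes is assumed to be a u64.

```rust
// Local policy knob (assumption), layered on the planner's <10% guarantee.
const MAX_MOVEMENT_BYTES: u64 = 50 * 1024 * 1024 * 1024; // 50 GiB

let plan = manager.rebalance().await?;

if !plan.is_empty()
    && plan.impact.load_improvement >= 0.1
    && plan.impact.data_movement_bytes <= MAX_MOVEMENT_BYTES
{
    manager.execute_rebalance(plan).await?;
} else {
    println!("skipping rebalance: improvement too small or movement over budget");
}
```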
8. Implement ShardDataOps
The shard manager doesn’t know about your storage. You provide a trait implementation that knows how to read/write/delete shards:
```rust
use async_trait::async_trait;
use heliosdb_sharding::{ShardDataOps, ShardMetrics, SplitError};
use std::collections::HashMap;

struct MyDataOps;

#[async_trait]
impl ShardDataOps for MyDataOps {
    async fn get_keys(&self, shard_id: &str) -> Result<Vec<String>, SplitError> { /* ... */ }
    async fn get_data(&self, shard_id: &str) -> Result<HashMap<String, Vec<u8>>, SplitError> { /* ... */ }
    async fn copy_data(&self, target: &str, data: HashMap<String, Vec<u8>>) -> Result<(), SplitError> { /* ... */ }
    async fn delete_shard(&self, shard_id: &str) -> Result<(), SplitError> { /* ... */ }
    async fn get_metrics(&self, shard_id: &str) -> Result<ShardMetrics, SplitError> { /* ... */ }
}
```

The default HeliosDB Full server already wires this up against its storage engine — you only need this if you’re embedding the sharding library in something custom.
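For embedding experiments, an in-memory backing store is enough to exercise split, merge, and migration end to end. A minimal sketch, assuming ShardMetrics implements Default (not confirmed by the crate docs); every operation succeeds, so no SplitError is ever constructed.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use async_trait::async_trait;
use heliosdb_sharding::{ShardDataOps, ShardMetrics, SplitError};

/// Toy backing store: shard_id -> (key -> value), all in memory.
struct InMemoryDataOps {
    shards: Mutex<HashMap<String, HashMap<String, Vec<u8>>>>,
}

#[async_trait]
impl ShardDataOps for InMemoryDataOps {
    async fn get_keys(&self, shard_id: &str) -> Result<Vec<String>, SplitError> {
        let shards = self.shards.lock().unwrap();
        Ok(shards.get(shard_id).map(|m| m.keys().cloned().collect()).unwrap_or_default())
    }

    async fn get_data(&self, shard_id: &str) -> Result<HashMap<String, Vec<u8>>, SplitError> {
        let shards = self.shards.lock().unwrap();
        Ok(shards.get(shard_id).cloned().unwrap_or_default())
    }

    async fn copy_data(&self, target: &str, data: HashMap<String, Vec<u8>>) -> Result<(), SplitError> {
        let mut shards = self.shards.lock().unwrap();
        shards.entry(target.to_string()).or_default().extend(data);
        Ok(())
    }

    async fn delete_shard(&self, shard_id: &str) -> Result<(), SplitError> {
        self.shards.lock().unwrap().remove(shard_id);
        Ok(())
    }

    async fn get_metrics(&self, _shard_id: &str) -> Result<ShardMetrics, SplitError> {
        // Assumption: ShardMetrics: Default. Real implementations report real load.
        Ok(ShardMetrics::default())
    }
}
```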
9. Schema-Based Sharding (Multi-Tenant Twist)
The Full edition also supports schema-based sharding, where each tenant gets its own schema and the entire schema lives on one shard:
```sql
CREATE SCHEMA tenant_1234 DISTRIBUTED;

CREATE TABLE tenant_1234.users (
    user_id SERIAL PRIMARY KEY,
    name    TEXT,
    email   TEXT
);

CREATE TABLE tenant_1234.orders (
    order_id SERIAL PRIMARY KEY,
    user_id  INT REFERENCES tenant_1234.users(user_id),
    amount   DECIMAL
);
```

No tenant_id column needed in every table. Foreign keys work natively because all of tenant_1234.* lives on the same physical shard. See SCHEMA_BASED_SHARDING.md in the crate for the full design.
10. Migrate a Shard to a Different Node
Moving shard-4 from one node to another is a phased online operation:

```rust
manager.migrate_shard("shard-4", "node-2").await?;
```

Phases (from the README):
- Bulk Transfer — copy existing data to the target.
- Incremental Sync — stream changes that arrived during phase 1.
- Switchover — atomically redirect traffic.
- Verification — optional consistency check.
- Cleanup — drop the data on the source.
No downtime. No client retries.
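A common use is draining a node before maintenance. A sketch where the list of shards on node-1 is maintained by the caller (no placement-listing API is assumed here), with each move running the phased migration above:

```rust
// Assumption: the caller tracks which shards live on node-1; this list is
// not something the sharding crate provides in this sketch.
let shards_on_node_1 = ["shard-4", "shard-7", "shard-9"];

for shard in shards_on_node_1 {
    manager.migrate_shard(shard, "node-2").await?;
    println!("{shard} now served from node-2");
}
```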
11. Watch Shard Statistics
```rust
let stats = manager.get_shard_statistics("shard-1").await?;

println!("CPU: {}", stats.metrics.cpu_usage);
println!("Memory: {}", stats.metrics.memory_usage);
println!("Storage: {} bytes", stats.storage_bytes);
println!("Requests/sec: {}", stats.operations_per_sec);
```

These same numbers feed Prometheus via the standard observability surface.
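A small watch loop makes the same numbers actionable without Prometheus. A sketch, assuming a caller-known shard list, a 10-second poll interval, and cpu_usage expressed as a 0.0-1.0 fraction:

```rust
use std::time::Duration;

// Assumptions: caller-known shard IDs, 10 s polling, cpu_usage in 0.0-1.0.
let shard_ids = ["shard-1", "shard-2", "shard-3"];
let mut ticker = tokio::time::interval(Duration::from_secs(10));

loop {
    ticker.tick().await;
    for id in shard_ids {
        let stats = manager.get_shard_statistics(id).await?;
        if stats.metrics.cpu_usage > 0.8 {
            eprintln!(
                "WARN {id}: CPU {:.0}%, candidate for a split",
                stats.metrics.cpu_usage * 100.0
            );
        }
    }
}
```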
Where Next
- raft-setup.md — each shard is its own Raft group.
- multi-region-active-active.md — shard and replicate across regions.
- Source: SPLIT_MERGE_GUIDE.md, REBALANCING_GUIDE.md, ARCHITECTURE.md.