Sharding & Rebalancing — Scale to Petabytes Without Downtime

Crate: heliosdb-cluster/crates/sharding (elastic shard manager) + heliosdb-cluster/crates/rebalancer
Status: Production
Sub-docs in source: ARCHITECTURE.md, REBALANCING_GUIDE.md, SCHEMA_BASED_SHARDING.md, SPLIT_MERGE_GUIDE.md


UVP

When a single Raft group can’t hold your dataset, you shard. Most databases force you to pick a fixed sharding scheme up front and live with it forever. HeliosDB ships elastic sharding: hot shards split themselves, cold shards merge themselves, and the rebalancer moves data between nodes online — all while serving queries. Pick hash, range, or composite. Pick virtual-node count for distribution. Tune split/merge thresholds. The system observes load and reshapes the cluster while you sleep — with <10% data movement guaranteed during any rebalance.


Prerequisites

  • A multi-node HeliosDB Full cluster (see raft-setup.md).
  • A workload that doesn’t fit comfortably on a single shard (or a forecast that says it won’t soon).
  • About 25 minutes.

1. The Three Strategies

From the sharding crate’s README:

Strategy         Picks shards by                      Best for
Hash (default)   hash(key) % N with virtual nodes     Uniform distribution, point lookups
Range            (low_key, high_key) per shard        Range scans, time-series
Composite        Hash bucket + range within bucket    Hot keys spread across buckets, but range scans within each
use heliosdb_sharding::{CompositeStrategy, HashStrategy, RangeStrategy};

// Hash-based: key -> one of 10 shards.
let shard = HashStrategy::get_shard("key", 10);

// Range-based: each shard owns a (low_key, high_key) interval.
let ranges = vec![
    ("a".to_string(), "m".to_string()),
    ("m".to_string(), "z".to_string()),
];
let shard = RangeStrategy::get_shard("key", &ranges);

// Composite: 4 hash buckets, each with its own range table.
let shard = CompositeStrategy::get_shard("key", 4, &ranges_per_bucket);

Hash is the safest default. Range becomes attractive when most queries are WHERE created_at BETWEEN .... Composite is a tuning knob, not a starting choice.


2. Configure the Elastic Shard Manager

use heliosdb_sharding::{ElasticShardManager, ShardConfig, ShardDataOps};
use std::sync::Arc;

let config = ShardConfig {
    initial_shards: 4,
    virtual_nodes_per_shard: 150,
    split_threshold: 0.8,
    merge_threshold: 0.3,
    auto_rebalance: true,
    ..Default::default()
};

// MyDataOps implements ShardDataOps (see section 8).
let data_ops: Arc<dyn ShardDataOps> = Arc::new(MyDataOps::new());
let manager = ElasticShardManager::new(config, data_ops).await?;

Two important defaults from the source:

  • virtual_nodes_per_shard: 150 — consistent hashing uses virtual nodes so that every physical shard owns roughly equal slices of the ring. The README recommends 100-200 for production. Lower → uneven distribution. Higher → more memory in the ring map.
  • split_threshold: 0.8 / merge_threshold: 0.3 — fractions of CPU/memory utilization that trigger automatic split (hot) or merge (cold).
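
To see why virtual nodes smooth the distribution, here is a minimal, self-contained sketch of a consistent-hash ring. Everything in it (the Ring type, the hashing scheme) is illustrative only, not the crate's actual internals:

use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

fn hash_of<T: Hash>(t: &T) -> u64 {
    let mut h = DefaultHasher::new();
    t.hash(&mut h);
    h.finish()
}

// Toy consistent-hash ring: each shard owns `vnodes` points on the ring,
// so key ownership is spread over many small slices instead of a few big ones.
struct Ring {
    points: BTreeMap<u64, String>, // ring position -> shard id
}

impl Ring {
    fn new(shards: &[&str], vnodes: usize) -> Self {
        let mut points = BTreeMap::new();
        for shard in shards {
            for v in 0..vnodes {
                // Each (shard, replica-index) pair lands at its own position.
                points.insert(hash_of(&format!("{shard}#{v}")), shard.to_string());
            }
        }
        Ring { points }
    }

    // A key belongs to the first ring point at or after its hash (wrapping).
    // Panics on an empty ring.
    fn shard_for(&self, key: &str) -> &str {
        let h = hash_of(&key);
        self.points
            .range(h..)
            .next()
            .or_else(|| self.points.iter().next())
            .map(|(_, shard)| shard.as_str())
            .unwrap()
    }
}

fn main() {
    let ring = Ring::new(&["shard-0", "shard-1", "shard-2", "shard-3"], 150);
    println!("user:1234 -> {}", ring.shard_for("user:1234"));
}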

3. Route a Key

let shard = manager.get_shard_for_key("user:1234").await?;
println!("user:1234 lives on {}", shard);

get_shard_for_key is the workhorse — every read and write path on top of the manager funnels through it. Latency target from the README: ~500 ns per call.
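
In practice you wrap it. A minimal sketch of a write path that funnels through get_shard_for_key; store_on_shard is a hypothetical stand-in for your storage layer, not part of the crate:

use heliosdb_sharding::ElasticShardManager;

// Hypothetical write path: route first, then hit the owning shard's storage.
async fn put(
    manager: &ElasticShardManager,
    key: &str,
    value: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error>> {
    let shard = manager.get_shard_for_key(key).await?;
    store_on_shard(&shard, key, value).await // your storage layer goes here
}

// Stand-in for a real storage call.
async fn store_on_shard(
    shard: &str,
    key: &str,
    value: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error>> {
    println!("write {} ({} bytes) -> {}", key, value.len(), shard);
    Ok(())
}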


4. Manually Split a Hot Shard

let (left, right) = manager.split_shard("shard-1").await?;
println!("Split into {} and {}", left, right);

What happens (from the SPLIT_MERGE_GUIDE flow):

  1. Identify hot shard (high CPU/memory/storage).
  2. Find optimal split point (median key or hash midpoint; sketched below).
  3. Create two child shards.
  4. Copy data incrementally to the children.
  5. Atomic ring update.
  6. Redirect traffic.
  7. Delete parent shard.

The whole sequence is online. Reads and writes against shard-1 succeed throughout — the planner just reroutes them at step 5.

Benchmark from the README: ~100ms for 10K keys.
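
Step 2 is where the policy lives. For a range shard, the natural split point is the median key, which halves the data rather than the key space. A toy sketch of that selection (illustrative only, not the crate's implementation):

// Pick a split point for a range shard: the median of its sorted keys.
// Splitting at the median halves the data, not the key range, so both
// children start with comparable load.
fn median_split_point(mut keys: Vec<String>) -> Option<String> {
    if keys.len() < 2 {
        return None; // nothing to split
    }
    keys.sort();
    let mid = keys.len() / 2;
    Some(keys[mid].clone()) // left child: [low, mid), right child: [mid, high)
}

fn main() {
    let keys: Vec<String> = ["apple", "banana", "cherry", "kiwi", "mango", "pear"]
        .iter().map(|s| s.to_string()).collect();
    // "kiwi" becomes the boundary: left gets apple..cherry, right gets kiwi..pear.
    assert_eq!(median_split_point(keys), Some("kiwi".to_string()));
}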


5. Merge Cold Shards

let merged = manager.merge_shards(&["shard-2", "shard-3"]).await?;

Same shape as split, in reverse. Constraint: shards must be adjacent in key space. The merger refuses non-adjacent merges to keep range queries fast.

Benchmark: ~80ms for 10K keys.
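
Adjacent here means the two shards' key ranges share a boundary. A toy check (the tuple representation of a range is an assumption):

// Two range shards are adjacent iff one's high key equals the other's low key.
// Merging adjacent shards yields one contiguous range, so range scans still
// touch a single shard afterwards.
fn are_adjacent(a: &(String, String), b: &(String, String)) -> bool {
    a.1 == b.0 || b.1 == a.0
}

fn main() {
    let s2 = ("a".to_string(), "m".to_string());
    let s3 = ("m".to_string(), "z".to_string());
    let s5 = ("q".to_string(), "z".to_string());
    assert!(are_adjacent(&s2, &s3));  // [a, m) + [m, z) = [a, z): mergeable
    assert!(!are_adjacent(&s2, &s5)); // gap between "m" and "q": refused
}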


6. Configure Auto-Rebalancing

The rebalancer runs in the background when auto_rebalance: true. Its config (see REBALANCING_GUIDE):

use heliosdb_sharding::RebalanceConfig;

let config = RebalanceConfig {
    target_cpu: 0.6,
    target_memory: 0.6,
    max_variance: 0.3,
    min_improvement: 0.1,
    auto_rebalance: true,
    check_interval_secs: 300, // 5 minutes
};
Knob                          Default   What it does
target_cpu / target_memory    0.6       Per-shard utilization the rebalancer aims for. Lower = more headroom.
max_variance                  0.3       Acceptable load spread across shards. Lower = more aggressive rebalances.
min_improvement               0.1       Plan must improve load by ≥10% to be executed. Higher = fewer no-op rebalances.
check_interval_secs           300       How often to evaluate. Lower = faster reaction; higher = more stability.
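
Putting the knobs together: each check interval, the rebalancer only acts when the load spread is too wide and the best candidate plan clears min_improvement. A simplified sketch (interpreting max_variance as the allowed max-min load spread is an assumption):

// Simplified periodic decision: act only when the load spread exceeds
// max_variance AND the best candidate plan clears min_improvement.
fn should_rebalance(
    loads: &[f64],
    max_variance: f64,
    min_improvement: f64,
    plan_improvement: f64,
) -> bool {
    let max = loads.iter().cloned().fold(f64::MIN, f64::max);
    let min = loads.iter().cloned().fold(f64::MAX, f64::min);
    (max - min) > max_variance && plan_improvement >= min_improvement
}

fn main() {
    // One hot shard at 0.95; the rest sit near the 0.6 target.
    let loads = [0.95, 0.58, 0.61, 0.55];
    // Spread is 0.40 > 0.3, and the candidate plan claims 15% >= 10%: go.
    assert!(should_rebalance(&loads, 0.3, 0.1, 0.15));
}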

7. Inspect a Rebalance Plan Before Executing

let plan = manager.rebalance().await?;
println!("Data movement: {} bytes", plan.impact.data_movement_bytes);
println!("Shards affected: {}", plan.impact.shards_affected);
println!("Estimated time: {}s", plan.impact.estimated_duration_secs);
println!("Load improvement: {:.1}%", plan.impact.load_improvement * 100.0);
if !plan.is_empty() {
    manager.execute_rebalance(plan).await?;
}

Key invariant from REBALANCING_GUIDE: “Guarantees <10% data movement relative to total cluster size”. The planner refuses any plan that would shuffle more.

If a step fails mid-execution, the rebalancer rolls back automatically.
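
For belt-and-braces deployments you can assert the movement invariant yourself before executing; total_cluster_bytes below is a hypothetical helper, while the plan.impact fields are the ones shown above:

// Defensive check mirroring the planner's own <10% guarantee.
// `total_cluster_bytes` is a hypothetical helper for your deployment.
let plan = manager.rebalance().await?;
let total = total_cluster_bytes();
if plan.impact.data_movement_bytes as f64 > 0.10 * total as f64 {
    eprintln!("refusing plan: would move more than 10% of cluster data");
} else if !plan.is_empty() {
    manager.execute_rebalance(plan).await?;
}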


8. Implement ShardDataOps

The shard manager doesn’t know about your storage. You provide a trait implementation that knows how to read/write/delete shards:

use async_trait::async_trait;
use heliosdb_sharding::{ShardDataOps, ShardMetrics, SplitError};
use std::collections::HashMap;

struct MyDataOps;

#[async_trait]
impl ShardDataOps for MyDataOps {
    async fn get_keys(&self, shard_id: &str) -> Result<Vec<String>, SplitError> { /* ... */ }
    async fn get_data(&self, shard_id: &str) -> Result<HashMap<String, Vec<u8>>, SplitError> { /* ... */ }
    async fn copy_data(&self, target: &str, data: HashMap<String, Vec<u8>>) -> Result<(), SplitError> { /* ... */ }
    async fn delete_shard(&self, shard_id: &str) -> Result<(), SplitError> { /* ... */ }
    async fn get_metrics(&self, shard_id: &str) -> Result<ShardMetrics, SplitError> { /* ... */ }
}

The default HeliosDB Full server already wires this up against its storage engine — you only need this if you’re embedding the sharding library in something custom.
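
For tests or a toy embedding, an in-memory backend is enough. A minimal sketch against the trait shown above, assuming ShardMetrics implements Default (adapt the error handling to SplitError's real variants):

use async_trait::async_trait;
use heliosdb_sharding::{ShardDataOps, ShardMetrics, SplitError};
use std::collections::HashMap;
use std::sync::Mutex;

// Toy backend: one HashMap per shard, guarded by a Mutex.
struct InMemoryOps {
    shards: Mutex<HashMap<String, HashMap<String, Vec<u8>>>>,
}

#[async_trait]
impl ShardDataOps for InMemoryOps {
    async fn get_keys(&self, shard_id: &str) -> Result<Vec<String>, SplitError> {
        let shards = self.shards.lock().unwrap();
        Ok(shards.get(shard_id).map(|m| m.keys().cloned().collect()).unwrap_or_default())
    }
    async fn get_data(&self, shard_id: &str) -> Result<HashMap<String, Vec<u8>>, SplitError> {
        let shards = self.shards.lock().unwrap();
        Ok(shards.get(shard_id).cloned().unwrap_or_default())
    }
    async fn copy_data(&self, target: &str, data: HashMap<String, Vec<u8>>) -> Result<(), SplitError> {
        let mut shards = self.shards.lock().unwrap();
        shards.entry(target.to_string()).or_default().extend(data);
        Ok(())
    }
    async fn delete_shard(&self, shard_id: &str) -> Result<(), SplitError> {
        self.shards.lock().unwrap().remove(shard_id);
        Ok(())
    }
    async fn get_metrics(&self, _shard_id: &str) -> Result<ShardMetrics, SplitError> {
        // A real backend reports CPU/memory/storage; a toy one returns defaults.
        Ok(ShardMetrics::default())
    }
}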


9. Schema-Based Sharding (Multi-Tenant Twist)

The Full edition also supports schema-based sharding, where each tenant gets its own schema and the entire schema lives on one shard:

CREATE SCHEMA tenant_1234 DISTRIBUTED;

CREATE TABLE tenant_1234.users (
    user_id SERIAL PRIMARY KEY,
    name TEXT,
    email TEXT
);

CREATE TABLE tenant_1234.orders (
    order_id SERIAL PRIMARY KEY,
    user_id INT REFERENCES tenant_1234.users(user_id),
    amount DECIMAL
);

No tenant_id column needed in every table. Foreign keys work natively because all of tenant_1234.* lives on the same physical shard. See SCHEMA_BASED_SHARDING.md in the crate for the full design.
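
Routing in this mode conceptually reduces to hashing the schema name instead of the row key, which is what pins a tenant's tables to one shard. A toy illustration (not the crate's code):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Route by schema: everything under `tenant_1234.*` hashes to one shard,
// which is what keeps a tenant's joins and foreign keys local.
fn shard_for_qualified_name(qualified: &str, num_shards: u64) -> u64 {
    let schema = qualified.split('.').next().unwrap_or(qualified);
    let mut h = DefaultHasher::new();
    schema.hash(&mut h);
    h.finish() % num_shards
}

fn main() {
    let users = shard_for_qualified_name("tenant_1234.users", 16);
    let orders = shard_for_qualified_name("tenant_1234.orders", 16);
    assert_eq!(users, orders); // same tenant, same shard
}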


10. Migrate a Shard to a Different Node

Moving shard-4 from one node to another is a five-phase online operation:

manager.migrate_shard("shard-4", "node-2").await?;

Phases (from the README):

  1. Bulk Transfer — copy existing data to the target.
  2. Incremental Sync — stream changes that arrived during phase 1.
  3. Switchover — atomically redirect traffic.
  4. Verification — optional consistency check.
  5. Cleanup — drop the data on the source.

No downtime. No client retries.
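
Phase 2 exists because phase 1 can take a while on a big shard, and writes keep landing during the copy. A conceptual sketch of phases 1-3 with stand-in types (the changelog-based shape is an assumption; the real engine lives in the rebalancer crate):

use std::collections::HashMap;

// Stand-in types for a conceptual walk-through of phases 1-3.
struct Shard {
    data: HashMap<String, Vec<u8>>,
    changelog: Vec<(String, Vec<u8>)>, // writes, in arrival order
}

fn migrate(source: &mut Shard, target: &mut Shard) {
    // Phase 1: bulk transfer. Copy the snapshot as it exists right now and
    // remember how far the changelog had advanced.
    target.data = source.data.clone();
    let snapshot_len = source.changelog.len();

    // (Writes keep arriving on the source while the copy runs.)

    // Phase 2: incremental sync. Replay only the writes that landed after
    // the snapshot, in order, until the target has caught up.
    for (k, v) in source.changelog[snapshot_len..].iter().cloned() {
        target.data.insert(k, v);
    }

    // Phase 3: switchover. In the real system this is the atomic ring update;
    // phases 4-5 then verify the copy and drop the source data.
}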


11. Watch Shard Statistics

let stats = manager.get_shard_statistics("shard-1").await?;
println!("CPU: {}", stats.metrics.cpu_usage);
println!("Memory: {}", stats.metrics.memory_usage);
println!("Storage: {} bytes", stats.storage_bytes);
println!("Requests/sec: {}", stats.operations_per_sec);

These same numbers feed Prometheus via the standard observability surface.
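
If you want early warning before the manager splits on its own, a small watchdog can poll these stats against section 2's split_threshold. A sketch assuming a Tokio runtime (the alerting side is up to you):

use heliosdb_sharding::ElasticShardManager;
use std::time::Duration;

// Hypothetical watchdog: warn before the automatic split fires. 0.72 is
// 90% of the split_threshold (0.8) configured in section 2.
async fn watch_shards(manager: &ElasticShardManager, shard_ids: &[String]) {
    loop {
        for id in shard_ids {
            if let Ok(stats) = manager.get_shard_statistics(id).await {
                if stats.metrics.cpu_usage > 0.72 {
                    eprintln!("{} is running hot: cpu={}", id, stats.metrics.cpu_usage);
                }
            }
        }
        tokio::time::sleep(Duration::from_secs(60)).await;
    }
}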


Where Next