HeliosDB Elastic Sharding
A dynamic sharding system that allows online shard splitting/merging and zero-downtime resharding for distributed databases.
Features
- Dynamic Shard Splitting: Automatically split hot shards during runtime without downtime
- Shard Merging: Merge underutilized shards to optimize resource usage
- Zero-Downtime Migration: Move shards between nodes without service interruption
- Consistent Hashing: Minimize data movement when nodes are added or removed
- Automatic Rebalancing: Monitor and balance load across shards automatically
- Multiple Strategies: Support for hash-based, range-based, and composite sharding
- Virtual Nodes: Enhanced load distribution using virtual node technique
Quick Start
Add to your Cargo.toml:
```toml
[dependencies]
heliosdb-sharding = "3.0.0"
```

Usage Example
```rust
use heliosdb_sharding::{ElasticShardManager, ShardConfig, ShardDataOps};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure elastic sharding
    let config = ShardConfig {
        initial_shards: 4,
        virtual_nodes_per_shard: 150,
        split_threshold: 0.8,
        merge_threshold: 0.3,
        auto_rebalance: true,
        ..Default::default()
    };

    // Create data operations handler (implement ShardDataOps trait)
    let data_ops = Arc::new(MyDataOps::new());

    // Create manager
    let manager = ElasticShardManager::new(config, data_ops).await?;

    // Route keys to shards
    let shard = manager.get_shard_for_key("user:1234").await?;
    println!("Key routed to: {}", shard);

    // Split a hot shard
    let (left, right) = manager.split_shard("shard-1").await?;
    println!("Split into: {} and {}", left, right);

    // Merge cold shards
    let merged = manager.merge_shards(&["shard-2", "shard-3"]).await?;
    println!("Merged into: {}", merged);

    // Generate and execute rebalance plan
    let plan = manager.rebalance().await?;
    if !plan.is_empty() {
        manager.execute_rebalance(plan).await?;
    }

    Ok(())
}
```

Architecture
Components
- Hash Ring: Consistent hashing implementation with virtual nodes
- Splitter: Handles shard splitting operations
- Merger: Handles shard merging operations
- Migrator: Manages online data migration between nodes
- Balancer: Analyzes load and generates rebalance plans
- Coordinator: Orchestrates all elastic sharding operations
Shard Operations
Split Operation Flow
- Identify hot shard (high CPU/memory/storage usage)
- Find optimal split point (median key or hash-range midpoint; see the sketch after this list)
- Create two child shards
- Copy data incrementally to child shards
- Update consistent hash ring atomically
- Redirect traffic to child shards
- Delete parent shard
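For illustration, here is a minimal sketch of the split-point step, written against the ShardDataOps trait shown later in this README. The find_split_point helper is hypothetical, not part of the crate's API:

```rust
use heliosdb_sharding::{ShardDataOps, SplitError};

/// Hypothetical helper: pick the median key of a shard as its split point,
/// so each child shard receives roughly half of the keys. A hash-based
/// strategy would instead bisect the shard's hash range.
async fn find_split_point(
    ops: &dyn ShardDataOps,
    shard_id: &str,
) -> Result<String, SplitError> {
    let mut keys = ops.get_keys(shard_id).await?;
    keys.sort();
    debug_assert!(!keys.is_empty(), "SplitCriteria::min_keys should rule this out");
    // Keys below the median go to the left child, the rest to the right.
    Ok(keys[keys.len() / 2].clone())
}
```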
Merge Operation Flow
- Identify cold shards (low utilization)
- Verify shards are adjacent in key space (see the sketch after this list)
- Create merged shard
- Copy data from source shards
- Update hash ring atomically
- Redirect traffic to merged shard
- Delete source shards
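As a sketch of the adjacency check, assume shards own half-open key ranges like the (start, end) pairs used in the Advanced Features section; this is illustrative, not the crate's internal representation:

```rust
/// Hypothetical check: two range shards are adjacent exactly when one
/// range ends where the other begins, so merging them yields a single
/// contiguous key range; otherwise the merge must be rejected.
fn merged_range(a: (&str, &str), b: (&str, &str)) -> Option<(String, String)> {
    if a.1 == b.0 {
        Some((a.0.to_string(), b.1.to_string())) // a immediately precedes b
    } else if b.1 == a.0 {
        Some((b.0.to_string(), a.1.to_string())) // b immediately precedes a
    } else {
        None // not adjacent in key space
    }
}
```

For example, merging ("a", "m") and ("m", "z") yields ("a", "z").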
Migration Operation Flow
- Phase 1 - Bulk Transfer: Copy existing data to target node
- Phase 2 - Incremental Sync: Catch up with ongoing changes
- Phase 3 - Switchover: Atomically redirect traffic to target
- Phase 4 - Verification: Verify data consistency (optional)
- Phase 5 - Cleanup: Remove data from source node
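The sketch below walks through these phases over an illustrative ShardSource interface; none of these types or thresholds are the crate's API, they only show the shape of the protocol:

```rust
use std::collections::HashMap;

/// Illustrative source-side interface; not part of heliosdb-sharding.
trait ShardSource {
    /// Full snapshot of the shard plus the change-log position it reflects.
    fn bulk_snapshot(&self) -> (HashMap<String, Vec<u8>>, u64);
    /// Changes logged since `pos`, plus the new log position.
    fn changes_since(&self, pos: u64) -> (Vec<(String, Vec<u8>)>, u64);
    fn pause_writes(&self);
    fn delete_all(&self);
}

fn migrate_shard(src: &dyn ShardSource, dst: &mut HashMap<String, Vec<u8>>) {
    // Phase 1 - Bulk Transfer: copy a snapshot while the source keeps serving writes.
    let (snapshot, mut pos) = src.bulk_snapshot();
    dst.extend(snapshot);

    // Phase 2 - Incremental Sync: replay changes accumulated since the snapshot
    // until the per-round backlog is small enough for a brief switchover.
    loop {
        let (changes, new_pos) = src.changes_since(pos);
        let nearly_caught_up = changes.len() < 100; // illustrative threshold
        dst.extend(changes);
        pos = new_pos;
        if nearly_caught_up {
            break;
        }
    }

    // Phase 3 - Switchover: pause writes, drain the final changes, then
    // atomically repoint routing at the target (routing update not shown).
    src.pause_writes();
    let (last, _) = src.changes_since(pos);
    dst.extend(last);

    // Phase 4 - Verification would compare checksums between source and target.
    // Phase 5 - Cleanup: drop the source copy once the switchover is confirmed.
    src.delete_all();
}
```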
Configuration
ShardConfig
```rust
pub struct ShardConfig {
    /// Initial number of shards
    pub initial_shards: usize,
    /// Virtual nodes per shard in hash ring (higher = better distribution)
    pub virtual_nodes_per_shard: usize,
    /// CPU/memory threshold for splitting (0.0-1.0)
    pub split_threshold: f64,
    /// CPU/memory threshold for merging (0.0-1.0)
    pub merge_threshold: f64,
    /// Enable automatic rebalancing
    pub auto_rebalance: bool,
    /// Sharding strategy (Hash, Range, Composite)
    pub strategy: ShardStrategy,
    /// Rebalance check interval (seconds)
    pub rebalance_interval_secs: u64,
}
```

Split Criteria

```rust
pub struct SplitCriteria {
    pub cpu_threshold: f64,          // 0.8 (80%)
    pub memory_threshold: f64,       // 0.8
    pub storage_threshold: u64,      // 10 GB
    pub request_rate_threshold: f64, // 10,000 req/s
    pub min_keys: usize,             // 1,000 keys
}
```

Merge Criteria

```rust
pub struct MergeCriteria {
    pub cpu_threshold: f64,          // 0.3 (30%)
    pub memory_threshold: f64,       // 0.3
    pub storage_threshold: u64,      // 1 GB
    pub request_rate_threshold: f64, // 1,000 req/s
    pub max_merged_keys: usize,      // 1M keys
    pub min_total_shards: usize,     // 2 shards
}
```

Implementing ShardDataOps
To use the elastic shard manager, implement the ShardDataOps trait:
```rust
use async_trait::async_trait;
use heliosdb_sharding::{ShardDataOps, ShardMetrics, SplitError};
use std::collections::HashMap;

struct MyDataOps {
    // Your storage implementation
}

#[async_trait]
impl ShardDataOps for MyDataOps {
    async fn get_keys(&self, shard_id: &str) -> Result<Vec<String>, SplitError> {
        // Return all keys in the shard
    }

    async fn get_data(&self, shard_id: &str) -> Result<HashMap<String, Vec<u8>>, SplitError> {
        // Return all key-value pairs
    }

    async fn copy_data(&self, target_shard: &str, data: HashMap<String, Vec<u8>>) -> Result<(), SplitError> {
        // Copy data to target shard
    }

    async fn delete_shard(&self, shard_id: &str) -> Result<(), SplitError> {
        // Delete the shard
    }

    async fn get_metrics(&self, shard_id: &str) -> Result<ShardMetrics, SplitError> {
        // Return shard metrics (CPU, memory, storage, etc.)
    }
}
```
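For reference, here is a toy in-memory implementation sketch. It assumes ShardMetrics implements Default (verify against the crate) and uses tokio's RwLock for async-friendly interior mutability:

```rust
use async_trait::async_trait;
use heliosdb_sharding::{ShardDataOps, ShardMetrics, SplitError};
use std::collections::HashMap;
use tokio::sync::RwLock;

/// Toy store: shard_id -> (key -> value).
struct MemoryDataOps {
    shards: RwLock<HashMap<String, HashMap<String, Vec<u8>>>>,
}

#[async_trait]
impl ShardDataOps for MemoryDataOps {
    async fn get_keys(&self, shard_id: &str) -> Result<Vec<String>, SplitError> {
        let shards = self.shards.read().await;
        Ok(shards
            .get(shard_id)
            .map(|s| s.keys().cloned().collect())
            .unwrap_or_default())
    }

    async fn get_data(&self, shard_id: &str) -> Result<HashMap<String, Vec<u8>>, SplitError> {
        Ok(self.shards.read().await.get(shard_id).cloned().unwrap_or_default())
    }

    async fn copy_data(
        &self,
        target_shard: &str,
        data: HashMap<String, Vec<u8>>,
    ) -> Result<(), SplitError> {
        let mut shards = self.shards.write().await;
        shards.entry(target_shard.to_string()).or_default().extend(data);
        Ok(())
    }

    async fn delete_shard(&self, shard_id: &str) -> Result<(), SplitError> {
        self.shards.write().await.remove(shard_id);
        Ok(())
    }

    async fn get_metrics(&self, _shard_id: &str) -> Result<ShardMetrics, SplitError> {
        // Assumption: ShardMetrics: Default. A real implementation would
        // report live CPU, memory, storage, and request-rate figures.
        Ok(ShardMetrics::default())
    }
}
```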
Performance
Consistent Hashing Benefits
- Minimal Data Movement: When adding/removing nodes, only ~1/N of data needs to move
- Even Distribution: Virtual nodes ensure balanced key distribution
- Deterministic: Same key always routes to same shard
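To make these properties concrete, here is a minimal, std-only sketch of a consistent hash ring with virtual nodes; it illustrates the technique rather than the crate's internal hash ring:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

fn hash64(value: &str) -> u64 {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

/// Ring position -> shard id. Each shard owns many virtual positions,
/// which evens out the key distribution across shards.
struct HashRing {
    ring: BTreeMap<u64, String>,
}

impl HashRing {
    fn new(shards: &[&str], virtual_nodes: usize) -> Self {
        let mut ring = BTreeMap::new();
        for shard in shards {
            for v in 0..virtual_nodes {
                ring.insert(hash64(&format!("{shard}#{v}")), shard.to_string());
            }
        }
        HashRing { ring }
    }

    /// A key belongs to the first virtual node at or after its hash,
    /// wrapping around to the start of the ring.
    fn get_shard(&self, key: &str) -> &str {
        let h = hash64(key);
        self.ring
            .range(h..)
            .next()
            .or_else(|| self.ring.iter().next())
            .map(|(_, shard)| shard.as_str())
            .expect("ring is non-empty")
    }
}
```

Removing a shard deletes only its virtual positions, so only the keys that hashed to those positions move (roughly 1/N of the data), which is what gives the minimal-data-movement property above.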
Benchmarks
Indicative figures on a typical system; actual numbers vary with hardware and data size:
- Hash Lookup: ~500ns per operation
- Split Operation: ~100ms for 10K keys
- Merge Operation: ~80ms for 10K keys
- Rebalance Analysis: ~50ms for 100 shards
Testing
Run tests:
```bash
# Unit tests
cargo test --lib

# Integration tests
cargo test --test integration_test

# All tests
cargo test

# With output
cargo test -- --nocapture
```

Run examples:

```bash
cargo run --example elastic_shard
```

Monitoring
Shard Statistics
```rust
let stats = manager.get_shard_statistics("shard-1").await?;
println!("CPU: {}", stats.metrics.cpu_usage);
println!("Memory: {}", stats.metrics.memory_usage);
println!("Storage: {} bytes", stats.storage_bytes);
println!("Requests/sec: {}", stats.operations_per_sec);
```

Rebalance Plan Impact

```rust
let plan = manager.rebalance().await?;
println!("Data Movement: {} bytes", plan.impact.data_movement_bytes);
println!("Shards Affected: {}", plan.impact.shards_affected);
println!("Estimated Duration: {}s", plan.impact.estimated_duration_secs);
println!("Load Improvement: {:.1}%", plan.impact.load_improvement * 100.0);
```

Best Practices
- Virtual Nodes: Use 100-200 virtual nodes per shard for optimal distribution
- Split Threshold: Set split threshold at 70-80% utilization
- Merge Threshold: Set merge threshold at 20-30% utilization
- Auto-Rebalance: Enable with appropriate check interval (5-10 minutes)
- Monitoring: Track shard metrics and rebalancing frequency
- Testing: Test split/merge operations under load before production
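As a starting point, these practices translate into a ShardConfig along these lines (values are illustrative, not tuned defaults):

```rust
use heliosdb_sharding::ShardConfig;

// Illustrative values reflecting the practices above; tune per workload.
let config = ShardConfig {
    virtual_nodes_per_shard: 150,  // 100-200 for good distribution
    split_threshold: 0.75,         // split at 70-80% utilization
    merge_threshold: 0.25,         // merge at 20-30% utilization
    auto_rebalance: true,
    rebalance_interval_secs: 300,  // check every 5 minutes
    ..Default::default()
};
```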
Advanced Features
Custom Sharding Strategies
```rust
// Hash-based sharding
let shard = HashStrategy::get_shard("key", 10);

// Range-based sharding
let ranges = vec![
    ("a".to_string(), "m".to_string()),
    ("m".to_string(), "z".to_string()),
];
let shard = RangeStrategy::get_shard("key", &ranges);

// Composite sharding (hash + range)
let shard = CompositeStrategy::get_shard("key", 4, &ranges_per_bucket);
```

Manual Operations
```rust
// Manually split a shard
manager.split_shard("shard-1").await?;

// Manually merge shards
manager.merge_shards(&["shard-2", "shard-3"]).await?;

// Manually migrate a shard to another node
manager.migrate_shard("shard-4", "node-2").await?;
```

Troubleshooting
Split Failures
- Ensure the shard has at least min_keys keys (see SplitCriteria)
- Check that the split thresholds in SplitCriteria are actually exceeded
- Verify the shard is not already being split
Merge Failures
- Verify the shards meet the merge criteria (see MergeCriteria)
- Check that the shards are not already being merged
- Ensure the merged size doesn't exceed max_merged_keys
High Data Movement
- Increase virtual nodes per shard
- Adjust rebalance thresholds
- Reduce rebalance frequency
Contributing
Contributions are welcome! Please see ARCHITECTURE.md for design details.
License
MIT OR Apache-2.0
See Also
- ARCHITECTURE.md - Detailed architecture documentation
- examples/ - Additional usage examples
- HeliosDB - Main database repository