
HeliosDB Metadata Service

Raft-based distributed metadata consensus service for HeliosDB.

Overview

The metadata service is the brain of the HeliosDB cluster, responsible for:

  • Schema Management: DDL operations (CREATE/DROP TABLE, schema registry)
  • Sharding Topology: Mapping of data ranges to storage nodes
  • Node Registration: Health monitoring of compute/storage nodes
  • Cache Invalidation: Broadcasting metadata updates to compute nodes

Architecture

Raft Consensus

The metadata service uses the Raft consensus algorithm to ensure:

  • Consistency: All metadata nodes agree on the same state
  • Fault Tolerance: Survives minority node failures (3- or 5-node clusters are typical; see the sketch after this list)
  • Leader Election: Automatic failover when leader fails
  • Log Replication: Changes are replicated before being applied
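
The 3- or 5-node recommendation falls out of Raft's majority rule: a cluster of n nodes commits only once a quorum of floor(n/2) + 1 voters has the entry, so it tolerates floor((n - 1)/2) failures. The standalone sketch below just illustrates the arithmetic; it is not part of the service code.

fn quorum(n: usize) -> usize {
    n / 2 + 1
}

fn tolerated_failures(n: usize) -> usize {
    (n - 1) / 2
}

fn main() {
    // 3 nodes tolerate 1 failure; 4 nodes still only 1; 5 nodes tolerate 2.
    for n in [3usize, 4, 5] {
        println!("{n} nodes: quorum = {}, tolerates {} failure(s)",
                 quorum(n), tolerated_failures(n));
    }
}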

Components

  1. Raft Storage Layer (raft_storage.rs)
    • RocksDB-backed persistent storage for Raft log
    • Handles log entries, hard state, conf state, and snapshots
    • In-memory cache for fast access
  2. State Machine (state_machine.rs)
    • Applies committed Raft commands
    • Manages topology and schema state
    • Supports snapshot creation/restoration
  3. gRPC Server (metadata_server.rs)
    • Implements MetadataService gRPC API
    • Routes requests to Raft leader
    • Broadcasts updates to subscribers
  4. Client Library (client.rs)
    • Local cache with automatic invalidation
    • Subscribe to metadata updates
    • Shard-aware routing support

Features

1. Table Management

// Create table
let mut table = TableSchema::new(1, "users".to_string());
table.add_column(ColumnSchema {
    column_id: 1,
    name: "id".to_string(),
    data_type: DataType::Int64,
    nullable: false,
    toast_strategy: None,
});
let table_id = service.create_table(table).await?;

// Drop table
service.drop_table(table_id).await?;

2. Topology Management

// Update cluster topology
let mut topology = service.get_topology();

// Add storage node
topology.add_node(NodeInfo {
    node_id: "storage_1".to_string(),
    addr: "127.0.0.1:6000".to_string(),
    node_type: NodeType::Storage,
    status: NodeStatus::Online,
});

// Add shard
topology.add_shard(ShardInfo {
    shard_id: 1,
    primary_node: "storage_1".to_string(),
    mirror_node: Some("storage_2".to_string()),
    witness_node: None,
    key_range: (vec![0x00], vec![0xFF]),
});

service.update_topology(topology).await?;

3. Cache Invalidation

The service broadcasts MetadataUpdate events via gRPC streaming:

// Client subscribes to updates
let mut client = MetadataClient::connect("http://127.0.0.1:9000").await?;
client.subscribe_updates().await?;
// Automatic cache invalidation when topology/schema changes
let topology = client.get_topology().await?; // Uses cache if valid

Running the Service

Single Node (Development)

# Start metadata server
cargo run --example metadata_server -- 1
# In another terminal, run client
cargo run --example metadata_client -- http://127.0.0.1:9000

Multi-Node Cluster (Production)

# Node 1 (Leader)
cargo run --example metadata_server -- 1
# Node 2 (Follower)
cargo run --example metadata_server -- 2
# Node 3 (Follower)
cargo run --example metadata_server -- 3

Configuration

use std::time::Duration;

let config = MetadataConfig {
    node_id: 1,                                  // Unique node ID
    raft_addr: "127.0.0.1:5000".to_string(),
    storage_path: "./metadata_data".to_string(),
    peers: vec![2, 3],                           // Other Raft nodes
    tick_interval: Duration::from_millis(100),
};

gRPC API

Table Operations

rpc CreateTable(CreateTableRequest) returns (CreateTableResponse);
rpc DropTable(DropTableRequest) returns (DropTableResponse);
rpc GetTable(GetTableRequest) returns (GetTableResponse);
rpc ListTables(ListTablesRequest) returns (ListTablesResponse);

Topology Operations

rpc GetTopology(GetTopologyRequest) returns (Topology);
rpc UpdateShardMap(UpdateShardMapRequest) returns (UpdateShardMapResponse);

Node Management

rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse);
rpc UnregisterNode(UnregisterNodeRequest) returns (UnregisterNodeResponse);
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);

Subscription

rpc Subscribe(SubscribeRequest) returns (stream MetadataUpdate);
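
For orientation, the following is a hedged sketch of how a compute node might consume the Subscribe stream with a tonic-generated client. The module path helios_metadata, the client type name, and the use of a default SubscribeRequest are assumptions about the protobuf build output, not confirmed names.

use helios_metadata::metadata_service_client::MetadataServiceClient; // assumed generated path
use helios_metadata::SubscribeRequest;                               // assumed generated type

async fn watch_updates() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = MetadataServiceClient::connect("http://127.0.0.1:9000").await?;
    // Server-streaming call: the response body is a stream of MetadataUpdate messages.
    let mut stream = client
        .subscribe(SubscribeRequest::default())
        .await?
        .into_inner();
    // message() yields Ok(None) once the server closes the stream.
    while let Some(update) = stream.message().await? {
        // React to each update, e.g. invalidate the local topology/schema cache.
        println!("metadata update: {update:?}");
    }
    Ok(())
}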

Data Types Supported

  • Integers: INT8, INT16, INT32, INT64, UINT8, UINT16, UINT32, UINT64
  • Floating Point: FLOAT32, FLOAT64
  • Boolean
  • String, Bytes
  • Timestamp, Date
  • Decimal(precision, scale)
  • Vector(dimensions) - For AI/ML embeddings
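
As a rough model of the parameterized types in this list, the enum below mirrors the names above; it is an illustrative sketch, not the verbatim DataType definition that the earlier examples import.

// Illustrative mirror of the supported types; only Decimal and Vector carry parameters.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int8, Int16, Int32, Int64,
    UInt8, UInt16, UInt32, UInt64,
    Float32, Float64,
    Boolean,
    String, Bytes,
    Timestamp, Date,
    Decimal(u8, u8), // precision, scale
    Vector(u32),     // dimensions, e.g. Vector(1536) for embedding columns
}

fn main() {
    // A 1536-dimensional embedding column and a money-style decimal.
    let embedding = DataType::Vector(1536);
    let price = DataType::Decimal(18, 2);
    println!("{embedding:?} {price:?}");
}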

TOAST Strategy

For large data types (especially vectors), the TOAST (The Oversized-Attribute Storage Technique) setting controls how column values are stored (a small sketch follows the list):

  • PLAIN: Always inline (fastest access)
  • EXTENDED: Compress and move out-of-line if too large
  • EXTERNAL: Always out-of-line, compressed
  • MAIN: Prefer inline, move to external if needed
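
The self-contained sketch below illustrates one way to pick a strategy from an estimated value size. The enum variants mirror the list above, but the 2 KiB inline threshold and the helper are illustrative defaults, not the engine's actual policy.

#[derive(Debug, Clone, Copy, PartialEq)]
enum ToastStrategy {
    Plain,    // always inline
    Extended, // compress, then move out-of-line if still too large
    External, // always out-of-line
    Main,     // prefer inline, spill only if needed
}

// Pick a default strategy from an estimated on-disk value size in bytes.
fn default_strategy(estimated_size: usize) -> ToastStrategy {
    const INLINE_LIMIT: usize = 2 * 1024; // illustrative 2 KiB inline budget
    if estimated_size <= INLINE_LIMIT {
        ToastStrategy::Plain
    } else {
        // Large values (e.g. high-dimensional vectors) go out-of-line.
        ToastStrategy::Extended
    }
}

fn main() {
    // A 1536-dimensional f32 vector is ~6 KiB, so it would not stay inline.
    assert_eq!(default_strategy(1536 * 4), ToastStrategy::Extended);
    assert_eq!(default_strategy(8), ToastStrategy::Plain);
}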

Fault Tolerance

Leader Election

  • When the leader fails, followers detect it when the election timeout expires without a heartbeat
  • A new leader is elected via the Raft voting mechanism
  • Typical election time: 1-2 seconds

Write Path (DDL Operations)

  1. Client sends the request to any metadata node
  2. If that node is a follower, it forwards the request to the leader
  3. Leader proposes the command to the Raft cluster
  4. Leader waits for a quorum (majority of nodes) to replicate the entry
  5. Command is applied to the state machine and success is returned
  6. Update is broadcast to all subscribers (a simplified sketch of this path follows)
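
The sketch below compresses steps 1-6 into a single async function. propose_to_raft and broadcast_update are stand-in stubs for the real raft-rs proposal and gRPC broadcast plumbing; only the control flow is meant to match the list above.

use tokio::sync::oneshot;

fn propose_to_raft(_cmd: Vec<u8>, done: oneshot::Sender<u64>) {
    // Stub: the real code appends to the Raft log and signals `done`
    // once the entry is committed by a quorum and applied.
    let _ = done.send(1);
}

fn broadcast_update(_version: u64) {
    // Stub: the real code pushes a MetadataUpdate to all gRPC subscribers.
}

async fn handle_ddl(cmd: Vec<u8>, is_leader: bool) -> Result<u64, String> {
    // Steps 1-2: the request can land on any node; only the leader proposes.
    if !is_leader {
        return Err("not leader; forward to the current leader".into());
    }
    // Step 3: propose the serialized command to the Raft cluster.
    let (tx, rx) = oneshot::channel();
    propose_to_raft(cmd, tx);
    // Steps 4-5: wait for quorum replication and the state-machine apply.
    let version = rx.await.map_err(|_| "proposal dropped".to_string())?;
    // Step 6: notify subscribers so compute nodes invalidate their caches.
    broadcast_update(version);
    Ok(version)
}

#[tokio::main]
async fn main() {
    let cmd = b"CREATE TABLE users".to_vec();
    match handle_ddl(cmd, true).await {
        Ok(v) => println!("applied at metadata version {v}"),
        Err(e) => eprintln!("ddl rejected: {e}"),
    }
}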

Read Path (Schema/Topology Queries)

  • Can be served by any node (eventually consistent)
  • Or leader only (strongly consistent)
  • Clients cache and subscribe to updates

Performance Considerations

Write Throughput

  • DDL operations serialized through Raft leader (~1000-5000 ops/sec)
  • Batching recommended for bulk schema changes

Read Throughput

  • Scales with the number of metadata nodes; reads are served from the local state machine
  • No Raft consensus required for reads

Network

  • gRPC over TCP (can be configured for TLS)
  • Protobuf binary serialization
  • Supports HTTP/2 multiplexing

Testing

# Run unit tests
cargo test
# Run integration tests
cargo test --test integration_test
# Run with logging
RUST_LOG=debug cargo test -- --nocapture

Monitoring

Key metrics to monitor:

  • Raft leader status
  • Proposal latency
  • Log compaction size
  • Snapshot creation time
  • Number of subscribers
  • Cache hit rate (clients)

Implementation Details

Raft Log Storage

  • Storage Engine: RocksDB
  • Log Entry Format: Bincode serialization
  • Compaction: Automatic via snapshots
  • Key Prefixes:
    • entry_{index} - Log entries
    • raft_state - Hard state
    • conf_state - Configuration state
    • snapshot - Latest snapshot
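
A minimal sketch of this key layout follows. The fixed-width index encoding is an assumption made so that RocksDB's lexicographic ordering matches log order; it is not the verbatim raft_storage.rs implementation.

// Log entries: zero-pad the index so lexicographic order equals numeric order.
fn entry_key(index: u64) -> Vec<u8> {
    format!("entry_{index:020}").into_bytes()
}

const RAFT_STATE_KEY: &[u8] = b"raft_state"; // HardState (term, vote, commit)
const CONF_STATE_KEY: &[u8] = b"conf_state"; // ConfState (voters, learners)
const SNAPSHOT_KEY: &[u8] = b"snapshot";     // latest snapshot blob

fn main() {
    // Ordering holds only because of the zero-padding above.
    assert!(entry_key(2) < entry_key(10));
    println!("fixed keys: {:?} {:?} {:?}", RAFT_STATE_KEY, CONF_STATE_KEY, SNAPSHOT_KEY);
}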

State Machine

  • Format: In-memory (ClusterTopology + SchemaRegistry)
  • Snapshot: Full state serialization
  • Version: Monotonically increasing counter
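
An illustrative sketch of the snapshot payload implied by these bullets, assuming serde + bincode (the serialization named under Raft Log Storage). The struct and field names are placeholders, not the real state_machine.rs definitions.

use std::collections::HashMap;

#[derive(serde::Serialize, serde::Deserialize)]
struct StateSnapshot {
    version: u64,                      // monotonically increasing apply counter
    topology: HashMap<String, String>, // placeholder for ClusterTopology
    schemas: HashMap<u64, Vec<u8>>,    // placeholder for SchemaRegistry entries
}

fn main() {
    let snap = StateSnapshot {
        version: 42,
        topology: HashMap::new(),
        schemas: HashMap::new(),
    };
    // Full-state serialization; restore on a follower is the inverse.
    let bytes = bincode::serialize(&snap).expect("snapshot encode");
    println!("snapshot: {} bytes at version {}", bytes.len(), snap.version);
}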

Concurrency

  • parking_lot::RwLock for state machine access
  • parking_lot::Mutex for Raft node access
  • Lock-free reads for topology/schema queries
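
The self-contained sketch below illustrates the RwLock/Mutex split described in the first two bullets: metadata queries take only a short shared read lock, while the Raft driver holds a separate Mutex and takes a brief write lock to apply committed entries. The type contents are placeholders.

use std::sync::Arc;
use parking_lot::{Mutex, RwLock};

#[derive(Default)]
struct MetadataState {
    version: u64,
    // topology and schema registry live here in the real service
}

#[derive(Default)]
struct RaftNode; // placeholder for the raft-rs RawNode

fn main() {
    let state = Arc::new(RwLock::new(MetadataState::default()));
    let raft = Arc::new(Mutex::new(RaftNode));

    // Read path: shared lock only; never touches the Raft mutex.
    let version = state.read().version;
    println!("current metadata version: {version}");

    // Apply path: the Raft driver holds its Mutex, then takes a short
    // write lock to apply a committed entry to the state machine.
    {
        let _raft_guard = raft.lock();
        state.write().version += 1;
    }
}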

Future Enhancements

  1. Multi-Raft: Separate Raft groups for topology and schema
  2. Read Replicas: Non-voting nodes for read scaling
  3. Partition Spec: Full support for range/list/hash partitioning
  4. Index Metadata: Track secondary indexes, vector indexes
  5. Query Statistics: Store table/column statistics for optimizer
  6. Access Control: Table-level permissions
  7. Change Data Capture: Stream metadata changes
