This document provides a deep dive into HugeGraph Store's distributed architecture, including the three-tier design, Raft consensus mechanisms, partition management, and coordination with HugeGraph PD.
HugeGraph Store is designed as a distributed, partition-based storage system that provides:

- Horizontal scalability through hash-based partitioning
- Strong consistency and fault tolerance through Raft replication
- Automatic partition management coordinated by HugeGraph PD
HugeGraph Store follows a layered architecture with clear separation of responsibilities:
┌─────────────────────────────────────────────────────────────┐
│ Client Layer │
│ (hugegraph-server with hg-store-client library) │
│ - Graph API requests │
│ - Query execution planning │
│ - Partition routing via PD │
└───────────────────────┬─────────────────────────────────────┘
│ gRPC (Query, Batch, Session)
↓
┌─────────────────────────────────────────────────────────────┐
│ Store Node Layer │
│ (hg-store-node: multiple Store instances) │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ gRPC Services │ │ PD Integration │ │
│ │ - Session │ │ - Registration │ │
│ │ - Query │ │ - Heartbeat │ │
│ │ - State │ │ - Partition │ │
│ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │
│ ↓ ↓ │
│ ┌──────────────────────────────────────┐ │
│ │ HgStoreEngine (singleton) │ │
│ │ - Manages all partition engines │ │
│ │ - Coordinates with PD │ │
│ │ - Handles partition lifecycle │ │
│ └─────────────────┬────────────────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ │ │ │ │
│ ↓ ↓ ↓ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Partition│ │Partition│ │Partition│ (N partitions) │
│ │Engine 1 │ │Engine 2 │ │Engine N │ │
│ │ │ │ │ │ │ │
│ │ Raft │ │ Raft │ │ Raft │ │
│ │ Group 1 │ │ Group 2 │ │ Group N │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
└───────┼────────────┼────────────┼──────────────────────────┘
│ │ │
↓ ↓ ↓
┌─────────────────────────────────────────────────────────────┐
│ Storage Engine Layer │
│ (hg-store-core + hg-store-rocksdb) │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ PartitionEngine (per partition) │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Raft State Machine │ │ │
│ │ │ - Apply log entries │ │ │
│ │ │ - Snapshot creation/loading │ │ │
│ │ │ - Business logic delegation │ │ │
│ │ └────────────────┬─────────────────────────────┘ │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ BusinessHandler │ │ │
│ │ │ - Put/Get/Delete/Scan operations │ │ │
│ │ │ - Query processing (filters, aggregations) │ │ │
│ │ │ - Transaction management │ │ │
│ │ └────────────────┬─────────────────────────────┘ │ │
│ │ │ │ │
│ └───────────────────┼─────────────────────────────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ RocksDB Session & Store │ │
│ │ - Column families for different data types │ │
│ │ - LSM tree storage │ │
│ │ - Compaction and caching │ │
│ │ - Persistent storage on disk │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Client Layer (hg-store-client)

Location: hugegraph-server/hugegraph-hstore (backend implementation) + hugegraph-store/hg-store-client
Responsibilities:

- Translate graph API requests into Store operations
- Plan query execution
- Route requests to the owning partition via PD
Key Classes:
- HgStoreClient: Main client interface
- HgStoreSession: Session-based operations (put, get, delete, scan)
- HgStoreNodeManager: Manages connections to Store nodes

Store Node Layer (hg-store-node)

Location: hugegraph-store/hg-store-node
Responsibilities:
- Hosts the PartitionEngine instances for its assigned partitions
- Routes each request to the correct PartitionEngine based on partition ID

Key Components:
gRPC Services (7 proto files in hg-store-grpc/src/main/proto/):
- HgStoreSession (store_session.proto): Session management, batch operations
- QueryService (query.proto): Query pushdown operations
- GraphStore (graphpb.proto): Graph-specific operations (vertex, edge scanning)
- HgStoreState (store_state.proto): Node state and cluster state queries
- HgStoreStreamMeta (store_stream_meta.proto): Streaming metadata operations
- Healthy (healthy.proto): Health check endpoints
- Common types (store_common.proto): Shared data structures

HgStoreEngine (hg-store-core/src/main/java/.../HgStoreEngine.java):
- Singleton that manages all PartitionEngine instances
- Coordinates with PD (DefaultPdProvider) and reports heartbeats (HeartbeatService)

PartitionEngine (hg-store-core/src/main/java/.../PartitionEngine.java):
- Wraps a Raft node (RaftEngine) for consensus
- Delegates data operations to BusinessHandler

Storage Engine Layer (hg-store-core + hg-store-rocksdb)

Location: hugegraph-store/hg-store-core and hugegraph-store/hg-store-rocksdb
Responsibilities:

- Apply Raft log entries and manage snapshots
- Execute Put/Get/Delete/Scan operations and query processing
- Persist data in RocksDB column families
Key Components:
HgStoreStateMachine (hg-store-core/.../raft/HgStoreStateMachine.java):
- Implements JRaft's StateMachine interface
- Handles snapshot saving (onSnapshotSave) and loading (onSnapshotLoad)
- Delegates to BusinessHandler for actual data operations

BusinessHandler (hg-store-core/.../business/BusinessHandler.java):
- Executes Put/Get/Delete/Scan and query operations
- Uses RocksDBSession for storage access

RocksDBSession (hg-store-rocksdb/.../RocksDBSession.java):
- Manages RocksDB column families and scan iterators (ScanIterator)

HugeGraph Store uses Sofa-JRaft (Ant Financial's Raft implementation) to achieve strong consistency and high availability.
Unlike some distributed systems that use a single Raft group for the entire cluster, HugeGraph Store uses MultiRaft:
Store Cluster (3 nodes: S1, S2, S3)

Partition 1: Raft Group 1
  - Leader: S1
  - Followers: S2, S3

Partition 2: Raft Group 2
  - Leader: S2
  - Followers: S1, S3

Partition 3: Raft Group 3
  - Leader: S3
  - Followers: S1, S2
Advantages:

- Write load is spread across nodes, since each node leads only some partitions
- A failure affects only the Raft groups that have a replica on the failed node
- Partitions replicate and elect leaders independently, in parallel
Trade-offs:

- More Raft groups mean more heartbeat and log-replication overhead per node
- There is no single global ordering across partitions
Raft Engine (RaftEngine)

Location: hg-store-core/.../raft/RaftEngine.java (wraps JRaft's Node)
Responsibilities:

- Wraps the JRaft Node and manages its lifecycle
- Submits tasks (RaftOperation instances) to the Raft log
- Handles peer changes and leader transfer for the group
Key Configuration (application.yml):
```yaml
raft:
  address: 127.0.0.1:8510          # Raft RPC address
  snapshotInterval: 1800           # Snapshot every 30 minutes
  disruptorBufferSize: 1024        # Raft log buffer size
  max-log-file-size: 600000000000  # Max log file size
```
State Machine (HgStoreStateMachine)

Location: hg-store-core/.../raft/HgStoreStateMachine.java
Implements JRaft's StateMachine interface with these key methods:
onApply(Iterator iter):
- Deserializes the RaftOperation from each log entry
- Delegates to BusinessHandler for execution
- Completes the associated RaftClosure with the result

onSnapshotSave(SnapshotWriter writer, Closure done):
- Uses HgSnapshotHandler to save RocksDB data

onSnapshotLoad(SnapshotReader reader):

- Restores RocksDB data from a snapshot via HgSnapshotHandler
onLeaderStart(long term):

- Called when this node becomes the Raft leader; the new leadership is reported to PD
onLeaderStop(Status status):

- Called when this node loses leadership
Raft Operations (RaftOperation)

Location: hg-store-core/.../raft/RaftOperation.java
Encapsulates all operations that need Raft consensus:
Operation Types:
- PUT: Single key-value write
- DELETE: Key deletion
- BATCH: Batch write operations
- PARTITION_META: Partition metadata updates
- SNAPSHOT: Snapshot-related operations

Flow for Write Operations:
1. The leader wraps the request in a RaftOperation with operation data
2. The operation is submitted to the Raft log via node.apply(task)
3. Once a quorum replicates the entry, the state machine applies it to storage

Snapshot Handling (HgSnapshotHandler)

Location: hg-store-core/.../snapshot/HgSnapshotHandler.java
Snapshot Creation:
- Triggered periodically by snapshotInterval (default: 1800 seconds)

Snapshot Loading:

- Performed when a node restarts or lags too far behind the Raft log to catch up from log entries alone
Snapshot Directory Structure:
raft/
└── partition-<partition-id>/
├── log/ # Raft log files
├── snapshot/ # Snapshots
│ ├── snapshot_<index>_<term>/
│ │ ├── data/ # RocksDB data files
│ │ └── meta # Snapshot metadata
│ └── ...
└── meta # Raft metadata
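The layout above can be made concrete with a small path builder. This is an illustrative helper written for this document, not an actual class in the Store codebase:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper that reproduces the on-disk layout described above;
// not part of the actual Store codebase.
public class RaftPaths {
    // raft/partition-<partition-id>/snapshot/snapshot_<index>_<term>/data
    public static Path snapshotDataDir(String raftRoot, int partitionId,
                                       long index, long term) {
        return Paths.get(raftRoot,
                "partition-" + partitionId,
                "snapshot",
                String.format("snapshot_%d_%d", index, term),
                "data");
    }

    // raft/partition-<partition-id>/log
    public static Path logDir(String raftRoot, int partitionId) {
        return Paths.get(raftRoot, "partition-" + partitionId, "log");
    }
}
```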
raft.max-log-file-size: Maximum size of a single log file
Log Retention:
- Log entries older than the last snapshot are truncated, governed by snapshotInterval and JRaft's log compaction

raft.snapshotInterval: How often to create snapshots (seconds)
raft.disruptorBufferSize: Raft log buffer size
PD Notification: New leader reports leadership to PD via updatePartitionLeader() call
Scenario: Network partition splits cluster into two groups
Example: 3-node cluster (S1, S2, S3) splits into {S1} and {S2, S3}
Behavior:

- {S2, S3} retains a 2/3 majority, elects a leader, and keeps serving writes
- {S1} cannot reach quorum, so it can neither elect a leader nor accept writes
Recovery: When network heals, S1 rejoins, discards any uncommitted writes, and syncs from the leader
Prevention: Use Raft's pre-vote mechanism (enabled by default in JRaft) to prevent unnecessary elections
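The split-brain behavior follows directly from Raft's majority rule. This standalone sketch (written for this document, not Store code) checks whether one side of a network partition can still elect a leader and accept writes:

```java
// Standalone illustration of Raft's majority rule, as applied in the
// split-brain example above; not code from the Store itself.
public class Quorum {
    // A group of `reachable` members out of `clusterSize` can elect a
    // leader (and therefore accept writes) only with a strict majority.
    public static boolean hasQuorum(int reachable, int clusterSize) {
        return reachable > clusterSize / 2;
    }
}
```

Applied to the example: `hasQuorum(1, 3)` is false for {S1}, while `hasQuorum(2, 3)` is true for {S2, S3}.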
Partitions are the fundamental unit of data distribution in HugeGraph Store. Understanding partition management is critical for operating Store clusters.
Partition: A logical unit of data with a unique partition ID
Shard: A replica of a partition (e.g., Partition 1 might have Shard 1.1, 1.2, 1.3 on three Store nodes)
Shard Group: The set of all shards (replicas) for a partition, forming a Raft group
Partition Metadata (hg-pd-grpc/src/main/proto/metapb.proto):
```protobuf
message Partition {
  uint32 id = 1;              // Unique partition ID
  uint64 version = 2;         // Version for partition updates
  uint32 start_key = 3;       // Start of key range (hash value)
  uint32 end_key = 4;         // End of key range (hash value)
  repeated Shard shards = 5;  // List of replicas
  PartitionState state = 6;   // Normal, Offline, etc.
}

message Shard {
  uint64 store_id = 1;        // Store node ID
  ShardRole role = 2;         // Leader or Follower
  ShardState state = 3;       // Normal, Offline, etc.
}
```
When a Store node starts:
1. It connects to PD at pdserver.address
2. It registers its gRPC address (grpc.host:grpc.port)
3. It registers its Raft address (raft.address)

Code: hg-store-core/.../pd/DefaultPdProvider.java handles PD communication
Trigger: First Store nodes join the cluster
Process:
- PD waits until the expected number of Stores have registered (pd.initial-store-count)
- PD computes the initial partition-to-Store assignment and replica placement

Example: 3 Store nodes (S1, S2, S3), 6 partitions, 3 replicas each
Partition 1: S1 (leader), S2, S3
Partition 2: S2 (leader), S1, S3
Partition 3: S3 (leader), S1, S2
Partition 4: S1 (leader), S2, S3
Partition 5: S2 (leader), S1, S3
Partition 6: S3 (leader), S1, S2
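One way to reproduce the leader rotation in this example is simple round-robin placement. This is an illustrative sketch for the example above, not PD's actual allocator (which also weighs capacity and load); follower ordering within a replica list is not significant:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative round-robin placement that reproduces the example table;
// PD's real allocator also considers capacity and load.
public class InitialAssignment {
    // Returns one replica list per partition, leader first:
    // partition p (0-based) gets leader stores.get(p % stores.size()).
    public static List<List<String>> assign(List<String> stores, int partitions) {
        List<List<String>> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<String> shard = new ArrayList<>();
            for (int i = 0; i < stores.size(); i++) {
                shard.add(stores.get((p + i) % stores.size()));
            }
            result.add(shard);
        }
        return result;
    }
}
```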
Code: hg-store-core/.../PartitionEngine.java and HgStoreEngine.java
When PD instructs a Store to create a partition:
1. The Store receives a PartitionInstructionListener command from PD
2. HgStoreEngine creates a new PartitionEngine instance
3. The PartitionEngine initializes its Raft node with the peer list (shard group)

Partition State Transitions:
- None → Normal: Partition successfully created and operational
- Normal → Offline: Partition marked for deletion or migration
- Offline → Tombstone: Partition data deleted (pending cleanup)

Hash-based Partitioning (default): keys are hashed into a fixed hash space, and each partition owns a contiguous range of hash values.
Example:
Key: "vertex:person:1001"
Hash: MurmurHash3("vertex:person:1001") = 0x12345678
Partition Range: 0x10000000 - 0x1FFFFFFF → Partition 3
Partition 3 Shards: S1 (leader), S2, S3
Request sent to: S1 (leader of Partition 3)
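The routing step above amounts to a range lookup over the hash space. The sketch below is illustrative: the hash values and range bounds mirror the example, while the real client looks up the owning partition in the routing table cached from PD:

```java
// Illustrative hash-range routing; the real client hashes the key and
// consults the partition table fetched from PD.
public class PartitionRouter {
    public static final class Range {
        final long startKey, endKey;  // inclusive hash bounds
        final int partitionId;
        public Range(long startKey, long endKey, int partitionId) {
            this.startKey = startKey;
            this.endKey = endKey;
            this.partitionId = partitionId;
        }
    }

    // Returns the partition whose [startKey, endKey] range covers the
    // hash, or -1 if no range matches.
    public static int route(long hash, Range[] table) {
        for (Range r : table) {
            if (hash >= r.startKey && hash <= r.endKey) {
                return r.partitionId;
            }
        }
        return -1;
    }
}
```

With a range 0x10000000-0x1FFFFFFF owned by Partition 3, the hash 0x12345678 routes to Partition 3, matching the example.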
Code:
- hg-store-client/.../HgStoreNodeManager.java

Trigger: PD's patrol task detects imbalance (runs every pd.patrol-interval seconds)
Imbalance Scenarios:

- One Store holds significantly more shards than others (approaching store-max-shard-count)
- A Store has been offline longer than max-down-time, so its shards need new homes
- Raft leaders are concentrated on a few Stores
Rebalancing Process:

1. PD computes a target assignment that evens out shard and leader counts
2. PD sends UPDATE_PARTITION / TRANSFER_LEADER instructions to the affected Stores
3. New replicas catch up via Raft snapshot and log replication before old replicas are removed
Configuration (in PD application.yml):
```yaml
pd:
  patrol-interval: 1800        # Rebalancing check interval (seconds)
store:
  max-down-time: 172800        # Mark Store offline after 48 hours
partition:
  store-max-shard-count: 12    # Max partitions per Store
```
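The max-down-time setting reduces to a simple liveness check. This is an illustrative sketch of the decision PD makes, not its actual code; note that the default 172800 seconds is exactly 48 hours:

```java
// Illustrative liveness check corresponding to store.max-down-time;
// not PD's actual implementation.
public class StoreLiveness {
    // A Store is considered offline once it has missed heartbeats for
    // longer than maxDownTimeSeconds (172800 s = 48 h by default).
    public static boolean isOffline(long lastHeartbeatEpochSec,
                                    long nowEpochSec,
                                    long maxDownTimeSeconds) {
        return nowEpochSec - lastHeartbeatEpochSec > maxDownTimeSeconds;
    }
}
```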
Note: Partition splitting is planned but not yet implemented in the current version.
Planned Behavior:
PD (Placement Driver) serves as the control plane for HugeGraph Store, managing cluster metadata, partition assignment, and health monitoring.
When: Store node startup
Process:
StoreId: 0 (or previously assigned ID)
Address: 192.168.1.20:8500
RaftAddress: 192.168.1.20:8510
DataPath: /data/hugegraph-store
Capacity: 1TB
Code: hg-store-core/.../pd/DefaultPdProvider.java → register() method
Frequency: Every 30 seconds (configurable in PD)
Heartbeat Content:

- Store ID and current state
- Storage capacity and used space
- Partition and leader statistics
Purpose:

- Lets PD detect failed Stores (liveness)
- Feeds usage statistics into PD's rebalancing decisions
Code: hg-store-core/.../HeartbeatService.java
Heartbeat Timeout (in PD):
- A Store that stops sending heartbeats is marked offline after store.max-down-time (default: 48 hours)

Purpose: Receive partition management commands from PD
Instruction Types:
- CREATE_PARTITION: Create a new partition replica on this Store
- DELETE_PARTITION: Delete a partition replica from this Store
- UPDATE_PARTITION: Update partition metadata (e.g., add/remove shard)
- TRANSFER_LEADER: Transfer Raft leadership to another shard

Code: hg-store-core/.../pd/PartitionInstructionListener.java
Flow:

1. The Store receives the instruction from PD and validates it
2. HgStoreEngine executes the corresponding action (e.g., creating or removing a PartitionEngine)
3. The Store reports the result back to PD
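The instruction types above can be handled with a straightforward dispatch. The enum and the returned action names are illustrative (only createPartitionEngine appears in the actual flow described later); the real PartitionInstructionListener exposes a richer callback interface:

```java
// Illustrative dispatch over the PD instruction types listed above;
// the action names on the right-hand side are hypothetical stand-ins.
public class InstructionDispatcher {
    public enum Type {
        CREATE_PARTITION, DELETE_PARTITION, UPDATE_PARTITION, TRANSFER_LEADER
    }

    // Maps each instruction to the engine action it would trigger.
    public static String dispatch(Type type, int partitionId) {
        switch (type) {
            case CREATE_PARTITION: return "createPartitionEngine(" + partitionId + ")";
            case DELETE_PARTITION: return "destroyPartitionEngine(" + partitionId + ")";
            case UPDATE_PARTITION: return "updatePartitionMeta(" + partitionId + ")";
            case TRANSFER_LEADER:  return "transferLeader(" + partitionId + ")";
            default: throw new IllegalArgumentException("unknown: " + type);
        }
    }
}
```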
Leader Change Reporting (PartitionEngine)

Trigger: Raft leader election completes
Process:
1. The state machine callback fires (onLeaderStart() or onLeaderStop())
2. The new leader calls updatePartitionLeader(partitionId, newLeader, term) on PD

Importance: Ensures clients always route writes to the current Raft leader
- Graph Metadata: List of graphs managed by the cluster
- Partition Metadata: Partition ID, key ranges, shard list
- Store Metadata: Store ID, address, capacity, state
- Shard Group Metadata: Replica list for each partition
PD Cluster Failure:

- Stores keep serving reads and writes through their existing Raft groups
- Clients keep routing with cached partition tables
- Partition changes, rebalancing, and new Store registration stall until PD recovers
Recommendation: Always run PD in a 3-node or 5-node cluster for high availability
Scenario: Client writes a vertex to HugeGraph Server
1. [Client] → [hugegraph-server]
GraphAPI.addVertex(vertex)
2. [hugegraph-server] → [hg-store-client]
HstoreStore.put(key, value)
3. [hg-store-client] → [PD]
Query: Which partition owns hash(key)?
Response: Partition 3, Leader = Store 1 (192.168.1.20:8500)
4. [hg-store-client] → [Store 1 gRPC]
Put Request (key, value)
5. [Store 1] → [PartitionEngine 3]
Identify partition by key hash
6. [PartitionEngine 3] → [Raft Leader]
Propose RaftOperation(PUT, key, value)
7. [Raft Leader] → [Raft Followers (Store 2, Store 3)]
Replicate log entry
8. [Raft Followers] → [Raft Leader]
Acknowledge (2/3 quorum achieved)
9. [Raft Leader] → [State Machine]
Apply committed log entry
10. [State Machine] → [BusinessHandler]
Execute put(key, value)
11. [BusinessHandler] → [RocksDB]
rocksDB.put(key, value)
12. [Store 1] → [hg-store-client]
Put Response (success)
13. [hg-store-client] → [hugegraph-server]
Success
14. [hugegraph-server] → [Client]
HTTP 201 Created
Latency Breakdown (typical production cluster):
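The commit decision in steps 7-8 of the write flow hinges on majority acknowledgment. This standalone sketch counts follower acks and decides commit, independent of JRaft's actual replication machinery:

```java
// Standalone sketch of the commit decision in steps 7-8 of the write
// flow: a log entry is committed once a strict majority of replicas
// (leader included) have persisted it.
public class ReplicationTracker {
    private final int replicaCount;
    private int acks = 1;  // the leader already has the entry locally

    public ReplicationTracker(int replicaCount) {
        this.replicaCount = replicaCount;
    }

    // Records one follower acknowledgment and reports whether the
    // entry is now committed.
    public boolean onFollowerAck() {
        acks++;
        return isCommitted();
    }

    public boolean isCommitted() {
        return acks > replicaCount / 2;
    }
}
```

For the 3-replica example, a single follower ack gives 2/3 and commits the entry; the leader can respond to the client without waiting for the third replica.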
Scenario: Client queries vertices by label
1. [Client] → [hugegraph-server]
GraphAPI.queryVertices(label="person")
2. [hugegraph-server] → [hg-store-client]
HstoreSession.scan(labelKey, filters)
3. [hg-store-client] → [PD]
Query: Which partitions store vertices?
Response: All partitions (multi-partition scan)
4. [hg-store-client] → [Multiple Stores in parallel]
Scan Request (labelKey, filters) to each partition
5. [Each Store] → [PartitionEngine]
Forward scan to appropriate partition
6. [PartitionEngine] → [Raft Leader]
Optional: Read index check (ensure linearizable read)
7. [PartitionEngine] → [BusinessHandler]
scan(labelKey, filters)
8. [BusinessHandler] → [RocksDB]
rocksDB.scan(startKey, endKey, filter)
9. [RocksDB] → [BusinessHandler]
Iterator over matching keys
10. [BusinessHandler] → [Query Processor]
Apply filters and aggregations (if pushdown)
11. [Stores] → [hg-store-client]
Partial results from each partition
12. [hg-store-client] → [MultiPartitionIterator]
Merge and deduplicate results
13. [hugegraph-server] → [Client]
Final result set
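Step 12 merges sorted partial results from multiple partitions. The following is a simplified stand-in for MultiPartitionIterator (written for this document, not the client's actual code): a k-way merge with deduplication, assuming each partition returns its keys already sorted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified k-way merge with deduplication, standing in for the
// client's MultiPartitionIterator; assumes each partition's partial
// result list is sorted.
public class MergeResults {
    public static List<String> merge(List<List<String>> partials) {
        // Heap entries are {listIndex, offsetInList}, ordered by the key
        // they currently point at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> partials.get(a[0]).get(a[1])
                        .compareTo(partials.get(b[0]).get(b[1])));
        for (int i = 0; i < partials.size(); i++) {
            if (!partials.get(i).isEmpty()) heap.add(new int[]{i, 0});
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            String key = partials.get(top[0]).get(top[1]);
            if (out.isEmpty() || !out.get(out.size() - 1).equals(key)) {
                out.add(key);  // drop duplicate keys seen from other partitions
            }
            if (top[1] + 1 < partials.get(top[0]).size()) {
                heap.add(new int[]{top[0], top[1] + 1});
            }
        }
        return out;
    }
}
```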
Optimization: Query Pushdown: filters and aggregations are evaluated on the Store side (step 10), so only matching results cross the network.
Scenario: PD decides to create a new partition on Store 1
1. [PD Patrol Task]
Detect: Need more partitions for load balancing
2. [PD] → [PartitionService]
createPartition(partitionId=100, shards=[Store1, Store2, Store3])
3. [PD] → [Store 1, Store 2, Store 3]
Instruction: CREATE_PARTITION (partitionId=100)
4. [Each Store] → [PartitionInstructionListener]
Receive and validate instruction
5. [Each Store] → [HgStoreEngine]
createPartitionEngine(partitionId=100, peers=[S1, S2, S3])
6. [HgStoreEngine] → [PartitionEngine]
new PartitionEngine(partitionId=100)
7. [PartitionEngine] → [RaftEngine]
Initialize Raft node with peer list
8. [Raft Nodes] → [Raft Leader Election]
Perform leader election (typically 1-3 seconds)
9. [New Leader] → [PD]
Report: updatePartitionLeader(100, leaderId)
10. [All Stores] → [PD]
Report: Partition creation successful
11. [PD] → [Metadata Store]
Update partition metadata: State = Normal
12. [hg-store-client] → [PD]
Refresh partition routing cache
Total Time: ~5-10 seconds for a new partition to become operational
HugeGraph Store's distributed architecture is designed for:

- Horizontal scalability through hash-based partitioning
- Strong consistency via per-partition Raft groups (MultiRaft)
- High availability through replication and automatic leader election
Key Takeaways:

- PD is the control plane: registration, heartbeats, partition assignment, and routing all flow through it
- Each partition is an independent Raft group; writes go to its leader and commit on quorum
- RocksDB provides the per-node storage engine beneath the Raft state machines
For deployment strategies and cluster sizing, see Deployment Guide.
For query optimization and pushdown mechanisms, see Query Engine.
For operational best practices, see Operations Guide.