This document provides comprehensive API reference for HugeGraph PD, including gRPC services, Protocol Buffers definitions, and usage examples.
HugeGraph PD exposes multiple gRPC services for cluster management and coordination:
| Service | Proto File | Description |
|---|---|---|
| PDGrpc | pdpb.proto | Main PD service: store registration, partition queries, member management |
| KvServiceGrpc | kv.proto | Distributed key-value operations for metadata storage |
| HgPdPulseGrpc | pd_pulse.proto | Heartbeat and health monitoring for Store nodes |
| HgPdWatchGrpc | pd_watch.proto | Watch for partition and store change notifications |
| DiscoveryServiceGrpc | discovery.proto | Service discovery and registration |
Proto Location: `hugegraph-pd/hg-pd-grpc/src/main/proto/`
Generated Stubs: `hugegraph-pd/hg-pd-grpc/src/main/java/org/apache/hugegraph/pd/grpc/`
```
hg-pd-grpc/src/main/proto/
├── pdpb.proto        # Main PD service RPCs
├── metapb.proto      # Core metadata objects (Partition, Shard, Store)
├── meta.proto        # Extended metadata definitions
├── pd_common.proto   # Common types and enums
├── kv.proto          # Key-value service
├── pd_pulse.proto    # Heartbeat protocol
├── pd_watch.proto    # Watch notification protocol
├── discovery.proto   # Service discovery
└── metaTask.proto    # Task coordination
```
Represents a data partition in the cluster.
```protobuf
message Partition {
  uint64 id = 1;
  string graph_name = 2;
  uint64 start_key = 3;
  uint64 end_key = 4;
  repeated Shard shards = 5;
  PartitionState state = 6;
  uint64 version = 7;
}

enum PartitionState {
  PState_None = 0;
  PState_Normal = 1;
  PState_Splitting = 2;
  PState_Offline = 3;
}
```
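The `start_key` and `end_key` fields describe a half-open hash range `[start_key, end_key)`. As a rough sketch of how a client could route a hashed key to its owning partition, consider the snippet below; `PartitionRange` and `route` are illustrative names, not part of the PD client API:

```java
import java.util.List;

public class PartitionRouter {
    // Illustrative stand-in for Metapb.Partition: a half-open hash range [startKey, endKey)
    record PartitionRange(long id, long startKey, long endKey) {}

    // Return the id of the partition whose range contains the hashed key, or -1 if none matches
    static long route(List<PartitionRange> partitions, long partitionKey) {
        for (PartitionRange p : partitions) {
            if (partitionKey >= p.startKey() && partitionKey < p.endKey()) {
                return p.id();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<PartitionRange> parts = List.of(
                new PartitionRange(1, 0, 1000),
                new PartitionRange(2, 1000, 2000));
        System.out.println(route(parts, 12));   // falls in partition 1
        System.out.println(route(parts, 1500)); // falls in partition 2
    }
}
```

Note that `end_key` is exclusive: a key hash exactly equal to one partition's `end_key` belongs to the next partition's range.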
Represents a replica of a partition.
```protobuf
message Shard {
  uint64 store_id = 1;
  ShardRole role = 2;
}

enum ShardRole {
  None = 0;
  Leader = 1;
  Follower = 2;
  Learner = 3;
}
```
Represents a Store node in the cluster.
```protobuf
message Store {
  uint64 id = 1;
  string address = 2;              // gRPC address (host:port)
  string raft_address = 3;         // Raft address for data replication
  StoreState state = 4;
  map<string, string> labels = 5;  // Topology labels (rack, zone, region)
  StoreStats stats = 6;
  int64 last_heartbeat = 7;        // Unix timestamp
  uint64 version = 8;
}

enum StoreState {
  Unknown = 0;
  Up = 1;         // Store is online and healthy
  Offline = 2;    // Store is temporarily offline
  Tombstone = 3;  // Store is permanently removed
  Exiting = 4;    // Store is in the process of shutting down
}

message StoreStats {
  uint64 capacity = 1;         // Total disk capacity (bytes)
  uint64 available = 2;        // Available disk space (bytes)
  uint32 partition_count = 3;  // Number of partitions on this store
  uint32 leader_count = 4;     // Number of partitions where this store is leader
}
```
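`StoreStats` gives a scheduler the raw material for placement decisions. A minimal sketch of one such decision, picking the store with the lowest disk usage ratio as a target for a new replica; `StoreInfo` and `leastLoaded` are hypothetical names, and real PD balancing also weighs partition and leader counts:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class StorePicker {
    // Illustrative stand-in for the Metapb.StoreStats fields a placement decision reads
    record StoreInfo(long id, long capacity, long available, int partitionCount) {
        double usedRatio() { return 1.0 - (double) available / capacity; }
    }

    // Pick the store with the lowest disk usage ratio as a new-replica target
    static Optional<StoreInfo> leastLoaded(List<StoreInfo> stores) {
        return stores.stream().min(Comparator.comparingDouble(StoreInfo::usedRatio));
    }

    public static void main(String[] args) {
        List<StoreInfo> stores = List.of(
                new StoreInfo(1, 100L << 30, 20L << 30, 12),  // 80% used
                new StoreInfo(2, 100L << 30, 60L << 30, 8));  // 40% used
        System.out.println(leastLoaded(stores).get().id());   // prints 2
    }
}
```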
Represents a graph in the cluster.
```protobuf
message Graph {
  string graph_name = 1;
  uint32 partition_count = 2;
  GraphState state = 3;
}

enum GraphState {
  Graph_Normal = 0;
  Graph_Deleting = 1;
}
```
Main service for cluster management.
Retrieve all PD members in the cluster.
Request:
```protobuf
message GetMembersRequest {}
```
Response:
```protobuf
message GetMembersResponse {
  ResponseHeader header = 1;
  repeated Member members = 2;
  Member leader = 3;
}

message Member {
  string cluster_id = 1;
  string member_id = 2;
  string grpc_url = 3;  // gRPC endpoint
  string raft_url = 4;  // Raft endpoint
  MemberState state = 5;
}
```
Java Example:
```java
import org.apache.hugegraph.pd.grpc.PDGrpc;
import org.apache.hugegraph.pd.grpc.Pdpb;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

// Create gRPC channel
ManagedChannel channel = ManagedChannelBuilder
        .forAddress("localhost", 8686)
        .usePlaintext()
        .build();

// Create blocking stub
PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(channel);

// Get PD members
Pdpb.GetMembersRequest request = Pdpb.GetMembersRequest.newBuilder().build();
Pdpb.GetMembersResponse response = stub.getMembers(request);

System.out.println("Leader: " + response.getLeader().getGrpcUrl());
for (Pdpb.Member member : response.getMembersList()) {
    System.out.println("Member: " + member.getGrpcUrl() + " State: " + member.getState());
}

// Clean up
channel.shutdown();
```
Register a new Store node with PD.
Request:
```protobuf
message RegisterStoreRequest {
  RequestHeader header = 1;
  Store store = 2;
}
```
Response:
```protobuf
message RegisterStoreResponse {
  ResponseHeader header = 1;
  uint64 store_id = 2;  // Assigned store ID
}
```
Java Example:
```java
import org.apache.hugegraph.pd.grpc.Metapb;

// Build store information
Metapb.Store store = Metapb.Store.newBuilder()
        .setAddress("192.168.1.100:8500")
        .setRaftAddress("192.168.1.100:8501")
        .setState(Metapb.StoreState.Up)
        .putLabels("zone", "zone-1")
        .putLabels("rack", "rack-a")
        .build();

// Register store
Pdpb.RegisterStoreRequest request = Pdpb.RegisterStoreRequest.newBuilder()
        .setStore(store)
        .build();
Pdpb.RegisterStoreResponse response = stub.registerStore(request);

long storeId = response.getStoreId();
System.out.println("Registered store with ID: " + storeId);
```
Retrieve Store node information.
Request:
```protobuf
message GetStoreInfoRequest {
  RequestHeader header = 1;
  uint64 store_id = 2;
}
```
Response:
```protobuf
message GetStoreInfoResponse {
  ResponseHeader header = 1;
  Store store = 2;
}
```
Java Example:
```java
Pdpb.GetStoreInfoRequest request = Pdpb.GetStoreInfoRequest.newBuilder()
        .setStoreId(storeId)
        .build();
Pdpb.GetStoreInfoResponse response = stub.getStoreInfo(request);

Metapb.Store store = response.getStore();
System.out.println("Store " + store.getId() + " at " + store.getAddress());
System.out.println("State: " + store.getState());
System.out.println("Partitions: " + store.getStats().getPartitionCount());
System.out.println("Capacity: " + store.getStats().getCapacity() / (1024 * 1024 * 1024) + " GB");
```
Retrieve partition information by partition key (the hash code of a data key).
Request:
```protobuf
message GetPartitionRequest {
  RequestHeader header = 1;
  string graph_name = 2;
  uint64 partition_key = 3;  // Hash code of the data key
}
```
Response:
```protobuf
message GetPartitionResponse {
  ResponseHeader header = 1;
  Partition partition = 2;
  Shard leader = 3;  // Current leader shard
}
```
Java Example:
```java
String graphName = "social_network";
long partitionKey = 12345L;  // Hash of vertex/edge key

Pdpb.GetPartitionRequest request = Pdpb.GetPartitionRequest.newBuilder()
        .setGraphName(graphName)
        .setPartitionKey(partitionKey)
        .build();
Pdpb.GetPartitionResponse response = stub.getPartition(request);

Metapb.Partition partition = response.getPartition();
Metapb.Shard leader = response.getLeader();
System.out.println("Partition " + partition.getId() + " range: ["
        + partition.getStartKey() + ", " + partition.getEndKey() + ")");
System.out.println("Leader store: " + leader.getStoreId());
System.out.println("Replicas: " + partition.getShardsCount());
```
Retrieve partition by exact partition code (optimized for routing).
Request:
```protobuf
message GetPartitionByCodeRequest {
  RequestHeader header = 1;
  string graph_name = 2;
  uint64 partition_id = 3;
}
```
Response:
```protobuf
message GetPartitionByCodeResponse {
  ResponseHeader header = 1;
  Partition partition = 2;
}
```
Java Example:
```java
Pdpb.GetPartitionByCodeRequest request = Pdpb.GetPartitionByCodeRequest.newBuilder()
        .setGraphName("social_network")
        .setPartitionId(5)
        .build();
Pdpb.GetPartitionByCodeResponse response = stub.getPartitionByCode(request);
Metapb.Partition partition = response.getPartition();

// Find leader shard
Metapb.Shard leader = partition.getShardsList().stream()
        .filter(s -> s.getRole() == Metapb.ShardRole.Leader)
        .findFirst()
        .orElse(null);

if (leader != null) {
    System.out.println("Route query to store: " + leader.getStoreId());
}
```
Distributed key-value operations for metadata storage.
Store a key-value pair.
Request:
```protobuf
message PutRequest {
  string key = 1;
  bytes value = 2;
  int64 ttl = 3;  // Time-to-live in seconds (0 = no expiration)
}
```
Response:
```protobuf
message PutResponse {
  ResponseHeader header = 1;
}
```
Java Example:
```java
import org.apache.hugegraph.pd.grpc.kv.KvServiceGrpc;
import org.apache.hugegraph.pd.grpc.kv.Kv;

KvServiceGrpc.KvServiceBlockingStub kvStub = KvServiceGrpc.newBlockingStub(channel);

// Store configuration
String key = "config/max_retry_count";
String value = "5";

Kv.PutRequest request = Kv.PutRequest.newBuilder()
        .setKey(key)
        .setValue(com.google.protobuf.ByteString.copyFromUtf8(value))
        .setTtl(0)  // No expiration
        .build();
Kv.PutResponse response = kvStub.put(request);
System.out.println("Stored: " + key);
```
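To make the `ttl` field concrete, here is a tiny in-memory sketch of the same expiration semantics (`ttl == 0` means the entry never expires); this illustrates the contract, not PD's actual storage implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class TtlKv {
    // expireAtMillis == 0 means the entry never expires
    record Entry(byte[] value, long expireAtMillis) {}

    private final Map<String, Entry> store = new HashMap<>();

    // ttlSeconds == 0 means no expiration, mirroring PutRequest.ttl
    public void put(String key, byte[] value, long ttlSeconds) {
        long expireAt = ttlSeconds == 0 ? 0 : System.currentTimeMillis() + ttlSeconds * 1000;
        store.put(key, new Entry(value, expireAt));
    }

    public byte[] get(String key) {
        Entry e = store.get(key);
        if (e == null) return null;
        if (e.expireAtMillis() != 0 && System.currentTimeMillis() >= e.expireAtMillis()) {
            store.remove(key);  // lazily evict expired entries on read
            return null;
        }
        return e.value();
    }
}
```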
Retrieve a value by key.
Request:
```protobuf
message GetRequest {
  string key = 1;
}
```
Response:
```protobuf
message GetResponse {
  ResponseHeader header = 1;
  bytes value = 2;
}
```
Java Example:
```java
Kv.GetRequest request = Kv.GetRequest.newBuilder()
        .setKey("config/max_retry_count")
        .build();
Kv.GetResponse response = kvStub.get(request);

String value = response.getValue().toStringUtf8();
System.out.println("Retrieved value: " + value);
```
Range scan for keys matching a prefix.
Request:
```protobuf
message ScanRequest {
  string start_key = 1;
  string end_key = 2;
  int32 limit = 3;  // Max number of results
}
```
Response:
```protobuf
message ScanResponse {
  ResponseHeader header = 1;
  repeated KvPair kvs = 2;
}

message KvPair {
  string key = 1;
  bytes value = 2;
}
```
Java Example:
```java
// Scan all configuration keys
Kv.ScanRequest request = Kv.ScanRequest.newBuilder()
        .setStartKey("config/")
        .setEndKey("config/\uffff")  // End of prefix range
        .setLimit(100)
        .build();
Kv.ScanResponse response = kvStub.scan(request);

for (Kv.KvPair kv : response.getKvsList()) {
    System.out.println(kv.getKey() + " = " + kv.getValue().toStringUtf8());
}
```
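The `"\uffff"` suffix turns a prefix into an exclusive upper bound: every key starting with `config/` sorts below `config/\uffff` (assuming keys contain no U+FFFF characters). A small illustrative helper, not part of the PD client API:

```java
public class PrefixRange {
    // Build the [start, end) key range covering every key that begins with `prefix`.
    // Appending U+FFFF yields a bound above any practical key character, matching
    // the "config/\uffff" idiom in the scan example.
    static String[] forPrefix(String prefix) {
        return new String[] { prefix, prefix + "\uffff" };
    }

    static boolean inRange(String key, String[] range) {
        return key.compareTo(range[0]) >= 0 && key.compareTo(range[1]) < 0;
    }

    public static void main(String[] args) {
        String[] r = forPrefix("config/");
        System.out.println(inRange("config/max_retry_count", r)); // true
        System.out.println(inRange("data/partition_1", r));       // false
    }
}
```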
Heartbeat and health monitoring for Store nodes.
Bidirectional streaming for continuous heartbeat.
Request Stream:
```protobuf
message PulseRequest {
  PulseType pulse_type = 1;
  oneof notice {
    PulseCreatePartition create_partition = 2;
    PulseTransferLeader transfer_leader = 3;
    PulseMovePartition move_partition = 4;
    PulseDeletePartition delete_partition = 5;
  }
}

enum PulseType {
  PULSE_TYPE_UNKNOWN = 0;
  PULSE_TYPE_STORE_HEARTBEAT = 1;
  PULSE_TYPE_PARTITION_HEARTBEAT = 2;
}
```
Response Stream:
```protobuf
message PulseResponse {
  PulseType pulse_type = 1;
  oneof notice {
    PulseCreatePartition create_partition = 2;
    PulseTransferLeader transfer_leader = 3;
    PulseMovePartition move_partition = 4;
    PulseDeletePartition delete_partition = 5;
  }
}
```
Java Example:
```java
import org.apache.hugegraph.pd.grpc.pulse.HgPdPulseGrpc;
import org.apache.hugegraph.pd.grpc.pulse.HgPdPulse;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

HgPdPulseGrpc.HgPdPulseStub asyncStub = HgPdPulseGrpc.newStub(channel);

// Response handler
StreamObserver<HgPdPulse.PulseResponse> responseObserver = new StreamObserver<>() {
    @Override
    public void onNext(HgPdPulse.PulseResponse response) {
        System.out.println("Received instruction: " + response.getPulseType());
        // Handle instructions from PD (partition transfer, split, etc.)
    }

    @Override
    public void onError(Throwable t) {
        System.err.println("Pulse stream error: " + t.getMessage());
    }

    @Override
    public void onCompleted() {
        System.out.println("Pulse stream completed");
    }
};

// Create bidirectional stream
StreamObserver<HgPdPulse.PulseRequest> requestObserver = asyncStub.pulse(responseObserver);

// Send periodic heartbeat
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
    HgPdPulse.PulseRequest heartbeat = HgPdPulse.PulseRequest.newBuilder()
            .setPulseType(HgPdPulse.PulseType.PULSE_TYPE_STORE_HEARTBEAT)
            .build();
    requestObserver.onNext(heartbeat);
}, 0, 10, TimeUnit.SECONDS);
```
Watch for partition and store change notifications.
Watch for partition changes in a graph.
Request:
```protobuf
message WatchPartitionRequest {
  RequestHeader header = 1;
  string graph_name = 2;
  WatchType watch_type = 3;
}

enum WatchType {
  WATCH_TYPE_PARTITION_CHANGE = 0;
  WATCH_TYPE_STORE_CHANGE = 1;
}
```
Response Stream:
```protobuf
message WatchPartitionResponse {
  ResponseHeader header = 1;
  WatchChangeType change_type = 2;
  Partition partition = 3;
}

enum WatchChangeType {
  WATCH_CHANGE_TYPE_ADD = 0;
  WATCH_CHANGE_TYPE_DEL = 1;
  WATCH_CHANGE_TYPE_ALTER = 2;
}
```
Java Example:
```java
import org.apache.hugegraph.pd.grpc.watch.HgPdWatchGrpc;
import org.apache.hugegraph.pd.grpc.watch.HgPdWatch;
import io.grpc.stub.StreamObserver;

HgPdWatchGrpc.HgPdWatchStub watchStub = HgPdWatchGrpc.newStub(channel);

// Watch partition changes
HgPdWatch.WatchPartitionRequest request = HgPdWatch.WatchPartitionRequest.newBuilder()
        .setGraphName("social_network")
        .setWatchType(HgPdWatch.WatchType.WATCH_TYPE_PARTITION_CHANGE)
        .build();

StreamObserver<HgPdWatch.WatchPartitionResponse> responseObserver = new StreamObserver<>() {
    @Override
    public void onNext(HgPdWatch.WatchPartitionResponse response) {
        HgPdWatch.WatchChangeType changeType = response.getChangeType();
        Metapb.Partition partition = response.getPartition();
        switch (changeType) {
            case WATCH_CHANGE_TYPE_ADD:
                System.out.println("Partition added: " + partition.getId());
                break;
            case WATCH_CHANGE_TYPE_DEL:
                System.out.println("Partition deleted: " + partition.getId());
                break;
            case WATCH_CHANGE_TYPE_ALTER:
                System.out.println("Partition changed: " + partition.getId());
                // Refresh local cache
                break;
        }
    }

    @Override
    public void onError(Throwable t) {
        System.err.println("Watch error: " + t.getMessage());
    }

    @Override
    public void onCompleted() {
        System.out.println("Watch completed");
    }
};

watchStub.watchPartition(request, responseObserver);
```
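The usual reason to watch partition changes is to keep a local route table fresh. A minimal sketch of how an `onNext()` handler could apply ADD/DEL/ALTER events to a cache; `PartitionCache` and its types are illustrative, standing in for the generated `WatchChangeType` and `Partition` classes:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionCache {
    enum ChangeType { ADD, DEL, ALTER }  // mirrors WATCH_CHANGE_TYPE_* values

    record CachedPartition(long id, long version) {}

    private final Map<Long, CachedPartition> cache = new HashMap<>();

    // Apply a watch event: inserts, refreshes, or evicts the cached entry
    public void apply(ChangeType type, CachedPartition p) {
        switch (type) {
            case ADD, ALTER -> cache.put(p.id(), p);  // insert new or replace stale entry
            case DEL -> cache.remove(p.id());
        }
    }

    public CachedPartition get(long id) {
        return cache.get(id);
    }
}
```

Treating ADD and ALTER identically (put the latest version) keeps the handler idempotent even if events arrive out of order or are replayed after a reconnect.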
HugeGraph PD provides a high-level Java client library (hg-pd-client) that simplifies interaction with PD.
Main client class for PD operations.
Initialization:
```java
import org.apache.hugegraph.pd.client.PDClient;
import org.apache.hugegraph.pd.client.PDConfig;

// Configure PD client
PDConfig config = PDConfig.builder()
        .pdServers("192.168.1.10:8686,192.168.1.11:8686,192.168.1.12:8686")
        .connectTimeout(5000)   // 5 seconds
        .requestTimeout(10000)  // 10 seconds
        .enableCache(true)      // Enable partition cache
        .build();

// Create client
PDClient client = new PDClient(config);

// Use client...

// Clean up
client.close();
```
```java
import org.apache.hugegraph.pd.common.PartitionEngine;

// Get partition by key
String graphName = "social_network";
long vertexId = 12345L;
long partitionKey = PartitionEngine.calcHashcode(vertexId);

Metapb.Partition partition = client.getPartitionByKey(graphName, partitionKey);
System.out.println("Partition ID: " + partition.getId());

// Get all partitions for a graph
List<Metapb.Partition> partitions = client.getPartitionsByGraphName(graphName);
System.out.println("Total partitions: " + partitions.size());

// Get partition leader
Metapb.Shard leader = client.getPartitionLeader(graphName, partition.getId());
Metapb.Store leaderStore = client.getStore(leader.getStoreId());
System.out.println("Leader at: " + leaderStore.getAddress());
```
```java
// Get all stores
List<Metapb.Store> stores = client.getStores();
for (Metapb.Store store : stores) {
    System.out.println("Store " + store.getId() + ": " + store.getAddress()
            + " (" + store.getState() + ")");
}

// Get active stores
List<Metapb.Store> activeStores = client.getActiveStores();
System.out.println("Active stores: " + activeStores.size());

// Get stores by graph
List<Metapb.Store> graphStores = client.getStoresByGraphName(graphName);
```
```java
import org.apache.hugegraph.pd.client.PDWatch;

// Create watch listener
PDWatch.Listener<Metapb.Partition> listener = new PDWatch.Listener<>() {
    @Override
    public void onNext(PDWatch.WatchEvent<Metapb.Partition> event) {
        System.out.println("Partition " + event.getTarget().getId() + " " + event.getType());
    }

    @Override
    public void onError(Throwable error) {
        System.err.println("Watch error: " + error.getMessage());
    }
};

// Watch partition changes
PDWatch watch = client.watchPartition(graphName, listener);

// Stop watching
watch.close();
```
```java
// Put key-value
client.put("config/setting1", "value1".getBytes());

// Get value
byte[] value = client.get("config/setting1");
System.out.println("Value: " + new String(value));

// Delete key
client.delete("config/setting1");

// Scan with prefix
Map<String, byte[]> results = client.scan("config/", "config/\uffff", 100);
for (Map.Entry<String, byte[]> entry : results.entrySet()) {
    System.out.println(entry.getKey() + " = " + new String(entry.getValue()));
}
```
PD exposes a REST API for management and monitoring (default port: 8620).
```bash
curl http://localhost:8620/actuator/health
```
Response:
```json
{
  "status": "UP",
  "groups": ["liveness", "readiness"]
}
```
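A liveness probe or monitoring script can poll this endpoint and check for `"status":"UP"`. A small self-contained sketch using the JDK's HTTP client; the embedded `HttpServer` below is a local stand-in for a PD node so the example runs without a cluster, and the substring check is a simplification of real JSON parsing:

```java
import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HealthCheck {
    // Return true when the actuator health endpoint reports status UP.
    // In a real deployment, point this at http://<pd-host>:8620/actuator/health.
    static boolean isUp(String url) {
        try {
            HttpResponse<String> resp = HttpClient.newHttpClient().send(
                    HttpRequest.newBuilder(URI.create(url)).GET().build(),
                    HttpResponse.BodyHandlers.ofString());
            return resp.statusCode() == 200 && resp.body().contains("\"status\":\"UP\"");
        } catch (Exception e) {
            return false;  // unreachable node counts as not healthy
        }
    }

    public static void main(String[] args) throws Exception {
        // Local stub standing in for a PD node
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/actuator/health", exchange -> {
            byte[] body = "{\"status\":\"UP\"}".getBytes();
            exchange.sendResponseHeaders(200, body.length);
            exchange.getResponseBody().write(body);
            exchange.close();
        });
        server.start();
        int port = server.getAddress().getPort();
        System.out.println(isUp("http://localhost:" + port + "/actuator/health")); // true
        server.stop(0);
    }
}
```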
```bash
curl http://localhost:8620/actuator/metrics
```
Response (Prometheus format):
```
# HELP pd_raft_state Raft state (0=Follower, 1=Candidate, 2=Leader)
# TYPE pd_raft_state gauge
pd_raft_state 2.0
# HELP pd_store_count Number of stores
# TYPE pd_store_count gauge
pd_store_count{state="Up"} 3.0
pd_store_count{state="Offline"} 0.0
# HELP pd_partition_count Number of partitions
# TYPE pd_partition_count gauge
pd_partition_count 36.0
```
```bash
curl http://localhost:8620/v1/partitions?graph_name=social_network
```
Response:
```json
{
  "partitions": [
    {
      "id": 1,
      "graph_name": "social_network",
      "start_key": 0,
      "end_key": 1000,
      "shards": [
        {"store_id": 1, "role": "Leader"},
        {"store_id": 2, "role": "Follower"},
        {"store_id": 3, "role": "Follower"}
      ],
      "state": "PState_Normal"
    }
  ]
}
```
```bash
curl http://localhost:8620/v1/stores
```
Response:
```json
{
  "stores": [
    {
      "id": 1,
      "address": "192.168.1.100:8500",
      "raft_address": "192.168.1.100:8501",
      "state": "Up",
      "labels": {
        "zone": "zone-1",
        "rack": "rack-a"
      },
      "stats": {
        "capacity": 107374182400,
        "available": 53687091200,
        "partition_count": 12,
        "leader_count": 8
      },
      "last_heartbeat": 1620000000
    }
  ]
}
```
PD uses standard gRPC status codes:
| Code | Name | Description |
|---|---|---|
| 0 | OK | Success |
| 1 | CANCELLED | Operation cancelled |
| 2 | UNKNOWN | Unknown error |
| 3 | INVALID_ARGUMENT | Invalid request parameters |
| 4 | DEADLINE_EXCEEDED | Timeout |
| 5 | NOT_FOUND | Resource not found (store, partition, etc.) |
| 6 | ALREADY_EXISTS | Resource already exists |
| 7 | PERMISSION_DENIED | Insufficient permissions |
| 8 | RESOURCE_EXHAUSTED | Quota exceeded |
| 14 | UNAVAILABLE | Service unavailable (not leader, Raft not ready) |
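A client usually partitions these codes into "retry with backoff" versus "surface to the caller". The sketch below encodes one reasonable policy using the numeric codes from the table; the choice of which codes are retryable is an assumption, not something PD prescribes:

```java
import java.util.Set;

public class RetryPolicy {
    // gRPC numeric status codes from the table above
    static final int DEADLINE_EXCEEDED = 4;
    static final int RESOURCE_EXHAUSTED = 8;
    static final int UNAVAILABLE = 14;

    // One reasonable policy: retry only transient conditions such as a leader
    // change (UNAVAILABLE), a timeout, or quota pressure with backoff.
    // INVALID_ARGUMENT, NOT_FOUND, etc. will fail the same way again, so
    // surface them to the caller instead of retrying.
    static final Set<Integer> RETRYABLE =
            Set.of(DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, UNAVAILABLE);

    static boolean shouldRetry(int code) {
        return RETRYABLE.contains(code);
    }
}
```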
All responses include a ResponseHeader with error information:
```protobuf
message ResponseHeader {
  uint64 cluster_id = 1;
  Error error = 2;
}

message Error {
  ErrorType type = 1;
  string message = 2;
}

enum ErrorType {
  OK = 0;
  NOT_LEADER = 1;           // Current node is not Raft leader
  STORE_NOT_FOUND = 2;
  PARTITION_NOT_FOUND = 3;
  STORE_TOMBSTONE = 4;      // Store is permanently removed
  RAFT_ERROR = 5;
}
```
Error Handling Example:
```java
Pdpb.GetStoreInfoResponse response = stub.getStoreInfo(request);

if (response.getHeader().hasError()) {
    Pdpb.Error error = response.getHeader().getError();
    if (error.getType() == Pdpb.ErrorType.NOT_LEADER) {
        // Retry with leader node
        String leaderUrl = getLeaderFromMembers();
        // Reconnect and retry...
    } else {
        System.err.println("Error: " + error.getMessage());
    }
} else {
    Metapb.Store store = response.getStore();
    // Process store...
}
```
```java
// Good: Reuse a long-lived channel across requests
ManagedChannel channel = ManagedChannelBuilder
        .forAddress("pd-host", 8686)
        .usePlaintext()
        .keepAliveTime(30, TimeUnit.SECONDS)
        .idleTimeout(60, TimeUnit.SECONDS)
        .build();

// Bad: Creating a new channel per request and calling channel.shutdown()
// after every call wastes connection setup time; don't do this.
```
- Check `ResponseHeader.error.type` for `NOT_LEADER` and redirect requests to the leader
- Call `GetMembers()` to find the current leader after a leadership change
- Use `WatchPartition` to invalidate the local partition cache on changes

Retry transient failures with exponential backoff:

```java
int maxRetries = 3;
int retryDelay = 1000; // milliseconds

for (int i = 0; i < maxRetries; i++) {
    try {
        response = stub.getPartition(request);
        break; // Success
    } catch (StatusRuntimeException e) {
        if (e.getStatus().getCode() == Status.Code.UNAVAILABLE && i < maxRetries - 1) {
            Thread.sleep(retryDelay * (1 << i)); // Exponential backoff
        } else {
            throw e;
        }
    }
}
```
```java
PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(channel)
        .withDeadlineAfter(5, TimeUnit.SECONDS);
```
HugeGraph PD provides comprehensive gRPC APIs for cluster membership, store registration, partition routing, key-value metadata storage, heartbeat monitoring, and change notifications.
Use the high-level PDClient library for simplified integration, or use raw gRPC stubs for fine-grained control.
For architecture details, see Architecture Documentation.
For configuration, see Configuration Guide.