This document explains HugeGraph Store's query processing capabilities, including query pushdown, multi-partition queries, and gRPC API reference.
HugeGraph Store implements advanced query processing to minimize network traffic and improve performance.
1. [Server] Receives Graph Query (e.g., g.V().has('age', 30))
↓
2. [Server] Translates to Store Query
- Determine table (vertices, edges, indexes)
- Extract filters (age == 30)
- Identify partitions (all or specific)
↓
3. [Server] Sends Query to Store Nodes (parallel)
- Query includes filters and aggregations
- One request per partition
↓
4. [Store] Executes Query Locally
- Apply filters at RocksDB scan level
- Compute aggregations if requested
- Stream results back to Server
↓
5. [Server] Merges Results
- Deduplicate if needed
- Apply final ordering/limiting
↓
6. [Server] Returns to Client
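The six steps above can be sketched end to end. The following is a minimal, self-contained simulation (plain Java, no gRPC; the class and method names are illustrative, not part of the HugeGraph API) in which each "partition" is an in-memory list, the filter is applied on the store side, and the server merges the partial results:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PushdownFlowSketch {

    /** Store side (step 4): scan one partition and apply the filter locally. */
    static List<Map<String, Object>> scanPartition(List<Map<String, Object>> partition,
                                                   String key, Object expected) {
        List<Map<String, Object>> matches = new ArrayList<>();
        for (Map<String, Object> row : partition) {
            if (expected.equals(row.get(key))) {
                matches.add(row);  // only matching rows leave the store
            }
        }
        return matches;
    }

    /** Server side (steps 3 and 5): fan out to every partition, then merge partial results. */
    static List<Map<String, Object>> query(List<List<Map<String, Object>>> partitions,
                                           String key, Object expected) {
        List<Map<String, Object>> merged = new ArrayList<>();
        for (List<Map<String, Object>> partition : partitions) {
            merged.addAll(scanPartition(partition, key, expected));
        }
        return merged;
    }
}
```

The real fan-out is parallel gRPC calls and the merge handles ordering and limits; this sketch only shows where the filter runs.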
Supported Filters:
- key == value
- key > value, key >= value, key < value, key <= value
- key.startsWith(prefix)
- key in [value1, value2, ...]

Example: Query vertices with age = 30
Without Pushdown (inefficient):
1. Store scans all vertices
2. Sends all vertices to Server
3. Server filters age == 30

Network: Transfers all vertices
With Pushdown (efficient):
1. Server sends filter: age == 30
2. Store scans and filters locally
3. Store sends only matching vertices

Network: Transfers only matching vertices
Implementation (proto definition):
File: hg-store-grpc/src/main/proto/query.proto
```protobuf
message Condition {
  string key = 1;              // Property key
  ConditionType type = 2;      // EQ, GT, LT, GTE, LTE, IN, PREFIX
  bytes value = 3;             // Value to compare
  repeated bytes values = 4;   // For IN operator
}

enum ConditionType {
  EQ = 0;      // Equals
  GT = 1;      // Greater than
  LT = 2;      // Less than
  GTE = 3;     // Greater than or equal
  LTE = 4;     // Less than or equal
  IN = 5;      // In list
  PREFIX = 6;  // Prefix match
}
```
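On the store side, each scanned value must be checked against every condition. A self-contained sketch of that evaluation (plain Java; the real implementation compares encoded bytes, here strings are used for readability, and `ConditionSketch` is a hypothetical name):

```java
import java.util.List;

public class ConditionSketch {

    enum ConditionType { EQ, GT, LT, GTE, LTE, IN, PREFIX }

    /** Evaluate one condition against a property value (lexicographic order, as for encoded keys). */
    static boolean matches(ConditionType type, String value, String operand, List<String> operands) {
        int cmp = value.compareTo(operand == null ? "" : operand);
        switch (type) {
            case EQ:     return value.equals(operand);
            case GT:     return cmp > 0;
            case LT:     return cmp < 0;
            case GTE:    return cmp >= 0;
            case LTE:    return cmp <= 0;
            case IN:     return operands != null && operands.contains(value);  // uses repeated values
            case PREFIX: return operand != null && value.startsWith(operand);
            default:     return false;
        }
    }
}
```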
Code Example (using client API):
```java
import org.apache.hugegraph.store.client.HgStoreQuery;
import org.apache.hugegraph.store.client.HgStoreQuery.Condition;

// Build query with filter
HgStoreQuery query = HgStoreQuery.builder()
        .table("VERTEX")
        .filter(Condition.eq("age", 30))
        .build();

// Execute (filter applied at Store)
HgStoreResultSet results = session.query(query);
```
Supported Aggregations:
- COUNT: Count matching rows
- SUM: Sum numeric property
- MIN: Minimum value
- MAX: Maximum value
- AVG: Average value

Example: Count vertices with label "person"
Without Pushdown:
1. Store sends all person vertices to Server
2. Server counts vertices

Network: Transfers millions of vertices
With Pushdown:
1. Server sends COUNT query
2. Store counts locally
3. Store sends count (single number)

Network: Transfers 8 bytes
Proto Definition:
```protobuf
message QueryRequest {
  string table = 1;
  bytes start_key = 2;
  bytes end_key = 3;
  repeated Condition conditions = 4;
  AggregationType aggregation = 5;  // COUNT, SUM, MIN, MAX, AVG
  string aggregation_key = 6;       // Property to aggregate
  int64 limit = 7;
}

enum AggregationType {
  NONE = 0;
  COUNT = 1;
  SUM = 2;
  MIN = 3;
  MAX = 4;
  AVG = 5;
}
```
Code Example:
```java
// Count query
HgStoreQuery query = HgStoreQuery.builder()
        .table("VERTEX")
        .prefix("person:")
        .aggregation(HgStoreQuery.Aggregation.COUNT)
        .build();
long count = session.aggregate(query);
System.out.println("Person count: " + count);

// Sum query
HgStoreQuery sumQuery = HgStoreQuery.builder()
        .table("VERTEX")
        .prefix("person:")
        .aggregation(HgStoreQuery.Aggregation.SUM)
        .aggregationKey("age")
        .build();
long totalAge = session.aggregate(sumQuery);
```
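The store-side half of aggregation pushdown fits in a few lines. The sketch below (plain Java, illustrative names; a table is modeled as a map from row key to numeric properties) computes COUNT and SUM locally, so only a single number would need to cross the network:

```java
import java.util.Map;

public class AggregationSketch {

    /** Store side: COUNT rows whose key matches a prefix; only the count is returned. */
    static long count(Map<String, Map<String, Long>> table, String prefix) {
        return table.keySet().stream()
                .filter(key -> key.startsWith(prefix))
                .count();
    }

    /** Store side: SUM one numeric property over rows whose key matches a prefix. */
    static long sum(Map<String, Map<String, Long>> table, String prefix, String property) {
        return table.entrySet().stream()
                .filter(e -> e.getKey().startsWith(prefix))
                .mapToLong(e -> e.getValue().getOrDefault(property, 0L))
                .sum();
    }
}
```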
HugeGraph Server creates indexes as separate tables in Store, so Store can scan index tables directly.
Index Types:
Example: Query by indexed property
```java
// Server creates index (one-time setup)
// schema.indexLabel("personByName").onV("person").by("name").secondary().create()

// Query using index
// Server translates to index table scan
HgStoreQuery query = HgStoreQuery.builder()
        .table("INDEX_personByName")  // Index table
        .filter(Condition.eq("name", "Alice"))
        .build();

// Store scans index table directly (fast)
HgStoreResultSet results = session.query(query);
```
Index Table Structure:
```
Key:   <index_value>:<vertex_id>
Value: <vertex_id>

Example:
Key: "Alice:V1001" → Value: "V1001"
Key: "Bob:V1002"   → Value: "V1002"
```
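Because index keys have the form `<index_value>:<vertex_id>`, an equality lookup is just a prefix scan over a sorted table. A self-contained sketch using a `TreeMap` as a stand-in for RocksDB's sorted key space (illustrative only; for simplicity it assumes index values contain no ':'):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class IndexScanSketch {

    /** Equality lookup on a secondary index: prefix scan over the sorted index table. */
    static List<String> lookup(TreeMap<String, String> indexTable, String indexValue) {
        String prefix = indexValue + ":";
        List<String> vertexIds = new ArrayList<>();
        // tailMap positions us at the first key >= prefix, like a RocksDB seek
        for (var entry : indexTable.tailMap(prefix).entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                break;  // left the prefix range; all later keys sort after it
            }
            vertexIds.add(entry.getValue());
        }
        return vertexIds;
    }
}
```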
Store distributes data across partitions using hash-based partitioning. A query may target a single partition (point lookups by key) or all partitions (scans).
Example: Single-partition query
```java
// Get vertex by ID (single partition)
String vertexId = "person:1001";
int hash = MurmurHash3.hash32(vertexId);
int partitionId = hash % totalPartitions;

// Server routes to specific partition
HgStoreQuery query = HgStoreQuery.builder()
        .table("VERTEX")
        .partitionId(partitionId)
        .key(vertexId.getBytes())
        .build();
byte[] value = session.get(query);
```
Example: Multi-partition query
```java
// Scan all person vertices (all partitions)
HgStoreQuery query = HgStoreQuery.builder()
        .table("VERTEX")
        .prefix("person:")
        .build();

// Server sends query to ALL partitions in parallel
HgStoreResultSet results = session.queryAll(query);
```
Implementation: hg-store-core/src/main/java/.../business/MultiPartitionIterator.java
Purpose: Merge results from multiple partitions
Deduplication Modes:
Code Example:
```java
import org.apache.hugegraph.store.client.DeduplicationMode;

// Query with deduplication
HgStoreQuery query = HgStoreQuery.builder()
        .table("VERTEX")
        .prefix("person:")
        .limit(100)
        .deduplicationMode(DeduplicationMode.LIMIT_DEDUP)
        .build();
HgStoreResultSet results = session.queryAll(query);
```
Algorithm (LIMIT_DEDUP):
1. Iterate partition 1, add results to set (up to limit)
2. Iterate partition 2, skip duplicates, add new results
3. Continue until limit reached or all partitions exhausted
4. Return deduplicated results
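The LIMIT_DEDUP steps above can be sketched with a `LinkedHashSet` over per-partition iterators (plain Java; `LimitDedupSketch` is a hypothetical name, not the actual MultiPartitionIterator implementation):

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;

public class LimitDedupSketch {

    /** Merge partition iterators, skipping duplicates, stopping once `limit` results are collected. */
    static List<String> merge(List<Iterator<String>> partitions, int limit) {
        LinkedHashSet<String> seen = new LinkedHashSet<>();  // preserves first-seen order
        for (Iterator<String> it : partitions) {
            while (it.hasNext() && seen.size() < limit) {
                seen.add(it.next());  // set membership silently drops duplicates
            }
            if (seen.size() >= limit) {
                break;  // limit reached; remaining partitions are never iterated
            }
        }
        return List.copyOf(seen);
    }
}
```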
Flow:
```
Server                          Store 1         Store 2         Store 3
  |                                |               |               |
  |---Query(partition 1-4)-------->|               |               |
  |---Query(partition 5-8)----------------------->|                |
  |---Query(partition 9-12)----------------------------------------->|
  |                                |               |               |
  |<---Results (partitions 1-4)----|               |               |
  |<---Results (partitions 5-8)-------------------|                |
  |<---Results (partitions 9-12)------------------------------------|
  |                                |               |               |
  |-Merge results                  |               |               |
```
Performance:
File: hg-store-grpc/src/main/proto/store_session.proto
```protobuf
service HgStoreSession {
  // Get single key
  rpc Get(GetRequest) returns (GetResponse);

  // Batch get
  rpc BatchGet(BatchGetRequest) returns (stream BatchGetResponse);

  // Put/Delete/Batch operations
  rpc Batch(stream BatchRequest) returns (BatchResponse);

  // Scan range
  rpc ScanTable(ScanTableRequest) returns (stream ScanResponse);

  // Table operations
  rpc Table(TableRequest) returns (TableResponse);

  // Clean/Truncate
  rpc Clean(CleanRequest) returns (CleanResponse);
}
```
GetRequest:
```protobuf
message GetRequest {
  Header header = 1;  // Graph name, table name
  bytes key = 2;      // Key to get
}
```
ScanTableRequest:
```protobuf
message ScanTableRequest {
  Header header = 1;
  bytes start_key = 2;         // Start of range
  bytes end_key = 3;           // End of range (exclusive)
  int64 limit = 4;             // Max results
  ScanMethod scan_method = 5;  // ALL, PREFIX, RANGE
  bytes prefix = 6;            // For PREFIX scan
}

enum ScanMethod {
  ALL = 0;     // Scan all keys
  PREFIX = 1;  // Scan keys with prefix
  RANGE = 2;   // Scan key range
}
```
File: hg-store-grpc/src/main/proto/query.proto
```protobuf
service QueryService {
  // Execute query with filters
  rpc Query(QueryRequest) returns (stream QueryResponse);

  // Execute query (alternative API)
  rpc Query0(QueryRequest) returns (stream QueryResponse);

  // Aggregate query (COUNT, SUM, etc.)
  rpc Count(QueryRequest) returns (CountResponse);
}
```
QueryRequest:
```protobuf
message QueryRequest {
  Header header = 1;                  // Graph name, partition ID
  string table = 2;                   // Table name
  bytes start_key = 3;                // Scan start
  bytes end_key = 4;                  // Scan end
  repeated Condition conditions = 5;  // Filters
  AggregationType aggregation = 6;    // COUNT, SUM, MIN, MAX, AVG
  string aggregation_key = 7;         // Property to aggregate
  int64 limit = 8;                    // Max results
  ScanType scan_type = 9;             // TABLE_SCAN, PRIMARY_SCAN, INDEX_SCAN
}

enum ScanType {
  TABLE_SCAN = 0;    // Full table scan
  PRIMARY_SCAN = 1;  // Primary key lookup
  INDEX_SCAN = 2;    // Index scan
}
```
QueryResponse:
```protobuf
message QueryResponse {
  repeated KV data = 1;          // Key-value pairs
  bool has_more = 2;             // More results available
  bytes continuation_token = 3;  // For pagination
}
```
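A client pages through results by re-issuing the query with the token from each response until `has_more` is false. The loop below is a self-contained sketch of that pattern (plain Java; the token is simplified to an int offset and the "server" is simulated by a list, so the names are illustrative only):

```java
import java.util.ArrayList;
import java.util.List;

public class PaginationSketch {

    /** One page of results plus the cursor state, mirroring QueryResponse. */
    record Page(List<String> data, boolean hasMore, int continuationToken) {}

    /** Simulated server: returns up to `pageSize` rows starting at the token. */
    static Page fetchPage(List<String> allRows, int token, int pageSize) {
        int end = Math.min(token + pageSize, allRows.size());
        return new Page(allRows.subList(token, end), end < allRows.size(), end);
    }

    /** Client loop: keep requesting pages until has_more is false. */
    static List<String> fetchAll(List<String> allRows, int pageSize) {
        List<String> out = new ArrayList<>();
        int token = 0;
        Page page;
        do {
            page = fetchPage(allRows, token, pageSize);
            out.addAll(page.data());
            token = page.continuationToken();  // opaque to the real client
        } while (page.hasMore());
        return out;
    }
}
```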
CountResponse:
```protobuf
message CountResponse {
  int64 count = 1;  // Aggregation result
}
```
File: hg-store-grpc/src/main/proto/graphpb.proto
```protobuf
service GraphStore {
  // Scan partition (graph-specific API)
  rpc ScanPartition(ScanPartitionRequest) returns (stream ScanPartitionResponse);
}
```
ScanPartitionRequest:
```protobuf
message ScanPartitionRequest {
  Header header = 1;
  int32 partition_id = 2;  // Partition to scan
  ScanType scan_type = 3;  // SCAN_VERTEX, SCAN_EDGE
  bytes start_key = 4;
  bytes end_key = 5;
  int64 limit = 6;
}

enum ScanType {
  SCAN_VERTEX = 0;
  SCAN_EDGE = 1;
  SCAN_ALL = 2;
}
```
ScanPartitionResponse:
```protobuf
message ScanPartitionResponse {
  repeated Vertex vertices = 1;  // Vertex results
  repeated Edge edges = 2;       // Edge results
}

message Vertex {
  bytes id = 1;      // Vertex ID
  string label = 2;  // Vertex label
  repeated Property properties = 3;
}

message Edge {
  bytes id = 1;
  string label = 2;
  bytes source_id = 3;  // Source vertex ID
  bytes target_id = 4;  // Target vertex ID
  repeated Property properties = 5;
}

message Property {
  string key = 1;     // Property name
  Variant value = 2;  // Property value
}

message Variant {
  oneof value {
    int32 int_value = 1;
    int64 long_value = 2;
    float float_value = 3;
    double double_value = 4;
    string string_value = 5;
    bytes bytes_value = 6;
    bool bool_value = 7;
  }
}
```
File: hg-store-grpc/src/main/proto/store_state.proto
```protobuf
service HgStoreState {
  // Subscribe to state changes
  rpc SubState(SubStateRequest) returns (stream StateResponse);

  // Unsubscribe
  rpc UnsubState(UnsubStateRequest) returns (UnsubStateResponse);

  // Get partition scan state
  rpc GetScanState(GetScanStateRequest) returns (GetScanStateResponse);

  // Get Raft peers
  rpc GetPeers(GetPeersRequest) returns (GetPeersResponse);
}
```
StateResponse:
```protobuf
message StateResponse {
  NodeState node_state = 1;  // Store node state
  repeated PartitionState partition_states = 2;
}

message NodeState {
  int64 store_id = 1;
  string address = 2;
  NodeStateType state = 3;  // STARTING, ONLINE, PAUSE, STOPPING, ERROR
}

enum NodeStateType {
  STARTING = 0;
  STANDBY = 1;
  ONLINE = 2;
  PAUSE = 3;
  STOPPING = 4;
  HALTED = 5;
  ERROR = 6;
}

message PartitionState {
  int32 partition_id = 1;
  int64 leader_term = 2;
  string leader_address = 3;
  PartitionStateType state = 4;
}
```
Inefficient (full scan):
```groovy
// g.V().has('name', 'Alice')   // Scans all vertices
```
Efficient (index scan):
```groovy
// Create index first
schema.indexLabel("personByName").onV("person").by("name").secondary().create()

// Query uses index
// g.V().has('name', 'Alice')   // Scans index only
```
Inefficient:
```java
// Fetches all, then limits at Server
HgStoreQuery query = HgStoreQuery.builder()
        .table("VERTEX")
        .prefix("person:")
        .build();
HgStoreResultSet results = session.query(query);
// Server limits to 100 after fetching all
```
Efficient:
```java
// Limits at Store
HgStoreQuery query = HgStoreQuery.builder()
        .table("VERTEX")
        .prefix("person:")
        .limit(100)  // Store limits before sending
        .build();
HgStoreResultSet results = session.query(query);
```
Inefficient:
```java
// Fetch all, count at Server
HgStoreResultSet results = session.query(query);
int count = 0;
while (results.hasNext()) {
    results.next();
    count++;
}
```
Efficient:
```java
// Count at Store
HgStoreQuery query = HgStoreQuery.builder()
        .table("VERTEX")
        .prefix("person:")
        .aggregation(HgStoreQuery.Aggregation.COUNT)
        .build();
long count = session.aggregate(query);
```
Inefficient (N sequential requests):
```java
for (String id : vertexIds) {
    byte[] value = session.get(table, id.getBytes());
}
```
Efficient (single batch request):
```java
List<byte[]> keys = vertexIds.stream()
        .map(String::getBytes)
        .collect(Collectors.toList());
Map<byte[], byte[]> results = session.batchGet(table, keys);
```
Typical Latencies (production cluster, 3 Store nodes):
| Operation | Single Partition | Multi-Partition (12 partitions) |
|---|---|---|
| Get by key | 1-2 ms | N/A |
| Scan 100 rows | 5-10 ms | 15-30 ms |
| Scan 10,000 rows | 50-100 ms | 100-200 ms |
| Count (no filter) | 10-20 ms | 30-60 ms |
| Count (with filter) | 20-50 ms | 50-150 ms |
| Index scan (100 rows) | 3-8 ms | 10-25 ms |
Factors Affecting Performance:
For operational monitoring of query performance, see Operations Guide.
For RocksDB tuning to improve query speed, see Best Practices.