HugeGraph-Computer is a distributed graph processing framework implementing the Pregel model (Bulk Synchronous Parallel, BSP). It runs on Kubernetes or YARN clusters and integrates with HugeGraph for graph input/output.
```
┌─────────────────────────────────────────────────────────────┐
│                      HugeGraph-Computer                     │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────┐    │
│  │                   computer-driver                   │    │
│  │           (Job Submission & Coordination)           │    │
│  └─────────────────────────┬───────────────────────────┘    │
│                            │                                │
│  ┌─────────────────────────┼───────────────────────────┐    │
│  │            Deployment Layer (choose one)            │    │
│  │     ┌──────────────┐      ┌──────────────┐          │    │
│  │     │ computer-k8s │      │ computer-yarn│          │    │
│  │     └──────────────┘      └──────────────┘          │    │
│  └─────────────────────────┬───────────────────────────┘    │
│                            │                                │
│  ┌─────────────────────────┼───────────────────────────┐    │
│  │                    computer-core                    │    │
│  │         (WorkerService, MasterService, BSP)         │    │
│  │  ┌──────────────────────────────────────────────┐   │    │
│  │  │ Managers: Message, Aggregation, Snapshot...  │   │    │
│  │  └──────────────────────────────────────────────┘   │    │
│  └─────────────────────────┬───────────────────────────┘    │
│                            │                                │
│  ┌─────────────────────────┼───────────────────────────┐    │
│  │                 computer-algorithm                  │    │
│  │    (PageRank, LPA, WCC, SSSP, TriangleCount...)     │    │
│  └─────────────────────────┬───────────────────────────┘    │
│                            │                                │
│  ┌─────────────────────────┴───────────────────────────┐    │
│  │                    computer-api                     │    │
│  │   (Computation, Vertex, Edge, Aggregator, Value)    │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
```
| Module | Description |
|---|---|
| computer-api | Public interfaces for algorithm development (Computation, Vertex, Edge, Aggregator, Combiner) |
| computer-core | Runtime implementation (WorkerService, MasterService, messaging, BSP coordination, memory management) |
| computer-algorithm | Built-in graph algorithms (45+ implementations) |
| computer-driver | Job submission and driver-side coordination |
| computer-k8s | Kubernetes deployment integration |
| computer-yarn | YARN deployment integration |
| computer-k8s-operator | Kubernetes operator for job lifecycle management |
| computer-dist | Distribution packaging and assembly |
| computer-test | Integration tests and unit tests |
Note: For K8s-operator module development, run `mvn clean install` in `computer-k8s-operator` first to generate CRD classes.
```bash
cd computer

# Compile (skip javadoc for faster builds)
mvn clean compile -Dmaven.javadoc.skip=true

# Package (skip tests for faster packaging)
mvn clean package -DskipTests
```
```bash
# Unit tests
mvn test -P unit-test

# Integration tests (requires etcd, K8s, HugeGraph)
mvn test -P integrate-test

# Run specific test class
mvn test -P unit-test -Dtest=ClassName

# Run specific test method
mvn test -P unit-test -Dtest=ClassName#methodName
```
```bash
mvn apache-rat:check
```
Create `job-config.properties`:
```properties
# Algorithm class
algorithm.class=org.apache.hugegraph.computer.algorithm.centrality.pagerank.PageRank

# HugeGraph connection
hugegraph.url=http://hugegraph-server:8080
hugegraph.graph=hugegraph

# K8s configuration
k8s.namespace=default
k8s.image=hugegraph/hugegraph-computer:latest
k8s.master.cpu=2
k8s.master.memory=4Gi
k8s.worker.replicas=3
k8s.worker.cpu=4
k8s.worker.memory=8Gi

# BSP coordination (etcd)
bsp.etcd.url=http://etcd-cluster:2379

# Algorithm parameters (PageRank example)
# Damping factor (weight of neighbor contributions), default: 0.85
pagerank.alpha=0.85

# Maximum supersteps (iterations), controlled by the BSP framework
bsp.max_superstep=20

# L1-norm difference threshold for convergence, default: 0.00001
pagerank.l1DiffThreshold=0.0001
```
```bash
java -jar computer-driver/target/computer-driver-${VERSION}.jar \
     --config job-config.properties
```
```bash
# Check pod status
kubectl get pods -n default

# View master logs
kubectl logs hugegraph-computer-master-xxx -n default

# View worker logs
kubectl logs hugegraph-computer-worker-0 -n default
```
Note: YARN deployment support is under development. Use Kubernetes for production deployments.
| Algorithm | Class | Description |
|---|---|---|
| PageRank | algorithm.centrality.pagerank.PageRank | Standard PageRank |
| Personalized PageRank | algorithm.centrality.pagerank.PersonalizedPageRank | Source-specific PageRank |
| Betweenness Centrality | algorithm.centrality.betweenness.BetweennessCentrality | Shortest-path-based centrality |
| Closeness Centrality | algorithm.centrality.closeness.ClosenessCentrality | Average distance centrality |
| Degree Centrality | algorithm.centrality.degree.DegreeCentrality | In/out degree counting |
| Algorithm | Class | Description |
|---|---|---|
| LPA | algorithm.community.lpa.Lpa | Label Propagation Algorithm |
| WCC | algorithm.community.wcc.Wcc | Weakly Connected Components |
| Louvain | algorithm.community.louvain.Louvain | Modularity-based community detection |
| K-Core | algorithm.community.kcore.KCore | K-core decomposition |
| Algorithm | Class | Description |
|---|---|---|
| SSSP | algorithm.path.sssp.Sssp | Single Source Shortest Path |
| BFS | algorithm.traversal.bfs.Bfs | Breadth-First Search |
| Rings | algorithm.path.rings.Rings | Cycle/ring detection |
| Algorithm | Class | Description |
|---|---|---|
| Triangle Count | algorithm.trianglecount.TriangleCount | Count triangles |
| Clustering Coefficient | algorithm.clusteringcoefficient.ClusteringCoefficient | Local clustering measure |
Full algorithm list: see `computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/`
Algorithms implement the `Computation` interface from `computer-api`:
```java
package org.apache.hugegraph.computer.core.worker;

public interface Computation<M extends Value> {

    /**
     * Initialization at superstep 0.
     */
    void compute0(ComputationContext context, Vertex vertex);

    /**
     * Message processing in subsequent supersteps.
     */
    void compute(ComputationContext context, Vertex vertex,
                 Iterator<M> messages);
}
```
NOTE: This is a simplified example showing the key concepts. For the complete implementation, including all required methods (`name()`, `category()`, `init()`, etc.), see: `computer/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/centrality/pagerank/PageRank.java`
```java
package org.apache.hugegraph.computer.algorithm.centrality.pagerank;

import java.util.Iterator;

import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.value.DoubleValue;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.worker.Computation;
import org.apache.hugegraph.computer.core.worker.ComputationContext;

public class PageRank implements Computation<DoubleValue> {

    public static final String OPTION_ALPHA = "pagerank.alpha";
    public static final String OPTION_MAX_ITERATIONS = "pagerank.max_iterations";

    private double alpha;
    private int maxIterations;

    @Override
    public void init(Config config) {
        this.alpha = config.getDouble(OPTION_ALPHA, 0.85);
        this.maxIterations = config.getInt(OPTION_MAX_ITERATIONS, 20);
    }

    @Override
    public void compute0(ComputationContext context, Vertex vertex) {
        // Initialize: set initial PR value
        vertex.value(new DoubleValue(1.0));

        // Send PR contribution to neighbors
        int edgeCount = vertex.numEdges();
        if (edgeCount > 0) {
            double contribution = 1.0 / edgeCount;
            context.sendMessageToAllEdges(vertex, new DoubleValue(contribution));
        }
    }

    @Override
    public void compute(ComputationContext context, Vertex vertex,
                        Iterator<DoubleValue> messages) {
        // Sum incoming PR contributions
        double sum = 0.0;
        while (messages.hasNext()) {
            sum += messages.next().value();
        }

        // Calculate new PR value
        double newPR = (1.0 - alpha) + alpha * sum;
        vertex.value(new DoubleValue(newPR));

        // Send to neighbors if not converged
        if (context.superstep() < maxIterations) {
            int edgeCount = vertex.numEdges();
            if (edgeCount > 0) {
                double contribution = newPR / edgeCount;
                context.sendMessageToAllEdges(vertex, new DoubleValue(contribution));
            }
        } else {
            vertex.inactivate();
        }
    }
}
```
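The update rule above, `PR = (1 - alpha) + alpha * sum`, can be checked outside the framework. The following standalone sketch (plain Java, no HugeGraph dependencies; the three-vertex graph and class name are made up for illustration) iterates the same rule:

```java
import java.util.*;

public class TinyPageRank {

    /**
     * Applies the same per-vertex update as the Computation above:
     * PR(v) = (1 - alpha) + alpha * sum(PR(u) / outDegree(u)).
     * graph[v] lists the out-neighbors of vertex v.
     */
    public static double[] run(int[][] graph, double alpha, int iterations) {
        int n = graph.length;
        double[] pr = new double[n];
        Arrays.fill(pr, 1.0);                     // compute0: initial value
        for (int step = 0; step < iterations; step++) {
            double[] incoming = new double[n];    // summed messages per vertex
            for (int v = 0; v < n; v++) {
                if (graph[v].length == 0) continue;
                double contribution = pr[v] / graph[v].length;
                for (int u : graph[v]) {
                    incoming[u] += contribution;  // sendMessageToAllEdges
                }
            }
            for (int v = 0; v < n; v++) {
                pr[v] = (1.0 - alpha) + alpha * incoming[v];
            }
        }
        return pr;
    }

    public static void main(String[] args) {
        // Edges: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0
        int[][] graph = {{1, 2}, {2}, {0}};
        // Vertex 2 receives links from both 0 and 1, so it ends up ranked highest
        System.out.println(Arrays.toString(run(graph, 0.85, 30)));
    }
}
```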
Messages are sent from `compute0()` and `compute()`:

```java
// Send to a specific vertex
context.sendMessage(targetId, new DoubleValue(1.0));

// Send to all outgoing edges
context.sendMessageToAllEdges(vertex, new DoubleValue(1.0));
```
Global state shared across all workers:
```java
// Register aggregator in compute0()
context.registerAggregator("sum", new DoubleValue(0.0), SumAggregator.class);

// Write to aggregator
context.aggregateValue("sum", new DoubleValue(vertex.value()));

// Read aggregator value (available in next superstep)
DoubleValue total = context.aggregatedValue("sum");
```
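The "write now, read next superstep" semantics can be illustrated without the framework. The sketch below (class and method names are illustrative, not the real aggregator manager) shows contributions accumulating during a superstep and only becoming readable after the barrier:

```java
import java.util.*;

public class AggregatorDemo {

    // Value visible to reads during the current superstep
    // (merged by the master at the previous barrier).
    private double visibleValue = 0.0;
    // Contributions written during the current superstep.
    private final List<Double> pending = new ArrayList<>();

    /** Analogous to context.aggregateValue("sum", v). */
    public void aggregate(double v) {
        pending.add(v);
    }

    /** Analogous to context.aggregatedValue("sum"). */
    public double aggregated() {
        return visibleValue;
    }

    /** Master merges all worker contributions at the superstep barrier. */
    public void barrier() {
        visibleValue = pending.stream().mapToDouble(Double::doubleValue).sum();
        pending.clear();
    }

    public static void main(String[] args) {
        AggregatorDemo sum = new AggregatorDemo();
        sum.aggregate(1.0);
        sum.aggregate(2.5);
        System.out.println(sum.aggregated()); // 0.0: not merged yet
        sum.barrier();                        // superstep boundary
        System.out.println(sum.aggregated()); // 3.5: visible next superstep
    }
}
```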
Reduce message volume by combining messages at the sender:
```java
public class SumCombiner implements Combiner<DoubleValue> {

    @Override
    public void combine(DoubleValue v1, DoubleValue v2, DoubleValue result) {
        result.value(v1.value() + v2.value());
    }
}
```
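The effect of sender-side combining can be seen in a standalone sketch. A simplified, value-returning variant of the combiner contract is re-declared locally so the snippet compiles without the framework (the real interface uses a mutable `result` parameter); messages queued for the same target vertex are merged on insert, so at most one value per target crosses the network:

```java
import java.util.*;

public class CombinerDemo {

    // Local, simplified stand-in for the framework's Combiner contract.
    interface Combiner<V> {
        V combine(V v1, V v2);
    }

    /**
     * Buffers messages per target vertex and applies the combiner on
     * insert, keeping at most one pending message per target.
     */
    static <V> Map<String, V> sendBuffer(List<Map.Entry<String, V>> messages,
                                         Combiner<V> combiner) {
        Map<String, V> buffer = new HashMap<>();
        for (Map.Entry<String, V> m : messages) {
            buffer.merge(m.getKey(), m.getValue(), combiner::combine);
        }
        return buffer;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> messages = List.of(
                Map.entry("v1", 0.5), Map.entry("v1", 0.25),
                Map.entry("v2", 1.0), Map.entry("v1", 0.25));
        // Four messages collapse to two: one per target vertex
        System.out.println(sendBuffer(messages, Double::sum));
    }
}
```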
To create a custom algorithm:

1. Implement the `Computation` interface in `computer-algorithm`
2. Expose parameters as `OPTION_*` constants
3. Implement `compute0()` for initialization
4. Implement `compute()` for message processing
5. Reference the class and its parameters in the job config:

```properties
algorithm.class=com.example.MyAlgorithm
myalgorithm.param1=value1
```
```bash
mvn clean package -DskipTests
```
HugeGraph-Computer uses etcd for BSP barrier synchronization:
- `workerStepPrepareDone` → `waitMasterStepPrepareDone`
- `workerStepComputeDone` → `waitMasterStepComputeDone`
- `workerStepDone` → `waitMasterStepDone` (master returns `SuperstepStat`)

`WorkerService` composes multiple managers with lifecycle hooks:
- `MessageSendManager`: Outgoing message buffering and sending
- `MessageRecvManager`: Incoming message receiving and sorting
- `WorkerAggrManager`: Aggregator value collection
- `DataServerManager`: Inter-worker data transfer
- `SortManagers`: Message and edge sorting
- `SnapshotManager`: Checkpoint creation

All managers implement:
- `initAll()`: Initialize before first superstep
- `beforeSuperstep()`: Prepare for superstep
- `afterSuperstep()`: Cleanup after superstep
- `closeAll()`: Shutdown cleanup

Common job configuration options:

```properties
# === Algorithm ===
algorithm.class=<fully qualified class name>
algorithm.message_class=<message value class>
algorithm.result_class=<result value class>

# === HugeGraph Input ===
hugegraph.url=http://localhost:8080
hugegraph.graph=hugegraph
hugegraph.input.vertex_label=person
hugegraph.input.edge_label=knows
hugegraph.input.filter=<gremlin filter>

# === HugeGraph Output ===
hugegraph.output.vertex_property=pagerank_value
hugegraph.output.edge_property=<property name>

# === HDFS Input ===
input.hdfs.path=/graph/input
input.hdfs.format=json

# === HDFS Output ===
output.hdfs.path=/graph/output
output.hdfs.format=json

# === Worker Resources ===
worker.count=3
worker.memory=8Gi
worker.cpu=4
worker.thread_count=<cpu cores>

# === BSP Coordination ===
bsp.etcd.url=http://etcd:2379
bsp.max_superstep=100
bsp.log_interval=10

# === Memory Management ===
worker.data.dirs=/data1,/data2
worker.write_buffer_size=134217728
worker.max_spill_size=1073741824
```
Computer auto-manages memory to prevent OOM:
- Buffers messages and edges in memory up to `worker.write_buffer_size`
- Spills overflow to local disk directories (`worker.data.dirs`)

Best Practice: Allocate worker memory ≥ 2x graph size for optimal performance.
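The buffer-then-spill behavior can be sketched in isolation. The threshold, file naming, and class below are illustrative only; the real write-buffer/sort pipeline is considerably more involved:

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;

public class SpillBufferDemo {

    private final long maxBytes;   // stand-in for worker.write_buffer_size
    private final Path spillDir;   // stand-in for one of worker.data.dirs
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private int spillCount = 0;

    SpillBufferDemo(long maxBytes, Path spillDir) {
        this.maxBytes = maxBytes;
        this.spillDir = spillDir;
    }

    /** Buffer a record; spill to disk once the in-memory limit is hit. */
    void write(String record) throws IOException {
        buffer.add(record);
        bufferedBytes += record.length();
        if (bufferedBytes >= maxBytes) {
            spill();
        }
    }

    private void spill() throws IOException {
        // Flush the whole buffer to a new spill file and reset it.
        Path file = spillDir.resolve("spill-" + (spillCount++) + ".txt");
        Files.write(file, buffer);
        buffer.clear();
        bufferedBytes = 0;
    }

    int spillCount() {
        return spillCount;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("spill-demo");
        SpillBufferDemo demo = new SpillBufferDemo(32, dir);
        for (int i = 0; i < 10; i++) {
            demo.write("edge-record-" + i);   // 13 bytes each
        }
        // Every third record pushes the buffer past 32 bytes
        System.out.println("spills: " + demo.spillCount());
    }
}
```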
```bash
# Generate CRD classes first
cd computer-k8s-operator
mvn clean install
```
Generated classes appear in `computer-k8s/target/generated-sources/`.
- Verify `bsp.etcd.url` is reachable from all pods
- Check etcd health with `etcdctl endpoint health`
- Increase `worker.memory` in the job config
- Lower `worker.write_buffer_size` to trigger earlier spilling
- Increase `worker.count` to distribute the graph across more workers
- Adjust the algorithm's convergence threshold (`pagerank.convergence_tolerance`)

| File | Description |
|---|---|
| `computer-api/.../Computation.java` | Algorithm interface contract (`computer/computer-api/src/main/java/org/apache/hugegraph/computer/core/worker/Computation.java:25`) |
| `computer-core/.../WorkerService.java` | Worker runtime orchestration (`computer/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java:1`) |
| `computer-core/.../Bsp4Worker.java` | BSP coordination logic (`computer/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/Bsp4Worker.java:1`) |
| `computer-algorithm/.../PageRank.java` | Example algorithm implementation (`computer/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/centrality/pagerank/PageRank.java:1`) |
The CI pipeline (`.github/workflows/computer-ci.yml`) runs:
- Integration tests (`-P integrate-test`)
- Unit tests (`-P unit-test`)

To run the integration suite locally:

```bash
# Setup test environment (etcd, HDFS, K8s)
cd computer-dist/src/assembly/travis
./start-etcd.sh
./start-hdfs.sh
./start-minikube.sh

# Run tests
cd ../../../../
mvn test -P integrate-test
```
See the main Contributing Guide for how to contribute.
HugeGraph-Computer is licensed under the Apache License 2.0.