# Apache HugeGraph-Computer (Java)
HugeGraph-Computer is a distributed graph processing framework that implements the [Pregel](https://kowshik.github.io/JPregel/pregel_paper.pdf) model on top of BSP (Bulk Synchronous Parallel) execution. It runs on Kubernetes or YARN clusters and integrates with HugeGraph for graph input/output.
## Features
- **Distributed MPP Computing**: Massively parallel graph processing across cluster nodes
- **BSP Model**: Algorithm execution through iterative supersteps with global synchronization
- **Auto Memory Management**: Automatic spill to disk when memory is insufficient, designed to avoid OOM errors
- **Flexible Data Sources**: Load from HugeGraph or HDFS, output to HugeGraph or HDFS
- **Easy Algorithm Development**: Focus on single-vertex logic without worrying about distribution
- **Production-Ready**: Battle-tested on billion-scale graphs with Kubernetes integration
## Architecture
### Module Structure
```
┌─────────────────────────────────────────────────────────────┐
│ HugeGraph-Computer │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────┐ │
│ │ computer-driver │ │
│ │ (Job Submission & Coordination) │ │
│ └─────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌─────────────────────────┼───────────────────────────┐ │
│ │ Deployment Layer (choose one) │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ computer-k8s │ │ computer-yarn│ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌─────────────────────────┼───────────────────────────┐ │
│ │ computer-core │ │
│ │ (WorkerService, MasterService, BSP) │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Managers: Message, Aggregation, Snapshot... │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ └─────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌─────────────────────────┼───────────────────────────┐ │
│ │ computer-algorithm │ │
│ │ (PageRank, LPA, WCC, SSSP, TriangleCount...) │ │
│ └─────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌─────────────────────────┴───────────────────────────┐ │
│ │ computer-api │ │
│ │ (Computation, Vertex, Edge, Aggregator, Value) │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
### Module Descriptions
| Module | Description |
|--------|-------------|
| **computer-api** | Public interfaces for algorithm development (`Computation`, `Vertex`, `Edge`, `Aggregator`, `Combiner`) |
| **computer-core** | Runtime implementation (WorkerService, MasterService, messaging, BSP coordination, memory management) |
| **computer-algorithm** | Built-in graph algorithms (45+ implementations) |
| **computer-driver** | Job submission and driver-side coordination |
| **computer-k8s** | Kubernetes deployment integration |
| **computer-yarn** | YARN deployment integration |
| **computer-k8s-operator** | Kubernetes operator for job lifecycle management |
| **computer-dist** | Distribution packaging and assembly |
| **computer-test** | Integration tests and unit tests |
## Prerequisites
- **JDK 11** or later (for building and running)
- **Maven 3.5+** for building
- **Kubernetes cluster** or **YARN cluster** for deployment
- **etcd** for BSP coordination (configured via `BSP_ETCD_URL`)
**Note**: For K8s-operator module development, run `mvn clean install` in `computer-k8s-operator` first to generate CRD classes.
## Quick Start
### Build from Source
```bash
cd computer
# Compile (skip javadoc for faster builds)
mvn clean compile -Dmaven.javadoc.skip=true
# Package (skip tests for faster packaging)
mvn clean package -DskipTests
```
### Run Tests
```bash
# Unit tests
mvn test -P unit-test
# Integration tests (requires etcd, K8s, HugeGraph)
mvn test -P integrate-test
# Run specific test class
mvn test -P unit-test -Dtest=ClassName
# Run specific test method
mvn test -P unit-test -Dtest=ClassName#methodName
```
### License Check
```bash
mvn apache-rat:check
```
### Deploy on Kubernetes
#### 1. Configure Job
Create `job-config.properties`:
```properties
# Algorithm class
algorithm.class=org.apache.hugegraph.computer.algorithm.centrality.pagerank.PageRank
# HugeGraph connection
hugegraph.url=http://hugegraph-server:8080
hugegraph.graph=hugegraph
# K8s configuration
k8s.namespace=default
k8s.image=hugegraph/hugegraph-computer:latest
k8s.master.cpu=2
k8s.master.memory=4Gi
k8s.worker.replicas=3
k8s.worker.cpu=4
k8s.worker.memory=8Gi
# BSP coordination (etcd)
bsp.etcd.url=http://etcd-cluster:2379
# Algorithm parameters (PageRank example)
# Alpha parameter (1 - damping factor), default: 0.15
pagerank.alpha=0.15
# Maximum supersteps (iterations), controlled by BSP framework
bsp.max_superstep=20
# L1 norm difference threshold for convergence, default: 0.00001
pagerank.l1DiffThreshold=0.0001
```
#### 2. Submit Job
```bash
java -jar computer-driver/target/computer-driver-${VERSION}.jar \
     --config job-config.properties
```
#### 3. Monitor Job
```bash
# Check pod status
kubectl get pods -n default
# View master logs
kubectl logs hugegraph-computer-master-xxx -n default
# View worker logs
kubectl logs hugegraph-computer-worker-0 -n default
```
### Deploy on YARN
**Note**: YARN deployment support is under development. Use Kubernetes for production deployments.
## Available Algorithms
### Centrality Algorithms
| Algorithm | Class | Description |
|-----------|-------|-------------|
| PageRank | `algorithm.centrality.pagerank.PageRank` | Standard PageRank |
| Personalized PageRank | `algorithm.centrality.pagerank.PersonalizedPageRank` | Source-specific PageRank |
| Betweenness Centrality | `algorithm.centrality.betweenness.BetweennessCentrality` | Shortest-path-based centrality |
| Closeness Centrality | `algorithm.centrality.closeness.ClosenessCentrality` | Average distance centrality |
| Degree Centrality | `algorithm.centrality.degree.DegreeCentrality` | In/out degree counting |
### Community Detection
| Algorithm | Class | Description |
|-----------|-------|-------------|
| LPA | `algorithm.community.lpa.Lpa` | Label Propagation Algorithm |
| WCC | `algorithm.community.wcc.Wcc` | Weakly Connected Components |
| Louvain | `algorithm.community.louvain.Louvain` | Modularity-based community detection |
| K-Core | `algorithm.community.kcore.KCore` | K-core decomposition |
### Path Finding
| Algorithm | Class | Description |
|-----------|-------|-------------|
| SSSP | `algorithm.path.sssp.Sssp` | Single Source Shortest Path |
| BFS | `algorithm.traversal.bfs.Bfs` | Breadth-First Search |
| Rings | `algorithm.path.rings.Rings` | Cycle/ring detection |
### Graph Structure
| Algorithm | Class | Description |
|-----------|-------|-------------|
| Triangle Count | `algorithm.trianglecount.TriangleCount` | Count triangles |
| Clustering Coefficient | `algorithm.clusteringcoefficient.ClusteringCoefficient` | Local clustering measure |
**Full algorithm list**: See `computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/`
## Developing Custom Algorithms
### Algorithm Contract
Algorithms implement the `Computation` interface from `computer-api`:
```java
package org.apache.hugegraph.computer.core.worker;

import java.util.Iterator;

import org.apache.hugegraph.computer.core.graph.value.Value;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;

public interface Computation<M extends Value> {

    /**
     * Initialization at superstep 0.
     */
    void compute0(ComputationContext context, Vertex vertex);

    /**
     * Message processing in subsequent supersteps.
     */
    void compute(ComputationContext context, Vertex vertex,
                 Iterator<M> messages);
}
```
### Example: Simple PageRank
> **NOTE**: This is a simplified example showing the key concepts.
> For the complete implementation including all required methods (`name()`, `category()`, `init()`, etc.),
> see: `computer/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/centrality/pagerank/PageRank.java`
```java
package org.apache.hugegraph.computer.algorithm.centrality.pagerank;

import java.util.Iterator;

import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.value.DoubleValue;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.worker.Computation;
import org.apache.hugegraph.computer.core.worker.ComputationContext;

public class PageRank implements Computation<DoubleValue> {

    public static final String OPTION_ALPHA = "pagerank.alpha";
    public static final String OPTION_MAX_ITERATIONS = "pagerank.max_iterations";

    private double alpha;
    private int maxIterations;

    @Override
    public void init(Config config) {
        this.alpha = config.getDouble(OPTION_ALPHA, 0.85);
        this.maxIterations = config.getInt(OPTION_MAX_ITERATIONS, 20);
    }

    @Override
    public void compute0(ComputationContext context, Vertex vertex) {
        // Initialize: set the initial PR value
        vertex.value(new DoubleValue(1.0));
        // Send PR contribution to neighbors
        int edgeCount = vertex.numEdges();
        if (edgeCount > 0) {
            double contribution = 1.0 / edgeCount;
            context.sendMessageToAllEdges(vertex, new DoubleValue(contribution));
        }
    }

    @Override
    public void compute(ComputationContext context, Vertex vertex,
                        Iterator<DoubleValue> messages) {
        // Sum incoming PR contributions
        double sum = 0.0;
        while (messages.hasNext()) {
            sum += messages.next().value();
        }
        // Calculate the new PR value (alpha here acts as the damping factor)
        double newPR = (1.0 - alpha) + alpha * sum;
        vertex.value(new DoubleValue(newPR));
        // Send to neighbors if not converged
        if (context.superstep() < maxIterations) {
            int edgeCount = vertex.numEdges();
            if (edgeCount > 0) {
                double contribution = newPR / edgeCount;
                context.sendMessageToAllEdges(vertex, new DoubleValue(contribution));
            }
        } else {
            vertex.inactivate();
        }
    }
}
```
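To run this algorithm, point the job properties at it and declare the message/result value types via the options shown in the Configuration Reference below (`algorithm.class`, `algorithm.message_class`, `algorithm.result_class`); for this example both the message and the result are `DoubleValue`.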
### Key Concepts
#### 1. Supersteps
- **Superstep 0**: Initialization via `compute0()`
- **Superstep 1+**: Message processing via `compute()`
- **Barrier Synchronization**: All workers complete superstep N before starting N+1
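A vertex can call `vertex.inactivate()` to vote to halt (as in the PageRank example above); under standard Pregel semantics, an inactive vertex is reactivated when it receives a message in a later superstep, and the job ends once all vertices are inactive and no messages are in flight.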
#### 2. Message Passing
```java
// Send to specific vertex
context.sendMessage(targetId, new DoubleValue(1.0));
// Send to all outgoing edges
context.sendMessageToAllEdges(vertex, new DoubleValue(1.0));
```
#### 3. Aggregators
Global state shared across all workers:
```java
// Register the aggregator (in HugeGraph-Computer this is a master-side
// step, e.g. during master computation init, not inside compute0())
context.registerAggregator("sum", new DoubleValue(0.0), SumAggregator.class);
// Write to the aggregator from a worker during compute()
context.aggregateValue("sum", vertex.value());
// Read the aggregated value (available in the next superstep)
DoubleValue total = context.aggregatedValue("sum");
```
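The `SumAggregator` referenced above could be sketched as follows. This is a hypothetical shape that assumes a simple aggregate/read contract; consult the `Aggregator` interface in computer-api for the exact methods to implement:
```java
import org.apache.hugegraph.computer.core.graph.value.DoubleValue;

// Hypothetical SumAggregator sketch. The method names assume a simple
// aggregate/read contract; the real Aggregator interface in computer-api
// may differ (e.g. extra serialization hooks).
public class SumAggregator {

    private final DoubleValue sum = new DoubleValue(0.0);

    // Merge one contribution into the running total
    public void aggregateValue(DoubleValue value) {
        this.sum.value(this.sum.value() + value.value());
    }

    // Current aggregated total (the master merges per-worker totals
    // at the end of each superstep)
    public DoubleValue aggregatedValue() {
        return this.sum;
    }
}
```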
#### 4. Combiners
Reduce message volume by combining messages at sender:
```java
import org.apache.hugegraph.computer.core.combiner.Combiner;
import org.apache.hugegraph.computer.core.graph.value.DoubleValue;

public class SumCombiner implements Combiner<DoubleValue> {

    @Override
    public void combine(DoubleValue v1, DoubleValue v2, DoubleValue result) {
        // Write the combined value into the reusable result object
        result.value(v1.value() + v2.value());
    }
}
```
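To enable a combiner, wire it into the job configuration; in the built-in options this is typically the `worker.combiner_class` setting (check the configuration reference of your version for the exact key).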
### Algorithm Development Workflow
1. **Implement `Computation` interface** in `computer-algorithm`
2. **Add configuration options** with `OPTION_*` constants
3. **Implement `compute0()` for initialization**
4. **Implement `compute()` for message processing**
5. **Configure in job properties**:
```properties
algorithm.class=com.example.MyAlgorithm
myalgorithm.param1=value1
```
6. **Build and test**:
```bash
mvn clean package -DskipTests
```
## BSP Coordination
HugeGraph-Computer uses etcd for BSP barrier synchronization:
### BSP Lifecycle (per superstep)
1. **Worker Prepare**: `workerStepPrepareDone` → `waitMasterStepPrepareDone`
2. **Compute Phase**: Workers process vertices and messages locally
3. **Worker Compute Done**: `workerStepComputeDone` → `waitMasterStepComputeDone`
4. **Aggregation**: Aggregators combine global state
5. **Worker Step Done**: `workerStepDone` → `waitMasterStepDone` (master returns `SuperstepStat`)
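The sequence above maps to a per-superstep loop on each worker. The following is a conceptual sketch of that loop using the event names listed above; the types and signatures are simplified stand-ins, not the real `Bsp4Worker` API:
```java
// Simplified stand-ins for the real BSP coordination types; the actual
// logic lives in Bsp4Worker and WorkerService (computer-core).
interface BspEvents {
    void workerStepPrepareDone(int superstep);
    void waitMasterStepPrepareDone(int superstep);
    void workerStepComputeDone(int superstep);
    void waitMasterStepComputeDone(int superstep);
    void workerStepDone(int superstep, WorkerStat stat);
    SuperstepStat waitMasterStepDone(int superstep);
}

class WorkerStat { /* per-worker counters, simplified */ }
class SuperstepStat { boolean active; /* master's verdict, simplified */ }

class SuperstepLoop {

    private final BspEvents bsp;

    SuperstepLoop(BspEvents bsp) {
        this.bsp = bsp;
    }

    SuperstepStat runOneSuperstep(int superstep) {
        this.bsp.workerStepPrepareDone(superstep);        // 1. prepare barrier
        this.bsp.waitMasterStepPrepareDone(superstep);
        WorkerStat stat = this.computeLocally(superstep); // 2. compute phase
        this.bsp.workerStepComputeDone(superstep);        // 3. compute barrier
        this.bsp.waitMasterStepComputeDone(superstep);    // 4. aggregators merged
        this.bsp.workerStepDone(superstep, stat);         // 5. report step done
        return this.bsp.waitMasterStepDone(superstep);    //    master returns SuperstepStat
    }

    private WorkerStat computeLocally(int superstep) {
        // Consume incoming messages, run compute() per vertex, send messages
        return new WorkerStat();
    }
}
```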
### Manager Pattern
`WorkerService` composes multiple managers with lifecycle hooks:
- `MessageSendManager`: Outgoing message buffering and sending
- `MessageRecvManager`: Incoming message receiving and sorting
- `WorkerAggrManager`: Aggregator value collection
- `DataServerManager`: Inter-worker data transfer
- `SortManagers`: Message and edge sorting
- `SnapshotManager`: Checkpoint creation
All managers implement:
- `initAll()`: Initialize before first superstep
- `beforeSuperstep()`: Prepare for superstep
- `afterSuperstep()`: Cleanup after superstep
- `closeAll()`: Shutdown cleanup
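As a rough illustration of these hooks, here is a minimal sketch of a manager contract and a driver loop in the style of `WorkerService`; the names are simplified, see `WorkerService.java` for the real orchestration:
```java
import java.util.List;

// Simplified manager contract mirroring the lifecycle hooks above
interface ManagerSketch {
    void init();                         // invoked once via initAll()
    void beforeSuperstep(int superstep); // prepare for the superstep
    void afterSuperstep(int superstep);  // cleanup after the superstep
    void close();                        // invoked once via closeAll()
}

// Driver that fans each lifecycle event out to all managers
class ManagersSketch {

    private final List<ManagerSketch> managers;

    ManagersSketch(List<ManagerSketch> managers) {
        this.managers = managers;
    }

    void initAll() { this.managers.forEach(ManagerSketch::init); }
    void beforeSuperstep(int s) { this.managers.forEach(m -> m.beforeSuperstep(s)); }
    void afterSuperstep(int s) { this.managers.forEach(m -> m.afterSuperstep(s)); }
    void closeAll() { this.managers.forEach(ManagerSketch::close); }
}
```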
## Configuration Reference
### Job Configuration
```properties
# === Algorithm ===
algorithm.class=<fully qualified class name>
algorithm.message_class=<message value class>
algorithm.result_class=<result value class>
# === HugeGraph Input ===
hugegraph.url=http://localhost:8080
hugegraph.graph=hugegraph
hugegraph.input.vertex_label=person
hugegraph.input.edge_label=knows
hugegraph.input.filter=<gremlin filter>
# === HugeGraph Output ===
hugegraph.output.vertex_property=pagerank_value
hugegraph.output.edge_property=<property name>
# === HDFS Input ===
input.hdfs.path=/graph/input
input.hdfs.format=json
# === HDFS Output ===
output.hdfs.path=/graph/output
output.hdfs.format=json
# === Worker Resources ===
worker.count=3
worker.memory=8Gi
worker.cpu=4
worker.thread_count=<cpu cores>
# === BSP Coordination ===
bsp.etcd.url=http://etcd:2379
bsp.max_superstep=100
bsp.log_interval=10
# === Memory Management ===
worker.data.dirs=/data1,/data2
worker.write_buffer_size=134217728
worker.max_spill_size=1073741824
```
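This listing is illustrative rather than exhaustive; for the authoritative set of option keys and defaults, see the `ComputerOptions` class in computer-core.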
## Memory Management
Computer auto-manages memory to prevent OOM:
1. **In-Memory Buffering**: Vertices, edges, messages buffered in memory
2. **Spill Threshold**: When memory usage exceeds threshold, spill to disk
3. **Disk Storage**: Configurable data directories (`worker.data.dirs`)
4. **Automatic Cleanup**: Spilled data cleaned after superstep completion
**Best Practice**: Allocate total worker memory of roughly 2x the graph size for optimal performance.
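For example, with the sample job above (3 workers × 8Gi = 24Gi in total), this guideline suits a graph of roughly 12GB.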
## Troubleshooting
### K8s CRD Classes Not Found
```bash
# Generate CRD classes first
cd computer-k8s-operator
mvn clean install
```
Generated classes appear in `computer-k8s/target/generated-sources/`.
### etcd Connection Errors
- Verify `bsp.etcd.url` is reachable from all pods
- Check etcd cluster health: `etcdctl endpoint health`
- Ensure firewall allows port 2379
### Out of Memory Errors
- Increase `worker.memory` in job config
- Reduce `worker.write_buffer_size` to trigger earlier spilling
- Increase `worker.count` to distribute graph across more workers
### Slow Convergence
- Check algorithm parameters (e.g., `pagerank.l1DiffThreshold` for PageRank convergence)
- Monitor superstep logs for progress
- Consider using combiners to reduce message volume
## Important Files
| File | Description |
|------|-------------|
| `computer-api/.../Computation.java` | Algorithm interface contract (`computer/computer-api/src/main/java/org/apache/hugegraph/computer/core/worker/Computation.java`) |
| `computer-core/.../WorkerService.java` | Worker runtime orchestration (`computer/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java`) |
| `computer-core/.../Bsp4Worker.java` | BSP coordination logic (`computer/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/Bsp4Worker.java`) |
| `computer-algorithm/.../PageRank.java` | Example algorithm implementation (`computer/computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/centrality/pagerank/PageRank.java`) |
## Testing
### CI/CD Pipeline
The CI pipeline (`.github/workflows/computer-ci.yml`) runs:
1. License check (Apache RAT)
2. Setup HDFS (Hadoop 3.3.2)
3. Setup Minikube/Kubernetes
4. Load test data into HugeGraph
5. Compile with JDK 11
6. Run integration tests (`-P integrate-test`)
7. Run unit tests (`-P unit-test`)
8. Upload coverage to Codecov
### Local Testing
```bash
# Setup test environment (etcd, HDFS, K8s)
cd computer-dist/src/assembly/travis
./start-etcd.sh
./start-hdfs.sh
./start-minikube.sh
# Run tests
cd ../../../../
mvn test -P integrate-test
```
## Links
- [Project Homepage](https://hugegraph.apache.org/docs/quickstart/hugegraph-computer/)
- [Main README](../README.md)
- [Vermeer (Go) README](../vermeer/README.md)
- [GitHub Issues](https://github.com/apache/hugegraph-computer/issues)
## Contributing
See the main [Contributing Guide](../README.md#contributing) for how to contribute.
## License
HugeGraph-Computer is licensed under [Apache 2.0 License](https://github.com/apache/hugegraph-computer/blob/master/LICENSE).