feat(core): isolate namespace for different input data source (#252)
* set default job namespace to empty string
* use StringUtils.isEmpty
diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
index 765f8ad..33fba83 100644
--- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
+++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java
@@ -420,6 +420,14 @@
10000
);
+ public static final ConfigOption<String> JOB_NAMESPACE =
+ new ConfigOption<>(
+ "job.namespace",
+ "The job namespace can seperate different data source.",
+ null,
+ ""
+ );
+
public static final ConfigOption<String> JOB_ID =
new ConfigOption<>(
"job.id",
@@ -921,6 +929,7 @@
ComputerOptions.BSP_ETCD_ENDPOINTS.name(),
ComputerOptions.TRANSPORT_SERVER_HOST.name(),
ComputerOptions.TRANSPORT_SERVER_PORT.name(),
+ ComputerOptions.JOB_NAMESPACE.name(),
ComputerOptions.JOB_ID.name(),
ComputerOptions.JOB_WORKERS_COUNT.name(),
ComputerOptions.RPC_SERVER_HOST_NAME,
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java
index 66a3fd9..23d01f4 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/BspBase.java
@@ -17,6 +17,7 @@
package org.apache.hugegraph.computer.core.bsp;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
@@ -30,6 +31,7 @@
private final Config config;
private final String jobId;
+ private final String jobNamespace;
private final int workerCount;
private final long registerTimeout;
private final long barrierOnMasterTimeout;
@@ -42,6 +44,7 @@
this.config = config;
this.jobId = config.get(ComputerOptions.JOB_ID);
+ this.jobNamespace = config.get(ComputerOptions.JOB_NAMESPACE);
this.workerCount = this.config.get(ComputerOptions.JOB_WORKERS_COUNT);
this.registerTimeout = this.config.get(
ComputerOptions.BSP_REGISTER_TIMEOUT);
@@ -59,7 +62,10 @@
*/
private BspClient init() {
BspClient bspClient = this.createBspClient();
- bspClient.init(this.jobId);
+ String namespace = StringUtils.isEmpty(this.jobNamespace) ?
+ this.constructPath(null, this.jobId) :
+ this.constructPath(null, this.jobNamespace, this.jobId);
+ bspClient.init(namespace);
LOG.info("Init {} BSP connection to '{}' for job '{}'",
bspClient.type(), bspClient.endpoint(), this.jobId);
return bspClient;
@@ -123,8 +129,10 @@
*/
protected String constructPath(BspEvent event, Object... paths) {
StringBuilder sb = new StringBuilder();
- // TODO: replace event.code() with event.name()
- sb.append(event.name());
+ if (event != null) {
+ // TODO: replace event.code() with event.name()
+ sb.append(event.name());
+ }
for (Object path : paths) {
sb.append("/").append(path.toString());
}