GIRAPH-1139
closes #30
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 976997f..c3fd141 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -188,12 +188,12 @@
private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT;
/** Job id, to ensure uniqueness */
private final String jobId;
- /** Task partition, to ensure uniqueness */
- private final int taskPartition;
+ /** Task id, from partition and application attempt to ensure uniqueness */
+ private final int taskId;
/** My hostname */
private final String hostname;
- /** Combination of hostname '_' partition (unique id) */
- private final String hostnamePartitionId;
+ /** Combination of hostname '_' task (unique id) */
+ private final String hostnameTaskId;
/** Graph partitioner */
private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory;
/** Mapper that will do the graph computation */
@@ -231,8 +231,8 @@
this.context = context;
this.graphTaskManager = graphTaskManager;
this.conf = graphTaskManager.getConf();
+
this.jobId = conf.getJobId();
- this.taskPartition = conf.getTaskPartition();
this.restartedSuperstep = conf.getLong(
GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
try {
@@ -240,7 +240,6 @@
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
- this.hostnamePartitionId = hostname + "_" + getTaskPartition();
this.graphPartitionerFactory = conf.createGraphPartitioner();
basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
@@ -252,6 +251,8 @@
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;
+
+
String restartJobId = RESTART_JOB_ID.get(conf);
savedCheckpointBasePath =
@@ -272,7 +273,7 @@
}
if (LOG.isInfoEnabled()) {
LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
- ", " + getTaskPartition() + " on " + serverPortList);
+ ", partition " + conf.getTaskPartition() + " on " + serverPortList);
}
try {
this.zk = new ZooKeeperExt(serverPortList,
@@ -288,6 +289,10 @@
throw new RuntimeException(e);
}
+ this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() +
+ conf.getTaskPartition();
+ this.hostnameTaskId = hostname + "_" + getTaskId();
+
//Trying to restart from the latest superstep
if (restartJobId != null &&
restartedSuperstep == UNSET_SUPERSTEP) {
@@ -529,12 +534,12 @@
return hostname;
}
- public final String getHostnamePartitionId() {
- return hostnamePartitionId;
+ public final String getHostnameTaskId() {
+ return hostnameTaskId;
}
- public final int getTaskPartition() {
- return taskPartition;
+ public final int getTaskId() {
+ return taskId;
}
public final GraphTaskManager<I, V, E> getGraphTaskManager() {
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 779bccb..d1dc79d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -579,15 +579,15 @@
if (LOG.isInfoEnabled()) {
Set<Integer> partitionSet = new TreeSet<Integer>();
for (WorkerInfo workerInfo : healthyWorkerInfoList) {
- partitionSet.add(workerInfo.getTaskId());
+ partitionSet.add(workerInfo.getTaskId() % maxWorkers);
}
for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
- partitionSet.add(workerInfo.getTaskId());
+ partitionSet.add(workerInfo.getTaskId() % maxWorkers);
}
for (int i = 1; i <= maxWorkers; ++i) {
if (partitionSet.contains(Integer.valueOf(i))) {
continue;
- } else if (i == getTaskPartition()) {
+ } else if (i == getTaskId() % maxWorkers) {
continue;
} else {
LOG.info("logMissingWorkersOnSuperstep: No response from " +
@@ -802,7 +802,7 @@
try {
myBid =
getZkExt().createExt(masterElectionPath +
- "/" + getHostnamePartitionId(),
+ "/" + getHostnameTaskId(),
null,
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL,
@@ -841,7 +841,7 @@
}
if (masterChildArr.get(0).equals(myBid)) {
GiraphStats.getInstance().getCurrentMasterTaskPartition().
- setValue(getTaskPartition());
+ setValue(getTaskId());
globalCommHandler = new MasterGlobalCommHandler(
new MasterAggregatorHandler(getConfiguration(), getContext()),
@@ -860,7 +860,7 @@
getGraphTaskManager().createUncaughtExceptionHandler());
masterInfo.setInetSocketAddress(masterServer.getMyAddress(),
masterServer.getLocalHostOrIp());
- masterInfo.setTaskId(getTaskPartition());
+ masterInfo.setTaskId(getTaskId());
masterClient =
new NettyMasterClient(getContext(), getConfiguration(), this,
getGraphTaskManager().createUncaughtExceptionHandler());
@@ -1211,6 +1211,7 @@
setApplicationAttempt(getApplicationAttempt() + 1);
setCachedSuperstep(checkpoint);
setRestartedSuperstep(checkpoint);
+ checkpointStatus = CheckpointStatus.NONE;
setJobState(ApplicationState.START_SUPERSTEP,
getApplicationAttempt(),
checkpoint);
@@ -1740,7 +1741,7 @@
if (checkpointFrequency == 0) {
return CheckpointStatus.NONE;
}
- long firstCheckpoint = INPUT_SUPERSTEP + 1 + checkpointFrequency;
+ long firstCheckpoint = INPUT_SUPERSTEP + 1;
if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
}
@@ -1912,7 +1913,7 @@
// for workers and masters, the master will clean up the ZooKeeper
// znodes associated with this job.
String masterCleanedUpPath = cleanedUpPath + "/" +
- getTaskPartition() + MASTER_SUFFIX;
+ getTaskId() + MASTER_SUFFIX;
try {
String finalFinishedPath =
getZkExt().createExt(masterCleanedUpPath,
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index b6b9c12..6f02749 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -216,7 +216,7 @@
graphTaskManager.createUncaughtExceptionHandler());
workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
workerServer.getLocalHostOrIp());
- workerInfo.setTaskId(getTaskPartition());
+ workerInfo.setTaskId(getTaskId());
workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
graphTaskManager.createUncaughtExceptionHandler());
workerServer.setFlowControl(workerClient.getFlowControl());
@@ -243,7 +243,7 @@
}
observers = conf.createWorkerObservers(context);
- WorkerProgress.get().setTaskId(getTaskPartition());
+ WorkerProgress.get().setTaskId(getTaskId());
workerProgressWriter = conf.trackJobProgressOnClient() ?
new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) :
null;
@@ -921,7 +921,7 @@
String finishedWorkerPath =
getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
- "/" + getHostnamePartitionId();
+ "/" + workerInfo.getHostnameId();
try {
getZkExt().createExt(finishedWorkerPath,
workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
@@ -1202,7 +1202,7 @@
// for workers and masters, the master will clean up the ZooKeeper
// znodes associated with this job.
String workerCleanedUpPath = cleanedUpPath + "/" +
- getTaskPartition() + WORKER_SUFFIX;
+ getTaskId() + WORKER_SUFFIX;
try {
String finalFinishedPath =
getZkExt().createExt(workerCleanedUpPath,
@@ -1303,7 +1303,7 @@
// Notify master that checkpoint is stored
String workerWroteCheckpoint =
getWorkerWroteCheckpointPath(getApplicationAttempt(),
- getSuperstep()) + "/" + getHostnamePartitionId();
+ getSuperstep()) + "/" + workerInfo.getHostnameId();
try {
getZkExt().createExt(workerWroteCheckpoint,
new byte[0],