HAMA-939: Refactoring which was implement using out-of-date status response
git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1684349 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/conf/log4j.properties b/conf/log4j.properties
index 1f7a36d..f7bed47 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -83,4 +83,3 @@
#log4j.logger.org.apache.hadoop.dfs=DEBUG
#log4j.logger.org.apache.hama=DEBUG
#log4j.logger.org.apache.zookeeper=DEBUG
-#log4j.logger.org.apache.avro=DEBUG
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java b/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
index f147be2..8eecbbf 100644
--- a/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
@@ -237,16 +237,26 @@
@Override
public void register(BSPJobID jobId, TaskAttemptID taskId,
String hostAddress, long port) {
- try {
- String jobRegisterKey = constructKey(jobId, "peers");
- if (zk.exists(jobRegisterKey, false) == null) {
+ int count = 0;
+ String jobRegisterKey = constructKey(jobId, "peers");
+ Stat stat = null;
+
+ LOG.info("TaskAttemptID : " + taskId);
+ while (stat != null) {
+ try {
+ stat = zk.exists(jobRegisterKey, false);
zk.create(jobRegisterKey, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ LOG.debug(e); // ignore it.
}
- } catch (KeeperException e) {
- LOG.error(e);
- } catch (InterruptedException e) {
- LOG.error(e);
+ count++;
+
+ // retry 10 times.
+ if (count > 9) {
+ throw new RuntimeException("can't create root node.");
+ }
}
registerTask(jobId, hostAddress, port, taskId);
}
diff --git a/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java b/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
index 62663fd..6e73521 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
@@ -80,7 +80,6 @@
private String hostname;
private int clientPort;
private FileSystem fs;
- private static int id = 0;
private volatile long superstep;
private Counters globalCounter = new Counters();
@@ -186,6 +185,7 @@
LogManager.shutdown();
ExitUtil.terminate(1, t);
} finally {
+ LOG.info("Stop SyncServer and RPCServer.");
appMaster.close();
}
@@ -491,6 +491,7 @@
public void onContainersAllocated(List<Container> allocatedContainers) {
LOG.info("Got response from RM for container ask, allocatedCnt="
+ allocatedContainers.size());
+
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
LOG.info("Launching shell command on a new container."
@@ -502,10 +503,8 @@
+ allocatedContainer.getResource().getMemory()
+ ", containerResourceVirtualCores"
+ allocatedContainer.getResource().getVirtualCores());
- // + ", containerToken"
- // +allocatedContainer.getContainerToken().getIdentifier().toString());
- Thread launchThread = createLaunchContainerThread(allocatedContainer);
+ Thread launchThread = createLaunchContainerThread(allocatedContainer, allocatedContainer.getId().getContainerId());
// launch and start the container on a separate thread to keep
// the main thread unblocked
@@ -513,7 +512,6 @@
launchThreads.add(launchThread);
launchedContainers.add(allocatedContainer.getId());
launchThread.start();
- id++;
}
}
@@ -621,15 +619,19 @@
Configuration conf;
+ long taskAttemptId;
+
/**
* @param lcontainer Allocated container
* @param containerListener Callback handler of the container
*/
public LaunchContainerRunnable(
- Container lcontainer, NMCallbackHandler containerListener, Configuration conf) {
+ Container lcontainer, NMCallbackHandler containerListener,
+ Configuration conf, long taskAttemptId) {
this.container = lcontainer;
this.containerListener = containerListener;
this.conf = conf;
+ this.taskAttemptId = taskAttemptId;
}
/**
@@ -725,7 +727,7 @@
vargs.add(BSPRunner.class.getCanonicalName());
vargs.add(jobId.getJtIdentifier());
- vargs.add(Integer.toString(id));
+ vargs.add(Long.toString(taskAttemptId));
vargs.add(
new Path(jobFile).makeQualified(fs.getUri(), fs.getWorkingDirectory())
.toString());
@@ -805,7 +807,6 @@
"ApplicationAttemptId not set in the environment");
}
- LOG.info("app attempt id!!!");
ContainerId containerId = ConverterUtils.toContainerId(envs
.get(ApplicationConstants.Environment.CONTAINER_ID.name()));
return containerId.getApplicationAttemptId();
@@ -930,9 +931,9 @@
}
@VisibleForTesting
- Thread createLaunchContainerThread(Container allocatedContainer) {
+ Thread createLaunchContainerThread(Container allocatedContainer, long taskAttemptId) {
LaunchContainerRunnable runnableLaunchContainer =
- new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf);
+ new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf, taskAttemptId);
return new Thread(runnableLaunchContainer);
}
@@ -1001,6 +1002,7 @@
public void close() throws IOException {
this.clientServer.stop();
this.taskServer.stop();
+ this.syncServer.stopServer();
}
@Override