HAMA-939: Refactor code that was implemented using the out-of-date status response

git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1684349 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/conf/log4j.properties b/conf/log4j.properties
index 1f7a36d..f7bed47 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -83,4 +83,3 @@
 #log4j.logger.org.apache.hadoop.dfs=DEBUG
 #log4j.logger.org.apache.hama=DEBUG
 #log4j.logger.org.apache.zookeeper=DEBUG
-#log4j.logger.org.apache.avro=DEBUG
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java b/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
index f147be2..8eecbbf 100644
--- a/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
@@ -237,16 +237,30 @@
   @Override
   public void register(BSPJobID jobId, TaskAttemptID taskId,
       String hostAddress, long port) {
-    try {
-      String jobRegisterKey = constructKey(jobId, "peers");
-      if (zk.exists(jobRegisterKey, false) == null) {
+    int count = 0;
+    String jobRegisterKey = constructKey(jobId, "peers");
+    Stat stat = null;
+
+    LOG.info("TaskAttemptID : " + taskId);
+    // Loop until the "peers" node of this job is confirmed to exist.
+    while (stat == null) {
+      try {
+        stat = zk.exists(jobRegisterKey, false);
+        if (stat != null) {
+          break; // node is already there, nothing to create.
+        }
         zk.create(jobRegisterKey, new byte[0], Ids.OPEN_ACL_UNSAFE,
             CreateMode.PERSISTENT);
+        Thread.sleep(1000);
+      } catch (Exception e) {
+        LOG.debug(e); // ignore it.
       }
-    } catch (KeeperException e) {
-      LOG.error(e);
-    } catch (InterruptedException e) {
-      LOG.error(e);
+      count++;
+
+      // retry 10 times.
+      if (count > 9) {
+        throw new RuntimeException("can't create root node.");
+      }
     }
     registerTask(jobId, hostAddress, port, taskId);
   }
diff --git a/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java b/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
index 62663fd..6e73521 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
@@ -80,7 +80,6 @@
   private String hostname;
   private int clientPort;
   private FileSystem fs;
-  private static int id = 0;
 
   private volatile long superstep;
   private Counters globalCounter = new Counters();
@@ -186,6 +185,7 @@
       LogManager.shutdown();
       ExitUtil.terminate(1, t);
     } finally {
+      LOG.info("Stop SyncServer and RPCServer.");
       appMaster.close();
     }
     
@@ -491,6 +491,7 @@
     public void onContainersAllocated(List<Container> allocatedContainers) {
       LOG.info("Got response from RM for container ask, allocatedCnt="
           + allocatedContainers.size());
+
       numAllocatedContainers.addAndGet(allocatedContainers.size());
       for (Container allocatedContainer : allocatedContainers) {
         LOG.info("Launching shell command on a new container."
@@ -502,10 +503,8 @@
             + allocatedContainer.getResource().getMemory()
             + ", containerResourceVirtualCores"
             + allocatedContainer.getResource().getVirtualCores());
-        // + ", containerToken"
-        // +allocatedContainer.getContainerToken().getIdentifier().toString());
 
-        Thread launchThread = createLaunchContainerThread(allocatedContainer);
+        Thread launchThread = createLaunchContainerThread(allocatedContainer, allocatedContainer.getId().getContainerId());
 
         // launch and start the container on a separate thread to keep
         // the main thread unblocked
@@ -513,7 +512,6 @@
         launchThreads.add(launchThread);
         launchedContainers.add(allocatedContainer.getId());
         launchThread.start();
-        id++;
       }
     }
 
@@ -621,15 +619,19 @@
 
     Configuration conf;
 
+    long taskAttemptId;
+
     /**
      * @param lcontainer        Allocated container
      * @param containerListener Callback handler of the container
      */
     public LaunchContainerRunnable(
-        Container lcontainer, NMCallbackHandler containerListener, Configuration conf) {
+        Container lcontainer, NMCallbackHandler containerListener,
+        Configuration conf, long taskAttemptId) {
       this.container = lcontainer;
       this.containerListener = containerListener;
       this.conf = conf;
+      this.taskAttemptId = taskAttemptId;
     }
 
     /**
@@ -725,7 +727,7 @@
       vargs.add(BSPRunner.class.getCanonicalName());
 
       vargs.add(jobId.getJtIdentifier());
-      vargs.add(Integer.toString(id));
+      vargs.add(Long.toString(taskAttemptId));
       vargs.add(
           new Path(jobFile).makeQualified(fs.getUri(), fs.getWorkingDirectory())
               .toString());
@@ -805,7 +807,6 @@
           "ApplicationAttemptId not set in the environment");
     }
 
-    LOG.info("app attempt id!!!");
     ContainerId containerId = ConverterUtils.toContainerId(envs
         .get(ApplicationConstants.Environment.CONTAINER_ID.name()));
     return containerId.getApplicationAttemptId();
@@ -930,9 +931,9 @@
   }
 
   @VisibleForTesting
-  Thread createLaunchContainerThread(Container allocatedContainer) {
+  Thread createLaunchContainerThread(Container allocatedContainer, long taskAttemptId) {
     LaunchContainerRunnable runnableLaunchContainer =
-        new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf);
+        new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf, taskAttemptId);
     return new Thread(runnableLaunchContainer);
   }
 
@@ -1001,6 +1002,7 @@
   public void close() throws IOException {
     this.clientServer.stop();
     this.taskServer.stop();
+    this.syncServer.stopServer();
   }
 
   @Override