HAMA-963. Fix the problem that occurs ArrayIndexOutOfBoundsException

git-svn-id: https://svn.apache.org/repos/asf/hama/trunk@1695219 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java b/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
index 8eecbbf..bc63f4f 100644
--- a/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
@@ -241,7 +241,6 @@
     String jobRegisterKey = constructKey(jobId, "peers");
     Stat stat = null;
 
-    LOG.info("TaskAttemptID : " + taskId);
     while (stat != null) {
       try {
         stat = zk.exists(jobRegisterKey, false);
diff --git a/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java b/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
index 8ef43ee..5ee9781 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
@@ -85,7 +85,10 @@
   private Counters globalCounter = new Counters();
   private BSPJobClient.RawSplit[] splits;
 
+  // Hama job id
   private BSPJobID jobId;
+  // Partiion id
+  private static AtomicInteger ai = new AtomicInteger(-1);
 
   // SyncServer for Zookeeper
   private SyncServer syncServer;
@@ -723,7 +726,7 @@
       vargs.add(BSPRunner.class.getCanonicalName());
 
       vargs.add(jobId.getJtIdentifier());
-      vargs.add(Long.toString(container.getId().getContainerId()));
+      vargs.add(Integer.toString(ai.incrementAndGet()));
       vargs.add(new Path(jobFile).makeQualified(fs.getUri(),
           fs.getWorkingDirectory()).toString());
 
@@ -945,7 +948,6 @@
   public Task getTask(TaskAttemptID taskid) throws IOException {
     BSPJobClient.RawSplit assignedSplit = null;
     String splitName = NullInputFormat.NullInputSplit.class.getName();
-    // String splitName = NullInputSplit.class.getCanonicalName();
     if (splits != null) {
       assignedSplit = splits[taskid.id];
       splitName = assignedSplit.getClassName();
diff --git a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
index 3263fef..d5db98d 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
@@ -74,6 +74,7 @@
   public void submit() throws IOException, InterruptedException {
     // If Constants.MAX_TASKS_PER_JOB is null, calculates the max tasks based on resource status.
     this.getConfiguration().setInt(Constants.MAX_TASKS_PER_JOB, getMaxTasks());
+
     LOG.debug("MaxTasks: " + this.getConfiguration().get(Constants.MAX_TASKS_PER_JOB));
     
     RunningJob submitJobInternal = submitClient.submitJobInternal(this,