[HAMA-988] Allow to add additional no-input tasks (edwardyoon)
diff --git a/CHANGES.txt b/CHANGES.txt
index d297c4f..56600ee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,8 @@
 Release 0.7.2 (unreleased changes)
 
   NEW FEATURES
+ 
+    HAMA-988: Allow to add additional no-input tasks (edwardyoon)
 
   BUG FIXES
 
diff --git a/core/src/main/java/org/apache/hama/Constants.java b/core/src/main/java/org/apache/hama/Constants.java
index c3563f9..351fb05 100644
--- a/core/src/main/java/org/apache/hama/Constants.java
+++ b/core/src/main/java/org/apache/hama/Constants.java
@@ -132,6 +132,8 @@
 
   // If true, framework launches the number of tasks by user settings.
   public static final String FORCE_SET_BSP_TASKS = "hama.force.set.bsp.tasks";
+  // framework launches additional tasks to the number of input splits
+  public static final String ADDITIONAL_BSP_TASKS = "hama.additional.bsp.tasks";
   
   // /////////////////////////////////////
   // Constants for ZooKeeper
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJob.java b/core/src/main/java/org/apache/hama/bsp/BSPJob.java
index 293e6a6..8489f83 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJob.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJob.java
@@ -189,6 +189,11 @@
     return info.progress();
   }
 
+  public Counters getCounters() throws IOException {
+    ensureState(JobState.RUNNING);
+   return info.getCounters();
+  }
+  
   public boolean isComplete() throws IOException {
     ensureState(JobState.RUNNING);
     return info.isComplete();
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
index 4ca7b61..c1ce0f4 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
@@ -219,6 +219,11 @@
     public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) {
       return jobSubmitClient.getTaskCompletionEvents(getID(), startFrom, 10);
     }
+
+    @Override
+    public Counters getCounters() {
+      return status.getCounter();
+    }
   }
 
   public BSPJobClient(Configuration conf) throws IOException {
@@ -363,17 +368,22 @@
         splits = job.getInputFormat().getSplits(job, maxTasks);
       }
 
-      if (maxTasks < splits.length) {
+      // the number of additional tasks to the number of input splits
+      int additionalTasks = job.getConfiguration().getInt(
+          Constants.ADDITIONAL_BSP_TASKS, 0);
+      if (maxTasks < splits.length + additionalTasks) {
         throw new IOException(
             "Job failed! The number of splits has exceeded the number of max tasks. The number of splits: "
-                + splits.length + ", The number of max tasks: " + maxTasks);
+                + splits.length
+                + ", The number of additional tasks: "
+                + +additionalTasks + ", The number of max tasks: " + maxTasks);
       }
 
       int numOfSplits = writeSplits(job, splits, submitSplitFile, maxTasks);
       if (numOfSplits > configured
           || !job.getConfiguration().getBoolean(Constants.FORCE_SET_BSP_TASKS,
               false)) {
-        job.setNumBspTask(numOfSplits);
+        job.setNumBspTask(numOfSplits + additionalTasks);
       }
 
       job.set("bsp.job.split.file", submitSplitFile.toString());
@@ -583,8 +593,9 @@
         // set partitionID to rawSplit
         if (split.getClass().getName().equals(FileSplit.class.getName())
             && job.getBoolean("input.has.partitioned", false)) {
-          String[] extractPartitionID = ((FileSplit) split).getPath().getName().split("[-]");
-          if(extractPartitionID.length > 1)
+          String[] extractPartitionID = ((FileSplit) split).getPath().getName()
+              .split("[-]");
+          if (extractPartitionID.length > 1)
             rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
         }
 
diff --git a/core/src/main/java/org/apache/hama/bsp/RunningJob.java b/core/src/main/java/org/apache/hama/bsp/RunningJob.java
index 95faba5..dcbabdd 100644
--- a/core/src/main/java/org/apache/hama/bsp/RunningJob.java
+++ b/core/src/main/java/org/apache/hama/bsp/RunningJob.java
@@ -53,6 +53,8 @@
    */
   public String getJobFile();
 
+  public Counters getCounters();
+  
   /**
    * Get the <i>progress</i> of the job's tasks, as a float between 0.0 and 1.0.
    * When all bsp tasks have completed, the function returns 1.0.
diff --git a/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java b/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java
new file mode 100644
index 0000000..e50e4f9
--- /dev/null
+++ b/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.commons.util.KeyValuePair;
+
+public class TestAdditionalTasks extends HamaCluster {
+
+  public static final Log LOG = LogFactory.getLog(TestAdditionalTasks.class);
+
+  protected HamaConfiguration configuration;
+
+  // these variables are preventing from rebooting the whole stuff again since
+  // setup and teardown are called per method.
+
+  public TestAdditionalTasks() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    configuration.set("hama.child.redirect.log.console", "true");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.set("bsp.local.dir", "/tmp/hama-test");
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    configuration.set("hama.sync.peer.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  public void testAdditionalTasks() throws Exception {
+
+    Configuration conf = new Configuration();
+    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
+    bsp.setBspClass(TestBSP.class);
+    conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
+    bsp.setInputFormat(TextInputFormat.class);
+    bsp.setOutputFormat(NullOutputFormat.class);
+    bsp.setInputPath(new Path("../CHANGES.txt"));
+
+    bsp.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
+    bsp.setNumBspTask(2);
+
+    assertTrue(bsp.waitForCompletion(true));
+    Counters counter = bsp.getCounters();
+    assertTrue(2 == counter.getCounter(JobInProgress.JobCounter.LAUNCHED_TASKS));
+  }
+
+  public static class TestBSP extends
+      BSP<LongWritable, Text, NullWritable, NullWritable, NullWritable> {
+
+    @Override
+    public void bsp(
+        BSPPeer<LongWritable, Text, NullWritable, NullWritable, NullWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+      long numOfPairs = 0;
+      KeyValuePair<LongWritable, Text> readNext = null;
+      while ((readNext = peer.readNext()) != null) {
+        LOG.debug(readNext.getKey().get() + " / "
+            + readNext.getValue().toString());
+        numOfPairs++;
+      }
+
+      assertTrue(numOfPairs > 2 || numOfPairs == 0);
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
index 4d23384..2c2cb23 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
@@ -123,6 +123,12 @@
     public String getJobFile() {
       return null;
     }
+
+    @Override
+    public Counters getCounters() {
+      // TODO Auto-generated method stub
+      return null;
+    }
   }
 
   public YARNBSPJobClient(HamaConfiguration conf) {