Merge remote-tracking branch 'apache/master'
diff --git a/CHANGES.txt b/CHANGES.txt
index c7e3a9b..56600ee 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,11 +1,28 @@
 Hama Change Log
 
-Release 0.7.1 (unreleased changes)
+Release 0.7.2 (unreleased changes)
 
   NEW FEATURES
+ 
+    HAMA-988: Allow to add additional no-input tasks (edwardyoon)
 
   BUG FIXES
 
+    HAMA-986: There is a missing value when calculating hashcode of AsyncClient (JongYoon Lim via edwardyoon)
+
+  IMPROVEMENTS
+
+    HAMA-978: Netty native transport (JongYoon Lim via edwardyoon)
+
+Release 0.7.1 - March 14, 2016
+
+  NEW FEATURES
+
+    HAMA-900: Added a new strategy to schedule tasks based on round robin scheme (Behroz Sikander via edwardyoon)
+
+  BUG FIXES
+
+    HAMA-982: Vertex.read/writeState() method throws NullPointerException (edwardyoon)
     HAMA-965: Infinite loop because of recursive function call (JongYoon Lim via edwardyoon)
     HAMA-966: NioServerListener doesn't throw any exceptions (JongYoon Lim via edwardyoon)
 
diff --git a/README.md b/README.md
index fea4d49..b686f45 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # Apache Hama
 
-Apache Hama is a framework for Big Data analytics which uses the Bulk Synchronous Parallel (BSP) computing model, which was established in 2012 as a Top-Level Project of The Apache Software Foundation.
+<img src="http://hama.apache.org/images/hama_paint_logo.png" width="130" align="left"> Apache Hama is a framework for Big Data analytics which uses the Bulk Synchronous Parallel (BSP) computing model, which was established in 2012 as a Top-Level Project of The Apache Software Foundation.
 
 It provides not only pure BSP programming model but also SQL-like query interface (Apache MRQL) and vertex/neuron centric programming models, inspired by Google's Pregel and DistBelief (Apache Horn). For the latest information about Hama, please visit our website at: <https://hama.apache.org/> and our wiki at: <https://wiki.apache.org/hama/>
 
diff --git a/c++/pom.xml b/c++/pom.xml
index 358db37..07ecff0 100644
--- a/c++/pom.xml
+++ b/c++/pom.xml
@@ -20,13 +20,13 @@
     <parent>
       <groupId>org.apache.hama</groupId>
       <artifactId>hama-parent</artifactId>
-      <version>0.7.1-SNAPSHOT</version>
+      <version>0.7.2-SNAPSHOT</version>
     </parent>
     
     <modelVersion>4.0.0</modelVersion>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-pipes</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.7.2-SNAPSHOT</version>
     <name>pipes</name>
     <description>Apache Hama Pipes</description>
     <packaging>pom</packaging>
diff --git a/commons/pom.xml b/commons/pom.xml
index 8625c2f..3277336 100644
--- a/commons/pom.xml
+++ b/commons/pom.xml
@@ -20,14 +20,14 @@
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.7.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-commons</artifactId>
   <name>commons</name>
-  <version>0.7.1-SNAPSHOT</version>
+  <version>0.7.2-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/core/pom.xml b/core/pom.xml
index 2adb920..1305efb 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -20,14 +20,14 @@
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.7.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-core</artifactId>
   <name>core</name>
-  <version>0.7.1-SNAPSHOT</version>
+  <version>0.7.2-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/core/src/main/java/org/apache/hama/Constants.java b/core/src/main/java/org/apache/hama/Constants.java
index c3563f9..351fb05 100644
--- a/core/src/main/java/org/apache/hama/Constants.java
+++ b/core/src/main/java/org/apache/hama/Constants.java
@@ -132,6 +132,8 @@
 
   // If true, framework launches the number of tasks by user settings.
   public static final String FORCE_SET_BSP_TASKS = "hama.force.set.bsp.tasks";
+  // framework launches additional tasks to the number of input splits
+  public static final String ADDITIONAL_BSP_TASKS = "hama.additional.bsp.tasks";
   
   // /////////////////////////////////////
   // Constants for ZooKeeper
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJob.java b/core/src/main/java/org/apache/hama/bsp/BSPJob.java
index 293e6a6..8489f83 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJob.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJob.java
@@ -189,6 +189,11 @@
     return info.progress();
   }
 
+  public Counters getCounters() throws IOException {
+    ensureState(JobState.RUNNING);
+   return info.getCounters();
+  }
+  
   public boolean isComplete() throws IOException {
     ensureState(JobState.RUNNING);
     return info.isComplete();
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
index 4ca7b61..c1ce0f4 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
@@ -219,6 +219,11 @@
     public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) {
       return jobSubmitClient.getTaskCompletionEvents(getID(), startFrom, 10);
     }
+
+    @Override
+    public Counters getCounters() {
+      return status.getCounter();
+    }
   }
 
   public BSPJobClient(Configuration conf) throws IOException {
@@ -363,17 +368,22 @@
         splits = job.getInputFormat().getSplits(job, maxTasks);
       }
 
-      if (maxTasks < splits.length) {
+      // the number of additional tasks to the number of input splits
+      int additionalTasks = job.getConfiguration().getInt(
+          Constants.ADDITIONAL_BSP_TASKS, 0);
+      if (maxTasks < splits.length + additionalTasks) {
         throw new IOException(
             "Job failed! The number of splits has exceeded the number of max tasks. The number of splits: "
-                + splits.length + ", The number of max tasks: " + maxTasks);
+                + splits.length
+                + ", The number of additional tasks: "
+                + +additionalTasks + ", The number of max tasks: " + maxTasks);
       }
 
       int numOfSplits = writeSplits(job, splits, submitSplitFile, maxTasks);
       if (numOfSplits > configured
           || !job.getConfiguration().getBoolean(Constants.FORCE_SET_BSP_TASKS,
               false)) {
-        job.setNumBspTask(numOfSplits);
+        job.setNumBspTask(numOfSplits + additionalTasks);
       }
 
       job.set("bsp.job.split.file", submitSplitFile.toString());
@@ -583,8 +593,9 @@
         // set partitionID to rawSplit
         if (split.getClass().getName().equals(FileSplit.class.getName())
             && job.getBoolean("input.has.partitioned", false)) {
-          String[] extractPartitionID = ((FileSplit) split).getPath().getName().split("[-]");
-          if(extractPartitionID.length > 1)
+          String[] extractPartitionID = ((FileSplit) split).getPath().getName()
+              .split("[-]");
+          if (extractPartitionID.length > 1)
             rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
         }
 
diff --git a/core/src/main/java/org/apache/hama/bsp/RunningJob.java b/core/src/main/java/org/apache/hama/bsp/RunningJob.java
index 95faba5..dcbabdd 100644
--- a/core/src/main/java/org/apache/hama/bsp/RunningJob.java
+++ b/core/src/main/java/org/apache/hama/bsp/RunningJob.java
@@ -53,6 +53,8 @@
    */
   public String getJobFile();
 
+  public Counters getCounters();
+  
   /**
    * Get the <i>progress</i> of the job's tasks, as a float between 0.0 and 1.0.
    * When all bsp tasks have completed, the function returns 1.0.
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java
new file mode 100644
index 0000000..0bf060b
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/RoundRobinTaskAllocator.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+
+/**
+ * <code>RoundRobinTaskAllocator</code> is a round robin based task allocator that equally
+ * divides the tasks among all the Grooms. It balances the load of cluster. For example
+ * if a cluster has 10 Grooms and 20 tasks are to be scheduled then each Groom which
+ * get 2 tasks.
+ */
+public class RoundRobinTaskAllocator implements TaskAllocationStrategy {
+
+  Log LOG = LogFactory.getLog(RoundRobinTaskAllocator.class);
+
+  @Override
+  public void initialize(Configuration conf) {
+  }
+
+  /**
+   * This function loops through the whole list of Grooms with their task count
+   * and returns the first Groom which contains the minimum number of tasks.
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @return returns the groom name which should be allocated the next task or 
+   *          null no suitable groom was found.
+   */
+  private String findGroomWithMinimumTasks(
+      Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap) {
+    
+    Entry<GroomServerStatus, Integer> firstGroomWithMinimumTasks = null;
+    
+    // At the start taskCountInGroomMap is empty so we have to put 0 tasks on grooms
+    if (taskCountInGroomMap.size() < groomStatuses.size()) {
+      for (String s : groomStatuses.keySet()) {
+        GroomServerStatus groom = groomStatuses.get(s);
+        if (groom == null)
+          continue;
+        Integer taskInGroom = taskCountInGroomMap.get(groom);
+        
+        // Find the groom that is yet to get its first tasks and assign 0 value to it.
+        // Having zero will make sure that it gets selected.
+        if (taskInGroom == null) {
+          taskCountInGroomMap.put(groom, 0);
+          break;
+        }
+      }
+    }
+    
+    for (Entry<GroomServerStatus, Integer> currentGroom : taskCountInGroomMap.entrySet()) {
+      if (firstGroomWithMinimumTasks == null || firstGroomWithMinimumTasks.getValue() > currentGroom.getValue()) {
+        if(currentGroom.getValue() < currentGroom.getKey().getMaxTasks()) { // Assign the task to groom which still has space for more tasks 
+          firstGroomWithMinimumTasks = currentGroom;
+        } // If there is no space then continue and find the next best groom
+      }
+    }
+    
+    return (firstGroomWithMinimumTasks == null) ? null
+        : firstGroomWithMinimumTasks.getKey().getGroomHostName();
+  }
+  
+  /**
+   * Select grooms that has the block of data locally stored on the groom
+   * server.
+   */
+  @Override
+  public String[] selectGrooms(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cannot start task based on id");
+      }
+      return new String[0];
+    }
+
+    RawSplit rawSplit = taskInProgress.getFileSplit();
+    if (rawSplit != null) {
+      return rawSplit.getLocations();
+    }
+    return null;
+  }
+
+  @Override
+  public GroomServerStatus getGroomToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exceeded allowed attempts.");
+      }
+      return null;
+    }    
+
+    String groomName = null;
+
+    groomName = findGroomWithMinimumTasks(groomStatuses, taskCountInGroomMap);
+
+    if (groomName != null) {
+      return groomStatuses.get(groomName);
+    }
+    
+    return null;
+  }
+
+  /**
+   * This operation is not supported.
+   */
+  @Override
+  public Set<GroomServerStatus> getGroomsToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    throw new UnsupportedOperationException(
+        "This API is not supported for the called API function call.");
+  }
+}
diff --git a/core/src/main/java/org/apache/hama/ipc/AsyncClient.java b/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
index ba0266a..8bb3913 100644
--- a/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
+++ b/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
@@ -20,24 +20,29 @@
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.*;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BSPNetUtils;
 
+import javax.net.SocketFactory;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.ConnectException;
@@ -51,20 +56,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.net.SocketFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.util.BSPNetUtils;
-
 /**
  * A client for an IPC service using netty. IPC calls take a single
  * {@link Writable} as a parameter, and return a {@link Writable} as their
@@ -186,7 +177,7 @@
      * @throws IOException
      */
     public Connection(ConnectionId remoteId) throws IOException {
-      group = new NioEventLoopGroup();
+      group = new EpollEventLoopGroup();
       bootstrap = new Bootstrap();
       this.remoteId = remoteId;
       this.serverAddress = remoteId.getAddress();
@@ -280,12 +271,12 @@
           }
 
           // Configure the client.
-          // NioEventLoopGroup is a multithreaded event loop that handles I/O
+          // EpollEventLoopGroup is a multithreaded event loop that handles I/O
           // operation
-          group = new NioEventLoopGroup();
+          group = new EpollEventLoopGroup();
           // Bootstrap is a helper class that sets up a client
           bootstrap = new Bootstrap();
-          bootstrap.group(group).channel(NioSocketChannel.class)
+          bootstrap.group(group).channel(EpollSocketChannel.class)
               .option(ChannelOption.TCP_NODELAY, this.tcpNoDelay)
               .option(ChannelOption.SO_KEEPALIVE, true)
               .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, pingInterval)
@@ -1127,7 +1118,7 @@
       result = PRIME * result + maxIdleTime;
       result = PRIME * result + pingInterval;
       result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
-      result = PRIME * rpcTimeout;
+      result = PRIME * result + rpcTimeout;
       result = PRIME * result
           + ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode());
       result = PRIME * result + (tcpNoDelay ? 1231 : 1237);
diff --git a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
index 93627aa..67ad5d0 100644
--- a/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
+++ b/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
@@ -20,12 +20,12 @@
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.*;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,7 +48,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.*;
-import java.util.concurrent.Future;
 
 /**
  * An abstract IPC service using netty. IPC calls take a single {@link Writable}
@@ -82,8 +81,8 @@
   private int port; // port we listen on
   private Class<? extends Writable> paramClass; // class of call parameters
   // Configure the server.(constructor is thread num)
-  private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
-  private EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
+  private EventLoopGroup workerGroup = new EpollEventLoopGroup();
   private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap<String, Class<?>>();
   private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
 
@@ -192,7 +191,7 @@
       // ServerBootstrap is a helper class that sets up a server
       ServerBootstrap b = new ServerBootstrap();
       b.group(bossGroup, workerGroup)
-          .channel(NioServerSocketChannel.class)
+          .channel(EpollServerSocketChannel.class)
           .option(ChannelOption.SO_BACKLOG, backlogLength)
           .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
           .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
diff --git a/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java b/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java
new file mode 100644
index 0000000..e50e4f9
--- /dev/null
+++ b/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.commons.util.KeyValuePair;
+
+public class TestAdditionalTasks extends HamaCluster {
+
+  public static final Log LOG = LogFactory.getLog(TestAdditionalTasks.class);
+
+  protected HamaConfiguration configuration;
+
+  // these variables are preventing from rebooting the whole stuff again since
+  // setup and teardown are called per method.
+
+  public TestAdditionalTasks() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    configuration.set("hama.child.redirect.log.console", "true");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.set("bsp.local.dir", "/tmp/hama-test");
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    configuration.set("hama.sync.peer.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  public void testAdditionalTasks() throws Exception {
+
+    Configuration conf = new Configuration();
+    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
+    bsp.setBspClass(TestBSP.class);
+    conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
+    bsp.setInputFormat(TextInputFormat.class);
+    bsp.setOutputFormat(NullOutputFormat.class);
+    bsp.setInputPath(new Path("../CHANGES.txt"));
+
+    bsp.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
+    bsp.setNumBspTask(2);
+
+    assertTrue(bsp.waitForCompletion(true));
+    Counters counter = bsp.getCounters();
+    assertTrue(2 == counter.getCounter(JobInProgress.JobCounter.LAUNCHED_TASKS));
+  }
+
+  public static class TestBSP extends
+      BSP<LongWritable, Text, NullWritable, NullWritable, NullWritable> {
+
+    @Override
+    public void bsp(
+        BSPPeer<LongWritable, Text, NullWritable, NullWritable, NullWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+      long numOfPairs = 0;
+      KeyValuePair<LongWritable, Text> readNext = null;
+      while ((readNext = peer.readNext()) != null) {
+        LOG.debug(readNext.getKey().get() + " / "
+            + readNext.getValue().toString());
+        numOfPairs++;
+      }
+
+      assertTrue(numOfPairs > 2 || numOfPairs == 0);
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java
new file mode 100644
index 0000000..e471fe0
--- /dev/null
+++ b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocationRoundRobin.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.junit.Test;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.RoundRobinTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+public class TestTaskAllocationRoundRobin extends TestCase {
+
+  public static final Log LOG = LogFactory
+      .getLog(TestTaskAllocationRoundRobin.class);
+
+  Configuration conf = new Configuration();
+  Map<String, GroomServerStatus> groomStatuses;
+  Map<GroomServerStatus, Integer> taskCountInGroomMap;
+  BSPResource[] resources;
+  TaskInProgress taskInProgress;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+
+    String[] locations = new String[] { "host6", "host4", "host3" };
+    String value = "data";
+    RawSplit split = new RawSplit();
+    split.setLocations(locations);
+    split.setBytes(value.getBytes(), 0, value.getBytes().length);
+    split.setDataLength(value.getBytes().length);
+
+    assertEquals(value.getBytes().length, (int) split.getDataLength());
+
+    taskCountInGroomMap = new LinkedHashMap<GroomServerStatus, Integer>(10);
+    resources = new BSPResource[0];
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    JobInProgress jobProgress = new JobInProgress(job.getJobID(), conf);
+    taskInProgress = new TaskInProgress(job.getJobID(), "job.xml", split, conf,
+        jobProgress, 1);
+
+    groomStatuses = new LinkedHashMap<String, GroomServerStatus>(10);
+
+    for (int i = 0; i < 10; ++i) {
+      String name = "host" + i;
+
+      GroomServerStatus status = new GroomServerStatus(name,
+          new ArrayList<TaskStatus>(), 0, 3, "", name);
+      groomStatuses.put(name, status);
+      taskCountInGroomMap.put(status, 0);
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  /**
+   * This test simulates the allocation of 30 tasks in round robin fashion
+   * on 10 Grooms.
+   */
+  @Test
+  public void testRoundRobinAllocation() {
+    TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf
+        .getClass("", RoundRobinTaskAllocator.class,
+            TaskAllocationStrategy.class), conf);
+
+    for (int i = 0; i < 30; i++) {
+      GroomServerStatus groomStatus = strategy.getGroomToAllocate(
+          groomStatuses, null, taskCountInGroomMap, resources, taskInProgress);
+      if (groomStatus != null) {
+        taskCountInGroomMap.put(groomStatus,
+            taskCountInGroomMap.get(groomStatus) + 1); // Increment the total tasks in it
+        
+        assertEquals("", "host" + (i % 10), groomStatus.getGroomHostName()); 
+      }
+    }
+  }
+
+  @Test
+  public void testRoundRobinDataLocality() throws Exception {
+
+    TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf
+        .getClass("", RoundRobinTaskAllocator.class,
+            TaskAllocationStrategy.class), conf);
+
+    String[] hosts = strategy.selectGrooms(groomStatuses, taskCountInGroomMap,
+        resources, taskInProgress);
+
+    List<String> list = new ArrayList<String>();
+
+    for (int i = 0; i < hosts.length; ++i) {
+      list.add(hosts[i]);
+    }
+
+    assertTrue(list.contains("host6"));
+    assertTrue(list.contains("host3"));
+    assertTrue(list.contains("host4"));
+  }
+
+}
diff --git a/dist/pom.xml b/dist/pom.xml
index f8cae37..a4b70cc 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -20,7 +20,7 @@
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.7.2-SNAPSHOT</version>
   </parent>
   
   <artifactId>hama-dist</artifactId>
diff --git a/examples/pom.xml b/examples/pom.xml
index 49b7227..c5fc1aa 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -20,14 +20,14 @@
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.7.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-examples</artifactId>
   <name>examples</name>
-  <version>0.7.1-SNAPSHOT</version>
+  <version>0.7.2-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/examples/src/test/java/org/apache/hama/examples/CustomVertexReadWriteStateTest.java b/examples/src/test/java/org/apache/hama/examples/CustomVertexReadWriteStateTest.java
new file mode 100644
index 0000000..f753ffb
--- /dev/null
+++ b/examples/src/test/java/org/apache/hama/examples/CustomVertexReadWriteStateTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.graph.Vertex;
+
+public class CustomVertexReadWriteStateTest extends TestCase {
+  static int initialState = 2;
+  static int changedState = 4;
+
+  public static class TestVertex extends Vertex<Text, IntWritable, IntWritable> {
+
+    private static ArrayWritable test = new ArrayWritable(IntWritable.class);
+
+    @Override
+    public void setup(HamaConfiguration conf) {
+      // Sets the initial state
+      test.set(new Writable[] { new IntWritable(initialState) });
+    }
+
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+      if (this.getSuperstepCount() == 3) {
+        // change the state
+        test.set(new Writable[] { new IntWritable(changedState) });
+      }
+
+      if (this.getSuperstepCount() < 3) {
+        assertEquals(initialState, ((IntWritable) test.get()[0]).get());
+      } else {
+        assertEquals(changedState, ((IntWritable) test.get()[0]).get());
+      }
+    }
+
+    public void readState(DataInput in) throws IOException {
+      test.readFields(in);
+    }
+
+    public void writeState(DataOutput out) throws IOException {
+      test.write(out);
+    }
+  }
+
+}
diff --git a/examples/src/test/java/org/apache/hama/examples/SSSPTest.java b/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
index 3d7f03f..116abbb 100644
--- a/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
+++ b/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
@@ -31,7 +31,16 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.NullOutputFormat;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.examples.CustomVertexReadWriteStateTest.TestVertex;
+import org.apache.hama.examples.SSSP.SSSPTextReader;
+import org.apache.hama.graph.GraphJob;
 import org.junit.Test;
 
 /**
@@ -61,18 +70,51 @@
   protected void setUp() throws Exception {
     super.setUp();
     fs = FileSystem.get(conf);
+    generateTestData();
+  }
+  
+  protected void tearDown() throws Exception {
+    deleteTempDirs();
+    fs.close();
   }
 
   @Test
   public void testShortestPaths() throws IOException, InterruptedException,
       ClassNotFoundException, InstantiationException, IllegalAccessException {
 
-    generateTestData();
-    try {
-      SSSP.main(new String[] { "0", INPUT, OUTPUT, "3" });
-      verifyResult();
-    } finally {
-      deleteTempDirs();
+    SSSP.main(new String[] { "0", INPUT, OUTPUT, "3" });
+    verifyResult();
+  }
+
+  @Test
+  public void testCustomReadWriteState() throws IOException,
+      InterruptedException, ClassNotFoundException, InstantiationException,
+      IllegalAccessException {
+
+    HamaConfiguration conf = new HamaConfiguration();
+    GraphJob job = new GraphJob(conf, CustomVertexReadWriteStateTest.class);
+    // Set the job name
+    job.setJobName("test custom read/write state");
+    job.setInputPath(new Path(INPUT));
+    job.setNumBspTask(1);
+    job.setVertexClass(TestVertex.class);
+    job.setInputFormat(TextInputFormat.class);
+    job.setInputKeyClass(LongWritable.class);
+    job.setInputValueClass(Text.class);
+
+    job.setPartitioner(HashPartitioner.class);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setVertexInputReaderClass(SSSPTextReader.class);
+    // Iterate until all the nodes have been reached.
+    job.setMaxIteration(6);
+    job.setVertexIDClass(Text.class);
+    job.setVertexValueClass(IntWritable.class);
+    job.setEdgeValueClass(IntWritable.class);
+
+    long startTime = System.currentTimeMillis();
+    if (job.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
   }
 
diff --git a/graph/pom.xml b/graph/pom.xml
index 14fd3ef..d56fe57 100644
--- a/graph/pom.xml
+++ b/graph/pom.xml
@@ -20,14 +20,14 @@
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.7.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-graph</artifactId>
   <name>graph</name>
-  <version>0.7.1-SNAPSHOT</version>
+  <version>0.7.2-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/graph/src/main/java/org/apache/hama/graph/Vertex.java b/graph/src/main/java/org/apache/hama/graph/Vertex.java
index bf90187..cdbf6b5 100644
--- a/graph/src/main/java/org/apache/hama/graph/Vertex.java
+++ b/graph/src/main/java/org/apache/hama/graph/Vertex.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hama.graph;
 
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -271,7 +273,11 @@
       }
     }
     votedToHalt = in.readBoolean();
-    readState(in);
+
+    boolean hasMoreContents = in.readBoolean();
+    if (hasMoreContents) {
+      readState(in);
+    }
   }
 
   @Override
@@ -308,8 +314,24 @@
       }
     }
     out.writeBoolean(votedToHalt);
-    writeState(out);
 
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput customOut = new DataOutputStream(baos);
+    boolean hasMoreContents = true;
+    try {
+      writeState(customOut);
+    } catch (NullPointerException e) {
+      // do nothing
+    }
+
+    // if all states are null, set hasContents to false.
+    if (baos.size() == 0) {
+      hasMoreContents = false;
+    }
+
+    out.writeBoolean(hasMoreContents);
+    if (hasMoreContents)
+      out.write(baos.toByteArray());
   }
 
   // compare across the vertex ID
diff --git a/mesos/pom.xml b/mesos/pom.xml
index c0af443..00bafdb 100644
--- a/mesos/pom.xml
+++ b/mesos/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.7.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
diff --git a/ml/pom.xml b/ml/pom.xml
index 6701df0..b6cf3cf 100644
--- a/ml/pom.xml
+++ b/ml/pom.xml
@@ -20,14 +20,14 @@
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.7.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-ml</artifactId>
   <name>machine learning</name>
-  <version>0.7.1-SNAPSHOT</version>
+  <version>0.7.2-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/pom.xml b/pom.xml
index 7ed4d99..51677fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,14 +20,14 @@
   <parent>
     <groupId>org.apache</groupId>
     <artifactId>apache</artifactId>
-    <version>8</version>
+    <version>10</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-parent</artifactId>
   <name>Apache Hama parent POM</name>
-  <version>0.7.1-SNAPSHOT</version>
+  <version>0.7.2-SNAPSHOT</version>
   <url>http://hama.apache.org</url>
   <packaging>pom</packaging>
   <prerequisites>
@@ -197,7 +197,7 @@
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-auth</artifactId>
+          <artifactId>hadoop-aws</artifactId>
           <version>${hadoop.version}</version>
         </dependency>
         <dependency>
@@ -390,10 +390,17 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-release-plugin</artifactId>
-        <version>2.2</version>
+        <version>2.5.3</version>
         <configuration>
           <mavenExecutorId>forked-path</mavenExecutorId>
         </configuration>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.maven.scm</groupId>
+            <artifactId>maven-scm-provider-gitexe</artifactId>
+            <version>1.8.1</version>
+          </dependency>
+        </dependencies>
       </plugin>
       <plugin>
         <groupId>org.apache.rat</groupId>
diff --git a/src/site/xdoc/index.xml b/src/site/xdoc/index.xml
index a75a846..1602680 100644
--- a/src/site/xdoc/index.xml
+++ b/src/site/xdoc/index.xml
@@ -24,7 +24,7 @@
 
     <section name=""></section>
     <p>
-    <div style="float:left;margin-right:15px;"><img src="./images/hama_paint_logo.png" style="width:130px" alt="" /></div>
+    <div style="float:left;margin-right:15px;margin-bottom: 10px;"><img src="./images/hama_paint_logo.png" style="width:120px" alt="" /></div>
      Apache Hama<sup>TM</sup> is a framework for Big Data analytics which uses the Bulk Synchronous Parallel (BSP) computing model, 
     which was established in 2012 as a Top-Level Project of The Apache Software Foundation. 
     <br/><br/>It provides not only pure BSP programming model 
@@ -49,8 +49,8 @@
 
     <h3 align="center">Recent News</h3>
       <ul>
+        <li>Jan 28, 2016: Behroz Sikander was added as a committer and PMC</li>
         <li>Jun 14, 2015: release 0.7.0 available [<a href="downloads.html">downloads</a>]</li>
-        <li>Jun 11, 2015: Minho Kim was added as a committer.</li>
       </ul>
     </div>
       
diff --git a/src/site/xdoc/team-list.xml b/src/site/xdoc/team-list.xml
index 57b6755..d4bc378 100644
--- a/src/site/xdoc/team-list.xml
+++ b/src/site/xdoc/team-list.xml
@@ -65,6 +65,12 @@
           <td align="center">committer</td>
         </tr>
        <tr valign="top">
+          <td align="center">bsikander</td>
+          <td align="center">Behroz Sikander</td>
+          <td align="center">Technical University of Munich</td>
+          <td align="center">PMC member, committer</td>
+        </tr>
+       <tr valign="top">
           <td align="center">bsmin</td>
           <td align="center">Byungseok Min</td>
           <td align="center">LG CNS</td>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index dede15f..7129321 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -19,14 +19,14 @@
   <parent>
     <groupId>org.apache.hama</groupId>
     <artifactId>hama-parent</artifactId>
-    <version>0.7.1-SNAPSHOT</version>
+    <version>0.7.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-yarn</artifactId>
   <name>yarn</name>
-  <version>0.7.1-SNAPSHOT</version>
+  <version>0.7.2-SNAPSHOT</version>
   <packaging>jar</packaging>
 
   <dependencies>
diff --git a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
index 4d23384..2c2cb23 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
@@ -123,6 +123,12 @@
     public String getJobFile() {
       return null;
     }
+
+    @Override
+    public Counters getCounters() {
+      // TODO Auto-generated method stub
+      return null;
+    }
   }
 
   public YARNBSPJobClient(HamaConfiguration conf) {