Merged the trunk changes to branch HAMA-505-branch

git-svn-id: https://svn.apache.org/repos/asf/hama/branches/HAMA-505-branch@1369566 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/conf/hama-default.xml b/conf/hama-default.xml
index 2947745..ce36cf5 100644
--- a/conf/hama-default.xml
+++ b/conf/hama-default.xml
@@ -120,7 +120,12 @@
     <description>The maximum number of BSP tasks that will be run simultaneously 
     by a groom server.</description>
   </property>
-   <property>
+  <property>
+    <name>bsp.ft.enabled</name>
+    <value>false</value>
+    <description>Enable Fault Tolerance in BSP Task execution.</description>
+  </property>
+  <property>
     <name>bsp.checkpoint.enabled</name>
     <value>false</value>
     <description>Enable Hama to checkpoint the messages transferred among BSP tasks during the BSP synchronization period.</description>
diff --git a/core/src/main/java/org/apache/hama/Constants.java b/core/src/main/java/org/apache/hama/Constants.java
index bba8779..8b5cd36 100644
--- a/core/src/main/java/org/apache/hama/Constants.java
+++ b/core/src/main/java/org/apache/hama/Constants.java
@@ -60,6 +60,24 @@
   public static final String UTF8_ENCODING = "UTF-8";
 
   public static final String MAX_TASKS_PER_GROOM = "bsp.tasks.maximum";
+  
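+  /** Configuration key for the maximum number of attempts allowed for a BSP task. */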
+  public static final String MAX_TASK_ATTEMPTS = "bsp.tasks.max.attempts";
+
+  public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2;
+
+  // //////////////////////////////////////
+  // Task scheduler related constants
+  // //////////////////////////////////////
+  
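+  /** Configuration key for the task allocation strategy class used when scheduling tasks. */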
+  public static final String TASK_ALLOCATOR_CLASS = "bsp.taskalloc.class";
+  
+  // //////////////////////////////////////
+  // Fault tolerance related constants
+  // //////////////////////////////////////
+
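+  /** Configuration key that enables fault tolerance for BSP task execution. */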
+  public static final String FAULT_TOLERANCE_FLAG = "bsp.ft.enabled";
+  
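+  /** Configuration key for the fault tolerance service implementation class. */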
+  public static final String FAULT_TOLERANCE_CLASS = "bsp.ft.class";
 
   // //////////////////////////////////////
   // Checkpointing related constants
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
index 548e653..e4902a9 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
@@ -1025,7 +1025,7 @@
     return tokens;
   }
 
-  static class RawSplit implements Writable {
+  public static class RawSplit implements Writable {
     private String splitClass;
     private BytesWritable bytes = new BytesWritable();
     private String[] locations;
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPMaster.java b/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
index 4babda6..33741ae 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
@@ -28,12 +28,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +48,8 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
 import org.apache.hama.http.HttpServer;
 import org.apache.hama.ipc.GroomProtocol;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -56,18 +58,23 @@
 import org.apache.hama.monitor.fd.FDProvider;
 import org.apache.hama.monitor.fd.Supervisor;
 import org.apache.hama.monitor.fd.UDPSupervisor;
-import org.apache.hama.zookeeper.QuorumPeer;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
 
 /**
  * BSPMaster is responsible to control all the groom servers and to manage bsp
- * jobs.
+ * jobs. It has the following responsibilities:
+ * <ol>
+ * <li> <b>Job submission</b>. BSPMaster is responsible for accepting new job
+ * requests and handing each job to the scheduler, which schedules the BSP
+ * tasks defined for the job.
+ * <li> <b>GroomServer monitoring</b>. BSPMaster keeps track of all the groom
+ * servers in the cluster. It is responsible for adding new grooms to the
+ * cluster, keeping a tab on all the grooms, and blacklisting a groom if it
+ * fails to meet the availability requirement.
+ * <li> <b>Task bookkeeping</b>. BSPMaster keeps track of the status of every
+ * task in each job and handles task failures as required by the job.
+ * </ol>
  */
 public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
     GroomServerManager, Watcher, MonitorManager {
@@ -77,8 +84,7 @@
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
 
   private HamaConfiguration conf;
-  ZooKeeper zk = null;
-  private String bspRoot = null;
+  MasterSyncClient syncClient = null;
 
   /**
    * Constants for BSPMaster's status.
@@ -122,7 +128,7 @@
 
   // Jobs' Meta Data
   private Integer nextJobId = Integer.valueOf(1);
-  // clients
+  private int totalSubmissions = 0; // how many jobs have been submitted by clients
   private int totalTasks = 0; // currently running tasks
   private int totalTaskCapacity; // max tasks that groom server can run
 
@@ -141,6 +147,13 @@
 
   private final AtomicReference<Supervisor> supervisor = new AtomicReference<Supervisor>();
 
+  /**
+   * ReportGroomStatusHandler keeps track of the status reported by each
+   * GroomServer for the tasks it is currently executing. Based on the reported
+   * status, it is responsible for issuing task recovery requests, updating the
+   * job progress and doing other bookkeeping on the currently running jobs.
+   */
   private class ReportGroomStatusHandler implements DirectiveHandler {
 
     @Override
@@ -177,8 +190,13 @@
               jip.getStatus().setProgress(ts.getSuperstepCount());
               jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
             } else if (ts.getRunState() == TaskStatus.State.FAILED) {
-              jip.status.setRunState(JobStatus.FAILED);
-              jip.failedTask(tip, ts);
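+              // If the failed task can be recovered, ask the scheduler to
+              // recover it; otherwise the whole job is marked as failed.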
+              if (jip.handleFailure(tip)) {
+                recoverTask(jip);
+              } else {
+                jip.status.setRunState(JobStatus.FAILED);
+                jip.failedTask(tip, ts);
+              }
             }
             if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
               for (JobInProgressListener listener : jobInProgressListeners) {
@@ -192,6 +210,7 @@
               jip.getStatus().setProgress(ts.getSuperstepCount());
               jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
             } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
+              
               GroomProtocol worker = findGroomServer(tmpStatus);
               Directive d1 = new DispatchTasksDirective(
                   new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
@@ -444,11 +463,26 @@
     return conf.getLocalPath("bsp.local.dir", pathString);
   }
 
+  /**
+   * Starts the BSP Master process.
+   * @param conf The Hama configuration.
+   * @return an instance of BSPMaster
+   * @throws IOException
+   * @throws InterruptedException
+   */
   public static BSPMaster startMaster(HamaConfiguration conf)
       throws IOException, InterruptedException {
     return startMaster(conf, generateNewIdentifier());
   }
 
+  /**
+   * Starts the BSP Master process.
+   * @param conf The Hama configuration.
+   * @param identifier The identifier of this master instance.
+   * @return an instance of BSPMaster
+   * @throws IOException
+   * @throws InterruptedException
+   */
   public static BSPMaster startMaster(HamaConfiguration conf, String identifier)
       throws IOException, InterruptedException {
     BSPMaster result = new BSPMaster(conf, identifier);
@@ -465,108 +499,20 @@
   }
 
   /**
-   * When start the cluster, cleans all zk nodes up.
-   * 
-   * @param conf
+   * Initialize the global synchronization client.
+   * @param conf Hama configuration.
    */
   private void initZK(HamaConfiguration conf) {
-    try {
-      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
-          conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
-    } catch (IOException e) {
-      LOG.error("Exception during reinitialization!", e);
-    }
-
-    bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
-        Constants.DEFAULT_ZOOKEEPER_ROOT);
-    Stat s = null;
-    if (zk != null) {
-      try {
-        s = zk.exists(bspRoot, false);
-      } catch (Exception e) {
-        LOG.error(s, e);
-      }
-
-      if (s == null) {
-        try {
-          zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
-              CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-          LOG.error(e);
-        } catch (InterruptedException e) {
-          LOG.error(e);
-        }
-      } else {
-        this.clearZKNodes(zk);
-      }
-    }
+    this.syncClient = new ZKSyncBSPMasterClient();
+    this.syncClient.init(conf);
   }
 
   /**
-   * Clears all sub-children of node bspRoot
+   * Get a handle to the global synchronization client used by BSPMaster.
+   * @return The synchronization client.
    */
-  public void clearZKNodes(ZooKeeper zk) {
-    clearZKNodes(zk, bspRoot);
-  }
-
-  public static void clearZKNodes(ZooKeeper zk, String pPath) {
-    String path = pPath;
-    if (!path.startsWith("/")) {
-      path = "/" + path;
-      LOG.warn("Path did not start with /, adding it: " + path);
-    }
-    try {
-      Stat s = zk.exists(path, false);
-      if (s != null) {
-        clearZKNodesInternal(zk, path);
-      }
-
-    } catch (Exception e) {
-      LOG.warn("Could not clear zookeeper nodes.", e);
-    }
-  }
-
-  /**
-   * Clears all sub-children of node rooted at path.
-   */
-  private static void clearZKNodesInternal(ZooKeeper zk, String path)
-      throws KeeperException, InterruptedException {
-    ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
-
-    if (list.size() == 0) {
-      return;
-
-    } else {
-      for (String node : list) {
-        clearZKNodes(zk, path + "/" + node);
-        zk.delete(path + "/" + node, -1); // delete any version of this node.
-      }
-    }
-  }
-
-  public void createJobRoot(String string) {
-    try {
-      zk.create("/" + string, new byte[0], Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT);
-    } catch (KeeperException e) {
-      LOG.error(e);
-    } catch (InterruptedException e) {
-      LOG.error(e);
-    }
-  }
-
-  public void deleteJobRoot(String string) {
-    try {
-      for (String node : zk.getChildren("/" + string, this)) {
-        zk.delete("/" + string + "/" + node, 0);
-      }
-
-      zk.delete("/" + string, 0);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    } catch (KeeperException e) {
-      e.printStackTrace();
-    }
+  public MasterSyncClient getSyncClient(){
+    return this.syncClient;
   }
 
   /**
@@ -657,6 +603,11 @@
 
     JobInProgress job = new JobInProgress(jobID, new Path(jobFile), this,
         this.conf);
+    ++totalSubmissions;
+    if(LOG.isDebugEnabled()){
+      LOG.debug("Submitting job number = " + totalSubmissions + 
+          " id = " + job.getJobID());
+    }
     return addJob(jobID, job);
   }
 
@@ -804,6 +755,21 @@
     }
     return job.getStatus();
   }
+  
+  /**
+   * Recovers a task in a job. To be called when a particular task in a job has
+   * failed and has to be rescheduled on a groom server.
+   */
+  private synchronized void recoverTask(JobInProgress job) {
+    ++totalSubmissions;
+    for (JobInProgressListener listener : jobInProgressListeners) {
+      try {
+        listener.recoverTaskInJob(job);
+      } catch (IOException ioe) {
+        LOG.error("Failed to alert the scheduler to recover a task in the job.", ioe);
+      }
+    }
+  }
 
   @Override
   public JobStatus[] jobsToComplete() throws IOException {
@@ -920,11 +886,14 @@
     }
   }
 
+  /**
+   * Shuts down the BSPMaster process and does the necessary cleanup.
+   */
   public void shutdown() {
     try {
-      this.zk.close();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+      this.syncClient.close();
+    } catch (IOException e) {
+      LOG.error("Error closing the sync client",e);
     }
     if (null != this.supervisor.get()) {
       this.supervisor.get().stop();
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java b/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
index 1c6745c..e7ba19e 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
@@ -19,14 +19,12 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -35,10 +33,13 @@
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.Counters.Counter;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.BSPFaultTolerantService;
+import org.apache.hama.bsp.ft.FaultTolerantPeerService;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
 import org.apache.hama.bsp.message.MessageQueue;
-import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
@@ -53,10 +54,7 @@
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
-    IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
-    TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
-    COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
   }
 
   private final Configuration conf;
@@ -71,13 +69,9 @@
   private String[] allPeers;
 
   // SYNC
-  private SyncClient syncClient;
+  private PeerSyncClient syncClient;
   private MessageManager<M> messenger;
 
-  // A checkpoint is initiated at the <checkPointInterval>th interval.
-  private int checkPointInterval;
-  private long lastCheckPointStep;
-
   // IO
   private int partition;
   private String splitClass;
@@ -92,6 +86,8 @@
   private Counters counters;
   private Combiner<M> combiner;
 
+  private FaultTolerantPeerService<M> faultToleranceService;
+
   /**
    * Protected default constructor for LocalBSPRunner.
    */
@@ -123,6 +119,13 @@
     this.counters = counters;
   }
 
+  public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
+      BSPPeerProtocol umbilical, int partition, String splitClass,
+      BytesWritable split, Counters counters) throws Exception {
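+    // Fresh (non-recovering) start: delegates with superstep -1 and state RUNNING.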
+    this(job, conf, taskId, umbilical, partition, splitClass, split, counters,
+        -1, TaskStatus.State.RUNNING);
+  }
+
   /**
    * BSPPeer Constructor.
    * 
@@ -136,7 +139,8 @@
   @SuppressWarnings("unchecked")
   public BSPPeerImpl(BSPJob job, Configuration conf, TaskAttemptID taskId,
       BSPPeerProtocol umbilical, int partition, String splitClass,
-      BytesWritable split, Counters counters) throws Exception {
+      BytesWritable split, Counters counters, long superstep,
+      TaskStatus.State state) throws Exception {
     this.conf = conf;
     this.taskId = taskId;
     this.umbilical = umbilical;
@@ -149,28 +153,29 @@
 
     this.fs = FileSystem.get(conf);
 
-    this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
-        Constants.DEFAULT_CHECKPOINT_INTERVAL);
-    this.lastCheckPointStep = 0;
-
     String bindAddress = conf.get(Constants.PEER_HOST,
         Constants.DEFAULT_PEER_HOST);
     int bindPort = conf
         .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
-    initialize();
-    syncClient.register(taskId.getJobID(), taskId, peerAddress.getHostName(),
-        peerAddress.getPort());
-    // initial barrier syncing to get all the hosts to the same point, to get
-    // consistent peernames.
-    syncClient.enterBarrier(taskId.getJobID(), taskId, -1);
-    syncClient.leaveBarrier(taskId.getJobID(), taskId, -1);
-    setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f,
-        TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
-        TaskStatus.Phase.STARTING, counters));
 
-    messenger = MessageManagerFactory.getMessageManager(conf);
-    messenger.init(taskId, this, conf, peerAddress);
+    initializeIO();
+    initializeSyncService(superstep, state);
+
+    TaskStatus.Phase phase = TaskStatus.Phase.STARTING;
+    String stateString = "running";
+    if (state == TaskStatus.State.RECOVERING) {
+      phase = TaskStatus.Phase.RECOVERING;
+      stateString = "recovering";
+    }
+
+    setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f, state,
+        stateString, peerAddress.getHostName(), phase, counters));
+
+    initilizeMessaging();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initialized Messaging service.");
+    }
 
     final String combinerName = conf.get("bsp.combiner.class");
     if (combinerName != null) {
@@ -178,12 +183,57 @@
           conf.getClassByName(combinerName), conf);
     }
 
+    if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
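+      // Construct the configured fault tolerance service for this peer; it may
+      // switch a RECOVERING task back to RUNNING once it has been initialized.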
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Fault tolerance enabled.");
+      }
+      if (superstep > 0)
+        conf.setInt("attempt.superstep", (int) superstep);
+      Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS,
+          AsyncRcvdMsgCheckpointImpl.class, BSPFaultTolerantService.class);
+      if (ftClass != null) {
+        if (superstep > 0) {
+          counters.incrCounter(PeerCounter.SUPERSTEP_SUM, superstep);
+        }
+
+        this.faultToleranceService = ((BSPFaultTolerantService<M>) ReflectionUtils
+            .newInstance(ftClass, null)).constructPeerFaultTolerance(job, this,
+            syncClient, peerAddress, this.taskId, superstep, conf, messenger);
+        TaskStatus.State newState = this.faultToleranceService
+            .onPeerInitialized(state);
+
+        if (state == TaskStatus.State.RECOVERING) {
+          if (newState == TaskStatus.State.RUNNING) {
+            phase = TaskStatus.Phase.STARTING;
+            stateString = "running";
+            state = newState;
+          }
+
+          setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 1.0f,
+              state, stateString, peerAddress.getHostName(), phase, counters));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("State after FT service initialization - "
+                + newState.toString());
+          }
+
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Initialized fault tolerance service");
+        }
+      }
+    }
+    doFirstSync(superstep);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(new StringBuffer("BSP Peer successfully initialized for ")
+          .append(this.taskId.toString()).append(" ").append(superstep)
+          .toString());
+    }
   }
 
   @SuppressWarnings("unchecked")
   public final void initialize() throws Exception {
-    syncClient = SyncServiceFactory.getSyncClient(conf);
-    syncClient.init(conf, taskId.getJobID(), taskId);
 
     initInput();
 
@@ -235,6 +285,50 @@
     }
   }
 
+  public final void initilizeMessaging() throws ClassNotFoundException {
+    messenger = MessageManagerFactory.getMessageManager(conf);
+    messenger.init(taskId, this, conf, peerAddress);
+  }
+
+  public final void initializeSyncService(long superstep, TaskStatus.State state)
+      throws Exception {
+
+    syncClient = SyncServiceFactory.getPeerSyncClient(conf);
+    syncClient.init(conf, taskId.getJobID(), taskId);
+    syncClient.register(taskId.getJobID(), taskId, peerAddress.getHostName(),
+        peerAddress.getPort());
+  }
+
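+  /**
+   * Initial barrier sync to get all peers to the same point, so that peer
+   * names are consistent. When recovering (superstep > 0), the barrier of the
+   * previous superstep is re-entered.
+   */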
+  private void doFirstSync(long superstep) throws SyncException {
+    if (superstep > 0)
+      --superstep;
+    syncClient.enterBarrier(taskId.getJobID(), taskId, superstep);
+    syncClient.leaveBarrier(taskId.getJobID(), taskId, superstep);
+  }
+
+  @SuppressWarnings("unchecked")
+  public final void initializeIO() throws Exception {
+
+    initInput();
+
+    String outdir = null;
+    if (conf.get("bsp.output.dir") != null) {
+      Path outputDir = new Path(conf.get("bsp.output.dir",
+          "tmp-" + System.currentTimeMillis()), Task.getOutputName(partition));
+      outdir = outputDir.makeQualified(fs).toString();
+    }
+    outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, outdir);
+    final RecordWriter<K2, V2> finalOut = outWriter;
+
+    collector = new OutputCollector<K2, V2>() {
+      @Override
+      public void collect(K2 key, V2 value) throws IOException {
+        finalOut.write(key, value);
+      }
+    };
+
+  }
+
   @Override
   public final M getCurrentMessage() throws IOException {
     return messenger.getCurrentMessage();
@@ -247,85 +341,50 @@
   }
 
   /*
-   * returns true if the peer would checkpoint in the next sync.
-   */
-  public final boolean isReadyToCheckpoint() {
-
-    checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL, 1);
-    if (LOG.isDebugEnabled())
-      LOG.debug(new StringBuffer(1000).append("Enabled = ")
-          .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
-          .append(" checkPointInterval = ").append(checkPointInterval)
-          .append(" lastCheckPointStep = ").append(lastCheckPointStep)
-          .append(" getSuperstepCount() = ").append(getSuperstepCount())
-          .toString());
-
-    return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
-        && (checkPointInterval != 0) && (((int) (getSuperstepCount() - lastCheckPointStep)) >= checkPointInterval));
-
-  }
-
-  private final String checkpointedPath() {
-    String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
-    String ckptPath = backup + bspJob.getJobID().toString() + "/"
-        + getSuperstepCount() + "/" + this.taskId.toString();
-    if (LOG.isDebugEnabled())
-      LOG.debug("Messages are to be saved to " + ckptPath);
-    return ckptPath;
-  }
-
-  final void checkpoint(String checkpointedPath, BSPMessageBundle<M> bundle) {
-    FSDataOutputStream out = null;
-    try {
-      out = this.fs.create(new Path(checkpointedPath));
-      bundle.write(out);
-    } catch (IOException ioe) {
-      LOG.warn("Fail checkpointing messages to " + checkpointedPath, ioe);
-    } finally {
-      try {
-        if (null != out)
-          out.close();
-      } catch (IOException e) {
-        LOG.warn("Fail to close dfs output stream while checkpointing.", e);
-      }
-    }
-  }
-
-  /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.BSPPeerInterface#sync()
    */
   @Override
   public final void sync() throws IOException, SyncException,
       InterruptedException {
-    long startBarrier = System.currentTimeMillis();
-    enterBarrier();
+
     // normally all messages should been send now, finalizing the send phase
     messenger.finishSendPhase();
     Iterator<Entry<InetSocketAddress, MessageQueue<M>>> it = messenger
         .getMessageIterator();
 
-    boolean shouldCheckPoint;
-
-    if ((shouldCheckPoint = isReadyToCheckpoint())) {
-      lastCheckPointStep = getSuperstepCount();
-    }
-
     while (it.hasNext()) {
       Entry<InetSocketAddress, MessageQueue<M>> entry = it.next();
       final InetSocketAddress addr = entry.getKey();
       final Iterable<M> messages = entry.getValue();
 
       final BSPMessageBundle<M> bundle = combineMessages(messages);
-
-      if (shouldCheckPoint) {
-        checkpoint(checkpointedPath(), bundle);
-      }
-
       // remove this message during runtime to save a bit of memory
       it.remove();
+      try {
+        messenger.transfer(addr, bundle);
+      } catch (Exception e) {
+        LOG.error("Error while sending messages", e);
+      }
+    }
 
-      messenger.transfer(addr, bundle);
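+    // The fault tolerance service, if configured, is given a chance to act
+    // before entering, while inside, and after leaving the barrier.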
+    if (this.faultToleranceService != null) {
+      try {
+        this.faultToleranceService.beforeBarrier();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    long startBarrier = System.currentTimeMillis();
+    enterBarrier();
+
+    if (this.faultToleranceService != null) {
+      try {
+        this.faultToleranceService.duringBarrier();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
     }
 
     leaveBarrier();
@@ -336,9 +395,38 @@
 
     currentTaskStatus.setCounters(counters);
 
+    if (this.faultToleranceService != null) {
+      try {
+        this.faultToleranceService.afterBarrier();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
     umbilical.statusUpdate(taskId, currentTaskStatus);
     // Clear outgoing queues.
     messenger.clearOutgoingQueues();
+
+    // int msgsCount = -1;
+    // if (shouldCheckPoint) {
+    // msgsCount = checkpointReceivedMessages(checkpointedReceivePath());
+    // }
+    //
+    // this.syncClient.storeInformation(this.syncClient.constructKey(
+    // this.bspJob.getJobID(), "checkpoint", String.valueOf(getPeerIndex())),
+    // new IntWritable(msgsCount), false, null);
+
+    // if (msgsCount >= 0) {
+    // ArrayWritable writableArray = new ArrayWritable(IntWritable.class);
+    // Writable[] writeArr = new Writable[2];
+    // writeArr[0] = new IntWritable((int) getSuperstepCount());
+    // writeArr[1] = new IntWritable(msgsCount);
+    // writableArray.set(writeArr);
+    // this.syncClient.storeInformation(
+    // this.syncClient.constructKey(this.bspJob.getJobID(), "checkpoint",
+    // String.valueOf(getPeerIndex())), writableArray, true, null);
+    // }
+
   }
 
   private final BSPMessageBundle<M> combineMessages(Iterable<M> messages) {
@@ -420,8 +508,7 @@
 
   @Override
   public int getPeerIndex() {
-    initPeerNames();
-    return Arrays.binarySearch(getAllPeerNames(), getPeerName());
+    return this.taskId.getTaskID().getId();
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPTask.java b/core/src/main/java/org/apache/hama/bsp/BSPTask.java
index 9212317..679d053 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPTask.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPTask.java
@@ -69,7 +69,7 @@
       boolean shouldKillSelf = false;
       try {
         if (LOG.isDebugEnabled())
-          LOG.debug("Pinging at time " + Calendar.getInstance().toString());
+          LOG.debug("Pinging at time " + Calendar.getInstance().getTimeInMillis());
         // if the RPC call returns false, it means that groomserver does not
         // have knowledge of this task.
         shouldKillSelf = !(pingRPC.ping(taskId) && bspThread.isAlive());
diff --git a/core/src/main/java/org/apache/hama/bsp/GroomServer.java b/core/src/main/java/org/apache/hama/bsp/GroomServer.java
index c63c8d6..5ba7b1e 100644
--- a/core/src/main/java/org/apache/hama/bsp/GroomServer.java
+++ b/core/src/main/java/org/apache/hama/bsp/GroomServer.java
@@ -163,19 +163,20 @@
       }
 
       if (actions != null) {
-        assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
+        // assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
         int prevPort = Constants.DEFAULT_PEER_PORT;
 
         for (GroomServerAction action : actions) {
           if (action instanceof LaunchTaskAction) {
             Task t = ((LaunchTaskAction) action).getTask();
 
-            prevPort = BSPNetUtils.getNextAvailable(prevPort);
-            assignedPeerNames.put(t.getTaskID(), prevPort);
-
+            synchronized (assignedPeerNames) {
+              prevPort = BSPNetUtils.getNextAvailable(prevPort);
+              assignedPeerNames.put(t.getTaskID(), prevPort);
+            }
             LOG.info("Launch " + actions.length + " tasks.");
             startNewTask((LaunchTaskAction) action);
-          } else {
+          } else if (action instanceof KillTaskAction) {
 
             // TODO Use the cleanup thread
             // tasksToCleanup.put(action);
@@ -187,11 +188,30 @@
               tip.taskStatus.setRunState(TaskStatus.State.FAILED);
               try {
                 tip.killAndCleanup(false);
+                tasks.remove(killAction.getTaskID());
+                runningTasks.remove(killAction.getTaskID());
               } catch (IOException ioe) {
                 throw new DirectiveException("Error when killing a "
                     + "TaskInProgress.", ioe);
               }
             }
+          } else if (action instanceof RecoverTaskAction) {
+            RecoverTaskAction recoverAction = (RecoverTaskAction) action;
+            Task t = recoverAction.getTask();
+            LOG.info("Received recovery action for task " + t.getTaskID());
+            synchronized (assignedPeerNames) {
+              prevPort = BSPNetUtils.getNextAvailable(prevPort);
+              assignedPeerNames.put(t.getTaskID(), prevPort);
+            }
+            try {
+              startRecoveryTask(recoverAction);
+            } catch (IOException e) {
+              throw new DirectiveException(
+                  "Error starting the recovery task " + t.getTaskID(), e);
+            }
           }
         }
       }
@@ -321,6 +341,8 @@
     this.conf.set(Constants.PEER_HOST, localHostname);
     this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
     this.maxCurrentTasks = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
+    this.assignedPeerNames = new HashMap<TaskAttemptID, Integer>(
+        2 * this.maxCurrentTasks);
 
     int rpcPort = -1;
     String rpcAddr = null;
@@ -571,6 +593,61 @@
     }
   }
 
+  private void startRecoveryTask(RecoverTaskAction action) throws IOException {
+    Task t = action.getTask();
+    BSPJob jobConf = null;
+    try {
+      jobConf = new BSPJob(t.getJobID(), t.getJobFile());
+    } catch (IOException e1) {
+      LOG.error(e1);
+      throw e1;
+    }
+
+    TaskInProgress tip = new TaskInProgress(t, jobConf, this.groomServerName);
+    tip.markAsRecoveryTask(action.getSuperstepCount());
+    synchronized (this) {
+      if (tasks.containsKey(t.getTaskID())) {
+        TaskInProgress oldTip = tasks.get(t.getTaskID());
+        try {
+          oldTip.killRunner();
+        } catch (IOException e) {
+          LOG.error("Error killing the current process for " + t.getTaskID(), e);
+          throw e;
+        }
+      }
+
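+      // Remove all earlier attempts of this task before registering the
+      // recovery attempt.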
+      Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
+      while(taskIterator.hasNext()){
+        TaskAttemptID taskAttId = taskIterator.next();
+        if(taskAttId.getTaskID().equals(t.getTaskID().getTaskID())){
+          if(LOG.isDebugEnabled()){
+            LOG.debug("Removing tasks with id = " + t.getTaskID().getTaskID());
+          }
+          taskIterator.remove();
+          runningTasks.remove(taskAttId);
+        }
+      }
+      
+      tasks.put(t.getTaskID(), tip);
+      runningTasks.put(t.getTaskID(), tip);
+    }
+    try {
+      localizeJob(tip);
+    } catch (Throwable e) {
+      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
+          .stringifyException(e));
+      LOG.warn(msg);
+      
+      try {
+        tip.killAndCleanup(true);
+      } catch (IOException ie2) {
+        LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
+            + StringUtils.stringifyException(ie2));
+      }
+      throw new IOException("Error localizing the job.", e);
+    }
+  }
+
   /**
    * Update and report refresh status back to BSPMaster.
    */
@@ -730,13 +807,20 @@
             + " monitorPeriod = "
             + monitorPeriod
             + " check = "
-            + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
+            + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && 
+                (((tip.lastPingedTimestamp == 0 && 
+                ((currentTime - tip.startTime) > 10 * monitorPeriod)) || 
+                ((tip.lastPingedTimestamp > 0) && 
+                    (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
 
       // Task is out of contact if it has not pinged since more than
       // monitorPeriod. A task is given a leeway of 10 times monitorPeriod
       // to get started.
       if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
-          && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
+          && (((tip.lastPingedTimestamp == 0 
+          && ((currentTime - tip.startTime) > 10 * monitorPeriod)) 
+            || ((tip.lastPingedTimestamp > 0) 
+                && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))) {
 
         LOG.info("adding purge task: " + tip.getTask().getTaskID());
 
@@ -891,6 +975,7 @@
 
     private long startTime = 0L;
     private volatile long lastPingedTimestamp = 0L;
+    private long startSuperstepCount = -1;
 
     public TaskInProgress(Task task, BSPJob jobConf, String groomServer) {
       this.task = task;
@@ -901,6 +986,15 @@
           TaskStatus.Phase.STARTING, task.getCounters());
     }
 
+    public void markAsRecoveryTask(long superstepNumber) {
+      if (this.taskStatus.getRunState() != TaskStatus.State.FAILED) {
+        this.taskStatus.setRunState(TaskStatus.State.RECOVERING);
+        this.taskStatus.setPhase(TaskStatus.Phase.RECOVERING);
+        this.taskStatus.setStateString("recovering");
+      }
+      this.startSuperstepCount = superstepNumber;
+    }
+
     private void localizeTask(Task task) throws IOException {
       Path localJobFile = this.jobConf.getLocalPath(SUBDIR + "/"
           + task.getTaskID() + "/job.xml");
@@ -954,8 +1048,22 @@
 
       // runner could be null if task-cleanup attempt is not localized yet
       if (runner != null) {
+        if(LOG.isDebugEnabled()){
+          LOG.debug("Killing process for " + this.task.getTaskID());
+        }
         runner.killBsp();
       }
+      runner = null;
+    }
+
+    public synchronized void killRunner() throws IOException {
+      if (runner != null) {
+        if(LOG.isDebugEnabled()){
+          LOG.debug("Killing process for " + this.task.getTaskID());
+        }
+        runner.killBsp();
+      }
+      runner = null;
     }
 
     /**
@@ -1143,6 +1251,11 @@
         defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
       }
       defaultConf.setInt(Constants.PEER_PORT, peerPort);
+      
+      long superstep = Long.parseLong(args[4]);
+      TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
+      LOG.debug("Starting peer for superstep " + superstep + " state = " + state);
+
 
       try {
         // use job-specified working directory
@@ -1153,7 +1266,7 @@
         @SuppressWarnings("rawtypes")
         final BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job,
             defaultConf, taskid, umbilical, task.partition, task.splitClass,
-            task.split, task.getCounters());
+            task.split, task.getCounters(), superstep, state);
 
         task.run(job, bspPeer, umbilical); // run the task
 
@@ -1195,6 +1308,24 @@
     }
   }
 
+  public TaskStatus getTaskStatus(TaskAttemptID taskid) {
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      return tip.getStatus();
+    } else {
+      return null;
+    }
+  }
+
+  public long getStartSuperstep(TaskAttemptID taskid) {
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      return tip.startSuperstepCount;
+    } else {
+      return -1L;
+    }
+  }
+
   @Override
   public boolean ping(TaskAttemptID taskid) throws IOException {
     TaskInProgress tip = runningTasks.get(taskid);
@@ -1220,8 +1351,6 @@
   @Override
   public void fsError(TaskAttemptID taskId, String message) throws IOException {
     LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
-    // TODO
-
   }
 
   @Override
@@ -1254,8 +1383,6 @@
 
   @Override
   public void process(WatchedEvent event) {
-    // TODO Auto-generated method stub
-
   }
 
 }
diff --git a/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java b/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
index bdc9857..658a4a2 100644
--- a/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
+++ b/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
@@ -28,7 +28,7 @@
  * A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
  * {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
  */
-abstract class GroomServerAction implements Writable {
+public abstract class GroomServerAction implements Writable {
 
   /**
    * Enumeration of various 'actions' that the {@link BSPMaster} directs the
@@ -49,7 +49,13 @@
     REINIT_GROOM,
 
     /** Ask a task to save its output. */
-    COMMIT_TASK
+    COMMIT_TASK,
+
+    /** Recover a task from failure. */
+    RECOVER_TASK,
+
+    /** Update information on a peer. */
+    UPDATE_PEER
   };
 
   /**
@@ -73,7 +79,17 @@
       case KILL_JOB: {
         action = new KillJobAction();
       }
-        break;
+      break;
+      case RECOVER_TASK: {
+        action = new RecoverTaskAction();
+      }
+      break;
+      case UPDATE_PEER: {
+        action = new UpdatePeerAction();
+      }
+      break;
       case REINIT_GROOM: {
         action = new ReinitGroomAction();
       }
diff --git a/core/src/main/java/org/apache/hama/bsp/JobInProgress.java b/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
index 4da7d90..3630429 100644
--- a/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
+++ b/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
@@ -21,9 +21,11 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,13 +34,21 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hama.Constants;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.BSPFaultTolerantService;
+import org.apache.hama.bsp.ft.FaultTolerantMasterService;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+import org.apache.hama.util.ReflectionUtils;
 
 /**
  * JobInProgress maintains all the info for keeping a Job on the straight and
  * narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
  * tables for doing bookkeeping of its Tasks.
  */
-class JobInProgress {
+public class JobInProgress {
 
   /**
    * Used when a kill is issued to a job that is initializing.
@@ -73,6 +83,8 @@
   long launchTime;
   long finishTime;
 
+  int maxTaskAttempts;
+
   private String jobName;
 
   // private LocalFileSystem localFs;
@@ -88,11 +100,38 @@
   String jobSplit;
 
   Map<Task, GroomServerStatus> taskToGroomMap;
+
   // Used only for scheduling!
-  Map<GroomServerStatus, Integer> tasksInGroomMap;
+  Map<GroomServerStatus, Integer> taskCountInGroomMap;
+
+  // If a task is not present as a key, it has not failed before. The value is
+  // the attempt ID with which the key (task) was last re-attempted.
+  Map<Task, Integer> taskReattemptMap;
+
+  Set<TaskInProgress> recoveryTasks;
+
+  // This set keeps track of the tasks that have failed.
+  Set<Task> failedTasksTillNow;
 
   private int taskCompletionEventTracker = 0;
 
+  private TaskAllocationStrategy taskAllocationStrategy;
+
+  private FaultTolerantMasterService faultToleranceService;
+  
+  /**
+   * Used only for unit tests.
+   * @param jobId
+   * @param conf
+   */
+  public JobInProgress(BSPJobID jobId, Configuration conf){
+    this.conf = conf;
+    this.jobId = jobId;
+    master = null;
+  }
+
   public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
       Configuration conf) throws IOException {
     this.conf = conf;
@@ -101,10 +140,6 @@
     this.jobFile = jobFile;
     this.master = master;
 
-    this.taskToGroomMap = new HashMap<Task, GroomServerStatus>(2 * tasks.length);
-
-    this.tasksInGroomMap = new HashMap<GroomServerStatus, Integer>();
-
     this.status = new JobStatus(jobId, null, 0L, 0L,
         JobStatus.State.PREP.value(), counters);
     this.startTime = System.currentTimeMillis();
@@ -126,6 +161,9 @@
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
         numBSPTasks + 10);
 
+    this.maxTaskAttempts = job.getConf().getInt(Constants.MAX_TASK_ATTEMPTS,
+        Constants.DEFAULT_MAX_TASK_ATTEMPTS);
+
     this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
         job.getJobName());
 
@@ -139,6 +177,8 @@
       fs.copyToLocalFile(new Path(jarFile), localJarFile);
     }
 
+    failedTasksTillNow = new HashSet<Task>(2 * tasks.length);
+
   }
 
   public JobProfile getProfile() {
@@ -228,25 +268,55 @@
       this.tasks = new TaskInProgress[numBSPTasks];
       for (int i = 0; i < numBSPTasks; i++) {
         tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
-            splits[i], this.master, this.conf, this, i);
+            splits[i], this.conf, this, i);
       }
     } else {
       this.tasks = new TaskInProgress[numBSPTasks];
       for (int i = 0; i < numBSPTasks; i++) {
         tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
-            null, this.master, this.conf, this, i);
+            null, this.conf, this, i);
       }
     }
+    this.taskToGroomMap = new HashMap<Task, GroomServerStatus>(2 * tasks.length);
+
+    this.taskCountInGroomMap = new HashMap<GroomServerStatus, Integer>();
+
+    this.recoveryTasks = new HashSet<TaskInProgress>(2 * tasks.length);
 
     // Update job status
     this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(),
         0L, 0L, JobStatus.RUNNING, counters);
 
     // delete all nodes belonging to that job before start
-    BSPMaster.clearZKNodes(master.zk, this.getJobID().toString());
-    master.createJobRoot(this.getJobID().toString());
+    MasterSyncClient syncClient = master.getSyncClient();
+    syncClient.registerJob(this.getJobID().toString());
 
     tasksInited = true;
+
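+    // Instantiate the configured task allocation strategy and, if fault
+    // tolerance is enabled, the master-side fault tolerance service for this job.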
+    Class<?> taskAllocatorClass = conf.getClass(Constants.TASK_ALLOCATOR_CLASS,
+        BestEffortDataLocalTaskAllocator.class, TaskAllocationStrategy.class);
+    this.taskAllocationStrategy = (TaskAllocationStrategy) ReflectionUtils
+        .newInstance(taskAllocatorClass, new Object[0]);
+
+    if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {
+
+      Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS, 
+          AsyncRcvdMsgCheckpointImpl.class ,
+          BSPFaultTolerantService.class);
+      if (ftClass != null) {
+        try {
+          faultToleranceService = ((BSPFaultTolerantService<?>) ReflectionUtils
+              .newInstance(ftClass, new Object[0]))
+              .constructMasterFaultTolerance(jobId, maxTaskAttempts, tasks,
+                  conf, master.getSyncClient(), taskAllocationStrategy);
+          LOG.info("Initialized fault tolerance service with "
+              + ftClass.getCanonicalName());
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+    }
+
     LOG.info("Job is initialized.");
   }
 
@@ -268,29 +338,54 @@
     }
 
     Task result = null;
+    BSPResource[] resources = new BSPResource[0];
 
-    try {
-      for (int i = 0; i < tasks.length; i++) {
-        if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
-          result = tasks[i].getTaskToRun(groomStatuses, tasksInGroomMap);
-          if (result != null)
-            this.taskToGroomMap.put(result, tasks[i].getGroomServerStatus());
-          int taskInGroom = 0;
-          if (tasksInGroomMap.containsKey(tasks[i].getGroomServerStatus())) {
-            taskInGroom = tasksInGroomMap.get(tasks[i].getGroomServerStatus());
-          }
-          tasksInGroomMap.put(tasks[i].getGroomServerStatus(), taskInGroom + 1);
-          break;
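+    // Ask the task allocation strategy for candidate grooms for the task and
+    // then pick the groom to run it on.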
+    for (int i = 0; i < tasks.length; i++) {
+      if (!tasks[i].isRunning() && !tasks[i].isComplete()) {
+
+        String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
+            groomStatuses, taskCountInGroomMap, resources, tasks[i]);
+        GroomServerStatus groomStatus = taskAllocationStrategy
+            .getGroomToAllocate(groomStatuses, selectedGrooms,
+                taskCountInGroomMap, resources, tasks[i]);
+        if (groomStatus != null)
+          result = tasks[i].constructTask(groomStatus);
+        if (result != null) {
+          updateGroomTaskDetails(tasks[i].getGroomServerStatus(), result);
         }
+        break;
       }
-
-    } catch (IOException e) {
-      LOG.error("Exception while obtaining new task!", e);
     }
+
     counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
     return result;
   }
 
+  public void recoverTasks(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+      throws IOException {
+
+    if (this.faultToleranceService == null)
+      return;
+
+    this.faultToleranceService.recoverTasks(this, groomStatuses,
+        fetchAndClearTasksToRecover(), tasks, taskCountInGroomMap, actionMap);
+  }
+
+  private void updateGroomTaskDetails(GroomServerStatus groomStatus, Task task) {
+    taskToGroomMap.put(task, groomStatus);
+    int tasksInGroom = 0;
+
+    if (taskCountInGroomMap.containsKey(groomStatus)) {
+      tasksInGroom = taskCountInGroomMap.get(groomStatus);
+    }
+    taskCountInGroomMap.put(groomStatus, tasksInGroom + 1);
+  }
+
   /**
    * Hosts that tasks run on.
    * 
@@ -304,6 +399,14 @@
     return list;
   }
 
+  /**
+   * Mark the completed task status. If all the tasks are completed the status
+   * of the job is updated to notify the client on the completion of the whole
+   * job.
+   * 
+   * @param tip <code>TaskInProgress</code> object representing task.
+   * @param status The completed task status
+   */
   public synchronized void completedTask(TaskInProgress tip, TaskStatus status) {
     TaskAttemptID taskid = status.getTaskId();
     updateTaskStatus(tip, status);
@@ -332,12 +435,18 @@
       LOG.info("Job successfully done.");
 
       // delete job root
-      master.deleteJobRoot(this.getJobID().toString());
+      master.getSyncClient().deregisterJob(this.getJobID().toString());
 
       garbageCollect();
     }
   }
 
+  /**
+   * Mark failure of a task.
+   * 
+   * @param tip <code>TaskInProgress</code> object representing task.
+   * @param status The failed task status
+   */
   public void failedTask(TaskInProgress tip, TaskStatus status) {
     TaskAttemptID taskid = status.getTaskId();
     updateTaskStatus(tip, status);
@@ -353,8 +462,6 @@
       }
     }
 
-    // TODO
-
     if (!allDone) {
       // Kill job
       this.kill();
@@ -371,6 +478,12 @@
     }
   }
 
+  /**
+   * Updates the status of the task.
+   * 
+   * @param tip <code>TaskInProgress</code> representing task
+   * @param taskStatus The status of the task.
+   */
   public synchronized void updateTaskStatus(TaskInProgress tip,
       TaskStatus taskStatus) {
     TaskAttemptID taskid = taskStatus.getTaskId();
@@ -414,6 +527,9 @@
     }
   }
 
+  /**
+   * Kill the job.
+   */
   public synchronized void kill() {
     if (status.getRunState() != JobStatus.KILLED) {
       this.status = new JobStatus(status.getJobID(), this.profile.getUser(),
@@ -438,6 +554,12 @@
    */
   synchronized void garbageCollect() {
     try {
+      
+      if(LOG.isDebugEnabled()){
+        LOG.debug("Removing " + localJobFile + " and " + localJarFile
+            + " getJobFile = " + profile.getJobFile());
+      }
+      
       // Definitely remove the local-disk copy of the job file
       if (localJobFile != null) {
         localFs.delete(localJobFile, true);
@@ -501,4 +623,62 @@
     return events;
   }
 
+  /**
+   * Returns the configured maximum number of times the task could be
+   * re-attempted.
+   */
+  int getMaximumReAttempts() {
+    return maxTaskAttempts;
+  }
+
+  /**
+   * Returns true if the task should be restarted on failure. It also causes
+   * JobInProgress object to maintain state of the restart request.
+   */
+  synchronized boolean handleFailure(TaskInProgress tip) {
+    if (this.faultToleranceService == null
+        || (!faultToleranceService.isRecoveryPossible(tip)))
+      return false;
+
+    if (!faultToleranceService.isAlreadyRecovered(tip)) {
+      if(LOG.isDebugEnabled()){
+        LOG.debug("Adding recovery task " + tip.getCurrentTaskAttemptId());
+      }
+      recoveryTasks.add(tip);
+      status.setRunState(JobStatus.RECOVERING);
+      return true;
+    }
+    else if(LOG.isDebugEnabled()){
+      LOG.debug("Avoiding recovery task " + tip.getCurrentTaskAttemptId());
+    }
+    return false;
+    
+  }
+  
+  
+  /**
+   * @return The tasks in progress that have to be recovered.
+   */
+  synchronized TaskInProgress[] fetchAndClearTasksToRecover() {
+    TaskInProgress[] failedTasksInProgress = new TaskInProgress[recoveryTasks
+        .size()];
+    recoveryTasks.toArray(failedTasksInProgress);
+
+    recoveryTasks.clear();
+    return failedTasksInProgress;
+  }
+
+  public boolean isRecoveryPending() {
+    return recoveryTasks.size() != 0;
+  }
+
+  public Set<Task> getTaskSet() {
+    return taskToGroomMap.keySet();
+  }
+
+  public FaultTolerantMasterService getFaultToleranceService() {
+    return this.faultToleranceService;
+  }
+
 }
diff --git a/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java b/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
index f5db6e4..746aa89 100644
--- a/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
+++ b/core/src/main/java/org/apache/hama/bsp/JobInProgressListener.java
@@ -40,5 +40,13 @@
    * @throws IOException
    */
   public abstract void jobRemoved(JobInProgress job) throws IOException;
+  
+  /**
+   * Invoked when a task in a job has to be recovered by {@link BSPMaster}.
+   * @param job The job whose failed task has to be recovered.
+   * @throws IOException
+   */
+  public abstract void recoverTaskInJob(JobInProgress job) throws IOException;
 
 }
diff --git a/core/src/main/java/org/apache/hama/bsp/JobStatus.java b/core/src/main/java/org/apache/hama/bsp/JobStatus.java
index 1382770..c0f7e8a 100644
--- a/core/src/main/java/org/apache/hama/bsp/JobStatus.java
+++ b/core/src/main/java/org/apache/hama/bsp/JobStatus.java
@@ -44,7 +44,7 @@
   }
 
   public static enum State {
-    RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5);
+    RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5), RECOVERING(6);
     int s;
 
     State(int s) {
@@ -74,6 +74,9 @@
         case KILLED:
           name = "KILLED";
           break;
+        case RECOVERING:
+          name = "RECOVERING";
+          break;
       }
 
       return name;
@@ -86,6 +89,7 @@
   public static final int FAILED = 3;
   public static final int PREP = 4;
   public static final int KILLED = 5;
+  public static final int RECOVERING = 6;
 
   private BSPJobID jobid;
   private long progress;
diff --git a/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java b/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
index 0f13efb..b99e6f4 100644
--- a/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
+++ b/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
@@ -23,7 +23,7 @@
 
 /**
  * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
- * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
+ * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
  */
 class LaunchTaskAction extends GroomServerAction {
   private Task task;
diff --git a/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java b/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
index 328a940..97113ec 100644
--- a/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
+++ b/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
@@ -44,7 +44,10 @@
 import org.apache.hama.bsp.message.AbstractMessageManager;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
+import org.apache.hama.bsp.sync.BSPPeerSyncClient;
 import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.SyncEvent;
+import org.apache.hama.bsp.sync.SyncEventListener;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
@@ -118,7 +121,7 @@
 
     conf.setClass(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
         LocalMessageManager.class, MessageManager.class);
-    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS, LocalSyncClient.class,
+    conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS, LocalSyncClient.class,
         SyncClient.class);
 
     BSPJob job = new BSPJob(new HamaConfiguration(conf), jobID);
@@ -399,7 +402,7 @@
 
   }
 
-  public static class LocalSyncClient implements SyncClient {
+  public static class LocalSyncClient extends BSPPeerSyncClient {
     // note that this is static, because we will have multiple peers
     private static CyclicBarrier barrier;
     private int tasks;
@@ -461,9 +464,52 @@
     }
 
     @Override
-    public void close() throws InterruptedException {
+    public void close() {
       barrier = null;
     }
+
+    @Override
+    public String constructKey(BSPJobID jobId, String... args) {
+      return null;
+    }
+
+    @Override
+    public boolean storeInformation(String key, Writable value,
+        boolean permanent, SyncEventListener listener) {
+      return false;
+    }
+
+    @Override
+    public boolean getInformation(String key, Writable valueHolder) {
+      return false;
+    }
+
+    @Override
+    public boolean addKey(String key, boolean permanent,
+        SyncEventListener listener) {
+      return false;
+    }
+
+    @Override
+    public boolean hasKey(String key) {
+      return false;
+    }
+
+    @Override
+    public String[] getChildKeySet(String key, SyncEventListener listener) {
+      return null;
+    }
+
+    @Override
+    public boolean registerListener(String key, SyncEvent event,
+        SyncEventListener listener) {
+      return false;
+    }
+
+    @Override
+    public boolean remove(String key, SyncEventListener listener) {
+      return false;
+    }
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java b/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
new file mode 100644
index 0000000..bd914a3
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to launch a task for recovery.
+ */
+public class RecoverTaskAction extends GroomServerAction {
+  private Task task;
+  private LongWritable superstepNumber;
+
+  public RecoverTaskAction() {
+    super(ActionType.RECOVER_TASK);
+    superstepNumber = new LongWritable(-1L);
+  }
+
+  public RecoverTaskAction(Task task, long superstep) {
+    super(ActionType.RECOVER_TASK);
+    this.task = task;
+    this.superstepNumber = new LongWritable(superstep);
+  }
+
+  public Task getTask() {
+    return task;
+  }
+  
+  public long getSuperstepCount(){
+    return superstepNumber.get();
+  }
+
+  public void write(DataOutput out) throws IOException {
+    task.write(out);
+    superstepNumber.write(out);
+    
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    task = new BSPTask();
+    task.readFields(in);
+    superstepNumber.readFields(in);
+  }
+
+}
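
A minimal round-trip sketch of the Writable contract above (illustrative only, assuming the Task handed in is a fully initialized BSPTask); it relies solely on the write()/readFields() pair defined in RecoverTaskAction.

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hama.bsp.RecoverTaskAction;
import org.apache.hama.bsp.Task;

public class RecoverTaskActionRoundTrip {
  /** Serializes the action and reads it back, as the groom would on receipt. */
  public static RecoverTaskAction roundTrip(Task task, long superstep)
      throws IOException {
    RecoverTaskAction sent = new RecoverTaskAction(task, superstep);

    DataOutputBuffer out = new DataOutputBuffer();
    sent.write(out);                       // writes the task, then the superstep

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    RecoverTaskAction received = new RecoverTaskAction();
    received.readFields(in);               // rebuilds the task as a BSPTask
    return received;                       // received.getSuperstepCount() == superstep
  }
}
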
diff --git a/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java b/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
index 6124969..0d978d7 100644
--- a/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
+++ b/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
@@ -43,6 +43,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
 import org.apache.hama.ipc.GroomProtocol;
 import org.apache.hama.monitor.Federator;
 import org.apache.hama.monitor.Federator.Act;
@@ -121,6 +122,12 @@
     public void jobRemoved(JobInProgress job) throws IOException {
       queueManager.get().moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job);
     }
+
+    @Override
+    public void recoverTaskInJob(JobInProgress job) throws IOException {
+      queueManager.get().addJob(WAIT_QUEUE, job);
+    }
+
   }
 
   private class JobProcessor extends Thread implements Schedulable {
@@ -221,11 +228,10 @@
         throw new NullPointerException("No job is specified.");
     }
 
-    @Override
-    public Boolean call() {
+    private Boolean scheduleNewTasks() {
 
       // Action to be sent for each task to the respective groom server.
-      Map<GroomServerStatus, List<LaunchTaskAction>> actionMap = new HashMap<GroomServerStatus, List<LaunchTaskAction>>(
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
           2 * this.groomStatuses.size());
       Set<Task> taskSet = new HashSet<Task>(2 * jip.tasks.length);
       Task t = null;
@@ -240,6 +246,7 @@
 
       // if all tasks could not be scheduled
       if (cnt != this.jip.tasks.length) {
+        LOG.error("Could not schedule all tasks!");
         return Boolean.FALSE;
       }
 
@@ -248,21 +255,49 @@
       while (taskIter.hasNext()) {
         Task task = taskIter.next();
         GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
-        List<LaunchTaskAction> taskActions = actionMap.get(groomStatus);
+        List<GroomServerAction> taskActions = actionMap.get(groomStatus);
         if (taskActions == null) {
-          taskActions = new ArrayList<LaunchTaskAction>(
+          taskActions = new ArrayList<GroomServerAction>(
               groomStatus.getMaxTasks());
         }
         taskActions.add(new LaunchTaskAction(task));
         actionMap.put(groomStatus, taskActions);
       }
 
+      sendDirectivesToGrooms(actionMap);
+
+      return Boolean.TRUE;
+    }
+
+    /**
+     * Schedule recovery tasks.
+     * 
+     * @return TRUE object if scheduling is successful else returns FALSE
+     */
+    private Boolean scheduleRecoveryTasks() {
+
+      // Action to be sent for each task to the respective groom server.
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
+          2 * this.groomStatuses.size());
+
+      try {
+        jip.recoverTasks(groomStatuses, actionMap);
+      } catch (IOException e) {
+        return Boolean.FALSE;
+      }
+      return sendDirectivesToGrooms(actionMap);
+
+    }
+
+    private Boolean sendDirectivesToGrooms(
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
       Iterator<GroomServerStatus> groomIter = actionMap.keySet().iterator();
-      while (jip.getStatus().getRunState() == JobStatus.RUNNING
+      while ((jip.getStatus().getRunState() == JobStatus.RUNNING || jip
+          .getStatus().getRunState() == JobStatus.RECOVERING)
           && groomIter.hasNext()) {
 
         GroomServerStatus groomStatus = groomIter.next();
-        List<LaunchTaskAction> actionList = actionMap.get(groomStatus);
+        List<GroomServerAction> actionList = actionMap.get(groomStatus);
 
         GroomProtocol worker = groomServerManager.get().findGroomServer(
             groomStatus);
@@ -276,18 +311,29 @@
           LOG.error(
               "Fail to dispatch tasks to GroomServer "
                   + groomStatus.getGroomName(), ioe);
+          return Boolean.FALSE;
         }
 
       }
 
       if (groomIter.hasNext()
-          && jip.getStatus().getRunState() != JobStatus.RUNNING) {
+          && (jip.getStatus().getRunState() != JobStatus.RUNNING && jip
+              .getStatus().getRunState() != JobStatus.RECOVERING)) {
         LOG.warn("Currently master only schedules job in running state. "
             + "This may be refined in the future. JobId:" + jip.getJobID());
+        return Boolean.FALSE;
       }
 
       return Boolean.TRUE;
     }
+
+    public Boolean call() {
+      if (jip.isRecoveryPending()) {
+        return scheduleRecoveryTasks();
+      } else {
+        return scheduleNewTasks();
+      }
+    }
   }
 
   /**
@@ -365,8 +411,10 @@
     this.jobProcessor.start();
     if (null != getConf()
         && getConf().getBoolean("bsp.federator.enabled", false)) {
-      this.scheduler.scheduleAtFixedRate(new JvmCollector(federator.get(),
-          ((BSPMaster) groomServerManager.get()).zk), 5, 5, SECONDS);
+      this.scheduler.scheduleAtFixedRate(
+          new JvmCollector(federator.get(),
+              ((ZKSyncBSPMasterClient) ((BSPMaster) groomServerManager.get())
+                  .getSyncClient()).getZK()), 5, 5, SECONDS);
     }
 
     if (null != monitorManager.get()) {
diff --git a/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java b/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
index d9e5211..87293db 100644
--- a/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
+++ b/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
@@ -34,7 +34,7 @@
  * TaskInProgress maintains all the info needed for a Task in the lifetime of
  * its owning Job.
  */
-class TaskInProgress {
+public class TaskInProgress {
   public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
 
   private Configuration conf;
@@ -48,7 +48,6 @@
   // Job Meta
   private String jobFile = null;
   private int partition;
-  private BSPMaster bspMaster;
   private TaskID id;
   private JobInProgress job;
   private int completes = 0;
@@ -69,6 +68,8 @@
 
   // The first taskid of this tip
   private TaskAttemptID firstTaskId;
+  
+  private TaskAttemptID currentTaskId;
 
   // Map from task Id -> GroomServer Id, contains tasks that are
   // currently runnings
@@ -84,6 +85,8 @@
 
   private RawSplit rawSplit;
 
+  private int mySuperstep = -1;
+
   /**
    * Constructor for new nexus between BSPMaster and GroomServer.
    * 
@@ -99,12 +102,20 @@
     init(jobId);
   }
 
+  /**
+   * Constructor for a task-in-progress of a job.
+   * 
+   * @param jobId The identifier of the job.
+   * @param jobFile The path of the job file.
+   * @param rawSplit The input split assigned to this task.
+   * @param conf The job configuration object.
+   * @param job The job this task belongs to.
+   * @param partition The partition number of this task.
+   */
   public TaskInProgress(BSPJobID jobId, String jobFile, RawSplit rawSplit,
-      BSPMaster master, Configuration conf, JobInProgress job, int partition) {
+      Configuration conf, JobInProgress job, int partition) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.rawSplit = rawSplit;
-    this.setBspMaster(master);
     this.job = job;
     this.setConf(conf);
     this.partition = partition;
@@ -112,18 +123,178 @@
     init(jobId);
   }
 
+  /**
+   * Initializes the task id and the start time.
+   * 
+   * @param jobId The identifier of the job.
+   */
   private void init(BSPJobID jobId) {
     this.id = new TaskID(jobId, partition);
     this.startTime = System.currentTimeMillis();
   }
 
   /**
-   * Return a Task that can be sent to a GroomServer for execution.
+   * Finds, among the given candidate locations, a groom that still has room to
+   * run the task.
+   * 
+   * @param taskid The task attempt to schedule.
+   * @param grooms The map of groom host name to groom status.
+   * @param tasksInGroomMap The map of groom to the number of tasks it runs.
+   * @param possibleLocations The candidate host names for data locality.
+   * @return The host name of the selected groom, or null if none has capacity.
    */
-  public Task getTaskToRun(Map<String, GroomServerStatus> grooms,
-      Map<GroomServerStatus, Integer> tasksInGroomMap) throws IOException {
-    Task t = null;
+  private String getGroomToSchedule(TaskAttemptID taskid,
+      Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap,
+      String[] possibleLocations) {
 
+    for (int i = 0; i < possibleLocations.length; ++i) {
+      String location = possibleLocations[i];
+      GroomServerStatus groom = grooms.get(location);
+      if (groom == null)
+        continue;
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+      if (taskInGroom < groom.getMaxTasks()
+          && location.equals(groom.getGroomHostName())) {
+        return groom.getGroomHostName();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Finds any groom that still has room to run a task, ignoring data locality.
+   * 
+   * @param grooms The map of groom host name to groom status.
+   * @param tasksInGroomMap The map of groom to the number of tasks it runs.
+   * @return The host name of the selected groom, or null if none has capacity.
+   */
+  private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap) {
+
+    Iterator<String> groomIter = grooms.keySet().iterator();
+    while (groomIter.hasNext()) {
+      GroomServerStatus groom = grooms.get(groomIter.next());
+      if (groom == null)
+        continue;
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+      if (taskInGroom < groom.getMaxTasks()) {
+        return groom.getGroomHostName();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Constructs a new task attempt on the given groom and records it as the
+   * current attempt of this task-in-progress.
+   * 
+   * @param groomStatus The status of the groom the task is assigned to.
+   * @return The constructed task, or null if no further attempt can be made.
+   */
+  public Task constructTask(GroomServerStatus groomStatus) {
+    if(groomStatus == null){
+      return null;
+    }
+    TaskAttemptID taskId = computeTaskId();
+    if (taskId == null) {
+      return null;
+    } else {
+      String splitClass = null;
+      BytesWritable split = null;
+      currentTaskId = taskId;
+      String groomName = groomStatus.getGroomHostName();
+      Task t = new BSPTask(jobId, jobFile, taskId, partition, splitClass, split);
+      activeTasks.put(taskId, groomName);
+      myGroomStatus = groomStatus;
+      return t;
+    }
+
+  }
+
+  // /* Remove */
+  // private Task getGroomForTask(TaskAttemptID taskid,
+  // Map<String, GroomServerStatus> grooms,
+  // Map<GroomServerStatus, Integer> tasksInGroomMap) {
+  // String splitClass = null;
+  // BytesWritable split = null;
+  // Task t = null;
+  // if (rawSplit != null) {
+  // splitClass = rawSplit.getClassName();
+  // split = rawSplit.getBytes();
+  // String[] possibleLocations = rawSplit.getLocations();
+  // String groomName = getGroomToSchedule(taskid, grooms, tasksInGroomMap,
+  // possibleLocations);
+  // if (groomName != null) {
+  // t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+  // activeTasks.put(taskid, groomName);
+  // myGroomStatus = grooms.get(groomName);
+  // }
+  // }
+  //
+  // if (t == null) {
+  // String groomName = getAnyGroomToSchedule(grooms, tasksInGroomMap);
+  // if (groomName != null) {
+  // t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+  // activeTasks.put(taskid, groomName);
+  // myGroomStatus = grooms.get(groomName);
+  // }
+  // }
+  //
+  // return t;
+  // }
+
+  private Task getGroomForRecoverTaskInHosts(TaskAttemptID taskid,
+      Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap,
+      String[] possibleLocations) {
+    String splitClass = null;
+    BytesWritable split = null;
+    Task t = null;
+    String groomName = getGroomToSchedule(taskid, grooms, tasksInGroomMap,
+        possibleLocations);
+    if (groomName != null) {
+      t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+      activeTasks.put(taskid, groomName);
+      myGroomStatus = grooms.get(groomName);
+    }
+
+    if (t == null) {
+      groomName = getAnyGroomToSchedule(grooms, tasksInGroomMap);
+      if (groomName != null) {
+        t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+        activeTasks.put(taskid, groomName);
+        myGroomStatus = grooms.get(groomName);
+      }
+    }
+
+    return t;
+  }
+
+  public Task getRecoveryTask(Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap, String[] hostNames)
+      throws IOException {
+    Integer count = tasksInGroomMap.get(myGroomStatus);
+    if (count != null) {
+      tasksInGroomMap.put(myGroomStatus, count - 1);
+    }
+
+    TaskAttemptID taskId = computeTaskId();
+    LOG.debug("Recovering task = " + String.valueOf(taskId));
+    if (taskId == null) {
+      return null;
+    } else {
+      return getGroomForRecoverTaskInHosts(taskId, grooms, tasksInGroomMap,
+          hostNames);
+    }
+  }
+
+  /**
+   * Checks whether this task-in-progress can start another task attempt.
+   * 
+   * @return true if the number of attempts has not yet reached the maximum.
+   */
+  public boolean canStartTask() {
+    return (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts));
+  }
+
+  private TaskAttemptID computeTaskId() {
     TaskAttemptID taskid = null;
     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
       int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART
@@ -135,54 +306,20 @@
           + " attempts for the tip '" + getTIPId() + "'");
       return null;
     }
-
-    String splitClass = null;
-    BytesWritable split = null;
-    GroomServerStatus selectedGroom = null;
-    if (rawSplit != null) {
-      splitClass = rawSplit.getClassName();
-      split = rawSplit.getBytes();
-      String[] possibleLocations = rawSplit.getLocations();
-      for (int i = 0; i < possibleLocations.length; ++i) {
-        String location = possibleLocations[i];
-        GroomServerStatus groom = grooms.get(location);
-        if (groom == null) {
-          LOG.error("Could not find groom for location: " + location
-              + " ; active grooms: " + grooms.keySet());
-          continue;
-        }
-        Integer taskInGroom = tasksInGroomMap.get(groom);
-        taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-        if (taskInGroom < groom.getMaxTasks()
-            && location.equals(groom.getGroomHostName())) {
-          selectedGroom = groom;
-          t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
-          activeTasks.put(taskid, groom.getGroomName());
-
-          break;
-        }
-      }
-    }
-    // Failed in attempt to get data locality or there was no input split.
-    if (selectedGroom == null) {
-      Iterator<String> groomIter = grooms.keySet().iterator();
-      while (groomIter.hasNext()) {
-        GroomServerStatus groom = grooms.get(groomIter.next());
-        Integer taskInGroom = tasksInGroomMap.get(groom);
-        taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
-        if (taskInGroom < groom.getMaxTasks()) {
-          selectedGroom = groom;
-          t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
-          activeTasks.put(taskid, groom.getGroomName());
-        }
-      }
-    }
-
-    myGroomStatus = selectedGroom;
-
-    return t;
+    return taskid;
   }
 
+  // /** Remove */
+  // public Task getTaskToRun(Map<String, GroomServerStatus> grooms,
+  // Map<GroomServerStatus, Integer> tasksInGroomMap) throws IOException {
+  // TaskAttemptID taskId = computeTaskId();
+  // if (taskId == null) {
+  // return null;
+  // } else {
+  // return getGroomForTask(taskId, grooms, tasksInGroomMap);
+  // }
+  // }
+
   // //////////////////////////////////
   // Accessors
   // //////////////////////////////////
@@ -344,20 +481,6 @@
   }
 
   /**
-   * @param bspMaster the bspMaster to set
-   */
-  public void setBspMaster(BSPMaster bspMaster) {
-    this.bspMaster = bspMaster;
-  }
-
-  /**
-   * @return the bspMaster
-   */
-  public BSPMaster getBspMaster() {
-    return bspMaster;
-  }
-
-  /**
    * Set the event number that was raised for this tip
    */
   public void setSuccessEventNumber(int eventNumber) {
@@ -382,4 +505,22 @@
     return taskStatuses.get(taskid).getGroomServer();
   }
 
+  public int getSuperstep() {
+    return mySuperstep;
+  }
+
+  public void setSuperstep(int mySuperstep) {
+    this.mySuperstep = mySuperstep;
+  }
+
+  // TODO: In future this should be extended to the list of resources that the
+  // task requires.
+  public RawSplit getFileSplit() {
+    return this.rawSplit;
+  }
+  
+  public TaskAttemptID getCurrentTaskAttemptId(){
+    return this.currentTaskId;
+  }
+
 }
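
The refactor above moves groom selection out of TaskInProgress: the caller now picks the groom (for instance through a TaskAllocationStrategy) and the TIP only mints the attempt. A hypothetical caller-side sketch using just the public methods added above:

import org.apache.hama.bsp.GroomServerStatus;
import org.apache.hama.bsp.Task;
import org.apache.hama.bsp.TaskInProgress;

public class TipSchedulingSketch {
  /** Returns the constructed task, or null if the TIP has no attempts left. */
  public static Task tryLaunch(TaskInProgress tip, GroomServerStatus chosenGroom) {
    if (!tip.canStartTask()) {
      return null; // MAX_TASK_EXECS + maxTaskAttempts exhausted
    }
    Task task = tip.constructTask(chosenGroom); // records the attempt id and the groom
    if (task != null) {
      System.out.println("Launching " + tip.getCurrentTaskAttemptId()
          + " on " + chosenGroom.getGroomHostName());
    }
    return task;
  }
}
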
diff --git a/core/src/main/java/org/apache/hama/bsp/TaskRunner.java b/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
index 5a50c58..a059ffc 100644
--- a/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
+++ b/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
@@ -122,8 +122,9 @@
 
         int exit_code = bspProcess.waitFor();
         if (!bspKilled && exit_code != 0) {
+          
           throw new IOException("BSP task process exit with nonzero status of "
-              + exit_code + ".");
+              + exit_code + ". command = " + commands);
         }
       } catch (InterruptedException e) {
         LOG.warn("Thread is interrupted when execeuting BSP process.", e);
@@ -223,6 +224,17 @@
       vargs.add(Integer.toString(addr.getPort()));
       vargs.add(task.getTaskID().toString());
       vargs.add(groomServer.groomHostName);
+      vargs.add(Long.toString(groomServer.getStartSuperstep(task.getTaskID())));
+      TaskStatus status = groomServer.getTaskStatus(task.getTaskID());
+      
+      if(status != null && 
+          TaskStatus.State.RECOVERING.equals(status.getRunState())){
+        vargs.add(TaskStatus.State.RECOVERING.name());
+      }
+      else{
+        vargs.add(TaskStatus.State.RUNNING.name());
+      }
+      
     }
     return vargs;
   }
@@ -285,6 +297,7 @@
     if (bspProcess != null) {
       bspProcess.destroy();
     }
+    
   }
 
   /**
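
TaskRunner now appends two extra arguments to the child BSP process: the superstep to resume from and the task state (RUNNING or RECOVERING). Assuming nothing else is appended after them, the child side could read them back as sketched below; the real parsing lives in the peer process, which this patch does not show.

import org.apache.hama.bsp.TaskStatus;

final class ChildArgsSketch {
  /** Reads the two trailing arguments appended by TaskRunner above. */
  static void parseTail(String[] args) {
    long startSuperstep = Long.parseLong(args[args.length - 2]);  // appended first
    TaskStatus.State startState =
        TaskStatus.State.valueOf(args[args.length - 1]);          // RUNNING or RECOVERING
    System.out.println("resume at superstep " + startSuperstep
        + " in state " + startState);
  }
}
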
diff --git a/core/src/main/java/org/apache/hama/bsp/TaskStatus.java b/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
index 8665b24..131e174 100644
--- a/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
+++ b/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
@@ -37,13 +37,14 @@
 
   // enumeration for reporting current phase of a task.
   public static enum Phase {
-    STARTING, COMPUTE, BARRIER_SYNC, CLEANUP
+    STARTING, COMPUTE, BARRIER_SYNC, CLEANUP, RECOVERING
   }
 
   // what state is the task in?
   public static enum State {
     RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING,
-    FAILED_UNCLEAN, KILLED_UNCLEAN
+    FAILED_UNCLEAN, KILLED_UNCLEAN, FAULT_NOTIFIED, RECOVERY_SCHEDULING,
+    RECOVERY_SCHEDULED, RECOVERING
   }
 
   private BSPJobID jobId;
diff --git a/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java b/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
new file mode 100644
index 0000000..f2b63bf
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/UpdatePeerAction.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} 
+ * to the {@link org.apache.hama.bsp.GroomServer} to update a task with the
+ * new location of one of its peer tasks.
+ */
+class UpdatePeerAction extends GroomServerAction {
+  TaskAttemptID taskId;
+  TaskAttemptID peerTaskId;
+  Text groomName;
+  
+  public UpdatePeerAction() {
+    super(ActionType.UPDATE_PEER);
+    taskId = new TaskAttemptID();
+    groomName = new Text("");
+  }
+  
+  public UpdatePeerAction(TaskAttemptID taskId, TaskAttemptID peerTaskId, 
+      String groom) {
+    super(ActionType.UPDATE_PEER);
+    this.taskId = taskId;
+    this.peerTaskId = peerTaskId;
+    this.groomName = new Text(groom);
+  }
+
+  public TaskAttemptID getTaskID() {
+    return taskId;
+  }
+  
+  public TaskAttemptID getPeerTaskID(){
+    return peerTaskId;
+  }
+  
+  public String getGroomName(){
+    return groomName.toString();
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskId.write(out);
+    peerTaskId.write(out);
+    groomName.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskId.readFields(in);
+    peerTaskId.readFields(in);
+    groomName.readFields(in);
+  }
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java b/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
new file mode 100644
index 0000000..dd79d72
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.GroomServerAction;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.JobInProgress;
+import org.apache.hama.bsp.RecoverTaskAction;
+import org.apache.hama.bsp.Task;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskID;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.TaskStatus;
+import org.apache.hama.bsp.message.MessageEventListener;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * <code>AsyncRcvdMsgCheckpointImpl</code> defines a fault tolerance strategy
+ * based on checkpointing the messages sent across peers. On
+ * failure, all the tasks are restarted from the last superstep for which all
+ * the peers successfully checkpointed the messages.
+ * 
+ */
+public class AsyncRcvdMsgCheckpointImpl<M extends Writable> implements
+    BSPFaultTolerantService<M> {
+
+  private static final Log LOG = LogFactory
+      .getLog(AsyncRcvdMsgCheckpointImpl.class);
+
+  /**
+   * It is responsible for finding the smallest superstep for which
+   * checkpointing is complete and then restarting all the peers from that
+   * superstep.
+   */
+  private static class CheckpointMasterService implements
+      FaultTolerantMasterService {
+
+    private Configuration conf;
+    private TaskInProgress tasks[];
+    private BSPJobID jobId;
+    private int maxTaskAttempts;
+    private int currentAttemptId;
+    private MasterSyncClient masterSyncClient;
+    private TaskAllocationStrategy allocationStrategy;
+
+    /**
+     * Initializes the fault tolerance service at BSPMasters
+     * 
+     * @param jobId The identifier of the job.
+     * @param maxTaskAttempts Number of attempts allowed for recovering from
+     *          failure.
+     * @param tasks The list of tasks in the job.
+     * @param conf The job configuration object.
+     * @param masterClient The synchronization client used by BSPMaster.
+     * @param allocationStrategy The task allocation strategy of the job.
+     */
+    public void initialize(BSPJobID jobId, int maxTaskAttempts,
+        TaskInProgress[] tasks, Configuration conf,
+        MasterSyncClient masterClient, TaskAllocationStrategy allocationStrategy) {
+      this.tasks = tasks;
+      this.jobId = jobId;
+      this.conf = conf;
+      this.maxTaskAttempts = maxTaskAttempts;
+      this.currentAttemptId = 0;
+      this.masterSyncClient = masterClient;
+      this.allocationStrategy = allocationStrategy;
+    }
+
+    @Override
+    public boolean isRecoveryPossible(TaskInProgress tip) {
+      return currentAttemptId < maxTaskAttempts;
+    }
+
+    @Override
+    public boolean isAlreadyRecovered(TaskInProgress tip) {
+      return currentAttemptId < tip.getCurrentTaskAttemptId().getId();
+    }
+
+    @Override
+    public void recoverTasks(JobInProgress jip,
+        Map<String, GroomServerStatus> groomStatuses,
+        TaskInProgress[] failedTasksInProgress,
+        TaskInProgress[] allTasksInProgress,
+        Map<GroomServerStatus, Integer> taskCountInGroomMap,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+        throws IOException {
+
+      Map<TaskID, TaskInProgress> recoverySet = new HashMap<TaskID, TaskInProgress>(
+          2 * failedTasksInProgress.length);
+      for (int i = 0; i < failedTasksInProgress.length; ++i) {
+        recoverySet.put(failedTasksInProgress[i].getTaskId(),
+            failedTasksInProgress[i]);
+      }
+
+      long lowestSuperstepNumber = Long.MAX_VALUE;
+
+      String[] taskProgress = this.masterSyncClient.getChildKeySet(
+          this.masterSyncClient.constructKey(jobId, "checkpoint"), null);
+
+      if (LOG.isDebugEnabled()) {
+        StringBuffer list = new StringBuffer(25 * taskProgress.length);
+        list.append("got child key set").append(taskProgress.length)
+            .append("/").append(tasks.length).append(" ");
+        for (String entry : taskProgress) {
+          list.append(entry).append(",");
+        }
+        LOG.debug(list);
+      }
+
+      if (taskProgress.length == this.tasks.length) {
+        for (int i = 0; i < taskProgress.length; ++i) {
+          ArrayWritable progressInformation = new ArrayWritable(
+              LongWritable.class);
+          boolean result = this.masterSyncClient.getInformation(
+              this.masterSyncClient.constructKey(jobId, "checkpoint",
+                  taskProgress[i]), progressInformation);
+
+          if (!result) {
+            lowestSuperstepNumber = -1L;
+            break;
+          }
+
+          Writable[] progressArr = progressInformation.get();
+          LongWritable superstepProgress = (LongWritable) progressArr[0];
+
+          if (superstepProgress != null) {
+            if (superstepProgress.get() < lowestSuperstepNumber) {
+              lowestSuperstepNumber = superstepProgress.get();
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Got superstep number " + lowestSuperstepNumber
+                    + " from " + taskProgress[i]);
+              }
+            }
+          }
+        }
+        clearClientForSuperstep(lowestSuperstepNumber);
+        restartJob(lowestSuperstepNumber, groomStatuses, recoverySet,
+            allTasksInProgress, taskCountInGroomMap, actionMap);
+
+      } else {
+        restartJob(-1, groomStatuses, recoverySet, allTasksInProgress,
+            taskCountInGroomMap, actionMap);
+      }
+
+      ++currentAttemptId;
+    }
+
+    private void clearClientForSuperstep(long superstep) {
+      this.masterSyncClient.remove(
+          masterSyncClient.constructKey(jobId, "sync"), null);
+    }
+
+    private void populateAction(Task task, long superstep,
+        GroomServerStatus groomStatus,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
+      List<GroomServerAction> list = actionMap.get(groomStatus);
+      if (!actionMap.containsKey(groomStatus)) {
+        list = new ArrayList<GroomServerAction>();
+        actionMap.put(groomStatus, list);
+      }
+      list.add(new RecoverTaskAction(task, superstep));
+
+    }
+
+    private void restartTask(TaskInProgress tip, long superstep,
+        Map<String, GroomServerStatus> groomStatuses,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
+      GroomServerStatus serverStatus = tip.getGroomServerStatus();
+      Task task = tip.constructTask(serverStatus);
+      populateAction(task, superstep, serverStatus, actionMap);
+
+    }
+
+    private void restartJob(long superstep,
+        Map<String, GroomServerStatus> groomStatuses,
+        Map<TaskID, TaskInProgress> recoveryMap, TaskInProgress[] allTasks,
+        Map<GroomServerStatus, Integer> taskCountInGroomMap,
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+        throws IOException {
+      String path = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
+
+      if (superstep >= 0) {
+        FileSystem fileSystem = FileSystem.get(conf);
+        for (int i = 0; i < allTasks.length; ++i) {
+          String[] hosts = null;
+          if (recoveryMap.containsKey(allTasks[i].getTaskId())) {
+
+            // Update task count in map.
+            // TODO: This should be a responsibility of GroomServerStatus
+            Integer count = taskCountInGroomMap.get(allTasks[i]
+                .getGroomServerStatus());
+            if (count != null) {
+              count = count.intValue() - 1;
+              taskCountInGroomMap
+                  .put(allTasks[i].getGroomServerStatus(), count);
+            }
+
+            StringBuffer ckptPath = new StringBuffer(path);
+            ckptPath.append(this.jobId.toString());
+            ckptPath.append("/").append(superstep).append("/")
+                .append(allTasks[i].getTaskId().getId());
+            Path checkpointPath = new Path(ckptPath.toString());
+            if (fileSystem.exists(checkpointPath)) {
+              FileStatus fileStatus = fileSystem.getFileStatus(checkpointPath);
+              BlockLocation[] blocks = fileSystem.getFileBlockLocations(
+                  fileStatus, 0, fileStatus.getLen());
+              hosts = blocks[0].getHosts();
+            } else {
+              hosts = new String[groomStatuses.keySet().size()];
+              groomStatuses.keySet().toArray(hosts);
+            }
+            GroomServerStatus serverStatus = this.allocationStrategy
+                .getGroomToAllocate(groomStatuses, hosts, taskCountInGroomMap,
+                    new BSPResource[0], allTasks[i]);
+            Task task = allTasks[i].constructTask(serverStatus);
+            populateAction(task, superstep, serverStatus, actionMap);
+
+          } else {
+            restartTask(allTasks[i], superstep, groomStatuses, actionMap);
+          }
+        }
+      } else {
+        // Start the task from the beginning.
+        for (int i = 0; i < allTasks.length; ++i) {
+          if (recoveryMap.containsKey(allTasks[i].getTaskId())) {
+            GroomServerStatus serverStatus = this.allocationStrategy
+                .getGroomToAllocate(groomStatuses,
+                    this.allocationStrategy.selectGrooms(groomStatuses,
+                        taskCountInGroomMap, new BSPResource[0], allTasks[i]),
+                    taskCountInGroomMap, new BSPResource[0], allTasks[i]);
+            Task task = allTasks[i].constructTask(serverStatus);
+            populateAction(task, superstep, serverStatus, actionMap);
+          } else {
+            restartTask(allTasks[i], superstep, groomStatuses, actionMap);
+          }
+        }
+      }
+    }
+
+  }// end of CheckpointMasterService
+
+  @Override
+  public FaultTolerantPeerService<M> constructPeerFaultTolerance(BSPJob job,
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspPeer, PeerSyncClient syncClient,
+      InetSocketAddress peerAddress, TaskAttemptID taskAttemptId,
+      long superstep, Configuration conf, MessageManager<M> messenger)
+      throws Exception {
+    CheckpointPeerService<M> service = new CheckpointPeerService<M>();
+    service.initialize(job, bspPeer, syncClient, peerAddress, taskAttemptId,
+        superstep, conf, messenger);
+    return service;
+  }
+
+  @Override
+  public FaultTolerantMasterService constructMasterFaultTolerance(
+      BSPJobID jobId, int maxTaskAttempts, TaskInProgress[] tasks,
+      Configuration conf, MasterSyncClient masterClient,
+      TaskAllocationStrategy allocationStrategy) throws Exception {
+    CheckpointMasterService service = new CheckpointMasterService();
+    service.initialize(jobId, maxTaskAttempts, tasks, conf, masterClient,
+        allocationStrategy);
+    return service;
+  }
+
+  /**
+   * Implements the peer side of checkpoint-based fault tolerance. For
+   * recovery, on peer initialization, it reads all the checkpointed messages to
+   * recover the state of the peer. During normal operation, it checkpoints all
+   * the messages it received in the previous superstep. It also stores the
+   * superstep progress in the global synchronization area.
+   * 
+   */
+  public static class CheckpointPeerService<M extends Writable> implements
+      FaultTolerantPeerService<M>, MessageEventListener<M> {
+
+    private BSPJob job;
+    @SuppressWarnings("rawtypes")
+    private BSPPeer peer;
+    private PeerSyncClient syncClient;
+    private long superstep;
+    private Configuration conf;
+    private MessageManager<M> messenger;
+    private FileSystem fs;
+    private int checkPointInterval;
+    volatile private long lastCheckPointStep;
+    volatile private boolean checkpointState;
+    volatile private FSDataOutputStream checkpointStream;
+    volatile private long checkpointMessageCount;
+
+    public void initialize(BSPJob job, @SuppressWarnings("rawtypes")
+    BSPPeer bspPeer, PeerSyncClient syncClient, InetSocketAddress peerAddress,
+        TaskAttemptID taskAttemptId, long superstep, Configuration conf,
+        MessageManager<M> messenger) throws IOException {
+
+      this.job = job;
+      this.peer = bspPeer;
+      this.syncClient = syncClient;
+      this.superstep = superstep;
+      this.conf = conf;
+      this.messenger = messenger;
+      this.fs = FileSystem.get(conf);
+      this.checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL,
+          Constants.DEFAULT_CHECKPOINT_INTERVAL);
+
+      this.checkpointState = conf.getBoolean(Constants.CHECKPOINT_ENABLED,
+          false);
+
+      if (superstep > 0) {
+        this.lastCheckPointStep = this.superstep;
+      } else {
+        this.lastCheckPointStep = 1;
+      }
+      this.checkpointMessageCount = 0L;
+    }
+
+    private String checkpointPath(long step) {
+      String backup = conf.get("bsp.checkpoint.prefix_path", "checkpoint/");
+      String ckptPath = backup + job.getJobID().toString() + "/" + (step) + "/"
+          + peer.getPeerIndex();
+      if (LOG.isDebugEnabled())
+        LOG.debug("Received Messages are to be saved to " + ckptPath);
+      return ckptPath;
+    }
+
+    @Override
+    public TaskStatus.State onPeerInitialized(TaskStatus.State state)
+        throws Exception {
+      if (this.superstep >= 0 && state.equals(TaskStatus.State.RECOVERING)) {
+        ArrayWritable progressArr = new ArrayWritable(LongWritable.class);
+        boolean result = this.syncClient.getInformation(
+            this.syncClient.constructKey(job.getJobID(), "checkpoint",
+                String.valueOf(peer.getPeerIndex())), progressArr);
+
+        if (!result) {
+          throw new IOException("No data found to restore peer state.");
+        }
+
+        Writable[] progressInfo = progressArr.get();
+        long superstepProgress = ((LongWritable) progressInfo[0]).get();
+        long numMessages = ((LongWritable) progressInfo[1]).get();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got sstep =" + superstepProgress + " numMessages = "
+              + numMessages + " this.superstep = " + this.superstep);
+        }
+
+        if (numMessages > 0) {
+          Path path = new Path(checkpointPath(superstepProgress));
+          FSDataInputStream in = this.fs.open(path);
+          BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+          try {
+            for (int i = 0; i < numMessages; ++i) {
+              String className = in.readUTF();
+              @SuppressWarnings("unchecked")
+              M message = (M) ReflectionUtils.newInstance(
+                  Class.forName(className), conf);
+              message.readFields(in);
+              bundle.addMessage(message);
+            }
+            messenger.loopBackMessages(bundle);
+          } catch (EOFException e) {
+            LOG.error("Error recovering from checkpointing", e);
+            throw new IOException(e);
+          } finally {
+            in.close();
+          }
+        }
+      }
+      this.messenger.registerListener(this);
+      return TaskStatus.State.RUNNING;
+
+    }
+
+    public final boolean isReadyToCheckpoint() {
+
+      checkPointInterval = conf.getInt(Constants.CHECKPOINT_INTERVAL, 1);
+      LOG.info(new StringBuffer(1000).append("Enabled = ")
+          .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
+          .append(" checkPointInterval = ").append(checkPointInterval)
+          .append(" lastCheckPointStep = ").append(lastCheckPointStep)
+          .append(" getSuperstepCount() = ").append(peer.getSuperstepCount())
+          .toString());
+      if (LOG.isDebugEnabled())
+        LOG.debug(new StringBuffer(1000).append("Enabled = ")
+            .append(conf.getBoolean(Constants.CHECKPOINT_ENABLED, false))
+            .append(" checkPointInterval = ").append(checkPointInterval)
+            .append(" lastCheckPointStep = ").append(lastCheckPointStep)
+            .append(" getSuperstepCount() = ").append(peer.getSuperstepCount())
+            .toString());
+
+      return (conf.getBoolean(Constants.CHECKPOINT_ENABLED, false)
+          && (checkPointInterval != 0) && (((int) ((peer.getSuperstepCount() + 1) - lastCheckPointStep)) >= checkPointInterval));
+
+    }
+
+    @Override
+    public void beforeBarrier() throws Exception {
+    }
+
+    @Override
+    public void duringBarrier() throws Exception {
+    }
+
+    @Override
+    public void afterBarrier() throws Exception {
+
+      synchronized (this) {
+        if (checkpointState) {
+
+          if (checkpointStream != null) {
+            this.checkpointStream.close();
+            this.checkpointStream = null;
+          }
+
+          lastCheckPointStep = peer.getSuperstepCount();
+
+          ArrayWritable writableArray = new ArrayWritable(LongWritable.class);
+          Writable[] writeArr = new Writable[2];
+          writeArr[0] = new LongWritable(lastCheckPointStep);
+          writeArr[1] = new LongWritable(checkpointMessageCount);
+          writableArray.set(writeArr);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Storing lastCheckPointStep = " + lastCheckPointStep
+                + " checkpointMessageCount = " + checkpointMessageCount
+                + " for peer = " + String.valueOf(peer.getPeerIndex()));
+          }
+
+          this.syncClient.storeInformation(this.syncClient.constructKey(
+              this.job.getJobID(), "checkpoint",
+              String.valueOf(peer.getPeerIndex())), writableArray, true, null);
+        }
+        checkpointState = isReadyToCheckpoint();
+        checkpointMessageCount = 0;
+      }
+
+      LOG.info("checkpoingNext = " + checkpointState
+          + " checkpointMessageCount = " + checkpointMessageCount);
+    }
+
+    @Override
+    public void onInitialized() {
+
+    }
+
+    @Override
+    public void onMessageSent(String peerName, M message) {
+    }
+
+    @Override
+    public void onMessageReceived(M message) {
+      String checkpointedPath = null;
+
+      if (message == null) {
+        LOG.error("Message M is found to be null");
+      }
+
+      synchronized (this) {
+        if (checkpointState) {
+          if (this.checkpointStream == null) {
+            checkpointedPath = checkpointPath(peer.getSuperstepCount() + 1);
+            try {
+              LOG.info("Creating path " + checkpointedPath);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Creating path " + checkpointedPath);
+              }
+              checkpointStream = this.fs.create(new Path(checkpointedPath));
+            } catch (IOException ioe) {
+              LOG.error("Fail checkpointing messages to " + checkpointedPath,
+                  ioe);
+              throw new RuntimeException("Failed opening HDFS file "
+                  + checkpointedPath, ioe);
+            }
+          }
+          try {
+            ++checkpointMessageCount;
+            checkpointStream.writeUTF(message.getClass().getCanonicalName());
+            message.write(checkpointStream);
+          } catch (IOException ioe) {
+            LOG.error("Fail checkpointing messages to " + checkpointedPath, ioe);
+            throw new RuntimeException("Failed writing to HDFS file "
+                + checkpointedPath, ioe);
+          }
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("message count = " + checkpointMessageCount);
+          }
+        }
+      }
+
+    }
+
+    @Override
+    public void onClose() {
+
+    }
+
+  }
+
+}
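
A minimal configuration sketch for enabling this service on a job, assuming the job registers it through BSPFaultTolerantService.FT_SERVICE_CONF (defined in the interface below) and that the checkpoint keys read above control the checkpoint cadence and location:

import org.apache.hadoop.conf.Configuration;
import org.apache.hama.Constants;
import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
import org.apache.hama.bsp.ft.BSPFaultTolerantService;

public class EnableCheckpointFtSketch {
  public static Configuration configure() {
    Configuration conf = new Configuration();
    // Register the per-job fault tolerance service class.
    conf.setClass(BSPFaultTolerantService.FT_SERVICE_CONF,
        AsyncRcvdMsgCheckpointImpl.class, BSPFaultTolerantService.class);
    conf.setBoolean(Constants.CHECKPOINT_ENABLED, true); // checkpoint received messages
    conf.setInt(Constants.CHECKPOINT_INTERVAL, 1);       // every superstep
    conf.set("bsp.checkpoint.prefix_path", "/checkpoint/");
    // Checkpointed messages for a peer land under
    //   /checkpoint/<jobId>/<superstep>/<peerIndex>   (see checkpointPath above).
    return conf;
  }
}
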
diff --git a/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java b/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
new file mode 100644
index 0000000..967dcd3
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskInProgress;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+/**
+ * <code>BSPFaultTolerantService</code> defines the fault tolerance service
+ * behavior. The fault tolerance service is a feature of a running job and not
+ * the system. A class implementing this behavior is responsible for creating
+ * two objects. The first, <code>FaultTolerantMasterService</code>, is used by
+ * the job at the BSPMaster to handle fault-tolerance related steps at the
+ * master. The second, <code>FaultTolerantPeerService</code>, defines the
+ * fault-tolerance and recovery steps carried out inside <code>BSPPeer</code>
+ * (in each of the BSP peers doing the computation).
+ */
+public interface BSPFaultTolerantService<M extends Writable> {
+  
+  /**
+   * The configuration key by which a job can register its fault-tolerance service.
+   */
+  public static final String FT_SERVICE_CONF = "hama.ft.conf.class";
+
+  /**
+   * Creates the instance of <code>FaultTolerantMasterService</code> that would
+   * handle fault-tolerance related steps at BSPMaster task scheduler.
+   * 
+   * @param jobId The identifier of the job.
+   * @param maxTaskAttempts Number of attempts allowed for recovering from
+   *          failure.
+   * @param tasks The list of tasks in the job.
+   * @param conf The job configuration object.
+   * @param masterClient The synchronization client used by BSPMaster.
+   * @param allocationStrategy The task allocation strategy of the job.
+   * @return An instance of class inheriting
+   *         <code>FaultTolerantMasterService</code>
+   */
+  public FaultTolerantMasterService constructMasterFaultTolerance(
+      BSPJobID jobId, int maxTaskAttempts, TaskInProgress[] tasks,
+      Configuration conf, MasterSyncClient masterClient,
+      TaskAllocationStrategy allocationStrategy) throws Exception;
+
+  /**
+   * Creates an instance of <code>FaultTolerantPeerService</code> which defines
+   * the steps that have to be taken inside a peer for fault-tolerance.
+   * 
+   * @param bspPeer The peer
+   * @param syncClient The synchronization client used by peer.
+   * @param superstep The superstep from which the peer is initialized.
+   * @param conf job configuration object
+   * @param messenger The messaging system between the peers
+   * @return An instance of class inheriting
+   *         <code>FaultTolerantPeerService</code>
+   */
+  public FaultTolerantPeerService<M> constructPeerFaultTolerance(BSPJob job,
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspPeer, PeerSyncClient syncClient,
+      InetSocketAddress peerAddress, TaskAttemptID taskAttemptId,
+      long superstep, Configuration conf, MessageManager<M> messenger)
+      throws Exception;
+
+}
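
The patch does not show where the framework reads FT_SERVICE_CONF back, so the following lookup is only a sketch of how the registered class could be resolved with the usual Hadoop reflection utilities:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.ft.BSPFaultTolerantService;

public final class FtServiceLookupSketch {
  @SuppressWarnings({ "unchecked", "rawtypes" })
  public static <M extends Writable> BSPFaultTolerantService<M> load(Configuration conf) {
    Class<? extends BSPFaultTolerantService> clazz = conf.getClass(
        BSPFaultTolerantService.FT_SERVICE_CONF, null, BSPFaultTolerantService.class);
    if (clazz == null) {
      return null; // the job did not register a fault tolerance service
    }
    return (BSPFaultTolerantService<M>) ReflectionUtils.newInstance(clazz, conf);
  }
}
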
diff --git a/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java b/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java
new file mode 100644
index 0000000..43bfd60
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantMasterService.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hama.bsp.GroomServerAction;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.JobInProgress;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>FaultTolerantMasterService</code> defines the behavior of the object
+ * responsible for fault-tolerance related work in the BSPMaster task
+ * scheduler. It is defined per job.
+ */
+public interface FaultTolerantMasterService {
+
+  /**
+   * Returns true if recovery of the task in question is possible.
+   * 
+   * @param tip <code>TaskInProgress</code> object that represents the task.
+   * @return true if recovery is possible.
+   */
+  public boolean isRecoveryPossible(TaskInProgress tip);
+
+  /**
+   * Returns true if the task is already slated for recovery after a failure.
+   * 
+   * @param tip <code>TaskInProgress</code> object that represents the task.
+   * @return true if the task/job is already in the process of recovery.
+   */
+  public boolean isAlreadyRecovered(TaskInProgress tip);
+
+  /**
+   * Given the list of failed tasks, provides the task scheduler with the set
+   * of actions, and the grooms these actions must be sent to, for fault
+   * recovery.
+   * 
+   * @param jip The job in question which has to be recovered.
+   * @param groomStatuses The map of grooms to their statuses.
+   * @param failedTasksInProgress The list of failed tasks.
+   * @param allTasksInProgress The list of all tasks in the job.
+   * @param taskCountInGroomMap The map of groom to the number of tasks it runs.
+   * @param actionMap The map of groom to the list of actions that are to be
+   *          taken on that groom.
+   */
+  public void recoverTasks(JobInProgress jip,
+      Map<String, GroomServerStatus> groomStatuses,
+      TaskInProgress[] failedTasksInProgress,
+      TaskInProgress[] allTasksInProgress,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap)
+      throws IOException;
+}
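
An illustrative master-side flow (an assumption, loosely mirroring the scheduler changes earlier in this patch): skip tasks already being recovered, give up if attempts are exhausted, and let the service fill the action map that is then dispatched to the grooms.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hama.bsp.GroomServerAction;
import org.apache.hama.bsp.GroomServerStatus;
import org.apache.hama.bsp.JobInProgress;
import org.apache.hama.bsp.TaskInProgress;
import org.apache.hama.bsp.ft.FaultTolerantMasterService;

final class MasterRecoverySketch {
  static boolean recover(FaultTolerantMasterService ft, JobInProgress jip,
      Map<String, GroomServerStatus> grooms, TaskInProgress[] failed,
      TaskInProgress[] all, Map<GroomServerStatus, Integer> taskCount,
      Map<GroomServerStatus, List<GroomServerAction>> actions) {
    List<TaskInProgress> toRecover = new ArrayList<TaskInProgress>();
    for (TaskInProgress tip : failed) {
      if (ft.isAlreadyRecovered(tip)) {
        continue;               // a newer attempt is already underway
      }
      if (!ft.isRecoveryPossible(tip)) {
        return false;           // attempts exhausted, the job must be failed
      }
      toRecover.add(tip);
    }
    try {
      ft.recoverTasks(jip, grooms, toRecover.toArray(new TaskInProgress[0]),
          all, taskCount, actions);
    } catch (IOException e) {
      return false;
    }
    return true;
  }
}
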
diff --git a/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java b/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java
new file mode 100644
index 0000000..3a34d73
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/ft/FaultTolerantPeerService.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.ft;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.TaskStatus;
+
+/**
+ * <code>FaultTolerantPeerService</code> defines the steps required to be
+ * performed by peers for fault-tolerance. At different stages of peer
+ * execution, the service can take necessary measures to ensure that the peer
+ * computations can be recovered if any of them fails.
+ */
+public interface FaultTolerantPeerService<M extends Writable> {
+
+  /**
+   * This is called once the peer is initialized.
+   * 
+   * @throws Exception
+   */
+  public TaskStatus.State onPeerInitialized(TaskStatus.State state)
+      throws Exception;
+
+  /**
+   * This function is called before all the peers go into the global sync.
+   * 
+   * @throws Exception
+   */
+  public void beforeBarrier() throws Exception;
+
+  /**
+   * This function is called after the peers enter the barrier but before they
+   * initiate leaving the barrier.
+   * 
+   * @throws Exception
+   */
+  public void duringBarrier() throws Exception;
+
+  /**
+   * This function is called every time the peer completes the global
+   * synchronization.
+   * 
+   * @throws Exception
+   */
+  public void afterBarrier() throws Exception;
+
+}
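
An illustrative placement of the four hooks around a peer's barrier sequence; the real call sites live in the peer implementation, which this patch does not show, so the ordering below is inferred from the method names and Javadoc.

import org.apache.hama.bsp.ft.FaultTolerantPeerService;

final class PeerBarrierHooksSketch {
  static void runBarrier(FaultTolerantPeerService<?> ft) throws Exception {
    ft.beforeBarrier();  // before the peer enters the global barrier
    // syncClient.enterBarrier(...)   actual barrier calls omitted
    ft.duringBarrier();  // inside the barrier, before initiating the leave
    // syncClient.leaveBarrier(...)
    ft.afterBarrier();   // once global synchronization has completed
  }
}
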
diff --git a/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java b/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
index bb96efe..b0894cb 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
@@ -22,7 +22,9 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map.Entry;
+import java.util.Queue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
@@ -61,6 +64,9 @@
   // the task attempt id
   protected TaskAttemptID attemptId;
 
+  // List of listeners for all the sent messages
+  protected Queue<MessageEventListener<M>> messageListenerQueue;
+
   /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp.
@@ -70,12 +76,14 @@
   @Override
   public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
       Configuration conf, InetSocketAddress peerAddress) {
+    this.messageListenerQueue = new LinkedList<MessageEventListener<M>>();
     this.attemptId = attemptId;
     this.peer = peer;
     this.conf = conf;
     this.peerAddress = peerAddress;
     localQueue = getQueue();
     localQueueForNextIteration = getSynchronizedQueue();
+    
   }
 
   /*
@@ -84,18 +92,22 @@
    */
   @Override
   public void close() {
-    Collection<MessageQueue<M>> values = outgoingQueues.values();
-    for (MessageQueue<M> msgQueue : values) {
-      msgQueue.close();
-    }
-    localQueue.close();
-    // remove possible disk queues from the path
     try {
-      FileSystem.get(conf).delete(
-          DiskQueue.getQueueDir(conf, attemptId,
-              conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
-    } catch (IOException e) {
-      LOG.warn("Queue dir couldn't be deleted");
+      Collection<MessageQueue<M>> values = outgoingQueues.values();
+      for (MessageQueue<M> msgQueue : values) {
+        msgQueue.close();
+      }
+      localQueue.close();
+      // remove possible disk queues from the path
+      try {
+        FileSystem.get(conf).delete(
+            DiskQueue.getQueueDir(conf, attemptId,
+                conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
+      } catch (IOException e) {
+        LOG.warn("Queue dir couldn't be deleted");
+      }
+    } finally {
+      notifyClose();
     }
 
   }
@@ -139,6 +151,7 @@
     localQueue = localQueueForNextIteration.getMessageQueue();
     localQueue.prepareRead();
     localQueueForNextIteration = getSynchronizedQueue();
+    notifyInit();
   }
 
   /*
@@ -163,6 +176,7 @@
     queue.add(msg);
     peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
     outgoingQueues.put(targetPeerAddress, queue);
+    notifySentMessage(peerName, msg);
   }
 
   /*
@@ -206,4 +220,68 @@
     this.conf = conf;
   }
 
+  private void notifySentMessage(String peerName, M message) {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onMessageSent(peerName, message);
+    }
+  }
+
+  private void notifyReceivedMessage(M message) throws IOException {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onMessageReceived(message);
+    }
+  }
+
+  private void notifyInit() {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onInitialized();
+    }
+  }
+
+  private void notifyClose() {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onClose();
+    }
+  }
+
+  
+
+  @Override
+  public void registerListener(MessageEventListener<M> listener)
+      throws IOException {
+    if(listener != null)
+      this.messageListenerQueue.add(listener);
+    
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws IOException{
+    for (Writable message : bundle.getMessages()) {
+      loopBackMessage((M)message);
+    }
+    
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void loopBackMessage(Writable message) throws IOException{
+    this.localQueueForNextIteration.add((M)message);
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+    notifyReceivedMessage((M)message);
+    
+  }
+
 }
diff --git a/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java b/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
index 43103d7..08d4ddb 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
@@ -25,7 +25,6 @@
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.Iterator;
 
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
@@ -61,14 +60,8 @@
     server.close();
   }
 
-  public void put(BSPMessageBundle<M> messages) {
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
-        messages.getMessages().size());
-    Iterator<M> iterator = messages.getMessages().iterator();
-    while (iterator.hasNext()) {
-      this.localQueueForNextIteration.add(iterator.next());
-      iterator.remove();
-    }
+  public void put(BSPMessageBundle<M> messages) throws IOException {
+    this.loopBackMessages(messages);
   }
 
   @SuppressWarnings("unchecked")
@@ -139,5 +132,4 @@
       return ByteBuffer.wrap(data);
     }
   }
-
 }
diff --git a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
index 48ff60d..8adc605 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.bsp.message;
 
+import java.io.IOException;
+
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
@@ -35,7 +37,7 @@
    * 
    * @param msg
    */
-  public void put(M msg);
+  public void put(M msg) throws IOException;
 
   /**
    * This method puts a messagebundle for the next iteration. Accessed
@@ -43,7 +45,7 @@
    * 
    * @param messages
    */
-  public void put(BSPMessageBundle<M> messages);
+  public void put(BSPMessageBundle<M> messages) throws IOException;
 
   /**
    * This method puts a compressed message bundle for the next iteration.
@@ -51,6 +53,6 @@
    * 
    * @param compMsgBundle
    */
-  public void put(BSPCompressedBundle compMsgBundle);
+  public void put(BSPCompressedBundle compMsgBundle) throws IOException;
 
 }
diff --git a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
index 20aa2e4..259a51d 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
@@ -29,7 +29,6 @@
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -115,24 +114,19 @@
   }
 
   @Override
-  public final void put(M msg) {
-    this.localQueueForNextIteration.add(msg);
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+  public final void put(M msg) throws IOException {
+    loopBackMessage(msg);
   }
 
   @Override
-  public final void put(BSPMessageBundle<M> messages) {
-    for (M message : messages.getMessages()) {
-      this.localQueueForNextIteration.add(message);
-    }
+  public final void put(BSPMessageBundle<M> messages) throws IOException {
+    loopBackMessages(messages);
   }
 
   @Override
-  public final void put(BSPCompressedBundle compMsgBundle) {
+  public final void put(BSPCompressedBundle compMsgBundle) throws IOException {
     BSPMessageBundle<M> bundle = compressor.decompressBundle(compMsgBundle);
-    for (M message : bundle.getMessages()) {
-      this.localQueueForNextIteration.add(message);
-    }
+    loopBackMessages(bundle);
   }
 
   @Override
@@ -141,4 +135,5 @@
     return versionID;
   }
 
+  
 }
diff --git a/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java b/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
new file mode 100644
index 0000000..204b6ab
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/message/MessageEventListener.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+public interface MessageEventListener<M> {
+
+  /**
+   * Events raised by the message manager during its lifecycle.
+   */
+  public static enum MessageManagerEvent {
+    INITIALIZED, MESSAGE_SENT, MESSAGE_RECEIVED, CLOSE
+  }
+
+  /**
+   * The function to handle the event when the queue is initialized.
+   */
+  void onInitialized();
+
+  /**
+   * The function to handle the event when a message is sent.
+   * <code>message</code> should not be modified.
+   * 
+   * @param peerName Name of the peer the message is sent to.
+   * @param message The message that was sent.
+   */
+  void onMessageSent(String peerName, final M message);
+
+  /**
+   * The function to handle the event when a message is received.
+   * <code>message</code> should not be modified.
+   * 
+   * @param message The message received.
+   */
+  void onMessageReceived(final M message);
+
+  /**
+   * The function to handle the event when the queue is closed.
+   */
+  void onClose();
+
+}
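
As a sketch of how these callbacks might be consumed, the illustrative listener below (not part of this patch) simply counts the messages flowing through the message manager. It would be attached via MessageManager.registerListener and, given the loopback changes above, is also notified of locally delivered messages.

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.hama.bsp.message.MessageEventListener;

    /** Illustrative listener that counts message traffic; not part of this patch. */
    public class CountingMessageListener<M> implements MessageEventListener<M> {

      private final AtomicLong sent = new AtomicLong();
      private final AtomicLong received = new AtomicLong();

      @Override
      public void onInitialized() {
        // called once the message manager has (re)initialized its queues
      }

      @Override
      public void onMessageSent(String peerName, M message) {
        sent.incrementAndGet();
      }

      @Override
      public void onMessageReceived(M message) {
        received.incrementAndGet();
      }

      @Override
      public void onClose() {
        // totals could be logged or handed to a fault-tolerance component here
      }

      public long getSent() {
        return sent.get();
      }

      public long getReceived() {
        return received.get();
      }
    }
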
diff --git a/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java b/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
index 6662a52..c15b36d 100644
--- a/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
+++ b/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
@@ -34,7 +34,7 @@
  * 
  */
 public interface MessageManager<M extends Writable> {
-  
+
   public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
 
   /**
@@ -96,4 +96,25 @@
    */
   public int getNumCurrentMessages();
 
+  /**
+   * Send the messages to self to receive in the next superstep.
+   */
+  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws IOException;
+  
+  /**
+   * Send the message to self to receive in the next superstep.
+   */
+  public void loopBackMessage(Writable message) throws IOException;
+
+  /**
+   * Register a listener for the events in message manager.
+   * 
+   * @param listener <code>MessageEventListener</code> object that is notified
+   *          of message manager events.
+   * @throws IOException
+   */
+  public void registerListener(MessageEventListener<M> listener)
+      throws IOException;
+  
+
 }
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java
new file mode 100644
index 0000000..28b4c3d
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hama.HamaConfiguration;
+
+public abstract class BSPMasterSyncClient implements MasterSyncClient{
+
+  /**
+   * Initialize the Synchronization client.
+   * 
+   * @param conf The configuration parameters to initialize the client.
+   */
+  public abstract void init(HamaConfiguration conf);
+  
+  /**
+   * Clears all information stored.
+   */
+  public abstract void clear();
+  
+  /**
+   * Register a newly added job.
+   * @param string The job identifier.
+   */
+  public abstract void registerJob(String string);
+
+  /**
+   * Deregister the job from the system.
+   * @param string The job identifier.
+   */
+  public abstract void deregisterJob(String string);
+    
+  /**
+   * Closes the client.
+   */
+  public abstract void close();
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
new file mode 100644
index 0000000..e391e11
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+
+public abstract class BSPPeerSyncClient implements PeerSyncClient{
+
+  /**
+   * Init will be called within a spawned task; it should be used to initialize
+   * the inner structure and fields, e.g. a ZooKeeper client or an RPC
+   * connection to the real sync daemon.
+   * 
+   * @throws Exception
+   */
+  public abstract void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+      throws Exception;
+
+  /**
+   * Enters the barrier before the message sending in each superstep.
+   * 
+   * @param jobId the jobs ID
+   * @param taskId the tasks ID
+   * @param superstep the superstep of the task
+   * @throws SyncException
+   */
+  public abstract void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws SyncException;
+
+  /**
+   * Leaves the barrier after all communication has been done, this is usually
+   * the end of a superstep.
+   * 
+   * @param jobId the jobs ID
+   * @param taskId the tasks ID
+   * @param superstep the superstep of the task
+   * @throws SyncException
+   */
+  public abstract void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws SyncException;
+
+  /**
+   * Registers a specific task with its host and port at the sync daemon.
+   * 
+   * @param jobId the jobs ID
+   * @param taskId the tasks ID
+   * @param hostAddress the host where the sync server resides
+   * @param port the port where the sync server is up
+   */
+  public abstract void register(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port);
+
+  /**
+   * Returns all registered tasks within the sync daemon. They have to be
+   * ordered ascending by their task id.
+   * 
+   * @param taskId the tasks ID
+   * @return an <b>ordered</b> string array of host:port pairs of all tasks
+   *         connected to the daemon.
+   */
+  public abstract String[] getAllPeerNames(TaskAttemptID taskId);
+
+  /**
+   * TODO: this currently has no use. Could later be used to deregister tasks
+   * from the barrier at runtime once they are finished, similar to
+   * voteToHalt() in Pregel.
+   * 
+   * @param jobId
+   * @param taskId
+   * @param hostAddress
+   * @param port
+   */
+  public abstract void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port);
+
+  /**
+   * This stops the sync daemon. Only used in YARN.
+   */
+  public abstract void stopServer();
+
+  /**
+   * This method should close all used resources, e.g. a ZooKeeper instance.
+   * 
+   * @throws IOException
+   */
+  public abstract void close() throws IOException;
+
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
new file mode 100644
index 0000000..2d2a61d
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * MasterSyncClient defines the operations the BSPMaster uses to perform the
+ * globally synchronized state changes it requires.
+ *
+ */
+public interface MasterSyncClient extends SyncClient{
+
+  /**
+   * Initialize the Synchronization client.
+   * 
+   * @param conf The configuration parameters to initialize the client.
+   */
+  public void init(HamaConfiguration conf);
+  
+  /**
+   * Clears all information stored.
+   */
+  public void clear();
+  
+  /**
+   * Register a newly added job.
+   * @param string The job identifier.
+   */
+  public void registerJob(String string);
+
+  /**
+   * Deregister the job from the system.
+   * @param string The job identifier.
+   */
+  public void deregisterJob(String string);
+    
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
new file mode 100644
index 0000000..19857e5
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * PeerSyncClient defines the behavior that a BSPPeer performs to maintain
+ * synchronized global state as it progresses.
+ */
+public interface PeerSyncClient extends SyncClient{
+
+  /**
+   * Init will be called within a spawned task; it should be used to initialize
+   * the inner structure and fields, e.g. a ZooKeeper client or an RPC
+   * connection to the real sync daemon.
+   * 
+   * @throws Exception
+   */
+  public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+      throws Exception;
+
+  /**
+   * Enters the barrier before the message sending in each superstep.
+   * 
+   * @param jobId the jobs ID
+   * @param taskId the tasks ID
+   * @param superstep the superstep of the task
+   * @throws SyncException
+   */
+  public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws SyncException;
+
+  /**
+   * Leaves the barrier after all communication has been done, this is usually
+   * the end of a superstep.
+   * 
+   * @param jobId the jobs ID
+   * @param taskId the tasks ID
+   * @param superstep the superstep of the task
+   * @throws SyncException
+   */
+  public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws SyncException;
+
+  /**
+   * Registers a specific task with its host and port at the sync daemon.
+   * 
+   * @param jobId the jobs ID
+   * @param taskId the tasks ID
+   * @param hostAddress the host where the sync server resides
+   * @param port the port where the sync server is up
+   */
+  public void register(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port);
+
+  /**
+   * Returns all registered tasks within the sync daemon. They have to be
+   * ordered ascending by their task id.
+   * 
+   * @param taskId the tasks ID
+   * @return an <b>ordered</b> string array of host:port pairs of all tasks
+   *         connected to the daemon.
+   */
+  public String[] getAllPeerNames(TaskAttemptID taskId);
+
+  /**
+   * TODO: this currently has no use. Could later be used to deregister tasks
+   * from the barrier at runtime once they are finished, similar to
+   * voteToHalt() in Pregel.
+   * 
+   * @param jobId
+   * @param taskId
+   * @param hostAddress
+   * @param port
+   */
+  public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port);
+
+  /**
+   * This stops the sync daemon. Only used in YARN.
+   */
+  public void stopServer();
+
+}
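
A hedged sketch of the barrier protocol a peer-side caller is expected to follow against this interface; the wrapper class, its parameters, and the fixed superstep count are assumptions for illustration only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hama.bsp.BSPJobID;
    import org.apache.hama.bsp.TaskAttemptID;
    import org.apache.hama.bsp.sync.PeerSyncClient;
    import org.apache.hama.bsp.sync.SyncServiceFactory;

    public class BarrierLoopSketch {

      /** Runs a fixed number of empty supersteps against the sync service. */
      public static void run(Configuration conf, BSPJobID jobId,
          TaskAttemptID taskId, String host, int port, long supersteps)
          throws Exception {
        PeerSyncClient sync = SyncServiceFactory.getPeerSyncClient(conf);
        sync.init(conf, jobId, taskId);
        sync.register(jobId, taskId, host, port);

        for (long step = 0; step < supersteps; step++) {
          // ... compute and send messages for this superstep ...
          sync.enterBarrier(jobId, taskId, step); // all peers meet here
          sync.leaveBarrier(jobId, taskId, step); // and leave together
        }
      }
    }
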
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
index 9564951..e75da95 100644
--- a/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
+++ b/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
@@ -17,9 +17,10 @@
  */
 package org.apache.hama.bsp.sync;
 
-import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPJobID;
-import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * Basic interface for a client that connects to a sync server.
@@ -28,82 +29,85 @@
 public interface SyncClient {
 
   /**
-   * Init will be called within a spawned task, it should be used to initialize
-   * the inner structure and fields, e.G. a zookeeper client or an rpc
-   * connection to the real sync daemon.
-   * 
-   * @throws Exception
+   * Construct a key in the format required by the SyncClient for storing and
+   * retrieving information. It is recommended to use this method whenever a
+   * key has to be constructed.
+   * @param jobId The BSP Job ID.
+   * @param args The String components used to construct the key.
+   * @return The key composed of the provided components in the required format.
    */
-  public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
-      throws Exception;
+  public String constructKey(BSPJobID jobId, String ... args);
 
   /**
-   * Enters the barrier before the message sending in each superstep.
-   * 
-   * @param jobId the jobs ID
-   * @param taskId the tasks ID
-   * @param superstep the superstep of the task
-   * @throws SyncException
+   * Stores value for the specified key.
+   * @param key The key for which value should be stored. It is recommended to use 
+   * <code>constructKey</code> to create key object.
+   * @param value The value to be stored.
+   * @param permanent true if the value should be persisted after end of session.
+   * @param listener Listener object that provides asynchronous updates on the
+   * state of the information stored under the key.
+   * @return true if the operation was successful.
    */
-  public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
-      throws SyncException;
+  public boolean storeInformation(String key, Writable value, 
+      boolean permanent, SyncEventListener listener);
 
   /**
-   * Leaves the barrier after all communication has been done, this is usually
-   * the end of a superstep.
-   * 
-   * @param jobId the jobs ID
-   * @param taskId the tasks ID
-   * @param superstep the superstep of the task
-   * @throws SyncException
+   * Retrieve the value previously stored for the key.
+   * @param key The key for which the value was stored.
+   * @param valueHolder The Writable object into which the stored value is read.
+   * @return true if the value was found and read successfully; false if there
+   * was an error or no value was stored for the key.
    */
-  public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
-      throws SyncException;
+  public boolean getInformation(String key, Writable valueHolder);
 
   /**
-   * Registers a specific task with a its host and port to the sync daemon.
-   * 
-   * @param jobId the jobs ID
-   * @param taskId the tasks ID
-   * @param hostAddress the host where the sync server resides
-   * @param port the port where the sync server is up
+   * Store new key in key set.
+   * @param key The key to be saved in key set. It is recommended to use 
+   * <code>constructKey</code> to create key object. 
+   * @param permanent true if the value should be persisted after end of session.
+   * @param listener Listener object that asynchronously notifies the events 
+   * related to the key.
+   * @return true if operation was successful.
    */
-  public void register(BSPJobID jobId, TaskAttemptID taskId,
-      String hostAddress, long port);
+  public boolean addKey(String key, boolean permanent, SyncEventListener listener);
 
   /**
-   * Returns all registered tasks within the sync daemon. They have to be
-   * ordered ascending by their task id.
-   * 
-   * @param taskId the tasks ID
-   * @return an <b>ordered</b> string array of host:port pairs of all tasks
-   *         connected to the daemon.
+   * Check if key was previously stored.
+   * @param key The value of the key. 
+   * @return true if the key exists.
    */
-  public String[] getAllPeerNames(TaskAttemptID taskId);
+  public boolean hasKey(String key);
+  
+  /**
+   * Get the list of child keys stored under the provided key.
+   * @param key The key whose child keys are to be found.
+   * @param listener Listener object that asynchronously notifies of changes
+   * under the provided key.
+   * @return Array of child keys.
+   */
+  public String[] getChildKeySet(String key, SyncEventListener listener);
 
   /**
-   * TODO this has currently no use. Could later be used to deregister tasks
-   * from the barrier during runtime if they are finished. Something equal to
-   * voteToHalt() in Pregel.
-   * 
-   * @param jobId
-   * @param taskId
-   * @param hostAddress
-   * @param port
+   * Register a listener for events on the key.
+   * @param key The key on which an event listener should be registered.
+   * @param event The event for which the listener is registered.
+   * @param listener The event listener that defines how to process the event.
+   * @return true if the operation is successful.
    */
-  public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
-      String hostAddress, long port);
+  public boolean registerListener(String key, SyncEvent event,
+      SyncEventListener listener);
 
   /**
-   * This stops the sync daemon. Only used in YARN.
+   * Delete the key and the information stored under it.
+   * @param key The key to be deleted.
+   * @param listener Listener notified of changes under the key.
+   * @return true if the operation was successful.
    */
-  public void stopServer();
-
+  public boolean remove(String key, SyncEventListener listener);
+  
   /**
-   * This method should close all used resources, e.G. a ZooKeeper instance.
    * 
-   * @throws InterruptedException
    */
-  public void close() throws InterruptedException;
-
+  public void close() throws IOException;
+  
 }
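
A minimal sketch of the new key/value API, assuming a task wants to persist its superstep count; the helper class and the key components are illustrative only and not part of this patch.

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hama.bsp.BSPJobID;
    import org.apache.hama.bsp.sync.SyncClient;

    public class SuperstepStateSketch {

      /** Stores a superstep counter for a task and reads it back. */
      public static long storeAndReadBack(SyncClient client, BSPJobID jobId,
          String taskName, long superstep) {
        // the key is rooted at the configured bspRoot,
        // e.g. "/bsp/<jobId>/checkpoint/<taskName>"
        String key = client.constructKey(jobId, "checkpoint", taskName);
        client.storeInformation(key, new LongWritable(superstep), true, null);

        LongWritable holder = new LongWritable();
        if (client.getInformation(key, holder)) {
          return holder.get();
        }
        return -1L; // nothing stored or an error occurred
      }
    }
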
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java b/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java
new file mode 100644
index 0000000..bf459d6
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+/**
+ * A distributed global synchronization event.   
+ */
+public interface SyncEvent {
+  
+  /**
+   * Returns the event identifier in the scheme of events defined for the
+   * global synchronization service.
+   * @return the event identifier 
+   */
+  public int getEventId();
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java b/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java
new file mode 100644
index 0000000..e53c9e9
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+/**
+ * This class is used to define a listener to the synchronized global event.
+ *
+ */
+public abstract class SyncEventListener {
+  
+  /**
+   * Every event is identified by an event identifier. You can refer to 
+   * <code>SyncEvent</code> class.
+   * @param eventId The event identification code.
+   */
+  public abstract void handleEvent(int eventId);
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java b/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
index 869402d..436f36f 100644
--- a/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
+++ b/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
@@ -22,18 +22,27 @@
 
 public class SyncServiceFactory {
   public static final String SYNC_SERVER_CLASS = "hama.sync.server.class";
-  public static final String SYNC_CLIENT_CLASS = "hama.sync.client.class";
+  public static final String SYNC_PEER_CLASS = "hama.sync.peer.class";
+  public static final String SYNC_MASTER_CLASS = "hama.sync.master.class";
 
   /**
    * Returns a sync client via reflection based on what was configured.
    */
-  public static SyncClient getSyncClient(Configuration conf)
+  public static PeerSyncClient getPeerSyncClient(Configuration conf)
       throws ClassNotFoundException {
-    return (SyncClient) ReflectionUtils
-        .newInstance(conf.getClassByName(conf.get(SYNC_CLIENT_CLASS,
+    return (PeerSyncClient) ReflectionUtils
+        .newInstance(conf.getClassByName(conf.get(SYNC_PEER_CLASS,
             ZooKeeperSyncClientImpl.class.getName())), conf);
   }
 
+  
+  public static SyncClient getMasterSyncClient(Configuration conf)
+      throws ClassNotFoundException {
+    return (SyncClient) ReflectionUtils
+        .newInstance(conf.getClassByName(conf.get(SYNC_MASTER_CLASS,
+            ZKSyncBSPMasterClient.class.getName())), conf);
+  }
+
   /**
    * Returns a sync server via reflection based on what was configured.
    */
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java
new file mode 100644
index 0000000..7cfb122
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * ZooKeeper synchronization client that is used by the BSPMaster to maintain
+ * the global state of the cluster.
+ */
+public class ZKSyncBSPMasterClient extends ZKSyncClient implements
+    MasterSyncClient {
+
+  private ZooKeeper zk = null;
+  private String bspRoot = null;
+
+  Log LOG = LogFactory.getLog(ZKSyncBSPMasterClient.class);
+
+  @Override
+  public void init(HamaConfiguration conf) {
+    try {
+      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
+          conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+    } catch (IOException e) {
+      LOG.error("Exception during reinitialization!", e);
+    }
+
+    bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
+        Constants.DEFAULT_ZOOKEEPER_ROOT);
+    LOG.info("Initialized ZK " + (null == zk));
+    Stat s = null;
+    if (zk != null) {
+      initialize(zk, bspRoot);
+      try {
+        s = zk.exists(bspRoot, false);
+      } catch (Exception e) {
+        LOG.error("Error checking the existence of BSP root " + bspRoot, e);
+      }
+
+      if (s == null) {
+        try {
+          zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+          LOG.error(e);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      } else {
+        this.clearZKNodes();
+      }
+    }
+
+  }
+
+  private void createJobRoot(String string) {
+    writeNode(string, null, true, null);
+  }
+
+  @Override
+  public void clear() {
+    clearZKNodes();
+  }
+
+  @Override
+  public void registerJob(String string) {
+    // TODO: delete job root if key is present.
+    createJobRoot(string);
+  }
+
+  @Override
+  public void deregisterJob(String string) {
+    try {
+      clearZKNodes(bspRoot + "/" + string);
+      this.zk.delete(bspRoot + "/" + string, -1);
+    } catch (KeeperException e) {
+      LOG.error("Error deleting job " + string);
+    } catch (InterruptedException e) {
+      LOG.error("Error deleting job " + string);
+    }
+
+  }
+
+  @Override
+  public void close() {
+    try {
+      this.zk.close();
+    } catch (InterruptedException e) {
+      LOG.error("Error closing sync client", e);
+    }
+
+  }
+
+  @Override
+  public void process(WatchedEvent arg0) {
+    LOG.debug("Processing event " + arg0.getPath());
+    LOG.debug("Processing event type " + arg0.getType().toString());
+
+  }
+
+  public ZooKeeper getZK() {
+    return this.zk;
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
new file mode 100644
index 0000000..4f9c96a
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * A ZooKeeper based BSP distributed synchronization client that provides
+ * primitive synchronization APIs.
+ * 
+ */
+public abstract class ZKSyncClient implements SyncClient, Watcher {
+
+  Log LOG = LogFactory.getLog(ZKSyncClient.class);
+
+  private ZooKeeper zk;
+  private String bspRoot;
+
+  protected Map<String, List<ZKSyncEventListener>> eventListenerMap;
+
+  public ZKSyncClient() {
+    eventListenerMap = new HashMap<String, List<ZKSyncEventListener>>(10);
+  }
+
+  /**
+   * Initializes the zookeeper client-base.
+   */
+  protected void initialize(ZooKeeper zookeeper, String root) {
+    LOG.info("Initializing ZK Sync Client");
+    zk = zookeeper;
+    bspRoot = root;
+    eventListenerMap = new HashMap<String, List<ZKSyncEventListener>>(10);
+  }
+
+  /**
+   * Returns the node name that, by convention, addresses the superstep
+   * progress of a task.
+   * 
+   * @param taskId The Task ID
+   * @param superstep The superstep number
+   * @return String that represents the Zookeeper node path that could be
+   *         created.
+   */
+  protected String getNodeName(TaskAttemptID taskId, long superstep) {
+    return constructKey(taskId.getJobID(), "sync", "" + superstep,
+        taskId.toString());
+  }
+
+  private String correctKey(String key) {
+    if (!key.startsWith("/")) {
+      key = "/" + key;
+    }
+    return key;
+  }
+
+  /**
+   * Check if the zookeeper node exists.
+   * 
+   * @param path The Zookeeper node path to check.
+   * @param watcher A Watcher to be notified of events on the node. May be
+   *          null if no watcher should be left on the node.
+   * @return true if the node exists.
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  protected boolean isExists(final String path, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    synchronized (zk) {
+      return !(null == zk.exists(path, false));
+    }
+  }
+
+  /**
+   * Returns the zookeeper stat object.
+   * 
+   * @param path The path of the expected Zookeeper node.
+   * @return Stat object for the Zookeeper path.
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  protected Stat getStat(final String path) throws KeeperException,
+      InterruptedException {
+    synchronized (zk) {
+      return zk.exists(path, false);
+    }
+  }
+
+  private void createZnode(final String path, final CreateMode mode,
+      byte[] data, Watcher watcher) throws KeeperException,
+      InterruptedException {
+
+    synchronized (zk) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Checking node " + path);
+      }
+      Stat s = zk.exists(path, false);
+      if (null == s) {
+        try {
+          zk.create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Created node " + path);
+          }
+
+        } catch (KeeperException.NodeExistsException nee) {
+          LOG.debug("Ignore because znode may be already created at " + path,
+              nee);
+        }
+      }
+    }
+  }
+
+  /**
+   * Utility function to get byte array out of Writable
+   * 
+   * @param value The Writable object to be converted to byte array.
+   * @return byte array from the Writable object. Returns null on given null
+   *         value or on error.
+   */
+  protected byte[] getBytesForData(Writable value) {
+    byte[] data = null;
+
+    if (value != null) {
+      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+      DataOutputStream outputStream = new DataOutputStream(byteStream);
+      try {
+        value.write(outputStream);
+        outputStream.flush();
+        data = byteStream.toByteArray();
+      } catch (IOException e) {
+        LOG.error("Error writing data to write buffer.", e);
+      } finally {
+        try {
+          byteStream.close();
+          outputStream.close();
+        } catch (IOException e) {
+          LOG.error("Error closing byte stream.", e);
+        }
+      }
+    }
+    return data;
+  }
+
+  /**
+   * Utility function to read Writable object value from byte array.
+   * 
+   * @param data The byte array
+   * @param valueHolder The Writable object into which the value is read.
+   * @return true if data was non-null and the value was read into valueHolder.
+   * @throws IOException
+   */
+  protected boolean getValueFromBytes(byte[] data,
+      Writable valueHolder) throws IOException {
+    if (data != null) {
+      ByteArrayInputStream istream = new ByteArrayInputStream(data);
+      DataInputStream diStream = new DataInputStream(istream);
+      try {
+        valueHolder.readFields(diStream);
+      } finally {
+        diStream.close();
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Read value stored in the Zookeeper node.
+   * 
+   * @param path The path of the Zookeeper node.
+   * @param valueHolder The Writable object into which the stored value is read.
+   * @return true if a value was read from the Zookeeper node into
+   *         valueHolder.
+   */
+  protected boolean extractData(String path,
+      Writable valueHolder) {
+    try {
+      Stat stat = getStat(path);
+      if (stat != null) {
+        byte[] data = this.zk.getData(path, false, stat);
+        try {
+          getValueFromBytes(data, valueHolder);
+        } catch (IOException e) {
+          LOG.error(
+              new StringBuffer(200).append("Error getting data from path ")
+                  .append(path).toString(), e);
+          return false;
+        }
+        return true;
+      }
+
+    } catch (KeeperException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(path).toString(), e);
+
+    } catch (InterruptedException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(path).toString(), e);
+
+    }
+    return false;
+
+  }
+
+  /**
+   * Writes data into the Zookeeper node. If the path does not exist the
+   * zookeeper node is created recursively and the value is stored in the node.
+   * 
+   * @param path The path of the Zookeeper node.
+   * @param value The value to be stored in the Zookeeper node.
+   * @param persistent true if the node to be created should be persistent
+   *          rather than ephemeral.
+   * @param watcher Watcher object, if any, that should listen to events on the
+   *          Zookeeper node.
+   * @return true if operation is successful.
+   */
+  protected boolean writeNode(String path, Writable value, boolean persistent,
+      Watcher watcher) {
+    if (path == null || "".equals(path.trim())) {
+      return false;
+    }
+    path = correctKey(path);
+    boolean pathExists = false;
+    try {
+      pathExists = isExists(path, watcher);
+    } catch (KeeperException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(path).toString(), e);
+    } catch (InterruptedException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(path).toString(), e);
+    }
+
+    byte[] data = getBytesForData(value);
+
+    if (!pathExists) {
+      try {
+        String[] pathComponents = path.split("/");
+        StringBuffer pathBuffer = new StringBuffer(path.length()
+            + pathComponents.length);
+        for (int i = 0; i < pathComponents.length - 1; ++i) {
+          if (pathComponents[i].equals(""))
+            continue;
+          pathBuffer.append("/").append(pathComponents[i]);
+          createZnode(pathBuffer.toString(), CreateMode.PERSISTENT, null,
+              watcher);
+        }
+        pathBuffer.append("/")
+            .append(pathComponents[pathComponents.length - 1]);
+        CreateMode mode = CreateMode.EPHEMERAL;
+        if (persistent) {
+          mode = CreateMode.PERSISTENT;
+        }
+        createZnode(pathBuffer.toString(), mode, data, watcher);
+
+        return true;
+      } catch (InterruptedException e) {
+        LOG.error(new StringBuilder(200).append("Error creating zk path ")
+            .append(path).toString(), e);
+      } catch (KeeperException e) {
+        LOG.error(new StringBuilder(200).append("Error creating zk path ")
+            .append(path).toString(), e);
+      }
+    } else if (value != null) {
+      try {
+        this.zk.setData(path, data, -1);
+        return true;
+      } catch (InterruptedException e) {
+        LOG.error(new StringBuilder(200).append("Error modifying zk path ")
+            .append(path).toString(), e);
+        return false;
+      } catch (KeeperException e) {
+        LOG.error(new StringBuilder(200).append("Error modifying zk path ")
+            .append(path).toString(), e);
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public String constructKey(BSPJobID jobId, String... args) {
+    StringBuffer keyBuffer = new StringBuffer(100);
+    keyBuffer.append(bspRoot);
+    if (jobId != null)
+      keyBuffer.append("/").append(jobId.toString());
+    for (String arg : args) {
+      keyBuffer.append("/").append(arg);
+    }
+    return keyBuffer.toString();
+  }
+
+  @Override
+  public boolean storeInformation(String key, Writable value,
+      boolean permanent, SyncEventListener listener) {
+    ZKSyncEventListener zkListener = (ZKSyncEventListener) listener;
+    key = correctKey(key);
+    final String path = key;
+    LOG.info("Writing data " + path);
+    return writeNode(path, value, permanent, zkListener);
+  }
+
+  @Override
+  public boolean getInformation(String key, Writable valueHolder) {
+    key = correctKey(key);
+    final String path = key;
+    return extractData(path, valueHolder);
+  }
+
+  @Override
+  public boolean addKey(String key, boolean permanent,
+      SyncEventListener listener) {
+    ZKSyncEventListener zkListener = (ZKSyncEventListener) listener;
+    return writeNode(key, null, permanent, zkListener);
+  }
+
+  @Override
+  public boolean hasKey(String key) {
+    try {
+      return isExists(key, null);
+    } catch (KeeperException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(key).toString(), e);
+    } catch (InterruptedException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(key).toString(), e);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean registerListener(String key, SyncEvent event,
+      SyncEventListener listener) {
+    key = correctKey(key);
+
+    LOG.debug("Registering listener for " + key);
+    ZKSyncEventListener zkListener = (ZKSyncEventListener) listener;
+    zkListener.setSyncEvent(event);
+    zkListener.setZKSyncClient(this);
+    synchronized (this.zk) {
+
+      try {
+        Stat stat = this.zk.exists(key, zkListener);
+        if (stat == null) {
+          writeNode(key, null, true, zkListener);
+        }
+        this.zk.getData(key, zkListener, stat);
+        this.zk.getChildren(key, zkListener);
+        // List<ZKSyncEventListener> list = this.eventListenerMap.get(key);
+        // if(!eventListenerMap.containsKey(key)){
+        // list = new ArrayList<ZKSyncEventListener>(5);
+        // }
+        // list.add(zkListener);
+        // this.eventListenerMap.put(key, list);
+        return true;
+      } catch (KeeperException e) {
+        LOG.error("Error getting stat and data.", e);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted getting stat and data.", e);
+      }
+
+    }
+    return false;
+  }
+
+  @Override
+  public String[] getChildKeySet(String key, SyncEventListener listener) {
+    key = correctKey(key);
+    ZKSyncEventListener zkListener = null;
+    if (listener != null) {
+      zkListener = (ZKSyncEventListener) listener;
+    }
+    Stat stat = null;
+    String[] children = new String[0];
+    try {
+      stat = this.zk.exists(key, null);
+
+    } catch (KeeperException e) {
+      LOG.error("Error getting stat and data.", e);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted getting stat and data.", e);
+    }
+    if (stat == null)
+      return children;
+
+    try {
+      List<String> childList = this.zk.getChildren(key, zkListener);
+      children = new String[childList.size()];
+      childList.toArray(children);
+    } catch (KeeperException e) {
+      LOG.error("Error getting stat and data.", e);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted getting stat and data.", e);
+    }
+
+    return children;
+  }
+
+  /**
+   * Clears all sub-children of node bspRoot
+   */
+  protected void clearZKNodes() {
+    try {
+      Stat s = zk.exists(bspRoot, false);
+      if (s != null) {
+        clearZKNodes(bspRoot);
+      }
+
+    } catch (Exception e) {
+      LOG.warn("Could not clear zookeeper nodes.", e);
+    }
+  }
+
+  /**
+   * Clears all sub-children of node rooted at path.
+   * 
+   * @param path The root path whose sub-tree is cleared.
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  protected void clearZKNodes(String path) throws KeeperException,
+      InterruptedException {
+    ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
+
+    if (list.size() == 0) {
+      return;
+
+    } else {
+      for (String node : list) {
+        clearZKNodes(path + "/" + node);
+        LOG.info("Deleting " + path + "/" + node);
+        zk.delete(path + "/" + node, -1); // delete any version of this
+        // node.
+      }
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent arg0) {
+  }
+
+  @Override
+  public boolean remove(String key, SyncEventListener listener) {
+    key = correctKey(key);
+    try {
+      clearZKNodes(key);
+      this.zk.delete(key, -1);
+      return true;
+    } catch (KeeperException e) {
+      LOG.error("Error deleting key " + key);
+    } catch (InterruptedException e) {
+      LOG.error("Error deleting key " + key);
+    }
+    return false;
+
+  }
+
+  @Override
+  public void close() throws IOException {
+
+    try {
+      this.zk.close();
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
new file mode 100644
index 0000000..7ce8ce3
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+/**
+ * Zookeeper Synchronization Event Factory. It provides three event definitions:
+ * <ul>
+ * <li>The value stored in a Zookeeper node is changed.
+ * <li>A new child node is added to a Zookeeper node.
+ * <li>A Zookeeper node is deleted.
+ * </ul>
+ */
+public class ZKSyncEventFactory {
+  
+  public static enum ZKEvent{
+    VALUE_CHANGE_EVENT(0),
+    CHILD_ADD_EVENT(1),
+    DELETE_EVENT(2);
+    
+    private final int id;
+    
+    ZKEvent(int num){
+      this.id = num;
+    }
+    
+    public int getValue(){
+      return this.id;
+    }
+    
+    public static int getEventCount(){
+      return ZKEvent.values().length;
+    }
+    
+    public String getName(int num){
+      if(num >=0 && num < ZKEvent.getEventCount()){
+        return ZKEvent.values()[num].name();
+      }
+      else 
+        throw new IllegalArgumentException((new StringBuilder(100)
+              .append("The value ")
+              .append(num).append(" is not a valid ZKEvent type. ")
+              .append("Expected range is 0-")
+              .append(getEventCount()-1)).toString());
+    }
+    
+  };
+  
+  public static int getSupportedEventCount(){
+    return ZKEvent.getEventCount();
+  }
+
+  private static class ValueChangeEvent implements SyncEvent {
+
+    @Override
+    public int getEventId() {
+      return ZKEvent.VALUE_CHANGE_EVENT.getValue();
+    }
+    
+  }
+  
+  private static class ChildAddEvent implements SyncEvent {
+
+    @Override
+    public int getEventId() {
+      return ZKEvent.CHILD_ADD_EVENT.getValue();
+    }
+    
+  }
+  
+  private static class DeleteEvent implements SyncEvent {
+
+    @Override
+    public int getEventId() {
+      return ZKEvent.DELETE_EVENT.getValue();
+    }
+    
+  }
+  
+  /**
+   * Provides the Zookeeper node value change event definition.
+   * @return the Zookeeper value changed event.
+   */
+  public static SyncEvent getValueChangeEvent(){
+    return new ValueChangeEvent();
+  }
+  
+  /**
+   * Provides the Zookeeper deletion event definition.
+   * @return the Zookeeper node is deleted event
+   */
+  public static SyncEvent getDeletionEvent(){
+    return new DeleteEvent();
+  }
+  
+  /**
+   * Provides the Zookeeper child addition event definition. 
+   * @return the Zookeeper child node is added event
+   */
+  public static SyncEvent getChildAddEvent(){
+    return new ChildAddEvent();
+  }
+  
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java
new file mode 100644
index 0000000..fcc83fa
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+public abstract class ZKSyncEventListener extends SyncEventListener implements
+    Watcher {
+
+  private static final Log LOG = LogFactory.getLog(ZKSyncEventListener.class);
+  
+  private ZKSyncClient client;
+  private SyncEvent event;
+
+  /**
+   * Re-registers the listener on the event path and dispatches the ZooKeeper
+   * event to the matching callback.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    
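+    // ZooKeeper watches fire only once, so re-register this listener on the
+    // same path before handling the current event.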
+    client.registerListener(event.getPath(),
+        ZKSyncEventFactory.getValueChangeEvent()
+        , this);    
+    if(LOG.isDebugEnabled()){
+      LOG.debug(event.toString());
+    }
+
+    if(event.getType().equals(EventType.NodeChildrenChanged)){
+      LOG.debug("Node children changed - " + event.getPath());
+      onChildKeySetChange();
+    }
+    else if (event.getType().equals(EventType.NodeDeleted)){
+      LOG.debug("Node deleted - " + event.getPath());
+      onDelete();
+    }
+    else if (event.getType().equals(EventType.NodeDataChanged)){
+      LOG.debug("Node data changed - " + event.getPath());
+      onChange();
+    }
+
+  }
+  
+  public void setZKSyncClient(ZKSyncClient zkClient){
+    client = zkClient;
+  }
+  
+  public void setSyncEvent(SyncEvent event){
+    this.event = event;
+  }
+  
+  public SyncEvent getEvent(){
+    return this.event;
+  }
+
+  /**
+   * Called when the watched Zookeeper node is deleted.
+   */
+  public abstract void onDelete();
+
+  /**
+   * Called when the value stored in the watched Zookeeper node changes.
+   */
+  public abstract void onChange();
+
+  /**
+   * Called when the set of child nodes of the watched Zookeeper node changes.
+   */
+  public abstract void onChildKeySetChange();
+
+  @Override
+  public void handleEvent(int eventId) {
+    
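+    // ZooKeeper events reach this listener through process(WatchedEvent), so
+    // the generic event id is presumably not needed here.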
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java b/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
index bd1c28e..ad7dc07 100644
--- a/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
+++ b/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
@@ -17,10 +17,6 @@
  */
 package org.apache.hama.bsp.sync;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
@@ -47,7 +43,8 @@
  * This client class abstracts the use of our zookeeper sync code.
  * 
  */
-public class ZooKeeperSyncClientImpl implements SyncClient, Watcher {
+public class ZooKeeperSyncClientImpl extends ZKSyncClient implements
+    PeerSyncClient {
 
   /*
    * TODO maybe extract an abstract class and let the subclasses implement
@@ -81,6 +78,8 @@
     int bindPort = conf
         .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
 
+    initialize(this.zk, bspRoot);
+
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
     LOG.info("Start connecting to Zookeeper! At " + peerAddress);
     numBSPTasks = conf.getInt("bsp.peers.num", 1);
@@ -93,14 +92,14 @@
 
     try {
       synchronized (zk) {
-        createZnode(bspRoot);
-        final String pathToJobIdZnode = bspRoot + "/"
-            + taskId.getJobID().toString();
-        createZnode(pathToJobIdZnode);
-        final String pathToSuperstepZnode = pathToJobIdZnode + "/" + superstep;
-        createZnode(pathToSuperstepZnode);
+
+        final String pathToSuperstepZnode = 
+            constructKey(taskId.getJobID(), "sync", ""+superstep);
+        
+        writeNode(pathToSuperstepZnode, null, true, null);
         BarrierWatcher barrierWatcher = new BarrierWatcher();
-        // this is really needed to register the barrier watcher, don't remove this line!
+        // this is really needed to register the barrier watcher, don't remove
+        // this line!
         zk.exists(pathToSuperstepZnode + "/ready", barrierWatcher);
         zk.create(getNodeName(taskId, superstep), null, Ids.OPEN_ACL_UNSAFE,
             CreateMode.EPHEMERAL);
@@ -131,7 +130,7 @@
         } else {
           LOG.debug("---> at superstep: " + superstep
               + " task that is creating /ready znode:" + taskId.toString());
-          createEphemeralZnode(pathToSuperstepZnode + "/ready");
+          writeNode(pathToSuperstepZnode + "/ready", null, false, null);
         }
       }
     } catch (Exception e) {
@@ -143,8 +142,10 @@
   public void leaveBarrier(final BSPJobID jobId, final TaskAttemptID taskId,
       final long superstep) throws SyncException {
     try {
-      final String pathToSuperstepZnode = bspRoot + "/"
-          + taskId.getJobID().toString() + "/" + superstep;
+      final String pathToSuperstepZnode = 
+          constructKey(taskId.getJobID(), "sync", ""+superstep);
       while (true) {
         List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
         LOG.debug("leaveBarrier() !!! checking znodes contnains /ready node or not: at superstep:"
@@ -236,8 +237,9 @@
   public void register(BSPJobID jobId, TaskAttemptID taskId,
       String hostAddress, long port) {
     try {
-      if (zk.exists("/" + jobId.toString(), false) == null) {
-        zk.create("/" + jobId.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE,
+      String jobRegisterKey = constructKey(jobId, "peers");
+      if (zk.exists(jobRegisterKey, false) == null) {
+        zk.create(jobRegisterKey, new byte[0], Ids.OPEN_ACL_UNSAFE,
             CreateMode.PERSISTENT);
       }
     } catch (KeeperException e) {
@@ -245,7 +247,7 @@
     } catch (InterruptedException e) {
       LOG.error(e);
     }
-    registerTask(zk, jobId, hostAddress, port, taskId);
+    registerTask(jobId, hostAddress, port, taskId);
   }
 
   /**
@@ -259,54 +261,14 @@
    * @param port
    * @param taskId
    */
-  public static void registerTask(ZooKeeper zk, BSPJobID jobId,
-      String hostAddress, long port, TaskAttemptID taskId) {
+  public void registerTask(BSPJobID jobId, String hostAddress, long port,
+      TaskAttemptID taskId) {
 
-    byte[] taskIdBytes = serializeTaskId(taskId);
+    String taskRegisterKey = constructKey(jobId, "peers", hostAddress + ":"
+        + port);
+    writeNode(taskRegisterKey, taskId, false, null);
 
-    try {
-      zk.create("/" + jobId.toString() + "/" + hostAddress + ":" + port,
-          taskIdBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-    } catch (KeeperException e) {
-      LOG.error(e);
-    } catch (InterruptedException e) {
-      LOG.error(e);
-    }
-  }
-
-  private static byte[] serializeTaskId(TaskAttemptID taskId) {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(bos);
-    try {
-      taskId.write(out);
-    } catch (IOException e) {
-      LOG.error(e);
-    } finally {
-      try {
-        out.close();
-      } catch (IOException e) {
-        LOG.error(e);
-      }
-    }
-    return bos.toByteArray();
-  }
-
-  public static TaskAttemptID deserializeTaskId(byte[] arr) {
-    ByteArrayInputStream bis = new ByteArrayInputStream(arr);
-    DataInputStream in = new DataInputStream(bis);
-    TaskAttemptID id = new TaskAttemptID();
-    try {
-      id.readFields(in);
-    } catch (IOException e) {
-      LOG.error(e);
-    } finally {
-      try {
-        in.close();
-      } catch (IOException e) {
-        LOG.error(e);
-      }
-    }
-    return id;
   }
 
   @Override
@@ -314,16 +276,22 @@
     if (allPeers == null) {
       TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
       try {
-        allPeers = zk.getChildren("/" + taskId.getJobID().toString(), this)
-            .toArray(new String[0]);
+        allPeers = zk.getChildren(constructKey(taskId.getJobID(), "peers"),
+            this).toArray(new String[0]);
 
         for (String s : allPeers) {
-          byte[] data = zk.getData(
-              "/" + taskId.getJobID().toString() + "/" + s, this, null);
-          TaskAttemptID thatTask = deserializeTaskId(data);
-          LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
-              + thatTask.getTaskID().getId() + " : " + s);
-          sortedMap.put(thatTask.getTaskID().getId(), s);
+          byte[] data = zk.getData(constructKey(taskId.getJobID(), "peers", s),
+              this, null);
+          TaskAttemptID thatTask = new TaskAttemptID(); 
+          boolean result = getValueFromBytes(data, thatTask);
+
+          if(result){
+            LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
+                + thatTask.getTaskID().getId() + " : " + s);
+            sortedMap.put(thatTask.getTaskID().getId(), s);
+          }
+
+
         }
 
       } catch (Exception e) {
@@ -344,8 +312,13 @@
   }
 
   @Override
-  public void close() throws InterruptedException {
-    zk.close();
+  public void close() throws IOException {
+    try{
+      zk.close();
+    }
+    catch(InterruptedException e){
+      throw new IOException(e);
+    }
   }
 
   @Override
@@ -379,36 +352,6 @@
     return peerAddress.getHostName() + ":" + peerAddress.getPort();
   }
 
-  private String getNodeName(TaskAttemptID taskId, long superstep) {
-    return bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/"
-        + taskId.toString();
-  }
-
-  private void createZnode(final String path) throws KeeperException,
-      InterruptedException {
-    createZnode(path, CreateMode.PERSISTENT);
-  }
-
-  private void createEphemeralZnode(final String path) throws KeeperException,
-      InterruptedException {
-    createZnode(path, CreateMode.EPHEMERAL);
-  }
-
-  private void createZnode(final String path, final CreateMode mode)
-      throws KeeperException, InterruptedException {
-    synchronized (zk) {
-      Stat s = zk.exists(path, false);
-      if (null == s) {
-        try {
-          zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
-        } catch (KeeperException.NodeExistsException nee) {
-          LOG.debug("Ignore because znode may be already created at " + path,
-              nee);
-        }
-      }
-    }
-  }
-
   /*
    * INNER CLASSES
    */
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java
new file mode 100644
index 0000000..28e914a
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/BSPResource.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>BSPResource</code> defines a resource entity that is used as a factor
+ * when allocating tasks to groom servers.
+ */
+public abstract class BSPResource {
+
+  /**
+   * Returns the grooms on which this resource is available locally, or that
+   * are otherwise the best choice for the task.
+   * 
+   * @param tip The <code>TaskInProgress</code> representing the task to
+   *          schedule.
+   * @return The list of groomserver host names.
+   */
+  public abstract String[] getGrooms(TaskInProgress tip);
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
new file mode 100644
index 0000000..a132cb6
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/BestEffortDataLocalTaskAllocator.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>BestEffortDataLocalTaskAllocator</code> is a simple task allocator that
+ * considers only data locality when allocating tasks. It makes a best-effort
+ * attempt to schedule a task on a groom server that holds the task's input
+ * split; if that is not possible, it falls back to any other available groom.
+ */
+public class BestEffortDataLocalTaskAllocator implements TaskAllocationStrategy {
+
+  Log LOG = LogFactory.getLog(BestEffortDataLocalTaskAllocator.class);
+
+  @Override
+  public void initialize(Configuration conf) {
+  }
+
+  /**
+   * Returns the first groom that has a slot to schedule a task on.
+   * 
+   * @param grooms Map of groom host name to <code>GroomServerStatus</code>.
+   * @param tasksInGroomMap Map of the number of tasks currently assigned to
+   *          each groom.
+   * @return The host name of a groom with a free slot, or null if none exists.
+   */
+  private String getAnyGroomToSchedule(Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap) {
+
+    Iterator<String> groomIter = grooms.keySet().iterator();
+    while (groomIter.hasNext()) {
+      GroomServerStatus groom = grooms.get(groomIter.next());
+      if (groom == null)
+        continue;
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+      if (taskInGroom < groom.getMaxTasks()) {
+        return groom.getGroomHostName();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * From the set of grooms given, returns a groom on which the task can be
+   * scheduled.
+   * 
+   * @param grooms Map of groom host name to <code>GroomServerStatus</code>.
+   * @param tasksInGroomMap Map of the number of tasks currently assigned to
+   *          each groom.
+   * @param possibleLocations Host names of the grooms to consider.
+   * @return The host name of a suitable groom, or null if none exists.
+   */
+  private String getGroomToSchedule(Map<String, GroomServerStatus> grooms,
+      Map<GroomServerStatus, Integer> tasksInGroomMap,
+      String[] possibleLocations) {
+
+    for (int i = 0; i < possibleLocations.length; ++i) {
+      String location = possibleLocations[i];
+      GroomServerStatus groom = grooms.get(location);
+      if (groom == null)
+        continue;
+      Integer taskInGroom = tasksInGroomMap.get(groom);
+      taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+      if (taskInGroom < groom.getMaxTasks()
+          && location.equals(groom.getGroomHostName())) {
+        return groom.getGroomHostName();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public GroomServerStatus getGroomToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask())
+      return null;
+
+    String groomName = null;
+    if (selectedGrooms != null) {
+      groomName = getGroomToSchedule(groomStatuses, taskCountInGroomMap,
+          selectedGrooms);
+    }
+
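+    // If no data-local groom was found (or none was requested), fall back to
+    // any groom that still has a free task slot.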
+    if (groomName == null) {
+      groomName = getAnyGroomToSchedule(groomStatuses, taskCountInGroomMap);
+    }
+
+    if (groomName != null) {
+      return groomStatuses.get(groomName);
+    }
+
+    return null;
+  }
+
+  /**
+   * Selects grooms that have the task's data block stored locally.
+   */
+  @Override
+  public String[] selectGrooms(Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    if (!taskInProgress.canStartTask()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cannot start task based on id");
+      }
+      return new String[0];
+    }
+
+    RawSplit rawSplit = taskInProgress.getFileSplit();
+    if (rawSplit != null) {
+      return rawSplit.getLocations();
+    }
+    return null;
+  }
+
+  /**
+   * This operation is not supported.
+   */
+  @Override
+  public Set<GroomServerStatus> getGroomsToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress) {
+    throw new UnsupportedOperationException(
+        "This API is not supported for the called API function call.");
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java
new file mode 100644
index 0000000..3006a94
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/RawSplitResource.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>RawSplitResource</code> defines the data block resource that is used to
+ * determine which grooms offer data locality for a task.
+ */
+public class RawSplitResource extends BSPResource{
+
+  private RawSplit split;
+  
+  public RawSplitResource(){
+    
+  }
+  
+  /**
+   * Initialize the resource with data block split information.
+   * @param split The data split provided by <code>BSPJobClient</code>.
+   */
+  public RawSplitResource(RawSplit split){
+    this.split = split;
+  }
+  
+  @Override
+  public String[] getGrooms(TaskInProgress tip) {
+    return split.getLocations();
+  }
+
+}
diff --git a/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java b/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java
new file mode 100644
index 0000000..1f15607
--- /dev/null
+++ b/core/src/main/java/org/apache/hama/bsp/taskallocation/TaskAllocationStrategy.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.taskallocation;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.TaskInProgress;
+
+/**
+ * <code>TaskAllocationStrategy</code> defines how a Hama job allocates its
+ * tasks to the GroomServers in the cluster. <code>selectGrooms</code> defines
+ * the strategy for selecting candidate grooms; the resulting list can then be
+ * passed to <code>getGroomToAllocate</code> or <code>getGroomsToAllocate</code>
+ * through their <code>selectedGrooms</code> parameter. Those two functions do
+ * not select grooms themselves because the caller may invoke them with any
+ * other restricted set of grooms.
+ * 
+ */
+
+@Unstable
+public interface TaskAllocationStrategy {
+
+  /**
+   * Initializes the <code>TaskAllocationStrategy</code> instance.
+   * 
+   * @param conf Hama configuration
+   */
+  public abstract void initialize(Configuration conf);
+
+  /**
+   * Defines the task-allocation strategy for selecting grooms based on the
+   * resource constraints and any task-related restrictions. This function can
+   * be used to populate the <code>selectedGrooms</code> argument of
+   * <code>getGroomToAllocate</code> and <code>getGroomsToAllocate</code>.
+   * 
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The resources available to the job.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return An array of host names on which the task could be allocated.
+   */
+  @Unstable
+  public abstract String[] selectGrooms(
+      Map<String, GroomServerStatus> groomStatuses,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+
+  /**
+   * Returns the best groom to run the task on, based on the set of grooms
+   * provided. Implementations may populate the <code>selectedGrooms</code>
+   * value using <code>selectGrooms</code>.
+   * 
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param selectedGrooms An array of selected groom host-names to select from.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The resources available to the job.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return The <code>GroomServerStatus</code> of the selected groom, or null
+   *         if no groom could be found.
+   */
+  @Unstable
+  public abstract GroomServerStatus getGroomToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+
+  /**
+   * Returns the best grooms to run the task on, based on the set of grooms
+   * provided. Implementations may populate the <code>selectedGrooms</code>
+   * value using <code>selectGrooms</code>.
+   * 
+   * @param groomStatuses The map of groom-name to
+   *          <code>GroomServerStatus</code> object for all known grooms.
+   * @param selectedGrooms An array of selected groom host-names to select from.
+   * @param taskCountInGroomMap Map of count of tasks in groom (To be deprecated
+   *          soon)
+   * @param resources The resources available to the job.
+   * @param taskInProgress The <code>TaskInProgress</code> object for the task.
+   * @return The <code>GroomServerStatus</code> set of the selected grooms, or
+   *         null if no groom could be found.
+   */
+  @Unstable
+  public abstract Set<GroomServerStatus> getGroomsToAllocate(
+      Map<String, GroomServerStatus> groomStatuses, String[] selectedGrooms,
+      Map<GroomServerStatus, Integer> taskCountInGroomMap,
+      BSPResource[] resources, TaskInProgress taskInProgress);
+}
diff --git a/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java b/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
index 80e3578..3cc1b4a 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
@@ -316,7 +316,7 @@
       HamaConfiguration hamaConf = new HamaConfiguration();
       hamaConf.setInt(Constants.GROOM_PING_PERIOD, 200);
       hamaConf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
-      hamaConf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+      hamaConf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
           LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
 
       hamaConf.setInt("bsp.master.port", 610002);
@@ -421,7 +421,7 @@
 
     conf.setInt(Constants.GROOM_PING_PERIOD, 200);
     conf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
-    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+    conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
         LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
 
     int testNumber = incrementTestNumber();
diff --git a/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java b/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
index e756638..e34b05b 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
@@ -17,7 +17,18 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import junit.framework.TestCase;
 
@@ -25,19 +36,30 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.TestBSPTaskFaults.MinimalGroomServer;
-import org.apache.hama.bsp.message.type.ByteMessage;
-import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.Counters.Counter;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.FaultTolerantPeerService;
+import org.apache.hama.bsp.message.MessageEventListener;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.MessageQueue;
+import org.apache.hama.bsp.sync.BSPPeerSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.sync.SyncEvent;
+import org.apache.hama.bsp.sync.SyncEventListener;
+import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
-import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.util.KeyValuePair;
 
 public class TestCheckpoint extends TestCase {
 
@@ -45,130 +67,578 @@
 
   static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public void testCheckpoint() throws Exception {
-    Configuration config = new Configuration();
-    config.set(SyncServiceFactory.SYNC_CLIENT_CLASS,
-        LocalBSPRunner.LocalSyncClient.class.getName());
-    config.set("bsp.output.dir", "/tmp/hama-test_out");
-    FileSystem dfs = FileSystem.get(config);
+  public static class TestMessageManager implements MessageManager<Text> {
 
-    BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
-    bspTask.setCurrentTaskStatus(new TaskStatus(new BSPJobID(),
-        new TaskAttemptID(), 1.0f, TaskStatus.State.RUNNING, "running",
-        "127.0.0.1", TaskStatus.Phase.STARTING, new Counters()));
-    assertNotNull("BSPPeerImpl should not be null.", bspTask);
-    if (dfs.mkdirs(new Path("checkpoint"))) {
-      if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001"))) {
-        if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001/0")))
-          ;
+    List<Text> messageQueue = new ArrayList<Text>();
+    BSPMessageBundle<Text> loopbackBundle = new BSPMessageBundle<Text>();
+    Iterator<Text> iter = null;
+    MessageEventListener<Text> listener;
+
+    @Override
+    public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer,
+        Configuration conf, InetSocketAddress peerAddress) {
+      // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void close() {
+      // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public Text getCurrentMessage() throws IOException {
+      if (iter == null)
+        iter = this.messageQueue.iterator();
+      if (iter.hasNext())
+        return iter.next();
+      return null;
+    }
+
+    @Override
+    public void send(String peerName, Text msg) throws IOException {
+    }
+
+    @Override
+    public void finishSendPhase() throws IOException {
+    }
+
+    @Override
+    public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() {
+      return null;
+    }
+
+    @Override
+    public void transfer(InetSocketAddress addr, BSPMessageBundle<Text> bundle)
+        throws IOException {
+      // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void clearOutgoingQueues() {
+    }
+
+    @Override
+    public int getNumCurrentMessages() {
+      return this.messageQueue.size();
+    }
+
+    public BSPMessageBundle<Text> getLoopbackBundle() {
+      return this.loopbackBundle;
+    }
+
+    public void addMessage(Text message) throws IOException {
+      this.messageQueue.add(message);
+      listener.onMessageReceived(message);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) {
+      this.loopbackBundle = (BSPMessageBundle<Text>) bundle;
+    }
+
+    @Override
+    public void loopBackMessage(Writable message) {
+    }
+
+    @Override
+    public void registerListener(MessageEventListener<Text> listener)
+        throws IOException {
+      this.listener = listener;
+    }
+
+  }
+
+  public static class TestBSPPeer implements
+      BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> {
+
+    Configuration conf;
+    long superstepCount;
+    FaultTolerantPeerService<Text> fService;
+
+    public TestBSPPeer(BSPJob job, Configuration conf, TaskAttemptID taskId,
+        Counters counters, long superstep, BSPPeerSyncClient syncClient,
+        MessageManager<Text> messenger, TaskStatus.State state) {
+      this.conf = conf;
+      if (superstep > 0)
+        superstepCount = superstep;
+      else
+        superstepCount = 0L;
+
+      try {
+        fService = (new AsyncRcvdMsgCheckpointImpl<Text>()).constructPeerFaultTolerance(
+            job, (BSPPeer<?, ?, ?, ?, Text>) this,
+            (BSPPeerSyncClient) syncClient, null, taskId, superstep, conf,
+            messenger);
+        this.fService.onPeerInitialized(state);
+      } catch (Exception e) {
+        e.printStackTrace();
       }
     }
-    assertTrue("Make sure directory is created.",
-        dfs.exists(new Path(checkpointedDir)));
-    byte[] tmpData = "data".getBytes();
-    BSPMessageBundle bundle = new BSPMessageBundle();
-    bundle.addMessage(new ByteMessage("abc".getBytes(), tmpData));
-    assertNotNull("Message bundle can not be null.", bundle);
-    assertNotNull("Configuration should not be null.", config);
-    bspTask.checkpoint(checkpointedDir + "/attempt_201110302255_0001_000000_0",
-        bundle);
-    FSDataInputStream in = dfs.open(new Path(checkpointedDir
-        + "/attempt_201110302255_0001_000000_0"));
-    BSPMessageBundle bundleRead = new BSPMessageBundle();
-    bundleRead.readFields(in);
-    in.close();
-    ByteMessage byteMsg = (ByteMessage) (bundleRead.getMessages()).get(0);
-    String content = new String(byteMsg.getData());
-    LOG.info("Saved checkpointed content is " + content);
-    assertTrue("Message content should be the same.", "data".equals(content));
-    dfs.delete(new Path("checkpoint"), true);
+
+    @Override
+    public void send(String peerName, Text msg) throws IOException {
+    }
+
+    @Override
+    public Text getCurrentMessage() throws IOException {
+      return new Text("data");
+    }
+
+    @Override
+    public int getNumCurrentMessages() {
+      return 1;
+    }
+
+    @Override
+    public void sync() throws IOException, SyncException, InterruptedException {
+      ++superstepCount;
+      try {
+        this.fService.afterBarrier();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+      LOG.info("After barrier " + superstepCount);
+    }
+
+    @Override
+    public long getSuperstepCount() {
+      return superstepCount;
+    }
+
+    @Override
+    public String getPeerName() {
+      return null;
+    }
+
+    @Override
+    public String getPeerName(int index) {
+      return null;
+    }
+
+    @Override
+    public int getPeerIndex() {
+      return 1;
+    }
+
+    @Override
+    public String[] getAllPeerNames() {
+      return null;
+    }
+
+    @Override
+    public int getNumPeers() {
+      return 0;
+    }
+
+    @Override
+    public void clear() {
+
+    }
+
+    @Override
+    public void write(NullWritable key, NullWritable value) throws IOException {
+
+    }
+
+    @Override
+    public boolean readNext(NullWritable key, NullWritable value)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public KeyValuePair<NullWritable, NullWritable> readNext()
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public void reopenInput() throws IOException {
+
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return null;
+    }
+
+    @Override
+    public void incrementCounter(Enum<?> key, long amount) {
+
+    }
+
+    @Override
+    public void incrementCounter(String group, String counter, long amount) {
+
+    }
+
+  }
+
+  public static class TempSyncClient extends BSPPeerSyncClient {
+
+    Map<String, Writable> valueMap = new HashMap<String, Writable>();
+
+    @Override
+    public String constructKey(BSPJobID jobId, String... args) {
+      StringBuffer buffer = new StringBuffer(100);
+      buffer.append(jobId.toString()).append("/");
+      for (String arg : args) {
+        buffer.append(arg).append("/");
+      }
+      return buffer.toString();
+    }
+
+    @Override
+    public boolean storeInformation(String key, Writable value,
+        boolean permanent, SyncEventListener listener) {
+      ArrayWritable writables = (ArrayWritable) value;
+      long step = ((LongWritable) writables.get()[0]).get();
+      long count = ((LongWritable) writables.get()[1]).get();
+
+      LOG.info("SyncClient Storing value step = " + step + " count = " + count
+          + " for key " + key);
+      valueMap.put(key, value);
+      return true;
+    }
+
+    @Override
+    public boolean getInformation(String key,
+        Writable valueHolder) {
+      LOG.info("Getting value for key " + key);
+      if(!valueMap.containsKey(key)){
+        return false;
+      }
+      Writable value =  valueMap.get(key);
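+      // Copy the stored value into the caller's holder via a serialization
+      // round trip, as a real sync client would when returning remote data.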
+      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+      DataOutputStream outputStream = new DataOutputStream(byteStream);
+      byte[] data = null;
+      try {
+        value.write(outputStream);
+        outputStream.flush();
+        data = byteStream.toByteArray();
+        ByteArrayInputStream istream = new ByteArrayInputStream(data);
+        DataInputStream diStream = new DataInputStream(istream);
+        valueHolder.readFields(diStream);
+        return true;
+      } catch (IOException e) {
+        LOG.error("Error writing data to write buffer.", e);
+      } finally {
+        try {
+          byteStream.close();
+          outputStream.close();
+        } catch (IOException e) {
+          LOG.error("Error closing byte stream.", e);
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public boolean addKey(String key, boolean permanent,
+        SyncEventListener listener) {
+      valueMap.put(key, NullWritable.get());
+      return true;
+    }
+
+    @Override
+    public boolean hasKey(String key) {
+      return valueMap.containsKey(key);
+    }
+
+    @Override
+    public String[] getChildKeySet(String key, SyncEventListener listener) {
+      List<String> list = new ArrayList<String>();
+      Iterator<String> keyIter = valueMap.keySet().iterator();
+      while (keyIter.hasNext()) {
+        String keyVal = keyIter.next();
+        if (keyVal.startsWith(key + "/")) {
+          list.add(keyVal);
+        }
+      }
+      String[] arr = new String[list.size()];
+      list.toArray(arr);
+      return arr;
+    }
+
+    @Override
+    public boolean registerListener(String key, SyncEvent event,
+        SyncEventListener listener) {
+      return false;
+    }
+
+    @Override
+    public boolean remove(String key, SyncEventListener listener) {
+      valueMap.remove(key);
+      return false;
+    }
+
+    @Override
+    public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+        throws Exception {
+    }
+
+    @Override
+    public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        long superstep) throws SyncException {
+      LOG.info("Enter barrier called - " + superstep);
+    }
+
+    @Override
+    public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        long superstep) throws SyncException {
+      LOG.info("Exit barrier called - " + superstep);
+    }
+
+    @Override
+    public void register(BSPJobID jobId, TaskAttemptID taskId,
+        String hostAddress, long port) {
+    }
+
+    @Override
+    public String[] getAllPeerNames(TaskAttemptID taskId) {
+      return null;
+    }
+
+    @Override
+    public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        String hostAddress, long port) {
+    }
+
+    @Override
+    public void stopServer() {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+  }
+
+  private void checkSuperstepMsgCount(PeerSyncClient syncClient,
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspTask, BSPJob job, long step, long count) {
+    
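+    // The fault-tolerance service is expected to store an ArrayWritable pair
+    // of {superstep, message count} under the task's "checkpoint" key.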
+    ArrayWritable writableVal = new ArrayWritable(LongWritable.class);
+    
+    boolean result = syncClient.getInformation(
+        syncClient.constructKey(job.getJobID(), "checkpoint",
+            "" + bspTask.getPeerIndex()), writableVal);
+    
+    assertTrue(result);
+
+    LongWritable superstepNo = (LongWritable) writableVal.get()[0]; 
+    LongWritable msgCount = (LongWritable) writableVal.get()[1];
+
+    assertEquals(step, superstepNo.get());
+    assertEquals(count, msgCount.get());
   }
 
   public void testCheckpointInterval() throws Exception {
+    Configuration config = new Configuration();
+    System.setProperty("user.dir", "/tmp");
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+        TempSyncClient.class.getName());
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    config.setInt(Constants.CHECKPOINT_INTERVAL, 2);
+    config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
 
-    Configuration conf = new Configuration();
-    conf.set("bsp.output.dir", "/tmp/hama-test_out");
-    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
-        LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
+    FileSystem dfs = FileSystem.get(config);
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
 
-    conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);
+    TestMessageManager messenger = new TestMessageManager();
+    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+        .getPeerSyncClient(config);
+    @SuppressWarnings("rawtypes")
+    BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
+        (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
 
-    int port = BSPNetUtils.getFreePort(5000);
-    InetSocketAddress inetAddress = new InetSocketAddress(port);
-    MinimalGroomServer groom = new MinimalGroomServer(conf);
-    Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),
-        inetAddress.getPort(), conf);
-    workerServer.start();
+    assertNotNull("BSPPeerImpl should not be null.", bspTask);
 
-    LOG.info("Started RPC server");
-    conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
-    conf.setInt("bsp.peers.num", 1);
+    LOG.info("Created bsp peer and other parameters");
+    int port = BSPNetUtils.getFreePort(12502);
+    LOG.info("Got port = " + port);
 
-    BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
-        BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress,
-        conf);
-    LOG.info("Started the proxy connections");
+    boolean result = syncClient.getInformation(
+            syncClient.constructKey(job.getJobID(), "checkpoint",
+                "" + bspTask.getPeerIndex()), new ArrayWritable(LongWritable.class));
 
-    TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
-        "job_201110102255", 1), 1), 1);
+    assertFalse(result);
 
-    try {
-      BSPJob job = new BSPJob(new HamaConfiguration(conf));
-      job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
-      job.setOutputFormat(TextOutputFormat.class);
-      final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
-          BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
-          new InetSocketAddress("127.0.0.1", port), conf);
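+    // With the checkpoint interval set to 2, the recorded {superstep, message
+    // count} pair is expected to advance only on alternate supersteps, which
+    // the assertions below verify.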
+    bspTask.sync();
+    // Superstep 1
+  
+    checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
 
-      BSPTask task = new BSPTask();
-      task.setConf(job);
+    Text txtMessage = new Text("data");
+    messenger.addMessage(txtMessage);
 
-      @SuppressWarnings("rawtypes")
-      BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, tid,
-          proto, 0, null, null, new Counters());
+    bspTask.sync();
+    // Superstep 2
 
-      bspPeer.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), tid, 1.0f,
-          TaskStatus.State.RUNNING, "running", "127.0.0.1",
-          TaskStatus.Phase.STARTING, new Counters()));
+    checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
 
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
+    messenger.addMessage(txtMessage);
 
-      conf.setBoolean(Constants.CHECKPOINT_ENABLED, true);
-      conf.setInt(Constants.CHECKPOINT_INTERVAL, 3);
+    bspTask.sync();
+    // Superstep 3
 
-      bspPeer.sync();
+    checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);
 
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), true);
+    bspTask.sync();
+    // Superstep 4
 
-      job.setCheckPointInterval(5);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
+    checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);
 
-    } catch (Exception e) {
-      LOG.error("Error testing BSPPeer.", e);
-    } finally {
-      umbilical.close();
-      Thread.sleep(2000);
-      workerServer.stop();
-      Thread.sleep(2000);
-    }
+    messenger.addMessage(txtMessage);
+    messenger.addMessage(txtMessage);
 
+    bspTask.sync();
+    // Superstep 5
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);
+
+    bspTask.sync();
+    // Superstep 6
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);
+
+    dfs.delete(new Path("checkpoint"), true);
   }
+
+  @SuppressWarnings("rawtypes")
+  public void testCheckpoint() throws Exception {
+    Configuration config = new Configuration();
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+        TempSyncClient.class.getName());
+    config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    int port = BSPNetUtils.getFreePort(12502);
+    LOG.info("Got port = " + port);
+
+    config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+    config.setInt(Constants.PEER_PORT, port);
+
+    config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
+
+    FileSystem dfs = FileSystem.get(config);
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
+
+    TestMessageManager messenger = new TestMessageManager();
+    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+        .getPeerSyncClient(config);
+    BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
+        (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
+
+    assertNotNull("BSPPeerImpl should not be null.", bspTask);
+
+    LOG.info("Created bsp peer and other parameters");
+
+    @SuppressWarnings("unused")
+    FaultTolerantPeerService<Text> service = null;
+
+    bspTask.sync();
+    LOG.info("Completed first sync.");
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
+
+    Text txtMessage = new Text("data");
+    messenger.addMessage(txtMessage);
+    
+    bspTask.sync();
+
+    LOG.info("Completed second sync.");
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 2L, 1L);
+
+    // Checking the messages for superstep 2 and peer id 1
+    String expectedPath = "checkpoint/job_checkpttest_0001/2/1";
+    FSDataInputStream in = dfs.open(new Path(expectedPath));
+
+    String className = in.readUTF();
+    Text message = (Text) ReflectionUtils.newInstance(Class.forName(className),
+        config);
+    message.readFields(in);
+
+    assertEquals("data", message.toString());
+
+    dfs.delete(new Path("checkpoint"), true);
+  }
+
+  public void testPeerRecovery() throws Exception {
+    Configuration config = new Configuration();
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+        TempSyncClient.class.getName());
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    int port = BSPNetUtils.getFreePort(12502);
+    LOG.info("Got port = " + port);
+
+    config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+    config.setInt(Constants.PEER_PORT, port);
+
+    config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
+
+    FileSystem dfs = FileSystem.get(config);
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
+
+    TestMessageManager messenger = new TestMessageManager();
+    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+        .getPeerSyncClient(config);
+
+    Text txtMessage = new Text("data");
+    String writeKey = "job_checkpttest_0001/checkpoint/1/";
+
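+    // Pre-seed the sync client as if peer 1 had last checkpointed 5 messages
+    // at superstep 3.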
+    Writable[] writableArr = new Writable[2];
+    writableArr[0] = new LongWritable(3L);
+    writableArr[1] = new LongWritable(5L);
+    ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
+    arrWritable.set(writableArr);
+    syncClient.storeInformation(writeKey, arrWritable, true, null);
+
+    String writePath = "checkpoint/job_checkpttest_0001/3/1";
+    FSDataOutputStream out = dfs.create(new Path(writePath));
+    for (int i = 0; i < 5; ++i) {
+      out.writeUTF(txtMessage.getClass().getCanonicalName());
+      txtMessage.write(out);
+    }
+    out.close();
+
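+    // Constructing the peer in RECOVERING state should make the fault-tolerance
+    // service replay the checkpointed messages into the messenger's loopback
+    // bundle, which is verified below.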
+    @SuppressWarnings("unused")
+    BSPPeer<?, ?, ?, ?, Text> bspTask = new TestBSPPeer(job, config, taskId,
+        new Counters(), 3L, (BSPPeerSyncClient) syncClient, messenger,
+        TaskStatus.State.RECOVERING);
+
+    BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
+    assertEquals(5, bundleRead.getMessages().size());
+    String recoveredMsg = bundleRead.getMessages().get(0).toString();
+    assertEquals(recoveredMsg, "data");
+    dfs.delete(new Path("checkpoint"), true);
+  }
+
 }
diff --git a/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java
new file mode 100644
index 0000000..4a20dba
--- /dev/null
+++ b/core/src/test/java/org/apache/hama/bsp/TestTaskAllocation.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
+import org.apache.hama.bsp.taskallocation.BSPResource;
+import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator;
+import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
+
+public class TestTaskAllocation extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestTaskAllocation.class);
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  public void testBestEffortDataLocality() throws Exception {
+
+    Configuration conf = new Configuration();
+
+    String[] locations = new String[] { "host6", "host4", "host3" };
+    String value = "data";
+    RawSplit split = new RawSplit();
+    split.setLocations(locations);
+    split.setBytes(value.getBytes(), 0, value.getBytes().length);
+    split.setDataLength(value.getBytes().length);
+
+    assertEquals(value.getBytes().length, (int) split.getDataLength());
+
+    Map<GroomServerStatus, Integer> taskCountInGroomMap = new HashMap<GroomServerStatus, Integer>(
+        20);
+    BSPResource[] resources = new BSPResource[0];
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    JobInProgress jobProgress = new JobInProgress(job.getJobID(), conf);
+    TaskInProgress taskInProgress = new TaskInProgress(job.getJobID(),
+        "job.xml", split, conf, jobProgress, 1);
+
+    Map<String, GroomServerStatus> groomStatuses = new HashMap<String, GroomServerStatus>(
+        20);
+
+    for (int i = 0; i < 10; ++i) {
+
+      String name = "host" + i;
+      GroomServerStatus status = new GroomServerStatus(name,
+          new ArrayList<TaskStatus>(), 0, 3);
+      groomStatuses.put(name, status);
+      taskCountInGroomMap.put(status, 0);
+
+    }
+
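+    // No allocator class is configured under the (empty) key, so ReflectionUtils
+    // instantiates the default BestEffortDataLocalTaskAllocator.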
+    TaskAllocationStrategy strategy = ReflectionUtils.newInstance(conf
+        .getClass("", BestEffortDataLocalTaskAllocator.class,
+            TaskAllocationStrategy.class), conf);
+
+    String[] hosts = strategy.selectGrooms(groomStatuses, taskCountInGroomMap,
+        resources, taskInProgress);
+
+    List<String> list = new ArrayList<String>();
+
+    for (int i = 0; i < hosts.length; ++i) {
+      list.add(hosts[i]);
+    }
+
+    assertTrue(list.contains("host6"));
+    assertTrue(list.contains("host3"));
+    assertTrue(list.contains("host4"));
+
+  }
+
+}
diff --git a/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java b/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
index 3e3afd5..9b68671 100644
--- a/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
+++ b/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
@@ -20,20 +20,22 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
-import java.util.ArrayList;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
+import org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl;
 import org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl;
 import org.apache.hama.util.BSPNetUtils;
-import org.apache.hama.zookeeper.QuorumPeer;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
+import org.mortbay.log.Log;
 
 public class TestZooKeeper extends TestCase {
 
@@ -41,6 +43,7 @@
 
   public TestZooKeeper() {
     configuration = new HamaConfiguration();
+    System.setProperty("user.dir", "/tmp");
     configuration.set("bsp.master.address", "localhost");
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         configuration.get("bsp.master.address"));
@@ -57,6 +60,7 @@
   public void testClearZKNodes() throws IOException, KeeperException,
       InterruptedException {
     final ZooKeeperSyncServerImpl server = new ZooKeeperSyncServerImpl();
+    boolean done = false;
     try {
       server.init(configuration);
       new Thread(new Runnable() {
@@ -71,55 +75,113 @@
         }
       }).start();
 
-      int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT,
-          6000);
-      String connectStr = QuorumPeer.getZKQuorumServersString(configuration);
-      String bspRoot = "/";
-      // Establishing a zk session.
-      ZooKeeper zk = new ZooKeeper(connectStr, timeout, null);
+      Thread.sleep(1000);
 
-      // Creating dummy bspRoot if it doesn't already exist.
-      Stat s = zk.exists(bspRoot, false);
-      if (s == null) {
-        zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-      }
+      String bspRoot = "/bsp";
 
-      // Creating dummy child nodes at depth 1.
-      String node1 = bspRoot + "task1";
-      String node2 = bspRoot + "task2";
-      zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      ZooKeeperSyncClientImpl peerClient = (ZooKeeperSyncClientImpl) SyncServiceFactory
+          .getPeerSyncClient(configuration);
 
-      // Creating dummy child node at depth 2.
-      String node11 = node1 + "superstep1";
-      zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      ZKSyncBSPMasterClient masterClient = (ZKSyncBSPMasterClient) SyncServiceFactory
+          .getMasterSyncClient(configuration);
 
-      ArrayList<String> list = (ArrayList<String>) zk.getChildren(bspRoot,
-          false);
-      assertEquals(2, list.size());
-      System.out.println(list.size());
+      masterClient.init(configuration);
 
-      // clear it
-      BSPMaster.clearZKNodes(zk, "/");
+      Thread.sleep(100);
 
-      list = (ArrayList<String>) zk.getChildren(bspRoot, false);
-      System.out.println(list.size());
-      assertEquals(0, list.size());
+      Log.info("Created master and client sync clients");
 
-      try {
-        zk.getData(node11, false, null);
-        fail();
-      } catch (KeeperException.NoNodeException e) {
-        System.out.println("Node has been removed correctly!");
-      } finally {
-        zk.close();
-      }
+      assertTrue(masterClient.hasKey(bspRoot));
+
+      Log.info("BSP root exists");
+
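+      // Register a job with the master sync client and two task attempts with the peer sync client.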
+      BSPJobID jobID = new BSPJobID("test1", 1);
+      masterClient.registerJob(jobID.toString());
+      TaskID taskId1 = new TaskID(jobID, 1);
+      TaskID taskId2 = new TaskID(jobID, 2);
+
+      TaskAttemptID task1 = new TaskAttemptID(taskId1, 1);
+      TaskAttemptID task2 = new TaskAttemptID(taskId2, 1);
+
+      int zkPort = BSPNetUtils.getFreePort(21815);
+      configuration.setInt(Constants.PEER_PORT, zkPort);
+      peerClient.init(configuration, jobID, task1);
+
+      peerClient.registerTask(jobID, "hamanode1", 5000L, task1);
+      peerClient.registerTask(jobID, "hamanode2", 5000L, task2);
+
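+      // Store a value under the job's info/level2 key, then verify both registered peers are visible.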
+      peerClient.storeInformation(
+          peerClient.constructKey(jobID, "info", "level2"), new IntWritable(5),
+          true, null);
+
+      String[] names = peerClient.getAllPeerNames(task1);
+
+      Log.info("Found child count = " + names.length);
+
+      assertEquals(2, names.length);
+
+      Log.info("Passed the child count test");
+
+      masterClient.addKey(masterClient.constructKey(jobID, "peer", "1"),
+          true, null);
+      masterClient.addKey(masterClient.constructKey(jobID, "peer", "2"),
+          true, null);
+
+      String[] peerChild = masterClient.getChildKeySet(
+          masterClient.constructKey(jobID, "peer"), null);
+      Log.info("Found child count = " + peerChild.length);
+
+      assertEquals(2, peerChild.length);
+
+      Log.info(" Peer name " + peerChild[0]);
+      Log.info(" Peer name " + peerChild[1]);
+
+      Log.info("Passed the child key set test");
+
+      masterClient.deregisterJob(jobID.toString());
+      Log.info(masterClient.constructKey(jobID));
+
+      Thread.sleep(200);
+
+      assertFalse(masterClient.hasKey(masterClient.constructKey(jobID)));
+
+      Log.info("Passed the key presence test");
+
+      boolean result = masterClient
+          .getInformation(masterClient.constructKey(jobID, "info", "level3"),
+              new IntWritable());
+
+      assertFalse(result);
+      
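+      // Store an ArrayWritable of two LongWritables and read it back to check that composite values round-trip.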
+      Writable[] writableArr = new Writable[2];
+      writableArr[0] = new LongWritable(3L);
+      writableArr[1] = new LongWritable(5L);
+      ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
+      arrWritable.set(writableArr);
+      masterClient.storeInformation(
+          masterClient.constructKey(jobID, "info", "level3"), 
+          arrWritable, true, null);
+      
+      ArrayWritable valueHolder = new ArrayWritable(LongWritable.class);
+      
+      boolean getResult = masterClient.getInformation(
+          masterClient.constructKey(jobID, "info", "level3"), valueHolder);
+      
+      assertTrue(getResult);
+      
+      assertEquals(arrWritable.get()[0], valueHolder.get()[0]);
+      assertEquals(arrWritable.get()[1], valueHolder.get()[1]);
+      
+      Log.info("Passed array writable test");
+      done = true;
+
     } catch (Exception e) {
       e.printStackTrace();
+
     } finally {
       server.stopServer();
     }
+    assertTrue(done);
   }
 
 }
diff --git a/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java b/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
index b6ce89e..22ed266 100644
--- a/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
+++ b/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
@@ -17,20 +17,64 @@
  */
 package org.apache.hama.bsp.sync;
 
+import java.util.concurrent.Executors;
+
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskID;
+import org.apache.hama.util.BSPNetUtils;
 
 public class TestSyncServiceFactory extends TestCase {
 
+  public static final Log LOG = LogFactory.getLog(TestSyncServiceFactory.class);
+
+  public static class ListenerTest extends ZKSyncEventListener {
+
+    private Text value;
+
+    public ListenerTest() {
+      value = new Text("init");
+    }
+
+    public String getValue() {
+      return value.toString();
+    }
+
+    @Override
+    public void onDelete() {
+
+    }
+
+    @Override
+    public void onChange() {
+      LOG.info("ZK value changed event triggered.");
+      value.set("Changed");
+
+    }
+
+    @Override
+    public void onChildKeySetChange() {
+
+    }
+
+  }
+
   public void testClientInstantiation() throws Exception {
 
     Configuration conf = new Configuration();
     // given null, should return zookeeper
-    SyncClient syncClient = SyncServiceFactory.getSyncClient(conf);
+    PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(conf);
     assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
   }
-  
+
   public void testServerInstantiation() throws Exception {
 
     Configuration conf = new Configuration();
@@ -39,4 +83,104 @@
     assertTrue(syncServer instanceof ZooKeeperSyncServerImpl);
   }
 
+  private static class ZKServerThread implements Runnable {
+
+    SyncServer server;
+
+    ZKServerThread(SyncServer s) {
+      server = s;
+    }
+
+    @Override
+    public void run() {
+      try {
+        server.start();
+      } catch (Exception e) {
+        LOG.error("Error running server.", e);
+      }
+    }
+
+  }
+
+  public void testZKSyncStore() throws Exception {
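+    // Starts an in-process ZooKeeper sync server, then exercises store/get and value-change listeners through a peer sync client.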
+    Configuration conf = new Configuration();
+    int zkPort = BSPNetUtils.getFreePort(21811);
+    conf.set("bsp.local.dir", "/tmp/hama-test");
+    conf.set("bsp.output.dir", "/tmp/hama-test_out");
+    conf.setInt(Constants.PEER_PORT, zkPort);
+    conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, zkPort);
+    conf.set(Constants.ZOOKEEPER_SESSION_TIMEOUT, "12000");
+    System.setProperty("user.dir", "/tmp");
+    // given null, should return zookeeper
+    final SyncServer syncServer = SyncServiceFactory.getSyncServer(conf);
+    syncServer.init(conf);
+    assertTrue(syncServer instanceof ZooKeeperSyncServerImpl);
+
+    ZKServerThread serverThread = new ZKServerThread(syncServer);
+    Executors.newFixedThreadPool(1).submit(serverThread);
+
+    Thread.sleep(1000);
+
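+    // Obtain a ZooKeeper-backed peer sync client and bind it to a job and task attempt id.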
+    final PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(conf);
+    assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
+    BSPJobID jobId = new BSPJobID("abc", 1);
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId, 1), 1);
+    syncClient.init(conf, jobId, taskId);
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        try {
+          syncServer.stopServer();
+
+        } catch (Exception e) {
+          // too late to log!
+        }
+      }
+    });
+
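+    // Store an IntWritable, register a value-change listener on the same key, and read the value back.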
+    IntWritable data = new IntWritable(5);
+    syncClient.storeInformation(
+        syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, true,
+        null);
+
+    ListenerTest listenerTest = new ListenerTest();
+
+    syncClient.registerListener(
+        syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+        ZKSyncEventFactory.getValueChangeEvent(), listenerTest);
+
+    IntWritable valueHolder = new IntWritable();
+    boolean result = syncClient
+        .getInformation(
+            syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+            valueHolder);
+    assertTrue(result);
+    int intVal = valueHolder.get();
+    assertEquals(data.get(), intVal);
+
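+    // Update the stored value; the listener registered above should observe the change and flip its state to "Changed".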
+    data.set(6);
+    syncClient.storeInformation(
+        syncClient.constructKey(jobId, String.valueOf(1L), "test"), data, true,
+        null);
+    valueHolder = new IntWritable();
+    result = syncClient
+        .getInformation(
+            syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+            valueHolder);
+
+    assertTrue(result);
+    intVal = valueHolder.get();
+    assertEquals(data.get(), intVal);
+
+    Thread.sleep(5000);
+
+    assertEquals("Changed", listenerTest.getValue());
+
+    syncServer.stopServer();
+
+  }
+
 }