GIRAPH-1139

closes #30
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 976997f..c3fd141 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -188,12 +188,12 @@
   private long cachedApplicationAttempt = UNSET_APPLICATION_ATTEMPT;
   /** Job id, to ensure uniqueness */
   private final String jobId;
-  /** Task partition, to ensure uniqueness */
-  private final int taskPartition;
+  /** Task id, from partition and application attempt to ensure uniqueness */
+  private final int taskId;
   /** My hostname */
   private final String hostname;
-  /** Combination of hostname '_' partition (unique id) */
-  private final String hostnamePartitionId;
+  /** Combination of hostname '_' task (unique id) */
+  private final String hostnameTaskId;
   /** Graph partitioner */
   private final GraphPartitionerFactory<I, V, E> graphPartitionerFactory;
   /** Mapper that will do the graph computation */
@@ -231,8 +231,8 @@
     this.context = context;
     this.graphTaskManager = graphTaskManager;
     this.conf = graphTaskManager.getConf();
+
     this.jobId = conf.getJobId();
-    this.taskPartition = conf.getTaskPartition();
     this.restartedSuperstep = conf.getLong(
         GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP);
     try {
@@ -240,7 +240,6 @@
     } catch (UnknownHostException e) {
       throw new RuntimeException(e);
     }
-    this.hostnamePartitionId = hostname + "_" + getTaskPartition();
     this.graphPartitionerFactory = conf.createGraphPartitioner();
 
     basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
@@ -252,6 +251,8 @@
     applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
     cleanedUpPath = basePath + CLEANED_UP_DIR;
 
+
+
     String restartJobId = RESTART_JOB_ID.get(conf);
 
     savedCheckpointBasePath =
@@ -272,7 +273,7 @@
     }
     if (LOG.isInfoEnabled()) {
       LOG.info("BspService: Connecting to ZooKeeper with job " + jobId +
-          ", " + getTaskPartition() + " on " + serverPortList);
+          ", partition " + conf.getTaskPartition() + " on " + serverPortList);
     }
     try {
       this.zk = new ZooKeeperExt(serverPortList,
@@ -288,6 +289,10 @@
       throw new RuntimeException(e);
     }
 
+    this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() +
+            conf.getTaskPartition();
+    this.hostnameTaskId = hostname + "_" + getTaskId();
+
     //Trying to restart from the latest superstep
     if (restartJobId != null &&
         restartedSuperstep == UNSET_SUPERSTEP) {
@@ -529,12 +534,12 @@
     return hostname;
   }
 
-  public final String getHostnamePartitionId() {
-    return hostnamePartitionId;
+  public final String getHostnameTaskId() {
+    return hostnameTaskId;
   }
 
-  public final int getTaskPartition() {
-    return taskPartition;
+  public final int getTaskId() {
+    return taskId;
   }
 
   public final GraphTaskManager<I, V, E> getGraphTaskManager() {
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 779bccb..d1dc79d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -579,15 +579,15 @@
     if (LOG.isInfoEnabled()) {
       Set<Integer> partitionSet = new TreeSet<Integer>();
       for (WorkerInfo workerInfo : healthyWorkerInfoList) {
-        partitionSet.add(workerInfo.getTaskId());
+        partitionSet.add(workerInfo.getTaskId() % maxWorkers);
       }
       for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
-        partitionSet.add(workerInfo.getTaskId());
+        partitionSet.add(workerInfo.getTaskId() % maxWorkers);
       }
       for (int i = 1; i <= maxWorkers; ++i) {
         if (partitionSet.contains(Integer.valueOf(i))) {
           continue;
-        } else if (i == getTaskPartition()) {
+        } else if (i == getTaskId() % maxWorkers) {
           continue;
         } else {
           LOG.info("logMissingWorkersOnSuperstep: No response from " +
@@ -802,7 +802,7 @@
     try {
       myBid =
           getZkExt().createExt(masterElectionPath +
-              "/" + getHostnamePartitionId(),
+              "/" + getHostnameTaskId(),
               null,
               Ids.OPEN_ACL_UNSAFE,
               CreateMode.EPHEMERAL_SEQUENTIAL,
@@ -841,7 +841,7 @@
         }
         if (masterChildArr.get(0).equals(myBid)) {
           GiraphStats.getInstance().getCurrentMasterTaskPartition().
-              setValue(getTaskPartition());
+              setValue(getTaskId());
 
           globalCommHandler = new MasterGlobalCommHandler(
               new MasterAggregatorHandler(getConfiguration(), getContext()),
@@ -860,7 +860,7 @@
                   getGraphTaskManager().createUncaughtExceptionHandler());
           masterInfo.setInetSocketAddress(masterServer.getMyAddress(),
               masterServer.getLocalHostOrIp());
-          masterInfo.setTaskId(getTaskPartition());
+          masterInfo.setTaskId(getTaskId());
           masterClient =
               new NettyMasterClient(getContext(), getConfiguration(), this,
                   getGraphTaskManager().createUncaughtExceptionHandler());
@@ -1211,6 +1211,7 @@
     setApplicationAttempt(getApplicationAttempt() + 1);
     setCachedSuperstep(checkpoint);
     setRestartedSuperstep(checkpoint);
+    checkpointStatus = CheckpointStatus.NONE;
     setJobState(ApplicationState.START_SUPERSTEP,
         getApplicationAttempt(),
         checkpoint);
@@ -1740,7 +1741,7 @@
     if (checkpointFrequency == 0) {
       return CheckpointStatus.NONE;
     }
-    long firstCheckpoint = INPUT_SUPERSTEP + 1 + checkpointFrequency;
+    long firstCheckpoint = INPUT_SUPERSTEP + 1;
     if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
       firstCheckpoint = getRestartedSuperstep() + checkpointFrequency;
     }
@@ -1912,7 +1913,7 @@
     // for workers and masters, the master will clean up the ZooKeeper
     // znodes associated with this job.
     String masterCleanedUpPath = cleanedUpPath  + "/" +
-        getTaskPartition() + MASTER_SUFFIX;
+        getTaskId() + MASTER_SUFFIX;
     try {
       String finalFinishedPath =
           getZkExt().createExt(masterCleanedUpPath,
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index b6b9c12..6f02749 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -216,7 +216,7 @@
         graphTaskManager.createUncaughtExceptionHandler());
     workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
         workerServer.getLocalHostOrIp());
-    workerInfo.setTaskId(getTaskPartition());
+    workerInfo.setTaskId(getTaskId());
     workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
         graphTaskManager.createUncaughtExceptionHandler());
     workerServer.setFlowControl(workerClient.getFlowControl());
@@ -243,7 +243,7 @@
     }
     observers = conf.createWorkerObservers(context);
 
-    WorkerProgress.get().setTaskId(getTaskPartition());
+    WorkerProgress.get().setTaskId(getTaskId());
     workerProgressWriter = conf.trackJobProgressOnClient() ?
         new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) :
         null;
@@ -921,7 +921,7 @@
 
     String finishedWorkerPath =
         getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
-        "/" + getHostnamePartitionId();
+        "/" + workerInfo.getHostnameId();
     try {
       getZkExt().createExt(finishedWorkerPath,
           workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()),
@@ -1202,7 +1202,7 @@
     // for workers and masters, the master will clean up the ZooKeeper
     // znodes associated with this job.
     String workerCleanedUpPath = cleanedUpPath  + "/" +
-        getTaskPartition() + WORKER_SUFFIX;
+        getTaskId() + WORKER_SUFFIX;
     try {
       String finalFinishedPath =
           getZkExt().createExt(workerCleanedUpPath,
@@ -1303,7 +1303,7 @@
     // Notify master that checkpoint is stored
     String workerWroteCheckpoint =
         getWorkerWroteCheckpointPath(getApplicationAttempt(),
-            getSuperstep()) + "/" + getHostnamePartitionId();
+            getSuperstep()) + "/" + workerInfo.getHostnameId();
     try {
       getZkExt().createExt(workerWroteCheckpoint,
           new byte[0],