[STREAMCOMP-2885] broadcast GCC completition (#3330)

* amend gitignore

* decouple GCC post-save logic from stateful controller

* tmaster broadcast checkpoint completion

the message will be first sent to all stmgrs, and then each stmgr will forward
it to every heron instance it connects to.

* heron instance handles global checkpoint saved msg

* expose necessary API for the 2PC support

unit tests for this will be in the following commit

* test: refactor mock physical plan to a builder

* test preSave and postSave hook

* [STREAMCOMP-2916] block execute on postSave

* add stateful tests for spouts

* test preRestore

* rename interface to ITwoPhaseStatefulComponent

* block spout from reading data tuples as well

* fix log format

* do not check interface

this is because the boolean will only be set to true for tasks that implement ITwoPhaseStatefulComponent. No need to do the check again.
diff --git a/.gitignore b/.gitignore
index 3b81340..c599612 100644
--- a/.gitignore
+++ b/.gitignore
@@ -90,6 +90,10 @@
 .idea/sqlDataSources.xml
 .idea/dynamic.xml
 
+# Intellij bazel plugin
+.ijwb
+.clwb
+
 ## Maven generated files
 .classpath.txt
 
@@ -134,6 +138,7 @@
 
 # Visual Studio Code
 .vscode
+vs.code-workspace
 
 # integration_test
 results/
diff --git a/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java b/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java
new file mode 100644
index 0000000..6dc88c3
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.api.topology;
+
+import java.io.Serializable;
+
+/**
+ * Defines a stateful component that is aware of Heron topology's "two-phase commit".
+ *
+ * Note tasks saving a distributed checkpoint would be the "prepare" phase of the two-phase commit
+ * algorithm. When a distributed checkpoint is done, we can say that all tasks agree that they
+ * will not roll back to the time before that distributed checkpoint, and the "prepare" phase is
+ * complete.
+ *
+ * When the "prepare" phase is complete, Heron will invoke the "postSave" hook to signal the
+ * beginning of the "commit" phase. If there is a failure occurred during the "prepare" phase,
+ * Heron will invoke the hook "preRestore" to signal that two-phase commit is aborted, and the
+ * topology will be rolled back to the previous checkpoint.
+ *
+ * Note that the commit phase will finish after the postSave hook exits successfully. Then, the
+ * prepare phase of the following checkpoint will begin.
+ *
+ * In addition, for two-phase stateful components specifically, Heron will not execute (for bolts)
+ * or produce (for spouts) tuples between preSave and postSave. This will guarantee that the prepare
+ * phase of the next checkpoint will not overlap with the commit phase of the current checkpoint
+ * (eg. we block execution of tuples from the next checkpoint unless commit phase is done).
+ *
+ * See the end-to-end effectively-once designed doc (linked in the PR of this commit) for more
+ * details.
+ */
+public interface ITwoPhaseStatefulComponent<K extends Serializable, V extends Serializable>
+    extends IStatefulComponent<K, V> {
+
+  /**
+   * This is a hook for the component to perform some actions after a checkpoint is persisted
+   * successfully for all components in the topology.
+   *
+   * @param checkpointId the ID of the checkpoint
+   */
+  void postSave(String checkpointId);
+
+  /**
+   * This is a hook for the component to perform some actions (eg. state clean-up) before the
+   * framework attempts to delete the component and restore it to a previously-saved checkpoint.
+   *
+   * @param checkpointId the ID of the checkpoint that the component is being restored to
+   */
+  void preRestore(String checkpointId);
+
+}
diff --git a/heron/instance/src/java/org/apache/heron/instance/IInstance.java b/heron/instance/src/java/org/apache/heron/instance/IInstance.java
index de7fbd0..9671a1a 100644
--- a/heron/instance/src/java/org/apache/heron/instance/IInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/IInstance.java
@@ -58,6 +58,21 @@
   void clean();
 
   /**
+   * Inform the Instance that the framework will clean, stop, and delete the instance
+   * in order to restore its state to a previously-saved checkpoint.
+   *
+   * @param checkpointId the ID of the checkpoint the instance will be restoring to
+   */
+  void preRestore(String checkpointId);
+
+  /**
+   * Inform the Instance that a particular checkpoint has become globally consistent
+   *
+   * @param checkpointId the ID of the checkpoint that became globally consistent
+   */
+  void onCheckpointSaved(String checkpointId);
+
+  /**
    * Destroy the whole IInstance.
    * Notice: It should only be called when the whole program is
    * exiting. And in fact, this method should never be called.
diff --git a/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java b/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java
index 789f53b..c58c384 100644
--- a/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java
+++ b/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java
@@ -26,11 +26,13 @@
   private PhysicalPlanHelper newPhysicalPlanHelper;
   private CheckpointManager.RestoreInstanceStateRequest restoreInstanceStateRequest;
   private CheckpointManager.StartInstanceStatefulProcessing startInstanceStatefulProcessing;
+  private CheckpointManager.StatefulConsistentCheckpointSaved statefulConsistentCheckpointSaved;
 
   private InstanceControlMsg(Builder builder) {
     this.newPhysicalPlanHelper = builder.newPhysicalPlanHelper;
     this.restoreInstanceStateRequest = builder.restoreInstanceStateRequest;
     this.startInstanceStatefulProcessing = builder.startInstanceStatefulProcessing;
+    this.statefulConsistentCheckpointSaved = builder.statefulConsistentCheckpointSaved;
   }
 
   public static Builder newBuilder() {
@@ -41,6 +43,10 @@
     return newPhysicalPlanHelper;
   }
 
+  public CheckpointManager.StatefulConsistentCheckpointSaved getStatefulCheckpointSavedMessage() {
+    return this.statefulConsistentCheckpointSaved;
+  }
+
   public boolean isNewPhysicalPlanHelper() {
     return newPhysicalPlanHelper != null;
   }
@@ -65,6 +71,7 @@
     private PhysicalPlanHelper newPhysicalPlanHelper;
     private CheckpointManager.RestoreInstanceStateRequest restoreInstanceStateRequest;
     private CheckpointManager.StartInstanceStatefulProcessing startInstanceStatefulProcessing;
+    private CheckpointManager.StatefulConsistentCheckpointSaved statefulConsistentCheckpointSaved;
 
     private Builder() {
 
@@ -87,6 +94,12 @@
       return this;
     }
 
+    public Builder setStatefulCheckpointSaved(
+        CheckpointManager.StatefulConsistentCheckpointSaved message) {
+      this.statefulConsistentCheckpointSaved = message;
+      return this;
+    }
+
     public InstanceControlMsg build() {
       return new InstanceControlMsg(this);
     }
diff --git a/heron/instance/src/java/org/apache/heron/instance/Slave.java b/heron/instance/src/java/org/apache/heron/instance/Slave.java
index 5b85fc6..4e57772 100644
--- a/heron/instance/src/java/org/apache/heron/instance/Slave.java
+++ b/heron/instance/src/java/org/apache/heron/instance/Slave.java
@@ -123,6 +123,16 @@
           if (instanceControlMsg.isNewPhysicalPlanHelper()) {
             handleNewPhysicalPlan(instanceControlMsg);
           }
+
+          // When a checkpoint becomes "globally consistent"
+          if (instanceControlMsg.getStatefulCheckpointSavedMessage() != null) {
+            String checkpointId = instanceControlMsg
+                .getStatefulCheckpointSavedMessage()
+                .getConsistentCheckpoint()
+                .getCheckpointId();
+
+            handleGlobalCheckpointConsistent(checkpointId);
+          }
         }
       }
     };
@@ -130,6 +140,11 @@
     slaveLooper.addTasksOnWakeup(handleControlMessageTask);
   }
 
+  private void handleGlobalCheckpointConsistent(String checkpointId) {
+    LOG.log(Level.INFO, "checkpoint: {0} has become globally consistent", checkpointId);
+    instance.onCheckpointSaved(checkpointId);
+  }
+
   private void resetCurrentAssignment() {
     helper.setTopologyContext(metricsCollector);
     instance = helper.getMySpout() != null
@@ -262,7 +277,7 @@
     startInstanceIfNeeded();
   }
 
-  private void cleanAndStopSlave() {
+  private void cleanAndStopSlaveBeforeRestore(String checkpointId) {
     // Clear all queues
     streamInCommunicator.clear();
     streamOutCommunicator.clear();
@@ -275,6 +290,7 @@
     slaveLooper.clearTimers();
 
     if (instance != null) {
+      instance.preRestore(checkpointId);
       instance.clean();
     }
 
@@ -294,9 +310,13 @@
   private void handleRestoreInstanceStateRequest(InstanceControlMsg instanceControlMsg) {
     CheckpointManager.RestoreInstanceStateRequest request =
         instanceControlMsg.getRestoreInstanceStateRequest();
+
+    // ID of the checkpoint we are restoring to
+    String checkpointId = request.getState().getCheckpointId();
+
     // Clean buffers and unregister tasks in slave looper
     if (isInstanceStarted) {
-      cleanAndStopSlave();
+      cleanAndStopSlaveBeforeRestore(checkpointId);
     }
 
     // Restore the state
diff --git a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
index 9cc177a..a53ed3d 100644
--- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
@@ -37,6 +37,7 @@
 import org.apache.heron.api.serializer.IPluggableSerializer;
 import org.apache.heron.api.state.State;
 import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
 import org.apache.heron.api.topology.IUpdatable;
 import org.apache.heron.api.utils.Utils;
 import org.apache.heron.common.basics.Communicator;
@@ -74,6 +75,9 @@
 
   private State<Serializable, Serializable> instanceState;
 
+  // default to false, can only be toggled to true if bolt implements ITwoPhaseStatefulComponent
+  private boolean waitingForCheckpointSaved;
+
   // The reference to topology's config
   private final Map<String, Object> config;
 
@@ -110,6 +114,8 @@
             String.valueOf(config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)),
             helper.getMyInstanceId());
 
+    this.waitingForCheckpointSaved = false;
+
     if (helper.getMyBolt() == null) {
       throw new RuntimeException("HeronBoltInstance has no bolt in physical plan.");
     }
@@ -162,10 +168,12 @@
     // so that topology emit, ack, fail are thread safe
     collector.lock.lock();
     try {
-      // Checkpoint
       if (bolt instanceof IStatefulComponent) {
         ((IStatefulComponent) bolt).preSave(checkpointId);
       }
+      if (bolt instanceof ITwoPhaseStatefulComponent) {
+        waitingForCheckpointSaved = true;
+      }
       collector.sendOutState(instanceState, checkpointId, spillState, spillStateLocation);
     } finally {
       collector.lock.unlock();
@@ -214,6 +222,21 @@
   }
 
   @Override
+  public void preRestore(String checkpointId) {
+    if (bolt instanceof ITwoPhaseStatefulComponent) {
+      ((ITwoPhaseStatefulComponent) bolt).preRestore(checkpointId);
+    }
+  }
+
+  @Override
+  public void onCheckpointSaved(String checkpointId) {
+    if (bolt instanceof ITwoPhaseStatefulComponent) {
+      ((ITwoPhaseStatefulComponent) bolt).postSave(checkpointId);
+      waitingForCheckpointSaved = false;
+    }
+  }
+
+  @Override
   public void clean() {
     // Invoke clean up hook before clean() is called
     helper.getTopologyContext().invokeHookCleanup();
@@ -238,6 +261,7 @@
       @Override
       public void run() {
         boltMetrics.updateTaskRunCount();
+
         // Back-pressure -- only when we could send out tuples will we read & execute tuples
         if (collector.isOutQueuesAvailable()) {
           boltMetrics.updateExecutionCount();
@@ -271,7 +295,7 @@
 
     long startOfCycle = System.nanoTime();
     // Read data from in Queues
-    while (!inQueue.isEmpty()) {
+    while (!inQueue.isEmpty() && !waitingForCheckpointSaved) {
       Message msg = inQueue.poll();
 
       if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
diff --git a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
index 53675c4..e9a0e13 100644
--- a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
@@ -36,6 +36,7 @@
 import org.apache.heron.api.spout.SpoutOutputCollector;
 import org.apache.heron.api.state.State;
 import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
 import org.apache.heron.api.topology.IUpdatable;
 import org.apache.heron.api.utils.Utils;
 import org.apache.heron.common.basics.ByteAmount;
@@ -71,6 +72,9 @@
   private final boolean spillState;
   private final String spillStateLocation;
 
+  // default to false, can only be toggled to true if spout implements ITwoPhaseStatefulComponent
+  private boolean waitingForCheckpointSaved;
+
   private State<Serializable, Serializable> instanceState;
 
   private final SlaveLooper looper;
@@ -110,6 +114,8 @@
         String.valueOf(config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)),
         helper.getMyInstanceId());
 
+    this.waitingForCheckpointSaved = false;
+
     LOG.info("Is this topology stateful: " + isTopologyStateful);
 
     if (helper.getMySpout() == null) {
@@ -171,10 +177,15 @@
       if (spout instanceof IStatefulComponent) {
         ((IStatefulComponent) spout).preSave(checkpointId);
       }
+
+      if (spout instanceof ITwoPhaseStatefulComponent) {
+        waitingForCheckpointSaved = true;
+      }
       collector.sendOutState(instanceState, checkpointId, spillState, spillStateLocation);
     } finally {
       collector.lock.unlock();
     }
+
     LOG.info("State persisted for checkpoint: " + checkpointId);
   }
 
@@ -219,6 +230,21 @@
   }
 
   @Override
+  public void preRestore(String checkpointId) {
+    if (spout instanceof ITwoPhaseStatefulComponent) {
+      ((ITwoPhaseStatefulComponent) spout).preRestore(checkpointId);
+    }
+  }
+
+  @Override
+  public void onCheckpointSaved(String checkpointId) {
+    if (spout instanceof ITwoPhaseStatefulComponent) {
+      ((ITwoPhaseStatefulComponent) spout).postSave(checkpointId);
+      waitingForCheckpointSaved = false;
+    }
+  }
+
+  @Override
   public void clean() {
     // Invoke clean up hook before clean() is called
     helper.getTopologyContext().invokeHookCleanup();
@@ -257,6 +283,9 @@
       public void run() {
         spoutMetrics.updateTaskRunCount();
 
+        // Check if we have any message to process anyway
+        readTuplesAndExecute(streamInQueue);
+
         // Check whether we should produce more tuples
         if (isProduceTuple()) {
           spoutMetrics.updateProduceTupleCount();
@@ -272,9 +301,6 @@
           spoutMetrics.updateOutQueueFullCount();
         }
 
-        // Check if we have any message to process anyway
-        readTuplesAndExecute(streamInQueue);
-
         if (ackEnabled) {
           // Update the pending-to-be-acked tuples counts
           spoutMetrics.updatePendingTuplesCount(collector.numInFlight());
@@ -330,12 +356,14 @@
    * It is allowed in:
    * 1. Outgoing Stream queue is available
    * 2. Topology State is RUNNING
+   * 3. If the Spout implements ITwoPhaseStatefulComponent, not waiting for checkpoint saved message
    *
    * @return true to allow produceTuple() to be invoked
    */
   private boolean isProduceTuple() {
     return collector.isOutQueuesAvailable()
-        && helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING);
+        && helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING)
+        && !waitingForCheckpointSaved;
   }
 
   protected void produceTuple() {
@@ -435,7 +463,7 @@
     long startOfCycle = System.nanoTime();
     Duration spoutAckBatchTime = systemConfig.getInstanceAckBatchTime();
 
-    while (!inQueue.isEmpty()) {
+    while (!inQueue.isEmpty() && !waitingForCheckpointSaved) {
       Message msg = inQueue.poll();
 
       if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
diff --git a/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java b/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java
index 69b2493..fb9212b 100644
--- a/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java
+++ b/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java
@@ -120,6 +120,7 @@
     registerOnMessage(CheckpointManager.InitiateStatefulCheckpoint.newBuilder());
     registerOnMessage(CheckpointManager.RestoreInstanceStateRequest.newBuilder());
     registerOnMessage(CheckpointManager.StartInstanceStatefulProcessing.newBuilder());
+    registerOnMessage(CheckpointManager.StatefulConsistentCheckpointSaved.newBuilder());
   }
 
 
@@ -205,6 +206,8 @@
       handleRestoreInstanceStateRequest((CheckpointManager.RestoreInstanceStateRequest) message);
     } else if (message instanceof CheckpointManager.StartInstanceStatefulProcessing) {
       handleStartStatefulRequest((CheckpointManager.StartInstanceStatefulProcessing) message);
+    } else if (message instanceof CheckpointManager.StatefulConsistentCheckpointSaved) {
+      handleCheckpointSaved((CheckpointManager.StatefulConsistentCheckpointSaved) message);
     } else {
       throw new RuntimeException("Unknown kind of message received from Stream Manager");
     }
@@ -286,6 +289,17 @@
     inControlQueue.offer(instanceControlMsg);
   }
 
+  private void handleCheckpointSaved(
+      CheckpointManager.StatefulConsistentCheckpointSaved message) {
+    LOG.info("Received a StatefulCheckpointSaved message with checkpoint id: "
+        + message.getConsistentCheckpoint().getCheckpointId());
+
+    InstanceControlMsg instanceControlMsg = InstanceControlMsg.newBuilder()
+        .setStatefulCheckpointSaved(message)
+        .build();
+    inControlQueue.offer(instanceControlMsg);
+  }
+
   private void handleRestoreInstanceStateRequest(
       CheckpointManager.RestoreInstanceStateRequest request) {
     LOG.info("Received a RestoreInstanceState request with checkpoint id: "
diff --git a/heron/instance/tests/java/BUILD b/heron/instance/tests/java/BUILD
index 0c79300..23b2186 100644
--- a/heron/instance/tests/java/BUILD
+++ b/heron/instance/tests/java/BUILD
@@ -24,8 +24,10 @@
         "org.apache.heron.grouping.EmitDirectBoltTest",
         "org.apache.heron.grouping.EmitDirectSpoutTest",
         "org.apache.heron.instance.bolt.BoltInstanceTest",
+        "org.apache.heron.instance.bolt.BoltStatefulInstanceTest",
         "org.apache.heron.instance.spout.ActivateDeactivateTest",
         "org.apache.heron.instance.spout.SpoutInstanceTest",
+        "org.apache.heron.instance.spout.SpoutStatefulInstanceTest",
         "org.apache.heron.metrics.GlobalMetricsTest",
         "org.apache.heron.metrics.MultiAssignableMetricTest",
         "org.apache.heron.network.ConnectTest",
diff --git a/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java b/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java
new file mode 100644
index 0000000..d5eac2c
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.instance.bolt;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.protobuf.ByteString;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.IRichBolt;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.serializer.IPluggableSerializer;
+import org.apache.heron.api.serializer.JavaSerializer;
+import org.apache.heron.common.basics.SingletonRegistry;
+import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
+import org.apache.heron.instance.InstanceControlMsg;
+import org.apache.heron.instance.SlaveTester;
+import org.apache.heron.proto.system.HeronTuples;
+import org.apache.heron.proto.system.PhysicalPlans;
+import org.apache.heron.resource.Constants;
+import org.apache.heron.resource.MockPhysicalPlansBuilder;
+import org.apache.heron.resource.TestSpout;
+import org.apache.heron.resource.TestStatefulBolt;
+import org.apache.heron.resource.TestTwoPhaseStatefulBolt;
+import org.apache.heron.resource.UnitTestHelper;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test if stateful bolt is able to respond to incoming control/data tuples as expected.
+ */
+public class BoltStatefulInstanceTest {
+  private SlaveTester slaveTester;
+  private static IPluggableSerializer serializer = new JavaSerializer();
+
+  @Before
+  public void before() {
+    slaveTester = new SlaveTester();
+    slaveTester.start();
+  }
+
+  @After
+  public void after() throws NoSuchFieldException, IllegalAccessException {
+    slaveTester.stop();
+  }
+
+  @Test
+  public void testPreSaveAndPostSave() throws Exception {
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch postSaveLatch = new CountDownLatch(1);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch);
+
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
+
+    // initially non of preSave or postSave are invoked yet
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+
+    // this should invoke preSave
+    slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+
+    // this should invoke postSave
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0"));
+    assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, postSaveLatch.getCount());
+  }
+
+  @Test
+  public void testPreRestore() throws InterruptedException {
+    CountDownLatch preRestoreLatch = new CountDownLatch(1);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRERESTORE_LATCH, preRestoreLatch);
+
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
+
+    assertEquals(1, preRestoreLatch.getCount());
+
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("cx"));
+
+    assertTrue(preRestoreLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preRestoreLatch.getCount());
+  }
+
+  /**
+   * Ensure that for ITwoPhaseStatefulComponent bolts, after a preSave, execute will not be invoked
+   * unless the corresponding postSave is called.
+   */
+  @Test
+  public void testPostSaveBlockExecute() throws Exception {
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch postSaveLatch = new CountDownLatch(1);
+
+    CountDownLatch executeLatch = new CountDownLatch(1); // expect to execute one tuple
+
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.EXECUTE_LATCH, executeLatch);
+
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
+
+    // initially non of preSave or postSave are invoked yet
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+    assertEquals(1, executeLatch.getCount());
+
+    // this should invoke preSave
+    slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+    // put a data tuple into the inStreamQueue
+    slaveTester.getInStreamQueue().offer(buildTupleSet());
+
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+    assertEquals(1, executeLatch.getCount());
+
+    // Wait for a bounded amount of time, assert that the tuple will not execute as it is
+    // blocked on postSave. This is because we only want to allow one uncommitted "transaction" on
+    // each task. See the design doc for more details.
+    assertFalse(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+    assertEquals(1, executeLatch.getCount());
+
+    // this should invoke postSave
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0"));
+    assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertTrue(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, postSaveLatch.getCount());
+    assertEquals(0, executeLatch.getCount());
+  }
+
+  /**
+   * Ensure that the aforementioned behaviour does not apply for bolts that don't implement
+   * ITwoPhaseStatefulComponent
+   */
+  @Test
+  public void testExecuteNotBlocked() throws Exception {
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch executeLatch = new CountDownLatch(1); // expect to execute one tuple
+
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.EXECUTE_LATCH, executeLatch);
+
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageForStatefulBolt());
+
+    // initially non of preSave or postSave are invoked yet
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, executeLatch.getCount());
+
+    // this should invoke preSave
+    slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+    // put a data tuple into the inStreamQueue
+    slaveTester.getInStreamQueue().offer(buildTupleSet());
+
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+
+    // no need to wait for postSave as the bolt doesn't implement ITwoPhaseStatefulComponent
+    assertTrue(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, executeLatch.getCount());
+  }
+
+  // build a tuple set that contains one data tuple
+  private HeronTuples.HeronTupleSet buildTupleSet() {
+    HeronTuples.HeronTupleSet.Builder heronTupleSet = HeronTuples.HeronTupleSet.newBuilder();
+    heronTupleSet.setSrcTaskId(1);
+    HeronTuples.HeronDataTupleSet.Builder dataTupleSet = HeronTuples.HeronDataTupleSet.newBuilder();
+    TopologyAPI.StreamId.Builder streamId = TopologyAPI.StreamId.newBuilder();
+    streamId.setComponentName("test-spout");
+    streamId.setId("default");
+    dataTupleSet.setStream(streamId);
+
+    HeronTuples.HeronDataTuple.Builder dataTuple = HeronTuples.HeronDataTuple.newBuilder();
+    dataTuple.setKey(0);
+
+    HeronTuples.RootId.Builder rootId = HeronTuples.RootId.newBuilder();
+    rootId.setKey(0);
+    rootId.setTaskid(0);
+    dataTuple.addRoots(rootId);
+
+    dataTuple.addValues(ByteString.copyFrom(serializer.serialize("A")));
+    dataTupleSet.addTuples(dataTuple);
+    heronTupleSet.setData(dataTupleSet);
+
+    return heronTupleSet.build();
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessageFor2PCBolt() {
+    return buildPhysicalPlanMessage(new TestTwoPhaseStatefulBolt());
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessageForStatefulBolt() {
+    return buildPhysicalPlanMessage(new TestStatefulBolt());
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessage(IRichBolt bolt) {
+    PhysicalPlans.PhysicalPlan physicalPlan =
+        MockPhysicalPlansBuilder
+            .newBuilder()
+            .withTopologyConfig(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE, -1)
+            .withTopologyState(TopologyAPI.TopologyState.RUNNING)
+            .withSpoutInstance(
+                "test-spout",
+                0,
+                "spout-id",
+                new TestSpout()
+            )
+            .withBoltInstance(
+                "test-bolt",
+                1,
+                "bolt-id",
+                "test-spout",
+                bolt
+            )
+            .build();
+
+    PhysicalPlanHelper ph = new PhysicalPlanHelper(physicalPlan, "bolt-id");
+
+    return InstanceControlMsg.newBuilder()
+        .setNewPhysicalPlanHelper(ph)
+        .build();
+  }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java b/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java
new file mode 100644
index 0000000..4d65aec
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.instance.spout;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.serializer.IPluggableSerializer;
+import org.apache.heron.api.serializer.JavaSerializer;
+import org.apache.heron.api.spout.IRichSpout;
+import org.apache.heron.common.basics.SingletonRegistry;
+import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
+import org.apache.heron.instance.InstanceControlMsg;
+import org.apache.heron.instance.SlaveTester;
+import org.apache.heron.proto.system.PhysicalPlans;
+import org.apache.heron.resource.Constants;
+import org.apache.heron.resource.MockPhysicalPlansBuilder;
+import org.apache.heron.resource.TestBolt;
+import org.apache.heron.resource.TestStatefulSpout;
+import org.apache.heron.resource.TestTwoPhaseStatefulSpout;
+import org.apache.heron.resource.UnitTestHelper;
+
+import static org.junit.Assert.*;
+
+public class SpoutStatefulInstanceTest {
+
+  private SlaveTester slaveTester;
+  private static IPluggableSerializer serializer = new JavaSerializer();
+
+  @Before
+  public void before() {
+    slaveTester = new SlaveTester();
+    slaveTester.start();
+  }
+
+  @After
+  public void after() throws NoSuchFieldException, IllegalAccessException {
+    slaveTester.stop();
+  }
+
+  @Test
+  public void testPreSaveAndPostSave() throws Exception {
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch postSaveLatch = new CountDownLatch(1);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch);
+
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout());
+
+    // initially non of preSave or postSave are invoked yet
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+
+    // this should invoke preSave
+    slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+
+    // this should invoke postSave
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0"));
+    assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, postSaveLatch.getCount());
+  }
+
+  @Test
+  public void testPreRestore() throws InterruptedException {
+    CountDownLatch preRestoreLatch = new CountDownLatch(1);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRERESTORE_LATCH, preRestoreLatch);
+
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout());
+
+    assertEquals(1, preRestoreLatch.getCount());
+
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("cx"));
+
+    assertTrue(preRestoreLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preRestoreLatch.getCount());
+  }
+
+  /**
+   * Ensure that for ITwoPhaseStatefulComponent bolts, after a preSave, execute will not be invoked
+   * unless the corresponding postSave is called.
+   */
+  @Test
+  public void testPostSaveBlockExecute() throws Exception {
+    // when this boolean is set to false, nextTuple on the spout will be run, but the spout will
+    // make sure to not emit any tuples.
+    AtomicBoolean shouldStartEmit = new AtomicBoolean(false);
+    SingletonRegistry.INSTANCE.registerSingleton(
+        Constants.SPOUT_SHOULD_START_EMIT, shouldStartEmit);
+
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch postSaveLatch = new CountDownLatch(1);
+    CountDownLatch emitLatch = new CountDownLatch(1);
+
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.EMIT_LATCH, emitLatch);
+
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout());
+
+    // initially non of preSave or postSave are invoked yet
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+
+    // this should invoke preSave
+    slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+    // tell the spout to start emitting tuples
+    assertFalse(shouldStartEmit.getAndSet(true));
+
+    // since preSave is executed, spout will not emit until postSave is called
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+    assertEquals(1, emitLatch.getCount());
+
+    // Wait for a bounded amount of time, assert that the spout will not emit tuples as it is
+    // blocked on postSave. This is because we only want to allow one uncommitted "transaction" on
+    // each task. See the design doc for more details.
+    assertFalse(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(1, postSaveLatch.getCount());
+    assertEquals(1, emitLatch.getCount());
+
+    // this should invoke postSave
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0"));
+    assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertTrue(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, postSaveLatch.getCount());
+    assertEquals(0, emitLatch.getCount());
+  }
+
+  /**
+   * Ensure that the aforementioned behaviour does not apply for spouts that don't implement
+   * ITwoPhaseStatefulComponent
+   */
+  @Test
+  public void testExecuteNotBlocked() throws Exception {
+    // when this boolean is set to false, nextTuple on the spout will be run, but the spout will
+    // make sure to not emit any tuples.
+    AtomicBoolean shouldStartEmit = new AtomicBoolean(false);
+    SingletonRegistry.INSTANCE.registerSingleton(
+        Constants.SPOUT_SHOULD_START_EMIT, shouldStartEmit);
+
+    CountDownLatch preSaveLatch = new CountDownLatch(1);
+    CountDownLatch emitLatch = new CountDownLatch(1);
+
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+    SingletonRegistry.INSTANCE.registerSingleton(Constants.EMIT_LATCH, emitLatch);
+
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+    slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+    slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageForStatefulSpout());
+
+    // initially non of preSave or postSave are invoked yet
+    assertEquals(1, preSaveLatch.getCount());
+    assertEquals(1, emitLatch.getCount());
+
+    // this should invoke preSave
+    slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+    // tell the spout to start emitting tuples
+    assertFalse(shouldStartEmit.getAndSet(true));
+
+    assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+
+    // no need to wait for postSave as the bolt doesn't implement ITwoPhaseStatefulComponent
+    assertTrue(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+    assertEquals(0, preSaveLatch.getCount());
+    assertEquals(0, emitLatch.getCount());
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessageFor2PCSpout() {
+    return buildPhysicalPlanMessage(new TestTwoPhaseStatefulSpout());
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessageForStatefulSpout() {
+    return buildPhysicalPlanMessage(new TestStatefulSpout());
+  }
+
+  private InstanceControlMsg buildPhysicalPlanMessage(IRichSpout spout) {
+    PhysicalPlans.PhysicalPlan physicalPlan =
+        MockPhysicalPlansBuilder
+            .newBuilder()
+            .withTopologyConfig(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE, -1)
+            .withTopologyState(TopologyAPI.TopologyState.RUNNING)
+            .withSpoutInstance(
+                "test-spout",
+                0,
+                "spout-id",
+                spout
+            )
+            .withBoltInstance(
+                "test-bolt",
+                1,
+                "bolt-id",
+                "test-spout",
+                new TestBolt()
+            )
+            .build();
+
+    PhysicalPlanHelper ph = new PhysicalPlanHelper(physicalPlan, "spout-id");
+
+    return InstanceControlMsg.newBuilder()
+        .setNewPhysicalPlanHelper(ph)
+        .build();
+  }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/Constants.java b/heron/instance/tests/java/org/apache/heron/resource/Constants.java
index 3ffe9b0..9b7d2ce 100644
--- a/heron/instance/tests/java/org/apache/heron/resource/Constants.java
+++ b/heron/instance/tests/java/org/apache/heron/resource/Constants.java
@@ -39,12 +39,18 @@
   public static final String ACK_COUNT = "ack-count";
 
   public static final String EXECUTE_LATCH = "execute-latch";
+  public static final String EMIT_LATCH = "emit-latch";
   public static final String FAIL_LATCH = "fail-latch";
   public static final String ACK_LATCH = "ack-latch";
 
   public static final String ACTIVATE_COUNT_LATCH = "activate-count-latch";
   public static final String DEACTIVATE_COUNT_LATCH = "deactivate-count-latch";
 
+  public static final String PRESAVE_LATCH = "preSave-latch";
+  public static final String POSTSAVE_LATCH = "postSave-latch";
+  public static final String PRERESTORE_LATCH = "postSave-latch";
+  public static final String SPOUT_SHOULD_START_EMIT = "spout-should-start-emit";
+
   public static final String RECEIVED_STRING_LIST = "received-string-list";
 
   public static final String HERON_SYSTEM_CONFIG = "org.apache.heron.common.config.SystemConfig";
diff --git a/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java b/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java
new file mode 100644
index 0000000..14a36cf
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Ignore;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.IRichBolt;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.spout.IRichSpout;
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.proto.system.PhysicalPlans;
+
+@Ignore
+public final class MockPhysicalPlansBuilder {
+
+  private PhysicalPlans.PhysicalPlan.Builder pPlan;
+  private TopologyBuilder topologyBuilder;
+  private List<PhysicalPlans.Instance.Builder> instanceBuilders;
+
+  private Config conf;
+  private TopologyAPI.TopologyState initialTopologyState;
+
+  private MockPhysicalPlansBuilder() {
+    pPlan = PhysicalPlans.PhysicalPlan.newBuilder();
+    topologyBuilder = new TopologyBuilder();
+    instanceBuilders = new ArrayList<>();
+
+    conf = null;
+    initialTopologyState = null;
+  }
+
+  public static MockPhysicalPlansBuilder newBuilder() {
+    return new MockPhysicalPlansBuilder();
+  }
+
+  public MockPhysicalPlansBuilder withTopologyConfig(
+      Config.TopologyReliabilityMode reliabilityMode,
+      int messageTimeout
+  ) {
+    conf = new Config();
+    conf.setTeamEmail("streaming-compute@twitter.com");
+    conf.setTeamName("stream-computing");
+    conf.setTopologyProjectName("heron-integration-test");
+    conf.setNumStmgrs(1);
+    conf.setMaxSpoutPending(100);
+    conf.setTopologyReliabilityMode(reliabilityMode);
+    if (messageTimeout != -1) {
+      conf.setMessageTimeoutSecs(messageTimeout);
+      conf.put("topology.enable.message.timeouts", "true");
+    }
+
+    return this;
+  }
+
+  public MockPhysicalPlansBuilder withTopologyState(TopologyAPI.TopologyState topologyState) {
+    initialTopologyState = topologyState;
+    return this;
+  }
+
+  public MockPhysicalPlansBuilder withSpoutInstance(
+      String componentName,
+      int taskId,
+      String instanceId,
+      IRichSpout spout
+  ) {
+    PhysicalPlans.InstanceInfo.Builder spoutInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
+    spoutInstanceInfo.setComponentName(componentName);
+    spoutInstanceInfo.setTaskId(taskId);
+    spoutInstanceInfo.setComponentIndex(0);
+
+    PhysicalPlans.Instance.Builder spoutInstance = PhysicalPlans.Instance.newBuilder();
+    spoutInstance.setInstanceId(instanceId);
+    spoutInstance.setStmgrId("stream-manager-id");
+    spoutInstance.setInfo(spoutInstanceInfo);
+
+    topologyBuilder.setSpout(componentName, spout, 1);
+    instanceBuilders.add(spoutInstance);
+
+    return this;
+  }
+
+  public MockPhysicalPlansBuilder withBoltInstance(
+      String componentName,
+      int taskId,
+      String instanceId,
+      String upStreamComponentId,
+      IRichBolt bolt
+  ) {
+    PhysicalPlans.InstanceInfo.Builder boltInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
+    boltInstanceInfo.setComponentName(componentName);
+    boltInstanceInfo.setTaskId(taskId);
+    boltInstanceInfo.setComponentIndex(0);
+
+    PhysicalPlans.Instance.Builder boltInstance = PhysicalPlans.Instance.newBuilder();
+    boltInstance.setInstanceId(instanceId);
+    boltInstance.setStmgrId("stream-manager-id");
+    boltInstance.setInfo(boltInstanceInfo);
+
+    topologyBuilder.setBolt(componentName, bolt, 1)
+        .shuffleGrouping(upStreamComponentId);
+    instanceBuilders.add(boltInstance);
+
+    return this;
+  }
+
+  public PhysicalPlans.PhysicalPlan build() {
+    addStmgrs();
+    pPlan.setTopology(buildTopology());
+
+    for (PhysicalPlans.Instance.Builder b : instanceBuilders) {
+      pPlan.addInstances(b);
+    }
+
+    return pPlan.build();
+  }
+
+  private TopologyAPI.Topology buildTopology() {
+    return topologyBuilder
+        .createTopology()
+        .setName("topology-name")
+        .setConfig(conf)
+        .setState(initialTopologyState)
+        .getTopology();
+  }
+
+  private void addStmgrs() {
+    PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder();
+    stmgr.setId("stream-manager-id");
+    stmgr.setHostName("127.0.0.1");
+    stmgr.setDataPort(8888);
+    stmgr.setLocalEndpoint("endpoint");
+    pPlan.addStmgrs(stmgr);
+  }
+
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java
new file mode 100644
index 0000000..e16efff
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+public class TestStatefulBolt extends BaseRichBolt
+    implements IStatefulComponent<String, String> {
+  @Override
+  public void prepare(
+      Map<String, Object> heronConf,
+      TopologyContext context,
+      OutputCollector collector) {
+  }
+
+  @Override
+  public void execute(Tuple input) {
+    CountDownLatch tupleExecutedLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EXECUTE_LATCH);
+
+    if (tupleExecutedLatch != null) {
+      tupleExecutedLatch.countDown();
+    }
+  }
+
+  @Override
+  public void initState(State<String, String> state) {
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    CountDownLatch preSaveLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH);
+
+    if (preSaveLatch != null) {
+      preSaveLatch.countDown();
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+  }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java
new file mode 100644
index 0000000..c22a917
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+public class TestStatefulSpout extends BaseRichSpout implements IStatefulComponent<String, String> {
+  @Override
+  public void open(
+      Map<String, Object> conf,
+      TopologyContext context,
+      SpoutOutputCollector collector) {
+  }
+
+  @Override
+  public void nextTuple() {
+    AtomicBoolean shouldStartEmit =
+        (AtomicBoolean) SingletonRegistry.INSTANCE.getSingleton(Constants.SPOUT_SHOULD_START_EMIT);
+
+    if (shouldStartEmit != null && !shouldStartEmit.get()) {
+      return;
+    }
+
+    // actually "emit" the tuple
+    CountDownLatch emitLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EMIT_LATCH);
+
+    if (emitLatch != null) {
+      emitLatch.countDown();
+    }
+  }
+
+  @Override
+  public void initState(State<String, String> state) {
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    CountDownLatch preSaveLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH);
+
+    if (preSaveLatch != null) {
+      preSaveLatch.countDown();
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+  }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java
new file mode 100644
index 0000000..b67a9ee
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Ignore;
+
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+@Ignore
+public class TestTwoPhaseStatefulBolt extends BaseRichBolt
+    implements ITwoPhaseStatefulComponent<String, String> {
+
+  private static final long serialVersionUID = -5160420613503624743L;
+
+  @Override
+  public void prepare(
+      Map<String, Object> map,
+      TopologyContext topologyContext,
+      OutputCollector collector) {
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    CountDownLatch tupleExecutedLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EXECUTE_LATCH);
+
+    if (tupleExecutedLatch != null) {
+      tupleExecutedLatch.countDown();
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    outputFieldsDeclarer.declare(new Fields("word"));
+  }
+
+  @Override
+  public void postSave(String checkpointId) {
+    CountDownLatch postSaveLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.POSTSAVE_LATCH);
+
+    if (postSaveLatch != null) {
+      postSaveLatch.countDown();
+    }
+  }
+
+  @Override
+  public void preRestore(String checkpointId) {
+    CountDownLatch preRestoreLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRERESTORE_LATCH);
+
+    if (preRestoreLatch != null) {
+      preRestoreLatch.countDown();
+    }
+  }
+
+  @Override
+  public void initState(State<String, String> state) {
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    CountDownLatch preSaveLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH);
+
+    if (preSaveLatch != null) {
+      preSaveLatch.countDown();
+    }
+  }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java
new file mode 100644
index 0000000..a4b6efd
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+public class TestTwoPhaseStatefulSpout extends BaseRichSpout
+    implements ITwoPhaseStatefulComponent<String, String> {
+  @Override
+  public void open(
+      Map<String, Object> conf,
+      TopologyContext context,
+      SpoutOutputCollector collector) {
+  }
+
+  @Override
+  public void nextTuple() {
+    AtomicBoolean shouldStartEmit =
+        (AtomicBoolean) SingletonRegistry.INSTANCE.getSingleton(Constants.SPOUT_SHOULD_START_EMIT);
+
+    if (shouldStartEmit != null && !shouldStartEmit.get()) {
+      return;
+    }
+
+    // actually "emit" the tuple
+    CountDownLatch emitLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EMIT_LATCH);
+
+    if (emitLatch != null) {
+      emitLatch.countDown();
+    }
+  }
+
+  @Override
+  public void postSave(String checkpointId) {
+    CountDownLatch postSaveLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.POSTSAVE_LATCH);
+
+    if (postSaveLatch != null) {
+      postSaveLatch.countDown();
+    }
+  }
+
+  @Override
+  public void preRestore(String checkpointId) {
+    CountDownLatch preRestoreLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRERESTORE_LATCH);
+
+    if (preRestoreLatch != null) {
+      preRestoreLatch.countDown();
+    }
+  }
+
+  @Override
+  public void initState(State<String, String> state) {
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    CountDownLatch preSaveLatch =
+        (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH);
+
+    if (preSaveLatch != null) {
+      preSaveLatch.countDown();
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+  }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java b/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java
index 904ecab..8e39333 100644
--- a/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java
+++ b/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java
@@ -23,14 +23,17 @@
 import java.nio.file.Paths;
 import java.util.Map;
 
+import com.google.protobuf.Message;
+
 import org.junit.Ignore;
 
 import org.apache.heron.api.Config;
 import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.topology.TopologyBuilder;
 import org.apache.heron.common.basics.SingletonRegistry;
 import org.apache.heron.common.config.SystemConfig;
 import org.apache.heron.common.config.SystemConfigKey;
+import org.apache.heron.instance.InstanceControlMsg;
+import org.apache.heron.proto.ckptmgr.CheckpointManager;
 import org.apache.heron.proto.stmgr.StreamManager;
 import org.apache.heron.proto.system.Common;
 import org.apache.heron.proto.system.PhysicalPlans;
@@ -60,91 +63,36 @@
   public static PhysicalPlans.PhysicalPlan getPhysicalPlan(
       boolean ackEnabled,
       int messageTimeout,
-      TopologyAPI.TopologyState topologyState) {
-    PhysicalPlans.PhysicalPlan.Builder pPlan = PhysicalPlans.PhysicalPlan.newBuilder();
+      TopologyAPI.TopologyState topologyState
+  ) {
+    Config.TopologyReliabilityMode reliabilityMode = ackEnabled
+        ? Config.TopologyReliabilityMode.ATLEAST_ONCE
+        : Config.TopologyReliabilityMode.ATMOST_ONCE;
 
-    setTopology(pPlan, ackEnabled, messageTimeout, topologyState);
-
-    setInstances(pPlan);
-
-    setStMgr(pPlan);
-
-    return pPlan.build();
+    return MockPhysicalPlansBuilder
+        .newBuilder()
+        .withTopologyConfig(reliabilityMode, messageTimeout)
+        .withTopologyState(topologyState)
+        .withSpoutInstance(
+            "test-spout",
+            0,
+            "spout-id",
+            new TestSpout()
+        )
+        .withBoltInstance(
+            "test-bolt",
+            1,
+            "bolt-id",
+            "test-spout",
+            new TestBolt()
+        )
+        .build();
   }
 
   public static PhysicalPlans.PhysicalPlan getPhysicalPlan(boolean ackEnabled, int messageTimeout) {
     return getPhysicalPlan(ackEnabled, messageTimeout, TopologyAPI.TopologyState.RUNNING);
   }
 
-  private static void setTopology(PhysicalPlans.PhysicalPlan.Builder pPlan, boolean ackEnabled,
-                                  int messageTimeout, TopologyAPI.TopologyState topologyState) {
-    TopologyBuilder topologyBuilder = new TopologyBuilder();
-    topologyBuilder.setSpout("test-spout", new TestSpout(), 1);
-    // Here we need case switch to corresponding grouping
-    topologyBuilder.setBolt("test-bolt", new TestBolt(), 1).shuffleGrouping("test-spout");
-
-    Config conf = new Config();
-    conf.setTeamEmail("streaming-compute@twitter.com");
-    conf.setTeamName("stream-computing");
-    conf.setTopologyProjectName("heron-integration-test");
-    conf.setNumStmgrs(1);
-    conf.setMaxSpoutPending(100);
-    if (ackEnabled) {
-      conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.ATLEAST_ONCE);
-    } else {
-      conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.ATMOST_ONCE);
-    }
-    if (messageTimeout != -1) {
-      conf.setMessageTimeoutSecs(messageTimeout);
-      conf.put("topology.enable.message.timeouts", "true");
-    }
-
-    TopologyAPI.Topology fTopology =
-        topologyBuilder.createTopology().
-            setName("topology-name").
-            setConfig(conf).
-            setState(topologyState).
-            getTopology();
-
-    pPlan.setTopology(fTopology);
-  }
-
-  private static void setInstances(PhysicalPlans.PhysicalPlan.Builder pPlan) {
-    // Construct the spoutInstance
-    PhysicalPlans.InstanceInfo.Builder spoutInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
-    spoutInstanceInfo.setComponentName("test-spout");
-    spoutInstanceInfo.setTaskId(0);
-    spoutInstanceInfo.setComponentIndex(0);
-
-    PhysicalPlans.Instance.Builder spoutInstance = PhysicalPlans.Instance.newBuilder();
-    spoutInstance.setInstanceId("spout-id");
-    spoutInstance.setStmgrId("stream-manager-id");
-    spoutInstance.setInfo(spoutInstanceInfo);
-
-    // Construct the boltInstanceInfo
-    PhysicalPlans.InstanceInfo.Builder boltInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
-    boltInstanceInfo.setComponentName("test-bolt");
-    boltInstanceInfo.setTaskId(1);
-    boltInstanceInfo.setComponentIndex(0);
-
-    PhysicalPlans.Instance.Builder boltInstance = PhysicalPlans.Instance.newBuilder();
-    boltInstance.setInstanceId("bolt-id");
-    boltInstance.setStmgrId("stream-manager-id");
-    boltInstance.setInfo(boltInstanceInfo);
-
-    pPlan.addInstances(spoutInstance);
-    pPlan.addInstances(boltInstance);
-  }
-
-  private static void setStMgr(PhysicalPlans.PhysicalPlan.Builder pPlan) {
-    PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder();
-    stmgr.setId("stream-manager-id");
-    stmgr.setHostName("127.0.0.1");
-    stmgr.setDataPort(8888);
-    stmgr.setLocalEndpoint("endpoint");
-    pPlan.addStmgrs(stmgr);
-  }
-
   @SuppressWarnings("unchecked")
   public static void clearSingletonRegistry() throws IllegalAccessException, NoSuchFieldException {
     // Remove the Singleton by Reflection
@@ -196,4 +144,60 @@
     return registerInstanceResponse.build();
   }
 
+  public static Message buildPersistStateMessage(String checkpointId) {
+    CheckpointManager.InitiateStatefulCheckpoint.Builder builder = CheckpointManager
+        .InitiateStatefulCheckpoint
+        .newBuilder();
+
+    builder.setCheckpointId(checkpointId);
+
+    return builder.build();
+  }
+
+  public static InstanceControlMsg buildRestoreInstanceState(String checkpointId) {
+    return InstanceControlMsg.newBuilder()
+        .setRestoreInstanceStateRequest(
+            CheckpointManager.RestoreInstanceStateRequest
+                .newBuilder()
+                .setState(CheckpointManager.InstanceStateCheckpoint
+                    .newBuilder()
+                    .setCheckpointId(checkpointId))
+                .build()
+        )
+        .build();
+  }
+
+  public static InstanceControlMsg buildStartInstanceProcessingMessage(String checkpointId) {
+    return InstanceControlMsg.newBuilder()
+        .setStartInstanceStatefulProcessing(
+            CheckpointManager.StartInstanceStatefulProcessing
+                .newBuilder()
+                .setCheckpointId(checkpointId)
+                .build()
+        )
+        .build();
+  }
+
+  public static InstanceControlMsg buildCheckpointSavedMessage(
+      String checkpointId,
+      String packingPlanId
+  ) {
+    CheckpointManager.StatefulConsistentCheckpointSaved.Builder builder = CheckpointManager
+        .StatefulConsistentCheckpointSaved
+        .newBuilder();
+
+    CheckpointManager.StatefulConsistentCheckpoint.Builder ckptBuilder = CheckpointManager
+        .StatefulConsistentCheckpoint
+        .newBuilder();
+
+    ckptBuilder.setCheckpointId(checkpointId);
+    ckptBuilder.setPackingPlanId(packingPlanId);
+
+    builder.setConsistentCheckpoint(ckptBuilder.build());
+
+    return InstanceControlMsg.newBuilder()
+        .setStatefulCheckpointSaved(builder.build())
+        .build();
+  }
+
 }
diff --git a/heron/proto/ckptmgr.proto b/heron/proto/ckptmgr.proto
index ba6633a..ea6162c 100644
--- a/heron/proto/ckptmgr.proto
+++ b/heron/proto/ckptmgr.proto
@@ -108,6 +108,12 @@
   required string checkpoint_id = 1;
 }
 
+// Message broadcasted to stmgr (will then be forwarded to instances) after when checkpoint becomes
+// "globally consistent"
+message StatefulConsistentCheckpointSaved {
+  required StatefulConsistentCheckpoint consistent_checkpoint = 1;
+}
+
 // Message sent by tmaster to stmgr asking them to reset their instances
 // to this checkpoint
 message RestoreTopologyStateRequest {
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 6c0f973..9a752de 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -498,6 +498,15 @@
   }
 }
 
+void InstanceServer::BroadcastStatefulCheckpointSaved(
+    const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) {
+  for (auto & iter : active_instances_) {
+    LOG(INFO) << "Sending checkpoint: " << _msg.consistent_checkpoint().checkpoint_id()
+              << " saved message to instance with task_id: " << iter.second;
+    SendMessage(iter.first, _msg);
+  }
+}
+
 void InstanceServer::SetRateLimit(const proto::system::PhysicalPlan& _pplan,
                                   const std::string& _component,
                                   Connection* _conn) const {
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h
index df4b5e9..b4a6038 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -72,6 +72,9 @@
 
   void BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan& _pplan);
 
+  void BroadcastStatefulCheckpointSaved(
+      const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg);
+
   virtual bool HaveAllInstancesConnectedToUs() const {
     return active_instances_.size() == expected_instances_.size();
   }
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 6510271..aa6625e 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -368,12 +368,18 @@
     this->StartStatefulProcessing(checkpoint_id);
   };
 
+  auto broadcast_checkpoint_saved =
+       [this](const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) {
+    this->BroadcastCheckpointSaved(_msg);
+  };
+
   tmaster_client_ = make_shared<TMasterClient>(eventLoop_, master_options, stmgr_id_, stmgr_host_,
                                       data_port_, local_data_port_, shell_port_,
                                       std::move(pplan_watch),
                                       std::move(stateful_checkpoint_watch),
                                       std::move(restore_topology_watch),
-                                      std::move(start_stateful_watch));
+                                      std::move(start_stateful_watch),
+                                      std::move(broadcast_checkpoint_saved));
 }
 
 void StMgr::CreateTupleCache() {
@@ -1080,6 +1086,12 @@
   stateful_restorer_->StartRestore(_checkpoint_id, _restore_txid, local_taskids, *pplan_);
 }
 
+// broadcast the news that the checkpoint has been saved to all instances connected to this stmgr
+void StMgr::BroadcastCheckpointSaved(
+        const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) {
+  instance_server_->BroadcastStatefulCheckpointSaved(_msg);
+}
+
 // Called by TmasterClient when it receives directive from tmaster
 // to start processing after having previously recovered the state at _checkpoint_id
 void StMgr::StartStatefulProcessing(sp_string _checkpoint_id) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h
index decdc39..582ac3a 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -197,6 +197,10 @@
   void HandleStatefulRestoreDone(proto::system::StatusCode _status,
                                  std::string _checkpoint_id, sp_int64 _restore_txid);
 
+  // Called when stmgr received StatefulConsistentCheckpointSaved message from TMaster
+  // Then, the stmgr will forward this fact to all heron instances connected to it
+  void BroadcastCheckpointSaved(const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg);
+
   // Patch new physical plan with internal hydrated topology but keep new topology data:
   // - new topology state
   // - new topology/component config
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.cpp b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
index f1e27d8..657077d 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.cpp
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
@@ -41,7 +41,9 @@
                              VCallback<shared_ptr<proto::system::PhysicalPlan>> _pplan_watch,
                              VCallback<sp_string> _stateful_checkpoint_watch,
                              VCallback<sp_string, sp_int64> _restore_topology_watch,
-                             VCallback<sp_string> _start_stateful_watch)
+                             VCallback<sp_string> _start_stateful_watch,
+                             VCallback<const proto::ckptmgr::StatefulConsistentCheckpointSaved&>
+                                 _broadcast_checkpoint_saved)
     : Client(eventLoop, _options),
       stmgr_id_(_stmgr_id),
       stmgr_host_(_stmgr_host),
@@ -53,6 +55,7 @@
       stateful_checkpoint_watch_(std::move(_stateful_checkpoint_watch)),
       restore_topology_watch_(std::move(_restore_topology_watch)),
       start_stateful_watch_(std::move(_start_stateful_watch)),
+      broadcast_checkpoint_saved_(_broadcast_checkpoint_saved),
       reconnect_timer_id(0),
       heartbeat_timer_id(0),
       reconnect_attempts_(0) {
@@ -74,6 +77,7 @@
   InstallMessageHandler(&TMasterClient::HandleStatefulCheckpointMessage);
   InstallMessageHandler(&TMasterClient::HandleRestoreTopologyStateRequest);
   InstallMessageHandler(&TMasterClient::HandleStartStmgrStatefulProcessing);
+  InstallMessageHandler(&TMasterClient::HandleStatefulCheckpointSavedMessage);
 }
 
 TMasterClient::~TMasterClient() {
@@ -303,6 +307,11 @@
   start_stateful_watch_(_message->checkpoint_id());
 }
 
+void TMasterClient::HandleStatefulCheckpointSavedMessage(
+    pool_unique_ptr<proto::ckptmgr::StatefulConsistentCheckpointSaved> _msg) {
+  broadcast_checkpoint_saved_(*_msg);
+}
+
 void TMasterClient::SendResetTopologyState(const std::string& _dead_stmgr,
                                            int32_t _dead_task,
                                            const std::string& _reason) {
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.h b/heron/stmgr/src/cpp/manager/tmaster-client.h
index ec83406..2457d72 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.h
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.h
@@ -42,7 +42,9 @@
                 VCallback<shared_ptr<proto::system::PhysicalPlan>> _pplan_watch,
                 VCallback<sp_string> _stateful_checkpoint_watch,
                 VCallback<sp_string, sp_int64> _restore_topology_watch,
-                VCallback<sp_string> _start_stateful_watch);
+                VCallback<sp_string> _start_stateful_watch,
+                VCallback<const proto::ckptmgr::StatefulConsistentCheckpointSaved&>
+                    _broadcast_checkpoint_saved);
   virtual ~TMasterClient();
 
   // Told by the upper layer to disconnect and self destruct
@@ -88,6 +90,9 @@
   void HandleStartStmgrStatefulProcessing(
           pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _msg);
 
+  void HandleStatefulCheckpointSavedMessage(
+          pool_unique_ptr<proto::ckptmgr::StatefulConsistentCheckpointSaved> _msg);
+
   void OnReConnectTimer();
   void OnHeartbeatTimer();
   void SendRegisterRequest();
@@ -116,6 +121,9 @@
   // We invoke this callback upon receiving a StartStatefulProcessing message from tmaster
   // passing in the checkpoint id
   VCallback<sp_string> start_stateful_watch_;
+  // This callback will be invoked upon receiving a StatefulConsistentCheckpointSaved message.
+  // We will then forward this message to all the instances connected to this stmgr
+  VCallback<const proto::ckptmgr::StatefulConsistentCheckpointSaved&> broadcast_checkpoint_saved_;
 
   // Configs to be read
   sp_int32 reconnect_tmaster_interval_sec_;
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.cpp b/heron/tmaster/src/cpp/manager/stateful-controller.cpp
index 31dcf7b..a348c10 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.cpp
+++ b/heron/tmaster/src/cpp/manager/stateful-controller.cpp
@@ -55,14 +55,18 @@
                shared_ptr<heron::common::HeronStateMgr> _state_mgr,
                std::chrono::high_resolution_clock::time_point _tmaster_start_time,
                shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
-               std::function<void(std::string)> _ckpt_save_watcher)
-  : topology_name_(_topology_name), ckpt_record_(std::move(_ckpt)), state_mgr_(_state_mgr),
-    metrics_manager_client_(_metrics_manager_client) {
+               std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)>
+                   _ckpt_save_watcher)
+  : topology_name_(_topology_name),
+    ckpt_record_(std::move(_ckpt)),
+    state_mgr_(_state_mgr),
+    metrics_manager_client_(_metrics_manager_client),
+    ckpt_save_watcher_(_ckpt_save_watcher) {
   checkpointer_ = make_unique<StatefulCheckpointer>(_tmaster_start_time);
   restorer_ = make_unique<StatefulRestorer>();
   count_metrics_ = make_shared<common::MultiCountMetric>();
+
   metrics_manager_client_->register_metric("__stateful_controller", count_metrics_);
-  ckpt_save_watcher_ = _ckpt_save_watcher;
 }
 
 StatefulController::~StatefulController() {
@@ -162,13 +166,13 @@
         shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _new_ckpt,
         proto::system::StatusCode _status) {
   if (_status == proto::system::OK) {
-    LOG(INFO) << "Successfully saved " << _new_ckpt->consistent_checkpoints(0).checkpoint_id()
-              << " as the new globally consistent checkpoint";
     ckpt_record_ = std::move(_new_ckpt);
-    std::string oldest_ckpt =
-        ckpt_record_->consistent_checkpoints(ckpt_record_->consistent_checkpoints_size() - 1)
-          .checkpoint_id();
-    ckpt_save_watcher_(oldest_ckpt);
+
+    LOG(INFO) << "Successfully saved "
+              << ckpt_record_->consistent_checkpoints(0).checkpoint_id()
+              << " as the latest globally consistent checkpoint";
+
+    ckpt_save_watcher_(*ckpt_record_);
   } else {
     LOG(ERROR) << "Error saving " << _new_ckpt->consistent_checkpoints(0).checkpoint_id()
               << " as the new globally consistent checkpoint "
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.h b/heron/tmaster/src/cpp/manager/stateful-controller.h
index 6ceda37..4e3b618 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.h
+++ b/heron/tmaster/src/cpp/manager/stateful-controller.h
@@ -58,7 +58,8 @@
                shared_ptr<heron::common::HeronStateMgr> _state_mgr,
                std::chrono::high_resolution_clock::time_point _tmaster_start_time,
                shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
-               std::function<void(std::string)> _ckpt_save_watcher);
+               std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)>
+                   _ckpt_save_watcher);
   virtual ~StatefulController();
   // Start a new restore process
   void StartRestore(const StMgrMap& _stmgrs, bool _ignore_prev_checkpoints);
@@ -102,7 +103,7 @@
   unique_ptr<StatefulRestorer> restorer_;
   shared_ptr<common::MetricsMgrSt> metrics_manager_client_;
   shared_ptr<common::MultiCountMetric> count_metrics_;
-  std::function<void(std::string)> ckpt_save_watcher_;
+  std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)> ckpt_save_watcher_;
 };
 }  // namespace tmaster
 }  // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.cpp b/heron/tmaster/src/cpp/manager/stmgrstate.cpp
index 46be97d..b144244 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.cpp
+++ b/heron/tmaster/src/cpp/manager/stmgrstate.cpp
@@ -107,10 +107,17 @@
 }
 
 void StMgrState::NewStatefulCheckpoint(const proto::ckptmgr::StartStatefulCheckpoint& _request) {
-  LOG(INFO) << "Sending a new stateful checkpoint request to stmgr" << stmgr_->id();
+  LOG(INFO) << "Sending a new stateful checkpoint request to stmgr: " << stmgr_->id();
   server_.SendMessage(connection_, _request);
 }
 
+void StMgrState::SendCheckpointSavedMessage(
+        const proto::ckptmgr::StatefulConsistentCheckpointSaved &_msg) {
+  LOG(INFO) << "Sending checkpoint saved message to stmgr: " << stmgr_->id() << " "
+            << "for checkpoint: " << _msg.consistent_checkpoint().checkpoint_id();
+  server_.SendMessage(connection_, _msg);
+}
+
 /*
 void
 StMgrState::AddAssignment(const std::vector<pair<string, sp_int32> >& _assignments,
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.h b/heron/tmaster/src/cpp/manager/stmgrstate.h
index 270d93a..d9d9159 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.h
+++ b/heron/tmaster/src/cpp/manager/stmgrstate.h
@@ -70,6 +70,8 @@
   // Send stateful checkpoint message to the stmgr
   void NewStatefulCheckpoint(const proto::ckptmgr::StartStatefulCheckpoint& _request);
 
+  void SendCheckpointSavedMessage(const proto::ckptmgr::StatefulConsistentCheckpointSaved &_msg);
+
 
   bool TimedOut() const;
 
diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp b/heron/tmaster/src/cpp/manager/tmaster.cpp
index 304634f..36b7845 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.cpp
+++ b/heron/tmaster/src/cpp/manager/tmaster.cpp
@@ -398,8 +398,8 @@
        config::TopologyConfigHelper::GetStatefulCheckpointIntervalSecsWithDefault(*topology_, 300);
   CHECK_GT(stateful_checkpoint_interval, 0);
 
-  auto cb = [this](std::string _oldest_ckptid) {
-    this->HandleStatefulCheckpointSave(_oldest_ckptid);
+  auto cb = [this](const proto::ckptmgr::StatefulConsistentCheckpoints& new_ckpts) {
+    this->HandleStatefulCheckpointSave(new_ckpts);
   };
   // Instantiate the stateful restorer/coordinator
   stateful_controller_ = make_unique<StatefulController>(topology_->name(), _ckpt, state_mgr_,
@@ -616,8 +616,22 @@
   ckptmgr_client_->SendCleanStatefulCheckpointRequest("", true);
 }
 
-void TMaster::HandleStatefulCheckpointSave(const std::string& _oldest_ckpt) {
-  ckptmgr_client_->SendCleanStatefulCheckpointRequest(_oldest_ckpt, false);
+void TMaster::HandleStatefulCheckpointSave(
+    const proto::ckptmgr::StatefulConsistentCheckpoints &new_ckpts) {
+  // broadcast globally consistent checkpoint completion
+  proto::ckptmgr::StatefulConsistentCheckpointSaved msg;
+  msg.mutable_consistent_checkpoint()->CopyFrom(new_ckpts.consistent_checkpoints(0));
+
+  for (auto & stmgr : stmgrs_) {
+    stmgr.second->SendCheckpointSavedMessage(msg);
+  }
+
+  // clean oldest checkpoint on save
+  std::string oldest_ckpt_id =
+      new_ckpts.consistent_checkpoints(new_ckpts.consistent_checkpoints_size() - 1)
+        .checkpoint_id();
+
+  ckptmgr_client_->SendCleanStatefulCheckpointRequest(oldest_ckpt_id, false);
 }
 
 // Called when ckptmgr completes the clean stateful checkpoint request
diff --git a/heron/tmaster/src/cpp/manager/tmaster.h b/heron/tmaster/src/cpp/manager/tmaster.h
index d37fbaa..3bdd25e 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.h
+++ b/heron/tmaster/src/cpp/manager/tmaster.h
@@ -195,7 +195,8 @@
                                      const ComponentConfigMap& _config);
 
   // Function called when a new stateful ckpt record is saved
-  void HandleStatefulCheckpointSave(const std::string& _oldest_ckpt);
+  void HandleStatefulCheckpointSave(
+      const proto::ckptmgr::StatefulConsistentCheckpoints &new_ckpts);
 
   // Function called to kill container
   void KillContainer(const std::string& host_name,