[STREAMCOMP-2885] broadcast GCC completion (#3330)
* amend gitignore
* decouple GCC post-save logic from stateful controller
* tmaster broadcast checkpoint completion
the message will be first sent to all stmgrs, and then each stmgr will forward
it to every heron instance it connects to.
* heron instance handles global checkpoint saved msg
* expose necessary API for the 2PC support
unit tests for this will be in the following commit
* test: refactor mock physical plan to a builder
* test preSave and postSave hook
* [STREAMCOMP-2916] block execute on postSave
* add stateful tests for spouts
* test preRestore
* rename interface to ITwoPhaseStatefulComponent
* block spout from reading data tuples as well
* fix log format
* do not check interface
this is because the boolean will only be set to true for tasks that implement ITwoPhaseStatefulComponent. No need to do the check again.
diff --git a/.gitignore b/.gitignore
index 3b81340..c599612 100644
--- a/.gitignore
+++ b/.gitignore
@@ -90,6 +90,10 @@
.idea/sqlDataSources.xml
.idea/dynamic.xml
+# Intellij bazel plugin
+.ijwb
+.clwb
+
## Maven generated files
.classpath.txt
@@ -134,6 +138,7 @@
# Visual Studio Code
.vscode
+vs.code-workspace
# integration_test
results/
diff --git a/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java b/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java
new file mode 100644
index 0000000..6dc88c3
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/api/topology/ITwoPhaseStatefulComponent.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.api.topology;
+
+import java.io.Serializable;
+
+/**
+ * Defines a stateful component that is aware of Heron topology's "two-phase commit".
+ *
+ * Note tasks saving a distributed checkpoint would be the "prepare" phase of the two-phase commit
+ * algorithm. When a distributed checkpoint is done, we can say that all tasks agree that they
+ * will not roll back to the time before that distributed checkpoint, and the "prepare" phase is
+ * complete.
+ *
+ * When the "prepare" phase is complete, Heron will invoke the "postSave" hook to signal the
+ * beginning of the "commit" phase. If a failure occurs during the "prepare" phase,
+ * Heron will invoke the hook "preRestore" to signal that two-phase commit is aborted, and the
+ * topology will be rolled back to the previous checkpoint.
+ *
+ * Note that the commit phase will finish after the postSave hook exits successfully. Then, the
+ * prepare phase of the following checkpoint will begin.
+ *
+ * In addition, for two-phase stateful components specifically, Heron will not execute (for bolts)
+ * or produce (for spouts) tuples between preSave and postSave. This will guarantee that the prepare
+ * phase of the next checkpoint will not overlap with the commit phase of the current checkpoint
+ * (eg. we block execution of tuples from the next checkpoint unless commit phase is done).
+ *
+ * See the end-to-end effectively-once design doc (linked in the PR of this commit) for more
+ * details.
+ */
+public interface ITwoPhaseStatefulComponent<K extends Serializable, V extends Serializable>
+ extends IStatefulComponent<K, V> {
+
+ /**
+ * This is a hook for the component to perform some actions after a checkpoint is persisted
+ * successfully for all components in the topology.
+ *
+ * @param checkpointId the ID of the checkpoint
+ */
+ void postSave(String checkpointId);
+
+ /**
+ * This is a hook for the component to perform some actions (eg. state clean-up) before the
+ * framework attempts to delete the component and restore it to a previously-saved checkpoint.
+ *
+ * @param checkpointId the ID of the checkpoint that the component is being restored to
+ */
+ void preRestore(String checkpointId);
+
+}
diff --git a/heron/instance/src/java/org/apache/heron/instance/IInstance.java b/heron/instance/src/java/org/apache/heron/instance/IInstance.java
index de7fbd0..9671a1a 100644
--- a/heron/instance/src/java/org/apache/heron/instance/IInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/IInstance.java
@@ -58,6 +58,21 @@
void clean();
/**
+ * Inform the Instance that the framework will clean, stop, and delete the instance
+ * in order to restore its state to a previously-saved checkpoint.
+ *
+ * @param checkpointId the ID of the checkpoint the instance will be restoring to
+ */
+ void preRestore(String checkpointId);
+
+ /**
+ * Inform the Instance that a particular checkpoint has become globally consistent
+ *
+ * @param checkpointId the ID of the checkpoint that became globally consistent
+ */
+ void onCheckpointSaved(String checkpointId);
+
+ /**
* Destroy the whole IInstance.
* Notice: It should only be called when the whole program is
* exiting. And in fact, this method should never be called.
diff --git a/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java b/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java
index 789f53b..c58c384 100644
--- a/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java
+++ b/heron/instance/src/java/org/apache/heron/instance/InstanceControlMsg.java
@@ -26,11 +26,13 @@
private PhysicalPlanHelper newPhysicalPlanHelper;
private CheckpointManager.RestoreInstanceStateRequest restoreInstanceStateRequest;
private CheckpointManager.StartInstanceStatefulProcessing startInstanceStatefulProcessing;
+ private CheckpointManager.StatefulConsistentCheckpointSaved statefulConsistentCheckpointSaved;
private InstanceControlMsg(Builder builder) {
this.newPhysicalPlanHelper = builder.newPhysicalPlanHelper;
this.restoreInstanceStateRequest = builder.restoreInstanceStateRequest;
this.startInstanceStatefulProcessing = builder.startInstanceStatefulProcessing;
+ this.statefulConsistentCheckpointSaved = builder.statefulConsistentCheckpointSaved;
}
public static Builder newBuilder() {
@@ -41,6 +43,10 @@
return newPhysicalPlanHelper;
}
+ public CheckpointManager.StatefulConsistentCheckpointSaved getStatefulCheckpointSavedMessage() {
+ return this.statefulConsistentCheckpointSaved;
+ }
+
public boolean isNewPhysicalPlanHelper() {
return newPhysicalPlanHelper != null;
}
@@ -65,6 +71,7 @@
private PhysicalPlanHelper newPhysicalPlanHelper;
private CheckpointManager.RestoreInstanceStateRequest restoreInstanceStateRequest;
private CheckpointManager.StartInstanceStatefulProcessing startInstanceStatefulProcessing;
+ private CheckpointManager.StatefulConsistentCheckpointSaved statefulConsistentCheckpointSaved;
private Builder() {
@@ -87,6 +94,12 @@
return this;
}
+ public Builder setStatefulCheckpointSaved(
+ CheckpointManager.StatefulConsistentCheckpointSaved message) {
+ this.statefulConsistentCheckpointSaved = message;
+ return this;
+ }
+
public InstanceControlMsg build() {
return new InstanceControlMsg(this);
}
diff --git a/heron/instance/src/java/org/apache/heron/instance/Slave.java b/heron/instance/src/java/org/apache/heron/instance/Slave.java
index 5b85fc6..4e57772 100644
--- a/heron/instance/src/java/org/apache/heron/instance/Slave.java
+++ b/heron/instance/src/java/org/apache/heron/instance/Slave.java
@@ -123,6 +123,16 @@
if (instanceControlMsg.isNewPhysicalPlanHelper()) {
handleNewPhysicalPlan(instanceControlMsg);
}
+
+ // When a checkpoint becomes "globally consistent"
+ if (instanceControlMsg.getStatefulCheckpointSavedMessage() != null) {
+ String checkpointId = instanceControlMsg
+ .getStatefulCheckpointSavedMessage()
+ .getConsistentCheckpoint()
+ .getCheckpointId();
+
+ handleGlobalCheckpointConsistent(checkpointId);
+ }
}
}
};
@@ -130,6 +140,11 @@
slaveLooper.addTasksOnWakeup(handleControlMessageTask);
}
+ private void handleGlobalCheckpointConsistent(String checkpointId) {
+ LOG.log(Level.INFO, "checkpoint: {0} has become globally consistent", checkpointId);
+ instance.onCheckpointSaved(checkpointId);
+ }
+
private void resetCurrentAssignment() {
helper.setTopologyContext(metricsCollector);
instance = helper.getMySpout() != null
@@ -262,7 +277,7 @@
startInstanceIfNeeded();
}
- private void cleanAndStopSlave() {
+ private void cleanAndStopSlaveBeforeRestore(String checkpointId) {
// Clear all queues
streamInCommunicator.clear();
streamOutCommunicator.clear();
@@ -275,6 +290,7 @@
slaveLooper.clearTimers();
if (instance != null) {
+ instance.preRestore(checkpointId);
instance.clean();
}
@@ -294,9 +310,13 @@
private void handleRestoreInstanceStateRequest(InstanceControlMsg instanceControlMsg) {
CheckpointManager.RestoreInstanceStateRequest request =
instanceControlMsg.getRestoreInstanceStateRequest();
+
+ // ID of the checkpoint we are restoring to
+ String checkpointId = request.getState().getCheckpointId();
+
// Clean buffers and unregister tasks in slave looper
if (isInstanceStarted) {
- cleanAndStopSlave();
+ cleanAndStopSlaveBeforeRestore(checkpointId);
}
// Restore the state
diff --git a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
index 9cc177a..a53ed3d 100644
--- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
@@ -37,6 +37,7 @@
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
import org.apache.heron.api.topology.IUpdatable;
import org.apache.heron.api.utils.Utils;
import org.apache.heron.common.basics.Communicator;
@@ -74,6 +75,9 @@
private State<Serializable, Serializable> instanceState;
+ // default to false, can only be toggled to true if bolt implements ITwoPhaseStatefulComponent
+ private boolean waitingForCheckpointSaved;
+
// The reference to topology's config
private final Map<String, Object> config;
@@ -110,6 +114,8 @@
String.valueOf(config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)),
helper.getMyInstanceId());
+ this.waitingForCheckpointSaved = false;
+
if (helper.getMyBolt() == null) {
throw new RuntimeException("HeronBoltInstance has no bolt in physical plan.");
}
@@ -162,10 +168,12 @@
// so that topology emit, ack, fail are thread safe
collector.lock.lock();
try {
- // Checkpoint
if (bolt instanceof IStatefulComponent) {
((IStatefulComponent) bolt).preSave(checkpointId);
}
+ if (bolt instanceof ITwoPhaseStatefulComponent) {
+ waitingForCheckpointSaved = true;
+ }
collector.sendOutState(instanceState, checkpointId, spillState, spillStateLocation);
} finally {
collector.lock.unlock();
@@ -214,6 +222,21 @@
}
@Override
+ public void preRestore(String checkpointId) {
+ if (bolt instanceof ITwoPhaseStatefulComponent) {
+ ((ITwoPhaseStatefulComponent) bolt).preRestore(checkpointId);
+ }
+ }
+
+ @Override
+ public void onCheckpointSaved(String checkpointId) {
+ if (bolt instanceof ITwoPhaseStatefulComponent) {
+ ((ITwoPhaseStatefulComponent) bolt).postSave(checkpointId);
+ waitingForCheckpointSaved = false;
+ }
+ }
+
+ @Override
public void clean() {
// Invoke clean up hook before clean() is called
helper.getTopologyContext().invokeHookCleanup();
@@ -238,6 +261,7 @@
@Override
public void run() {
boltMetrics.updateTaskRunCount();
+
// Back-pressure -- only when we could send out tuples will we read & execute tuples
if (collector.isOutQueuesAvailable()) {
boltMetrics.updateExecutionCount();
@@ -271,7 +295,7 @@
long startOfCycle = System.nanoTime();
// Read data from in Queues
- while (!inQueue.isEmpty()) {
+ while (!inQueue.isEmpty() && !waitingForCheckpointSaved) {
Message msg = inQueue.poll();
if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
diff --git a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
index 53675c4..e9a0e13 100644
--- a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
@@ -36,6 +36,7 @@
import org.apache.heron.api.spout.SpoutOutputCollector;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
import org.apache.heron.api.topology.IUpdatable;
import org.apache.heron.api.utils.Utils;
import org.apache.heron.common.basics.ByteAmount;
@@ -71,6 +72,9 @@
private final boolean spillState;
private final String spillStateLocation;
+ // default to false, can only be toggled to true if spout implements ITwoPhaseStatefulComponent
+ private boolean waitingForCheckpointSaved;
+
private State<Serializable, Serializable> instanceState;
private final SlaveLooper looper;
@@ -110,6 +114,8 @@
String.valueOf(config.get(Config.TOPOLOGY_STATEFUL_SPILL_STATE_LOCATION)),
helper.getMyInstanceId());
+ this.waitingForCheckpointSaved = false;
+
LOG.info("Is this topology stateful: " + isTopologyStateful);
if (helper.getMySpout() == null) {
@@ -171,10 +177,15 @@
if (spout instanceof IStatefulComponent) {
((IStatefulComponent) spout).preSave(checkpointId);
}
+
+ if (spout instanceof ITwoPhaseStatefulComponent) {
+ waitingForCheckpointSaved = true;
+ }
collector.sendOutState(instanceState, checkpointId, spillState, spillStateLocation);
} finally {
collector.lock.unlock();
}
+
LOG.info("State persisted for checkpoint: " + checkpointId);
}
@@ -219,6 +230,21 @@
}
@Override
+ public void preRestore(String checkpointId) {
+ if (spout instanceof ITwoPhaseStatefulComponent) {
+ ((ITwoPhaseStatefulComponent) spout).preRestore(checkpointId);
+ }
+ }
+
+ @Override
+ public void onCheckpointSaved(String checkpointId) {
+ if (spout instanceof ITwoPhaseStatefulComponent) {
+ ((ITwoPhaseStatefulComponent) spout).postSave(checkpointId);
+ waitingForCheckpointSaved = false;
+ }
+ }
+
+ @Override
public void clean() {
// Invoke clean up hook before clean() is called
helper.getTopologyContext().invokeHookCleanup();
@@ -257,6 +283,9 @@
public void run() {
spoutMetrics.updateTaskRunCount();
+ // Check if we have any message to process anyway
+ readTuplesAndExecute(streamInQueue);
+
// Check whether we should produce more tuples
if (isProduceTuple()) {
spoutMetrics.updateProduceTupleCount();
@@ -272,9 +301,6 @@
spoutMetrics.updateOutQueueFullCount();
}
- // Check if we have any message to process anyway
- readTuplesAndExecute(streamInQueue);
-
if (ackEnabled) {
// Update the pending-to-be-acked tuples counts
spoutMetrics.updatePendingTuplesCount(collector.numInFlight());
@@ -330,12 +356,14 @@
* It is allowed in:
* 1. Outgoing Stream queue is available
* 2. Topology State is RUNNING
+ * 3. If the Spout implements ITwoPhaseStatefulComponent, not waiting for checkpoint saved message
*
* @return true to allow produceTuple() to be invoked
*/
private boolean isProduceTuple() {
return collector.isOutQueuesAvailable()
- && helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING);
+ && helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING)
+ && !waitingForCheckpointSaved;
}
protected void produceTuple() {
@@ -435,7 +463,7 @@
long startOfCycle = System.nanoTime();
Duration spoutAckBatchTime = systemConfig.getInstanceAckBatchTime();
- while (!inQueue.isEmpty()) {
+ while (!inQueue.isEmpty() && !waitingForCheckpointSaved) {
Message msg = inQueue.poll();
if (msg instanceof CheckpointManager.InitiateStatefulCheckpoint) {
diff --git a/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java b/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java
index 69b2493..fb9212b 100644
--- a/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java
+++ b/heron/instance/src/java/org/apache/heron/network/StreamManagerClient.java
@@ -120,6 +120,7 @@
registerOnMessage(CheckpointManager.InitiateStatefulCheckpoint.newBuilder());
registerOnMessage(CheckpointManager.RestoreInstanceStateRequest.newBuilder());
registerOnMessage(CheckpointManager.StartInstanceStatefulProcessing.newBuilder());
+ registerOnMessage(CheckpointManager.StatefulConsistentCheckpointSaved.newBuilder());
}
@@ -205,6 +206,8 @@
handleRestoreInstanceStateRequest((CheckpointManager.RestoreInstanceStateRequest) message);
} else if (message instanceof CheckpointManager.StartInstanceStatefulProcessing) {
handleStartStatefulRequest((CheckpointManager.StartInstanceStatefulProcessing) message);
+ } else if (message instanceof CheckpointManager.StatefulConsistentCheckpointSaved) {
+ handleCheckpointSaved((CheckpointManager.StatefulConsistentCheckpointSaved) message);
} else {
throw new RuntimeException("Unknown kind of message received from Stream Manager");
}
@@ -286,6 +289,17 @@
inControlQueue.offer(instanceControlMsg);
}
+ private void handleCheckpointSaved(
+ CheckpointManager.StatefulConsistentCheckpointSaved message) {
+ LOG.info("Received a StatefulCheckpointSaved message with checkpoint id: "
+ + message.getConsistentCheckpoint().getCheckpointId());
+
+ InstanceControlMsg instanceControlMsg = InstanceControlMsg.newBuilder()
+ .setStatefulCheckpointSaved(message)
+ .build();
+ inControlQueue.offer(instanceControlMsg);
+ }
+
private void handleRestoreInstanceStateRequest(
CheckpointManager.RestoreInstanceStateRequest request) {
LOG.info("Received a RestoreInstanceState request with checkpoint id: "
diff --git a/heron/instance/tests/java/BUILD b/heron/instance/tests/java/BUILD
index 0c79300..23b2186 100644
--- a/heron/instance/tests/java/BUILD
+++ b/heron/instance/tests/java/BUILD
@@ -24,8 +24,10 @@
"org.apache.heron.grouping.EmitDirectBoltTest",
"org.apache.heron.grouping.EmitDirectSpoutTest",
"org.apache.heron.instance.bolt.BoltInstanceTest",
+ "org.apache.heron.instance.bolt.BoltStatefulInstanceTest",
"org.apache.heron.instance.spout.ActivateDeactivateTest",
"org.apache.heron.instance.spout.SpoutInstanceTest",
+ "org.apache.heron.instance.spout.SpoutStatefulInstanceTest",
"org.apache.heron.metrics.GlobalMetricsTest",
"org.apache.heron.metrics.MultiAssignableMetricTest",
"org.apache.heron.network.ConnectTest",
diff --git a/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java b/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java
new file mode 100644
index 0000000..d5eac2c
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/instance/bolt/BoltStatefulInstanceTest.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.instance.bolt;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.protobuf.ByteString;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.IRichBolt;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.serializer.IPluggableSerializer;
+import org.apache.heron.api.serializer.JavaSerializer;
+import org.apache.heron.common.basics.SingletonRegistry;
+import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
+import org.apache.heron.instance.InstanceControlMsg;
+import org.apache.heron.instance.SlaveTester;
+import org.apache.heron.proto.system.HeronTuples;
+import org.apache.heron.proto.system.PhysicalPlans;
+import org.apache.heron.resource.Constants;
+import org.apache.heron.resource.MockPhysicalPlansBuilder;
+import org.apache.heron.resource.TestSpout;
+import org.apache.heron.resource.TestStatefulBolt;
+import org.apache.heron.resource.TestTwoPhaseStatefulBolt;
+import org.apache.heron.resource.UnitTestHelper;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test if stateful bolt is able to respond to incoming control/data tuples as expected.
+ */
+public class BoltStatefulInstanceTest {
+ private SlaveTester slaveTester;
+ private static IPluggableSerializer serializer = new JavaSerializer();
+
+ @Before
+ public void before() {
+ slaveTester = new SlaveTester();
+ slaveTester.start();
+ }
+
+ @After
+ public void after() throws NoSuchFieldException, IllegalAccessException {
+ slaveTester.stop();
+ }
+
+ @Test
+ public void testPreSaveAndPostSave() throws Exception {
+ CountDownLatch preSaveLatch = new CountDownLatch(1);
+ CountDownLatch postSaveLatch = new CountDownLatch(1);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch);
+
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+ slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
+
+ // initially, neither preSave nor postSave has been invoked yet
+ assertEquals(1, preSaveLatch.getCount());
+ assertEquals(1, postSaveLatch.getCount());
+
+ // this should invoke preSave
+ slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+ assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(1, postSaveLatch.getCount());
+
+ // this should invoke postSave
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0"));
+ assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(0, postSaveLatch.getCount());
+ }
+
+ @Test
+ public void testPreRestore() throws InterruptedException {
+ CountDownLatch preRestoreLatch = new CountDownLatch(1);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.PRERESTORE_LATCH, preRestoreLatch);
+
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+ slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
+
+ assertEquals(1, preRestoreLatch.getCount());
+
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("cx"));
+
+ assertTrue(preRestoreLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preRestoreLatch.getCount());
+ }
+
+ /**
+ * Ensure that for ITwoPhaseStatefulComponent bolts, after a preSave, execute will not be invoked
+ * unless the corresponding postSave is called.
+ */
+ @Test
+ public void testPostSaveBlockExecute() throws Exception {
+ CountDownLatch preSaveLatch = new CountDownLatch(1);
+ CountDownLatch postSaveLatch = new CountDownLatch(1);
+
+ CountDownLatch executeLatch = new CountDownLatch(1); // expect to execute one tuple
+
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.EXECUTE_LATCH, executeLatch);
+
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+ slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCBolt());
+
+ // initially, none of preSave, postSave, or execute has been invoked yet
+ assertEquals(1, preSaveLatch.getCount());
+ assertEquals(1, postSaveLatch.getCount());
+ assertEquals(1, executeLatch.getCount());
+
+ // this should invoke preSave
+ slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+ // put a data tuple into the inStreamQueue
+ slaveTester.getInStreamQueue().offer(buildTupleSet());
+
+ assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(1, postSaveLatch.getCount());
+ assertEquals(1, executeLatch.getCount());
+
+ // Wait for a bounded amount of time, assert that the tuple will not execute as it is
+ // blocked on postSave. This is because we only want to allow one uncommitted "transaction" on
+ // each task. See the design doc for more details.
+ assertFalse(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(1, postSaveLatch.getCount());
+ assertEquals(1, executeLatch.getCount());
+
+ // this should invoke postSave
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0"));
+ assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertTrue(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(0, postSaveLatch.getCount());
+ assertEquals(0, executeLatch.getCount());
+ }
+
+ /**
+ * Ensure that the aforementioned behaviour does not apply for bolts that don't implement
+ * ITwoPhaseStatefulComponent
+ */
+ @Test
+ public void testExecuteNotBlocked() throws Exception {
+ CountDownLatch preSaveLatch = new CountDownLatch(1);
+ CountDownLatch executeLatch = new CountDownLatch(1); // expect to execute one tuple
+
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.EXECUTE_LATCH, executeLatch);
+
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+ slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageForStatefulBolt());
+
+ // initially, neither preSave nor execute has been invoked yet
+ assertEquals(1, preSaveLatch.getCount());
+ assertEquals(1, executeLatch.getCount());
+
+ // this should invoke preSave
+ slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+ // put a data tuple into the inStreamQueue
+ slaveTester.getInStreamQueue().offer(buildTupleSet());
+
+ assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+
+ // no need to wait for postSave as the bolt doesn't implement ITwoPhaseStatefulComponent
+ assertTrue(executeLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(0, executeLatch.getCount());
+ }
+
+ // build a tuple set that contains one data tuple
+ private HeronTuples.HeronTupleSet buildTupleSet() {
+ HeronTuples.HeronTupleSet.Builder heronTupleSet = HeronTuples.HeronTupleSet.newBuilder();
+ heronTupleSet.setSrcTaskId(1);
+ HeronTuples.HeronDataTupleSet.Builder dataTupleSet = HeronTuples.HeronDataTupleSet.newBuilder();
+ TopologyAPI.StreamId.Builder streamId = TopologyAPI.StreamId.newBuilder();
+ streamId.setComponentName("test-spout");
+ streamId.setId("default");
+ dataTupleSet.setStream(streamId);
+
+ HeronTuples.HeronDataTuple.Builder dataTuple = HeronTuples.HeronDataTuple.newBuilder();
+ dataTuple.setKey(0);
+
+ HeronTuples.RootId.Builder rootId = HeronTuples.RootId.newBuilder();
+ rootId.setKey(0);
+ rootId.setTaskid(0);
+ dataTuple.addRoots(rootId);
+
+ dataTuple.addValues(ByteString.copyFrom(serializer.serialize("A")));
+ dataTupleSet.addTuples(dataTuple);
+ heronTupleSet.setData(dataTupleSet);
+
+ return heronTupleSet.build();
+ }
+
+ private InstanceControlMsg buildPhysicalPlanMessageFor2PCBolt() {
+ return buildPhysicalPlanMessage(new TestTwoPhaseStatefulBolt());
+ }
+
+ private InstanceControlMsg buildPhysicalPlanMessageForStatefulBolt() {
+ return buildPhysicalPlanMessage(new TestStatefulBolt());
+ }
+
+ private InstanceControlMsg buildPhysicalPlanMessage(IRichBolt bolt) {
+ PhysicalPlans.PhysicalPlan physicalPlan =
+ MockPhysicalPlansBuilder
+ .newBuilder()
+ .withTopologyConfig(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE, -1)
+ .withTopologyState(TopologyAPI.TopologyState.RUNNING)
+ .withSpoutInstance(
+ "test-spout",
+ 0,
+ "spout-id",
+ new TestSpout()
+ )
+ .withBoltInstance(
+ "test-bolt",
+ 1,
+ "bolt-id",
+ "test-spout",
+ bolt
+ )
+ .build();
+
+ PhysicalPlanHelper ph = new PhysicalPlanHelper(physicalPlan, "bolt-id");
+
+ return InstanceControlMsg.newBuilder()
+ .setNewPhysicalPlanHelper(ph)
+ .build();
+ }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java b/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java
new file mode 100644
index 0000000..4d65aec
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/instance/spout/SpoutStatefulInstanceTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.instance.spout;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.serializer.IPluggableSerializer;
+import org.apache.heron.api.serializer.JavaSerializer;
+import org.apache.heron.api.spout.IRichSpout;
+import org.apache.heron.common.basics.SingletonRegistry;
+import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
+import org.apache.heron.instance.InstanceControlMsg;
+import org.apache.heron.instance.SlaveTester;
+import org.apache.heron.proto.system.PhysicalPlans;
+import org.apache.heron.resource.Constants;
+import org.apache.heron.resource.MockPhysicalPlansBuilder;
+import org.apache.heron.resource.TestBolt;
+import org.apache.heron.resource.TestStatefulSpout;
+import org.apache.heron.resource.TestTwoPhaseStatefulSpout;
+import org.apache.heron.resource.UnitTestHelper;
+
+import static org.junit.Assert.*;
+
+public class SpoutStatefulInstanceTest {
+
+ private SlaveTester slaveTester;
+ private static IPluggableSerializer serializer = new JavaSerializer();
+
+ @Before
+ public void before() {
+ slaveTester = new SlaveTester();
+ slaveTester.start();
+ }
+
+ @After
+ public void after() throws NoSuchFieldException, IllegalAccessException {
+ slaveTester.stop();
+ }
+
+ @Test
+ public void testPreSaveAndPostSave() throws Exception {
+ CountDownLatch preSaveLatch = new CountDownLatch(1);
+ CountDownLatch postSaveLatch = new CountDownLatch(1);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch);
+
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+ slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout());
+
+ // initially neither preSave nor postSave has been invoked
+ assertEquals(1, preSaveLatch.getCount());
+ assertEquals(1, postSaveLatch.getCount());
+
+ // this should invoke preSave
+ slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+ assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(1, postSaveLatch.getCount());
+
+ // this should invoke postSave
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0"));
+ assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(0, postSaveLatch.getCount());
+ }
+
+ @Test
+ public void testPreRestore() throws InterruptedException {
+ CountDownLatch preRestoreLatch = new CountDownLatch(1);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.PRERESTORE_LATCH, preRestoreLatch);
+
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+ slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout());
+
+ assertEquals(1, preRestoreLatch.getCount());
+
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("cx"));
+
+ assertTrue(preRestoreLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preRestoreLatch.getCount());
+ }
+
+ /**
+ * Ensure that for ITwoPhaseStatefulComponent spouts, after a preSave, no tuples are emitted
+ * until the corresponding postSave is called.
+ */
+ @Test
+ public void testPostSaveBlockExecute() throws Exception {
+ // when this boolean is set to false, nextTuple on the spout will be run, but the spout will
+ // make sure to not emit any tuples.
+ AtomicBoolean shouldStartEmit = new AtomicBoolean(false);
+ SingletonRegistry.INSTANCE.registerSingleton(
+ Constants.SPOUT_SHOULD_START_EMIT, shouldStartEmit);
+
+ CountDownLatch preSaveLatch = new CountDownLatch(1);
+ CountDownLatch postSaveLatch = new CountDownLatch(1);
+ CountDownLatch emitLatch = new CountDownLatch(1);
+
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.POSTSAVE_LATCH, postSaveLatch);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.EMIT_LATCH, emitLatch);
+
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+ slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageFor2PCSpout());
+
+ // initially neither preSave nor postSave has been invoked
+ assertEquals(1, preSaveLatch.getCount());
+ assertEquals(1, postSaveLatch.getCount());
+
+ // this should invoke preSave
+ slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+ // tell the spout to start emitting tuples
+ assertFalse(shouldStartEmit.getAndSet(true));
+
+ // since preSave is executed, spout will not emit until postSave is called
+ assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(1, postSaveLatch.getCount());
+ assertEquals(1, emitLatch.getCount());
+
+ // Wait for a bounded amount of time, assert that the spout will not emit tuples as it is
+ // blocked on postSave. This is because we only want to allow one uncommitted "transaction" on
+ // each task. See the design doc for more details.
+ assertFalse(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(1, postSaveLatch.getCount());
+ assertEquals(1, emitLatch.getCount());
+
+ // this should invoke postSave
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildCheckpointSavedMessage("c0", "p0"));
+ assertTrue(postSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertTrue(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(0, postSaveLatch.getCount());
+ assertEquals(0, emitLatch.getCount());
+ }
+
+ /**
+ * Ensure that the aforementioned behaviour does not apply for spouts that don't implement
+ * ITwoPhaseStatefulComponent
+ */
+ @Test
+ public void testExecuteNotBlocked() throws Exception {
+ // when this boolean is set to false, nextTuple on the spout will be run, but the spout will
+ // make sure to not emit any tuples.
+ AtomicBoolean shouldStartEmit = new AtomicBoolean(false);
+ SingletonRegistry.INSTANCE.registerSingleton(
+ Constants.SPOUT_SHOULD_START_EMIT, shouldStartEmit);
+
+ CountDownLatch preSaveLatch = new CountDownLatch(1);
+ CountDownLatch emitLatch = new CountDownLatch(1);
+
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.PRESAVE_LATCH, preSaveLatch);
+ SingletonRegistry.INSTANCE.registerSingleton(Constants.EMIT_LATCH, emitLatch);
+
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildRestoreInstanceState("c0"));
+ slaveTester.getInControlQueue().offer(UnitTestHelper.buildStartInstanceProcessingMessage("c0"));
+ slaveTester.getInControlQueue().offer(buildPhysicalPlanMessageForStatefulSpout());
+
+ // initially preSave has not been invoked and no tuple has been emitted
+ assertEquals(1, preSaveLatch.getCount());
+ assertEquals(1, emitLatch.getCount());
+
+ // this should invoke preSave
+ slaveTester.getInStreamQueue().offer(UnitTestHelper.buildPersistStateMessage("c0"));
+
+ // tell the spout to start emitting tuples
+ assertFalse(shouldStartEmit.getAndSet(true));
+
+ assertTrue(preSaveLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+
+ // no need to wait for postSave as the spout doesn't implement ITwoPhaseStatefulComponent
+ assertTrue(emitLatch.await(Constants.TEST_WAIT_TIME.toMillis(), TimeUnit.MILLISECONDS));
+ assertEquals(0, preSaveLatch.getCount());
+ assertEquals(0, emitLatch.getCount());
+ }
+
+ private InstanceControlMsg buildPhysicalPlanMessageFor2PCSpout() {
+ return buildPhysicalPlanMessage(new TestTwoPhaseStatefulSpout());
+ }
+
+ private InstanceControlMsg buildPhysicalPlanMessageForStatefulSpout() {
+ return buildPhysicalPlanMessage(new TestStatefulSpout());
+ }
+
+ private InstanceControlMsg buildPhysicalPlanMessage(IRichSpout spout) {
+ PhysicalPlans.PhysicalPlan physicalPlan =
+ MockPhysicalPlansBuilder
+ .newBuilder()
+ .withTopologyConfig(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE, -1)
+ .withTopologyState(TopologyAPI.TopologyState.RUNNING)
+ .withSpoutInstance(
+ "test-spout",
+ 0,
+ "spout-id",
+ spout
+ )
+ .withBoltInstance(
+ "test-bolt",
+ 1,
+ "bolt-id",
+ "test-spout",
+ new TestBolt()
+ )
+ .build();
+
+ PhysicalPlanHelper ph = new PhysicalPlanHelper(physicalPlan, "spout-id");
+
+ return InstanceControlMsg.newBuilder()
+ .setNewPhysicalPlanHelper(ph)
+ .build();
+ }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/Constants.java b/heron/instance/tests/java/org/apache/heron/resource/Constants.java
index 3ffe9b0..9b7d2ce 100644
--- a/heron/instance/tests/java/org/apache/heron/resource/Constants.java
+++ b/heron/instance/tests/java/org/apache/heron/resource/Constants.java
@@ -39,12 +39,18 @@
public static final String ACK_COUNT = "ack-count";
public static final String EXECUTE_LATCH = "execute-latch";
+ public static final String EMIT_LATCH = "emit-latch";
public static final String FAIL_LATCH = "fail-latch";
public static final String ACK_LATCH = "ack-latch";
public static final String ACTIVATE_COUNT_LATCH = "activate-count-latch";
public static final String DEACTIVATE_COUNT_LATCH = "deactivate-count-latch";
+ public static final String PRESAVE_LATCH = "preSave-latch";
+ public static final String POSTSAVE_LATCH = "postSave-latch";
+ public static final String PRERESTORE_LATCH = "preRestore-latch";
+ public static final String SPOUT_SHOULD_START_EMIT = "spout-should-start-emit";
+
public static final String RECEIVED_STRING_LIST = "received-string-list";
public static final String HERON_SYSTEM_CONFIG = "org.apache.heron.common.config.SystemConfig";
diff --git a/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java b/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java
new file mode 100644
index 0000000..14a36cf
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/MockPhysicalPlansBuilder.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Ignore;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.IRichBolt;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.spout.IRichSpout;
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.proto.system.PhysicalPlans;
+
+@Ignore
+public final class MockPhysicalPlansBuilder {
+
+ private PhysicalPlans.PhysicalPlan.Builder pPlan;
+ private TopologyBuilder topologyBuilder;
+ private List<PhysicalPlans.Instance.Builder> instanceBuilders;
+
+ private Config conf;
+ private TopologyAPI.TopologyState initialTopologyState;
+
+ private MockPhysicalPlansBuilder() {
+ pPlan = PhysicalPlans.PhysicalPlan.newBuilder();
+ topologyBuilder = new TopologyBuilder();
+ instanceBuilders = new ArrayList<>();
+
+ conf = null;
+ initialTopologyState = null;
+ }
+
+ public static MockPhysicalPlansBuilder newBuilder() {
+ return new MockPhysicalPlansBuilder();
+ }
+
+ public MockPhysicalPlansBuilder withTopologyConfig(
+ Config.TopologyReliabilityMode reliabilityMode,
+ int messageTimeout
+ ) {
+ conf = new Config();
+ conf.setTeamEmail("streaming-compute@twitter.com");
+ conf.setTeamName("stream-computing");
+ conf.setTopologyProjectName("heron-integration-test");
+ conf.setNumStmgrs(1);
+ conf.setMaxSpoutPending(100);
+ conf.setTopologyReliabilityMode(reliabilityMode);
+ if (messageTimeout != -1) {
+ conf.setMessageTimeoutSecs(messageTimeout);
+ conf.put("topology.enable.message.timeouts", "true");
+ }
+
+ return this;
+ }
+
+ public MockPhysicalPlansBuilder withTopologyState(TopologyAPI.TopologyState topologyState) {
+ initialTopologyState = topologyState;
+ return this;
+ }
+
+ public MockPhysicalPlansBuilder withSpoutInstance(
+ String componentName,
+ int taskId,
+ String instanceId,
+ IRichSpout spout
+ ) {
+ PhysicalPlans.InstanceInfo.Builder spoutInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
+ spoutInstanceInfo.setComponentName(componentName);
+ spoutInstanceInfo.setTaskId(taskId);
+ spoutInstanceInfo.setComponentIndex(0);
+
+ PhysicalPlans.Instance.Builder spoutInstance = PhysicalPlans.Instance.newBuilder();
+ spoutInstance.setInstanceId(instanceId);
+ spoutInstance.setStmgrId("stream-manager-id");
+ spoutInstance.setInfo(spoutInstanceInfo);
+
+ topologyBuilder.setSpout(componentName, spout, 1);
+ instanceBuilders.add(spoutInstance);
+
+ return this;
+ }
+
+ public MockPhysicalPlansBuilder withBoltInstance(
+ String componentName,
+ int taskId,
+ String instanceId,
+ String upStreamComponentId,
+ IRichBolt bolt
+ ) {
+ PhysicalPlans.InstanceInfo.Builder boltInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
+ boltInstanceInfo.setComponentName(componentName);
+ boltInstanceInfo.setTaskId(taskId);
+ boltInstanceInfo.setComponentIndex(0);
+
+ PhysicalPlans.Instance.Builder boltInstance = PhysicalPlans.Instance.newBuilder();
+ boltInstance.setInstanceId(instanceId);
+ boltInstance.setStmgrId("stream-manager-id");
+ boltInstance.setInfo(boltInstanceInfo);
+
+ topologyBuilder.setBolt(componentName, bolt, 1)
+ .shuffleGrouping(upStreamComponentId);
+ instanceBuilders.add(boltInstance);
+
+ return this;
+ }
+
+ public PhysicalPlans.PhysicalPlan build() {
+ addStmgrs();
+ pPlan.setTopology(buildTopology());
+
+ for (PhysicalPlans.Instance.Builder b : instanceBuilders) {
+ pPlan.addInstances(b);
+ }
+
+ return pPlan.build();
+ }
+
+ private TopologyAPI.Topology buildTopology() {
+ return topologyBuilder
+ .createTopology()
+ .setName("topology-name")
+ .setConfig(conf)
+ .setState(initialTopologyState)
+ .getTopology();
+ }
+
+ private void addStmgrs() {
+ PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder();
+ stmgr.setId("stream-manager-id");
+ stmgr.setHostName("127.0.0.1");
+ stmgr.setDataPort(8888);
+ stmgr.setLocalEndpoint("endpoint");
+ pPlan.addStmgrs(stmgr);
+ }
+
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java
new file mode 100644
index 0000000..e16efff
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulBolt.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+public class TestStatefulBolt extends BaseRichBolt
+ implements IStatefulComponent<String, String> {
+ @Override
+ public void prepare(
+ Map<String, Object> heronConf,
+ TopologyContext context,
+ OutputCollector collector) {
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ CountDownLatch tupleExecutedLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EXECUTE_LATCH);
+
+ if (tupleExecutedLatch != null) {
+ tupleExecutedLatch.countDown();
+ }
+ }
+
+ @Override
+ public void initState(State<String, String> state) {
+ }
+
+ @Override
+ public void preSave(String checkpointId) {
+ CountDownLatch preSaveLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH);
+
+ if (preSaveLatch != null) {
+ preSaveLatch.countDown();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java
new file mode 100644
index 0000000..c22a917
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/TestStatefulSpout.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+public class TestStatefulSpout extends BaseRichSpout implements IStatefulComponent<String, String> {
+ @Override
+ public void open(
+ Map<String, Object> conf,
+ TopologyContext context,
+ SpoutOutputCollector collector) {
+ }
+
+ @Override
+ public void nextTuple() {
+ AtomicBoolean shouldStartEmit =
+ (AtomicBoolean) SingletonRegistry.INSTANCE.getSingleton(Constants.SPOUT_SHOULD_START_EMIT);
+
+ if (shouldStartEmit != null && !shouldStartEmit.get()) {
+ return;
+ }
+
+ // actually "emit" the tuple
+ CountDownLatch emitLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EMIT_LATCH);
+
+ if (emitLatch != null) {
+ emitLatch.countDown();
+ }
+ }
+
+ @Override
+ public void initState(State<String, String> state) {
+ }
+
+ @Override
+ public void preSave(String checkpointId) {
+ CountDownLatch preSaveLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH);
+
+ if (preSaveLatch != null) {
+ preSaveLatch.countDown();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java
new file mode 100644
index 0000000..b67a9ee
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulBolt.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Ignore;
+
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+@Ignore
+public class TestTwoPhaseStatefulBolt extends BaseRichBolt
+ implements ITwoPhaseStatefulComponent<String, String> {
+
+ private static final long serialVersionUID = -5160420613503624743L;
+
+ @Override
+ public void prepare(
+ Map<String, Object> map,
+ TopologyContext topologyContext,
+ OutputCollector collector) {
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ CountDownLatch tupleExecutedLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EXECUTE_LATCH);
+
+ if (tupleExecutedLatch != null) {
+ tupleExecutedLatch.countDown();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public void postSave(String checkpointId) {
+ CountDownLatch postSaveLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.POSTSAVE_LATCH);
+
+ if (postSaveLatch != null) {
+ postSaveLatch.countDown();
+ }
+ }
+
+ @Override
+ public void preRestore(String checkpointId) {
+ CountDownLatch preRestoreLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRERESTORE_LATCH);
+
+ if (preRestoreLatch != null) {
+ preRestoreLatch.countDown();
+ }
+ }
+
+ @Override
+ public void initState(State<String, String> state) {
+ }
+
+ @Override
+ public void preSave(String checkpointId) {
+ CountDownLatch preSaveLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH);
+
+ if (preSaveLatch != null) {
+ preSaveLatch.countDown();
+ }
+ }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java
new file mode 100644
index 0000000..a4b6efd
--- /dev/null
+++ b/heron/instance/tests/java/org/apache/heron/resource/TestTwoPhaseStatefulSpout.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.resource;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.ITwoPhaseStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.common.basics.SingletonRegistry;
+
+public class TestTwoPhaseStatefulSpout extends BaseRichSpout
+ implements ITwoPhaseStatefulComponent<String, String> {
+ @Override
+ public void open(
+ Map<String, Object> conf,
+ TopologyContext context,
+ SpoutOutputCollector collector) {
+ }
+
+ @Override
+ public void nextTuple() {
+ AtomicBoolean shouldStartEmit =
+ (AtomicBoolean) SingletonRegistry.INSTANCE.getSingleton(Constants.SPOUT_SHOULD_START_EMIT);
+
+ if (shouldStartEmit != null && !shouldStartEmit.get()) {
+ return;
+ }
+
+ // actually "emit" the tuple
+ CountDownLatch emitLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.EMIT_LATCH);
+
+ if (emitLatch != null) {
+ emitLatch.countDown();
+ }
+ }
+
+ @Override
+ public void postSave(String checkpointId) {
+ CountDownLatch postSaveLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.POSTSAVE_LATCH);
+
+ if (postSaveLatch != null) {
+ postSaveLatch.countDown();
+ }
+ }
+
+ @Override
+ public void preRestore(String checkpointId) {
+ CountDownLatch preRestoreLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRERESTORE_LATCH);
+
+ if (preRestoreLatch != null) {
+ preRestoreLatch.countDown();
+ }
+ }
+
+ @Override
+ public void initState(State<String, String> state) {
+ }
+
+ @Override
+ public void preSave(String checkpointId) {
+ CountDownLatch preSaveLatch =
+ (CountDownLatch) SingletonRegistry.INSTANCE.getSingleton(Constants.PRESAVE_LATCH);
+
+ if (preSaveLatch != null) {
+ preSaveLatch.countDown();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+}
diff --git a/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java b/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java
index 904ecab..8e39333 100644
--- a/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java
+++ b/heron/instance/tests/java/org/apache/heron/resource/UnitTestHelper.java
@@ -23,14 +23,17 @@
import java.nio.file.Paths;
import java.util.Map;
+import com.google.protobuf.Message;
+
import org.junit.Ignore;
import org.apache.heron.api.Config;
import org.apache.heron.api.generated.TopologyAPI;
-import org.apache.heron.api.topology.TopologyBuilder;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.config.SystemConfigKey;
+import org.apache.heron.instance.InstanceControlMsg;
+import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.stmgr.StreamManager;
import org.apache.heron.proto.system.Common;
import org.apache.heron.proto.system.PhysicalPlans;
@@ -60,91 +63,36 @@
public static PhysicalPlans.PhysicalPlan getPhysicalPlan(
boolean ackEnabled,
int messageTimeout,
- TopologyAPI.TopologyState topologyState) {
- PhysicalPlans.PhysicalPlan.Builder pPlan = PhysicalPlans.PhysicalPlan.newBuilder();
+ TopologyAPI.TopologyState topologyState
+ ) {
+ Config.TopologyReliabilityMode reliabilityMode = ackEnabled
+ ? Config.TopologyReliabilityMode.ATLEAST_ONCE
+ : Config.TopologyReliabilityMode.ATMOST_ONCE;
- setTopology(pPlan, ackEnabled, messageTimeout, topologyState);
-
- setInstances(pPlan);
-
- setStMgr(pPlan);
-
- return pPlan.build();
+ return MockPhysicalPlansBuilder
+ .newBuilder()
+ .withTopologyConfig(reliabilityMode, messageTimeout)
+ .withTopologyState(topologyState)
+ .withSpoutInstance(
+ "test-spout",
+ 0,
+ "spout-id",
+ new TestSpout()
+ )
+ .withBoltInstance(
+ "test-bolt",
+ 1,
+ "bolt-id",
+ "test-spout",
+ new TestBolt()
+ )
+ .build();
}
public static PhysicalPlans.PhysicalPlan getPhysicalPlan(boolean ackEnabled, int messageTimeout) {
return getPhysicalPlan(ackEnabled, messageTimeout, TopologyAPI.TopologyState.RUNNING);
}
- private static void setTopology(PhysicalPlans.PhysicalPlan.Builder pPlan, boolean ackEnabled,
- int messageTimeout, TopologyAPI.TopologyState topologyState) {
- TopologyBuilder topologyBuilder = new TopologyBuilder();
- topologyBuilder.setSpout("test-spout", new TestSpout(), 1);
- // Here we need case switch to corresponding grouping
- topologyBuilder.setBolt("test-bolt", new TestBolt(), 1).shuffleGrouping("test-spout");
-
- Config conf = new Config();
- conf.setTeamEmail("streaming-compute@twitter.com");
- conf.setTeamName("stream-computing");
- conf.setTopologyProjectName("heron-integration-test");
- conf.setNumStmgrs(1);
- conf.setMaxSpoutPending(100);
- if (ackEnabled) {
- conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.ATLEAST_ONCE);
- } else {
- conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.ATMOST_ONCE);
- }
- if (messageTimeout != -1) {
- conf.setMessageTimeoutSecs(messageTimeout);
- conf.put("topology.enable.message.timeouts", "true");
- }
-
- TopologyAPI.Topology fTopology =
- topologyBuilder.createTopology().
- setName("topology-name").
- setConfig(conf).
- setState(topologyState).
- getTopology();
-
- pPlan.setTopology(fTopology);
- }
-
- private static void setInstances(PhysicalPlans.PhysicalPlan.Builder pPlan) {
- // Construct the spoutInstance
- PhysicalPlans.InstanceInfo.Builder spoutInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
- spoutInstanceInfo.setComponentName("test-spout");
- spoutInstanceInfo.setTaskId(0);
- spoutInstanceInfo.setComponentIndex(0);
-
- PhysicalPlans.Instance.Builder spoutInstance = PhysicalPlans.Instance.newBuilder();
- spoutInstance.setInstanceId("spout-id");
- spoutInstance.setStmgrId("stream-manager-id");
- spoutInstance.setInfo(spoutInstanceInfo);
-
- // Construct the boltInstanceInfo
- PhysicalPlans.InstanceInfo.Builder boltInstanceInfo = PhysicalPlans.InstanceInfo.newBuilder();
- boltInstanceInfo.setComponentName("test-bolt");
- boltInstanceInfo.setTaskId(1);
- boltInstanceInfo.setComponentIndex(0);
-
- PhysicalPlans.Instance.Builder boltInstance = PhysicalPlans.Instance.newBuilder();
- boltInstance.setInstanceId("bolt-id");
- boltInstance.setStmgrId("stream-manager-id");
- boltInstance.setInfo(boltInstanceInfo);
-
- pPlan.addInstances(spoutInstance);
- pPlan.addInstances(boltInstance);
- }
-
- private static void setStMgr(PhysicalPlans.PhysicalPlan.Builder pPlan) {
- PhysicalPlans.StMgr.Builder stmgr = PhysicalPlans.StMgr.newBuilder();
- stmgr.setId("stream-manager-id");
- stmgr.setHostName("127.0.0.1");
- stmgr.setDataPort(8888);
- stmgr.setLocalEndpoint("endpoint");
- pPlan.addStmgrs(stmgr);
- }
-
@SuppressWarnings("unchecked")
public static void clearSingletonRegistry() throws IllegalAccessException, NoSuchFieldException {
// Remove the Singleton by Reflection
@@ -196,4 +144,60 @@
return registerInstanceResponse.build();
}
+ /**
+  * Builds an InitiateStatefulCheckpoint message for the given checkpoint id.
+  *
+  * @param checkpointId value stored in the message's checkpoint id field
+  * @return the built InitiateStatefulCheckpoint message
+  */
+ public static Message buildPersistStateMessage(String checkpointId) {
+   return CheckpointManager.InitiateStatefulCheckpoint
+       .newBuilder()
+       .setCheckpointId(checkpointId)
+       .build();
+ }
+
+ /**
+  * Builds an InstanceControlMsg carrying a RestoreInstanceStateRequest. Only the checkpoint id
+  * of the nested InstanceStateCheckpoint is populated.
+  *
+  * @param checkpointId checkpoint id set on the nested InstanceStateCheckpoint
+  * @return the built InstanceControlMsg
+  */
+ public static InstanceControlMsg buildRestoreInstanceState(String checkpointId) {
+   CheckpointManager.InstanceStateCheckpoint.Builder checkpoint =
+       CheckpointManager.InstanceStateCheckpoint
+           .newBuilder()
+           .setCheckpointId(checkpointId);
+
+   CheckpointManager.RestoreInstanceStateRequest restoreRequest =
+       CheckpointManager.RestoreInstanceStateRequest
+           .newBuilder()
+           .setState(checkpoint)
+           .build();
+
+   return InstanceControlMsg.newBuilder()
+       .setRestoreInstanceStateRequest(restoreRequest)
+       .build();
+ }
+
+ public static InstanceControlMsg buildStartInstanceProcessingMessage(String checkpointId) {
+ return InstanceControlMsg.newBuilder()
+ .setStartInstanceStatefulProcessing(
+ CheckpointManager.StartInstanceStatefulProcessing
+ .newBuilder()
+ .setCheckpointId(checkpointId)
+ .build()
+ )
+ .build();
+ }
+
+ /**
+  * Builds an InstanceControlMsg carrying a StatefulConsistentCheckpointSaved message.
+  *
+  * @param checkpointId checkpoint id set on the nested StatefulConsistentCheckpoint
+  * @param packingPlanId packing plan id set on the nested StatefulConsistentCheckpoint
+  * @return the built InstanceControlMsg
+  */
+ public static InstanceControlMsg buildCheckpointSavedMessage(
+     String checkpointId,
+     String packingPlanId
+ ) {
+   // Build from the inside out: checkpoint -> saved notification -> control message
+   CheckpointManager.StatefulConsistentCheckpoint consistentCheckpoint =
+       CheckpointManager.StatefulConsistentCheckpoint
+           .newBuilder()
+           .setCheckpointId(checkpointId)
+           .setPackingPlanId(packingPlanId)
+           .build();
+
+   CheckpointManager.StatefulConsistentCheckpointSaved savedMessage =
+       CheckpointManager.StatefulConsistentCheckpointSaved
+           .newBuilder()
+           .setConsistentCheckpoint(consistentCheckpoint)
+           .build();
+
+   return InstanceControlMsg.newBuilder()
+       .setStatefulCheckpointSaved(savedMessage)
+       .build();
+ }
+
}
diff --git a/heron/proto/ckptmgr.proto b/heron/proto/ckptmgr.proto
index ba6633a..ea6162c 100644
--- a/heron/proto/ckptmgr.proto
+++ b/heron/proto/ckptmgr.proto
@@ -108,6 +108,12 @@
required string checkpoint_id = 1;
}
+// Message broadcast to every stmgr (which then forwards it to its instances) once a checkpoint
+// becomes "globally consistent"
+message StatefulConsistentCheckpointSaved {
+ required StatefulConsistentCheckpoint consistent_checkpoint = 1;
+}
+
// Message sent by tmaster to stmgr asking them to reset their instances
// to this checkpoint
message RestoreTopologyStateRequest {
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 6c0f973..9a752de 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -498,6 +498,15 @@
}
}
+// Sends the given checkpoint saved message to every entry of active_instances_, logging the
+// checkpoint id together with the task id of each recipient.
+void InstanceServer::BroadcastStatefulCheckpointSaved(
+    const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) {
+  // The checkpoint id is the same for every recipient, so look it up once
+  const auto& checkpoint_id = _msg.consistent_checkpoint().checkpoint_id();
+
+  for (const auto& instance : active_instances_) {
+    LOG(INFO) << "Sending checkpoint: " << checkpoint_id
+              << " saved message to instance with task_id: " << instance.second;
+    SendMessage(instance.first, _msg);
+  }
+}
+
void InstanceServer::SetRateLimit(const proto::system::PhysicalPlan& _pplan,
const std::string& _component,
Connection* _conn) const {
diff --git a/heron/stmgr/src/cpp/manager/instance-server.h b/heron/stmgr/src/cpp/manager/instance-server.h
index df4b5e9..b4a6038 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.h
+++ b/heron/stmgr/src/cpp/manager/instance-server.h
@@ -72,6 +72,9 @@
void BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan& _pplan);
+ // Send the given checkpoint saved message to every active instance
+ void BroadcastStatefulCheckpointSaved(
+ const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg);
+
virtual bool HaveAllInstancesConnectedToUs() const {
return active_instances_.size() == expected_instances_.size();
}
diff --git a/heron/stmgr/src/cpp/manager/stmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr.cpp
index 6510271..aa6625e 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr.cpp
@@ -368,12 +368,18 @@
this->StartStatefulProcessing(checkpoint_id);
};
+ auto broadcast_checkpoint_saved =
+ [this](const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) {
+ this->BroadcastCheckpointSaved(_msg);
+ };
+
tmaster_client_ = make_shared<TMasterClient>(eventLoop_, master_options, stmgr_id_, stmgr_host_,
data_port_, local_data_port_, shell_port_,
std::move(pplan_watch),
std::move(stateful_checkpoint_watch),
std::move(restore_topology_watch),
- std::move(start_stateful_watch));
+ std::move(start_stateful_watch),
+ std::move(broadcast_checkpoint_saved));
}
void StMgr::CreateTupleCache() {
@@ -1080,6 +1086,12 @@
stateful_restorer_->StartRestore(_checkpoint_id, _restore_txid, local_taskids, *pplan_);
}
+// Broadcast the news that the checkpoint has been saved to all instances connected to this
+// stmgr. The actual fan-out is delegated to the instance server.
+void StMgr::BroadcastCheckpointSaved(
+ const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) {
+ instance_server_->BroadcastStatefulCheckpointSaved(_msg);
+}
+
// Called by TmasterClient when it receives directive from tmaster
// to start processing after having previously recovered the state at _checkpoint_id
void StMgr::StartStatefulProcessing(sp_string _checkpoint_id) {
diff --git a/heron/stmgr/src/cpp/manager/stmgr.h b/heron/stmgr/src/cpp/manager/stmgr.h
index decdc39..582ac3a 100644
--- a/heron/stmgr/src/cpp/manager/stmgr.h
+++ b/heron/stmgr/src/cpp/manager/stmgr.h
@@ -197,6 +197,10 @@
void HandleStatefulRestoreDone(proto::system::StatusCode _status,
std::string _checkpoint_id, sp_int64 _restore_txid);
+ // Called when the stmgr receives a StatefulConsistentCheckpointSaved message from TMaster.
+ // The stmgr then forwards the message to all heron instances connected to it.
+ void BroadcastCheckpointSaved(const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg);
+
// Patch new physical plan with internal hydrated topology but keep new topology data:
// - new topology state
// - new topology/component config
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.cpp b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
index f1e27d8..657077d 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.cpp
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.cpp
@@ -41,7 +41,9 @@
VCallback<shared_ptr<proto::system::PhysicalPlan>> _pplan_watch,
VCallback<sp_string> _stateful_checkpoint_watch,
VCallback<sp_string, sp_int64> _restore_topology_watch,
- VCallback<sp_string> _start_stateful_watch)
+ VCallback<sp_string> _start_stateful_watch,
+ VCallback<const proto::ckptmgr::StatefulConsistentCheckpointSaved&>
+ _broadcast_checkpoint_saved)
: Client(eventLoop, _options),
stmgr_id_(_stmgr_id),
stmgr_host_(_stmgr_host),
@@ -53,6 +55,7 @@
stateful_checkpoint_watch_(std::move(_stateful_checkpoint_watch)),
restore_topology_watch_(std::move(_restore_topology_watch)),
start_stateful_watch_(std::move(_start_stateful_watch)),
+ // Taken by value, so move it into the member like the other callbacks instead of copying
+ broadcast_checkpoint_saved_(std::move(_broadcast_checkpoint_saved)),
reconnect_timer_id(0),
heartbeat_timer_id(0),
reconnect_attempts_(0) {
@@ -74,6 +77,7 @@
InstallMessageHandler(&TMasterClient::HandleStatefulCheckpointMessage);
InstallMessageHandler(&TMasterClient::HandleRestoreTopologyStateRequest);
InstallMessageHandler(&TMasterClient::HandleStartStmgrStatefulProcessing);
+ InstallMessageHandler(&TMasterClient::HandleStatefulCheckpointSavedMessage);
}
TMasterClient::~TMasterClient() {
@@ -303,6 +307,11 @@
start_stateful_watch_(_message->checkpoint_id());
}
+// Handler for StatefulConsistentCheckpointSaved messages: passes the received message on to the
+// broadcast_checkpoint_saved_ callback supplied at construction time.
+void TMasterClient::HandleStatefulCheckpointSavedMessage(
+ pool_unique_ptr<proto::ckptmgr::StatefulConsistentCheckpointSaved> _msg) {
+ broadcast_checkpoint_saved_(*_msg);
+}
+
void TMasterClient::SendResetTopologyState(const std::string& _dead_stmgr,
int32_t _dead_task,
const std::string& _reason) {
diff --git a/heron/stmgr/src/cpp/manager/tmaster-client.h b/heron/stmgr/src/cpp/manager/tmaster-client.h
index ec83406..2457d72 100644
--- a/heron/stmgr/src/cpp/manager/tmaster-client.h
+++ b/heron/stmgr/src/cpp/manager/tmaster-client.h
@@ -42,7 +42,9 @@
VCallback<shared_ptr<proto::system::PhysicalPlan>> _pplan_watch,
VCallback<sp_string> _stateful_checkpoint_watch,
VCallback<sp_string, sp_int64> _restore_topology_watch,
- VCallback<sp_string> _start_stateful_watch);
+ VCallback<sp_string> _start_stateful_watch,
+ VCallback<const proto::ckptmgr::StatefulConsistentCheckpointSaved&>
+ _broadcast_checkpoint_saved);
virtual ~TMasterClient();
// Told by the upper layer to disconnect and self destruct
@@ -88,6 +90,9 @@
void HandleStartStmgrStatefulProcessing(
pool_unique_ptr<proto::ckptmgr::StartStmgrStatefulProcessing> _msg);
+ // Handles a StatefulConsistentCheckpointSaved message by invoking the
+ // broadcast_checkpoint_saved_ callback with it
+ void HandleStatefulCheckpointSavedMessage(
+ pool_unique_ptr<proto::ckptmgr::StatefulConsistentCheckpointSaved> _msg);
+
void OnReConnectTimer();
void OnHeartbeatTimer();
void SendRegisterRequest();
@@ -116,6 +121,9 @@
// We invoke this callback upon receiving a StartStatefulProcessing message from tmaster
// passing in the checkpoint id
VCallback<sp_string> start_stateful_watch_;
+ // This callback will be invoked upon receiving a StatefulConsistentCheckpointSaved message.
+ // We will then forward this message to all the instances connected to this stmgr
+ VCallback<const proto::ckptmgr::StatefulConsistentCheckpointSaved&> broadcast_checkpoint_saved_;
// Configs to be read
sp_int32 reconnect_tmaster_interval_sec_;
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.cpp b/heron/tmaster/src/cpp/manager/stateful-controller.cpp
index 31dcf7b..a348c10 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.cpp
+++ b/heron/tmaster/src/cpp/manager/stateful-controller.cpp
@@ -55,14 +55,18 @@
shared_ptr<heron::common::HeronStateMgr> _state_mgr,
std::chrono::high_resolution_clock::time_point _tmaster_start_time,
shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
- std::function<void(std::string)> _ckpt_save_watcher)
- : topology_name_(_topology_name), ckpt_record_(std::move(_ckpt)), state_mgr_(_state_mgr),
- metrics_manager_client_(_metrics_manager_client) {
+ std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)>
+ _ckpt_save_watcher)
+ : topology_name_(_topology_name),
+ ckpt_record_(std::move(_ckpt)),
+ state_mgr_(_state_mgr),
+ metrics_manager_client_(_metrics_manager_client),
+ // The watcher is passed by value and not used again below, so move it rather than copy it
+ ckpt_save_watcher_(std::move(_ckpt_save_watcher)) {
checkpointer_ = make_unique<StatefulCheckpointer>(_tmaster_start_time);
restorer_ = make_unique<StatefulRestorer>();
count_metrics_ = make_shared<common::MultiCountMetric>();
+
metrics_manager_client_->register_metric("__stateful_controller", count_metrics_);
- ckpt_save_watcher_ = _ckpt_save_watcher;
}
StatefulController::~StatefulController() {
@@ -162,13 +166,13 @@
shared_ptr<proto::ckptmgr::StatefulConsistentCheckpoints> _new_ckpt,
proto::system::StatusCode _status) {
if (_status == proto::system::OK) {
- LOG(INFO) << "Successfully saved " << _new_ckpt->consistent_checkpoints(0).checkpoint_id()
- << " as the new globally consistent checkpoint";
ckpt_record_ = std::move(_new_ckpt);
- std::string oldest_ckpt =
- ckpt_record_->consistent_checkpoints(ckpt_record_->consistent_checkpoints_size() - 1)
- .checkpoint_id();
- ckpt_save_watcher_(oldest_ckpt);
+
+ LOG(INFO) << "Successfully saved "
+ << ckpt_record_->consistent_checkpoints(0).checkpoint_id()
+ << " as the latest globally consistent checkpoint";
+
+ ckpt_save_watcher_(*ckpt_record_);
} else {
LOG(ERROR) << "Error saving " << _new_ckpt->consistent_checkpoints(0).checkpoint_id()
<< " as the new globally consistent checkpoint "
diff --git a/heron/tmaster/src/cpp/manager/stateful-controller.h b/heron/tmaster/src/cpp/manager/stateful-controller.h
index 6ceda37..4e3b618 100644
--- a/heron/tmaster/src/cpp/manager/stateful-controller.h
+++ b/heron/tmaster/src/cpp/manager/stateful-controller.h
@@ -58,7 +58,8 @@
shared_ptr<heron::common::HeronStateMgr> _state_mgr,
std::chrono::high_resolution_clock::time_point _tmaster_start_time,
shared_ptr<common::MetricsMgrSt> _metrics_manager_client,
- std::function<void(std::string)> _ckpt_save_watcher);
+ std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)>
+ _ckpt_save_watcher);
virtual ~StatefulController();
// Start a new restore process
void StartRestore(const StMgrMap& _stmgrs, bool _ignore_prev_checkpoints);
@@ -102,7 +103,7 @@
unique_ptr<StatefulRestorer> restorer_;
shared_ptr<common::MetricsMgrSt> metrics_manager_client_;
shared_ptr<common::MultiCountMetric> count_metrics_;
- std::function<void(std::string)> ckpt_save_watcher_;
+ std::function<void(const proto::ckptmgr::StatefulConsistentCheckpoints&)> ckpt_save_watcher_;
};
} // namespace tmaster
} // namespace heron
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.cpp b/heron/tmaster/src/cpp/manager/stmgrstate.cpp
index 46be97d..b144244 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.cpp
+++ b/heron/tmaster/src/cpp/manager/stmgrstate.cpp
@@ -107,10 +107,17 @@
}
void StMgrState::NewStatefulCheckpoint(const proto::ckptmgr::StartStatefulCheckpoint& _request) {
- LOG(INFO) << "Sending a new stateful checkpoint request to stmgr" << stmgr_->id();
+ LOG(INFO) << "Sending a new stateful checkpoint request to stmgr: " << stmgr_->id();
server_.SendMessage(connection_, _request);
}
+// Logs the target stmgr id and the checkpoint id, then sends the checkpoint saved message over
+// connection_.
+void StMgrState::SendCheckpointSavedMessage(
+    const proto::ckptmgr::StatefulConsistentCheckpointSaved &_msg) {
+  const auto& saved_checkpoint = _msg.consistent_checkpoint();
+
+  LOG(INFO) << "Sending checkpoint saved message to stmgr: " << stmgr_->id() << " "
+            << "for checkpoint: " << saved_checkpoint.checkpoint_id();
+
+  server_.SendMessage(connection_, _msg);
+}
+
/*
void
StMgrState::AddAssignment(const std::vector<pair<string, sp_int32> >& _assignments,
diff --git a/heron/tmaster/src/cpp/manager/stmgrstate.h b/heron/tmaster/src/cpp/manager/stmgrstate.h
index 270d93a..d9d9159 100644
--- a/heron/tmaster/src/cpp/manager/stmgrstate.h
+++ b/heron/tmaster/src/cpp/manager/stmgrstate.h
@@ -70,6 +70,8 @@
// Send stateful checkpoint message to the stmgr
void NewStatefulCheckpoint(const proto::ckptmgr::StartStatefulCheckpoint& _request);
+ // Send the checkpoint saved message to the stmgr
+ void SendCheckpointSavedMessage(const proto::ckptmgr::StatefulConsistentCheckpointSaved &_msg);
+
bool TimedOut() const;
diff --git a/heron/tmaster/src/cpp/manager/tmaster.cpp b/heron/tmaster/src/cpp/manager/tmaster.cpp
index 304634f..36b7845 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.cpp
+++ b/heron/tmaster/src/cpp/manager/tmaster.cpp
@@ -398,8 +398,8 @@
config::TopologyConfigHelper::GetStatefulCheckpointIntervalSecsWithDefault(*topology_, 300);
CHECK_GT(stateful_checkpoint_interval, 0);
- auto cb = [this](std::string _oldest_ckptid) {
- this->HandleStatefulCheckpointSave(_oldest_ckptid);
+ auto cb = [this](const proto::ckptmgr::StatefulConsistentCheckpoints& new_ckpts) {
+ this->HandleStatefulCheckpointSave(new_ckpts);
};
// Instantiate the stateful restorer/coordinator
stateful_controller_ = make_unique<StatefulController>(topology_->name(), _ckpt, state_mgr_,
@@ -616,8 +616,22 @@
ckptmgr_client_->SendCleanStatefulCheckpointRequest("", true);
}
-void TMaster::HandleStatefulCheckpointSave(const std::string& _oldest_ckpt) {
- ckptmgr_client_->SendCleanStatefulCheckpointRequest(_oldest_ckpt, false);
+// Called when a new stateful checkpoint record has been saved. Tells every stmgr about the
+// checkpoint at index 0 of the record, then asks the ckptmgr to clean the checkpoint at the
+// last index. A record without any checkpoint is ignored.
+void TMaster::HandleStatefulCheckpointSave(
+    const proto::ckptmgr::StatefulConsistentCheckpoints &new_ckpts) {
+  // Indexing into an empty repeated field is invalid, so there is nothing to do in that case
+  const int num_ckpts = new_ckpts.consistent_checkpoints_size();
+  if (num_ckpts <= 0) {
+    LOG(WARNING) << "Ignoring stateful checkpoint save notification without any checkpoint";
+    return;
+  }
+
+  // broadcast globally consistent checkpoint completion
+  proto::ckptmgr::StatefulConsistentCheckpointSaved msg;
+  msg.mutable_consistent_checkpoint()->CopyFrom(new_ckpts.consistent_checkpoints(0));
+
+  for (const auto& stmgr : stmgrs_) {
+    stmgr.second->SendCheckpointSavedMessage(msg);
+  }
+
+  // clean oldest checkpoint on save; bind by reference so the id string is not copied
+  const std::string& oldest_ckpt_id =
+      new_ckpts.consistent_checkpoints(num_ckpts - 1).checkpoint_id();
+
+  ckptmgr_client_->SendCleanStatefulCheckpointRequest(oldest_ckpt_id, false);
}
// Called when ckptmgr completes the clean stateful checkpoint request
diff --git a/heron/tmaster/src/cpp/manager/tmaster.h b/heron/tmaster/src/cpp/manager/tmaster.h
index d37fbaa..3bdd25e 100644
--- a/heron/tmaster/src/cpp/manager/tmaster.h
+++ b/heron/tmaster/src/cpp/manager/tmaster.h
@@ -195,7 +195,8 @@
const ComponentConfigMap& _config);
// Function called when a new stateful ckpt record is saved
- void HandleStatefulCheckpointSave(const std::string& _oldest_ckpt);
+ void HandleStatefulCheckpointSave(
+ const proto::ckptmgr::StatefulConsistentCheckpoints &new_ckpts);
// Function called to kill container
void KillContainer(const std::string& host_name,