[work in progress] fix a bug for `deactivated heron-instance` during `heron update` (#2580)
fix a bug for `deactivated heron-instance` during `heron update`
diff --git a/heron/instance/src/java/com/twitter/heron/instance/Slave.java b/heron/instance/src/java/com/twitter/heron/instance/Slave.java
index 41e664a..2970a40 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/Slave.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/Slave.java
@@ -192,11 +192,6 @@
return;
}
- if (!helper.isTopologyRunning()) {
- LOG.info("Topology is not in RUNNING state. Instance is not started");
- return;
- }
-
// Setting topology environment properties
Map<String, Object> topoConf = helper.getTopologyContext().getTopologyConfig();
if (topoConf.containsKey(Config.TOPOLOGY_ENVIRONMENT)) {
@@ -211,6 +206,10 @@
// when a RestoreInstanceStateRequest is received
if (isStatefulProcessingStarted) {
instance.init(instanceState);
+ // Deactivate the instance if start with deactivated mode
+ if (TopologyAPI.TopologyState.PAUSED.equals(helper.getTopologyState())) {
+ instance.deactivate();
+ }
instance.start();
isInstanceStarted = true;
LOG.info("Instance is started for stateful topology");
@@ -221,6 +220,10 @@
// For non-stateful topology, any provided state will be ignored
// by the instance
instance.init(null);
+ // Deactivate the instance if start with deactivated mode
+ if (TopologyAPI.TopologyState.PAUSED.equals(helper.getTopologyState())) {
+ instance.deactivate();
+ }
instance.start();
isInstanceStarted = true;
LOG.info("Instance is started for non-stateful topology");
diff --git a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
index 7e99117..100d293 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
@@ -72,8 +72,6 @@
private PhysicalPlanHelper helper;
- private TopologyAPI.TopologyState topologyState;
-
/**
* Construct a SpoutInstance basing on given arguments
*/
@@ -186,8 +184,6 @@
public void start() {
// Add spout tasks for execution
addSpoutsTasks();
-
- topologyState = TopologyAPI.TopologyState.RUNNING;
}
@Override
@@ -213,14 +209,12 @@
public void activate() {
LOG.info("Spout is activated");
spout.activate();
- topologyState = TopologyAPI.TopologyState.RUNNING;
}
@Override
public void deactivate() {
LOG.info("Spout is deactivated");
spout.deactivate();
- topologyState = TopologyAPI.TopologyState.PAUSED;
}
// Tasks happen in every time looper is waken up
@@ -284,7 +278,7 @@
*/
private boolean isContinueWork() {
long maxSpoutPending = TypeUtils.getLong(config.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
- return topologyState.equals(TopologyAPI.TopologyState.RUNNING)
+ return helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING)
&&
((!ackEnabled && collector.isOutQueuesAvailable())
||
@@ -305,7 +299,7 @@
*/
private boolean isProduceTuple() {
return collector.isOutQueuesAvailable()
- && topologyState.equals(TopologyAPI.TopologyState.RUNNING);
+ && helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING);
}
protected void produceTuple() {
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
index 10737c3..b5247f1 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
@@ -165,10 +165,10 @@
// update parallelism in updatedTopology since TMaster checks that
// Sum(parallelism) == Sum(instances)
- logFine("Update new Topology: %s", stateManager.updateTopology(updatedTopology, topologyName));
+ logInfo("Update new Topology: %s", stateManager.updateTopology(updatedTopology, topologyName));
// update packing plan to trigger the scaling event
- logFine("Update new PackingPlan: %s",
+ logInfo("Update new PackingPlan: %s",
stateManager.updatePackingPlan(proposedProtoPackingPlan, topologyName));
// reactivate topology