[work in progress] fix a bug for `deactivated heron-instance` during `heron update` (#2580)

fix a bug for `deactivated heron-instance` during `heron update`
diff --git a/heron/instance/src/java/com/twitter/heron/instance/Slave.java b/heron/instance/src/java/com/twitter/heron/instance/Slave.java
index 41e664a..2970a40 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/Slave.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/Slave.java
@@ -192,11 +192,6 @@
       return;
     }
 
-    if (!helper.isTopologyRunning()) {
-      LOG.info("Topology is not in RUNNING state. Instance is not started");
-      return;
-    }
-
     // Setting topology environment properties
     Map<String, Object> topoConf = helper.getTopologyContext().getTopologyConfig();
     if (topoConf.containsKey(Config.TOPOLOGY_ENVIRONMENT)) {
@@ -211,6 +206,10 @@
       // when a RestoreInstanceStateRequest is received
       if (isStatefulProcessingStarted) {
         instance.init(instanceState);
+        // Deactivate the instance if start with deactivated mode
+        if (TopologyAPI.TopologyState.PAUSED.equals(helper.getTopologyState())) {
+          instance.deactivate();
+        }
         instance.start();
         isInstanceStarted = true;
         LOG.info("Instance is started for stateful topology");
@@ -221,6 +220,10 @@
       // For non-stateful topology, any provided state will be ignored
       // by the instance
       instance.init(null);
+      // Deactivate the instance if start with deactivated mode
+      if (TopologyAPI.TopologyState.PAUSED.equals(helper.getTopologyState())) {
+        instance.deactivate();
+      }
       instance.start();
       isInstanceStarted = true;
       LOG.info("Instance is started for non-stateful topology");
diff --git a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
index 7e99117..100d293 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
@@ -72,8 +72,6 @@
 
   private PhysicalPlanHelper helper;
 
-  private TopologyAPI.TopologyState topologyState;
-
   /**
    * Construct a SpoutInstance basing on given arguments
    */
@@ -186,8 +184,6 @@
   public void start() {
     // Add spout tasks for execution
     addSpoutsTasks();
-
-    topologyState = TopologyAPI.TopologyState.RUNNING;
   }
 
   @Override
@@ -213,14 +209,12 @@
   public void activate() {
     LOG.info("Spout is activated");
     spout.activate();
-    topologyState = TopologyAPI.TopologyState.RUNNING;
   }
 
   @Override
   public void deactivate() {
     LOG.info("Spout is deactivated");
     spout.deactivate();
-    topologyState = TopologyAPI.TopologyState.PAUSED;
   }
 
   // Tasks happen in every time looper is waken up
@@ -284,7 +278,7 @@
    */
   private boolean isContinueWork() {
     long maxSpoutPending = TypeUtils.getLong(config.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
-    return topologyState.equals(TopologyAPI.TopologyState.RUNNING)
+    return helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING)
         &&
         ((!ackEnabled && collector.isOutQueuesAvailable())
             ||
@@ -305,7 +299,7 @@
    */
   private boolean isProduceTuple() {
     return collector.isOutQueuesAvailable()
-        && topologyState.equals(TopologyAPI.TopologyState.RUNNING);
+        && helper.getTopologyState().equals(TopologyAPI.TopologyState.RUNNING);
   }
 
   protected void produceTuple() {
diff --git a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
index 10737c3..b5247f1 100644
--- a/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
+++ b/heron/scheduler-core/src/java/com/twitter/heron/scheduler/UpdateTopologyManager.java
@@ -165,10 +165,10 @@
 
     // update parallelism in updatedTopology since TMaster checks that
     // Sum(parallelism) == Sum(instances)
-    logFine("Update new Topology: %s", stateManager.updateTopology(updatedTopology, topologyName));
+    logInfo("Update new Topology: %s", stateManager.updateTopology(updatedTopology, topologyName));
 
     // update packing plan to trigger the scaling event
-    logFine("Update new PackingPlan: %s",
+    logInfo("Update new PackingPlan: %s",
         stateManager.updatePackingPlan(proposedProtoPackingPlan, topologyName));
 
     // reactivate topology