ISpoutDelegate and IBoltDelegate to implement IUpdatable (#1749)

diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/BasicBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/BasicBoltExecutor.java
index 7674679..598dcc8 100644
--- a/heron/api/src/java/com/twitter/heron/api/bolt/BasicBoltExecutor.java
+++ b/heron/api/src/java/com/twitter/heron/api/bolt/BasicBoltExecutor.java
@@ -15,14 +15,18 @@
 package com.twitter.heron.api.bolt;
 
 import java.util.Map;
+import java.util.logging.Logger;
 
 import com.twitter.heron.api.exception.FailedException;
 import com.twitter.heron.api.exception.ReportedFailedException;
+import com.twitter.heron.api.topology.IUpdatable;
 import com.twitter.heron.api.topology.OutputFieldsDeclarer;
 import com.twitter.heron.api.topology.TopologyContext;
 import com.twitter.heron.api.tuple.Tuple;
 
-public class BasicBoltExecutor implements IRichBolt {
+public class BasicBoltExecutor implements IRichBolt, IUpdatable {
+  private static final Logger LOG = Logger.getLogger(BasicBoltExecutor.class.getName());
+
   private static final long serialVersionUID = 7021447981762957626L;
 
   private IBasicBolt bolt;
@@ -70,4 +74,14 @@
   public Map<String, Object> getComponentConfiguration() {
     return bolt.getComponentConfiguration();
   }
+
+  @Override
+  public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+    if (bolt instanceof IUpdatable) {
+      ((IUpdatable) bolt).update(topologyContext);
+    } else {
+      LOG.warning(String.format("Update() event received but can not call update() on delegate "
+          + "because it does not implement %s: %s", IUpdatable.class.getName(), bolt));
+    }
+  }
 }
diff --git a/heron/examples/src/java/BUILD b/heron/examples/src/java/BUILD
index c74ab26..9b5fad5 100644
--- a/heron/examples/src/java/BUILD
+++ b/heron/examples/src/java/BUILD
@@ -4,6 +4,7 @@
     name='examples-unshaded',
     srcs = glob(["**/*.java"]),
     deps = [
+        "//heron/api/src/java:api-java",
         "//heron/common/src/java:basics-java",
         "//heron/storm/src/java:storm-compatibility-java",
     ],
diff --git a/heron/examples/src/java/com/twitter/heron/examples/ExclamationTopology.java b/heron/examples/src/java/com/twitter/heron/examples/ExclamationTopology.java
index 3a53362..47d768c 100644
--- a/heron/examples/src/java/com/twitter/heron/examples/ExclamationTopology.java
+++ b/heron/examples/src/java/com/twitter/heron/examples/ExclamationTopology.java
@@ -14,8 +14,10 @@
 
 package com.twitter.heron.examples;
 
+import java.util.List;
 import java.util.Map;
 
+import com.twitter.heron.api.topology.IUpdatable;
 import com.twitter.heron.common.basics.ByteAmount;
 
 import backtype.storm.Config;
@@ -68,7 +70,7 @@
     }
   }
 
-  public static class ExclamationBolt extends BaseRichBolt {
+  public static class ExclamationBolt extends BaseRichBolt implements IUpdatable {
 
     private static final long serialVersionUID = 1184860508880121352L;
     private long nItems;
@@ -76,10 +78,7 @@
 
     @Override
     @SuppressWarnings("rawtypes")
-    public void prepare(
-        Map conf,
-        TopologyContext context,
-        OutputCollector collector) {
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
       nItems = 0;
       startTime = System.currentTimeMillis();
     }
@@ -98,5 +97,28 @@
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
       // declarer.declare(new Fields("word"));
     }
+
+    /**
+     * Implementing this method is optional and only necessary if BOTH of the following are true:
+     *
+     * a.) you plan to dynamically scale your bolt/spout at runtime using 'heron update'.
+     * b.) you need to take action based on a runtime change to the component parallelism.
+     *
+     * Most bolts and spouts should be written to be unaffected by changes in their parallelism,
+     * but some must be aware of it. An example would be a spout that consumes a subset of queue
+     * partitions, which must be algorithmically divided amongst the total number of spouts.
+     * <P>
+     * Note that this method is from the IUpdatable Heron interface which does not exist in Storm.
+     * It is fine to implement IUpdatable along with other Storm interfaces, but implementing it
+     * will bind an otherwise generic Storm implementation to Heron.
+     *
+     * @param heronTopologyContext Heron topology context.
+     */
+    @Override
+    public void update(com.twitter.heron.api.topology.TopologyContext heronTopologyContext) {
+      List<Integer> newTaskIds =
+          heronTopologyContext.getComponentTasks(heronTopologyContext.getThisComponentId());
+      System.out.println("Bolt updated with new topologyContext. New taskIds: " + newTaskIds);
+    }
   }
 }
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 4e58588..73af001 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -550,7 +550,7 @@
             Log.info("Killing %s process with pid %d: %s" %
                      (process_info.name, process_info.pid, ' '.join(command)))
             try:
-              process_info.process.kill()
+              process_info.process.terminate()  # sends SIGTERM to process
             except OSError, e:
               if e.errno == 3: # No such process
                 Log.warn("Expected process %s with pid %d was not running, ignoring." %
diff --git a/heron/storm/src/java/backtype/storm/topology/BasicBoltExecutor.java b/heron/storm/src/java/backtype/storm/topology/BasicBoltExecutor.java
index 579fbe2..8cd6c34 100644
--- a/heron/storm/src/java/backtype/storm/topology/BasicBoltExecutor.java
+++ b/heron/storm/src/java/backtype/storm/topology/BasicBoltExecutor.java
@@ -19,12 +19,17 @@
 package backtype.storm.topology;
 
 import java.util.Map;
+import java.util.logging.Logger;
+
+import com.twitter.heron.api.topology.IUpdatable;
 
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Tuple;
 
-public class BasicBoltExecutor implements IRichBolt {
+public class BasicBoltExecutor implements IRichBolt, IUpdatable {
+  private static final Logger LOG = Logger.getLogger(BasicBoltExecutor.class.getName());
+
   private static final long serialVersionUID = 4359767045622072660L;
   private IBasicBolt delegate;
   private transient BasicOutputCollector collector;
@@ -70,4 +75,14 @@
   public Map<String, Object> getComponentConfiguration() {
     return delegate.getComponentConfiguration();
   }
+
+  @Override
+  public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+    if (delegate instanceof IUpdatable) {
+      ((IUpdatable) delegate).update(topologyContext);
+    } else {
+      LOG.warning(String.format("Update() event received but can not call update() on delegate "
+          + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+    }
+  }
 }
diff --git a/heron/storm/src/java/backtype/storm/topology/IRichBoltDelegate.java b/heron/storm/src/java/backtype/storm/topology/IRichBoltDelegate.java
index c8424aa..9289aa8 100644
--- a/heron/storm/src/java/backtype/storm/topology/IRichBoltDelegate.java
+++ b/heron/storm/src/java/backtype/storm/topology/IRichBoltDelegate.java
@@ -19,6 +19,9 @@
 package backtype.storm.topology;
 
 import java.util.Map;
+import java.util.logging.Logger;
+
+import com.twitter.heron.api.topology.IUpdatable;
 
 import backtype.storm.task.OutputCollectorImpl;
 import backtype.storm.task.TopologyContext;
@@ -28,7 +31,9 @@
  * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
  * to use to implement components of the topology.
  */
-public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt {
+public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt, IUpdatable {
+  private static final Logger LOG = Logger.getLogger(IRichBoltDelegate.class.getName());
+
   private static final long serialVersionUID = -3717575342431064148L;
   private IRichBolt delegate;
   private TopologyContext topologyContextImpl;
@@ -70,4 +75,14 @@
   public Map<String, Object> getComponentConfiguration() {
     return delegate.getComponentConfiguration();
   }
+
+  @Override
+  public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+    if (delegate instanceof IUpdatable) {
+      ((IUpdatable) delegate).update(topologyContext);
+    } else {
+      LOG.warning(String.format("Update() event received but can not call update() on delegate "
+          + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+    }
+  }
 }
diff --git a/heron/storm/src/java/backtype/storm/topology/IRichSpoutDelegate.java b/heron/storm/src/java/backtype/storm/topology/IRichSpoutDelegate.java
index b8e6175..7ec12cd 100644
--- a/heron/storm/src/java/backtype/storm/topology/IRichSpoutDelegate.java
+++ b/heron/storm/src/java/backtype/storm/topology/IRichSpoutDelegate.java
@@ -19,8 +19,10 @@
 package backtype.storm.topology;
 
 import java.util.Map;
+import java.util.logging.Logger;
 
 import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.IUpdatable;
 
 import backtype.storm.spout.SpoutOutputCollectorImpl;
 import backtype.storm.task.TopologyContext;
@@ -29,7 +31,9 @@
  * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
  * to use to implement components of the topology.
  */
-public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout {
+public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout, IUpdatable {
+  private static final Logger LOG = Logger.getLogger(IRichSpoutDelegate.class.getName());
+
   private static final long serialVersionUID = -4310232227720592316L;
   private IRichSpout delegate;
   private TopologyContext topologyContextImpl;
@@ -88,4 +92,14 @@
   public Map<String, Object> getComponentConfiguration() {
     return delegate.getComponentConfiguration();
   }
+
+  @Override
+  public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+    if (delegate instanceof IUpdatable) {
+      ((IUpdatable) delegate).update(topologyContext);
+    } else {
+      LOG.warning(String.format("Update() event received but can not call update() on delegate "
+          + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+    }
+  }
 }
diff --git a/heron/storm/src/java/org/apache/storm/topology/BasicBoltExecutor.java b/heron/storm/src/java/org/apache/storm/topology/BasicBoltExecutor.java
index b3f9dcb..a3d18fb 100644
--- a/heron/storm/src/java/org/apache/storm/topology/BasicBoltExecutor.java
+++ b/heron/storm/src/java/org/apache/storm/topology/BasicBoltExecutor.java
@@ -19,12 +19,17 @@
 package org.apache.storm.topology;
 
 import java.util.Map;
+import java.util.logging.Logger;
 
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 
-public class BasicBoltExecutor implements IRichBolt {
+import com.twitter.heron.api.topology.IUpdatable;
+
+public class BasicBoltExecutor implements IRichBolt, IUpdatable {
+  private static final Logger LOG = Logger.getLogger(BasicBoltExecutor.class.getName());
+
   private static final long serialVersionUID = 235217339000923019L;
   private IBasicBolt delegate;
   private transient BasicOutputCollector collector;
@@ -70,4 +75,14 @@
   public Map<String, Object> getComponentConfiguration() {
     return delegate.getComponentConfiguration();
   }
+
+  @Override
+  public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+    if (delegate instanceof IUpdatable) {
+      ((IUpdatable) delegate).update(topologyContext);
+    } else {
+      LOG.warning(String.format("Update() event received but can not call update() on delegate "
+          + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+    }
+  }
 }
diff --git a/heron/storm/src/java/org/apache/storm/topology/IRichBoltDelegate.java b/heron/storm/src/java/org/apache/storm/topology/IRichBoltDelegate.java
index dafa46e..eaa5adc 100644
--- a/heron/storm/src/java/org/apache/storm/topology/IRichBoltDelegate.java
+++ b/heron/storm/src/java/org/apache/storm/topology/IRichBoltDelegate.java
@@ -19,16 +19,21 @@
 package org.apache.storm.topology;
 
 import java.util.Map;
+import java.util.logging.Logger;
 
 import org.apache.storm.task.OutputCollectorImpl;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.TupleImpl;
 
+import com.twitter.heron.api.topology.IUpdatable;
+
 /**
  * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
  * to use to implement components of the topology.
  */
-public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt {
+public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt, IUpdatable {
+  private static final Logger LOG = Logger.getLogger(IRichBoltDelegate.class.getName());
+
   private static final long serialVersionUID = 8350418148268852902L;
   private IRichBolt delegate;
   private TopologyContext topologyContextImpl;
@@ -70,4 +75,14 @@
   public Map<String, Object> getComponentConfiguration() {
     return delegate.getComponentConfiguration();
   }
+
+  @Override
+  public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+    if (delegate instanceof IUpdatable) {
+      ((IUpdatable) delegate).update(topologyContext);
+    } else {
+      LOG.warning(String.format("Update() event received but can not call update() on delegate "
+          + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+    }
+  }
 }
diff --git a/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java b/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
index b0cf0ed..b6a8ac7 100644
--- a/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
+++ b/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
@@ -19,17 +19,21 @@
 package org.apache.storm.topology;
 
 import java.util.Map;
+import java.util.logging.Logger;
 
 import org.apache.storm.spout.SpoutOutputCollectorImpl;
 import org.apache.storm.task.TopologyContext;
 
 import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.IUpdatable;
 
 /**
  * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
  * to use to implement components of the topology.
  */
-public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout {
+public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout, IUpdatable {
+  private static final Logger LOG = Logger.getLogger(IRichSpoutDelegate.class.getName());
+
   private static final long serialVersionUID = -1543996045558101339L;
   private IRichSpout delegate;
   private TopologyContext topologyContextImpl;
@@ -88,4 +92,14 @@
   public Map<String, Object> getComponentConfiguration() {
     return delegate.getComponentConfiguration();
   }
+
+  @Override
+  public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+    if (delegate instanceof IUpdatable) {
+      ((IUpdatable) delegate).update(topologyContext);
+    } else {
+      LOG.warning(String.format("Update() event received but can not call update() on delegate "
+          + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+    }
+  }
 }