ISpoutDelegate and IBoltDelegate to implement IUpdatable (#1749)
diff --git a/heron/api/src/java/com/twitter/heron/api/bolt/BasicBoltExecutor.java b/heron/api/src/java/com/twitter/heron/api/bolt/BasicBoltExecutor.java
index 7674679..598dcc8 100644
--- a/heron/api/src/java/com/twitter/heron/api/bolt/BasicBoltExecutor.java
+++ b/heron/api/src/java/com/twitter/heron/api/bolt/BasicBoltExecutor.java
@@ -15,14 +15,18 @@
package com.twitter.heron.api.bolt;
import java.util.Map;
+import java.util.logging.Logger;
import com.twitter.heron.api.exception.FailedException;
import com.twitter.heron.api.exception.ReportedFailedException;
+import com.twitter.heron.api.topology.IUpdatable;
import com.twitter.heron.api.topology.OutputFieldsDeclarer;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Tuple;
-public class BasicBoltExecutor implements IRichBolt {
+public class BasicBoltExecutor implements IRichBolt, IUpdatable {
+ private static final Logger LOG = Logger.getLogger(BasicBoltExecutor.class.getName());
+
private static final long serialVersionUID = 7021447981762957626L;
private IBasicBolt bolt;
@@ -70,4 +74,14 @@
public Map<String, Object> getComponentConfiguration() {
return bolt.getComponentConfiguration();
}
+
+ @Override
+ public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+ if (bolt instanceof IUpdatable) {
+ ((IUpdatable) bolt).update(topologyContext);
+ } else {
+ LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ + "because it does not implement %s: %s", IUpdatable.class.getName(), bolt));
+ }
+ }
}
diff --git a/heron/examples/src/java/BUILD b/heron/examples/src/java/BUILD
index c74ab26..9b5fad5 100644
--- a/heron/examples/src/java/BUILD
+++ b/heron/examples/src/java/BUILD
@@ -4,6 +4,7 @@
name='examples-unshaded',
srcs = glob(["**/*.java"]),
deps = [
+ "//heron/api/src/java:api-java",
"//heron/common/src/java:basics-java",
"//heron/storm/src/java:storm-compatibility-java",
],
diff --git a/heron/examples/src/java/com/twitter/heron/examples/ExclamationTopology.java b/heron/examples/src/java/com/twitter/heron/examples/ExclamationTopology.java
index 3a53362..47d768c 100644
--- a/heron/examples/src/java/com/twitter/heron/examples/ExclamationTopology.java
+++ b/heron/examples/src/java/com/twitter/heron/examples/ExclamationTopology.java
@@ -14,8 +14,10 @@
package com.twitter.heron.examples;
+import java.util.List;
import java.util.Map;
+import com.twitter.heron.api.topology.IUpdatable;
import com.twitter.heron.common.basics.ByteAmount;
import backtype.storm.Config;
@@ -68,7 +70,7 @@
}
}
- public static class ExclamationBolt extends BaseRichBolt {
+ public static class ExclamationBolt extends BaseRichBolt implements IUpdatable {
private static final long serialVersionUID = 1184860508880121352L;
private long nItems;
@@ -76,10 +78,7 @@
@Override
@SuppressWarnings("rawtypes")
- public void prepare(
- Map conf,
- TopologyContext context,
- OutputCollector collector) {
+ public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
nItems = 0;
startTime = System.currentTimeMillis();
}
@@ -98,5 +97,28 @@
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declarer.declare(new Fields("word"));
}
+
+ /**
+ * Implementing this method is optional and only necessary if BOTH of the following are true:
+ *
+ * a.) you plan to dynamically scale your bolt/spout at runtime using 'heron update'.
+ * b.) you need to take action based on a runtime change to the component parallelism.
+ *
+ * Most bolts and spouts should be written to be unaffected by changes in their parallelism,
+ * but some must be aware of it. An example would be a spout that consumes a subset of queue
+ * partitions, which must be algorithmically divided amongst the total number of spouts.
+ * <P>
+ * Note that this method is from the IUpdatable Heron interface which does not exist in Storm.
+ * It is fine to implement IUpdatable along with other Storm interfaces, but implementing it
+ * will bind an otherwise generic Storm implementation to Heron.
+ *
+ * @param heronTopologyContext Heron topology context.
+ */
+ @Override
+ public void update(com.twitter.heron.api.topology.TopologyContext heronTopologyContext) {
+ List<Integer> newTaskIds =
+ heronTopologyContext.getComponentTasks(heronTopologyContext.getThisComponentId());
+ System.out.println("Bolt updated with new topologyContext. New taskIds: " + newTaskIds);
+ }
}
}
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 4e58588..73af001 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -550,7 +550,7 @@
Log.info("Killing %s process with pid %d: %s" %
(process_info.name, process_info.pid, ' '.join(command)))
try:
- process_info.process.kill()
+ process_info.process.terminate() # sends SIGTERM to process
except OSError, e:
if e.errno == 3: # No such process
Log.warn("Expected process %s with pid %d was not running, ignoring." %
diff --git a/heron/storm/src/java/backtype/storm/topology/BasicBoltExecutor.java b/heron/storm/src/java/backtype/storm/topology/BasicBoltExecutor.java
index 579fbe2..8cd6c34 100644
--- a/heron/storm/src/java/backtype/storm/topology/BasicBoltExecutor.java
+++ b/heron/storm/src/java/backtype/storm/topology/BasicBoltExecutor.java
@@ -19,12 +19,17 @@
package backtype.storm.topology;
import java.util.Map;
+import java.util.logging.Logger;
+
+import com.twitter.heron.api.topology.IUpdatable;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
-public class BasicBoltExecutor implements IRichBolt {
+public class BasicBoltExecutor implements IRichBolt, IUpdatable {
+ private static final Logger LOG = Logger.getLogger(BasicBoltExecutor.class.getName());
+
private static final long serialVersionUID = 4359767045622072660L;
private IBasicBolt delegate;
private transient BasicOutputCollector collector;
@@ -70,4 +75,14 @@
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}
+
+ @Override
+ public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+ if (delegate instanceof IUpdatable) {
+ ((IUpdatable) delegate).update(topologyContext);
+ } else {
+ LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+ }
+ }
}
diff --git a/heron/storm/src/java/backtype/storm/topology/IRichBoltDelegate.java b/heron/storm/src/java/backtype/storm/topology/IRichBoltDelegate.java
index c8424aa..9289aa8 100644
--- a/heron/storm/src/java/backtype/storm/topology/IRichBoltDelegate.java
+++ b/heron/storm/src/java/backtype/storm/topology/IRichBoltDelegate.java
@@ -19,6 +19,9 @@
package backtype.storm.topology;
import java.util.Map;
+import java.util.logging.Logger;
+
+import com.twitter.heron.api.topology.IUpdatable;
import backtype.storm.task.OutputCollectorImpl;
import backtype.storm.task.TopologyContext;
@@ -28,7 +31,9 @@
* When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
* to use to implement components of the topology.
*/
-public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt {
+public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt, IUpdatable {
+ private static final Logger LOG = Logger.getLogger(IRichBoltDelegate.class.getName());
+
private static final long serialVersionUID = -3717575342431064148L;
private IRichBolt delegate;
private TopologyContext topologyContextImpl;
@@ -70,4 +75,14 @@
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}
+
+ @Override
+ public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+ if (delegate instanceof IUpdatable) {
+ ((IUpdatable) delegate).update(topologyContext);
+ } else {
+ LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+ }
+ }
}
diff --git a/heron/storm/src/java/backtype/storm/topology/IRichSpoutDelegate.java b/heron/storm/src/java/backtype/storm/topology/IRichSpoutDelegate.java
index b8e6175..7ec12cd 100644
--- a/heron/storm/src/java/backtype/storm/topology/IRichSpoutDelegate.java
+++ b/heron/storm/src/java/backtype/storm/topology/IRichSpoutDelegate.java
@@ -19,8 +19,10 @@
package backtype.storm.topology;
import java.util.Map;
+import java.util.logging.Logger;
import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.IUpdatable;
import backtype.storm.spout.SpoutOutputCollectorImpl;
import backtype.storm.task.TopologyContext;
@@ -29,7 +31,9 @@
* When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
* to use to implement components of the topology.
*/
-public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout {
+public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout, IUpdatable {
+ private static final Logger LOG = Logger.getLogger(IRichSpoutDelegate.class.getName());
+
private static final long serialVersionUID = -4310232227720592316L;
private IRichSpout delegate;
private TopologyContext topologyContextImpl;
@@ -88,4 +92,14 @@
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}
+
+ @Override
+ public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+ if (delegate instanceof IUpdatable) {
+ ((IUpdatable) delegate).update(topologyContext);
+ } else {
+ LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+ }
+ }
}
diff --git a/heron/storm/src/java/org/apache/storm/topology/BasicBoltExecutor.java b/heron/storm/src/java/org/apache/storm/topology/BasicBoltExecutor.java
index b3f9dcb..a3d18fb 100644
--- a/heron/storm/src/java/org/apache/storm/topology/BasicBoltExecutor.java
+++ b/heron/storm/src/java/org/apache/storm/topology/BasicBoltExecutor.java
@@ -19,12 +19,17 @@
package org.apache.storm.topology;
import java.util.Map;
+import java.util.logging.Logger;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
-public class BasicBoltExecutor implements IRichBolt {
+import com.twitter.heron.api.topology.IUpdatable;
+
+public class BasicBoltExecutor implements IRichBolt, IUpdatable {
+ private static final Logger LOG = Logger.getLogger(BasicBoltExecutor.class.getName());
+
private static final long serialVersionUID = 235217339000923019L;
private IBasicBolt delegate;
private transient BasicOutputCollector collector;
@@ -70,4 +75,14 @@
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}
+
+ @Override
+ public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+ if (delegate instanceof IUpdatable) {
+ ((IUpdatable) delegate).update(topologyContext);
+ } else {
+ LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+ }
+ }
}
diff --git a/heron/storm/src/java/org/apache/storm/topology/IRichBoltDelegate.java b/heron/storm/src/java/org/apache/storm/topology/IRichBoltDelegate.java
index dafa46e..eaa5adc 100644
--- a/heron/storm/src/java/org/apache/storm/topology/IRichBoltDelegate.java
+++ b/heron/storm/src/java/org/apache/storm/topology/IRichBoltDelegate.java
@@ -19,16 +19,21 @@
package org.apache.storm.topology;
import java.util.Map;
+import java.util.logging.Logger;
import org.apache.storm.task.OutputCollectorImpl;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.TupleImpl;
+import com.twitter.heron.api.topology.IUpdatable;
+
/**
* When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
* to use to implement components of the topology.
*/
-public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt {
+public class IRichBoltDelegate implements com.twitter.heron.api.bolt.IRichBolt, IUpdatable {
+ private static final Logger LOG = Logger.getLogger(IRichBoltDelegate.class.getName());
+
private static final long serialVersionUID = 8350418148268852902L;
private IRichBolt delegate;
private TopologyContext topologyContextImpl;
@@ -70,4 +75,14 @@
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}
+
+ @Override
+ public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+ if (delegate instanceof IUpdatable) {
+ ((IUpdatable) delegate).update(topologyContext);
+ } else {
+ LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+ }
+ }
}
diff --git a/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java b/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
index b0cf0ed..b6a8ac7 100644
--- a/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
+++ b/heron/storm/src/java/org/apache/storm/topology/IRichSpoutDelegate.java
@@ -19,17 +19,21 @@
package org.apache.storm.topology;
import java.util.Map;
+import java.util.logging.Logger;
import org.apache.storm.spout.SpoutOutputCollectorImpl;
import org.apache.storm.task.TopologyContext;
import com.twitter.heron.api.spout.SpoutOutputCollector;
+import com.twitter.heron.api.topology.IUpdatable;
/**
* When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
* to use to implement components of the topology.
*/
-public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout {
+public class IRichSpoutDelegate implements com.twitter.heron.api.spout.IRichSpout, IUpdatable {
+ private static final Logger LOG = Logger.getLogger(IRichSpoutDelegate.class.getName());
+
private static final long serialVersionUID = -1543996045558101339L;
private IRichSpout delegate;
private TopologyContext topologyContextImpl;
@@ -88,4 +92,14 @@
public Map<String, Object> getComponentConfiguration() {
return delegate.getComponentConfiguration();
}
+
+ @Override
+ public void update(com.twitter.heron.api.topology.TopologyContext topologyContext) {
+ if (delegate instanceof IUpdatable) {
+ ((IUpdatable) delegate).update(topologyContext);
+ } else {
+ LOG.warning(String.format("Update() event received but can not call update() on delegate "
+ + "because it does not implement %s: %s", IUpdatable.class.getName(), delegate));
+ }
+ }
}