refactor task-specific topologycontext from a more general topology context
diff --git a/src/jvm/backtype/storm/task/GeneralTopologyContext.java b/src/jvm/backtype/storm/task/GeneralTopologyContext.java
new file mode 100644
index 0000000..da8d3a7
--- /dev/null
+++ b/src/jvm/backtype/storm/task/GeneralTopologyContext.java
@@ -0,0 +1,193 @@
+package backtype.storm.task;
+
+import backtype.storm.Config;
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.StreamInfo;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.ThriftTopologyUtils;
+import backtype.storm.utils.Utils;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.json.simple.JSONValue;
+import org.json.simple.JSONAware;
+
+/**
+ * A TopologyContext is given to bolts and spouts in their "prepare" and "open"
+ * methods, respectively. This object provides information about the component's
+ * place within the topology, such as task ids, inputs and outputs, etc.
+ *
+ * <p>The TopologyContext is also used to declare ISubscribedState objects to
+ * synchronize state with StateSpouts this object is subscribed to.</p>
+ */
+public class GeneralTopologyContext implements JSONAware {
+ private StormTopology _topology;
+ private Map<Integer, String> _taskToComponent;
+ private Map<String, List<Integer>> _componentToTasks;
+ private String _stormId;
+ protected Map _stormConf;
+
+ public GeneralTopologyContext(StormTopology topology, Map stormConf,
+ Map<Integer, String> taskToComponent, String stormId) {
+ _topology = topology;
+ _stormConf = stormConf;
+ _taskToComponent = taskToComponent;
+ _stormId = stormId;
+ _componentToTasks = new HashMap<String, List<Integer>>();
+ for(Integer task: taskToComponent.keySet()) {
+ String component = taskToComponent.get(task);
+ List<Integer> curr = _componentToTasks.get(component);
+ if(curr==null) curr = new ArrayList<Integer>();
+ curr.add(task);
+ _componentToTasks.put(component, curr);
+ }
+ for(String component: _componentToTasks.keySet()) {
+ List<Integer> tasks = _componentToTasks.get(component);
+ Collections.sort(tasks);
+ }
+ }
+
+ /**
+ * Gets the unique id assigned to this topology. The id is the storm name with a
+ * unique nonce appended to it.
+ * @return the storm id
+ */
+ public String getStormId() {
+ return _stormId;
+ }
+
+ /**
+ * Gets the Thrift object representing the topology.
+ *
+ * @return the Thrift definition representing the topology
+ */
+ public StormTopology getRawTopology() {
+ return _topology;
+ }
+
+ /**
+ * Gets the component id for the specified task id. The component id maps
+ * to a component id specified for a Spout or Bolt in the topology definition.
+ *
+ * @param taskId the task id
+ * @return the component id for the input task id
+ */
+ public String getComponentId(int taskId) {
+ return _taskToComponent.get(taskId);
+ }
+
+ /**
+ * Gets the set of streams declared for the specified component.
+ */
+ public Set<String> getComponentStreams(String componentId) {
+ return getComponentCommon(componentId).get_streams().keySet();
+ }
+
+ /**
+ * Gets the task ids allocated for the given component id. The task ids are
+ * always returned in ascending order.
+ */
+ public List<Integer> getComponentTasks(String componentId) {
+ List<Integer> ret = _componentToTasks.get(componentId);
+ if(ret==null) return new ArrayList<Integer>();
+ else return new ArrayList<Integer>(ret);
+ }
+
+ /**
+ * Gets the declared output fields for the specified component/stream.
+ */
+ public Fields getComponentOutputFields(String componentId, String streamId) {
+ StreamInfo streamInfo = getComponentCommon(componentId).get_streams().get(streamId);
+ if(streamInfo==null) {
+ throw new IllegalArgumentException("No output fields defined for component:stream " + componentId + ":" + streamId);
+ }
+ return new Fields(streamInfo.get_output_fields());
+ }
+
+ /**
+ * Gets the declared output fields for the specified global stream id.
+ */
+ public Fields getComponentOutputFields(GlobalStreamId id) {
+ return getComponentOutputFields(id.get_componentId(), id.get_streamId());
+ }
+
+ /**
+ * Gets the declared inputs to the specified component.
+ *
+ * @return A map from subscribed component/stream to the grouping subscribed with.
+ */
+ public Map<GlobalStreamId, Grouping> getSources(String componentId) {
+ return getComponentCommon(componentId).get_inputs();
+ }
+
+ /**
+ * Gets information about who is consuming the outputs of the specified component,
+ * and how.
+ *
+ * @return Map from stream id to component id to the Grouping used.
+ */
+ public Map<String, Map<String, Grouping>> getTargets(String componentId) {
+ Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
+ for(String otherComponentId: getComponentIds()) {
+ Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();
+ for(GlobalStreamId id: inputs.keySet()) {
+ if(id.get_componentId().equals(componentId)) {
+ Map<String, Grouping> curr = ret.get(id.get_streamId());
+ if(curr==null) curr = new HashMap<String, Grouping>();
+ curr.put(otherComponentId, inputs.get(id));
+ ret.put(id.get_streamId(), curr);
+ }
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public String toJSONString() {
+ Map obj = new HashMap();
+ obj.put("task->component", _taskToComponent);
+ // TODO: jsonify StormTopology
+ // at the minimum should send source info
+ return JSONValue.toJSONString(obj);
+ }
+
+ /**
+ * Gets a map from task id to component id.
+ */
+ public Map<Integer, String> getTaskToComponent() {
+ return _taskToComponent;
+ }
+
+ /**
+ * Gets a list of all component ids in this topology
+ */
+ public Set<String> getComponentIds() {
+ return ThriftTopologyUtils.getComponentIds(getRawTopology());
+ }
+
+ public ComponentCommon getComponentCommon(String componentId) {
+ return ThriftTopologyUtils.getComponentCommon(getRawTopology(), componentId);
+ }
+
+ public int maxTopologyMessageTimeout() {
+ Integer max = Utils.getInt(_stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+ for(String spout: getRawTopology().get_spouts().keySet()) {
+ ComponentCommon common = getComponentCommon(spout);
+ String jsonConf = common.get_json_conf();
+ if(jsonConf!=null) {
+ Map conf = (Map) JSONValue.parse(jsonConf);
+ Object comp = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
+ if(comp!=null) {
+ max = Math.max(Utils.getInt(comp), max);
+ }
+ }
+ }
+ return max;
+ }
+}
\ No newline at end of file
diff --git a/src/jvm/backtype/storm/task/TopologyContext.java b/src/jvm/backtype/storm/task/TopologyContext.java
index 7cfe9b5..8778cb2 100644
--- a/src/jvm/backtype/storm/task/TopologyContext.java
+++ b/src/jvm/backtype/storm/task/TopologyContext.java
@@ -20,7 +20,6 @@
import java.util.Set;
import org.apache.commons.lang.NotImplementedException;
import org.json.simple.JSONValue;
-import org.json.simple.JSONAware;
/**
* A TopologyContext is given to bolts and spouts in their "prepare" and "open"
@@ -30,17 +29,12 @@
* <p>The TopologyContext is also used to declare ISubscribedState objects to
* synchronize state with StateSpouts this object is subscribed to.</p>
*/
-public class TopologyContext implements JSONAware {
- private StormTopology _topology;
- private Map<Integer, String> _taskToComponent;
+public class TopologyContext extends GeneralTopologyContext {
private Integer _taskId;
- private Map<String, List<Integer>> _componentToTasks;
private String _codeDir;
private String _pidDir;
- private String _stormId;
private Object _taskData = null;
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
- private Map _stormConf;
private Integer _workerPort;
private List<Integer> _workerTasks;
@@ -48,28 +42,13 @@
Map<Integer, String> taskToComponent, String stormId,
String codeDir, String pidDir, Integer taskId,
Integer workerPort, List<Integer> workerTasks) {
- _topology = topology;
- _stormConf = stormConf;
+ super(topology, stormConf, taskToComponent, stormId);
_workerPort = workerPort;
- _taskToComponent = taskToComponent;
- _stormId = stormId;
_taskId = taskId;
- _componentToTasks = new HashMap<String, List<Integer>>();
_pidDir = pidDir;
_codeDir = codeDir;
_workerTasks = new ArrayList<Integer>(workerTasks);
Collections.sort(_workerTasks);
- for(Integer task: taskToComponent.keySet()) {
- String component = taskToComponent.get(task);
- List<Integer> curr = _componentToTasks.get(component);
- if(curr==null) curr = new ArrayList<Integer>();
- curr.add(task);
- _componentToTasks.put(component, curr);
- }
- for(String component: _componentToTasks.keySet()) {
- List<Integer> tasks = _componentToTasks.get(component);
- Collections.sort(tasks);
- }
}
/**
@@ -128,15 +107,6 @@
}
/**
- * Gets the unique id assigned to this topology. The id is the storm name with a
- * unique nonce appended to it.
- * @return the storm id
- */
- public String getStormId() {
- return _stormId;
- }
-
- /**
* Gets the task id of this task.
*
* @return the task id
@@ -146,26 +116,6 @@
}
/**
- * Gets the Thrift object representing the topology.
- *
- * @return the Thrift definition representing the topology
- */
- public StormTopology getRawTopology() {
- return _topology;
- }
-
- /**
- * Gets the component id for the specified task id. The component id maps
- * to a component id specified for a Spout or Bolt in the topology definition.
- *
- * @param taskId the task id
- * @return the component id for the input task id
- */
- public String getComponentId(int taskId) {
- return _taskToComponent.get(taskId);
- }
-
- /**
* Gets the component id for this task. The component id maps
* to a component id specified for a Spout or Bolt in the topology definition.
* @return
@@ -198,23 +148,6 @@
}
/**
- * Gets the set of streams declared for the specified component.
- */
- public Set<String> getComponentStreams(String componentId) {
- return getComponentCommon(componentId).get_streams().keySet();
- }
-
- /**
- * Gets the task ids allocated for the given component id. The task ids are
- * always returned in ascending order.
- */
- public List<Integer> getComponentTasks(String componentId) {
- List<Integer> ret = _componentToTasks.get(componentId);
- if(ret==null) return new ArrayList<Integer>();
- else return new ArrayList<Integer>(ret);
- }
-
- /**
* Gets the index of this task id in getComponentTasks(getThisComponentId()).
* An example use case for this method is determining which task
* accesses which resource in a distributed resource to ensure an even distribution.
@@ -229,24 +162,6 @@
}
throw new RuntimeException("Fatal: could not find this task id in this component");
}
-
- /**
- * Gets the declared output fields for the specified component/stream.
- */
- public Fields getComponentOutputFields(String componentId, String streamId) {
- StreamInfo streamInfo = getComponentCommon(componentId).get_streams().get(streamId);
- if(streamInfo==null) {
- throw new IllegalArgumentException("No output fields defined for component:stream " + componentId + ":" + streamId);
- }
- return new Fields(streamInfo.get_output_fields());
- }
-
- /**
- * Gets the declared output fields for the specified global stream id.
- */
- public Fields getComponentOutputFields(GlobalStreamId id) {
- return getComponentOutputFields(id.get_componentId(), id.get_streamId());
- }
/**
* Gets the declared inputs to this component.
@@ -256,15 +171,6 @@
public Map<GlobalStreamId, Grouping> getThisSources() {
return getSources(getThisComponentId());
}
-
- /**
- * Gets the declared inputs to the specified component.
- *
- * @return A map from subscribed component/stream to the grouping subscribed with.
- */
- public Map<GlobalStreamId, Grouping> getSources(String componentId) {
- return getComponentCommon(componentId).get_inputs();
- }
/**
* Gets information about who is consuming the outputs of this component, and how.
@@ -276,37 +182,6 @@
}
/**
- * Gets information about who is consuming the outputs of the specified component,
- * and how.
- *
- * @return Map from stream id to component id to the Grouping used.
- */
- public Map<String, Map<String, Grouping>> getTargets(String componentId) {
- Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
- for(String otherComponentId: getComponentIds()) {
- Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();
- for(GlobalStreamId id: inputs.keySet()) {
- if(id.get_componentId().equals(componentId)) {
- Map<String, Grouping> curr = ret.get(id.get_streamId());
- if(curr==null) curr = new HashMap<String, Grouping>();
- curr.put(otherComponentId, inputs.get(id));
- ret.put(id.get_streamId(), curr);
- }
- }
- }
- return ret;
- }
-
- public String toJSONString() {
- Map obj = new HashMap();
- obj.put("taskid", _taskId);
- obj.put("task->component", _taskToComponent);
- // TODO: jsonify StormTopology
- // at the minimum should send source info
- return JSONValue.toJSONString(obj);
- }
-
- /**
* Gets the location of the external resources for this worker on the
* local filesystem. These external resources typically include bolts implemented
* in other languages, such as Ruby or Python.
@@ -323,24 +198,6 @@
public String getPIDDir() {
return _pidDir;
}
-
- /**
- * Gets a map from task id to component id.
- */
- public Map<Integer, String> getTaskToComponent() {
- return _taskToComponent;
- }
-
- /**
- * Gets a list of all component ids in this topology
- */
- public Set<String> getComponentIds() {
- return ThriftTopologyUtils.getComponentIds(getRawTopology());
- }
-
- public ComponentCommon getComponentCommon(String componentId) {
- return ThriftTopologyUtils.getComponentCommon(getRawTopology(), componentId);
- }
public void setTaskData(Object data) {
_taskData = data;
@@ -350,22 +207,6 @@
return _taskData;
}
- public int maxTopologyMessageTimeout() {
- Integer max = Utils.getInt(_stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
- for(String spout: getRawTopology().get_spouts().keySet()) {
- ComponentCommon common = getComponentCommon(spout);
- String jsonConf = common.get_json_conf();
- if(jsonConf!=null) {
- Map conf = (Map) JSONValue.parse(jsonConf);
- Object comp = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
- if(comp!=null) {
- max = Math.max(Utils.getInt(comp), max);
- }
- }
- }
- return max;
- }
-
public Integer getThisWorkerPort() {
return _workerPort;
}