| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.storm.task; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.storm.Config; |
| import org.apache.storm.Constants; |
| import org.apache.storm.generated.ComponentCommon; |
| import org.apache.storm.generated.GlobalStreamId; |
| import org.apache.storm.generated.Grouping; |
| import org.apache.storm.generated.StormTopology; |
| import org.apache.storm.shade.org.json.simple.JSONAware; |
| import org.apache.storm.shade.org.json.simple.JSONValue; |
| import org.apache.storm.shade.org.json.simple.parser.ParseException; |
| import org.apache.storm.tuple.Fields; |
| import org.apache.storm.utils.ConfigUtils; |
| import org.apache.storm.utils.ObjectReader; |
| import org.apache.storm.utils.ThriftTopologyUtils; |
| |
| public class GeneralTopologyContext implements JSONAware { |
| protected Map<String, Object> topoConf; |
| protected boolean doSanityCheck; |
| private StormTopology topology; |
| private Map<Integer, String> taskToComponent; |
| private Map<String, List<Integer>> componentToTasks; |
| private Map<String, Map<String, Fields>> componentToStreamToFields; |
| private String stormId; |
| |
| // pass in componentToSortedTasks for the case of running tons of tasks in single executor |
| public GeneralTopologyContext(StormTopology topology, Map<String, Object> topoConf, |
| Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, |
| Map<String, Map<String, Fields>> componentToStreamToFields, String stormId) { |
| this.topology = topology; |
| this.topoConf = topoConf; |
| this.taskToComponent = taskToComponent; |
| this.stormId = stormId; |
| componentToTasks = componentToSortedTasks; |
| this.componentToStreamToFields = componentToStreamToFields; |
| doSanityCheck = ConfigUtils.isLocalMode(this.topoConf); |
| } |
| |
| /** |
| * Gets the unique id assigned to this topology. The id is the storm name with a unique nonce appended to it. |
| * |
| * @return the storm id |
| */ |
| public String getStormId() { |
| return stormId; |
| } |
| |
| /** |
| * Gets the Thrift object representing the topology. |
| * |
| * @return the Thrift definition representing the topology |
| */ |
| public StormTopology getRawTopology() { |
| return topology; |
| } |
| |
| /** |
| * Gets the component id for the specified task id. The component id maps to a component id specified for a Spout or Bolt in the |
| * topology definition. |
| * |
| * @param taskId the task id |
| * @return the component id for the input task id |
| */ |
| public String getComponentId(int taskId) { |
| if (taskId == Constants.SYSTEM_TASK_ID) { |
| return Constants.SYSTEM_COMPONENT_ID; |
| } else { |
| return taskToComponent.get(taskId); |
| } |
| } |
| |
| /** |
| * Gets the set of streams declared for the specified component. |
| */ |
| public Set<String> getComponentStreams(String componentId) { |
| return getComponentCommon(componentId).get_streams().keySet(); |
| } |
| |
| /** |
| * Gets the task ids allocated for the given component id. The task ids are always returned in ascending order. |
| */ |
| public List<Integer> getComponentTasks(String componentId) { |
| List<Integer> ret = componentToTasks.get(componentId); |
| if (ret == null) { |
| return new ArrayList<>(); |
| } else { |
| return new ArrayList<>(ret); |
| } |
| } |
| |
| /** |
| * Gets the declared output fields for the specified component/stream. |
| */ |
| public Fields getComponentOutputFields(String componentId, String streamId) { |
| Fields ret = componentToStreamToFields.get(componentId).get(streamId); |
| if (ret == null) { |
| throw new IllegalArgumentException("No output fields defined for component:stream " + componentId + ":" + streamId); |
| } |
| return ret; |
| } |
| |
| /** |
| * Gets the declared output fields for the specified global stream id. |
| */ |
| public Fields getComponentOutputFields(GlobalStreamId id) { |
| return getComponentOutputFields(id.get_componentId(), id.get_streamId()); |
| } |
| |
| /** |
| * Gets the declared inputs to the specified component. |
| * |
| * @return A map from subscribed component/stream to the grouping subscribed with. |
| */ |
| public Map<GlobalStreamId, Grouping> getSources(String componentId) { |
| return getComponentCommon(componentId).get_inputs(); |
| } |
| |
| /** |
| * Gets information about who is consuming the outputs of the specified component, and how. |
| * |
| * @return Map from stream id to component id to the Grouping used. |
| */ |
| public Map<String, Map<String, Grouping>> getTargets(String componentId) { |
| Map<String, Map<String, Grouping>> ret = new HashMap<>(); |
| for (String otherComponentId : getComponentIds()) { |
| Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs(); |
| for (Map.Entry<GlobalStreamId, Grouping> entry : inputs.entrySet()) { |
| GlobalStreamId id = entry.getKey(); |
| if (id.get_componentId().equals(componentId)) { |
| Map<String, Grouping> curr = ret.get(id.get_streamId()); |
| if (curr == null) { |
| curr = new HashMap<>(); |
| } |
| curr.put(otherComponentId, entry.getValue()); |
| ret.put(id.get_streamId(), curr); |
| } |
| } |
| } |
| return ret; |
| } |
| |
| @Override |
| public String toJSONString() { |
| Map<String, Object> obj = new HashMap<>(); |
| obj.put("task->component", taskToComponent); |
| // TODO: jsonify StormTopology |
| // at the minimum should send source info |
| return JSONValue.toJSONString(obj); |
| } |
| |
| /** |
| * Gets a map from task id to component id. |
| */ |
| public Map<Integer, String> getTaskToComponent() { |
| return taskToComponent; |
| } |
| |
| /** |
| * Gets a list of all component ids in this topology. |
| */ |
| public Set<String> getComponentIds() { |
| return ThriftTopologyUtils.getComponentIds(getRawTopology()); |
| } |
| |
| public ComponentCommon getComponentCommon(String componentId) { |
| return ThriftTopologyUtils.getComponentCommon(getRawTopology(), componentId); |
| } |
| |
| public int maxTopologyMessageTimeout() { |
| Integer max = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)); |
| for (String spout : getRawTopology().get_spouts().keySet()) { |
| ComponentCommon common = getComponentCommon(spout); |
| String jsonConf = common.get_json_conf(); |
| if (jsonConf != null) { |
| try { |
| Map<String, Object> conf = (Map) JSONValue.parseWithException(jsonConf); |
| Object comp = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS); |
| max = Math.max(ObjectReader.getInt(comp, max), max); |
| } catch (ParseException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| return max; |
| } |
| |
| public Map<String, Object> getConf() { |
| return topoConf; |
| } |
| |
| public boolean doSanityCheck() { |
| return doSanityCheck; |
| } |
| } |