blob: ac4c85066983e90bc2ba3ef72bcb3a8361ea8b28 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.task;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.shade.org.json.simple.JSONAware;
import org.apache.storm.shade.org.json.simple.JSONValue;
import org.apache.storm.shade.org.json.simple.parser.ParseException;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ThriftTopologyUtils;
/**
 * Lookup helpers over a topology's Thrift definition, its configuration, and the
 * task/component/stream tables supplied at construction time.
 *
 * <p>The maps passed to the constructor are stored by reference, not copied.
 */
public class GeneralTopologyContext implements JSONAware {
    protected Map<String, Object> topoConf;
    protected boolean doSanityCheck;
    private final StormTopology topology;
    private final Map<Integer, String> taskToComponent;
    private final Map<String, List<Integer>> componentToTasks;
    private final Map<String, Map<String, Fields>> componentToStreamToFields;
    private final String stormId;

    /**
     * Creates a context from pre-computed lookup tables.
     *
     * <p>{@code componentToSortedTasks} is passed in (rather than derived here) for the case of running tons of
     * tasks in a single executor.
     *
     * @param topology                  the Thrift definition of the topology
     * @param topoConf                  the topology configuration
     * @param taskToComponent           map from task id to component id
     * @param componentToSortedTasks    map from component id to its task ids; stored as-is, so callers are expected
     *                                  to supply each list already sorted
     * @param componentToStreamToFields map from component id to stream id to the declared output fields
     * @param stormId                   the id of the topology
     */
    public GeneralTopologyContext(StormTopology topology, Map<String, Object> topoConf,
                                  Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks,
                                  Map<String, Map<String, Fields>> componentToStreamToFields, String stormId) {
        this.topology = topology;
        this.topoConf = topoConf;
        this.taskToComponent = taskToComponent;
        this.stormId = stormId;
        this.componentToTasks = componentToSortedTasks;
        this.componentToStreamToFields = componentToStreamToFields;
        this.doSanityCheck = ConfigUtils.isLocalMode(this.topoConf);
    }

    /**
     * Gets the unique id assigned to this topology. The id is the storm name with a unique nonce appended to it.
     *
     * @return the storm id
     */
    public String getStormId() {
        return stormId;
    }

    /**
     * Gets the Thrift object representing the topology.
     *
     * @return the Thrift definition representing the topology
     */
    public StormTopology getRawTopology() {
        return topology;
    }

    /**
     * Gets the component id for the specified task id. The component id maps to a component id specified for a Spout or Bolt in the
     * topology definition.
     *
     * @param taskId the task id
     * @return {@link Constants#SYSTEM_COMPONENT_ID} when {@code taskId} is {@link Constants#SYSTEM_TASK_ID};
     *     otherwise whatever the task-to-component map holds for {@code taskId}
     */
    public String getComponentId(int taskId) {
        if (taskId == Constants.SYSTEM_TASK_ID) {
            return Constants.SYSTEM_COMPONENT_ID;
        }
        return taskToComponent.get(taskId);
    }

    /**
     * Gets the set of streams declared for the specified component.
     *
     * @param componentId the component id
     * @return the key set of the component's declared streams
     */
    public Set<String> getComponentStreams(String componentId) {
        return getComponentCommon(componentId).get_streams().keySet();
    }

    /**
     * Gets the task ids allocated for the given component id. The ids come back in the order of the list supplied
     * to the constructor (expected to be ascending).
     *
     * @param componentId the component id
     * @return a fresh list that the caller may modify; empty when the component has no entry
     */
    public List<Integer> getComponentTasks(String componentId) {
        List<Integer> tasks = componentToTasks.get(componentId);
        if (tasks == null) {
            return new ArrayList<>();
        }
        // Copy so callers cannot mutate the list held by this context.
        return new ArrayList<>(tasks);
    }

    /**
     * Gets the declared output fields for the specified component/stream.
     *
     * @param componentId the component id
     * @param streamId    the stream id
     * @return the declared output fields, never {@code null}
     * @throws IllegalArgumentException if no fields are registered for the component/stream pair, including the
     *                                  case where the component itself is unknown
     */
    public Fields getComponentOutputFields(String componentId, String streamId) {
        // Guard the outer lookup: an unknown component must yield the same descriptive error as an
        // unknown stream rather than a bare NullPointerException.
        Map<String, Fields> streamToFields = componentToStreamToFields.get(componentId);
        Fields ret = (streamToFields == null) ? null : streamToFields.get(streamId);
        if (ret == null) {
            throw new IllegalArgumentException("No output fields defined for component:stream " + componentId + ":" + streamId);
        }
        return ret;
    }

    /**
     * Gets the declared output fields for the specified global stream id.
     *
     * @param id the global stream id (component id plus stream id)
     * @return the declared output fields, never {@code null}
     * @throws IllegalArgumentException if no fields are registered for the component/stream pair
     */
    public Fields getComponentOutputFields(GlobalStreamId id) {
        return getComponentOutputFields(id.get_componentId(), id.get_streamId());
    }

    /**
     * Gets the declared inputs to the specified component.
     *
     * @param componentId the component id
     * @return A map from subscribed component/stream to the grouping subscribed with.
     */
    public Map<GlobalStreamId, Grouping> getSources(String componentId) {
        return getComponentCommon(componentId).get_inputs();
    }

    /**
     * Gets information about who is consuming the outputs of the specified component, and how.
     *
     * @param componentId the component whose consumers are wanted
     * @return Map from stream id to component id to the Grouping used.
     */
    public Map<String, Map<String, Grouping>> getTargets(String componentId) {
        Map<String, Map<String, Grouping>> ret = new HashMap<>();
        // Scan every component's inputs and keep those that subscribe to componentId.
        for (String otherComponentId : getComponentIds()) {
            Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();
            for (Map.Entry<GlobalStreamId, Grouping> entry : inputs.entrySet()) {
                GlobalStreamId id = entry.getKey();
                if (!id.get_componentId().equals(componentId)) {
                    continue;
                }
                String streamId = id.get_streamId();
                Map<String, Grouping> consumers = ret.get(streamId);
                if (consumers == null) {
                    // First consumer seen for this stream: register the per-stream map once.
                    consumers = new HashMap<>();
                    ret.put(streamId, consumers);
                }
                consumers.put(otherComponentId, entry.getValue());
            }
        }
        return ret;
    }

    /**
     * Serializes this context to JSON. Only the task-to-component map is emitted, under the key
     * {@code "task->component"}.
     *
     * @return the JSON text
     */
    @Override
    public String toJSONString() {
        Map<String, Object> obj = new HashMap<>();
        obj.put("task->component", taskToComponent);
        // TODO: jsonify StormTopology
        // at the minimum should send source info
        return JSONValue.toJSONString(obj);
    }

    /**
     * Gets a map from task id to component id.
     *
     * @return the map held by this context (not a copy)
     */
    public Map<Integer, String> getTaskToComponent() {
        return taskToComponent;
    }

    /**
     * Gets the set of all component ids in this topology.
     *
     * @return the component ids reported by {@link ThriftTopologyUtils#getComponentIds} for the raw topology
     */
    public Set<String> getComponentIds() {
        return ThriftTopologyUtils.getComponentIds(getRawTopology());
    }

    /**
     * Gets the {@link ComponentCommon} of the specified component.
     *
     * @param componentId the component id
     * @return the result of {@link ThriftTopologyUtils#getComponentCommon} for the raw topology and the given id
     */
    public ComponentCommon getComponentCommon(String componentId) {
        return ThriftTopologyUtils.getComponentCommon(getRawTopology(), componentId);
    }

    /**
     * Computes the largest message timeout in effect for this topology: the topology-level
     * {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS} value, raised to the highest value of the same key found in
     * the JSON configuration of any spout.
     *
     * @return the maximum message timeout
     * @throws RuntimeException wrapping a {@link ParseException} if a spout's JSON configuration cannot be parsed
     */
    public int maxTopologyMessageTimeout() {
        Integer max = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
        for (String spout : getRawTopology().get_spouts().keySet()) {
            String jsonConf = getComponentCommon(spout).get_json_conf();
            if (jsonConf == null) {
                continue;
            }
            try {
                // The parser returns Object; the component conf is read as a string-keyed map.
                @SuppressWarnings("unchecked")
                Map<String, Object> conf = (Map<String, Object>) JSONValue.parseWithException(jsonConf);
                Object comp = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
                // The current max doubles as the default, so a spout without an override leaves it unchanged.
                max = Math.max(ObjectReader.getInt(comp, max), max);
            } catch (ParseException e) {
                throw new RuntimeException(e);
            }
        }
        return max;
    }

    /**
     * Gets the topology configuration.
     *
     * @return the configuration map held by this context (not a copy)
     */
    public Map<String, Object> getConf() {
        return topoConf;
    }

    /**
     * Tells whether sanity checks are enabled. The flag is set in the constructor from
     * {@link ConfigUtils#isLocalMode} applied to the topology configuration.
     *
     * @return the sanity-check flag
     */
    public boolean doSanityCheck() {
        return doSanityCheck;
    }
}