blob: 04dd216455493fc2c7a971ba9ab8f3acfc122bee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.task;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.json.simple.JSONAware;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.tuple.Fields;
// import org.apache.storm.generated.ComponentCommon;
// import org.apache.storm.generated.GlobalStreamId;
// import org.apache.storm.generated.Grouping;
// import org.apache.storm.utils.ThriftTopologyUtils;
public class GeneralTopologyContext implements JSONAware {
private org.apache.heron.api.topology.GeneralTopologyContext delegate;
@SuppressWarnings("rawtypes")
public GeneralTopologyContext(StormTopology topology, Map stormConf,
Map<Integer, String> taskToComponent,
Map<String, List<Integer>> componentToSortedTasks,
Map<String, Map<String, Fields>> componentToStreamToFields,
String stormId) {
throw new RuntimeException("GeneralTopologyContext should never be initiated this way");
}
public GeneralTopologyContext(org.apache.heron.api.topology.GeneralTopologyContext delegate) {
this.delegate = delegate;
}
/**
* Gets the unique id assigned to this topology. The id is the storm name with a
* unique nonce appended to it.
*
* @return the storm id
*/
public String getStormId() {
return delegate.getTopologyId();
}
/**
* Gets the Thrift object representing the topology.
*
* @return the Thrift definition representing the topology
*/
@SuppressWarnings("deprecation")
public StormTopology getRawTopology() {
StormTopology stormTopology = new StormTopology();
Map<String, SpoutSpec> spouts = new HashMap<>();
for (TopologyAPI.Spout spout : this.delegate.getRawTopology().getSpoutsList()) {
spouts.put(spout.getComp().getName(), new SpoutSpec(spout));
}
Map<String, Bolt> bolts = new HashMap<>();
for (TopologyAPI.Bolt bolt : this.delegate.getRawTopology().getBoltsList()) {
bolts.put(bolt.getComp().getName(), new Bolt(bolt));
}
stormTopology.set_spouts(spouts);
stormTopology.set_bolts(bolts);
return stormTopology;
}
/**
* Gets the component id for the specified task id. The component id maps
* to a component id specified for a Spout or Bolt in the topology definition.
*
* @param taskId the task id
* @return the component id for the input task id
*/
public String getComponentId(int taskId) {
return delegate.getComponentId(taskId);
}
/**
* Gets the set of streams declared for the specified component.
*/
public Set<String> getComponentStreams(String componentId) {
return delegate.getComponentStreams(componentId);
}
/**
* Gets the task ids allocated for the given component id. The task ids are
* always returned in ascending order.
*/
public List<Integer> getComponentTasks(String componentId) {
return delegate.getComponentTasks(componentId);
}
/**
* Gets the declared output fields for the specified component/stream.
*/
public Fields getComponentOutputFields(String componentId, String streamId) {
return new Fields(delegate.getComponentOutputFields(componentId, streamId));
}
/**
* Gets the declared output fields for the specified global stream id.
*/
/*
public Fields getComponentOutputFields(GlobalStreamId id) {
}
*/
/**
* Gets the declared inputs to the specified component.
*
* @return A map from subscribed component/stream to the grouping subscribed with.
*/
/*
public Map<GlobalStreamId, Grouping> getSources(String componentId) {
return getComponentCommon(componentId).get_inputs();
}
*/
/**
* Gets information about who is consuming the outputs of the specified component,
* and how.
*
* @return Map from stream id to component id to the Grouping used.
*/
/*
public Map<String, Map<String, Grouping>> getTargets(String componentId) {
Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
for(String otherComponentId: getComponentIds()) {
Map<GlobalStreamId, Grouping> inputs =
getComponentCommon(otherComponentId).get_inputs();
for(GlobalStreamId id: inputs.keySet()) {
if(id.get_componentId().equals(componentId)) {
Map<String, Grouping> curr = ret.get(id.get_streamId());
if(curr==null) curr = new HashMap<String, Grouping>();
curr.put(otherComponentId, inputs.get(id));
ret.put(id.get_streamId(), curr);
}
}
}
return ret;
}
*/
@Override
public String toJSONString() {
throw new RuntimeException("toJSONString not implemented");
}
/**
* Gets a map from task id to component id.
*/
public Map<Integer, String> getTaskToComponent() {
return delegate.getTaskToComponent();
}
/**
* Gets a list of all component ids in this topology
*/
public Set<String> getComponentIds() {
return delegate.getComponentIds();
}
/*
public ComponentCommon getComponentCommon(String componentId) {
return ThriftTopologyUtils.getComponentCommon(getRawTopology(), componentId);
}
*/
/*
public int maxTopologyMessageTimeout() {
Integer max = Utils.getInteger(_stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
for(String spout: getRawTopology().get_spouts().keySet()) {
ComponentCommon common = getComponentCommon(spout);
String jsonConf = common.get_json_conf();
if(jsonConf!=null) {
Map conf = (Map) JSONValue.parse(jsonConf);
Object comp = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
if(comp!=null) {
max = Math.max(Utils.getInteger(comp), max);
}
}
}
return max;
}
*/
}