blob: 69cb175a8341dfd714d7e578bab9645395eff392 [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.common.utils.misc;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.twitter.heron.api.Config;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.grouping.CustomStreamGrouping;
import com.twitter.heron.api.utils.Utils;
import com.twitter.heron.common.utils.metrics.MetricsCollector;
import com.twitter.heron.common.utils.topology.TopologyContextImpl;
import com.twitter.heron.proto.system.PhysicalPlans;
/**
* PhysicalPlanHelper can be used to look up this instance's info in the physical plan,
* based on its instance id.
*/
public class PhysicalPlanHelper {
private final PhysicalPlans.PhysicalPlan pplan;
private final int myTaskId;
private final String myComponent;
private final String hostname;
private final String myInstanceId;
private final TopologyAPI.Component component;
// Map from streamid to number of fields in that stream's schema
private final Map<String, Integer> outputSchema;
private final CustomStreamGroupingHelper customGrouper;
private PhysicalPlans.Instance myInstance;
private TopologyAPI.Spout mySpout;
private TopologyAPI.Bolt myBolt;
private TopologyContextImpl topologyContext;
private final boolean isTerminatedComponent;
/**
 * Constructor for physical plan helper.
 *
 * <p>Locates the instance with the given id in the physical plan, determines whether its
 * component is a spout or a bolt, records the number of fields declared for each of its output
 * streams, resolves the local hostname, registers every custom grouping that a bolt applies to
 * this component's streams, and finally works out whether the component is terminal.
 *
 * @param pplan the physical plan to look the instance up in
 * @param instanceId the id of the instance this helper is built for
 * @throws RuntimeException if no instance has the given id, if the component is found as both
 *     a spout and a bolt or as neither, or if the local hostname cannot be resolved
 */
public PhysicalPlanHelper(PhysicalPlans.PhysicalPlan pplan, String instanceId) {
  this.pplan = pplan;

  // Get my instance. There is no early exit, so if several instances share the id
  // the last one in the plan is used.
  for (PhysicalPlans.Instance instance : pplan.getInstancesList()) {
    if (instance.getInstanceId().equals(instanceId)) {
      myInstance = instance;
    }
  }
  if (myInstance == null) {
    throw new RuntimeException("There was no instance that matched my id " + instanceId);
  }

  myComponent = myInstance.getInfo().getComponentName();
  myTaskId = myInstance.getInfo().getTaskId();
  myInstanceId = myInstance.getInstanceId();

  // Am i a spout or a bolt
  TopologyAPI.Topology topo = pplan.getTopology();
  for (TopologyAPI.Spout spout : topo.getSpoutsList()) {
    if (spout.getComp().getName().equals(myComponent)) {
      mySpout = spout;
      break;
    }
  }
  for (TopologyAPI.Bolt bolt : topo.getBoltsList()) {
    if (bolt.getComp().getName().equals(myComponent)) {
      myBolt = bolt;
      break;
    }
  }
  if (mySpout != null && myBolt != null) {
    throw new RuntimeException("MyTaskId is both a bolt or a spout " + myTaskId);
  }
  if (mySpout == null && myBolt == null) {
    throw new RuntimeException("MyTaskId is neither a bolt or a spout " + myTaskId);
  }

  // setup outputSchema: stream id -> number of keys in that stream's schema
  outputSchema = new HashMap<String, Integer>();
  List<TopologyAPI.OutputStream> outputs;
  if (mySpout != null) {
    outputs = mySpout.getOutputsList();
    component = mySpout.getComp();
  } else {
    outputs = myBolt.getOutputsList();
    component = myBolt.getComp();
  }
  for (TopologyAPI.OutputStream outputStream : outputs) {
    outputSchema.put(outputStream.getStream().getId(),
        outputStream.getSchema().getKeysCount());
  }

  try {
    this.hostname = InetAddress.getLocalHost().getHostName();
  } catch (UnknownHostException e) {
    // Keep the original exception as the cause so the reason for the failure is not lost
    throw new RuntimeException("GetHostName failed", e);
  }

  // Do some setup for any custom grouping
  customGrouper = new CustomStreamGroupingHelper();
  // Do we have any bolt that consumes any of my streams using custom grouping
  for (TopologyAPI.Bolt bolt : topo.getBoltsList()) {
    for (TopologyAPI.InputStream inputStream : bolt.getInputsList()) {
      if (inputStream.getStream().getComponentName().equals(myComponent)
          && inputStream.getGtype() == TopologyAPI.Grouping.CUSTOM) {
        // This bolt takes my output in custom grouping manner
        // This assumes that this custom grouping object is Java-serialized.
        CustomStreamGrouping customStreamGrouping =
            (CustomStreamGrouping) Utils.deserialize(
                inputStream.getCustomGroupingObject().toByteArray());
        customGrouper.add(inputStream.getStream().getId(),
            getTaskIdsAsListForComponent(bolt.getComp().getName()),
            customStreamGrouping, myComponent);
      }
    }
  }

  // Check whether it is a terminated component
  HashSet<String> terminals = getTerminatedComponentSet();
  this.isTerminatedComponent = terminals.contains(myComponent);
}
/**
 * Verifies that a tuple about to be emitted on a stream has as many fields as that stream's
 * recorded schema.
 *
 * @param streamId the id of the stream the tuple is emitted on
 * @param tuple the values being emitted
 * @throws RuntimeException if the stream has no recorded schema, or if the number of values
 *     differs from the recorded number of fields
 */
public void checkOutputSchema(String streamId, List<Object> tuple) {
  final Integer expectedFields = outputSchema.get(streamId);

  // A stream without an entry in the schema map was never declared by this component
  if (expectedFields == null) {
    throw new RuntimeException(myComponent + " emitting stream " + streamId
        + " but was not declared in declareOutputFields");
  }

  final int observedFields = tuple.size();
  if (expectedFields.intValue() != observedFields) {
    throw new RuntimeException("Number of fields emitted in stream " + streamId
        + " does not match whats expected. Expected "
        + expectedFields
        + " Observed " + observedFields);
  }
  // TODO:- Do more checks wrt type
}
/**
 * Reads the state of the topology held by the physical plan.
 *
 * @return the topology state stored in the physical plan's topology
 */
public TopologyAPI.TopologyState getTopologyState() {
  TopologyAPI.Topology topology = pplan.getTopology();
  return topology.getState();
}
/**
 * @return the task id of this instance, as read from the physical plan at construction time
 */
public int getMyTaskId() {
  return this.myTaskId;
}
// Accessors
/**
 * @return the local hostname resolved when this helper was constructed
 */
public String getMyHostname() {
  return this.hostname;
}
/**
 * @return the id of this instance, as stored in the physical plan
 */
public String getMyInstanceId() {
  return this.myInstanceId;
}
/**
 * @return the component index recorded in this instance's info
 */
public int getMyInstanceIndex() {
  final int componentIndex = this.myInstance.getInfo().getComponentIndex();
  return componentIndex;
}
/**
 * @return the name of the component this instance belongs to
 */
public String getMyComponent() {
  return this.myComponent;
}
/**
 * @return the spout whose component name matches this instance's component, or
 *     {@code null} when no spout matched (i.e. the component is a bolt)
 */
public TopologyAPI.Spout getMySpout() {
  return this.mySpout;
}
/**
 * @return the bolt whose component name matches this instance's component, or
 *     {@code null} when no bolt matched (i.e. the component is a spout)
 */
public TopologyAPI.Bolt getMyBolt() {
  return this.myBolt;
}
/**
 * @return the topology context created by {@link #setTopologyContext}, or {@code null} if
 *     that method has not been called yet
 */
public TopologyContextImpl getTopologyContext() {
  return this.topologyContext;
}
/**
 * Creates a fresh topology context for this instance and stores it in this helper.
 *
 * <p>The context is built from the topology-level config overlaid with this component's own
 * config, the topology itself, a task-id-to-component-name map, this instance's task id and
 * the supplied metrics collector.
 *
 * @param metricsCollector the metrics collector handed to the new context
 */
public void setTopologyContext(MetricsCollector metricsCollector) {
  Map<String, Object> mergedConfig =
      mergeConfigs(pplan.getTopology().getTopologyConfig(), component);
  TopologyAPI.Topology topology = pplan.getTopology();
  Map<Integer, String> taskToComponent = makeTaskToComponentMap();

  topologyContext = new TopologyContextImpl(
      mergedConfig, topology, taskToComponent, myTaskId, metricsCollector);
}
/**
 * Combines a topology-level config with a component's config into a single map.
 *
 * @param config the base config, added first
 * @param acomponent the component whose config is added second, so its entries replace
 *     base entries that share a key
 * @return a new map holding the merged entries
 */
private Map<String, Object> mergeConfigs(TopologyAPI.Config config,
                                         TopologyAPI.Component acomponent) {
  final Map<String, Object> merged = new HashMap<>();

  // Base layer first
  addConfigsToMap(config, merged);

  // Component layer second: overrides any key already present
  TopologyAPI.Config componentOverrides = acomponent.getConfig();
  addConfigsToMap(componentOverrides, merged);

  return merged;
}
/**
 * Copies every key/value entry of a config into the given map, replacing existing keys.
 *
 * <p>An entry with a non-empty plain value is stored with that value as-is; otherwise the
 * entry's serialized value is deserialized and the resulting object is stored.
 *
 * @param config the config whose entries are copied
 * @param map the map that receives the entries
 */
private void addConfigsToMap(TopologyAPI.Config config, Map<String, Object> map) {
  for (TopologyAPI.Config.KeyValue entry : config.getKvsList()) {
    final Object value;
    if (entry.getValue().isEmpty()) {
      // No plain value present: fall back to the serialized form
      value = Utils.deserialize(entry.getSerializedValue().toByteArray());
    } else {
      value = entry.getValue();
    }
    map.put(entry.getKey(), value);
  }
}
/**
 * Builds a lookup from task id to component name covering every instance in the physical plan.
 *
 * @return a new map with one entry per distinct task id
 */
private Map<Integer, String> makeTaskToComponentMap() {
  final Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
  final int instanceCount = pplan.getInstancesCount();
  for (int idx = 0; idx < instanceCount; idx++) {
    PhysicalPlans.Instance current = pplan.getInstances(idx);
    taskToComponent.put(current.getInfo().getTaskId(),
        current.getInfo().getComponentName());
  }
  return taskToComponent;
}
/**
 * Collects the task ids of all instances in the physical plan that belong to a component.
 *
 * @param comp the component name to match
 * @return the matching task ids, in the order the instances appear in the physical plan
 */
private List<Integer> getTaskIdsAsListForComponent(String comp) {
  final List<Integer> taskIds = new LinkedList<Integer>();
  final int instanceCount = pplan.getInstancesCount();
  for (int idx = 0; idx < instanceCount; idx++) {
    PhysicalPlans.Instance current = pplan.getInstances(idx);
    if (!current.getInfo().getComponentName().equals(comp)) {
      continue;
    }
    taskIds.add(current.getInfo().getTaskId());
  }
  return taskIds;
}
/**
 * Hands the current topology context to the custom grouping helper via its {@code prepare}
 * method. The context is whatever {@link #setTopologyContext} last stored.
 */
public void prepareForCustomStreamGrouping() {
  this.customGrouper.prepare(this.topologyContext);
}
/**
 * Asks the custom grouping helper which tasks should receive a tuple.
 *
 * @param streamId the id of the stream the tuple is emitted on
 * @param values the tuple's values
 * @return the task ids returned by the custom grouping helper
 */
public List<Integer> chooseTasksForCustomStreamGrouping(String streamId, List<Object> values) {
  final List<Integer> chosenTasks = this.customGrouper.chooseTasks(streamId, values);
  return chosenTasks;
}
/**
 * @return {@code true} if this instance's component was found in the set of terminal
 *     components computed at construction time
 */
public boolean isTerminatedComponent() {
  return this.isTerminatedComponent;
}
/**
 * Computes the names of the terminal components of the topology: components whose output
 * is not consumed by any bolt.
 *
 * <p>A bolt qualifies only if it has at least one input stream, so isolated bolts are left
 * out. A spout qualifies whenever no bolt reads from it.
 *
 * @return the set of terminal component names
 */
private HashSet<String> getTerminatedComponentSet() {
  final TopologyAPI.Topology topology = pplan.getTopology();

  // Names of components that are read by at least one bolt, i.e. that have a downstream
  final HashSet<String> consumed = new HashSet<>();
  // Names of bolts that declare at least one input, i.e. that have an upstream
  final HashSet<String> connectedBolts = new HashSet<>();

  // Only bolts need to be inspected for inputs, since spouts have no parents
  for (TopologyAPI.Bolt bolt : topology.getBoltsList()) {
    final String boltName = bolt.getComp().getName();
    for (TopologyAPI.InputStream input : bolt.getInputsList()) {
      connectedBolts.add(boltName);
      consumed.add(input.getStream().getComponentName());
    }
  }

  final HashSet<String> terminals = new HashSet<>();

  // Connected bolts that nobody consumes from
  for (String boltName : connectedBolts) {
    if (!consumed.contains(boltName)) {
      terminals.add(boltName);
    }
  }

  // Spouts without any children count as terminal too
  for (TopologyAPI.Spout spout : topology.getSpoutsList()) {
    final String spoutName = spout.getComp().getName();
    if (!consumed.contains(spoutName)) {
      terminals.add(spoutName);
    }
  }

  return terminals;
}
/**
 * @return the result of asking the custom grouping helper whether it is empty
 */
public boolean isCustomGroupingEmpty() {
  final boolean empty = this.customGrouper.isCustomGroupingEmpty();
  return empty;
}
/**
 * Tells whether the topology is configured with the {@code EFFECTIVELY_ONCE} reliability mode.
 *
 * <p>Reads the config from the topology context, so {@link #setTopologyContext} must have
 * been called first; otherwise the context is {@code null} and this method throws a
 * {@link NullPointerException}.
 *
 * @return {@code true} if the configured reliability mode is {@code EFFECTIVELY_ONCE};
 *     {@code false} for any other mode or when no reliability mode is configured
 * @throws IllegalArgumentException if a reliability mode is configured but does not name a
 *     constant of {@code Config.TopologyReliabilityMode}
 */
public boolean isTopologyStateful() {
  Map<String, Object> config = topologyContext.getTopologyConfig();
  Object configuredMode = config.get(Config.TOPOLOGY_RELIABILITY_MODE);
  if (configuredMode == null) {
    // String.valueOf(null) would yield "null", which is not an enum constant and would make
    // valueOf throw; a topology without a reliability mode is simply not stateful.
    return false;
  }
  Config.TopologyReliabilityMode mode =
      Config.TopologyReliabilityMode.valueOf(String.valueOf(configuredMode));
  return Config.TopologyReliabilityMode.EFFECTIVELY_ONCE.equals(mode);
}
/**
 * @return {@code true} if the topology state in the physical plan equals {@code RUNNING}
 */
public boolean isTopologyRunning() {
  final TopologyAPI.TopologyState currentState = getTopologyState();
  return currentState.equals(TopologyAPI.TopologyState.RUNNING);
}
}