// blob: e34d3d51cec1abb44b783de2eee12f178c9b899e [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.integration_test.core;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import com.twitter.heron.api.Config;
import com.twitter.heron.api.HeronTopology;
import com.twitter.heron.api.bolt.IRichBolt;
import com.twitter.heron.api.bolt.IWindowedBolt;
import com.twitter.heron.api.bolt.WindowedBoltExecutor;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.spout.IRichSpout;
import com.twitter.heron.api.topology.BoltDeclarer;
import com.twitter.heron.api.topology.SpoutDeclarer;
import com.twitter.heron.api.topology.TopologyBuilder;
/**
 * A {@link TopologyBuilder} used by the integration tests.
 *
 * <p>Every bolt registered through this builder is wrapped in an {@code IntegrationTestBolt} and
 * every spout in an {@code IntegrationTestSpout} (or one of the other wrappers selected by
 * {@link SpoutWrapperType}). When {@link #createTopology()} is called, a terminal "aggregator"
 * bolt is appended to the topology and connected to every user-defined terminal component, and an
 * additional control stream is wired between every child bolt and each of its parents.
 */
public class TestTopologyBuilder extends TopologyBuilder {
  private static final int DEFAULT_EXECUTION_COUNT = 10;

  // Component id under which the terminal (aggregator) bolt is registered
  private static final String AGGREGATOR_BOLT = "__integration_test_aggregator_bolt";

  // Passed as the single constructor argument of the terminal bolt. It determines where the
  // output is directed: could be a URL, a file location, etc.
  private final String outputLocation;
  // Prefix of the state URLs handed to the spout wrappers; may be null, in which case no state
  // URLs are derived
  private final String stateLocation;
  // Suffix appended to stateLocation to form the topology-update URL
  private final String stateUpdateToken;
  // Selects which wrapper is put around the user's spouts
  private final SpoutWrapperType spoutWrapperType;

  // Components of the topology being created, indexed by component name
  private final Map<String, TopologyAPI.Bolt.Builder> bolts = new HashMap<>();
  private final Map<String, TopologyAPI.Spout.Builder> spouts = new HashMap<>();
  // The structure of the topology: a graph directed from children to parents
  // (bolt name -> names of the components it reads from)
  private final Map<String, HashSet<String>> prev = new HashMap<>();

  // By default, terminalBoltClass will be AggregatorBolt, which writes to specified HTTP server
  private String terminalBoltClass = "com.twitter.heron.integration_test.core.AggregatorBolt";

  /** The kind of wrapper that is put around every spout added to this builder. */
  public enum SpoutWrapperType {
    DEFAULT,
    TWO_PHASE,
    EMIT_UNTIL
  }

  /**
   * Creates a builder without state location that uses the {@link SpoutWrapperType#DEFAULT}
   * spout wrapper.
   *
   * @param outputLocation the value passed to the terminal bolt's constructor
   */
  public TestTopologyBuilder(String outputLocation) {
    this(outputLocation, null, null, SpoutWrapperType.DEFAULT);
  }

  /**
   * Creates a builder.
   *
   * @param outputLocation the value passed to the terminal bolt's constructor
   * @param stateLocation prefix used to derive the state URLs given to the spout wrappers, or
   * {@code null} to leave those URLs unset
   * @param stateUpdateToken suffix used, together with {@code stateLocation}, to form the
   * topology-update URL
   * @param spoutWrapperType the wrapper to put around spouts
   */
  public TestTopologyBuilder(String outputLocation,
                             String stateLocation,
                             String stateUpdateToken,
                             SpoutWrapperType spoutWrapperType) {
    this.outputLocation = outputLocation;
    this.stateLocation = stateLocation;
    this.stateUpdateToken = stateUpdateToken;
    this.spoutWrapperType = spoutWrapperType;
  }

  /**
   * Registers a bolt wrapped in an {@code IntegrationTestBolt} with {@code ackAuto} set to
   * {@code true}.
   */
  @Override
  public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint) {
    return setBolt(id, bolt, parallelismHint, true);
  }

  /**
   * Registers a bolt wrapped in an {@code IntegrationTestBolt}.
   *
   * @param id the component id
   * @param bolt the user's bolt
   * @param parallelismHint the parallelism hint forwarded to the parent builder
   * @param ackAuto forwarded to the {@code IntegrationTestBolt} constructor
   * @return the declarer returned by the parent builder
   */
  public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelismHint, boolean ackAuto) {
    IntegrationTestBolt wrappedBolt = new IntegrationTestBolt(bolt, ackAuto);
    return super.setBolt(id, wrappedBolt, parallelismHint);
  }

  /**
   * Registers a windowed bolt: it is first wrapped in a {@link WindowedBoltExecutor} and then
   * registered like any other rich bolt.
   *
   * @param id the component id
   * @param bolt the user's windowed bolt
   * @param parallelismHint the parallelism hint forwarded to the parent builder
   * @param ackAuto forwarded to the {@code IntegrationTestBolt} constructor
   * @return the declarer returned by the parent builder
   */
  public BoltDeclarer setBolt(String id, IWindowedBolt bolt,
                              Number parallelismHint, boolean ackAuto) throws
      IllegalArgumentException {
    return setBolt(id, new WindowedBoltExecutor(bolt), parallelismHint, ackAuto);
  }

  /** Registers a wrapped spout using {@code DEFAULT_EXECUTION_COUNT} as maxExecutionCount. */
  @Override
  public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelismHint) {
    return setSpout(id, spout, parallelismHint, DEFAULT_EXECUTION_COUNT);
  }

  /**
   * Registers a spout wrapped according to this builder's {@link SpoutWrapperType}, letting the
   * user define the maxExecutionCount of the spout. Kept to be compatible with the earlier
   * Integration Test Framework.
   *
   * @param id the component id
   * @param spout the user's spout
   * @param parallelismHint the parallelism hint forwarded to the parent builder
   * @param maxExecutionCount forwarded to the spout wrapper (not used by the EMIT_UNTIL wrapper)
   * @return the declarer returned by the parent builder
   */
  public SpoutDeclarer setSpout(String id, IRichSpout spout,
                                Number parallelismHint, int maxExecutionCount) {
    // The state URLs stay null when no state location was configured
    String topologyStartedUrl = null;
    String tuplesEmittedUrl = null;
    String topologyUpdateUrl = null;
    if (stateLocation != null) {
      topologyStartedUrl = stateLocation + "_topology_started";
      tuplesEmittedUrl = stateLocation + "_tuples_emitted";
      topologyUpdateUrl = stateLocation + "_" + stateUpdateToken;
    }

    IntegrationTestSpout wrappedSpout;
    switch (spoutWrapperType) {
      case TWO_PHASE:
        wrappedSpout = new MultiPhaseTestSpout(spout, maxExecutionCount, 2,
            new HttpGetCondition(topologyUpdateUrl), topologyStartedUrl);
        break;
      case EMIT_UNTIL:
        wrappedSpout = new EmitUntilConditionTestSpout(spout,
            new HttpGetCondition(topologyUpdateUrl), topologyStartedUrl, tuplesEmittedUrl);
        break;
      case DEFAULT:
      default:
        wrappedSpout = new IntegrationTestSpout(spout, maxExecutionCount, topologyStartedUrl);
        break;
    }

    // The static type IntegrationTestSpout selects the private overload below, so the already
    // wrapped spout goes straight to the parent builder instead of being wrapped again
    return setSpout(id, wrappedSpout, parallelismHint);
  }

  // Hands an already wrapped spout to the parent builder without wrapping it again
  private SpoutDeclarer setSpout(String id, IntegrationTestSpout itSpout, Number parallelismHint) {
    return super.setSpout(id, itSpout, parallelismHint);
  }

  /**
   * Changes the class instantiated as terminal bolt by {@link #createTopology()}.
   *
   * @param terminalBoltClass fully qualified name of a class built on top of
   * {@code BaseBatchBolt} that has a public constructor taking a single {@code String}
   */
  public void setTerminalBoltClass(String terminalBoltClass) {
    this.terminalBoltClass = terminalBoltClass;
  }

  /**
   * Builds the topology with the terminal bolt attached to every user-defined terminal
   * component. By default the terminal bolt is AggregatorBolt, which writes to an HTTP server
   * and takes a URL as input.
   *
   * @return the topology, including the terminal bolt and the integration test control streams
   * @throws RuntimeException if the terminal bolt class cannot be loaded or instantiated; the
   * underlying reflection exception is attached as the cause
   */
  @Override
  public HeronTopology createTopology() {
    // Add the terminal bolt before building, so that it is serialized with the topology
    setBolt(AGGREGATOR_BOLT, instantiateTerminalBolt(), 1);

    // We get the user-defined TopologyAPI.Topology.Builder
    TopologyAPI.Topology.Builder topologyBlr =
        super.createTopology().
            setConfig(new Config()).
            setName("").
            setState(TopologyAPI.TopologyState.RUNNING).
            getTopology().toBuilder();

    // Clear unnecessary fields to make the state of TopologyAPI.Topology.Builder clean
    topologyBlr.clearTopologyConfig().clearName().clearState();

    indexComponents(topologyBlr);
    HashSet<String> terminals = findTerminalComponents();

    // Now first, we will add all grouping to components
    for (String child : prev.keySet()) {
      for (String parent : prev.get(child)) {
        addAllGrouping(child, parent, Constants.INTEGRATION_TEST_CONTROL_STREAM_ID);
      }
    }

    // Then we need to connect aggregator bolt with user's terminal components
    // We could use any grouping but for convenience we would use allGrouping here
    for (String terminal : terminals) {
      TopologyAPI.Bolt.Builder terminalBolt = bolts.get(terminal);
      List<TopologyAPI.OutputStream> outputs;
      if (terminalBolt != null) {
        outputs = terminalBolt.getOutputsList();
      } else {
        outputs = spouts.get(terminal).getOutputsList();
      }
      for (TopologyAPI.OutputStream output : outputs) {
        addAllGrouping(AGGREGATOR_BOLT, terminal, output.getStream().getId());
      }
    }

    // We wrap it to the new topologyBuilder
    return new HeronTopology(topologyBlr);
  }

  // Instantiates the terminal bolt via reflection, based on the value of terminalBoltClass.
  // The class should be built on top of the BaseBatchBolt abstract class, and can be changed
  // using setTerminalBoltClass. The reflection exception is kept as the cause of the thrown
  // RuntimeException so that the root failure (e.g. inside the bolt's constructor) is not lost.
  private BaseBatchBolt instantiateTerminalBolt() {
    try {
      return (BaseBatchBolt) Class.forName(terminalBoltClass).getConstructor(String.class)
          .newInstance(this.outputLocation);
    } catch (NoSuchMethodException e) {
      throw new RuntimeException(
          e + " Terminal Bolt class must have a single String constructor.", e);
    } catch (InstantiationException e) {
      throw new RuntimeException(e + " Terminal bolt class could not be instantiated.", e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e + " Terminal Bolt class constructor is not accessible.", e);
    } catch (InvocationTargetException e) {
      throw new RuntimeException(
          e + " Terminal Bolt class constructor could not be invoked.", e);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e + " Terminal Bolt class must be a class path.", e);
    }
  }

  // Records every spout and bolt builder by name and builds the child -> parents graph.
  // Only bolts are inspected for parents, since spouts do not have any.
  private void indexComponents(TopologyAPI.Topology.Builder topologyBlr) {
    for (TopologyAPI.Spout.Builder spout : topologyBlr.getSpoutsBuilderList()) {
      spouts.put(spout.getComp().getName(), spout);
    }

    for (TopologyAPI.Bolt.Builder bolt : topologyBlr.getBoltsBuilderList()) {
      String name = bolt.getComp().getName();
      bolts.put(name, bolt);

      if (name.equals(AGGREGATOR_BOLT)) {
        // We should not consider aggregator bolt in the topology's graph or terminal components
        // since it is not user-defined
        continue;
      }

      // To get the parent's component to construct a graph of topology structure
      for (TopologyAPI.InputStream inputStream : bolt.getInputsList()) {
        HashSet<String> parents = prev.get(name);
        if (parents == null) {
          parents = new HashSet<>();
          prev.put(name, parents);
        }
        parents.add(inputStream.getStream().getComponentName());
      }
    }
  }

  // Finds the user-defined components that have to be linked with the terminal bolt:
  // bolts that have an upstream component but no downstream one, and spouts without children.
  private HashSet<String> findTerminalComponents() {
    // A component that appears as somebody's parent has a downstream, so it is not terminal
    HashSet<String> nonTerminals = new HashSet<>();
    for (HashSet<String> parents : prev.values()) {
      nonTerminals.addAll(parents);
    }

    HashSet<String> terminals = new HashSet<>();
    // Here we iterate bolts in prev.keySet() rather than bolts.keySet() because we don't want
    // an isolated bolt, including the aggregator bolt
    for (String bolt : prev.keySet()) {
      if (!nonTerminals.contains(bolt)) {
        terminals.add(bolt);
      }
    }
    // We will also consider the cases with spouts without children
    for (String spout : spouts.keySet()) {
      if (!nonTerminals.contains(spout)) {
        terminals.add(spout);
      }
    }
    return terminals;
  }

  /**
   * Given a child component and its upstream component, add the allGrouping between them with
   * given streamId
   *
   * @param child the child component id; must be the name of a bolt of the topology being created
   * @param parent the upstream component id
   * @param streamId the stream id
   */
  protected void addAllGrouping(String child, String parent, String streamId) {
    TopologyAPI.StreamId.Builder stream =
        TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(parent);
    TopologyAPI.InputStream.Builder input = TopologyAPI.InputStream.newBuilder();
    input.setStream(stream);
    input.setGtype(TopologyAPI.Grouping.ALL);
    bolts.get(child).addInputs(input);
  }
}