blob: 8f46a33fbb404e3f1a4221348339c684a994702a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
package org.apache.storm;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.StreamInfo;
import org.apache.storm.scheduler.INimbus;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.CompletableSpout;
import org.apache.storm.testing.CompleteTopologyParam;
import org.apache.storm.testing.FixedTuple;
import org.apache.storm.testing.FixedTupleSpout;
import org.apache.storm.testing.MkClusterParam;
import org.apache.storm.testing.MkTupleParam;
import org.apache.storm.testing.MockedSources;
import org.apache.storm.testing.TestJob;
import org.apache.storm.testing.TrackedTopology;
import org.apache.storm.testing.TupleCaptureBolt;
import org.apache.storm.thrift.TException;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.RegisteredGlobalState;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* A utility that helps with testing topologies, Bolts and Spouts.
public class Testing {
/**
 * The default amount of wall-clock time, in milliseconds, that should be spent waiting for
 * specific conditions to happen. Default is 10 seconds unless the environment variable
 * STORM_TEST_TIMEOUT_MS is set to a valid integer.
 */
public static final int TEST_TIMEOUT_MS;
private static final Logger LOG = LoggerFactory.getLogger(Testing.class);

static {
    int timeout = 10_000;
    String override = null;
    try {
        override = System.getenv("STORM_TEST_TIMEOUT_MS");
        // An unset variable is the normal case: keep the default without relying on an exception.
        if (override != null) {
            timeout = Integer.parseInt(override);
        }
    } catch (NumberFormatException | SecurityException e) {
        // Still fall back to the default, but make a bad or unreadable override visible.
        LOG.warn("Ignoring STORM_TEST_TIMEOUT_MS value '{}', using default of {} ms", override, timeout);
    }
    TEST_TIMEOUT_MS = timeout;
}
* Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has
* passed
* @param condition what we are waiting for
* @param body what to run in the loop
* @throws AssertionError if the loop timed out.
public static void whileTimeout(Condition condition, Runnable body) {
whileTimeout(TEST_TIMEOUT_MS, condition, body);
* Continue to execute body repeatedly until condition is true or TEST_TIMEOUT_MS has
* passed
* @param timeoutMs the number of ms to wait before timing out.
* @param condition what we are waiting for
* @param body what to run in the loop
* @throws AssertionError if the loop timed out.
public static void whileTimeout(long timeoutMs, Condition condition, Runnable body) {
long endTime = System.currentTimeMillis() + timeoutMs;
LOG.debug("Looping until {}", condition);
int count = 0;
while (condition.exec()) {
if (System.currentTimeMillis() > endTime) {"Condition {} not met in {} ms after calling {} times", condition, timeoutMs, count);;
throw new AssertionError("Test timed out (" + timeoutMs + "ms) " + condition);
LOG.debug("Condition met {}", condition);
* Convenience method for
public static <T> boolean isEvery(Collection<T> data, Predicate<T> pred) {
* Run with simulated time
* @deprecated use ```
* try (Time.SimulatedTime time = new Time.SimulatedTime()) {
* ...
* }
* ```
* @param code what to run
public static void withSimulatedTime(Runnable code) {
try (SimulatedTime st = new SimulatedTime()) {;
private static LocalCluster cluster(MkClusterParam param, boolean simulated) throws Exception {
return cluster(param, null, simulated);
private static LocalCluster cluster(MkClusterParam param) throws Exception {
return cluster(param, null, false);
// Resolves the cluster settings from param, substituting defaults for anything left unset,
// and then starts configuring a LocalCluster.Builder with them.
private static LocalCluster cluster(MkClusterParam param, String id, boolean simulated) throws Exception {
Integer supervisors = param.getSupervisors();
// Default: 2 supervisors.
if (supervisors == null) {
supervisors = 2;
Integer ports = param.getPortsPerSupervisor();
// Default: 3 ports per supervisor.
if (ports == null) {
ports = 3;
Map<String, Object> conf = param.getDaemonConf();
// Default: an empty daemon configuration.
if (conf == null) {
conf = new HashMap<>();
return new LocalCluster.Builder()
* Run with a local cluster
* @deprecated use ```
* try (LocalCluster cluster = new LocalCluster()) {
* ...
* }
* ```
* @param code what to run
public static void withLocalCluster(TestJob code) {
withLocalCluster(new MkClusterParam(), code);
* Run with a local cluster
* @deprecated use ```
* try (LocalCluster cluster = new LocalCluster.Builder() {
* ...
* }
* ```
* @param param configs to set in the cluster
* @param code what to run
public static void withLocalCluster(MkClusterParam param, TestJob code) {
try (LocalCluster lc = cluster(param)) {;
} catch (Exception e) {
throw new RuntimeException(e);
* Create a local cluster configured from a map of settings and return it to the caller.
* @deprecated use ```
* try (LocalCluster cluster = new LocalCluster.Builder() {
* ...
* }
* ```
* @param clusterConf some configs to set in the cluster
// Reads cluster settings out of clusterConf by their string keys and starts configuring a
// LocalCluster.Builder; any exception is rethrown wrapped in a RuntimeException.
public static ILocalCluster getLocalCluster(Map<String, Object> clusterConf) {
// "daemon-conf" is optional; an empty map is used when it is absent.
Map<String, Object> conf = (Map<String, Object>) clusterConf.get("daemon-conf");
if (conf == null) {
conf = new HashMap<>();
// Fallbacks when a key is absent: 2 supervisors, 3 ports per supervisor, port minimum 1024, nimbus daemon false.
Number supervisors = (Number) clusterConf.getOrDefault("supervisors", 2);
Number ports = (Number) clusterConf.getOrDefault("ports-per-supervisor", 3);
// "inimbus" has no fallback and is null when absent.
INimbus inimbus = (INimbus) clusterConf.get("inimbus");
Number portMin = (Number) clusterConf.getOrDefault("supervisor-slot-port-min", 1024);
Boolean nimbusDaemon = (Boolean) clusterConf.getOrDefault("nimbus-daemon", false);
try {
return new LocalCluster.Builder()
} catch (Exception e) {
throw new RuntimeException(e);
* Run with a local cluster
* @deprecated use ```
* try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) {
* ...
* }
* ```
* @param code what to run
public static void withSimulatedTimeLocalCluster(TestJob code) {
withSimulatedTimeLocalCluster(new MkClusterParam(), code);
* Run with a local cluster
* @deprecated use ```
* try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime() {
* ...
* }
* ```
* @param param configs to set in the cluster
* @param code what to run
public static void withSimulatedTimeLocalCluster(MkClusterParam param, TestJob code) {
try (LocalCluster lc = cluster(param, true)) {;
} catch (Exception e) {
throw new RuntimeException(e);
* Run with a local cluster
* @deprecated use ```
* try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) {
* ...
* }
* ```
* @param code what to run
public static void withTrackedCluster(TestJob code) {
withTrackedCluster(new MkClusterParam(), code);
* In a tracked topology some metrics are tracked. This provides a way to get those metrics.
* This is intended mostly for internal testing.
* @param id the id of the tracked cluster
* @param key the name of the metric to get.
* @return the metric
public static int globalAmt(String id, String key) {
LOG.warn("Reading tracked metrics for ID {}", id);
return ((ConcurrentHashMap<String, AtomicInteger>) RegisteredGlobalState.getState(id)).get(key).get();
* Run with a local tracked cluster
* @deprecated use ```
* try (LocalCluster cluster = new LocalCluster.Builder().withTracked() {
* ...
* }
* ```
* @param param configs to set in the cluster
* @param code what to run
public static void withTrackedCluster(MkClusterParam param, TestJob code) {
try (LocalCluster lc = cluster(param, Utils.uuid(), true)) {;
} catch (Exception e) {
throw new RuntimeException(e);
* Track and capture a topology.
* This is intended mostly for internal testing.
public static CapturedTopology<TrackedTopology> trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology) {
CapturedTopology<StormTopology> captured = captureTopology(topology);
return new CapturedTopology<>(new TrackedTopology(captured.topology, cluster), captured.capturer);
* Rewrites a topology so that all the tuples flowing through it are captured
* @param topology the topology to rewrite
* @return the modified topology and a new Bolt that can retrieve the
* captured tuples.
public static CapturedTopology<StormTopology> captureTopology(StormTopology topology) {
topology = topology.deepCopy(); //Don't modify the original
TupleCaptureBolt capturer = new TupleCaptureBolt();
Map<GlobalStreamId, Grouping> captureBoltInputs = new HashMap<>();
for (Map.Entry<String, SpoutSpec> spoutEntry : topology.get_spouts().entrySet()) {
String id = spoutEntry.getKey();
for (Entry<String, StreamInfo> streamEntry : spoutEntry.getValue().get_common().get_streams().entrySet()) {
String stream = streamEntry.getKey();
StreamInfo info = streamEntry.getValue();
if (info.is_direct()) {
captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareDirectGrouping());
} else {
captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareGlobalGrouping());
for (Entry<String, Bolt> boltEntry : topology.get_bolts().entrySet()) {
String id = boltEntry.getKey();
for (Entry<String, StreamInfo> streamEntry : boltEntry.getValue().get_common().get_streams().entrySet()) {
String stream = streamEntry.getKey();
StreamInfo info = streamEntry.getValue();
if (info.is_direct()) {
captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareDirectGrouping());
} else {
captureBoltInputs.put(new GlobalStreamId(id, stream), Thrift.prepareGlobalGrouping());
topology.put_to_bolts(Utils.uuid(), new Bolt(Thrift.serializeComponentObject(capturer),
Thrift.prepareComponentCommon(captureBoltInputs, new HashMap<>(), null)));
return new CapturedTopology<>(topology, capturer);
* Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are
* instances of {@link org.apache.storm.testing.CompletableSpout}
* @param cluster the cluster to submit the topology to
* @param topology the topology itself
* @return a map of the component to the list of tuples it emitted.
* @throws InterruptedException
* @throws TException on any error from nimbus.
public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException,
TException {
return completeTopology(cluster, topology, new CompleteTopologyParam());
* Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are
* instances of {@link org.apache.storm.testing.CompletableSpout} or are overwritten by MockedSources in param
* @param cluster the cluster to submit the topology to
* @param topology the topology itself
* @param param parameters to describe how to complete a topology.
* @return a map of the component to the list of tuples it emitted.
* @throws InterruptedException
* @throws TException on any error from nimbus.
// Submits a capture-enabled copy of the topology, waits for its spouts to be exhausted, kills it,
// and returns what the capture bolt collected.
public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology,
CompleteTopologyParam param) throws TException, InterruptedException {
Map<String, List<FixedTuple>> ret = null;
IStormClusterState state = cluster.getClusterState();
// From here on "topology" is the rewritten copy that includes the capture bolt.
CapturedTopology<StormTopology> capTopo = captureTopology(topology);
topology = capTopo.topology;
String topoName = param.getTopologyName();
// Fall back to a generated, unique name when the caller did not provide one.
if (topoName == null) {
topoName = "topologytest-" + Utils.uuid();
Map<String, SpoutSpec> spouts = topology.get_spouts();
MockedSources ms = param.getMockedSources();
if (ms != null) {
for (Entry<String, List<FixedTuple>> mocked : ms.getData().entrySet()) {
// A fixed-tuple spout is created from each mocked entry's tuples.
FixedTupleSpout newSpout = new FixedTupleSpout(mocked.getValue());
// NOTE(review): Collectors is used below but is not among the imports visible in this file - confirm.
List<Object> spoutObjects = spouts.values().stream().
map((spec) -> Thrift.deserializeComponentObject(spec.get_spout_object())).collect(Collectors.toList());
// Every spout has to be a CompletableSpout, otherwise completion cannot be detected.
for (Object o : spoutObjects) {
if (!(o instanceof CompletableSpout)) {
throw new RuntimeException(
"Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " + o);
for (Object spout : spoutObjects) {
((CompletableSpout) spout).startup();
cluster.submitTopology(topoName, param.getStormConf(), topology);
if (Time.isSimulating()) {
String topoId = state.getTopoId(topoName).get();
//Give the topology time to come up without using it to wait for the spouts to complete
Integer timeoutMs = param.getTimeoutMs();
// Use the class-wide default when no timeout was configured.
if (timeoutMs == null) {
timeoutMs = TEST_TIMEOUT_MS;
// Loop condition: true while at least one spout is not yet exhausted.
() -> !isEvery(spoutObjects, (o) -> ((CompletableSpout) o).isExhausted()),
() -> {
try {
} catch (Exception e) {
// NOTE(review): the caught exception is not passed on as the cause here.
throw new RuntimeException();
KillOptions killOpts = new KillOptions();
cluster.killTopologyWithOpts(topoName, killOpts);
// Loop condition: true while the cluster state still reports an assignment for the topology.
() -> state.assignmentInfo(topoId, null) != null,
() -> {
try {
} catch (Exception e) {
// NOTE(review): the caught exception is not passed on as the cause here.
throw new RuntimeException();
// With cleanup enabled each spout is cleaned and the results are removed from the capturer;
// otherwise the results are fetched with getAndClearResults().
if (param.getCleanupState()) {
for (Object o : spoutObjects) {
((CompletableSpout) o).clean();
ret = capTopo.capturer.getAndRemoveResults();
} else {
ret = capTopo.capturer.getAndClearResults();
return ret;
* If using simulated time simulate waiting for 10 seconds. This is intended for internal testing only.
public static void simulateWait(ILocalCluster cluster) throws InterruptedException {
if (Time.isSimulating()) {
* Get all of the tuples from a given component on the default stream
* @param results the results of running a completed topology
* @param componentId the id of the component to look at
* @return a list of the tuple values.
public static List<List<Object>> readTuples(Map<String, List<FixedTuple>> results, String componentId) {
return readTuples(results, componentId, Utils.DEFAULT_STREAM_ID);
* Get all of the tuples from a given component on a given stream
* @param results the results of running a completed topology
* @param componentId the id of the component to look at
* @param streamId the id of the stream to look for.
* @return a list of the tuple values.
public static List<List<Object>> readTuples(Map<String, List<FixedTuple>> results, String componentId, String streamId) {
List<List<Object>> ret = new ArrayList<>();
List<FixedTuple> streamResult = results.get(componentId);
if (streamResult != null) {
for (FixedTuple tuple : streamResult) {
if (streamId.equals( {
return ret;
* Create a tracked topology.
* @deprecated use {@link org.apache.storm.testing.TrackedTopology} directly.
public static TrackedTopology mkTrackedTopology(ILocalCluster cluster, StormTopology topology) {
return new TrackedTopology(topology, cluster);
* Simulated time wait for a tracked topology. This is intended for internal testing
public static void trackedWait(CapturedTopology<TrackedTopology> topo) {
* Simulated time wait for a tracked topology. This is intended for internal testing
public static void trackedWait(CapturedTopology<TrackedTopology> topo, Integer amt) {
* Simulated time wait for a tracked topology. This is intended for internal testing
public static void trackedWait(CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs) {
topo.topology.trackedWait(amt, timeoutMs);
* Simulated time wait for a tracked topology. This is intended for internal testing
public static void trackedWait(TrackedTopology topo) {
* Simulated time wait for a tracked topology. This is intended for internal testing
public static void trackedWait(TrackedTopology topo, Integer amt) {
* Simulated time wait for a tracked topology. This is intended for internal testing
public static void trackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs) {
topo.trackedWait(amt, timeoutMs);
* Simulated time wait for a cluster. This is intended for internal testing
public static void advanceClusterTime(ILocalCluster cluster, Integer secs) throws InterruptedException {
advanceClusterTime(cluster, secs, 1);
* Simulated time wait for a cluster. This is intended for internal testing
public static void advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step) throws InterruptedException {
cluster.advanceClusterTime(secs, step);
* Count how many times each element appears in the Collection
* @param c a collection of values
* @return a map of the unique values in c to the count of those values.
public static <T> Map<T, Integer> multiset(Collection<T> c) {
Map<T, Integer> ret = new HashMap<T, Integer>();
for (T t : c) {
Integer i = ret.get(t);
if (i == null) {
i = new Integer(0);
i += 1;
ret.put(t, i);
return ret;
private static void printRec(Object o, String prefix) {
if (o instanceof Collection) {"{} {} ({}) [", prefix, o, o.getClass());
for (Object sub : (Collection) o) {
printRec(sub, prefix + " ");
}"{} ]", prefix);
} else if (o instanceof Map) {
Map<?, ?> m = (Map<?, ?>) o;"{} {} ({}) {", prefix, o, o.getClass());
for (Map.Entry<?, ?> entry : m.entrySet()) {
printRec(entry.getKey(), prefix + " ");"{} ->", prefix);
printRec(entry.getValue(), prefix + " ");
}"{} }", prefix);
} else {"{} {} ({})", prefix, o, o.getClass());
* Check if two collections are equivalent ignoring the order of elements.
public static <T> boolean multiseteq(Collection<T> a, Collection<T> b) {
boolean ret = multiset(a).equals(multiset(b));
if (!ret) {
printRec(multiset(a), "MS-A:");
printRec(multiset(b), "MS-B:");
return ret;
* Create a {@link org.apache.storm.tuple.Tuple} for use with testing
* @param values the values to appear in the tuple
public static Tuple testTuple(List<Object> values) {
return testTuple(values, new MkTupleParam());
* Create a {@link org.apache.storm.tuple.Tuple} for use with testing
* @param values the values to appear in the tuple
* @param param parameters describing more details about the tuple
// Builds a Tuple backed by a TopologyContext that knows a single task, component and stream.
public static Tuple testTuple(List<Object> values, MkTupleParam param) {
String stream = param.getStream();
// Unset stream: use the default stream id.
if (stream == null) {
stream = Utils.DEFAULT_STREAM_ID;
String component = param.getComponent();
// Unset component: use the literal name "component".
if (component == null) {
component = "component";
int task = 1;
List<String> fields = param.getFields();
// Unset fields: generate one name per value, "field1" .. "fieldN".
if (fields == null) {
fields = new ArrayList<>(values.size());
for (int i = 1; i <= values.size(); i++) {
fields.add("field" + i);
// Map the single task to the component, and the component to its one stream and fields.
Map<Integer, String> taskToComp = new HashMap<>();
taskToComp.put(task, component);
Map<String, Map<String, Fields>> compToStreamToFields = new HashMap<>();
Map<String, Fields> streamToFields = new HashMap<>();
streamToFields.put(stream, new Fields(fields));
compToStreamToFields.put(component, streamToFields);
TopologyContext context = new TopologyContext(null,
new HashMap<>(),
new HashMap<>(),
new HashMap<>(),
new HashMap<>(),
new AtomicBoolean(false),
// NOTE(review): the literal 1 here presumably stands for the same task id as "task" above - confirm.
return new TupleImpl(context, values, component, 1, stream);
* Simply produces a boolean to see if a specific state is true or false.
public static interface Condition {
public boolean exec();
* A topology that has all messages captured and can be read later on.
* This is intended mostly for internal testing.
* @param <T> the topology (tracked or regular)
public static final class CapturedTopology<T> {
public final T topology;
* a Bolt that will hold all of the captured data.
public final TupleCaptureBolt capturer;
public CapturedTopology(T topology, TupleCaptureBolt capturer) {
this.topology = topology;
this.capturer = capturer;