blob: 45e585c440f3d12517badcd3e82330fa3480b1a6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.StreamInfo;
import org.apache.storm.scheduler.INimbus;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.CompletableSpout;
import org.apache.storm.testing.CompleteTopologyParam;
import org.apache.storm.testing.FixedTuple;
import org.apache.storm.testing.FixedTupleSpout;
import org.apache.storm.testing.MkClusterParam;
import org.apache.storm.testing.MkTupleParam;
import org.apache.storm.testing.MockedSources;
import org.apache.storm.testing.TestJob;
import org.apache.storm.testing.TrackedTopology;
import org.apache.storm.testing.TupleCaptureBolt;
import org.apache.storm.thrift.TException;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.RegisteredGlobalState;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A utility that helps with testing topologies, Bolts and Spouts.
*/
public class Testing {
/**
 * The default amount of wall time that should be spent waiting for
 * specific conditions to happen. Defaults to 10 seconds unless
 * the environment variable STORM_TEST_TIMEOUT_MS overrides it.
 */
public static final int TEST_TIMEOUT_MS;
private static final Logger LOG = LoggerFactory.getLogger(Testing.class);

static {
    int timeout = 10_000;
    // Check for the override explicitly instead of relying on the NPE that
    // Integer.parseInt(null) throws when the variable is unset.
    String override = System.getenv("STORM_TEST_TIMEOUT_MS");
    if (override != null) {
        try {
            timeout = Integer.parseInt(override);
        } catch (NumberFormatException ignored) {
            // Malformed override; fall back to the default timeout.
        }
    }
    TEST_TIMEOUT_MS = timeout;
}
/**
 * Continue to execute body repeatedly while condition evaluates to true,
 * failing if TEST_TIMEOUT_MS of wall time passes before the condition
 * becomes false.
 * @param condition the condition that keeps the loop running (the loop exits
 *     once it evaluates to false)
 * @param body what to run in the loop
 * @throws AssertionError if the loop timed out.
 */
public static void whileTimeout(Condition condition, Runnable body) {
    whileTimeout(TEST_TIMEOUT_MS, condition, body);
}
/**
 * Continue to execute body repeatedly while condition evaluates to true,
 * failing if timeoutMs of wall time passes before the condition becomes false.
 * @param timeoutMs the number of ms to wait before timing out.
 * @param condition the condition that keeps the loop running (the loop exits
 *     once it evaluates to false)
 * @param body what to run in the loop
 * @throws AssertionError if the loop timed out.
 */
public static void whileTimeout(long timeoutMs, Condition condition, Runnable body) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    LOG.debug("Looping until {}", condition);
    int calls = 0;
    while (condition.exec()) {
        calls++;
        boolean expired = System.currentTimeMillis() > deadline;
        if (expired) {
            LOG.info("Condition {} not met in {} ms after calling {} times", condition, timeoutMs, calls);
            LOG.info(Utils.threadDump());
            throw new AssertionError("Test timed out (" + timeoutMs + "ms) " + condition);
        }
        body.run();
    }
    LOG.debug("Condition met {}", condition);
}
/**
 * Check that every element of data satisfies pred.
 * Convenience equivalent of {@code data.stream().allMatch(pred)};
 * vacuously true for an empty collection.
 */
public static <T> boolean isEvery(Collection<T> data, Predicate<T> pred) {
    for (T item : data) {
        if (!pred.test(item)) {
            return false;
        }
    }
    return true;
}
/**
 * Run code under simulated time, restoring real time afterwards.
 *
 * @deprecated use ```
 * try (Time.SimulatedTime time = new Time.SimulatedTime()) {
 *     ...
 * }
 * ```
 * @param code what to run
 */
@Deprecated
public static void withSimulatedTime(Runnable code) {
    try (SimulatedTime ignored = new SimulatedTime()) {
        code.run();
    }
}
private static LocalCluster cluster(MkClusterParam param, boolean simulated) throws Exception {
    return cluster(param, null, simulated);
}

private static LocalCluster cluster(MkClusterParam param) throws Exception {
    return cluster(param, null, false);
}

/**
 * Build a LocalCluster from the given parameters, defaulting to 2 supervisors,
 * 3 ports per supervisor, and an empty daemon conf when those values are unset.
 */
private static LocalCluster cluster(MkClusterParam param, String id, boolean simulated) throws Exception {
    Integer supervisorCount = param.getSupervisors();
    Integer portCount = param.getPortsPerSupervisor();
    Map<String, Object> daemonConf = param.getDaemonConf();
    return new LocalCluster.Builder()
        .withSupervisors(supervisorCount == null ? 2 : supervisorCount)
        .withPortsPerSupervisor(portCount == null ? 3 : portCount)
        .withDaemonConf(daemonConf == null ? new HashMap<String, Object>() : daemonConf)
        .withNimbusDaemon(param.isNimbusDaemon())
        .withTracked(id)
        .withSimulatedTime(simulated)
        .build();
}
/**
 * Run code against a default local cluster.
 *
 * @deprecated use ```
 * try (LocalCluster cluster = new LocalCluster()) {
 *     ...
 * }
 * ```
 * @param code what to run
 */
@Deprecated
public static void withLocalCluster(TestJob code) {
    withLocalCluster(new MkClusterParam(), code);
}

/**
 * Run code against a local cluster configured from param.
 *
 * @deprecated use ```
 * try (LocalCluster cluster = new LocalCluster.Builder()....build()) {
 *     ...
 * }
 * ```
 * @param param configs to set in the cluster
 * @param code what to run
 */
@Deprecated
public static void withLocalCluster(MkClusterParam param, TestJob code) {
    try (LocalCluster localCluster = cluster(param)) {
        code.run(localCluster);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Build a local cluster from a map of configuration values.
 * Recognized keys: "daemon-conf", "supervisors", "ports-per-supervisor",
 * "inimbus", "supervisor-slot-port-min", and "nimbus-daemon".
 *
 * @deprecated use ```
 * try (LocalCluster cluster = new LocalCluster.Builder()....build()) {
 *     ...
 * }
 * ```
 * @param clusterConf some configs to set in the cluster
 */
@Deprecated
public static ILocalCluster getLocalCluster(Map<String, Object> clusterConf) {
    @SuppressWarnings("unchecked")
    Map<String, Object> daemonConf = (Map<String, Object>) clusterConf.get("daemon-conf");
    if (daemonConf == null) {
        daemonConf = new HashMap<>();
    }
    Number supervisorCount = (Number) clusterConf.getOrDefault("supervisors", 2);
    Number portsPerSupervisor = (Number) clusterConf.getOrDefault("ports-per-supervisor", 3);
    INimbus nimbus = (INimbus) clusterConf.get("inimbus");
    Number slotPortMin = (Number) clusterConf.getOrDefault("supervisor-slot-port-min", 1024);
    Boolean nimbusDaemon = (Boolean) clusterConf.getOrDefault("nimbus-daemon", false);
    try {
        return new LocalCluster.Builder()
            .withSupervisors(supervisorCount.intValue())
            .withDaemonConf(daemonConf)
            .withPortsPerSupervisor(portsPerSupervisor.intValue())
            .withINimbus(nimbus)
            .withSupervisorSlotPortMin(slotPortMin)
            .withNimbusDaemon(nimbusDaemon)
            .build();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Run code against a default local cluster that uses simulated time.
 *
 * @deprecated use ```
 * try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().build()) {
 *     ...
 * }
 * ```
 * @param code what to run
 */
@Deprecated
public static void withSimulatedTimeLocalCluster(TestJob code) {
    withSimulatedTimeLocalCluster(new MkClusterParam(), code);
}

/**
 * Run code against a local cluster configured from param that uses simulated time.
 *
 * @deprecated use ```
 * try (LocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()....build()) {
 *     ...
 * }
 * ```
 * @param param configs to set in the cluster
 * @param code what to run
 */
@Deprecated
public static void withSimulatedTimeLocalCluster(MkClusterParam param, TestJob code) {
    try (LocalCluster localCluster = cluster(param, true)) {
        code.run(localCluster);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Run code against a default tracked local cluster.
 *
 * @deprecated use ```
 * try (LocalCluster cluster = new LocalCluster.Builder().withTracked().build()) {
 *     ...
 * }
 * ```
 * @param code what to run
 */
@Deprecated
public static void withTrackedCluster(TestJob code) {
    withTrackedCluster(new MkClusterParam(), code);
}

/**
 * Run code against a tracked local cluster configured from param.
 * Tracked clusters also run with simulated time.
 *
 * @deprecated use ```
 * try (LocalCluster cluster = new LocalCluster.Builder().withTracked()....build()) {
 *     ...
 * }
 * ```
 * @param param configs to set in the cluster
 * @param code what to run
 */
@Deprecated
public static void withTrackedCluster(MkClusterParam param, TestJob code) {
    try (LocalCluster localCluster = cluster(param, Utils.uuid(), true)) {
        code.run(localCluster);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * In a tracked topology some metrics are tracked. This provides a way to get those metrics.
 * This is intended mostly for internal testing.
 *
 * @param id the id of the tracked cluster
 * @param key the name of the metric to get.
 * @return the metric
 */
@SuppressWarnings("unchecked")
@Deprecated
public static int globalAmt(String id, String key) {
    LOG.warn("Reading tracked metrics for ID {}", id);
    ConcurrentHashMap<String, AtomicInteger> metrics =
        (ConcurrentHashMap<String, AtomicInteger>) RegisteredGlobalState.getState(id);
    return metrics.get(key).get();
}
/**
 * Track and capture a topology.
 * This is intended mostly for internal testing.
 */
public static CapturedTopology<TrackedTopology> trackAndCaptureTopology(ILocalCluster cluster, StormTopology topology) {
    CapturedTopology<StormTopology> wrapped = captureTopology(topology);
    TrackedTopology tracked = new TrackedTopology(wrapped.topology, cluster);
    return new CapturedTopology<>(tracked, wrapped.capturer);
}
/**
 * Rewrites a topology so that all the tuples flowing through it are captured.
 * @param topology the topology to rewrite
 * @return the modified topology and a new Bolt that can retrieve the
 *     captured tuples.
 */
public static CapturedTopology<StormTopology> captureTopology(StormTopology topology) {
    topology = topology.deepCopy(); //Don't modify the original
    TupleCaptureBolt capturer = new TupleCaptureBolt();
    Map<GlobalStreamId, Grouping> captureBoltInputs = new HashMap<>();
    // Subscribe the capture bolt to every output stream of every component.
    for (Entry<String, SpoutSpec> spoutEntry : topology.get_spouts().entrySet()) {
        addCaptureInputs(captureBoltInputs, spoutEntry.getKey(), spoutEntry.getValue().get_common().get_streams());
    }
    for (Entry<String, Bolt> boltEntry : topology.get_bolts().entrySet()) {
        addCaptureInputs(captureBoltInputs, boltEntry.getKey(), boltEntry.getValue().get_common().get_streams());
    }
    topology.put_to_bolts(Utils.uuid(), new Bolt(Thrift.serializeComponentObject(capturer),
        Thrift.prepareComponentCommon(captureBoltInputs, new HashMap<>(), null)));
    return new CapturedTopology<>(topology, capturer);
}

/**
 * Register every output stream of one component as an input of the capture bolt,
 * using a direct grouping for direct streams and a global grouping otherwise.
 * @param captureBoltInputs the input map being built for the capture bolt
 * @param componentId the id of the component whose streams are being added
 * @param streams the component's declared output streams
 */
private static void addCaptureInputs(Map<GlobalStreamId, Grouping> captureBoltInputs, String componentId,
                                     Map<String, StreamInfo> streams) {
    for (Entry<String, StreamInfo> streamEntry : streams.entrySet()) {
        String stream = streamEntry.getKey();
        Grouping grouping = streamEntry.getValue().is_direct()
            ? Thrift.prepareDirectGrouping()
            : Thrift.prepareGlobalGrouping();
        captureBoltInputs.put(new GlobalStreamId(componentId, stream), grouping);
    }
}
/**
 * Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are
 * instances of {@link org.apache.storm.testing.CompletableSpout}. Delegates to the three-argument overload
 * with default {@link org.apache.storm.testing.CompleteTopologyParam} settings.
 * @param cluster the cluster to submit the topology to
 * @param topology the topology itself
 * @return a map of the component to the list of tuples it emitted
 * @throws TException on any error from nimbus
 */
public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology) throws InterruptedException,
    TException {
    return completeTopology(cluster, topology, new CompleteTopologyParam());
}
/**
 * Run a topology to completion capturing all of the messages that are emitted. This only works when all of the spouts are
 * instances of {@link org.apache.storm.testing.CompletableSpout} or are overwritten by MockedSources in param.
 * @param cluster the cluster to submit the topology to
 * @param topology the topology itself
 * @param param parameters to describe how to complete a topology
 * @return a map of the component to the list of tuples it emitted
 * @throws TException on any error from nimbus.
 */
public static Map<String, List<FixedTuple>> completeTopology(ILocalCluster cluster, StormTopology topology,
                                                             CompleteTopologyParam param) throws TException, InterruptedException {
    CapturedTopology<StormTopology> capTopo = captureTopology(topology);
    topology = capTopo.topology;
    String topoName = param.getTopologyName();
    if (topoName == null) {
        topoName = "topologytest-" + Utils.uuid();
    }
    Map<String, SpoutSpec> spouts = topology.get_spouts();
    MockedSources ms = param.getMockedSources();
    if (ms != null) {
        // Replace mocked spouts with FixedTupleSpouts that replay the supplied data.
        for (Entry<String, List<FixedTuple>> mocked : ms.getData().entrySet()) {
            FixedTupleSpout newSpout = new FixedTupleSpout(mocked.getValue());
            spouts.get(mocked.getKey()).set_spout_object(Thrift.serializeComponentObject(newSpout));
        }
    }
    List<Object> spoutObjects = spouts.values()
        .stream()
        .map((spec) -> Thrift.deserializeComponentObject(spec.get_spout_object()))
        .collect(Collectors.toList());
    for (Object o : spoutObjects) {
        if (!(o instanceof CompletableSpout)) {
            throw new RuntimeException(
                "Cannot complete topology unless every spout is a CompletableSpout (or mocked to be); failed by " + o);
        }
    }
    for (Object spout : spoutObjects) {
        ((CompletableSpout) spout).startup();
    }
    cluster.submitTopology(topoName, param.getStormConf(), topology);
    if (Time.isSimulating()) {
        cluster.advanceClusterTime(11);
    }
    IStormClusterState state = cluster.getClusterState();
    String topoId = state.getTopoId(topoName).get();
    //Give the topology time to come up without using it to wait for the spouts to complete
    simulateWait(cluster);
    Integer timeoutMs = param.getTimeoutMs();
    if (timeoutMs == null) {
        timeoutMs = TEST_TIMEOUT_MS;
    }
    // Wait until every spout reports it has emitted everything it has.
    whileTimeout(timeoutMs,
        () -> !isEvery(spoutObjects, (o) -> ((CompletableSpout) o).isExhausted()),
        () -> {
            try {
                simulateWait(cluster);
            } catch (Exception e) {
                // Propagate with the cause attached (the original dropped it).
                throw new RuntimeException(e);
            }
        });
    KillOptions killOpts = new KillOptions();
    killOpts.set_wait_secs(0);
    cluster.killTopologyWithOpts(topoName, killOpts);
    // Wait for the topology's assignment to be removed after the kill.
    whileTimeout(timeoutMs,
        () -> state.assignmentInfo(topoId, null) != null,
        () -> {
            try {
                simulateWait(cluster);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    if (param.getCleanupState()) {
        for (Object o : spoutObjects) {
            ((CompletableSpout) o).clean();
        }
        return capTopo.capturer.getAndRemoveResults();
    } else {
        return capTopo.capturer.getAndClearResults();
    }
}
/**
 * If using simulated time simulate waiting for 10 seconds. This is intended for internal testing only.
 */
public static void simulateWait(ILocalCluster cluster) throws InterruptedException {
    if (!Time.isSimulating()) {
        return;
    }
    cluster.advanceClusterTime(10);
    Thread.sleep(100);
}
/**
 * Get all of the tuples from a given component on the default stream.
 * Delegates to the three-argument overload with {@link Utils#DEFAULT_STREAM_ID}.
 * @param results the results of running a completed topology
 * @param componentId the id of the component to look at
 * @return a list of the tuple values.
 */
public static List<List<Object>> readTuples(Map<String, List<FixedTuple>> results, String componentId) {
    return readTuples(results, componentId, Utils.DEFAULT_STREAM_ID);
}
/**
 * Get all of the tuples from a given component on a given stream.
 * Returns an empty list when the component produced no results.
 * @param results the results of running a completed topology
 * @param componentId the id of the component to look at
 * @param streamId the id of the stream to look for.
 * @return a list of the tuple values.
 */
public static List<List<Object>> readTuples(Map<String, List<FixedTuple>> results, String componentId, String streamId) {
    List<FixedTuple> componentTuples = results.get(componentId);
    if (componentTuples == null) {
        return new ArrayList<>();
    }
    return componentTuples.stream()
        .filter(ft -> streamId.equals(ft.stream))
        .map(ft -> ft.values)
        .collect(Collectors.toList());
}
/**
 * Create a tracked topology.
 * Simple delegate to the TrackedTopology constructor.
 * @deprecated use {@link org.apache.storm.testing.TrackedTopology} directly.
 */
@Deprecated
public static TrackedTopology mkTrackedTopology(ILocalCluster cluster, StormTopology topology) {
    return new TrackedTopology(topology, cluster);
}
/**
 * Simulated time wait for a tracked topology. This is intended for internal testing.
 */
public static void trackedWait(CapturedTopology<TrackedTopology> topo) {
    topo.topology.trackedWait();
}
/**
 * Simulated time wait for a tracked topology. This is intended for internal testing.
 * @param amt presumably the number of tuples to wait for — confirm against TrackedTopology.trackedWait
 */
public static void trackedWait(CapturedTopology<TrackedTopology> topo, Integer amt) {
    topo.topology.trackedWait(amt);
}
/**
 * Simulated time wait for a tracked topology. This is intended for internal testing.
 * @param amt presumably the number of tuples to wait for — confirm against TrackedTopology.trackedWait
 * @param timeoutMs how long to wait before giving up
 */
public static void trackedWait(CapturedTopology<TrackedTopology> topo, Integer amt, Integer timeoutMs) {
    topo.topology.trackedWait(amt, timeoutMs);
}
/**
 * Simulated time wait for a tracked topology. This is intended for internal testing.
 */
public static void trackedWait(TrackedTopology topo) {
    topo.trackedWait();
}
/**
 * Simulated time wait for a tracked topology. This is intended for internal testing.
 * @param amt presumably the number of tuples to wait for — confirm against TrackedTopology.trackedWait
 */
public static void trackedWait(TrackedTopology topo, Integer amt) {
    topo.trackedWait(amt);
}
/**
 * Simulated time wait for a tracked topology. This is intended for internal testing.
 * @param amt presumably the number of tuples to wait for — confirm against TrackedTopology.trackedWait
 * @param timeoutMs how long to wait before giving up
 */
public static void trackedWait(TrackedTopology topo, Integer amt, Integer timeoutMs) {
    topo.trackedWait(amt, timeoutMs);
}
/**
 * Simulated time wait for a cluster. This is intended for internal testing.
 * Advances the cluster's simulated clock by secs seconds, one second at a time.
 */
public static void advanceClusterTime(ILocalCluster cluster, Integer secs) throws InterruptedException {
    advanceClusterTime(cluster, secs, 1);
}
/**
 * Simulated time wait for a cluster. This is intended for internal testing.
 * @param secs how many simulated seconds to advance in total
 * @param step the increment, in seconds, for each advance
 */
public static void advanceClusterTime(ILocalCluster cluster, Integer secs, Integer step) throws InterruptedException {
    cluster.advanceClusterTime(secs, step);
}
/**
 * Count how many times each element appears in the Collection.
 * @param c a collection of values
 * @return a map of the unique values in c to the count of those values.
 */
public static <T> Map<T, Integer> multiset(Collection<T> c) {
    Map<T, Integer> counts = new HashMap<>();
    for (T t : c) {
        // merge replaces the old get/new Integer(0)/put dance (the Integer
        // constructor is deprecated) with a single atomic map operation.
        counts.merge(t, 1, Integer::sum);
    }
    return counts;
}
/**
 * Recursively log o at INFO level, expanding Collections and Maps so nested
 * structure is visible; prefix grows by one space per nesting level.
 */
private static void printRec(Object o, String prefix) {
    if (o instanceof Collection) {
        LOG.info("{} {} ({}) [", prefix, o, o.getClass());
        // Wildcard instead of the raw Collection type of the original.
        for (Object sub : (Collection<?>) o) {
            printRec(sub, prefix + " ");
        }
        LOG.info("{} ]", prefix);
    } else if (o instanceof Map) {
        Map<?, ?> m = (Map<?, ?>) o;
        LOG.info("{} {} ({}) {", prefix, o, o.getClass());
        for (Map.Entry<?, ?> entry : m.entrySet()) {
            printRec(entry.getKey(), prefix + " ");
            LOG.info("{} ->", prefix);
            printRec(entry.getValue(), prefix + " ");
        }
        LOG.info("{} }", prefix);
    } else {
        LOG.info("{} {} ({})", prefix, o, o.getClass());
    }
}
/**
 * Check if two collections are equivalent ignoring the order of elements.
 * On mismatch both multisets are logged for debugging.
 */
public static <T> boolean multiseteq(Collection<T> a, Collection<T> b) {
    // Compute each multiset once; the original rebuilt both in the failure path.
    Map<T, Integer> msA = multiset(a);
    Map<T, Integer> msB = multiset(b);
    boolean ret = msA.equals(msB);
    if (!ret) {
        printRec(msA, "MS-A:");
        printRec(msB, "MS-B:");
    }
    return ret;
}
/**
 * Create a {@link org.apache.storm.tuple.Tuple} for use with testing.
 * Delegates to the two-argument overload with default MkTupleParam settings.
 * @param values the values to appear in the tuple
 */
public static Tuple testTuple(List<Object> values) {
    return testTuple(values, new MkTupleParam());
}
/**
 * Create a {@link org.apache.storm.tuple.Tuple} for use with testing.
 * Builds a minimal single-task TopologyContext so the tuple can resolve its
 * stream/component/field metadata.
 * @param values the values to appear in the tuple
 * @param param parametrs describing more details about the tuple
 */
public static Tuple testTuple(List<Object> values, MkTupleParam param) {
    // Fall back to the default stream when none was requested.
    String stream = param.getStream();
    if (stream == null) {
        stream = Utils.DEFAULT_STREAM_ID;
    }
    // Fall back to a dummy component name when none was requested.
    String component = param.getComponent();
    if (component == null) {
        component = "component";
    }
    int task = 1;
    // When no field names are given, synthesize "field1".."fieldN" to match the values.
    List<String> fields = param.getFields();
    if (fields == null) {
        fields = new ArrayList<>(values.size());
        for (int i = 1; i <= values.size(); i++) {
            fields.add("field" + i);
        }
    }
    // Minimal mappings: one task for the component, one stream with the fields above.
    Map<Integer, String> taskToComp = new HashMap<>();
    taskToComp.put(task, component);
    Map<String, Map<String, Fields>> compToStreamToFields = new HashMap<>();
    Map<String, Fields> streamToFields = new HashMap<>();
    streamToFields.put(stream, new Fields(fields));
    compToStreamToFields.put(component, streamToFields);
    // NOTE(review): the many null/placeholder positional arguments appear unused for
    // test tuples — confirm against the TopologyContext constructor before changing.
    TopologyContext context = new TopologyContext(null,
        ConfigUtils.readStormConfig(),
        taskToComp,
        null,
        compToStreamToFields,
        null,
        "test-storm-id",
        null,
        null,
        1,
        null,
        null,
        new HashMap<>(),
        new HashMap<>(),
        new HashMap<>(),
        new HashMap<>(),
        new AtomicBoolean(false),
        null);
    return new TupleImpl(context, values, component, 1, stream);
}
/**
 * Simply produces a boolean to see if a specific state is true or false.
 * A single-abstract-method interface, so it can be implemented with a lambda.
 */
@FunctionalInterface
public interface Condition {
    /**
     * Evaluate the condition.
     * @return true or false depending on the state being checked.
     */
    boolean exec();
}
/**
 * A topology that has all messages captured and can be read later on.
 * This is intended mostly for internal testing.
 * @param <T> the topology (tracked or regular)
 */
public static final class CapturedTopology<T> {
    // The rewritten topology with the capture bolt wired in.
    public final T topology;
    /**
     * a Bolt that will hold all of the captured data.
     */
    public final TupleCaptureBolt capturer;

    /**
     * Pair a topology with the capture bolt that records its tuples.
     * @param topology the (possibly tracked) topology
     * @param capturer the bolt holding the captured tuples
     */
    public CapturedTopology(T topology, TupleCaptureBolt capturer) {
        this.topology = topology;
        this.capturer = capturer;
    }
}
}