blob: 3b71af94788f48fa5a3415747350be50cd829e5a [file] [log] [blame]
/*
* Copyright 2018 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.integration;
import static org.apache.storm.AssertLoop.assertAcked;
import static org.apache.storm.AssertLoop.assertFailed;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.Testing;
import org.apache.storm.Thrift;
import org.apache.storm.Thrift.BoltDetails;
import org.apache.storm.Thrift.SpoutDetails;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.generated.TopologyInitialStatus;
import org.apache.storm.hooks.BaseTaskHook;
import org.apache.storm.hooks.info.BoltAckInfo;
import org.apache.storm.hooks.info.BoltExecuteInfo;
import org.apache.storm.hooks.info.BoltFailInfo;
import org.apache.storm.hooks.info.EmitInfo;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.AckFailMapTracker;
import org.apache.storm.testing.CompleteTopologyParam;
import org.apache.storm.testing.FeederSpout;
import org.apache.storm.testing.FixedTuple;
import org.apache.storm.testing.IntegrationTest;
import org.apache.storm.testing.MockedSources;
import org.apache.storm.testing.TestAggregatesCounter;
import org.apache.storm.testing.TestConfBolt;
import org.apache.storm.testing.TestGlobalCount;
import org.apache.storm.testing.TestPlannerSpout;
import org.apache.storm.testing.TestWordCounter;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.testing.TrackedTopology;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@IntegrationTest
public class TopologyIntegrationTest {
@ParameterizedTest
@ValueSource(strings = {"true", "false"})
public void testBasicTopology(boolean useLocalMessaging) throws Exception {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withSupervisors(4)
.withDaemonConf(Collections.singletonMap(Config.STORM_LOCAL_MODE_ZMQ, !useLocalMessaging))
.build()) {
Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
Map<String, BoltDetails> boltMap = new HashMap<>();
boltMap.put("2",
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("1", null),
Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
new TestWordCounter(), 4));
boltMap.put("3",
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("1", null),
Thrift.prepareGlobalGrouping()),
new TestGlobalCount()));
boltMap.put("4",
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("2", null),
Thrift.prepareGlobalGrouping()),
new TestAggregatesCounter()));
StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
Map<String, Object> stormConf = new HashMap<>();
stormConf.put(Config.TOPOLOGY_WORKERS, 2);
stormConf.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
.map(value -> new FixedTuple(new Values(value)))
.collect(Collectors.toList());
MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
completeTopologyParams.setMockedSources(mockedSources);
completeTopologyParams.setStormConf(stormConf);
Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
assertThat(Testing.readTuples(results, "1"), containsInAnyOrder(
new Values("nathan"),
new Values("nathan"),
new Values("bob"),
new Values("joey")));
assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
new Values("nathan", 1),
new Values("nathan", 2),
new Values("bob", 1),
new Values("joey", 1)
));
assertThat(Testing.readTuples(results, "3"), contains(
new Values(1),
new Values(2),
new Values(3),
new Values(4)
));
assertThat(Testing.readTuples(results, "4"), contains(
new Values(1),
new Values(2),
new Values(3),
new Values(4)
));
}
}
private static class EmitTaskIdBolt extends BaseRichBolt {
private int taskIndex;
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tid"));
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.taskIndex = context.getThisTaskIndex();
}
@Override
public void execute(Tuple input) {
collector.emit(input, new Values(taskIndex));
collector.ack(input);
}
}
@Test
public void testMultiTasksPerCluster() throws Exception {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withSupervisors(4)
.build()) {
Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true)));
Map<String, BoltDetails> boltMap = new HashMap<>();
boltMap.put("2",
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("1", null),
Thrift.prepareAllGrouping()),
new EmitTaskIdBolt(), 3, Collections.singletonMap(Config.TOPOLOGY_TASKS, 6)));
StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", Collections.singletonList(new FixedTuple(new Values("a")))));
CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
completeTopologyParams.setMockedSources(mockedSources);
Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
new Values(0),
new Values(1),
new Values(2),
new Values(3),
new Values(4),
new Values(5)
));
}
}
@Test
public void testTimeout() throws Exception {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withSupervisors(4)
.withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
.build()) {
FeederSpout feeder = new FeederSpout(new Fields("field1"));
AckFailMapTracker tracker = new AckFailMapTracker();
feeder.setAckFailDelegate(tracker);
Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
Map<String, BoltDetails> boltMap = new HashMap<>();
boltMap.put("2",
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("1", null),
Thrift.prepareGlobalGrouping()),
new AckEveryOtherBolt()));
StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
cluster.submitTopology("timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
cluster.advanceClusterTime(11);
feeder.feed(new Values("a"), 1);
feeder.feed(new Values("b"), 2);
feeder.feed(new Values("c"), 3);
cluster.advanceClusterTime(9);
assertAcked(tracker, 1, 3);
assertThat(tracker.isFailed(2), is(false));
cluster.advanceClusterTime(12);
assertFailed(tracker, 2);
}
}
private static class ResetTimeoutBolt extends BaseRichBolt {
private int tupleCounter = 1;
private Tuple firstTuple = null;
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
if (tupleCounter == 1) {
firstTuple = input;
} else if (tupleCounter == 2) {
collector.resetTimeout(firstTuple);
} else if (tupleCounter == 5) {
collector.ack(firstTuple);
collector.ack(input);
} else {
collector.resetTimeout(firstTuple);
collector.ack(input);
}
tupleCounter++;
}
}
@Test
public void testResetTimeout() throws Exception {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
.build()) {
FeederSpout feeder = new FeederSpout(new Fields("field1"));
AckFailMapTracker tracker = new AckFailMapTracker();
feeder.setAckFailDelegate(tracker);
Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(feeder));
Map<String, BoltDetails> boltMap = new HashMap<>();
boltMap.put("2",
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("1", null),
Thrift.prepareGlobalGrouping()),
new ResetTimeoutBolt()));
StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
cluster.submitTopology("reset-timeout-tester", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology);
//The first tuple wil be used to check timeout reset
feeder.feed(new Values("a"), 1);
//The second tuple is used to wait for the spout to rotate its pending map
feeder.feed(new Values("b"), 2);
cluster.advanceClusterTime(9);
//The other tuples are used to reset the first tuple's timeout,
//and to wait for the message to get through to the spout (acks use the same path as timeout resets)
feeder.feed(new Values("c"), 3);
assertAcked(tracker, 3);
cluster.advanceClusterTime(9);
feeder.feed(new Values("d"), 4);
assertAcked(tracker, 4);
cluster.advanceClusterTime(2);
//The time is now twice the message timeout, the second tuple should expire since it was not acked
//Waiting for this also ensures that the first tuple gets failed if reset-timeout doesn't work
assertFailed(tracker, 2);
//Put in a tuple to cause the first tuple to be acked
feeder.feed(new Values("e"), 5);
assertAcked(tracker, 5);
//The first tuple should be acked, and should not have failed
assertThat(tracker.isFailed(1), is(false));
assertAcked(tracker, 1);
}
}
private StormTopology mkValidateTopology() {
Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("1", null),
Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
new TestWordCounter(), 4));
return Thrift.buildTopology(spoutMap, boltMap);
}
private StormTopology mkInvalidateTopology1() {
Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("3", null),
Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
new TestWordCounter(), 4));
return Thrift.buildTopology(spoutMap, boltMap);
}
private StormTopology mkInvalidateTopology2() {
Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("1", null),
Thrift.prepareFieldsGrouping(Collections.singletonList("non-exists-field"))),
new TestWordCounter(), 4));
return Thrift.buildTopology(spoutMap, boltMap);
}
private StormTopology mkInvalidateTopology3() {
Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
Thrift.prepareBoltDetails(
Collections.singletonMap(
Utils.getGlobalStreamId("1", "non-exists-stream"),
Thrift.prepareFieldsGrouping(Collections.singletonList("word"))),
new TestWordCounter(), 4));
return Thrift.buildTopology(spoutMap, boltMap);
}
private boolean tryCompleteWordCountTopology(LocalCluster cluster, StormTopology topology) throws Exception {
try {
List<FixedTuple> testTuples = Arrays.asList("nathan", "bob", "joey", "nathan").stream()
.map(value -> new FixedTuple(new Values(value)))
.collect(Collectors.toList());
MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
completeTopologyParam.setMockedSources(mockedSources);
completeTopologyParam.setStormConf(Collections.singletonMap(Config.TOPOLOGY_WORKERS, 2));
Testing.completeTopology(cluster, topology, completeTopologyParam);
return false;
} catch (InvalidTopologyException e) {
return true;
}
}
@Test
public void testValidateTopologystructure() throws Exception {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
.build()) {
assertThat(tryCompleteWordCountTopology(cluster, mkValidateTopology()), is(false));
assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology1()), is(true));
assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology2()), is(true));
assertThat(tryCompleteWordCountTopology(cluster, mkInvalidateTopology3()), is(true));
}
}
@Test
public void testSystemStream() throws Exception {
//this test works because mocking a spout splits up the tuples evenly among the tasks
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.build()) {
Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestWordSpout(true), 3));
Map<GlobalStreamId, Grouping> boltInputs = new HashMap<>();
boltInputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareFieldsGrouping(Collections.singletonList("word")));
boltInputs.put(Utils.getGlobalStreamId("1", "__system"), Thrift.prepareGlobalGrouping());
Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
Thrift.prepareBoltDetails(
boltInputs,
new IdentityBolt(), 1));
StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
Map<String, Object> stormConf = new HashMap<>();
stormConf.put(Config.TOPOLOGY_WORKERS, 2);
List<FixedTuple> testTuples = Arrays.asList("a", "b", "c").stream()
.map(value -> new FixedTuple(new Values(value)))
.collect(Collectors.toList());
MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
completeTopologyParams.setMockedSources(mockedSources);
completeTopologyParams.setStormConf(stormConf);
Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
assertThat(Testing.readTuples(results, "2"), containsInAnyOrder(
new Values("a"),
new Values("b"),
new Values("c")
));
}
}
private static class SpoutAndChecker {
private final FeederSpout spout;
private final Consumer<Integer> checker;
public SpoutAndChecker(FeederSpout spout, Consumer<Integer> checker) {
this.spout = spout;
this.checker = checker;
}
}
private static class BranchingBolt extends BaseRichBolt {
private final int branches;
private OutputCollector collector;
public BranchingBolt(int branches) {
this.branches = branches;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
IntStream.range(0, branches)
.forEach(i -> collector.emit(input, new Values(i)));
collector.ack(input);
}
}
private static class AckBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
collector.ack(input);
}
}
@Test
public void testAcking() throws Exception {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withTracked()
.build()) {
AckTrackingFeeder feeder1 = new AckTrackingFeeder("num");
AckTrackingFeeder feeder2 = new AckTrackingFeeder("num");
AckTrackingFeeder feeder3 = new AckTrackingFeeder("num");
Map<String, SpoutDetails> spoutMap = new HashMap<>();
spoutMap.put("1", Thrift.prepareSpoutDetails(feeder1.getSpout()));
spoutMap.put("2", Thrift.prepareSpoutDetails(feeder2.getSpout()));
spoutMap.put("3", Thrift.prepareSpoutDetails(feeder3.getSpout()));
Map<String, BoltDetails> boltMap = new HashMap<>();
boltMap.put("4", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
boltMap.put("5", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(4)));
boltMap.put("6", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(1)));
Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
aggregatorInputs.put(Utils.getGlobalStreamId("4", null), Thrift.prepareShuffleGrouping());
aggregatorInputs.put(Utils.getGlobalStreamId("5", null), Thrift.prepareShuffleGrouping());
aggregatorInputs.put(Utils.getGlobalStreamId("6", null), Thrift.prepareShuffleGrouping());
boltMap.put("7", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(3)));
boltMap.put("8", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("7", null), Thrift.prepareShuffleGrouping()), new BranchingBolt(2)));
boltMap.put("9", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("8", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
cluster.submitTopology("acking-test1", Collections.emptyMap(), tracked);
cluster.advanceClusterTime(11);
feeder1.feed(new Values(1));
Testing.trackedWait(tracked, 1);
feeder1.assertNumAcks(0);
feeder2.feed(new Values(1));
Testing.trackedWait(tracked, 1);
feeder1.assertNumAcks(1);
feeder2.assertNumAcks(1);
feeder1.feed(new Values(1));
Testing.trackedWait(tracked, 1);
feeder1.assertNumAcks(0);
feeder1.feed(new Values(1));
Testing.trackedWait(tracked, 1);
feeder1.assertNumAcks(1);
feeder3.feed(new Values(1));
Testing.trackedWait(tracked, 1);
feeder1.assertNumAcks(0);
feeder3.assertNumAcks(0);
feeder2.feed(new Values(1));
Testing.trackedWait(tracked, 1);
feeder1.feed(new Values(1));
feeder2.feed(new Values(1));
feeder3.feed(new Values(1));
}
}
@Test
public void testAckBranching() throws Exception {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withTracked()
.build()) {
AckTrackingFeeder feeder = new AckTrackingFeeder("num");
Map<String, SpoutDetails> spoutMap = new HashMap<>();
spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));
Map<String, BoltDetails> boltMap = new HashMap<>();
boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new IdentityBolt()));
Map<GlobalStreamId, Grouping> aggregatorInputs = new HashMap<>();
aggregatorInputs.put(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping());
aggregatorInputs.put(Utils.getGlobalStreamId("3", null), Thrift.prepareShuffleGrouping());
boltMap.put("4", Thrift.prepareBoltDetails(aggregatorInputs, new AggBolt(4)));
TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
cluster.submitTopology("test-acking2", Collections.emptyMap(), tracked);
cluster.advanceClusterTime(11);
feeder.feed(new Values(1));
Testing.trackedWait(tracked, 1);
feeder.assertNumAcks(0);
feeder.feed(new Values(1));
Testing.trackedWait(tracked, 1);
feeder.assertNumAcks(2);
}
}
private static class DupAnchorBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
ArrayList<Tuple> anchors = new ArrayList<>();
anchors.add(input);
anchors.add(input);
collector.emit(anchors, new Values(1));
collector.ack(input);
}
}
private static boolean boltPrepared = false;
private static class PrepareTrackedBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
boltPrepared = true;
collector.ack(input);
}
}
private static boolean spoutOpened = false;
private static class OpenTrackedSpout extends BaseRichSpout {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("val"));
}
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
}
@Override
public void nextTuple() {
spoutOpened = true;
}
}
@Test
public void testSubmitInactiveTopology() throws Exception {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withDaemonConf(Collections.singletonMap(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS, true))
.build()) {
FeederSpout feeder = new FeederSpout(new Fields("field1"));
AckFailMapTracker tracker = new AckFailMapTracker();
feeder.setAckFailDelegate(tracker);
Map<String, SpoutDetails> spoutMap = new HashMap<>();
spoutMap.put("1", Thrift.prepareSpoutDetails(feeder));
spoutMap.put("2", Thrift.prepareSpoutDetails(new OpenTrackedSpout()));
Map<String, BoltDetails> boltMap = new HashMap<>();
boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareGlobalGrouping()), new PrepareTrackedBolt()));
boltPrepared = false;
spoutOpened = false;
StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
cluster.submitTopologyWithOpts("test", Collections.singletonMap(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10), topology, new SubmitOptions(TopologyInitialStatus.INACTIVE));
cluster.advanceClusterTime(11);
feeder.feed(new Values("a"), 1);
cluster.advanceClusterTime(9);
assertThat(boltPrepared, is(false));
assertThat(spoutOpened, is(false));
cluster.getNimbus().activate("test");
cluster.advanceClusterTime(12);
assertAcked(tracker, 1);
assertThat(boltPrepared, is(true));
assertThat(spoutOpened, is(true));
}
}
@Test
public void testAckingSelfAnchor() throws Exception {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withTracked()
.build()) {
AckTrackingFeeder feeder = new AckTrackingFeeder("num");
Map<String, SpoutDetails> spoutMap = new HashMap<>();
spoutMap.put("1", Thrift.prepareSpoutDetails(feeder.getSpout()));
Map<String, BoltDetails> boltMap = new HashMap<>();
boltMap.put("2", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()), new DupAnchorBolt()));
boltMap.put("3", Thrift.prepareBoltDetails(Collections.singletonMap(Utils.getGlobalStreamId("2", null), Thrift.prepareShuffleGrouping()), new AckBolt()));
TrackedTopology tracked = new TrackedTopology(Thrift.buildTopology(spoutMap, boltMap), cluster);;
cluster.submitTopology("test", Collections.emptyMap(), tracked);
cluster.advanceClusterTime(11);
feeder.feed(new Values(1));
Testing.trackedWait(tracked, 1);
feeder.assertNumAcks(1);
feeder.feed(new Values(1));
feeder.feed(new Values(1));
feeder.feed(new Values(1));
Testing.trackedWait(tracked, 3);
feeder.assertNumAcks(3);
}
}
private Map<Object, Object> listToMap(List<Object> list) {
assertThat(list.size() % 2, is(0));
Map<Object, Object> res = new HashMap<>();
for (int i = 0; i < list.size(); i += 2) {
res.put(list.get(i), list.get(i + 1));
}
return res;
}
@Test
public void testKryoDecoratorsConfig() throws Exception {
Map<String, Object> daemonConf = new HashMap<>();
daemonConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
daemonConf.put(Config.TOPOLOGY_KRYO_DECORATORS, "this-is-overridden");
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withDaemonConf(daemonConf)
.build()) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
topologyBuilder.setBolt("2", new TestConfBolt(Collections.singletonMap(Config.TOPOLOGY_KRYO_DECORATORS, Arrays.asList("one", "two"))))
.shuffleGrouping("1");
List<FixedTuple> testTuples = Arrays.asList(new Values(Config.TOPOLOGY_KRYO_DECORATORS)).stream()
.map(value -> new FixedTuple(value))
.collect(Collectors.toList());
MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
completeTopologyParams.setMockedSources(mockedSources);
completeTopologyParams.setStormConf(Collections.singletonMap(Config.TOPOLOGY_KRYO_DECORATORS, Arrays.asList("one", "three")));
Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topologyBuilder.createTopology(), completeTopologyParams);
List<Object> concatValues = Testing.readTuples(results, "2").stream()
.flatMap(values -> values.stream())
.collect(Collectors.toList());
assertThat(concatValues.get(0), is(Config.TOPOLOGY_KRYO_DECORATORS));
assertThat(concatValues.get(1), is(Arrays.asList("one", "two", "three")));
}
}
@Test
public void testComponentSpecificConfig() throws Exception {
Map<String, Object> daemonConf = new HashMap<>();
daemonConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.withDaemonConf(daemonConf)
.build()) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("1", new TestPlannerSpout(new Fields("conf")));
Map<String, Object> componentConf = new HashMap<>();
componentConf.put("fake.config", 123);
componentConf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 20);
componentConf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30);
componentConf.put(Config.TOPOLOGY_KRYO_REGISTER, Arrays.asList(Collections.singletonMap("fake.type", "bad.serializer"), Collections.singletonMap("fake.type2", "a.serializer")));
topologyBuilder.setBolt("2", new TestConfBolt(componentConf))
.shuffleGrouping("1")
.setMaxTaskParallelism(2)
.addConfiguration("fake.config2", 987);
List<FixedTuple> testTuples = Arrays.asList("fake.config", Config.TOPOLOGY_MAX_TASK_PARALLELISM, Config.TOPOLOGY_MAX_SPOUT_PENDING, "fake.config2", Config.TOPOLOGY_KRYO_REGISTER).stream()
.map(value -> new FixedTuple(new Values(value)))
.collect(Collectors.toList());
Map<String, String> kryoRegister = new HashMap<>();
kryoRegister.put("fake.type", "good.serializer");
kryoRegister.put("fake.type3", "a.serializer3");
Map<String, Object> stormConf = new HashMap<>();
stormConf.put(Config.TOPOLOGY_KRYO_REGISTER, Arrays.asList(kryoRegister));
MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
completeTopologyParams.setMockedSources(mockedSources);
completeTopologyParams.setStormConf(stormConf);
Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topologyBuilder.createTopology(), completeTopologyParams);
Map<String, Object> expectedValues = new HashMap<>();
expectedValues.put("fake.config", 123L);
expectedValues.put("fake.config2", 987L);
expectedValues.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 2L);
expectedValues.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30L);
Map<String, String> expectedKryoRegister = new HashMap<>();
expectedKryoRegister.putAll(kryoRegister);
expectedKryoRegister.put("fake.type2", "a.serializer");
expectedValues.put(Config.TOPOLOGY_KRYO_REGISTER, expectedKryoRegister);
List<Object> concatValues = Testing.readTuples(results, "2").stream()
.flatMap(values -> values.stream())
.collect(Collectors.toList());
assertThat(listToMap(concatValues), is(expectedValues));
}
}
private static class HooksBolt extends BaseRichBolt {
private int acked = 0;
private int failed = 0;
private int executed = 0;
private int emitted = 0;
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("emit", "ack", "fail", "executed"));
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
context.addTaskHook(new BaseTaskHook() {
@Override
public void boltExecute(BoltExecuteInfo info) {
executed++;
}
@Override
public void boltFail(BoltFailInfo info) {
failed++;
}
@Override
public void boltAck(BoltAckInfo info) {
acked++;
}
@Override
public void emit(EmitInfo info) {
emitted++;
}
});
}
@Override
public void execute(Tuple input) {
collector.emit(new Values(emitted, acked, failed, executed));
if (acked - failed == 0) {
collector.ack(input);
} else {
collector.fail(input);
}
}
}
@Test
public void testHooks() throws Exception {
try (LocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.build()) {
Map<String, SpoutDetails> spoutMap = Collections.singletonMap("1", Thrift.prepareSpoutDetails(new TestPlannerSpout(new Fields("conf"))));
Map<String, BoltDetails> boltMap = Collections.singletonMap("2",
Thrift.prepareBoltDetails(
Collections.singletonMap(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()),
new HooksBolt()));
StormTopology topology = Thrift.buildTopology(spoutMap, boltMap);
List<FixedTuple> testTuples = Arrays.asList(1, 1, 1, 1).stream()
.map(value -> new FixedTuple(new Values(value)))
.collect(Collectors.toList());
MockedSources mockedSources = new MockedSources(Collections.singletonMap("1", testTuples));
CompleteTopologyParam completeTopologyParams = new CompleteTopologyParam();
completeTopologyParams.setMockedSources(mockedSources);
Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, completeTopologyParams);
List<List<Object>> expectedTuples = Arrays.asList(
Arrays.asList(0, 0, 0, 0),
Arrays.asList(2, 1, 0, 1),
Arrays.asList(4, 1, 1, 2),
Arrays.asList(6, 2, 1, 3));
assertThat(Testing.readTuples(results, "2"), is(expectedTuples));
}
}
}