blob: e8a7478e9482c45e32cc09cd9b5b608d98d220d9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.ILocalCluster.ILocalTopology;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.storm.testing.AckFailMapTracker;
import org.apache.storm.testing.FeederSpout;
import org.apache.storm.tuple.Values;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
public class TickTupleTest {
private final static Logger LOG = LoggerFactory.getLogger(TickTupleTest.class);
private static final AtomicInteger tickTupleCount = new AtomicInteger();
private static final AtomicReference<Tuple> nonTickTuple = new AtomicReference<>(null);
private static final AtomicBoolean receivedAnyTuple = new AtomicBoolean();
//This needs to be appropriately large to drown out any time advances performed during topology boot
private static final int TICK_INTERVAL_SECS = 30;
@AfterEach
public void cleanUp() {
tickTupleCount.set(0);
nonTickTuple.set(null);
receivedAnyTuple.set(false);
}
@Test
public void testTickTupleWorksWithSystemBolt() throws Exception {
try (ILocalCluster cluster = new LocalCluster.Builder()
.withSimulatedTime()
.build()) {
TopologyBuilder builder = new TopologyBuilder();
FeederSpout feeder = new FeederSpout(new Fields("field1"));
AckFailMapTracker tracker = new AckFailMapTracker();
feeder.setAckFailDelegate(tracker);
builder.setSpout("Spout", feeder);
builder.setBolt("Bolt", new NoopBolt())
.shuffleGrouping("Spout");
Config topoConf = new Config();
topoConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TICK_INTERVAL_SECS);
try (ILocalTopology topo = cluster.submitTopology("test", topoConf, builder.createTopology())) {
//Use a bootstrap tuple to wait for topology to be running
feeder.feed(new Values("val"), 1);
AssertLoop.assertAcked(tracker, 1);
/*
* Verify that some ticks are received. The interval between ticks is validated by the bolt.
* Too few and the checks will time out. Too many and the bolt may crash (not reliably, but the test should become flaky).
*/
try {
cluster.advanceClusterTime(TICK_INTERVAL_SECS);
waitForTicks(1);
cluster.advanceClusterTime(TICK_INTERVAL_SECS);
waitForTicks(2);
cluster.advanceClusterTime(TICK_INTERVAL_SECS);
waitForTicks(3);
} catch (ConditionTimeoutException e) {
throw new AssertionError(e.getMessage());
}
assertNull("The bolt got a tuple that is not a tick tuple " + nonTickTuple.get(), nonTickTuple.get());
}
}
}
private void waitForTicks(int minTicks) {
try {
Awaitility.with()
.pollInterval(1, TimeUnit.MILLISECONDS)
.atMost(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.untilAsserted(() -> assertThat(tickTupleCount.get(), Matchers.greaterThanOrEqualTo(minTicks)));
} catch (ConditionTimeoutException e) {
throw new AssertionError(e.getMessage());
}
}
private static class NoopBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector outputCollector) {
collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
LOG.info("GOT {} at time {}", tuple, Time.currentTimeMillis());
if (!receivedAnyTuple.get() && Time.currentTimeSecs() > TICK_INTERVAL_SECS) {
throw new RuntimeException("Simulated time was higher than " + TICK_INTERVAL_SECS + " at start of test."
+ " Increase the interval until this no longer occurs, but keep an eye on Storm's timeouts for e.g. worker heartbeat.");
}
receivedAnyTuple.set(true);
if (tickTupleCount.get() > 3) {
throw new RuntimeException("Unexpectedly many tick tuples");
}
if (TupleUtils.isTick(tuple)) {
tickTupleCount.incrementAndGet();
collector.ack(tuple);
} else {
if (tuple.getValues().size() == 1 && "val".equals(tuple.getValue(0))) {
collector.ack(tuple);
} else {
nonTickTuple.set(tuple);
}
}
}
@Override
public void cleanup() { }
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {}
}
}