blob: 52a21d3bbcd3d01c72a472870aa5dc681af72b8f [file] [log] [blame]
// Copyright 2017 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.api.bolt;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.twitter.heron.api.Config;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.topology.TopologyBuilder;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Fields;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.api.tuple.Values;
import com.twitter.heron.api.windowing.TupleWindow;
import com.twitter.heron.api.windowing.WindowingConfigs;
import com.twitter.heron.common.utils.topology.TopologyContextImpl;
import com.twitter.heron.common.utils.tuple.TupleImpl;
import static org.junit.Assert.*;
/**
* Unit tests for {@link WindowedBoltExecutor}
*/
public class WindowedBoltExecutorTest {
private WindowedBoltExecutor executor;
private TestWindowedBolt testWindowedBolt;
@SuppressWarnings("VisibilityModifier")
private static class TestWindowedBolt extends BaseWindowedBolt {
private static final long serialVersionUID = -8934326157586387333L;
List<TupleWindow> tupleWindows = new ArrayList<>();
@Override
public void execute(TupleWindow input) {
//System.out.println(input);
tupleWindows.add(input);
}
}
@SuppressWarnings({"rawtypes", "unchecked"})
private TopologyContext getContext(final Fields fields) {
TopologyBuilder builder = new TopologyBuilder();
return new TopologyContextImpl(new Config(),
builder.createTopology()
.setConfig(new Config())
.setName("test")
.setState(TopologyAPI.TopologyState.RUNNING)
.getTopology(),
new HashMap(), 1, null) {
@Override
public Fields getComponentOutputFields(String componentId, String streamId) {
return fields;
}
};
}
private Tuple getTuple(String streamId, final Fields fields, Values values) {
TopologyContext topologyContext = getContext(fields);
return new TupleImpl(topologyContext, TopologyAPI.StreamId.newBuilder()
.setId(streamId).setComponentName("s1")
.build(), 0, null, values, 1) {
@Override
public TopologyAPI.StreamId getSourceGlobalStreamId() {
return TopologyAPI.StreamId.newBuilder().setComponentName("s1").setId("default").build();
}
};
}
private OutputCollector getOutputCollector() {
return Mockito.mock(OutputCollector.class);
}
private TopologyContext getTopologyContext() {
TopologyContext context = Mockito.mock(TopologyContext.class);
Map<TopologyAPI.StreamId, TopologyAPI.Grouping> sources =
Collections.singletonMap(TopologyAPI.StreamId.newBuilder()
.setComponentName("s1").setId("default").build(), null);
Mockito.when(context.getThisSources()).thenReturn(sources);
return context;
}
@Before
public void setUp() {
testWindowedBolt = new TestWindowedBolt();
testWindowedBolt.withTimestampField("ts");
executor = new WindowedBoltExecutor(testWindowedBolt);
Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
// trigger manually to avoid timing issues
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100000L);
executor.prepare(conf, getTopologyContext(), getOutputCollector());
}
@Test(expected = IllegalArgumentException.class)
public void testExecuteWithoutTs() throws Exception {
executor.execute(getTuple("s1", new Fields("a"), new Values(1)));
}
@Test
public void testExecuteWithTs() throws Exception {
long[] timestamps = {603, 605, 607, 618, 626, 636};
for (long ts : timestamps) {
executor.execute(getTuple("s1", new Fields("ts"), new Values(ts)));
}
//Thread.sleep(120);
executor.waterMarkEventGenerator.run();
//System.out.println(testWindowedBolt.tupleWindows);
assertEquals(3, testWindowedBolt.tupleWindows.size());
TupleWindow first = testWindowedBolt.tupleWindows.get(0);
assertArrayEquals(
new long[]{603, 605, 607},
new long[]{(long) first.get().get(0).getValue(0),
(long) first.get().get(1).getValue(0),
(long) first.get().get(2).getValue(0)});
TupleWindow second = testWindowedBolt.tupleWindows.get(1);
assertArrayEquals(
new long[]{603, 605, 607, 618},
new long[]{(long) second.get().get(0).getValue(0),
(long) second.get().get(1).getValue(0),
(long) second.get().get(2).getValue(0),
(long) second.get().get(3).getValue(0)});
TupleWindow third = testWindowedBolt.tupleWindows.get(2);
assertArrayEquals(new long[]{618, 626}, new long[]{(long) third.get().get(0).getValue(0),
(long) third.get().get(1).getValue(0)});
}
@Test
public void testPrepareLateTupleStreamWithoutTs() throws Exception {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10L);
testWindowedBolt = new TestWindowedBolt();
executor = new WindowedBoltExecutor(testWindowedBolt);
TopologyContext context = getTopologyContext();
// emulate the call of withLateTupleStream method
Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default",
"$late")));
try {
executor.prepare(conf, context, getOutputCollector());
fail();
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(), "Late tuple stream can be defined only when specifying a "
+ "timestamp field");
}
}
@Test
public void testPrepareLateTupleStreamWithoutBuilder() throws Exception {
Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10L);
testWindowedBolt = new TestWindowedBolt();
testWindowedBolt.withTimestampField("ts");
executor = new WindowedBoltExecutor(testWindowedBolt);
TopologyContext context = getTopologyContext();
try {
executor.prepare(conf, context, getOutputCollector());
fail();
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(), "Stream for late tuples must be defined with the builder "
+ "method withLateTupleStream");
}
}
@Test
public void testExecuteWithLateTupleStream() throws Exception {
testWindowedBolt = new TestWindowedBolt();
testWindowedBolt.withTimestampField("ts");
executor = new WindowedBoltExecutor(testWindowedBolt);
TopologyContext context = getTopologyContext();
Mockito.when(context.getThisStreams()).thenReturn(new HashSet<>(Arrays.asList("default",
"$late")));
OutputCollector outputCollector = Mockito.mock(OutputCollector.class);
Map<String, Object> conf = new HashMap<>();
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20L);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10L);
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late");
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5L);
//Trigger manually to avoid timing issues
conf.put(WindowingConfigs.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 1_000_000L);
executor.prepare(conf, context, outputCollector);
long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
List<Tuple> tuples = new ArrayList<>(timestamps.length);
for (long ts : timestamps) {
Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts));
tuples.add(tuple);
executor.execute(tuple);
//Update the watermark to this timestamp
executor.waterMarkEventGenerator.run();
}
System.out.println(testWindowedBolt.tupleWindows);
Tuple tuple = tuples.get(tuples.size() - 1);
Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));
}
}