blob: c713349386fcc6c3b2ce6cfd517a28366184cd59 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.topology;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.storm.Config;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.streams.Pair;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.Event;
import org.apache.storm.windowing.TimestampExtractor;
import org.apache.storm.windowing.TupleWindow;
import org.apache.storm.windowing.WaterMarkEvent;
import org.apache.storm.windowing.WaterMarkEventGenerator;
import org.apache.storm.windowing.persistence.WindowState;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertThat;
import static org.mockito.AdditionalAnswers.returnsArgAt;
/**
 * Unit tests for {@link PersistentWindowedBoltExecutor}.
 *
 * <p>Every test runs against an executor that wraps a mocked {@link IStatefulWindowedBolt} and is prepared with mocked
 * window, partition and system {@link KeyValueState} instances, so the tests can inspect exactly what the executor
 * acks, emits and writes to state.
 */
@RunWith(MockitoJUnitRunner.class)
public class PersistentWindowedBoltExecutorTest {
    private static final String LATE_STREAM = "late_stream";
    /** Key under which the partition ids are expected to be written to the partition state. */
    private static final String PARTITION_KEY = "pk";
    /** Key under which the eviction policy state is expected to be written to the system state. */
    private static final String EVICTION_STATE_KEY = "es";
    /** Key under which the trigger policy state is expected to be written to the system state. */
    private static final String TRIGGER_STATE_KEY = "ts";
    /** Used both as the window length and as the sliding interval (in number of events). */
    private static final int WINDOW_EVENT_COUNT = 5;

    /** Timestamp that the mocked timestamp extractor reports for every tuple. */
    private long tupleTs;
    private PersistentWindowedBoltExecutor<KeyValueState<String, String>> executor;
    private IStatefulWindowedBolt<KeyValueState<String, String>> mockBolt;
    private final Map<String, Object> testStormConf = new HashMap<>();
    private OutputCollector mockOutputCollector;
    private TopologyContext mockTopologyContext;
    private TimestampExtractor mockTimestampExtractor;
    private WaterMarkEventGenerator mockWaterMarkEventGenerator;

    @Mock
    private KeyValueState<String, Deque<Long>> mockPartitionState;
    @Mock
    private KeyValueState<Long, WindowState.WindowPartition<Tuple>> mockWindowState;
    @Mock
    private KeyValueState<String, Optional<?>> mockSystemState;

    @Captor
    private ArgumentCaptor<Tuple> tupleCaptor;
    @Captor
    private ArgumentCaptor<Collection<Tuple>> anchorCaptor;
    @Captor
    private ArgumentCaptor<Long> longCaptor;
    @Captor
    private ArgumentCaptor<Values> valuesCaptor;
    @Captor
    private ArgumentCaptor<TupleWindow> tupleWindowCaptor;
    @Captor
    private ArgumentCaptor<Deque<Long>> partitionValuesCaptor;
    @Captor
    private ArgumentCaptor<WindowState.WindowPartition<Tuple>> windowValuesCaptor;
    @Captor
    private ArgumentCaptor<Optional<?>> systemValuesCaptor;

    /**
     * Creates the mocks, configures a count based window ({@link #WINDOW_EVENT_COUNT} events long, sliding by the same
     * amount) with a late tuple stream, and prepares the executor with the mocked states.
     */
    @Before
    public void setUp() throws Exception {
        mockBolt = Mockito.mock(IStatefulWindowedBolt.class);
        mockWaterMarkEventGenerator = Mockito.mock(WaterMarkEventGenerator.class);
        mockTimestampExtractor = Mockito.mock(TimestampExtractor.class);
        // every tuple gets the same timestamp, whatever its content
        tupleTs = System.currentTimeMillis();
        Mockito.when(mockTimestampExtractor.extractTimestamp(Mockito.any())).thenReturn(tupleTs);
        Mockito.when(mockBolt.getTimestampExtractor()).thenReturn(mockTimestampExtractor);
        mockTopologyContext = Mockito.mock(TopologyContext.class);
        Mockito.when(mockTopologyContext.getThisStreams()).thenReturn(Collections.singleton(LATE_STREAM));
        mockOutputCollector = Mockito.mock(OutputCollector.class);
        executor = new PersistentWindowedBoltExecutor<>(mockBolt);

        testStormConf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, WINDOW_EVENT_COUNT);
        testStormConf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, WINDOW_EVENT_COUNT);
        testStormConf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, LATE_STREAM);
        testStormConf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 100_000);
        testStormConf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30);
        testStormConf.put(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, 1000);

        // the partition and window states start out empty: get(key, default) hands back the supplied default
        Mockito.when(mockPartitionState.get(Mockito.any(), Mockito.any())).then(returnsArgAt(1));
        Mockito.when(mockWindowState.get(Mockito.any(), Mockito.any())).then(returnsArgAt(1));
        // NOTE: thenReturn hands back this one iterator instance on every call to iterator()
        Mockito.when(mockSystemState.iterator()).thenReturn(
            ImmutableMap.<String, Optional<?>>of(EVICTION_STATE_KEY, Optional.empty(), TRIGGER_STATE_KEY, Optional.empty())
                .entrySet().iterator());

        executor.prepare(testStormConf, mockTopologyContext, mockOutputCollector,
                         mockWindowState, mockPartitionState, mockSystemState);
    }

    /**
     * A tuple that the watermark generator accepts ({@code track} returns {@code true}) is acked exactly once.
     */
    @Test
    public void testExecuteTuple() throws Exception {
        Mockito.when(mockWaterMarkEventGenerator.track(Mockito.any(), Mockito.anyLong())).thenReturn(true);
        Tuple mockTuple = Mockito.mock(Tuple.class);
        executor.initState(null);
        executor.waterMarkEventGenerator = mockWaterMarkEventGenerator;
        executor.execute(mockTuple);
        // should be ack-ed once
        Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple);
    }

    /**
     * A tuple that the watermark generator rejects ({@code track} returns {@code false}) is acked once and emitted once
     * on the late tuple stream, anchored to itself and carrying itself as the only value.
     */
    @Test
    public void testExecuteLatetuple() throws Exception {
        Mockito.when(mockWaterMarkEventGenerator.track(Mockito.any(), Mockito.anyLong())).thenReturn(false);
        Tuple mockTuple = Mockito.mock(Tuple.class);
        executor.initState(null);
        executor.waterMarkEventGenerator = mockWaterMarkEventGenerator;
        executor.execute(mockTuple);
        // ack-ed once
        Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple);
        // late tuple emitted
        ArgumentCaptor<String> stringCaptor = ArgumentCaptor.forClass(String.class);
        Mockito.verify(mockOutputCollector, Mockito.times(1))
               .emit(stringCaptor.capture(), anchorCaptor.capture(), valuesCaptor.capture());
        Assert.assertEquals(LATE_STREAM, stringCaptor.getValue());
        Assert.assertEquals(Collections.singletonList(mockTuple), anchorCaptor.getValue());
        Assert.assertEquals(new Values(mockTuple), valuesCaptor.getValue());
    }

    /**
     * Fills exactly one window, triggers it with a watermark and then checks what {@code prePrepare} writes to the
     * partition, window and system states.
     */
    @Test
    public void testActivation() throws Exception {
        Mockito.when(mockWaterMarkEventGenerator.track(Mockito.any(), Mockito.anyLong())).thenReturn(true);
        executor.initState(null);
        executor.waterMarkEventGenerator = mockWaterMarkEventGenerator;
        List<Tuple> mockTuples = getMockTuples(WINDOW_EVENT_COUNT);
        mockTuples.forEach(executor::execute);
        // all tuples acked
        Mockito.verify(mockOutputCollector, Mockito.times(WINDOW_EVENT_COUNT)).ack(tupleCaptor.capture());
        Assert.assertArrayEquals(mockTuples.toArray(), tupleCaptor.getAllValues().toArray());

        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                TupleWindow window = invocation.getArgument(0);
                // iterate the tuples
                Assert.assertEquals(WINDOW_EVENT_COUNT, window.get().size());
                // iterating multiple times should produce same events
                Assert.assertEquals(WINDOW_EVENT_COUNT, window.get().size());
                Assert.assertEquals(WINDOW_EVENT_COUNT, window.get().size());
                return null;
            }
        }).when(mockBolt).execute(Mockito.any());

        // trigger the window
        long activationTs = tupleTs + 1000;
        executor.getWindowManager().add(new WaterMarkEvent<>(activationTs));
        // the assertions above only run if the bolt is really invoked, so make sure the window was activated
        Mockito.verify(mockBolt, Mockito.times(1)).execute(Mockito.any());
        executor.prePrepare(0);

        // partition ids
        ArgumentCaptor<String> pkCaptor = ArgumentCaptor.forClass(String.class);
        Mockito.verify(mockPartitionState, Mockito.times(1)).put(pkCaptor.capture(), partitionValuesCaptor.capture());
        Assert.assertEquals(PARTITION_KEY, pkCaptor.getValue());
        List<Long> expectedPartitionIds = Collections.singletonList(0L);
        assertThat(partitionValuesCaptor.getValue(), contains(expectedPartitionIds.toArray(new Long[0])));

        // window partitions
        Mockito.verify(mockWindowState, Mockito.times(1)).put(longCaptor.capture(), windowValuesCaptor.capture());
        Assert.assertEquals((long) expectedPartitionIds.get(0), (long) longCaptor.getValue());
        Assert.assertEquals(WINDOW_EVENT_COUNT, windowValuesCaptor.getValue().size());
        List<Tuple> tuples = windowValuesCaptor.getValue()
                                               .getEvents().stream().map(Event::get).collect(Collectors.toList());
        Assert.assertArrayEquals(mockTuples.toArray(), tuples.toArray());

        // window system state
        ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
        Mockito.verify(mockSystemState, Mockito.times(2)).put(keyCaptor.capture(), systemValuesCaptor.capture());
        Assert.assertEquals(EVICTION_STATE_KEY, keyCaptor.getAllValues().get(0));
        Assert.assertEquals(Optional.of(Pair.of((long) WINDOW_EVENT_COUNT, (long) WINDOW_EVENT_COUNT)),
                            systemValuesCaptor.getAllValues().get(0));
        Assert.assertEquals(TRIGGER_STATE_KEY, keyCaptor.getAllValues().get(1));
        Assert.assertEquals(Optional.of(tupleTs), systemValuesCaptor.getAllValues().get(1));
    }

    /**
     * Feeds far more tuples than a single partition holds, checks which partitions are written to the window state
     * while the tuples arrive, and then triggers the window with the window state mock serving those partitions back.
     */
    @Test
    public void testCacheEviction() {
        Mockito.when(mockWaterMarkEventGenerator.track(Mockito.any(), Mockito.anyLong())).thenReturn(true);
        executor.initState(null);
        executor.waterMarkEventGenerator = mockWaterMarkEventGenerator;
        int tupleCount = 20000;
        List<Tuple> mockTuples = getMockTuples(tupleCount);
        mockTuples.forEach(executor::execute);

        int numPartitions = tupleCount / WindowState.MAX_PARTITION_EVENTS;
        int numEvictedPartitions = numPartitions - WindowState.MIN_PARTITIONS;
        Mockito.verify(mockWindowState, Mockito.times(numEvictedPartitions)).put(longCaptor.capture(), windowValuesCaptor.capture());
        // number of evicted events
        Assert.assertEquals(numEvictedPartitions * WindowState.MAX_PARTITION_EVENTS,
                            windowValuesCaptor.getAllValues().stream().mapToInt(partition -> partition.size()).sum());

        // remember every partition that was written so that it can be served back below
        Map<Long, WindowState.WindowPartition<Tuple>> partitionMap = new HashMap<>();
        windowValuesCaptor.getAllValues().forEach(partition -> partitionMap.put(partition.getId(), partition));

        ArgumentCaptor<String> stringCaptor = ArgumentCaptor.forClass(String.class);
        Mockito.verify(mockPartitionState, Mockito.times(numPartitions)).put(stringCaptor.capture(), partitionValuesCaptor.capture());
        // the last write of the partition ids lists 0 .. numPartitions - 1
        assertThat(partitionValuesCaptor.getAllValues().get(numPartitions - 1),
                   contains(LongStream.range(0, numPartitions).boxed().toArray(Long[]::new)));

        // from here on the window state mock is backed by partitionMap: reads return a stored partition (or the
        // supplied default) and writes update the map
        Mockito.when(mockWindowState.get(Mockito.any(), Mockito.any())).then(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                Long partition = invocation.getArgument(0);
                WindowState.WindowPartition<Tuple> evicted = partitionMap.get(partition);
                return evicted != null ? evicted : invocation.getArgument(1);
            }
        });
        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                Long partitionId = invocation.getArgument(0);
                WindowState.WindowPartition<Tuple> partition = invocation.getArgument(1);
                partitionMap.put(partitionId, partition);
                return null;
            }
        }).when(mockWindowState).put(Mockito.any(), Mockito.any());

        // trigger the window
        long activationTs = tupleTs + 1000;
        executor.getWindowManager().add(new WaterMarkEvent<>(activationTs));
        Mockito.verify(mockBolt, Mockito.times(tupleCount / WINDOW_EVENT_COUNT)).execute(Mockito.any());
    }

    /**
     * A rollback that arrives before {@code initState} is forwarded to the bolt and rolls back each of the three
     * states exactly once.
     */
    @Test
    public void testRollbackBeforeInit() throws Exception {
        executor.preRollback();
        Mockito.verify(mockBolt, Mockito.times(1)).preRollback();
        Mockito.verify(mockPartitionState, Mockito.times(1)).rollback();
        Mockito.verify(mockWindowState, Mockito.times(1)).rollback();
        Mockito.verify(mockSystemState, Mockito.times(1)).rollback();
    }

    /**
     * A rollback that arrives after {@code initState} and {@code prePrepare} is forwarded to the bolt and rolls back
     * each of the three states exactly once. By then the partition state has been written twice and the system state
     * iterated twice (presumably once for the initial state load and once for the reload after the rollback; confirm
     * against the executor).
     */
    @Test
    public void testRollbackAfterInit() throws Exception {
        executor.initState(null);
        executor.prePrepare(0);
        executor.preRollback();
        Mockito.verify(mockBolt, Mockito.times(1)).preRollback();
        Mockito.verify(mockPartitionState, Mockito.times(1)).rollback();
        ArgumentCaptor<String> stringArgumentCaptor = ArgumentCaptor.forClass(String.class);
        Mockito.verify(mockPartitionState, Mockito.times(2)).put(stringArgumentCaptor.capture(), partitionValuesCaptor.capture());
        Mockito.verify(mockWindowState, Mockito.times(1)).rollback();
        Mockito.verify(mockSystemState, Mockito.times(1)).rollback();
        Mockito.verify(mockSystemState, Mockito.times(2)).iterator();
    }

    /**
     * Creates the requested number of distinct mocked tuples.
     *
     * @param count the number of tuples to create
     * @return a mutable list holding {@code count} mocked tuples, in creation order
     */
    private List<Tuple> getMockTuples(int count) {
        List<Tuple> tuples = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            tuples.add(Mockito.mock(Tuple.class));
        }
        return tuples;
    }
}