blob: 0cb56fed6cf1e9eab2aecc767231820b243bd6b1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.topology;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TupleWindow;
import org.apache.storm.windowing.TupleWindowImpl;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.apache.storm.topology.StatefulWindowedBoltExecutor.TaskStream;
import static org.apache.storm.topology.StatefulWindowedBoltExecutor.WindowState;
/**
* Unit tests for {@link StatefulWindowedBoltExecutor}
*/
public class StatefulWindowedBoltExecutorTest {
StatefulWindowedBoltExecutor<KeyValueState<String, String>> executor;
IStatefulWindowedBolt<KeyValueState<String, String>> mockBolt;
OutputCollector mockOutputCollector;
TopologyContext mockTopologyContext;
Map<String, Object> mockStormConf = new HashMap<>();
@Before
public void setUp() throws Exception {
mockBolt = Mockito.mock(IStatefulWindowedBolt.class);
mockTopologyContext = Mockito.mock(TopologyContext.class);
mockOutputCollector = Mockito.mock(OutputCollector.class);
executor = new StatefulWindowedBoltExecutor<>(mockBolt);
}
@Test(expected = IllegalArgumentException.class)
public void testPrepare() throws Exception {
executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector);
}
@Test
public void testPrepareWithMsgid() throws Exception {
mockStormConf.put(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME, "msgid");
mockStormConf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, 5);
mockStormConf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, 5);
executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector);
}
@Test
public void testExecute() throws Exception {
mockStormConf.put(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME, "msgid");
mockStormConf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, 5);
mockStormConf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, 5);
KeyValueState<TaskStream, WindowState> mockState;
mockState = Mockito.mock(KeyValueState.class);
executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
executor.initState(null);
List<Tuple> tuples = getMockTuples(5);
for (Tuple tuple : tuples) {
executor.execute(tuple);
}
Mockito.verify(mockBolt, Mockito.times(1)).execute(getTupleWindow(tuples));
WindowState expectedState = new WindowState(Long.MIN_VALUE, 4);
Mockito.verify(mockState, Mockito.times(1)).put(Mockito.any(TaskStream.class), Mockito.eq(expectedState));
}
@Test
public void testRecovery() throws Exception {
mockStormConf.put(Config.TOPOLOGY_BOLTS_MESSAGE_ID_FIELD_NAME, "msgid");
mockStormConf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, 5);
mockStormConf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, 5);
KeyValueState<TaskStream, WindowState> mockState;
mockState = Mockito.mock(KeyValueState.class);
Map<GlobalStreamId, Grouping> mockMap = Mockito.mock(Map.class);
Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockMap);
Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
Mockito.when(mockMap.keySet()).thenReturn(Collections.singleton(new GlobalStreamId("a", "s")));
WindowState mockWindowState = new WindowState(4, 4);
Mockito.when(mockState.get(Mockito.any(TaskStream.class))).thenReturn(mockWindowState);
executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
executor.initState(null);
List<Tuple> tuples = getMockTuples(10);
for (Tuple tuple : tuples) {
executor.execute(tuple);
}
WindowState expectedState = new WindowState(4, 9);
Mockito.verify(mockState, Mockito.times(1)).put(Mockito.any(TaskStream.class), Mockito.eq(expectedState));
}
private TupleWindow getTupleWindow(List<Tuple> tuples) {
return new TupleWindowImpl(tuples, tuples, Collections.<Tuple>emptyList());
}
private List<Tuple> getMockTuples(int count) {
List<Tuple> mockTuples = new ArrayList<>();
for (long i = 0; i < count; i++) {
Tuple mockTuple = Mockito.mock(Tuple.class);
Mockito.when(mockTuple.getLongByField("msgid")).thenReturn(i);
Mockito.when(mockTuple.getSourceTask()).thenReturn(1);
Mockito.when(mockTuple.getSourceGlobalStreamId()).thenReturn(new GlobalStreamId("a", "s"));
mockTuples.add(mockTuple);
}
return mockTuples;
}
}