blob: 0484fea7133847fa2ecd5d2780dd25f577d92acc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.topology;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.spout.CheckpointSpout;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.apache.storm.spout.CheckPointState.Action.COMMIT;
import static org.apache.storm.spout.CheckPointState.Action.INITSTATE;
import static org.apache.storm.spout.CheckPointState.Action.PREPARE;
import static org.apache.storm.spout.CheckPointState.Action.ROLLBACK;
import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_ACTION;
import static org.apache.storm.spout.CheckpointSpout.CHECKPOINT_FIELD_TXID;
import static org.mockito.Mockito.mock;
/**
* Unit tests for {@link StatefulBoltExecutor}
*/
public class StatefulBoltExecutorTest {
private StatefulBoltExecutor<KeyValueState<String, String>> executor;
private IStatefulBolt<KeyValueState<String, String>> mockBolt;
private TopologyContext mockTopologyContext;
private Tuple mockTuple;
private Tuple mockCheckpointTuple;
private Map<String, Object> mockStormConf = new HashMap<>();
private OutputCollector mockOutputCollector;
private KeyValueState<String, String> mockState;
@Before
public void setUp() throws Exception {
mockBolt = Mockito.mock(IStatefulBolt.class);
executor = new StatefulBoltExecutor<>(mockBolt);
mockTopologyContext = Mockito.mock(TopologyContext.class);
mockOutputCollector = Mockito.mock(OutputCollector.class);
mockState = Mockito.mock(KeyValueState.class);
Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
GlobalStreamId globalStreamId = new GlobalStreamId("test", CheckpointSpout.CHECKPOINT_STREAM_ID);
Map<GlobalStreamId, Grouping> thisSources = Collections.singletonMap(globalStreamId, mock(Grouping.class));
Mockito.when(mockTopologyContext.getThisSources()).thenReturn(thisSources);
Mockito.when(mockTopologyContext.getComponentTasks(Mockito.any())).thenReturn(Collections.singletonList(1));
mockTuple = Mockito.mock(Tuple.class);
mockCheckpointTuple = Mockito.mock(Tuple.class);
executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
}
@Test
public void testHandleTupleBeforeInit() throws Exception {
Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
executor.execute(mockTuple);
Mockito.verify(mockBolt, Mockito.times(0)).execute(Mockito.any(Tuple.class));
}
@Test
public void testHandleTuple() throws Exception {
Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
executor.execute(mockTuple);
Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(INITSTATE);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
executor.execute(mockCheckpointTuple);
Mockito.verify(mockBolt, Mockito.times(1)).execute(mockTuple);
Mockito.verify(mockBolt, Mockito.times(1)).initState(Mockito.any(KeyValueState.class));
}
@Test
public void testRollback() throws Exception {
Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
executor.execute(mockTuple);
Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(ROLLBACK);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
executor.execute(mockCheckpointTuple);
Mockito.verify(mockState, Mockito.times(1)).rollback();
}
@Test
public void testCommit() throws Exception {
Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
executor.execute(mockTuple);
Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
executor.execute(mockCheckpointTuple);
Mockito.verify(mockBolt, Mockito.times(1)).preCommit(new Long(0));
Mockito.verify(mockState, Mockito.times(1)).commit(new Long(0));
}
@Test
public void testPrepareAndRollbackBeforeInitstate() throws Exception {
Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
executor.execute(mockTuple);
Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(PREPARE);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
executor.execute(mockCheckpointTuple);
Mockito.verify(mockOutputCollector, Mockito.times(1)).fail(mockCheckpointTuple);
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(ROLLBACK);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
executor.execute(mockCheckpointTuple);
Mockito.verify(mockState, Mockito.times(1)).rollback();
}
@Test
public void testCommitBeforeInitstate() throws Exception {
Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
executor.execute(mockCheckpointTuple);
Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockCheckpointTuple);
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(ROLLBACK);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
executor.execute(mockCheckpointTuple);
Mockito.verify(mockState, Mockito.times(1)).rollback();
}
@Test
public void testPrepareAndCommit() throws Exception {
Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(INITSTATE);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
executor.execute(mockCheckpointTuple);
executor.execute(mockTuple);
Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(PREPARE);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
executor.execute(mockCheckpointTuple);
executor.execute(mockTuple);
Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
executor.execute(mockCheckpointTuple);
mockOutputCollector.ack(mockTuple);
Mockito.verify(mockState, Mockito.times(1)).commit(new Long(100));
Mockito.verify(mockBolt, Mockito.times(2)).execute(mockTuple);
Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple);
}
}