blob: de60823403430b5e0bbe583b5514e0bba80b3781 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.spout;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.StateFactory;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.apache.storm.spout.CheckPointState.Action;
import static org.apache.storm.spout.CheckPointState.State.COMMITTED;
import static org.junit.Assert.assertEquals;
/**
* Unit test for {@link CheckpointSpout}
*/
public class CheckpointSpoutTest {
CheckpointSpout spout = new CheckpointSpout();
TopologyContext mockTopologyContext;
SpoutOutputCollector mockOutputCollector;
@Before
public void setUp() throws Exception {
mockTopologyContext = Mockito.mock(TopologyContext.class);
Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
mockOutputCollector = Mockito.mock(SpoutOutputCollector.class);
}
@Test
public void testInitState() throws Exception {
spout.open(new HashMap(), mockTopologyContext, mockOutputCollector);
spout.nextTuple();
Values expectedTuple = new Values(-1L, Action.INITSTATE);
ArgumentCaptor<String> stream = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
ArgumentCaptor<Object> msgId = ArgumentCaptor.forClass(Object.class);
Mockito.verify(mockOutputCollector).emit(stream.capture(),
values.capture(),
msgId.capture());
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(-1L, msgId.getValue());
spout.ack(-1L);
Mockito.verify(mockOutputCollector).emit(stream.capture(),
values.capture(),
msgId.capture());
expectedTuple = new Values(-1L, Action.INITSTATE);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(-1L, msgId.getValue());
}
@Test
public void testPrepare() throws Exception {
spout.open(new HashMap(), mockTopologyContext, mockOutputCollector);
ArgumentCaptor<String> stream = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
ArgumentCaptor<Object> msgId = ArgumentCaptor.forClass(Object.class);
spout.nextTuple();
spout.ack(-1L);
spout.nextTuple();
Mockito.verify(mockOutputCollector, Mockito.times(2)).emit(stream.capture(),
values.capture(),
msgId.capture());
Values expectedTuple = new Values(0L, Action.PREPARE);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(0L, msgId.getValue());
}
@Test
public void testPrepareWithFail() throws Exception {
Map<String, Object> topoConf = new HashMap<>();
KeyValueState<String, CheckPointState> state =
(KeyValueState<String, CheckPointState>) StateFactory.getState("__state", topoConf, mockTopologyContext);
CheckPointState txState = new CheckPointState(-1, COMMITTED);
state.put("__state", txState);
spout.open(mockTopologyContext, mockOutputCollector, 0, state);
ArgumentCaptor<String> stream = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
ArgumentCaptor<Object> msgId = ArgumentCaptor.forClass(Object.class);
spout.nextTuple();
spout.ack(-1L);
Utils.sleep(10);
spout.nextTuple();
spout.ack(0L);
Utils.sleep(10);
spout.nextTuple();
spout.ack(0L);
Utils.sleep(10);
spout.nextTuple();
spout.fail(1L);
Utils.sleep(10);
spout.nextTuple();
spout.fail(1L);
Utils.sleep(10);
spout.nextTuple();
spout.ack(1L);
Utils.sleep(10);
spout.nextTuple();
spout.ack(0L);
Utils.sleep(10);
spout.nextTuple();
Mockito.verify(mockOutputCollector, Mockito.times(8)).emit(stream.capture(),
values.capture(),
msgId.capture());
Values expectedTuple = new Values(1L, Action.PREPARE);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(1L, msgId.getValue());
}
@Test
public void testCommit() throws Exception {
Map<String, Object> topoConf = new HashMap();
topoConf.put(Config.TOPOLOGY_STATE_CHECKPOINT_INTERVAL, 0);
spout.open(topoConf, mockTopologyContext, mockOutputCollector);
ArgumentCaptor<String> stream = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
ArgumentCaptor<Object> msgId = ArgumentCaptor.forClass(Object.class);
spout.nextTuple();
spout.ack(-1L);
spout.nextTuple();
spout.ack(0L);
Utils.sleep(10);
spout.nextTuple();
spout.fail(0L);
Utils.sleep(10);
spout.nextTuple();
Mockito.verify(mockOutputCollector, Mockito.times(4)).emit(stream.capture(),
values.capture(),
msgId.capture());
Values expectedTuple = new Values(0L, Action.COMMIT);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(0L, msgId.getValue());
}
@Test
public void testRecoveryRollback() throws Exception {
Map<String, Object> topoConf = new HashMap();
KeyValueState<String, CheckPointState> state =
(KeyValueState<String, CheckPointState>) StateFactory.getState("test-1", topoConf, mockTopologyContext);
CheckPointState checkPointState = new CheckPointState(100, CheckPointState.State.PREPARING);
state.put("__state", checkPointState);
spout.open(mockTopologyContext, mockOutputCollector, 0, state);
ArgumentCaptor<String> stream = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
ArgumentCaptor<Object> msgId = ArgumentCaptor.forClass(Object.class);
spout.nextTuple();
Mockito.verify(mockOutputCollector, Mockito.times(1)).emit(stream.capture(),
values.capture(),
msgId.capture());
Values expectedTuple = new Values(100L, Action.ROLLBACK);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(100L, msgId.getValue());
}
@Test
public void testRecoveryRollbackAck() throws Exception {
Map<String, Object> topoConf = new HashMap();
KeyValueState<String, CheckPointState> state =
(KeyValueState<String, CheckPointState>) StateFactory.getState("test-1", topoConf, mockTopologyContext);
CheckPointState checkPointState = new CheckPointState(100, CheckPointState.State.PREPARING);
state.put("__state", checkPointState);
spout.open(mockTopologyContext, mockOutputCollector, 0, state);
ArgumentCaptor<String> stream = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
ArgumentCaptor<Object> msgId = ArgumentCaptor.forClass(Object.class);
spout.nextTuple();
spout.ack(100L);
spout.nextTuple();
spout.ack(99L);
spout.nextTuple();
Mockito.verify(mockOutputCollector, Mockito.times(3)).emit(stream.capture(),
values.capture(),
msgId.capture());
Values expectedTuple = new Values(100L, Action.PREPARE);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(100L, msgId.getValue());
}
@Test
public void testRecoveryCommit() throws Exception {
Map<String, Object> topoConf = new HashMap();
KeyValueState<String, CheckPointState> state =
(KeyValueState<String, CheckPointState>) StateFactory.getState("test-1", topoConf, mockTopologyContext);
CheckPointState checkPointState = new CheckPointState(100, CheckPointState.State.COMMITTING);
state.put("__state", checkPointState);
spout.open(mockTopologyContext, mockOutputCollector, 0, state);
ArgumentCaptor<String> stream = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
ArgumentCaptor<Object> msgId = ArgumentCaptor.forClass(Object.class);
spout.nextTuple();
Mockito.verify(mockOutputCollector, Mockito.times(1)).emit(stream.capture(),
values.capture(),
msgId.capture());
Values expectedTuple = new Values(100L, Action.COMMIT);
assertEquals(CheckpointSpout.CHECKPOINT_STREAM_ID, stream.getValue());
assertEquals(expectedTuple, values.getValue());
assertEquals(100L, msgId.getValue());
}
}