/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;

import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

/**
* Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({FlinkKinesisConsumer.class, KinesisConfigUtil.class})
public class FlinkKinesisConsumerTest {

@Rule
public ExpectedException exception = ExpectedException.none();
// ----------------------------------------------------------------------
// Tests related to state initialization
// ----------------------------------------------------------------------
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
Properties config = TestUtils.getStandardProperties();
List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
new SequenceNumber("1")));
globalUnionState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(3)))),
new SequenceNumber("1")));
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Tuple2<StreamShardMetadata, SequenceNumber> state : globalUnionState) {
listState.add(state);
}
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
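// this consumer runs as subtask 0 of 2 parallel subtasks, so on snapshot it should only keep the shards assigned to it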
RuntimeContext context = mock(RuntimeContext.class);
when(context.getIndexOfThisSubtask()).thenReturn(0);
when(context.getNumberOfParallelSubtasks()).thenReturn(2);
consumer.setRuntimeContext(context);
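// hand the pre-populated union list state to the consumer through a mocked operator state store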
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
consumer.initializeState(initializationContext);
// only opened, not run
consumer.open(new Configuration());
// arbitrary checkpoint id and timestamp
consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));
assertTrue(listState.isClearCalled());
// the checkpointed list state should contain only the shards that it should subscribe to
assertEquals(globalUnionState.size() / 2, listState.getList().size());
assertTrue(listState.getList().contains(globalUnionState.get(0)));
assertTrue(listState.getList().contains(globalUnionState.get(2)));
}
@Test
public void testListStateChangedAfterSnapshotState() throws Exception {
// ----------------------------------------------------------------------
// setup config, initial state and expected state snapshot
// ----------------------------------------------------------------------
Properties config = TestUtils.getStandardProperties();
ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = new ArrayList<>(1);
initialState.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("1")));
ArrayList<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot = new ArrayList<>(3);
expectedStateSnapshot.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0)))),
new SequenceNumber("12")));
expectedStateSnapshot.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1)))),
new SequenceNumber("11")));
expectedStateSnapshot.add(Tuple2.of(
KinesisDataFetcher.convertToStreamShardMetadata(new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))),
new SequenceNumber("31")));
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
listState.add(state);
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
// ----------------------------------------------------------------------
// mock a running fetcher and its state for snapshot
// ----------------------------------------------------------------------
HashMap<StreamShardMetadata, SequenceNumber> stateSnapshot = new HashMap<>();
for (Tuple2<StreamShardMetadata, SequenceNumber> tuple : expectedStateSnapshot) {
stateSnapshot.put(tuple.f0, tuple.f1);
}
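// the mocked fetcher stands in for a running consumer; its snapshotState() yields the expected shard-to-sequence-number map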
KinesisDataFetcher mockedFetcher = mock(KinesisDataFetcher.class);
when(mockedFetcher.snapshotState()).thenReturn(stateSnapshot);
// ----------------------------------------------------------------------
// create a consumer and test the snapshotState()
// ----------------------------------------------------------------------
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
FlinkKinesisConsumer<?> mockedConsumer = spy(consumer);
RuntimeContext context = mock(RuntimeContext.class);
when(context.getIndexOfThisSubtask()).thenReturn(1);
mockedConsumer.setRuntimeContext(context);
mockedConsumer.initializeState(initializationContext);
mockedConsumer.open(new Configuration());
Whitebox.setInternalState(mockedConsumer, "fetcher", mockedFetcher); // mock consumer as running.
mockedConsumer.snapshotState(mock(FunctionSnapshotContext.class));
assertTrue(listState.isClearCalled());
assertEquals(3, listState.getList().size());
for (Tuple2<StreamShardMetadata, SequenceNumber> state : initialState) {
for (Tuple2<StreamShardMetadata, SequenceNumber> currentState : listState.getList()) {
assertNotEquals(state, currentState);
}
}
for (Tuple2<StreamShardMetadata, SequenceNumber> state : expectedStateSnapshot) {
boolean hasOneIsSame = false;
for (Tuple2<StreamShardMetadata, SequenceNumber> currentState : listState.getList()) {
hasOneIsSame = hasOneIsSame || state.equals(currentState);
}
assertTrue(hasOneIsSame);
}
}
// ----------------------------------------------------------------------
// Tests related to fetcher initialization
// ----------------------------------------------------------------------
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception {
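// initializeState() is intentionally never called, so the consumer must start the fetcher without any restored state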
KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
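// note: the last two constructor arguments are assumed to be the total number of consumer subtasks and the index of this subtask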
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
}
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
// ----------------------------------------------------------------------
// setup initial state
// ----------------------------------------------------------------------
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
// ----------------------------------------------------------------------
// mock fetcher
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws Exception {
// ----------------------------------------------------------------------
// setup initial state
// ----------------------------------------------------------------------
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("fakeStream1");
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredStateForOthers.entrySet()) {
listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
// ----------------------------------------------------------------------
// mock fetcher
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
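// shard discovery is stubbed to return only fakeStream1's shards, so none of fakeStream2's restored entries may be registered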
List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredStateForOthers.entrySet()) {
// should never get restored state not belonging to itself
Mockito.verify(mockedFetcher, never()).registerNewSubscribedShardState(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
// should get restored state belonging to itself
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
/*
 * This tests that the consumer correctly picks up shards that were not discovered on the previous run.
 *
 * Case under test:
 *
 * If the original parallelism is 2 and the states are:
 *   Consumer subtask 1:
 *     stream1, shard1, SequenceNumber(xxx)
 *   Consumer subtask 2:
 *     stream1, shard2, SequenceNumber(yyy)
 *
 * After discoverNewShardsToSubscribe(), if two new shards (shard3, shard4) were created:
 *   Consumer subtask 1 (late for discoverNewShardsToSubscribe()):
 *     stream1, shard1, SequenceNumber(xxx)
 *   Consumer subtask 2:
 *     stream1, shard2, SequenceNumber(yyy)
 *     stream1, shard4, SequenceNumber(zzz)
 *
 * If snapshotState() occurs and the parallelism is then changed to 1, the union state will be:
 *   stream1, shard1, SequenceNumber(xxx)
 *   stream1, shard2, SequenceNumber(yyy)
 *   stream1, shard4, SequenceNumber(zzz)
 * and the fetcher should be seeded with:
 *   stream1, shard1, SequenceNumber(xxx)
 *   stream1, shard2, SequenceNumber(yyy)
 *   stream1, shard3, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM
 *   stream1, shard4, SequenceNumber(zzz)
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard() throws Exception {
// ----------------------------------------------------------------------
// setup initial state
// ----------------------------------------------------------------------
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
// ----------------------------------------------------------------------
// mock fetcher
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
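// a shard that is not part of the restored state but is returned by shard discovery, i.e. a newly discovered shard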
shards.add(new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
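// the newly discovered shard is expected to be seeded with the EARLIEST sequence number sentinel; all restored shards keep their restored sequence numbers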
fakeRestoredState.put(new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
@Test
public void testLegacyKinesisStreamShardToStreamShardMetadataConversion() {
String streamName = "fakeStream1";
String shardId = "shard-000001";
String parentShardId = "shard-000002";
String adjacentParentShardId = "shard-000003";
String startingHashKey = "key-000001";
String endingHashKey = "key-000010";
String startingSequenceNumber = "seq-0000021";
String endingSequenceNumber = "seq-00000031";
StreamShardMetadata streamShardMetadata = new StreamShardMetadata();
streamShardMetadata.setStreamName(streamName);
streamShardMetadata.setShardId(shardId);
streamShardMetadata.setParentShardId(parentShardId);
streamShardMetadata.setAdjacentParentShardId(adjacentParentShardId);
streamShardMetadata.setStartingHashKey(startingHashKey);
streamShardMetadata.setEndingHashKey(endingHashKey);
streamShardMetadata.setStartingSequenceNumber(startingSequenceNumber);
streamShardMetadata.setEndingSequenceNumber(endingSequenceNumber);
Shard shard = new Shard()
.withShardId(shardId)
.withParentShardId(parentShardId)
.withAdjacentParentShardId(adjacentParentShardId)
.withHashKeyRange(new HashKeyRange()
.withStartingHashKey(startingHashKey)
.withEndingHashKey(endingHashKey))
.withSequenceNumberRange(new SequenceNumberRange()
.withStartingSequenceNumber(startingSequenceNumber)
.withEndingSequenceNumber(endingSequenceNumber));
KinesisStreamShard kinesisStreamShard = new KinesisStreamShard(streamName, shard);
assertEquals(streamShardMetadata, KinesisStreamShard.convertToStreamShardMetadata(kinesisStreamShard));
}
@Test
public void testStreamShardMetadataSerializedUsingPojoSerializer() {
TypeInformation<StreamShardMetadata> typeInformation = TypeInformation.of(StreamShardMetadata.class);
assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
}
/**
 * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
 * {@link StreamShardMetadata#streamName} does not prevent the shard from being restored.
 *
 * <p>This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
 * job restore the shard has been closed (ending sequence number set) due to re-sharding, so we can no longer rely on
 * {@link StreamShardMetadata#equals(Object)} to look up the sequence number in the collection of restored shard metadata.
 *
 * <p>Therefore, the snapshotted metadata is synchronized with the Kinesis shard before attempting to look up
 * the sequence number to restore from.
 */
@Test
public void testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheStateWasStored() throws Exception {
// ----------------------------------------------------------------------
// setup initial state
// ----------------------------------------------------------------------
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
// ----------------------------------------------------------------------
// mock fetcher
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class);
List<StreamShardHandle> shards = new ArrayList<>();
// create a fake stream shard handle based on the first entry in the restored state
final StreamShardHandle originalStreamShardHandle = fakeRestoredState.keySet().iterator().next();
final StreamShardHandle closedStreamShardHandle = new StreamShardHandle(originalStreamShardHandle.getStreamName(), originalStreamShardHandle.getShard());
// close the shard handle by setting an ending sequence number
final SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
sequenceNumberRange.setEndingSequenceNumber("1293844");
closedStreamShardHandle.getShard().setSequenceNumberRange(sequenceNumberRange);
shards.add(closedStreamShardHandle);
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(closedStreamShardHandle),
closedStreamShardHandle, fakeRestoredState.get(closedStreamShardHandle)));
}
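
// Illustrative sketch only (hypothetical helper, not used by any test): the restore lookup exercised by the
// FLINK-8484 test above effectively matches shards on stream name and shard id alone, ignoring fields such as
// the ending sequence number that may change after re-sharding. This assumes the StreamShardMetadata POJO
// exposes getters matching the setters used in the conversion test above.
private static boolean refersToSameShard(StreamShardMetadata a, StreamShardMetadata b) {
	return a.getStreamName().equals(b.getStreamName())
		&& a.getShardId().equals(b.getShardId());
}

/**
 * A {@link ListState} stub backed by an {@link ArrayList} that records whether clear()
 * has been called, so tests can assert on the consumer's snapshot behavior.
 */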
private static final class TestingListState<T> implements ListState<T> {
private final List<T> list = new ArrayList<>();
private boolean clearCalled = false;
@Override
public void clear() {
list.clear();
clearCalled = true;
}
@Override
public Iterable<T> get() throws Exception {
return list;
}
@Override
public void add(T value) throws Exception {
list.add(value);
}
public List<T> getList() {
return list;
}
public boolean isClearCalled() {
return clearCalled;
}
@Override
public void update(List<T> values) throws Exception {
list.clear();
addAll(values);
}
@Override
public void addAll(List<T> values) throws Exception {
if (values != null) {
list.addAll(values);
}
}
}
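
/**
 * Builds a fake restored shard state for the given stream name: "fakeStream1" (3 shards),
 * "fakeStream2" (2 shards), or "all" (both), each shard paired with a random sequence number.
 */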
private HashMap<StreamShardHandle, SequenceNumber> getFakeRestoredStore(String streamName) {
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = new HashMap<>();
if (streamName.equals("fakeStream1") || streamName.equals("all")) {
fakeRestoredState.put(
new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
new SequenceNumber(UUID.randomUUID().toString()));
fakeRestoredState.put(
new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
new SequenceNumber(UUID.randomUUID().toString()));
fakeRestoredState.put(
new StreamShardHandle("fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
new SequenceNumber(UUID.randomUUID().toString()));
}
if (streamName.equals("fakeStream2") || streamName.equals("all")) {
fakeRestoredState.put(
new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
new SequenceNumber(UUID.randomUUID().toString()));
fakeRestoredState.put(
new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
new SequenceNumber(UUID.randomUUID().toString()));
}
return fakeRestoredState;
}
/**
 * A non-serializable {@link KinesisDeserializationSchema} (it is a non-static inner class and therefore
 * holds a reference to the enclosing test class, which is not serializable), used for testing.
 */
private final class NonSerializableDeserializationSchema implements KinesisDeserializationSchema<String> {
@Override
public String deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException {
return new String(recordValue);
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
/**
* A static, serializable {@link KinesisDeserializationSchema}.
*/
private static final class SerializableDeserializationSchema implements KinesisDeserializationSchema<String> {
@Override
public String deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId) throws IOException {
return new String(recordValue);
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
}