| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.connectors.kinesis; |
| |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.functions.RuntimeContext; |
| import org.apache.flink.api.common.serialization.DeserializationSchema; |
| import org.apache.flink.api.common.serialization.SimpleStringSchema; |
| import org.apache.flink.api.common.state.ListState; |
| import org.apache.flink.api.common.state.ListStateDescriptor; |
| import org.apache.flink.api.common.state.OperatorStateStore; |
| import org.apache.flink.api.common.time.Deadline; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.api.java.typeutils.TupleTypeInfo; |
| import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.mock.Whitebox; |
| import org.apache.flink.runtime.state.FunctionSnapshotContext; |
| import org.apache.flink.runtime.state.StateInitializationContext; |
| import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; |
| import org.apache.flink.streaming.api.TimeCharacteristic; |
| import org.apache.flink.streaming.api.functions.source.SourceFunction; |
| import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; |
| import org.apache.flink.streaming.api.operators.StreamSource; |
| import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext; |
| import org.apache.flink.streaming.api.watermark.Watermark; |
| import org.apache.flink.streaming.api.windowing.time.Time; |
| import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; |
| import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; |
| import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; |
| import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; |
| import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; |
| import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; |
| import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; |
| import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; |
| import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; |
| import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; |
| import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory; |
| import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator; |
| import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; |
| import org.apache.flink.streaming.connectors.kinesis.testutils.TestableFlinkKinesisConsumer; |
| import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; |
| import org.apache.flink.streaming.connectors.kinesis.util.KinesisStateUtil; |
| import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter; |
| import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; |
| import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; |
| import org.apache.flink.streaming.util.CollectingSourceContext; |
| import org.apache.flink.streaming.util.MockStreamingRuntimeContext; |
| import org.apache.flink.types.PojoTestUtils; |
| import org.apache.flink.util.InstantiationUtil; |
| import org.apache.flink.util.TestLogger; |
| |
| import com.amazonaws.services.kinesis.model.HashKeyRange; |
| import com.amazonaws.services.kinesis.model.SequenceNumberRange; |
| import com.amazonaws.services.kinesis.model.Shard; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.mockito.MockedStatic; |
| import org.mockito.Mockito; |
| import org.powermock.api.mockito.PowerMockito; |
| import org.powermock.core.classloader.annotations.PrepareForTest; |
| import org.powermock.modules.junit4.PowerMockRunner; |
| |
| import java.io.Serializable; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.UUID; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Supplier; |
| |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.mockStatic; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| /** Suite of FlinkKinesisConsumer tests for the methods called throughout the source life cycle. */ |
| @RunWith(PowerMockRunner.class) |
| @PrepareForTest(FlinkKinesisConsumer.class) |
| public class FlinkKinesisConsumerTest extends TestLogger { |
| |
| // ---------------------------------------------------------------------- |
| // Tests related to state initialization |
| // ---------------------------------------------------------------------- |
| |
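| /** |
| * Verifies that when the fetcher has not been started yet, a snapshot simply re-checkpoints |
| * the restored state, keeping only the shards assigned to this subtask. |
| */ |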
| @Test |
| public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception { |
| Properties config = TestUtils.getStandardProperties(); |
| |
| List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = |
| Arrays.asList( |
| createShardState("fakeStream", 0, "1"), |
| createShardState("fakeStream", 1, "1"), |
| createShardState("fakeStream", 2, "1"), |
| createShardState("fakeStream", 3, "1")); |
| |
| TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = |
| new TestingListState<>(); |
| listState.addAll(globalUnionState); |
| |
| FlinkKinesisConsumer<String> consumer = |
| new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); |
| RuntimeContext context = new MockStreamingRuntimeContext(true, 2, 0); |
| consumer.setRuntimeContext(context); |
| |
| OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); |
| when(operatorStateStore.getUnionListState(any(ListStateDescriptor.class))) |
| .thenReturn(listState); |
| |
| StateInitializationContext initializationContext = mock(StateInitializationContext.class); |
| when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); |
| when(initializationContext.isRestored()).thenReturn(true); |
| |
| consumer.initializeState(initializationContext); |
| |
| // only opened, not run |
| consumer.open(new Configuration()); |
| |
| // arbitrary checkpoint id and timestamp |
| consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123)); |
| |
| assertThat(listState.isClearCalled()).isTrue(); |
| |
| // the checkpointed list state should contain only the shards that it should subscribe to |
| assertThat(listState.getList()).hasSize(globalUnionState.size() / 2); |
| assertThat(listState.getList()).contains(globalUnionState.get(0)); |
| assertThat(listState.getList()).contains(globalUnionState.get(2)); |
| } |
| |
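| /** |
| * Verifies that snapshotState() clears the previously checkpointed list state and replaces it |
| * with the running fetcher's current shard-to-sequence-number mapping. |
| */ |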
| @Test |
| public void testListStateChangedAfterSnapshotState() throws Exception { |
| |
| // ---------------------------------------------------------------------- |
| // setup config, initial state and expected state snapshot |
| // ---------------------------------------------------------------------- |
| List<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = |
| Collections.singletonList(createShardState("fakeStream", 0, "1")); |
| |
| List<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot = |
| Arrays.asList( |
| createShardState("fakeStream", 0, "12"), |
| createShardState("fakeStream", 1, "11"), |
| createShardState("fakeStream", 2, "31")); |
| |
| // ---------------------------------------------------------------------- |
| // mock operator state backend and initial state for initializeState() |
| // ---------------------------------------------------------------------- |
| |
| TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = |
| new TestingListState<>(); |
| listState.addAll(initialState); |
| |
| // ---------------------------------------------------------------------- |
| // mock a running fetcher and its state for snapshot |
| // ---------------------------------------------------------------------- |
| |
| HashMap<StreamShardMetadata, SequenceNumber> stateSnapshot = new HashMap<>(); |
| for (Tuple2<StreamShardMetadata, SequenceNumber> tuple : expectedStateSnapshot) { |
| stateSnapshot.put(tuple.f0, tuple.f1); |
| } |
| |
| KinesisDataFetcher<String> mockedFetcher = mock(KinesisDataFetcher.class); |
| when(mockedFetcher.snapshotState()).thenReturn(stateSnapshot); |
| |
| // ---------------------------------------------------------------------- |
| // create a consumer and test the snapshotState() |
| // ---------------------------------------------------------------------- |
| |
| FlinkKinesisConsumer<String> mockedConsumer = |
| prepareMockedConsumer( |
| "fakeStream", new SimpleStringSchema(), mockedFetcher, listState); |
| |
| mockedConsumer.snapshotState(mock(FunctionSnapshotContext.class)); |
| |
| // ---------------------------------------------------------------------- |
| // verify that state had been updated |
| // ---------------------------------------------------------------------- |
| assertThat(listState.clearCalled).isTrue(); |
| assertThat(listState.getList()) |
| .hasSize(3) |
| .doesNotContainAnyElementsOf(initialState) |
| .containsAll(expectedStateSnapshot); |
| } |
| |
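| /** |
| * Verifies that a snapshot taken after cancel() still queries the fetcher and updates the |
| * checkpointed list state. |
| */ |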
| @Test |
| public void testSnapshotStateChangesAfterCancel() throws Exception { |
| |
| // ---------------------------------------------------------------------- |
| // setup config, initial state and expected state snapshot |
| // ---------------------------------------------------------------------- |
| List<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = |
| Collections.singletonList(createShardState("fakeStream", 0, "11")); |
| |
| List<Tuple2<StreamShardMetadata, SequenceNumber>> expectedStateSnapshot = |
| Collections.singletonList(createShardState("fakeStream", 0, "12")); |
| |
| // ---------------------------------------------------------------------- |
| // mock operator state backend and initial state for initializeState() |
| // ---------------------------------------------------------------------- |
| |
| TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = |
| new TestingListState<>(); |
| listState.addAll(initialState); |
| |
| // ---------------------------------------------------------------------- |
| // mock a running fetcher and its state for snapshot |
| // ---------------------------------------------------------------------- |
| |
| HashMap<StreamShardMetadata, SequenceNumber> stateSnapshot = new HashMap<>(); |
| for (Tuple2<StreamShardMetadata, SequenceNumber> tuple : expectedStateSnapshot) { |
| stateSnapshot.put(tuple.f0, tuple.f1); |
| } |
| |
| KinesisDataFetcher<String> mockedFetcher = mock(KinesisDataFetcher.class); |
| when(mockedFetcher.snapshotState()).thenReturn(stateSnapshot); |
| |
| // ---------------------------------------------------------------------- |
| // create a consumer and test the snapshotState() |
| // ---------------------------------------------------------------------- |
| |
| FlinkKinesisConsumer<String> mockedConsumer = |
| prepareMockedConsumer( |
| "fakeStream", new SimpleStringSchema(), mockedFetcher, listState); |
| |
| mockedConsumer.cancel(); |
| mockedConsumer.snapshotState(new MockFunctionSnapshotContext(2)); |
| verify(mockedFetcher, times(1)).snapshotState(); |
| |
| // ---------------------------------------------------------------------- |
| // verify that state had been updated |
| // ---------------------------------------------------------------------- |
| assertThat(listState.isClearCalled()).isTrue(); |
| assertThat(listState.getList()) |
| .hasSize(1) |
| .doesNotContainAnyElementsOf(initialState) |
| .containsAll(expectedStateSnapshot); |
| } |
| |
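| /** |
| * Verifies that a snapshot taken after close() does not touch the fetcher and leaves the |
| * previously checkpointed list state unchanged. |
| */ |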
| @Test |
| public void testSnapshotStateNotChangedAfterClose() throws Exception { |
| |
| // ---------------------------------------------------------------------- |
| // setup initial state |
| // ---------------------------------------------------------------------- |
| |
| List<Tuple2<StreamShardMetadata, SequenceNumber>> initialState = |
| Collections.singletonList(createShardState("fakeStream", 0, "11")); |
| |
| // ---------------------------------------------------------------------- |
| // mock initial state |
| // ---------------------------------------------------------------------- |
| |
| TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = |
| new TestingListState<>(); |
| listState.addAll(initialState); |
| |
| // ---------------------------------------------------------------------- |
| // mock a running fetcher and its state for snapshot |
| // ---------------------------------------------------------------------- |
| |
| KinesisDataFetcher<String> mockedFetcher = mock(KinesisDataFetcher.class); |
| when(mockedFetcher.snapshotState()).thenReturn(new HashMap<>()); |
| |
| // ---------------------------------------------------------------------- |
| // create a consumer and test the snapshotState() |
| // ---------------------------------------------------------------------- |
| |
| FlinkKinesisConsumer<String> mockedConsumer = |
| prepareMockedConsumer( |
| "fakeStream", new SimpleStringSchema(), mockedFetcher, listState); |
| |
| mockedConsumer.close(); |
| mockedConsumer.snapshotState(new MockFunctionSnapshotContext(3)); |
| verify(mockedFetcher, never()).snapshotState(); |
| assertThat(listState.isClearCalled()).isFalse(); |
| assertThat(listState.getList()).containsAll(initialState); |
| } |
| |
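| /** |
| * Creates a spied consumer that has been initialized from the given list state, opened, and |
| * then had the given fetcher injected so that it behaves as if it were running. |
| */ |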
| private <T> FlinkKinesisConsumer<T> prepareMockedConsumer( |
| String streamName, |
| DeserializationSchema<T> schema, |
| KinesisDataFetcher<T> fetcher, |
| ListState<?> listState) |
| throws Exception { |
| |
| Properties config = TestUtils.getStandardProperties(); |
| |
| OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); |
| when(operatorStateStore.getUnionListState(any(ListStateDescriptor.class))) |
| .thenReturn(listState); |
| |
| StateInitializationContext initializationContext = mock(StateInitializationContext.class); |
| when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); |
| when(initializationContext.isRestored()).thenReturn(true); |
| |
| FlinkKinesisConsumer<T> consumer = new FlinkKinesisConsumer<>(streamName, schema, config); |
| FlinkKinesisConsumer<T> mockedConsumer = spy(consumer); |
| |
| RuntimeContext context = new MockStreamingRuntimeContext(true, 1, 0); |
| |
| mockedConsumer.setRuntimeContext(context); |
| mockedConsumer.initializeState(initializationContext); |
| mockedConsumer.open(new Configuration()); |
| Whitebox.setInternalState(mockedConsumer, "fetcher", fetcher); // mock consumer as running. |
| |
| return mockedConsumer; |
| } |
| |
| /** |
| * Before using an explicit TypeSerializer for the state, the {@link FlinkKinesisConsumer} |
| * created a serializer implicitly using a {@link TypeInformation}. Since FLINK-24943, the |
| * serializer is created explicitly. Here, we verify that the previous approach is compatible |
| * with the new one. |
| */ |
| @Test |
| public void testExplicitStateSerializerCompatibility() throws Exception { |
| ExecutionConfig executionConfig = new ExecutionConfig(); |
| |
| Tuple2<StreamShardMetadata, SequenceNumber> tuple = createShardState("fakeStream", 0, "1"); |
| |
| // This is how the serializer used to be created implicitly using a TypeInformation; |
| // since SequenceNumber is a generic type, Flink falls back to Kryo |
| TypeInformation<Tuple2<StreamShardMetadata, SequenceNumber>> originalShardsStateTypeInfo = |
| new TupleTypeInfo<>( |
| TypeInformation.of(StreamShardMetadata.class), |
| TypeInformation.of(SequenceNumber.class)); |
| TypeSerializer<Tuple2<StreamShardMetadata, SequenceNumber>> serializerFromTypeInfo = |
| originalShardsStateTypeInfo.createSerializer(executionConfig); |
| byte[] bytes = InstantiationUtil.serializeToByteArray(serializerFromTypeInfo, tuple); |
| |
| // This is how the serializer is now created explicitly, still backed by Kryo |
| TupleSerializer<Tuple2<StreamShardMetadata, SequenceNumber>> serializerFromKryo = |
| KinesisStateUtil.createShardsStateSerializer(executionConfig); |
| |
| Tuple2<StreamShardMetadata, SequenceNumber> actualTuple = |
| InstantiationUtil.deserializeFromByteArray(serializerFromKryo, bytes); |
| |
| // Both serializers must be compatible: the round trip should yield the original tuple |
| assertThat(tuple) |
| .overridingErrorMessage( |
| "Explicit serializer is not compatible with " |
| + "implicit method of creating serializer using TypeInformation.") |
| .isEqualTo(actualTuple); |
| } |
| |
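| /** Creates a (shard metadata, sequence number) tuple for the given stream and shard order. */ |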
| private Tuple2<StreamShardMetadata, SequenceNumber> createShardState( |
| String streamName, int shardNumber, String sequenceNumber) { |
| return Tuple2.of( |
| KinesisDataFetcher.convertToStreamShardMetadata( |
| new StreamShardHandle( |
| streamName, |
| new Shard() |
| .withShardId( |
| KinesisShardIdGenerator.generateFromShardOrder( |
| shardNumber)))), |
| new SequenceNumber(sequenceNumber)); |
| } |
| |
| // ---------------------------------------------------------------------- |
| // Tests related to fetcher initialization |
| // ---------------------------------------------------------------------- |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() |
| throws Exception { |
| KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher(); |
| |
| // assume the given config is correct |
| try (MockedStatic<KinesisConfigUtil> kcu = mockStatic(KinesisConfigUtil.class)) { |
| TestableFlinkKinesisConsumer consumer = |
| new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2); |
| consumer.open(new Configuration()); |
| consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); |
| } |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception { |
| |
| // ---------------------------------------------------------------------- |
| // setup initial state |
| // ---------------------------------------------------------------------- |
| |
| HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all"); |
| |
| // ---------------------------------------------------------------------- |
| // mock operator state backend and initial state for initializeState() |
| // ---------------------------------------------------------------------- |
| |
| TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = |
| new TestingListState<>(); |
| for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) { |
| listState.add( |
| Tuple2.of( |
| KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), |
| state.getValue())); |
| } |
| |
| OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); |
| when(operatorStateStore.getUnionListState(any(ListStateDescriptor.class))) |
| .thenReturn(listState); |
| |
| StateInitializationContext initializationContext = mock(StateInitializationContext.class); |
| when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); |
| when(initializationContext.isRestored()).thenReturn(true); |
| |
| // ---------------------------------------------------------------------- |
| // mock fetcher |
| // ---------------------------------------------------------------------- |
| |
| KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher(); |
| List<StreamShardHandle> shards = new ArrayList<>(); |
| shards.addAll(fakeRestoredState.keySet()); |
| when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards); |
| |
| // assume the given config is correct |
| try (MockedStatic<KinesisConfigUtil> kcu = mockStatic(KinesisConfigUtil.class)) { |
| |
| // ---------------------------------------------------------------------- |
| // start to test fetcher's initial state seeding |
| // ---------------------------------------------------------------------- |
| |
| TestableFlinkKinesisConsumer consumer = |
| new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2); |
| consumer.initializeState(initializationContext); |
| consumer.open(new Configuration()); |
| consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); |
| |
| for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : |
| fakeRestoredState.entrySet()) { |
| Mockito.verify(mockedFetcher) |
| .registerNewSubscribedShardState( |
| new KinesisStreamShardState( |
| KinesisDataFetcher.convertToStreamShardMetadata( |
| restoredShard.getKey()), |
| restoredShard.getKey(), |
| restoredShard.getValue())); |
| } |
| } |
| } |
| |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws Exception { |
| |
| // ---------------------------------------------------------------------- |
| // setup initial state |
| // ---------------------------------------------------------------------- |
| |
| HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = |
| getFakeRestoredStore("fakeStream1"); |
| |
| HashMap<StreamShardHandle, SequenceNumber> fakeRestoredStateForOthers = |
| getFakeRestoredStore("fakeStream2"); |
| |
| // ---------------------------------------------------------------------- |
| // mock operator state backend and initial state for initializeState() |
| // ---------------------------------------------------------------------- |
| |
| TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = |
| new TestingListState<>(); |
| for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) { |
| listState.add( |
| Tuple2.of( |
| KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), |
| state.getValue())); |
| } |
| for (Map.Entry<StreamShardHandle, SequenceNumber> state : |
| fakeRestoredStateForOthers.entrySet()) { |
| listState.add( |
| Tuple2.of( |
| KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), |
| state.getValue())); |
| } |
| |
| OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); |
| when(operatorStateStore.getUnionListState(any(ListStateDescriptor.class))) |
| .thenReturn(listState); |
| |
| StateInitializationContext initializationContext = mock(StateInitializationContext.class); |
| when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); |
| when(initializationContext.isRestored()).thenReturn(true); |
| |
| // ---------------------------------------------------------------------- |
| // mock fetcher |
| // ---------------------------------------------------------------------- |
| |
| KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher(); |
| List<StreamShardHandle> shards = new ArrayList<>(); |
| shards.addAll(fakeRestoredState.keySet()); |
| when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards); |
| |
| // assume the given config is correct |
| try (MockedStatic<KinesisConfigUtil> kcu = mockStatic(KinesisConfigUtil.class)) { |
| |
| // ---------------------------------------------------------------------- |
| // start to test fetcher's initial state seeding |
| // ---------------------------------------------------------------------- |
| |
| TestableFlinkKinesisConsumer consumer = |
| new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2); |
| consumer.initializeState(initializationContext); |
| consumer.open(new Configuration()); |
| consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); |
| |
| for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : |
| fakeRestoredStateForOthers.entrySet()) { |
| // must not be seeded with restored state that belongs to other subtasks |
| Mockito.verify(mockedFetcher, never()) |
| .registerNewSubscribedShardState( |
| new KinesisStreamShardState( |
| KinesisDataFetcher.convertToStreamShardMetadata( |
| restoredShard.getKey()), |
| restoredShard.getKey(), |
| restoredShard.getValue())); |
| } |
| for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : |
| fakeRestoredState.entrySet()) { |
| // should get restored state belonging to itself |
| Mockito.verify(mockedFetcher) |
| .registerNewSubscribedShardState( |
| new KinesisStreamShardState( |
| KinesisDataFetcher.convertToStreamShardMetadata( |
| restoredShard.getKey()), |
| restoredShard.getKey(), |
| restoredShard.getValue())); |
| } |
| } |
| } |
| |
| /* |
| * This tests that the consumer correctly picks up shards that were not discovered on the previous run. |
| * |
| * Case under test: |
| * |
| * If the original parallelism is 2 and the states are: |
| * Consumer subtask 1: |
| * stream1, shard1, SequenceNumber(xxx) |
| * Consumer subtask 2: |
| * stream1, shard2, SequenceNumber(yyy) |
| * |
| * After discoverNewShardsToSubscribe(), if two new shards (shard3, shard4) were created: |
| * Consumer subtask 1 (late for discoverNewShardsToSubscribe()): |
| * stream1, shard1, SequenceNumber(xxx) |
| * Consumer subtask 2: |
| * stream1, shard2, SequenceNumber(yyy) |
| * stream1, shard4, SequenceNumber(zzz) |
| * |
| * If snapshotState() occurs and the parallelism is changed to 1: |
| * The union state will be: |
| * stream1, shard1, SequenceNumber(xxx) |
| * stream1, shard2, SequenceNumber(yyy) |
| * stream1, shard4, SequenceNumber(zzz) |
| * The fetcher should be seeded with: |
| * stream1, shard1, SequenceNumber(xxx) |
| * stream1, shard2, SequenceNumber(yyy) |
| * stream1, shard3, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM |
| * stream1, shard4, SequenceNumber(zzz) |
| */ |
| @Test |
| @SuppressWarnings("unchecked") |
| public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard() |
| throws Exception { |
| |
| // ---------------------------------------------------------------------- |
| // setup initial state |
| // ---------------------------------------------------------------------- |
| |
| HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all"); |
| |
| // ---------------------------------------------------------------------- |
| // mock operator state backend and initial state for initializeState() |
| // ---------------------------------------------------------------------- |
| |
| TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = |
| new TestingListState<>(); |
| for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) { |
| listState.add( |
| Tuple2.of( |
| KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), |
| state.getValue())); |
| } |
| |
| OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); |
| when(operatorStateStore.getUnionListState(any(ListStateDescriptor.class))) |
| .thenReturn(listState); |
| |
| StateInitializationContext initializationContext = mock(StateInitializationContext.class); |
| when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); |
| when(initializationContext.isRestored()).thenReturn(true); |
| |
| // ---------------------------------------------------------------------- |
| // mock fetcher |
| // ---------------------------------------------------------------------- |
| |
| KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher(); |
| List<StreamShardHandle> shards = new ArrayList<>(); |
| shards.addAll(fakeRestoredState.keySet()); |
| shards.add( |
| new StreamShardHandle( |
| "fakeStream2", |
| new Shard() |
| .withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)))); |
| when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards); |
| |
| // assume the given config is correct |
| try (MockedStatic<KinesisConfigUtil> kcu = mockStatic(KinesisConfigUtil.class)) { |
| |
| // ---------------------------------------------------------------------- |
| // start to test fetcher's initial state seeding |
| // ---------------------------------------------------------------------- |
| |
| TestableFlinkKinesisConsumer consumer = |
| new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2); |
| consumer.initializeState(initializationContext); |
| consumer.open(new Configuration()); |
| consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); |
| |
| fakeRestoredState.put( |
| new StreamShardHandle( |
| "fakeStream2", |
| new Shard() |
| .withShardId( |
| KinesisShardIdGenerator.generateFromShardOrder(2))), |
| SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()); |
| for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : |
| fakeRestoredState.entrySet()) { |
| Mockito.verify(mockedFetcher) |
| .registerNewSubscribedShardState( |
| new KinesisStreamShardState( |
| KinesisDataFetcher.convertToStreamShardMetadata( |
| restoredShard.getKey()), |
| restoredShard.getKey(), |
| restoredShard.getValue())); |
| } |
| } |
| } |
| |
| @Test |
| public void testLegacyKinesisStreamShardToStreamShardMetadataConversion() { |
| String streamName = "fakeStream1"; |
| String shardId = "shard-000001"; |
| String parentShardId = "shard-000002"; |
| String adjacentParentShardId = "shard-000003"; |
| String startingHashKey = "key-000001"; |
| String endingHashKey = "key-000010"; |
| String startingSequenceNumber = "seq-0000021"; |
| String endingSequenceNumber = "seq-00000031"; |
| |
| StreamShardMetadata streamShardMetadata = new StreamShardMetadata(); |
| streamShardMetadata.setStreamName(streamName); |
| streamShardMetadata.setShardId(shardId); |
| streamShardMetadata.setParentShardId(parentShardId); |
| streamShardMetadata.setAdjacentParentShardId(adjacentParentShardId); |
| streamShardMetadata.setStartingHashKey(startingHashKey); |
| streamShardMetadata.setEndingHashKey(endingHashKey); |
| streamShardMetadata.setStartingSequenceNumber(startingSequenceNumber); |
| streamShardMetadata.setEndingSequenceNumber(endingSequenceNumber); |
| |
| Shard shard = |
| new Shard() |
| .withShardId(shardId) |
| .withParentShardId(parentShardId) |
| .withAdjacentParentShardId(adjacentParentShardId) |
| .withHashKeyRange( |
| new HashKeyRange() |
| .withStartingHashKey(startingHashKey) |
| .withEndingHashKey(endingHashKey)) |
| .withSequenceNumberRange( |
| new SequenceNumberRange() |
| .withStartingSequenceNumber(startingSequenceNumber) |
| .withEndingSequenceNumber(endingSequenceNumber)); |
| KinesisStreamShard kinesisStreamShard = new KinesisStreamShard(streamName, shard); |
| |
| assertThat(KinesisStreamShard.convertToStreamShardMetadata(kinesisStreamShard)) |
| .isEqualTo(streamShardMetadata); |
| } |
| |
| @Test |
| public void testStreamShardMetadataSerializedUsingPojoSerializer() { |
| PojoTestUtils.assertSerializedAsPojo(StreamShardMetadata.class); |
| } |
| |
| /** |
| * FLINK-8484: ensure that a change in the StreamShardMetadata other than {@link |
| * StreamShardMetadata#getShardId()} or {@link StreamShardMetadata#getStreamName()} does not |
| * prevent the shard from being restored. This handles the corner case where the stored shard |
| * metadata is open (no ending sequence number), but after the job restore the shard has been |
| * closed (ending sequence number set) due to re-sharding, so we can no longer rely on {@link |
| * StreamShardMetadata#equals(Object)} to look up the sequence number in the collection of |
| * restored shard metadata. |
| * |
| * <p>Therefore, the snapshot's state is synchronized with the Kinesis shard before attempting |
| * to look up the sequence number to restore. |
| */ |
| @Test |
| public void testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheStateWasStored() |
| throws Exception { |
| // ---------------------------------------------------------------------- |
| // setup initial state |
| // ---------------------------------------------------------------------- |
| |
| HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all"); |
| |
| // ---------------------------------------------------------------------- |
| // mock operator state backend and initial state for initializeState() |
| // ---------------------------------------------------------------------- |
| |
| TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = |
| new TestingListState<>(); |
| for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) { |
| listState.add( |
| Tuple2.of( |
| KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), |
| state.getValue())); |
| } |
| |
| OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); |
| when(operatorStateStore.getUnionListState(any(ListStateDescriptor.class))) |
| .thenReturn(listState); |
| |
| StateInitializationContext initializationContext = mock(StateInitializationContext.class); |
| when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); |
| when(initializationContext.isRestored()).thenReturn(true); |
| |
| // ---------------------------------------------------------------------- |
| // mock fetcher |
| // ---------------------------------------------------------------------- |
| |
| KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher(); |
| List<StreamShardHandle> shards = new ArrayList<>(); |
| |
| // create a fake stream shard handle based on the first entry in the restored state |
| final StreamShardHandle originalStreamShardHandle = |
| fakeRestoredState.keySet().iterator().next(); |
| final StreamShardHandle closedStreamShardHandle = |
| new StreamShardHandle( |
| originalStreamShardHandle.getStreamName(), |
| originalStreamShardHandle.getShard()); |
| // close the shard handle by setting an ending sequence number |
| final SequenceNumberRange sequenceNumberRange = new SequenceNumberRange(); |
| sequenceNumberRange.setEndingSequenceNumber("1293844"); |
| closedStreamShardHandle.getShard().setSequenceNumberRange(sequenceNumberRange); |
| |
| shards.add(closedStreamShardHandle); |
| |
| when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards); |
| |
| // assume the given config is correct |
| try (MockedStatic<KinesisConfigUtil> kcu = mockStatic(KinesisConfigUtil.class)) { |
| |
| // ---------------------------------------------------------------------- |
| // start to test fetcher's initial state seeding |
| // ---------------------------------------------------------------------- |
| |
| TestableFlinkKinesisConsumer consumer = |
| new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2); |
| consumer.initializeState(initializationContext); |
| consumer.open(new Configuration()); |
| consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); |
| |
| Mockito.verify(mockedFetcher) |
| .registerNewSubscribedShardState( |
| new KinesisStreamShardState( |
| KinesisDataFetcher.convertToStreamShardMetadata( |
| closedStreamShardHandle), |
| closedStreamShardHandle, |
| fakeRestoredState.get(closedStreamShardHandle))); |
| } |
| } |
| |
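| /** Simple in-memory {@link ListState} implementation that records whether clear() was called. */ |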
| private static final class TestingListState<T> implements ListState<T> { |
| |
| private final List<T> list = new ArrayList<>(); |
| private boolean clearCalled = false; |
| |
| @Override |
| public void clear() { |
| list.clear(); |
| clearCalled = true; |
| } |
| |
| @Override |
| public Iterable<T> get() throws Exception { |
| return list; |
| } |
| |
| @Override |
| public void add(T value) throws Exception { |
| list.add(value); |
| } |
| |
| public List<T> getList() { |
| return list; |
| } |
| |
| public boolean isClearCalled() { |
| return clearCalled; |
| } |
| |
| @Override |
| public void update(List<T> values) throws Exception { |
| list.clear(); |
| |
| addAll(values); |
| } |
| |
| @Override |
| public void addAll(List<T> values) throws Exception { |
| if (values != null) { |
| list.addAll(values); |
| } |
| } |
| } |
| |
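| /** |
| * Builds a fake restored shard-to-sequence-number map for "fakeStream1", "fakeStream2", or |
| * ("all") both streams, using random sequence numbers. |
| */ |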
| private HashMap<StreamShardHandle, SequenceNumber> getFakeRestoredStore(String streamName) { |
| HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = new HashMap<>(); |
| |
| if (streamName.equals("fakeStream1") || streamName.equals("all")) { |
| fakeRestoredState.put( |
| new StreamShardHandle( |
| "fakeStream1", |
| new Shard() |
| .withShardId( |
| KinesisShardIdGenerator.generateFromShardOrder(0))), |
| new SequenceNumber(UUID.randomUUID().toString())); |
| fakeRestoredState.put( |
| new StreamShardHandle( |
| "fakeStream1", |
| new Shard() |
| .withShardId( |
| KinesisShardIdGenerator.generateFromShardOrder(1))), |
| new SequenceNumber(UUID.randomUUID().toString())); |
| fakeRestoredState.put( |
| new StreamShardHandle( |
| "fakeStream1", |
| new Shard() |
| .withShardId( |
| KinesisShardIdGenerator.generateFromShardOrder(2))), |
| new SequenceNumber(UUID.randomUUID().toString())); |
| } |
| |
| if (streamName.equals("fakeStream2") || streamName.equals("all")) { |
| fakeRestoredState.put( |
| new StreamShardHandle( |
| "fakeStream2", |
| new Shard() |
| .withShardId( |
| KinesisShardIdGenerator.generateFromShardOrder(0))), |
| new SequenceNumber(UUID.randomUUID().toString())); |
| fakeRestoredState.put( |
| new StreamShardHandle( |
| "fakeStream2", |
| new Shard() |
| .withShardId( |
| KinesisShardIdGenerator.generateFromShardOrder(1))), |
| new SequenceNumber(UUID.randomUUID().toString())); |
| } |
| |
| return fakeRestoredState; |
| } |
| |
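| /** |
| * Creates a mocked {@link KinesisDataFetcher} and intercepts its constructor via PowerMock so |
| * that the consumer under test receives the mock instead of a real fetcher. |
| */ |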
| private static KinesisDataFetcher mockKinesisDataFetcher() throws Exception { |
| KinesisDataFetcher mockedFetcher = Mockito.mock(KinesisDataFetcher.class); |
| |
| java.lang.reflect.Constructor<KinesisDataFetcher> ctor = |
| (java.lang.reflect.Constructor<KinesisDataFetcher>) |
| KinesisDataFetcher.class.getConstructors()[0]; |
| Class<?>[] otherParamTypes = new Class<?>[ctor.getParameterCount() - 1]; |
| System.arraycopy( |
| ctor.getParameterTypes(), 1, otherParamTypes, 0, ctor.getParameterCount() - 1); |
| |
| Supplier<Object[]> argumentSupplier = |
| () -> { |
| Object[] otherParamArgs = new Object[otherParamTypes.length]; |
| for (int i = 0; i < otherParamTypes.length; i++) { |
| otherParamArgs[i] = Mockito.nullable(otherParamTypes[i]); |
| } |
| return otherParamArgs; |
| }; |
| PowerMockito.whenNew(ctor) |
| .withArguments(Mockito.any(ctor.getParameterTypes()[0]), argumentSupplier.get()) |
| .thenReturn(mockedFetcher); |
| return mockedFetcher; |
| } |
| |
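| /** |
| * Verifies periodic watermark emission: with two fake shards, the emitted watermark equals the |
| * minimum of the per-shard event times minus the allowed out-of-orderness. |
| */ |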
| @Test |
| public void testPeriodicWatermark() throws Exception { |
| String streamName = "fakeStreamName"; |
| Time maxOutOfOrderness = Time.milliseconds(5); |
| long autoWatermarkInterval = 1_000; |
| |
| HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds = new HashMap<>(); |
| subscribedStreamsToLastDiscoveredShardIds.put(streamName, null); |
| |
| KinesisDeserializationSchema<String> deserializationSchema = |
| new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()); |
| Properties props = new Properties(); |
| props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); |
| props.setProperty( |
| ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, Long.toString(10L)); |
| |
| BlockingQueue<String> shard1 = new LinkedBlockingQueue<>(); |
| BlockingQueue<String> shard2 = new LinkedBlockingQueue<>(); |
| |
| Map<String, List<BlockingQueue<String>>> streamToQueueMap = new HashMap<>(); |
| streamToQueueMap.put(streamName, Arrays.asList(shard1, shard2)); |
| |
| // override createFetcher to mock Kinesis |
| FlinkKinesisConsumer<String> sourceFunc = |
| new FlinkKinesisConsumer<String>(streamName, deserializationSchema, props) { |
| @Override |
| protected KinesisDataFetcher<String> createFetcher( |
| List<String> streams, |
| SourceContext<String> sourceContext, |
| RuntimeContext runtimeContext, |
| Properties configProps, |
| KinesisDeserializationSchema<String> deserializationSchema) { |
| |
| KinesisDataFetcher<String> fetcher = |
| new KinesisDataFetcher<String>( |
| streams, |
| sourceContext, |
| sourceContext.getCheckpointLock(), |
| runtimeContext, |
| configProps, |
| deserializationSchema, |
| getShardAssigner(), |
| getPeriodicWatermarkAssigner(), |
| null, |
| new AtomicReference<>(), |
| new ArrayList<>(), |
| subscribedStreamsToLastDiscoveredShardIds, |
| (props) -> |
| FakeKinesisBehavioursFactory |
| .blockingQueueGetRecords(streamToQueueMap), |
| null) {}; |
| return fetcher; |
| } |
| }; |
| |
| sourceFunc.setShardAssigner( |
| (streamShardHandle, i) -> { |
| // shardId-000000000000 |
| return Integer.parseInt( |
| streamShardHandle |
| .getShard() |
| .getShardId() |
| .substring("shardId-".length())); |
| }); |
| |
| sourceFunc.setPeriodicWatermarkAssigner(new TestTimestampExtractor(maxOutOfOrderness)); |
| |
| // there is currently no test harness specifically for sources, |
| // so we overlay the source thread here |
| AbstractStreamOperatorTestHarness<Object> testHarness = |
| new AbstractStreamOperatorTestHarness<Object>( |
| new StreamSource(sourceFunc), 1, 1, 0); |
| testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); |
| testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval); |
| |
| testHarness.initializeEmptyState(); |
| testHarness.open(); |
| |
| ConcurrentLinkedQueue<Watermark> watermarks = new ConcurrentLinkedQueue<>(); |
| |
| @SuppressWarnings("unchecked") |
| SourceFunction.SourceContext<String> sourceContext = |
| new CollectingSourceContext( |
| testHarness.getCheckpointLock(), testHarness.getOutput()) { |
| @Override |
| public void emitWatermark(Watermark mark) { |
| watermarks.add(mark); |
| } |
| |
| @Override |
| public void markAsTemporarilyIdle() {} |
| }; |
| |
| new Thread( |
| () -> { |
| try { |
| sourceFunc.run(sourceContext); |
| } catch (InterruptedException e) { |
| // expected on cancel |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }) |
| .start(); |
| |
| shard1.put("1"); |
| shard1.put("2"); |
| shard2.put("10"); |
| int recordCount = 3; |
| int watermarkCount = 0; |
| awaitRecordCount(testHarness.getOutput(), recordCount); |
| |
| // Trigger watermark emit, first watermark is -3 |
| // - Shard-1 @2 |
| // - Shard-2 @10 |
| // - Watermark = min(2, 10) - maxOutOfOrderness = 2 - 5 = -3 |
| testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); |
| watermarkCount++; |
| |
| // advance watermark |
| shard1.put("10"); |
| recordCount++; |
| awaitRecordCount(testHarness.getOutput(), recordCount); |
| |
| // Trigger watermark emit, second watermark is 5 |
| // - Shard-1 @10 |
| // - Shard-2 @10 |
| // - Watermark = min(10, 10) - maxOutOfOrderness = 10 - 5 = 5 |
| testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); |
| watermarkCount++; |
| |
| sourceFunc.cancel(); |
| testHarness.close(); |
| |
| assertThat(testHarness.getOutput()).as("record count").hasSize(recordCount); |
| assertThat(watermarks).contains(new Watermark(-3), new Watermark(5)); |
| assertThat(watermarks).as("watermark count").hasSize(watermarkCount); |
| } |
| |
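| /** |
| * Verifies global watermark synchronization: a record beyond the global watermark plus the |
| * configured lookahead is held back in the record emitter until the shared watermark advances, |
| * and exceptions thrown while emitting are propagated out of run(). |
| */ |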
| @Test |
| public void testSourceSynchronization() throws Exception { |
| |
| final String streamName = "fakeStreamName"; |
| final Time maxOutOfOrderness = Time.milliseconds(5); |
| final long autoWatermarkInterval = 1_000; |
| final long watermarkSyncInterval = autoWatermarkInterval + 1; |
| |
| TestWatermarkTracker.WATERMARK.set(0); |
| HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds = new HashMap<>(); |
| subscribedStreamsToLastDiscoveredShardIds.put(streamName, null); |
| |
| final KinesisDeserializationSchema<String> deserializationSchema = |
| new KinesisDeserializationSchemaWrapper<>(new OpenCheckingStringSchema()); |
| Properties props = new Properties(); |
| props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); |
| props.setProperty( |
| ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, Long.toString(10L)); |
| props.setProperty( |
| ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, |
| Long.toString(watermarkSyncInterval)); |
| props.setProperty(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, Long.toString(5)); |
| |
| BlockingQueue<String> shard1 = new LinkedBlockingQueue<>(); |
| |
| Map<String, List<BlockingQueue<String>>> streamToQueueMap = new HashMap<>(); |
| streamToQueueMap.put(streamName, Collections.singletonList(shard1)); |
| |
| // override createFetcher to mock Kinesis |
| FlinkKinesisConsumer<String> sourceFunc = |
| new FlinkKinesisConsumer<String>(streamName, deserializationSchema, props) { |
| @Override |
| protected KinesisDataFetcher<String> createFetcher( |
| List<String> streams, |
| SourceFunction.SourceContext<String> sourceContext, |
| RuntimeContext runtimeContext, |
| Properties configProps, |
| KinesisDeserializationSchema<String> deserializationSchema) { |
| |
| KinesisDataFetcher<String> fetcher = |
| new KinesisDataFetcher<String>( |
| streams, |
| sourceContext, |
| sourceContext.getCheckpointLock(), |
| runtimeContext, |
| configProps, |
| deserializationSchema, |
| getShardAssigner(), |
| getPeriodicWatermarkAssigner(), |
| getWatermarkTracker(), |
| new AtomicReference<>(), |
| new ArrayList<>(), |
| subscribedStreamsToLastDiscoveredShardIds, |
| (props) -> |
| FakeKinesisBehavioursFactory |
| .blockingQueueGetRecords(streamToQueueMap), |
| null) { |
| @Override |
| protected void emitWatermark() { |
| // necessary in this test to ensure that the watermark state is |
| // updated before the watermark timer callback is triggered |
| synchronized (sourceContext.getCheckpointLock()) { |
| super.emitWatermark(); |
| } |
| } |
| }; |
| return fetcher; |
| } |
| }; |
| |
| sourceFunc.setShardAssigner( |
| (streamShardHandle, i) -> { |
| // shardId-000000000000 |
| return Integer.parseInt( |
| streamShardHandle |
| .getShard() |
| .getShardId() |
| .substring("shardId-".length())); |
| }); |
| |
| sourceFunc.setPeriodicWatermarkAssigner(new TestTimestampExtractor(maxOutOfOrderness)); |
| |
| sourceFunc.setWatermarkTracker(new TestWatermarkTracker()); |
| |
| // there is currently no test harness specifically for sources, |
| // so we overlay the source thread here |
| AbstractStreamOperatorTestHarness<Object> testHarness = |
| new AbstractStreamOperatorTestHarness<Object>( |
| new StreamSource(sourceFunc), 1, 1, 0); |
| testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); |
| testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval); |
| |
| testHarness.initializeEmptyState(); |
| testHarness.open(); |
| |
| final ConcurrentLinkedQueue<Object> results = testHarness.getOutput(); |
| final AtomicBoolean throwOnCollect = new AtomicBoolean(); |
| |
| @SuppressWarnings("unchecked") |
| SourceFunction.SourceContext<String> sourceContext = |
| new CollectingSourceContext(testHarness.getCheckpointLock(), results) { |
| @Override |
| public void markAsTemporarilyIdle() {} |
| |
| @Override |
| public void collect(Serializable element) { |
| if (throwOnCollect.get()) { |
| throw new RuntimeException("expected"); |
| } |
| super.collect(element); |
| } |
| |
| @Override |
| public void emitWatermark(Watermark mark) { |
| results.add(mark); |
| } |
| }; |
| |
| final AtomicReference<Exception> sourceThreadError = new AtomicReference<>(); |
| new Thread( |
| () -> { |
| try { |
| sourceFunc.run(sourceContext); |
| } catch (InterruptedException e) { |
| // expected on cancel |
| } catch (Exception e) { |
| sourceThreadError.set(e); |
| } |
| }) |
| .start(); |
| |
| ArrayList<Object> expectedResults = new ArrayList<>(); |
| |
| final long record1 = 1; |
| shard1.put(Long.toString(record1)); |
| expectedResults.add(Long.toString(record1)); |
| awaitRecordCount(results, expectedResults.size()); |
| |
| // at this point we know the fetcher was initialized |
| final KinesisDataFetcher fetcher = |
| org.powermock.reflect.Whitebox.getInternalState(sourceFunc, "fetcher"); |
| |
| // trigger watermark emit |
| testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); |
| expectedResults.add(new Watermark(-4)); |
| // verify watermark |
| awaitRecordCount(results, expectedResults.size()); |
| assertThat(results).contains(expectedResults.toArray()); |
| assertThat(TestWatermarkTracker.WATERMARK.get()).isEqualTo(0); |
| |
| // trigger sync |
| testHarness.setProcessingTime(testHarness.getProcessingTime() + 1); |
| TestWatermarkTracker.assertGlobalWatermark(-4); |
| |
| final long record2 = record1 + (watermarkSyncInterval * 3) + 1; |
| shard1.put(Long.toString(record2)); |
| |
| // wait for the record to be buffered in the emitter |
| final RecordEmitter<?> emitter = |
| org.powermock.reflect.Whitebox.getInternalState(fetcher, "recordEmitter"); |
| RecordEmitter.RecordQueue emitterQueue = emitter.getQueue(0); |
| Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10)); |
| while (deadline.hasTimeLeft() && emitterQueue.getSize() < 1) { |
| Thread.sleep(10); |
| } |
| assertThat(emitterQueue.getSize()).as("first record received").isEqualTo(1); |
| |
| // Advance the watermark. Since the new record is past global watermark + threshold, |
| // it won't be emitted and the watermark does not advance |
| testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); |
| assertThat(results).contains(expectedResults.toArray()); |
| assertThat((long) org.powermock.reflect.Whitebox.getInternalState(fetcher, "nextWatermark")) |
| .isEqualTo(3000L); |
| TestWatermarkTracker.assertGlobalWatermark(-4); |
| |
| // Trigger global watermark sync |
| testHarness.setProcessingTime(testHarness.getProcessingTime() + 1); |
| expectedResults.add(Long.toString(record2)); |
| awaitRecordCount(results, expectedResults.size()); |
| assertThat(results).contains(expectedResults.toArray()); |
| TestWatermarkTracker.assertGlobalWatermark(3000); |
| |
| // Trigger watermark update and emit |
| testHarness.setProcessingTime(testHarness.getProcessingTime() + autoWatermarkInterval); |
| expectedResults.add(new Watermark(3000)); |
| assertThat(results).contains(expectedResults.toArray()); |
| |
| // verify exception propagation |
| assertThat(sourceThreadError.get()).isNull(); |
| throwOnCollect.set(true); |
| shard1.put(Long.toString(record2 + 1)); |
| |
| deadline = Deadline.fromNow(Duration.ofSeconds(10)); |
| while (deadline.hasTimeLeft() && sourceThreadError.get() == null) { |
| Thread.sleep(10); |
| } |
| assertThat(sourceThreadError.get()).isNotNull(); |
| assertThat(sourceThreadError.get().getMessage()).isEqualTo("expected"); |
| |
| sourceFunc.cancel(); |
| testHarness.close(); |
| } |
| |
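| /** Closing a consumer that was never opened must not fail. */ |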
| @Test |
| public void testCloseConnectorBeforeSubtaskStart() throws Exception { |
| Properties config = TestUtils.getStandardProperties(); |
| FlinkKinesisConsumer<String> consumer = |
| new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config); |
| consumer.close(); |
| } |
| |
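| /** Polls the queue for up to ten seconds and asserts that exactly {@code count} elements arrived. */ |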
| private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count) |
| throws Exception { |
| Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10)); |
| while (deadline.hasTimeLeft() && queue.size() < count) { |
| Thread.sleep(10); |
| } |
| |
| int received = queue.size(); |
| assertThat(received).isEqualTo(count); |
| } |
| |
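| /** String schema that fails deserialization unless open() was called first. */ |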
| private static class OpenCheckingStringSchema extends SimpleStringSchema { |
| private boolean opened = false; |
| |
| @Override |
| public void open(DeserializationSchema.InitializationContext context) throws Exception { |
| assertThat(context.getMetricGroup()).isNotNull(); |
| this.opened = true; |
| } |
| |
| @Override |
| public String deserialize(byte[] message) { |
| if (!opened) { |
| throw new AssertionError( |
| "DeserializationSchema was not opened before deserialization."); |
| } |
| return super.deserialize(message); |
| } |
| } |
| |
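| /** Extracts the event timestamp by parsing the record itself as a long. */ |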
| private static class TestTimestampExtractor |
| extends BoundedOutOfOrdernessTimestampExtractor<String> { |
| private static final long serialVersionUID = 1L; |
| |
| public TestTimestampExtractor(Time maxAllowedLateness) { |
| super(maxAllowedLateness); |
| } |
| |
| @Override |
| public long extractTimestamp(String element) { |
| return Long.parseLong(element); |
| } |
| } |
| |
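| /** Watermark tracker that stores the last reported watermark in a static field for assertions. */ |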
| private static class TestWatermarkTracker extends WatermarkTracker { |
| |
| private static final AtomicLong WATERMARK = new AtomicLong(); |
| |
| @Override |
| public long getUpdateTimeoutCount() { |
| return 0; |
| } |
| |
| @Override |
| public long updateWatermark(long localWatermark) { |
| WATERMARK.set(localWatermark); |
| return localWatermark; |
| } |
| |
| static void assertGlobalWatermark(long expected) { |
| assertThat(WATERMARK.get()).isEqualTo(expected); |
| } |
| } |
| } |