blob: ccf39d0e19b2b68e342687fe594bbdc8818418bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kinesis.internals;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcher;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests for the {@link KinesisDataFetcher}.
*/
public class KinesisDataFetcherTest extends TestLogger {
/**
 * Verifies that running the fetcher fails with a {@link RuntimeException} when none of the
 * subscribed streams report any shards.
 */
@Test(expected = RuntimeException.class)
public void testIfNoShardsAreFoundShouldThrowException() throws Exception {
	// two subscribed streams, neither of which will report a single shard
	final List<String> subscribedStreams = new LinkedList<>();
	subscribedStreams.add("fakeStream1");
	subscribedStreams.add("fakeStream2");

	final HashMap<String, String> lastSeenShardIdsPerStream =
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(subscribedStreams);

	final TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
		subscribedStreams,
		new TestSourceContext<>(),
		TestUtils.getStandardProperties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		10,
		2,
		new AtomicReference<>(),
		new LinkedList<>(),
		lastSeenShardIdsPerStream,
		FakeKinesisBehavioursFactory.noShardsFoundForRequestedStreamsBehaviour());

	// expected to throw: there is nothing to consume from any subscribed stream
	fetcher.runFetcher();
}
/**
 * Verifies that a record which deserialized to {@code null} (i.e. a corrupt record) is skipped:
 * nothing is emitted to the source context, but the shard's last processed sequence number is
 * still advanced so the record is not re-read.
 */
@Test
public void testSkipCorruptedRecord() throws Exception {
	final String stream = "fakeStream";
	final int numShards = 3;

	final LinkedList<KinesisStreamShardState> testShardStates = new LinkedList<>();
	final TestSourceContext<String> sourceContext = new TestSourceContext<>();

	// a single consumer subtask (parallelism 1, index 0)
	final TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
		Collections.singletonList(stream),
		sourceContext,
		TestUtils.getStandardProperties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		1,
		0,
		new AtomicReference<>(),
		testShardStates,
		new HashMap<>(),
		FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(Collections.singletonMap(stream, numShards)));

	// FlinkKinesisConsumer is responsible for setting up the fetcher before it can be run;
	// run the consumer until it reaches the point where the fetcher starts to run
	final DummyFlinkKinesisConsumer<String> consumer =
		new DummyFlinkKinesisConsumer<>(TestUtils.getStandardProperties(), fetcher, 1, 0);

	final CheckedThread consumerThread = new CheckedThread() {
		@Override
		public void go() throws Exception {
			consumer.run(new TestSourceContext<>());
		}
	};
	consumerThread.start();

	fetcher.waitUntilRun();
	consumer.cancel();
	consumerThread.sync();

	assertEquals(numShards, testShardStates.size());

	// regular records are emitted and advance the shard state
	for (int i = 0; i < numShards; i++) {
		fetcher.emitRecordAndUpdateState("record-" + i, 10L, i, new SequenceNumber("seq-num-1"));
		assertEquals(new SequenceNumber("seq-num-1"), testShardStates.get(i).getLastProcessedSequenceNum());
		assertEquals(new StreamRecord<>("record-" + i, 10L), sourceContext.removeLatestOutput());
	}

	// emitting a null (i.e., a corrupt record) should not produce any output, but still have the shard state updated
	fetcher.emitRecordAndUpdateState(null, 10L, 1, new SequenceNumber("seq-num-2"));
	assertEquals(new SequenceNumber("seq-num-2"), testShardStates.get(1).getLastProcessedSequenceNum());
	assertNull(sourceContext.removeLatestOutput()); // no output should have been collected
}
/**
 * Verifies that, on a fresh start (no restored state), the last discovered shard of every
 * subscribed stream is set to that stream's highest-ordered shard after the initial discovery.
 *
 * <p>Shard counts per stream are randomized. The seed and the resulting counts are included in
 * every assertion message so that a failing combination can be reproduced.
 */
@Test
public void testStreamToLastSeenShardStateIsCorrectlySetWhenNotRestoringFromFailure() throws Exception {
	List<String> fakeStreams = new LinkedList<>();
	fakeStreams.add("fakeStream1");
	fakeStreams.add("fakeStream2");
	fakeStreams.add("fakeStream3");
	fakeStreams.add("fakeStream4");

	HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);

	// explicit seed so that a failure can be replayed with the exact same shard counts
	final long seed = System.nanoTime();
	final Random rand = new Random(seed);

	Map<String, Integer> streamToShardCount = new HashMap<>();
	for (String fakeStream : fakeStreams) {
		streamToShardCount.put(fakeStream, rand.nextInt(5) + 1);
	}

	final TestableKinesisDataFetcher<String> fetcher =
		new TestableKinesisDataFetcher<>(
			fakeStreams,
			new TestSourceContext<>(),
			TestUtils.getStandardProperties(),
			new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
			10,
			2,
			new AtomicReference<>(),
			new LinkedList<>(),
			subscribedStreamsToLastSeenShardIdsUnderTest,
			FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));

	final DummyFlinkKinesisConsumer<String> consumer = new DummyFlinkKinesisConsumer<>(
		TestUtils.getStandardProperties(), fetcher, 1, 0);

	CheckedThread consumerThread = new CheckedThread() {
		@Override
		public void go() throws Exception {
			consumer.run(new TestSourceContext<>());
		}
	};
	consumerThread.start();

	fetcher.waitUntilRun();
	consumer.cancel();
	consumerThread.sync();

	final String failureContext = "random seed: " + seed + ", shard counts: " + streamToShardCount;

	// assert that the streams tracked in the state are identical to the subscribed streams
	Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet();
	assertEquals(failureContext, fakeStreams.size(), streamsInState.size());
	assertTrue(failureContext, streamsInState.containsAll(fakeStreams));

	// assert that the last seen shards in state is correctly set
	for (Map.Entry<String, String> streamToLastSeenShard : subscribedStreamsToLastSeenShardIdsUnderTest.entrySet()) {
		assertEquals(
			failureContext,
			KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(streamToLastSeenShard.getKey()) - 1),
			streamToLastSeenShard.getValue());
	}
}
/**
 * Restores shard state for two streams and verifies that, when no shards were added after the
 * restored checkpoint, each stream's last discovered shard is its highest-ordered shard.
 */
@Test
public void testStreamToLastSeenShardStateIsCorrectlySetWhenNoNewShardsSinceRestoredCheckpoint() throws Exception {
	final List<String> subscribedStreams = new LinkedList<>();
	subscribedStreams.add("fakeStream1");
	subscribedStreams.add("fakeStream2");

	// shards per stream in the restored checkpoint: fakeStream1 had 3, fakeStream2 had 2
	final Map<String, Integer> restoredShardCount = new HashMap<>();
	restoredShardCount.put("fakeStream1", 3);
	restoredShardCount.put("fakeStream2", 2);

	// every restored shard is paired with a random sequence number
	final Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
	for (Map.Entry<String, Integer> streamAndCount : restoredShardCount.entrySet()) {
		for (int order = 0; order < streamAndCount.getValue(); order++) {
			restoredStateUnderTest.put(
				new StreamShardHandle(
					streamAndCount.getKey(),
					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(order))),
				UUID.randomUUID().toString());
		}
	}

	// Kinesis reports exactly the same shard counts after the restore
	final Map<String, Integer> streamToShardCount = new HashMap<>(restoredShardCount);

	final HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(subscribedStreams);

	final TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
		subscribedStreams,
		new TestSourceContext<>(),
		TestUtils.getStandardProperties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		10,
		2,
		new AtomicReference<>(),
		new LinkedList<>(),
		subscribedStreamsToLastSeenShardIdsUnderTest,
		FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));

	// seed the fetcher with the restored state before it runs
	for (Map.Entry<StreamShardHandle, String> restored : restoredStateUnderTest.entrySet()) {
		final StreamShardHandle handle = restored.getKey();
		fetcher.advanceLastDiscoveredShardOfStream(handle.getStreamName(), handle.getShard().getShardId());
		fetcher.registerNewSubscribedShardState(
			new KinesisStreamShardState(
				KinesisDataFetcher.convertToStreamShardMetadata(handle),
				handle,
				new SequenceNumber(restored.getValue())));
	}

	final CheckedThread runFetcherThread = new CheckedThread() {
		@Override
		public void go() throws Exception {
			fetcher.runFetcher();
		}
	};
	runFetcherThread.start();

	// only the initial discovery round is needed; stop the fetcher right after it
	fetcher.waitUntilInitialDiscovery();
	fetcher.shutdownFetcher();
	runFetcherThread.sync();

	// the state tracks exactly the subscribed streams
	final Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet();
	assertEquals(subscribedStreams.size(), streamsInState.size());
	assertTrue(streamsInState.containsAll(subscribedStreams));

	// each stream's last seen shard is its highest-ordered shard
	for (String stream : subscribedStreams) {
		assertEquals(
			KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(stream) - 1),
			subscribedStreamsToLastSeenShardIdsUnderTest.get(stream));
	}
}
/**
 * Restores shard state for two streams and verifies that shards added after the restored
 * checkpoint are picked up: each stream's last discovered shard becomes its new highest-ordered
 * shard.
 */
@Test
public void testStreamToLastSeenShardStateIsCorrectlySetWhenNewShardsFoundSinceRestoredCheckpoint() throws Exception {
	final List<String> subscribedStreams = new LinkedList<>();
	subscribedStreams.add("fakeStream1");
	subscribedStreams.add("fakeStream2");

	// shards per stream in the restored checkpoint: fakeStream1 had 3, fakeStream2 had 2
	final Map<String, Integer> restoredShardCount = new HashMap<>();
	restoredShardCount.put("fakeStream1", 3);
	restoredShardCount.put("fakeStream2", 2);

	// every restored shard is paired with a random sequence number
	final Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
	for (Map.Entry<String, Integer> streamAndCount : restoredShardCount.entrySet()) {
		for (int order = 0; order < streamAndCount.getValue(); order++) {
			restoredStateUnderTest.put(
				new StreamShardHandle(
					streamAndCount.getKey(),
					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(order))),
				UUID.randomUUID().toString());
		}
	}

	// after the restore, fakeStream1 has 1 new shard and fakeStream2 has 3 new shards
	final Map<String, Integer> streamToShardCount = new HashMap<>();
	streamToShardCount.put("fakeStream1", restoredShardCount.get("fakeStream1") + 1);
	streamToShardCount.put("fakeStream2", restoredShardCount.get("fakeStream2") + 3);

	final HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(subscribedStreams);

	// a non-resharded Kinesis behaviour: no further resharding happens AFTER the restore
	final TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
		subscribedStreams,
		new TestSourceContext<>(),
		TestUtils.getStandardProperties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		10,
		2,
		new AtomicReference<>(),
		new LinkedList<>(),
		subscribedStreamsToLastSeenShardIdsUnderTest,
		FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));

	// seed the fetcher with the restored state before it runs
	for (Map.Entry<StreamShardHandle, String> restored : restoredStateUnderTest.entrySet()) {
		final StreamShardHandle handle = restored.getKey();
		fetcher.advanceLastDiscoveredShardOfStream(handle.getStreamName(), handle.getShard().getShardId());
		fetcher.registerNewSubscribedShardState(
			new KinesisStreamShardState(
				KinesisDataFetcher.convertToStreamShardMetadata(handle),
				handle,
				new SequenceNumber(restored.getValue())));
	}

	final CheckedThread runFetcherThread = new CheckedThread() {
		@Override
		public void go() throws Exception {
			fetcher.runFetcher();
		}
	};
	runFetcherThread.start();

	// only the initial discovery round is needed; stop the fetcher right after it
	fetcher.waitUntilInitialDiscovery();
	fetcher.shutdownFetcher();
	runFetcherThread.sync();

	// the state tracks exactly the subscribed streams
	final Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet();
	assertEquals(subscribedStreams.size(), streamsInState.size());
	assertTrue(streamsInState.containsAll(subscribedStreams));

	// each stream's last seen shard is its new highest-ordered shard
	for (String stream : subscribedStreams) {
		assertEquals(
			KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get(stream) - 1),
			subscribedStreamsToLastSeenShardIdsUnderTest.get(stream));
	}
}
/**
 * Restores shard state for two of four subscribed streams (the other two have no shards at all)
 * and verifies that, with no new shards after the restore, existing streams keep their last
 * restored shard while shard-less streams have no last seen shard.
 */
@Test
public void testStreamToLastSeenShardStateIsCorrectlySetWhenNoNewShardsSinceRestoredCheckpointAndSomeStreamsDoNotExist() throws Exception {
	final List<String> subscribedStreams = new LinkedList<>();
	subscribedStreams.add("fakeStream1");
	subscribedStreams.add("fakeStream2");
	subscribedStreams.add("fakeStream3"); // no shards will ever be found for this stream
	subscribedStreams.add("fakeStream4"); // no shards will ever be found for this stream

	// shards per stream in the restored checkpoint: fakeStream1 had 3, fakeStream2 had 2
	final Map<String, Integer> restoredShardCount = new HashMap<>();
	restoredShardCount.put("fakeStream1", 3);
	restoredShardCount.put("fakeStream2", 2);

	// every restored shard is paired with a random sequence number
	final Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
	for (Map.Entry<String, Integer> streamAndCount : restoredShardCount.entrySet()) {
		for (int order = 0; order < streamAndCount.getValue(); order++) {
			restoredStateUnderTest.put(
				new StreamShardHandle(
					streamAndCount.getKey(),
					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(order))),
				UUID.randomUUID().toString());
		}
	}

	// existing streams keep their shard counts; fakeStream3 and fakeStream4 have none
	final Map<String, Integer> streamToShardCount = new HashMap<>(restoredShardCount);
	streamToShardCount.put("fakeStream3", 0);
	streamToShardCount.put("fakeStream4", 0);

	final HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(subscribedStreams);

	// a non-resharded Kinesis behaviour: no resharding happens AFTER the restore
	final TestableKinesisDataFetcher<String> fetcher = new TestableKinesisDataFetcher<>(
		subscribedStreams,
		new TestSourceContext<>(),
		TestUtils.getStandardProperties(),
		new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
		10,
		2,
		new AtomicReference<>(),
		new LinkedList<>(),
		subscribedStreamsToLastSeenShardIdsUnderTest,
		FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));

	// seed the fetcher with the restored state before it runs
	for (Map.Entry<StreamShardHandle, String> restored : restoredStateUnderTest.entrySet()) {
		final StreamShardHandle handle = restored.getKey();
		fetcher.advanceLastDiscoveredShardOfStream(handle.getStreamName(), handle.getShard().getShardId());
		fetcher.registerNewSubscribedShardState(
			new KinesisStreamShardState(
				KinesisDataFetcher.convertToStreamShardMetadata(handle),
				handle,
				new SequenceNumber(restored.getValue())));
	}

	final CheckedThread runFetcherThread = new CheckedThread() {
		@Override
		public void go() throws Exception {
			fetcher.runFetcher();
		}
	};
	runFetcherThread.start();

	// only the initial discovery round is needed; stop the fetcher right after it
	fetcher.waitUntilInitialDiscovery();
	fetcher.shutdownFetcher();
	runFetcherThread.sync();

	// the state tracks exactly the subscribed streams, including those without shards
	final Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet();
	assertEquals(subscribedStreams.size(), streamsInState.size());
	assertTrue(streamsInState.containsAll(subscribedStreams));

	// existing streams point at their last restored shard; shard-less streams have no entry value
	final String lastShardOfStream1 = KinesisShardIdGenerator.generateFromShardOrder(2);
	final String lastShardOfStream2 = KinesisShardIdGenerator.generateFromShardOrder(1);
	assertEquals(lastShardOfStream1, subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream1"));
	assertEquals(lastShardOfStream2, subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream2"));
	assertNull(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream3"));
	assertNull(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream4"));
}
/**
 * Restores shard state for two of four subscribed streams (the other two have no shards at all)
 * and verifies that shards added after the restore are picked up for the existing streams, while
 * shard-less streams have no last seen shard.
 */
@Test
public void testStreamToLastSeenShardStateIsCorrectlySetWhenNewShardsFoundSinceRestoredCheckpointAndSomeStreamsDoNotExist() throws Exception {
	List<String> fakeStreams = new LinkedList<>();
	fakeStreams.add("fakeStream1");
	fakeStreams.add("fakeStream2");
	fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards
	fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards

	// shards per stream in the restored checkpoint: fakeStream1 had 3, fakeStream2 had 2
	final Map<String, Integer> restoredShardCount = new HashMap<>();
	restoredShardCount.put("fakeStream1", 3);
	restoredShardCount.put("fakeStream2", 2);

	// every restored shard is paired with a random sequence number
	Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
	for (Map.Entry<String, Integer> streamAndCount : restoredShardCount.entrySet()) {
		for (int order = 0; order < streamAndCount.getValue(); order++) {
			restoredStateUnderTest.put(
				new StreamShardHandle(
					streamAndCount.getKey(),
					new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(order))),
				UUID.randomUUID().toString());
		}
	}

	Map<String, Integer> streamToShardCount = new HashMap<>();
	streamToShardCount.put("fakeStream1", 3 + 1); // fakeStream1 had 3 shards before & 1 new shard after restore
	streamToShardCount.put("fakeStream2", 2 + 3); // fakeStream2 had 2 shards before & 3 new shards after restore
	streamToShardCount.put("fakeStream3", 0); // no shards can be found for fakeStream3
	streamToShardCount.put("fakeStream4", 0); // no shards can be found for fakeStream4

	HashMap<String, String> subscribedStreamsToLastSeenShardIdsUnderTest =
		KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams);

	// using a non-resharded streams kinesis behaviour to represent that Kinesis is not resharded AFTER the restore
	final TestableKinesisDataFetcher<String> fetcher =
		new TestableKinesisDataFetcher<>(
			fakeStreams,
			new TestSourceContext<>(),
			TestUtils.getStandardProperties(),
			new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
			10,
			2,
			new AtomicReference<>(),
			new LinkedList<>(),
			subscribedStreamsToLastSeenShardIdsUnderTest,
			FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));

	// seed the fetcher with the restored state before it runs
	for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
		fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
		fetcher.registerNewSubscribedShardState(
			new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredState.getKey()),
				restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
	}

	CheckedThread runFetcherThread = new CheckedThread() {
		@Override
		public void go() throws Exception {
			fetcher.runFetcher();
		}
	};
	runFetcherThread.start();

	fetcher.waitUntilInitialDiscovery();
	fetcher.shutdownFetcher();
	runFetcherThread.sync();

	// assert that the streams tracked in the state are identical to the subscribed streams
	Set<String> streamsInState = subscribedStreamsToLastSeenShardIdsUnderTest.keySet();
	assertEquals(fakeStreams.size(), streamsInState.size());
	assertTrue(streamsInState.containsAll(fakeStreams));

	// assert that the last seen shards in state is correctly set; expectations are derived from
	// the configured shard counts so they cannot drift from the setup above
	assertEquals(
		KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get("fakeStream1") - 1),
		subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream1"));
	assertEquals(
		KinesisShardIdGenerator.generateFromShardOrder(streamToShardCount.get("fakeStream2") - 1),
		subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream2"));
	assertNull(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream3"));
	assertNull(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream4"));
}
/**
 * Verifies that {@link StreamShardHandle} and {@link StreamShardMetadata} convert into each other
 * without losing any of the shard's identifying fields.
 */
@Test
public void testStreamShardMetadataAndHandleConversion() {
	final String streamName = "fakeStream1";
	final String shardId = "shard-000001";
	final String parentShardId = "shard-000002";
	final String adjacentParentShardId = "shard-000003";
	final String startingHashKey = "key-000001";
	final String endingHashKey = "key-000010";
	final String startingSequenceNumber = "seq-0000021";
	final String endingSequenceNumber = "seq-00000031";

	// the Kinesis SDK view of the shard, wrapped in a handle
	final HashKeyRange hashKeyRange = new HashKeyRange()
		.withStartingHashKey(startingHashKey)
		.withEndingHashKey(endingHashKey);
	final SequenceNumberRange sequenceNumberRange = new SequenceNumberRange()
		.withStartingSequenceNumber(startingSequenceNumber)
		.withEndingSequenceNumber(endingSequenceNumber);
	final StreamShardHandle expectedHandle = new StreamShardHandle(
		streamName,
		new Shard()
			.withShardId(shardId)
			.withParentShardId(parentShardId)
			.withAdjacentParentShardId(adjacentParentShardId)
			.withHashKeyRange(hashKeyRange)
			.withSequenceNumberRange(sequenceNumberRange));

	// the equivalent metadata object carrying the same field values
	final StreamShardMetadata expectedMetadata = new StreamShardMetadata();
	expectedMetadata.setStreamName(streamName);
	expectedMetadata.setShardId(shardId);
	expectedMetadata.setParentShardId(parentShardId);
	expectedMetadata.setAdjacentParentShardId(adjacentParentShardId);
	expectedMetadata.setStartingHashKey(startingHashKey);
	expectedMetadata.setEndingHashKey(endingHashKey);
	expectedMetadata.setStartingSequenceNumber(startingSequenceNumber);
	expectedMetadata.setEndingSequenceNumber(endingSequenceNumber);

	// conversion must hold in both directions
	assertEquals(expectedMetadata, KinesisDataFetcher.convertToStreamShardMetadata(expectedHandle));
	assertEquals(expectedHandle, KinesisDataFetcher.convertToStreamShardHandle(expectedMetadata));
}
/**
 * A {@link FlinkKinesisConsumer} test double that always returns a pre-built fetcher from
 * {@link #createFetcher}. It reports a fixed parallelism and subtask index through a mocked
 * {@link RuntimeContext}.
 */
private static class DummyFlinkKinesisConsumer<T> extends FlinkKinesisConsumer<T> {

	private static final long serialVersionUID = 1L;

	/** Returned from {@link #createFetcher}, whatever arguments are passed in. */
	private final KinesisDataFetcher<T> fetcher;

	/** Parallelism reported by {@link #getRuntimeContext()}. */
	private final int numParallelSubtasks;

	/** Subtask index reported by {@link #getRuntimeContext()}. */
	private final int subtaskIndex;

	@SuppressWarnings("unchecked")
	DummyFlinkKinesisConsumer(
			Properties properties,
			KinesisDataFetcher<T> fetcher,
			int numParallelSubtasks,
			int subtaskIndex) {
		// a mocked schema is enough here; NOTE(review): presumably never exercised, since
		// createFetcher below ignores the schema it is given
		super("test", mock(KinesisDeserializationSchema.class), properties);
		this.fetcher = fetcher;
		this.numParallelSubtasks = numParallelSubtasks;
		this.subtaskIndex = subtaskIndex;
	}

	@Override
	protected KinesisDataFetcher<T> createFetcher(
			List<String> streams,
			SourceFunction.SourceContext<T> sourceContext,
			RuntimeContext runtimeContext,
			Properties configProps,
			KinesisDeserializationSchema<T> deserializationSchema) {
		// all arguments are ignored: the test supplies the fetcher up front
		return fetcher;
	}

	@Override
	public RuntimeContext getRuntimeContext() {
		// a fresh mock per call, stubbed with the configured parallelism and subtask index
		final RuntimeContext mockedContext = mock(RuntimeContext.class);
		when(mockedContext.getNumberOfParallelSubtasks()).thenReturn(numParallelSubtasks);
		when(mockedContext.getIndexOfThisSubtask()).thenReturn(subtaskIndex);
		return mockedContext;
	}
}
// ----------------------------------------------------------------------
// Tests shard-to-subtask distribution with a custom KinesisShardAssigner
// ----------------------------------------------------------------------
/**
 * Verifies that a custom {@link KinesisShardAssigner} determines which subtask subscribes to which
 * shards. An assigner that returns the same value for every shard must route all shards of the
 * stream to the subtask whose index equals that value, and none to any other subtask.
 */
@Test
public void testShardToSubtaskMappingWithCustomHashFunction() throws Exception {
	final int totalCountOfSubtasks = 10;
	final int shardCount = 3;

	// the stream setup does not depend on the assigner under test, so build it only once
	final List<String> fakeStreams = new LinkedList<>();
	fakeStreams.add("fakeStream");
	final Map<String, Integer> streamToShardCount = new HashMap<>();
	streamToShardCount.put("fakeStream", shardCount);

	for (int i = 0; i < 2; i++) {
		final int hash = i;
		// returns the same value for every shard, so all shards should land on one subtask
		final KinesisShardAssigner allShardsSingleSubtaskFn = (shard, subtasks) -> hash;

		for (int j = 0; j < totalCountOfSubtasks; j++) {
			final int subtaskIndex = j;

			final TestableKinesisDataFetcher<String> fetcher =
				new TestableKinesisDataFetcher<>(
					fakeStreams,
					new TestSourceContext<>(),
					new Properties(),
					new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
					totalCountOfSubtasks,
					subtaskIndex,
					new AtomicReference<>(),
					new LinkedList<>(),
					KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(fakeStreams),
					FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
			// replace the fetcher's default shard assigner with the custom one under test
			Whitebox.setInternalState(fetcher, "shardAssigner", allShardsSingleSubtaskFn);

			final List<StreamShardHandle> shards;
			try {
				shards = fetcher.discoverNewShardsToSubscribe();
			} finally {
				// release the fetcher even if discovery fails
				fetcher.shutdownFetcher();
			}

			final String msg = String.format("for hash=%d, subtask=%d", hash, subtaskIndex);
			if (subtaskIndex == hash) {
				assertEquals(msg, shardCount, shards.size());
			} else {
				assertEquals(msg, 0, shards.size());
			}
		}
	}
}
/**
 * Checks {@link KinesisDataFetcher#isThisSubtaskShouldSubscribeTo} for a set of argument
 * combinations with a parallelism of two.
 */
@Test
public void testIsThisSubtaskShouldSubscribeTo() {
	// each row is the (first, second, third) argument triple passed to the method under test;
	// NOTE(review): presumably (shard hash, number of subtasks, subtask index) — confirm against the signature
	final int[][] arguments = {
		{0, 2, 0},
		{1, 2, 0},
		{2, 2, 0},
		{0, 2, 1},
		{1, 2, 1},
		{2, 2, 1},
	};
	// expected result for the row at the same position
	final boolean[] expectedSubscription = {true, false, true, false, true, false};

	for (int row = 0; row < arguments.length; row++) {
		final int[] args = arguments[row];
		final boolean shouldSubscribe = KinesisDataFetcher.isThisSubtaskShouldSubscribeTo(args[0], args[1], args[2]);
		if (expectedSubscription[row]) {
			assertTrue(shouldSubscribe);
		} else {
			assertFalse(shouldSubscribe);
		}
	}
}
}