blob: 46cc797ff7d3777dc2da8d888005afef4dd25d0b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kinesis.testutils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.util.Preconditions;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Factory for different kinds of fake Kinesis behaviours using the {@link KinesisProxyInterface}
* interface.
*/
public class FakeKinesisBehavioursFactory {
// ------------------------------------------------------------------------
// Behaviours related to shard listing and resharding, used in KinesisDataFetcherTest
// ------------------------------------------------------------------------
/**
 * Returns a {@link KinesisProxyInterface} whose shard discovery never finds any shards; the
 * iterator- and record-related calls are inert and return {@code null}.
 */
public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {
    return new KinesisProxyInterface() {
        @Override
        public GetShardListResult getShardList(
                Map<String, String> streamNamesWithLastSeenShardIds) {
            // an empty result: no retrieved shards are ever registered for any stream
            return new GetShardListResult();
        }

        @Override
        public String getShardIterator(
                StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
            // never consulted by tests using this behaviour
            return null;
        }

        @Override
        public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
            // never consulted by tests using this behaviour
            return null;
        }
    };
}
/**
 * Returns a fake Kinesis proxy backed by a fixed, never-resharded set of streams.
 *
 * @param streamsToShardCount stream name mapped to the number of shards it should expose
 */
public static KinesisProxyInterface nonReshardedStreamsBehaviour(
        Map<String, Integer> streamsToShardCount) {
    return new NonReshardedStreamsKinesis(streamsToShardCount);
}
// ------------------------------------------------------------------------
// Behaviours related to fetching records, used mainly in ShardConsumerTest
// ------------------------------------------------------------------------
/**
 * Returns a fake single-shard Kinesis proxy whose {@code getRecords()} calls always come back
 * empty; the shard stays open for {@code numberOfIterations} calls before the next-shard
 * iterator turns {@code null}.
 *
 * @param numberOfIterations how many empty batches to serve before the shard is exhausted
 */
public static KinesisProxyInterface emptyShard(final int numberOfIterations) {
    return new SingleShardEmittingZeroRecords(numberOfIterations);
}
/**
 * Returns a fake single-shard Kinesis proxy that spreads {@code numOfRecords} records over
 * exactly {@code numOfGetRecordsCalls} {@code getRecords()} batches.
 *
 * @param numOfRecords total record count emitted by the fake shard
 * @param numOfGetRecordsCalls number of batches the records are partitioned into
 * @param millisBehindLatest the millisBehindLatest value reported with every batch
 */
public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(
        final int numOfRecords, final int numOfGetRecordsCalls, final long millisBehindLatest) {
    return new SingleShardEmittingFixNumOfRecordsKinesis(
            numOfRecords, numOfGetRecordsCalls, millisBehindLatest);
}
/**
 * Like {@link #totalNumOfRecordsAfterNumOfGetRecordsCalls(int, int, long)}, but additionally
 * fakes a single unexpected {@code ExpiredIteratorException} on the {@code getRecords()} call
 * of order {@code orderOfCallToExpire}.
 *
 * @param numOfRecords total record count emitted by the fake shard
 * @param numOfGetRecordsCall number of batches the records are partitioned into
 * @param orderOfCallToExpire the 1-based order of the getRecords() call that should expire
 * @param millisBehindLatest the millisBehindLatest value reported with every batch
 */
public static KinesisProxyInterface
        totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
                final int numOfRecords,
                final int numOfGetRecordsCall,
                final int orderOfCallToExpire,
                final long millisBehindLatest) {
    return new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
            numOfRecords, numOfGetRecordsCall, orderOfCallToExpire, millisBehindLatest);
}
/**
 * Returns a fake single-shard Kinesis proxy that models adaptive reads: the first batch holds
 * {@code numOfRecords} records, and subsequent batch sizes are derived from the average record
 * size of the previous batch.
 *
 * @param numOfRecords number of records in the initial batch
 * @param numOfGetRecordsCalls number of batches available in the fake stream
 * @param millisBehindLatest the millisBehindLatest value reported with every batch
 */
public static KinesisProxyInterface
        initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads(
                final int numOfRecords,
                final int numOfGetRecordsCalls,
                final long millisBehindLatest) {
    return new SingleShardEmittingAdaptiveNumOfRecordsKinesis(
            numOfRecords, numOfGetRecordsCalls, millisBehindLatest);
}
/**
 * Creates a mocked Kinesis proxy that emits aggregated records from a fake stream.
 *
 * <p>The stream holds {@code numOfGetRecordsCalls} batches; each batch carries {@code
 * numOfAggregatedRecords} aggregated records, and every aggregated record wraps {@code
 * numOfChildRecords} child records. In total {@code numOfGetRecordsCalls *
 * numOfAggregatedRecords * numOfChildRecords} records are emitted.
 *
 * @param numOfAggregatedRecords the number of aggregated records per batch
 * @param numOfChildRecords the number of child records in each aggregated record
 * @param numOfGetRecordsCalls the number of batches available in the fake stream
 */
public static KinesisProxyInterface aggregatedRecords(
        final int numOfAggregatedRecords,
        final int numOfChildRecords,
        final int numOfGetRecordsCalls) {
    return new SingleShardEmittingAggregatedRecordsKinesis(
            numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls);
}
/**
 * Returns a fake Kinesis proxy whose shards are fed from the given blocking queues; each
 * {@code getRecords()} call polls the matching queue for the next payload.
 *
 * @param streamsToShardQueues stream name mapped to one queue per shard of that stream
 */
public static KinesisProxyInterface blockingQueueGetRecords(
        Map<String, List<BlockingQueue<String>>> streamsToShardQueues) {
    return new BlockingQueueKinesis(streamsToShardQueues);
}
/**
 * A fake single-shard stream that always returns empty record batches. The shard remains open
 * for a configured number of iterations, after which the next-shard iterator becomes null.
 */
private static class SingleShardEmittingZeroRecords implements KinesisProxyInterface {

    // counts down toward zero; each handed-out iterator consumes one unit of the budget
    private int itersLeft;

    private SingleShardEmittingZeroRecords(int remainingIterators) {
        this.itersLeft = remainingIterators;
    }

    @Override
    public String getShardIterator(
            StreamShardHandle shard, String shardIteratorType, Object startingMarker)
            throws InterruptedException {
        // hand out the current counter value as the iterator, then decrement
        return String.valueOf(itersLeft--);
    }

    @Override
    public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet)
            throws InterruptedException {
        // always an empty batch; the next iterator turns null once the budget is used up
        String nextIterator = (itersLeft == 0) ? null : String.valueOf(itersLeft--);
        return new GetRecordsResult()
                .withMillisBehindLatest(0L)
                .withNextShardIterator(nextIterator);
    }

    @Override
    public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds)
            throws InterruptedException {
        // shard listing is not exercised by tests using this behaviour
        return null;
    }
}
/**
 * Extends {@link SingleShardEmittingFixNumOfRecordsKinesis} to additionally fake exactly one
 * {@link ExpiredIteratorException} on the getRecords() call of (1-based) order {@code
 * orderOfCallToExpire}. After the exception, the consumer under test is expected to "refresh"
 * the iterator by calling getShardIterator() again before retrying getRecords(); a retry
 * without the refresh fails the test with a hard RuntimeException.
 */
private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis
        extends SingleShardEmittingFixNumOfRecordsKinesis {

    private final long millisBehindLatest;
    private final int orderOfCallToExpire;

    // set once the artificial expiration has been thrown (it is thrown at most once)
    private boolean expiredOnceAlready = false;
    // set once the consumer refreshed the iterator via getShardIterator() after expiration
    private boolean expiredIteratorRefreshed = false;

    public SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
            final int numOfRecords,
            final int numOfGetRecordsCalls,
            final int orderOfCallToExpire,
            final long millisBehindLatest) {
        super(numOfRecords, numOfGetRecordsCalls, millisBehindLatest);
        checkArgument(
                orderOfCallToExpire <= numOfGetRecordsCalls,
                "can not test unexpected expired iterator if orderOfCallToExpire is larger than numOfGetRecordsCalls");
        this.millisBehindLatest = millisBehindLatest;
        this.orderOfCallToExpire = orderOfCallToExpire;
    }

    @Override
    public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
        // shard iterators are the 0-based batch indices; orderOfCallToExpire is 1-based
        if ((Integer.parseInt(shardIterator) == orderOfCallToExpire - 1)
                && !expiredOnceAlready) {
            // we fake only once the expired iterator exception at the specified get records
            // attempt order
            expiredOnceAlready = true;
            throw new ExpiredIteratorException("Artificial expired shard iterator");
        } else if (expiredOnceAlready && !expiredIteratorRefreshed) {
            // if we've thrown the expired iterator exception already, but the iterator was not
            // refreshed,
            // throw a hard exception to the test that is testing this Kinesis behaviour
            throw new RuntimeException(
                    "expired shard iterator was not refreshed on the next getRecords() call");
        } else {
            // assuming that the maxRecordsToGet is always large enough
            return new GetRecordsResult()
                    .withRecords(shardItrToRecordBatch.get(shardIterator))
                    .withMillisBehindLatest(millisBehindLatest)
                    .withNextShardIterator(
                            (Integer.parseInt(shardIterator) == totalNumOfGetRecordsCalls - 1)
                                    ? null
                                    : String.valueOf(
                                            Integer.parseInt(shardIterator)
                                                    + 1)); // last next shard iterator is null
        }
    }

    @Override
    public String getShardIterator(
            StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
        if (!expiredOnceAlready) {
            // for the first call, just return the iterator of the first batch of records
            return "0";
        } else {
            // fake the iterator refresh when this is called again after getRecords throws
            // expired iterator
            // exception on the orderOfCallToExpire attempt
            expiredIteratorRefreshed = true;
            return String.valueOf(orderOfCallToExpire - 1);
        }
    }
}
/**
 * A fake single-shard Kinesis stream that emits exactly {@code numOfRecords} records spread
 * over {@code numOfGetRecordsCalls} getRecords() batches. Shard iterators are the batch
 * indices as strings ("0", "1", ...); the next-shard iterator of the final batch is null.
 */
private static class SingleShardEmittingFixNumOfRecordsKinesis
        implements KinesisProxyInterface {

    protected final int totalNumOfGetRecordsCalls;
    protected final int totalNumOfRecords;

    private final long millisBehindLatest;

    /** Pre-built record batches, keyed by the shard iterator (the batch index as a string). */
    protected final Map<String, List<Record>> shardItrToRecordBatch;

    /**
     * @param numOfRecords total number of records the fake shard holds
     * @param numOfGetRecordsCalls number of getRecords() batches the records are spread over
     * @param millisBehindLatest the millisBehindLatest value reported on every batch
     */
    public SingleShardEmittingFixNumOfRecordsKinesis(
            final int numOfRecords,
            final int numOfGetRecordsCalls,
            final long millisBehindLatest) { // fixed parameter-name typo ("millistBehindLatest")
        this.totalNumOfRecords = numOfRecords;
        this.totalNumOfGetRecordsCalls = numOfGetRecordsCalls;
        this.millisBehindLatest = millisBehindLatest;

        // pre-build the record batches to be fetched: every batch except the last holds
        // (numOfRecords / numOfGetRecordsCalls + 1) records; the last batch takes the remainder
        this.shardItrToRecordBatch = new HashMap<>();
        int numOfAlreadyPartitionedRecords = 0;
        int numOfRecordsPerBatch = numOfRecords / numOfGetRecordsCalls + 1;
        for (int batch = 0; batch < totalNumOfGetRecordsCalls; batch++) {
            if (batch != totalNumOfGetRecordsCalls - 1) {
                shardItrToRecordBatch.put(
                        String.valueOf(batch),
                        createRecordBatchWithRange(
                                numOfAlreadyPartitionedRecords,
                                numOfAlreadyPartitionedRecords + numOfRecordsPerBatch));
                numOfAlreadyPartitionedRecords += numOfRecordsPerBatch;
            } else {
                shardItrToRecordBatch.put(
                        String.valueOf(batch),
                        createRecordBatchWithRange(
                                numOfAlreadyPartitionedRecords, totalNumOfRecords));
            }
        }
    }

    @Override
    public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
        // assuming that the maxRecordsToGet is always large enough to return a whole batch
        return new GetRecordsResult()
                .withRecords(shardItrToRecordBatch.get(shardIterator))
                .withMillisBehindLatest(millisBehindLatest)
                .withNextShardIterator(
                        (Integer.parseInt(shardIterator) == totalNumOfGetRecordsCalls - 1)
                                ? null
                                : String.valueOf(
                                        Integer.parseInt(shardIterator)
                                                + 1)); // last next shard iterator is null
    }

    @Override
    public String getShardIterator(
            StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
        // this will be called only one time per ShardConsumer;
        // so, simply return the iterator of the first batch of records
        return "0";
    }

    @Override
    public GetShardListResult getShardList(
            Map<String, String> streamNamesWithLastSeenShardIds) {
        // shard listing is not exercised by tests using this behaviour
        return null;
    }

    /**
     * Creates records with sequence numbers (and payloads) {@code min} (inclusive) through
     * {@code max} (exclusive), each with a random partition key and the current arrival time.
     */
    public static List<Record> createRecordBatchWithRange(int min, int max) {
        List<Record> batch = new LinkedList<>();
        for (int i = min; i < max; i++) {
            batch.add(
                    new Record()
                            .withData(
                                    ByteBuffer.wrap(
                                            String.valueOf(i)
                                                    .getBytes(ConfigConstants.DEFAULT_CHARSET)))
                            .withPartitionKey(UUID.randomUUID().toString())
                            .withApproximateArrivalTimestamp(
                                    new Date(System.currentTimeMillis()))
                            .withSequenceNumber(String.valueOf(i)));
        }
        return batch;
    }
}
/**
 * A fake single-shard stream modelling adaptive reads: the first batch holds {@code
 * numOfRecords} records of roughly 10 KB each, and every following batch is sized from the
 * per-shard 2 MB/sec read limit divided by the observed bytes-per-second rate (average record
 * size of the previous batch at the default getRecords interval).
 */
private static class SingleShardEmittingAdaptiveNumOfRecordsKinesis
        extends SingleShardEmittingKinesis {

    // NOTE(review): static mutable state shared by ALL instances of this class; interleaved or
    // concurrent instantiation would race on it. It is written by createRecordBatchWithRange()
    // and read by initShardItrToRecordBatch() within the same loop iteration.
    protected static long averageRecordSizeBytes = 0L;

    // Kinesis per-shard read throughput limit: 2 MB per second
    private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L;

    public SingleShardEmittingAdaptiveNumOfRecordsKinesis(
            final int numOfRecords,
            final int numOfGetRecordsCalls,
            final long millisBehindLatest) {
        super(
                initShardItrToRecordBatch(numOfRecords, numOfGetRecordsCalls),
                millisBehindLatest);
    }

    /**
     * Pre-builds one record batch per expected getRecords() call, keyed by batch index.
     *
     * <p>Order matters inside the loop: createRecordBatchWithRange() updates
     * {@code averageRecordSizeBytes}, which the subsequent batch-size computation divides by.
     * NOTE(review): if the first batch is empty (numOfRecords == 0), averageRecordSizeBytes
     * stays 0 and the division below would throw ArithmeticException — presumably callers
     * always pass numOfRecords > 0; verify against the tests using this behaviour.
     */
    private static Map<String, List<Record>> initShardItrToRecordBatch(
            final int numOfRecords, final int numOfGetRecordsCalls) {
        // initialize the record batches that we will be fetched
        Map<String, List<Record>> shardItrToRecordBatch = new HashMap<>();
        int numOfAlreadyPartitionedRecords = 0;
        int numOfRecordsPerBatch = numOfRecords;
        for (int batch = 0; batch < numOfGetRecordsCalls; batch++) {
            shardItrToRecordBatch.put(
                    String.valueOf(batch),
                    createRecordBatchWithRange(
                            numOfAlreadyPartitionedRecords,
                            numOfAlreadyPartitionedRecords + numOfRecordsPerBatch));
            numOfAlreadyPartitionedRecords += numOfRecordsPerBatch;
            // next batch size = byte/sec limit divided by the observed byte/sec rate
            // (average record size scaled to the default getRecords polling interval)
            numOfRecordsPerBatch =
                    (int)
                            (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT
                                    / (averageRecordSizeBytes
                                            * 1000L
                                            / ConsumerConfigConstants
                                                    .DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS));
        }
        return shardItrToRecordBatch;
    }

    /**
     * Creates {@code max - min} records of ~10 KB each and, as a side effect, records the
     * batch's average record size in the static {@code averageRecordSizeBytes} field.
     */
    private static List<Record> createRecordBatchWithRange(int min, int max) {
        List<Record> batch = new LinkedList<>();
        long sumRecordBatchBytes = 0L;
        // Create record of size 10Kb
        String data = createDataSize(10 * 1024L);
        for (int i = min; i < max; i++) {
            Record record =
                    new Record()
                            .withData(
                                    ByteBuffer.wrap(
                                            data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
                            .withPartitionKey(UUID.randomUUID().toString())
                            .withApproximateArrivalTimestamp(
                                    new Date(System.currentTimeMillis()))
                            .withSequenceNumber(String.valueOf(i));
            batch.add(record);
            sumRecordBatchBytes += record.getData().remaining();
        }
        if (batch.size() != 0) {
            averageRecordSizeBytes = sumRecordBatchBytes / batch.size();
        }
        return batch;
    }

    /** Returns a string of {@code msgSize} NUL characters, used as a fixed-size payload. */
    private static String createDataSize(final long msgSize) {
        char[] data = new char[(int) msgSize];
        return new String(data);
    }
}
/**
 * A fake single-shard stream pre-populated with batches of aggregated (KPL-style) records;
 * the iterator/record mechanics are inherited from {@link SingleShardEmittingKinesis}.
 */
private static class SingleShardEmittingAggregatedRecordsKinesis
        extends SingleShardEmittingKinesis {

    public SingleShardEmittingAggregatedRecordsKinesis(
            final int numOfAggregatedRecords,
            final int numOfChildRecords,
            final int numOfGetRecordsCalls) {
        super(
                initShardItrToRecordBatch(
                        numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls));
    }

    /** Pre-builds one aggregated-record batch per expected getRecords() call. */
    private static Map<String, List<Record>> initShardItrToRecordBatch(
            final int numOfAggregatedRecords,
            final int numOfChildRecords,
            final int numOfGetRecordsCalls) {
        Map<String, List<Record>> batches = new HashMap<>();
        // sequence numbers increase monotonically across all batches
        AtomicInteger sequenceNumber = new AtomicInteger();
        for (int iterator = 0; iterator < numOfGetRecordsCalls; iterator++) {
            batches.put(
                    String.valueOf(iterator),
                    TestUtils.createAggregatedRecordBatch(
                            numOfAggregatedRecords, numOfChildRecords, sequenceNumber));
        }
        return batches;
    }
}
/** A helper base class used to emit records from a single-shard fake Kinesis stream. */
private abstract static class SingleShardEmittingKinesis implements KinesisProxyInterface {

    private final long millisBehindLatest;

    // pre-built record batches, keyed by the shard iterator (the batch index as a string)
    private final Map<String, List<Record>> shardItrToRecordBatch;

    protected SingleShardEmittingKinesis(
            final Map<String, List<Record>> shardItrToRecordBatch) {
        // defaults to reporting a shard that is fully caught up
        this(shardItrToRecordBatch, 0L);
    }

    protected SingleShardEmittingKinesis(
            final Map<String, List<Record>> shardItrToRecordBatch,
            final long millisBehindLatest) {
        this.shardItrToRecordBatch = shardItrToRecordBatch;
        this.millisBehindLatest = millisBehindLatest;
    }

    @Override
    public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
        int batchIndex = Integer.parseInt(shardIterator);
        boolean isLastBatch = batchIndex == shardItrToRecordBatch.size() - 1;
        // a null next-iterator signals the consumer that the shard is exhausted;
        // maxRecordsToGet is assumed to always be large enough to hold a whole batch
        return new GetRecordsResult()
                .withRecords(shardItrToRecordBatch.get(shardIterator))
                .withNextShardIterator(isLastBatch ? null : String.valueOf(batchIndex + 1))
                .withMillisBehindLatest(millisBehindLatest);
    }

    @Override
    public String getShardIterator(
            StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
        // called only once per ShardConsumer, so always hand out the first batch's iterator
        return "0";
    }

    @Override
    public GetShardListResult getShardList(
            Map<String, String> streamNamesWithLastSeenShardIds) {
        // shard listing is not exercised by tests using this behaviour
        return null;
    }
}
/**
 * A fake Kinesis proxy exposing a fixed (never-resharded) set of shards per stream. Only shard
 * listing is supported; iterator- and record-related calls are inert.
 */
private static class NonReshardedStreamsKinesis implements KinesisProxyInterface {

    private final Map<String, List<StreamShardHandle>> streamsWithListOfShards =
            new HashMap<>();

    public NonReshardedStreamsKinesis(Map<String, Integer> streamsToShardCount) {
        for (Map.Entry<String, Integer> entry : streamsToShardCount.entrySet()) {
            String streamName = entry.getKey();
            int shardCount = entry.getValue();
            if (shardCount == 0) {
                // streams without shards are simply not registered
                continue;
            }
            List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount);
            for (int order = 0; order < shardCount; order++) {
                shardsOfStream.add(
                        createDummyStreamShardHandle(
                                streamName,
                                KinesisShardIdGenerator.generateFromShardOrder(order)));
            }
            streamsWithListOfShards.put(streamName, shardsOfStream);
        }
    }

    @Override
    public GetShardListResult getShardList(
            Map<String, String> streamNamesWithLastSeenShardIds) {
        GetShardListResult result = new GetShardListResult();
        for (Map.Entry<String, List<StreamShardHandle>> entry :
                streamsWithListOfShards.entrySet()) {
            String streamName = entry.getKey();
            String lastSeenShardId = streamNamesWithLastSeenShardIds.get(streamName);
            for (StreamShardHandle shard : entry.getValue()) {
                // emit every shard if nothing was seen yet; otherwise only strictly newer ones
                if (lastSeenShardId == null
                        || compareShardIds(shard.getShard().getShardId(), lastSeenShardId)
                                > 0) {
                    result.addRetrievedShardToStream(streamName, shard);
                }
            }
        }
        return result;
    }

    @Override
    public String getShardIterator(
            StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
        // not exercised by tests using this behaviour
        return null;
    }

    @Override
    public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
        // not exercised by tests using this behaviour
        return null;
    }

    /**
     * Utility function to compare two shard ids.
     *
     * @param firstShardId first shard id to compare
     * @param secondShardId second shard id to compare
     * @return a value less than 0 if the first shard id is smaller than the second shard id,
     *     a value larger than 0 if the first shard id is larger than the second shard id, or 0
     *     if they are equal
     * @throws IllegalArgumentException if either shard id is not in the valid format
     */
    private static int compareShardIds(String firstShardId, String secondShardId) {
        if (!isValidShardId(firstShardId)) {
            throw new IllegalArgumentException("The first shard id has invalid format.");
        }
        if (!isValidShardId(secondShardId)) {
            throw new IllegalArgumentException("The second shard id has invalid format.");
        }
        // the numeric portion starts right after the "shardId-" prefix (index 8)
        return Long.compare(
                Long.parseLong(firstShardId.substring(8)),
                Long.parseLong(secondShardId.substring(8)));
    }

    /**
     * Checks if a shard id has valid format. Kinesis stream shard ids have 12-digit numbers
     * left-padded with 0's, prefixed with "shardId-", ex. "shardId-000000000015".
     *
     * @param shardId the shard id to check
     * @return whether the shard id is valid
     */
    private static boolean isValidShardId(String shardId) {
        return shardId != null && shardId.matches("^shardId-\\d{12}");
    }
}
/**
 * A fake Kinesis proxy whose per-shard record source is a caller-supplied {@link
 * BlockingQueue} of string payloads; each getRecords() call polls the matching queue for up to
 * 100 ms and emits at most one record per call.
 */
private static class BlockingQueueKinesis implements KinesisProxyInterface {

    private final Map<String, List<StreamShardHandle>> streamsWithListOfShards =
            new HashMap<>();

    /** Maps the synthetic shard iterator (stream name + shard id) to its backing queue. */
    private final Map<String, BlockingQueue<String>> shardIteratorToQueueMap = new HashMap<>();

    /** Builds the stable iterator key {@code <streamName>-<shardId>} for a shard. */
    private static String getShardIterator(StreamShardHandle shardHandle) {
        return shardHandle.getStreamName() + "-" + shardHandle.getShard().getShardId();
    }

    /**
     * @param streamsToShardCount stream name mapped to one payload queue per shard; streams
     *     with an empty queue list expose no shards
     */
    public BlockingQueueKinesis(Map<String, List<BlockingQueue<String>>> streamsToShardCount) {
        for (Map.Entry<String, List<BlockingQueue<String>>> streamToShardQueues :
                streamsToShardCount.entrySet()) {
            String streamName = streamToShardQueues.getKey();
            int shardCount = streamToShardQueues.getValue().size();
            if (shardCount == 0) {
                // streams without shards are simply not registered
            } else {
                List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount);
                for (int i = 0; i < shardCount; i++) {
                    // dummy sequence-number/hash-key ranges: only the shard id matters here
                    StreamShardHandle shardHandle =
                            new StreamShardHandle(
                                    streamName,
                                    new Shard()
                                            .withShardId(
                                                    KinesisShardIdGenerator
                                                            .generateFromShardOrder(i))
                                            .withSequenceNumberRange(
                                                    new SequenceNumberRange()
                                                            .withStartingSequenceNumber("0"))
                                            .withHashKeyRange(
                                                    new HashKeyRange()
                                                            .withStartingHashKey("0")
                                                            .withEndingHashKey("0")));
                    shardsOfStream.add(shardHandle);
                    shardIteratorToQueueMap.put(
                            getShardIterator(shardHandle),
                            streamToShardQueues.getValue().get(i));
                }
                streamsWithListOfShards.put(streamName, shardsOfStream);
            }
        }
    }

    @Override
    public GetShardListResult getShardList(
            Map<String, String> streamNamesWithLastSeenShardIds) {
        GetShardListResult result = new GetShardListResult();
        for (Map.Entry<String, List<StreamShardHandle>> streamsWithShards :
                streamsWithListOfShards.entrySet()) {
            String streamName = streamsWithShards.getKey();
            for (StreamShardHandle shard : streamsWithShards.getValue()) {
                if (streamNamesWithLastSeenShardIds.get(streamName) == null) {
                    // nothing seen yet for this stream: report every shard
                    result.addRetrievedShardToStream(streamName, shard);
                } else {
                    // otherwise report only shards strictly newer than the last seen one
                    if (StreamShardHandle.compareShardIds(
                                    shard.getShard().getShardId(),
                                    streamNamesWithLastSeenShardIds.get(streamName))
                            > 0) {
                        result.addRetrievedShardToStream(streamName, shard);
                    }
                }
            }
        }
        return result;
    }

    @Override
    public String getShardIterator(
            StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
        return getShardIterator(shard);
    }

    @Override
    public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
        BlockingQueue<String> queue =
                Preconditions.checkNotNull(
                        this.shardIteratorToQueueMap.get(shardIterator),
                        "no queue for iterator %s",
                        shardIterator);
        List<Record> records = Collections.emptyList();
        try {
            // best-effort poll: an empty result simply means no payload arrived in time
            String data = queue.poll(100, TimeUnit.MILLISECONDS);
            if (data != null) {
                Record record =
                        new Record()
                                .withData(
                                        ByteBuffer.wrap(
                                                data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
                                .withPartitionKey(UUID.randomUUID().toString())
                                .withApproximateArrivalTimestamp(
                                        new Date(System.currentTimeMillis()))
                                .withSequenceNumber(String.valueOf(0));
                records = Collections.singletonList(record);
            }
        } catch (InterruptedException e) {
            // restore the interrupt status (it must not be swallowed silently) and signal
            // shard completion by returning a null next-shard iterator
            Thread.currentThread().interrupt();
            shardIterator = null;
        }
        return new GetRecordsResult()
                .withRecords(records)
                .withMillisBehindLatest(0L)
                .withNextShardIterator(shardIterator);
    }
}
}