blob: e40362337204a5d03eaed547d4a699c8b2a13036 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kinesis.testutils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Factory for different kinds of fake Kinesis behaviours using the {@link KinesisProxyInterface} interface.
*/
public class FakeKinesisBehavioursFactory {
// ------------------------------------------------------------------------
// Behaviours related to shard listing and resharding, used in KinesisDataFetcherTest
// ------------------------------------------------------------------------
public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {
return new KinesisProxyInterface() {
@Override
public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
return new GetShardListResult(); // not setting any retrieved shards for result
}
@Override
public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
return null;
}
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
return null;
}
};
}
public static KinesisProxyInterface nonReshardedStreamsBehaviour(Map<String, Integer> streamsToShardCount) {
return new NonReshardedStreamsKinesis(streamsToShardCount);
}
// ------------------------------------------------------------------------
// Behaviours related to fetching records, used mainly in ShardConsumerTest
// ------------------------------------------------------------------------
public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(
final int numOfRecords,
final int numOfGetRecordsCalls,
final long millisBehindLatest) {
return new SingleShardEmittingFixNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls, millisBehindLatest);
}
public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
final int numOfRecords,
final int numOfGetRecordsCall,
final int orderOfCallToExpire,
final long millisBehindLatest) {
return new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
numOfRecords, numOfGetRecordsCall, orderOfCallToExpire, millisBehindLatest);
}
private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
private final long millisBehindLatest;
private final int orderOfCallToExpire;
private boolean expiredOnceAlready = false;
private boolean expiredIteratorRefreshed = false;
public SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
final int numOfRecords,
final int numOfGetRecordsCalls,
final int orderOfCallToExpire,
final long millisBehindLatest) {
super(numOfRecords, numOfGetRecordsCalls, millisBehindLatest);
checkArgument(orderOfCallToExpire <= numOfGetRecordsCalls,
"can not test unexpected expired iterator if orderOfCallToExpire is larger than numOfGetRecordsCalls");
this.millisBehindLatest = millisBehindLatest;
this.orderOfCallToExpire = orderOfCallToExpire;
}
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
if ((Integer.valueOf(shardIterator) == orderOfCallToExpire - 1) && !expiredOnceAlready) {
// we fake only once the expired iterator exception at the specified get records attempt order
expiredOnceAlready = true;
throw new ExpiredIteratorException("Artificial expired shard iterator");
} else if (expiredOnceAlready && !expiredIteratorRefreshed) {
// if we've thrown the expired iterator exception already, but the iterator was not refreshed,
// throw a hard exception to the test that is testing this Kinesis behaviour
throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
} else {
// assuming that the maxRecordsToGet is always large enough
return new GetRecordsResult()
.withRecords(shardItrToRecordBatch.get(shardIterator))
.withMillisBehindLatest(millisBehindLatest)
.withNextShardIterator(
(Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
}
}
@Override
public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
if (!expiredOnceAlready) {
// for the first call, just return the iterator of the first batch of records
return "0";
} else {
// fake the iterator refresh when this is called again after getRecords throws expired iterator
// exception on the orderOfCallToExpire attempt
expiredIteratorRefreshed = true;
return String.valueOf(orderOfCallToExpire - 1);
}
}
}
private static class SingleShardEmittingFixNumOfRecordsKinesis implements KinesisProxyInterface {
protected final int totalNumOfGetRecordsCalls;
protected final int totalNumOfRecords;
private final long millisBehindLatest;
protected final Map<String, List<Record>> shardItrToRecordBatch;
public SingleShardEmittingFixNumOfRecordsKinesis(
final int numOfRecords,
final int numOfGetRecordsCalls,
final long millistBehindLatest) {
this.totalNumOfRecords = numOfRecords;
this.totalNumOfGetRecordsCalls = numOfGetRecordsCalls;
this.millisBehindLatest = millistBehindLatest;
// initialize the record batches that we will be fetched
this.shardItrToRecordBatch = new HashMap<>();
int numOfAlreadyPartitionedRecords = 0;
int numOfRecordsPerBatch = numOfRecords / numOfGetRecordsCalls + 1;
for (int batch = 0; batch < totalNumOfGetRecordsCalls; batch++) {
if (batch != totalNumOfGetRecordsCalls - 1) {
shardItrToRecordBatch.put(
String.valueOf(batch),
createRecordBatchWithRange(
numOfAlreadyPartitionedRecords,
numOfAlreadyPartitionedRecords + numOfRecordsPerBatch));
numOfAlreadyPartitionedRecords += numOfRecordsPerBatch;
} else {
shardItrToRecordBatch.put(
String.valueOf(batch),
createRecordBatchWithRange(
numOfAlreadyPartitionedRecords,
totalNumOfRecords));
}
}
}
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
// assuming that the maxRecordsToGet is always large enough
return new GetRecordsResult()
.withRecords(shardItrToRecordBatch.get(shardIterator))
.withMillisBehindLatest(millisBehindLatest)
.withNextShardIterator(
(Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
}
@Override
public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
// this will be called only one time per ShardConsumer;
// so, simply return the iterator of the first batch of records
return "0";
}
@Override
public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
return null;
}
public static List<Record> createRecordBatchWithRange(int min, int max) {
List<Record> batch = new LinkedList<>();
for (int i = min; i < max; i++) {
batch.add(
new Record()
.withData(ByteBuffer.wrap(String.valueOf(i).getBytes(ConfigConstants.DEFAULT_CHARSET)))
.withPartitionKey(UUID.randomUUID().toString())
.withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
.withSequenceNumber(String.valueOf(i)));
}
return batch;
}
}
private static class NonReshardedStreamsKinesis implements KinesisProxyInterface {
private Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<>();
public NonReshardedStreamsKinesis(Map<String, Integer> streamsToShardCount) {
for (Map.Entry<String, Integer> streamToShardCount : streamsToShardCount.entrySet()) {
String streamName = streamToShardCount.getKey();
int shardCount = streamToShardCount.getValue();
if (shardCount == 0) {
// don't do anything
} else {
List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount);
for (int i = 0; i < shardCount; i++) {
shardsOfStream.add(
new StreamShardHandle(
streamName,
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))));
}
streamsWithListOfShards.put(streamName, shardsOfStream);
}
}
}
@Override
public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
GetShardListResult result = new GetShardListResult();
for (Map.Entry<String, List<StreamShardHandle>> streamsWithShards : streamsWithListOfShards.entrySet()) {
String streamName = streamsWithShards.getKey();
for (StreamShardHandle shard : streamsWithShards.getValue()) {
if (streamNamesWithLastSeenShardIds.get(streamName) == null) {
result.addRetrievedShardToStream(streamName, shard);
} else {
if (StreamShardHandle.compareShardIds(
shard.getShard().getShardId(), streamNamesWithLastSeenShardIds.get(streamName)) > 0) {
result.addRetrievedShardToStream(streamName, shard);
}
}
}
}
return result;
}
@Override
public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
return null;
}
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
return null;
}
}
}