| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.connectors.kinesis.testutils; |
| |
| import org.apache.flink.configuration.ConfigConstants; |
| import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; |
| import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; |
| import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; |
| import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; |
| import org.apache.flink.util.Preconditions; |
| |
| import com.amazonaws.services.kinesis.model.ExpiredIteratorException; |
| import com.amazonaws.services.kinesis.model.GetRecordsResult; |
| import com.amazonaws.services.kinesis.model.HashKeyRange; |
| import com.amazonaws.services.kinesis.model.Record; |
| import com.amazonaws.services.kinesis.model.SequenceNumberRange; |
| import com.amazonaws.services.kinesis.model.Shard; |
| |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle; |
| import static org.apache.flink.util.Preconditions.checkArgument; |
| |
| /** |
| * Factory for different kinds of fake Kinesis behaviours using the {@link KinesisProxyInterface} |
| * interface. |
| */ |
| public class FakeKinesisBehavioursFactory { |
| |
| // ------------------------------------------------------------------------ |
| // Behaviours related to shard listing and resharding, used in KinesisDataFetcherTest |
| // ------------------------------------------------------------------------ |
| |
| public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() { |
| |
| return new KinesisProxyInterface() { |
| @Override |
| public GetShardListResult getShardList( |
| Map<String, String> streamNamesWithLastSeenShardIds) { |
| return new GetShardListResult(); // not setting any retrieved shards for result |
| } |
| |
| @Override |
| public String getShardIterator( |
| StreamShardHandle shard, String shardIteratorType, Object startingMarker) { |
| return null; |
| } |
| |
| @Override |
| public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { |
| return null; |
| } |
| }; |
| } |
| |
| public static KinesisProxyInterface nonReshardedStreamsBehaviour( |
| Map<String, Integer> streamsToShardCount) { |
| return new NonReshardedStreamsKinesis(streamsToShardCount); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Behaviours related to fetching records, used mainly in ShardConsumerTest |
| // ------------------------------------------------------------------------ |
| |
| public static KinesisProxyInterface emptyShard(final int numberOfIterations) { |
| return new SingleShardEmittingZeroRecords(numberOfIterations); |
| } |
| |
| public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls( |
| final int numOfRecords, final int numOfGetRecordsCalls, final long millisBehindLatest) { |
| return new SingleShardEmittingFixNumOfRecordsKinesis( |
| numOfRecords, numOfGetRecordsCalls, millisBehindLatest); |
| } |
| |
| public static KinesisProxyInterface |
| totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator( |
| final int numOfRecords, |
| final int numOfGetRecordsCall, |
| final int orderOfCallToExpire, |
| final long millisBehindLatest) { |
| return new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis( |
| numOfRecords, numOfGetRecordsCall, orderOfCallToExpire, millisBehindLatest); |
| } |
| |
| public static KinesisProxyInterface |
| initialNumOfRecordsAfterNumOfGetRecordsCallsWithAdaptiveReads( |
| final int numOfRecords, |
| final int numOfGetRecordsCalls, |
| final long millisBehindLatest) { |
| return new SingleShardEmittingAdaptiveNumOfRecordsKinesis( |
| numOfRecords, numOfGetRecordsCalls, millisBehindLatest); |
| } |
| |
| /** |
| * Creates a mocked Kinesis Proxy that will Emit aggregated records from a fake stream: - There |
| * will be {@code numOfGetRecordsCalls} batches available in the stream - Each batch will |
| * contain {@code numOfAggregatedRecords} aggregated records - Each aggregated record will |
| * contain {@code numOfChildRecords} child records Therefore this class will emit a total of |
| * {@code numOfGetRecordsCalls * numOfAggregatedRecords * numOfChildRecords} records. |
| * |
| * @param numOfAggregatedRecords the number of records per batch |
| * @param numOfChildRecords the number of child records in each aggregated record |
| * @param numOfGetRecordsCalls the number batches available in the fake stream |
| */ |
| public static KinesisProxyInterface aggregatedRecords( |
| final int numOfAggregatedRecords, |
| final int numOfChildRecords, |
| final int numOfGetRecordsCalls) { |
| return new SingleShardEmittingAggregatedRecordsKinesis( |
| numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls); |
| } |
| |
| public static KinesisProxyInterface blockingQueueGetRecords( |
| Map<String, List<BlockingQueue<String>>> streamsToShardQueues) { |
| return new BlockingQueueKinesis(streamsToShardQueues); |
| } |
| |
| private static class SingleShardEmittingZeroRecords implements KinesisProxyInterface { |
| |
| private int remainingIterators; |
| |
| private SingleShardEmittingZeroRecords(int remainingIterators) { |
| this.remainingIterators = remainingIterators; |
| } |
| |
| @Override |
| public String getShardIterator( |
| StreamShardHandle shard, String shardIteratorType, Object startingMarker) |
| throws InterruptedException { |
| return String.valueOf(remainingIterators--); |
| } |
| |
| @Override |
| public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) |
| throws InterruptedException { |
| return new GetRecordsResult() |
| .withMillisBehindLatest(0L) |
| .withNextShardIterator( |
| remainingIterators == 0 ? null : String.valueOf(remainingIterators--)); |
| } |
| |
| @Override |
| public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) |
| throws InterruptedException { |
| return null; |
| } |
| } |
| |
| private static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis |
| extends SingleShardEmittingFixNumOfRecordsKinesis { |
| |
| private final long millisBehindLatest; |
| private final int orderOfCallToExpire; |
| |
| private boolean expiredOnceAlready = false; |
| private boolean expiredIteratorRefreshed = false; |
| |
| public SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis( |
| final int numOfRecords, |
| final int numOfGetRecordsCalls, |
| final int orderOfCallToExpire, |
| final long millisBehindLatest) { |
| super(numOfRecords, numOfGetRecordsCalls, millisBehindLatest); |
| checkArgument( |
| orderOfCallToExpire <= numOfGetRecordsCalls, |
| "can not test unexpected expired iterator if orderOfCallToExpire is larger than numOfGetRecordsCalls"); |
| this.millisBehindLatest = millisBehindLatest; |
| this.orderOfCallToExpire = orderOfCallToExpire; |
| } |
| |
| @Override |
| public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { |
| if ((Integer.parseInt(shardIterator) == orderOfCallToExpire - 1) |
| && !expiredOnceAlready) { |
| // we fake only once the expired iterator exception at the specified get records |
| // attempt order |
| expiredOnceAlready = true; |
| throw new ExpiredIteratorException("Artificial expired shard iterator"); |
| } else if (expiredOnceAlready && !expiredIteratorRefreshed) { |
| // if we've thrown the expired iterator exception already, but the iterator was not |
| // refreshed, |
| // throw a hard exception to the test that is testing this Kinesis behaviour |
| throw new RuntimeException( |
| "expired shard iterator was not refreshed on the next getRecords() call"); |
| } else { |
| // assuming that the maxRecordsToGet is always large enough |
| return new GetRecordsResult() |
| .withRecords(shardItrToRecordBatch.get(shardIterator)) |
| .withMillisBehindLatest(millisBehindLatest) |
| .withNextShardIterator( |
| (Integer.parseInt(shardIterator) == totalNumOfGetRecordsCalls - 1) |
| ? null |
| : String.valueOf( |
| Integer.parseInt(shardIterator) |
| + 1)); // last next shard iterator is null |
| } |
| } |
| |
| @Override |
| public String getShardIterator( |
| StreamShardHandle shard, String shardIteratorType, Object startingMarker) { |
| if (!expiredOnceAlready) { |
| // for the first call, just return the iterator of the first batch of records |
| return "0"; |
| } else { |
| // fake the iterator refresh when this is called again after getRecords throws |
| // expired iterator |
| // exception on the orderOfCallToExpire attempt |
| expiredIteratorRefreshed = true; |
| return String.valueOf(orderOfCallToExpire - 1); |
| } |
| } |
| } |
| |
| private static class SingleShardEmittingFixNumOfRecordsKinesis |
| implements KinesisProxyInterface { |
| |
| protected final int totalNumOfGetRecordsCalls; |
| |
| protected final int totalNumOfRecords; |
| |
| private final long millisBehindLatest; |
| |
| protected final Map<String, List<Record>> shardItrToRecordBatch; |
| |
| public SingleShardEmittingFixNumOfRecordsKinesis( |
| final int numOfRecords, |
| final int numOfGetRecordsCalls, |
| final long millistBehindLatest) { |
| this.totalNumOfRecords = numOfRecords; |
| this.totalNumOfGetRecordsCalls = numOfGetRecordsCalls; |
| this.millisBehindLatest = millistBehindLatest; |
| |
| // initialize the record batches that we will be fetched |
| this.shardItrToRecordBatch = new HashMap<>(); |
| |
| int numOfAlreadyPartitionedRecords = 0; |
| int numOfRecordsPerBatch = numOfRecords / numOfGetRecordsCalls + 1; |
| for (int batch = 0; batch < totalNumOfGetRecordsCalls; batch++) { |
| if (batch != totalNumOfGetRecordsCalls - 1) { |
| shardItrToRecordBatch.put( |
| String.valueOf(batch), |
| createRecordBatchWithRange( |
| numOfAlreadyPartitionedRecords, |
| numOfAlreadyPartitionedRecords + numOfRecordsPerBatch)); |
| numOfAlreadyPartitionedRecords += numOfRecordsPerBatch; |
| } else { |
| shardItrToRecordBatch.put( |
| String.valueOf(batch), |
| createRecordBatchWithRange( |
| numOfAlreadyPartitionedRecords, totalNumOfRecords)); |
| } |
| } |
| } |
| |
| @Override |
| public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { |
| // assuming that the maxRecordsToGet is always large enough |
| return new GetRecordsResult() |
| .withRecords(shardItrToRecordBatch.get(shardIterator)) |
| .withMillisBehindLatest(millisBehindLatest) |
| .withNextShardIterator( |
| (Integer.parseInt(shardIterator) == totalNumOfGetRecordsCalls - 1) |
| ? null |
| : String.valueOf( |
| Integer.parseInt(shardIterator) |
| + 1)); // last next shard iterator is null |
| } |
| |
| @Override |
| public String getShardIterator( |
| StreamShardHandle shard, String shardIteratorType, Object startingMarker) { |
| // this will be called only one time per ShardConsumer; |
| // so, simply return the iterator of the first batch of records |
| return "0"; |
| } |
| |
| @Override |
| public GetShardListResult getShardList( |
| Map<String, String> streamNamesWithLastSeenShardIds) { |
| return null; |
| } |
| |
| public static List<Record> createRecordBatchWithRange(int min, int max) { |
| List<Record> batch = new LinkedList<>(); |
| for (int i = min; i < max; i++) { |
| batch.add( |
| new Record() |
| .withData( |
| ByteBuffer.wrap( |
| String.valueOf(i) |
| .getBytes(ConfigConstants.DEFAULT_CHARSET))) |
| .withPartitionKey(UUID.randomUUID().toString()) |
| .withApproximateArrivalTimestamp( |
| new Date(System.currentTimeMillis())) |
| .withSequenceNumber(String.valueOf(i))); |
| } |
| return batch; |
| } |
| } |
| |
| private static class SingleShardEmittingAdaptiveNumOfRecordsKinesis |
| extends SingleShardEmittingKinesis { |
| |
| protected static long averageRecordSizeBytes = 0L; |
| |
| private static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024L * 1024L; |
| |
| public SingleShardEmittingAdaptiveNumOfRecordsKinesis( |
| final int numOfRecords, |
| final int numOfGetRecordsCalls, |
| final long millisBehindLatest) { |
| super( |
| initShardItrToRecordBatch(numOfRecords, numOfGetRecordsCalls), |
| millisBehindLatest); |
| } |
| |
| private static Map<String, List<Record>> initShardItrToRecordBatch( |
| final int numOfRecords, final int numOfGetRecordsCalls) { |
| // initialize the record batches that we will be fetched |
| Map<String, List<Record>> shardItrToRecordBatch = new HashMap<>(); |
| |
| int numOfAlreadyPartitionedRecords = 0; |
| int numOfRecordsPerBatch = numOfRecords; |
| for (int batch = 0; batch < numOfGetRecordsCalls; batch++) { |
| shardItrToRecordBatch.put( |
| String.valueOf(batch), |
| createRecordBatchWithRange( |
| numOfAlreadyPartitionedRecords, |
| numOfAlreadyPartitionedRecords + numOfRecordsPerBatch)); |
| numOfAlreadyPartitionedRecords += numOfRecordsPerBatch; |
| |
| numOfRecordsPerBatch = |
| (int) |
| (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT |
| / (averageRecordSizeBytes |
| * 1000L |
| / ConsumerConfigConstants |
| .DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)); |
| } |
| |
| return shardItrToRecordBatch; |
| } |
| |
| private static List<Record> createRecordBatchWithRange(int min, int max) { |
| List<Record> batch = new LinkedList<>(); |
| long sumRecordBatchBytes = 0L; |
| // Create record of size 10Kb |
| String data = createDataSize(10 * 1024L); |
| |
| for (int i = min; i < max; i++) { |
| Record record = |
| new Record() |
| .withData( |
| ByteBuffer.wrap( |
| data.getBytes(ConfigConstants.DEFAULT_CHARSET))) |
| .withPartitionKey(UUID.randomUUID().toString()) |
| .withApproximateArrivalTimestamp( |
| new Date(System.currentTimeMillis())) |
| .withSequenceNumber(String.valueOf(i)); |
| batch.add(record); |
| sumRecordBatchBytes += record.getData().remaining(); |
| } |
| if (batch.size() != 0) { |
| averageRecordSizeBytes = sumRecordBatchBytes / batch.size(); |
| } |
| |
| return batch; |
| } |
| |
| private static String createDataSize(final long msgSize) { |
| char[] data = new char[(int) msgSize]; |
| return new String(data); |
| } |
| } |
| |
| private static class SingleShardEmittingAggregatedRecordsKinesis |
| extends SingleShardEmittingKinesis { |
| |
| public SingleShardEmittingAggregatedRecordsKinesis( |
| final int numOfAggregatedRecords, |
| final int numOfChildRecords, |
| final int numOfGetRecordsCalls) { |
| super( |
| initShardItrToRecordBatch( |
| numOfAggregatedRecords, numOfChildRecords, numOfGetRecordsCalls)); |
| } |
| |
| private static Map<String, List<Record>> initShardItrToRecordBatch( |
| final int numOfAggregatedRecords, |
| final int numOfChildRecords, |
| final int numOfGetRecordsCalls) { |
| |
| Map<String, List<Record>> shardToRecordBatch = new HashMap<>(); |
| |
| AtomicInteger sequenceNumber = new AtomicInteger(); |
| for (int batch = 0; batch < numOfGetRecordsCalls; batch++) { |
| List<Record> recordBatch = |
| TestUtils.createAggregatedRecordBatch( |
| numOfAggregatedRecords, numOfChildRecords, sequenceNumber); |
| |
| shardToRecordBatch.put(String.valueOf(batch), recordBatch); |
| } |
| |
| return shardToRecordBatch; |
| } |
| } |
| |
| /** A helper base class used to emit records from a single sharded fake Kinesis Stream. */ |
| private abstract static class SingleShardEmittingKinesis implements KinesisProxyInterface { |
| |
| private final long millisBehindLatest; |
| |
| private final Map<String, List<Record>> shardItrToRecordBatch; |
| |
| protected SingleShardEmittingKinesis( |
| final Map<String, List<Record>> shardItrToRecordBatch) { |
| this(shardItrToRecordBatch, 0L); |
| } |
| |
| protected SingleShardEmittingKinesis( |
| final Map<String, List<Record>> shardItrToRecordBatch, |
| final long millisBehindLatest) { |
| this.millisBehindLatest = millisBehindLatest; |
| this.shardItrToRecordBatch = shardItrToRecordBatch; |
| } |
| |
| @Override |
| public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { |
| int index = Integer.parseInt(shardIterator); |
| // last next shard iterator is null |
| String nextShardIterator = |
| (index == shardItrToRecordBatch.size() - 1) ? null : String.valueOf(index + 1); |
| |
| // assuming that the maxRecordsToGet is always large enough |
| return new GetRecordsResult() |
| .withRecords(shardItrToRecordBatch.get(shardIterator)) |
| .withNextShardIterator(nextShardIterator) |
| .withMillisBehindLatest(millisBehindLatest); |
| } |
| |
| @Override |
| public String getShardIterator( |
| StreamShardHandle shard, String shardIteratorType, Object startingMarker) { |
| // this will be called only one time per ShardConsumer; |
| // so, simply return the iterator of the first batch of records |
| return "0"; |
| } |
| |
| @Override |
| public GetShardListResult getShardList( |
| Map<String, String> streamNamesWithLastSeenShardIds) { |
| return null; |
| } |
| } |
| |
| private static class NonReshardedStreamsKinesis implements KinesisProxyInterface { |
| |
| private final Map<String, List<StreamShardHandle>> streamsWithListOfShards = |
| new HashMap<>(); |
| |
| public NonReshardedStreamsKinesis(Map<String, Integer> streamsToShardCount) { |
| for (Map.Entry<String, Integer> streamToShardCount : streamsToShardCount.entrySet()) { |
| String streamName = streamToShardCount.getKey(); |
| int shardCount = streamToShardCount.getValue(); |
| |
| if (shardCount == 0) { |
| // don't do anything |
| } else { |
| List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount); |
| for (int i = 0; i < shardCount; i++) { |
| shardsOfStream.add( |
| createDummyStreamShardHandle( |
| streamName, |
| KinesisShardIdGenerator.generateFromShardOrder(i))); |
| } |
| streamsWithListOfShards.put(streamName, shardsOfStream); |
| } |
| } |
| } |
| |
| @Override |
| public GetShardListResult getShardList( |
| Map<String, String> streamNamesWithLastSeenShardIds) { |
| GetShardListResult result = new GetShardListResult(); |
| for (Map.Entry<String, List<StreamShardHandle>> streamsWithShards : |
| streamsWithListOfShards.entrySet()) { |
| String streamName = streamsWithShards.getKey(); |
| for (StreamShardHandle shard : streamsWithShards.getValue()) { |
| if (streamNamesWithLastSeenShardIds.get(streamName) == null) { |
| result.addRetrievedShardToStream(streamName, shard); |
| } else { |
| if (compareShardIds( |
| shard.getShard().getShardId(), |
| streamNamesWithLastSeenShardIds.get(streamName)) |
| > 0) { |
| result.addRetrievedShardToStream(streamName, shard); |
| } |
| } |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public String getShardIterator( |
| StreamShardHandle shard, String shardIteratorType, Object startingMarker) { |
| return null; |
| } |
| |
| @Override |
| public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { |
| return null; |
| } |
| |
| /** |
| * Utility function to compare two shard ids. |
| * |
| * @param firstShardId first shard id to compare |
| * @param secondShardId second shard id to compare |
| * @return a value less than 0 if the first shard id is smaller than the second shard id, or |
| * a value larger than 0 the first shard is larger than the second shard id, or 0 if |
| * they are equal |
| */ |
| private static int compareShardIds(String firstShardId, String secondShardId) { |
| if (!isValidShardId(firstShardId)) { |
| throw new IllegalArgumentException("The first shard id has invalid format."); |
| } |
| |
| if (!isValidShardId(secondShardId)) { |
| throw new IllegalArgumentException("The second shard id has invalid format."); |
| } |
| |
| // digit segment of the shard id starts at index 8 |
| return Long.compare( |
| Long.parseLong(firstShardId.substring(8)), |
| Long.parseLong(secondShardId.substring(8))); |
| } |
| |
| /** |
| * Checks if a shard id has valid format. Kinesis stream shard ids have 12-digit numbers |
| * left-padded with 0's, prefixed with "shardId-", ex. "shardId-000000000015". |
| * |
| * @param shardId the shard id to check |
| * @return whether the shard id is valid |
| */ |
| private static boolean isValidShardId(String shardId) { |
| if (shardId == null) { |
| return false; |
| } |
| return shardId.matches("^shardId-\\d{12}"); |
| } |
| } |
| |
| private static class BlockingQueueKinesis implements KinesisProxyInterface { |
| |
| private final Map<String, List<StreamShardHandle>> streamsWithListOfShards = |
| new HashMap<>(); |
| private final Map<String, BlockingQueue<String>> shardIteratorToQueueMap = new HashMap<>(); |
| |
| private static String getShardIterator(StreamShardHandle shardHandle) { |
| return shardHandle.getStreamName() + "-" + shardHandle.getShard().getShardId(); |
| } |
| |
| public BlockingQueueKinesis(Map<String, List<BlockingQueue<String>>> streamsToShardCount) { |
| for (Map.Entry<String, List<BlockingQueue<String>>> streamToShardQueues : |
| streamsToShardCount.entrySet()) { |
| String streamName = streamToShardQueues.getKey(); |
| int shardCount = streamToShardQueues.getValue().size(); |
| |
| if (shardCount == 0) { |
| // don't do anything |
| } else { |
| List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount); |
| for (int i = 0; i < shardCount; i++) { |
| StreamShardHandle shardHandle = |
| new StreamShardHandle( |
| streamName, |
| new Shard() |
| .withShardId( |
| KinesisShardIdGenerator |
| .generateFromShardOrder(i)) |
| .withSequenceNumberRange( |
| new SequenceNumberRange() |
| .withStartingSequenceNumber("0")) |
| .withHashKeyRange( |
| new HashKeyRange() |
| .withStartingHashKey("0") |
| .withEndingHashKey("0"))); |
| shardsOfStream.add(shardHandle); |
| shardIteratorToQueueMap.put( |
| getShardIterator(shardHandle), |
| streamToShardQueues.getValue().get(i)); |
| } |
| streamsWithListOfShards.put(streamName, shardsOfStream); |
| } |
| } |
| } |
| |
| @Override |
| public GetShardListResult getShardList( |
| Map<String, String> streamNamesWithLastSeenShardIds) { |
| GetShardListResult result = new GetShardListResult(); |
| for (Map.Entry<String, List<StreamShardHandle>> streamsWithShards : |
| streamsWithListOfShards.entrySet()) { |
| String streamName = streamsWithShards.getKey(); |
| for (StreamShardHandle shard : streamsWithShards.getValue()) { |
| if (streamNamesWithLastSeenShardIds.get(streamName) == null) { |
| result.addRetrievedShardToStream(streamName, shard); |
| } else { |
| if (StreamShardHandle.compareShardIds( |
| shard.getShard().getShardId(), |
| streamNamesWithLastSeenShardIds.get(streamName)) |
| > 0) { |
| result.addRetrievedShardToStream(streamName, shard); |
| } |
| } |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public String getShardIterator( |
| StreamShardHandle shard, String shardIteratorType, Object startingMarker) { |
| return getShardIterator(shard); |
| } |
| |
| @Override |
| public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { |
| BlockingQueue<String> queue = |
| Preconditions.checkNotNull( |
| this.shardIteratorToQueueMap.get(shardIterator), |
| "no queue for iterator %s", |
| shardIterator); |
| List<Record> records = Collections.emptyList(); |
| try { |
| String data = queue.poll(100, TimeUnit.MILLISECONDS); |
| if (data != null) { |
| Record record = |
| new Record() |
| .withData( |
| ByteBuffer.wrap( |
| data.getBytes(ConfigConstants.DEFAULT_CHARSET))) |
| .withPartitionKey(UUID.randomUUID().toString()) |
| .withApproximateArrivalTimestamp( |
| new Date(System.currentTimeMillis())) |
| .withSequenceNumber(String.valueOf(0)); |
| records = Collections.singletonList(record); |
| } |
| } catch (InterruptedException e) { |
| shardIterator = null; |
| } |
| return new GetRecordsResult() |
| .withRecords(records) |
| .withMillisBehindLatest(0L) |
| .withNextShardIterator(shardIterator); |
| } |
| } |
| } |