/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis.internals;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.metrics.ShardMetricsReporter;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Thread that does the actual data pulling from AWS Kinesis shards. Each thread is in charge of one Kinesis shard only.
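*
* <p>The consumer is created and run by the {@link KinesisDataFetcher} on its shard consumer executor
* service, and exits once the subscribed shard has been completely read or the executor is shut down.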
*/
@Internal
public class ShardConsumer<T> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
private final KinesisDeserializationSchema<T> deserializer;
private final KinesisProxyInterface kinesis;
private final int subscribedShardStateIndex;
private final KinesisDataFetcher<T> fetcherRef;
private final StreamShardHandle subscribedShard;
private final int maxNumberOfRecordsPerFetch;
private final long fetchIntervalMillis;
private final ShardMetricsReporter shardMetricsReporter;
private SequenceNumber lastSequenceNum;
private Date initTimestamp;

/**
* Creates a shard consumer.
*
* @param fetcherRef reference to the owning fetcher
* @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
* @param subscribedShard the shard this consumer is subscribed to
* @param lastSequenceNum the sequence number in the shard to start consuming
* @param shardMetricsReporter the reporter to report metrics to
*/
public ShardConsumer(KinesisDataFetcher<T> fetcherRef,
Integer subscribedShardStateIndex,
StreamShardHandle subscribedShard,
SequenceNumber lastSequenceNum,
ShardMetricsReporter shardMetricsReporter) {
this(fetcherRef,
subscribedShardStateIndex,
subscribedShard,
lastSequenceNum,
KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
shardMetricsReporter);
}

/** This constructor is exposed for testing purposes. */
protected ShardConsumer(KinesisDataFetcher<T> fetcherRef,
Integer subscribedShardStateIndex,
StreamShardHandle subscribedShard,
SequenceNumber lastSequenceNum,
KinesisProxyInterface kinesis,
ShardMetricsReporter shardMetricsReporter) {
this.fetcherRef = checkNotNull(fetcherRef);
this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
this.subscribedShard = checkNotNull(subscribedShard);
this.lastSequenceNum = checkNotNull(lastSequenceNum);
this.shardMetricsReporter = checkNotNull(shardMetricsReporter);
checkArgument(
!lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()),
"Should not start a ShardConsumer if the shard has already been completely read.");
this.deserializer = fetcherRef.getClonedDeserializationSchema();
Properties consumerConfig = fetcherRef.getConsumerConfiguration();
this.kinesis = kinesis;
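// read the per-shard fetch settings (max records per getRecords call and fetch interval) from the consumer configuration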
this.maxNumberOfRecordsPerFetch = Integer.valueOf(consumerConfig.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
Integer.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)));
this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS)));
if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
String timestamp = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
try {
String format = consumerConfig.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
this.initTimestamp = customDateFormat.parse(timestamp);
} catch (IllegalArgumentException | NullPointerException exception) {
throw new IllegalArgumentException(exception);
} catch (ParseException exception) {
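// the timestamp is not in the configured date format; fall back to interpreting it as
// (possibly fractional) epoch seconds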
this.initTimestamp = new Date((long) (Double.parseDouble(timestamp) * 1000));
}
} else {
this.initTimestamp = null;
}
}

@SuppressWarnings("unchecked")
@Override
public void run() {
String nextShardItr;
try {
// before infinitely looping, we set the initial nextShardItr appropriately
if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) {
// if the shard is already closed, there are no new records to get with a LATEST iterator for this shard
if (subscribedShard.isClosed()) {
nextShardItr = null;
} else {
nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null);
}
} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) {
nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null);
} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
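// the shard has been read completely; a null iterator makes the loop below mark the
// shard state as finished and exit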
nextShardItr = null;
} else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get())) {
nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_TIMESTAMP.toString(), initTimestamp);
} else {
// we will be starting from an actual sequence number (due to restore from failure).
// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records
// from the last aggregated record; otherwise, we can simply start iterating from the record right after.
if (lastSequenceNum.isAggregated()) {
String itrForLastAggregatedRecord =
kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
// get only the last aggregated record
GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
List<UserRecord> fetchedRecords = deaggregateRecords(
getRecordsResult.getRecords(),
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
long lastSubSequenceNum = lastSequenceNum.getSubSequenceNumber();
for (UserRecord record : fetchedRecords) {
// we have found a dangling sub-record if it has a larger subsequence number
// than our last sequence number; if so, collect the record and update state
if (record.getSubSequenceNumber() > lastSubSequenceNum) {
deserializeRecordForCollectionAndUpdateState(record);
}
}
// set the nextShardItr so we can continue iterating in the next while loop
nextShardItr = getRecordsResult.getNextShardIterator();
} else {
// the last record was non-aggregated, so we can simply start from the next record
nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
}
}
while (isRunning()) {
if (nextShardItr == null) {
fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
// we can close this consumer thread once we've reached the end of the subscribed shard
break;
} else {
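// throttle the getRecords calls according to the configured fetch interval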
if (fetchIntervalMillis != 0) {
Thread.sleep(fetchIntervalMillis);
}
GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
List<UserRecord> fetchedRecords = deaggregateRecords(
getRecordsResult.getRecords(),
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
for (UserRecord record : fetchedRecords) {
deserializeRecordForCollectionAndUpdateState(record);
}
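// continue with the iterator returned by this getRecords call; a null iterator signals that
// the (closed) shard has been read completely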
nextShardItr = getRecordsResult.getNextShardIterator();
}
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
}
}

/**
* The loop in {@link #run()} checks this before fetching the next batch of records. Since this runnable is executed
* by the ExecutorService {@link KinesisDataFetcher#shardConsumersExecutor}, the only way to shut down this thread
* is to call shutdownNow() on {@link KinesisDataFetcher#shardConsumersExecutor} and let the executor service
* interrupt all currently running {@link ShardConsumer}s.
*/
private boolean isRunning() {
return !Thread.interrupted();
}

/**
* Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last
* successfully collected sequence number in this shard consumer is also updated so that
* {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
* iterators if necessary.
*
* <p>Note that the server-side Kinesis timestamp is attached to the record when collected. When the
* user program uses {@link TimeCharacteristic#EventTime}, this timestamp is used by default.
*
* @param record record to deserialize and collect
* @throws IOException if the record cannot be deserialized
*/
private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
throws IOException {
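// copy the record payload out of the ByteBuffer into a byte array for the deserializer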
ByteBuffer recordData = record.getData();
byte[] dataBytes = new byte[recordData.remaining()];
recordData.get(dataBytes);
final long approxArrivalTimestamp = record.getApproximateArrivalTimestamp().getTime();
final T value = deserializer.deserialize(
dataBytes,
record.getPartitionKey(),
record.getSequenceNumber(),
approxArrivalTimestamp,
subscribedShard.getStreamName(),
subscribedShard.getShard().getShardId());
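// for aggregated records, also track the sub-sequence number so that a restore can resume
// from the correct sub-record within the aggregated record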
SequenceNumber collectedSequenceNumber = (record.isAggregated())
? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
: new SequenceNumber(record.getSequenceNumber());
fetcherRef.emitRecordAndUpdateState(
value,
approxArrivalTimestamp,
subscribedShardStateIndex,
collectedSequenceNumber);
lastSequenceNum = collectedSequenceNumber;
}

/**
* Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
* AWS {@link ExpiredIteratorException}s to ensure that we still get results and don't just fail on
* such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
* be used for the next call to this method.
*
* <p>Note: it is important that this method is not called again before all the records from the last result have been
* fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
* {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
* incorrect shard iteration if the iterator had to be refreshed.
*
* @param shardItr shard iterator to use
* @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
* @return get records result
* @throws InterruptedException if the thread is interrupted while sleeping between fetch attempts
*/
private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
GetRecordsResult getRecordsResult = null;
while (getRecordsResult == null) {
try {
getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
// Update millis behind latest so it gets reported by the millisBehindLatest gauge
shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
} catch (ExpiredIteratorException eiEx) {
LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
" refreshing the iterator ...", shardItr, subscribedShard);
shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
// sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
if (fetchIntervalMillis != 0) {
Thread.sleep(fetchIntervalMillis);
}
}
}
return getRecordsResult;
}
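
/**
* Deaggregates a list of Kinesis records, which may contain KPL-aggregated records, by delegating to the
* KCL's {@link UserRecord#deaggregate(List, BigInteger, BigInteger)} with the subscribed shard's hash key
* range; sub-records whose hash key falls outside of that range are filtered out.
*/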
@SuppressWarnings("unchecked")
protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
}
}