| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.connectors.kinesis.proxy; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; |
| import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; |
| import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; |
| |
| import com.amazonaws.AmazonServiceException; |
| import com.amazonaws.ClientConfiguration; |
| import com.amazonaws.ClientConfigurationFactory; |
| import com.amazonaws.SdkClientException; |
| import com.amazonaws.services.kinesis.AmazonKinesis; |
| import com.amazonaws.services.kinesis.model.DescribeStreamRequest; |
| import com.amazonaws.services.kinesis.model.DescribeStreamResult; |
| import com.amazonaws.services.kinesis.model.GetRecordsRequest; |
| import com.amazonaws.services.kinesis.model.GetRecordsResult; |
| import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; |
| import com.amazonaws.services.kinesis.model.GetShardIteratorResult; |
| import com.amazonaws.services.kinesis.model.LimitExceededException; |
| import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; |
| import com.amazonaws.services.kinesis.model.ResourceNotFoundException; |
| import com.amazonaws.services.kinesis.model.Shard; |
| import com.amazonaws.services.kinesis.model.ShardIteratorType; |
| import com.amazonaws.services.kinesis.model.StreamStatus; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.Nullable; |
| |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Random; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * Kinesis proxy implementation - a utility class that is used as a proxy to make |
| * calls to AWS Kinesis for several functions, such as getting a list of shards and |
| * fetching a batch of data records starting from a specified record sequence number. |
| * |
| * <p>NOTE: |
| * In the AWS KCL library, there is a similar implementation - {@link com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy}. |
| * This implementation differs mainly in that we can make operations to arbitrary Kinesis streams, which is a needed |
| * functionality for the Flink Kinesis Connector since the consumer may simultaneously read from multiple Kinesis streams. |
| */ |
| @Internal |
| public class KinesisProxy implements KinesisProxyInterface { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class); |
| |
| /** The actual Kinesis client from the AWS SDK that we will be using to make calls. */ |
| private final AmazonKinesis kinesisClient; |
| |
| /** Random seed used to calculate backoff jitter for Kinesis operations. */ |
| private static final Random seed = new Random(); |
| |
| // ------------------------------------------------------------------------ |
| // describeStream() related performance settings |
| // ------------------------------------------------------------------------ |
| |
| /** Base backoff millis for the describe stream operation. */ |
| private final long describeStreamBaseBackoffMillis; |
| |
| /** Maximum backoff millis for the describe stream operation. */ |
| private final long describeStreamMaxBackoffMillis; |
| |
| /** Exponential backoff power constant for the describe stream operation. */ |
| private final double describeStreamExpConstant; |
| |
| // ------------------------------------------------------------------------ |
| // getRecords() related performance settings |
| // ------------------------------------------------------------------------ |
| |
| /** Base backoff millis for the get records operation. */ |
| private final long getRecordsBaseBackoffMillis; |
| |
| /** Maximum backoff millis for the get records operation. */ |
| private final long getRecordsMaxBackoffMillis; |
| |
| /** Exponential backoff power constant for the get records operation. */ |
| private final double getRecordsExpConstant; |
| |
| /** Maximum attempts for the get records operation. */ |
| private final int getRecordsMaxAttempts; |
| |
| // ------------------------------------------------------------------------ |
| // getShardIterator() related performance settings |
| // ------------------------------------------------------------------------ |
| |
| /** Base backoff millis for the get shard iterator operation. */ |
| private final long getShardIteratorBaseBackoffMillis; |
| |
| /** Maximum backoff millis for the get shard iterator operation. */ |
| private final long getShardIteratorMaxBackoffMillis; |
| |
| /** Exponential backoff power constant for the get shard iterator operation. */ |
| private final double getShardIteratorExpConstant; |
| |
| /** Maximum attempts for the get shard iterator operation. */ |
| private final int getShardIteratorMaxAttempts; |
| |
| /** |
| * Create a new KinesisProxy based on the supplied configuration properties. |
| * |
| * @param configProps configuration properties containing AWS credential and AWS region info |
| */ |
| protected KinesisProxy(Properties configProps) { |
| checkNotNull(configProps); |
| |
| this.kinesisClient = createKinesisClient(configProps); |
| |
| this.describeStreamBaseBackoffMillis = Long.valueOf( |
| configProps.getProperty( |
| ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, |
| Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE))); |
| this.describeStreamMaxBackoffMillis = Long.valueOf( |
| configProps.getProperty( |
| ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, |
| Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX))); |
| this.describeStreamExpConstant = Double.valueOf( |
| configProps.getProperty( |
| ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, |
| Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT))); |
| |
| this.getRecordsBaseBackoffMillis = Long.valueOf( |
| configProps.getProperty( |
| ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, |
| Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE))); |
| this.getRecordsMaxBackoffMillis = Long.valueOf( |
| configProps.getProperty( |
| ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, |
| Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX))); |
| this.getRecordsExpConstant = Double.valueOf( |
| configProps.getProperty( |
| ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, |
| Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT))); |
| this.getRecordsMaxAttempts = Integer.valueOf( |
| configProps.getProperty( |
| ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, |
| Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES))); |
| |
| this.getShardIteratorBaseBackoffMillis = Long.valueOf( |
| configProps.getProperty( |
| ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, |
| Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE))); |
| this.getShardIteratorMaxBackoffMillis = Long.valueOf( |
| configProps.getProperty( |
| ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, |
| Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX))); |
| this.getShardIteratorExpConstant = Double.valueOf( |
| configProps.getProperty( |
| ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, |
| Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT))); |
| this.getShardIteratorMaxAttempts = Integer.valueOf( |
| configProps.getProperty( |
| ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, |
| Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES))); |
| |
| } |
| |
| /** |
| * Create the Kinesis client, using the provided configuration properties and default {@link ClientConfiguration}. |
| * Derived classes can override this method to customize the client configuration. |
| * @param configProps |
| * @return |
| */ |
| protected AmazonKinesis createKinesisClient(Properties configProps) { |
| |
| ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig(); |
| AWSUtil.setAwsClientConfigProperties(awsClientConfig, configProps); |
| return AWSUtil.createKinesisClient(configProps, awsClientConfig); |
| } |
| |
| /** |
| * Creates a Kinesis proxy. |
| * |
| * @param configProps configuration properties |
| * @return the created kinesis proxy |
| */ |
| public static KinesisProxyInterface create(Properties configProps) { |
| return new KinesisProxy(configProps); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException { |
| final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); |
| getRecordsRequest.setShardIterator(shardIterator); |
| getRecordsRequest.setLimit(maxRecordsToGet); |
| |
| GetRecordsResult getRecordsResult = null; |
| |
| int attempt = 0; |
| while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { |
| try { |
| getRecordsResult = kinesisClient.getRecords(getRecordsRequest); |
| } catch (SdkClientException ex) { |
| if (isRecoverableSdkClientException(ex)) { |
| long backoffMillis = fullJitterBackoff( |
| getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); |
| LOG.warn("Got recoverable SdkClientException. Backing off for " |
| + backoffMillis + " millis (" + ex.getMessage() + ")"); |
| Thread.sleep(backoffMillis); |
| } else { |
| throw ex; |
| } |
| } |
| } |
| |
| if (getRecordsResult == null) { |
| throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + |
| " retry attempts returned ProvisionedThroughputExceededException."); |
| } |
| |
| return getRecordsResult; |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException { |
| GetShardListResult result = new GetShardListResult(); |
| |
| for (Map.Entry<String, String> streamNameWithLastSeenShardId : streamNamesWithLastSeenShardIds.entrySet()) { |
| String stream = streamNameWithLastSeenShardId.getKey(); |
| String lastSeenShardId = streamNameWithLastSeenShardId.getValue(); |
| result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId)); |
| } |
| return result; |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException { |
| GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest() |
| .withStreamName(shard.getStreamName()) |
| .withShardId(shard.getShard().getShardId()) |
| .withShardIteratorType(shardIteratorType); |
| |
| switch (ShardIteratorType.fromValue(shardIteratorType)) { |
| case TRIM_HORIZON: |
| case LATEST: |
| break; |
| case AT_TIMESTAMP: |
| if (startingMarker instanceof Date) { |
| getShardIteratorRequest.setTimestamp((Date) startingMarker); |
| } else { |
| throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object."); |
| } |
| break; |
| case AT_SEQUENCE_NUMBER: |
| case AFTER_SEQUENCE_NUMBER: |
| if (startingMarker instanceof String) { |
| getShardIteratorRequest.setStartingSequenceNumber((String) startingMarker); |
| } else { |
| throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String."); |
| } |
| } |
| return getShardIterator(getShardIteratorRequest); |
| } |
| |
| private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException { |
| GetShardIteratorResult getShardIteratorResult = null; |
| |
| int attempt = 0; |
| while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) { |
| try { |
| getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); |
| } catch (AmazonServiceException ex) { |
| if (isRecoverableException(ex)) { |
| long backoffMillis = fullJitterBackoff( |
| getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++); |
| LOG.warn("Got recoverable AmazonServiceException. Backing off for " |
| + backoffMillis + " millis (" + ex.getErrorMessage() + ")"); |
| Thread.sleep(backoffMillis); |
| } else { |
| throw ex; |
| } |
| } |
| } |
| |
| if (getShardIteratorResult == null) { |
| throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + |
| " retry attempts returned ProvisionedThroughputExceededException."); |
| } |
| return getShardIteratorResult.getShardIterator(); |
| } |
| |
| /** |
| * Determines whether the exception is recoverable using exponential-backoff. |
| * |
| * @param ex Exception to inspect |
| * @return <code>true</code> if the exception can be recovered from, else |
| * <code>false</code> |
| */ |
| protected boolean isRecoverableSdkClientException(SdkClientException ex) { |
| if (ex instanceof AmazonServiceException) { |
| return KinesisProxy.isRecoverableException((AmazonServiceException) ex); |
| } |
| // customizations may decide to retry other errors, such as read timeouts |
| return false; |
| } |
| |
| /** |
| * Determines whether the exception is recoverable using exponential-backoff. |
| * |
| * @param ex Exception to inspect |
| * @return <code>true</code> if the exception can be recovered from, else |
| * <code>false</code> |
| */ |
| protected static boolean isRecoverableException(AmazonServiceException ex) { |
| if (ex.getErrorType() == null) { |
| return false; |
| } |
| |
| switch (ex.getErrorType()) { |
| case Client: |
| return ex instanceof ProvisionedThroughputExceededException; |
| case Service: |
| case Unknown: |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { |
| List<StreamShardHandle> shardsOfStream = new ArrayList<>(); |
| |
| DescribeStreamResult describeStreamResult; |
| do { |
| describeStreamResult = describeStream(streamName, lastSeenShardId); |
| |
| List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); |
| for (Shard shard : shards) { |
| shardsOfStream.add(new StreamShardHandle(streamName, shard)); |
| } |
| |
| if (shards.size() != 0) { |
| lastSeenShardId = shards.get(shards.size() - 1).getShardId(); |
| } |
| } while (describeStreamResult.getStreamDescription().isHasMoreShards()); |
| |
| return shardsOfStream; |
| } |
| |
| /** |
| * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess. |
| * |
| * <p>This method is using a "full jitter" approach described in AWS's article, |
| * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>. |
| * This is necessary because concurrent calls will be made by all parallel subtask's fetcher. This |
| * jitter backoff approach will help distribute calls across the fetchers over time. |
| * |
| * @param streamName the stream to describe |
| * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) |
| * @return the result of the describe stream operation |
| */ |
| private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { |
| final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); |
| describeStreamRequest.setStreamName(streamName); |
| describeStreamRequest.setExclusiveStartShardId(startShardId); |
| |
| DescribeStreamResult describeStreamResult = null; |
| |
| // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). |
| int attemptCount = 0; |
| while (describeStreamResult == null) { // retry until we get a result |
| try { |
| describeStreamResult = kinesisClient.describeStream(describeStreamRequest); |
| } catch (LimitExceededException le) { |
| long backoffMillis = fullJitterBackoff( |
| describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); |
| LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " |
| + backoffMillis + " millis."); |
| Thread.sleep(backoffMillis); |
| } catch (ResourceNotFoundException re) { |
| throw new RuntimeException("Error while getting stream details", re); |
| } |
| } |
| |
| String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); |
| if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) { |
| if (LOG.isWarnEnabled()) { |
| LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + |
| "describeStream operation will not contain any shard information."); |
| } |
| } |
| |
| // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive |
| // start shard id in the returned shards list; check if we need to remove these erroneously returned shards |
| if (startShardId != null) { |
| List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); |
| Iterator<Shard> shardItr = shards.iterator(); |
| while (shardItr.hasNext()) { |
| if (StreamShardHandle.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) { |
| shardItr.remove(); |
| } |
| } |
| } |
| |
| return describeStreamResult; |
| } |
| |
| protected static long fullJitterBackoff(long base, long max, double power, int attempt) { |
| long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt)); |
| return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff |
| } |
| } |