| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.connectors.kinesis.proxy; |
| |
| import org.apache.flink.runtime.util.EnvironmentInformation; |
| import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; |
| |
| import com.amazonaws.ClientConfiguration; |
| import com.amazonaws.ClientConfigurationFactory; |
| import com.amazonaws.auth.AWSCredentialsProvider; |
| import com.amazonaws.regions.Region; |
| import com.amazonaws.regions.Regions; |
| import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; |
| import com.amazonaws.services.kinesis.AmazonKinesis; |
| import com.amazonaws.services.kinesis.model.DescribeStreamResult; |
| import com.amazonaws.services.kinesis.model.Shard; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.Nullable; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT; |
| import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION; |
| import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getCredentialsProvider; |
| import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.setAwsClientConfigProperties; |
| |
| /** |
| * DynamoDB streams proxy: interface interacting with the DynamoDB streams. |
| */ |
| public class DynamoDBStreamsProxy extends KinesisProxy { |
| private static final Logger LOG = LoggerFactory.getLogger(DynamoDBStreamsProxy.class); |
| |
| /** Used for formatting Flink-specific user agent string when creating Kinesis client. */ |
| private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) DynamoDB Streams Connector"; |
| |
| protected DynamoDBStreamsProxy(Properties configProps) { |
| super(configProps); |
| } |
| |
| /** |
| * Creates a DynamoDB streams proxy. |
| * |
| * @param configProps configuration properties |
| * @return the created DynamoDB streams proxy |
| */ |
| public static KinesisProxyInterface create(Properties configProps) { |
| return new DynamoDBStreamsProxy(configProps); |
| } |
| |
| /** |
| * Creates an AmazonDynamoDBStreamsAdapterClient. |
| * Uses it as the internal client interacting with the DynamoDB streams. |
| * |
| * @param configProps configuration properties |
| * @return an AWS DynamoDB streams adapter client |
| */ |
| @Override |
| protected AmazonKinesis createKinesisClient(Properties configProps) { |
| ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig(); |
| setAwsClientConfigProperties(awsClientConfig, configProps); |
| |
| AWSCredentialsProvider credentials = getCredentialsProvider(configProps); |
| awsClientConfig.setUserAgentPrefix( |
| String.format( |
| USER_AGENT_FORMAT, |
| EnvironmentInformation.getVersion(), |
| EnvironmentInformation.getRevisionInformation().commitId)); |
| |
| AmazonDynamoDBStreamsAdapterClient adapterClient = |
| new AmazonDynamoDBStreamsAdapterClient(credentials, awsClientConfig); |
| |
| if (configProps.containsKey(AWS_ENDPOINT)) { |
| adapterClient.setEndpoint(configProps.getProperty(AWS_ENDPOINT)); |
| } else { |
| adapterClient.setRegion(Region.getRegion( |
| Regions.fromName(configProps.getProperty(AWS_REGION)))); |
| } |
| |
| return adapterClient; |
| } |
| |
| @Override |
| public GetShardListResult getShardList( |
| Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException { |
| GetShardListResult result = new GetShardListResult(); |
| |
| for (Map.Entry<String, String> streamNameWithLastSeenShardId : |
| streamNamesWithLastSeenShardIds.entrySet()) { |
| String stream = streamNameWithLastSeenShardId.getKey(); |
| String lastSeenShardId = streamNameWithLastSeenShardId.getValue(); |
| result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId)); |
| } |
| return result; |
| } |
| |
| private List<StreamShardHandle> getShardsOfStream( |
| String streamName, |
| @Nullable String lastSeenShardId) |
| throws InterruptedException { |
| List<StreamShardHandle> shardsOfStream = new ArrayList<>(); |
| |
| DescribeStreamResult describeStreamResult; |
| do { |
| describeStreamResult = describeStream(streamName, lastSeenShardId); |
| List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); |
| for (Shard shard : shards) { |
| shardsOfStream.add(new StreamShardHandle(streamName, shard)); |
| } |
| |
| if (shards.size() != 0) { |
| lastSeenShardId = shards.get(shards.size() - 1).getShardId(); |
| } |
| } while (describeStreamResult.getStreamDescription().isHasMoreShards()); |
| |
| return shardsOfStream; |
| } |
| } |