/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis.proxy;

import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.Shard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT;
import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getCredentialsProvider;
import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.setAwsClientConfigProperties;

/**
 * DynamoDB Streams proxy: the interface used for interacting with DynamoDB Streams.
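 *
 * <p>A minimal usage sketch; the region value and stream identifier below are placeholders,
 * and {@code getShardList} can throw {@code InterruptedException}:
 * <pre>{@code
 * Properties config = new Properties();
 * config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
 * KinesisProxyInterface proxy = DynamoDBStreamsProxy.create(config);
 * GetShardListResult shards =
 *     proxy.getShardList(Collections.singletonMap("exampleStream", null));
 * }</pre>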
 */
public class DynamoDBStreamsProxy extends KinesisProxy {

    private static final Logger LOG = LoggerFactory.getLogger(DynamoDBStreamsProxy.class);

    /** Used for formatting the Flink-specific user agent string when creating the Kinesis client. */
    private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) DynamoDB Streams Connector";

    protected DynamoDBStreamsProxy(Properties configProps) {
        super(configProps);
    }

    /**
     * Creates a DynamoDB streams proxy.
     *
     * @param configProps configuration properties
     * @return the created DynamoDB streams proxy
     */
    public static KinesisProxyInterface create(Properties configProps) {
        return new DynamoDBStreamsProxy(configProps);
    }

    /**
     * Creates an AmazonDynamoDBStreamsAdapterClient, which is used as the internal
     * client for interacting with DynamoDB Streams.
     *
     * @param configProps configuration properties
     * @return an AWS DynamoDB Streams adapter client
     */
    @Override
    protected AmazonKinesis createKinesisClient(Properties configProps) {
        ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
        setAwsClientConfigProperties(awsClientConfig, configProps);
        AWSCredentialsProvider credentials = getCredentialsProvider(configProps);
        awsClientConfig.setUserAgentPrefix(
                String.format(
                        USER_AGENT_FORMAT,
                        EnvironmentInformation.getVersion(),
                        EnvironmentInformation.getRevisionInformation().commitId));
        AmazonDynamoDBStreamsAdapterClient adapterClient =
                new AmazonDynamoDBStreamsAdapterClient(credentials, awsClientConfig);
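        // An explicitly configured endpoint takes precedence; otherwise the client is bound to the configured region.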
        if (configProps.containsKey(AWS_ENDPOINT)) {
            adapterClient.setEndpoint(configProps.getProperty(AWS_ENDPOINT));
        } else {
            adapterClient.setRegion(Region.getRegion(
                    Regions.fromName(configProps.getProperty(AWS_REGION))));
        }
        return adapterClient;
    }
    @Override
    public GetShardListResult getShardList(
            Map<String, String> streamNamesWithLastSeenShardIds) throws InterruptedException {
        GetShardListResult result = new GetShardListResult();
        for (Map.Entry<String, String> streamNameWithLastSeenShardId :
                streamNamesWithLastSeenShardIds.entrySet()) {
            String stream = streamNameWithLastSeenShardId.getKey();
            String lastSeenShardId = streamNameWithLastSeenShardId.getValue();
            result.addRetrievedShardsToStream(stream, getShardsOfStream(stream, lastSeenShardId));
        }
        return result;
    }
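
    /**
     * Retrieves the shards of a single stream by calling describeStream repeatedly,
     * advancing the last seen shard id after each call, until the stream description
     * reports that there are no more shards.
     */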
    private List<StreamShardHandle> getShardsOfStream(
            String streamName,
            @Nullable String lastSeenShardId)
            throws InterruptedException {
        List<StreamShardHandle> shardsOfStream = new ArrayList<>();
        DescribeStreamResult describeStreamResult;
        do {
            describeStreamResult = describeStream(streamName, lastSeenShardId);
            List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
            for (Shard shard : shards) {
                shardsOfStream.add(new StreamShardHandle(streamName, shard));
            }
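            // Remember the last shard id that was returned so the next describeStream call continues after it.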
            if (shards.size() != 0) {
                lastSeenShardId = shards.get(shards.size() - 1).getShardId();
            }
        } while (describeStreamResult.getStreamDescription().isHasMoreShards());
        return shardsOfStream;
    }
}