blob: 2678c9012bbd8ecb7eff9675f416f4c6be720733 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.BeanDeserializerFactory;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
import com.fasterxml.jackson.databind.deser.DeserializerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Some utilities specific to Amazon Web Service.
*/
@Internal
public class AWSUtil {
/** Used for formatting Flink-specific user agent string when creating Kinesis client. */
private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
/**
* Creates an AmazonKinesis client.
* @param configProps configuration properties containing the access key, secret key, and region
* @return a new AmazonKinesis client
*/
public static AmazonKinesis createKinesisClient(Properties configProps) {
return createKinesisClient(configProps, new ClientConfigurationFactory().getConfig());
}
/**
* Creates an Amazon Kinesis Client.
* @param configProps configuration properties containing the access key, secret key, and region
* @param awsClientConfig preconfigured AWS SDK client configuration
* @return a new Amazon Kinesis Client
*/
public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
// set a Flink-specific user agent
awsClientConfig.setUserAgentPrefix(String.format(USER_AGENT_FORMAT,
EnvironmentInformation.getVersion(),
EnvironmentInformation.getRevisionInformation().commitId));
// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
.withCredentials(AWSUtil.getCredentialsProvider(configProps))
.withClientConfiguration(awsClientConfig);
if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
// Set signingRegion as null, to facilitate mocking Kinesis for local tests
builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
null));
} else {
builder.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
}
return builder.build();
}
/**
* Return a {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
*
* @param configProps the configuration properties
* @return The corresponding AWS Credentials Provider instance
*/
public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) {
CredentialProvider credentialProviderType;
if (!configProps.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
if (configProps.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
&& configProps.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
// if the credential provider type is not specified, but the Access Key ID and Secret Key are given, it will default to BASIC
credentialProviderType = CredentialProvider.BASIC;
} else {
// if the credential provider type is not specified, it will default to AUTO
credentialProviderType = CredentialProvider.AUTO;
}
} else {
credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
AWSConfigConstants.AWS_CREDENTIALS_PROVIDER));
}
AWSCredentialsProvider credentialsProvider;
switch (credentialProviderType) {
case ENV_VAR:
credentialsProvider = new EnvironmentVariableCredentialsProvider();
break;
case SYS_PROP:
credentialsProvider = new SystemPropertiesCredentialsProvider();
break;
case PROFILE:
String profileName = configProps.getProperty(
AWSConfigConstants.AWS_PROFILE_NAME, null);
String profileConfigPath = configProps.getProperty(
AWSConfigConstants.AWS_PROFILE_PATH, null);
credentialsProvider = (profileConfigPath == null)
? new ProfileCredentialsProvider(profileName)
: new ProfileCredentialsProvider(profileConfigPath, profileName);
break;
case BASIC:
credentialsProvider = new AWSCredentialsProvider() {
@Override
public AWSCredentials getCredentials() {
return new BasicAWSCredentials(
configProps.getProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID),
configProps.getProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY));
}
@Override
public void refresh() {
// do nothing
}
};
break;
default:
case AUTO:
credentialsProvider = new DefaultAWSCredentialsProviderChain();
}
return credentialsProvider;
}
/**
* Checks whether or not a region ID is valid.
*
* @param region The AWS region ID to check
* @return true if the supplied region ID is valid, false otherwise
*/
public static boolean isValidRegion(String region) {
try {
Regions.fromName(region.toLowerCase());
} catch (IllegalArgumentException e) {
return false;
}
return true;
}
/**
* The prefix used for properties that should be applied to {@link ClientConfiguration}.
*/
public static final String AWS_CLIENT_CONFIG_PREFIX = "aws.clientconfig.";
/**
* Set all prefixed properties on {@link ClientConfiguration}.
* @param config
* @param configProps
*/
public static void setAwsClientConfigProperties(ClientConfiguration config,
Properties configProps) {
Map<String, Object> awsConfigProperties = new HashMap<>();
for (Map.Entry<Object, Object> entry : configProps.entrySet()) {
String key = (String) entry.getKey();
if (key.startsWith(AWS_CLIENT_CONFIG_PREFIX)) {
awsConfigProperties.put(key.substring(AWS_CLIENT_CONFIG_PREFIX.length()), entry.getValue());
}
}
// Jackson does not like the following properties
String[] ignorableProperties = {"secureRandom"};
BeanDeserializerModifier modifier = new BeanDeserializerModifierForIgnorables(
ClientConfiguration.class, ignorableProperties);
DeserializerFactory factory = BeanDeserializerFactory.instance.withDeserializerModifier(
modifier);
ObjectMapper mapper = new ObjectMapper(null, null,
new DefaultDeserializationContext.Impl(factory));
JsonNode propTree = mapper.convertValue(awsConfigProperties, JsonNode.class);
try {
mapper.readerForUpdating(config).readValue(propTree);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}