blob: dc8b79b9ff5705f4a60996f10f598af939932221 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Utilities for Flink Kinesis connector configuration.
*/
@Internal
public class KinesisConfigUtil {
/** Maximum number of items to pack into an PutRecords request. **/
protected static final String COLLECTION_MAX_COUNT = "CollectionMaxCount";
/** Maximum number of items to pack into an aggregated record. **/
protected static final String AGGREGATION_MAX_COUNT = "AggregationMaxCount";
/** Limits the maximum allowed put rate for a shard, as a percentage of the backend limits.
* The default value is set as 100% in Flink. KPL's default value is 150% but it makes KPL throw
* RateLimitExceededException too frequently and breaks Flink sink as a result.
**/
protected static final String RATE_LIMIT = "RateLimit";
/**
* The threading model that KinesisProducer will use.
**/
protected static final String THREADING_MODEL = "ThreadingModel";
/**
* The maximum number of threads that the native process' thread pool will be configured with.
**/
protected static final String THREAD_POOL_SIZE = "ThreadPoolSize";
/** Default values for RateLimit. **/
protected static final long DEFAULT_RATE_LIMIT = 100L;
/** Default value for ThreadingModel. **/
protected static final KinesisProducerConfiguration.ThreadingModel DEFAULT_THREADING_MODEL = KinesisProducerConfiguration.ThreadingModel.POOLED;
/** Default values for ThreadPoolSize. **/
protected static final int DEFAULT_THREAD_POOL_SIZE = 10;
/**
* Validate configuration properties for {@link FlinkKinesisConsumer}.
*/
public static void validateConsumerConfiguration(Properties config) {
checkNotNull(config, "config can not be null");
validateAwsConfiguration(config);
if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
String initPosType = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);
// specified initial position in stream must be either LATEST, TRIM_HORIZON or AT_TIMESTAMP
try {
InitialPosition.valueOf(initPosType);
} catch (IllegalArgumentException e) {
StringBuilder sb = new StringBuilder();
for (InitialPosition pos : InitialPosition.values()) {
sb.append(pos.toString()).append(", ");
}
throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + sb.toString());
}
// specified initial timestamp in stream when using AT_TIMESTAMP
if (InitialPosition.valueOf(initPosType) == InitialPosition.AT_TIMESTAMP) {
if (!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) {
throw new IllegalArgumentException("Please set value for initial timestamp ('"
+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
}
validateOptionalDateProperty(config,
ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
config.getProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT),
"Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
+ "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
}
}
validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_MAX,
"Invalid value given for maximum records per getRecords shard operation. Must be a valid non-negative integer value.");
validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
"Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
"Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
"Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
"Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
"Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveIntProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
"Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
"Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
"Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
"Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
"Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
"Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
"Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
"Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value.");
if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
checkArgument(
Long.parseLong(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS))
< ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS,
"Invalid value given for getRecords sleep interval in milliseconds. Must be lower than " +
ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds."
);
}
}
/**
* Replace deprecated configuration properties for {@link FlinkKinesisProducer}.
* This should be remove along with deprecated keys
*/
public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
// Replace deprecated key
if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
configProps.setProperty(COLLECTION_MAX_COUNT,
configProps.getProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT));
configProps.remove(ProducerConfigConstants.COLLECTION_MAX_COUNT);
}
// Replace deprecated key
if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
configProps.setProperty(AGGREGATION_MAX_COUNT,
configProps.getProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT));
configProps.remove(ProducerConfigConstants.AGGREGATION_MAX_COUNT);
}
return configProps;
}
/**
* Validate configuration properties for {@link FlinkKinesisProducer},
* and return a constructed KinesisProducerConfiguration.
*/
public static KinesisProducerConfiguration getValidatedProducerConfiguration(Properties config) {
checkNotNull(config, "config can not be null");
validateAwsConfiguration(config);
KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));
kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));
// we explicitly lower the credential refresh delay (default is 5 seconds)
// to avoid an ignorable interruption warning that occurs when shutting down the
// KPL client. See https://github.com/awslabs/amazon-kinesis-producer/issues/10.
kpc.setCredentialsRefreshDelay(100);
// Override default values if they aren't specified by users
if (!config.containsKey(RATE_LIMIT)) {
kpc.setRateLimit(DEFAULT_RATE_LIMIT);
}
if (!config.containsKey(THREADING_MODEL)) {
kpc.setThreadingModel(DEFAULT_THREADING_MODEL);
}
if (!config.containsKey(THREAD_POOL_SIZE)) {
kpc.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
}
return kpc;
}
/**
* Validate configuration properties related to Amazon AWS service.
*/
public static void validateAwsConfiguration(Properties config) {
if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
String credentialsProviderType = config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
// value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable
CredentialProvider providerType;
try {
providerType = CredentialProvider.valueOf(credentialsProviderType);
} catch (IllegalArgumentException e) {
StringBuilder sb = new StringBuilder();
for (CredentialProvider type : CredentialProvider.values()) {
sb.append(type.toString()).append(", ");
}
throw new IllegalArgumentException("Invalid AWS Credential Provider Type set in config. Valid values are: " + sb.toString());
}
// if BASIC type is used, also check that the Access Key ID and Secret Key is supplied
if (providerType == CredentialProvider.BASIC) {
if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
|| !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
throw new IllegalArgumentException("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
}
}
}
if (!(config.containsKey(AWSConfigConstants.AWS_REGION) ^ config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) {
// per validation in AwsClientBuilder
throw new IllegalArgumentException(String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION));
}
if (config.containsKey(AWSConfigConstants.AWS_REGION)) {
// specified AWS Region name must be recognizable
if (!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) {
StringBuilder sb = new StringBuilder();
for (Regions region : Regions.values()) {
sb.append(region.getName()).append(", ");
}
throw new IllegalArgumentException("Invalid AWS region set in config. Valid values are: " + sb.toString());
}
}
}
private static void validateOptionalPositiveLongProperty(Properties config, String key, String message) {
if (config.containsKey(key)) {
try {
long value = Long.parseLong(config.getProperty(key));
if (value < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException(message);
}
}
}
private static void validateOptionalPositiveIntProperty(Properties config, String key, String message) {
if (config.containsKey(key)) {
try {
int value = Integer.parseInt(config.getProperty(key));
if (value < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException(message);
}
}
}
private static void validateOptionalPositiveDoubleProperty(Properties config, String key, String message) {
if (config.containsKey(key)) {
try {
double value = Double.parseDouble(config.getProperty(key));
if (value < 0) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException(message);
}
}
}
private static void validateOptionalDateProperty(Properties config, String timestampKey, String format, String message) {
if (config.containsKey(timestampKey)) {
try {
SimpleDateFormat customDateFormat = new SimpleDateFormat(format);
customDateFormat.parse(config.getProperty(timestampKey));
} catch (IllegalArgumentException | NullPointerException exception) {
throw new IllegalArgumentException(message);
} catch (ParseException exception) {
try {
double value = Double.parseDouble(config.getProperty(timestampKey));
if (value < 0) {
throw new IllegalArgumentException(message);
}
} catch (NumberFormatException numberFormatException) {
throw new IllegalArgumentException(message);
}
}
}
}
}