blob: 69ca58b81297596b741b527142435ed9b895da96 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
* Tests for KinesisConfigUtil.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(KinesisConfigUtil.class)
public class KinesisConfigUtilTest {
@Rule
private ExpectedException exception = ExpectedException.none();
// ----------------------------------------------------------------------
// getValidatedProducerConfiguration() tests
// ----------------------------------------------------------------------
@Test
public void testUnparsableLongForProducerConfiguration() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'");
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
testConfig.setProperty("RateLimit", "unparsableLong");
KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
}
@Test
public void testRateLimitInProducerConfiguration() {
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
assertEquals(100, kpc.getRateLimit());
testConfig.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
assertEquals(150, kpc.getRateLimit());
}
@Test
public void testThreadingModelInProducerConfiguration() {
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, kpc.getThreadingModel());
testConfig.setProperty(KinesisConfigUtil.THREADING_MODEL, "PER_REQUEST");
kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
assertEquals(KinesisProducerConfiguration.ThreadingModel.PER_REQUEST, kpc.getThreadingModel());
}
@Test
public void testThreadPoolSizeInProducerConfiguration() {
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
assertEquals(10, kpc.getThreadPoolSize());
testConfig.setProperty(KinesisConfigUtil.THREAD_POOL_SIZE, "12");
kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
assertEquals(12, kpc.getThreadPoolSize());
}
@Test
public void testReplaceDeprecatedKeys() {
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
// these deprecated keys should be replaced
testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "1");
testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "2");
Properties replacedConfig = KinesisConfigUtil.replaceDeprecatedProducerKeys(testConfig);
assertEquals("1", replacedConfig.getProperty(KinesisConfigUtil.AGGREGATION_MAX_COUNT));
assertEquals("2", replacedConfig.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT));
}
@Test
public void testCorrectlySetRegionInProducerConfiguration() {
String region = "us-east-1";
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
assertEquals("incorrect region", region, kpc.getRegion());
}
// ----------------------------------------------------------------------
// validateAwsConfiguration() tests
// ----------------------------------------------------------------------
@Test
public void testMissingAwsRegionInConfig() {
String expectedMessage = String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION);
exception.expect(IllegalArgumentException.class);
exception.expectMessage(expectedMessage);
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
KinesisConfigUtil.validateAwsConfiguration(testConfig);
}
@Test
public void testUnrecognizableAwsRegionInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid AWS region");
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
KinesisConfigUtil.validateAwsConfiguration(testConfig);
}
@Test
public void testAwsRegionOrEndpointInConfig() {
String expectedMessage = String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION);
exception.expect(IllegalArgumentException.class);
exception.expectMessage(expectedMessage);
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east");
testConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "fake");
testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
KinesisConfigUtil.validateAwsConfiguration(testConfig);
}
@Test
public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
KinesisConfigUtil.validateAwsConfiguration(testConfig);
}
@Test
public void testUnrecognizableCredentialProviderTypeInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid AWS Credential Provider Type");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
KinesisConfigUtil.validateAwsConfiguration(testConfig);
}
// ----------------------------------------------------------------------
// validateConsumerConfiguration() tests
// ----------------------------------------------------------------------
@Test
public void testUnrecognizableStreamInitPositionTypeInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid initial position in stream");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Please set value for initial timestamp ('"
+ ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableDateForInitialTimestampInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testIllegalValueForInitialTimestampInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testDateStringForValidateOptionDateProperty() {
String timestamp = "2016-04-04T19:58:46.480-00:00";
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp);
try {
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
@Test
public void testUnixTimestampForValidateOptionDateProperty() {
String unixTimestamp = "1459799926.480";
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
try {
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
@Test
public void testInvalidPatternForInitialTimestampInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14");
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream.");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable");
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() {
String unixTimestamp = "2016-04-04";
String pattern = "yyyy-MM-dd";
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp);
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern);
try {
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
} catch (Exception e) {
e.printStackTrace();
fail();
}
}
@Test
public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableIntForGetRecordsRetriesInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableIntForGetRecordsMaxCountInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for maximum records per getRecords shard operation");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get records operation base backoff milliseconds");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get records operation max backoff milliseconds");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get records operation backoff exponential constant");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableLongForGetRecordsIntervalMillisInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableIntForGetShardIteratorRetriesInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
@Test
public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds");
Properties testConfig = TestUtils.getStandardProperties();
testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong");
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}
}