/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.flink.streaming.connectors.kinesis.util; |
| |
| import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; |
| import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; |
| import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; |
| import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; |
| |
| import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.runner.RunWith; |
| import org.powermock.core.classloader.annotations.PrepareForTest; |
| import org.powermock.modules.junit4.PowerMockRunner; |
| |
| import java.util.Properties; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| |
| /** |
| * Tests for KinesisConfigUtil. |
| */ |
| @RunWith(PowerMockRunner.class) |
| @PrepareForTest(KinesisConfigUtil.class) |
| public class KinesisConfigUtilTest { |
| @Rule |
| private ExpectedException exception = ExpectedException.none(); |
| |
| // ---------------------------------------------------------------------- |
| // getValidatedProducerConfiguration() tests |
| // ---------------------------------------------------------------------- |
| |
| @Test |
| public void testUnparsableLongForProducerConfiguration() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Error trying to set field RateLimit with the value 'unparsableLong'"); |
| |
| Properties testConfig = new Properties(); |
| testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); |
| testConfig.setProperty("RateLimit", "unparsableLong"); |
| |
| KinesisConfigUtil.getValidatedProducerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testRateLimitInProducerConfiguration() { |
| Properties testConfig = new Properties(); |
| testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); |
| KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig); |
| |
| assertEquals(100, kpc.getRateLimit()); |
| |
| testConfig.setProperty(KinesisConfigUtil.RATE_LIMIT, "150"); |
| kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig); |
| |
| assertEquals(150, kpc.getRateLimit()); |
| } |
| |
| @Test |
| public void testThreadingModelInProducerConfiguration() { |
| Properties testConfig = new Properties(); |
| testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); |
| KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig); |
| |
| assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, kpc.getThreadingModel()); |
| |
| testConfig.setProperty(KinesisConfigUtil.THREADING_MODEL, "PER_REQUEST"); |
| kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig); |
| |
| assertEquals(KinesisProducerConfiguration.ThreadingModel.PER_REQUEST, kpc.getThreadingModel()); |
| } |
| |
| @Test |
| public void testThreadPoolSizeInProducerConfiguration() { |
| Properties testConfig = new Properties(); |
| testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); |
| KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig); |
| |
| assertEquals(10, kpc.getThreadPoolSize()); |
| |
| testConfig.setProperty(KinesisConfigUtil.THREAD_POOL_SIZE, "12"); |
| kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig); |
| |
| assertEquals(12, kpc.getThreadPoolSize()); |
| } |
| |
| @Test |
| public void testReplaceDeprecatedKeys() { |
| Properties testConfig = new Properties(); |
| testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); |
| // these deprecated keys should be replaced |
| testConfig.setProperty(ProducerConfigConstants.AGGREGATION_MAX_COUNT, "1"); |
| testConfig.setProperty(ProducerConfigConstants.COLLECTION_MAX_COUNT, "2"); |
| Properties replacedConfig = KinesisConfigUtil.replaceDeprecatedProducerKeys(testConfig); |
| |
| assertEquals("1", replacedConfig.getProperty(KinesisConfigUtil.AGGREGATION_MAX_COUNT)); |
| assertEquals("2", replacedConfig.getProperty(KinesisConfigUtil.COLLECTION_MAX_COUNT)); |
| } |
| |
| @Test |
| public void testCorrectlySetRegionInProducerConfiguration() { |
| String region = "us-east-1"; |
| Properties testConfig = new Properties(); |
| testConfig.setProperty(AWSConfigConstants.AWS_REGION, region); |
| KinesisProducerConfiguration kpc = KinesisConfigUtil.getValidatedProducerConfiguration(testConfig); |
| |
| assertEquals("incorrect region", region, kpc.getRegion()); |
| } |
| |
| // ---------------------------------------------------------------------- |
| // validateAwsConfiguration() tests |
| // ---------------------------------------------------------------------- |
| |
| @Test |
| public void testMissingAwsRegionInConfig() { |
| String expectedMessage = String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.", |
| AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION); |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage(expectedMessage); |
| |
| Properties testConfig = new Properties(); |
| testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey"); |
| testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); |
| |
| KinesisConfigUtil.validateAwsConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnrecognizableAwsRegionInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid AWS region"); |
| |
| Properties testConfig = new Properties(); |
| testConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId"); |
| testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId"); |
| testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); |
| |
| KinesisConfigUtil.validateAwsConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testAwsRegionOrEndpointInConfig() { |
| String expectedMessage = String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.", |
| AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION); |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage(expectedMessage); |
| |
| Properties testConfig = new Properties(); |
| testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east"); |
| testConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "fake"); |
| testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey"); |
| testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); |
| |
| KinesisConfigUtil.validateAwsConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " + |
| "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); |
| |
| Properties testConfig = new Properties(); |
| testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); |
| testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); |
| |
| KinesisConfigUtil.validateAwsConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnrecognizableCredentialProviderTypeInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid AWS Credential Provider Type"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType"); |
| |
| KinesisConfigUtil.validateAwsConfiguration(testConfig); |
| } |
| |
| // ---------------------------------------------------------------------- |
| // validateConsumerConfiguration() tests |
| // ---------------------------------------------------------------------- |
| |
| @Test |
| public void testUnrecognizableStreamInitPositionTypeInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid initial position in stream"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "wrongInitPosition"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Please set value for initial timestamp ('" |
| + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position."); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableDateForInitialTimestampInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "unparsableDate"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testIllegalValueForInitialTimestampInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "-1.0"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testDateStringForValidateOptionDateProperty() { |
| String timestamp = "2016-04-04T19:58:46.480-00:00"; |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timestamp); |
| |
| try { |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| fail(); |
| } |
| } |
| |
| @Test |
| public void testUnixTimestampForValidateOptionDateProperty() { |
| String unixTimestamp = "1459799926.480"; |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); |
| |
| try { |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| fail(); |
| } |
| } |
| |
| @Test |
| public void testInvalidPatternForInitialTimestampInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-03-14"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "InvalidPattern"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream."); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "stillUnparsable"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() { |
| String unixTimestamp = "2016-04-04"; |
| String pattern = "yyyy-MM-dd"; |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, unixTimestamp); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, pattern); |
| |
| try { |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| fail(); |
| } |
| } |
| |
| @Test |
| public void testUnparsableLongForDescribeStreamBackoffBaseMillisInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for describe stream operation base backoff milliseconds"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "unparsableLong"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableLongForDescribeStreamBackoffMaxMillisInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for describe stream operation max backoff milliseconds"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, "unparsableLong"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableDoubleForDescribeStreamBackoffExponentialConstantInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for describe stream operation backoff exponential constant"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableIntForGetRecordsRetriesInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for maximum retry attempts for getRecords shard operation"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "unparsableInt"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableIntForGetRecordsMaxCountInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for maximum records per getRecords shard operation"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "unparsableInt"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableLongForGetRecordsBackoffBaseMillisInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for get records operation base backoff milliseconds"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "unparsableLong"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableLongForGetRecordsBackoffMaxMillisInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for get records operation max backoff milliseconds"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "unparsableLong"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableDoubleForGetRecordsBackoffExponentialConstantInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for get records operation backoff exponential constant"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableLongForGetRecordsIntervalMillisInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for getRecords sleep interval in milliseconds"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "unparsableLong"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableIntForGetShardIteratorRetriesInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for maximum retry attempts for getShardIterator shard operation"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, "unparsableInt"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableLongForGetShardIteratorBackoffBaseMillisInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for get shard iterator operation base backoff milliseconds"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE, "unparsableLong"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableLongForGetShardIteratorBackoffMaxMillisInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for get shard iterator operation max backoff milliseconds"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX, "unparsableLong"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableDoubleForGetShardIteratorBackoffExponentialConstantInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for get shard iterator operation backoff exponential constant"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, "unparsableDouble"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| |
| @Test |
| public void testUnparsableLongForShardDiscoveryIntervalMillisInConfig() { |
| exception.expect(IllegalArgumentException.class); |
| exception.expectMessage("Invalid value given for shard discovery sleep interval in milliseconds"); |
| |
| Properties testConfig = TestUtils.getStandardProperties(); |
| testConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "unparsableLong"); |
| |
| KinesisConfigUtil.validateConsumerConfiguration(testConfig); |
| } |
| } |