[FLINK-22529][kinesis] Allow Flink's ConsumerConfigConstants and flexibility in providing AWS region and credentials
This closes #237.
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
index 93e322e..6096ec7 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
@@ -60,21 +60,35 @@
private static Properties propertiesFromSpec(KinesisIngressSpec<?> spec) {
final Properties properties = new Properties();
- properties.putAll(resolveClientProperties(spec.clientConfigurationProperties()));
- properties.putAll(AwsAuthConfigProperties.forAwsRegionConsumerProps(spec.awsRegion()));
- properties.putAll(AwsAuthConfigProperties.forAwsCredentials(spec.awsCredentials()));
+ properties.putAll(resolveProperties(spec.properties()));
+ spec.awsRegion()
+ .transformPropertiesIfPresent(
+ properties,
+ ConsumerConfigConstants.AWS_REGION,
+ (props, region) ->
+ properties.putAll(AwsAuthConfigProperties.forAwsRegionConsumerProps(region)));
+ spec.awsCredentials()
+ .transformPropertiesIfPresent(
+ properties,
+ ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
+ (props, credentials) ->
+ properties.putAll(AwsAuthConfigProperties.forAwsCredentials(credentials)));
setStartupPositionProperties(properties, spec.startupPosition());
return properties;
}
- private static Properties resolveClientProperties(Properties clientConfigurationProperties) {
+ private static Properties resolveProperties(Properties properties) {
final Properties resolvedProps = new Properties();
- for (String property : clientConfigurationProperties.stringPropertyNames()) {
- resolvedProps.setProperty(
- asFlinkConsumerClientPropertyKey(property),
- clientConfigurationProperties.getProperty(property));
+ for (String property : properties.stringPropertyNames()) {
+ if (property.startsWith("flink.") || property.startsWith("aws.")) {
+ resolvedProps.setProperty(property, properties.getProperty(property));
+ } else {
+ // all other configs are assumed to be AWS configs
+ resolvedProps.setProperty(
+ asAwsClientPropertyKey(property), properties.getProperty(property));
+ }
}
return resolvedProps;
}
@@ -105,7 +119,7 @@
}
}
- private static String asFlinkConsumerClientPropertyKey(String key) {
+ private static String asAwsClientPropertyKey(String key) {
return AWSUtil.AWS_CLIENT_CONFIG_PREFIX + lowercaseFirstLetter(key);
}
diff --git a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
index cb16d5c..d3b96b6 100644
--- a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
+++ b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
@@ -24,6 +24,7 @@
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.statefun.sdk.annotations.ForRuntime;
+import org.apache.flink.statefun.sdk.core.OptionalProperty;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -39,14 +40,14 @@
private final List<String> topics = new ArrayList<>();
private final Properties properties = new Properties();
- private OptionalConfig<String> consumerGroupId = OptionalConfig.withoutDefault();
- private OptionalConfig<KafkaIngressDeserializer<T>> deserializer =
- OptionalConfig.withoutDefault();
- private OptionalConfig<String> kafkaAddress = OptionalConfig.withoutDefault();
- private OptionalConfig<KafkaIngressAutoResetPosition> autoResetPosition =
- OptionalConfig.withDefault(KafkaIngressAutoResetPosition.LATEST);
- private OptionalConfig<KafkaIngressStartupPosition> startupPosition =
- OptionalConfig.withDefault(KafkaIngressStartupPosition.fromLatest());
+ private OptionalProperty<String> consumerGroupId = OptionalProperty.withoutDefault();
+ private OptionalProperty<KafkaIngressDeserializer<T>> deserializer =
+ OptionalProperty.withoutDefault();
+ private OptionalProperty<String> kafkaAddress = OptionalProperty.withoutDefault();
+ private OptionalProperty<KafkaIngressAutoResetPosition> autoResetPosition =
+ OptionalProperty.withDefault(KafkaIngressAutoResetPosition.LATEST);
+ private OptionalProperty<KafkaIngressStartupPosition> startupPosition =
+ OptionalProperty.withDefault(KafkaIngressStartupPosition.fromLatest());
private KafkaIngressBuilder(IngressIdentifier<T> id) {
this.id = Objects.requireNonNull(id);
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
index a25d23a..f9325ca 100644
--- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
@@ -24,6 +24,7 @@
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.statefun.sdk.annotations.ForRuntime;
+import org.apache.flink.statefun.sdk.core.OptionalProperty;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
@@ -42,9 +43,16 @@
private KinesisIngressDeserializer<T> deserializer;
private KinesisIngressStartupPosition startupPosition =
KinesisIngressStartupPosition.fromLatest();
- private AwsRegion awsRegion = AwsRegion.fromDefaultProviderChain();
- private AwsCredentials awsCredentials = AwsCredentials.fromDefaultProviderChain();
- private final Properties clientConfigurationProperties = new Properties();
+ private OptionalProperty<AwsRegion> awsRegion =
+ OptionalProperty.withDefault(AwsRegion.fromDefaultProviderChain());
+ private OptionalProperty<AwsCredentials> awsCredentials =
+ OptionalProperty.withDefault(AwsCredentials.fromDefaultProviderChain());
+
+ /**
+ * Contains properties for both the underlying AWS client, as well as Flink-connector specific
+ * properties.
+ */
+ private final Properties properties = new Properties();
private KinesisIngressBuilder(IngressIdentifier<T> id) {
this.id = Objects.requireNonNull(id);
@@ -109,7 +117,7 @@
* @see AwsRegion
*/
public KinesisIngressBuilder<T> withAwsRegion(AwsRegion awsRegion) {
- this.awsRegion = Objects.requireNonNull(awsRegion);
+ this.awsRegion.set(Objects.requireNonNull(awsRegion));
return this;
}
@@ -120,7 +128,7 @@
* @param regionName The unique id of the AWS region to connect to.
*/
public KinesisIngressBuilder<T> withAwsRegion(String regionName) {
- this.awsRegion = AwsRegion.ofId(regionName);
+ this.awsRegion.set(AwsRegion.ofId(regionName));
return this;
}
@@ -134,7 +142,7 @@
* @see AwsCredentials
*/
public KinesisIngressBuilder<T> withAwsCredentials(AwsCredentials awsCredentials) {
- this.awsCredentials = Objects.requireNonNull(awsCredentials);
+ this.awsCredentials.set(Objects.requireNonNull(awsCredentials));
return this;
}
@@ -150,24 +158,27 @@
* @param value the value for the property.
* @see <a
* href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.aws.ClientConfiguration</a>.
+ * @deprecated Please use {@link #withProperty(String, String)} instead.
*/
+ @Deprecated
public KinesisIngressBuilder<T> withClientConfigurationProperty(String key, String value) {
Objects.requireNonNull(key);
Objects.requireNonNull(value);
- this.clientConfigurationProperties.setProperty(key, value);
+ this.properties.setProperty(key, value);
+ return this;
+ }
+
+ public KinesisIngressBuilder<T> withProperty(String key, String value) {
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(value);
+ this.properties.setProperty(key, value);
return this;
}
/** @return A new {@link KinesisIngressSpec}. */
public KinesisIngressSpec<T> build() {
return new KinesisIngressSpec<>(
- id,
- streams,
- deserializer,
- startupPosition,
- awsRegion,
- awsCredentials,
- clientConfigurationProperties);
+ id, streams, deserializer, startupPosition, awsRegion, awsCredentials, properties);
}
// ========================================================================================
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
index a6da2c5..4dc4b7f 100644
--- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
@@ -21,6 +21,7 @@
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.core.OptionalProperty;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes;
@@ -32,24 +33,24 @@
private final List<String> streams;
private final KinesisIngressDeserializer<T> deserializer;
private final KinesisIngressStartupPosition startupPosition;
- private final AwsRegion awsRegion;
- private final AwsCredentials awsCredentials;
- private final Properties clientConfigurationProperties;
+ private final OptionalProperty<AwsRegion> awsRegion;
+ private final OptionalProperty<AwsCredentials> awsCredentials;
+ private final Properties properties;
KinesisIngressSpec(
IngressIdentifier<T> ingressIdentifier,
List<String> streams,
KinesisIngressDeserializer<T> deserializer,
KinesisIngressStartupPosition startupPosition,
- AwsRegion awsRegion,
- AwsCredentials awsCredentials,
- Properties clientConfigurationProperties) {
+ OptionalProperty<AwsRegion> awsRegion,
+ OptionalProperty<AwsCredentials> awsCredentials,
+ Properties properties) {
this.ingressIdentifier = Objects.requireNonNull(ingressIdentifier, "ingress identifier");
this.deserializer = Objects.requireNonNull(deserializer, "deserializer");
this.startupPosition = Objects.requireNonNull(startupPosition, "startup position");
this.awsRegion = Objects.requireNonNull(awsRegion, "AWS region configuration");
this.awsCredentials = Objects.requireNonNull(awsCredentials, "AWS credentials configuration");
- this.clientConfigurationProperties = Objects.requireNonNull(clientConfigurationProperties);
+ this.properties = Objects.requireNonNull(properties);
this.streams = Objects.requireNonNull(streams, "AWS Kinesis stream names");
if (streams.isEmpty()) {
@@ -80,15 +81,15 @@
return startupPosition;
}
- public AwsRegion awsRegion() {
+ public OptionalProperty<AwsRegion> awsRegion() {
return awsRegion;
}
- public AwsCredentials awsCredentials() {
+ public OptionalProperty<AwsCredentials> awsCredentials() {
return awsCredentials;
}
- public Properties clientConfigurationProperties() {
- return clientConfigurationProperties;
+ public Properties properties() {
+ return properties;
}
}
diff --git a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
index 0bc4cb2..714981f 100644
--- a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
+++ b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
@@ -47,11 +47,11 @@
assertThat(kinesisIngressSpec.id(), is(ID));
assertThat(kinesisIngressSpec.streams(), is(Collections.singletonList(STREAM_NAME)));
- assertTrue(kinesisIngressSpec.awsRegion().isDefault());
- assertTrue(kinesisIngressSpec.awsCredentials().isDefault());
+ assertTrue(kinesisIngressSpec.awsRegion().get().isDefault());
+ assertTrue(kinesisIngressSpec.awsCredentials().get().isDefault());
assertThat(kinesisIngressSpec.deserializer(), instanceOf(TestDeserializer.class));
assertTrue(kinesisIngressSpec.startupPosition().isLatest());
- assertTrue(kinesisIngressSpec.clientConfigurationProperties().isEmpty());
+ assertTrue(kinesisIngressSpec.properties().isEmpty());
}
private static final class TestDeserializer implements KinesisIngressDeserializer<String> {
diff --git a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/OptionalConfig.java b/statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/core/OptionalProperty.java
similarity index 66%
rename from statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/OptionalConfig.java
rename to statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/core/OptionalProperty.java
index c6dbfc7..8f53f60 100644
--- a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/OptionalConfig.java
+++ b/statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/core/OptionalProperty.java
@@ -15,41 +15,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.statefun.sdk.kafka;
+package org.apache.flink.statefun.sdk.core;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Properties;
+import java.util.function.BiConsumer;
import javax.annotation.Nullable;
+import org.apache.flink.statefun.sdk.annotations.ForRuntime;
/**
* Utility class to represent an optional config, which may have a predefined default value.
*
* @param <T> type of the configuration value.
*/
-final class OptionalConfig<T> {
+@ForRuntime
+public final class OptionalProperty<T> {
private final T defaultValue;
private T value;
- static <T> OptionalConfig<T> withDefault(T defaultValue) {
+ public static <T> OptionalProperty<T> withDefault(T defaultValue) {
Objects.requireNonNull(defaultValue);
- return new OptionalConfig<>(defaultValue);
+ return new OptionalProperty<>(defaultValue);
}
- static <T> OptionalConfig<T> withoutDefault() {
- return new OptionalConfig<>(null);
+ public static <T> OptionalProperty<T> withoutDefault() {
+ return new OptionalProperty<>(null);
}
- private OptionalConfig(@Nullable T defaultValue) {
+ private OptionalProperty(@Nullable T defaultValue) {
this.defaultValue = defaultValue;
}
- void set(T value) {
+ public void set(T value) {
this.value = Objects.requireNonNull(value);
}
- T get() {
+ public T get() {
if (!isSet() && !hasDefault()) {
throw new NoSuchElementException(
"A value has not been set, and no default value was defined.");
@@ -57,12 +60,19 @@
return isSet() ? value : defaultValue;
}
- void overwritePropertiesIfPresent(Properties properties, String key) {
+ public void overwritePropertiesIfPresent(Properties properties, String key) {
if (isSet() || (!properties.containsKey(key) && hasDefault())) {
properties.setProperty(key, get().toString());
}
}
+ public void transformPropertiesIfPresent(
+ Properties properties, String key, BiConsumer<Properties, T> transformer) {
+ if (isSet() || (!properties.containsKey(key) && hasDefault())) {
+ transformer.accept(properties, get());
+ }
+ }
+
private boolean hasDefault() {
return defaultValue != null;
}