[FLINK-22529][kinesis] Allow Flink's ConsumerConfigConstants and flexibility in providing AWS region and credentials

This closes #237.
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
index 93e322e..6096ec7 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/KinesisSourceProvider.java
@@ -60,21 +60,35 @@
   private static Properties propertiesFromSpec(KinesisIngressSpec<?> spec) {
     final Properties properties = new Properties();
 
-    properties.putAll(resolveClientProperties(spec.clientConfigurationProperties()));
-    properties.putAll(AwsAuthConfigProperties.forAwsRegionConsumerProps(spec.awsRegion()));
-    properties.putAll(AwsAuthConfigProperties.forAwsCredentials(spec.awsCredentials()));
+    properties.putAll(resolveProperties(spec.properties()));
+    spec.awsRegion()
+        .transformPropertiesIfPresent(
+            properties,
+            ConsumerConfigConstants.AWS_REGION,
+            (props, region) ->
+                properties.putAll(AwsAuthConfigProperties.forAwsRegionConsumerProps(region)));
+    spec.awsCredentials()
+        .transformPropertiesIfPresent(
+            properties,
+            ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
+            (props, credentials) ->
+                properties.putAll(AwsAuthConfigProperties.forAwsCredentials(credentials)));
 
     setStartupPositionProperties(properties, spec.startupPosition());
 
     return properties;
   }
 
-  private static Properties resolveClientProperties(Properties clientConfigurationProperties) {
+  private static Properties resolveProperties(Properties properties) {
     final Properties resolvedProps = new Properties();
-    for (String property : clientConfigurationProperties.stringPropertyNames()) {
-      resolvedProps.setProperty(
-          asFlinkConsumerClientPropertyKey(property),
-          clientConfigurationProperties.getProperty(property));
+    for (String property : properties.stringPropertyNames()) {
+      if (property.startsWith("flink.") || property.startsWith("aws.")) {
+        resolvedProps.setProperty(property, properties.getProperty(property));
+      } else {
+        // all other configs are assumed to be AWS configs
+        resolvedProps.setProperty(
+            asAwsClientPropertyKey(property), properties.getProperty(property));
+      }
     }
     return resolvedProps;
   }
@@ -105,7 +119,7 @@
     }
   }
 
-  private static String asFlinkConsumerClientPropertyKey(String key) {
+  private static String asAwsClientPropertyKey(String key) {
     return AWSUtil.AWS_CLIENT_CONFIG_PREFIX + lowercaseFirstLetter(key);
   }
 
diff --git a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
index cb16d5c..d3b96b6 100644
--- a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
+++ b/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/KafkaIngressBuilder.java
@@ -24,6 +24,7 @@
 import java.util.Objects;
 import java.util.Properties;
 import org.apache.flink.statefun.sdk.annotations.ForRuntime;
+import org.apache.flink.statefun.sdk.core.OptionalProperty;
 import org.apache.flink.statefun.sdk.io.IngressIdentifier;
 import org.apache.flink.statefun.sdk.io.IngressSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -39,14 +40,14 @@
   private final List<String> topics = new ArrayList<>();
   private final Properties properties = new Properties();
 
-  private OptionalConfig<String> consumerGroupId = OptionalConfig.withoutDefault();
-  private OptionalConfig<KafkaIngressDeserializer<T>> deserializer =
-      OptionalConfig.withoutDefault();
-  private OptionalConfig<String> kafkaAddress = OptionalConfig.withoutDefault();
-  private OptionalConfig<KafkaIngressAutoResetPosition> autoResetPosition =
-      OptionalConfig.withDefault(KafkaIngressAutoResetPosition.LATEST);
-  private OptionalConfig<KafkaIngressStartupPosition> startupPosition =
-      OptionalConfig.withDefault(KafkaIngressStartupPosition.fromLatest());
+  private OptionalProperty<String> consumerGroupId = OptionalProperty.withoutDefault();
+  private OptionalProperty<KafkaIngressDeserializer<T>> deserializer =
+      OptionalProperty.withoutDefault();
+  private OptionalProperty<String> kafkaAddress = OptionalProperty.withoutDefault();
+  private OptionalProperty<KafkaIngressAutoResetPosition> autoResetPosition =
+      OptionalProperty.withDefault(KafkaIngressAutoResetPosition.LATEST);
+  private OptionalProperty<KafkaIngressStartupPosition> startupPosition =
+      OptionalProperty.withDefault(KafkaIngressStartupPosition.fromLatest());
 
   private KafkaIngressBuilder(IngressIdentifier<T> id) {
     this.id = Objects.requireNonNull(id);
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
index a25d23a..f9325ca 100644
--- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressBuilder.java
@@ -24,6 +24,7 @@
 import java.util.Objects;
 import java.util.Properties;
 import org.apache.flink.statefun.sdk.annotations.ForRuntime;
+import org.apache.flink.statefun.sdk.core.OptionalProperty;
 import org.apache.flink.statefun.sdk.io.IngressIdentifier;
 import org.apache.flink.statefun.sdk.io.IngressSpec;
 import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
@@ -42,9 +43,16 @@
   private KinesisIngressDeserializer<T> deserializer;
   private KinesisIngressStartupPosition startupPosition =
       KinesisIngressStartupPosition.fromLatest();
-  private AwsRegion awsRegion = AwsRegion.fromDefaultProviderChain();
-  private AwsCredentials awsCredentials = AwsCredentials.fromDefaultProviderChain();
-  private final Properties clientConfigurationProperties = new Properties();
+  private OptionalProperty<AwsRegion> awsRegion =
+      OptionalProperty.withDefault(AwsRegion.fromDefaultProviderChain());
+  private OptionalProperty<AwsCredentials> awsCredentials =
+      OptionalProperty.withDefault(AwsCredentials.fromDefaultProviderChain());
+
+  /**
+   * Contains properties for both the underlying AWS client, as well as Flink-connector specific
+   * properties.
+   */
+  private final Properties properties = new Properties();
 
   private KinesisIngressBuilder(IngressIdentifier<T> id) {
     this.id = Objects.requireNonNull(id);
@@ -109,7 +117,7 @@
    * @see AwsRegion
    */
   public KinesisIngressBuilder<T> withAwsRegion(AwsRegion awsRegion) {
-    this.awsRegion = Objects.requireNonNull(awsRegion);
+    this.awsRegion.set(Objects.requireNonNull(awsRegion));
     return this;
   }
 
@@ -120,7 +128,7 @@
    * @param regionName The unique id of the AWS region to connect to.
    */
   public KinesisIngressBuilder<T> withAwsRegion(String regionName) {
-    this.awsRegion = AwsRegion.ofId(regionName);
+    this.awsRegion.set(AwsRegion.ofId(regionName));
     return this;
   }
 
@@ -134,7 +142,7 @@
    * @see AwsCredentials
    */
   public KinesisIngressBuilder<T> withAwsCredentials(AwsCredentials awsCredentials) {
-    this.awsCredentials = Objects.requireNonNull(awsCredentials);
+    this.awsCredentials.set(Objects.requireNonNull(awsCredentials));
     return this;
   }
 
@@ -150,24 +158,27 @@
    * @param value the value for the property.
    * @see <a
    *     href="https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">com.aws.ClientConfiguration</a>.
+   * @deprecated Please use {@link #withProperty(String, String)} instead.
    */
+  @Deprecated
   public KinesisIngressBuilder<T> withClientConfigurationProperty(String key, String value) {
     Objects.requireNonNull(key);
     Objects.requireNonNull(value);
-    this.clientConfigurationProperties.setProperty(key, value);
+    this.properties.setProperty(key, value);
+    return this;
+  }
+
+  public KinesisIngressBuilder<T> withProperty(String key, String value) {
+    Objects.requireNonNull(key);
+    Objects.requireNonNull(value);
+    this.properties.setProperty(key, value);
     return this;
   }
 
   /** @return A new {@link KinesisIngressSpec}. */
   public KinesisIngressSpec<T> build() {
     return new KinesisIngressSpec<>(
-        id,
-        streams,
-        deserializer,
-        startupPosition,
-        awsRegion,
-        awsCredentials,
-        clientConfigurationProperties);
+        id, streams, deserializer, startupPosition, awsRegion, awsCredentials, properties);
   }
 
   // ========================================================================================
diff --git a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
index a6da2c5..4dc4b7f 100644
--- a/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
+++ b/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressSpec.java
@@ -21,6 +21,7 @@
 import java.util.Objects;
 import java.util.Properties;
 import org.apache.flink.statefun.sdk.IngressType;
+import org.apache.flink.statefun.sdk.core.OptionalProperty;
 import org.apache.flink.statefun.sdk.io.IngressIdentifier;
 import org.apache.flink.statefun.sdk.io.IngressSpec;
 import org.apache.flink.statefun.sdk.kinesis.KinesisIOTypes;
@@ -32,24 +33,24 @@
   private final List<String> streams;
   private final KinesisIngressDeserializer<T> deserializer;
   private final KinesisIngressStartupPosition startupPosition;
-  private final AwsRegion awsRegion;
-  private final AwsCredentials awsCredentials;
-  private final Properties clientConfigurationProperties;
+  private final OptionalProperty<AwsRegion> awsRegion;
+  private final OptionalProperty<AwsCredentials> awsCredentials;
+  private final Properties properties;
 
   KinesisIngressSpec(
       IngressIdentifier<T> ingressIdentifier,
       List<String> streams,
       KinesisIngressDeserializer<T> deserializer,
       KinesisIngressStartupPosition startupPosition,
-      AwsRegion awsRegion,
-      AwsCredentials awsCredentials,
-      Properties clientConfigurationProperties) {
+      OptionalProperty<AwsRegion> awsRegion,
+      OptionalProperty<AwsCredentials> awsCredentials,
+      Properties properties) {
     this.ingressIdentifier = Objects.requireNonNull(ingressIdentifier, "ingress identifier");
     this.deserializer = Objects.requireNonNull(deserializer, "deserializer");
     this.startupPosition = Objects.requireNonNull(startupPosition, "startup position");
     this.awsRegion = Objects.requireNonNull(awsRegion, "AWS region configuration");
     this.awsCredentials = Objects.requireNonNull(awsCredentials, "AWS credentials configuration");
-    this.clientConfigurationProperties = Objects.requireNonNull(clientConfigurationProperties);
+    this.properties = Objects.requireNonNull(properties);
 
     this.streams = Objects.requireNonNull(streams, "AWS Kinesis stream names");
     if (streams.isEmpty()) {
@@ -80,15 +81,15 @@
     return startupPosition;
   }
 
-  public AwsRegion awsRegion() {
+  public OptionalProperty<AwsRegion> awsRegion() {
     return awsRegion;
   }
 
-  public AwsCredentials awsCredentials() {
+  public OptionalProperty<AwsCredentials> awsCredentials() {
     return awsCredentials;
   }
 
-  public Properties clientConfigurationProperties() {
-    return clientConfigurationProperties;
+  public Properties properties() {
+    return properties;
   }
 }
diff --git a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
index 0bc4cb2..714981f 100644
--- a/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
+++ b/statefun-kinesis-io/src/test/java/org/apache/flink/statefun/sdk/kinesis/KinesisIngressBuilderTest.java
@@ -47,11 +47,11 @@
 
     assertThat(kinesisIngressSpec.id(), is(ID));
     assertThat(kinesisIngressSpec.streams(), is(Collections.singletonList(STREAM_NAME)));
-    assertTrue(kinesisIngressSpec.awsRegion().isDefault());
-    assertTrue(kinesisIngressSpec.awsCredentials().isDefault());
+    assertTrue(kinesisIngressSpec.awsRegion().get().isDefault());
+    assertTrue(kinesisIngressSpec.awsCredentials().get().isDefault());
     assertThat(kinesisIngressSpec.deserializer(), instanceOf(TestDeserializer.class));
     assertTrue(kinesisIngressSpec.startupPosition().isLatest());
-    assertTrue(kinesisIngressSpec.clientConfigurationProperties().isEmpty());
+    assertTrue(kinesisIngressSpec.properties().isEmpty());
   }
 
   private static final class TestDeserializer implements KinesisIngressDeserializer<String> {
diff --git a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/OptionalConfig.java b/statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/core/OptionalProperty.java
similarity index 66%
rename from statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/OptionalConfig.java
rename to statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/core/OptionalProperty.java
index c6dbfc7..8f53f60 100644
--- a/statefun-kafka-io/src/main/java/org/apache/flink/statefun/sdk/kafka/OptionalConfig.java
+++ b/statefun-sdk-embedded/src/main/java/org/apache/flink/statefun/sdk/core/OptionalProperty.java
@@ -15,41 +15,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.statefun.sdk.kafka;
+package org.apache.flink.statefun.sdk.core;
 
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.function.BiConsumer;
 import javax.annotation.Nullable;
+import org.apache.flink.statefun.sdk.annotations.ForRuntime;
 
 /**
  * Utility class to represent an optional config, which may have a predefined default value.
  *
  * @param <T> type of the configuration value.
  */
-final class OptionalConfig<T> {
+@ForRuntime
+public final class OptionalProperty<T> {
 
   private final T defaultValue;
   private T value;
 
-  static <T> OptionalConfig<T> withDefault(T defaultValue) {
+  public static <T> OptionalProperty<T> withDefault(T defaultValue) {
     Objects.requireNonNull(defaultValue);
-    return new OptionalConfig<>(defaultValue);
+    return new OptionalProperty<>(defaultValue);
   }
 
-  static <T> OptionalConfig<T> withoutDefault() {
-    return new OptionalConfig<>(null);
+  public static <T> OptionalProperty<T> withoutDefault() {
+    return new OptionalProperty<>(null);
   }
 
-  private OptionalConfig(@Nullable T defaultValue) {
+  private OptionalProperty(@Nullable T defaultValue) {
     this.defaultValue = defaultValue;
   }
 
-  void set(T value) {
+  public void set(T value) {
     this.value = Objects.requireNonNull(value);
   }
 
-  T get() {
+  public T get() {
     if (!isSet() && !hasDefault()) {
       throw new NoSuchElementException(
           "A value has not been set, and no default value was defined.");
@@ -57,12 +60,19 @@
     return isSet() ? value : defaultValue;
   }
 
-  void overwritePropertiesIfPresent(Properties properties, String key) {
+  public void overwritePropertiesIfPresent(Properties properties, String key) {
     if (isSet() || (!properties.containsKey(key) && hasDefault())) {
       properties.setProperty(key, get().toString());
     }
   }
 
+  public void transformPropertiesIfPresent(
+      Properties properties, String key, BiConsumer<Properties, T> transformer) {
+    if (isSet() || (!properties.containsKey(key) && hasDefault())) {
+      transformer.accept(properties, get());
+    }
+  }
+
   private boolean hasDefault() {
     return defaultValue != null;
   }