[FLINK-19256] [core] Move Flink config validation out of StatefulFunctionsConfig

Instead of always validating the Flink configuration when creating a
StatefulFunctionsConfig, we now validate it only in
StatefulFunctionsJob.
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java
new file mode 100644
index 0000000..72bc44c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+class ReflectiveFlinkConfigExtractor {
+
+  static Configuration extractFromEnv(StreamExecutionEnvironment env) {
+    try {
+      return (Configuration) getConfigurationMethod().invoke(env);
+    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+      throw new RuntimeException(
+          "Failed to acquire the Flink configuration from the current environment", e);
+    }
+  }
+
+  private static Method getConfigurationMethod() throws NoSuchMethodException {
+    Method getConfiguration =
+        StreamExecutionEnvironment.class.getDeclaredMethod("getConfiguration");
+    getConfiguration.setAccessible(true);
+    return getConfiguration;
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index ae338f3..9c96bce 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -21,8 +21,6 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -106,20 +104,12 @@
    * current environment set via the {@code flink-conf.yaml}.
    */
   public static StatefulFunctionsConfig fromEnvironment(StreamExecutionEnvironment env) {
-    Configuration configuration = getConfiguration(env);
+    Configuration configuration = ReflectiveFlinkConfigExtractor.extractFromEnv(env);
     return new StatefulFunctionsConfig(configuration);
   }
 
-  private static Configuration getConfiguration(StreamExecutionEnvironment env) {
-    try {
-      Method getConfiguration =
-          StreamExecutionEnvironment.class.getDeclaredMethod("getConfiguration");
-      getConfiguration.setAccessible(true);
-      return (Configuration) getConfiguration.invoke(env);
-    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
-      throw new RuntimeException(
-          "Failed to acquire the Flink configuration from the current environment", e);
-    }
+  public static StatefulFunctionsConfig fromFlinkConfiguration(Configuration flinkConfiguration) {
+    return new StatefulFunctionsConfig(flinkConfiguration);
   }
 
   private MessageFactoryType factoryType;
@@ -142,8 +132,6 @@
    * @param configuration a configuration to read the values from
    */
   public StatefulFunctionsConfig(Configuration configuration) {
-    StatefulFunctionsConfigValidator.validate(configuration);
-
     this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER);
     this.flinkJobName = configuration.get(FLINK_JOB_NAME);
     this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 2398cda..803bde4 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,7 +35,12 @@
     Map<String, String> globalConfigurations = parameterTool.toMap();
 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    StatefulFunctionsConfig stateFunConfig = StatefulFunctionsConfig.fromEnvironment(env);
+
+    Configuration flinkConfig = ReflectiveFlinkConfigExtractor.extractFromEnv(env);
+    StatefulFunctionsConfigValidator.validate(flinkConfig);
+
+    StatefulFunctionsConfig stateFunConfig =
+        StatefulFunctionsConfig.fromFlinkConfiguration(flinkConfig);
     stateFunConfig.addAllGlobalConfigurations(globalConfigurations);
     stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());