[FLINK-19256] [core] Only allow creating throush static factory methods

This closes #153.
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/FlinkConfigExtractor.java
similarity index 80%
rename from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java
rename to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/FlinkConfigExtractor.java
index 72bc44c..30dfba0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/FlinkConfigExtractor.java
@@ -23,9 +23,14 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
-class ReflectiveFlinkConfigExtractor {
+final class FlinkConfigExtractor {
 
-  static Configuration extractFromEnv(StreamExecutionEnvironment env) {
+  /**
+   * Reflectively extracts Flink {@link Configuration} from a {@link StreamExecutionEnvironment}.
+   * The Flink configuration contains Stateful Functions specific configurations.
+   * This is currently a private method in the {@code StreamExecutionEnvironment} class.
+   */
+  static Configuration reflectivelyExtractFromEnv(StreamExecutionEnvironment env) {
     try {
       return (Configuration) getConfigurationMethod().invoke(env);
     } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 9c96bce..4982571 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -104,7 +104,7 @@
    * current environment set via the {@code flink-conf.yaml}.
    */
   public static StatefulFunctionsConfig fromEnvironment(StreamExecutionEnvironment env) {
-    Configuration configuration = ReflectiveFlinkConfigExtractor.extractFromEnv(env);
+    Configuration configuration = FlinkConfigExtractor.reflectivelyExtractFromEnv(env);
     return new StatefulFunctionsConfig(configuration);
   }
 
@@ -131,7 +131,7 @@
    *
    * @param configuration a configuration to read the values from
    */
-  public StatefulFunctionsConfig(Configuration configuration) {
+  private StatefulFunctionsConfig(Configuration configuration) {
     this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER);
     this.flinkJobName = configuration.get(FLINK_JOB_NAME);
     this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 803bde4..5f56786 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -36,7 +36,7 @@
 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-    Configuration flinkConfig = ReflectiveFlinkConfigExtractor.extractFromEnv(env);
+    Configuration flinkConfig = FlinkConfigExtractor.reflectivelyExtractFromEnv(env);
     StatefulFunctionsConfigValidator.validate(flinkConfig);
 
     StatefulFunctionsConfig stateFunConfig =
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
index 5a93dd7..baa704b 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
@@ -20,7 +20,6 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.hamcrest.Matchers;
@@ -48,7 +47,8 @@
     configuration.setString("statefun.module.global-config.key1", "value1");
     configuration.setString("statefun.module.global-config.key2", "value2");
 
-    StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(configuration);
+    StatefulFunctionsConfig stateFunConfig =
+        StatefulFunctionsConfig.fromFlinkConfiguration(configuration);
 
     Assert.assertEquals(stateFunConfig.getFlinkJobName(), testName);
     Assert.assertEquals(stateFunConfig.getFactoryType(), MessageFactoryType.WITH_KRYO_PAYLOADS);
@@ -60,12 +60,6 @@
         stateFunConfig.getGlobalConfigurations(), Matchers.hasEntry("key2", "value2"));
   }
 
-  @Test(expected = StatefulFunctionsInvalidConfigException.class)
-  public void invalidStrictFlinkConfigsThrows() {
-    Configuration configuration = new Configuration();
-    new StatefulFunctionsConfig(configuration);
-  }
-
   private static Configuration validConfiguration() {
     Configuration configuration = new Configuration();
     configuration.set(StatefulFunctionsConfig.FLINK_JOB_NAME, "name");
diff --git a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
index 6fe3856..9eb7def 100644
--- a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
+++ b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
@@ -120,7 +120,8 @@
     // untouched.
     env.configure(flinkConfig, Thread.currentThread().getContextClassLoader());
 
-    StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConfig);
+    StatefulFunctionsConfig stateFunConfig =
+        StatefulFunctionsConfig.fromFlinkConfiguration(flinkConfig);
     stateFunConfig.addAllGlobalConfigurations(globalConfigurations);
     stateFunConfig.setProvider(new HarnessProvider(overrideIngress, overrideEgress));
     StatefulFunctionsJob.main(env, stateFunConfig);