[FLINK-19256] [core] Only allow creating throush static factory methods
This closes #153.
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/FlinkConfigExtractor.java
similarity index 80%
rename from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java
rename to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/FlinkConfigExtractor.java
index 72bc44c..30dfba0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/FlinkConfigExtractor.java
@@ -23,9 +23,14 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-class ReflectiveFlinkConfigExtractor {
+final class FlinkConfigExtractor {
- static Configuration extractFromEnv(StreamExecutionEnvironment env) {
+ /**
+ * Reflectively extracts Flink {@link Configuration} from a {@link StreamExecutionEnvironment}.
+ * The Flink configuration contains Stateful Functions specific configurations.
+ * This is currently a private method in the {@code StreamExecutionEnvironment} class.
+ */
+ static Configuration reflectivelyExtractFromEnv(StreamExecutionEnvironment env) {
try {
return (Configuration) getConfigurationMethod().invoke(env);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 9c96bce..4982571 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -104,7 +104,7 @@
* current environment set via the {@code flink-conf.yaml}.
*/
public static StatefulFunctionsConfig fromEnvironment(StreamExecutionEnvironment env) {
- Configuration configuration = ReflectiveFlinkConfigExtractor.extractFromEnv(env);
+ Configuration configuration = FlinkConfigExtractor.reflectivelyExtractFromEnv(env);
return new StatefulFunctionsConfig(configuration);
}
@@ -131,7 +131,7 @@
*
* @param configuration a configuration to read the values from
*/
- public StatefulFunctionsConfig(Configuration configuration) {
+ private StatefulFunctionsConfig(Configuration configuration) {
this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER);
this.flinkJobName = configuration.get(FLINK_JOB_NAME);
this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 803bde4..5f56786 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -36,7 +36,7 @@
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- Configuration flinkConfig = ReflectiveFlinkConfigExtractor.extractFromEnv(env);
+ Configuration flinkConfig = FlinkConfigExtractor.reflectivelyExtractFromEnv(env);
StatefulFunctionsConfigValidator.validate(flinkConfig);
StatefulFunctionsConfig stateFunConfig =
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
index 5a93dd7..baa704b 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
@@ -20,7 +20,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.hamcrest.Matchers;
@@ -48,7 +47,8 @@
configuration.setString("statefun.module.global-config.key1", "value1");
configuration.setString("statefun.module.global-config.key2", "value2");
- StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(configuration);
+ StatefulFunctionsConfig stateFunConfig =
+ StatefulFunctionsConfig.fromFlinkConfiguration(configuration);
Assert.assertEquals(stateFunConfig.getFlinkJobName(), testName);
Assert.assertEquals(stateFunConfig.getFactoryType(), MessageFactoryType.WITH_KRYO_PAYLOADS);
@@ -60,12 +60,6 @@
stateFunConfig.getGlobalConfigurations(), Matchers.hasEntry("key2", "value2"));
}
- @Test(expected = StatefulFunctionsInvalidConfigException.class)
- public void invalidStrictFlinkConfigsThrows() {
- Configuration configuration = new Configuration();
- new StatefulFunctionsConfig(configuration);
- }
-
private static Configuration validConfiguration() {
Configuration configuration = new Configuration();
configuration.set(StatefulFunctionsConfig.FLINK_JOB_NAME, "name");
diff --git a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
index 6fe3856..9eb7def 100644
--- a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
+++ b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
@@ -120,7 +120,8 @@
// untouched.
env.configure(flinkConfig, Thread.currentThread().getContextClassLoader());
- StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConfig);
+ StatefulFunctionsConfig stateFunConfig =
+ StatefulFunctionsConfig.fromFlinkConfiguration(flinkConfig);
stateFunConfig.addAllGlobalConfigurations(globalConfigurations);
stateFunConfig.setProvider(new HarnessProvider(overrideIngress, overrideEgress));
StatefulFunctionsJob.main(env, stateFunConfig);