[FLINK-19827] Use the local execution env with an explicit flink-configuration
This commit provides an explicit configuration to the local execution environment
instead of an empty default flink-configuration.
diff --git a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
index 9eb7def..50fc5ba 100644
--- a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
+++ b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
@@ -102,6 +102,12 @@
return this;
}
+ /** Set the desired parallelism. */
+ public Harness withParallelism(int parallelism) {
+ flinkConfig.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+ return this;
+ }
+
/**
* Sets a global configuration available in the {@link
* org.apache.flink.statefun.sdk.spi.StatefulFunctionModule} on configure.
@@ -112,9 +118,10 @@
}
public void start() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
configureStrictlyRequiredFlinkConfigs(flinkConfig);
+ final int parallelism = getParallelism(flinkConfig);
+ StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.createLocalEnvironment(parallelism, flinkConfig);
// Configure will change the value of a setting only if a corresponding option was set in the
// underlying configuration. If a key is not present, the current value of a field will remain
// untouched.
@@ -127,6 +134,16 @@
StatefulFunctionsJob.main(env, stateFunConfig);
}
+ private static int getParallelism(Configuration config) {
+ final int parallelism;
+ if (config.contains(CoreOptions.DEFAULT_PARALLELISM)) {
+ parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+ } else {
+ parallelism = Runtime.getRuntime().availableProcessors();
+ }
+ return parallelism;
+ }
+
private static final class HarnessProvider implements StatefulFunctionsUniverseProvider {
private static final long serialVersionUID = 1;