[FLINK-19827] Use the local execution env with an explicit flink-configuration

This commit provides an explicit configuration to the local execution environment
instead of an empty default flink-configuration.
diff --git a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
index 9eb7def..50fc5ba 100644
--- a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
+++ b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
@@ -102,6 +102,12 @@
     return this;
   }
 
+  /** Set the desired parallelism. */
+  public Harness withParallelism(int parallelism) {
+    flinkConfig.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+    return this;
+  }
+
   /**
    * Sets a global configuration available in the {@link
    * org.apache.flink.statefun.sdk.spi.StatefulFunctionModule} on configure.
@@ -112,9 +118,10 @@
   }
 
   public void start() throws Exception {
-    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
     configureStrictlyRequiredFlinkConfigs(flinkConfig);
+    final int parallelism = getParallelism(flinkConfig);
+    StreamExecutionEnvironment env =
+        StreamExecutionEnvironment.createLocalEnvironment(parallelism, flinkConfig);
     // Configure will change the value of a setting only if a corresponding option was set in the
     // underlying configuration. If a key is not present, the current value of a field will remain
     // untouched.
@@ -127,6 +134,16 @@
     StatefulFunctionsJob.main(env, stateFunConfig);
   }
 
+  private static int getParallelism(Configuration config) {
+    final int parallelism;
+    if (config.contains(CoreOptions.DEFAULT_PARALLELISM)) {
+      parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+    } else {
+      parallelism = Runtime.getRuntime().availableProcessors();
+    }
+    return parallelism;
+  }
+
   private static final class HarnessProvider implements StatefulFunctionsUniverseProvider {
     private static final long serialVersionUID = 1;