Exclude startup/shutdown from benchmarks

- AsyncWaitOperatorBenchmark creates a new ExecutorService per iteration to reduce usage of shared resources
- AsyncWaitOperatorBenchmark shuts down the executor service before the job is finished to prevent leftover futures from being completed (which would call into the mailbox, causing exceptions)
	- these exceptions possibly also interfered with the RM shutdown _somehow_
- FlinkEnvironmentContext creates a MiniCluster for each benchmark so that the cluster setup time is excluded from the benchmark times
diff --git a/pom.xml b/pom.xml
index 9c0778d..730e8ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,6 +190,13 @@
 		</dependency>
 
 		<dependency>
+			<!-- required for using a pre-defined MiniCluster -->
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${flink.version}</version>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
 			<version>${flink.version}</version>
diff --git a/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java b/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
index f3fae5d..93d50ba 100644
--- a/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
@@ -18,6 +18,12 @@
 package org.apache.flink.benchmark;
 
 import org.apache.flink.benchmark.functions.LongSource;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -26,6 +32,8 @@
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FileUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.annotations.Param;
@@ -37,6 +45,7 @@
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 import org.openjdk.jmh.runner.options.VerboseMode;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -48,8 +57,6 @@
 
 	private static final long CHECKPOINT_INTERVAL_MS = 100;
 
-	private static ExecutorService executor;
-
 	@Param
 	public AsyncDataStream.OutputMode outputMode;
 
@@ -63,16 +70,6 @@
 		new Runner(options).run();
 	}
 
-	@Setup
-	public void setUp() {
-		executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
-	}
-
-	@TearDown
-	public void tearDown() {
-		executor.shutdown();
-	}
-
 	@Benchmark
 	public void asyncWait(FlinkEnvironmentContext context) throws Exception {
 
@@ -107,9 +104,23 @@
 	}
 
 	private static class BenchmarkAsyncFunctionExecutor extends RichAsyncFunction<Long, Long> {
+
+		private ExecutorService executor;
+
+		@Override
+		public void open(Configuration parameters) {
+			executor =  Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+		}
+
 		@Override
 		public void asyncInvoke(Long input, ResultFuture<Long> resultFuture) {
 			executor.execute(() -> resultFuture.complete(Collections.singleton(input * 2)));
 		}
+
+		@Override
+		public void close() throws InterruptedException {
+			executor.shutdownNow();
+			executor.awaitTermination(1, TimeUnit.MINUTES);
+		}
 	}
 }
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
index 915a4a8..14a3a24 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
@@ -88,7 +88,7 @@
 		private final int parallelism = 4;
 
 		@Override
-		public void setUp() throws IOException {
+		public void setUp() throws Exception {
 			super.setUp();
 
 			env.setParallelism(parallelism);
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index 15091a4..a40e445 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -67,7 +67,7 @@
     public static class BlockingPartitionEnvironmentContext extends FlinkEnvironmentContext {
 
         @Override
-        public void setUp() throws IOException {
+        public void setUp() throws Exception {
             super.setUp();
 
             env.setParallelism(PARALLELISM);
diff --git a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
index f206141..9057e74 100644
--- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
@@ -19,15 +19,27 @@
 package org.apache.flink.benchmark;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader;
+import org.apache.flink.util.Preconditions;
+import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.TemporalUnit;
+import java.util.concurrent.TimeUnit;
 
 import static org.openjdk.jmh.annotations.Scope.Thread;
 
@@ -37,14 +49,26 @@
     public static final int NUM_NETWORK_BUFFERS = 1000;
 
     public StreamExecutionEnvironment env;
+    public MiniCluster miniCluster;
 
     protected final int parallelism = 1;
     protected final boolean objectReuse = true;
 
     @Setup
-    public void setUp() throws IOException {
+    public void setUp() throws Exception {
+        if (miniCluster != null) {
+            throw new RuntimeException("setUp was called multiple times!");
+        }
+        final Configuration clusterConfig = createConfiguration();
+        miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(clusterConfig).build());
+        miniCluster.start();
+
         // set up the execution environment
-        env = getStreamExecutionEnvironment();
+        env = new StreamExecutionEnvironment(
+            new MiniClusterPipelineExecutorServiceLoader(miniCluster),
+            clusterConfig,
+            null);
+
         env.setParallelism(parallelism);
         if (objectReuse) {
             env.getConfig().enableObjectReuse();
@@ -53,17 +77,21 @@
         env.setStateBackend(new MemoryStateBackend());
     }
 
+    @TearDown
+    public void tearDown() throws Exception {
+        miniCluster.close();
+        miniCluster = null;
+    }
+
     public void execute() throws Exception {
         env.execute();
     }
 
     protected Configuration createConfiguration() {
         final Configuration configuration = new Configuration();
+        configuration.setString(RestOptions.BIND_PORT, "0");
         configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUM_NETWORK_BUFFERS);
+        configuration.set(DeploymentOptions.TARGET, MiniClusterPipelineExecutorServiceLoader.NAME);
         return configuration;
     }
-
-    private StreamExecutionEnvironment getStreamExecutionEnvironment() {
-        return StreamExecutionEnvironment.createLocalEnvironment(1, createConfiguration());
-    }
 }
diff --git a/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java b/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java
index 85149f2..eefc994 100644
--- a/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java
@@ -64,7 +64,7 @@
 		public StateBackend stateBackend = StateBackend.MEMORY;
 
 		@Override
-		public void setUp() throws IOException {
+		public void setUp() throws Exception {
 			super.setUp(stateBackend, RECORDS_PER_INVOCATION);
 		}
 	}
diff --git a/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java b/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java
index b748d60..c280de4 100644
--- a/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java
@@ -67,7 +67,7 @@
 		public StateBackend stateBackend = StateBackend.MEMORY;
 
 		@Override
-		public void setUp() throws IOException {
+		public void setUp() throws Exception {
 			super.setUp(stateBackend, RECORDS_PER_INVOCATION);
 		}
 
diff --git a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
index cbbbd05..c84eb77 100644
--- a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
@@ -26,6 +26,7 @@
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.util.FileUtils;
+import org.openjdk.jmh.annotations.TearDown;
 
 import java.io.File;
 import java.io.IOException;
@@ -56,7 +57,7 @@
 			}
 		}
 
-		public void setUp(StateBackend stateBackend, long recordsPerInvocation) throws IOException {
+		public void setUp(StateBackend stateBackend, long recordsPerInvocation) throws Exception {
 			super.setUp();
 
 			final AbstractStateBackend backend;
@@ -87,6 +88,7 @@
 			source = env.addSource(new IntegerLongSource(numberOfElements, recordsPerInvocation));
 		}
 
+		@Override
 		public void tearDown() throws IOException {
 			FileUtils.deleteDirectory(checkpointDir);
 		}
diff --git a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
index 58b3e1c..87c5e1f 100644
--- a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
@@ -95,7 +95,7 @@
         public String timeout = "0";
 
         @Override
-        public void setUp() throws IOException {
+        public void setUp() throws Exception {
             super.setUp();
 
             env.setParallelism(parallelism);
diff --git a/src/main/java/org/apache/flink/benchmark/WindowBenchmarks.java b/src/main/java/org/apache/flink/benchmark/WindowBenchmarks.java
index 8fa6b6b..07ad76d 100644
--- a/src/main/java/org/apache/flink/benchmark/WindowBenchmarks.java
+++ b/src/main/java/org/apache/flink/benchmark/WindowBenchmarks.java
@@ -84,7 +84,7 @@
 		public DataStreamSource<IntegerLongSource.Record> source;
 
 		@Override
-		public void setUp() throws IOException {
+		public void setUp() throws Exception {
 			super.setUp();
 
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);