Exclude startup/shutdown from benchmarks
- AsyncWaitOperatorBenchmark creates a new ExecutorService per iteration to reduce usage of shared resources
- AsyncWaitOperatorBenchmark shuts down executor service before job is finished to prevent leaftover futures from being completed (which would call into the mail box, causing exceptions)
- these exceptions possibly also interfered with the RM shutdown _somehow_
- FlinkEnvironmentContext creates a MiniCluster for each beachmark to have this setup time excluded from the benchmark times
diff --git a/pom.xml b/pom.xml
index 9c0778d..730e8ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,6 +190,13 @@
</dependency>
<dependency>
+ <!-- required for using a pre-defined MiniCluster -->
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
diff --git a/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java b/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
index f3fae5d..93d50ba 100644
--- a/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/AsyncWaitOperatorBenchmark.java
@@ -18,6 +18,12 @@
package org.apache.flink.benchmark;
import org.apache.flink.benchmark.functions.LongSource;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -26,6 +32,8 @@
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FileUtils;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.Param;
@@ -37,6 +45,7 @@
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.VerboseMode;
+import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -48,8 +57,6 @@
private static final long CHECKPOINT_INTERVAL_MS = 100;
- private static ExecutorService executor;
-
@Param
public AsyncDataStream.OutputMode outputMode;
@@ -63,16 +70,6 @@
new Runner(options).run();
}
- @Setup
- public void setUp() {
- executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
- }
-
- @TearDown
- public void tearDown() {
- executor.shutdown();
- }
-
@Benchmark
public void asyncWait(FlinkEnvironmentContext context) throws Exception {
@@ -107,9 +104,23 @@
}
private static class BenchmarkAsyncFunctionExecutor extends RichAsyncFunction<Long, Long> {
+
+ private ExecutorService executor;
+
+ @Override
+ public void open(Configuration parameters) {
+ executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ }
+
@Override
public void asyncInvoke(Long input, ResultFuture<Long> resultFuture) {
executor.execute(() -> resultFuture.complete(Collections.singleton(input * 2)));
}
+
+ @Override
+ public void close() throws InterruptedException {
+ executor.shutdownNow();
+ executor.awaitTermination(1, TimeUnit.MINUTES);
+ }
}
}
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
index 915a4a8..14a3a24 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
@@ -88,7 +88,7 @@
private final int parallelism = 4;
@Override
- public void setUp() throws IOException {
+ public void setUp() throws Exception {
super.setUp();
env.setParallelism(parallelism);
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index 15091a4..a40e445 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -67,7 +67,7 @@
public static class BlockingPartitionEnvironmentContext extends FlinkEnvironmentContext {
@Override
- public void setUp() throws IOException {
+ public void setUp() throws Exception {
super.setUp();
env.setParallelism(PARALLELISM);
diff --git a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
index f206141..9057e74 100644
--- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
@@ -19,15 +19,27 @@
package org.apache.flink.benchmark;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterPipelineExecutorServiceLoader;
+import org.apache.flink.util.Preconditions;
+import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.TemporalUnit;
+import java.util.concurrent.TimeUnit;
import static org.openjdk.jmh.annotations.Scope.Thread;
@@ -37,14 +49,26 @@
public static final int NUM_NETWORK_BUFFERS = 1000;
public StreamExecutionEnvironment env;
+ public MiniCluster miniCluster;
protected final int parallelism = 1;
protected final boolean objectReuse = true;
@Setup
- public void setUp() throws IOException {
+ public void setUp() throws Exception {
+ if (miniCluster != null) {
+ throw new RuntimeException("setUp was called multiple times!");
+ }
+ final Configuration clusterConfig = createConfiguration();
+ miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(clusterConfig).build());
+ miniCluster.start();
+
// set up the execution environment
- env = getStreamExecutionEnvironment();
+ env = new StreamExecutionEnvironment(
+ new MiniClusterPipelineExecutorServiceLoader(miniCluster),
+ clusterConfig,
+ null);
+
env.setParallelism(parallelism);
if (objectReuse) {
env.getConfig().enableObjectReuse();
@@ -53,17 +77,21 @@
env.setStateBackend(new MemoryStateBackend());
}
+ @TearDown
+ public void tearDown() throws Exception {
+ miniCluster.close();
+ miniCluster = null;
+ }
+
public void execute() throws Exception {
env.execute();
}
protected Configuration createConfiguration() {
final Configuration configuration = new Configuration();
+ configuration.setString(RestOptions.BIND_PORT, "0");
configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUM_NETWORK_BUFFERS);
+ configuration.set(DeploymentOptions.TARGET, MiniClusterPipelineExecutorServiceLoader.NAME);
return configuration;
}
-
- private StreamExecutionEnvironment getStreamExecutionEnvironment() {
- return StreamExecutionEnvironment.createLocalEnvironment(1, createConfiguration());
- }
}
diff --git a/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java b/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java
index 85149f2..eefc994 100644
--- a/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/MemoryStateBackendBenchmark.java
@@ -64,7 +64,7 @@
public StateBackend stateBackend = StateBackend.MEMORY;
@Override
- public void setUp() throws IOException {
+ public void setUp() throws Exception {
super.setUp(stateBackend, RECORDS_PER_INVOCATION);
}
}
diff --git a/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java b/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java
index b748d60..c280de4 100644
--- a/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/RocksStateBackendBenchmark.java
@@ -67,7 +67,7 @@
public StateBackend stateBackend = StateBackend.MEMORY;
@Override
- public void setUp() throws IOException {
+ public void setUp() throws Exception {
super.setUp(stateBackend, RECORDS_PER_INVOCATION);
}
diff --git a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
index cbbbd05..c84eb77 100644
--- a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
@@ -26,6 +26,7 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.util.FileUtils;
+import org.openjdk.jmh.annotations.TearDown;
import java.io.File;
import java.io.IOException;
@@ -56,7 +57,7 @@
}
}
- public void setUp(StateBackend stateBackend, long recordsPerInvocation) throws IOException {
+ public void setUp(StateBackend stateBackend, long recordsPerInvocation) throws Exception {
super.setUp();
final AbstractStateBackend backend;
@@ -87,6 +88,7 @@
source = env.addSource(new IntegerLongSource(numberOfElements, recordsPerInvocation));
}
+ @Override
public void tearDown() throws IOException {
FileUtils.deleteDirectory(checkpointDir);
}
diff --git a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
index 58b3e1c..87c5e1f 100644
--- a/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
@@ -95,7 +95,7 @@
public String timeout = "0";
@Override
- public void setUp() throws IOException {
+ public void setUp() throws Exception {
super.setUp();
env.setParallelism(parallelism);
diff --git a/src/main/java/org/apache/flink/benchmark/WindowBenchmarks.java b/src/main/java/org/apache/flink/benchmark/WindowBenchmarks.java
index 8fa6b6b..07ad76d 100644
--- a/src/main/java/org/apache/flink/benchmark/WindowBenchmarks.java
+++ b/src/main/java/org/apache/flink/benchmark/WindowBenchmarks.java
@@ -84,7 +84,7 @@
public DataStreamSource<IntegerLongSource.Record> source;
@Override
- public void setUp() throws IOException {
+ public void setUp() throws Exception {
super.setUp();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);