[FLINK-16057] Don't end input in ContinuousFileReaderOperator Benchmark
Currently, END_OF_INPUT is sent as soon as all splits have been emitted.
As a result, all subsequent reads in ContinuousFileReaderOperator are made in
the CLOSING state in a simple loop (MailboxExecutor.isIdle is always true),
which is not the scenario this benchmark is intended to measure.
This change makes the source wait until all elements reach the sink.
diff --git a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
index 7cc4f3f..b4937c7 100644
--- a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
@@ -24,19 +24,25 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.VerboseMode;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
@OperationsPerInvocation(value = ContinuousFileReaderOperatorBenchmark.RECORDS_PER_INVOCATION)
public class ContinuousFileReaderOperatorBenchmark extends BenchmarkBase {
private static final int SPLITS_PER_INVOCATION = 100;
@@ -46,6 +52,10 @@
private static final TimestampedFileInputSplit SPLIT = new TimestampedFileInputSplit(0, 0, new Path("."), 0, 0, new String[]{});
private static final String LINE = Strings.repeat('0', 10);
+ // Source should wait until all elements reach sink. Otherwise, END_OF_INPUT is sent once all splits are emitted.
+ // Thus, all subsequent reads in ContinuousFileReaderOperator would be made in CLOSING state in a simple while-true loop (MailboxExecutor.isIdle is always true).
+ private static OneShotLatch TARGET_COUNT_REACHED_LATCH = new OneShotLatch();
+
public static void main(String[] args)
throws RunnerException {
Options options = new OptionsBuilder()
@@ -56,6 +66,11 @@
new Runner(options).run();
}
+ @TearDown(Level.Invocation) // JMH may call the benchmark method several times per iteration, and each call runs a complete job
+ public void tearDown() {
+ TARGET_COUNT_REACHED_LATCH.reset(); // re-arm the latch; if left triggered, the next job's source would stop waiting right after emitting its splits
+ }
+
@Benchmark
public void readFileSplit(FlinkEnvironmentContext context) throws Exception {
StreamExecutionEnvironment env = context.env;
@@ -66,7 +81,7 @@
.addSource(new MockSourceFunction())
.transform("fileReader", TypeInformation.of(String.class),
new ContinuousFileReaderOperatorFactory<>(new MockInputFormat()))
- .addSink(new DiscardingSink<>());
+ .addSink(new LimitedSink());
env.execute();
}
@@ -83,6 +98,18 @@
ctx.collect(SPLIT);
}
}
+ while (isRunning) {
+ try {
+ TARGET_COUNT_REACHED_LATCH.await(100, TimeUnit.MILLISECONDS);
+ return;
+ } catch (InterruptedException e) {
+ if (!isRunning) {
+ Thread.currentThread().interrupt();
+ }
+ } catch (TimeoutException e) {
+ // continue waiting
+ }
+ }
}
@Override
@@ -116,4 +143,15 @@
// prevent super from requiring certain settings (input.file.path)
}
}
+
+ private static class LimitedSink implements SinkFunction<String> { // sink that signals once the expected number of records has arrived
+ private int received; // number of records this sink instance has been invoked with so far
+ @Override
+ public void invoke(String value, Context context) {
+ received++;
+ if (received == RECORDS_PER_INVOCATION) {
+ TARGET_COUNT_REACHED_LATCH.trigger(); // releases the source, which is blocked in await() on the same latch
+ }
+ }
+ }
}