[FLINK-16057] Don't end input in ContinuousFileReaderOperator Benchmark

Currently, END_OF_INPUT is sent once all splits were emitted.
Thus, all subsequent reads in ContinuousFileReaderOperator are made in
CLOSING state in a simple loop (MailboxExecutor.isIdle is always true).
Which is not a scenario this benchmark is intended for.

This change makes source to wait until all elements reach sink.
diff --git a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
index 7cc4f3f..b4937c7 100644
--- a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
@@ -24,19 +24,25 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
 import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.RunnerException;
 import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 import org.openjdk.jmh.runner.options.VerboseMode;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 @OperationsPerInvocation(value = ContinuousFileReaderOperatorBenchmark.RECORDS_PER_INVOCATION)
 public class ContinuousFileReaderOperatorBenchmark extends BenchmarkBase {
     private static final int SPLITS_PER_INVOCATION = 100;
@@ -46,6 +52,10 @@
     private static final TimestampedFileInputSplit SPLIT = new TimestampedFileInputSplit(0, 0, new Path("."), 0, 0, new String[]{});
     private static final String LINE = Strings.repeat('0', 10);
 
+    // Source should wait until all elements reach sink. Otherwise, END_OF_INPUT is sent once all splits are emitted.
+    // Thus, all subsequent reads in ContinuousFileReaderOperator would be made in CLOSING state in a simple while-true loop (MailboxExecutor.isIdle is always true).
+    private static OneShotLatch TARGET_COUNT_REACHED_LATCH = new OneShotLatch();
+
     public static void main(String[] args)
             throws RunnerException {
         Options options = new OptionsBuilder()
@@ -56,6 +66,11 @@
         new Runner(options).run();
     }
 
+    @TearDown(Level.Iteration)
+    public void tearDown() {
+        TARGET_COUNT_REACHED_LATCH.reset();
+    }
+
     @Benchmark
     public void readFileSplit(FlinkEnvironmentContext context) throws Exception {
         StreamExecutionEnvironment env = context.env;
@@ -66,7 +81,7 @@
                 .addSource(new MockSourceFunction())
                 .transform("fileReader", TypeInformation.of(String.class),
                         new ContinuousFileReaderOperatorFactory<>(new MockInputFormat()))
-                .addSink(new DiscardingSink<>());
+                .addSink(new LimitedSink());
 
         env.execute();
     }
@@ -83,6 +98,18 @@
                     ctx.collect(SPLIT);
                 }
             }
+            while (isRunning) {
+                try {
+                    TARGET_COUNT_REACHED_LATCH.await(100, TimeUnit.MILLISECONDS);
+                    return;
+                } catch (InterruptedException e) {
+                    if (!isRunning) {
+                        Thread.currentThread().interrupt();
+                    }
+                } catch (TimeoutException e) {
+                	// continue waiting
+                }
+            }
         }
 
         @Override
@@ -116,4 +143,15 @@
             // prevent super from requiring certain settings (input.file.path)
         }
     }
+
+    private static class LimitedSink implements SinkFunction<String> {
+        private int count;
+
+        @Override
+        public void invoke(String value, Context context) {
+        	if (++count == RECORDS_PER_INVOCATION) {
+        	    TARGET_COUNT_REACHED_LATCH.trigger();
+            }
+        }
+    }
 }