[FLINK-24234][connectors] Time based flushing for AsyncSinkWriter
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 5c6f0e4..b5f6b40 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -33,6 +33,7 @@
 import java.util.Deque;
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * A generic sink writer that handles the general behaviour of a sink such as batching and flushing,
@@ -50,15 +51,14 @@
 public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
         implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
 
-    private static final int BYTES_IN_MB = 1024 * 1024;
-
     private final MailboxExecutor mailboxExecutor;
     private final Sink.ProcessingTimeService timeService;
 
     private final int maxBatchSize;
     private final int maxInFlightRequests;
     private final int maxBufferedRequests;
-    private final double flushOnBufferSizeMB;
+    private final long flushOnBufferSizeInBytes;
+    private final long maxTimeInBufferMS;
 
     /**
      * The ElementConverter provides a mapping between for the elements of a stream to request
@@ -70,7 +70,8 @@
     private final ElementConverter<InputT, RequestEntryT> elementConverter;
 
     /**
-     * Buffer to hold request entries that should be persisted into the destination.
+     * Buffer to hold request entries that should be persisted into the destination, along with its
+     * size in bytes.
      *
      * <p>A request entry contain all relevant details to make a call to the destination. Eg, for
      * Kinesis Data Streams a request entry contains the payload and partition key.
@@ -81,7 +82,8 @@
      * construct a new (retry) request entry from the response and add that back to the queue for
      * later retry.
      */
-    private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();
+    private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries =
+            new ArrayDeque<>();
 
     /**
      * Tracks all pending async calls that have been executed since the last checkpoint. Calls that
@@ -100,16 +102,11 @@
 
     /**
      * Tracks the cumulative size of all elements in {@code bufferedRequestEntries} to facilitate
-     * the criterion for flushing after {@code flushOnBufferSizeMB} is reached.
+     * the criterion for flushing after {@code flushOnBufferSizeInBytes} is reached.
      */
-    private double bufferedRequestEntriesTotalSizeMB;
+    private double bufferedRequestEntriesTotalSizeInBytes;
 
-    /**
-     * Tracks the size of each element in {@code bufferedRequestEntries}. The sizes are stored in MB
-     * and the position in the deque reflects the position of the corresponding element in {@code
-     * bufferedRequestEntries}.
-     */
-    private final Deque<Double> bufferedRequestEntriesSizeMB = new ArrayDeque<>();
+    private boolean existsActiveTimerCallback = false;
 
     /**
      * This method specifies how to persist buffered request entries into the destination. It is
@@ -143,7 +140,7 @@
      * @param requestEntry the requestEntry for which we want to know the size
      * @return the size of the requestEntry, as defined previously
      */
-    protected abstract int getSizeInBytes(RequestEntryT requestEntry);
+    protected abstract long getSizeInBytes(RequestEntryT requestEntry);
 
     public AsyncSinkWriter(
             ElementConverter<InputT, RequestEntryT> elementConverter,
@@ -151,7 +148,8 @@
             int maxBatchSize,
             int maxInFlightRequests,
             int maxBufferedRequests,
-            double flushOnBufferSizeMB) {
+            long flushOnBufferSizeInBytes,
+            long maxTimeInBufferMS) {
         this.elementConverter = elementConverter;
         this.mailboxExecutor = context.getMailboxExecutor();
         this.timeService = context.getProcessingTimeService();
@@ -160,6 +158,8 @@
         Preconditions.checkArgument(maxBatchSize > 0);
         Preconditions.checkArgument(maxBufferedRequests > 0);
         Preconditions.checkArgument(maxInFlightRequests > 0);
+        Preconditions.checkArgument(flushOnBufferSizeInBytes > 0);
+        Preconditions.checkArgument(maxTimeInBufferMS > 0);
         Preconditions.checkArgument(
                 maxBufferedRequests > maxBatchSize,
                 "The maximum number of requests that may be buffered should be strictly"
@@ -167,30 +167,40 @@
         this.maxBatchSize = maxBatchSize;
         this.maxInFlightRequests = maxInFlightRequests;
         this.maxBufferedRequests = maxBufferedRequests;
-        this.flushOnBufferSizeMB = flushOnBufferSizeMB;
+        this.flushOnBufferSizeInBytes = flushOnBufferSizeInBytes;
+        this.maxTimeInBufferMS = maxTimeInBufferMS;
 
         this.inFlightRequestsCount = 0;
-        this.bufferedRequestEntriesTotalSizeMB = 0;
+        this.bufferedRequestEntriesTotalSizeInBytes = 0;
+    }
+
+    private void registerCallback() {
+        Sink.ProcessingTimeService.ProcessingTimeCallback ptc =
+                instant -> {
+                    existsActiveTimerCallback = false;
+                    while (!bufferedRequestEntries.isEmpty()) {
+                        flush();
+                    }
+                };
+        timeService.registerProcessingTimer(
+                timeService.getCurrentProcessingTime() + maxTimeInBufferMS, ptc);
+        existsActiveTimerCallback = true;
     }
 
     @Override
     public void write(InputT element, Context context) throws IOException, InterruptedException {
         while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-            mailboxExecutor.yield();
+            mailboxExecutor.tryYield();
         }
 
-        RequestEntryT requestEntry = elementConverter.apply(element, context);
-        double requestEntrySizeMB = getSizeInMB(requestEntry);
-        bufferedRequestEntries.add(requestEntry);
-        bufferedRequestEntriesSizeMB.add(requestEntrySizeMB);
-        bufferedRequestEntriesTotalSizeMB += requestEntrySizeMB;
+        addEntryToBuffer(elementConverter.apply(element, context), false);
 
         flushIfAble();
     }
 
-    private void flushIfAble() throws InterruptedException {
+    private void flushIfAble() {
         while (bufferedRequestEntries.size() >= maxBatchSize
-                || bufferedRequestEntriesTotalSizeMB >= flushOnBufferSizeMB) {
+                || bufferedRequestEntriesTotalSizeInBytes >= flushOnBufferSizeInBytes) {
             flush();
         }
     }
@@ -201,18 +211,18 @@
      *
      * <p>The method blocks if too many async requests are in flight.
      */
-    private void flush() throws InterruptedException {
+    private void flush() {
         while (inFlightRequestsCount >= maxInFlightRequests) {
-            mailboxExecutor.yield();
+            mailboxExecutor.tryYield();
         }
 
         List<RequestEntryT> batch = new ArrayList<>(maxBatchSize);
 
         int batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size());
         for (int i = 0; i < batchSize; i++) {
-            batch.add(bufferedRequestEntries.remove());
-            double elementSizeMB = bufferedRequestEntriesSizeMB.remove();
-            bufferedRequestEntriesTotalSizeMB -= elementSizeMB;
+            RequestEntryWrapper<RequestEntryT> elem = bufferedRequestEntries.remove();
+            batch.add(elem.getRequestEntry());
+            bufferedRequestEntriesTotalSizeInBytes -= elem.getSize();
         }
 
         if (batch.size() == 0) {
@@ -238,17 +248,24 @@
      */
     private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
         inFlightRequestsCount--;
-        failedRequestEntries.forEach(
-                failedEntry -> {
-                    bufferedRequestEntries.addFirst(failedEntry);
-                    double sizeOfFailedEntry = getSizeInMB(failedEntry);
-                    bufferedRequestEntriesSizeMB.addFirst(sizeOfFailedEntry);
-                    bufferedRequestEntriesTotalSizeMB += sizeOfFailedEntry;
-                });
+        failedRequestEntries.forEach(failedEntry -> addEntryToBuffer(failedEntry, true));
     }
 
-    private double getSizeInMB(RequestEntryT requestEntry){
-        return getSizeInBytes(requestEntry) / (double) BYTES_IN_MB;
+    private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
+        if (bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback) {
+            registerCallback();
+        }
+
+        RequestEntryWrapper<RequestEntryT> wrappedEntry =
+                new RequestEntryWrapper<>(entry, getSizeInBytes(entry));
+
+        if (insertAtHead) {
+            bufferedRequestEntries.addFirst(wrappedEntry);
+        } else {
+            bufferedRequestEntries.add(wrappedEntry);
+        }
+
+        bufferedRequestEntriesTotalSizeInBytes += wrappedEntry.getSize();
     }
 
     /**
@@ -260,9 +277,9 @@
      * <p>To this end, all in-flight requests need to completed before proceeding with the commit.
      */
     @Override
-    public List<Void> prepareCommit(boolean flush) throws InterruptedException {
+    public List<Void> prepareCommit(boolean flush) {
         while (inFlightRequestsCount > 0 || bufferedRequestEntries.size() > 0) {
-            mailboxExecutor.yield();
+            mailboxExecutor.tryYield();
             if (flush) {
                 flush();
             }
@@ -279,7 +296,10 @@
      */
     @Override
     public List<Collection<RequestEntryT>> snapshotState() {
-        return Arrays.asList(bufferedRequestEntries);
+        return Arrays.asList(
+                bufferedRequestEntries.stream()
+                        .map(RequestEntryWrapper::getRequestEntry)
+                        .collect(Collectors.toList()));
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RequestEntryWrapper.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RequestEntryWrapper.java
new file mode 100644
index 0000000..2a44c10
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RequestEntryWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A wrapper that contains a {@code RequestEntryT} ready to be written by the Sink Writer class,
+ * along with the size of that entry as defined by the method {@code getSizeInBytes(RequestEntryT)}
+ * of the {@code AsyncSinkWriter}.
+ *
+ * @param <RequestEntryT> Corresponds to the type parameter of the same name in {@code
+ *     AsyncSinkWriter}
+ */
+@PublicEvolving
+public class RequestEntryWrapper<RequestEntryT> {
+
+    private final RequestEntryT requestEntry;
+    private final long size;
+
+    public RequestEntryWrapper(RequestEntryT requestEntry, long size) {
+        this.requestEntry = requestEntry;
+        this.size = size;
+    }
+
+    public RequestEntryT getRequestEntry() {
+        return requestEntry;
+    }
+
+    public long getSize() {
+        return size;
+    }
+}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
index 4661110..29c8c56 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
@@ -33,18 +33,24 @@
     private final int maxBatchSize;
     private final int maxInFlightRequests;
     private final int maxBufferedRequests;
-    private final double flushOnBufferSizeMB;
+    private final long flushOnBufferSizeInBytes;
+    private final long maxTimeInBufferMS;
 
     public ArrayListAsyncSink() {
-        this(25, 1, 100, 0.1);
+        this(25, 1, 100, 100000, 1000);
     }
 
-    public ArrayListAsyncSink(int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests,
-                              double flushOnBufferSizeMB) {
+    public ArrayListAsyncSink(
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long flushOnBufferSizeInBytes,
+            long maxTimeInBufferMS) {
         this.maxBatchSize = maxBatchSize;
         this.maxInFlightRequests = maxInFlightRequests;
         this.maxBufferedRequests = maxBufferedRequests;
-        this.flushOnBufferSizeMB = flushOnBufferSizeMB;
+        this.flushOnBufferSizeInBytes = flushOnBufferSizeInBytes;
+        this.maxTimeInBufferMS = maxTimeInBufferMS;
     }
 
     @Override
@@ -59,7 +65,8 @@
                 maxBatchSize,
                 maxInFlightRequests,
                 maxBufferedRequests,
-                flushOnBufferSizeMB) {
+                flushOnBufferSizeInBytes,
+                maxTimeInBufferMS) {
 
             @Override
             protected void submitRequestEntries(
@@ -69,7 +76,7 @@
             }
 
             @Override
-            protected int getSizeInBytes(Integer requestEntry) {
+            protected long getSizeInBytes(Integer requestEntry) {
                 return 4;
             }
         };
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
index 4f6d5b1..a7ed47d 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -41,7 +42,7 @@
     public void testFailuresOnPersistingToDestinationAreCaughtAndRaised() {
         env.fromSequence(999_999, 1_000_100)
                 .map(Object::toString)
-                .sinkTo(new ArrayListAsyncSink(1, 1, 2, 10));
+                .sinkTo(new ArrayListAsyncSink(1, 1, 2, 10, 1000));
         Exception e =
                 assertThrows(
                         JobExecutionException.class,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 8db0aec..b3754d0 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -21,11 +21,13 @@
 import org.apache.flink.api.connector.sink.Sink;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
 import org.apache.flink.util.UserCodeClassLoader;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingRunnable;
+
 import org.junit.Before;
 import org.junit.Test;
 
@@ -38,12 +40,18 @@
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * Unit Tests the functionality of AsyncSinkWriter without any assumptions of what a concrete
@@ -51,7 +59,6 @@
  */
 public class AsyncSinkWriterTest {
 
-    private static final int BYTES_IN_MB = 1024 * 1024;
     private final List<Integer> res = new ArrayList<>();
     private final SinkInitContext sinkInitContext = new SinkInitContext();
 
@@ -93,6 +100,15 @@
     }
 
     @Test
+    public void testThatMailboxYieldDoesNotBlockWhileATimerIsRegisteredAndHasYetToElapse()
+            throws Exception {
+        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, false);
+        sink.write(String.valueOf(0));
+        sink.prepareCommit(true);
+        assertEquals(1, res.size());
+    }
+
+    @Test
     public void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterAutomaticFlush()
             throws IOException, InterruptedException {
         AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, false);
@@ -132,7 +148,8 @@
         sink.write("95");
         sink.write("35");
         Exception e = assertThrows(RuntimeException.class, () -> sink.write("135"));
-        assertEquals("Deliberate runtime exception occurred in SinkWriterImplementation.",
+        assertEquals(
+                "Deliberate runtime exception occurred in SinkWriterImplementation.",
                 e.getMessage());
         assertEquals(3, res.size());
     }
@@ -241,10 +258,10 @@
     }
 
     @Test
-    public void flushThresholdMetBeforeBatchLimitWillCreateASmallerBatchOfSizeAboveThreshold()
+    public void testFlushThresholdMetBeforeBatchLimitWillCreateASmallerBatchOfSizeAboveThreshold()
             throws IOException, InterruptedException {
-        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100,
-                (double) 30 / BYTES_IN_MB, true);
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, 30, 1000, false);
 
         /* Sink has flush threshold of 30 bytes, each integer is 4 bytes, therefore, flushing
          * should occur once 8 elements have been written.
@@ -253,6 +270,256 @@
             sink.write(String.valueOf(i));
         }
         assertEquals(8, res.size());
+        sink.write(String.valueOf(15));
+        assertEquals(16, res.size());
+    }
+
+    @Test
+    public void testThatWhenNumberOfItemAndSizeOfRecordThresholdsAreMetSimultaneouslyAFlushOccurs()
+            throws IOException, InterruptedException {
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImpl(sinkInitContext, 8, 1, 100, 32, 1000, false);
+
+        for (int i = 0; i < 8; i++) {
+            sink.write(String.valueOf(i));
+        }
+        assertEquals(8, res.size());
+        for (int i = 8; i < 16; i++) {
+            sink.write(String.valueOf(i));
+        }
+        assertEquals(16, res.size());
+    }
+
+    @Test
+    public void testThatIntermittentlyFailingEntriesAreEnqueuedOnToTheBufferWithCorrectSize()
+            throws IOException, InterruptedException {
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, 110, 1000, true);
+
+        sink.write(String.valueOf(225)); // Buffer: 100/110B; 1/10 elements; 0 inflight
+        sink.write(String.valueOf(1)); //   Buffer: 104/110B; 2/10 elements; 0 inflight
+        sink.write(String.valueOf(2)); //   Buffer: 108/110B; 3/10 elements; 0 inflight
+        sink.write(String.valueOf(3)); //   Buffer: 112/110B; 4/10 elements; 0 inflight -- flushing
+        assertEquals(3, res.size()); // Element 225 failed on first attempt
+        sink.write(String.valueOf(4)); //   Buffer:   4/110B; 1/10 elements; 1 inflight
+        sink.write(String.valueOf(5)); //   Buffer:   8/110B; 2/10 elements; 1 inflight
+        sink.write(String.valueOf(6)); //   Buffer:  12/110B; 3/10 elements; 1 inflight
+        sink.write(String.valueOf(325)); // Buffer: 112/110B; 4/10 elements; 1 inflight -- flushing
+
+        assertEquals(Arrays.asList(1, 2, 3, 225, 4, 5, 6), res);
+    }
+
+    @Test
+    public void testThatABatchWithSizeSmallerThanMaxBatchSizeIsFlushedOnTimeoutExpiry()
+            throws Exception {
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
+
+        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+        tpts.setCurrentTime(0L);
+        for (int i = 0; i < 8; i++) {
+            sink.write(String.valueOf(i));
+        }
+
+        tpts.setCurrentTime(99L);
+        assertEquals(0, res.size());
+        tpts.setCurrentTime(100L);
+        assertEquals(8, res.size());
+    }
+
+    @Test
+    public void testThatTimeBasedBatchPicksUpAllRelevantItemsUpUntilExpiryOfTimer()
+            throws Exception {
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
+
+        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+        for (int i = 0; i < 98; i++) {
+            tpts.setCurrentTime(i);
+            sink.write(String.valueOf(i));
+        }
+        tpts.setCurrentTime(99L);
+        assertEquals(90, res.size());
+        tpts.setCurrentTime(100L);
+        assertEquals(98, res.size());
+    }
+
+    @Test
+    public void testThatOneAndOnlyOneCallbackIsEverRegistered() throws Exception {
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
+
+        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+        tpts.setCurrentTime(0L);
+        sink.write("1"); // A timer is registered here to elapse at t=100
+        assertEquals(0, res.size());
+        tpts.setCurrentTime(10L);
+        sink.prepareCommit(true);
+        assertEquals(1, res.size());
+        tpts.setCurrentTime(20L); // At t=20, we write a new element that should not trigger another
+        sink.write("2"); // timer to be registered. If it is, it should elapse at t=120s.
+        assertEquals(1, res.size());
+        tpts.setCurrentTime(100L);
+        assertEquals(2, res.size());
+        sink.write("3");
+        tpts.setCurrentTime(199L); // At t=199s, our third element has not been written
+        assertEquals(2, res.size()); // therefore, no timer fired at 120s.
+        tpts.setCurrentTime(200L);
+        assertEquals(3, res.size());
+    }
+
+    @Test
+    public void testThatIntermittentlyFailingEntriesShouldBeFlushedWithMainBatchInTimeBasedFlush()
+            throws Exception {
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, 10000, 100, true);
+
+        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+        tpts.setCurrentTime(0L);
+        sink.write("1");
+        sink.write("2");
+        sink.write("225");
+        tpts.setCurrentTime(100L);
+        assertEquals(2, res.size());
+        sink.write("3");
+        sink.write("4");
+        tpts.setCurrentTime(199L);
+        assertEquals(2, res.size());
+        tpts.setCurrentTime(200L);
+        assertEquals(5, res.size());
+    }
+
+    @Test
+    public void testThatFlushingAnEmptyBufferDoesNotResultInErrorOrFailure() throws Exception {
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
+
+        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+        tpts.setCurrentTime(0L);
+        sink.write("1");
+        tpts.setCurrentTime(50L);
+        sink.prepareCommit(true);
+        assertEquals(1, res.size());
+        tpts.setCurrentTime(200L);
+    }
+
+    @Test
+    public void testThatOnExpiryOfAnOldTimeoutANewOneMayBeRegisteredImmediately() throws Exception {
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
+
+        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+        tpts.setCurrentTime(0L);
+        sink.write("1");
+        tpts.setCurrentTime(100L);
+        assertEquals(1, res.size());
+        sink.write("2");
+        tpts.setCurrentTime(200L);
+        assertEquals(2, res.size());
+    }
+
+    /**
+     * This test considers what could happen if the timer elapses, triggering a flush, while a
+     * long-running call to {@code submitRequestEntries} remains uncompleted for some time. We have
+     * a countdown latch with an expiry of 500ms installed in the call to {@code
+     * submitRequestEntries} that blocks if the batch size received is 3 and subsequently accepts
+     * and succeeds with any value.
+     *
+     * <p>Let us call the thread writing "3" thread3 and the thread writing "4" thread4. Thread3
+     * will enter {@code submitRequestEntries} with 3 entries and release thread4. Thread3 becomes
+     * blocked for 500ms. Thread4 writes "4" to the buffer and is flushed when the timer triggers
+     * (timer was first set when "1" was written). Thread4 then is blocked during the flush phase
+     * since thread3 is in-flight and maxInFlightRequests=1. After 500ms elapses, thread3 is revived
+     * and proceeds, which also unblocks thread4. This results in 1, 2, 3 being written prior to 4.
+     *
+     * <p>This test also implicitly asserts that any thread in the SinkWriter must be the mailbox
+     * thread if it enters {@code mailbox.tryYield()}.
+     */
+    @Test
+    public void testThatInterleavingThreadsMayBlockEachOtherButDoNotCauseRaceConditions()
+            throws Exception {
+        CountDownLatch blockedWriteLatch = new CountDownLatch(1);
+        CountDownLatch delayedStartLatch = new CountDownLatch(1);
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkReleaseAndBlockWriterImpl(
+                        sinkInitContext,
+                        3,
+                        1,
+                        20,
+                        100,
+                        100,
+                        blockedWriteLatch,
+                        delayedStartLatch,
+                        true);
+
+        writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
+        assertEquals(Arrays.asList(1, 2, 3, 4), res);
+    }
+
+    /**
+     * This test considers what could happen if the timer elapses, triggering a flush, while a
+     * long-running call to {@code submitRequestEntries} remains blocked. We have a countdown latch
+     * that blocks permanently until freed once the timer based flush is complete.
+     *
+     * <p>Let us call the thread writing "3" thread3 and the thread writing "4" thread4. Thread3
+     * will enter {@code submitRequestEntries} with 3 entries and release thread4. Thread3 becomes
+     * blocked. Thread4 writes "4" to the buffer and is flushed when the timer triggers (timer was
+     * first set when "1" was written). Thread4 completes and frees thread3. Thread3 is revived and
+     * proceeds. This results in 4 being written prior to 1, 2, 3.
+     *
+     * <p>This test also implicitly asserts that any thread in the SinkWriter must be the mailbox
+     * thread if it enters {@code mailbox.tryYield()}.
+     */
+    @Test
+    public void testThatIfOneInterleavedThreadIsBlockedTheOtherThreadWillContinueAndCorrectlyWrite()
+            throws Exception {
+        CountDownLatch blockedWriteLatch = new CountDownLatch(1);
+        CountDownLatch delayedStartLatch = new CountDownLatch(1);
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkReleaseAndBlockWriterImpl(
+                        sinkInitContext,
+                        3,
+                        2,
+                        20,
+                        100,
+                        100,
+                        blockedWriteLatch,
+                        delayedStartLatch,
+                        false);
+
+        writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
+        assertEquals(new ArrayList<>(Arrays.asList(4, 1, 2, 3)), res);
+    }
+
+    private void writeTwoElementsAndInterleaveTheNextTwoElements(
+            AsyncSinkWriterImpl sink,
+            CountDownLatch blockedWriteLatch,
+            CountDownLatch delayedStartLatch)
+            throws Exception {
+
+        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+        ExecutorService es = Executors.newFixedThreadPool(4);
+
+        tpts.setCurrentTime(0L);
+        sink.write("1");
+        sink.write("2");
+        es.submit(
+                () -> {
+                    try {
+                        sink.write("3");
+                    } catch (IOException | InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                });
+
+        delayedStartLatch.await();
+        sink.write("4");
+        tpts.setCurrentTime(100L);
+        blockedWriteLatch.countDown();
+        es.shutdown();
+        assertTrue(
+                es.awaitTermination(500, TimeUnit.MILLISECONDS),
+                "Executor Service stuck at termination, not terminated after 500ms!");
     }
 
     private class AsyncSinkWriterImpl extends AsyncSinkWriter<String, Integer> {
@@ -261,14 +528,19 @@
         private final boolean simulateFailures;
 
         public AsyncSinkWriterImpl(
-                Sink.InitContext context, int maxBatchSize, int maxInFlightRequests,
-                int maxBufferedRequests, boolean simulateFailures) {
+                Sink.InitContext context,
+                int maxBatchSize,
+                int maxInFlightRequests,
+                int maxBufferedRequests,
+                boolean simulateFailures) {
             super(
                     (elem, ctx) -> Integer.parseInt(elem),
                     context,
                     maxBatchSize,
                     maxInFlightRequests,
-                    maxBufferedRequests, 100);
+                    maxBufferedRequests,
+                    10000000,
+                    1000);
             this.simulateFailures = simulateFailures;
         }
 
@@ -277,10 +549,17 @@
                 int maxBatchSize,
                 int maxInFlightRequests,
                 int maxBufferedRequests,
-                double flushOnBufferSizeMB,
+                long flushOnBufferSizeInBytes,
+                long maxTimeInBufferMS,
                 boolean simulateFailures) {
-            super((elem, ctx) -> Integer.parseInt(elem), context, maxBatchSize, maxInFlightRequests,
-                    maxBufferedRequests, flushOnBufferSizeMB);
+            super(
+                    (elem, ctx) -> Integer.parseInt(elem),
+                    context,
+                    maxBatchSize,
+                    maxInFlightRequests,
+                    maxBufferedRequests,
+                    flushOnBufferSizeInBytes,
+                    maxTimeInBufferMS);
             this.simulateFailures = simulateFailures;
         }
 
@@ -329,14 +608,24 @@
             }
         }
 
+        /**
+         * @return If we're simulating failures and the requestEntry value is greater than 200, then
+         *     the entry is size 100 bytes, otherwise each entry is 4 bytes.
+         */
         @Override
-        protected int getSizeInBytes(Integer requestEntry) {
-            return 4;
+        protected long getSizeInBytes(Integer requestEntry) {
+            return requestEntry > 200 && simulateFailures ? 100 : 4;
         }
     }
 
     private static class SinkInitContext implements Sink.InitContext {
 
+        private static final TestProcessingTimeService processingTimeService;
+
+        static {
+            processingTimeService = new TestProcessingTimeService();
+        }
+
         @Override
         public UserCodeClassLoader getUserCodeClassLoader() {
             return null;
@@ -370,7 +659,19 @@
 
         @Override
         public Sink.ProcessingTimeService getProcessingTimeService() {
-            return null;
+            return new Sink.ProcessingTimeService() {
+                @Override
+                public long getCurrentProcessingTime() {
+                    return processingTimeService.getCurrentProcessingTime();
+                }
+
+                @Override
+                public void registerProcessingTimer(
+                        long time, ProcessingTimeCallback processingTimerCallback) {
+                    processingTimeService.registerTimer(
+                            time, processingTimerCallback::onProcessingTime);
+                }
+            };
         }
 
         @Override
@@ -392,5 +693,66 @@
         public OptionalLong getRestoredCheckpointId() {
             return OptionalLong.empty();
         }
+
+        public TestProcessingTimeService getTestProcessingTimeService() {
+            return processingTimeService;
+        }
+    }
+
+    /**
+     * This SinkWriter releases the lock on existing threads blocked by {@code delayedStartLatch}
+     * and blocks itself until {@code blockedThreadLatch} is unblocked.
+     */
+    private class AsyncSinkReleaseAndBlockWriterImpl extends AsyncSinkWriterImpl {
+
+        private final CountDownLatch blockedThreadLatch;
+        private final CountDownLatch delayedStartLatch;
+        private final boolean blockForLimitedTime;
+
+        public AsyncSinkReleaseAndBlockWriterImpl(
+                Sink.InitContext context,
+                int maxBatchSize,
+                int maxInFlightRequests,
+                int maxBufferedRequests,
+                long flushOnBufferSizeInBytes,
+                long maxTimeInBufferMS,
+                CountDownLatch blockedThreadLatch,
+                CountDownLatch delayedStartLatch,
+                boolean blockForLimitedTime) {
+            super(
+                    context,
+                    maxBatchSize,
+                    maxInFlightRequests,
+                    maxBufferedRequests,
+                    flushOnBufferSizeInBytes,
+                    maxTimeInBufferMS,
+                    false);
+            this.blockedThreadLatch = blockedThreadLatch;
+            this.delayedStartLatch = delayedStartLatch;
+            this.blockForLimitedTime = blockForLimitedTime;
+        }
+
+        @Override
+        protected void submitRequestEntries(
+                List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
+            if (requestEntries.size() == 3) {
+                try {
+                    delayedStartLatch.countDown();
+                    if (blockForLimitedTime) {
+                        assertFalse(
+                                blockedThreadLatch.await(500, TimeUnit.MILLISECONDS),
+                                "The countdown latch was released before the full amount"
+                                        + "of time was reached.");
+                    } else {
+                        blockedThreadLatch.await();
+                    }
+                } catch (InterruptedException e) {
+                    fail("The unit test latch must not have been interrupted by another thread.");
+                }
+            }
+
+            res.addAll(requestEntries);
+            requestResult.accept(new ArrayList<>());
+        }
     }
 }