[FLINK-24234][connectors] Added byte based flushing for AsyncSinkWriter
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 1d17d2f..5c6f0e4 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -50,12 +50,15 @@
 public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
         implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
 
+    private static final int BYTES_IN_MB = 1024 * 1024;
+
     private final MailboxExecutor mailboxExecutor;
     private final Sink.ProcessingTimeService timeService;
 
     private final int maxBatchSize;
     private final int maxInFlightRequests;
     private final int maxBufferedRequests;
+    private final double flushOnBufferSizeMB;
 
     /**
      * The ElementConverter provides a mapping between for the elements of a stream to request
@@ -96,6 +99,19 @@
     private int inFlightRequestsCount;
 
     /**
+     * Tracks the cumulative size, in MB, of all elements in {@code bufferedRequestEntries}; a
+     * flush is triggered once this total reaches {@code flushOnBufferSizeMB}.
+     */
+    private double bufferedRequestEntriesTotalSizeMB;
+
+    /**
+     * Tracks the size of each element in {@code bufferedRequestEntries}. The sizes are stored in MB
+     * and the position in the deque reflects the position of the corresponding element in {@code
+     * bufferedRequestEntries}.
+     */
+    private final Deque<Double> bufferedRequestEntriesSizeMB = new ArrayDeque<>();
+
+    /**
      * This method specifies how to persist buffered request entries into the destination. It is
      * implemented when support for a new destination is added.
      *
@@ -118,12 +134,24 @@
     protected abstract void submitRequestEntries(
             List<RequestEntryT> requestEntries, Consumer<Collection<RequestEntryT>> requestResult);
 
+    /**
+     * Returns the size of a {@code RequestEntryT} in bytes. The size is the number of bytes
+     * written to the destination when this {@code RequestEntryT} is persisted, which is not
+     * necessarily its serialized length (although the two may coincide). The writer converts this
+     * value to MB and adds it to the buffer total that is compared against the flush threshold.
+     *
+     * @param requestEntry the request entry whose size is requested
+     * @return the size of {@code requestEntry} in bytes, as defined above
+     */
+    protected abstract int getSizeInBytes(RequestEntryT requestEntry);
+
     public AsyncSinkWriter(
             ElementConverter<InputT, RequestEntryT> elementConverter,
             Sink.InitContext context,
             int maxBatchSize,
             int maxInFlightRequests,
-            int maxBufferedRequests) {
+            int maxBufferedRequests,
+            double flushOnBufferSizeMB) {
         this.elementConverter = elementConverter;
         this.mailboxExecutor = context.getMailboxExecutor();
         this.timeService = context.getProcessingTimeService();
@@ -139,6 +167,10 @@
         this.maxBatchSize = maxBatchSize;
         this.maxInFlightRequests = maxInFlightRequests;
         this.maxBufferedRequests = maxBufferedRequests;
+        this.flushOnBufferSizeMB = flushOnBufferSizeMB;
+
+        this.inFlightRequestsCount = 0;
+        this.bufferedRequestEntriesTotalSizeMB = 0;
     }
 
     @Override
@@ -147,13 +179,18 @@
             mailboxExecutor.yield();
         }
 
-        bufferedRequestEntries.add(elementConverter.apply(element, context));
+        RequestEntryT requestEntry = elementConverter.apply(element, context);
+        double requestEntrySizeMB = getSizeInMB(requestEntry);
+        bufferedRequestEntries.add(requestEntry);
+        bufferedRequestEntriesSizeMB.add(requestEntrySizeMB);
+        bufferedRequestEntriesTotalSizeMB += requestEntrySizeMB;
 
-        flushIfFull();
+        flushIfAble();
     }
 
-    private void flushIfFull() throws InterruptedException {
-        while (bufferedRequestEntries.size() >= maxBatchSize) {
+    private void flushIfAble() throws InterruptedException {
+        while (!bufferedRequestEntries.isEmpty() && (bufferedRequestEntries.size() >= maxBatchSize
+                || bufferedRequestEntriesTotalSizeMB >= flushOnBufferSizeMB)) {
             flush();
         }
     }
@@ -174,6 +211,8 @@
         int batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size());
         for (int i = 0; i < batchSize; i++) {
             batch.add(bufferedRequestEntries.remove());
+            double elementSizeMB = bufferedRequestEntriesSizeMB.remove();
+            bufferedRequestEntriesTotalSizeMB -= elementSizeMB;
         }
 
         if (batch.size() == 0) {
@@ -199,7 +238,17 @@
      */
     private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
         inFlightRequestsCount--;
-        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
+        // Put failed entries back at the head; the size deque and running total stay in step.
+        for (RequestEntryT retryEntry : failedRequestEntries) {
+            bufferedRequestEntries.addFirst(retryEntry);
+            double retryEntrySizeMB = getSizeInMB(retryEntry);
+            bufferedRequestEntriesSizeMB.addFirst(retryEntrySizeMB);
+            bufferedRequestEntriesTotalSizeMB += retryEntrySizeMB;
+        }
+    }
+
+    private double getSizeInMB(RequestEntryT requestEntry) {
+        return (double) getSizeInBytes(requestEntry) / BYTES_IN_MB;
     }
 
     /**
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
index c4d6639..4661110 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
@@ -33,15 +33,18 @@
     private final int maxBatchSize;
     private final int maxInFlightRequests;
     private final int maxBufferedRequests;
+    private final double flushOnBufferSizeMB;
 
     public ArrayListAsyncSink() {
-        this(25, 1, 100);
+        this(25, 1, 100, 0.1);
     }
 
-    public ArrayListAsyncSink(int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests) {
+    public ArrayListAsyncSink(int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests,
+                              double flushOnBufferSizeMB) {
         this.maxBatchSize = maxBatchSize;
         this.maxInFlightRequests = maxInFlightRequests;
         this.maxBufferedRequests = maxBufferedRequests;
+        this.flushOnBufferSizeMB = flushOnBufferSizeMB;
     }
 
     @Override
@@ -55,13 +58,20 @@
                 context,
                 maxBatchSize,
                 maxInFlightRequests,
-                maxBufferedRequests) {
+                maxBufferedRequests,
+                flushOnBufferSizeMB) {
+
             @Override
             protected void submitRequestEntries(
                     List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
                 ArrayListDestination.putRecords(requestEntries);
                 requestResult.accept(Arrays.asList());
             }
+
+            @Override
+            protected int getSizeInBytes(Integer requestEntry) {
+                return Integer.BYTES;
+            }
         };
     }
 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
index 63d51e3..4f6d5b1 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
@@ -21,7 +21,6 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -42,7 +41,7 @@
     public void testFailuresOnPersistingToDestinationAreCaughtAndRaised() {
         env.fromSequence(999_999, 1_000_100)
                 .map(Object::toString)
-                .sinkTo(new ArrayListAsyncSink(1, 1, 2));
+                .sinkTo(new ArrayListAsyncSink(1, 1, 2, 10));
         Exception e =
                 assertThrows(
                         JobExecutionException.class,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 378198d..8db0aec 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -26,7 +26,6 @@
 import org.apache.flink.util.UserCodeClassLoader;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingRunnable;
-
 import org.junit.Before;
 import org.junit.Test;
 
@@ -52,6 +51,7 @@
  */
 public class AsyncSinkWriterTest {
 
+    private static final int BYTES_IN_MB = 1024 * 1024;
     private final List<Integer> res = new ArrayList<>();
     private final SinkInitContext sinkInitContext = new SinkInitContext();
 
@@ -132,8 +132,7 @@
         sink.write("95");
         sink.write("35");
         Exception e = assertThrows(RuntimeException.class, () -> sink.write("135"));
-        assertEquals(
-                "Deliberate runtime exception occurred in SinkWriterImplementation.",
+        assertEquals("Deliberate runtime exception occurred in SinkWriterImplementation.",
                 e.getMessage());
         assertEquals(3, res.size());
     }
@@ -241,23 +240,47 @@
         assertEquals(z, new ArrayList<>(sink.snapshotState().get(0)));
     }
 
+    @Test
+    public void flushThresholdMetBeforeBatchLimitWillCreateASmallerBatchOfSizeAboveThreshold()
+            throws IOException, InterruptedException {
+        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100,
+                (double) 30 / BYTES_IN_MB, true);
+
+        /* The flush threshold is 30 bytes and each integer counts as 4 bytes, so the 8th write
+         * (32 bytes buffered) flushes a batch of 8; the other 7 writes (28 bytes) stay buffered.
+         */
+        for (int i = 0; i < 15; i++) {
+            sink.write(String.valueOf(i));
+        }
+        assertEquals(8, res.size());
+    }
+
     private class AsyncSinkWriterImpl extends AsyncSinkWriter<String, Integer> {
 
         private final Set<Integer> failedFirstAttempts = new HashSet<>();
         private final boolean simulateFailures;
 
         public AsyncSinkWriterImpl(
-                Sink.InitContext context,
-                int maxBatchSize,
-                int maxInFlightRequests,
-                int maxBufferedRequests,
-                boolean simulateFailures) {
+                Sink.InitContext context, int maxBatchSize, int maxInFlightRequests,
+                int maxBufferedRequests, boolean simulateFailures) {
             super(
                     (elem, ctx) -> Integer.parseInt(elem),
                     context,
                     maxBatchSize,
                     maxInFlightRequests,
-                    maxBufferedRequests);
+                    maxBufferedRequests, 100);
+            this.simulateFailures = simulateFailures;
+        }
+
+        public AsyncSinkWriterImpl(
+                Sink.InitContext context,
+                int maxBatchSize,
+                int maxInFlightRequests,
+                int maxBufferedRequests,
+                double flushOnBufferSizeMB,
+                boolean simulateFailures) {
+            super((elem, ctx) -> Integer.parseInt(elem), context, maxBatchSize, maxInFlightRequests,
+                    maxBufferedRequests, flushOnBufferSizeMB);
             this.simulateFailures = simulateFailures;
         }
 
@@ -305,6 +328,11 @@
                 requestResult.accept(new ArrayList<>());
             }
         }
+
+        @Override
+        protected int getSizeInBytes(Integer requestEntry) {
+            return Integer.BYTES;
+        }
     }
 
     private static class SinkInitContext implements Sink.InitContext {