[FLINK-24234][connectors] Added byte based flushing for AsyncSinkWriter
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 1d17d2f..5c6f0e4 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -50,12 +50,15 @@
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
+ private static final int BYTES_IN_MB = 1024 * 1024;
+
private final MailboxExecutor mailboxExecutor;
private final Sink.ProcessingTimeService timeService;
private final int maxBatchSize;
private final int maxInFlightRequests;
private final int maxBufferedRequests;
+ private final double flushOnBufferSizeMB;
/**
* The ElementConverter provides a mapping between for the elements of a stream to request
@@ -96,6 +99,19 @@
private int inFlightRequestsCount;
/**
+ * Tracks the cumulative size of all elements in {@code bufferedRequestEntries} to facilitate
+ * the criterion for flushing after {@code flushOnBufferSizeMB} is reached.
+ */
+ private double bufferedRequestEntriesTotalSizeMB;
+
+ /**
+ * Tracks the size of each element in {@code bufferedRequestEntries}. The sizes are stored in MB
+ * and the position in the deque reflects the position of the corresponding element in {@code
+ * bufferedRequestEntries}.
+ */
+ private final Deque<Double> bufferedRequestEntriesSizeMB = new ArrayDeque<>();
+
+ /**
* This method specifies how to persist buffered request entries into the destination. It is
* implemented when support for a new destination is added.
*
@@ -118,12 +134,24 @@
protected abstract void submitRequestEntries(
List<RequestEntryT> requestEntries, Consumer<Collection<RequestEntryT>> requestResult);
+ /**
+ * This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in
+ * this case is measured as the total bytes that is written to the destination as a result of
+ * persisting this particular {@code RequestEntryT} rather than the serialized length (which may
+ * be the same).
+ *
+ * @param requestEntry the requestEntry for which we want to know the size
+ * @return the size of the requestEntry, as defined previously
+ */
+ protected abstract int getSizeInBytes(RequestEntryT requestEntry);
+
public AsyncSinkWriter(
ElementConverter<InputT, RequestEntryT> elementConverter,
Sink.InitContext context,
int maxBatchSize,
int maxInFlightRequests,
- int maxBufferedRequests) {
+ int maxBufferedRequests,
+ double flushOnBufferSizeMB) {
this.elementConverter = elementConverter;
this.mailboxExecutor = context.getMailboxExecutor();
this.timeService = context.getProcessingTimeService();
@@ -139,6 +167,10 @@
this.maxBatchSize = maxBatchSize;
this.maxInFlightRequests = maxInFlightRequests;
this.maxBufferedRequests = maxBufferedRequests;
+ this.flushOnBufferSizeMB = flushOnBufferSizeMB;
+
+ this.inFlightRequestsCount = 0;
+ this.bufferedRequestEntriesTotalSizeMB = 0;
}
@Override
@@ -147,13 +179,18 @@
mailboxExecutor.yield();
}
- bufferedRequestEntries.add(elementConverter.apply(element, context));
+ RequestEntryT requestEntry = elementConverter.apply(element, context);
+ double requestEntrySizeMB = getSizeInMB(requestEntry);
+ bufferedRequestEntries.add(requestEntry);
+ bufferedRequestEntriesSizeMB.add(requestEntrySizeMB);
+ bufferedRequestEntriesTotalSizeMB += requestEntrySizeMB;
- flushIfFull();
+ flushIfAble();
}
- private void flushIfFull() throws InterruptedException {
- while (bufferedRequestEntries.size() >= maxBatchSize) {
+ private void flushIfAble() throws InterruptedException {
+ while (bufferedRequestEntries.size() >= maxBatchSize
+ || bufferedRequestEntriesTotalSizeMB >= flushOnBufferSizeMB) {
flush();
}
}
@@ -174,6 +211,8 @@
int batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size());
for (int i = 0; i < batchSize; i++) {
batch.add(bufferedRequestEntries.remove());
+ double elementSizeMB = bufferedRequestEntriesSizeMB.remove();
+ bufferedRequestEntriesTotalSizeMB -= elementSizeMB;
}
if (batch.size() == 0) {
@@ -199,7 +238,17 @@
*/
private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
inFlightRequestsCount--;
- failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
+ failedRequestEntries.forEach(
+ failedEntry -> {
+ bufferedRequestEntries.addFirst(failedEntry);
+ double sizeOfFailedEntry = getSizeInMB(failedEntry);
+ bufferedRequestEntriesSizeMB.addFirst(sizeOfFailedEntry);
+ bufferedRequestEntriesTotalSizeMB += sizeOfFailedEntry;
+ });
+ }
+
+ private double getSizeInMB(RequestEntryT requestEntry){
+ return getSizeInBytes(requestEntry) / (double) BYTES_IN_MB;
}
/**
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
index c4d6639..4661110 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
@@ -33,15 +33,18 @@
private final int maxBatchSize;
private final int maxInFlightRequests;
private final int maxBufferedRequests;
+ private final double flushOnBufferSizeMB;
public ArrayListAsyncSink() {
- this(25, 1, 100);
+ this(25, 1, 100, 0.1);
}
- public ArrayListAsyncSink(int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests) {
+ public ArrayListAsyncSink(int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests,
+ double flushOnBufferSizeMB) {
this.maxBatchSize = maxBatchSize;
this.maxInFlightRequests = maxInFlightRequests;
this.maxBufferedRequests = maxBufferedRequests;
+ this.flushOnBufferSizeMB = flushOnBufferSizeMB;
}
@Override
@@ -55,13 +58,20 @@
context,
maxBatchSize,
maxInFlightRequests,
- maxBufferedRequests) {
+ maxBufferedRequests,
+ flushOnBufferSizeMB) {
+
@Override
protected void submitRequestEntries(
List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
ArrayListDestination.putRecords(requestEntries);
requestResult.accept(Arrays.asList());
}
+
+ @Override
+ protected int getSizeInBytes(Integer requestEntry) {
+ return 4;
+ }
};
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
index 63d51e3..4f6d5b1 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
@@ -21,7 +21,6 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -42,7 +41,7 @@
public void testFailuresOnPersistingToDestinationAreCaughtAndRaised() {
env.fromSequence(999_999, 1_000_100)
.map(Object::toString)
- .sinkTo(new ArrayListAsyncSink(1, 1, 2));
+ .sinkTo(new ArrayListAsyncSink(1, 1, 2, 10));
Exception e =
assertThrows(
JobExecutionException.class,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 378198d..8db0aec 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -26,7 +26,6 @@
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
-
import org.junit.Before;
import org.junit.Test;
@@ -52,6 +51,7 @@
*/
public class AsyncSinkWriterTest {
+ private static final int BYTES_IN_MB = 1024 * 1024;
private final List<Integer> res = new ArrayList<>();
private final SinkInitContext sinkInitContext = new SinkInitContext();
@@ -132,8 +132,7 @@
sink.write("95");
sink.write("35");
Exception e = assertThrows(RuntimeException.class, () -> sink.write("135"));
- assertEquals(
- "Deliberate runtime exception occurred in SinkWriterImplementation.",
+ assertEquals("Deliberate runtime exception occurred in SinkWriterImplementation.",
e.getMessage());
assertEquals(3, res.size());
}
@@ -241,23 +240,47 @@
assertEquals(z, new ArrayList<>(sink.snapshotState().get(0)));
}
+ @Test
+ public void flushThresholdMetBeforeBatchLimitWillCreateASmallerBatchOfSizeAboveThreshold()
+ throws IOException, InterruptedException {
+ AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100,
+ (double) 30 / BYTES_IN_MB, true);
+
+ /* Sink has flush threshold of 30 bytes, each integer is 4 bytes, therefore, flushing
+ * should occur once 8 elements have been written.
+ */
+ for (int i = 0; i < 15; i++) {
+ sink.write(String.valueOf(i));
+ }
+ assertEquals(8, res.size());
+ }
+
private class AsyncSinkWriterImpl extends AsyncSinkWriter<String, Integer> {
private final Set<Integer> failedFirstAttempts = new HashSet<>();
private final boolean simulateFailures;
public AsyncSinkWriterImpl(
- Sink.InitContext context,
- int maxBatchSize,
- int maxInFlightRequests,
- int maxBufferedRequests,
- boolean simulateFailures) {
+ Sink.InitContext context, int maxBatchSize, int maxInFlightRequests,
+ int maxBufferedRequests, boolean simulateFailures) {
super(
(elem, ctx) -> Integer.parseInt(elem),
context,
maxBatchSize,
maxInFlightRequests,
- maxBufferedRequests);
+ maxBufferedRequests, 100);
+ this.simulateFailures = simulateFailures;
+ }
+
+ public AsyncSinkWriterImpl(
+ Sink.InitContext context,
+ int maxBatchSize,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ double flushOnBufferSizeMB,
+ boolean simulateFailures) {
+ super((elem, ctx) -> Integer.parseInt(elem), context, maxBatchSize, maxInFlightRequests,
+ maxBufferedRequests, flushOnBufferSizeMB);
this.simulateFailures = simulateFailures;
}
@@ -305,6 +328,11 @@
requestResult.accept(new ArrayList<>());
}
}
+
+ @Override
+ protected int getSizeInBytes(Integer requestEntry) {
+ return 4;
+ }
}
private static class SinkInitContext implements Sink.InitContext {