[FLINK-24234][connectors] Time based flushing for AsyncSinkWriter
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 5c6f0e4..b5f6b40 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -33,6 +33,7 @@
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
/**
* A generic sink writer that handles the general behaviour of a sink such as batching and flushing,
@@ -50,15 +51,14 @@
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
- private static final int BYTES_IN_MB = 1024 * 1024;
-
private final MailboxExecutor mailboxExecutor;
private final Sink.ProcessingTimeService timeService;
private final int maxBatchSize;
private final int maxInFlightRequests;
private final int maxBufferedRequests;
- private final double flushOnBufferSizeMB;
+ private final long flushOnBufferSizeInBytes;
+ private final long maxTimeInBufferMS;
/**
* The ElementConverter provides a mapping between for the elements of a stream to request
@@ -70,7 +70,8 @@
private final ElementConverter<InputT, RequestEntryT> elementConverter;
/**
- * Buffer to hold request entries that should be persisted into the destination.
+ * Buffer to hold request entries that should be persisted into the destination, along with its
+ * size in bytes.
*
* <p>A request entry contain all relevant details to make a call to the destination. Eg, for
* Kinesis Data Streams a request entry contains the payload and partition key.
@@ -81,7 +82,8 @@
* construct a new (retry) request entry from the response and add that back to the queue for
* later retry.
*/
- private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();
+ private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries =
+ new ArrayDeque<>();
/**
* Tracks all pending async calls that have been executed since the last checkpoint. Calls that
@@ -100,16 +102,11 @@
/**
* Tracks the cumulative size of all elements in {@code bufferedRequestEntries} to facilitate
- * the criterion for flushing after {@code flushOnBufferSizeMB} is reached.
+ * the criterion for flushing after {@code flushOnBufferSizeInBytes} is reached.
*/
- private double bufferedRequestEntriesTotalSizeMB;
+ private double bufferedRequestEntriesTotalSizeInBytes;
- /**
- * Tracks the size of each element in {@code bufferedRequestEntries}. The sizes are stored in MB
- * and the position in the deque reflects the position of the corresponding element in {@code
- * bufferedRequestEntries}.
- */
- private final Deque<Double> bufferedRequestEntriesSizeMB = new ArrayDeque<>();
+ private boolean existsActiveTimerCallback = false;
/**
* This method specifies how to persist buffered request entries into the destination. It is
@@ -143,7 +140,7 @@
* @param requestEntry the requestEntry for which we want to know the size
* @return the size of the requestEntry, as defined previously
*/
- protected abstract int getSizeInBytes(RequestEntryT requestEntry);
+ protected abstract long getSizeInBytes(RequestEntryT requestEntry);
public AsyncSinkWriter(
ElementConverter<InputT, RequestEntryT> elementConverter,
@@ -151,7 +148,8 @@
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
- double flushOnBufferSizeMB) {
+ long flushOnBufferSizeInBytes,
+ long maxTimeInBufferMS) {
this.elementConverter = elementConverter;
this.mailboxExecutor = context.getMailboxExecutor();
this.timeService = context.getProcessingTimeService();
@@ -160,6 +158,8 @@
Preconditions.checkArgument(maxBatchSize > 0);
Preconditions.checkArgument(maxBufferedRequests > 0);
Preconditions.checkArgument(maxInFlightRequests > 0);
+ Preconditions.checkArgument(flushOnBufferSizeInBytes > 0);
+ Preconditions.checkArgument(maxTimeInBufferMS > 0);
Preconditions.checkArgument(
maxBufferedRequests > maxBatchSize,
"The maximum number of requests that may be buffered should be strictly"
@@ -167,30 +167,40 @@
this.maxBatchSize = maxBatchSize;
this.maxInFlightRequests = maxInFlightRequests;
this.maxBufferedRequests = maxBufferedRequests;
- this.flushOnBufferSizeMB = flushOnBufferSizeMB;
+ this.flushOnBufferSizeInBytes = flushOnBufferSizeInBytes;
+ this.maxTimeInBufferMS = maxTimeInBufferMS;
this.inFlightRequestsCount = 0;
- this.bufferedRequestEntriesTotalSizeMB = 0;
+ this.bufferedRequestEntriesTotalSizeInBytes = 0;
+ }
+
+ private void registerCallback() {
+ Sink.ProcessingTimeService.ProcessingTimeCallback ptc =
+ instant -> {
+ existsActiveTimerCallback = false;
+ while (!bufferedRequestEntries.isEmpty()) {
+ flush();
+ }
+ };
+ timeService.registerProcessingTimer(
+ timeService.getCurrentProcessingTime() + maxTimeInBufferMS, ptc);
+ existsActiveTimerCallback = true;
}
@Override
public void write(InputT element, Context context) throws IOException, InterruptedException {
while (bufferedRequestEntries.size() >= maxBufferedRequests) {
- mailboxExecutor.yield();
+ mailboxExecutor.tryYield();
}
- RequestEntryT requestEntry = elementConverter.apply(element, context);
- double requestEntrySizeMB = getSizeInMB(requestEntry);
- bufferedRequestEntries.add(requestEntry);
- bufferedRequestEntriesSizeMB.add(requestEntrySizeMB);
- bufferedRequestEntriesTotalSizeMB += requestEntrySizeMB;
+ addEntryToBuffer(elementConverter.apply(element, context), false);
flushIfAble();
}
- private void flushIfAble() throws InterruptedException {
+ private void flushIfAble() {
while (bufferedRequestEntries.size() >= maxBatchSize
- || bufferedRequestEntriesTotalSizeMB >= flushOnBufferSizeMB) {
+ || bufferedRequestEntriesTotalSizeInBytes >= flushOnBufferSizeInBytes) {
flush();
}
}
@@ -201,18 +211,18 @@
*
* <p>The method blocks if too many async requests are in flight.
*/
- private void flush() throws InterruptedException {
+ private void flush() {
while (inFlightRequestsCount >= maxInFlightRequests) {
- mailboxExecutor.yield();
+ mailboxExecutor.tryYield();
}
List<RequestEntryT> batch = new ArrayList<>(maxBatchSize);
int batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size());
for (int i = 0; i < batchSize; i++) {
- batch.add(bufferedRequestEntries.remove());
- double elementSizeMB = bufferedRequestEntriesSizeMB.remove();
- bufferedRequestEntriesTotalSizeMB -= elementSizeMB;
+ RequestEntryWrapper<RequestEntryT> elem = bufferedRequestEntries.remove();
+ batch.add(elem.getRequestEntry());
+ bufferedRequestEntriesTotalSizeInBytes -= elem.getSize();
}
if (batch.size() == 0) {
@@ -238,17 +248,24 @@
*/
private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
inFlightRequestsCount--;
- failedRequestEntries.forEach(
- failedEntry -> {
- bufferedRequestEntries.addFirst(failedEntry);
- double sizeOfFailedEntry = getSizeInMB(failedEntry);
- bufferedRequestEntriesSizeMB.addFirst(sizeOfFailedEntry);
- bufferedRequestEntriesTotalSizeMB += sizeOfFailedEntry;
- });
+ failedRequestEntries.forEach(failedEntry -> addEntryToBuffer(failedEntry, true));
}
- private double getSizeInMB(RequestEntryT requestEntry){
- return getSizeInBytes(requestEntry) / (double) BYTES_IN_MB;
+ private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
+ if (bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback) {
+ registerCallback();
+ }
+
+ RequestEntryWrapper<RequestEntryT> wrappedEntry =
+ new RequestEntryWrapper<>(entry, getSizeInBytes(entry));
+
+ if (insertAtHead) {
+ bufferedRequestEntries.addFirst(wrappedEntry);
+ } else {
+ bufferedRequestEntries.add(wrappedEntry);
+ }
+
+ bufferedRequestEntriesTotalSizeInBytes += wrappedEntry.getSize();
}
/**
@@ -260,9 +277,9 @@
* <p>To this end, all in-flight requests need to completed before proceeding with the commit.
*/
@Override
- public List<Void> prepareCommit(boolean flush) throws InterruptedException {
+ public List<Void> prepareCommit(boolean flush) {
while (inFlightRequestsCount > 0 || bufferedRequestEntries.size() > 0) {
- mailboxExecutor.yield();
+ mailboxExecutor.tryYield();
if (flush) {
flush();
}
@@ -279,7 +296,10 @@
*/
@Override
public List<Collection<RequestEntryT>> snapshotState() {
- return Arrays.asList(bufferedRequestEntries);
+ return Arrays.asList(
+ bufferedRequestEntries.stream()
+ .map(RequestEntryWrapper::getRequestEntry)
+ .collect(Collectors.toList()));
}
@Override
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RequestEntryWrapper.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RequestEntryWrapper.java
new file mode 100644
index 0000000..2a44c10
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/RequestEntryWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A wrapper that contains a {@code RequestEntryT} ready to be written by the Sink Writer class,
+ * along with the size of that entry as defined by the method {@code getSizeInBytes(RequestEntryT)}
+ * of the {@code AsyncSinkWriter}.
+ *
+ * @param <RequestEntryT> Corresponds to the type parameter of the same name in {@code
+ * AsyncSinkWriter}
+ */
+@PublicEvolving
+public class RequestEntryWrapper<RequestEntryT> {
+
+ private final RequestEntryT requestEntry;
+ private final long size;
+
+ public RequestEntryWrapper(RequestEntryT requestEntry, long size) {
+ this.requestEntry = requestEntry;
+ this.size = size;
+ }
+
+ public RequestEntryT getRequestEntry() {
+ return requestEntry;
+ }
+
+ public long getSize() {
+ return size;
+ }
+}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
index 4661110..29c8c56 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
@@ -33,18 +33,24 @@
private final int maxBatchSize;
private final int maxInFlightRequests;
private final int maxBufferedRequests;
- private final double flushOnBufferSizeMB;
+ private final long flushOnBufferSizeInBytes;
+ private final long maxTimeInBufferMS;
public ArrayListAsyncSink() {
- this(25, 1, 100, 0.1);
+ this(25, 1, 100, 100000, 1000);
}
- public ArrayListAsyncSink(int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests,
- double flushOnBufferSizeMB) {
+ public ArrayListAsyncSink(
+ int maxBatchSize,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ long flushOnBufferSizeInBytes,
+ long maxTimeInBufferMS) {
this.maxBatchSize = maxBatchSize;
this.maxInFlightRequests = maxInFlightRequests;
this.maxBufferedRequests = maxBufferedRequests;
- this.flushOnBufferSizeMB = flushOnBufferSizeMB;
+ this.flushOnBufferSizeInBytes = flushOnBufferSizeInBytes;
+ this.maxTimeInBufferMS = maxTimeInBufferMS;
}
@Override
@@ -59,7 +65,8 @@
maxBatchSize,
maxInFlightRequests,
maxBufferedRequests,
- flushOnBufferSizeMB) {
+ flushOnBufferSizeInBytes,
+ maxTimeInBufferMS) {
@Override
protected void submitRequestEntries(
@@ -69,7 +76,7 @@
}
@Override
- protected int getSizeInBytes(Integer requestEntry) {
+ protected long getSizeInBytes(Integer requestEntry) {
return 4;
}
};
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
index 4f6d5b1..a7ed47d 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/AsyncSinkBaseITCase.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -41,7 +42,7 @@
public void testFailuresOnPersistingToDestinationAreCaughtAndRaised() {
env.fromSequence(999_999, 1_000_100)
.map(Object::toString)
- .sinkTo(new ArrayListAsyncSink(1, 1, 2, 10));
+ .sinkTo(new ArrayListAsyncSink(1, 1, 2, 10, 1000));
Exception e =
assertThrows(
JobExecutionException.class,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 8db0aec..b3754d0 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -21,11 +21,13 @@
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
+
import org.junit.Before;
import org.junit.Test;
@@ -38,12 +40,18 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
/**
* Unit Tests the functionality of AsyncSinkWriter without any assumptions of what a concrete
@@ -51,7 +59,6 @@
*/
public class AsyncSinkWriterTest {
- private static final int BYTES_IN_MB = 1024 * 1024;
private final List<Integer> res = new ArrayList<>();
private final SinkInitContext sinkInitContext = new SinkInitContext();
@@ -93,6 +100,15 @@
}
@Test
+ public void testThatMailboxYieldDoesNotBlockWhileATimerIsRegisteredAndHasYetToElapse()
+ throws Exception {
+ AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, false);
+ sink.write(String.valueOf(0));
+ sink.prepareCommit(true);
+ assertEquals(1, res.size());
+ }
+
+ @Test
public void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterAutomaticFlush()
throws IOException, InterruptedException {
AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, false);
@@ -132,7 +148,8 @@
sink.write("95");
sink.write("35");
Exception e = assertThrows(RuntimeException.class, () -> sink.write("135"));
- assertEquals("Deliberate runtime exception occurred in SinkWriterImplementation.",
+ assertEquals(
+ "Deliberate runtime exception occurred in SinkWriterImplementation.",
e.getMessage());
assertEquals(3, res.size());
}
@@ -241,10 +258,10 @@
}
@Test
- public void flushThresholdMetBeforeBatchLimitWillCreateASmallerBatchOfSizeAboveThreshold()
+ public void testFlushThresholdMetBeforeBatchLimitWillCreateASmallerBatchOfSizeAboveThreshold()
throws IOException, InterruptedException {
- AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100,
- (double) 30 / BYTES_IN_MB, true);
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, 30, 1000, false);
/* Sink has flush threshold of 30 bytes, each integer is 4 bytes, therefore, flushing
* should occur once 8 elements have been written.
@@ -253,6 +270,256 @@
sink.write(String.valueOf(i));
}
assertEquals(8, res.size());
+ sink.write(String.valueOf(15));
+ assertEquals(16, res.size());
+ }
+
+ @Test
+ public void testThatWhenNumberOfItemAndSizeOfRecordThresholdsAreMetSimultaneouslyAFlushOccurs()
+ throws IOException, InterruptedException {
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkWriterImpl(sinkInitContext, 8, 1, 100, 32, 1000, false);
+
+ for (int i = 0; i < 8; i++) {
+ sink.write(String.valueOf(i));
+ }
+ assertEquals(8, res.size());
+ for (int i = 8; i < 16; i++) {
+ sink.write(String.valueOf(i));
+ }
+ assertEquals(16, res.size());
+ }
+
+ @Test
+ public void testThatIntermittentlyFailingEntriesAreEnqueuedOnToTheBufferWithCorrectSize()
+ throws IOException, InterruptedException {
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, 110, 1000, true);
+
+ sink.write(String.valueOf(225)); // Buffer: 100/110B; 1/10 elements; 0 inflight
+ sink.write(String.valueOf(1)); // Buffer: 104/110B; 2/10 elements; 0 inflight
+ sink.write(String.valueOf(2)); // Buffer: 108/110B; 3/10 elements; 0 inflight
+ sink.write(String.valueOf(3)); // Buffer: 112/110B; 4/10 elements; 0 inflight -- flushing
+ assertEquals(3, res.size()); // Element 225 failed on first attempt
+ sink.write(String.valueOf(4)); // Buffer: 4/110B; 1/10 elements; 1 inflight
+ sink.write(String.valueOf(5)); // Buffer: 8/110B; 2/10 elements; 1 inflight
+ sink.write(String.valueOf(6)); // Buffer: 12/110B; 3/10 elements; 1 inflight
+ sink.write(String.valueOf(325)); // Buffer: 112/110B; 4/10 elements; 1 inflight -- flushing
+
+ assertEquals(Arrays.asList(1, 2, 3, 225, 4, 5, 6), res);
+ }
+
+ @Test
+ public void testThatABatchWithSizeSmallerThanMaxBatchSizeIsFlushedOnTimeoutExpiry()
+ throws Exception {
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
+
+ TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+ tpts.setCurrentTime(0L);
+ for (int i = 0; i < 8; i++) {
+ sink.write(String.valueOf(i));
+ }
+
+ tpts.setCurrentTime(99L);
+ assertEquals(0, res.size());
+ tpts.setCurrentTime(100L);
+ assertEquals(8, res.size());
+ }
+
+ @Test
+ public void testThatTimeBasedBatchPicksUpAllRelevantItemsUpUntilExpiryOfTimer()
+ throws Exception {
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
+
+ TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+ for (int i = 0; i < 98; i++) {
+ tpts.setCurrentTime(i);
+ sink.write(String.valueOf(i));
+ }
+ tpts.setCurrentTime(99L);
+ assertEquals(90, res.size());
+ tpts.setCurrentTime(100L);
+ assertEquals(98, res.size());
+ }
+
+ @Test
+ public void testThatOneAndOnlyOneCallbackIsEverRegistered() throws Exception {
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
+
+ TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+ tpts.setCurrentTime(0L);
+ sink.write("1"); // A timer is registered here to elapse at t=100
+ assertEquals(0, res.size());
+ tpts.setCurrentTime(10L);
+ sink.prepareCommit(true);
+ assertEquals(1, res.size());
+ tpts.setCurrentTime(20L); // At t=20, we write a new element that should not trigger another
+ sink.write("2"); // timer to be registered. If it is, it should elapse at t=120s.
+ assertEquals(1, res.size());
+ tpts.setCurrentTime(100L);
+ assertEquals(2, res.size());
+ sink.write("3");
+ tpts.setCurrentTime(199L); // At t=199s, our third element has not been written
+ assertEquals(2, res.size()); // therefore, no timer fired at 120s.
+ tpts.setCurrentTime(200L);
+ assertEquals(3, res.size());
+ }
+
+ @Test
+ public void testThatIntermittentlyFailingEntriesShouldBeFlushedWithMainBatchInTimeBasedFlush()
+ throws Exception {
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, 10000, 100, true);
+
+ TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+ tpts.setCurrentTime(0L);
+ sink.write("1");
+ sink.write("2");
+ sink.write("225");
+ tpts.setCurrentTime(100L);
+ assertEquals(2, res.size());
+ sink.write("3");
+ sink.write("4");
+ tpts.setCurrentTime(199L);
+ assertEquals(2, res.size());
+ tpts.setCurrentTime(200L);
+ assertEquals(5, res.size());
+ }
+
+ @Test
+ public void testThatFlushingAnEmptyBufferDoesNotResultInErrorOrFailure() throws Exception {
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
+
+ TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+ tpts.setCurrentTime(0L);
+ sink.write("1");
+ tpts.setCurrentTime(50L);
+ sink.prepareCommit(true);
+ assertEquals(1, res.size());
+ tpts.setCurrentTime(200L);
+ }
+
+ @Test
+ public void testThatOnExpiryOfAnOldTimeoutANewOneMayBeRegisteredImmediately() throws Exception {
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
+
+ TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+ tpts.setCurrentTime(0L);
+ sink.write("1");
+ tpts.setCurrentTime(100L);
+ assertEquals(1, res.size());
+ sink.write("2");
+ tpts.setCurrentTime(200L);
+ assertEquals(2, res.size());
+ }
+
+ /**
+ * This test considers what could happen if the timer elapses, triggering a flush, while a
+ * long-running call to {@code submitRequestEntries} remains uncompleted for some time. We have
+ * a countdown latch with an expiry of 500ms installed in the call to {@code
+ * submitRequestEntries} that blocks if the batch size received is 3 and subsequently accepts
+ * and succeeds with any value.
+ *
+ * <p>Let us call the thread writing "3" thread3 and the thread writing "4" thread4. Thread3
+ * will enter {@code submitRequestEntries} with 3 entries and release thread4. Thread3 becomes
+ * blocked for 500ms. Thread4 writes "4" to the buffer and is flushed when the timer triggers
+ * (timer was first set when "1" was written). Thread4 then is blocked during the flush phase
+ * since thread3 is in-flight and maxInFlightRequests=1. After 500ms elapses, thread3 is revived
+ * and proceeds, which also unblocks thread4. This results in 1, 2, 3 being written prior to 4.
+ *
+ * <p>This test also implicitly asserts that any thread in the SinkWriter must be the mailbox
+ * thread if it enters {@code mailbox.tryYield()}.
+ */
+ @Test
+ public void testThatInterleavingThreadsMayBlockEachOtherButDoNotCauseRaceConditions()
+ throws Exception {
+ CountDownLatch blockedWriteLatch = new CountDownLatch(1);
+ CountDownLatch delayedStartLatch = new CountDownLatch(1);
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkReleaseAndBlockWriterImpl(
+ sinkInitContext,
+ 3,
+ 1,
+ 20,
+ 100,
+ 100,
+ blockedWriteLatch,
+ delayedStartLatch,
+ true);
+
+ writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
+ assertEquals(Arrays.asList(1, 2, 3, 4), res);
+ }
+
+ /**
+ * This test considers what could happen if the timer elapses, triggering a flush, while a
+ * long-running call to {@code submitRequestEntries} remains blocked. We have a countdown latch
+ * that blocks permanently until freed once the timer based flush is complete.
+ *
+ * <p>Let us call the thread writing "3" thread3 and the thread writing "4" thread4. Thread3
+ * will enter {@code submitRequestEntries} with 3 entries and release thread4. Thread3 becomes
+ * blocked. Thread4 writes "4" to the buffer and is flushed when the timer triggers (timer was
+ * first set when "1" was written). Thread4 completes and frees thread3. Thread3 is revived and
+ * proceeds. This results in 4 being written prior to 1, 2, 3.
+ *
+ * <p>This test also implicitly asserts that any thread in the SinkWriter must be the mailbox
+ * thread if it enters {@code mailbox.tryYield()}.
+ */
+ @Test
+ public void testThatIfOneInterleavedThreadIsBlockedTheOtherThreadWillContinueAndCorrectlyWrite()
+ throws Exception {
+ CountDownLatch blockedWriteLatch = new CountDownLatch(1);
+ CountDownLatch delayedStartLatch = new CountDownLatch(1);
+ AsyncSinkWriterImpl sink =
+ new AsyncSinkReleaseAndBlockWriterImpl(
+ sinkInitContext,
+ 3,
+ 2,
+ 20,
+ 100,
+ 100,
+ blockedWriteLatch,
+ delayedStartLatch,
+ false);
+
+ writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
+ assertEquals(new ArrayList<>(Arrays.asList(4, 1, 2, 3)), res);
+ }
+
+ private void writeTwoElementsAndInterleaveTheNextTwoElements(
+ AsyncSinkWriterImpl sink,
+ CountDownLatch blockedWriteLatch,
+ CountDownLatch delayedStartLatch)
+ throws Exception {
+
+ TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+ ExecutorService es = Executors.newFixedThreadPool(4);
+
+ tpts.setCurrentTime(0L);
+ sink.write("1");
+ sink.write("2");
+ es.submit(
+ () -> {
+ try {
+ sink.write("3");
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+
+ delayedStartLatch.await();
+ sink.write("4");
+ tpts.setCurrentTime(100L);
+ blockedWriteLatch.countDown();
+ es.shutdown();
+ assertTrue(
+ es.awaitTermination(500, TimeUnit.MILLISECONDS),
+ "Executor Service stuck at termination, not terminated after 500ms!");
}
private class AsyncSinkWriterImpl extends AsyncSinkWriter<String, Integer> {
@@ -261,14 +528,19 @@
private final boolean simulateFailures;
public AsyncSinkWriterImpl(
- Sink.InitContext context, int maxBatchSize, int maxInFlightRequests,
- int maxBufferedRequests, boolean simulateFailures) {
+ Sink.InitContext context,
+ int maxBatchSize,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ boolean simulateFailures) {
super(
(elem, ctx) -> Integer.parseInt(elem),
context,
maxBatchSize,
maxInFlightRequests,
- maxBufferedRequests, 100);
+ maxBufferedRequests,
+ 10000000,
+ 1000);
this.simulateFailures = simulateFailures;
}
@@ -277,10 +549,17 @@
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
- double flushOnBufferSizeMB,
+ long flushOnBufferSizeInBytes,
+ long maxTimeInBufferMS,
boolean simulateFailures) {
- super((elem, ctx) -> Integer.parseInt(elem), context, maxBatchSize, maxInFlightRequests,
- maxBufferedRequests, flushOnBufferSizeMB);
+ super(
+ (elem, ctx) -> Integer.parseInt(elem),
+ context,
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ flushOnBufferSizeInBytes,
+ maxTimeInBufferMS);
this.simulateFailures = simulateFailures;
}
@@ -329,14 +608,24 @@
}
}
+ /**
+ * @return If we're simulating failures and the requestEntry value is greater than 200, then
+ * the entry is size 100 bytes, otherwise each entry is 4 bytes.
+ */
@Override
- protected int getSizeInBytes(Integer requestEntry) {
- return 4;
+ protected long getSizeInBytes(Integer requestEntry) {
+ return requestEntry > 200 && simulateFailures ? 100 : 4;
}
}
private static class SinkInitContext implements Sink.InitContext {
+ private static final TestProcessingTimeService processingTimeService;
+
+ static {
+ processingTimeService = new TestProcessingTimeService();
+ }
+
@Override
public UserCodeClassLoader getUserCodeClassLoader() {
return null;
@@ -370,7 +659,19 @@
@Override
public Sink.ProcessingTimeService getProcessingTimeService() {
- return null;
+ return new Sink.ProcessingTimeService() {
+ @Override
+ public long getCurrentProcessingTime() {
+ return processingTimeService.getCurrentProcessingTime();
+ }
+
+ @Override
+ public void registerProcessingTimer(
+ long time, ProcessingTimeCallback processingTimerCallback) {
+ processingTimeService.registerTimer(
+ time, processingTimerCallback::onProcessingTime);
+ }
+ };
}
@Override
@@ -392,5 +693,66 @@
public OptionalLong getRestoredCheckpointId() {
return OptionalLong.empty();
}
+
+ public TestProcessingTimeService getTestProcessingTimeService() {
+ return processingTimeService;
+ }
+ }
+
+ /**
+ * This SinkWriter releases the lock on existing threads blocked by {@code delayedStartLatch}
+ * and blocks itself until {@code blockedThreadLatch} is unblocked.
+ */
+ private class AsyncSinkReleaseAndBlockWriterImpl extends AsyncSinkWriterImpl {
+
+ private final CountDownLatch blockedThreadLatch;
+ private final CountDownLatch delayedStartLatch;
+ private final boolean blockForLimitedTime;
+
+ public AsyncSinkReleaseAndBlockWriterImpl(
+ Sink.InitContext context,
+ int maxBatchSize,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ long flushOnBufferSizeInBytes,
+ long maxTimeInBufferMS,
+ CountDownLatch blockedThreadLatch,
+ CountDownLatch delayedStartLatch,
+ boolean blockForLimitedTime) {
+ super(
+ context,
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ flushOnBufferSizeInBytes,
+ maxTimeInBufferMS,
+ false);
+ this.blockedThreadLatch = blockedThreadLatch;
+ this.delayedStartLatch = delayedStartLatch;
+ this.blockForLimitedTime = blockForLimitedTime;
+ }
+
+ @Override
+ protected void submitRequestEntries(
+ List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
+ if (requestEntries.size() == 3) {
+ try {
+ delayedStartLatch.countDown();
+ if (blockForLimitedTime) {
+ assertFalse(
+ blockedThreadLatch.await(500, TimeUnit.MILLISECONDS),
+ "The countdown latch was released before the full amount"
+ + "of time was reached.");
+ } else {
+ blockedThreadLatch.await();
+ }
+ } catch (InterruptedException e) {
+ fail("The unit test latch must not have been interrupted by another thread.");
+ }
+ }
+
+ res.addAll(requestEntries);
+ requestResult.accept(new ArrayList<>());
+ }
}
}