blob: b3754d061a499b1006698beabb48b017f5a0c999 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.base.sink.writer;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
 * Unit tests for the functionality of {@code AsyncSinkWriter}, making no assumptions about what a
 * concrete implementation might do.
 */
public class AsyncSinkWriterTest {
private final List<Integer> res = new ArrayList<>();
private final SinkInitContext sinkInitContext = new SinkInitContext();
    /** Resets the shared destination list before each test so tests stay independent. */
    @Before
    public void before() {
        res.clear();
    }
@Test
public void testNumberOfRecordsIsAMultipleOfBatchSizeResultsInThatNumberOfRecordsBeingWritten()
throws IOException, InterruptedException {
AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, false);
for (int i = 0; i < 80; i++) {
sink.write(String.valueOf(i));
}
assertEquals(80, res.size());
}
    /**
     * With {@code maxBatchSize = 10}, writing 23 records flushes two full batches (20 records);
     * the remaining 3 must be captured by {@code snapshotState} rather than lost.
     */
    @Test
    public void testThatUnwrittenRecordsInBufferArePersistedWhenSnapshotIsTaken()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, false);
        for (int i = 0; i < 23; i++) {
            sink.write(String.valueOf(i));
        }
        assertEquals(20, res.size());
        // The three unflushed elements remain in the writer's buffered state.
        assertEquals(Arrays.asList(20, 21, 22), new ArrayList<>(sink.snapshotState().get(0)));
    }
    /** prepareCommit must flush the 3 records still buffered after the two automatic flushes. */
    @Test
    public void testPreparingCommitAtSnapshotTimeEnsuresBufferedRecordsArePersistedToDestination()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, false);
        for (int i = 0; i < 23; i++) {
            sink.write(String.valueOf(i));
        }
        sink.prepareCommit(true);
        assertEquals(23, res.size());
    }
    /**
     * A single buffered element must be flushed by prepareCommit without the mailbox yield
     * blocking while a flush timer is still registered and has not yet elapsed.
     */
    @Test
    public void testThatMailboxYieldDoesNotBlockWhileATimerIsRegisteredAndHasYetToElapse()
            throws Exception {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, false);
        sink.write(String.valueOf(0));
        sink.prepareCommit(true);
        assertEquals(1, res.size());
    }
    /** Snapshots reflect buffered elements before an automatic flush and are empty after it. */
    @Test
    public void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterAutomaticFlush()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, false);
        sink.write("25");
        sink.write("55");
        // Two elements buffered, nothing flushed yet (maxBatchSize is 3).
        assertEquals(Arrays.asList(25, 55), new ArrayList<>(sink.snapshotState().get(0)));
        assertEquals(0, res.size());
        // The third write reaches maxBatchSize and triggers an automatic flush.
        sink.write("75");
        assertEquals(Arrays.asList(), new ArrayList<>(sink.snapshotState().get(0)));
        assertEquals(3, res.size());
    }
    /** Snapshots reflect buffered elements before a manual flush and are empty after it. */
    @Test
    public void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterManualFlush()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, false);
        sink.write("25");
        sink.write("55");
        sink.write("75");
        sink.write("95");
        sink.write("955");
        // The first three writes were flushed automatically; the last two remain buffered.
        assertEquals(Arrays.asList(95, 955), new ArrayList<>(sink.snapshotState().get(0)));
        sink.prepareCommit(true);
        assertEquals(Arrays.asList(), new ArrayList<>(sink.snapshotState().get(0)));
        assertEquals(5, res.size());
    }
    /**
     * Values in (100, 200] make the test writer throw; that runtime exception must surface to
     * the caller of write(), and only the successfully flushed first batch reaches the
     * destination.
     */
    @Test
    public void testRuntimeErrorsInSubmitRequestEntriesEndUpAsIOExceptionsWithNumOfFailedRequests()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, true);
        sink.write("25");
        sink.write("55");
        sink.write("75");
        sink.write("95");
        sink.write("35");
        // "135" falls in the deliberately-failing range (100, 200].
        Exception e = assertThrows(RuntimeException.class, () -> sink.write("135"));
        assertEquals(
                "Deliberate runtime exception occurred in SinkWriterImplementation.",
                e.getMessage());
        assertEquals(3, res.size());
    }
    /**
     * Entries greater than 200 fail on their first attempt and are requeued by the test writer;
     * this must not violate at-least-once semantics — every element eventually reaches the
     * destination, with retried elements placed at the front of the queue.
     */
    @Test
    public void testRetryableErrorsDoNotViolateAtLeastOnceSemanticsDueToRequeueOfFailures()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, true);
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "25", Arrays.asList(), Arrays.asList(25));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "55", Arrays.asList(), Arrays.asList(25, 55));
        // 25, 55 persisted; 965 failed and inflight
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "965", Arrays.asList(25, 55), Arrays.asList());
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "75", Arrays.asList(25, 55), Arrays.asList(75));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "95", Arrays.asList(25, 55), Arrays.asList(75, 95));
        /*
         * Writing 955 to the sink increases the buffer to size 3 containing [75, 95, 955]. This
         * triggers the outstanding in flight request with the failed 965 to be run, and 965 is
         * placed at the front of the queue. The first {@code maxBatchSize = 3} elements are
         * persisted, with 965 succeeding this (second) time. 955 remains in the buffer.
         */
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "955", Arrays.asList(25, 55, 965, 75, 95), Arrays.asList(955));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "550", Arrays.asList(25, 55, 965, 75, 95), Arrays.asList(955, 550));
        /*
         * [955, 550, 45] are attempted to be persisted
         */
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "45", Arrays.asList(25, 55, 965, 75, 95, 45), Arrays.asList());
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "35", Arrays.asList(25, 55, 965, 75, 95, 45), Arrays.asList(35));
        /* [35, 535] should be in the bufferedRequestEntries
         * [955, 550] should be in the inFlightRequest, ready to be added
         * [25, 55, 965, 75, 95, 45] should be downstream already
         */
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "535", Arrays.asList(25, 55, 965, 75, 95, 45), Arrays.asList(35, 535));
        // Checkpoint occurs
        sink.prepareCommit(true);
        // Everything is saved
        assertEquals(Arrays.asList(25, 55, 965, 75, 95, 45, 550, 955, 35, 535), res);
        assertEquals(0, sink.snapshotState().get(0).size());
    }
    /**
     * Failed (retryable, i.e. > 200) entries must be re-attempted in the very next flush and no
     * later; each containment assertion checks that the retried element has landed in the
     * destination by the flush that follows its failure.
     */
    @Test
    public void testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, true);
        sink.write("25");
        sink.write("55");
        sink.write("965");
        sink.write("75");
        sink.write("95");
        sink.write("955");
        // 965 failed its first attempt but must have been retried by this point.
        assertTrue(res.contains(965));
        sink.write("550");
        sink.write("645");
        sink.write("545");
        sink.write("535");
        sink.write("515");
        assertTrue(res.contains(955));
        sink.write("505");
        assertTrue(res.contains(550));
        assertTrue(res.contains(645));
        // prepareCommit flushes the remainder, including the retried 545/535/515.
        sink.prepareCommit(true);
        assertTrue(res.contains(545));
        assertTrue(res.contains(535));
        assertTrue(res.contains(515));
    }
@Test
public void testThatMaxBufferSizeOfSinkShouldBeStrictlyGreaterThanMaxSizeOfEachBatch() {
Exception e =
assertThrows(
IllegalArgumentException.class,
() -> new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 10, false));
assertEquals(
e.getMessage(),
"The maximum number of requests that may be buffered should be "
+ "strictly greater than the maximum number of requests per batch.");
}
private void writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
AsyncSinkWriterImpl sink, String x, List<Integer> y, List<Integer> z)
throws IOException, InterruptedException {
sink.write(x);
assertEquals(y, res);
assertEquals(z, new ArrayList<>(sink.snapshotState().get(0)));
}
    /**
     * With a 30-byte flush threshold and 4-byte entries, a flush must happen after 8 writes even
     * though the 10-element batch limit has not yet been reached.
     */
    @Test
    public void testFlushThresholdMetBeforeBatchLimitWillCreateASmallerBatchOfSizeAboveThreshold()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink =
                new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, 30, 1000, false);
        /* Sink has flush threshold of 30 bytes, each integer is 4 bytes, therefore, flushing
         * should occur once 8 elements have been written.
         */
        for (int i = 0; i < 15; i++) {
            sink.write(String.valueOf(i));
        }
        assertEquals(8, res.size());
        // The 16th write brings the buffer past the threshold again, flushing the second batch.
        sink.write(String.valueOf(15));
        assertEquals(16, res.size());
    }
    /**
     * With {@code maxBatchSize = 8} and a 32-byte threshold (8 entries x 4 bytes), both flush
     * conditions are met at the same time; exactly one flush per 8 writes must occur.
     */
    @Test
    public void testThatWhenNumberOfItemAndSizeOfRecordThresholdsAreMetSimultaneouslyAFlushOccurs()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink =
                new AsyncSinkWriterImpl(sinkInitContext, 8, 1, 100, 32, 1000, false);
        for (int i = 0; i < 8; i++) {
            sink.write(String.valueOf(i));
        }
        assertEquals(8, res.size());
        for (int i = 8; i < 16; i++) {
            sink.write(String.valueOf(i));
        }
        assertEquals(16, res.size());
    }
    /**
     * Requeued failed entries (values > 200 count as 100 bytes in the test writer) must be added
     * back to the buffer with their correct size, so byte-based flushing still triggers when
     * they are present.
     */
    @Test
    public void testThatIntermittentlyFailingEntriesAreEnqueuedOnToTheBufferWithCorrectSize()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink =
                new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, 110, 1000, true);
        sink.write(String.valueOf(225)); // Buffer: 100/110B; 1/10 elements; 0 inflight
        sink.write(String.valueOf(1)); // Buffer: 104/110B; 2/10 elements; 0 inflight
        sink.write(String.valueOf(2)); // Buffer: 108/110B; 3/10 elements; 0 inflight
        sink.write(String.valueOf(3)); // Buffer: 112/110B; 4/10 elements; 0 inflight -- flushing
        assertEquals(3, res.size()); // Element 225 failed on first attempt
        sink.write(String.valueOf(4)); // Buffer: 4/110B; 1/10 elements; 1 inflight
        sink.write(String.valueOf(5)); // Buffer: 8/110B; 2/10 elements; 1 inflight
        sink.write(String.valueOf(6)); // Buffer: 12/110B; 3/10 elements; 1 inflight
        sink.write(String.valueOf(325)); // Buffer: 112/110B; 4/10 elements; 1 inflight -- flushing
        assertEquals(Arrays.asList(1, 2, 3, 225, 4, 5, 6), res);
    }
    /** A partial batch (8 entries, maxBatchSize 10) must be flushed when the 100ms timer fires. */
    @Test
    public void testThatABatchWithSizeSmallerThanMaxBatchSizeIsFlushedOnTimeoutExpiry()
            throws Exception {
        AsyncSinkWriterImpl sink =
                new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
        tpts.setCurrentTime(0L);
        for (int i = 0; i < 8; i++) {
            sink.write(String.valueOf(i));
        }
        // Just before expiry nothing has been flushed.
        tpts.setCurrentTime(99L);
        assertEquals(0, res.size());
        // The timer registered at t=0 fires at t=100 and flushes all 8 buffered elements.
        tpts.setCurrentTime(100L);
        assertEquals(8, res.size());
    }
    /**
     * A time-based flush must include every element written before the timer expired: 90
     * elements were already flushed in full batches of 10, and the remaining 8 go out at t=100.
     */
    @Test
    public void testThatTimeBasedBatchPicksUpAllRelevantItemsUpUntilExpiryOfTimer()
            throws Exception {
        AsyncSinkWriterImpl sink =
                new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
        for (int i = 0; i < 98; i++) {
            tpts.setCurrentTime(i);
            sink.write(String.valueOf(i));
        }
        tpts.setCurrentTime(99L);
        assertEquals(90, res.size());
        tpts.setCurrentTime(100L);
        assertEquals(98, res.size());
    }
    /**
     * Exactly one flush timer may be outstanding at a time: a write performed while a timer is
     * already registered must not register a second one.
     */
    @Test
    public void testThatOneAndOnlyOneCallbackIsEverRegistered() throws Exception {
        AsyncSinkWriterImpl sink =
                new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
        tpts.setCurrentTime(0L);
        sink.write("1"); // A timer is registered here to elapse at t=100
        assertEquals(0, res.size());
        tpts.setCurrentTime(10L);
        sink.prepareCommit(true);
        assertEquals(1, res.size());
        tpts.setCurrentTime(20L); // At t=20, we write a new element that should not trigger another
        sink.write("2"); // timer to be registered. If it is, it should elapse at t=120s.
        assertEquals(1, res.size());
        tpts.setCurrentTime(100L);
        assertEquals(2, res.size());
        sink.write("3");
        tpts.setCurrentTime(199L); // At t=199s, our third element has not been written
        assertEquals(2, res.size()); // therefore, no timer fired at 120s.
        tpts.setCurrentTime(200L);
        assertEquals(3, res.size());
    }
    /**
     * An entry that fails intermittently (225) and is requeued must be flushed together with the
     * main batch when the next time-based flush fires.
     */
    @Test
    public void testThatIntermittentlyFailingEntriesShouldBeFlushedWithMainBatchInTimeBasedFlush()
            throws Exception {
        AsyncSinkWriterImpl sink =
                new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, 10000, 100, true);
        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
        tpts.setCurrentTime(0L);
        sink.write("1");
        sink.write("2");
        sink.write("225");
        // 225 failed its first attempt; only 1 and 2 have been persisted so far.
        tpts.setCurrentTime(100L);
        assertEquals(2, res.size());
        sink.write("3");
        sink.write("4");
        tpts.setCurrentTime(199L);
        assertEquals(2, res.size());
        // The timer-based flush at t=200 picks up the requeued 225 along with 3 and 4.
        tpts.setCurrentTime(200L);
        assertEquals(5, res.size());
    }
    /**
     * When the timer fires after prepareCommit has already emptied the buffer, flushing the
     * empty buffer must not raise any error or failure.
     */
    @Test
    public void testThatFlushingAnEmptyBufferDoesNotResultInErrorOrFailure() throws Exception {
        AsyncSinkWriterImpl sink =
                new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
        tpts.setCurrentTime(0L);
        sink.write("1");
        tpts.setCurrentTime(50L);
        sink.prepareCommit(true);
        assertEquals(1, res.size());
        // The timer registered at t=0 elapses here with nothing left to flush.
        tpts.setCurrentTime(200L);
    }
    /** After a timer-based flush completes, the next write must register a fresh timer. */
    @Test
    public void testThatOnExpiryOfAnOldTimeoutANewOneMayBeRegisteredImmediately() throws Exception {
        AsyncSinkWriterImpl sink =
                new AsyncSinkWriterImpl(sinkInitContext, 10, 20, 100, 10000, 100, true);
        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
        tpts.setCurrentTime(0L);
        sink.write("1");
        tpts.setCurrentTime(100L);
        assertEquals(1, res.size());
        // A new timer is registered for this write and fires 100ms later.
        sink.write("2");
        tpts.setCurrentTime(200L);
        assertEquals(2, res.size());
    }
/**
* This test considers what could happen if the timer elapses, triggering a flush, while a
* long-running call to {@code submitRequestEntries} remains uncompleted for some time. We have
* a countdown latch with an expiry of 500ms installed in the call to {@code
* submitRequestEntries} that blocks if the batch size received is 3 and subsequently accepts
* and succeeds with any value.
*
* <p>Let us call the thread writing "3" thread3 and the thread writing "4" thread4. Thread3
* will enter {@code submitRequestEntries} with 3 entries and release thread4. Thread3 becomes
* blocked for 500ms. Thread4 writes "4" to the buffer and is flushed when the timer triggers
* (timer was first set when "1" was written). Thread4 then is blocked during the flush phase
* since thread3 is in-flight and maxInFlightRequests=1. After 500ms elapses, thread3 is revived
* and proceeds, which also unblocks thread4. This results in 1, 2, 3 being written prior to 4.
*
* <p>This test also implicitly asserts that any thread in the SinkWriter must be the mailbox
* thread if it enters {@code mailbox.tryYield()}.
*/
    @Test
    public void testThatInterleavingThreadsMayBlockEachOtherButDoNotCauseRaceConditions()
            throws Exception {
        CountDownLatch blockedWriteLatch = new CountDownLatch(1);
        CountDownLatch delayedStartLatch = new CountDownLatch(1);
        // maxInFlightRequests = 1 and blockForLimitedTime = true: the timer-triggered flush must
        // wait behind the temporarily blocked request, yielding the ordering 1, 2, 3 before 4.
        AsyncSinkWriterImpl sink =
                new AsyncSinkReleaseAndBlockWriterImpl(
                        sinkInitContext,
                        3,
                        1,
                        20,
                        100,
                        100,
                        blockedWriteLatch,
                        delayedStartLatch,
                        true);
        writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
        assertEquals(Arrays.asList(1, 2, 3, 4), res);
    }
/**
* This test considers what could happen if the timer elapses, triggering a flush, while a
* long-running call to {@code submitRequestEntries} remains blocked. We have a countdown latch
* that blocks permanently until freed once the timer based flush is complete.
*
* <p>Let us call the thread writing "3" thread3 and the thread writing "4" thread4. Thread3
* will enter {@code submitRequestEntries} with 3 entries and release thread4. Thread3 becomes
* blocked. Thread4 writes "4" to the buffer and is flushed when the timer triggers (timer was
* first set when "1" was written). Thread4 completes and frees thread3. Thread3 is revived and
* proceeds. This results in 4 being written prior to 1, 2, 3.
*
* <p>This test also implicitly asserts that any thread in the SinkWriter must be the mailbox
* thread if it enters {@code mailbox.tryYield()}.
*/
    @Test
    public void testThatIfOneInterleavedThreadIsBlockedTheOtherThreadWillContinueAndCorrectlyWrite()
            throws Exception {
        CountDownLatch blockedWriteLatch = new CountDownLatch(1);
        CountDownLatch delayedStartLatch = new CountDownLatch(1);
        // maxInFlightRequests = 2 and blockForLimitedTime = false: the timer-triggered flush of
        // "4" completes while the first request is still blocked, yielding 4 before 1, 2, 3.
        AsyncSinkWriterImpl sink =
                new AsyncSinkReleaseAndBlockWriterImpl(
                        sinkInitContext,
                        3,
                        2,
                        20,
                        100,
                        100,
                        blockedWriteLatch,
                        delayedStartLatch,
                        false);
        writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
        assertEquals(new ArrayList<>(Arrays.asList(4, 1, 2, 3)), res);
    }
    /**
     * Shared driver for the two interleaving tests: writes "1" and "2" on the main thread,
     * writes "3" from a pool thread (which blocks inside submitRequestEntries of the
     * release-and-block writer), then writes "4" and advances time to t=100 to trigger the
     * timer-based flush.
     */
    private void writeTwoElementsAndInterleaveTheNextTwoElements(
            AsyncSinkWriterImpl sink,
            CountDownLatch blockedWriteLatch,
            CountDownLatch delayedStartLatch)
            throws Exception {
        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
        ExecutorService es = Executors.newFixedThreadPool(4);
        tpts.setCurrentTime(0L);
        sink.write("1");
        sink.write("2");
        es.submit(
                () -> {
                    try {
                        sink.write("3");
                    } catch (IOException | InterruptedException e) {
                        e.printStackTrace();
                    }
                });
        // Wait until the pool thread has entered submitRequestEntries before writing "4".
        delayedStartLatch.await();
        sink.write("4");
        tpts.setCurrentTime(100L);
        // Release the blocked request (only relevant when blockForLimitedTime is false).
        blockedWriteLatch.countDown();
        es.shutdown();
        assertTrue(
                es.awaitTermination(500, TimeUnit.MILLISECONDS),
                "Executor Service stuck at termination, not terminated after 500ms!");
    }
    /**
     * Test implementation of {@link AsyncSinkWriter} that parses each String element into an
     * Integer and appends successfully "persisted" entries to {@code res}.
     *
     * <p>Values in (100, 200] always throw; when {@code simulateFailures} is set, values greater
     * than 200 fail on their first attempt (and are reported back for requeueing) but succeed on
     * retry.
     */
    private class AsyncSinkWriterImpl extends AsyncSinkWriter<String, Integer> {
        // Entries > 200 that have failed once; a retried entry succeeds and is removed here.
        private final Set<Integer> failedFirstAttempts = new HashSet<>();
        // Whether entries > 200 should fail on their first submission attempt.
        private final boolean simulateFailures;

        /**
         * Constructor using a very large byte threshold (10MB) and a 1s buffer timeout, so that
         * only the count-based flush condition is exercised.
         */
        public AsyncSinkWriterImpl(
                Sink.InitContext context,
                int maxBatchSize,
                int maxInFlightRequests,
                int maxBufferedRequests,
                boolean simulateFailures) {
            super(
                    (elem, ctx) -> Integer.parseInt(elem),
                    context,
                    maxBatchSize,
                    maxInFlightRequests,
                    maxBufferedRequests,
                    10000000,
                    1000);
            this.simulateFailures = simulateFailures;
        }

        /** Constructor exposing the byte-size flush threshold and the time-based flush timeout. */
        public AsyncSinkWriterImpl(
                Sink.InitContext context,
                int maxBatchSize,
                int maxInFlightRequests,
                int maxBufferedRequests,
                long flushOnBufferSizeInBytes,
                long maxTimeInBufferMS,
                boolean simulateFailures) {
            super(
                    (elem, ctx) -> Integer.parseInt(elem),
                    context,
                    maxBatchSize,
                    maxInFlightRequests,
                    maxBufferedRequests,
                    flushOnBufferSizeInBytes,
                    maxTimeInBufferMS);
            this.simulateFailures = simulateFailures;
        }

        /** Convenience overload: writes {@code val} with a null context. */
        public void write(String val) throws IOException, InterruptedException {
            write(val, null);
        }

        /**
         * Fails if any value is between 101 and 200. If {@code simulateFailures} is set, it will
         * fail on the first attempt but succeeds upon retry on all others for entries strictly
         * greater than 200.
         *
         * <p>A limitation of this basic implementation is that each element written must be
         * unique.
         *
         * @param requestEntries a set of request entries that should be persisted to {@code res}
         * @param requestResult a Consumer that needs to accept a collection of failure elements
         *     once all request entries have been persisted
         */
        @Override
        protected void submitRequestEntries(
                List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
            if (requestEntries.stream().anyMatch(val -> val > 100 && val <= 200)) {
                throw new RuntimeException(
                        "Deliberate runtime exception occurred in SinkWriterImplementation.");
            }
            if (simulateFailures) {
                // Entries in this batch that already failed once now succeed.
                List<Integer> successfulRetries =
                        failedFirstAttempts.stream()
                                .filter(requestEntries::contains)
                                .collect(Collectors.toList());
                failedFirstAttempts.removeIf(successfulRetries::contains);
                // First-time entries > 200 fail and are reported back for requeueing.
                List<Integer> firstTimeFailed =
                        requestEntries.stream()
                                .filter(x -> !successfulRetries.contains(x))
                                .filter(val -> val > 200)
                                .collect(Collectors.toList());
                failedFirstAttempts.addAll(firstTimeFailed);
                requestEntries.removeAll(firstTimeFailed);
                res.addAll(requestEntries);
                requestResult.accept(firstTimeFailed);
            } else {
                res.addAll(requestEntries);
                requestResult.accept(new ArrayList<>());
            }
        }

        /**
         * @return If we're simulating failures and the requestEntry value is greater than 200,
         *     then the entry is size 100 bytes, otherwise each entry is 4 bytes.
         */
        @Override
        protected long getSizeInBytes(Integer requestEntry) {
            return requestEntry > 200 && simulateFailures ? 100 : 4;
        }
    }
    /**
     * Minimal {@link Sink.InitContext} backed by a {@link TestProcessingTimeService} so tests can
     * advance processing time manually; mailbox actions run synchronously on the calling thread.
     */
    private static class SinkInitContext implements Sink.InitContext {
        // Shared, manually-advanced time service; tests drive it via setCurrentTime.
        private static final TestProcessingTimeService processingTimeService;

        static {
            processingTimeService = new TestProcessingTimeService();
        }

        @Override
        public UserCodeClassLoader getUserCodeClassLoader() {
            return null; // not needed by these tests
        }

        @Override
        public MailboxExecutor getMailboxExecutor() {
            // Action executor that runs every action directly on the calling thread.
            StreamTaskActionExecutor streamTaskActionExecutor =
                    new StreamTaskActionExecutor() {
                        @Override
                        public void run(RunnableWithException e) throws Exception {
                            e.run();
                        }

                        @Override
                        public <E extends Throwable> void runThrowing(
                                ThrowingRunnable<E> throwingRunnable) throws E {
                            throwingRunnable.run();
                        }

                        @Override
                        public <R> R call(Callable<R> callable) throws Exception {
                            return callable.call();
                        }
                    };
            return new MailboxExecutorImpl(
                    new TaskMailboxImpl(Thread.currentThread()),
                    Integer.MAX_VALUE,
                    streamTaskActionExecutor);
        }

        @Override
        public Sink.ProcessingTimeService getProcessingTimeService() {
            // Adapts the TestProcessingTimeService to the Sink-facing interface.
            return new Sink.ProcessingTimeService() {
                @Override
                public long getCurrentProcessingTime() {
                    return processingTimeService.getCurrentProcessingTime();
                }

                @Override
                public void registerProcessingTimer(
                        long time, ProcessingTimeCallback processingTimerCallback) {
                    processingTimeService.registerTimer(
                            time, processingTimerCallback::onProcessingTime);
                }
            };
        }

        @Override
        public int getSubtaskId() {
            return 0;
        }

        @Override
        public int getNumberOfParallelSubtasks() {
            return 0;
        }

        @Override
        public SinkWriterMetricGroup metricGroup() {
            return null; // not needed by these tests
        }

        @Override
        public OptionalLong getRestoredCheckpointId() {
            return OptionalLong.empty();
        }

        /** Exposes the underlying test time service so tests can advance time manually. */
        public TestProcessingTimeService getTestProcessingTimeService() {
            return processingTimeService;
        }
    }
/**
* This SinkWriter releases the lock on existing threads blocked by {@code delayedStartLatch}
* and blocks itself until {@code blockedThreadLatch} is unblocked.
*/
private class AsyncSinkReleaseAndBlockWriterImpl extends AsyncSinkWriterImpl {
private final CountDownLatch blockedThreadLatch;
private final CountDownLatch delayedStartLatch;
private final boolean blockForLimitedTime;
public AsyncSinkReleaseAndBlockWriterImpl(
Sink.InitContext context,
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
long flushOnBufferSizeInBytes,
long maxTimeInBufferMS,
CountDownLatch blockedThreadLatch,
CountDownLatch delayedStartLatch,
boolean blockForLimitedTime) {
super(
context,
maxBatchSize,
maxInFlightRequests,
maxBufferedRequests,
flushOnBufferSizeInBytes,
maxTimeInBufferMS,
false);
this.blockedThreadLatch = blockedThreadLatch;
this.delayedStartLatch = delayedStartLatch;
this.blockForLimitedTime = blockForLimitedTime;
}
@Override
protected void submitRequestEntries(
List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
if (requestEntries.size() == 3) {
try {
delayedStartLatch.countDown();
if (blockForLimitedTime) {
assertFalse(
blockedThreadLatch.await(500, TimeUnit.MILLISECONDS),
"The countdown latch was released before the full amount"
+ "of time was reached.");
} else {
blockedThreadLatch.await();
}
} catch (InterruptedException e) {
fail("The unit test latch must not have been interrupted by another thread.");
}
}
res.addAll(requestEntries);
requestResult.accept(new ArrayList<>());
}
}
}