blob: 8db0aec1a73ec4b4f7240a3a388f790f261f2219 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.base.sink.writer;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Unit tests for the functionality of {@link AsyncSinkWriter} that make no assumptions about what
 * a concrete implementation might do.
 *
 * <p>The writer under test, {@link AsyncSinkWriterImpl}, "persists" entries by appending them to
 * {@link #res}. The tests assert on {@code res} immediately after {@code write} returns, i.e. they
 * rely on flushes completing synchronously with the inline mailbox executor supplied by {@link
 * SinkInitContext}.
 */
public class AsyncSinkWriterTest {

    private static final int BYTES_IN_MB = 1024 * 1024;

    /** Destination of the test sink: every entry the writer persists is appended here. */
    private final List<Integer> res = new ArrayList<>();

    private final SinkInitContext sinkInitContext = new SinkInitContext();

    @Before
    public void before() {
        res.clear();
    }

    /** With a batch size of 10, writing 80 records persists all 80 via full batches. */
    @Test
    public void testNumberOfRecordsIsAMultipleOfBatchSizeResultsInThatNumberOfRecordsBeingWritten()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, false);
        for (int i = 0; i < 80; i++) {
            sink.write(String.valueOf(i));
        }
        assertEquals(80, res.size());
    }

    /** Records that did not fill a whole batch are still buffered and show up in the snapshot. */
    @Test
    public void testThatUnwrittenRecordsInBufferArePersistedWhenSnapshotIsTaken()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, false);
        for (int i = 0; i < 23; i++) {
            sink.write(String.valueOf(i));
        }
        assertEquals(20, res.size());
        assertEquals(Arrays.asList(20, 21, 22), new ArrayList<>(sink.snapshotState().get(0)));
    }

    /** {@code prepareCommit(true)} flushes the partially filled buffer to the destination. */
    @Test
    public void testPreparingCommitAtSnapshotTimeEnsuresBufferedRecordsArePersistedToDestination()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 100, false);
        for (int i = 0; i < 23; i++) {
            sink.write(String.valueOf(i));
        }
        sink.prepareCommit(true);
        assertEquals(23, res.size());
    }

    /** The snapshot reflects the buffer before, and is empty after, a batch-size-driven flush. */
    @Test
    public void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterAutomaticFlush()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, false);
        sink.write("25");
        sink.write("55");
        assertEquals(Arrays.asList(25, 55), new ArrayList<>(sink.snapshotState().get(0)));
        assertEquals(0, res.size());
        sink.write("75");
        assertEquals(Arrays.asList(), new ArrayList<>(sink.snapshotState().get(0)));
        assertEquals(3, res.size());
    }

    /** The snapshot reflects the buffer before, and is empty after, a {@code prepareCommit}. */
    @Test
    public void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterManualFlush()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, false);
        sink.write("25");
        sink.write("55");
        sink.write("75");
        sink.write("95");
        sink.write("955");
        assertEquals(Arrays.asList(95, 955), new ArrayList<>(sink.snapshotState().get(0)));
        sink.prepareCommit(true);
        assertEquals(Arrays.asList(), new ArrayList<>(sink.snapshotState().get(0)));
        assertEquals(5, res.size());
    }

    /**
     * A {@link RuntimeException} thrown from {@code submitRequestEntries} (triggered here by the
     * value 135) propagates out of {@code write}; the batch persisted earlier is unaffected.
     */
    @Test
    public void testRuntimeErrorsInSubmitRequestEntriesEndUpAsIOExceptionsWithNumOfFailedRequests()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, true);
        sink.write("25");
        sink.write("55");
        sink.write("75");
        sink.write("95");
        sink.write("35");
        Exception e = assertThrows(RuntimeException.class, () -> sink.write("135"));
        assertEquals(
                "Deliberate runtime exception occurred in SinkWriterImplementation.",
                e.getMessage());
        assertEquals(3, res.size());
    }

    /** Entries that fail once are requeued and eventually persisted, so none are lost. */
    @Test
    public void testRetryableErrorsDoNotViolateAtLeastOnceSemanticsDueToRequeueOfFailures()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, true);
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "25", Arrays.asList(), Arrays.asList(25));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "55", Arrays.asList(), Arrays.asList(25, 55));

        // 25, 55 persisted; 965 failed and inflight
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "965", Arrays.asList(25, 55), Arrays.asList());
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "75", Arrays.asList(25, 55), Arrays.asList(75));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "95", Arrays.asList(25, 55), Arrays.asList(75, 95));

        /*
         * Writing 955 to the sink increases the buffer to size 3 containing [75, 95, 955]. This
         * triggers the outstanding in-flight request with the failed 965 to be run, and 965 is
         * placed at the front of the queue. The first {@code maxBatchSize = 3} elements are
         * persisted, with 965 succeeding this (second) time. 955 remains in the buffer.
         */
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "955", Arrays.asList(25, 55, 965, 75, 95), Arrays.asList(955));
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "550", Arrays.asList(25, 55, 965, 75, 95), Arrays.asList(955, 550));

        /*
         * Writing 45 fills the buffer to [955, 550, 45], which is flushed: 45 is persisted, while
         * 955 and 550 fail on their first attempt and are requeued for retry.
         */
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "45", Arrays.asList(25, 55, 965, 75, 95, 45), Arrays.asList());
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "35", Arrays.asList(25, 55, 965, 75, 95, 45), Arrays.asList(35));

        /*
         * [35, 535] should be in the bufferedRequestEntries.
         * [955, 550] are the requeued failures awaiting retry.
         * [25, 55, 965, 75, 95, 45] have already been persisted downstream.
         */
        writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
                sink, "535", Arrays.asList(25, 55, 965, 75, 95, 45), Arrays.asList(35, 535));

        // Checkpoint: prepareCommit(true) flushes both the requeued failures and the buffer.
        sink.prepareCommit(true);

        // Everything is saved
        assertEquals(Arrays.asList(25, 55, 965, 75, 95, 45, 550, 955, 35, 535), res);
        assertEquals(0, sink.snapshotState().get(0).size());
    }

    /** A failed entry is retried in the very next persist request, not in a later one. */
    @Test
    public void testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink = new AsyncSinkWriterImpl(sinkInitContext, 3, 1, 100, true);
        sink.write("25");
        sink.write("55");
        sink.write("965");
        sink.write("75");
        sink.write("95");
        sink.write("955");
        assertTrue(res.contains(965));
        sink.write("550");
        sink.write("645");
        sink.write("545");
        sink.write("535");
        sink.write("515");
        assertTrue(res.contains(955));
        sink.write("505");
        assertTrue(res.contains(550));
        assertTrue(res.contains(645));
        sink.prepareCommit(true);
        assertTrue(res.contains(545));
        assertTrue(res.contains(535));
        assertTrue(res.contains(515));
    }

    /** Construction is rejected when {@code maxBufferedRequests <= maxBatchSize}. */
    @Test
    public void testThatMaxBufferSizeOfSinkShouldBeStrictlyGreaterThanMaxSizeOfEachBatch() {
        Exception e =
                assertThrows(
                        IllegalArgumentException.class,
                        () -> new AsyncSinkWriterImpl(sinkInitContext, 10, 1, 10, false));
        // Expected value first, actual second, so a failure message reads correctly.
        assertEquals(
                "The maximum number of requests that may be buffered should be "
                        + "strictly greater than the maximum number of requests per batch.",
                e.getMessage());
    }

    /**
     * Writes {@code x} to {@code sink}, then asserts that the destination equals {@code y} and the
     * buffered snapshot equals {@code z}.
     */
    private void writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
            AsyncSinkWriterImpl sink, String x, List<Integer> y, List<Integer> z)
            throws IOException, InterruptedException {
        sink.write(x);
        assertEquals(y, res);
        assertEquals(z, new ArrayList<>(sink.snapshotState().get(0)));
    }

    /** A flush triggered by the byte-size threshold can produce a batch smaller than maxBatchSize. */
    @Test
    public void flushThresholdMetBeforeBatchLimitWillCreateASmallerBatchOfSizeAboveThreshold()
            throws IOException, InterruptedException {
        AsyncSinkWriterImpl sink =
                new AsyncSinkWriterImpl(
                        sinkInitContext, 10, 1, 100, (double) 30 / BYTES_IN_MB, true);

        /* Sink has flush threshold of 30 bytes, each integer is 4 bytes, therefore, flushing
         * should occur once 8 elements have been written.
         */
        for (int i = 0; i < 15; i++) {
            sink.write(String.valueOf(i));
        }
        assertEquals(8, res.size());
    }

    /**
     * Minimal {@link AsyncSinkWriter} that parses each string element to an {@link Integer} and
     * "persists" it by appending it to the enclosing test's {@link #res}.
     */
    private class AsyncSinkWriterImpl extends AsyncSinkWriter<String, Integer> {

        /** Entries that have failed exactly once and will succeed on their next attempt. */
        private final Set<Integer> failedFirstAttempts = new HashSet<>();

        private final boolean simulateFailures;

        /** Creates a writer with a flush-on-buffer-size threshold of 100 MB. */
        public AsyncSinkWriterImpl(
                Sink.InitContext context,
                int maxBatchSize,
                int maxInFlightRequests,
                int maxBufferedRequests,
                boolean simulateFailures) {
            this(
                    context,
                    maxBatchSize,
                    maxInFlightRequests,
                    maxBufferedRequests,
                    100,
                    simulateFailures);
        }

        public AsyncSinkWriterImpl(
                Sink.InitContext context,
                int maxBatchSize,
                int maxInFlightRequests,
                int maxBufferedRequests,
                double flushOnBufferSizeMB,
                boolean simulateFailures) {
            super(
                    (elem, ctx) -> Integer.parseInt(elem),
                    context,
                    maxBatchSize,
                    maxInFlightRequests,
                    maxBufferedRequests,
                    flushOnBufferSizeMB);
            this.simulateFailures = simulateFailures;
        }

        /** Writes {@code val} with a {@code null} context. */
        public void write(String val) throws IOException, InterruptedException {
            write(val, null);
        }

        /**
         * Persists {@code requestEntries} to {@code res}.
         *
         * <p>Throws a {@link RuntimeException} if any entry is in the range 101..200 (inclusive).
         * If {@code simulateFailures} is set, every entry strictly greater than 200 fails on its
         * first attempt (it is reported back through {@code requestResult}) and succeeds on its
         * retry; all other entries always succeed.
         *
         * <p>A limitation of this basic implementation is that each element written must be unique.
         *
         * @param requestEntries the batch of request entries to persist; not modified
         * @param requestResult a Consumer that needs to accept a collection of failed entries once
         *     the batch has been processed
         */
        @Override
        protected void submitRequestEntries(
                List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
            if (requestEntries.stream().anyMatch(val -> val > 100 && val <= 200)) {
                throw new RuntimeException(
                        "Deliberate runtime exception occurred in SinkWriterImplementation.");
            }

            if (simulateFailures) {
                // Entries that already failed once succeed on this retry.
                List<Integer> successfulRetries =
                        failedFirstAttempts.stream()
                                .filter(requestEntries::contains)
                                .collect(Collectors.toList());
                failedFirstAttempts.removeIf(successfulRetries::contains);

                // Any other entry above 200 is on its first attempt and fails.
                List<Integer> firstTimeFailed =
                        requestEntries.stream()
                                .filter(x -> !successfulRetries.contains(x))
                                .filter(val -> val > 200)
                                .collect(Collectors.toList());
                failedFirstAttempts.addAll(firstTimeFailed);

                // Build a separate list instead of mutating the caller-owned batch.
                List<Integer> persisted =
                        requestEntries.stream()
                                .filter(val -> !firstTimeFailed.contains(val))
                                .collect(Collectors.toList());
                res.addAll(persisted);
                requestResult.accept(firstTimeFailed);
            } else {
                res.addAll(requestEntries);
                requestResult.accept(new ArrayList<>());
            }
        }

        /** Every entry is treated as 4 bytes when computing the buffer size. */
        @Override
        protected int getSizeInBytes(Integer requestEntry) {
            return 4;
        }
    }

    /**
     * Stub {@link Sink.InitContext}: most accessors return {@code null}/{@code 0}; only the
     * mailbox executor is functional.
     */
    private static class SinkInitContext implements Sink.InitContext {

        @Override
        public UserCodeClassLoader getUserCodeClassLoader() {
            return null;
        }

        /**
         * Returns a new mailbox executor bound to the calling thread whose action executor runs
         * every action inline.
         */
        @Override
        public MailboxExecutor getMailboxExecutor() {
            StreamTaskActionExecutor streamTaskActionExecutor =
                    new StreamTaskActionExecutor() {
                        @Override
                        public void run(RunnableWithException e) throws Exception {
                            e.run();
                        }

                        @Override
                        public <E extends Throwable> void runThrowing(
                                ThrowingRunnable<E> throwingRunnable) throws E {
                            throwingRunnable.run();
                        }

                        @Override
                        public <R> R call(Callable<R> callable) throws Exception {
                            return callable.call();
                        }
                    };
            return new MailboxExecutorImpl(
                    new TaskMailboxImpl(Thread.currentThread()),
                    Integer.MAX_VALUE,
                    streamTaskActionExecutor);
        }

        @Override
        public Sink.ProcessingTimeService getProcessingTimeService() {
            return null;
        }

        @Override
        public int getSubtaskId() {
            return 0;
        }

        @Override
        public int getNumberOfParallelSubtasks() {
            return 0;
        }

        @Override
        public SinkWriterMetricGroup metricGroup() {
            return null;
        }

        @Override
        public OptionalLong getRestoredCheckpointId() {
            return OptionalLong.empty();
        }
    }
}