blob: b5f6b40660864dda997393962d978f76cdf61788 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.base.sink.writer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
 * A generic sink writer that handles the general behaviour of a sink such as batching and flushing,
 * and allows extenders to implement the logic for persisting individual request elements, with
 * allowance for retries.
 *
 * <p>At least once semantics is supported through {@code prepareCommit} as outstanding requests are
 * flushed or completed prior to checkpointing.
 *
 * <p>Designed to be returned at {@code createWriter} time by an {@code AsyncSinkBase}.
 *
 * <p>There are configuration options to customize the buffer size etc.
 */
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {

    /** Executor used to run completion callbacks on the task's mailbox thread. */
    private final MailboxExecutor mailboxExecutor;

    /** Timer service used to enforce {@code maxTimeInBufferMS}. */
    private final Sink.ProcessingTimeService timeService;

    /* Buffering hints supplied at construction time. */
    private final int maxBatchSize;
    private final int maxInFlightRequests;
    private final int maxBufferedRequests;
    private final long flushOnBufferSizeInBytes;
    private final long maxTimeInBufferMS;

    /**
     * The ElementConverter provides a mapping between for the elements of a stream to request
     * entries that can be sent to the destination.
     *
     * <p>The resulting request entry is buffered by the AsyncSinkWriter and sent to the destination
     * when the {@code submitRequestEntries} method is invoked.
     */
    private final ElementConverter<InputT, RequestEntryT> elementConverter;

    /**
     * Buffer to hold request entries that should be persisted into the destination, along with its
     * size in bytes.
     *
     * <p>A request entry contains all relevant details to make a call to the destination. Eg, for
     * Kinesis Data Streams a request entry contains the payload and partition key.
     *
     * <p>It seems more natural to buffer InputT, ie, the events that should be persisted, rather
     * than RequestEntryT. However, in practice, the response of a failed request call can make it
     * very hard, if not impossible, to reconstruct the original event. It is much easier, to just
     * construct a new (retry) request entry from the response and add that back to the queue for
     * later retry.
     */
    private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries =
            new ArrayDeque<>();

    /**
     * Tracks all pending async calls that have been executed since the last checkpoint. Calls that
     * completed (successfully or unsuccessfully) are automatically decrementing the counter. Any
     * request entry that was not successfully persisted needs to be handled and retried by the
     * logic in {@code submitRequestsToApi}.
     *
     * <p>There is a limit on the number of concurrent (async) requests that can be handled by the
     * client library. This limit is enforced by checking the queue size before accepting a new
     * element into the queue.
     *
     * <p>To complete a checkpoint, we need to make sure that no requests are in flight, as they may
     * fail, which could then lead to data loss.
     */
    private int inFlightRequestsCount;

    /**
     * Tracks the cumulative size of all elements in {@code bufferedRequestEntries} to facilitate
     * the criterion for flushing after {@code flushOnBufferSizeInBytes} is reached.
     *
     * <p>Kept as a {@code long} since every contribution comes from {@link #getSizeInBytes}, which
     * returns a {@code long}; no fractional sizes can ever occur.
     */
    private long bufferedRequestEntriesTotalSizeInBytes;

    /** True while a processing-time flush timer is registered and has not yet fired. */
    private boolean existsActiveTimerCallback = false;

    /**
     * This method specifies how to persist buffered request entries into the destination. It is
     * implemented when support for a new destination is added.
     *
     * <p>The method is invoked with a set of request entries according to the buffering hints (and
     * the valid limits of the destination). The logic then needs to create and execute the request
     * against the destination (ideally by batching together multiple request entries to increase
     * efficiency). The logic also needs to identify individual request entries that were not
     * persisted successfully and resubmit them using the {@code requeueFailedRequestEntry} method.
     *
     * <p>During checkpointing, the sink needs to ensure that there are no outstanding in-flight
     * requests.
     *
     * @param requestEntries a set of request entries that should be sent to the destination
     * @param requestResult the {@code accept} method should be called on this Consumer once the
     *     processing of the {@code requestEntries} are complete. Any entries that encountered
     *     difficulties in persisting should be re-queued through {@code requestResult} by including
     *     that element in the collection of {@code RequestEntryT}s passed to the {@code accept}
     *     method. All other elements are assumed to have been successfully persisted.
     */
    protected abstract void submitRequestEntries(
            List<RequestEntryT> requestEntries, Consumer<Collection<RequestEntryT>> requestResult);

    /**
     * This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in
     * this case is measured as the total bytes that is written to the destination as a result of
     * persisting this particular {@code RequestEntryT} rather than the serialized length (which may
     * be the same).
     *
     * @param requestEntry the requestEntry for which we want to know the size
     * @return the size of the requestEntry, as defined previously
     */
    protected abstract long getSizeInBytes(RequestEntryT requestEntry);

    public AsyncSinkWriter(
            ElementConverter<InputT, RequestEntryT> elementConverter,
            Sink.InitContext context,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long flushOnBufferSizeInBytes,
            long maxTimeInBufferMS) {
        // Fail fast: validate every argument before any state is initialized or dereferenced.
        Preconditions.checkNotNull(elementConverter);
        Preconditions.checkNotNull(context);
        Preconditions.checkArgument(maxBatchSize > 0);
        Preconditions.checkArgument(maxBufferedRequests > 0);
        Preconditions.checkArgument(maxInFlightRequests > 0);
        Preconditions.checkArgument(flushOnBufferSizeInBytes > 0);
        Preconditions.checkArgument(maxTimeInBufferMS > 0);
        Preconditions.checkArgument(
                maxBufferedRequests > maxBatchSize,
                "The maximum number of requests that may be buffered should be strictly"
                        + " greater than the maximum number of requests per batch.");
        this.elementConverter = elementConverter;
        this.mailboxExecutor = context.getMailboxExecutor();
        this.timeService = context.getProcessingTimeService();
        this.maxBatchSize = maxBatchSize;
        this.maxInFlightRequests = maxInFlightRequests;
        this.maxBufferedRequests = maxBufferedRequests;
        this.flushOnBufferSizeInBytes = flushOnBufferSizeInBytes;
        this.maxTimeInBufferMS = maxTimeInBufferMS;
        this.inFlightRequestsCount = 0;
        this.bufferedRequestEntriesTotalSizeInBytes = 0;
    }

    /**
     * Registers a processing-time timer that fires after {@code maxTimeInBufferMS} and flushes
     * whatever is buffered at that point, so entries cannot linger indefinitely in a
     * low-throughput stream.
     */
    private void registerCallback() {
        Sink.ProcessingTimeService.ProcessingTimeCallback ptc =
                instant -> {
                    existsActiveTimerCallback = false;
                    // Drain the whole buffer; flush() may block (yield) on the in-flight limit.
                    while (!bufferedRequestEntries.isEmpty()) {
                        flush();
                    }
                };
        timeService.registerProcessingTimer(
                timeService.getCurrentProcessingTime() + maxTimeInBufferMS, ptc);
        existsActiveTimerCallback = true;
    }

    @Override
    public void write(InputT element, Context context) throws IOException, InterruptedException {
        // Apply backpressure: yield to the mailbox until the buffer has room for a new entry.
        while (bufferedRequestEntries.size() >= maxBufferedRequests) {
            mailboxExecutor.tryYield();
        }
        addEntryToBuffer(elementConverter.apply(element, context), false);
        flushIfAble();
    }

    /** Flushes as long as either buffering hint (entry count or total bytes) is exceeded. */
    private void flushIfAble() {
        while (bufferedRequestEntries.size() >= maxBatchSize
                || bufferedRequestEntriesTotalSizeInBytes >= flushOnBufferSizeInBytes) {
            flush();
        }
    }

    /**
     * Persists buffered RequestsEntries into the destination by invoking {@code
     * submitRequestEntries} with batches according to the user specified buffering hints.
     *
     * <p>The method blocks if too many async requests are in flight.
     */
    private void flush() {
        while (inFlightRequestsCount >= maxInFlightRequests) {
            mailboxExecutor.tryYield();
        }
        List<RequestEntryT> batch = new ArrayList<>(maxBatchSize);
        int batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size());
        for (int i = 0; i < batchSize; i++) {
            RequestEntryWrapper<RequestEntryT> elem = bufferedRequestEntries.remove();
            batch.add(elem.getRequestEntry());
            bufferedRequestEntriesTotalSizeInBytes -= elem.getSize();
        }
        if (batch.isEmpty()) {
            return;
        }
        // Completion is marshalled back onto the mailbox thread, so all writer state is only
        // ever mutated from that single thread.
        Consumer<Collection<RequestEntryT>> requestResult =
                failedRequestEntries ->
                        mailboxExecutor.execute(
                                () -> completeRequest(failedRequestEntries),
                                "Mark in-flight request as completed and requeue %d request entries",
                                failedRequestEntries.size());
        inFlightRequestsCount++;
        submitRequestEntries(batch, requestResult);
    }

    /**
     * Marks an in-flight request as completed and prepends failed requestEntries back to the
     * internal requestEntry buffer for later retry.
     *
     * @param failedRequestEntries requestEntries that need to be retried
     */
    private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
        inFlightRequestsCount--;
        // Prepend so that retried entries are sent before newer entries, preserving best-effort
        // ordering of retries.
        failedRequestEntries.forEach(failedEntry -> addEntryToBuffer(failedEntry, true));
    }

    private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
        // (Re)arm the flush timer when the buffer transitions from empty to non-empty.
        if (bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback) {
            registerCallback();
        }
        RequestEntryWrapper<RequestEntryT> wrappedEntry =
                new RequestEntryWrapper<>(entry, getSizeInBytes(entry));
        if (insertAtHead) {
            bufferedRequestEntries.addFirst(wrappedEntry);
        } else {
            bufferedRequestEntries.add(wrappedEntry);
        }
        bufferedRequestEntriesTotalSizeInBytes += wrappedEntry.getSize();
    }

    /**
     * In flight requests will be retried if the sink is still healthy. But if in-flight requests
     * fail after a checkpoint has been triggered and Flink needs to recover from the checkpoint,
     * the (failed) in-flight requests are gone and cannot be retried. Hence, there cannot be any
     * outstanding in-flight requests when a commit is initialized.
     *
     * <p>To this end, all in-flight requests need to completed before proceeding with the commit.
     */
    @Override
    public List<Void> prepareCommit(boolean flush) {
        // Wait for all in-flight requests; additionally drain the buffer when a flush was
        // requested. The buffer must NOT be part of the wait condition when {@code flush} is
        // false: nothing would ever empty it (remaining entries are persisted via
        // snapshotState() instead), so including it would busy-spin forever.
        while (inFlightRequestsCount > 0 || (flush && !bufferedRequestEntries.isEmpty())) {
            mailboxExecutor.tryYield();
            if (flush) {
                flush();
            }
        }
        return Collections.emptyList();
    }

    /**
     * All in-flight requests that are relevant for the snapshot have been completed, but there may
     * still be request entries in the internal buffers that are yet to be sent to the endpoint.
     * These request entries are stored in the snapshot state so that they don't get lost in case of
     * a failure/restart of the application.
     */
    @Override
    public List<Collection<RequestEntryT>> snapshotState() {
        return Collections.singletonList(
                bufferedRequestEntries.stream()
                        .map(RequestEntryWrapper::getRequestEntry)
                        .collect(Collectors.toList()));
    }

    @Override
    public void close() {}
}