| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connector.base.sink.writer; |
| |
| import org.apache.flink.annotation.PublicEvolving; |
| import org.apache.flink.api.common.operators.MailboxExecutor; |
| import org.apache.flink.api.connector.sink.Sink; |
| import org.apache.flink.api.connector.sink.SinkWriter; |
| import org.apache.flink.util.Preconditions; |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Deque; |
| import java.util.List; |
| import java.util.function.Consumer; |
| import java.util.stream.Collectors; |
| |
| /** |
| * A generic sink writer that handles the general behaviour of a sink such as batching and flushing, |
| * and allows extenders to implement the logic for persisting individual request elements, with |
| * allowance for retries. |
| * |
| * <p>At least once semantics is supported through {@code prepareCommit} as outstanding requests are |
| * flushed or completed prior to checkpointing. |
| * |
| * <p>Designed to be returned at {@code createWriter} time by an {@code AsyncSinkBase}. |
| * |
| * <p>There are configuration options to customize the buffer size etc. |
| */ |
| @PublicEvolving |
| public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable> |
| implements SinkWriter<InputT, Void, Collection<RequestEntryT>> { |
| |
| private final MailboxExecutor mailboxExecutor; |
| private final Sink.ProcessingTimeService timeService; |
| |
| private final int maxBatchSize; |
| private final int maxInFlightRequests; |
| private final int maxBufferedRequests; |
| private final long flushOnBufferSizeInBytes; |
| private final long maxTimeInBufferMS; |
| |
| /** |
| * The ElementConverter provides a mapping between for the elements of a stream to request |
| * entries that can be sent to the destination. |
| * |
| * <p>The resulting request entry is buffered by the AsyncSinkWriter and sent to the destination |
| * when the {@code submitRequestEntries} method is invoked. |
| */ |
| private final ElementConverter<InputT, RequestEntryT> elementConverter; |
| |
| /** |
| * Buffer to hold request entries that should be persisted into the destination, along with its |
| * size in bytes. |
| * |
| * <p>A request entry contain all relevant details to make a call to the destination. Eg, for |
| * Kinesis Data Streams a request entry contains the payload and partition key. |
| * |
| * <p>It seems more natural to buffer InputT, ie, the events that should be persisted, rather |
| * than RequestEntryT. However, in practice, the response of a failed request call can make it |
| * very hard, if not impossible, to reconstruct the original event. It is much easier, to just |
| * construct a new (retry) request entry from the response and add that back to the queue for |
| * later retry. |
| */ |
| private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries = |
| new ArrayDeque<>(); |
| |
| /** |
| * Tracks all pending async calls that have been executed since the last checkpoint. Calls that |
| * completed (successfully or unsuccessfully) are automatically decrementing the counter. Any |
| * request entry that was not successfully persisted needs to be handled and retried by the |
| * logic in {@code submitRequestsToApi}. |
| * |
| * <p>There is a limit on the number of concurrent (async) requests that can be handled by the |
| * client library. This limit is enforced by checking the queue size before accepting a new |
| * element into the queue. |
| * |
| * <p>To complete a checkpoint, we need to make sure that no requests are in flight, as they may |
| * fail, which could then lead to data loss. |
| */ |
| private int inFlightRequestsCount; |
| |
| /** |
| * Tracks the cumulative size of all elements in {@code bufferedRequestEntries} to facilitate |
| * the criterion for flushing after {@code flushOnBufferSizeInBytes} is reached. |
| */ |
| private double bufferedRequestEntriesTotalSizeInBytes; |
| |
| private boolean existsActiveTimerCallback = false; |
| |
| /** |
| * This method specifies how to persist buffered request entries into the destination. It is |
| * implemented when support for a new destination is added. |
| * |
| * <p>The method is invoked with a set of request entries according to the buffering hints (and |
| * the valid limits of the destination). The logic then needs to create and execute the request |
| * against the destination (ideally by batching together multiple request entries to increase |
| * efficiency). The logic also needs to identify individual request entries that were not |
| * persisted successfully and resubmit them using the {@code requeueFailedRequestEntry} method. |
| * |
| * <p>During checkpointing, the sink needs to ensure that there are no outstanding in-flight |
| * requests. |
| * |
| * @param requestEntries a set of request entries that should be sent to the destination |
| * @param requestResult the {@code accept} method should be called on this Consumer once the |
| * processing of the {@code requestEntries} are complete. Any entries that encountered |
| * difficulties in persisting should be re-queued through {@code requestResult} by including |
| * that element in the collection of {@code RequestEntryT}s passed to the {@code accept} |
| * method. All other elements are assumed to have been successfully persisted. |
| */ |
| protected abstract void submitRequestEntries( |
| List<RequestEntryT> requestEntries, Consumer<Collection<RequestEntryT>> requestResult); |
| |
| /** |
| * This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in |
| * this case is measured as the total bytes that is written to the destination as a result of |
| * persisting this particular {@code RequestEntryT} rather than the serialized length (which may |
| * be the same). |
| * |
| * @param requestEntry the requestEntry for which we want to know the size |
| * @return the size of the requestEntry, as defined previously |
| */ |
| protected abstract long getSizeInBytes(RequestEntryT requestEntry); |
| |
| public AsyncSinkWriter( |
| ElementConverter<InputT, RequestEntryT> elementConverter, |
| Sink.InitContext context, |
| int maxBatchSize, |
| int maxInFlightRequests, |
| int maxBufferedRequests, |
| long flushOnBufferSizeInBytes, |
| long maxTimeInBufferMS) { |
| this.elementConverter = elementConverter; |
| this.mailboxExecutor = context.getMailboxExecutor(); |
| this.timeService = context.getProcessingTimeService(); |
| |
| Preconditions.checkNotNull(elementConverter); |
| Preconditions.checkArgument(maxBatchSize > 0); |
| Preconditions.checkArgument(maxBufferedRequests > 0); |
| Preconditions.checkArgument(maxInFlightRequests > 0); |
| Preconditions.checkArgument(flushOnBufferSizeInBytes > 0); |
| Preconditions.checkArgument(maxTimeInBufferMS > 0); |
| Preconditions.checkArgument( |
| maxBufferedRequests > maxBatchSize, |
| "The maximum number of requests that may be buffered should be strictly" |
| + " greater than the maximum number of requests per batch."); |
| this.maxBatchSize = maxBatchSize; |
| this.maxInFlightRequests = maxInFlightRequests; |
| this.maxBufferedRequests = maxBufferedRequests; |
| this.flushOnBufferSizeInBytes = flushOnBufferSizeInBytes; |
| this.maxTimeInBufferMS = maxTimeInBufferMS; |
| |
| this.inFlightRequestsCount = 0; |
| this.bufferedRequestEntriesTotalSizeInBytes = 0; |
| } |
| |
| private void registerCallback() { |
| Sink.ProcessingTimeService.ProcessingTimeCallback ptc = |
| instant -> { |
| existsActiveTimerCallback = false; |
| while (!bufferedRequestEntries.isEmpty()) { |
| flush(); |
| } |
| }; |
| timeService.registerProcessingTimer( |
| timeService.getCurrentProcessingTime() + maxTimeInBufferMS, ptc); |
| existsActiveTimerCallback = true; |
| } |
| |
| @Override |
| public void write(InputT element, Context context) throws IOException, InterruptedException { |
| while (bufferedRequestEntries.size() >= maxBufferedRequests) { |
| mailboxExecutor.tryYield(); |
| } |
| |
| addEntryToBuffer(elementConverter.apply(element, context), false); |
| |
| flushIfAble(); |
| } |
| |
| private void flushIfAble() { |
| while (bufferedRequestEntries.size() >= maxBatchSize |
| || bufferedRequestEntriesTotalSizeInBytes >= flushOnBufferSizeInBytes) { |
| flush(); |
| } |
| } |
| |
| /** |
| * Persists buffered RequestsEntries into the destination by invoking {@code |
| * submitRequestEntries} with batches according to the user specified buffering hints. |
| * |
| * <p>The method blocks if too many async requests are in flight. |
| */ |
| private void flush() { |
| while (inFlightRequestsCount >= maxInFlightRequests) { |
| mailboxExecutor.tryYield(); |
| } |
| |
| List<RequestEntryT> batch = new ArrayList<>(maxBatchSize); |
| |
| int batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size()); |
| for (int i = 0; i < batchSize; i++) { |
| RequestEntryWrapper<RequestEntryT> elem = bufferedRequestEntries.remove(); |
| batch.add(elem.getRequestEntry()); |
| bufferedRequestEntriesTotalSizeInBytes -= elem.getSize(); |
| } |
| |
| if (batch.size() == 0) { |
| return; |
| } |
| |
| Consumer<Collection<RequestEntryT>> requestResult = |
| failedRequestEntries -> |
| mailboxExecutor.execute( |
| () -> completeRequest(failedRequestEntries), |
| "Mark in-flight request as completed and requeue %d request entries", |
| failedRequestEntries.size()); |
| |
| inFlightRequestsCount++; |
| submitRequestEntries(batch, requestResult); |
| } |
| |
| /** |
| * Marks an in-flight request as completed and prepends failed requestEntries back to the |
| * internal requestEntry buffer for later retry. |
| * |
| * @param failedRequestEntries requestEntries that need to be retried |
| */ |
| private void completeRequest(Collection<RequestEntryT> failedRequestEntries) { |
| inFlightRequestsCount--; |
| failedRequestEntries.forEach(failedEntry -> addEntryToBuffer(failedEntry, true)); |
| } |
| |
| private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) { |
| if (bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback) { |
| registerCallback(); |
| } |
| |
| RequestEntryWrapper<RequestEntryT> wrappedEntry = |
| new RequestEntryWrapper<>(entry, getSizeInBytes(entry)); |
| |
| if (insertAtHead) { |
| bufferedRequestEntries.addFirst(wrappedEntry); |
| } else { |
| bufferedRequestEntries.add(wrappedEntry); |
| } |
| |
| bufferedRequestEntriesTotalSizeInBytes += wrappedEntry.getSize(); |
| } |
| |
| /** |
| * In flight requests will be retried if the sink is still healthy. But if in-flight requests |
| * fail after a checkpoint has been triggered and Flink needs to recover from the checkpoint, |
| * the (failed) in-flight requests are gone and cannot be retried. Hence, there cannot be any |
| * outstanding in-flight requests when a commit is initialized. |
| * |
| * <p>To this end, all in-flight requests need to completed before proceeding with the commit. |
| */ |
| @Override |
| public List<Void> prepareCommit(boolean flush) { |
| while (inFlightRequestsCount > 0 || bufferedRequestEntries.size() > 0) { |
| mailboxExecutor.tryYield(); |
| if (flush) { |
| flush(); |
| } |
| } |
| |
| return Collections.emptyList(); |
| } |
| |
| /** |
| * All in-flight requests that are relevant for the snapshot have been completed, but there may |
| * still be request entries in the internal buffers that are yet to be sent to the endpoint. |
| * These request entries are stored in the snapshot state so that they don't get lost in case of |
| * a failure/restart of the application. |
| */ |
| @Override |
| public List<Collection<RequestEntryT>> snapshotState() { |
| return Arrays.asList( |
| bufferedRequestEntries.stream() |
| .map(RequestEntryWrapper::getRequestEntry) |
| .collect(Collectors.toList())); |
| } |
| |
| @Override |
| public void close() {} |
| } |