blob: 647c9683e188462859eacd405854239cc63260cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core.functions;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.di.Lazy;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.metrics.FunctionDispatcherMetrics;
import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetricsRepository;
import org.apache.flink.statefun.flink.core.queue.Locks;
import org.apache.flink.statefun.flink.core.queue.MpscQueue;
import org.apache.flink.statefun.sdk.Address;
final class AsyncSink {
private final PendingAsyncOperations pendingAsyncOperations;
private final Lazy<Reductions> reductions;
private final Executor operatorMailbox;
private final BackPressureValve backPressureValve;
private final FunctionTypeMetricsRepository metricsRepository;
private final FunctionDispatcherMetrics dispatcherMetrics;
private final MpscQueue<Message> completed = new MpscQueue<>(32768, Locks.jdkReentrantLock());
@Inject
AsyncSink(
PendingAsyncOperations pendingAsyncOperations,
@Label("mailbox-executor") Executor operatorMailbox,
@Label("reductions") Lazy<Reductions> reductions,
@Label("backpressure-valve") BackPressureValve backPressureValve,
@Label("function-metrics-repository") FunctionTypeMetricsRepository metricsRepository,
@Label("function-dispatcher-metrics") FunctionDispatcherMetrics dispatcherMetrics) {
this.pendingAsyncOperations = Objects.requireNonNull(pendingAsyncOperations);
this.reductions = Objects.requireNonNull(reductions);
this.operatorMailbox = Objects.requireNonNull(operatorMailbox);
this.backPressureValve = Objects.requireNonNull(backPressureValve);
this.metricsRepository = Objects.requireNonNull(metricsRepository);
this.dispatcherMetrics = Objects.requireNonNull(dispatcherMetrics);
}
<T> void accept(Address sourceAddress, Message metadata, CompletableFuture<T> future) {
final long futureId = ThreadLocalRandom.current().nextLong(); // TODO: is this is good enough?
// we keep the message in state (associated with futureId) until either:
// 1. the future successfully completes and the message is processed. The state would be
// cleared by the AsyncMessageDecorator after a successful application.
// 2. after recovery, we clear that state by notifying the owning function that we don't know
// what happened
// with that particular async operation.
pendingAsyncOperations.add(sourceAddress, futureId, metadata);
backPressureValve.notifyAsyncOperationRegistered();
metricsRepository.getMetrics(sourceAddress.type()).asyncOperationRegistered();
dispatcherMetrics.asyncOperationRegistered();
future.whenComplete((result, throwable) -> enqueue(metadata, futureId, result, throwable));
}
/**
* Requests to stop processing any further input for that address, as long as there is an
* uncompleted async operation (owned by @address).
*
* @param address the address
*/
void blockAddress(Address address) {
backPressureValve.blockAddress(address);
metricsRepository.getMetrics(address.type()).blockedAddress();
}
private <T> void enqueue(Message message, long futureId, T result, Throwable throwable) {
AsyncMessageDecorator<T> decoratedMessage =
new AsyncMessageDecorator<>(pendingAsyncOperations, futureId, message, result, throwable);
final int size = completed.add(decoratedMessage);
if (size == 1) {
// the queue has become non empty, we need to schedule a drain operation.
operatorMailbox.execute(this::drainOnOperatorThread);
}
}
private void drainOnOperatorThread() {
Deque<Message> batchOfCompletedFutures = completed.drainAll();
Reductions reductions = this.reductions.get();
Message message;
while ((message = batchOfCompletedFutures.poll()) != null) {
Address target = message.target();
FunctionTypeMetrics functionMetrics = metricsRepository.getMetrics(target.type());
// must check whether address was blocked BEFORE notifying completion
if (backPressureValve.isAddressBlocked(target)) {
functionMetrics.unblockedAddress();
}
backPressureValve.notifyAsyncOperationCompleted(target);
functionMetrics.asyncOperationCompleted();
dispatcherMetrics.asyncOperationCompleted();
reductions.enqueue(message);
}
reductions.processEnvelopes();
}
}