[FLINK-19130] [core] Wire-in new metrics into AsyncSink
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java
index 29f1c19..393f41b 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java
@@ -53,4 +53,12 @@
* @param address the address
*/
void blockAddress(Address address);
+
+ /**
+ * Checks whether a given address was previously blocked with {@link #blockAddress(Address)}.
+ *
+ * @param address the address to check
+ * @return boolean indicating whether or not the address was blocked.
+ */
+ boolean isAddressBlocked(Address address);
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
index 1ce0b00..91b642b 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
@@ -88,6 +88,12 @@
blockedAddressSet.remove(owningAddress);
}
+ /** {@inheritDoc} */
+ @Override
+ public boolean isAddressBlocked(Address address) {
+ return blockedAddressSet.containsKey(address);
+ }
+
private boolean totalPendingAsyncOperationsAtCapacity() {
return maximumPendingAsynchronousOperations > 0
&& pendingAsynchronousOperationsCount >= maximumPendingAsynchronousOperations;
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
index 873ab34..647c968 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
@@ -27,6 +27,9 @@
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.di.Lazy;
import org.apache.flink.statefun.flink.core.message.Message;
+import org.apache.flink.statefun.flink.core.metrics.FunctionDispatcherMetrics;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetricsRepository;
import org.apache.flink.statefun.flink.core.queue.Locks;
import org.apache.flink.statefun.flink.core.queue.MpscQueue;
import org.apache.flink.statefun.sdk.Address;
@@ -36,6 +39,8 @@
private final Lazy<Reductions> reductions;
private final Executor operatorMailbox;
private final BackPressureValve backPressureValve;
+ private final FunctionTypeMetricsRepository metricsRepository;
+ private final FunctionDispatcherMetrics dispatcherMetrics;
private final MpscQueue<Message> completed = new MpscQueue<>(32768, Locks.jdkReentrantLock());
@@ -44,14 +49,18 @@
PendingAsyncOperations pendingAsyncOperations,
@Label("mailbox-executor") Executor operatorMailbox,
@Label("reductions") Lazy<Reductions> reductions,
- @Label("backpressure-valve") BackPressureValve backPressureValve) {
+ @Label("backpressure-valve") BackPressureValve backPressureValve,
+ @Label("function-metrics-repository") FunctionTypeMetricsRepository metricsRepository,
+ @Label("function-dispatcher-metrics") FunctionDispatcherMetrics dispatcherMetrics) {
this.pendingAsyncOperations = Objects.requireNonNull(pendingAsyncOperations);
this.reductions = Objects.requireNonNull(reductions);
this.operatorMailbox = Objects.requireNonNull(operatorMailbox);
this.backPressureValve = Objects.requireNonNull(backPressureValve);
+ this.metricsRepository = Objects.requireNonNull(metricsRepository);
+ this.dispatcherMetrics = Objects.requireNonNull(dispatcherMetrics);
}
- <T> void accept(Message metadata, CompletableFuture<T> future) {
+ <T> void accept(Address sourceAddress, Message metadata, CompletableFuture<T> future) {
final long futureId = ThreadLocalRandom.current().nextLong(); // TODO: is this is good enough?
// we keep the message in state (associated with futureId) until either:
// 1. the future successfully completes and the message is processed. The state would be
@@ -59,8 +68,11 @@
// 2. after recovery, we clear that state by notifying the owning function that we don't know
// what happened
// with that particular async operation.
- pendingAsyncOperations.add(metadata.source(), futureId, metadata);
+ pendingAsyncOperations.add(sourceAddress, futureId, metadata);
backPressureValve.notifyAsyncOperationRegistered();
+
+ metricsRepository.getMetrics(sourceAddress.type()).asyncOperationRegistered();
+ dispatcherMetrics.asyncOperationRegistered();
future.whenComplete((result, throwable) -> enqueue(metadata, futureId, result, throwable));
}
@@ -72,6 +84,7 @@
*/
void blockAddress(Address address) {
backPressureValve.blockAddress(address);
+ metricsRepository.getMetrics(address.type()).blockedAddress();
}
private <T> void enqueue(Message message, long futureId, T result, Throwable throwable) {
@@ -90,7 +103,17 @@
Reductions reductions = this.reductions.get();
Message message;
while ((message = batchOfCompletedFutures.poll()) != null) {
- backPressureValve.notifyAsyncOperationCompleted(message.target());
+ Address target = message.target();
+ FunctionTypeMetrics functionMetrics = metricsRepository.getMetrics(target.type());
+
+ // must check whether address was blocked BEFORE notifying completion
+ if (backPressureValve.isAddressBlocked(target)) {
+ functionMetrics.unblockedAddress();
+ }
+ backPressureValve.notifyAsyncOperationCompleted(target);
+
+ functionMetrics.asyncOperationCompleted();
+ dispatcherMetrics.asyncOperationCompleted();
reductions.enqueue(message);
}
reductions.processEnvelopes();
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
index abce51a..44686e0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
@@ -32,7 +32,10 @@
import org.apache.flink.statefun.flink.core.di.ObjectContainer;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactory;
+import org.apache.flink.statefun.flink.core.metrics.FlinkFunctionDispatcherMetrics;
import org.apache.flink.statefun.flink.core.metrics.FlinkMetricsFactory;
+import org.apache.flink.statefun.flink.core.metrics.FunctionDispatcherMetrics;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetricsRepository;
import org.apache.flink.statefun.flink.core.metrics.MetricsFactory;
import org.apache.flink.statefun.flink.core.state.FlinkState;
import org.apache.flink.statefun.flink.core.state.State;
@@ -71,6 +74,11 @@
container.add("function-providers", Map.class, statefulFunctionsUniverse.functions());
container.add(
"function-repository", FunctionRepository.class, StatefulFunctionRepository.class);
+ container.addAlias(
+ "function-metrics-repository",
+ FunctionTypeMetricsRepository.class,
+ "function-repository",
+ FunctionRepository.class);
// for FlinkState
container.add("runtime-context", RuntimeContext.class, context);
@@ -96,6 +104,10 @@
container.add(Reductions.class);
container.add(LocalFunctionGroup.class);
container.add("metrics-factory", MetricsFactory.class, new FlinkMetricsFactory(metricGroup));
+ container.add(
+ "function-dispatcher-metrics",
+ FunctionDispatcherMetrics.class,
+ new FlinkFunctionDispatcherMetrics(metricGroup));
// for delayed messages
container.add(
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
index 55c646b..62289e5 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
@@ -113,7 +113,7 @@
Objects.requireNonNull(future);
Message message = messageFactory.from(self(), self(), metadata);
- asyncSink.accept(message, future);
+ asyncSink.accept(self(), message, future);
}
@Override
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
index 46d9ed1..bdb67e3 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
@@ -58,6 +58,7 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -578,7 +579,7 @@
@Override
public Counter counter(String s) {
- throw new UnsupportedOperationException();
+ return new SimpleCounter();
}
@Override