[FLINK-19130] [core] Wire-in new metrics into AsyncSink
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java
index 29f1c19..393f41b 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/BackPressureValve.java
@@ -53,4 +53,12 @@
    * @param address the address
    */
   void blockAddress(Address address);
+
+  /**
+   * Checks whether a given address was previously blocked with {@link #blockAddress(Address)}.
+   *
+   * @param address the address to check
+   * @return boolean indicating whether or not the address was blocked.
+   */
+  boolean isAddressBlocked(Address address);
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
index 1ce0b00..91b642b 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
@@ -88,6 +88,12 @@
     blockedAddressSet.remove(owningAddress);
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public boolean isAddressBlocked(Address address) {
+    return blockedAddressSet.containsKey(address);
+  }
+
   private boolean totalPendingAsyncOperationsAtCapacity() {
     return maximumPendingAsynchronousOperations > 0
         && pendingAsynchronousOperationsCount >= maximumPendingAsynchronousOperations;
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
index 873ab34..647c968 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/AsyncSink.java
@@ -27,6 +27,9 @@
 import org.apache.flink.statefun.flink.core.di.Label;
 import org.apache.flink.statefun.flink.core.di.Lazy;
 import org.apache.flink.statefun.flink.core.message.Message;
+import org.apache.flink.statefun.flink.core.metrics.FunctionDispatcherMetrics;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetricsRepository;
 import org.apache.flink.statefun.flink.core.queue.Locks;
 import org.apache.flink.statefun.flink.core.queue.MpscQueue;
 import org.apache.flink.statefun.sdk.Address;
@@ -36,6 +39,8 @@
   private final Lazy<Reductions> reductions;
   private final Executor operatorMailbox;
   private final BackPressureValve backPressureValve;
+  private final FunctionTypeMetricsRepository metricsRepository;
+  private final FunctionDispatcherMetrics dispatcherMetrics;
 
   private final MpscQueue<Message> completed = new MpscQueue<>(32768, Locks.jdkReentrantLock());
 
@@ -44,14 +49,18 @@
       PendingAsyncOperations pendingAsyncOperations,
       @Label("mailbox-executor") Executor operatorMailbox,
       @Label("reductions") Lazy<Reductions> reductions,
-      @Label("backpressure-valve") BackPressureValve backPressureValve) {
+      @Label("backpressure-valve") BackPressureValve backPressureValve,
+      @Label("function-metrics-repository") FunctionTypeMetricsRepository metricsRepository,
+      @Label("function-dispatcher-metrics") FunctionDispatcherMetrics dispatcherMetrics) {
     this.pendingAsyncOperations = Objects.requireNonNull(pendingAsyncOperations);
     this.reductions = Objects.requireNonNull(reductions);
     this.operatorMailbox = Objects.requireNonNull(operatorMailbox);
     this.backPressureValve = Objects.requireNonNull(backPressureValve);
+    this.metricsRepository = Objects.requireNonNull(metricsRepository);
+    this.dispatcherMetrics = Objects.requireNonNull(dispatcherMetrics);
   }
 
-  <T> void accept(Message metadata, CompletableFuture<T> future) {
+  <T> void accept(Address sourceAddress, Message metadata, CompletableFuture<T> future) {
     final long futureId = ThreadLocalRandom.current().nextLong(); // TODO: is this is good enough?
     // we keep the message in state (associated with futureId) until either:
     // 1. the future successfully completes and the message is processed. The state would be
@@ -59,8 +68,11 @@
     // 2. after recovery, we clear that state by notifying the owning function that we don't know
     // what happened
     // with that particular async operation.
-    pendingAsyncOperations.add(metadata.source(), futureId, metadata);
+    pendingAsyncOperations.add(sourceAddress, futureId, metadata);
     backPressureValve.notifyAsyncOperationRegistered();
+
+    metricsRepository.getMetrics(sourceAddress.type()).asyncOperationRegistered();
+    dispatcherMetrics.asyncOperationRegistered();
     future.whenComplete((result, throwable) -> enqueue(metadata, futureId, result, throwable));
   }
 
@@ -72,6 +84,7 @@
    */
   void blockAddress(Address address) {
     backPressureValve.blockAddress(address);
+    metricsRepository.getMetrics(address.type()).blockedAddress();
   }
 
   private <T> void enqueue(Message message, long futureId, T result, Throwable throwable) {
@@ -90,7 +103,17 @@
     Reductions reductions = this.reductions.get();
     Message message;
     while ((message = batchOfCompletedFutures.poll()) != null) {
-      backPressureValve.notifyAsyncOperationCompleted(message.target());
+      Address target = message.target();
+      FunctionTypeMetrics functionMetrics = metricsRepository.getMetrics(target.type());
+
+      // must check whether address was blocked BEFORE notifying completion
+      if (backPressureValve.isAddressBlocked(target)) {
+        functionMetrics.unblockedAddress();
+      }
+      backPressureValve.notifyAsyncOperationCompleted(target);
+
+      functionMetrics.asyncOperationCompleted();
+      dispatcherMetrics.asyncOperationCompleted();
       reductions.enqueue(message);
     }
     reductions.processEnvelopes();
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
index abce51a..44686e0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/Reductions.java
@@ -32,7 +32,10 @@
 import org.apache.flink.statefun.flink.core.di.ObjectContainer;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.message.MessageFactory;
+import org.apache.flink.statefun.flink.core.metrics.FlinkFunctionDispatcherMetrics;
 import org.apache.flink.statefun.flink.core.metrics.FlinkMetricsFactory;
+import org.apache.flink.statefun.flink.core.metrics.FunctionDispatcherMetrics;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetricsRepository;
 import org.apache.flink.statefun.flink.core.metrics.MetricsFactory;
 import org.apache.flink.statefun.flink.core.state.FlinkState;
 import org.apache.flink.statefun.flink.core.state.State;
@@ -71,6 +74,11 @@
     container.add("function-providers", Map.class, statefulFunctionsUniverse.functions());
     container.add(
         "function-repository", FunctionRepository.class, StatefulFunctionRepository.class);
+    container.addAlias(
+        "function-metrics-repository",
+        FunctionTypeMetricsRepository.class,
+        "function-repository",
+        FunctionRepository.class);
 
     // for FlinkState
     container.add("runtime-context", RuntimeContext.class, context);
@@ -96,6 +104,10 @@
     container.add(Reductions.class);
     container.add(LocalFunctionGroup.class);
     container.add("metrics-factory", MetricsFactory.class, new FlinkMetricsFactory(metricGroup));
+    container.add(
+        "function-dispatcher-metrics",
+        FunctionDispatcherMetrics.class,
+        new FlinkFunctionDispatcherMetrics(metricGroup));
 
     // for delayed messages
     container.add(
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
index 55c646b..62289e5 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
@@ -113,7 +113,7 @@
     Objects.requireNonNull(future);
 
     Message message = messageFactory.from(self(), self(), metadata);
-    asyncSink.accept(message, future);
+    asyncSink.accept(self(), message, future);
   }
 
   @Override
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
index 46d9ed1..bdb67e3 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/functions/ReductionsTest.java
@@ -58,6 +58,7 @@
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -578,7 +579,7 @@
 
     @Override
     public Counter counter(String s) {
-      throw new UnsupportedOperationException();
+      return new SimpleCounter();
     }
 
     @Override