[FLINK-19130] [core] Expose function type metrics via InternalContext
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
index a40d7c8..7ac6803 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.statefun.flink.core.backpressure;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
 import org.apache.flink.statefun.sdk.Context;
 
 @Internal
@@ -37,4 +38,11 @@
    * every async operation registered per each address.
    */
   void awaitAsyncOperationComplete();
+
+  /**
+   * Returns the metrics handle for the current invoked function's type.
+   *
+   * @return the metrics handle for the current invoked function's type.
+   */
+  FunctionTypeMetrics functionTypeMetrics();
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
index 97561dd..77db7dc 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
@@ -25,6 +25,7 @@
 import org.apache.flink.statefun.flink.core.di.Label;
 import org.apache.flink.statefun.flink.core.message.Message;
 import org.apache.flink.statefun.flink.core.message.MessageFactory;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
 import org.apache.flink.statefun.flink.core.state.State;
 import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
@@ -122,6 +123,11 @@
   }
 
   @Override
+  public FunctionTypeMetrics functionTypeMetrics() {
+    return function.metrics();
+  }
+
+  @Override
   public Address caller() {
     return in.source();
   }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index 339cd91..1199ab7 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -38,6 +38,7 @@
 import org.apache.flink.statefun.flink.core.TestUtils;
 import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
 import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.DelayedInvocation;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
@@ -250,6 +251,8 @@
 
   private static final class FakeContext implements InternalContext {
 
+    private final FunctionTypeMetrics fakeMetrics = new FakeMetrics();
+
     Address caller;
     boolean needsWaiting;
 
@@ -263,6 +266,11 @@
     }
 
     @Override
+    public FunctionTypeMetrics functionTypeMetrics() {
+      return fakeMetrics;
+    }
+
+    @Override
     public Address self() {
       return new Address(FN_TYPE, "0");
     }
@@ -288,4 +296,31 @@
     @Override
     public <M, T> void registerAsyncOperation(M metadata, CompletableFuture<T> future) {}
   }
+
+  private static final class FakeMetrics implements FunctionTypeMetrics {
+
+    @Override
+    public void asyncOperationRegistered() {}
+
+    @Override
+    public void asyncOperationCompleted() {}
+
+    @Override
+    public void incomingMessage() {}
+
+    @Override
+    public void outgoingRemoteMessage() {}
+
+    @Override
+    public void outgoingEgressMessage() {}
+
+    @Override
+    public void outgoingLocalMessage() {}
+
+    @Override
+    public void blockedAddress() {}
+
+    @Override
+    public void unblockedAddress() {}
+  }
 }