[FLINK-19130] [core] Expose function type metrics via InternalContext
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
index a40d7c8..7ac6803 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
@@ -19,6 +19,7 @@
package org.apache.flink.statefun.flink.core.backpressure;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
import org.apache.flink.statefun.sdk.Context;
@Internal
@@ -37,4 +38,11 @@
* every async operation registered per each address.
*/
void awaitAsyncOperationComplete();
+
+ /**
+ * Returns the metrics handle for the current invoked function's type.
+ *
+ * @return the metrics handle for the current invoked function's type.
+ */
+ FunctionTypeMetrics functionTypeMetrics();
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
index 97561dd..77db7dc 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
@@ -25,6 +25,7 @@
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactory;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
import org.apache.flink.statefun.flink.core.state.State;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
@@ -122,6 +123,11 @@
}
@Override
+ public FunctionTypeMetrics functionTypeMetrics() {
+ return function.metrics();
+ }
+
+ @Override
public Address caller() {
return in.source();
}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index 339cd91..1199ab7 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -38,6 +38,7 @@
import org.apache.flink.statefun.flink.core.TestUtils;
import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
+import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.DelayedInvocation;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
@@ -250,6 +251,8 @@
private static final class FakeContext implements InternalContext {
+ private final FunctionTypeMetrics fakeMetrics = new FakeMetrics();
+
Address caller;
boolean needsWaiting;
@@ -263,6 +266,11 @@
}
@Override
+ public FunctionTypeMetrics functionTypeMetrics() {
+ return fakeMetrics;
+ }
+
+ @Override
public Address self() {
return new Address(FN_TYPE, "0");
}
@@ -288,4 +296,31 @@
@Override
public <M, T> void registerAsyncOperation(M metadata, CompletableFuture<T> future) {}
}
+
+ private static final class FakeMetrics implements FunctionTypeMetrics {
+
+ @Override
+ public void asyncOperationRegistered() {}
+
+ @Override
+ public void asyncOperationCompleted() {}
+
+ @Override
+ public void incomingMessage() {}
+
+ @Override
+ public void outgoingRemoteMessage() {}
+
+ @Override
+ public void outgoingEgressMessage() {}
+
+ @Override
+ public void outgoingLocalMessage() {}
+
+ @Override
+ public void blockedAddress() {}
+
+ @Override
+ public void unblockedAddress() {}
+ }
}