[FLINK-19017] Implement the remote function metrics interface
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
index 113a694..d45f4d4 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
@@ -51,7 +51,7 @@
@Override
public CompletableFuture<FromFunction> call(
ToFunctionRequestSummary requestSummary, ToFunction toFunction) {
- Request request =
+ Request request =
new Request.Builder()
.url(url)
.post(RequestBody.create(MEDIA_TYPE_BINARY, toFunction.toByteArray()))
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkFunctionTypeMetrics.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkFunctionTypeMetrics.java
index 34fff35..3a90fa7 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkFunctionTypeMetrics.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkFunctionTypeMetrics.java
@@ -17,7 +17,10 @@
*/
package org.apache.flink.statefun.flink.core.metrics;
+import com.codahale.metrics.UniformReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
@@ -30,6 +33,8 @@
private final Counter blockedAddress;
private final Counter inflightAsyncOps;
private final Counter backlogMessage;
+ private final Counter remoteInvocationFailures;
+ private final Histogram remoteInvocationLatency;
FlinkFunctionTypeMetrics(MetricGroup typeGroup) {
this.incoming = metered(typeGroup, "in");
@@ -39,6 +44,8 @@
this.blockedAddress = typeGroup.counter("num-blocked-address");
this.inflightAsyncOps = typeGroup.counter("inflight-async-ops");
this.backlogMessage = typeGroup.counter("num-backlog");
+ this.remoteInvocationFailures = metered(typeGroup, "remote-invocation-failures");
+ this.remoteInvocationLatency = typeGroup.histogram("remote-invocation-latency", histogram());
}
@Override
@@ -91,9 +98,25 @@
backlogMessage.dec(count);
}
+ @Override
+ public void remoteInvocationFailures() {
+ remoteInvocationFailures.inc();
+ }
+
+ @Override
+ public void remoteInvocationLatency(long elapsed) {
+ remoteInvocationLatency.update(elapsed);
+ }
+
private static SimpleCounter metered(MetricGroup metrics, String name) {
SimpleCounter counter = metrics.counter(name, new SimpleCounter());
metrics.meter(name + "Rate", new MeterView(counter, 60));
return counter;
}
+
+ private static DropwizardHistogramWrapper histogram() {
+ com.codahale.metrics.Histogram dropwizardHistogram =
+ new com.codahale.metrics.Histogram(new UniformReservoir());
+ return new DropwizardHistogramWrapper(dropwizardHistogram);
+ }
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FunctionTypeMetrics.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FunctionTypeMetrics.java
index 9fd779a..f92aec8 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FunctionTypeMetrics.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FunctionTypeMetrics.java
@@ -17,7 +17,7 @@
*/
package org.apache.flink.statefun.flink.core.metrics;
-public interface FunctionTypeMetrics {
+public interface FunctionTypeMetrics extends RemoteInvocationMetrics {
void incomingMessage();
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index 7554b92..1d60e18 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -251,7 +251,8 @@
@Override
public CompletableFuture<FromFunction> call(
- ToFunctionRequestSummary requestSummary, ToFunction toFunction) {
+ ToFunctionRequestSummary requestSummary,
+ ToFunction toFunction) {
this.wasSentToFunction = toFunction;
try {
return CompletableFuture.completedFuture(this.fromFunction.get());
@@ -343,6 +344,12 @@
}
@Override
+ public void remoteInvocationFailures() {}
+
+ @Override
+ public void remoteInvocationLatency(long elapsed) {}
+
+ @Override
public void asyncOperationRegistered() {}
@Override