[FLINK-19017] Implement the remote function metrics interface
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
index 113a694..d45f4d4 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
@@ -51,7 +51,7 @@
   @Override
   public CompletableFuture<FromFunction> call(
       ToFunctionRequestSummary requestSummary, ToFunction toFunction) {
-    Request request =
+  Request request =
         new Request.Builder()
             .url(url)
             .post(RequestBody.create(MEDIA_TYPE_BINARY, toFunction.toByteArray()))
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkFunctionTypeMetrics.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkFunctionTypeMetrics.java
index 34fff35..3a90fa7 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkFunctionTypeMetrics.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FlinkFunctionTypeMetrics.java
@@ -17,7 +17,10 @@
  */
 package org.apache.flink.statefun.flink.core.metrics;
 
+import com.codahale.metrics.UniformReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
@@ -30,6 +33,8 @@
   private final Counter blockedAddress;
   private final Counter inflightAsyncOps;
   private final Counter backlogMessage;
+  private final Counter remoteInvocationFailures;
+  private final Histogram remoteInvocationLatency;
 
   FlinkFunctionTypeMetrics(MetricGroup typeGroup) {
     this.incoming = metered(typeGroup, "in");
@@ -39,6 +44,8 @@
     this.blockedAddress = typeGroup.counter("num-blocked-address");
     this.inflightAsyncOps = typeGroup.counter("inflight-async-ops");
     this.backlogMessage = typeGroup.counter("num-backlog");
+    this.remoteInvocationFailures = metered(typeGroup, "remote-invocation-failures");
+    this.remoteInvocationLatency = typeGroup.histogram("remote-invocation-latency", histogram());
   }
 
   @Override
@@ -91,9 +98,25 @@
     backlogMessage.dec(count);
   }
 
+  @Override
+  public void remoteInvocationFailures() {
+    remoteInvocationFailures.inc();
+  }
+
+  @Override
+  public void remoteInvocationLatency(long elapsed) {
+    remoteInvocationLatency.update(elapsed);
+  }
+
   private static SimpleCounter metered(MetricGroup metrics, String name) {
     SimpleCounter counter = metrics.counter(name, new SimpleCounter());
     metrics.meter(name + "Rate", new MeterView(counter, 60));
     return counter;
   }
+
+  private static DropwizardHistogramWrapper histogram() {
+    com.codahale.metrics.Histogram dropwizardHistogram =
+        new com.codahale.metrics.Histogram(new UniformReservoir());
+    return new DropwizardHistogramWrapper(dropwizardHistogram);
+  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FunctionTypeMetrics.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FunctionTypeMetrics.java
index 9fd779a..f92aec8 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FunctionTypeMetrics.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FunctionTypeMetrics.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.statefun.flink.core.metrics;
 
-public interface FunctionTypeMetrics {
+public interface FunctionTypeMetrics extends RemoteInvocationMetrics {
 
   void incomingMessage();
 
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index 7554b92..1d60e18 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -251,7 +251,8 @@
 
     @Override
     public CompletableFuture<FromFunction> call(
-        ToFunctionRequestSummary requestSummary, ToFunction toFunction) {
+        ToFunctionRequestSummary requestSummary,
+        ToFunction toFunction) {
       this.wasSentToFunction = toFunction;
       try {
         return CompletableFuture.completedFuture(this.fromFunction.get());
@@ -343,6 +344,12 @@
     }
 
     @Override
+    public void remoteInvocationFailures() {}
+
+    @Override
+    public void remoteInvocationLatency(long elapsed) {}
+
+    @Override
     public void asyncOperationRegistered() {}
 
     @Override