[FLINK-19130] [core] Rename AsyncWaiter to InternalContext

As a preparation to expose FunctionTypeMetrics to functions for internal
axcess only, AsyncWaiter is renamed to a more general-purpose name
"InternalContext" so that it makes sense to add more internal-only
context methods there.
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
similarity index 94%
rename from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java
rename to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
index ddbe247..a40d7c8 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
@@ -19,9 +19,10 @@
 package org.apache.flink.statefun.flink.core.backpressure;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.statefun.sdk.Context;
 
 @Internal
-public interface AsyncWaiter {
+public interface InternalContext extends Context {
 
   /**
    * Signals the runtime to stop invoking the currently executing function with new input until at
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
index 91b642b..e6f6ade 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
@@ -43,7 +43,7 @@
 
   /**
    * a set of address that had explicitly requested to stop processing any new inputs (via {@link
-   * AsyncWaiter#awaitAsyncOperationComplete()}. Note that this is a set implemented on top of a
+   * InternalContext#awaitAsyncOperationComplete()}. Note that this is a set implemented on top of a
    * map, and the value (Boolean) has no meaning.
    */
   private final ObjectOpenHashMap<Address, Boolean> blockedAddressSet =
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
index 62289e5..97561dd 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
@@ -20,7 +20,7 @@
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
 import org.apache.flink.statefun.flink.core.di.Inject;
 import org.apache.flink.statefun.flink.core.di.Label;
 import org.apache.flink.statefun.flink.core.message.Message;
@@ -29,7 +29,7 @@
 import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 
-final class ReusableContext implements ApplyingContext, AsyncWaiter {
+final class ReusableContext implements ApplyingContext, InternalContext {
   private final Partition thisPartition;
   private final LocalSink localSink;
   private final RemoteSink remoteSink;
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index 4defa30..bac18c4 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -26,7 +26,7 @@
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse;
@@ -111,7 +111,7 @@
       // we need to signal to the runtime that we are unable to process any new input
       // and we must wait for our in flight asynchronous operation to complete before
       // we are able to process more input.
-      ((AsyncWaiter) context).awaitAsyncOperationComplete();
+      ((InternalContext) context).awaitAsyncOperationComplete();
     }
   }
 
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index bdb10a9..339cd91 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -36,7 +36,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import org.apache.flink.statefun.flink.core.TestUtils;
-import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
 import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
 import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.DelayedInvocation;
@@ -49,7 +49,6 @@
 import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.AsyncOperationResult;
 import org.apache.flink.statefun.sdk.AsyncOperationResult.Status;
-import org.apache.flink.statefun.sdk.Context;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 import org.junit.Test;
@@ -249,7 +248,7 @@
     }
   }
 
-  private static final class FakeContext implements Context, AsyncWaiter {
+  private static final class FakeContext implements InternalContext {
 
     Address caller;
     boolean needsWaiting;