[FLINK-19130] [core] Rename AsyncWaiter to InternalContext
As a preparation to expose FunctionTypeMetrics to functions for internal
axcess only, AsyncWaiter is renamed to a more general-purpose name
"InternalContext" so that it makes sense to add more internal-only
context methods there.
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
similarity index 94%
rename from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java
rename to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
index ddbe247..a40d7c8 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/AsyncWaiter.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/InternalContext.java
@@ -19,9 +19,10 @@
package org.apache.flink.statefun.flink.core.backpressure;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.statefun.sdk.Context;
@Internal
-public interface AsyncWaiter {
+public interface InternalContext extends Context {
/**
* Signals the runtime to stop invoking the currently executing function with new input until at
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
index 91b642b..e6f6ade 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/backpressure/ThresholdBackPressureValve.java
@@ -43,7 +43,7 @@
/**
* a set of address that had explicitly requested to stop processing any new inputs (via {@link
- * AsyncWaiter#awaitAsyncOperationComplete()}. Note that this is a set implemented on top of a
+ * InternalContext#awaitAsyncOperationComplete()}. Note that this is a set implemented on top of a
* map, and the value (Boolean) has no meaning.
*/
private final ObjectOpenHashMap<Address, Boolean> blockedAddressSet =
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
index 62289e5..97561dd 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/ReusableContext.java
@@ -20,7 +20,7 @@
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.message.Message;
@@ -29,7 +29,7 @@
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-final class ReusableContext implements ApplyingContext, AsyncWaiter {
+final class ReusableContext implements ApplyingContext, InternalContext {
private final Partition thisPartition;
private final LocalSink localSink;
private final RemoteSink remoteSink;
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index 4defa30..bac18c4 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -26,7 +26,7 @@
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse;
@@ -111,7 +111,7 @@
// we need to signal to the runtime that we are unable to process any new input
// and we must wait for our in flight asynchronous operation to complete before
// we are able to process more input.
- ((AsyncWaiter) context).awaitAsyncOperationComplete();
+ ((InternalContext) context).awaitAsyncOperationComplete();
}
}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index bdb10a9..339cd91 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -36,7 +36,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.flink.statefun.flink.core.TestUtils;
-import org.apache.flink.statefun.flink.core.backpressure.AsyncWaiter;
+import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
import org.apache.flink.statefun.flink.core.httpfn.StateSpec;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.DelayedInvocation;
@@ -49,7 +49,6 @@
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.AsyncOperationResult.Status;
-import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.junit.Test;
@@ -249,7 +248,7 @@
}
}
- private static final class FakeContext implements Context, AsyncWaiter {
+ private static final class FakeContext implements InternalContext {
Address caller;
boolean needsWaiting;