[FLINK-20574] [core] Block first request to mitigate excessive IncompleteInvocationContext roundtrips

For the first request, we block until response is received; for stateful
applications, especially at restore time of a restored execution where
there may be a large backlog of events and checkpointed inflight
requests, this helps mitigate excessive hoards of
IncompleteInvocationContext responses and retry attempt round-trips.

This closes #256.
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index b30029f..34a72fe 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -44,6 +44,7 @@
     final URI endpointUrl = endpointSpec.urlPathTemplate().apply(functionType);
 
     return new RequestReplyFunction(
+        functionType,
         endpointSpec.maxNumBatchRequests(),
         requestReplyClientFactory.createTransportClient(
             endpointSpec.transportClientProperties(), endpointUrl));
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index b9fdc1a..da3726a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -30,6 +30,7 @@
 import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.AsyncOperationResult;
 import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
@@ -44,13 +45,33 @@
 import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 import org.apache.flink.types.Either;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class RequestReplyFunction implements StatefulFunction {
 
+  public static final Logger LOG = LoggerFactory.getLogger(RequestReplyFunction.class);
+
+  private final FunctionType functionType;
   private final RequestReplyClient client;
   private final int maxNumBatchRequests;
 
   /**
+   * This flag indicates whether or not at least one request has already been sent to the remote
+   * function. It is toggled by the {@link #sendToFunction(InternalContext, ToFunction)} method upon
+   * sending the first request.
+   *
+   * <p>For the first request, we block until response is received; for stateful applications,
+   * especially at restore time of a restored execution where there may be a large backlog of events
+   * and checkpointed inflight requests, this helps mitigate excessive hoards of
+   * IncompleteInvocationContext responses and retry attempt round-trips.
+   *
+   * <p>After this flag is toggled upon sending the first request, all successive requests will be
+   * performed as usual async operations.
+   */
+  private boolean isFirstRequestSent;
+
+  /**
    * A request state keeps tracks of the number of inflight & batched requests.
    *
    * <p>A tracking state can have one of the following values:
@@ -71,16 +92,23 @@
 
   @Persisted private final PersistedRemoteFunctionValues managedStates;
 
-  public RequestReplyFunction(int maxNumBatchRequests, RequestReplyClient client) {
-    this(new PersistedRemoteFunctionValues(), maxNumBatchRequests, client);
+  public RequestReplyFunction(
+      FunctionType functionType, int maxNumBatchRequests, RequestReplyClient client) {
+    this(functionType, new PersistedRemoteFunctionValues(), maxNumBatchRequests, client, false);
   }
 
   @VisibleForTesting
   RequestReplyFunction(
-      PersistedRemoteFunctionValues states, int maxNumBatchRequests, RequestReplyClient client) {
+      FunctionType functionType,
+      PersistedRemoteFunctionValues states,
+      int maxNumBatchRequests,
+      RequestReplyClient client,
+      boolean isFirstRequestSent) {
+    this.functionType = Objects.requireNonNull(functionType);
     this.managedStates = Objects.requireNonNull(states);
     this.maxNumBatchRequests = maxNumBatchRequests;
     this.client = Objects.requireNonNull(client);
+    this.isFirstRequestSent = isFirstRequestSent;
   }
 
   @Override
@@ -281,34 +309,62 @@
    * Sends a {@link InvocationBatchRequest} to the remote function consisting out of a single
    * invocation represented by {@code invocationBuilder}.
    */
-  private void sendToFunction(Context context, Invocation.Builder invocationBuilder) {
+  private void sendToFunction(InternalContext context, Invocation.Builder invocationBuilder) {
     InvocationBatchRequest.Builder batchBuilder = InvocationBatchRequest.newBuilder();
     batchBuilder.addInvocations(invocationBuilder);
     sendToFunction(context, batchBuilder);
   }
 
   /** Sends a {@link InvocationBatchRequest} to the remote function. */
-  private void sendToFunction(Context context, InvocationBatchRequest.Builder batchBuilder) {
+  private void sendToFunction(
+      InternalContext context, InvocationBatchRequest.Builder batchBuilder) {
     batchBuilder.setTarget(sdkAddressToPolyglotAddress(context.self()));
     managedStates.attachStateValues(batchBuilder);
     ToFunction toFunction = ToFunction.newBuilder().setInvocation(batchBuilder).build();
     sendToFunction(context, toFunction);
   }
 
-  private void sendToFunction(Context context, ToFunction toFunction) {
+  private void sendToFunction(InternalContext context, ToFunction toFunction) {
     ToFunctionRequestSummary requestSummary =
         new ToFunctionRequestSummary(
             context.self(),
             toFunction.getSerializedSize(),
             toFunction.getInvocation().getStateCount(),
             toFunction.getInvocation().getInvocationsCount());
-    RemoteInvocationMetrics metrics = ((InternalContext) context).functionTypeMetrics();
+    RemoteInvocationMetrics metrics = context.functionTypeMetrics();
     CompletableFuture<FromFunction> responseFuture =
         client.call(requestSummary, metrics, toFunction);
-    context.registerAsyncOperation(toFunction, responseFuture);
+
+    if (isFirstRequestSent) {
+      context.registerAsyncOperation(toFunction, responseFuture);
+    } else {
+      LOG.info(
+          "Bootstrapping function {}. Blocking processing until first request is completed. Successive requests will be performed asynchronously.",
+          functionType);
+
+      // it is important to toggle the flag *before* handling the response. As a result of handling
+      // the first response, we may send retry requests in the case of an
+      // IncompleteInvocationContext response. For those requests, we already want to handle them as
+      // usual async operations.
+      isFirstRequestSent = true;
+      onAsyncResult(context, joinResponse(responseFuture, toFunction));
+    }
   }
 
   private boolean isMaxNumBatchRequestsExceeded(final int currentNumBatchRequests) {
     return maxNumBatchRequests > 0 && currentNumBatchRequests >= maxNumBatchRequests;
   }
+
+  private AsyncOperationResult<ToFunction, FromFunction> joinResponse(
+      CompletableFuture<FromFunction> responseFuture, ToFunction originalRequest) {
+    FromFunction response;
+    try {
+      response = responseFuture.join();
+    } catch (Exception e) {
+      return new AsyncOperationResult<>(
+          originalRequest, AsyncOperationResult.Status.FAILURE, null, e.getCause());
+    }
+    return new AsyncOperationResult<>(
+        originalRequest, AsyncOperationResult.Status.SUCCESS, response, null);
+  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index 5b7c536..7db1bd9 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -68,7 +68,7 @@
 
   private final RequestReplyFunction functionUnderTest =
       new RequestReplyFunction(
-          testInitialRegisteredState("session", "com.foo.bar/myType"), 10, client);
+          FN_TYPE, testInitialRegisteredState("session", "com.foo.bar/myType"), 10, client, true);
 
   @Test
   public void example() {
@@ -119,7 +119,7 @@
 
   @Test
   public void reachingABatchLimitTriggersBackpressure() {
-    RequestReplyFunction functionUnderTest = new RequestReplyFunction(2, client);
+    RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, client);
 
     // send one message
     functionUnderTest.invoke(context, TypedValue.getDefaultInstance());
@@ -135,7 +135,7 @@
 
   @Test
   public void returnedMessageReleaseBackpressure() {
-    RequestReplyFunction functionUnderTest = new RequestReplyFunction(2, client);
+    RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, client);
 
     // the following invocations should cause backpressure
     functionUnderTest.invoke(context, TypedValue.getDefaultInstance());
@@ -295,7 +295,8 @@
     functionUnderTest.invoke(context, argument);
     ToFunction originalRequest = client.wasSentToFunction;
 
-    RequestReplyFunction restoredFunction = new RequestReplyFunction(2, client);
+    RequestReplyFunction restoredFunction =
+        new RequestReplyFunction(FN_TYPE, new PersistedRemoteFunctionValues(), 2, client, true);
     restoredFunction.invoke(context, unknownAsyncOperation(originalRequest));
 
     // retry batch after a restore on an unknown async operation should start with empty state specs