[FLINK-20574] [core] Block first request to mitigate excessive IncompleteInvocationContext roundtrips
For the first request, we block until response is received; for stateful
applications, especially at restore time of a restored execution where
there may be a large backlog of events and checkpointed inflight
requests, this helps mitigate excessive hoards of
IncompleteInvocationContext responses and retry attempt round-trips.
This closes #256.
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index b30029f..34a72fe 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -44,6 +44,7 @@
final URI endpointUrl = endpointSpec.urlPathTemplate().apply(functionType);
return new RequestReplyFunction(
+ functionType,
endpointSpec.maxNumBatchRequests(),
requestReplyClientFactory.createTransportClient(
endpointSpec.transportClientProperties(), endpointUrl));
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index b9fdc1a..da3726a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -30,6 +30,7 @@
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.AsyncOperationResult;
import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
@@ -44,13 +45,33 @@
import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedValue;
import org.apache.flink.types.Either;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class RequestReplyFunction implements StatefulFunction {
+ public static final Logger LOG = LoggerFactory.getLogger(RequestReplyFunction.class);
+
+ private final FunctionType functionType;
private final RequestReplyClient client;
private final int maxNumBatchRequests;
/**
+ * This flag indicates whether or not at least one request has already been sent to the remote
+ * function. It is toggled by the {@link #sendToFunction(InternalContext, ToFunction)} method upon
+ * sending the first request.
+ *
+ * <p>For the first request, we block until response is received; for stateful applications,
+ * especially at restore time of a restored execution where there may be a large backlog of events
+ * and checkpointed inflight requests, this helps mitigate excessive hoards of
+ * IncompleteInvocationContext responses and retry attempt round-trips.
+ *
+ * <p>After this flag is toggled upon sending the first request, all successive requests will be
+ * performed as usual async operations.
+ */
+ private boolean isFirstRequestSent;
+
+ /**
* A request state keeps tracks of the number of inflight & batched requests.
*
* <p>A tracking state can have one of the following values:
@@ -71,16 +92,23 @@
@Persisted private final PersistedRemoteFunctionValues managedStates;
- public RequestReplyFunction(int maxNumBatchRequests, RequestReplyClient client) {
- this(new PersistedRemoteFunctionValues(), maxNumBatchRequests, client);
+ public RequestReplyFunction(
+ FunctionType functionType, int maxNumBatchRequests, RequestReplyClient client) {
+ this(functionType, new PersistedRemoteFunctionValues(), maxNumBatchRequests, client, false);
}
@VisibleForTesting
RequestReplyFunction(
- PersistedRemoteFunctionValues states, int maxNumBatchRequests, RequestReplyClient client) {
+ FunctionType functionType,
+ PersistedRemoteFunctionValues states,
+ int maxNumBatchRequests,
+ RequestReplyClient client,
+ boolean isFirstRequestSent) {
+ this.functionType = Objects.requireNonNull(functionType);
this.managedStates = Objects.requireNonNull(states);
this.maxNumBatchRequests = maxNumBatchRequests;
this.client = Objects.requireNonNull(client);
+ this.isFirstRequestSent = isFirstRequestSent;
}
@Override
@@ -281,34 +309,62 @@
* Sends a {@link InvocationBatchRequest} to the remote function consisting out of a single
* invocation represented by {@code invocationBuilder}.
*/
- private void sendToFunction(Context context, Invocation.Builder invocationBuilder) {
+ private void sendToFunction(InternalContext context, Invocation.Builder invocationBuilder) {
InvocationBatchRequest.Builder batchBuilder = InvocationBatchRequest.newBuilder();
batchBuilder.addInvocations(invocationBuilder);
sendToFunction(context, batchBuilder);
}
/** Sends a {@link InvocationBatchRequest} to the remote function. */
- private void sendToFunction(Context context, InvocationBatchRequest.Builder batchBuilder) {
+ private void sendToFunction(
+ InternalContext context, InvocationBatchRequest.Builder batchBuilder) {
batchBuilder.setTarget(sdkAddressToPolyglotAddress(context.self()));
managedStates.attachStateValues(batchBuilder);
ToFunction toFunction = ToFunction.newBuilder().setInvocation(batchBuilder).build();
sendToFunction(context, toFunction);
}
- private void sendToFunction(Context context, ToFunction toFunction) {
+ private void sendToFunction(InternalContext context, ToFunction toFunction) {
ToFunctionRequestSummary requestSummary =
new ToFunctionRequestSummary(
context.self(),
toFunction.getSerializedSize(),
toFunction.getInvocation().getStateCount(),
toFunction.getInvocation().getInvocationsCount());
- RemoteInvocationMetrics metrics = ((InternalContext) context).functionTypeMetrics();
+ RemoteInvocationMetrics metrics = context.functionTypeMetrics();
CompletableFuture<FromFunction> responseFuture =
client.call(requestSummary, metrics, toFunction);
- context.registerAsyncOperation(toFunction, responseFuture);
+
+ if (isFirstRequestSent) {
+ context.registerAsyncOperation(toFunction, responseFuture);
+ } else {
+ LOG.info(
+ "Bootstrapping function {}. Blocking processing until first request is completed. Successive requests will be performed asynchronously.",
+ functionType);
+
+ // it is important to toggle the flag *before* handling the response. As a result of handling
+ // the first response, we may send retry requests in the case of an
+ // IncompleteInvocationContext response. For those requests, we already want to handle them as
+ // usual async operations.
+ isFirstRequestSent = true;
+ onAsyncResult(context, joinResponse(responseFuture, toFunction));
+ }
}
private boolean isMaxNumBatchRequestsExceeded(final int currentNumBatchRequests) {
return maxNumBatchRequests > 0 && currentNumBatchRequests >= maxNumBatchRequests;
}
+
+ private AsyncOperationResult<ToFunction, FromFunction> joinResponse(
+ CompletableFuture<FromFunction> responseFuture, ToFunction originalRequest) {
+ FromFunction response;
+ try {
+ response = responseFuture.join();
+ } catch (Exception e) {
+ return new AsyncOperationResult<>(
+ originalRequest, AsyncOperationResult.Status.FAILURE, null, e.getCause());
+ }
+ return new AsyncOperationResult<>(
+ originalRequest, AsyncOperationResult.Status.SUCCESS, response, null);
+ }
}
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index 5b7c536..7db1bd9 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -68,7 +68,7 @@
private final RequestReplyFunction functionUnderTest =
new RequestReplyFunction(
- testInitialRegisteredState("session", "com.foo.bar/myType"), 10, client);
+ FN_TYPE, testInitialRegisteredState("session", "com.foo.bar/myType"), 10, client, true);
@Test
public void example() {
@@ -119,7 +119,7 @@
@Test
public void reachingABatchLimitTriggersBackpressure() {
- RequestReplyFunction functionUnderTest = new RequestReplyFunction(2, client);
+ RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, client);
// send one message
functionUnderTest.invoke(context, TypedValue.getDefaultInstance());
@@ -135,7 +135,7 @@
@Test
public void returnedMessageReleaseBackpressure() {
- RequestReplyFunction functionUnderTest = new RequestReplyFunction(2, client);
+ RequestReplyFunction functionUnderTest = new RequestReplyFunction(FN_TYPE, 2, client);
// the following invocations should cause backpressure
functionUnderTest.invoke(context, TypedValue.getDefaultInstance());
@@ -295,7 +295,8 @@
functionUnderTest.invoke(context, argument);
ToFunction originalRequest = client.wasSentToFunction;
- RequestReplyFunction restoredFunction = new RequestReplyFunction(2, client);
+ RequestReplyFunction restoredFunction =
+ new RequestReplyFunction(FN_TYPE, new PersistedRemoteFunctionValues(), 2, client, true);
restoredFunction.invoke(context, unknownAsyncOperation(originalRequest));
// retry batch after a restore on an unknown async operation should start with empty state specs