[FLINK-19296][http] Pass a shutdown flag to RetryingCallback
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index 23001ee..e1ef27e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -39,6 +39,8 @@
/** lazily initialized by {code buildHttpClient} */
@Nullable private OkHttpClient sharedClient;
+ private volatile boolean shutdown;
+
public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) {
this.supportedTypes = supportedTypes;
}
@@ -84,11 +86,12 @@
} else {
url = HttpUrl.get(spec.endpoint());
}
- return new HttpRequestReplyClient(url, clientBuilder.build());
+ return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
}
@Override
public void shutdown() {
+ shutdown = true;
OkHttpUtils.closeSilently(sharedClient);
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
index d9e5cb3..efff157 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
@@ -24,6 +24,7 @@
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
import okhttp3.Call;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
@@ -43,10 +44,12 @@
private final HttpUrl url;
private final OkHttpClient client;
+ private final BooleanSupplier isShutdown;
- HttpRequestReplyClient(HttpUrl url, OkHttpClient client) {
+ HttpRequestReplyClient(HttpUrl url, OkHttpClient client, BooleanSupplier isShutdown) {
this.url = Objects.requireNonNull(url);
this.client = Objects.requireNonNull(client);
+ this.isShutdown = Objects.requireNonNull(isShutdown);
}
@Override
@@ -61,7 +64,8 @@
.build();
Call newCall = client.newCall(request);
- RetryingCallback callback = new RetryingCallback(requestSummary, metrics, newCall.timeout());
+ RetryingCallback callback =
+ new RetryingCallback(requestSummary, metrics, newCall.timeout(), isShutdown);
callback.attachToCall(newCall);
return callback.future().thenApply(HttpRequestReplyClient::parseResponse);
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
index 0a6f834..98184d5 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
@@ -22,9 +22,11 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Response;
@@ -49,15 +51,20 @@
private final BoundedExponentialBackoff backoff;
private final ToFunctionRequestSummary requestSummary;
private final RemoteInvocationMetrics metrics;
+ private final BooleanSupplier isShutdown;
private long requestStarted;
RetryingCallback(
- ToFunctionRequestSummary requestSummary, RemoteInvocationMetrics metrics, Timeout timeout) {
+ ToFunctionRequestSummary requestSummary,
+ RemoteInvocationMetrics metrics,
+ Timeout timeout,
+ BooleanSupplier isShutdown) {
this.resultFuture = new CompletableFuture<>();
this.backoff = new BoundedExponentialBackoff(INITIAL_BACKOFF_DURATION, duration(timeout));
this.requestSummary = requestSummary;
this.metrics = metrics;
+ this.isShutdown = Objects.requireNonNull(isShutdown);
}
CompletableFuture<Response> future() {