[FLINK-19296][http] Pass a shutdown flag to RetryingCallback
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
index 23001ee..e1ef27e 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpFunctionProvider.java
@@ -39,6 +39,8 @@
   /** lazily initialized by {code buildHttpClient} */
   @Nullable private OkHttpClient sharedClient;
 
+  private volatile boolean shutdown;
+
   public HttpFunctionProvider(Map<FunctionType, HttpFunctionSpec> supportedTypes) {
     this.supportedTypes = supportedTypes;
   }
@@ -84,11 +86,12 @@
     } else {
       url = HttpUrl.get(spec.endpoint());
     }
-    return new HttpRequestReplyClient(url, clientBuilder.build());
+    return new HttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
   }
 
   @Override
   public void shutdown() {
+    shutdown = true;
     OkHttpUtils.closeSilently(sharedClient);
   }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
index d9e5cb3..efff157 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
@@ -24,6 +24,7 @@
 import java.io.InputStream;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
 import okhttp3.Call;
 import okhttp3.HttpUrl;
 import okhttp3.MediaType;
@@ -43,10 +44,12 @@
 
   private final HttpUrl url;
   private final OkHttpClient client;
+  private final BooleanSupplier isShutdown;
 
-  HttpRequestReplyClient(HttpUrl url, OkHttpClient client) {
+  HttpRequestReplyClient(HttpUrl url, OkHttpClient client, BooleanSupplier isShutdown) {
     this.url = Objects.requireNonNull(url);
     this.client = Objects.requireNonNull(client);
+    this.isShutdown = Objects.requireNonNull(isShutdown);
   }
 
   @Override
@@ -61,7 +64,8 @@
             .build();
 
     Call newCall = client.newCall(request);
-    RetryingCallback callback = new RetryingCallback(requestSummary, metrics, newCall.timeout());
+    RetryingCallback callback =
+        new RetryingCallback(requestSummary, metrics, newCall.timeout(), isShutdown);
     callback.attachToCall(newCall);
     return callback.future().thenApply(HttpRequestReplyClient::parseResponse);
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
index 0a6f834..98184d5 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/RetryingCallback.java
@@ -22,9 +22,11 @@
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
 import okhttp3.Call;
 import okhttp3.Callback;
 import okhttp3.Response;
@@ -49,15 +51,20 @@
   private final BoundedExponentialBackoff backoff;
   private final ToFunctionRequestSummary requestSummary;
   private final RemoteInvocationMetrics metrics;
+  private final BooleanSupplier isShutdown;
 
   private long requestStarted;
 
   RetryingCallback(
-      ToFunctionRequestSummary requestSummary, RemoteInvocationMetrics metrics, Timeout timeout) {
+      ToFunctionRequestSummary requestSummary,
+      RemoteInvocationMetrics metrics,
+      Timeout timeout,
+      BooleanSupplier isShutdown) {
     this.resultFuture = new CompletableFuture<>();
     this.backoff = new BoundedExponentialBackoff(INITIAL_BACKOFF_DURATION, duration(timeout));
     this.requestSummary = requestSummary;
     this.metrics = metrics;
+    this.isShutdown = Objects.requireNonNull(isShutdown);
   }
 
   CompletableFuture<Response> future() {