[hotfix] Recreate shared OkHttpClient after shutdown in DefaultHttpRequestReplyClientFactory
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
index 2cfadbb..5387c65 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
@@ -40,9 +40,7 @@
private static final ObjectMapper OBJ_MAPPER = StateFunObjectMapper.create();
/** lazily initialized by {@link #createTransportClient} */
- @Nullable private OkHttpClient sharedClient;
-
- private volatile boolean shutdown;
+ @Nullable private volatile OkHttpClient sharedClient;
private DefaultHttpRequestReplyClientFactory() {}
@@ -59,17 +57,18 @@
@Override
public void cleanup() {
- if (!shutdown) {
- shutdown = true;
- OkHttpUtils.closeSilently(sharedClient);
- }
+ final OkHttpClient sharedClient = this.sharedClient;
+ this.sharedClient = null;
+ OkHttpUtils.closeSilently(sharedClient);
}
private DefaultHttpRequestReplyClient createClient(
ObjectNode transportProperties, URI endpointUrl) {
try (SetContextClassLoader ignored = new SetContextClassLoader(this)) {
+ OkHttpClient sharedClient = this.sharedClient;
if (sharedClient == null) {
sharedClient = OkHttpUtils.newClient();
+ this.sharedClient = sharedClient;
}
final OkHttpClient.Builder clientBuilder = sharedClient.newBuilder();
@@ -97,10 +96,15 @@
url = HttpUrl.get(endpointUrl);
}
- return new DefaultHttpRequestReplyClient(url, clientBuilder.build(), () -> shutdown);
+ return new DefaultHttpRequestReplyClient(
+ url, clientBuilder.build(), () -> isShutdown(this.sharedClient));
}
}
+ private boolean isShutdown(OkHttpClient previousClient) {
+ return DefaultHttpRequestReplyClientFactory.this.sharedClient != previousClient;
+ }
+
private static DefaultHttpRequestReplyClientSpec parseTransportProperties(
ObjectNode transportClientProperties) {
try {