Wait for the async broker port listener close operations to complete at shutdown (#9308)
* Wait for closing of ports at shutdown
- Add BrokerService.closeAsync and PulsarService.closeAsync
so that shutdown can wait for asynchronous close operations to complete
* Don't throw checked exceptions from methods returning CompletableFuture
- address review comment
* Set interrupt status when catching InterruptedException
* Catch RuntimeException instead of Exception
* Revert "Catch RuntimeException instead of Exception"
Catching RuntimeException isn't sufficient, since checked exceptions
can still be thrown undeclared on the JVM by techniques such as "sneaky throws".
This reverts commit 01b5f6f9ff574e6da2951ec471a532de3dbc67cd.
* Address review comments about exception handling and AtomicReference usage
* Fix checkstyle
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
index c6dec9a..319628d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
@@ -59,9 +59,14 @@
executor.execute(() -> {
try {
- service.close();
- future.complete(null);
- } catch (PulsarServerException e) {
+ service.closeAsync().whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ } else {
+ future.complete(result);
+ }
+ });
+ } catch (Exception e) {
future.completeExceptionally(e);
}
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 1255139..b32344b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -32,6 +32,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -41,6 +42,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -238,6 +240,7 @@
private final ReentrantLock mutex = new ReentrantLock();
private final Condition isClosedCondition = mutex.newCondition();
+ private volatile CompletableFuture<Void> closeFuture;
// key is listener name , value is pulsar address and pulsar ssl address
private Map<String, AdvertisedListener> advertisedListeners;
@@ -297,16 +300,29 @@
.build());
}
+ @Override
+ public void close() throws PulsarServerException {
+ try {
+ closeAsync().get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof PulsarServerException) {
+ throw (PulsarServerException) e.getCause();
+ } else {
+ throw new PulsarServerException(e.getCause());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
/**
* Close the current pulsar service. All resources are released.
*/
- @Override
- public void close() throws PulsarServerException {
+ public CompletableFuture<Void> closeAsync() {
mutex.lock();
-
try {
- if (state == State.Closed) {
- return;
+ if (closeFuture != null) {
+ return closeFuture;
}
// close the service in reverse order v.s. in which they are started
@@ -324,8 +340,9 @@
this.webSocketService.close();
}
+ List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
if (this.brokerService != null) {
- this.brokerService.close();
+ asyncCloseFutures.add(this.brokerService.closeAsync());
this.brokerService = null;
}
@@ -430,15 +447,21 @@
state = State.Closed;
isClosedCondition.signalAll();
+ CompletableFuture<Void> shutdownFuture =
+ CompletableFuture.allOf(asyncCloseFutures.toArray(new CompletableFuture[0]));
+ closeFuture = shutdownFuture;
+ return shutdownFuture;
} catch (Exception e) {
+ PulsarServerException pse;
if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
- throw new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
+ pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
} else if (e.getCause() instanceof CompletionException
&& e.getCause().getCause() instanceof MetadataStoreException) {
- throw new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e.getCause()));
+ pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e.getCause()));
} else {
- throw new PulsarServerException(e);
+ pse = new PulsarServerException(e);
}
+ return FutureUtil.failedFuture(pse);
} finally {
mutex.unlock();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0c556f3..e98978d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -45,6 +45,7 @@
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -55,6 +56,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -160,6 +162,7 @@
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.apache.pulsar.common.util.netty.ChannelFutures;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
@@ -640,75 +643,103 @@
}
}
- @Override
public void close() throws IOException {
- log.info("Shutting down Pulsar Broker service");
-
- if (pulsar.getConfigurationCache() != null) {
- pulsar.getConfigurationCache().policiesCache().unregisterListener(this);
- }
-
- // unloads all namespaces gracefully without disrupting mutually
- unloadNamespaceBundlesGracefully();
-
- // close replication clients
- replicationClients.forEach((cluster, client) -> {
- try {
- client.shutdown();
- } catch (PulsarClientException e) {
- log.warn("Error shutting down repl client for cluster {}", cluster, e);
+ try {
+ closeAsync().get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new PulsarServerException(e.getCause());
}
- });
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
- // close replication admins
- clusterAdmins.forEach((cluster, admin) -> {
- try {
- admin.close();
- } catch (Exception e) {
- log.warn("Error shutting down repl admin for cluster {}", cluster, e);
+ public CompletableFuture<Void> closeAsync() {
+ try {
+ log.info("Shutting down Pulsar Broker service");
+
+ if (pulsar.getConfigurationCache() != null) {
+ pulsar.getConfigurationCache().policiesCache().unregisterListener(this);
}
- });
- if (listenChannel != null) {
- listenChannel.close();
- }
+ // unloads all namespaces gracefully without disrupting mutually
+ unloadNamespaceBundlesGracefully();
- if (listenChannelTls != null) {
- listenChannelTls.close();
- }
+ // close replication clients
+ replicationClients.forEach((cluster, client) -> {
+ try {
+ client.shutdown();
+ } catch (PulsarClientException e) {
+ log.warn("Error shutting down repl client for cluster {}", cluster, e);
+ }
+ });
- acceptorGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
+ // close replication admins
+ clusterAdmins.forEach((cluster, admin) -> {
+ try {
+ admin.close();
+ } catch (Exception e) {
+ log.warn("Error shutting down repl admin for cluster {}", cluster, e);
+ }
+ });
- if (interceptor != null) {
- interceptor.close();
- interceptor = null;
- }
+ List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
- statsUpdater.shutdown();
- inactivityMonitor.shutdown();
- messageExpiryMonitor.shutdown();
- compactionMonitor.shutdown();
- messagePublishBufferMonitor.shutdown();
- consumedLedgersMonitor.shutdown();
- backlogQuotaChecker.shutdown();
- authenticationService.close();
- pulsarStats.close();
- ClientCnxnAspect.removeListener(zkStatsListener);
- ClientCnxnAspect.registerExecutor(null);
- topicOrderedExecutor.shutdown();
- delayedDeliveryTrackerFactory.close();
- if (topicPublishRateLimiterMonitor != null) {
- topicPublishRateLimiterMonitor.shutdown();
- }
- if (brokerPublishRateLimiterMonitor != null) {
- brokerPublishRateLimiterMonitor.shutdown();
- }
- if (deduplicationSnapshotMonitor != null) {
- deduplicationSnapshotMonitor.shutdown();
- }
+ if (listenChannel != null) {
+ asyncCloseFutures.add(closeChannel(listenChannel));
+ }
- log.info("Broker service completely shut down");
+ if (listenChannelTls != null) {
+ asyncCloseFutures.add(closeChannel(listenChannelTls));
+ }
+
+ acceptorGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+
+ if (interceptor != null) {
+ interceptor.close();
+ interceptor = null;
+ }
+
+ statsUpdater.shutdown();
+ inactivityMonitor.shutdown();
+ messageExpiryMonitor.shutdown();
+ compactionMonitor.shutdown();
+ messagePublishBufferMonitor.shutdown();
+ consumedLedgersMonitor.shutdown();
+ backlogQuotaChecker.shutdown();
+ authenticationService.close();
+ pulsarStats.close();
+ ClientCnxnAspect.removeListener(zkStatsListener);
+ ClientCnxnAspect.registerExecutor(null);
+ topicOrderedExecutor.shutdown();
+ delayedDeliveryTrackerFactory.close();
+ if (topicPublishRateLimiterMonitor != null) {
+ topicPublishRateLimiterMonitor.shutdown();
+ }
+ if (brokerPublishRateLimiterMonitor != null) {
+ brokerPublishRateLimiterMonitor.shutdown();
+ }
+ if (deduplicationSnapshotMonitor != null) {
+ deduplicationSnapshotMonitor.shutdown();
+ }
+
+ CompletableFuture<Void> shutdownFuture =
+ CompletableFuture.allOf(asyncCloseFutures.toArray(new CompletableFuture[0]))
+ .thenAccept(__ -> log.info("Broker service completely shut down"));
+ return shutdownFuture;
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(e);
+ }
+ }
+
+ private CompletableFuture<Void> closeChannel(Channel channel) {
+ return ChannelFutures.toCompletableFuture(channel.close())
+ // convert to CompletableFuture<Void>
+ .thenAccept(__ -> {});
}
/**