Wait for the async broker port listener close operations to complete at shutdown (#9308)

* Wait for closing of ports at shutdown

- add BrokerService.closeAsync and PulsarService.closeAsync
  so that shutdown can handle asynchronous closing operations

* Don't throw checked exceptions from methods returning CompletableFuture

- address review comment

* Set interrupt status when catching InterruptedException

* Catch RuntimeException instead of Exception

* Revert "Catch RuntimeException instead of Exception"

Catching RuntimeExceptions isn't sufficient since checked exceptions
can be thrown in the JVM with solutions like "sneaky throws"

This reverts commit 01b5f6f9ff574e6da2951ec471a532de3dbc67cd.

* Address review comments about exception handling and AtomicReference usage

* Fix checkstyle
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
index c6dec9a..319628d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java
@@ -59,9 +59,14 @@
 
             executor.execute(() -> {
                 try {
-                    service.close();
-                    future.complete(null);
-                } catch (PulsarServerException e) {
+                    service.closeAsync().whenComplete((result, throwable) -> {
+                        if (throwable != null) {
+                            future.completeExceptionally(throwable);
+                        } else {
+                            future.complete(result);
+                        }
+                    });
+                } catch (Exception e) {
                     future.completeExceptionally(e);
                 }
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 1255139..b32344b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -32,6 +32,7 @@
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -41,6 +42,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -238,6 +240,7 @@
 
     private final ReentrantLock mutex = new ReentrantLock();
     private final Condition isClosedCondition = mutex.newCondition();
+    private volatile CompletableFuture<Void> closeFuture;
     // key is listener name , value is pulsar address and pulsar ssl address
     private Map<String, AdvertisedListener> advertisedListeners;
 
@@ -297,16 +300,29 @@
                         .build());
     }
 
+    @Override
+    public void close() throws PulsarServerException {
+        try {
+            closeAsync().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof PulsarServerException) {
+                throw (PulsarServerException) e.getCause();
+            } else {
+                throw new PulsarServerException(e.getCause());
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
     /**
      * Close the current pulsar service. All resources are released.
      */
-    @Override
-    public void close() throws PulsarServerException {
+    public CompletableFuture<Void> closeAsync() {
         mutex.lock();
-
         try {
-            if (state == State.Closed) {
-                return;
+            if (closeFuture != null) {
+                return closeFuture;
             }
 
             // close the service in reverse order v.s. in which they are started
@@ -324,8 +340,9 @@
                 this.webSocketService.close();
             }
 
+            List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
             if (this.brokerService != null) {
-                this.brokerService.close();
+                asyncCloseFutures.add(this.brokerService.closeAsync());
                 this.brokerService = null;
             }
 
@@ -430,15 +447,21 @@
             state = State.Closed;
             isClosedCondition.signalAll();
 
+            CompletableFuture<Void> shutdownFuture =
+                    CompletableFuture.allOf(asyncCloseFutures.toArray(new CompletableFuture[0]));
+            closeFuture = shutdownFuture;
+            return shutdownFuture;
         } catch (Exception e) {
+            PulsarServerException pse;
             if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) {
-                throw new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
+                pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e));
             } else if (e.getCause() instanceof CompletionException
                     && e.getCause().getCause() instanceof MetadataStoreException) {
-                throw new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e.getCause()));
+                pse = new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e.getCause()));
             } else {
-                throw new PulsarServerException(e);
+                pse = new PulsarServerException(e);
             }
+            return FutureUtil.failedFuture(pse);
         } finally {
             mutex.unlock();
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0c556f3..e98978d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -45,6 +45,7 @@
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -55,6 +56,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -160,6 +162,7 @@
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.apache.pulsar.common.util.netty.ChannelFutures;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
@@ -640,75 +643,103 @@
         }
     }
 
-    @Override
     public void close() throws IOException {
-        log.info("Shutting down Pulsar Broker service");
-
-        if (pulsar.getConfigurationCache() != null) {
-            pulsar.getConfigurationCache().policiesCache().unregisterListener(this);
-        }
-
-        // unloads all namespaces gracefully without disrupting mutually
-        unloadNamespaceBundlesGracefully();
-
-        // close replication clients
-        replicationClients.forEach((cluster, client) -> {
-            try {
-                client.shutdown();
-            } catch (PulsarClientException e) {
-                log.warn("Error shutting down repl client for cluster {}", cluster, e);
+        try {
+            closeAsync().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+            } else {
+                throw new PulsarServerException(e.getCause());
             }
-        });
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
 
-        // close replication admins
-        clusterAdmins.forEach((cluster, admin) -> {
-            try {
-                admin.close();
-            } catch (Exception e) {
-                log.warn("Error shutting down repl admin for cluster {}", cluster, e);
+    public CompletableFuture<Void> closeAsync() {
+        try {
+            log.info("Shutting down Pulsar Broker service");
+
+            if (pulsar.getConfigurationCache() != null) {
+                pulsar.getConfigurationCache().policiesCache().unregisterListener(this);
             }
-        });
 
-        if (listenChannel != null) {
-            listenChannel.close();
-        }
+            // unloads all namespaces gracefully without disrupting mutually
+            unloadNamespaceBundlesGracefully();
 
-        if (listenChannelTls != null) {
-            listenChannelTls.close();
-        }
+            // close replication clients
+            replicationClients.forEach((cluster, client) -> {
+                try {
+                    client.shutdown();
+                } catch (PulsarClientException e) {
+                    log.warn("Error shutting down repl client for cluster {}", cluster, e);
+                }
+            });
 
-        acceptorGroup.shutdownGracefully();
-        workerGroup.shutdownGracefully();
+            // close replication admins
+            clusterAdmins.forEach((cluster, admin) -> {
+                try {
+                    admin.close();
+                } catch (Exception e) {
+                    log.warn("Error shutting down repl admin for cluster {}", cluster, e);
+                }
+            });
 
-        if (interceptor != null) {
-            interceptor.close();
-            interceptor = null;
-        }
+            List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
 
-        statsUpdater.shutdown();
-        inactivityMonitor.shutdown();
-        messageExpiryMonitor.shutdown();
-        compactionMonitor.shutdown();
-        messagePublishBufferMonitor.shutdown();
-        consumedLedgersMonitor.shutdown();
-        backlogQuotaChecker.shutdown();
-        authenticationService.close();
-        pulsarStats.close();
-        ClientCnxnAspect.removeListener(zkStatsListener);
-        ClientCnxnAspect.registerExecutor(null);
-        topicOrderedExecutor.shutdown();
-        delayedDeliveryTrackerFactory.close();
-        if (topicPublishRateLimiterMonitor != null) {
-            topicPublishRateLimiterMonitor.shutdown();
-        }
-        if (brokerPublishRateLimiterMonitor != null) {
-            brokerPublishRateLimiterMonitor.shutdown();
-        }
-        if (deduplicationSnapshotMonitor != null) {
-            deduplicationSnapshotMonitor.shutdown();
-        }
+            if (listenChannel != null) {
+                asyncCloseFutures.add(closeChannel(listenChannel));
+            }
 
-        log.info("Broker service completely shut down");
+            if (listenChannelTls != null) {
+                asyncCloseFutures.add(closeChannel(listenChannelTls));
+            }
+
+            acceptorGroup.shutdownGracefully();
+            workerGroup.shutdownGracefully();
+
+            if (interceptor != null) {
+                interceptor.close();
+                interceptor = null;
+            }
+
+            statsUpdater.shutdown();
+            inactivityMonitor.shutdown();
+            messageExpiryMonitor.shutdown();
+            compactionMonitor.shutdown();
+            messagePublishBufferMonitor.shutdown();
+            consumedLedgersMonitor.shutdown();
+            backlogQuotaChecker.shutdown();
+            authenticationService.close();
+            pulsarStats.close();
+            ClientCnxnAspect.removeListener(zkStatsListener);
+            ClientCnxnAspect.registerExecutor(null);
+            topicOrderedExecutor.shutdown();
+            delayedDeliveryTrackerFactory.close();
+            if (topicPublishRateLimiterMonitor != null) {
+                topicPublishRateLimiterMonitor.shutdown();
+            }
+            if (brokerPublishRateLimiterMonitor != null) {
+                brokerPublishRateLimiterMonitor.shutdown();
+            }
+            if (deduplicationSnapshotMonitor != null) {
+                deduplicationSnapshotMonitor.shutdown();
+            }
+
+            CompletableFuture<Void> shutdownFuture =
+                    CompletableFuture.allOf(asyncCloseFutures.toArray(new CompletableFuture[0]))
+                            .thenAccept(__ -> log.info("Broker service completely shut down"));
+            return shutdownFuture;
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e);
+        }
+    }
+
+    private CompletableFuture<Void> closeChannel(Channel channel) {
+        return ChannelFutures.toCompletableFuture(channel.close())
+                // convert to CompletableFuture<Void>
+                .thenAccept(__ -> {});
     }
 
     /**