Delete associated ledgers before deleting cluster metadata (#8244)

### Motivation

#8169 introduced a command tool to delete a cluster's metadata from ZK. This PR intends to delete the cluster's ledgers from BK.

### Modifications

- Retrieve ledger ids from related ZK nodes
- Add an optional argument to specify BK metadata service URI, then delete these ledgers if it's specified
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 3219d3a..55729a5 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -50,6 +50,18 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-client</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
new file mode 100644
index 0000000..96e9081
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.cli;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.PulsarClusterMetadataTeardown;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.tests.integration.containers.ChaosContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.zookeeper.ZooKeeper;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+@Slf4j
+public class ClusterMetadataTearDownTest {
+
+    private final PulsarClusterSpec spec = PulsarClusterSpec.builder()
+            .clusterName("ClusterMetadataTearDownTest-" + UUID.randomUUID().toString().substring(0, 8))
+            .numProxies(0)
+            .numFunctionWorkers(0)
+            .enablePrestoWorker(false)
+            .build();
+
+    private final PulsarCluster pulsarCluster = PulsarCluster.forSpec(spec);
+
+    private ZooKeeper localZk;
+    private ZooKeeper configStoreZk;
+
+    private String metadataServiceUri;
+    private MetadataBookieDriver driver;
+    private LedgerManager ledgerManager;
+
+    private PulsarClient client;
+    private PulsarAdmin admin;
+
+    @BeforeSuite
+    public void setupCluster() throws Exception {
+        pulsarCluster.start();
+        metadataServiceUri = "zk+null://" + pulsarCluster.getZKConnString() + "/ledgers";
+
+        final int sessionTimeoutMs = 30000;
+        localZk = PulsarClusterMetadataTeardown.initZk(pulsarCluster.getZKConnString(), sessionTimeoutMs);
+        configStoreZk = PulsarClusterMetadataTeardown.initZk(pulsarCluster.getCSConnString(), sessionTimeoutMs);
+
+        driver = MetadataDrivers.getBookieDriver(URI.create(metadataServiceUri));
+        driver.initialize(new ServerConfiguration().setMetadataServiceUri(metadataServiceUri), () -> {}, NullStatsLogger.INSTANCE);
+        ledgerManager = driver.getLedgerManagerFactory().newLedgerManager();
+
+        client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+        admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+    }
+
+    @AfterSuite
+    public void tearDownCluster() {
+        try {
+            ledgerManager.close();
+        } catch (IOException e) {
+            log.warn("Failed to close ledger manager: ", e);
+        }
+        driver.close();
+        try {
+            configStoreZk.close();
+        } catch (InterruptedException ignored) {
+        }
+        try {
+            localZk.close();
+        } catch (InterruptedException ignored) {
+        }
+        pulsarCluster.stop();
+    }
+
+    @Test
+    public void testDeleteCluster() throws Exception {
+        assertEquals(getNumOfLedgers(), 0);
+        final String tenant = "my-tenant";
+        final String namespace = tenant + "/my-ns";
+
+        admin.tenants().createTenant(tenant,
+                new TenantInfo(new HashSet<>(), Collections.singleton(pulsarCluster.getClusterName())));
+        admin.namespaces().createNamespace(namespace);
+
+        String[] topics = { "topic-1", "topic-2", namespace + "/topic-1" };
+        for (String topic : topics) {
+            try (Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
+                producer.send("msg");
+            }
+            String[] subscriptions = { "sub-1", "sub-2" };
+            for (String subscription : subscriptions) {
+                try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                        .topic(topic)
+                        .subscriptionName(subscription)
+                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                        .subscribe()) {
+                    Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+                    consumer.acknowledge(msg);
+                }
+            }
+        }
+
+        final String partitionedTopic = namespace + "/par-topic";
+        admin.topics().createPartitionedTopic(partitionedTopic, 3);
+
+        // TODO: the schema ledgers of a partitioned topic cannot be deleted completely now,
+        //   so we create producers/consumers without schema here
+        try (Producer<byte[]> producer = client.newProducer().topic(partitionedTopic).create()) {
+            producer.send("msg".getBytes());
+            try (Consumer<byte[]> consumer = client.newConsumer()
+                    .topic(partitionedTopic)
+                    .subscriptionName("my-sub")
+                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscribe()) {
+                Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+                consumer.acknowledge(msg);
+            }
+        }
+
+        pulsarCluster.getBrokers().forEach(ChaosContainer::stop);
+
+        assertTrue(getNumOfLedgers() > 0);
+        log.info("Before delete, cluster name: {}, num of ledgers: {}", pulsarCluster.getClusterName(), getNumOfLedgers());
+
+        String[] args = { "-zk", pulsarCluster.getZKConnString(),
+                "-cs", pulsarCluster.getCSConnString(),
+                "-c", pulsarCluster.getClusterName(),
+                "--bookkeeper-metadata-service-uri", metadataServiceUri };
+        PulsarClusterMetadataTeardown.main(args);
+
+
+        // 1. Check Bookie for number of ledgers
+        assertEquals(getNumOfLedgers(), 0);
+
+        // 2. Check ZooKeeper for relative nodes
+        final int zkOpTimeoutMs = 10000;
+        List<String> localZkNodes = ZkUtils.getChildrenInSingleNode(localZk, "/", zkOpTimeoutMs);
+        for (String node : PulsarClusterMetadataTeardown.localZkNodes) {
+            assertFalse(localZkNodes.contains(node));
+        }
+        List<String> clusterNodes = ZkUtils.getChildrenInSingleNode(configStoreZk, "/admin/clusters", zkOpTimeoutMs);
+        assertFalse(clusterNodes.contains(pulsarCluster.getClusterName()));
+    }
+
+    private long getNumOfLedgers() {
+        final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
+        final CountDownLatch processDone = new CountDownLatch(1);
+        final AtomicLong numOfLedgers = new AtomicLong(0L);
+
+        ledgerManager.asyncProcessLedgers((ledgerId, cb) -> numOfLedgers.incrementAndGet(), (rc, path, ctx) -> {
+            returnCode.set(rc);
+            processDone.countDown();
+        }, null, BKException.Code.OK, BKException.Code.ReadException);
+
+        try {
+            processDone.await(5, TimeUnit.SECONDS); // a timeout which is long enough
+        } catch (InterruptedException e) {
+            fail("asyncProcessLedgers failed", e);
+        }
+        return numOfLedgers.get();
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index db77ae6..b916eea 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -200,6 +200,10 @@
         return zkContainer.getContainerIpAddress() + ":" + zkContainer.getMappedPort(ZK_PORT);
     }
 
+    public String getCSConnString() {
+        return csContainer.getContainerIpAddress() + ":" + csContainer.getMappedPort(CS_PORT);
+    }
+
     public Network getNetwork() {
         return network;
     }
diff --git a/tests/integration/src/test/resources/pulsar-cli.xml b/tests/integration/src/test/resources/pulsar-cli.xml
index 56906a6..a5b8605 100644
--- a/tests/integration/src/test/resources/pulsar-cli.xml
+++ b/tests/integration/src/test/resources/pulsar-cli.xml
@@ -22,6 +22,7 @@
 <suite name="Pulsar CLI Integration Tests" verbose="2" annotations="JDK">
     <test name="pulsar-cli-test-suite" preserve-order="true" >
         <classes>
+            <class name="org.apache.pulsar.tests.integration.cli.ClusterMetadataTearDownTest"/>
             <class name="org.apache.pulsar.tests.integration.cli.CLITest" />
             <class name="org.apache.pulsar.tests.integration.cli.HealthCheckTest" />
             <class name="org.apache.pulsar.tests.integration.compaction.TestCompaction" />