| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.tests.integration.cli; |
| |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.client.BKException; |
| import org.apache.bookkeeper.conf.ServerConfiguration; |
| import org.apache.bookkeeper.meta.LedgerManager; |
| import org.apache.bookkeeper.meta.MetadataBookieDriver; |
| import org.apache.bookkeeper.meta.MetadataDrivers; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.util.ZkUtils; |
| import org.apache.pulsar.PulsarClusterMetadataTeardown; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.api.Consumer; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.Schema; |
| import org.apache.pulsar.client.api.SubscriptionInitialPosition; |
| import org.apache.pulsar.common.policies.data.TenantInfo; |
| import org.apache.pulsar.tests.integration.containers.ChaosContainer; |
| import org.apache.pulsar.tests.integration.topologies.PulsarCluster; |
| import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.testng.annotations.AfterSuite; |
| import org.testng.annotations.BeforeSuite; |
| import org.testng.annotations.Test; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertFalse; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| |
| @Slf4j |
| public class ClusterMetadataTearDownTest { |
| |
| private final PulsarClusterSpec spec = PulsarClusterSpec.builder() |
| .clusterName("ClusterMetadataTearDownTest-" + UUID.randomUUID().toString().substring(0, 8)) |
| .numProxies(0) |
| .numFunctionWorkers(0) |
| .enablePrestoWorker(false) |
| .build(); |
| |
| private final PulsarCluster pulsarCluster = PulsarCluster.forSpec(spec); |
| |
| private ZooKeeper localZk; |
| private ZooKeeper configStoreZk; |
| |
| private String metadataServiceUri; |
| private MetadataBookieDriver driver; |
| private LedgerManager ledgerManager; |
| |
| private PulsarClient client; |
| private PulsarAdmin admin; |
| |
| @BeforeSuite |
| public void setupCluster() throws Exception { |
| pulsarCluster.start(); |
| metadataServiceUri = "zk+null://" + pulsarCluster.getZKConnString() + "/ledgers"; |
| |
| final int sessionTimeoutMs = 30000; |
| localZk = PulsarClusterMetadataTeardown.initZk(pulsarCluster.getZKConnString(), sessionTimeoutMs); |
| configStoreZk = PulsarClusterMetadataTeardown.initZk(pulsarCluster.getCSConnString(), sessionTimeoutMs); |
| |
| driver = MetadataDrivers.getBookieDriver(URI.create(metadataServiceUri)); |
| driver.initialize(new ServerConfiguration().setMetadataServiceUri(metadataServiceUri), () -> {}, NullStatsLogger.INSTANCE); |
| ledgerManager = driver.getLedgerManagerFactory().newLedgerManager(); |
| |
| client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build(); |
| admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build(); |
| } |
| |
| @AfterSuite |
| public void tearDownCluster() { |
| try { |
| ledgerManager.close(); |
| } catch (IOException e) { |
| log.warn("Failed to close ledger manager: ", e); |
| } |
| driver.close(); |
| try { |
| configStoreZk.close(); |
| } catch (InterruptedException ignored) { |
| } |
| try { |
| localZk.close(); |
| } catch (InterruptedException ignored) { |
| } |
| pulsarCluster.stop(); |
| } |
| |
| @Test |
| public void testDeleteCluster() throws Exception { |
| assertEquals(getNumOfLedgers(), 0); |
| final String tenant = "my-tenant"; |
| final String namespace = tenant + "/my-ns"; |
| |
| admin.tenants().createTenant(tenant, |
| new TenantInfo(new HashSet<>(), Collections.singleton(pulsarCluster.getClusterName()))); |
| admin.namespaces().createNamespace(namespace); |
| |
| String[] topics = { "topic-1", "topic-2", namespace + "/topic-1" }; |
| for (String topic : topics) { |
| try (Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) { |
| producer.send("msg"); |
| } |
| String[] subscriptions = { "sub-1", "sub-2" }; |
| for (String subscription : subscriptions) { |
| try (Consumer<String> consumer = client.newConsumer(Schema.STRING) |
| .topic(topic) |
| .subscriptionName(subscription) |
| .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) |
| .subscribe()) { |
| Message<String> msg = consumer.receive(5, TimeUnit.SECONDS); |
| consumer.acknowledge(msg); |
| } |
| } |
| } |
| |
| final String partitionedTopic = namespace + "/par-topic"; |
| admin.topics().createPartitionedTopic(partitionedTopic, 3); |
| |
| // TODO: the schema ledgers of a partitioned topic cannot be deleted completely now, |
| // so we create producers/consumers without schema here |
| try (Producer<byte[]> producer = client.newProducer().topic(partitionedTopic).create()) { |
| producer.send("msg".getBytes()); |
| try (Consumer<byte[]> consumer = client.newConsumer() |
| .topic(partitionedTopic) |
| .subscriptionName("my-sub") |
| .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) |
| .subscribe()) { |
| Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS); |
| consumer.acknowledge(msg); |
| } |
| } |
| |
| pulsarCluster.getBrokers().forEach(ChaosContainer::stop); |
| |
| assertTrue(getNumOfLedgers() > 0); |
| log.info("Before delete, cluster name: {}, num of ledgers: {}", pulsarCluster.getClusterName(), getNumOfLedgers()); |
| |
| String[] args = { "-zk", pulsarCluster.getZKConnString(), |
| "-cs", pulsarCluster.getCSConnString(), |
| "-c", pulsarCluster.getClusterName(), |
| "--bookkeeper-metadata-service-uri", metadataServiceUri }; |
| PulsarClusterMetadataTeardown.main(args); |
| |
| |
| // 1. Check Bookie for number of ledgers |
| assertEquals(getNumOfLedgers(), 0); |
| |
| // 2. Check ZooKeeper for relative nodes |
| final int zkOpTimeoutMs = 10000; |
| List<String> localZkNodes = ZkUtils.getChildrenInSingleNode(localZk, "/", zkOpTimeoutMs); |
| for (String node : PulsarClusterMetadataTeardown.localZkNodes) { |
| assertFalse(localZkNodes.contains(node)); |
| } |
| List<String> clusterNodes = ZkUtils.getChildrenInSingleNode(configStoreZk, "/admin/clusters", zkOpTimeoutMs); |
| assertFalse(clusterNodes.contains(pulsarCluster.getClusterName())); |
| } |
| |
| private long getNumOfLedgers() { |
| final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK); |
| final CountDownLatch processDone = new CountDownLatch(1); |
| final AtomicLong numOfLedgers = new AtomicLong(0L); |
| |
| ledgerManager.asyncProcessLedgers((ledgerId, cb) -> numOfLedgers.incrementAndGet(), (rc, path, ctx) -> { |
| returnCode.set(rc); |
| processDone.countDown(); |
| }, null, BKException.Code.OK, BKException.Code.ReadException); |
| |
| try { |
| processDone.await(5, TimeUnit.SECONDS); // a timeout which is long enough |
| } catch (InterruptedException e) { |
| fail("asyncProcessLedgers failed", e); |
| } |
| return numOfLedgers.get(); |
| } |
| |
| } |