blob: 96e908190351f3b568a98efebd6b6f4058168989 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.cli;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.PulsarClusterMetadataTeardown;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.tests.integration.containers.ChaosContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.zookeeper.ZooKeeper;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
/**
 * Integration test for {@link PulsarClusterMetadataTeardown}.
 *
 * <p>The suite starts a {@link PulsarCluster} without proxies, function workers or a Presto worker,
 * publishes and consumes a few messages so that ledgers exist, stops the brokers, runs the teardown
 * tool and finally checks that no ledgers and no cluster-related ZooKeeper nodes remain.
 */
@Slf4j
public class ClusterMetadataTearDownTest {

    // Cluster under test; the random suffix keeps the cluster name unique per run.
    private final PulsarClusterSpec spec = PulsarClusterSpec.builder()
            .clusterName("ClusterMetadataTearDownTest-" + UUID.randomUUID().toString().substring(0, 8))
            .numProxies(0)
            .numFunctionWorkers(0)
            .enablePrestoWorker(false)
            .build();

    private final PulsarCluster pulsarCluster = PulsarCluster.forSpec(spec);

    // ZooKeeper handles used to inspect the local and configuration-store metadata after teardown.
    private ZooKeeper localZk;
    private ZooKeeper configStoreZk;

    // BookKeeper metadata access, used to count the ledgers that currently exist.
    private String metadataServiceUri;
    private MetadataBookieDriver driver;
    private LedgerManager ledgerManager;

    // Pulsar clients used to create tenants/namespaces/topics and to produce/consume messages.
    private PulsarClient client;
    private PulsarAdmin admin;

    /**
     * Starts the cluster and opens every handle the test needs: two ZooKeeper sessions, a BookKeeper
     * ledger manager, a Pulsar client and a Pulsar admin client.
     *
     * @throws Exception if the cluster cannot be started or any handle cannot be created
     */
    @BeforeSuite
    public void setupCluster() throws Exception {
        pulsarCluster.start();
        metadataServiceUri = "zk+null://" + pulsarCluster.getZKConnString() + "/ledgers";

        final int sessionTimeoutMs = 30000;
        localZk = PulsarClusterMetadataTeardown.initZk(pulsarCluster.getZKConnString(), sessionTimeoutMs);
        configStoreZk = PulsarClusterMetadataTeardown.initZk(pulsarCluster.getCSConnString(), sessionTimeoutMs);

        driver = MetadataDrivers.getBookieDriver(URI.create(metadataServiceUri));
        driver.initialize(new ServerConfiguration().setMetadataServiceUri(metadataServiceUri), () -> {},
                NullStatsLogger.INSTANCE);
        ledgerManager = driver.getLedgerManagerFactory().newLedgerManager();

        client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
        admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
    }

    /**
     * Releases everything opened by {@link #setupCluster()} and stops the cluster.
     *
     * <p>Closing is best-effort: every handle is null-checked because {@link #setupCluster()} may have
     * failed part-way, and the cluster is stopped in a {@code finally} block so that a failing close
     * cannot leave it running.
     */
    @AfterSuite
    public void tearDownCluster() {
        boolean interrupted = false;
        try {
            if (client != null) {
                try {
                    client.close();
                } catch (Exception e) {
                    log.warn("Failed to close Pulsar client: ", e);
                }
            }
            if (admin != null) {
                try {
                    admin.close();
                } catch (Exception e) {
                    log.warn("Failed to close Pulsar admin: ", e);
                }
            }
            if (ledgerManager != null) {
                try {
                    ledgerManager.close();
                } catch (IOException e) {
                    log.warn("Failed to close ledger manager: ", e);
                }
            }
            if (driver != null) {
                driver.close();
            }
            interrupted |= closeZk(configStoreZk);
            interrupted |= closeZk(localZk);
        } finally {
            try {
                pulsarCluster.stop();
            } finally {
                // Restore the interrupt flag only after the cluster has been stopped, so that the
                // remaining cleanup above is not cut short by a pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Closes a ZooKeeper handle if it was created.
     *
     * @param zk the handle to close, may be {@code null}
     * @return {@code true} if the calling thread was interrupted while closing
     */
    private static boolean closeZk(ZooKeeper zk) {
        if (zk == null) {
            return false;
        }
        try {
            zk.close();
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }

    /**
     * Creates topics with data and subscriptions, runs {@link PulsarClusterMetadataTeardown} and verifies
     * that the ledgers and the cluster-related ZooKeeper nodes are gone.
     *
     * @throws Exception on any admin, client or ZooKeeper failure
     */
    @Test
    public void testDeleteCluster() throws Exception {
        assertEquals(getNumOfLedgers(), 0);

        final String tenant = "my-tenant";
        final String namespace = tenant + "/my-ns";
        admin.tenants().createTenant(tenant,
                new TenantInfo(new HashSet<>(), Collections.singleton(pulsarCluster.getClusterName())));
        admin.namespaces().createNamespace(namespace);

        String[] topics = { "topic-1", "topic-2", namespace + "/topic-1" };
        String[] subscriptions = { "sub-1", "sub-2" };
        for (String topic : topics) {
            try (Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
                producer.send("msg");
            }
            for (String subscription : subscriptions) {
                try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
                        .topic(topic)
                        .subscriptionName(subscription)
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscribe()) {
                    receiveAndAcknowledge(consumer);
                }
            }
        }

        final String partitionedTopic = namespace + "/par-topic";
        admin.topics().createPartitionedTopic(partitionedTopic, 3);
        // TODO: the schema ledgers of a partitioned topic cannot be deleted completely for now,
        // so the producer and consumer below are created without a schema.
        try (Producer<byte[]> producer = client.newProducer().topic(partitionedTopic).create()) {
            producer.send("msg".getBytes());
            try (Consumer<byte[]> consumer = client.newConsumer()
                    .topic(partitionedTopic)
                    .subscriptionName("my-sub")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                receiveAndAcknowledge(consumer);
            }
        }

        // The brokers are stopped before the teardown tool is run.
        pulsarCluster.getBrokers().forEach(ChaosContainer::stop);

        assertTrue(getNumOfLedgers() > 0);
        log.info("Before delete, cluster name: {}, num of ledgers: {}", pulsarCluster.getClusterName(), getNumOfLedgers());

        String[] args = { "-zk", pulsarCluster.getZKConnString(),
                "-cs", pulsarCluster.getCSConnString(),
                "-c", pulsarCluster.getClusterName(),
                "--bookkeeper-metadata-service-uri", metadataServiceUri };
        PulsarClusterMetadataTeardown.main(args);

        // 1. Check BookKeeper for the number of ledgers
        assertEquals(getNumOfLedgers(), 0);

        // 2. Check ZooKeeper for the related nodes
        final int zkOpTimeoutMs = 10000;
        List<String> localZkNodes = ZkUtils.getChildrenInSingleNode(localZk, "/", zkOpTimeoutMs);
        for (String node : PulsarClusterMetadataTeardown.localZkNodes) {
            assertFalse(localZkNodes.contains(node));
        }
        List<String> clusterNodes = ZkUtils.getChildrenInSingleNode(configStoreZk, "/admin/clusters", zkOpTimeoutMs);
        assertFalse(clusterNodes.contains(pulsarCluster.getClusterName()));
    }

    /**
     * Receives one message within 5 seconds and acknowledges it.
     *
     * <p>A timed receive may return {@code null} when nothing arrives in time; that case is reported as
     * a test failure here instead of surfacing later as an exception from {@code acknowledge}.
     *
     * @param consumer the consumer to read from
     * @throws Exception if receiving or acknowledging fails
     */
    private static <T> void receiveAndAcknowledge(Consumer<T> consumer) throws Exception {
        Message<T> msg = consumer.receive(5, TimeUnit.SECONDS);
        if (msg == null) {
            fail("No message received from topic " + consumer.getTopic() + " within 5 seconds");
        }
        consumer.acknowledge(msg);
    }

    /**
     * Counts the ledgers currently known to the ledger manager.
     *
     * <p>The test fails if the listing does not finish within 5 seconds, if it finishes with a code
     * other than {@code BKException.Code.OK}, or if the waiting thread is interrupted.
     *
     * @return the number of ledgers visited by the listing
     */
    private long getNumOfLedgers() {
        final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
        final CountDownLatch processDone = new CountDownLatch(1);
        final AtomicLong numOfLedgers = new AtomicLong(0L);

        ledgerManager.asyncProcessLedgers((ledgerId, cb) -> {
            numOfLedgers.incrementAndGet();
            // Signal that this ledger has been processed; the listing is expected to advance to the
            // next ledger (and eventually invoke the final callback) only after this is called.
            cb.processResult(BKException.Code.OK, null, null);
        }, (rc, path, ctx) -> {
            returnCode.set(rc);
            processDone.countDown();
        }, null, BKException.Code.OK, BKException.Code.ReadException);

        try {
            if (!processDone.await(5, TimeUnit.SECONDS)) {
                fail("asyncProcessLedgers did not complete within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            fail("asyncProcessLedgers failed", e);
        }
        assertEquals(returnCode.get(), BKException.Code.OK, "asyncProcessLedgers returned an error code");
        return numOfLedgers.get();
    }
}