blob: 3894f7c7429088cbfdf6ab30571f93bd5b6085d4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.zookeeper.ZooKeeper;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
*/
public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
public BrokerBkEnsemblesTests() {
this(3);
}
public BrokerBkEnsemblesTests(int numberOfBookies) {
super(numberOfBookies);
}
/**
* It verifies that broker deletes cursor-ledger when broker-crashes without closing topic gracefully
*
* <pre>
* 1. Create topic : publish/consume-ack msgs to update new cursor-ledger
* 2. Verify cursor-ledger is created and ledger-znode present
* 3. Broker crashes: remove topic and managed-ledgers without closing
* 4. Recreate topic: publish/consume-ack msgs to update new cursor-ledger
* 5. Topic is recovered from old-ledger and broker deletes the old ledger
* 6. verify znode of old-ledger is deleted
* </pre>
*
* @throws Exception
*/
@Test
public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
ZooKeeper zk = bkEnsemble.getZkClient();
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
final String ns1 = "prop/usc/crash-broker";
admin.namespaces().createNamespace(ns1);
final String topic1 = "persistent://" + ns1 + "/my-topic";
// (1) create topic
// publish and ack messages so, cursor can create cursor-ledger and update metadata
Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
.subscribe();
Producer<byte[]> producer = client.newProducer().topic(topic1).create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Message<byte[]> msg = null;
for (int i = 0; i < 10; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
consumer.acknowledge(msg);
}
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
ManagedCursorImpl cursor = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
retryStrategically((test) -> cursor.getState().equals("Open"), 5, 100);
// (2) validate cursor ledger is created and znode is present
long cursorLedgerId = cursor.getCursorLedger();
String ledgerPath = "/ledgers" + StringUtils.getHybridHierarchicalLedgerPath(cursorLedgerId);
Assert.assertNotNull(zk.exists(ledgerPath, false));
// (3) remove topic and managed-ledger from broker which means topic is not closed gracefully
consumer.close();
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic1);
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) field
.get(factory);
ledgers.clear();
// (4) Recreate topic
// publish and ack messages so, cursor can create cursor-ledger and update metadata
consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe();
producer = client.newProducer().topic(topic1).create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
for (int i = 0; i < 10; i++) {
msg = consumer.receive(1, TimeUnit.SECONDS);
consumer.acknowledge(msg);
}
// (5) Broker should create new cursor-ledger and remove old cursor-ledger
topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
final ManagedCursorImpl cursor1 = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next();
retryStrategically((test) -> cursor1.getState().equals("Open"), 5, 100);
long newCursorLedgerId = cursor1.getCursorLedger();
Assert.assertNotEquals(newCursorLedgerId, -1);
Assert.assertNotEquals(cursorLedgerId, newCursorLedgerId);
// cursor node must be deleted
Assert.assertNull(zk.exists(ledgerPath, false));
producer.close();
consumer.close();
client.close();
}
/**
* It verifies broker-configuration using which broker can skip non-recoverable data-ledgers.
*
* <pre>
* 1. publish messages in 5 data-ledgers each with 20 entries under managed-ledger
* 2. delete first 4 data-ledgers
* 3. consumer will fail to consume any message as first data-ledger is non-recoverable
* 4. enable dynamic config to skip non-recoverable data-ledgers
* 5. consumer will be able to consume 20 messages from last non-deleted ledger
*
* </pre>
*
* @throws Exception
*/
@Test
public void testSkipCorruptDataLedger() throws Exception {
// Ensure intended state for autoSkipNonRecoverableData
admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
final String ns1 = "prop/usc/crash-broker";
final int totalMessages = 100;
final int totalDataLedgers = 5;
final int entriesPerLedger = totalMessages / totalDataLedgers;
try {
admin.namespaces().createNamespace(ns1);
} catch (Exception e) {
}
final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis();
// Create subscription
Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
.receiverQueueSize(5).subscribe();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
Field configField = ManagedCursorImpl.class.getDeclaredField("config");
configField.setAccessible(true);
// Create multiple data-ledger
ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor);
config.setMaxEntriesPerLedger(entriesPerLedger);
config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
// bookkeeper client
Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
bookKeeperField.setAccessible(true);
// Create multiple data-ledger
BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);
// (1) publish messages in 5 data-ledgers each with 20 entries under managed-ledger
Producer<byte[]> producer = client.newProducer().topic(topic1).create();
for (int i = 0; i < totalMessages; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
// validate: consumer is able to consume msg and close consumer after reading 1 entry
Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
consumer.close();
NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
Assert.assertEquals(ledgerInfo.size(), totalDataLedgers);
Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
// (2) delete first 4 data-ledgers
ledgerInfo.entrySet().forEach(entry -> {
if (!entry.equals(lastLedger)) {
assertEquals(entry.getValue().getEntries(), entriesPerLedger);
try {
bookKeeper.deleteLedger(entry.getKey());
} catch (Exception e) {
e.printStackTrace();
}
}
});
// clean managed-ledger and recreate topic to clean any data from the cache
producer.close();
pulsar.getBrokerService().removeTopicFromCache(topic1);
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) field
.get(factory);
ledgers.clear();
// (3) consumer will fail to consume any message as first data-ledger is non-recoverable
Message<byte[]> msg = null;
// start consuming message
consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe();
msg = consumer.receive(1, TimeUnit.SECONDS);
Assert.assertNull(msg);
consumer.close();
// (4) enable dynamic config to skip non-recoverable data-ledgers
admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "true");
retryStrategically((test) -> config.isAutoSkipNonRecoverableData(), 5, 100);
// (5) consumer will be able to consume 20 messages from last non-deleted ledger
consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe();
for (int i = 0; i < entriesPerLedger; i++) {
msg = consumer.receive();
System.out.println(i);
consumer.acknowledge(msg);
}
producer.close();
consumer.close();
client.close();
}
@Test(timeOut=20000)
public void testTopicWithWildCardChar() throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
final String ns1 = "prop/usc/topicWithSpecialChar";
try {
admin.namespaces().createNamespace(ns1);
} catch (Exception e) {
}
final String topic1 = "persistent://"+ns1+"/`~!@#$%^&*()-_+=[]://{}|\\;:'\"<>,./?-30e04524";
final String subName1 = "c1";
final byte[] content = "test".getBytes();
Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
org.apache.pulsar.client.api.Producer<byte[]> producer = client.newProducer().topic(topic1).create();
producer.send(content);
Message<byte[]> msg = consumer.receive();
Assert.assertEquals(msg.getData(), content);
consumer.close();
producer.close();
client.close();
}
@Test
public void testDeleteTopicWithMissingData() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("prop/usc");
admin.namespaces().createNamespace(namespace);
String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic");
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.build();
@Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("test").subscribe();
@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING)
.topic(topic)
.create();
producer.send("Hello");
// Stop all bookies, to get all data offline
bkEnsemble.stopBK();
// Unload the topic. Since all the bookies are down, the recovery
// will fail and the topic will not get reloaded.
admin.topics().unload(topic);
Thread.sleep(1000);
CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopicIfExists(topic);
try {
future.get();
fail("Should have thrown exception");
} catch (ExecutionException e) {
// Expected
}
// Deletion must succeed
admin.topics().delete(topic);
// Topic will not be there after
assertEquals(pulsar.getBrokerService().getTopicIfExists(topic).join(), Optional.empty());
}
}