blob: ccdfb7aa77cb4894f7b84df335a4546cb0643bc6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.jute.Record;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class TopicOwnerTest {
private static final Logger log = LoggerFactory.getLogger(TopicOwnerTest.class);
LocalBookkeeperEnsemble bkEnsemble;
protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
protected static final int BROKER_COUNT = 5;
protected ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT];
protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
protected PulsarService leaderPulsar;
protected PulsarAdmin leaderAdmin;
protected String testCluster = "my-cluster";
protected String testTenant = "my-tenant";
protected String testNamespace = testTenant + "/my-ns";
@BeforeMethod
void setup() throws Exception {
log.info("---- Initializing TopicOwnerTest -----");
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble.start();
// start brokers
for (int i = 0; i < BROKER_COUNT; i++) {
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
config.setDefaultNumberOfNamespaceBundles(1);
config.setLoadBalancerEnabled(false);
configurations[i] = config;
pulsarServices[i] = new PulsarService(config);
pulsarServices[i].start();
// Sleep until pulsarServices[0] becomes leader, this way we can spy namespace bundle assignment easily.
while (i == 0 && !pulsarServices[0].getLeaderElectionService().isLeader()) {
Thread.sleep(10);
}
pulsarAdmins[i] = PulsarAdmin.builder()
.serviceHttpUrl(pulsarServices[i].getWebServiceAddress())
.build();
}
leaderPulsar = pulsarServices[0];
leaderAdmin = pulsarAdmins[0];
Thread.sleep(1000);
pulsarAdmins[0].clusters().createCluster(testCluster, ClusterData.builder().serviceUrl(pulsarServices[0].getWebServiceAddress()).build());
TenantInfo tenantInfo = TenantInfo.builder()
.allowedClusters(Sets.newHashSet(testCluster))
.build();
pulsarAdmins[0].tenants().createTenant(testTenant, tenantInfo);
pulsarAdmins[0].namespaces().createNamespace(testNamespace, 16);
}
@AfterMethod(alwaysRun = true)
void tearDown() throws Exception {
for (int i = 0; i < BROKER_COUNT; i++) {
pulsarServices[i].close();
pulsarAdmins[i].close();
}
bkEnsemble.stop();
}
@SuppressWarnings("unchecked")
private MutableObject<PulsarService> spyLeaderNamespaceServiceForAuthorizedBroker() {
// Spy leader namespace service to inject authorized broker for namespace-bundle from leader,
// this is a safe operation since it is just an recommendation if namespace-bundle has no owner
// currently. Namespace-bundle ownership contention is an atomic operation through zookeeper.
NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
NamespaceService spyLeaderNamespaceService = spy(leaderNamespaceService);
final MutableObject<PulsarService> leaderAuthorizedBroker = new MutableObject<>();
Answer<CompletableFuture<Optional<LookupResult>>> answer = invocation -> {
PulsarService pulsarService = leaderAuthorizedBroker.getValue();
if (pulsarService == null) {
return (CompletableFuture<Optional<LookupResult>>) invocation.callRealMethod();
}
LookupResult lookupResult = new LookupResult(
pulsarService.getWebServiceAddress(),
pulsarService.getWebServiceAddressTls(),
pulsarService.getBrokerServiceUrl(),
pulsarService.getBrokerServiceUrlTls(),
true);
return CompletableFuture.completedFuture(Optional.of(lookupResult));
};
doAnswer(answer).when(spyLeaderNamespaceService).getBrokerServiceUrlAsync(any(TopicName.class), any(LookupOptions.class));
Whitebox.setInternalState(leaderPulsar, "nsService", spyLeaderNamespaceService);
return leaderAuthorizedBroker;
}
private CompletableFuture<Void> watchMetadataStoreReconnect(MetadataStoreExtended store) {
CompletableFuture<Void> reconnectedFuture = new CompletableFuture<>();
store.registerSessionListener(event -> {
if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) {
reconnectedFuture.complete(null);
}
});
return reconnectedFuture;
}
@FunctionalInterface
interface RequestMatcher {
boolean match(Request request) throws Exception;
}
private void spyZookeeperToDisconnectBeforePersist(ZooKeeper zooKeeper, RequestMatcher matcher) {
ZooKeeperServer zooKeeperServer = bkEnsemble.getZkServer();
ServerCnxn zkServerConnection = bkEnsemble.getZookeeperServerConnection(zooKeeper);
ZooKeeperServer spyZooKeeperServer = spy(zooKeeperServer);
// Spy zk server connection to close connection to cause connection loss after namespace-bundle
// deleted successfully. This mimics crash of connected zk server after committing requested operation.
Whitebox.setInternalState(zkServerConnection, "zkServer", spyZooKeeperServer);
doAnswer(invocation -> {
Request request = invocation.getArgument(0);
if (request.sessionId != zooKeeper.getSessionId()) {
return invocation.callRealMethod();
}
if (!matcher.match(request)) {
return invocation.callRealMethod();
}
Whitebox.setInternalState(zkServerConnection, "zkServer", zooKeeperServer);
bkEnsemble.disconnectZookeeper(zooKeeper);
return null;
}).when(spyZooKeeperServer).submitRequest(any(Request.class));
}
private void spyZookeeperToDisconnectAfterPersist(ZooKeeper zooKeeper, RequestMatcher matcher) {
ZooKeeperServer zooKeeperServer = bkEnsemble.getZkServer();
ServerCnxn zkServerConnection = bkEnsemble.getZookeeperServerConnection(zooKeeper);
ZooKeeperServer spyZooKeeperServer = spy(zooKeeperServer);
// Spy zk server connection to close connection to cause connection loss after namespace-bundle
// deleted successfully. This mimics crash of connected zk server after committing requested operation.
Whitebox.setInternalState(zkServerConnection, "zkServer", spyZooKeeperServer);
MutableBoolean disconnected = new MutableBoolean();
doAnswer(invocation -> {
Request request = invocation.getArgument(0);
if (request.sessionId != zooKeeper.getSessionId()) {
return invocation.callRealMethod();
}
if (!matcher.match(request)) {
return invocation.callRealMethod();
}
ServerCnxn spyZkServerConnection1 = spy(zkServerConnection);
doAnswer(responseInvocation -> {
synchronized (disconnected) {
ReplyHeader replyHeader = responseInvocation.getArgument(0);
if (replyHeader.getXid() == request.cxid && replyHeader.getErr() == 0) {
Whitebox.setInternalState(zkServerConnection, "zkServer", zooKeeperServer);
disconnected.setTrue();
bkEnsemble.disconnectZookeeper(zooKeeper);
} else if (disconnected.isFalse()) {
return responseInvocation.callRealMethod();
}
return null;
}
}).when(spyZkServerConnection1).sendResponse(any(ReplyHeader.class), nullable(Record.class), any(String.class));
Whitebox.setInternalState(request, "cnxn", spyZkServerConnection1);
return invocation.callRealMethod();
}).when(spyZooKeeperServer).submitRequest(any(Request.class));
}
@Test
public void testReestablishOwnershipAfterInvalidateCache() throws Exception {
String topic1 = "persistent://my-tenant/my-ns/topic-1";
NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));
final MutableObject<PulsarService> leaderAuthorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker();
PulsarService pulsar1 = pulsarServices[1];
leaderAuthorizedBroker.setValue(pulsar1);
Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
OwnershipCache ownershipCache1 = pulsar1.getNamespaceService().getOwnershipCache();
AsyncLoadingCache<NamespaceBundle, OwnedBundle> ownedBundlesCache1 = Whitebox.getInternalState(ownershipCache1, "ownedBundlesCache");
leaderAuthorizedBroker.setValue(null);
Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle));
ownedBundlesCache1.synchronous().invalidate(namespaceBundle);
Assert.assertNull(ownershipCache1.getOwnedBundle(namespaceBundle));
// pulsar1 is still owner in zk.
Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
}
@Test
public void testConnectToInvalidateBundleCacheBroker() throws Exception {
Assert.assertEquals(pulsarAdmins[0].namespaces().getPolicies("my-tenant/my-ns").bundles.getNumBundles(), 16);
final String topic1 = "persistent://my-tenant/my-ns/topic-1";
final String topic2 = "persistent://my-tenant/my-ns/topic-2";
// Do topic lookup here for broker to own namespace bundles
String serviceUrlForTopic1 = pulsarAdmins[0].lookups().lookupTopic(topic1);
String serviceUrlForTopic2 = pulsarAdmins[0].lookups().lookupTopic(topic2);
while (serviceUrlForTopic1.equals(serviceUrlForTopic2)) {
// Retry for bundle distribution, should make sure bundles for topic1 and topic2 are maintained in different brokers.
pulsarAdmins[0].namespaces().unload("my-tenant/my-ns");
serviceUrlForTopic1 = pulsarAdmins[0].lookups().lookupTopic(topic1);
serviceUrlForTopic2 = pulsarAdmins[0].lookups().lookupTopic(topic2);
}
// All brokers will invalidate bundles cache after namespace bundle split
pulsarAdmins[0].namespaces().splitNamespaceBundle("my-tenant/my-ns",
pulsarServices[0].getNamespaceService().getBundle(TopicName.get(topic1)).getBundleRange(),
true, null);
@Cleanup
PulsarClient client = PulsarClient.builder().
serviceUrl(serviceUrlForTopic1)
.build();
// Check connect to a topic which owner broker invalidate all namespace bundles cache
Consumer<byte[]> consumer = client.newConsumer().topic(topic2).subscriptionName("test").subscribe();
Assert.assertTrue(consumer.isConnected());
}
@Test
public void testLookupPartitionedTopic() throws Exception {
final int partitions = 5;
final String topic = "persistent://my-tenant/my-ns/partitionedTopic";
pulsarAdmins[0].topics().createPartitionedTopic(topic, partitions);
Map<String, String> allPartitionMap = pulsarAdmins[0].lookups().lookupPartitionedTopic(topic);
Assert.assertEquals(partitions, allPartitionMap.size());
Map<String, String> partitionedMap = new LinkedHashMap<>();
for (int i = 0; i < partitions; i++) {
String partitionTopicName = topic + "-partition-" + i;
partitionedMap.put(partitionTopicName, pulsarAdmins[0].lookups().lookupTopic(partitionTopicName));
}
Assert.assertEquals(allPartitionMap.size(), partitionedMap.size());
for (Map.Entry<String, String> entry : allPartitionMap.entrySet()) {
Assert.assertTrue(entry.getValue().equalsIgnoreCase(partitionedMap.get(entry.getKey())));
}
}
@Test
public void testListNonPersistentTopic() throws Exception {
final String topicName = "non-persistent://my-tenant/my-ns/my-topic";
pulsarAdmins[0].topics().createPartitionedTopic(topicName, 16);
@Cleanup
PulsarClient client = PulsarClient.builder().
serviceUrl(pulsarServices[0].getBrokerServiceUrl())
.build();
Consumer<byte[]> consumer = client.newConsumer()
.topic(topicName)
.subscriptionName("my-sub")
.subscribe();
List<String> topics = pulsarAdmins[0].topics().getList("my-tenant/my-ns");
Assert.assertEquals(topics.size(), 16);
for (String topic : topics) {
Assert.assertTrue(topic.contains("non-persistent"));
Assert.assertTrue(topic.contains("my-tenant/my-ns/my-topic"));
}
consumer.close();
}
}