| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.nullable; |
| import static org.powermock.api.mockito.PowerMockito.doAnswer; |
| import static org.powermock.api.mockito.PowerMockito.spy; |
| |
| import com.github.benmanes.caffeine.cache.AsyncLoadingCache; |
| import com.google.common.collect.Sets; |
| |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| |
| import org.apache.commons.lang3.mutable.MutableBoolean; |
| import org.apache.commons.lang3.mutable.MutableObject; |
| import org.apache.jute.Record; |
| import org.apache.pulsar.broker.PulsarService; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.lookup.LookupResult; |
| import org.apache.pulsar.broker.namespace.LookupOptions; |
| import org.apache.pulsar.broker.namespace.NamespaceService; |
| import org.apache.pulsar.broker.namespace.OwnedBundle; |
| import org.apache.pulsar.broker.namespace.OwnershipCache; |
| import org.apache.pulsar.broker.namespace.ServiceUnitZkUtils; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.api.Consumer; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.common.naming.NamespaceBundle; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.TenantInfo; |
| import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.proto.CreateRequest; |
| import org.apache.zookeeper.proto.DeleteRequest; |
| import org.apache.zookeeper.proto.ReplyHeader; |
| import org.apache.zookeeper.server.ByteBufferInputStream; |
| import org.apache.zookeeper.server.Request; |
| import org.apache.zookeeper.server.ServerCnxn; |
| import org.apache.zookeeper.server.ZooKeeperServer; |
| import org.mockito.stubbing.Answer; |
| import org.powermock.reflect.Whitebox; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| import java.util.Optional; |
| |
| @Test(groups = "broker") |
| public class TopicOwnerTest { |
| |
| private static final Logger log = LoggerFactory.getLogger(TopicOwnerTest.class); |
| |
| LocalBookkeeperEnsemble bkEnsemble; |
| protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT]; |
| protected static final int BROKER_COUNT = 5; |
| protected ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT]; |
| protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT]; |
| protected PulsarService leaderPulsar; |
| protected PulsarAdmin leaderAdmin; |
| protected String testCluster = "my-cluster"; |
| protected String testTenant = "my-tenant"; |
| protected String testNamespace = testTenant + "/my-ns"; |
| |
| @BeforeMethod |
| void setup() throws Exception { |
| log.info("---- Initializing TopicOwnerTest -----"); |
| // Start local bookkeeper ensemble |
| bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0); |
| bkEnsemble.start(); |
| |
| // start brokers |
| for (int i = 0; i < BROKER_COUNT; i++) { |
| ServiceConfiguration config = new ServiceConfiguration(); |
| config.setBrokerServicePort(Optional.of(0)); |
| config.setClusterName("my-cluster"); |
| config.setAdvertisedAddress("localhost"); |
| config.setWebServicePort(Optional.of(0)); |
| config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); |
| config.setDefaultNumberOfNamespaceBundles(1); |
| config.setLoadBalancerEnabled(false); |
| configurations[i] = config; |
| |
| pulsarServices[i] = new PulsarService(config); |
| pulsarServices[i].start(); |
| |
| // Sleep until pulsarServices[0] becomes leader, this way we can spy namespace bundle assignment easily. |
| while (i == 0 && !pulsarServices[0].getLeaderElectionService().isLeader()) { |
| Thread.sleep(10); |
| } |
| |
| pulsarAdmins[i] = PulsarAdmin.builder() |
| .serviceHttpUrl(pulsarServices[i].getWebServiceAddress()) |
| .build(); |
| } |
| leaderPulsar = pulsarServices[0]; |
| leaderAdmin = pulsarAdmins[0]; |
| Thread.sleep(1000); |
| |
| pulsarAdmins[0].clusters().createCluster(testCluster, new ClusterData(pulsarServices[0].getWebServiceAddress())); |
| TenantInfo tenantInfo = new TenantInfo(); |
| tenantInfo.setAllowedClusters(Sets.newHashSet(testCluster)); |
| pulsarAdmins[0].tenants().createTenant(testTenant, tenantInfo); |
| pulsarAdmins[0].namespaces().createNamespace(testNamespace, 16); |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| void tearDown() throws Exception { |
| for (int i = 0; i < BROKER_COUNT; i++) { |
| pulsarServices[i].close(); |
| pulsarAdmins[i].close(); |
| } |
| bkEnsemble.stop(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private MutableObject<PulsarService> spyLeaderNamespaceServiceForAuthorizedBroker() { |
| // Spy leader namespace service to inject authorized broker for namespace-bundle from leader, |
| // this is a safe operation since it is just an recommendation if namespace-bundle has no owner |
| // currently. Namespace-bundle ownership contention is an atomic operation through zookeeper. |
| NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService(); |
| NamespaceService spyLeaderNamespaceService = spy(leaderNamespaceService); |
| final MutableObject<PulsarService> leaderAuthorizedBroker = new MutableObject<>(); |
| Answer<CompletableFuture<Optional<LookupResult>>> answer = invocation -> { |
| PulsarService pulsarService = leaderAuthorizedBroker.getValue(); |
| if (pulsarService == null) { |
| return (CompletableFuture<Optional<LookupResult>>) invocation.callRealMethod(); |
| } |
| LookupResult lookupResult = new LookupResult( |
| pulsarService.getWebServiceAddress(), |
| pulsarService.getWebServiceAddressTls(), |
| pulsarService.getBrokerServiceUrl(), |
| pulsarService.getBrokerServiceUrlTls(), |
| true); |
| return CompletableFuture.completedFuture(Optional.of(lookupResult)); |
| }; |
| doAnswer(answer).when(spyLeaderNamespaceService).getBrokerServiceUrlAsync(any(TopicName.class), any(LookupOptions.class)); |
| Whitebox.setInternalState(leaderPulsar, "nsService", spyLeaderNamespaceService); |
| return leaderAuthorizedBroker; |
| } |
| |
| private CompletableFuture<Void> watchZookeeperReconnect(ZooKeeper zooKeeper) throws Exception { |
| CompletableFuture<Void> reconnectedFuture = new CompletableFuture<>(); |
| zooKeeper.exists("/", (WatchedEvent event) -> { |
| Watcher.Event.KeeperState state = event.getState(); |
| if (state == Watcher.Event.KeeperState.SyncConnected) { |
| reconnectedFuture.complete(null); |
| } |
| }); |
| return reconnectedFuture; |
| } |
| |
| @FunctionalInterface |
| interface RequestMatcher { |
| boolean match(Request request) throws Exception; |
| } |
| |
| private void spyZookeeperToDisconnectBeforePersist(ZooKeeper zooKeeper, RequestMatcher matcher) { |
| ZooKeeperServer zooKeeperServer = bkEnsemble.getZkServer(); |
| ServerCnxn zkServerConnection = bkEnsemble.getZookeeperServerConnection(zooKeeper); |
| ZooKeeperServer spyZooKeeperServer = spy(zooKeeperServer); |
| |
| // Spy zk server connection to close connection to cause connection loss after namespace-bundle |
| // deleted successfully. This mimics crash of connected zk server after committing requested operation. |
| Whitebox.setInternalState(zkServerConnection, "zkServer", spyZooKeeperServer); |
| doAnswer(invocation -> { |
| Request request = invocation.getArgument(0); |
| if (request.sessionId != zooKeeper.getSessionId()) { |
| return invocation.callRealMethod(); |
| } |
| if (!matcher.match(request)) { |
| return invocation.callRealMethod(); |
| } |
| Whitebox.setInternalState(zkServerConnection, "zkServer", zooKeeperServer); |
| bkEnsemble.disconnectZookeeper(zooKeeper); |
| return null; |
| }).when(spyZooKeeperServer).submitRequest(any(Request.class)); |
| } |
| |
| private void spyZookeeperToDisconnectAfterPersist(ZooKeeper zooKeeper, RequestMatcher matcher) { |
| ZooKeeperServer zooKeeperServer = bkEnsemble.getZkServer(); |
| ServerCnxn zkServerConnection = bkEnsemble.getZookeeperServerConnection(zooKeeper); |
| ZooKeeperServer spyZooKeeperServer = spy(zooKeeperServer); |
| |
| // Spy zk server connection to close connection to cause connection loss after namespace-bundle |
| // deleted successfully. This mimics crash of connected zk server after committing requested operation. |
| Whitebox.setInternalState(zkServerConnection, "zkServer", spyZooKeeperServer); |
| MutableBoolean disconnected = new MutableBoolean(); |
| doAnswer(invocation -> { |
| Request request = invocation.getArgument(0); |
| if (request.sessionId != zooKeeper.getSessionId()) { |
| return invocation.callRealMethod(); |
| } |
| if (!matcher.match(request)) { |
| return invocation.callRealMethod(); |
| } |
| |
| ServerCnxn spyZkServerConnection1 = spy(zkServerConnection); |
| doAnswer(responseInvocation -> { |
| synchronized (disconnected) { |
| ReplyHeader replyHeader = responseInvocation.getArgument(0); |
| if (replyHeader.getXid() == request.cxid && replyHeader.getErr() == 0) { |
| Whitebox.setInternalState(zkServerConnection, "zkServer", zooKeeperServer); |
| disconnected.setTrue(); |
| bkEnsemble.disconnectZookeeper(zooKeeper); |
| } else if (disconnected.isFalse()) { |
| return responseInvocation.callRealMethod(); |
| } |
| return null; |
| } |
| }).when(spyZkServerConnection1).sendResponse(any(ReplyHeader.class), nullable(Record.class), any(String.class)); |
| Whitebox.setInternalState(request, "cnxn", spyZkServerConnection1); |
| return invocation.callRealMethod(); |
| }).when(spyZooKeeperServer).submitRequest(any(Request.class)); |
| } |
| |
| @Test |
| public void testAcquireOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeCreated() throws Exception { |
| String topic1 = "persistent://my-tenant/my-ns/topic-1"; |
| NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService(); |
| NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1)); |
| |
| final MutableObject<PulsarService> leaderAuthorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker(); |
| |
| PulsarService pulsar1 = pulsarServices[1]; |
| final ZooKeeper zooKeeper1 = pulsar1.getZkClient(); |
| |
| final CompletableFuture<Void> reconnectedFuture = watchZookeeperReconnect(zooKeeper1); |
| |
| String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle); |
| |
| spyZookeeperToDisconnectBeforePersist(zooKeeper1, request -> { |
| if (request.type != ZooDefs.OpCode.create) { |
| return false; |
| } |
| |
| CreateRequest createRequest = new CreateRequest(); |
| ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), createRequest); |
| return createRequest.getPath().contains(namespaceBundlePath); |
| }); |
| |
| leaderAuthorizedBroker.setValue(pulsar1); |
| |
| try { |
| // Trigger ownership acquiring and zookeeper disconnecting before ownership node created. |
| // |
| // Ignore its execution result since whether it is fail or not depends on concrete implementation. |
| pulsarAdmins[1].lookups().lookupTopic(topic1); |
| } catch (Exception ex) { |
| // Ignored intentionally. |
| } |
| |
| reconnectedFuture.join(); |
| |
| // We don't known whether previous lookup was successful or not, but now all lookups should succeed. |
| Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| |
| pulsar1.getBrokerService().getTopic(topic1, true).join(); |
| |
| Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| } |
| |
| @Test |
| public void testAcquireOwnershipWithZookeeperDisconnectedAfterOwnershipNodeCreated() throws Exception { |
| String topic1 = "persistent://my-tenant/my-ns/topic-1"; |
| NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService(); |
| NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1)); |
| |
| final MutableObject<PulsarService> leaderAuthorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker(); |
| |
| PulsarService pulsar1 = pulsarServices[1]; |
| final ZooKeeper zooKeeper1 = pulsar1.getZkClient(); |
| |
| final CompletableFuture<Void> reconnectedFuture = watchZookeeperReconnect(zooKeeper1); |
| |
| String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle); |
| |
| spyZookeeperToDisconnectAfterPersist(zooKeeper1, request -> { |
| if (request.type != ZooDefs.OpCode.create) { |
| return false; |
| } |
| |
| CreateRequest createRequest = new CreateRequest(); |
| ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), createRequest); |
| return createRequest.getPath().contains(namespaceBundlePath); |
| }); |
| |
| leaderAuthorizedBroker.setValue(pulsar1); |
| |
| try { |
| // Trigger ownership acquiring and zookeeper disconnecting after ownership node created. |
| // |
| // Ignore its execution result since whether it is fail or not depends on concrete implementation. |
| pulsarAdmins[1].lookups().lookupTopic(topic1); |
| } catch (Exception ex) { |
| // Ignored intentionally. |
| } |
| |
| reconnectedFuture.join(); |
| |
| Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| |
| pulsar1.getBrokerService().getTopic(topic1, true).join(); |
| |
| Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| } |
| |
| @Test |
| public void testReestablishOwnershipAfterInvalidateCache() throws Exception { |
| String topic1 = "persistent://my-tenant/my-ns/topic-1"; |
| NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService(); |
| NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1)); |
| |
| final MutableObject<PulsarService> leaderAuthorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker(); |
| |
| PulsarService pulsar1 = pulsarServices[1]; |
| |
| leaderAuthorizedBroker.setValue(pulsar1); |
| Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| |
| OwnershipCache ownershipCache1 = pulsar1.getNamespaceService().getOwnershipCache(); |
| AsyncLoadingCache<String, OwnedBundle> ownedBundlesCache1 = Whitebox.getInternalState(ownershipCache1, "ownedBundlesCache"); |
| |
| leaderAuthorizedBroker.setValue(null); |
| |
| ownedBundlesCache1.synchronous().invalidate(ServiceUnitZkUtils.path(namespaceBundle)); |
| Assert.assertNull(ownershipCache1.getOwnedBundle(namespaceBundle)); |
| |
| // pulsar1 is still owner in zk. |
| Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| |
| // Reestablish ownership through lookup ownership. |
| Assert.assertNull(ownershipCache1.getOwnedBundle(namespaceBundle)); |
| Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle)); |
| |
| // Reestablish ownership through check ownership. |
| ownedBundlesCache1.synchronous().invalidate(ServiceUnitZkUtils.path(namespaceBundle)); |
| ownershipCache1.checkOwnership(namespaceBundle).join(); |
| Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle)); |
| |
| // Reestablish ownership through load topic. |
| ownedBundlesCache1.synchronous().invalidate(ServiceUnitZkUtils.path(namespaceBundle)); |
| pulsar1.getBrokerService().getTopic(topic1, true).join(); |
| Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle)); |
| pulsar1.getBrokerService().deleteTopic(topic1, true).join(); |
| |
| // Reestablish ownership through web. |
| ownedBundlesCache1.synchronous().invalidate(ServiceUnitZkUtils.path(namespaceBundle)); |
| pulsarAdmins[0].topics().createNonPartitionedTopic(topic1); |
| Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle)); |
| } |
| |
| @Test |
| public void testReleaseOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeDeleted() throws Exception { |
| String topic1 = "persistent://my-tenant/my-ns/topic-1"; |
| NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService(); |
| NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1)); |
| |
| final MutableObject<PulsarService> leaderAuthorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker(); |
| |
| PulsarService pulsar1 = pulsarServices[1]; |
| PulsarService pulsar2 = pulsarServices[2]; |
| |
| leaderAuthorizedBroker.setValue(pulsar1); |
| Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| |
| ZooKeeper zooKeeper1 = pulsar1.getZkClient(); |
| |
| CompletableFuture<Void> reconnectedFuture = watchZookeeperReconnect(zooKeeper1); |
| |
| String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle); |
| |
| spyZookeeperToDisconnectBeforePersist(zooKeeper1, request -> { |
| if (request.type != ZooDefs.OpCode.delete) { |
| return false; |
| } |
| DeleteRequest deleteRequest = new DeleteRequest(); |
| ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), deleteRequest); |
| return deleteRequest.getPath().contains(namespaceBundlePath); |
| }); |
| |
| try { |
| pulsarAdmins[1].namespaces().unloadNamespaceBundle(namespaceBundle.getNamespaceObject().toString(), namespaceBundle.getBundleRange()); |
| } catch (Exception ex) { |
| // Ignored since whether failing unloading when zk connection-loss is an implementation detail. |
| } |
| |
| reconnectedFuture.join(); |
| |
| leaderAuthorizedBroker.setValue(pulsar2); |
| |
| // We don't known whether previous unload was successful or not, but now all lookups should return same result. |
| final String currentBrokerServiceUrl = pulsarAdmins[0].lookups().lookupTopic(topic1); |
| Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), currentBrokerServiceUrl); |
| Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), currentBrokerServiceUrl); |
| Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), currentBrokerServiceUrl); |
| Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), currentBrokerServiceUrl); |
| |
| pulsarAdmins[0].topics().createNonPartitionedTopic(topic1); |
| } |
| |
| @Test |
| public void testReleaseOwnershipWithZookeeperDisconnectedAfterOwnershipNodeDeleted() throws Exception { |
| String topic1 = "persistent://my-tenant/my-ns/topic-1"; |
| NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService(); |
| NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1)); |
| |
| final MutableObject<PulsarService> leaderAuthorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker(); |
| |
| PulsarService pulsar1 = pulsarServices[1]; |
| PulsarService pulsar2 = pulsarServices[2]; |
| |
| leaderAuthorizedBroker.setValue(pulsar1); |
| Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl()); |
| |
| ZooKeeper zooKeeper1 = pulsar1.getZkClient(); |
| |
| CompletableFuture<Void> reconnectedFuture = watchZookeeperReconnect(zooKeeper1); |
| |
| String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle); |
| |
| spyZookeeperToDisconnectAfterPersist(zooKeeper1, request -> { |
| if (request.type != ZooDefs.OpCode.delete) { |
| return false; |
| } |
| DeleteRequest deleteRequest = new DeleteRequest(); |
| ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), deleteRequest); |
| return deleteRequest.getPath().contains(namespaceBundlePath); |
| }); |
| |
| try { |
| pulsarAdmins[1].namespaces().unloadNamespaceBundle(namespaceBundle.getNamespaceObject().toString(), namespaceBundle.getBundleRange()); |
| } catch (Exception ex) { |
| // Ignored since whether failing unloading when zk connection-loss is an implementation detail. |
| } |
| |
| reconnectedFuture.join(); |
| |
| leaderAuthorizedBroker.setValue(pulsar2); |
| |
| Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar2.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar2.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar2.getBrokerServiceUrl()); |
| |
| pulsar2.getBrokerService().getTopic(topic1, true).join(); |
| |
| Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar2.getBrokerServiceUrl()); |
| Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar2.getBrokerServiceUrl()); |
| } |
| |
| @Test |
| public void testConnectToInvalidateBundleCacheBroker() throws Exception { |
| Assert.assertEquals(pulsarAdmins[0].namespaces().getPolicies("my-tenant/my-ns").bundles.getNumBundles(), 16); |
| |
| final String topic1 = "persistent://my-tenant/my-ns/topic-1"; |
| final String topic2 = "persistent://my-tenant/my-ns/topic-2"; |
| |
| // Do topic lookup here for broker to own namespace bundles |
| String serviceUrlForTopic1 = pulsarAdmins[0].lookups().lookupTopic(topic1); |
| String serviceUrlForTopic2 = pulsarAdmins[0].lookups().lookupTopic(topic2); |
| |
| while (serviceUrlForTopic1.equals(serviceUrlForTopic2)) { |
| // Retry for bundle distribution, should make sure bundles for topic1 and topic2 are maintained in different brokers. |
| pulsarAdmins[0].namespaces().unload("my-tenant/my-ns"); |
| serviceUrlForTopic1 = pulsarAdmins[0].lookups().lookupTopic(topic1); |
| serviceUrlForTopic2 = pulsarAdmins[0].lookups().lookupTopic(topic2); |
| } |
| // All brokers will invalidate bundles cache after namespace bundle split |
| pulsarAdmins[0].namespaces().splitNamespaceBundle("my-tenant/my-ns", |
| pulsarServices[0].getNamespaceService().getBundle(TopicName.get(topic1)).getBundleRange(), |
| true, null); |
| |
| PulsarClient client = PulsarClient.builder(). |
| serviceUrl(serviceUrlForTopic1) |
| .build(); |
| |
| // Check connect to a topic which owner broker invalidate all namespace bundles cache |
| Consumer<byte[]> consumer = client.newConsumer().topic(topic2).subscriptionName("test").subscribe(); |
| Assert.assertTrue(consumer.isConnected()); |
| } |
| |
| @Test |
| public void testLookupPartitionedTopic() throws Exception { |
| final int partitions = 5; |
| final String topic = "persistent://my-tenant/my-ns/partitionedTopic"; |
| |
| pulsarAdmins[0].topics().createPartitionedTopic(topic, partitions); |
| |
| Map<String, String> allPartitionMap = pulsarAdmins[0].lookups().lookupPartitionedTopic(topic); |
| Assert.assertEquals(partitions, allPartitionMap.size()); |
| |
| Map<String, String> partitionedMap = new LinkedHashMap<>(); |
| for(int i = 0; i < partitions; i++) { |
| String partitionTopicName = topic + "-partition-" + i; |
| partitionedMap.put(partitionTopicName, pulsarAdmins[0].lookups().lookupTopic(partitionTopicName)); |
| } |
| |
| Assert.assertEquals(allPartitionMap.size(), partitionedMap.size()); |
| |
| for(Map.Entry<String, String> entry : allPartitionMap.entrySet()) { |
| Assert.assertTrue(entry.getValue().equalsIgnoreCase(partitionedMap.get(entry.getKey()))); |
| } |
| |
| } |
| |
| @Test |
| public void testListNonPersistentTopic() throws Exception { |
| final String topicName = "non-persistent://my-tenant/my-ns/my-topic"; |
| pulsarAdmins[0].topics().createPartitionedTopic(topicName, 16); |
| |
| PulsarClient client = PulsarClient.builder(). |
| serviceUrl(pulsarServices[0].getBrokerServiceUrl()) |
| .build(); |
| |
| Consumer<byte[]> consumer = client.newConsumer() |
| .topic(topicName) |
| .subscriptionName("my-sub") |
| .subscribe(); |
| |
| List<String> topics = pulsarAdmins[0].topics().getList("my-tenant/my-ns"); |
| Assert.assertEquals(topics.size(), 16); |
| for (String topic : topics) { |
| Assert.assertTrue(topic.contains("non-persistent")); |
| Assert.assertTrue(topic.contains("my-tenant/my-ns/my-topic")); |
| } |
| |
| consumer.close(); |
| client.close(); |
| } |
| } |