/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.namespace;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.awaitility.Awaitility;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
@Test(groups = "broker")
public class NamespaceServiceTest extends BrokerTestBase {
/**
 * Per-test setup hook (runs before each test method via {@code @BeforeMethod}); delegates
 * entirely to the base class's {@code baseSetup()}.
 *
 * @throws Exception if the base setup fails
 */
@BeforeMethod
@Override
protected void setup() throws Exception {
super.baseSetup();
}
/**
 * Per-test teardown hook; {@code alwaysRun = true} makes it run even when the test method
 * failed. Delegates entirely to the base class's {@code internalCleanup()}.
 *
 * @throws Exception if the base cleanup fails
 */
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
/**
 * Splits the bundle that contains {@code persistent://pulsar/global/ns1/topic-1} and checks
 * the aftermath: the bundle factory serves the expected child bundles, the local policies read
 * back from metadata describe the same bundle set, and the ownership record of every bundle
 * carries this broker's service URL.
 *
 * @throws Exception if the reflective field injection or a metadata read fails
 */
@Test
public void testSplitAndOwnBundles() throws Exception {
    // Replace the service's ownership cache with a spy whose disableOwnership() returns an
    // already-completed future.
    OwnershipCache spiedOwnershipCache = spy(pulsar.getNamespaceService().getOwnershipCache());
    doReturn(CompletableFuture.completedFuture(null)).when(spiedOwnershipCache)
            .disableOwnership(any(NamespaceBundle.class));
    Field ownership = NamespaceService.class.getDeclaredField("ownershipCache");
    ownership.setAccessible(true);
    ownership.set(pulsar.getNamespaceService(), spiedOwnershipCache);

    NamespaceService namespaceService = pulsar.getNamespaceService();
    NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
    TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
    NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
    NamespaceBundle originalBundle = bundles.findBundle(topicName);

    // Split bundle and take ownership of split bundles
    CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(
            originalBundle,
            false,
            NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
    try {
        result.get();
    } catch (Exception e) {
        // make sure: no failure
        fail("split bundle failed", e);
    }

    NamespaceBundleFactory bundleFactory = this.pulsar.getNamespaceService().getNamespaceBundleFactory();
    NamespaceBundles updatedNsBundles = bundleFactory.getBundles(nsname);
    // new updated bundles shouldn't be null
    assertNotNull(updatedNsBundles);
    List<NamespaceBundle> bundleList = updatedNsBundles.getBundles();
    // Check the list that was just fetched; the pre-split `bundles` object was already
    // dereferenced above, so asserting on it again proved nothing.
    assertNotNull(bundleList);

    // (1) validate bundleFactory-cache has the newly split bundles: replay the same split on
    // an independent factory and require every bundle it produces to be in the updated list.
    NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());
    Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles = splitBundles(utilityFactory, nsname, bundles,
            originalBundle);
    assertNotNull(splitBundles);
    Set<NamespaceBundle> splitBundleSet = new HashSet<>(splitBundles.getRight());
    splitBundleSet.removeAll(bundleList);
    assertTrue(splitBundleSet.isEmpty());

    // (2) validate the stored local policies were updated with the newly created split bundles
    LocalPolicies policies = pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get();
    NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, policies.bundles);
    assertEquals(localZkBundles, updatedNsBundles);
    log.info("Policies: {}", policies);

    // (3) validate ownership of new split bundles by local owner
    for (NamespaceBundle b : bundleList) {
        try {
            byte[] data = this.pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(b)).join().get()
                    .getValue();
            NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data,
                    NamespaceEphemeralData.class);
            Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl());
        } catch (Exception e) {
            fail("failed to setup ownership", e);
        }
    }
}
/**
 * Registers a topic in the broker's per-bundle stats map, splits the bundle holding it, and
 * checks that the map no longer lists topics under the pre-split bundle while the bundle now
 * resolving the topic has a non-empty topic list.
 *
 * @throws Exception if reflection-based setup fails
 */
@Test
public void testSplitMapWithRefreshedStatMap() throws Exception {
    OwnershipCache spiedCache = spy(pulsar.getNamespaceService().getOwnershipCache());
    ManagedLedger mockLedger = mock(ManagedLedger.class);
    when(mockLedger.getCursors()).thenReturn(Lists.newArrayList());
    doReturn(CompletableFuture.completedFuture(null)).when(spiedCache)
            .disableOwnership(any(NamespaceBundle.class));

    // Inject the spy into the namespace service through its private field.
    Field cacheField = NamespaceService.class.getDeclaredField("ownershipCache");
    cacheField.setAccessible(true);
    cacheField.set(pulsar.getNamespaceService(), spiedCache);

    NamespaceService service = pulsar.getNamespaceService();
    NamespaceName namespace = NamespaceName.get("pulsar/global/ns1");
    TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
    NamespaceBundles currentBundles = service.getNamespaceBundleFactory().getBundles(namespace);
    NamespaceBundle parentBundle = currentBundles.findBundle(topicName);

    // Build a topic on the mocked ledger and register it via the private
    // addTopicToStatsMaps(TopicName, Topic) method of the broker service.
    PersistentTopic persistentTopic = new PersistentTopic(topicName.toString(), mockLedger,
            pulsar.getBrokerService());
    persistentTopic.initialize().join();
    Method registerTopic = pulsar.getBrokerService().getClass()
            .getDeclaredMethod("addTopicToStatsMaps", TopicName.class, Topic.class);
    registerTopic.setAccessible(true);
    registerTopic.invoke(pulsar.getBrokerService(), topicName, persistentTopic);

    String namespaceKey = parentBundle.getNamespaceObject().toString();
    List<Topic> topicsInBundle = this.pulsar.getBrokerService()
            .getAllTopicsFromNamespaceBundle(namespaceKey, parentBundle.toString());
    assertNotNull(topicsInBundle);

    // Split the bundle and take ownership of the resulting bundles.
    CompletableFuture<Void> splitFuture = service.splitAndOwnBundle(
            parentBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
    try {
        splitFuture.get();
    } catch (Exception e) {
        // the split itself must not fail
        fail("split bundle failed", e);
    }

    // The pre-split bundle must no longer have topics in the stats map ...
    topicsInBundle = this.pulsar.getBrokerService()
            .getAllTopicsFromNamespaceBundle(namespaceKey, parentBundle.toString());
    assertTrue(topicsInBundle.isEmpty());

    // ... and the bundle that now resolves the topic must have some.
    NamespaceBundle childBundle = pulsar.getNamespaceService().getBundle(topicName);
    List<Topic> topicsInChild = this.pulsar.getBrokerService()
            .getAllTopicsFromNamespaceBundle(namespaceKey, childBundle.toString());
    assertFalse(CollectionUtils.isEmpty(topicsInChild));
}
/**
 * Checks that the bundle containing {@code persistent://pulsar/global/ns1/topic-1} is not
 * reported as disabled by {@code NamespaceService.isNamespaceBundleDisabled}.
 *
 * @throws Exception if the reflective field injection fails
 */
@Test
public void testIsServiceUnitDisabled() throws Exception {
    // Replace the service's ownership cache with a spy whose disableOwnership() returns an
    // already-completed future. (The ManagedLedger mock that used to be created here was never
    // used and has been removed.)
    OwnershipCache spiedOwnershipCache = spy(pulsar.getNamespaceService().getOwnershipCache());
    doReturn(CompletableFuture.completedFuture(null)).when(spiedOwnershipCache)
            .disableOwnership(any(NamespaceBundle.class));
    Field ownership = NamespaceService.class.getDeclaredField("ownershipCache");
    ownership.setAccessible(true);
    ownership.set(pulsar.getNamespaceService(), spiedOwnershipCache);

    NamespaceService namespaceService = pulsar.getNamespaceService();
    NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
    TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
    NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
    NamespaceBundle originalBundle = bundles.findBundle(topicName);

    assertFalse(namespaceService.isNamespaceBundleDisabled(originalBundle));
}
/**
 * Acquires ownership of the first bundle of {@code prop/use/ns1}, then removes ownership for
 * the namespace's whole bundle set and checks the cache no longer reports that bundle as
 * owned.
 *
 * @throws Exception if the reflective field injection or an ownership operation fails
 */
@Test
public void testRemoveOwnershipNamespaceBundle() throws Exception {
    // Spy on the ownership cache with disableOwnership() stubbed to an already-completed
    // future. (The ManagedLedger mock that used to be created here was never used and has been
    // removed.)
    OwnershipCache ownershipCache = spy(pulsar.getNamespaceService().getOwnershipCache());
    doReturn(CompletableFuture.completedFuture(null)).when(ownershipCache)
            .disableOwnership(any(NamespaceBundle.class));
    Field ownership = NamespaceService.class.getDeclaredField("ownershipCache");
    ownership.setAccessible(true);
    ownership.set(pulsar.getNamespaceService(), ownershipCache);

    NamespaceService namespaceService = pulsar.getNamespaceService();
    NamespaceName nsname = NamespaceName.get("prop/use/ns1");
    NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
    NamespaceBundle bundle = bundles.getBundles().get(0);

    ownershipCache.tryAcquiringOwnership(bundle).get();
    assertNotNull(ownershipCache.getOwnedBundle(bundle));

    // Note: this passes the NamespaceBundles collection, not the single bundle.
    ownershipCache.removeOwnership(bundles).get();
    assertNull(ownershipCache.getOwnedBundle(bundle));
}
/**
 * Unloads a bundle whose only topic fails to close and checks that the bundle's ownership
 * node is nevertheless absent from the local metadata store afterwards.
 *
 * @throws Exception if subscribing or topic lookup fails
 */
@Test
public void testUnloadNamespaceBundleFailure() throws Exception {
    final String topicName = "persistent://my-property/use/my-ns/my-topic1";
    pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName("my-subscriber-name")
            .subscribe();

    // Swap the broker's topic entry for a spy so that close(false) can be made to fail.
    ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> brokerTopics =
            pulsar.getBrokerService().getTopics();
    Topic topicSpy = spy(brokerTopics.get(topicName).get().get());
    brokerTopics.clear();
    brokerTopics.put(topicName, CompletableFuture.completedFuture(Optional.of(topicSpy)));

    // Every close(false) call yields a fresh, exceptionally-completed future.
    Answer<CompletableFuture<Void>> failingClose = invocation -> {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("first time failed"));
        return failed;
    };
    doAnswer(failingClose).when(topicSpy).close(false);

    NamespaceBundle owningBundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
    pulsar.getNamespaceService().unloadNamespaceBundle(owningBundle).join();

    Optional<GetResult> ownershipNode =
            this.pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(owningBundle)).join();
    assertFalse(ownershipNode.isPresent());
}
/**
 * Verifies that unloading a bundle times out instead of hanging when closing one of its topics
 * never completes.
 *
 * @throws Exception if subscribing, topic lookup, or closing the consumer fails
 */
@Test(timeOut = 6000)
public void testUnloadNamespaceBundleWithStuckTopic() throws Exception {
    final String topicName = "persistent://my-property/use/my-ns/my-topic1";
    Consumer<byte[]> subscriber = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName("my-subscriber-name")
            .subscribe();

    // Swap the broker's topic entry for a spy so that close(false) can be made to hang.
    ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> brokerTopics =
            pulsar.getBrokerService().getTopics();
    Topic topicSpy = spy(brokerTopics.get(topicName).get().get());
    brokerTopics.clear();
    brokerTopics.put(topicName, CompletableFuture.completedFuture(Optional.of(topicSpy)));

    // close(false) returns a future that is never completed.
    Answer<CompletableFuture<Void>> neverCompletes = invocation -> new CompletableFuture<Void>();
    doAnswer(neverCompletes).when(topicSpy).close(false);

    // Unload the bundle with a one-second timeout even though its topic close is stuck.
    NamespaceBundle owningBundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
    pulsar.getNamespaceService().unloadNamespaceBundle(owningBundle, 1, TimeUnit.SECONDS).join();

    Optional<GetResult> ownershipNode =
            this.pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(owningBundle)).join();
    assertFalse(ownershipNode.isPresent());

    subscriber.close();
}
/**
 * Verifies that the namespace service deserializes a broker's load report according to the
 * load manager that is currently active.
 * <ol>
 * <li>Write a load report for candidate 1 as a {@link LoadReport}, the format used by
 * SimpleLoadManagerImpl.</li>
 * <li>Write a load report for candidate 2 as a {@link LocalBrokerData}, the format used by
 * ModularLoadManagerImpl.</li>
 * <li>Create a lookup result for each candidate, switching the active load manager to the
 * modular one in between.</li>
 * </ol>
 *
 * @throws Exception if writing the reports or creating a lookup result fails
 */
@Test
public void testLoadReportDeserialize() throws Exception {
    final String candidateBroker1 = "http://localhost:8000";
    final String candidateBroker2 = "http://localhost:3000";

    URI firstUri = new URI(candidateBroker1);
    URI secondUri = new URI(candidateBroker2);
    String firstPath = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT,
            firstUri.getHost(), firstUri.getPort());
    String secondPath = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT,
            secondUri.getHost(), secondUri.getPort());

    // Publish both reports as ephemeral nodes, each in its own format.
    LoadReport simpleReport = new LoadReport(null, null, candidateBroker1, null);
    byte[] simpleReportBytes = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(simpleReport);
    pulsar.getLocalMetadataStore()
            .put(firstPath, simpleReportBytes, Optional.empty(), EnumSet.of(CreateOption.Ephemeral))
            .join();

    LocalBrokerData modularReport = new LocalBrokerData(null, null, candidateBroker2, null);
    byte[] modularReportBytes = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(modularReport);
    pulsar.getLocalMetadataStore()
            .put(secondPath, modularReportBytes, Optional.empty(), EnumSet.of(CreateOption.Ephemeral))
            .join();

    // Lookup with the load manager that is active at startup.
    LookupResult firstLookup =
            pulsar.getNamespaceService().createLookupResult(candidateBroker1, false, null).get();

    // Switch to the modular load manager and stop the previous one.
    LoadManager previousLoadManager = pulsar.getLoadManager()
            .getAndSet(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()));
    previousLoadManager.stop();

    LookupResult secondLookup =
            pulsar.getNamespaceService().createLookupResult(candidateBroker2, false, null).get();

    Assert.assertEquals(firstLookup.getLookupData().getBrokerUrl(), candidateBroker1);
    Assert.assertEquals(secondLookup.getLookupData().getBrokerUrl(), candidateBroker2);
    System.out.println(secondLookup);
}
/**
 * Publishes broker data that carries one named advertised listener and checks that
 * {@code createLookupResult} returns the candidate URL when no listener name is given, and the
 * listener's plain and TLS URLs when the listener name is given.
 *
 * @throws Exception if writing the broker data or creating a lookup result fails
 */
@Test
public void testCreateLookupResult() throws Exception {
    final String candidateBroker = "pulsar://localhost:6650";
    final String listenerUrl = "pulsar://localhost:7000";
    final String listenerUrlTls = "pulsar://localhost:8000";
    final String listener = "listenerName";

    AdvertisedListener advertised = AdvertisedListener.builder()
            .brokerServiceUrl(new URI(listenerUrl))
            .brokerServiceUrlTls(new URI(listenerUrlTls))
            .build();
    Map<String, AdvertisedListener> listenersByName = Maps.newHashMap();
    listenersByName.put(listener, advertised);

    // Store the broker data as an ephemeral node under the load-balance brokers root.
    LocalBrokerData brokerData = new LocalBrokerData(null, null, candidateBroker, null, listenersByName);
    URI candidateUri = new URI(candidateBroker);
    String brokerPath = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT,
            candidateUri.getHost(), candidateUri.getPort());
    byte[] brokerDataBytes = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(brokerData);
    pulsar.getLocalMetadataStore()
            .put(brokerPath, brokerDataBytes, Optional.empty(), EnumSet.of(CreateOption.Ephemeral))
            .join();

    LookupResult noListener =
            pulsar.getNamespaceService().createLookupResult(candidateBroker, false, null).get();
    LookupResult withListener =
            pulsar.getNamespaceService().createLookupResult(candidateBroker, false, listener).get();

    Assert.assertEquals(noListener.getLookupData().getBrokerUrl(), candidateBroker);
    Assert.assertEquals(withListener.getLookupData().getBrokerUrl(), listenerUrl);
    Assert.assertEquals(withListener.getLookupData().getBrokerUrlTls(), listenerUrlTls);
    System.out.println(withListener);
}
/**
 * Splits the bundle that contains {@code persistent://pulsar/global/ns1/topic-1} and checks
 * that the bundle factory, the stored local policies, and the per-bundle ownership records
 * all reflect the split.
 *
 * <p>NOTE(review): despite its name, this test never asserts a bundle count; its steps mirror
 * {@code testSplitAndOwnBundles}. Confirm whether a default-bundle-count assertion was intended.
 *
 * @throws Exception if the reflective field injection or a metadata read fails
 */
@Test
public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception {
    // Replace the service's ownership cache with a spy whose disableOwnership() returns an
    // already-completed future.
    OwnershipCache spiedOwnershipCache = spy(pulsar.getNamespaceService().getOwnershipCache());
    doReturn(CompletableFuture.completedFuture(null)).when(spiedOwnershipCache)
            .disableOwnership(any(NamespaceBundle.class));
    Field ownership = NamespaceService.class.getDeclaredField("ownershipCache");
    ownership.setAccessible(true);
    ownership.set(pulsar.getNamespaceService(), spiedOwnershipCache);

    NamespaceService namespaceService = pulsar.getNamespaceService();
    NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
    TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
    NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
    NamespaceBundle originalBundle = bundles.findBundle(topicName);

    // Split bundle and take ownership of split bundles
    CompletableFuture<Void> result = namespaceService.splitAndOwnBundle(
            originalBundle,
            false,
            NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
    try {
        result.get();
    } catch (Exception e) {
        // make sure: no failure
        fail("split bundle failed", e);
    }

    NamespaceBundleFactory bundleFactory = this.pulsar.getNamespaceService().getNamespaceBundleFactory();
    NamespaceBundles updatedNsBundles = bundleFactory.getBundles(nsname);
    // new updated bundles shouldn't be null
    assertNotNull(updatedNsBundles);
    List<NamespaceBundle> bundleList = updatedNsBundles.getBundles();
    // Check the list that was just fetched; the pre-split `bundles` object was already
    // dereferenced above, so asserting on it again proved nothing.
    assertNotNull(bundleList);

    // (1) validate bundleFactory-cache has the newly split bundles: replay the same split on
    // an independent factory and require every bundle it produces to be in the updated list.
    NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());
    Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles = splitBundles(utilityFactory, nsname, bundles,
            originalBundle);
    assertNotNull(splitBundles);
    Set<NamespaceBundle> splitBundleSet = new HashSet<>(splitBundles.getRight());
    splitBundleSet.removeAll(bundleList);
    assertTrue(splitBundleSet.isEmpty());

    // (2) validate the stored local policies were updated with the newly created split bundles
    LocalPolicies policies = this.pulsar.getPulsarResources().getLocalPolicies().getLocalPolicies(nsname).get();
    NamespaceBundles localZkBundles = bundleFactory.getBundles(nsname, policies.bundles);
    assertEquals(localZkBundles, updatedNsBundles);
    log.info("Policies: {}", policies);

    // (3) validate ownership of new split bundles by local owner
    for (NamespaceBundle b : bundleList) {
        try {
            byte[] data = this.pulsar.getLocalMetadataStore().get(ServiceUnitUtils.path(b)).join().get()
                    .getValue();
            NamespaceEphemeralData node = ObjectMapperFactory.getThreadLocal().readValue(data,
                    NamespaceEphemeralData.class);
            Assert.assertEquals(node.getNativeUrl(), this.pulsar.getBrokerServiceUrl());
        } catch (Exception e) {
            fail("failed to setup ownership", e);
        }
    }
}
/**
 * Splits a bundle, drops ownership of every resulting bundle except the one that still
 * contains the topic, and then splits that remaining bundle again. The second split must
 * complete without an exception.
 *
 * @throws Exception if the reflective field injection fails
 */
@Test
public void testRemoveOwnershipAndSplitBundle() throws Exception {
    OwnershipCache spiedCache = spy(pulsar.getNamespaceService().getOwnershipCache());
    doReturn(CompletableFuture.completedFuture(null)).when(spiedCache)
            .disableOwnership(any(NamespaceBundle.class));
    Field cacheField = NamespaceService.class.getDeclaredField("ownershipCache");
    cacheField.setAccessible(true);
    cacheField.set(pulsar.getNamespaceService(), spiedCache);

    NamespaceService service = pulsar.getNamespaceService();
    NamespaceName namespace = NamespaceName.get("pulsar/global/ns1");
    TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
    NamespaceBundle parentBundle =
            service.getNamespaceBundleFactory().getBundles(namespace).findBundle(topicName);

    // First split.
    CompletableFuture<Void> firstSplit = service.splitAndOwnBundle(
            parentBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
    try {
        firstSplit.get();
    } catch (Exception e) {
        fail("split bundle failed", e);
    }

    NamespaceBundles bundlesAfterSplit = service.getNamespaceBundleFactory().getBundles(namespace);
    assertNotNull(bundlesAfterSplit);
    NamespaceBundle topicBundle = bundlesAfterSplit.findBundle(topicName);

    // Release every bundle other than the one that still contains the topic.
    for (NamespaceBundle candidate : bundlesAfterSplit.getBundles()) {
        if (candidate.equals(topicBundle)) {
            continue;
        }
        try {
            spiedCache.removeOwnership(candidate).get();
        } catch (Exception e) {
            fail("failed to remove ownership", e);
        }
    }

    // Second split, on the bundle that is still owned.
    CompletableFuture<Void> secondSplit = service.splitAndOwnBundle(
            topicBundle, true, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
    try {
        secondSplit.get();
    } catch (Exception e) {
        // guards against an NPE during the second split
        fail("split bundle failed", e);
    }
}
/**
 * Splits a bundle twice and checks after each split that the bundle which was split is no
 * longer present in the ownership cache's owned-bundles map.
 *
 * @throws Exception if the reflective field injection or the initial ownership acquisition
 *                   fails
 */
@Test
public void testSplitBundleAndRemoveOldBundleFromOwnerShipCache() throws Exception {
    OwnershipCache ownershipCache = spy(pulsar.getNamespaceService().getOwnershipCache());
    doReturn(CompletableFuture.completedFuture(null)).when(ownershipCache)
            .disableOwnership(any(NamespaceBundle.class));
    Field ownership = NamespaceService.class.getDeclaredField("ownershipCache");
    ownership.setAccessible(true);
    ownership.set(pulsar.getNamespaceService(), ownershipCache);

    NamespaceService namespaceService = pulsar.getNamespaceService();
    NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
    TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
    NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
    NamespaceBundle splitBundle1 = bundles.findBundle(topicName);

    // Wait for the acquisition to finish before splitting; previously the returned future was
    // dropped, so the split could start while the acquisition was still in flight.
    ownershipCache.tryAcquiringOwnership(splitBundle1).get();

    CompletableFuture<Void> result1 = namespaceService.splitAndOwnBundle(
            splitBundle1, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
    try {
        result1.get();
    } catch (Exception e) {
        fail("split bundle failed", e);
    }
    Awaitility.await().untilAsserted(()
            -> assertNull(namespaceService.getOwnershipCache().getOwnedBundles().get(splitBundle1)));

    // Second split, this time passing `true` as the second argument
    // (presumably the unload flag — confirm against NamespaceService.splitAndOwnBundle).
    NamespaceBundles bundlesAfterFirstSplit = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
    assertNotNull(bundlesAfterFirstSplit);
    NamespaceBundle splitBundle2 = bundlesAfterFirstSplit.findBundle(topicName);
    CompletableFuture<Void> result2 = namespaceService.splitAndOwnBundle(
            splitBundle2, true, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
    try {
        result2.get();
    } catch (Exception e) {
        // make sure: NPE does not occur
        fail("split bundle failed", e);
    }
    Awaitility.await().untilAsserted(()
            -> assertNull(namespaceService.getOwnershipCache().getOwnedBundles().get(splitBundle2)));
}
/**
 * Creates a 10-bundle namespace with 100 subscribed topics, checks that the bundle factory's
 * "bundle with highest topics" matches a locally computed topic count per bundle, then
 * requests a {@code LARGEST} split and checks that no remaining bundle has the range of the
 * bundle that was largest.
 *
 * @throws Exception if namespace creation, subscription, or the admin split call fails
 */
@Test
public void testSplitLargestBundle() throws Exception {
    String namespace = "prop/test/ns-abc2";
    String topic = "persistent://" + namespace + "/t1-";
    int totalTopics = 100;

    BundlesData bundleData = BundlesData.builder().numBundles(10).build();
    admin.namespaces().createNamespace(namespace, bundleData);

    // Generic array creation is unavoidably unchecked; only Consumer<byte[]> values are stored.
    @SuppressWarnings("unchecked")
    Consumer<byte[]>[] consumers = new Consumer[totalTopics];
    for (int i = 0; i < totalTopics; i++) {
        consumers[i] = pulsarClient.newConsumer().topic(topic + i).subscriptionName("my-subscriber-name")
                .subscribe();
    }

    NamespaceService namespaceService = pulsar.getNamespaceService();
    NamespaceName nsname = NamespaceName.get(namespace);
    NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);

    // Count topics per bundle range and remember the first range to reach the running maximum.
    Map<String, Integer> topicCount = Maps.newHashMap();
    int maxTopics = 0;
    String maxBundle = null;
    for (int i = 0; i < totalTopics; i++) {
        String bundle = bundles.findBundle(TopicName.get(topic + i)).getBundleRange();
        int count = topicCount.merge(bundle, 1, Integer::sum);
        if (count > maxTopics) {
            maxTopics = count;
            maxBundle = bundle;
        }
    }

    String largestBundle = namespaceService.getNamespaceBundleFactory().getBundleWithHighestTopics(nsname)
            .getBundleRange();
    // TestNG's assertEquals takes (actual, expected).
    assertEquals(largestBundle, maxBundle);

    for (Consumer<byte[]> consumer : consumers) {
        consumer.close();
    }

    admin.namespaces().splitNamespaceBundle(namespace, Policies.BundleType.LARGEST.toString(), false, null);

    for (NamespaceBundle bundle : namespaceService.getNamespaceBundleFactory().getBundles(nsname).getBundles()) {
        // JUnit's assertNotEquals takes (unexpected, actual).
        assertNotEquals(maxBundle, bundle.getBundleRange());
    }
}
/**
 * Tests a {@code HOT} bundle split: after restarting the broker with the modular load
 * manager, bundle data with a high in/out throughput is written for one bundle, that bundle
 * must be reported as the one with the highest throughput, and after the split no remaining
 * bundle may have its range.
 *
 * @throws Exception if the broker restart, namespace creation, subscription, or the admin
 *                   split call fails
 */
@Test
public void testSplitBundleWithHighestThroughput() throws Exception {
    conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
    restartBroker();

    String namespace = "prop/test/ns-abc2";
    String topic = "persistent://" + namespace + "/t1-";
    int totalTopics = 100;

    BundlesData bundleData = BundlesData.builder().numBundles(10).build();
    admin.namespaces().createNamespace(namespace, bundleData);

    // Generic array creation is unavoidably unchecked; only Consumer<byte[]> values are stored.
    @SuppressWarnings("unchecked")
    Consumer<byte[]>[] consumers = new Consumer[totalTopics];
    for (int i = 0; i < totalTopics; i++) {
        consumers[i] = pulsarClient.newConsumer().topic(topic + i).subscriptionName("my-subscriber-name")
                .subscribe();
    }

    NamespaceService namespaceService = pulsar.getNamespaceService();
    NamespaceName nsname = NamespaceName.get(namespace);
    NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);

    // Record high throughput stats for the bundle that contains the first topic.
    String bundle = bundles.findBundle(TopicName.get(topic + "0")).getBundleRange();
    String path = ModularLoadManagerImpl.getBundleDataPath(bundle);
    NamespaceBundleStats defaultStats = new NamespaceBundleStats();
    defaultStats.msgThroughputIn = 100000;
    defaultStats.msgThroughputOut = 100000;
    BundleData bd = new BundleData(10, 19, defaultStats);
    byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(bd);
    // Wait for the write to complete so the lookup below cannot race with it; previously the
    // returned future was ignored.
    pulsar.getLocalMetadataStore().put(path, data, Optional.empty()).join();

    String hotBundle = namespaceService.getNamespaceBundleFactory().getBundleWithHighestThroughput(nsname)
            .getBundleRange();
    // TestNG's assertEquals takes (actual, expected).
    assertEquals(hotBundle, bundle);

    for (Consumer<byte[]> consumer : consumers) {
        consumer.close();
    }

    admin.namespaces().splitNamespaceBundle(namespace, Policies.BundleType.HOT.toString(), false, null);

    for (NamespaceBundle b : namespaceService.getNamespaceBundleFactory().getBundles(nsname).getBundles()) {
        // JUnit's assertNotEquals takes (unexpected, actual).
        assertNotEquals(hotBundle, b.getBundleRange());
    }
}
/**
 * Checks that the namespace recovered from the full bundle of this broker's heartbeat
 * namespace is recognised by {@code NamespaceService.isSystemServiceNamespace}.
 *
 * @throws Exception declared for parity with the other tests
 */
@Test
public void testHeartbeatNamespaceMatch() throws Exception {
    NamespaceBundle heartbeatBundle = pulsar.getNamespaceService()
            .getNamespaceBundleFactory()
            .getFullBundle(NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf));

    // Round-trip through the bundle's string form to recover the namespace it belongs to.
    boolean recognisedAsSystemNamespace = NamespaceService.isSystemServiceNamespace(
            NamespaceBundle.getBundleNamespace(heartbeatBundle.toString()));
    assertTrue(recognisedAsSystemNamespace);
}
/**
 * Seeds {@code utilityFactory}'s private {@code bundlesCache} with {@code bundles} for
 * {@code nsname}, then asks that factory to split {@code targetBundle}, passing {@code 2} and
 * a {@code null} third argument, and waits for the result.
 *
 * @param utilityFactory factory whose cache is primed and which performs the split
 * @param nsname         namespace key under which {@code bundles} is cached
 * @param bundles        bundle set to place in the cache
 * @param targetBundle   bundle to split
 * @return the pair produced by {@code NamespaceBundleFactory.splitBundles}
 * @throws Exception if the cache field cannot be accessed reflectively
 */
@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
        NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
    Field cacheField = NamespaceBundleFactory.class.getDeclaredField("bundlesCache");
    cacheField.setAccessible(true);

    // Unchecked cast: the field's declared generic type is not visible through reflection.
    AsyncLoadingCache<NamespaceName, NamespaceBundles> factoryCache =
            (AsyncLoadingCache<NamespaceName, NamespaceBundles>) cacheField.get(utilityFactory);
    factoryCache.put(nsname, CompletableFuture.completedFuture(bundles));

    return utilityFactory.splitBundles(targetBundle, 2, null).join();
}
// SLF4J logger for this test class; used by the tests to log the policies read back from metadata.
private static final Logger log = LoggerFactory.getLogger(NamespaceServiceTest.class);
}