blob: fdd1eb7272c307752ab56bafab55f30278d990d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions;
import static org.apache.pulsar.broker.loadbalance.LoadManager.LOADBALANCE_BROKERS_ROOT;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
* Unit test for {@link BrokerRegistry}.
*/
@Slf4j
@Test(groups = "broker")
public class BrokerRegistryTest {
private final List<PulsarService> pulsarServices = new CopyOnWriteArrayList<>();
private final List<BrokerRegistryImpl> brokerRegistries = new CopyOnWriteArrayList<>();
private ExecutorService executor;
private LocalBookkeeperEnsemble bkEnsemble;
// Make sure the load manager don't register itself to `/loadbalance/brokers/{brokerId}`.
public static class MockLoadManager implements LoadManager {
@Override
public void start() throws PulsarServerException {
// No-op
}
@Override
public boolean isCentralized() {
return false;
}
@Override
public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
return Optional.empty();
}
@Override
public LoadManagerReport generateLoadReport() throws Exception {
return null;
}
@Override
public void setLoadReportForceUpdateFlag() {
// No-op
}
@Override
public void writeLoadReportOnZookeeper() throws Exception {
// No-op
}
@Override
public void writeResourceQuotasToZooKeeper() throws Exception {
// No-op
}
@Override
public List<Metrics> getLoadBalancingMetrics() {
return null;
}
@Override
public void doLoadShedding() {
// No-op
}
@Override
public void doNamespaceBundleSplit() throws Exception {
// No-op
}
@Override
public void disableBroker() throws Exception {
// No-op
}
@Override
public Set<String> getAvailableBrokers() throws Exception {
return null;
}
@Override
public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
return null;
}
@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
return null;
}
@Override
public void stop() throws PulsarServerException {
// No-op
}
@Override
public void initialize(PulsarService pulsar) {
// No-op
}
}
@BeforeClass
void setup() throws Exception {
executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble.start();
}
@SneakyThrows
private PulsarService createPulsarService() {
ServiceConfiguration config = new ServiceConfiguration();
config.setLoadBalancerEnabled(false);
config.setLoadManagerClassName(MockLoadManager.class.getName());
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setBrokerServicePort(Optional.of(0));
config.setAdvertisedAddress("localhost");
PulsarService pulsar = spy(new PulsarService(config));
pulsarServices.add(pulsar);
return pulsar;
}
private BrokerRegistryImpl createBrokerRegistryImpl(PulsarService pulsar) {
BrokerRegistryImpl brokerRegistry = spy(new BrokerRegistryImpl(pulsar));
brokerRegistries.add(brokerRegistry);
return brokerRegistry;
}
@AfterClass(alwaysRun = true)
void shutdown() throws Exception {
executor.shutdownNow();
bkEnsemble.stop();
}
@AfterMethod(alwaysRun = true)
void cleanUp() {
log.info("Cleaning up the broker registry...");
brokerRegistries.forEach(brokerRegistry -> {
if (brokerRegistry.isStarted()) {
try {
brokerRegistry.close();
} catch (PulsarServerException e) {
throw new RuntimeException(e);
}
}
});
brokerRegistries.clear();
log.info("Cleaning up the pulsar services...");
pulsarServices.forEach(pulsarService -> {
try {
pulsarService.close();
} catch (PulsarServerException e) {
throw new RuntimeException(e);
}
});
pulsarServices.clear();
}
@Test(timeOut = 30 * 1000)
public void testRegisterAndLookup() throws Exception {
PulsarService pulsar1 = createPulsarService();
PulsarService pulsar2 = createPulsarService();
PulsarService pulsar3 = createPulsarService();
pulsar1.start();
pulsar2.start();
pulsar3.start();
BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
BrokerRegistryImpl brokerRegistry3 = createBrokerRegistryImpl(pulsar3);
Set<String> brokerIds = new HashSet<>();
brokerRegistry1.addListener((brokerId, type) -> {
brokerIds.add(brokerId);
});
brokerRegistry1.start();
brokerRegistry2.start();
Awaitility.await().atMost(Duration.ofSeconds(5))
.untilAsserted(() -> assertEquals(brokerIds.size(), 2));
assertEquals(brokerRegistry1.getAvailableBrokersAsync().get().size(), 2);
assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 2);
// Check three broker cache are flush successes.
brokerRegistry3.start();
assertEquals(brokerRegistry3.getAvailableBrokersAsync().get().size(), 3);
Awaitility.await().atMost(Duration.ofSeconds(5))
.untilAsserted(() -> assertEquals(brokerIds.size(), 3));
assertEquals(brokerIds, new HashSet<>(brokerRegistry1.getAvailableBrokersAsync().get()));
assertEquals(brokerIds, new HashSet<>(brokerRegistry2.getAvailableBrokersAsync().get()));
assertEquals(brokerIds, new HashSet<>(brokerRegistry3.getAvailableBrokersAsync().get()));
assertEquals(brokerIds, brokerRegistry1.getAvailableBrokerLookupDataAsync().get().keySet());
assertEquals(brokerIds, brokerRegistry2.getAvailableBrokerLookupDataAsync().get().keySet());
assertEquals(brokerIds, brokerRegistry3.getAvailableBrokerLookupDataAsync().get().keySet());
Optional<BrokerLookupData> lookupDataOpt =
brokerRegistry1.lookupAsync(brokerRegistry2.getBrokerId()).get();
assertTrue(lookupDataOpt.isPresent());
assertEquals(lookupDataOpt.get().getWebServiceUrl(), pulsar2.getSafeWebServiceAddress());
assertEquals(lookupDataOpt.get().getWebServiceUrlTls(), pulsar2.getWebServiceAddressTls());
assertEquals(lookupDataOpt.get().getPulsarServiceUrl(), pulsar2.getBrokerServiceUrl());
assertEquals(lookupDataOpt.get().getPulsarServiceUrlTls(), pulsar2.getBrokerServiceUrlTls());
assertEquals(lookupDataOpt.get().advertisedListeners(), pulsar2.getAdvertisedListeners());
assertEquals(lookupDataOpt.get().protocols(), pulsar2.getProtocolDataToAdvertise());
assertEquals(lookupDataOpt.get().persistentTopicsEnabled(), pulsar2.getConfiguration()
.isEnablePersistentTopics());
assertEquals(lookupDataOpt.get().nonPersistentTopicsEnabled(), pulsar2.getConfiguration()
.isEnableNonPersistentTopics());
assertEquals(lookupDataOpt.get().brokerVersion(), pulsar2.getBrokerVersion());
// Unregister and see the available brokers.
brokerRegistry1.unregister();
// After unregistering, the other broker registry lock manager's metadata cache might not be updated yet,
// so we need to wait for it to synchronize.
Awaitility.await().untilAsserted(() -> {
assertEquals(brokerRegistry2.getAvailableBrokersAsync().get().size(), 2);
});
}
@Test
public void testRegisterFailWithSameBrokerId() throws Exception {
PulsarService pulsar1 = createPulsarService();
PulsarService pulsar2 = createPulsarService();
pulsar1.start();
pulsar2.start();
doReturn(pulsar1.getBrokerId()).when(pulsar2).getBrokerId();
BrokerRegistryImpl brokerRegistry1 = createBrokerRegistryImpl(pulsar1);
BrokerRegistryImpl brokerRegistry2 = createBrokerRegistryImpl(pulsar2);
brokerRegistry1.start();
try {
brokerRegistry2.start();
fail();
} catch (Exception ex) {
log.info("Broker registry start failed.", ex);
assertTrue(ex instanceof PulsarServerException);
assertTrue(ex.getMessage().contains("LockBusyException"));
}
}
@Test
public void testCloseRegister() throws Exception {
PulsarService pulsar1 = createPulsarService();
pulsar1.start();
BrokerRegistryImpl brokerRegistry = createBrokerRegistryImpl(pulsar1);
assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Init);
// Check state after stated.
brokerRegistry.start();
assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Registered);
// Add a listener
brokerRegistry.addListener((brokerId, type) -> {
// Ignore.
});
assertTrue(brokerRegistry.isStarted());
List<BiConsumer<String, NotificationType>> listeners =
WhiteboxImpl.getInternalState(brokerRegistry, "listeners");
assertFalse(listeners.isEmpty());
// Check state after unregister.
brokerRegistry.unregister();
assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Started);
// Check state after re-register.
brokerRegistry.register();
assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Registered);
// Check state after close.
brokerRegistry.close();
assertFalse(brokerRegistry.isStarted());
assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Closed);
listeners = WhiteboxImpl.getInternalState(brokerRegistry, "listeners");
assertTrue(listeners.isEmpty());
try {
brokerRegistry.getAvailableBrokersAsync().get();
fail();
} catch (Exception ex) {
log.info("Failed to getAvailableBrokersAsync.", ex);
assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException);
}
try {
brokerRegistry.getAvailableBrokerLookupDataAsync().get();
fail();
} catch (Exception ex) {
log.info("Failed to getAvailableBrokerLookupDataAsync.", ex);
assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException);
}
try {
brokerRegistry.lookupAsync("test").get();
fail();
} catch (Exception ex) {
log.info("Failed to lookupAsync.", ex);
assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException);
}
try {
brokerRegistry.addListener((brokerId, type) -> {
// Ignore.
});
fail();
} catch (Exception ex) {
log.info("Failed to lookupAsync.", ex);
assertTrue(FutureUtil.unwrapCompletionException(ex) instanceof IllegalStateException);
}
}
@Test
public void testIsVerifiedNotification() {
assertFalse(BrokerRegistryImpl.isVerifiedNotification(new Notification(NotificationType.Created, "/")));
assertFalse(BrokerRegistryImpl.isVerifiedNotification(new Notification(NotificationType.Created,
LOADBALANCE_BROKERS_ROOT + "xyz")));
assertFalse(BrokerRegistryImpl.isVerifiedNotification(new Notification(NotificationType.Created,
LOADBALANCE_BROKERS_ROOT)));
assertTrue(BrokerRegistryImpl.isVerifiedNotification(
new Notification(NotificationType.Created, LOADBALANCE_BROKERS_ROOT + "/brokerId")));
assertTrue(BrokerRegistryImpl.isVerifiedNotification(
new Notification(NotificationType.Created, LOADBALANCE_BROKERS_ROOT + "/brokerId/xyz")));
}
@Test
public void testKeyPath() {
String keyPath = BrokerRegistryImpl.keyPath("brokerId");
assertEquals(keyPath, LOADBALANCE_BROKERS_ROOT + "/brokerId");
}
public BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) {
return WhiteboxImpl.getInternalState(brokerRegistry, BrokerRegistryImpl.State.class);
}
}