blob: 33066d0c3cd6e2de93f2a5983c0d38b57b7a779a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.namespace;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import com.google.common.hash.Hashing;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.manager.RedirectManager;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.BundleSplitOption;
import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitOption;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The <code>NamespaceService</code> provides resource ownership lookup as well as resource ownership claiming services
* for the <code>PulsarService</code>.
* <p/>
* The <code>PulsarService</code> relies on this service for resource ownership operations.
* <p/>
* The focus of this phase is to bring up the system and be able to iterate and improve the services effectively.
* <p/>
*
* @see org.apache.pulsar.broker.PulsarService
*/
public class NamespaceService implements AutoCloseable {
// Class-wide SLF4J logger.
private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class);
// Broker configuration; assigned from pulsar.getConfiguration() in the constructor.
private final ServiceConfiguration config;
// Shared reference to the active load manager; assigned from pulsar.getLoadManager() in the constructor.
private final AtomicReference<LoadManager> loadManager;
// The broker service this namespace service belongs to.
private final PulsarService pulsar;
// Cache used to look up and acquire bundle ownership.
private final OwnershipCache ownershipCache;
// Metadata cache of LocalBrokerData, read by createLookupResult() to resolve a broker's service URLs.
private final MetadataCache<LocalBrokerData> localBrokerDataCache;
// Factory that resolves the bundles of a namespace; built with a CRC32 hash function in the constructor.
private final NamespaceBundleFactory bundleFactory;
// Advertised address of this broker; assigned from pulsar.getAdvertisedAddress() in the constructor.
private final String host;
// NOTE(review): presumably the maximum number of bundle-split retries; the usage is outside this section.
public static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;
// First path segment of SLA-monitor namespace names (see SLA_NAMESPACE_PATTERN / SLA_NAMESPACE_FMT).
public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
// Matches "pulsar/<segment>/<name>:<digits>"; group 1 captures "<name>:<digits>".
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
// Matches "pulsar/<name>:<digits>"; group 1 captures "<name>:<digits>".
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = Pattern.compile("pulsar/([^:]+:\\d+)");
// Matches "sla-monitor/<segment>/<name>:<digits>"; group 1 captures "<name>:<digits>".
public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
// String.format templates whose output has the shape matched by the corresponding patterns above.
public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s";
public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s";
// Pulsar clients keyed by cluster data. NOTE(review): populated outside this section; presumably one client
// per remote cluster - confirm against the code that fills it.
private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> namespaceClients;
// Registered listeners; both lists are CopyOnWriteArrayList instances (see constructor).
private final List<NamespaceBundleOwnershipListener> bundleOwnershipListeners;
private final List<NamespaceBundleSplitListener> bundleSplitListeners;
// Supplies an optional redirect result that lookups consult before resolving ownership themselves.
private final RedirectManager redirectManager;
// Lookup metrics updated by getBrokerServiceUrlAsync(): one counter per outcome plus a latency summary.
private static final Counter lookupRedirects = Counter.build("pulsar_broker_lookup_redirects", "-").register();
private static final Counter lookupFailures = Counter.build("pulsar_broker_lookup_failures", "-").register();
private static final Counter lookupAnswers = Counter.build("pulsar_broker_lookup_answers", "-").register();
// Latency summary exposing the 50th, 99th, 99.9th and 100th percentiles.
private static final Summary lookupLatency = Summary.build("pulsar_broker_lookup", "-")
.quantile(0.50)
.quantile(0.99)
.quantile(0.999)
.quantile(1.0)
.register();
/**
* Creates the namespace service for the given broker.
* <p>
* Captures the advertised address, configuration and load-manager reference from {@code pulsar}, then builds
* the bundle factory, ownership cache, client map, listener lists, broker-data metadata cache and redirect
* manager.
*
* @param pulsar the broker service this namespace service belongs to
*/
public NamespaceService(PulsarService pulsar) {
this.pulsar = pulsar;
this.host = pulsar.getAdvertisedAddress();
this.config = pulsar.getConfiguration();
this.loadManager = pulsar.getLoadManager();
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
// The ownership cache is handed the bundle factory created on the previous line (and this instance),
// so the assignment order of these two fields must be kept.
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory, this);
this.namespaceClients =
ConcurrentOpenHashMap.<ClusterDataImpl, PulsarClientImpl>newBuilder().build();
this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.redirectManager = new RedirectManager(pulsar);
}
/**
 * Asks the ownership cache to refresh this broker's self-owner info.
 *
 * @throws RuntimeException if the ownership cache reports that the refresh failed
 */
public void initialize() {
    final boolean refreshed = getOwnershipCache().refreshSelfOwnerInfo();
    if (refreshed) {
        return;
    }
    throw new RuntimeException("Failed to refresh self owner info.");
}
/**
 * Resolves the broker that serves {@code topic}.
 * <p>
 * The topic's bundle is resolved first. A redirect result, when one is available, is returned as-is;
 * otherwise the lookup is delegated to the extensible load manager when it is enabled, or to
 * {@link #findBrokerServiceUrl} when it is not. Lookup metrics are recorded on a side branch of the
 * returned future and never alter its outcome.
 *
 * @param topic   the topic to look up
 * @param options the lookup options
 * @return a future holding the lookup result, or an empty optional when none was produced
 */
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
    final long lookupStartNanos = System.nanoTime();
    CompletableFuture<Optional<LookupResult>> lookupFuture = getBundleAsync(topic).thenCompose(bundle ->
            // Do redirection if the cluster is in rollback or deploying.
            findRedirectLookupResultAsync(bundle).thenCompose(redirect -> {
                if (redirect.isPresent()) {
                    LOG.info("[{}] Redirect lookup request to {} for topic {}",
                            pulsar.getBrokerId(), redirect.get(), topic);
                    return CompletableFuture.completedFuture(redirect);
                }
                if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
                    // TODO: Add unit tests cover it.
                    return findBrokerServiceUrl(bundle, options);
                }
                return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle);
            }));

    // Metrics only: latency and redirect/answer counters on success, failure counter otherwise.
    lookupFuture.thenAccept(result -> {
        lookupLatency.observe(System.nanoTime() - lookupStartNanos, TimeUnit.NANOSECONDS);
        if (result.isEmpty()) {
            return;
        }
        if (result.get().isRedirect()) {
            lookupRedirects.inc();
        } else {
            lookupAnswers.inc();
        }
    }).exceptionally(failure -> {
        lookupFailures.inc();
        return null;
    });
    return lookupFuture;
}
/**
 * Asks the redirect manager for a redirect target, unless the bundle belongs to an SLA-monitor or
 * heartbeat namespace, in which case no redirect is ever produced.
 *
 * @param bundle the service unit being looked up
 * @return a future holding the redirect result, or an empty optional when no redirect applies
 */
private CompletableFuture<Optional<LookupResult>> findRedirectLookupResultAsync(ServiceUnitId bundle) {
    final String namespace = bundle.getNamespaceObject().toString();
    if (!isSLAOrHeartbeatNamespace(namespace)) {
        return redirectManager.findRedirectLookupResultAsync();
    }
    return CompletableFuture.completedFuture(Optional.empty());
}
/**
 * Asynchronously resolves the bundle that contains {@code topic}.
 *
 * @param topic the topic whose bundle is wanted
 * @return a future holding the bundle found among the bundles of the topic's namespace
 */
public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
    return bundleFactory
            .getBundlesAsync(topic.getNamespaceObject())
            .thenApply(namespaceBundles -> namespaceBundles.findBundle(topic));
}
/**
 * Resolves the bundle that contains {@code topicName}, but only when the bundle factory already has the
 * bundles of the topic's namespace at hand.
 *
 * @param topicName the topic whose bundle is wanted
 * @return the bundle, or an empty optional when the namespace's bundles are not present
 */
public Optional<NamespaceBundle> getBundleIfPresent(TopicName topicName) {
    return bundleFactory
            .getBundlesIfPresent(topicName.getNamespaceObject())
            .map(namespaceBundles -> namespaceBundles.findBundle(topicName));
}
/**
 * Resolves the bundle that contains {@code topicName}.
 *
 * @param topicName the topic whose bundle is wanted
 * @return the bundle found among the bundles of the topic's namespace
 */
public NamespaceBundle getBundle(TopicName topicName) {
    final var namespaceBundles = bundleFactory.getBundles(topicName.getNamespaceObject());
    return namespaceBundles.findBundle(topicName);
}
/**
 * Returns the number of bundles of {@code namespace}.
 *
 * @param namespace the namespace to inspect
 * @return the size reported by the namespace's bundles
 * @throws Exception declared for callers; propagated from the bundle factory if it fails
 */
public int getBundleCount(NamespaceName namespace) throws Exception {
    final var namespaceBundles = bundleFactory.getBundles(namespace);
    return namespaceBundles.size();
}
/**
 * Returns the full bundle of the namespace, as produced by the bundle factory.
 *
 * @param fqnn the namespace name
 * @return the namespace's full bundle
 * @throws Exception declared for callers; propagated from the bundle factory if it fails
 */
private NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
    final NamespaceBundle fullBundle = bundleFactory.getFullBundle(fqnn);
    return fullBundle;
}
/**
 * Asynchronous counterpart of {@code getFullBundle}: delegates to the bundle factory.
 *
 * @param fqnn the namespace name
 * @return a future holding the namespace's full bundle
 */
private CompletableFuture<NamespaceBundle> getFullBundleAsync(NamespaceName fqnn) {
    final CompletableFuture<NamespaceBundle> fullBundleFuture = bundleFactory.getFullBundleAsync(fqnn);
    return fullBundleFuture;
}
/**
 * Asynchronously returns the web service URL of the broker that owns a service unit.
 * <p>
 * A topic is resolved through its bundle, a namespace through its full bundle, and a bundle is used
 * directly. If the service unit is not owned, the future holds an empty optional.
 *
 * @param suName  the service unit: a {@link TopicName}, {@link NamespaceName} or {@link NamespaceBundle}
 * @param options the lookup options
 * @return a future holding the owner's web service URL, if any
 * @throws IllegalArgumentException if {@code suName} is none of the supported types
 */
public CompletableFuture<Optional<URL>> getWebServiceUrlAsync(ServiceUnitId suName, LookupOptions options) {
    if (suName instanceof TopicName topic) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting web service URL of topic: {} - options: {}", topic, options);
        }
        return getBundleAsync(topic)
                .thenCompose(topicBundle -> internalGetWebServiceUrl(topic, topicBundle, options));
    } else if (suName instanceof NamespaceName namespace) {
        return getFullBundleAsync(namespace)
                .thenCompose(fullBundle -> internalGetWebServiceUrl(null, fullBundle, options));
    } else if (suName instanceof NamespaceBundle bundle) {
        return internalGetWebServiceUrl(null, bundle, options);
    }
    throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName());
}
/**
 * Blocking variant of {@link #getWebServiceUrlAsync}: waits for the result for at most the configured
 * metadata store operation timeout (in seconds).
 * <p>
 * If the service unit is not owned, an empty optional is returned.
 *
 * @param suName  the service unit to look up
 * @param options the lookup options
 * @return the owner's web service URL, if any
 * @throws Exception if the lookup fails, is interrupted, or does not complete within the timeout
 */
public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception {
    final CompletableFuture<Optional<URL>> urlFuture = getWebServiceUrlAsync(suName, options);
    return urlFuture.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
}
/**
 * Resolves the web service URL of the broker responsible for {@code bundle}.
 * <p>
 * A redirect result takes precedence when one is available; otherwise the lookup goes to the extensible
 * load manager when it is enabled, or to {@link #findBrokerServiceUrl} when it is not. The HTTPS or HTTP
 * URL is picked from the lookup data according to {@code options.isRequestHttps()}.
 *
 * @param topic   the topic being looked up, or {@code null} when the lookup is not for a topic
 * @param bundle  the bundle to resolve
 * @param options the lookup options
 * @return a future holding the URL, or an empty optional when there is no lookup result or the URL cannot
 *         be built
 */
private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(@Nullable ServiceUnitId topic,
                                                                  NamespaceBundle bundle,
                                                                  LookupOptions options) {
    return findRedirectLookupResultAsync(bundle).thenCompose(redirect -> {
        final CompletableFuture<Optional<LookupResult>> resultFuture;
        if (redirect.isPresent()) {
            LOG.info("[{}] Redirect lookup request to {} for topic {}",
                    pulsar.getBrokerId(), redirect.get(), topic);
            resultFuture = CompletableFuture.completedFuture(redirect);
        } else if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
            resultFuture = loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle);
        } else {
            resultFuture = findBrokerServiceUrl(bundle, options);
        }
        // Both the redirect and the regular result are turned into a URL the same way.
        return resultFuture.thenApply(found -> {
            if (found.isEmpty()) {
                return Optional.empty();
            }
            try {
                LookupData data = found.get().getLookupData();
                final String webUrl;
                if (options.isRequestHttps()) {
                    webUrl = data.getHttpUrlTls();
                } else {
                    webUrl = data.getHttpUrl();
                }
                return Optional.of(new URL(webUrl));
            } catch (Exception e) {
                // just log the exception, nothing else to do
                LOG.warn("internalGetWebServiceUrl [{}]", e.getMessage(), e);
                return Optional.empty();
            }
        });
    });
}
/**
 * Registers every bootstrap namespace: both heartbeat namespaces of this broker (ownership enforced),
 * followed by the namespaces listed in the configuration (ownership not enforced).
 *
 * @throws PulsarServerException if an unexpected error occurs
 */
public void registerBootstrapNamespaces() throws PulsarServerException {
    final String brokerId = pulsar.getBrokerId();

    // ensure that we own the heartbeat namespace
    final var heartbeatNamespace = getHeartbeatNamespace(brokerId, config);
    if (registerNamespace(heartbeatNamespace, true)) {
        LOG.info("added heartbeat namespace name in local cache: ns={}", heartbeatNamespace);
    }

    // ensure that we also own the v2 heartbeat namespace
    final var heartbeatNamespaceV2 = getHeartbeatNamespaceV2(brokerId, config);
    if (registerNamespace(heartbeatNamespaceV2, true)) {
        LOG.info("added heartbeat namespace name in local cache: ns={}", heartbeatNamespaceV2);
    }

    // we may not need strict ownership checking for bootstrap names for now
    for (String bootstrapNamespace : config.getBootstrapNamespaces()) {
        final boolean registered = registerNamespace(NamespaceName.get(bootstrapNamespace), false);
        if (registered) {
            LOG.info("added bootstrap namespace name in local cache: ns={}", bootstrapNamespace);
        }
    }
}
/**
 * Tries to register a namespace to this instance.
 * <p>
 * Ownership of the namespace's full bundle is acquired synchronously, through the extensible load manager
 * when it is enabled and through the ownership cache otherwise. The namespace counts as registered when
 * the resulting owner's native URL (plain or TLS) equals this broker's service URL.
 *
 * @param nsname      namespace name
 * @param ensureOwned sets the behavior when the namespace is already owned by another broker.
 *                    If this flag is set to true, then the method will throw an exception.
 *                    If this flag is set to false, then the method will return false.
 * @return true if the namespace was successfully registered, false otherwise
 * @throws PulsarServerException if an error occurs when registering the namespace, including the case
 *                               where {@code ensureOwned} is set and another broker owns the namespace
 */
public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) throws PulsarServerException {
    try {
        // all pre-registered namespace is assumed to have bundles disabled
        NamespaceBundle nsFullBundle = bundleFactory.getFullBundle(nsname);
        // v2 namespace will always use full bundle object
        final NamespaceEphemeralData otherData;
        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
            ExtensibleLoadManagerImpl extensibleLoadManager =
                    ExtensibleLoadManagerImpl.get(this.loadManager.get());
            otherData = extensibleLoadManager.tryAcquiringOwnership(nsFullBundle).get();
        } else {
            otherData = ownershipCache.tryAcquiringOwnership(nsFullBundle).get();
        }
        if (StringUtils.equals(pulsar.getBrokerServiceUrl(), otherData.getNativeUrl())
                || StringUtils.equals(pulsar.getBrokerServiceUrlTls(), otherData.getNativeUrlTls())) {
            if (nsFullBundle != null) {
                // preload heartbeat namespace
                pulsar.loadNamespaceTopics(nsFullBundle);
            }
            return true;
        }
        String msg = String.format("namespace already owned by other broker : ns=%s expected=%s actual=%s",
                nsname,
                StringUtils.defaultString(pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls()),
                StringUtils.defaultString(otherData.getNativeUrl(), otherData.getNativeUrlTls()));
        // ignore if not be owned for now
        if (!ensureOwned) {
            LOG.info(msg);
            return false;
        }
        // should not happen
        throw new IllegalStateException(msg);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // The blocking get() above was interrupted. Wrapping the exception would otherwise lose the
            // interruption, so restore the flag for code further up the stack.
            Thread.currentThread().interrupt();
        }
        LOG.error(e.getMessage(), e);
        throw new PulsarServerException(e);
    }
}
// In-flight lookups started by findBrokerServiceUrl() with authoritative options, keyed by bundle.
// An entry is added through computeIfAbsent and removed once its future completes.
private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesAuthoritative =
ConcurrentOpenHashMap.<NamespaceBundle,
CompletableFuture<Optional<LookupResult>>>newBuilder()
.build();
// Same as above, for lookups whose options are not authoritative.
private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesNotAuthoritative =
ConcurrentOpenHashMap.<NamespaceBundle,
CompletableFuture<Optional<LookupResult>>>newBuilder()
.build();
/**
* Main internal method to lookup and setup ownership of service unit to a broker.
* <p>
* Lookups for the same bundle share one future while it is in flight: the future is stored in
* {@code findingBundlesAuthoritative} or {@code findingBundlesNotAuthoritative}, depending on
* {@code options.isAuthoritative()}, and is removed again once it completes.
* <p>
* The future completes with the current owner when there is one, fails with an
* {@link IllegalStateException} when the owner entry is disabled, completes empty for a read-only lookup of
* an unowned bundle, and otherwise is handed to {@code searchForCandidateBroker} on the broker executor.
*
* @param bundle the namespace bundle
* @param options the lookup options
* @return the lookup result
*/
private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
NamespaceBundle bundle, LookupOptions options) {
if (LOG.isDebugEnabled()) {
LOG.debug("findBrokerServiceUrl: {} - options: {}", bundle, options);
}
// Authoritative and non-authoritative lookups are tracked in separate maps.
ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap;
if (options.isAuthoritative()) {
targetMap = findingBundlesAuthoritative;
} else {
targetMap = findingBundlesNotAuthoritative;
}
// Reuse the pending future for this bundle if there is one; otherwise start a new lookup.
return targetMap.computeIfAbsent(bundle, (k) -> {
CompletableFuture<Optional<LookupResult>> future = new CompletableFuture<>();
// First check if we or someone else already owns the bundle
ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> {
if (nsData.isEmpty()) {
// No one owns this bundle
if (options.isReadOnly()) {
// Do not attempt to acquire ownership
future.complete(Optional.empty());
} else {
// Now, no one owns the namespace yet. Hence, we will try to dynamically assign it
pulsar.getExecutor().execute(() -> searchForCandidateBroker(bundle, future, options));
}
} else if (nsData.get().isDisabled()) {
// The owner entry is flagged as disabled: fail the lookup instead of returning that owner.
future.completeExceptionally(
new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData);
}
// find the target
if (options.hasAdvertisedListenerName()) {
// A specific advertised listener was requested: answer with that listener's service URLs.
AdvertisedListener listener =
nsData.get().getAdvertisedListeners().get(options.getAdvertisedListenerName());
if (listener == null) {
future.completeExceptionally(
new PulsarServerException("the broker do not have "
+ options.getAdvertisedListenerName() + " listener"));
} else {
URI url = listener.getBrokerServiceUrl();
URI urlTls = listener.getBrokerServiceUrlTls();
future.complete(Optional.of(new LookupResult(nsData.get(),
url == null ? null : url.toString(),
urlTls == null ? null : urlTls.toString())));
}
} else {
future.complete(Optional.of(new LookupResult(nsData.get())));
}
}
}).exceptionally(exception -> {
LOG.warn("Failed to check owner for bundle {}: {}", bundle, exception.getMessage(), exception);
future.completeExceptionally(exception);
return null;
});
// Drop the entry once the lookup has finished so that later lookups start afresh. The removal is
// dispatched to the broker executor rather than run inline.
// NOTE(review): presumably so the map is never modified from inside computeIfAbsent when the future
// completes synchronously - confirm.
future.whenComplete((r, t) -> pulsar.getExecutor().execute(
() -> targetMap.remove(bundle)
));
return future;
});
}
/**
 * Picks the broker that should own {@code bundle} and completes {@code lookupFuture} accordingly.
 * <p>
 * Candidate selection, in order:
 * <ol>
 *   <li>the broker named by a heartbeat (v1 or v2) namespace;</li>
 *   <li>the broker named by an SLA-monitor namespace, if that broker is active;</li>
 *   <li>this broker, when the lookup is authoritative;</li>
 *   <li>the least loaded broker, when the load manager is not centralized, this broker is the leader, or
 *       the leader is unknown or inactive;</li>
 *   <li>otherwise the current leader, which is asked to make the assignment.</li>
 * </ol>
 * If the candidate is this broker, ownership is acquired locally and the owner is returned; otherwise a
 * redirect to the candidate is returned.
 *
 * @param bundle       the bundle that currently has no owner
 * @param lookupFuture the future to complete with the lookup result, an empty optional, or a failure
 * @param options      the lookup options
 */
private void searchForCandidateBroker(NamespaceBundle bundle,
                                      CompletableFuture<Optional<LookupResult>> lookupFuture,
                                      LookupOptions options) {
    // Read the leader election service once and use the same instance throughout. It can still be null
    // here because the broker service is initialized first and may start receiving lookup requests
    // before the leader election service is fully initialized.
    final LeaderElectionService les = pulsar.getLeaderElectionService();
    if (les == null) {
        LOG.warn("The leader election has not yet been completed! NamespaceBundle[{}]", bundle);
        lookupFuture.completeExceptionally(
                new IllegalStateException("The leader election has not yet been completed!"));
        return;
    }
    String candidateBroker;
    boolean authoritativeRedirect = les.isLeader();
    try {
        // check if this is Heartbeat or SLAMonitor namespace
        candidateBroker = checkHeartbeatNamespace(bundle);
        if (candidateBroker == null) {
            candidateBroker = checkHeartbeatNamespaceV2(bundle);
        }
        if (candidateBroker == null) {
            String broker = getSLAMonitorBrokerName(bundle);
            // checking if the broker is up and running
            if (broker != null && isBrokerActive(broker)) {
                candidateBroker = broker;
            }
        }
        if (candidateBroker == null) {
            Optional<LeaderBroker> currentLeader = les.getCurrentLeader();
            if (options.isAuthoritative()) {
                // leader broker already assigned the current broker as owner
                candidateBroker = pulsar.getBrokerId();
            } else {
                LoadManager loadManager = this.loadManager.get();
                boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader();
                if (!makeLoadManagerDecisionOnThisBroker) {
                    // If leader is not active, fallback to pick the least loaded from current broker loadmanager
                    boolean leaderBrokerActive = currentLeader.isPresent()
                            && isBrokerActive(currentLeader.get().getBrokerId());
                    if (!leaderBrokerActive) {
                        makeLoadManagerDecisionOnThisBroker = true;
                        if (currentLeader.isEmpty()) {
                            LOG.warn(
                                    "The information about the current leader broker wasn't available. "
                                            + "Handling load manager decisions in a decentralized way. "
                                            + "NamespaceBundle[{}]",
                                    bundle);
                        } else {
                            LOG.warn(
                                    "The current leader broker {} isn't active. "
                                            + "Handling load manager decisions in a decentralized way. "
                                            + "NamespaceBundle[{}]",
                                    currentLeader.get(), bundle);
                        }
                    }
                }
                if (makeLoadManagerDecisionOnThisBroker) {
                    Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
                    if (availableBroker.isEmpty()) {
                        LOG.warn("Load manager didn't return any available broker. "
                                        + "Returning empty result to lookup. NamespaceBundle[{}]",
                                bundle);
                        lookupFuture.complete(Optional.empty());
                        return;
                    }
                    candidateBroker = availableBroker.get();
                    // The decision was taken here, so the target broker must not redirect again.
                    authoritativeRedirect = true;
                } else {
                    // forward to leader broker to make assignment
                    // (the leader is known to be present here: it was just found to be active)
                    candidateBroker = currentLeader.get().getBrokerId();
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Error when searching for candidate broker to acquire {}: {}", bundle, e.getMessage(), e);
        lookupFuture.completeExceptionally(e);
        return;
    }
    try {
        Objects.requireNonNull(candidateBroker);
        if (candidateBroker.equals(pulsar.getBrokerId())) {
            // Load manager decided that the local broker should try to become the owner
            ownershipCache.tryAcquiringOwnership(bundle).thenAccept(ownerInfo -> {
                if (ownerInfo.isDisabled()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Namespace bundle {} is currently being unloaded", bundle);
                    }
                    lookupFuture.completeExceptionally(new IllegalStateException(
                            String.format("Namespace bundle %s is currently being unloaded", bundle)));
                } else {
                    // Found owner for the namespace bundle
                    if (options.isLoadTopicsInBundle()) {
                        // Schedule the task to preload topics
                        pulsar.loadNamespaceTopics(bundle);
                    }
                    // find the target
                    if (options.hasAdvertisedListenerName()) {
                        AdvertisedListener listener =
                                ownerInfo.getAdvertisedListeners().get(options.getAdvertisedListenerName());
                        if (listener == null) {
                            lookupFuture.completeExceptionally(
                                    new PulsarServerException("the broker do not have "
                                            + options.getAdvertisedListenerName() + " listener"));
                        } else {
                            URI url = listener.getBrokerServiceUrl();
                            URI urlTls = listener.getBrokerServiceUrlTls();
                            lookupFuture.complete(Optional.of(
                                    new LookupResult(ownerInfo,
                                            url == null ? null : url.toString(),
                                            urlTls == null ? null : urlTls.toString())));
                        }
                    } else {
                        lookupFuture.complete(Optional.of(new LookupResult(ownerInfo)));
                    }
                }
            }).exceptionally(exception -> {
                // Message for the placeholder, throwable as the trailing argument, like every other warn
                // in this class.
                LOG.warn("Failed to acquire ownership for namespace bundle {}: {}",
                        bundle, exception.getMessage(), exception);
                lookupFuture.completeExceptionally(new PulsarServerException(
                        "Failed to acquire ownership for namespace bundle " + bundle, exception));
                return null;
            });
        } else {
            // Load managed decider some other broker should try to acquire ownership
            if (LOG.isDebugEnabled()) {
                LOG.debug("Redirecting to broker {} to acquire ownership of bundle {}", candidateBroker, bundle);
            }
            // Now setting the redirect url
            createLookupResult(candidateBroker, authoritativeRedirect, options.getAdvertisedListenerName())
                    .thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
                    .exceptionally(ex -> {
                        lookupFuture.completeExceptionally(ex);
                        return null;
                    });
        }
    } catch (Exception e) {
        LOG.warn("Error in trying to acquire namespace bundle ownership for {}: {}", bundle, e.getMessage(), e);
        lookupFuture.completeExceptionally(e);
    }
}
/**
 * Builds a lookup result that points at {@code candidateBroker}, using the broker data stored under
 * {@code LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + candidateBroker}.
 * <p>
 * When an advertised listener name is given, that listener's broker service URLs are used; otherwise the
 * broker's own Pulsar service URLs are used. The web service URLs always come from the broker data.
 *
 * @param candidateBroker        the target broker id; must not be blank
 * @param authoritativeRedirect  the authoritative flag to carry in the result
 * @param advertisedListenerName the advertised listener to resolve, or blank for the default URLs
 * @return a future holding the lookup result; it fails when the broker id is blank, when no broker data
 *         exists for it, or when the broker lacks the requested listener
 */
public CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
                                                          final String advertisedListenerName) {
    final CompletableFuture<LookupResult> resultFuture = new CompletableFuture<>();
    try {
        checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null %s", candidateBroker);
        final String brokerPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + candidateBroker;
        localBrokerDataCache.get(brokerPath).thenAccept(maybeBrokerData -> {
            if (maybeBrokerData.isEmpty()) {
                resultFuture.completeExceptionally(new MetadataStoreException.NotFoundException(brokerPath));
                return;
            }
            final LocalBrokerData brokerData = maybeBrokerData.get();
            if (!StringUtils.isNotBlank(advertisedListenerName)) {
                // No specific listener requested: use the broker's own service URLs.
                resultFuture.complete(new LookupResult(brokerData.getWebServiceUrl(),
                        brokerData.getWebServiceUrlTls(), brokerData.getPulsarServiceUrl(),
                        brokerData.getPulsarServiceUrlTls(), authoritativeRedirect));
                return;
            }
            final AdvertisedListener listener = brokerData.getAdvertisedListeners().get(advertisedListenerName);
            if (listener == null) {
                resultFuture.completeExceptionally(
                        new PulsarServerException(
                                "the broker do not have " + advertisedListenerName + " listener"));
                return;
            }
            final URI serviceUrl = listener.getBrokerServiceUrl();
            final URI serviceUrlTls = listener.getBrokerServiceUrlTls();
            resultFuture.complete(new LookupResult(brokerData.getWebServiceUrl(),
                    brokerData.getWebServiceUrlTls(), serviceUrl == null ? null : serviceUrl.toString(),
                    serviceUrlTls == null ? null : serviceUrlTls.toString(), authoritativeRedirect));
        }).exceptionally(failure -> {
            resultFuture.completeExceptionally(failure);
            return null;
        });
    } catch (Exception e) {
        resultFuture.completeExceptionally(e);
    }
    return resultFuture;
}
/**
 * Tells whether {@code candidateBroker} is among the brokers the load manager reports as available.
 *
 * @param candidateBroker the broker id to check
 * @return true if the broker is listed as available, false otherwise (a warning is logged)
 */
public boolean isBrokerActive(String candidateBroker) {
    final Set<String> knownBrokers = getAvailableBrokers();
    if (!knownBrokers.contains(candidateBroker)) {
        LOG.warn("Broker {} couldn't be found in available brokers {}",
                candidateBroker, String.join(",", knownBrokers));
        return false;
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Broker {} is available for.", candidateBroker);
    }
    return true;
}
/**
 * Returns the brokers the current load manager reports as available.
 *
 * @return the set of available broker ids
 * @throws RuntimeException wrapping any exception raised while querying the load manager
 */
private Set<String> getAvailableBrokers() {
    final LoadManager currentLoadManager = loadManager.get();
    try {
        return currentLoadManager.getAvailableBrokers();
    } catch (Exception cause) {
        throw new RuntimeException(cause);
    }
}
/**
 * Asks the current load manager for the least loaded broker for a service unit.
 *
 * @param serviceUnit the service unit
 * @return the resource id of the least loaded broker, or an empty optional when the load manager
 *         returned none
 * @throws Exception if the load manager fails
 */
private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
    final Optional<ResourceUnit> candidate = loadManager.get().getLeastLoaded(serviceUnit);
    if (candidate.isPresent()) {
        final String lookupAddress = candidate.get().getResourceId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
                    pulsar.getBrokerId(),
                    lookupAddress);
        }
        return Optional.of(lookupAddress);
    }
    LOG.warn("No broker is available for {}", serviceUnit);
    return Optional.empty();
}
/**
 * Unloads {@code bundle} without naming a destination broker.
 *
 * @param bundle the bundle to unload
 * @return a future that completes when the unload request has been handled
 */
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
    final Optional<String> noDestination = Optional.empty();
    return unloadNamespaceBundle(bundle, noDestination);
}
/**
 * Unloads {@code bundle} using the configured namespace bundle unloading timeout (milliseconds).
 *
 * @param bundle            the bundle to unload
 * @param destinationBroker the broker the bundle should move to, if any
 * @return a future that completes when the unload request has been handled
 */
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle, Optional<String> destinationBroker) {
    // unload namespace bundle
    return unloadNamespaceBundle(
            bundle,
            destinationBroker,
            config.getNamespaceBundleUnloadingTimeoutMs(),
            MILLISECONDS);
}
/**
 * Unloads {@code bundle} with an explicit timeout, closing without waiting for clients to disconnect.
 *
 * @param bundle            the bundle to unload
 * @param destinationBroker the broker the bundle should move to, if any
 * @param timeout           the unload timeout
 * @param timeoutUnit       the unit of {@code timeout}
 * @return a future that completes when the unload request has been handled
 */
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
                                                     Optional<String> destinationBroker,
                                                     long timeout,
                                                     TimeUnit timeoutUnit) {
    final boolean closeWithoutWaitingClientDisconnect = true;
    return unloadNamespaceBundle(bundle, destinationBroker, timeout, timeoutUnit,
            closeWithoutWaitingClientDisconnect);
}
/**
 * Unloads {@code bundle} with an explicit timeout and no destination broker, closing without waiting for
 * clients to disconnect.
 *
 * @param bundle      the bundle to unload
 * @param timeout     the unload timeout
 * @param timeoutUnit the unit of {@code timeout}
 * @return a future that completes when the unload request has been handled
 */
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
                                                     long timeout,
                                                     TimeUnit timeoutUnit) {
    final Optional<String> noDestination = Optional.empty();
    final boolean closeWithoutWaitingClientDisconnect = true;
    return unloadNamespaceBundle(bundle, noDestination, timeout, timeoutUnit,
            closeWithoutWaitingClientDisconnect);
}
/**
 * Unloads {@code bundle} with an explicit timeout and no destination broker.
 *
 * @param bundle                              the bundle to unload
 * @param timeout                             the unload timeout
 * @param timeoutUnit                         the unit of {@code timeout}
 * @param closeWithoutWaitingClientDisconnect forwarded unchanged to the full overload
 * @return a future that completes when the unload request has been handled
 */
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
                                                     long timeout,
                                                     TimeUnit timeoutUnit,
                                                     boolean closeWithoutWaitingClientDisconnect) {
    final Optional<String> noDestination = Optional.empty();
    return unloadNamespaceBundle(bundle, noDestination, timeout, timeoutUnit,
            closeWithoutWaitingClientDisconnect);
}
/**
 * Unloads {@code bundle}.
 * <p>
 * With the extensible load manager enabled the request is delegated to it, and only {@code bundle} and
 * {@code destinationBroker} are passed on. Otherwise the bundle must currently be owned according to the
 * ownership cache, and the owned bundle handles the unload request.
 *
 * @param bundle                              the bundle to unload
 * @param destinationBroker                   the broker the bundle should move to, if any
 * @param timeout                             the unload timeout
 * @param timeoutUnit                         the unit of {@code timeout}
 * @param closeWithoutWaitingClientDisconnect forwarded to the owned bundle's unload handling
 * @return a future that completes when the unload request has been handled; it fails with an
 *         {@link IllegalStateException} when the bundle is not currently owned
 */
public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
                                                     Optional<String> destinationBroker,
                                                     long timeout,
                                                     TimeUnit timeoutUnit,
                                                     boolean closeWithoutWaitingClientDisconnect) {
    if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
        return ExtensibleLoadManagerImpl.get(loadManager.get())
                .unloadNamespaceBundleAsync(bundle, destinationBroker);
    }
    // unload namespace bundle
    final OwnedBundle ownedBundle = ownershipCache.getOwnedBundle(bundle);
    if (ownedBundle != null) {
        return ownedBundle.handleUnloadRequest(pulsar, timeout, timeoutUnit, closeWithoutWaitingClientDisconnect);
    }
    return FutureUtil.failedFuture(new IllegalStateException("Bundle " + bundle + " is not currently owned"));
}
/**
 * Checks whether an entry exists in the local metadata store at the bundle's service-unit path.
 *
 * @param bundle the bundle to check
 * @return a future holding true when the path exists
 */
public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle) {
    final var serviceUnitPath = ServiceUnitUtils.path(bundle);
    return pulsar.getLocalMetadataStore().exists(serviceUnitPath);
}
/**
 * Reports the ownership status of every bundle this broker owns, keyed by the bundle's string form.
 * <p>
 * The isolation policies of this cluster are loaded first (an empty set is used when none are stored).
 * Owned bundles then come from the extensible load manager when it is enabled, where each is reported as
 * active, or from the ownership cache otherwise, where the owned bundle's own active flag is used.
 *
 * @return a future holding the ownership status per bundle
 */
public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpacesStatusAsync() {
    return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
            .getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
            .thenApply(maybePolicies -> maybePolicies.orElseGet(NamespaceIsolationPolicies::new))
            .thenCompose(isolationPolicies -> {
                if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
                    Collection<CompletableFuture<OwnedBundle>> ownedFutures =
                            ownershipCache.getOwnedBundlesAsync().values();
                    // join() below cannot block: every future has completed once waitForAll is done.
                    return FutureUtil.waitForAll(ownedFutures)
                            .thenApply(ignored -> ownedFutures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toMap(
                                            owned -> owned.getNamespaceBundle().toString(),
                                            owned -> getNamespaceOwnershipStatus(owned.isActive(),
                                                    isolationPolicies.getPolicyByNamespace(
                                                            owned.getNamespaceBundle().getNamespaceObject()))
                                    ))
                            );
                }
                ExtensibleLoadManagerImpl extensibleLoadManager =
                        ExtensibleLoadManagerImpl.get(loadManager.get());
                return extensibleLoadManager.getOwnedServiceUnitsAsync()
                        .thenApply(ownedUnits -> ownedUnits.stream()
                                .collect(Collectors.toMap(NamespaceBundle::toString,
                                        unit -> getNamespaceOwnershipStatus(true,
                                                isolationPolicies.getPolicyByNamespace(
                                                        unit.getNamespaceObject())))));
            });
}
/**
 * Builds the ownership status of a namespace from its isolation policy.
 * <p>
 * Without a policy the namespace is reported as uncontrolled and shared. With a policy it is reported as
 * controlled, and the assignment becomes primary or secondary when this broker's advertised address is a
 * primary or secondary broker of the policy; otherwise the assignment stays shared.
 *
 * @param isActive          the active flag to report
 * @param nsIsolationPolicy the isolation policy matching the namespace, or {@code null} if there is none
 * @return the ownership status
 */
private NamespaceOwnershipStatus getNamespaceOwnershipStatus(boolean isActive,
                                                             NamespaceIsolationPolicy nsIsolationPolicy) {
    final NamespaceOwnershipStatus status =
            new NamespaceOwnershipStatus(BrokerAssignment.shared, false, isActive);
    if (nsIsolationPolicy != null) {
        // found corresponding policy, set the status to controlled
        status.is_controlled = true;
        if (nsIsolationPolicy.isPrimaryBroker(pulsar.getAdvertisedAddress())) {
            status.broker_assignment = BrokerAssignment.primary;
        } else if (nsIsolationPolicy.isSecondaryBroker(pulsar.getAdvertisedAddress())) {
            status.broker_assignment = BrokerAssignment.secondary;
        }
    }
    // with no matching policy this namespace is an uncontrolled one using a shared broker
    return status;
}
/**
 * Tells whether the owner data currently available for the bundle marks it as disabled.
 * <p>
 * The lookup does not wait: the owner future is read with {@code getNow(null)}, so a future that has not
 * completed yet is treated like "no owner data". Any exception raised during the lookup is logged and
 * reported as "not disabled".
 *
 * @param bundle the bundle to check
 * @return {@code true} only when owner data is present and flagged as disabled
 */
public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception {
    try {
        CompletableFuture<Optional<NamespaceEphemeralData>> ownerFuture = ownershipCache.getOwnerAsync(bundle);
        if (ownerFuture == null) {
            // if namespace is not owned, it is not considered disabled
            return false;
        }
        Optional<NamespaceEphemeralData> ownerData = ownerFuture.getNow(null);
        return ownerData != null && ownerData.isPresent() && ownerData.get().isDisabled();
    } catch (Exception e) {
        LOG.warn("Exception in getting ownership info for service unit {}: {}", bundle, e.getMessage(), e);
        return false;
    }
}
/**
 * Splits the given bundle.
 * <p>
 * When the load manager extension is enabled the request is handed to the extensible load manager.
 * Otherwise the split is driven by {@code splitAndOwnBundleOnceAndRetry}, starting with a retry budget of
 * {@code BUNDLE_SPLIT_RETRY_LIMIT}.
 *
 * @param bundle the bundle to split
 * @param unload whether to unload the new split bundles
 * @param splitAlgorithm the algorithm to split the bundle
 * @param boundaries the boundaries to split the bundle
 * @return a future that completes when the split operation finishes
 */
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
                                                 NamespaceBundleSplitAlgorithm splitAlgorithm,
                                                 List<Long> boundaries) {
    if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
        final CompletableFuture<Void> splitFuture = new CompletableFuture<>();
        final AtomicInteger retryBudget = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);
        splitAndOwnBundleOnceAndRetry(bundle, unload, retryBudget, splitFuture, splitAlgorithm, boundaries);
        return splitFuture;
    }
    return ExtensibleLoadManagerImpl.get(loadManager.get())
            .splitNamespaceBundleAsync(bundle, splitAlgorithm, boundaries);
}
/**
 * Runs one split attempt for {@code bundle} and re-schedules itself when the bundle update fails with a
 * {@code MetadataStoreException.BadVersionException} cause while {@code counter} still allows retries.
 * <p>
 * One attempt asks {@code splitAlgorithm} for the split boundaries, splits the bundle through the bundle
 * factory, acquires ownership of each new bundle, stores the new bundle layout, and finally disables and
 * releases the original bundle.
 *
 * @param bundle the bundle to split
 * @param unload whether the new bundles are unloaded after a successful split
 * @param counter remaining retry budget; decremented on each retryable failure
 * @param completionFuture completed with {@code null} on success or when no valid boundary is found,
 *                         completed exceptionally otherwise
 * @param splitAlgorithm the algorithm that provides the split boundaries
 * @param boundaries the requested boundaries, passed to the split option
 */
void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                   boolean unload,
                                   AtomicInteger counter,
                                   CompletableFuture<Void> completionFuture,
                                   NamespaceBundleSplitAlgorithm splitAlgorithm,
                                   List<Long> boundaries) {
    BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);
    splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
        CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
        if (ex == null) {
            if (splitBoundaries == null || splitBoundaries.size() == 0) {
                LOG.info("[{}] No valid boundary found in {} to split bundle {}",
                        bundle.getNamespaceObject().toString(), boundaries, bundle.getBundleRange());
                completionFuture.complete(null);
                return;
            }
            try {
                bundleFactory.splitBundles(bundle, splitBoundaries.size() + 1, splitBoundaries)
                        .thenAccept(splitBundles -> {
                            // Split and updateNamespaceBundles. Update may fail because of concurrent write to
                            // Zookeeper.
                            if (splitBundles == null) {
                                String msg = format("bundle %s not found under namespace", bundle.toString());
                                LOG.warn(msg);
                                updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
                                return;
                            }
                            Objects.requireNonNull(splitBundles.getLeft());
                            Objects.requireNonNull(splitBundles.getRight());
                            checkArgument(splitBundles.getRight().size() == splitBoundaries.size() + 1,
                                    "bundle has to be split in " + (splitBoundaries.size() + 1) + " bundles");
                            NamespaceName nsname = bundle.getNamespaceObject();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, bundles: {}",
                                        nsname.toString(), bundle.getBundleRange(), counter.get(),
                                        splitBundles.getRight());
                            }
                            try {
                                // take ownership of newly split bundles
                                for (NamespaceBundle sBundle : splitBundles.getRight()) {
                                    Objects.requireNonNull(ownershipCache.tryAcquiringOwnership(sBundle));
                                }
                                updateNamespaceBundles(nsname, splitBundles.getLeft()).thenCompose(__ ->
                                                updateNamespaceBundlesForPolicies(nsname, splitBundles.getLeft()))
                                        .thenRun(() -> {
                                            bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());
                                            updateFuture.complete(splitBundles.getRight());
                                        }).exceptionally(ex1 -> {
                                            String msg = format("failed to update namespace policies [%s], "
                                                            + "NamespaceBundle: %s due to %s",
                                                    nsname.toString(), bundle.getBundleRange(), ex1.getMessage());
                                            LOG.warn(msg);
                                            // keep the underlying cause so the BadVersion retry check below
                                            // can see it through getCause()
                                            updateFuture.completeExceptionally(
                                                    new ServiceUnitNotReadyException(msg, ex1.getCause()));
                                            return null;
                                        });
                            } catch (Exception e) {
                                String msg = format(
                                        "failed to acquire ownership of split bundle for namespace [%s], %s",
                                        nsname.toString(), e.getMessage());
                                LOG.warn(msg, e);
                                updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg, e));
                            }
                        })
                        .exceptionally(splitEx -> {
                            // Reached when splitBundles itself fails or when the callback above throws
                            // (requireNonNull / checkArgument). Without this the updateFuture would never
                            // complete and the split would hang. Completing an already-completed future
                            // is a no-op.
                            Throwable cause = splitEx;
                            if (splitEx instanceof java.util.concurrent.CompletionException
                                    && splitEx.getCause() != null) {
                                cause = splitEx.getCause();
                            }
                            updateFuture.completeExceptionally(cause);
                            return null;
                        });
            } catch (Exception e) {
                updateFuture.completeExceptionally(e);
            }
        } else {
            updateFuture.completeExceptionally(ex);
        }
        // If success updateNamespaceBundles, then do invalidateBundleCache and unload.
        // Else retry splitAndOwnBundleOnceAndRetry.
        updateFuture.whenCompleteAsync((r, t) -> {
            if (t != null) {
                // retry several times on BadVersion
                if ((t.getCause() instanceof MetadataStoreException.BadVersionException)
                        && (counter.decrementAndGet() >= 0)) {
                    pulsar.getExecutor().schedule(() -> pulsar.getOrderedExecutor()
                                    .execute(() -> splitAndOwnBundleOnceAndRetry(
                                            bundle, unload, counter, completionFuture, splitAlgorithm, boundaries)),
                            100, MILLISECONDS);
                } else if (t instanceof IllegalArgumentException) {
                    completionFuture.completeExceptionally(t);
                } else {
                    // Retry enough, or meet other exception
                    String msg2 = format(" %s not success update nsBundles, counter %d, reason %s",
                            bundle.toString(), counter.get(), t.getMessage());
                    LOG.warn(msg2);
                    completionFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2));
                }
                return;
            }
            // success updateNamespaceBundles
            // disable old bundle in memory
            getOwnershipCache().updateBundleState(bundle, false)
                    .thenRun(() -> {
                        // update bundled_topic cache for load-report-generation
                        pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
                        loadManager.get().setLoadReportForceUpdateFlag();
                        // release old bundle from ownership cache
                        pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle);
                        completionFuture.complete(null);
                        if (unload) {
                            // Unload new split bundles, in background. This will not
                            // affect the split operation which is already safely completed
                            r.forEach(this::unloadNamespaceBundle);
                        }
                        onNamespaceBundleSplit(bundle);
                    })
                    .exceptionally(e -> {
                        // Use the exception of THIS stage: the outer `ex` is always null on this path,
                        // so reading it threw an NPE and left completionFuture uncompleted.
                        // Arguments follow the order of the placeholders (bundle, then namespace).
                        String msg1 = format(
                                "failed to disable bundle %s under namespace [%s] with error %s",
                                bundle, bundle.getNamespaceObject().toString(), e.getMessage());
                        LOG.warn(msg1, e);
                        completionFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1));
                        return null;
                    });
        }, pulsar.getOrderedExecutor());
    });
}
/**
 * Computes the split boundaries for a bundle and splits the bundle at those boundaries.
 *
 * @param bundle The bundle to split.
 * @param nsBundleSplitAlgorithm The algorithm used to compute the split boundaries.
 * @param boundaries The specified positions,
 *                   use for {@link org.apache.pulsar.common.naming.SpecifiedPositionsBundleSplitAlgorithm}.
 * @return A pair, left is target namespace bundle, right is split bundles; the future holds {@code null}
 *         when the algorithm yields no boundary.
 */
public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplitBoundary(
        NamespaceBundle bundle, NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm, List<Long> boundaries) {
    return getSplitBoundary(bundle, boundaries, nsBundleSplitAlgorithm).thenCompose(positions -> {
        if (positions != null && !positions.isEmpty()) {
            // n boundaries produce n + 1 bundles
            return pulsar.getNamespaceService().getNamespaceBundleFactory()
                    .splitBundles(bundle, positions.size() + 1, positions);
        }
        LOG.info("[{}] No valid boundary found in {} to split bundle {}",
                bundle.getNamespaceObject().toString(), boundaries, bundle.getBundleRange());
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Asks the given algorithm for the split boundaries of a bundle.
 *
 * @param bundle the bundle to split
 * @param boundaries the requested positions, forwarded through the split option
 * @param nsBundleSplitAlgorithm the algorithm that computes the boundaries
 * @return the future returned by the algorithm
 */
public CompletableFuture<List<Long>> getSplitBoundary(
        NamespaceBundle bundle, List<Long> boundaries, NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm) {
    return nsBundleSplitAlgorithm.getSplitBoundary(getBundleSplitOption(bundle, boundaries, config));
}
/**
 * Creates the option object handed to a split algorithm.
 * <p>
 * When the configured default algorithm is {@code FLOW_OR_QPS_EQUALLY_DIVIDE}, the option additionally
 * carries the bundle's topic stats and the load-balancer thresholds read from the configuration.
 *
 * @param bundle the bundle to split
 * @param boundaries the requested boundaries
 * @param config the configuration the algorithm name and thresholds are read from
 * @return the split option matching the configured default algorithm
 */
private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle,
                                               List<Long> boundaries,
                                               ServiceConfiguration config) {
    final boolean flowOrQpsConfigured = config.getDefaultNamespaceBundleSplitAlgorithm()
            .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE);
    if (!flowOrQpsConfigured) {
        return new BundleSplitOption(this, bundle, boundaries);
    }
    Map<String, TopicStatsImpl> statsByTopic = pulsar.getBrokerService().getTopicStats(bundle);
    return new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
            statsByTopic,
            config.getLoadBalancerNamespaceBundleMaxMsgRate(),
            config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
            config.getFlowOrQpsDifferenceThresholdPercentage());
}
/**
 * Resolves a split algorithm by name with two fallbacks: first the algorithm named in the broker
 * configuration, then {@code RANGE_EQUALLY_DIVIDE_ALGO}.
 *
 * @param algorithmName the requested algorithm name
 * @return the first algorithm that resolves to a non-null value
 */
public NamespaceBundleSplitAlgorithm getNamespaceBundleSplitAlgorithmByName(String algorithmName) {
    final NamespaceBundleSplitAlgorithm requested = NamespaceBundleSplitAlgorithm.of(algorithmName);
    if (requested != null) {
        return requested;
    }
    final NamespaceBundleSplitAlgorithm configured =
            NamespaceBundleSplitAlgorithm.of(pulsar.getConfig().getDefaultNamespaceBundleSplitAlgorithm());
    return configured != null ? configured : NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO;
}
/**
 * Writes the new bundle ranges into the namespace policies.
 * When the namespace has no policies yet, an error is logged and fresh policies holding only the bundle
 * data are created. Update may fail because of concurrent write to Zookeeper.
 *
 * @param nsname the namespace name
 * @param nsBundles the new namespace bundles
 * @return a future that completes when the policies have been written
 */
public CompletableFuture<Void> updateNamespaceBundlesForPolicies(NamespaceName nsname,
                                                                 NamespaceBundles nsBundles) {
    Objects.requireNonNull(nsname);
    Objects.requireNonNull(nsBundles);
    return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(nsname)
            .thenCompose(existingPolicies -> {
                if (!existingPolicies.isPresent()) {
                    LOG.error("Policies of namespace {} is not exist!", nsname);
                    Policies freshPolicies = new Policies();
                    freshPolicies.bundles = nsBundles.getBundlesData();
                    return pulsar.getPulsarResources().getNamespaceResources()
                            .createPoliciesAsync(nsname, freshPolicies);
                }
                return pulsar.getPulsarResources().getNamespaceResources()
                        .setPoliciesAsync(nsname, current -> {
                            current.bundles = nsBundles.getBundlesData();
                            return current;
                        });
            });
}
/**
 * Stores the new bundle ranges as local policies, passing along the version carried by {@code nsBundles}.
 * Update may fail because of concurrent write to Zookeeper.
 *
 * @param nsname the namespace name
 * @param nsBundles the new namespace bundles
 * @return a future that completes when the local policies have been written
 */
public CompletableFuture<Void> updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles) {
    Objects.requireNonNull(nsname);
    Objects.requireNonNull(nsBundles);
    return pulsar.getPulsarResources().getLocalPolicies()
            .setLocalPoliciesWithVersion(nsname, nsBundles.toLocalPolicies(), nsBundles.getVersion());
}
/**
 * @return the ownership cache used by this service
 */
public OwnershipCache getOwnershipCache() {
    return this.ownershipCache;
}
/**
 * Returns the bundles currently owned by this broker.
 * <p>
 * With the load manager extension enabled this blocks on the extensible load manager for up to the
 * configured metadata-store operation timeout; otherwise the bundles are read from the ownership cache.
 *
 * @return the set of owned bundles
 * @throws RuntimeException wrapping any failure (including timeout or interruption) of the blocking lookup
 */
public Set<NamespaceBundle> getOwnedServiceUnits() {
    if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
        ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
        try {
            return extensibleLoadManager.getOwnedServiceUnitsAsync()
                    .get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // restore the interrupt flag so callers further up can still observe the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle)
            .collect(Collectors.toSet());
}
/**
 * Blocking variant of {@link #isServiceUnitOwnedAsync(ServiceUnitId)}; waits at most the configured
 * metadata-store operation timeout.
 *
 * @param suName the service unit to check
 * @return the value produced by the asynchronous check
 * @throws Exception when the asynchronous check fails, times out, or the wait is interrupted
 */
public boolean isServiceUnitOwned(ServiceUnitId suName) throws Exception {
    final CompletableFuture<Boolean> ownedFuture = isServiceUnitOwnedAsync(suName);
    return ownedFuture.get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
}
/**
 * Checks ownership of a service unit, dispatching on its concrete kind (topic, namespace or bundle).
 *
 * @param suName the service unit to check
 * @return a future holding the ownership result; a failed future with an
 *         {@link IllegalArgumentException} when the service unit is of none of the supported kinds
 */
public CompletableFuture<Boolean> isServiceUnitOwnedAsync(ServiceUnitId suName) {
    if (suName instanceof TopicName) {
        return isTopicOwnedAsync((TopicName) suName);
    } else if (suName instanceof NamespaceName) {
        return isNamespaceOwnedAsync((NamespaceName) suName);
    } else if (!(suName instanceof NamespaceBundle)) {
        return FutureUtil.failedFuture(
                new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName()));
    }
    if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
        return loadManager.get().checkOwnershipAsync(Optional.empty(), suName);
    }
    // TODO: Add unit tests cover it.
    final boolean ownedLocally = ownershipCache.isNamespaceBundleOwned((NamespaceBundle) suName);
    return CompletableFuture.completedFuture(ownedLocally);
}
/**
 * Blocking variant of {@link #isServiceUnitActiveAsync(TopicName)}; waits at most the configured
 * metadata-store operation timeout.
 *
 * @param topicName the topic whose bundle is checked
 * @return the value produced by the asynchronous check
 * @throws RuntimeException wrapping the failure when the check fails, times out, or the wait is interrupted
 * @deprecated This method is only used in test now.
 */
@Deprecated
public boolean isServiceUnitActive(TopicName topicName) {
    try {
        return isServiceUnitActiveAsync(topicName).get(pulsar.getConfig()
                .getMetadataStoreOperationTimeoutSeconds(), SECONDS);
    } catch (InterruptedException e) {
        // restore the interrupt flag; wrapping the exception alone would hide the interruption
        Thread.currentThread().interrupt();
        LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e);
        throw new RuntimeException(e);
    } catch (ExecutionException | TimeoutException e) {
        LOG.warn("Unable to find OwnedBundle for topic in time - [{}]", topicName, e);
        throw new RuntimeException(e);
    }
}
/**
 * Resolves the bundle of a topic and reports whether it is active on this broker.
 * <p>
 * With the load manager extension enabled the answer comes from the load manager's ownership check.
 * Otherwise the bundle must be present in the ownership cache and be flagged active.
 *
 * @param topicName the topic whose bundle is checked
 * @return a future holding {@code true} when the bundle is owned/active as described above
 */
public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
    // TODO: Add unit tests cover it.
    if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
        return getBundleAsync(topicName).thenCompose(bundle -> {
            Optional<CompletableFuture<OwnedBundle>> ownedFuture = ownershipCache.getOwnedBundleAsync(bundle);
            if (ownedFuture.isPresent()) {
                return ownedFuture.get().thenApply(owned -> owned != null && owned.isActive());
            }
            return CompletableFuture.completedFuture(false);
        });
    }
    return getBundleAsync(topicName)
            .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
}
/**
 * Checks whether the full bundle of the given namespace is owned, via the load manager when its extension
 * is enabled and via the ownership cache otherwise.
 *
 * @param fqnn the namespace to check
 * @return a future holding the ownership result
 */
private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
    // TODO: Add unit tests cover it.
    if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
        return getFullBundleAsync(fqnn)
                .thenApply(fullBundle -> ownershipCache.getOwnedBundle(fullBundle) != null);
    }
    return getFullBundleAsync(fqnn)
            .thenCompose(fullBundle -> loadManager.get().checkOwnershipAsync(Optional.empty(), fullBundle));
}
/**
 * Checks whether the bundle containing the given topic is owned, via the load manager when its extension
 * is enabled and via the ownership cache otherwise.
 *
 * @param topic the topic to check
 * @return a future holding the ownership result
 */
private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topic) {
    // TODO: Add unit tests cover it.
    if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
        return getBundleAsync(topic)
                .thenApply(topicBundle -> ownershipCache.isNamespaceBundleOwned(topicBundle));
    }
    return getBundleAsync(topic)
            .thenCompose(topicBundle -> loadManager.get().checkOwnershipAsync(Optional.of(topic), topicBundle));
}
/**
 * Resolves the bundle of a topic and runs the ownership check on it, using the load manager when its
 * extension is enabled and the ownership cache otherwise.
 *
 * @param topicName the topic to check
 * @return a future holding the result of the ownership check
 */
public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
    // TODO: Add unit tests cover it.
    if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
        return getBundleAsync(topicName)
                .thenCompose(topicBundle -> ownershipCache.checkOwnershipAsync(topicBundle));
    }
    return getBundleAsync(topicName)
            .thenCompose(topicBundle ->
                    loadManager.get().checkOwnershipAsync(Optional.of(topicName), topicBundle));
}
/**
 * Gives up ownership of a bundle and afterwards invalidates the bundle cache of its namespace.
 * <p>
 * The release goes through the extensible load manager (as an unload) when that extension is enabled, and
 * through the ownership cache otherwise.
 *
 * @param nsBundle the bundle to release
 * @return a future that completes after the bundle cache has been invalidated
 */
public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBundle) {
    final CompletableFuture<Void> released = ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
            ? ExtensibleLoadManagerImpl.get(loadManager.get())
                    .unloadNamespaceBundleAsync(nsBundle, Optional.empty())
            : ownershipCache.removeOwnership(nsBundle);
    return released.thenRun(() -> bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()));
}
/**
 * Notifies every registered ownership listener that the given bundle is now owned.
 *
 * @param bundle the bundle that was acquired
 */
public void onNamespaceBundleOwned(NamespaceBundle bundle) {
    bundleOwnershipListeners.forEach(
            registered -> notifyNamespaceBundleOwnershipListener(bundle, registered));
}
/**
 * Calls {@code unLoad} on every registered ownership listener whose {@code test} accepts the bundle.
 * A failure in one listener is logged and does not prevent the remaining listeners from being called.
 *
 * @param bundle the bundle being unloaded
 */
public void onNamespaceBundleUnload(NamespaceBundle bundle) {
    bundleOwnershipListeners.forEach(registered -> {
        try {
            if (registered.test(bundle)) {
                registered.unLoad(bundle);
            }
        } catch (Throwable t) {
            LOG.error("Call bundle {} ownership lister error", bundle, t);
        }
    });
}
/**
 * Calls {@code onSplit} on every registered split listener whose {@code test} accepts the bundle.
 * A failure in one listener is logged and does not prevent the remaining listeners from being called.
 *
 * @param bundle the bundle that was split
 */
public void onNamespaceBundleSplit(NamespaceBundle bundle) {
    bundleSplitListeners.forEach(registered -> {
        try {
            if (registered.test(bundle)) {
                registered.onSplit(bundle);
            }
        } catch (Throwable t) {
            LOG.error("Call bundle {} split listener {} error", bundle, registered, t);
        }
    });
}
/**
 * Registers the given ownership listeners (null entries are not registered) and then notifies the passed
 * listeners about every bundle this broker currently owns.
 *
 * @param listeners the listeners to add; the array itself must not be {@code null}
 */
public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener... listeners) {
    Objects.requireNonNull(listeners);
    for (int i = 0; i < listeners.length; i++) {
        if (listeners[i] == null) {
            continue;
        }
        bundleOwnershipListeners.add(listeners[i]);
    }
    // replay the bundles that are already owned to the newly added listeners
    for (NamespaceBundle ownedBundle : getOwnedServiceUnits()) {
        notifyNamespaceBundleOwnershipListener(ownedBundle, listeners);
    }
}
/**
 * Registers the given split listeners; null entries are not registered.
 *
 * @param listeners the listeners to add; the array itself must not be {@code null}
 */
public void addNamespaceBundleSplitListener(NamespaceBundleSplitListener... listeners) {
    Objects.requireNonNull(listeners);
    for (int i = 0; i < listeners.length; i++) {
        if (listeners[i] == null) {
            continue;
        }
        bundleSplitListeners.add(listeners[i]);
    }
}
/**
 * Calls {@code onLoad} on each given listener whose {@code test} accepts the bundle.
 * A failure in one listener is logged and does not prevent the remaining listeners from being called.
 *
 * @param bundle the bundle that was loaded
 * @param listeners the listeners to notify; nothing happens when the array is {@code null}
 */
private void notifyNamespaceBundleOwnershipListener(NamespaceBundle bundle,
                                                    NamespaceBundleOwnershipListener... listeners) {
    if (listeners == null) {
        return;
    }
    for (NamespaceBundleOwnershipListener candidate : listeners) {
        try {
            if (candidate.test(bundle)) {
                candidate.onLoad(bundle);
            }
        } catch (Throwable t) {
            LOG.error("Call bundle {} ownership lister error", bundle, t);
        }
    }
}
/**
 * @return the bundle factory used by this service
 */
public NamespaceBundleFactory getNamespaceBundleFactory() {
    return this.bundleFactory;
}
/**
 * Returns the service unit of a topic, which is the value of {@code getBundle(topicName)}.
 *
 * @param topicName the topic to resolve
 * @return the bundle resolved for the topic
 * @throws Exception when the bundle lookup fails
 */
public ServiceUnitId getServiceUnitId(TopicName topicName) throws Exception {
    final ServiceUnitId topicBundle = getBundle(topicName);
    return topicBundle;
}
/**
 * Lists the persistent and the non-persistent topics of a namespace as one combined list.
 *
 * @param namespaceName the namespace to list
 * @return a future holding the union of both topic lists (persistent topics first)
 */
public CompletableFuture<List<String>> getFullListOfTopics(NamespaceName namespaceName) {
    final CompletableFuture<List<String>> persistentTopics = getListOfPersistentTopics(namespaceName);
    final CompletableFuture<List<String>> nonPersistentTopics = getListOfNonPersistentTopics(namespaceName);
    return persistentTopics.thenCombine(nonPersistentTopics, ListUtils::union);
}
/**
 * Lists the partitioned topics of a namespace for both topic domains as one combined list.
 *
 * @param namespaceName the namespace to list
 * @return a future holding the union of the persistent and non-persistent partitioned topics
 */
public CompletableFuture<List<String>> getFullListOfPartitionedTopic(NamespaceName namespaceName) {
    final NamespaceResources.PartitionedTopicResources resources =
            pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
    final CompletableFuture<List<String>> persistentTopics =
            resources.listPartitionedTopicsAsync(namespaceName, TopicDomain.persistent);
    final CompletableFuture<List<String>> nonPersistentTopics =
            resources.listPartitionedTopicsAsync(namespaceName, TopicDomain.non_persistent);
    return persistentTopics.thenCombine(nonPersistentTopics, ListUtils::union);
}
/**
 * Lists the topics of the bundle's namespace that fall into the given bundle.
 * <p>
 * Two sources are merged: the full topic list of the namespace and the partition names of its partitioned
 * topics. Each is filtered with {@code bundle.includes}, and partitions not yet in the topic list are
 * appended to it.
 *
 * @param bundle the bundle whose topics are listed
 * @return a future holding the merged list without duplicates introduced by the merge
 */
public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(NamespaceBundle bundle) {
    final CompletableFuture<List<String>> topicsInBundle = getFullListOfTopics(bundle.getNamespaceObject())
            .thenApply(names -> names.stream()
                    .filter(name -> bundle.includes(TopicName.get(name)))
                    .collect(Collectors.toList()));
    final CompletableFuture<List<String>> partitionsInBundle = getAllPartitions(bundle.getNamespaceObject())
            .thenApply(names -> names.stream()
                    .filter(name -> bundle.includes(TopicName.get(name)))
                    .collect(Collectors.toList()));
    return topicsInBundle.thenCombine(partitionsInBundle, (merged, partitions) -> {
        for (String partition : partitions) {
            if (!merged.contains(partition)) {
                merged.add(partition);
            }
        }
        return merged;
    });
}
/**
 * Checks whether a topic exists, without creating or loading it.
 * <ul>
 * <li>Persistent, non-partition name: looked up in the topic resources.</li>
 * <li>Persistent, partition name: looked up in the topic resources unless the partitioned-topic metadata
 * reports a non-zero partition count that the partition index does not fit into.</li>
 * <li>Non-persistent, partition name: exists when the partition index is below the partition count.</li>
 * <li>Non-persistent, non-partition name: exists when the broker's topic map holds a present topic.</li>
 * </ul>
 *
 * @param topic the topic to check
 * @return a future holding {@code true} when the topic exists according to the rules above
 */
public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
    if (topic.isPersistent()) {
        if (!topic.isPartitioned()) {
            return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
        }
        return pulsar.getBrokerService()
                .fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
                .thenCompose(metadata -> {
                    final boolean indexOutOfRange = metadata.partitions != 0
                            && topic.getPartitionIndex() >= metadata.partitions;
                    if (indexOutOfRange) {
                        return CompletableFuture.completedFuture(false);
                    }
                    // Allow creating the non-partitioned persistent topic that name includes `-partition-`
                    return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
                });
    }
    if (topic.isPartitioned()) {
        final TopicName partitionedTopicName = TopicName.get(topic.getPartitionedTopicName());
        return pulsar.getBrokerService()
                .fetchPartitionedTopicMetadataAsync(partitionedTopicName)
                .thenApply(metadata -> topic.getPartitionIndex() < metadata.partitions);
    }
    // only checks and don't do any topic creating and loading.
    final CompletableFuture<Optional<Topic>> loadedTopic =
            pulsar.getBrokerService().getTopics().get(topic.toString());
    if (loadedTopic == null) {
        return CompletableFuture.completedFuture(false);
    }
    return loadedTopic.thenApply(Optional::isPresent).exceptionally(throwable -> {
        LOG.warn("[{}] topicFuture completed with exception when checkTopicExists, {}",
                topic, throwable.getMessage());
        return false;
    });
}
/**
 * Lists the topics of a namespace for the requested mode.
 *
 * @param namespaceName the namespace to list
 * @param mode which topics to return; every mode other than {@code ALL} and {@code NON_PERSISTENT}
 *             yields the persistent topics
 * @return a future holding the matching topic names
 */
public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, Mode mode) {
    switch (mode) {
        case NON_PERSISTENT:
            return getListOfNonPersistentTopics(namespaceName);
        case ALL:
            return getFullListOfTopics(namespaceName);
        default:
            // PERSISTENT and anything else
            return getListOfPersistentTopics(namespaceName);
    }
}
/**
 * Lists the partition names of all partitioned topics of a namespace, for both topic domains.
 *
 * @param namespaceName the namespace to list
 * @return a future holding the union of the persistent and non-persistent partition names
 */
public CompletableFuture<List<String>> getAllPartitions(NamespaceName namespaceName) {
    final CompletableFuture<List<String>> persistentPartitions =
            getPartitions(namespaceName, TopicDomain.persistent);
    final CompletableFuture<List<String>> nonPersistentPartitions =
            getPartitions(namespaceName, TopicDomain.non_persistent);
    return persistentPartitions.thenCombine(nonPersistentPartitions, ListUtils::union);
}
/**
 * Lists the partition names of every partitioned topic of a namespace within one topic domain.
 * <p>
 * The partition names are collected only after all per-topic lookups have completed, in the order of the
 * partitioned topics. Filling the result from per-future callbacks could publish it before the
 * last-completing lookup had added its partitions.
 *
 * @param namespaceName the namespace to list
 * @param topicDomain the topic domain to list
 * @return a future holding the partition names; empty when the namespace has no partitioned topics; failed
 *         when any per-topic lookup fails
 */
public CompletableFuture<List<String>> getPartitions(NamespaceName namespaceName, TopicDomain topicDomain) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Getting children from partitioned-topics now: {} - {}", namespaceName, topicDomain);
    }
    return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
            .listPartitionedTopicsAsync(namespaceName, topicDomain)
            .thenCompose(topics -> {
                CompletableFuture<List<String>> result = new CompletableFuture<>();
                List<String> resultPartitions = Collections.synchronizedList(new ArrayList<>());
                if (CollectionUtils.isNotEmpty(topics)) {
                    List<CompletableFuture<List<String>>> futures = new ArrayList<>();
                    for (String topic : topics) {
                        futures.add(getPartitionsForTopic(TopicName.get(topic)));
                    }
                    FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
                        if (ex != null) {
                            result.completeExceptionally(ex);
                            return;
                        }
                        // all futures completed successfully here, so join() neither blocks nor throws
                        for (CompletableFuture<List<String>> future : futures) {
                            resultPartitions.addAll(future.join());
                        }
                        result.complete(resultPartitions);
                    });
                } else {
                    result.complete(resultPartitions);
                }
                return result;
            });
}
/**
 * Expands a partitioned topic into the names of its partitions, using the partition count from the
 * partitioned-topic metadata.
 *
 * @param topicName the partitioned topic
 * @return a future holding one name per partition index, in index order
 */
private CompletableFuture<List<String>> getPartitionsForTopic(TopicName topicName) {
    return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenApply(meta -> {
        final List<String> partitionNames = new ArrayList<>();
        int index = 0;
        while (index < meta.partitions) {
            partitionNames.add(topicName.getPartition(index).toString());
            index++;
        }
        return partitionNames;
    });
}
/**
 * Lists the names of the persistent topics under a namespace; the topic names contain the partition suffix.
 *
 * @param namespaceName the namespace whose persistent topics are listed
 * @return the future returned by the topic resources' persistent-topic listing
 */
public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) {
return pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(namespaceName);
}
/**
 * Lists the active non-persistent topics of a namespace.
 * <p>
 * When the peer-replication check yields a peer cluster, the list is fetched from that cluster. Otherwise
 * the names are collected from the broker's multi-layer topic map and returned sorted.
 *
 * @param namespaceName the namespace to list
 * @return a future holding the non-persistent topic names
 */
public CompletableFuture<List<String>> getListOfNonPersistentTopics(NamespaceName namespaceName) {
    return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName, true)
            .thenCompose(peerClusterData -> {
                // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request
                // should redirect to the peer-cluster
                if (peerClusterData != null) {
                    return getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName);
                }
                // Non-persistent topics don't have managed ledgers. So we have to retrieve them from local
                // cache.
                final List<String> activeTopics = new ArrayList<>();
                synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
                    if (pulsar.getBrokerService().getMultiLayerTopicMap()
                            .containsKey(namespaceName.toString())) {
                        pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString())
                                .forEach((bundleKey, topicsOfBundle) ->
                                        topicsOfBundle.forEach((name, candidate) -> {
                                            if (candidate instanceof NonPersistentTopic
                                                    && ((NonPersistentTopic) candidate).isActive()) {
                                                activeTopics.add(name);
                                            }
                                        }));
                    }
                }
                activeTopics.sort(null);
                return CompletableFuture.completedFuture(activeTopics);
            });
}
/**
 * Fetches the non-persistent topics of a namespace from a peer cluster, using the client obtained from
 * {@link #getNamespaceClient(ClusterDataImpl)}.
 *
 * @param peerClusterData the peer cluster to query
 * @param namespace the namespace to list
 * @return a future holding the topic names reported by the peer cluster's lookup
 */
private CompletableFuture<List<String>> getNonPersistentTopicsFromPeerCluster(ClusterDataImpl peerClusterData,
                                                                              NamespaceName namespace) {
    return getNamespaceClient(peerClusterData).getLookup()
            .getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT, null, null)
            .thenApply(lookupResult -> lookupResult.getTopics());
}
/**
 * Returns the client used to talk to the given cluster, building it and storing it in
 * {@code namespaceClients} on first use.
 * <p>
 * The client is configured from the broker configuration: properties prefixed with {@code brokerClient_},
 * the broker-client authentication settings when authentication is enabled, and TLS settings when TLS is
 * enabled. The service URL is taken from the cluster data, preferring the broker service URL over the
 * service URL when the former is not blank.
 *
 * @param cluster the cluster to connect to
 * @return the cached or newly created client
 * @throws RuntimeException wrapping any exception raised while building the client
 */
public PulsarClientImpl getNamespaceClient(ClusterDataImpl cluster) {
// fast path: reuse a client that has already been created for this cluster
PulsarClientImpl client = namespaceClients.get(cluster);
if (client != null) {
return client;
}
return namespaceClients.computeIfAbsent(cluster, key -> {
try {
ClientBuilder clientBuilder = PulsarClient.builder()
.memoryLimit(0, SizeUnit.BYTES)
.enableTcpNoDelay(false)
.statsInterval(0, TimeUnit.SECONDS);
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
clientBuilder.loadConf(PropertiesUtils.filterAndMapProperties(config.getProperties(), "brokerClient_"));
// Disabled auto release useless connection.
clientBuilder.connectionMaxIdleSeconds(-1);
if (pulsar.getConfiguration().isAuthenticationEnabled()) {
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
if (pulsar.getConfiguration().isTlsEnabled()) {
// TLS: use the TLS broker service URL, falling back to the TLS service URL when it is blank
clientBuilder
.serviceUrl(isNotBlank(cluster.getBrokerServiceUrlTls())
? cluster.getBrokerServiceUrlTls() : cluster.getServiceUrlTls())
.enableTls(true)
.tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath())
.allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection())
.enableTlsHostnameVerification(pulsar.getConfiguration().isTlsHostnameVerificationEnabled());
} else {
// no TLS: use the broker service URL, falling back to the service URL when it is blank
clientBuilder.serviceUrl(isNotBlank(cluster.getBrokerServiceUrl())
? cluster.getBrokerServiceUrl() : cluster.getServiceUrl());
}
// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return pulsar.createClientImpl(conf);
} catch (Exception e) {
// the mapping function cannot throw checked exceptions, so wrap them
throw new RuntimeException(e);
}
});
}
/**
 * Looks up the owner data of a bundle.
 * <p>
 * With the load manager extension enabled, the owner's lookup data is fetched from the extensible load
 * manager and converted to {@link NamespaceEphemeralData}. Otherwise the ownership cache is queried.
 *
 * @param bundle the bundle whose owner is looked up
 * @return a future holding the owner data, or an empty optional when no owner is known
 */
public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle bundle) {
    if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
        return ownershipCache.getOwnerAsync(bundle);
    }
    ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
    return extensibleLoadManager.getOwnershipWithLookupDataAsync(bundle)
            .thenApply(lookupData -> {
                if (lookupData.isPresent()) {
                    return Optional.of(lookupData.get().toNamespaceEphemeralData());
                }
                return Optional.empty();
            });
}
/**
 * Blocking variant of {@link #checkOwnershipPresentAsync(NamespaceBundle)}; waits at most the configured
 * metadata-store operation timeout.
 *
 * @param bundle the bundle to check
 * @return the value produced by the asynchronous check
 * @throws Exception when the asynchronous check fails, times out, or the wait is interrupted
 */
public boolean checkOwnershipPresent(NamespaceBundle bundle) throws Exception {
    final CompletableFuture<Boolean> presentFuture = checkOwnershipPresentAsync(bundle);
    return presentFuture.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
}
/**
 * Checks whether an owner is known for the bundle, via the extensible load manager when its extension is
 * enabled and via {@link #getOwnerAsync(NamespaceBundle)} otherwise.
 *
 * @param bundle the bundle to check
 * @return a future holding {@code true} when the owner lookup yields a value
 */
public CompletableFuture<Boolean> checkOwnershipPresentAsync(NamespaceBundle bundle) {
    if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
        return getOwnerAsync(bundle).thenApply(owner -> owner.isPresent());
    }
    ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
    return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
            .thenApply(owner -> owner.isPresent());
}
/**
 * Unloads this broker's SLA monitor namespace through the admin client, but only when an owner is
 * currently recorded for the namespace's full bundle.
 *
 * @throws Exception when the ownership check or the unload request fails
 */
public void unloadSLANamespace() throws Exception {
    final NamespaceName namespaceName = getSLAMonitorNamespace(pulsar.getBrokerId(), config);

    LOG.info("Checking owner for SLA namespace {}", namespaceName);

    final NamespaceBundle nsFullBundle = getFullBundle(namespaceName);
    if (checkOwnershipPresent(nsFullBundle)) {
        LOG.info("Trying to unload SLA namespace {}", namespaceName);
        pulsar.getAdminClient().namespaces().unload(namespaceName.toString());
        LOG.info("Namespace {} unloaded successfully", namespaceName);
    }
    // Otherwise no one owns the namespace so no point trying to unload it.
    // Next lookup will assign the bundle to this broker.
}
/**
 * Builds the heartbeat namespace name of a broker from {@code HEARTBEAT_NAMESPACE_FMT}.
 *
 * @param lookupBroker the broker the namespace belongs to
 * @param config the configuration the cluster name is read from
 * @return the heartbeat namespace name
 */
public static NamespaceName getHeartbeatNamespace(String lookupBroker, ServiceConfiguration config) {
    final String heartbeatNamespace =
            String.format(HEARTBEAT_NAMESPACE_FMT, config.getClusterName(), lookupBroker);
    return NamespaceName.get(heartbeatNamespace);
}
/**
 * Builds the v2 heartbeat namespace name of a broker from {@code HEARTBEAT_NAMESPACE_FMT_V2}.
 *
 * @param lookupBroker the broker the namespace belongs to
 * @param config not used when building the name
 * @return the v2 heartbeat namespace name
 */
public static NamespaceName getHeartbeatNamespaceV2(String lookupBroker, ServiceConfiguration config) {
    final String heartbeatNamespace = String.format(HEARTBEAT_NAMESPACE_FMT_V2, lookupBroker);
    return NamespaceName.get(heartbeatNamespace);
}
/**
 * Builds the SLA monitor namespace name of a broker from {@code SLA_NAMESPACE_FMT}.
 *
 * @param lookupBroker the broker the namespace belongs to
 * @param config the configuration the cluster name is read from
 * @return the SLA monitor namespace name
 */
public static NamespaceName getSLAMonitorNamespace(String lookupBroker, ServiceConfiguration config) {
    final String slaNamespace = String.format(SLA_NAMESPACE_FMT, config.getClusterName(), lookupBroker);
    return NamespaceName.get(slaNamespace);
}
/**
 * Matches the namespace of a service unit against {@code HEARTBEAT_NAMESPACE_PATTERN}.
 *
 * @param ns the service unit whose namespace is tested
 * @return the first capture group of the pattern when the namespace matches, {@code null} otherwise
 */
public static String checkHeartbeatNamespace(ServiceUnitId ns) {
    final Matcher heartbeatMatcher = HEARTBEAT_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
    if (!heartbeatMatcher.matches()) {
        return null;
    }
    LOG.debug("Heartbeat namespace matched the lookup namespace {}", ns.getNamespaceObject().toString());
    return heartbeatMatcher.group(1);
}
/**
 * Matches the namespace of a service unit against {@code HEARTBEAT_NAMESPACE_PATTERN_V2}.
 *
 * @param ns the service unit whose namespace is tested
 * @return the first capture group of the pattern when the namespace matches, {@code null} otherwise
 */
public static String checkHeartbeatNamespaceV2(ServiceUnitId ns) {
    final Matcher heartbeatMatcher =
            HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(ns.getNamespaceObject().toString());
    if (!heartbeatMatcher.matches()) {
        return null;
    }
    LOG.debug("Heartbeat namespace v2 matched the lookup namespace {}", ns.getNamespaceObject().toString());
    return heartbeatMatcher.group(1);
}
/**
 * Matches the namespace of a service unit against {@code SLA_NAMESPACE_PATTERN}.
 *
 * @param ns the service unit whose namespace is tested
 * @return the first capture group of the pattern when the namespace matches, {@code null} otherwise
 */
public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
    final Matcher slaMatcher = SLA_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
    return slaMatcher.matches() ? slaMatcher.group(1) : null;
}
/**
 * Tells whether a namespace name denotes a system service namespace: the system namespace itself, an SLA
 * monitor namespace, or a heartbeat namespace (either format).
 *
 * @param namespace the namespace name
 * @return {@code true} when the name equals the system namespace or matches one of the patterns
 */
public static boolean isSystemServiceNamespace(String namespace) {
    if (SYSTEM_NAMESPACE.toString().equals(namespace)) {
        return true;
    }
    if (SLA_NAMESPACE_PATTERN.matcher(namespace).matches()) {
        return true;
    }
    if (HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()) {
        return true;
    }
    return HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
}
/**
 * Used for filtering bundles in special namespaces.
 *
 * @param namespace the namespace name
 * @return True if the name matches the SLA namespace pattern or one of the two heartbeat namespace patterns
 */
public static boolean isSLAOrHeartbeatNamespace(String namespace) {
    if (SLA_NAMESPACE_PATTERN.matcher(namespace).matches()) {
        return true;
    }
    if (HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()) {
        return true;
    }
    return HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches();
}
/**
 * Tells whether the namespace of a service unit matches one of the two heartbeat namespace patterns.
 *
 * @param ns the service unit whose namespace is tested
 * @return {@code true} when the namespace matches either heartbeat pattern
 */
public static boolean isHeartbeatNamespace(ServiceUnitId ns) {
    final String namespaceName = ns.getNamespaceObject().toString();
    if (HEARTBEAT_NAMESPACE_PATTERN.matcher(namespaceName).matches()) {
        return true;
    }
    return HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespaceName).matches();
}
/**
 * Registers this broker's SLA monitor namespace via {@code registerNamespace} and logs the outcome at
 * debug level.
 *
 * @return the value returned by {@code registerNamespace}
 * @throws PulsarServerException when the registration fails
 */
public boolean registerSLANamespace() throws PulsarServerException {
    final String brokerId = pulsar.getBrokerId();
    final boolean registered = registerNamespace(getSLAMonitorNamespace(brokerId, config), false);
    if (LOG.isDebugEnabled()) {
        if (registered) {
            LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}",
                    getSLAMonitorNamespace(brokerId, config));
        } else {
            LOG.debug("SLA Monitoring not owned by the broker: ns={}",
                    getSLAMonitorNamespace(brokerId, config));
        }
    }
    return registered;
}
/**
 * Shuts down every client held in {@code namespaceClients}.
 * A {@code PulsarClientException} raised while shutting down one client is logged as a warning and the
 * exception is not rethrown.
 */
@Override
public void close() {
namespaceClients.forEach((cluster, client) -> {
try {
client.shutdown();
} catch (PulsarClientException e) {
// best effort: report the failure for this cluster instead of propagating it
LOG.warn("Error shutting down namespace client for cluster {}", cluster, e);
}
});
}
}