blob: c14ce263ec42e7254ad6f9aa6361638358286c31 [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.namespace;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static com.yahoo.pulsar.broker.web.PulsarWebResource.joinPath;
import static com.yahoo.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import static java.lang.String.format;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import java.net.URI;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.lookup.LookupResult;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.common.lookup.data.LookupData;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.NamespaceIsolationPolicy;
import com.yahoo.pulsar.common.policies.data.BrokerAssignment;
import com.yahoo.pulsar.common.policies.data.BundlesData;
import com.yahoo.pulsar.common.policies.data.LocalPolicies;
import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
import com.yahoo.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
/**
* The <code>NamespaceService</code> provides resource ownership lookup as well as resource ownership claiming services
* for the <code>PulsarService</code>.
* <p/>
* The <code>PulsarService</code> relies on this service for resource ownership operations.
* <p/>
* The focus of this phase is to bring up the system and be able to iterate and improve the services effectively.
* <p/>
*
* @see com.yahoo.pulsar.broker.PulsarService
*/
public class NamespaceService {
/**
 * Address kinds: a broker URL or a lookup URL. Not referenced elsewhere in this class.
 */
public enum AddressType {
    BROKER_URL,
    LOOKUP_URL
}
// Logger shared by every instance of this class.
private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class);
// Broker configuration, read from the PulsarService in the constructor.
private final ServiceConfiguration config;
// Load manager reference, as handed out by PulsarService#getLoadManager().
private final AtomicReference<LoadManager> loadManager;
// The PulsarService this namespace service belongs to.
private final PulsarService pulsar;
// Ownership cache created in the constructor; all ownership queries and acquisitions go through it.
private final OwnershipCache ownershipCache;
// Bundle factory created in the constructor with CRC32 hashing.
private final NamespaceBundleFactory bundleFactory;
// Count of heartbeat / SLA-monitor namespaces registered by this instance; subtracted from the owned-bundle
// count in getTotalServiceUnitsLoaded().
private int uncountedNamespaces;
// Advertised address of this broker, captured once in the constructor.
private final String host;
// First path component of SLA-monitor namespace names.
public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
// Matches "pulsar/<segment>/<host>:<port>"; group 1 captures "<host>:<port>".
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
// Matches "sla-monitor/<segment>/<host>:<port>"; group 1 captures "<host>:<port>".
public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
// Format for heartbeat namespace names; filled with cluster name, host and web service port.
public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s";
// Format for SLA-monitor namespace names; filled with cluster name, host and web service port.
public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s";
/**
 * Creates the namespace service for the given {@link PulsarService}.
 * <p>
 * Captures the advertised address, configuration and load manager reference from {@code pulsar}, initializes
 * {@link ServiceUnitZkUtils} with the local ZooKeeper client and the broker service URL, and then builds the bundle
 * factory (CRC32 hashing) and the ownership cache.
 *
 * @param pulsar
 *            the service this instance is attached to
 */
public NamespaceService(PulsarService pulsar) {
this.pulsar = pulsar;
host = pulsar.getAdvertisedAddress();
this.config = pulsar.getConfiguration();
this.loadManager = pulsar.getLoadManager();
// ZK utilities are initialized before the bundle factory and ownership cache are created
ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getBrokerServiceUrl());
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory);
}
/**
 * Looks up the broker serving {@code topic}: resolves the topic's bundle and then runs the ownership lookup for
 * that bundle in non read-only mode.
 *
 * @param topic
 *            destination to look up
 * @param authoritative
 *            forwarded unchanged to the bundle lookup
 * @return future completed with the lookup result
 */
public CompletableFuture<LookupResult> getBrokerServiceUrlAsync(DestinationName topic, boolean authoritative) {
    final boolean readOnly = false;
    return getBundleAsync(topic).thenCompose(
            resolvedBundle -> findBrokerServiceUrl(resolvedBundle, authoritative, readOnly));
}
/**
 * Asynchronously finds the bundle that contains {@code topic} among the bundles of the topic's namespace.
 *
 * @param topic
 *            destination whose bundle is wanted
 * @return future completed with the matching bundle
 */
public CompletableFuture<NamespaceBundle> getBundleAsync(DestinationName topic) {
    return bundleFactory
            .getBundlesAsync(topic.getNamespaceObject())
            .thenApply(namespaceBundles -> namespaceBundles.findBundle(topic));
}
/**
 * Synchronous variant of {@link #getBundleAsync(DestinationName)}: returns the bundle of the destination's
 * namespace that contains {@code destination}.
 */
public NamespaceBundle getBundle(DestinationName destination) throws Exception {
    return this.bundleFactory
            .getBundles(destination.getNamespaceObject())
            .findBundle(destination);
}
/**
 * Returns how many bundles the bundle factory reports for {@code namespace}.
 */
public int getBundleCount(NamespaceName namespace) throws Exception {
    final int bundleCount = bundleFactory.getBundles(namespace).size();
    return bundleCount;
}
/**
 * Asks the bundle factory for the full bundle of the given namespace.
 */
private NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
    return this.bundleFactory.getFullBundle(fqnn);
}
// Parses the content of a ZooKeeper cache entry as JSON into ServiceLookupData (the key is ignored);
// passed to getDataAsync() in createLookupResult().
private static final Deserializer<ServiceLookupData> serviceLookupDataDeserializer = (key, content) ->
jsonMapper().readValue(content, ServiceLookupData.class);
/**
 * Resolves the web service URL of the broker responsible for a service unit, blocking until the lookup finishes.
 * <p>
 * A destination is mapped to its bundle, a namespace to its full bundle, and a bundle is used as is.
 *
 * @param suName
 *            destination, namespace or bundle to look up
 * @param authoritative
 *            forwarded to the ownership lookup
 * @param isRequestHttps
 *            whether the TLS variant of the HTTP URL is wanted
 * @param readOnly
 *            when true the lookup does not try to acquire ownership
 * @return the URL, or {@code null} when the lookup produced no usable result
 * @throws IllegalArgumentException
 *             if {@code suName} is none of the supported types
 */
public URL getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, boolean readOnly)
        throws Exception {
    final NamespaceBundle target;
    if (suName instanceof DestinationName) {
        DestinationName destination = (DestinationName) suName;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Getting web service URL of destination: {} - auth: {}", destination, authoritative);
        }
        target = getBundle(destination);
    } else if (suName instanceof NamespaceName) {
        target = getFullBundle((NamespaceName) suName);
    } else if (suName instanceof NamespaceBundle) {
        target = (NamespaceBundle) suName;
    } else {
        throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName());
    }
    return internalGetWebServiceUrl(target, authoritative, isRequestHttps, readOnly).get();
}
/**
 * Runs the ownership lookup for {@code bundle} and converts the result into the HTTP (or HTTPS) URL of the owner.
 *
 * @return future completed with the URL, or with {@code null} when the lookup result is null or the URL cannot be
 *         built (the failure is only logged)
 */
private CompletableFuture<URL> internalGetWebServiceUrl(NamespaceBundle bundle, boolean authoritative,
        boolean isRequestHttps, boolean readOnly) {
    return findBrokerServiceUrl(bundle, authoritative, readOnly).thenApply(lookupResult -> {
        if (lookupResult == null) {
            return null;
        }
        try {
            LookupData data = lookupResult.getLookupData();
            final String redirectUrl;
            if (isRequestHttps) {
                redirectUrl = data.getHttpUrlTls();
            } else {
                redirectUrl = data.getHttpUrl();
            }
            return new URL(redirectUrl);
        } catch (Exception e) {
            // nothing to recover here: log and fall back to a null URL
            LOG.warn("internalGetWebServiceUrl [{}]", e.getMessage(), e);
            return null;
        }
    });
}
/**
 * Registers the heartbeat namespace of this broker (ownership is mandatory) followed by every configured bootstrap
 * namespace (ownership is optional).
 *
 * @throws PulsarServerException
 *             if a registration fails, or the heartbeat namespace is owned by another broker
 */
public void registerBootstrapNamespaces() throws PulsarServerException {
    // the heartbeat namespace must end up owned by this broker
    final String heartbeatNamespace = getHeartbeatNamespace(host, config);
    boolean heartbeatOwned = registerNamespace(heartbeatNamespace, true);
    if (heartbeatOwned) {
        this.uncountedNamespaces++;
        LOG.info("added heartbeat namespace name in local cache: ns={}", heartbeatNamespace);
    }

    // bootstrap namespaces are registered best-effort, without strict ownership checking
    for (String bootstrapNamespace : config.getBootstrapNamespaces()) {
        boolean owned = registerNamespace(bootstrapNamespace, false);
        if (owned) {
            LOG.info("added bootstrap namespace name in local cache: ns={}", bootstrapNamespace);
        }
    }
}
/**
 * Tries to acquire ownership of the full bundle of {@code namespace} for this broker.
 *
 * @param namespace
 *            namespace name to register
 * @param ensureOwned
 *            when true, ownership by another broker is treated as an error instead of being tolerated
 * @return true if this broker is the owner (destinations are then pre-loaded), false if another broker owns the
 *         namespace and {@code ensureOwned} is false
 * @throws PulsarServerException
 *             wrapping any failure, including the {@link IllegalStateException} raised when {@code ensureOwned} is
 *             true and another broker owns the namespace
 */
private boolean registerNamespace(String namespace, boolean ensureOwned) throws PulsarServerException {
    final String localUrl = pulsar.getBrokerServiceUrl();
    try {
        // pre-registered namespaces are assumed to have bundles disabled, so the full bundle is used
        NamespaceBundle fullBundle = bundleFactory.getFullBundle(new NamespaceName(namespace));
        String ownerUrl = ownershipCache.tryAcquiringOwnership(fullBundle).get().getNativeUrl();

        if (!localUrl.equals(ownerUrl)) {
            String msg = String.format("namespace already owned by other broker : ns=%s expected=%s actual=%s",
                    namespace, localUrl, ownerUrl);
            if (ensureOwned) {
                // should not happen
                throw new IllegalStateException(msg);
            }
            // not owning it is tolerated for now
            LOG.info(msg);
            return false;
        }

        if (fullBundle != null) {
            // preload the destinations of the namespace we now own
            pulsar.loadNamespaceDestinations(fullBundle);
        }
        return true;
    } catch (Exception e) {
        LOG.error(e.getMessage(), e);
        throw new PulsarServerException(e);
    }
}
/**
 * Core lookup: determines which broker serves {@code bundle}, assigning an owner if there is none yet.
 * <p>
 * If the ownership cache already knows an enabled owner, that owner is returned. A disabled owner entry fails the
 * lookup. If nobody owns the bundle, a read-only lookup fails, otherwise the search for a candidate broker is
 * scheduled on the Pulsar executor.
 *
 * @param bundle
 *            bundle to look up
 * @param authoritative
 *            forwarded to the candidate search
 * @param readOnly
 *            when true, never attempts to acquire ownership
 * @return future completed with the lookup result, or exceptionally on failure
 */
private CompletableFuture<LookupResult> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
        boolean readOnly) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("findBrokerServiceUrl: {} - read-only: {}", bundle, readOnly);
    }

    final CompletableFuture<LookupResult> result = new CompletableFuture<>();

    // start by asking the ownership cache whether we or another broker already own the bundle
    ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> {
        if (nsData.isPresent()) {
            if (nsData.get().isDisabled()) {
                result.completeExceptionally(
                        new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData);
            }
            result.complete(new LookupResult(nsData.get()));
            return;
        }

        // nobody owns this bundle
        if (readOnly) {
            // read-only lookups must not trigger an ownership acquisition
            result.completeExceptionally(
                    new IllegalStateException(String.format("Can't find owner of ServiceUnit: %s", bundle)));
            return;
        }

        // assign the bundle dynamically, off the callback thread
        pulsar.getExecutor().execute(() -> {
            searchForCandidateBroker(bundle, result, authoritative);
        });
    }).exceptionally(exception -> {
        LOG.warn("Failed to check owner for bundle {}: {}", bundle, exception.getMessage(), exception);
        result.completeExceptionally(exception);
        return null;
    });

    return result;
}
/**
 * Picks the broker that should own {@code bundle} and completes {@code lookupFuture} accordingly.
 * <p>
 * The candidate is chosen in this order: the broker encoded in a heartbeat namespace name; the broker encoded in an
 * SLA-monitor namespace name, if it is currently listed as active; the least loaded broker, when the load manager
 * is not centralized or this broker is the leader; otherwise this broker for an authoritative request, or the
 * current leader so that it makes the assignment.
 * <p>
 * If the candidate is this broker, ownership is acquired and the bundle's destinations are pre-loaded. If it is
 * another broker, the future is completed with a lookup result pointing at that broker.
 *
 * @param bundle
 *            bundle that currently has no owner
 * @param lookupFuture
 *            future to complete with the outcome
 * @param authoritative
 *            whether the request was already routed here by the leader
 */
private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture<LookupResult> lookupFuture,
        boolean authoritative) {
    String candidateBroker = null;
    try {
        // check if this is Heartbeat or SLAMonitor namespace
        candidateBroker = checkHeartbeatNamespace(bundle);
        if (candidateBroker == null) {
            String broker = getSLAMonitorBrokerName(bundle);
            // checking if the broker is up and running
            if (broker != null && isBrokerActive(broker)) {
                candidateBroker = broker;
            }
        }

        if (candidateBroker == null) {
            if (!this.loadManager.get().isCentralized() || pulsar.getLeaderElectionService().isLeader()) {
                candidateBroker = getLeastLoadedFromLoadManager(bundle);
            } else {
                if (authoritative) {
                    // leader broker already assigned the current broker as owner
                    candidateBroker = pulsar.getWebServiceAddress();
                } else {
                    // forward to leader broker to make assignment
                    candidateBroker = pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl();
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Error when searching for candidate broker to acquire {}: {}", bundle, e.getMessage(), e);
        lookupFuture.completeExceptionally(e);
        return;
    }

    try {
        checkNotNull(candidateBroker);

        if (pulsar.getWebServiceAddress().equals(candidateBroker)) {
            // Load manager decided that the local broker should try to become the owner
            ownershipCache.tryAcquiringOwnership(bundle).thenAccept(ownerInfo -> {
                if (ownerInfo.isDisabled()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Namespace bundle {} is currently being unloaded", bundle);
                    }
                    lookupFuture.completeExceptionally(new IllegalStateException(
                            String.format("Namespace bundle %s is currently being unloaded", bundle)));
                } else {
                    // Found owner for the namespace bundle
                    // Schedule the task to pre-load destinations
                    pulsar.loadNamespaceDestinations(bundle);
                    lookupFuture.complete(new LookupResult(ownerInfo));
                }
            }).exceptionally(exception -> {
                // two placeholders: bundle and message; the trailing throwable supplies the stack trace
                LOG.warn("Failed to acquire ownership for namespace bundle {}: {}", bundle,
                        exception.getMessage(), exception);
                lookupFuture.completeExceptionally(new PulsarServerException(
                        "Failed to acquire ownership for namespace bundle " + bundle, exception));
                return null;
            });
        } else {
            // Load manager decided some other broker should try to acquire ownership
            if (LOG.isDebugEnabled()) {
                LOG.debug("Redirecting to broker {} to acquire ownership of bundle {}", candidateBroker, bundle);
            }

            // Now setting the redirect url
            createLookupResult(candidateBroker).thenAccept(lookupResult -> lookupFuture.complete(lookupResult))
                    .exceptionally(ex -> {
                        lookupFuture.completeExceptionally(ex);
                        return null;
                    });
        }
    } catch (Exception e) {
        LOG.warn("Error in trying to acquire namespace bundle ownership for {}: {}", bundle, e.getMessage(), e);
        lookupFuture.completeExceptionally(e);
    }
}
/**
 * Builds a lookup result for {@code candidateBroker} from the data that broker published under the load-balance
 * brokers root in the local ZooKeeper cache.
 *
 * @param candidateBroker
 *            broker URL; its host and port select the znode to read
 * @return future completed with the lookup result; completed exceptionally if the argument is blank or unparsable,
 *         the znode has no data ({@link KeeperException.NoNodeException}) or the read fails
 */
private CompletableFuture<LookupResult> createLookupResult(String candidateBroker) throws Exception {
    final CompletableFuture<LookupResult> result = new CompletableFuture<>();
    try {
        checkArgument(StringUtils.isNotBlank(candidateBroker), "Lookup broker can't be null " + candidateBroker);
        final URI brokerUri = new URI(candidateBroker);
        final String znodePath = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT,
                brokerUri.getHost(), brokerUri.getPort());

        pulsar.getLocalZkCache().getDataAsync(znodePath, serviceLookupDataDeserializer).thenAccept(reportData -> {
            if (!reportData.isPresent()) {
                result.completeExceptionally(new KeeperException.NoNodeException(znodePath));
                return;
            }
            ServiceLookupData published = reportData.get();
            result.complete(new LookupResult(published.getWebServiceUrl(), published.getWebServiceUrlTls(),
                    published.getPulsarServiceUrl(), published.getPulsarServiceUrlTls()));
        }).exceptionally(ex -> {
            result.completeExceptionally(ex);
            return null;
        });
    } catch (Exception e) {
        // validation and URI parsing failures are reported through the future, not thrown
        result.completeExceptionally(e);
    }
    return result;
}
/**
 * Checks whether {@code candidateBroker} is among the brokers currently registered under the load-balance brokers
 * root in the local ZooKeeper cache.
 *
 * @param candidateBroker
 *            broker URL in the form {@code http://<host>:<port>}, compared against {@code "http://" + child}
 * @return true if a registered child matches the candidate
 */
private boolean isBrokerActive(String candidateBroker) throws KeeperException, InterruptedException {
    Set<String> activeNativeBrokers = pulsar.getLocalZkCache().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT);

    for (String brokerHostPort : activeNativeBrokers) {
        if (candidateBroker.equals("http://" + brokerHostPort)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Broker {} found for SLA Monitoring Namespace", brokerHostPort);
            }
            return true;
        }
    }

    if (LOG.isDebugEnabled()) {
        // the candidate already carries its port, so it is logged as is
        LOG.debug("Broker not found for SLA Monitoring Namespace {}", candidateBroker);
    }
    return false;
}
/**
 * Asks the current load manager for the least loaded broker for {@code serviceUnit}.
 *
 * @param serviceUnit
 *            service unit that needs an owner
 * @return resource id of the least loaded broker, used by callers as the lookup address
 * @throws Exception
 *             if the load manager fails to provide a candidate
 */
private String getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
    final String leastLoadedAddress = this.loadManager.get().getLeastLoaded(serviceUnit).getResourceId();

    if (LOG.isDebugEnabled()) {
        LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", pulsar.getWebServiceAddress(),
                leastLoadedAddress);
    }

    return leastLoadedAddress;
}
/**
 * Unloads the full bundle of the given namespace.
 */
public void unloadNamespace(NamespaceName ns) throws Exception {
    unloadNamespaceBundle(getFullBundle(ns));
}
/**
 * Forwards an unload request to the owned-bundle entry for {@code bundle}.
 *
 * @throws NullPointerException
 *             if the ownership cache has no owned-bundle entry for {@code bundle}
 */
public void unloadNamespaceBundle(NamespaceBundle bundle) throws Exception {
    checkNotNull(this.ownershipCache.getOwnedBundle(bundle))
            .handleUnloadRequest(this.pulsar);
}
/**
 * Builds the ownership status of every bundle owned by this broker, keyed by the bundle's string form.
 * <p>
 * Each status is derived from the isolation policy (if any) that the local cluster defines for the bundle's
 * namespace.
 */
public Map<String, NamespaceOwnershipStatus> getOwnedNameSpacesStatus() throws Exception {
    final NamespaceIsolationPolicies isolationPolicies = getLocalNamespaceIsolationPolicies();
    final Map<String, NamespaceOwnershipStatus> statusByBundle = new HashMap<>();

    for (OwnedBundle owned : ownershipCache.getOwnedBundles().values()) {
        NamespaceOwnershipStatus status = getNamespaceOwnershipStatus(owned,
                isolationPolicies.getPolicyByNamespace(owned.getNamespaceBundle().getNamespaceObject()));
        statusByBundle.put(owned.getNamespaceBundle().toString(), status);
    }

    return statusByBundle;
}
/**
 * Computes the ownership status of one owned bundle.
 * <p>
 * Without a policy the bundle is reported as uncontrolled with a shared broker assignment. With a policy it is
 * reported as controlled, and the assignment becomes primary or secondary when the policy lists this broker's
 * advertised address as such (it stays shared otherwise).
 *
 * @param nsObj
 *            owned bundle; its active flag is copied into the status
 * @param nsIsolationPolicy
 *            isolation policy matching the bundle's namespace, or null if there is none
 */
private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj,
        NamespaceIsolationPolicy nsIsolationPolicy) {
    final NamespaceOwnershipStatus status = new NamespaceOwnershipStatus(BrokerAssignment.shared, false,
            nsObj.isActive());

    if (nsIsolationPolicy != null) {
        // a matching policy exists, so the namespace is a controlled one
        status.is_controlled = true;
        if (nsIsolationPolicy.isPrimaryBroker(pulsar.getAdvertisedAddress())) {
            status.broker_assignment = BrokerAssignment.primary;
        } else if (nsIsolationPolicy.isSecondaryBroker(pulsar.getAdvertisedAddress())) {
            status.broker_assignment = BrokerAssignment.secondary;
        }
    }

    return status;
}
/**
 * Reads the namespace isolation policies of the local cluster from the configuration cache.
 *
 * @return the cached policies, or a new empty policies object when none are defined
 */
private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() throws Exception {
    final String clusterName = pulsar.getConfiguration().getClusterName();
    final String policiesPath = AdminResource.path("clusters", clusterName, "namespaceIsolationPolicies");

    // undefined policies are represented by an empty object
    return pulsar.getConfigurationCache().namespaceIsolationPoliciesCache()
            .get(policiesPath)
            .orElseGet(() -> new NamespaceIsolationPolicies());
}
/**
 * Tells whether the owner data currently available for {@code bundle} marks it as disabled.
 * <p>
 * The owner future is read with {@code getNow}, so this never blocks: if the owner data is not available yet,
 * there is no owner, or the check fails, the bundle is reported as not disabled.
 */
public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception {
    try {
        CompletableFuture<Optional<NamespaceEphemeralData>> ownerFuture = ownershipCache.getOwnerAsync(bundle);
        if (ownerFuture == null) {
            // not owned: not considered disabled
            return false;
        }
        Optional<NamespaceEphemeralData> owner = ownerFuture.getNow(null);
        return owner != null && owner.isPresent() && owner.get().isDisabled();
    } catch (Exception e) {
        LOG.warn("Exception in getting ownership info for service unit {}: {}", bundle, e.getMessage(), e);
        return false;
    }
}
/**
 * Splits {@code bundle} in two and hands both halves to this broker:
 * <ol>
 * <li>split the given bundle into two bundles</li>
 * <li>request ownership of both new bundles for the current broker</li>
 * <li>write the new bundle ranges into the namespace's local policies in LocalZK</li>
 * <li>once that write succeeds, disable the original bundle and refresh the caches</li>
 * </ol>
 *
 * @param bundle
 *            bundle to split
 * @return future completed when the split is fully applied, or exceptionally with a
 *         {@link ServiceUnitNotReadyException} describing the step that failed
 * @throws Exception
 *             if the bundle factory fails to compute the split or returns a malformed result
 */
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle) throws Exception {
    final CompletableFuture<Void> future = new CompletableFuture<>();

    Pair<NamespaceBundles, List<NamespaceBundle>> splittedBundles = bundleFactory.splitBundles(bundle,
            2 /* by default split into 2 */);
    if (splittedBundles == null) {
        String msg = format("bundle %s not found under namespace", bundle.toString());
        future.completeExceptionally(new ServiceUnitNotReadyException(msg));
        return future;
    }

    checkNotNull(splittedBundles.getLeft());
    checkNotNull(splittedBundles.getRight());
    checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles");
    NamespaceName nsname = bundle.getNamespaceObject();
    try {
        // take ownership of newly split bundles
        // NOTE(review): only the returned future is null-checked; this loop does not wait for the acquisition
        // to complete - confirm that is intended.
        for (NamespaceBundle sBundle : splittedBundles.getRight()) {
            checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle));
        }
        updateNamespaceBundles(nsname, splittedBundles.getLeft(),
                (rc, path, zkCtx, stat) -> pulsar.getOrderedExecutor().submit(safeRun(() -> {
                    if (rc != KeeperException.Code.OK.intValue()) {
                        String msg2 = format("failed to update namespace [%s] policies due to %s",
                                nsname.toString(),
                                KeeperException.create(KeeperException.Code.get(rc)).getMessage());
                        LOG.warn(msg2);
                        future.completeExceptionally(new ServiceUnitNotReadyException(msg2));
                        return;
                    }
                    // disable old bundle
                    try {
                        ownershipCache.disableOwnership(bundle);
                        // invalidate cache as zookeeper has new split
                        // namespace bundle
                        bundleFactory.invalidateBundleCache(nsname);
                        // update bundled_topic cache for load-report-generation
                        pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
                        loadManager.get().setLoadReportForceUpdateFlag();
                        future.complete(null);
                    } catch (Exception e) {
                        // argument order follows the format string: bundle first, then namespace
                        String msg1 = format(
                                "failed to disable bundle %s under namespace [%s] with error %s",
                                bundle.toString(), nsname.toString(), e.getMessage());
                        LOG.warn(msg1, e);
                        future.completeExceptionally(new ServiceUnitNotReadyException(msg1));
                    }
                })));
    } catch (Exception e) {
        String msg = format("failed to aquire ownership of split bundle for namespace [%s], %s",
                nsname.toString(), e.getMessage());
        LOG.warn(msg, e);
        future.completeExceptionally(new ServiceUnitNotReadyException(msg));
    }
    return future;
}
/**
 * Writes a new set of bundle ranges into the namespace's local policies in LocalZK, creating the policies node
 * first if it does not exist yet. The write is asynchronous; its outcome is delivered to {@code callback}.
 *
 * @param nsname
 *            namespace whose local policies are updated
 * @param nsBundles
 *            bundles to store
 * @param callback
 *            invoked by ZooKeeper with the result of the write
 * @throws Exception
 *             if reading, creating or serializing the policies fails
 */
private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBundles, StatCallback callback)
        throws Exception {
    checkNotNull(nsname);
    checkNotNull(nsBundles);

    final String policiesPath = joinPath(LOCAL_POLICIES_ROOT, nsname.toString());
    Optional<LocalPolicies> localPolicies = pulsar.getLocalZkCacheService().policiesCache().get(policiesPath);
    if (!localPolicies.isPresent()) {
        // no policies node in LocalZK yet: create it, then read it back
        pulsar.getLocalZkCacheService().createPolicies(policiesPath, false).get();
        localPolicies = pulsar.getLocalZkCacheService().policiesCache().get(policiesPath);
    }

    final LocalPolicies updated = localPolicies.get();
    updated.bundles = getBundlesData(nsBundles);

    // version -1: overwrite regardless of the node's current version
    pulsar.getLocalZkCache().getZooKeeper().setData(policiesPath,
            ObjectMapperFactory.getThreadLocal().writeValueAsBytes(updated), -1, callback, null);
}
/**
 * @return the ownership cache created by this service
 */
public OwnershipCache getOwnershipCache() {
    return this.ownershipCache;
}
/**
 * @return the number of owned bundles, not counting the heartbeat / SLA-monitor namespaces registered by this
 *         service
 */
public int getTotalServiceUnitsLoaded() {
    final int ownedBundles = ownershipCache.getOwnedBundles().size();
    return ownedBundles - uncountedNamespaces;
}
/**
 * @return the namespace bundles of all entries currently held in the ownership cache, collected into a set
 */
public Set<NamespaceBundle> getOwnedServiceUnits() {
    return ownershipCache.getOwnedBundles().values()
            .stream()
            .map(owned -> owned.getNamespaceBundle())
            .collect(Collectors.toSet());
}
/**
 * Tells whether this broker owns the given service unit, which may be a destination, a namespace or a bundle.
 *
 * @throws IllegalArgumentException
 *             if {@code suName} is none of the supported types
 */
public boolean isServiceUnitOwned(ServiceUnitId suName) throws Exception {
    final boolean owned;
    if (suName instanceof DestinationName) {
        owned = isDestinationOwned((DestinationName) suName);
    } else if (suName instanceof NamespaceName) {
        owned = isNamespaceOwned((NamespaceName) suName);
    } else if (suName instanceof NamespaceBundle) {
        owned = ownershipCache.isNamespaceBundleOwned((NamespaceBundle) suName);
    } else {
        throw new IllegalArgumentException("Invalid class of NamespaceBundle: " + suName.getClass().getName());
    }
    return owned;
}
/**
 * Tells whether the bundle containing {@code fqdn} is owned by this broker and currently active.
 *
 * @param fqdn
 *            destination to check
 * @return the owned bundle's active flag; false when the bundle is not owned here or the check fails
 */
public boolean isServiceUnitActive(DestinationName fqdn) {
    try {
        // an absent owned-bundle entry is handled explicitly instead of through a NullPointerException
        Optional<Boolean> active = Optional.ofNullable(ownershipCache.getOwnedBundle(getBundle(fqdn)))
                .map(owned -> owned.isActive());
        if (!active.isPresent()) {
            LOG.warn("Unable to find OwnedBundle for fqdn - [{}]", fqdn.toString());
        }
        return active.orElse(false);
    } catch (Exception e) {
        // keep the cause in the log so real lookup failures can be diagnosed
        LOG.warn("Unable to find OwnedBundle for fqdn - [{}]", fqdn.toString(), e);
        return false;
    }
}
/**
 * @return true if the ownership cache holds an owned-bundle entry for the namespace's full bundle
 */
private boolean isNamespaceOwned(NamespaceName fqnn) throws Exception {
    final boolean owned = ownershipCache.getOwnedBundle(getFullBundle(fqnn)) != null;
    return owned;
}
/**
 * Asynchronously resolves the bundle of {@code topic} and asks the ownership cache whether that bundle is owned.
 */
private CompletableFuture<Boolean> isDestinationOwnedAsync(DestinationName topic) {
    return getBundleAsync(topic)
            .thenApply(topicBundle -> this.ownershipCache.isNamespaceBundleOwned(topicBundle));
}
/**
 * @return true if the ownership cache holds an owned-bundle entry for the bundle containing {@code fqdn}
 */
private boolean isDestinationOwned(DestinationName fqdn) throws Exception {
    final boolean owned = ownershipCache.getOwnedBundle(getBundle(fqdn)) != null;
    return owned;
}
/**
 * Removes ownership of the namespace's full bundle and waits for the removal to finish.
 */
public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception {
    this.ownershipCache
            .removeOwnership(getFullBundle(nsName))
            .get();
}
/**
 * Removes ownership of the given bundle and waits for the removal to finish.
 */
public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
    this.ownershipCache
            .removeOwnership(nsBundle)
            .get();
}
/**
 * Removes ownership of the bundles that the bundle factory builds from {@code nsName} and {@code bundleData}, and
 * waits for the removal to finish.
 */
public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception {
    this.ownershipCache
            .removeOwnership(this.bundleFactory.getBundles(nsName, bundleData))
            .get();
}
/**
 * @return the bundle factory created by this service
 */
public NamespaceBundleFactory getNamespaceBundleFactory() {
    return this.bundleFactory;
}
/**
 * @return the bundle containing {@code destinationName}, exposed as a {@link ServiceUnitId}
 */
public ServiceUnitId getServiceUnitId(DestinationName destinationName) throws Exception {
    return this.getBundle(destinationName);
}
/**
 * Lists the persistent destinations of a namespace, in natural (sorted) order.
 * <p>
 * The names are taken from the children of the namespace's managed-ledger path in the local ZooKeeper cache. A
 * missing node yields an empty list.
 *
 * @return fully qualified {@code persistent://} destination names
 */
public List<String> getListOfDestinations(String property, String cluster, String namespace) throws Exception {
    final List<String> destinations = Lists.newArrayList();

    try {
        // each topic has a managed ledger, so the ledger names give the topic names
        final String ledgersPath = String.format("/managed-ledgers/%s/%s/%s/persistent", property, cluster,
                namespace);
        LOG.debug("Getting children from managed-ledgers now: {}", ledgersPath);

        for (String encodedName : pulsar.getLocalZkCacheService().managedLedgerListCache().get(ledgersPath)) {
            destinations.add(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace,
                    Codec.decode(encodedName)));
        }
    } catch (KeeperException.NoNodeException e) {
        // no node: this namespace simply has no persistent topics
    }

    destinations.sort(null);
    return destinations;
}
/**
 * Blocking variant of {@link #getOwnerAsync(NamespaceBundle)}.
 *
 * @return the owner data of {@code bundle}; empty when no broker owns it
 */
public Optional<NamespaceEphemeralData> getOwner(NamespaceBundle bundle) throws Exception {
    CompletableFuture<Optional<NamespaceEphemeralData>> ownerFuture = getOwnerAsync(bundle);
    return ownerFuture.get();
}
/**
 * @return the ownership cache's future for the owner data of {@code bundle}
 */
public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle bundle) {
    return this.ownershipCache.getOwnerAsync(bundle);
}
/**
 * Unloads this broker's SLA-monitor namespace through the admin client, unless nobody owns it.
 */
public void unloadSLANamespace() throws Exception {
    final String slaNamespace = getSLAMonitorNamespace(host, config);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to unload SLA namespace {}", slaNamespace);
    }

    final NamespaceBundle fullBundle = getFullBundle(new NamespaceName(slaNamespace));
    if (getOwner(fullBundle).isPresent()) {
        // only an owned namespace is worth unloading
        pulsar.getAdminClient().namespaces().unload(slaNamespace);
        LOG.debug("Namespace {} unloaded successfully", slaNamespace);
    }
}
/**
 * Builds the heartbeat namespace name for a broker: {@code pulsar/<cluster>/<host>:<webServicePort>}.
 */
public static String getHeartbeatNamespace(String host, ServiceConfiguration config) {
    return format(HEARTBEAT_NAMESPACE_FMT,
            config.getClusterName(),
            host,
            config.getWebServicePort());
}
/**
 * Builds the SLA-monitor namespace name for a broker: {@code sla-monitor/<cluster>/<host>:<webServicePort>}.
 */
public static String getSLAMonitorNamespace(String host, ServiceConfiguration config) {
    return format(SLA_NAMESPACE_FMT,
            config.getClusterName(),
            host,
            config.getWebServicePort());
}
/**
 * Extracts the broker address from a heartbeat namespace name.
 *
 * @param ns
 *            service unit whose namespace is tested against {@link #HEARTBEAT_NAMESPACE_PATTERN}
 * @return {@code http://<host>:<port>} if the namespace is a heartbeat namespace, otherwise {@code null}
 */
public static String checkHeartbeatNamespace(ServiceUnitId ns) {
    Matcher m = HEARTBEAT_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
    if (!m.matches()) {
        return null;
    }
    // this is the heartbeat check, so the message names the heartbeat namespace
    LOG.debug("Heartbeat namespace matched the lookup namespace {}", ns.getNamespaceObject().toString());
    return String.format("http://%s", m.group(1));
}
/**
 * Extracts the broker address from an SLA-monitor namespace name.
 *
 * @param ns
 *            service unit whose namespace is tested against {@link #SLA_NAMESPACE_PATTERN}
 * @return {@code http://<host>:<port>} if the namespace is an SLA-monitor namespace, otherwise {@code null}
 */
public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
    final Matcher slaMatcher = SLA_NAMESPACE_PATTERN.matcher(ns.getNamespaceObject().toString());
    return slaMatcher.matches() ? String.format("http://%s", slaMatcher.group(1)) : null;
}
/**
 * Tries to register this broker's SLA-monitor namespace. Ownership by another broker is tolerated.
 *
 * @return true if this broker now owns the SLA-monitor namespace
 * @throws PulsarServerException
 *             if the registration attempt fails
 */
public boolean registerSLANamespace() throws PulsarServerException {
    final boolean registered = registerNamespace(getSLAMonitorNamespace(host, config), false);
    if (registered) {
        // SLA-monitor namespaces are excluded from the loaded service unit count
        this.uncountedNamespaces++;
    }

    if (LOG.isDebugEnabled()) {
        if (registered) {
            LOG.debug("Added SLA Monitoring namespace name in local cache: ns={}",
                    getSLAMonitorNamespace(host, config));
        } else {
            LOG.debug("SLA Monitoring not owned by the broker: ns={}", getSLAMonitorNamespace(host, config));
        }
    }
    return registered;
}
}