blob: daedb712e299cad4e7546d5c759f07b7111dd15d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.namespace;
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class provides a cache service for all the service unit ownership among the brokers. It provide a cache service
* as well as MetadataStore read/write functions for a) lookup of a service unit ownership to a broker; b) take
* ownership of a service unit by the local broker
*/
@Slf4j
public class OwnershipCache {
private static final Logger LOG = LoggerFactory.getLogger(OwnershipCache.class);
/**
* The local broker URL that this <code>OwnershipCache</code> will set as owner.
*/
private final String ownerBrokerUrl;
/**
* The local broker URL that this <code>OwnershipCache</code> will set as owner.
*/
private final String ownerBrokerUrlTls;
/**
* The NamespaceEphemeralData objects that can be associated with the current owner.
*/
private NamespaceEphemeralData selfOwnerInfo;
/**
* The NamespaceEphemeralData objects that can be associated with the current owner, when the broker is disabled.
*/
private final NamespaceEphemeralData selfOwnerInfoDisabled;
private final LockManager<NamespaceEphemeralData> lockManager;
private final Map<NamespaceBundle, ResourceLock> locallyAcquiredLocks;
/**
* The loading cache of locally owned <code>NamespaceBundle</code> objects.
*/
private final AsyncLoadingCache<NamespaceBundle, OwnedBundle> ownedBundlesCache;
/**
* The <code>NamespaceBundleFactory</code> to construct <code>NamespaceBundles</code>.
*/
private final NamespaceBundleFactory bundleFactory;
/**
* The <code>NamespaceService</code> which using <code>OwnershipCache</code>.
*/
private final NamespaceService namespaceService;
private final PulsarService pulsar;
private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader<NamespaceBundle, OwnedBundle> {
@Override
public CompletableFuture<OwnedBundle> asyncLoad(NamespaceBundle namespaceBundle, Executor executor) {
return lockManager.acquireLock(ServiceUnitUtils.path(namespaceBundle), selfOwnerInfo)
.thenApply(rl -> {
locallyAcquiredLocks.put(namespaceBundle, rl);
rl.getLockExpiredFuture()
.thenRun(() -> {
log.info("Resource lock for {} has expired", rl.getPath());
namespaceService.unloadNamespaceBundle(namespaceBundle);
ownedBundlesCache.synchronous().invalidate(namespaceBundle);
namespaceService.onNamespaceBundleUnload(namespaceBundle);
});
return new OwnedBundle(namespaceBundle);
});
}
}
/**
* Constructor of <code>OwnershipCache</code>.
*
* the local broker URL that will be set as owner for the <code>ServiceUnit</code>
*/
public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory,
NamespaceService namespaceService) {
this.namespaceService = namespaceService;
this.pulsar = pulsar;
this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
false, pulsar.getAdvertisedListeners());
this.selfOwnerInfoDisabled = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
pulsar.getSafeWebServiceAddress(), pulsar.getWebServiceAddressTls(),
true, pulsar.getAdvertisedListeners());
this.bundleFactory = bundleFactory;
this.lockManager = pulsar.getCoordinationService().getLockManager(NamespaceEphemeralData.class);
this.locallyAcquiredLocks = new ConcurrentHashMap<>();
// ownedBundlesCache contains all namespaces that are owned by the local broker
this.ownedBundlesCache = Caffeine.newBuilder()
.executor(MoreExecutors.directExecutor())
.recordStats()
.buildAsync(new OwnedServiceUnitCacheLoader());
CacheMetricsCollector.CAFFEINE.addCache("owned-bundles", this.ownedBundlesCache);
}
/**
* Check whether this broker owns given namespace bundle.
*
* @param bundle namespace bundle
* @return future that will complete with check result
*/
public boolean checkOwnership(NamespaceBundle bundle) {
return getOwnedBundle(bundle) != null;
}
/**
* Method to get the current owner of the <code>ServiceUnit</code>.
*
* @param suName
* name of the <code>ServiceUnit</code>
* @return The ephemeral node data showing the current ownership info in <code>ZooKeeper</code>
* @throws Exception
* throws exception if no ownership info is found
*/
public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle suName) {
CompletableFuture<OwnedBundle> ownedBundleFuture = ownedBundlesCache.getIfPresent(suName);
if (ownedBundleFuture != null) {
// Either we're the owners or we're trying to become the owner.
return ownedBundleFuture.thenApply(serviceUnit -> {
// We are the owner of the service unit
return Optional.of(serviceUnit.isActive() ? selfOwnerInfo : selfOwnerInfoDisabled);
});
}
// If we're not the owner, we need to check if anybody else is
String path = ServiceUnitUtils.path(suName);
return lockManager.readLock(path);
}
/**
* Method to get the current owner of the <code>ServiceUnit</code> or set the local broker as the owner if absent.
*
* @param bundle
* the <code>NamespaceBundle</code>
* @return The ephemeral node data showing the current ownership info in <code>ZooKeeper</code>
* @throws Exception
*/
public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(NamespaceBundle bundle) throws Exception {
if (!refreshSelfOwnerInfo()) {
return FutureUtil.failedFuture(
new RuntimeException("Namespace service does not ready for acquiring ownership"));
}
LOG.info("Trying to acquire ownership of {}", bundle);
// Doing a get() on the ownedBundlesCache will trigger an async metatada write to acquire the lock over the
// service unit
return ownedBundlesCache.get(bundle)
.thenApply(namespaceBundle -> {
LOG.info("Successfully acquired ownership of {}", namespaceBundle);
namespaceService.onNamespaceBundleOwned(bundle);
return selfOwnerInfo;
});
}
/**
* Method to remove the ownership of local broker on the <code>NamespaceBundle</code>, if owned.
*
*/
public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
ResourceLock<NamespaceEphemeralData> lock = locallyAcquiredLocks.get(bundle);
if (lock == null) {
// We don't own the specified bundle anymore
return CompletableFuture.completedFuture(null);
}
return lock.release();
}
/**
* Method to remove ownership of all owned bundles.
*
* @param bundles
* <code>NamespaceBundles</code> to remove from ownership cache
*/
public CompletableFuture<Void> removeOwnership(NamespaceBundles bundles) {
List<CompletableFuture<Void>> allFutures = Lists.newArrayList();
for (NamespaceBundle bundle : bundles.getBundles()) {
if (getOwnedBundle(bundle) == null) {
// continue
continue;
}
allFutures.add(this.removeOwnership(bundle));
}
return FutureUtil.waitForAll(allFutures);
}
/**
* Method to access the map of all <code>ServiceUnit</code> objects owned by the local broker.
*
* @return a map of owned <code>ServiceUnit</code> objects
*/
public Map<NamespaceBundle, OwnedBundle> getOwnedBundles() {
return this.ownedBundlesCache.synchronous().asMap();
}
/**
* Checked whether a particular bundle is currently owned by this broker.
*
* @param bundle
* @return
*/
public boolean isNamespaceBundleOwned(NamespaceBundle bundle) {
OwnedBundle ownedBundle = getOwnedBundle(bundle);
return ownedBundle != null && ownedBundle.isActive();
}
/**
* Return the {@link OwnedBundle} instance from the local cache. Does not block.
*
* @param bundle
* @return
*/
public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
CompletableFuture<OwnedBundle> future = ownedBundlesCache.getIfPresent(bundle);
if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
try {
return future.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
} else {
return null;
}
}
/**
* Disable bundle in local cache and on zk.
*
* @param bundle
* @throws Exception
*/
public CompletableFuture<Void> disableOwnership(NamespaceBundle bundle) {
return updateBundleState(bundle, false)
.thenCompose(__ -> {
ResourceLock<NamespaceEphemeralData> lock = locallyAcquiredLocks.get(bundle);
if (lock == null) {
return CompletableFuture.completedFuture(null);
} else {
return lock.updateValue(selfOwnerInfoDisabled);
}
});
}
/**
* Update bundle state in a local cache.
*
* @param bundle
* @throws Exception
*/
public CompletableFuture<Void> updateBundleState(NamespaceBundle bundle, boolean isActive) {
// Disable owned instance in local cache
CompletableFuture<OwnedBundle> f = ownedBundlesCache.getIfPresent(bundle);
if (f != null && f.isDone() && !f.isCompletedExceptionally()) {
return f.thenAccept(ob -> ob.setActive(isActive));
} else {
return CompletableFuture.completedFuture(null);
}
}
public void invalidateLocalOwnerCache() {
this.ownedBundlesCache.synchronous().invalidateAll();
}
public synchronized boolean refreshSelfOwnerInfo() {
this.selfOwnerInfo = new NamespaceEphemeralData(pulsar.getBrokerServiceUrl(),
pulsar.getBrokerServiceUrlTls(), pulsar.getSafeWebServiceAddress(),
pulsar.getWebServiceAddressTls(), false, pulsar.getAdvertisedListeners());
return selfOwnerInfo.getNativeUrl() != null || selfOwnerInfo.getNativeUrlTls() != null;
}
}