blob: c47e99074c527e46ccb1354ebe404352e0ad9fe3 [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.namespace;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import java.util.Map;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundleFactory;
import com.yahoo.pulsar.common.naming.NamespaceBundles;
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;
/**
* This class provides a cache service for all the service unit ownership among the brokers. It provides a cache service
* as well as ZooKeeper read/write functions for a) lookup of a service unit ownership to a broker; b) taking ownership
* of a service unit by the local broker
*
*
*/
public class OwnershipCache {
private static final Logger LOG = LoggerFactory.getLogger(OwnershipCache.class);
/**
* The local broker URL that this <code>OwnershipCache</code> will set as owner
*/
private final String ownerBrokerUrl;
/**
* The local broker URL that this <code>OwnershipCache</code> will set as owner
*/
private final String ownerBrokerUrlTls;
/**
* The NamespaceEphemeralData objects that can be associated with the current owner
*/
private final NamespaceEphemeralData[] selfOwnerInfos;
/**
* Service unit ownership cache of <code>ZooKeeper</code> data of ephemeral nodes showing all known ownership of
* service unit to active brokers
*/
private final ZooKeeperDataCache<NamespaceEphemeralData> ownershipReadOnlyCache;
/**
* The loading cache of locally owned <code>ServiceUnit</code> objects
*/
private final LoadingCache<String, OwnedServiceUnit> ownedServiceUnitsCache;
/**
* The <code>ObjectMapper</code> to deserialize/serialize JSON objects
*/
private final ObjectMapper jsonMapper = ObjectMapperFactory.create();
/**
* The <code>ZooKeeperCache</code> connecting to the local ZooKeeper
*/
private final ZooKeeperCache localZkCache;
/**
* The <code>NamespaceBundleFactory</code> to construct <code>NamespaceBundles</code>
*/
private final NamespaceBundleFactory bundleFactory;
/**
* The max number of retries to acquire the ownership in <code>ZooKeeper</code>
*/
private static final int MAX_RETRIES_CREATE_ZNODE = 5; // tentatively decide to retry 5 times in acquiring the
// zookeeper node
private class OwnedServiceUnitCacheLoader extends CacheLoader<String, OwnedServiceUnit> {
@Override
public OwnedServiceUnit load(String key) throws Exception {
LOG.info("Acquiring zk lock on namespace {}", key);
// Under the cache sync lock, acquiring the ZNode
// If succeeded, we guaranteed that the cache entry is setup w/ ZNode acquired
// Only enters this load function if ownedServiceUnitsCache does not have the key
// invalidate the read-only cache to ensure loading the up-to-date znode
int numTries = 0;
while (numTries != MAX_RETRIES_CREATE_ZNODE) {
try {
checkArgument(ServiceUnitZkUtils
.acquireNameSpace(localZkCache.getZooKeeper(), key, selfOwnerInfos[0]).getNativeUrl()
.equals(ownerBrokerUrl)
|| ServiceUnitZkUtils.acquireNameSpace(localZkCache.getZooKeeper(), key, selfOwnerInfos[0])
.getNativeUrlTls().equals(ownerBrokerUrlTls));
ownershipReadOnlyCache.invalidate(key);
LOG.info("Acquired zk lock on namespace {}", key);
return new OwnedServiceUnit(ServiceUnitZkUtils.suIdFromPath(key, bundleFactory));
} catch (Exception e) {
// Failed to acquire the namespace, try to load the read-only cache since some other broker may have
// won the race
LOG.warn(String.format("Failed in acquiring the namespace ownership. key=%s", key));
try {
ownershipReadOnlyCache.invalidate(key);
ownershipReadOnlyCache.get(key);
// if successful, the znode has been created by someone. break out the loop
break;
} catch (NoNodeException nne) {
// load read-only cache failed due to no node exists, hence, we should try to acquire the
// namespace
// again
numTries++;
} catch (Exception moreErr) {
// Other unexpected failure
LOG.error(String.format(
"Unexpected exception while loading the read-only ZK cache for namespace. key=%s", key),
moreErr);
throw moreErr;
}
}
}
checkArgument(numTries < MAX_RETRIES_CREATE_ZNODE,
"Maximum retries exceeded to acquire the namespace. key=" + key);
// load the read-only cache w/ update-to-date entry
checkArgument(ownershipReadOnlyCache.get(key).getNativeUrl().equals(ownerBrokerUrl),
"Some other broker acquired the namespace. key=" + key);
return new OwnedServiceUnit(ServiceUnitZkUtils.suIdFromPath(key, bundleFactory));
}
}
/**
* Constructor of <code>OwnershipCache</code>
*
* @param ownerUrl
* the local broker URL that will be set as owner for the <code>ServiceUnit</code>
*/
public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory) {
ServiceConfiguration conf = pulsar.getConfiguration();
this.ownerBrokerUrl = pulsar.getBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
this.selfOwnerInfos = new NamespaceEphemeralData[] {
new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls, pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(), false),
new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls, pulsar.getWebServiceAddress(),
pulsar.getWebServiceAddressTls(), true) };
this.bundleFactory = bundleFactory;
this.localZkCache = pulsar.getLocalZkCache();
this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
// ownedServiceUnitsCache contains all namespaces that are owned by the local broker
this.ownedServiceUnitsCache = CacheBuilder.newBuilder().build(new OwnedServiceUnitCacheLoader());
}
/**
* Method to get the current owner of the <code>ServiceUnit</code>
*
* @param suId
* identifier of the <code>ServiceUnit</code>
* @return The ephemeral node data showing the current ownership info in <code>ZooKeeper</code>
* @throws Exception
* throws exception if no ownership info is found
*/
public NamespaceEphemeralData getOwner(ServiceUnitId suname) throws Exception {
return this.ownershipReadOnlyCache.get(ServiceUnitZkUtils.path(suname));
}
/**
* Method to get the current owner of the <code>ServiceUnit</code> or set the local broker as the owner if absent
*
* @param suId
* identifier of the <code>ServiceUnit</code>
* @return The ephemeral node data showing the current ownership info in <code>ZooKeeper</code>
* @throws Exception
*/
public NamespaceEphemeralData getOrSetOwner(ServiceUnitId suname) throws Exception {
String path = ServiceUnitZkUtils.path(suname);
// If the node has been deleted between two checks, we need to try again
while (true) {
try {
// Trying to load the ownedServiceUnitCache by acquiring the ZNode via the loader
this.ownedServiceUnitsCache.get(path);
} catch (Exception e) {
LOG.info(String.format("Failed in acquiring the ownership of service unit %s", suname), e);
}
try {
return this.ownershipReadOnlyCache.get(path);
} catch (KeeperException.NoNodeException e) {
LOG.warn("Failed in getting service unit from read-only cache {}", suname);
}
}
}
/**
* Method to remove the ownership of local broker on the <code>ServiceUnit</code>, if owned
*
* @param suId
* identifier of the <code>ServiceUnit</code>
* @throws Exception
*/
public void removeOwnership(ServiceUnitId suname) throws Exception {
// Under the cache sync lock, removing the ZNode
// If succeeded, we guaranteed that the cache entry is removed together w/ ZNode
String key = ServiceUnitZkUtils.path(suname);
try {
localZkCache.getZooKeeper().delete(key, -1);
this.ownedServiceUnitsCache.invalidate(key);
this.ownershipReadOnlyCache.invalidate(key);
LOG.info("Removed zk lock for service unit: {}", key);
} catch (KeeperException.NoNodeException e) {
LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", key, e.getMessage());
}
}
/**
* Method to remove ownership of all owned bundles
*
* @param bundles
* <code>NamespaceBundles</code> to remove from ownership cache
*/
public void removeOwnership(NamespaceBundles bundles) {
boolean hasError = false;
for (NamespaceBundle bundle : bundles.getBundles()) {
if (getOwnedServiceUnit(bundle) == null) {
// continue
continue;
}
try {
this.removeOwnership(bundle);
} catch (Exception e) {
LOG.warn(String.format("Failed to remove ownership of a service unit: %s", bundle), e);
hasError = true;
}
}
checkState(!hasError, "Not able to remove all owned bundles");
}
/**
* Method to access the map of all <code>ServiceUnit</code> objects owned by the local broker
*
* @return a map of owned <code>ServiceUnit</code> objects
*/
public Map<String, OwnedServiceUnit> getOwnedServiceUnits() {
return this.ownedServiceUnitsCache.asMap();
}
public OwnedServiceUnit getOwnedServiceUnit(ServiceUnitId suname) {
return this.ownedServiceUnitsCache.getIfPresent(ServiceUnitZkUtils.path(suname));
}
public void disableOwnership(ServiceUnitId suName) throws Exception {
String path = ServiceUnitZkUtils.path(suName);
localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfos[1]), -1);
this.ownershipReadOnlyCache.invalidate(path);
}
/**
* Update bundle state in a local cache
*
* @param bundle
* @throws Exception
*/
public void updateBundleState(ServiceUnitId bundle, boolean isActive) throws Exception {
// Disable owned instance in local cache
OwnedServiceUnit ownedServiceUnit = getOwnedServiceUnit(bundle);
if (ownedServiceUnit != null) {
ownedServiceUnit.setActive(isActive);
}
}
}