blob: 1e2a65db78d3602dbe7787bad7f1a3ef1c3ebacb [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import static;
import static;
import static;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* This class encapsulate some utility functions for
* <code>ServiceUnit</code> related <code>ZooKeeper</code> operations.
public final class ServiceUnitZkUtils {
private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class);
public static final String UTF8 = "UTF8";
private static final ObjectMapper jsonMapper = ObjectMapperFactory.create();
* <code>ZooKeeper</code> root path for namespace ownership info.
public static final String OWNER_INFO_ROOT = LocalZooKeeperCacheService.OWNER_INFO_ROOT;
public static String path(NamespaceBundle suname) {
// The ephemeral node path for new namespaces should always have bundle name appended
return OWNER_INFO_ROOT + "/" + suname.toString();
public static NamespaceBundle suBundleFromPath(String path, NamespaceBundleFactory factory) {
String[] parts = path.split("/");
checkArgument(parts.length > 2);
checkArgument(parts.length > 4);
if (parts.length > 5) {
// this is a V1 path prop/cluster/namespace/hash
Range<Long> range = getHashRange(parts[5]);
return factory.getBundle(NamespaceName.get(parts[2], parts[3], parts[4]), range);
} else {
// this is a V2 path prop/namespace/hash
Range<Long> range = getHashRange(parts[4]);
return factory.getBundle(NamespaceName.get(parts[2], parts[3]), range);
private static Range<Long> getHashRange(String rangePathPart) {
String[] endPoints = rangePathPart.split("_");
checkArgument(endPoints.length == 2, "Malformed bundle hash range path part:" + rangePathPart);
Long startLong = Long.decode(endPoints[0]);
Long endLong = Long.decode(endPoints[1]);
BoundType endType = (endPoints[1].equals(LAST_BOUNDARY)) ? BoundType.CLOSED : BoundType.OPEN;
return Range.range(startLong, BoundType.CLOSED, endLong, endType);
* initZK is only called when the NamespaceService is initialized. So, no need for synchronization.
* @return
* @throws PulsarServerException
public static void initZK(ZooKeeper zkc, String selfBrokerUrl) {
// initialize the zk client with values
try {
// check and create /namespace path
LocalZooKeeperConnectionService.checkAndCreatePersistNode(zkc, OWNER_INFO_ROOT);
// make sure to cleanup all remaining ephemeral nodes that shows ownership of this broker
cleanupNamespaceNodes(zkc, OWNER_INFO_ROOT, selfBrokerUrl);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
* cleanupNamespaceNodes is only called when the NamespaceService is initialized. So, no need for synchronization.
* @throws Exception
private static void cleanupNamespaceNodes(ZooKeeper zkc, String root, String selfBrokerUrl) throws Exception {
// we don't need a watch here since we are only cleaning up the stale ephemeral nodes from previous session
try {
for (String node : zkc.getChildren(root, false)) {
String currentPath = root + "/" + node;
// retrieve the content and try to decode with ServiceLookupData
List<String> children = zkc.getChildren(currentPath, false);
if (children.size() == 0) {
// clean up a single namespace node
cleanupSingleNamespaceNode(zkc, currentPath, selfBrokerUrl);
} else {
// this is an intermediate node, which means this is v2 namespace path
cleanupNamespaceNodes(zkc, currentPath, selfBrokerUrl);
} catch (NoNodeException nne) {"No children for [{}]", nne.getPath());
* cleanupSingleNamespaceNode is only called when the NamespaceService is initialized. So, no need for
* synchronization.
* @throws Exception
private static void cleanupSingleNamespaceNode(ZooKeeper zkc, String path, String selfBrokerUrl)
throws Exception {
String brokerUrl = null;
try {
byte[] data = zkc.getData(path, false, null);
if (data == null || data.length == 0) {
// skip, ephemeral node will not have zero byte
NamespaceEphemeralData zdata = jsonMapper.readValue(data, NamespaceEphemeralData.class);
brokerUrl = zdata.getNativeUrl();
if (selfBrokerUrl.equals(brokerUrl)) {
// The znode indicate that the owner is this instance while this instance was just started before
// acquiring any namespaces yet
// Hence, the znode must have been created previously by this instance and needs to be cleaned up
zkc.delete(path, -1);
} catch (NoNodeException nne) {
// The node may expired before this instance tries to get or delete it from ZK. It is OK and considered as
// NOOP.
* Create name space Ephemeral node.
* @param zkc
* the <code>ZooKeeper</code> connected session object
* @param path
* the namespace path
* @param value
* the broker url that serves the name space.
* @return
* @throws KeeperException
* @throws InterruptedException
* @throws IOException
* @throws JsonMappingException
* @throws JsonGenerationException
public static NamespaceEphemeralData acquireNameSpace(ZooKeeper zkc, String path,
NamespaceEphemeralData value)
throws KeeperException, InterruptedException, JsonGenerationException, JsonMappingException, IOException {
// the znode data to be written
String data = jsonMapper.writeValueAsString(value);
createZnodeOptimistic(zkc, path, data, CreateMode.EPHEMERAL);
return value;
public static BundlesData createBundlesIfAbsent(ZooKeeper zkc, String path, BundlesData initialBundles)
throws JsonGenerationException, JsonMappingException, IOException, KeeperException, InterruptedException {
String data = jsonMapper.writeValueAsString(initialBundles);
createZnodeOptimistic(zkc, path, data, CreateMode.PERSISTENT);
return initialBundles;
private static void createZnodeOptimistic(ZooKeeper zkc, String path, String data, CreateMode mode)
throws KeeperException, InterruptedException {
try {
// create node optimistically
checkNotNull(LocalZooKeeperConnectionService.createIfAbsent(zkc, path, data, mode));
} catch (NoNodeException e) {
// if path contains multiple levels after the root, create the intermediate nodes first
String[] parts = path.split("/");
if (parts.length > 3) {
String intPath = path.substring(0, path.lastIndexOf("/"));
if (zkc.exists(intPath, false) == null) {
// create the intermediate nodes
try {
ZkUtils.createFullPathOptimistic(zkc, intPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
} catch (KeeperException.NodeExistsException nee) {
"Other broker preempted the full intermediate path [{}] already."
+ " Continue for acquiring the leaf ephemeral node.",
checkNotNull(LocalZooKeeperConnectionService.createIfAbsent(zkc, path, data, mode));
} else {
// If failed to create immediate child of root node, throw exception
throw e;