blob: 41a9dd17785194bd0b0d4610ee7188f28ffcfe5d [file] [log] [blame]
package org.apache.helix.zookeeper.zkclient.util;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
/**
* ZkPathRecursiveWatcherTrie will be used as a registry of persistent recursive watchers.
* When persist recursive watcher is registered on path /X, ZK will send out data change for any
* data/child changing under the tree structure of /X. The event only include the path of changed
* ZNode.
* In ZkClient, when ever we get a dataChange event for path /X/Y/Z, we need to track back the path
* and notify all registered recursive persist listener and notify them about the node change.
* ref: https://zookeeper.apache.org/doc/r3.7.1/zookeeperProgrammers.html#sc_WatchPersistentRecursive
*/
public class ZkPathRecursiveWatcherTrie {
/** Root node of PathTrie */
private final TrieNode _rootNode;
static class TrieNode {
final String _value; // Segmented ZNode path at current level
// A map of segmented ZNode path of next level to TrieNode, We keep the same initial
// children size as Zk server
final Map<String, TrieNode> _children = new HashMap<>(4);
// A list of recursive persist watcher on the current path
Set<RecursivePersistListener> _recursiveListeners = new HashSet<>(4);
/**
* Create a trie node with parent as parameter.
*
* @param value the value stored in this node
*/
private TrieNode(String value) {
_value = value;
}
/**
* The value stored in this node.
*
* @return the value stored in this node
*/
public String getValue() {
return this._value;
}
/**
* Add a child to the existing node.
*
* @param childName the string name of the child
* @param node the node that is the child
*/
void addChild(String childName, TrieNode node) {
this._children.putIfAbsent(childName, node);
}
/**
* Return the child of a node mapping to the input child name.
*
* @param childName the name of the child
* @return the child of a node
*/
@VisibleForTesting
TrieNode getChild(String childName) {
return this._children.get(childName);
}
/**
* Get the list of children of this trienode.
*
* @return A collection containing the node's children
*/
@VisibleForTesting
Map<String, TrieNode> getChildren() {
return _children;
}
/**
* Get the set of RecursivePersistWatcherListener
* Returns an empty set if no listener is registered on the path
* @return
*/
@VisibleForTesting
Set<RecursivePersistListener> getRecursiveListeners() {
return _recursiveListeners;
}
@Override
public String toString() {
return "TrieNode [name=" + _value + ", children=" + _children.keySet() + "]";
}
}
/**
* Construct a new PathTrie with a root node.
*/
public ZkPathRecursiveWatcherTrie() {
this._rootNode = new TrieNode( "/");
}
/**
* Add a path to the path trie. All paths are relative to the root node.
*
* @param path the path to add RecursivePersistListener
* @param listener the RecursivePersistListener to be added
*/
public void addRecursiveListener(final String path, RecursivePersistListener listener) {
Objects.requireNonNull(path, "Path cannot be null");
if (path.isEmpty()) {
throw new IllegalArgumentException("Empty path: " + path);
}
final List<String> pathComponents = split(path);
synchronized (this) {
TrieNode parent = _rootNode;
for (final String part : pathComponents) {
parent = parent.getChildren().computeIfAbsent(part, TrieNode::new);
}
parent._recursiveListeners.add(listener);
}
}
public Set<RecursivePersistListener> getAllRecursiveListeners(String path) {
Objects.requireNonNull(path, "Path cannot be null");
final List<String> pathComponents = split(path);
Set<RecursivePersistListener> result = new HashSet<>();
synchronized (this) {
TrieNode cur = _rootNode;
for (final String element : pathComponents) {
cur = cur.getChild(element);
if (cur == null) {
break;
}
result.addAll(cur.getRecursiveListeners());
}
}
return result;
}
/**
* Removing a RecursivePersistWatcherListener on a path.
*
* Delete a path from the nearest trie node to current node if this is the only listener and there
* is no child on the trie node.
*
* @param path the of the lister registered
* @param listener the RecursivePersistListener to be removed
*/
public void removeRecursiveListener(final String path, RecursivePersistListener listener) {
Objects.requireNonNull(path, "Path cannot be null");
if (path.length() == 0) {
throw new IllegalArgumentException("Invalid path: " + path);
}
final List<String> pathComponents = split(path);
synchronized (this) {
TrieNode cur = _rootNode;
TrieNode highestNodeForDelete =
null; // track the highest node that from that node to leaf node.
TrieNode prevDeletable = _rootNode;
for (final String part : pathComponents) {
cur = cur.getChild(part);
if (cur == null) {
return;
}
// Every time when we move down one level of trie node, 3 pointers may be updated.
// highestNodeForDelete, prev of highestNodeForDelete and cur TrieNode.
// we invalidate `highestNodeForDelete` when cur node is non leaf node but has more than one
// children or listener, or it is leaf node and has more than one listener.
boolean candidateToDelete =
(cur.getChildren().size() == 1 && cur.getRecursiveListeners().isEmpty()) || (
cur.getChildren().isEmpty() && cur.getRecursiveListeners().size() == 1 && cur
.getRecursiveListeners().contains(listener));
if (candidateToDelete) {
if (highestNodeForDelete == null) {
highestNodeForDelete = cur;
}
} else {
prevDeletable = cur;
highestNodeForDelete = null;
}
}
if (!cur.getRecursiveListeners().contains(listener)) {
return;
}
cur.getRecursiveListeners().remove(listener);
if (highestNodeForDelete != null) {
prevDeletable.getChildren().remove(highestNodeForDelete.getValue());
}
}
}
/**
* Return if there is listener on a particular path
* @param path
* @return
*/
public boolean hasListenerOnPath(String path) {
Objects.requireNonNull(path, "Path cannot be null");
final List<String> pathComponents = split(path);
TrieNode cur;
synchronized (this) {
cur = _rootNode;
for (final String element : pathComponents) {
cur = cur.getChild(element);
if (cur == null) {
break;
}
}
}
return cur != null && !cur.getRecursiveListeners().isEmpty();
}
/**
* Clear all nodes in the trie.
*/
public synchronized void clear() {
_rootNode.getChildren().clear();
}
private static List<String> split(final String path) {
return Stream.of(path.split("/")).filter(t -> !t.trim().isEmpty()).collect(Collectors.toList());
}
// only for test
@VisibleForTesting
TrieNode getRootNode() {
return _rootNode;
}
}