blob: 906d23d8382e16d42f50a6109ddd1a7a6c2f4d8b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.cache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.curator.utils.PathUtils.validatePath;
/**
* <p>A utility that attempts to keep all data from all children of a ZK path locally cached. This class
* will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can
* register a listener that will get notified when changes occur.</p>
* <p></p>
* <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
* be prepared for false-positives and false-negatives. Additionally, always use the version number
* when updating data to avoid overwriting another process' change.</p>
*/
public class TreeCache implements Closeable
{
private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
private final boolean createParentNodes;
private final TreeCacheSelector selector;
public static final class Builder
{
private final CuratorFramework client;
private final String path;
private boolean cacheData = true;
private boolean dataIsCompressed = false;
private ExecutorService executorService = null;
private int maxDepth = Integer.MAX_VALUE;
private boolean createParentNodes = false;
private TreeCacheSelector selector = new DefaultTreeCacheSelector();
private Builder(CuratorFramework client, String path)
{
this.client = checkNotNull(client);
this.path = validatePath(path);
}
/**
* Builds the {@link TreeCache} based on configured values.
*/
public TreeCache build()
{
ExecutorService executor = executorService;
if ( executor == null )
{
executor = Executors.newSingleThreadExecutor(defaultThreadFactory);
}
return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, selector);
}
/**
* Sets whether or not to cache byte data per node; default {@code true}.
*/
public Builder setCacheData(boolean cacheData)
{
this.cacheData = cacheData;
return this;
}
/**
* Sets whether or to decompress node data; default {@code false}.
*/
public Builder setDataIsCompressed(boolean dataIsCompressed)
{
this.dataIsCompressed = dataIsCompressed;
return this;
}
/**
* Sets the executor to publish events; a default executor will be created if not specified.
*/
public Builder setExecutor(ThreadFactory threadFactory)
{
return setExecutor(Executors.newSingleThreadExecutor(threadFactory));
}
/**
* Sets the executor to publish events; a default executor will be created if not specified.
*/
public Builder setExecutor(ExecutorService executorService)
{
this.executorService = checkNotNull(executorService);
return this;
}
/**
* Sets the maximum depth to explore/watch. A {@code maxDepth} of {@code 0} will watch only
* the root node (like {@link NodeCache}); a {@code maxDepth} of {@code 1} will watch the
* root node and its immediate children (kind of like {@link PathChildrenCache}.
* Default: {@code Integer.MAX_VALUE}
*/
public Builder setMaxDepth(int maxDepth)
{
this.maxDepth = maxDepth;
return this;
}
/**
* By default, TreeCache does not auto-create parent nodes for the cached path. Change
* this behavior with this method. NOTE: parent nodes are created as containers
*
* @param createParentNodes true to create parent nodes
* @return this for chaining
*/
public Builder setCreateParentNodes(boolean createParentNodes)
{
this.createParentNodes = createParentNodes;
return this;
}
/**
* By default, {@link DefaultTreeCacheSelector} is used. Change the selector here.
*
* @param selector new selector
* @return this for chaining
*/
public Builder setSelector(TreeCacheSelector selector)
{
this.selector = selector;
return this;
}
}
/**
* Create a TreeCache builder for the given client and path to configure advanced options.
* <p/>
* If the client is namespaced, all operations on the resulting TreeCache will be in terms of
* the namespace, including all published events. The given path is the root at which the
* TreeCache will watch and explore. If no node exists at the given path, the TreeCache will
* be initially empty.
*
* @param client the client to use; may be namespaced
* @param path the path to the root node to watch/explore; this path need not actually exist on
* the server
* @return a new builder
*/
public static Builder newBuilder(CuratorFramework client, String path)
{
return new Builder(client, path);
}
private enum NodeState
{
PENDING, LIVE, DEAD
}
private final class TreeNode implements Watcher, BackgroundCallback
{
final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING);
final TreeNode parent;
final String path;
final AtomicReference<ChildData> childData = new AtomicReference<ChildData>();
final AtomicReference<ConcurrentMap<String, TreeNode>> children = new AtomicReference<ConcurrentMap<String, TreeNode>>();
final int depth;
TreeNode(String path, TreeNode parent)
{
this.path = path;
this.parent = parent;
this.depth = parent == null ? 0 : parent.depth + 1;
}
private void refresh() throws Exception
{
if ((depth < maxDepth) && selector.traverseChildren(path))
{
outstandingOps.addAndGet(2);
doRefreshData();
doRefreshChildren();
} else {
refreshData();
}
}
private void refreshChildren() throws Exception
{
if ((depth < maxDepth) && selector.traverseChildren(path))
{
outstandingOps.incrementAndGet();
doRefreshChildren();
}
}
private void refreshData() throws Exception
{
outstandingOps.incrementAndGet();
doRefreshData();
}
private void doRefreshChildren() throws Exception
{
client.getChildren().usingWatcher(this).inBackground(this).forPath(path);
}
private void doRefreshData() throws Exception
{
if ( dataIsCompressed )
{
client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path);
}
else
{
client.getData().usingWatcher(this).inBackground(this).forPath(path);
}
}
void wasReconnected() throws Exception
{
refresh();
ConcurrentMap<String, TreeNode> childMap = children.get();
if ( childMap != null )
{
for ( TreeNode child : childMap.values() )
{
child.wasReconnected();
}
}
}
void wasCreated() throws Exception
{
refresh();
}
void wasDeleted() throws Exception
{
ChildData oldChildData = childData.getAndSet(null);
client.clearWatcherReferences(this);
ConcurrentMap<String, TreeNode> childMap = children.getAndSet(null);
if ( childMap != null )
{
ArrayList<TreeNode> childCopy = new ArrayList<TreeNode>(childMap.values());
childMap.clear();
for ( TreeNode child : childCopy )
{
child.wasDeleted();
}
}
if ( treeState.get() == TreeState.CLOSED )
{
return;
}
NodeState oldState = nodeState.getAndSet(NodeState.DEAD);
if ( oldState == NodeState.LIVE )
{
publishEvent(TreeCacheEvent.Type.NODE_REMOVED, oldChildData);
}
if ( parent == null )
{
// Root node; use an exist query to watch for existence.
client.checkExists().usingWatcher(this).inBackground(this).forPath(path);
}
else
{
// Remove from parent if we're currently a child
ConcurrentMap<String, TreeNode> parentChildMap = parent.children.get();
if ( parentChildMap != null )
{
parentChildMap.remove(ZKPaths.getNodeFromPath(path), this);
}
}
}
@Override
public void process(WatchedEvent event)
{
LOG.debug("process: {}", event);
try
{
switch ( event.getType() )
{
case NodeCreated:
Preconditions.checkState(parent == null, "unexpected NodeCreated on non-root node");
wasCreated();
break;
case NodeChildrenChanged:
refreshChildren();
break;
case NodeDataChanged:
refreshData();
break;
case NodeDeleted:
wasDeleted();
break;
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
}
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
LOG.debug("processResult: {}", event);
Stat newStat = event.getStat();
switch ( event.getType() )
{
case EXISTS:
Preconditions.checkState(parent == null, "unexpected EXISTS on non-root node");
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
nodeState.compareAndSet(NodeState.DEAD, NodeState.PENDING);
wasCreated();
}
break;
case CHILDREN:
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
ChildData oldChildData = childData.get();
if ( oldChildData != null && oldChildData.getStat().getMzxid() == newStat.getMzxid() )
{
// Only update stat if mzxid is same, otherwise we might obscure
// GET_DATA event updates.
childData.compareAndSet(oldChildData, new ChildData(oldChildData.getPath(), newStat, oldChildData.getData()));
}
if ( event.getChildren().isEmpty() )
{
break;
}
ConcurrentMap<String, TreeNode> childMap = children.get();
if ( childMap == null )
{
childMap = Maps.newConcurrentMap();
if ( !children.compareAndSet(null, childMap) )
{
childMap = children.get();
}
}
// Present new children in sorted order for test determinism.
List<String> newChildren = new ArrayList<String>();
for ( String child : event.getChildren() )
{
if ( !childMap.containsKey(child) && selector.acceptChild(ZKPaths.makePath(path, child)) )
{
newChildren.add(child);
}
}
Collections.sort(newChildren);
for ( String child : newChildren )
{
String fullPath = ZKPaths.makePath(path, child);
TreeNode node = new TreeNode(fullPath, this);
if ( childMap.putIfAbsent(child, node) == null )
{
node.wasCreated();
}
}
}
else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
wasDeleted();
}
break;
case GET_DATA:
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
ChildData toPublish = new ChildData(event.getPath(), newStat, event.getData());
ChildData oldChildData;
if ( cacheData )
{
oldChildData = childData.getAndSet(toPublish);
}
else
{
oldChildData = childData.getAndSet(new ChildData(event.getPath(), newStat, null));
}
boolean added;
if (parent == null) {
// We're the singleton root.
added = nodeState.getAndSet(NodeState.LIVE) != NodeState.LIVE;
} else {
added = nodeState.compareAndSet(NodeState.PENDING, NodeState.LIVE);
if (!added) {
// Ordinary nodes are not allowed to transition from dead -> live;
// make sure this isn't a delayed response that came in after death.
if (nodeState.get() != NodeState.LIVE) {
return;
}
}
}
if ( added )
{
publishEvent(TreeCacheEvent.Type.NODE_ADDED, toPublish);
}
else
{
if ( oldChildData == null || oldChildData.getStat().getMzxid() != newStat.getMzxid() )
{
publishEvent(TreeCacheEvent.Type.NODE_UPDATED, toPublish);
}
}
}
else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
wasDeleted();
}
break;
default:
// An unknown event, probably an error of some sort like connection loss.
LOG.info(String.format("Unknown event %s", event));
// Don't produce an initialized event on error; reconnect can fix this.
outstandingOps.decrementAndGet();
return;
}
if ( outstandingOps.decrementAndGet() == 0 )
{
if ( isInitialized.compareAndSet(false, true) )
{
publishEvent(TreeCacheEvent.Type.INITIALIZED);
}
}
}
}
private enum TreeState
{
LATENT,
STARTED,
CLOSED
}
/**
* Tracks the number of outstanding background requests in flight. The first time this count reaches 0, we publish the initialized event.
*/
private final AtomicLong outstandingOps = new AtomicLong(0);
/**
* Have we published the {@link TreeCacheEvent.Type#INITIALIZED} event yet?
*/
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
private final TreeNode root;
private final CuratorFramework client;
private final ExecutorService executorService;
private final boolean cacheData;
private final boolean dataIsCompressed;
private final int maxDepth;
private final ListenerContainer<TreeCacheListener> listeners = new ListenerContainer<TreeCacheListener>();
private final ListenerContainer<UnhandledErrorListener> errorListeners = new ListenerContainer<UnhandledErrorListener>();
private final AtomicReference<TreeState> treeState = new AtomicReference<TreeState>(TreeState.LATENT);
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
handleStateChange(newState);
}
};
static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("TreeCache");
/**
* Create a TreeCache for the given client and path with default options.
* <p/>
* If the client is namespaced, all operations on the resulting TreeCache will be in terms of
* the namespace, including all published events. The given path is the root at which the
* TreeCache will watch and explore. If no node exists at the given path, the TreeCache will
* be initially empty.
*
* @param client the client to use; may be namespaced
* @param path the path to the root node to watch/explore; this path need not actually exist on
* the server
* @see #newBuilder(CuratorFramework, String)
*/
public TreeCache(CuratorFramework client, String path)
{
this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), false, new DefaultTreeCacheSelector());
}
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
* @param dataIsCompressed if true, data in the path is compressed
* @param executorService Closeable ExecutorService to use for the TreeCache's background thread
* @param createParentNodes true to create parent nodes as containers
* @param selector the selector to use
*/
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)
{
this.createParentNodes = createParentNodes;
this.selector = Preconditions.checkNotNull(selector, "selector cannot be null");
this.root = new TreeNode(validatePath(path), null);
this.client = Preconditions.checkNotNull(client, "client cannot be null");
this.cacheData = cacheData;
this.dataIsCompressed = dataIsCompressed;
this.maxDepth = maxDepth;
this.executorService = Preconditions.checkNotNull(executorService, "executorService cannot be null");
}
/**
* Start the cache. The cache is not started automatically. You must call this method.
*
* @return this
* @throws Exception errors
*/
public TreeCache start() throws Exception
{
Preconditions.checkState(treeState.compareAndSet(TreeState.LATENT, TreeState.STARTED), "already started");
if ( createParentNodes )
{
client.createContainers(root.path);
}
client.getConnectionStateListenable().addListener(connectionStateListener);
if ( client.getZookeeperClient().isConnected() )
{
root.wasCreated();
}
return this;
}
/**
* Close/end the cache.
*/
@Override
public void close()
{
if ( treeState.compareAndSet(TreeState.STARTED, TreeState.CLOSED) )
{
client.getConnectionStateListenable().removeListener(connectionStateListener);
listeners.clear();
executorService.shutdown();
try
{
root.wasDeleted();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
}
}
/**
* Return the cache listenable
*
* @return listenable
*/
public Listenable<TreeCacheListener> getListenable()
{
return listeners;
}
/**
* Allows catching unhandled errors in asynchornous operations.
*
* TODO: consider making public.
*/
@VisibleForTesting
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable()
{
return errorListeners;
}
private TreeNode find(String findPath)
{
PathUtils.validatePath(findPath);
LinkedList<String> rootElements = new LinkedList<String>(ZKPaths.split(root.path));
LinkedList<String> findElements = new LinkedList<String>(ZKPaths.split(findPath));
while (!rootElements.isEmpty()) {
if (findElements.isEmpty()) {
// Target path shorter than root path
return null;
}
String nextRoot = rootElements.removeFirst();
String nextFind = findElements.removeFirst();
if (!nextFind.equals(nextRoot)) {
// Initial root path does not match
return null;
}
}
TreeNode current = root;
while (!findElements.isEmpty()) {
String nextFind = findElements.removeFirst();
ConcurrentMap<String, TreeNode> map = current.children.get();
if ( map == null )
{
return null;
}
current = map.get(nextFind);
if ( current == null )
{
return null;
}
}
return current;
}
/**
* Return the current set of children at the given path, mapped by child name. There are no
* guarantees of accuracy; this is merely the most recent view of the data. If there is no
* node at this path, {@code null} is returned.
*
* @param fullPath full path to the node to check
* @return a possibly-empty list of children if the node is alive, or null
*/
public Map<String, ChildData> getCurrentChildren(String fullPath)
{
TreeNode node = find(fullPath);
if ( node == null || node.nodeState.get() != NodeState.LIVE )
{
return null;
}
ConcurrentMap<String, TreeNode> map = node.children.get();
Map<String, ChildData> result;
if ( map == null )
{
result = ImmutableMap.of();
}
else
{
ImmutableMap.Builder<String, ChildData> builder = ImmutableMap.builder();
for ( Map.Entry<String, TreeNode> entry : map.entrySet() )
{
TreeNode childNode = entry.getValue();
ChildData childData = childNode.childData.get();
// Double-check liveness after retreiving data.
if ( childData != null && childNode.nodeState.get() == NodeState.LIVE )
{
builder.put(entry.getKey(), childData);
}
}
result = builder.build();
}
// Double-check liveness after retreiving children.
return node.nodeState.get() == NodeState.LIVE ? result : null;
}
/**
* Return the current data for the given path. There are no guarantees of accuracy. This is
* merely the most recent view of the data. If there is no node at the given path,
* {@code null} is returned.
*
* @param fullPath full path to the node to check
* @return data if the node is alive, or null
*/
public ChildData getCurrentData(String fullPath)
{
TreeNode node = find(fullPath);
if ( node == null || node.nodeState.get() != NodeState.LIVE )
{
return null;
}
ChildData result = node.childData.get();
// Double-check liveness after retreiving data.
return node.nodeState.get() == NodeState.LIVE ? result : null;
}
private void callListeners(final TreeCacheEvent event)
{
listeners.forEach(new Function<TreeCacheListener, Void>()
{
@Override
public Void apply(TreeCacheListener listener)
{
try
{
listener.childEvent(client, event);
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
return null;
}
});
}
/**
* Send an exception to any listeners, or else log the error if there are none.
*/
private void handleException(final Throwable e)
{
if ( errorListeners.size() == 0 )
{
LOG.error("", e);
}
else
{
errorListeners.forEach(new Function<UnhandledErrorListener, Void>()
{
@Override
public Void apply(UnhandledErrorListener listener)
{
try
{
listener.unhandledError("", e);
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
LOG.error("Exception handling exception", e);
}
return null;
}
});
}
}
private void handleStateChange(ConnectionState newState)
{
switch ( newState )
{
case SUSPENDED:
publishEvent(TreeCacheEvent.Type.CONNECTION_SUSPENDED);
break;
case LOST:
isInitialized.set(false);
publishEvent(TreeCacheEvent.Type.CONNECTION_LOST);
break;
case CONNECTED:
try
{
root.wasCreated();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
break;
case RECONNECTED:
try
{
root.wasReconnected();
publishEvent(TreeCacheEvent.Type.CONNECTION_RECONNECTED);
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
break;
}
}
private void publishEvent(TreeCacheEvent.Type type)
{
publishEvent(new TreeCacheEvent(type, null));
}
private void publishEvent(TreeCacheEvent.Type type, String path)
{
publishEvent(new TreeCacheEvent(type, new ChildData(path, null, null)));
}
private void publishEvent(TreeCacheEvent.Type type, ChildData data)
{
publishEvent(new TreeCacheEvent(type, data));
}
private void publishEvent(final TreeCacheEvent event)
{
if ( treeState.get() != TreeState.CLOSED )
{
LOG.debug("publishEvent: {}", event);
executorService.submit(new Runnable()
{
@Override
public void run()
{
{
try
{
callListeners(event);
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
}
}
});
}
}
}