blob: 424abe456710ea1ffba11e131c79df8d221d5dc5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.cache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
/**
* <p>A utility that attempts to keep all data from all children of a ZK path locally cached. This class
* will watch the ZK path, respond to update/create/delete events, pull down the data, etc. You can
* register a listener that will get notified when changes occur.</p>
* <p/>
* <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
* be prepared for false-positives and false-negatives. Additionally, always use the version number
* when updating data to avoid overwriting another process' change.</p>
*/
@SuppressWarnings("NullableProblems")
public class PathChildrenCache implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFramework client;
private final String path;
private final CloseableExecutorService executorService;
private final boolean cacheData;
private final boolean dataIsCompressed;
private final EnsurePath ensurePath;
private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private enum State
{
LATENT,
STARTED,
CLOSED
}
private static final ChildData NULL_CHILD_DATA = new ChildData(null, null, null);
private final Watcher childrenWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
}
};
private final Watcher dataWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
try
{
if ( event.getType() == Event.EventType.NodeDeleted )
{
remove(event.getPath());
}
else if ( event.getType() == Event.EventType.NodeDataChanged )
{
offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
}
}
catch ( Exception e )
{
handleException(e);
}
}
};
@VisibleForTesting
volatile Exchanger<Object> rebuildTestExchanger;
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
handleStateChange(newState);
}
};
private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
/**
* @param client the client
* @param path path to watch
* @param mode caching mode
* @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean)} instead
*/
@SuppressWarnings("deprecation")
public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode)
{
this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory)));
}
/**
* @param client the client
* @param path path to watch
* @param mode caching mode
* @param threadFactory factory to use when creating internal threads
* @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)} instead
*/
@SuppressWarnings("deprecation")
public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory)
{
this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
{
this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
}
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
* @param threadFactory factory to use when creating internal threads
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
{
this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
* @param dataIsCompressed if true, data in the path is compressed
* @param threadFactory factory to use when creating internal threads
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
{
this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
* @param dataIsCompressed if true, data in the path is compressed
* @param executorService ExecutorService to use for the PathChildrenCache's background thread
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
{
this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
}
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
* @param dataIsCompressed if true, data in the path is compressed
* @param executorService Closeable ExecutorService to use for the PathChildrenCache's background thread
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
{
this.client = client;
this.path = path;
this.cacheData = cacheData;
this.dataIsCompressed = dataIsCompressed;
this.executorService = executorService;
ensurePath = client.newNamespaceAwareEnsurePath(path);
}
/**
* Start the cache. The cache is not started automatically. You must call this method.
*
* @throws Exception errors
*/
public void start() throws Exception
{
start(StartMode.NORMAL);
}
/**
* Same as {@link #start()} but gives the option of doing an initial build
*
* @param buildInitial if true, {@link #rebuild()} will be called before this method
* returns in order to get an initial view of the node; otherwise,
* the cache will be initialized asynchronously
* @throws Exception errors
* @deprecated use {@link #start(StartMode)}
*/
public void start(boolean buildInitial) throws Exception
{
start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL);
}
/**
* Method of priming cache on {@link PathChildrenCache#start(StartMode)}
*/
public enum StartMode
{
/**
* cache will _not_ be primed. i.e. it will start empty and you will receive
* events for all nodes added, etc.
*/
NORMAL,
/**
* {@link PathChildrenCache#rebuild()} will be called before this method returns in
* order to get an initial view of the node.
*/
BUILD_INITIAL_CACHE,
/**
* After cache is primed with initial values (in the background) a
* {@link PathChildrenCacheEvent.Type#INITIALIZED} will be posted
*/
POST_INITIALIZED_EVENT
}
/**
* Start the cache. The cache is not started automatically. You must call this method.
*
* @param mode Method for priming the cache
* @throws Exception errors
*/
public void start(StartMode mode) throws Exception
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
mode = Preconditions.checkNotNull(mode, "mode cannot be null");
client.getConnectionStateListenable().addListener(connectionStateListener);
switch ( mode )
{
case NORMAL:
{
offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
break;
}
case BUILD_INITIAL_CACHE:
{
rebuild();
break;
}
case POST_INITIALIZED_EVENT:
{
initialSet.set(Maps.<String, ChildData>newConcurrentMap());
offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));
break;
}
}
}
/**
* NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying
* for all needed data WITHOUT generating any events to send to listeners.
*
* @throws Exception errors
*/
public void rebuild() throws Exception
{
Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
ensurePath.ensure(client.getZookeeperClient());
clear();
List<String> children = client.getChildren().forPath(path);
for ( String child : children )
{
String fullPath = ZKPaths.makePath(path, child);
internalRebuildNode(fullPath);
if ( rebuildTestExchanger != null )
{
rebuildTestExchanger.exchange(new Object());
}
}
// this is necessary so that any updates that occurred while rebuilding are taken
offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
}
/**
* NOTE: this is a BLOCKING method. Rebuild the internal cache for the given node by querying
* for all needed data WITHOUT generating any events to send to listeners.
*
* @param fullPath full path of the node to rebuild
* @throws Exception errors
*/
public void rebuildNode(String fullPath) throws Exception
{
Preconditions.checkArgument(ZKPaths.getPathAndNode(fullPath).getPath().equals(path), "Node is not part of this cache: " + fullPath);
Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
ensurePath.ensure(client.getZookeeperClient());
internalRebuildNode(fullPath);
// this is necessary so that any updates that occurred while rebuilding are taken
// have to rebuild entire tree in case this node got deleted in the interim
offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
}
/**
* Close/end the cache
*
* @throws IOException errors
*/
@Override
public void close() throws IOException
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
client.getConnectionStateListenable().removeListener(connectionStateListener);
executorService.close();
}
}
/**
* Return the cache listenable
*
* @return listenable
*/
public ListenerContainer<PathChildrenCacheListener> getListenable()
{
return listeners;
}
/**
* Return the current data. There are no guarantees of accuracy. This is
* merely the most recent view of the data. The data is returned in sorted order.
*
* @return list of children and data
*/
public List<ChildData> getCurrentData()
{
return ImmutableList.copyOf(Sets.<ChildData>newTreeSet(currentData.values()));
}
/**
* Return the current data for the given path. There are no guarantees of accuracy. This is
* merely the most recent view of the data. If there is no child with that path, <code>null</code>
* is returned.
*
* @param fullPath full path to the node to check
* @return data or null
*/
public ChildData getCurrentData(String fullPath)
{
return currentData.get(fullPath);
}
/**
* As a memory optimization, you can clear the cached data bytes for a node. Subsequent
* calls to {@link ChildData#getData()} for this node will return <code>null</code>.
*
* @param fullPath the path of the node to clear
*/
public void clearDataBytes(String fullPath)
{
clearDataBytes(fullPath, -1);
}
/**
* As a memory optimization, you can clear the cached data bytes for a node. Subsequent
* calls to {@link ChildData#getData()} for this node will return <code>null</code>.
*
* @param fullPath the path of the node to clear
* @param ifVersion if non-negative, only clear the data if the data's version matches this version
* @return true if the data was cleared
*/
public boolean clearDataBytes(String fullPath, int ifVersion)
{
ChildData data = currentData.get(fullPath);
if ( data != null )
{
if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) )
{
data.clearData();
return true;
}
}
return false;
}
/**
* Clear out current data and begin a new query on the path
*
* @throws Exception errors
*/
public void clearAndRefresh() throws Exception
{
currentData.clear();
offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
}
/**
* Clears the current data without beginning a new query and without generating any events
* for listeners.
*/
public void clear()
{
currentData.clear();
}
enum RefreshMode
{
STANDARD,
FORCE_GET_DATA_AND_STAT,
POST_INITIALIZED
}
void refresh(final RefreshMode mode) throws Exception
{
ensurePath.ensure(client.getZookeeperClient());
final BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
processChildren(event.getChildren(), mode);
}
};
client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
}
void callListeners(final PathChildrenCacheEvent event)
{
listeners.forEach
(
new Function<PathChildrenCacheListener, Void>()
{
@Override
public Void apply(PathChildrenCacheListener listener)
{
try
{
listener.childEvent(client, event);
}
catch ( Exception e )
{
handleException(e);
}
return null;
}
}
);
}
void getDataAndStat(final String fullPath) throws Exception
{
BackgroundCallback existsCallback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
applyNewData(fullPath, event.getResultCode(), event.getStat(), null);
}
};
BackgroundCallback getDataCallback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
applyNewData(fullPath, event.getResultCode(), event.getStat(), event.getData());
}
};
if ( cacheData )
{
if ( dataIsCompressed )
{
client.getData().decompressed().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath);
}
else
{
client.getData().usingWatcher(dataWatcher).inBackground(getDataCallback).forPath(fullPath);
}
}
else
{
client.checkExists().usingWatcher(dataWatcher).inBackground(existsCallback).forPath(fullPath);
}
}
/**
* Default behavior is just to log the exception
*
* @param e the exception
*/
protected void handleException(Throwable e)
{
log.error("", e);
}
@VisibleForTesting
protected void remove(String fullPath)
{
ChildData data = currentData.remove(fullPath);
if ( data != null )
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_REMOVED, data)));
}
Map<String, ChildData> localInitialSet = initialSet.get();
if ( localInitialSet != null )
{
localInitialSet.remove(fullPath);
maybeOfferInitializedEvent(localInitialSet);
}
}
private void internalRebuildNode(String fullPath) throws Exception
{
if ( cacheData )
{
try
{
Stat stat = new Stat();
byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(fullPath) : client.getData().storingStatIn(stat).forPath(fullPath);
currentData.put(fullPath, new ChildData(fullPath, stat, bytes));
}
catch ( KeeperException.NoNodeException ignore )
{
// node no longer exists - remove it
currentData.remove(fullPath);
}
}
else
{
Stat stat = client.checkExists().forPath(fullPath);
if ( stat != null )
{
currentData.put(fullPath, new ChildData(fullPath, stat, null));
}
else
{
// node no longer exists - remove it
currentData.remove(fullPath);
}
}
}
private void handleStateChange(ConnectionState newState)
{
switch ( newState )
{
case SUSPENDED:
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));
break;
}
case LOST:
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));
break;
}
case RECONNECTED:
{
try
{
offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));
}
catch ( Exception e )
{
handleException(e);
}
break;
}
}
}
private void processChildren(List<String> children, RefreshMode mode) throws Exception
{
List<String> fullPaths = Lists.newArrayList(Lists.transform
(
children,
new Function<String, String>()
{
@Override
public String apply(String child)
{
return ZKPaths.makePath(path, child);
}
}
));
Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
removedNodes.removeAll(fullPaths);
for ( String fullPath : removedNodes )
{
remove(fullPath);
}
for ( String name : children )
{
String fullPath = ZKPaths.makePath(path, name);
if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
{
getDataAndStat(fullPath);
}
updateInitialSet(name, NULL_CHILD_DATA);
}
maybeOfferInitializedEvent(initialSet.get());
}
private void applyNewData(String fullPath, int resultCode, Stat stat, byte[] bytes)
{
if ( resultCode == KeeperException.Code.OK.intValue() ) // otherwise - node must have dropped or something - we should be getting another event
{
ChildData data = new ChildData(fullPath, stat, bytes);
ChildData previousData = currentData.put(fullPath, data);
if ( previousData == null ) // i.e. new
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_ADDED, data)));
}
else if ( previousData.getStat().getVersion() != stat.getVersion() )
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data)));
}
updateInitialSet(ZKPaths.getNodeFromPath(fullPath), data);
}
}
private void updateInitialSet(String name, ChildData data)
{
Map<String, ChildData> localInitialSet = initialSet.get();
if ( localInitialSet != null )
{
localInitialSet.put(name, data);
maybeOfferInitializedEvent(localInitialSet);
}
}
private void maybeOfferInitializedEvent(Map<String, ChildData> localInitialSet)
{
if ( !hasUninitialized(localInitialSet) )
{
// all initial children have been processed - send initialized message
if ( initialSet.getAndSet(null) != null ) // avoid edge case - don't send more than 1 INITIALIZED event
{
final List<ChildData> children = ImmutableList.copyOf(localInitialSet.values());
PathChildrenCacheEvent event = new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null)
{
@Override
public List<ChildData> getInitialData()
{
return children;
}
};
offerOperation(new EventOperation(this, event));
}
}
}
private boolean hasUninitialized(Map<String, ChildData> localInitialSet)
{
if ( localInitialSet == null )
{
return false;
}
Map<String, ChildData> uninitializedChildren = Maps.filterValues
(
localInitialSet,
new Predicate<ChildData>()
{
@Override
public boolean apply(ChildData input)
{
return (input == NULL_CHILD_DATA); // check against ref intentional
}
}
);
return (uninitializedChildren.size() != 0);
}
private void offerOperation(final Operation operation)
{
if ( operationsQuantizer.add(operation) )
{
submitToExecutor
(
new Runnable()
{
@Override
public void run()
{
try
{
operationsQuantizer.remove(operation);
operation.invoke();
}
catch ( Exception e )
{
handleException(e);
}
}
}
);
}
}
/**
* Submits a runnable to the executor.
* <p/>
* This method is synchronized because it has to check state about whether this instance is still open. Without this check
* there is a race condition with the dataWatchers that get set. Even after this object is closed() it can still be
* called by those watchers, because the close() method cannot actually disable the watcher.
* <p/>
* The synchronization overhead should be minimal if non-existant as this is generally only called from the
* ZK client thread and will only contend if close() is called in parallel with an update, and that's the exact state
* we want to protect from.
*
* @param command The runnable to run
*/
private synchronized void submitToExecutor(final Runnable command)
{
if ( state.get() == State.STARTED )
{
executorService.submit(command);
}
}
}