blob: 49b9a3f5c03e828297a3cf5ef07fa9763acd8f91 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.cache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.PathUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Exchanger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* <p>A utility that attempts to keep the data from a node locally cached. This class
* will watch the node, respond to update/create/delete events, pull down the data, etc. You can
* register a listener that will get notified when changes occur.</p>
*
* <p><b>IMPORTANT</b> - it's not possible to stay transactionally in sync. Users of this class must
* be prepared for false-positives and false-negatives. Additionally, always use the version number
* when updating data to avoid overwriting another process' change.</p>
*/
public class NodeCache implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final WatcherRemoveCuratorFramework client;
private final String path;
private final boolean dataIsCompressed;
private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
private final AtomicBoolean isConnected = new AtomicBoolean(true);
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
{
if ( isConnected.compareAndSet(false, true) )
{
try
{
reset();
}
catch ( Exception e )
{
log.error("Trying to reset after reconnection", e);
}
}
}
else
{
isConnected.set(false);
}
}
};
private final CuratorWatcher watcher = new CuratorWatcher()
{
@Override
public void process(WatchedEvent event) throws Exception
{
reset();
}
};
private enum State
{
LATENT,
STARTED,
CLOSED
}
private final BackgroundCallback backgroundCallback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
processBackgroundResult(event);
}
};
/**
* @param client curztor client
* @param path the full path to the node to cache
*/
public NodeCache(CuratorFramework client, String path)
{
this(client, path, false);
}
/**
* @param client curztor client
* @param path the full path to the node to cache
* @param dataIsCompressed if true, data in the path is compressed
*/
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
{
this.client = client.newWatcherRemoveCuratorFramework();
this.path = PathUtils.validatePath(path);
this.dataIsCompressed = dataIsCompressed;
}
/**
* Start the cache. The cache is not started automatically. You must call this method.
*
* @throws Exception errors
*/
public void start() throws Exception
{
start(false);
}
/**
* Same as {@link #start()} but gives the option of doing an initial build
*
* @param buildInitial if true, {@link #rebuild()} will be called before this method
* returns in order to get an initial view of the node
* @throws Exception errors
*/
public void start(boolean buildInitial) throws Exception
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
client.getConnectionStateListenable().addListener(connectionStateListener);
client.checkExists().creatingParentContainersIfNeeded().forPath(path);
if ( buildInitial )
{
internalRebuild();
}
reset();
}
@Override
public void close() throws IOException
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
client.removeWatchers();
listeners.clear();
}
client.getConnectionStateListenable().removeListener(connectionStateListener);
}
/**
* Return the cache listenable
*
* @return listenable
*/
public ListenerContainer<NodeCacheListener> getListenable()
{
Preconditions.checkState(state.get() != State.CLOSED, "Closed");
return listeners;
}
/**
* NOTE: this is a BLOCKING method. Completely rebuild the internal cache by querying
* for all needed data WITHOUT generating any events to send to listeners.
*
* @throws Exception errors
*/
public void rebuild() throws Exception
{
Preconditions.checkState(state.get() == State.STARTED, "Not started");
internalRebuild();
reset();
}
/**
* Return the current data. There are no guarantees of accuracy. This is
* merely the most recent view of the data. If the node does not exist,
* this returns null
*
* @return data or null
*/
public ChildData getCurrentData()
{
return data.get();
}
@VisibleForTesting
volatile Exchanger<Object> rebuildTestExchanger;
private void reset() throws Exception
{
if ( (state.get() == State.STARTED) && isConnected.get() )
{
client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
}
private void internalRebuild() throws Exception
{
try
{
Stat stat = new Stat();
byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(path) : client.getData().storingStatIn(stat).forPath(path);
data.set(new ChildData(path, stat, bytes));
}
catch ( KeeperException.NoNodeException e )
{
data.set(null);
}
}
private void processBackgroundResult(CuratorEvent event) throws Exception
{
switch ( event.getType() )
{
case GET_DATA:
{
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
ChildData childData = new ChildData(path, event.getStat(), event.getData());
setNewData(childData);
}
break;
}
case EXISTS:
{
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
setNewData(null);
}
else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
if ( dataIsCompressed )
{
client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
else
{
client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
}
break;
}
}
}
private void setNewData(ChildData newData) throws InterruptedException
{
ChildData previousData = data.getAndSet(newData);
if ( !Objects.equal(previousData, newData) )
{
listeners.forEach
(
new Function<NodeCacheListener, Void>()
{
@Override
public Void apply(NodeCacheListener listener)
{
try
{
listener.nodeChanged();
}
catch ( Exception e )
{
log.error("Calling listener", e);
}
return null;
}
}
);
if ( rebuildTestExchanger != null )
{
try
{
rebuildTestExchanger.exchange(new Object());
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
}
}
}
}
}