wip
diff --git a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/ForceRefreshOperation.java b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/ForceRefreshOperation.java
deleted file mode 100644
index e56dade..0000000
--- a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/ForceRefreshOperation.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2012 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.netflix.curator.framework.recipes.cache;
-
-class ForceRefreshOperation implements Operation
-{
- private final PathChildrenCache cache;
-
- ForceRefreshOperation(PathChildrenCache cache)
- {
- this.cache = cache;
- }
-
- @Override
- public void invoke() throws Exception
- {
- cache.refresh(true);
- }
-
- @Override
- public int hashCode()
- {
- return ForceRefreshOperation.class.hashCode();
- }
-
- @Override
- public boolean equals(Object obj)
- {
- //noinspection SimplifiableIfStatement
- if ( obj == null )
- {
- return false;
- }
-
- return (this == obj) || (getClass().equals(obj.getClass()));
- }
-
- @Override
- public String toString()
- {
- return "ForceRefreshOperation{}";
- }
-}
diff --git a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCache.java
index 357bcca..0710ab6 100644
--- a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCache.java
@@ -72,12 +72,14 @@
private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
+ private volatile Set<String> initialSet = null;
+
private final Watcher childrenWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
- offerOperation(new RefreshOperation(PathChildrenCache.this));
+ offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
}
};
@@ -189,7 +191,7 @@
*/
public void start() throws Exception
{
- start(false);
+ start(StartMode.STANDARD);
}
/**
@@ -198,32 +200,73 @@
* @param buildInitial if true, {@link #rebuild()} will be called before this method
* returns in order to get an initial view of the node
* @throws Exception errors
+ * @deprecated use {@link #start(StartMode)}
*/
public void start(boolean buildInitial) throws Exception
{
+ start(buildInitial ? StartMode.BUILD_INITIAL : StartMode.STANDARD);
+ }
+
+ /**
+ * How to load initial data when calling start()
+ */
+ public enum StartMode
+ {
+ /**
+ * Load data in the background - start() will return immediately
+ */
+ STANDARD,
+
+ /**
+ * Load initial data set in the foregroup. start() will block until all initial data is loaded
+ */
+ BUILD_INITIAL,
+
+ /**
+ * Load data in the background and post {@link PathChildrenCacheEvent.Type#INITIALIZED} when done
+ */
+ POST_INITIALIZED_EVENT
+ }
+
+ public void start(StartMode mode) throws Exception
+ {
+ mode = Preconditions.checkNotNull(mode, "mode cannot be null");
+
Preconditions.checkState(!executorService.isShutdown(), "already started");
client.getConnectionStateListenable().addListener(connectionStateListener);
executorService.submit
- (
- new Callable<Void>()
+ (
+ new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
{
- @Override
- public Void call() throws Exception
- {
- mainLoop();
- return null;
- }
+ mainLoop();
+ return null;
}
- );
+ }
+ );
- if ( buildInitial )
+ switch ( mode )
{
- rebuild();
- }
- else
- {
- offerOperation(new RefreshOperation(this));
+ case STANDARD:
+ {
+ offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
+ break;
+ }
+
+ case BUILD_INITIAL:
+ {
+ rebuild();
+ break;
+ }
+
+ case POST_INITIALIZED_EVENT:
+ {
+ offerOperation(new RefreshOperation(this, RefreshMode.POST_CHILDREN_INITIALIZED_EVENT));
+ break;
+ }
}
}
@@ -252,7 +295,7 @@
}
// this is necessary so that any updates that occurred while rebuilding are taken
- offerOperation(new ForceRefreshOperation(this));
+ offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
}
/**
@@ -272,7 +315,7 @@
// this is necessary so that any updates that occurred while rebuilding are taken
// have to rebuild entire tree in case this node got deleted in the interim
- offerOperation(new ForceRefreshOperation(this));
+ offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
}
/**
@@ -364,7 +407,7 @@
public void clearAndRefresh() throws Exception
{
currentData.clear();
- offerOperation(new RefreshOperation(this));
+ offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
}
/**
@@ -376,7 +419,14 @@
currentData.clear();
}
- void refresh(final boolean forceGetDataAndStat) throws Exception
+ enum RefreshMode
+ {
+ STANDARD,
+ FORCE_GET_DATA_AND_STAT,
+ POST_CHILDREN_INITIALIZED_EVENT
+ }
+
+ void refresh(final RefreshMode mode) throws Exception
{
ensurePath.ensure(client.getZookeeperClient());
@@ -385,7 +435,7 @@
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
- processChildren(event.getChildren(), forceGetDataAndStat);
+ processChildren(event.getChildren(), mode);
}
};
@@ -507,7 +557,7 @@
{
try
{
- offerOperation(new ForceRefreshOperation(this));
+ offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));
}
catch ( Exception e )
@@ -519,7 +569,7 @@
}
}
- private void processChildren(List<String> children, boolean forceGetDataAndStat) throws Exception
+ private void processChildren(List<String> children, RefreshMode mode) throws Exception
{
List<String> fullPaths = Lists.transform
(
@@ -541,13 +591,39 @@
remove(fullPath);
}
+ if ( mode == RefreshMode.POST_CHILDREN_INITIALIZED_EVENT )
+ {
+ initialSet = Sets.newHashSet(children);
+ updateInitialSet(null);
+ }
+
for ( String name : children )
{
String fullPath = ZKPaths.makePath(path, name);
- if ( forceGetDataAndStat || !currentData.containsKey(fullPath) )
+ if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
{
getDataAndStat(fullPath);
}
+ else
+ {
+ updateInitialSet(name);
+ }
+ }
+ }
+
+ private void updateInitialSet(String child)
+ {
+ if ( initialSet != null )
+ {
+ if ( child != null )
+ {
+ initialSet.remove(child);
+ }
+ if ( initialSet.size() == 0 )
+ {
+ offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.INITIALIZED, null)));
+ initialSet = null;
+ }
}
}
@@ -574,6 +650,7 @@
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CHILD_UPDATED, data)));
}
+ updateInitialSet(ZKPaths.getNodeFromPath(fullPath));
}
}
diff --git a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCacheEvent.java b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCacheEvent.java
index 9a37a6c..c601130 100644
--- a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCacheEvent.java
+++ b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCacheEvent.java
@@ -46,19 +46,25 @@
CHILD_REMOVED,
/**
- * Called when the connection has changed to {@link ConnectionState#SUSPENDED}
+ * send when the connection has changed to {@link ConnectionState#SUSPENDED}
*/
CONNECTION_SUSPENDED,
/**
- * Called when the connection has changed to {@link ConnectionState#RECONNECTED}
+ * set when the connection has changed to {@link ConnectionState#RECONNECTED}
*/
CONNECTION_RECONNECTED,
/**
- * Called when the connection has changed to {@link ConnectionState#LOST}
+ * set when the connection has changed to {@link ConnectionState#LOST}
*/
- CONNECTION_LOST
+ CONNECTION_LOST,
+
+ /**
+ * Sent when start(StartMode.POST_CHILDREN_INITIALIZED_EVENT) has completed loading
+ * the initial data set
+ */
+ INITIALIZED
}
/**
diff --git a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/RefreshOperation.java b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/RefreshOperation.java
index a6988d6..be5bddc 100644
--- a/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/RefreshOperation.java
+++ b/curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/RefreshOperation.java
@@ -19,16 +19,18 @@
class RefreshOperation implements Operation
{
private final PathChildrenCache cache;
+ private final PathChildrenCache.RefreshMode mode;
- RefreshOperation(PathChildrenCache cache)
+ RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode)
{
this.cache = cache;
+ this.mode = mode;
}
@Override
public void invoke() throws Exception
{
- cache.refresh(false);
+ cache.refresh(mode);
}
@Override
@@ -52,6 +54,6 @@
@Override
public String toString()
{
- return "RefreshOperation{}";
+ return "RefreshOperation(" + mode + ")";
}
}
diff --git a/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestPathChildrenCache.java
index a40bc18..0ab2bcc 100644
--- a/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -23,9 +23,12 @@
import com.netflix.curator.framework.recipes.BaseClassForTests;
import com.netflix.curator.retry.RetryOneTime;
import com.netflix.curator.test.KillSession;
+import com.netflix.curator.test.TestingCluster;
import com.netflix.curator.test.Timing;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
@@ -36,6 +39,60 @@
public class TestPathChildrenCache extends BaseClassForTests
{
@Test
+ public void testChildrenInitialized() throws Exception
+ {
+ Timing timing = new Timing();
+ PathChildrenCache cache = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ client.create().forPath("/test");
+
+ cache = new PathChildrenCache(client, "/test", true);
+
+ final CountDownLatch addedLatch = new CountDownLatch(3);
+ final CountDownLatch initLatch = new CountDownLatch(1);
+ cache.getListenable().addListener
+ (
+ new PathChildrenCacheListener()
+ {
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+ {
+ if ( event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED )
+ {
+ addedLatch.countDown();
+ }
+ else if ( event.getType() == PathChildrenCacheEvent.Type.INITIALIZED )
+ {
+ initLatch.countDown();
+ }
+ }
+ }
+ );
+
+ client.create().forPath("/test/1", "1".getBytes());
+ client.create().forPath("/test/2", "2".getBytes());
+ client.create().forPath("/test/3", "3".getBytes());
+
+ cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+
+ Assert.assertTrue(timing.awaitLatch(addedLatch));
+ Assert.assertTrue(timing.awaitLatch(initLatch));
+ Assert.assertEquals(cache.getCurrentData().size(), 3);
+ Assert.assertEquals(cache.getCurrentData().get(0).getData(), "1".getBytes());
+ Assert.assertEquals(cache.getCurrentData().get(1).getData(), "2".getBytes());
+ Assert.assertEquals(cache.getCurrentData().get(2).getData(), "3".getBytes());
+ }
+ finally
+ {
+ Closeables.closeQuietly(cache);
+ Closeables.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testUpdateWhenNotCachingData() throws Exception
{
Timing timing = new Timing();
@@ -66,7 +123,7 @@
}
}
);
- cache.start(true);
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL);
client.create().forPath("/test/foo", "first".getBytes());
Assert.assertTrue(timing.awaitLatch(addedLatch));
@@ -161,7 +218,7 @@
}
}
);
- cache.start(true);
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL);
client.delete().forPath("/test/foo");
Assert.assertTrue(removedLatch.await(10, TimeUnit.SECONDS));
@@ -260,7 +317,7 @@
}
}
);
- cache.start(true);
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL);
future.get();
Assert.assertTrue(addedLatch.await(10, TimeUnit.SECONDS));
@@ -362,7 +419,7 @@
}
}
);
- cache.start(true);
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL);
client.delete().forPath("/base/a");
Assert.assertTrue(semaphore.tryAcquire(1, 10, TimeUnit.SECONDS));
@@ -489,7 +546,7 @@
latch.countDown();
}
};
- cache.start(true);
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL);
latch.await();