CURATOR-623: Add ConnectionStateListener for ChildrenCache (used by Queues) (#401)
ChildrenCache (used by Queues) didn't have a ConnectionStateListener. Thus, if a long network partition occurred the ZK instance would be recreated losing any set watcher and the ChildrenCache would fail to continue watching changes. Adding a ConnectionStateListener fixes this.
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
index e5c7e8c..a28a1cc 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
@@ -25,6 +25,9 @@
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.PathUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import java.io.Closeable;
@@ -33,7 +36,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
class ChildrenCache implements Closeable
{
@@ -49,7 +51,7 @@
{
if ( !isClosed.get() )
{
- sync(true);
+ sync();
}
}
};
@@ -66,6 +68,19 @@
}
};
+ private final ConnectionStateListener connectionStateListener = (__, newState) -> {
+ if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
+ try
+ {
+ sync();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
static class Data
{
final List<String> children;
@@ -86,13 +101,15 @@
void start() throws Exception
{
- sync(true);
+ client.getConnectionStateListenable().addListener(connectionStateListener);
+ sync();
}
@Override
public void close() throws IOException
{
client.removeWatchers();
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
isClosed.set(true);
notifyFromCallback();
}
@@ -137,16 +154,9 @@
notifyAll();
}
- private synchronized void sync(boolean watched) throws Exception
+ private synchronized void sync() throws Exception
{
- if ( watched )
- {
- client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
- }
- else
- {
- client.getChildren().inBackground(callback).forPath(path);
- }
+ client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
}
private synchronized void setNewChildren(List<String> newChildren)
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java
new file mode 100644
index 0000000..2e3c7a5
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes.queue;
+
+import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.compatibility.Timing2;
+import org.junit.jupiter.api.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class TestLongNetworkPartition {
+ private static final Timing2 timing = new Timing2();
+
+ // test for https://issues.apache.org/jira/browse/CURATOR-623
+ @Test
+ public void testLongNetworkPartition() throws Exception {
+ final CompletableFuture<Void> done = new CompletableFuture<>();
+ try (final TestingCluster testingCluster = started(new TestingCluster(1));
+ final CuratorFramework dyingCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
+ final DistributedQueue<String> dyingQueue = newQueue(dyingCuratorFramework, item -> {
+ if ( item.equals("0") )
+ {
+ done.complete(null);
+ }
+ }))
+ {
+ dyingQueue.start();
+ testingCluster.killServer(testingCluster.getInstances().iterator().next());
+ timing.forSessionSleep().multiple(2).sleep();
+ testingCluster.restartServer(testingCluster.getInstances().iterator().next());
+ try (final CuratorFramework aliveCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
+ final DistributedQueue<String> aliveQueue = newQueue(aliveCuratorFramework, null))
+ {
+ aliveQueue.start();
+ aliveQueue.put("0");
+ done.get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ private static DistributedQueue<String> newQueue(CuratorFramework curatorFramework, Consumer<String> consumer) {
+ curatorFramework.start();
+ return QueueBuilder.builder(curatorFramework, consumer == null ? null : new QueueConsumer<String>() {
+ @Override
+ public void consumeMessage(String o) {
+ consumer.accept(o);
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+ }
+ }, new QueueSerializer<String>() {
+ @Override
+ public byte[] serialize(String item) {
+ return item.getBytes();
+ }
+
+ @Override
+ public String deserialize(byte[] bytes) {
+ return new String(bytes);
+ }
+ }, "/MyChildrenCacheTest/queue").buildQueue();
+ }
+
+ private static TestingCluster started(TestingCluster testingCluster) throws Exception {
+ testingCluster.start();
+ return testingCluster;
+ }
+
+ private static CuratorFramework getCuratorFramework(String connectString) {
+ return CuratorFrameworkFactory.builder()
+ .ensembleProvider(new FixedEnsembleProvider(connectString, true))
+ .sessionTimeoutMs(timing.session())
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+ }
+}
\ No newline at end of file