Merge 'CURATOR-217' into CURATOR-3.0
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
index b098989..383bc13 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
@@ -23,6 +23,7 @@
public static final String PROPERTY_LOG_EVENTS = "curator-log-events";
public static final String PROPERTY_DONT_LOG_CONNECTION_ISSUES = "curator-dont-log-connection-problems";
public static final String PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL = "curator-log-only-first-connection-issue-as-error-level";
+ public static final String PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND = "curator-remove-watchers-in-foreground";
public static final String PROPERTY_RETRY_FAILED_TESTS = "curator-retry-failed-tests";
private DebugUtils()
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 9239ac4..58c5bf5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -190,6 +190,12 @@
public SyncBuilder sync();
/**
+ * Start a remove watches builder.
+ * @return builder object
+ */
+ public RemoveWatchesBuilder watches();
+
+ /**
* Returns the listenable interface for the Connect State
*
* @return listenable
@@ -259,7 +265,11 @@
* Call this method on watchers you are no longer interested in.
*
* @param watcher the watcher
+ *
+ * @deprecated As of ZooKeeper 3.5 Curators recipes will handle removing watcher references
+ * when they are no longer used.
*/
+ @Deprecated
public void clearWatcherReferences(Watcher watcher);
/**
@@ -278,4 +288,13 @@
* @throws InterruptedException If interrupted while waiting
*/
public void blockUntilConnected() throws InterruptedException;
+
+ /**
+ * Returns a facade of the current instance that tracks
+ * watchers created and allows a one-shot removal of all watchers
+ * via {@link WatcherRemoveCuratorFramework#removeWatchers()}
+ *
+ * @return facade
+ */
+ public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework();
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/WatcherRemoveCuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/WatcherRemoveCuratorFramework.java
new file mode 100644
index 0000000..871b53c
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/WatcherRemoveCuratorFramework.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework;
+
+/**
+ * A CuratorFramework facade that tracks watchers created and allows a one-shot removal of all watchers
+ */
+public interface WatcherRemoveCuratorFramework extends CuratorFramework
+{
+ /**
+ * Remove all outstanding watchers that have been set
+ */
+ void removeWatchers();
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietly.java b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietly.java
new file mode 100644
index 0000000..13202aa
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietly.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface BackgroundPathableQuietly<T> extends BackgroundPathable<T>, Quietly<BackgroundPathable<T>>
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietlyable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietlyable.java
new file mode 100644
index 0000000..8ed73fa
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietlyable.java
@@ -0,0 +1,5 @@
+package org.apache.curator.framework.api;
+
+public interface BackgroundPathableQuietlyable<T> extends BackgroundPathable<T>, Quietly<BackgroundPathable<T>>
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 5a2dc56..5dea211 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -89,6 +89,11 @@
WATCHED,
/**
+ * Corresponds to {@link CuratorFramework#watches()} ()}
+ */
+ REMOVE_WATCHES,
+
+ /**
* Event sent when client is being closed
*/
CLOSING
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
index 3a3faf7..2da1843 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
@@ -18,6 +18,6 @@
*/
package org.apache.curator.framework.api;
-public interface DeleteBuilder extends Guaranteeable, ChildrenDeletable
+public interface DeleteBuilder extends GuaranteeableDeletable, ChildrenDeletable
{
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
index 481911b..bc033ac 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
@@ -18,23 +18,15 @@
*/
package org.apache.curator.framework.api;
-public interface Guaranteeable extends BackgroundVersionable
+public interface Guaranteeable<T>
{
/**
- * <p>
- * Solves this edge case: deleting a node can fail due to connection issues. Further,
- * if the node was ephemeral, the node will not get auto-deleted as the session is still valid.
- * This can wreak havoc with lock implementations.
- * </p>
- *
- * <p>
- * When <code>guaranteed</code> is set, Curator will record failed node deletions and
- * attempt to delete them in the background until successful. NOTE: you will still get an
- * exception when the deletion fails. But, you can be assured that as long as the
- * {@link org.apache.curator.framework.CuratorFramework} instance is open attempts will be made to delete the node.
- * </p>
+ * Solves edge cases where an operation may succeed on the server but connection failure occurs before a
+ * response can be successfully returned to the client.
+ *
+ * @see org.apache.curator.framework.api.GuaranteeableDeletable
*
* @return this
*/
- public ChildrenDeletable guaranteed();
+ public T guaranteed();
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDeletable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDeletable.java
new file mode 100644
index 0000000..7f8139c
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDeletable.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * <p>
+ * Solves this edge case: deleting a node can fail due to connection issues. Further,
+ * if the node was ephemeral, the node will not get auto-deleted as the session is still valid.
+ * This can wreak havoc with lock implementations.
+ * </p>
+ *
+ * <p>
+ * When <code>guaranteed</code> is set, Curator will record failed node deletions and
+ * attempt to delete them in the background until successful. NOTE: you will still get an
+ * exception when the deletion fails. But, you can be assured that as long as the
+ * {@link org.apache.curator.framework.CuratorFramework} instance is open attempts will be made to delete the node.
+ * </p>
+ *
+ * @return this
+ */
+public interface GuaranteeableDeletable extends Guaranteeable<ChildrenDeletable>, BackgroundVersionable
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Quietly.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Quietly.java
new file mode 100644
index 0000000..ad3762f
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Quietly.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface Quietly<T>
+{
+ public T quietly();
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java
new file mode 100644
index 0000000..6cc0b05
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Builder to allow watches to be removed
+ */
+public interface RemoveWatchesBuilder
+{
+ /**
+ * Specify the watcher to be removed
+ * @param watcher
+ * @return
+ */
+ public RemoveWatchesType remove(Watcher watcher);
+
+ /**
+ * Specify the watcher to be removed
+ * @param watcher
+ * @return
+ */
+ public RemoveWatchesType remove(CuratorWatcher watcher);
+
+ /**
+ * Specify that all watches should be removed
+ * @return
+ */
+ public RemoveWatchesType removeAll();
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java
new file mode 100644
index 0000000..4a67470
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * Builder to allow the specification of whether it is acceptable to remove client side watch information
+ * in the case where ZK cannot be contacted.
+ */
+public interface RemoveWatchesLocal extends BackgroundPathableQuietlyable<Void>
+{
+
+ /**
+ * Specify if the client should just remove client side watches if a connection to ZK
+ * is not available. Note that the standard Curator retry loop will not be used in t
+ * @return
+ */
+ public BackgroundPathableQuietlyable<Void> locally();
+
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
new file mode 100644
index 0000000..84d8093
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.Watcher.WatcherType;
+
+/**
+ * Builder to allow the specification of whether it is acceptable to remove client side watch information
+ * in the case where ZK cannot be contacted.
+ */
+public interface RemoveWatchesType extends RemoveWatchesLocal, Guaranteeable<BackgroundPathableQuietlyable<Void>>
+{
+
+ /**
+ * Specify the type of watcher to be removed.
+ * @param watcherType
+ * @return
+ */
+ public RemoveWatchesLocal ofType(WatcherType watcherType);
+
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 900374b..41bb7cd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -30,6 +30,7 @@
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.*;
import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
@@ -76,6 +77,7 @@
private final List<AuthInfo> authInfos;
private final byte[] defaultData;
private final FailedDeleteManager failedDeleteManager;
+ private final FailedRemoveWatchManager failedRemoveWatcherManager;
private final CompressionProvider compressionProvider;
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
@@ -128,6 +130,7 @@
authInfos = buildAuths(builder);
failedDeleteManager = new FailedDeleteManager(this);
+ failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
}
@@ -141,6 +144,12 @@
return builder1.build();
}
+ @Override
+ public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
+ {
+ return new WatcherRemovalFacade(this);
+ }
+
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
{
return new ZookeeperFactory()
@@ -180,6 +189,7 @@
connectionStateManager = parent.connectionStateManager;
defaultData = parent.defaultData;
failedDeleteManager = parent.failedDeleteManager;
+ failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
compressionProvider = parent.compressionProvider;
aclProvider = parent.aclProvider;
namespaceFacadeCache = parent.namespaceFacadeCache;
@@ -479,6 +489,12 @@
return new SyncBuilderImpl(this);
}
+ @Override
+ public RemoveWatchesBuilder watches()
+ {
+ return new RemoveWatchesBuilderImpl(this);
+ }
+
protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
{
BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
@@ -507,6 +523,11 @@
return failedDeleteManager;
}
+ FailedRemoveWatchManager getFailedRemoveWatcherManager()
+ {
+ return failedRemoveWatcherManager;
+ }
+
RetryLoop newRetryLoop()
{
return client.newRetryLoop();
@@ -689,6 +710,11 @@
return Watcher.Event.KeeperState.fromInt(-1);
}
+ WatcherRemovalManager getWatcherRemovalManager()
+ {
+ return null;
+ }
+
private void suspendConnection()
{
if ( !connectionStateManager.setToSuspended() )
@@ -842,7 +868,7 @@
{
try
{
- if ( client.isConnected() )
+ if ( !operationAndData.isConnectionRequired() || client.isConnected() )
{
operationAndData.callPerformBackgroundOperation();
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 51641b8..2a98f56 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -205,7 +205,7 @@
@Override
public void retriesExhausted(OperationAndData<String> operationAndData)
{
- client.getFailedDeleteManager().addFailedDelete(unfixedPath);
+ client.getFailedDeleteManager().addFailedOperation(unfixedPath);
}
};
}
@@ -261,7 +261,7 @@
//Only retry a guaranteed delete if it's a retryable error
if( (RetryLoop.isRetryException(e) || (e instanceof InterruptedException)) && guaranteed )
{
- client.getFailedDeleteManager().addFailedDelete(unfixedPath);
+ client.getFailedDeleteManager().addFailedOperation(unfixedPath);
}
throw e;
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
index 345bcf5..a6316ac 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
@@ -141,7 +141,7 @@
}
else
{
- client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext());
+ client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(client, operationAndData.getData()), callback, backgrounding.getContext());
}
}
@@ -223,7 +223,7 @@
}
else
{
- returnStat = client.getZooKeeper().exists(path, watching.getWatcher());
+ returnStat = client.getZooKeeper().exists(path, watching.getWatcher(client, path));
}
return returnStat;
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
index deb7f40..934ae40 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
@@ -19,45 +19,18 @@
package org.apache.curator.framework.imps;
import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-class FailedDeleteManager
+class FailedDeleteManager extends FailedOperationManager<String>
{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
-
- volatile FailedDeleteManagerListener debugListener = null;
-
- interface FailedDeleteManagerListener
- {
- public void pathAddedForDelete(String path);
- }
-
FailedDeleteManager(CuratorFramework client)
{
- this.client = client;
+ super(client);
}
- void addFailedDelete(String path)
+ @Override
+ protected void executeGuaranteedOperationInBackground(String path)
+ throws Exception
{
- if ( debugListener != null )
- {
- debugListener.pathAddedForDelete(path);
- }
-
-
- if ( client.getState() == CuratorFrameworkState.STARTED )
- {
- log.debug("Path being added to guaranteed delete set: " + path);
- try
- {
- client.delete().guaranteed().inBackground().forPath(path);
- }
- catch ( Exception e )
- {
- addFailedDelete(path);
- }
- }
+ client.delete().guaranteed().inBackground().forPath(path);
}
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
new file mode 100644
index 0000000..405561b
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+abstract class FailedOperationManager<T>
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ protected final CuratorFramework client;
+
+ @VisibleForTesting
+ volatile FailedOperationManagerListener<T> debugListener = null;
+
+ interface FailedOperationManagerListener<T>
+ {
+ public void pathAddedForGuaranteedOperation(T detail);
+ }
+
+ FailedOperationManager(CuratorFramework client)
+ {
+ this.client = client;
+ }
+
+ void addFailedOperation(T details)
+ {
+ if ( debugListener != null )
+ {
+ debugListener.pathAddedForGuaranteedOperation(details);
+ }
+
+
+ if ( client.getState() == CuratorFrameworkState.STARTED )
+ {
+ log.debug("Details being added to guaranteed operation set: " + details);
+ try
+ {
+ executeGuaranteedOperationInBackground(details);
+ }
+ catch ( Exception e )
+ {
+ addFailedOperation(details);
+ }
+ }
+ }
+
+ protected abstract void executeGuaranteedOperationInBackground(T details) throws Exception;
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java
new file mode 100644
index 0000000..f635660
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.Watcher;
+
+class FailedRemoveWatchManager extends FailedOperationManager<FailedRemoveWatchManager.FailedRemoveWatchDetails>
+{
+ FailedRemoveWatchManager(CuratorFramework client)
+ {
+ super(client);
+ }
+
+ @Override
+ protected void executeGuaranteedOperationInBackground(FailedRemoveWatchDetails details)
+ throws Exception
+ {
+ if(details.watcher == null)
+ {
+ client.watches().removeAll().guaranteed().inBackground().forPath(details.path);
+ }
+ else
+ {
+ client.watches().remove(details.watcher).guaranteed().inBackground().forPath(details.path);
+ }
+ }
+
+ static class FailedRemoveWatchDetails
+ {
+ public final String path;
+ public final Watcher watcher;
+
+ public FailedRemoveWatchDetails(String path, Watcher watcher)
+ {
+ this.path = path;
+ this.watcher = watcher;
+ }
+ }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
index 03010ce..7929ba3 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
@@ -172,7 +172,7 @@
}
else
{
- client.getZooKeeper().getChildren(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext());
+ client.getZooKeeper().getChildren(operationAndData.getData(), watching.getWatcher(client, operationAndData.getData()), callback, backgrounding.getContext());
}
}
@@ -211,7 +211,7 @@
}
else
{
- children = client.getZooKeeper().getChildren(path, watching.getWatcher(), responseStat);
+ children = client.getZooKeeper().getChildren(path, watching.getWatcher(client, path), responseStat);
}
return children;
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
index a837809..5468bd4 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
@@ -30,6 +30,7 @@
import org.apache.curator.framework.api.GetConfigBuilder;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
@@ -153,7 +154,7 @@
}
else
{
- client.getZooKeeper().getConfig(watching.getWatcher(), callback, backgrounding.getContext());
+ client.getZooKeeper().getConfig(watching.getWatcher(client, ZooDefs.CONFIG_NODE), callback, backgrounding.getContext());
}
}
@@ -174,7 +175,7 @@
{
return client.getZooKeeper().getConfig(true, stat);
}
- return client.getZooKeeper().getConfig(watching.getWatcher(), stat);
+ return client.getZooKeeper().getConfig(watching.getWatcher(client, ZooDefs.CONFIG_NODE), stat);
}
}
);
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
index 23da075..31ad598 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
@@ -260,7 +260,7 @@
}
else
{
- client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext());
+ client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(client, operationAndData.getData()), callback, backgrounding.getContext());
}
}
@@ -299,7 +299,7 @@
}
else
{
- responseData = client.getZooKeeper().getData(path, watching.getWatcher(), responseStat);
+ responseData = client.getZooKeeper().getData(path, watching.getWatcher(client, path), responseStat);
}
return responseData;
}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 38f59a0..b46cddb 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -40,25 +40,37 @@
private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
private final long ordinal = nextOrdinal.getAndIncrement();
private final Object context;
+ private final boolean connectionRequired;
interface ErrorCallback<T>
{
void retriesExhausted(OperationAndData<T> operationAndData);
}
-
- OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
+
+ OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired)
{
this.operation = operation;
this.data = data;
this.callback = callback;
this.errorCallback = errorCallback;
this.context = context;
+ this.connectionRequired = connectionRequired;
+ }
+
+ OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
+ {
+ this(operation, data, callback, errorCallback, context, true);
}
Object getContext()
{
return context;
}
+
+ boolean isConnectionRequired()
+ {
+ return connectionRequired;
+ }
void callPerformBackgroundOperation() throws Exception
{
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
new file mode 100644
index 0000000..d872ced
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+import org.apache.curator.RetryLoop;
+import org.apache.curator.TimeTrace;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.BackgroundPathable;
+import org.apache.curator.framework.api.BackgroundPathableQuietlyable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.framework.api.RemoveWatchesLocal;
+import org.apache.curator.framework.api.RemoveWatchesBuilder;
+import org.apache.curator.framework.api.RemoveWatchesType;
+import org.apache.curator.utils.DebugUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
+
+
+public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWatchesType, RemoveWatchesLocal, BackgroundOperation<String>
+{
+ private CuratorFrameworkImpl client;
+ private Watcher watcher;
+ private WatcherType watcherType;
+ private boolean guaranteed;
+ private boolean local;
+ private boolean quietly;
+ private Backgrounding backgrounding;
+
+ public RemoveWatchesBuilderImpl(CuratorFrameworkImpl client)
+ {
+ this.client = client;
+ this.watcher = null;
+ this.watcherType = WatcherType.Any;
+ this.guaranteed = false;
+ this.local = false;
+ this.quietly = false;
+ this.backgrounding = new Backgrounding();
+ }
+
+ void internalRemoval(Watcher watcher, String path) throws Exception
+ {
+ this.watcher = watcher;
+ watcherType = WatcherType.Any;
+ quietly = true;
+ guaranteed = true;
+ if ( Boolean.getBoolean(DebugUtils.PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND) )
+ {
+ this.backgrounding = new Backgrounding();
+ pathInForeground(path);
+ }
+ else
+ {
+ this.backgrounding = new Backgrounding(true);
+ pathInBackground(path);
+ }
+ }
+
+ @Override
+ public RemoveWatchesType remove(Watcher watcher)
+ {
+ if(watcher == null) {
+ this.watcher = null;
+ } else {
+ //Try and get the namespaced version of the watcher.
+ this.watcher = client.getNamespaceWatcherMap().get(watcher);
+
+ //If this is not present then default to the original watcher. This shouldn't happen in practice unless the user
+ //has added a watch directly to the ZK client rather than via the CuratorFramework.
+ if(this.watcher == null) {
+ this.watcher = watcher;
+ }
+ }
+
+ return this;
+ }
+
+ @Override
+ public RemoveWatchesType remove(CuratorWatcher watcher)
+ {
+ this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().get(watcher);
+ return this;
+ }
+
+ @Override
+ public RemoveWatchesType removeAll()
+ {
+ this.watcher = null;
+ return this;
+ }
+
+ @Override
+ public RemoveWatchesLocal ofType(WatcherType watcherType)
+ {
+ this.watcherType = watcherType;
+
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground(BackgroundCallback callback, Object context)
+ {
+ backgrounding = new Backgrounding(callback, context);
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground(BackgroundCallback callback, Object context, Executor executor)
+ {
+ backgrounding = new Backgrounding(client, callback, context, executor);
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground(BackgroundCallback callback)
+ {
+ backgrounding = new Backgrounding(callback);
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground(BackgroundCallback callback, Executor executor)
+ {
+ backgrounding = new Backgrounding(client, callback, executor);
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground()
+ {
+ backgrounding = new Backgrounding(true);
+ return this;
+ }
+
+ @Override
+ public Pathable<Void> inBackground(Object context)
+ {
+ backgrounding = new Backgrounding(context);
+ return this;
+ }
+
+ @Override
+ public RemoveWatchesLocal guaranteed()
+ {
+ guaranteed = true;
+ return this;
+ }
+
+ @Override
+ public BackgroundPathableQuietlyable<Void> locally()
+ {
+ local = true;
+ return this;
+ }
+
+ @Override
+ public BackgroundPathable<Void> quietly()
+ {
+ quietly = true;
+ return this;
+ }
+
+ @Override
+ public Void forPath(String path) throws Exception
+ {
+ final String adjustedPath = client.fixForNamespace(path);
+
+ if(backgrounding.inBackground())
+ {
+ pathInBackground(adjustedPath);
+ }
+ else
+ {
+ pathInForeground(adjustedPath);
+ }
+
+ return null;
+ }
+
+ private void pathInBackground(final String path)
+ {
+ OperationAndData.ErrorCallback<String> errorCallback = null;
+
+ //Only need an error callback if we're in guaranteed mode
+ if(guaranteed)
+ {
+ errorCallback = new OperationAndData.ErrorCallback<String>()
+ {
+ @Override
+ public void retriesExhausted(OperationAndData<String> operationAndData)
+ {
+ client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));
+ }
+ };
+ }
+
+ client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(),
+ errorCallback, backgrounding.getContext(), !local), null);
+ }
+
+ private void pathInForeground(final String path) throws Exception
+ {
+ //For the local case we don't want to use the normal retry loop and we don't want to block until a connection is available.
+ //We just execute the removeWatch, and if it fails, ZK will just remove local watches.
+ if(local)
+ {
+ ZooKeeper zkClient = client.getZooKeeper();
+ if(watcher == null)
+ {
+ zkClient.removeAllWatches(path, watcherType, local);
+ }
+ else
+ {
+ zkClient.removeWatches(path, watcher, watcherType, local);
+ }
+ }
+ else
+ {
+ RetryLoop.callWithRetry(client.getZookeeperClient(),
+ new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ try
+ {
+ ZooKeeper zkClient = client.getZookeeperClient().getZooKeeper();
+
+ if(watcher == null)
+ {
+ zkClient.removeAllWatches(path, watcherType, local);
+ }
+ else
+ {
+ zkClient.removeWatches(path, watcher, watcherType, local);
+ }
+ }
+ catch(Exception e)
+ {
+ if( RetryLoop.isRetryException(e) && guaranteed )
+ {
+ //Setup the guaranteed handler
+ client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));
+ throw e;
+ }
+ else if(e instanceof KeeperException.NoWatcherException && quietly)
+ {
+ //Ignore
+ }
+ else
+ {
+ //Rethrow
+ throw e;
+ }
+ }
+
+ return null;
+ }
+ });
+ }
+ }
+
+ @Override
+ public void performBackgroundOperation(final OperationAndData<String> operationAndData)
+ throws Exception
+ {
+ final TimeTrace trace = client.getZookeeperClient().startTracer("RemoteWatches-Background");
+
+ AsyncCallback.VoidCallback callback = new AsyncCallback.VoidCallback()
+ {
+ @Override
+ public void processResult(int rc, String path, Object ctx)
+ {
+ trace.commit();
+ CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.REMOVE_WATCHES, rc, path, null, ctx, null, null, null, null, null);
+ client.processBackgroundOperation(operationAndData, event);
+ }
+ };
+
+ ZooKeeper zkClient = client.getZooKeeper();
+ if(watcher == null)
+ {
+ zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext());
+ }
+ else
+ {
+ zkClient.removeWatches(operationAndData.getData(), watcher, watcherType, local, callback, operationAndData.getContext());
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
new file mode 100644
index 0000000..156341e
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryLoop;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.zookeeper.ZooKeeper;
+
+class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemoveCuratorFramework
+{
+ private final CuratorFrameworkImpl client;
+ private final WatcherRemovalManager removalManager;
+
+ WatcherRemovalFacade(CuratorFrameworkImpl client)
+ {
+ super(client);
+ this.client = client;
+ removalManager = new WatcherRemovalManager(client);
+ }
+
+ @Override
+ public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
+ {
+ return client.newWatcherRemoveCuratorFramework();
+ }
+
+ WatcherRemovalManager getRemovalManager()
+ {
+ return removalManager;
+ }
+
+ @Override
+ public void removeWatchers()
+ {
+ removalManager.removeWatchers();
+ }
+
+ @Override
+ WatcherRemovalManager getWatcherRemovalManager()
+ {
+ return removalManager;
+ }
+
+ @Override
+ public CuratorFramework nonNamespaceView()
+ {
+ return client.nonNamespaceView();
+ }
+
+ @Override
+ public CuratorFramework usingNamespace(String newNamespace)
+ {
+ return client.usingNamespace(newNamespace);
+ }
+
+ @Override
+ public String getNamespace()
+ {
+ return client.getNamespace();
+ }
+
+ @Override
+ public void start()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Listenable<ConnectionStateListener> getConnectionStateListenable()
+ {
+ return client.getConnectionStateListenable();
+ }
+
+ @Override
+ public Listenable<CuratorListener> getCuratorListenable()
+ {
+ return client.getCuratorListenable();
+ }
+
+ @Override
+ public Listenable<UnhandledErrorListener> getUnhandledErrorListenable()
+ {
+ return client.getUnhandledErrorListenable();
+ }
+
+ @Override
+ public void sync(String path, Object context)
+ {
+ client.sync(path, context);
+ }
+
+ @Override
+ public CuratorZookeeperClient getZookeeperClient()
+ {
+ return client.getZookeeperClient();
+ }
+
+ @Override
+ RetryLoop newRetryLoop()
+ {
+ return client.newRetryLoop();
+ }
+
+ @Override
+ ZooKeeper getZooKeeper() throws Exception
+ {
+ return client.getZooKeeper();
+ }
+
+ @Override
+ <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
+ {
+ client.processBackgroundOperation(operationAndData, event);
+ }
+
+ @Override
+ void logError(String reason, Throwable e)
+ {
+ client.logError(reason, e);
+ }
+
+ @Override
+ String unfixForNamespace(String path)
+ {
+ return client.unfixForNamespace(path);
+ }
+
+ @Override
+ String fixForNamespace(String path)
+ {
+ return client.fixForNamespace(path);
+ }
+
+ @Override
+ public EnsurePath newNamespaceAwareEnsurePath(String path)
+ {
+ return client.newNamespaceAwareEnsurePath(path);
+ }
+
+ @Override
+ FailedDeleteManager getFailedDeleteManager()
+ {
+ return client.getFailedDeleteManager();
+ }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
new file mode 100644
index 0000000..a691a94
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+public class WatcherRemovalManager
+{
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorFrameworkImpl client;
+ private final Set<WrappedWatcher> entries = Sets.newHashSet(); // guarded by sync
+
+ WatcherRemovalManager(CuratorFrameworkImpl client)
+ {
+ this.client = client;
+ }
+
+ synchronized Watcher add(String path, Watcher watcher)
+ {
+ path = Preconditions.checkNotNull(path, "path cannot be null");
+ watcher = Preconditions.checkNotNull(watcher, "watcher cannot be null");
+
+ WrappedWatcher wrappedWatcher = new WrappedWatcher(watcher, path);
+ entries.add(wrappedWatcher);
+ return wrappedWatcher;
+ }
+
+ @VisibleForTesting
+ synchronized Set<? extends Watcher> getEntries()
+ {
+ return Sets.newHashSet(entries);
+ }
+
+ void removeWatchers()
+ {
+ HashSet<WrappedWatcher> localEntries;
+ synchronized(this)
+ {
+ localEntries = Sets.newHashSet(entries);
+ }
+ for ( WrappedWatcher entry : localEntries )
+ {
+ try
+ {
+ log.debug("Removing watcher for path: " + entry.path);
+ RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client);
+ builder.internalRemoval(entry, entry.path);
+ }
+ catch ( Exception e )
+ {
+ log.error("Could not remove watcher for path: " + entry.path);
+ }
+ }
+ }
+
+ private synchronized void internalRemove(WrappedWatcher entry)
+ {
+ entries.remove(entry);
+ }
+
+ private class WrappedWatcher implements Watcher
+ {
+ private final Watcher watcher;
+ private final String path;
+
+ WrappedWatcher(Watcher watcher, String path)
+ {
+ this.watcher = watcher;
+ this.path = path;
+ }
+
+ @Override
+ public void process(WatchedEvent event)
+ {
+ if ( event.getType() != Event.EventType.None )
+ {
+ internalRemove(this);
+ }
+ watcher.process(event);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if ( this == o )
+ {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() )
+ {
+ return false;
+ }
+
+ WrappedWatcher entry = (WrappedWatcher)o;
+
+ //noinspection SimplifiableIfStatement
+ if ( !watcher.equals(entry.watcher) )
+ {
+ return false;
+ }
+ return path.equals(entry.path);
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = watcher.hashCode();
+ result = 31 * result + path.hashCode();
+ return result;
+ }
+ }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index a9d0ab1..4bebbd5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -50,8 +50,12 @@
watched = false;
}
- Watcher getWatcher()
+ Watcher getWatcher(CuratorFrameworkImpl client, String unfixedPath)
{
+ if ( (watcher != null) && (client.getWatcherRemovalManager() != null) )
+ {
+ return client.getWatcherRemovalManager().add(unfixedPath, watcher);
+ }
return watcher;
}
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
index 6599745..943529f 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
@@ -22,7 +22,6 @@
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.imps.FailedDeleteManager.FailedDeleteManagerListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -291,11 +290,11 @@
final AtomicBoolean pathAdded = new AtomicBoolean(false);
- ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedDeleteManagerListener()
+ ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<String>()
{
@Override
- public void pathAddedForDelete(String path)
+ public void pathAddedForGuaranteedOperation(String path)
{
pathAdded.set(true);
}
@@ -325,11 +324,11 @@
final AtomicBoolean pathAdded = new AtomicBoolean(false);
- ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedDeleteManagerListener()
+ ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<String>()
{
@Override
- public void pathAddedForDelete(String path)
+ public void pathAddedForGuaranteedOperation(String path)
{
pathAdded.set(true);
}
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
new file mode 100644
index 0000000..4e02e95
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -0,0 +1,655 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.imps.FailedRemoveWatchManager.FailedRemoveWatchDetails;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestRemoveWatches extends BaseClassForTests
+{
+ private AtomicReference<ConnectionState> registerConnectionStateListener(CuratorFramework client)
+ {
+ final AtomicReference<ConnectionState> state = new AtomicReference<ConnectionState>();
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ state.set(newState);
+ synchronized(state)
+ {
+ state.notify();
+ }
+ }
+ });
+
+ return state;
+ }
+
+ private boolean blockUntilDesiredConnectionState(AtomicReference<ConnectionState> stateRef, Timing timing, final ConnectionState desiredState)
+ {
+ if(stateRef.get() == desiredState)
+ {
+ return true;
+ }
+
+ synchronized(stateRef)
+ {
+ if(stateRef.get() == desiredState)
+ {
+ return true;
+ }
+
+ try
+ {
+ stateRef.wait(timing.milliseconds());
+ return stateRef.get() == desiredState;
+ }
+ catch(InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ }
+
+ @Test
+ public void testRemoveCuratorDefaultWatcher() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+
+ final String path = "/";
+ client.getCuratorListenable().addListener(new CuratorListener()
+ {
+ @Override
+ public void eventReceived(CuratorFramework client, CuratorEvent event)
+ throws Exception
+ {
+ if(event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getType() == EventType.DataWatchRemoved) {
+ removedLatch.countDown();
+ }
+ }
+ });
+
+ client.checkExists().watched().forPath(path);
+
+ client.watches().removeAll().forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveCuratorWatch() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+
+ final String path = "/";
+ CuratorWatcher watcher = new CuratorWatcher()
+ {
+
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
+ if(event.getPath().equals(path) && event.getType() == EventType.DataWatchRemoved) {
+ removedLatch.countDown();
+ }
+ }
+ };
+
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ client.watches().remove(watcher).forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveWatch() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+
+ final String path = "/";
+ Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ client.watches().remove(watcher).forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveWatchInBackgroundWithCallback() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ //Make sure that the event fires on both the watcher and the callback.
+ final CountDownLatch removedLatch = new CountDownLatch(2);
+ final String path = "/";
+ Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
+ BackgroundCallback callback = new BackgroundCallback()
+ {
+
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event)
+ throws Exception
+ {
+ if(event.getType() == CuratorEventType.REMOVE_WATCHES && event.getPath().equals(path)) {
+ removedLatch.countDown();
+ }
+ }
+ };
+
+
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ client.watches().remove(watcher).ofType(WatcherType.Any).inBackground(callback).forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveWatchInBackgroundWithNoCallback() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ final String path = "/";
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+ Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ client.watches().remove(watcher).inBackground().forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveAllWatches() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ final String path = "/";
+ final CountDownLatch removedLatch = new CountDownLatch(2);
+
+ Watcher watcher1 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);
+ Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
+ client.getChildren().usingWatcher(watcher1).forPath(path);
+ client.checkExists().usingWatcher(watcher2).forPath(path);
+
+ client.watches().removeAll().forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveAllDataWatches() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ final String path = "/";
+ final AtomicBoolean removedFlag = new AtomicBoolean(false);
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+
+ Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.ChildWatchRemoved);
+ Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
+ client.getChildren().usingWatcher(watcher1).forPath(path);
+ client.checkExists().usingWatcher(watcher2).forPath(path);
+
+ client.watches().removeAll().ofType(WatcherType.Data).forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+ Assert.assertEquals(removedFlag.get(), false);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveAllChildWatches() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ final String path = "/";
+ final AtomicBoolean removedFlag = new AtomicBoolean(false);
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+
+ Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.DataWatchRemoved);
+ Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);
+
+ client.checkExists().usingWatcher(watcher1).forPath(path);
+ client.getChildren().usingWatcher(watcher2).forPath(path);
+
+ client.watches().removeAll().ofType(WatcherType.Children).forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+ Assert.assertEquals(removedFlag.get(), false);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveLocalWatch() throws Exception {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
+
+ final String path = "/";
+
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+
+ Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ //Stop the server so we can check if we can remove watches locally when offline
+ server.stop();
+
+ Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
+
+ client.watches().removeAll().locally().forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testRemoveLocalWatchInBackground() throws Exception {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
+
+ final String path = "/";
+
+ final CountDownLatch removedLatch = new CountDownLatch(1);
+
+ Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);
+
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ //Stop the server so we can check if we can remove watches locally when offline
+ server.stop();
+
+ Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
+
+ client.watches().removeAll().locally().inBackground().forPath(path);
+
+ Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ /**
+ * Test the case where we try and remove an unregistered watcher. In this case we expect a NoWatcherException to
+ * be thrown.
+ * @throws Exception
+ */
+ @Test(expectedExceptions=KeeperException.NoWatcherException.class)
+ public void testRemoveUnregisteredWatcher() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ final String path = "/";
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ }
+ };
+
+ client.watches().remove(watcher).forPath(path);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ /**
+ * Test the case where we try and remove an unregistered watcher but have the quietly flag set. In this case we expect success.
+ * @throws Exception
+ */
+ @Test
+ public void testRemoveUnregisteredWatcherQuietly() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ final AtomicBoolean watcherRemoved = new AtomicBoolean(false);
+
+ final String path = "/";
+ Watcher watcher = new BooleanWatcher(path, watcherRemoved, EventType.DataWatchRemoved);
+
+ client.watches().remove(watcher).quietly().forPath(path);
+
+ timing.sleepABit();
+
+ //There should be no watcher removed as none were registered.
+ Assert.assertEquals(watcherRemoved.get(), false);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testGuaranteedRemoveWatch() throws Exception {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+ try
+ {
+ client.start();
+
+ AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
+
+ String path = "/";
+
+ CountDownLatch removeLatch = new CountDownLatch(1);
+
+ Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ server.stop();
+
+ Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
+
+ //Remove the watch while we're not connected
+ try
+ {
+ client.watches().remove(watcher).guaranteed().forPath(path);
+ Assert.fail();
+ }
+ catch(KeeperException.ConnectionLossException e)
+ {
+ //Expected
+ }
+
+ server.restart();
+
+ timing.awaitLatch(removeLatch);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testGuaranteedRemoveWatchInBackground() throws Exception {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(),
+ new ExponentialBackoffRetry(100, 3));
+ try
+ {
+ client.start();
+
+ AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);
+
+ final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1);
+
+ ((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>()
+ {
+
+ @Override
+ public void pathAddedForGuaranteedOperation(
+ FailedRemoveWatchDetails detail)
+ {
+ guaranteeAddedLatch.countDown();
+ }
+ };
+
+ String path = "/";
+
+ CountDownLatch removeLatch = new CountDownLatch(1);
+
+ Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);
+ client.checkExists().usingWatcher(watcher).forPath(path);
+
+ server.stop();
+ Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));
+
+ //Remove the watch while we're not connected
+ client.watches().remove(watcher).guaranteed().inBackground().forPath(path);
+
+ timing.awaitLatch(guaranteeAddedLatch);
+
+ server.restart();
+
+ timing.awaitLatch(removeLatch);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ private static class CountDownWatcher implements Watcher {
+ private String path;
+ private EventType eventType;
+ private CountDownLatch removeLatch;
+
+ public CountDownWatcher(String path, CountDownLatch removeLatch, EventType eventType) {
+ this.path = path;
+ this.eventType = eventType;
+ this.removeLatch = removeLatch;
+ }
+
+ @Override
+ public void process(WatchedEvent event)
+ {
+ if(event.getPath() == null || event.getType() == null) {
+ return;
+ }
+
+ if(event.getPath().equals(path) && event.getType() == eventType) {
+ removeLatch.countDown();
+ }
+ }
+ }
+
+ private static class BooleanWatcher implements Watcher {
+ private String path;
+ private EventType eventType;
+ private AtomicBoolean removedFlag;
+
+ public BooleanWatcher(String path, AtomicBoolean removedFlag, EventType eventType) {
+ this.path = path;
+ this.eventType = eventType;
+ this.removedFlag = removedFlag;
+ }
+
+ @Override
+ public void process(WatchedEvent event)
+ {
+ if(event.getPath() == null || event.getType() == null) {
+ return;
+ }
+
+ if(event.getPath().equals(path) && event.getType() == eventType) {
+ removedFlag.set(true);
+ }
+ }
+ }
+}
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
new file mode 100644
index 0000000..e20c450
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.test.WatchersDebug;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class TestWatcherRemovalManager extends BaseClassForTests
+{
+ @Test
+ public void testBasic() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ internalTryBasic(client);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testBasicNamespace1() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ internalTryBasic(client.usingNamespace("foo"));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testBasicNamespace2() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .retryPolicy(new RetryOneTime(1))
+ .namespace("hey")
+ .build();
+ try
+ {
+ client.start();
+ internalTryBasic(client);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testBasicNamespace3() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.builder()
+ .connectString(server.getConnectString())
+ .retryPolicy(new RetryOneTime(1))
+ .namespace("hey")
+ .build();
+ try
+ {
+ client.start();
+ internalTryBasic(client.usingNamespace("lakjsf"));
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testSameWatcher() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ // NOP
+ }
+ };
+
+ removerClient.getData().usingWatcher(watcher).forPath("/");
+ Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+ removerClient.getData().usingWatcher(watcher).forPath("/");
+ Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testTriggered() throws Exception
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ if ( event.getType() == Event.EventType.NodeCreated )
+ {
+ latch.countDown();
+ }
+ }
+ };
+
+ removerClient.checkExists().usingWatcher(watcher).forPath("/yo");
+ Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+ removerClient.create().forPath("/yo");
+
+ Assert.assertTrue(new Timing().awaitLatch(latch));
+
+ Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 0);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testResetFromWatcher() throws Exception
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+
+ final WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+
+ final CountDownLatch createdLatch = new CountDownLatch(1);
+ final CountDownLatch deletedLatch = new CountDownLatch(1);
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ if ( event.getType() == Event.EventType.NodeCreated )
+ {
+ try
+ {
+ removerClient.checkExists().usingWatcher(this).forPath("/yo");
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace();
+ }
+ createdLatch.countDown();
+ }
+ else if ( event.getType() == Event.EventType.NodeDeleted )
+ {
+ deletedLatch.countDown();
+ }
+ }
+ };
+
+ removerClient.checkExists().usingWatcher(watcher).forPath("/yo");
+ Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+ removerClient.create().forPath("/yo");
+
+ Assert.assertTrue(timing.awaitLatch(createdLatch));
+ Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+
+ removerClient.delete().forPath("/yo");
+
+ Assert.assertTrue(timing.awaitLatch(deletedLatch));
+
+ Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 0);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ private void internalTryBasic(CuratorFramework client) throws Exception
+ {
+ WatcherRemoveCuratorFramework removerClient = client.newWatcherRemoveCuratorFramework();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ if ( event.getType() == Event.EventType.DataWatchRemoved )
+ {
+ latch.countDown();
+ }
+ }
+ };
+ removerClient.checkExists().usingWatcher(watcher).forPath("/hey");
+
+ List<String> existWatches = WatchersDebug.getExistWatches(client.getZookeeperClient().getZooKeeper());
+ Assert.assertEquals(existWatches.size(), 1);
+
+ removerClient.removeWatchers();
+
+ Assert.assertTrue(new Timing().awaitLatch(latch));
+
+ existWatches = WatchersDebug.getExistWatches(client.getZookeeperClient().getZooKeeper());
+ Assert.assertEquals(existWatches.size(), 0);
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 72ee5ff..49b9a3f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -23,6 +23,7 @@
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorWatcher;
@@ -53,7 +54,7 @@
public class NodeCache implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
+ private final WatcherRemoveCuratorFramework client;
private final String path;
private final boolean dataIsCompressed;
private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
@@ -127,7 +128,7 @@
*/
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
{
- this.client = client;
+ this.client = client.newWatcherRemoveCuratorFramework();
this.path = PathUtils.validatePath(path);
this.dataIsCompressed = dataIsCompressed;
}
@@ -169,6 +170,7 @@
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
+ client.removeWatchers();
listeners.clear();
}
client.getConnectionStateListenable().removeListener(connectionStateListener);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index b5d912c..99a652d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -27,6 +27,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.listen.ListenerContainer;
@@ -42,16 +43,13 @@
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Exchanger;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -67,7 +65,7 @@
public class PathChildrenCache implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
+ private final WatcherRemoveCuratorFramework client;
private final String path;
private final CloseableExecutorService executorService;
private final boolean cacheData;
@@ -215,7 +213,7 @@
*/
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
{
- this.client = client;
+ this.client = client.newWatcherRemoveCuratorFramework();
this.path = PathUtils.validatePath(path);
this.cacheData = cacheData;
this.dataIsCompressed = dataIsCompressed;
@@ -374,6 +372,7 @@
executorService.close();
client.clearWatcherReferences(childrenWatcher);
client.clearWatcherReferences(dataWatcher);
+ client.removeWatchers();
// TODO
// This seems to enable even more GC - I'm not sure why yet - it
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 4f3ffb6..bda00bf 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
@@ -480,7 +481,7 @@
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
private final TreeNode root;
- private final CuratorFramework client;
+ private final WatcherRemoveCuratorFramework client;
private final CloseableExecutorService executorService;
private final boolean cacheData;
private final boolean dataIsCompressed;
@@ -530,7 +531,7 @@
{
this.createParentNodes = createParentNodes;
this.root = new TreeNode(validatePath(path), null);
- this.client = client;
+ this.client = client.newWatcherRemoveCuratorFramework();
this.cacheData = cacheData;
this.dataIsCompressed = dataIsCompressed;
this.maxDepth = maxDepth;
@@ -566,6 +567,7 @@
{
if ( treeState.compareAndSet(TreeState.STARTED, TreeState.CLOSED) )
{
+ client.removeWatchers();
client.getConnectionStateListenable().removeListener(connectionStateListener);
listeners.clear();
executorService.close();
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index a6d8145..da9b8b2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -23,6 +23,7 @@
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.listen.ListenerContainer;
@@ -63,7 +64,7 @@
public class LeaderLatch implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
+ private final WatcherRemoveCuratorFramework client;
private final String latchPath;
private final String id;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
@@ -143,7 +144,7 @@
*/
public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
{
- this.client = Preconditions.checkNotNull(client, "client cannot be null");
+ this.client = Preconditions.checkNotNull(client, "client cannot be null").newWatcherRemoveCuratorFramework();
this.latchPath = PathUtils.validatePath(latchPath);
this.id = Preconditions.checkNotNull(id, "id cannot be null");
this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
@@ -206,6 +207,7 @@
try
{
setNode(null);
+ client.removeWatchers();
}
catch ( Exception e )
{
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
index 88b5f5d..444b10d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
@@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import java.util.concurrent.TimeUnit;
/**
@@ -29,6 +30,7 @@
public class InterProcessSemaphoreMutex implements InterProcessLock
{
private final InterProcessSemaphoreV2 semaphore;
+ private final WatcherRemoveCuratorFramework watcherRemoveClient;
private volatile Lease lease;
/**
@@ -37,7 +39,8 @@
*/
public InterProcessSemaphoreMutex(CuratorFramework client, String path)
{
- this.semaphore = new InterProcessSemaphoreV2(client, path, 1);
+ watcherRemoveClient = client.newWatcherRemoveCuratorFramework();
+ this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);
}
@Override
@@ -66,6 +69,7 @@
try
{
lease.close();
+ watcherRemoveClient.removeWatchers();
}
finally
{
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index f4af39b..3bf2ec3 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -22,15 +22,16 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
-
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.RetryLoop;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -38,13 +39,13 @@
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.curator.utils.PathUtils;
/**
* <p>
@@ -78,7 +79,7 @@
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final InterProcessMutex lock;
- private final CuratorFramework client;
+ private final WatcherRemoveCuratorFramework client;
private final String leasesPath;
private final Watcher watcher = new Watcher()
{
@@ -122,7 +123,7 @@
private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count)
{
- this.client = client;
+ this.client = client.newWatcherRemoveCuratorFramework();
path = PathUtils.validatePath(path);
lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
this.maxLeases = (count != null) ? count.getCount() : maxLeases;
@@ -352,36 +353,43 @@
String nodeName = ZKPaths.getNodeFromPath(path);
builder.add(makeLease(path));
- synchronized(this)
+ try
{
- for(;;)
+ synchronized(this)
{
- List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
- if ( !children.contains(nodeName) )
+ for(;;)
{
- log.error("Sequential path not found: " + path);
- return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
- }
-
- if ( children.size() <= maxLeases )
- {
- break;
- }
- if ( hasWait )
- {
- long thisWaitMs = getThisWaitMs(startMs, waitMs);
- if ( thisWaitMs <= 0 )
+ List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
+ if ( !children.contains(nodeName) )
{
- return InternalAcquireResult.RETURN_NULL;
+ log.error("Sequential path not found: " + path);
+ return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
}
- wait(thisWaitMs);
- }
- else
- {
- wait();
+
+ if ( children.size() <= maxLeases )
+ {
+ break;
+ }
+ if ( hasWait )
+ {
+ long thisWaitMs = getThisWaitMs(startMs, waitMs);
+ if ( thisWaitMs <= 0 )
+ {
+ return InternalAcquireResult.RETURN_NULL;
+ }
+ wait(thisWaitMs);
+ }
+ else
+ {
+ wait();
+ }
}
}
}
+ finally
+ {
+ client.removeWatchers();
+ }
}
finally
{
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
index 2b4d3d9..4b0da11 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
@@ -24,11 +24,11 @@
import com.google.common.collect.Lists;
import org.apache.curator.RetryLoop;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ZKPaths;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -42,7 +42,7 @@
public class LockInternals
{
- private final CuratorFramework client;
+ private final WatcherRemoveCuratorFramework client;
private final String path;
private final String basePath;
private final LockInternalsDriver driver;
@@ -100,7 +100,7 @@
this.lockName = lockName;
this.maxLeases = maxLeases;
- this.client = client;
+ this.client = client.newWatcherRemoveCuratorFramework();
this.basePath = PathUtils.validatePath(path);
this.path = ZKPaths.makePath(path, lockName);
}
@@ -116,8 +116,9 @@
revocable.set(entry);
}
- void releaseLock(String lockPath) throws Exception
+ final void releaseLock(String lockPath) throws Exception
{
+ client.removeWatchers();
revocable.set(null);
deleteOurPath(lockPath);
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 0d963e0..0b482ef 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateModable;
@@ -58,7 +59,7 @@
{
private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorFramework client;
+ private final WatcherRemoveCuratorFramework client;
private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
private final String basePath;
@@ -212,7 +213,7 @@
*/
public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[] initData)
{
- this.client = Preconditions.checkNotNull(client, "client cannot be null");
+ this.client = Preconditions.checkNotNull(client, "client cannot be null").newWatcherRemoveCuratorFramework();
this.basePath = PathUtils.validatePath(basePath);
this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
@@ -314,6 +315,8 @@
{
throw new IOException(e);
}
+
+ client.removeWatchers();
}
/**
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
index 032dc7a..e5c7e8c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorWatcher;
@@ -36,7 +37,7 @@
class ChildrenCache implements Closeable
{
- private final CuratorFramework client;
+ private final WatcherRemoveCuratorFramework client;
private final String path;
private final AtomicReference<Data> children = new AtomicReference<Data>(new Data(Lists.<String>newArrayList(), 0));
private final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -79,7 +80,7 @@
ChildrenCache(CuratorFramework client, String path)
{
- this.client = client;
+ this.client = client.newWatcherRemoveCuratorFramework();
this.path = PathUtils.validatePath(path);
}
@@ -91,6 +92,7 @@
@Override
public void close() throws IOException
{
+ client.removeWatchers();
isClosed.set(true);
notifyFromCallback();
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 6ce6bf4..dcd30ba 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -22,6 +22,7 @@
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.listen.ListenerContainer;
import org.apache.curator.framework.state.ConnectionState;
@@ -45,7 +46,7 @@
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final ListenerContainer<SharedValueListener> listeners = new ListenerContainer<SharedValueListener>();
- private final CuratorFramework client;
+ private final WatcherRemoveCuratorFramework client;
private final String path;
private final byte[] seedValue;
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
@@ -87,7 +88,7 @@
*/
public SharedValue(CuratorFramework client, String path, byte[] seedValue)
{
- this.client = client;
+ this.client = client.newWatcherRemoveCuratorFramework();
this.path = PathUtils.validatePath(path);
this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(0, Arrays.copyOf(seedValue, seedValue.length)));
@@ -233,8 +234,9 @@
@Override
public void close() throws IOException
{
- client.getConnectionStateListenable().removeListener(connectionStateListener);
state.set(State.CLOSED);
+ client.removeWatchers();
+ client.getConnectionStateListenable().removeListener(connectionStateListener);
listeners.clear();
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
new file mode 100644
index 0000000..82de1fc
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.WatchersDebug;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.ZooKeeper;
+
+public class TestCleanState
+{
+ public static void closeAndTestClean(CuratorFramework client)
+ {
+ if ( client == null )
+ {
+ return;
+ }
+
+ try
+ {
+ CuratorFrameworkImpl internalClient = (CuratorFrameworkImpl)client;
+ ZooKeeper zooKeeper = internalClient.getZooKeeper();
+ if ( zooKeeper != null )
+ {
+ if ( WatchersDebug.getChildWatches(zooKeeper).size() != 0 )
+ {
+ throw new AssertionError("One or more child watchers are still registered: " + WatchersDebug.getChildWatches(zooKeeper));
+ }
+ if ( WatchersDebug.getExistWatches(zooKeeper).size() != 0 )
+ {
+ throw new AssertionError("One or more exists watchers are still registered: " + WatchersDebug.getExistWatches(zooKeeper));
+ }
+ if ( WatchersDebug.getDataWatches(zooKeeper).size() != 0 )
+ {
+ throw new AssertionError("One or more data watchers are still registered: " + WatchersDebug.getDataWatches(zooKeeper));
+ }
+ }
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace(); // not sure what to do here
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ private TestCleanState()
+ {
+ }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
index ab37785..f32c9b2 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
@@ -22,6 +22,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
@@ -123,7 +124,7 @@
finally
{
CloseableUtils.closeQuietly(cache);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
finally
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
index 27af6ac..d6d495a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.framework.recipes.cache;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -98,7 +99,7 @@
finally
{
CloseableUtils.closeQuietly(cache);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -161,7 +162,7 @@
finally
{
CloseableUtils.closeQuietly(cache);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -204,7 +205,7 @@
finally
{
CloseableUtils.closeQuietly(cache);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -252,7 +253,7 @@
finally
{
CloseableUtils.closeQuietly(cache);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 14d061f..3571ca7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -28,7 +28,7 @@
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.Pathable;
import org.apache.curator.framework.api.UnhandledErrorListener;
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
@@ -97,7 +97,7 @@
finally
{
CloseableUtils.closeQuietly(cache);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -139,7 +139,7 @@
finally
{
CloseableUtils.closeQuietly(cache);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -193,7 +193,7 @@
finally
{
CloseableUtils.closeQuietly(cache);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -242,7 +242,7 @@
finally
{
CloseableUtils.closeQuietly(cache);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -251,6 +251,7 @@
{
Timing timing = new Timing();
+ PathChildrenCache cache = null;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
@@ -258,7 +259,7 @@
final CountDownLatch updatedLatch = new CountDownLatch(1);
final CountDownLatch addedLatch = new CountDownLatch(1);
client.create().creatingParentsIfNeeded().forPath("/test");
- PathChildrenCache cache = new PathChildrenCache(client, "/test", false);
+ cache = new PathChildrenCache(client, "/test", false);
cache.getListenable().addListener
(
new PathChildrenCacheListener()
@@ -287,7 +288,8 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -315,7 +317,7 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -391,7 +393,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -485,7 +487,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -494,6 +496,7 @@
public void testIssue27() throws Exception
{
Timing timing = new Timing();
+ PathChildrenCache cache = null;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
@@ -507,7 +510,7 @@
final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
final Semaphore semaphore = new Semaphore(0);
- PathChildrenCache cache = new PathChildrenCache(client, "/base", true);
+ cache = new PathChildrenCache(client, "/base", true);
cache.getListenable().addListener
(
new PathChildrenCacheListener()
@@ -542,7 +545,8 @@
}
finally
{
- client.close();
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -551,6 +555,7 @@
public void testIssue27Alt() throws Exception
{
Timing timing = new Timing();
+ PathChildrenCache cache = null;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
@@ -564,7 +569,7 @@
final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
final Semaphore semaphore = new Semaphore(0);
- PathChildrenCache cache = new PathChildrenCache(client, "/base", true);
+ cache = new PathChildrenCache(client, "/base", true);
cache.getListenable().addListener
(
new PathChildrenCacheListener()
@@ -594,7 +599,8 @@
}
finally
{
- client.close();
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -602,6 +608,7 @@
public void testKilledSession() throws Exception
{
Timing timing = new Timing();
+ PathChildrenCache cache = null;
CuratorFramework client = null;
try
{
@@ -609,7 +616,7 @@
client.start();
client.create().forPath("/test");
- PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
+ cache = new PathChildrenCache(client, "/test", true);
cache.start();
final CountDownLatch childAddedLatch = new CountDownLatch(1);
@@ -653,7 +660,8 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(cache);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -677,7 +685,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -721,7 +729,7 @@
finally
{
CloseableUtils.closeQuietly(cache);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -809,7 +817,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -878,7 +886,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -911,8 +919,9 @@
timing.sleepABit();
Assert.assertFalse(exec.isExecuteCalled());
}
- finally {
- client.close();
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -957,9 +966,10 @@
latch.await(5, TimeUnit.SECONDS);
Assert.assertTrue(latch.getCount() == 1, "Unexpected exception occurred");
- } finally
+ }
+ finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 96e6d45..3742fb7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -24,6 +24,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
@@ -96,7 +97,7 @@
finally
{
CloseableUtils.closeQuietly(latch);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -126,7 +127,7 @@
finally
{
CloseableUtils.closeQuietly(latch);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -158,7 +159,7 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -213,7 +214,7 @@
{
CloseableUtils.closeQuietly(latch);
}
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -256,9 +257,8 @@
{
CloseableUtils.closeQuietly(latch);
}
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
-
}
@Test
@@ -320,7 +320,7 @@
finally
{
executorService.shutdownNow();
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -416,7 +416,7 @@
CloseableUtils.closeQuietly(latch);
}
}
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -504,7 +504,7 @@
CloseableUtils.closeQuietly(latch);
}
}
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -583,7 +583,7 @@
{
CloseableUtils.closeQuietly(notifiedLeader);
}
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -639,7 +639,7 @@
finally
{
CloseableUtils.closeQuietly(leader);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
CloseableUtils.closeQuietly(server);
}
}
@@ -709,7 +709,7 @@
{
CloseableUtils.closeQuietly(latch);
}
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -745,6 +745,6 @@
{
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
- LeaderLatch latch = new LeaderLatch(client, "parent");
+ new LeaderLatch(client, "parent");
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
index 09b5fe6..dceff88 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
@@ -20,6 +20,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryOneTime;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -84,13 +85,14 @@
}
catch ( Exception e )
{
+ // ignore
}
Assert.assertFalse(goodLock.isAcquiredInThisProcess());
Assert.assertTrue(otherGoodLock.isAcquiredInThisProcess());
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -142,13 +144,14 @@
}
catch ( Exception e )
{
+ // ignore
}
Assert.assertFalse(goodLock.isAcquiredInThisProcess());
Assert.assertTrue(goodLockWasLocked.get());
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
index a2c079e..c37d88d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
@@ -20,6 +20,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.KillSession;
import org.apache.zookeeper.CreateMode;
@@ -106,7 +107,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -151,7 +152,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
index 99ea11f..f44d238 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -22,12 +22,12 @@
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.KillSession;
-import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
@@ -126,7 +126,7 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -278,7 +278,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -344,7 +344,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -390,7 +390,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -407,7 +407,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -539,11 +539,14 @@
Assert.assertTrue(acquiredLatchForClient1.await(10, TimeUnit.SECONDS));
Assert.assertTrue(mutexForClient1.isAcquiredInThisProcess());
}
+
+ future1.get();
+ future2.get();
}
finally
{
- CloseableUtils.closeQuietly(client1);
- CloseableUtils.closeQuietly(client2);
+ TestCleanState.closeAndTestClean(client1);
+ TestCleanState.closeAndTestClean(client2);
}
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
index f7636ed..48e4805 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
@@ -16,14 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
@@ -31,6 +32,7 @@
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -40,21 +42,22 @@
public class TestInterProcessReadWriteLock extends BaseClassForTests
{
@Test
- public void testGetParticipantNodes() throws Exception
+ public void testGetParticipantNodes() throws Exception
{
- final int READERS = 20;
- final int WRITERS = 8;
+ final int READERS = 20;
+ final int WRITERS = 8;
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
- final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
- final CountDownLatch readLatch = new CountDownLatch(READERS);
- final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+ final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
+ final CountDownLatch readLatch = new CountDownLatch(READERS);
+ final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
- ExecutorService service = Executors.newCachedThreadPool();
+ final CountDownLatch exitLatch = new CountDownLatch(1);
+ ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
for ( int i = 0; i < READERS; ++i )
{
service.submit
@@ -65,8 +68,16 @@
public Void call() throws Exception
{
lock.readLock().acquire();
- latch.countDown();
- readLatch.countDown();
+ try
+ {
+ latch.countDown();
+ readLatch.countDown();
+ exitLatch.await();
+ }
+ finally
+ {
+ lock.readLock().release();
+ }
return null;
}
}
@@ -84,6 +95,14 @@
Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS));
latch.countDown(); // must be before as there can only be one writer
lock.writeLock().acquire();
+ try
+ {
+ exitLatch.await();
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
return null;
}
}
@@ -97,22 +116,28 @@
Assert.assertEquals(readers.size(), READERS);
Assert.assertEquals(writers.size(), WRITERS);
+
+ exitLatch.countDown();
+ for ( int i = 0; i < (READERS + WRITERS); ++i )
+ {
+ service.take().get();
+ }
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@Test
- public void testThatUpgradingIsDisallowed() throws Exception
+ public void testThatUpgradingIsDisallowed() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+ InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
lock.readLock().acquire();
Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS));
@@ -120,70 +145,80 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@Test
- public void testThatDowngradingRespectsThreads() throws Exception
+ public void testThatDowngradingRespectsThreads() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
- final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
- ExecutorService t1 = Executors.newSingleThreadExecutor();
- ExecutorService t2 = Executors.newSingleThreadExecutor();
+ final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+ ExecutorService t1 = Executors.newSingleThreadExecutor();
+ ExecutorService t2 = Executors.newSingleThreadExecutor();
- final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(1);
- Future<Object> f1 = t1.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ final CountDownLatch releaseLatch = new CountDownLatch(1);
+ Future<Object> f1 = t1.submit
+ (
+ new Callable<Object>()
{
- lock.writeLock().acquire();
- latch.countDown();
- return null;
+ @Override
+ public Object call() throws Exception
+ {
+ lock.writeLock().acquire();
+ latch.countDown();
+ try
+ {
+ releaseLatch.await();
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
+ return null;
+ }
}
- }
- );
+ );
- Future<Object> f2 = t2.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ Future<Object> f2 = t2.submit
+ (
+ new Callable<Object>()
{
- Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
- Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
- return null;
+ @Override
+ public Object call() throws Exception
+ {
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+ Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
+ return null;
+ }
}
- }
- );
+ );
- f1.get();
f2.get();
+ releaseLatch.countDown();
+ f1.get();
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@Test
- public void testDowngrading() throws Exception
+ public void testDowngrading() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+ InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
lock.writeLock().acquire();
Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
lock.writeLock().release();
@@ -192,60 +227,60 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@Test
- public void testBasic() throws Exception
+ public void testBasic() throws Exception
{
- final int CONCURRENCY = 8;
- final int ITERATIONS = 100;
+ final int CONCURRENCY = 8;
+ final int ITERATIONS = 100;
- final Random random = new Random();
- final AtomicInteger concurrentCount = new AtomicInteger(0);
- final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
- final AtomicInteger writeCount = new AtomicInteger(0);
- final AtomicInteger readCount = new AtomicInteger(0);
+ final Random random = new Random();
+ final AtomicInteger concurrentCount = new AtomicInteger(0);
+ final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
+ final AtomicInteger writeCount = new AtomicInteger(0);
+ final AtomicInteger readCount = new AtomicInteger(0);
- List<Future<Void>> futures = Lists.newArrayList();
- ExecutorService service = Executors.newCachedThreadPool();
+ List<Future<Void>> futures = Lists.newArrayList();
+ ExecutorService service = Executors.newCachedThreadPool();
for ( int i = 0; i < CONCURRENCY; ++i )
{
- Future<Void> future = service.submit
- (
- new Callable<Void>()
- {
- @Override
- public Void call() throws Exception
+ Future<Void> future = service.submit
+ (
+ new Callable<Void>()
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
- client.start();
- try
+ @Override
+ public Void call() throws Exception
{
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
- for ( int i = 0; i < ITERATIONS; ++i )
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ client.start();
+ try
{
- if ( random.nextInt(100) < 10 )
+ InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+ for ( int i = 0; i < ITERATIONS; ++i )
{
- doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
- writeCount.incrementAndGet();
- }
- else
- {
- doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
- readCount.incrementAndGet();
+ if ( random.nextInt(100) < 10 )
+ {
+ doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
+ writeCount.incrementAndGet();
+ }
+ else
+ {
+ doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
+ readCount.incrementAndGet();
+ }
}
}
+ finally
+ {
+ TestCleanState.closeAndTestClean(client);
+ }
+ return null;
}
- finally
- {
- CloseableUtils.closeQuietly(client);
- }
- return null;
}
- }
- );
+ );
futures.add(future);
}
@@ -262,17 +297,17 @@
}
@Test
- public void testSetNodeData() throws Exception
+ public void testSetNodeData() throws Exception
{
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
try
{
client.start();
- final byte[] nodeData = new byte[] { 1, 2, 3, 4 };
+ final byte[] nodeData = new byte[]{1, 2, 3, 4};
- InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock", nodeData);
+ InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock", nodeData);
// mutate passed-in node data, lock has made copy
nodeData[0] = 5;
@@ -284,13 +319,13 @@
byte dataInZk[] = client.getData().forPath("/lock/" + children.get(0));
Assert.assertNotNull(dataInZk);
- Assert.assertEquals(new byte[] { 1, 2, 3, 4 }, dataInZk);
+ Assert.assertEquals(new byte[]{1, 2, 3, 4}, dataInZk);
lock.writeLock().release();
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -299,7 +334,7 @@
try
{
Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
- int localConcurrentCount;
+ int localConcurrentCount;
synchronized(this)
{
localConcurrentCount = concurrentCount.incrementAndGet();
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 631b7c7..3ba75d8 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -20,13 +20,14 @@
package org.apache.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collection;
@@ -100,10 +101,12 @@
future1.get();
future2.get();
+
+ count.close();
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -140,8 +143,8 @@
}
finally
{
- CloseableUtils.closeQuietly(client1);
- CloseableUtils.closeQuietly(client2);
+ TestCleanState.closeAndTestClean(client1);
+ TestCleanState.closeAndTestClean(client2);
}
}
@@ -226,7 +229,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
return null;
}
@@ -299,7 +302,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
return null;
}
@@ -401,7 +404,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -445,7 +448,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -463,7 +466,7 @@
}
finally
{
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -499,7 +502,7 @@
{
CloseableUtils.closeQuietly(l);
}
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -528,7 +531,7 @@
{
CloseableUtils.closeQuietly(l);
}
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
index 2aa8a72..f4cb7bb 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
@@ -22,6 +22,7 @@
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -147,7 +148,7 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
return null;
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
index 2d9a9aa..d1e6db5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.recipes.locks;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.RetryPolicy;
@@ -74,7 +75,7 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
index 457be75..dc14c11 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
@@ -20,6 +20,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
@@ -67,7 +68,7 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 9f5907a..c81cc65 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -20,17 +20,16 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
-
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.KillSession;
-import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
@@ -64,6 +63,7 @@
private final Timing timing = new Timing();
@AfterMethod
+ @Override
public void teardown() throws Exception
{
for ( PersistentEphemeralNode node : createdNodes )
@@ -73,10 +73,8 @@
for ( CuratorFramework curator : curatorInstances )
{
- curator.close();
+ TestCleanState.closeAndTestClean(curator);
}
-
- super.teardown();
}
@Test
@@ -122,7 +120,7 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -132,10 +130,11 @@
server.stop();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ PersistentEphemeralNode node = null;
try
{
client.start();
- PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
+ node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
node.start();
final CountDownLatch connectedLatch = new CountDownLatch(1);
@@ -164,7 +163,8 @@
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(node);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -247,7 +247,7 @@
{
node.close();
}
- client.close();
+ TestCleanState.closeAndTestClean(client);
}
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 659154a..2bdd278 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -23,6 +23,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
@@ -147,7 +148,7 @@
}
for ( CuratorFramework client : clients )
{
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
}
@@ -170,7 +171,7 @@
finally
{
CloseableUtils.closeQuietly(count);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -215,7 +216,7 @@
finally
{
CloseableUtils.closeQuietly(count);
- CloseableUtils.closeQuietly(client);
+ TestCleanState.closeAndTestClean(client);
}
}
@@ -252,8 +253,8 @@
{
CloseableUtils.closeQuietly(count2);
CloseableUtils.closeQuietly(count1);
- CloseableUtils.closeQuietly(client2);
- CloseableUtils.closeQuietly(client1);
+ TestCleanState.closeAndTestClean(client2);
+ TestCleanState.closeAndTestClean(client1);
}
}
}
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 13c3138..6ef3bb0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -35,6 +35,7 @@
private static final int RETRY_WAIT_MS = 5000;
private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;
+ private static final String INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND;
private static final String INTERNAL_RETRY_FAILED_TESTS;
static
{
@@ -53,6 +54,17 @@
}
INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES = logConnectionIssues;
INTERNAL_RETRY_FAILED_TESTS = retryFailedTests;
+ String s = null;
+ try
+ {
+ // use reflection to avoid adding a circular dependency in the pom
+ s = (String)Class.forName("org.apache.curator.utils.DebugUtils").getField("PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND").get(null);
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace();
+ }
+ INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND = s;
}
@BeforeSuite(alwaysRun = true)
@@ -71,6 +83,7 @@
{
System.setProperty(INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES, "true");
}
+ System.setProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true");
while ( server == null )
{
@@ -89,6 +102,7 @@
@AfterMethod
public void teardown() throws Exception
{
+ System.clearProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND);
if ( server != null )
{
try
diff --git a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
new file mode 100644
index 0000000..e4c3b7e
--- /dev/null
+++ b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.test;
+
+import org.apache.zookeeper.ZooKeeper;
+import java.lang.reflect.Method;
+import java.util.List;
+
+public class WatchersDebug
+{
+ private static final Method getDataWatches;
+ private static final Method getExistWatches;
+ private static final Method getChildWatches;
+ static
+ {
+ Method localGetDataWatches = null;
+ Method localGetExistWatches = null;
+ Method localGetChildWatches = null;
+ try
+ {
+ localGetDataWatches = getMethod("getDataWatches");
+ localGetExistWatches = getMethod("getExistWatches");
+ localGetChildWatches = getMethod("getChildWatches");
+ }
+ catch ( NoSuchMethodException e )
+ {
+ e.printStackTrace();
+ }
+ getDataWatches = localGetDataWatches;
+ getExistWatches = localGetExistWatches;
+ getChildWatches = localGetChildWatches;
+ }
+
+ public static List<String> getDataWatches(ZooKeeper zooKeeper)
+ {
+ return callMethod(zooKeeper, WatchersDebug.getDataWatches);
+ }
+
+ public static List<String> getExistWatches(ZooKeeper zooKeeper)
+ {
+ return callMethod(zooKeeper, getExistWatches);
+ }
+
+ public static List<String> getChildWatches(ZooKeeper zooKeeper)
+ {
+ return callMethod(zooKeeper, getChildWatches);
+ }
+
+ private WatchersDebug()
+ {
+ }
+
+ private static Method getMethod(String name) throws NoSuchMethodException
+ {
+ Method m = ZooKeeper.class.getDeclaredMethod(name);
+ m.setAccessible(true);
+ return m;
+ }
+
+ private static List<String> callMethod(ZooKeeper zooKeeper, Method method)
+ {
+ if ( zooKeeper == null )
+ {
+ return null;
+ }
+ try
+ {
+ //noinspection unchecked
+ return (List<String>)method.invoke(zooKeeper);
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}