Merge 'CURATOR-217' into CURATOR-3.0
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
index b098989..383bc13 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
@@ -23,6 +23,7 @@
     public static final String          PROPERTY_LOG_EVENTS = "curator-log-events";
     public static final String          PROPERTY_DONT_LOG_CONNECTION_ISSUES = "curator-dont-log-connection-problems";
     public static final String          PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL = "curator-log-only-first-connection-issue-as-error-level";
+    public static final String          PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND = "curator-remove-watchers-in-foreground";
     public static final String          PROPERTY_RETRY_FAILED_TESTS = "curator-retry-failed-tests";
 
     private DebugUtils()
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 9239ac4..58c5bf5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -190,6 +190,12 @@
     public SyncBuilder sync();
 
     /**
+     * Start a remove watches builder.
+     * @return builder object
+     */
+    public RemoveWatchesBuilder watches();
+
+    /**
      * Returns the listenable interface for the Connect State
      *
      * @return listenable
@@ -259,7 +265,11 @@
      * Call this method on watchers you are no longer interested in.
      *
      * @param watcher the watcher
+     * 
+     * @deprecated As of ZooKeeper 3.5 Curator's recipes will handle removing watcher references
+     * when they are no longer used.
      */
+    @Deprecated
     public void clearWatcherReferences(Watcher watcher);
         
     /**
@@ -278,4 +288,13 @@
      * @throws InterruptedException If interrupted while waiting
      */
     public void blockUntilConnected() throws InterruptedException;
+
+    /**
+     * Returns a facade of the current instance that tracks
+     * watchers created and allows a one-shot removal of all watchers
+     * via {@link WatcherRemoveCuratorFramework#removeWatchers()}
+     *
+     * @return facade
+     */
+    public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework();
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/WatcherRemoveCuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/WatcherRemoveCuratorFramework.java
new file mode 100644
index 0000000..871b53c
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/WatcherRemoveCuratorFramework.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework;
+
+/**
+ * A CuratorFramework facade that tracks watchers created and allows a one-shot removal of all watchers
+ */
+public interface WatcherRemoveCuratorFramework extends CuratorFramework
+{
+    /**
+     * Remove all outstanding watchers that have been set
+     */
+    void removeWatchers();
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietly.java b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietly.java
new file mode 100644
index 0000000..13202aa
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietly.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface BackgroundPathableQuietly<T> extends BackgroundPathable<T>, Quietly<BackgroundPathable<T>>
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietlyable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietlyable.java
new file mode 100644
index 0000000..8ed73fa
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/BackgroundPathableQuietlyable.java
@@ -0,0 +1,5 @@
+package org.apache.curator.framework.api;
+
+public interface BackgroundPathableQuietlyable<T> extends BackgroundPathable<T>, Quietly<BackgroundPathable<T>>
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
index 5a2dc56..5dea211 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CuratorEventType.java
@@ -89,6 +89,11 @@
     WATCHED,
 
     /**
+     * Corresponds to {@link CuratorFramework#watches()}
+     */
+    REMOVE_WATCHES,
+
+    /**
      * Event sent when client is being closed
      */
     CLOSING
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
index 3a3faf7..2da1843 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/DeleteBuilder.java
@@ -18,6 +18,6 @@
  */
 package org.apache.curator.framework.api;
 
-public interface DeleteBuilder extends Guaranteeable, ChildrenDeletable
+public interface DeleteBuilder extends GuaranteeableDeletable, ChildrenDeletable
 {
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
index 481911b..bc033ac 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Guaranteeable.java
@@ -18,23 +18,15 @@
  */
 package org.apache.curator.framework.api;
 
-public interface Guaranteeable extends BackgroundVersionable
+public interface Guaranteeable<T>
 {
     /**
-     * <p>
-     *     Solves this edge case: deleting a node can fail due to connection issues. Further,
-     *     if the node was ephemeral, the node will not get auto-deleted as the session is still valid.
-     *     This can wreak havoc with lock implementations.
-     * </p>
-     *
-     * <p>
-     *     When <code>guaranteed</code> is set, Curator will record failed node deletions and
-     *     attempt to delete them in the background until successful. NOTE: you will still get an
-     *     exception when the deletion fails. But, you can be assured that as long as the
-     *     {@link org.apache.curator.framework.CuratorFramework} instance is open attempts will be made to delete the node.
-     * </p>
+     * Solves edge cases where an operation may succeed on the server but connection failure occurs before a
+     * response can be successfully returned to the client.
+     * 
+     * @see org.apache.curator.framework.api.GuaranteeableDeletable 
      *  
      * @return this
      */
-    public ChildrenDeletable guaranteed();
+    public T guaranteed();
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDeletable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDeletable.java
new file mode 100644
index 0000000..7f8139c
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/GuaranteeableDeletable.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+/**
+ * <p>
+ *     Solves this edge case: deleting a node can fail due to connection issues. Further,
+ *     if the node was ephemeral, the node will not get auto-deleted as the session is still valid.
+ *     This can wreak havoc with lock implementations.
+ * </p>
+ *
+ * <p>
+ *     When <code>guaranteed</code> is set, Curator will record failed node deletions and
+ *     attempt to delete them in the background until successful. NOTE: you will still get an
+ *     exception when the deletion fails. But, you can be assured that as long as the
+ *     {@link org.apache.curator.framework.CuratorFramework} instance is open attempts will be made to delete the node.
+ * </p>
+ *  
+ * @see Guaranteeable#guaranteed()
+ */
+public interface GuaranteeableDeletable extends Guaranteeable<ChildrenDeletable>, BackgroundVersionable
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Quietly.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Quietly.java
new file mode 100644
index 0000000..ad3762f
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Quietly.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+public interface Quietly<T>
+{
+    public T quietly();
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java
new file mode 100644
index 0000000..6cc0b05
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesBuilder.java
@@ -0,0 +1,47 @@
+/**

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+package org.apache.curator.framework.api;

+

+import org.apache.zookeeper.Watcher;

+

+/**

+ * Builder to allow watches to be removed 

+ */

+public interface RemoveWatchesBuilder

+{

+    /**

+     * Specify the watcher to be removed

+     * @param watcher the ZooKeeper watcher to remove

+     * @return builder for choosing the watcher type and removal options

+     */

+    public RemoveWatchesType remove(Watcher watcher);

+    

+    /**

+     * Specify the watcher to be removed

+     * @param watcher the Curator watcher to remove

+     * @return builder for choosing the watcher type and removal options

+     */

+    public RemoveWatchesType remove(CuratorWatcher watcher);

+    

+    /**

+     * Specify that all watches should be removed

+     * @return builder for choosing the watcher type and removal options

+     */

+    public RemoveWatchesType removeAll();

+}

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java
new file mode 100644
index 0000000..4a67470
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesLocal.java
@@ -0,0 +1,35 @@
+/**

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+package org.apache.curator.framework.api;

+

+/**

+ * Builder to allow the specification of whether it is acceptable to remove client side watch information

+ * in the case where ZK cannot be contacted. 

+ */

+public interface RemoveWatchesLocal extends BackgroundPathableQuietlyable<Void>

+{

+   

+    /**

+     * Specify if the client should just remove client side watches if a connection to ZK

+     * is not available. Note that the standard Curator retry loop will not be used in this case.

+     * @return builder for the remaining options (quietly, background, path)

+     */

+    public BackgroundPathableQuietlyable<Void> locally();

+    

+}

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
new file mode 100644
index 0000000..84d8093
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/RemoveWatchesType.java
@@ -0,0 +1,37 @@
+/**

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+package org.apache.curator.framework.api;

+

+import org.apache.zookeeper.Watcher.WatcherType;

+

+/**

+ * Builder to allow the specification of the type of watcher to be removed, and whether the

+ * removal is guaranteed or may be applied locally when ZK cannot be contacted.

+ */

+public interface RemoveWatchesType extends RemoveWatchesLocal, Guaranteeable<BackgroundPathableQuietlyable<Void>>

+{

+   

+    /**

+     * Specify the type of watcher to be removed.

+     * @param watcherType the type of watcher to remove

+     * @return builder for the remaining removal options

+     */

+    public RemoveWatchesLocal ofType(WatcherType watcherType);

+    

+}

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 900374b..41bb7cd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -30,6 +30,7 @@
 import org.apache.curator.framework.AuthInfo;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.*;
 import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
 import org.apache.curator.framework.api.transaction.CuratorTransaction;
@@ -76,6 +77,7 @@
     private final List<AuthInfo> authInfos;
     private final byte[] defaultData;
     private final FailedDeleteManager failedDeleteManager;
+    private final FailedRemoveWatchManager failedRemoveWatcherManager;
     private final CompressionProvider compressionProvider;
     private final ACLProvider aclProvider;
     private final NamespaceFacadeCache namespaceFacadeCache;
@@ -128,6 +130,7 @@
         authInfos = buildAuths(builder);
 
         failedDeleteManager = new FailedDeleteManager(this);
+        failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
         namespaceFacadeCache = new NamespaceFacadeCache(this);
     }
 
@@ -141,6 +144,12 @@
         return builder1.build();
     }
 
+    @Override
+    public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
+    {
+        return new WatcherRemovalFacade(this);
+    }
+
     private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
     {
         return new ZookeeperFactory()
@@ -180,6 +189,7 @@
         connectionStateManager = parent.connectionStateManager;
         defaultData = parent.defaultData;
         failedDeleteManager = parent.failedDeleteManager;
+        failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
         compressionProvider = parent.compressionProvider;
         aclProvider = parent.aclProvider;
         namespaceFacadeCache = parent.namespaceFacadeCache;
@@ -479,6 +489,12 @@
         return new SyncBuilderImpl(this);
     }
 
+    @Override
+    public RemoveWatchesBuilder watches()
+    {
+        return new RemoveWatchesBuilderImpl(this);
+    }
+
     protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
     {
         BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
@@ -507,6 +523,11 @@
         return failedDeleteManager;
     }
 
+    FailedRemoveWatchManager getFailedRemoveWatcherManager()
+    {
+        return failedRemoveWatcherManager;
+    }
+
     RetryLoop newRetryLoop()
     {
         return client.newRetryLoop();
@@ -689,6 +710,11 @@
         return Watcher.Event.KeeperState.fromInt(-1);
     }
 
+    WatcherRemovalManager getWatcherRemovalManager()
+    {
+        return null;
+    }
+
     private void suspendConnection()
     {
         if ( !connectionStateManager.setToSuspended() )
@@ -842,7 +868,7 @@
     {
         try
         {
-            if ( client.isConnected() )
+            if ( !operationAndData.isConnectionRequired() || client.isConnected() )
             {
                 operationAndData.callPerformBackgroundOperation();
             }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
index 51641b8..2a98f56 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/DeleteBuilderImpl.java
@@ -205,7 +205,7 @@
                     @Override
                     public void retriesExhausted(OperationAndData<String> operationAndData)
                     {
-                        client.getFailedDeleteManager().addFailedDelete(unfixedPath);
+                        client.getFailedDeleteManager().addFailedOperation(unfixedPath);
                     }
                 };
             }
@@ -261,7 +261,7 @@
             //Only retry a guaranteed delete if it's a retryable error
             if( (RetryLoop.isRetryException(e) || (e instanceof InterruptedException)) && guaranteed )
             {
-                client.getFailedDeleteManager().addFailedDelete(unfixedPath);
+                client.getFailedDeleteManager().addFailedOperation(unfixedPath);
             }
             throw e;
         }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
index 345bcf5..a6316ac 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
@@ -141,7 +141,7 @@
         }
         else
         {
-            client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext());
+            client.getZooKeeper().exists(operationAndData.getData(), watching.getWatcher(client, operationAndData.getData()), callback, backgrounding.getContext());
         }
     }
 
@@ -223,7 +223,7 @@
                     }
                     else
                     {
-                        returnStat = client.getZooKeeper().exists(path, watching.getWatcher());
+                        returnStat = client.getZooKeeper().exists(path, watching.getWatcher(client, path));
                     }
                     return returnStat;
                 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
index deb7f40..934ae40 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedDeleteManager.java
@@ -19,45 +19,18 @@
 package org.apache.curator.framework.imps;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-class FailedDeleteManager
+class FailedDeleteManager extends FailedOperationManager<String>
 {
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
-    
-    volatile FailedDeleteManagerListener debugListener = null;
-    
-    interface FailedDeleteManagerListener
-    {
-       public void pathAddedForDelete(String path);
-    }
-
     FailedDeleteManager(CuratorFramework client)
     {
-        this.client = client;
+        super(client);
     }
 
-    void addFailedDelete(String path)
+    @Override
+    protected void executeGuaranteedOperationInBackground(String path)
+            throws Exception
     {
-        if ( debugListener != null )
-        {
-            debugListener.pathAddedForDelete(path);
-        }
-        
-        
-        if ( client.getState() == CuratorFrameworkState.STARTED )
-        {
-            log.debug("Path being added to guaranteed delete set: " + path);
-            try
-            {
-                client.delete().guaranteed().inBackground().forPath(path);
-            }
-            catch ( Exception e )
-            {
-                addFailedDelete(path);
-            }
-        }
+        client.delete().guaranteed().inBackground().forPath(path);
     }
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
new file mode 100644
index 0000000..405561b
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedOperationManager.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+abstract class FailedOperationManager<T>
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    protected final CuratorFramework client;
+    
+    @VisibleForTesting
+    volatile FailedOperationManagerListener<T> debugListener = null;
+    
+    interface FailedOperationManagerListener<T>
+    {
+       public void pathAddedForGuaranteedOperation(T detail);
+    }
+
+    FailedOperationManager(CuratorFramework client)
+    {
+        this.client = client;
+    }
+
+    void addFailedOperation(T details)
+    {
+        if ( debugListener != null )
+        {
+            debugListener.pathAddedForGuaranteedOperation(details);
+        }
+        
+        
+        if ( client.getState() == CuratorFrameworkState.STARTED )
+        {
+            log.debug("Details being added to guaranteed operation set: " + details);
+            try
+            {
+                executeGuaranteedOperationInBackground(details);
+            }
+            catch ( Exception e )
+            {
+                addFailedOperation(details);
+            }
+        }
+    }
+    
+    protected abstract void executeGuaranteedOperationInBackground(T details) throws Exception;
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java
new file mode 100644
index 0000000..f635660
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/FailedRemoveWatchManager.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.Watcher;
+
+class FailedRemoveWatchManager extends FailedOperationManager<FailedRemoveWatchManager.FailedRemoveWatchDetails>
+{
+    FailedRemoveWatchManager(CuratorFramework client)
+    {
+        super(client);
+    }
+
+    @Override
+    protected void executeGuaranteedOperationInBackground(FailedRemoveWatchDetails details)
+            throws Exception
+    {
+        if(details.watcher == null)
+        {
+            client.watches().removeAll().guaranteed().inBackground().forPath(details.path);
+        }
+        else
+        {
+            client.watches().remove(details.watcher).guaranteed().inBackground().forPath(details.path);
+        }
+    }
+    
+    static class FailedRemoveWatchDetails
+    {
+        public final String path;
+        public final Watcher watcher;
+        
+        public FailedRemoveWatchDetails(String path, Watcher watcher)
+        {
+            this.path = path;
+            this.watcher = watcher;
+        }
+    }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
index 03010ce..7929ba3 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetChildrenBuilderImpl.java
@@ -172,7 +172,7 @@
         }
         else
         {
-            client.getZooKeeper().getChildren(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext());
+            client.getZooKeeper().getChildren(operationAndData.getData(), watching.getWatcher(client, operationAndData.getData()), callback, backgrounding.getContext());
         }
     }
 
@@ -211,7 +211,7 @@
                     }
                     else
                     {
-                        children = client.getZooKeeper().getChildren(path, watching.getWatcher(), responseStat);
+                        children = client.getZooKeeper().getChildren(path, watching.getWatcher(client, path), responseStat);
                     }
                     return children;
                 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
index a837809..5468bd4 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java
@@ -30,6 +30,7 @@
 import org.apache.curator.framework.api.GetConfigBuilder;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
@@ -153,7 +154,7 @@
         }
         else
         {
-            client.getZooKeeper().getConfig(watching.getWatcher(), callback, backgrounding.getContext());
+            client.getZooKeeper().getConfig(watching.getWatcher(client, ZooDefs.CONFIG_NODE), callback, backgrounding.getContext());
         }
     }
 
@@ -174,7 +175,7 @@
                         {
                             return client.getZooKeeper().getConfig(true, stat);
                         }
-                        return client.getZooKeeper().getConfig(watching.getWatcher(), stat);
+                        return client.getZooKeeper().getConfig(watching.getWatcher(client, ZooDefs.CONFIG_NODE), stat);
                     }
                 }
             );
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
index 23da075..31ad598 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
@@ -260,7 +260,7 @@
         }
         else
         {
-            client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext());
+            client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(client, operationAndData.getData()), callback, backgrounding.getContext());
         }
     }
 
@@ -299,7 +299,7 @@
                     }
                     else
                     {
-                        responseData = client.getZooKeeper().getData(path, watching.getWatcher(), responseStat);
+                        responseData = client.getZooKeeper().getData(path, watching.getWatcher(client, path), responseStat);
                     }
                     return responseData;
                 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
index 38f59a0..b46cddb 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/OperationAndData.java
@@ -40,25 +40,37 @@
     private final AtomicLong sleepUntilTimeMs = new AtomicLong(0);
     private final long ordinal = nextOrdinal.getAndIncrement();
     private final Object context;
+    private final boolean connectionRequired;
 
     interface ErrorCallback<T>
     {
         void retriesExhausted(OperationAndData<T> operationAndData);
     }
-
-    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
+    
+    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context, boolean connectionRequired)
     {
         this.operation = operation;
         this.data = data;
         this.callback = callback;
         this.errorCallback = errorCallback;
         this.context = context;
+        this.connectionRequired = connectionRequired;
+    }      
+
+    OperationAndData(BackgroundOperation<T> operation, T data, BackgroundCallback callback, ErrorCallback<T> errorCallback, Object context)
+    {
+        this(operation, data, callback, errorCallback, context, true);
     }
 
     Object getContext()
     {
         return context;
     }
+    
+    boolean isConnectionRequired()
+    {
+        return connectionRequired;
+    }
 
     void callPerformBackgroundOperation() throws Exception
     {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
new file mode 100644
index 0000000..d872ced
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java
@@ -0,0 +1,315 @@
+/**

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+package org.apache.curator.framework.imps;

+

+import java.util.concurrent.Callable;

+import java.util.concurrent.Executor;

+

+import org.apache.curator.RetryLoop;

+import org.apache.curator.TimeTrace;

+import org.apache.curator.framework.api.BackgroundCallback;

+import org.apache.curator.framework.api.BackgroundPathable;

+import org.apache.curator.framework.api.BackgroundPathableQuietlyable;

+import org.apache.curator.framework.api.CuratorEvent;

+import org.apache.curator.framework.api.CuratorEventType;

+import org.apache.curator.framework.api.CuratorWatcher;

+import org.apache.curator.framework.api.Pathable;

+import org.apache.curator.framework.api.RemoveWatchesLocal;

+import org.apache.curator.framework.api.RemoveWatchesBuilder;

+import org.apache.curator.framework.api.RemoveWatchesType;

+import org.apache.curator.utils.DebugUtils;

+import org.apache.zookeeper.AsyncCallback;

+import org.apache.zookeeper.KeeperException;

+import org.apache.zookeeper.Watcher;

+import org.apache.zookeeper.Watcher.WatcherType;

+import org.apache.zookeeper.ZooKeeper;

+

+

+public class RemoveWatchesBuilderImpl implements RemoveWatchesBuilder, RemoveWatchesType, RemoveWatchesLocal, BackgroundOperation<String>

+{

+    private CuratorFrameworkImpl client;

+    private Watcher watcher;

+    private WatcherType watcherType;

+    private boolean guaranteed;

+    private boolean local;

+    private boolean quietly;    

+    private Backgrounding backgrounding;

+    

+    public RemoveWatchesBuilderImpl(CuratorFrameworkImpl client)

+    {

+        this.client = client;

+        this.watcher = null;

+        this.watcherType = WatcherType.Any;

+        this.guaranteed = false;

+        this.local = false;

+        this.quietly = false;

+        this.backgrounding = new Backgrounding();

+    }

+

+    void internalRemoval(Watcher watcher, String path) throws Exception  // non-fluent entry point (called by WatcherRemovalManager): removes one watcher of any type, quietly and guaranteed; path is used as given, without fixForNamespace

+    {

+        this.watcher = watcher;  // used as-is: no namespace-watcher lookup, unlike remove(Watcher)

+        watcherType = WatcherType.Any;

+        quietly = true;

+        guaranteed = true;

+        if ( Boolean.getBoolean(DebugUtils.PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND) )  // JVM system property "curator-remove-watchers-in-foreground"

+        {

+            this.backgrounding = new Backgrounding();

+            pathInForeground(path);

+        }

+        else

+        {

+            this.backgrounding = new Backgrounding(true);

+            pathInBackground(path);

+        }

+    }

+

+    @Override

+    public RemoveWatchesType remove(Watcher watcher)

+    {

+        if(watcher == null) {

+            this.watcher = null;

+        } else {

+            //Try and get the namespaced version of the watcher.

+            this.watcher = client.getNamespaceWatcherMap().get(watcher);

+            

+            //If this is not present then default to the original watcher. This shouldn't happen in practice unless the user

+            //has added a watch directly to the ZK client rather than via the CuratorFramework.

+            if(this.watcher == null) {

+                this.watcher = watcher;

+            }

+        }

+

+        return this;

+    }

+    

+    @Override

+    public RemoveWatchesType remove(CuratorWatcher watcher)

+    {

+        this.watcher = watcher == null ? null : client.getNamespaceWatcherMap().get(watcher);  // NOTE(review): no fallback as in remove(Watcher) - a CuratorWatcher missing from the map leaves this null, and a null watcher selects removeAllWatches() later; confirm that is intended

+        return this;

+    }    

+

+    @Override

+    public RemoveWatchesType removeAll()

+    {

+        this.watcher = null;

+        return this;

+    }

+

+    @Override

+    public RemoveWatchesLocal ofType(WatcherType watcherType)

+    {

+        this.watcherType = watcherType;

+        

+        return this;

+    }

+

+    @Override

+    public Pathable<Void> inBackground(BackgroundCallback callback, Object context)

+    {

+        backgrounding = new Backgrounding(callback, context);

+        return this;

+    }

+

+    @Override

+    public Pathable<Void> inBackground(BackgroundCallback callback, Object context, Executor executor)

+    {

+        backgrounding = new Backgrounding(client, callback, context, executor);

+        return this;

+    }

+

+    @Override

+    public Pathable<Void> inBackground(BackgroundCallback callback)

+    {

+        backgrounding = new Backgrounding(callback);

+        return this;

+    }

+

+    @Override

+    public Pathable<Void> inBackground(BackgroundCallback callback, Executor executor)

+    {

+        backgrounding = new Backgrounding(client, callback, executor);

+        return this;

+    }

+

+    @Override

+    public Pathable<Void> inBackground()

+    {

+        backgrounding = new Backgrounding(true);

+        return this;

+    }

+

+    @Override

+    public Pathable<Void> inBackground(Object context)

+    {

+        backgrounding = new Backgrounding(context);

+        return this;

+    }

+    

+    @Override

+    public RemoveWatchesLocal guaranteed()

+    {

+        guaranteed = true;

+        return this;

+    }    

+

+    @Override

+    public BackgroundPathableQuietlyable<Void> locally()

+    {

+        local = true;

+        return this;

+    }

+    

+    @Override

+    public BackgroundPathable<Void> quietly()

+    {

+        quietly = true;

+        return this;

+    }

+    

+    @Override

+    public Void forPath(String path) throws Exception

+    {

+        final String adjustedPath = client.fixForNamespace(path);

+        

+        if(backgrounding.inBackground())

+        {

+            pathInBackground(adjustedPath);

+        }

+        else

+        {

+            pathInForeground(adjustedPath);

+        }        

+        

+        return null;

+    }    

+    

+    private void pathInBackground(final String path)  // queues this builder as a background operation for path (used as given; callers apply any namespace first)

+    {

+        OperationAndData.ErrorCallback<String>  errorCallback = null;

+        

+        //Only need an error callback if we're in guaranteed mode

+        if(guaranteed)

+        {

+            errorCallback = new OperationAndData.ErrorCallback<String>()

+            {

+                @Override

+                public void retriesExhausted(OperationAndData<String> operationAndData)

+                {

+                    client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));  // reads the watcher field when the callback fires, not when it was created

+                }            

+            };

+        }

+        

+        client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(),

+                                                                       errorCallback, backgrounding.getContext(), !local), null);  // sixth argument is OperationAndData's connectionRequired flag, so it is false after locally()

+    }

+    

+    private void pathInForeground(final String path) throws Exception  // synchronous removal of the configured watcher (or of all watchers when it is null); path is used as given

+    {

+        //For the local case we don't want to use the normal retry loop and we don't want to block until a connection is available.

+        //We just execute the removeWatch, and if it fails, ZK will just remove local watches.

+        if(local)  // single attempt, exceptions propagate; guaranteed and quietly are not consulted in this branch

+        {

+            ZooKeeper zkClient = client.getZooKeeper();

+            if(watcher == null)

+            {

+                zkClient.removeAllWatches(path, watcherType, local);    

+            }

+            else

+            {

+                zkClient.removeWatches(path, watcher, watcherType, local);

+            }

+        }

+        else

+        {

+            RetryLoop.callWithRetry(client.getZookeeperClient(),  // local is always false inside this branch

+                    new Callable<Void>()

+                    {

+                        @Override

+                        public Void call() throws Exception

+                        {

+                            try

+                            {

+                                ZooKeeper zkClient = client.getZookeeperClient().getZooKeeper();    

+                                

+                                if(watcher == null)

+                                {

+                                    zkClient.removeAllWatches(path, watcherType, local);    

+                                }

+                                else

+                                {

+                                    zkClient.removeWatches(path, watcher, watcherType, local);

+                                }

+                            }

+                            catch(Exception e)

+                            {

+                                if( RetryLoop.isRetryException(e) && guaranteed )  // NOTE(review): sits inside call(), so presumably runs on every attempt that fails with a retryable exception, even if a later attempt succeeds - confirm

+                                {

+                                    //Setup the guaranteed handler

+                                    client.getFailedRemoveWatcherManager().addFailedOperation(new FailedRemoveWatchManager.FailedRemoveWatchDetails(path, watcher));

+                                    throw e;

+                                }

+                                else if(e instanceof KeeperException.NoWatcherException && quietly)  // quietly(): a missing watcher is not reported as an error

+                                {

+                                    //Ignore

+                                }

+                                else

+                                {

+                                    //Rethrow

+                                    throw e;

+                                }

+                            }

+                     

+                            return null;

+                        }

+            });

+        }

+    }

+    

+    @Override

+    public void performBackgroundOperation(final OperationAndData<String> operationAndData)  // issues the asynchronous ZooKeeper remove call for the path carried in operationAndData

+            throws Exception

+    {

+        final TimeTrace   trace = client.getZookeeperClient().startTracer("RemoteWatches-Background");  // NOTE(review): name reads "Remote", presumably meant "Remove" - check for consumers keyed on this name before renaming

+        

+        AsyncCallback.VoidCallback callback = new AsyncCallback.VoidCallback()

+        {

+            @Override

+            public void processResult(int rc, String path, Object ctx)

+            {

+                trace.commit();

+                CuratorEvent event = new CuratorEventImpl(client, CuratorEventType.REMOVE_WATCHES, rc, path, null, ctx, null, null, null, null, null);  // only result code, path and context are populated

+                client.processBackgroundOperation(operationAndData, event);  // hand the result back to the client's background processing

+            }

+        };

+        

+        ZooKeeper zkClient = client.getZooKeeper();

+        if(watcher == null)  // no specific watcher configured: remove every watcher of watcherType on the path

+        {

+            zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext());    

+        }

+        else

+        {

+            zkClient.removeWatches(operationAndData.getData(), watcher, watcherType, local, callback, operationAndData.getContext());

+        }

+        

+    }

+}
\ No newline at end of file
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
new file mode 100644
index 0000000..156341e
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryLoop;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.zookeeper.ZooKeeper;
+
+class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemoveCuratorFramework  // view over an existing client that owns its own WatcherRemovalManager; the overrides below forward to the wrapped client
+{
+    private final CuratorFrameworkImpl client;  // the wrapped client that every delegating override calls
+    private final WatcherRemovalManager removalManager;
+
+    WatcherRemovalFacade(CuratorFrameworkImpl client)
+    {
+        super(client);
+        this.client = client;
+        removalManager = new WatcherRemovalManager(client);  // the manager issues its removals through the wrapped client
+    }
+
+    @Override
+    public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
+    {
+        return client.newWatcherRemoveCuratorFramework();  // asks the wrapped client, not this facade
+    }
+
+    WatcherRemovalManager getRemovalManager()  // returns the same object as getWatcherRemovalManager()
+    {
+        return removalManager;
+    }
+
+    @Override
+    public void removeWatchers()  // asks the manager to remove every watcher it is tracking
+    {
+        removalManager.removeWatchers();
+    }
+
+    @Override
+    WatcherRemovalManager getWatcherRemovalManager()  // never null here; Watching.getWatcher wraps watchers only when this is non-null
+    {
+        return removalManager;
+    }
+
+    @Override
+    public CuratorFramework nonNamespaceView()
+    {
+        return client.nonNamespaceView();
+    }
+
+    @Override
+    public CuratorFramework usingNamespace(String newNamespace)
+    {
+        return client.usingNamespace(newNamespace);
+    }
+
+    @Override
+    public String getNamespace()
+    {
+        return client.getNamespace();
+    }
+
+    @Override
+    public void start()  // always throws: this facade cannot be started
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close()  // always throws: this facade cannot be closed
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Listenable<ConnectionStateListener> getConnectionStateListenable()
+    {
+        return client.getConnectionStateListenable();
+    }
+
+    @Override
+    public Listenable<CuratorListener> getCuratorListenable()
+    {
+        return client.getCuratorListenable();
+    }
+
+    @Override
+    public Listenable<UnhandledErrorListener> getUnhandledErrorListenable()
+    {
+        return client.getUnhandledErrorListenable();
+    }
+
+    @Override
+    public void sync(String path, Object context)
+    {
+        client.sync(path, context);
+    }
+
+    @Override
+    public CuratorZookeeperClient getZookeeperClient()
+    {
+        return client.getZookeeperClient();
+    }
+
+    @Override
+    RetryLoop newRetryLoop()
+    {
+        return client.newRetryLoop();
+    }
+
+    @Override
+    ZooKeeper getZooKeeper() throws Exception
+    {
+        return client.getZooKeeper();
+    }
+
+    @Override
+    <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
+    {
+        client.processBackgroundOperation(operationAndData, event);
+    }
+
+    @Override
+    void logError(String reason, Throwable e)
+    {
+        client.logError(reason, e);
+    }
+
+    @Override
+    String unfixForNamespace(String path)
+    {
+        return client.unfixForNamespace(path);
+    }
+
+    @Override
+    String fixForNamespace(String path)
+    {
+        return client.fixForNamespace(path);
+    }
+
+    @Override
+    public EnsurePath newNamespaceAwareEnsurePath(String path)
+    {
+        return client.newNamespaceAwareEnsurePath(path);
+    }
+
+    @Override
+    FailedDeleteManager getFailedDeleteManager()
+    {
+        return client.getFailedDeleteManager();
+    }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
new file mode 100644
index 0000000..a691a94
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+public class WatcherRemovalManager  // remembers every (path, watcher) pair handed out by add() so removeWatchers() can ask the client to remove them
+{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final CuratorFrameworkImpl client;
+    private final Set<WrappedWatcher> entries = Sets.newHashSet();  // guarded by sync
+
+    WatcherRemovalManager(CuratorFrameworkImpl client)
+    {
+        this.client = client;
+    }
+
+    synchronized Watcher add(String path, Watcher watcher)  // returns a tracking wrapper around watcher; null arguments are rejected by checkNotNull
+    {
+        path = Preconditions.checkNotNull(path, "path cannot be null");
+        watcher = Preconditions.checkNotNull(watcher, "watcher cannot be null");
+
+        WrappedWatcher wrappedWatcher = new WrappedWatcher(watcher, path);
+        entries.add(wrappedWatcher);  // set semantics: an equal (watcher, path) pair is stored only once
+        return wrappedWatcher;
+    }
+
+    @VisibleForTesting
+    synchronized Set<? extends Watcher> getEntries()  // copy of the tracked wrappers, safe to iterate without the lock
+    {
+        return Sets.newHashSet(entries);
+    }
+
+    void removeWatchers()  // best effort: a failure for one entry is logged and the loop continues; entries are not cleared here
+    {
+        HashSet<WrappedWatcher> localEntries;
+        synchronized(this)
+        {
+            localEntries = Sets.newHashSet(entries);  // snapshot so the removal calls run outside the lock
+        }
+        for ( WrappedWatcher entry : localEntries )
+        {
+            try
+            {
+                log.debug("Removing watcher for path: " + entry.path);
+                RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client);
+                builder.internalRemoval(entry, entry.path);
+            }
+            catch ( Exception e )
+            {
+                log.error("Could not remove watcher for path: " + entry.path, e);  // pass the exception so the cause and stack trace reach the log
+            }
+        }
+    }
+
+    private synchronized void internalRemove(WrappedWatcher entry)  // stops tracking entry; only WrappedWatcher.process calls this
+    {
+        entries.remove(entry);
+    }
+
+    private class WrappedWatcher implements Watcher  // delegating watcher whose identity is the (watcher, path) pair
+    {
+        private final Watcher watcher;
+        private final String path;
+
+        WrappedWatcher(Watcher watcher, String path)
+        {
+            this.watcher = watcher;
+            this.path = path;
+        }
+
+        @Override
+        public void process(WatchedEvent event)
+        {
+            if ( event.getType() != Event.EventType.None )  // any event other than None: stop tracking before delegating (presumably because the watch has been consumed - confirm)
+            {
+                internalRemove(this);
+            }
+            watcher.process(event);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if ( this == o )
+            {
+                return true;
+            }
+            if ( o == null || getClass() != o.getClass() )
+            {
+                return false;
+            }
+
+            WrappedWatcher entry = (WrappedWatcher)o;
+
+            //noinspection SimplifiableIfStatement
+            if ( !watcher.equals(entry.watcher) )
+            {
+                return false;
+            }
+            return path.equals(entry.path);
+
+        }
+
+        @Override
+        public int hashCode()  // combines the same two fields that equals() compares
+        {
+            int result = watcher.hashCode();
+            result = 31 * result + path.hashCode();
+            return result;
+        }
+    }
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
index a9d0ab1..4bebbd5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/Watching.java
@@ -50,8 +50,12 @@
         watched = false;
     }
 
-    Watcher getWatcher()
+    Watcher getWatcher(CuratorFrameworkImpl client, String unfixedPath)
     {
+        if ( (watcher != null) && (client.getWatcherRemovalManager() != null) )
+        {
+            return client.getWatcherRemovalManager().add(unfixedPath, watcher);
+        }
         return watcher;
     }
 
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
index 6599745..943529f 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java
@@ -22,7 +22,6 @@
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.imps.FailedDeleteManager.FailedDeleteManagerListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -291,11 +290,11 @@
         
         final AtomicBoolean pathAdded = new AtomicBoolean(false);
         
-        ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedDeleteManagerListener()
+        ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<String>()
         {
             
             @Override
-            public void pathAddedForDelete(String path)
+            public void pathAddedForGuaranteedOperation(String path)
             {
                 pathAdded.set(true);
             }
@@ -325,11 +324,11 @@
         
         final AtomicBoolean pathAdded = new AtomicBoolean(false);
         
-        ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedDeleteManagerListener()
+        ((CuratorFrameworkImpl)client).getFailedDeleteManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<String>()
         {
             
             @Override
-            public void pathAddedForDelete(String path)
+            public void pathAddedForGuaranteedOperation(String path)
             {
                 pathAdded.set(true);
             }
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
new file mode 100644
index 0000000..4e02e95
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java
@@ -0,0 +1,655 @@
+/**

+ * Licensed to the Apache Software Foundation (ASF) under one

+ * or more contributor license agreements.  See the NOTICE file

+ * distributed with this work for additional information

+ * regarding copyright ownership.  The ASF licenses this file

+ * to you under the Apache License, Version 2.0 (the

+ * "License"); you may not use this file except in compliance

+ * with the License.  You may obtain a copy of the License at

+ *

+ *   http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing,

+ * software distributed under the License is distributed on an

+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

+ * KIND, either express or implied.  See the License for the

+ * specific language governing permissions and limitations

+ * under the License.

+ */

+package org.apache.curator.framework.imps;

+

+import java.util.concurrent.CountDownLatch;

+import java.util.concurrent.Semaphore;

+import java.util.concurrent.TimeUnit;

+import java.util.concurrent.atomic.AtomicBoolean;

+import java.util.concurrent.atomic.AtomicReference;

+

+import org.apache.curator.framework.CuratorFramework;

+import org.apache.curator.framework.CuratorFrameworkFactory;

+import org.apache.curator.framework.api.BackgroundCallback;

+import org.apache.curator.framework.api.CuratorEvent;

+import org.apache.curator.framework.api.CuratorEventType;

+import org.apache.curator.framework.api.CuratorListener;

+import org.apache.curator.framework.api.CuratorWatcher;

+import org.apache.curator.framework.imps.FailedRemoveWatchManager.FailedRemoveWatchDetails;

+import org.apache.curator.framework.state.ConnectionState;

+import org.apache.curator.framework.state.ConnectionStateListener;

+import org.apache.curator.retry.ExponentialBackoffRetry;

+import org.apache.curator.retry.RetryOneTime;

+import org.apache.curator.test.BaseClassForTests;

+import org.apache.curator.test.Timing;

+import org.apache.curator.utils.CloseableUtils;

+import org.apache.zookeeper.KeeperException;

+import org.apache.zookeeper.WatchedEvent;

+import org.apache.zookeeper.Watcher;

+import org.apache.zookeeper.Watcher.Event.EventType;

+import org.apache.zookeeper.Watcher.WatcherType;

+import org.testng.Assert;

+import org.testng.annotations.Test;

+

+public class TestRemoveWatches extends BaseClassForTests

+{

+    private AtomicReference<ConnectionState> registerConnectionStateListener(CuratorFramework client)

+    {

+        final AtomicReference<ConnectionState> state = new AtomicReference<ConnectionState>();

+        client.getConnectionStateListenable().addListener(new ConnectionStateListener()

+        {

+            

+            @Override

+            public void stateChanged(CuratorFramework client, ConnectionState newState)

+            {

+                state.set(newState);

+                synchronized(state)

+                {

+                    state.notify();

+                }

+            }

+        });

+        

+        return state;

+    }

+    

+    private boolean blockUntilDesiredConnectionState(AtomicReference<ConnectionState> stateRef, Timing timing, final ConnectionState desiredState)

+    {

+        if(stateRef.get() == desiredState)

+        {

+            return true;

+        }

+        

+        synchronized(stateRef)

+        {

+            if(stateRef.get() == desiredState)

+            {

+                return true;

+            }

+            

+            try

+            {

+                stateRef.wait(timing.milliseconds());

+                return stateRef.get() == desiredState;

+            }

+            catch(InterruptedException e)

+            {

+                Thread.currentThread().interrupt();

+                return false;

+            }

+        }

+    }

+    

+    @Test

+    public void testRemoveCuratorDefaultWatcher() throws Exception

+    {

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            

+            final CountDownLatch removedLatch = new CountDownLatch(1);

+            

+            final String path = "/";            

+            client.getCuratorListenable().addListener(new CuratorListener()

+            {                

+                @Override

+                public void eventReceived(CuratorFramework client, CuratorEvent event)

+                        throws Exception

+                {

+                    if(event.getType() == CuratorEventType.WATCHED && event.getWatchedEvent().getType() == EventType.DataWatchRemoved) {                        

+                        removedLatch.countDown();

+                    }        

+                }

+            });

+                        

+            client.checkExists().watched().forPath(path);

+            

+            client.watches().removeAll().forPath(path);

+            

+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }

+    

+    @Test

+    public void testRemoveCuratorWatch() throws Exception

+    {

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            // Registers a CuratorWatcher on "/" and expects a DataWatchRemoved event once it is removed.

+            final CountDownLatch removedLatch = new CountDownLatch(1);

+

+            final String path = "/";

+            CuratorWatcher watcher = new CuratorWatcher()

+            {

+                // path.equals(...) instead of event.getPath().equals(...): an event without a path must not cause an NPE.

+                @Override

+                public void process(WatchedEvent event) throws Exception

+                {

+                    if(path.equals(event.getPath()) && event.getType() == EventType.DataWatchRemoved) {

+                        removedLatch.countDown();

+                    }

+                }

+            };

+

+            client.checkExists().usingWatcher(watcher).forPath(path);

+

+            client.watches().remove(watcher).forPath(path);

+

+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }

+    

+    @Test

+    public void testRemoveWatch() throws Exception

+    {       

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            

+            final CountDownLatch removedLatch = new CountDownLatch(1);

+            

+            final String path = "/";    

+            Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);

+            

+            client.checkExists().usingWatcher(watcher).forPath(path);

+            

+            client.watches().remove(watcher).forPath(path);

+            

+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }

+    

+    @Test

+    public void testRemoveWatchInBackgroundWithCallback() throws Exception

+    {       

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {            

+            client.start();

+         

+            //Make sure that the event fires on both the watcher and the callback.

+            final CountDownLatch removedLatch = new CountDownLatch(2);

+            final String path = "/";

+            Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);

+            

+            BackgroundCallback callback = new BackgroundCallback()

+            {

+                

+                @Override

+                public void processResult(CuratorFramework client, CuratorEvent event)

+                        throws Exception

+                {

+                    if(event.getType() == CuratorEventType.REMOVE_WATCHES && event.getPath().equals(path)) {

+                        removedLatch.countDown();

+                    }

+                }

+            };

+            

+            

+            client.checkExists().usingWatcher(watcher).forPath(path);

+            

+            client.watches().remove(watcher).ofType(WatcherType.Any).inBackground(callback).forPath(path);

+            

+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");

+            

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }

+    

+    @Test

+    public void testRemoveWatchInBackgroundWithNoCallback() throws Exception

+    {       

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            

+            final String path = "/";

+            final CountDownLatch removedLatch = new CountDownLatch(1);

+            Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);

+            

+            client.checkExists().usingWatcher(watcher).forPath(path);

+            

+            client.watches().remove(watcher).inBackground().forPath(path);

+            

+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");

+            

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }        

+    

+    @Test

+    public void testRemoveAllWatches() throws Exception

+    {       

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            

+            final String path = "/";

+            final CountDownLatch removedLatch = new CountDownLatch(2);

+            

+            Watcher watcher1 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);            

+            Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);                        

+            

+            client.getChildren().usingWatcher(watcher1).forPath(path);

+            client.checkExists().usingWatcher(watcher2).forPath(path);

+            

+            client.watches().removeAll().forPath(path);

+            

+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }  

+    

+    @Test

+    public void testRemoveAllDataWatches() throws Exception

+    {       

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            

+            final String path = "/";

+            final AtomicBoolean removedFlag = new AtomicBoolean(false);

+            final CountDownLatch removedLatch = new CountDownLatch(1);

+            

+            Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.ChildWatchRemoved);            

+            Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);                        

+            

+            client.getChildren().usingWatcher(watcher1).forPath(path);

+            client.checkExists().usingWatcher(watcher2).forPath(path);

+            

+            client.watches().removeAll().ofType(WatcherType.Data).forPath(path);

+            

+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");

+            Assert.assertEquals(removedFlag.get(), false);

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }

+    

+    @Test

+    public void testRemoveAllChildWatches() throws Exception

+    {       

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            

+            final String path = "/";

+            final AtomicBoolean removedFlag = new AtomicBoolean(false);

+            final CountDownLatch removedLatch = new CountDownLatch(1);

+            

+            Watcher watcher1 = new BooleanWatcher(path, removedFlag, EventType.DataWatchRemoved);            

+            Watcher watcher2 = new CountDownWatcher(path, removedLatch, EventType.ChildWatchRemoved);                        

+                        

+            client.checkExists().usingWatcher(watcher1).forPath(path);

+            client.getChildren().usingWatcher(watcher2).forPath(path);

+            

+            client.watches().removeAll().ofType(WatcherType.Children).forPath(path);

+            

+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");

+            Assert.assertEquals(removedFlag.get(), false);

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }     

+    

+    @Test

+    public void testRemoveLocalWatch() throws Exception {

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            

+            AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);

+            

+            final String path = "/";

+            

+            final CountDownLatch removedLatch = new CountDownLatch(1);

+            

+            Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);        

+            

+            client.checkExists().usingWatcher(watcher).forPath(path);

+            

+            //Stop the server so we can check if we can remove watches locally when offline

+            server.stop();

+            

+            Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));

+                       

+            client.watches().removeAll().locally().forPath(path);

+            

+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }

+    

+    @Test

+    public void testRemoveLocalWatchInBackground() throws Exception {

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            

+            AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);

+            

+            final String path = "/";

+            

+            final CountDownLatch removedLatch = new CountDownLatch(1);

+            

+            Watcher watcher = new CountDownWatcher(path, removedLatch, EventType.DataWatchRemoved);        

+            

+            client.checkExists().usingWatcher(watcher).forPath(path);

+            

+            //Stop the server so we can check if we can remove watches locally when offline

+            server.stop();

+            

+            Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));

+                       

+            client.watches().removeAll().locally().inBackground().forPath(path);

+            

+            Assert.assertTrue(timing.awaitLatch(removedLatch), "Timed out waiting for watch removal");

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }    

+    

+    /**

+     * Test the case where we try and remove an unregistered watcher. In this case we expect a NoWatcherException to

+     * be thrown. 

+     * @throws Exception

+     */

+    @Test(expectedExceptions=KeeperException.NoWatcherException.class)

+    public void testRemoveUnregisteredWatcher() throws Exception

+    {

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            

+            final String path = "/";            

+            Watcher watcher = new Watcher() {

+                @Override

+                public void process(WatchedEvent event)

+                {

+                }                

+            };

+            

+            client.watches().remove(watcher).forPath(path);

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }

+    

+    /**

+     * Test the case where we try and remove an unregistered watcher but have the quietly flag set. In this case we expect success. 

+     * @throws Exception

+     */

+    @Test

+    public void testRemoveUnregisteredWatcherQuietly() throws Exception

+    {

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            

+            final AtomicBoolean watcherRemoved = new AtomicBoolean(false);

+            

+            final String path = "/";            

+            Watcher watcher = new BooleanWatcher(path, watcherRemoved, EventType.DataWatchRemoved);

+            

+            client.watches().remove(watcher).quietly().forPath(path);

+            

+            timing.sleepABit();

+            

+            //There should be no watcher removed as none were registered.

+            Assert.assertEquals(watcherRemoved.get(), false);

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }

+    

+    @Test

+    public void testGuaranteedRemoveWatch() throws Exception {

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.builder().

+                connectString(server.getConnectString()).

+                retryPolicy(new RetryOneTime(1)).

+                build();

+        try

+        {

+            client.start();

+            // Scenario: a guaranteed removal fails while the server is down, then must complete after the restart.

+            AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);

+

+            String path = "/";

+

+            CountDownLatch removeLatch = new CountDownLatch(1);

+

+            Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);

+            client.checkExists().usingWatcher(watcher).forPath(path);

+

+            server.stop();

+

+            Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));

+

+            //Remove the watch while we're not connected

+            try

+            {

+                client.watches().remove(watcher).guaranteed().forPath(path);

+                Assert.fail("Expected ConnectionLossException while the server is stopped");

+            }

+            catch(KeeperException.ConnectionLossException e)

+            {

+                //Expected

+            }

+

+            server.restart();

+            // Assert the latch result; otherwise a timeout here would let the test pass silently.

+            Assert.assertTrue(timing.awaitLatch(removeLatch), "Timed out waiting for guaranteed watch removal");

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }

+    

+    @Test

+    public void testGuaranteedRemoveWatchInBackground() throws Exception {

+        Timing timing = new Timing();

+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(),

+                                                                    new ExponentialBackoffRetry(100, 3));

+        try

+        {

+            client.start();

+            // Scenario: a guaranteed background removal is issued while the server is down and must complete after the restart.

+            AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client);

+

+            final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1);

+            // NOTE(review): presumably invoked when the failed removal is recorded for retry - confirm in FailedOperationManager.

+            ((CuratorFrameworkImpl)client).getFailedRemoveWatcherManager().debugListener = new FailedOperationManager.FailedOperationManagerListener<FailedRemoveWatchManager.FailedRemoveWatchDetails>()

+            {

+

+                @Override

+                public void pathAddedForGuaranteedOperation(

+                        FailedRemoveWatchDetails detail)

+                {

+                    guaranteeAddedLatch.countDown();

+                }

+            };

+

+            String path = "/";

+

+            CountDownLatch removeLatch = new CountDownLatch(1);

+

+            Watcher watcher = new CountDownWatcher(path, removeLatch, EventType.DataWatchRemoved);

+            client.checkExists().usingWatcher(watcher).forPath(path);

+

+            server.stop();

+            Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED));

+

+            //Remove the watch while we're not connected

+            client.watches().remove(watcher).guaranteed().inBackground().forPath(path);

+            // Both latch results are asserted; previously a timeout on either one went unnoticed.

+            Assert.assertTrue(timing.awaitLatch(guaranteeAddedLatch), "Timed out waiting for the debug listener callback");

+

+            server.restart();

+

+            Assert.assertTrue(timing.awaitLatch(removeLatch), "Timed out waiting for guaranteed watch removal");

+        }

+        finally

+        {

+            CloseableUtils.closeQuietly(client);

+        }

+    }

+    

+    private static class CountDownWatcher implements Watcher {

+        private final String path;

+        private final EventType eventType;

+        private final CountDownLatch removeLatch;

+        // Counts removeLatch down each time an event of eventType is delivered for path; fields are final as they never change.

+        public CountDownWatcher(String path, CountDownLatch removeLatch, EventType eventType) {

+            this.path = path;

+            this.eventType = eventType;

+            this.removeLatch = removeLatch;

+        }

+

+        @Override

+        public void process(WatchedEvent event)

+        {

+            if(event.getPath() == null || event.getType() == null) {

+                return; // nothing to match against

+            }

+

+            if(event.getPath().equals(path) && event.getType() == eventType) {

+                removeLatch.countDown();

+            }

+        }

+    }

+    

+    private static class BooleanWatcher implements Watcher {

+        private final String path;

+        private final EventType eventType;

+        private final AtomicBoolean removedFlag;

+        // Sets removedFlag to true when an event of eventType is delivered for path; fields are final as they never change.

+        public BooleanWatcher(String path, AtomicBoolean removedFlag, EventType eventType) {

+            this.path = path;

+            this.eventType = eventType;

+            this.removedFlag = removedFlag;

+        }

+

+        @Override

+        public void process(WatchedEvent event)

+        {

+            if(event.getPath() == null || event.getType() == null) {

+                return; // nothing to match against

+            }

+

+            if(event.getPath().equals(path) && event.getType() == eventType) {

+                removedFlag.set(true);

+            }

+        }

+    }

+}

diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
new file mode 100644
index 0000000..e20c450
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestWatcherRemovalManager.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.Timing;
+import org.apache.curator.test.WatchersDebug;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+public class TestWatcherRemovalManager extends BaseClassForTests
+{
+    @Test
+    public void testBasic() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            internalTryBasic(client);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testBasicNamespace1() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+            internalTryBasic(client.usingNamespace("foo"));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testBasicNamespace2() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+            .connectString(server.getConnectString())
+            .retryPolicy(new RetryOneTime(1))
+            .namespace("hey")
+            .build();
+        try
+        {
+            client.start();
+            internalTryBasic(client);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testBasicNamespace3() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+            .connectString(server.getConnectString())
+            .retryPolicy(new RetryOneTime(1))
+            .namespace("hey")
+            .build();
+        try
+        {
+            client.start();
+            internalTryBasic(client.usingNamespace("lakjsf"));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testSameWatcher() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+
+            Watcher watcher = new Watcher()
+            {
+                @Override
+                public void process(WatchedEvent event)
+                {
+                    // NOP
+                }
+            };
+
+            removerClient.getData().usingWatcher(watcher).forPath("/");
+            Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+            removerClient.getData().usingWatcher(watcher).forPath("/");
+            Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testTriggered() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            Watcher watcher = new Watcher()
+            {
+                @Override
+                public void process(WatchedEvent event)
+                {
+                    if ( event.getType() == Event.EventType.NodeCreated )
+                    {
+                        latch.countDown();
+                    }
+                }
+            };
+
+            removerClient.checkExists().usingWatcher(watcher).forPath("/yo");
+            Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+            removerClient.create().forPath("/yo");
+
+            Assert.assertTrue(new Timing().awaitLatch(latch));
+
+            Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 0);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testResetFromWatcher() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            final WatcherRemovalFacade removerClient = (WatcherRemovalFacade)client.newWatcherRemoveCuratorFramework();
+
+            final CountDownLatch createdLatch = new CountDownLatch(1);
+            final CountDownLatch deletedLatch = new CountDownLatch(1);
+            Watcher watcher = new Watcher()
+            {
+                @Override
+                public void process(WatchedEvent event)
+                {
+                    if ( event.getType() == Event.EventType.NodeCreated )
+                    {
+                        try
+                        {
+                            removerClient.checkExists().usingWatcher(this).forPath("/yo");
+                        }
+                        catch ( Exception e )
+                        {
+                            e.printStackTrace();
+                        }
+                        createdLatch.countDown();
+                    }
+                    else if ( event.getType() == Event.EventType.NodeDeleted )
+                    {
+                        deletedLatch.countDown();
+                    }
+                }
+            };
+
+            removerClient.checkExists().usingWatcher(watcher).forPath("/yo");
+            Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+            removerClient.create().forPath("/yo");
+
+            Assert.assertTrue(timing.awaitLatch(createdLatch));
+            Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 1);
+
+            removerClient.delete().forPath("/yo");
+
+            Assert.assertTrue(timing.awaitLatch(deletedLatch));
+
+            Assert.assertEquals(removerClient.getRemovalManager().getEntries().size(), 0);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    private void internalTryBasic(CuratorFramework client) throws Exception
+    {
+        WatcherRemoveCuratorFramework removerClient = client.newWatcherRemoveCuratorFramework();
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        Watcher watcher = new Watcher()
+        {
+            @Override
+            public void process(WatchedEvent event)
+            {
+                if ( event.getType() == Event.EventType.DataWatchRemoved )
+                {
+                    latch.countDown();
+                }
+            }
+        };
+        removerClient.checkExists().usingWatcher(watcher).forPath("/hey");
+
+        List<String> existWatches = WatchersDebug.getExistWatches(client.getZookeeperClient().getZooKeeper());
+        Assert.assertEquals(existWatches.size(), 1);
+
+        removerClient.removeWatchers();
+
+        Assert.assertTrue(new Timing().awaitLatch(latch));
+
+        existWatches = WatchersDebug.getExistWatches(client.getZookeeperClient().getZooKeeper());
+        Assert.assertEquals(existWatches.size(), 0);
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index 72ee5ff..49b9a3f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -23,6 +23,7 @@
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorWatcher;
@@ -53,7 +54,7 @@
 public class NodeCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
+    private final WatcherRemoveCuratorFramework client;
     private final String path;
     private final boolean dataIsCompressed;
     private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
@@ -127,7 +128,7 @@
      */
     public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
     {
-        this.client = client;
+        this.client = client.newWatcherRemoveCuratorFramework();    // hold a WatcherRemoveCuratorFramework so this cache can later call removeWatchers() on it
         this.path = PathUtils.validatePath(path);
         this.dataIsCompressed = dataIsCompressed;
     }
@@ -169,6 +170,7 @@
     {
         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
         {
+            client.removeWatchers();
             listeners.clear();
         }
         client.getConnectionStateListenable().removeListener(connectionStateListener);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index b5d912c..99a652d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -27,6 +27,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.listen.ListenerContainer;
@@ -42,16 +43,13 @@
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Exchanger;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -67,7 +65,7 @@
 public class PathChildrenCache implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
+    private final WatcherRemoveCuratorFramework client;
     private final String path;
     private final CloseableExecutorService executorService;
     private final boolean cacheData;
@@ -215,7 +213,7 @@
      */
     public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
     {
-        this.client = client;
+        this.client = client.newWatcherRemoveCuratorFramework();
         this.path = PathUtils.validatePath(path);
         this.cacheData = cacheData;
         this.dataIsCompressed = dataIsCompressed;
@@ -374,6 +372,7 @@
             executorService.close();
             client.clearWatcherReferences(childrenWatcher);
             client.clearWatcherReferences(dataWatcher);
+            client.removeWatchers();
 
             // TODO
             // This seems to enable even more GC - I'm not sure why yet - it
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 4f3ffb6..bda00bf 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -25,6 +25,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
@@ -480,7 +481,7 @@
     private final AtomicBoolean isInitialized = new AtomicBoolean(false);
 
     private final TreeNode root;
-    private final CuratorFramework client;
+    private final WatcherRemoveCuratorFramework client;
     private final CloseableExecutorService executorService;
     private final boolean cacheData;
     private final boolean dataIsCompressed;
@@ -530,7 +531,7 @@
     {
         this.createParentNodes = createParentNodes;
         this.root = new TreeNode(validatePath(path), null);
-        this.client = client;
+        this.client = client.newWatcherRemoveCuratorFramework();
         this.cacheData = cacheData;
         this.dataIsCompressed = dataIsCompressed;
         this.maxDepth = maxDepth;
@@ -566,6 +567,7 @@
     {
         if ( treeState.compareAndSet(TreeState.STARTED, TreeState.CLOSED) )
         {
+            client.removeWatchers();
             client.getConnectionStateListenable().removeListener(connectionStateListener);
             listeners.clear();
             executorService.close();
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index a6d8145..da9b8b2 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -23,6 +23,7 @@
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.listen.ListenerContainer;
@@ -63,7 +64,7 @@
 public class LeaderLatch implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
+    private final WatcherRemoveCuratorFramework client;
     private final String latchPath;
     private final String id;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
@@ -143,7 +144,7 @@
      */
     public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
     {
-        this.client = Preconditions.checkNotNull(client, "client cannot be null");
+        this.client = Preconditions.checkNotNull(client, "client cannot be null").newWatcherRemoveCuratorFramework();
         this.latchPath = PathUtils.validatePath(latchPath);
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
         this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
@@ -206,6 +207,7 @@
         try
         {
             setNode(null);
+            client.removeWatchers();
         }
         catch ( Exception e )
         {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
index 88b5f5d..444b10d 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreMutex.java
@@ -20,6 +20,7 @@
 
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -29,6 +30,7 @@
 public class InterProcessSemaphoreMutex implements InterProcessLock
 {
     private final InterProcessSemaphoreV2 semaphore;
+    private final WatcherRemoveCuratorFramework watcherRemoveClient;
     private volatile Lease lease;
 
     /**
@@ -37,7 +39,8 @@
      */
     public InterProcessSemaphoreMutex(CuratorFramework client, String path)
     {
-        this.semaphore = new InterProcessSemaphoreV2(client, path, 1);
+        watcherRemoveClient = client.newWatcherRemoveCuratorFramework();    // facade kept so this mutex can call removeWatchers() itself
+        this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);    // NOTE(review): InterProcessSemaphoreV2's private constructor calls newWatcherRemoveCuratorFramework() on the client it is given - if this constructor delegates to it, confirm the nested facade is intended
     }
 
     @Override
@@ -66,6 +69,7 @@
         try
         {
             lease.close();
+            watcherRemoveClient.removeWatchers();
         }
         finally
         {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index f4af39b..3bf2ec3 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -22,15 +22,16 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
-
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.PathAndBytesable;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.shared.SharedCountListener;
 import org.apache.curator.framework.recipes.shared.SharedCountReader;
 import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -38,13 +39,13 @@
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import org.apache.curator.utils.PathUtils;
 
 /**
  * <p>
@@ -78,7 +79,7 @@
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final InterProcessMutex lock;
-    private final CuratorFramework client;
+    private final WatcherRemoveCuratorFramework client;
     private final String leasesPath;
     private final Watcher watcher = new Watcher()
     {
@@ -122,7 +123,7 @@
 
     private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count)
     {
-        this.client = client;
+        this.client = client.newWatcherRemoveCuratorFramework();
         path = PathUtils.validatePath(path);
         lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
         this.maxLeases = (count != null) ? count.getCount() : maxLeases;
@@ -352,36 +353,43 @@
             String nodeName = ZKPaths.getNodeFromPath(path);
             builder.add(makeLease(path));
 
-            synchronized(this)
+            try
             {
-                for(;;)
+                synchronized(this)
                 {
-                    List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
-                    if ( !children.contains(nodeName) )
+                    for(;;)
                     {
-                        log.error("Sequential path not found: " + path);
-                        return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
-                    }
-
-                    if ( children.size() <= maxLeases )
-                    {
-                        break;
-                    }
-                    if ( hasWait )
-                    {
-                        long thisWaitMs = getThisWaitMs(startMs, waitMs);
-                        if ( thisWaitMs <= 0 )
+                        List<String> children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
+                        if ( !children.contains(nodeName) )
                         {
-                            return InternalAcquireResult.RETURN_NULL;
+                            log.error("Sequential path not found: " + path);
+                            return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
                         }
-                        wait(thisWaitMs);
-                    }
-                    else
-                    {
-                        wait();
+
+                        if ( children.size() <= maxLeases )
+                        {
+                            break;
+                        }
+                        if ( hasWait )
+                        {
+                            long thisWaitMs = getThisWaitMs(startMs, waitMs);
+                            if ( thisWaitMs <= 0 )
+                            {
+                                return InternalAcquireResult.RETURN_NULL;
+                            }
+                            wait(thisWaitMs);
+                        }
+                        else
+                        {
+                            wait();
+                        }
                     }
                 }
             }
+            finally
+            {
+                client.removeWatchers();
+            }
         }
         finally
         {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
index 2b4d3d9..4b0da11 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
@@ -24,11 +24,11 @@
 import com.google.common.collect.Lists;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ZKPaths;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -42,7 +42,7 @@
 
 public class LockInternals
 {
-    private final CuratorFramework                  client;
+    private final WatcherRemoveCuratorFramework     client;
     private final String                            path;
     private final String                            basePath;
     private final LockInternalsDriver               driver;
@@ -100,7 +100,7 @@
         this.lockName = lockName;
         this.maxLeases = maxLeases;
 
-        this.client = client;
+        this.client = client.newWatcherRemoveCuratorFramework();
         this.basePath = PathUtils.validatePath(path);
         this.path = ZKPaths.makePath(path, lockName);
     }
@@ -116,8 +116,9 @@
         revocable.set(entry);
     }
 
-    void releaseLock(String lockPath) throws Exception
+    final void releaseLock(String lockPath) throws Exception
     {
+        client.removeWatchers();    // drop the watchers held by this instance's facade before clearing revocable and calling deleteOurPath
         revocable.set(null);
         deleteOurPath(lockPath);
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index 0d963e0..0b482ef 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Preconditions;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CreateModable;
@@ -58,7 +59,7 @@
 {
     private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final CuratorFramework client;
+    private final WatcherRemoveCuratorFramework client;
     private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
     private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
     private final String basePath;
@@ -212,7 +213,7 @@
      */
     public PersistentEphemeralNode(CuratorFramework client, Mode mode, String basePath, byte[] initData)
     {
-        this.client = Preconditions.checkNotNull(client, "client cannot be null");
+        this.client = Preconditions.checkNotNull(client, "client cannot be null").newWatcherRemoveCuratorFramework();
         this.basePath = PathUtils.validatePath(basePath);
         this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
         final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
@@ -314,6 +315,8 @@
         {
             throw new IOException(e);
         }
+
+        client.removeWatchers();
     }
 
     /**
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
index 032dc7a..e5c7e8c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java
@@ -21,6 +21,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorWatcher;
@@ -36,7 +37,7 @@
 
 class ChildrenCache implements Closeable
 {
-    private final CuratorFramework client;
+    private final WatcherRemoveCuratorFramework client;
     private final String path;
     private final AtomicReference<Data> children = new AtomicReference<Data>(new Data(Lists.<String>newArrayList(), 0));
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -79,7 +80,7 @@
 
     ChildrenCache(CuratorFramework client, String path)
     {
-        this.client = client;
+        this.client = client.newWatcherRemoveCuratorFramework();    // facade on which close() calls removeWatchers()
         this.path = PathUtils.validatePath(path);
     }
 
@@ -91,6 +92,7 @@
     @Override
     public void close() throws IOException
     {
+        client.removeWatchers();    // NOTE(review): watchers are removed before isClosed is set, while SharedValue.close() marks CLOSED first - confirm a concurrent callback cannot re-register a watcher in between
         isClosed.set(true);
         notifyFromCallback();
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 6ce6bf4..dcd30ba 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.state.ConnectionState;
@@ -45,7 +46,7 @@
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final ListenerContainer<SharedValueListener> listeners = new ListenerContainer<SharedValueListener>();
-    private final CuratorFramework client;
+    private final WatcherRemoveCuratorFramework client;
     private final String path;
     private final byte[] seedValue;
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
@@ -87,7 +88,7 @@
      */
     public SharedValue(CuratorFramework client, String path, byte[] seedValue)
     {
-        this.client = client;
+        this.client = client.newWatcherRemoveCuratorFramework();
         this.path = PathUtils.validatePath(path);
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
         currentValue = new AtomicReference<VersionedValue<byte[]>>(new VersionedValue<byte[]>(0, Arrays.copyOf(seedValue, seedValue.length)));
@@ -233,8 +234,9 @@
     @Override
     public void close() throws IOException
     {
-        client.getConnectionStateListenable().removeListener(connectionStateListener);
         state.set(State.CLOSED);
+        client.removeWatchers();    // state is already CLOSED by the time this facade's watchers are removed
+        client.getConnectionStateListenable().removeListener(connectionStateListener);    // listener removal now follows the state change instead of preceding it
         listeners.clear();
     }
 
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
new file mode 100644
index 0000000..82de1fc
--- /dev/null
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/imps/TestCleanState.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.WatchersDebug;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.zookeeper.ZooKeeper;
+
+public class TestCleanState
+{
+    private TestCleanState()
+    {
+    }
+
+    /**
+     * Throws an AssertionError if a child, exists or data watcher is still registered, then closes the client quietly either way.
+     */
+    public static void closeAndTestClean(CuratorFramework client)
+    {
+        if ( client != null )    // a null client has nothing to inspect or close
+        {
+            try
+            {
+                ZooKeeper handle = ((CuratorFrameworkImpl)client).getZooKeeper();
+                if ( handle != null )
+                {
+                    if ( WatchersDebug.getChildWatches(handle).size() != 0 )
+                    {
+                        throw new AssertionError("One or more child watchers are still registered: " + WatchersDebug.getChildWatches(handle));
+                    }
+                    if ( WatchersDebug.getExistWatches(handle).size() != 0 )
+                    {
+                        throw new AssertionError("One or more exists watchers are still registered: " + WatchersDebug.getExistWatches(handle));
+                    }
+                    if ( WatchersDebug.getDataWatches(handle).size() != 0 )
+                    {
+                        throw new AssertionError("One or more data watchers are still registered: " + WatchersDebug.getDataWatches(handle));
+                    }
+                }
+            }
+            catch ( Exception inspectionFailure )
+            {
+                inspectionFailure.printStackTrace();    // inspection problems are only reported; AssertionError is an Error and is not caught here
+            }
+            finally
+            {
+                CloseableUtils.closeQuietly(client);
+            }
+        }
+    }
+}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
index ab37785..f32c9b2 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/BaseTestTreeCache.java
@@ -22,6 +22,7 @@
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
@@ -123,7 +124,7 @@
             finally
             {
                 CloseableUtils.closeQuietly(cache);
-                CloseableUtils.closeQuietly(client);
+                TestCleanState.closeAndTestClean(client);
             }
         }
         finally
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
index 27af6ac..d6d495a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestNodeCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.recipes.cache;
 
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -98,7 +99,7 @@
         finally
         {
             CloseableUtils.closeQuietly(cache);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -161,7 +162,7 @@
         finally
         {
             CloseableUtils.closeQuietly(cache);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -204,7 +205,7 @@
         finally
         {
             CloseableUtils.closeQuietly(cache);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -252,7 +253,7 @@
         finally
         {
             CloseableUtils.closeQuietly(cache);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index 14d061f..3571ca7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -28,7 +28,7 @@
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.Pathable;
 import org.apache.curator.framework.api.UnhandledErrorListener;
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
@@ -97,7 +97,7 @@
         finally
         {
             CloseableUtils.closeQuietly(cache);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -139,7 +139,7 @@
         finally
         {
             CloseableUtils.closeQuietly(cache);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -193,7 +193,7 @@
         finally
         {
             CloseableUtils.closeQuietly(cache);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -242,7 +242,7 @@
         finally
         {
             CloseableUtils.closeQuietly(cache);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -251,6 +251,7 @@
     {
         Timing timing = new Timing();
 
+        PathChildrenCache cache = null;
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
         client.start();
         try
@@ -258,7 +259,7 @@
             final CountDownLatch updatedLatch = new CountDownLatch(1);
             final CountDownLatch addedLatch = new CountDownLatch(1);
             client.create().creatingParentsIfNeeded().forPath("/test");
-            PathChildrenCache cache = new PathChildrenCache(client, "/test", false);
+            cache = new PathChildrenCache(client, "/test", false);
             cache.getListenable().addListener
                 (
                     new PathChildrenCacheListener()
@@ -287,7 +288,8 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -315,7 +317,7 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -391,7 +393,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -485,7 +487,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -494,6 +496,7 @@
     public void testIssue27() throws Exception
     {
         Timing timing = new Timing();
+        PathChildrenCache cache = null;
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
         client.start();
         try
@@ -507,7 +510,7 @@
 
             final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
             final Semaphore semaphore = new Semaphore(0);
-            PathChildrenCache cache = new PathChildrenCache(client, "/base", true);
+            cache = new PathChildrenCache(client, "/base", true);
             cache.getListenable().addListener
                 (
                     new PathChildrenCacheListener()
@@ -542,7 +545,8 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -551,6 +555,7 @@
     public void testIssue27Alt() throws Exception
     {
         Timing timing = new Timing();
+        PathChildrenCache cache = null;
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
         client.start();
         try
@@ -564,7 +569,7 @@
 
             final List<PathChildrenCacheEvent.Type> events = Lists.newArrayList();
             final Semaphore semaphore = new Semaphore(0);
-            PathChildrenCache cache = new PathChildrenCache(client, "/base", true);
+            cache = new PathChildrenCache(client, "/base", true);
             cache.getListenable().addListener
                 (
                     new PathChildrenCacheListener()
@@ -594,7 +599,8 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -602,6 +608,7 @@
     public void testKilledSession() throws Exception
     {
         Timing timing = new Timing();
+        PathChildrenCache cache = null;
         CuratorFramework client = null;
         try
         {
@@ -609,7 +616,7 @@
             client.start();
             client.create().forPath("/test");
 
-            PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
+            cache = new PathChildrenCache(client, "/test", true);
             cache.start();
 
             final CountDownLatch childAddedLatch = new CountDownLatch(1);
@@ -653,7 +660,8 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(cache);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -677,7 +685,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -721,7 +729,7 @@
         finally
         {
             CloseableUtils.closeQuietly(cache);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -809,7 +817,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -878,7 +886,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -911,8 +919,9 @@
             timing.sleepABit();
             Assert.assertFalse(exec.isExecuteCalled());
         }
-        finally {
-            client.close();
+        finally
+        {
+            TestCleanState.closeAndTestClean(client);
         }
 
     }
@@ -957,9 +966,10 @@
             latch.await(5, TimeUnit.SECONDS);
 
             Assert.assertTrue(latch.getCount() == 1, "Unexpected exception occurred");
-        } finally
+        }
+        finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 96e6d45..3742fb7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -24,6 +24,7 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryNTimes;
@@ -96,7 +97,7 @@
         finally
         {
             CloseableUtils.closeQuietly(latch);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -126,7 +127,7 @@
         finally
         {
             CloseableUtils.closeQuietly(latch);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -158,7 +159,7 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -213,7 +214,7 @@
             {
                 CloseableUtils.closeQuietly(latch);
             }
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -256,9 +257,8 @@
             {
                 CloseableUtils.closeQuietly(latch);
             }
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
-
     }
 
     @Test
@@ -320,7 +320,7 @@
         finally
         {
             executorService.shutdownNow();
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -416,7 +416,7 @@
                     CloseableUtils.closeQuietly(latch);
                 }
             }
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -504,7 +504,7 @@
                     CloseableUtils.closeQuietly(latch);
                 }
             }
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -583,7 +583,7 @@
             {
                 CloseableUtils.closeQuietly(notifiedLeader);
             }
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -639,7 +639,7 @@
         finally
         {
             CloseableUtils.closeQuietly(leader);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
             CloseableUtils.closeQuietly(server);
         }
     }
@@ -709,7 +709,7 @@
             {
                 CloseableUtils.closeQuietly(latch);
             }
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -745,6 +745,6 @@
     {
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
-        LeaderLatch latch = new LeaderLatch(client, "parent");
+        new LeaderLatch(client, "parent");
     }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
index 09b5fe6..dceff88 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
@@ -20,6 +20,7 @@
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryOneTime;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -84,13 +85,14 @@
             }
             catch ( Exception e )
             {
+                // ignore
             }
             Assert.assertFalse(goodLock.isAcquiredInThisProcess());
             Assert.assertTrue(otherGoodLock.isAcquiredInThisProcess());
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -142,13 +144,14 @@
             }
             catch ( Exception e )
             {
+                // ignore
             }
             Assert.assertFalse(goodLock.isAcquiredInThisProcess());
             Assert.assertTrue(goodLockWasLocked.get());
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
index a2c079e..c37d88d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
@@ -20,6 +20,7 @@
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.KillSession;
 import org.apache.zookeeper.CreateMode;
@@ -106,7 +107,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -151,7 +152,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
index 99ea11f..f44d238 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -22,12 +22,12 @@
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.KillSession;
-import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
@@ -126,7 +126,7 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -278,7 +278,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -344,7 +344,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -390,7 +390,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -407,7 +407,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -539,11 +539,14 @@
                 Assert.assertTrue(acquiredLatchForClient1.await(10, TimeUnit.SECONDS));
                 Assert.assertTrue(mutexForClient1.isAcquiredInThisProcess());
             }
+
+            future1.get();
+            future2.get();
         }
         finally
         {
-            CloseableUtils.closeQuietly(client1);
-            CloseableUtils.closeQuietly(client2);
+            TestCleanState.closeAndTestClean(client1);
+            TestCleanState.closeAndTestClean(client2);
         }
     }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
index f7636ed..48e4805 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessReadWriteLock.java
@@ -16,14 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Collection;
@@ -31,6 +32,7 @@
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -40,21 +42,22 @@
 public class TestInterProcessReadWriteLock extends BaseClassForTests
 {
     @Test
-    public void     testGetParticipantNodes() throws Exception
+    public void testGetParticipantNodes() throws Exception
     {
-        final int               READERS = 20;
-        final int               WRITERS = 8;
+        final int READERS = 20;
+        final int WRITERS = 8;
 
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         try
         {
             client.start();
 
-            final CountDownLatch              latch = new CountDownLatch(READERS + WRITERS);
-            final CountDownLatch              readLatch = new CountDownLatch(READERS);
-            final InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
+            final CountDownLatch latch = new CountDownLatch(READERS + WRITERS);
+            final CountDownLatch readLatch = new CountDownLatch(READERS);
+            final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
 
-            ExecutorService                   service = Executors.newCachedThreadPool();
+            final CountDownLatch exitLatch = new CountDownLatch(1);
+            ExecutorCompletionService<Void> service = new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
             for ( int i = 0; i < READERS; ++i )
             {
                 service.submit
@@ -65,8 +68,16 @@
                         public Void call() throws Exception
                         {
                             lock.readLock().acquire();
-                            latch.countDown();
-                            readLatch.countDown();
+                            try
+                            {
+                                latch.countDown();
+                                readLatch.countDown();
+                                exitLatch.await();
+                            }
+                            finally
+                            {
+                                lock.readLock().release();
+                            }
                             return null;
                         }
                     }
@@ -84,6 +95,14 @@
                             Assert.assertTrue(readLatch.await(10, TimeUnit.SECONDS));
                             latch.countDown();  // must be before as there can only be one writer
                             lock.writeLock().acquire();
+                            try
+                            {
+                                exitLatch.await();
+                            }
+                            finally
+                            {
+                                lock.writeLock().release();
+                            }
                             return null;
                         }
                     }
@@ -97,22 +116,28 @@
 
             Assert.assertEquals(readers.size(), READERS);
             Assert.assertEquals(writers.size(), WRITERS);
+
+            exitLatch.countDown();
+            for ( int i = 0; i < (READERS + WRITERS); ++i )
+            {
+                service.take().get();
+            }
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
     @Test
-    public void     testThatUpgradingIsDisallowed() throws Exception
+    public void testThatUpgradingIsDisallowed() throws Exception
     {
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         try
         {
             client.start();
 
-            InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
+            InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
             lock.readLock().acquire();
             Assert.assertFalse(lock.writeLock().acquire(5, TimeUnit.SECONDS));
 
@@ -120,70 +145,80 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
     @Test
-    public void     testThatDowngradingRespectsThreads() throws Exception
+    public void testThatDowngradingRespectsThreads() throws Exception
     {
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         try
         {
             client.start();
 
-            final InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
-            ExecutorService                   t1 = Executors.newSingleThreadExecutor();
-            ExecutorService                   t2 = Executors.newSingleThreadExecutor();
+            final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+            ExecutorService t1 = Executors.newSingleThreadExecutor();
+            ExecutorService t2 = Executors.newSingleThreadExecutor();
 
-            final CountDownLatch              latch = new CountDownLatch(1);
+            final CountDownLatch latch = new CountDownLatch(1);
 
-            Future<Object>                    f1 = t1.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+            final CountDownLatch releaseLatch = new CountDownLatch(1);
+            Future<Object> f1 = t1.submit
+                (
+                    new Callable<Object>()
                     {
-                        lock.writeLock().acquire();
-                        latch.countDown();
-                        return null;
+                        @Override
+                        public Object call() throws Exception
+                        {
+                            lock.writeLock().acquire();
+                            latch.countDown();
+                            try
+                            {
+                                releaseLatch.await();
+                            }
+                            finally
+                            {
+                                lock.writeLock().release();
+                            }
+                            return null;
+                        }
                     }
-                }
-            );
+                );
 
-            Future<Object>                    f2 = t2.submit
-            (
-                new Callable<Object>()
-                {
-                    @Override
-                    public Object call() throws Exception
+            Future<Object> f2 = t2.submit
+                (
+                    new Callable<Object>()
                     {
-                        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
-                        Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
-                        return null;
+                        @Override
+                        public Object call() throws Exception
+                        {
+                            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
+                            Assert.assertFalse(lock.readLock().acquire(5, TimeUnit.SECONDS));
+                            return null;
+                        }
                     }
-                }
-            );
+                );
 
-            f1.get();
             f2.get();
+            releaseLatch.countDown();
+            f1.get();
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
     @Test
-    public void     testDowngrading() throws Exception
+    public void testDowngrading() throws Exception
     {
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
         try
         {
             client.start();
 
-            InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
+            InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
             lock.writeLock().acquire();
             Assert.assertTrue(lock.readLock().acquire(5, TimeUnit.SECONDS));
             lock.writeLock().release();
@@ -192,60 +227,60 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
     @Test
-    public void     testBasic() throws Exception
+    public void testBasic() throws Exception
     {
-        final int               CONCURRENCY = 8;
-        final int               ITERATIONS = 100;
+        final int CONCURRENCY = 8;
+        final int ITERATIONS = 100;
 
-        final Random            random = new Random();
-        final AtomicInteger     concurrentCount = new AtomicInteger(0);
-        final AtomicInteger     maxConcurrentCount = new AtomicInteger(0);
-        final AtomicInteger     writeCount = new AtomicInteger(0);
-        final AtomicInteger     readCount = new AtomicInteger(0);
+        final Random random = new Random();
+        final AtomicInteger concurrentCount = new AtomicInteger(0);
+        final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
+        final AtomicInteger writeCount = new AtomicInteger(0);
+        final AtomicInteger readCount = new AtomicInteger(0);
 
-        List<Future<Void>>  futures = Lists.newArrayList();
-        ExecutorService     service = Executors.newCachedThreadPool();
+        List<Future<Void>> futures = Lists.newArrayList();
+        ExecutorService service = Executors.newCachedThreadPool();
         for ( int i = 0; i < CONCURRENCY; ++i )
         {
-            Future<Void>    future = service.submit
-            (
-                new Callable<Void>()
-                {
-                    @Override
-                    public Void call() throws Exception
+            Future<Void> future = service.submit
+                (
+                    new Callable<Void>()
                     {
-                        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-                        client.start();
-                        try
+                        @Override
+                        public Void call() throws Exception
                         {
-                            InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock");
-                            for ( int i = 0; i < ITERATIONS; ++i )
+                            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+                            client.start();
+                            try
                             {
-                                if ( random.nextInt(100) < 10 )
+                                InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock");
+                                for ( int i = 0; i < ITERATIONS; ++i )
                                 {
-                                    doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
-                                    writeCount.incrementAndGet();
-                                }
-                                else
-                                {
-                                    doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
-                                    readCount.incrementAndGet();
+                                    if ( random.nextInt(100) < 10 )
+                                    {
+                                        doLocking(lock.writeLock(), concurrentCount, maxConcurrentCount, random, 1);
+                                        writeCount.incrementAndGet();
+                                    }
+                                    else
+                                    {
+                                        doLocking(lock.readLock(), concurrentCount, maxConcurrentCount, random, Integer.MAX_VALUE);
+                                        readCount.incrementAndGet();
+                                    }
                                 }
                             }
+                            finally
+                            {
+                                TestCleanState.closeAndTestClean(client);
+                            }
+                            return null;
                         }
-                        finally
-                        {
-                            CloseableUtils.closeQuietly(client);
-                        }
-                        return null;
                     }
-                }
-            );
+                );
             futures.add(future);
         }
 
@@ -262,17 +297,17 @@
     }
 
     @Test
-    public void     testSetNodeData() throws Exception
+    public void testSetNodeData() throws Exception
     {
-        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
 
         try
         {
             client.start();
 
-            final byte[] nodeData = new byte[] { 1, 2, 3, 4 };
+            final byte[] nodeData = new byte[]{1, 2, 3, 4};
 
-            InterProcessReadWriteLock   lock = new InterProcessReadWriteLock(client, "/lock", nodeData);
+            InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/lock", nodeData);
 
             // mutate passed-in node data, lock has made copy
             nodeData[0] = 5;
@@ -284,13 +319,13 @@
 
             byte dataInZk[] = client.getData().forPath("/lock/" + children.get(0));
             Assert.assertNotNull(dataInZk);
-            Assert.assertEquals(new byte[] { 1, 2, 3, 4 }, dataInZk);
+            Assert.assertEquals(new byte[]{1, 2, 3, 4}, dataInZk);
 
             lock.writeLock().release();
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -299,7 +334,7 @@
         try
         {
             Assert.assertTrue(lock.acquire(10, TimeUnit.SECONDS));
-            int     localConcurrentCount;
+            int localConcurrentCount;
             synchronized(this)
             {
                 localConcurrentCount = concurrentCount.incrementAndGet();
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index 631b7c7..3ba75d8 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -20,13 +20,14 @@
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Collection;
@@ -100,10 +101,12 @@
 
             future1.get();
             future2.get();
+
+            count.close();
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -140,8 +143,8 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client1);
-            CloseableUtils.closeQuietly(client2);
+            TestCleanState.closeAndTestClean(client1);
+            TestCleanState.closeAndTestClean(client2);
         }
     }
 
@@ -226,7 +229,7 @@
                                     }
                                     finally
                                     {
-                                        client.close();
+                                        TestCleanState.closeAndTestClean(client);
                                     }
                                     return null;
                                 }
@@ -299,7 +302,7 @@
                             }
                             finally
                             {
-                                client.close();
+                                TestCleanState.closeAndTestClean(client);
                             }
                             return null;
                         }
@@ -401,7 +404,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -445,7 +448,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -463,7 +466,7 @@
         }
         finally
         {
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -499,7 +502,7 @@
             {
                 CloseableUtils.closeQuietly(l);
             }
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -528,7 +531,7 @@
             {
                 CloseableUtils.closeQuietly(l);
             }
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
index 2aa8a72..f4cb7bb 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreCluster.java
@@ -22,6 +22,7 @@
 import org.apache.curator.ensemble.EnsembleProvider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -147,7 +148,7 @@
                             }
                             finally
                             {
-                                CloseableUtils.closeQuietly(client);
+                                TestCleanState.closeAndTestClean(client);
                             }
                             return null;
                         }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
index 2d9a9aa..d1e6db5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockACLs.java
@@ -19,6 +19,7 @@
 
 package org.apache.curator.framework.recipes.locks;
 
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.RetryPolicy;
@@ -74,7 +75,7 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
index 457be75..dc14c11 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestLockCleanlinessWithFaults.java
@@ -20,6 +20,7 @@
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.utils.CloseableUtils;
@@ -67,7 +68,7 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 9f5907a..c81cc65 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -20,17 +20,16 @@
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.KillSession;
-import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.ZKPaths;
@@ -64,6 +63,7 @@
     private final Timing timing = new Timing();
 
     @AfterMethod
+    @Override
     public void teardown() throws Exception
     {
         for ( PersistentEphemeralNode node : createdNodes )
@@ -73,10 +73,8 @@
 
         for ( CuratorFramework curator : curatorInstances )
         {
-            curator.close();
+            TestCleanState.closeAndTestClean(curator);
         }
-
-        super.teardown();
     }
 
     @Test
@@ -122,7 +120,7 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -132,10 +130,11 @@
         server.stop();
 
         CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        PersistentEphemeralNode node = null;
         try
         {
             client.start();
-            PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
+            node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, "/abc/node", "hello".getBytes());
             node.start();
 
             final CountDownLatch connectedLatch = new CountDownLatch(1);
@@ -164,7 +163,8 @@
         }
         finally
         {
-            CloseableUtils.closeQuietly(client);
+            CloseableUtils.closeQuietly(node);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -247,7 +247,7 @@
             {
                 node.close();
             }
-            client.close();
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index 659154a..2bdd278 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -23,6 +23,7 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
@@ -147,7 +148,7 @@
             }
             for ( CuratorFramework client : clients )
             {
-                CloseableUtils.closeQuietly(client);
+                TestCleanState.closeAndTestClean(client);
             }
         }
     }
@@ -170,7 +171,7 @@
         finally
         {
             CloseableUtils.closeQuietly(count);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -215,7 +216,7 @@
         finally
         {
             CloseableUtils.closeQuietly(count);
-            CloseableUtils.closeQuietly(client);
+            TestCleanState.closeAndTestClean(client);
         }
     }
 
@@ -252,8 +253,8 @@
         {
             CloseableUtils.closeQuietly(count2);
             CloseableUtils.closeQuietly(count1);
-            CloseableUtils.closeQuietly(client2);
-            CloseableUtils.closeQuietly(client1);
+            TestCleanState.closeAndTestClean(client2);
+            TestCleanState.closeAndTestClean(client1);
         }
     }
 }
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index 13c3138..6ef3bb0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -35,6 +35,7 @@
 
     private static final int    RETRY_WAIT_MS = 5000;
     private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;
+    private static final String INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND;
     private static final String INTERNAL_RETRY_FAILED_TESTS;
     static
     {
@@ -53,6 +54,17 @@
         }
         INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES = logConnectionIssues;
         INTERNAL_RETRY_FAILED_TESTS = retryFailedTests;
+        String s = null;
+        try
+        {
+            // use reflection to avoid adding a circular dependency in the pom
+            s = (String)Class.forName("org.apache.curator.utils.DebugUtils").getField("PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND").get(null);
+        }
+        catch ( Exception e )
+        {
+            e.printStackTrace();
+        }
+        INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND = s;
     }
 
     @BeforeSuite(alwaysRun = true)
@@ -71,6 +83,7 @@
         {
             System.setProperty(INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES, "true");
         }
+        System.setProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true");
 
         while ( server == null )
         {
@@ -89,6 +102,7 @@
     @AfterMethod
     public void teardown() throws Exception
     {
+        System.clearProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND);
         if ( server != null )
         {
             try
diff --git a/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
new file mode 100644
index 0000000..e4c3b7e
--- /dev/null
+++ b/curator-test/src/main/java/org/apache/curator/test/WatchersDebug.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.test;
+
+import org.apache.zookeeper.ZooKeeper;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Debugging aid that reports which paths a {@link ZooKeeper} handle has watches on.
+ * <p>
+ * The {@code getDataWatches()}, {@code getExistWatches()} and {@code getChildWatches()}
+ * methods of {@link ZooKeeper} are looked up with {@code getDeclaredMethod} and forced
+ * accessible, so they are invoked reflectively rather than through a compile-time call.
+ * </p>
+ */
+public class WatchersDebug
+{
+    // Each handle is resolved independently so that one failed lookup does not leave the
+    // others unresolved; a handle is null when its lookup failed.
+    private static final Method getDataWatches = findMethod("getDataWatches");
+    private static final Method getExistWatches = findMethod("getExistWatches");
+    private static final Method getChildWatches = findMethod("getChildWatches");
+
+    /**
+     * Reflectively invokes {@code getDataWatches()} on the given handle.
+     *
+     * @param zooKeeper handle to inspect; may be {@code null}
+     * @return the value returned by the call, or {@code null} if {@code zooKeeper} is {@code null}
+     * @throws RuntimeException if the method was not resolved or the invocation fails
+     */
+    public static List<String> getDataWatches(ZooKeeper zooKeeper)
+    {
+        return callMethod(zooKeeper, WatchersDebug.getDataWatches, "getDataWatches");
+    }
+
+    /**
+     * Reflectively invokes {@code getExistWatches()} on the given handle.
+     *
+     * @param zooKeeper handle to inspect; may be {@code null}
+     * @return the value returned by the call, or {@code null} if {@code zooKeeper} is {@code null}
+     * @throws RuntimeException if the method was not resolved or the invocation fails
+     */
+    public static List<String> getExistWatches(ZooKeeper zooKeeper)
+    {
+        return callMethod(zooKeeper, WatchersDebug.getExistWatches, "getExistWatches");
+    }
+
+    /**
+     * Reflectively invokes {@code getChildWatches()} on the given handle.
+     *
+     * @param zooKeeper handle to inspect; may be {@code null}
+     * @return the value returned by the call, or {@code null} if {@code zooKeeper} is {@code null}
+     * @throws RuntimeException if the method was not resolved or the invocation fails
+     */
+    public static List<String> getChildWatches(ZooKeeper zooKeeper)
+    {
+        return callMethod(zooKeeper, WatchersDebug.getChildWatches, "getChildWatches");
+    }
+
+    private WatchersDebug()
+    {
+    }
+
+    /**
+     * Resolves a no-argument method declared on {@link ZooKeeper} and makes it accessible.
+     *
+     * @param name name of the method to resolve
+     * @return the accessible method, or {@code null} if {@link ZooKeeper} declares no such method
+     */
+    private static Method findMethod(String name)
+    {
+        try
+        {
+            Method m = ZooKeeper.class.getDeclaredMethod(name);
+            m.setAccessible(true);
+            return m;
+        }
+        catch ( NoSuchMethodException e )
+        {
+            // keep the class loadable; callers get a descriptive failure from callMethod instead
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    /**
+     * Invokes a previously resolved method on the given handle.
+     *
+     * @param zooKeeper handle to invoke on; {@code null} short-circuits to a {@code null} result
+     * @param method resolved method, or {@code null} if resolution failed
+     * @param name method name, used only for the error message
+     * @return the invocation result cast to a list of strings
+     */
+    private static List<String> callMethod(ZooKeeper zooKeeper, Method method, String name)
+    {
+        if ( zooKeeper == null )
+        {
+            return null;
+        }
+        if ( method == null )
+        {
+            // fail with a descriptive message rather than a NullPointerException from invoke
+            throw new IllegalStateException("ZooKeeper." + name + "() could not be resolved via reflection");
+        }
+        try
+        {
+            //noinspection unchecked
+            return (List<String>)method.invoke(zooKeeper);
+        }
+        catch ( Exception e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}