Merge 'master' into CURATOR-3.0
diff --git a/curator-client/pom.xml b/curator-client/pom.xml
index 0f3955b..7bb437c 100644
--- a/curator-client/pom.xml
+++ b/curator-client/pom.xml
@@ -46,6 +46,16 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
@@ -61,5 +71,17 @@
             <artifactId>curator-test</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
index ce751ec..b098989 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java
@@ -23,6 +23,7 @@
     public static final String          PROPERTY_LOG_EVENTS = "curator-log-events";
     public static final String          PROPERTY_DONT_LOG_CONNECTION_ISSUES = "curator-dont-log-connection-problems";
     public static final String          PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL = "curator-log-only-first-connection-issue-as-error-level";
+    public static final String          PROPERTY_RETRY_FAILED_TESTS = "curator-retry-failed-tests";
 
     private DebugUtils()
     {
diff --git a/curator-client/src/main/java/org/apache/curator/utils/EnsurePath.java b/curator-client/src/main/java/org/apache/curator/utils/EnsurePath.java
index f072775..3845a74 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/EnsurePath.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/EnsurePath.java
@@ -47,7 +47,10 @@
  *         ensurePath.ensure(zk);   // subsequent times are NOPs
  *         zk.create(nodePath, ...);
  * </pre>
+ *
+ * @deprecated Since 2.9.0 - Prefer CuratorFramework.create().creatingParentContainersIfNeeded() or CuratorFramework.checkExists().creatingParentContainersIfNeeded()
  */
+@Deprecated
 public class EnsurePath
 {
     private final String path;
@@ -64,7 +67,7 @@
         }
     };
 
-    private interface Helper
+    interface Helper
     {
         public void ensure(CuratorZookeeperClient client, String path, final boolean makeLastNode) throws Exception;
     }
@@ -110,7 +113,7 @@
         return new EnsurePath(path, helper, false, aclProvider);
     }
 
-    private EnsurePath(String path, AtomicReference<Helper> helper, boolean makeLastNode, InternalACLProvider aclProvider)
+    protected EnsurePath(String path, AtomicReference<Helper> helper, boolean makeLastNode, InternalACLProvider aclProvider)
     {
         this.path = path;
         this.makeLastNode = makeLastNode;
@@ -128,6 +131,11 @@
         return this.path;
     }
 
+    protected boolean asContainers()
+    {
+        return false;
+    }
+
     private class InitialHelper implements Helper
     {
         private boolean isSet = false;  // guarded by synchronization
@@ -145,7 +153,7 @@
                             @Override
                             public Object call() throws Exception
                             {
-                                ZKPaths.mkdirs(client.getZooKeeper(), path, makeLastNode, aclProvider);
+                                ZKPaths.mkdirs(client.getZooKeeper(), path, makeLastNode, aclProvider, asContainers());
                                 helper.set(doNothingHelper);
                                 isSet = true;
                                 return null;
diff --git a/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java b/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java
index f4623a3..75e1171 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/ZKPaths.java
@@ -26,6 +26,8 @@
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.util.Collections;
 import java.util.List;
 
@@ -35,8 +37,48 @@
      * Zookeeper's path separator character.
      */
     public static final String PATH_SEPARATOR = "/";
-    
-    
+
+    private static final CreateMode NON_CONTAINER_MODE = CreateMode.PERSISTENT;
+
+    /**
+     * @return {@link CreateMode#CONTAINER} if the ZK JAR supports it. Otherwise {@link CreateMode#PERSISTENT}
+     */
+    public static CreateMode getContainerCreateMode()
+    {
+        return CreateModeHolder.containerCreateMode;
+    }
+
+    /**
+     * Returns true if the version of ZooKeeper client in use supports containers
+     *
+     * @return true/false
+     */
+    public static boolean hasContainerSupport()
+    {
+        return getContainerCreateMode() != NON_CONTAINER_MODE;
+    }
+
+    private static class CreateModeHolder
+    {
+        private static final Logger log = LoggerFactory.getLogger(ZKPaths.class);
+        private static final CreateMode containerCreateMode;
+
+        static
+        {
+            CreateMode localCreateMode;
+            try
+            {
+                localCreateMode = CreateMode.valueOf("CONTAINER");
+            }
+            catch ( IllegalArgumentException ignore )
+            {
+                localCreateMode = NON_CONTAINER_MODE;
+                log.warn("The version of ZooKeeper being used doesn't support Container nodes. CreateMode.PERSISTENT will be used instead.");
+            }
+            containerCreateMode = localCreateMode;
+        }
+    }
+
     /**
      * Apply the namespace to the given path
      *
@@ -161,7 +203,7 @@
      */
     public static void mkdirs(ZooKeeper zookeeper, String path) throws InterruptedException, KeeperException
     {
-        mkdirs(zookeeper, path, true, null);
+        mkdirs(zookeeper, path, true, null, false);
     }
 
     /**
@@ -176,7 +218,7 @@
      */
     public static void mkdirs(ZooKeeper zookeeper, String path, boolean makeLastNode) throws InterruptedException, KeeperException
     {
-        mkdirs(zookeeper, path, makeLastNode, null);
+        mkdirs(zookeeper, path, makeLastNode, null, false);
     }
 
     /**
@@ -192,6 +234,23 @@
      */
     public static void mkdirs(ZooKeeper zookeeper, String path, boolean makeLastNode, InternalACLProvider aclProvider) throws InterruptedException, KeeperException
     {
+        mkdirs(zookeeper, path, makeLastNode, aclProvider, false);
+    }
+
+    /**
+     * Make sure all the nodes in the path are created. NOTE: Unlike File.mkdirs(), Zookeeper doesn't distinguish
+     * between directories and files. So, every node in the path is created. The data for each node is an empty blob
+     *
+     * @param zookeeper    the client
+     * @param path         path to ensure
+     * @param makeLastNode if true, all nodes are created. If false, only the parent nodes are created
+     * @param aclProvider  if not null, the ACL provider to use when creating parent nodes
+     * @param asContainers if true, nodes are created using {@link #getContainerCreateMode()} (CONTAINER if the ZK JAR supports it, otherwise PERSISTENT)
+     * @throws InterruptedException                 thread interruption
+     * @throws org.apache.zookeeper.KeeperException Zookeeper errors
+     */
+    public static void mkdirs(ZooKeeper zookeeper, String path, boolean makeLastNode, InternalACLProvider aclProvider, boolean asContainers) throws InterruptedException, KeeperException
+    {
         PathUtils.validatePath(path);
 
         int pos = 1; // skip first slash, root is guaranteed to exist
@@ -229,7 +288,7 @@
                     {
                         acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
                     }
-                    zookeeper.create(subPath, new byte[0], acl, CreateMode.PERSISTENT);
+                    zookeeper.create(subPath, new byte[0], acl, getCreateMode(asContainers));
                 }
                 catch ( KeeperException.NodeExistsException e )
                 {
@@ -397,4 +456,9 @@
     private ZKPaths()
     {
     }
+
+    private static CreateMode getCreateMode(boolean asContainers)
+    {
+        return asContainers ? getContainerCreateMode() : CreateMode.PERSISTENT;
+    }
 }
diff --git a/curator-examples/pom.xml b/curator-examples/pom.xml
index 825ca16..536f727 100644
--- a/curator-examples/pom.xml
+++ b/curator-examples/pom.xml
@@ -48,5 +48,11 @@
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-x-discovery</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/curator-examples/src/main/java/cache/PathCacheExample.java b/curator-examples/src/main/java/cache/PathCacheExample.java
index 7c25ec1..e121337 100644
--- a/curator-examples/src/main/java/cache/PathCacheExample.java
+++ b/curator-examples/src/main/java/cache/PathCacheExample.java
@@ -231,7 +231,7 @@
         }
         catch ( KeeperException.NoNodeException e )
         {
-            client.create().creatingParentsIfNeeded().forPath(path, bytes);
+            client.create().creatingParentContainersIfNeeded().forPath(path, bytes);
         }
     }
 
diff --git a/curator-framework/pom.xml b/curator-framework/pom.xml
index a45ce77..a7df78c 100644
--- a/curator-framework/pom.xml
+++ b/curator-framework/pom.xml
@@ -55,6 +55,18 @@
             <artifactId>curator-test</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index d56f2ca..9239ac4 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -62,6 +62,7 @@
      * @return true/false
      * @deprecated use {@link #getState()} instead
      */
+    @Deprecated
     public boolean isStarted();
 
     /**
@@ -168,9 +169,19 @@
      * @param backgroundContextObject optional context
      * @deprecated use {@link #sync()} instead
      */
+    @Deprecated
     public void sync(String path, Object backgroundContextObject);
 
     /**
+     * Create all nodes in the specified path as containers if they don't
+     * already exist
+     *
+     * @param path path to create
+     * @throws Exception errors
+     */
+    public void createContainers(String path) throws Exception;
+
+    /**
      * Start a sync builder. Note: sync is ALWAYS in the background even
      * if you don't use one of the background() methods
      *
@@ -204,8 +215,9 @@
      * pre-pend the namespace to all paths
      *
      * @return facade
-     * @deprecated use {@link #usingNamespace} passing <code>null</code>
+     * @deprecated Since 2.9.0 - use {@link #usingNamespace} passing <code>null</code>
      */
+    @Deprecated
     public CuratorFramework nonNamespaceView();
 
     /**
@@ -236,7 +248,10 @@
      *
      * @param path path to ensure
      * @return new EnsurePath instance
+     * @deprecated Since 2.9.0 - prefer {@link CreateBuilder#creatingParentContainersIfNeeded()}, {@link ExistsBuilder#creatingParentContainersIfNeeded()}
+     * or {@link CuratorFramework#createContainers(String)}
      */
+    @Deprecated
     public EnsurePath newNamespaceAwareEnsurePath(String path);
 
     /**
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 11cee2d..dcb2ee6 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -25,6 +25,7 @@
 import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.api.CompressionProvider;
+import org.apache.curator.framework.api.CreateBuilder;
 import org.apache.curator.framework.api.PathAndBytesable;
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.framework.imps.CuratorTempFrameworkImpl;
@@ -32,6 +33,7 @@
 import org.apache.curator.framework.imps.GzipCompressionProvider;
 import org.apache.curator.utils.DefaultZookeeperFactory;
 import org.apache.curator.utils.ZookeeperFactory;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import java.net.InetAddress;
@@ -113,6 +115,7 @@
         private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
         private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
         private boolean canBeReadOnly = false;
+        private boolean useContainerParentsIfAvailable = true;
 
         /**
          * Apply the current values and build a new CuratorFramework
@@ -328,6 +331,18 @@
             return this;
         }
 
+        /**
+         * By default, Curator uses {@link CreateBuilder#creatingParentContainersIfNeeded()}
+         * if the ZK JAR supports {@link CreateMode#CONTAINER}. Call this method to turn off this behavior.
+         *
+         * @return this
+         */
+        public Builder dontUseContainerParents()
+        {
+            this.useContainerParentsIfAvailable = false;
+            return this;
+        }
+
         public ACLProvider getAclProvider()
         {
             return aclProvider;
@@ -378,6 +393,11 @@
             return namespace;
         }
 
+        public boolean useContainerParentsIfAvailable()
+        {
+            return useContainerParentsIfAvailable;
+        }
+
         @Deprecated
         public String getAuthScheme()
         {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBackgroundModeACLable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBackgroundModeACLable.java
index d2a4e27..e821d3b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBackgroundModeACLable.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBackgroundModeACLable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.curator.framework.api;
 
+import org.apache.zookeeper.CreateMode;
+
 public interface CreateBackgroundModeACLable extends
     BackgroundPathAndBytesable<String>,
     CreateModable<ACLBackgroundPathAndBytesable<String>>,
@@ -31,6 +33,16 @@
     public ACLCreateModePathAndBytesable<String>    creatingParentsIfNeeded();
 
     /**
+     * Causes any parent nodes to get created using {@link CreateMode#CONTAINER} if they haven't already been.
+     * IMPORTANT NOTE: container creation is a new feature in recent versions of ZooKeeper.
+     * If the ZooKeeper version you're using does not support containers, the parent nodes
+     * are created as ordinary PERSISTENT nodes.
+     *
+     * @return this
+     */
+    public ACLCreateModePathAndBytesable<String> creatingParentContainersIfNeeded();
+
+    /**
      * <p>
      *     Hat-tip to https://github.com/sbridges for pointing this out
      * </p>
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java
index d29b475..0db2094 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/CreateBuilder.java
@@ -18,6 +18,8 @@
  */
 package org.apache.curator.framework.api;
 
+import org.apache.zookeeper.CreateMode;
+
 public interface CreateBuilder extends
     BackgroundPathAndBytesable<String>,
     CreateModable<ACLBackgroundPathAndBytesable<String>>,
@@ -32,12 +34,23 @@
     public ProtectACLCreateModePathAndBytesable<String> creatingParentsIfNeeded();
 
     /**
+     * Causes any parent nodes to get created using {@link CreateMode#CONTAINER} if they haven't already been.
+     * IMPORTANT NOTE: container creation is a new feature in recent versions of ZooKeeper.
+     * If the ZooKeeper version you're using does not support containers, the parent nodes
+     * are created as ordinary PERSISTENT nodes.
+     *
+     * @return this
+     */
+    public ProtectACLCreateModePathAndBytesable<String> creatingParentContainersIfNeeded();
+
+    /**
      * @deprecated this has been generalized to support all create modes. Instead, use:
      * <pre>
      *     client.create().withProtection().withMode(CreateMode.PERSISTENT_SEQUENTIAL)...
      * </pre>
      * @return this
      */
+    @Deprecated
     public ACLPathAndBytesable<String>              withProtectedEphemeralSequential();
 
     /**
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ExistsBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ExistsBuilder.java
index b39fea9..7fb00ac 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/ExistsBuilder.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ExistsBuilder.java
@@ -16,12 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.framework.api;
 
-import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.CreateMode;
 
 public interface ExistsBuilder extends
-    Watchable<BackgroundPathable<Stat>>,
-    BackgroundPathable<Stat>
+    ExistsBuilderMain
 {
+    /**
+     * Causes any parent nodes to get created using {@link CreateMode#CONTAINER} if they haven't already been.
+     * IMPORTANT NOTE: container creation is a new feature in recent versions of ZooKeeper.
+     * If the ZooKeeper version you're using does not support containers, the parent nodes
+     * are created as ordinary PERSISTENT nodes.
+     *
+     * @return this
+     */
+    ExistsBuilderMain creatingParentContainersIfNeeded();
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ExistsBuilderMain.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ExistsBuilderMain.java
new file mode 100644
index 0000000..2519616
--- /dev/null
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ExistsBuilderMain.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.api;
+
+import org.apache.zookeeper.data.Stat;
+
+public interface ExistsBuilderMain extends
+    Watchable<BackgroundPathable<Stat>>,
+    BackgroundPathable<Stat>
+{
+}
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index 5432190..7184c39 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -46,6 +46,7 @@
     private CreateMode createMode;
     private Backgrounding backgrounding;
     private boolean createParentsIfNeeded;
+    private boolean createParentsAsContainers;
     private boolean doProtected;
     private boolean compress;
     private String protectedId;
@@ -64,6 +65,7 @@
         backgrounding = new Backgrounding();
         acling = new ACLing(client.getAclProvider());
         createParentsIfNeeded = false;
+        createParentsAsContainers = false;
         compress = false;
         doProtected = false;
         protectedId = null;
@@ -129,6 +131,13 @@
             }
 
             @Override
+            public ACLCreateModePathAndBytesable<String> creatingParentContainersIfNeeded()
+            {
+                setCreateParentsAsContainers();
+                return creatingParentsIfNeeded();
+            }
+
+            @Override
             public ACLPathAndBytesable<String> withProtectedEphemeralSequential()
             {
                 return CreateBuilderImpl.this.withProtectedEphemeralSequential();
@@ -259,6 +268,21 @@
     }
 
     @Override
+    public ProtectACLCreateModePathAndBytesable<String> creatingParentContainersIfNeeded()
+    {
+        setCreateParentsAsContainers();
+        return creatingParentsIfNeeded();
+    }
+
+    private void setCreateParentsAsContainers()
+    {
+        if ( client.useContainerParentsIfAvailable() )
+        {
+            createParentsAsContainers = true;
+        }
+    }
+
+    @Override
     public ProtectACLCreateModePathAndBytesable<String> creatingParentsIfNeeded()
     {
         createParentsIfNeeded = true;
@@ -492,7 +516,7 @@
 
                         if ( (rc == KeeperException.Code.NONODE.intValue()) && createParentsIfNeeded )
                         {
-                            backgroundCreateParentsThenNode(operationAndData);
+                            backgroundCreateParentsThenNode(client, operationAndData, operationAndData.getData().getPath(), backgrounding, createParentsAsContainers);
                         }
                         else
                         {
@@ -509,16 +533,16 @@
         return PROTECTED_PREFIX + protectedId + "-";
     }
 
-    private void backgroundCreateParentsThenNode(final OperationAndData<PathAndBytes> mainOperationAndData)
+    static <T> void backgroundCreateParentsThenNode(final CuratorFrameworkImpl client, final OperationAndData<T> mainOperationAndData, final String path, Backgrounding backgrounding, final boolean createParentsAsContainers)
     {
-        BackgroundOperation<PathAndBytes> operation = new BackgroundOperation<PathAndBytes>()
+        BackgroundOperation<T> operation = new BackgroundOperation<T>()
         {
             @Override
-            public void performBackgroundOperation(OperationAndData<PathAndBytes> dummy) throws Exception
+            public void performBackgroundOperation(OperationAndData<T> dummy) throws Exception
             {
                 try
                 {
-                    ZKPaths.mkdirs(client.getZooKeeper(), mainOperationAndData.getData().getPath(), false, client.getAclProvider());
+                    ZKPaths.mkdirs(client.getZooKeeper(), path, false, client.getAclProvider(), createParentsAsContainers);
                 }
                 catch ( KeeperException e )
                 {
@@ -527,7 +551,7 @@
                 client.queueOperation(mainOperationAndData);
             }
         };
-        OperationAndData<PathAndBytes> parentOperation = new OperationAndData<PathAndBytes>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
+        OperationAndData<T> parentOperation = new OperationAndData<T>(operation, mainOperationAndData.getData(), null, null, backgrounding.getContext());
         client.queueOperation(parentOperation);
     }
 
@@ -698,7 +722,7 @@
                             {
                                 if ( createParentsIfNeeded )
                                 {
-                                    ZKPaths.mkdirs(client.getZooKeeper(), path, false, client.getAclProvider());
+                                    ZKPaths.mkdirs(client.getZooKeeper(), path, false, client.getAclProvider(), createParentsAsContainers);
                                     createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                                 }
                                 else
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index b41d995..900374b 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -42,6 +42,7 @@
 import org.apache.curator.utils.DebugUtils;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.utils.ZookeeperFactory;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -79,6 +80,7 @@
     private final ACLProvider aclProvider;
     private final NamespaceFacadeCache namespaceFacadeCache;
     private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
+    private final boolean useContainerParentsIfAvailable;
 
     private volatile ExecutorService executorService;
     private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -119,6 +121,7 @@
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
+        useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
 
         byte[] builderDefaultData = builder.getDefaultData();
         defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
@@ -183,6 +186,13 @@
         namespace = new NamespaceImpl(this, null);
         state = parent.state;
         authInfos = parent.authInfos;
+        useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
+    }
+
+    @Override
+    public void createContainers(String path) throws Exception
+    {
+        checkExists().creatingParentContainersIfNeeded().forPath(ZKPaths.makePath(path, "foo")); // "foo" is a throwaway child name: only its parents (i.e. every node of path) get created, the child itself is merely checked
     }
 
     @Override
@@ -512,6 +522,11 @@
         return compressionProvider;
     }
 
+    boolean useContainerParentsIfAvailable()
+    {
+        return useContainerParentsIfAvailable;
+    }
+
     <DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
     {
         boolean isInitialExecution = (event == null);
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
index 4681245..345bcf5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExistsBuilderImpl.java
@@ -26,8 +26,11 @@
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.framework.api.ExistsBuilder;
+import org.apache.curator.framework.api.ExistsBuilderMain;
 import org.apache.curator.framework.api.Pathable;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import java.util.concurrent.Callable;
@@ -38,12 +41,21 @@
     private final CuratorFrameworkImpl client;
     private Backgrounding backgrounding;
     private Watching watching;
+    private boolean createParentContainersIfNeeded;
 
     ExistsBuilderImpl(CuratorFrameworkImpl client)
     {
         this.client = client;
         backgrounding = new Backgrounding();
         watching = new Watching();
+        createParentContainersIfNeeded = false;
+    }
+
+    @Override
+    public ExistsBuilderMain creatingParentContainersIfNeeded()
+    {
+        createParentContainersIfNeeded = true;
+        return this;
     }
 
     @Override
@@ -141,7 +153,15 @@
         Stat        returnStat = null;
         if ( backgrounding.inBackground() )
         {
-            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
+            OperationAndData<String> operationAndData = new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext());
+            if ( createParentContainersIfNeeded )
+            {
+                CreateBuilderImpl.backgroundCreateParentsThenNode(client, operationAndData, operationAndData.getData(), backgrounding, client.useContainerParentsIfAvailable()); // honour Builder.dontUseContainerParents(), same as CreateBuilderImpl
+            }
+            else
+            {
+                client.processBackgroundOperation(operationAndData, null);
+            }
         }
         else
         {
@@ -153,6 +173,40 @@
 
     private Stat pathInForeground(final String path) throws Exception
     {
+        if ( createParentContainersIfNeeded )
+        {
+            final String parent = ZKPaths.getPathAndNode(path).getPath();
+            if ( !parent.equals(ZKPaths.PATH_SEPARATOR) )   // parent is the root - nothing to create
+            {
+                TimeTrace   trace = client.getZookeeperClient().startTracer("ExistsBuilderImpl-Foreground-CreateParents");
+                RetryLoop.callWithRetry
+                (
+                    client.getZookeeperClient(),
+                    new Callable<Void>()
+                    {
+                        @Override
+                        public Void call() throws Exception
+                        {
+                            try
+                            {
+                                ZKPaths.mkdirs(client.getZooKeeper(), parent, true, client.getAclProvider(), client.useContainerParentsIfAvailable());
+                            }
+                            catch ( KeeperException e )
+                            {
+                                // best effort - the exists check below runs whether or not the parents could be created
+                            }
+                            return null;
+                        }
+                    }
+                );
+                trace.commit();
+            }
+        }
+        return pathInForegroundStandard(path);
+    }
+
+    private Stat pathInForegroundStandard(final String path) throws Exception
+    {
         TimeTrace   trace = client.getZookeeperClient().startTracer("ExistsBuilderImpl-Foreground");
         Stat        returnStat = RetryLoop.callWithRetry
         (
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
index 3f24c79..60ef647 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceFacade.java
@@ -21,7 +21,9 @@
 import org.apache.curator.CuratorZookeeperClient;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.*;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.EnsurePath;
@@ -41,6 +43,12 @@
     }
 
     @Override
+    public void createContainers(String path) throws Exception
+    {
+        client.createContainers(path); // straight delegation to the client field, path passed through unchanged
+    }
+
+    @Override
     public CuratorFramework nonNamespaceView()
     {
         return usingNamespace(null);
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceImpl.java
index 717b2f4..74f6320 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceImpl.java
@@ -18,15 +18,21 @@
  */
 package org.apache.curator.framework.imps;
 
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryLoop;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.EnsurePath;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.utils.PathUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.ZooDefs;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 class NamespaceImpl
 {
     private final CuratorFrameworkImpl client;
     private final String namespace;
-    private final EnsurePath ensurePath;
+    private final AtomicBoolean ensurePathNeeded;
 
     NamespaceImpl(CuratorFrameworkImpl client, String namespace)
     {
@@ -44,7 +50,7 @@
 
         this.client = client;
         this.namespace = namespace;
-        ensurePath = (namespace != null) ? new EnsurePath(ZKPaths.makePath("/", namespace)) : null;
+        ensurePathNeeded = new AtomicBoolean(namespace != null);
     }
 
     String getNamespace()
@@ -67,11 +73,25 @@
 
     String    fixForNamespace(String path, boolean isSequential)
     {
-        if ( ensurePath != null )
+        if ( ensurePathNeeded.get() )
         {
             try
             {
-                ensurePath.ensure(client.getZookeeperClient());
+                final CuratorZookeeperClient zookeeperClient = client.getZookeeperClient();
+                RetryLoop.callWithRetry
+                (
+                    zookeeperClient,
+                    new Callable<Object>()
+                    {
+                        @Override
+                        public Object call() throws Exception
+                        {
+                            ZKPaths.mkdirs(zookeeperClient.getZooKeeper(), ZKPaths.makePath("/", namespace), true, client.getAclProvider(), true);
+                            return null;
+                        }
+                    }
+                );
+                ensurePathNeeded.set(false);
             }
             catch ( Exception e )
             {
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
index a1d9a8f..811631c 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java
@@ -33,6 +33,7 @@
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -41,6 +42,8 @@
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.util.ArrayList;
 import java.util.List;
@@ -52,6 +55,22 @@
 @SuppressWarnings("deprecation")
 public class TestFramework extends BaseClassForTests
 {
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        System.setProperty("container.checkIntervalMs", "1000"); // set before super.setup(); presumably read by the test server - TODO confirm
+        super.setup();
+    }
+
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        System.clearProperty("container.checkIntervalMs"); // undo the property set in setup()
+        super.teardown();
+    }
+
     @Test
     public void testConnectionState() throws Exception
     {
@@ -115,7 +134,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -163,7 +182,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -220,7 +239,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -313,7 +332,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -396,7 +415,157 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testOverrideCreateParentContainers() throws Exception
+    {
+        if ( !checkForContainers() ) // nothing to verify when checkForContainers() reports no container support
+        {
+            return;
+        }
+
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+            .connectString(server.getConnectString())
+            .retryPolicy(new RetryOneTime(1))
+            .dontUseContainerParents() // the builder override this test exercises
+            .build();
+        try
+        {
+            client.start();
+            client.create().creatingParentContainersIfNeeded().forPath("/one/two/three", "foo".getBytes());
+            byte[] data = client.getData().forPath("/one/two/three");
+            Assert.assertEquals(data, "foo".getBytes());
+
+            client.delete().forPath("/one/two/three");
+            new Timing().sleepABit();
+
+            Assert.assertNotNull(client.checkExists().forPath("/one/two")); // parent is expected to still exist after the child delete
+            new Timing().sleepABit();
+            Assert.assertNotNull(client.checkExists().forPath("/one")); // and so is the grandparent
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testCreateParentContainers() throws Exception
+    {
+        if ( !checkForContainers() ) // nothing to verify when checkForContainers() reports no container support
+        {
+            return;
+        }
+
+        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+        CuratorFramework client = builder.connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1)).build();
+        try
+        {
+            client.start();
+            client.create().creatingParentContainersIfNeeded().forPath("/one/two/three", "foo".getBytes());
+            byte[] data = client.getData().forPath("/one/two/three");
+            Assert.assertEquals(data, "foo".getBytes());
+
+            client.delete().forPath("/one/two/three");
+            new Timing().sleepABit();
+
+            Assert.assertNull(client.checkExists().forPath("/one/two")); // parent is expected to be gone by now (presumably reaped as a container - confirm)
+            new Timing().sleepABit();
+            Assert.assertNull(client.checkExists().forPath("/one")); // and the grandparent after one more wait
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    private boolean checkForContainers()
+    {
+        if ( ZKPaths.getContainerCreateMode() == CreateMode.PERSISTENT ) // PERSISTENT is treated here as "no container support"
+        {
+            System.out.println("Not using CreateMode.CONTAINER enabled version of ZooKeeper");
+            return false; // callers use this to skip their container assertions
+        }
+        return true;
+    }
+
+    @Test
+    public void testCreatingParentsTheSame() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            Assert.assertNull(client.checkExists().forPath("/one/two"));
+            client.create().creatingParentContainersIfNeeded().forPath("/one/two/three"); // first pass: parents come from create()
+            Assert.assertNotNull(client.checkExists().forPath("/one/two"));
+
+            client.delete().deletingChildrenIfNeeded().forPath("/one"); // wipe the whole tree before the second pass
+            Assert.assertNull(client.checkExists().forPath("/one"));
+
+            Assert.assertNull(client.checkExists().forPath("/one/two"));
+            client.checkExists().creatingParentContainersIfNeeded().forPath("/one/two/three"); // second pass: parents come from checkExists()
+            Assert.assertNotNull(client.checkExists().forPath("/one/two"));
+            Assert.assertNull(client.checkExists().forPath("/one/two/three")); // the leaf itself must not have been created
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testExistsCreatingParents() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            Assert.assertNull(client.checkExists().forPath("/one/two"));
+            client.checkExists().creatingParentContainersIfNeeded().forPath("/one/two/three"); // foreground checkExists that should create the parents
+            Assert.assertNotNull(client.checkExists().forPath("/one/two"));
+            Assert.assertNull(client.checkExists().forPath("/one/two/three")); // the leaf itself must not have been created
+            Assert.assertNull(client.checkExists().creatingParentContainersIfNeeded().forPath("/one/two/three")); // repeat call with parents already present
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
+    public void testExistsCreatingParentsInBackground() throws Exception
+    {
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        try
+        {
+            client.start();
+
+            Assert.assertNull(client.checkExists().forPath("/one/two"));
+
+            final CountDownLatch latch = new CountDownLatch(1); // released by the background callback below
+            BackgroundCallback callback = new BackgroundCallback()
+            {
+                @Override
+                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+                {
+                    latch.countDown(); // only signals completion; the event contents are not inspected
+                }
+            };
+            client.checkExists().creatingParentContainersIfNeeded().inBackground(callback).forPath("/one/two/three");
+            Assert.assertTrue(new Timing().awaitLatch(latch)); // wait for the callback before checking the tree
+            Assert.assertNotNull(client.checkExists().forPath("/one/two"));
+            Assert.assertNull(client.checkExists().forPath("/one/two/three")); // the leaf itself must not have been created
+            Assert.assertNull(client.checkExists().creatingParentContainersIfNeeded().forPath("/one/two/three"));
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -420,7 +589,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -452,7 +621,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -483,7 +652,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -519,7 +688,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -550,7 +719,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -587,7 +756,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -624,7 +793,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -642,7 +811,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -662,7 +831,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -682,7 +851,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -705,7 +874,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -758,7 +927,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -791,7 +960,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -845,7 +1014,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -861,7 +1030,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 
@@ -879,7 +1048,7 @@
         }
         finally
         {
-            client.close();
+            CloseableUtils.closeQuietly(client);
         }
     }
 }
diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml
index 9d1d081..3394361 100644
--- a/curator-recipes/pom.xml
+++ b/curator-recipes/pom.xml
@@ -61,5 +61,17 @@
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/atomic/DistributedAtomicValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/atomic/DistributedAtomicValue.java
index c90fb2b..bbd9203 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/atomic/DistributedAtomicValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/atomic/DistributedAtomicValue.java
@@ -22,12 +22,10 @@
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.PathUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import java.util.Arrays;
-import org.apache.curator.utils.PathUtils;
-import org.apache.zookeeper.ZKUtil;
 
 /**
  * <p>A distributed value that attempts atomic sets. It first tries uses optimistic locking. If that fails,
@@ -44,7 +42,6 @@
     private final RetryPolicy       retryPolicy;
     private final PromotedToLock    promotedToLock;
     private final InterProcessMutex mutex;
-    private final EnsurePath        ensurePath;
 
     /**
      * Creates in optimistic mode only - i.e. the promotion to a mutex is not done
@@ -75,7 +72,6 @@
         this.retryPolicy = retryPolicy;
         this.promotedToLock = promotedToLock;
         mutex = (promotedToLock != null) ? new InterProcessMutex(client, promotedToLock.getPath()) : null;
-        ensurePath = client.newNamespaceAwareEnsurePath(path).excludingLast();
     }
 
     /**
@@ -104,14 +100,13 @@
     {
         try
         {
-            ensurePath.ensure(client.getZookeeperClient());
             client.setData().forPath(path, newValue);
         }
         catch ( KeeperException.NoNodeException dummy )
         {
             try
             {
-                client.create().forPath(path, newValue);
+                client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
             }
             catch ( KeeperException.NodeExistsException dummy2 )
             {
@@ -199,10 +194,9 @@
      */
     public boolean initialize(byte[] value) throws Exception
     {
-        ensurePath.ensure(client.getZookeeperClient());
         try
         {
-            client.create().forPath(path, value);
+            client.create().creatingParentContainersIfNeeded().forPath(path, value);
         }
         catch ( KeeperException.NodeExistsException ignore )
         {
@@ -251,7 +245,6 @@
         boolean             createIt = false;
         try
         {
-            ensurePath.ensure(client.getZookeeperClient());
             result.preValue = client.getData().storingStatIn(stat).forPath(path);
         }
         catch ( KeeperException.NoNodeException e )
@@ -335,7 +328,7 @@
             byte[]  newValue = makeValue.makeFrom(result.preValue);
             if ( createIt )
             {
-                client.create().forPath(path, newValue);
+                client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
             }
             else
             {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
index b891b2b..8a376f1 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
@@ -67,7 +67,7 @@
     {
         try
         {
-            client.create().creatingParentsIfNeeded().forPath(barrierPath);
+            client.create().creatingParentContainersIfNeeded().forPath(barrierPath);
         }
         catch ( KeeperException.NodeExistsException ignore )
         {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
index 5034b0a..b3bdf2c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
@@ -118,7 +118,7 @@
         long            maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;
 
         boolean         readyPathExists = (client.checkExists().usingWatcher(watcher).forPath(readyPath) != null);
-        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(ourPath);
+        client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(ourPath);
 
         boolean         result = (readyPathExists || internalEnter(startMs, hasMaxWait, maxWaitMs));
         if ( connectionLost.get() )
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
index fa0df51..72ee5ff 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/NodeCache.java
@@ -29,7 +29,7 @@
 import org.apache.curator.framework.listen.ListenerContainer;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.PathUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.Stat;
@@ -40,7 +40,6 @@
 import java.util.concurrent.Exchanger;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
 
 /**
  * <p>A utility that attempts to keep the data from a node locally cached. This class
@@ -57,7 +56,6 @@
     private final CuratorFramework client;
     private final String path;
     private final boolean dataIsCompressed;
-    private final EnsurePath ensurePath;
     private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
@@ -132,7 +130,6 @@
         this.client = client;
         this.path = PathUtils.validatePath(path);
         this.dataIsCompressed = dataIsCompressed;
-        ensurePath = client.newNamespaceAwareEnsurePath(path).excludingLast();
     }
 
     /**
@@ -156,10 +153,10 @@
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        ensurePath.ensure(client.getZookeeperClient());
-
         client.getConnectionStateListenable().addListener(connectionStateListener);
 
+        client.checkExists().creatingParentContainersIfNeeded().forPath(path);
+
         if ( buildInitial )
         {
             internalRebuild();
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
index 71d83fe..b5d912c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
@@ -24,7 +24,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
@@ -34,7 +33,7 @@
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableExecutorService;
-import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException;
@@ -54,7 +53,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.curator.utils.PathUtils;
 
 /**
  * <p>A utility that attempts to keep all data from all children of a ZK path locally cached. This class
@@ -74,7 +72,6 @@
     private final CloseableExecutorService executorService;
     private final boolean cacheData;
     private final boolean dataIsCompressed;
-    private final EnsurePath ensurePath;
     private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
     private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
     private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
@@ -143,6 +140,7 @@
      * @param mode   caching mode
      * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean)} instead
      */
+    @Deprecated
     @SuppressWarnings("deprecation")
     public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode)
     {
@@ -156,6 +154,7 @@
      * @param threadFactory factory to use when creating internal threads
      * @deprecated use {@link #PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)} instead
      */
+    @Deprecated
     @SuppressWarnings("deprecation")
     public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory)
     {
@@ -200,7 +199,7 @@
      * @param path             path to watch
      * @param cacheData        if true, node contents are cached in addition to the stat
      * @param dataIsCompressed if true, data in the path is compressed
-     * @param executorService  ExecutorService to use for the PathChildrenCache's background thread
+     * @param executorService  ExecutorService to use for the PathChildrenCache's background thread. This service should be single threaded, otherwise the cache may see inconsistent results.
      */
     public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
     {
@@ -212,7 +211,7 @@
      * @param path             path to watch
      * @param cacheData        if true, node contents are cached in addition to the stat
      * @param dataIsCompressed if true, data in the path is compressed
-     * @param executorService  Closeable ExecutorService to use for the PathChildrenCache's background thread
+     * @param executorService  Closeable ExecutorService to use for the PathChildrenCache's background thread. This service should be single threaded, otherwise the cache may see inconsistent results.
      */
     public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
     {
@@ -221,7 +220,6 @@
         this.cacheData = cacheData;
         this.dataIsCompressed = dataIsCompressed;
         this.executorService = executorService;
-        ensurePath = client.newNamespaceAwareEnsurePath(path);
     }
 
     /**
@@ -243,6 +241,7 @@
      * @throws Exception errors
      * @deprecated use {@link #start(StartMode)}
      */
+    @Deprecated
     public void start(boolean buildInitial) throws Exception
     {
         start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL);
@@ -320,7 +319,7 @@
     {
         Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
 
-        ensurePath.ensure(client.getZookeeperClient());
+        ensurePath();
 
         clear();
 
@@ -352,7 +351,7 @@
         Preconditions.checkArgument(ZKPaths.getPathAndNode(fullPath).getPath().equals(path), "Node is not part of this cache: " + fullPath);
         Preconditions.checkState(!executorService.isShutdown(), "cache has been closed");
 
-        ensurePath.ensure(client.getZookeeperClient());
+        ensurePath();
         internalRebuildNode(fullPath);
 
         // this is necessary so that any updates that occurred while rebuilding are taken
@@ -481,7 +480,7 @@
 
     void refresh(final RefreshMode mode) throws Exception
     {
-        ensurePath.ensure(client.getZookeeperClient());
+        ensurePath();
 
         final BackgroundCallback callback = new BackgroundCallback()
         {
@@ -612,6 +611,11 @@
         }
     }
 
+    private void ensurePath() throws Exception
+    {
+        client.createContainers(path); // single call that replaces the removed EnsurePath field for this cache's path
+    }
+
     private void handleStateChange(ConnectionState newState)
     {
         switch ( newState )
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheMode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheMode.java
index dcd9be3..5c15fda 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheMode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCacheMode.java
@@ -27,6 +27,7 @@
  * @deprecated no longer used. Instead use either {@link PathChildrenCache#PathChildrenCache(CuratorFramework, String, boolean)}
  * or {@link PathChildrenCache#PathChildrenCache(CuratorFramework, String, boolean, ThreadFactory)}
  */
+@Deprecated
 public enum PathChildrenCacheMode
 {
     /**
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
index 41fc574..4f3ffb6 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
@@ -33,6 +33,7 @@
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.CloseableExecutorService;
+import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.KeeperException;
@@ -44,6 +45,7 @@
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -69,6 +71,7 @@
 public class TreeCache implements Closeable
 {
     private static final Logger LOG = LoggerFactory.getLogger(TreeCache.class);
+    private final boolean createParentNodes;
 
     public static final class Builder
     {
@@ -78,6 +81,7 @@
         private boolean dataIsCompressed = false;
         private CloseableExecutorService executorService = null;
         private int maxDepth = Integer.MAX_VALUE;
+        private boolean createParentNodes = false;
 
         private Builder(CuratorFramework client, String path)
         {
@@ -95,7 +99,7 @@
             {
                 executor = new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory));
             }
-            return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor);
+            return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes);
         }
 
         /**
@@ -159,6 +163,19 @@
             this.maxDepth = maxDepth;
             return this;
         }
+
+        /**
+         * Sets whether the TreeCache auto-creates parent nodes for the cached path; the builder
+         * default is {@code false}. NOTE: parent nodes are created as containers.
+         *
+         * @param createParentNodes true to create parent nodes, false to leave them alone
+         * @return this for chaining
+         */
+        public Builder setCreateParentNodes(boolean createParentNodes)
+        {
+            this.createParentNodes = createParentNodes;
+            return this;
+        }
     }
 
     /**
@@ -284,7 +301,8 @@
                 return;
             }
 
-            if ( nodeState.compareAndSet(NodeState.LIVE, NodeState.DEAD) )
+            NodeState oldState = nodeState.getAndSet(NodeState.DEAD);
+            if ( oldState == NodeState.LIVE )
             {
                 publishEvent(TreeCacheEvent.Type.NODE_REMOVED, path);
             }
@@ -346,10 +364,6 @@
                     nodeState.compareAndSet(NodeState.DEAD, NodeState.PENDING);
                     wasCreated();
                 }
-                else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
-                {
-                    wasDeleted();
-                }
                 break;
             case CHILDREN:
                 if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
@@ -412,7 +426,8 @@
                     }
 
                     Stat oldStat = stat.getAndSet(newStat);
-                    if ( nodeState.compareAndSet(NodeState.PENDING, NodeState.LIVE) )
+                    NodeState oldState = nodeState.getAndSet(NodeState.LIVE);
+                    if ( oldState != NodeState.LIVE )
                     {
                         publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), newStat, event.getData()));
                     }
@@ -500,7 +515,7 @@
      */
     public TreeCache(CuratorFramework client, String path)
     {
-        this(client, path, true, false, Integer.MAX_VALUE, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+        this(client, path, true, false, Integer.MAX_VALUE, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true), false);
     }
 
     /**
@@ -509,9 +524,11 @@
      * @param cacheData        if true, node contents are cached in addition to the stat
      * @param dataIsCompressed if true, data in the path is compressed
      * @param executorService  Closeable ExecutorService to use for the TreeCache's background thread
+     * @param createParentNodes true to create parent nodes as containers
      */
-    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final CloseableExecutorService executorService)
+    TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final CloseableExecutorService executorService, boolean createParentNodes)
     {
+        this.createParentNodes = createParentNodes;
         this.root = new TreeNode(validatePath(path), null);
         this.client = client;
         this.cacheData = cacheData;
@@ -529,6 +546,10 @@
     public TreeCache start() throws Exception
     {
         Preconditions.checkState(treeState.compareAndSet(TreeState.LATENT, TreeState.STARTED), "already started");
+        if ( createParentNodes )
+        {
+            client.createContainers(root.path);
+        }
         client.getConnectionStateListenable().addListener(connectionStateListener);
         if ( client.getZookeeperClient().isConnected() )
         {
@@ -580,33 +601,36 @@
         return errorListeners;
     }
 
-    private TreeNode find(String fullPath)
+    private TreeNode find(String findPath)
     {
-        if ( !fullPath.startsWith(root.path) )
-        {
-            return null;
+        PathUtils.validatePath(findPath);
+        LinkedList<String> rootElements = new LinkedList<String>(ZKPaths.split(root.path));
+        LinkedList<String> findElements = new LinkedList<String>(ZKPaths.split(findPath));
+        while (!rootElements.isEmpty()) {
+            if (findElements.isEmpty()) {
+                // Target path shorter than root path
+                return null;
+            }
+            String nextRoot = rootElements.removeFirst();
+            String nextFind = findElements.removeFirst();
+            if (!nextFind.equals(nextRoot)) {
+                // Initial root path does not match
+                return null;
+            }
         }
 
         TreeNode current = root;
-        if ( fullPath.length() > root.path.length() )
-        {
-            if ( root.path.length() > 1 )
+        while (!findElements.isEmpty()) {
+            String nextFind = findElements.removeFirst();
+            ConcurrentMap<String, TreeNode> map = current.children.get();
+            if ( map == null )
             {
-                fullPath = fullPath.substring(root.path.length());
+                return null;
             }
-            List<String> split = ZKPaths.split(fullPath);
-            for ( String part : split )
+            current = map.get(nextFind);
+            if ( current == null )
             {
-                ConcurrentMap<String, TreeNode> map = current.children.get();
-                if ( map == null )
-                {
-                    return null;
-                }
-                current = map.get(part);
-                if ( current == null )
-                {
-                    return null;
-                }
+                return null;
             }
         }
         return current;
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 40d92e4..a6d8145 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -500,7 +500,7 @@
                 }
             }
         };
-        client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
+        client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
     }
 
     private synchronized void internalStart()
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
index 9c09b4c..716ca96 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderSelector.java
@@ -230,7 +230,7 @@
         return internalRequeue();
     }
 
-    public synchronized boolean internalRequeue()
+    private synchronized boolean internalRequeue()
     {
         if ( !isQueued && (state.get() == State.STARTED) )
         {
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index 2a1d73e..9d196e8 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -29,6 +29,7 @@
 import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +49,11 @@
 /**
  * Utility to reap empty child nodes of a parent node. Periodically calls getChildren on
  * the node and adds empty nodes to an internally managed {@link Reaper}
+ *
+ * @deprecated Since 2.9.0 - Reaper/ChildReaper are no longer needed. Use {@link CreateMode#CONTAINER}.
+ * Also, all Curator recipes create container parents.
  */
+@Deprecated
 public class ChildReaper implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphore.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphore.java
index 2f4b1ed..3d29aa8 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphore.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphore.java
@@ -63,6 +63,7 @@
  *
  * @deprecated Use {@link InterProcessSemaphoreV2} instead of this class. It uses a better algorithm.
  */
+@Deprecated
 public class InterProcessSemaphore
 {
     private final Logger        log = LoggerFactory.getLogger(getClass());
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index b6d5ca2..f4af39b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -347,7 +347,7 @@
         }
         try
         {
-            PathAndBytesable<String> createBuilder = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
+            PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
             String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
             String nodeName = ZKPaths.getNodeFromPath(path);
             builder.add(makeLease(path));
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index efd363f..a7a575f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -26,6 +26,7 @@
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import org.apache.curator.utils.CloseableScheduledExecutorService;
 import org.apache.curator.utils.ThreadUtils;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -41,7 +42,11 @@
 
 /**
  * Utility to clean up parent lock nodes so that they don't stay around as garbage
+ *
+ * @deprecated Since 2.9.0 - Reaper/ChildReaper are no longer needed. Use {@link CreateMode#CONTAINER}.
+ * Also, all Curator recipes create container parents.
  */
+@Deprecated
 public class Reaper implements Closeable
 {
     private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java
index 0c9b6de..43184f5 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/StandardLockInternalsDriver.java
@@ -47,11 +47,11 @@
         String ourPath;
         if ( lockNodeBytes != null )
         {
-            ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
+            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
         }
         else
         {
-            ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
+            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
         }
         return ourPath;
     }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
index ddf91ba..0d963e0 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentEphemeralNode.java
@@ -240,7 +240,7 @@
 
                     if(nodeExists)
                     {
-                    	client.setData().inBackground(setDataCallback).forPath(getActualPath(), data);
+                       client.setData().inBackground(setDataCallback).forPath(getActualPath(), getData());
                     }
                     else
                     {
@@ -254,7 +254,7 @@
             }
         };
 
-        createMethod = mode.isProtected() ? client.create().creatingParentsIfNeeded().withProtection() : client.create().creatingParentsIfNeeded();
+        createMethod = mode.isProtected() ? client.create().creatingParentContainersIfNeeded().withProtection() : client.create().creatingParentContainersIfNeeded();
         this.data.set(Arrays.copyOf(data, data.length));
     }
     
@@ -338,10 +338,14 @@
         this.data.set(Arrays.copyOf(data, data.length));
         if ( isActive() )
         {
-            client.setData().inBackground().forPath(getActualPath(), this.data.get());
+            client.setData().inBackground().forPath(getActualPath(), getData());
         }
     }
 
+    private byte[] getData() {
+        return this.data.get();
+    }
+
     private void deleteNode() throws Exception
     {
         String localNodePath = nodePath.getAndSet(null);
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
index 9dd2217..3ed3218 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java
@@ -163,7 +163,7 @@
 
         try
         {
-            client.create().creatingParentsIfNeeded().forPath(queuePath);
+            client.create().creatingParentContainersIfNeeded().forPath(queuePath);
         }
         catch ( KeeperException.NodeExistsException ignore )
         {
@@ -173,7 +173,7 @@
         {
             try
             {
-                client.create().creatingParentsIfNeeded().forPath(lockPath);
+                client.create().creatingParentContainersIfNeeded().forPath(lockPath);
             }
             catch ( KeeperException.NodeExistsException ignore )
             {
@@ -756,7 +756,7 @@
                 client.inTransaction()
                     .delete().forPath(itemPath)
                     .and()
-                    .create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(makeItemPath(), bytes)
+                    .create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(itemPath, bytes)
                     .and()
                     .commit();
             }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueSharder.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueSharder.java
index 2dbd484..455794c 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueSharder.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueSharder.java
@@ -111,7 +111,7 @@
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        client.newNamespaceAwareEnsurePath(queuePath).ensure(client.getZookeeperClient());
+        client.createContainers(queuePath);
 
         getInitialQueues();
         leaderLatch.start();
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java
index 0c386cd..9650ffb 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.java
@@ -19,7 +19,7 @@
 package org.apache.curator.framework.recipes.queue;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.EnsurePath;
+import org.apache.curator.utils.PathUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -32,7 +32,6 @@
 import java.util.NoSuchElementException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import org.apache.curator.utils.PathUtils;
 
 /**
  * <p>
@@ -50,7 +49,6 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final CuratorFramework client;
     private final String path;
-    private final EnsurePath ensurePath;
 
     private final String PREFIX = "qn-";
 
@@ -62,7 +60,6 @@
     {
         this.client = client;
         this.path = PathUtils.validatePath(path);
-        ensurePath = client.newNamespaceAwareEnsurePath(path);
     }
 
     /**
@@ -119,10 +116,8 @@
      */
     public boolean offer(byte[] data) throws Exception
     {
-        ensurePath.ensure(client.getZookeeperClient());
-
         String thisPath = ZKPaths.makePath(path, PREFIX);
-        client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(thisPath, data);
+        client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(thisPath, data);
         return true;
     }
 
@@ -181,7 +176,7 @@
 
     private byte[] internalPoll(long timeout, TimeUnit unit) throws Exception
     {
-        ensurePath.ensure(client.getZookeeperClient());
+        ensurePath();
 
         long            startMs = System.currentTimeMillis();
         boolean         hasTimeout = (unit != null);
@@ -220,9 +215,14 @@
         }
     }
 
+    private void ensurePath() throws Exception
+    {
+        client.createContainers(path);
+    }
+
     private byte[] internalElement(boolean removeIt, Watcher watcher) throws Exception
     {
-        ensurePath.ensure(client.getZookeeperClient());
+        ensurePath();
 
         List<String> nodes;
         try
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 6ca53ec..6ce6bf4 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -220,7 +220,7 @@
         client.getConnectionStateListenable().addListener(connectionStateListener);
         try
         {
-            client.create().creatingParentsIfNeeded().forPath(path, seedValue);
+            client.create().creatingParentContainersIfNeeded().forPath(path, seedValue);
         }
         catch ( KeeperException.NodeExistsException ignore )
         {
diff --git a/curator-recipes/src/site/confluence/index.confluence b/curator-recipes/src/site/confluence/index.confluence
index a43963c..4f3a032 100644
--- a/curator-recipes/src/site/confluence/index.confluence
+++ b/curator-recipes/src/site/confluence/index.confluence
@@ -1,6 +1,8 @@
 h1. Recipes
 
 Curator implements all of the recipes listed on the ZooKeeper recipes doc (except two phase commit). Click on the recipe name below for detailed documentation.
+NOTE: Most Curator recipes will auto-create the parent nodes of the paths given to the recipe, using CreateMode.CONTAINER. Also, see [[Tech Note 7|https://cwiki.apache.org/confluence/display/CURATOR/TN7]]
+regarding "Curator Recipes Own Their ZNode/Paths".
 
 ||Elections||
 |[[Leader Latch|leader-latch.html]] \- In distributed computing, leader election is the process of designating a single process as the organizer of some task distributed among several computers (nodes). Before the task is begun, all network nodes are unaware which node will serve as the "leader," or coordinator, of the task. After a leader election algorithm has been run, however, each node throughout the network recognizes a particular, unique node as the task leader.|
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
index b904bdc..14d061f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -31,6 +31,7 @@
 import org.apache.curator.framework.imps.CuratorFrameworkImpl;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
 import org.apache.curator.test.KillSession;
 import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
@@ -101,84 +102,6 @@
     }
 
     @Test
-    public void testClientClosedDuringRefreshErrorMessage() throws Exception
-    {
-        Timing timing = new Timing();
-
-        // Fiddle with logging so we can intercept the error events for org.apache.curator
-        final List<LoggingEvent> events = Lists.newArrayList();
-        Collection<String> messages = Collections2.transform(events, new Function<LoggingEvent, String>() {
-            @Override
-            public String apply(LoggingEvent loggingEvent) {
-                return loggingEvent.getRenderedMessage();
-            }
-        });
-        Appender appender = new AppenderSkeleton(true) {
-            @Override
-            protected void append(LoggingEvent event) {
-                if (event.getLevel().equals(Level.ERROR)) {
-                    events.add(event);
-                }
-            }
-
-            @Override
-            public void close() {
-
-            }
-
-            @Override
-            public boolean requiresLayout() {
-                return false;
-            }
-        };
-        appender.setLayout(new SimpleLayout());
-        Logger logger = Logger.getLogger("org.apache.curator");
-        logger.addAppender(appender);
-
-        // Check that we can intercept error log messages from the client
-        CuratorFramework clientTestLogSetup = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-        clientTestLogSetup.start();
-        try {
-            Pathable<byte[]> callback = clientTestLogSetup.getData().inBackground(new BackgroundCallback() {
-                @Override
-                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
-                    // ignore result
-                }
-            });
-            CloseableUtils.closeQuietly(clientTestLogSetup);
-            callback.forPath("/test/aaa"); // this should cause an error log message
-        } catch (IllegalStateException ise) {
-            // ok, excpected
-        } finally {
-            CloseableUtils.closeQuietly(clientTestLogSetup);
-        }
-
-        Assert.assertTrue(messages.contains("Background exception was not retry-able or retry gave up"),
-                "The expected error was not logged. This is an indication that this test could be broken due to" +
-                        " an incomplete logging setup.");
-
-        // try to reproduce a bunch of times because it doesn't happen reliably
-        for (int i = 0; i < 50; i++) {
-            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
-            client.start();
-            try {
-                PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
-                cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
-                client.newNamespaceAwareEnsurePath("/test/aaa").ensure(client.getZookeeperClient());
-                client.setData().forPath("/test/aaa", new byte[]{1, 2, 3, 4, 5});
-                cache.rebuildNode("/test/aaa");
-                CloseableUtils.closeQuietly(cache);
-            } finally {
-                CloseableUtils.closeQuietly(client);
-            }
-        }
-
-        Assert.assertEquals(messages.size(), 1, "There should not be any error events except for the test message, " +
-                "but got:\n" + Joiner.on("\n").join(messages));
-
-    }
-
-    @Test
     public void testAsyncInitialPopulation() throws Exception
     {
         Timing timing = new Timing();
@@ -1039,127 +962,4 @@
             CloseableUtils.closeQuietly(client);
         }
     }
-
-    public static class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService
-    {
-        boolean executeCalled = false;
-
-        public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
-        {
-            super(delegate);
-        }
-
-        @Override
-        public synchronized void execute(Runnable command)
-        {
-            executeCalled = true;
-            super.execute(command);
-        }
-
-        public synchronized boolean isExecuteCalled()
-        {
-            return executeCalled;
-        }
-
-        public synchronized void setExecuteCalled(boolean executeCalled)
-        {
-            this.executeCalled = executeCalled;
-        }
-    }
-
-    public static class DelegatingExecutorService implements ExecutorService
-    {
-        private final ExecutorService delegate;
-
-        public DelegatingExecutorService(
-                ExecutorService delegate
-        )
-        {
-            this.delegate = delegate;
-        }
-
-
-        @Override
-        public void shutdown()
-        {
-            delegate.shutdown();
-        }
-
-        @Override
-        public List<Runnable> shutdownNow()
-        {
-            return delegate.shutdownNow();
-        }
-
-        @Override
-        public boolean isShutdown()
-        {
-            return delegate.isShutdown();
-        }
-
-        @Override
-        public boolean isTerminated()
-        {
-            return delegate.isTerminated();
-        }
-
-        @Override
-        public boolean awaitTermination(long timeout, TimeUnit unit)
-                throws InterruptedException
-        {
-            return delegate.awaitTermination(timeout, unit);
-        }
-
-        @Override
-        public <T> Future<T> submit(Callable<T> task)
-        {
-            return delegate.submit(task);
-        }
-
-        @Override
-        public <T> Future<T> submit(Runnable task, T result)
-        {
-            return delegate.submit(task, result);
-        }
-
-        @Override
-        public Future<?> submit(Runnable task)
-        {
-            return delegate.submit(task);
-        }
-
-        @Override
-        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-                throws InterruptedException
-        {
-            return delegate.invokeAll(tasks);
-        }
-
-        @Override
-        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-                throws InterruptedException
-        {
-            return delegate.invokeAll(tasks, timeout, unit);
-        }
-
-        @Override
-        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-                throws InterruptedException, ExecutionException
-        {
-            return delegate.invokeAny(tasks);
-        }
-
-        @Override
-        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException, TimeoutException
-        {
-            return delegate.invokeAny(tasks, timeout, unit);
-        }
-
-        @Override
-        public void execute(Runnable command)
-        {
-            delegate.execute(command);
-        }
-    }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
index 467f32b..0bccb54 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java
@@ -56,6 +56,19 @@
     }
 
     @Test
+    public void testCreateParents() throws Exception
+    {
+        cache = newTreeCacheWithListeners(client, "/one/two/three");
+        cache.start();
+        Assert.assertNull(client.checkExists().forPath("/one/two/three"));
+        cache.close();
+
+        cache = TreeCache.newBuilder(client, "/one/two/three").setCreateParentNodes(true).build();
+        cache.start();
+        Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
+    }
+
+    @Test
     public void testStartEmpty() throws Exception
     {
         cache = newTreeCacheWithListeners(client, "/test");
@@ -382,11 +395,16 @@
         assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test");
         assertEvent(TreeCacheEvent.Type.INITIALIZED);
         Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.getCurrentChildren("/t"));
+        Assert.assertNull(cache.getCurrentChildren("/testing"));
 
         client.create().forPath("/test/one", "hey there".getBytes());
         assertEvent(TreeCacheEvent.Type.NODE_ADDED, "/test/one");
         Assert.assertEquals(cache.getCurrentChildren("/test").keySet(), ImmutableSet.of("one"));
         Assert.assertEquals(new String(cache.getCurrentData("/test/one").getData()), "hey there");
+        Assert.assertEquals(cache.getCurrentChildren("/test/one").keySet(), ImmutableSet.of());
+        Assert.assertNull(cache.getCurrentChildren("/test/o"));
+        Assert.assertNull(cache.getCurrentChildren("/test/onely"));
 
         client.setData().forPath("/test/one", "sup!".getBytes());
         assertEvent(TreeCacheEvent.Type.NODE_UPDATED, "/test/one");
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
index b1631a0..09b5fe6 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMultiMutex.java
@@ -30,8 +30,8 @@
 
 public class TestInterProcessMultiMutex extends TestInterProcessMutexBase
 {
-    private static final String     LOCK_PATH_1 = "/locks/our-lock-1";
-    private static final String     LOCK_PATH_2 = "/locks/our-lock-2";
+    private static final String     LOCK_PATH_1 = LOCK_BASE_PATH + "/our-lock-1";
+    private static final String     LOCK_PATH_2 = LOCK_BASE_PATH + "/our-lock-2";
 
     @Override
     protected InterProcessLock makeLock(CuratorFramework client)
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
index 453de33..a2c079e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
@@ -35,7 +35,7 @@
 
 public class TestInterProcessMutex extends TestInterProcessMutexBase
 {
-    private static final String LOCK_PATH = "/locks/our-lock";
+    private static final String LOCK_PATH = LOCK_BASE_PATH + "/our-lock";
 
     @Override
     protected InterProcessLock makeLock(CuratorFramework client)
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
index 3fe8110..99ea11f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -20,17 +20,18 @@
 package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.KillSession;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.List;
@@ -47,6 +48,8 @@
 
 public abstract class TestInterProcessMutexBase extends BaseClassForTests
 {
+    protected static final String LOCK_BASE_PATH = "/locks";
+
     private volatile CountDownLatch waitLatchForBar = null;
     private volatile CountDownLatch countLatchForBar = null;
 
@@ -182,6 +185,82 @@
     }
 
     @Test
+    public void testContainerCleanup() throws Exception
+    {
+        if ( !ZKPaths.hasContainerSupport() )
+        {
+            System.out.println("ZooKeeper version does not support Containers. Skipping test");
+            return;
+        }
+
+        server.close();
+
+        System.setProperty("container.checkIntervalMs", "10");
+        try
+        {
+            server = new TestingServer();
+
+            final int THREAD_QTY = 10;
+
+            ExecutorService service = null;
+            final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(100, 3));
+            try
+            {
+                client.start();
+
+                List<Future<Object>> threads = Lists.newArrayList();
+                service = Executors.newCachedThreadPool();
+                for ( int i = 0; i < THREAD_QTY; ++i )
+                {
+                    Future<Object> t = service.submit
+                    (
+                        new Callable<Object>()
+                        {
+                            @Override
+                            public Object call() throws Exception
+                            {
+                                InterProcessLock lock = makeLock(client);
+                                lock.acquire();
+                                try
+                                {
+                                    Thread.sleep(10);
+                                }
+                                finally
+                                {
+                                    lock.release();
+                                }
+                                return null;
+                            }
+                        }
+                    );
+                    threads.add(t);
+                }
+
+                for ( Future<Object> t : threads )
+                {
+                    t.get();
+                }
+
+                new Timing().sleepABit();
+
+                Assert.assertNull(client.checkExists().forPath(LOCK_BASE_PATH));
+            }
+            finally
+            {
+                if ( service != null )
+                {
+                    service.shutdownNow();
+                }
+                CloseableUtils.closeQuietly(client);
+            }
+        }
+        finally
+        {
+            System.clearProperty("container.checkIntervalMs");
+        }
+    }
+
+    @Test
     public void testWithNamespace() throws Exception
     {
         CuratorFramework client = CuratorFrameworkFactory.builder().
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreMutex.java
index 0af2bf4..cd8b83e 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphoreMutex.java
@@ -23,7 +23,7 @@
 
 public class TestInterProcessSemaphoreMutex extends TestInterProcessMutexBase
 {
-    private static final String LOCK_PATH = "/locks/our-lock";
+    private static final String LOCK_PATH = LOCK_BASE_PATH + "/our-lock";
 
     @Override
     @Test(enabled = false)
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
index 34620ff..9f5907a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/nodes/TestPersistentEphemeralNode.java
@@ -23,6 +23,8 @@
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.RetryOneTime;
@@ -536,6 +538,45 @@
             node.close();
         }    	
     }
+
+    @Test
+    public void testSetUpdatedDataWhenReconnected() throws Exception
+    {
+        CuratorFramework curator = newCurator();
+
+        byte[] initialData = "Hello World".getBytes();
+        byte[] updatedData = "Updated".getBytes();
+
+        PersistentEphemeralNode node = new PersistentEphemeralNode(curator, PersistentEphemeralNode.Mode.EPHEMERAL, PATH, initialData);
+        node.start();
+        try
+        {
+            node.waitForInitialCreate(timing.forWaiting().seconds(), TimeUnit.SECONDS);
+            assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), initialData));
+
+            node.setData(updatedData);
+            assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), updatedData));
+
+            server.restart();
+
+            final CountDownLatch dataUpdateLatch = new CountDownLatch(1);
+            curator.getData().inBackground(new BackgroundCallback() {
+
+                @Override
+                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+                    dataUpdateLatch.countDown();
+                }
+            }).forPath(node.getActualPath());
+
+            assertTrue(timing.awaitLatch(dataUpdateLatch));
+
+            assertTrue(Arrays.equals(curator.getData().forPath(node.getActualPath()), updatedData));
+        }
+        finally
+        {
+            node.close();
+        }
+    }
     
     /**
      * See CURATOR-190
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java
index 30e552f..858086b 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java
@@ -124,4 +124,51 @@
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test
+    public void testRequeuingWithLock() throws Exception
+    {
+        DistributedIdQueue<TestQueueItem>  queue = null;
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+        client.start();
+        try
+        {
+            final CountDownLatch        consumingLatch = new CountDownLatch(1);
+
+            QueueConsumer<TestQueueItem> consumer = new QueueConsumer<TestQueueItem>()
+            {
+                @Override
+                public void consumeMessage(TestQueueItem message) throws Exception
+                {
+                    consumingLatch.countDown();
+                    // Throw an exception so requeuing occurs
+                    throw new Exception("Consumer failed");
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                }
+            };
+
+            queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH).lockPath("/locks").buildIdQueue();
+            queue.start();
+
+            queue.put(new TestQueueItem("test"), "id");
+
+            Assert.assertTrue(consumingLatch.await(10, TimeUnit.SECONDS));  // wait until consumer has it
+
+            // Sleep one more second
+
+            Thread.sleep(1000);
+
+            Assert.assertEquals(queue.remove("id"), 1);
+
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(queue);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }
diff --git a/curator-test/pom.xml b/curator-test/pom.xml
index 1cbd93c..80eedb2 100644
--- a/curator-test/pom.xml
+++ b/curator-test/pom.xml
@@ -36,6 +36,11 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.javassist</groupId>
             <artifactId>javassist</artifactId>
         </dependency>
@@ -46,9 +51,20 @@
         </dependency>
 
         <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
             <scope>provided</scope>
         </dependency>
+
+	<dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
index d676a9b..13c3138 100644
--- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
+++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java
@@ -25,6 +25,7 @@
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.BeforeSuite;
+import java.io.IOException;
 import java.net.BindException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -34,19 +35,24 @@
 
     private static final int    RETRY_WAIT_MS = 5000;
     private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES;
+    private static final String INTERNAL_RETRY_FAILED_TESTS;
     static
     {
-        String s = null;
+        String logConnectionIssues = null;
+        String retryFailedTests = null;
         try
         {
             // use reflection to avoid adding a circular dependency in the pom
-            s = (String)Class.forName("org.apache.curator.utils.DebugUtils").getField("PROPERTY_DONT_LOG_CONNECTION_ISSUES").get(null);
+            Class<?> debugUtilsClazz = Class.forName("org.apache.curator.utils.DebugUtils");
+            logConnectionIssues = (String)debugUtilsClazz.getField("PROPERTY_DONT_LOG_CONNECTION_ISSUES").get(null);
+            retryFailedTests = (String)debugUtilsClazz.getField("PROPERTY_RETRY_FAILED_TESTS").get(null);
         }
         catch ( Exception e )
         {
             e.printStackTrace();
         }
-        INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES = s;
+        INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES = logConnectionIssues;
+        INTERNAL_RETRY_FAILED_TESTS = retryFailedTests;
     }
 
     @BeforeSuite(alwaysRun = true)
@@ -83,13 +89,23 @@
     @AfterMethod
     public void teardown() throws Exception
     {
-        server.close();
-        server = null;
+        if ( server != null )
+        {
+            try
+            {
+                server.close();
+            }
+            catch ( IOException e )
+            {
+                e.printStackTrace();
+            }
+            server = null;
+        }
     }
 
     private static class RetryTest implements IRetryAnalyzer
     {
-        private final AtomicBoolean hasBeenRetried = new AtomicBoolean(false);
+        private final AtomicBoolean hasBeenRetried = new AtomicBoolean(!Boolean.getBoolean(INTERNAL_RETRY_FAILED_TESTS));
 
         @Override
         public boolean retry(ITestResult result)
diff --git a/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java b/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java
new file mode 100644
index 0000000..eff34dd
--- /dev/null
+++ b/curator-test/src/main/java/org/apache/curator/test/DelegatingExecutorService.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.*;
+
+public class DelegatingExecutorService implements ExecutorService
+{
+    private final ExecutorService delegate;
+
+    public DelegatingExecutorService(
+            ExecutorService delegate
+    )
+    {
+        this.delegate = delegate;
+    }
+
+
+    @Override
+    public void shutdown()
+    {
+        delegate.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow()
+    {
+        return delegate.shutdownNow();
+    }
+
+    @Override
+    public boolean isShutdown()
+    {
+        return delegate.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated()
+    {
+        return delegate.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException
+    {
+        return delegate.awaitTermination(timeout, unit);
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task)
+    {
+        return delegate.submit(task);
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result)
+    {
+        return delegate.submit(task, result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task)
+    {
+        return delegate.submit(task);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException
+    {
+        return delegate.invokeAll(tasks);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException
+    {
+        return delegate.invokeAll(tasks, timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException, ExecutionException
+    {
+        return delegate.invokeAny(tasks);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException
+    {
+        return delegate.invokeAny(tasks, timeout, unit);
+    }
+
+    @Override
+    public void execute(Runnable command)
+    {
+        delegate.execute(command);
+    }
+}
diff --git a/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java b/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java
new file mode 100644
index 0000000..da7bc66
--- /dev/null
+++ b/curator-test/src/main/java/org/apache/curator/test/ExecuteCalledWatchingExecutorService.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.test;
+
+import java.util.concurrent.ExecutorService;
+
+public class ExecuteCalledWatchingExecutorService extends DelegatingExecutorService
+{
+    boolean executeCalled = false;
+
+    public ExecuteCalledWatchingExecutorService(ExecutorService delegate)
+    {
+        super(delegate);
+    }
+
+    @Override
+    public synchronized void execute(Runnable command)
+    {
+        executeCalled = true;
+        super.execute(command);
+    }
+
+    public synchronized boolean isExecuteCalled()
+    {
+        return executeCalled;
+    }
+
+    public synchronized void setExecuteCalled(boolean executeCalled)
+    {
+        this.executeCalled = executeCalled;
+    }
+}
diff --git a/curator-x-discovery-server/pom.xml b/curator-x-discovery-server/pom.xml
index 352310f..0bd9670 100644
--- a/curator-x-discovery-server/pom.xml
+++ b/curator-x-discovery-server/pom.xml
@@ -62,6 +62,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>com.sun.jersey</groupId>
             <artifactId>jersey-server</artifactId>
             <scope>test</scope>
diff --git a/curator-x-discovery/pom.xml b/curator-x-discovery/pom.xml
index e211d89..6f380ae 100644
--- a/curator-x-discovery/pom.xml
+++ b/curator-x-discovery/pom.xml
@@ -60,5 +60,11 @@
             <artifactId>curator-test</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
index 10ce305..290d9b1 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/ServiceCacheBuilder.java
@@ -18,6 +18,8 @@
  */
 package org.apache.curator.x.discovery;
 
+import org.apache.curator.utils.CloseableExecutorService;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 
 public interface ServiceCacheBuilder<T>
@@ -38,10 +40,30 @@
     public ServiceCacheBuilder<T> name(String name);
 
     /**
-     * Optional thread factory to use for the cache's internal thread
+     * Optional thread factory to use for the cache's internal thread. The specified ThreadFactory
+     * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder.
      *
      * @param threadFactory factory
      * @return this
      */
     public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory);
+
+    /**
+     * Optional ExecutorService to use for the cache's background thread. The specified ExecutorService
+     * will be wrapped in a CloseableExecutorService and overrides any prior ThreadFactory or ExecutorService
+     * set on the ServiceCacheBuilder.
+     *
+     * @param executorService executor service
+     * @return this
+     */
+    public ServiceCacheBuilder<T> executorService(ExecutorService executorService);
+
+    /**
+     * Optional CloseableExecutorService to use for the cache's background thread. The specified CloseableExecutorService
+     * overrides any prior ThreadFactory or ExecutorService set on the ServiceCacheBuilder.
+     *
+     * @param executorService an instance of CloseableExecutorService
+     * @return this
+     */
+    public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService);
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
index c4104f4..8922233 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheBuilderImpl.java
@@ -18,8 +18,10 @@
  */
 package org.apache.curator.x.discovery.details;
 
+import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.x.discovery.ServiceCache;
 import org.apache.curator.x.discovery.ServiceCacheBuilder;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 
 /**
@@ -30,6 +32,7 @@
     private ServiceDiscoveryImpl<T> discovery;
     private String name;
     private ThreadFactory threadFactory;
+    private CloseableExecutorService executorService;
 
     ServiceCacheBuilderImpl(ServiceDiscoveryImpl<T> discovery)
     {
@@ -44,7 +47,14 @@
     @Override
     public ServiceCache<T> build()
     {
-        return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+        if (executorService != null)
+        {
+            return new ServiceCacheImpl<T>(discovery, name, executorService);
+        }
+        else
+        {
+            return new ServiceCacheImpl<T>(discovery, name, threadFactory);
+        }
     }
 
     /**
@@ -70,6 +80,33 @@
     public ServiceCacheBuilder<T> threadFactory(ThreadFactory threadFactory)
     {
         this.threadFactory = threadFactory;
+        this.executorService = null;
+        return this;
+    }
+
+    /**
+     * Optional executor service to use for the cache's background thread
+     *
+     * @param executorService executor service
+     * @return this
+     */
+    @Override
+    public ServiceCacheBuilder<T> executorService(ExecutorService executorService) {
+        this.executorService = new CloseableExecutorService(executorService);
+        this.threadFactory = null;
+        return this;
+    }
+
+    /**
+     * Optional CloseableExecutorService to use for the cache's background thread
+     *
+     * @param executorService an instance of CloseableExecutorService
+     * @return this
+     */
+    @Override
+    public ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService) {
+        this.executorService = executorService;
+        this.threadFactory = null;
         return this;
     }
 }
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
index 0269d24..b8f39d5 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceCacheImpl.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.curator.utils.CloseableExecutorService;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.ListenerContainer;
@@ -36,6 +37,7 @@
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -54,15 +56,26 @@
         STOPPED
     }
 
+    private static CloseableExecutorService convertThreadFactory(ThreadFactory threadFactory)
+    {
+        Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null");
+        return new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory));
+    }
+
     ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, ThreadFactory threadFactory)
     {
+        this(discovery, name, convertThreadFactory(threadFactory));
+    }
+
+    ServiceCacheImpl(ServiceDiscoveryImpl<T> discovery, String name, CloseableExecutorService executorService)
+    {
         Preconditions.checkNotNull(discovery, "discovery cannot be null");
         Preconditions.checkNotNull(name, "name cannot be null");
-        Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null");
+        Preconditions.checkNotNull(executorService, "executorService cannot be null");
 
         this.discovery = discovery;
 
-        cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, threadFactory);
+        cache = new PathChildrenCache(discovery.getClient(), discovery.pathForName(name), true, false, executorService);
         cache.getListenable().addListener(this);
     }
 
diff --git a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
index 7b0bffe..21c9e07 100644
--- a/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
+++ b/curator-x-discovery/src/main/java/org/apache/curator/x/discovery/details/ServiceDiscoveryImpl.java
@@ -219,7 +219,7 @@
             try
             {
                 CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
-                client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, bytes);
+                client.create().creatingParentContainersIfNeeded().withMode(mode).forPath(path, bytes);
                 isDone = true;
             }
             catch ( KeeperException.NodeExistsException e )
@@ -404,7 +404,7 @@
             {
                 try
                 {
-                    client.create().creatingParentsIfNeeded().forPath(path);
+                    client.create().creatingParentContainersIfNeeded().forPath(path);
                 }
                 catch ( KeeperException.NodeExistsException ignore )
                 {
diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
index be114d4..fda5c26 100644
--- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
+++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceCache.java
@@ -16,17 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.curator.x.discovery;
 
 import com.google.common.collect.Lists;
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.test.Timing;
-import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.ExecuteCalledWatchingExecutorService;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -35,13 +36,14 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 public class TestServiceCache extends BaseClassForTests
 {
     @Test
-    public void     testInitialLoad() throws Exception
+    public void testInitialLoad() throws Exception
     {
         List<Closeable> closeables = Lists.newArrayList();
         try
@@ -50,15 +52,15 @@
             closeables.add(client);
             client.start();
 
-            ServiceDiscovery<String>    discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build();
             closeables.add(discovery);
             discovery.start();
 
-            ServiceCache<String>        cache = discovery.serviceCacheBuilder().name("test").build();
+            ServiceCache<String> cache = discovery.serviceCacheBuilder().name("test").build();
             closeables.add(cache);
 
-            final CountDownLatch        latch = new CountDownLatch(3);
-            ServiceCacheListener        listener = new ServiceCacheListener()
+            final CountDownLatch latch = new CountDownLatch(3);
+            ServiceCacheListener listener = new ServiceCacheListener()
             {
                 @Override
                 public void cacheChanged()
@@ -74,16 +76,16 @@
             cache.addListener(listener);
             cache.start();
 
-            ServiceInstance<String>     instance1 = ServiceInstance.<String>builder().payload("test").name("test").port(10064).build();
-            ServiceInstance<String>     instance2 = ServiceInstance.<String>builder().payload("test").name("test").port(10065).build();
-            ServiceInstance<String>     instance3 = ServiceInstance.<String>builder().payload("test").name("test").port(10066).build();
+            ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("test").name("test").port(10064).build();
+            ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("test").name("test").port(10065).build();
+            ServiceInstance<String> instance3 = ServiceInstance.<String>builder().payload("test").name("test").port(10066).build();
             discovery.registerService(instance1);
             discovery.registerService(instance2);
             discovery.registerService(instance3);
 
             Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
 
-            ServiceCache<String>        cache2 = discovery.serviceCacheBuilder().name("test").build();
+            ServiceCache<String> cache2 = discovery.serviceCacheBuilder().name("test").build();
             closeables.add(cache2);
             cache2.start();
 
@@ -100,7 +102,7 @@
     }
 
     @Test
-    public void     testViaProvider() throws Exception
+    public void testViaProvider() throws Exception
     {
         Timing timing = new Timing();
 
@@ -111,19 +113,19 @@
             closeables.add(client);
             client.start();
 
-            ServiceDiscovery<String>    discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build();
             closeables.add(discovery);
             discovery.start();
 
-            ServiceProvider<String>     serviceProvider = discovery.serviceProviderBuilder().serviceName("test").build();
+            ServiceProvider<String> serviceProvider = discovery.serviceProviderBuilder().serviceName("test").build();
             closeables.add(serviceProvider);
             serviceProvider.start();
 
-            ServiceInstance<String>     instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
             discovery.registerService(instance);
 
-            int                         count = 0;
-            ServiceInstance<String>     foundInstance = null;
+            int count = 0;
+            ServiceInstance<String> foundInstance = null;
             while ( foundInstance == null )
             {
                 Assert.assertTrue(count++ < 5);
@@ -132,7 +134,7 @@
             }
             Assert.assertEquals(foundInstance, instance);
 
-            ServiceInstance<String>     instance2 = ServiceInstance.<String>builder().address("foo").payload("thing").name("test").port(10064).build();
+            ServiceInstance<String> instance2 = ServiceInstance.<String>builder().address("foo").payload("thing").name("test").port(10064).build();
             discovery.registerService(instance2);
             timing.sleepABit();
             Collection<ServiceInstance<String>> allInstances = serviceProvider.getAllInstances();
@@ -149,24 +151,24 @@
     }
 
     @Test
-    public void     testUpdate() throws Exception
+    public void testUpdate() throws Exception
     {
-        List<Closeable>     closeables = Lists.newArrayList();
+        List<Closeable> closeables = Lists.newArrayList();
         try
         {
             CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
             closeables.add(client);
             client.start();
 
-            ServiceInstance<String>     instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceDiscovery<String>    discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
+            ServiceInstance<String> instance = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/test").client(client).thisInstance(instance).build();
             closeables.add(discovery);
             discovery.start();
 
             final CountDownLatch latch = new CountDownLatch(1);
-            ServiceCache<String>        cache = discovery.serviceCacheBuilder().name("test").build();
+            ServiceCache<String> cache = discovery.serviceCacheBuilder().name("test").build();
             closeables.add(cache);
-            ServiceCacheListener        listener = new ServiceCacheListener()
+            ServiceCacheListener listener = new ServiceCacheListener()
             {
                 @Override
                 public void cacheChanged()
@@ -210,7 +212,7 @@
             closeables.add(client);
             client.start();
 
-            ServiceDiscovery<String>    discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build();
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build();
             closeables.add(discovery);
             discovery.start();
 
@@ -219,7 +221,7 @@
             cache.start();
 
             final Semaphore semaphore = new Semaphore(0);
-            ServiceCacheListener    listener = new ServiceCacheListener()
+            ServiceCacheListener listener = new ServiceCacheListener()
             {
                 @Override
                 public void cacheChanged()
@@ -234,15 +236,15 @@
             };
             cache.addListener(listener);
 
-            ServiceInstance<String>     instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
-            ServiceInstance<String>     instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
+            ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build();
+            ServiceInstance<String> instance2 = ServiceInstance.<String>builder().payload("thing").name("test").port(10065).build();
             discovery.registerService(instance1);
             Assert.assertTrue(semaphore.tryAcquire(10, TimeUnit.SECONDS));
 
             discovery.registerService(instance2);
             Assert.assertTrue(semaphore.tryAcquire(3, TimeUnit.SECONDS));
 
-            ServiceInstance<String>     instance3 = ServiceInstance.<String>builder().payload("thing").name("another").port(10064).build();
+            ServiceInstance<String> instance3 = ServiceInstance.<String>builder().payload("thing").name("another").port(10064).build();
             discovery.registerService(instance3);
             Assert.assertFalse(semaphore.tryAcquire(3, TimeUnit.SECONDS));  // should not get called for a different service
         }
@@ -255,4 +257,57 @@
             }
         }
     }
+
+    @Test
+    public void testExecutorServiceIsInvoked() throws Exception // asserts that the executor handed to the ServiceCache builder reports execute() as called once a service is registered
+    {
+        List<Closeable> closeables = Lists.newArrayList(); // everything added here is closed in the finally block
+        try
+        {
+            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+            closeables.add(client);
+            client.start();
+
+            ServiceDiscovery<String> discovery = ServiceDiscoveryBuilder.builder(String.class).basePath("/discovery").client(client).build();
+            closeables.add(discovery);
+            discovery.start();
+
+            ExecuteCalledWatchingExecutorService exec = new ExecuteCalledWatchingExecutorService(Executors.newSingleThreadExecutor()); // NOTE(review): the wrapped single-thread executor is never explicitly shut down in this test - confirm that closing the cache releases it
+            Assert.assertFalse(exec.isExecuteCalled()); // precondition: the flag must start out false
+
+            ServiceCache<String> cache = discovery.serviceCacheBuilder().name("test").executorService(exec).build(); // the cache under test is built with the watching executor
+            closeables.add(cache);
+            cache.start();
+
+            final Semaphore semaphore = new Semaphore(0); // starts with no permits; released by the listener below
+            ServiceCacheListener listener = new ServiceCacheListener()
+            {
+                @Override
+                public void cacheChanged()
+                {
+                    semaphore.release(); // wake the test thread waiting in tryAcquire
+                }
+
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                } // connection state changes are intentionally ignored by this test
+            };
+            cache.addListener(listener);
+
+            ServiceInstance<String> instance1 = ServiceInstance.<String>builder().payload("thing").name("test").port(10064).build(); // same service name ("test") as the cache
+            discovery.registerService(instance1);
+            Assert.assertTrue(semaphore.tryAcquire(10, TimeUnit.SECONDS)); // wait up to 10 seconds for cacheChanged() to fire
+
+            Assert.assertTrue(exec.isExecuteCalled()); // the supplied executor must have been used by now
+        }
+        finally
+        {
+            Collections.reverse(closeables); // close in reverse order of creation: cache, discovery, client
+            for ( Closeable c : closeables )
+            {
+                CloseableUtils.closeQuietly(c);
+            }
+        }
+    }
 }
diff --git a/curator-x-rpc/pom.xml b/curator-x-rpc/pom.xml
index 6f56f6f..e6ed5e8 100644
--- a/curator-x-rpc/pom.xml
+++ b/curator-x-rpc/pom.xml
@@ -112,6 +112,12 @@
             </exclusions>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/services/CuratorProjectionService.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/services/CuratorProjectionService.java
index 777472c..eb67341 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/services/CuratorProjectionService.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/services/CuratorProjectionService.java
@@ -125,6 +125,10 @@
             {
                 builder = castBuilder(builder, CreateBuilder.class).creatingParentsIfNeeded();
             }
+            if ( spec.creatingParentContainersIfNeeded )
+            {
+                builder = castBuilder(builder, CreateBuilder.class).creatingParentContainersIfNeeded();
+            }
             if ( spec.compressed )
             {
                 builder = castBuilder(builder, Compressible.class).compressed();
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/structs/CreateSpec.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/structs/CreateSpec.java
index d0232a9..a15fe92 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/structs/CreateSpec.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/structs/CreateSpec.java
@@ -45,11 +45,14 @@
     @ThriftField(7)
     public boolean withProtection;
 
+    @ThriftField(8)
+    public boolean creatingParentContainersIfNeeded;
+
     public CreateSpec()
     {
     }
 
-    public CreateSpec(String path, byte[] data, RpcCreateMode mode, String asyncContext, boolean compressed, boolean creatingParentsIfNeeded, boolean withProtection)
+    public CreateSpec(String path, byte[] data, RpcCreateMode mode, String asyncContext, boolean compressed, boolean creatingParentsIfNeeded, boolean withProtection, boolean creatingParentContainersIfNeeded)
     {
         this.path = path;
         this.data = data;
@@ -58,5 +61,6 @@
         this.compressed = compressed;
         this.creatingParentsIfNeeded = creatingParentsIfNeeded;
         this.withProtection = withProtection;
+        this.creatingParentContainersIfNeeded = creatingParentContainersIfNeeded;
     }
 }
diff --git a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/structs/RpcCreateMode.java b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/structs/RpcCreateMode.java
index d50bb74..020f283 100644
--- a/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/structs/RpcCreateMode.java
+++ b/curator-x-rpc/src/main/java/org/apache/curator/x/rpc/idl/structs/RpcCreateMode.java
@@ -26,5 +26,6 @@
     PERSISTENT,
     PERSISTENT_SEQUENTIAL,
     EPHEMERAL,
-    EPHEMERAL_SEQUENTIAL
+    EPHEMERAL_SEQUENTIAL,
+    CONTAINER
 }
diff --git a/curator-x-rpc/src/main/thrift/curator.thrift b/curator-x-rpc/src/main/thrift/curator.thrift
index d6bcd94..41f2362 100644
--- a/curator-x-rpc/src/main/thrift/curator.thrift
+++ b/curator-x-rpc/src/main/thrift/curator.thrift
@@ -8,7 +8,7 @@
 }
 
 enum CreateMode {
-  PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, EPHEMERAL_SEQUENTIAL
+  PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, EPHEMERAL_SEQUENTIAL, CONTAINER
 }
 
 enum CuratorEventType {
@@ -175,6 +175,7 @@
   5: bool compressed;
   6: bool creatingParentsIfNeeded;
   7: bool withProtection;
+  8: bool creatingParentContainersIfNeeded;
 }
 
 struct DeleteSpec {
diff --git a/curator-x-rpc/src/site/confluence/reference.confluence b/curator-x-rpc/src/site/confluence/reference.confluence
index 68c3692..bb7ea46 100644
--- a/curator-x-rpc/src/site/confluence/reference.confluence
+++ b/curator-x-rpc/src/site/confluence/reference.confluence
@@ -57,6 +57,7 @@
 |compressed|bool|\-|if true, compress the data|
 |creatingParentsIfNeeded|bool|\-|if true, create any needed parent nodes|
 |withProtection|bool|\-|if true, use Curator protection|
+|creatingParentContainersIfNeeded|bool|\-|if true, create any needed parent nodes as CONTAINERs|
 
 h2. DeleteSpec
 
diff --git a/pom.xml b/pom.xml
index 0319c8d..8c984ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@
         <surefire-forkcount>1</surefire-forkcount>
 
         <!-- versions -->
+        <zookeeper-version>3.5.0-alpha</zookeeper-version>
         <maven-project-info-reports-plugin-version>2.7</maven-project-info-reports-plugin-version>
         <maven-bundle-plugin-version>2.3.7</maven-bundle-plugin-version>
         <maven-javadoc-plugin-version>2.10.3</maven-javadoc-plugin-version>
@@ -74,7 +75,6 @@
         <jetty-version>6.1.26</jetty-version>
         <scannotation-version>1.0.2</scannotation-version>
         <resteasy-jaxrs-version>2.3.0.GA</resteasy-jaxrs-version>
-        <zookeeper-version>3.5.0-alpha</zookeeper-version>
         <guava-version>16.0.1</guava-version>
         <testng-version>6.8.8</testng-version>
         <swift-version>0.12.0</swift-version>
@@ -227,6 +227,18 @@
         </developer>
 
         <developer>
+            <id>mdrob</id>
+            <name>Mike Drob</name>
+            <email>mdrob@apache.org</email>
+            <timezone>-6</timezone>
+            <roles>
+                <role>Committer</role>
+                <role>PMC Member</role>
+            </roles>
+            <url>http://people.apache.org/~mdrob</url>
+        </developer>
+
+        <developer>
             <name>Patrick Hunt</name>
             <email>phunt1@gmail.com</email>
             <roles>
@@ -472,31 +484,6 @@
         </dependencies>
     </dependencyManagement>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.testng</groupId>
-            <artifactId>testng</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-    </dependencies>
-
     <reporting>
         <plugins>
             <plugin>
diff --git a/src/site/confluence/utilities.confluence b/src/site/confluence/utilities.confluence
index 2ec5c6b..f0d927d 100644
--- a/src/site/confluence/utilities.confluence
+++ b/src/site/confluence/utilities.confluence
@@ -14,24 +14,6 @@
 * getSortedChildren: Return the children of the given path sorted by sequence number
 * makePath: Given a parent path and a child node, create a combined full path
 
-h2. EnsurePath
-Utility to ensure that a particular path is created.
-The first time it is used, a synchronized call to {{ZKPaths.mkdirs(ZooKeeper, String)}} is made to ensure that the entire path has been created (with an empty byte array if needed). Subsequent calls with the instance are un\-synchronized NOPs.
-
-Usage:
-{code}
-EnsurePath       ensurePath = new EnsurePath(aFullPathToEnsure);
-...
-String           nodePath = aFullPathToEnsure + "/foo";
-ensurePath.ensure(zk);   // first time syncs and creates if needed
-zk.create(nodePath, ...);
-...
-ensurePath.ensure(zk);   // subsequent times are NOPs
-zk.create(nodePath, ...);
-{code}
-
-*NOTE:* There's a method in the [[CuratorFramework class|curator-framework/index.html]] that returns an EnsurePath instance that is namespace aware.
-
 h2. BlockingQueueConsumer
 
 See: *[[DistributedQueue|curator-recipes/distributed-queue.html]]* and *[[DistributedPriorityQueue|curator-recipes/distributed-priority-queue.html]]*
@@ -44,20 +26,6 @@
 provides a facade over multiple distributed queues. It monitors the queues and if any one of them goes over a threshold, a new
 queue is added. Puts are distributed amongst the queues.
 
-h2. Reaper and ChildReaper
-
-_Reaper_
-
-A Utility to delete parent paths of locks, etc. Periodically checks paths added to the reaper. If at check time, there are no
-children, the path is deleted. Clients should create one Reaper instance per application. Add lock paths to the reaper as
-needed and the reaper will periodically delete them. Curator's lock recipes will correctly handle parents getting deleted.
-
-_ChildReaper_
-
-Utility to reap the empty child nodes in a parent node. Periodically calls getChildren() on the node and adds empty nodes to an internally managed Reaper.
-
-*NOTE:* You should consider using LeaderSelector to run the Reapers as they don't need to run in every client.
-
 h2. EnsembleTracker
 
 Utility to listen for ensemble/configuration changes via registered EnsembleListeners. Allocate a EnsembleTracker, add one or more listeners