Closes #637: Add a UUID to the server's ZooLock (#1866)

Modified ZooLock to create ephemeral nodes based on a supplied instance UUID parameter so that duplicate ephemeral nodes created at a given path by the same ZooLock instance could be cleaned up.
diff --git a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
index 54b6eb3..be796f6 100644
--- a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
+++ b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
@@ -21,8 +21,9 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.LockID;
@@ -38,9 +39,25 @@
 import org.slf4j.LoggerFactory;
 
 public class ZooLock implements Watcher {
-  private static final Logger log = LoggerFactory.getLogger(ZooLock.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ZooLock.class);
 
-  public static final String LOCK_PREFIX = "zlock-";
+  private static final String ZLOCK_PREFIX = "zlock#";
+
+  private static class Prefix {
+    private final String prefix;
+
+    public Prefix(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public String toString() {
+      return this.prefix;
+    }
+
+  }
+
+  private static ZooCache LOCK_DATA_ZOO_CACHE;
 
   public enum LockLossReason {
     LOCK_DELETED, SESSION_EXPIRED
@@ -55,48 +72,53 @@
     void unableToMonitorLockNode(Exception e);
   }
 
-  public interface AsyncLockWatcher extends LockWatcher {
+  public interface AccumuloLockWatcher extends LockWatcher {
     void acquiredLock();
 
     void failedToAcquireLock(Exception e);
   }
 
-  private boolean lockWasAcquired;
   private final String path;
+  private final Prefix vmLockPrefix;
   protected final ZooReaderWriter zooKeeper;
-  private String lock;
+
   private LockWatcher lockWatcher;
-  private boolean watchingParent = false;
-  private String asyncLock;
+  private String lockNodeName;
+  private volatile boolean lockWasAcquired;
+  private volatile boolean watchingParent = false;
 
-  public ZooLock(ZooReaderWriter zoo, String path) {
-    this(new ZooCache(zoo), zoo, path);
+  private String createdNodeName;
+  private String watchingNodeName;
+
+  public ZooLock(ZooReaderWriter zoo, String path, UUID uuid) {
+    this(new ZooCache(zoo), zoo, path, uuid);
   }
 
-  public ZooLock(String zookeepers, int timeInMillis, String secret, String path) {
+  public ZooLock(String zookeepers, int timeInMillis, String secret, String path, UUID uuid) {
     this(new ZooCacheFactory().getZooCache(zookeepers, timeInMillis),
-        new ZooReaderWriter(zookeepers, timeInMillis, secret), path);
+        new ZooReaderWriter(zookeepers, timeInMillis, secret), path, uuid);
   }
 
-  protected ZooLock(ZooCache zc, ZooReaderWriter zrw, String path) {
-    getLockDataZooCache = zc;
+  protected ZooLock(ZooCache zc, ZooReaderWriter zrw, String path, UUID uuid) {
+    LOCK_DATA_ZOO_CACHE = zc;
     this.path = path;
     zooKeeper = zrw;
     try {
       zooKeeper.getStatus(path, this);
       watchingParent = true;
+      this.vmLockPrefix = new Prefix(ZLOCK_PREFIX + uuid.toString() + "#");
     } catch (Exception ex) {
-      log.warn("Error getting setting initial watch on ZooLock", ex);
+      LOG.error("Error setting initial watch", ex);
       throw new RuntimeException(ex);
     }
   }
 
-  private static class TryLockAsyncLockWatcher implements AsyncLockWatcher {
+  private static class LockWatcherWrapper implements AccumuloLockWatcher {
 
     boolean acquiredLock = false;
     LockWatcher lw;
 
-    public TryLockAsyncLockWatcher(LockWatcher lw2) {
+    public LockWatcherWrapper(LockWatcher lw2) {
       this.lw = lw2;
     }
 
@@ -123,189 +145,366 @@
   public synchronized boolean tryLock(LockWatcher lw, byte[] data)
       throws KeeperException, InterruptedException {
 
-    TryLockAsyncLockWatcher tlalw = new TryLockAsyncLockWatcher(lw);
+    LockWatcherWrapper lww = new LockWatcherWrapper(lw);
 
-    lockAsync(tlalw, data);
+    lock(lww, data);
 
-    if (tlalw.acquiredLock) {
+    if (lww.acquiredLock) {
       return true;
     }
 
-    if (asyncLock != null) {
-      zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP);
-      asyncLock = null;
+    // If we didn't acquire the lock, then delete the path we just created
+    if (createdNodeName != null) {
+      String pathToDelete = path + "/" + createdNodeName;
+      LOG.debug("[{}] Failed to acquire lock in tryLock(), deleting all at path: {}", vmLockPrefix,
+          pathToDelete);
+      zooKeeper.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
+      createdNodeName = null;
     }
 
     return false;
   }
 
-  private synchronized void lockAsync(final String myLock, final AsyncLockWatcher lw)
-      throws KeeperException, InterruptedException {
-
-    if (asyncLock == null) {
-      throw new IllegalStateException("Called lockAsync() when asyncLock == null");
+  /**
+   * Sort list of ephemeral nodes by their sequence number. Any ephemeral nodes that are not of the
+   * correct form will sort last.
+   *
+   * @param children
+   *          list of ephemeral nodes
+   * @return list of ephemeral nodes that have valid formats, sorted by sequence number
+   */
+  public static List<String> validateAndSortChildrenByLockPrefix(String path,
+      List<String> children) {
+    LOG.debug("validating and sorting children at path {}", path);
+    List<String> validChildren = new ArrayList<>();
+    if (null == children || children.size() == 0) {
+      return validChildren;
     }
+    children.forEach(c -> {
+      LOG.trace("Validating {}", c);
+      if (c.startsWith(ZLOCK_PREFIX)) {
+        String candidate = c.substring(ZLOCK_PREFIX.length() + 1);
+        if (candidate.contains("#")) {
+          int idx = candidate.indexOf('#');
+          String uuid = candidate.substring(0, idx - 1);
+          String sequenceNum = candidate.substring(idx + 1);
+          try {
+            LOG.trace("Testing uuid format of {}", uuid);
+            UUID.fromString(uuid);
+            if (sequenceNum.length() == 10) {
+              try {
+                LOG.trace("Testing number format of {}", sequenceNum);
+                Integer.parseInt(sequenceNum);
+                validChildren.add(c);
+              } catch (NumberFormatException e) {
+                LOG.warn("Child found with invalid sequence format: {} (not a number)", c);
+              }
+            } else {
+              LOG.warn("Child found with invalid sequence format: {} (not 10 characters)", c);
+            }
+          } catch (IllegalArgumentException e) {
+            LOG.warn("Child found with invalid UUID format: {}", c);
+          }
+        } else {
+          LOG.warn("Child found with invalid format: {} (does not contain second '#')", c);
+        }
+      } else {
+        LOG.warn("Child found with invalid format: {} (does not start with {})", c, ZLOCK_PREFIX);
+      }
+    });
 
-    List<String> children = zooKeeper.getChildren(path);
+    if (validChildren.size() > 1) {
+      validChildren.sort(new Comparator<String>() {
+        @Override
+        public int compare(String o1, String o2) {
 
-    if (!children.contains(myLock)) {
-      throw new RuntimeException("Lock attempt ephemeral node no longer exist " + myLock);
+          // Lock should be of the form:
+          // zlock#UUID#sequenceNumber
+          // Example:
+          // zlock#44755fbe-1c9e-40b3-8458-03abaf950d7e#0000000000
+          int secondHashIdx = 43;
+          return Integer.valueOf(o1.substring(secondHashIdx))
+              .compareTo(Integer.valueOf(o2.substring(secondHashIdx)));
+        }
+      });
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Children nodes: {}", validChildren.size());
+      validChildren.forEach(c -> LOG.debug("- {}", c));
+    }
+    return validChildren;
+  }
 
-    Collections.sort(children);
-    if (log.isTraceEnabled()) {
-      log.trace("Candidate lock nodes");
-      for (String child : children) {
-        log.trace("- {}", child);
+  /**
+   * Given a pre-sorted set of children ephemeral nodes where the node name is of the form
+   * "zlock#UUID#sequenceNumber", find the ephemeral node that sorts before the ephemeralNode
+   * parameter with the lowest sequence number
+   *
+   * @param children
+   *          list of sequential ephemera nodes, already sorted
+   * @param ephemeralNode
+   *          starting node for the search
+   * @return next lowest prefix with the lowest sequence number
+   */
+  public static String findLowestPrevPrefix(final List<String> children,
+      final String ephemeralNode) {
+    int idx = children.indexOf(ephemeralNode);
+    // Get the prefix from the prior ephemeral node
+    String prev = children.get(idx - 1);
+    int prefixIdx = prev.lastIndexOf('#');
+    String prevPrefix = prev.substring(0, prefixIdx);
+
+    // Find the lowest sequential ephemeral node with prevPrefix
+    int i = 2;
+    String lowestPrevNode = prev;
+    while ((idx - i) >= 0) {
+      prev = children.get(idx - i);
+      i++;
+      if (prev.startsWith(prevPrefix)) {
+        lowestPrevNode = prev;
+      } else {
+        break;
       }
     }
+    return lowestPrevNode;
+  }
 
-    if (children.get(0).equals(myLock)) {
-      log.trace("First candidate is my lock, acquiring");
+  private synchronized void determineLockOwnership(final String createdEphemeralNode,
+      final AccumuloLockWatcher lw) throws KeeperException, InterruptedException {
+
+    if (createdNodeName == null) {
+      throw new IllegalStateException(
+          "Called determineLockOwnership() when ephemeralNodeName == null");
+    }
+
+    List<String> children = validateAndSortChildrenByLockPrefix(path, zooKeeper.getChildren(path));
+
+    if (null == children || !children.contains(createdEphemeralNode)) {
+      LOG.error("Expected ephemeral node {} to be in the list of children {}", createdEphemeralNode,
+          children);
+      throw new RuntimeException(
+          "Lock attempt ephemeral node no longer exist " + createdEphemeralNode);
+    }
+
+    if (children.get(0).equals(createdEphemeralNode)) {
+      LOG.debug("[{}] First candidate is my lock, acquiring...", vmLockPrefix);
       if (!watchingParent) {
         throw new IllegalStateException(
             "Can not acquire lock, no longer watching parent : " + path);
       }
       this.lockWatcher = lw;
-      this.lock = myLock;
-      asyncLock = null;
+      this.lockNodeName = createdEphemeralNode;
+      createdNodeName = null;
       lockWasAcquired = true;
       lw.acquiredLock();
-      return;
-    }
-    String prev = null;
-    for (String child : children) {
-      if (child.equals(myLock)) {
-        break;
-      }
+    } else {
+      LOG.debug("[{}] Lock held by another process with ephemeral node: {}", vmLockPrefix,
+          children.get(0));
 
-      prev = child;
-    }
+      String lowestPrevNode = findLowestPrevPrefix(children, createdEphemeralNode);
 
-    final String lockToWatch = path + "/" + prev;
-    log.trace("Establishing watch on {}", lockToWatch);
-    Stat stat = zooKeeper.getStatus(lockToWatch, new Watcher() {
+      watchingNodeName = path + "/" + lowestPrevNode;
+      final String nodeToWatch = watchingNodeName;
+      LOG.debug("[{}] Establishing watch on prior node {}", vmLockPrefix, nodeToWatch);
+      Watcher priorNodeWatcher = new Watcher() {
+        @Override
+        public void process(WatchedEvent event) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("[{}] Processing event:", vmLockPrefix);
+            LOG.trace("- type  {}", event.getType());
+            LOG.trace("- path  {}", event.getPath());
+            LOG.trace("- state {}", event.getState());
+          }
+          boolean renew = true;
+          if (event.getType() == EventType.NodeDeleted && event.getPath().equals(nodeToWatch)) {
+            LOG.debug("[{}] Detected deletion of prior node {}, attempting to acquire lock",
+                vmLockPrefix, nodeToWatch);
+            synchronized (ZooLock.this) {
+              try {
+                if (createdNodeName != null) {
+                  determineLockOwnership(createdEphemeralNode, lw);
+                } else if (LOG.isDebugEnabled()) {
+                  LOG.debug("[{}] While waiting for another lock {}, {} was deleted", vmLockPrefix,
+                      nodeToWatch, createdEphemeralNode);
+                }
+              } catch (Exception e) {
+                if (lockNodeName == null) {
+                  // have not acquired lock yet
+                  lw.failedToAcquireLock(e);
+                }
+              }
+            }
+            renew = false;
+          }
 
-      @Override
-      public void process(WatchedEvent event) {
-        if (log.isTraceEnabled()) {
-          log.trace("Processing event:");
-          log.trace("- type  {}", event.getType());
-          log.trace("- path  {}", event.getPath());
-          log.trace("- state {}", event.getState());
-        }
-        boolean renew = true;
-        if (event.getType() == EventType.NodeDeleted && event.getPath().equals(lockToWatch)) {
-          log.trace("Detected deletion of {}, attempting to acquire lock", lockToWatch);
-          synchronized (ZooLock.this) {
+          if (event.getState() == KeeperState.Expired
+              || event.getState() == KeeperState.Disconnected) {
+            synchronized (ZooLock.this) {
+              if (lockNodeName == null) {
+                LOG.info("Zookeeper Session expired / disconnected");
+                lw.failedToAcquireLock(new Exception("Zookeeper Session expired / disconnected"));
+              }
+            }
+            renew = false;
+          }
+          if (renew) {
+            LOG.debug("[{}] Renewing watch on prior node  {}", vmLockPrefix, nodeToWatch);
             try {
-              if (asyncLock != null) {
-                lockAsync(myLock, lw);
-              } else if (log.isTraceEnabled()) {
-                log.trace("While waiting for another lock {} {} was deleted", lockToWatch, myLock);
+              Stat restat = zooKeeper.getStatus(nodeToWatch, this);
+              if (restat == null) {
+                // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+                // created a Watcher on a node that does not exist. Delete the watcher we just
+                // created.
+                zooKeeper.getZooKeeper().removeWatches(nodeToWatch, this, WatcherType.Any, true);
+                determineLockOwnership(createdEphemeralNode, lw);
               }
-            } catch (Exception e) {
-              if (lock == null) {
-                // have not acquired lock yet
-                lw.failedToAcquireLock(e);
-              }
+            } catch (KeeperException | InterruptedException e) {
+              lw.failedToAcquireLock(new Exception("Failed to renew watch on other master node"));
             }
           }
-          renew = false;
         }
 
-        if (event.getState() == KeeperState.Expired
-            || event.getState() == KeeperState.Disconnected) {
-          synchronized (ZooLock.this) {
-            if (lock == null) {
-              lw.failedToAcquireLock(new Exception("Zookeeper Session expired / disconnected"));
-            }
-          }
-          renew = false;
-        }
-        if (renew) {
-          log.trace("Renewing watch on {}", lockToWatch);
-          try {
-            Stat restat = zooKeeper.getStatus(lockToWatch, this);
-            if (restat == null) {
-              lockAsync(myLock, lw);
-            }
-          } catch (KeeperException | InterruptedException e) {
-            lw.failedToAcquireLock(new Exception("Failed to renew watch on other master node"));
-          }
-        }
+      };
+
+      Stat stat = zooKeeper.getStatus(nodeToWatch, priorNodeWatcher);
+      if (stat == null) {
+        // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+        // created a Watcher on a node that does not exist. Delete the watcher we just created.
+        zooKeeper.getZooKeeper().removeWatches(nodeToWatch, priorNodeWatcher, WatcherType.Any,
+            true);
+        determineLockOwnership(createdEphemeralNode, lw);
       }
+    }
 
-    });
-
-    if (stat == null)
-      lockAsync(myLock, lw);
   }
 
   private void lostLock(LockLossReason reason) {
     LockWatcher localLw = lockWatcher;
-    lock = null;
+    lockNodeName = null;
     lockWatcher = null;
 
     localLw.lostLock(reason);
   }
 
-  public synchronized void lockAsync(final AsyncLockWatcher lw, byte[] data) {
+  public synchronized void lock(final AccumuloLockWatcher lw, byte[] data) {
 
-    if (lockWatcher != null || lock != null || asyncLock != null) {
+    if (lockWatcher != null || lockNodeName != null || createdNodeName != null) {
       throw new IllegalStateException();
     }
 
     lockWasAcquired = false;
 
     try {
-      final String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
-      log.trace("Ephemeral node {} created", asyncLockPath);
-      Stat stat = zooKeeper.getStatus(asyncLockPath, new Watcher() {
+      final String lockPathPrefix = path + "/" + vmLockPrefix.toString();
+      // Implement recipe at https://zookeeper.apache.org/doc/current/recipes.html#sc_recipes_Locks
+      // except that instead of the ephemeral lock node being of the form guid-lock- use lock-guid-.
+      // Another deviation from the recipe is that we cleanup any extraneous ephemeral nodes that
+      // were created.
+      final String createPath = zooKeeper.putEphemeralSequential(lockPathPrefix, data);
+      LOG.debug("[{}] Ephemeral node {} created", vmLockPrefix, createPath);
+
+      // It's possible that the call above was retried several times and multiple ephemeral nodes
+      // were created but the client missed the response for some reason. Find the ephemeral nodes
+      // with this ZLOCK_UUID and lowest sequential number.
+      List<String> children =
+          validateAndSortChildrenByLockPrefix(path, zooKeeper.getChildren(path));
+      if (null == children || !children.contains(createPath.substring(path.length() + 1))) {
+        LOG.error("Expected ephemeral node {} to be in the list of children {}", createPath,
+            children);
+        throw new RuntimeException("Lock attempt ephemeral node no longer exist " + createPath);
+      }
+
+      String lowestSequentialPath = null;
+      boolean msgLoggedOnce = false;
+      for (String child : children) {
+        if (child.startsWith(vmLockPrefix.toString())) {
+          if (null == lowestSequentialPath) {
+            if (createPath.equals(path + "/" + child)) {
+              // the path returned from create is the lowest sequential one
+              lowestSequentialPath = createPath;
+              break;
+            }
+            lowestSequentialPath = path + "/" + child;
+            LOG.debug("[{}] lowest sequential node found: {}", vmLockPrefix, lowestSequentialPath);
+          } else {
+            if (!msgLoggedOnce) {
+              LOG.info(
+                  "[{}] Zookeeper client missed server response, multiple ephemeral child nodes created at {}",
+                  vmLockPrefix, lockPathPrefix);
+              msgLoggedOnce = true;
+            }
+            LOG.debug("[{}] higher sequential node found: {}, deleting it", vmLockPrefix, child);
+            zooKeeper.delete(path + "/" + child);
+          }
+        }
+      }
+      final String pathForWatcher = lowestSequentialPath;
+
+      // Set a watcher on the lowest sequential node that we created, this handles the case
+      // where the node we created is deleted or if this client becomes disconnected.
+      LOG.debug("[{}] Setting watcher on {}", vmLockPrefix, pathForWatcher);
+      Watcher watcherForNodeWeCreated = new Watcher() {
 
         private void failedToAcquireLock() {
+          LOG.debug("[{}] Lock deleted before acquired, setting createdNodeName {} to null",
+              vmLockPrefix, createdNodeName);
           lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
-          asyncLock = null;
+          createdNodeName = null;
         }
 
         @Override
         public void process(WatchedEvent event) {
           synchronized (ZooLock.this) {
-            if (lock != null && event.getType() == EventType.NodeDeleted
-                && event.getPath().equals(path + "/" + lock)) {
+            if (lockNodeName != null && event.getType() == EventType.NodeDeleted
+                && event.getPath().equals(path + "/" + lockNodeName)) {
+              LOG.debug("[{}] {} was deleted", vmLockPrefix, lockNodeName);
               lostLock(LockLossReason.LOCK_DELETED);
-            } else if (asyncLock != null && event.getType() == EventType.NodeDeleted
-                && event.getPath().equals(path + "/" + asyncLock)) {
+            } else if (createdNodeName != null && event.getType() == EventType.NodeDeleted
+                && event.getPath().equals(path + "/" + createdNodeName)) {
+              LOG.debug("[{}] {} was deleted", vmLockPrefix, createdNodeName);
               failedToAcquireLock();
             } else if (event.getState() != KeeperState.Disconnected
-                && event.getState() != KeeperState.Expired && (lock != null || asyncLock != null)) {
-              log.debug("Unexpected event watching lock node {} {}", event, asyncLockPath);
+                && event.getState() != KeeperState.Expired
+                && (lockNodeName != null || createdNodeName != null)) {
+              LOG.debug("Unexpected event watching lock node {} {}", event, pathForWatcher);
               try {
-                Stat stat2 = zooKeeper.getStatus(asyncLockPath, this);
+                Stat stat2 = zooKeeper.getStatus(pathForWatcher, this);
                 if (stat2 == null) {
-                  if (lock != null)
+                  // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+                  // created a Watcher on a node that does not exist. Delete the watcher we just
+                  // created.
+                  zooKeeper.getZooKeeper().removeWatches(pathForWatcher, this, WatcherType.Any,
+                      true);
+
+                  if (lockNodeName != null)
                     lostLock(LockLossReason.LOCK_DELETED);
-                  else if (asyncLock != null)
+                  else if (createdNodeName != null)
                     failedToAcquireLock();
                 }
               } catch (Exception e) {
                 lockWatcher.unableToMonitorLockNode(e);
-                log.error("Failed to stat lock node " + asyncLockPath, e);
+                LOG.error("Failed to stat lock node: {} ", pathForWatcher, e);
               }
             }
 
           }
         }
-      });
+      };
 
+      Stat stat = zooKeeper.getStatus(pathForWatcher, watcherForNodeWeCreated);
       if (stat == null) {
+        // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+        // created a Watcher on a node that does not exist. Delete the watcher we just created.
+        zooKeeper.getZooKeeper().removeWatches(pathForWatcher, watcherForNodeWeCreated,
+            WatcherType.Any, true);
         lw.failedToAcquireLock(new Exception("Lock does not exist after create"));
         return;
       }
 
-      asyncLock = asyncLockPath.substring(path.length() + 1);
+      createdNodeName = pathForWatcher.substring(path.length() + 1);
 
-      lockAsync(asyncLock, lw);
+      // We have created a node, do we own the lock?
+      determineLockOwnership(createdNodeName, lw);
 
     } catch (KeeperException | InterruptedException e) {
       lw.failedToAcquireLock(e);
@@ -316,12 +515,15 @@
       throws InterruptedException, KeeperException {
     boolean del = false;
 
-    if (asyncLock != null) {
-      zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP);
+    if (createdNodeName != null) {
+      String pathToDelete = path + "/" + createdNodeName;
+      LOG.debug("[{}] Deleting all at path {} due to lock cancellation", vmLockPrefix,
+          pathToDelete);
+      zooKeeper.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
       del = true;
     }
 
-    if (lock != null) {
+    if (lockNodeName != null) {
       unlock();
       del = true;
     }
@@ -330,37 +532,46 @@
   }
 
   public synchronized void unlock() throws InterruptedException, KeeperException {
-    if (lock == null) {
+    if (lockNodeName == null) {
       throw new IllegalStateException();
     }
 
     LockWatcher localLw = lockWatcher;
-    String localLock = lock;
+    String localLock = lockNodeName;
 
-    lock = null;
+    lockNodeName = null;
     lockWatcher = null;
 
-    zooKeeper.recursiveDelete(path + "/" + localLock, NodeMissingPolicy.SKIP);
+    final String pathToDelete = path + "/" + localLock;
+    LOG.debug("[{}] Deleting all at path {} due to unlock", vmLockPrefix, pathToDelete);
+    zooKeeper.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
 
     localLw.lostLock(LockLossReason.LOCK_DELETED);
   }
 
+  /**
+   * @return path of node that this lock is watching
+   */
+  public synchronized String getWatching() {
+    return watchingNodeName;
+  }
+
   public synchronized String getLockPath() {
-    if (lock == null) {
+    if (lockNodeName == null) {
       return null;
     }
-    return path + "/" + lock;
+    return path + "/" + lockNodeName;
   }
 
   public synchronized String getLockName() {
-    return lock;
+    return lockNodeName;
   }
 
   public synchronized LockID getLockID() {
-    if (lock == null) {
+    if (lockNodeName == null) {
       throw new IllegalStateException("Lock not held");
     }
-    return new LockID(path, lock, zooKeeper.getZooKeeper().getSessionId());
+    return new LockID(path, lockNodeName, zooKeeper.getZooKeeper().getSessionId());
   }
 
   /**
@@ -374,7 +585,7 @@
   }
 
   public synchronized boolean isLocked() {
-    return lock != null;
+    return lockNodeName != null;
   }
 
   public synchronized void replaceLockData(byte[] b) throws KeeperException, InterruptedException {
@@ -384,11 +595,13 @@
 
   @Override
   public synchronized void process(WatchedEvent event) {
-    log.debug("event {} {} {}", event.getPath(), event.getType(), event.getState());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("event {} {} {}", event.getPath(), event.getType(), event.getState());
+    }
 
     watchingParent = false;
 
-    if (event.getState() == KeeperState.Expired && lock != null) {
+    if (event.getState() == KeeperState.Expired && lockNodeName != null) {
       lostLock(LockLossReason.SESSION_EXPIRED);
     } else {
 
@@ -397,12 +610,12 @@
         watchingParent = true;
       } catch (KeeperException.ConnectionLossException ex) {
         // we can't look at the lock because we aren't connected, but our session is still good
-        log.warn("lost connection to zookeeper");
+        LOG.warn("lost connection to zookeeper", ex);
       } catch (Exception ex) {
-        if (lock != null || asyncLock != null) {
+        if (lockNodeName != null || createdNodeName != null) {
           lockWatcher.unableToMonitorLockNode(ex);
-          log.error("Error resetting watch on ZooLock {} {}", lock != null ? lock : asyncLock,
-              event, ex);
+          LOG.error("Error resetting watch on ZooLock {} {}",
+              lockNodeName != null ? lockNodeName : createdNodeName, event, ex);
         }
       }
 
@@ -412,15 +625,12 @@
 
   public static boolean isLockHeld(ZooCache zc, LockID lid) {
 
-    List<String> children = zc.getChildren(lid.path);
+    List<String> children = validateAndSortChildrenByLockPrefix(lid.path, zc.getChildren(lid.path));
 
     if (children == null || children.isEmpty()) {
       return false;
     }
 
-    children = new ArrayList<>(children);
-    Collections.sort(children);
-
     String lockNode = children.get(0);
     if (!lid.node.equals(lockNode))
       return false;
@@ -431,14 +641,13 @@
 
   public static byte[] getLockData(ZooKeeper zk, String path)
       throws KeeperException, InterruptedException {
-    List<String> children = zk.getChildren(path, false);
+
+    List<String> children = validateAndSortChildrenByLockPrefix(path, zk.getChildren(path, false));
 
     if (children == null || children.isEmpty()) {
       return null;
     }
 
-    Collections.sort(children);
-
     String lockNode = children.get(0);
 
     return zk.getData(path + "/" + lockNode, false, null);
@@ -447,18 +656,15 @@
   public static byte[] getLockData(org.apache.accumulo.fate.zookeeper.ZooCache zc, String path,
       ZcStat stat) {
 
-    List<String> children = zc.getChildren(path);
+    List<String> children = validateAndSortChildrenByLockPrefix(path, zc.getChildren(path));
 
     if (children == null || children.isEmpty()) {
       return null;
     }
 
-    children = new ArrayList<>(children);
-    Collections.sort(children);
-
     String lockNode = children.get(0);
 
-    if (!lockNode.startsWith(LOCK_PREFIX)) {
+    if (!lockNode.startsWith(ZLOCK_PREFIX)) {
       throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
     }
 
@@ -466,15 +672,13 @@
   }
 
   public static long getSessionId(ZooCache zc, String path) {
-    List<String> children = zc.getChildren(path);
+
+    List<String> children = validateAndSortChildrenByLockPrefix(path, zc.getChildren(path));
 
     if (children == null || children.isEmpty()) {
       return 0;
     }
 
-    children = new ArrayList<>(children);
-    Collections.sort(children);
-
     String lockNode = children.get(0);
 
     ZcStat stat = new ZcStat();
@@ -483,56 +687,52 @@
     return 0;
   }
 
-  private static ZooCache getLockDataZooCache;
-
   public long getSessionId() throws KeeperException, InterruptedException {
-    return getSessionId(getLockDataZooCache, path);
+    return getSessionId(LOCK_DATA_ZOO_CACHE, path);
   }
 
   public static void deleteLock(ZooReaderWriter zk, String path)
       throws InterruptedException, KeeperException {
-    List<String> children;
 
-    children = zk.getChildren(path);
+    List<String> children = validateAndSortChildrenByLockPrefix(path, zk.getChildren(path));
 
     if (children == null || children.isEmpty()) {
       throw new IllegalStateException("No lock is held at " + path);
     }
 
-    Collections.sort(children);
-
     String lockNode = children.get(0);
 
-    if (!lockNode.startsWith(LOCK_PREFIX)) {
+    if (!lockNode.startsWith(ZLOCK_PREFIX)) {
       throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
     }
 
-    zk.recursiveDelete(path + "/" + lockNode, NodeMissingPolicy.SKIP);
+    String pathToDelete = path + "/" + lockNode;
+    LOG.debug("Deleting all at path {} due to lock deletion", pathToDelete);
+    zk.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
 
   }
 
   public static boolean deleteLock(ZooReaderWriter zk, String path, String lockData)
       throws InterruptedException, KeeperException {
-    List<String> children;
 
-    children = zk.getChildren(path);
+    List<String> children = validateAndSortChildrenByLockPrefix(path, zk.getChildren(path));
 
     if (children == null || children.isEmpty()) {
       throw new IllegalStateException("No lock is held at " + path);
     }
 
-    Collections.sort(children);
-
     String lockNode = children.get(0);
 
-    if (!lockNode.startsWith(LOCK_PREFIX)) {
+    if (!lockNode.startsWith(ZLOCK_PREFIX)) {
       throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
     }
 
     byte[] data = zk.getData(path + "/" + lockNode);
 
     if (lockData.equals(new String(data, UTF_8))) {
-      zk.recursiveDelete(path + "/" + lockNode, NodeMissingPolicy.FAIL);
+      String pathToDelete = path + "/" + lockNode;
+      LOG.debug("Deleting all at path {} due to lock deletion", pathToDelete);
+      zk.recursiveDelete(pathToDelete, NodeMissingPolicy.FAIL);
       return true;
     }
 
diff --git a/core/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java b/core/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
new file mode 100644
index 0000000..8d46c09
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+public class ZooLockTest {
+
+  @Test
+  public void testSortAndFindLowestPrevPrefix() throws Exception {
+    List<String> children = new ArrayList<>();
+    children.add("zlock#00000000-0000-0000-0000-ffffffffffff#0000000007");
+    children.add("zlock#00000000-0000-0000-0000-eeeeeeeeeeee#0000000010");
+    children.add("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000006");
+    children.add("zlock#00000000-0000-0000-0000-dddddddddddd#0000000008");
+    children.add("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000004");
+    children.add("zlock-123456789");
+    children.add("zlock#00000000-0000-0000-0000-cccccccccccc#0000000003");
+    children.add("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000002");
+    children.add("zlock#987654321");
+    children.add("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001");
+
+    final List<String> validChildren = ZooLock.validateAndSortChildrenByLockPrefix("", children);
+
+    assertEquals(8, validChildren.size());
+    assertEquals("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001", validChildren.get(0));
+    assertEquals("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000002", validChildren.get(1));
+    assertEquals("zlock#00000000-0000-0000-0000-cccccccccccc#0000000003", validChildren.get(2));
+    assertEquals("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000004", validChildren.get(3));
+    assertEquals("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000006", validChildren.get(4));
+    assertEquals("zlock#00000000-0000-0000-0000-ffffffffffff#0000000007", validChildren.get(5));
+    assertEquals("zlock#00000000-0000-0000-0000-dddddddddddd#0000000008", validChildren.get(6));
+    assertEquals("zlock#00000000-0000-0000-0000-eeeeeeeeeeee#0000000010", validChildren.get(7));
+
+    assertEquals("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000004",
+        ZooLock.findLowestPrevPrefix(validChildren,
+            "zlock#00000000-0000-0000-0000-ffffffffffff#0000000007"));
+
+    assertEquals("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001",
+        ZooLock.findLowestPrevPrefix(validChildren,
+            "zlock#00000000-0000-0000-0000-cccccccccccc#0000000003"));
+
+    assertEquals("zlock#00000000-0000-0000-0000-dddddddddddd#0000000008",
+        ZooLock.findLowestPrevPrefix(validChildren,
+            "zlock#00000000-0000-0000-0000-eeeeeeeeeeee#0000000010"));
+
+    assertThrows(IndexOutOfBoundsException.class, () -> {
+      ZooLock.findLowestPrevPrefix(validChildren,
+          "zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001");
+    });
+
+    assertThrows(IndexOutOfBoundsException.class, () -> {
+      ZooLock.findLowestPrevPrefix(validChildren,
+          "zlock#00000000-0000-0000-0000-XXXXXXXXXXXX#0000000099");
+    });
+  }
+
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
index b6d55b6..7574bb4 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
@@ -56,9 +56,11 @@
     final long session = 123456789L;
 
     String serverPath = root + "/" + server;
-    EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.singletonList("child"));
-    EasyMock.expect(zc.get(EasyMock.eq(serverPath + "/child"), EasyMock.anyObject(ZcStat.class)))
-        .andAnswer(() -> {
+    String validZLockEphemeralNode = "zlock#" + UUID.randomUUID().toString() + "#0000000000";
+    EasyMock.expect(zc.getChildren(serverPath))
+        .andReturn(Collections.singletonList(validZLockEphemeralNode));
+    EasyMock.expect(zc.get(EasyMock.eq(serverPath + "/" + validZLockEphemeralNode),
+        EasyMock.anyObject(ZcStat.class))).andAnswer(() -> {
           ZcStat stat = (ZcStat) EasyMock.getCurrentArguments()[1];
           stat.setEphemeralOwner(session);
           return new byte[0];
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index b7a8fc8..954c1f2 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -33,6 +33,7 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
@@ -632,8 +633,9 @@
       }
     };
 
+    UUID zooLockUUID = UUID.randomUUID();
     while (true) {
-      lock = new ZooLock(getContext().getZooReaderWriter(), path);
+      lock = new ZooLock(getContext().getZooReaderWriter(), path, zooLockUUID);
       if (lock.tryLock(lockWatcher,
           new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes())) {
         log.debug("Got GC ZooKeeper lock");
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/Master.java b/server/manager/src/main/java/org/apache/accumulo/master/Master.java
index cf2ec31..c36e42e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/Master.java
@@ -35,6 +35,7 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -1343,7 +1344,7 @@
     return masterLock;
   }
 
-  private static class MasterLockWatcher implements ZooLock.AsyncLockWatcher {
+  private static class MasterLockWatcher implements ZooLock.AccumuloLockWatcher {
 
     boolean acquiredLock = false;
     boolean failedToAcquireLock = false;
@@ -1407,11 +1408,12 @@
     final String masterClientAddress =
         getHostname() + ":" + getConfiguration().getPort(Property.MANAGER_CLIENTPORT)[0];
 
+    UUID zooLockUUID = UUID.randomUUID();
     while (true) {
 
       MasterLockWatcher masterLockWatcher = new MasterLockWatcher();
-      masterLock = new ZooLock(context.getZooReaderWriter(), zMasterLoc);
-      masterLock.lockAsync(masterLockWatcher, masterClientAddress.getBytes());
+      masterLock = new ZooLock(context.getZooReaderWriter(), zMasterLoc, zooLockUUID);
+      masterLock.lock(masterLockWatcher, masterClientAddress.getBytes());
 
       masterLockWatcher.waitForChange();
 
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index df60ab1..51aecc1 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -35,6 +35,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -627,10 +628,11 @@
     }
 
     // Get a ZooLock for the monitor
+    UUID zooLockUUID = UUID.randomUUID();
     while (true) {
       MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher();
-      monitorLock = new ZooLock(zoo, monitorLockPath);
-      monitorLock.lockAsync(monitorLockWatcher, new byte[0]);
+      monitorLock = new ZooLock(zoo, monitorLockPath, zooLockUUID);
+      monitorLock.lock(monitorLockWatcher, new byte[0]);
 
       monitorLockWatcher.waitForChange();
 
@@ -655,7 +657,7 @@
   /**
    * Async Watcher for monitor lock
    */
-  private static class MoniterLockWatcher implements ZooLock.AsyncLockWatcher {
+  private static class MoniterLockWatcher implements ZooLock.AccumuloLockWatcher {
 
     boolean acquiredLock = false;
     boolean failedToAcquireLock = false;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index bd0ec23..70cad33 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -43,6 +43,7 @@
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -639,7 +640,7 @@
         throw e;
       }
 
-      tabletServerLock = new ZooLock(zoo, zPath);
+      tabletServerLock = new ZooLock(zoo, zPath, UUID.randomUUID());
 
       LockWatcher lw = new LockWatcher() {
 
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooLockIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooLockIT.java
index d3cb5e3..6629a0f 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooLockIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooLockIT.java
@@ -25,25 +25,33 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.accumulo.fate.zookeeper.ZooLock;
-import org.apache.accumulo.fate.zookeeper.ZooLock.AsyncLockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooLock.AccumuloLockWatcher;
 import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ZooLockIT extends SharedMiniClusterBase {
 
@@ -57,6 +65,56 @@
     SharedMiniClusterBase.stopMiniCluster();
   }
 
+  static class ZooKeeperWrapper extends ZooKeeper {
+
+    public ZooKeeperWrapper(String connectString, int sessionTimeout, Watcher watcher)
+        throws IOException {
+      super(connectString, sessionTimeout, watcher);
+    }
+
+    public String createOnce(String path, byte[] data, List<ACL> acl, CreateMode createMode)
+        throws KeeperException, InterruptedException {
+      return super.create(path, data, acl, createMode);
+    }
+
+    @Override
+    public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
+        throws KeeperException, InterruptedException {
+      // Let's simulate that the first call succeeded but the client didn't get the message,
+      // so the ZooKeeper client retries.
+      super.create(path, data, acl, createMode);
+      return super.create(path, data, acl, createMode);
+    }
+
+  }
+
+  static class RetryLockWatcher implements AccumuloLockWatcher {
+
+    private boolean lockHeld = false;
+
+    @Override
+    public void lostLock(LockLossReason reason) {
+      this.lockHeld = false;
+    }
+
+    @Override
+    public void unableToMonitorLockNode(final Exception e) {}
+
+    @Override
+    public void acquiredLock() {
+      this.lockHeld = true;
+    }
+
+    @Override
+    public void failedToAcquireLock(Exception e) {
+      this.lockHeld = false;
+    }
+
+    public boolean isLockHeld() {
+      return this.lockHeld;
+    }
+  }
+
   static class ConnectedWatcher implements Watcher {
     volatile boolean connected = false;
 
@@ -70,7 +128,7 @@
     }
   }
 
-  static class TestALW implements AsyncLockWatcher {
+  static class TestALW implements AccumuloLockWatcher {
 
     LockLossReason reason = null;
     boolean locked = false;
@@ -115,9 +173,10 @@
 
   @Test(timeout = 10000)
   public void testDeleteParent() throws Exception {
-    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+    String parent = "/zltestDeleteParent-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
 
-    ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+    ZooLock zl =
+        new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
 
     assertFalse(zl.isLocked());
 
@@ -132,7 +191,7 @@
 
     TestALW lw = new TestALW();
 
-    zl.lockAsync(lw, "test1".getBytes(UTF_8));
+    zl.lock(lw, "test1".getBytes(UTF_8));
 
     lw.waitForChanges(1);
 
@@ -146,15 +205,16 @@
 
   @Test(timeout = 10000)
   public void testNoParent() throws Exception {
-    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+    String parent = "/zltestNoParent-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
 
-    ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+    ZooLock zl =
+        new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
 
     assertFalse(zl.isLocked());
 
     TestALW lw = new TestALW();
 
-    zl.lockAsync(lw, "test1".getBytes(UTF_8));
+    zl.lock(lw, "test1".getBytes(UTF_8));
 
     lw.waitForChanges(1);
 
@@ -166,18 +226,19 @@
 
   @Test(timeout = 10000)
   public void testDeleteLock() throws Exception {
-    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+    String parent = "/zltestDeleteLock-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
 
     ZooReaderWriter zk = new ZooReaderWriter(getCluster().getZooKeepers(), 30000, "secret");
     zk.mkdirs(parent);
 
-    ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+    ZooLock zl =
+        new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
 
     assertFalse(zl.isLocked());
 
     TestALW lw = new TestALW();
 
-    zl.lockAsync(lw, "test1".getBytes(UTF_8));
+    zl.lock(lw, "test1".getBytes(UTF_8));
 
     lw.waitForChanges(1);
 
@@ -195,20 +256,21 @@
 
   }
 
-  @Test(timeout = 10000)
+  @Test(timeout = 15000)
   public void testDeleteWaiting() throws Exception {
-    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+    String parent = "/zltestDeleteWaiting-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
 
     ZooReaderWriter zk = new ZooReaderWriter(getCluster().getZooKeepers(), 30000, "secret");
     zk.mkdirs(parent);
 
-    ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+    ZooLock zl =
+        new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
 
     assertFalse(zl.isLocked());
 
     TestALW lw = new TestALW();
 
-    zl.lockAsync(lw, "test1".getBytes(UTF_8));
+    zl.lock(lw, "test1".getBytes(UTF_8));
 
     lw.waitForChanges(1);
 
@@ -217,23 +279,25 @@
     assertNull(lw.exception);
     assertNull(lw.reason);
 
-    ZooLock zl2 = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+    ZooLock zl2 =
+        new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
 
     TestALW lw2 = new TestALW();
 
-    zl2.lockAsync(lw2, "test2".getBytes(UTF_8));
+    zl2.lock(lw2, "test2".getBytes(UTF_8));
 
     assertFalse(lw2.locked);
     assertFalse(zl2.isLocked());
 
-    ZooLock zl3 = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+    ZooLock zl3 =
+        new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
 
     TestALW lw3 = new TestALW();
 
-    zl3.lockAsync(lw3, "test3".getBytes(UTF_8));
+    zl3.lock(lw3, "test3".getBytes(UTF_8));
 
-    List<String> children = zk.getChildren(parent);
-    Collections.sort(children);
+    List<String> children =
+        ZooLock.validateAndSortChildrenByLockPrefix(parent, zk.getChildren(parent));
 
     zk.delete(parent + "/" + children.get(1));
 
@@ -263,7 +327,7 @@
 
   @Test(timeout = 10000)
   public void testUnexpectedEvent() throws Exception {
-    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+    String parent = "/zltestUnexpectedEvent-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
 
     ConnectedWatcher watcher = new ConnectedWatcher();
     try (ZooKeeper zk = new ZooKeeper(getCluster().getZooKeepers(), 30000, watcher)) {
@@ -275,7 +339,8 @@
 
       zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
-      ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+      ZooLock zl =
+          new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
 
       assertFalse(zl.isLocked());
 
@@ -284,7 +349,7 @@
 
       TestALW lw = new TestALW();
 
-      zl.lockAsync(lw, "test1".getBytes(UTF_8));
+      zl.lock(lw, "test1".getBytes(UTF_8));
 
       lw.waitForChanges(1);
 
@@ -306,11 +371,262 @@
 
   }
 
+  @Test(timeout = 60000)
+  public void testLockSerial() throws Exception {
+    String parent = "/zlretryLockSerial";
+
+    ConnectedWatcher watcher = new ConnectedWatcher();
+    try (ZooKeeperWrapper zk1 = new ZooKeeperWrapper(getCluster().getZooKeepers(), 30000, watcher);
+        ZooKeeperWrapper zk2 = new ZooKeeperWrapper(getCluster().getZooKeepers(), 30000, watcher)) {
+
+      zk1.addAuthInfo("digest", "accumulo:secret".getBytes(UTF_8));
+      zk2.addAuthInfo("digest", "accumulo:secret".getBytes(UTF_8));
+
+      while (!watcher.isConnected()) {
+        Thread.sleep(200);
+      }
+
+      // Create the parent node
+      zk1.createOnce(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+      ZooReaderWriter zrw1 = new ZooReaderWriter(getCluster().getZooKeepers(), 30000, "secret") {
+        @Override
+        public ZooKeeper getZooKeeper() {
+          return zk1;
+        }
+      };
+
+      final RetryLockWatcher zlw1 = new RetryLockWatcher();
+      ZooLock zl1 =
+          new ZooLock(zrw1, parent, UUID.fromString("00000000-0000-0000-0000-aaaaaaaaaaaa"));
+      zl1.lock(zlw1, "test1".getBytes(UTF_8));
+      // The call above creates two nodes in ZK because of the overridden create method in
+      // ZooKeeperWrapper.
+      // The nodes created are:
+      // zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000
+      // zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001
+      //
+      // ZooLock should realize this and remove the latter one and place a watcher on the first one
+      // in case the ZooKeeper ephemeral node is deleted by some external process.
+      // Lastly, because zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000 is the first child,
+      // zl1 assumes that it has the lock.
+
+      ZooReaderWriter zrw2 = new ZooReaderWriter(getCluster().getZooKeepers(), 30000, "secret") {
+        @Override
+        public ZooKeeper getZooKeeper() {
+          return zk2;
+        }
+      };
+
+      final RetryLockWatcher zlw2 = new RetryLockWatcher();
+      ZooLock zl2 =
+          new ZooLock(zrw2, parent, UUID.fromString("00000000-0000-0000-0000-bbbbbbbbbbbb"));
+      zl2.lock(zlw2, "test1".getBytes(UTF_8));
+      // The call above creates two nodes in ZK because of the overridden create method in
+      // ZooKeeperWrapper.
+      // The nodes created are:
+      // zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000002
+      // zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000003
+      //
+      // ZooLock should realize this and remove the latter one and place a watcher on the first one
+      // in case
+      // the ZooKeeper ephemeral node is deleted by some external process.
+      // Because zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000002 is not the first child in the
+      // list, it places a watcher on zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000
+      // so that it may try to acquire the lock when
+      // zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000 is removed.
+
+      assertTrue(zlw1.isLockHeld());
+      assertFalse(zlw2.isLockHeld());
+
+      List<String> children = zk1.getChildren(parent, false);
+      assertTrue(children.contains("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000"));
+      assertFalse("this node should have been deleted",
+          children.contains("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001"));
+      assertTrue(children.contains("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000002"));
+      assertFalse("this node should have been deleted",
+          children.contains("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000003"));
+
+      assertNull(zl1.getWatching());
+      assertEquals("/zlretryLockSerial/zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000",
+          zl2.getWatching());
+
+      zl1.unlock();
+      assertFalse(zlw1.isLockHeld());
+      zk1.close();
+
+      while (!zlw2.isLockHeld()) {
+        LockSupport.parkNanos(50);
+      }
+
+      assertTrue(zlw2.isLockHeld());
+      zl2.unlock();
+      zk2.close();
+
+    }
+
+  }
+
+  static class LockWorker implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LockWorker.class);
+
+    private final String parent;
+    private final UUID uuid;
+    private final CountDownLatch getLockLatch;
+    private final CountDownLatch lockCompletedLatch;
+    private final CountDownLatch unlockLatch = new CountDownLatch(1);
+    private final RetryLockWatcher lockWatcher = new RetryLockWatcher();
+    private volatile Exception ex = null;
+
+    public LockWorker(final String parent, final UUID uuid, final CountDownLatch lockLatch,
+        final CountDownLatch lockCompletedLatch) {
+      this.parent = parent;
+      this.uuid = uuid;
+      this.getLockLatch = lockLatch;
+      this.lockCompletedLatch = lockCompletedLatch;
+    }
+
+    public void unlock() {
+      unlockLatch.countDown();
+    }
+
+    public boolean holdsLock() {
+      return lockWatcher.isLockHeld();
+    }
+
+    @Override
+    public void run() {
+      try {
+        ConnectedWatcher watcher = new ConnectedWatcher();
+        try (ZooKeeperWrapper zk =
+            new ZooKeeperWrapper(getCluster().getZooKeepers(), 30000, watcher)) {
+          zk.addAuthInfo("digest", "accumulo:secret".getBytes(UTF_8));
+          while (!watcher.isConnected()) {
+            Thread.sleep(50);
+          }
+          ZooReaderWriter zrw = new ZooReaderWriter(getCluster().getZooKeepers(), 30000, "secret") {
+            @Override
+            public ZooKeeper getZooKeeper() {
+              return zk;
+            }
+          };
+          ZooLock zl = new ZooLock(zrw, parent, uuid);
+          getLockLatch.countDown(); // signal we are done
+          getLockLatch.await(); // wait for others to finish
+          zl.lock(lockWatcher, "test1".getBytes(UTF_8)); // race to the lock
+          lockCompletedLatch.countDown();
+          unlockLatch.await();
+          zl.unlock();
+        }
+      } catch (Exception e) {
+        LOG.error("Error in LockWorker.run() for {}", uuid, e);
+        ex = e;
+      }
+    }
+
+    public Throwable getException() {
+      return ex;
+    }
+
+    @Override
+    public String toString() {
+      return "LockWorker [name=" + uuid + ", holdsLock()=" + holdsLock() + "]";
+    }
+
+  }
+
+  private int parseLockWorkerName(String child) {
+    if (child.startsWith("zlock#00000000-0000-0000-0000-000000000000#")) {
+      return 0;
+    } else if (child.startsWith("zlock#00000000-0000-0000-0000-111111111111#")) {
+      return 1;
+    } else if (child.startsWith("zlock#00000000-0000-0000-0000-222222222222#")) {
+      return 2;
+    } else if (child.startsWith("zlock#00000000-0000-0000-0000-333333333333#")) {
+      return 3;
+    } else {
+      return -1;
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testLockParallel() throws Exception {
+    String parent = "/zlParallel";
+
+    ConnectedWatcher watcher = new ConnectedWatcher();
+    try (ZooKeeperWrapper zk = new ZooKeeperWrapper(getCluster().getZooKeepers(), 30000, watcher)) {
+      zk.addAuthInfo("digest", "accumulo:secret".getBytes(UTF_8));
+
+      while (!watcher.isConnected()) {
+        Thread.sleep(50);
+      }
+      // Create the parent node
+      zk.createOnce(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+      int numWorkers = 4;
+      final CountDownLatch getLockLatch = new CountDownLatch(numWorkers);
+      final CountDownLatch lockFinishedLatch = new CountDownLatch(numWorkers);
+      final List<LockWorker> workers = new ArrayList<>(numWorkers);
+      final List<Thread> threads = new ArrayList<>(numWorkers);
+      for (int i = 0; i < numWorkers; i++) {
+        UUID uuid = UUID.fromString(
+            "00000000-0000-0000-0000-aaaaaaaaaaaa".replaceAll("a", Integer.toString(i)));
+        LockWorker w = new LockWorker(parent, uuid, getLockLatch, lockFinishedLatch);
+        Thread t = new Thread(w);
+        workers.add(w);
+        threads.add(t);
+        t.start();
+      }
+
+      workers.forEach(w -> assertNull(w.getException()));
+      getLockLatch.await(); // Threads compete for lock
+      workers.forEach(w -> assertNull(w.getException()));
+      lockFinishedLatch.await(); // Threads lock logic complete
+      workers.forEach(w -> assertNull(w.getException()));
+
+      for (int i = 4; i > 0; i--) {
+        List<String> children =
+            ZooLock.validateAndSortChildrenByLockPrefix(parent, zk.getChildren(parent, false));
+        while (children.size() != i) {
+          Thread.sleep(100);
+          children = zk.getChildren(parent, false);
+        }
+        assertEquals(i, children.size());
+        String first = children.get(0);
+        int workerWithLock = parseLockWorkerName(first);
+        LockWorker worker = workers.get(workerWithLock);
+        assertTrue(worker.holdsLock());
+        workers.forEach(w -> {
+          if (w != worker) {
+            assertFalse(w.holdsLock());
+          }
+        });
+        worker.unlock();
+        Thread.sleep(100); // need to wait here so that the watchers fire.
+      }
+
+      workers.forEach(w -> assertFalse(w.holdsLock()));
+      workers.forEach(w -> assertNull(w.getException()));
+      assertEquals(0, zk.getChildren(parent, false).size());
+
+      threads.forEach(t -> {
+        try {
+          t.join();
+        } catch (InterruptedException e) {
+          // ignore
+        }
+      });
+    }
+
+  }
+
   @Test(timeout = 10000)
   public void testTryLock() throws Exception {
-    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+    String parent = "/zltestTryLock-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
 
-    ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+    ZooLock zl =
+        new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
 
     ConnectedWatcher watcher = new ConnectedWatcher();
     try (ZooKeeper zk = new ZooKeeper(getCluster().getZooKeepers(), 30000, watcher)) {
@@ -346,7 +662,7 @@
 
   @Test(timeout = 10000)
   public void testChangeData() throws Exception {
-    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+    String parent = "/zltestChangeData-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
     ConnectedWatcher watcher = new ConnectedWatcher();
     try (ZooKeeper zk = new ZooKeeper(getCluster().getZooKeepers(), 30000, watcher)) {
       zk.addAuthInfo("digest", "accumulo:secret".getBytes(UTF_8));
@@ -357,11 +673,12 @@
 
       zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
-      ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+      ZooLock zl =
+          new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
 
       TestALW lw = new TestALW();
 
-      zl.lockAsync(lw, "test1".getBytes(UTF_8));
+      zl.lock(lw, "test1".getBytes(UTF_8));
       assertEquals("test1", new String(zk.getData(zl.getLockPath(), null, null)));
 
       zl.replaceLockData("test2".getBytes(UTF_8));
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
index a43f2ba..a3d302f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
@@ -32,6 +32,7 @@
 import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.UUID;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Scanner;
@@ -94,7 +95,7 @@
     String zPath = c.getZooKeeperRoot() + "/testLock";
     ZooReaderWriter zoo = c.getZooReaderWriter();
     zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.OVERWRITE);
-    ZooLock zl = new ZooLock(zoo, zPath);
+    ZooLock zl = new ZooLock(zoo, zPath, UUID.randomUUID());
     boolean gotLock = zl.tryLock(new LockWatcher() {
 
       @SuppressFBWarnings(value = "DM_EXIT",
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 658b721..63e803a 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -24,6 +24,7 @@
 import java.security.SecureRandom;
 import java.util.HashMap;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
@@ -119,7 +120,7 @@
     ZooReaderWriter zoo = context.getZooReaderWriter();
     zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
 
-    ZooLock zlock = new ZooLock(zoo, zPath);
+    ZooLock zlock = new ZooLock(zoo, zPath, UUID.randomUUID());
 
     LockWatcher lw = new LockWatcher() {