DL-86: Improve handling of lock conflicts in zk session lock

merge twitter's change from Sijie Guo.

Author: Jordan Bull <jbull@twitter.com>
Author: Sijie Guo <sijieg@twitter.com>

Reviewers: Leigh Stewart <lstewart@apache.org>

Closes #58 from sijie/merge/DL-86
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
index 87894dc..dc57d55 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
@@ -480,6 +480,31 @@
         return id;
     }
 
+    static boolean areLockWaitersInSameSession(String node1, String node2) {
+        String[] parts1 = node1.split("_");
+        String[] parts2 = node2.split("_");
+        if (parts1.length != 4 || parts2.length != 4) {
+            return node1.equals(node2);
+        }
+        if (!parts1[2].startsWith("s") || !parts2[2].startsWith("s")) {
+            return node1.equals(node2);
+        }
+        long sessionOwner1 = Long.parseLong(parts1[2].substring(1));
+        long sessionOwner2 = Long.parseLong(parts2[2].substring(1));
+        if (sessionOwner1 != sessionOwner2) {
+            return false;
+        }
+        String clientId1, clientId2;
+        try {
+            clientId1 = URLDecoder.decode(parts1[1], UTF_8.name());
+            clientId2 = URLDecoder.decode(parts2[1], UTF_8.name());
+            return clientId1.equals(clientId2);
+        } catch (UnsupportedEncodingException e) {
+            // if failed to parse client id, we have to get client id by zookeeper#getData.
+            return node1.equals(node2);
+        }
+    }
+
     /**
      * Get client id and its ephemeral owner.
      *
@@ -1209,17 +1234,19 @@
             @Override
             public void execute() {
                 boolean shouldWatch;
+                final boolean shouldClaimOwnership;
                 if (lockContext.hasLockId(currentOwner) && siblingNode.equals(ownerNode)) {
                     // if the current owner is the znode left from previous session
                     // we should watch it and claim ownership
                     shouldWatch = true;
+                    shouldClaimOwnership = true;
                     LOG.info("LockWatcher {} for {} found its previous session {} held lock, watch it to claim ownership.",
                             new Object[] { myNode, lockPath, currentOwner });
-                } else if (lockId.compareTo(currentOwner) == 0 && siblingNode.equals(ownerNode)) {
+                } else if (lockId.compareTo(currentOwner) == 0 && areLockWaitersInSameSession(siblingNode, ownerNode)) {
                     // I found that my sibling is the current owner with same lock id (client id & session id)
                     // It must be left by any race condition from same zookeeper client
-                    // I would watch owner instead of sibling
                     shouldWatch = true;
+                    shouldClaimOwnership = true;
                     LOG.info("LockWatcher {} for {} found itself {} already held lock at sibling node {}, watch it to claim ownership.",
                             new Object[]{myNode, lockPath, lockId, siblingNode});
                 } else {
@@ -1230,6 +1257,7 @@
                                     new Object[]{lockPath, myNode, siblingNode, System.currentTimeMillis()});
                         }
                     }
+                    shouldClaimOwnership = false;
                 }
 
                 // watch sibling for lock ownership
@@ -1247,8 +1275,7 @@
                                     }
 
                                     if (KeeperException.Code.OK.intValue() == rc) {
-                                        if (siblingNode.equals(ownerNode) &&
-                                                (lockId.compareTo(currentOwner) == 0 || lockContext.hasLockId(currentOwner))) {
+                                        if (shouldClaimOwnership) {
                                             // watch owner successfully
                                             LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.",
                                                     new Object[]{ myNode, lockPath, ownerNode });
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
index 629538e..054d714 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
@@ -180,6 +180,28 @@
     }
 
     @Test(timeout = 60000)
+    public void testAreLockWaitersInSameSession() throws Exception {
+        ZooKeeper zk = zkc.get();
+
+        String lockPath = "/test-are-lock-waiters-in-same-session";
+        String clientId1 = "test-are-lock-waiters-in-same-session-1";
+        String clientId2 = "test-are-lock-waiters-in-same-session-2";
+
+        createLockPath(zk, lockPath);
+
+        String node1 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1));
+        String node2 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId2));
+        String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1));
+
+        assertEquals(node1 + " and " + node3 + " should be in same session.",
+                true, areLockWaitersInSameSession(node1, node3));
+        assertEquals(node1 + " and " + node2 + " should be not in same session.",
+                false, areLockWaitersInSameSession(node1, node2));
+        assertEquals(node3 + " and " + node2 + " should be not in same session.",
+                false, areLockWaitersInSameSession(node3, node2));
+    }
+
+    @Test(timeout = 60000)
     public void testExecuteLockAction() throws Exception {
         String lockPath = "/test-execute-lock-action";
         String clientId = "test-execute-lock-action-" + System.currentTimeMillis();
@@ -921,6 +943,33 @@
         lock1_1.unlock();
     }
 
+    @Test(timeout = 60000)
+    public void testLockWithMultipleSiblingWaiters() throws Exception {
+        String lockPath = "/test-lock-with-multiple-sibling-waiters";
+        String clientId = "client-id";
+
+        createLockPath(zkc.get(), lockPath);
+
+        final ZKSessionLock lock0 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+        final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+        final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+
+        lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        lock1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+
+        List<String> children = awaitWaiters(3, zkc, lockPath);
+
+        assertEquals(3, children.size());
+        assertEquals(State.CLAIMED, lock0.getLockState());
+        assertEquals(State.CLAIMED, lock1.getLockState());
+        assertEquals(State.CLAIMED, lock2.getLockState());
+
+        lock0.unlock();
+        lock1.unlock();
+        lock2.unlock();
+    }
+
     /**
      * Immediate lock and unlock first lock
      * @throws Exception