DL-86: Improve handling of lock conflicts in zk session lock
merge twitter's change from Sijie Guo.
Author: Jordan Bull <jbull@twitter.com>
Author: Sijie Guo <sijieg@twitter.com>
Reviewers: Leigh Stewart <lstewart@apache.org>
Closes #58 from sijie/merge/DL-86
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
index 87894dc..dc57d55 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
@@ -480,6 +480,31 @@
return id;
}
+ static boolean areLockWaitersInSameSession(String node1, String node2) {
+ String[] parts1 = node1.split("_");
+ String[] parts2 = node2.split("_");
+ if (parts1.length != 4 || parts2.length != 4) {
+ return node1.equals(node2);
+ }
+ if (!parts1[2].startsWith("s") || !parts2[2].startsWith("s")) {
+ return node1.equals(node2);
+ }
+ long sessionOwner1 = Long.parseLong(parts1[2].substring(1));
+ long sessionOwner2 = Long.parseLong(parts2[2].substring(1));
+ if (sessionOwner1 != sessionOwner2) {
+ return false;
+ }
+ String clientId1, clientId2;
+ try {
+ clientId1 = URLDecoder.decode(parts1[1], UTF_8.name());
+ clientId2 = URLDecoder.decode(parts2[1], UTF_8.name());
+ return clientId1.equals(clientId2);
+ } catch (UnsupportedEncodingException e) {
+ // if failed to parse client id, we have to get client id by zookeeper#getData.
+ return node1.equals(node2);
+ }
+ }
+
/**
* Get client id and its ephemeral owner.
*
@@ -1209,17 +1234,19 @@
@Override
public void execute() {
boolean shouldWatch;
+ final boolean shouldClaimOwnership;
if (lockContext.hasLockId(currentOwner) && siblingNode.equals(ownerNode)) {
// if the current owner is the znode left from previous session
// we should watch it and claim ownership
shouldWatch = true;
+ shouldClaimOwnership = true;
LOG.info("LockWatcher {} for {} found its previous session {} held lock, watch it to claim ownership.",
new Object[] { myNode, lockPath, currentOwner });
- } else if (lockId.compareTo(currentOwner) == 0 && siblingNode.equals(ownerNode)) {
+ } else if (lockId.compareTo(currentOwner) == 0 && areLockWaitersInSameSession(siblingNode, ownerNode)) {
// I found that my sibling is the current owner with same lock id (client id & session id)
// It must be left by any race condition from same zookeeper client
- // I would watch owner instead of sibling
shouldWatch = true;
+ shouldClaimOwnership = true;
LOG.info("LockWatcher {} for {} found itself {} already held lock at sibling node {}, watch it to claim ownership.",
new Object[]{myNode, lockPath, lockId, siblingNode});
} else {
@@ -1230,6 +1257,7 @@
new Object[]{lockPath, myNode, siblingNode, System.currentTimeMillis()});
}
}
+ shouldClaimOwnership = false;
}
// watch sibling for lock ownership
@@ -1247,8 +1275,7 @@
}
if (KeeperException.Code.OK.intValue() == rc) {
- if (siblingNode.equals(ownerNode) &&
- (lockId.compareTo(currentOwner) == 0 || lockContext.hasLockId(currentOwner))) {
+ if (shouldClaimOwnership) {
// watch owner successfully
LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.",
new Object[]{ myNode, lockPath, ownerNode });
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
index 629538e..054d714 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
@@ -180,6 +180,28 @@
}
@Test(timeout = 60000)
+ public void testAreLockWaitersInSameSession() throws Exception {
+ ZooKeeper zk = zkc.get();
+
+ String lockPath = "/test-are-lock-waiters-in-same-session";
+ String clientId1 = "test-are-lock-waiters-in-same-session-1";
+ String clientId2 = "test-are-lock-waiters-in-same-session-2";
+
+ createLockPath(zk, lockPath);
+
+ String node1 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1));
+ String node2 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId2));
+ String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1));
+
+ assertEquals(node1 + " and " + node3 + " should be in same session.",
+ true, areLockWaitersInSameSession(node1, node3));
+ assertEquals(node1 + " and " + node2 + " should be not in same session.",
+ false, areLockWaitersInSameSession(node1, node2));
+ assertEquals(node3 + " and " + node2 + " should be not in same session.",
+ false, areLockWaitersInSameSession(node3, node2));
+ }
+
+ @Test(timeout = 60000)
public void testExecuteLockAction() throws Exception {
String lockPath = "/test-execute-lock-action";
String clientId = "test-execute-lock-action-" + System.currentTimeMillis();
@@ -921,6 +943,33 @@
lock1_1.unlock();
}
+ @Test(timeout = 60000)
+ public void testLockWithMultipleSiblingWaiters() throws Exception {
+ String lockPath = "/test-lock-with-multiple-sibling-waiters";
+ String clientId = "client-id";
+
+ createLockPath(zkc.get(), lockPath);
+
+ final ZKSessionLock lock0 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+ final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+ final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+
+ lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ lock1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+
+ List<String> children = awaitWaiters(3, zkc, lockPath);
+
+ assertEquals(3, children.size());
+ assertEquals(State.CLAIMED, lock0.getLockState());
+ assertEquals(State.CLAIMED, lock1.getLockState());
+ assertEquals(State.CLAIMED, lock2.getLockState());
+
+ lock0.unlock();
+ lock1.unlock();
+ lock2.unlock();
+ }
+
/**
* Immediate lock and unlock first lock
* @throws Exception