Closes #637: Add a UUID to the server's ZooLock (#1866)
Modified ZooLock to name its ephemeral nodes using a caller-supplied instance UUID, so that duplicate ephemeral nodes created at a given path by the same ZooLock instance can be identified and cleaned up.
diff --git a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
index 54b6eb3..be796f6 100644
--- a/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
+++ b/core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
@@ -21,8 +21,9 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.UUID;
import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.fate.zookeeper.ZooUtil.LockID;
@@ -38,9 +39,25 @@
import org.slf4j.LoggerFactory;
public class ZooLock implements Watcher {
- private static final Logger log = LoggerFactory.getLogger(ZooLock.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ZooLock.class);
- public static final String LOCK_PREFIX = "zlock-";
+ private static final String ZLOCK_PREFIX = "zlock#";
+
+ private static class Prefix {
+ private final String prefix;
+
+ public Prefix(String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public String toString() {
+ return this.prefix;
+ }
+
+ }
+
+ private static ZooCache LOCK_DATA_ZOO_CACHE;
public enum LockLossReason {
LOCK_DELETED, SESSION_EXPIRED
@@ -55,48 +72,53 @@
void unableToMonitorLockNode(Exception e);
}
- public interface AsyncLockWatcher extends LockWatcher {
+ public interface AccumuloLockWatcher extends LockWatcher {
void acquiredLock();
void failedToAcquireLock(Exception e);
}
- private boolean lockWasAcquired;
private final String path;
+ private final Prefix vmLockPrefix;
protected final ZooReaderWriter zooKeeper;
- private String lock;
+
private LockWatcher lockWatcher;
- private boolean watchingParent = false;
- private String asyncLock;
+ private String lockNodeName;
+ private volatile boolean lockWasAcquired;
+ private volatile boolean watchingParent = false;
- public ZooLock(ZooReaderWriter zoo, String path) {
- this(new ZooCache(zoo), zoo, path);
+ private String createdNodeName;
+ private String watchingNodeName;
+
+ public ZooLock(ZooReaderWriter zoo, String path, UUID uuid) {
+ this(new ZooCache(zoo), zoo, path, uuid);
}
- public ZooLock(String zookeepers, int timeInMillis, String secret, String path) {
+ public ZooLock(String zookeepers, int timeInMillis, String secret, String path, UUID uuid) {
this(new ZooCacheFactory().getZooCache(zookeepers, timeInMillis),
- new ZooReaderWriter(zookeepers, timeInMillis, secret), path);
+ new ZooReaderWriter(zookeepers, timeInMillis, secret), path, uuid);
}
- protected ZooLock(ZooCache zc, ZooReaderWriter zrw, String path) {
- getLockDataZooCache = zc;
+ protected ZooLock(ZooCache zc, ZooReaderWriter zrw, String path, UUID uuid) {
+ LOCK_DATA_ZOO_CACHE = zc;
this.path = path;
zooKeeper = zrw;
try {
zooKeeper.getStatus(path, this);
watchingParent = true;
+ this.vmLockPrefix = new Prefix(ZLOCK_PREFIX + uuid.toString() + "#");
} catch (Exception ex) {
- log.warn("Error getting setting initial watch on ZooLock", ex);
+ LOG.error("Error setting initial watch", ex);
throw new RuntimeException(ex);
}
}
- private static class TryLockAsyncLockWatcher implements AsyncLockWatcher {
+ private static class LockWatcherWrapper implements AccumuloLockWatcher {
boolean acquiredLock = false;
LockWatcher lw;
- public TryLockAsyncLockWatcher(LockWatcher lw2) {
+ public LockWatcherWrapper(LockWatcher lw2) {
this.lw = lw2;
}
@@ -123,189 +145,366 @@
public synchronized boolean tryLock(LockWatcher lw, byte[] data)
throws KeeperException, InterruptedException {
- TryLockAsyncLockWatcher tlalw = new TryLockAsyncLockWatcher(lw);
+ LockWatcherWrapper lww = new LockWatcherWrapper(lw);
- lockAsync(tlalw, data);
+ lock(lww, data);
- if (tlalw.acquiredLock) {
+ if (lww.acquiredLock) {
return true;
}
- if (asyncLock != null) {
- zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP);
- asyncLock = null;
+ // If we didn't acquire the lock, then delete the path we just created
+ if (createdNodeName != null) {
+ String pathToDelete = path + "/" + createdNodeName;
+ LOG.debug("[{}] Failed to acquire lock in tryLock(), deleting all at path: {}", vmLockPrefix,
+ pathToDelete);
+ zooKeeper.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
+ createdNodeName = null;
}
return false;
}
- private synchronized void lockAsync(final String myLock, final AsyncLockWatcher lw)
- throws KeeperException, InterruptedException {
-
- if (asyncLock == null) {
- throw new IllegalStateException("Called lockAsync() when asyncLock == null");
+ /**
+ * Sort list of ephemeral nodes by their sequence number. Any ephemeral nodes that are not of the
+ * correct form are logged and omitted from the returned list.
+ *
+ * @param children
+ * list of ephemeral nodes
+ * @return list of ephemeral nodes that have valid formats, sorted by sequence number
+ */
+ public static List<String> validateAndSortChildrenByLockPrefix(String path,
+ List<String> children) {
+ LOG.debug("validating and sorting children at path {}", path);
+ List<String> validChildren = new ArrayList<>();
+ if (null == children || children.size() == 0) {
+ return validChildren;
}
+ children.forEach(c -> {
+ LOG.trace("Validating {}", c);
+ if (c.startsWith(ZLOCK_PREFIX)) {
+ String candidate = c.substring(ZLOCK_PREFIX.length() + 1);
+ if (candidate.contains("#")) {
+ int idx = candidate.indexOf('#');
+ String uuid = candidate.substring(0, idx - 1);
+ String sequenceNum = candidate.substring(idx + 1);
+ try {
+ LOG.trace("Testing uuid format of {}", uuid);
+ UUID.fromString(uuid);
+ if (sequenceNum.length() == 10) {
+ try {
+ LOG.trace("Testing number format of {}", sequenceNum);
+ Integer.parseInt(sequenceNum);
+ validChildren.add(c);
+ } catch (NumberFormatException e) {
+ LOG.warn("Child found with invalid sequence format: {} (not a number)", c);
+ }
+ } else {
+ LOG.warn("Child found with invalid sequence format: {} (not 10 characters)", c);
+ }
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Child found with invalid UUID format: {}", c);
+ }
+ } else {
+ LOG.warn("Child found with invalid format: {} (does not contain second '#')", c);
+ }
+ } else {
+ LOG.warn("Child found with invalid format: {} (does not start with {})", c, ZLOCK_PREFIX);
+ }
+ });
- List<String> children = zooKeeper.getChildren(path);
+ if (validChildren.size() > 1) {
+ validChildren.sort(new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
- if (!children.contains(myLock)) {
- throw new RuntimeException("Lock attempt ephemeral node no longer exist " + myLock);
+ // Lock should be of the form:
+ // zlock#UUID#sequenceNumber
+ // Example:
+ // zlock#44755fbe-1c9e-40b3-8458-03abaf950d7e#0000000000
+ int secondHashIdx = 43;
+ return Integer.valueOf(o1.substring(secondHashIdx))
+ .compareTo(Integer.valueOf(o2.substring(secondHashIdx)));
+ }
+ });
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Children nodes: {}", validChildren.size());
+ validChildren.forEach(c -> LOG.debug("- {}", c));
+ }
+ return validChildren;
+ }
- Collections.sort(children);
- if (log.isTraceEnabled()) {
- log.trace("Candidate lock nodes");
- for (String child : children) {
- log.trace("- {}", child);
+ /**
+ * Given a pre-sorted set of children ephemeral nodes where the node name is of the form
+ * "zlock#UUID#sequenceNumber", find the lowest-sequence node in the run of nodes that directly
+ * precede the ephemeralNode parameter and share the prior node's "zlock#UUID" prefix.
+ *
+ * @param children
+ * list of sequential ephemeral nodes, already sorted
+ * @param ephemeralNode
+ * starting node for the search
+ * @return lowest-sequence node in the run of same-prefix nodes directly preceding ephemeralNode
+ */
+ public static String findLowestPrevPrefix(final List<String> children,
+ final String ephemeralNode) {
+ int idx = children.indexOf(ephemeralNode);
+ // Get the prefix from the prior ephemeral node
+ String prev = children.get(idx - 1);
+ int prefixIdx = prev.lastIndexOf('#');
+ String prevPrefix = prev.substring(0, prefixIdx);
+
+ // Find the lowest sequential ephemeral node with prevPrefix
+ int i = 2;
+ String lowestPrevNode = prev;
+ while ((idx - i) >= 0) {
+ prev = children.get(idx - i);
+ i++;
+ if (prev.startsWith(prevPrefix)) {
+ lowestPrevNode = prev;
+ } else {
+ break;
}
}
+ return lowestPrevNode;
+ }
- if (children.get(0).equals(myLock)) {
- log.trace("First candidate is my lock, acquiring");
+ private synchronized void determineLockOwnership(final String createdEphemeralNode,
+ final AccumuloLockWatcher lw) throws KeeperException, InterruptedException {
+
+ if (createdNodeName == null) {
+ throw new IllegalStateException(
+ "Called determineLockOwnership() when ephemeralNodeName == null");
+ }
+
+ List<String> children = validateAndSortChildrenByLockPrefix(path, zooKeeper.getChildren(path));
+
+ if (null == children || !children.contains(createdEphemeralNode)) {
+ LOG.error("Expected ephemeral node {} to be in the list of children {}", createdEphemeralNode,
+ children);
+ throw new RuntimeException(
+ "Lock attempt ephemeral node no longer exist " + createdEphemeralNode);
+ }
+
+ if (children.get(0).equals(createdEphemeralNode)) {
+ LOG.debug("[{}] First candidate is my lock, acquiring...", vmLockPrefix);
if (!watchingParent) {
throw new IllegalStateException(
"Can not acquire lock, no longer watching parent : " + path);
}
this.lockWatcher = lw;
- this.lock = myLock;
- asyncLock = null;
+ this.lockNodeName = createdEphemeralNode;
+ createdNodeName = null;
lockWasAcquired = true;
lw.acquiredLock();
- return;
- }
- String prev = null;
- for (String child : children) {
- if (child.equals(myLock)) {
- break;
- }
+ } else {
+ LOG.debug("[{}] Lock held by another process with ephemeral node: {}", vmLockPrefix,
+ children.get(0));
- prev = child;
- }
+ String lowestPrevNode = findLowestPrevPrefix(children, createdEphemeralNode);
- final String lockToWatch = path + "/" + prev;
- log.trace("Establishing watch on {}", lockToWatch);
- Stat stat = zooKeeper.getStatus(lockToWatch, new Watcher() {
+ watchingNodeName = path + "/" + lowestPrevNode;
+ final String nodeToWatch = watchingNodeName;
+ LOG.debug("[{}] Establishing watch on prior node {}", vmLockPrefix, nodeToWatch);
+ Watcher priorNodeWatcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("[{}] Processing event:", vmLockPrefix);
+ LOG.trace("- type {}", event.getType());
+ LOG.trace("- path {}", event.getPath());
+ LOG.trace("- state {}", event.getState());
+ }
+ boolean renew = true;
+ if (event.getType() == EventType.NodeDeleted && event.getPath().equals(nodeToWatch)) {
+ LOG.debug("[{}] Detected deletion of prior node {}, attempting to acquire lock",
+ vmLockPrefix, nodeToWatch);
+ synchronized (ZooLock.this) {
+ try {
+ if (createdNodeName != null) {
+ determineLockOwnership(createdEphemeralNode, lw);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] While waiting for another lock {}, {} was deleted", vmLockPrefix,
+ nodeToWatch, createdEphemeralNode);
+ }
+ } catch (Exception e) {
+ if (lockNodeName == null) {
+ // have not acquired lock yet
+ lw.failedToAcquireLock(e);
+ }
+ }
+ }
+ renew = false;
+ }
- @Override
- public void process(WatchedEvent event) {
- if (log.isTraceEnabled()) {
- log.trace("Processing event:");
- log.trace("- type {}", event.getType());
- log.trace("- path {}", event.getPath());
- log.trace("- state {}", event.getState());
- }
- boolean renew = true;
- if (event.getType() == EventType.NodeDeleted && event.getPath().equals(lockToWatch)) {
- log.trace("Detected deletion of {}, attempting to acquire lock", lockToWatch);
- synchronized (ZooLock.this) {
+ if (event.getState() == KeeperState.Expired
+ || event.getState() == KeeperState.Disconnected) {
+ synchronized (ZooLock.this) {
+ if (lockNodeName == null) {
+ LOG.info("Zookeeper Session expired / disconnected");
+ lw.failedToAcquireLock(new Exception("Zookeeper Session expired / disconnected"));
+ }
+ }
+ renew = false;
+ }
+ if (renew) {
+ LOG.debug("[{}] Renewing watch on prior node {}", vmLockPrefix, nodeToWatch);
try {
- if (asyncLock != null) {
- lockAsync(myLock, lw);
- } else if (log.isTraceEnabled()) {
- log.trace("While waiting for another lock {} {} was deleted", lockToWatch, myLock);
+ Stat restat = zooKeeper.getStatus(nodeToWatch, this);
+ if (restat == null) {
+ // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+ // created a Watcher on a node that does not exist. Delete the watcher we just
+ // created.
+ zooKeeper.getZooKeeper().removeWatches(nodeToWatch, this, WatcherType.Any, true);
+ determineLockOwnership(createdEphemeralNode, lw);
}
- } catch (Exception e) {
- if (lock == null) {
- // have not acquired lock yet
- lw.failedToAcquireLock(e);
- }
+ } catch (KeeperException | InterruptedException e) {
+ lw.failedToAcquireLock(new Exception("Failed to renew watch on other master node"));
}
}
- renew = false;
}
- if (event.getState() == KeeperState.Expired
- || event.getState() == KeeperState.Disconnected) {
- synchronized (ZooLock.this) {
- if (lock == null) {
- lw.failedToAcquireLock(new Exception("Zookeeper Session expired / disconnected"));
- }
- }
- renew = false;
- }
- if (renew) {
- log.trace("Renewing watch on {}", lockToWatch);
- try {
- Stat restat = zooKeeper.getStatus(lockToWatch, this);
- if (restat == null) {
- lockAsync(myLock, lw);
- }
- } catch (KeeperException | InterruptedException e) {
- lw.failedToAcquireLock(new Exception("Failed to renew watch on other master node"));
- }
- }
+ };
+
+ Stat stat = zooKeeper.getStatus(nodeToWatch, priorNodeWatcher);
+ if (stat == null) {
+ // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+ // created a Watcher on a node that does not exist. Delete the watcher we just created.
+ zooKeeper.getZooKeeper().removeWatches(nodeToWatch, priorNodeWatcher, WatcherType.Any,
+ true);
+ determineLockOwnership(createdEphemeralNode, lw);
}
+ }
- });
-
- if (stat == null)
- lockAsync(myLock, lw);
}
private void lostLock(LockLossReason reason) {
LockWatcher localLw = lockWatcher;
- lock = null;
+ lockNodeName = null;
lockWatcher = null;
localLw.lostLock(reason);
}
- public synchronized void lockAsync(final AsyncLockWatcher lw, byte[] data) {
+ public synchronized void lock(final AccumuloLockWatcher lw, byte[] data) {
- if (lockWatcher != null || lock != null || asyncLock != null) {
+ if (lockWatcher != null || lockNodeName != null || createdNodeName != null) {
throw new IllegalStateException();
}
lockWasAcquired = false;
try {
- final String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
- log.trace("Ephemeral node {} created", asyncLockPath);
- Stat stat = zooKeeper.getStatus(asyncLockPath, new Watcher() {
+ final String lockPathPrefix = path + "/" + vmLockPrefix.toString();
+ // Implement recipe at https://zookeeper.apache.org/doc/current/recipes.html#sc_recipes_Locks
+ // except that instead of the ephemeral lock node being of the form guid-lock- use lock-guid-.
+ // Another deviation from the recipe is that we clean up any extraneous ephemeral nodes that
+ // were created.
+ final String createPath = zooKeeper.putEphemeralSequential(lockPathPrefix, data);
+ LOG.debug("[{}] Ephemeral node {} created", vmLockPrefix, createPath);
+
+ // It's possible that the call above was retried several times and multiple ephemeral nodes
+ // were created but the client missed the response for some reason. Find the ephemeral node
+ // with this lock's UUID prefix and the lowest sequence number.
+ List<String> children =
+ validateAndSortChildrenByLockPrefix(path, zooKeeper.getChildren(path));
+ if (null == children || !children.contains(createPath.substring(path.length() + 1))) {
+ LOG.error("Expected ephemeral node {} to be in the list of children {}", createPath,
+ children);
+ throw new RuntimeException("Lock attempt ephemeral node no longer exist " + createPath);
+ }
+
+ String lowestSequentialPath = null;
+ boolean msgLoggedOnce = false;
+ for (String child : children) {
+ if (child.startsWith(vmLockPrefix.toString())) {
+ if (null == lowestSequentialPath) {
+ if (createPath.equals(path + "/" + child)) {
+ // the path returned from create is the lowest sequential one
+ lowestSequentialPath = createPath;
+ break;
+ }
+ lowestSequentialPath = path + "/" + child;
+ LOG.debug("[{}] lowest sequential node found: {}", vmLockPrefix, lowestSequentialPath);
+ } else {
+ if (!msgLoggedOnce) {
+ LOG.info(
+ "[{}] Zookeeper client missed server response, multiple ephemeral child nodes created at {}",
+ vmLockPrefix, lockPathPrefix);
+ msgLoggedOnce = true;
+ }
+ LOG.debug("[{}] higher sequential node found: {}, deleting it", vmLockPrefix, child);
+ zooKeeper.delete(path + "/" + child);
+ }
+ }
+ }
+ final String pathForWatcher = lowestSequentialPath;
+
+ // Set a watcher on the lowest sequential node that we created, this handles the case
+ // where the node we created is deleted or if this client becomes disconnected.
+ LOG.debug("[{}] Setting watcher on {}", vmLockPrefix, pathForWatcher);
+ Watcher watcherForNodeWeCreated = new Watcher() {
private void failedToAcquireLock() {
+ LOG.debug("[{}] Lock deleted before acquired, setting createdNodeName {} to null",
+ vmLockPrefix, createdNodeName);
lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
- asyncLock = null;
+ createdNodeName = null;
}
@Override
public void process(WatchedEvent event) {
synchronized (ZooLock.this) {
- if (lock != null && event.getType() == EventType.NodeDeleted
- && event.getPath().equals(path + "/" + lock)) {
+ if (lockNodeName != null && event.getType() == EventType.NodeDeleted
+ && event.getPath().equals(path + "/" + lockNodeName)) {
+ LOG.debug("[{}] {} was deleted", vmLockPrefix, lockNodeName);
lostLock(LockLossReason.LOCK_DELETED);
- } else if (asyncLock != null && event.getType() == EventType.NodeDeleted
- && event.getPath().equals(path + "/" + asyncLock)) {
+ } else if (createdNodeName != null && event.getType() == EventType.NodeDeleted
+ && event.getPath().equals(path + "/" + createdNodeName)) {
+ LOG.debug("[{}] {} was deleted", vmLockPrefix, createdNodeName);
failedToAcquireLock();
} else if (event.getState() != KeeperState.Disconnected
- && event.getState() != KeeperState.Expired && (lock != null || asyncLock != null)) {
- log.debug("Unexpected event watching lock node {} {}", event, asyncLockPath);
+ && event.getState() != KeeperState.Expired
+ && (lockNodeName != null || createdNodeName != null)) {
+ LOG.debug("Unexpected event watching lock node {} {}", event, pathForWatcher);
try {
- Stat stat2 = zooKeeper.getStatus(asyncLockPath, this);
+ Stat stat2 = zooKeeper.getStatus(pathForWatcher, this);
if (stat2 == null) {
- if (lock != null)
+ // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+ // created a Watcher on a node that does not exist. Delete the watcher we just
+ // created.
+ zooKeeper.getZooKeeper().removeWatches(pathForWatcher, this, WatcherType.Any,
+ true);
+
+ if (lockNodeName != null)
lostLock(LockLossReason.LOCK_DELETED);
- else if (asyncLock != null)
+ else if (createdNodeName != null)
failedToAcquireLock();
}
} catch (Exception e) {
lockWatcher.unableToMonitorLockNode(e);
- log.error("Failed to stat lock node " + asyncLockPath, e);
+ LOG.error("Failed to stat lock node: {} ", pathForWatcher, e);
}
}
}
}
- });
+ };
+ Stat stat = zooKeeper.getStatus(pathForWatcher, watcherForNodeWeCreated);
if (stat == null) {
+ // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+ // created a Watcher on a node that does not exist. Delete the watcher we just created.
+ zooKeeper.getZooKeeper().removeWatches(pathForWatcher, watcherForNodeWeCreated,
+ WatcherType.Any, true);
lw.failedToAcquireLock(new Exception("Lock does not exist after create"));
return;
}
- asyncLock = asyncLockPath.substring(path.length() + 1);
+ createdNodeName = pathForWatcher.substring(path.length() + 1);
- lockAsync(asyncLock, lw);
+ // We have created a node, do we own the lock?
+ determineLockOwnership(createdNodeName, lw);
} catch (KeeperException | InterruptedException e) {
lw.failedToAcquireLock(e);
@@ -316,12 +515,15 @@
throws InterruptedException, KeeperException {
boolean del = false;
- if (asyncLock != null) {
- zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP);
+ if (createdNodeName != null) {
+ String pathToDelete = path + "/" + createdNodeName;
+ LOG.debug("[{}] Deleting all at path {} due to lock cancellation", vmLockPrefix,
+ pathToDelete);
+ zooKeeper.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
del = true;
}
- if (lock != null) {
+ if (lockNodeName != null) {
unlock();
del = true;
}
@@ -330,37 +532,46 @@
}
public synchronized void unlock() throws InterruptedException, KeeperException {
- if (lock == null) {
+ if (lockNodeName == null) {
throw new IllegalStateException();
}
LockWatcher localLw = lockWatcher;
- String localLock = lock;
+ String localLock = lockNodeName;
- lock = null;
+ lockNodeName = null;
lockWatcher = null;
- zooKeeper.recursiveDelete(path + "/" + localLock, NodeMissingPolicy.SKIP);
+ final String pathToDelete = path + "/" + localLock;
+ LOG.debug("[{}] Deleting all at path {} due to unlock", vmLockPrefix, pathToDelete);
+ zooKeeper.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
localLw.lostLock(LockLossReason.LOCK_DELETED);
}
+ /**
+ * @return path of node that this lock is watching
+ */
+ public synchronized String getWatching() {
+ return watchingNodeName;
+ }
+
public synchronized String getLockPath() {
- if (lock == null) {
+ if (lockNodeName == null) {
return null;
}
- return path + "/" + lock;
+ return path + "/" + lockNodeName;
}
public synchronized String getLockName() {
- return lock;
+ return lockNodeName;
}
public synchronized LockID getLockID() {
- if (lock == null) {
+ if (lockNodeName == null) {
throw new IllegalStateException("Lock not held");
}
- return new LockID(path, lock, zooKeeper.getZooKeeper().getSessionId());
+ return new LockID(path, lockNodeName, zooKeeper.getZooKeeper().getSessionId());
}
/**
@@ -374,7 +585,7 @@
}
public synchronized boolean isLocked() {
- return lock != null;
+ return lockNodeName != null;
}
public synchronized void replaceLockData(byte[] b) throws KeeperException, InterruptedException {
@@ -384,11 +595,13 @@
@Override
public synchronized void process(WatchedEvent event) {
- log.debug("event {} {} {}", event.getPath(), event.getType(), event.getState());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("event {} {} {}", event.getPath(), event.getType(), event.getState());
+ }
watchingParent = false;
- if (event.getState() == KeeperState.Expired && lock != null) {
+ if (event.getState() == KeeperState.Expired && lockNodeName != null) {
lostLock(LockLossReason.SESSION_EXPIRED);
} else {
@@ -397,12 +610,12 @@
watchingParent = true;
} catch (KeeperException.ConnectionLossException ex) {
// we can't look at the lock because we aren't connected, but our session is still good
- log.warn("lost connection to zookeeper");
+ LOG.warn("lost connection to zookeeper", ex);
} catch (Exception ex) {
- if (lock != null || asyncLock != null) {
+ if (lockNodeName != null || createdNodeName != null) {
lockWatcher.unableToMonitorLockNode(ex);
- log.error("Error resetting watch on ZooLock {} {}", lock != null ? lock : asyncLock,
- event, ex);
+ LOG.error("Error resetting watch on ZooLock {} {}",
+ lockNodeName != null ? lockNodeName : createdNodeName, event, ex);
}
}
@@ -412,15 +625,12 @@
public static boolean isLockHeld(ZooCache zc, LockID lid) {
- List<String> children = zc.getChildren(lid.path);
+ List<String> children = validateAndSortChildrenByLockPrefix(lid.path, zc.getChildren(lid.path));
if (children == null || children.isEmpty()) {
return false;
}
- children = new ArrayList<>(children);
- Collections.sort(children);
-
String lockNode = children.get(0);
if (!lid.node.equals(lockNode))
return false;
@@ -431,14 +641,13 @@
public static byte[] getLockData(ZooKeeper zk, String path)
throws KeeperException, InterruptedException {
- List<String> children = zk.getChildren(path, false);
+
+ List<String> children = validateAndSortChildrenByLockPrefix(path, zk.getChildren(path, false));
if (children == null || children.isEmpty()) {
return null;
}
- Collections.sort(children);
-
String lockNode = children.get(0);
return zk.getData(path + "/" + lockNode, false, null);
@@ -447,18 +656,15 @@
public static byte[] getLockData(org.apache.accumulo.fate.zookeeper.ZooCache zc, String path,
ZcStat stat) {
- List<String> children = zc.getChildren(path);
+ List<String> children = validateAndSortChildrenByLockPrefix(path, zc.getChildren(path));
if (children == null || children.isEmpty()) {
return null;
}
- children = new ArrayList<>(children);
- Collections.sort(children);
-
String lockNode = children.get(0);
- if (!lockNode.startsWith(LOCK_PREFIX)) {
+ if (!lockNode.startsWith(ZLOCK_PREFIX)) {
throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
}
@@ -466,15 +672,13 @@
}
public static long getSessionId(ZooCache zc, String path) {
- List<String> children = zc.getChildren(path);
+
+ List<String> children = validateAndSortChildrenByLockPrefix(path, zc.getChildren(path));
if (children == null || children.isEmpty()) {
return 0;
}
- children = new ArrayList<>(children);
- Collections.sort(children);
-
String lockNode = children.get(0);
ZcStat stat = new ZcStat();
@@ -483,56 +687,52 @@
return 0;
}
- private static ZooCache getLockDataZooCache;
-
public long getSessionId() throws KeeperException, InterruptedException {
- return getSessionId(getLockDataZooCache, path);
+ return getSessionId(LOCK_DATA_ZOO_CACHE, path);
}
public static void deleteLock(ZooReaderWriter zk, String path)
throws InterruptedException, KeeperException {
- List<String> children;
- children = zk.getChildren(path);
+ List<String> children = validateAndSortChildrenByLockPrefix(path, zk.getChildren(path));
if (children == null || children.isEmpty()) {
throw new IllegalStateException("No lock is held at " + path);
}
- Collections.sort(children);
-
String lockNode = children.get(0);
- if (!lockNode.startsWith(LOCK_PREFIX)) {
+ if (!lockNode.startsWith(ZLOCK_PREFIX)) {
throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
}
- zk.recursiveDelete(path + "/" + lockNode, NodeMissingPolicy.SKIP);
+ String pathToDelete = path + "/" + lockNode;
+ LOG.debug("Deleting all at path {} due to lock deletion", pathToDelete);
+ zk.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
}
public static boolean deleteLock(ZooReaderWriter zk, String path, String lockData)
throws InterruptedException, KeeperException {
- List<String> children;
- children = zk.getChildren(path);
+ List<String> children = validateAndSortChildrenByLockPrefix(path, zk.getChildren(path));
if (children == null || children.isEmpty()) {
throw new IllegalStateException("No lock is held at " + path);
}
- Collections.sort(children);
-
String lockNode = children.get(0);
- if (!lockNode.startsWith(LOCK_PREFIX)) {
+ if (!lockNode.startsWith(ZLOCK_PREFIX)) {
throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
}
byte[] data = zk.getData(path + "/" + lockNode);
if (lockData.equals(new String(data, UTF_8))) {
- zk.recursiveDelete(path + "/" + lockNode, NodeMissingPolicy.FAIL);
+ String pathToDelete = path + "/" + lockNode;
+ LOG.debug("Deleting all at path {} due to lock deletion", pathToDelete);
+ zk.recursiveDelete(pathToDelete, NodeMissingPolicy.FAIL);
return true;
}
diff --git a/core/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java b/core/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
new file mode 100644
index 0000000..8d46c09
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+public class ZooLockTest {
+
+ @Test
+ public void testSortAndFindLowestPrevPrefix() throws Exception {
+ List<String> children = new ArrayList<>();
+ children.add("zlock#00000000-0000-0000-0000-ffffffffffff#0000000007");
+ children.add("zlock#00000000-0000-0000-0000-eeeeeeeeeeee#0000000010");
+ children.add("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000006");
+ children.add("zlock#00000000-0000-0000-0000-dddddddddddd#0000000008");
+ children.add("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000004");
+ children.add("zlock-123456789");
+ children.add("zlock#00000000-0000-0000-0000-cccccccccccc#0000000003");
+ children.add("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000002");
+ children.add("zlock#987654321");
+ children.add("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001");
+
+ final List<String> validChildren = ZooLock.validateAndSortChildrenByLockPrefix("", children);
+
+ assertEquals(8, validChildren.size());
+ assertEquals("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001", validChildren.get(0));
+ assertEquals("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000002", validChildren.get(1));
+ assertEquals("zlock#00000000-0000-0000-0000-cccccccccccc#0000000003", validChildren.get(2));
+ assertEquals("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000004", validChildren.get(3));
+ assertEquals("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000006", validChildren.get(4));
+ assertEquals("zlock#00000000-0000-0000-0000-ffffffffffff#0000000007", validChildren.get(5));
+ assertEquals("zlock#00000000-0000-0000-0000-dddddddddddd#0000000008", validChildren.get(6));
+ assertEquals("zlock#00000000-0000-0000-0000-eeeeeeeeeeee#0000000010", validChildren.get(7));
+
+ assertEquals("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000004",
+ ZooLock.findLowestPrevPrefix(validChildren,
+ "zlock#00000000-0000-0000-0000-ffffffffffff#0000000007"));
+
+ assertEquals("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001",
+ ZooLock.findLowestPrevPrefix(validChildren,
+ "zlock#00000000-0000-0000-0000-cccccccccccc#0000000003"));
+
+ assertEquals("zlock#00000000-0000-0000-0000-dddddddddddd#0000000008",
+ ZooLock.findLowestPrevPrefix(validChildren,
+ "zlock#00000000-0000-0000-0000-eeeeeeeeeeee#0000000010"));
+
+ assertThrows(IndexOutOfBoundsException.class, () -> {
+ ZooLock.findLowestPrevPrefix(validChildren,
+ "zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001");
+ });
+
+ assertThrows(IndexOutOfBoundsException.class, () -> {
+ ZooLock.findLowestPrevPrefix(validChildren,
+ "zlock#00000000-0000-0000-0000-XXXXXXXXXXXX#0000000099");
+ });
+ }
+
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
index b6d55b6..7574bb4 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java
@@ -56,9 +56,11 @@
final long session = 123456789L;
String serverPath = root + "/" + server;
- EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.singletonList("child"));
- EasyMock.expect(zc.get(EasyMock.eq(serverPath + "/child"), EasyMock.anyObject(ZcStat.class)))
- .andAnswer(() -> {
+ String validZLockEphemeralNode = "zlock#" + UUID.randomUUID().toString() + "#0000000000";
+ EasyMock.expect(zc.getChildren(serverPath))
+ .andReturn(Collections.singletonList(validZLockEphemeralNode));
+ EasyMock.expect(zc.get(EasyMock.eq(serverPath + "/" + validZLockEphemeralNode),
+ EasyMock.anyObject(ZcStat.class))).andAnswer(() -> {
ZcStat stat = (ZcStat) EasyMock.getCurrentArguments()[1];
stat.setEphemeralOwner(session);
return new byte[0];
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index b7a8fc8..954c1f2 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -33,6 +33,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@@ -632,8 +633,9 @@
}
};
+ UUID zooLockUUID = UUID.randomUUID();
while (true) {
- lock = new ZooLock(getContext().getZooReaderWriter(), path);
+ lock = new ZooLock(getContext().getZooReaderWriter(), path, zooLockUUID);
if (lock.tryLock(lockWatcher,
new ServerServices(addr.toString(), Service.GC_CLIENT).toString().getBytes())) {
log.debug("Got GC ZooKeeper lock");
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/Master.java b/server/manager/src/main/java/org/apache/accumulo/master/Master.java
index cf2ec31..c36e42e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/Master.java
@@ -35,6 +35,7 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -1343,7 +1344,7 @@
return masterLock;
}
- private static class MasterLockWatcher implements ZooLock.AsyncLockWatcher {
+ private static class MasterLockWatcher implements ZooLock.AccumuloLockWatcher {
boolean acquiredLock = false;
boolean failedToAcquireLock = false;
@@ -1407,11 +1408,12 @@
final String masterClientAddress =
getHostname() + ":" + getConfiguration().getPort(Property.MANAGER_CLIENTPORT)[0];
+ UUID zooLockUUID = UUID.randomUUID();
while (true) {
MasterLockWatcher masterLockWatcher = new MasterLockWatcher();
- masterLock = new ZooLock(context.getZooReaderWriter(), zMasterLoc);
- masterLock.lockAsync(masterLockWatcher, masterClientAddress.getBytes());
+ masterLock = new ZooLock(context.getZooReaderWriter(), zMasterLoc, zooLockUUID);
+ masterLock.lock(masterLockWatcher, masterClientAddress.getBytes());
masterLockWatcher.waitForChange();
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index df60ab1..51aecc1 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -35,6 +35,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -627,10 +628,11 @@
}
// Get a ZooLock for the monitor
+ UUID zooLockUUID = UUID.randomUUID();
while (true) {
MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher();
- monitorLock = new ZooLock(zoo, monitorLockPath);
- monitorLock.lockAsync(monitorLockWatcher, new byte[0]);
+ monitorLock = new ZooLock(zoo, monitorLockPath, zooLockUUID);
+ monitorLock.lock(monitorLockWatcher, new byte[0]);
monitorLockWatcher.waitForChange();
@@ -655,7 +657,7 @@
/**
* Async Watcher for monitor lock
*/
- private static class MoniterLockWatcher implements ZooLock.AsyncLockWatcher {
+ private static class MoniterLockWatcher implements ZooLock.AccumuloLockWatcher {
boolean acquiredLock = false;
boolean failedToAcquireLock = false;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index bd0ec23..70cad33 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -43,6 +43,7 @@
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
@@ -639,7 +640,7 @@
throw e;
}
- tabletServerLock = new ZooLock(zoo, zPath);
+ tabletServerLock = new ZooLock(zoo, zPath, UUID.randomUUID());
LockWatcher lw = new LockWatcher() {
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooLockIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooLockIT.java
index d3cb5e3..6629a0f 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooLockIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooLockIT.java
@@ -25,25 +25,33 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.lang.reflect.Field;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
import org.apache.accumulo.fate.zookeeper.ZooLock;
-import org.apache.accumulo.fate.zookeeper.ZooLock.AsyncLockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooLock.AccumuloLockWatcher;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ZooLockIT extends SharedMiniClusterBase {
@@ -57,6 +65,56 @@
SharedMiniClusterBase.stopMiniCluster();
}
+ /**
+  * ZooKeeper client for tests whose {@link #create} sends every create request twice, imitating
+  * a client that retries because it never saw the response to a request that actually succeeded.
+  * Use {@link #createOnce} when a single, ordinary create is wanted.
+  */
+ static class ZooKeeperWrapper extends ZooKeeper {
+
+   public ZooKeeperWrapper(String connectString, int sessionTimeout, Watcher watcher)
+       throws IOException {
+     super(connectString, sessionTimeout, watcher);
+   }
+
+   /**
+    * Sends exactly one create request by delegating straight to the parent implementation.
+    *
+    * @return the value returned by the underlying {@code ZooKeeper.create} call
+    */
+   public String createOnce(String path, byte[] data, List<ACL> acl, CreateMode createMode)
+       throws KeeperException, InterruptedException {
+     final String createdPath = super.create(path, data, acl, createMode);
+     return createdPath;
+   }
+
+   /**
+    * Sends the same create request twice and returns the result of the second request.
+    */
+   @Override
+   public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
+       throws KeeperException, InterruptedException {
+     // Let's simulate that the first call succeeded but the client didn't get the message,
+     // so the ZooKeeper client retries.
+     createOnce(path, data, acl, createMode);
+     return createOnce(path, data, acl, createMode);
+   }
+
+ }
+
+ /**
+  * Minimal {@link AccumuloLockWatcher} that only records whether the lock is currently held.
+  *
+  * <p>
+  * The tests poll {@link #isLockHeld()} from one thread while the watcher callbacks may be
+  * invoked from another, so the flag is {@code volatile} to guarantee the poller sees updates.
+  */
+ static class RetryLockWatcher implements AccumuloLockWatcher {
+
+   // volatile: written by watcher callbacks, read by polling test threads
+   private volatile boolean lockHeld = false;
+
+   /** Marks the lock as no longer held, whatever the reason. */
+   @Override
+   public void lostLock(LockLossReason reason) {
+     this.lockHeld = false;
+   }
+
+   /** Intentionally a no-op; these tests do not react to monitoring failures. */
+   @Override
+   public void unableToMonitorLockNode(final Exception e) {}
+
+   /** Marks the lock as held. */
+   @Override
+   public void acquiredLock() {
+     this.lockHeld = true;
+   }
+
+   /** Marks the lock as not held; the exception itself is not recorded. */
+   @Override
+   public void failedToAcquireLock(Exception e) {
+     this.lockHeld = false;
+   }
+
+   /**
+    * @return {@code true} if the most recent callback was {@link #acquiredLock()}
+    */
+   public boolean isLockHeld() {
+     return this.lockHeld;
+   }
+ }
+
static class ConnectedWatcher implements Watcher {
volatile boolean connected = false;
@@ -70,7 +128,7 @@
}
}
- static class TestALW implements AsyncLockWatcher {
+ static class TestALW implements AccumuloLockWatcher {
LockLossReason reason = null;
boolean locked = false;
@@ -115,9 +173,10 @@
@Test(timeout = 10000)
public void testDeleteParent() throws Exception {
- String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+ String parent = "/zltestDeleteParent-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
- ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+ ZooLock zl =
+ new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
assertFalse(zl.isLocked());
@@ -132,7 +191,7 @@
TestALW lw = new TestALW();
- zl.lockAsync(lw, "test1".getBytes(UTF_8));
+ zl.lock(lw, "test1".getBytes(UTF_8));
lw.waitForChanges(1);
@@ -146,15 +205,16 @@
@Test(timeout = 10000)
public void testNoParent() throws Exception {
- String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+ String parent = "/zltestNoParent-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
- ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+ ZooLock zl =
+ new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
assertFalse(zl.isLocked());
TestALW lw = new TestALW();
- zl.lockAsync(lw, "test1".getBytes(UTF_8));
+ zl.lock(lw, "test1".getBytes(UTF_8));
lw.waitForChanges(1);
@@ -166,18 +226,19 @@
@Test(timeout = 10000)
public void testDeleteLock() throws Exception {
- String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+ String parent = "/zltestDeleteLock-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
ZooReaderWriter zk = new ZooReaderWriter(getCluster().getZooKeepers(), 30000, "secret");
zk.mkdirs(parent);
- ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+ ZooLock zl =
+ new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
assertFalse(zl.isLocked());
TestALW lw = new TestALW();
- zl.lockAsync(lw, "test1".getBytes(UTF_8));
+ zl.lock(lw, "test1".getBytes(UTF_8));
lw.waitForChanges(1);
@@ -195,20 +256,21 @@
}
- @Test(timeout = 10000)
+ @Test(timeout = 15000)
public void testDeleteWaiting() throws Exception {
- String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+ String parent = "/zltestDeleteWaiting-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
ZooReaderWriter zk = new ZooReaderWriter(getCluster().getZooKeepers(), 30000, "secret");
zk.mkdirs(parent);
- ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+ ZooLock zl =
+ new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
assertFalse(zl.isLocked());
TestALW lw = new TestALW();
- zl.lockAsync(lw, "test1".getBytes(UTF_8));
+ zl.lock(lw, "test1".getBytes(UTF_8));
lw.waitForChanges(1);
@@ -217,23 +279,25 @@
assertNull(lw.exception);
assertNull(lw.reason);
- ZooLock zl2 = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+ ZooLock zl2 =
+ new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
TestALW lw2 = new TestALW();
- zl2.lockAsync(lw2, "test2".getBytes(UTF_8));
+ zl2.lock(lw2, "test2".getBytes(UTF_8));
assertFalse(lw2.locked);
assertFalse(zl2.isLocked());
- ZooLock zl3 = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+ ZooLock zl3 =
+ new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
TestALW lw3 = new TestALW();
- zl3.lockAsync(lw3, "test3".getBytes(UTF_8));
+ zl3.lock(lw3, "test3".getBytes(UTF_8));
- List<String> children = zk.getChildren(parent);
- Collections.sort(children);
+ List<String> children =
+ ZooLock.validateAndSortChildrenByLockPrefix(parent, zk.getChildren(parent));
zk.delete(parent + "/" + children.get(1));
@@ -263,7 +327,7 @@
@Test(timeout = 10000)
public void testUnexpectedEvent() throws Exception {
- String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+ String parent = "/zltestUnexpectedEvent-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
ConnectedWatcher watcher = new ConnectedWatcher();
try (ZooKeeper zk = new ZooKeeper(getCluster().getZooKeepers(), 30000, watcher)) {
@@ -275,7 +339,8 @@
zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+ ZooLock zl =
+ new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
assertFalse(zl.isLocked());
@@ -284,7 +349,7 @@
TestALW lw = new TestALW();
- zl.lockAsync(lw, "test1".getBytes(UTF_8));
+ zl.lock(lw, "test1".getBytes(UTF_8));
lw.waitForChanges(1);
@@ -306,11 +371,262 @@
}
+ /**
+  * Two ZooLocks with fixed UUIDs lock the same parent one after the other, each through a
+  * ZooKeeperWrapper whose create() issues every create request twice. The test asserts that the
+  * first lock is held and the second is not, that the duplicate node of each lock is gone, that
+  * the second lock watches the first lock's node, and that the second lock is acquired once the
+  * first is released.
+  */
+ @Test(timeout = 60000)
+ public void testLockSerial() throws Exception {
+ String parent = "/zlretryLockSerial";
+
+ ConnectedWatcher watcher = new ConnectedWatcher();
+ try (ZooKeeperWrapper zk1 = new ZooKeeperWrapper(getCluster().getZooKeepers(), 30000, watcher);
+ ZooKeeperWrapper zk2 = new ZooKeeperWrapper(getCluster().getZooKeepers(), 30000, watcher)) {
+
+ zk1.addAuthInfo("digest", "accumulo:secret".getBytes(UTF_8));
+ zk2.addAuthInfo("digest", "accumulo:secret".getBytes(UTF_8));
+
+ // NOTE(review): both clients share one ConnectedWatcher, so this presumably only proves that
+ // at least one of them has connected - confirm that is sufficient.
+ while (!watcher.isConnected()) {
+ Thread.sleep(200);
+ }
+
+ // Create the parent node (createOnce avoids the doubled create of ZooKeeperWrapper.create)
+ zk1.createOnce(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ // Route zl1's ZooKeeper access through zk1 so its creates are doubled.
+ ZooReaderWriter zrw1 = new ZooReaderWriter(getCluster().getZooKeepers(), 30000, "secret") {
+ @Override
+ public ZooKeeper getZooKeeper() {
+ return zk1;
+ }
+ };
+
+ final RetryLockWatcher zlw1 = new RetryLockWatcher();
+ ZooLock zl1 =
+ new ZooLock(zrw1, parent, UUID.fromString("00000000-0000-0000-0000-aaaaaaaaaaaa"));
+ zl1.lock(zlw1, "test1".getBytes(UTF_8));
+ // The call above creates two nodes in ZK because of the overridden create method in
+ // ZooKeeperWrapper.
+ // The nodes created are:
+ // zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000
+ // zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001
+ //
+ // ZooLock should realize this and remove the latter one and place a watcher on the first one
+ // in case the ZooKeeper ephemeral node is deleted by some external process.
+ // Lastly, because zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000 is the first child,
+ // zl1 assumes that it has the lock.
+
+ // Route zl2's ZooKeeper access through zk2 so its creates are doubled as well.
+ ZooReaderWriter zrw2 = new ZooReaderWriter(getCluster().getZooKeepers(), 30000, "secret") {
+ @Override
+ public ZooKeeper getZooKeeper() {
+ return zk2;
+ }
+ };
+
+ final RetryLockWatcher zlw2 = new RetryLockWatcher();
+ ZooLock zl2 =
+ new ZooLock(zrw2, parent, UUID.fromString("00000000-0000-0000-0000-bbbbbbbbbbbb"));
+ zl2.lock(zlw2, "test1".getBytes(UTF_8));
+ // The call above creates two nodes in ZK because of the overridden create method in
+ // ZooKeeperWrapper.
+ // The nodes created are:
+ // zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000002
+ // zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000003
+ //
+ // ZooLock should realize this and remove the latter one and place a watcher on the first one
+ // in case the ZooKeeper ephemeral node is deleted by some external process.
+ // Because zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000002 is not the first child in
+ // the list, zl2 also places a watcher on
+ // zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000 so that it may try to acquire the
+ // lock when that node is removed.
+
+ assertTrue(zlw1.isLockHeld());
+ assertFalse(zlw2.isLockHeld());
+
+ // Only the lower-numbered node of each UUID should remain.
+ List<String> children = zk1.getChildren(parent, false);
+ assertTrue(children.contains("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000"));
+ assertFalse("this node should have been deleted",
+ children.contains("zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000001"));
+ assertTrue(children.contains("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000002"));
+ assertFalse("this node should have been deleted",
+ children.contains("zlock#00000000-0000-0000-0000-bbbbbbbbbbbb#0000000003"));
+
+ // The lock holder watches no predecessor; the waiter watches the holder's node.
+ assertNull(zl1.getWatching());
+ assertEquals("/zlretryLockSerial/zlock#00000000-0000-0000-0000-aaaaaaaaaaaa#0000000000",
+ zl2.getWatching());
+
+ zl1.unlock();
+ assertFalse(zlw1.isLockHeld());
+ // Explicit close before waiting on zl2; try-with-resources closes zk1 again at block exit.
+ zk1.close();
+
+ // Poll until zlw2 reports the lock; nothing bounds this loop except the @Test timeout.
+ while (!zlw2.isLockHeld()) {
+ LockSupport.parkNanos(50);
+ }
+
+ assertTrue(zlw2.isLockHeld());
+ zl2.unlock();
+ // Explicit close; try-with-resources closes zk2 again at block exit.
+ zk2.close();
+
+ }
+
+ }
+
+ /**
+  * Runnable used by the parallel lock test. Each worker opens its own ZooKeeperWrapper
+  * connection, builds a ZooLock for the given parent and UUID, waits until every worker is
+  * ready, calls lock(), and then holds its place until unlock() is called on the worker.
+  * Any exception raised in run() is logged and kept for getException().
+  */
+ static class LockWorker implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LockWorker.class);
+
+ private final String parent;
+ private final UUID uuid;
+ // shared by all workers: counted down once per worker when its ZooLock has been constructed
+ private final CountDownLatch getLockLatch;
+ // shared by all workers: counted down once per worker after its lock() call has returned
+ private final CountDownLatch lockCompletedLatch;
+ // per worker: released by unlock() to let run() proceed to ZooLock.unlock()
+ private final CountDownLatch unlockLatch = new CountDownLatch(1);
+ private final RetryLockWatcher lockWatcher = new RetryLockWatcher();
+ // set by run() if anything fails; read through getException()
+ private volatile Exception ex = null;
+
+ /**
+  * @param parent ZooKeeper path under which the lock node is created
+  * @param uuid UUID handed to the ZooLock constructor
+  * @param lockLatch latch used to line all workers up before they call lock()
+  * @param lockCompletedLatch latch counted down once this worker's lock() call has returned
+  */
+ public LockWorker(final String parent, final UUID uuid, final CountDownLatch lockLatch,
+ final CountDownLatch lockCompletedLatch) {
+ this.parent = parent;
+ this.uuid = uuid;
+ this.getLockLatch = lockLatch;
+ this.lockCompletedLatch = lockCompletedLatch;
+ }
+
+ /** Lets run() continue past its wait and release the ZooLock. */
+ public void unlock() {
+ unlockLatch.countDown();
+ }
+
+ /** @return whether this worker's lock watcher currently reports the lock as held */
+ public boolean holdsLock() {
+ return lockWatcher.isLockHeld();
+ }
+
+ // NOTE(review): if an exception is thrown before the latch count-downs below, this worker
+ // never counts down, so threads awaiting those latches would presumably block until the
+ // test timeout - confirm that is the intended guard.
+ @Override
+ public void run() {
+ try {
+ ConnectedWatcher watcher = new ConnectedWatcher();
+ try (ZooKeeperWrapper zk =
+ new ZooKeeperWrapper(getCluster().getZooKeepers(), 30000, watcher)) {
+ zk.addAuthInfo("digest", "accumulo:secret".getBytes(UTF_8));
+ while (!watcher.isConnected()) {
+ Thread.sleep(50);
+ }
+ // Route the ZooLock's ZooKeeper access through this worker's wrapper client.
+ ZooReaderWriter zrw = new ZooReaderWriter(getCluster().getZooKeepers(), 30000, "secret") {
+ @Override
+ public ZooKeeper getZooKeeper() {
+ return zk;
+ }
+ };
+ ZooLock zl = new ZooLock(zrw, parent, uuid);
+ getLockLatch.countDown(); // signal we are done
+ getLockLatch.await(); // wait for others to finish
+ zl.lock(lockWatcher, "test1".getBytes(UTF_8)); // race to the lock
+ lockCompletedLatch.countDown();
+ // Stay here, keeping the connection open, until the test calls unlock().
+ unlockLatch.await();
+ zl.unlock();
+ }
+ } catch (Exception e) {
+ LOG.error("Error in LockWorker.run() for {}", uuid, e);
+ ex = e;
+ }
+ }
+
+ /** @return the exception caught by run(), or null if none was caught */
+ public Throwable getException() {
+ return ex;
+ }
+
+ @Override
+ public String toString() {
+ return "LockWorker [name=" + uuid + ", holdsLock()=" + holdsLock() + "]";
+ }
+
+ }
+
+ /**
+  * Maps a lock node name to the index of the worker that created it, based on the fixed UUID
+  * prefixes used by the parallel lock test.
+  *
+  * @param child name of a child node under the lock parent
+  * @return the worker index 0-3 whose prefix the name starts with, or -1 if none matches
+  */
+ private int parseLockWorkerName(String child) {
+   // position in this table == worker index
+   final String[] workerPrefixes = {"zlock#00000000-0000-0000-0000-000000000000#",
+       "zlock#00000000-0000-0000-0000-111111111111#",
+       "zlock#00000000-0000-0000-0000-222222222222#",
+       "zlock#00000000-0000-0000-0000-333333333333#"};
+   for (int idx = 0; idx < workerPrefixes.length; idx++) {
+     if (child.startsWith(workerPrefixes[idx])) {
+       return idx;
+     }
+   }
+   return -1;
+ }
+
+ /**
+  * Starts several LockWorker threads that race for the same lock through ZooKeeperWrapper
+  * clients (which issue every create twice). The test then walks the sorted lock nodes from
+  * lowest to highest, checking each time that exactly the owner of the first node holds the
+  * lock, releasing that worker, and waiting for the next one to take over.
+  */
+ @Test(timeout = 60000)
+ public void testLockParallel() throws Exception {
+   String parent = "/zlParallel";
+
+   ConnectedWatcher watcher = new ConnectedWatcher();
+   try (ZooKeeperWrapper zk = new ZooKeeperWrapper(getCluster().getZooKeepers(), 30000, watcher)) {
+     zk.addAuthInfo("digest", "accumulo:secret".getBytes(UTF_8));
+
+     while (!watcher.isConnected()) {
+       Thread.sleep(50);
+     }
+     // Create the parent node
+     zk.createOnce(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+     int numWorkers = 4;
+     final CountDownLatch getLockLatch = new CountDownLatch(numWorkers);
+     final CountDownLatch lockFinishedLatch = new CountDownLatch(numWorkers);
+     final List<LockWorker> workers = new ArrayList<>(numWorkers);
+     final List<Thread> threads = new ArrayList<>(numWorkers);
+     for (int i = 0; i < numWorkers; i++) {
+       // Worker i gets the UUID 00000000-0000-0000-0000-iiiiiiiiiiii, which is what
+       // parseLockWorkerName maps back to the index i.
+       UUID uuid = UUID.fromString(
+           "00000000-0000-0000-0000-aaaaaaaaaaaa".replaceAll("a", Integer.toString(i)));
+       LockWorker w = new LockWorker(parent, uuid, getLockLatch, lockFinishedLatch);
+       Thread t = new Thread(w);
+       workers.add(w);
+       threads.add(t);
+       t.start();
+     }
+
+     workers.forEach(w -> assertNull(w.getException()));
+     getLockLatch.await(); // Threads compete for lock
+     workers.forEach(w -> assertNull(w.getException()));
+     lockFinishedLatch.await(); // Threads lock logic complete
+     workers.forEach(w -> assertNull(w.getException()));
+
+     for (int expected = numWorkers; expected > 0; expected--) {
+       List<String> children =
+           ZooLock.validateAndSortChildrenByLockPrefix(parent, zk.getChildren(parent, false));
+       while (children.size() != expected) {
+         Thread.sleep(100);
+         // Re-validate and re-sort on every poll: getChildren gives no ordering guarantee, and
+         // the first element is used below to identify the lock holder.
+         children =
+             ZooLock.validateAndSortChildrenByLockPrefix(parent, zk.getChildren(parent, false));
+       }
+       assertEquals(expected, children.size());
+       String first = children.get(0);
+       int workerWithLock = parseLockWorkerName(first);
+       // Fail with a readable message instead of an IndexOutOfBoundsException from workers.get.
+       assertTrue("unexpected lock node " + first, workerWithLock >= 0);
+       LockWorker worker = workers.get(workerWithLock);
+       assertTrue(worker.holdsLock());
+       workers.forEach(w -> {
+         if (w != worker) {
+           assertFalse(w.holdsLock());
+         }
+       });
+       worker.unlock();
+       Thread.sleep(100); // need to wait here so that the watchers fire.
+     }
+
+     workers.forEach(w -> assertFalse(w.holdsLock()));
+     workers.forEach(w -> assertNull(w.getException()));
+     assertEquals(0, zk.getChildren(parent, false).size());
+
+     threads.forEach(t -> {
+       try {
+         t.join();
+       } catch (InterruptedException e) {
+         // ignore
+       }
+     });
+   }
+
+ }
+
@Test(timeout = 10000)
public void testTryLock() throws Exception {
- String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+ String parent = "/zltestTryLock-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
- ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+ ZooLock zl =
+ new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
ConnectedWatcher watcher = new ConnectedWatcher();
try (ZooKeeper zk = new ZooKeeper(getCluster().getZooKeepers(), 30000, watcher)) {
@@ -346,7 +662,7 @@
@Test(timeout = 10000)
public void testChangeData() throws Exception {
- String parent = "/zltest-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
+ String parent = "/zltestChangeData-" + this.hashCode() + "-l" + pdCount.incrementAndGet();
ConnectedWatcher watcher = new ConnectedWatcher();
try (ZooKeeper zk = new ZooKeeper(getCluster().getZooKeepers(), 30000, watcher)) {
zk.addAuthInfo("digest", "accumulo:secret".getBytes(UTF_8));
@@ -357,11 +673,12 @@
zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- ZooLock zl = new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent);
+ ZooLock zl =
+ new ZooLock(getCluster().getZooKeepers(), 30000, "secret", parent, UUID.randomUUID());
TestALW lw = new TestALW();
- zl.lockAsync(lw, "test1".getBytes(UTF_8));
+ zl.lock(lw, "test1".getBytes(UTF_8));
assertEquals("test1", new String(zk.getData(zl.getLockPath(), null, null)));
zl.replaceLockData("test2".getBytes(UTF_8));
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
index a43f2ba..a3d302f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
@@ -32,6 +32,7 @@
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.UUID;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Scanner;
@@ -94,7 +95,7 @@
String zPath = c.getZooKeeperRoot() + "/testLock";
ZooReaderWriter zoo = c.getZooReaderWriter();
zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.OVERWRITE);
- ZooLock zl = new ZooLock(zoo, zPath);
+ ZooLock zl = new ZooLock(zoo, zPath, UUID.randomUUID());
boolean gotLock = zl.tryLock(new LockWatcher() {
@SuppressFBWarnings(value = "DM_EXIT",
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 658b721..63e803a 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -24,6 +24,7 @@
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
@@ -119,7 +120,7 @@
ZooReaderWriter zoo = context.getZooReaderWriter();
zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
- ZooLock zlock = new ZooLock(zoo, zPath);
+ ZooLock zlock = new ZooLock(zoo, zPath, UUID.randomUUID());
LockWatcher lw = new LockWatcher() {