blob: be796f613bd88a23f576ca29a195ae0d1da449c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.fate.zookeeper;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.fate.zookeeper.ZooUtil.LockID;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZooLock implements Watcher {
private static final Logger LOG = LoggerFactory.getLogger(ZooLock.class);
private static final String ZLOCK_PREFIX = "zlock#";
private static class Prefix {
private final String prefix;
public Prefix(String prefix) {
this.prefix = prefix;
}
@Override
public String toString() {
return this.prefix;
}
}
private static ZooCache LOCK_DATA_ZOO_CACHE;
public enum LockLossReason {
LOCK_DELETED, SESSION_EXPIRED
}
public interface LockWatcher {
void lostLock(LockLossReason reason);
/**
* lost the ability to monitor the lock node, and its status is unknown
*/
void unableToMonitorLockNode(Exception e);
}
public interface AccumuloLockWatcher extends LockWatcher {
void acquiredLock();
void failedToAcquireLock(Exception e);
}
private final String path;
private final Prefix vmLockPrefix;
protected final ZooReaderWriter zooKeeper;
private LockWatcher lockWatcher;
private String lockNodeName;
private volatile boolean lockWasAcquired;
private volatile boolean watchingParent = false;
private String createdNodeName;
private String watchingNodeName;
public ZooLock(ZooReaderWriter zoo, String path, UUID uuid) {
this(new ZooCache(zoo), zoo, path, uuid);
}
public ZooLock(String zookeepers, int timeInMillis, String secret, String path, UUID uuid) {
this(new ZooCacheFactory().getZooCache(zookeepers, timeInMillis),
new ZooReaderWriter(zookeepers, timeInMillis, secret), path, uuid);
}
protected ZooLock(ZooCache zc, ZooReaderWriter zrw, String path, UUID uuid) {
LOCK_DATA_ZOO_CACHE = zc;
this.path = path;
zooKeeper = zrw;
try {
zooKeeper.getStatus(path, this);
watchingParent = true;
this.vmLockPrefix = new Prefix(ZLOCK_PREFIX + uuid.toString() + "#");
} catch (Exception ex) {
LOG.error("Error setting initial watch", ex);
throw new RuntimeException(ex);
}
}
private static class LockWatcherWrapper implements AccumuloLockWatcher {
boolean acquiredLock = false;
LockWatcher lw;
public LockWatcherWrapper(LockWatcher lw2) {
this.lw = lw2;
}
@Override
public void acquiredLock() {
acquiredLock = true;
}
@Override
public void failedToAcquireLock(Exception e) {}
@Override
public void lostLock(LockLossReason reason) {
lw.lostLock(reason);
}
@Override
public void unableToMonitorLockNode(Exception e) {
lw.unableToMonitorLockNode(e);
}
}
public synchronized boolean tryLock(LockWatcher lw, byte[] data)
throws KeeperException, InterruptedException {
LockWatcherWrapper lww = new LockWatcherWrapper(lw);
lock(lww, data);
if (lww.acquiredLock) {
return true;
}
// If we didn't acquire the lock, then delete the path we just created
if (createdNodeName != null) {
String pathToDelete = path + "/" + createdNodeName;
LOG.debug("[{}] Failed to acquire lock in tryLock(), deleting all at path: {}", vmLockPrefix,
pathToDelete);
zooKeeper.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
createdNodeName = null;
}
return false;
}
/**
* Sort list of ephemeral nodes by their sequence number. Any ephemeral nodes that are not of the
* correct form will sort last.
*
* @param children
* list of ephemeral nodes
* @return list of ephemeral nodes that have valid formats, sorted by sequence number
*/
public static List<String> validateAndSortChildrenByLockPrefix(String path,
List<String> children) {
LOG.debug("validating and sorting children at path {}", path);
List<String> validChildren = new ArrayList<>();
if (null == children || children.size() == 0) {
return validChildren;
}
children.forEach(c -> {
LOG.trace("Validating {}", c);
if (c.startsWith(ZLOCK_PREFIX)) {
String candidate = c.substring(ZLOCK_PREFIX.length() + 1);
if (candidate.contains("#")) {
int idx = candidate.indexOf('#');
String uuid = candidate.substring(0, idx - 1);
String sequenceNum = candidate.substring(idx + 1);
try {
LOG.trace("Testing uuid format of {}", uuid);
UUID.fromString(uuid);
if (sequenceNum.length() == 10) {
try {
LOG.trace("Testing number format of {}", sequenceNum);
Integer.parseInt(sequenceNum);
validChildren.add(c);
} catch (NumberFormatException e) {
LOG.warn("Child found with invalid sequence format: {} (not a number)", c);
}
} else {
LOG.warn("Child found with invalid sequence format: {} (not 10 characters)", c);
}
} catch (IllegalArgumentException e) {
LOG.warn("Child found with invalid UUID format: {}", c);
}
} else {
LOG.warn("Child found with invalid format: {} (does not contain second '#')", c);
}
} else {
LOG.warn("Child found with invalid format: {} (does not start with {})", c, ZLOCK_PREFIX);
}
});
if (validChildren.size() > 1) {
validChildren.sort(new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
// Lock should be of the form:
// zlock#UUID#sequenceNumber
// Example:
// zlock#44755fbe-1c9e-40b3-8458-03abaf950d7e#0000000000
int secondHashIdx = 43;
return Integer.valueOf(o1.substring(secondHashIdx))
.compareTo(Integer.valueOf(o2.substring(secondHashIdx)));
}
});
}
if (LOG.isDebugEnabled()) {
LOG.debug("Children nodes: {}", validChildren.size());
validChildren.forEach(c -> LOG.debug("- {}", c));
}
return validChildren;
}
/**
* Given a pre-sorted set of children ephemeral nodes where the node name is of the form
* "zlock#UUID#sequenceNumber", find the ephemeral node that sorts before the ephemeralNode
* parameter with the lowest sequence number
*
* @param children
* list of sequential ephemera nodes, already sorted
* @param ephemeralNode
* starting node for the search
* @return next lowest prefix with the lowest sequence number
*/
public static String findLowestPrevPrefix(final List<String> children,
final String ephemeralNode) {
int idx = children.indexOf(ephemeralNode);
// Get the prefix from the prior ephemeral node
String prev = children.get(idx - 1);
int prefixIdx = prev.lastIndexOf('#');
String prevPrefix = prev.substring(0, prefixIdx);
// Find the lowest sequential ephemeral node with prevPrefix
int i = 2;
String lowestPrevNode = prev;
while ((idx - i) >= 0) {
prev = children.get(idx - i);
i++;
if (prev.startsWith(prevPrefix)) {
lowestPrevNode = prev;
} else {
break;
}
}
return lowestPrevNode;
}
private synchronized void determineLockOwnership(final String createdEphemeralNode,
final AccumuloLockWatcher lw) throws KeeperException, InterruptedException {
if (createdNodeName == null) {
throw new IllegalStateException(
"Called determineLockOwnership() when ephemeralNodeName == null");
}
List<String> children = validateAndSortChildrenByLockPrefix(path, zooKeeper.getChildren(path));
if (null == children || !children.contains(createdEphemeralNode)) {
LOG.error("Expected ephemeral node {} to be in the list of children {}", createdEphemeralNode,
children);
throw new RuntimeException(
"Lock attempt ephemeral node no longer exist " + createdEphemeralNode);
}
if (children.get(0).equals(createdEphemeralNode)) {
LOG.debug("[{}] First candidate is my lock, acquiring...", vmLockPrefix);
if (!watchingParent) {
throw new IllegalStateException(
"Can not acquire lock, no longer watching parent : " + path);
}
this.lockWatcher = lw;
this.lockNodeName = createdEphemeralNode;
createdNodeName = null;
lockWasAcquired = true;
lw.acquiredLock();
} else {
LOG.debug("[{}] Lock held by another process with ephemeral node: {}", vmLockPrefix,
children.get(0));
String lowestPrevNode = findLowestPrevPrefix(children, createdEphemeralNode);
watchingNodeName = path + "/" + lowestPrevNode;
final String nodeToWatch = watchingNodeName;
LOG.debug("[{}] Establishing watch on prior node {}", vmLockPrefix, nodeToWatch);
Watcher priorNodeWatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (LOG.isTraceEnabled()) {
LOG.trace("[{}] Processing event:", vmLockPrefix);
LOG.trace("- type {}", event.getType());
LOG.trace("- path {}", event.getPath());
LOG.trace("- state {}", event.getState());
}
boolean renew = true;
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(nodeToWatch)) {
LOG.debug("[{}] Detected deletion of prior node {}, attempting to acquire lock",
vmLockPrefix, nodeToWatch);
synchronized (ZooLock.this) {
try {
if (createdNodeName != null) {
determineLockOwnership(createdEphemeralNode, lw);
} else if (LOG.isDebugEnabled()) {
LOG.debug("[{}] While waiting for another lock {}, {} was deleted", vmLockPrefix,
nodeToWatch, createdEphemeralNode);
}
} catch (Exception e) {
if (lockNodeName == null) {
// have not acquired lock yet
lw.failedToAcquireLock(e);
}
}
}
renew = false;
}
if (event.getState() == KeeperState.Expired
|| event.getState() == KeeperState.Disconnected) {
synchronized (ZooLock.this) {
if (lockNodeName == null) {
LOG.info("Zookeeper Session expired / disconnected");
lw.failedToAcquireLock(new Exception("Zookeeper Session expired / disconnected"));
}
}
renew = false;
}
if (renew) {
LOG.debug("[{}] Renewing watch on prior node {}", vmLockPrefix, nodeToWatch);
try {
Stat restat = zooKeeper.getStatus(nodeToWatch, this);
if (restat == null) {
// if stat is null from the zookeeper.exists(path, Watcher) call, then we just
// created a Watcher on a node that does not exist. Delete the watcher we just
// created.
zooKeeper.getZooKeeper().removeWatches(nodeToWatch, this, WatcherType.Any, true);
determineLockOwnership(createdEphemeralNode, lw);
}
} catch (KeeperException | InterruptedException e) {
lw.failedToAcquireLock(new Exception("Failed to renew watch on other master node"));
}
}
}
};
Stat stat = zooKeeper.getStatus(nodeToWatch, priorNodeWatcher);
if (stat == null) {
// if stat is null from the zookeeper.exists(path, Watcher) call, then we just
// created a Watcher on a node that does not exist. Delete the watcher we just created.
zooKeeper.getZooKeeper().removeWatches(nodeToWatch, priorNodeWatcher, WatcherType.Any,
true);
determineLockOwnership(createdEphemeralNode, lw);
}
}
}
private void lostLock(LockLossReason reason) {
LockWatcher localLw = lockWatcher;
lockNodeName = null;
lockWatcher = null;
localLw.lostLock(reason);
}
public synchronized void lock(final AccumuloLockWatcher lw, byte[] data) {
if (lockWatcher != null || lockNodeName != null || createdNodeName != null) {
throw new IllegalStateException();
}
lockWasAcquired = false;
try {
final String lockPathPrefix = path + "/" + vmLockPrefix.toString();
// Implement recipe at https://zookeeper.apache.org/doc/current/recipes.html#sc_recipes_Locks
// except that instead of the ephemeral lock node being of the form guid-lock- use lock-guid-.
// Another deviation from the recipe is that we cleanup any extraneous ephemeral nodes that
// were created.
final String createPath = zooKeeper.putEphemeralSequential(lockPathPrefix, data);
LOG.debug("[{}] Ephemeral node {} created", vmLockPrefix, createPath);
// It's possible that the call above was retried several times and multiple ephemeral nodes
// were created but the client missed the response for some reason. Find the ephemeral nodes
// with this ZLOCK_UUID and lowest sequential number.
List<String> children =
validateAndSortChildrenByLockPrefix(path, zooKeeper.getChildren(path));
if (null == children || !children.contains(createPath.substring(path.length() + 1))) {
LOG.error("Expected ephemeral node {} to be in the list of children {}", createPath,
children);
throw new RuntimeException("Lock attempt ephemeral node no longer exist " + createPath);
}
String lowestSequentialPath = null;
boolean msgLoggedOnce = false;
for (String child : children) {
if (child.startsWith(vmLockPrefix.toString())) {
if (null == lowestSequentialPath) {
if (createPath.equals(path + "/" + child)) {
// the path returned from create is the lowest sequential one
lowestSequentialPath = createPath;
break;
}
lowestSequentialPath = path + "/" + child;
LOG.debug("[{}] lowest sequential node found: {}", vmLockPrefix, lowestSequentialPath);
} else {
if (!msgLoggedOnce) {
LOG.info(
"[{}] Zookeeper client missed server response, multiple ephemeral child nodes created at {}",
vmLockPrefix, lockPathPrefix);
msgLoggedOnce = true;
}
LOG.debug("[{}] higher sequential node found: {}, deleting it", vmLockPrefix, child);
zooKeeper.delete(path + "/" + child);
}
}
}
final String pathForWatcher = lowestSequentialPath;
// Set a watcher on the lowest sequential node that we created, this handles the case
// where the node we created is deleted or if this client becomes disconnected.
LOG.debug("[{}] Setting watcher on {}", vmLockPrefix, pathForWatcher);
Watcher watcherForNodeWeCreated = new Watcher() {
private void failedToAcquireLock() {
LOG.debug("[{}] Lock deleted before acquired, setting createdNodeName {} to null",
vmLockPrefix, createdNodeName);
lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
createdNodeName = null;
}
@Override
public void process(WatchedEvent event) {
synchronized (ZooLock.this) {
if (lockNodeName != null && event.getType() == EventType.NodeDeleted
&& event.getPath().equals(path + "/" + lockNodeName)) {
LOG.debug("[{}] {} was deleted", vmLockPrefix, lockNodeName);
lostLock(LockLossReason.LOCK_DELETED);
} else if (createdNodeName != null && event.getType() == EventType.NodeDeleted
&& event.getPath().equals(path + "/" + createdNodeName)) {
LOG.debug("[{}] {} was deleted", vmLockPrefix, createdNodeName);
failedToAcquireLock();
} else if (event.getState() != KeeperState.Disconnected
&& event.getState() != KeeperState.Expired
&& (lockNodeName != null || createdNodeName != null)) {
LOG.debug("Unexpected event watching lock node {} {}", event, pathForWatcher);
try {
Stat stat2 = zooKeeper.getStatus(pathForWatcher, this);
if (stat2 == null) {
// if stat is null from the zookeeper.exists(path, Watcher) call, then we just
// created a Watcher on a node that does not exist. Delete the watcher we just
// created.
zooKeeper.getZooKeeper().removeWatches(pathForWatcher, this, WatcherType.Any,
true);
if (lockNodeName != null)
lostLock(LockLossReason.LOCK_DELETED);
else if (createdNodeName != null)
failedToAcquireLock();
}
} catch (Exception e) {
lockWatcher.unableToMonitorLockNode(e);
LOG.error("Failed to stat lock node: {} ", pathForWatcher, e);
}
}
}
}
};
Stat stat = zooKeeper.getStatus(pathForWatcher, watcherForNodeWeCreated);
if (stat == null) {
// if stat is null from the zookeeper.exists(path, Watcher) call, then we just
// created a Watcher on a node that does not exist. Delete the watcher we just created.
zooKeeper.getZooKeeper().removeWatches(pathForWatcher, watcherForNodeWeCreated,
WatcherType.Any, true);
lw.failedToAcquireLock(new Exception("Lock does not exist after create"));
return;
}
createdNodeName = pathForWatcher.substring(path.length() + 1);
// We have created a node, do we own the lock?
determineLockOwnership(createdNodeName, lw);
} catch (KeeperException | InterruptedException e) {
lw.failedToAcquireLock(e);
}
}
public synchronized boolean tryToCancelAsyncLockOrUnlock()
throws InterruptedException, KeeperException {
boolean del = false;
if (createdNodeName != null) {
String pathToDelete = path + "/" + createdNodeName;
LOG.debug("[{}] Deleting all at path {} due to lock cancellation", vmLockPrefix,
pathToDelete);
zooKeeper.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
del = true;
}
if (lockNodeName != null) {
unlock();
del = true;
}
return del;
}
public synchronized void unlock() throws InterruptedException, KeeperException {
if (lockNodeName == null) {
throw new IllegalStateException();
}
LockWatcher localLw = lockWatcher;
String localLock = lockNodeName;
lockNodeName = null;
lockWatcher = null;
final String pathToDelete = path + "/" + localLock;
LOG.debug("[{}] Deleting all at path {} due to unlock", vmLockPrefix, pathToDelete);
zooKeeper.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
localLw.lostLock(LockLossReason.LOCK_DELETED);
}
/**
* @return path of node that this lock is watching
*/
public synchronized String getWatching() {
return watchingNodeName;
}
public synchronized String getLockPath() {
if (lockNodeName == null) {
return null;
}
return path + "/" + lockNodeName;
}
public synchronized String getLockName() {
return lockNodeName;
}
public synchronized LockID getLockID() {
if (lockNodeName == null) {
throw new IllegalStateException("Lock not held");
}
return new LockID(path, lockNodeName, zooKeeper.getZooKeeper().getSessionId());
}
/**
* indicates if the lock was acquired in the past.... helps discriminate between the case where
* the lock was never held, or held and lost....
*
* @return true if the lock was acquired, otherwise false.
*/
public synchronized boolean wasLockAcquired() {
return lockWasAcquired;
}
public synchronized boolean isLocked() {
return lockNodeName != null;
}
public synchronized void replaceLockData(byte[] b) throws KeeperException, InterruptedException {
if (getLockPath() != null)
zooKeeper.getZooKeeper().setData(getLockPath(), b, -1);
}
@Override
public synchronized void process(WatchedEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("event {} {} {}", event.getPath(), event.getType(), event.getState());
}
watchingParent = false;
if (event.getState() == KeeperState.Expired && lockNodeName != null) {
lostLock(LockLossReason.SESSION_EXPIRED);
} else {
try { // set the watch on the parent node again
zooKeeper.getStatus(path, this);
watchingParent = true;
} catch (KeeperException.ConnectionLossException ex) {
// we can't look at the lock because we aren't connected, but our session is still good
LOG.warn("lost connection to zookeeper", ex);
} catch (Exception ex) {
if (lockNodeName != null || createdNodeName != null) {
lockWatcher.unableToMonitorLockNode(ex);
LOG.error("Error resetting watch on ZooLock {} {}",
lockNodeName != null ? lockNodeName : createdNodeName, event, ex);
}
}
}
}
public static boolean isLockHeld(ZooCache zc, LockID lid) {
List<String> children = validateAndSortChildrenByLockPrefix(lid.path, zc.getChildren(lid.path));
if (children == null || children.isEmpty()) {
return false;
}
String lockNode = children.get(0);
if (!lid.node.equals(lockNode))
return false;
ZcStat stat = new ZcStat();
return zc.get(lid.path + "/" + lid.node, stat) != null && stat.getEphemeralOwner() == lid.eid;
}
public static byte[] getLockData(ZooKeeper zk, String path)
throws KeeperException, InterruptedException {
List<String> children = validateAndSortChildrenByLockPrefix(path, zk.getChildren(path, false));
if (children == null || children.isEmpty()) {
return null;
}
String lockNode = children.get(0);
return zk.getData(path + "/" + lockNode, false, null);
}
public static byte[] getLockData(org.apache.accumulo.fate.zookeeper.ZooCache zc, String path,
ZcStat stat) {
List<String> children = validateAndSortChildrenByLockPrefix(path, zc.getChildren(path));
if (children == null || children.isEmpty()) {
return null;
}
String lockNode = children.get(0);
if (!lockNode.startsWith(ZLOCK_PREFIX)) {
throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
}
return zc.get(path + "/" + lockNode, stat);
}
public static long getSessionId(ZooCache zc, String path) {
List<String> children = validateAndSortChildrenByLockPrefix(path, zc.getChildren(path));
if (children == null || children.isEmpty()) {
return 0;
}
String lockNode = children.get(0);
ZcStat stat = new ZcStat();
if (zc.get(path + "/" + lockNode, stat) != null)
return stat.getEphemeralOwner();
return 0;
}
public long getSessionId() throws KeeperException, InterruptedException {
return getSessionId(LOCK_DATA_ZOO_CACHE, path);
}
public static void deleteLock(ZooReaderWriter zk, String path)
throws InterruptedException, KeeperException {
List<String> children = validateAndSortChildrenByLockPrefix(path, zk.getChildren(path));
if (children == null || children.isEmpty()) {
throw new IllegalStateException("No lock is held at " + path);
}
String lockNode = children.get(0);
if (!lockNode.startsWith(ZLOCK_PREFIX)) {
throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
}
String pathToDelete = path + "/" + lockNode;
LOG.debug("Deleting all at path {} due to lock deletion", pathToDelete);
zk.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
}
public static boolean deleteLock(ZooReaderWriter zk, String path, String lockData)
throws InterruptedException, KeeperException {
List<String> children = validateAndSortChildrenByLockPrefix(path, zk.getChildren(path));
if (children == null || children.isEmpty()) {
throw new IllegalStateException("No lock is held at " + path);
}
String lockNode = children.get(0);
if (!lockNode.startsWith(ZLOCK_PREFIX)) {
throw new RuntimeException("Node " + lockNode + " at " + path + " is not a lock node");
}
byte[] data = zk.getData(path + "/" + lockNode);
if (lockData.equals(new String(data, UTF_8))) {
String pathToDelete = path + "/" + lockNode;
LOG.debug("Deleting all at path {} due to lock deletion", pathToDelete);
zk.recursiveDelete(pathToDelete, NodeMissingPolicy.FAIL);
return true;
}
return false;
}
}