blob: c45db7a5e0f89b5f069bf854de3d2024da8e4100 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.internal.zookeeper;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
/**
 * A reentrant distributed lock implementation that uses ZooKeeper. It uses the recipe described in
 *
 * http://zookeeper.apache.org/doc/trunk/recipes.html#sc_recipes_Locks
 *
 * <p>
 * Locking is done in two layers. A local {@link ReentrantLock} is always taken first; it provides the
 * reentrancy and serializes threads sharing this instance. Only the first (outermost) hold of the local lock
 * goes to ZooKeeper, where an ephemeral sequential node under the configured path represents the lock.
 * The outermost {@link #unlock()} deletes that node again.
 * </p>
 */
public class ReentrantDistributedLock implements Lock {

  private static final Logger LOG = LoggerFactory.getLogger(ReentrantDistributedLock.class);

  private final ZKClient zkClient;
  // Parent path in ZooKeeper under which the lock nodes are created; always starts with "/".
  private final String path;
  // Full path of the ZK node representing the lock held by the current thread.
  private final ThreadLocal<String> localLockNode;
  // Local lock guarding the ZK interaction; its hold count drives the reentrancy logic.
  private final ReentrantLock lock;

  /**
   * Creates a distributed lock instance.
   *
   * @param zkClient the {@link ZKClient} to interact with the ZooKeeper used for the lock coordination
   * @param path the path in ZooKeeper where the lock coordination happens
   */
  public ReentrantDistributedLock(ZKClient zkClient, String path) {
    this.zkClient = zkClient;
    this.path = path.startsWith("/") ? path : "/" + path;
    this.localLockNode = new ThreadLocal<String>();
    this.lock = new ReentrantLock();
  }

  /**
   * Acquires the lock, waiting uninterruptibly. Any failure during the ZooKeeper acquisition releases the
   * local lock and is rethrown through {@link Throwables#propagate(Throwable)}.
   */
  @Override
  public void lock() {
    lock.lock();
    try {
      acquire(false, true);
    } catch (Exception e) {
      lock.unlock();
      throw Throwables.propagate(e);
    }
  }

  /**
   * Acquires the lock unless the current thread is interrupted. Any failure during the ZooKeeper acquisition
   * releases the local lock before the exception is rethrown.
   *
   * @throws InterruptedException if the current thread is interrupted while acquiring the lock
   */
  @Override
  public void lockInterruptibly() throws InterruptedException {
    lock.lockInterruptibly();
    try {
      acquire(true, true);
    } catch (Exception e) {
      lock.unlock();
      Throwables.propagateIfInstanceOf(e, InterruptedException.class);
      throw Throwables.propagate(e);
    }
  }

  /**
   * Acquires the lock only if both the local lock and the distributed lock are free at the time of the call.
   *
   * @return {@code true} if the lock was acquired, {@code false} otherwise
   */
  @Override
  public boolean tryLock() {
    if (!lock.tryLock()) {
      return false;
    }
    try {
      if (acquire(false, false)) {
        return true;
      }
      lock.unlock();
      return false;
    } catch (Exception e) {
      lock.unlock();
      throw Throwables.propagate(e);
    }
  }

  /**
   * Acquires the lock if it becomes available within the given time. The time spent on getting the local lock
   * is deducted from the time allowed for the distributed acquisition.
   *
   * @param time the maximum time to wait for the lock
   * @param unit the unit of the {@code time} argument
   * @return {@code true} if the lock was acquired, {@code false} if the waiting time elapsed first
   * @throws InterruptedException if the current thread is interrupted while acquiring the lock
   */
  @Override
  public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    long startTime = System.nanoTime();
    if (!lock.tryLock(time, unit)) {
      return false;
    }

    long timeoutNano = unit.toNanos(time) - (System.nanoTime() - startTime);
    boolean acquired = false;
    try {
      acquired = acquire(true, true, timeoutNano, TimeUnit.NANOSECONDS);
      return acquired;
    } catch (ExecutionException e) {
      throw Throwables.propagate(e.getCause());
    } catch (TimeoutException e) {
      return false;
    } finally {
      // Release the local lock on every unsuccessful exit, including InterruptedException and
      // unchecked exceptions; otherwise the local lock would stay held by this thread forever.
      if (!acquired) {
        lock.unlock();
      }
    }
  }

  /**
   * Releases one hold of the lock. When it is the last hold of the current thread, the ZooKeeper lock node is
   * deleted as well. The local lock is released even if the node deletion fails.
   *
   * @throws IllegalStateException if the current thread does not hold the lock
   */
  @Override
  public void unlock() {
    if (!lock.isHeldByCurrentThread()) {
      throw new IllegalStateException("Cannot unlock without holding a lock by thread " + Thread.currentThread());
    }

    try {
      if (lock.getHoldCount() == 1) {
        // If it is the last lock entry for this thread, remove the zk node as well.
        try {
          Uninterruptibles.getUninterruptibly(zkClient.delete(localLockNode.get()));
        } catch (ExecutionException e) {
          throw Throwables.propagate(e.getCause());
        } finally {
          localLockNode.remove();
        }
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Currently not supported and will always throw {@link UnsupportedOperationException}.
   */
  @Override
  public Condition newCondition() {
    // TODO: Add the support of it when needed
    throw new UnsupportedOperationException("Condition not supported.");
  }

  /**
   * Acquires a distributed lock through ZooKeeper. It's the same as calling
   * {@link #acquire(boolean, boolean, long, TimeUnit)} with {@link Long#MAX_VALUE} as timeout.
   */
  private boolean acquire(boolean interruptible, boolean waitForLock) throws InterruptedException, ExecutionException {
    try {
      return acquire(interruptible, waitForLock, Long.MAX_VALUE, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      // Should never happen
      throw Throwables.propagate(e);
    }
  }

  /**
   * Acquires a distributed lock through ZooKeeper. The caller must already hold the local lock.
   *
   * @param interruptible true if acquisition of lock can be interrupted
   * @param waitForLock true if wants to wait for the lock when not able to acquire it
   * @param timeout time to wait for the lock before giving up
   * @param unit unit for the timeout
   * @return {@code true} if the lock is acquired; {@code false} if the acquisition was aborted
   * @throws InterruptedException if {@code interruptible} is set to {@code true} and the current thread is interrupted
   *                              while acquiring the lock
   * @throws ExecutionException if there is failure while trying to acquire the lock
   * @throws TimeoutException if the lock was not acquired within the given timeout
   */
  private boolean acquire(boolean interruptible,
                          final boolean waitForLock,
                          long timeout,
                          TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    Preconditions.checkState(lock.isHeldByCurrentThread(), "Not owner of local lock.");
    if (lock.getHoldCount() > 1) {
      // Already owner of the lock, simply return.
      return true;
    }

    // Use a Future to help deal with different variants of locking
    // (lock, lockInterruptibly, tryLock, tryLock with timeout)
    // When the completion future is completed successfully, it means the lock is acquired and the future contains
    // the ZK node path to the ephemeral node that is representing this lock.
    // If it is failed, it means there is exception while trying to acquire the lock
    // If it is cancelled, it means to abort the acquisition logic (due to timeout / interrupt).
    final SettableFuture<String> completion = SettableFuture.create();

    // If the connection expired, fail the locking process if it is still in progress
    final Cancellable watcherCancellable = zkClient.addConnectionWatcher(new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.Expired) {
          completion.setException(new IllegalStateException("ZK session expired"));
        }
      }
    });
    // Always remove the watcher on completion
    completion.addListener(new Runnable() {
      @Override
      public void run() {
        watcherCancellable.cancel();
      }
    }, Threads.SAME_THREAD_EXECUTOR);

    // Step 1. Create an ephemeral sequential node
    final String guid = UUID.randomUUID().toString();
    final String lockPath = String.format("%s/%s-", path, guid);
    OperationFuture<String> future = zkClient.create(lockPath, null, CreateMode.EPHEMERAL_SEQUENTIAL, true);

    Futures.addCallback(future, new FutureCallback<String>() {
      @Override
      public void onSuccess(final String lockNode) {
        // If lock failed due to whatever reason, delete the lock node.
        deleteNodeOnFailure(completion, lockNode);

        // If the lock is completed (mainly due to cancellation), simply abort the lock acquisition logic.
        if (completion.isDone()) {
          return;
        }
        // Step 2-5. Try to determine who is the lock owner and watch for ZK node changes if itself is not the owner.
        doAcquire(completion, waitForLock, guid, lockNode);
      }

      @Override
      public void onFailure(Throwable t) {
        if (t instanceof KeeperException.ConnectionLossException) {
          // Ignore connection exception in create. Going to handle it in next step.
          // See the ZK recipe for details about the possible failure situation that can cause this.
          doAcquire(completion, waitForLock, guid, null);
        } else {
          LOG.error("Exception raised when creating lock node at {}", lockPath, t);
          completion.setException(t);
        }
      }
    });

    // Gets the result from the completion
    try {
      if (interruptible) {
        localLockNode.set(completion.get(timeout, unit));
      } else {
        localLockNode.set(Uninterruptibles.getUninterruptibly(completion, timeout, unit));
      }
      return true;
    } catch (InterruptedException e) {
      abortAcquisition(completion);
      throw e;
    } catch (TimeoutException e) {
      abortAcquisition(completion);
      throw e;
    } catch (CancellationException e) {
      // If the completion get cancelled, meaning the lock acquisition is aborted.
      return false;
    }
  }

  /**
   * Aborts an in-progress lock acquisition after a timeout or an interrupt.
   *
   * Cancelling the completion normally triggers the node deletion registered through
   * {@link #deleteNodeOnFailure(ListenableFuture, String)}. However, the completion may have been set
   * successfully right before the cancel call; in that case the cancel is a no-op and the lock node would
   * stay in ZooKeeper although the caller is told that the acquisition failed. The node is deleted
   * explicitly here so that the lock is not left held.
   */
  private void abortAcquisition(SettableFuture<String> completion) {
    if (completion.cancel(true)) {
      return;
    }
    // Cancel returned false, hence the completion is already done and the call below won't block.
    try {
      zkClient.delete(Uninterruptibles.getUninterruptibly(completion));
    } catch (ExecutionException e) {
      // Completed with failure; nothing was acquired, so there is nothing to release here.
    } catch (CancellationException e) {
      // Already cancelled elsewhere; nothing was acquired, so there is nothing to release here.
    }
  }

  /**
   * Executes the lock acquisition process. This corresponds to step 2-5 of the distributed lock recipe.
   *
   * @param completion the future to complete with the lock node path when the lock is acquired
   * @param waitForLock if {@code false}, the completion is cancelled when the lock is owned by someone else
   * @param guid the unique id used as the prefix of the lock node name
   * @param lockPath the full path of the lock node, or {@code null} if it is unknown because the node creation
   *                 ended with a connection loss
   */
  private void doAcquire(final SettableFuture<String> completion, final boolean waitForLock,
                         final String guid, @Nullable final String lockPath) {
    // Step 2. Get all children under the lock parent.
    Futures.addCallback(zkClient.getChildren(path), new FutureCallback<NodeChildren>() {

      @Override
      public void onSuccess(NodeChildren children) {
        // Find the lock node in case the creation step failed by matching the guid
        // See "Recoverable Errors and the GUID" in the ZooKeeper guide
        final String lockNode = lockPath == null ? findLockNode(children.getChildren(), guid) : lockPath;
        if (lockNode == null) {
          // If not able to find the lock node, fail the locking procedure.
          completion.setException(new IllegalStateException("Failed to acquire lock"));
          return;
        }

        if (lockPath == null) {
          // If lock node was not determined in step 1 due to connection loss exception, need to add the
          // node deletion handler in here after the actual lockNode is determined.
          deleteNodeOnFailure(completion, lockNode);
        }

        // Find the node to watch, which is the one with the largest id that is smaller than currentId
        // If the current id is the smallest one, nodeToWatch will be null
        final String nodeToWatch = findNodeToWatch(children, lockNode, guid);

        // Step 3a. lockNode is the lowest, hence this become lock owner.
        if (nodeToWatch == null) {
          // Acquired lock
          completion.set(lockNode);
        } else if (!waitForLock) {
          // This is for the case of tryLock() without timeout.
          completion.cancel(true);
        }
        // If the lock acquisition is completed, due to whatever reason, we don't need to watch for any other nodes
        if (completion.isDone()) {
          return;
        }

        // Step 3b and 4. See if the next lowest sequence ID exists. If it does, leave a watch
        // Use getData() instead of exists() to avoid leaking Watcher resources (if the node is gone, there will
        // be a watch left on the ZK server if exists() is used).
        OperationFuture<NodeData> getDataFuture = zkClient.getData(nodeToWatch, new Watcher() {
          @Override
          public void process(WatchedEvent event) {
            if (!completion.isDone()) {
              // If the watching node changed, go to step 2.
              doAcquire(completion, waitForLock, guid, lockNode);
            }
          }
        });

        // Step 5. Depends on the getData call result, either go to step 2 if the nodeToWatch is gone or just
        // let the watcher to trigger step 2 when there is change to the nodeToWatch.
        Futures.addCallback(getDataFuture, new FutureCallback<NodeData>() {
          @Override
          public void onSuccess(NodeData nodeData) {
            // No-op
          }

          @Override
          public void onFailure(Throwable t) {
            // See if the failure is due to node not exists. If that's the case, go to step 2.
            if (t instanceof KeeperException.NoNodeException && !completion.isDone()) {
              doAcquire(completion, waitForLock, guid, lockNode);
            } else {
              // If failed due to something else, fail the lock acquisition.
              completion.setException(t);
            }
          }
        });
      }

      @Override
      public void onFailure(Throwable t) {
        if (lockPath != null) {
          completion.setException(t);
        } else {
          // The lock node is still unknown, so keep retrying to locate it.
          // NOTE(review): this retry is unconditional (any failure type, even after completion is done);
          // presumably so that a node created in step 1 can still be found and cleaned up -- confirm.
          doAcquire(completion, waitForLock, guid, null);
        }
      }
    });
  }

  /**
   * Deletes the given node if the given future failed. Getting the result of a cancelled future
   * throws as well, hence cancellation also triggers the deletion.
   */
  private void deleteNodeOnFailure(final ListenableFuture<?> future, final String node) {
    future.addListener(new Runnable() {
      @Override
      public void run() {
        try {
          future.get();
        } catch (Exception e) {
          // Fire-and-forget; the result of the deletion is not checked.
          zkClient.delete(node);
        }
      }
    }, Threads.SAME_THREAD_EXECUTOR);
  }

  /**
   * Find the maximum node that is smaller than node of the given lockPath.
   *
   * @param children the children of the lock parent node; each name is parsed as "guid-id"
   * @param lockPath the full path of the lock node of this lock attempt, in the form of "path/guid-id"
   * @param guid the guid of this lock attempt; only its length is used, for locating the id in the names
   * @return the node found or {@code null} if no such node exist
   */
  private String findNodeToWatch(NodeChildren children, String lockPath, String guid) {
    // Lock path is "path/guid-id"
    int guidLen = guid.length();
    // Skip "path", the "/" separator, the guid and the "-" separator to get to the id.
    int id = Integer.parseInt(lockPath.substring(path.length() + guidLen + 2));

    String nodeToWatch = null;
    int maxOfMins = Integer.MIN_VALUE;
    for (String node : children.getChildren()) {
      // Child name is "guid-id"; assumes every child under the lock path follows this format.
      int nodeId = Integer.parseInt(node.substring(guidLen + 1));
      if (nodeId < id && nodeId > maxOfMins) {
        maxOfMins = nodeId;
        nodeToWatch = path + "/" + node;
      }
    }
    return nodeToWatch;
  }

  /**
   * Finds the node path that matches the given prefix.
   *
   * @param nodes the child node names to search
   * @param prefix the prefix to match against each node name
   * @return the full path of the first node with the given prefix or {@code null} if there is none
   */
  private String findLockNode(Iterable<String> nodes, String prefix) {
    for (String child : nodes) {
      if (child.startsWith(prefix)) {
        return path + "/" + child;
      }
    }
    return null;
  }
}