| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.distributedlog.lock; |
| |
| import static com.google.common.base.Charsets.UTF_8; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Stopwatch; |
| import java.io.IOException; |
| import java.io.UnsupportedEncodingException; |
| import java.net.URLDecoder; |
| import java.net.URLEncoder; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Function; |
| import org.apache.distributedlog.DistributedLogConstants; |
| import org.apache.distributedlog.util.FailpointUtils; |
| import org.apache.distributedlog.exceptions.LockingException; |
| import org.apache.distributedlog.ZooKeeperClient; |
| import org.apache.distributedlog.exceptions.DLInterruptedException; |
| import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; |
| import org.apache.distributedlog.exceptions.UnexpectedException; |
| import org.apache.distributedlog.exceptions.ZKException; |
| import org.apache.distributedlog.common.stats.OpStatsListener; |
| import org.apache.distributedlog.common.concurrent.FutureEventListener; |
| import org.apache.distributedlog.common.concurrent.FutureUtils; |
| import org.apache.distributedlog.util.OrderedScheduler; |
| import org.apache.bookkeeper.stats.Counter; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.stats.OpStatsLogger; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.bookkeeper.util.SafeRunnable; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.zookeeper.AsyncCallback; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A lock under a given zookeeper session. This is a one-time lock. |
| * It is not reusable: if lock failed, if zookeeper session is expired, if #unlock is called, |
| * it would be transitioned to expired or closed state. |
| * |
| * The Locking Procedure is described as below. |
| * |
| * <p> |
| * 0. if it is an immediate lock, it would get lock waiters first. if the lock is already held |
| * by someone. it would fail immediately with {@link org.apache.distributedlog.exceptions.OwnershipAcquireFailedException} |
| * with current owner. if there is no lock waiters, it would start locking procedure from 1. |
| * 1. prepare: create a sequential znode to identify the lock. |
| * 2. check lock waiters: get all lock waiters to check after prepare. if it is the first waiter, |
| * claim the ownership; if it is not the first waiter, but first waiter was itself (same client id and same session id) |
| * claim the ownership too; otherwise, it would set watcher on its sibling and wait it to disappared. |
| * </p> |
| * |
| * <pre> |
| * +-----------------+ |
| * | INIT | ------------------------------+ |
| * +--------+--------+ | |
| * | | |
| * | | |
| * +--------v--------+ | |
| * | PREPARING |----------------------------+ | |
| * +--------+--------+ | | |
| * | | | |
| * | | | |
| * +--------v--------+ | | |
| * +-------------| PREPARED |--------------+ | | |
| * | +-----^---------+-+ | | | |
| * | | | | | | | |
| * | | | | | | | |
| * | | | | | | | |
| * +------V-----------+ | | | +--------v----------+ | | |
| * | WAITING |-------+ | | | CLAIMED | | | |
| * +------+-----+-----+ | | +--+----------+-----+ | | |
| * | | | | | | | | |
| * | | | | | | | | |
| * | | | | | | | | |
| * | | | +-v----------v----+ | | | |
| * | +-------------------->| EXPIRED | | | | |
| * | | +--+--------------+ | | | |
| * | | | | | | |
| * | | | | | | |
| * | +--------V-------V-+ | | | |
| * +------------>| CLOSING |<---------------+----------+--+ |
| * +------------------+ |
| * | |
| * | |
| * | |
| * +--------V---------+ |
| * | CLOSED | |
| * +------------------+ |
| * </pre> |
| * |
| * <h3>Metrics</h3> |
| * <ul> |
| * <li>tryAcquire: opstats. latency spent on try locking operations. it includes timeouts. |
| * <li>tryTimeouts: counter. the number of timeouts on try locking operations |
| * <li>unlock: opstats. latency spent on unlock operations. |
| * </ul> |
| */ |
| class ZKSessionLock implements SessionLock { |
| |
| static final Logger LOG = LoggerFactory.getLogger(ZKSessionLock.class); |
| |
| private static final String LOCK_PATH_PREFIX = "/member_"; |
| private static final String LOCK_PART_SEP = "_"; |
| |
| public static String getLockPathPrefixV1(String lockPath) { |
| // member_ |
| return lockPath + LOCK_PATH_PREFIX; |
| } |
| |
| public static String getLockPathPrefixV2(String lockPath, String clientId) throws UnsupportedEncodingException { |
| // member_<clientid>_ |
| return lockPath + LOCK_PATH_PREFIX + URLEncoder.encode(clientId, UTF_8.name()) + LOCK_PART_SEP; |
| } |
| |
| public static String getLockPathPrefixV3(String lockPath, String clientId, long sessionOwner) throws UnsupportedEncodingException { |
| // member_<clientid>_s<owner_session>_ |
| StringBuilder sb = new StringBuilder(); |
| sb.append(lockPath).append(LOCK_PATH_PREFIX).append(URLEncoder.encode(clientId, UTF_8.name())).append(LOCK_PART_SEP) |
| .append("s").append(String.format("%10d", sessionOwner)).append(LOCK_PART_SEP); |
| return sb.toString(); |
| } |
| |
| public static byte[] serializeClientId(String clientId) { |
| return clientId.getBytes(UTF_8); |
| } |
| |
| public static String deserializeClientId(byte[] data) { |
| return new String(data, UTF_8); |
| } |
| |
| public static String getLockIdFromPath(String path) { |
| // We only care about our actual id since we want to compare ourselves to siblings. |
| if (path.contains("/")) { |
| return path.substring(path.lastIndexOf("/") + 1); |
| } else { |
| return path; |
| } |
| } |
| |
| static final Comparator<String> MEMBER_COMPARATOR = new Comparator<String>() { |
| public int compare(String o1, String o2) { |
| int l1 = parseMemberID(o1); |
| int l2 = parseMemberID(o2); |
| return l1 - l2; |
| } |
| }; |
| |
| static enum State { |
| INIT, // initialized state |
| PREPARING, // preparing to lock, but no lock node created |
| PREPARED, // lock node created |
| CLAIMED, // claim lock ownership |
| WAITING, // waiting for the ownership |
| EXPIRED, // lock is expired |
| CLOSING, // lock is being closed |
| CLOSED, // lock is closed |
| } |
| |
| /** |
| * Convenience class for state management. Provide debuggability features by tracing unxpected state |
| * transitions. |
| */ |
| static class StateManagement { |
| |
| static final Logger LOG = LoggerFactory.getLogger(StateManagement.class); |
| |
| private volatile State state; |
| |
| StateManagement() { |
| this.state = State.INIT; |
| } |
| |
| public void transition(State toState) { |
| if (!validTransition(toState)) { |
| LOG.error("Invalid state transition from {} to {} ", |
| new Object[] { this.state, toState, getStack() }); |
| } |
| this.state = toState; |
| } |
| |
| private boolean validTransition(State toState) { |
| switch (toState) { |
| case INIT: |
| return false; |
| case PREPARING: |
| return inState(State.INIT); |
| case PREPARED: |
| return inState(State.PREPARING) || inState(State.WAITING); |
| case CLAIMED: |
| return inState(State.PREPARED); |
| case WAITING: |
| return inState(State.PREPARED); |
| case EXPIRED: |
| return isTryingOrClaimed(); |
| case CLOSING: |
| return !inState(State.CLOSED); |
| case CLOSED: |
| return inState(State.CLOSING) || inState(State.CLOSED); |
| default: |
| return false; |
| } |
| } |
| |
| private State getState() { |
| return state; |
| } |
| |
| private boolean isTryingOrClaimed() { |
| return inState(State.PREPARING) || inState(State.PREPARED) || |
| inState(State.WAITING) || inState(State.CLAIMED); |
| } |
| |
| public boolean isExpiredOrClosing() { |
| return inState(State.CLOSED) || inState(State.EXPIRED) || inState(State.CLOSING); |
| } |
| |
| public boolean isExpiredOrClosed() { |
| return inState(State.CLOSED) || inState(State.EXPIRED); |
| } |
| |
| public boolean isClosed() { |
| return inState(State.CLOSED); |
| } |
| |
| private boolean inState(final State state) { |
| return state == this.state; |
| } |
| |
| private Exception getStack() { |
| return new Exception(); |
| } |
| } |
| |
| private final ZooKeeperClient zkClient; |
| private final ZooKeeper zk; |
| private final String lockPath; |
| // Identify a unique lock |
| private final Pair<String, Long> lockId; |
| private StateManagement lockState; |
| private final DistributedLockContext lockContext; |
| |
| private final CompletableFuture<Boolean> acquireFuture; |
| private String currentId; |
| private String currentNode; |
| private String watchedNode; |
| private LockWatcher watcher; |
| private final AtomicInteger epoch = new AtomicInteger(0); |
| private final OrderedScheduler lockStateExecutor; |
| private LockListener lockListener = null; |
| private final long lockOpTimeout; |
| |
| private final OpStatsLogger tryStats; |
| private final Counter tryTimeouts; |
| private final OpStatsLogger unlockStats; |
| |
| ZKSessionLock(ZooKeeperClient zkClient, |
| String lockPath, |
| String clientId, |
| OrderedScheduler lockStateExecutor) |
| throws IOException { |
| this(zkClient, |
| lockPath, |
| clientId, |
| lockStateExecutor, |
| DistributedLogConstants.LOCK_OP_TIMEOUT_DEFAULT * 1000, NullStatsLogger.INSTANCE, |
| new DistributedLockContext()); |
| } |
| |
| /** |
| * Creates a distributed lock using the given {@code zkClient} to coordinate locking. |
| * |
| * @param zkClient The ZooKeeper client to use. |
| * @param lockPath The path used to manage the lock under. |
| * @param clientId client id use for lock. |
| * @param lockStateExecutor executor to execute all lock state changes. |
| * @param lockOpTimeout timeout of lock operations |
| * @param statsLogger stats logger |
| */ |
| public ZKSessionLock(ZooKeeperClient zkClient, |
| String lockPath, |
| String clientId, |
| OrderedScheduler lockStateExecutor, |
| long lockOpTimeout, |
| StatsLogger statsLogger, |
| DistributedLockContext lockContext) |
| throws IOException { |
| this.zkClient = zkClient; |
| try { |
| this.zk = zkClient.get(); |
| } catch (ZooKeeperClient.ZooKeeperConnectionException zce) { |
| throw new ZKException("Failed to get zookeeper client for lock " + lockPath, |
| KeeperException.Code.CONNECTIONLOSS); |
| } catch (InterruptedException e) { |
| throw new DLInterruptedException("Interrupted on getting zookeeper client for lock " + lockPath, e); |
| } |
| this.lockPath = lockPath; |
| this.lockId = Pair.of(clientId, this.zk.getSessionId()); |
| this.lockContext = lockContext; |
| this.lockStateExecutor = lockStateExecutor; |
| this.lockState = new StateManagement(); |
| this.lockOpTimeout = lockOpTimeout; |
| |
| this.tryStats = statsLogger.getOpStatsLogger("tryAcquire"); |
| this.tryTimeouts = statsLogger.getCounter("tryTimeouts"); |
| this.unlockStats = statsLogger.getOpStatsLogger("unlock"); |
| |
| // Attach interrupt handler to acquire future so clients can abort the future. |
| this.acquireFuture = FutureUtils.createFuture(); |
| this.acquireFuture.whenComplete((value, cause) -> { |
| if (null != cause) { |
| // This will set the lock state to closed, and begin to cleanup the zk lock node. |
| // We have to be careful not to block here since doing so blocks the ordered lock |
| // state executor which can cause deadlocks depending on how futures are chained. |
| ZKSessionLock.this.asyncUnlock(cause); |
| // Note re. logging and exceptions: errors are already logged by unlockAsync. |
| } |
| }); |
| } |
| |
| @Override |
| public ZKSessionLock setLockListener(LockListener lockListener) { |
| this.lockListener = lockListener; |
| return this; |
| } |
| |
| String getLockPath() { |
| return this.lockPath; |
| } |
| |
| @VisibleForTesting |
| AtomicInteger getEpoch() { |
| return epoch; |
| } |
| |
| @VisibleForTesting |
| State getLockState() { |
| return lockState.getState(); |
| } |
| |
| @VisibleForTesting |
| Pair<String, Long> getLockId() { |
| return lockId; |
| } |
| |
| public boolean isLockExpired() { |
| return lockState.isExpiredOrClosing(); |
| } |
| |
| @Override |
| public boolean isLockHeld() { |
| return lockState.inState(State.CLAIMED); |
| } |
| |
| /** |
| * Execute a lock action of a given <i>lockEpoch</i> in ordered safe way. |
| * |
| * @param lockEpoch |
| * lock epoch |
| * @param func |
| * function to execute a lock action |
| */ |
| protected void executeLockAction(final int lockEpoch, final LockAction func) { |
| lockStateExecutor.submit(lockPath, new SafeRunnable() { |
| @Override |
| public void safeRun() { |
| if (ZKSessionLock.this.epoch.get() == lockEpoch) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("{} executing lock action '{}' under epoch {} for lock {}", |
| new Object[]{lockId, func.getActionName(), lockEpoch, lockPath}); |
| } |
| func.execute(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("{} executed lock action '{}' under epoch {} for lock {}", |
| new Object[]{lockId, func.getActionName(), lockEpoch, lockPath}); |
| } |
| } else { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("{} skipped executing lock action '{}' for lock {}, since epoch is changed from {} to {}.", |
| new Object[]{lockId, func.getActionName(), lockPath, lockEpoch, ZKSessionLock.this.epoch.get()}); |
| } |
| } |
| } |
| }); |
| } |
| |
| /** |
| * Execute a lock action of a given <i>lockEpoch</i> in ordered safe way. If the lock action couln't be |
| * executed due to epoch changed, fail the given <i>promise</i> with |
| * {@link EpochChangedException} |
| * |
| * @param lockEpoch |
| * lock epoch |
| * @param func |
| * function to execute a lock action |
| * @param promise |
| * promise |
| */ |
| protected <T> void executeLockAction(final int lockEpoch, final LockAction func, final CompletableFuture<T> promise) { |
| lockStateExecutor.submit(lockPath, new SafeRunnable() { |
| @Override |
| public void safeRun() { |
| int currentEpoch = ZKSessionLock.this.epoch.get(); |
| if (currentEpoch == lockEpoch) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("{} executed lock action '{}' under epoch {} for lock {}", |
| new Object[]{lockId, func.getActionName(), lockEpoch, lockPath}); |
| } |
| func.execute(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("{} executed lock action '{}' under epoch {} for lock {}", |
| new Object[]{lockId, func.getActionName(), lockEpoch, lockPath}); |
| } |
| } else { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("{} skipped executing lock action '{}' for lock {}, since epoch is changed from {} to {}.", |
| new Object[]{lockId, func.getActionName(), lockPath, lockEpoch, currentEpoch}); |
| } |
| promise.completeExceptionally(new EpochChangedException(lockPath, lockEpoch, currentEpoch)); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * Parse member id generated by zookeeper from given <i>nodeName</i> |
| * |
| * @param nodeName |
| * lock node name |
| * @return member id generated by zookeeper |
| */ |
| static int parseMemberID(String nodeName) { |
| int id = -1; |
| String[] parts = nodeName.split("_"); |
| if (parts.length > 0) { |
| try { |
| id = Integer.parseInt(parts[parts.length - 1]); |
| } catch (NumberFormatException nfe) { |
| // make it to be MAX_VALUE, so the bad znode will never acquire the lock |
| id = Integer.MAX_VALUE; |
| } |
| } |
| return id; |
| } |
| |
| static boolean areLockWaitersInSameSession(String node1, String node2) { |
| String[] parts1 = node1.split("_"); |
| String[] parts2 = node2.split("_"); |
| if (parts1.length != 4 || parts2.length != 4) { |
| return node1.equals(node2); |
| } |
| if (!parts1[2].startsWith("s") || !parts2[2].startsWith("s")) { |
| return node1.equals(node2); |
| } |
| long sessionOwner1 = Long.parseLong(parts1[2].substring(1)); |
| long sessionOwner2 = Long.parseLong(parts2[2].substring(1)); |
| if (sessionOwner1 != sessionOwner2) { |
| return false; |
| } |
| String clientId1, clientId2; |
| try { |
| clientId1 = URLDecoder.decode(parts1[1], UTF_8.name()); |
| clientId2 = URLDecoder.decode(parts2[1], UTF_8.name()); |
| return clientId1.equals(clientId2); |
| } catch (UnsupportedEncodingException e) { |
| // if failed to parse client id, we have to get client id by zookeeper#getData. |
| return node1.equals(node2); |
| } |
| } |
| |
| /** |
| * Get client id and its ephemeral owner. |
| * |
| * @param zkClient |
| * zookeeper client |
| * @param lockPath |
| * lock path |
| * @param nodeName |
| * node name |
| * @return client id and its ephemeral owner. |
| */ |
| static CompletableFuture<Pair<String, Long>> asyncParseClientID(ZooKeeper zkClient, String lockPath, String nodeName) { |
| String[] parts = nodeName.split("_"); |
| // member_<clientid>_s<owner_session>_ |
| if (4 == parts.length && parts[2].startsWith("s")) { |
| long sessionOwner = Long.parseLong(parts[2].substring(1)); |
| String clientId; |
| try { |
| clientId = URLDecoder.decode(parts[1], UTF_8.name()); |
| return FutureUtils.value(Pair.of(clientId, sessionOwner)); |
| } catch (UnsupportedEncodingException e) { |
| // if failed to parse client id, we have to get client id by zookeeper#getData. |
| } |
| } |
| final CompletableFuture<Pair<String, Long>> promise = new CompletableFuture<Pair<String, Long>>(); |
| zkClient.getData(lockPath + "/" + nodeName, false, new AsyncCallback.DataCallback() { |
| @Override |
| public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { |
| if (KeeperException.Code.OK.intValue() != rc) { |
| promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); |
| } else { |
| promise.complete(Pair.of(deserializeClientId(data), stat.getEphemeralOwner())); |
| } |
| } |
| }, null); |
| return promise; |
| } |
| |
| @Override |
| public CompletableFuture<LockWaiter> asyncTryLock(final long timeout, final TimeUnit unit) { |
| final CompletableFuture<String> result = new CompletableFuture<String>(); |
| final boolean wait = DistributedLogConstants.LOCK_IMMEDIATE != timeout; |
| if (wait) { |
| asyncTryLock(wait, result); |
| } else { |
| // try to check locks first |
| zk.getChildren(lockPath, null, new AsyncCallback.Children2Callback() { |
| @Override |
| public void processResult(final int rc, String path, Object ctx, |
| final List<String> children, Stat stat) { |
| lockStateExecutor.submit(lockPath, new SafeRunnable() { |
| @Override |
| public void safeRun() { |
| if (!lockState.inState(State.INIT)) { |
| result.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); |
| return; |
| } |
| if (KeeperException.Code.OK.intValue() != rc) { |
| result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); |
| return; |
| } |
| |
| FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockTryAcquire); |
| |
| Collections.sort(children, MEMBER_COMPARATOR); |
| if (children.size() > 0) { |
| asyncParseClientID(zk, lockPath, children.get(0)).whenCompleteAsync( |
| new FutureEventListener<Pair<String, Long>>() { |
| @Override |
| public void onSuccess(Pair<String, Long> owner) { |
| if (!checkOrClaimLockOwner(owner, result)) { |
| acquireFuture.complete(false); |
| } |
| } |
| |
| @Override |
| public void onFailure(final Throwable cause) { |
| result.completeExceptionally(cause); |
| } |
| }, lockStateExecutor.chooseExecutor(lockPath)); |
| } else { |
| asyncTryLock(wait, result); |
| } |
| } |
| }); |
| } |
| }, null); |
| } |
| |
| final CompletableFuture<Boolean> waiterAcquireFuture = FutureUtils.createFuture(); |
| waiterAcquireFuture.whenComplete((value, cause) -> acquireFuture.completeExceptionally(cause)); |
| return result.thenApply(new Function<String, LockWaiter>() { |
| @Override |
| public LockWaiter apply(final String currentOwner) { |
| final Exception acquireException = new OwnershipAcquireFailedException(lockPath, currentOwner); |
| FutureUtils.within( |
| acquireFuture, |
| timeout, |
| unit, |
| acquireException, |
| lockStateExecutor, |
| lockPath |
| ).whenComplete(new FutureEventListener<Boolean>() { |
| |
| @Override |
| public void onSuccess(Boolean acquired) { |
| completeOrFail(acquireException); |
| } |
| |
| @Override |
| public void onFailure(final Throwable acquireCause) { |
| completeOrFail(acquireException); |
| } |
| |
| private void completeOrFail(final Throwable acquireCause) { |
| if (isLockHeld()) { |
| waiterAcquireFuture.complete(true); |
| } else { |
| asyncUnlock().whenComplete(new FutureEventListener<Void>() { |
| @Override |
| public void onSuccess(Void value) { |
| waiterAcquireFuture.completeExceptionally(acquireCause); |
| } |
| |
| @Override |
| public void onFailure(Throwable cause) { |
| waiterAcquireFuture.completeExceptionally(acquireCause); |
| } |
| }); |
| } |
| } |
| });; |
| return new LockWaiter( |
| lockId.getLeft(), |
| currentOwner, |
| waiterAcquireFuture); |
| } |
| }); |
| } |
| |
| private boolean checkOrClaimLockOwner(final Pair<String, Long> currentOwner, |
| final CompletableFuture<String> result) { |
| if (lockId.compareTo(currentOwner) != 0 && !lockContext.hasLockId(currentOwner)) { |
| lockStateExecutor.submit(lockPath, new SafeRunnable() { |
| @Override |
| public void safeRun() { |
| result.complete(currentOwner.getLeft()); |
| } |
| }); |
| return false; |
| } |
| // current owner is itself |
| final int curEpoch = epoch.incrementAndGet(); |
| executeLockAction(curEpoch, new LockAction() { |
| @Override |
| public void execute() { |
| if (!lockState.inState(State.INIT)) { |
| result.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); |
| return; |
| } |
| asyncTryLock(false, result); |
| } |
| @Override |
| public String getActionName() { |
| return "claimOwnership(owner=" + currentOwner + ")"; |
| } |
| }, result); |
| return true; |
| } |
| |
| /** |
| * Try lock. If it failed, it would cleanup its attempt. |
| * |
| * @param wait |
| * whether to wait for ownership. |
| * @param result |
| * promise to satisfy with current lock owner |
| */ |
| private void asyncTryLock(boolean wait, final CompletableFuture<String> result) { |
| final CompletableFuture<String> lockResult = new CompletableFuture<String>(); |
| lockResult.whenComplete(new FutureEventListener<String>() { |
| @Override |
| public void onSuccess(String currentOwner) { |
| result.complete(currentOwner); |
| } |
| |
| @Override |
| public void onFailure(final Throwable lockCause) { |
| // If tryLock failed due to state changed, we don't need to cleanup |
| if (lockCause instanceof LockStateChangedException) { |
| LOG.info("skipping cleanup for {} at {} after encountering lock " + |
| "state change exception : ", new Object[] { lockId, lockPath, lockCause }); |
| result.completeExceptionally(lockCause); |
| return; |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} is cleaning up its lock state for {} due to : ", |
| new Object[] { lockId, lockPath, lockCause }); |
| } |
| |
| // If we encountered any exception we should cleanup |
| CompletableFuture<Void> unlockResult = asyncUnlock(); |
| unlockResult.whenComplete(new FutureEventListener<Void>() { |
| @Override |
| public void onSuccess(Void value) { |
| result.completeExceptionally(lockCause); |
| } |
| @Override |
| public void onFailure(Throwable cause) { |
| result.completeExceptionally(lockCause); |
| } |
| }); |
| } |
| }); |
| asyncTryLockWithoutCleanup(wait, lockResult); |
| } |
| |
| /** |
| * Try lock. If wait is true, it would wait and watch sibling to acquire lock when |
| * the sibling is dead. <i>acquireCompletableFuture</i> will be notified either it locked successfully |
| * or the lock failed. The promise will only satisfy with current lock owner. |
| * |
| * NOTE: the <i>promise</i> is only satisfied on <i>lockStateExecutor</i>, so any |
| * transformations attached on promise will be executed in order. |
| * |
| * @param wait |
| * whether to wait for ownership. |
| * @param promise |
| * promise to satisfy with current lock owner. |
| */ |
| private void asyncTryLockWithoutCleanup(final boolean wait, final CompletableFuture<String> promise) { |
| executeLockAction(epoch.get(), new LockAction() { |
| @Override |
| public void execute() { |
| if (!lockState.inState(State.INIT)) { |
| promise.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.INIT, lockState.getState())); |
| return; |
| } |
| lockState.transition(State.PREPARING); |
| |
| final int curEpoch = epoch.incrementAndGet(); |
| watcher = new LockWatcher(curEpoch); |
| // register watcher for session expires |
| zkClient.register(watcher); |
| // Encode both client id and session in the lock node |
| String myPath; |
| try { |
| // member_<clientid>_s<owner_session>_ |
| myPath = getLockPathPrefixV3(lockPath, lockId.getLeft(), lockId.getRight()); |
| } catch (UnsupportedEncodingException uee) { |
| myPath = getLockPathPrefixV1(lockPath); |
| } |
| zk.create(myPath, serializeClientId(lockId.getLeft()), zkClient.getDefaultACL(), CreateMode.EPHEMERAL_SEQUENTIAL, |
| new AsyncCallback.StringCallback() { |
| @Override |
| public void processResult(final int rc, String path, Object ctx, final String name) { |
| executeLockAction(curEpoch, new LockAction() { |
| @Override |
| public void execute() { |
| if (KeeperException.Code.OK.intValue() != rc) { |
| KeeperException ke = KeeperException.create(KeeperException.Code.get(rc)); |
| promise.completeExceptionally(ke); |
| return; |
| } |
| |
| if (FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition)) { |
| lockState.transition(State.CLOSING); |
| lockState.transition(State.CLOSED); |
| } |
| |
| if (null != currentNode) { |
| LOG.error("Current node for {} overwritten current = {} new = {}", |
| new Object[] { lockPath, lockId, getLockIdFromPath(currentNode) }); |
| } |
| |
| currentNode = name; |
| currentId = getLockIdFromPath(currentNode); |
| LOG.trace("{} received member id for lock {}", lockId, currentId); |
| |
| if (lockState.isExpiredOrClosing()) { |
| // Delete node attempt may have come after PREPARING but before create node, in which case |
| // we'd be left with a dangling node unless we clean up. |
| CompletableFuture<Void> deletePromise = new CompletableFuture<Void>(); |
| deleteLockNode(deletePromise); |
| FutureUtils.ensure( |
| deletePromise, |
| () -> promise.completeExceptionally( |
| new LockClosedException(lockPath, lockId, lockState.getState()))); |
| return; |
| } |
| |
| lockState.transition(State.PREPARED); |
| checkLockOwnerAndWaitIfPossible(watcher, wait, promise); |
| } |
| |
| @Override |
| public String getActionName() { |
| return "postPrepare(wait=" + wait + ")"; |
| } |
| }); |
| } |
| }, null); |
| } |
| @Override |
| public String getActionName() { |
| return "prepare(wait=" + wait + ")"; |
| } |
| }, promise); |
| } |
| |
| @Override |
| public void tryLock(long timeout, TimeUnit unit) throws LockingException { |
| final Stopwatch stopwatch = Stopwatch.createStarted(); |
| CompletableFuture<LockWaiter> tryFuture = asyncTryLock(timeout, unit); |
| LockWaiter waiter = waitForTry(stopwatch, tryFuture); |
| boolean acquired = waiter.waitForAcquireQuietly(); |
| if (!acquired) { |
| throw new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner()); |
| } |
| } |
| |
| synchronized LockWaiter waitForTry(Stopwatch stopwatch, CompletableFuture<LockWaiter> tryFuture) |
| throws LockingException { |
| boolean success = false; |
| boolean stateChanged = false; |
| LockWaiter waiter; |
| try { |
| waiter = FutureUtils.result(tryFuture, lockOpTimeout, TimeUnit.MILLISECONDS); |
| success = true; |
| } catch (LockStateChangedException ex) { |
| stateChanged = true; |
| throw ex; |
| } catch (LockingException ex) { |
| throw ex; |
| } catch (TimeoutException toe) { |
| tryTimeouts.inc(); |
| throw new LockingException(lockPath, "Timeout during try phase of lock acquire", toe); |
| } catch (Exception ex) { |
| String message = getLockId() + " failed to lock " + lockPath; |
| throw new LockingException(lockPath, message, ex); |
| } finally { |
| if (success) { |
| tryStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); |
| } else { |
| tryStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); |
| } |
| // This can only happen for a Throwable thats not an |
| // Exception, i.e. an Error |
| if (!success && !stateChanged) { |
| unlock(); |
| } |
| } |
| return waiter; |
| } |
| |
| @Override |
| public CompletableFuture<Void> asyncUnlock() { |
| return asyncUnlock(new LockClosedException(lockPath, lockId, lockState.getState())); |
| } |
| |
| CompletableFuture<Void> asyncUnlock(final Throwable cause) { |
| final CompletableFuture<Void> promise = new CompletableFuture<Void>(); |
| |
| // Use lock executor here rather than lock action, because we want this opertaion to be applied |
| // whether the epoch has changed or not. The member node is EPHEMERAL_SEQUENTIAL so there's no |
| // risk of an ABA problem where we delete and recreate a node and then delete it again here. |
| lockStateExecutor.submit(lockPath, new SafeRunnable() { |
| @Override |
| public void safeRun() { |
| acquireFuture.completeExceptionally(cause); |
| unlockInternal(promise); |
| promise.whenComplete(new OpStatsListener<Void>(unlockStats)); |
| } |
| }); |
| |
| return promise; |
| } |
| |
| @Override |
| public void unlock() { |
| CompletableFuture<Void> unlockResult = asyncUnlock(); |
| try { |
| FutureUtils.result(unlockResult, lockOpTimeout, TimeUnit.MILLISECONDS); |
| } catch (TimeoutException toe) { |
| // This shouldn't happen unless we lose a watch, and may result in a leaked lock. |
| LOG.error("Timeout unlocking {} owned by {} : ", new Object[] { lockPath, lockId, toe }); |
| } catch (Exception e) { |
| LOG.warn("{} failed to unlock {} : ", new Object[] { lockId, lockPath, e }); |
| } |
| } |
| |
| // Lock State Changes (all state changes should be executed under a LockAction) |
| |
| private void claimOwnership(int lockEpoch) { |
| lockState.transition(State.CLAIMED); |
| // clear previous lock ids |
| lockContext.clearLockIds(); |
| // add current lock id |
| lockContext.addLockId(lockId); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Notify lock waiters on {} at {} : watcher epoch {}, lock epoch {}", |
| new Object[] { lockPath, System.currentTimeMillis(), |
| lockEpoch, ZKSessionLock.this.epoch.get() }); |
| } |
| acquireFuture.complete(true); |
| } |
| |
| /** |
| * NOTE: unlockInternal should only after try lock. |
| */ |
| private void unlockInternal(final CompletableFuture<Void> promise) { |
| |
| // already closed or expired, nothing to cleanup |
| this.epoch.incrementAndGet(); |
| if (null != watcher) { |
| this.zkClient.unregister(watcher); |
| } |
| |
| if (lockState.inState(State.CLOSED)) { |
| promise.complete(null); |
| return; |
| } |
| |
| LOG.info("Lock {} for {} is closed from state {}.", |
| new Object[] { lockId, lockPath, lockState.getState() }); |
| |
| final boolean skipCleanup = lockState.inState(State.INIT) || lockState.inState(State.EXPIRED); |
| |
| lockState.transition(State.CLOSING); |
| |
| if (skipCleanup) { |
| // Nothing to cleanup if INIT (never tried) or EXPIRED (ephemeral node |
| // auto-removed) |
| lockState.transition(State.CLOSED); |
| promise.complete(null); |
| return; |
| } |
| |
| // In any other state, we should clean up the member node |
| CompletableFuture<Void> deletePromise = new CompletableFuture<Void>(); |
| deleteLockNode(deletePromise); |
| |
| // Set the state to closed after we've cleaned up |
| deletePromise.whenCompleteAsync(new FutureEventListener<Void>() { |
| @Override |
| public void onSuccess(Void complete) { |
| lockState.transition(State.CLOSED); |
| promise.complete(null); |
| } |
| @Override |
| public void onFailure(Throwable cause) { |
| // Delete failure is quite serious (causes lock leak) and should be |
| // handled better |
| LOG.error("lock node delete failed {} {}", lockId, lockPath); |
| promise.complete(null); |
| } |
| }, lockStateExecutor.chooseExecutor(lockPath)); |
| } |
| |
| private void deleteLockNode(final CompletableFuture<Void> promise) { |
| if (null == currentNode) { |
| promise.complete(null); |
| return; |
| } |
| |
| zk.delete(currentNode, -1, new AsyncCallback.VoidCallback() { |
| @Override |
| public void processResult(final int rc, final String path, Object ctx) { |
| lockStateExecutor.submit(lockPath, new SafeRunnable() { |
| @Override |
| public void safeRun() { |
| if (KeeperException.Code.OK.intValue() == rc) { |
| LOG.info("Deleted lock node {} for {} successfully.", path, lockId); |
| } else if (KeeperException.Code.NONODE.intValue() == rc || |
| KeeperException.Code.SESSIONEXPIRED.intValue() == rc) { |
| LOG.info("Delete node failed. Node already gone for node {} id {}, rc = {}", |
| new Object[] { path, lockId, KeeperException.Code.get(rc) }); |
| } else { |
| LOG.error("Failed on deleting lock node {} for {} : {}", |
| new Object[] { path, lockId, KeeperException.Code.get(rc) }); |
| } |
| |
| FailpointUtils.checkFailPointNoThrow(FailpointUtils.FailPointName.FP_LockUnlockCleanup); |
| promise.complete(null); |
| } |
| }); |
| } |
| }, null); |
| } |
| |
| /** |
| * Handle session expired for lock watcher at epoch <i>lockEpoch</i>. |
| * |
| * @param lockEpoch |
| * lock epoch |
| */ |
| private void handleSessionExpired(final int lockEpoch) { |
| executeLockAction(lockEpoch, new LockAction() { |
| @Override |
| public void execute() { |
| if (lockState.inState(State.CLOSED) || lockState.inState(State.CLOSING)) { |
| // Already fully closed, no need to process expire. |
| return; |
| } |
| |
| boolean shouldNotifyLockListener = lockState.inState(State.CLAIMED); |
| |
| lockState.transition(State.EXPIRED); |
| |
| // remove the watcher |
| if (null != watcher) { |
| zkClient.unregister(watcher); |
| } |
| |
| // increment epoch to avoid any ongoing locking action |
| ZKSessionLock.this.epoch.incrementAndGet(); |
| |
| // if session expired, just notify the waiter. as the lock acquire doesn't succeed. |
| // we don't even need to clean up the lock as the znode will disappear after session expired |
| acquireFuture.completeExceptionally( |
| new LockSessionExpiredException(lockPath, lockId, lockState.getState())); |
| |
| // session expired, ephemeral node is gone. |
| currentNode = null; |
| currentId = null; |
| |
| if (shouldNotifyLockListener) { |
| // if session expired after claimed, we need to notify the caller to re-lock |
| if (null != lockListener) { |
| lockListener.onExpired(); |
| } |
| } |
| } |
| |
| @Override |
| public String getActionName() { |
| return "handleSessionExpired(epoch=" + lockEpoch + ")"; |
| } |
| }); |
| } |
| |
| private void handleNodeDelete(int lockEpoch, final WatchedEvent event) { |
| executeLockAction(lockEpoch, new LockAction() { |
| @Override |
| public void execute() { |
| // The lock is either expired or closed |
| if (!lockState.inState(State.WAITING)) { |
| LOG.info("{} ignore watched node {} deleted event, since lock state has moved to {}.", |
| new Object[] { lockId, event.getPath(), lockState.getState() }); |
| return; |
| } |
| lockState.transition(State.PREPARED); |
| |
| // we don't need to wait and check the result, since: |
| // 1) if it claimed the ownership, it would notify the waiters when claimed ownerships |
| // 2) if it failed, it would also notify the waiters, the waiters would cleanup the state. |
| checkLockOwnerAndWaitIfPossible(watcher, true); |
| } |
| |
| @Override |
| public String getActionName() { |
| return "handleNodeDelete(path=" + event.getPath() + ")"; |
| } |
| }); |
| } |
| |
| private CompletableFuture<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher, |
| final boolean wait) { |
| final CompletableFuture<String> promise = new CompletableFuture<String>(); |
| checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise); |
| return promise; |
| } |
| |
| /** |
| * Check Lock Owner Phase 1 : Get all lock waiters. |
| * |
| * @param lockWatcher |
| * lock watcher. |
| * @param wait |
| * whether to wait for ownership. |
| * @param promise |
| * promise to satisfy with current lock owner |
| */ |
| private void checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher, |
| final boolean wait, |
| final CompletableFuture<String> promise) { |
| zk.getChildren(lockPath, false, new AsyncCallback.Children2Callback() { |
| @Override |
| public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { |
| processLockWaiters(lockWatcher, wait, rc, children, promise); |
| } |
| }, null); |
| } |
| |
| /** |
| * Check Lock Owner Phase 2 : check all lock waiters to get current owner and wait for ownership if necessary. |
| * |
| * @param lockWatcher |
| * lock watcher. |
| * @param wait |
| * whether to wait for ownership. |
| * @param getChildrenRc |
| * result of getting all lock waiters |
| * @param children |
| * current lock waiters. |
| * @param promise |
| * promise to satisfy with current lock owner. |
| */ |
| private void processLockWaiters(final LockWatcher lockWatcher, |
| final boolean wait, |
| final int getChildrenRc, |
| final List<String> children, |
| final CompletableFuture<String> promise) { |
| executeLockAction(lockWatcher.epoch, new LockAction() { |
| @Override |
| public void execute() { |
| if (!lockState.inState(State.PREPARED)) { // e.g. lock closed or session expired after prepared |
| promise.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState())); |
| return; |
| } |
| |
| if (KeeperException.Code.OK.intValue() != getChildrenRc) { |
| promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(getChildrenRc))); |
| return; |
| } |
| if (children.isEmpty()) { |
| LOG.error("Error, member list is empty for lock {}.", lockPath); |
| promise.completeExceptionally(new UnexpectedException("Empty member list for lock " + lockPath)); |
| return; |
| } |
| |
| // sort the children |
| Collections.sort(children, MEMBER_COMPARATOR); |
| final String cid = currentId; |
| final int memberIndex = children.indexOf(cid); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{} is the number {} member in the list.", cid, memberIndex); |
| } |
| // If we hold the lock |
| if (memberIndex == 0) { |
| LOG.info("{} acquired the lock {}.", cid, lockPath); |
| claimOwnership(lockWatcher.epoch); |
| promise.complete(cid); |
| } else if (memberIndex > 0) { // we are in the member list but we didn't hold the lock |
| // get ownership of current owner |
| asyncParseClientID(zk, lockPath, children.get(0)).whenComplete(new FutureEventListener<Pair<String, Long>>() { |
| @Override |
| public void onSuccess(Pair<String, Long> currentOwner) { |
| watchLockOwner(lockWatcher, wait, |
| cid, children.get(memberIndex - 1), children.get(0), currentOwner, promise); |
| } |
| @Override |
| public void onFailure(final Throwable cause) { |
| // ensure promise is satisfied in lock thread |
| executeLockAction(lockWatcher.epoch, new LockAction() { |
| @Override |
| public void execute() { |
| promise.completeExceptionally(cause); |
| } |
| |
| @Override |
| public String getActionName() { |
| return "handleFailureOnParseClientID(lockPath=" + lockPath + ")"; |
| } |
| }, promise); |
| } |
| }); |
| } else { |
| LOG.error("Member {} doesn't exist in the members list {} for lock {}.", |
| new Object[]{ cid, children, lockPath}); |
| promise.completeExceptionally( |
| new UnexpectedException("Member " + cid + " doesn't exist in member list " + |
| children + " for lock " + lockPath)); |
| } |
| } |
| |
| @Override |
| public String getActionName() { |
| return "processLockWaiters(rc=" + getChildrenRc + ", waiters=" + children + ")"; |
| } |
| }, promise); |
| } |
| |
| /** |
| * Check Lock Owner Phase 3: watch sibling node for lock ownership. |
| * |
| * @param lockWatcher |
| * lock watcher. |
| * @param wait |
| * whether to wait for ownership. |
| * @param myNode |
| * my lock node. |
| * @param siblingNode |
| * my sibling lock node. |
| * @param ownerNode |
| * owner lock node. |
| * @param currentOwner |
| * current owner info. |
| * @param promise |
| * promise to satisfy with current lock owner. |
| */ |
| private void watchLockOwner(final LockWatcher lockWatcher, |
| final boolean wait, |
| final String myNode, |
| final String siblingNode, |
| final String ownerNode, |
| final Pair<String, Long> currentOwner, |
| final CompletableFuture<String> promise) { |
| executeLockAction(lockWatcher.epoch, new LockAction() { |
| @Override |
| public void execute() { |
| boolean shouldWatch; |
| final boolean shouldClaimOwnership; |
| if (lockContext.hasLockId(currentOwner) && siblingNode.equals(ownerNode)) { |
| // if the current owner is the znode left from previous session |
| // we should watch it and claim ownership |
| shouldWatch = true; |
| shouldClaimOwnership = true; |
| LOG.info("LockWatcher {} for {} found its previous session {} held lock, watch it to claim ownership.", |
| new Object[] { myNode, lockPath, currentOwner }); |
| } else if (lockId.compareTo(currentOwner) == 0 && areLockWaitersInSameSession(siblingNode, ownerNode)) { |
| // I found that my sibling is the current owner with same lock id (client id & session id) |
| // It must be left by any race condition from same zookeeper client |
| shouldWatch = true; |
| shouldClaimOwnership = true; |
| LOG.info("LockWatcher {} for {} found itself {} already held lock at sibling node {}, watch it to claim ownership.", |
| new Object[]{myNode, lockPath, lockId, siblingNode}); |
| } else { |
| shouldWatch = wait; |
| if (wait) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Current LockWatcher for {} with ephemeral node {}, is waiting for {} to release lock at {}.", |
| new Object[]{lockPath, myNode, siblingNode, System.currentTimeMillis()}); |
| } |
| } |
| shouldClaimOwnership = false; |
| } |
| |
| // watch sibling for lock ownership |
| if (shouldWatch) { |
| watchedNode = String.format("%s/%s", lockPath, siblingNode); |
| zk.exists(watchedNode, lockWatcher, new AsyncCallback.StatCallback() { |
| @Override |
| public void processResult(final int rc, String path, Object ctx, final Stat stat) { |
| executeLockAction(lockWatcher.epoch, new LockAction() { |
| @Override |
| public void execute() { |
| if (!lockState.inState(State.PREPARED)) { |
| promise.completeExceptionally(new LockStateChangedException(lockPath, lockId, State.PREPARED, lockState.getState())); |
| return; |
| } |
| |
| if (KeeperException.Code.OK.intValue() == rc) { |
| if (shouldClaimOwnership) { |
| // watch owner successfully |
| LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.", |
| new Object[]{ myNode, lockPath, ownerNode }); |
| claimOwnership(lockWatcher.epoch); |
| promise.complete(currentOwner.getLeft()); |
| } else { |
| // watch sibling successfully |
| lockState.transition(State.WAITING); |
| promise.complete(currentOwner.getLeft()); |
| } |
| } else if (KeeperException.Code.NONODE.intValue() == rc) { |
| // sibling just disappeared, it might be the chance to claim ownership |
| checkLockOwnerAndWaitIfPossible(lockWatcher, wait, promise); |
| } else { |
| promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc))); |
| } |
| } |
| |
| @Override |
| public String getActionName() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("postWatchLockOwner(myNode=").append(myNode).append(", siblingNode=") |
| .append(siblingNode).append(", ownerNode=").append(ownerNode).append(")"); |
| return sb.toString(); |
| } |
| }, promise); |
| } |
| }, null); |
| } else { |
| promise.complete(currentOwner.getLeft()); |
| } |
| } |
| |
| @Override |
| public String getActionName() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("watchLockOwner(myNode=").append(myNode).append(", siblingNode=") |
| .append(siblingNode).append(", ownerNode=").append(ownerNode).append(")"); |
| return sb.toString(); |
| } |
| }, promise); |
| } |
| |
| class LockWatcher implements Watcher { |
| |
| // Enforce a epoch number to avoid a race on canceling attempt |
| final int epoch; |
| |
| LockWatcher(int epoch) { |
| this.epoch = epoch; |
| } |
| |
| @Override |
| public void process(WatchedEvent event) { |
| LOG.debug("Received event {} from lock {} at {} : watcher epoch {}, lock epoch {}.", |
| new Object[] { event, lockPath, System.currentTimeMillis(), epoch, ZKSessionLock.this.epoch.get() }); |
| if (event.getType() == Watcher.Event.EventType.None) { |
| switch (event.getState()) { |
| case SyncConnected: |
| break; |
| case Expired: |
| LOG.info("Session {} is expired for lock {} at {} : watcher epoch {}, lock epoch {}.", |
| new Object[] { lockId.getRight(), lockPath, System.currentTimeMillis(), |
| epoch, ZKSessionLock.this.epoch.get() }); |
| handleSessionExpired(epoch); |
| break; |
| default: |
| break; |
| } |
| } else if (event.getType() == Event.EventType.NodeDeleted) { |
| // this handles the case where we have aborted a lock and deleted ourselves but still have a |
| // watch on the nextLowestNode. This is a workaround since ZK doesn't support unsub. |
| if (!event.getPath().equals(watchedNode)) { |
| LOG.warn("{} (watching {}) ignored watched event from {} ", |
| new Object[] { lockId, watchedNode, event.getPath() }); |
| return; |
| } |
| handleNodeDelete(epoch, event); |
| } else { |
| LOG.warn("Unexpected ZK event: {}", event.getType().name()); |
| } |
| } |
| |
| } |
| } |