| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.distributed.internal.locks; |
| |
| import static java.util.concurrent.TimeUnit.DAYS; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelCriterion; |
| import org.apache.geode.CancelException; |
| import org.apache.geode.annotations.VisibleForTesting; |
| import org.apache.geode.cache.CommitConflictException; |
| import org.apache.geode.cache.TransactionDataNodeHasDepartedException; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.LockServiceDestroyedException; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.MembershipListener; |
| import org.apache.geode.distributed.internal.locks.DLockQueryProcessor.DLockQueryMessage; |
| import org.apache.geode.distributed.internal.locks.DLockRequestProcessor.DLockRequestMessage; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.cache.IdentityArrayList; |
| import org.apache.geode.internal.cache.TXReservationMgr; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch; |
| import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock; |
| import org.apache.geode.logging.internal.executors.LoggingThread; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * Provides lock grantor authority to a distributed lock service. This is responsible for granting, |
| * releasing, and timing out locks as well as exposing hooks for recovery or transfer of lock |
| * grantor. |
| * <p> |
| * ReadWriteLocks are not currently handled by grantor recovery or transfer. |
| * |
| */ |
| @SuppressWarnings("unchecked") |
| public class DLockGrantor { |
| /** Logger for this class, obtained from {@link LogService}. */ |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * Debug switch read with {@link Boolean#getBoolean(String)} from the system property whose name is |
| * {@code DistributionConfig.GEMFIRE_PREFIX} followed by |
| * {@code DLockService.DLockGrantor.debugSuspendLock}. |
| */ |
| public static final boolean DEBUG_SUSPEND_LOCK = // TODO:LOG:CONVERT: REMOVE THIS |
| Boolean.getBoolean( |
| DistributionConfig.GEMFIRE_PREFIX + "DLockService.DLockGrantor.debugSuspendLock"); |
| |
| /** |
| * Default wait before grantor thread will reawaken to check for expirations and timeouts. Mirrors |
| * {@code DLockGrantorThread.MAX_WAIT}. |
| */ |
| public static final long GRANTOR_THREAD_MAX_WAIT = DLockGrantorThread.MAX_WAIT; |
| |
| /** |
| * Newly constructed grantor is INITIALIZING. All lock requests and related messages will block. |
| * This is the initial value of {@link #state}. |
| */ |
| private static final int INITIALIZING = 0; // msgs will block until READY |
| |
| /** |
| * Grantor is READY and handling lock requests. |
| */ |
| private static final int READY = 1; // msgs will be processed |
| |
| /** |
| * Grantor is DESTROYED and will respond as NOT_GRANTOR to any requests. |
| * <p> |
| * NOTE(review): the values 2 through 4 are not assigned to any state constant declared next to |
| * this one; presumably they belonged to states that were removed - confirm. |
| */ |
| private static final int DESTROYED = 5; // NOT_GRANTOR |
| |
| /** |
| * DistributedLockService that this grantor is granting locks for. |
| */ |
| protected final DLockService dlock; |
| |
| /** |
| * Map of grant tokens for tracking grantor-side state of distributed locks. Key: Object name, |
| * Value: DLockGrantToken grant |
| * |
| * guarded.By grantTokens |
| */ |
| private final Map grantTokens = new HashMap(); |
| |
| /** |
| * Dedicated thread responsible for handling expirations and timeouts. |
| */ |
| protected final DLockGrantorThread thread; |
| |
| /** |
| * The current state of this grantor. Volatile for read access. Mutation must occur while |
| * synchronized on this grantor. |
| * <p> |
| * State starts out as INITIALIZING and moves to READY at which point the grantor will start |
| * servicing requests. |
| * <p> |
| * If a newcomer requests transfer of grantorship, then the state will change to HALTED and then |
| * finally become DESTROYED when transfer is complete. During HALTED, the elder recognizes the |
| * newcomer as the current grantor. |
| * <p> |
| * If this service is shutting down, it will seek out a successor to become the new grantor. |
| * During this phase, the state will be PENDING_SHUTDOWN which means no requests or releases will |
| * be replied to until a successor is found, registers transfer intentions with the elder and |
| * sends this process a transfer grantorship request. At that point the state will be HALTED as |
| * described above. |
| * <p> |
| * NOTE(review): HALTED and PENDING_SHUTDOWN are not among the state constants declared at the top |
| * of this class (only INITIALIZING, READY and DESTROYED are); confirm whether the two paragraphs |
| * above still describe current behavior. |
| * |
| * guarded.By this |
| */ |
| private volatile int state = INITIALIZING; |
| |
| /** |
| * ReadWriteLock to protect in-progress operations from destroying service. |
| */ |
| private final StoppableReentrantReadWriteLock destroyLock; |
| |
| /** |
| * Specialized lock information for Transaction lock batches. |
| * <p> |
| * Key: Object batchId, Value: DLockBatch batch |
| * <p> |
| * Handling of batch locks synchronizes on this to assure serial processing. |
| * |
| * guarded.By batchLocks |
| */ |
| private final Map batchLocks = new HashMap(); |
| |
| /** |
| * Handles special lock-reservation type for transactions. |
| */ |
| private final TXReservationMgr resMgr = new TXReservationMgr(false); |
| |
| /** |
| * Time, as returned by {@link #getCurrentTime()}, at which each departed member was recorded by |
| * {@link #recordMemberDepartedTime(InternalDistributedMember)}. |
| * <p> |
| * Key: InternalDistributedMember member, Value: Long time in milliseconds |
| * <p> |
| * Kept in a LinkedHashMap, so iteration follows insertion order; the pruning loop in |
| * recordMemberDepartedTime stops at the first entry that has not expired. Lock batches whose |
| * owner has an entry here are rejected by checkIfHostDeparted. |
| * |
| * guarded.By membersDepartedTime |
| */ |
| private final Map<InternalDistributedMember, Long> membersDepartedTime = new LinkedHashMap(); |
| /** |
| * Retention period (one day, expressed in milliseconds) after which recordMemberDepartedTime |
| * prunes a departed member's entry from {@link #membersDepartedTime}. |
| */ |
| private final long departedMemberKeptInMapMilliSeconds = DAYS.toMillis(1); |
| |
| /** |
| * Enforces waiting until this grantor is initialized. Awaited by {@link #waitWhileInitializing()}, |
| * which lock request handling calls before doing any work. |
| * <p> |
| * NOTE(review): earlier wording said requests block "until INITIALIZED", but no such state is |
| * declared; presumably the latch opens when the state leaves INITIALIZING - confirm where the |
| * latch is counted down. |
| */ |
| private final StoppableCountDownLatch whileInitializing; |
| |
| /** |
| * Enforces waiting until this grantor is destroyed. Used to block all lock requests while |
| * destroying. Latch opens after state becomes DESTROYED and grantor begins replying with |
| * NOT_GRANTOR. Awaited by {@link #waitUntilDestroyed()}. |
| */ |
| private final StoppableCountDownLatch untilDestroyed; |
| |
| /** |
| * If -1 then it has not yet been fetched from elder. Otherwise it is the versionId that the elder |
| * gave us. During explicit becomeGrantor, the value is -1, and then the elder provides a real |
| * versionId. |
| */ |
| private final AtomicLong versionId = new AtomicLong(-1); |
| |
| /** |
| * Used to verify that requestor member is still in view when granting. Taken from |
| * {@code dlock.getDistributionManager()} in the constructor. |
| */ |
| protected final DistributionManager dm; |
| |
| // ------------------------------------------------------------------------- |
| // SuspendLocking state (BEGIN) |
| |
| /** |
| * Synchronization for protecting all SuspendLocking state. Guards all suspend locking state. |
| */ |
| protected final Object suspendLock = new Object(); |
| |
| /** |
| * Identifies remote thread that has currently suspended locking or null. |
| * |
| * Concurrency: protected by synchronization of {@link #suspendLock} |
| */ |
| protected RemoteThread lockingSuspendedBy = null; |
| |
| /** |
| * Value indicates nonexistent lock |
| */ |
| protected static final int INVALID_LOCK_ID = -1; |
| |
| /** |
| * Identifies the lockId used by the remote thread to suspend locking. Holds |
| * {@link #INVALID_LOCK_ID} initially. |
| * |
| * Concurrency: protected by synchronization of {@link #suspendLock} |
| */ |
| protected int suspendedLockId = INVALID_LOCK_ID; |
| |
| /** |
| * FIFO queue for suspend read and write lock waiters. |
| * |
| * guarded.By {@link #suspendLock} |
| */ |
| private final LinkedList suspendQueue = new LinkedList(); |
| |
| /** |
| * Map of read lock counts for each RemoteThread currently holding locks. |
| * <p> |
| * Key=RemoteThread, Value=ReadLockCount (stored as an Integer) |
| * |
| * guarded.By {@link #suspendLock} |
| */ |
| private final HashMap readLockCountMap = new HashMap(); |
| |
| /** |
| * Number of suspend waiters waiting for write lock. |
| * |
| * guarded.By {@link #suspendLock} |
| */ |
| private int writeLockWaiters = 0; |
| |
| /** |
| * Total number of read locks held against suspend write lock. |
| * |
| * guarded.By {@link #suspendLock} |
| */ |
| private int totalReadLockCount = 0; |
| |
| /** |
| * List of next requests to process after handling an unlock or resume. Unlike the other |
| * collections in this section the reference itself is not final. |
| * |
| * guarded.By {@link #suspendLock} |
| */ |
| private ArrayList permittedRequests = new ArrayList(); |
| |
| /** |
| * List of active drains of permittedRequests. The list is wrapped with |
| * {@link Collections#synchronizedList(List)} in addition to the documented suspendLock guard. |
| * TODO: does this need to be a synchronizedList? |
| * |
| * guarded.By {@link #suspendLock} |
| */ |
| private final List permittedRequestsDrain = Collections.synchronizedList(new LinkedList()); |
| |
| // SuspendLocking state (END) |
| // ------------------------------------------------------------------------- |
| |
| // ------------------------------------------------------------------------- |
| // Static methods |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Creates new instance of grantor for the lock service. |
| * |
| * @param dlock the lock service the grantor is authority for |
| * @param versionId the version, from the elder, of this grantor |
| * @return new instance of grantor for the lock service |
| */ |
| static DLockGrantor createGrantor(DLockService dlock, long versionId) { |
| return new DLockGrantor(dlock, versionId); |
| } |
| |
| /** |
|  * Looks up the grantor instance currently held by the lock service. |
|  * |
|  * @param dlock the lock service to ask for its grantor; may be null |
|  * @return the service's grantor, or null if {@code dlock} is null or no local grantor has been |
|  *         created |
|  */ |
| static DLockGrantor getGrantorForService(DLockService dlock) { |
|   return dlock == null ? null : dlock.getGrantor(); |
| } |
| |
| /** |
|  * Returns the grantor that handles distributed lock granting for the given service once that |
|  * grantor has finished initializing. |
|  * <p> |
|  * Returns null if the service is null, has no grantor, is destroyed, is not (becoming) the lock |
|  * grantor, or if the grantor is destroyed or not ready. The lookup is repeated until the service |
|  * reports the same grantor instance before and after the checks. |
|  * |
|  * @param svc the lock service to return the grantor instance for |
|  * @return the ready grantor of the service, or null |
|  * @throws InterruptedException if thrown while waiting for the grantor to initialize |
|  */ |
| public static DLockGrantor waitForGrantor(DLockService svc) throws InterruptedException { |
|   if (svc == null) { |
|     return null; |
|   } |
|   DLockGrantor candidate = getGrantorForService(svc); |
|   while (true) { |
|     if (candidate == null || candidate.isDestroyed()) { |
|       return null; |
|     } |
|     candidate.waitWhileInitializing(); |
|     // checked in this order: service destroyed, still (becoming) grantor, grantor ready |
|     if (svc.isDestroyed() || !svc.isCurrentlyOrIsMakingLockGrantor() || !candidate.isReady()) { |
|       return null; |
|     } |
|     // make sure the service still has the same grantor instance; otherwise start over |
|     final DLockGrantor latest = getGrantorForService(svc); |
|     if (latest == candidate) { |
|       return candidate; |
|     } |
|     candidate = latest; |
|   } |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Constructor |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Creates instance of grantor for the lock service. |
| * |
| * @param dlock the lock service the grantor is authority for |
| * @param vId unique id that the elder increments for each new grantor |
| */ |
| private DLockGrantor(DLockService dlock, long vId) { |
| this.dm = dlock.getDistributionManager(); |
| CancelCriterion stopper = this.dm.getCancelCriterion(); |
| this.whileInitializing = new StoppableCountDownLatch(stopper, 1); |
| this.untilDestroyed = new StoppableCountDownLatch(stopper, 1); |
| this.dlock = dlock; |
| this.destroyLock = new StoppableReentrantReadWriteLock(stopper); |
| this.versionId.set(vId); |
| this.dm.addMembershipListener(this.membershipListener); |
| this.thread = new DLockGrantorThread(this, stopper); |
| this.dlock.getStats().incGrantors(1); |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Public and package methods |
| // ------------------------------------------------------------------------- |
| |
| /** |
|  * Reads the version id of this grantor. The id is assigned by an elder and helps to uniquely |
|  * identify this grantor instance. |
|  * |
|  * @return the current value of the grantor's version id |
|  */ |
| public long getVersionId() { |
|   final long currentVersion = versionId.get(); |
|   return currentVersion; |
| } |
| |
| /** |
|  * Stores the version id once the elder has told us what it is. Called during the explicit become |
|  * grantor process. |
|  * |
|  * @param v the elder assigned version id for this grantor |
|  */ |
| public void setVersionId(long v) { |
|   versionId.set(v); |
| } |
| |
| /** |
|  * Blocks until the {@code whileInitializing} latch opens. |
|  * <p> |
|  * A pending interrupt is cleared on entry; if one was pending and the lock service reports |
|  * interruptible lock requests, InterruptedException is thrown immediately. An interrupt raised |
|  * during the wait is handed to {@code throwIfInterruptible}; if that call returns normally the |
|  * wait is retried. Whenever an interrupt was observed, the thread's interrupt flag is set again |
|  * before this method exits. |
|  * |
|  * @throws InterruptedException if interrupted and the request is interruptible |
|  * @throws DistributedSystemDisconnectedException if system shuts down before grantor is ready |
|  */ |
| public void waitWhileInitializing() throws InterruptedException { |
|   boolean sawInterrupt = Thread.interrupted(); |
|   try { |
|     if (sawInterrupt && this.dlock.isInterruptibleLockRequest()) { |
|       throw new InterruptedException(); |
|     } |
|     boolean latchOpened = false; |
|     while (!latchOpened) { |
|       try { |
|         this.whileInitializing.await(); |
|         latchOpened = true; |
|       } catch (InterruptedException ie) { |
|         sawInterrupt = true; |
|         throwIfInterruptible(ie); |
|       } |
|     } |
|   } finally { |
|     if (sawInterrupt) { |
|       Thread.currentThread().interrupt(); |
|     } |
|   } |
| } |
| |
| /** |
|  * Blocks until the {@code untilDestroyed} latch opens. |
|  * <p> |
|  * Each attempt first clears any pending interrupt, then awaits the latch. An interrupt raised |
|  * during the wait is handed to {@code throwIfInterruptible}; if that call returns normally the |
|  * wait is retried. After every attempt in which an interrupt was observed, the thread's interrupt |
|  * flag is set again. |
|  * |
|  * @throws InterruptedException if interrupted during the wait and the request is interruptible |
|  * @throws DistributedSystemDisconnectedException if system shuts down before grantor is destroyed |
|  */ |
| public void waitUntilDestroyed() throws InterruptedException { |
|   boolean latchOpened = false; |
|   while (!latchOpened) { |
|     boolean sawInterrupt = Thread.interrupted(); |
|     try { |
|       this.untilDestroyed.await(); |
|       latchOpened = true; |
|     } catch (InterruptedException ie) { |
|       sawInterrupt = true; |
|       throwIfInterruptible(ie); |
|     } finally { |
|       if (sawInterrupt) { |
|         Thread.currentThread().interrupt(); |
|       } |
|     } |
|   } |
| } |
| |
| /** |
|  * Describes this grantor as {@code <DLockGrantor@HASH state=S name=N version=V>}, where HASH is |
|  * the hexadecimal identity hash code, S the state rendered by {@code stateToString}, N the lock |
|  * service name and V the version id. |
|  * |
|  * @return the description of this grantor |
|  */ |
| @Override |
| public String toString() { |
|   // the buffer never leaves this method, so the unsynchronized StringBuilder is sufficient |
|   final StringBuilder buffer = new StringBuilder(128); |
|   buffer.append('<').append("DLockGrantor").append("@") |
|       .append(Integer.toHexString(System.identityHashCode(this))).append(" state=") |
|       .append(stateToString(this.state)).append(" name=").append(this.dlock.getName()) |
|       .append(" version=").append(this.getVersionId()).append('>'); |
|   return buffer.toString(); |
| } |
| |
| /** |
| * Returns true if this grantor is ready to handle lock requests. |
| * |
| * @return true if this grantor is ready to handle lock requests |
| */ |
| boolean isReady() { |
| return this.state == READY; |
| } |
| |
| /** |
| * Returns true if this grantor is still initializing and not yet ready for lock requests. |
| * |
| * @return true if this grantor is still initializing |
| */ |
| boolean isInitializing() { |
| return this.state == INITIALIZING; |
| } |
| |
| /** |
|  * Tells whether the grantor state is DESTROYED. |
|  * |
|  * @return true if this grantor has been destroyed |
|  */ |
| public boolean isDestroyed() { |
|   final int current = this.state; |
|   return current == DESTROYED; |
| } |
| |
| /** |
| * Throws LockGrantorDestroyedException if this grantor has been destroyed. |
| * |
| * @throws LockGrantorDestroyedException if grantor is destroyed |
| */ |
| void checkDestroyed() { |
| throwIfDestroyed(isDestroyed()); |
| } |
| |
| /** |
|  * Raises LockGrantorDestroyedException when the given flag is set; does nothing otherwise. |
|  * |
|  * @param destroyed if true then throw LockGrantorDestroyedException |
|  * @throws LockGrantorDestroyedException if destroyed is true |
|  */ |
| private void throwIfDestroyed(boolean destroyed) { |
|   if (!destroyed) { |
|     return; |
|   } |
|   throw new LockGrantorDestroyedException("Grantor is destroyed"); |
| } |
| |
| /** |
|  * Handles request for a batch of locks using optimization for transactions. |
|  * <p> |
|  * Waits for in-process member departures to finish (when the lock service has a departure |
|  * handler), then synchronizes on {@link #batchLocks} and acquires the destroy read lock. A request |
|  * that has already timed out only has its suspend state cleaned up. Otherwise the batch's |
|  * reservation is made, the batch is stored under its batch id and the request is answered with a |
|  * grant. A CommitConflictException raised while doing so is answered with a try-lock failure. |
|  * |
|  * @param request the lock request whose object name is the DLockBatch to lock |
|  * @throws LockGrantorDestroyedException if grantor is destroyed |
|  * @throws TransactionDataNodeHasDepartedException if the batch owner is recorded as departed |
|  * @throws InterruptedException if thrown by one of the waits performed here |
|  */ |
| void handleLockBatch(DLockRequestMessage request) throws InterruptedException { |
|   DLockLessorDepartureHandler handler = this.dlock.getDLockLessorDepartureHandler(); |
|   // make sure the tx locks of departed members have been cleared so we don't have |
|   // conflicts with non-existent members. This is done in a waiting-pool thread launched |
|   // when the member-departure is announced. |
|   // The handler is null-checked exactly as in handleLockRequest so that a lock service |
|   // without a departure handler does not fail with a NullPointerException here. |
|   if (handler != null) { |
|     handler.waitForInProcessDepartures(); |
|   } |
| |
|   synchronized (this.batchLocks) { // assures serial processing |
|     waitWhileInitializing(); |
|     if (request.checkForTimeout()) { |
|       cleanupSuspendState(request); |
|       return; |
|     } |
| |
|     final boolean isTraceEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
|     if (isTraceEnabled_DLS) { |
|       logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch]"); |
|     } |
|     if (!acquireDestroyReadLock(0)) { |
|       waitUntilDestroyed(); |
|       checkDestroyed(); |
|     } |
|     try { |
|       checkDestroyed(); |
|       if (isTraceEnabled_DLS) { |
|         logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch] request: {}", |
|             request); |
|       } |
| |
|       DLockBatch batch = (DLockBatch) request.getObjectName(); |
|       // refuse batches from a transaction host that has been recorded as departed |
|       checkIfHostDeparted(batch.getOwner()); |
|       resMgr.makeReservation((IdentityArrayList) batch.getReqs()); |
|       if (isTraceEnabled_DLS) { |
|         logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch] granting {}", |
|             batch.getBatchId()); |
|       } |
|       this.batchLocks.put(batch.getBatchId(), batch); |
|       request.respondWithGrant(Long.MAX_VALUE); |
|     } catch (CommitConflictException ex) { |
|       request.respondWithTryLockFailed(ex.getMessage()); |
|     } finally { |
|       releaseDestroyReadLock(); |
|     } |
|   } |
| } |
| |
| private void checkIfHostDeparted(InternalDistributedMember owner) { |
| // Already held batchLocks; hold membersDepartedTime lock just for clarity |
| synchronized (membersDepartedTime) { |
| // the transaction host/txLock requester has departed. |
| if (membersDepartedTime.containsKey(owner)) { |
| throw new TransactionDataNodeHasDepartedException( |
| "The transaction host " + owner + " is no longer a member of the cluster."); |
| } |
| } |
| } |
| |
| /** |
|  * Collects the transaction optimized lock batches whose owner equals the given member, after |
|  * recording that member as departed. |
|  * <p> |
|  * Synchronizes on batchLocks. |
|  * |
|  * @param owner member that owned the lock batches to return |
|  * @return lock batches that were created by owner; empty if there are none |
|  */ |
| public DLockBatch[] getLockBatches(InternalDistributedMember owner) { |
|   synchronized (this.batchLocks) { |
|     // record the departure first so that no new threads will handle in-flight requests |
|     // from the departed member to lock keys |
|     recordMemberDepartedTime(owner); |
| |
|     // batchLocks maps Object batchId to DLockBatch batch |
|     final List ownedBatches = new ArrayList(); |
|     for (Object value : this.batchLocks.values()) { |
|       final DLockBatch candidate = (DLockBatch) value; |
|       if (candidate.getOwner().equals(owner)) { |
|         ownedBatches.add(candidate); |
|       } |
|     } |
|     return (DLockBatch[]) ownedBatches.toArray(new DLockBatch[0]); |
|   } |
| } |
| |
| void recordMemberDepartedTime(InternalDistributedMember owner) { |
| // Already held batchLocks; hold membersDepartedTime lock just for clarity |
| synchronized (membersDepartedTime) { |
| long currentTime = getCurrentTime(); |
| for (Iterator iterator = membersDepartedTime.values().iterator(); iterator.hasNext();) { |
| if ((long) iterator.next() < currentTime - departedMemberKeptInMapMilliSeconds) { |
| iterator.remove(); |
| } else { |
| break; |
| } |
| } |
| membersDepartedTime.put(owner, currentTime); |
| } |
| } |
| |
| long getCurrentTime() { |
| return System.currentTimeMillis(); |
| } |
| |
| @VisibleForTesting |
| Map getMembersDepartedTimeRecords() { |
| return membersDepartedTime; |
| } |
| |
| /** |
|  * Looks up the batch stored under the given batchId (for example use a txLockId from TXLockBatch |
|  * in order to update its participants). This operation was added as part of the solution to bug |
|  * 32999. |
|  * <p> |
|  * Acquires acquireDestroyReadLock. Synchronizes on batchLocks. |
|  * <p> |
|  * see org.apache.geode.internal.cache.TXCommitMessage#updateLockMembers() |
|  * |
|  * @param batchId the identifier for the batch to retrieve |
|  * @return the transaction lock batch identified by the given batchId, or null if none is stored |
|  * @throws LockGrantorDestroyedException if grantor is destroyed |
|  * @see org.apache.geode.internal.cache.locks.TXLockUpdateParticipantsMessage |
|  * @see org.apache.geode.internal.cache.locks.TXLockBatch#getBatchId() |
|  */ |
| public DLockBatch getLockBatch(Object batchId) throws InterruptedException { |
|   final boolean traceDls = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
|   if (traceDls) { |
|     logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.getLockBatch] enter: {}", batchId); |
|   } |
|   DLockBatch found; |
|   synchronized (this.batchLocks) { |
|     waitWhileInitializing(); |
|     final boolean readLocked = acquireDestroyReadLock(0); |
|     if (!readLocked) { |
|       waitUntilDestroyed(); |
|       checkDestroyed(); |
|     } |
|     try { |
|       checkDestroyed(); |
|       found = (DLockBatch) this.batchLocks.get(batchId); |
|     } finally { |
|       releaseDestroyReadLock(); |
|     } |
|   } |
|   if (traceDls) { |
|     logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.getLockBatch] exit: {}", batchId); |
|   } |
|   return found; |
| } |
| |
| /** |
|  * Replaces the batch stored under the given batchId with a new batch. Nothing is stored when no |
|  * batch is currently held under that id. This operation was added as part of the solution to bug |
|  * 32999. |
|  * <p> |
|  * Acquires acquireDestroyReadLock. Synchronizes on batchLocks. |
|  * <p> |
|  * see org.apache.geode.internal.cache.locks.TXCommitMessage#updateLockMembers() |
|  * |
|  * @param batchId the identify of the transaction lock batch |
|  * @param newBatch the new lock batch to be used |
|  * @throws LockGrantorDestroyedException if grantor is destroyed |
|  * @see org.apache.geode.internal.cache.locks.TXLockUpdateParticipantsMessage |
|  * @see org.apache.geode.internal.cache.locks.TXLockBatch#getBatchId() |
|  */ |
| public void updateLockBatch(Object batchId, DLockBatch newBatch) throws InterruptedException { |
|   final boolean traceDls = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
|   if (traceDls) { |
|     logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.updateLockBatch] enter: {}", batchId); |
|   } |
|   synchronized (this.batchLocks) { |
|     waitWhileInitializing(); |
|     final boolean readLocked = acquireDestroyReadLock(0); |
|     if (!readLocked) { |
|       waitUntilDestroyed(); |
|       checkDestroyed(); |
|     } |
|     try { |
|       checkDestroyed(); |
|       final DLockBatch existing = (DLockBatch) this.batchLocks.get(batchId); |
|       final boolean batchKnown = existing != null; |
|       if (batchKnown) { |
|         this.batchLocks.put(batchId, newBatch); |
|       } |
|     } finally { |
|       releaseDestroyReadLock(); |
|     } |
|   } |
|   if (traceDls) { |
|     logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.updateLockBatch] exit: {}", batchId); |
|   } |
| } |
| |
| /** |
|  * Removes the transaction optimized lock batch stored under the given id and, if one was stored, |
|  * releases its reservation. |
|  * <p> |
|  * Acquires acquireDestroyReadLock. Synchronizes on batchLocks. |
|  * |
|  * @param batchId the identify of the transaction lock batch to release |
|  * @param owner the member that has created and locked the lock batch |
|  * @throws LockGrantorDestroyedException if grantor is destroyed |
|  */ |
| public void releaseLockBatch(Object batchId, InternalDistributedMember owner) |
|     throws InterruptedException { |
|   final boolean traceDls = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
|   if (traceDls) { |
|     logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.releaseLockBatch]"); |
|   } |
|   synchronized (this.batchLocks) { |
|     waitWhileInitializing(); |
|     final boolean readLocked = acquireDestroyReadLock(0); |
|     if (!readLocked) { |
|       waitUntilDestroyed(); |
|       checkDestroyed(); |
|     } |
|     try { |
|       checkDestroyed(); |
|       final DLockBatch released = (DLockBatch) this.batchLocks.remove(batchId); |
|       if (released != null) { |
|         final IdentityArrayList reservedReqs = (IdentityArrayList) released.getReqs(); |
|         this.resMgr.releaseReservation(reservedReqs); |
|       } |
|     } finally { |
|       releaseDestroyReadLock(); |
|     } |
|   } |
| } |
| |
| /** |
| * Returns true if the request comes from the local member. |
| * |
| * @param request the lock request to check |
| * @return true if the request comes from the local member |
| */ |
| private boolean isLocalRequest(DLockRequestMessage request) { |
| return request.getSender().equals(this.dlock.getDistributionManager().getId()); |
| } |
| |
| /** |
| * TEST HOOK: Allows testing to determine if there are waiting requests for a lock. |
| * <p> |
| * Synchronizes on grantTokens and the grant token if one exists. |
| * |
| * @param name the lock to check for waiting requests for |
| * @return true if the named lock has requests waiting to acquire it |
| */ |
| boolean hasWaitingRequests(Object name) { |
| DLockGrantToken grant = getGrantToken(name); |
| if (grant == null) |
| return false; |
| synchronized (grant) { |
| return grant.hasWaitingRequests(); |
| } |
| } |
| |
| /** |
| * Handles a DLockQueryMessage. Returns DLockGrantToken for the lock or null. |
| * <p> |
| * Acquires destroyReadLock. Synchronizes on grantTokens. |
| * |
| * @param query the dlock query message to handle |
| * @return DLockGrantToken for the lock or null |
| */ |
| DLockGrantToken handleLockQuery(DLockQueryMessage query) throws InterruptedException { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockQuery] {}", query); |
| } |
| if (acquireDestroyReadLock(0)) { |
| try { |
| checkDestroyed(); |
| return getGrantToken(query.objectName); |
| } finally { |
| releaseDestroyReadLock(); |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Handles the provided lock request. The lock will either be granted, refused if try-lock, or |
| * scheduled at end of waiting queue to eventually be granted or timed out. |
| * <p> |
| * Acquires destroyReadLock. Synchronizes on grantTokens, suspendLock and the grant token. |
| * |
| * @param request the lock request to be processed by this grantor |
| * @throws LockGrantorDestroyedException if grantor is destroyed |
| */ |
| void handleLockRequest(DLockRequestMessage request) throws InterruptedException { |
| Assert.assertTrue(request.getRemoteThread() != null); |
| if (request.getObjectName() instanceof DLockBatch) { |
| handleLockBatch(request); |
| return; |
| } |
| |
| waitWhileInitializing(); // calcWaitMillisFromNow |
| |
| final boolean isTraceEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| if (isTraceEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockRequest] {}", request); |
| } |
| |
| if (!acquireDestroyReadLock(0)) { |
| if (isLocalRequest(request) && this.dlock.isDestroyed()) { |
| if (isTraceEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.handleLockRequest] about to throwIfDestroyed"); |
| } |
| // this special case is one fix for deadlock between waitUntilDestroyed |
| // and dlock waitForGrantorCallsInProgress (when request is local) |
| throwIfDestroyed(true); |
| } else { |
| if (isTraceEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.handleLockRequest] about to waitUntilDestroyed"); |
| } |
| // is there still a deadlock when an explicit become is destroying |
| // this grantor instead of destroying the dlock service? |
| waitUntilDestroyed(); |
| checkDestroyed(); |
| } |
| } |
| try { |
| // make sure we don't grant a dlock held by a departed member until that member's |
| // transactions are resolved |
| DLockLessorDepartureHandler dLockLessorDepartureHandler = |
| this.dlock.getDLockLessorDepartureHandler(); |
| if (dLockLessorDepartureHandler != null) { |
| dLockLessorDepartureHandler.waitForInProcessDepartures(); |
| } |
| checkDestroyed(); |
| if (acquireLockPermission(request)) { |
| handlePermittedLockRequest(request); |
| } else { |
| // request has been added to suspendQueue for deferred handling |
| } |
| } finally { |
| releaseDestroyReadLock(); |
| } |
| } |
| |
| /** |
| * Internally handles a lock request which has permission to proceed. |
| * <p> |
| * Calling thread must hold destroyReadLock. Synchronizes on grantTokens, suspendLock and the |
| * grant token. |
| * |
| * @param request the lock request to be processed by this grantor guarded.By |
| * {@link #acquireDestroyReadLock(long)} |
| */ |
| private void handlePermittedLockRequest(final DLockRequestMessage request) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handlePermittedLockRequest] {}", request); |
| } |
| Assert.assertTrue(request.getRemoteThread() != null); |
| DLockGrantToken grant = getOrCreateGrant(request.getObjectName()); |
| try { |
| |
| // try to grant immediately if not currently granted... |
| if (grant.grantLockToRequest(request)) { |
| // do nothing |
| } |
| |
| // if request was local and then interrupted/released... |
| else if (request.responded()) { |
| // do nothing |
| } |
| |
| // if request was a failed try-lock... |
| else if (request.isTryLock()) { |
| cleanupSuspendState(request); |
| request.respondWithTryLockFailed(request.getObjectName()); |
| } |
| |
| // if request has timed out... |
| else if (request.checkForTimeout()) { |
| cleanupSuspendState(request); |
| } |
| |
| // schedule into waiting queue for eventual granting... |
| else { |
| grant.schedule(request); |
| this.thread.checkTimeToWait(calcWaitMillisFromNow(request), false); |
| } |
| } finally { |
| grant.decAccess(); |
| } |
| } |
| |
| /** |
|  * Initializes this new grantor with previously held locks as provided during grantor recovery. |
|  * <p> |
|  * Does nothing if this grantor is destroyed or the destroy read lock cannot be acquired. For |
|  * every token a grant token is looked up or created. Tokens are skipped when the owner is not in |
|  * the current set of distribution manager ids or when the grant token already has a lease held. |
|  * Otherwise the lock is granted to the owner and the suspend-locking or read-lock bookkeeping is |
|  * updated accordingly. |
|  * <p> |
|  * Acquires destroyReadLock. Synchronizes on this grantor, grantTokens, suspendLock, the grant |
|  * token. |
|  * |
|  * @param owner the member that owns the tokens to be scheduled |
|  * @param tokens set of DLockRemoteTokens to be scheduled for owner |
|  */ |
| void initializeHeldLocks(InternalDistributedMember owner, Set tokens) |
|     throws InterruptedException { |
|   synchronized (this) { |
|     if (isDestroyed()) { |
|       return; |
|     } |
|     if (!acquireDestroyReadLock(0)) { |
|       return; |
|     } |
|   } |
| |
|   try { |
|     synchronized (this.grantTokens) { |
|       Set members = this.dlock.getDistributionManager().getDistributionManagerIds(); |
| |
|       final boolean isTraceEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
|       for (Iterator iter = tokens.iterator(); iter.hasNext();) { |
|         DLockRemoteToken token = (DLockRemoteToken) iter.next(); |
|         DLockGrantToken grantToken = getOrCreateGrant(token.getName()); |
|         try { |
| |
|           // make sure the token's owner is still in the system |
|           if (!members.contains(owner)) { |
|             // skipping because member is no longer in view |
|             if (isTraceEnabled_DLS) { |
|               // third placeholder added so that the current view is actually logged |
|               logger.trace(LogMarker.DLS_VERBOSE, |
|                   "Initialization of held locks is skipping {} because owner {} is not in view: {}", |
|                   token, owner, members); |
|             } |
|             continue; |
|           } |
| |
|           RemoteThread rThread = null; |
|           boolean isSuspendLock = false; |
|           int lockId = -1; |
| |
|           synchronized (grantToken) { |
|             if (grantToken.isLeaseHeld()) { |
|               logger.error(LogMarker.DLS_MARKER, |
|                   "Initialization of held locks is skipping {} because lock is already held: {}", |
|                   token, grantToken); |
|               continue; |
|             } |
| |
|             grantToken.grantLock(owner, token.getLeaseExpireTime(), token.getLeaseId(), |
|                 token.getLesseeThread()); |
| |
|             // grantToken may have already expired or is about to expire |
|             // complete initialization but make sure grantor thread will wake |
|             // up and expire it as soon as it's running |
|             if (grantToken.getLeaseExpireTime() > -1 |
|                 && grantToken.getLeaseExpireTime() < Long.MAX_VALUE) { |
|               long now = DLockService.getLockTimeStamp(this.dm); |
|               this.thread.checkTimeToWait(grantToken.getLeaseExpireTime() - now, true); |
|             } |
| |
|             // capture what the suspend bookkeeping below needs while still holding the token |
|             rThread = grantToken.getRemoteThread(); |
|             isSuspendLock = grantToken.isSuspendLockingToken(); |
|             lockId = grantToken.getLockId(); |
|           } |
| |
|           // update the readLock and suspendLocking states... |
|           synchronized (suspendLock) { |
|             if (isSuspendLock) { |
|               suspendLocking(rThread, lockId); |
|             } else { |
|               Assert.assertTrue(!isLockingSuspended() || isLockingSuspendedBy(rThread), |
|                   "Locking is suspended by a different thread: " + token); |
|               Integer integer = (Integer) readLockCountMap.get(rThread); |
|               int readLockCount = integer == null ? 0 : integer.intValue(); |
|               readLockCount++; |
|               readLockCountMap.put(rThread, Integer.valueOf(readLockCount)); |
|               totalReadLockCount++; |
|               checkTotalReadLockCount(); |
|             } |
|           } // suspendLock sync |
| |
|         } finally { |
|           // balances the access taken by getOrCreateGrant, also on the skip paths above |
|           grantToken.decAccess(); |
|         } |
| |
|       } // tokens iter |
|     } // grantTokens sync |
|   } finally { |
|     releaseDestroyReadLock(); |
|   } |
| } |
| |
| /** |
| * Handles a request for extending the lease time of an already held lock. |
| * <p> |
| * Acquires destroyReadLock. Synchronizes on grantTokens and the grant token. |
| * |
| * @param request the lock request to be reentered for lease extension |
| * @return new extended leaseExpireTime or 0 if requestor no longer holds lock |
| */ |
| long reenterLock(DLockRequestMessage request) throws InterruptedException { |
| waitWhileInitializing(); // calcWaitMillisFromNow |
| if (!acquireDestroyReadLock(0)) { |
| waitUntilDestroyed(); |
| checkDestroyed(); |
| } |
| try { |
| checkDestroyed(); |
| // to fix GEODE-678 no longer call request.checkForTimeout |
| DLockGrantToken grant = getGrantToken(request.getObjectName()); |
| if (grant == null) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.reenterLock] no grantToken found for {}", request.getObjectName()); |
| } |
| return 0; |
| } |
| |
| synchronized (grant) { // synchronize against grant.expireAndGrantLock |
| if (!this.dm.isCurrentMember(request.getSender()) || grant.isDestroyed()) { |
| return 0; |
| } |
| if (!grant.isLockedBy(request.getSender(), request.getLockId())) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.reenterLock] grant is not locked by sender={} lockId={} grant={}", |
| request.getSender(), request.getLockId(), grant); |
| } |
| return 0; |
| } |
| |
| long leaseExpireTime = |
| Math.max(grant.getLeaseExpireTime(), grant.calcLeaseExpireTime(request.getLeaseTime())); |
| |
| grant.grantLock(request.getSender(), leaseExpireTime, request.getLockId(), |
| grant.getRemoteThread()); |
| |
| return grant.getLeaseExpireTime(); |
| } |
| } finally { |
| releaseDestroyReadLock(); |
| } |
| } |
| |
| /** |
| * Release named lock if held by owner using lockId. Called from DLockReleaseMessage.basicProcess |
| * for remote unlock. |
| * <p> |
| * Acquires destroyReadLock. Synchronizes on grantTokens and the grant token. |
| * |
| * @param name the name of the lock to release |
| * @param owner the member releasing the lock |
| * @param lockId the identity of the lease used by the owner |
| * @throws LockGrantorDestroyedException if grantor is destroyed |
| */ |
| void releaseIfLocked(Object name, InternalDistributedMember owner, int lockId) |
| throws InterruptedException { |
| waitWhileInitializing(); |
| if (!acquireDestroyReadLock(0)) { |
| waitUntilDestroyed(); |
| checkDestroyed(); |
| } |
| try { |
| checkDestroyed(); |
| getAndReleaseGrantIfLockedBy(name, owner, lockId); |
| } finally { |
| releaseDestroyReadLock(); |
| } |
| } |
| |
| /** |
| * Fetches the actual grant token and releases it if leased by owner using lockId. |
| * DLockReleaseMessage.basicProcess -> releaseIfLocked -> getAndReleaseGrantIfLockedBy |
| * <p> |
| * Caller must hold destroyReadLock. Synchronizes on grantTokens and the grant token. |
| * |
| * @param name the name of the lock to release |
| * @param owner the member attempting to release the granted lock |
| * @param lockId the id of the lease used by the owner guarded.By |
| * {@link #acquireDestroyReadLock(long)} |
| */ |
| private void getAndReleaseGrantIfLockedBy(Object name, InternalDistributedMember owner, |
| int lockId) { |
| synchronized (this.grantTokens) { |
| DLockGrantToken grantToken = basicGetGrantToken(name); |
| if (grantToken != null) { // checking isTokenDestroyed here will deadlock |
| synchronized (grantToken) { |
| // if (!grantToken.isTokenDestroyed()) |
| try { |
| grantToken.releaseIfLockedBy(owner, lockId); |
| removeGrantIfUnused(grantToken); |
| } catch (IllegalStateException e) { |
| this.dlock.checkDestroyed(); |
| checkDestroyed(); |
| // must have hit race... grantor doesn't have the token |
| return; |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Fetches the grant token for named lock and attempts to grant it to the next waiting requestor |
| * if one exists. Called from DLockReleaseProcessor when another process releases a lock and after |
| * the reply has been sent. |
| * <p> |
| * Acquires destroyReadLock. Synchronizes on grantTokens and the grant token. |
| * |
| * @param name the name of the lock to grant |
| * @throws LockGrantorDestroyedException if grantor is destroyed |
| */ |
| void grantLock(Object name) throws InterruptedException { |
| waitWhileInitializing(); |
| if (!acquireDestroyReadLock(0)) { |
| waitUntilDestroyed(); |
| checkDestroyed(); |
| } |
| try { |
| checkDestroyed(); |
| DLockGrantToken grant = getGrantToken(name); |
| if (grant != null) { |
| removeGrantIfUnused(grant); |
| } |
| } finally { |
| releaseDestroyReadLock(); |
| } |
| } |
| |
/**
 * Handles the departure of a member by releasing every lock it owned.
 * <p>
 * Acquires destroyReadLock. Synchronizes on grantTokens, suspendLock, and the grant token.
 * <p>
 * Does nothing if the destroy read lock cannot be acquired or if this grantor is already
 * destroyed. Otherwise the work is done in this order:
 * <ol>
 * <li>the lock service's DLockLessorDepartureHandler, if any, is notified;</li>
 * <li>under suspendLock, postReleaseLock is called for every remote thread in readLockCountMap
 * that belongs to the departed member;</li>
 * <li>under grantTokens, every grant token is asked to check for, and then handle, the
 * departure, and grants reported back for removal are passed to removeGrantIfUnused.</li>
 * </ol>
 * Steps 2 and 3 run in a finally block, so they happen even when step 1 throws. Every
 * CancelException raised along the way is traced (when DLS_VERBOSE is enabled) and ignored.
 *
 * @param owner the member that departed
 * @throws InterruptedException if thrown while acquiring the destroy read lock
 */
void handleDepartureOf(InternalDistributedMember owner) throws InterruptedException {
  // bug 32657 has another cause in this method... interrupted thread from
  // connection/channel layer caused acquireDestroyReadLock to fail...
  // fixed by Darrel in org.apache.geode.internal.tcp.Connection
  final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
  if (acquireDestroyReadLock(0)) {
    try {
      if (isDestroyed()) {
        if (isDebugEnabled_DLS) {
          logger.trace(LogMarker.DLS_VERBOSE,
              "[DLockGrantor.handleDepartureOf] grantor is destroyed; ignoring {}", owner);
        }
        return;
      }
      try {
        // give the optional departure handler first chance to react to the departure
        DLockLessorDepartureHandler handler = this.dlock.getDLockLessorDepartureHandler();
        if (isDebugEnabled_DLS) {
          logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleDepartureOf] handler = {}",
              handler);
        }
        if (handler != null) {
          handler.handleDepartureOf(owner, this);
        }
      } catch (CancelException e) {
        if (isDebugEnabled_DLS) {
          logger.trace(LogMarker.DLS_VERBOSE,
              "[DlockGrantor.handleDepartureOf] ignored cancellation (1)");
        }
      } finally {
        // the cleanup below must run whether or not the handler succeeded
        synchronized (this.suspendLock) {
          // collect the departed member's threads first; the release calls are made only after
          // the iteration over readLockCountMap has finished
          HashSet removals = new HashSet();
          for (Iterator it = readLockCountMap.entrySet().iterator(); it.hasNext();) {
            Map.Entry entry = (Map.Entry) it.next();
            RemoteThread rThread = (RemoteThread) entry.getKey();
            if (rThread.getDistributedMember().equals(owner)) {
              removals.add(rThread);
            }
          }
          for (Iterator it = removals.iterator(); it.hasNext();) {
            try {
              postReleaseLock((RemoteThread) it.next(), null);
            } catch (CancelException e) {
              if (isDebugEnabled_DLS) {
                logger.trace(LogMarker.DLS_VERBOSE,
                    "[DlockGrantor.handleDepartureOf] ignored cancellation (2)");
              }
            }
          }
        } // synchronized
        synchronized (this.grantTokens) {
          // do not call handleDepartureOf while iterating grantTokens
          // changes fix bug 39172 (ConcurrentModificationException)

          // 1) built up list of grants that reference departed member
          List grantsReferencingMember = new ArrayList();
          Collection grants = this.grantTokens.values();
          for (Iterator iter = grants.iterator(); iter.hasNext();) {
            DLockGrantToken grant = (DLockGrantToken) iter.next();
            try {
              grant.checkDepartureOf(owner, grantsReferencingMember);
            } catch (CancelException e) {
              if (isDebugEnabled_DLS) {
                logger.trace(LogMarker.DLS_VERBOSE,
                    "[DlockGrantor.handleDepartureOf] ignored cancellation (3)");
              }
            }
          } // for

          // 2) call handleDepartureOf on list of grantsReferencingMember
          ArrayList grantsToRemoveIfUnused = new ArrayList();
          for (Iterator iter = grantsReferencingMember.iterator(); iter.hasNext();) {
            DLockGrantToken grant = (DLockGrantToken) iter.next();
            try {
              grant.handleDepartureOf(owner, grantsToRemoveIfUnused);
            } catch (CancelException e) {
              if (isDebugEnabled_DLS) {
                logger.trace(LogMarker.DLS_VERBOSE,
                    "[DlockGrantor.handleDepartureOf] ignored cancellation (4)");
              }
            }
          } // for

          // 3) remove grants in grantsToRemoveIfUnused list
          // TODO: if grantsReferencingMember is always empty remove this
          for (Iterator iter = grantsToRemoveIfUnused.iterator(); iter.hasNext();) {
            DLockGrantToken grant = (DLockGrantToken) iter.next();
            try {
              removeGrantIfUnused(grant);
            } catch (CancelException e) {
              if (isDebugEnabled_DLS) {
                logger.trace(LogMarker.DLS_VERBOSE,
                    "[DlockGrantor.handleDepartureOf] ignored cancellation (5)");
              }
            }
          } // for
        } // synchronized this.grantTokens
      } // finally
    } finally {
      releaseDestroyReadLock();
    }
  }
}
| |
/**
 * Destroys this grantor without attempting to transfer grant tokens to a successor.
 * <p>
 * Acquires destroyWriteLock. Synchronizes on this grantor, grantTokens, and each grant token.
 * <p>
 * Returns immediately if this grantor is already destroyed. Otherwise it works out whether locks
 * may still be held: always assumed while the grantor is initializing, otherwise true when any
 * grant token has an owner other than this member. It then calls {@link #destroyGrantor()} and
 * reports the result through the lock service's clearGrantor. Both of those calls sit in nested
 * finally blocks, so they are attempted even if the held-lock check fails.
 */
void destroy() {
  synchronized (this) {
    if (isDestroyed())
      return;
    final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
    if (isDebugEnabled_DLS) {
      logger.trace(LogMarker.DLS_VERBOSE, "[simpleDestroy]");
    }
    // wait for the destroy write lock ignoring interrupts...
    // "acquired" records whether the write lock was obtained so that the outer finally only
    // releases a lock that is actually held
    boolean acquired = false;
    try {
      boolean locksHeld = false;
      try {
        acquireDestroyWriteLock(Long.MAX_VALUE);
        acquired = true;
        // check for any held locks...

        if (isInitializing()) {
          // assume the worst case and tell the elder that recovery will be required
          locksHeld = true;
        } else {
          synchronized (this.grantTokens) {
            // a grant owned by any member other than this one counts as a held lock
            InternalDistributedMember me = this.dlock.getDistributionManager().getId();
            for (Iterator iter = this.grantTokens.values().iterator(); iter.hasNext();) {
              DLockGrantToken grant = (DLockGrantToken) iter.next();
              InternalDistributedMember owner = grant.getOwner();
              if (owner != null && !owner.equals(me)) {
                locksHeld = true;
                break;
              }
            }
          }
        }
        if (isDebugEnabled_DLS) {
          logger.trace(LogMarker.DLS_VERBOSE, "[simpleDestroy] {} locks held",
              (locksHeld ? "with" : "without"));
        }
      } finally {
        // make sure the following occurs even if checking locks above failed

        try {
          // release latches and change internal state
          destroyGrantor();
        } finally {
          // tell the elder we are not the grantor anymore
          this.dlock.clearGrantor(this.getVersionId(), locksHeld);
        }
      }
    } finally {
      if (acquired) {
        releaseDestroyWriteLock();
      }
    }
  }
}
| |
| /** |
| * Send replies to all waiting requestors to notify them that this is no longer the grantor. |
| * <p> |
| * Caller must acquire destroyWriteLock. Synchronizes on suspendLock, grantTokens, and each grant |
| * token. |
| * |
| * guarded.By {@link #acquireDestroyWriteLock(long)} |
| */ |
| private void destroyGrantor() { |
| Assert.assertHoldsLock(this, true); |
| makeDestroyed(); |
| // reply to all pending requests w/ NOT_GRANTOR |
| synchronized (this.grantTokens) { |
| Collection grants = this.grantTokens.values(); |
| for (Iterator iter = grants.iterator(); iter.hasNext();) { |
| DLockGrantToken grant = (DLockGrantToken) iter.next(); |
| grant.handleGrantorDestruction(); |
| } |
| } |
| |
| synchronized (suspendLock) { |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.destroyAndRemove] responding to {} permitted requests.", |
| permittedRequests.size()); |
| } |
| respondWithNotGrantor(permittedRequests.iterator()); |
| |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.destroyAndRemove] responding to {} requests awaiting permission.", |
| suspendQueue.size()); |
| } |
| respondWithNotGrantor(suspendQueue.iterator()); |
| |
| for (Iterator iter = permittedRequestsDrain.iterator(); iter.hasNext();) { |
| final List drain = (List) iter.next(); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.destroyAndRemove] responding to {} drained permitted requests.", |
| drain.size()); |
| } |
| respondWithNotGrantor(drain.iterator()); |
| } |
| } |
| } |
| |
| /** |
| * Send responses to specified requests informing the senders that this is no longer the grantor. |
| * <p> |
| * Caller must acquire destroyWriteLock. |
| * |
| * @param requests the requests to respond to guarded.By {@link #acquireDestroyWriteLock(long)} |
| */ |
| private void respondWithNotGrantor(Iterator requests) { |
| while (requests.hasNext()) { |
| final DLockRequestMessage request = (DLockRequestMessage) requests.next(); |
| request.respondWithNotGrantor(); |
| } |
| } |
| |
| /** |
| * Make this grantor ready for handling lock requests. |
| * <p> |
| * Synchronizes on this grantor. |
| * |
| * @param enforceInitializing true if this should assert isInitializing |
| * @return true if grantor was successfully made ready |
| */ |
| synchronized boolean makeReady(boolean enforceInitializing) { |
| if (isDestroyed()) { |
| this.dlock.checkDestroyed(); |
| } |
| if (!enforceInitializing) { |
| if (!isInitializing()) { |
| return false; |
| } |
| } |
| assertInitializing(); |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| StringBuffer sb = |
| new StringBuffer("DLockGrantor " + this.dlock.getName() + " initialized with:"); |
| for (Iterator tokens = grantTokens.values().iterator(); tokens.hasNext();) { |
| sb.append("\n\t" + tokens.next()); |
| } |
| logger.trace(LogMarker.DLS_VERBOSE, sb.toString()); |
| } |
| this.state = READY; |
| this.whileInitializing.countDown(); |
| this.thread.start(); |
| return true; |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Private methods |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Drain currently permitted requests and grant lock to next requestor. |
| * <p> |
| * Acquires destroyReadLock. Synchronizes on suspendLock, grantTokens, and the grant token. |
| * |
| * @param objectName the lock to perform post release tasks for |
| */ |
| void postRemoteReleaseLock(Object objectName) throws InterruptedException { |
| if (!acquireDestroyReadLock(0)) { |
| return; |
| } |
| try { |
| checkDestroyed(); |
| drainPermittedRequests(); |
| grantLock(objectName); |
| } catch (LockServiceDestroyedException | LockGrantorDestroyedException e) { |
| // ignore... service was destroyed and that's ok |
| } finally { |
| releaseDestroyReadLock(); |
| } |
| } |
| |
| /** |
| * Acquires a read lock on the destroy ReadWrite lock uninterruptibly using millis for try-lock |
| * attempt. |
| * |
| * @param millis the milliseconds to try to acquire lock within |
| * @return true if destroy read lock was acquired |
| * @throws DistributedSystemDisconnectedException if system has been disconnected |
| */ |
| private boolean acquireDestroyReadLock(long millis) throws InterruptedException { |
| boolean interrupted = Thread.interrupted(); |
| try { |
| if (interrupted && this.dlock.isInterruptibleLockRequest()) { |
| throw new InterruptedException(); |
| } |
| while (true) { |
| try { |
| this.dm.getCancelCriterion().checkCancelInProgress(null); // is this needed? |
| boolean acquired = this.destroyLock.readLock().tryLock(millis); |
| return acquired; |
| } catch (InterruptedException e) { |
| interrupted = true; |
| throwIfInterruptible(e); |
| } |
| } |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| /** |
| * Releases a read lock on the destroy ReadWrite lock. |
| */ |
| private void releaseDestroyReadLock() { |
| this.destroyLock.readLock().unlock(); |
| } |
| |
| /** |
| * Acquires the write lock on the destroy ReadWrite lock within specified millis. |
| * |
| * @param millis the milliseconds to attempt to acquire the lock within |
| * @throws DistributedSystemDisconnectedException if system has been disconnected |
| */ |
| private void acquireDestroyWriteLock(long millis) { |
| for (;;) { |
| boolean interrupted = Thread.interrupted(); |
| try { |
| this.dm.getCancelCriterion().checkCancelInProgress(null); |
| boolean acquired = this.destroyLock.writeLock().tryLock(millis); |
| if (acquired) { |
| return; |
| } |
| } catch (InterruptedException e) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Releases the write lock on the destroy ReadWrite lock. |
| */ |
| private void releaseDestroyWriteLock() { |
| this.destroyLock.writeLock().unlock(); |
| } |
| |
| /** |
| * Returns time to wait in millis from now based on start and wait in request. |
| * |
| * @return the current wait time for the request before it times out |
| */ |
| private long calcWaitMillisFromNow(DLockRequestMessage request) { |
| long result = request.getTimeoutTS(); |
| if (result != Long.MAX_VALUE) { |
| long now = DLockService.getLockTimeStamp(this.dlock.getDistributionManager()); |
| result = result - now; |
| } |
| return result; |
| } |
| |
| |
| /** |
| * Shuts down the grantor thread and changes internal state to destroyed. |
| */ |
| private void makeDestroyed() { |
| try { |
| this.thread.shutdown(); |
| this.state = DESTROYED; |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "DLockGrantor {} state is DESTROYED", |
| this.dlock.getName()); |
| } |
| if (this.untilDestroyed.getCount() > 0) { |
| this.untilDestroyed.countDown(); |
| } |
| if (this.whileInitializing.getCount() > 0) { |
| this.whileInitializing.countDown(); |
| } |
| this.dlock.getDistributionManager().removeMembershipListener(this.membershipListener); |
| } finally { |
| this.dlock.getStats().incGrantors(-1); |
| } |
| } |
| |
| /** |
| * Returns a snapshot of the current grant tokens. |
| * <p> |
| * Synchronizes on grantTokens. |
| * |
| * @return a snapshot of the current grant tokens |
| */ |
| protected Collection snapshotGrantTokens() { |
| Collection snapshot = null; |
| synchronized (this.grantTokens) { |
| snapshot = new ArrayList(this.grantTokens.values()); |
| } |
| return snapshot; |
| } |
| |
| /** |
| * Fetches or creates a new grant token for the named lock. |
| * <p> |
| * Synchronizes on grantTokens and the grant token. |
| * |
| * @param name the name of the lock |
| * @return the grant token for the named lock |
| */ |
| private DLockGrantToken getOrCreateGrant(Object name) { |
| DLockGrantToken grantToken = null; |
| synchronized (this.grantTokens) { |
| grantToken = basicGetGrantToken(name); |
| if (grantToken == null) { // checking isTokenDestroyed here will deadlock |
| grantToken = new DLockGrantToken(this.dlock, this, name); |
| grantToken.incAccess(); |
| basicPutGrantToken(grantToken); |
| } else { |
| synchronized (grantToken) { |
| if (grantToken.isDestroyed()) { |
| grantToken = new DLockGrantToken(this.dlock, this, name); |
| grantToken.incAccess(); |
| basicPutGrantToken(grantToken); |
| } else { |
| grantToken.incAccess(); |
| } |
| } |
| } |
| } |
| return grantToken; |
| } |
| |
| /** |
| * TEST HOOK: Returns an unmodifible collection backed by the values of the DLockGrantToken map |
| * for testing purposes only. |
| * <p> |
| * Synchronizes on grantTokens. |
| * |
| * @return unmodifible collection of the grant tokens |
| */ |
| public Collection getGrantTokens() { |
| synchronized (this.grantTokens) { |
| return Collections.unmodifiableCollection(this.grantTokens.values()); |
| } |
| } |
| |
| /** |
| * Remove the grant token if it is unused. |
| * <p> |
| * Synchronizes on grantTokens and the grant token. |
| * |
| * @param grant the grant token to remove |
| */ |
| protected void removeGrantIfUnused(DLockGrantToken grant) { |
| synchronized (this.grantTokens) { |
| synchronized (grant) { |
| if (isDestroyed() || grant.isDestroyed()) { |
| return; |
| } else if (grant.grantLockToNextRequest()) { |
| return; |
| } else if (!grant.isBeingAccessed() && !grant.isGranted(false) |
| && !grant.hasWaitingRequests()) { |
| basicRemoveGrantToken(grant); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Iterates over grants and attempts to remove any that are no longer in use. |
| * <p> |
| * Synchronizes on grantTokens and the grant token. |
| * |
| * @param grants the grants to be checked for removal |
| */ |
| protected void removeUnusedGrants(Iterator grants) { |
| while (grants.hasNext()) { |
| DLockGrantToken grant = (DLockGrantToken) grants.next(); |
| removeGrantIfUnused(grant); |
| } |
| } |
| |
| /** |
| * Returns the DLockGrantToken from grant tokens map stored under the key name. |
| * <p> |
| * Synchronizes on grantTokens. |
| */ |
| public DLockGrantToken getGrantToken(Object name) { |
| synchronized (this.grantTokens) { |
| return basicGetGrantToken(name); |
| } |
| } |
| |
| /** |
| * Fetches the grant token value stored in the map under key name. |
| * <p> |
| * Caller must synchronize on grantTokens |
| * |
| * @param name the key to fetch the grant token value for |
| * @return the grant token stored under key name guarded.By {@link #grantTokens} |
| */ |
| private DLockGrantToken basicGetGrantToken(Object name) { |
| return (DLockGrantToken) this.grantTokens.get(name); |
| } |
| |
| /** |
| * Stores the grant token as a value in the map under the key of its name. |
| * <p> |
| * Caller must synchronize on grantTokens |
| * |
| * @param grantToken the grant token to store in the map guarded.By {@link #grantTokens} |
| */ |
| private void basicPutGrantToken(DLockGrantToken grantToken) { |
| this.grantTokens.put(grantToken.getName(), grantToken); |
| dlock.getStats().incGrantTokens(1); |
| } |
| |
| /** |
| * Removes the grant token from the map. |
| * <p> |
| * Caller must synchronize on grantTokens and then the grantToken. |
| * |
| * @param grantToken the grant token to remove from the map. guarded.By {@link #grantTokens} and |
| * grantToken |
| */ |
| private void basicRemoveGrantToken(DLockGrantToken grantToken) { |
| Object removed = this.grantTokens.remove(grantToken.getName()); // changed to ref token |
| if (removed != null) { |
| Assert.assertTrue(removed == grantToken); |
| grantToken.destroy(); |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.basicRemoveGrantToken] removed {}; removed={}", grantToken, removed); |
| } |
| } |
| } |
| |
| /** |
| * Iterates over grants and handles any that have expired. |
| * <p> |
| * Synchronizes on each grant token. |
| * |
| * @param grants the grants to iterate over |
| * @return the next smallest expiration time |
| */ |
| protected long expireAndGrantLocks(Iterator grants) { |
| long smallestExpire = Long.MAX_VALUE; |
| while (grants.hasNext()) { |
| DLockGrantToken grant = (DLockGrantToken) grants.next(); |
| if (grant.isDestroyed()) { |
| continue; |
| } |
| long expire = grant.expireAndGrantLock(); |
| if (expire < smallestExpire) { |
| smallestExpire = expire; |
| } |
| } |
| return smallestExpire; |
| } |
| |
| /** |
| * Iterates over grants and handles any that have timed out. |
| * <p> |
| * Synchronizes on each grant token. |
| * |
| * @param grants the grants to iterate over |
| * @return the next smallest timeout |
| */ |
| protected long handleRequestTimeouts(Iterator grants) { |
| long smallestTimeout = Long.MAX_VALUE; |
| while (grants.hasNext()) { |
| DLockGrantToken grant = (DLockGrantToken) grants.next(); |
| if (grant.isDestroyed()) { |
| continue; |
| } |
| long timeout = grant.handleRequestTimeouts(); |
| if (timeout < smallestTimeout) { |
| smallestTimeout = timeout; |
| } |
| } |
| return smallestTimeout; |
| } |
| |
| /** |
| * TEST HOOK: Specifies time to sleep while handling suspend in order to cause a timeout. |
| * <p> |
| * Synchronizes on suspendLock. |
| * |
| */ |
| public void setDebugHandleSuspendTimeouts(int value) { |
| synchronized (suspendLock) { |
| debugHandleSuspendTimeouts = value; |
| } |
| } |
| |
/**
 * TEST HOOK: time in milliseconds that {@link #handleSuspendTimeouts()} sleeps while handling
 * suspend timeouts. A value of zero or less disables the sleep.
 * <p>
 * Guarded by {@link #suspendLock}.
 */
private int debugHandleSuspendTimeouts = 0;
| |
| /** |
| * Iterates through a copy of suspendQueue and handles any requests that have timed out. |
| * <p> |
| * Synchronizes on suspendLock. |
| * |
| * @return the next smallest timeout in the suspendQueue |
| */ |
| protected long handleSuspendTimeouts() { |
| long smallestTimeout = Long.MAX_VALUE; |
| synchronized (suspendLock) { |
| if (suspendQueue.isEmpty()) |
| return smallestTimeout; |
| if (isDestroyed()) |
| return smallestTimeout; |
| } |
| List timeouts = new ArrayList(); |
| |
| List copySuspendQueue = null; |
| synchronized (suspendLock) { |
| copySuspendQueue = new ArrayList(suspendQueue); |
| } |
| |
| for (Iterator iter = copySuspendQueue.iterator(); iter.hasNext();) { |
| DLockRequestMessage req = (DLockRequestMessage) iter.next(); |
| if (req.checkForTimeout()) { // sends DLockResponseMessage if timeout |
| cleanupSuspendState(req); |
| timeouts.add(req); |
| } else { |
| long timeout = req.getTimeoutTS(); |
| if (timeout < smallestTimeout) { |
| smallestTimeout = timeout; |
| } |
| } |
| } |
| |
| int localDebugHandleSuspendTimeouts = 0; |
| synchronized (suspendLock) { |
| localDebugHandleSuspendTimeouts = debugHandleSuspendTimeouts; |
| } |
| if (localDebugHandleSuspendTimeouts > 0) { |
| try { |
| logger.info(LogMarker.DLS_MARKER, |
| "debugHandleSuspendTimeouts sleeping for {}", |
| localDebugHandleSuspendTimeouts); |
| Thread.sleep(localDebugHandleSuspendTimeouts); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| if (!timeouts.isEmpty()) { |
| synchronized (suspendLock) { |
| if (writeLockWaiters > 0) { |
| // suspenders exist... must iterate through for safe removal |
| for (Iterator iter = timeouts.iterator(); iter.hasNext();) { |
| DLockRequestMessage req = (DLockRequestMessage) iter.next(); |
| |
| // attempt to remove timed out req from suspendQueue |
| if (suspendQueue.remove(req)) { |
| // request was still in suspendQueue, so check if suspender |
| if (req.isSuspendLockingRequest()) { |
| writeLockWaiters--; |
| } |
| } |
| } // for |
| } else { |
| // no suspenders so safe to removeAll |
| Assert.assertTrue(writeLockWaiters == 0, |
| "Grantor state writeLockWaiters changed while holding suspendLock"); |
| suspendQueue.removeAll(timeouts); |
| } |
| checkWriteLockWaiters(); |
| } // synchronized |
| } |
| return smallestTimeout; |
| } |
| |
| /** |
| * Returns string representation for the enumerated grantor state. |
| * |
| * @param stateInt the number of the state to return a string for |
| * @return string representation for the enumerated grantor state |
| */ |
| private String stateToString(int stateInt) { |
| String stateDesc = null; |
| switch (stateInt) { |
| case INITIALIZING: |
| stateDesc = "INITIALIZING"; |
| break; |
| case READY: |
| stateDesc = "READY"; |
| break; |
| case DESTROYED: |
| stateDesc = "DESTROYED"; |
| break; |
| default: |
| stateDesc = null; |
| break; |
| } |
| if (stateDesc == null) { |
| throw new IllegalArgumentException(String.format("Unknown state for grantor: %s", |
| Integer.valueOf(state))); |
| } |
| return stateDesc; |
| } |
| |
| /** |
| * Throws IllegalStateException if this grantor is not still initializing. |
| * |
| * @throws IllegalStateException if this grantor is not still initializing |
| */ |
| private void assertInitializing() { |
| if (this.state != INITIALIZING) { |
| String stateDesc = stateToString(this.state); |
| throw new IllegalStateException( |
| String.format("DLockGrantor operation only allowed when initializing, not %s", |
| stateDesc)); |
| } |
| } |
| |
| /** |
| * Suspends locking by the remote thread and lease id. |
| * <p> |
| * Caller must synchronize on suspendLock. |
| * |
| * Concurrency: protected by synchronization of {@link #suspendLock} |
| * |
| * @param myRThread the remote thread that is about to suspend locking |
| * @param lockId the id of the lock request used to suspend locking |
| */ |
| protected void suspendLocking(final RemoteThread myRThread, final int lockId) { |
| if (DEBUG_SUSPEND_LOCK) { |
| Assert.assertHoldsLock(this.suspendLock, true); |
| } |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "Suspend locking of {} by {} with lockId of {}", |
| this.dlock, myRThread, lockId); |
| } |
| Assert.assertTrue(myRThread != null, "Attempted to suspend locking for null RemoteThread"); |
| Assert.assertTrue(this.lockingSuspendedBy == null || this.lockingSuspendedBy.equals(myRThread), |
| "Attempted to suspend locking for " + myRThread + " but locking is already suspended by " |
| + this.lockingSuspendedBy); |
| this.suspendedLockId = lockId; |
| this.lockingSuspendedBy = myRThread; |
| } |
| |
| /** |
| * Resume locking after it has been suspended. |
| * <p> |
| * Caller must synchronize on suspendLock. |
| */ |
| private void resumeLocking() { |
| if (DEBUG_SUSPEND_LOCK) { |
| Assert.assertHoldsLock(this.suspendLock, true); |
| } |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "Resume locking of {}", this.dlock); |
| } |
| this.lockingSuspendedBy = null; |
| this.suspendedLockId = INVALID_LOCK_ID; |
| } |
| |
| /** |
| * Returns true if locking has been suspended. |
| * <p> |
| * Caller must synchronize on suspendLock. |
| * |
| * Concurrency: protected by synchronization of {@link #suspendLock} |
| * |
| * @return true if locking has been suspended |
| */ |
| protected boolean isLockingSuspended() { |
| if (DEBUG_SUSPEND_LOCK) { |
| Assert.assertHoldsLock(this.suspendLock, true); |
| } |
| return this.lockingSuspendedBy != null; |
| } |
| |
| /** |
| * Returns true if locking has been suspended. |
| * <p> |
| * Synchronizes on suspendLock. |
| * |
| * @return true if locking has been suspended |
| */ |
| protected boolean isLockingSuspendedWithSync() { |
| synchronized (this.suspendLock) { |
| return this.lockingSuspendedBy != null; |
| } |
| } |
| |
| /** |
| * Returns true if locking has been suspended by the remote thread. |
| * <p> |
| * Caller must synchronize on suspendLock. |
| * |
| * Concurrency: protected by synchronization of {@link #suspendLock} |
| * |
| * @return true if locking has been suspended by the remote thread |
| */ |
| protected boolean isLockingSuspendedBy(final RemoteThread rThread) { |
| if (DEBUG_SUSPEND_LOCK) { |
| Assert.assertHoldsLock(this.suspendLock, true); |
| } |
| if (rThread == null) |
| return false; |
| return rThread.equals(this.lockingSuspendedBy); |
| } |
| |
| String displayStatus(RemoteThread rThread, Object name) { |
| StringBuffer sb = new StringBuffer(); |
| synchronized (this.suspendLock) { |
| sb.append(' '); |
| sb.append(this.toString()); |
| sb.append(" id=" + this.hashCode()); |
| sb.append(" rThread=" + rThread); |
| if (name != null) { |
| sb.append(" name=" + name); |
| } |
| sb.append(" permittedRequests (" + permittedRequests.size() + ")=" |
| + permittedRequests.toString() + ""); |
| sb.append(" suspendedLockId = " + suspendedLockId); |
| sb.append(" lockingSuspendedBy = " + lockingSuspendedBy); |
| sb.append(" writeLockWaiters = " + writeLockWaiters); |
| sb.append(" totalReadLockCount = " + totalReadLockCount); |
| sb.append("\nsuspendQueue (" + suspendQueue.size() + ")=" + suspendQueue.toString()); |
| // Kirk said it was ok to not log the list of readLockers to cut |
| // down on how much logging is done at fine level. |
| sb.append("\nreadLockers (" + readLockCountMap.size() |
| + ")" /* + "=" + readLockCountMap.toString() */); |
| } |
| return sb.toString(); |
| } |
| |
| /** |
| * guarded.By {@link #suspendLock} |
| */ |
| private void postReleaseSuspendLock(RemoteThread rThread, Object lock) { |
| if (!isLockingSuspendedBy(rThread)) { |
| // hit bug related to 35749 |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[postReleaseSuspendLock] locking is no longer suspended by {}", rThread); |
| } |
| return; |
| } |
| boolean resume = true; |
| Integer integer = (Integer) readLockCountMap.get(rThread); |
| int readLockCount = integer == null ? 0 : integer.intValue(); |
| if (readLockCount == 0 && !suspendQueue.isEmpty()) { |
| final DLockRequestMessage nextRequest = (DLockRequestMessage) suspendQueue.getFirst(); |
| if (nextRequest.isSuspendLockingRequest()) { |
| resume = false; |
| // final RemoteThread myRemoteThread = nextRequest.getRemoteThread(); |
| // hand-off suspendLocking while under sync... |
| resumeLocking(); |
| suspendLocking(nextRequest.getRemoteThread(), nextRequest.getLockId()); |
| permittedRequests.add(suspendQueue.removeFirst()); |
| writeLockWaiters--; |
| checkWriteLockWaiters(); |
| } |
| } |
| if (resume) { |
| resumeLocking(); |
| // drain readLocks from suspendQueue into permittedRequests queue |
| while (!suspendQueue.isEmpty()) { |
| final DLockRequestMessage nextRequest = (DLockRequestMessage) suspendQueue.getFirst(); |
| if (nextRequest.isSuspendLockingRequest()) { |
| Assert.assertTrue(writeLockWaiters > 0, |
| "SuspendLocking request is waiting but writeLockWaiters is 0"); |
| break; |
| } |
| RemoteThread nextRThread = nextRequest.getRemoteThread(); |
| integer = (Integer) readLockCountMap.get(nextRThread); |
| readLockCount = integer == null ? 0 : integer.intValue(); |
| readLockCount++; |
| readLockCountMap.put(nextRThread, Integer.valueOf(readLockCount)); |
| totalReadLockCount++; |
| checkTotalReadLockCount(); |
| permittedRequests.add(suspendQueue.removeFirst()); |
| } |
| } |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[postReleaseSuspendLock] new status {}", |
| displayStatus(rThread, null)); |
| } |
| } |
| |
| /** |
| * guarded.By {@link #suspendLock} |
| */ |
| private void postReleaseReadLock(RemoteThread rThread, Object lock) { |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| |
| // handle release of regular lock |
| // boolean permitSuspend = false; |
| Integer integer = (Integer) readLockCountMap.get(rThread); |
| int readLockCount = integer == null ? 0 : integer.intValue(); |
| if (readLockCount < 1) { |
| // hit bug 35749 |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[postReleaseReadLock] no locks are currently held by {}", rThread); |
| } |
| return; |
| } |
| readLockCount--; |
| |
| if (readLockCount == 0) { |
| readLockCountMap.remove(rThread); |
| } else { |
| readLockCountMap.put(rThread, Integer.valueOf(readLockCount)); |
| } |
| totalReadLockCount--; |
| |
| if (totalReadLockCount < 0) { |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "Total readlock count has dropped to {} for {}", |
| totalReadLockCount, this); |
| } |
| } |
| if (totalReadLockCount == 0 && !suspendQueue.isEmpty()) { |
| final DLockRequestMessage nextRequest = (DLockRequestMessage) suspendQueue.getFirst(); |
| if (nextRequest.isSuspendLockingRequest()) { |
| suspendLocking(nextRequest.getRemoteThread(), nextRequest.getLockId()); |
| writeLockWaiters--; |
| permittedRequests.add(suspendQueue.removeFirst()); |
| checkWriteLockWaiters(); |
| } else { |
| String s = new StringBuilder("\n (readLockCount=").append(readLockCount) |
| .append(", totalReadLockCount=").append(totalReadLockCount) |
| .append(", writeLockWaiters=").append(writeLockWaiters).append(",\nsuspendQueue=") |
| .append(suspendQueue).append(",\npermittedRequests=").append(permittedRequests) |
| .toString(); |
| logger.warn("Released regular lock with waiting read lock: {}", s); |
| Assert.assertTrue(false, |
| String.format("Released regular lock with waiting read lock: %s", |
| s)); |
| } |
| } |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[postReleaseReadLock] new status {}", |
| displayStatus(rThread, null)); |
| } |
| checkTotalReadLockCount(); |
| } |
| |
| /** |
| * Handles post release lock tasks including tracking the current suspend locking states. |
| * <p> |
| * Synchronizes on suspendLock. |
| * |
| * @param rThread the remote thread that released the lock |
| * @param lock the named lock that was released |
| */ |
| protected void postReleaseLock(RemoteThread rThread, Object lock) { |
| Assert.assertTrue(rThread != null); |
| synchronized (suspendLock) { |
| checkDestroyed(); |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[postReleaseLock] rThread={} lock={} permittedRequests={} suspendQueue={}", rThread, |
| lock, permittedRequests, suspendQueue); |
| } |
| if (DLockService.SUSPEND_LOCKING_TOKEN.equals(lock)) { |
| postReleaseSuspendLock(rThread, lock); |
| } else { |
| postReleaseReadLock(rThread, lock); |
| } |
| } // suspendLock sync |
| } |
| |
| /** |
| * Departure or other codepath NOT specific to unlock requires that we cleanup suspend state that |
| * was already permitted to request. This needs to be invoked for both regular and suspend locks. |
| * <p> |
| * Synchronizes on suspendLock. |
| * |
| * @param request the request to cleanup after due to departure of sender |
| */ |
| protected void cleanupSuspendState(DLockRequestMessage request) { |
| postReleaseLock(request.getRemoteThread(), request.getObjectName()); |
| } |
| |
| /** |
| * Drains newly permitted requests that have been removed from suspendQueue. All requests in the |
| * permittedRequests queue already have permission to proceed with granting or scheduling. |
| * <p> |
| * Caller must acquire destroyReadLock. Synchronizes on suspendLock, grantTokens and each grant |
| * token. |
| * |
| * Concurrency: protected by {@link #destroyLock} via invoking |
| * {@link #acquireDestroyReadLock(long)} |
| */ |
| protected void drainPermittedRequests() { |
| ArrayList drain = null; |
| synchronized (suspendLock) { |
| checkDestroyed(); |
| if (this.permittedRequests.isEmpty()) { |
| return; |
| } |
| drain = this.permittedRequests; |
| this.permittedRequestsDrain.add(drain); |
| this.permittedRequests = new ArrayList(); |
| } // suspendLock sync |
| |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[drainPermittedRequests] draining {}", drain); |
| } |
| |
| // iterate and attempt to grantOrSchedule each request |
| for (Iterator iter = drain.iterator(); iter.hasNext();) { |
| DLockRequestMessage request = (DLockRequestMessage) iter.next(); |
| checkDestroyed(); // destroyAndRemove should respond to all of these |
| try { |
| handlePermittedLockRequest(request); // synchronizes on grant instance |
| } catch (LockGrantorDestroyedException e) { |
| try { |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "LockGrantorDestroyedException respondWithNotGrantor to {}", request); |
| } |
| request.respondWithNotGrantor(); |
| } finally { |
| |
| } |
| } catch (LockServiceDestroyedException e) { |
| try { |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "LockServiceDestroyedException respondWithNotGrantor to {}", request); |
| } |
| request.respondWithNotGrantor(); |
| } finally { |
| |
| } |
| } catch (RuntimeException e) { |
| logger.error( |
| "Processing of postRemoteReleaseLock threw unexpected RuntimeException", |
| e); |
| request.respondWithException(e); |
| } finally { |
| |
| } |
| } |
| |
| synchronized (suspendLock) { |
| checkDestroyed(); |
| this.permittedRequestsDrain.remove(drain); |
| } |
| } |
| |
| /** |
| * Synchronizes on suspendLock. |
| */ |
| private boolean acquireSuspendLockPermission(DLockRequestMessage request) { |
| boolean permitLockRequest = false; |
| final RemoteThread rThread = request.getRemoteThread(); |
| Assert.assertTrue(rThread != null); |
| |
| synchronized (suspendLock) { |
| checkDestroyed(); |
| if (!dm.isCurrentMember(request.getSender())) { |
| logger.info(LogMarker.DLS_MARKER, "Ignoring lock request from non-member: {}", request); |
| return false; |
| } |
| Integer integer = (Integer) readLockCountMap.get(rThread); |
| int readLockCount = integer == null ? 0 : integer.intValue(); |
| boolean othersHaveReadLocks = totalReadLockCount > readLockCount; |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| if (isLockingSuspended() || writeLockWaiters > 0 || othersHaveReadLocks) { |
| writeLockWaiters++; |
| suspendQueue.addLast(request); |
| this.thread.checkTimeToWait(calcWaitMillisFromNow(request), false); |
| checkWriteLockWaiters(); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.acquireSuspend] added '{}' to end of suspendQueue.", request); |
| } |
| } else { |
| permitLockRequest = true; |
| suspendLocking(rThread, request.getLockId()); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.acquireSuspendLockPermission] permitted and suspended for {}", |
| request); |
| } |
| } |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.acquireSuspendLockPermission] new status permitLockRequest = {}{}", |
| permitLockRequest, displayStatus(rThread, null)); |
| } |
| } // suspendLock sync |
| return permitLockRequest; |
| } |
| |
| /** |
| * Synchronizes on suspendLock. |
| */ |
| private boolean acquireReadLockPermission(DLockRequestMessage request) { |
| boolean permitLockRequest = false; |
| final RemoteThread rThread = request.getRemoteThread(); |
| Assert.assertTrue(rThread != null); |
| synchronized (suspendLock) { |
| checkDestroyed(); |
| if (!dm.isCurrentMember(request.getSender())) { |
| logger.info(LogMarker.DLS_MARKER, "Ignoring lock request from non-member: %s", request); |
| return false; |
| } |
| Integer integer = (Integer) readLockCountMap.get(rThread); |
| int readLockCount = integer == null ? 0 : integer.intValue(); |
| boolean threadHoldsLock = readLockCount > 0 || isLockingSuspendedBy(rThread); |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| if (!threadHoldsLock && (isLockingSuspended() || writeLockWaiters > 0)) { |
| suspendQueue.addLast(request); |
| this.thread.checkTimeToWait(calcWaitMillisFromNow(request), false); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.acquireReadLockPermission] added {} to end of suspendQueue.", request); |
| } |
| } else { |
| readLockCount++; |
| readLockCountMap.put(rThread, Integer.valueOf(readLockCount)); |
| totalReadLockCount++; |
| permitLockRequest = true; |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.acquireReadLockPermission] permitted {}", request); |
| } |
| } |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.acquireReadLockPermission] new status threadHoldsLock = {} permitLockRequest = {}{}", |
| threadHoldsLock, permitLockRequest, displayStatus(rThread, null)); |
| } |
| checkTotalReadLockCount(); |
| } // suspendLock sync |
| return permitLockRequest; |
| } |
| |
| /** |
| * Returns true if lock request has permission to proceed; else adds the request to the end of |
| * suspendQueue and returns false. |
| * <p> |
| * Synchronizes on suspendLock. |
| * |
| * @param request the lock request to acquire permission for |
| */ |
| private boolean acquireLockPermission(final DLockRequestMessage request) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.acquireLockPermission] {}", request); |
| } |
| |
| boolean permitLockRequest = false; |
| if (request.getObjectName().equals(DLockService.SUSPEND_LOCKING_TOKEN)) { |
| permitLockRequest = acquireSuspendLockPermission(request); |
| } else { |
| permitLockRequest = acquireReadLockPermission(request); |
| } |
| |
| return permitLockRequest; |
| } |
| |
| /** |
| * Throws InterruptedException if local lock request exists and is interruptible or |
| * CancelException if DistributionManager is forcing us to cancel for shutdown. |
| * |
| * @param e the throwable that caused this check |
| */ |
| private void throwIfInterruptible(InterruptedException e) throws InterruptedException { |
| // This needs to be first, otherwise user gets a complaint that |
| // the TV is off when the problem is the house is burning down... |
| this.dm.getCancelCriterion().checkCancelInProgress(e); |
| |
| if (this.dlock.isInterruptibleLockRequest()) { |
| throw e; |
| } |
| } |
| |
| /** |
| * TEST HOOK: Logs all grant tokens and other lock information for this service at INFO level. |
| * <p> |
| * Synchronizes on grantTokens. |
| */ |
| protected void dumpService() { |
| synchronized (this.grantTokens) { |
| StringBuffer buffer = new StringBuffer(); |
| buffer.append("DLockGrantor.dumpService() for ").append(this); |
| buffer.append("\n").append(this.grantTokens.size()).append(" grantTokens\n"); |
| for (Iterator iter = this.grantTokens.entrySet().iterator(); iter.hasNext();) { |
| Map.Entry entry = (Map.Entry) iter.next(); |
| buffer.append(" ").append(entry.getKey()).append(": "); |
| DLockGrantToken token = (DLockGrantToken) entry.getValue(); |
| buffer.append(token.toString()).append("\n"); |
| } |
| logger.info(LogMarker.DLS_MARKER, "{}", buffer); |
| logger.info(LogMarker.DLS_MARKER, "{}", |
| "\nreadLockCountMap:\n" + readLockCountMap); |
| } |
| } |
| |
| /** |
| * Verify the waiters (for debugging) |
| * |
| * guarded.By {@link #suspendLock} |
| */ |
| private void checkWriteLockWaiters() { |
| if (!DEBUG_SUSPEND_LOCK) { |
| return; |
| } |
| Assert.assertHoldsLock(this.suspendLock, true); |
| int result = 0; |
| Iterator it = this.suspendQueue.iterator(); |
| while (it.hasNext()) { |
| DLockRequestMessage r = (DLockRequestMessage) it.next(); |
| if (r.isSuspendLockingRequest()) { |
| result++; |
| } |
| } // while |
| Assert.assertTrue(result == this.writeLockWaiters); |
| } |
| |
| /** |
| * Debugging method |
| * |
| * guarded.By {@link #suspendLock} |
| */ |
| private void checkTotalReadLockCount() { |
| if (!DEBUG_SUSPEND_LOCK) { |
| return; |
| } |
| Assert.assertHoldsLock(this.suspendLock, true); |
| int result = 0; |
| Iterator it = readLockCountMap.values().iterator(); |
| while (it.hasNext()) { |
| result += ((Integer) it.next()).intValue(); |
| } |
| Assert.assertTrue(result == totalReadLockCount); |
| } |
| |
| // ------------------------------------------------------------------------- |
| // DLockGrantToken (static inner class) |
| // ------------------------------------------------------------------------- |
| /** |
| * Handles leasing and queued scheduling for an individual distributed lock. |
| */ |
| public static class DLockGrantToken { |
| |
/**
 * DLS which contains this lock. Reference is used for stats and lifecycle.
 */
private final DLockService dlock;

/**
 * Grantor instance that handles leasing of this lock.
 */
private final DLockGrantor grantor;

/**
 * The uniquely identifying object name for this lock.
 */
private final Object lockName;

/**
 * Pending requests queued up for the lock. Remains null until the first request is queued by
 * {@code schedule}, so readers must null-check it.
 *
 * guarded.By this
 */
private LinkedList pendingRequests;

/**
 * The reply processor id is used to identify the specific lock operation used by the lessee to
 * lease this lock. Initialized to -1.
 *
 * guarded.By this
 */
private int leaseId = -1;

/**
 * Distributed member that currently has a lease on this lock.
 *
 * guarded.By this
 */
private InternalDistributedMember lessee;

/**
 * Absolute time in milliseconds when the current lease will expire. When this lock is not
 * leased out, the value is -1. When the lock is leased out, the value is > 0. A value of
 * Long.MAX_VALUE indicates a non-expiring (infinite) lease.
 *
 * guarded.By this
 */
private long leaseExpireTime = -1;

/**
 * Current count of threads attempting to access this grant token.
 *
 * guarded.By this
 */
private int accessCount = 0;

/**
 * True if this token has been destroyed and removed from usage.
 *
 * guarded.By this
 */
private boolean destroyed = false;

/**
 * RemoteThread identity of thread currently holding lease on this lock.
 *
 * guarded.By this
 */
private RemoteThread lesseeThread = null;
| |
/**
 * Instantiates a new DLockGrantToken for the named lock.
 *
 * @param dlock the lock service scope for this lock
 * @param grantor the grantor handling locks for the lock service
 * @param name the name of this lock
 */
protected DLockGrantToken(DLockService dlock, DLockGrantor grantor, Object name) {
  this.dlock = dlock;
  this.grantor = grantor;
  this.lockName = name;
}
| |
| /** |
| * Schedules the lock request for immediate or later granting of lock. This will grant the lock |
| * if it is available, otherwise it will add the request at the end of the pending requests |
| * queue. |
| * <p> |
| * Synchronizes on this grant token. |
| * |
| * @param request the request to grant or schedule |
| * @return true if the lock request was immediately granted |
| */ |
| protected synchronized boolean schedule(DLockRequestMessage request) { |
| if (!this.grantor.dm.isCurrentMember(request.getSender())) { |
| this.grantor.cleanupSuspendState(request); |
| return false; |
| } |
| |
| if (!isGranted(false) && !hasWaitingRequests()) { |
| // don't need to schedule... just grant it |
| if (grantLockToRequest(request)) { |
| return true; |
| } |
| } |
| |
| // add the request to the sorted set... |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantToken.schedule] {} scheduling: {}", this, |
| request); |
| } |
| if (this.pendingRequests == null) { |
| this.pendingRequests = new LinkedList(); |
| this.dlock.getStats().incRequestQueues(1); |
| } |
| this.pendingRequests.add(request); |
| this.dlock.getStats().incPendingRequests(1); |
| return true; |
| } |
| |
| /** |
| * Sends NOT_GRANTOR replies to every request waiting for this grant token and then destroys the |
| * grant token. |
| * <p> |
| * Synchronizes on this grant token. |
| */ |
| protected synchronized void handleGrantorDestruction() { |
| try { |
| if (this.pendingRequests != null) { |
| for (Iterator iter = this.pendingRequests.iterator(); iter.hasNext();) { |
| DLockRequestMessage request = (DLockRequestMessage) iter.next(); |
| request.respondWithNotGrantor(); |
| } |
| } |
| } finally { |
| destroy(); |
| } |
| } |
| |
| /** |
| * Checks current lock for expiration and attempts to grant the lock if it is available. |
| * <p> |
| * Synchronizes on this grant token. |
| * <p> |
| * NOTE: expiration is only as accurate as clock synchronization on the hardware that the |
| * members are running on probably should have Requestors handle expirations and send Release |
| * msg - need an Evictor thread in each Requestor |
| * |
| * @return the lease expiration time in millis for the currently held lock or Long.MAX_VALUE if |
| * lock has no owner |
| */ |
| protected synchronized long expireAndGrantLock() { |
| // isGranted calls checkForExpiration... |
| if (this.grantor.isDestroyed()) |
| return Long.MAX_VALUE; |
| if (!isGranted(true) && !this.grantor.isLockingSuspendedWithSync()) { |
| grantLockToNextRequest(); |
| } |
| long result = getLeaseExpireTime(); |
| if (result <= 0) { |
| result = Long.MAX_VALUE; |
| } |
| return result; |
| } |
| |
| /** |
| * Returns true if there are pending requests waiting to lock this grant token. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * Concurrency: protected by synchronization of *this* DLockGrantToken |
| * |
| * @return true if there are pending requests waiting to lock this |
| */ |
| protected synchronized boolean hasWaitingRequests() { |
| if (this.pendingRequests == null) |
| return false; |
| return !this.pendingRequests.isEmpty(); |
| } |
| |
| /** |
| * Grant this lock to the request if possible. Returns true if lock was granted to the request. |
| * <p> |
| * Synchronizes on this grant token. |
| * |
| * @param request the lock request asking for this lock |
| * @return true if lock was granted to the request |
| */ |
| protected synchronized boolean grantLockToRequest(DLockRequestMessage request) { |
| Assert.assertTrue(request.getRemoteThread() != null); |
| if (isGranted(true) || hasWaitingRequests()) { |
| return false; |
| } |
| |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantToken.grantLockToRequest] granting: {}", |
| request); |
| } |
| |
| long newLeaseExpireTime = grantAndRespondToRequest(request); |
| if (newLeaseExpireTime == -1) |
| return false; |
| |
| if (newLeaseExpireTime < Long.MAX_VALUE) { |
| long now = DLockService.getLockTimeStamp(this.grantor.dm); |
| this.grantor.thread.checkTimeToWait(newLeaseExpireTime - now, true); |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Called to release a remote lock when processing a DLockReleaseMessage. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * <p> |
| * Call stack: DLockReleaseMessage -> releaseIfLocked -> getAndReleaseGrantIfLockedBy -> |
| * grant.releaseIfLockedBy |
| * |
| * Concurrency: protected by synchronization of *this* DLockGrantToken |
| * |
| * @param owner the member to release the lock for |
| * @param lockId the lock id that the member used to acquire the lock |
| */ |
| protected void releaseIfLockedBy(InternalDistributedMember owner, int lockId) { |
| final RemoteThread rThread = getRemoteThread(); |
| boolean released = false; |
| try { |
| released = releaseLock(owner, lockId); |
| } catch (IllegalStateException e) { |
| this.dlock.checkDestroyed(); |
| this.grantor.checkDestroyed(); |
| // must have hit race... grantor doesn't have the token |
| return; |
| } |
| if (released) { |
| // don't bother synchronizing requests for this log statement... |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| synchronized (this) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantToken.releaseIfLockedBy] pending requests: {}", |
| (this.pendingRequests == null ? "none" : "" + this.pendingRequests.size())); |
| } |
| } |
| Assert.assertTrue(rThread != null); |
| // releaseIfLockedBy (remote unlock) |
| this.grantor.postReleaseLock(rThread, getName()); |
| // note: DLockReleaseMessage calls drainPermittedRequests next... |
| } |
| } |
| |
| /** |
| * Returns true if lock is currently leased by the owner with the specified lock id. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * Concurrency: protected by synchronization of *this* DLockGrantToken |
| * |
| * @param owner the member to check for lock ownership |
| * @param lockId the lock id that the member used for locking |
| * @return true if lock is currently leased by the owner with the specified lock id |
| */ |
| protected boolean isLockedBy(InternalDistributedMember owner, int lockId) { |
| return isLeaseHeldBy(owner, lockId); |
| } |
| |
| /** |
| * Handle timeouts for requests waiting on this lock. Any requests that have timed out will be |
| * removed. Calculates and returns the next smallest timeout of the requests still waiting on |
| * this lock. |
| * <p> |
| * Synchronizes on this grant token. |
| * |
| * @return next smallest timeout of the requests still waiting on this lock |
| */ |
| protected long handleRequestTimeouts() { |
| long smallestTimeout = Long.MAX_VALUE; |
| synchronized (this) { |
| if (this.pendingRequests == null) |
| return smallestTimeout; |
| if (this.grantor.isDestroyed()) |
| return smallestTimeout; |
| } |
| List timeouts = new ArrayList(); |
| |
| // narrow timeouts to just contain requests that have timed out... |
| DLockRequestMessage req = null; |
| // ... copyRequests is synchronized on this ... |
| synchronized (this) { |
| for (Iterator iter = this.pendingRequests.iterator(); iter.hasNext();) { |
| req = (DLockRequestMessage) iter.next(); |
| if (req.checkForTimeout()) { // sends DLockResponseMessage if timeout |
| this.grantor.cleanupSuspendState(req); |
| timeouts.add(req); |
| } else { |
| long timeout = req.getTimeoutTS(); |
| if (timeout < smallestTimeout) { |
| smallestTimeout = timeout; |
| } |
| } |
| } |
| removeRequests(timeouts); |
| } |
| |
| return smallestTimeout; |
| } |
| |
/**
 * Cleans up any state for the departed member. Any pending lock requests from this member are
 * removed, then a release of the lock on behalf of this member is attempted. If the release
 * succeeded, post-release bookkeeping runs, permitted requests are drained and this grant is
 * added to {@code grantsToRemoveIfUnused}.
 * <p>
 * Synchronizes on this grant token, suspendLock, and grantTokens.
 *
 * @param member the departed member
 * @param grantsToRemoveIfUnused list that receives this grant when the lock was released for the
 *        departed member
 */
protected void handleDepartureOf(final InternalDistributedMember member,
    final ArrayList grantsToRemoveIfUnused) {
  boolean released = false;
  RemoteThread rThread = null;
  final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
  try {
    synchronized (this) {
      try {
        // NOTE: these early returns still pass through the inner finally block below, so the
        // release attempt is made even when the token is destroyed or has no pending queue
        if (isDestroyed())
          return;
        if (this.pendingRequests == null)
          return;
        // remove member from pendingRequests...
        DLockRequestMessage req = null;
        for (Iterator iter = this.pendingRequests.iterator(); iter.hasNext();) {
          req = (DLockRequestMessage) iter.next();
          if (member.equals(req.getSender())) {
            // found departed member, respondWithNotHolder to end dlock stats
            try {
              req.handleDepartureOfSender();
              // cleanup suspend state for this request
              this.grantor.cleanupSuspendState(req);
            } catch (CancelException e) {
              // cancellation is deliberately ignored so the request is still removed below
              if (isDebugEnabled_DLS) {
                logger.trace(LogMarker.DLS_VERBOSE,
                    "[DLockGrantToken.handleDepartureOf] ignored cancellation (1)");
              }
            }
            // remove the request
            iter.remove();
            this.dlock.getStats().incPendingRequests(-1);
          }
        }
      } finally {
        synchronized (this) {
          // bugfix 32657 release lock AFTER removing member from queued requests
          // because release will grant to first request in queued requests
          rThread = getRemoteThread();
          boolean releasedToken = false;
          try {
            releasedToken = releaseLock(member, getLockId());
          } catch (IllegalStateException e) {
            this.dlock.checkDestroyed();
            this.grantor.checkDestroyed();
            // must have hit race... grantor doesn't have the token
            // NOTE(review): returning from inside a finally block discards any exception that
            // was propagating out of the try block above
            return;
          }
          if (releasedToken) {
            released = true;
            if (isDebugEnabled_DLS) {
              logger.trace(LogMarker.DLS_VERBOSE,
                  "[DLockGrantToken.handleDepartureOf] pending requests: {}",
                  (this.pendingRequests == null ? "none" : "" + this.pendingRequests.size()));
            }
            Assert.assertTrue(rThread != null);
          }
        }
      }
    }
  } finally {
    // follow-up work runs after the synchronized block on this token has been exited
    if (released) {
      try {
        this.grantor.postReleaseLock(rThread, getName());
      } catch (CancelException e) {
        if (isDebugEnabled_DLS) {
          logger.trace(LogMarker.DLS_VERBOSE,
              "[DLockGrantToken.handleDepartureOf] ignored cancellation (2)");
        }
      }
      this.grantor.drainPermittedRequests(); // destroyReadLock{grant{}, suspendLock{}}
      grantsToRemoveIfUnused.add(this);
    }
  }
}
| |
| /** |
| * Adds this grant to the list if it references the departed member. |
| * <p> |
| * Synchronizes on this grant token. |
| * |
| * @param member the departed member |
| * @param grantsReferencingMember list to add grant to if it references departed member |
| */ |
| protected synchronized void checkDepartureOf(final InternalDistributedMember member, |
| final List grantsReferencingMember) { |
| |
| if (this.destroyed) { |
| return; |
| } |
| if (member.equals(this.lessee)) { |
| grantsReferencingMember.add(this); |
| return; |
| } |
| if (this.pendingRequests != null) { |
| DLockRequestMessage req = null; |
| for (Iterator iter = this.pendingRequests.iterator(); iter.hasNext();) { |
| req = (DLockRequestMessage) iter.next(); |
| if (member.equals(req.getSender())) { |
| grantsReferencingMember.add(this); |
| return; |
| } |
| } |
| } |
| } |
| |
| /** |
| * Remove all the specified pending requests. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * @param requestsToRemove the pending requests to remove guarded.By this |
| */ |
| private void removeRequests(Collection requestsToRemove) { |
| if (!requestsToRemove.isEmpty()) { |
| synchronized (this) { |
| this.pendingRequests.removeAll(requestsToRemove); |
| } |
| this.dlock.getStats().incPendingRequests(-requestsToRemove.size()); |
| } |
| } |
| |
| /** |
| * Grants this lock to the next waiting request if one exists. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * Concurrency: protected by synchronization of *this* DLockGrantToken |
| * |
| * @return true if the lock was granted to next request |
| */ |
| protected boolean grantLockToNextRequest() { |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantToken.grantLock] {} isGranted={} hasWaitingRequests={}", getName(), |
| isLeaseHeld(), hasWaitingRequests()); |
| } |
| |
| while (!isGranted(true) && hasWaitingRequests()) { |
| try { |
| // get request at front of queue... |
| DLockRequestMessage request = null; |
| synchronized (this) { |
| request = (DLockRequestMessage) this.pendingRequests.remove(0); |
| } |
| this.dlock.getStats().incPendingRequests(-1); |
| |
| // grant lock to the request unless it is timed out... |
| if (request.checkForTimeout()) { |
| this.grantor.cleanupSuspendState(request); |
| continue; |
| } |
| |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantToken.grantLock] granting {} to {}", |
| getName(), request.getSender()); |
| } |
| |
| long newLeaseExpireTime = grantAndRespondToRequest(request); |
| if (newLeaseExpireTime == -1) |
| continue; |
| |
| if (newLeaseExpireTime < Long.MAX_VALUE) { |
| long now = DLockService.getLockTimeStamp(this.grantor.dm); |
| this.grantor.thread.checkTimeToWait(newLeaseExpireTime - now, true); |
| } |
| |
| } catch (IndexOutOfBoundsException e) { |
| // ignore... entry may have timed out between empty check and remove |
| } |
| } |
| |
| return isGranted(false); |
| } |
| |
/**
 * Grants the lock to the specified request and sends a reply to the member that initiated the
 * request. Nothing is granted when the request has already been responded to or when its sender
 * is no longer a current member (in which case its suspend state is cleaned up).
 * <p>
 * Caller must synchronize on this grant token. Synchronizes on the request and, for the
 * suspend-locking token, on the grantor's suspendLock.
 *
 * @param request the request to grant the lock to
 * @return leaseExpireTime or -1 if failed to grant. guarded.By this
 */
private long grantAndRespondToRequest(DLockRequestMessage request) {

  synchronized (request) {
    // a request that was already answered must not be granted
    if (request.respondedNoSync()) {
      return -1;
    }

    Assert.assertTrue(request.getRemoteThread() != null);
    if (!this.grantor.dm.isCurrentMember(request.getSender())) {
      this.grantor.cleanupSuspendState(request);
      return -1;
    }

    if (isSuspendLockingToken()) {
      synchronized (this.grantor.suspendLock) {
        // sanity check: locking is either not suspended or suspended by the requesting thread
        Assert.assertTrue(
            this.grantor.lockingSuspendedBy == null
                || this.grantor.isLockingSuspendedBy(request.getRemoteThread()),
            "Locking is suspended by " + this.grantor.lockingSuspendedBy + " with lockId of "
                + this.grantor.suspendedLockId + " instead of " + request.getRemoteThread()
                + " with lockId of " + request.getLockId());
      } // suspendLock sync
    }

    long newLeaseExpireTime = calcLeaseExpireTime(request.getLeaseTime());

    grantLock(request.getSender(), newLeaseExpireTime, request.getLockId(),
        request.getRemoteThread());

    if (isSuspendLockingToken()) {
      synchronized (this.grantor.suspendLock) {
        // no-op if already suspend by this RemoteThread...
        this.grantor.suspendLocking(request.getRemoteThread(), request.getLockId());
        Assert.assertTrue(this.grantor.isLockingSuspendedBy(request.getRemoteThread()),
            "Locking should now be suspended by " + request.getRemoteThread()
                + " with lockId of " + request.getLockId() + " instead of "
                + this.grantor.lockingSuspendedBy + " with lockId of "
                + this.grantor.suspendedLockId);
      } // suspendLock sync
    }

    // NOTE: if grantor is local and client interrupts the request, the
    // following will release the lock because the reply processor is gone
    request.respondWithGrant(newLeaseExpireTime);
    // re-check ownership after responding, since the response itself may have released the lock
    if (!isLeaseHeldBy(request.getSender(), request.getLockId())) {
      // lock request was local and interrupted then released
      return -1;
    }
    return newLeaseExpireTime;
  }
}
| |
| /** |
| * Returns the absolute time at which the specified lease time will expire from now. This call |
| * does not change or check any state other than current time. |
| * |
| * @param leaseTime the desired length of lease time |
| * @return the absolute time at which the lease will expire |
| */ |
| protected long calcLeaseExpireTime(long leaseTime) { |
| if (leaseTime == Long.MAX_VALUE || leaseTime == -1) { |
| return Long.MAX_VALUE; |
| } |
| |
| long currentTime = getCurrentTime(); |
| long newLeaseExpireTime = currentTime + leaseTime; |
| if (newLeaseExpireTime < leaseTime) { // rolled over MAX_VALUE... |
| newLeaseExpireTime = Long.MAX_VALUE; |
| } |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantToken.calcLeaseExpireTime] currentTime={} newLeaseExpireTime={}", |
| currentTime, newLeaseExpireTime); |
| } |
| return newLeaseExpireTime; |
| } |
| |
| /** |
| * Returns true if this grant token is currently granted. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * Concurrency: protected by synchronization of *this* DLockGrantToken |
| * |
| * @param checkForExpiration true if expiration should be attempted before checking if this |
| * grant token is currently granted |
| * @return true if this grant token is currently granted |
| */ |
| protected boolean isGranted(boolean checkForExpiration) { |
| if (checkForExpiration) { |
| checkForExpiration(); |
| } |
| return isLeaseHeld(); |
| } |
| |
| /** |
| * Creates a string of the pending requests for logging or debugging. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * @return a string of the pending requests for logging or debugging guarded.By this |
| */ |
| private String pendingRequestsToString() { |
| if (this.pendingRequests == null) { |
| return "(null)"; |
| } |
| StringBuffer sb = new StringBuffer(); |
| Iterator it = this.pendingRequests.iterator(); |
| while (it.hasNext()) { |
| Object req = it.next(); |
| sb.append("["); |
| sb.append(req.toString()); |
| sb.append("]"); |
| } |
| return sb.toString(); |
| } |
| |
| /** |
| * Returns string representation of this object. |
| * <p> |
| * Synchronizes on this grant token. |
| */ |
| @Override |
| public String toString() { |
| return toString(true); |
| } |
| |
| /** |
| * Returns string representation of this object. |
| * <p> |
| * Synchronizes on this grant token. |
| * <p> |
| * |
| * @param displayPendingRequests true if string should include pendingRequests |
| */ |
| public String toString(boolean displayPendingRequests) { |
| StringBuffer sb = new StringBuffer("DLockGrantToken"); |
| sb.append("@").append(Integer.toHexString(hashCode())); |
| synchronized (this) { |
| sb.append(" {name: ").append(getName()); |
| sb.append(", isGranted: ").append(isLeaseHeld()); |
| sb.append(", isDestroyed: ").append(this.destroyed); |
| sb.append(", accessCount: ").append(this.accessCount); |
| sb.append(", lessee: ").append(this.lessee); |
| sb.append(", leaseExpireTime: ").append(this.leaseExpireTime); |
| sb.append(", leaseId: ").append(this.leaseId); |
| sb.append(", lesseeThread: ").append(this.lesseeThread); |
| if (displayPendingRequests) { |
| sb.append(", pendingRequests: ").append(pendingRequestsToString()); |
| } |
| sb.append("}"); |
| } |
| return sb.toString(); |
| } |
| |
| /** |
| * Returns the name of this lock. |
| * |
| * @return the name of this lock |
| */ |
| Object getName() { |
| return this.lockName; |
| } |
| |
| /** |
| * Returns true if this lock represents suspend locking. |
| * |
| * @return true if this lock represents suspend locking |
| * @see org.apache.geode.distributed.DistributedLockService#suspendLocking(long) |
| */ |
| boolean isSuspendLockingToken() { |
| return DLockService.SUSPEND_LOCKING_TOKEN.equals(this.lockName); |
| } |
| |
| /** |
| * Returns the lock id used to lease this lock. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * @return the lock id used to lease this lock guarded.By this |
| */ |
| int getLockId() { |
| return this.leaseId; |
| } |
| |
| /** |
| * Returns the identity of the thread that has this lock leased. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * @return the identity of the thread that has this lock leased guarded.By this |
| */ |
| RemoteThread getRemoteThread() { |
| return this.lesseeThread; |
| } |
| |
| /** |
| * Increments or decrements access count by the specified amount. |
| * <p> |
| * Synchronizes on this grant token. |
| * |
| * @param amount the amount to inc or dec access count by |
| */ |
| private synchronized void incAccess(int amount) { |
| if (amount < 0) { |
| Assert.assertTrue(this.accessCount - amount >= 0, |
| amount + " cannot be subtracted from accessCount " + this.accessCount); |
| } |
| this.accessCount += amount; |
| } |
| |
| /** |
| * Increments the access count by one. |
| * <p> |
| * Synchronizes on this grant token. |
| */ |
| void incAccess() { |
| incAccess(1); |
| } |
| |
| /** |
| * Decrements the access count by one. |
| * <p> |
| * Synchronizes on this grant token. |
| */ |
| void decAccess() { |
| incAccess(-1); |
| } |
| |
| /** |
| * Returns true if the access count is greater than zero. |
| * <p> |
| * Synchronizes on this grant token. |
| * |
| * @return true if the access count is greater than zero |
| */ |
| boolean isBeingAccessed() { |
| synchronized (this) { |
| return this.accessCount > 0; |
| } |
| } |
| |
| /** |
| * Returns the member that currently holds a lease on this lock. |
| * <p> |
| * Synchronizes on this grant token. |
| * |
| * @return the member that currently holds a lease on this lock |
| */ |
| public synchronized InternalDistributedMember getOwner() { |
| return this.lessee; |
| } |
| |
| /** |
| * Returns the lease expiration time. This the absolute time in milliseconds when the current |
| * lease will expire. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * Concurrency: protected by synchronization of *this* DLockGrantToken |
| * |
| * @return the lease expiration time |
| */ |
| public long getLeaseExpireTime() { |
| return this.leaseExpireTime; |
| } |
| |
| /** |
| * Returns true if this grant token has been destroyed. |
| * <p> |
| * Synchronizes on this grant token. |
| * |
| * @return true if this grant token has been destroyed |
| */ |
| public synchronized boolean isDestroyed() { |
| return this.destroyed; |
| } |
| |
| /** |
| * Returns the current time in milliseconds. |
| * |
| * @return the current time in milliseconds |
| */ |
| long getCurrentTime() { |
| return DLockService.getLockTimeStamp(this.grantor.dm); |
| } |
| |
| /** |
| * Handle expiration if the lease expire time has been reached for the current lease on this |
| * grant token. |
| * <p> |
| * Synchronizes on this grant token. |
| * |
| * @return true if the lease is expired |
| */ |
| synchronized boolean checkForExpiration() { |
| if (this.lessee != null && this.leaseId > -1) { |
| if (this.leaseExpireTime == Long.MAX_VALUE) |
| return false; |
| long currentTime = getCurrentTime(); |
| if (currentTime > this.leaseExpireTime) { |
| // expired! |
| |
| final RemoteThread rThread = this.lesseeThread; |
| |
| this.lessee = null; |
| this.leaseId = -1; |
| this.lesseeThread = null; |
| this.leaseExpireTime = -1; |
| |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[checkForExpiration] Expired token at {}: {}", |
| currentTime, toString(true)); |
| } |
| |
| this.grantor.postReleaseLock(rThread, this.lockName); |
| |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Grants this lock. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * @param owner the member that is being granted the lock |
| * @param newLeaseExpireTime the absolute expiration time |
| * @param lockId the lock id used to request the lock |
| * @param remoteThread identity of the locking thread guarded.By this |
| */ |
| void grantLock(InternalDistributedMember owner, long newLeaseExpireTime, int lockId, |
| RemoteThread remoteThread) { |
| Assert.assertTrue(remoteThread != null); |
| checkDestroyed(); |
| basicGrantLock(owner, newLeaseExpireTime, lockId, remoteThread); |
| } |
| |
| /** |
| * Modify grant token state to mark the lock as granted. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * @param owner the member that has been granted the lock |
| * @param newLeaseExpireTime the absolute expiration time |
| * @param lockId the lock id used to request the lock |
| * @param remoteThread identity of the locking thread guarded.By this |
| */ |
| private void basicGrantLock(InternalDistributedMember owner, long newLeaseExpireTime, |
| int lockId, RemoteThread remoteThread) { |
| Assert.assertTrue(remoteThread != null); |
| Assert.assertTrue(lockId > -1, "Invalid attempt to grant lock with lockId " + lockId); |
| this.lessee = owner; |
| this.leaseExpireTime = newLeaseExpireTime; |
| this.leaseId = lockId; |
| this.lesseeThread = remoteThread; |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantToken.grantLock.grantor] Granting {}", |
| toString(false)); |
| } |
| } |
| |
| /** |
| * Returns true if this lock is currently leased out. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * @return true if this lock is currently leased out guarded.By this |
| */ |
| boolean isLeaseHeld() { |
| return this.lessee != null && this.leaseId > -1; |
| } |
| |
| /** |
| * Mark this grant token as destroyed. This should only happen to a token that is no longer in |
| * use. |
| * <p> |
| * Caller must synchronize on this grant token. guarded.By this |
| */ |
| void destroy() { |
| if (!this.destroyed) { |
| this.destroyed = true; |
| this.dlock.getStats().incGrantTokens(-1); |
| if (this.pendingRequests != null) { |
| this.dlock.getStats().incPendingRequests(-this.pendingRequests.size()); |
| this.dlock.getStats().incRequestQueues(-1); |
| } |
| } |
| } |
| |
| /** |
| * Throws IllegalStateException if this grant token has been destroyed. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * @throws IllegalStateException if this grant token has been destroyed guarded.By this |
| */ |
| private void checkDestroyed() { |
| if (this.destroyed) { |
| String s = "Attempting to use destroyed grant token: " + this; |
| IllegalStateException e = new IllegalStateException(s); |
| throw e; |
| } |
| } |
| |
| /** |
| * Called by the grantor. Releases lock on this token if it is currently locked by the specified |
| * member and lockId. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * @param member the member to release the lock from |
| * @param lockId the lock id that the member used when locking |
| * @return true if lock was released guarded.By this |
| */ |
| private boolean releaseLock(InternalDistributedMember member, int lockId) { |
| if (lockId == -1) |
| return false; |
| checkDestroyed(); |
| |
| if (isLeaseHeldBy(member, lockId)) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantToken.releaseLock] releasing ownership: {}", this); |
| } |
| |
| this.lessee = null; |
| this.leaseId = -1; |
| this.lesseeThread = null; |
| this.leaseExpireTime = -1; |
| |
| return true; |
| } |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantToken.releaseLock] {} attempted to release: {}", member, this); |
| } |
| return false; |
| } |
| |
| /** |
| * Returns true if the sender holds a lease on this lock using lockId. |
| * <p> |
| * Caller must synchronize on this grant token. |
| * |
| * @param sender the member that potentially holds a lease |
| * @param lockId the lock id provided by the member |
| * @return true if the sender holds a lease on this lock guarded.By this |
| */ |
| private boolean isLeaseHeldBy(InternalDistributedMember sender, int lockId) { |
| Assert.assertTrue(sender != null, "sender is null: " + this); |
| Assert.assertTrue(lockId > -1, "lockId is < 0: " + this); |
| return sender.equals(this.lessee) && lockId == this.leaseId; |
| } |
| } |
| |
| // ------------------------------------------------------------------------- |
| // DLockGrantorThread (static inner class) |
| // ------------------------------------------------------------------------- |
| /** |
| * Thread dedicated to handling background tasks for this grantor. |
| */ |
| private static class DLockGrantorThread extends LoggingThread { |
| private static final long MAX_WAIT = 60 * 1000; // 60 seconds... |
| private volatile boolean shutdown = false; |
| private boolean waiting = false; |
| private boolean requireTimeToWait = false; |
| private boolean goIntoWait = false; |
| private long timeToWait = MAX_WAIT; |
| private long expectedWakeupTimeStamp = 0; |
| private final Object lock = new Object(); |
| private final DLockGrantor grantor; |
| private final CancelCriterion stopper; |
| |
| /** Time in millis that next pending request will timeout */ |
| private long nextTimeout = DLockGrantorThread.MAX_WAIT; |
| |
| /** Time in millis that next lock is due to expire */ |
| private long nextExpire = DLockGrantorThread.MAX_WAIT; |
| |
| DLockGrantorThread(DLockGrantor grantor, CancelCriterion stopper) { |
| super("Lock Grantor for " + grantor.dlock.getName()); |
| this.grantor = grantor; |
| this.stopper = stopper; |
| } |
| |
| private long now() { |
| DistributionManager dm = this.grantor.dlock.getDistributionManager(); |
| return DLockService.getLockTimeStamp(dm); |
| } |
| |
| protected void shutdown() { |
| this.shutdown = true; |
| this.interrupt(); |
| } |
| |
| protected void checkTimeToWait(long newTimeToWaitArg, boolean expire) { |
| long newTimeToWait = newTimeToWaitArg; |
| if (newTimeToWait == Long.MAX_VALUE) { |
| // never expire |
| return; |
| } else if (newTimeToWait < 0) { |
| // negative means already expired or timed out so we wakeup immediately |
| newTimeToWait = 0; |
| } |
| |
| synchronized (this.lock) { |
| if (expire && newTimeToWait < this.nextExpire) { |
| this.nextExpire = newTimeToWait; |
| } |
| if (!expire && newTimeToWait < this.nextTimeout) { |
| this.nextTimeout = newTimeToWait; |
| } |
| |
| if (newTimeToWait < this.timeToWait) { |
| if (this.waiting) { |
| long newWakeupTimeStamp = now() + newTimeToWait; |
| if (newWakeupTimeStamp > -1 // accounts for overflow |
| && newWakeupTimeStamp < this.expectedWakeupTimeStamp) { |
| this.timeToWait = newTimeToWait; |
| this.requireTimeToWait = true; |
| this.goIntoWait = true; |
| this.lock.notify(); |
| } |
| } else { |
| this.timeToWait = newTimeToWait; |
| this.requireTimeToWait = true; |
| } |
| } // end if newTimeToWait |
| } // end sync this.lock |
| } |
| |
| @Override |
| public void run() { |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| |
| DistributedLockStats stats = this.grantor.dlock.getStats(); |
| boolean recalcTimeToWait = false; |
| while (!this.shutdown) { |
| if (stopper.isCancelInProgress()) { |
| break; // done |
| } |
| try { |
| // go into wait if we know we have no timeouts or expires for a while |
| synchronized (this.lock) { // synchronized |
| if (recalcTimeToWait || this.requireTimeToWait) { |
| recalcTimeToWait = false; |
| long nextTS = Math.min(this.nextExpire, this.nextTimeout); |
| this.nextExpire = Long.MAX_VALUE; |
| this.nextTimeout = Long.MAX_VALUE; |
| if (nextTS != Long.MAX_VALUE || this.requireTimeToWait) { |
| this.requireTimeToWait = false; |
| long now = now(); |
| |
| // fix bug 39355 by using current timeToWait if smaller |
| long newTimeToWait = nextTS - now; |
| if (this.requireTimeToWait) { |
| this.timeToWait = Math.min(this.timeToWait, newTimeToWait); |
| } else { |
| this.timeToWait = newTimeToWait; |
| } |
| |
| if (this.timeToWait < 0) |
| this.timeToWait = 0; |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "DLockGrantorThread will wait for {} ms. nextExpire={} nextTimeout={} now={}", |
| this.timeToWait, this.nextExpire, this.nextTimeout, now); |
| } |
| } else { |
| this.timeToWait = Long.MAX_VALUE; |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "DLockGrantorThread will wait until rescheduled."); |
| } |
| } |
| } |
| if (this.timeToWait > 0) { |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "DLockGrantorThread is about to wait for {} ms.", this.timeToWait); |
| } |
| if (this.timeToWait != Long.MAX_VALUE) { |
| this.expectedWakeupTimeStamp = now() + this.timeToWait; |
| if (this.expectedWakeupTimeStamp < 0) { |
| // overflow |
| this.expectedWakeupTimeStamp = Long.MAX_VALUE; |
| } |
| } else { |
| this.expectedWakeupTimeStamp = Long.MAX_VALUE; |
| } |
| if (this.expectedWakeupTimeStamp == Long.MAX_VALUE) { |
| while (!this.goIntoWait) { |
| this.waiting = true; |
| this.lock.wait(); // spurious wakeup ok |
| this.waiting = false; |
| } |
| } else { |
| long timeToWaitThisTime = this.timeToWait; |
| for (;;) { |
| this.waiting = true; |
| this.lock.wait(timeToWaitThisTime); // spurious wakeup ok |
| this.waiting = false; |
| if (this.goIntoWait) |
| break; // out of for loop |
| timeToWaitThisTime = this.expectedWakeupTimeStamp - now(); |
| if (timeToWaitThisTime <= 0) |
| break; // out of for loop |
| } |
| } |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "DLockGrantorThread has woken up..."); |
| } |
| if (this.shutdown) |
| break; |
| // if goIntoWait, continue back around and enter wait again |
| if (this.goIntoWait) { |
| this.goIntoWait = false; |
| continue; |
| } |
| } |
| } // synchronized |
| long statStart = stats.startGrantorThread(); |
| try { |
| Collection grants = this.grantor.snapshotGrantTokens(); |
| |
| // TASK: expire and grant locks |
| if (this.shutdown) { |
| return; |
| } |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "DLockGrantorThread about to expireAndGrantLocks..."); |
| } |
| { |
| long smallestExpire = this.grantor.expireAndGrantLocks(grants.iterator()); |
| synchronized (this.lock) { |
| if (smallestExpire < this.nextExpire) { |
| this.nextExpire = smallestExpire; |
| } |
| |
| } |
| } |
| long timing = stats.endGrantorThreadExpireAndGrantLocks(statStart); |
| |
| // TASK: timeout waiting requests |
| if (this.shutdown) { |
| return; |
| } |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "DLockGrantorThread about to handleRequestTimeouts..."); |
| } |
| { |
| long smallestRequestTimeout = this.grantor.handleRequestTimeouts(grants.iterator()); |
| long smallestSuspendTimeout = this.grantor.handleSuspendTimeouts(); |
| synchronized (this.lock) { |
| if (smallestRequestTimeout < this.nextTimeout) { |
| this.nextTimeout = smallestRequestTimeout; |
| } |
| if (smallestSuspendTimeout < this.nextTimeout) { |
| this.nextTimeout = smallestSuspendTimeout; |
| } |
| } |
| } |
| timing = stats.endGrantorThreadHandleRequestTimeouts(timing); |
| |
| // TASK: remove unused tokens |
| if (this.shutdown) { |
| return; |
| } |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "DLockGrantorThread about to removeUnusedGrants..."); |
| } |
| this.grantor.removeUnusedGrants(grants.iterator()); |
| stats.endGrantorThreadRemoveUnusedTokens(timing); |
| |
| } catch (CancelException e) { |
| // so, exit then. |
| } finally { |
| recalcTimeToWait = true; |
| stats.endGrantorThread(statStart); |
| } |
| |
| } catch (LockGrantorDestroyedException ex) { |
| this.shutdown = true; |
| return; |
| } catch (InterruptedException e) { |
| // shutdown probably interrupted us |
| |
| // Not necessary to reset the interrupt bit, we're going to go |
| // away of our own accord. |
| |
| if (this.shutdown) { |
| // ok to ignore since this thread will now shutdown |
| } else { |
| logger.warn("DLockGrantorThread was unexpectedly interrupted", |
| e); |
| // do not set interrupt flag since this thread needs to resume |
| stopper.checkCancelInProgress(e); |
| } |
| } |
| } |
| } |
| } |
| |
| // ------------------------------------------------------------------------- |
| // MembershipListener inner classes |
| // ------------------------------------------------------------------------- |
| |
| /** Detects loss of the lock grantor and initiates grantor recovery. */ |
| private MembershipListener membershipListener = new MembershipListener() { |
| @Override |
| public void memberJoined(DistributionManager distributionManager, |
| InternalDistributedMember id) {} |
| |
| @Override |
| public void quorumLost(DistributionManager distributionManager, |
| Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {} |
| |
| @Override |
| public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id, |
| InternalDistributedMember whoSuspected, String reason) {} |
| |
| @Override |
| public void memberDeparted(DistributionManager distMgr, final InternalDistributedMember id, |
| final boolean crashed) { |
| final DLockGrantor me = DLockGrantor.this; |
| // if the VM is being forcibly disconnected, we shouldn't release locks as it |
| // will take longer than the time allowed by the InternalDistributedSystem |
| // shutdown mechanism. |
| if (distMgr.getCancelCriterion().isCancelInProgress()) { |
| return; |
| } |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| try { |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.memberDeparted] waiting thread pool will process id={}", id); |
| } |
| distMgr.getExecutors().getWaitingThreadPool().execute(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| processMemberDeparted(id, crashed, me); |
| } catch (InterruptedException e) { |
| // ignore |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "Ignored interrupt processing departed member"); |
| } |
| } |
| } |
| }); |
| } catch (RejectedExecutionException e) { |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockGrantor.memberDeparted] rejected handling of id={}", id); |
| } |
| } |
| } |
| |
| protected void processMemberDeparted(InternalDistributedMember id, boolean crashed, |
| DLockGrantor me) throws InterruptedException { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.processMemberDeparted] id={}", id); |
| } |
| try { |
| me.waitWhileInitializing(); |
| |
| // one cause of bug 32657 is "if (crashed) {" around handleDepartureOf |
| // ... we cannot rely on the value of crashed to determine if grantor |
| // has or will receive a NonGrantorDestroy message |
| me.handleDepartureOf(id); |
| } catch (LockGrantorDestroyedException e) { |
| // ignore... grantor was destroyed |
| } // outer try-catch |
| } |
| }; |
| |
| } |