blob: 1d81005c29c0e020e1002dddc904ae9753780701 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.locks;
import static java.util.concurrent.TimeUnit.DAYS;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.LockServiceDestroyedException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.locks.DLockQueryProcessor.DLockQueryMessage;
import org.apache.geode.distributed.internal.locks.DLockRequestProcessor.DLockRequestMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.IdentityArrayList;
import org.apache.geode.internal.cache.TXReservationMgr;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;
import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* Provides lock grantor authority to a distributed lock service. This is responsible for granting,
* releasing, and timing out locks as well as exposing hooks for recovery or transfer of lock
* grantor.
* <p>
* ReadWriteLocks are not currently handled by grantor recovery or transfer.
*
*/
@SuppressWarnings("unchecked")
public class DLockGrantor {
/**
* Logger used by this grantor for DLS trace output and error reporting.
*/
private static final Logger logger = LogService.getLogger();
/**
* Debug switch read once, when this class is initialized, from the boolean system property named
* {@code DistributionConfig.GEMFIRE_PREFIX + "DLockService.DLockGrantor.debugSuspendLock"}.
* <p>
* NOTE(review): the code that consults this flag is not in this part of the file; the existing
* TODO marks the flag for removal.
*/
public static final boolean DEBUG_SUSPEND_LOCK = // TODO:LOG:CONVERT: REMOVE THIS
Boolean.getBoolean(
DistributionConfig.GEMFIRE_PREFIX + "DLockService.DLockGrantor.debugSuspendLock");
/**
* Default wait before grantor thread will reawaken to check for expirations and timeouts.
*/
public static final long GRANTOR_THREAD_MAX_WAIT = DLockGrantorThread.MAX_WAIT;
/**
* Newly constructed grantor is INITIALIZING. All lock requests and related messages will block.
*/
private static final int INITIALIZING = 0; // msgs will block until READY
/**
* Grantor is READY and handling lock requests.
*/
private static final int READY = 1; // msgs will be processed
/**
* Grantor is DESTROYED and will respond as NOT_GRANTOR to any requests.
*/
private static final int DESTROYED = 5; // NOT_GRANTOR
/**
* DistributedLockService that this grantor is granting locks for.
*/
protected final DLockService dlock;
/**
* Map of grant tokens for tracking grantor-side state of distributed locks. Key: Object name,
* Value: DLockGrantToken grant
*
* guarded.By grantTokens
*/
private final Map grantTokens = new HashMap();
/**
* Dedicated thread responsible for handling expirations and timeouts.
*/
protected final DLockGrantorThread thread;
/**
* The current state of this grantor. Volatile for read access. Mutation must occur while
* synchronized on this grantor.
* <p>
* State starts out as INITIALIZING and moves to READY at which point the grantor will start
* servicing requests.
* <p>
* If a newcomer requests transfer of grantorship, then the state will change to HALTED and then
* finally become DESTROYED when transfer is complete. During HALTED, the elder recognizes the
* newcomer as the current grantor.
* <p>
* If this service is shutting down, it will seek out a successor to become the new grantor.
* During this phase, the state will be PENDING_SHUTDOWN which means no requests or releases will
* be replied to until a successor is found, registers transfer intentions with the elder and
* sends this process a transfer grantorship request. At that point the state will be HALTED as
* described above.
* <p>
* NOTE(review): HALTED and PENDING_SHUTDOWN are described above, but the state constants declared
* next to this field are only INITIALIZING, READY and DESTROYED. Confirm whether the two
* intermediate states still exist or whether this description is stale.
*
* Guarded by: this (for writes)
*/
private volatile int state = INITIALIZING;
/**
* ReadWriteLock to protect in-progress operations from destroying service.
*/
private final StoppableReentrantReadWriteLock destroyLock;
/**
* Specialized lock information for Transaction lock batches.
* <p>
* Key: Object batchId, Value: DLockBatch batch
* <p>
* Handling of batch locks synchronizes on this to assure serial processing.
*
* guarded.By batchLocks
*/
private final Map batchLocks = new HashMap();
/**
* Handles special lock-reservation type for transactions.
*/
private final TXReservationMgr resMgr = new TXReservationMgr(false);
/**
* Departure records for members whose lock batches have been collected.
* <p>
* Key: departed member, Value: the {@code getCurrentTime()} reading taken when the departure was
* recorded. Written by {@code recordMemberDepartedTime} and consulted by
* {@code checkIfHostDeparted} to refuse batch requests from a departed owner. A LinkedHashMap is
* used so that iteration follows insertion order, which the pruning loop relies on.
*
* Guarded by: membersDepartedTime (accessors synchronize on the map itself)
*/
private final Map<InternalDistributedMember, Long> membersDepartedTime = new LinkedHashMap();
/**
* How long, in milliseconds (one day), a departure record is kept before
* {@code recordMemberDepartedTime} is allowed to prune it.
*/
private final long departedMemberKeptInMapMilliSeconds = DAYS.toMillis(1);
/**
* Enforces waiting until this grantor is initialized. Used to block all lock requests until
* INITIALIZED.
*/
private final StoppableCountDownLatch whileInitializing;
/**
* Enforces waiting until this grantor is destroyed. Used to block all lock requests while
* destroying. Latch opens after state becomes DESTROYED and grantor begins replying with
* NOT_GRANTOR.
*/
private final StoppableCountDownLatch untilDestroyed;
/**
* If -1 then it has not yet been fetched from elder. Otherwise it is the versionId that the elder
* gave us. During explicit becomeGrantor, the value is -1, and then the elder provides a real
* versionId.
*/
private final AtomicLong versionId = new AtomicLong(-1);
/**
* Used to verify that requestor member is still in view when granting.
*/
protected final DistributionManager dm;
// -------------------------------------------------------------------------
// SuspendLocking state (BEGIN)
/**
* Synchronization for protecting all SuspendLocking state. Guards all suspend locking state.
*/
protected final Object suspendLock = new Object();
/**
* Identifies remote thread that has currently suspended locking or null.
*
* Concurrency: protected by synchronization of {@link #suspendLock}
*/
protected RemoteThread lockingSuspendedBy = null;
/**
* Value indicates nonexistent lock
*/
protected static final int INVALID_LOCK_ID = -1;
/**
* Identifies the lockId used by the remote thread to suspend locking.
*
* Concurrency: protected by synchronization of {@link #suspendLock}
*/
protected int suspendedLockId = INVALID_LOCK_ID;
/**
* FIFO queue for suspend read and write lock waiters.
*
* guarded.By {@link #suspendLock}
*/
private final LinkedList suspendQueue = new LinkedList();
/**
* Map of read lock counts for each RemoteThread currently holding locks.
* <p>
* Key=RemoteThread, Value=ReadLockCount
*
* guarded.By {@link #suspendLock}
*/
private final HashMap readLockCountMap = new HashMap();
/**
* Number of suspend waiters waiting for write lock.
*
* guarded.By {@link #suspendLock}
*/
private int writeLockWaiters = 0;
/**
* Total number of read locks held against suspend write lock.
*
* guarded.By {@link #suspendLock}
*/
private int totalReadLockCount = 0;
/**
* List of next requests to process after handling an unlock or resume.
*
* guarded.By {@link #suspendLock}
*/
private ArrayList permittedRequests = new ArrayList();
/**
* List of active drains of permittedRequests. TODO: does this need to be a synchronizedList?
*
* guarded.By {@link #suspendLock}
*/
private final List permittedRequestsDrain = Collections.synchronizedList(new LinkedList());
// SuspendLocking state (END)
// -------------------------------------------------------------------------
// -------------------------------------------------------------------------
// Static methods
// -------------------------------------------------------------------------
/**
 * Factory for the grantor of a lock service.
 *
 * @param dlock the lock service the new grantor will be the authority for
 * @param versionId the grantor version handed out by the elder
 * @return a freshly constructed grantor for {@code dlock}
 */
static DLockGrantor createGrantor(DLockService dlock, long versionId) {
  final DLockGrantor created = new DLockGrantor(dlock, versionId);
  return created;
}
/**
 * Looks up the grantor instance that the given lock service currently holds locally.
 *
 * @param dlock the lock service to ask; may be null
 * @return the service's local grantor, or null when {@code dlock} is null or the service has no
 *         local grantor
 */
static DLockGrantor getGrantorForService(DLockService dlock) {
  return dlock != null ? dlock.getGrantor() : null;
}
/**
 * Returns the local, ready grantor that handles distributed lock granting for the given service.
 * <p>
 * Each pass waits for the candidate grantor to finish initializing, validates it, and then
 * re-reads the service's grantor. The candidate is only returned when the service still reports
 * the very same instance; otherwise the check is repeated against the new instance.
 *
 * @param svc the lock service to return the grantor instance for; may be null
 * @return the ready grantor, or null if {@code svc} is null, has no usable grantor, is destroyed,
 *         or this process is not (becoming) the grantor for the service
 * @throws InterruptedException if interrupted while waiting for the grantor to initialize
 */
public static DLockGrantor waitForGrantor(DLockService svc) throws InterruptedException {
  if (svc == null) {
    return null;
  }
  DLockGrantor candidate = getGrantorForService(svc);
  while (true) {
    if (candidate == null || candidate.isDestroyed()) {
      return null;
    }
    candidate.waitWhileInitializing();
    // the service must be alive, this member must be (becoming) grantor, and the candidate ready
    if (svc.isDestroyed() || !svc.isCurrentlyOrIsMakingLockGrantor() || !candidate.isReady()) {
      return null;
    }
    // make sure the service still has this very instance as its grantor
    final DLockGrantor latest = getGrantorForService(svc);
    if (latest == candidate) {
      return latest;
    }
    candidate = latest;
  }
}
// -------------------------------------------------------------------------
// Constructor
// -------------------------------------------------------------------------
/**
* Creates instance of grantor for the lock service.
* <p>
* Both latches ({@code whileInitializing} and {@code untilDestroyed}) are created with a count of
* one and, like the destroy lock, share the distribution manager's cancel criterion. The
* constructor also registers this grantor's membership listener with the distribution manager,
* constructs the grantor thread and increments the service's grantor statistic.
*
* @param dlock the lock service the grantor is authority for
* @param vId unique id that the elder increments for each new grantor
*/
private DLockGrantor(DLockService dlock, long vId) {
this.dm = dlock.getDistributionManager();
// the same cancel criterion backs both latches, the destroy lock and the grantor thread
CancelCriterion stopper = this.dm.getCancelCriterion();
this.whileInitializing = new StoppableCountDownLatch(stopper, 1);
this.untilDestroyed = new StoppableCountDownLatch(stopper, 1);
this.dlock = dlock;
this.destroyLock = new StoppableReentrantReadWriteLock(stopper);
this.versionId.set(vId);
// NOTE(review): membershipListener is declared outside this part of the file; registering it
// here publishes state of a not yet fully constructed grantor - confirm this is safe
this.dm.addMembershipListener(this.membershipListener);
this.thread = new DLockGrantorThread(this, stopper);
this.dlock.getStats().incGrantors(1);
}
// -------------------------------------------------------------------------
// Public and package methods
// -------------------------------------------------------------------------
/**
 * Reads the version id that helps to uniquely identify this grantor instance.
 *
 * @return the elder-assigned version id, or the value passed at construction if the elder has not
 *         yet provided one
 */
public long getVersionId() {
  final long assigned = versionId.get();
  return assigned;
}
/**
 * Stores the version id once the elder has told us what it is. Used by the explicit become
 * grantor process.
 *
 * @param v the elder assigned version id for this grantor
 */
public void setVersionId(long v) {
  versionId.set(v);
}
/**
 * Blocks until the {@code whileInitializing} latch opens, i.e. until this grantor is ready to
 * handle lock requests.
 * <p>
 * A pending interrupt is consumed on entry. If the lock service treats lock requests as
 * interruptible, that pending interrupt is turned into an InterruptedException immediately.
 * Interrupts raised during the wait are passed to {@code throwIfInterruptible}; when that call
 * returns normally the wait is simply retried. In every case the thread's interrupt flag is
 * restored before this method exits if an interrupt was observed.
 *
 * @throws DistributedSystemDisconnectedException if system shuts down before grantor is ready
 * @throws InterruptedException if an interrupt was observed and must be reported to the caller
 */
public void waitWhileInitializing() throws InterruptedException {
  boolean sawInterrupt = Thread.interrupted();
  try {
    if (sawInterrupt && this.dlock.isInterruptibleLockRequest()) {
      throw new InterruptedException();
    }
    for (;;) {
      try {
        this.whileInitializing.await();
        return;
      } catch (InterruptedException e) {
        sawInterrupt = true;
        throwIfInterruptible(e);
      }
    }
  } finally {
    // put the interrupt flag back for the caller
    if (sawInterrupt) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Blocks until the {@code untilDestroyed} latch opens, i.e. until this grantor has been completely
 * destroyed.
 * <p>
 * Every attempt first consumes a pending interrupt, waits on the latch, and restores the
 * interrupt flag afterwards if an interrupt was seen before or during that attempt. An interrupt
 * raised during the wait is passed to {@code throwIfInterruptible}; when that call returns
 * normally another attempt is made.
 *
 * @throws DistributedSystemDisconnectedException if system shuts down before grantor is destroyed
 * @throws InterruptedException if {@code throwIfInterruptible} decides the interrupt must be
 *         reported
 */
public void waitUntilDestroyed() throws InterruptedException {
  for (;;) {
    boolean sawInterrupt = Thread.interrupted();
    try {
      this.untilDestroyed.await();
      return;
    } catch (InterruptedException e) {
      sawInterrupt = true;
      throwIfInterruptible(e);
    } finally {
      if (sawInterrupt) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
/**
 * Renders this grantor as {@code <DLockGrantor@HASH state=S name=N version=V>} where HASH is the
 * identity hash code in hex.
 */
@Override
public String toString() {
  final String identity = Integer.toHexString(System.identityHashCode(this));
  return "<" + "DLockGrantor" + "@" + identity
      + " state=" + stateToString(this.state)
      + " name=" + this.dlock.getName()
      + " version=" + this.getVersionId()
      + '>';
}
/**
 * Tells whether the grantor's state is currently READY.
 *
 * @return true if this grantor is ready to handle lock requests
 */
boolean isReady() {
  final int current = this.state;
  return current == READY;
}
/**
 * Tells whether the grantor's state is still INITIALIZING.
 *
 * @return true if this grantor is still initializing and not yet ready for lock requests
 */
boolean isInitializing() {
  final int current = this.state;
  return current == INITIALIZING;
}
/**
 * Tells whether the grantor's state is DESTROYED.
 *
 * @return true if this grantor has been destroyed
 */
public boolean isDestroyed() {
  final int current = this.state;
  return current == DESTROYED;
}
/**
 * Fails fast when this grantor has been destroyed; does nothing otherwise.
 *
 * @throws LockGrantorDestroyedException if grantor is destroyed
 */
void checkDestroyed() {
  if (isDestroyed()) {
    throwIfDestroyed(true);
  }
}
/**
 * Raises LockGrantorDestroyedException when the supplied flag is set.
 *
 * @param destroyed whether the grantor is to be reported as destroyed
 * @throws LockGrantorDestroyedException if {@code destroyed} is true
 */
private void throwIfDestroyed(boolean destroyed) {
  if (!destroyed) {
    return;
  }
  throw new LockGrantorDestroyedException("Grantor is destroyed");
}
/**
 * Handles request for a batch of locks using optimization for transactions.
 * <p>
 * Synchronizes on {@link #batchLocks} so that batch requests are processed serially. The request
 * is answered with a grant when the reservation succeeds and with a try-lock failure when the
 * reservation manager reports a commit conflict. A request that has already timed out only has
 * its suspend state cleaned up.
 *
 * @param request the lock request whose object name is the {@link DLockBatch} to reserve
 * @throws LockGrantorDestroyedException if grantor is destroyed
 * @throws TransactionDataNodeHasDepartedException if the batch owner is recorded as departed
 * @throws InterruptedException if interrupted while waiting for initialization or destruction
 */
void handleLockBatch(DLockRequestMessage request) throws InterruptedException {
  // make sure the tx locks of departed members have been cleared so we don't have
  // conflicts with non-existent members. This is done in a waiting-pool thread launched
  // when the member-departure is announced.
  final DLockLessorDepartureHandler handler = this.dlock.getDLockLessorDepartureHandler();
  // a service may have no departure handler; handleLockRequest guards the same way
  if (handler != null) {
    handler.waitForInProcessDepartures();
  }
  synchronized (this.batchLocks) { // assures serial processing
    waitWhileInitializing();
    if (request.checkForTimeout()) {
      cleanupSuspendState(request);
      return;
    }
    final boolean isTraceEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
    if (isTraceEnabled_DLS) {
      logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch]");
    }
    // failing to get the read lock means destruction is under way: wait for it to finish and
    // then report the destroyed grantor to the caller
    if (!acquireDestroyReadLock(0)) {
      waitUntilDestroyed();
      checkDestroyed();
    }
    try {
      checkDestroyed();
      if (isTraceEnabled_DLS) {
        logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch] request: {}",
            request);
      }
      final DLockBatch batch = (DLockBatch) request.getObjectName();
      // refuse in-flight requests from an owner that has already left
      checkIfHostDeparted(batch.getOwner());
      resMgr.makeReservation((IdentityArrayList) batch.getReqs());
      if (isTraceEnabled_DLS) {
        logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch] granting {}",
            batch.getBatchId());
      }
      this.batchLocks.put(batch.getBatchId(), batch);
      request.respondWithGrant(Long.MAX_VALUE);
    } catch (CommitConflictException ex) {
      request.respondWithTryLockFailed(ex.getMessage());
    } finally {
      releaseDestroyReadLock();
    }
  }
}
/**
 * Rejects work on behalf of a transaction host that has been recorded as departed.
 *
 * @param owner the transaction host / txLock requester to check
 * @throws TransactionDataNodeHasDepartedException if {@code owner} has a departure record
 */
private void checkIfHostDeparted(InternalDistributedMember owner) {
  final boolean ownerHasDeparted;
  // Already held batchLocks; hold membersDepartedTime lock just for clarity
  synchronized (membersDepartedTime) {
    ownerHasDeparted = membersDepartedTime.containsKey(owner);
  }
  if (ownerHasDeparted) {
    // the transaction host/txLock requester has departed.
    throw new TransactionDataNodeHasDepartedException(
        "The transaction host " + owner + " is no longer a member of the cluster.");
  }
}
/**
 * Collects the transaction optimized lock batches that were created by the specified owner and
 * records the owner as departed.
 * <p>
 * Synchronizes on batchLocks.
 *
 * @param owner member that owned the lock batches to return
 * @return lock batches that were created by owner; empty if there are none
 */
public DLockBatch[] getLockBatches(InternalDistributedMember owner) {
  synchronized (this.batchLocks) {
    // put owner into the map first so that no new threads will handle in-flight requests
    // from the departed member to lock keys
    recordMemberDepartedTime(owner);
    // batchLocks holds Key: Object batchId, Value: DLockBatch batch
    final List ownedBatches = new ArrayList();
    for (Object value : this.batchLocks.values()) {
      final DLockBatch candidate = (DLockBatch) value;
      if (candidate.getOwner().equals(owner)) {
        ownedBatches.add(candidate);
      }
    }
    return (DLockBatch[]) ownedBatches.toArray(new DLockBatch[0]);
  }
}
/**
 * Records the current time as the departure time of {@code owner}, after pruning records that
 * are older than {@code departedMemberKeptInMapMilliSeconds}.
 * <p>
 * Pruning walks the map in insertion order and stops at the first record that is still fresh, so
 * the map must stay ordered by departure time. For that reason an existing record for
 * {@code owner} is removed before the new one is added; a plain put would update the time but
 * leave the entry at its old position.
 *
 * @param owner the member that has departed
 */
void recordMemberDepartedTime(InternalDistributedMember owner) {
  // Already held batchLocks; hold membersDepartedTime lock just for clarity
  synchronized (membersDepartedTime) {
    final long currentTime = getCurrentTime();
    final long oldestRetained = currentTime - departedMemberKeptInMapMilliSeconds;
    for (Iterator<Long> times = membersDepartedTime.values().iterator(); times.hasNext();) {
      if (times.next() < oldestRetained) {
        times.remove();
      } else {
        // records are ordered by time, everything after this one is fresh as well
        break;
      }
    }
    // move a re-recorded member to the end so iteration order keeps matching time order
    membersDepartedTime.remove(owner);
    membersDepartedTime.put(owner, currentTime);
  }
}
/**
 * Supplies the wall-clock time used for departure records.
 *
 * @return the current time in milliseconds as reported by {@link System#currentTimeMillis()}
 */
long getCurrentTime() {
  final long nowMillis = System.currentTimeMillis();
  return nowMillis;
}
/**
 * Exposes the live departure records for tests. The returned map is the internal instance, not a
 * copy.
 *
 * @return the map of departed member to recorded departure time
 */
@VisibleForTesting
Map getMembersDepartedTimeRecords() {
  return this.membersDepartedTime;
}
/**
 * Looks up the batch registered under the given batchId (for example use a txLockId from
 * TXLockBatch in order to update its participants). This operation was added as part of the
 * solution to bug 32999.
 * <p>
 * Acquires acquireDestroyReadLock. Synchronizes on batchLocks.
 * <p>
 * see org.apache.geode.internal.cache.TXCommitMessage#updateLockMembers()
 *
 * @param batchId the identifier for the batch to retrieve
 * @return the transaction lock batch identified by the given batchId, or null if none is
 *         registered
 * @throws LockGrantorDestroyedException if grantor is destroyed
 * @throws InterruptedException if interrupted while waiting for initialization or destruction
 * @see org.apache.geode.internal.cache.locks.TXLockUpdateParticipantsMessage
 * @see org.apache.geode.internal.cache.locks.TXLockBatch#getBatchId()
 */
public DLockBatch getLockBatch(Object batchId) throws InterruptedException {
  final boolean isTraceEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
  if (isTraceEnabled_DLS) {
    logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.getLockBatch] enter: {}", batchId);
  }
  DLockBatch found;
  synchronized (this.batchLocks) {
    waitWhileInitializing();
    // no read lock means destruction is under way: wait it out and report it
    if (!acquireDestroyReadLock(0)) {
      waitUntilDestroyed();
      checkDestroyed();
    }
    try {
      checkDestroyed();
      found = (DLockBatch) this.batchLocks.get(batchId);
    } finally {
      releaseDestroyReadLock();
    }
  }
  if (isTraceEnabled_DLS) {
    logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.getLockBatch] exit: {}", batchId);
  }
  return found;
}
/**
 * Replaces the batch registered under the given batchId. Nothing happens when no batch is
 * currently registered under that id. This operation was added as part of the solution to bug
 * 32999.
 * <p>
 * Acquires acquireDestroyReadLock. Synchronizes on batchLocks.
 * <p>
 * see org.apache.geode.internal.cache.locks.TXCommitMessage#updateLockMembers()
 *
 * @param batchId the identity of the transaction lock batch
 * @param newBatch the new lock batch to be used
 * @throws LockGrantorDestroyedException if grantor is destroyed
 * @throws InterruptedException if interrupted while waiting for initialization or destruction
 * @see org.apache.geode.internal.cache.locks.TXLockUpdateParticipantsMessage
 * @see org.apache.geode.internal.cache.locks.TXLockBatch#getBatchId()
 */
public void updateLockBatch(Object batchId, DLockBatch newBatch) throws InterruptedException {
  final boolean isTraceEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
  if (isTraceEnabled_DLS) {
    logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.updateLockBatch] enter: {}", batchId);
  }
  synchronized (this.batchLocks) {
    waitWhileInitializing();
    // no read lock means destruction is under way: wait it out and report it
    if (!acquireDestroyReadLock(0)) {
      waitUntilDestroyed();
      checkDestroyed();
    }
    try {
      checkDestroyed();
      final DLockBatch existing = (DLockBatch) this.batchLocks.get(batchId);
      final boolean isRegistered = existing != null;
      if (isRegistered) {
        this.batchLocks.put(batchId, newBatch);
      }
    } finally {
      releaseDestroyReadLock();
    }
  }
  if (isTraceEnabled_DLS) {
    logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.updateLockBatch] exit: {}", batchId);
  }
}
/**
 * Unregisters the transaction optimized lock batch and gives its reservation back to the
 * reservation manager. An unknown batchId is ignored.
 * <p>
 * Acquires acquireDestroyReadLock. Synchronizes on batchLocks.
 *
 * @param batchId the identity of the transaction lock batch to release
 * @param owner the member that has created and locked the lock batch (not consulted here)
 * @throws LockGrantorDestroyedException if grantor is destroyed
 * @throws InterruptedException if interrupted while waiting for initialization or destruction
 */
public void releaseLockBatch(Object batchId, InternalDistributedMember owner)
    throws InterruptedException {
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.releaseLockBatch]");
  }
  synchronized (this.batchLocks) {
    waitWhileInitializing();
    // no read lock means destruction is under way: wait it out and report it
    if (!acquireDestroyReadLock(0)) {
      waitUntilDestroyed();
      checkDestroyed();
    }
    try {
      checkDestroyed();
      final DLockBatch released = (DLockBatch) this.batchLocks.remove(batchId);
      if (released == null) {
        return;
      }
      this.resMgr.releaseReservation((IdentityArrayList) released.getReqs());
    } finally {
      releaseDestroyReadLock();
    }
  }
}
/**
 * Tells whether the sender of the request is this member, i.e. equals the id of the lock
 * service's distribution manager.
 *
 * @param request the lock request to check
 * @return true if the request comes from the local member
 */
private boolean isLocalRequest(DLockRequestMessage request) {
  final InternalDistributedMember sender = request.getSender();
  return sender.equals(this.dlock.getDistributionManager().getId());
}
/**
 * TEST HOOK: reports whether requests are queued up waiting for the named lock.
 * <p>
 * Synchronizes on the grant token if one exists.
 *
 * @param name the lock to check for waiting requests for
 * @return true if the named lock has requests waiting to acquire it; false if it has none or no
 *         grant token exists for the name
 */
boolean hasWaitingRequests(Object name) {
  final DLockGrantToken token = getGrantToken(name);
  if (token != null) {
    synchronized (token) {
      return token.hasWaitingRequests();
    }
  }
  return false;
}
/**
 * Handles a DLockQueryMessage by looking up the grant token of the queried lock.
 * <p>
 * Acquires destroyReadLock.
 *
 * @param query the dlock query message to handle
 * @return DLockGrantToken for the lock, or null if there is none or the destroy read lock could
 *         not be acquired
 * @throws LockGrantorDestroyedException if grantor is destroyed
 * @throws InterruptedException declared for callers; see acquireDestroyReadLock
 */
DLockGrantToken handleLockQuery(DLockQueryMessage query) throws InterruptedException {
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockQuery] {}", query);
  }
  if (!acquireDestroyReadLock(0)) {
    return null;
  }
  try {
    checkDestroyed();
    return getGrantToken(query.objectName);
  } finally {
    releaseDestroyReadLock();
  }
}
/**
* Handles the provided lock request. The lock will either be granted, refused if try-lock, or
* scheduled at end of waiting queue to eventually be granted or timed out.
* <p>
* Requests whose object name is a {@link DLockBatch} are delegated to
* {@link #handleLockBatch(DLockRequestMessage)}. All other requests wait for initialization,
* take the destroy read lock, wait for in-process member departures and are then either handled
* right away or left on the suspend queue, depending on {@code acquireLockPermission}.
* <p>
* Acquires destroyReadLock. Synchronizes on grantTokens, suspendLock and the grant token.
*
* @param request the lock request to be processed by this grantor
* @throws LockGrantorDestroyedException if grantor is destroyed
* @throws InterruptedException if interrupted while waiting for initialization or destruction
*/
void handleLockRequest(DLockRequestMessage request) throws InterruptedException {
Assert.assertTrue(request.getRemoteThread() != null);
// transaction batches take their own, serialized path
if (request.getObjectName() instanceof DLockBatch) {
handleLockBatch(request);
return;
}
waitWhileInitializing(); // calcWaitMillisFromNow
final boolean isTraceEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockRequest] {}", request);
}
// Failing to get the read lock means the grantor is being destroyed. Both branches below are
// expected to end in LockGrantorDestroyedException.
// NOTE(review): if checkDestroyed() ever returned normally here, the finally block further
// down would release a read lock that was never acquired - confirm the state is always
// DESTROYED once waitUntilDestroyed() returns.
if (!acquireDestroyReadLock(0)) {
if (isLocalRequest(request) && this.dlock.isDestroyed()) {
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE,
"[DLockGrantor.handleLockRequest] about to throwIfDestroyed");
}
// this special case is one fix for deadlock between waitUntilDestroyed
// and dlock waitForGrantorCallsInProgress (when request is local)
throwIfDestroyed(true);
} else {
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE,
"[DLockGrantor.handleLockRequest] about to waitUntilDestroyed");
}
// is there still a deadlock when an explicit become is destroying
// this grantor instead of destroying the dlock service?
waitUntilDestroyed();
checkDestroyed();
}
}
try {
// make sure we don't grant a dlock held by a departed member until that member's
// transactions are resolved
DLockLessorDepartureHandler dLockLessorDepartureHandler =
this.dlock.getDLockLessorDepartureHandler();
// the handler is optional, so only wait when one is present
if (dLockLessorDepartureHandler != null) {
dLockLessorDepartureHandler.waitForInProcessDepartures();
}
// re-check: the grantor may have been destroyed while waiting above
checkDestroyed();
if (acquireLockPermission(request)) {
handlePermittedLockRequest(request);
} else {
// request has been added to suspendQueue for deferred handling
}
} finally {
releaseDestroyReadLock();
}
}
/**
 * Processes a lock request that has already been given permission to proceed.
 * <p>
 * The outcomes are tried in this order: grant immediately; nothing to do because the request has
 * already been responded to; fail a try-lock; drop a request that has timed out; otherwise queue
 * the request on the grant token and make sure the grantor thread wakes up in time for it. The
 * access count taken by {@code getOrCreateGrant} is always given back.
 * <p>
 * Calling thread must hold destroyReadLock. Synchronizes on grantTokens, suspendLock and the
 * grant token.
 *
 * @param request the lock request to be processed by this grantor guarded.By
 *        {@link #acquireDestroyReadLock(long)}
 */
private void handlePermittedLockRequest(final DLockRequestMessage request) {
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handlePermittedLockRequest] {}", request);
  }
  Assert.assertTrue(request.getRemoteThread() != null);
  final DLockGrantToken token = getOrCreateGrant(request.getObjectName());
  try {
    // try to grant immediately if not currently granted...
    if (token.grantLockToRequest(request)) {
      return;
    }
    // if request was local and then interrupted/released...
    if (request.responded()) {
      return;
    }
    // if request was a failed try-lock...
    if (request.isTryLock()) {
      cleanupSuspendState(request);
      request.respondWithTryLockFailed(request.getObjectName());
      return;
    }
    // if request has timed out...
    if (request.checkForTimeout()) {
      cleanupSuspendState(request);
      return;
    }
    // schedule into waiting queue for eventual granting...
    token.schedule(request);
    this.thread.checkTimeToWait(calcWaitMillisFromNow(request), false);
  } finally {
    token.decAccess();
  }
}
/**
 * Initializes this new grantor with previously held locks as provided during grantor recovery.
 * <p>
 * Every token is granted to {@code owner} on its grant token unless the owner is no longer in
 * the membership view or the lock is already held. For each granted token the suspend-locking
 * bookkeeping is brought up to date as well: a suspend-locking token suspends locking for its
 * remote thread, any other token counts as one read lock for its remote thread. Nothing is done
 * if the grantor is destroyed or the destroy read lock cannot be acquired.
 * <p>
 * Acquires destroyReadLock. Synchronizes on this grantor, grantTokens, suspendLock, the grant
 * token.
 *
 * @param owner the member that owns the tokens to be scheduled
 * @param tokens set of DLockRemoteTokens to be scheduled for owner
 * @throws InterruptedException declared for callers; see acquireDestroyReadLock
 */
void initializeHeldLocks(InternalDistributedMember owner, Set tokens)
    throws InterruptedException {
  synchronized (this) {
    if (isDestroyed()) {
      return;
    }
    if (!acquireDestroyReadLock(0)) {
      return;
    }
  }
  try {
    synchronized (this.grantTokens) {
      Set members = this.dlock.getDistributionManager().getDistributionManagerIds();
      final boolean isTraceEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
      for (Iterator iter = tokens.iterator(); iter.hasNext();) {
        DLockRemoteToken token = (DLockRemoteToken) iter.next();
        DLockGrantToken grantToken = getOrCreateGrant(token.getName());
        try {
          // make sure the token's owner is still in the system
          if (!members.contains(owner)) {
            // skipping because member is no longer in view
            if (isTraceEnabled_DLS) {
              // the trailing placeholder reports the view the owner was looked up in
              logger.trace(LogMarker.DLS_VERBOSE,
                  "Initialization of held locks is skipping {} because owner {} is not in view: {}",
                  token, owner, members);
            }
            continue;
          }
          RemoteThread rThread = null;
          boolean isSuspendLock = false;
          int lockId = INVALID_LOCK_ID;
          synchronized (grantToken) {
            if (grantToken.isLeaseHeld()) {
              logger.error(LogMarker.DLS_MARKER,
                  "Initialization of held locks is skipping {} because lock is already held: {}",
                  token, grantToken);
              continue;
            }
            grantToken.grantLock(owner, token.getLeaseExpireTime(), token.getLeaseId(),
                token.getLesseeThread());
            // grantToken may have already expired or is about to expire
            // complete initialization but make sure grantor thread will wake
            // up and expire it as soon as it's running
            if (grantToken.getLeaseExpireTime() > -1
                && grantToken.getLeaseExpireTime() < Long.MAX_VALUE) {
              long now = DLockService.getLockTimeStamp(this.dm);
              this.thread.checkTimeToWait(grantToken.getLeaseExpireTime() - now, true);
            }
            // capture what the suspend bookkeeping needs while the token is still locked
            rThread = grantToken.getRemoteThread();
            isSuspendLock = grantToken.isSuspendLockingToken();
            lockId = grantToken.getLockId();
          }
          // update the readLock and suspendLocking states...
          synchronized (suspendLock) {
            if (isSuspendLock) {
              suspendLocking(rThread, lockId);
            } else {
              Assert.assertTrue(!isLockingSuspended() || isLockingSuspendedBy(rThread),
                  "Locking is suspended by a different thread: " + token);
              Integer integer = (Integer) readLockCountMap.get(rThread);
              int readLockCount = integer == null ? 0 : integer.intValue();
              readLockCount++;
              readLockCountMap.put(rThread, Integer.valueOf(readLockCount));
              totalReadLockCount++;
              checkTotalReadLockCount();
            }
          } // suspendLock sync
        } finally {
          // give back the access taken by getOrCreateGrant, also on the skip paths
          grantToken.decAccess();
        }
      } // tokens iter
    } // grantTokens sync
  } finally {
    releaseDestroyReadLock();
  }
}
/**
 * Handles a request for extending the lease time of an already held lock.
 * <p>
 * The lease is only extended when a grant token exists for the lock, the sender is still a
 * current member, the token is not destroyed and the token is locked by the sender under the
 * request's lock id. The new expiration is the later of the current one and the one calculated
 * from the requested lease time.
 * <p>
 * Acquires destroyReadLock. Synchronizes on the grant token.
 *
 * @param request the lock request to be reentered for lease extension
 * @return new extended leaseExpireTime or 0 if requestor no longer holds lock
 * @throws LockGrantorDestroyedException if grantor is destroyed
 * @throws InterruptedException if interrupted while waiting for initialization or destruction
 */
long reenterLock(DLockRequestMessage request) throws InterruptedException {
  waitWhileInitializing(); // calcWaitMillisFromNow
  // no read lock means destruction is under way: wait it out and report it
  if (!acquireDestroyReadLock(0)) {
    waitUntilDestroyed();
    checkDestroyed();
  }
  try {
    checkDestroyed();
    // to fix GEODE-678 no longer call request.checkForTimeout
    final DLockGrantToken token = getGrantToken(request.getObjectName());
    if (token == null) {
      if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "[DLockGrantor.reenterLock] no grantToken found for {}", request.getObjectName());
      }
      return 0;
    }
    synchronized (token) { // synchronize against grant.expireAndGrantLock
      final boolean senderHasLeft = !this.dm.isCurrentMember(request.getSender());
      if (senderHasLeft || token.isDestroyed()) {
        return 0;
      }
      final boolean heldBySender = token.isLockedBy(request.getSender(), request.getLockId());
      if (!heldBySender) {
        if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
          logger.trace(LogMarker.DLS_VERBOSE,
              "[DLockGrantor.reenterLock] grant is not locked by sender={} lockId={} grant={}",
              request.getSender(), request.getLockId(), token);
        }
        return 0;
      }
      // never shorten a lease: keep whichever expiration is later
      final long extendedExpireTime = Math.max(token.getLeaseExpireTime(),
          token.calcLeaseExpireTime(request.getLeaseTime()));
      token.grantLock(request.getSender(), extendedExpireTime, request.getLockId(),
          token.getRemoteThread());
      return token.getLeaseExpireTime();
    }
  } finally {
    releaseDestroyReadLock();
  }
}
/**
* Release named lock if held by owner using lockId. Called from DLockReleaseMessage.basicProcess
* for remote unlock.
* <p>
* Acquires destroyReadLock. Synchronizes on grantTokens and the grant token.
*
* @param name the name of the lock to release
* @param owner the member releasing the lock
* @param lockId the identity of the lease used by the owner
* @throws LockGrantorDestroyedException if grantor is destroyed
*/
void releaseIfLocked(Object name, InternalDistributedMember owner, int lockId)
    throws InterruptedException {
  waitWhileInitializing();
  final boolean readLocked = acquireDestroyReadLock(0);
  if (!readLocked) {
    // the destroy read lock was not available on a zero-wait attempt
    waitUntilDestroyed();
    checkDestroyed();
  }
  try {
    checkDestroyed();
    getAndReleaseGrantIfLockedBy(name, owner, lockId);
  } finally {
    releaseDestroyReadLock();
  }
}
/**
* Fetches the actual grant token and releases it if leased by owner using lockId.
* DLockReleaseMessage.basicProcess -> releaseIfLocked -> getAndReleaseGrantIfLockedBy
* <p>
* Caller must hold destroyReadLock. Synchronizes on grantTokens and the grant token.
*
* @param name the name of the lock to release
* @param owner the member attempting to release the granted lock
* @param lockId the id of the lease used by the owner guarded.By
* {@link #acquireDestroyReadLock(long)}
*/
private void getAndReleaseGrantIfLockedBy(Object name, InternalDistributedMember owner,
    int lockId) {
  synchronized (grantTokens) {
    final DLockGrantToken token = basicGetGrantToken(name);
    if (token == null) {
      return;
    }
    // checking isTokenDestroyed at this point will deadlock, so only the null check is done
    synchronized (token) {
      try {
        token.releaseIfLockedBy(owner, lockId);
        removeGrantIfUnused(token);
      } catch (IllegalStateException e) {
        // give the service and this grantor the chance to report destruction first
        dlock.checkDestroyed();
        checkDestroyed();
        // otherwise we must have hit a race... grantor doesn't have the token
      }
    }
  }
}
/**
* Fetches the grant token for named lock and attempts to grant it to the next waiting requestor
* if one exists. Called from DLockReleaseProcessor when another process releases a lock and after
* the reply has been sent.
* <p>
* Acquires destroyReadLock. Synchronizes on grantTokens and the grant token.
*
* @param name the name of the lock to grant
* @throws LockGrantorDestroyedException if grantor is destroyed
*/
void grantLock(Object name) throws InterruptedException {
  waitWhileInitializing();
  final boolean readLocked = acquireDestroyReadLock(0);
  if (!readLocked) {
    // the destroy read lock was not available on a zero-wait attempt
    waitUntilDestroyed();
    checkDestroyed();
  }
  try {
    checkDestroyed();
    final DLockGrantToken token = getGrantToken(name);
    if (token != null) {
      // removeGrantIfUnused first tries to grant to the next request, then removes if unused
      removeGrantIfUnused(token);
    }
  } finally {
    releaseDestroyReadLock();
  }
}
/**
* Handles the departure of a member by releasing every lock it owned.
* <p>
* Acquires destroyReadLock. Synchronizes on grantTokens, suspendLock, and the grant token.
*
* @param owner the member that departed
*/
void handleDepartureOf(InternalDistributedMember owner) throws InterruptedException {
  // bug 32657 has another cause in this method... interrupted thread from
  // connection/channel layer caused acquireDestroyReadLock to fail...
  // fixed by Darrel in org.apache.geode.internal.tcp.Connection
  final boolean traceEnabled = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
  if (!acquireDestroyReadLock(0)) {
    // destroy read lock unavailable on a zero-wait attempt: nothing is done here
    return;
  }
  try {
    if (isDestroyed()) {
      if (traceEnabled) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "[DLockGrantor.handleDepartureOf] grantor is destroyed; ignoring {}", owner);
      }
      return;
    }

    try {
      final DLockLessorDepartureHandler departureHandler =
          dlock.getDLockLessorDepartureHandler();
      if (traceEnabled) {
        logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleDepartureOf] handler = {}",
            departureHandler);
      }
      if (departureHandler != null) {
        departureHandler.handleDepartureOf(owner, this);
      }
    } catch (CancelException e) {
      if (traceEnabled) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "[DlockGrantor.handleDepartureOf] ignored cancellation (1)");
      }
    } finally {
      // the cleanup below runs whether or not the departure handler completed

      synchronized (suspendLock) {
        // collect the departed member's remote threads first, then release outside the iteration
        final HashSet departedThreads = new HashSet();
        for (Object element : readLockCountMap.entrySet()) {
          final RemoteThread candidate = (RemoteThread) ((Map.Entry) element).getKey();
          if (candidate.getDistributedMember().equals(owner)) {
            departedThreads.add(candidate);
          }
        }
        for (Object departed : departedThreads) {
          try {
            postReleaseLock((RemoteThread) departed, null);
          } catch (CancelException e) {
            if (traceEnabled) {
              logger.trace(LogMarker.DLS_VERBOSE,
                  "[DlockGrantor.handleDepartureOf] ignored cancellation (2)");
            }
          }
        }
      }

      synchronized (grantTokens) {
        // do not call handleDepartureOf while iterating grantTokens
        // changes fix bug 39172 (ConcurrentModificationException)

        // 1) build up the list of grants that reference the departed member
        final List referencingGrants = new ArrayList();
        for (Object value : grantTokens.values()) {
          final DLockGrantToken token = (DLockGrantToken) value;
          try {
            token.checkDepartureOf(owner, referencingGrants);
          } catch (CancelException e) {
            if (traceEnabled) {
              logger.trace(LogMarker.DLS_VERBOSE,
                  "[DlockGrantor.handleDepartureOf] ignored cancellation (3)");
            }
          }
        }

        // 2) call handleDepartureOf on each referencing grant
        final ArrayList removalCandidates = new ArrayList();
        for (Object value : referencingGrants) {
          final DLockGrantToken token = (DLockGrantToken) value;
          try {
            token.handleDepartureOf(owner, removalCandidates);
          } catch (CancelException e) {
            if (traceEnabled) {
              logger.trace(LogMarker.DLS_VERBOSE,
                  "[DlockGrantor.handleDepartureOf] ignored cancellation (4)");
            }
          }
        }

        // 3) remove the grants collected in step 2 if they are unused
        // TODO: if grantsReferencingMember is always empty remove this
        for (Object value : removalCandidates) {
          final DLockGrantToken token = (DLockGrantToken) value;
          try {
            removeGrantIfUnused(token);
          } catch (CancelException e) {
            if (traceEnabled) {
              logger.trace(LogMarker.DLS_VERBOSE,
                  "[DlockGrantor.handleDepartureOf] ignored cancellation (5)");
            }
          }
        }
      }
    }
  } finally {
    releaseDestroyReadLock();
  }
}
/**
* Destroys this grantor without attempting to transfer grant tokens to a successor.
* <p>
* Acquires destroyWriteLock. Synchronizes on this grantor, grantTokens, and each grant token.
*/
void destroy() {
  synchronized (this) {
    if (isDestroyed()) {
      return;
    }

    final boolean traceEnabled = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
    if (traceEnabled) {
      logger.trace(LogMarker.DLS_VERBOSE, "[simpleDestroy]");
    }

    // wait for the destroy write lock ignoring interrupts...
    boolean writeLocked = false;
    try {
      boolean locksHeld = false;
      try {
        acquireDestroyWriteLock(Long.MAX_VALUE);
        writeLocked = true;

        // check for any held locks...
        if (isInitializing()) {
          // assume the worst case and tell the elder that recovery will be required
          locksHeld = true;
        } else {
          synchronized (grantTokens) {
            final InternalDistributedMember self = dlock.getDistributionManager().getId();
            for (Object value : grantTokens.values()) {
              final InternalDistributedMember holder = ((DLockGrantToken) value).getOwner();
              // only a lock owned by some other member counts as held
              if (holder != null && !holder.equals(self)) {
                locksHeld = true;
                break;
              }
            }
          }
        }

        if (traceEnabled) {
          logger.trace(LogMarker.DLS_VERBOSE, "[simpleDestroy] {} locks held",
              (locksHeld ? "with" : "without"));
        }
      } finally {
        // make sure the following occurs even if checking locks above failed
        try {
          // release latches and change internal state
          destroyGrantor();
        } finally {
          // tell the elder we are not the grantor anymore
          dlock.clearGrantor(getVersionId(), locksHeld);
        }
      }
    } finally {
      if (writeLocked) {
        releaseDestroyWriteLock();
      }
    }
  }
}
/**
* Send replies to all waiting requestors to notify them that this is no longer the grantor.
* <p>
* Caller must acquire destroyWriteLock. Synchronizes on suspendLock, grantTokens, and each grant
* token.
*
* guarded.By {@link #acquireDestroyWriteLock(long)}
*/
private void destroyGrantor() {
  Assert.assertHoldsLock(this, true);
  makeDestroyed();

  // reply to all pending requests w/ NOT_GRANTOR
  synchronized (grantTokens) {
    for (Object value : grantTokens.values()) {
      ((DLockGrantToken) value).handleGrantorDestruction();
    }
  }

  synchronized (suspendLock) {
    final boolean traceEnabled = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);

    // requests that already have permission to proceed
    if (traceEnabled) {
      logger.trace(LogMarker.DLS_VERBOSE,
          "[DLockGrantor.destroyAndRemove] responding to {} permitted requests.",
          permittedRequests.size());
    }
    respondWithNotGrantor(permittedRequests.iterator());

    // requests still queued behind a suspend
    if (traceEnabled) {
      logger.trace(LogMarker.DLS_VERBOSE,
          "[DLockGrantor.destroyAndRemove] responding to {} requests awaiting permission.",
          suspendQueue.size());
    }
    respondWithNotGrantor(suspendQueue.iterator());

    // every list registered in permittedRequestsDrain
    for (Object element : permittedRequestsDrain) {
      final List drained = (List) element;
      if (traceEnabled) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "[DLockGrantor.destroyAndRemove] responding to {} drained permitted requests.",
            drained.size());
      }
      respondWithNotGrantor(drained.iterator());
    }
  }
}
/**
* Send responses to specified requests informing the senders that this is no longer the grantor.
* <p>
* Caller must acquire destroyWriteLock.
*
* @param requests the requests to respond to guarded.By {@link #acquireDestroyWriteLock(long)}
*/
private void respondWithNotGrantor(Iterator requests) {
  // each element is expected to be a DLockRequestMessage
  while (requests.hasNext()) {
    ((DLockRequestMessage) requests.next()).respondWithNotGrantor();
  }
}
/**
* Make this grantor ready for handling lock requests.
* <p>
* Synchronizes on this grantor.
*
* @param enforceInitializing true if this should assert isInitializing
* @return true if grantor was successfully made ready
*/
synchronized boolean makeReady(boolean enforceInitializing) {
  if (isDestroyed()) {
    dlock.checkDestroyed();
  }
  // when not enforcing, a grantor that already left the initializing state is simply skipped
  if (!enforceInitializing && !isInitializing()) {
    return false;
  }
  assertInitializing();

  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    final StringBuilder description = new StringBuilder("DLockGrantor ")
        .append(dlock.getName()).append(" initialized with:");
    for (Object token : grantTokens.values()) {
      description.append("\n\t").append(token);
    }
    logger.trace(LogMarker.DLS_VERBOSE, description.toString());
  }

  // flip state, release threads blocked on whileInitializing, then start the grantor thread
  state = READY;
  whileInitializing.countDown();
  thread.start();
  return true;
}
// -------------------------------------------------------------------------
// Private methods
// -------------------------------------------------------------------------
/**
* Drain currently permitted requests and grant lock to next requestor.
* <p>
* Acquires destroyReadLock. Synchronizes on suspendLock, grantTokens, and the grant token.
*
* @param objectName the lock to perform post release tasks for
*/
void postRemoteReleaseLock(Object objectName) throws InterruptedException {
  final boolean readLocked = acquireDestroyReadLock(0);
  if (!readLocked) {
    return;
  }
  try {
    checkDestroyed();
    drainPermittedRequests();
    grantLock(objectName);
  } catch (LockGrantorDestroyedException | LockServiceDestroyedException ignored) {
    // ignore... service was destroyed and that's ok
  } finally {
    releaseDestroyReadLock();
  }
}
/**
* Acquires a read lock on the destroy ReadWrite lock uninterruptibly using millis for try-lock
* attempt.
*
* @param millis the milliseconds to try to acquire lock within
* @return true if destroy read lock was acquired
* @throws DistributedSystemDisconnectedException if system has been disconnected
*/
private boolean acquireDestroyReadLock(long millis) throws InterruptedException {
  // clear the interrupt flag for the attempt; it is restored in the finally block
  boolean wasInterrupted = Thread.interrupted();
  try {
    if (wasInterrupted && dlock.isInterruptibleLockRequest()) {
      throw new InterruptedException();
    }
    for (;;) {
      try {
        dm.getCancelCriterion().checkCancelInProgress(null); // is this needed?
        return destroyLock.readLock().tryLock(millis);
      } catch (InterruptedException e) {
        // remember the interrupt; retry unless throwIfInterruptible propagates it
        wasInterrupted = true;
        throwIfInterruptible(e);
      }
    }
  } finally {
    if (wasInterrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
* Releases a read lock on the destroy ReadWrite lock.
*/
private void releaseDestroyReadLock() {
  // counterpart of acquireDestroyReadLock
  destroyLock.readLock().unlock();
}
/**
* Acquires the write lock on the destroy ReadWrite lock within specified millis.
*
* @param millis the milliseconds to attempt to acquire the lock within
* @throws DistributedSystemDisconnectedException if system has been disconnected
*/
private void acquireDestroyWriteLock(long millis) {
  // keep trying until the write lock is obtained; interrupts are remembered, not propagated
  while (true) {
    boolean wasInterrupted = Thread.interrupted();
    try {
      dm.getCancelCriterion().checkCancelInProgress(null);
      if (destroyLock.writeLock().tryLock(millis)) {
        return;
      }
    } catch (InterruptedException e) {
      wasInterrupted = true;
    } finally {
      // restore the interrupt flag on every pass, including the successful one
      if (wasInterrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
/**
* Releases the write lock on the destroy ReadWrite lock.
*/
private void releaseDestroyWriteLock() {
  // counterpart of acquireDestroyWriteLock
  destroyLock.writeLock().unlock();
}
/**
* Returns time to wait in millis from now based on start and wait in request.
*
* @return the current wait time for the request before it times out
*/
private long calcWaitMillisFromNow(DLockRequestMessage request) {
  final long timeoutTimestamp = request.getTimeoutTS();
  if (timeoutTimestamp == Long.MAX_VALUE) {
    // Long.MAX_VALUE is passed through unchanged rather than being offset by "now"
    return timeoutTimestamp;
  }
  final long now = DLockService.getLockTimeStamp(dlock.getDistributionManager());
  return timeoutTimestamp - now;
}
/**
* Shuts down the grantor thread and changes internal state to destroyed.
*/
private void makeDestroyed() {
  try {
    thread.shutdown();
    state = DESTROYED;
    if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
      logger.trace(LogMarker.DLS_VERBOSE, "DLockGrantor {} state is DESTROYED",
          dlock.getName());
    }

    // open both latches if they are still closed
    if (untilDestroyed.getCount() > 0) {
      untilDestroyed.countDown();
    }
    if (whileInitializing.getCount() > 0) {
      whileInitializing.countDown();
    }

    dlock.getDistributionManager().removeMembershipListener(membershipListener);
  } finally {
    // the grantor stat is decremented even if a step above throws
    dlock.getStats().incGrantors(-1);
  }
}
/**
* Returns a snapshot of the current grant tokens.
* <p>
* Synchronizes on grantTokens.
*
* @return a snapshot of the current grant tokens
*/
protected Collection snapshotGrantTokens() {
  synchronized (grantTokens) {
    // copy so callers can iterate without holding the grantTokens monitor
    return new ArrayList(grantTokens.values());
  }
}
/**
* Fetches or creates a new grant token for the named lock.
* <p>
* Synchronizes on grantTokens and the grant token.
*
* @param name the name of the lock
* @return the grant token for the named lock
*/
private DLockGrantToken getOrCreateGrant(Object name) {
  synchronized (grantTokens) {
    final DLockGrantToken existing = basicGetGrantToken(name);
    if (existing == null) {
      // nothing stored under this name yet (checking isTokenDestroyed here will deadlock)
      final DLockGrantToken created = new DLockGrantToken(dlock, this, name);
      created.incAccess();
      basicPutGrantToken(created);
      return created;
    }

    synchronized (existing) {
      if (!existing.isDestroyed()) {
        existing.incAccess();
        return existing;
      }
      // the stored token was destroyed: store a fresh one under the same name
      final DLockGrantToken replacement = new DLockGrantToken(dlock, this, name);
      replacement.incAccess();
      basicPutGrantToken(replacement);
      return replacement;
    }
  }
}
/**
* TEST HOOK: Returns an unmodifiable collection backed by the values of the DLockGrantToken map
* for testing purposes only.
* <p>
* Synchronizes on grantTokens.
*
* @return unmodifiable collection of the grant tokens
*/
public Collection getGrantTokens() {
  synchronized (grantTokens) {
    // a read-only view over the map's values, not a copy
    final Collection liveValues = grantTokens.values();
    return Collections.unmodifiableCollection(liveValues);
  }
}
/**
* Remove the grant token if it is unused.
* <p>
* Synchronizes on grantTokens and the grant token.
*
* @param grant the grant token to remove
*/
protected void removeGrantIfUnused(DLockGrantToken grant) {
  synchronized (grantTokens) {
    synchronized (grant) {
      if (isDestroyed() || grant.isDestroyed()) {
        return;
      }
      // first try to grant the lock to the next waiting request
      if (grant.grantLockToNextRequest()) {
        return;
      }
      // still referenced in some way: leave it in the map
      if (grant.isBeingAccessed() || grant.isGranted(false) || grant.hasWaitingRequests()) {
        return;
      }
      basicRemoveGrantToken(grant);
    }
  }
}
/**
* Iterates over grants and attempts to remove any that are no longer in use.
* <p>
* Synchronizes on grantTokens and the grant token.
*
* @param grants the grants to be checked for removal
*/
protected void removeUnusedGrants(Iterator grants) {
  // each element is expected to be a DLockGrantToken
  while (grants.hasNext()) {
    removeGrantIfUnused((DLockGrantToken) grants.next());
  }
}
/**
* Returns the DLockGrantToken from grant tokens map stored under the key name.
* <p>
* Synchronizes on grantTokens.
*/
public DLockGrantToken getGrantToken(Object name) {
  synchronized (grantTokens) {
    final DLockGrantToken token = basicGetGrantToken(name);
    return token;
  }
}
/**
* Fetches the grant token value stored in the map under key name.
* <p>
* Caller must synchronize on grantTokens
*
* @param name the key to fetch the grant token value for
* @return the grant token stored under key name guarded.By {@link #grantTokens}
*/
private DLockGrantToken basicGetGrantToken(Object name) {
  // plain map lookup; the caller is responsible for synchronizing on grantTokens
  final Object stored = grantTokens.get(name);
  return (DLockGrantToken) stored;
}
/**
* Stores the grant token as a value in the map under the key of its name.
* <p>
* Caller must synchronize on grantTokens
*
* @param grantToken the grant token to store in the map guarded.By {@link #grantTokens}
*/
private void basicPutGrantToken(DLockGrantToken grantToken) {
  // the token is keyed by its own name
  final Object key = grantToken.getName();
  grantTokens.put(key, grantToken);
  dlock.getStats().incGrantTokens(1);
}
/**
* Removes the grant token from the map.
* <p>
* Caller must synchronize on grantTokens and then the grantToken.
*
* @param grantToken the grant token to remove from the map. guarded.By {@link #grantTokens} and
* grantToken
*/
private void basicRemoveGrantToken(DLockGrantToken grantToken) {
  final Object removed = grantTokens.remove(grantToken.getName()); // changed to ref token
  if (removed == null) {
    // nothing was stored under that name
    return;
  }
  // the instance stored under the name must be the very token we were asked to remove
  Assert.assertTrue(removed == grantToken);
  grantToken.destroy();
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE,
        "[DLockGrantor.basicRemoveGrantToken] removed {}; removed={}", grantToken, removed);
  }
}
/**
* Iterates over grants and handles any that have expired.
* <p>
* Synchronizes on each grant token.
*
* @param grants the grants to iterate over
* @return the next smallest expiration time
*/
protected long expireAndGrantLocks(Iterator grants) {
  long earliestExpire = Long.MAX_VALUE;
  while (grants.hasNext()) {
    final DLockGrantToken token = (DLockGrantToken) grants.next();
    // destroyed tokens are skipped
    if (!token.isDestroyed()) {
      earliestExpire = Math.min(earliestExpire, token.expireAndGrantLock());
    }
  }
  return earliestExpire;
}
/**
* Iterates over grants and handles any that have timed out.
* <p>
* Synchronizes on each grant token.
*
* @param grants the grants to iterate over
* @return the next smallest timeout
*/
protected long handleRequestTimeouts(Iterator grants) {
  long earliestTimeout = Long.MAX_VALUE;
  while (grants.hasNext()) {
    final DLockGrantToken token = (DLockGrantToken) grants.next();
    // destroyed tokens are skipped
    if (!token.isDestroyed()) {
      earliestTimeout = Math.min(earliestTimeout, token.handleRequestTimeouts());
    }
  }
  return earliestTimeout;
}
/**
* TEST HOOK: Specifies time to sleep while handling suspend in order to cause a timeout.
* <p>
* Synchronizes on suspendLock.
*
*/
public void setDebugHandleSuspendTimeouts(int value) {
  // the field is read under suspendLock as well, so write it under the same monitor
  synchronized (this.suspendLock) {
    this.debugHandleSuspendTimeouts = value;
  }
}
/**
* TEST HOOK: number of milliseconds that handleSuspendTimeouts sleeps after it has checked the
* suspend queue for timed out requests. A value of zero or less disables the sleep.
*
* guarded.By {@link #suspendLock}
*/
private int debugHandleSuspendTimeouts = 0;
/**
* Iterates through a copy of suspendQueue and handles any requests that have timed out.
* <p>
* Synchronizes on suspendLock.
*
* @return the next smallest timeout in the suspendQueue
*/
protected long handleSuspendTimeouts() {
  synchronized (suspendLock) {
    if (suspendQueue.isEmpty() || isDestroyed()) {
      return Long.MAX_VALUE;
    }
  }

  // work on a copy so the timeout checks run without holding suspendLock
  final List queueSnapshot;
  synchronized (suspendLock) {
    queueSnapshot = new ArrayList(suspendQueue);
  }

  long earliestTimeout = Long.MAX_VALUE;
  final List timedOut = new ArrayList();
  for (Object element : queueSnapshot) {
    final DLockRequestMessage pending = (DLockRequestMessage) element;
    if (pending.checkForTimeout()) { // sends DLockResponseMessage if timeout
      cleanupSuspendState(pending);
      timedOut.add(pending);
    } else {
      earliestTimeout = Math.min(earliestTimeout, pending.getTimeoutTS());
    }
  }

  // test hook: optionally sleep here
  final int debugSleepMillis;
  synchronized (suspendLock) {
    debugSleepMillis = debugHandleSuspendTimeouts;
  }
  if (debugSleepMillis > 0) {
    try {
      logger.info(LogMarker.DLS_MARKER,
          "debugHandleSuspendTimeouts sleeping for {}",
          debugSleepMillis);
      Thread.sleep(debugSleepMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  if (timedOut.isEmpty()) {
    return earliestTimeout;
  }

  synchronized (suspendLock) {
    if (writeLockWaiters > 0) {
      // suspenders exist... must iterate through for safe removal
      for (Object element : timedOut) {
        final DLockRequestMessage expired = (DLockRequestMessage) element;
        // only adjust the waiter count when the request was still queued and was a suspender
        if (suspendQueue.remove(expired) && expired.isSuspendLockingRequest()) {
          writeLockWaiters--;
        }
      }
    } else {
      // no suspenders so safe to removeAll
      Assert.assertTrue(writeLockWaiters == 0,
          "Grantor state writeLockWaiters changed while holding suspendLock");
      suspendQueue.removeAll(timedOut);
    }
    checkWriteLockWaiters();
  }
  return earliestTimeout;
}
/**
* Returns string representation for the enumerated grantor state.
*
* @param stateInt the number of the state to return a string for
* @return string representation for the enumerated grantor state
*/
private String stateToString(int stateInt) {
  switch (stateInt) {
    case INITIALIZING:
      return "INITIALIZING";
    case READY:
      return "READY";
    case DESTROYED:
      return "DESTROYED";
    default:
      // report the value that was actually passed in, not the grantor's current state field
      throw new IllegalArgumentException(String.format("Unknown state for grantor: %s",
          Integer.valueOf(stateInt)));
  }
}
/**
* Throws IllegalStateException if this grantor is not still initializing.
*
* @throws IllegalStateException if this grantor is not still initializing
*/
private void assertInitializing() {
  if (state == INITIALIZING) {
    return;
  }
  // name the state the grantor is actually in
  final String currentState = stateToString(state);
  throw new IllegalStateException(
      String.format("DLockGrantor operation only allowed when initializing, not %s",
          currentState));
}
/**
* Suspends locking by the remote thread and lease id.
* <p>
* Caller must synchronize on suspendLock.
*
* Concurrency: protected by synchronization of {@link #suspendLock}
*
* @param myRThread the remote thread that is about to suspend locking
* @param lockId the id of the lock request used to suspend locking
*/
protected void suspendLocking(final RemoteThread myRThread, final int lockId) {
  if (DEBUG_SUSPEND_LOCK) {
    Assert.assertHoldsLock(suspendLock, true);
  }
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE, "Suspend locking of {} by {} with lockId of {}",
        dlock, myRThread, lockId);
  }

  Assert.assertTrue(myRThread != null, "Attempted to suspend locking for null RemoteThread");
  // suspending again is only legal for the thread that already suspended locking
  final boolean freeOrSameThread =
      lockingSuspendedBy == null || lockingSuspendedBy.equals(myRThread);
  Assert.assertTrue(freeOrSameThread,
      "Attempted to suspend locking for " + myRThread + " but locking is already suspended by "
          + lockingSuspendedBy);

  suspendedLockId = lockId;
  lockingSuspendedBy = myRThread;
}
/**
* Resume locking after it has been suspended.
* <p>
* Caller must synchronize on suspendLock.
*/
private void resumeLocking() {
  if (DEBUG_SUSPEND_LOCK) {
    Assert.assertHoldsLock(suspendLock, true);
  }
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE, "Resume locking of {}", dlock);
  }
  // clear the suspender and its lock id
  lockingSuspendedBy = null;
  suspendedLockId = INVALID_LOCK_ID;
}
/**
* Returns true if locking has been suspended.
* <p>
* Caller must synchronize on suspendLock.
*
* Concurrency: protected by synchronization of {@link #suspendLock}
*
* @return true if locking has been suspended
*/
protected boolean isLockingSuspended() {
  if (DEBUG_SUSPEND_LOCK) {
    Assert.assertHoldsLock(suspendLock, true);
  }
  // locking is suspended exactly when a suspending thread is recorded
  final boolean suspended = lockingSuspendedBy != null;
  return suspended;
}
/**
* Returns true if locking has been suspended.
* <p>
* Synchronizes on suspendLock.
*
* @return true if locking has been suspended
*/
protected boolean isLockingSuspendedWithSync() {
  // same check as isLockingSuspended, but takes the suspendLock monitor itself
  synchronized (suspendLock) {
    final boolean suspended = lockingSuspendedBy != null;
    return suspended;
  }
}
/**
* Returns true if locking has been suspended by the remote thread.
* <p>
* Caller must synchronize on suspendLock.
*
* Concurrency: protected by synchronization of {@link #suspendLock}
*
* @return true if locking has been suspended by the remote thread
*/
protected boolean isLockingSuspendedBy(final RemoteThread rThread) {
  if (DEBUG_SUSPEND_LOCK) {
    Assert.assertHoldsLock(suspendLock, true);
  }
  // a null thread never matches, even when locking is not suspended
  return rThread != null && rThread.equals(lockingSuspendedBy);
}
String displayStatus(RemoteThread rThread, Object name) {
  // builds a one-shot textual dump of the suspend-related state for logging
  final StringBuilder status = new StringBuilder();
  synchronized (suspendLock) {
    status.append(' ')
        .append(toString())
        .append(" id=").append(hashCode())
        .append(" rThread=").append(rThread);
    if (name != null) {
      status.append(" name=").append(name);
    }
    status.append(" permittedRequests (").append(permittedRequests.size()).append(")=")
        .append(permittedRequests.toString())
        .append(" suspendedLockId = ").append(suspendedLockId)
        .append(" lockingSuspendedBy = ").append(lockingSuspendedBy)
        .append(" writeLockWaiters = ").append(writeLockWaiters)
        .append(" totalReadLockCount = ").append(totalReadLockCount)
        .append("\nsuspendQueue (").append(suspendQueue.size()).append(")=")
        .append(suspendQueue.toString());
    // Kirk said it was ok to not log the list of readLockers to cut
    // down on how much logging is done at fine level.
    status.append("\nreadLockers (").append(readLockCountMap.size()).append(")");
  }
  return status.toString();
}
/**
* guarded.By {@link #suspendLock}
*/
private void postReleaseSuspendLock(RemoteThread rThread, Object lock) {
  if (!isLockingSuspendedBy(rThread)) {
    // hit bug related to 35749
    if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
      logger.trace(LogMarker.DLS_VERBOSE,
          "[postReleaseSuspendLock] locking is no longer suspended by {}", rThread);
    }
    return;
  }

  final Integer heldByReleaser = (Integer) readLockCountMap.get(rThread);
  final boolean releaserHoldsNoReadLocks =
      heldByReleaser == null || heldByReleaser.intValue() == 0;

  boolean handedOff = false;
  if (releaserHoldsNoReadLocks && !suspendQueue.isEmpty()) {
    final DLockRequestMessage head = (DLockRequestMessage) suspendQueue.getFirst();
    if (head.isSuspendLockingRequest()) {
      handedOff = true;
      // hand-off suspendLocking while under sync...
      resumeLocking();
      suspendLocking(head.getRemoteThread(), head.getLockId());
      permittedRequests.add(suspendQueue.removeFirst());
      writeLockWaiters--;
      checkWriteLockWaiters();
    }
  }

  if (!handedOff) {
    resumeLocking();
    // drain readLocks from suspendQueue into permittedRequests queue
    while (!suspendQueue.isEmpty()) {
      final DLockRequestMessage head = (DLockRequestMessage) suspendQueue.getFirst();
      if (head.isSuspendLockingRequest()) {
        // stop at the first suspend request; it stays at the head of the queue
        Assert.assertTrue(writeLockWaiters > 0,
            "SuspendLocking request is waiting but writeLockWaiters is 0");
        break;
      }
      final RemoteThread reader = head.getRemoteThread();
      final Integer heldByReader = (Integer) readLockCountMap.get(reader);
      final int updatedCount = (heldByReader == null ? 0 : heldByReader.intValue()) + 1;
      readLockCountMap.put(reader, Integer.valueOf(updatedCount));
      totalReadLockCount++;
      checkTotalReadLockCount();
      permittedRequests.add(suspendQueue.removeFirst());
    }
  }

  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE, "[postReleaseSuspendLock] new status {}",
        displayStatus(rThread, null));
  }
}
/**
* guarded.By {@link #suspendLock}
*/
private void postReleaseReadLock(RemoteThread rThread, Object lock) {
  final boolean traceEnabled = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);

  // handle release of regular lock
  final Integer held = (Integer) readLockCountMap.get(rThread);
  int readLockCount = (held == null) ? 0 : held.intValue();
  if (readLockCount < 1) {
    // hit bug 35749
    if (traceEnabled) {
      logger.trace(LogMarker.DLS_VERBOSE,
          "[postReleaseReadLock] no locks are currently held by {}", rThread);
    }
    return;
  }

  // drop one read lock for this thread; forget the thread entirely once it holds none
  readLockCount--;
  if (readLockCount != 0) {
    readLockCountMap.put(rThread, Integer.valueOf(readLockCount));
  } else {
    readLockCountMap.remove(rThread);
  }

  totalReadLockCount--;
  if (totalReadLockCount < 0 && traceEnabled) {
    logger.trace(LogMarker.DLS_VERBOSE, "Total readlock count has dropped to {} for {}",
        totalReadLockCount, this);
  }

  if (totalReadLockCount == 0 && !suspendQueue.isEmpty()) {
    final DLockRequestMessage head = (DLockRequestMessage) suspendQueue.getFirst();
    if (!head.isSuspendLockingRequest()) {
      // a non-suspend request at the head of the queue is reported and then asserted against
      final String s = "\n (readLockCount=" + readLockCount
          + ", totalReadLockCount=" + totalReadLockCount
          + ", writeLockWaiters=" + writeLockWaiters + ",\nsuspendQueue="
          + suspendQueue + ",\npermittedRequests=" + permittedRequests;
      logger.warn("Released regular lock with waiting read lock: {}", s);
      Assert.assertTrue(false,
          String.format("Released regular lock with waiting read lock: %s",
              s));
    } else {
      // last read lock is gone: let the waiting suspend request proceed
      suspendLocking(head.getRemoteThread(), head.getLockId());
      writeLockWaiters--;
      permittedRequests.add(suspendQueue.removeFirst());
      checkWriteLockWaiters();
    }
  }

  if (traceEnabled) {
    logger.trace(LogMarker.DLS_VERBOSE, "[postReleaseReadLock] new status {}",
        displayStatus(rThread, null));
  }
  checkTotalReadLockCount();
}
/**
* Handles post release lock tasks including tracking the current suspend locking states.
* <p>
* Synchronizes on suspendLock.
*
* @param rThread the remote thread that released the lock
* @param lock the named lock that was released
*/
protected void postReleaseLock(RemoteThread rThread, Object lock) {
  Assert.assertTrue(rThread != null);
  synchronized (suspendLock) {
    checkDestroyed();
    if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
      logger.trace(LogMarker.DLS_VERBOSE,
          "[postReleaseLock] rThread={} lock={} permittedRequests={} suspendQueue={}", rThread,
          lock, permittedRequests, suspendQueue);
    }
    // the suspend-locking token gets its own handling; everything else is a regular lock
    final boolean releasingSuspendLock = DLockService.SUSPEND_LOCKING_TOKEN.equals(lock);
    if (!releasingSuspendLock) {
      postReleaseReadLock(rThread, lock);
    } else {
      postReleaseSuspendLock(rThread, lock);
    }
  }
}
/**
* Departure or other codepath NOT specific to unlock requires that we cleanup suspend state that
* was already permitted to request. This needs to be invoked for both regular and suspend locks.
* <p>
* Synchronizes on suspendLock.
*
* @param request the request to cleanup after due to departure of sender
*/
protected void cleanupSuspendState(DLockRequestMessage request) {
  // treat the request as if its lock had just been released
  final RemoteThread requester = request.getRemoteThread();
  final Object lockName = request.getObjectName();
  postReleaseLock(requester, lockName);
}
/**
* Drains newly permitted requests that have been removed from suspendQueue. All requests in the
* permittedRequests queue already have permission to proceed with granting or scheduling.
* <p>
* Caller must acquire destroyReadLock. Synchronizes on suspendLock, grantTokens and each grant
* token.
*
* Concurrency: protected by {@link #destroyLock} via invoking
* {@link #acquireDestroyReadLock(long)}
*/
protected void drainPermittedRequests() {
  ArrayList drain;
  synchronized (suspendLock) {
    checkDestroyed();
    if (permittedRequests.isEmpty()) {
      return;
    }
    // swap in a fresh list and register the old one in permittedRequestsDrain while it is handled
    drain = permittedRequests;
    permittedRequestsDrain.add(drain);
    permittedRequests = new ArrayList();
  }

  final boolean traceEnabled = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
  if (traceEnabled) {
    logger.trace(LogMarker.DLS_VERBOSE, "[drainPermittedRequests] draining {}", drain);
  }

  // iterate and attempt to grantOrSchedule each request
  for (Object element : drain) {
    final DLockRequestMessage request = (DLockRequestMessage) element;
    checkDestroyed(); // destroyAndRemove should respond to all of these
    try {
      handlePermittedLockRequest(request); // synchronizes on grant instance
    } catch (LockGrantorDestroyedException e) {
      if (traceEnabled) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "LockGrantorDestroyedException respondWithNotGrantor to {}", request);
      }
      request.respondWithNotGrantor();
    } catch (LockServiceDestroyedException e) {
      if (traceEnabled) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "LockServiceDestroyedException respondWithNotGrantor to {}", request);
      }
      request.respondWithNotGrantor();
    } catch (RuntimeException e) {
      logger.error(
          "Processing of postRemoteReleaseLock threw unexpected RuntimeException",
          e);
      request.respondWithException(e);
    }
  }

  synchronized (suspendLock) {
    checkDestroyed();
    permittedRequestsDrain.remove(drain);
  }
}
/**
* Synchronizes on suspendLock.
*/
private boolean acquireSuspendLockPermission(DLockRequestMessage request) {
  final RemoteThread rThread = request.getRemoteThread();
  Assert.assertTrue(rThread != null);
  synchronized (suspendLock) {
    checkDestroyed();
    if (!dm.isCurrentMember(request.getSender())) {
      logger.info(LogMarker.DLS_MARKER, "Ignoring lock request from non-member: {}", request);
      return false;
    }

    // read locks held by the requesting thread itself do not block its own suspend request
    final Integer heldByRequester = (Integer) readLockCountMap.get(rThread);
    final int requesterReadLocks = heldByRequester == null ? 0 : heldByRequester.intValue();
    final boolean othersHaveReadLocks = totalReadLockCount > requesterReadLocks;
    final boolean traceEnabled = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);

    final boolean mustWait = isLockingSuspended() || writeLockWaiters > 0 || othersHaveReadLocks;
    final boolean permitLockRequest = !mustWait;
    if (permitLockRequest) {
      suspendLocking(rThread, request.getLockId());
      if (traceEnabled) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "[DLockGrantor.acquireSuspendLockPermission] permitted and suspended for {}",
            request);
      }
    } else {
      // queue the request behind whatever currently prevents suspension
      writeLockWaiters++;
      suspendQueue.addLast(request);
      this.thread.checkTimeToWait(calcWaitMillisFromNow(request), false);
      checkWriteLockWaiters();
      if (traceEnabled) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "[DLockGrantor.acquireSuspend] added '{}' to end of suspendQueue.", request);
      }
    }

    if (traceEnabled) {
      logger.trace(LogMarker.DLS_VERBOSE,
          "[DLockGrantor.acquireSuspendLockPermission] new status permitLockRequest = {}{}",
          permitLockRequest, displayStatus(rThread, null));
    }
    return permitLockRequest;
  }
}
/**
* Synchronizes on suspendLock.
*/
private boolean acquireReadLockPermission(DLockRequestMessage request) {
  boolean permitLockRequest = false;
  final RemoteThread rThread = request.getRemoteThread();
  Assert.assertTrue(rThread != null);
  synchronized (suspendLock) {
    checkDestroyed();
    if (!dm.isCurrentMember(request.getSender())) {
      // parameterized logging uses "{}" placeholders; "%s" would be printed literally
      logger.info(LogMarker.DLS_MARKER, "Ignoring lock request from non-member: {}", request);
      return false;
    }
    Integer integer = (Integer) readLockCountMap.get(rThread);
    int readLockCount = integer == null ? 0 : integer.intValue();
    // a thread that already holds a read lock, or that suspended locking itself, is never queued
    boolean threadHoldsLock = readLockCount > 0 || isLockingSuspendedBy(rThread);
    final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
    if (!threadHoldsLock && (isLockingSuspended() || writeLockWaiters > 0)) {
      // locking is suspended or a suspend request is waiting: queue this request
      suspendQueue.addLast(request);
      this.thread.checkTimeToWait(calcWaitMillisFromNow(request), false);
      if (isDebugEnabled_DLS) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "[DLockGrantor.acquireReadLockPermission] added {} to end of suspendQueue.", request);
      }
    } else {
      // permitted: account for the read lock per thread and in total
      readLockCount++;
      readLockCountMap.put(rThread, Integer.valueOf(readLockCount));
      totalReadLockCount++;
      permitLockRequest = true;
      if (isDebugEnabled_DLS) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "[DLockGrantor.acquireReadLockPermission] permitted {}", request);
      }
    }
    if (isDebugEnabled_DLS) {
      logger.trace(LogMarker.DLS_VERBOSE,
          "[DLockGrantor.acquireReadLockPermission] new status threadHoldsLock = {} permitLockRequest = {}{}",
          threadHoldsLock, permitLockRequest, displayStatus(rThread, null));
    }
    checkTotalReadLockCount();
  } // suspendLock sync
  return permitLockRequest;
}
/**
* Returns true if lock request has permission to proceed; else adds the request to the end of
* suspendQueue and returns false.
* <p>
* Synchronizes on suspendLock.
*
* @param request the lock request to acquire permission for
*/
private boolean acquireLockPermission(final DLockRequestMessage request) {
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.acquireLockPermission] {}", request);
  }
  // requests for the suspend-locking token take the suspend path, all others the read path
  final boolean isSuspendRequest =
      request.getObjectName().equals(DLockService.SUSPEND_LOCKING_TOKEN);
  return isSuspendRequest ? acquireSuspendLockPermission(request)
      : acquireReadLockPermission(request);
}
/**
* Throws InterruptedException if local lock request exists and is interruptible or
* CancelException if DistributionManager is forcing us to cancel for shutdown.
*
* @param e the throwable that caused this check
*/
private void throwIfInterruptible(InterruptedException e) throws InterruptedException {
  // The cancellation check comes first so that a shutdown in progress is reported
  // in preference to the interrupt itself.
  this.dm.getCancelCriterion().checkCancelInProgress(e);
  final boolean interruptible = this.dlock.isInterruptibleLockRequest();
  if (!interruptible) {
    return;
  }
  throw e;
}
/**
* TEST HOOK: Logs all grant tokens and other lock information for this service at INFO level.
* <p>
* Synchronizes on grantTokens.
*/
protected void dumpService() {
  synchronized (this.grantTokens) {
    final StringBuilder report = new StringBuilder();
    report.append("DLockGrantor.dumpService() for ").append(this);
    report.append("\n").append(this.grantTokens.size()).append(" grantTokens\n");
    // one line per grant token: "<name>: <token>"
    for (Object next : this.grantTokens.entrySet()) {
      final Map.Entry entry = (Map.Entry) next;
      final DLockGrantToken token = (DLockGrantToken) entry.getValue();
      report.append(" ").append(entry.getKey()).append(": ");
      report.append(token.toString()).append("\n");
    }
    logger.info(LogMarker.DLS_MARKER, "{}", report);
    logger.info(LogMarker.DLS_MARKER, "{}",
        "\nreadLockCountMap:\n" + readLockCountMap);
  }
}
/**
* Verify the waiters (for debugging)
*
* guarded.By {@link #suspendLock}
*/
private void checkWriteLockWaiters() {
  if (!DEBUG_SUSPEND_LOCK) {
    return;
  }
  Assert.assertHoldsLock(this.suspendLock, true);
  // recount the queued suspend-locking requests and compare with the running counter
  int suspendRequests = 0;
  for (Object queued : this.suspendQueue) {
    if (((DLockRequestMessage) queued).isSuspendLockingRequest()) {
      suspendRequests++;
    }
  }
  Assert.assertTrue(suspendRequests == this.writeLockWaiters);
}
/**
* Debugging method
*
* guarded.By {@link #suspendLock}
*/
private void checkTotalReadLockCount() {
  if (!DEBUG_SUSPEND_LOCK) {
    return;
  }
  Assert.assertHoldsLock(this.suspendLock, true);
  // the per-thread counts must add up to the running total
  int sum = 0;
  for (Object count : readLockCountMap.values()) {
    sum += ((Integer) count).intValue();
  }
  Assert.assertTrue(sum == totalReadLockCount);
}
// -------------------------------------------------------------------------
// DLockGrantToken (static inner class)
// -------------------------------------------------------------------------
/**
* Handles leasing and queued scheduling for an individual distributed lock.
*/
public static class DLockGrantToken {
/**
* DLS which contains this lock. Reference is used for stats and lifecycle.
*/
private final DLockService dlock;
/**
* Grantor instance that handles leasing of this lock.
*/
private final DLockGrantor grantor;
/**
* The uniquely identifying object name for this lock
*/
private final Object lockName;
/**
* Pending requests queued up for the lock
*
* guarded.By this
*/
private LinkedList pendingRequests;
/**
* The reply processor id is used to identify the specific lock operation used by the lessee to
* lease this lock
*
* guarded.By this
*/
private int leaseId = -1;
/**
* Distributed member that currently has a lease on this lock
*
* guarded.By this
*/
private InternalDistributedMember lessee;
/**
* Absolute time in milliseconds when the current lease will expire. When this lock is not
* leased out, the value is -1. When the lock is leased out, the value is > 0. A value of
* Long.MAX_VALUE indicates a non-expiring (infinite) lease.
*
* guarded.By this
*/
private long leaseExpireTime = -1;
/**
* Current count of threads attempting to access this grant token.
*
* guarded.By this
*/
private int accessCount = 0;
/**
* True if this token has been destroyed and removed from usage.
*
* guarded.By this
*/
private boolean destroyed = false;
/**
* RemoteThread identity of thread currently holding lease on this lock
*
* guarded.By this
*/
private RemoteThread lesseeThread = null;
/**
* Instantiates a new instance of DLockGrantToken.
*
* @param dlock the lock service scope for this lock
* @param grantor the grantor handling locks for the lock service
* @param name the name of this lock
*/
protected DLockGrantToken(DLockService dlock, DLockGrantor grantor, Object name) {
  // owning service and grantor first, then the identity of the lock itself
  this.dlock = dlock;
  this.grantor = grantor;
  this.lockName = name;
}
/**
* Schedules the lock request for immediate or later granting of lock. This will grant the lock
* if it is available, otherwise it will add the request at the end of the pending requests
* queue.
* <p>
* Synchronizes on this grant token.
*
* @param request the request to grant or schedule
* @return true if the lock request was granted immediately or queued as a pending request;
* false if the sender of the request is no longer a current member
*/
protected synchronized boolean schedule(DLockRequestMessage request) {
  // requests from members that have already departed are dropped after cleaning up
  if (!this.grantor.dm.isCurrentMember(request.getSender())) {
    this.grantor.cleanupSuspendState(request);
    return false;
  }

  // fast path: nothing holds or waits for the lock, so try to grant right away
  final boolean lockIsFree = !isGranted(false) && !hasWaitingRequests();
  if (lockIsFree && grantLockToRequest(request)) {
    return true;
  }

  // otherwise append the request to the pending queue
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantToken.schedule] {} scheduling: {}", this,
        request);
  }
  if (this.pendingRequests == null) {
    // the queue is created lazily; the stat counts the number of existing queues
    this.pendingRequests = new LinkedList();
    this.dlock.getStats().incRequestQueues(1);
  }
  this.pendingRequests.add(request);
  this.dlock.getStats().incPendingRequests(1);
  // true is returned for a queued request as well as for a granted one
  return true;
}
/**
* Sends NOT_GRANTOR replies to every request waiting for this grant token and then destroys the
* grant token.
* <p>
* Synchronizes on this grant token.
*/
protected synchronized void handleGrantorDestruction() {
  try {
    if (this.pendingRequests == null) {
      return; // nobody is waiting; the finally block still destroys this token
    }
    for (Object pending : this.pendingRequests) {
      ((DLockRequestMessage) pending).respondWithNotGrantor();
    }
  } finally {
    destroy();
  }
}
/**
* Checks current lock for expiration and attempts to grant the lock if it is available.
* <p>
* Synchronizes on this grant token.
* <p>
* NOTE: expiration is only as accurate as the clock synchronization of the hardware that the
* members are running on. Requestors should probably handle expirations themselves and send a
* Release msg, which would need an Evictor thread in each Requestor.
*
* @return the lease expiration time in millis for the currently held lock or Long.MAX_VALUE if
* lock has no owner
*/
protected synchronized long expireAndGrantLock() {
  if (this.grantor.isDestroyed()) {
    return Long.MAX_VALUE;
  }
  // isGranted(true) expires an overdue lease before reporting whether the lock is held
  final boolean canGrant = !isGranted(true) && !this.grantor.isLockingSuspendedWithSync();
  if (canGrant) {
    grantLockToNextRequest();
  }
  // a non-positive expire time means "not leased"; report that as "never expires"
  final long expireTime = getLeaseExpireTime();
  return expireTime <= 0 ? Long.MAX_VALUE : expireTime;
}
/**
* Returns true if there are pending requests waiting to lock this grant token.
* <p>
* Caller must synchronize on this grant token.
*
* Concurrency: protected by synchronization of *this* DLockGrantToken
*
* @return true if there are pending requests waiting to lock this
*/
protected synchronized boolean hasWaitingRequests() {
  // the queue is created lazily, so a missing queue means nobody is waiting
  return this.pendingRequests != null && !this.pendingRequests.isEmpty();
}
/**
* Grant this lock to the request if possible. Returns true if lock was granted to the request.
* <p>
* Synchronizes on this grant token.
*
* @param request the lock request asking for this lock
* @return true if lock was granted to the request
*/
protected synchronized boolean grantLockToRequest(DLockRequestMessage request) {
  Assert.assertTrue(request.getRemoteThread() != null);
  // only grant directly when the lock is free (after expiration) and nobody is queued ahead
  final boolean unavailable = isGranted(true) || hasWaitingRequests();
  if (unavailable) {
    return false;
  }
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantToken.grantLockToRequest] granting: {}",
        request);
  }
  final long expiresAt = grantAndRespondToRequest(request);
  if (expiresAt == -1) {
    return false; // the grant did not take effect
  }
  if (expiresAt < Long.MAX_VALUE) {
    // finite lease: tell the grantor thread how long until this lease expires
    final long now = DLockService.getLockTimeStamp(this.grantor.dm);
    this.grantor.thread.checkTimeToWait(expiresAt - now, true);
  }
  return true;
}
/**
* Called to release a remote lock when processing a DLockReleaseMessage.
* <p>
* Caller must synchronize on this grant token.
* <p>
* Call stack: DLockReleaseMessage -> releaseIfLocked -> getAndReleaseGrantIfLockedBy ->
* grant.releaseIfLockedBy
*
* Concurrency: protected by synchronization of *this* DLockGrantToken
*
* @param owner the member to release the lock for
* @param lockId the lock id that the member used to acquire the lock
*/
protected void releaseIfLockedBy(InternalDistributedMember owner, int lockId) {
  // remember the lessee thread now; a successful release clears it
  final RemoteThread lesseeBeforeRelease = getRemoteThread();
  boolean wasReleased = false;
  try {
    wasReleased = releaseLock(owner, lockId);
  } catch (IllegalStateException e) {
    this.dlock.checkDestroyed();
    this.grantor.checkDestroyed();
    // must have hit race... grantor doesn't have the token
    return;
  }
  if (!wasReleased) {
    return;
  }
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    synchronized (this) {
      logger.trace(LogMarker.DLS_VERBOSE,
          "[DLockGrantToken.releaseIfLockedBy] pending requests: {}",
          (this.pendingRequests == null ? "none" : "" + this.pendingRequests.size()));
    }
  }
  Assert.assertTrue(lesseeBeforeRelease != null);
  // remote unlock: let the grantor update its suspend state
  this.grantor.postReleaseLock(lesseeBeforeRelease, getName());
  // note: DLockReleaseMessage calls drainPermittedRequests next...
}
/**
* Returns true if lock is currently leased by the owner with the specified lock id.
* <p>
* Caller must synchronize on this grant token.
*
* Concurrency: protected by synchronization of *this* DLockGrantToken
*
* @param owner the member to check for lock ownership
* @param lockId the lock id that the member used for locking
* @return true if lock is currently leased by the owner with the specified lock id
*/
protected boolean isLockedBy(InternalDistributedMember owner, int lockId) {
  // thin protected entry point over the private lease check
  final boolean heldByOwner = isLeaseHeldBy(owner, lockId);
  return heldByOwner;
}
/**
* Handle timeouts for requests waiting on this lock. Any requests that have timed out will be
* removed. Calculates and returns the next smallest timeout of the requests still waiting on
* this lock.
* <p>
* Synchronizes on this grant token.
*
* @return next smallest timeout of the requests still waiting on this lock
*/
protected long handleRequestTimeouts() {
  synchronized (this) {
    // nothing to do without a queue or once the grantor is destroyed
    if (this.pendingRequests == null || this.grantor.isDestroyed()) {
      return Long.MAX_VALUE;
    }
  }

  long earliestTimeout = Long.MAX_VALUE;
  final List expired = new ArrayList();
  synchronized (this) {
    for (Object next : this.pendingRequests) {
      final DLockRequestMessage pending = (DLockRequestMessage) next;
      if (pending.checkForTimeout()) { // sends DLockResponseMessage if timeout
        this.grantor.cleanupSuspendState(pending);
        expired.add(pending);
        continue;
      }
      // still waiting: track the smallest timeout timestamp among the survivors
      earliestTimeout = Math.min(earliestTimeout, pending.getTimeoutTS());
    }
    // removal happens after the iteration so the queue is not modified while iterating
    removeRequests(expired);
  }
  return earliestTimeout;
}
/**
* Cleans up any state for the departed member. If the lock is held by this member, it will be
* released. Any pending lock requests for this member will be removed.
* <p>
* Synchronizes on this grant token, suspendLock, and grantTokens.
*
* @param member the departed member
*/
protected void handleDepartureOf(final InternalDistributedMember member,
final ArrayList grantsToRemoveIfUnused) {
// set to true only when the departed member actually held the lease and it was released
boolean released = false;
// lessee thread captured just before the release; needed for postReleaseLock below
RemoteThread rThread = null;
final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
try {
synchronized (this) {
try {
// NOTE: the early returns below still execute the inner finally block, which attempts the
// release of the lease
if (isDestroyed())
return;
if (this.pendingRequests == null)
return;
// remove member from pendingRequests...
DLockRequestMessage req = null;
for (Iterator iter = this.pendingRequests.iterator(); iter.hasNext();) {
req = (DLockRequestMessage) iter.next();
if (member.equals(req.getSender())) {
// found departed member, respondWithNotHolder to end dlock stats
try {
req.handleDepartureOfSender();
// cleanup suspend state for this request
this.grantor.cleanupSuspendState(req);
} catch (CancelException e) {
// cancellation is only traced; the request is still removed from the queue below
if (isDebugEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE,
"[DLockGrantToken.handleDepartureOf] ignored cancellation (1)");
}
}
// remove the request
iter.remove();
this.dlock.getStats().incPendingRequests(-1);
}
}
} finally {
synchronized (this) {
// bugfix 32657 release lock AFTER removing member from queued requests
// because release will grant to first request in queued requests
rThread = getRemoteThread();
boolean releasedToken = false;
try {
releasedToken = releaseLock(member, getLockId());
} catch (IllegalStateException e) {
this.dlock.checkDestroyed();
this.grantor.checkDestroyed();
// must have hit race... grantor doesn't have the token
// NOTE: returning from inside a finally block discards any exception that was
// propagating out of the try block above
return;
}
if (releasedToken) {
released = true;
if (isDebugEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE,
"[DLockGrantToken.handleDepartureOf] pending requests: {}",
(this.pendingRequests == null ? "none" : "" + this.pendingRequests.size()));
}
Assert.assertTrue(rThread != null);
}
}
}
}
} finally {
// runs outside of the token's monitor: update the grantor's suspend state, drain the
// requests that became permitted and report this token as a candidate for removal
if (released) {
try {
this.grantor.postReleaseLock(rThread, getName());
} catch (CancelException e) {
if (isDebugEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE,
"[DLockGrantToken.handleDepartureOf] ignored cancellation (2)");
}
}
this.grantor.drainPermittedRequests(); // destroyReadLock{grant{}, suspendLock{}}
grantsToRemoveIfUnused.add(this);
}
}
}
/**
* Adds this grant to the list if it references the departed member.
* <p>
* Synchronizes on this grant token.
*
* @param member the departed member
* @param grantsReferencingMember list to add grant to if it references departed member
*/
protected synchronized void checkDepartureOf(final InternalDistributedMember member,
    final List grantsReferencingMember) {
  if (this.destroyed) {
    return;
  }
  // the member is referenced if it holds the lease ...
  boolean referencesMember = member.equals(this.lessee);
  // ... or if at least one pending request was sent by it
  if (!referencesMember && this.pendingRequests != null) {
    for (Object next : this.pendingRequests) {
      final DLockRequestMessage pending = (DLockRequestMessage) next;
      if (member.equals(pending.getSender())) {
        referencesMember = true;
        break;
      }
    }
  }
  if (referencesMember) {
    grantsReferencingMember.add(this);
  }
}
/**
* Remove all the specified pending requests.
* <p>
* Caller must synchronize on this grant token.
*
* @param requestsToRemove the pending requests to remove guarded.By this
*/
private void removeRequests(Collection requestsToRemove) {
  if (requestsToRemove.isEmpty()) {
    return;
  }
  synchronized (this) {
    this.pendingRequests.removeAll(requestsToRemove);
  }
  // the stat is decremented by the size of the given collection
  final int removedCount = requestsToRemove.size();
  this.dlock.getStats().incPendingRequests(-removedCount);
}
/**
* Grants this lock to the next waiting request if one exists.
* <p>
* Caller must synchronize on this grant token.
*
* Concurrency: protected by synchronization of *this* DLockGrantToken
*
* @return true if the lock was granted to next request
*/
protected boolean grantLockToNextRequest() {
  final boolean traceEnabled = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
  if (traceEnabled) {
    logger.trace(LogMarker.DLS_VERBOSE,
        "[DLockGrantToken.grantLock] {} isGranted={} hasWaitingRequests={}", getName(),
        isLeaseHeld(), hasWaitingRequests());
  }
  // keep taking requests from the head of the queue until one of them ends up holding the lock
  while (!isGranted(true) && hasWaitingRequests()) {
    try {
      final DLockRequestMessage head;
      synchronized (this) {
        head = (DLockRequestMessage) this.pendingRequests.remove(0);
      }
      this.dlock.getStats().incPendingRequests(-1);

      // a request that has timed out is cleaned up and skipped
      if (head.checkForTimeout()) {
        this.grantor.cleanupSuspendState(head);
        continue;
      }
      if (traceEnabled) {
        logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantToken.grantLock] granting {} to {}",
            getName(), head.getSender());
      }

      final long expiresAt = grantAndRespondToRequest(head);
      // -1 means the grant failed (try the next request); MAX_VALUE means it never expires
      final boolean hasFiniteLease = expiresAt != -1 && expiresAt < Long.MAX_VALUE;
      if (hasFiniteLease) {
        final long now = DLockService.getLockTimeStamp(this.grantor.dm);
        this.grantor.thread.checkTimeToWait(expiresAt - now, true);
      }
    } catch (IndexOutOfBoundsException e) {
      // ignore... entry may have timed out between empty check and remove
    }
  }
  return isGranted(false);
}
/**
* Grants the lock to the specified request and sends a reply to the member that initiated the
* request.
* <p>
* Caller must synchronize on this grant token.
*
* @param request the request to grant the lock to
* @return leaseExpireTime or -1 if failed to grant. guarded.By this
*/
private long grantAndRespondToRequest(DLockRequestMessage request) {
// the request's monitor is held for the whole grant-and-reply sequence
synchronized (request) {
// a request that has already been responded to cannot be granted
if (request.respondedNoSync()) {
return -1;
}
Assert.assertTrue(request.getRemoteThread() != null);
// the sender is no longer a member: clean up its suspend state instead of granting
if (!this.grantor.dm.isCurrentMember(request.getSender())) {
this.grantor.cleanupSuspendState(request);
return -1;
}
// sanity check for the suspend-locking token: locking must be either not suspended at all
// or suspended by the very thread that is being granted the token
if (isSuspendLockingToken()) {
synchronized (this.grantor.suspendLock) {
Assert.assertTrue(
this.grantor.lockingSuspendedBy == null
|| this.grantor.isLockingSuspendedBy(request.getRemoteThread()),
"Locking is suspended by " + this.grantor.lockingSuspendedBy + " with lockId of "
+ this.grantor.suspendedLockId + " instead of " + request.getRemoteThread()
+ " with lockId of " + request.getLockId());
} // suspendLock sync
}
// record the lease on this token before the reply is sent
long newLeaseExpireTime = calcLeaseExpireTime(request.getLeaseTime());
grantLock(request.getSender(), newLeaseExpireTime, request.getLockId(),
request.getRemoteThread());
if (isSuspendLockingToken()) {
synchronized (this.grantor.suspendLock) {
// no-op if already suspend by this RemoteThread...
this.grantor.suspendLocking(request.getRemoteThread(), request.getLockId());
Assert.assertTrue(this.grantor.isLockingSuspendedBy(request.getRemoteThread()),
"Locking should now be suspended by " + request.getRemoteThread()
+ " with lockId of " + request.getLockId() + " instead of "
+ this.grantor.lockingSuspendedBy + " with lockId of "
+ this.grantor.suspendedLockId);
} // suspendLock sync
}
// NOTE: if grantor is local and client interrupts the request, the
// following will release the lock because the reply processor is gone
request.respondWithGrant(newLeaseExpireTime);
// re-check the lease after replying; it may already be gone again (see NOTE above)
if (!isLeaseHeldBy(request.getSender(), request.getLockId())) {
// lock request was local and interrupted then released
return -1;
}
return newLeaseExpireTime;
}
}
/**
* Returns the absolute time at which the specified lease time will expire from now. This call
* does not change or check any state other than current time.
*
* @param leaseTime the desired length of lease time
* @return the absolute time at which the lease will expire
*/
protected long calcLeaseExpireTime(long leaseTime) {
  // both Long.MAX_VALUE and -1 request a lease that never expires
  final boolean infiniteLease = leaseTime == Long.MAX_VALUE || leaseTime == -1;
  if (infiniteLease) {
    return Long.MAX_VALUE;
  }
  final long currentTime = getCurrentTime();
  final long sum = currentTime + leaseTime;
  // a sum smaller than the lease time itself means the addition rolled over MAX_VALUE
  final long newLeaseExpireTime = sum < leaseTime ? Long.MAX_VALUE : sum;
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE,
        "[DLockGrantToken.calcLeaseExpireTime] currentTime={} newLeaseExpireTime={}",
        currentTime, newLeaseExpireTime);
  }
  return newLeaseExpireTime;
}
/**
* Returns true if this grant token is currently granted.
* <p>
* Caller must synchronize on this grant token.
*
* Concurrency: protected by synchronization of *this* DLockGrantToken
*
* @param checkForExpiration true if expiration should be attempted before checking if this
* grant token is currently granted
* @return true if this grant token is currently granted
*/
protected boolean isGranted(boolean checkForExpiration) {
  if (!checkForExpiration) {
    return isLeaseHeld();
  }
  // give an overdue lease the chance to expire before reporting
  checkForExpiration();
  return isLeaseHeld();
}
/**
* Creates a string of the pending requests for logging or debugging.
* <p>
* Caller must synchronize on this grant token.
*
* @return a string of the pending requests for logging or debugging guarded.By this
*/
private String pendingRequestsToString() {
  if (this.pendingRequests == null) {
    return "(null)";
  }
  // every pending request is rendered as "[<request>]", without separators
  final StringBuilder text = new StringBuilder();
  for (Object req : this.pendingRequests) {
    text.append("[").append(req.toString()).append("]");
  }
  return text.toString();
}
/**
* Returns string representation of this object.
* <p>
* Synchronizes on this grant token.
*/
@Override
public String toString() {
  // the default representation includes the pending requests
  final boolean displayPendingRequests = true;
  return toString(displayPendingRequests);
}
/**
* Returns string representation of this object.
* <p>
* Synchronizes on this grant token.
* <p>
*
* @param displayPendingRequests true if string should include pendingRequests
*/
public String toString(boolean displayPendingRequests) {
  final StringBuilder text = new StringBuilder("DLockGrantToken");
  text.append("@").append(Integer.toHexString(hashCode()));
  // all state is read under the token's own monitor
  synchronized (this) {
    text.append(" {name: ").append(getName())
        .append(", isGranted: ").append(isLeaseHeld())
        .append(", isDestroyed: ").append(this.destroyed)
        .append(", accessCount: ").append(this.accessCount)
        .append(", lessee: ").append(this.lessee)
        .append(", leaseExpireTime: ").append(this.leaseExpireTime)
        .append(", leaseId: ").append(this.leaseId)
        .append(", lesseeThread: ").append(this.lesseeThread);
    if (displayPendingRequests) {
      text.append(", pendingRequests: ").append(pendingRequestsToString());
    }
    text.append("}");
  }
  return text.toString();
}
/**
* Returns the name of this lock.
*
* @return the name of this lock
*/
Object getName() {
  // lockName is final, so no synchronization is needed here
  return lockName;
}
/**
* Returns true if this lock represents suspend locking.
*
* @return true if this lock represents suspend locking
* @see org.apache.geode.distributed.DistributedLockService#suspendLocking(long)
*/
boolean isSuspendLockingToken() {
  // compare against the constant so the receiver of equals is never this token's name
  final boolean isSuspendToken = DLockService.SUSPEND_LOCKING_TOKEN.equals(lockName);
  return isSuspendToken;
}
/**
* Returns the lock id used to lease this lock.
* <p>
* Caller must synchronize on this grant token.
*
* @return the lock id used to lease this lock guarded.By this
*/
int getLockId() {
  // -1 while the lock is not leased out
  return leaseId;
}
/**
* Returns the identity of the thread that has this lock leased.
* <p>
* Caller must synchronize on this grant token.
*
* @return the identity of the thread that has this lock leased guarded.By this
*/
RemoteThread getRemoteThread() {
  // null while the lock is not leased out
  return lesseeThread;
}
/**
* Increments or decrements access count by the specified amount.
* <p>
* Synchronizes on this grant token.
*
* @param amount the amount to inc or dec access count by
*/
private synchronized void incAccess(int amount) {
  if (amount < 0) {
    // amount is negative here, so the resulting count is accessCount + amount; the previous
    // check (accessCount - amount) added the magnitude and therefore could never fail
    Assert.assertTrue(this.accessCount + amount >= 0,
        amount + " cannot be subtracted from accessCount " + this.accessCount);
  }
  this.accessCount += amount;
}
/**
* Increments the access count by one.
* <p>
* Synchronizes on this grant token.
*/
void incAccess() {
  final int oneMoreAccessor = 1;
  incAccess(oneMoreAccessor);
}
/**
* Decrements the access count by one.
* <p>
* Synchronizes on this grant token.
*/
void decAccess() {
  final int oneLessAccessor = -1;
  incAccess(oneLessAccessor);
}
/**
* Returns true if the access count is greater than zero.
* <p>
* Synchronizes on this grant token.
*
* @return true if the access count is greater than zero
*/
boolean isBeingAccessed() {
  final int currentAccessors;
  synchronized (this) {
    currentAccessors = this.accessCount;
  }
  return currentAccessors > 0;
}
/**
* Returns the member that currently holds a lease on this lock.
* <p>
* Synchronizes on this grant token.
*
* @return the member that currently holds a lease on this lock
*/
public synchronized InternalDistributedMember getOwner() {
  // null while the lock is not leased out
  return lessee;
}
/**
* Returns the lease expiration time. This is the absolute time in milliseconds when the current
* lease will expire.
* <p>
* Caller must synchronize on this grant token.
*
* Concurrency: protected by synchronization of *this* DLockGrantToken
*
* @return the lease expiration time
*/
public long getLeaseExpireTime() {
  // -1 when not leased, Long.MAX_VALUE for a lease that never expires
  return leaseExpireTime;
}
/**
* Returns true if this grant token has been destroyed.
* <p>
* Synchronizes on this grant token.
*
* @return true if this grant token has been destroyed
*/
public synchronized boolean isDestroyed() {
  return destroyed;
}
/**
* Returns the current time in milliseconds.
*
* @return the current time in milliseconds
*/
long getCurrentTime() {
  // delegates to the lock service's time stamp, based on the grantor's distribution manager
  return DLockService.getLockTimeStamp(grantor.dm);
}
/**
* Handle expiration if the lease expire time has been reached for the current lease on this
* grant token.
* <p>
* Synchronizes on this grant token.
*
* @return true if the lease is expired
*/
synchronized boolean checkForExpiration() {
  // nothing can expire unless the lock is leased with a finite expire time
  final boolean leased = this.lessee != null && this.leaseId > -1;
  if (!leased || this.leaseExpireTime == Long.MAX_VALUE) {
    return false;
  }
  final long currentTime = getCurrentTime();
  if (currentTime <= this.leaseExpireTime) {
    return false; // lease is still valid
  }

  // expired: clear the lease, remembering the holder for the grantor's bookkeeping
  final RemoteThread expiredHolder = this.lesseeThread;
  this.lessee = null;
  this.leaseId = -1;
  this.lesseeThread = null;
  this.leaseExpireTime = -1;
  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE, "[checkForExpiration] Expired token at {}: {}",
        currentTime, toString(true));
  }
  this.grantor.postReleaseLock(expiredHolder, this.lockName);
  return true;
}
/**
* Grants this lock.
* <p>
* Caller must synchronize on this grant token.
*
* @param owner the member that is being granted the lock
* @param newLeaseExpireTime the absolute expiration time
* @param lockId the lock id used to request the lock
* @param remoteThread identity of the locking thread guarded.By this
*/
void grantLock(InternalDistributedMember owner, long newLeaseExpireTime, int lockId,
    RemoteThread remoteThread) {
  Assert.assertTrue(remoteThread != null);
  // a destroyed token must not be leased out; this throws IllegalStateException
  checkDestroyed();
  basicGrantLock(owner, newLeaseExpireTime, lockId, remoteThread);
}
/**
* Modify grant token state to mark the lock as granted.
* <p>
* Caller must synchronize on this grant token.
*
* @param owner the member that has been granted the lock
* @param newLeaseExpireTime the absolute expiration time
* @param lockId the lock id used to request the lock
* @param remoteThread identity of the locking thread guarded.By this
*/
private void basicGrantLock(InternalDistributedMember owner, long newLeaseExpireTime,
    int lockId, RemoteThread remoteThread) {
  Assert.assertTrue(remoteThread != null);
  Assert.assertTrue(lockId > -1, "Invalid attempt to grant lock with lockId " + lockId);

  // who holds the lease ...
  this.lessee = owner;
  this.lesseeThread = remoteThread;
  // ... under which id and until when
  this.leaseId = lockId;
  this.leaseExpireTime = newLeaseExpireTime;

  if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
    logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantToken.grantLock.grantor] Granting {}",
        toString(false));
  }
}
/**
* Returns true if this lock is currently leased out.
* <p>
* Caller must synchronize on this grant token.
*
* @return true if this lock is currently leased out guarded.By this
*/
boolean isLeaseHeld() {
  if (this.lessee == null) {
    return false;
  }
  // a lessee alone is not enough; the lease id must be valid as well
  return this.leaseId > -1;
}
/**
* Mark this grant token as destroyed. This should only happen to a token that is no longer in
* use.
* <p>
* Caller must synchronize on this grant token. guarded.By this
*/
void destroy() {
  if (this.destroyed) {
    return; // already destroyed; the stats were adjusted the first time
  }
  this.destroyed = true;
  this.dlock.getStats().incGrantTokens(-1);
  if (this.pendingRequests == null) {
    return;
  }
  // take the still-pending requests and their queue out of the stats
  this.dlock.getStats().incPendingRequests(-this.pendingRequests.size());
  this.dlock.getStats().incRequestQueues(-1);
}
/**
 * Verifies that this grant token is still usable.
 * <p>
 * Caller must synchronize on this grant token.
 *
 * @throws IllegalStateException if this grant token has been destroyed
 */
private void checkDestroyed() {
  if (!destroyed) {
    return;
  }
  throw new IllegalStateException("Attempting to use destroyed grant token: " + this);
}
/**
 * Called by the grantor. Clears the lease on this token if it is currently held by the given
 * member under the given lock id; otherwise leaves the token untouched.
 * <p>
 * Caller must synchronize on this grant token.
 *
 * @param member the member to release the lock from
 * @param lockId the lock id that the member used when locking; {@code -1} is rejected up front
 * @return true if the lock was released
 * @throws IllegalStateException if this grant token has been destroyed (and lockId is not -1)
 */
private boolean releaseLock(InternalDistributedMember member, int lockId) {
  if (lockId == -1) {
    return false;
  }
  checkDestroyed();

  final boolean heldByMember = isLeaseHeldBy(member, lockId);
  final boolean traceEnabled = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);

  if (!heldByMember) {
    // Somebody other than the current lessee (or with a stale id) tried to release.
    if (traceEnabled) {
      logger.trace(LogMarker.DLS_VERBOSE,
          "[DLockGrantToken.releaseLock] {} attempted to release: {}", member, this);
    }
    return false;
  }

  // Log before clearing so the trace still shows the owner being released.
  if (traceEnabled) {
    logger.trace(LogMarker.DLS_VERBOSE,
        "[DLockGrantToken.releaseLock] releasing ownership: {}", this);
  }
  leaseExpireTime = -1;
  lesseeThread = null;
  leaseId = -1;
  lessee = null;
  return true;
}
/**
 * Reports whether {@code sender} is the current lessee of this lock and {@code lockId} matches
 * the recorded lease id.
 * <p>
 * Caller must synchronize on this grant token.
 *
 * @param sender the member that potentially holds a lease; asserted to be non-null
 * @param lockId the lock id provided by the member; asserted to be non-negative
 * @return true if the sender holds a lease on this lock
 */
private boolean isLeaseHeldBy(InternalDistributedMember sender, int lockId) {
  Assert.assertTrue(sender != null, "sender is null: " + this);
  Assert.assertTrue(lockId >= 0, "lockId is < 0: " + this);
  if (!sender.equals(lessee)) {
    return false;
  }
  return leaseId == lockId;
}
}
// -------------------------------------------------------------------------
// DLockGrantorThread (static inner class)
// -------------------------------------------------------------------------
/**
 * Thread dedicated to handling background tasks for this grantor.
 * <p>
 * Each pass of {@link #run()} snapshots the grantor's grant tokens, then expires and grants
 * locks, times out pending requests and suspends, and removes unused grants. Between passes the
 * thread waits on {@link #lock} for {@link #timeToWait} milliseconds; other threads can shorten
 * that wait through {@link #checkTimeToWait(long, boolean)}.
 * <p>
 * All scheduling fields except {@link #shutdown} are read and written only while synchronized on
 * {@link #lock}.
 */
private static class DLockGrantorThread extends LoggingThread {
  /** Initial value, in millis, for the wait time and for the next timeout/expire marks. */
  private static final long MAX_WAIT = 60 * 1000; // 60 seconds...

  /** Set to make {@link #run()} exit; volatile because it is checked outside the lock. */
  private volatile boolean shutdown = false;
  /** True while this thread is blocked in {@code lock.wait}. */
  private boolean waiting = false;
  /** Set by {@link #checkTimeToWait} when it has stored a shorter {@link #timeToWait}. */
  private boolean requireTimeToWait = false;
  /** Set by {@link #checkTimeToWait} when it wakes a waiting thread to re-enter the wait. */
  private boolean goIntoWait = false;
  /** Millis the thread will wait before its next pass; Long.MAX_VALUE means until notified. */
  private long timeToWait = MAX_WAIT;
  /** Timestamp at which the current wait is expected to end. */
  private long expectedWakeupTimeStamp = 0;
  /** Monitor guarding the scheduling state and used for wait/notify. */
  private final Object lock = new Object();
  private final DLockGrantor grantor;
  private final CancelCriterion stopper;
  /** Time in millis that next pending request will timeout */
  private long nextTimeout = DLockGrantorThread.MAX_WAIT;
  /** Time in millis that next lock is due to expire */
  private long nextExpire = DLockGrantorThread.MAX_WAIT;

  /**
   * Creates the background thread for the given grantor.
   *
   * @param grantor the grantor whose tokens this thread services
   * @param stopper consulted at the top of every pass; the thread exits once cancel is in
   *        progress
   */
  DLockGrantorThread(DLockGrantor grantor, CancelCriterion stopper) {
    super("Lock Grantor for " + grantor.dlock.getName());
    this.grantor = grantor;
    this.stopper = stopper;
  }

  /** Returns the current lock timestamp as reported by the lock service. */
  private long now() {
    DistributionManager dm = this.grantor.dlock.getDistributionManager();
    return DLockService.getLockTimeStamp(dm);
  }

  /** Requests that this thread stop and interrupts it so a pending wait ends. */
  protected void shutdown() {
    this.shutdown = true;
    this.interrupt();
  }

  /**
   * Tells this thread that something will expire or time out in {@code newTimeToWaitArg}
   * millis, shortening the current or next wait if that is sooner than what is scheduled.
   *
   * @param newTimeToWaitArg millis until the event; Long.MAX_VALUE means "never" and is
   *        ignored, negative values are treated as 0 (wake up immediately)
   * @param expire true if the event is a lock expiration, false if it is a request timeout
   */
  protected void checkTimeToWait(long newTimeToWaitArg, boolean expire) {
    long newTimeToWait = newTimeToWaitArg;
    if (newTimeToWait == Long.MAX_VALUE) {
      // never expire
      return;
    } else if (newTimeToWait < 0) {
      // negative means already expired or timed out so we wakeup immediately
      newTimeToWait = 0;
    }
    synchronized (this.lock) {
      if (expire && newTimeToWait < this.nextExpire) {
        this.nextExpire = newTimeToWait;
      }
      if (!expire && newTimeToWait < this.nextTimeout) {
        this.nextTimeout = newTimeToWait;
      }
      if (newTimeToWait < this.timeToWait) {
        if (this.waiting) {
          // Only disturb a waiting thread if the new wakeup is earlier than the planned one.
          long newWakeupTimeStamp = now() + newTimeToWait;
          if (newWakeupTimeStamp > -1 // accounts for overflow
              && newWakeupTimeStamp < this.expectedWakeupTimeStamp) {
            this.timeToWait = newTimeToWait;
            this.requireTimeToWait = true;
            this.goIntoWait = true;
            this.lock.notify();
          }
        } else {
          this.timeToWait = newTimeToWait;
          this.requireTimeToWait = true;
        }
      } // end if newTimeToWait
    } // end sync this.lock
  }

  /**
   * Main loop: wait until work is due, then expire/grant locks, time out requests and remove
   * unused grants. Exits on shutdown, on cancellation, or when the grantor is destroyed.
   */
  @Override
  public void run() {
    final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
    DistributedLockStats stats = this.grantor.dlock.getStats();
    // true after every completed pass so the wait time is recomputed from nextExpire/nextTimeout
    boolean recalcTimeToWait = false;
    while (!this.shutdown) {
      if (stopper.isCancelInProgress()) {
        break; // done
      }
      try {
        // go into wait if we know we have no timeouts or expires for a while
        synchronized (this.lock) { // synchronized
          if (recalcTimeToWait || this.requireTimeToWait) {
            recalcTimeToWait = false;
            long nextTS = Math.min(this.nextExpire, this.nextTimeout);
            this.nextExpire = Long.MAX_VALUE;
            this.nextTimeout = Long.MAX_VALUE;
            if (nextTS != Long.MAX_VALUE || this.requireTimeToWait) {
              long now = now();
              // fix bug 39355 by using current timeToWait if smaller
              long newTimeToWait = nextTS - now;
              if (this.requireTimeToWait) {
                // checkTimeToWait stored an explicit wait; never wait longer than that.
                // The flag is cleared only here, after it has been tested, so that this
                // branch is actually reachable.
                this.timeToWait = Math.min(this.timeToWait, newTimeToWait);
                this.requireTimeToWait = false;
              } else {
                this.timeToWait = newTimeToWait;
              }
              if (this.timeToWait < 0) {
                this.timeToWait = 0;
              }
              if (isDebugEnabled_DLS) {
                logger.trace(LogMarker.DLS_VERBOSE,
                    "DLockGrantorThread will wait for {} ms. nextExpire={} nextTimeout={} now={}",
                    this.timeToWait, this.nextExpire, this.nextTimeout, now);
              }
            } else {
              this.timeToWait = Long.MAX_VALUE;
              if (isDebugEnabled_DLS) {
                logger.trace(LogMarker.DLS_VERBOSE,
                    "DLockGrantorThread will wait until rescheduled.");
              }
            }
          }
          if (this.timeToWait > 0) {
            if (isDebugEnabled_DLS) {
              logger.trace(LogMarker.DLS_VERBOSE,
                  "DLockGrantorThread is about to wait for {} ms.", this.timeToWait);
            }
            if (this.timeToWait != Long.MAX_VALUE) {
              this.expectedWakeupTimeStamp = now() + this.timeToWait;
              if (this.expectedWakeupTimeStamp < 0) {
                // overflow
                this.expectedWakeupTimeStamp = Long.MAX_VALUE;
              }
            } else {
              this.expectedWakeupTimeStamp = Long.MAX_VALUE;
            }
            if (this.expectedWakeupTimeStamp == Long.MAX_VALUE) {
              // untimed wait: only checkTimeToWait (via goIntoWait) or an interrupt ends it
              while (!this.goIntoWait) {
                this.waiting = true;
                this.lock.wait(); // spurious wakeup ok
                this.waiting = false;
              }
            } else {
              long timeToWaitThisTime = this.timeToWait;
              for (;;) {
                this.waiting = true;
                this.lock.wait(timeToWaitThisTime); // spurious wakeup ok
                this.waiting = false;
                if (this.goIntoWait) {
                  break; // out of for loop
                }
                // woke early without being rescheduled: wait out the remainder
                timeToWaitThisTime = this.expectedWakeupTimeStamp - now();
                if (timeToWaitThisTime <= 0) {
                  break; // out of for loop
                }
              }
            }
            if (isDebugEnabled_DLS) {
              logger.trace(LogMarker.DLS_VERBOSE, "DLockGrantorThread has woken up...");
            }
            if (this.shutdown) {
              break;
            }
            // if goIntoWait, continue back around and enter wait again
            if (this.goIntoWait) {
              this.goIntoWait = false;
              continue;
            }
          }
        } // synchronized
        long statStart = stats.startGrantorThread();
        try {
          Collection grants = this.grantor.snapshotGrantTokens();
          // TASK: expire and grant locks
          if (this.shutdown) {
            return;
          }
          if (isDebugEnabled_DLS) {
            logger.trace(LogMarker.DLS_VERBOSE,
                "DLockGrantorThread about to expireAndGrantLocks...");
          }
          {
            long smallestExpire = this.grantor.expireAndGrantLocks(grants.iterator());
            synchronized (this.lock) {
              if (smallestExpire < this.nextExpire) {
                this.nextExpire = smallestExpire;
              }
            }
          }
          long timing = stats.endGrantorThreadExpireAndGrantLocks(statStart);
          // TASK: timeout waiting requests
          if (this.shutdown) {
            return;
          }
          if (isDebugEnabled_DLS) {
            logger.trace(LogMarker.DLS_VERBOSE,
                "DLockGrantorThread about to handleRequestTimeouts...");
          }
          {
            long smallestRequestTimeout = this.grantor.handleRequestTimeouts(grants.iterator());
            long smallestSuspendTimeout = this.grantor.handleSuspendTimeouts();
            synchronized (this.lock) {
              if (smallestRequestTimeout < this.nextTimeout) {
                this.nextTimeout = smallestRequestTimeout;
              }
              if (smallestSuspendTimeout < this.nextTimeout) {
                this.nextTimeout = smallestSuspendTimeout;
              }
            }
          }
          timing = stats.endGrantorThreadHandleRequestTimeouts(timing);
          // TASK: remove unused tokens
          if (this.shutdown) {
            return;
          }
          if (isDebugEnabled_DLS) {
            logger.trace(LogMarker.DLS_VERBOSE,
                "DLockGrantorThread about to removeUnusedGrants...");
          }
          this.grantor.removeUnusedGrants(grants.iterator());
          stats.endGrantorThreadRemoveUnusedTokens(timing);
        } catch (CancelException e) {
          // so, exit then.
        } finally {
          recalcTimeToWait = true;
          stats.endGrantorThread(statStart);
        }
      } catch (LockGrantorDestroyedException ex) {
        this.shutdown = true;
        return;
      } catch (InterruptedException e) {
        // shutdown probably interrupted us
        // Not necessary to reset the interrupt bit, we're going to go
        // away of our own accord.
        if (this.shutdown) {
          // ok to ignore since this thread will now shutdown
        } else {
          logger.warn("DLockGrantorThread was unexpectedly interrupted",
              e);
          // do not set interrupt flag since this thread needs to resume
          stopper.checkCancelInProgress(e);
        }
      }
    }
  }
}
// -------------------------------------------------------------------------
// MembershipListener inner classes
// -------------------------------------------------------------------------
/**
 * Membership listener that reacts only to member departures: unless cancellation is in
 * progress, it asks the waiting thread pool to have this grantor handle the departure of the
 * member. All other membership events are ignored.
 */
private MembershipListener membershipListener = new MembershipListener() {
  @Override
  public void memberJoined(DistributionManager distributionManager,
      InternalDistributedMember id) {
    // joins are of no interest to the grantor
  }

  @Override
  public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id,
      InternalDistributedMember whoSuspected, String reason) {
    // suspicion alone does not trigger any action
  }

  @Override
  public void quorumLost(DistributionManager distributionManager,
      Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
    // quorum changes are not handled here
  }

  @Override
  public void memberDeparted(DistributionManager distMgr, final InternalDistributedMember id,
      final boolean crashed) {
    // if the VM is being forcibly disconnected, we shouldn't release locks as it
    // will take longer than the time allowed by the InternalDistributedSystem
    // shutdown mechanism.
    if (distMgr.getCancelCriterion().isCancelInProgress()) {
      return;
    }

    final DLockGrantor me = DLockGrantor.this;
    final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);

    // The actual handling may block, so it is handed to the waiting thread pool.
    final Runnable departureTask = new Runnable() {
      @Override
      public void run() {
        try {
          processMemberDeparted(id, crashed, me);
        } catch (InterruptedException interrupted) {
          // ignore
          if (isDebugEnabled_DLS) {
            logger.trace(LogMarker.DLS_VERBOSE, "Ignored interrupt processing departed member");
          }
        }
      }
    };

    try {
      if (isDebugEnabled_DLS) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "[DLockGrantor.memberDeparted] waiting thread pool will process id={}", id);
      }
      distMgr.getExecutors().getWaitingThreadPool().execute(departureTask);
    } catch (RejectedExecutionException rejected) {
      // the pool no longer accepts work; nothing more to do than note it
      if (isDebugEnabled_DLS) {
        logger.trace(LogMarker.DLS_VERBOSE,
            "[DLockGrantor.memberDeparted] rejected handling of id={}", id);
      }
    }
  }

  /**
   * Waits for the grantor to finish initializing and then lets it handle the departure. A
   * grantor that has been destroyed in the meantime is silently skipped.
   *
   * @param id the member that departed
   * @param crashed whether the member crashed; deliberately not consulted (see below)
   * @param me the grantor that should handle the departure
   * @throws InterruptedException declared by this method; the caller in memberDeparted
   *         catches and ignores it
   */
  protected void processMemberDeparted(InternalDistributedMember id, boolean crashed,
      DLockGrantor me) throws InterruptedException {
    final boolean traceEnabled = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
    if (traceEnabled) {
      logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.processMemberDeparted] id={}", id);
    }
    try {
      me.waitWhileInitializing();
      // one cause of bug 32657 is "if (crashed) {" around handleDepartureOf
      // ... we cannot rely on the value of crashed to determine if grantor
      // has or will receive a NonGrantorDestroy message
      me.handleDepartureOf(id);
    } catch (LockGrantorDestroyedException destroyed) {
      // ignore... grantor was destroyed
    }
  }
};
}