blob: bbbfa41f6c15f95f4082bf601bc17f2e409fae3d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal.locks;
import java.util.WeakHashMap;
import org.apache.logging.log4j.Logger;
import org.apache.geode.distributed.LeaseExpiredException;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
/**
* A DistributedLockService contains a collection of DLockToken instances, one for each name in that
* DistributedLockService for which a lock has ever been requested. The token identifies whether
* that name is currently locked, and which distribution manager and thread owns the lock.
*
*/
public class DLockToken {
private static final Logger logger = LogService.getLogger();
// -------------------------------------------------------------------------
// Instance variables
// -------------------------------------------------------------------------
/**
* Lock name for this lock. Logically final but set by fromData.
*/
private final Object name;
/**
* DistributionManager using this lock token. Reference is used to identify local member identity
* and to {@link DLockService#getLockTimeStamp(DistributionManager)}.
*/
private final DistributionManager dm;
/**
* The reply processor id is used to identify the distinct lease which a thread has used to lease
* this lock.
*/
private int leaseId = -1;
/**
* The absolute time at which the current lease on this lock will expire. -1 represents a lease
* which will not expire until explicitly released.
*/
private long leaseExpireTime = -1;
/**
* Remotable identity of thread currently leasing this lock.
*/
private RemoteThread lesseeThread = null;
/**
* Counter that indicates number of times this lock has been re-entered for the current lease.
*/
private int recursion;
/**
* Tracks expired leases so that the leasing thread can report a
* {@link org.apache.geode.distributed.LeaseExpiredException}. Keys are threads that have had
* their lease expire on this lock, but may not yet have noticed. Would use weak set if available.
* Entry is removed upon throwing LeaseExpiredException. Protected by synchronization on this lock
* token.
*/
private WeakHashMap expiredLeases;
/**
* Actual local thread that currently has a lease on this lock.
*/
private Thread thread;
/**
* Number of usages of this lock token. usageCount = recursion + (# of threads waiting for this
* lock). It's weird, I know.
*/
private int usageCount = 0;
/**
* True if this lock token has been destroyed to free up resources.
*/
private boolean destroyed = false;
/**
* True if this lock token should be ignored for remote grantor recovery.
*/
private boolean ignoreForRecovery = false;
// -------------------------------------------------------------------------
// Constructors
// -------------------------------------------------------------------------
/**
* Instantiates a new DLockToken for use by {@link DLockService}.
*
* @param dm the DistributionManager for this member
* @param name the identifying name of this lock
*/
public DLockToken(DistributionManager dm, Object name) {
this.dm = dm;
this.name = name;
}
// -------------------------------------------------------------------------
// Public accessors
// -------------------------------------------------------------------------
/**
* Returns the lock re-entry recursion of the current lease or -1 if there is no current lease.
* Caller must synchronize on this lock token.
* <p>
* Public because {@link org.apache.geode.internal.admin.remote.RemoteDLockInfo} is a caller.
*
* @return the lock re-entry recursion of the current lease or -1 if none
*/
public int getRecursion() {
return this.recursion;
}
/**
* Returns the name of the actual local thread leasing this lock or null if there is no lease.
* Caller must synchronize on this lock token.
* <p>
* Public because {@link org.apache.geode.internal.admin.remote.RemoteDLockInfo} is a caller.
*
* @return the name of the actual local thread leasing this lock or null
*/
public String getThreadName() {
return this.thread == null ? null : this.thread.getName();
}
/**
* Returns the actual local thread leasing this lock or null if there is no lease.
*/
public synchronized Thread getThread() {
return this.thread;
}
/**
* Returns the absolute time at which the current lease will expire or -1 if there is no lease.
* Caller must synchronize on this lock token.
* <p>
* Public because {@link org.apache.geode.internal.admin.remote.RemoteDLockInfo} is a caller.
*
* @return the absolute time at which the current lease will expire or -1
*/
public long getLeaseExpireTime() {
return this.leaseExpireTime;
}
public int getUsageCount() {
return this.usageCount;
}
// -------------------------------------------------------------------------
// Package accessors
// -------------------------------------------------------------------------
/**
* Returns the identifying name of this lock. Caller must synchronize on this lock token if
* instance was deserialized.
*
* @return the identifying name of this lock
*/
Object getName() {
return this.name;
}
/**
* Returns the lease id currently used to hold a lease on this lock or -1 if no thread currently
* holds this lock. Caller must synchronize on this token.
*
* @return the id of the current lease on this lock or -1 if none
*/
int getLeaseId() {
return this.leaseId;
}
/**
* Returns the remotable identity of the thread currently leasing this lock or null if no thread
* currently holds this lock. Caller must synchronize on this lock token.
*
* @return identity of the thread holding the current lease or null if none
*/
RemoteThread getLesseeThread() {
return this.lesseeThread;
}
/**
* Increment usage count for this lock token. Caller must synchronize on this lock token.
*/
void incUsage() {
incUsage(1);
}
/**
* Decrement usage count for this lock token. Caller must synchronize on this lock token.
*/
void decUsage() {
incUsage(-1);
}
/**
* Returns true if the usage count for this lock token is greater than zero. Caller must
* synchronize on this lock token.
*
* @return true if the usage count for this lock token is greater than zero
*/
boolean isBeingUsed() {
return this.usageCount > 0;
}
// -------------------------------------------------------------------------
// Package operations
// -------------------------------------------------------------------------
/**
* Destroys this lock token.
*/
synchronized void destroy() {
this.destroyed = true;
}
/**
* Returns the current time in absolute milliseconds for use calculating lease expiration times.
*
* @return the current time in absolute milliseconds
*/
long getCurrentTime() {
if (this.dm == null)
return -1;
return DLockService.getLockTimeStamp(this.dm);
}
/**
* Throws LeaseExpiredException if the calling thread's lease on this lock previously expired. The
* expired lease will no longer be tracked after throwing LeaseExpiredException. Caller must
* synchronize on this lock token.
*
* @throws LeaseExpiredException if calling thread's lease expired
*/
void throwIfCurrentThreadHadExpiredLease() throws LeaseExpiredException {
if (this.expiredLeases == null) {
return;
}
if (this.expiredLeases.containsKey(Thread.currentThread())) {
this.expiredLeases.remove(Thread.currentThread());
throw new LeaseExpiredException(
"This thread's lease expired for this lock");
}
}
/**
* Checks the current lease for expiration and returns true if it has been marked as expired.
* Caller must synchronize on this lock token.
*
* @return true if the current lease has been marked as expired
*/
boolean checkForExpiration() {
boolean expired = false;
// check if lease exists and lease expire is not MAX_VALUE
if (this.leaseId > -1 && this.leaseExpireTime < Long.MAX_VALUE) {
long currentTime = getCurrentTime();
if (currentTime > this.leaseExpireTime) {
if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
logger.trace(LogMarker.DLS_VERBOSE, "[checkForExpiration] Expiring token at {}: {}",
currentTime, this);
}
noteExpiredLease();
basicReleaseLock();
expired = true;
}
}
return expired;
}
/**
* Grants new lease to calling thread for this lock token. Synchronizes on this lock token.
*
* @param newLeaseExpireTime absolute expiration in millis or Long.MAX_VALUE
* @param newLeaseId uniquely identifies the lease for this thread
* @param newRecursion recursion count if lock has been re-entered
* @param remoteThread identity of the leasing thread
* @return true if lease for this lock token is successfully granted
*/
synchronized void grantLock(long newLeaseExpireTime, int newLeaseId, int newRecursion,
RemoteThread remoteThread) {
Assert.assertTrue(remoteThread != null);
Assert.assertTrue(newLeaseId > -1, "Invalid attempt to grant lock with leaseId " + newLeaseId);
checkDestroyed();
checkForExpiration(); // TODO: this should throw.
this.ignoreForRecovery = false;
this.leaseExpireTime = newLeaseExpireTime;
this.leaseId = newLeaseId;
this.lesseeThread = remoteThread;
this.recursion = newRecursion;
this.thread = Thread.currentThread();
if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockToken.grantLock.client] granted {}", this);
}
}
/**
* Returns true if there's currently a lease on this lock token. Synchronizes on this lock token.
*
* @return true if there's currently a lease on this lock token
*/
synchronized boolean isLeaseHeld() {
return this.leaseId > -1;
}
/**
* Returns true if lease on this lock token is held by calling thread or the specified remote
* thread. Caller must synchronize on this lock token.
*
* @param remoteThread remotable identity of thread to check for
* @return true if lease is held by calling thread or remote thread
*/
boolean isLeaseHeldByCurrentOrRemoteThread(RemoteThread remoteThread) {
if (isLeaseHeldByCurrentThread()) {
return true;
} else {
return this.lesseeThread != null && remoteThread != null
&& this.lesseeThread.equals(remoteThread);
}
}
/**
* Returns true if lease on this lock token is held by calling thread. Caller must synchronize on
* this lock token.
*
* @return true if lease is held by calling thread
*/
boolean isLeaseHeldByCurrentThread() {
return this.thread == Thread.currentThread();
}
/**
* Returns true if this lock token should be ignored for grantor recovery. Caller must synchronize
* on this lock token.
*
* @return true if this lock token should be ignored for grantor recovery
*/
synchronized boolean ignoreForRecovery() {
return this.ignoreForRecovery;
}
/**
* Sets whether or not this lock token should be ignored for grantor recovery. Caller must
* synchronize on this lock token.
*
* @param value true if this lock token should be ignored for grantor recovery
*/
void setIgnoreForRecovery(boolean value) {
this.ignoreForRecovery = value;
}
/**
* Releases the current lease on this lock token. Synchronizes on this lock token.
*
* @param leaseIdToRelease lease id to release
* @param remoteThread identity of thread holding lease
* @return true if lock was successfully released
*/
synchronized boolean releaseLock(int leaseIdToRelease, RemoteThread remoteThread) {
return releaseLock(leaseIdToRelease, remoteThread, true);
}
/**
* Releases the current lease on this lock token. Synchronizes on this lock token.
*
* @param leaseIdToRelease lease id to release
* @param remoteThread identity of thread holding lease
* @param decRecursion true if recursion should be decremented
* @return true if lock was successfully released
*/
synchronized boolean releaseLock(int leaseIdToRelease, RemoteThread remoteThread,
boolean decRecursion) {
if (leaseIdToRelease == -1)
return false;
if (this.destroyed) {
return true;
}
// return false if not locked by calling thread
if (!isLeaseHeld(leaseIdToRelease) || !isLeaseHeldByCurrentOrRemoteThread(remoteThread)) {
return false;
}
// reduce recursion if recursion > 0
else if (decRecursion && getRecursion() > 0) {
incRecursion(-1);
decUsage();
if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockToken.releaseLock] decremented recursion: {}",
this);
}
return true;
}
// release lock entirely
else {
basicReleaseLock();
return true;
}
}
/**
* Nulls out current lease and decrements usage count. Caller must be synchronized on this lock
* token.
*/
private void basicReleaseLock() {
if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockToken.basicReleaseLock] releasing ownership: {}",
this);
}
this.leaseId = -1;
this.lesseeThread = null;
this.leaseExpireTime = -1;
this.thread = null;
this.recursion = 0;
this.ignoreForRecovery = false;
decUsage();
}
// -------------------------------------------------------------------------
// Private implementation methods
// -------------------------------------------------------------------------
/**
* Returns true if lease is held using specified lease id. Caller must synchronize on this lock
* token.
*
* @param memberLeaseId lease id used by member
* @return true if lease is held using specified lease id
*/
private boolean isLeaseHeld(int memberLeaseId) {
return memberLeaseId == this.leaseId;
}
/**
* Increments or decrements usage count by the specified amount. Caller must synchronize on this
* lock token.
*
* @param amount the amount to inc or dec usage count by
*/
private void incUsage(int amount) {
if (amount < 0 && !this.destroyed) {
Assert.assertTrue(this.usageCount - amount >= 0,
amount + " cannot be subtracted from usageCount " + this.usageCount);
}
this.usageCount += amount;
}
/**
* Increments or decrements recursion by the specified amount. Caller must synchronize on this
* lock token.
*
* @param amount the amount to inc or dec recursion by
*/
private void incRecursion(int amount) {
if (amount < 0) {
Assert.assertTrue(this.recursion - amount >= 0,
amount + " cannot be subtracted from recursion " + this.recursion);
}
this.recursion += amount;
}
/**
* Throws IllegalStateException if this lock token has been destroyed. Caller must synchronize on
* this lock token.
*
* @throws IllegalStateException if this lock token has been destroyed
*/
private void checkDestroyed() {
if (this.destroyed) {
IllegalStateException e = new IllegalStateException(
String.format("Attempting to use destroyed token: %s", this));
throw e;
}
}
/**
* Record the token's owning thread as having lost its lease, so it can throw an exception later
* if it tries to unlock. A weak reference to the thread is used. Caller must synchronize on this
* lock token.
*/
private void noteExpiredLease() {
if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
logger.trace(LogMarker.DLS_VERBOSE, "[noteExpiredLease] {}", this.thread);
}
if (this.expiredLeases == null) {
this.expiredLeases = new WeakHashMap();
}
this.expiredLeases.put(this.thread, null);
}
// -------------------------------------------------------------------------
// java.lang.Object methods
// -------------------------------------------------------------------------
/**
* Returns a string representation of this object.
*/
@Override
public String toString() {
synchronized (this) {
return "DLockToken" + "@" + Integer.toHexString(hashCode()) + ", name: " + this.name
+ ", thread: <" + getThreadName() + ">" + ", recursion: " + this.recursion
+ ", leaseExpireTime: " + this.leaseExpireTime + ", leaseId: " + this.leaseId
+ ", ignoreForRecovery: " + this.ignoreForRecovery + ", lesseeThread: "
+ this.lesseeThread + ", usageCount: " + this.usageCount + ", currentTime: "
+ getCurrentTime();
}
}
}