| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.distributed.internal.locks; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelCriterion; |
| import org.apache.geode.CancelException; |
| import org.apache.geode.InternalGemFireException; |
| import org.apache.geode.StatisticsFactory; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.distributed.DistributedLockService; |
| import org.apache.geode.distributed.DistributedSystem; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.LeaseExpiredException; |
| import org.apache.geode.distributed.LockNotHeldException; |
| import org.apache.geode.distributed.LockServiceDestroyedException; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.InternalDistributedSystem; |
| import org.apache.geode.distributed.internal.ResourceEvent; |
| import org.apache.geode.distributed.internal.deadlock.UnsafeThreadLocal; |
| import org.apache.geode.distributed.internal.locks.DLockQueryProcessor.DLockQueryReplyMessage; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.OSProcess; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.serialization.DataSerializableFixedID; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.internal.util.StopWatch; |
| import org.apache.geode.internal.util.concurrent.FutureResult; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * Implements the distributed locking service with distributed lock grantors. |
| * |
| */ |
| public class DLockService extends DistributedLockService { |
| |
| /** Logger for this class. */ |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
|  * Duration handed to {@code sleep} by {@code notLockGrantorId} when the elder still names the |
|  * rejected grantor. Read from system property {@code DLockService.notGrantorSleep} (with the |
|  * GemFire prefix); defaults to 100. Presumably milliseconds -- confirm against {@code sleep}. |
|  */ |
| private static final long NOT_GRANTOR_SLEEP = |
|     Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "DLockService.notGrantorSleep", 100L); |
| |
| /** |
|  * Debug flag read from system property {@code DLockService.debug.nonGrantorDestroyLoop} (with |
|  * the GemFire prefix). |
|  */ |
| private static final boolean DEBUG_NONGRANTOR_DESTROY_LOOP = Boolean.getBoolean( |
|     DistributionConfig.GEMFIRE_PREFIX + "DLockService.debug.nonGrantorDestroyLoop"); |
| |
| /** |
|  * Debug loop count read from system property |
|  * {@code DLockService.debug.nonGrantorDestroyLoopCount} (with the GemFire prefix); defaults to |
|  * 20. |
|  */ |
| private static final int DEBUG_NONGRANTOR_DESTROY_LOOP_COUNT = Integer.getInteger( |
|     DistributionConfig.GEMFIRE_PREFIX + "DLockService.debug.nonGrantorDestroyLoopCount", 20); |
| |
| /** |
|  * Flag read from system property {@code DLockService.automateFreeResources} (with the GemFire |
|  * prefix). |
|  */ |
| private static final boolean AUTOMATE_FREE_RESOURCES = |
|     Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DLockService.automateFreeResources"); |
| |
| /** Sentinel lease id; judging by its name it marks a lease id as invalid. */ |
| static final int INVALID_LEASE_ID = -1; |
| |
| /** Unique name for this instance of the named locking service */ |
| protected final String serviceName; |
| |
| /** DistributionManager for this member */ |
| private final DistributionManager dm; |
| |
| /** |
| * DistributedSystem connection for this member (used for DisconnectListener, logging, etc) |
| */ |
| protected final InternalDistributedSystem ds; |
| |
| /** Known lock tokens for this service. Key: Object (lock name), Value: DLockToken */ |
| private final Map<Object, DLockToken> tokens = new HashMap<Object, DLockToken>(); |
| |
| /** |
| * True if this member has destroyed this named locking service. Field is volatile only because |
| * it's referenced in {@link #toString()} (never synchronize in <code>toString</code>). |
| */ |
| private volatile boolean destroyed = false; |
| |
| /** |
| * True if this is a distributed lock service; false if local to this vm only. TX has a "local" |
| * lock service which sets this to false. |
| */ |
| private final boolean isDistributed; |
| |
| /** Optional handler for departure of lease holders; used by grantor */ |
| private DLockLessorDepartureHandler lessorDepartureHandler; |
| |
| /** |
| * Hook for transactions which allows custom behavior in processing DLockRecoverGrantorMessage |
| */ |
| private DLockRecoverGrantorProcessor.MessageProcessor recoverGrantorProcessor; |
| |
| /** Thread-safe reference to DistributedLockStats */ |
| private final DistributedLockStats dlockStats; |
| |
| /** |
| * Protects {@link #lockGrantorId}, {@link #grantor} and {@link #lockGrantorFutureResult}. Final |
| * granting of a lock occurs under this synchronization and only if <code>lockGrantorId</code> |
| * matches the grantor that granted the lock. |
| */ |
| private final Object lockGrantorIdLock = new Object(); |
| |
| /** Identifies the current grantor for this lock service. */ |
| private LockGrantorId lockGrantorId; |
| |
| /** |
| * Local instance of the lock grantor if this process is the grantor. This field is volatile so |
| * that {@link #toString()} can read it without synchronization (synchronizing there risks wrong |
| * lock ordering). TODO: determine whether this field can be made non-volatile. |
| */ |
| private volatile DLockGrantor grantor; |
| |
| /** |
| * Count of currently active locks and lock requests. Used to determine if destroy must tell the |
| * grantor to release all held locks. |
| */ |
| private int activeLocks = 0; |
| |
| /** True if this service should be destroyed in system DisconnectListener */ |
| private final boolean destroyOnDisconnect; |
| |
| /** True if this service should automatically freeResources */ |
| private final boolean automateFreeResources; |
| |
| /** Identifies the thread that is destroying this lock service. */ |
| private final ThreadLocal<Boolean> destroyingThread = new ThreadLocal<Boolean>(); |
| |
| // Disabled field: was held during destroy and creation of this lock service. |
| // private final Object serviceLock = new Object(); |
| |
| /** Protects access to {@link #destroyed} and {@link #activeLocks}. */ |
| private final Object destroyLock = new Object(); |
| |
| /** |
| * Created by the thread communicating directly with the elder. Other threads will wait on this |
| * and then use the resulting lockGrantorId. This ensures that only one message is sent to the |
| * elder and that only one thread does so at a time. Protected by {@link #lockGrantorIdLock} and |
| * holds a reference to a {@link LockGrantorId}. |
| * <p> |
| * Only outbound threads and operations should ever wait on this. Do NOT allow inbound threads to |
| * use the <code>lockGrantorFutureResult</code>. |
| */ |
| private FutureResult lockGrantorFutureResult; |
| |
| private final DLockStopper stopper; |
| |
| // ------------------------------------------------------------------------- |
| // State and concurrency construct methods |
| // ------------------------------------------------------------------------- |
| |
| /** |
|  * Reports whether this lock service is destroyed from the calling thread's point of view. |
|  * <p> |
|  * The check runs while synchronized on {@link #destroyLock}. The thread currently marked as |
|  * performing the destroy always gets {@code false}. |
|  * |
|  * @return true if {@link #destroyed} is set and the caller is not the destroying thread |
|  */ |
| public boolean isDestroyed() { |
|   synchronized (this.destroyLock) { |
|     return this.destroyed && !isCurrentThreadDoingDestroy(); |
|   } |
| } |
| |
| /** |
|  * Throws if {@link #isDestroyed()} reports this lock service as destroyed; otherwise returns |
|  * normally. |
|  * |
|  * @throws LockServiceDestroyedException built by |
|  *         {@link #generateLockServiceDestroyedException(String)} when destroyed |
|  */ |
| public void checkDestroyed() { |
|   if (!isDestroyed()) { |
|     return; |
|   } |
|   final String detail = generateLockServiceDestroyedMessage(); |
|   throw generateLockServiceDestroyedException(detail); |
| } |
| |
| /** |
| * Create a new LockServiceDestroyedException for this lock service. |
| * |
| * @param message the detail message that explains the exception |
| * @return new LockServiceDestroyedException |
| */ |
| protected LockServiceDestroyedException generateLockServiceDestroyedException(String message) { |
| return new LockServiceDestroyedException(message); |
| } |
| |
| /** |
| * Returns the string message to use in a LockServiceDestroyedException for this lock service. |
| * |
| * @return the detail message that explains LockServiceDestroyedException |
| */ |
| protected String generateLockServiceDestroyedMessage() { |
| return String.format("%s has been destroyed", this); |
| } |
| |
| /** |
| * Returns true if {@link #lockGrantorId} is the same as the specified LockGrantorId. Caller must |
| * synchronize on {@link #lockGrantorIdLock}. |
| * |
| * @param someLockGrantorId the LockGrantorId to check |
| */ |
| private boolean checkLockGrantorId(LockGrantorId someLockGrantorId) { |
| Assert.assertHoldsLock(this.lockGrantorIdLock, true); |
| if (this.lockGrantorId == null) { |
| return false; |
| } |
| return this.lockGrantorId.sameAs(someLockGrantorId); |
| } |
| |
| /** |
|  * Returns true if lockGrantorId is the same as the specified LockGrantorId. This method |
|  * acquires lockGrantorIdLock itself, so callers do not need to hold it. |
|  * |
|  * @param someLockGrantorId the LockGrantorId to check |
|  * @return true if the current lockGrantorId is set and is the same as someLockGrantorId |
|  */ |
| public boolean isLockGrantorId(LockGrantorId someLockGrantorId) { |
|   final boolean matches; |
|   synchronized (this.lockGrantorIdLock) { |
|     matches = checkLockGrantorId(someLockGrantorId); |
|   } |
|   return matches; |
| } |
| |
| private boolean isCurrentThreadDoingDestroy() { |
| return Boolean.TRUE.equals(this.destroyingThread.get()); |
| } |
| |
| private void setDestroyingThread() { |
| this.destroyingThread.set(Boolean.TRUE); |
| } |
| |
| /** Removes the calling thread's destroying-thread mark. */ |
| private void clearDestroyingThread() { |
|   destroyingThread.remove(); |
| } |
| |
| private InternalDistributedMember getElderId() { |
| InternalDistributedMember elder = this.dm.getElderId(); |
| if (elder == null) { |
| this.dm.getSystem().getCancelCriterion().checkCancelInProgress(null); |
| } |
| Assert.assertTrue(elder != null); |
| return elder; |
| } |
| |
| /** |
|  * Returns id of the current lock grantor for this service. If necessary, a request will be sent |
|  * to the elder to fetch this information; if the elder names this member, a local grantor is |
|  * created. |
|  * <p> |
|  * Only one thread at a time talks to the elder. That thread publishes |
|  * {@link #lockGrantorFutureResult} under {@link #lockGrantorIdLock} and completes (or cancels) |
|  * it in the finally block; other callers wait on the future and retry if it was cancelled. The |
|  * loop repeats until a non-null id is obtained or an exception is thrown. |
|  * <p> |
|  * Must not be called while holding {@link #destroyLock} (asserted). |
|  * |
|  * @return the current LockGrantorId; never null on normal return |
|  */ |
| public LockGrantorId getLockGrantorId() { |
|   final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
|   boolean ownLockGrantorFutureResult = false; |
|   FutureResult lockGrantorFutureResultRef = null; |
| |
|   long statStart = -1; |
|   LockGrantorId theLockGrantorId = null; |
|   while (theLockGrantorId == null) { |
| |
|     ownLockGrantorFutureResult = false; |
|     // The wait started in a previous pass was already ended by that pass's finally block; |
|     // reset so an early return below cannot end the same wait a second time. |
|     statStart = -1; |
|     try { |
|       Assert.assertHoldsLock(this.destroyLock, false); |
|       synchronized (this.lockGrantorIdLock) { |
|         if (this.lockGrantorFutureResult != null) { |
|           // another thread is already talking to the elder |
|           lockGrantorFutureResultRef = this.lockGrantorFutureResult; |
|         } else if (this.lockGrantorId != null) { |
|           return this.lockGrantorId; |
|         } else { |
|           // this thread wins the right to talk to the elder |
|           ownLockGrantorFutureResult = true; |
|           lockGrantorFutureResultRef = new FutureResult(this.dm.getCancelCriterion()); |
|           if (isDebugEnabled_DLS) { |
|             logger.trace(LogMarker.DLS_VERBOSE, |
|                 "[getLockGrantorId] creating lockGrantorFutureResult"); |
|           } |
|           this.lockGrantorFutureResult = lockGrantorFutureResultRef; |
|         } |
|       } |
| |
|       statStart = getStats().startGrantorWait(); |
|       if (!ownLockGrantorFutureResult) { |
|         // Assign to theLockGrantorId so the finally block reports this wait as successful |
|         // when the other thread delivered an id. |
|         theLockGrantorId = |
|             waitForLockGrantorFutureResult(lockGrantorFutureResultRef, 0, TimeUnit.MILLISECONDS); |
|         if (theLockGrantorId != null) { |
|           return theLockGrantorId; |
|         } else { |
|           // future was cancelled; start over |
|           continue; |
|         } |
|       } |
| |
|       InternalDistributedMember elder = getElderId(); |
|       Assert.assertTrue(elder != null); |
| |
|       GrantorInfo gi = getGrantorRequest(); |
|       theLockGrantorId = |
|           new LockGrantorId(this.dm, gi.getId(), gi.getVersionId(), gi.getSerialNumber()); |
| |
|       if (isDebugEnabled_DLS) { |
|         logger.trace(LogMarker.DLS_VERBOSE, "[getLockGrantorId] elder says grantor is {}", |
|             theLockGrantorId); |
|       } |
| |
|       // elder tells us to be the grantor... |
|       if (theLockGrantorId.isLocal(getSerialNumber())) { |
|         boolean needsRecovery = gi.needsRecovery(); |
|         if (!needsRecovery) { |
|           if (isDebugEnabled_DLS) { |
|             logger.trace(LogMarker.DLS_VERBOSE, "[getLockGrantorId] needsRecovery is false"); |
|           } |
|           synchronized (this.lockGrantorIdLock) { |
|             // either no previous grantor or grantor is newer |
|             Assert.assertTrue( |
|                 this.lockGrantorId == null || this.lockGrantorId.isNewerThan(theLockGrantorId) |
|                     || this.lockGrantorId.sameAs(theLockGrantorId), |
|                 this.lockGrantorId + " should be null or newer than or same as " |
|                     + theLockGrantorId); |
|           } |
|         } |
|         if (!createLocalGrantor(elder, needsRecovery, theLockGrantorId)) { |
|           // lockGrantorId is guarded by lockGrantorIdLock, so read it under that lock |
|           synchronized (this.lockGrantorIdLock) { |
|             theLockGrantorId = this.lockGrantorId; |
|           } |
|         } |
|       } |
| |
|       // elder says another member is the grantor |
|       else { |
|         synchronized (this.lockGrantorIdLock) { |
|           if (!setLockGrantorId(theLockGrantorId)) { |
|             theLockGrantorId = this.lockGrantorId; |
|           } |
|         } |
|       } |
|     } finally { |
|       synchronized (this.lockGrantorIdLock) { |
|         boolean getLockGrantorIdFailed = theLockGrantorId == null; |
|         if (statStart > -1) { |
|           getStats().endGrantorWait(statStart, getLockGrantorIdFailed); |
|         } |
|         if (ownLockGrantorFutureResult) { |
|           // this thread is doing the real work and must finish the future |
|           Assert.assertTrue(this.lockGrantorFutureResult == lockGrantorFutureResultRef); |
|           if (getLockGrantorIdFailed) { |
|             // failed so cancel lockGrantorFutureResult |
|             lockGrantorFutureResultRef.cancel(false); |
|           } else { |
|             // succeeded so set lockGrantorFutureResult |
|             lockGrantorFutureResultRef.set(theLockGrantorId); |
|           } |
|           // null out the reference so it is free for next usage |
|           this.lockGrantorFutureResult = null; |
|         } |
|       } |
|     } // finally block for lockGrantorFutureResult |
|   } // while theLockGrantorId == null |
|   return theLockGrantorId; |
| } |
| |
| /** |
| * Creates a local {@link DLockGrantor}. |
| * |
| * if (!createLocalGrantor(xxx)) { theLockGrantorId = this.lockGrantorId; } |
| * |
| * @param elder the elder that told us to be the grantor |
| * @param needsRecovery true if recovery is required |
| * @param myLockGrantorId lockGrantorId to use |
| * @return true if successfully created local grantor; false if aborted |
| */ |
| private boolean createLocalGrantor(InternalDistributedMember elder, boolean needsRecovery, |
| LockGrantorId myLockGrantorId) { |
| DLockGrantor myGrantor = |
| DLockGrantor.createGrantor(this, myLockGrantorId.getLockGrantorVersion()); |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[createLocalGrantor] Calling makeLocalGrantor"); |
| } |
| return makeLocalGrantor(elder, needsRecovery, myLockGrantorId, myGrantor); |
| } |
| |
| /** |
|  * Installs <code>myGrantor</code> as the local grantor identified by |
|  * <code>myLockGrantorId</code>, running grantor recovery first when requested. |
|  * <p> |
|  * Steps: (1) under {@link #lockGrantorIdLock}, abort if the elder changed or a newer grantor is |
|  * deposing us, then record the id and grantor; (2) without holding the lock, run recovery if |
|  * needed; (3) under the lock again, re-check for deposition and make the grantor ready if the |
|  * recorded id still matches. On any unsuccessful exit, or if the service was destroyed, the |
|  * finally block clears the id and destroys the grantor. |
|  * |
|  * @param elder the elder that told us to be the grantor |
|  * @param needsRecovery true if recovery is required |
|  * @param myLockGrantorId lockGrantorId to install |
|  * @param myGrantor the grantor instance to install |
|  * @return true if the grantor was installed and made ready; false if aborted |
|  * @throws LockServiceDestroyedException if {@link #checkDestroyed()} finds the service destroyed |
|  */ |
| private boolean makeLocalGrantor(InternalDistributedMember elder, boolean needsRecovery, |
|     LockGrantorId myLockGrantorId, DLockGrantor myGrantor) { |
|   final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
|   boolean success = false; |
|   try { |
|     synchronized (this.lockGrantorIdLock) { |
|       checkDestroyed(); // exits by throwing if destroyed |
| |
|       InternalDistributedMember currentElder = getElderId(); |
|       if (!currentElder.equals(elder)) { |
|         // abort because elder changed |
|         if (isDebugEnabled_DLS) { |
|           logger.trace(LogMarker.DLS_VERBOSE, |
|               "Failed to create {} because elder changed from {} to {}", myLockGrantorId, elder, |
|               currentElder); |
|         } |
|         return false; // exit |
|       } |
| |
|       if (consumeDeposingLockGrantorId(myLockGrantorId, isDebugEnabled_DLS)) { |
|         return false; // exit |
|       } |
| |
|       if (!setLockGrantorId(myLockGrantorId, myGrantor)) { |
|         if (isDebugEnabled_DLS) { |
|           logger.trace(LogMarker.DLS_VERBOSE, |
|               "[getLockGrantorId] failed to create {} because current grantor is {}", |
|               myLockGrantorId, this.lockGrantorId); |
|         } |
|         return false; // exit |
|       } |
|     } // release sync on this.lockGrantorIdLock |
| |
|     // do NOT sync while doing recovery (because it waits for replies) |
|     if (needsRecovery) { |
|       boolean recovered = |
|           DLockRecoverGrantorProcessor.recoverLockGrantor(this.dm.getDistributionManagerIds(), |
|               this, // this lock service |
|               myGrantor, this.dm, elder); // the elder that told us to be the grantor |
|       if (!recovered) { |
|         checkDestroyed(); |
|         return false; // exit |
|       } |
|     } |
| |
|     // after recovery, resynchronize on lockGrantorIdLock again |
|     // check to see if myLockGrantorId has been deposed |
|     synchronized (this.lockGrantorIdLock) { |
|       checkDestroyed(); // exits by throwing if destroyed |
| |
|       if (consumeDeposingLockGrantorId(myLockGrantorId, isDebugEnabled_DLS)) { |
|         return false; // exit |
|       } |
| |
|       if (checkLockGrantorId(myLockGrantorId)) { |
|         success = myGrantor.makeReady(true); // do not enforce initializing |
|       } |
|     } |
| |
|     return success; // exit |
|   } catch (VirtualMachineError err) { |
|     SystemFailure.initiateFailure(err); |
|     // If this ever returns, rethrow the error. We're poisoned |
|     // now, so don't let this thread continue. |
|     throw err; |
|   } catch (Error e) { |
|     // Whenever you catch Error or Throwable, you must also |
|     // catch VirtualMachineError (see above). However, there is |
|     // _still_ a possibility that you are dealing with a cascading |
|     // error condition, so you also need to check to see if the JVM |
|     // is still usable: |
|     SystemFailure.checkFailure(); |
|     if (isDebugEnabled_DLS) { |
|       logger.trace(LogMarker.DLS_VERBOSE, "[makeLocalGrantor] throwing Error", e); |
|     } |
|     throw e; |
|   } catch (RuntimeException e) { |
|     if (isDebugEnabled_DLS) { |
|       logger.trace(LogMarker.DLS_VERBOSE, "[makeLocalGrantor] throwing RuntimeException", e); |
|     } |
|     throw e; |
|   } finally { |
| |
|     try { |
|       // abort if unsuccessful or if lock service was destroyed |
|       if (!success || isDestroyed()) { |
|         if (isDebugEnabled_DLS) { |
|           logger.trace(LogMarker.DLS_VERBOSE, "[makeLocalGrantor] aborting {} and {}", |
|               myLockGrantorId, myGrantor); |
|         } |
|         nullLockGrantorId(myLockGrantorId); |
|         if (!myGrantor.isDestroyed()) { |
|           myGrantor.destroy(); |
|         } |
|       } |
|     } finally { |
|       // assertion: grantor should now be either ready or destroyed! |
| |
|       if (myGrantor.isInitializing() && !dm.getCancelCriterion().isCancelInProgress()) { |
|         logger.error(LogMarker.DLS_MARKER, |
|             "Grantor is still initializing"); |
|       } |
|       if (!success && !myGrantor.isDestroyed() && !dm.getCancelCriterion().isCancelInProgress()) { |
|         logger.error(LogMarker.DLS_MARKER, |
|             "Grantor creation was aborted but grantor was not destroyed"); |
|       } |
|     } |
|   } |
| } |
| |
| /** |
|  * Checks and clears any pending deposing grantor id recorded in |
|  * <code>deposingLockGrantorId</code>. Caller must hold {@link #lockGrantorIdLock}. |
|  * |
|  * @param myLockGrantorId the grantor id this member is trying to install |
|  * @param isDebugEnabled_DLS whether DLS trace logging is enabled |
|  * @return true if a newer grantor deposed <code>myLockGrantorId</code>; false if nothing was |
|  *         pending or the pending grantor was not newer |
|  */ |
| private boolean consumeDeposingLockGrantorId(LockGrantorId myLockGrantorId, |
|     boolean isDebugEnabled_DLS) { |
|   final LockGrantorId deposer = this.deposingLockGrantorId; |
|   if (deposer == null) { |
|     return false; |
|   } |
|   final boolean deposed = deposer.isNewerThan(myLockGrantorId); |
|   if (isDebugEnabled_DLS) { |
|     if (deposed) { |
|       logger.trace(LogMarker.DLS_VERBOSE, "Failed to create {} because I was deposed by {}", |
|           myLockGrantorId, deposer); |
|     } else { |
|       logger.trace(LogMarker.DLS_VERBOSE, "{} failed to depose {}", deposer, myLockGrantorId); |
|     } |
|   } |
|   // whether or not it deposed us, the pending request has now been handled |
|   this.deposingLockGrantorId = null; |
|   return deposed; |
| } |
| |
| /** |
| * Set {@link #lockGrantorId} to the given new value if the current value is null or is an older |
| * grantor version. Caller must hold {@link #lockGrantorIdLock}. |
| * |
| * @param newLockGrantorId the new value for lockGrantorId |
| */ |
| private boolean setLockGrantorId(LockGrantorId newLockGrantorId) { |
| Assert.assertHoldsLock(this.lockGrantorIdLock, true); |
| if (equalsLockGrantorId(newLockGrantorId)) { |
| return true; |
| } else if (!newLockGrantorId.hasLockGrantorVersion()) { |
| // proceed with temporary placeholder used by become grantor |
| this.lockGrantorId = newLockGrantorId; |
| return true; |
| } else if (newLockGrantorId.isRemote() && this.lockGrantorId != null |
| && this.lockGrantorId.hasLockGrantorVersion()) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[setLockGrantorId] tried to replace {} with {}", |
| this.lockGrantorId, newLockGrantorId); |
| } |
| return false; |
| } else if (newLockGrantorId.isNewerThan(this.lockGrantorId)) { |
| this.lockGrantorId = newLockGrantorId; |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| /** |
| * Set {@link #lockGrantorId} to the <code>localLockGrantorId</code> if current value is null or |
| * is an older grantor version. This also atomically sets {@link #grantor} to ensure that the two |
| * fields are kept in sync. Caller must hold {@link #lockGrantorIdLock}. |
| * |
| * @param localLockGrantorId the new value for lockGrantorId |
| * @param localGrantor the new local intance of DLockGrantor |
| */ |
| private boolean setLockGrantorId(LockGrantorId localLockGrantorId, DLockGrantor localGrantor) { |
| Assert.assertHoldsLock(this.lockGrantorIdLock, true); |
| Assert.assertTrue(localLockGrantorId.isLocal(getSerialNumber())); |
| if (setLockGrantorId(localLockGrantorId)) { |
| this.grantor = localGrantor; |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
|  * Grantor id most recently passed to {@link #deposeOlderLockGrantorId(LockGrantorId)}; consulted |
|  * and cleared while a local grantor is being made. Written under {@link #lockGrantorIdLock}. |
|  */ |
| private LockGrantorId deposingLockGrantorId; |
| |
| /** |
|  * Deposes {@link #lockGrantorId} if <code>newLockGrantorId</code> is newer. |
|  * <p> |
|  * The new id is always recorded in {@link #deposingLockGrantorId}. The current id is cleared |
|  * only if it is non-null, has a grantor version and is older than the new id. |
|  * |
|  * @param newLockGrantorId the new lock grantor |
|  */ |
| void deposeOlderLockGrantorId(LockGrantorId newLockGrantorId) { |
|   final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
|   final LockGrantorId deposedLockGrantorId; |
|   synchronized (this.lockGrantorIdLock) { |
|     // capture the current id first so the trace below reports it instead of null |
|     deposedLockGrantorId = this.lockGrantorId; |
|     if (isDebugEnabled_DLS) { |
|       logger.trace(LogMarker.DLS_VERBOSE, "[deposeOlderLockGrantorId] pre-deposing {} for new {}", |
|           deposedLockGrantorId, newLockGrantorId); |
|     } |
|     this.deposingLockGrantorId = newLockGrantorId; |
|   } |
|   // nullLockGrantorId must be called without holding lockGrantorIdLock |
|   if (deposedLockGrantorId != null && deposedLockGrantorId.hasLockGrantorVersion() |
|       && newLockGrantorId.isNewerThan(deposedLockGrantorId)) { |
|     if (isDebugEnabled_DLS) { |
|       logger.trace(LogMarker.DLS_VERBOSE, |
|           "[deposeOlderLockGrantorId] post-deposing {} for new {}", deposedLockGrantorId, |
|           newLockGrantorId); |
|     } |
|     nullLockGrantorId(deposedLockGrantorId); |
|   } |
| } |
| |
| /** |
| * Sets {@link #lockGrantorId} to null if the current value equals the expected old value. Caller |
| * must hold {@link #lockGrantorIdLock}. |
| * |
| * @param oldLockGrantorId the expected old value |
| * @return true if lockGrantorId was set to null |
| */ |
| private boolean nullLockGrantorId(LockGrantorId oldLockGrantorId) { |
| Assert.assertHoldsLock(this.destroyLock, false); |
| Assert.assertHoldsLock(this.lockGrantorIdLock, false); |
| if (oldLockGrantorId == null) { |
| return false; |
| } |
| DLockGrantor grantorToDestroy = null; |
| try { |
| synchronized (this.lockGrantorIdLock) { |
| if (equalsLockGrantorId(oldLockGrantorId) |
| || (oldLockGrantorId.isLocal(getSerialNumber()) && isMakingLockGrantor())) { |
| // this.lockGrantorId != null && this.lockGrantorId.isLocal())) { |
| if (oldLockGrantorId.isLocal(getSerialNumber()) |
| && isLockGrantorVersion(this.grantor, oldLockGrantorId.getLockGrantorVersion())) { |
| // need to destroy and remove grantor |
| grantorToDestroy = this.grantor; |
| this.grantor = null; |
| } |
| this.lockGrantorId = null; |
| return true; |
| } else { |
| return false; |
| } |
| } |
| } finally { |
| if (grantorToDestroy != null) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[nullLockGrantorId] destroying {}", |
| grantorToDestroy); |
| } |
| grantorToDestroy.destroy(); |
| } |
| } |
| } |
| |
| /** |
|  * Compares the version id of <code>dlockGrantor</code> with <code>grantorVersion</code>. |
|  * |
|  * @param dlockGrantor the grantor instance to compare to grantorVersion; may be null |
|  * @param grantorVersion the grantor version number |
|  * @return true if dlockGrantor is non-null and has that version id |
|  */ |
| private boolean isLockGrantorVersion(DLockGrantor dlockGrantor, long grantorVersion) { |
|   return dlockGrantor != null && dlockGrantor.getVersionId() == grantorVersion; |
| } |
| |
| /** |
|  * Tests <code>someLockGrantor</code> for equality with the current {@link #lockGrantorId}. A |
|  * null argument matches only a null current value. Caller must hold |
|  * {@link #lockGrantorIdLock} (asserted). |
|  * |
|  * @param someLockGrantor the id to compare; may be null |
|  * @return true if someLockGrantor equals the current lockGrantorId |
|  */ |
| private boolean equalsLockGrantorId(LockGrantorId someLockGrantor) { |
|   Assert.assertHoldsLock(this.lockGrantorIdLock, true); |
|   return someLockGrantor == null ? this.lockGrantorId == null |
|       : someLockGrantor.equals(this.lockGrantorId); |
| } |
| |
| /** |
|  * Returns id of the current lock grantor for this service without becoming the grantor. The |
|  * locally recorded id is returned if present; otherwise the grantor is peeked via |
|  * {@link #peekGrantor()} and the answer is returned without being stored in this service. |
|  * <p> |
|  * Must not be called while holding {@link #destroyLock} (asserted). |
|  * |
|  * @return the current LockGrantorId, or null if the peek reports no grantor member |
|  */ |
| public LockGrantorId peekLockGrantorId() { |
|   Assert.assertHoldsLock(this.destroyLock, false); |
|   synchronized (this.lockGrantorIdLock) { |
|     if (this.lockGrantorId != null) { |
|       return this.lockGrantorId; |
|     } |
|   } |
| |
|   final long waitStart = getStats().startGrantorWait(); |
|   LockGrantorId peeked = null; |
|   try { |
|     // request grantor info from elder |
|     final GrantorInfo info = peekGrantor(); |
|     final InternalDistributedMember grantorMember = info.getId(); |
|     if (grantorMember != null) { |
|       peeked = new LockGrantorId(this.dm, grantorMember, info.getVersionId(), |
|           info.getSerialNumber()); |
|     } |
|     return peeked; |
|   } finally { |
|     // a null result is reported to the stats as a failed wait |
|     getStats().endGrantorWait(waitStart, peeked == null); |
|   } |
| } |
| |
| /** |
|  * Adds one to {@link #activeLocks}. Runs synchronized on {@link #destroyLock} and calls |
|  * {@link #checkDestroyed()} first, so the count is left untouched if that call throws. |
|  */ |
| private void incActiveLocks() { |
|   synchronized (this.destroyLock) { |
|     checkDestroyed(); |
|     this.activeLocks += 1; |
|   } |
| } |
| |
| /** |
|  * Subtracts one from {@link #activeLocks} while synchronized on {@link #destroyLock}. Unlike |
|  * {@link #incActiveLocks()}, no destroyed check is made. |
|  */ |
| private void decActiveLocks() { |
|   synchronized (this.destroyLock) { |
|     this.activeLocks -= 1; |
|   } |
| } |
| |
| /** |
| * Waits for another thread to complete <code>lockGrantorFutureResultRef</code> and returns the |
| * LockGrantorId it was set to. {@link #checkDestroyed()} is called before each wait attempt. If |
| * the wait is interrupted, the cancel criterion is checked and the wait is retried unless the |
| * FutureResult was cancelled; the thread's interrupt status is restored afterwards. |
| * |
| * @param lockGrantorFutureResultRef FutureResult to wait for |
| * @param timeToWait how long to wait, in units of <code>timeUnit</code>; 0 = forever |
| * @param timeUnit the unit of measure for timeToWait |
| * @return the LockGrantorId, or null if the wait timed out or the FutureResult was cancelled |
| */ |
| private LockGrantorId waitForLockGrantorFutureResult(FutureResult lockGrantorFutureResultRef, |
| long timeToWait, final TimeUnit timeUnit) { |
| LockGrantorId lockGrantorIdRef = null; |
| while (lockGrantorIdRef == null) { |
| boolean interrupted = Thread.interrupted(); |
| try { |
| checkDestroyed(); |
| if (timeToWait == 0) { |
| lockGrantorIdRef = (LockGrantorId) lockGrantorFutureResultRef.get(); |
| } else { |
| lockGrantorIdRef = (LockGrantorId) lockGrantorFutureResultRef.get(timeToWait, timeUnit); |
| } |
| } catch (TimeoutException e) { |
| break; |
| } catch (InterruptedException e) { |
| interrupted = true; |
| this.dm.getCancelCriterion().checkCancelInProgress(e); |
| if (lockGrantorFutureResultRef.isCancelled()) { |
| // a cancelled Future might surface as an InterruptedException |
| checkDestroyed(); |
| break; // return null |
| } |
| continue; |
| } catch (CancellationException e) { // Future was cancelled |
| checkDestroyed(); |
| break; // return null |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| return lockGrantorIdRef; |
| } |
| |
| /** |
| * nulls out grantor to force call to elder |
| * |
| * @param timeToWait how long to wait for a new grantor. -1 don't wait, 0 no time limit |
| * @param timeUnit the unit of measure of timeToWait |
| */ |
| private void notLockGrantorId(LockGrantorId notLockGrantorId, long timeToWait, |
| final TimeUnit timeUnit) { |
| if (notLockGrantorId.isLocal(getSerialNumber())) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "notLockGrantorId {} returning early because notGrantor {} was equal to the local dm {}", |
| this.serviceName, notLockGrantorId, this.dm.getId()); |
| } |
| // Let the local destroy or processing of transfer do the clear |
| return; |
| } |
| |
| boolean ownLockGrantorFutureResult = false; |
| FutureResult lockGrantorFutureResultRef = null; |
| |
| long statStart = -1; |
| LockGrantorId currentLockGrantorId = null; |
| |
| try { |
| Assert.assertHoldsLock(this.destroyLock, false); |
| synchronized (this.lockGrantorIdLock) { |
| currentLockGrantorId = this.lockGrantorId; |
| if (this.lockGrantorFutureResult != null) { |
| // some other thread is talking to elder |
| lockGrantorFutureResultRef = this.lockGrantorFutureResult; |
| } else if (!notLockGrantorId.sameAs(currentLockGrantorId)) { |
| return; |
| } else { |
| // this thread needs to talk to elder |
| ownLockGrantorFutureResult = true; |
| lockGrantorFutureResultRef = new FutureResult(this.dm.getCancelCriterion()); |
| this.lockGrantorFutureResult = lockGrantorFutureResultRef; |
| } |
| } |
| |
| statStart = getStats().startGrantorWait(); |
| if (!ownLockGrantorFutureResult) { |
| if (timeToWait >= 0) { |
| waitForLockGrantorFutureResult(lockGrantorFutureResultRef, timeToWait, timeUnit); |
| } |
| return; |
| } |
| |
| InternalDistributedMember elder = getElderId(); |
| Assert.assertTrue(elder != null); |
| |
| LockGrantorId elderLockGrantorId = null; |
| GrantorInfo gi = peekGrantor(); |
| if (gi.getId() != null) { |
| elderLockGrantorId = |
| new LockGrantorId(this.dm, gi.getId(), gi.getVersionId(), gi.getSerialNumber()); |
| } |
| |
| if (notLockGrantorId.sameAs(elderLockGrantorId)) { |
| // elder says that notLockGrantorId is still the grantor... |
| sleep(NOT_GRANTOR_SLEEP); |
| return; |
| } else { |
| // elder says another member is the grantor |
| nullLockGrantorId(notLockGrantorId); |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "notLockGrantorId cleared lockGrantorId for service {}", this.serviceName); |
| } |
| } |
| } finally { |
| synchronized (this.lockGrantorIdLock) { |
| if (statStart > -1) { |
| getStats().endGrantorWait(statStart, false); |
| } |
| if (ownLockGrantorFutureResult) { |
| // this thread is doing the real work and must finish the future |
| Assert.assertTrue(this.lockGrantorFutureResult == lockGrantorFutureResultRef); |
| // cancel lockGrantorFutureResult |
| lockGrantorFutureResultRef.cancel(false); |
| // null out the reference so it is free for next usage |
| this.lockGrantorFutureResult = null; |
| } |
| } |
| } // finally block for lockGrantorFutureResult |
| } |
| |
| /** |
| * All calls to GrantorRequestProcessor.clearGrantor must come through this synchronization point. |
| * <p> |
| * This fixes a deadlock between this.becomeGrantorMonitor and DistributionManager.elderLock |
| * <p> |
| * All calls to the elder may result in elder recovery which may call back into dlock and acquire |
| * synchronization on this.becomeGrantorMonitor. |
| */ |
| void clearGrantor(long grantorVersion, boolean withLocks) { |
| GrantorRequestProcessor.clearGrantor(grantorVersion, this, getSerialNumber(), this.ds, |
| withLocks); |
| } |
| |
| /** |
| * All calls to GrantorRequestProcessor.getGrantor must come through this synchronization point. |
| * <p> |
| * This fixes a deadlock between this.becomeGrantorMonitor and DistributionManager.elderLock |
| * <p> |
| * All calls to the elder may result in elder recovery which may call back into dlock and acquire |
| * synchronization on this.becomeGrantorMonitor. |
| */ |
| private GrantorInfo getGrantorRequest() { |
| return GrantorRequestProcessor.getGrantor(this, getSerialNumber(), this.ds); |
| } |
| |
| /** |
| * All calls to GrantorRequestProcessor.peekGrantor must come through this synchronization point. |
| * <p> |
| * This fixes a deadlock between this.becomeGrantorMonitor and DistributionManager.elderLock |
| * <p> |
| * All calls to the elder may result in elder recovery which may call back into dlock and acquire |
| * synchronization on this.becomeGrantorMonitor. |
| */ |
| private GrantorInfo peekGrantor() { |
| return GrantorRequestProcessor.peekGrantor(this, this.ds); |
| } |
| |
| /** |
| * All calls to GrantorRequestProcessor.becomeGrantor must come through this synchronization |
| * point. |
| * <p> |
| * This fixes a deadlock between this.becomeGrantorMonitor and DistributionManager.elderLock |
| * <p> |
| * All calls to the elder may result in elder recovery which may call back into dlock and acquire |
| * synchronization on this.becomeGrantorMonitor. |
| */ |
| private GrantorInfo becomeGrantor(InternalDistributedMember predecessor) { |
| return GrantorRequestProcessor.becomeGrantor(this, getSerialNumber(), predecessor, this.ds); |
| } |
| |
| // ------------------------------------------------------------------------- |
| // New external API methods |
| // ------------------------------------------------------------------------- |
| |
| @Override |
| public void becomeLockGrantor() { |
| becomeLockGrantor((InternalDistributedMember) null); |
| } |
| |
| public DLockGrantor getGrantor() { |
| Assert.assertHoldsLock(this.destroyLock, false); |
| synchronized (this.lockGrantorIdLock) { |
| return this.grantor; |
| } |
| } |
| |
| public DLockGrantor getGrantorWithNoSync() { |
| return this.grantor; |
| } |
| |
| /** |
| * @param predecessor non-null if a predecessor asked us to take over for it |
| */ |
| private void becomeLockGrantor(InternalDistributedMember predecessor) { |
| Assert.assertTrue(predecessor == null); |
| boolean ownLockGrantorFutureResult = false; |
| FutureResult lockGrantorFutureResultRef = null; |
| |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| LockGrantorId myLockGrantorId = null; |
| try { // finally handles lockGrantorFutureResult |
| |
| // loop while other threads control the lockGrantorFutureResult |
| // terminate loop if other thread has already made us lock grantor |
| // terminate loop if this thread gets control of lockGrantorFutureResult |
| while (!ownLockGrantorFutureResult) { |
| Assert.assertHoldsLock(this.destroyLock, false); |
| synchronized (this.lockGrantorIdLock) { |
| if (isCurrentlyOrIsMakingLockGrantor()) { |
| return; |
| } else if (this.lockGrantorFutureResult != null) { |
| // need to wait for other thread controlling lockGrantorFutureResult |
| lockGrantorFutureResultRef = this.lockGrantorFutureResult; |
| } else { |
| // this thread is in control and will procede to become grantor |
| // create new lockGrantorFutureResult for other threads to block on |
| ownLockGrantorFutureResult = true; |
| lockGrantorFutureResultRef = new FutureResult(this.dm.getCancelCriterion()); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[becomeLockGrantor] creating lockGrantorFutureResult"); |
| } |
| this.lockGrantorFutureResult = lockGrantorFutureResultRef; |
| } |
| } |
| if (!ownLockGrantorFutureResult) { |
| waitForLockGrantorFutureResult(lockGrantorFutureResultRef, 0, TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| // this thread is now in charge of the lockGrantorFutureResult future |
| getStats().incBecomeGrantorRequests(); |
| |
| // create the new grantor instance in non-ready state... |
| long tempGrantorVersion = -1; |
| LockGrantorId tempLockGrantorId = |
| new LockGrantorId(this.dm, this.dm.getId(), tempGrantorVersion, getSerialNumber()); |
| |
| DLockGrantor myGrantor = DLockGrantor.createGrantor(this, tempGrantorVersion); |
| |
| try { // finally handles myGrantor |
| |
| synchronized (this.lockGrantorIdLock) { |
| Assert.assertTrue(setLockGrantorId(tempLockGrantorId, myGrantor)); |
| } |
| |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "become set lockGrantorId to {} for service {}", |
| this.lockGrantorId, this.serviceName); |
| } |
| |
| InternalDistributedMember elder = getElderId(); |
| Assert.assertTrue(elder != null); |
| |
| // NOTE: elder currently returns GrantorInfo for the previous grantor |
| // CONSIDER: add elderCommunicatedWith to GrantorInfo |
| GrantorInfo gi = becomeGrantor(predecessor); |
| boolean needsRecovery = gi.needsRecovery(); |
| long myGrantorVersion = gi.getVersionId() + 1; |
| myGrantor.setVersionId(myGrantorVersion); |
| |
| myLockGrantorId = |
| new LockGrantorId(this.dm, this.dm.getId(), myGrantorVersion, getSerialNumber()); |
| |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[becomeLockGrantor] Calling makeLocalGrantor"); |
| } |
| if (!makeLocalGrantor(elder, needsRecovery, myLockGrantorId, myGrantor)) { |
| return; |
| } |
| |
| } finally { |
| Assert.assertTrue( |
| myGrantor == null || !myGrantor.isInitializing() |
| || this.dm.getCancelCriterion().isCancelInProgress() || isDestroyed(), |
| "BecomeLockGrantor failed and left grantor non-ready"); |
| } |
| } finally { |
| synchronized (this.lockGrantorIdLock) { |
| if (ownLockGrantorFutureResult) { |
| // this thread is doing the real work and must finish the future |
| Assert.assertTrue(this.lockGrantorFutureResult == lockGrantorFutureResultRef); |
| boolean getLockGrantorIdFailed = myLockGrantorId == null; |
| if (getLockGrantorIdFailed) { |
| // failed so cancel lockGrantorFutureResult |
| lockGrantorFutureResultRef.cancel(true); // interrupt waiting threads |
| } else { |
| this.dm.getCancelCriterion().checkCancelInProgress(null); // don't succeed if shutting |
| // down |
| // succeeded so set lockGrantorFutureResult |
| lockGrantorFutureResultRef.set(myLockGrantorId); |
| } |
| // null out the reference so it is free for next usage |
| this.lockGrantorFutureResult = null; |
| } |
| } |
| } |
| } |
| |
| @Override |
| public boolean isLockGrantor() { |
| if (isDestroyed()) { |
| return false; |
| } else { |
| return isCurrentlyLockGrantor(); |
| } |
| } |
| |
| boolean isMakingLockGrantor() { |
| Assert.assertHoldsLock(this.destroyLock, false); |
| synchronized (this.lockGrantorIdLock) { |
| return this.lockGrantorId != null && this.lockGrantorId.isLocal(getSerialNumber()) |
| && this.grantor != null && this.grantor.isInitializing(); |
| } |
| } |
| |
| boolean isCurrentlyOrIsMakingLockGrantor() { |
| Assert.assertHoldsLock(this.destroyLock, false); |
| synchronized (this.lockGrantorIdLock) { |
| return this.lockGrantorId != null && this.lockGrantorId.isLocal(getSerialNumber()); |
| } |
| } |
| |
| boolean isCurrentlyLockGrantor() { |
| Assert.assertHoldsLock(this.destroyLock, false); |
| synchronized (this.lockGrantorIdLock) { |
| return this.lockGrantorId != null && this.lockGrantorId.isLocal(getSerialNumber()) |
| && this.grantor != null && this.grantor.isReady(); |
| } |
| } |
| |
| // ------------------------------------------------------------------------- |
| // External API methods |
| // ------------------------------------------------------------------------- |
| |
| @Override |
| public void freeResources(Object name) { |
| checkDestroyed(); |
| if (name == null) { |
| removeAllUnusedTokens(); |
| } else { |
| removeTokenIfUnused(name); |
| } |
| } |
| |
| /** |
| * Attempt to destroy and remove lock token. Synchronizes on tokens map and the lock token. |
| * |
| * @param name the name of the lock token |
| * @return true if token has been destroyed and removed |
| */ |
| private boolean removeTokenIfUnused(Object name) { |
| synchronized (this.tokens) { |
| if (this.destroyed) { |
| getStats().incFreeResourcesFailed(); |
| return false; |
| } |
| DLockToken token = this.tokens.get(name); |
| if (token != null) { |
| synchronized (token) { |
| if (!token.isBeingUsed()) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "Freeing {} in {}", token, this); |
| } |
| removeTokenFromMap(name); |
| token.destroy(); |
| getStats().incTokens(-1); |
| getStats().incFreeResourcesCompleted(); |
| return true; |
| } |
| } |
| } |
| } |
| getStats().incFreeResourcesFailed(); |
| return false; |
| } |
| |
| protected Object removeTokenFromMap(Object name) { |
| return this.tokens.remove(name); |
| } |
| |
| /** |
| * Attempt to destroy and remove all unused lock tokens. Synchronizes on tokens map and each lock |
| * token. |
| */ |
| private void removeAllUnusedTokens() { |
| synchronized (this.tokens) { |
| if (this.destroyed) { |
| getStats().incFreeResourcesFailed(); |
| return; |
| } |
| Set<DLockToken> unusedTokens = Collections.emptySet(); |
| for (DLockToken token : this.tokens.values()) { |
| synchronized (token) { |
| if (!token.isBeingUsed()) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "Freeing {} in {}", token, this); |
| } |
| if (unusedTokens == Collections.EMPTY_SET) { |
| unusedTokens = new HashSet<>(); |
| } |
| unusedTokens.add(token); |
| } else { |
| getStats().incFreeResourcesFailed(); |
| } |
| } |
| } |
| for (DLockToken token : unusedTokens) { |
| synchronized (token) { |
| int tokensSizeBefore = this.tokens.size(); |
| Object obj = removeTokenFromMap(token.getName()); |
| Assert.assertTrue(obj != null); |
| int tokensSizeAfter = this.tokens.size(); |
| Assert.assertTrue(tokensSizeBefore - tokensSizeAfter == 1); |
| token.destroy(); |
| getStats().incTokens(-1); |
| getStats().incFreeResourcesCompleted(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Destroys and removes all lock tokens. Caller must synchronize on destroyLock. Synchronizes on |
| * tokens map and each token. |
| */ |
| private void removeAllTokens() { |
| synchronized (this.tokens) { |
| Assert.assertTrue(this.destroyed); |
| for (DLockToken token : this.tokens.values()) { |
| synchronized (token) { |
| token.destroy(); |
| } |
| } |
| getStats().incTokens(-this.tokens.size()); |
| this.tokens.clear(); |
| } |
| } |
| |
| @Override |
| public boolean isHeldByCurrentThread(Object name) { |
| checkDestroyed(); |
| synchronized (this.tokens) { |
| DLockToken token = basicGetToken(name); |
| if (token == null) |
| return false; |
| synchronized (token) { |
| token.checkForExpiration(); |
| return token.isLeaseHeldByCurrentThread(); |
| } |
| } |
| } |
| |
| public boolean isHeldByThreadId(Object name, int threadId) { |
| checkDestroyed(); |
| synchronized (this.tokens) { |
| DLockToken token = basicGetToken(name); |
| if (token == null) |
| return false; |
| synchronized (token) { |
| token.checkForExpiration(); |
| if (token.getLesseeThread() == null) { |
| return false; |
| } |
| return token.getLesseeThread().getThreadId() == threadId; |
| } |
| } |
| } |
| |
| @Override |
| public boolean isLockingSuspendedByCurrentThread() { |
| checkDestroyed(); |
| return isHeldByCurrentThread(SUSPEND_LOCKING_TOKEN); |
| } |
| |
| @Override |
| public boolean lock(Object name, long waitTimeMillis, long leaseTimeMillis) { |
| boolean tryLock = false; |
| return lock(name, waitTimeMillis, leaseTimeMillis, tryLock); |
| } |
| |
| public boolean lock(Object name, long waitTimeMillis, long leaseTimeMillis, boolean tryLock) { |
| return lock(name, waitTimeMillis, leaseTimeMillis, tryLock, false); |
| } |
| |
| public boolean lock(Object name, long waitTimeMillis, long leaseTimeMillis, boolean tryLock, |
| boolean disallowReentrant) { |
| return lock(name, waitTimeMillis, leaseTimeMillis, tryLock, false, false); |
| } |
| |
| |
| public boolean lock(Object name, long waitTimeMillis, long leaseTimeMillis, boolean tryLock, |
| boolean disallowReentrant, boolean diableAlerts) { |
| checkDestroyed(); |
| try { |
| boolean interruptible = false; |
| return lockInterruptibly(name, waitTimeMillis, leaseTimeMillis, tryLock, interruptible, |
| disallowReentrant, diableAlerts); |
| } catch (InterruptedException ex) { // LOST INTERRUPT |
| Thread.currentThread().interrupt(); |
| // fail assertion |
| logger.error(LogMarker.DLS_MARKER, "lock() was interrupted", ex); |
| Assert.assertTrue(false, "lock() was interrupted: " + ex.getMessage()); |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean lockInterruptibly(Object name, long waitTimeMillis, long leaseTimeMillis) |
| throws InterruptedException { |
| checkDestroyed(); |
| boolean tryLock = false; |
| boolean interruptible = true; |
| return lockInterruptibly(name, waitTimeMillis, leaseTimeMillis, tryLock, interruptible, false); |
| } |
| |
| /** Causes the current thread to sleep for millis and may or may not be interruptible */ |
| private void sleep(long millis, boolean interruptible) throws InterruptedException { |
| if (interruptible) { |
| if (Thread.interrupted()) |
| throw new InterruptedException(); |
| Thread.sleep(millis); |
| return; |
| } else { |
| sleep(millis); |
| } |
| } |
| |
| /** Causes the current thread to sleep for millis uninterruptibly */ |
| private void sleep(long millis) { |
| // Non-interruptible case |
| StopWatch timer = new StopWatch(true); |
| while (true) { |
| boolean interrupted = Thread.interrupted(); |
| try { |
| long timeLeft = millis - timer.elapsedTimeMillis(); |
| if (timeLeft <= 0) { |
| break; |
| } |
| Thread.sleep(timeLeft); |
| break; |
| } catch (InterruptedException e) { |
| interrupted = true; |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| |
| protected DLockRequestProcessor createRequestProcessor(LockGrantorId grantorId, Object name, |
| int threadId, long startTime, long requestLeaseTime, long requestWaitTime, boolean reentrant, |
| boolean tryLock) { |
| return createRequestProcessor(grantorId, name, threadId, startTime, requestLeaseTime, |
| requestWaitTime, reentrant, tryLock, false); |
| } |
| |
| protected DLockRequestProcessor createRequestProcessor(LockGrantorId grantorId, Object name, |
| int threadId, long startTime, long requestLeaseTime, long requestWaitTime, boolean reentrant, |
| boolean tryLock, boolean disableAlerts) { |
| return new DLockRequestProcessor(grantorId, this, name, threadId, startTime, requestLeaseTime, |
| requestWaitTime, reentrant, tryLock, disableAlerts, this.dm); |
| } |
| |
| protected boolean callReleaseProcessor(InternalDistributedMember grantor, Object name, |
| boolean lockBatch, int lockId) { |
| return DLockService.callReleaseProcessor(this.dm, this.serviceName, grantor, name, lockBatch, |
| lockId); |
| } |
| |
| protected static boolean callReleaseProcessor(DistributionManager dm, String serviceName, |
| InternalDistributedMember grantor, Object name, boolean lockBatch, int lockId) { |
| DLockReleaseProcessor processor = new DLockReleaseProcessor(dm, grantor, serviceName, name); |
| return processor.release(grantor, serviceName, lockBatch, lockId); |
| } |
| |
| public boolean lockInterruptibly(final Object name, final long waitTimeMillis, |
| final long leaseTimeMillis, final boolean tryLock, final boolean interruptible, |
| final boolean disallowReentrant) throws InterruptedException { |
| return lockInterruptibly(name, waitTimeMillis, leaseTimeMillis, tryLock, interruptible, |
| disallowReentrant, false); |
| } |
| |
| |
| /** |
| * @param name the name of the lock to acquire in this service. This object must conform to the |
| * general contract of <code>equals(Object)</code> and <code>hashCode()</code> as described |
| * in {@link java.lang.Object#hashCode()}. |
| * |
| * @param waitTimeMillis the number of milliseconds to try to acquire the lock before giving up |
| * and returning false. A value of -1 causes this method to block until the lock is |
| * acquired. |
| * |
| * @param leaseTimeMillis the number of milliseconds to hold the lock after granting it, before |
| * automatically releasing it if it hasn't already been released by invoking |
| * {@link #unlock(Object)}. If <code>leaseTimeMillis</code> is -1, hold the lock until |
| * explicitly unlocked. |
| * |
| * @param tryLock true if the lock should be acquired or fail if currently held. waitTimeMillis |
| * will be ignored if the lock is currently held by another client. |
| * |
| * @param interruptible true if this lock request is interruptible |
| * |
| * @param disableAlerts true to disable logging alerts if the dlock is taking a long time to |
| * acquired. |
| * |
| * @return true if the lock was acquired, false if the timeout <code>waitTimeMillis</code> passed |
| * without acquiring the lock. |
| * |
| * @throws InterruptedException if the thread is interrupted before or during this method. |
| * |
| * @throws UnsupportedOperationException if attempt to lock batch involves non-tryLocks |
| */ |
| public boolean lockInterruptibly(final Object name, final long waitTimeMillis, |
| final long leaseTimeMillis, final boolean tryLock, final boolean interruptible, |
| final boolean disallowReentrant, final boolean disableAlerts) throws InterruptedException { |
| checkDestroyed(); |
| |
| boolean interrupted = Thread.interrupted(); |
| if (interrupted && interruptible) { |
| throw new InterruptedException(); |
| } |
| |
| try { |
| long statStart = getStats().startLockWait(); |
| long startTime = getLockTimeStamp(dm); |
| |
| long requestWaitTime = waitTimeMillis; |
| long requestLeaseTime = leaseTimeMillis; |
| |
| // -1 means "lease forever". Long.MAX_VALUE is pretty close. |
| if (requestLeaseTime == -1) |
| requestLeaseTime = Long.MAX_VALUE; |
| // -1 means "wait forever". Long.MAX_VALUE is pretty close. |
| if (requestWaitTime == -1) |
| requestWaitTime = Long.MAX_VALUE; |
| |
| long waitLimit = startTime + requestWaitTime; |
| if (waitLimit < 0) |
| waitLimit = Long.MAX_VALUE; |
| |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "{}, name: {} - entering lock()", this, name); |
| } |
| |
| DLockToken token = getOrCreateToken(name); |
| boolean gotLock = false; |
| blockedOn.set(name); |
| try { // try-block for end stats, token cleanup, and interrupt check |
| |
| ThreadRequestState requestState = (ThreadRequestState) this.threadRequestState.get(); |
| if (requestState == null) { |
| requestState = new ThreadRequestState(incThreadSequence(), interruptible); |
| this.threadRequestState.set(requestState); |
| } else { |
| requestState.interruptible = interruptible; |
| } |
| final int threadId = requestState.threadId; |
| |
| // if reentry and no change to expiration then grantor is not bothered |
| |
| long leaseExpireTime = 0; |
| boolean keepTrying = true; |
| int lockId = -1; |
| incActiveLocks(); |
| |
| while (keepTrying) { |
| checkDestroyed(); |
| interrupted = Thread.interrupted() || interrupted; // clear |
| if (interrupted && interruptible) { |
| throw new InterruptedException(); |
| } |
| |
| // Check for recursive lock |
| boolean reentrant = false; |
| int recursionBefore = -1; |
| |
| synchronized (token) { |
| token.checkForExpiration(); |
| if (token.isLeaseHeldByCurrentThread()) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "{} , name: {} - lock() is reentrant: {}", this, |
| name, token); |
| } |
| reentrant = true; |
| if (reentrant && disallowReentrant) { |
| throw new IllegalStateException( |
| String.format("%s attempted to reenter non-reentrant lock %s", |
| new Object[] {Thread.currentThread(), token})); |
| } |
| recursionBefore = token.getRecursion(); |
| lockId = token.getLeaseId(); // keep lockId |
| if (lockId < 0) { |
| // loop back around due to expiration |
| continue; |
| } |
| } // isLeaseHeldByCurrentThread |
| } // token sync |
| |
| LockGrantorId theLockGrantorId = getLockGrantorId(); |
| |
| if (reentrant) { |
| Assert.assertTrue(lockId > -1, "Reentrant lock must have lockId > -1"); |
| // lockId = token.getLockId(); // keep lockId |
| } else { |
| // this thread is not current owner... |
| lockId = -1; // reset lockId back to -1 |
| } |
| |
| DLockRequestProcessor processor = createRequestProcessor(theLockGrantorId, name, threadId, |
| startTime, requestLeaseTime, requestWaitTime, reentrant, tryLock, disableAlerts); |
| if (reentrant) { |
| // check for race condition... reentrant expired already... |
| // related to bug 32765, but client-side... see bug 33402 |
| synchronized (token) { |
| if (!token.isLeaseHeldByCurrentThread()) { |
| reentrant = false; |
| recursionBefore = -1; |
| token.checkForExpiration(); |
| } |
| } |
| } else { |
| // set lockId since this is the first granting (non-reentrant) |
| lockId = processor.getProcessorId(); |
| } |
| |
| gotLock = processor.requestLock(interruptible, lockId); // can throw |
| // InterruptedException |
| |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "Grantor {} replied {}", theLockGrantorId, |
| processor.getResponseCodeString()); |
| } |
| |
| if (gotLock) { |
| leaseExpireTime = processor.getLeaseExpireTime(); |
| int recursion = recursionBefore + 1; |
| |
| if (!grantLocalDLockAfterObtainingRemoteLock(name, token, threadId, leaseExpireTime, |
| lockId, theLockGrantorId, processor, recursion)) { |
| continue; |
| } |
| |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "{}, name: {} - granted lock: {}", this, name, |
| token); |
| } |
| keepTrying = false; |
| } else if (processor.repliedDestroyed()) { |
| checkDestroyed(); // throws LockServiceDestroyedException |
| Assert.assertTrue(isDestroyed(), |
| "Grantor reports service " + this + " is destroyed: " + name); |
| } else if (processor.repliedNotGrantor() || processor.hadNoResponse()) { |
| long waitForGrantorTime = waitLimit - token.getCurrentTime(); |
| if (waitForGrantorTime <= 0) { |
| waitForGrantorTime = 100; |
| } |
| notLockGrantorId(theLockGrantorId, waitForGrantorTime, TimeUnit.MILLISECONDS); |
| // keepTrying is still true... loop back around |
| } else if (processor.repliedNotHolder()) { |
| // fix part of bug 32765 - reentrant/expiration problem |
| // probably expired... try to get non-reentrant lock |
| reentrant = false; |
| recursionBefore = -1; |
| synchronized (token) { |
| token.checkForExpiration(); |
| if (token.isLeaseHeldByCurrentThread()) { |
| // THIS SHOULDN'T HAPPEN -- some sort of weird consistency |
| // problem. Do what the grantor says and release the lock... |
| logger.warn(LogMarker.DLS_MARKER, "Grantor reports reentrant lock not held: {}", |
| token); |
| |
| // Attempt at fault tolerance: We thought we owned it, but we |
| // don't; let's release it. Removes hot loop in bug 37276, |
| // but does not address underlying consistency failure. |
| RemoteThread rThread = new RemoteThread(getDistributionManager().getId(), threadId); |
| token.releaseLock(lockId, rThread, false); |
| } |
| } // token sync |
| } // grantor replied NOT_HOLDER for reentrant lock |
| |
| else { |
| // either dlock service is suspended or tryLock failed |
| // fixed the math here... bug 32765 |
| if (waitLimit > token.getCurrentTime() + 20) { |
| sleep(20, interruptible); |
| } |
| keepTrying = waitLimit > token.getCurrentTime(); |
| } |
| |
| } // while (keepTrying) |
| // try-block for end stats, token cleanup, and interrupt check |
| } finally { |
| getStats().endLockWait(statStart, gotLock); |
| |
| // cleanup token if failed to get lock |
| if (!gotLock) { |
| synchronized (token) { |
| token.decUsage(); |
| } |
| freeResources(token.getName()); |
| } |
| |
| // reset the interrupt state |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| |
| // throw InterruptedException only if failed to get lock and interrupted |
| if (!gotLock && interruptible && Thread.interrupted()) { |
| throw new InterruptedException(); |
| } |
| blockedOn.set(null); |
| } |
| |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "{}, name: {} - exiting lock() returning {}", this, |
| name, gotLock); |
| } |
| return gotLock; |
| } finally { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "{}, name: {} - exiting lock() without returning value", |
| this, name); |
| } |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| private boolean grantLocalDLockAfterObtainingRemoteLock(Object name, DLockToken token, |
| int threadId, long leaseExpireTime, int lockId, LockGrantorId theLockGrantorId, |
| DLockRequestProcessor processor, int recursion) { |
| boolean needToReleaseOrphanedGrant = false; |
| |
| Assert.assertHoldsLock(this.destroyLock, false); |
| synchronized (this.lockGrantorIdLock) { |
| if (!checkLockGrantorId(theLockGrantorId)) { |
| // race: grantor changed |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "Cannot honor grant from {} because {} is now a grantor.", theLockGrantorId, |
| this.lockGrantorId); |
| } |
| } else if (isDestroyed()) { |
| // race: dls was destroyed |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "Cannot honor grant from {} because this lock service has been destroyed.", |
| theLockGrantorId); |
| } |
| needToReleaseOrphanedGrant = true; |
| } else { |
| synchronized (this.tokens) { |
| checkDestroyed(); |
| Assert.assertTrue(token == basicGetToken(name)); |
| RemoteThread rThread = new RemoteThread(getDistributionManager().getId(), threadId); |
| token.grantLock(leaseExpireTime, lockId, recursion, rThread); |
| return true; |
| } // tokens sync |
| } |
| } |
| |
| if (needToReleaseOrphanedGrant) { |
| processor.getResponse().releaseOrphanedGrant(this.dm); |
| } |
| return false; |
| } |
| |
| /** |
| * Allow locking to resume. |
| */ |
| @Override |
| public void resumeLocking() { |
| checkDestroyed(); |
| try { |
| // need to resumeLocking before unlocking to avoid deadlock with |
| // other thread attempting to suspendLocking |
| unlock(SUSPEND_LOCKING_TOKEN); |
| } catch (IllegalStateException e) { |
| checkDestroyed(); |
| throw e; |
| } |
| } |
| |
| /** |
| * Suspends granting of locks for this instance of DLockService. If distribute is true, sends |
| * suspendLocking to all other members that have created this service. Blocks until all |
| * outstanding locks have been released (excluding those held by the initial calling thread). |
| * |
| * @param waitTimeMillis -1 means "wait forever", >=0 = milliseconds to wait |
| * |
| * @return true if locking is suspended and all locks have been released. Otherwise, resumeLocking |
| * is invoked and false is returned. |
| */ |
| @Override |
| public boolean suspendLocking(final long waitTimeMillis) { |
| |
| long startTime = System.currentTimeMillis(); |
| long requestWaitTime = waitTimeMillis; |
| boolean interrupted = false; |
| |
| try { |
| do { |
| checkDestroyed(); |
| try { |
| return suspendLockingInterruptibly(requestWaitTime, false); |
| } catch (InterruptedException ex) { |
| interrupted = true; |
| long millisPassed = System.currentTimeMillis() - startTime; |
| if (requestWaitTime >= 0) { |
| requestWaitTime = Math.max(0, requestWaitTime - millisPassed); |
| } |
| } |
| } while (requestWaitTime != 0); |
| |
| } finally { |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| |
| return false; |
| } |
| |
| @Override |
| public boolean suspendLockingInterruptibly(long waitTimeMillis) throws InterruptedException { |
| return suspendLockingInterruptibly(waitTimeMillis, true); |
| } |
| |
| public boolean suspendLockingInterruptibly(long waitTimeMillis, boolean interruptible) |
| throws InterruptedException { |
| checkDestroyed(); |
| |
| boolean wasInterrupted = false; |
| if (Thread.interrupted()) { |
| if (interruptible) { |
| throw new InterruptedException(); |
| } else { |
| wasInterrupted = true; |
| } |
| } |
| |
| try { |
| |
| if (isLockingSuspendedByCurrentThread()) { |
| throw new IllegalStateException( |
| "Current thread has already locked entire service"); |
| } |
| |
| // have to use tryLock to avoid deadlock with other members that are |
| // simultaneously attempting to suspend locking |
| boolean tryLock = false; // go with false to queue up suspend lock requests |
| // when tryLock is false, we get deadlock: |
| // thread 1 is this thread |
| // thread 2 is processing a SuspendMessage... goes thru |
| // suspendLocking with distribute=false and gets stuck in |
| // waitForGrantorCallsInProgress |
| |
| SuspendLockingToken suspendToken = SUSPEND_LOCKING_TOKEN; |
| |
| boolean gotToken = false; |
| boolean keepTrying = true; |
| |
| long startTime = System.currentTimeMillis(); |
| long waitLimit = startTime + waitTimeMillis; |
| if (waitLimit < 0) |
| waitLimit = Long.MAX_VALUE; |
| |
| while (!gotToken && keepTrying) { |
| gotToken = |
| lockInterruptibly(suspendToken, waitTimeMillis, -1, tryLock, interruptible, false); |
| keepTrying = !gotToken && waitLimit > System.currentTimeMillis(); |
| } |
| return gotToken; |
| } finally { |
| if (wasInterrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| @Override |
| public void unlock(Object name) throws LockNotHeldException, LeaseExpiredException { |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "{}, name: {} - entering unlock()", this, name); |
| } |
| |
| long statStart = getStats().startLockRelease(); |
| |
| boolean hadRecursion = false; |
| boolean unlocked = false; |
| int lockId = -1; |
| DLockToken token = null; |
| RemoteThread rThread = null; |
| |
| try { |
| |
| synchronized (this.tokens) { |
| checkDestroyed(); |
| token = basicGetToken(name); |
| if (token == null) { |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "{}, [unlock] no token found for: {}", this, name); |
| } |
| throw new LockNotHeldException( |
| String.format( |
| "Attempting to unlock %s : %s , but this thread does not own the lock.", |
| new Object[] {this, name})); |
| } |
| |
| synchronized (token) { |
| token.checkForExpiration(); |
| rThread = token.getLesseeThread(); |
| if (!token.isLeaseHeldByCurrentOrRemoteThread(rThread)) { |
| token.throwIfCurrentThreadHadExpiredLease(); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "{}, [unlock] {} not leased by this thread.", |
| this, token); |
| } |
| throw new LockNotHeldException( |
| String.format( |
| "Attempting to unlock %s : %s , but this thread does not own the lock. %s", |
| new Object[] {this, name, token})); |
| } |
| // if recursion > 0 then token will still be locked after calling release |
| hadRecursion = token.getRecursion() > 0; |
| lockId = token.getLeaseId(); |
| Assert.assertTrue(lockId > -1); |
| if (hadRecursion) { |
| unlocked = token.releaseLock(lockId, rThread); |
| } else { |
| token.setIgnoreForRecovery(true); |
| } |
| } // token sync |
| } // tokens map sync |
| |
| if (!hadRecursion) { |
| boolean lockBatch = false; |
| boolean released = false; |
| |
| while (!released) { |
| checkDestroyed(); |
| LockGrantorId theLockGrantorId = getLockGrantorId(); |
| try { |
| synchronized (this.lockGrantorIdLock) { |
| unlocked = token.releaseLock(lockId, rThread); |
| } |
| released = callReleaseProcessor(theLockGrantorId.getLockGrantorMember(), name, |
| lockBatch, lockId); |
| |
| } catch (LockGrantorDestroyedException e) { // part of fix for bug 35239 |
| // loop back around to get next lock grantor |
| } catch (LockServiceDestroyedException e) { // part of fix for bug 35239 |
| // done... NonGrantorDestroyedMessage will release locks for us |
| released = true; |
| } finally { |
| if (!released) { |
| notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS); |
| } |
| } |
| } // while !released |
| } // !hadRecursion |
| |
| } // try |
| finally { |
| try { |
| if (!hadRecursion && lockId > -1 && token != null) { |
| decActiveLocks(); |
| if (!unlocked) { |
| // // token is still held if grantor was remote, so now we unlock... |
| // checkDestroyed(); // part of fix for bug 35239 |
| // // this release is ok even if we have become the lock grantor |
| // // because the grantor will have no state for this lock |
| token.releaseLock(lockId, rThread); |
| } |
| } |
| } finally { |
| getStats().endLockRelease(statStart); |
| if (this.automateFreeResources) { |
| freeResources(name); |
| } |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "{}, name: {} - exiting unlock()", this, name); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Query the grantor for current leasing information of a lock. Returns the current lease info. |
| * |
| * @param name the named lock to get lease information for |
| * @return snapshot of the remote lock information |
| * @throws LockServiceDestroyedException if local instance of lock service has been destroyed |
| */ |
| public DLockRemoteToken queryLock(final Object name) { |
| |
| DLockQueryReplyMessage queryReply = null; |
| while (queryReply == null || queryReply.repliedNotGrantor()) { |
| checkDestroyed(); |
| LockGrantorId theLockGrantorId = getLockGrantorId(); |
| try { |
| queryReply = DLockQueryProcessor.query(theLockGrantorId.getLockGrantorMember(), |
| this.serviceName, name, false /* lockBatch */, this.dm); |
| } catch (LockGrantorDestroyedException e) { |
| // loop back around to get next lock grantor |
| } finally { |
| if (queryReply != null && queryReply.repliedNotGrantor()) { |
| notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS); |
| } |
| } |
| } // while querying |
| |
| return DLockRemoteToken.create(name, queryReply.getLesseeThread(), queryReply.getLeaseId(), |
| queryReply.getLeaseExpireTime()); |
| |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Creation methods |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Factory method for creating a new instance of <code>DLockService</code>. This ensures that |
| * adding the {@link #disconnectListener} is done while synchronized on the fully constructed |
| * instance. |
| * <p> |
| * Caller must be synchronized on {@link DLockService#services}. |
| * |
| * @see org.apache.geode.distributed.DistributedLockService#create(String, DistributedSystem) |
| */ |
| static DLockService basicCreate(String serviceName, InternalDistributedSystem ds, |
| boolean isDistributed, boolean destroyOnDisconnect, boolean automateFreeResources) |
| throws IllegalArgumentException { |
| Assert.assertHoldsLock(services, true); |
| |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "About to create DistributedLockService <{}>", |
| serviceName); |
| } |
| |
| DLockService svc = new DLockService(serviceName, ds, isDistributed, destroyOnDisconnect, |
| automateFreeResources); |
| svc.init(); |
| return svc; |
| } |
| |
| /** initialize this DLockService object */ |
| protected boolean init() { |
| boolean success = false; |
| try { |
| services.put(this.serviceName, this); |
| getStats().incServices(1); |
| this.ds.addDisconnectListener(disconnectListener); |
| success = true; |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "Created DistributedLockService <{}>", |
| this.serviceName); |
| } |
| } finally { |
| if (!success) { |
| services.remove(this.serviceName); |
| getStats().incServices(-1); |
| } |
| } |
| |
| ds.handleResourceEvent(ResourceEvent.LOCKSERVICE_CREATE, this); |
| |
| return success; |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Constructors |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * To create an instance, use DistributedLockService.create(Object, DistributedSystem) or |
| * DLockService.create(Object, DistributedSystem, DistributionAdvisor) |
| */ |
| protected DLockService(String serviceName, DistributedSystem ds, boolean isDistributed, |
| boolean destroyOnDisconnect, boolean automateFreeResources) { |
| super(); |
| this.dlockStats = getOrCreateStats(ds); |
| this.serialNumber = createSerialNumber(); |
| this.serviceName = serviceName; |
| this.ds = (InternalDistributedSystem) ds; |
| this.dm = this.ds.getDistributionManager(); |
| this.stopper = new DLockStopper(this.dm, this); |
| this.isDistributed = isDistributed; |
| this.destroyOnDisconnect = destroyOnDisconnect; |
| this.automateFreeResources = automateFreeResources || AUTOMATE_FREE_RESOURCES; |
| } |
| |
| // ------------------------------------------------------------------------- |
| // java.lang.Object methods |
| // ------------------------------------------------------------------------- |
| |
| @Override |
| public String toString() { |
| StringBuffer buffer = new StringBuffer(128); |
| buffer.append('<').append("DLockService").append("@") |
| .append(Integer.toHexString(System.identityHashCode(this))).append(" named ") |
| .append(this.serviceName).append(" destroyed=").append(this.destroyed).append(" grantorId=") |
| .append(this.lockGrantorId).append(" grantor=").append(this.grantor).append('>'); |
| return buffer.toString(); |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Public instance methods |
| // ------------------------------------------------------------------------- |
| |
| public DistributedLockStats getStats() { |
| return this.dlockStats; |
| } |
| |
| public void releaseTryLocks(DLockBatchId batchId, Callable<Boolean> untilCondition) { |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[DLockService.releaseTryLocks] enter: {}", batchId); |
| } |
| |
| long statStart = getStats().startLockRelease(); |
| |
| try { |
| boolean lockBatch = true; |
| boolean released = false; |
| while (!released) { |
| try { |
| boolean quit = untilCondition.call(); |
| if (quit) { |
| return; |
| } |
| } catch (Exception e) { |
| throw new InternalGemFireException("unexpected exception", e); |
| } |
| checkDestroyed(); |
| LockGrantorId theLockGrantorId = null; |
| |
| theLockGrantorId = batchId.getLockGrantorId(); |
| synchronized (this.lockGrantorIdLock) { |
| if (!checkLockGrantorId(theLockGrantorId)) { |
| // the grantor is different so break and skip DLockReleaseProcessor |
| break; |
| } |
| } |
| |
| released = |
| callReleaseProcessor(theLockGrantorId.getLockGrantorMember(), batchId, lockBatch, -1); |
| if (!released) { |
| notLockGrantorId(theLockGrantorId, 100, TimeUnit.MILLISECONDS); |
| } |
| } |
| } finally { |
| decActiveLocks(); |
| getStats().endLockRelease(statStart); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[DLockService.releaseTryLocks] exit: {}", batchId); |
| } |
| } |
| } |
| |
| public boolean acquireTryLocks(final DLockBatch dlockBatch, final long waitTimeMillis, |
| final long leaseTimeMillis, final Object[] keyIfFailed) throws InterruptedException { |
| checkDestroyed(); |
| if (Thread.interrupted()) |
| throw new InterruptedException(); |
| if (keyIfFailed.length < 1) { |
| throw new IllegalArgumentException( |
| "keyIfFailed must have a length of one or greater"); |
| } |
| |
| long startTime = getLockTimeStamp(dm); |
| |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[acquireTryLocks] acquiring {}", dlockBatch); |
| } |
| |
| long requestWaitTime = waitTimeMillis; |
| long requestLeaseTime = leaseTimeMillis; |
| |
| // -1 means "lease forever". Long.MAX_VALUE is pretty close. |
| if (requestLeaseTime == -1) |
| requestLeaseTime = Long.MAX_VALUE; |
| |
| // -1 means "wait forever". Long.MAX_VALUE is pretty close. |
| if (requestWaitTime == -1) |
| requestWaitTime = Long.MAX_VALUE; |
| long waitLimit = startTime + requestWaitTime; |
| if (waitLimit < 0) |
| waitLimit = Long.MAX_VALUE; |
| |
| long statStart = getStats().startLockWait(); |
| boolean gotLocks = false; |
| |
| try { |
| ThreadRequestState requestState = (ThreadRequestState) this.threadRequestState.get(); |
| if (requestState == null) { |
| requestState = new ThreadRequestState(incThreadSequence(), false); |
| this.threadRequestState.set(requestState); |
| } else { |
| requestState.interruptible = false; |
| } |
| final int threadId = requestState.threadId; |
| |
| boolean keepTrying = true; |
| incActiveLocks(); |
| |
| while (keepTrying) { |
| checkDestroyed(); |
| LockGrantorId theLockGrantorId = getLockGrantorId(); |
| |
| boolean tryLock = true; |
| boolean reentrant = false; |
| DLockRequestProcessor processor = createRequestProcessor(theLockGrantorId, dlockBatch, |
| threadId, startTime, requestLeaseTime, requestWaitTime, reentrant, tryLock, false); |
| boolean interruptible = true; |
| int lockId = processor.getProcessorId(); |
| gotLocks = processor.requestLock(interruptible, lockId); |
| if (gotLocks) { |
| dlockBatch.grantedBy(theLockGrantorId); |
| } else if (processor.repliedDestroyed()) { |
| checkDestroyed(); |
| // should have thrown LockServiceDestroyedException |
| Assert.assertTrue(isDestroyed(), "Grantor reports service " + this + " is destroyed"); |
| } else if (processor.repliedNotGrantor() || processor.hadNoResponse()) { |
| notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS); |
| } else { |
| keyIfFailed[0] = processor.getKeyIfFailed(); |
| if (keyIfFailed[0] == null) { |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[acquireTryLocks] lock request failed but provided no conflict key; responseCode=", |
| processor.getResponseCodeString()); |
| } |
| } else { |
| break; |
| } |
| } |
| |
| long timeLeft = requestWaitTime; |
| if (requestWaitTime < Long.MAX_VALUE) { |
| // prevent txLock from performing next line... |
| timeLeft = waitLimit - getLockTimeStamp(this.dm); |
| } |
| keepTrying = !gotLocks && timeLeft > 0; |
| if (keepTrying && timeLeft > 10) { |
| // didn't receive msg or processor timed out... sleep briefly |
| try { |
| Thread.sleep(10); |
| } catch (InterruptedException ignore) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[acquireTryLocks] {} locks for {}", |
| (gotLocks ? "acquired" : "failed to acquire"), dlockBatch); |
| } |
| } finally { |
| getStats().endLockWait(statStart, gotLocks); |
| } |
| return gotLocks; |
| } |
| |
| /** |
| * Returns copy of the tokens map. Synchronizes on token map. |
| * <p> |
| * Called by {@link org.apache.geode.internal.admin.remote.FetchDistLockInfoResponse}. |
| * |
| * @return copy of the tokens map |
| */ |
| public Map<Object, DLockToken> snapshotService() { |
| synchronized (this.tokens) { |
| return new HashMap(this.tokens); |
| } |
| } |
| |
| /** |
| * Used for instrumenting blocked threads |
| */ |
| public UnsafeThreadLocal<Object> getBlockedOn() { |
| return blockedOn; |
| } |
| |
| /** Returns true if the lock service is distributed; false if local only */ |
| public boolean isDistributed() { |
| return this.isDistributed; |
| } |
| |
| public void setDLockLessorDepartureHandler(DLockLessorDepartureHandler handler) { |
| this.lessorDepartureHandler = handler; |
| } |
| |
| public DLockLessorDepartureHandler getDLockLessorDepartureHandler() { |
| return this.lessorDepartureHandler; |
| } |
| |
| /** The name of this service */ |
| public String getName() { |
| return this.serviceName; |
| } |
| |
| public DistributionManager getDistributionManager() { |
| return this.dm; |
| } |
| |
| public void setDLockRecoverGrantorMessageProcessor( |
| DLockRecoverGrantorProcessor.MessageProcessor recoverGrantorProcessor) { |
| this.recoverGrantorProcessor = recoverGrantorProcessor; |
| } |
| |
| public DLockRecoverGrantorProcessor.MessageProcessor getDLockRecoverGrantorMessageProcessor() { |
| return this.recoverGrantorProcessor; |
| } |
| |
| /** |
| * Returns true if any tokens in this service are currently held. Synchronizes on tokens map and |
| * on each token to check the lease. |
| * |
| * @return true if any tokens in this service are currently held |
| */ |
| boolean hasHeldLocks() { |
| synchronized (this.tokens) { |
| for (DLockToken token : this.tokens.values()) { |
| if (token.isLeaseHeld()) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * @see org.apache.geode.distributed.DistributedLockService#destroy(String) |
| */ |
| public static void destroyServiceNamed(String serviceName) throws IllegalArgumentException { |
| DLockService svc = null; |
| synchronized (services) { |
| svc = services.get(serviceName); |
| } |
| if (svc == null) { |
| throw new IllegalArgumentException( |
| String.format("Service named %s not created", serviceName)); |
| } else { |
| svc.destroyAndRemove(); |
| } |
| } |
| |
| /** Destroys all lock services in this VM. Used in test tearDown code. */ |
| public static void destroyAll() { |
| Collection<DLockService> svcs; |
| synchronized (services) { |
| svcs = new HashSet<>(services.values()); |
| } |
| for (DLockService svc : svcs) { |
| try { |
| svc.destroyAndRemove(); |
| } catch (CancelException e) { |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "destroyAndRemove of {} terminated due to cancellation: ", svc, e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Destroys an existing service and removes it from the map |
| * |
| * @since GemFire 3.5 |
| */ |
| public void destroyAndRemove() { |
| // isLockGrantor determines if we need to tell elder of destroy |
| boolean isCurrentlyLockGrantor = false; |
| boolean isMakingLockGrantor = false; |
| |
| // maybeHasActiveLocks determines if we need to tell grantor of destroy |
| boolean maybeHasActiveLocks = false; |
| |
| synchronized (creationLock) { |
| try { |
| synchronized (services) { |
| try { |
| if (isDestroyed()) |
| return; |
| setDestroyingThread(); |
| synchronized (this.lockGrantorIdLock) { // force ordering in lock request |
| synchronized (this.destroyLock) { |
| this.destroyed = true; |
| maybeHasActiveLocks = this.activeLocks > 0; |
| } |
| isCurrentlyLockGrantor = this.isCurrentlyLockGrantor(); |
| isMakingLockGrantor = this.isMakingLockGrantor(); |
| } |
| } finally { |
| if (isCurrentThreadDoingDestroy()) { |
| removeLockService(this); |
| } |
| } |
| } // services sync |
| } catch (CancelException e) { |
| // don't report to caller |
| } finally { |
| if (isCurrentThreadDoingDestroy()) { |
| try { |
| this.basicDestroy(isCurrentlyLockGrantor, isMakingLockGrantor, maybeHasActiveLocks); |
| } catch (CancelException e) { |
| // don't propagate |
| } finally { |
| clearDestroyingThread(); |
| } |
| } |
| postDestroyAction(); |
| } |
| } // creationLock sync |
| } |
| |
| /** |
| * Unlock all locks currently held by this process, and mark it as destroyed. Returns true if |
| * caller performed actual destroy. Returns false if this lock service has already been destroyed. |
| * <p> |
| * {@link #services} is held for the entire operation. {@link #destroyLock} is held only while |
| * setting {@link #destroyed} to true and just prior to performing any real work. |
| * <p> |
| * Caller must be synchronized on {@link DLockService#services}; |
| */ |
| private void basicDestroy(boolean isCurrentlyLockGrantor, boolean isMakingLockGrantor, |
| boolean maybeHasActiveLocks) { |
| Assert.assertHoldsLock(services, false); |
| // synchronized (this.serviceLock) { |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "[DLockService.basicDestroy] Destroying {}, isCurrentlyLockGrantor={}, isMakingLockGrantor={}", |
| this, isCurrentlyLockGrantor, isMakingLockGrantor); |
| } |
| |
| // if hasActiveLocks, tell grantor we're destroying... |
| if (!isCurrentlyLockGrantor && maybeHasActiveLocks && !this.ds.isDisconnectThread()) { |
| boolean retry; |
| int nonGrantorDestroyLoopCount = 0; |
| do { |
| retry = false; |
| LockGrantorId theLockGrantorId = peekLockGrantorId(); |
| |
| if (theLockGrantorId != null && !theLockGrantorId.isLocal(getSerialNumber())) { |
| if (!NonGrantorDestroyedProcessor.send(this.serviceName, theLockGrantorId, dm)) { |
| // grantor responded NOT_GRANTOR |
| notLockGrantorId(theLockGrantorId, 0, TimeUnit.MILLISECONDS); // nulls out grantor to |
| // force call to elder |
| retry = true; |
| } |
| } |
| |
| if (DEBUG_NONGRANTOR_DESTROY_LOOP) { |
| nonGrantorDestroyLoopCount++; |
| if (nonGrantorDestroyLoopCount >= DEBUG_NONGRANTOR_DESTROY_LOOP_COUNT) { |
| logger.fatal(LogMarker.DLS_MARKER, |
| "Failed to notify grantor of destruction within {} attempts.", |
| Integer.valueOf(DEBUG_NONGRANTOR_DESTROY_LOOP_COUNT)); |
| Assert.assertTrue(false, |
| String.format("Failed to notify grantor of destruction within %s attempts.", |
| |
| new Object[] {Integer.valueOf(DEBUG_NONGRANTOR_DESTROY_LOOP_COUNT)})); |
| } |
| } |
| |
| } while (retry); |
| } |
| |
| if (isCurrentlyLockGrantor || isMakingLockGrantor) { |
| // If forcedDisconnect is in progress, the membership view will not |
| // change and no-one else can contact this member, so don't wait for a grantor |
| if (this.ds.getCancelCriterion().isCancelInProgress()) { |
| try { |
| DLockGrantor.waitForGrantor(this); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } catch (DistributedSystemDisconnectedException e) { |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, |
| "No longer waiting for grantor because of disconnect.", e); |
| } |
| } |
| } |
| nullLockGrantorId(this.lockGrantorId); |
| } |
| // } |
| } |
| |
| protected void postDestroyAction() { |
| ds.handleResourceEvent(ResourceEvent.LOCKSERVICE_REMOVE, this); |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Package instance methods |
| // ------------------------------------------------------------------------- |
| |
| boolean destroyOnDisconnect() { |
| return this.destroyOnDisconnect; |
| } |
| |
| /** |
| * Called by grantor recovery to return set of locks held by this process. Synchronizes on |
| * lockGrantorIdLock, tokens map, and each lock token. |
| * |
| * @param newlockGrantorId the newly recovering grantor |
| */ |
| Set<DLockRemoteToken> getLockTokensForRecovery(LockGrantorId newlockGrantorId) { |
| Set<DLockRemoteToken> heldLockSet = Collections.EMPTY_SET; |
| |
| LockGrantorId currentLockGrantorId = null; |
| synchronized (this.lockGrantorIdLock) { |
| if (isDestroyed()) { |
| return heldLockSet; |
| } |
| |
| currentLockGrantorId = this.lockGrantorId; |
| } |
| |
| // destroy local grantor if currentLockGrantorId is local |
| // and grantorVersion is greater than currentGrantorVersion |
| if (currentLockGrantorId != null && currentLockGrantorId.hasLockGrantorVersion() |
| && newlockGrantorId.isNewerThan(currentLockGrantorId)) { |
| nullLockGrantorId(currentLockGrantorId); |
| } |
| |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| synchronized (this.lockGrantorIdLock) { |
| synchronized (this.tokens) { |
| // build up set of currently held locks |
| for (DLockToken token : this.tokens.values()) { |
| synchronized (token) { |
| if (token.isLeaseHeld()) { |
| |
| // skip over token if ignoreForRecovery is true |
| if (token.ignoreForRecovery()) { |
| // unlock of token must be in progress... ignore for recovery |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "getLockTokensForRecovery is skipping {}", |
| token); |
| } |
| } |
| |
| // add token to heldLockSet |
| else { |
| if (heldLockSet == Collections.EMPTY_SET) { |
| heldLockSet = new HashSet<>(); |
| } |
| heldLockSet.add(DLockRemoteToken.createFromDLockToken(token)); |
| } |
| } // isLeaseHeld |
| } // token sync |
| } // tokens iter |
| } // tokens sync |
| |
| return heldLockSet; |
| } |
| } |
| |
| /** |
| * Returns the named lock token or null if it doesn't exist. Synchronizes on tokens map. |
| * |
| * @return the named lock token or null if it doesn't exist |
| */ |
| public DLockToken getToken(Object name) { |
| synchronized (this.tokens) { |
| return this.tokens.get(name); |
| } |
| } |
| |
| /** |
| * Returns the named lock token or null if it doesn't exist. Caller must synchronize on tokens |
| * map. |
| * |
| * @return the named lock token or null if it doesn't exist |
| */ |
| private DLockToken basicGetToken(Object name) { |
| return this.tokens.get(name); |
| } |
| |
| /** |
| * Returns an unmodifiable collection backed by the values of the DLockToken map for testing |
| * purposes only. Synchronizes on tokens map. |
| * |
| * @return an unmodifiable collection of the tokens map values |
| */ |
| public Collection getTokens() { |
| synchronized (this.tokens) { |
| return Collections.unmodifiableCollection(this.tokens.values()); |
| } |
| } |
| |
| /** |
| * Returns an existing or creates a new DLockToken. Synchronizes on tokens map and the lock token. |
| * |
| * @param name the name of the lock |
| * @return an existing or new instantiated lock token |
| */ |
| DLockToken getOrCreateToken(Object name) { |
| synchronized (this.tokens) { |
| checkDestroyed(); // check destroy after acquiring tokens map sync |
| DLockToken token = this.tokens.get(name); |
| boolean createNewToken = token == null; |
| if (createNewToken) { |
| token = new DLockToken(this.dm, name); |
| } |
| synchronized (token) { |
| if (createNewToken) { |
| this.tokens.put(name, token); |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "Creating {} in {}", token, this); |
| } |
| getStats().incTokens(1); |
| } |
| token.incUsage(); |
| } |
| return token; |
| } |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Private instance methods |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Returns number of lock tokens currently leased by this member. Synchronizes on tokens map and |
| * each lock token. |
| * |
| * @return number of lock tokens currently leased by this member |
| */ |
| private int numLocksHeldInThisVM() { |
| int numLocksHeld = 0; |
| synchronized (this.tokens) { |
| for (DLockToken token : this.tokens.values()) { |
| synchronized (token) { |
| if (token.isLeaseHeld()) { |
| numLocksHeld++; |
| } |
| } |
| } |
| } |
| return numLocksHeld; |
| } |
| |
| /** |
| * TEST HOOK: Logs all lock tokens for this service at INFO level. Synchronizes on tokens map and |
| * each lock token. |
| */ |
| protected void dumpService() { |
| synchronized (this.tokens) { |
| StringBuffer buffer = new StringBuffer(); |
| buffer.append(" ").append(this.tokens.size()).append(" tokens, "); |
| buffer.append(numLocksHeldInThisVM()).append(" locks held\n"); |
| for (Map.Entry<Object, DLockToken> entry : this.tokens.entrySet()) { |
| buffer.append(" ").append(entry.getKey()).append(": "); |
| DLockToken token = entry.getValue(); |
| buffer.append(token.toString()).append("\n"); |
| } |
| logger.info(LogMarker.DLS_MARKER, buffer); |
| } |
| } |
| |
| // ----------- new thread state for interruptible and threadId ------------ |
| private final AtomicInteger threadSequence = new AtomicInteger(); |
| |
| protected static class ThreadRequestState { |
| protected final int threadId; |
| protected boolean interruptible; |
| |
| ThreadRequestState(int threadId, boolean interruptible) { |
| this.threadId = threadId; |
| this.interruptible = interruptible; |
| } |
| } |
| |
| private final ThreadLocal threadRequestState = new ThreadLocal(); |
| |
| private final UnsafeThreadLocal<Object> blockedOn = new UnsafeThreadLocal<Object>(); |
| |
| /** |
| * Returns true if the calling thread has an active lock request that is interruptible |
| */ |
| boolean isInterruptibleLockRequest() { |
| // this method is called by grantor for local lock requests in grantor vm |
| final ThreadRequestState requestState = (ThreadRequestState) threadRequestState.get(); |
| if (requestState == null) { |
| return false; |
| } |
| return requestState.interruptible; |
| } |
| |
| ThreadLocal getThreadRequestState() { |
| return threadRequestState; |
| } |
| |
| protected int incThreadSequence() { |
| return this.threadSequence.incrementAndGet(); |
| } |
| |
| // ------------------------------------------------------------------------- |
| // System disconnect listener |
| // ------------------------------------------------------------------------- |
| |
| /** Destroys all named locking services on disconnect from system */ |
| @MakeNotStatic("This object is actually immutable ... but it uses singletons") |
| protected static final InternalDistributedSystem.DisconnectListener disconnectListener = |
| new InternalDistributedSystem.DisconnectListener() { |
| @Override |
| public String toString() { |
| return "Disconnect listener for DistributedLockService"; |
| } |
| |
| @Override |
| public void onDisconnect(final InternalDistributedSystem sys) { |
| final boolean isDebugEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE); |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "Shutting down Distributed Lock Services"); |
| } |
| long start = System.currentTimeMillis(); |
| try { |
| destroyAll(); |
| } finally { |
| closeStats(); |
| long delta = System.currentTimeMillis() - start; |
| if (isDebugEnabled_DLS) { |
| logger.trace(LogMarker.DLS_VERBOSE, "Distributed Lock Services stopped (took {} ms)", |
| delta); |
| } |
| } |
| } |
| }; |
| |
| // ---------------------------------------------------------------- |
| |
| @Immutable |
| private static final DummyDLockStats DUMMY_STATS = new DummyDLockStats(); |
| |
| @Immutable |
| static final SuspendLockingToken SUSPEND_LOCKING_TOKEN = new SuspendLockingToken(); |
| |
| // ------------------------------------------------------------------------- |
| // Static fields |
| // ------------------------------------------------------------------------- |
| |
| /** Map of all locking services. */ |
| @MakeNotStatic |
| protected static final Map<String, DLockService> services = new HashMap<>(); |
| |
| private static final Object creationLock = new Object(); |
| |
| /** All DLock threads belong to this group */ |
| @MakeNotStatic |
| private static ThreadGroup threadGroup; |
| |
| /** DLock statistics; static because multiple dlock instances can exist */ |
| @MakeNotStatic |
| private static DistributedLockStats stats = DUMMY_STATS; |
| |
| // ------------------------------------------------------------------------- |
| // Reserved lock service names |
| // ------------------------------------------------------------------------- |
| |
| public static final String LTLS = "LTLS"; |
| public static final String DTLS = "DTLS"; |
| private static final String[] reservedNames = new String[] {LTLS, DTLS}; |
| |
| // ------------------------------------------------------------------------- |
| // DLS serial number (uniquely identifies local instance of DLS) |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * Specifies the starting serial number for the serialNumberSequencer |
| */ |
| public static final int START_SERIAL_NUMBER = Integer |
| .getInteger(DistributionConfig.GEMFIRE_PREFIX + "DistributedLockService.startSerialNumber", 1) |
| .intValue(); |
| |
| /** |
| * Incrementing serial number used to identify order of DLS creation |
| * |
| * @see DLockService#getSerialNumber() |
| */ |
| @MakeNotStatic |
| private static final AtomicInteger serialNumberSequencer = new AtomicInteger(START_SERIAL_NUMBER); |
| |
| /** |
| * Identifies the static order in which this DLS was created in relation to other DLS or other |
| * instances of this DLS during the life of this JVM. Rollover to negative is allowed. |
| */ |
| private final int serialNumber; |
| |
| /** |
| * Generates a serial number for identifying a DLS. Later instances of the same named DLS will |
| * have a greater serial number than earlier instances. This number increments statically |
| * throughout the life of this JVM. Rollover to negative is allowed. |
| * |
| * @return the new serial number |
| */ |
| protected static int createSerialNumber() { |
| // NOTE: AtomicInteger should rollover if value is Integer.MAX_VALUE |
| return serialNumberSequencer.incrementAndGet(); |
| } |
| |
| /** |
| * Returns the serial number which identifies the static order in which this DLS was created in |
| * relation to other DLS'es or other instances of this named DLS during the life of this JVM. |
| */ |
| public int getSerialNumber() { |
| return this.serialNumber; |
| } |
| |
| // ------------------------------------------------------------------------- |
| // External API methods |
| // ------------------------------------------------------------------------- |
| |
| /** |
| * @see org.apache.geode.distributed.DistributedLockService#getServiceNamed(String) |
| */ |
| public static DistributedLockService getServiceNamed(String serviceName) { |
| synchronized (services) { |
| return services.get(serviceName); |
| } |
| } |
| |
| public static DistributedLockService create(String serviceName, InternalDistributedSystem ds, |
| boolean distributed, boolean destroyOnDisconnect) { |
| return create(serviceName, ds, distributed, destroyOnDisconnect, false); |
| } |
| |
| /** |
| * Creates named <code>DistributedLockService</code>. |
| * |
| * @param serviceName name of the service |
| * @param ds InternalDistributedSystem |
| * @param distributed true if lock service will be distributed; false will be local only |
| * @param destroyOnDisconnect true if lock service should destroy itself using system disconnect |
| * listener |
| * @param automateFreeResources true if freeResources should be automatically called during unlock |
| * |
| * @throws IllegalArgumentException if serviceName is invalid or this process has already created |
| * named service |
| * |
| * @throws IllegalStateException if system is in process of disconnecting |
| * |
| * @see org.apache.geode.distributed.DistributedLockService#create(String, DistributedSystem) |
| */ |
| public static DistributedLockService create(String serviceName, InternalDistributedSystem ds, |
| boolean distributed, boolean destroyOnDisconnect, boolean automateFreeResources) |
| throws IllegalArgumentException, IllegalStateException { |
| // basicCreate will construct DLockService and it calls getOrCreateStats... |
| synchronized (creationLock) { |
| synchronized (services) { // disconnectListener syncs on this |
| ds.getCancelCriterion().checkCancelInProgress(null); |
| if (services.get(serviceName) != null) { |
| throw new IllegalArgumentException( |
| String.format("Service named %s already created", |
| serviceName)); |
| } |
| return DLockService.basicCreate(serviceName, ds, distributed, destroyOnDisconnect, |
| automateFreeResources); |
| } |
| } |
| } |
| |
| public static void becomeLockGrantor(String serviceName) throws IllegalArgumentException { |
| becomeLockGrantor(serviceName, null); |
| } |
| |
| public static void becomeLockGrantor(String serviceName, InternalDistributedMember oldTurk) |
| throws IllegalArgumentException { |
| if (serviceName == null || serviceName.length() == 0) { |
| throw new IllegalArgumentException(String.format("Service named '%s' is not valid", |
| serviceName)); |
| } |
| DLockService svc = getInternalServiceNamed(serviceName); |
| if (svc == null) { |
| throw new IllegalArgumentException( |
| String.format("Service named %s not created", serviceName)); |
| } else { |
| svc.becomeLockGrantor(oldTurk); |
| } |
| } |
| |
| public static boolean isLockGrantor(String serviceName) throws IllegalArgumentException { |
| if (serviceName == null || serviceName.length() == 0) { |
| throw new IllegalArgumentException(String.format("Service named '%s' is not valid", |
| serviceName)); |
| } |
| DLockService svc = null; |
| synchronized (services) { |
| svc = services.get(serviceName); |
| } |
| if (svc == null) { |
| throw new IllegalArgumentException( |
| String.format("Service named %s not created", serviceName)); |
| } else { |
| return svc.isLockGrantor(); |
| } |
| } |
| |
| /** |
| * Fills lists of service names. |
| * |
| * @param grantors filled with service names of all services we are currently the grantor of. |
| * @param grantorVersions elder assigned grantor version ids |
| * @param grantorSerialNumbers member specific DLS serial number hosting grantor |
| * @param nonGrantors filled with service names of all services we have that we are not the |
| * grantor of. |
| */ |
| public static void recoverRmtElder(ArrayList grantors, ArrayList grantorVersions, |
| ArrayList grantorSerialNumbers, ArrayList nonGrantors) { |
| synchronized (services) { |
| Iterator entries = services.entrySet().iterator(); |
| while (entries.hasNext()) { |
| Map.Entry entry = (Map.Entry) entries.next(); |
| String serviceName = (String) entry.getKey(); |
| DLockService service = (DLockService) entry.getValue(); |
| boolean foundGrantor = false; |
| DLockGrantor grantor = service.getGrantor(); |
| if (grantor != null && grantor.getVersionId() != -1 && !grantor.isDestroyed()) { |
| foundGrantor = true; |
| grantors.add(serviceName); |
| grantorVersions.add(Long.valueOf(grantor.getVersionId())); |
| grantorSerialNumbers.add(Integer.valueOf(service.getSerialNumber())); |
| } |
| if (!foundGrantor) { |
| nonGrantors.add(serviceName); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Called when an elder is doing recovery. For every service that we are the grantor for add it to |
| * the grantorMap For every service we have that is not in the grantorMap add its name to |
| * needsRecovery set. |
| * |
| * @param dm our local DM |
| */ |
| public static void recoverLocalElder(DistributionManager dm, Map<String, GrantorInfo> grantors, |
| Set<String> needsRecovery) { |
| synchronized (services) { |
| Iterator entries = services.entrySet().iterator(); |
| while (entries.hasNext()) { |
| Map.Entry entry = (Map.Entry) entries.next(); |
| String serviceName = (String) entry.getKey(); |
| DLockService service = (DLockService) entry.getValue(); |
| boolean foundGrantor = false; |
| DLockGrantor grantor = service.getGrantor(); |
| if (grantor != null && grantor.getVersionId() != -1 && !grantor.isDestroyed()) { |
| foundGrantor = true; |
| GrantorInfo oldgi = grantors.get(serviceName); |
| if (oldgi == null || oldgi.getVersionId() < grantor.getVersionId()) { |
| grantors.put(serviceName, new GrantorInfo(dm.getId(), grantor.getVersionId(), |
| service.getSerialNumber(), false)); |
| needsRecovery.remove(serviceName); |
| } |
| } |
| if (!foundGrantor && !dm.isLoner()) { |
| if (!grantors.containsKey(serviceName)) { |
| needsRecovery.add(serviceName); |
| } |
| } |
| } |
| } |
| } |
| // ------------------------------------------------------------------------- |
| // Public static methods |
| // ------------------------------------------------------------------------- |
| |
| /** Convenience method to get named DLockService */ |
| public static DLockService getInternalServiceNamed(String serviceName) { |
| synchronized (services) { |
| return services.get(serviceName); |
| } |
| } |
| |
| /** Validates service name for external creation */ |
| public static void validateServiceName(String serviceName) { |
| if (serviceName == null || serviceName.length() == 0) { |
| throw new IllegalArgumentException( |
| "Lock service name must not be null or empty"); |
| } |
| for (String reservedName : reservedNames) { |
| if (serviceName.startsWith(reservedName)) { |
| throw new IllegalArgumentException( |
| String.format("Service named %s is reserved for internal use only", |
| serviceName)); |
| } |
| } |
| } |
| |
| /** Return a snapshot of all services */ |
| public static Map<String, DLockService> snapshotAllServices() { // used by: internal/admin/remote |
| synchronized (services) { |
| return new HashMap<>(services); |
| } |
| } |
| |
| /** |
| * TEST HOOK: Logs all lock tokens for every service at INFO level. Synchronizes on services map, |
| * service tokens maps and each lock token. |
| */ |
| public static void dumpAllServices() { // used by: distributed/DistributedLockServiceTest |
| StringBuffer buffer = new StringBuffer(); |
| synchronized (services) { |
| logger.info(LogMarker.DLS_MARKER, |
| "DLockService.dumpAllServices() - " + services.size() + " services:\n"); |
| for (Map.Entry entry : services.entrySet()) { |
| buffer.append(" " + entry.getKey() + ":\n"); |
| DLockService svc = (DLockService) entry.getValue(); |
| svc.dumpService(); |
| if (svc.isCurrentlyLockGrantor()) { |
| svc.grantor.dumpService(); |
| } |
| |
| } |
| } |
| } |
| |
| // ------------------------------------------------------------------------- |
| // Package static methods |
| // ------------------------------------------------------------------------- |
| |
| static ThreadGroup getThreadGroup() { |
| return threadGroup; |
| } |
| |
| /** |
| * Return a timestamp that represents the current time for locking purposes. |
| * |
| * @since GemFire 3.5 |
| */ |
| static long getLockTimeStamp(DistributionManager dm) { |
| return dm.cacheTimeMillis(); |
| } |
| |
| /** Get or create static dlock stats */ |
| protected static synchronized DistributedLockStats getOrCreateStats(DistributedSystem ds) { |
| if (stats == DUMMY_STATS) { |
| Assert.assertTrue(ds != null, "Need an instance of InternalDistributedSystem"); |
| StatisticsFactory statFactory = ds; |
| long statId = OSProcess.getId(); |
| stats = new DLockStats(statFactory, statId); |
| } |
| return stats; |
| } |
| |
| protected static synchronized DistributedLockStats getDistributedLockStats() { |
| return stats; |
| } |
| |
| public static void addLockServiceForTests(String name, DLockService service) { |
| synchronized (services) { |
| services.put(name, service); |
| } |
| } |
| |
| public static void removeLockServiceForTests(String name) { |
| synchronized (services) { |
| services.remove(name); |
| } |
| } |
| |
| protected static void removeLockService(DLockService service) { |
| service.removeAllTokens(); |
| |
| InternalDistributedSystem system = null; |
| synchronized (services) { |
| DLockService removedService = services.remove(service.getName()); |
| if (removedService == null) { |
| // another thread beat us to the removal... return |
| return; |
| } |
| if (removedService != service) { |
| services.put(service.getName(), removedService); |
| } else { |
| service.getStats().incServices(-1); |
| } |
| system = removedService.getDistributionManager().getSystem(); |
| } |
| // if disconnecting and this was the last service, cleanup! |
| if (services.isEmpty() && system.isDisconnecting()) { |
| // get rid of stats and thread group... |
| synchronized (DLockService.class) { |
| closeStats(); |
| } |
| } |
| } |
| |
| static void closeStats() { |
| if (stats != DUMMY_STATS) { |
| ((DLockStats) stats).close(); |
| stats = DUMMY_STATS; |
| } |
| } |
| |
| /** Provide way to peek at current lock grantor id when dls does not exist */ |
| static GrantorInfo checkLockGrantorInfo(String serviceName, InternalDistributedSystem system) { |
| GrantorInfo gi = GrantorRequestProcessor.peekGrantor(serviceName, system); |
| if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) { |
| logger.trace(LogMarker.DLS_VERBOSE, "[checkLockGrantorId] returning {}", gi); |
| } |
| return gi; |
| } |
| |
| // ------------------------------------------------------------------------- |
| // SuspendLockingToken inner class |
| // ------------------------------------------------------------------------- |
| |
| /** Used as the name (key) for the suspend locking entry in the tokens map */ |
| @Immutable |
| public static class SuspendLockingToken implements DataSerializableFixedID { |
| public SuspendLockingToken() {} |
| |
| @Override |
| public boolean equals(Object o) { |
| if (o == null) |
| return false; |
| return o instanceof SuspendLockingToken; |
| } |
| |
| @Override |
| public int hashCode() { |
| // Since instances always equal each other, they return the same hashCode |
| return 15325; |
| } |
| |
| @Override |
| public int getDSFID() { |
| return SUSPEND_LOCKING_TOKEN; |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) {} |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) {} |
| |
| @Override |
| public Version[] getSerializationVersions() { |
| return null; |
| } |
| } |
| |
| /** |
| * CancelCriterion which provides cancellation for DLS request and release messages. Cancellation |
| * may occur if shutdown or if DLS is destroyed. |
| */ |
| private static class DLockStopper extends CancelCriterion { |
| |
| /** |
| * The DLockService this stopper will check for cancellation. |
| */ |
| private final DLockService dls; |
| |
| /** |
| * Creates a new DLockStopper for the specified DLockService and DM. |
| * |
| * @param dm the DM to check for shutdown |
| * @param dls the DLockService to check for DLS destroy |
| */ |
| DLockStopper(DistributionManager dm, DLockService dls) { |
| Assert.assertTrue(dls != null); |
| this.dls = dls; |
| Assert.assertTrue(dls.getDistributionManager() != null); |
| } |
| |
| @Override |
| public String cancelInProgress() { |
| String cancelInProgressString = |
| this.dls.getDistributionManager().getCancelCriterion().cancelInProgress(); |
| if (cancelInProgressString != null) { |
| // delegate to underlying DM's CancelCriterion... |
| return cancelInProgressString; |
| } else if (this.dls.isDestroyed()) { |
| // this.stoppedByDLS = true; |
| return this.dls.generateLockServiceDestroyedMessage(); |
| } else { |
| // return null since neither DM nor DLS are shutting down |
| // cannot call super.cancelInProgress because it's abstract |
| return null; |
| } |
| } |
| |
| @Override |
| public RuntimeException generateCancelledException(Throwable e) { |
| String reason = cancelInProgress(); |
| if (reason == null) { |
| return null; |
| } |
| return this.dls.generateLockServiceDestroyedException(reason); |
| } |
| |
| } |
| |
| public CancelCriterion getCancelCriterion() { |
| return stopper; |
| } |
| |
| } |