blob: bb2d6d8630e1323e1889eca0bb269b76faa2db12 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.datastructures;
import java.io.Externalizable;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCondition;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Cache reentrant lock implementation based on AbstractQueuedSynchronizer.
*/
public final class GridCacheLockImpl extends AtomicDataStructureProxy<GridCacheLockState>
implements GridCacheLockEx, IgniteChangeGlobalStateSupport, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** Deserialization stash. */
private static final ThreadLocal<String> stash = new ThreadLocal<>();
/** Initialization guard. */
private final AtomicBoolean initGuard = new AtomicBoolean();
/** Initialization latch. */
private final CountDownLatch initLatch = new CountDownLatch(1);
/** Lock that provides non-overlapping processing of updates. */
private Lock updateLock = new ReentrantLock();
/** Internal synchronization object. */
private Sync sync;
/** Flag indicating that every operation on this lock should be interrupted. */
private volatile boolean interruptAll;
/**
* Empty constructor required by {@link Externalizable}.
*/
public GridCacheLockImpl() {
// This instance should never be used directly.
ctx = null;
}
/**
* Synchronization implementation for reentrant lock using AbstractQueuedSynchronizer.
*/
@SuppressWarnings({"CallToThreadYield", "CallToSignalInsteadOfSignalAll"})
private class Sync extends AbstractQueuedSynchronizer {
/** */
private static final long serialVersionUID = 1192457210091910933L;
/** */
private static final long LOCK_FREE = 0;
/** Map containing condition objects. */
private Map<String, ConditionObject> conditionMap;
/** List of condition signal calls on this node. */
private Map<String, Integer> outgoingSignals;
/** Last condition waited on. */
@Nullable
private volatile String lastCondition;
/** True if any node owning the lock had failed. */
private volatile boolean isBroken;
/** UUID of the node that currently owns the lock. */
private volatile UUID currentOwnerNode;
/** ID of the thread that currently owns the lock. */
private volatile long currentOwnerThreadId;
/** UUID of this node. */
private final UUID thisNode;
/** FailoverSafe flag. */
private final boolean failoverSafe;
/** Fairness flag. */
private final boolean fair;
/** Threads that are waiting on this lock. */
private Set<Long> waitingThreads;
/**
* @param state State.
*/
protected Sync(GridCacheLockState state) {
setState(state.get());
thisNode = ctx.localNodeId();
currentOwnerNode = state.getId();
currentOwnerThreadId = state.getThreadId();
conditionMap = new HashMap<>();
outgoingSignals = new HashMap<>();
failoverSafe = state.isFailoverSafe();
fair = state.isFair();
waitingThreads = new ConcurrentSkipListSet<>();
}
/**
*
*/
protected void addOutgoingSignal(String condition) {
int cnt = 0;
if (outgoingSignals.containsKey(condition)) {
cnt = outgoingSignals.get(condition);
// SignalAll has already been called.
if (cnt == 0)
return;
}
outgoingSignals.put(condition, cnt + 1);
}
/**
* @param condition Condition.
*/
protected void addOutgoingSignalAll(String condition) {
outgoingSignals.put(condition, 0);
}
/**
* Process any condition await calls on this node.
*/
private String processAwait() {
if (lastCondition == null)
return null;
String ret = lastCondition;
lastCondition = null;
return ret;
}
/** */
private Map<String, Integer> processSignal() {
Map<String, Integer> ret = new HashMap<>(outgoingSignals);
outgoingSignals.clear();
return ret;
}
/** Interrupt every thread on this node waiting on this lock. */
private synchronized void interruptAll() {
// First release all threads waiting on associated condition queues.
if (!conditionMap.isEmpty()) {
// Temporarily obtain ownership of the lock,
// in order to signal all conditions.
UUID tempUUID = getOwnerNode();
long tempThreadID = currentOwnerThreadId;
setCurrentOwnerNode(thisNode);
currentOwnerThreadId = Thread.currentThread().getId();
for (Condition c : conditionMap.values())
c.signalAll();
// Restore owner node and owner thread.
setCurrentOwnerNode(tempUUID);
currentOwnerThreadId = tempThreadID;
}
// Interrupt any future call to acquire/release on this sync object.
interruptAll = true;
// Interrupt any ongoing transactions.
for (Thread t: getQueuedThreads())
t.interrupt();
}
/** Check if lock is in correct state (i.e. not broken in non-failoversafe mode),
* if not throw {@linkplain IgniteInterruptedException} */
private void validate(final boolean throwInterrupt) {
// Interrupted flag shouldn't be always cleared
// (e.g. lock() method doesn't throw exception and doesn't clear interrupted)
// but should be cleared if this method is called after lock breakage or node stop.
// If interruptAll is set, exception is thrown anyway.
boolean interrupted = Thread.currentThread().isInterrupted();
// Clear interrupt flag.
if (throwInterrupt || interruptAll)
Thread.interrupted();
if (interruptAll)
throw new IgniteException("Lock broken (possible reason: node stopped" +
" or node owning lock failed while in non-failoversafe mode).");
// Global queue should be synchronized only if interrupted exception should be thrown.
if (fair && (throwInterrupt && interrupted) && !interruptAll) {
synchronizeQueue(true, Thread.currentThread());
throw new IgniteInterruptedException("Lock is interrupted.");
}
}
/**
* Sets the number of permits currently acquired on this lock. This method should only be used in {@linkplain
* GridCacheLockImpl#onUpdate(GridCacheLockState)}.
*
* @param permits Number of permits acquired at this reentrant lock.
*/
final synchronized void setPermits(int permits) {
setState(permits);
}
/**
* Gets the number of permissions currently acquired at this lock.
*
* @return Number of permits acquired at this reentrant lock.
*/
final int getPermits() {
return getState();
}
/**
* Sets the UUID of the node that currently owns this lock. This method should only be used in {@linkplain
* GridCacheLockImpl#onUpdate(GridCacheLockState)}.
*
* @param ownerNode UUID of the node owning this lock.
*/
final synchronized void setCurrentOwnerNode(UUID ownerNode) {
currentOwnerNode = ownerNode;
}
/**
* Gets the UUID of the node that currently owns the lock.
*
* @return UUID of the node that currently owns the lock.
*/
final UUID getOwnerNode() {
return currentOwnerNode;
}
/**
* Checks if latest call to acquire/release was called on this node.
* Should only be called from update method.
*
* @param newOwnerID ID of the node that is about to acquire this lock (or null).
* @return true if acquire/release that triggered last update came from this node.
*/
protected boolean isLockedLocally(UUID newOwnerID) {
return thisNode.equals(getOwnerNode()) || thisNode.equals(newOwnerID);
}
/**
* @param newOwnerThreadId New owner thread id.
*/
protected void setCurrentOwnerThread(long newOwnerThreadId) {
currentOwnerThreadId = newOwnerThreadId;
}
/**
* Returns true if node that owned the locked failed before call to unlock.
*
* @return true if any node failed while owning the lock.
*/
protected boolean isBroken() {
return isBroken;
}
/** */
protected void setBroken(boolean isBroken) {
this.isBroken = isBroken;
}
/** */
protected synchronized boolean hasPredecessor(LinkedList<UUID> nodes) {
if (!fair)
return false;
for (Iterator<UUID> it = nodes.iterator(); it.hasNext(); ) {
UUID node = it.next();
if (ctx.discovery().node(node) == null) {
it.remove();
continue;
}
return !node.equals(thisNode);
}
return false;
}
/**
* Performs tryLock.
* @param acquires Number of permits to acquire.
* @param fair Fairness parameter.
* @return {@code True} if succeeded, false otherwise.
*/
final boolean tryAcquire(final int acquires, final boolean fair) {
// If broken in non-failoversafe mode, exit immediately.
if (interruptAll)
return true;
final Thread current = Thread.currentThread();
boolean failed = false;
int c = getState();
// Wait for lock to reach stable state.
while (c != 0) {
UUID currentOwner = currentOwnerNode;
if (currentOwner != null) {
failed = ctx.discovery().node(currentOwner) == null;
break;
}
c = getState();
}
// Check if lock is released or current owner failed.
if (c == 0 || failed) {
if (compareAndSetGlobalState(0, acquires, current, fair)) {
// Not used for synchronization (we use ThreadID), but updated anyway.
setExclusiveOwnerThread(current);
while (!isHeldExclusively() && !interruptAll)
Thread.yield();
return true;
}
}
else if (isHeldExclusively()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded.");
setState(nextc);
return true;
}
if (fair && !isQueued(current))
synchronizeQueue(false, current);
return false;
}
/**
* Performs lock.
*/
final void lock() {
acquire(1);
}
/** {@inheritDoc} */
@Override protected final boolean tryAcquire(int acquires) {
return tryAcquire(acquires, fair);
}
/** {@inheritDoc} */
@Override protected final boolean tryRelease(int releases) {
// This method is called with release==0 only when trying to wake through update,
// to check if some other node released the lock.
if (releases == 0)
return true;
// If broken in non-failoversafe mode, exit immediately.
if (interruptAll)
return true;
int c = getState() - releases;
if (!isHeldExclusively()) {
log.error("Lock.unlock() is called in illegal state [callerNodeId=" + thisNode + ", ownerNodeId="
+ currentOwnerNode + ", callerThreadId=" + Thread.currentThread().getId() + ", ownerThreadId="
+ currentOwnerThreadId + ", lockState=" + getState() + "]");
throw new IllegalMonitorStateException();
}
boolean free = false;
if (c == 0) {
free = true;
setGlobalState(0, processAwait(), processSignal());
while (isHeldExclusively() && !interruptAll)
Thread.yield();
}
else
setState(c);
return free;
}
/** {@inheritDoc} */
@Override protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return currentOwnerThreadId == Thread.currentThread().getId() && thisNode.equals(currentOwnerNode);
}
/**
* @param name Condition name.
* @return Condition object.
*/
final synchronized IgniteCondition newCondition(String name) {
if (conditionMap.containsKey(name))
return new IgniteConditionObject(name, conditionMap.get(name));
ConditionObject cond = new ConditionObject();
conditionMap.put(name, cond);
return new IgniteConditionObject(name, cond);
}
// Methods relayed from outer class
/** */
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
/** */
final boolean isLocked() throws IgniteCheckedException {
return getState() != 0 || cacheView.get(key).get() != 0;
}
/**
* This method is used for synchronizing the reentrant lock state across all nodes.
*/
boolean compareAndSetGlobalState(final int expVal, final int newVal,
final Thread newThread, final boolean bargingProhibited) {
try {
return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = cacheView.get(key);
if (val == null)
throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
final long newThreadID = newThread.getId();
LinkedList<UUID> nodes = val.getNodes();
// Barging is prohibited in fair mode unless tryLock() is called.
if (!(bargingProhibited && hasPredecessor(nodes))) {
if (val.get() == expVal || ctx.discovery().node(val.getId()) == null) {
val.set(newVal);
val.setId(thisNode);
val.setThreadId(newThreadID);
val.setSignals(null);
// This node is already in queue, except in cases where this is the only node
// or this is a call to tryLock(), in which case barging is ok.
// Queue is only updated if this is fair lock.
if (val.isFair() && (nodes.isEmpty() || !bargingProhibited))
nodes.addFirst(thisNode);
val.setNodes(nodes);
val.setChanged(true);
cacheView.put(key, val);
tx.commit();
return true;
}
}
return false;
}
catch (Exception e) {
if (interruptAll) {
if (log.isInfoEnabled())
log.info("Node is stopped (or lock is broken in non-failover safe mode)," +
" aborting transaction.");
// Return immediately, exception will be thrown later.
return true;
}
else {
if (Thread.currentThread().isInterrupted()) {
if (log.isInfoEnabled())
log.info("Thread is interrupted while attempting to acquire lock.");
// Delegate the decision to throw InterruptedException to the AQS.
sync.release(0);
return false;
}
U.error(log, "Failed to compare and set: " + this, e);
}
throw e;
}
}
});
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/**
* This method is used for synchronizing the number of acquire attempts on this lock across all nodes.
*
* @param cancelled true if acquire attempt is cancelled, false if acquire attempt should be registered.
*/
boolean synchronizeQueue(final boolean cancelled, final Thread thread) {
final AtomicBoolean interrupted = new AtomicBoolean(false);
try {
return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = cacheView.get(key);
if (val == null)
throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
LinkedList<UUID> nodes = val.getNodes();
if (!cancelled) {
nodes.add(thisNode);
val.setChanged(false);
cacheView.put(key, val);
tx.commit();
// Keep track of all threads that are queued in global queue.
// We deliberately don't use #sync.isQueued(), because AQS
// cancel threads immediately after throwing interrupted exception.
sync.waitingThreads.add(thread.getId());
return true;
}
else {
if (sync.waitingThreads.contains(thread.getId())) {
// Update other nodes if this is the first node in queue.
val.setChanged(nodes.lastIndexOf(thisNode) == 0);
nodes.removeLastOccurrence(thisNode);
cacheView.put(key, val);
tx.commit();
sync.waitingThreads.remove(thread.getId());
return true;
}
}
return false;
}
catch (Exception e) {
if (interruptAll) {
if (log.isInfoEnabled())
log.info("Node is stopped (or lock is broken in non-failover safe mode)," +
" aborting transaction.");
// Abort this attempt to synchronize queue and start another one,
// that will return immediately.
sync.release(0);
return false;
}
else {
// If thread got interrupted, abort this attempt to synchronize queue,
// clear interrupt flag and try again, and let the AQS decide
// whether to throw an exception or ignore it.
if (Thread.interrupted() || X.hasCause(e, InterruptedException.class)) {
interrupted.set(true);
throw new TransactionRollbackException("Thread got interrupted " +
"while synchronizing the global queue, retrying. ");
}
U.error(log, "Failed to synchronize global lock queue: " + this, e);
}
throw e;
}
}
});
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
// Restore interrupt flag and let AQS decide what to do with it.
if (interrupted.get())
Thread.currentThread().interrupt();
}
}
/**
* Sets the global state across all nodes after releasing the reentrant lock.
*
* @param newVal New state.
* @param lastCond Id of the condition await is called.
* @param outgoingSignals Map containing signal calls on this node since the last acquisition of the lock.
*/
protected boolean setGlobalState(final int newVal,
@Nullable final String lastCond,
final Map<String, Integer> outgoingSignals) {
try {
return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = cacheView.get(key);
if (val == null)
throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
val.set(newVal);
if (newVal == 0) {
val.setId(null);
val.setThreadId(LOCK_FREE);
}
val.setChanged(true);
// If this lock is fair, remove this node from queue.
if (val.isFair() && newVal == 0) {
UUID rmvdNode = val.getNodes().removeFirst();
assert (thisNode.equals(rmvdNode));
}
// Get global condition queue.
Map<String, LinkedList<UUID>> condMap = val.getConditionMap();
// Create map containing signals from this node.
Map<UUID, LinkedList<String>> signalMap = new HashMap<>();
// Put any signal calls on this node to global state.
if (!outgoingSignals.isEmpty()) {
for (String condition : outgoingSignals.keySet()) {
int cnt = outgoingSignals.get(condition);
// Get queue for this condition.
List<UUID> list = condMap.get(condition);
if (list != null && !list.isEmpty()) {
// Check if signalAll was called.
if (cnt == 0)
cnt = list.size();
// Remove from global condition queue.
for (int i = 0; i < cnt; i++) {
if (list.isEmpty())
break;
UUID uuid = list.remove(0);
// Skip if node to be released is not alive anymore.
if (ctx.discovery().node(uuid) == null) {
cnt++;
continue;
}
LinkedList<String> queue = signalMap.get(uuid);
if (queue == null) {
queue = new LinkedList<>();
signalMap.put(uuid, queue);
}
queue.add(condition);
}
}
}
}
val.setSignals(signalMap);
// Check if this release is called after condition.await() call;
// If true, add this node to the global waiting queue.
if (lastCond != null) {
LinkedList<UUID> queue;
//noinspection IfMayBeConditional
if (!condMap.containsKey(lastCond))
// New condition object.
queue = new LinkedList<>();
else
// Existing condition object.
queue = condMap.get(lastCond);
queue.add(thisNode);
condMap.put(lastCond, queue);
}
val.setConditionMap(condMap);
cacheView.put(key, val);
tx.commit();
return true;
}
catch (Exception e) {
if (interruptAll) {
if (log.isInfoEnabled())
log.info("Node is stopped (or lock is broken in non-failover safe mode)," +
" aborting transaction.");
return true;
}
else
U.error(log, "Failed to release: " + this, e);
throw e;
}
}
});
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** */
synchronized boolean checkIncomingSignals(GridCacheLockState state) {
if (state.getSignals() == null)
return false;
LinkedList<String> signals = state.getSignals().get(thisNode);
if (signals == null || signals.isEmpty())
return false;
UUID tempUUID = getOwnerNode();
Thread tempThread = getExclusiveOwnerThread();
long tempThreadID = currentOwnerThreadId;
// Temporarily allow current thread to signal condition object.
// This is safe to do because:
// 1. if release was called on this node,
// it was called from currently active thread;
// 2. if release came from a thread on any other node,
// all threads on this node are already blocked.
setCurrentOwnerNode(thisNode);
setExclusiveOwnerThread(Thread.currentThread());
currentOwnerThreadId = Thread.currentThread().getId();
for (String signal: signals)
conditionMap.get(signal).signal();
// Restore owner node and owner thread.
setCurrentOwnerNode(tempUUID);
setExclusiveOwnerThread(tempThread);
currentOwnerThreadId = tempThreadID;
return true;
}
/**
* Condition implementation for {@linkplain IgniteLock}.
*
**/
private class IgniteConditionObject implements IgniteCondition {
/** */
private final String name;
/** */
private final AbstractQueuedSynchronizer.ConditionObject obj;
/**
* @param name Condition name.
* @param obj Condition object.
*/
protected IgniteConditionObject(String name, ConditionObject obj) {
this.name = name;
this.obj = obj;
}
/**
* Name of this condition.
*
* @return name Name of this condition object.
*/
@Override public String name() {
return name;
}
/** {@inheritDoc} */
@Override public void await() throws IgniteInterruptedException {
ctx.kernalContext().gateway().readLock();
try {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
lastCondition = name;
obj.await();
sync.validate(true);
}
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public void awaitUninterruptibly() {
ctx.kernalContext().gateway().readLock();
try {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
lastCondition = name;
obj.awaitUninterruptibly();
sync.validate(false);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public long awaitNanos(long nanosTimeout) throws IgniteInterruptedException {
ctx.kernalContext().gateway().readLock();
try {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
lastCondition = name;
long result = obj.awaitNanos(nanosTimeout);
sync.validate(true);
return result;
}
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public boolean await(long time, TimeUnit unit) throws IgniteInterruptedException {
ctx.kernalContext().gateway().readLock();
try {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
lastCondition = name;
boolean result = obj.await(time, unit);
sync.validate(true);
return result;
}
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public boolean awaitUntil(Date deadline) throws IgniteInterruptedException {
ctx.kernalContext().gateway().readLock();
try {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
lastCondition = name;
boolean result = obj.awaitUntil(deadline);
sync.validate(true);
return result;
}
catch (InterruptedException e) {
throw new IgniteInterruptedException(e);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public void signal() {
ctx.kernalContext().gateway().readLock();
try {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
validate(false);
addOutgoingSignal(name);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public void signalAll() {
ctx.kernalContext().gateway().readLock();
try {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
sync.validate(false);
addOutgoingSignalAll(name);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
}
}
/**
* Constructor.
*
* @param name Reentrant lock name.
* @param key Reentrant lock key.
* @param lockView Reentrant lock projection.
*/
public GridCacheLockImpl(String name,
GridCacheInternalKey key,
IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> lockView) {
super(name, key, lockView);
}
/**
* @throws IgniteCheckedException If operation failed.
*/
private void initializeReentrantLock() throws IgniteCheckedException {
if (initGuard.compareAndSet(false, true)) {
try {
sync = retryTopologySafe(new Callable<Sync>() {
@Override public Sync call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx, cacheView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = cacheView.get(key);
if (val == null) {
if (log.isDebugEnabled())
log.debug("Failed to find reentrant lock with given name: " + name);
return null;
}
tx.rollback();
return new Sync(val);
}
}
});
if (log.isDebugEnabled())
log.debug("Initialized internal sync structure: " + sync);
}
finally {
initLatch.countDown();
}
}
else {
U.await(initLatch);
if (sync == null)
throw new IgniteCheckedException("Internal reentrant lock has not been properly initialized.");
}
}
/** {@inheritDoc} */
@Override public void onUpdate(GridCacheLockState val) {
// Called only on initialization, so it's safe to ignore update.
if (sync == null)
return;
updateLock.lock();
try {
// If this update is a result of unsuccessful acquire in fair mode, no local update should be done.
if (!val.isChanged())
return;
// Check if update came from this node.
boolean loc = sync.isLockedLocally(val.getId());
// Process any incoming signals.
boolean incomingSignals = sync.checkIncomingSignals(val);
// Update permission count.
sync.setPermits(val.get());
// Update owner's node id.
sync.setCurrentOwnerNode(val.getId());
// Update owner's thread id.
sync.setCurrentOwnerThread(val.getThreadId());
// Check if any threads waiting on this node need to be notified.
if ((incomingSignals || sync.getPermits() == 0) && !loc) {
// Try to notify any waiting threads.
sync.release(0);
}
} finally {
updateLock.unlock();
}
}
/** {@inheritDoc} */
@Override public void onNodeRemoved(UUID nodeId) {
updateLock.lock();
try {
if (nodeId.equals(sync.getOwnerNode())) {
if (!sync.failoverSafe) {
sync.setBroken(true);
sync.interruptAll();
}
}
// Try to notify any waiting threads.
sync.release(0);
}
finally {
updateLock.unlock();
}
}
/** {@inheritDoc} */
@Override public void onStop() {
if (sync == null) {
interruptAll = true;
return;
}
if (!sync.failoverSafe) {
sync.setBroken(true);
}
sync.interruptAll();
// Try to notify any waiting threads.
sync.release(0);
}
/** {@inheritDoc} */
@Override public void lock() {
ctx.kernalContext().gateway().readLock();
try {
initializeReentrantLock();
if (sync == null)
throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
sync.lock();
sync.validate(false);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public void lockInterruptibly() throws IgniteInterruptedException {
ctx.kernalContext().gateway().readLock();
try {
initializeReentrantLock();
sync.acquireInterruptibly(1);
sync.validate(true);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
catch (InterruptedException e) {
if (sync.fair)
sync.synchronizeQueue(true, Thread.currentThread());
throw new IgniteInterruptedException(e);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public boolean tryLock() {
ctx.kernalContext().gateway().readLock();
try {
initializeReentrantLock();
boolean result = sync.tryAcquire(1, false);
sync.validate(false);
return result;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException {
ctx.kernalContext().gateway().readLock();
try {
initializeReentrantLock();
boolean result = sync.tryAcquireNanos(1, unit.toNanos(timeout));
sync.validate(true);
return result;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
catch (InterruptedException e) {
if (sync.fair)
sync.synchronizeQueue(true, Thread.currentThread());
throw new IgniteInterruptedException(e);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public void unlock() {
ctx.kernalContext().gateway().readLock();
try {
initializeReentrantLock();
// Validate before release.
sync.validate(false);
sync.release(1);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** */
@NotNull @Override public Condition newCondition() {
throw new UnsupportedOperationException("IgniteLock does not allow creation of nameless conditions. ");
}
/** {@inheritDoc} */
@Override public IgniteCondition getOrCreateCondition(String name) {
ctx.kernalContext().gateway().readLock();
try {
initializeReentrantLock();
IgniteCondition result = sync.newCondition(name);
sync.validate(false);
return result;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
ctx.kernalContext().gateway().readUnlock();
}
}
/** {@inheritDoc} */
@Override public int getHoldCount() {
try {
initializeReentrantLock();
return sync.getHoldCount();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** {@inheritDoc} */
@Override public boolean isHeldByCurrentThread() {
try {
initializeReentrantLock();
return sync.isHeldExclusively();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** {@inheritDoc} */
@Override public boolean isLocked() {
try {
initializeReentrantLock();
return sync.isLocked();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** {@inheritDoc} */
@Override public boolean hasQueuedThreads() {
try {
initializeReentrantLock();
return sync.hasQueuedThreads();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** {@inheritDoc} */
@Override public boolean hasQueuedThread(Thread thread) {
try {
initializeReentrantLock();
return sync.isQueued(thread);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** {@inheritDoc} */
@Override public boolean hasWaiters(IgniteCondition condition) {
try {
initializeReentrantLock();
AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name());
if (c == null)
throw new IllegalArgumentException();
return sync.hasWaiters(c);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** {@inheritDoc} */
@Override public int getWaitQueueLength(IgniteCondition condition) {
try {
initializeReentrantLock();
AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name());
if (c == null)
throw new IllegalArgumentException();
return sync.getWaitQueueLength(c);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** */
@Override public boolean isFailoverSafe() {
try {
initializeReentrantLock();
return sync.failoverSafe;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** */
@Override public boolean isFair() {
try {
initializeReentrantLock();
return sync.fair;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** {@inheritDoc} */
@Override public boolean isBroken() {
try {
initializeReentrantLock();
return sync.isBroken();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/** {@inheritDoc} */
@Override public void needCheckNotRemoved() {
// no-op
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(name);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
stash.set(in.readUTF());
}
/**
* Reconstructs object on unmarshalling.
*
* @return Reconstructed object.
* @throws ObjectStreamException Thrown in case of unmarshalling error.
*/
private Object readResolve() throws ObjectStreamException {
String name = stash.get();
assert name != null;
try {
IgniteLock lock = IgnitionEx.localIgnite().context().dataStructures().reentrantLock(
name,
null,
false,
false,
false);
if (lock == null)
throw new IllegalStateException("Lock was not found on deserialization: " + name);
return lock;
}
catch (IgniteCheckedException e) {
throw U.withCause(new InvalidObjectException(e.getMessage()), e);
}
finally {
stash.remove();
}
}
/** {@inheritDoc} */
@Override public void close() {
if (!rmvd) {
try {
boolean force = sync != null && (sync.isBroken() && !sync.failoverSafe);
ctx.kernalContext().dataStructures().removeReentrantLock(name, ctx.group().name(), force);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheLockImpl.class, this);
}
}