blob: 372ebeac45ffd3104446c2f11b7896b20e14673f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.local;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.jetbrains.annotations.Nullable;
/**
* Cache lock future.
*/
public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Boolean>
implements GridCacheVersionedFuture<Boolean> {
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Error updater. */
private static final AtomicReferenceFieldUpdater<GridLocalLockFuture, Throwable> ERR_UPD =
AtomicReferenceFieldUpdater.newUpdater(GridLocalLockFuture.class, Throwable.class, "err");
/** Logger. */
private static IgniteLogger log;
/** Cache registry. */
@GridToStringExclude
private GridCacheContext<K, V> cctx;
/** Underlying cache. */
@GridToStringExclude
private GridLocalCache<K, V> cache;
/** Lock owner thread. */
@GridToStringInclude
private long threadId;
/**
* Keys locked so far.
*
* Thread created this object iterates over entries and tries to lock each of them.
* If it finds some entry already locked by another thread it registers callback which will be executed
* by the thread owning the lock.
*
* Thus access to this collection must be synchronized except cases
* when this object is yet local to the thread created it.
*/
@GridToStringExclude
private List<GridLocalCacheEntry> entries;
/** Future ID. */
private IgniteUuid futId;
/** Lock version. */
private GridCacheVersion lockVer;
/** Error. */
private volatile Throwable err;
/** Timeout object. */
@GridToStringExclude
private LockTimeoutObject timeoutObj;
/** Lock timeout. */
private final long timeout;
/** Filter. */
private CacheEntryPredicate[] filter;
/** Transaction. */
private IgniteTxLocalEx tx;
/** Trackable flag. */
private boolean trackable = true;
/**
* @param cctx Registry.
* @param keys Keys to lock.
* @param tx Transaction.
* @param cache Underlying cache.
* @param timeout Lock acquisition timeout.
* @param filter Filter.
*/
GridLocalLockFuture(
GridCacheContext<K, V> cctx,
Collection<KeyCacheObject> keys,
IgniteTxLocalEx tx,
GridLocalCache<K, V> cache,
long timeout,
CacheEntryPredicate[] filter) {
assert keys != null;
assert cache != null;
assert (tx != null && timeout >= 0) || tx == null;
this.cctx = cctx;
this.cache = cache;
this.timeout = timeout;
this.filter = filter;
this.tx = tx;
ignoreInterrupts();
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
lockVer = tx != null ? tx.xidVersion() : cctx.cache().nextVersion();
futId = IgniteUuid.randomUuid();
entries = new ArrayList<>(keys.size());
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridLocalLockFuture.class);
if (tx != null && tx instanceof GridNearTxLocal && !((GridNearTxLocal)tx).updateLockFuture(null, this)) {
GridNearTxLocal tx0 = (GridNearTxLocal)tx;
onError(tx0.timedOut() ? tx0.timeoutException() : tx0.rollbackException());
}
}
/**
* @param keys Keys.
* @return {@code False} in case of error.
* @throws IgniteCheckedException If failed.
*/
public boolean addEntries(Collection<KeyCacheObject> keys) throws IgniteCheckedException {
for (KeyCacheObject key : keys) {
while (true) {
GridLocalCacheEntry entry = null;
try {
entry = cache.entryExx(key);
entry.unswap(false);
if (!cctx.isAll(entry, filter)) {
onFailed();
return false;
}
// Removed exception may be thrown here.
GridCacheMvccCandidate cand = addEntry(entry);
if (cand == null && isDone())
return false;
break;
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
}
}
}
if (timeout > 0) {
timeoutObj = new LockTimeoutObject();
cctx.time().addTimeoutObject(timeoutObj);
}
return true;
}
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
return futId;
}
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
return lockVer;
}
/** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
return false;
}
/** {@inheritDoc} */
@Override public boolean trackable() {
return trackable;
}
/** {@inheritDoc} */
@Override public void markNotTrackable() {
trackable = false;
}
/**
* @return Entries.
*/
private List<GridLocalCacheEntry> entries() {
return entries;
}
/**
* @return {@code True} if transaction is not {@code null}.
*/
private boolean inTx() {
return tx != null;
}
/**
* @return {@code True} if implicit transaction.
*/
private boolean implicitSingle() {
return tx != null && tx.implicitSingle();
}
/**
* @param cached Entry.
* @return {@code True} if locked.
* @throws GridCacheEntryRemovedException If removed.
*/
private boolean locked(GridCacheEntryEx cached) throws GridCacheEntryRemovedException {
// Reentry-aware check.
return (cached.lockedLocally(lockVer) || (cached.lockedByThread(threadId))) &&
filter(cached); // If filter failed, lock is failed.
}
/**
* Adds entry to future.
*
* @param entry Entry to add.
* @return Lock candidate.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
@Nullable private GridCacheMvccCandidate addEntry(GridLocalCacheEntry entry)
throws GridCacheEntryRemovedException {
// Add local lock first, as it may throw GridCacheEntryRemovedException.
GridCacheMvccCandidate c = entry.addLocal(
threadId,
lockVer,
null,
null,
timeout,
!inTx(),
inTx(),
implicitSingle(),
false
);
entries.add(entry);
if (c == null && timeout < 0) {
if (log.isDebugEnabled())
log.debug("Failed to acquire lock with negative timeout: " + entry);
onFailed();
return null;
}
if (c != null) {
// Immediately set lock to ready.
entry.readyLocal(c);
}
return c;
}
/**
* Undoes all locks.
*/
private void undoLocks() {
Collection<GridLocalCacheEntry> entriesCp = entriesCopy();
for (GridLocalCacheEntry e : entriesCp) {
try {
e.removeLock(lockVer);
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry while undoing locks: " + e);
}
}
}
/**
* Need of synchronization here is explained in the field's {@link GridLocalLockFuture#entries} comment.
*
* @return Copy of entries collection.
*/
private synchronized Collection<GridLocalCacheEntry> entriesCopy() {
return new ArrayList<>(entries());
}
/**
*
*/
void onFailed() {
undoLocks();
onComplete(false);
}
/**
* @param t Error.
*/
void onError(Throwable t) {
if (ERR_UPD.compareAndSet(this, null, t))
onFailed();
}
/**
* @param cached Entry to check.
* @return {@code True} if filter passed.
*/
private boolean filter(GridCacheEntryEx cached) {
try {
if (!cctx.isAll(cached, filter)) {
if (log.isDebugEnabled())
log.debug("Filter didn't pass for entry (will fail lock): " + cached);
onFailed();
return false;
}
return true;
}
catch (IgniteCheckedException e) {
onError(e);
return false;
}
}
/**
* Explicitly check if lock was acquired.
*/
void checkLocks() {
if (!isDone()) {
for (int i = 0; i < entries.size(); i++) {
while (true) {
GridCacheEntryEx cached = entries.get(i);
try {
if (!locked(cached))
return;
break;
}
// Possible in concurrent cases, when owner is changed after locks
// have been released or cancelled.
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry in onOwnerChanged method (will retry): " + cached);
// Replace old entry with new one.
entries.add(i, (GridLocalCacheEntry)cache.entryEx(cached.key()));
}
}
}
if (log.isDebugEnabled())
log.debug("Local lock acquired for entries: " + entries);
onComplete(true);
}
}
/** {@inheritDoc} */
@Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
if (!isDone()) {
for (int i = 0; i < entries.size(); i++) {
while (true) {
GridCacheEntryEx cached = entries.get(i);
try {
if (!locked(cached))
return true;
break;
}
// Possible in concurrent cases, when owner is changed after locks
// have been released or cancelled.
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry in onOwnerChanged method (will retry): " + cached);
// Replace old entry with new one.
entries.add(i, (GridLocalCacheEntry)cache.entryEx(cached.key()));
}
}
}
if (log.isDebugEnabled())
log.debug("Local lock acquired for entries: " + entries);
onComplete(true);
}
return false;
}
/** {@inheritDoc} */
@Override public boolean cancel() {
if (onCancelled()) {
// Remove all locks.
undoLocks();
onComplete(false);
}
return isCancelled();
}
/**
* Completeness callback.
*
* @param success If {@code true}, then lock has been acquired.
*/
private void onComplete(boolean success) {
if (!success)
undoLocks();
if (tx != null && success)
((GridNearTxLocal)tx).clearLockFuture(this);
if (onDone(success, err)) {
if (log.isDebugEnabled())
log.debug("Completing future: " + this);
cache.onFutureDone(this);
if (timeoutObj != null)
cctx.time().removeTimeoutObject(timeoutObj);
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridLocalLockFuture.class, this);
}
/**
* Lock request timeout object.
*/
private class LockTimeoutObject extends GridTimeoutObjectAdapter {
/**
* Default constructor.
*/
LockTimeoutObject() {
super(timeout);
}
/** {@inheritDoc} */
@SuppressWarnings({"ForLoopReplaceableByForEach"})
@Override public void onTimeout() {
if (log.isDebugEnabled())
log.debug("Timed out waiting for lock response: " + this);
if (inTx()) {
if (cctx.tm().deadlockDetectionEnabled()) {
Set<IgniteTxKey> keys = new HashSet<>();
List<GridLocalCacheEntry> entries = entries();
for (int i = 0; i < entries.size(); i++) {
GridLocalCacheEntry e = entries.get(i);
List<GridCacheMvccCandidate> mvcc = e.mvccAllLocal();
if (mvcc == null)
continue;
GridCacheMvccCandidate cand = mvcc.get(0);
if (cand.owner() && cand.tx() && !cand.version().equals(tx.xidVersion()))
keys.add(e.txKey());
}
IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys);
fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() {
@Override public void apply(IgniteInternalFuture<TxDeadlock> fut) {
try {
TxDeadlock deadlock = fut.get();
err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " +
"timeout for transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']',
deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx.shared())) :
null);
}
catch (IgniteCheckedException e) {
err = e;
U.warn(log, "Failed to detect deadlock.", e);
}
onComplete(false);
}
});
}
else
err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " +
"timeout for transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']');
}
else
onComplete(false);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(LockTimeoutObject.class, this);
}
}
}