/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
/**
* Cache lock future.
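* <p>
* Tracks acquisition of near locks for a set of keys on behalf of a thread or transaction: keys are mapped to
* their primary nodes, lock requests are sent (or applied locally), and the future completes with {@code true}
* once all requested locks are owned, or completes with {@code false} or an error on timeout, rollback or
* topology change.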
*/
public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Boolean>
implements GridCacheVersionedFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Done field updater. */
private static final AtomicIntegerFieldUpdater<GridNearLockFuture> DONE_UPD =
AtomicIntegerFieldUpdater.newUpdater(GridNearLockFuture.class, "done");
/** */
private static IgniteLogger log;
/** Cache registry. */
@GridToStringExclude
private final GridCacheContext<?, ?> cctx;
/** Lock owner thread. */
@GridToStringInclude
private long threadId;
/** Keys to lock. */
@GridToStringInclude
private final Collection<KeyCacheObject> keys;
/** Future ID. */
private final IgniteUuid futId;
/** Lock version. */
private final GridCacheVersion lockVer;
/** Read flag. */
private boolean read;
/** Flag to return value. */
private final boolean retval;
/** Error. */
private volatile Throwable err;
/** Timed out flag. */
private volatile boolean timedOut;
/** Timeout object. */
@GridToStringExclude
private volatile LockTimeoutObject timeoutObj;
/** Lock timeout. */
private final long timeout;
/** Filter. */
private final CacheEntryPredicate[] filter;
/** Transaction. */
@GridToStringExclude
private final GridNearTxLocal tx;
/** Topology snapshot to operate on. */
private volatile AffinityTopologyVersion topVer;
/** Map of current values. */
private final Map<KeyCacheObject, IgniteBiTuple<GridCacheVersion, CacheObject>> valMap;
/** */
@SuppressWarnings("UnusedDeclaration")
private volatile int done;
/** Keys locked so far. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
@GridToStringExclude
private List<GridDistributedCacheEntry> entries;
/** TTL for create operation. */
private long createTtl;
/** TTL for read operation. */
private long accessTtl;
/** Skip store flag. */
private final boolean skipStore;
/** Mappings to proceed. */
@GridToStringExclude
private Queue<GridNearLockMapping> mappings;
/** Keep binary context flag. */
private final boolean keepBinary;
/** Recovery mode context flag. */
private final boolean recovery;
/** Mini-future ID counter. */
private int miniId;
/**
* @param cctx Registry.
* @param keys Keys to lock.
* @param tx Transaction.
* @param read Read flag.
* @param retval Flag to return value or not.
* @param timeout Lock acquisition timeout.
* @param createTtl TTL for create operation.
* @param accessTtl TTL for read operation.
* @param filter Filter.
* @param skipStore Skip store flag.
* @param keepBinary Keep binary flag.
* @param recovery Recovery flag.
*/
public GridNearLockFuture(
GridCacheContext<?, ?> cctx,
Collection<KeyCacheObject> keys,
@Nullable GridNearTxLocal tx,
boolean read,
boolean retval,
long timeout,
long createTtl,
long accessTtl,
CacheEntryPredicate[] filter,
boolean skipStore,
boolean keepBinary,
boolean recovery
) {
super(CU.boolReducer());
assert keys != null;
assert (tx != null && timeout >= 0) || tx == null;
this.cctx = cctx;
this.keys = keys;
this.tx = tx;
this.read = read;
this.retval = retval;
this.timeout = timeout;
this.createTtl = createTtl;
this.accessTtl = accessTtl;
this.filter = filter;
this.skipStore = skipStore;
this.keepBinary = keepBinary;
this.recovery = recovery;
ignoreInterrupts();
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
lockVer = tx != null ? tx.xidVersion() : cctx.versions().next();
futId = IgniteUuid.randomUuid();
entries = new ArrayList<>(keys.size());
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridNearLockFuture.class);
valMap = new ConcurrentHashMap<>();
if (tx != null && !tx.updateLockFuture(null, this)) {
err = tx.timedOut() ? tx.timeoutException() : tx.rollbackException();
onComplete(false, false);
}
}
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
return lockVer;
}
/**
* @return Entries.
*/
public synchronized List<GridDistributedCacheEntry> entriesCopy() {
return new ArrayList<>(entries);
}
/**
* @return Future ID.
*/
@Override public IgniteUuid futureId() {
return futId;
}
/** {@inheritDoc} */
@Override public boolean trackable() {
return true;
}
/** {@inheritDoc} */
@Override public void markNotTrackable() {
// No-op.
}
/**
* @return {@code True} if transaction is not {@code null}.
*/
private boolean inTx() {
return tx != null;
}
/**
* @return {@code True} if implicit-single-tx flag is set.
*/
private boolean implicitSingleTx() {
return tx != null && tx.implicitSingle();
}
/**
* @return {@code True} if transaction is not {@code null} and has invalidate flag set.
*/
private boolean isInvalidate() {
return tx != null && tx.isInvalidate();
}
/**
* @return Transaction isolation or {@code null} if no transaction.
*/
@Nullable private TransactionIsolation isolation() {
return tx == null ? null : tx.isolation();
}
/**
* @return {@code true} if related transaction is implicit.
*/
private boolean implicitTx() {
return tx != null && tx.implicit();
}
/**
* @param cached Entry.
* @return {@code True} if locked.
* @throws GridCacheEntryRemovedException If removed.
*/
private boolean locked(GridCacheEntryEx cached) throws GridCacheEntryRemovedException {
// Reentry-aware check (if the filter fails, the lock fails).
return cached.lockedLocallyByIdOrThread(lockVer, threadId) && filter(cached);
}
/**
* Adds entry to future.
*
* @param topVer Topology version.
* @param entry Entry to add.
* @param dhtNodeId DHT node ID.
* @return Lock candidate.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
@Nullable private GridCacheMvccCandidate addEntry(
AffinityTopologyVersion topVer,
GridNearCacheEntry entry,
UUID dhtNodeId
) throws GridCacheEntryRemovedException {
assert Thread.holdsLock(this);
// Check if lock acquisition is timed out.
if (timedOut)
return null;
// Add local lock first, as it may throw GridCacheEntryRemovedException.
GridCacheMvccCandidate c = entry.addNearLocal(
dhtNodeId,
threadId,
lockVer,
topVer,
timeout,
!inTx(),
inTx(),
implicitSingleTx(),
false
);
if (inTx()) {
IgniteTxEntry txEntry = tx.entry(entry.txKey());
txEntry.cached(entry);
}
entries.add(entry);
if (c == null && timeout < 0) {
if (log.isDebugEnabled())
log.debug("Failed to acquire lock with negative timeout: " + entry);
onFailed(false);
return null;
}
// Double check if lock acquisition has already timed out.
if (timedOut) {
entry.removeLock(lockVer);
return null;
}
return c;
}
/**
* Undoes all locks.
*
* @param dist If {@code true}, then remove locks from remote nodes as well.
* @param rollback {@code True} if transaction should be rolled back.
*/
private void undoLocks(boolean dist, boolean rollback) {
// Transactions will undo during rollback.
if (dist && tx == null)
cctx.nearTx().removeLocks(lockVer, keys);
else {
if (rollback && tx != null) {
if (tx.setRollbackOnly()) {
if (log.isDebugEnabled())
log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx);
}
else if (log.isDebugEnabled())
log.debug("Transaction was not marked rollback-only while locks were not acquired: " + tx);
}
for (GridCacheEntryEx e : entriesCopy()) {
try {
e.removeLock(lockVer);
}
catch (GridCacheEntryRemovedException ignored) {
while (true) {
try {
e = cctx.cache().peekEx(e.key());
if (e != null)
e.removeLock(lockVer);
break;
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Attempted to remove lock on removed entry (will retry) [ver=" +
lockVer + ", entry=" + e + ']');
}
}
}
}
}
cctx.mvcc().recheckPendingLocks();
}
/**
*
* @param dist {@code True} if need to distribute lock release.
*/
private void onFailed(boolean dist) {
undoLocks(dist, true);
complete(false);
}
/**
* @param success Success flag.
*/
public void complete(boolean success) {
onComplete(success, true);
}
/**
* @param nodeId Left node ID.
* @return {@code True} if node was in the list.
*/
@SuppressWarnings({"ThrowableInstanceNeverThrown"})
@Override public boolean onNodeLeft(UUID nodeId) {
boolean found = false;
for (IgniteInternalFuture<?> fut : futures()) {
if (isMini(fut)) {
MiniFuture f = (MiniFuture)fut;
if (f.node().id().equals(nodeId)) {
if (log.isDebugEnabled())
log.debug("Found mini-future for left node [nodeId=" + nodeId + ", mini=" + f + ", fut=" +
this + ']');
f.onResult(newTopologyException(null, nodeId));
found = true;
}
}
}
if (!found) {
if (log.isDebugEnabled())
log.debug("Near lock future does not have mapping for left node (ignoring) [nodeId=" + nodeId +
", fut=" + this + ']');
}
return found;
}
/**
* @param nodeId Sender.
* @param res Result.
*/
@SuppressWarnings("SynchronizeOnNonFinalField")
void onResult(UUID nodeId, GridNearLockResponse res) {
boolean done = isDone();
if (!done) {
// onResult is always called after map() and timeoutObj is never reset to null, so this is
// a race-free null check.
if (timeoutObj == null) {
onResult0(nodeId, res);
return;
}
synchronized (timeoutObj) {
if (!isDone()) {
if (onResult0(nodeId, res))
return;
}
else
done = true;
}
}
if (done && log.isDebugEnabled())
log.debug("Ignoring lock response from node (future is done) [nodeId=" + nodeId + ", res=" + res +
", fut=" + this + ']');
}
/**
* @param nodeId Sender.
* @param res Result.
* @return {@code True} if a mini future was found and the response was processed.
*/
private boolean onResult0(UUID nodeId, GridNearLockResponse res) {
if (log.isDebugEnabled())
log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']');
MiniFuture mini = miniFuture(res.miniId());
if (mini != null) {
assert mini.node().id().equals(nodeId);
if (log.isDebugEnabled())
log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
mini.onResult(res);
if (log.isDebugEnabled())
log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini +
", res=" + res + ']');
return true;
}
U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res +
", fut=" + this + ']');
return false;
}
/**
* @return Keys for which locks were requested from remote nodes but a response has not been received yet.
*/
public synchronized Set<IgniteTxKey> requestedKeys() {
if (timeoutObj != null && timeoutObj.requestedKeys != null)
return timeoutObj.requestedKeys;
return requestedKeys0();
}
/**
* @return Keys for which locks were requested from remote nodes but a response has not been received yet.
*/
private Set<IgniteTxKey> requestedKeys0() {
for (IgniteInternalFuture<Boolean> miniFut : futures()) {
if (isMini(miniFut) && !miniFut.isDone()) {
MiniFuture mini = (MiniFuture)miniFut;
Set<IgniteTxKey> requestedKeys = U.newHashSet(mini.keys.size());
for (KeyCacheObject key : mini.keys)
requestedKeys.add(new IgniteTxKey(key, cctx.cacheId()));
return requestedKeys;
}
}
return null;
}
/**
* Finds pending mini future by the given mini ID.
*
* @param miniId Mini ID to find.
* @return Mini future.
*/
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
for (int i = 0; i < size; i++) {
IgniteInternalFuture<Boolean> fut = future(i);
if (!isMini(fut))
continue;
MiniFuture mini = (MiniFuture)fut;
if (mini.futureId() == miniId) {
if (!mini.isDone())
return mini;
else
return null;
}
}
}
return null;
}
/**
* @param t Error.
*/
private void onError(Throwable t) {
synchronized (this) {
if (err == null)
err = t;
}
}
/**
* @param cached Entry to check.
* @return {@code True} if filter passed.
*/
private boolean filter(GridCacheEntryEx cached) {
try {
if (!cctx.isAll(cached, filter)) {
if (log.isDebugEnabled())
log.debug("Filter didn't pass for entry (will fail lock): " + cached);
onFailed(true);
return false;
}
return true;
}
catch (IgniteCheckedException e) {
onError(e);
return false;
}
}
/**
* Callback for whenever entry lock ownership changes.
*
* @param entry Entry whose lock ownership changed.
* @param owner Current lock owner.
* @return {@code True} if the ownership change completed this future.
*/
@Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
if (owner != null && owner.nearLocal() && owner.version().equals(lockVer)) {
onDone(true);
return true;
}
return false;
}
/**
* @return {@code True} if locks have been acquired.
*/
private boolean checkLocks() {
if (!isDone() && initialized() && !hasPending()) {
synchronized (this) {
for (int i = 0; i < entries.size(); i++) {
while (true) {
GridCacheEntryEx cached = entries.get(i);
try {
if (!locked(cached)) {
if (log.isDebugEnabled())
log.debug("Lock is still not acquired for entry (will keep waiting) [entry=" +
cached + ", fut=" + this + ']');
return false;
}
break;
}
// Possible in concurrent cases, when owner is changed after locks
// have been released or cancelled.
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry in onOwnerChanged method (will retry): " + cached);
// Replace old entry with new one.
entries.set(
i,
(GridDistributedCacheEntry)cctx.cache().entryEx(cached.key()));
}
}
}
if (log.isDebugEnabled())
log.debug("Local lock acquired for entries [fut=" + this + ", entries=" + entries + "]");
}
onComplete(true, true);
return true;
}
return false;
}
/**
* Cancellation has a special meaning for lock futures: it is called when the lock must be released on rollback.
*/
@Override public boolean cancel() {
if (inTx())
onError(tx.rollbackException());
return onComplete(false, true);
}
/** {@inheritDoc} */
@Override public boolean onDone(Boolean success, Throwable err) {
if (log.isDebugEnabled())
log.debug("Received onDone(..) callback [success=" + success + ", err=" + err + ", fut=" + this + ']');
if (inTx() && cctx.tm().deadlockDetectionEnabled() &&
(this.err instanceof IgniteTxTimeoutCheckedException || timedOut))
return false;
// If locks were not acquired yet, delay completion.
if (isDone() || (err == null && success && !checkLocks()))
return false;
if (err != null && !(err instanceof GridCacheLockTimeoutException))
onError(err);
if (err != null)
success = false;
return onComplete(success, true);
}
/**
* Completeness callback.
*
* @param success {@code True} if lock was acquired.
* @param distribute {@code True} if need to distribute lock removal in case of failure.
* @return {@code True} if completed by this operation.
*/
private boolean onComplete(boolean success, boolean distribute) {
if (log.isDebugEnabled()) {
log.debug("Received onComplete(..) callback [success=" + success + ", distribute=" + distribute +
", fut=" + this + ']');
}
if (!DONE_UPD.compareAndSet(this, 0, 1))
return false;
if (!success)
undoLocks(distribute, true);
if (tx != null) {
cctx.tm().txContext(tx);
if (success)
tx.clearLockFuture(this);
}
if (super.onDone(success, err)) {
if (log.isDebugEnabled())
log.debug("Completing future: " + this);
// Clean up.
cctx.mvcc().removeVersionedFuture(this);
if (timeoutObj != null)
cctx.time().removeTimeoutObject(timeoutObj);
return true;
}
return false;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return futId.hashCode();
}
/** {@inheritDoc} */
@Override public String toString() {
Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@Override public String apply(IgniteInternalFuture<?> f) {
if (isMini(f)) {
MiniFuture m = (MiniFuture)f;
return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
}
else
return "[loc=true, done=" + f.isDone() + "]";
}
});
return S.toString(GridNearLockFuture.class, this,
"innerFuts", futs,
"inTx", inTx(),
"super", super.toString());
}
/**
* @param f Future.
* @return {@code True} if mini-future.
*/
private boolean isMini(IgniteInternalFuture<?> f) {
return f.getClass().equals(MiniFuture.class);
}
/**
* Basically, future mapping consists of two parts. First, we must determine the topology version this future
* will map on. If locking is performed within a user transaction, we must continue to map keys on the same
* topology version on which the transaction started. If the topology version is undefined, we get the current
* topology future and wait until it completes so the topology is ready to use.
* <p/>
* During the second part we map keys to primary nodes using the topology snapshot obtained during the first
* part. Note that if a primary node leaves the grid, the future will fail and the transaction will be rolled back.
*/
void map() {
if (isDone()) // Possible due to async rollback.
return;
if (timeout > 0) {
timeoutObj = new LockTimeoutObject();
cctx.time().addTimeoutObject(timeoutObj);
}
boolean added = cctx.mvcc().addFuture(this);
assert added : this;
// Obtain the topology version to use.
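// Resolution order: the explicit lock topology version for this thread, then (for system transactions)
// the version already locked by another system transaction, then the transaction's own topology snapshot.
// If all of these are null, mapOnTopology() below waits for the exchange future and captures a fresh snapshot.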
long threadId = Thread.currentThread().getId();
AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
// If there is another system transaction in progress, use its topology version to prevent deadlock.
if (topVer == null && tx != null && tx.system())
topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
if (topVer != null && tx != null)
tx.topologyVersion(topVer);
if (topVer == null && tx != null)
topVer = tx.topologyVersionSnapshot();
if (topVer != null) {
for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) {
if (fut.exchangeDone() && fut.topologyVersion().equals(topVer)){
Throwable err = fut.validateCache(cctx, recovery, read, null, keys);
if (err != null) {
onDone(err);
return;
}
break;
}
}
// Continue mapping on the same topology version as it was before.
if (this.topVer == null)
this.topVer = topVer;
map(keys, false, true);
markInitialized();
return;
}
// Must get topology snapshot and map on that version.
mapOnTopology(false);
}
/**
* Acquires the topology future and checks its completeness under the read lock. If it is not complete,
* asynchronously waits for its completion and then tries again.
*
* @param remap Remap flag.
*/
synchronized void mapOnTopology(final boolean remap) {
// We must acquire topology snapshot from the topology version future.
cctx.topology().readLock();
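// Holding the topology read lock prevents a concurrent exchange from changing the topology while the
// cache is validated and the mapping topology version is captured.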
try {
if (cctx.topology().stopping()) {
onDone(new CacheStoppedException(cctx.name()));
return;
}
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
if (fut.isDone()) {
Throwable err = fut.validateCache(cctx, recovery, read, null, keys);
if (err != null) {
onDone(err);
return;
}
AffinityTopologyVersion topVer = fut.topologyVersion();
if (remap) {
if (tx != null)
tx.onRemap(topVer);
this.topVer = topVer;
}
else {
if (tx != null)
tx.topologyVersion(topVer);
if (this.topVer == null)
this.topVer = topVer;
}
map(keys, remap, false);
markInitialized();
}
else {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
try {
fut.get();
mapOnTopology(remap);
}
catch (IgniteCheckedException e) {
onDone(e);
}
finally {
cctx.shared().txContextReset();
}
}
});
}
}
finally {
cctx.topology().readUnlock();
}
}
/**
* Maps keys to nodes. Note that we cannot simply group keys by nodes and send one lock request per node, as
* such an approach does not preserve the order of lock acquisition. Instead, keys are split into contiguous
* groups belonging to one primary node, and locks for these groups are acquired sequentially.
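* For example, if consecutive keys map to primary nodes A, A, B, A, three groups are produced
* ([A: k1, k2], [B: k3], [A: k4]) and the corresponding lock requests are issued one after another.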
*
* @param keys Keys.
* @param remap Remap flag.
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
*/
private void map(Iterable<KeyCacheObject> keys, boolean remap, boolean topLocked) {
try {
AffinityTopologyVersion topVer = this.topVer;
assert topVer != null;
assert topVer.topologyVersion() > 0 : topVer;
if (CU.affinityNodes(cctx, topVer).isEmpty()) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all " +
"partition nodes left the grid)."));
return;
}
boolean clientNode = cctx.kernalContext().clientNode();
assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
synchronized (this) {
mappings = new ArrayDeque<>();
// Assign keys to primary nodes.
GridNearLockMapping map = null;
for (KeyCacheObject key : keys) {
GridNearLockMapping updated = map(
key,
map,
topVer);
// If new mapping was created, add to collection.
if (updated != map) {
mappings.add(updated);
if (tx != null && updated.node().isLocal())
tx.nearLocallyMapped(true);
}
map = updated;
}
if (isDone()) {
if (log.isDebugEnabled())
log.debug("Abandoning (re)map because future is done: " + this);
return;
}
if (log.isDebugEnabled())
log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
boolean first = true;
// Create mini futures.
for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
GridNearLockMapping mapping = iter.next();
ClusterNode node = mapping.node();
Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
assert !mappedKeys.isEmpty();
GridNearLockRequest req = null;
Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
boolean explicit = false;
for (KeyCacheObject key : mappedKeys) {
IgniteTxKey txKey = cctx.txKey(key);
while (true) {
GridNearCacheEntry entry = null;
try {
entry = cctx.near().entryExx(key, topVer);
if (!cctx.isAll(entry, filter)) {
if (log.isDebugEnabled())
log.debug("Entry being locked did not pass filter (will not lock): " + entry);
onComplete(false, false);
return;
}
// Removed exception may be thrown here.
GridCacheMvccCandidate cand = addEntry(
topVer,
entry,
node.id());
if (isDone()) {
if (log.isDebugEnabled())
log.debug("Abandoning (re)map because future is done after addEntry attempt " +
"[fut=" + this + ", entry=" + entry + ']');
return;
}
if (cand != null) {
if (tx == null && !cand.reentry())
cctx.mvcc().addExplicitLock(threadId, cand, topVer);
IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue();
if (val == null) {
GridDhtCacheEntry dhtEntry = dht().peekExx(key);
try {
if (dhtEntry != null)
val = dhtEntry.versionedValue(topVer);
}
catch (GridCacheEntryRemovedException ignored) {
assert dhtEntry.obsolete() : dhtEntry;
if (log.isDebugEnabled())
log.debug("Got removed exception for DHT entry in map (will ignore): "
+ dhtEntry);
}
}
GridCacheVersion dhtVer = null;
if (val != null) {
dhtVer = val.get1();
valMap.put(key, val);
}
if (!cand.reentry()) {
if (req == null) {
boolean clientFirst = false;
if (first) {
clientFirst = clientNode &&
!topLocked &&
(tx == null || !tx.hasRemoteLocks());
first = false;
}
assert !implicitTx() && !implicitSingleTx() : tx;
req = new GridNearLockRequest(
cctx.cacheId(),
topVer,
cctx.nodeId(),
threadId,
futId,
lockVer,
inTx(),
read,
retval,
isolation(),
isInvalidate(),
timeout,
mappedKeys.size(),
inTx() ? tx.size() : mappedKeys.size(),
inTx() && tx.syncMode() == FULL_SYNC,
inTx() ? tx.subjectId() : null,
inTx() ? tx.taskNameHash() : 0,
read ? createTtl : -1L,
read ? accessTtl : -1L,
skipStore,
keepBinary,
clientFirst,
true,
cctx.deploymentEnabled());
mapping.request(req);
}
distributedKeys.add(key);
if (tx != null)
tx.addKeyMapping(txKey, mapping.node());
req.addKeyBytes(
key,
retval && dhtVer == null,
dhtVer,
// Include DHT version to match remote DHT entry.
cctx);
}
if (cand.reentry())
explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
}
else {
if (timedOut)
return;
// Ignore reentries within transactions.
explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
}
if (explicit)
tx.addKeyMapping(txKey, mapping.node());
break;
}
catch (GridCacheEntryRemovedException ignored) {
assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry;
if (log.isDebugEnabled())
log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
}
}
// Mark mapping explicit lock flag.
if (explicit) {
boolean marked = tx != null && tx.markExplicit(node.id());
assert tx == null || marked;
}
}
if (!distributedKeys.isEmpty())
mapping.distributedKeys(distributedKeys);
else {
assert mapping.request() == null;
iter.remove();
}
}
}
cctx.mvcc().recheckPendingLocks();
proceedMapping();
}
catch (IgniteCheckedException ex) {
onError(ex);
}
}
/**
* @throws IgniteCheckedException If failed.
*/
private void proceedMapping() throws IgniteCheckedException {
boolean set = tx != null && cctx.shared().tm().setTxTopologyHint(tx.topologyVersionSnapshot());
try {
proceedMapping0();
}
finally {
if (set)
cctx.tm().setTxTopologyHint(null);
}
}
/**
* Gets next near lock mapping and either acquires dht locks locally or sends near lock request to
* remote primary node.
*
* @throws IgniteCheckedException If mapping can not be completed.
*/
@SuppressWarnings("unchecked")
private void proceedMapping0()
throws IgniteCheckedException {
GridNearLockMapping map;
synchronized (this) {
map = mappings.poll();
}
// If there are no more mappings to process, complete the future.
if (map == null)
return;
final GridNearLockRequest req = map.request();
final Collection<KeyCacheObject> mappedKeys = map.distributedKeys();
final ClusterNode node = map.node();
if (filter != null && filter.length != 0)
req.filter(filter, cctx);
if (node.isLocal()) {
req.miniId(-1);
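// A local mapping is served by dht().lockAllAsync() wrapped in an embedded future instead of a network
// round-trip, so no MiniFuture is registered for it and the mini ID is simply set to -1.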
if (log.isDebugEnabled())
log.debug("Before locally locking near request: " + req);
IgniteInternalFuture<GridNearLockResponse> fut = dht().lockAllAsync(cctx, cctx.localNode(), req, filter);
// Add new future.
add(new GridEmbeddedFuture<>(
new C2<GridNearLockResponse, Exception, Boolean>() {
@Override public Boolean apply(GridNearLockResponse res, Exception e) {
if (CU.isLockTimeoutOrCancelled(e) ||
(res != null && CU.isLockTimeoutOrCancelled(res.error())))
return false;
if (e != null) {
onError(e);
return false;
}
if (res == null) {
onError(new IgniteCheckedException("Lock response is null for future: " + this));
return false;
}
if (res.error() != null) {
onError(res.error());
return false;
}
if (log.isDebugEnabled())
log.debug("Acquired lock for local DHT mapping [locId=" + cctx.nodeId() +
", mappedKeys=" + mappedKeys + ", fut=" + GridNearLockFuture.this + ']');
try {
int i = 0;
for (KeyCacheObject k : mappedKeys) {
while (true) {
GridNearCacheEntry entry = cctx.near().entryExx(k, req.topologyVersion());
try {
IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup =
valMap.get(entry.key());
boolean hasBytes = entry.hasValue();
CacheObject oldVal = entry.rawGet();
CacheObject newVal = res.value(i);
GridCacheVersion dhtVer = res.dhtVersion(i);
GridCacheVersion mappedVer = res.mappedVersion(i);
// On local node don't record twice if DHT cache already recorded.
boolean record = retval && oldValTup != null && oldValTup.get1().equals(dhtVer);
if (newVal == null) {
if (oldValTup != null) {
if (oldValTup.get1().equals(dhtVer))
newVal = oldValTup.get2();
oldVal = oldValTup.get2();
}
}
// Lock is held at this point, so we can set the
// returned value if any.
entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
entry.readyNearLock(lockVer, mappedVer, res.committedVersions(),
res.rolledbackVersions(), res.pending());
if (inTx() && implicitTx() && tx.onePhaseCommit()) {
boolean pass = res.filterResult(i);
tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
}
if (record) {
if (cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
cctx.events().addEvent(
entry.partition(),
entry.key(),
tx,
null,
EVT_CACHE_OBJECT_READ,
newVal,
newVal != null,
oldVal,
hasBytes,
CU.subjectId(tx, cctx.shared()),
null,
inTx() ? tx.resolveTaskName() : null,
keepBinary);
if (cctx.statisticsEnabled())
cctx.cache().metrics0().onRead(oldVal != null);
}
if (log.isDebugEnabled())
log.debug("Processed response for entry [res=" + res +
", entry=" + entry + ']');
break; // Inner while loop.
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Failed to add candidates because entry was " +
"removed (will renew).");
synchronized (GridNearLockFuture.this) {
// Replace old entry with new one.
entries.set(i,
(GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
}
}
}
i++; // Increment outside of while loop.
}
// Proceed and add new future (if any) before completing embedded future.
proceedMapping();
}
catch (IgniteCheckedException ex) {
onError(ex);
return false;
}
return true;
}
},
fut));
}
else {
final MiniFuture fut = new MiniFuture(node, mappedKeys, ++miniId);
req.miniId(fut.futureId());
add(fut); // Append new future.
IgniteInternalFuture<?> txSync = null;
if (inTx())
txSync = cctx.tm().awaitFinishAckAsync(node.id(), tx.threadId());
if (txSync == null || txSync.isDone()) {
try {
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
cctx.io().send(node, req, cctx.ioPolicy());
}
catch (ClusterTopologyCheckedException ex) {
fut.onResult(ex);
}
}
else {
txSync.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> t) {
try {
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
cctx.io().send(node, req, cctx.ioPolicy());
}
catch (ClusterTopologyCheckedException ex) {
fut.onResult(ex);
}
catch (IgniteCheckedException e) {
onError(e);
}
}
});
}
}
}
/**
* @param key Key to map.
* @param mapping Current mapping or {@code null} if a new one should be created.
* @param topVer Topology version.
* @return Near lock mapping.
* @throws IgniteCheckedException If mapping for key failed.
*/
private GridNearLockMapping map(
KeyCacheObject key,
@Nullable GridNearLockMapping mapping,
AffinityTopologyVersion topVer
) throws IgniteCheckedException {
assert mapping == null || mapping.node() != null;
ClusterNode primary = cctx.affinity().primaryByKey(key, topVer);
if (primary == null)
throw new ClusterTopologyServerNotFoundException("Failed to lock keys " +
"(all partition nodes left the grid).");
if (cctx.discovery().node(primary.id()) == null)
// If primary node left the grid before lock acquisition, fail the whole future.
throw newTopologyException(null, primary.id());
if (mapping == null || !primary.id().equals(mapping.node().id()))
mapping = new GridNearLockMapping(primary, key);
else
mapping.addKey(key);
return mapping;
}
/**
* @return DHT cache.
*/
private GridDhtTransactionalCacheAdapter<?, ?> dht() {
return cctx.nearTx().dht();
}
/**
* Creates new topology exception for cases when primary node leaves grid during mapping.
*
* @param nested Optional nested exception.
* @param nodeId Node ID.
* @return Topology exception with user-friendly message.
*/
private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) {
ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
"(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer));
return topEx;
}
/**
* Lock request timeout object.
*/
private class LockTimeoutObject extends GridTimeoutObjectAdapter {
/**
* Default constructor.
*/
LockTimeoutObject() {
super(timeout);
}
/** Requested keys. */
private Set<IgniteTxKey> requestedKeys;
/** {@inheritDoc} */
@SuppressWarnings({"ThrowableInstanceNeverThrown"})
@Override public void onTimeout() {
if (log.isDebugEnabled())
log.debug("Timed out waiting for lock response: " + this);
timedOut = true;
if (inTx()) {
if (cctx.tm().deadlockDetectionEnabled()) {
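// Collect the keys this transaction is still waiting for and run distributed deadlock detection;
// the timeout exception below wraps a TransactionDeadlockException if a deadlock is found.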
synchronized (GridNearLockFuture.this) {
requestedKeys = requestedKeys0();
clear(); // Stop response processing.
}
Set<IgniteTxKey> keys = new HashSet<>();
for (IgniteTxEntry txEntry : tx.allEntries()) {
if (!txEntry.locked())
keys.add(txEntry.txKey());
}
IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys);
fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() {
@Override public void apply(IgniteInternalFuture<TxDeadlock> fut) {
try {
TxDeadlock deadlock = fut.get();
err = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided " +
"timeout for transaction [timeout=" + tx.timeout() + ", tx=" + CU.txString(tx) + ']',
deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx.shared())) :
null);
}
catch (IgniteCheckedException e) {
err = e;
U.warn(log, "Failed to detect deadlock.", e);
}
synchronized (LockTimeoutObject.this) {
onComplete(false, true);
}
}
});
}
else
err = tx.timeoutException();
}
else {
synchronized (this) {
onComplete(false, true);
}
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(LockTimeoutObject.class, this);
}
}
/**
* Mini-future for a lock request sent to a single node. Mini-futures are only waiting on a single
* node as opposed to multiple nodes.
*/
private class MiniFuture extends GridFutureAdapter<Boolean> {
/** Mini-future ID. */
private final int futId;
/** Node. */
@GridToStringExclude
private ClusterNode node;
/** Keys. */
@GridToStringInclude(sensitive = true)
private Collection<KeyCacheObject> keys;
/** */
private boolean rcvRes;
/**
* @param node Node.
* @param keys Keys.
* @param futId Mini future ID.
*/
MiniFuture(
ClusterNode node,
Collection<KeyCacheObject> keys,
int futId
) {
this.node = node;
this.keys = keys;
this.futId = futId;
}
/**
* @return Future ID.
*/
int futureId() {
return futId;
}
/**
* @return Node.
*/
public ClusterNode node() {
return node;
}
/**
* @return Keys.
*/
public Collection<KeyCacheObject> keys() {
return keys;
}
/**
* @param e Node left exception.
*/
void onResult(ClusterTopologyCheckedException e) {
if (isDone())
return;
synchronized (this) {
if (!rcvRes)
rcvRes = true;
else
return;
}
if (log.isDebugEnabled())
log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
if (tx != null)
tx.removeMapping(node.id());
// Primary node left the grid, so fail the future.
GridNearLockFuture.this.onDone(false, newTopologyException(e, node.id()));
onDone(true);
}
/**
* @param res Lock response.
*/
void onResult(GridNearLockResponse res) {
synchronized (this) {
if (!rcvRes)
rcvRes = true;
else
return;
}
if (res.error() != null) {
if (inTx() && cctx.tm().deadlockDetectionEnabled() &&
(res.error() instanceof IgniteTxTimeoutCheckedException || tx.remainingTime() == -1))
return;
if (log.isDebugEnabled())
log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
", res=" + res + ']');
// Fail.
if (res.error() instanceof GridCacheLockTimeoutException)
onDone(false);
else
onDone(res.error());
return;
}
if (res.clientRemapVersion() != null) {
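// The server asked the client node to remap: wait until the returned affinity version is ready
// locally and then remap this future on it.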
assert cctx.kernalContext().clientNode();
IgniteInternalFuture<?> affFut =
cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
if (affFut != null && !affFut.isDone()) {
affFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
fut.get();
remap();
}
catch (IgniteCheckedException e) {
onDone(e);
}
finally {
cctx.shared().txContextReset();
}
}
});
}
else
remap();
}
else {
int i = 0;
AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer;
for (KeyCacheObject k : keys) {
while (true) {
GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
try {
if (res.dhtVersion(i) == null) {
onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
"(will fail the lock): " + res));
return;
}
IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
CacheObject oldVal = entry.rawGet();
boolean hasOldVal = false;
CacheObject newVal = res.value(i);
boolean readRecordable = false;
if (retval) {
readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
if (readRecordable)
hasOldVal = entry.hasValue();
}
GridCacheVersion dhtVer = res.dhtVersion(i);
GridCacheVersion mappedVer = res.mappedVersion(i);
if (newVal == null) {
if (oldValTup != null) {
if (oldValTup.get1().equals(dhtVer))
newVal = oldValTup.get2();
oldVal = oldValTup.get2();
}
}
// Lock is held at this point, so we can set the
// returned value if any.
entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
if (inTx()) {
tx.hasRemoteLocks(true);
if (implicitTx() && tx.onePhaseCommit()) {
boolean pass = res.filterResult(i);
tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
}
}
entry.readyNearLock(lockVer,
mappedVer,
res.committedVersions(),
res.rolledbackVersions(),
res.pending());
if (retval) {
if (readRecordable)
cctx.events().addEvent(
entry.partition(),
entry.key(),
tx,
null,
EVT_CACHE_OBJECT_READ,
newVal,
newVal != null,
oldVal,
hasOldVal,
CU.subjectId(tx, cctx.shared()),
null,
inTx() ? tx.resolveTaskName() : null,
keepBinary);
if (cctx.statisticsEnabled())
cctx.cache().metrics0().onRead(false);
}
if (log.isDebugEnabled())
log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
break; // Inner while loop.
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Failed to add candidates because entry was removed (will renew).");
synchronized (GridNearLockFuture.this) {
// Replace old entry with new one.
entries.set(i,
(GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
}
}
}
i++;
}
try {
proceedMapping();
}
catch (IgniteCheckedException e) {
onDone(e);
}
onDone(true);
}
}
/**
* Undoes local locks and remaps this future on the up-to-date topology version.
*/
private void remap() {
undoLocks(false, false);
mapOnTopology(true);
onDone(true);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());
}
}
}