blob: 3ec833a29f7825fc8bbbe98fc4c82321286282f8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.DEADLOCK_MAX_ITERS;
/**
* Transactions deadlock detection.
*/
public class TxDeadlockDetection {
/** @see IgniteSystemProperties#IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT */
public static final int DFLT_TX_DEADLOCK_DETECTION_TIMEOUT = 60000;
/** Deadlock detection maximum iterations. */
private static int deadLockTimeout =
getInteger(IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT, DFLT_TX_DEADLOCK_DETECTION_TIMEOUT);
/** Sequence. */
private static final AtomicLong SEQ = new AtomicLong();
/** Cctx. */
private final GridCacheSharedContext cctx;
/** Logger. */
private final IgniteLogger log;
/**
* @param cctx Context.
*/
public TxDeadlockDetection(GridCacheSharedContext<?, ?> cctx) {
this.cctx = cctx;
this.log = cctx.logger(TxDeadlockDetection.class);
}
/**
* Detects deadlock starting from given keys.
*
* @param tx Target tx.
* @param keys Keys.
* @return {@link TxDeadlock} if found, otherwise - {@code null}.
*/
TxDeadlockFuture detectDeadlock(IgniteInternalTx tx, Set<IgniteTxKey> keys) {
GridCacheVersion txId = tx.nearXidVersion();
if (log.isDebugEnabled()) {
log.debug("Deadlock detection started " +
"[nodeId=" + cctx.localNodeId() + ", xidVersion=" + txId + ", keys=" + keys + ']');
}
TxDeadlockFuture fut = new TxDeadlockFuture(cctx, txId, tx.topologyVersion(), keys);
fut.init();
return fut;
}
/**
* @param wfg Wait-for-graph.
* @param txId Tx ID - start vertex for cycle search in graph.
*/
static List<GridCacheVersion> findCycle(Map<GridCacheVersion, Set<GridCacheVersion>> wfg, GridCacheVersion txId) {
if (wfg == null || wfg.isEmpty())
return null;
ArrayDeque<GridCacheVersion> stack = new ArrayDeque<>();
Set<GridCacheVersion> inPath = new HashSet<>();
Set<GridCacheVersion> visited = new HashSet<>();
Map<GridCacheVersion, GridCacheVersion> edgeTo = new HashMap<>();
stack.push(txId);
while (!stack.isEmpty()) {
GridCacheVersion v = stack.peek();
if (visited.contains(v)) {
stack.pop();
inPath.remove(v);
continue;
}
visited.add(v);
Set<GridCacheVersion> children = wfg.get(v);
if (children == null || children.isEmpty()) {
stack.pop();
inPath.remove(v);
continue;
}
inPath.add(v);
for (GridCacheVersion w : children) {
if (inPath.contains(w) && visited.contains(w)) {
List<GridCacheVersion> cycle = new ArrayList<>();
for (GridCacheVersion x = v; !x.equals(w); x = edgeTo.get(x))
cycle.add(x);
cycle.add(w);
cycle.add(v);
return cycle;
}
edgeTo.put(w, v);
stack.push(w);
}
}
return null;
}
/**
*
*/
static class TxDeadlockFuture extends GridFutureAdapter<TxDeadlock> {
/** Context. */
private final GridCacheSharedContext cctx;
/** Future ID. */
private final long futId = SEQ.incrementAndGet();
/** Tx ID. */
private final GridCacheVersion txId;
/** Keys. */
private final Set<IgniteTxKey> keys;
/** Processed keys. */
@GridToStringInclude
private final Set<IgniteTxKey> processedKeys = new HashSet<>();
/** Processed nodes. */
private final Set<UUID> processedNodes = new HashSet<>();
/** Pending keys. */
@GridToStringInclude
private Map<UUID, Set<IgniteTxKey>> pendingKeys = new HashMap<>();
/** Nodes queue. */
@GridToStringInclude
private final UniqueDeque<UUID> nodesQueue = new UniqueDeque<>();
/** Preferred nodes. */
private final Set<UUID> preferredNodes = new HashSet<>();
/** Tx locked keys. */
private final Map<GridCacheVersion, Set<IgniteTxKey>> txLockedKeys = new HashMap<>();
/** Tx requested keys. */
private final Map<IgniteTxKey, Set<GridCacheVersion>> txRequestedKeys = new HashMap<>();
/** Wait-for-graph. */
private final Map<GridCacheVersion, Set<GridCacheVersion>> wfg = new HashMap<>();
/** Topology version. */
private final AffinityTopologyVersion topVer;
/** Transactions. */
private final Map<GridCacheVersion, T2<UUID, Long>> txs = new HashMap<>();
/** Current processing node ID. */
private UUID curNodeId;
/** Iterations count. */
private int itersCnt;
/** Timeout object. */
@GridToStringExclude
private DeadlockTimeoutObject timeoutObj;
/** Timed out flag. */
private volatile boolean timedOut;
/**
* @param cctx Context.
* @param txId Tx ID.
* @param topVer Transaction topology version.
* @param keys Keys.
*/
private TxDeadlockFuture(GridCacheSharedContext cctx,
GridCacheVersion txId,
AffinityTopologyVersion topVer,
Set<IgniteTxKey> keys) {
this.cctx = cctx;
this.txId = txId;
this.topVer = topVer;
this.keys = keys;
if (deadLockTimeout > 0) {
timeoutObj = new DeadlockTimeoutObject();
cctx.time().addTimeoutObject(timeoutObj);
}
}
/**
* @return Future ID.
*/
long futureId() {
return futId;
}
/**
* @param nodeId Node ID.
*/
public void onNodeLeft(UUID nodeId) {
if (compareAndSet(nodeId, null)) {
IgniteLogger log = cctx.logger(TxDeadlockDetection.class);
if (log.isDebugEnabled())
log.debug("Failed to finish deadlock detection, node left: " + nodeId);
onDone();
}
}
/** */
private void init() {
cctx.tm().addFuture(this);
if (topVer == null) // Tx manager already stopped
onDone();
else
map(keys, Collections.<IgniteTxKey, TxLockList>emptyMap());
}
/**
* @param keys Keys.
* @param txLocks Tx locks.
*/
private void map(@Nullable Set<IgniteTxKey> keys, Map<IgniteTxKey, TxLockList> txLocks) {
mapTxKeys(keys, txLocks);
UUID nodeId = nodesQueue.pollFirst();
boolean set = compareAndSet(null, nodeId);
assert set;
if (nodeId == null || itersCnt++ >= DEADLOCK_MAX_ITERS || timedOut)
onDone();
else {
final Set<IgniteTxKey> txKeys = pendingKeys.get(nodeId);
processedKeys.addAll(txKeys);
processedNodes.add(nodeId);
pendingKeys.remove(nodeId);
cctx.tm().txLocksInfo(nodeId, this, txKeys);
}
}
/**
* @param res Response.
*/
private void detect(TxLocksResponse res) {
assert res != null;
merge(res);
updateWaitForGraph(res.txLocks());
List<GridCacheVersion> cycle = findCycle(wfg, txId);
if (cycle != null)
onDone(new TxDeadlock(cycle, txs, txLockedKeys, txRequestedKeys));
else
map(res.keys(), res.txLocks());
}
/**
* Maps tx keys on nodes. Key can be mapped on some node if this node is primary for given key or
* node is near for transaction that holds or requests lock for key.
*
* Key will not be be mapped to node if both key and node are already handled.
*
* @param txKeys Tx keys.
* @param txLocks Tx locks.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
private void mapTxKeys(@Nullable Set<IgniteTxKey> txKeys, Map<IgniteTxKey, TxLockList> txLocks) {
for (Map.Entry<IgniteTxKey, TxLockList> e : txLocks.entrySet()) {
List<TxLock> locks = e.getValue().txLocks();
for (int i = 0; i < locks.size(); i++) {
TxLock txLock = locks.get(i);
UUID nearNodeId = txLock.nearNodeId();
IgniteTxKey txKey = e.getKey();
if (processedKeys.contains(txKey) && processedNodes.contains(nearNodeId))
continue;
if (txLock.requested()) {
UUID nodeId = primary(txKey);
// Process this node earlier than other in order to optimize amount of requests.
preferredNodes.add(nodeId);
Set<IgniteTxKey> mappedKeys = pendingKeys.get(nodeId);
if (mappedKeys == null)
pendingKeys.put(nodeId, mappedKeys = new HashSet<>());
mappedKeys.add(txKey);
}
else {
if (txLock.owner()) {
if (!preferredNodes.contains(nearNodeId))
nodesQueue.addFirst(nearNodeId);
}
else
nodesQueue.addLast(nearNodeId);
Set<IgniteTxKey> mappedKeys = pendingKeys.get(nearNodeId);
if (mappedKeys == null)
pendingKeys.put(nearNodeId, mappedKeys = new HashSet<>());
mappedKeys.add(txKey);
}
}
}
for (UUID nodeId : preferredNodes)
nodesQueue.addFirst(nodeId);
preferredNodes.clear();
if (txKeys != null) {
for (IgniteTxKey txKey : txKeys) {
UUID nodeId = primary(txKey);
if (processedKeys.contains(txKey) && processedNodes.contains(nodeId))
continue;
nodesQueue.addLast(nodeId);
Set<IgniteTxKey> mappedKeys = pendingKeys.get(nodeId);
if (mappedKeys == null)
pendingKeys.put(nodeId, mappedKeys = new HashSet<>());
mappedKeys.add(txKey);
}
}
}
/**
* @param txKey Tx key.
* @return Primary node ID.
*/
private UUID primary(IgniteTxKey txKey) {
GridCacheContext ctx = cctx.cacheContext(txKey.cacheId());
ClusterNode node = ctx.affinity().primaryByKey(txKey.key(), topVer);
assert node != null : topVer;
return node.id();
}
/**
* @param res Tx locks.
*/
private void merge(TxLocksResponse res) {
Map<IgniteTxKey, TxLockList> txLocks = res.txLocks();
if (txLocks == null || txLocks.isEmpty())
return;
for (Map.Entry<IgniteTxKey, TxLockList> e : txLocks.entrySet()) {
IgniteTxKey txKey = e.getKey();
TxLockList lockList = e.getValue();
if (lockList != null && !lockList.isEmpty()) {
for (TxLock lock : lockList.txLocks()) {
if (lock.owner() || lock.candiate()) {
if (txs.get(lock.txId()) == null)
txs.put(lock.txId(), new T2<>(lock.nearNodeId(), lock.threadId()));
}
if (lock.owner()) {
GridCacheVersion txId = lock.txId();
Set<IgniteTxKey> keys = txLockedKeys.get(txId);
if (keys == null)
txLockedKeys.put(txId, keys = new HashSet<>());
keys.add(txKey);
}
else if (lock.candiate()) {
Set<GridCacheVersion> txs = txRequestedKeys.get(txKey);
if (txs == null)
txRequestedKeys.put(txKey, txs = new HashSet<>());
txs.add(lock.txId());
}
}
}
}
}
/**
* @param txLocks Tx locks.
*/
private void updateWaitForGraph(Map<IgniteTxKey, TxLockList> txLocks) {
if (txLocks == null || txLocks.isEmpty())
return;
for (Map.Entry<IgniteTxKey, TxLockList> e : txLocks.entrySet()) {
GridCacheVersion txOwner = null;
for (TxLock lock : e.getValue().txLocks()) {
if (lock.owner() && txOwner == null) {
// Actually we can get lock list with more than one owner. In this case ignore all owners
// except first because likely the first owner was cause of deadlock.
txOwner = lock.txId();
if (keys.contains(e.getKey()) && !txId.equals(lock.txId())) {
Set<GridCacheVersion> waitingTxs = wfg.get(txId);
if (waitingTxs == null)
wfg.put(txId, waitingTxs = new HashSet<>());
waitingTxs.add(lock.txId());
}
continue;
}
if (lock.candiate() || lock.owner()) {
GridCacheVersion txId0 = lock.txId();
Set<GridCacheVersion> waitForTxs = wfg.get(txId0);
if (waitForTxs == null)
wfg.put(txId0, waitForTxs = new HashSet<>());
waitForTxs.add(txOwner);
}
}
}
}
/**
* @param res Response.
*/
public void onResult(UUID nodeId, TxLocksResponse res) {
boolean set = compareAndSet(nodeId, null);
if (res != null && set) {
if (res.classError() != null) {
IgniteLogger log = cctx.kernalContext().log(this.getClass());
U.warn(log, "Failed to finish deadlock detection due to an error: " + nodeId);
onDone();
}
else
detect(res);
}
else
onDone();
}
/**
* @param exp Expected.
* @param val Value.
*/
private boolean compareAndSet(UUID exp, UUID val) {
synchronized (this) {
if (Objects.equals(curNodeId, exp)) {
curNodeId = val;
return true;
}
}
return false;
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable TxDeadlock res, @Nullable Throwable err) {
if (super.onDone(res, err)) {
cctx.tm().removeFuture(futId);
if (timeoutObj != null)
cctx.time().removeTimeoutObject(timeoutObj);
return true;
}
return false;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TxDeadlockFuture.class, this);
}
/**
* Lock request timeout object.
*/
private class DeadlockTimeoutObject extends GridTimeoutObjectAdapter {
/**
* Default constructor.
*/
DeadlockTimeoutObject() {
super(deadLockTimeout);
}
/** {@inheritDoc} */
@Override public void onTimeout() {
timedOut = true;
IgniteLogger log = cctx.kernalContext().log(this.getClass());
U.warn(log, "Deadlock detection was timed out [timeout=" + deadLockTimeout + ", fut=" + this + ']');
onDone();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DeadlockTimeoutObject.class, this);
}
}
}
/**
* Deque with Set semantic.
* Only overridden methods can be used.
*/
private static class UniqueDeque<E> extends ArrayDeque<E> {
/** Serial version UID. */
private static final long serialVersionUID = 0L;
/** Items. */
private final Set<E> items = new HashSet<>();
/** {@inheritDoc} */
@Override public void addFirst(E e) {
boolean contains, first = false;
if ((contains = items.contains(e)) && !(first = getFirst().equals(e)))
remove(e);
if (!contains)
items.add(e);
if (!first)
super.addFirst(e);
}
/** {@inheritDoc} */
@Override public void addLast(E e) {
if (!items.contains(e)) {
super.addLast(e);
items.add(e);
}
}
/** {@inheritDoc} */
@Override public E pollFirst() {
E e = super.pollFirst();
items.remove(e);
return e;
}
}
}