blob: 880a4cc71bb7d20311b2d84a9468b8034e7bfe08 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxAbstractEnlistFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import static java.util.Collections.singleton;
import static org.apache.ignite.internal.GridTopic.TOPIC_DEADLOCK_DETECTION;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.belongToSameTx;
/**
* Component participating in deadlock detection in a cluster. The detection process is collaborative: it is performed
* by relaying special probe messages from a waiting transaction to its blocker.
* <p>
* The ideas behind the detection algorithm are borrowed from the Chandy-Misra-Haas deadlock detection algorithm for
* the resource model.
* <p>
* The current implementation assumes that transactions obey two-phase locking (2PL).
*/
public class DeadlockDetectionManager extends GridCacheSharedManagerAdapter {
    /**
     * Delay before a deadlock detection is started for a wait-for edge. Taken from the deadlock timeout of the
     * transaction configuration on start; a non-positive value disables delayed detection
     * (see {@link #initDelayedComputation(MvccVersion, MvccVersion)}).
     */
    private long detectionStartDelay;

    /** {@inheritDoc} */
    @Override protected void start0() throws IgniteCheckedException {
        detectionStartDelay = cctx.kernalContext().config().getTransactionConfiguration().getDeadlockTimeout();

        cctx.gridIO().addMessageListener(TOPIC_DEADLOCK_DETECTION, (nodeId, msg, plc) -> {
            if (msg instanceof DeadlockProbe) {
                if (log.isDebugEnabled())
                    log.debug("Received a probe message [msg=" + msg + ']');

                handleDeadlockProbe((DeadlockProbe)msg);
            }
            else
                log.warning("Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
        });
    }

    /**
     * Starts a deadlock detection after a delay.
     *
     * @param waiterVer Version of the waiting transaction.
     * @param blockerVer Version of the waited for transaction.
     * @return Cancellable computation (already registered with the timeout processor), or {@code null} if delayed
     *      detection is disabled because the configured deadlock timeout is not positive.
     */
    public DelayedDeadlockComputation initDelayedComputation(MvccVersion waiterVer, MvccVersion blockerVer) {
        if (detectionStartDelay <= 0)
            return null;

        return new DelayedDeadlockComputation(waiterVer, blockerVer, detectionStartDelay);
    }

    /**
     * Starts a deadlock detection for a given pair of transaction versions (wait-for edge).
     * <p>
     * If either transaction is no longer found among active local transactions, nothing is sent.
     *
     * @param waiterVer Version of the waiting transaction.
     * @param blockerVer Version of the waited for transaction.
     */
    private void startComputation(MvccVersion waiterVer, MvccVersion blockerVer) {
        if (log.isDebugEnabled())
            log.debug("Starting deadlock detection [waiterVer=" + waiterVer + ", blockerVer=" + blockerVer + ']');

        Optional<GridDhtTxLocalAdapter> waitingTx = findTx(waiterVer);

        Optional<GridDhtTxLocalAdapter> blockerTx = findTx(blockerVer);

        if (waitingTx.isPresent() && blockerTx.isPresent()) {
            GridDhtTxLocalAdapter wTx = waitingTx.get();

            GridDhtTxLocalAdapter bTx = blockerTx.get();

            sendProbe(
                bTx.eventNodeId(),
                wTx.xidVersion(),
                // real start time will be filled later when corresponding near node is visited
                singleton(new ProbedTx(wTx.nodeId(), wTx.xidVersion(), wTx.nearXidVersion(), -1, wTx.lockCounter())),
                new ProbedTx(bTx.nodeId(), bTx.xidVersion(), bTx.nearXidVersion(), -1, bTx.lockCounter()),
                true);
        }
    }

    /**
     * Finds an active local transaction with an MVCC snapshot belonging to the same transaction as the given version.
     * <p>
     * NOTE(review): the cast assumes every matching local transaction is a {@link GridDhtTxLocalAdapter}; a
     * different implementation would cause a {@link ClassCastException} here.
     *
     * @param mvccVer MVCC version to look up.
     * @return Matching local transaction, or empty if none is active.
     */
    private Optional<GridDhtTxLocalAdapter> findTx(MvccVersion mvccVer) {
        return cctx.tm().activeTransactions().stream()
            .filter(tx -> tx.local() && tx.mvccSnapshot() != null)
            .filter(tx -> belongToSameTx(mvccVer, tx.mvccSnapshot()))
            .map(GridDhtTxLocalAdapter.class::cast)
            .findAny();
    }

    /**
     * Handles received deadlock probe. Possible outcomes:
     * <ol>
     * <li>Deadlock is found.</li>
     * <li>Probe is relayed to other blocking transactions.</li>
     * <li>Probe is discarded because receiving transaction is not blocked.</li>
     * </ol>
     *
     * @param probe Received probe message.
     */
    private void handleDeadlockProbe(DeadlockProbe probe) {
        if (probe.nearCheck())
            handleDeadlockProbeForNear(probe);
        else
            handleDeadlockProbeForDht(probe);
    }

    /**
     * Handles a probe addressed to the near transaction of the blocker. The probe is forwarded to every node the
     * near transaction's enlist future still awaits responses from, with the blocker's real start time filled in.
     *
     * @param probe Received probe message.
     */
    private void handleDeadlockProbeForNear(DeadlockProbe probe) {
        // a probe is simply discarded if next wait-for edge is not found
        ProbedTx blocker = probe.blocker();

        GridNearTxLocal nearTx = cctx.tm().tx(blocker.nearXidVersion());

        if (nearTx == null)
            return;

        // probe each blocker
        for (UUID pendingNodeId : getPendingResponseNodes(nearTx)) {
            sendProbe(
                pendingNodeId,
                probe.initiatorVersion(),
                probe.waitChain(),
                // real start time is filled here
                blocker.withStartTime(nearTx.startTime()),
                false);
        }
    }

    /**
     * Handles a probe addressed to a local (DHT side) transaction of the blocker. If that transaction already appears
     * in the wait chain, a deadlock is found and resolved; otherwise the probe is relayed further if the transaction
     * is itself waiting.
     *
     * @param probe Received probe message.
     */
    private void handleDeadlockProbeForDht(DeadlockProbe probe) {
        // a probe is simply discarded if next wait-for edge is not found
        cctx.tm().activeTransactions().stream()
            .filter(IgniteInternalTx::local)
            .filter(tx -> tx.nearXidVersion().equals(probe.blocker().nearXidVersion()))
            .findAny()
            .map(GridDhtTxLocalAdapter.class::cast)
            .ifPresent(tx -> {
                // search for locally checked tx (identified as blocker previously) in the wait chain
                Optional<ProbedTx> repeatedTx = probe.waitChain().stream()
                    .filter(wTx -> wTx.xidVersion().equals(tx.xidVersion()))
                    .findAny();

                if (repeatedTx.isPresent()) {
                    // a deadlock found
                    resolveDeadlock(probe, repeatedTx.get(), tx);
                }
                else
                    relayProbeIfLocalTxIsWaiting(probe, tx);
            });
    }

    /**
     * Resolves a detected deadlock: chooses a victim and either aborts the local transaction (if it is the victim)
     * or sends a probe to the victim's node so that it can identify itself as the victim.
     *
     * @param probe Probe which revealed the deadlock.
     * @param repeatedTx Wait chain entry for the local transaction (found again in the chain).
     * @param locTx Local transaction which closes the cycle.
     */
    private void resolveDeadlock(DeadlockProbe probe, ProbedTx repeatedTx, GridDhtTxLocalAdapter locTx) {
        if (log.isDebugEnabled())
            log.debug("Deadlock detected [probe=" + probe + ']');

        ProbedTx victim = chooseVictim(
            // real start time is filled here for repeated tx
            repeatedTx.withStartTime(probe.blocker().startTime()),
            probe.waitChain());

        if (victim.xidVersion().equals(locTx.xidVersion())) {
            if (log.isDebugEnabled())
                log.debug("Chosen victim is on local node, tx will be aborted [victim=" + victim + ']');

            // if a victim tx has made a progress since it was identified as waiting
            // it means that detected deadlock was broken by other means (e.g. timeout of another tx)
            if (victim.lockCounter() == locTx.lockCounter())
                abortTx(locTx);
            else if (log.isDebugEnabled())
                log.debug("Victim tx lock counter changed since it was probed, tx will not be aborted [victim=" +
                    victim + ']');
        }
        else {
            if (log.isDebugEnabled())
                log.debug("Chosen victim is on remote node, message will be sent [victim=" + victim + ']');

            // destination node must determine itself as a victim
            sendProbe(victim.nodeId(), probe.initiatorVersion(), singleton(victim), victim, false);
        }
    }

    /**
     * If the local transaction is waiting for another local transaction, extends the wait chain with the local
     * transaction and relays the probe towards the next blocker's near node. Otherwise the probe is discarded.
     *
     * @param probe Received probe message.
     * @param locTx Local transaction identified as the probe's blocker.
     */
    private void relayProbeIfLocalTxIsWaiting(DeadlockProbe probe, GridDhtTxLocalAdapter locTx) {
        assert locTx.mvccSnapshot() != null;

        cctx.coordinators().checkWaiting(locTx.mvccSnapshot())
            .flatMap(this::findTx)
            .ifPresent(nextBlocker -> {
                Collection<ProbedTx> waitChain = new ArrayList<>(probe.waitChain().size() + 1);

                waitChain.addAll(probe.waitChain());

                // real start time is filled here
                waitChain.add(new ProbedTx(locTx.nodeId(), locTx.xidVersion(), locTx.nearXidVersion(),
                    probe.blocker().startTime(), locTx.lockCounter()));

                // real start time will be filled later when corresponding near node is visited
                ProbedTx nextProbedTx = new ProbedTx(nextBlocker.nodeId(), nextBlocker.xidVersion(),
                    nextBlocker.nearXidVersion(), -1, nextBlocker.lockCounter());

                sendProbe(
                    nextBlocker.eventNodeId(),
                    probe.initiatorVersion(),
                    waitChain,
                    nextProbedTx,
                    true);
            });
    }

    /**
     * Chooses victim basing on tx start time. Algorithm chooses victim in such way that every site detected a deadlock
     * will choose the same victim. As a result only one tx participating in a deadlock will be aborted.
     * <p>
     * The victim is the tx with the greatest start time (the youngest); ties are broken by the greatest near xid
     * version. Only entries after the local tx in the wait chain (plus the local tx itself) are considered.
     * <p>
     * Local tx is needed here because start time for it might not be filled yet for corresponding entry in wait chain.
     *
     * @param locTx Deadlocked tx on local node.
     * @param waitChain Wait chain.
     * @return Tx chosen as a victim.
     */
    private ProbedTx chooseVictim(ProbedTx locTx, Collection<ProbedTx> waitChain) {
        Iterator<ProbedTx> it = waitChain.iterator();

        // skip until local tx (inclusive), because txs before are not deadlocked
        while (it.hasNext()) {
            if (it.next().xidVersion().equals(locTx.xidVersion()))
                break;
        }

        ProbedTx victim = locTx;

        long maxStartTime = locTx.startTime();

        while (it.hasNext()) {
            ProbedTx tx = it.next();

            // seek for youngest tx in order to guarantee forward progress
            if (tx.startTime() > maxStartTime) {
                maxStartTime = tx.startTime();

                victim = tx;
            }
            // tie-breaking
            else if (tx.startTime() == maxStartTime && tx.nearXidVersion().compareTo(victim.nearXidVersion()) > 0)
                victim = tx;
        }

        return victim;
    }

    /**
     * Fails the waiting MVCC operation of the given transaction with a rollback exception.
     *
     * @param tx Transaction chosen as a deadlock victim.
     */
    private void abortTx(GridDhtTxLocalAdapter tx) {
        cctx.coordinators().failWaiter(tx.mvccSnapshot(), new IgniteTxRollbackCheckedException(
            "Deadlock detected. Transaction will be rolled back [tx=" + tx + ']'));
    }

    /**
     * Collects nodes from which the near transaction's current lock future still awaits responses.
     *
     * @param tx Near transaction.
     * @return Pending response nodes, or an empty set if the lock future is not an enlist future.
     */
    private Set<UUID> getPendingResponseNodes(GridNearTxLocal tx) {
        IgniteInternalFuture<?> lockFut = tx.lockFuture();

        if (lockFut instanceof GridNearTxAbstractEnlistFuture)
            return ((GridNearTxAbstractEnlistFuture<?>)lockFut).pendingResponseNodes();

        return Collections.emptySet();
    }

    /**
     * Sends a deadlock probe to the given node. Sending is best-effort: failures are logged, not rethrown.
     *
     * @param destNodeId Destination node ID.
     * @param initiatorVer Version of the transaction which initiated the detection.
     * @param waitChain Wait chain collected so far.
     * @param blocker Transaction blocking the last transaction of the wait chain.
     * @param near Whether the probe is addressed to the blocker's near transaction.
     */
    private void sendProbe(UUID destNodeId, GridCacheVersion initiatorVer, Collection<ProbedTx> waitChain,
        ProbedTx blocker, boolean near) {
        DeadlockProbe probe = new DeadlockProbe(initiatorVer, waitChain, blocker, near);

        if (log.isDebugEnabled())
            log.debug("Sending probe [probe=" + probe + ", destNode=" + destNodeId + ']');

        try {
            cctx.gridIO().sendToGridTopic(destNodeId, TOPIC_DEADLOCK_DETECTION, probe, SYSTEM_POOL);
        }
        catch (ClusterTopologyCheckedException ignored) {
            // Presumably the destination node is no longer in topology; detection is best-effort,
            // so the probe is intentionally dropped.
            if (log.isDebugEnabled())
                log.debug("Failed to send a deadlock probe, node is not in topology [nodeId=" + destNodeId + ']');
        }
        catch (IgniteCheckedException e) {
            log.warning("Failed to send a deadlock probe [nodeId=" + destNodeId + ']', e);
        }
    }

    /**
     * Delayed deadlock probe computation which can be cancelled.
     */
    public class DelayedDeadlockComputation extends GridTimeoutObjectAdapter {
        /** Version of the waiting transaction. */
        private final MvccVersion waiterVer;

        /** Version of the waited for transaction. */
        private final MvccVersion blockerVer;

        /** {@inheritDoc} */
        @Override public void onTimeout() {
            startComputation(waiterVer, blockerVer);
        }

        /**
         * Creates the computation and immediately registers it with the timeout processor.
         *
         * @param waiterVer Version of the waiting transaction.
         * @param blockerVer Version of the waited for transaction.
         * @param timeout Delay before the detection is started.
         */
        private DelayedDeadlockComputation(MvccVersion waiterVer, MvccVersion blockerVer, long timeout) {
            super(timeout);

            this.waiterVer = waiterVer;
            this.blockerVer = blockerVer;

            cctx.kernalContext().timeout().addTimeoutObject(this);
        }

        /**
         * Removes this computation from the timeout processor.
         */
        public void cancel() {
            cctx.kernalContext().timeout().removeTimeoutObject(this);
        }
    }
}