/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareFutureAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.EnlistOperation;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFutureCancelledException;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH;
import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
import static org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import static org.apache.ignite.internal.processors.tracing.SpanType.TX_NEAR_FINISH_REQ;
import static org.apache.ignite.internal.processors.tracing.SpanType.TX_NEAR_FINISH_RESP;
import static org.apache.ignite.internal.processors.tracing.SpanType.TX_NEAR_PREPARE_REQ;
import static org.apache.ignite.internal.processors.tracing.SpanType.TX_NEAR_PREPARE_RESP;
import static org.apache.ignite.internal.processors.tracing.SpanType.TX_PROCESS_DHT_FINISH_REQ;
import static org.apache.ignite.internal.processors.tracing.SpanType.TX_PROCESS_DHT_FINISH_RESP;
import static org.apache.ignite.internal.processors.tracing.SpanType.TX_PROCESS_DHT_ONE_PHASE_COMMIT_ACK_REQ;
import static org.apache.ignite.internal.processors.tracing.SpanType.TX_PROCESS_DHT_PREPARE_REQ;
import static org.apache.ignite.internal.processors.tracing.SpanType.TX_PROCESS_DHT_PREPARE_RESP;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
/**
* Isolated logic to process cache messages.
*/
public class IgniteTxHandler {
/** Logger. */
private IgniteLogger log;
/** */
private final IgniteLogger txPrepareMsgLog;
/** */
private final IgniteLogger txFinishMsgLog;
/** */
private final IgniteLogger txRecoveryMsgLog;
/** Shared cache context. */
private GridCacheSharedContext<?, ?> ctx;
/**
* @param nearNodeId Sender node ID.
* @param req Request.
*/
private void processNearTxPrepareRequest(UUID nearNodeId, GridNearTxPrepareRequest req) {
try (TraceSurroundings ignored =
MTC.support(ctx.kernalContext().tracing().create(TX_NEAR_PREPARE_REQ, MTC.span()))) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() +
", node=" + nearNodeId + ']');
}
ClusterNode nearNode = ctx.node(nearNodeId);
if (nearNode == null) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received near prepare from node that left grid (will ignore) [" +
"txId=" + req.version() +
", node=" + nearNodeId + ']');
}
return;
}
processNearTxPrepareRequest0(nearNode, req);
}
}
/**
* @param nearNode Sender node.
* @param req Request.
*/
private IgniteInternalFuture<GridNearTxPrepareResponse> processNearTxPrepareRequest0(
ClusterNode nearNode,
GridNearTxPrepareRequest req
) {
IgniteInternalFuture<GridNearTxPrepareResponse> fut;
if (req.firstClientRequest() && req.allowWaitTopologyFuture()) {
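// First request from a client node: prepareNearTx() returns null while the topology
// future is not ready, so wait for the exchange and retry until the tx is prepared.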
for (;;) {
if (waitForExchangeFuture(nearNode, req))
return new GridFinishedFuture<>();
fut = prepareNearTx(nearNode, req);
if (fut != null)
break;
}
}
else
fut = prepareNearTx(nearNode, req);
assert req.txState() != null || fut == null || fut.error() != null ||
(ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null);
return fut;
}
/**
* @param ctx Shared cache context.
*/
public IgniteTxHandler(GridCacheSharedContext ctx) {
this.ctx = ctx;
log = ctx.logger(IgniteTxHandler.class);
txRecoveryMsgLog = ctx.logger(CU.TX_MSG_RECOVERY_LOG_CATEGORY);
txPrepareMsgLog = ctx.logger(CU.TX_MSG_PREPARE_LOG_CATEGORY);
txFinishMsgLog = ctx.logger(CU.TX_MSG_FINISH_LOG_CATEGORY);
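// Register handlers for near/dht prepare, finish, one-phase commit ack and recovery messages.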
ctx.io().addCacheHandler(0, GridNearTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processNearTxPrepareRequest(nodeId, (GridNearTxPrepareRequest)msg);
}
});
ctx.io().addCacheHandler(0, GridNearTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processNearTxPrepareResponse(nodeId, (GridNearTxPrepareResponse)msg);
}
});
ctx.io().addCacheHandler(0, GridNearTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processNearTxFinishRequest(nodeId, (GridNearTxFinishRequest)msg);
}
});
ctx.io().addCacheHandler(0, GridNearTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processNearTxFinishResponse(nodeId, (GridNearTxFinishResponse)msg);
}
});
ctx.io().addCacheHandler(0, GridDhtTxPrepareRequest.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxPrepareRequest(nodeId, (GridDhtTxPrepareRequest)msg);
}
});
ctx.io().addCacheHandler(0, GridDhtTxPrepareResponse.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxPrepareResponse(nodeId, (GridDhtTxPrepareResponse)msg);
}
});
ctx.io().addCacheHandler(0, GridDhtTxFinishRequest.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxFinishRequest(nodeId, (GridDhtTxFinishRequest)msg);
}
});
ctx.io().addCacheHandler(0, GridDhtTxOnePhaseCommitAckRequest.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxOnePhaseCommitAckRequest(nodeId, (GridDhtTxOnePhaseCommitAckRequest)msg);
}
});
ctx.io().addCacheHandler(0, GridDhtTxFinishResponse.class, new CI2<UUID, GridCacheMessage>() {
@Override public void apply(UUID nodeId, GridCacheMessage msg) {
processDhtTxFinishResponse(nodeId, (GridDhtTxFinishResponse)msg);
}
});
ctx.io().addCacheHandler(0, GridCacheTxRecoveryRequest.class,
new CI2<UUID, GridCacheTxRecoveryRequest>() {
@Override public void apply(UUID nodeId, GridCacheTxRecoveryRequest req) {
processCheckPreparedTxRequest(nodeId, req);
}
});
ctx.io().addCacheHandler(0, GridCacheTxRecoveryResponse.class,
new CI2<UUID, GridCacheTxRecoveryResponse>() {
@Override public void apply(UUID nodeId, GridCacheTxRecoveryResponse res) {
processCheckPreparedTxResponse(nodeId, res);
}
});
}
/**
* Prepares local colocated tx.
*
* @param locTx Local transaction.
* @param req Near prepare request.
* @return Prepare future.
*/
public IgniteInternalFuture<GridNearTxPrepareResponse> prepareColocatedTx(
final GridNearTxLocal locTx,
final GridNearTxPrepareRequest req
) {
req.txState(locTx.txState());
IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(req);
return fut.chain(new C1<IgniteInternalFuture<GridNearTxPrepareResponse>, GridNearTxPrepareResponse>() {
@Override public GridNearTxPrepareResponse apply(IgniteInternalFuture<GridNearTxPrepareResponse> f) {
try {
return f.get();
}
catch (Exception e) {
locTx.setRollbackOnly(); // Just in case.
if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
!X.hasCause(e, IgniteFutureCancelledException.class))
U.error(log, "Failed to prepare DHT transaction: " + locTx, e);
return new GridNearTxPrepareResponse(
req.partition(),
req.version(),
req.futureId(),
req.miniId(),
req.version(),
req.version(),
null,
e,
null,
req.onePhaseCommit(),
req.deployInfo() != null);
}
}
});
}
/**
* @param entries Entries to unmarshal.
* @return First unmarshalled entry, or {@code null} if {@code entries} is {@code null}.
* @throws IgniteCheckedException If failed.
*/
private IgniteTxEntry unmarshal(@Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException {
if (entries == null)
return null;
IgniteTxEntry firstEntry = null;
for (IgniteTxEntry e : entries) {
e.unmarshal(ctx, false, ctx.deploy().globalLoader());
if (firstEntry == null)
firstEntry = e;
}
return firstEntry;
}
/**
* @param originTx Original transaction to copy.
* @param req Request.
* @return Prepare future.
*/
public IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTxLocal(
final GridNearTxLocal originTx,
final GridNearTxPrepareRequest req) {
// Make sure not to provide Near entries to DHT cache.
req.cloneEntries();
return prepareNearTx(originTx, ctx.localNode(), req);
}
/**
* @param nearNode Node that initiated transaction.
* @param req Near prepare request.
* @return Prepare future, or {@code null} if the operation needs to be retried.
*/
@Nullable private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
final ClusterNode nearNode,
final GridNearTxPrepareRequest req
) {
return prepareNearTx(null, nearNode, req);
}
/**
* @param originTx Original transaction to copy.
* @param nearNode Node that initiated transaction.
* @param req Near prepare request.
* @return Prepare future, or {@code null} if the operation needs to be retried.
*/
@Nullable private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx(
final GridNearTxLocal originTx,
final ClusterNode nearNode,
final GridNearTxPrepareRequest req
) {
IgniteTxEntry firstEntry;
try {
IgniteTxEntry firstWrite = unmarshal(req.writes());
IgniteTxEntry firstRead = unmarshal(req.reads());
firstEntry = firstWrite != null ? firstWrite : firstRead;
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
GridDhtTxLocal tx = null;
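// A non-null mapped version means the near version was already mapped to an existing
// DHT transaction (e.g. during lock acquisition for pessimistic transactions).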
GridCacheVersion mappedVer = ctx.tm().mappedVersion(req.version());
if (mappedVer != null) {
tx = ctx.tm().tx(mappedVer);
if (tx == null)
U.warn(log, "Missing local transaction for mapped near version [nearVer=" + req.version()
+ ", mappedVer=" + mappedVer + ']');
else {
if (req.concurrency() == PESSIMISTIC)
tx.nearFutureId(req.futureId());
}
}
else {
GridDhtPartitionTopology top = null;
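// First request from a client node: take the topology read lock so that the
// remap check and the DHT transaction creation see a consistent topology.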
if (req.firstClientRequest()) {
assert firstEntry != null : req;
assert req.concurrency() == OPTIMISTIC : req;
assert nearNode.isClient() : nearNode;
top = firstEntry.context().topology();
top.readLock();
if (req.allowWaitTopologyFuture()) {
GridDhtTopologyFuture topFut = top.topologyVersionFuture();
if (!topFut.isDone()) {
top.readUnlock();
return null;
}
}
}
try {
if (top != null) {
boolean retry = false;
GridDhtTopologyFuture topFut = top.topologyVersionFuture();
if (!req.allowWaitTopologyFuture() && !topFut.isDone()) {
retry = true;
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Topology change is in progress, need remap transaction [" +
"txId=" + req.version() +
", node=" + nearNode.id() +
", reqTopVer=" + req.topologyVersion() +
", locTopVer=" + top.readyTopologyVersion() +
", req=" + req + ']');
}
}
if (!retry && needRemap(req.topologyVersion(), top.readyTopologyVersion(), req)) {
retry = true;
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Topology version mismatch for near prepare, need remap transaction [" +
"txId=" + req.version() +
", node=" + nearNode.id() +
", reqTopVer=" + req.topologyVersion() +
", locTopVer=" + top.readyTopologyVersion() +
", req=" + req + ']');
}
}
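// Remap is required: reply with the last topology change version so the near
// node can retry the prepare on the newer topology.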
if (retry) {
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
req.partition(),
req.version(),
req.futureId(),
req.miniId(),
req.version(),
req.version(),
null,
null,
top.lastTopologyChangeVersion(),
req.onePhaseCommit(),
req.deployInfo() != null);
try {
ctx.io().send(nearNode, res, req.policy());
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() +
", node=" + nearNode.id() + ']');
}
}
catch (ClusterTopologyCheckedException ignored) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Failed to send remap response for near prepare, node failed [" +
"txId=" + req.version() +
", node=" + nearNode.id() + ']');
}
}
catch (IgniteCheckedException e) {
U.error(txPrepareMsgLog, "Failed to send remap response for near prepare " +
"[txId=" + req.version() +
", node=" + nearNode.id() +
", req=" + req + ']', e);
}
return new GridFinishedFuture<>(res);
}
assert topFut.isDone();
}
tx = new GridDhtTxLocal(
ctx,
req.topologyVersion(),
nearNode.id(),
req.version(),
req.futureId(),
req.miniId(),
req.threadId(),
req.implicitSingle(),
req.implicitSingle(),
req.system(),
req.explicitLock(),
req.policy(),
req.concurrency(),
req.isolation(),
req.timeout(),
req.isInvalidate(),
true,
req.onePhaseCommit(),
req.txSize(),
req.transactionNodes(),
securitySubjectId(ctx),
req.taskNameHash(),
req.txLabel(),
originTx
);
tx = ctx.tm().onCreated(null, tx);
if (tx != null)
tx.topologyVersion(req.topologyVersion());
else
U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
req.version() + ", req=" + req + ']');
}
finally {
if (tx != null)
req.txState(tx.txState());
if (top != null)
top.readUnlock();
}
}
if (tx != null) {
req.txState(tx.txState());
if (req.explicitLock())
tx.explicitLock(true);
tx.transactionNodes(req.transactionNodes());
if (req.near())
tx.nearOnOriginatingNode(true);
if (req.onePhaseCommit()) {
assert req.last() : req;
tx.onePhaseCommit(true);
}
if (req.needReturnValue())
tx.needReturnValue(true);
IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(req);
if (tx.isRollbackOnly() && !tx.commitOnPrepare()) {
if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)
tx.rollbackDhtLocalAsync();
}
final GridDhtTxLocal tx0 = tx;
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> txFut) {
try {
txFut.get();
}
catch (IgniteCheckedException e) {
tx0.setRollbackOnly(); // Just in case.
if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
!X.hasCause(e, IgniteFutureCancelledException.class) && !ctx.kernalContext().isStopping())
U.error(log, "Failed to prepare DHT transaction: " + tx0, e);
}
}
});
return fut;
}
else
return new GridFinishedFuture<>((GridNearTxPrepareResponse)null);
}
/**
* @param node Sender node.
* @param req Request.
* @return {@code True} if the request will be retried from a future listener or the topology version future timed out.
*/
private boolean waitForExchangeFuture(final ClusterNode node, final GridNearTxPrepareRequest req) {
assert req.firstClientRequest() : req;
GridDhtTopologyFuture topFut = ctx.exchange().lastTopologyFuture();
if (!topFut.isDone()) {
Thread curThread = Thread.currentThread();
if (curThread instanceof IgniteThread) {
final IgniteThread thread = (IgniteThread)curThread;
if (thread.cachePoolThread()) {
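// Never block a cache pool thread on the exchange future: wait asynchronously
// and re-submit the request with the same thread policy once the topology is ready.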
ctx.time().waitAsync(topFut, req.timeout(), (e, timedOut) -> {
if (e != null || timedOut) {
sendResponseOnTimeoutOrError(e, topFut, node, req);
return;
}
ctx.kernalContext().closure().runLocalWithThreadPolicy(thread, () -> {
try {
processNearTxPrepareRequest0(node, req);
}
finally {
ctx.io().onMessageProcessed(req);
}
});
}
);
return true;
}
}
try {
if (req.timeout() > 0)
topFut.get(req.timeout());
else
topFut.get();
}
catch (IgniteFutureTimeoutCheckedException e) {
sendResponseOnTimeoutOrError(null, topFut, node, req);
return true;
}
catch (IgniteCheckedException e) {
U.error(log, "Topology future failed: " + e, e);
}
}
return false;
}
/**
* @param e Exception, or {@code null} if timed out.
* @param topFut Topology future.
* @param node Node.
* @param req Prepare request.
*/
private void sendResponseOnTimeoutOrError(@Nullable IgniteCheckedException e,
GridDhtTopologyFuture topFut,
ClusterNode node,
GridNearTxPrepareRequest req) {
if (e == null)
e = new IgniteTxTimeoutCheckedException("Failed to wait for topology version during near prepare " +
"[txId=" + req.version() +
", topVer=" + topFut.initialVersion() +
", node=" + node.id() +
", req=" + req + ']');
GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
req.partition(),
req.version(),
req.futureId(),
req.miniId(),
req.version(),
req.version(),
null,
e,
null,
req.onePhaseCommit(),
req.deployInfo() != null);
try {
ctx.io().send(node.id(), res, req.policy());
}
catch (IgniteCheckedException e0) {
U.error(txPrepareMsgLog, "Failed to send wait topology version response for near prepare " +
"[txId=" + req.version() +
", topVer=" + topFut.initialVersion() +
", node=" + node.id() +
", req=" + req + ']', e0);
}
}
/**
* @param expVer Expected topology version.
* @param curVer Current topology version.
* @param req Request.
* @return {@code True} if cache affinity changed and request should be remapped.
*/
private boolean needRemap(AffinityTopologyVersion expVer,
AffinityTopologyVersion curVer,
GridNearTxPrepareRequest req) {
if (curVer.equals(expVer))
return false;
AffinityTopologyVersion lastAffChangedTopVer = ctx.exchange().lastAffinityChangedTopologyVersion(expVer);
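// Affinity could not have changed if the current version is within
// [lastAffChangedTopVer, expVer]; no remap is needed in that window.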
if (curVer.compareTo(expVer) <= 0 && curVer.compareTo(lastAffChangedTopVer) >= 0)
return false;
// TODO IGNITE-6754 check mvcc crd for mvcc enabled txs.
for (IgniteTxEntry e : F.concat(false, req.reads(), req.writes())) {
GridCacheContext ctx = e.context();
Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer);
Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer);
if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
return true;
try {
List<List<ClusterNode>> aff1 = ctx.affinity().assignments(expVer);
List<List<ClusterNode>> aff2 = ctx.affinity().assignments(curVer);
if (!aff1.equals(aff2))
return true;
}
catch (IllegalStateException ignored) {
return true;
}
}
return false;
}
/**
* @param nodeId Node ID.
* @param res Response.
*/
private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) {
try (TraceSurroundings ignored =
MTC.support(ctx.kernalContext().tracing().create(TX_NEAR_PREPARE_RESP, MTC.span()))) {
if (txPrepareMsgLog.isDebugEnabled())
txPrepareMsgLog.debug("Received near prepare response [txId=" + res.version() + ", node=" +
nodeId + ']');
GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc()
.<IgniteInternalTx>versionedFuture(res.version(), res.futureId());
if (fut == null) {
U.warn(log, "Failed to find future for near prepare response [txId=" + res.version() +
", node=" + nodeId +
", res=" + res + ']');
return;
}
IgniteInternalTx tx = fut.tx();
assert tx != null;
res.txState(tx.txState());
fut.onResult(nodeId, res);
}
}
/**
* @param nodeId Node ID.
* @param res Response.
*/
private void processNearTxFinishResponse(UUID nodeId, GridNearTxFinishResponse res) {
try (TraceSurroundings ignored =
MTC.support(ctx.kernalContext().tracing().create(TX_NEAR_FINISH_RESP, MTC.span()))) {
if (txFinishMsgLog.isDebugEnabled())
txFinishMsgLog.debug("Received near finish response [txId=" + res.xid() + ", node=" + nodeId + ']');
GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
if (fut == null) {
if (txFinishMsgLog.isDebugEnabled()) {
txFinishMsgLog.debug("Failed to find future for near finish response [txId=" + res.xid() +
", node=" + nodeId +
", res=" + res + ']');
}
return;
}
fut.onResult(nodeId, res);
}
}
/**
* @param nodeId Node ID.
* @param res Response.
*/
private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse res) {
try (TraceSurroundings ignored =
MTC.support(ctx.kernalContext().tracing().create(TX_PROCESS_DHT_PREPARE_RESP, MTC.span()))) {
GridDhtTxPrepareFuture fut =
(GridDhtTxPrepareFuture)ctx.mvcc().versionedFuture(res.version(), res.futureId());
if (fut == null) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Failed to find future for dht prepare response [txId=null" +
", dhtTxId=" + res.version() +
", node=" + nodeId +
", res=" + res + ']');
}
return;
}
else if (txPrepareMsgLog.isDebugEnabled())
txPrepareMsgLog.debug("Received dht prepare response [txId=" + fut.tx().nearXidVersion() +
", node=" + nodeId + ']');
IgniteInternalTx tx = fut.tx();
assert tx != null;
res.txState(tx.txState());
fut.onResult(nodeId, res);
}
}
/**
* @param nodeId Node ID.
* @param res Response.
*/
private void processDhtTxFinishResponse(UUID nodeId, GridDhtTxFinishResponse res) {
try (TraceSurroundings ignored =
MTC.support(ctx.kernalContext().tracing().create(TX_PROCESS_DHT_FINISH_RESP, MTC.span()))) {
assert nodeId != null;
assert res != null;
if (res.checkCommitted()) {
GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
if (fut == null) {
if (txFinishMsgLog.isDebugEnabled()) {
txFinishMsgLog.debug("Failed to find future for dht finish check committed response [txId=null" +
", dhtTxId=" + res.xid() +
", node=" + nodeId +
", res=" + res + ']');
}
return;
}
else if (txFinishMsgLog.isDebugEnabled()) {
txFinishMsgLog.debug("Received dht finish check committed response [txId=" + fut.tx().nearXidVersion() +
", dhtTxId=" + res.xid() +
", node=" + nodeId + ']');
}
fut.onResult(nodeId, res);
}
else {
GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().<IgniteInternalTx>future(res.futureId());
if (fut == null) {
if (txFinishMsgLog.isDebugEnabled()) {
txFinishMsgLog.debug("Failed to find future for dht finish response [txId=null" +
", dhtTxId=" + res.xid() +
", node=" + nodeId +
", res=" + res);
}
return;
}
else if (txFinishMsgLog.isDebugEnabled()) {
txFinishMsgLog.debug("Received dht finish response [txId=" + fut.tx().nearXidVersion() +
", dhtTxId=" + res.xid() +
", node=" + nodeId + ']');
}
fut.onResult(nodeId, res);
}
}
}
/**
* @param nodeId Node ID.
* @param req Request.
* @return Future.
*/
@Nullable private IgniteInternalFuture<IgniteInternalTx> processNearTxFinishRequest(
UUID nodeId,
GridNearTxFinishRequest req
) {
try (TraceSurroundings ignored =
MTC.support(ctx.kernalContext().tracing().create(TX_NEAR_FINISH_REQ, MTC.span()))) {
if (txFinishMsgLog.isDebugEnabled())
txFinishMsgLog.debug("Received near finish request [txId=" + req.version() + ", node=" + nodeId +
']');
IgniteInternalFuture<IgniteInternalTx> fut = finish(nodeId, null, req);
assert req.txState() != null || fut == null || fut.error() != null ||
(ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null) :
"[req=" + req + ", fut=" + fut + "]";
return fut;
}
}
/**
* @param nodeId Node ID.
* @param locTx Local transaction.
* @param req Request.
* @return Future.
*/
@Nullable public IgniteInternalFuture<IgniteInternalTx> finish(UUID nodeId,
@Nullable GridNearTxLocal locTx,
GridNearTxFinishRequest req
) {
assert nodeId != null;
assert req != null;
if (locTx != null)
req.txState(locTx.txState());
// Always add near version to rollback history to prevent races with rollbacks.
if (!req.commit())
ctx.tm().addRolledbackTx(null, req.version());
// Transaction on local cache only.
if (locTx != null && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped())
return new GridFinishedFuture<IgniteInternalTx>(locTx);
if (log.isDebugEnabled())
log.debug("Processing near tx finish request [nodeId=" + nodeId + ", req=" + req + "]");
IgniteInternalFuture<IgniteInternalTx> colocatedFinishFut = null;
if (locTx != null && locTx.colocatedLocallyMapped())
colocatedFinishFut = finishColocatedLocal(req.commit(), locTx);
IgniteInternalFuture<IgniteInternalTx> nearFinishFut = null;
if (locTx == null || locTx.nearLocallyMapped())
nearFinishFut = finishDhtLocal(nodeId, locTx, req);
if (colocatedFinishFut != null && nearFinishFut != null) {
GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> res = new GridCompoundFuture<>();
res.add(colocatedFinishFut);
res.add(nearFinishFut);
res.markInitialized();
return res;
}
if (colocatedFinishFut != null)
return colocatedFinishFut;
return nearFinishFut;
}
/**
* @param nodeId Node ID initiated commit.
* @param locTx Optional local transaction.
* @param req Finish request.
* @return Finish future.
*/
private IgniteInternalFuture<IgniteInternalTx> finishDhtLocal(UUID nodeId,
@Nullable GridNearTxLocal locTx,
GridNearTxFinishRequest req
) {
GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version());
GridDhtTxLocal tx = null;
if (dhtVer == null) {
if (log.isDebugEnabled())
log.debug("Received transaction finish request for unknown near version (was lock explicit?): " + req);
}
else
tx = ctx.tm().tx(dhtVer);
if (tx != null) {
tx.mvccSnapshot(req.mvccSnapshot());
req.txState(tx.txState());
}
if (tx == null && locTx != null && !req.commit()) {
U.warn(log, "DHT local tx not found for near local tx rollback " +
"[req=" + req + ", dhtVer=" + dhtVer + ", tx=" + locTx + ']');
return null;
}
if (tx == null && !req.explicitLock()) {
assert locTx == null : "DHT local tx should never be lost for near local tx: " + locTx;
U.warn(txFinishMsgLog, "Received finish request for completed transaction (the message may be too late) [" +
"txId=" + req.version() +
", dhtTxId=" + dhtVer +
", node=" + nodeId +
", commit=" + req.commit() + ']');
// Always send finish response.
GridCacheMessage res = new GridNearTxFinishResponse(
req.partition(),
req.version(),
req.threadId(),
req.futureId(),
req.miniId(),
new IgniteTxRollbackCheckedException("Transaction has already been completed or was not started yet."));
try {
ctx.io().send(nodeId, res, req.policy());
if (txFinishMsgLog.isDebugEnabled()) {
txFinishMsgLog.debug("Sent near finish response for completed tx [txId=" + req.version() +
", dhtTxId=" + dhtVer +
", node=" + nodeId + ']');
}
}
catch (Throwable e) {
// Double-check.
if (ctx.discovery().node(nodeId) == null) {
if (txFinishMsgLog.isDebugEnabled()) {
txFinishMsgLog.debug("Failed to send near finish response for completed tx, node failed [" +
"txId=" + req.version() +
", dhtTxId=" + dhtVer +
", node=" + nodeId + ']');
}
}
else {
U.error(txFinishMsgLog, "Failed to send near finish response for completed tx, node failed [" +
"txId=" + req.version() +
", dhtTxId=" + dhtVer +
", node=" + nodeId +
", req=" + req +
", res=" + res + ']', e);
}
if (e instanceof Error)
throw (Error)e;
}
return null;
}
try {
assert tx != null : "Transaction is null for near finish request [nodeId=" +
nodeId + ", req=" + req + "]";
assert req.syncMode() != null : req;
tx.syncMode(req.syncMode());
tx.nearFinishFutureId(req.futureId());
tx.nearFinishMiniId(req.miniId());
tx.storeEnabled(req.storeEnabled());
if (!tx.markFinalizing(USER_FINISH)) {
if (log.isDebugEnabled())
log.debug("Will not finish transaction (it is handled by another thread) [commit=" + req.commit() + ", tx=" + tx + ']');
return null;
}
if (req.commit()) {
IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitDhtLocalAsync();
// Only for error logging.
commitFut.listen(CU.errorLogger(log));
return commitFut;
}
else {
IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackDhtLocalAsync();
// Only for error logging.
rollbackFut.listen(CU.errorLogger(log));
return rollbackFut;
}
}
catch (Throwable e) {
try {
if (tx != null) {
tx.commitError(e);
tx.systemInvalidate(true);
try {
IgniteInternalFuture<IgniteInternalTx> res = tx.rollbackDhtLocalAsync();
// Only for error logging.
res.listen(CU.errorLogger(log));
return res;
}
catch (Throwable e1) {
e.addSuppressed(e1);
}
tx.logTxFinishErrorSafe(log, req.commit(), e);
}
if (e instanceof Error)
throw (Error)e;
return new GridFinishedFuture<>(e);
}
finally {
ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
}
}
/**
* @param commit Commit flag (rollback if {@code false}).
* @param tx Transaction to commit.
* @return Future.
*/
public IgniteInternalFuture<IgniteInternalTx> finishColocatedLocal(boolean commit, GridNearTxLocal tx) {
try {
if (commit) {
if (!tx.markFinalizing(USER_FINISH)) {
if (log.isDebugEnabled())
log.debug("Will not finish transaction (it is handled by another thread): " + tx);
return null;
}
return tx.commitAsyncLocal();
}
else
return tx.rollbackAsyncLocal();
}
catch (Throwable e) {
try {
if (tx != null) {
try {
return tx.rollbackNearTxLocalAsync();
}
catch (Throwable e1) {
e.addSuppressed(e1);
}
tx.logTxFinishErrorSafe(log, commit, e);
}
if (e instanceof Error)
throw e;
return new GridFinishedFuture<>(e);
}
finally {
ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
}
}
/**
* @param nodeId Sender node ID.
* @param req Request.
*/
private void processDhtTxPrepareRequest(final UUID nodeId, final GridDhtTxPrepareRequest req) {
try (TraceSurroundings ignored =
MTC.support(ctx.kernalContext().tracing().create(TX_PROCESS_DHT_PREPARE_REQ, MTC.span()))) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Received dht prepare request [txId=" + req.nearXidVersion() +
", dhtTxId=" + req.version() +
", node=" + nodeId + ']');
}
assert nodeId != null;
assert req != null;
assert req.transactionNodes() != null;
GridDhtTxRemote dhtTx = null;
GridNearTxRemote nearTx = null;
GridDhtTxPrepareResponse res;
try {
res = new GridDhtTxPrepareResponse(
req.partition(),
req.version(),
req.futureId(),
req.miniId(),
req.deployInfo() != null);
// Start near transaction first.
nearTx = !F.isEmpty(req.nearWrites()) ? startNearRemoteTx(ctx.deploy().globalLoader(), nodeId, req) : null;
dhtTx = startRemoteTx(nodeId, req, res);
// Set evicted keys from near transaction.
if (nearTx != null)
res.nearEvicted(nearTx.evicted());
List<IgniteTxKey> writesCacheMissed = req.nearWritesCacheMissed();
if (writesCacheMissed != null) {
Collection<IgniteTxKey> evicted0 = res.nearEvicted();
if (evicted0 != null)
writesCacheMissed.addAll(evicted0);
res.nearEvicted(writesCacheMissed);
}
if (dhtTx != null)
req.txState(dhtTx.txState());
else if (nearTx != null)
req.txState(nearTx.txState());
if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
res.invalidPartitionsByCacheId(dhtTx.invalidPartitions());
if (req.onePhaseCommit()) {
assert req.last();
if (dhtTx != null) {
dhtTx.onePhaseCommit(true);
dhtTx.needReturnValue(req.needReturnValue());
finish(dhtTx, req);
}
if (nearTx != null) {
nearTx.onePhaseCommit(true);
finish(nearTx, req);
}
}
}
catch (IgniteCheckedException e) {
if (e instanceof IgniteTxRollbackCheckedException)
U.error(log, "Transaction was rolled back before prepare completed: " + req, e);
else if (e instanceof IgniteTxOptimisticCheckedException) {
if (log.isDebugEnabled())
log.debug("Optimistic failure for remote transaction (will rollback): " + req);
}
else
U.error(log, "Failed to process prepare request: " + req, e);
if (nearTx != null)
try {
nearTx.rollbackRemoteTx();
}
catch (Throwable e1) {
e.addSuppressed(e1);
}
res = new GridDhtTxPrepareResponse(
req.partition(),
req.version(),
req.futureId(),
req.miniId(),
e,
req.deployInfo() != null);
}
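// For one-phase commit the prepare response is deferred until both the DHT and
// near transactions finish, so the near node sees the final commit state.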
if (req.onePhaseCommit()) {
IgniteInternalFuture completeFut;
IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
null : dhtTx.done() ? null : dhtTx.finishFuture();
final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
null : nearTx.done() ? null : nearTx.finishFuture();
if (dhtFin != null && nearFin != null) {
GridCompoundFuture fut = new GridCompoundFuture();
fut.add(dhtFin);
fut.add(nearFin);
fut.markInitialized();
completeFut = fut;
}
else
completeFut = dhtFin != null ? dhtFin : nearFin;
if (completeFut != null) {
final GridDhtTxPrepareResponse res0 = res;
final GridDhtTxRemote dhtTx0 = dhtTx;
final GridNearTxRemote nearTx0 = nearTx;
completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
@Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
sendReply(nodeId, req, res0, dhtTx0, nearTx0);
}
});
}
else
sendReply(nodeId, req, res, dhtTx, nearTx);
}
else
sendReply(nodeId, req, res, dhtTx, nearTx);
assert req.txState() != null || res.error() != null || (dhtTx == null && nearTx == null) :
req + " tx=" + dhtTx + " nearTx=" + nearTx;
}
}
/**
* @param nodeId Node ID.
* @param req Request.
*/
private void processDhtTxOnePhaseCommitAckRequest(final UUID nodeId,
final GridDhtTxOnePhaseCommitAckRequest req) {
try (TraceSurroundings ignored =
MTC.support(ctx.kernalContext().tracing().create(TX_PROCESS_DHT_ONE_PHASE_COMMIT_ACK_REQ, MTC.span()))) {
assert nodeId != null;
assert req != null;
if (log.isDebugEnabled())
log.debug("Processing dht tx one phase commit ack request [nodeId=" + nodeId + ", req=" + req + ']');
for (GridCacheVersion ver : req.versions())
ctx.tm().removeTxReturn(ver);
}
}
/**
* @param nodeId Node ID.
* @param req Request.
*/
@SuppressWarnings({"unchecked"})
private void processDhtTxFinishRequest(final UUID nodeId, final GridDhtTxFinishRequest req) {
try (TraceSurroundings ignored =
MTC.support(ctx.kernalContext().tracing().create(TX_PROCESS_DHT_FINISH_REQ, MTC.span()))) {
assert nodeId != null;
assert req != null;
if (req.checkCommitted()) {
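// Treat the transaction as committed when remote transactions are awaited or the
// version can no longer be added to the rollback history (it already completed).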
boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
if (!committed || req.syncMode() != FULL_SYNC)
sendReply(nodeId, req, committed, null);
else {
IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
sendReply(nodeId, req, true, null);
}
});
}
return;
}
// Always add version to rollback history to prevent races with rollbacks.
if (!req.commit())
ctx.tm().addRolledbackTx(null, req.version());
GridDhtTxRemote dhtTx = ctx.tm().tx(req.version());
GridNearTxRemote nearTx = ctx.tm().nearTx(req.version());
IgniteInternalTx anyTx = U.<IgniteInternalTx>firstNotNull(dhtTx, nearTx);
final GridCacheVersion nearTxId = anyTx != null ? anyTx.nearXidVersion() : null;
if (txFinishMsgLog.isDebugEnabled())
txFinishMsgLog.debug("Received dht finish request [txId=" + nearTxId + ", dhtTxId=" + req.version() +
", node=" + nodeId + ']');
if (anyTx == null && req.commit())
ctx.tm().addCommittedTx(null, req.version(), null);
if (dhtTx != null)
finish(nodeId, dhtTx, req);
else {
try {
applyPartitionsUpdatesCounters(req.updateCounters(), !req.commit(), false);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
if (nearTx != null)
finish(nodeId, nearTx, req);
if (req.replyRequired()) {
IgniteInternalFuture completeFut;
IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ?
null : dhtTx.done() ? null : dhtTx.finishFuture();
final IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ?
null : nearTx.done() ? null : nearTx.finishFuture();
if (dhtFin != null && nearFin != null) {
GridCompoundFuture fut = new GridCompoundFuture();
fut.add(dhtFin);
fut.add(nearFin);
fut.markInitialized();
completeFut = fut;
}
else
completeFut = dhtFin != null ? dhtFin : nearFin;
if (completeFut != null) {
completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
@Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
sendReply(nodeId, req, true, nearTxId);
}
});
}
else
sendReply(nodeId, req, true, nearTxId);
}
else
sendReply(nodeId, req, true, null);
assert req.txState() != null || (dhtTx == null && nearTx == null) : req + " tx=" + dhtTx + " nearTx=" + nearTx;
}
}
/**
* @param nodeId Node ID.
* @param tx Transaction.
* @param req Request.
*/
protected void finish(
UUID nodeId,
IgniteTxRemoteEx tx,
GridDhtTxFinishRequest req
) {
assert tx != null;
req.txState(tx.txState());
try {
if (req.commit() || req.isSystemInvalidate()) {
tx.commitVersion(req.commitVersion());
if (req.isInvalidate())
tx.invalidate(true);
if (req.isSystemInvalidate())
tx.systemInvalidate(true);
tx.mvccSnapshot(req.mvccSnapshot());
// Complete remote candidates.
tx.doneRemote(req.baseVersion(), null, null, null);
tx.commitRemoteTx();
}
else {
if (tx.dht() && req.updateCounters() != null)
tx.txCounters(true).updateCounters(req.updateCounters());
tx.doneRemote(req.baseVersion(), null, null, null);
tx.mvccSnapshot(req.mvccSnapshot());
tx.rollbackRemoteTx();
}
}
catch (IgniteTxHeuristicCheckedException ignore) {
// Already uncommitted.
}
catch (Throwable e) {
// Mark transaction for invalidate.
tx.invalidate(true);
tx.systemInvalidate(true);
try {
tx.commitRemoteTx();
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to invalidate transaction: " + tx, ex);
}
if (e instanceof Error)
throw (Error)e;
}
}
/**
* Finish for one-phase distributed tx.
*
* @param tx Transaction.
* @param req Request.
*/
protected void finish(
GridDistributedTxRemoteAdapter tx,
GridDhtTxPrepareRequest req) throws IgniteTxHeuristicCheckedException {
assert tx != null : "No transaction for one-phase commit prepare request: " + req;
try {
tx.commitVersion(req.writeVersion());
tx.invalidate(req.isInvalidate());
tx.mvccSnapshot(req.mvccSnapshot());
// Complete remote candidates.
tx.doneRemote(req.version(), null, null, null);
tx.commitRemoteTx();
}
catch (IgniteTxHeuristicCheckedException e) {
// Just rethrow this exception. Transaction was already uncommitted.
throw e;
}
catch (Throwable e) {
try {
// Mark transaction for invalidate.
tx.invalidate(true);
tx.systemInvalidate(true);
try {
tx.rollbackRemoteTx();
}
catch (Throwable e1) {
e.addSuppressed(e1);
}
tx.logTxFinishErrorSafe(log, true, e);
if (e instanceof Error)
throw (Error)e;
}
finally {
ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
}
}
/**
* @param nodeId Node id.
* @param req Request.
* @param res Response.
* @param dhtTx Dht tx.
* @param nearTx Near tx.
*/
private void sendReply(UUID nodeId,
GridDhtTxPrepareRequest req,
GridDhtTxPrepareResponse res,
GridDhtTxRemote dhtTx,
GridNearTxRemote nearTx) {
try {
// Reply back to sender.
ctx.io().send(nodeId, res, req.policy());
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Sent dht prepare response [txId=" + req.nearXidVersion() +
", dhtTxId=" + req.version() +
", node=" + nodeId + ']');
}
}
catch (IgniteCheckedException e) {
if (e instanceof ClusterTopologyCheckedException) {
if (txPrepareMsgLog.isDebugEnabled()) {
txPrepareMsgLog.debug("Failed to send dht prepare response, node left [txId=" + req.nearXidVersion() +
", dhtTxId=" + req.version() +
", node=" + nodeId + ']');
}
}
else {
U.warn(log, "Failed to send tx response to remote node (will rollback transaction) [" +
"txId=" + req.nearXidVersion() +
", dhtTxId=" + req.version() +
", node=" + nodeId +
", err=" + e.getMessage() + ']');
}
if (nearTx != null)
try {
nearTx.rollbackRemoteTx();
}
catch (Throwable e1) {
e.addSuppressed(e1);
}
if (dhtTx != null)
try {
dhtTx.rollbackRemoteTx();
}
catch (Throwable e1) {
e.addSuppressed(e1);
}
}
}
/**
* Sends tx finish response to remote node, if response is requested.
*
* @param nodeId Node id that originated finish request.
* @param req Request.
* @param committed {@code True} if transaction committed on this node.
* @param nearTxId Near tx version.
*/
private void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) {
if (req.replyRequired() || req.checkCommitted()) {
GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(
req.partition(),
req.version(),
req.futureId(),
req.miniId());
if (req.checkCommitted()) {
res.checkCommitted(true);
if (committed) {
if (req.needReturnValue()) {
try {
GridCacheReturnCompletableWrapper wrapper = ctx.tm().getCommittedTxReturn(req.version());
if (wrapper != null)
res.returnValue(wrapper.fut().get());
else
assert !ctx.discovery().alive(nodeId) : nodeId;
}
catch (IgniteCheckedException ignored) {
if (txFinishMsgLog.isDebugEnabled()) {
txFinishMsgLog.debug("Failed to gain entry processor return value. [txId=" + nearTxId +
", dhtTxId=" + req.version() +
", node=" + nodeId + ']');
}
}
}
}
else {
ClusterTopologyCheckedException cause =
new ClusterTopologyCheckedException("Primary node left grid.");
res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed to commit transaction " +
"(transaction has been rolled back on backup node): " + req.version(), cause));
}
}
try {
ctx.io().send(nodeId, res, req.policy());
if (txFinishMsgLog.isDebugEnabled()) {
txFinishMsgLog.debug("Sent dht tx finish response [txId=" + nearTxId +
", dhtTxId=" + req.version() +
", node=" + nodeId +
", checkCommitted=" + req.checkCommitted() + ']');
}
}
catch (Throwable e) {
// Double-check.
if (ctx.discovery().node(nodeId) == null) {
if (txFinishMsgLog.isDebugEnabled()) {
txFinishMsgLog.debug("Node left while send dht tx finish response [txId=" + nearTxId +
", dhtTxId=" + req.version() +
", node=" + nodeId + ']');
}
}
else {
U.error(log, "Failed to send finish response to node [txId=" + nearTxId +
", dhtTxId=" + req.version() +
", nodeId=" + nodeId +
", res=" + res + ']', e);
}
if (e instanceof Error)
throw (Error)e;
}
}
else {
if (txFinishMsgLog.isDebugEnabled()) {
txFinishMsgLog.debug("Skip send dht tx finish response [txId=" + nearTxId +
", dhtTxId=" + req.version() +
", node=" + nodeId + ']');
}
}
}
/**
* @param nodeId Node ID.
* @param req Request.
* @param res Response.
* @return Remote transaction.
* @throws IgniteCheckedException If failed.
*/
@Nullable GridDhtTxRemote startRemoteTx(
UUID nodeId,
GridDhtTxPrepareRequest req,
GridDhtTxPrepareResponse res
) throws IgniteCheckedException {
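// A remote DHT transaction is created only when there is something to enlist:
// either explicit writes or an MVCC query update.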
if (req.queryUpdate() || !F.isEmpty(req.writes())) {
GridDhtTxRemote tx = ctx.tm().tx(req.version());
if (tx == null) {
boolean single = req.last() && req.writes().size() == 1;
tx = new GridDhtTxRemote(
ctx,
req.nearNodeId(),
req.futureId(),
nodeId,
req.topologyVersion(),
req.version(),
null,
req.system(),
req.policy(),
req.concurrency(),
req.isolation(),
req.isInvalidate(),
req.timeout(),
req.writes() != null ? Math.max(req.writes().size(), req.txSize()) : req.txSize(),
req.nearXidVersion(),
req.transactionNodes(),
securitySubjectId(ctx),
req.taskNameHash(),
single,
req.storeWriteThrough(),
req.txLabel());
tx.onePhaseCommit(req.onePhaseCommit());
tx.writeVersion(req.writeVersion());
tx = ctx.tm().onCreated(null, tx);
if (tx == null || !ctx.tm().onStarted(tx)) {
if (log.isDebugEnabled())
log.debug("Attempt to start a completed transaction (will ignore): " + tx);
applyPartitionsUpdatesCounters(req.updateCounters(), true, false);
return null;
}
if (ctx.discovery().node(nodeId) == null) {
tx.state(ROLLING_BACK);
tx.state(ROLLED_BACK);
ctx.tm().uncommitTx(tx);
applyPartitionsUpdatesCounters(req.updateCounters(), true, false);
return null;
}
ctx.versions().onReceived(nodeId, req.writeVersion());
}
else {
tx.writeVersion(req.writeVersion());
tx.transactionNodes(req.transactionNodes());
}
TxCounters txCounters = null;
if (req.updateCounters() != null) {
txCounters = tx.txCounters(true);
txCounters.updateCounters(req.updateCounters());
}
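// Reserve every local partition touched by the writes so they cannot be evicted
// while entries are enlisted; all reservations are released in the finally block.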
Set<GridDhtLocalPartition> reservedParts = new HashSet<>();
try {
if (!tx.isSystemInvalidate()) {
int idx = 0;
for (IgniteTxEntry entry : req.writes()) {
GridCacheContext cacheCtx = entry.context();
int part = cacheCtx.affinity().partition(entry.key());
try {
GridDhtLocalPartition locPart = cacheCtx.topology().localPartition(part,
req.topologyVersion(),
false);
// Avoid enlisting to invalid partition.
boolean reserved = locPart != null && reservedParts.contains(locPart);
if (!reserved) {
if ((reserved = locPart != null && locPart.reserve()))
reservedParts.add(locPart);
}
if (reserved) {
tx.addWrite(entry, ctx.deploy().globalLoader());
if (txCounters != null && entry.op() != NOOP && !(entry.op() == TRANSFORM && entry.noop())) {
Long cntr = txCounters.generateNextCounter(entry.cacheId(), part);
if (cntr != null) // Counter is null if entry is no-op.
entry.updateCounter(cntr);
}
if (isNearEnabled(cacheCtx) && req.invalidateNearEntry(idx))
invalidateNearEntry(cacheCtx, entry.key(), req.version());
if (req.needPreloadKey(idx)) {
GridCacheEntryEx cached = entry.cached();
if (cached == null)
cached = cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
GridCacheEntryInfo info = cached.info();
if (info != null && !info.isNew() && !info.isDeleted())
res.addPreloadEntry(info);
}
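// For entry processor invokes on a read-through cache the backup loads the old
// value from the store so that the transform sees the same input as the primary.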
if (cacheCtx.readThroughConfigured() &&
!entry.skipStore() &&
entry.op() == TRANSFORM &&
entry.oldValueOnPrimary() &&
!entry.hasValue()) {
while (true) {
try {
GridCacheEntryEx cached = entry.cached();
if (cached == null) {
cached = cacheCtx.cache().entryEx(entry.key(), req.topologyVersion());
entry.cached(cached);
}
CacheObject val = cached.innerGet(
/*ver*/null,
tx,
/*readThrough*/false,
/*updateMetrics*/false,
/*evt*/false,
/*transformClo*/null,
tx.resolveTaskName(),
/*expiryPlc*/null,
/*keepBinary*/true);
if (val == null)
val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
if (val != null)
entry.readValue(val);
break;
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got entry removed exception, will retry: " + entry.txKey());
entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()));
}
}
}
}
else
tx.addInvalidPartition(cacheCtx.cacheId(), part);
}
catch (GridDhtInvalidPartitionException e) {
tx.addInvalidPartition(cacheCtx.cacheId(), part);
}
idx++;
}
}
// Prepare prior to reordering, so the pending locks added
// in prepare phase will get properly ordered as well.
tx.prepareRemoteTx();
}
finally {
reservedParts.forEach(GridDhtLocalPartition::release);
}
if (req.last()) {
assert !F.isEmpty(req.transactionNodes()) :
"Received last prepare request with empty transaction nodes: " + req;
tx.state(PREPARED);
}
res.invalidPartitionsByCacheId(tx.invalidPartitions());
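// Nothing was enlisted (e.g. all partitions were invalid): roll back the empty
// remote transaction right away instead of keeping it until finish.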
if (!req.queryUpdate() && tx.empty() && req.last()) {
tx.skipCompletedVersions(req.skipCompletedVersion());
tx.rollbackRemoteTx();
return null;
}
return tx;
}
return null;
}
/**
* Writes updated values on the backup node.
*
* @param tx Transaction.
* @param ctx Cache context.
* @param op Operation.
* @param keys Keys.
* @param vals Values sent from the primary node.
* @param snapshot Mvcc snapshot.
* @param batchNum Batch number.
* @param futId Future id.
* @throws IgniteCheckedException If failed.
*/
public void mvccEnlistBatch(GridDhtTxRemote tx, GridCacheContext ctx, EnlistOperation op, List<KeyCacheObject> keys,
List<Message> vals, MvccSnapshot snapshot, IgniteUuid futId, int batchNum) throws IgniteCheckedException {
assert keys != null && (vals == null || vals.size() == keys.size());
assert tx != null;
GridDhtCacheAdapter dht = ctx.dht();
tx.addActiveCache(ctx, false);
for (int i = 0; i < keys.size(); i++) {
KeyCacheObject key = keys.get(i);
assert key != null;
int part = ctx.affinity().partition(key);
try {
GridDhtLocalPartition locPart = ctx.topology().localPartition(part, tx.topologyVersion(), false);
if (locPart != null && locPart.reserve()) {
try {
// Skip renting partitions.
if (locPart.state() == RENTING) {
tx.addInvalidPartition(ctx.cacheId(), part);
continue;
}
CacheObject val = null;
EntryProcessor entryProc = null;
Object[] invokeArgs = null;
boolean needOldVal = tx.txState().useMvccCaching(ctx.cacheId());
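// The value message is either a plain CacheObject, a GridInvokeValue carrying an
// entry processor, or a CacheEntryInfoCollection with rows to apply with preload info.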
Message val0 = vals != null ? vals.get(i) : null;
CacheEntryInfoCollection entries =
val0 instanceof CacheEntryInfoCollection ? (CacheEntryInfoCollection)val0 : null;
if (entries == null && !op.isDeleteOrLock() && !op.isInvoke())
val = (val0 instanceof CacheObject) ? (CacheObject)val0 : null;
if (entries == null && op.isInvoke()) {
assert val0 instanceof GridInvokeValue;
GridInvokeValue invokeVal = (GridInvokeValue)val0;
entryProc = invokeVal.entryProcessor();
invokeArgs = invokeVal.invokeArgs();
}
assert entries != null || entryProc != null || !op.isInvoke() : "entryProc=" + entryProc + ", op=" + op;
GridDhtCacheEntry entry = dht.entryExx(key, tx.topologyVersion());
GridCacheUpdateTxResult updRes;
while (true) {
ctx.shared().database().checkpointReadLock();
try {
if (entries == null) {
switch (op) {
case DELETE:
updRes = entry.mvccRemove(
tx,
ctx.localNodeId(),
tx.topologyVersion(),
snapshot,
false,
needOldVal,
null,
false);
break;
case INSERT:
case TRANSFORM:
case UPSERT:
case UPDATE:
updRes = entry.mvccSet(
tx,
ctx.localNodeId(),
val,
entryProc,
invokeArgs,
0,
tx.topologyVersion(),
snapshot,
op.cacheOperation(),
false,
false,
needOldVal,
null,
false,
false);
break;
default:
throw new IgniteSQLException("Cannot acquire lock for operation [op=" + op + "]. " +
"Operation is unsupported at the moment.",
IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
}
}
else {
updRes = entry.mvccUpdateRowsWithPreloadInfo(tx,
ctx.localNodeId(),
tx.topologyVersion(),
entries.infos(),
op.cacheOperation(),
snapshot,
futId,
batchNum);
}
break;
}
catch (GridCacheEntryRemovedException ignore) {
entry = dht.entryExx(key);
}
finally {
ctx.shared().database().checkpointReadUnlock();
}
}
if (!updRes.filtered())
ctx.shared().mvccCaching().addEnlisted(key, updRes.newValue(), 0, 0, tx.xidVersion(),
updRes.oldValue(), tx.local(), tx.topologyVersion(), snapshot, ctx.cacheId(), tx, futId, batchNum);
assert updRes.updateFuture() == null : "Entry should not be locked on the backup";
}
finally {
locPart.release();
}
}
else
tx.addInvalidPartition(ctx.cacheId(), part);
}
catch (GridDhtInvalidPartitionException e) {
tx.addInvalidPartition(ctx.cacheId(), e.partition());
}
}
}
/**
* @param cacheCtx Context.
* @param key Key
* @param ver Version.
* @throws IgniteCheckedException If invalidate failed.
*/
private void invalidateNearEntry(GridCacheContext cacheCtx, KeyCacheObject key, GridCacheVersion ver)
throws IgniteCheckedException {
GridNearCacheAdapter near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near();
GridCacheEntryEx nearEntry = near.peekEx(key);
if (nearEntry != null)
nearEntry.invalidate(ver);
}
/**
* Called while processing dht tx prepare request.
*
* @param ldr Loader.
* @param nodeId Sender node ID.
* @param req Request.
* @return Remote transaction.
* @throws IgniteCheckedException If failed.
*/
@Nullable private GridNearTxRemote startNearRemoteTx(ClassLoader ldr, UUID nodeId,
GridDhtTxPrepareRequest req) throws IgniteCheckedException {
if (!F.isEmpty(req.nearWrites())) {
GridNearTxRemote tx = ctx.tm().nearTx(req.version());
if (tx == null) {
tx = new GridNearTxRemote(
ctx,
req.topologyVersion(),
ldr,
nodeId,
req.nearNodeId(),
req.version(),
null,
req.system(),
req.policy(),
req.concurrency(),
req.isolation(),
req.isInvalidate(),
req.timeout(),
req.nearWrites(),
req.txSize(),
securitySubjectId(ctx),
req.taskNameHash(),
req.txLabel()
);
tx.writeVersion(req.writeVersion());
if (!tx.empty()) {
tx = ctx.tm().onCreated(null, tx);
if (tx == null || !ctx.tm().onStarted(tx))
throw new IgniteTxRollbackCheckedException("Attempt to start a completed transaction: " + tx);
}
}
else
tx.addEntries(ldr, req.nearWrites());
tx.ownedVersions(req.owned());
// Prepare prior to reordering, so the pending locks added
// in prepare phase will get properly ordered as well.
tx.prepareRemoteTx();
if (req.last())
tx.state(PREPARED);
return tx;
}
return null;
}
/**
* @param nodeId Node ID.
* @param req Request.
*/
private void processCheckPreparedTxRequest(final UUID nodeId,
final GridCacheTxRecoveryRequest req) {
if (txRecoveryMsgLog.isDebugEnabled()) {
txRecoveryMsgLog.debug("Received tx recovery request [txId=" + req.nearXidVersion() +
", node=" + nodeId + ']');
}
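// For a near-tx check it is enough to know whether the originating near transaction
// committed; otherwise verify that all enlisted transactions are prepared or committed.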
IgniteInternalFuture<Boolean> fut = req.nearTxCheck() ? ctx.tm().txCommitted(req.nearXidVersion()) :
ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
if (fut == null || fut.isDone()) {
boolean prepared;
try {
prepared = fut == null || fut.get();
}
catch (IgniteCheckedException e) {
U.error(log, "Check prepared transaction future failed [req=" + req + ']', e);
prepared = false;
}
sendCheckPreparedResponse(nodeId, req, prepared);
}
else {
fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> fut) {
boolean prepared;
try {
prepared = fut.get();
}
catch (IgniteCheckedException e) {
U.error(log, "Check prepared transaction future failed [req=" + req + ']', e);
prepared = false;
}
sendCheckPreparedResponse(nodeId, req, prepared);
}
});
}
}
/**
* @param nodeId Node ID.
* @param req Request.
* @param prepared {@code True} if all transactions are prepared or committed.
*/
private void sendCheckPreparedResponse(UUID nodeId,
GridCacheTxRecoveryRequest req,
boolean prepared) {
GridCacheTxRecoveryResponse res = new GridCacheTxRecoveryResponse(req.version(),
req.futureId(),
req.miniId(),
prepared,
req.deployInfo() != null);
try {
ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
if (txRecoveryMsgLog.isDebugEnabled()) {
txRecoveryMsgLog.debug("Sent tx recovery response [txId=" + req.nearXidVersion() +
", node=" + nodeId + ", res=" + res + ']');
}
}
catch (ClusterTopologyCheckedException ignored) {
if (txRecoveryMsgLog.isDebugEnabled())
txRecoveryMsgLog.debug("Failed to send tx recovery response, node failed [" +
", txId=" + req.nearXidVersion() +
", node=" + nodeId +
", res=" + res + ']');
}
catch (IgniteCheckedException e) {
U.error(txRecoveryMsgLog, "Failed to send tx recovery response [txId=" + req.nearXidVersion() +
", node=" + nodeId +
", req=" + req +
", res=" + res + ']', e);
}
}
/**
* @param nodeId Node ID.
* @param res Response.
*/
protected void processCheckPreparedTxResponse(UUID nodeId, GridCacheTxRecoveryResponse res) {
if (txRecoveryMsgLog.isInfoEnabled()) {
txRecoveryMsgLog.info("Received tx recovery response [txId=" + res.version() +
", node=" + nodeId +
", res=" + res + ']');
}
GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc().future(res.futureId());
if (fut == null) {
if (txRecoveryMsgLog.isInfoEnabled()) {
txRecoveryMsgLog.info("Failed to find future for tx recovery response [txId=" + res.version() +
", node=" + nodeId + ", res=" + res + ']');
}
return;
}
res.txState(fut.tx().txState());
fut.onResult(nodeId, res);
}
/**
* Applies partition counter updates for transactions.
* <p>
* Called after entries are written to WAL on commit or during rollback to close gaps in update counter sequence.
*
* @param counters Counters.
* @throws IgniteCheckedException If failed.
*/
public void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters)
throws IgniteCheckedException {
applyPartitionsUpdatesCounters(counters, false, false);
}
/**
* Applies partition counter updates for transactions.
* <p>
* Called after entries are written to WAL on commit or during rollback to close gaps in update counter sequence.
* <p>
* On rollback, counters must be applied on the primary node only after they have been applied on the backups.
* Otherwise, if the primary fails before sending rollback requests to the backups, remote transactions can be
* committed by the recovery protocol, and partition consistency will not be restored when the primary returns
* to the grid, because a RollbackRecord has already been written (relevant for persistent mode only).
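* <p>
* A minimal usage sketch for the rollback case (illustrative only, not a prescribed call sequence;
* {@code tx} is assumed to be an {@code IgniteInternalTx} with collected counters):
* <pre>{@code
* TxCounters txCntrs = tx.txCounters(false);
*
* if (txCntrs != null)
*     applyPartitionsUpdatesCounters(txCntrs.updateCounters(), true, true);
* }</pre>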
*
* @param counters Counter values to be updated.
* @param rollback {@code True} if applied during rollbacks.
* @param rollbackOnPrimary {@code True} if rollback happens on the primary node. Passed to the CQ engine.
* @throws IgniteCheckedException If failed.
*/
public void applyPartitionsUpdatesCounters(
Iterable<PartitionUpdateCountersMessage> counters,
boolean rollback,
boolean rollbackOnPrimary
) throws IgniteCheckedException {
if (counters == null)
return;
WALPointer ptr = null;
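// Track the last logged WAL pointer so that all written rollback records
// are flushed before returning (see the finally block below).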
try {
for (PartitionUpdateCountersMessage counter : counters) {
GridCacheContext ctx0 = ctx.cacheContext(counter.cacheId());
GridDhtPartitionTopology top = ctx0.topology();
assert top != null;
AffinityTopologyVersion topVer = top.readyTopologyVersion();
for (int i = 0; i < counter.size(); i++) {
boolean invalid = false;
try {
GridDhtLocalPartition part = top.localPartition(counter.partition(i));
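// Reservation prevents the partition from being evicted while its counter is updated.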
if (part != null && part.reserve()) {
try {
if (part.state() != RENTING) { // Check is actual only for backup node.
long start = counter.initialCounter(i);
long delta = counter.updatesCount(i);
boolean updated = part.updateCounter(start, delta);
// Need to log rolled back range for logical recovery.
if (updated && rollback) {
CacheGroupContext grpCtx = part.group();
if (grpCtx.persistenceEnabled() && grpCtx.walEnabled() && !grpCtx.mvccEnabled()) {
RollbackRecord rec =
new RollbackRecord(grpCtx.groupId(), part.id(), start, delta);
ptr = ctx.wal().log(rec);
}
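// Notify continuous query handlers that the rolled back counter range must be skipped.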
for (int cntr = 1; cntr <= delta; cntr++) {
ctx0.continuousQueries().skipUpdateCounter(null, part.id(), start + cntr,
topVer, rollbackOnPrimary);
}
}
}
else
invalid = true;
}
finally {
part.release();
}
}
else
invalid = true;
}
catch (GridDhtInvalidPartitionException ignored) {
invalid = true;
}
if (log.isDebugEnabled() && invalid) {
log.debug("Received partition update counters message for invalid partition, ignoring: " +
"[cacheId=" + counter.cacheId() + ", part=" + counter.partition(i) + ']');
}
}
}
}
finally {
if (ptr != null)
ctx.wal().flush(ptr, false);
}
}
/**
* @param tx Transaction.
* @param node Backup node.
* @return Partition counters for the given backup node.
*/
@Nullable public List<PartitionUpdateCountersMessage> filterUpdateCountersForBackupNode(
IgniteInternalTx tx, ClusterNode node) {
TxCounters txCntrs = tx.txCounters(false);
Collection<PartitionUpdateCountersMessage> updCntrs;
if (txCntrs == null || F.isEmpty(updCntrs = txCntrs.updateCounters()))
return null;
List<PartitionUpdateCountersMessage> res = new ArrayList<>(updCntrs.size());
AffinityTopologyVersion top = tx.topologyVersionSnapshot();
for (PartitionUpdateCountersMessage partCntrs : updCntrs) {
GridDhtPartitionTopology topology = ctx.cacheContext(partCntrs.cacheId()).topology();
PartitionUpdateCountersMessage resCntrs = new PartitionUpdateCountersMessage(partCntrs.cacheId(), partCntrs.size());
for (int i = 0; i < partCntrs.size(); i++) {
int part = partCntrs.partition(i);
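// nodes() lists the primary first, so a positive index means the given node
// is a backup for this partition (the primary itself is skipped).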
if (topology.nodes(part, top).indexOf(node) > 0)
resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i));
}
if (resCntrs.size() > 0)
res.add(resCntrs);
}
return res;
}
}