blob: 4651bcd767f53e4cda688229a24c04a3e918ca76 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.ExchangeContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryCntr;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryId;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQueryCntr;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQueryId;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotRequest;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxKey;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_HINTS_BIT_OFF;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_INITIAL_CNTR;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_READ_OP_CNTR;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_START_CNTR;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_START_OP_CNTR;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.compare;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.hasNewVersion;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.isVisible;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.noCoordinatorError;
import static org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog.TX_LOG_CACHE_ID;
import static org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog.TX_LOG_CACHE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.KEY_ONLY;
/**
* MVCC processor.
*/
@SuppressWarnings("serial")
public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProcessor, DatabaseLifecycleListener {
/** Minimal product version a node must have to be treated as MVCC-capable (see {@code supportsMvcc()}). */
private static final IgniteProductVersion MVCC_SUPPORTED_SINCE = IgniteProductVersion.fromString("2.7.0");
/** Shared waiter instance put into the wait map by {@code registerLocalTransaction()}. */
private static final Waiter LOCAL_TRANSACTION_MARKER = new LocalTransactionMarker();
/** Dummy tx for vacuum. */
private static final IgniteInternalTx DUMMY_TX = new GridNearTxLocal();
/** For tests only. When set, it is used by {@code assignMvccCoordinator()} instead of the default node selection. */
private static IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC;
/**
* For testing only.
*
* @param crdC Closure assigning coordinator.
*/
static void coordinatorAssignClosure(IgniteClosure<Collection<ClusterNode>, ClusterNode> crdC) {
// The closure is kept in a static field, so it is shared by all processor instances in this JVM.
MvccProcessorImpl.crdC = crdC;
}
/**
* Coordinator version the local node was last initialized with as mvcc coordinator
* (assigned in {@code onExchangeDone()}).
*/
private long crdVer;
/** Current coordinator, replaced through {@code updateCoordinator()}. */
private volatile MvccCoordinator curCrd;
/** Coordinator selected by {@code assignMvccCoordinator()} on discovery events. */
private volatile MvccCoordinator assignedCrd;
/** Tx log. Created in {@code ensureStarted()} or {@code beforeMemoryRestore()}, reset in {@code beforeStop()}. */
private TxLog txLog;
/** Vacuum scheduler and vacuum workers, {@code null} when vacuum is not started. Accessed under {@code mux}. */
private List<GridWorker> vacuumWorkers;
/** Queue of vacuum tasks passed to vacuum workers, {@code null} when vacuum is not started. Accessed under {@code mux}. */
private BlockingQueue<VacuumTask> cleanupQueue;
/**
* Vacuum mutex. Prevents concurrent vacuum while start/stop operations
*/
private final Object mux = new Object();
/** Generator of ids for snapshot requests, tx commit acks and wait futures. */
private final GridAtomicLong futIdCntr = new GridAtomicLong(0);
/** Mvcc counter, incremented for every assigned tx snapshot. */
private final GridAtomicLong mvccCntr = new GridAtomicLong(MVCC_START_CNTR);
/** Committed counter, raised in {@code onTxDone()} when the corresponding flag is passed. */
private final GridAtomicLong committedCntr = new GridAtomicLong(MVCC_INITIAL_CNTR);
/**
* Contains active transactions on mvcc coordinator. Key is mvcc counter.
* Access is protected by "this" monitor.
*/
private final Map<Long, ActiveTx> activeTxs = new HashMap<>();
/** Active query trackers. */
private final Map<Long, MvccQueryTracker> activeTrackers = new ConcurrentHashMap<>();
/** Pending snapshot response listeners: coordinator node id -> (request id -> listener). */
private final Map<UUID, Map<Long, MvccSnapshotResponseListener>> snapLsnrs = new ConcurrentHashMap<>();
/** Pending wait-ack futures by future id. */
private final Map<Long, WaitAckFuture> ackFuts = new ConcurrentHashMap<>();
/** Futures completed by {@code onTxDone()}. Key is tx counter. */
private final Map<Long, GridFutureAdapter> waitTxFuts = new ConcurrentHashMap<>();
/** Waiters by tx key, run once the tx state becomes committed or aborted. */
private final Map<TxKey, Waiter> waitMap = new ConcurrentHashMap<>();
/** Registry used to assign query counters and to find the minimal active query counter. */
private final ActiveQueries activeQueries = new ActiveQueries();
/** Queries reported by nodes on coordinator change, initialized in {@code onExchangeDone()}. */
private final MvccPreviousCoordinatorQueries prevCrdQueries = new MvccPreviousCoordinatorQueries();
/** Completed in {@code onExchangeDone()} once the local node is initialized as mvcc coordinator. */
private final GridFutureAdapter<Void> initFut = new GridFutureAdapter<>();
/** Flag whether at least one cache with {@code CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT} mode is registered. */
private volatile boolean mvccEnabled;
/** Flag whether all nodes in cluster support MVCC. */
private volatile boolean mvccSupported = true;
/**
* Maps failed node id to votes accumulator for that node.
*/
private final ConcurrentHashMap<UUID, RecoveryBallotBox> recoveryBallotBoxes = new ConcurrentHashMap<>();
/**
* @param ctx Context.
*/
public MvccProcessorImpl(GridKernalContext ctx) {
super(ctx);
// Register this processor for database lifecycle callbacks (it implements DatabaseLifecycleListener).
ctx.internalSubscriptionProcessor().registerDatabaseListener(this);
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
// Subscribe to both kinds of node departure: failure and graceful leave.
ctx.event().addLocalEventListener(new CacheCoordinatorNodeFailListener(),
EVT_NODE_FAILED, EVT_NODE_LEFT);
// Messages arriving on the coordinator topic are dispatched to CoordinatorMessageListener.
ctx.io().addMessageListener(TOPIC_CACHE_COORDINATOR, new CoordinatorMessageListener());
}
/** {@inheritDoc} */
@Override public boolean mvccEnabled() {
// Volatile flag, raised once a TRANSACTIONAL_SNAPSHOT cache is detected; this class never resets it.
return mvccEnabled;
}
/** {@inheritDoc} */
@Override public void preProcessCacheConfiguration(CacheConfiguration ccfg) {
    // Caches in any other atomicity mode are of no interest to the mvcc processor.
    if (ccfg.getAtomicityMode() != TRANSACTIONAL_SNAPSHOT)
        return;

    if (mvccSupported) {
        // First mvcc cache seen: from now on mvcc is considered enabled on this node.
        mvccEnabled = true;

        return;
    }

    throw new IgniteException("Cannot start MVCC transactional cache. " +
        "MVCC is unsupported by the cluster.");
}
/** {@inheritDoc} */
@Override public void validateCacheConfiguration(CacheConfiguration ccfg) {
    boolean mvccCache = ccfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;

    // Only mvcc caches need validation here.
    if (!mvccCache)
        return;

    // An mvcc cache cannot be started while some node in the cluster lacks mvcc support.
    if (!mvccSupported) {
        throw new IgniteException("Cannot start MVCC transactional cache. " +
            "MVCC is unsupported by the cluster.");
    }

    mvccEnabled = true;
}
/** {@inheritDoc} */
@Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) {
    // Without mvcc caches any node version is acceptable.
    if (!mvccEnabled)
        return null;

    boolean nodeSupportsMvcc = node.version().compareToIgnoreTimestamp(MVCC_SUPPORTED_SINCE) >= 0;

    if (nodeSupportsMvcc)
        return null;

    // The same text is used for both message arguments of the validation result.
    String errMsg = "Failed to add node to topology. MVCC is enabled on the cluster, but " +
        "the node doesn't support MVCC [nodeId=" + node.id() + ']';

    return new IgniteNodeValidationResult(node.id(), errMsg, errMsg);
}
/** {@inheritDoc} */
@Override public void ensureStarted() throws IgniteCheckedException {
    // Neither tx log nor vacuum is started on client nodes.
    if (ctx.clientNode())
        return;

    assert mvccEnabled && mvccSupported;

    // Tx log may have been created already by beforeMemoryRestore().
    if (txLog == null)
        txLog = new TxLog(ctx, ctx.cache().context().database());

    startVacuumWorkers();

    if (log.isInfoEnabled())
        log.info("Mvcc processor started.");
}
/** {@inheritDoc} */
@Override public void beforeStop(IgniteCacheDatabaseSharedManager mgr) {
// Vacuum is stopped first, only then the tx log reference is dropped.
stopVacuumWorkers();
txLog = null;
}
/** {@inheritDoc} */
@Override public void onInitDataRegions(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
    // Tx log data region is registered unconditionally, whether or not mvcc caches exist.
    DataStorageConfiguration storageCfg = dataStorageConfiguration();

    DataRegionConfiguration txLogRegionCfg = createTxLogRegion(storageCfg);

    boolean persistenceEnabled = CU.isPersistenceEnabled(ctx.config());

    mgr.addDataRegion(storageCfg, txLogRegionCfg, persistenceEnabled);
}
/** {@inheritDoc} */
@Override public void afterInitialise(IgniteCacheDatabaseSharedManager mgr) {
// No-op: the processor does nothing at this lifecycle stage.
}
/** {@inheritDoc} */
@SuppressWarnings("ConstantConditions")
@Override public void beforeMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException {
    assert CU.isPersistenceEnabled(ctx.config());
    assert txLog == null;

    // Page store for the tx log is initialized regardless of whether mvcc caches are present.
    ctx.cache().context().pageStore().initialize(TX_LOG_CACHE_ID, 1,
        TX_LOG_CACHE_NAME, mgr.dataRegion(TX_LOG_CACHE_NAME).memoryMetrics());

    // Look for at least one persistent cache group working in mvcc mode.
    boolean mvccGrpFound = false;

    for (CacheGroupContext grp : ctx.cache().cacheGroups()) {
        if (grp.persistenceEnabled() && grp.config().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) {
            mvccGrpFound = true;

            break;
        }
    }

    if (!mvccGrpFound)
        return;

    txLog = new TxLog(ctx, mgr);

    mvccEnabled = true;
}
/** {@inheritDoc} */
@Override public void afterMemoryRestore(IgniteCacheDatabaseSharedManager mgr) {
// No-op: the processor does nothing at this lifecycle stage.
}
/** {@inheritDoc} */
@Override public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer,
    @Nullable DiscoveryCustomMessage customMsg) {
    // Metrics updates are ignored.
    if (evtType == EVT_NODE_METRICS_UPDATED)
        return;

    // Every non-custom event may affect coordinator assignment.
    if (evtType != EVT_DISCOVERY_CUSTOM_EVT) {
        assignMvccCoordinator(evtType, nodes, topVer);

        return;
    }

    // Custom events are inspected for requests starting an mvcc cache.
    checkMvccCacheStarted(customMsg);
}
/** {@inheritDoc} */
@Override public void onExchangeStart(MvccCoordinator mvccCrd, ExchangeContext exchCtx, ClusterNode exchCrd) {
    // Active queries are collected only when the exchange brings a new mvcc coordinator.
    if (!exchCtx.newMvccCoordinator())
        return;

    GridLongList activeQryTrackers = collectActiveQueryTrackers();

    exchCtx.addActiveQueries(ctx.localNodeId(), activeQryTrackers);

    // When mvcc coordinator and exchange coordinator are different nodes (or the latter is unknown),
    // the list is additionally sent to the mvcc coordinator directly.
    if (exchCrd == null || !mvccCrd.nodeId().equals(exchCrd.id())) {
        try {
            sendMessage(mvccCrd.nodeId(), new MvccActiveQueriesMessage(activeQryTrackers));
        }
        catch (IgniteCheckedException e) {
            // Best effort: the failure is logged only. The exception is passed as a cause to keep its stack trace.
            U.error(log, "Failed to send active queries to mvcc coordinator: " + e, e);
        }
    }
}
/** {@inheritDoc} */
@Override public void onExchangeDone(boolean newCrd, DiscoCache discoCache, Map<UUID, GridLongList> activeQueries) {
    if (!newCrd) {
        // Coordinator is the same: if it is the local node, drop transactions started by departed servers.
        boolean locIsCrd = curCrd != null && ctx.localNodeId().equals(curCrd.nodeId());

        if (locIsCrd && discoCache != null)
            cleanupOrphanedServerTransactions(discoCache.serverNodes());

        return;
    }

    ctx.cache().context().tm().rollbackMvccTxOnCoordinatorChange();

    // The rest is the initialization of the local node as a new coordinator.
    if (!ctx.localNodeId().equals(curCrd.nodeId()))
        return;

    assert ctx.localNodeId().equals(curCrd.nodeId());

    MvccCoordinator crd = discoCache.mvccCoordinator();

    assert crd != null;

    long newCrdVer = crd.coordinatorVersion();

    // No need to re-initialize if coordinator version hasn't changed (e.g. it was cluster activation).
    if (crdVer == newCrdVer)
        return;

    crdVer = newCrdVer;

    if (log.isInfoEnabled()) {
        log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
            ", crdVer=" + crdVer + ']');
    }

    prevCrdQueries.init(activeQueries, F.view(discoCache.allNodes(), this::supportsMvcc), ctx.discovery());

    // Unblocks snapshot requests waiting for the local coordinator initialization.
    initFut.onDone();
}
/**
 * Removes active transactions whose near node is a server that is not among the given live nodes.
 * Executed on coordinator.
 *
 * @param liveSrvs Live server nodes at the moment of cleanup.
 */
private void cleanupOrphanedServerTransactions(Collection<ClusterNode> liveSrvs) {
    Set<UUID> liveIds = new HashSet<>();

    for (ClusterNode srv : liveSrvs)
        liveIds.add(srv.id());

    List<Long> orphaned = new ArrayList<>();

    // Counters are only collected under the monitor, transactions are finished after it is released.
    synchronized (this) {
        activeTxs.forEach((cntr, tx) -> {
            // Only transactions started by server nodes are examined.
            if (tx.getClass() == ActiveServerTx.class && !liveIds.contains(tx.nearNodeId))
                orphaned.add(cntr);
        });
    }

    // Committed counter is increased because it is not known if transaction was committed or not and we must
    // bump committed counter for committed transaction as it is used in (read-only) query snapshot.
    for (Long txCntr : orphaned)
        onTxDone(txCntr, true);
}
/** {@inheritDoc} */
@Override public void processClientActiveQueries(UUID nodeId, @Nullable GridLongList activeQueries) {
// Hands the node's active queries over to the previous-coordinator queries registry.
prevCrdQueries.addNodeActiveQueries(nodeId, activeQueries);
}
/** {@inheritDoc} */
@Override @Nullable public MvccCoordinator currentCoordinator() {
// NONE makes the overloaded method skip its topology version assertion.
return currentCoordinator(AffinityTopologyVersion.NONE);
}
/** {@inheritDoc} */
@Override @Nullable public MvccCoordinator currentCoordinator(AffinityTopologyVersion topVer) {
// Read the volatile field once so that the assertion and the returned value refer to the same coordinator.
MvccCoordinator crd = curCrd;
// Assert coordinator did not already change.
// The check is skipped when there is no coordinator or when the NONE constant itself is passed
// (note the reference comparison).
assert crd == null
|| topVer == AffinityTopologyVersion.NONE
|| crd.topologyVersion().compareTo(topVer) <= 0 : "Invalid coordinator [crd=" + crd + ", topVer=" + topVer + ']';
return crd;
}
/** {@inheritDoc} */
@Override @Nullable public MvccCoordinator assignedCoordinator() {
// Value last written by assignMvccCoordinator(); may be null.
return assignedCrd;
}
/** {@inheritDoc} */
@Override public UUID currentCoordinatorId() {
    // Single volatile read, so the null check and the dereference see the same value.
    MvccCoordinator crd = curCrd;

    if (crd == null)
        return null;

    return crd.nodeId();
}
/** {@inheritDoc} */
@Override public void updateCoordinator(MvccCoordinator curCrd) {
// Plain volatile write, no validation of the new value is performed here.
this.curCrd = curCrd;
}
/** {@inheritDoc} */
@Override public byte state(MvccVersion ver) throws IgniteCheckedException {
// Unpacks the version and delegates to the (coordinator version, counter) overload.
return state(ver.coordinatorVersion(), ver.counter());
}
/** {@inheritDoc} */
@Override public byte state(long crdVer, long cntr) throws IgniteCheckedException {
// Note: the crdVer parameter shadows the field of the same name.
// Tx log must have been created already (see ensureStarted() and beforeMemoryRestore()).
assert txLog != null && mvccEnabled;
return txLog.get(crdVer, cntr);
}
/** {@inheritDoc} */
@Override public void updateState(MvccVersion ver, byte state) throws IgniteCheckedException {
// Two-argument form always passes primary = true.
updateState(ver, state, true);
}
/** {@inheritDoc} */
@Override public void updateState(MvccVersion ver, byte state, boolean primary) throws IgniteCheckedException {
    assert txLog != null && mvccEnabled;

    TxKey key = new TxKey(ver.coordinatorVersion(), ver.counter());

    txLog.put(key, state, primary);

    // Waiters are notified on primary updates only.
    if (!primary)
        return;

    boolean txFinished = state == TxState.ABORTED || state == TxState.COMMITTED;

    if (!txFinished)
        return;

    // Whoever removes the mapping is the one to run the waiter.
    Waiter waiter = waitMap.remove(key);

    if (waiter != null)
        waiter.run(ctx);
}
/** {@inheritDoc} */
@Override public void registerLocalTransaction(long crd, long cntr) {
// The marker is stored only if nothing is mapped to the key yet.
Waiter old = waitMap.putIfAbsent(new TxKey(crd, cntr), LOCAL_TRANSACTION_MARKER);
// A waiter found in the map is expected to report a local transaction already.
assert old == null || old.hasLocalTransaction();
}
/** {@inheritDoc} */
@Override public boolean hasLocalTransaction(long crd, long cntr) {
    Waiter registered = waitMap.get(new TxKey(crd, cntr));

    // Nothing is mapped to the key: no local transaction.
    if (registered == null)
        return false;

    return registered.hasLocalTransaction();
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> waitFor(GridCacheContext cctx, MvccVersion locked) throws IgniteCheckedException {
TxKey key = new TxKey(locked.coordinatorVersion(), locked.counter());
LockFuture fut = new LockFuture(cctx.ioPolicy());
// Register the future first. If a waiter is already mapped to the key, the two are combined by Waiter::concat.
Waiter waiter = waitMap.merge(key, fut, Waiter::concat);
// The state is read only after the registration.
// NOTE(review): presumably this ordering prevents missing a completion racing with updateState() - confirm
// before reordering.
byte state = txLog.get(key);
// Tx is already finished and the mapped waiter reports no local transaction: remove the mapping and
// run whatever was removed, unless another thread has removed it first.
if ((state == TxState.ABORTED || state == TxState.COMMITTED)
&& !waiter.hasLocalTransaction() && (waiter = waitMap.remove(key)) != null)
waiter.run(ctx);
return fut;
}
/** {@inheritDoc} */
@Override public void addQueryTracker(MvccQueryTracker tracker) {
// Only trackers with an assigned id may be registered.
assert tracker.id() != MVCC_TRACKER_ID_NA;
MvccQueryTracker tr = activeTrackers.put(tracker.id(), tracker);
// Registering the same tracker id twice is considered a bug.
assert tr == null;
}
/** {@inheritDoc} */
@Override public void removeQueryTracker(Long id) {
// Removing an unknown id has no effect.
activeTrackers.remove(id);
}
/** {@inheritDoc} */
@Override public MvccSnapshot tryRequestSnapshotLocal() throws ClusterTopologyCheckedException {
// No transaction: the overloaded method assigns a query counter instead of a tx snapshot.
return tryRequestSnapshotLocal(null);
}
/** {@inheritDoc} */
@Override public MvccSnapshot tryRequestSnapshotLocal(@Nullable IgniteInternalTx tx) throws ClusterTopologyCheckedException {
    MvccCoordinator crd = currentCoordinator();

    if (crd == null)
        throw noCoordinatorError();

    if (tx != null) {
        AffinityTopologyVersion lockedTopVer = ctx.cache().context().lockedTopologyVersion(null);

        // Coordinator assigned on a topology later than the locked one cannot serve this transaction.
        boolean crdOutdated = lockedTopVer != null && lockedTopVer.compareTo(crd.topologyVersion()) < 0;

        if (crdOutdated) {
            throw new ClusterTopologyCheckedException("Mvcc coordinator is outdated " +
                "for the locked topology version. [crd=" + crd + ", tx=" + tx + ']');
        }
    }

    // A snapshot can be assigned locally only by an initialized local coordinator.
    boolean locCrdReady = ctx.localNodeId().equals(crd.nodeId()) && initFut.isDone();

    if (!locCrdReady)
        return null;

    if (tx == null)
        return activeQueries.assignQueryCounter(ctx.localNodeId(), 0L);

    return assignTxSnapshot(0L, ctx.localNodeId(), false);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync(IgniteInternalTx tx) {
// The future is passed as the response listener of the request and is returned to the caller.
MvccSnapshotFuture fut = new MvccSnapshotFuture();
requestSnapshotAsync(tx, fut);
return fut;
}
/** {@inheritDoc} */
@Override public void requestSnapshotAsync(MvccSnapshotResponseListener lsnr) {
// No transaction: a query snapshot is requested.
requestSnapshotAsync(null, lsnr);
}
/** {@inheritDoc} */
@Override public void requestSnapshotAsync(IgniteInternalTx tx, MvccSnapshotResponseListener lsnr) {
    MvccCoordinator crd = currentCoordinator();

    if (crd == null) {
        lsnr.onError(noCoordinatorError());

        return;
    }

    if (tx != null) {
        AffinityTopologyVersion lockedTopVer = ctx.cache().context().lockedTopologyVersion(null);

        // Coordinator assigned on a topology later than the locked one cannot serve this transaction.
        boolean crdOutdated = lockedTopVer != null && lockedTopVer.compareTo(crd.topologyVersion()) < 0;

        if (crdOutdated) {
            lsnr.onError(new ClusterTopologyCheckedException("Mvcc coordinator is outdated " +
                "for the locked topology version. [crd=" + crd + ", tx=" + tx + ']'));

            return;
        }
    }

    UUID crdNodeId = crd.nodeId();

    if (ctx.localNodeId().equals(crdNodeId)) {
        if (!initFut.isDone()) {
            // Wait for the local coordinator init, then repeat the whole request.
            initFut.listen(fut -> requestSnapshotAsync(tx, lsnr));
        }
        else if (tx == null)
            lsnr.onResponse(activeQueries.assignQueryCounter(ctx.localNodeId(), 0L));
        else
            lsnr.onResponse(assignTxSnapshot(0L, ctx.localNodeId(), false));

        return;
    }

    // Send request to the remote coordinator.
    long reqId = futIdCntr.incrementAndGet();

    Map<Long, MvccSnapshotResponseListener> pending =
        snapLsnrs.computeIfAbsent(crdNodeId, id -> new ConcurrentHashMap<>());

    // The listener is registered before sending, so a response always finds it.
    pending.put(reqId, lsnr);

    try {
        sendMessage(crdNodeId, tx != null ? new MvccTxSnapshotRequest(reqId) : new MvccQuerySnapshotRequest(reqId));
    }
    catch (IgniteCheckedException e) {
        // Notify the listener only if nobody else has taken it out of the map already.
        if (pending.remove(reqId) != null)
            lsnr.onError(e);
    }
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> ackTxCommit(MvccSnapshot updateVer) {
// No read snapshot and no query tracker to acknowledge together with the commit.
return ackTxCommit(updateVer, null, 0L);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> ackTxCommit(MvccVersion updateVer, MvccSnapshot readSnapshot,
    long qryId) {
    assert updateVer != null;

    MvccCoordinator crd = curCrd;

    boolean sameCrd = updateVer.coordinatorVersion() == crd.coordinatorVersion();

    if (sameCrd) {
        // The version was issued by the current coordinator: acknowledge the commit to it.
        long futId = futIdCntr.incrementAndGet();

        return sendTxCommit(crd, createTxAckMessage(futId, updateVer, readSnapshot, qryId));
    }

    // The version was issued by another coordinator: only the query (if any) is acknowledged.
    if (readSnapshot != null)
        ackQueryDone(readSnapshot, qryId);

    return new GridFinishedFuture<>();
}
/** {@inheritDoc} */
@Override public void ackTxRollback(MvccVersion updateVer) {
    // Same as the full form without a read snapshot: with a null snapshot the full form sends nothing
    // when the coordinator version differs and builds the same rollback ack message otherwise.
    // Delegation mirrors ackTxCommit(updateVer) and removes the duplicated send/catch logic.
    ackTxRollback(updateVer, null, 0L);
}
/** {@inheritDoc} */
@Override public void ackTxRollback(MvccVersion updateVer, MvccSnapshot readSnapshot, long qryTrackerId) {
    assert updateVer != null;

    MvccCoordinator crd = curCrd;

    boolean sameCrd = crd.coordinatorVersion() == updateVer.coordinatorVersion();

    if (!sameCrd) {
        // The version was issued by another coordinator: only the query (if any) is acknowledged.
        if (readSnapshot != null)
            ackQueryDone(readSnapshot, qryTrackerId);

        return;
    }

    MvccAckRequestTx msg = createTxAckMessage(-1, updateVer, readSnapshot, qryTrackerId);

    // Rollback ack is sent without waiting for a response.
    msg.skipResponse(true);

    try {
        sendMessage(crd.nodeId(), msg);
    }
    catch (ClusterTopologyCheckedException e) {
        // Coordinator has left the topology: not an error.
        if (log.isDebugEnabled())
            log.debug("Failed to send tx rollback ack, node left [msg=" + msg + ", node=" + crd.nodeId() + ']');
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to send tx rollback ack [msg=" + msg + ", node=" + crd.nodeId() + ']', e);
    }
}
/** {@inheritDoc} */
@Override public void ackQueryDone(MvccSnapshot snapshot, long qryId) {
    assert snapshot != null;

    MvccCoordinator crd = currentCoordinator();

    // No coordinator: nobody to acknowledge to.
    if (crd == null)
        return;

    // Snapshot was issued by the current coordinator: try to acknowledge by counter.
    if (crd.coordinatorVersion() == snapshot.coordinatorVersion()) {
        MvccAckRequestQueryCntr cntrMsg = new MvccAckRequestQueryCntr(queryTrackCounter(snapshot));

        if (sendQueryDone(crd, cntrMsg))
            return;
    }

    // Otherwise acknowledge by query id, re-reading the coordinator before every attempt.
    Message msg = new MvccAckRequestQueryId(qryId);

    boolean sent;

    do {
        crd = currentCoordinator();

        sent = sendQueryDone(crd, msg);
    }
    while (!sent);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> waitTxsFuture(UUID crdId, GridLongList txs) {
    assert crdId != null;
    assert txs != null && !txs.isEmpty();

    long futId = futIdCntr.incrementAndGet();

    WaitAckFuture fut = new WaitAckFuture(futId, crdId, false);

    // The future is registered before sending, so a response always finds it.
    ackFuts.put(fut.id, fut);

    try {
        sendMessage(crdId, new MvccWaitTxsRequest(fut.id, txs));
    }
    catch (IgniteCheckedException e) {
        // Complete the future here only if nobody else has removed it already.
        boolean stillRegistered = ackFuts.remove(fut.id) != null;

        if (stillRegistered) {
            boolean crdLeft = e instanceof ClusterTopologyCheckedException;

            if (crdLeft)
                fut.onDone(); // No need to wait, new coordinator will be assigned, finish without error.
            else
                fut.onDone(e);
        }
    }

    return fut;
}
/** {@inheritDoc} */
// TODO: Proper use of diagnostic context.
@Override public void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx) {
    // Pending snapshot listeners. The header is printed once, before the first per-node map.
    boolean lsnrHdrPrinted = false;

    for (Map<Long, MvccSnapshotResponseListener> nodeLsnrs : snapLsnrs.values()) {
        if (!lsnrHdrPrinted) {
            U.warn(log, "Pending mvcc listener: ");

            lsnrHdrPrinted = true;
        }

        nodeLsnrs.values().forEach(lsnr -> U.warn(log, ">>> " + lsnr.toString()));
    }

    // Pending wait-ack futures. The header is printed once, before the first future.
    boolean futHdrPrinted = false;

    for (WaitAckFuture waitAckFut : ackFuts.values()) {
        if (!futHdrPrinted) {
            U.warn(log, "Pending mvcc wait ack futures: ");

            futHdrPrinted = true;
        }

        U.warn(log, ">>> " + waitAckFut.toString());
    }
}
/**
* Removes all less or equals to the given one records from Tx log.
* Delegates to the tx log with the coordinator version and counter of the given version.
*
* @param ver Version.
* @throws IgniteCheckedException If fails.
*/
void removeUntil(MvccVersion ver) throws IgniteCheckedException {
// NOTE(review): unlike state()/updateState() there is no assertion that txLog is initialized here.
txLog.removeUntil(ver.coordinatorVersion(), ver.counter());
}
/**
* TODO IGNITE-7966
*
* Builds the configuration of the data region used by the tx log. Sizes are taken from the system region
* settings of the given storage configuration.
*
* @param dscfg Data storage configuration.
* @return Data region configuration.
*/
private DataRegionConfiguration createTxLogRegion(DataStorageConfiguration dscfg) {
DataRegionConfiguration cfg = new DataRegionConfiguration();
cfg.setName(TX_LOG_CACHE_NAME);
// Tx log region reuses system region size limits.
cfg.setInitialSize(dscfg.getSystemRegionInitialSize());
cfg.setMaxSize(dscfg.getSystemRegionMaxSize());
// Persistence flag is derived from the storage configuration.
cfg.setPersistenceEnabled(CU.isPersistenceEnabled(dscfg));
return cfg;
}
/**
* Shortcut for the data storage configuration of the node configuration.
*
* @return Data storage configuration.
*/
private DataStorageConfiguration dataStorageConfiguration() {
return ctx.config().getDataStorageConfiguration();
}
/**
 * Recalculates the assigned mvcc coordinator on a discovery event. The coordinator is dropped on
 * segmentation or client disconnect and is re-selected when there is none yet or when the assigned
 * one is no longer among the given nodes after a node failed or left.
 *
 * @param evtType Discovery event type.
 * @param nodes Topology nodes, expected to be sorted by order.
 * @param topVer Topology version of the event.
 */
private void assignMvccCoordinator(int evtType, Collection<ClusterNode> nodes, long topVer) {
    checkMvccSupported(nodes);

    MvccCoordinator crd;

    if (evtType == EVT_NODE_SEGMENTED || evtType == EVT_CLIENT_NODE_DISCONNECTED)
        crd = null;
    else {
        crd = assignedCrd;

        if (crd == null ||
            ((evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) && !F.nodeIds(nodes).contains(crd.nodeId()))) {
            ClusterNode crdNode = null;

            if (crdC != null) {
                crdNode = crdC.apply(nodes);

                // Log the node chosen by the closure. At this point 'crd' still refers to the previous
                // coordinator (or is null), so logging it would be misleading.
                if (log.isInfoEnabled())
                    log.info("Assigned coordinator using test closure: " + crdNode);
            }
            else {
                // Expect nodes are sorted by order.
                for (ClusterNode node : nodes) {
                    if (!node.isClient() && supportsMvcc(node)) {
                        crdNode = node;

                        break;
                    }
                }
            }

            crd = crdNode != null ? new MvccCoordinator(crdNode.id(), coordinatorVersion(crdNode),
                new AffinityTopologyVersion(topVer, 0)) : null;

            if (crd == null)
                U.warn(log, "New mvcc coordinator was not assigned [topVer=" + topVer + ']');
            else if (log.isInfoEnabled())
                log.info("Assigned mvcc coordinator [crd=" + crd + ", crdNode=" + crdNode + ']');
        }
    }

    assignedCrd = crd;
}
/**
* Calculates coordinator version as the sum of the node order and the grid start time.
*
* @param crdNode Assigned coordinator node.
* @return Coordinator version.
*/
private long coordinatorVersion(ClusterNode crdNode) {
return crdNode.order() + ctx.discovery().gridStartTime();
}
/**
 * Recalculates the flag telling whether all of the given nodes support mvcc. Once mvcc is enabled
 * the flag is not recalculated anymore.
 *
 * @param nodes Topology nodes.
 */
private void checkMvccSupported(Collection<ClusterNode> nodes) {
    if (mvccEnabled) {
        assert mvccSupported;

        return;
    }

    boolean prev = mvccSupported;

    boolean allSupport = nodes.stream().allMatch(this::supportsMvcc);

    // Volatile field is written only when the value actually changes.
    if (prev != allSupport)
        mvccSupported = allSupport;
}
/**
* @param node Node to check.
* @return {@code True} if node version, timestamp ignored, is not less than {@code MVCC_SUPPORTED_SINCE}.
*/
private boolean supportsMvcc(ClusterNode node) {
return node.version().compareToIgnoreTimestamp(MVCC_SUPPORTED_SINCE) >= 0;
}
/**
 * Raises the mvcc enabled flag if the given custom message is a cache change batch containing
 * a request to start a cache in {@code TRANSACTIONAL_SNAPSHOT} mode.
 *
 * @param customMsg Discovery custom message.
 */
private void checkMvccCacheStarted(@Nullable DiscoveryCustomMessage customMsg) {
    assert customMsg != null;

    // Nothing to detect if mvcc is enabled already or the message is not a cache change batch.
    if (mvccEnabled || !(customMsg instanceof DynamicCacheChangeBatch))
        return;

    DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;

    for (DynamicCacheChangeRequest req : batch.requests()) {
        CacheConfiguration startCfg = req.startCacheConfiguration();

        // Requests without a start configuration are skipped.
        if (startCfg != null && startCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT) {
            assert mvccSupported;

            mvccEnabled = true;
        }
    }
}
/**
 * Notifies every active query tracker about the coordinator change and collects ids returned by them.
 *
 * @return Active queries list.
 */
private GridLongList collectActiveQueryTrackers() {
    assert curCrd != null;

    GridLongList trackerIds = new GridLongList();

    for (MvccQueryTracker tracker : activeTrackers.values()) {
        long id = tracker.onMvccCoordinatorChange(curCrd);

        // Trackers answering with the "not available" id are not reported.
        if (id == MVCC_TRACKER_ID_NA)
            continue;

        trackerIds.add(id);
    }

    return trackerIds;
}
/**
 * Assigns a new mvcc counter to a transaction, registers the transaction as active and builds
 * the snapshot response. Must be called on an initialized local coordinator.
 *
 * @param futId Future ID to put into the response.
 * @param nearId Near node ID.
 * @param client {@code True} to register the transaction as {@code ActiveTx}, {@code false} as
 *      {@code ActiveServerTx}.
 * @return Snapshot response.
 */
private MvccSnapshotResponse assignTxSnapshot(long futId, UUID nearId, boolean client) {
    assert initFut.isDone();
    assert crdVer != 0;
    assert ctx.localNodeId().equals(currentCoordinatorId());

    MvccSnapshotResponse resp = new MvccSnapshotResponse();

    long newCntr;
    long cleanupCntr;
    long trackCntr;

    synchronized (this) {
        newCntr = mvccCntr.incrementAndGet();

        trackCntr = newCntr;

        cleanupCntr = committedCntr.get() + 1;

        // Every active tx goes to the response; minimal counters are calculated along the way.
        for (Map.Entry<Long, ActiveTx> e : activeTxs.entrySet()) {
            cleanupCntr = Math.min(e.getValue().tracking, cleanupCntr);

            trackCntr = Math.min(e.getKey(), trackCntr);

            resp.addTx(e.getKey());
        }

        ActiveTx newTx;

        if (client)
            newTx = new ActiveTx(trackCntr, nearId);
        else
            newTx = new ActiveServerTx(trackCntr, nearId);

        ActiveTx prev = activeTxs.put(newCntr, newTx);

        assert prev == null : newCntr;
    }

    long minQry = activeQueries.minimalQueryCounter();

    // -1 means there is no minimal query counter to take into account.
    if (minQry != -1)
        cleanupCntr = Math.min(cleanupCntr, minQry);

    // No cleanup version is provided until queries of the previous coordinator are done.
    if (prevCrdQueries.previousQueriesDone())
        cleanupCntr = cleanupCntr - 1;
    else
        cleanupCntr = MVCC_COUNTER_NA;

    resp.init(futId, crdVer, newCntr, MVCC_START_OP_CNTR, cleanupCntr, trackCntr);

    return resp;
}
/**
 * Removes the transaction from the active list and completes the future waiting for it, if any.
 *
 * @param txCntr Transaction counter.
 * @param increaseCommittedCntr {@code True} to raise committed counter up to the given tx counter.
 */
private void onTxDone(Long txCntr, boolean increaseCommittedCntr) {
    assert initFut.isDone();

    synchronized (this) {
        activeTxs.remove(txCntr);

        if (increaseCommittedCntr)
            committedCntr.setIfGreater(txCntr);
    }

    // Waiting future is completed outside of the monitor.
    GridFutureAdapter waitFut = waitTxFuts.remove(txCntr);

    if (waitFut == null)
        return;

    waitFut.onDone();
}
/**
* Reports a finished query to the active queries registry.
*
* @param nodeId Node ID passed to the registry along with the counter.
* @param mvccCntr Query counter.
*/
private void onQueryDone(UUID nodeId, Long mvccCntr) {
// Note: the mvccCntr parameter shadows the field of the same name.
activeQueries.onQueryDone(nodeId, mvccCntr);
}
/**
 * Chooses the tx ack message type depending on the read snapshot:
 * no snapshot gives a plain tx ack, a snapshot with the same coordinator version as the update
 * gives an ack carrying the query tracking counter, otherwise an ack carrying the query tracker id.
 *
 * @param futId Future ID.
 * @param updateVer Update version.
 * @param readSnapshot Optional read version.
 * @param qryTrackerId Query tracker id.
 * @return Message.
 */
private MvccAckRequestTx createTxAckMessage(long futId, MvccVersion updateVer, MvccSnapshot readSnapshot,
    long qryTrackerId) {
    if (readSnapshot == null)
        return new MvccAckRequestTx(futId, updateVer.counter());

    boolean sameCrd = readSnapshot.coordinatorVersion() == updateVer.coordinatorVersion();

    if (!sameCrd)
        return new MvccAckRequestTxAndQueryId(futId, updateVer.counter(), qryTrackerId);

    return new MvccAckRequestTxAndQueryCntr(futId, updateVer.counter(), queryTrackCounter(readSnapshot));
}
/**
 * Finds the smallest value among the snapshot counter and its active transaction counters.
 *
 * @param mvccVer Read version.
 * @return Tracker counter.
 */
private long queryTrackCounter(MvccSnapshot mvccVer) {
    long min = mvccVer.counter();

    MvccLongList active = mvccVer.activeTransactions();

    for (int idx = 0, cnt = active.size(); idx < cnt; idx++)
        min = Math.min(min, active.get(idx));

    return min;
}
/**
 * Creates the cleanup queue, one {@link VacuumScheduler} and the configured number of
 * {@link VacuumWorker}s, and starts a thread for each of them. Does nothing on a client node;
 * only logs a warning when workers already exist.
 */
void startVacuumWorkers() {
    if (ctx.clientNode())
        return;

    synchronized (mux) {
        if (vacuumWorkers == null) {
            assert cleanupQueue == null;

            cleanupQueue = new LinkedBlockingQueue<>();

            // One extra slot for the scheduler.
            vacuumWorkers = new ArrayList<>(ctx.config().getMvccVacuumThreadCount() + 1);

            vacuumWorkers.add(new VacuumScheduler(ctx, log, this));

            for (int workerIdx = 0; workerIdx < ctx.config().getMvccVacuumThreadCount(); workerIdx++)
                vacuumWorkers.add(new VacuumWorker(ctx, log, cleanupQueue));

            for (GridWorker w : vacuumWorkers)
                new IgniteThread(w).start();

            return;
        }
    }

    U.warn(log, "Attempting to start active vacuum.");
}
/**
 * Detaches the current workers and cleanup queue, cancels and joins the workers, and then
 * completes every task left in the queue with the "vacuum cancelled" exception.
 * Does nothing on a client node.
 */
void stopVacuumWorkers() {
    if (ctx.clientNode())
        return;

    List<GridWorker> stopped;
    BlockingQueue<VacuumTask> pending;

    synchronized (mux) {
        stopped = vacuumWorkers;
        pending = cleanupQueue;

        vacuumWorkers = null;
        cleanupQueue = null;
    }

    if (stopped == null) {
        if (log.isDebugEnabled() && mvccEnabled())
            log.debug("Attempting to stop inactive vacuum.");

        return;
    }

    assert pending != null;

    // Workers are cancelled and joined without holding the mutex to prevent deadlocks.
    U.cancel(stopped);
    U.join(stopped, log);

    if (pending.isEmpty())
        return;

    IgniteCheckedException cancelErr = vacuumCancelledException();

    for (VacuumTask leftover : pending)
        leftover.onDone(cancelErr);
}
/**
 * Runs vacuum process: obtains an mvcc snapshot (locally if possible, otherwise asynchronously)
 * and hands it over to {@code continueRunVacuum}.
 *
 * @return {@code Future} with {@link VacuumMetrics}.
 */
IgniteInternalFuture<VacuumMetrics> runVacuum() {
    assert !ctx.clientNode();

    MvccCoordinator crd0 = currentCoordinator();

    // Nothing to do for an interrupted thread or when there is no coordinator.
    if (Thread.currentThread().isInterrupted() || crd0 == null)
        return new GridFinishedFuture<>(new VacuumMetrics());

    // Nothing to do either when the local node is the coordinator but its version is still 0.
    if (crdVer == 0 && ctx.localNodeId().equals(crd0.nodeId()))
        return new GridFinishedFuture<>(new VacuumMetrics());

    final GridFutureAdapter<VacuumMetrics> res = new GridFutureAdapter<>();

    MvccSnapshot locSnapshot;

    try {
        // TODO IGNITE-8974 create special method for getting cleanup version only.
        locSnapshot = tryRequestSnapshotLocal(DUMMY_TX);
    }
    catch (ClusterTopologyCheckedException e) {
        throw new AssertionError(e);
    }

    if (locSnapshot != null) {
        continueRunVacuum(res, locSnapshot);

        return res;
    }

    requestSnapshotAsync(DUMMY_TX, new MvccSnapshotResponseListener() {
        @Override public void onResponse(MvccSnapshot s) {
            continueRunVacuum(res, s);
        }

        @Override public void onError(IgniteCheckedException e) {
            if (e instanceof ClusterTopologyCheckedException) {
                if (log.isDebugEnabled())
                    log.debug("Vacuum failed to receive an Mvcc snapshot. " +
                        "Need to retry on the stable topology. " + e.getMessage());

                // A topology error is not reported as a failure: finish with empty metrics.
                res.onDone(new VacuumMetrics());
            }
            else
                completeWithException(res, e);
        }
    });

    return res;
}
/**
* Second stage of a vacuum run: calls {@code ackTxCommit(snapshot)} and, once that future is done,
* enqueues one {@link VacuumTask} per local partition of every mvcc-enabled cache group and
* completes {@code res} with the reduced metrics of all tasks.
*
* @param res Result future completed by this method (with metrics or with an error).
* @param snapshot Snapshot whose coordinator and cleanup versions drive the run.
*/
private void continueRunVacuum(GridFutureAdapter<VacuumMetrics> res, MvccSnapshot snapshot) {
ackTxCommit(snapshot)
.listen(new IgniteInClosure<IgniteInternalFuture>() {
@Override public void apply(IgniteInternalFuture fut) {
Throwable err;
// The ack failed: log the error and fail the result with it.
if ((err = fut.error()) != null) {
U.error(log, "Vacuum error.", err);
res.onDone(err);
}
// Cleanup version is not above MVCC_COUNTER_NA: finish with empty metrics.
else if (snapshot.cleanupVersion() <= MVCC_COUNTER_NA)
res.onDone(new VacuumMetrics());
else {
try {
if (log.isDebugEnabled())
log.debug("Started vacuum with cleanup version=" + snapshot.cleanupVersion() + '.');
// Tasks are enqueued under the same mutex that start/stop of vacuum workers uses for cleanupQueue.
synchronized (mux) {
// No queue means the workers are not running (see stopVacuumWorkers()).
if (cleanupQueue == null) {
res.onDone(vacuumCancelledException());
return;
}
// Compound future reducing per-partition metrics; its own logging hooks are overridden as no-ops.
GridCompoundIdentityFuture<VacuumMetrics> res0 =
new GridCompoundIdentityFuture<VacuumMetrics>(new VacuumMetricsReducer()) {
/** {@inheritDoc} */
@Override protected void logError(IgniteLogger log, String msg, Throwable e) {
// no-op
}
/** {@inheritDoc} */
@Override protected void logDebug(IgniteLogger log, String msg) {
// no-op
}
};
for (CacheGroupContext grp : ctx.cache().cacheGroups()) {
if (grp.mvccEnabled()) {
// Local partitions are listed under the topology read lock.
grp.topology().readLock();
try {
for (GridDhtLocalPartition part : grp.topology().localPartitions()) {
VacuumTask task = new VacuumTask(snapshot, part);
cleanupQueue.offer(task);
res0.add(task);
}
}
finally {
grp.topology().readUnlock();
}
}
}
res0.markInitialized();
// Runs when all enqueued tasks are done.
res0.listen(future -> {
VacuumMetrics metrics = null; Throwable ex = null;
try {
metrics = future.get();
// Sanity checks executed only when assertions are enabled.
if (U.assertionsEnabled()) {
MvccCoordinator crd = currentCoordinator();
assert crd != null
&& crd.coordinatorVersion() >= snapshot.coordinatorVersion();
// Every key in waitMap that is not newer than the cleaned-up version must be in ABORTED state.
for (TxKey key : waitMap.keySet()) {
if (!( key.major() == snapshot.coordinatorVersion()
&& key.minor() > snapshot.cleanupVersion()
|| key.major() > snapshot.coordinatorVersion())) {
byte state = state(key.major(), key.minor());
assert state == TxState.ABORTED : "tx state=" + state;
}
}
}
txLog.removeUntil(snapshot.coordinatorVersion(), snapshot.cleanupVersion());
if (log.isDebugEnabled())
log.debug("Vacuum completed. " + metrics);
} catch (Throwable e) {
// A failure caused by node stop is reported as an empty successful result.
if (X.hasCause(e, NodeStoppingException.class)) {
if (log.isDebugEnabled())
log.debug("Cannot complete vacuum (node is stopping).");
metrics = new VacuumMetrics();
} else
ex = new GridClosureException(e);
}
res.onDone(metrics, ex);
});
}
}
catch (Throwable e) {
// Fails the result; completeWithException() rethrows if e is an Error.
completeWithException(res, e);
}
}
}
});
}
/**
 * Completes the future with the given throwable and rethrows it when it is an {@link Error}.
 *
 * @param fut Future to complete.
 * @param e Failure.
 */
private void completeWithException(GridFutureAdapter fut, Throwable e) {
    fut.onDone(e);

    if (!(e instanceof Error))
        return;

    throw (Error)e;
}
/**
 * @return New {@link NodeStoppingException} used to complete cancelled vacuum operations.
 */
@NotNull private IgniteCheckedException vacuumCancelledException() {
    String errMsg = "Operation has been cancelled (node is stopping).";

    return new NodeStoppingException(errMsg);
}
/**
 * Sends an {@link MvccFutureResponse} for the given request. Send failures are logged, not rethrown.
 *
 * @param nodeId Node ID.
 * @param msg Message.
 */
private void sendFutureResponse(UUID nodeId, MvccWaitTxsRequest msg) {
    MvccFutureResponse resp = new MvccFutureResponse(msg.futureId());

    try {
        sendMessage(nodeId, resp);
    }
    catch (ClusterTopologyCheckedException e) {
        if (!log.isDebugEnabled())
            return;

        log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
    }
}
/**
 * Registers a {@link WaitAckFuture} in {@code ackFuts} and sends the tx ack message to the coordinator.
 * If sending fails and the future is still registered, the future is completed right away:
 * without error for a topology exception, with the exception otherwise.
 *
 * @param crd Mvcc coordinator.
 * @param msg Message.
 * @return Future registered for the ack.
 */
@NotNull private IgniteInternalFuture<Void> sendTxCommit(MvccCoordinator crd, MvccAckRequestTx msg) {
    WaitAckFuture ackFut = new WaitAckFuture(msg.futureId(), crd.nodeId(), true);

    ackFuts.put(ackFut.id, ackFut);

    try {
        sendMessage(crd.nodeId(), msg);
    }
    catch (IgniteCheckedException e) {
        boolean stillRegistered = ackFuts.remove(ackFut.id) != null;

        if (stillRegistered) {
            if (!(e instanceof ClusterTopologyCheckedException))
                ackFut.onDone(e);
            else {
                if (log.isDebugEnabled())
                    log.debug("Failed to send tx ack, node left [crd=" + crd + ", msg=" + msg + ']');

                ackFut.onDone(); // No need to ack, finish without error.
            }
        }
    }

    return ackFut;
}
/**
 * Sends a query ack to the given coordinator.
 *
 * @param crd Mvcc coordinator.
 * @param msg Message.
 * @return {@code True} if no need to resend the message to a new coordinator.
 */
private boolean sendQueryDone(MvccCoordinator crd, Message msg) {
    if (crd == null)
        return true; // Nobody to send the ack to.

    try {
        sendMessage(crd.nodeId(), msg);
    }
    catch (ClusterTopologyCheckedException e) {
        if (log.isDebugEnabled())
            log.debug("Failed to send query ack, node left [crd=" + crd + ", msg=" + msg + ']');

        MvccCoordinator actualCrd = currentCoordinator();

        // Coordinator is unassigned ...
        if (actualCrd == null)
            return true;

        // ... or still the same.
        return crd.coordinatorVersion() == actualCrd.coordinatorVersion();
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to send query ack [crd=" + crd + ", msg=" + msg + ']', e);
    }

    return true;
}
/**
 * Sends the message to {@code TOPIC_CACHE_COORDINATOR} of the given node using {@code SYSTEM_POOL}.
 *
 * @param nodeId Node ID.
 * @param msg Message.
 * @throws IgniteCheckedException If sending failed.
 */
private void sendMessage(UUID nodeId, Message msg) throws IgniteCheckedException {
    ctx.io().sendToGridTopic(nodeId, TOPIC_CACHE_COORDINATOR, msg, SYSTEM_POOL);
}
/**
 * Assigns a tx snapshot for the requesting node and sends it back. When the response could not be
 * sent, the just registered transaction is finished without touching the committed counter.
 *
 * @param nodeId Sender node ID.
 * @param msg Message.
 */
private void processCoordinatorTxSnapshotRequest(UUID nodeId, MvccTxSnapshotRequest msg) {
    ClusterNode sender = ctx.discovery().node(nodeId);

    if (sender == null) {
        if (log.isDebugEnabled())
            log.debug("Ignore tx snapshot request processing, node left [msg=" + msg + ", node=" + nodeId + ']');

        return;
    }

    MvccSnapshotResponse resp = assignTxSnapshot(msg.futureId(), nodeId, sender.isClient());

    boolean sent = false;

    try {
        sendMessage(sender.id(), resp);

        sent = true;
    }
    catch (ClusterTopologyCheckedException e) {
        if (log.isDebugEnabled())
            log.debug("Failed to send tx snapshot response, node left [msg=" + msg + ", node=" + nodeId + ']');
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to send tx snapshot response [msg=" + msg + ", node=" + nodeId + ']', e);
    }

    if (!sent)
        onTxDone(resp.counter(), false);
}
/**
 * Assigns a query counter for the requesting node and sends the response back.
 * On a non-topology send failure the just registered query is marked as done.
 *
 * @param nodeId Sender node ID.
 * @param msg Message.
 */
private void processCoordinatorQuerySnapshotRequest(UUID nodeId, MvccQuerySnapshotRequest msg) {
    ClusterNode sender = ctx.discovery().node(nodeId);

    if (sender == null) {
        if (log.isDebugEnabled())
            log.debug("Ignore query counter request processing, node left [msg=" + msg + ", node=" + nodeId + ']');

        return;
    }

    MvccSnapshotResponse resp = activeQueries.assignQueryCounter(nodeId, msg.futureId());

    try {
        sendMessage(sender.id(), resp);
    }
    catch (ClusterTopologyCheckedException e) {
        if (!log.isDebugEnabled())
            return;

        log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
    }
    catch (IgniteCheckedException e) {
        onQueryDone(nodeId, resp.tracking());

        U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);
    }
}
/**
 * Hands a snapshot response over to the listener registered for its future ID under the sender node.
 * A missing listener is logged: as a warning while the sender is alive, at debug level otherwise.
 *
 * @param nodeId Sender node ID.
 * @param msg Message.
 */
private void processCoordinatorSnapshotResponse(UUID nodeId, MvccSnapshotResponse msg) {
    Map<Long, MvccSnapshotResponseListener> nodeLsnrs = snapLsnrs.get(nodeId);

    MvccSnapshotResponseListener lsnr = nodeLsnrs == null ? null : nodeLsnrs.remove(msg.futureId());

    if (lsnr != null) {
        lsnr.onResponse(msg);

        return;
    }

    if (ctx.discovery().alive(nodeId))
        U.warn(log, "Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
    else if (log.isDebugEnabled())
        log.debug("Failed to find query version future [node=" + nodeId + ", msg=" + msg + ']');
}
/**
 * Handles a query ack carrying a counter by marking that query as done.
 *
 * @param nodeId Node ID.
 * @param msg Message.
 */
private void processCoordinatorQueryAckRequest(UUID nodeId, MvccAckRequestQueryCntr msg) {
    onQueryDone(nodeId, msg.counter());
}
/**
 * Handles a query ack carrying a tracker id by forwarding it to {@code prevCrdQueries}.
 *
 * @param nodeId Node ID.
 * @param msg Message.
 */
private void processNewCoordinatorQueryAckRequest(UUID nodeId, MvccAckRequestQueryId msg) {
    prevCrdQueries.onQueryDone(nodeId, msg.queryTrackerId());
}
/**
 * Finishes the acknowledged transaction, marks the attached query (if any) as done and,
 * unless the message asks to skip it, sends an {@link MvccFutureResponse} back.
 *
 * @param nodeId Sender node ID.
 * @param msg Message.
 */
private void processCoordinatorTxAckRequest(UUID nodeId, MvccAckRequestTx msg) {
    // The committed counter is increased only for non-negative future ids.
    onTxDone(msg.txCounter(), msg.futureId() >= 0);

    if (msg.queryCounter() != MVCC_COUNTER_NA)
        onQueryDone(nodeId, msg.queryCounter());
    else if (msg.queryTrackerId() != MVCC_TRACKER_ID_NA)
        prevCrdQueries.onQueryDone(nodeId, msg.queryTrackerId());

    if (msg.skipResponse())
        return;

    try {
        sendMessage(nodeId, new MvccFutureResponse(msg.futureId()));
    }
    catch (ClusterTopologyCheckedException e) {
        if (!log.isDebugEnabled())
            return;

        log.debug("Failed to send tx ack response, node left [msg=" + msg + ", node=" + nodeId + ']');
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to send tx ack response [msg=" + msg + ", node=" + nodeId + ']', e);
    }
}
/**
 * Completes the ack future registered for the response's future ID.
 * A missing future is logged: as a warning while the sender is alive, at debug level otherwise.
 *
 * @param nodeId Sender node ID.
 * @param msg Message.
 */
private void processCoordinatorAckResponse(UUID nodeId, MvccFutureResponse msg) {
    WaitAckFuture ackFut = ackFuts.remove(msg.futureId());

    if (ackFut != null) {
        ackFut.onResponse();

        return;
    }

    if (ctx.discovery().alive(nodeId))
        U.warn(log, "Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
    else if (log.isDebugEnabled())
        log.debug("Failed to find tx ack future [node=" + nodeId + ", msg=" + msg + ']');
}
/**
 * Answers a "wait for transactions" request: the response is sent once none of the listed
 * transactions is present in {@code activeTxs} any more.
 * <p>
 * For every listed transaction a future is looked up (or registered) in {@code waitTxFuts}.
 * Futures of still active transactions are completed later by {@code onTxDone()}, which also removes
 * them from the map. For a transaction that is already finished the future is completed here, and
 * it is also removed here: otherwise the entry would stay in {@code waitTxFuts}, since the removal
 * in {@code onTxDone()} has already happened for that counter.
 *
 * @param nodeId Node ID.
 * @param msg Message.
 */
@SuppressWarnings("unchecked")
private void processCoordinatorWaitTxsRequest(final UUID nodeId, final MvccWaitTxsRequest msg) {
    GridLongList txs = msg.transactions();

    GridCompoundFuture resFut = null;

    for (int i = 0; i < txs.size(); i++) {
        Long txId = txs.get(i);

        GridFutureAdapter fut = waitTxFuts.get(txId);

        if (fut == null) {
            GridFutureAdapter old = waitTxFuts.putIfAbsent(txId, fut = new GridFutureAdapter());

            if (old != null)
                fut = old;
        }

        boolean isDone;

        synchronized (this) {
            isDone = !activeTxs.containsKey(txId);
        }

        if (isDone) {
            // Conditional remove: drops only this very future, never one registered concurrently.
            waitTxFuts.remove(txId, fut);

            fut.onDone();
        }

        if (!fut.isDone()) {
            if (resFut == null)
                resFut = new GridCompoundFuture();

            resFut.add(fut);
        }
    }

    if (resFut != null)
        resFut.markInitialized();

    // Nothing to wait for: answer immediately, otherwise answer when all futures are done.
    if (resFut == null || resFut.isDone())
        sendFutureResponse(nodeId, msg);
    else {
        resFut.listen(new IgniteInClosure<IgniteInternalFuture>() {
            @Override public void apply(IgniteInternalFuture fut) {
                sendFutureResponse(nodeId, msg);
            }
        });
    }
}
/**
 * Registers the active queries reported by a node in {@code prevCrdQueries}.
 *
 * @param nodeId Node ID.
 * @param msg Message.
 */
private void processCoordinatorActiveQueriesMessage(UUID nodeId, MvccActiveQueriesMessage msg) {
    prevCrdQueries.addNodeActiveQueries(nodeId, msg.activeQueries());
}
/**
 * Registry of active queries: for each node it keeps how many queries are registered
 * per tracking counter, plus the cached minimal tracking counter.
 */
private class ActiveQueries {
    /** Node ID -> (tracking counter -> number of queries registered with that counter). */
    private final Map<UUID, TreeMap<Long, AtomicInteger>> activeQueries = new HashMap<>();

    /** Cached minimal tracking counter, {@code null} when not set. */
    private Long minQry;

    /**
     * @return Cached minimal tracking counter or {@code -1} when it is not set.
     */
    private synchronized long minimalQueryCounter() {
        if (minQry == null)
            return -1;

        return minQry;
    }

    /**
     * Builds a query snapshot from the committed counter and the active transactions below it,
     * and registers one more query for the resulting tracking counter.
     *
     * @param nodeId Node the query is registered for.
     * @param futId Future ID passed through to the response.
     * @return Initialized snapshot response.
     */
    private synchronized MvccSnapshotResponse assignQueryCounter(UUID nodeId, long futId) {
        MvccSnapshotResponse resp = new MvccSnapshotResponse();

        long qryVer;
        long trackCntr;

        synchronized (MvccProcessorImpl.this) {
            qryVer = committedCntr.get();
            trackCntr = qryVer;

            for (Long txCntr : activeTxs.keySet()) {
                // Only transactions below the committed counter are reported.
                if (txCntr >= qryVer)
                    continue;

                trackCntr = Math.min(txCntr, trackCntr);

                resp.addTx(txCntr);
            }
        }

        // Create the missing map levels on demand and count one more query.
        activeQueries.computeIfAbsent(nodeId, id -> new TreeMap<Long, AtomicInteger>())
            .computeIfAbsent(trackCntr, cntr -> new AtomicInteger())
            .incrementAndGet();

        if (minQry == null)
            minQry = trackCntr;

        resp.init(futId, crdVer, qryVer, MVCC_READ_OP_CNTR, MVCC_COUNTER_NA, trackCntr);

        return resp;
    }

    /**
     * Unregisters one query of the given node and tracking counter.
     * Does nothing when the node has no registered queries.
     *
     * @param nodeId Node ID.
     * @param ver Tracking counter the query was registered with.
     */
    private synchronized void onQueryDone(UUID nodeId, Long ver) {
        TreeMap<Long, AtomicInteger> nodeQueries = activeQueries.get(nodeId);

        if (nodeQueries == null)
            return;

        assert minQry != null;

        AtomicInteger cntr = nodeQueries.get(ver);

        assert cntr != null && cntr.get() > 0 : "onQueryDone ver=" + ver;

        if (cntr.decrementAndGet() != 0)
            return;

        // Last query for this counter is gone: clean up and refresh the minimum if needed.
        nodeQueries.remove(ver);

        if (nodeQueries.isEmpty())
            activeQueries.remove(nodeId);

        if (ver.equals(minQry))
            minQry = activeMinimal();
    }

    /**
     * Drops all queries of the given node and recalculates the minimum.
     *
     * @param nodeId Node ID.
     */
    private synchronized void onNodeFailed(UUID nodeId) {
        activeQueries.remove(nodeId);

        minQry = activeMinimal();
    }

    /**
     * @return Smallest first key over all per-node maps or {@code null} if there are none.
     */
    private Long activeMinimal() {
        Long res = null;

        for (TreeMap<Long, AtomicInteger> nodeQueries : activeQueries.values()) {
            Long nodeMin = nodeQueries.firstKey();

            if (res != null && nodeMin >= res)
                continue;

            res = nodeMin;
        }

        return res;
    }
}
/**
 * Future registered in {@code ackFuts} while waiting for a response from the coordinator.
 */
private class WaitAckFuture extends MvccFuture<Void> {
    /** Future ID, also the key in {@code ackFuts}. */
    private final long id;

    /** {@code True} if ack tx commit, {@code false} if waits for previous txs. */
    final boolean ackTx;

    /**
     * @param id Future ID.
     * @param nodeId Coordinator node ID.
     * @param ackTx {@code True} if ack tx commit, {@code false} if waits for previous txs.
     */
    WaitAckFuture(long id, UUID nodeId, boolean ackTx) {
        super(nodeId);

        this.ackTx = ackTx;
        this.id = id;
    }

    /**
     * Completes the future on response.
     */
    void onResponse() {
        onDone();
    }

    /**
     * Completes the future if the left node is its coordinator and the future is still registered.
     *
     * @param nodeId Failed node ID.
     */
    void onNodeLeft(UUID nodeId) {
        if (!crdId.equals(nodeId))
            return;

        if (ackFuts.remove(id) != null)
            onDone();
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(WaitAckFuture.class, this, super.toString());
    }
}
/**
 * Discovery event listener. For the event node it fails pending snapshot listeners,
 * notifies ack futures and query registries, and updates recovery voting.
 */
private class CacheCoordinatorNodeFailListener implements GridLocalEventListener {
    /** {@inheritDoc} */
    @Override public void onEvent(Event evt) {
        assert evt instanceof DiscoveryEvent : evt;

        DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

        UUID evtNodeId = discoEvt.eventNode().id();

        Map<Long, MvccSnapshotResponseListener> pendingLsnrs = snapLsnrs.remove(evtNodeId);

        if (pendingLsnrs != null) {
            ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request mvcc " +
                "version, coordinator failed: " + evtNodeId);

            // Each listener is removed first so that it is notified at most once from here.
            for (Long futId : pendingLsnrs.keySet()) {
                MvccSnapshotResponseListener lsnr = pendingLsnrs.remove(futId);

                if (lsnr != null)
                    lsnr.onError(err);
            }
        }

        for (WaitAckFuture ackFut : ackFuts.values())
            ackFut.onNodeLeft(evtNodeId);

        activeQueries.onNodeFailed(evtNodeId);

        prevCrdQueries.onNodeFailed(evtNodeId);

        recoveryBallotBoxes.forEach((leftNearId, box) -> {
            // Put synthetic vote from another failed node
            box.vote(evtNodeId);

            tryFinishRecoveryVoting(leftNearId, box);
        });

        if (discoEvt.eventNode().isClient()) {
            RecoveryBallotBox ownBox = recoveryBallotBoxes.computeIfAbsent(evtNodeId, uuid -> new RecoveryBallotBox());

            // Voters are all nodes of the topology attached to the event.
            List<UUID> voterIds = discoEvt.topologyNodes().stream()
                .map(ClusterNode::id)
                .collect(Collectors.toList());

            ownBox.voters(voterIds);

            tryFinishRecoveryVoting(evtNodeId, ownBox);
        }
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "CacheCoordinatorDiscoveryListener[]";
    }
}
/**
 * Listener dispatching incoming mvcc messages to the matching {@code process*} method.
 */
private class CoordinatorMessageListener implements GridMessageListener {
    /** {@inheritDoc} */
    @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
        MvccMessage mvccMsg = (MvccMessage)msg;

        boolean postpone = mvccMsg.waitForCoordinatorInit() && !initFut.isDone();

        if (!postpone) {
            processMessage(nodeId, msg);

            return;
        }

        // The message requires an initialized coordinator: process it when initialization is done.
        initFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
            @Override public void apply(IgniteInternalFuture<Void> initRes) {
                assert crdVer != 0L;

                processMessage(nodeId, msg);
            }
        });
    }

    /**
     * Processes mvcc message. The type checks are done in a fixed order; the first match wins.
     *
     * @param nodeId Node id.
     * @param msg Message.
     */
    private void processMessage(UUID nodeId, Object msg) {
        if (msg instanceof MvccTxSnapshotRequest) {
            processCoordinatorTxSnapshotRequest(nodeId, (MvccTxSnapshotRequest)msg);

            return;
        }

        if (msg instanceof MvccAckRequestTx) {
            processCoordinatorTxAckRequest(nodeId, (MvccAckRequestTx)msg);

            return;
        }

        if (msg instanceof MvccFutureResponse) {
            processCoordinatorAckResponse(nodeId, (MvccFutureResponse)msg);

            return;
        }

        if (msg instanceof MvccAckRequestQueryCntr) {
            processCoordinatorQueryAckRequest(nodeId, (MvccAckRequestQueryCntr)msg);

            return;
        }

        if (msg instanceof MvccQuerySnapshotRequest) {
            processCoordinatorQuerySnapshotRequest(nodeId, (MvccQuerySnapshotRequest)msg);

            return;
        }

        if (msg instanceof MvccSnapshotResponse) {
            processCoordinatorSnapshotResponse(nodeId, (MvccSnapshotResponse)msg);

            return;
        }

        if (msg instanceof MvccWaitTxsRequest) {
            processCoordinatorWaitTxsRequest(nodeId, (MvccWaitTxsRequest)msg);

            return;
        }

        if (msg instanceof MvccAckRequestQueryId) {
            processNewCoordinatorQueryAckRequest(nodeId, (MvccAckRequestQueryId)msg);

            return;
        }

        if (msg instanceof MvccActiveQueriesMessage) {
            processCoordinatorActiveQueriesMessage(nodeId, (MvccActiveQueriesMessage)msg);

            return;
        }

        if (msg instanceof MvccRecoveryFinishedMessage) {
            processRecoveryFinishedMessage(nodeId, ((MvccRecoveryFinishedMessage)msg));

            return;
        }

        U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "CoordinatorMessageListener[]";
    }
}
/**
 * Accumulates transaction recovery votes for a node that left the cluster.
 * Transactions started by the left node are considered not active
 * when each cluster server node acknowledges that it has finished transactions for the left node.
 */
private static class RecoveryBallotBox {
    /** Nodes expected to vote, {@code null} until set. */
    private List<UUID> voters;

    /** Nodes that have already voted. */
    private final Set<UUID> ballots = new HashSet<>();

    /**
     * @param voters Nodes which can have transaction started by the left node.
     */
    private synchronized void voters(List<UUID> voters) {
        this.voters = voters;
    }

    /**
     * @param nodeId Voting node id.
     */
    private synchronized void vote(UUID nodeId) {
        ballots.add(nodeId);
    }

    /**
     * @return {@code True} if the voters are known and every one of them has voted.
     */
    private synchronized boolean isVotingDone() {
        return voters != null && ballots.containsAll(voters);
    }
}
/**
 * Records the sender's vote that it has finished with transactions of the left node
 * and checks whether the voting is complete.
 *
 * @param nodeId Node sent the message.
 * @param msg Message.
 */
private void processRecoveryFinishedMessage(UUID nodeId, MvccRecoveryFinishedMessage msg) {
    UUID leftNodeId = msg.nearNodeId();

    RecoveryBallotBox box = recoveryBallotBoxes.computeIfAbsent(leftNodeId, uuid -> new RecoveryBallotBox());

    box.vote(nodeId);

    tryFinishRecoveryVoting(leftNodeId, box);
}
/**
 * Once the voting is done, finishes recovery on coordinator by removing the active transactions
 * started by the left node and dropping the ballot box.
 *
 * @param nearNodeId Left node.
 * @param ballotBox Votes accumulator for the left node.
 */
private void tryFinishRecoveryVoting(UUID nearNodeId, RecoveryBallotBox ballotBox) {
    if (!ballotBox.isVotingDone())
        return;

    List<Long> recovered = new ArrayList<>();

    synchronized (this) {
        for (Map.Entry<Long, ActiveTx> e : activeTxs.entrySet()) {
            if (e.getValue().nearNodeId.equals(nearNodeId))
                recovered.add(e.getKey());
        }
    }

    // Committed counter is increased because it is not known if transaction was committed or not and we must
    // bump committed counter for committed transaction as it is used in (read-only) query snapshot.
    for (Long txCntr : recovered)
        onTxDone(txCntr, true);

    recoveryBallotBoxes.remove(nearNodeId);
}
/**
 * Unit that can be run with a kernal context and combined with other waiters.
 */
private interface Waiter {
    /**
     * Runs this waiter.
     *
     * @param ctx Grid kernal context.
     */
    void run(GridKernalContext ctx);

    /**
     * Combines this waiter with another one.
     *
     * @param other Another waiter.
     * @return New compound waiter.
     */
    Waiter concat(Waiter other);

    /**
     * @return {@code True} if there is an active local transaction.
     */
    boolean hasLocalTransaction();

    /**
     * @return {@code True} if it is a compound waiter.
     */
    boolean compound();
}
/** */
private static class LockFuture extends GridFutureAdapter<Void> implements Waiter, Runnable {
/** */
private final byte plc;
/**
* @param plc Pool policy.
*/
LockFuture(byte plc) {
this.plc = plc;
}
/** {@inheritDoc} */
@Override public void run() {
onDone();
}
/** {@inheritDoc} */
@Override public void run(GridKernalContext ctx) {
try {
ctx.pools().poolForPolicy(plc).execute(this);
}
catch (IgniteCheckedException e) {
U.error(ctx.log(LockFuture.class), e);
}
}
/** {@inheritDoc} */
@Override public Waiter concat(Waiter other) {
return new CompoundWaiterNoLocal(this, other);
}
/** {@inheritDoc} */
@Override public boolean hasLocalTransaction() {
return false;
}
/** {@inheritDoc} */
@Override public boolean compound() {
return false;
}
}
/**
 * Waiter that does nothing when run and only reports that a local transaction is present.
 */
private static class LocalTransactionMarker implements Waiter {
    /** {@inheritDoc} */
    @Override public void run(GridKernalContext ctx) {
        // Intentionally empty: the marker has nothing to run.
    }

    /** {@inheritDoc} */
    @Override public Waiter concat(Waiter other) {
        return new CompoundWaiter(other);
    }

    /** {@inheritDoc} */
    @Override public boolean hasLocalTransaction() {
        return true;
    }

    /** {@inheritDoc} */
    @Override public boolean compound() {
        return false;
    }
}
/**
 * Waiter wrapping either a single waiter or a flat list of waiters. Reports a local transaction.
 */
@SuppressWarnings("unchecked")
private static class CompoundWaiter implements Waiter {
    /** Either a single {@link Waiter} or an {@link ArrayList} of waiters. */
    private final Object inner;

    /**
     * @param waiter Waiter to wrap; for a compound waiter its content is reused.
     */
    private CompoundWaiter(Waiter waiter) {
        if (waiter.compound())
            inner = ((CompoundWaiter)waiter).inner;
        else
            inner = waiter;
    }

    /**
     * @param first First waiter.
     * @param second Second waiter.
     */
    private CompoundWaiter(Waiter first, Waiter second) {
        ArrayList<Waiter> flat = new ArrayList<>();

        add(flat, first);
        add(flat, second);

        inner = flat;
    }

    /**
     * Appends the waiter to the list, unwrapping compound waiters so that the list stays flat.
     *
     * @param to Target list.
     * @param waiter Waiter to append.
     */
    private void add(List<Waiter> to, Waiter waiter) {
        if (!waiter.compound()) {
            to.add(waiter);

            return;
        }

        Object wrapped = ((CompoundWaiter)waiter).inner;

        if (wrapped.getClass() == ArrayList.class)
            to.addAll((List<Waiter>)wrapped);
        else
            to.add((Waiter)wrapped);
    }

    /** {@inheritDoc} */
    @Override public void run(GridKernalContext ctx) {
        if (inner.getClass() != ArrayList.class) {
            ((Waiter)inner).run(ctx);

            return;
        }

        for (Waiter w : (List<Waiter>)inner)
            w.run(ctx);
    }

    /** {@inheritDoc} */
    @Override public Waiter concat(Waiter other) {
        return new CompoundWaiter(this, other);
    }

    /** {@inheritDoc} */
    @Override public boolean hasLocalTransaction() {
        return true;
    }

    /** {@inheritDoc} */
    @Override public boolean compound() {
        return true;
    }
}
/**
 * Compound waiter that reports no local transaction and keeps that property on concatenation.
 */
private static class CompoundWaiterNoLocal extends CompoundWaiter {
    /**
     * @param first First waiter.
     * @param second Second waiter.
     */
    private CompoundWaiterNoLocal(Waiter first, Waiter second) {
        super(first, second);
    }

    /** {@inheritDoc} */
    @Override public boolean hasLocalTransaction() {
        return false;
    }

    /** {@inheritDoc} */
    @Override public Waiter concat(Waiter other) {
        return new CompoundWaiterNoLocal(this, other);
    }
}
/**
 * Mvcc garbage collection scheduler: periodically asks the processor to run vacuum
 * and waits for each run to finish.
 */
private static class VacuumScheduler extends GridWorker {
    /** Timeout of a single wait for the vacuum future, in milliseconds. */
    private static final long VACUUM_TIMEOUT = 60_000;

    /** Value of the configured vacuum frequency, used as the delay between run starts. */
    private final long interval;

    /** Mvcc processor running the vacuum. */
    private final MvccProcessorImpl prc;

    /**
     * @param ctx Kernal context.
     * @param log Logger.
     * @param prc Mvcc processor.
     */
    VacuumScheduler(GridKernalContext ctx, IgniteLogger log, MvccProcessorImpl prc) {
        super(ctx.igniteInstanceName(), "vacuum-scheduler", log);

        this.prc = prc;
        this.interval = ctx.config().getMvccVacuumFrequency();
    }

    /** {@inheritDoc} */
    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
        U.sleep(interval); // Initial delay.

        while (!isCancelled()) {
            long nextRunTime = U.currentTimeMillis() + interval;

            try {
                IgniteInternalFuture<VacuumMetrics> vacuumFut = prc.runVacuum();

                if (log.isDebugEnabled())
                    log.debug("Vacuum started by scheduler.");

                // Keep waiting in timeout-sized steps, warning after each expired step.
                boolean finished = false;

                while (!finished) {
                    try {
                        vacuumFut.get(VACUUM_TIMEOUT);

                        finished = true;
                    }
                    catch (IgniteFutureTimeoutCheckedException e) {
                        U.warn(log, "Failed to wait for vacuum complete. Consider increasing vacuum workers count.");
                    }
                }
            }
            catch (IgniteInterruptedCheckedException e) {
                throw e; // Cancelled.
            }
            catch (Throwable e) {
                if (e instanceof Error)
                    throw (Error) e;

                U.error(log, "Vacuum error.", e);
            }

            long remaining = nextRunTime - U.currentTimeMillis();

            if (remaining > 0)
                U.sleep(remaining);
        }
    }
}
/**
* Vacuum worker: takes {@link VacuumTask}s from the shared cleanup queue and processes the
* partition of each task according to the partition state.
*/
private static class VacuumWorker extends GridWorker {
/** Queue the tasks are taken from (and re-added to for partitions in MOVING state). */
private final BlockingQueue<VacuumTask> cleanupQueue;
/**
* @param ctx Kernal context.
* @param log Logger.
* @param cleanupQueue Cleanup tasks queue.
*/
VacuumWorker(GridKernalContext ctx, IgniteLogger log, BlockingQueue<VacuumTask> cleanupQueue) {
super(ctx.igniteInstanceName(), "vacuum-cleaner", log);
this.cleanupQueue = cleanupQueue;
}
/**
* Main loop: blocks on the queue until a task is available and completes the task depending on
* the state of its partition. Every failure is propagated to the task; interruption and
* {@link Error}s are additionally rethrown.
*/
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
while (!isCancelled()) {
VacuumTask task = cleanupQueue.take();
try {
switch (task.part().state()) {
// Nothing is scanned for these states: complete with empty metrics.
case EVICTED:
case RENTING:
task.onDone(new VacuumMetrics());
break;
// The task is put back into the queue when the rebalance future of the group is done.
case MOVING:
task.part().group().preloader().rebalanceFuture().listen(f -> cleanupQueue.add(task));
break;
case OWNING:
task.onDone(processPartition(task));
break;
case LOST:
task.onDone(new IgniteCheckedException("Partition is lost."));
break;
}
}
catch (IgniteInterruptedCheckedException e) {
task.onDone(e);
throw e; // Cancelled.
}
catch (Throwable e) {
task.onDone(e);
if (e instanceof Error)
throw (Error) e;
}
}
}
/**
* Process partition: scans its rows in cursor order, groups consecutive rows of the same key
* (and, for a shared group, of the same cache) and calls {@code cleanup} once per group.
*
* @param task VacuumTask.
* @return Metrics of the scan; empty metrics if the partition could not be reserved.
* @throws IgniteCheckedException If failed.
*/
private VacuumMetrics processPartition(VacuumTask task) throws IgniteCheckedException {
long startNanoTime = System.nanoTime();
GridDhtLocalPartition part = task.part();
VacuumMetrics metrics = new VacuumMetrics();
// Reservation is released in the finally block below.
if (!part.reserve())
return metrics;
int curCacheId = CU.UNDEFINED_CACHE_ID;
try {
GridCursor<? extends CacheDataRow> cursor = part.dataStore().cursor(KEY_ONLY);
// Key of the group being accumulated.
KeyCacheObject prevKey = null;
// Rows of the current group passed to updateTxState(): null, a single row or an ArrayList of rows (see addRest()).
Object rest = null;
// Rows of the current group to be removed.
List<MvccLinkAwareSearchRow> cleanupRows = null;
MvccSnapshot snapshot = task.snapshot();
GridCacheContext cctx = null;
boolean shared = part.group().sharedGroup();
// A non-shared group uses its first cache context for all rows.
if (!shared && (cctx = F.first(part.group().caches())) == null)
return metrics;
while (cursor.next()) {
if (isCancelled())
throw new IgniteInterruptedCheckedException("Operation has been cancelled.");
MvccDataRow row = (MvccDataRow)cursor.get();
if (prevKey == null)
prevKey = row.key();
// Shared group: the context is resolved from the cache id of the row.
if (cctx == null) {
cctx = part.group().shared().cacheContext(curCacheId = row.cacheId());
if (cctx == null)
return metrics;
}
// Key (or cache) changed: flush the accumulated group before starting a new one.
if (!prevKey.equals(row.key()) || (shared && curCacheId != row.cacheId())) {
if (rest != null || !F.isEmpty(cleanupRows))
cleanup(part, prevKey, cleanupRows, rest, cctx, metrics);
cleanupRows = null;
rest = null;
if (shared && curCacheId != row.cacheId()) {
cctx = part.group().shared().cacheContext(curCacheId = row.cacheId());
if (cctx == null)
return metrics;
}
prevKey = row.key();
}
if (canClean(row, snapshot, cctx))
cleanupRows = addRow(cleanupRows, row);
else if (actualize(cctx, row, snapshot))
rest = addRest(rest, row);
metrics.addScannedRowsCount(1);
}
// Flush the last group.
if (rest != null || !F.isEmpty(cleanupRows))
cleanup(part, prevKey, cleanupRows, rest, cctx, metrics);
// Search time is the total elapsed time minus the time accounted as cleanup.
metrics.addSearchNanoTime(System.nanoTime() - startNanoTime - metrics.cleanupNanoTime());
return metrics;
}
finally {
part.release();
}
}
/**
* Adds a row to the "rest" accumulator, which is a single row while it holds one element
* and becomes an {@link ArrayList} once a second row is added.
*
* @param rest Current accumulator or {@code null}.
* @param row Row to add.
* @return Updated accumulator.
*/
@SuppressWarnings("unchecked")
@NotNull private Object addRest(@Nullable Object rest, MvccDataRow row) {
if (rest == null)
rest = row;
else if (rest.getClass() == ArrayList.class)
((List)rest).add(row);
else {
ArrayList list = new ArrayList();
list.add(rest);
list.add(row);
rest = list;
}
return rest;
}
/**
* Adds a search row built from the given data row, creating the list on first use.
*
* @param rows Collection of rows.
* @param row Row to add.
* @return Collection of rows.
*/
@NotNull private List<MvccLinkAwareSearchRow> addRow(@Nullable List<MvccLinkAwareSearchRow> rows, MvccDataRow row) {
if (rows == null)
rows = new ArrayList<>();
rows.add(new MvccLinkAwareSearchRow(row.cacheId(), row.key(), row.mvccCoordinatorVersion(),
row.mvccCounter(), row.mvccOperationCounter(), row.link()));
return rows;
}
/**
* @param row Mvcc row to check.
* @param snapshot Cleanup version to compare with.
* @param cctx Cache context.
* @return {@code True} if the row can be removed.
* @throws IgniteCheckedException If failed.
*/
private boolean canClean(MvccDataRow row, MvccSnapshot snapshot,
GridCacheContext cctx) throws IgniteCheckedException {
// Row can be safely cleaned if it has ABORTED min version or COMMITTED and less than cleanup one max version.
// Note: '&&' binds tighter than '||', so the ABORTED check is an alternative to the whole '&&' chain.
return compare(row, snapshot.coordinatorVersion(), snapshot.cleanupVersion()) <= 0
&& hasNewVersion(row) && MvccUtils.compareNewVersion(row, snapshot.coordinatorVersion(), snapshot.cleanupVersion()) <= 0
&& MvccUtils.state(cctx, row.newMvccCoordinatorVersion(), row.newMvccCounter(),
row.newMvccOperationCounter() | (row.newMvccTxState() << MVCC_HINTS_BIT_OFF)) == TxState.COMMITTED
|| MvccUtils.state(cctx, row.mvccCoordinatorVersion(), row.mvccCounter(),
row.mvccOperationCounter() | (row.mvccTxState() << MVCC_HINTS_BIT_OFF)) == TxState.ABORTED;
}
/**
* Checks whether the row should go to the "rest" accumulator: it must be visible for the snapshot
* and have a tx state equal to {@code TxState.NA} either for its own version or for its
* (present) new version.
*
* @param cctx Cache context.
* @param row Mvcc row to check.
* @param snapshot Snapshot used for the visibility check.
* @return {@code True} if the row is visible and has a tx state equal to {@code TxState.NA}.
* @throws IgniteCheckedException If failed.
*/
private boolean actualize(GridCacheContext cctx, MvccDataRow row,
MvccSnapshot snapshot) throws IgniteCheckedException {
return isVisible(cctx, snapshot, row.mvccCoordinatorVersion(), row.mvccCounter(), row.mvccOperationCounter(), false)
&& (row.mvccTxState() == TxState.NA || (row.newMvccCoordinatorVersion() != MVCC_CRD_COUNTER_NA && row.newMvccTxState() == TxState.NA));
}
/**
* Applies the accumulated changes for one key: removes {@code cleanupRows} and calls
* {@code updateTxState} for the rows in {@code rest}, holding the entry lock and the
* checkpoint read lock.
*
* @param part Local partition.
* @param key Key.
* @param cleanupRows Cleanup rows.
* @param rest Single row or {@link ArrayList} of rows to update tx state for, or {@code null}.
* @param cctx Cache context.
* @param metrics Vacuum metrics.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
private void cleanup(GridDhtLocalPartition part, KeyCacheObject key, List<MvccLinkAwareSearchRow> cleanupRows,
Object rest, GridCacheContext cctx, VacuumMetrics metrics) throws IgniteCheckedException {
assert key != null && cctx != null && (!F.isEmpty(cleanupRows) || rest != null);
cctx.gate().enter();
try {
long cleanupStartNanoTime = System.nanoTime();
GridCacheEntryEx entry = cctx.cache().entryEx(key);
// Retry until a locked, non-obsolete entry is obtained.
while (true) {
entry.lockEntry();
if (!entry.obsolete())
break;
entry.unlockEntry();
entry = cctx.cache().entryEx(key);
}
int cleaned = 0;
try {
cctx.shared().database().checkpointReadLock();
try {
if (cleanupRows != null)
cleaned = part.dataStore().cleanup(cctx, cleanupRows);
if (rest != null) {
if (rest.getClass() == ArrayList.class) {
for (MvccDataRow row : ((List<MvccDataRow>) rest)) {
part.dataStore().updateTxState(cctx, row);
}
} else
part.dataStore().updateTxState(cctx, (MvccDataRow) rest);
}
} finally {
cctx.shared().database().checkpointReadUnlock();
}
} finally {
// Entry is unlocked and metrics are updated even if the cleanup failed.
entry.unlockEntry();
cctx.evicts().touch(entry, AffinityTopologyVersion.NONE);
metrics.addCleanupNanoTime(System.nanoTime() - cleanupStartNanoTime);
metrics.addCleanupRowsCnt(cleaned);
}
}
finally {
cctx.gate().leave();
}
}
}
/**
 * Record kept for an active transaction.
 */
private static class ActiveTx {
    /** Tracking counter. */
    private final long tracking;

    /** ID of the near node. */
    private final UUID nearNodeId;

    /**
     * @param tracking Tracking counter.
     * @param nearNodeId ID of the near node.
     */
    private ActiveTx(long tracking, UUID nearNodeId) {
        this.nearNodeId = nearNodeId;
        this.tracking = tracking;
    }
}
/**
 * {@link ActiveTx} variant registered when the requesting node is not a client.
 */
private static class ActiveServerTx extends ActiveTx {
    /**
     * @param tracking Tracking counter.
     * @param nearNodeId ID of the near node.
     */
    private ActiveServerTx(long tracking, UUID nearNodeId) {
        super(tracking, nearNodeId);
    }
}
}