| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.transactions; |
| |
| import java.io.Externalizable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteClientDisconnectedException; |
| import org.apache.ignite.IgniteCompute; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.binary.BinaryObjectException; |
| import org.apache.ignite.cluster.ClusterGroup; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.failure.FailureType; |
| import org.apache.ignite.internal.IgniteFeatures; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.managers.communication.GridIoPolicy; |
| import org.apache.ignite.internal.managers.communication.GridMessageListener; |
| import org.apache.ignite.internal.managers.discovery.DiscoCache; |
| import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; |
| import org.apache.ignite.internal.managers.systemview.walker.TransactionViewWalker; |
| import org.apache.ignite.internal.pagemem.wal.WALPointer; |
| import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.TxRecord; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheObjectsReleaseFuture; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; |
| import org.apache.ignite.internal.processors.cache.GridCacheMessage; |
| import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; |
| import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture; |
| import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender; |
| import org.apache.ignite.internal.processors.cache.LongOperationsDumpSettingsClosure; |
| import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion; |
| import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; |
| import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage; |
| import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.cluster.BaselineTopology; |
| import org.apache.ignite.internal.processors.metric.impl.HitRateMetric; |
| import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; |
| import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; |
| import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; |
| import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap; |
| import org.apache.ignite.internal.util.future.GridCompoundFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.lang.IgnitePair; |
| import org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteFuture; |
| import org.apache.ignite.lang.IgniteReducer; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.ignite.spi.systemview.view.TransactionView; |
| import org.apache.ignite.transactions.TransactionConcurrency; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.apache.ignite.transactions.TransactionState; |
| import org.jetbrains.annotations.Nullable; |
| import org.jsr166.ConcurrentLinkedHashMap; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_COEFFICIENT; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_OWNER_DUMP_REQUESTS_ALLOWED; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS; |
| import static org.apache.ignite.IgniteSystemProperties.getLong; |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.events.EventType.EVT_TX_STARTED; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR; |
| import static org.apache.ignite.internal.GridTopic.TOPIC_TX; |
| import static org.apache.ignite.internal.IgniteFeatures.LONG_OPERATIONS_DUMP_TIMEOUT; |
| import static org.apache.ignite.internal.IgniteKernal.DFLT_LONG_OPERATIONS_DUMP_TIMEOUT; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; |
| import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; |
| import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.RECOVERY_FINISH; |
| import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH; |
| import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionState.ACTIVE; |
| import static org.apache.ignite.transactions.TransactionState.COMMITTED; |
| import static org.apache.ignite.transactions.TransactionState.COMMITTING; |
| import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; |
| import static org.apache.ignite.transactions.TransactionState.PREPARED; |
| import static org.apache.ignite.transactions.TransactionState.PREPARING; |
| import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; |
| import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; |
| import static org.apache.ignite.transactions.TransactionState.SUSPENDED; |
| import static org.apache.ignite.transactions.TransactionState.UNKNOWN; |
| import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; |
| |
| /** |
| * Cache transaction manager. |
| */ |
| public class IgniteTxManager extends GridCacheSharedManagerAdapter { |
| /** Name under which the system view of running transactions is registered. */ |
| public static final String TXS_MON_LIST = "transactions"; |
| |
| /** Description of the system view of running transactions. */ |
| public static final String TXS_MON_LIST_DESC = "Running transactions"; |
| |
| /** Default maximum number of completed transactions to track; used when {@code IGNITE_MAX_COMPLETED_TX_COUNT} is not set. */ |
| private static final int DFLT_MAX_COMPLETED_TX_CNT = 262144; // 2^18 |
| |
| /** Slow tx warn timeout read from {@code IGNITE_SLOW_TX_WARN_TIMEOUT}; 0 when the property is not set. */ |
| private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, 0); |
| |
| /** One phase commit deferred ack request timeout; 500 unless overridden by the system property. */ |
| public static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT = |
| Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT, 500); |
| |
| /** One phase commit deferred ack request buffer size; 256 unless overridden by the system property. */ |
| private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE = |
| Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256); |
| |
| /** Deadlock detection maximum iterations; 1000 unless overridden. Neither private nor final. NOTE(review): presumably so that tests can change it - confirm. */ |
| static int DEADLOCK_MAX_ITERS = |
| IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000); |
| |
| /** Thread-local transaction reference. NOTE(review): the previous comment read "Committing transactions"; usages are outside this section - confirm. */ |
| private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>(); |
| |
| /** Thread-local topology version that should be used when mapping an internal tx. */ |
| private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>(); |
| |
| /** Per-thread transaction map (thread ID to transaction); filled in {@code onCreated} for non-implicit, local, non-DHT, non-system transactions. */ |
| private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap(); |
| |
| /** Per-thread system transaction map, keyed by thread ID together with cache ID. */ |
| private final ConcurrentMap<TxThreadKey, IgniteInternalTx> sysThreadMap = newMap(); |
| |
| /** Per-ID map (transactions keyed by version). */ |
| private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> idMap = newMap(); |
| |
| /** Per-ID map for near transactions (transactions keyed by version). */ |
| private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> nearIdMap = newMap(); |
| |
| /** Deadlock detection futures; notified when a node leaves and completed with an error when the client disconnects. */ |
| private final ConcurrentMap<Long, TxDeadlockFuture> deadlockDetectFuts = new ConcurrentHashMap<>(); |
| |
| /** TX handler, created in {@code start0()}. */ |
| private IgniteTxHandler txHnd; |
| |
| /** |
|  * Shows if dump requests from local node to near node are allowed, when long running transaction |
|  * is found. If allowed, the compute request to near node will be made to get thread dump of transaction |
|  * owner thread. |
|  * <p> |
|  * Declared {@code volatile} because the flag is changed at runtime through |
|  * {@code setTxOwnerDumpRequestsAllowed(boolean)} and read through {@code txOwnerDumpRequestsAllowed()}, |
|  * the same way as the other runtime-tunable settings of this class. |
|  */ |
| private volatile boolean txOwnerDumpRequestsAllowed = |
|     IgniteSystemProperties.getBoolean(IGNITE_TX_OWNER_DUMP_REQUESTS_ALLOWED, true); |
| |
| /** |
| * Threshold timeout for long transactions. If a transaction exceeds it, it will be dumped in log with |
| * information about how much time it spent in system time (time while acquiring locks, preparing, |
| * committing, etc) and user time (time when client node runs some code while holding transaction and not |
| * waiting for it). Equals 0 if not set. No transactions are dumped in log if this parameter is not set. |
| */ |
| private volatile long longTransactionTimeDumpThreshold = getLong(IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD, 0); |
| |
| /** |
| * The coefficient for samples of completed transactions that will be dumped in log; 0.0 unless overridden. |
| */ |
| private volatile double transactionTimeDumpSamplesCoefficient = |
| IgniteSystemProperties.getFloat(IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_COEFFICIENT, 0.0f); |
| |
| /** |
| * The limit of samples of completed transactions that will be dumped in log per second, if |
| * {@link #transactionTimeDumpSamplesCoefficient} is above <code>0.0</code>. Must be integer value |
| * greater than <code>0</code>; 5 unless overridden. |
| */ |
| private volatile int longTransactionTimeDumpSamplesPerSecondLimit = |
| IgniteSystemProperties.getInteger(IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT, 5); |
| |
| /** Completed transaction versions in a bounded ordered map. NOTE(review): the meaning of the Boolean value is not visible in this section - confirm. */ |
| private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVersSorted = |
| new GridBoundedConcurrentOrderedMap<>( |
| Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); |
| |
| /** Completed transaction versions for lookup by version (see {@code isCompleted}); a value may be a {@link GridCacheReturnCompletableWrapper}. */ |
| private final ConcurrentLinkedHashMap<GridCacheVersion, Object> completedVersHashMap = |
| new ConcurrentLinkedHashMap<>( |
| Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT), |
| 0.75f, |
| Runtime.getRuntime().availableProcessors() * 2, |
| Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT), |
| PER_SEGMENT_Q); |
| |
| /** Pending one phase commit ack requests sender, created in {@code start0()}. */ |
| private GridDeferredAckMessageSender deferredAckMsgSnd; |
| |
| /** Slow tx warn timeout, initialized from {@link #SLOW_TX_WARN_TIMEOUT}. */ |
| private int slowTxWarnTimeout = SLOW_TX_WARN_TIMEOUT; |
| |
| /** Long operations dump timeout, read from {@code IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT}. */ |
| private volatile long longOpsDumpTimeout = |
| getLong(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, DFLT_LONG_OPERATIONS_DUMP_TIMEOUT); |
| |
| /** Throttling of transaction dumps; handed to every transaction created by {@code newTx}. */ |
| private TxDumpsThrottling txDumpsThrottling = new TxDumpsThrottling(); |
| |
| /** |
| * Near version to DHT version map. Note that the map is created with an initial capacity of 5K |
| * to avoid future map resizings. |
| */ |
| private final ConcurrentMap<GridCacheVersion, GridCacheVersion> mappedVers = |
| new ConcurrentHashMap<>(5120); |
| |
| /** TxDeadlock detection, created in {@code start0()}. */ |
| private TxDeadlockDetection txDeadlockDetection; |
| |
| /** Flag indicates that {@link TxRecord} records will be logged to WAL; read from {@code IGNITE_WAL_LOG_TX_RECORDS} in {@code start0()}. */ |
| private boolean logTxRecords; |
| |
| /** |
| * Indicates whether {@code suspend()} and {@code resume()} operations are supported for pessimistic transactions |
| * cluster wide. Recomputed on kernal start and on node joined/left/failed discovery events. |
| */ |
| private volatile boolean suspendResumeForPessimisticSupported; |
| |
| /** {@inheritDoc} Removes the {@code TOPIC_TX} message listener that {@code start0()} registered. */ |
| @Override protected void onKernalStop0(boolean cancel) { |
| cctx.gridIO().removeMessageListener(TOPIC_TX); |
| } |
| |
| /** |
|  * {@inheritDoc} |
|  * <p> |
|  * Creates the TX handler and the deferred one-phase-commit ack sender, subscribes to node |
|  * joined/left/failed discovery events, sets up deadlock detection, reads the WAL tx-record flag, |
|  * notifies tx metrics and registers the system view of running transactions. |
|  */ |
| @Override protected void start0() throws IgniteCheckedException { |
|     txHnd = new IgniteTxHandler(cctx); |
| |
|     deferredAckMsgSnd = new GridDeferredAckMessageSender<GridCacheVersion>(cctx.time(), cctx.kernalContext().closure()) { |
|         @Override public int getTimeout() { |
|             return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT; |
|         } |
| |
|         @Override public int getBufferSize() { |
|             return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE; |
|         } |
| |
|         @Override public void finish(UUID nodeId, Collection<GridCacheVersion> vers) { |
|             GridDhtTxOnePhaseCommitAckRequest req = new GridDhtTxOnePhaseCommitAckRequest(vers); |
| |
|             // The request is sent under the kernal gateway read lock. |
|             cctx.kernalContext().gateway().readLock(); |
| |
|             try { |
|                 cctx.io().send(nodeId, req, GridIoPolicy.SYSTEM_POOL); |
|             } |
|             catch (ClusterTopologyCheckedException ignored) { |
|                 // The target node is gone: nothing to acknowledge any more. |
|                 if (log.isDebugEnabled()) |
|                     log.debug("Failed to send one phase commit ack to backup node because it left grid: " + nodeId); |
|             } |
|             catch (IgniteCheckedException e) { |
|                 log.error("Failed to send one phase commit ack to backup node [backup=" + nodeId + ']', e); |
|             } |
|             finally { |
|                 cctx.kernalContext().gateway().readUnlock(); |
|             } |
|         } |
|     }; |
| |
|     DiscoveryEventListener discoLsnr = new DiscoveryEventListener() { |
|         @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) { |
|             int evtType = evt.type(); |
| |
|             if (evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) { |
|                 UUID leftNodeId = evt.eventNode().id(); |
| |
|                 // Start tx recovery for the departed node; a recovery init failure is a critical error. |
|                 IgniteInternalFuture<?> recoveryInitFut = cctx.kernalContext().closure().runLocalSafe( |
|                     new TxRecoveryInitRunnable(evt.eventNode(), cctx.coordinators().currentCoordinator())); |
| |
|                 recoveryInitFut.listen(f -> { |
|                     if (f.error() != null) |
|                         cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, f.error())); |
|                 }); |
| |
|                 for (TxDeadlockFuture deadlockFut : deadlockDetectFuts.values()) |
|                     deadlockFut.onNodeLeft(leftNodeId); |
| |
|                 // Drop stored tx returns that belong to the departed node. |
|                 for (Map.Entry<GridCacheVersion, Object> completed : completedVersHashMap.entrySet()) { |
|                     Object val = completed.getValue(); |
| |
|                     if (!(val instanceof GridCacheReturnCompletableWrapper)) |
|                         continue; |
| |
|                     if (leftNodeId.equals(((GridCacheReturnCompletableWrapper)val).nodeId())) |
|                         removeTxReturn(completed.getKey()); |
|                 } |
|             } |
| |
|             // Any topology change may alter cluster-wide feature support. |
|             suspendResumeForPessimisticSupported = IgniteFeatures.allNodesSupports( |
|                 cctx.discovery().remoteNodes(), IgniteFeatures.SUSPEND_RESUME_PESSIMISTIC_TX); |
|         } |
|     }; |
| |
|     cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED); |
| |
|     txDeadlockDetection = new TxDeadlockDetection(cctx); |
| |
|     cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener()); |
| |
|     logTxRecords = IgniteSystemProperties.getBoolean(IGNITE_WAL_LOG_TX_RECORDS, false); |
| |
|     cctx.txMetrics().onTxManagerStarted(); |
| |
|     // The view exposes the transactions of both ID maps. |
|     cctx.kernalContext().systemView().registerView( |
|         TXS_MON_LIST, |
|         TXS_MON_LIST_DESC, |
|         new TransactionViewWalker(), |
|         new ReadOnlyCollectionView2X<>(idMap.values(), nearIdMap.values()), |
|         TransactionView::new); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void onKernalStart0(boolean active) { |
| suspendResumeForPessimisticSupported = IgniteFeatures.allNodesSupports( |
| cctx.discovery().remoteNodes(), IgniteFeatures.SUSPEND_RESUME_PESSIMISTIC_TX); |
| } |
| |
| /** Rolls back the transactions that have an entry of the given cache: first those in the near ID map, then those in the ID map. |
| * @param cacheId Cache ID. |
| */ |
| public void rollbackTransactionsForCache(int cacheId) { |
| rollbackTransactionsForCache(cacheId, nearIdMap); |
| |
| rollbackTransactionsForCache(cacheId, idMap); |
| } |
| |
| /** |
|  * Rolls back every active transaction that has at least one entry of the cache being stopped and |
|  * waits until all of those rollbacks are done. An error raised while waiting is logged, not rethrown. |
|  * |
|  * @param cacheToStop Cache to stop. |
|  */ |
| public void rollbackTransactionsForStoppingCache(int cacheToStop) { |
|     GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> rollbackFuts = new GridCompoundFuture<>(); |
| |
|     for (IgniteInternalTx tx : activeTransactions()) { |
|         IgniteTxState txState = tx.txState(); |
| |
|         Collection<IgniteTxEntry> entries; |
| |
|         if (txState instanceof IgniteTxStateImpl) |
|             entries = ((IgniteTxStateImpl)txState).allEntriesCopy(); |
|         else |
|             entries = txState.allEntries(); |
| |
|         boolean touchesCache = false; |
| |
|         // One entry of the stopping cache is enough to fail the whole transaction. |
|         for (IgniteTxEntry entry : entries) { |
|             if (entry.context().cacheId() == cacheToStop) { |
|                 touchesCache = true; |
| |
|                 break; |
|             } |
|         } |
| |
|         if (touchesCache) |
|             rollbackFuts.add(failTxOnPreparing(tx)); |
|     } |
| |
|     rollbackFuts.markInitialized(); |
| |
|     try { |
|         rollbackFuts.get(); |
|     } |
|     catch (IgniteCheckedException e) { |
|         U.error(log, "Error occurred during tx rollback.", e); |
|     } |
| } |
| |
| /** |
|  * Rolls back the transaction during a partition map exchange related to destroying a cache(s). |
|  * Semantically, this method is equivalent to two subsequent calls: |
|  * <pre> |
|  * tx.rollbackAsync(); |
|  * tx.currentPrepareFuture().onDone(new IgniteTxRollbackCheckedException()) |
|  * </pre> |
|  * |
|  * It is assumed that the given transaction has not acquired any locks. |
|  * |
|  * @param tx Transaction. |
|  * @return Rollback future. |
|  */ |
| private IgniteInternalFuture<IgniteInternalTx> failTxOnPreparing(IgniteInternalTx tx) { |
|     IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync(); |
| |
|     // Wildcard types instead of raw types: only onDone(Throwable) is needed, the result type is irrelevant. |
|     IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture(); |
| |
|     if (prepFut != null) { |
|         assert prepFut instanceof GridFutureAdapter : |
|             "It is assumed that prepare future should extend GridFutureAdapter class [prepFut=" + prepFut + ']'; |
| |
|         // Fail the pending prepare so that whoever waits on it sees the rollback. |
|         ((GridFutureAdapter<?>)prepFut).onDone( |
|             new IgniteTxRollbackCheckedException( |
|                 "Failed to prepare the transaction, due to the transaction is marked as rolled back " + |
|                     "[tx=" + CU.txString(tx) + ']')); |
|     } |
| |
|     return rollbackFut; |
| } |
| |
| /** |
| * Rollback transactions blocking partition map exchange. |
| * |
| * @param topVer Initial exchange version. |
| */ |
| public void rollbackOnTopologyChange(AffinityTopologyVersion topVer) { |
| for (IgniteInternalTx tx : activeTransactions()) { |
| if (tx.local() && tx.near() && needWaitTransaction(tx, topVer)) { |
| U.warn(log, "The transaction was forcibly rolled back on partition map exchange because a timeout is " + |
| "reached: [tx=" + CU.txString(tx) + ", topVer=" + topVer + ']'); |
| |
| ((GridNearTxLocal)tx).rollbackNearTxLocalAsync(false, false); |
| } |
| } |
| } |
| |
| /** |
|  * Rolls back each transaction of the given map that has at least one entry of the given cache. |
|  * |
|  * @param cacheId Cache ID. |
|  * @param txMap Transactions map. |
|  */ |
| private void rollbackTransactionsForCache(int cacheId, ConcurrentMap<?, IgniteInternalTx> txMap) { |
|     for (IgniteInternalTx tx : txMap.values()) { |
|         boolean touchesCache = false; |
| |
|         for (IgniteTxEntry txEntry : tx.allEntries()) { |
|             if (txEntry.cacheId() == cacheId) { |
|                 touchesCache = true; |
| |
|                 break; |
|             } |
|         } |
| |
|         // A transaction is rolled back at most once, however many of its entries match. |
|         if (touchesCache) |
|             rollbackTx(tx, false, false); |
|     } |
| } |
| |
| /** |
|  * {@inheritDoc} |
|  * <p> |
|  * Rolls back all transactions of the ID map and then of the near ID map, and completes every |
|  * pending deadlock detection future with an {@link IgniteClientDisconnectedException}. |
|  */ |
| @Override public void onDisconnected(IgniteFuture reconnectFut) { |
|     rollbackOnDisconnect(idMap.values()); |
| |
|     rollbackOnDisconnect(nearIdMap.values()); |
| |
|     IgniteClientDisconnectedException disconnectErr = |
|         new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."); |
| |
|     for (TxDeadlockFuture deadlockFut : deadlockDetectFuts.values()) |
|         deadlockFut.onDone(disconnectErr); |
| } |
| |
| /** |
|  * Rolls back the given transactions and moves each of them to {@code ROLLING_BACK} and then |
|  * {@code ROLLED_BACK}. |
|  * |
|  * @param txs Transactions to roll back. |
|  */ |
| private void rollbackOnDisconnect(Collection<IgniteInternalTx> txs) { |
|     for (IgniteInternalTx tx : txs) { |
|         rollbackTx(tx, true, false); |
| |
|         tx.state(ROLLING_BACK); |
|         tx.state(ROLLED_BACK); |
|     } |
| } |
| |
| /** |
| * @return TX handler (created in {@code start0()}). |
| */ |
| public IgniteTxHandler txHandler() { |
| return txHnd; |
| } |
| |
| /** |
| * Shows if dump requests from local node to near node are allowed, when long running transaction |
| * is found. If allowed, the compute request to near node will be made to get thread dump of transaction |
| * owner thread. |
| * |
| * @return <code>true</code> if allowed, <code>false</code> otherwise. |
| */ |
| public boolean txOwnerDumpRequestsAllowed() { |
| return txOwnerDumpRequestsAllowed; |
| } |
| |
| /** |
| * Sets if dump requests from local node to near node are allowed, when long running transaction |
| * is found. If allowed, the compute request to near node will be made to get thread dump of transaction |
| * owner thread. |
| * |
| * @param allowed <code>true</code> to allow dump requests, <code>false</code> to forbid them. |
| */ |
| public void setTxOwnerDumpRequestsAllowed(boolean allowed) { |
| txOwnerDumpRequestsAllowed = allowed; |
| } |
| |
| /** |
| * Threshold timeout for long transactions. If a transaction exceeds it, it will be dumped in log with |
| * information about how much time it spent in system time (time while acquiring locks, preparing, |
| * committing, etc) and user time (time when client node runs some code while holding transaction and not |
| * waiting for it). Equals 0 if not set. No transactions are dumped in log if this parameter is not set. |
| * |
| * @return Threshold timeout in milliseconds. |
| */ |
| public long longTransactionTimeDumpThreshold() { |
| return longTransactionTimeDumpThreshold; |
| } |
| |
| /** |
| * Sets threshold timeout for long transactions. If a transaction exceeds it, it will be dumped in log with |
| * information about how much time it spent in system time (time while acquiring locks, preparing, |
| * committing, etc) and user time (time when client node runs some code while holding transaction and not |
| * waiting for it). Can be set to 0 - no transactions will be dumped in log in this case. |
| * |
| * @param longTransactionTimeDumpThreshold Value of threshold timeout in milliseconds; asserted to be non-negative. |
| */ |
| public void longTransactionTimeDumpThreshold(long longTransactionTimeDumpThreshold) { |
| assert longTransactionTimeDumpThreshold >= 0 |
| : "longTransactionTimeDumpThreshold must be greater than or equal to 0."; |
| |
| this.longTransactionTimeDumpThreshold = longTransactionTimeDumpThreshold; |
| } |
| |
| /** |
| * @return The coefficient for samples of completed transactions that will be dumped in log. |
| */ |
| public double transactionTimeDumpSamplesCoefficient() { |
| return transactionTimeDumpSamplesCoefficient; |
| } |
| |
| /** |
| * @param transactionTimeDumpSamplesCoefficient New coefficient for samples of completed transactions that will be dumped in log; asserted to be between 0.0 and 1.0 inclusively. |
| */ |
| public void transactionTimeDumpSamplesCoefficient(double transactionTimeDumpSamplesCoefficient) { |
| assert transactionTimeDumpSamplesCoefficient >= 0.0 && transactionTimeDumpSamplesCoefficient <= 1.0 |
| : "transactionTimeDumpSamplesCoefficient value must be between 0.0 and 1.0 inclusively."; |
| |
| this.transactionTimeDumpSamplesCoefficient = transactionTimeDumpSamplesCoefficient; |
| } |
| |
| /** |
| * The limit of samples of completed transactions that will be dumped in log per second, if |
| * {@link #transactionTimeDumpSamplesCoefficient} is above <code>0.0</code>. |
| * @return Current limit of samples per second. |
| */ |
| public int transactionTimeDumpSamplesPerSecondLimit() { |
| return longTransactionTimeDumpSamplesPerSecondLimit; |
| } |
| |
| /** |
| * Sets the limit of samples of completed transactions that will be dumped in log per second, if |
| * {@link #transactionTimeDumpSamplesCoefficient} is above <code>0.0</code>. |
| * @param transactionTimeDumpSamplesPerSecondLimit New limit; asserted to be an integer value greater than <code>0</code>. |
| */ |
| public void transactionTimeDumpSamplesPerSecondLimit(int transactionTimeDumpSamplesPerSecondLimit) { |
| assert transactionTimeDumpSamplesPerSecondLimit > 0 |
| : "transactionTimeDumpSamplesPerSecondLimit must be integer value greater than 0."; |
| |
| this.longTransactionTimeDumpSamplesPerSecondLimit = transactionTimeDumpSamplesPerSecondLimit; |
| } |
| |
| /** |
| * Invalidates transaction using the {@code USER_FINISH} finalization status. |
| * |
| * @param tx Transaction. |
| */ |
| public void salvageTx(IgniteInternalTx tx) { |
| salvageTx(tx, USER_FINISH); |
| } |
| |
| /** |
|  * Invalidates transaction. Nothing happens unless the transaction is in {@code ACTIVE}, |
|  * {@code PREPARING}, {@code PREPARED} or {@code MARKED_ROLLBACK} state and can be marked as |
|  * finalizing with the given status. |
|  * |
|  * @param tx Transaction. |
|  * @param status Finalization status. |
|  */ |
| private void salvageTx(IgniteInternalTx tx, IgniteInternalTx.FinalizationStatus status) { |
|     assert tx != null; |
| |
|     TransactionState txState = tx.state(); |
| |
|     boolean salvageable = |
|         txState == ACTIVE || txState == PREPARING || txState == PREPARED || txState == MARKED_ROLLBACK; |
| |
|     if (!salvageable) |
|         return; |
| |
|     if (tx.markFinalizing(status)) { |
|         tx.salvageTx(); |
| |
|         if (log.isInfoEnabled()) |
|             log.info("Invalidated transaction because originating node left grid: " + CU.txString(tx)); |
|     } |
|     else if (log.isInfoEnabled()) |
|         log.info("Will not try to commit invalidate transaction (could not mark finalized): " + tx); |
| } |
| |
| /** |
| * Prints out memory stats (sizes of the thread map, both ID maps and both completed version maps) to standard out. |
| * <p> |
| * USE ONLY FOR MEMORY PROFILING DURING TESTS. |
| */ |
| @Override public void printMemoryStats() { |
| X.println(">>> "); |
| X.println(">>> Transaction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']'); |
| X.println(">>> threadMapSize: " + threadMap.size()); |
| X.println(">>> idMap [size=" + idMap.size() + ']'); |
| X.println(">>> nearIdMap [size=" + nearIdMap.size() + ']'); |
| X.println(">>> completedVersSortedSize: " + completedVersSorted.size()); |
| X.println(">>> completedVersHashMapSize: " + completedVersHashMap.sizex()); |
| } |
| |
| /** |
| * @return Size of the per-thread transaction map. |
| */ |
| public int threadMapSize() { |
| return threadMap.size(); |
| } |
| |
| /** |
| * @return Size of the per-ID transaction map (the near ID map is not counted). |
| */ |
| public int idMapSize() { |
| return idMap.size(); |
| } |
| |
| /** |
| * @return Size of the completed versions hash map. |
| */ |
| public int completedVersionsSize() { |
| return completedVersHashMap.size(); |
| } |
| |
| /** |
|  * Tells whether the transaction is already recorded in the completed versions map. |
|  * |
|  * @param tx Transaction to check. |
|  * @return {@code True} if transaction has been committed or rolled back, |
|  *      {@code false} otherwise. |
|  */ |
| private boolean isCompleted(IgniteInternalTx tx) { |
|     if (completedVersHashMap.containsKey(tx.xidVersion())) |
|         return true; |
| |
|     // A rollback message could have been received before the lock (on timeout or async rollback), |
|     // so a local DHT transaction is also looked up by its near version. |
|     return tx.local() && tx.dht() && completedVersHashMap.containsKey(tx.nearXidVersion()); |
| } |
| |
| /** |
|  * Creates a new near local transaction and passes it to {@code onCreated}. |
|  * |
|  * @param implicit {@code True} if transaction is implicit. |
|  * @param implicitSingle Implicit-with-single-key flag. |
|  * @param sysCacheCtx System cache context, or {@code null} for a non-system transaction. |
|  * @param concurrency Concurrency. |
|  * @param isolation Isolation. |
|  * @param timeout Transaction timeout. |
|  * @param storeEnabled Store enabled flag, passed through to the created transaction. |
|  * @param mvccOp Whether this transaction is being started via SQL API or not, or {@code null} if unknown. |
|  * @param txSize Expected transaction size. |
|  * @param lb Label. |
|  * @return New transaction, as returned by {@code onCreated}. |
|  */ |
| public GridNearTxLocal newTx( |
|     boolean implicit, |
|     boolean implicitSingle, |
|     @Nullable GridCacheContext sysCacheCtx, |
|     TransactionConcurrency concurrency, |
|     TransactionIsolation isolation, |
|     long timeout, |
|     boolean storeEnabled, |
|     Boolean mvccOp, |
|     int txSize, |
|     @Nullable String lb |
| ) { |
|     assert sysCacheCtx == null || sysCacheCtx.systemTx(); |
| |
|     // TODO GG-9141 how to get subj ID? |
|     UUID subjId = null; |
| |
|     int taskNameHash = cctx.kernalContext().job().currentTaskNameHash(); |
| |
|     boolean sysTx = sysCacheCtx != null; |
| |
|     GridNearTxLocal nearTx = new GridNearTxLocal( |
|         cctx, |
|         implicit, |
|         implicitSingle, |
|         sysTx, |
|         sysTx ? sysCacheCtx.ioPolicy() : SYSTEM_POOL, |
|         concurrency, |
|         isolation, |
|         timeout, |
|         storeEnabled, |
|         mvccOp, |
|         txSize, |
|         subjId, |
|         taskNameHash, |
|         lb, |
|         txDumpsThrottling |
|     ); |
| |
|     if (nearTx.system()) { |
|         long threadId = Thread.currentThread().getId(); |
| |
|         AffinityTopologyVersion lockedTopVer = cctx.tm().lockedTopologyVersion(threadId, nearTx); |
| |
|         // If there is another system transaction in progress, use its topology version to prevent deadlock. |
|         if (lockedTopVer != null) |
|             nearTx.topologyVersion(lockedTopVer); |
|     } |
| |
|     return onCreated(sysCacheCtx, nearTx); |
| } |
| |
| /** |
|  * Registers a newly created transaction in the ID map and, where applicable, in the per-thread maps |
|  * and the version mapping. |
|  * |
|  * @param cacheCtx Cache context. |
|  * @param tx Created transaction. |
|  * @return Started transaction, or {@code null} if the transaction is already completed or a |
|  *      transaction with the same version is already registered. |
|  */ |
| @Nullable public <T extends IgniteInternalTx> T onCreated(@Nullable GridCacheContext cacheCtx, T tx) { |
|     ConcurrentMap<GridCacheVersion, IgniteInternalTx> txs = transactionMap(tx); |
| |
|     // Start clean. |
|     resetContext(); |
| |
|     if (isCompleted(tx)) { |
|         if (log.isDebugEnabled()) |
|             log.debug("Attempt to create a completed transaction (will ignore): " + tx); |
| |
|         return null; |
|     } |
| |
|     IgniteInternalTx existing = txs.putIfAbsent(tx.xidVersion(), tx); |
| |
|     if (existing != null) { |
|         if (log.isDebugEnabled()) |
|             log.debug("Attempt to create an existing transaction (will ignore) [newTx=" + tx + ", existingTx=" + |
|                 existing + ']'); |
| |
|         return null; |
|     } |
| |
|     if (tx.local() && !tx.dht()) { |
|         assert tx instanceof GridNearTxLocal : tx; |
| |
|         // Only explicit transactions are bound to their thread. |
|         if (!tx.implicit()) { |
|             boolean sysTx = cacheCtx != null && cacheCtx.systemTx(); |
| |
|             if (sysTx) |
|                 sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx); |
|             else |
|                 threadMap.put(tx.threadId(), tx); |
|         } |
| |
|         ((GridNearTxLocal)tx).recordStateChangedEvent(EVT_TX_STARTED); |
|     } |
| |
|     // Handle mapped versions. |
|     if (tx instanceof GridCacheMappedVersion) { |
|         GridCacheVersion srcVer = ((GridCacheMappedVersion)tx).mappedVersion(); |
| |
|         if (srcVer != null) |
|             mappedVers.put(srcVer, tx.xidVersion()); |
| |
|         if (log.isDebugEnabled()) |
|             log.debug("Added transaction version mapping [from=" + srcVer + ", to=" + tx.xidVersion() + |
|                 ", tx=" + tx + ']'); |
|     } |
| |
|     if (log.isDebugEnabled()) |
|         log.debug("Transaction created: " + tx); |
| |
|     return tx; |
| } |
| |
| /** |
| * Creates a future that will wait for all ongoing transactions that maybe affected by topology update |
| * to be finished. This set of transactions include |
| * <ul> |
| * <li/> All {@link TransactionConcurrency#PESSIMISTIC} transactions with topology version |
| * less or equal to {@code topVer}. |
| * <li/> {@link TransactionConcurrency#OPTIMISTIC} transactions in PREPARING state with topology |
| * version less or equal to {@code topVer} and having transaction key with entry that belongs to |
| * one of partitions in {@code parts}. |
| * </ul> |
| * |
| * @param topVer Topology version. |
| * @param node Cluster node. |
| * @return Future that will be completed when all ongoing transactions are finished. |
| */ |
| public IgniteInternalFuture<Boolean> finishLocalTxs(AffinityTopologyVersion topVer, ClusterNode node) { |
| GridCompoundFuture<IgniteInternalTx, Boolean> res = |
| new CacheObjectsReleaseFuture<>( |
| "LocalTx", |
| topVer, |
| new IgniteReducer<IgniteInternalTx, Boolean>() { |
| @Override public boolean collect(IgniteInternalTx e) { |
| return true; |
| } |
| |
| @Override public Boolean reduce() { |
| return true; |
| } |
| }); |
| |
| for (IgniteInternalTx tx : activeTransactions()) { |
| if (node != null) { |
| if (tx.originatingNodeId().equals(node.id())) { |
| assert needWaitTransaction(tx, topVer); |
| |
| res.add(tx.finishFuture()); |
| } |
| } |
| else if (needWaitTransaction(tx, topVer)) |
| res.add(tx.finishFuture()); |
| } |
| |
| res.markInitialized(); |
| |
| return res; |
| } |
| |
| /** |
| * Creates a future that will wait for finishing all tx updates on backups after all local transactions are finished. |
| * |
| * NOTE: |
| * As we send finish request to backup nodes after transaction successfully completed on primary node |
| * it's important to ensure that all updates from primary to backup are finished or at least remote transaction has created on backup node. |
| * |
| * @param finishLocalTxsFuture Local transactions finish future. |
| * @param topVer Topology version. |
| * @return Future that will be completed when all ongoing transactions are finished. |
| */ |
| public IgniteInternalFuture<?> finishAllTxs(IgniteInternalFuture<?> finishLocalTxsFuture, AffinityTopologyVersion topVer) { |
| final GridCompoundFuture finishAllTxsFuture = new CacheObjectsReleaseFuture("AllTx", topVer); |
| |
| // After finishing all local updates, wait for finishing all tx updates on backups. |
| finishLocalTxsFuture.listen(future -> { |
| finishAllTxsFuture.add(cctx.mvcc().finishRemoteTxs(topVer)); |
| finishAllTxsFuture.markInitialized(); |
| }); |
| |
| return finishAllTxsFuture; |
| } |
| |
| /** |
| * @param tx Transaction. |
| * @param topVer Exchange version. |
| * @return {@code True} if need wait transaction for exchange. |
| */ |
| public boolean needWaitTransaction(IgniteInternalTx tx, AffinityTopologyVersion topVer) { |
| AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot(); |
| |
| return txTopVer != null && txTopVer.compareTo(topVer) < 0; |
| } |
| |
| /** |
| * Transaction start callback (has to do with when any operation was |
| * performed on this transaction). |
| * |
| * @param tx Started transaction. |
| * @return {@code True} if transaction is not in completed set. |
| */ |
| public boolean onStarted(IgniteInternalTx tx) { |
| assert tx.state() == ACTIVE || tx.isRollbackOnly() : "Invalid transaction state [locId=" + cctx.localNodeId() + |
| ", tx=" + tx + ']'; |
| |
| if (isCompleted(tx)) { |
| ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx); |
| |
| txIdMap.remove(tx.xidVersion(), tx); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Attempt to start a completed transaction (will ignore): " + tx); |
| |
| return false; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Transaction started: " + tx); |
| |
| return true; |
| } |
| |
| /** |
| * @param from Near version. |
| * @return DHT version for a near version. |
| */ |
| public GridCacheVersion mappedVersion(GridCacheVersion from) { |
| GridCacheVersion to = mappedVers.get(from); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Found mapped version [from=" + from + ", to=" + to); |
| |
| return to; |
| } |
| |
| /** |
| * |
| * @param ver Alternate version. |
| * @param tx Transaction. |
| */ |
| public void addAlternateVersion(GridCacheVersion ver, IgniteInternalTx tx) { |
| if (idMap.putIfAbsent(ver, tx) == null) |
| if (log.isDebugEnabled()) |
| log.debug("Registered alternate transaction version [ver=" + ver + ", tx=" + tx + ']'); |
| } |
| |
| /** |
| * @return Local transaction. |
| */ |
| @Nullable public IgniteTxLocalAdapter localTx() { |
| IgniteTxLocalAdapter tx = tx(); |
| |
| return tx != null && tx.local() ? tx : null; |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @return Transaction for current thread. |
| */ |
| public GridNearTxLocal threadLocalTx(GridCacheContext cctx) { |
| IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId()); |
| |
| if (tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit()) { |
| assert tx instanceof GridNearTxLocal : tx; |
| |
| return (GridNearTxLocal)tx; |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Sets transaction for current thread. |
| * @return Previously associated transaction. |
| */ |
| public IgniteInternalTx tx(IgniteInternalTx tx) { |
| long key = Thread.currentThread().getId(); |
| |
| return tx == null ? threadMap.remove(key) : threadMap.put(key, tx); |
| } |
| |
| /** |
| * @return Transaction for current thread. |
| */ |
| public <T> T tx() { |
| IgniteInternalTx tx = txContext(); |
| |
| return tx != null ? (T)tx : (T)tx(null, Thread.currentThread().getId()); |
| } |
| |
| /** |
| * @param threadId Thread ID. |
| * @param ignore Transaction to ignore. |
| * @return Not null topology version if current thread holds lock preventing topology change. |
| */ |
| @Nullable public AffinityTopologyVersion lockedTopologyVersion(long threadId, IgniteInternalTx ignore) { |
| IgniteInternalTx tx = threadMap.get(threadId); |
| |
| if (tx != null) { |
| AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); |
| |
| if (topVer != null) |
| return topVer; |
| } |
| |
| if (!sysThreadMap.isEmpty()) { |
| for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) { |
| if (!cacheCtx.systemTx()) |
| continue; |
| |
| tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId())); |
| |
| if (tx != null && tx != ignore) { |
| AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); |
| |
| if (topVer != null) |
| return topVer; |
| } |
| } |
| } |
| |
| return txTop.get(); |
| } |
| |
| /** |
| * @param topVer Locked topology version. |
| * @return {@code True} if topology hint was set. |
| */ |
| public boolean setTxTopologyHint(@Nullable AffinityTopologyVersion topVer) { |
| if (topVer == null) |
| txTop.set(null); |
| else { |
| if (txTop.get() == null) { |
| txTop.set(topVer); |
| |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @return User transaction for current thread. |
| */ |
| @Nullable public GridNearTxLocal userTx() { |
| IgniteInternalTx tx = txContext(); |
| |
| if (activeUserTx(tx)) |
| return (GridNearTxLocal)tx; |
| |
| tx = tx(null, Thread.currentThread().getId()); |
| |
| if (activeUserTx(tx)) |
| return (GridNearTxLocal)tx; |
| |
| return null; |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @return User transaction for current thread. |
| */ |
| @Nullable GridNearTxLocal userTx(GridCacheContext cctx) { |
| IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId()); |
| |
| if (activeUserTx(tx)) |
| return (GridNearTxLocal)tx; |
| |
| return null; |
| } |
| |
| /** |
| * @param tx Transaction. |
| * @return {@code True} if given transaction is explicitly started user transaction. |
| */ |
| private boolean activeUserTx(@Nullable IgniteInternalTx tx) { |
| if (tx != null && tx.user() && tx.state() == ACTIVE) { |
| assert tx instanceof GridNearTxLocal : tx; |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param threadId Id of thread for transaction. |
| * @return Transaction for thread with given ID. |
| */ |
| private <T> T tx(GridCacheContext cctx, long threadId) { |
| if (cctx == null || !cctx.systemTx()) |
| return (T)threadMap.get(threadId); |
| |
| TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId()); |
| |
| return (T)sysThreadMap.get(key); |
| } |
| |
| /** |
| * @return {@code True} if current thread is currently within transaction. |
| */ |
| public boolean inUserTx() { |
| return userTx() != null; |
| } |
| |
| /** |
| * @param txId Transaction ID. |
| * @return Transaction with given ID. |
| */ |
| @Nullable public <T extends IgniteInternalTx> T tx(GridCacheVersion txId) { |
| return (T)idMap.get(txId); |
| } |
| |
| /** |
| * @param txId Transaction ID. |
| * @return Transaction with given ID. |
| */ |
| @Nullable public <T extends IgniteInternalTx> T nearTx(GridCacheVersion txId) { |
| return (T)nearIdMap.get(txId); |
| } |
| |
| /** |
| * Handles prepare stage. |
| * |
| * @param tx Transaction to prepare. |
| * @param entries Entries to lock or {@code null} if use default {@link IgniteInternalTx#optimisticLockEntries()}. |
| * @throws IgniteCheckedException If preparation failed. |
| */ |
| public void prepareTx(IgniteInternalTx tx, @Nullable Collection<IgniteTxEntry> entries) throws IgniteCheckedException { |
| if (tx.state() == MARKED_ROLLBACK) { |
| if (tx.remainingTime() == -1) |
| throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); |
| |
| throw new IgniteCheckedException("Transaction is marked for rollback: " + tx); |
| } |
| |
| // One-phase commit tx cannot timeout on prepare because it is expected to be committed. |
| if (tx.remainingTime() == -1 && !tx.onePhaseCommit()) { |
| tx.setRollbackOnly(); |
| |
| throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); |
| } |
| |
| if (tx.pessimistic() && tx.local()) |
| return; // Nothing else to do in pessimistic mode. |
| |
| // Optimistic or remote tx. |
| assert tx.optimistic() || !tx.local(); |
| |
| if (!lockMultiple(tx, entries != null ? entries : tx.optimisticLockEntries())) { |
| tx.setRollbackOnly(); |
| |
| throw new IgniteTxOptimisticCheckedException("Failed to prepare transaction (lock conflict): " + tx); |
| } |
| } |
| |
| /** |
| * @param tx Transaction. |
| */ |
| private void removeObsolete(IgniteInternalTx tx) { |
| Collection<IgniteTxEntry> entries = tx.local() ? tx.allEntries() : tx.writeEntries(); |
| |
| for (IgniteTxEntry entry : entries) { |
| cctx.database().checkpointReadLock(); |
| |
| try { |
| GridCacheEntryEx cached = entry.cached(); |
| |
| GridCacheContext cacheCtx = entry.context(); |
| |
| if (cached == null) |
| cached = cacheCtx.cache().peekEx(entry.key()); |
| |
| if (cached.detached()) |
| continue; |
| |
| try { |
| if (cached.obsolete() || cached.markObsoleteIfEmpty(tx.xidVersion())) |
| cacheCtx.cache().removeEntry(cached); |
| |
| if (!tx.near() && isNearEnabled(cacheCtx)) { |
| GridNearCacheAdapter near = cacheCtx.isNear() ? cacheCtx.near() : cacheCtx.dht().near(); |
| |
| GridNearCacheEntry e = near.peekExx(entry.key()); |
| |
| if (e != null && e.markObsoleteIfEmpty(null)) |
| near.removeEntry(e); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to remove obsolete entry from cache: " + cached, e); |
| } |
| } |
| finally { |
| cctx.database().checkpointReadUnlock(); |
| } |
| } |
| } |
| |
| /** |
| * @param min Minimum version. |
| * @return Pair [committed, rolledback] - never {@code null}, elements potentially empty, |
| * but also never {@code null}. |
| */ |
| public IgnitePair<Collection<GridCacheVersion>> versions(GridCacheVersion min) { |
| Collection<GridCacheVersion> committed = null; |
| Collection<GridCacheVersion> rolledback = null; |
| |
| for (Map.Entry<GridCacheVersion, Boolean> e : completedVersSorted.tailMap(min, true).entrySet()) { |
| if (e.getValue()) { |
| if (committed == null) |
| committed = new ArrayList<>(); |
| |
| committed.add(e.getKey()); |
| } |
| else { |
| if (rolledback == null) |
| rolledback = new ArrayList<>(); |
| |
| rolledback.add(e.getKey()); |
| } |
| } |
| |
| return new IgnitePair<>( |
| committed == null ? Collections.<GridCacheVersion>emptyList() : committed, |
| rolledback == null ? Collections.<GridCacheVersion>emptyList() : rolledback); |
| } |
| |
| /** |
| * Peeks completed versions history map to find out whether transaction was committed or rolled back |
| * in the recent past. |
| * |
| * @param xid Transaction XID version. |
| * @return <code>true</code> if transaction was committed, <code>false</code> if transaction was rolled back, |
| * <code>null</code> if information is missed in history. |
| */ |
| public Boolean peekCompletedVersionsHistory(GridCacheVersion xid) { |
| Object o = completedVersHashMap.get(xid); |
| |
| return (o instanceof Boolean) ? (Boolean)o : null; |
| } |
| |
| /** |
| * @return Collection of active transactions. |
| */ |
| public Collection<IgniteInternalTx> activeTransactions() { |
| return F.concat(false, idMap.values(), nearIdMap.values()); |
| } |
| |
| /** |
| * @param tx Tx to remove. |
| */ |
| public void removeCommittedTx(IgniteInternalTx tx) { |
| completedVersHashMap.remove(tx.xidVersion(), true); |
| |
| if (tx.needsCompletedVersions()) |
| completedVersSorted.remove(tx.xidVersion(), true); |
| } |
| |
| /** |
| * @param tx Committed transaction. |
| */ |
| public void addCommittedTx(IgniteInternalTx tx) { |
| addCommittedTx(tx, tx.xidVersion(), tx.nearXidVersion()); |
| } |
| |
| /** |
| * @param tx Committed transaction. |
| */ |
| public void addCommittedTxReturn(IgniteInternalTx tx, GridCacheReturnCompletableWrapper ret) { |
| addCommittedTxReturn(tx.nearXidVersion(), null, ret); |
| } |
| |
| /** |
| * @param tx Committed transaction. |
| * @return If transaction was not already present in completed set. |
| */ |
| public boolean addRolledbackTx(IgniteInternalTx tx) { |
| return addRolledbackTx(tx, tx.xidVersion()); |
| } |
| |
| /** |
| * @param tx Tx. |
| * @param xidVer Completed transaction version. |
| * @param nearXidVer Optional near transaction ID. |
| * @return If transaction was not already present in completed set. |
| */ |
| public boolean addCommittedTx( |
| IgniteInternalTx tx, |
| GridCacheVersion xidVer, |
| @Nullable GridCacheVersion nearXidVer |
| ) { |
| if (nearXidVer != null) |
| xidVer = new CommittedVersion(xidVer, nearXidVer); |
| |
| Object committed0 = completedVersHashMap.putIfAbsent(xidVer, true); |
| |
| if (committed0 == null && (tx == null || tx.needsCompletedVersions())) { |
| Boolean b = completedVersSorted.putIfAbsent(xidVer, true); |
| |
| assert b == null; |
| } |
| |
| Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE); |
| |
| return committed0 == null || committed; |
| } |
| |
| /** |
| * @param xidVer Completed transaction version. |
| * @param nearXidVer Optional near transaction ID. |
| * @param retVal Invoke result. |
| */ |
| private void addCommittedTxReturn( |
| GridCacheVersion xidVer, |
| @Nullable GridCacheVersion nearXidVer, |
| GridCacheReturnCompletableWrapper retVal |
| ) { |
| assert retVal != null; |
| |
| if (nearXidVer != null) |
| xidVer = new CommittedVersion(xidVer, nearXidVer); |
| |
| Object prev = completedVersHashMap.putIfAbsent(xidVer, retVal); |
| |
| assert prev == null || Boolean.FALSE.equals(prev) : prev; // Can be rolled back. |
| } |
| |
| /** |
| * @param tx Tx. |
| * @param xidVer Completed transaction version. |
| * @return If transaction was not already present in completed set. |
| */ |
| public boolean addRolledbackTx( |
| IgniteInternalTx tx, |
| GridCacheVersion xidVer |
| ) { |
| Object committed0 = completedVersHashMap.putIfAbsent(xidVer, false); |
| |
| if (committed0 == null && (tx == null || tx.needsCompletedVersions())) { |
| Boolean b = completedVersSorted.putIfAbsent(xidVer, false); |
| |
| assert b == null; |
| } |
| |
| Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE); |
| |
| return committed0 == null || !committed; |
| } |
| |
| /** |
| * @param xidVer xidVer Completed transaction version. |
| * @return Tx result. |
| */ |
| public GridCacheReturnCompletableWrapper getCommittedTxReturn(GridCacheVersion xidVer) { |
| Object retVal = completedVersHashMap.get(xidVer); |
| |
| // Will gain true in regular case or GridCacheReturn in onePhaseCommit case. |
| if (!Boolean.TRUE.equals(retVal)) { |
| assert !Boolean.FALSE.equals(retVal); // Method should be used only after 'committed' checked. |
| |
| GridCacheReturnCompletableWrapper res = (GridCacheReturnCompletableWrapper)retVal; |
| |
| removeTxReturn(xidVer); |
| |
| return res; |
| } |
| else |
| return null; |
| } |
| |
| /** |
| * @param xidVer xidVer Completed transaction version. |
| */ |
| public void removeTxReturn(GridCacheVersion xidVer) { |
| Object prev = completedVersHashMap.get(xidVer); |
| |
| if (prev instanceof GridCacheReturnCompletableWrapper) |
| completedVersHashMap.replace(xidVer, prev, true); |
| } |
| |
| /** |
| * @param tx Transaction. |
| */ |
| private void processCompletedEntries(IgniteInternalTx tx) { |
| if (tx.needsCompletedVersions()) { |
| GridCacheVersion min = minVersion(tx.readEntries(), tx.xidVersion(), tx); |
| |
| min = minVersion(tx.writeEntries(), min, tx); |
| |
| assert min != null; |
| |
| IgnitePair<Collection<GridCacheVersion>> versPair = versions(min); |
| |
| tx.completedVersions(min, versPair.get1(), versPair.get2()); |
| } |
| } |
| |
| /** |
| * Collects versions for all pending locks for all entries within transaction |
| * |
| * @param dhtTxLoc Transaction being committed. |
| */ |
| private void collectPendingVersions(GridDhtTxLocal dhtTxLoc) { |
| if (dhtTxLoc.needsCompletedVersions()) { |
| if (log.isDebugEnabled()) |
| log.debug("Checking for pending locks with version less then tx version: " + dhtTxLoc); |
| |
| Set<GridCacheVersion> vers = new LinkedHashSet<>(); |
| |
| collectPendingVersions(dhtTxLoc.readEntries(), dhtTxLoc.xidVersion(), vers); |
| collectPendingVersions(dhtTxLoc.writeEntries(), dhtTxLoc.xidVersion(), vers); |
| |
| if (!vers.isEmpty()) |
| dhtTxLoc.pendingVersions(vers); |
| } |
| } |
| |
| /** |
| * Gets versions of all not acquired locks for collection of tx entries that are less then base version. |
| * |
| * @param entries Tx entries to process. |
| * @param baseVer Base version to compare with. |
| * @param vers Collection of versions that will be populated. |
| */ |
| @SuppressWarnings("TypeMayBeWeakened") |
| private void collectPendingVersions(Iterable<IgniteTxEntry> entries, |
| GridCacheVersion baseVer, Set<GridCacheVersion> vers) { |
| |
| // The locks are not released yet, so we can safely list pending candidates versions. |
| for (IgniteTxEntry txEntry : entries) { |
| GridCacheEntryEx cached = txEntry.cached(); |
| |
| try { |
| // If check should be faster then exception handling. |
| if (!cached.obsolete()) { |
| for (GridCacheMvccCandidate cand : cached.localCandidates()) { |
| if (!cand.owner() && cand.version().compareTo(baseVer) < 0) { |
| if (log.isDebugEnabled()) |
| log.debug("Adding candidate version to pending set: " + cand); |
| |
| vers.add(cand.version()); |
| } |
| } |
| } |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("There are no pending locks for entry (entry was deleted in transaction): " + txEntry); |
| } |
| } |
| } |
| |
| /** |
| * Go through all candidates for entries involved in transaction and find their min |
| * version. We know that these candidates will commit after this transaction, and |
| * therefore we can grab the min version so we can send all committed and rolled |
| * back versions from min to current to remote nodes for re-ordering. |
| * |
| * @param entries Entries. |
| * @param min Min version so far. |
| * @param tx Transaction. |
| * @return Minimal available version. |
| */ |
| private GridCacheVersion minVersion(Iterable<IgniteTxEntry> entries, GridCacheVersion min, |
| IgniteInternalTx tx) { |
| for (IgniteTxEntry txEntry : entries) { |
| GridCacheEntryEx cached = txEntry.cached(); |
| |
| // We are assuming that this method is only called on commit. In that |
| // case, if lock is held, entry can never be removed. |
| assert txEntry.isRead() || !cached.obsolete(tx.xidVersion()) : |
| "Invalid obsolete version for transaction [entry=" + cached + ", tx=" + tx + ']'; |
| |
| for (GridCacheMvccCandidate cand : cached.remoteMvccSnapshot()) |
| if (min == null || cand.version().isLess(min)) |
| min = cand.version(); |
| } |
| |
| return min; |
| } |
| |
| /** |
| * @param tx Transaction. |
| * @return {@code True} if transaction read entries should be unlocked. |
| */ |
| private boolean unlockReadEntries(IgniteInternalTx tx) { |
| if (tx.pessimistic()) |
| return !tx.readCommitted(); |
| else |
| return tx.serializable(); |
| } |
| |
| /** |
| * Commits a transaction. |
| * |
| * @param tx Transaction to commit. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void commitTx(IgniteInternalTx tx) throws IgniteCheckedException { |
| assert tx != null; |
| assert tx.state() == COMMITTING : "Invalid transaction state for commit from tm [state=" + tx.state() + |
| ", expected=COMMITTING, tx=" + tx + ']'; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Committing from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']'); |
| |
| /* |
| * Note that write phase is handled by transaction adapter itself, |
| * so we don't do it here. |
| */ |
| |
| Object committed0 = completedVersHashMap.get(tx.xidVersion()); |
| |
| Boolean committed = committed0 != null && !committed0.equals(Boolean.FALSE); |
| |
| // 1. Make sure that committed version has been recorded. |
| if (!(committed || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { |
| uncommitTx(tx); |
| |
| tx.errorWhenCommitting(); |
| |
| throw new IgniteCheckedException("Missing commit version (consider increasing " + |
| IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + |
| ", committed0=" + committed0 + |
| ", tx=" + tx.getClass().getSimpleName() + ']'); |
| } |
| |
| ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx); |
| |
| if (txIdMap.remove(tx.xidVersion(), tx)) { |
| // 2. Must process completed entries before unlocking! |
| processCompletedEntries(tx); |
| |
| if (tx instanceof GridDhtTxLocal) { |
| GridDhtTxLocal dhtTxLoc = (GridDhtTxLocal)tx; |
| |
| collectPendingVersions(dhtTxLoc); |
| } |
| |
| // 3. Unlock write resources. |
| unlockMultiple(tx, tx.writeEntries()); |
| |
| // 4. Unlock read resources if required. |
| if (unlockReadEntries(tx)) |
| unlockMultiple(tx, tx.readEntries()); |
| |
| // 5. Notify evictions. |
| notifyEvictions(tx); |
| |
| // 6. Remove obsolete entries from cache. |
| removeObsolete(tx); |
| |
| // 7. Assign transaction number at the end of transaction. |
| tx.endVersion(cctx.versions().next(tx.topologyVersion())); |
| |
| // 8. Remove from per-thread storage. |
| clearThreadMap(tx); |
| |
| // 9. Unregister explicit locks. |
| if (!tx.alternateVersions().isEmpty()) { |
| for (GridCacheVersion ver : tx.alternateVersions()) |
| idMap.remove(ver); |
| } |
| |
| // 10. Remove Near-2-DHT mappings. |
| if (tx instanceof GridCacheMappedVersion) { |
| GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion(); |
| |
| if (mapped != null) |
| mappedVers.remove(mapped); |
| } |
| |
| // 11. Clear context. |
| resetContext(); |
| |
| // 12. Update metrics. |
| if (!tx.dht() && tx.local()) { |
| if (!tx.system()) |
| cctx.txMetrics().onTxCommit(); |
| |
| tx.txState().onTxEnd(cctx, tx, true); |
| } |
| |
| if (slowTxWarnTimeout > 0 && tx.local() && |
| U.currentTimeMillis() - tx.startTime() > slowTxWarnTimeout) |
| U.warn(log, "Slow transaction detected [tx=" + tx + |
| ", slowTxWarnTimeout=" + slowTxWarnTimeout + ']') ; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Committed from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']'); |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Did not commit from TM (was already committed): " + tx); |
| } |
| |
| /** |
| * Rolls back a transaction. |
| * |
| * @param tx Transaction to rollback. |
| * @param clearThreadMap {@code True} if need remove tx from thread map. |
| * @param skipCompletedVers {@code True} if tx should skip adding itself to completed versions map on finish. |
| */ |
| public void rollbackTx(IgniteInternalTx tx, boolean clearThreadMap, boolean skipCompletedVers) { |
| assert tx != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Rolling back from TM [locNodeId=" + cctx.localNodeId() + ", tx=" + tx + ']'); |
| |
| // 1. Record transaction version to avoid duplicates. |
| if (!skipCompletedVers) |
| addRolledbackTx(tx); |
| |
| ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx); |
| |
| if (txIdMap.remove(tx.xidVersion(), tx)) { |
| // 2. Unlock write resources. |
| unlockMultiple(tx, tx.writeEntries()); |
| |
| // 3. Unlock read resources if required. |
| if (unlockReadEntries(tx)) |
| unlockMultiple(tx, tx.readEntries()); |
| |
| // 4. Notify evictions. |
| notifyEvictions(tx); |
| |
| // 5. Remove obsolete entries. |
| removeObsolete(tx); |
| |
| // 6. Remove from per-thread storage. |
| if (clearThreadMap) |
| clearThreadMap(tx); |
| |
| // 7. Unregister explicit locks. |
| if (!tx.alternateVersions().isEmpty()) |
| for (GridCacheVersion ver : tx.alternateVersions()) |
| idMap.remove(ver); |
| |
| // 8. Remove Near-2-DHT mappings. |
| if (tx instanceof GridCacheMappedVersion) |
| mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); |
| |
| // 9. Clear context. |
| resetContext(); |
| |
| // 10. Update metrics. |
| if (!tx.dht() && tx.local()) { |
| if (!tx.system()) |
| cctx.txMetrics().onTxRollback(); |
| |
| tx.txState().onTxEnd(cctx, tx, false); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Rolled back from TM: " + tx); |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Did not rollback from TM (was already rolled back): " + tx); |
| } |
| |
| /** |
| * Fast finish transaction. Can be used only if no locks were acquired. |
| * |
| * @param tx Transaction to finish. |
| * @param commit {@code True} if transaction is committed, {@code false} if rolled back. |
| * @param clearThreadMap {@code True} if need remove tx from thread map. |
| */ |
| public void fastFinishTx(GridNearTxLocal tx, boolean commit, boolean clearThreadMap) { |
| assert tx != null; |
| tx.writeMap().isEmpty(); |
| assert tx.optimistic() || tx.readMap().isEmpty(); |
| |
| ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx); |
| |
| if (txIdMap.remove(tx.xidVersion(), tx)) { |
| // 1. Notify evictions. |
| notifyEvictions(tx); |
| |
| // 2. Evict near entries. |
| if (!tx.readMap().isEmpty()) { |
| for (IgniteTxEntry entry : tx.readMap().values()) |
| tx.evictNearEntry(entry, false); |
| } |
| |
| // 3. Remove obsolete entries. |
| removeObsolete(tx); |
| |
| // 4. Remove from per-thread storage. |
| if (clearThreadMap) |
| clearThreadMap(tx); |
| |
| // 5. Clear context. |
| resetContext(); |
| |
| // 6. Update metrics. |
| if (!tx.dht() && tx.local()) { |
| if (!tx.system()) { |
| if (commit) |
| cctx.txMetrics().onTxCommit(); |
| else |
| cctx.txMetrics().onTxRollback(); |
| } |
| |
| tx.txState().onTxEnd(cctx, tx, commit); |
| } |
| } |
| } |
| |
| /** |
| * Removes Tx from manager. Can be used only if there were no updates. |
| * |
| * @param tx Transaction to finish. |
| */ |
| public void forgetTx(IgniteInternalTx tx) { |
| assert tx != null; |
| |
| if (transactionMap(tx).remove(tx.xidVersion(), tx)) { |
| // 1. Remove from per-thread storage. |
| clearThreadMap(tx); |
| |
| // 2. Unregister explicit locks. |
| if (!tx.alternateVersions().isEmpty()) |
| for (GridCacheVersion ver : tx.alternateVersions()) |
| idMap.remove(ver); |
| |
| // 3. Remove Near-2-DHT mappings. |
| if (tx instanceof GridCacheMappedVersion) |
| mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); |
| |
| // 4. Clear context. |
| resetContext(); |
| |
| // 5. Complete finish future. |
| tx.state(UNKNOWN); |
| } |
| } |
| |
| /** |
| * Tries to minimize damage from partially-committed transaction. |
| * |
| * @param tx Tx to uncommit. |
| */ |
| void uncommitTx(IgniteInternalTx tx) { |
| assert tx != null; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Uncommiting from TM: " + tx); |
| |
| ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx); |
| |
| if (txIdMap.remove(tx.xidVersion(), tx)) { |
| // 1. Unlock write resources. |
| unlockMultiple(tx, tx.writeEntries()); |
| |
| // 2. Unlock read resources if required. |
| if (unlockReadEntries(tx)) |
| unlockMultiple(tx, tx.readEntries()); |
| |
| // 3. Notify evictions. |
| notifyEvictions(tx); |
| |
| // 4. Remove from per-thread storage. |
| clearThreadMap(tx); |
| |
| // 5. Unregister explicit locks. |
| if (!tx.alternateVersions().isEmpty()) { |
| for (GridCacheVersion ver : tx.alternateVersions()) |
| idMap.remove(ver); |
| } |
| |
| // 6. Remove Near-2-DHT mappings. |
| if (tx instanceof GridCacheMappedVersion) |
| mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); |
| |
| // 7. Clear context. |
| resetContext(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Uncommitted from TM: " + tx); |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Did not uncommit from TM (was already committed or rolled back): " + tx); |
| } |
| |
| /** |
| * @param tx Transaction to clear. |
| */ |
| public void clearThreadMap(IgniteInternalTx tx) { |
| if (tx.local() && !tx.dht()) { |
| assert tx instanceof GridNearTxLocal : tx; |
| |
| if (!tx.system()) |
| threadMap.remove(tx.threadId(), tx); |
| else { |
| Integer cacheId = tx.txState().firstCacheId(); |
| |
| if (cacheId != null) |
| sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx); |
| else { |
| for (Iterator<IgniteInternalTx> it = sysThreadMap.values().iterator(); it.hasNext(); ) { |
| IgniteInternalTx txx = it.next(); |
| |
| if (tx == txx) { |
| it.remove(); |
| |
| break; |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Enters system section for thread local near tx, if it is present. |
| * In this section system time for this transaction is counted. |
| */ |
| public void enterNearTxSystemSection() { |
| GridNearTxLocal tx = threadLocalTx(null); |
| |
| if (tx != null) |
| tx.enterSystemSection(); |
| } |
| |
| /** |
| * Leaves system section for thread local near tx, if it is present. |
| */ |
| public void leaveNearTxSystemSection() { |
| GridNearTxLocal tx = threadLocalTx(null); |
| |
| if (tx != null) |
| tx.leaveSystemSection(); |
| } |
| |
| /** |
| * Gets transaction ID map depending on transaction type. |
| * |
| * @param tx Transaction. |
| * @return Transaction map. |
| */ |
| private ConcurrentMap<GridCacheVersion, IgniteInternalTx> transactionMap(IgniteInternalTx tx) { |
| return (tx.near() && !tx.local()) ? nearIdMap : idMap; |
| } |
| |
| /** |
| * @param tx Transaction to notify evictions for. |
| */ |
| private void notifyEvictions(IgniteInternalTx tx) { |
| if (tx.internal()) |
| return; |
| |
| for (IgniteTxEntry txEntry : tx.allEntries()) |
| txEntry.cached().context().evicts().touch(txEntry, tx.local()); |
| } |
| |
| /** |
| * Callback invoked whenever a member of a transaction acquires |
| * lock ownership. |
| * |
| * @param entry Cache entry. |
| * @param owner Candidate that won ownership. |
| * @return {@code True} if transaction was notified, {@code false} otherwise. |
| */ |
| public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { |
| // We only care about acquired locks. |
| if (owner != null) { |
| IgniteTxAdapter tx = entry.isNear() ? nearTx(owner.version()) : tx(owner.version()); |
| |
| if (tx != null) { |
| if (!tx.local()) { |
| if (log.isDebugEnabled()) |
| log.debug("Found transaction for owner changed event [owner=" + owner + ", entry=" + entry + |
| ", tx=" + tx + ']'); |
| |
| tx.onOwnerChanged(entry, owner); |
| |
| return true; |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Ignoring local transaction for owner change event: " + tx); |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Transaction not found for owner changed event [owner=" + owner + ", entry=" + entry + ']'); |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param tx Transaction. |
| * @param entries Entries to lock. |
| * @return {@code True} if all keys were locked. |
| * @throws IgniteCheckedException If lock has been cancelled. |
| */ |
| private boolean lockMultiple(IgniteInternalTx tx, Iterable<IgniteTxEntry> entries) |
| throws IgniteCheckedException { |
| assert tx.optimistic() || !tx.local(); |
| |
| long remainingTime = tx.remainingTime(); |
| |
| // For serializable transactions, failure to acquire lock means |
| // that there is a serializable conflict. For all other isolation levels, |
| // we wait for the lock. |
| long timeout = remainingTime < 0 ? 0 : remainingTime; |
| |
| GridCacheVersion serOrder = (tx.serializable() && tx.optimistic()) ? tx.nearXidVersion() : null; |
| |
| for (IgniteTxEntry txEntry1 : entries) { |
| // Check if this entry was prepared before. |
| if (!txEntry1.markPrepared() || txEntry1.explicitVersion() != null) |
| continue; |
| |
| GridCacheContext cacheCtx = txEntry1.context(); |
| |
| while (true) { |
| cctx.database().checkpointReadLock(); |
| |
| try { |
| GridCacheEntryEx entry1 = txEntry1.cached(); |
| |
| assert entry1 != null : txEntry1; |
| assert !entry1.detached() : "Expected non-detached entry for near transaction " + |
| "[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']'; |
| |
| GridCacheVersion serReadVer = txEntry1.entryReadVersion(); |
| |
| assert serReadVer == null || (tx.optimistic() && tx.serializable()) : txEntry1; |
| |
| boolean read = serOrder != null && txEntry1.op() == READ; |
| |
| entry1.unswap(); |
| |
| if (!entry1.tmLock(tx, timeout, serOrder, serReadVer, read)) { |
| // Unlock locks locked so far. |
| for (IgniteTxEntry txEntry2 : entries) { |
| if (txEntry2 == txEntry1) |
| break; |
| |
| txUnlock(tx, txEntry2); |
| } |
| |
| return false; |
| } |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry in TM lockMultiple(..) method (will retry): " + txEntry1); |
| |
| try { |
| // Renew cache entry. |
| txEntry1.cached(cacheCtx.cache().entryEx(txEntry1.key(), tx.topologyVersion())); |
| } |
| catch (GridDhtInvalidPartitionException e) { |
| assert tx.dht() : "Received invalid partition for non DHT transaction [tx=" + |
| tx + ", invalidPart=" + e.partition() + ']'; |
| |
| // If partition is invalid, we ignore this entry. |
| tx.addInvalidPartition(cacheCtx.cacheId(), e.partition()); |
| |
| break; |
| } |
| } |
| catch (GridDistributedLockCancelledException ignore) { |
| tx.setRollbackOnly(); |
| |
| throw new IgniteCheckedException("Entry lock has been cancelled for transaction: " + tx); |
| } |
| finally { |
| cctx.database().checkpointReadUnlock(); |
| } |
| } |
| } |
| |
| return true; |
| } |
| |
| /** |
| * @param tx Transaction. |
| * @param txEntry Entry to unlock. |
| */ |
| private void txUnlock(IgniteInternalTx tx, IgniteTxEntry txEntry) { |
| while (true) { |
| try { |
| GridCacheEntryEx entry = txEntry.cached(); |
| |
| assert entry != null; |
| |
| if (entry.detached()) |
| break; |
| |
| entry.txUnlock(tx); |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry in TM txUnlock(..) method (will retry): " + txEntry); |
| |
| try { |
| txEntry.cached(txEntry.context().cache().entryEx(txEntry.key(), tx.topologyVersion())); |
| } |
| catch (GridDhtInvalidPartitionException e) { |
| return; // Ignore and proceed to next lock. |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param tx Owning transaction. |
| * @param entries Entries to unlock. |
| */ |
| private void unlockMultiple(IgniteInternalTx tx, Iterable<IgniteTxEntry> entries) { |
| for (IgniteTxEntry txEntry : entries) |
| txUnlock(tx, txEntry); |
| } |
| |
| /** |
| * @param tx Committing transaction. |
| */ |
| public void txContext(IgniteInternalTx tx) { |
| threadCtx.set(tx); |
| } |
| |
| /** |
| * @return Currently committing transaction. |
| */ |
| private IgniteInternalTx txContext() { |
| return threadCtx.get(); |
| } |
| |
| /** |
| * Gets version of transaction in tx context or {@code null} |
| * if tx context is empty. |
| * <p> |
| * This is a convenience method provided mostly for debugging. |
| * |
| * @return Transaction version from transaction context. |
| */ |
| @Nullable public GridCacheVersion txContextVersion() { |
| IgniteInternalTx tx = txContext(); |
| |
| return tx == null ? null : tx.xidVersion(); |
| } |
| |
| /** |
| * Commit ended. |
| */ |
| public void resetContext() { |
| threadCtx.set(null); |
| } |
| |
| /** |
| * @return Slow tx warn timeout. |
| */ |
| public int slowTxWarnTimeout() { |
| return slowTxWarnTimeout; |
| } |
| |
| /** |
| * @param slowTxWarnTimeout Slow tx warn timeout. |
| */ |
| public void slowTxWarnTimeout(int slowTxWarnTimeout) { |
| this.slowTxWarnTimeout = slowTxWarnTimeout; |
| } |
| |
| /** |
| * @return Long operations dump timeout. |
| */ |
| public long longOperationsDumpTimeout() { |
| return longOpsDumpTimeout; |
| } |
| |
| /** |
| * @param longOpsDumpTimeout Long operations dump timeout. |
| */ |
| public void longOperationsDumpTimeout(long longOpsDumpTimeout) { |
| this.longOpsDumpTimeout = longOpsDumpTimeout; |
| } |
| |
| /** |
| * Checks if transactions with given near version ID was prepared or committed. |
| * |
| * @param nearVer Near version ID. |
| * @param txNum Number of transactions. |
| * @return Future for flag indicating if transactions were prepared or committed or {@code null} for success future. |
| */ |
| @Nullable public IgniteInternalFuture<Boolean> txsPreparedOrCommitted(GridCacheVersion nearVer, int txNum) { |
| return txsPreparedOrCommitted(nearVer, txNum, null, null); |
| } |
| |
| /** |
| * @param xidVer Version. |
| * @return Future for flag indicating if transactions was committed. |
| */ |
| public IgniteInternalFuture<Boolean> txCommitted(GridCacheVersion xidVer) { |
| final GridFutureAdapter<Boolean> resFut = new GridFutureAdapter<>(); |
| |
| final IgniteInternalTx tx = cctx.tm().tx(xidVer); |
| |
| if (tx != null) { |
| assert tx.near() && tx.local() : tx; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Found near transaction, will wait for completion: " + tx); |
| |
| tx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { |
| @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { |
| TransactionState state = tx.state(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Near transaction finished with state: " + state); |
| |
| resFut.onDone(state == COMMITTED); |
| } |
| }); |
| |
| return resFut; |
| } |
| |
| boolean committed = false; |
| |
| for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) { |
| if (entry.getKey() instanceof CommittedVersion) { |
| CommittedVersion comm = (CommittedVersion)entry.getKey(); |
| |
| if (comm.nearVer.equals(xidVer)) { |
| committed = !entry.getValue().equals(Boolean.FALSE); |
| |
| break; |
| } |
| } |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Near transaction committed: " + committed); |
| |
| resFut.onDone(committed); |
| |
| return resFut; |
| } |
| |
| /** |
| * @param nearVer Near version. |
| * @return Finish future for related remote transactions. |
| */ |
| @SuppressWarnings("unchecked") |
| public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion nearVer) { |
| GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>(); |
| |
| for (final IgniteInternalTx tx : activeTransactions()) { |
| if (!tx.local() && nearVer.equals(tx.nearXidVersion())) |
| fut.add((IgniteInternalFuture) tx.finishFuture()); |
| } |
| |
| fut.markInitialized(); |
| |
| return fut; |
| } |
| |
| /** |
| * @param nearVer Near version ID. |
| * @param txNum Number of transactions. |
| * @param fut Result future. |
| * @param processedVers Processed versions. |
| * @return Future for flag indicating if transactions were prepared or committed or {@code null} for success future. |
| */ |
| @Nullable private IgniteInternalFuture<Boolean> txsPreparedOrCommitted(final GridCacheVersion nearVer, |
| int txNum, |
| @Nullable GridFutureAdapter<Boolean> fut, |
| @Nullable Collection<GridCacheVersion> processedVers) |
| { |
| for (final IgniteInternalTx tx : activeTransactions()) { |
| if (nearVer.equals(tx.nearXidVersion())) { |
| IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture(); |
| |
| if (prepFut != null && !prepFut.isDone()) { |
| if (log.isDebugEnabled()) |
| log.debug("Transaction is preparing (will wait): " + tx); |
| |
| final GridFutureAdapter<Boolean> fut0 = fut != null ? fut : new GridFutureAdapter<Boolean>(); |
| |
| final int txNum0 = txNum; |
| |
| final Collection<GridCacheVersion> processedVers0 = processedVers; |
| |
| prepFut.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> prepFut) { |
| if (log.isDebugEnabled()) |
| log.debug("Transaction prepare future finished: " + tx); |
| |
| IgniteInternalFuture<Boolean> fut = txsPreparedOrCommitted(nearVer, |
| txNum0, |
| fut0, |
| processedVers0); |
| |
| assert fut == fut0; |
| } |
| }); |
| |
| return fut0; |
| } |
| |
| TransactionState state = tx.state(); |
| |
| if (state == PREPARED || state == COMMITTING || state == COMMITTED) { |
| if (state == PREPARED) |
| tx.markFinalizing(RECOVERY_FINISH); // Prevents concurrent rollback. |
| |
| if (--txNum == 0) { |
| if (fut != null) |
| fut.onDone(true); |
| |
| return fut; |
| } |
| } |
| else { |
| if (tx.setRollbackOnly() || tx.state() == UNKNOWN) { |
| tx.rollbackAsync(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Transaction was not prepared (rolled back): " + tx); |
| |
| if (fut == null) |
| fut = new GridFutureAdapter<>(); |
| |
| fut.onDone(false); |
| |
| return fut; |
| } |
| else { |
| if (tx.state() == COMMITTED) { |
| if (--txNum == 0) { |
| if (fut != null) |
| fut.onDone(true); |
| |
| return fut; |
| } |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Transaction is not prepared: " + tx); |
| |
| if (fut == null) |
| fut = new GridFutureAdapter<>(); |
| |
| fut.onDone(false); |
| |
| return fut; |
| } |
| } |
| } |
| |
| if (processedVers == null) |
| processedVers = U.newHashSet(txNum); |
| |
| processedVers.add(tx.xidVersion()); |
| } |
| } |
| |
| // Not all transactions were found. Need to scan committed versions to check |
| // if transaction was already committed. |
| for (Map.Entry<GridCacheVersion, Object> e : completedVersHashMap.entrySet()) { |
| if (e.getValue().equals(Boolean.FALSE)) |
| continue; |
| |
| GridCacheVersion ver = e.getKey(); |
| |
| if (processedVers != null && processedVers.contains(ver)) |
| continue; |
| |
| if (ver instanceof CommittedVersion) { |
| CommittedVersion commitVer = (CommittedVersion)ver; |
| |
| if (commitVer.nearVer.equals(nearVer)) { |
| if (--txNum == 0) { |
| if (fut != null) |
| fut.onDone(true); |
| |
| return fut; |
| } |
| } |
| } |
| } |
| |
| if (fut == null) |
| fut = new GridFutureAdapter<>(); |
| |
| fut.onDone(false); |
| |
| return fut; |
| } |
| |
| /** |
| * Commits or rolls back prepared transaction. |
| * |
| * @param tx Transaction. |
| * @param commit Whether transaction should be committed or rolled back. |
| */ |
| public void finishTxOnRecovery(final IgniteInternalTx tx, boolean commit) { |
| if (log.isInfoEnabled()) |
| log.info("Finishing prepared transaction [commit=" + commit + ", tx=" + tx + ']'); |
| |
| // Transactions participating in recovery can be finished only by recovery consensus. |
| assert tx.finalizationStatus() == RECOVERY_FINISH : tx; |
| |
| if (tx instanceof IgniteTxRemoteEx) { |
| IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx; |
| |
| rmtTx.doneRemote(tx.xidVersion(), |
| Collections.<GridCacheVersion>emptyList(), |
| Collections.<GridCacheVersion>emptyList(), |
| Collections.<GridCacheVersion>emptyList()); |
| } |
| |
| if (commit) |
| tx.commitAsync().listen(new CommitListener(tx)); |
| else if (!tx.local()) |
| // remote (backup) transaction sends partition counters to other backup transaction on recovery rollback |
| // in order to keep counters consistent |
| neighborcastPartitionCountersAndRollback(tx); |
| else |
| tx.rollbackAsync(); |
| } |
| |
| /** */ |
| private void neighborcastPartitionCountersAndRollback(IgniteInternalTx tx) { |
| TxCounters txCounters = tx.txCounters(false); |
| |
| if (txCounters == null || txCounters.updateCounters() == null) |
| tx.rollbackAsync(); |
| |
| PartitionCountersNeighborcastFuture fut = new PartitionCountersNeighborcastFuture(tx, cctx); |
| |
| fut.listen(fut0 -> tx.rollbackAsync()); |
| |
| fut.init(); |
| } |
| |
| /** |
| * Commits transaction in case when node started transaction failed, but all related |
| * transactions were prepared (invalidates transaction if it is not fully prepared). |
| * |
| * @param tx Transaction. |
| * @param failedNodeIds Failed nodes IDs. |
| */ |
| public void commitIfPrepared(IgniteInternalTx tx, Set<UUID> failedNodeIds) { |
| assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx; |
| assert !F.isEmpty(tx.transactionNodes()) : tx; |
| assert tx.nearXidVersion() != null : tx; |
| |
| // Transaction will be completed by finish message. |
| if (!tx.markFinalizing(RECOVERY_FINISH)) |
| return; |
| |
| GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture( |
| cctx, |
| tx, |
| failedNodeIds, |
| tx.transactionNodes()); |
| |
| cctx.mvcc().addFuture(fut, fut.futureId()); |
| |
| if (log.isInfoEnabled()) |
| log.info("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']'); |
| |
| fut.prepare(); |
| } |
| |
| /** |
| * @return {@code True} if deadlock detection is enabled. |
| */ |
| public boolean deadlockDetectionEnabled() { |
| return DEADLOCK_MAX_ITERS > 0; |
| } |
| |
| /** |
| * Performs deadlock detection for given keys. |
| * |
| * @param tx Target tx. |
| * @param keys Keys. |
| * @return Detection result. |
| */ |
| public IgniteInternalFuture<TxDeadlock> detectDeadlock( |
| IgniteInternalTx tx, |
| Set<IgniteTxKey> keys |
| ) { |
| return txDeadlockDetection.detectDeadlock(tx, keys); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param fut Future. |
| * @param txKeys Tx keys. |
| */ |
| void txLocksInfo(UUID nodeId, TxDeadlockFuture fut, Set<IgniteTxKey> txKeys) { |
| ClusterNode node = cctx.node(nodeId); |
| |
| if (node == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to finish deadlock detection, node left: " + nodeId); |
| |
| fut.onDone(); |
| |
| return; |
| } |
| |
| TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys); |
| |
| try { |
| if (!cctx.localNodeId().equals(nodeId)) |
| req.prepareMarshal(cctx); |
| |
| cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL); |
| } |
| catch (IgniteCheckedException e) { |
| if (e instanceof ClusterTopologyCheckedException) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to finish deadlock detection, node left: " + nodeId); |
| } |
| else |
| U.warn(log, "Failed to finish deadlock detection: " + e, e); |
| |
| fut.onDone(); |
| } |
| } |
| |
| /** |
| * @param tx Tx. |
| * @param txKeys Tx keys. |
| * @return {@code True} if key is involved into tx. |
| */ |
| private boolean hasKeys(IgniteInternalTx tx, Collection<IgniteTxKey> txKeys) { |
| for (IgniteTxKey key : txKeys) { |
| if (tx.txState().entry(key) != null) |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param txKeys Tx keys. |
| * @return Transactions locks and nodes. |
| */ |
| private TxLocksResponse txLocksInfo(Collection<IgniteTxKey> txKeys) { |
| TxLocksResponse res = new TxLocksResponse(); |
| |
| Collection<IgniteInternalTx> txs = activeTransactions(); |
| |
| for (IgniteInternalTx tx : txs) { |
| boolean nearTxLoc = tx instanceof GridNearTxLocal; |
| |
| if (!(nearTxLoc || tx instanceof GridDhtTxLocal) || !hasKeys(tx, txKeys)) |
| continue; |
| |
| IgniteTxState state = tx.txState(); |
| |
| assert state instanceof IgniteTxStateImpl || state instanceof IgniteTxImplicitSingleStateImpl; |
| |
| Collection<IgniteTxEntry> txEntries = |
| state instanceof IgniteTxStateImpl ? ((IgniteTxStateImpl)state).allEntriesCopy() : state.allEntries(); |
| |
| Set<IgniteTxKey> requestedKeys = null; |
| |
| // Try to get info about requested keys for detached entries in case of GridNearTxLocal transaction |
| // in order to reduce amount of requests to remote nodes. |
| if (nearTxLoc) { |
| if (tx.pessimistic()) { |
| GridDhtColocatedLockFuture fut = |
| (GridDhtColocatedLockFuture)mvccFuture(tx, GridDhtColocatedLockFuture.class); |
| |
| if (fut != null) |
| requestedKeys = fut.requestedKeys(); |
| |
| GridNearLockFuture nearFut = (GridNearLockFuture)mvccFuture(tx, GridNearLockFuture.class); |
| |
| if (nearFut != null) { |
| Set<IgniteTxKey> nearRequestedKeys = nearFut.requestedKeys(); |
| |
| if (nearRequestedKeys != null) { |
| if (requestedKeys == null) |
| requestedKeys = nearRequestedKeys; |
| else |
| requestedKeys = nearRequestedKeys; |
| } |
| } |
| } |
| else { |
| GridNearOptimisticTxPrepareFuture fut = |
| (GridNearOptimisticTxPrepareFuture)mvccFuture(tx, GridNearOptimisticTxPrepareFuture.class); |
| |
| if (fut != null) |
| requestedKeys = fut.requestedKeys(); |
| } |
| } |
| |
| for (IgniteTxEntry txEntry : txEntries) { |
| IgniteTxKey txKey = txEntry.txKey(); |
| |
| if (res.txLocks(txKey) == null) { |
| GridCacheMapEntry e = (GridCacheMapEntry)txEntry.cached(); |
| |
| List<GridCacheMvccCandidate> locs = e.mvccAllLocal(); |
| |
| if (locs != null) { |
| boolean owner = false; |
| |
| for (GridCacheMvccCandidate loc : locs) { |
| if (!owner && loc.owner() && loc.tx()) |
| owner = true; |
| |
| if (!owner) // Skip all candidates in case when no tx that owns lock. |
| break; |
| |
| if (loc.tx()) { |
| UUID nearNodeId = loc.otherNodeId(); |
| |
| GridCacheVersion txId = loc.otherVersion(); |
| |
| TxLock txLock = new TxLock( |
| txId == null ? loc.version() : txId, |
| nearNodeId == null ? loc.nodeId() : nearNodeId, |
| // We can get outdated value of thread ID, but this value only for information here. |
| loc.threadId(), |
| loc.owner() ? TxLock.OWNERSHIP_OWNER : TxLock.OWNERSHIP_CANDIDATE); |
| |
| res.addTxLock(txKey, txLock); |
| } |
| } |
| } |
| // Special case for optimal sequence of nodes processing. |
| else if (nearTxLoc && requestedKeys != null && requestedKeys.contains(txKey)) { |
| TxLock txLock = new TxLock( |
| tx.nearXidVersion(), |
| tx.nodeId(), |
| tx.threadId(), |
| TxLock.OWNERSHIP_REQUESTED); |
| |
| res.addTxLock(txKey, txLock); |
| } |
| else |
| res.addKey(txKey); |
| } |
| } |
| } |
| |
| return res; |
| } |
| |
| /** |
| * @param tx Tx. Must be instance of {@link GridNearTxLocal}. |
| * @param cls Future class. |
| * @return Cache future. |
| */ |
| private IgniteInternalFuture mvccFuture(IgniteInternalTx tx, Class<? extends IgniteInternalFuture> cls) { |
| assert tx instanceof GridNearTxLocal : tx; |
| |
| Collection<GridCacheVersionedFuture<?>> futs = cctx.mvcc().futuresForVersion(tx.nearXidVersion()); |
| |
| if (futs != null) { |
| for (GridCacheVersionedFuture<?> fut : futs) { |
| if (fut.getClass().equals(cls)) |
| return fut; |
| } |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @param fut Future. |
| */ |
| public void addFuture(TxDeadlockFuture fut) { |
| TxDeadlockFuture old = deadlockDetectFuts.put(fut.futureId(), fut); |
| |
| assert old == null : old; |
| } |
| |
| /** |
| * @param futId Future ID. |
| * @return Found future. |
| */ |
| @Nullable public TxDeadlockFuture future(long futId) { |
| return deadlockDetectFuts.get(futId); |
| } |
| |
| /** |
| * @param futId Future ID. |
| */ |
| public void removeFuture(long futId) { |
| deadlockDetectFuts.remove(futId); |
| } |
| |
| /** |
| * @param nodeId Node ID to send message to. |
| * @param ver Version to ack. |
| */ |
| public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) { |
| deferredAckMsgSnd.sendDeferredAckMessage(nodeId, ver); |
| } |
| |
| /** |
| * @return Collection of active transaction deadlock detection futures. |
| */ |
| public Collection<IgniteInternalFuture<?>> deadlockDetectionFutures() { |
| Collection<? extends IgniteInternalFuture<?>> values = deadlockDetectFuts.values(); |
| |
| return (Collection<IgniteInternalFuture<?>>)values; |
| } |
| |
| /** |
| * Suspends transaction. |
| * Should not be used directly. Use tx.suspend() instead. |
| * |
| * @param tx Transaction to be suspended. |
| * |
| * @see #resumeTx(GridNearTxLocal, long) |
| * @see GridNearTxLocal#suspend() |
| * @see GridNearTxLocal#resume() |
| * @throws IgniteCheckedException If failed to suspend transaction. |
| */ |
| public void suspendTx(final GridNearTxLocal tx) throws IgniteCheckedException { |
| assert tx != null && !tx.system() : tx; |
| |
| if (tx.concurrency == PESSIMISTIC && !suspendResumeForPessimisticSupported) { |
| throw new IgniteCheckedException("Suspend operation cannot be called " + |
| "because some nodes in the cluster don't support this feature."); |
| } |
| |
| if (!tx.state(SUSPENDED)) { |
| throw new IgniteCheckedException("Trying to suspend transaction with incorrect state " |
| + "[expected=" + ACTIVE + ", actual=" + tx.state() + ']'); |
| } |
| |
| clearThreadMap(tx); |
| } |
| |
| /** |
| * Resume transaction in current thread. |
| * Please don't use directly. Use tx.resume() instead. |
| * |
| * @param tx Transaction to be resumed. |
| * @param threadId Thread id to restore. |
| * |
| * @see #suspendTx(GridNearTxLocal) |
| * @see GridNearTxLocal#suspend() |
| * @see GridNearTxLocal#resume() |
| * @throws IgniteCheckedException If failed to resume tx. |
| */ |
| public void resumeTx(GridNearTxLocal tx, long threadId) throws IgniteCheckedException { |
| assert tx != null && !tx.system() : tx; |
| |
| if (!tx.state(ACTIVE)) { |
| throw new IgniteCheckedException("Trying to resume transaction with incorrect state " |
| + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']'); |
| } |
| |
| assert !threadMap.containsValue(tx) : tx; |
| assert !haveSystemTxForThread(Thread.currentThread().getId()); |
| |
| if (threadMap.putIfAbsent(threadId, tx) != null) |
| throw new IgniteCheckedException("Thread already has started a transaction."); |
| |
| tx.threadId(threadId); |
| } |
| |
| /** |
| * @param threadId Thread id. |
| * @return True if thread have system transaction. False otherwise. |
| */ |
| private boolean haveSystemTxForThread(long threadId) { |
| if (!sysThreadMap.isEmpty()) { |
| for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) { |
| if (!cacheCtx.systemTx()) |
| continue; |
| |
| if (sysThreadMap.containsKey(new TxThreadKey(threadId, cacheCtx.cacheId()))) |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @return True if {@link TxRecord} records should be logged to WAL. |
| */ |
| public boolean logTxRecords() { |
| return logTxRecords; |
| } |
| |
| /** |
| * Sets MVCC state. |
| * |
| * @param tx Transaction. |
| * @param state New state. |
| */ |
| public void setMvccState(IgniteInternalTx tx, byte state) { |
| if (cctx.kernalContext().clientNode() || tx.mvccSnapshot() == null || tx.near() && !tx.local()) |
| return; |
| |
| cctx.database().checkpointReadLock(); |
| |
| try { |
| cctx.coordinators().updateState(tx.mvccSnapshot(), state, tx.local()); |
| } |
| finally { |
| cctx.database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** |
| * Finishes MVCC transaction. |
| * @param tx Transaction. |
| */ |
| public void mvccFinish(IgniteTxAdapter tx) { |
| if (cctx.kernalContext().clientNode() || tx.mvccSnapshot == null || !tx.local()) |
| return; |
| |
| cctx.coordinators().releaseWaiters(tx.mvccSnapshot); |
| } |
| |
| /** |
| * Logs Tx state to WAL if needed. |
| * |
| * @param tx Transaction. |
| * @return WALPointer or {@code null} if nothing was logged. |
| */ |
| @Nullable WALPointer logTxRecord(IgniteTxAdapter tx) { |
| BaselineTopology baselineTop; |
| |
| // Log tx state change to WAL. |
| if (cctx.wal() == null |
| || (!logTxRecords && !tx.txState().mvccEnabled()) |
| || (baselineTop = cctx.kernalContext().state().clusterState().baselineTopology()) == null |
| || !baselineTop.consistentIds().contains(cctx.localNode().consistentId())) |
| return null; |
| |
| Map<Short, Collection<Short>> nodes = tx.consistentIdMapper.mapToCompactIds(tx.topVer, tx.txNodes, baselineTop); |
| |
| TxRecord record; |
| |
| if (tx.txState().mvccEnabled()) |
| record = new MvccTxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes, tx.mvccSnapshot()); |
| else |
| record = new TxRecord(tx.state(), tx.nearXidVersion(), tx.writeVersion(), nodes); |
| |
| try { |
| return cctx.wal().log(record); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to log TxRecord: " + record, e); |
| |
| throw new IgniteException("Failed to log TxRecord: " + record, e); |
| } |
| } |
| |
| /** |
| * Setting (for all nodes) a timeout (in millis) for printing long-running |
| * transactions as well as transactions that cannot receive locks for all |
| * their keys for a long time. Set less than or equal {@code 0} to disable. |
| * |
| * @param longOpsDumpTimeout Long operations dump timeout. |
| */ |
| public void longOperationsDumpTimeoutDistributed(long longOpsDumpTimeout) { |
| broadcastToNodesSupportingFeature( |
| new LongOperationsDumpSettingsClosure(longOpsDumpTimeout), |
| LONG_OPERATIONS_DUMP_TIMEOUT |
| ); |
| } |
| |
| /** |
| * Broadcasts given job to nodes that support ignite feature. |
| * |
| * @param job Ignite job. |
| * @param feature Ignite feature. |
| */ |
| private void broadcastToNodesSupportingFeature(IgniteRunnable job, IgniteFeatures feature) { |
| ClusterGroup grp = cctx.kernalContext().grid() |
| .cluster() |
| .forPredicate(node -> IgniteFeatures.nodeSupports(node, feature)); |
| |
| IgniteCompute compute = cctx.kernalContext().grid().compute(grp); |
| |
| compute.broadcast(job); |
| } |
| |
| /** |
| * Transactions recovery initialization runnable. |
| */ |
| private final class TxRecoveryInitRunnable implements Runnable { |
| /** */ |
| private final ClusterNode node; |
| |
| /** */ |
| private final MvccCoordinator mvccCrd; |
| |
| /** |
| * @param node Failed node. |
| * @param mvccCrd Mvcc coordinator at time of node failure. |
| */ |
| private TxRecoveryInitRunnable(ClusterNode node, MvccCoordinator mvccCrd) { |
| this.node = node; |
| this.mvccCrd = mvccCrd; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| try { |
| cctx.kernalContext().gateway().readLock(); |
| } |
| catch (IllegalStateException | IgniteClientDisconnectedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to acquire kernal gateway [err=" + e + ']'); |
| |
| return; |
| } |
| |
| UUID evtNodeId = node.id(); |
| |
| try { |
| if (log.isDebugEnabled()) |
| log.debug("Processing node failed event [locNodeId=" + cctx.localNodeId() + |
| ", failedNodeId=" + evtNodeId + ']'); |
| |
| // Null means that recovery voting is not needed. |
| GridCompoundFuture<IgniteInternalTx, Void> allTxFinFut = |
| node.isClient() && mvccCrd != null && mvccCrd.nodeId() != null |
| ? new GridCompoundFuture<>() : null; |
| |
| for (final IgniteInternalTx tx : activeTransactions()) { |
| if ((tx.near() && !tx.local()) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) { |
| // Invalidate transactions. |
| salvageTx(tx, RECOVERY_FINISH); |
| } |
| else { |
| // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx. |
| if (tx.originatingNodeId().equals(evtNodeId)) { |
| if (tx.state() == PREPARED) |
| commitIfPrepared(tx, Collections.singleton(evtNodeId)); |
| else { |
| IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture(); |
| |
| if (prepFut != null) { |
| prepFut.listen(fut -> { |
| if (tx.state() == PREPARED) |
| commitIfPrepared(tx, Collections.singleton(evtNodeId)); |
| // If we could not mark tx as rollback, it means that transaction is being committed. |
| else if (tx.setRollbackOnly()) |
| tx.rollbackAsync(); |
| }); |
| } |
| // If we could not mark tx as rollback, it means that transaction is being committed. |
| else if (tx.setRollbackOnly()) |
| tx.rollbackAsync(); |
| } |
| } |
| |
| // Await only mvcc transactions initiated by failed client node. |
| if (allTxFinFut != null && tx.eventNodeId().equals(evtNodeId) |
| && tx.mvccSnapshot() != null) |
| allTxFinFut.add(tx.finishFuture()); |
| } |
| } |
| |
| if (allTxFinFut == null) |
| return; |
| |
| allTxFinFut.markInitialized(); |
| |
| // Send vote to mvcc coordinator when all recovering transactions have finished. |
| allTxFinFut.listen(fut -> { |
| // If mvcc coordinator issued snapshot for recovering transaction has failed during recovery, |
| // then there is no need to send messages to new coordinator. |
| try { |
| cctx.kernalContext().io().sendToGridTopic( |
| mvccCrd.nodeId(), |
| TOPIC_CACHE_COORDINATOR, |
| new MvccRecoveryFinishedMessage(evtNodeId), |
| SYSTEM_POOL); |
| } |
| catch (ClusterTopologyCheckedException e) { |
| if (log.isInfoEnabled()) |
| log.info("Mvcc coordinator issued snapshots for recovering transactions " + |
| "has left the cluster (will ignore) [locNodeId=" + cctx.localNodeId() + |
| ", failedNodeId=" + evtNodeId + |
| ", mvccCrdNodeId=" + mvccCrd.nodeId() + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| log.warning("Failed to notify mvcc coordinator that all recovering transactions were " + |
| "finished [locNodeId=" + cctx.localNodeId() + |
| ", failedNodeId=" + evtNodeId + |
| ", mvccCrdNodeId=" + mvccCrd.nodeId() + ']', e); |
| } |
| }); |
| } |
| finally { |
| cctx.kernalContext().gateway().readUnlock(); |
| } |
| } |
| } |
| |
| /** |
|  * Per-thread key for system transactions: a pair of thread ID and cache ID with value-based |
|  * {@link #equals(Object)} and {@link #hashCode()}. |
|  * <p> |
|  * Both components are {@code final}: equality and hash code are derived from them, so they must not |
|  * change once a key has been stored in a hash-based collection. |
|  */ |
| private static class TxThreadKey { |
|     /** Thread ID. */ |
|     private final long threadId; |
| |
|     /** Cache ID. */ |
|     private final int cacheId; |
| |
|     /** |
|      * @param threadId Thread ID. |
|      * @param cacheId Cache ID. |
|      */ |
|     private TxThreadKey(long threadId, int cacheId) { |
|         this.threadId = threadId; |
|         this.cacheId = cacheId; |
|     } |
| |
|     /** |
|      * Two keys are equal when both the thread ID and the cache ID match. |
|      * |
|      * @param o Object to compare with. |
|      * @return {@code True} if {@code o} is a {@code TxThreadKey} with the same thread and cache IDs. |
|      */ |
|     @Override public boolean equals(Object o) { |
|         if (this == o) |
|             return true; |
| |
|         if (!(o instanceof TxThreadKey)) |
|             return false; |
| |
|         TxThreadKey other = (TxThreadKey)o; |
| |
|         return threadId == other.threadId && cacheId == other.cacheId; |
|     } |
| |
|     /** |
|      * Combines the folded 64-bit thread ID with the cache ID. |
|      * |
|      * @return Hash code consistent with {@link #equals(Object)}. |
|      */ |
|     @Override public int hashCode() { |
|         // Long.hashCode(v) is defined as (int)(v ^ (v >>> 32)), i.e. the same folding as before. |
|         return 31 * Long.hashCode(threadId) + cacheId; |
|     } |
| } |
| |
| /** |
|  * Cache version of a committed transaction that additionally keeps a reference to the |
|  * corresponding near transaction version. |
|  */ |
| private static class CommittedVersion extends GridCacheVersion { |
|     /** */ |
|     private static final long serialVersionUID = 0L; |
| |
|     /** |
|      * Corresponding near version. Described as transient, although the field is not declared |
|      * with the {@code transient} modifier. |
|      */ |
|     private GridCacheVersion nearVer; |
| |
|     /** |
|      * Builds a committed version by copying topology version, order, node order and data center ID |
|      * from {@code ver} and remembering the near version. |
|      * |
|      * @param ver Committed version. |
|      * @param nearVer Near transaction version, must not be {@code null} (checked by assertion). |
|      */ |
|     private CommittedVersion(GridCacheVersion ver, GridCacheVersion nearVer) { |
|         super(ver.topologyVersion(), ver.order(), ver.nodeOrder(), ver.dataCenterId()); |
| |
|         assert nearVer != null; |
| |
|         this.nearVer = nearVer; |
|     } |
| |
|     /** |
|      * Empty constructor required by {@link Externalizable}. Leaves the near version unset. |
|      */ |
|     public CommittedVersion() { |
|         super(); |
|     } |
| } |
| |
| /** |
| * Commit listener. Checks if commit succeeded and rollbacks if case of error. |
| */ |
| private class CommitListener implements CI1<IgniteInternalFuture<IgniteInternalTx>> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Transaction. */ |
| private final IgniteInternalTx tx; |
| |
| /** |
| * @param tx Transaction. |
| */ |
| private CommitListener(IgniteInternalTx tx) { |
| this.tx = tx; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void apply(IgniteInternalFuture<IgniteInternalTx> t) { |
| try { |
| t.get(); |
| } |
| catch (IgniteTxOptimisticCheckedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Optimistic failure while committing prepared transaction (will rollback): " + |
| tx); |
| |
| try { |
| tx.rollbackAsync(); |
| } |
| catch (Throwable e) { |
| U.error(log, "Failed to automatically rollback transaction: " + tx, e); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to commit transaction during failover: " + tx, e); |
| } |
| } |
| } |
| |
| /** |
| * Transactions deadlock detection process message listener. |
| */ |
| private class DeadlockDetectionListener implements GridMessageListener { |
| /** {@inheritDoc} */ |
| @Override public void onMessage(UUID nodeId, Object msg, byte plc) { |
| GridCacheMessage cacheMsg = (GridCacheMessage)msg; |
| |
| Throwable err = null; |
| |
| try { |
| unmarshall(nodeId, cacheMsg); |
| } |
| catch (Exception e) { |
| err = e; |
| } |
| |
| if (err != null || cacheMsg.classError() != null) { |
| try { |
| processFailedMessage(nodeId, cacheMsg, err); |
| } |
| catch(Throwable e){ |
| U.error(log, "Failed to process message [senderId=" + nodeId + |
| ", messageType=" + cacheMsg.getClass() + ']', e); |
| |
| if (e instanceof Error) |
| throw (Error)e; |
| } |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Message received [locNodeId=" + cctx.localNodeId() + |
| ", rmtNodeId=" + nodeId + ", msg=" + msg + ']'); |
| |
| if (msg instanceof TxLocksRequest) { |
| TxLocksRequest req = (TxLocksRequest)msg; |
| |
| TxLocksResponse res = txLocksInfo(req.txKeys()); |
| |
| res.futureId(req.futureId()); |
| |
| try { |
| if (!cctx.localNodeId().equals(nodeId)) |
| res.prepareMarshal(cctx); |
| |
| cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL); |
| } |
| catch (ClusterTopologyCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send response, node failed: " + nodeId); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send response to node [node=" + nodeId + ", res=" + res + ']', e); |
| } |
| } |
| else if (msg instanceof TxLocksResponse) { |
| TxLocksResponse res = (TxLocksResponse)msg; |
| |
| long futId = res.futureId(); |
| |
| TxDeadlockFuture fut = future(futId); |
| |
| if (fut != null) |
| fut.onResult(nodeId, res); |
| else |
| U.warn(log, "Unexpected response received " + res); |
| } |
| else |
| throw new IllegalArgumentException("Unknown message [msg=" + msg + ']'); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msg Message. |
| */ |
| private void processFailedMessage(UUID nodeId, GridCacheMessage msg, Throwable err) throws IgniteCheckedException { |
| switch (msg.directType()) { |
| case -24: { |
| TxLocksRequest req = (TxLocksRequest)msg; |
| |
| TxLocksResponse res = new TxLocksResponse(); |
| |
| res.futureId(req.futureId()); |
| |
| try { |
| cctx.gridIO().sendToGridTopic(nodeId, TOPIC_TX, res, SYSTEM_POOL); |
| } |
| catch (ClusterTopologyCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send response, node failed: " + nodeId); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId + |
| ", res=" + res + ']', e); |
| } |
| } |
| |
| break; |
| |
| case -23: { |
| TxLocksResponse res = (TxLocksResponse)msg; |
| |
| TxDeadlockFuture fut = future(res.futureId()); |
| |
| if (fut == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to find future for response [sender=" + nodeId + ", res=" + res + ']'); |
| |
| return; |
| } |
| |
| if (err == null) |
| fut.onResult(nodeId, res); |
| else |
| fut.onDone(null, err); |
| } |
| |
| break; |
| |
| default: |
| throw new IgniteCheckedException("Failed to process message. Unsupported direct type [msg=" + |
| msg + ']', msg.classError()); |
| } |
| |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param cacheMsg Message. |
| */ |
| private void unmarshall(UUID nodeId, GridCacheMessage cacheMsg) { |
| if (cctx.localNodeId().equals(nodeId)) |
| return; |
| |
| try { |
| cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader()); |
| } |
| catch (IgniteCheckedException e) { |
| cacheMsg.onClassError(e); |
| } |
| catch (BinaryObjectException e) { |
| cacheMsg.onClassError(new IgniteCheckedException(e)); |
| } |
| catch (Error e) { |
| if (cacheMsg.ignoreClassErrors() && |
| X.hasCause(e, NoClassDefFoundError.class, UnsupportedClassVersionError.class)) { |
| cacheMsg.onClassError( |
| new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e) |
| ); |
| } |
| else |
| throw e; |
| } |
| } |
| } |
| |
| /** |
|  * This class is used to store information about transaction time dump throttling. |
|  */ |
| public class TxDumpsThrottling { |
|     /** Number of transaction dumps skipped since the counter was last reset. */ |
|     private AtomicInteger skippedTxCntr = new AtomicInteger(); |
| |
|     /** Hit rate metric incremented on every dumped transaction. */ |
|     private HitRateMetric transactionHitRateCntr = new HitRateMetric("transactionHitRateCounter", null, 1000, 2); |
| |
|     /** |
|      * Tells whether dumping of the transaction should be skipped at the current moment. |
|      * When dumping is allowed, the skipped counter is reset and, if it was positive, reported to the log. |
|      * |
|      * @return {@code True} if the current hit rate has reached the configured samples-per-second limit. |
|      */ |
|     public boolean skipCurrent() { |
|         if (transactionHitRateCntr.value() >= transactionTimeDumpSamplesPerSecondLimit()) |
|             return true; |
| |
|         int skippedCnt = skippedTxCntr.getAndSet(0); |
| |
|         // Nothing to report if nothing was skipped or the counter was reset concurrently. |
|         if (skippedCnt > 0) |
|             log.info("Transaction time dumps skipped because of log throttling: " + skippedCnt); |
| |
|         return false; |
|     } |
| |
|     /** |
|      * Should be called when we dump transaction to log. |
|      */ |
|     public void dump() { |
|         transactionHitRateCntr.increment(); |
|     } |
| |
|     /** |
|      * Should be called when we skip transaction which we could dump to log because of throttling. |
|      */ |
|     public void skip() { |
|         skippedTxCntr.incrementAndGet(); |
|     } |
| } |
| } |