| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.tx.impl; |
| |
| import static java.util.concurrent.CompletableFuture.allOf; |
| import static java.util.concurrent.CompletableFuture.failedFuture; |
| import static java.util.concurrent.CompletableFuture.runAsync; |
| import static java.util.concurrent.CompletableFuture.supplyAsync; |
| import static java.util.function.Function.identity; |
| import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; |
| import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; |
| import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE; |
| import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp; |
| import static org.apache.ignite.internal.tx.TxState.ABORTED; |
| import static org.apache.ignite.internal.tx.TxState.COMMITTED; |
| import static org.apache.ignite.internal.tx.TxState.FINISHING; |
| import static org.apache.ignite.internal.tx.TxState.isFinalState; |
| import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; |
| import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; |
| import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; |
| import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync; |
| import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; |
| import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentNavigableMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.ForkJoinPool; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.function.LongSupplier; |
| import java.util.stream.Collectors; |
| import org.apache.ignite.internal.event.EventListener; |
| import org.apache.ignite.internal.hlc.ClockService; |
| import org.apache.ignite.internal.hlc.HybridTimestamp; |
| import org.apache.ignite.internal.lang.IgniteBiTuple; |
| import org.apache.ignite.internal.lang.IgniteInternalException; |
| import org.apache.ignite.internal.logger.IgniteLogger; |
| import org.apache.ignite.internal.logger.Loggers; |
| import org.apache.ignite.internal.lowwatermark.LowWatermark; |
| import org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters; |
| import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent; |
| import org.apache.ignite.internal.network.ClusterService; |
| import org.apache.ignite.internal.network.MessagingService; |
| import org.apache.ignite.internal.network.NetworkMessage; |
| import org.apache.ignite.internal.network.NetworkMessageHandler; |
| import org.apache.ignite.internal.placementdriver.PlacementDriver; |
| import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; |
| import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; |
| import org.apache.ignite.internal.replicator.ReplicaService; |
| import org.apache.ignite.internal.replicator.ReplicationGroupId; |
| import org.apache.ignite.internal.replicator.TablePartitionId; |
| import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; |
| import org.apache.ignite.internal.replicator.exception.ReplicationException; |
| import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException; |
| import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse; |
| import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup; |
| import org.apache.ignite.internal.replicator.message.ReplicaResponse; |
| import org.apache.ignite.internal.thread.IgniteThreadFactory; |
| import org.apache.ignite.internal.tx.HybridTimestampTracker; |
| import org.apache.ignite.internal.tx.InternalTransaction; |
| import org.apache.ignite.internal.tx.LocalRwTxCounter; |
| import org.apache.ignite.internal.tx.LockManager; |
| import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException; |
| import org.apache.ignite.internal.tx.TransactionMeta; |
| import org.apache.ignite.internal.tx.TransactionResult; |
| import org.apache.ignite.internal.tx.TxManager; |
| import org.apache.ignite.internal.tx.TxPriority; |
| import org.apache.ignite.internal.tx.TxState; |
| import org.apache.ignite.internal.tx.TxStateMeta; |
| import org.apache.ignite.internal.tx.TxStateMetaFinishing; |
| import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; |
| import org.apache.ignite.internal.tx.impl.TransactionInflights.ReadWriteTxContext; |
| import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo; |
| import org.apache.ignite.internal.util.CompletableFutures; |
| import org.apache.ignite.internal.util.ExceptionUtils; |
| import org.apache.ignite.internal.util.IgniteSpinBusyLock; |
| import org.apache.ignite.network.ClusterNode; |
| import org.apache.ignite.network.TopologyService; |
| import org.jetbrains.annotations.Nullable; |
| import org.jetbrains.annotations.TestOnly; |
| |
| /** |
| * A transaction manager implementation. |
| * |
| * <p>Uses 2PC for atomic commitment and 2PL for concurrency control. |
| */ |
| public class TxManagerImpl implements TxManager, NetworkMessageHandler { |
| /** The logger. */ |
| private static final IgniteLogger LOG = Loggers.forClass(TxManagerImpl.class); |
| |
| /** Transaction configuration. */ |
| private final TransactionConfiguration txConfig; |
| |
| /** Lock manager. */ |
| private final LockManager lockManager; |
| |
| /** Executor that runs async write intent switch actions. */ |
| private final ExecutorService writeIntentSwitchPool; |
| |
| private final ClockService clockService; |
| |
| /** Generates transaction IDs. */ |
| private final TransactionIdGenerator transactionIdGenerator; |
| |
| /** The local state storage. */ |
| private final VolatileTxStateMetaStorage txStateVolatileStorage = new VolatileTxStateMetaStorage(); |
| |
| /** Futures of read-only transactions, keyed by their {@link TxIdAndTimestamp}. */ |
| private final ConcurrentNavigableMap<TxIdAndTimestamp, CompletableFuture<Void>> readOnlyTxFutureById = new ConcurrentSkipListMap<>( |
| Comparator.comparing(TxIdAndTimestamp::getReadTimestamp).thenComparing(TxIdAndTimestamp::getTxId) |
| ); |
| |
| /** |
| * Low watermark value, does not allow creating read-only transactions less than or equal to this value, {@code null} means it has never |
| * been updated yet. |
| */ |
| private final AtomicReference<HybridTimestamp> lowWatermarkValueReference = new AtomicReference<>(); |
| |
| /** Low watermark. */ |
| private final LowWatermark lowWatermark; |
| |
| private final PlacementDriver placementDriver; |
| |
| private final PlacementDriverHelper placementDriverHelper; |
| |
| private final LongSupplier idleSafeTimePropagationPeriodMsSupplier; |
| |
| /** Prevents double stopping of the component. */ |
| private final AtomicBoolean stopGuard = new AtomicBoolean(); |
| |
| /** |
| * Total number of started transactions. |
| * TODO: IGNITE-21440 Implement transaction metrics. |
| */ |
| private final AtomicInteger startedTxs = new AtomicInteger(); |
| |
| /** |
| * Total number of finished transactions. |
| * TODO: IGNITE-21440 Implement transaction metrics. |
| */ |
| private final AtomicInteger finishedTxs = new AtomicInteger(); |
| |
| |
| /** Busy lock to stop synchronously. */ |
| private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); |
| |
| /** Detector of transactions that lost the coordinator. */ |
| private final OrphanDetector orphanDetector; |
| |
| /** Topology service. */ |
| private final TopologyService topologyService; |
| |
| /** Messaging service. */ |
| private final MessagingService messagingService; |
| |
| /** Local node network identity. This id is available only after the network has started. */ |
| private String localNodeId; |
| |
| /** Server cleanup processor. */ |
| private final TxCleanupRequestHandler txCleanupRequestHandler; |
| |
| /** Cleanup request sender. */ |
| private final TxCleanupRequestSender txCleanupRequestSender; |
| |
| /** Transaction message sender. */ |
| private final TxMessageSender txMessageSender; |
| |
| private final EventListener<PrimaryReplicaEventParameters> primaryReplicaExpiredListener; |
| |
| private final EventListener<PrimaryReplicaEventParameters> primaryReplicaElectedListener; |
| |
| private final EventListener<ChangeLowWatermarkEventParameters> lowWatermarkChangedListener = this::onLwnChanged; |
| |
| /** Counter of read-write transactions that were created and completed locally on the node. */ |
| private final LocalRwTxCounter localRwTxCounter; |
| |
| private final Executor partitionOperationsExecutor; |
| |
| private final TransactionInflights transactionInflights; |
| |
| /** |
| * Test-only constructor. |
| * |
| * <p>Delegates to the main constructor: the node name, the messaging service and the topology service are taken from |
| * {@code clusterService}, and {@link ForkJoinPool#commonPool()} is used as the partition operations executor. |
| * |
| * @param txConfig Transaction configuration. |
| * @param clusterService Cluster service. |
| * @param replicaService Replica service. |
| * @param lockManager Lock manager. |
| * @param clockService Clock service. |
| * @param transactionIdGenerator Used to generate transaction IDs. |
| * @param placementDriver Placement driver. |
| * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe time propagation period in ms. |
| * @param localRwTxCounter Counter of read-write transactions that were created and completed locally on the node. |
| * @param resourcesRegistry Resources registry. |
| * @param transactionInflights Transaction inflights. |
| * @param lowWatermark Low watermark. |
| */ |
| @TestOnly |
| public TxManagerImpl( |
| TransactionConfiguration txConfig, |
| ClusterService clusterService, |
| ReplicaService replicaService, |
| LockManager lockManager, |
| ClockService clockService, |
| TransactionIdGenerator transactionIdGenerator, |
| PlacementDriver placementDriver, |
| LongSupplier idleSafeTimePropagationPeriodMsSupplier, |
| LocalRwTxCounter localRwTxCounter, |
| RemotelyTriggeredResourceRegistry resourcesRegistry, |
| TransactionInflights transactionInflights, |
| LowWatermark lowWatermark |
| ) { |
| this( |
| clusterService.nodeName(), |
| txConfig, |
| clusterService.messagingService(), |
| clusterService.topologyService(), |
| replicaService, |
| lockManager, |
| clockService, |
| transactionIdGenerator, |
| placementDriver, |
| idleSafeTimePropagationPeriodMsSupplier, |
| localRwTxCounter, |
| ForkJoinPool.commonPool(), |
| resourcesRegistry, |
| transactionInflights, |
| lowWatermark |
| ); |
| } |
| |
| /** |
|  * Creates the transaction manager and wires up its internal helpers. |
|  * |
|  * @param nodeName Node name, passed to the thread factory of the write intent switch pool. |
|  * @param txConfig Transaction configuration. |
|  * @param messagingService Messaging service. |
|  * @param topologyService Topology service. |
|  * @param replicaService Replica service. |
|  * @param lockManager Lock manager. |
|  * @param clockService Clock service. |
|  * @param transactionIdGenerator Used to generate transaction IDs. |
|  * @param placementDriver Placement driver. |
|  * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe time propagation period in ms. |
|  * @param localRwTxCounter Counter of read-write transactions that were created and completed locally on the node. |
|  * @param partitionOperationsExecutor Executor on which partition operations will be executed, if needed. |
|  * @param resourcesRegistry Resources registry. |
|  * @param transactionInflights Transaction inflights. |
|  * @param lowWatermark Low watermark. |
|  */ |
| public TxManagerImpl( |
|         String nodeName, |
|         TransactionConfiguration txConfig, |
|         MessagingService messagingService, |
|         TopologyService topologyService, |
|         ReplicaService replicaService, |
|         LockManager lockManager, |
|         ClockService clockService, |
|         TransactionIdGenerator transactionIdGenerator, |
|         PlacementDriver placementDriver, |
|         LongSupplier idleSafeTimePropagationPeriodMsSupplier, |
|         LocalRwTxCounter localRwTxCounter, |
|         Executor partitionOperationsExecutor, |
|         RemotelyTriggeredResourceRegistry resourcesRegistry, |
|         TransactionInflights transactionInflights, |
|         LowWatermark lowWatermark |
| ) { |
|     // Transaction-level collaborators supplied by the caller. |
|     this.txConfig = txConfig; |
|     this.clockService = clockService; |
|     this.lockManager = lockManager; |
|     this.transactionIdGenerator = transactionIdGenerator; |
|     this.localRwTxCounter = localRwTxCounter; |
|     this.transactionInflights = transactionInflights; |
|     this.partitionOperationsExecutor = partitionOperationsExecutor; |
|     this.idleSafeTimePropagationPeriodMsSupplier = idleSafeTimePropagationPeriodMsSupplier; |
| |
|     // Networking. |
|     this.messagingService = messagingService; |
|     this.topologyService = topologyService; |
| |
|     // Event sources and the listeners registered on them in startAsync(). |
|     this.placementDriver = placementDriver; |
|     this.lowWatermark = lowWatermark; |
|     this.primaryReplicaExpiredListener = this::primaryReplicaExpiredListener; |
|     this.primaryReplicaElectedListener = this::primaryReplicaElectedListener; |
| |
|     this.placementDriverHelper = new PlacementDriverHelper(placementDriver, clockService); |
| |
|     // Fixed-size pool: one thread per available processor, unbounded queue. |
|     int poolSize = Runtime.getRuntime().availableProcessors(); |
| |
|     this.writeIntentSwitchPool = new ThreadPoolExecutor( |
|             poolSize, |
|             poolSize, |
|             100, |
|             TimeUnit.MILLISECONDS, |
|             new LinkedBlockingQueue<>(), |
|             IgniteThreadFactory.create(nodeName, "tx-async-write-intent", LOG, STORAGE_READ, STORAGE_WRITE) |
|     ); |
| |
|     this.orphanDetector = new OrphanDetector(topologyService, replicaService, placementDriverHelper, lockManager); |
| |
|     this.txMessageSender = new TxMessageSender(messagingService, replicaService, clockService, txConfig); |
| |
|     WriteIntentSwitchProcessor switchProcessor = |
|             new WriteIntentSwitchProcessor(placementDriverHelper, txMessageSender, topologyService); |
| |
|     this.txCleanupRequestHandler = new TxCleanupRequestHandler( |
|             messagingService, |
|             lockManager, |
|             clockService, |
|             switchProcessor, |
|             resourcesRegistry |
|     ); |
| |
|     this.txCleanupRequestSender = new TxCleanupRequestSender(txMessageSender, placementDriverHelper, txStateVolatileStorage); |
| } |
| |
| /** |
|  * Common handler for primary replica events: runs {@code action} under the busy lock when the event's group is a table partition. |
|  * |
|  * @param eventParameters Primary replica event parameters. |
|  * @param action Action to apply to the table partition id of the event. |
|  * @return Future completed with {@code false} in every case. |
|  */ |
| private CompletableFuture<Boolean> primaryReplicaEventListener( |
|         PrimaryReplicaEventParameters eventParameters, |
|         Consumer<TablePartitionId> action |
| ) { |
|     return inBusyLock(busyLock, () -> { |
|         // Events for any other kind of replication group are ignored. |
|         if (eventParameters.groupId() instanceof TablePartitionId) { |
|             action.accept((TablePartitionId) eventParameters.groupId()); |
|         } |
| |
|         return falseCompletedFuture(); |
|     }); |
| } |
| |
| /** |
|  * Handles the primary replica elected event by sending a recovery cleanup message for the partition, addressed by the local node name. |
|  * |
|  * @param eventParameters Primary replica event parameters. |
|  * @return Future completed with {@code false}. |
|  */ |
| private CompletableFuture<Boolean> primaryReplicaElectedListener(PrimaryReplicaEventParameters eventParameters) { |
|     Consumer<TablePartitionId> sendRecoveryCleanup = |
|             partitionId -> txMessageSender.sendRecoveryCleanup(topologyService.localMember().name(), partitionId); |
| |
|     return primaryReplicaEventListener(eventParameters, sendRecoveryCleanup); |
| } |
| |
| /** |
|  * Handles the primary replica expired event by cancelling the waiting inflights of the partition. |
|  * |
|  * @param eventParameters Primary replica event parameters. |
|  * @return Future completed with {@code false}. |
|  */ |
| private CompletableFuture<Boolean> primaryReplicaExpiredListener(PrimaryReplicaEventParameters eventParameters) { |
|     return primaryReplicaEventListener( |
|             eventParameters, |
|             partitionId -> transactionInflights.cancelWaitingInflights(partitionId) |
|     ); |
| } |
| |
| @Override |
| public InternalTransaction begin(HybridTimestampTracker timestampTracker) { |
| return begin(timestampTracker, false, TxPriority.NORMAL); |
| } |
| |
| @Override |
| public InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean readOnly) { |
| return begin(timestampTracker, readOnly, TxPriority.NORMAL); |
| } |
| |
| @Override |
| public InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean readOnly, TxPriority priority) { |
| HybridTimestamp beginTimestamp = readOnly ? clockService.now() : createBeginTimestampWithIncrementRwTxCounter(); |
| UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp, priority); |
| |
| startedTxs.incrementAndGet(); |
| |
| if (!readOnly) { |
| txStateVolatileStorage.initialize(txId, localNodeId); |
| |
| return new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId); |
| } |
| |
| HybridTimestamp observableTimestamp = timestampTracker.get(); |
| |
| HybridTimestamp readTimestamp = observableTimestamp != null |
| ? HybridTimestamp.max(observableTimestamp, currentReadTimestamp(beginTimestamp)) |
| : currentReadTimestamp(beginTimestamp); |
| |
| TxIdAndTimestamp txIdAndTimestamp = new TxIdAndTimestamp(readTimestamp, txId); |
| |
| CompletableFuture<Void> txFuture = new CompletableFuture<>(); |
| |
| CompletableFuture<Void> oldFuture = readOnlyTxFutureById.put(txIdAndTimestamp, txFuture); |
| assert oldFuture == null : "previous transaction has not completed yet: " + txIdAndTimestamp; |
| |
| HybridTimestamp lowWatermark = this.lowWatermarkValueReference.get(); |
| |
| if (lowWatermark != null && readTimestamp.compareTo(lowWatermark) <= 0) { |
| // "updateLowWatermark" method updates "this.lowWatermark" field, and only then scans "this.readOnlyTxFutureById" for old |
| // transactions to wait. In order for that code to work safely, we have to make sure that no "too old" transactions will be |
| // created here in "begin" method after "this.lowWatermark" is already updated. The simplest way to achieve that is to check |
| // LW after we add transaction to the map (adding transaction to the map before reading LW value, of course). |
| readOnlyTxFutureById.remove(txIdAndTimestamp); |
| |
| // Completing the future is necessary, because "updateLowWatermark" method may already wait for it if race condition happened. |
| txFuture.complete(null); |
| |
| throw new IgniteInternalException( |
| TX_READ_ONLY_TOO_OLD_ERR, |
| "Timestamp of read-only transaction must be greater than the low watermark: [txTimestamp={}, lowWatermark={}]", |
| readTimestamp, |
| lowWatermark |
| ); |
| } |
| |
| return new ReadOnlyTransactionImpl(this, timestampTracker, txId, localNodeId, readTimestamp); |
| } |
| |
| /** |
|  * Calculates the base read timestamp of a read-only transaction: the begin timestamp moved into the past by the idle safe time |
|  * propagation period plus the maximum clock skew. |
|  * |
|  * @param beginTx Begin transaction timestamp. |
|  * @return Current read timestamp. |
|  */ |
| private HybridTimestamp currentReadTimestamp(HybridTimestamp beginTx) { |
|     long lagMillis = idleSafeTimePropagationPeriodMsSupplier.getAsLong() + clockService.maxClockSkewMillis(); |
| |
|     return beginTx.subtractPhysicalTime(lagMillis); |
| } |
| |
| /** Reads the transaction state meta from the volatile storage under the busy lock. */ |
| @Override |
| public @Nullable TxStateMeta stateMeta(UUID txId) { |
|     return inBusyLock(busyLock, () -> { |
|         return txStateVolatileStorage.state(txId); |
|     }); |
| } |
| |
| @Override |
| public @Nullable <T extends TxStateMeta> T updateTxMeta(UUID txId, Function<@Nullable TxStateMeta, TxStateMeta> updater) { |
| return txStateVolatileStorage.updateMeta(txId, updater); |
| } |
| |
| /** |
|  * Records the final state of a transaction locally: bumps the finished counter, advances the observable timestamp on commit, |
|  * stores the final state while keeping the previously known coordinator, commit partition and commit timestamp, and decrements |
|  * the read-write transaction count. |
|  */ |
| @Override |
| public void finishFull(HybridTimestampTracker timestampTracker, UUID txId, boolean commit) { |
|     finishedTxs.incrementAndGet(); |
| |
|     if (commit) { |
|         timestampTracker.update(clockService.now()); |
|     } |
| |
|     TxState finalState = commit ? COMMITTED : ABORTED; |
| |
|     updateTxMeta(txId, previous -> { |
|         // Carry over whatever is already known about the transaction; nothing is known if there was no meta. |
|         var coordinatorId = previous == null ? null : previous.txCoordinatorId(); |
|         var commitPartitionId = previous == null ? null : previous.commitPartitionId(); |
|         var commitTs = previous == null ? null : previous.commitTimestamp(); |
| |
|         return new TxStateMeta(finalState, coordinatorId, commitPartitionId, commitTs); |
|     }); |
| |
|     decrementRwTxCount(txId); |
| } |
| |
| /** |
|  * Produces a commit timestamp. |
|  * |
|  * @param commit Whether the transaction is being committed. |
|  * @return Current clock value for a commit, {@code null} for an abort. |
|  */ |
| private @Nullable HybridTimestamp commitTimestamp(boolean commit) { |
|     if (!commit) { |
|         return null; |
|     } |
| |
|     return clockService.now(); |
| } |
| |
| @Override |
| public CompletableFuture<Void> finish( |
|         HybridTimestampTracker observableTimestampTracker, |
|         TablePartitionId commitPartition, |
|         boolean commitIntent, |
|         Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups, |
|         UUID txId |
| ) { |
|     LOG.debug("Finish [commit={}, txId={}, groups={}].", commitIntent, txId, enlistedGroups); |
| |
|     finishedTxs.incrementAndGet(); |
| |
|     assert enlistedGroups != null; |
| |
|     if (enlistedGroups.isEmpty()) { |
|         // Nothing was enlisted, so only the local state needs updating - the tx is already marked as finished. |
|         updateTxMeta(txId, ignored -> new TxStateMeta( |
|                 commitIntent ? COMMITTED : ABORTED, |
|                 localNodeId, |
|                 commitPartition, |
|                 commitTimestamp(commitIntent) |
|         )); |
| |
|         decrementRwTxCount(txId); |
| |
|         return nullCompletedFuture(); |
|     } |
| |
|     // The finishing state meta is put into the local map, so that all concurrent operations that read the tx state using a read |
|     // timestamp can see that this transaction is finishing (e.g. see TransactionStateResolver#processTxStateRequest). |
|     // None of them is able to update the node's clock with a read timestamp any more, so the commit timestamp created later is |
|     // greater than all the read timestamps processed before. |
|     // Every concurrent operation will use the finish future from the finishing state meta and will get only the final |
|     // transaction state after the transaction is finished. |
| |
|     // The current tx state is checked first to guarantee txFinish idempotence. |
|     TxStateMeta currentMeta = stateMeta(txId); |
| |
|     TxStateMetaFinishing finishingStateMeta = currentMeta != null |
|             ? currentMeta.finishing() |
|             : new TxStateMetaFinishing(null, commitPartition); |
| |
|     TxStateMeta installedMeta = updateTxMeta(txId, ignored -> finishingStateMeta); |
| |
|     // A different instance means we failed to CAS the state, someone else did it. |
|     if (installedMeta != finishingStateMeta) { |
|         if (installedMeta.txState() != FINISHING) { |
|             // The tx has already been finished. Check whether it finished with the same outcome. |
|             return checkTxOutcome(commitIntent, txId, installedMeta); |
|         } |
| |
|         // FINISHING means that someone else is in the middle of finishing this tx: wait for that attempt and check its outcome. |
|         TxStateMetaFinishing concurrentFinish = (TxStateMetaFinishing) installedMeta; |
| |
|         return concurrentFinish.txFinishFuture() |
|                 .thenCompose(meta -> checkTxOutcome(commitIntent, txId, meta)); |
|     } |
| |
|     ReadWriteTxContext txContext = transactionInflights.lockTxForNewUpdates(txId, enlistedGroups); |
| |
|     // Wait for commit acks first, then proceed with the finish request. |
|     CompletableFuture<Void> finishFuture = txContext.performFinish( |
|             commitIntent, |
|             commit -> prepareFinish( |
|                     observableTimestampTracker, |
|                     commitPartition, |
|                     commit, |
|                     enlistedGroups, |
|                     txId, |
|                     finishingStateMeta.txFinishFuture() |
|             ) |
|     ); |
| |
|     return finishFuture |
|             .thenAccept(unused -> { |
|                 if (localNodeId.equals(finishingStateMeta.txCoordinatorId())) { |
|                     decrementRwTxCount(txId); |
|                 } |
|             }) |
|             .whenComplete((unused, throwable) -> transactionInflights.removeTxContext(txId)); |
| } |
| |
| /** |
|  * Checks that an already known transaction outcome matches the requested one. |
|  * |
|  * @param commit Requested outcome: {@code true} for commit, {@code false} for abort. |
|  * @param txId Transaction id. |
|  * @param stateMeta Known transaction meta. |
|  * @return Completed future if the outcomes match, otherwise a future failed with {@link MismatchingTransactionOutcomeException}. |
|  */ |
| private static CompletableFuture<Void> checkTxOutcome(boolean commit, UUID txId, TransactionMeta stateMeta) { |
|     boolean committed = stateMeta.txState() == COMMITTED; |
| |
|     if (committed != commit) { |
|         return failedFuture(new MismatchingTransactionOutcomeException( |
|                 "Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + stateMeta.txState() + "].", |
|                 new TransactionResult(stateMeta.txState(), stateMeta.commitTimestamp())) |
|         ); |
|     } |
| |
|     return nullCompletedFuture(); |
| } |
| |
| /** |
|  * Creates the commit timestamp, verifies it against the enlisted primaries (for a commit) and then runs the durable finish. |
|  * If the verification fails, the transaction is finished as aborted and the verification error is returned to the caller. |
|  */ |
| private CompletableFuture<Void> prepareFinish( |
|         HybridTimestampTracker observableTimestampTracker, |
|         TablePartitionId commitPartition, |
|         boolean commit, |
|         Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups, |
|         UUID txId, |
|         CompletableFuture<TransactionMeta> txFinishFuture |
| ) { |
|     HybridTimestamp commitTimestamp = commitTimestamp(commit); |
| |
|     // In case of commit it's required to check whether current primaries are still the same that were enlisted and whether |
|     // given primaries are not expired or, in other words, whether commitTimestamp is less or equal to the enlisted primaries |
|     // expiration timestamps. |
|     CompletableFuture<Void> verificationFuture = |
|             commit ? verifyCommitTimestamp(enlistedGroups, commitTimestamp) : nullCompletedFuture(); |
| |
|     CompletableFuture<CompletableFuture<Void>> finishStage = verificationFuture.handle((ignored, verificationError) -> { |
|         // A failed verification downgrades the commit to an abort. |
|         boolean verifiedCommit = verificationError == null && commit; |
| |
|         Map<ReplicationGroupId, String> replicationGroupIds = enlistedGroups.entrySet().stream() |
|                 .collect(Collectors.toMap(Entry::getKey, enlisted -> enlisted.getValue().get1().name())); |
| |
|         return durableFinish( |
|                 observableTimestampTracker, |
|                 commitPartition, |
|                 verifiedCommit, |
|                 replicationGroupIds, |
|                 txId, |
|                 commitTimestamp, |
|                 txFinishFuture |
|         ); |
|     }); |
| |
|     return finishStage |
|             .thenCompose(durableFinishFuture -> durableFinishFuture) |
|             // Verification future is added in order to share the proper verification exception with the client. |
|             .thenCompose(ignored -> verificationFuture); |
| } |
| |
| /** |
|  * Durable finish request. |
|  * |
|  * <p>Awaits the primary replica of the commit partition and sends it the finish request. On a recoverable failure the whole |
|  * operation is re-submitted on the partition operations executor; on an outcome mismatch the actual transaction result is |
|  * stored locally and the finish future is completed with it before the error is propagated. |
|  */ |
| private CompletableFuture<Void> durableFinish( |
|         HybridTimestampTracker observableTimestampTracker, |
|         TablePartitionId commitPartition, |
|         boolean commit, |
|         Map<ReplicationGroupId, String> replicationGroupIds, |
|         UUID txId, |
|         HybridTimestamp commitTimestamp, |
|         CompletableFuture<TransactionMeta> txFinishFuture |
| ) { |
|     return inBusyLockAsync(busyLock, () -> placementDriverHelper.awaitPrimaryReplicaWithExceptionHandling(commitPartition) |
|             .thenCompose(meta -> |
|                     makeFinishRequest( |
|                             observableTimestampTracker, |
|                             commitPartition, |
|                             meta.getLeaseholder(), |
|                             meta.getStartTime().longValue(), |
|                             commit, |
|                             replicationGroupIds, |
|                             txId, |
|                             commitTimestamp, |
|                             txFinishFuture |
|                     )) |
|             .handle((res, ex) -> { |
|                 if (ex == null) { |
|                     return CompletableFutures.<Void>nullCompletedFuture(); |
|                 } |
| |
|                 Throwable cause = ExceptionUtils.unwrapCause(ex); |
| |
|                 if (cause instanceof MismatchingTransactionOutcomeException) { |
|                     // The transaction is already finished with another outcome: remember the actual result locally. |
|                     TransactionResult result = ((MismatchingTransactionOutcomeException) cause).transactionResult(); |
| |
|                     TxStateMeta updatedMeta = updateTxMeta(txId, old -> |
|                             new TxStateMeta( |
|                                     result.transactionState(), |
|                                     old == null ? null : old.txCoordinatorId(), |
|                                     commitPartition, |
|                                     result.commitTimestamp() |
|                             ) |
|                     ); |
| |
|                     txFinishFuture.complete(updatedMeta); |
| |
|                     return CompletableFuture.<Void>failedFuture(cause); |
|                 } |
| |
|                 if (!TransactionFailureHandler.isRecoverable(cause)) { |
|                     LOG.warn("Failed to finish Tx [txId={}].", ex, txId); |
| |
|                     return CompletableFuture.<Void>failedFuture(cause); |
|                 } |
| |
|                 LOG.warn("Failed to finish Tx. The operation will be retried [txId={}].", ex, txId); |
| |
|                 return supplyAsync(() -> durableFinish( |
|                         observableTimestampTracker, |
|                         commitPartition, |
|                         commit, |
|                         replicationGroupIds, |
|                         txId, |
|                         commitTimestamp, |
|                         txFinishFuture |
|                 ), partitionOperationsExecutor).thenCompose(identity()); |
|             }) |
|             .thenCompose(identity())); |
| } |
| |
| /** |
|  * Sends the finish request to the primary replica of the commit partition and applies the result locally. |
|  * |
|  * <p>When the response arrives, the result is validated against the requested outcome, the local transaction meta is updated |
|  * with the resulting state and commit timestamp, {@code txFinishFuture} is completed with the updated meta and, for a commit, |
|  * the observable timestamp is advanced to the commit timestamp. |
|  * |
|  * @param observableTimestampTracker Observable timestamp tracker. |
|  * @param commitPartition Commit partition id. |
|  * @param primaryConsistentId Consistent id of the node the request is sent to. |
|  * @param enlistmentConsistencyToken Enlistment consistency token. |
|  * @param commit {@code true} for commit, {@code false} for abort. |
|  * @param replicationGroupIds Enlisted replication groups mapped to node names. |
|  * @param txId Transaction id. |
|  * @param commitTimestamp Commit timestamp. |
|  * @param txFinishFuture Future to complete with the final transaction meta. |
|  * @return Future of the operation; fails with {@link MismatchingTransactionOutcomeException} if the transaction has finished |
|  *         with an outcome other than the requested one. |
|  */ |
| private CompletableFuture<Void> makeFinishRequest( |
|         HybridTimestampTracker observableTimestampTracker, |
|         TablePartitionId commitPartition, |
|         String primaryConsistentId, |
|         Long enlistmentConsistencyToken, |
|         boolean commit, |
|         Map<ReplicationGroupId, String> replicationGroupIds, |
|         UUID txId, |
|         HybridTimestamp commitTimestamp, |
|         CompletableFuture<TransactionMeta> txFinishFuture |
| ) { |
|     // The format string previously lacked a comma after the token placeholder and the closing "].". |
|     LOG.debug("Finish [partition={}, node={}, enlistmentConsistencyToken={}, commit={}, txId={}, groups={}].", |
|             commitPartition, primaryConsistentId, enlistmentConsistencyToken, commit, txId, replicationGroupIds); |
| |
|     return txMessageSender.finish( |
|                     primaryConsistentId, |
|                     commitPartition, |
|                     replicationGroupIds, |
|                     txId, |
|                     enlistmentConsistencyToken, |
|                     commit, |
|                     commitTimestamp |
|             ) |
|             .thenAccept(txResult -> { |
|                 validateTxFinishedAsExpected(commit, txId, txResult); |
| |
|                 TxStateMeta updatedMeta = updateTxMeta(txId, old -> |
|                         new TxStateMeta( |
|                                 txResult.transactionState(), |
|                                 localNodeId, |
|                                 old == null ? null : old.commitPartitionId(), |
|                                 txResult.commitTimestamp() |
|                         )); |
| |
|                 assert isFinalState(updatedMeta.txState()) : |
|                         "Unexpected transaction state [id=" + txId + ", state=" + updatedMeta.txState() + "]."; |
| |
|                 txFinishFuture.complete(updatedMeta); |
| |
|                 if (commit) { |
|                     observableTimestampTracker.update(commitTimestamp); |
|                 } |
|             }); |
| } |
| |
| /** |
|  * Verifies that the transaction result matches the requested outcome. |
|  * |
|  * @param commit Requested outcome: {@code true} for commit, {@code false} for abort. |
|  * @param txId Transaction id. |
|  * @param txResult Actual transaction result. |
|  * @throws MismatchingTransactionOutcomeException If the actual outcome differs from the requested one. |
|  */ |
| private static void validateTxFinishedAsExpected(boolean commit, UUID txId, TransactionResult txResult) { |
|     boolean committed = txResult.transactionState() == COMMITTED; |
| |
|     if (commit == committed) { |
|         return; |
|     } |
| |
|     LOG.error("Failed to finish a transaction that is already finished [txId={}, expectedState={}, actualState={}].", |
|             txId, |
|             commit ? COMMITTED : ABORTED, |
|             txResult.transactionState() |
|     ); |
| |
|     String message = |
|             "Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + txResult.transactionState() + "]."; |
| |
|     throw new MismatchingTransactionOutcomeException(message, txResult); |
| } |
| |
| @Override |
| public int finished() { |
| return finishedTxs.get(); |
| } |
| |
| @Override |
| public int pending() { |
| return startedTxs.get() - finishedTxs.get(); |
| } |
| |
| /** |
|  * Starts the manager under the busy lock: resolves the local node id, registers this instance as a message handler, starts the |
|  * internal helpers, subscribes to primary replica and low watermark events and reads the initial low watermark value. |
|  */ |
| @Override |
| public CompletableFuture<Void> startAsync() { |
|     return inBusyLockAsync(busyLock, () -> { |
|         // The local node id is available only after the network has started. |
|         localNodeId = topologyService.localMember().id(); |
| |
|         messagingService.addMessageHandler(ReplicaMessageGroup.class, this); |
| |
|         // Internal helpers. |
|         txStateVolatileStorage.start(); |
|         orphanDetector.start(txStateVolatileStorage, txConfig.abandonedCheckTs()); |
|         txCleanupRequestSender.start(); |
|         txCleanupRequestHandler.start(); |
| |
|         // Event subscriptions. |
|         placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, primaryReplicaExpiredListener); |
|         placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, primaryReplicaElectedListener); |
|         lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE, lowWatermarkChangedListener); |
| |
|         lowWatermarkValueReference.set(lowWatermark.getLowWatermark()); |
| |
|         return nullCompletedFuture(); |
|     }); |
| } |
| |
| /** Stops the orphan detector before the node is stopped. */ |
| @Override |
| public void beforeNodeStop() { |
|     // The remaining helpers are stopped in stopAsync(). |
|     orphanDetector.stop(); |
| } |
| |
| /** |
|  * Stops the manager. Only the first call does the work: it blocks the busy lock, stops the internal helpers, removes the event |
|  * listeners and shuts down the write intent switch pool. Subsequent calls return immediately. |
|  */ |
| @Override |
| public CompletableFuture<Void> stopAsync() { |
|     if (stopGuard.compareAndSet(false, true)) { |
|         busyLock.block(); |
| |
|         txStateVolatileStorage.stop(); |
|         txCleanupRequestHandler.stop(); |
| |
|         placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, primaryReplicaExpiredListener); |
|         placementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, primaryReplicaElectedListener); |
|         lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_BEFORE_CHANGE, lowWatermarkChangedListener); |
| |
|         shutdownAndAwaitTermination(writeIntentSwitchPool, 10, TimeUnit.SECONDS); |
|     } |
| |
|     return nullCompletedFuture(); |
| } |
| |
| /** Returns the lock manager this transaction manager was created with. */ |
| @Override |
| public LockManager lockManager() { |
|     return this.lockManager; |
| } |
| |
| /** Delegates the cleanup of the given partition-to-node mapping to the cleanup request sender. */ |
| @Override |
| public CompletableFuture<Void> cleanup( |
|         Map<TablePartitionId, String> enlistedPartitions, |
|         boolean commit, |
|         @Nullable HybridTimestamp commitTimestamp, |
|         UUID txId |
| ) { |
|     CompletableFuture<Void> cleanupFuture = txCleanupRequestSender.cleanup(enlistedPartitions, commit, commitTimestamp, txId); |
| |
|     return cleanupFuture; |
| } |
| |
| @Override |
| public CompletableFuture<Void> cleanup( |
| Collection<TablePartitionId> enlistedPartitions, |
| boolean commit, |
| @Nullable HybridTimestamp commitTimestamp, |
| UUID txId |
| ) { |
| return txCleanupRequestSender.cleanup(enlistedPartitions, commit, commitTimestamp, txId); |
| } |
| |
| @Override |
| public CompletableFuture<Void> cleanup(String node, UUID txId) { |
| return txCleanupRequestSender.cleanup(node, txId); |
| } |
| |
| @Override |
| public void vacuum() { |
| long vacuumObservationTimestamp = System.currentTimeMillis(); |
| |
| txStateVolatileStorage.vacuum(vacuumObservationTimestamp, txConfig.txnResourceTtl().value()); |
| } |
| |
| @Override |
| public CompletableFuture<Void> executeWriteIntentSwitchAsync(Runnable runnable) { |
| return runAsync(runnable, writeIntentSwitchPool); |
| } |
| |
| CompletableFuture<Void> completeReadOnlyTransactionFuture(TxIdAndTimestamp txIdAndTimestamp) { |
| finishedTxs.incrementAndGet(); |
| |
| CompletableFuture<Void> readOnlyTxFuture = readOnlyTxFutureById.remove(txIdAndTimestamp); |
| |
| assert readOnlyTxFuture != null : txIdAndTimestamp; |
| |
| readOnlyTxFuture.complete(null); |
| |
| UUID txId = txIdAndTimestamp.getTxId(); |
| |
| transactionInflights.markReadOnlyTxFinished(txId); |
| |
| return readOnlyTxFuture; |
| } |
| |
| private CompletableFuture<Boolean> onLwnChanged(ChangeLowWatermarkEventParameters parameters) { |
| return inBusyLockAsync(busyLock, () -> { |
| HybridTimestamp newLowWatermark = parameters.newLowWatermark(); |
| |
| increaseLowWatermarkValueReferenceBusy(newLowWatermark); |
| |
| TxIdAndTimestamp upperBound = new TxIdAndTimestamp(newLowWatermark, new UUID(Long.MAX_VALUE, Long.MAX_VALUE)); |
| |
| List<CompletableFuture<Void>> readOnlyTxFutures = List.copyOf(readOnlyTxFutureById.headMap(upperBound, true).values()); |
| |
| return allOf(readOnlyTxFutures.toArray(CompletableFuture[]::new)).thenApply(unused -> false); |
| }); |
| } |
| |
| @Override |
| public void onReceived(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId) { |
| if (!(message instanceof ReplicaResponse) || correlationId != null) { |
| return; |
| } |
| |
| // Ignore error responses here. A transaction will be rolled back in other place. |
| if (message instanceof ErrorReplicaResponse) { |
| return; |
| } |
| |
| // Process directly sent response. |
| ReplicaResponse response = (ReplicaResponse) message; |
| |
| Object result = response.result(); |
| |
| if (result instanceof UUID) { |
| transactionInflights.removeInflight((UUID) result); |
| } |
| |
| if (result instanceof WriteIntentSwitchReplicatedInfo) { |
| txCleanupRequestHandler.writeIntentSwitchReplicated((WriteIntentSwitchReplicatedInfo) result); |
| } |
| } |
| |
| /** |
| * Check whether previously enlisted primary replicas aren't expired and that commit timestamp is less or equal than primary replicas |
| * expiration timestamp. Given method will either complete result future with void or {@link PrimaryReplicaExpiredException} |
| * |
| * @param enlistedGroups enlisted primary replicas map from groupId to enlistment consistency token. |
| * @param commitTimestamp Commit timestamp. |
| * @return Verification future. |
| */ |
| private CompletableFuture<Void> verifyCommitTimestamp( |
| Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups, |
| HybridTimestamp commitTimestamp |
| ) { |
| var verificationFutures = new CompletableFuture[enlistedGroups.size()]; |
| int cnt = -1; |
| |
| for (Map.Entry<TablePartitionId, IgniteBiTuple<ClusterNode, Long>> enlistedGroup : enlistedGroups.entrySet()) { |
| TablePartitionId groupId = enlistedGroup.getKey(); |
| Long expectedEnlistmentConsistencyToken = enlistedGroup.getValue().get2(); |
| |
| verificationFutures[++cnt] = placementDriver.getPrimaryReplica(groupId, commitTimestamp) |
| .thenAccept(currentPrimaryReplica -> { |
| if (currentPrimaryReplica == null |
| || !expectedEnlistmentConsistencyToken.equals(currentPrimaryReplica.getStartTime().longValue()) |
| ) { |
| throw new PrimaryReplicaExpiredException( |
| groupId, |
| expectedEnlistmentConsistencyToken, |
| commitTimestamp, |
| currentPrimaryReplica |
| ); |
| } else { |
| assert commitTimestamp.compareTo(currentPrimaryReplica.getExpirationTime()) <= 0 : |
| format( |
| "Commit timestamp is greater than primary replica expiration timestamp:" |
| + " [groupId = {}, commit timestamp = {}, primary replica expiration timestamp = {}]", |
| groupId, commitTimestamp, currentPrimaryReplica.getExpirationTime()); |
| } |
| }); |
| } |
| |
| return allOf(verificationFutures); |
| } |
| |
| static class TransactionFailureHandler { |
| private static final Set<Class<? extends Throwable>> RECOVERABLE = Set.of( |
| TimeoutException.class, |
| IOException.class, |
| ReplicationException.class, |
| ReplicationTimeoutException.class, |
| PrimaryReplicaMissException.class |
| ); |
| |
| /** |
| * Check if the provided exception is recoverable. A recoverable transaction is the one that we can send a 'retry' request for. |
| * |
| * @param throwable Exception to test. |
| * @return {@code true} if recoverable, {@code false} otherwise. |
| */ |
| static boolean isRecoverable(Throwable throwable) { |
| if (throwable == null) { |
| return false; |
| } |
| |
| Throwable candidate = ExceptionUtils.unwrapCause(throwable); |
| |
| for (Class<? extends Throwable> recoverableClass : RECOVERABLE) { |
| if (recoverableClass.isAssignableFrom(candidate.getClass())) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| } |
| |
| private HybridTimestamp createBeginTimestampWithIncrementRwTxCounter() { |
| return localRwTxCounter.inUpdateRwTxCountLock(() -> { |
| HybridTimestamp beginTs = clockService.now(); |
| |
| localRwTxCounter.incrementRwTxCount(beginTs); |
| |
| return beginTs; |
| }); |
| } |
| |
| private void decrementRwTxCount(UUID txId) { |
| localRwTxCounter.inUpdateRwTxCountLock(() -> { |
| localRwTxCounter.decrementRwTxCount(beginTimestamp(txId)); |
| |
| return null; |
| }); |
| } |
| |
| private void increaseLowWatermarkValueReferenceBusy(HybridTimestamp newLowWatermark) { |
| lowWatermarkValueReference.updateAndGet(previousLowWatermark -> { |
| if (previousLowWatermark == null) { |
| return newLowWatermark; |
| } |
| |
| assert newLowWatermark.compareTo(previousLowWatermark) > 0 : |
| "lower watermark should be growing: [previous=" + previousLowWatermark + ", new=" + newLowWatermark + ']'; |
| |
| return newLowWatermark; |
| }); |
| } |
| } |