blob: 56f1813a78b2a5a3e70eace843b9af3585903622 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table.distributed.raft;
import static java.util.Objects.requireNonNull;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.table.distributed.TableUtils.indexIdsAtRwTxBeginTs;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.util.CollectionUtils.last;
import java.io.Serializable;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.SafeTimeReorderException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.ReadCommand;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.raft.service.BeforeApplyHandler;
import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.CommittedConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowUpgrader;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.Locker;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.command.BuildIndexCommand;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.command.WriteIntentSwitchCommand;
import org.apache.ignite.internal.tx.TransactionResult;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.UpdateCommandResult;
import org.apache.ignite.internal.tx.message.VacuumTxStatesCommand;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
* Partition command handler.
*/
public class PartitionListener implements RaftGroupListener, BeforeApplyHandler {
/** Logger. */
private static final IgniteLogger LOG = Loggers.forClass(PartitionListener.class);
/** Transaction manager. */
private final TxManager txManager;
/** Partition storage with access to MV data of a partition. */
private final PartitionDataStorage storage;
/** Handler that processes storage updates. */
private final StorageUpdateHandler storageUpdateHandler;
/** Storage of transaction metadata. */
private final TxStateStorage txStateStorage;
/** Safe time tracker. */
private final PendingComparableValuesTracker<HybridTimestamp, Void> safeTime;
/** Storage index tracker. */
private final PendingComparableValuesTracker<Long, Void> storageIndexTracker;
/** Is used in order to detect and retry safe time reordering within onBeforeApply. */
private volatile long maxObservableSafeTime = -1;
/** Is used in order to assert safe time reordering within onWrite. */
private long maxObservableSafeTimeVerifier = -1;
private final CatalogService catalogService;
private final SchemaRegistry schemaRegistry;
private final ClockService clockService;
/**
* The constructor.
*
* @param txManager Transaction manager.
* @param partitionDataStorage The storage.
* @param safeTime Safe time tracker.
* @param storageIndexTracker Storage index tracker.
* @param catalogService Catalog service.
*/
public PartitionListener(
TxManager txManager,
PartitionDataStorage partitionDataStorage,
StorageUpdateHandler storageUpdateHandler,
TxStateStorage txStateStorage,
PendingComparableValuesTracker<HybridTimestamp, Void> safeTime,
PendingComparableValuesTracker<Long, Void> storageIndexTracker,
CatalogService catalogService,
SchemaRegistry schemaRegistry,
ClockService clockService
) {
this.txManager = txManager;
this.storage = partitionDataStorage;
this.storageUpdateHandler = storageUpdateHandler;
this.txStateStorage = txStateStorage;
this.safeTime = safeTime;
this.storageIndexTracker = storageIndexTracker;
this.catalogService = catalogService;
this.schemaRegistry = schemaRegistry;
this.clockService = clockService;
}
@Override
public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
iterator.forEachRemaining((CommandClosure<? extends ReadCommand> clo) -> {
Command command = clo.command();
assert false : "No read commands expected, [cmd=" + command + ']';
});
}
@Override
public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
iterator.forEachRemaining((CommandClosure<? extends WriteCommand> clo) -> {
Command command = clo.command();
if (command instanceof SafeTimePropagatingCommand) {
SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) command;
long proposedSafeTime = cmd.safeTime().longValue();
// Because of clock.tick it's guaranteed that two different commands will have different safe timestamps.
// maxObservableSafeTime may match proposedSafeTime only if it is the command that was previously validated and then retried
// by raft client because of either TimeoutException or inner raft server recoverable exception.
assert proposedSafeTime >= maxObservableSafeTimeVerifier : "Safe time reordering detected [current="
+ maxObservableSafeTimeVerifier + ", proposed=" + proposedSafeTime + "]";
maxObservableSafeTimeVerifier = proposedSafeTime;
}
long commandIndex = clo.index();
long commandTerm = clo.term();
// We choose the minimum applied index, since we choose it (the minimum one) on local recovery so as not to lose the data for
// one of the storages.
long storagesAppliedIndex = Math.min(storage.lastAppliedIndex(), txStateStorage.lastAppliedIndex());
assert commandIndex > storagesAppliedIndex :
"Write command must have an index greater than that of storages [commandIndex=" + commandIndex
+ ", mvAppliedIndex=" + storage.lastAppliedIndex()
+ ", txStateAppliedIndex=" + txStateStorage.lastAppliedIndex() + "]";
Serializable result = null;
// NB: Make sure that ANY command we accept here updates lastAppliedIndex+term info in one of the underlying
// storages!
// Otherwise, a gap between lastAppliedIndex from the point of view of JRaft and our storage might appear.
// If a leader has such a gap, and does doSnapshot(), it will subsequently truncate its log too aggressively
// in comparison with 'snapshot' state stored in our storages; and if we install a snapshot from our storages
// to a follower at this point, for a subsequent AppendEntries the leader will not be able to get prevLogTerm
// (because it's already truncated in the leader's log), so it will have to install a snapshot again, and then
// repeat same thing over and over again.
storage.acquirePartitionSnapshotsReadLock();
try {
if (command instanceof UpdateCommand) {
result = handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm);
} else if (command instanceof UpdateAllCommand) {
result = handleUpdateAllCommand((UpdateAllCommand) command, commandIndex, commandTerm);
} else if (command instanceof FinishTxCommand) {
result = handleFinishTxCommand((FinishTxCommand) command, commandIndex, commandTerm);
} else if (command instanceof WriteIntentSwitchCommand) {
handleWriteIntentSwitchCommand((WriteIntentSwitchCommand) command, commandIndex, commandTerm);
} else if (command instanceof SafeTimeSyncCommand) {
handleSafeTimeSyncCommand((SafeTimeSyncCommand) command, commandIndex, commandTerm);
} else if (command instanceof BuildIndexCommand) {
handleBuildIndexCommand((BuildIndexCommand) command, commandIndex, commandTerm);
} else if (command instanceof PrimaryReplicaChangeCommand) {
handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, commandIndex, commandTerm);
} else if (command instanceof VacuumTxStatesCommand) {
handleVacuumTxStatesCommand((VacuumTxStatesCommand) command, commandIndex, commandTerm);
} else {
assert false : "Command was not found [cmd=" + command + ']';
}
} catch (IgniteInternalException e) {
result = e;
} catch (CompletionException e) {
result = e.getCause();
} catch (Throwable t) {
LOG.error(
"Unknown error while processing command [commandIndex={}, commandTerm={}, command={}]",
t,
clo.index(), clo.index(), command
);
throw t;
} finally {
storage.releasePartitionSnapshotsReadLock();
}
// Completing the closure out of the partition snapshots lock to reduce possibility of deadlocks as it might
// trigger other actions taking same locks.
clo.result(result);
if (command instanceof SafeTimePropagatingCommand) {
SafeTimePropagatingCommand safeTimePropagatingCommand = (SafeTimePropagatingCommand) command;
assert safeTimePropagatingCommand.safeTime() != null;
synchronized (safeTime) {
updateTrackerIgnoringTrackerClosedException(safeTime, safeTimePropagatingCommand.safeTime());
}
}
updateTrackerIgnoringTrackerClosedException(storageIndexTracker, commandIndex);
});
}
/**
* Handler for the {@link UpdateCommand}.
*
* @param cmd Command.
* @param commandIndex Index of the RAFT command.
* @param commandTerm Term of the RAFT command.
*/
private UpdateCommandResult handleUpdateCommand(UpdateCommand cmd, long commandIndex, long commandTerm) {
// Skips the write command because the storage has already executed it.
if (commandIndex <= storage.lastAppliedIndex()) {
return new UpdateCommandResult(true);
}
if (cmd.leaseStartTime() != null) {
long leaseStartTime = requireNonNull(cmd.leaseStartTime(), "Inconsistent lease information in command [cmd=" + cmd + "].");
long storageLeaseStartTime = storage.leaseStartTime();
if (leaseStartTime != storageLeaseStartTime) {
return new UpdateCommandResult(false, storageLeaseStartTime);
}
}
UUID txId = cmd.txId();
// TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper storage/raft index handling is required.
synchronized (safeTime) {
if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
storageUpdateHandler.handleUpdate(
txId,
cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(),
cmd.rowToUpdate(),
!cmd.full(),
() -> storage.lastApplied(commandIndex, commandTerm),
cmd.full() ? cmd.safeTime() : null,
cmd.lastCommitTimestamp(),
indexIdsAtRwTxBeginTs(catalogService, txId, storage.tableId())
);
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
} else {
// We MUST bump information about last updated index+term.
// See a comment in #onWrite() for explanation.
advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
}
}
replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime() : null, cmd.full());
return new UpdateCommandResult(true);
}
/**
* Handler for the {@link UpdateAllCommand}.
*
* @param cmd Command.
* @param commandIndex Index of the RAFT command.
* @param commandTerm Term of the RAFT command.
*/
private UpdateCommandResult handleUpdateAllCommand(UpdateAllCommand cmd, long commandIndex, long commandTerm) {
// Skips the write command because the storage has already executed it.
if (commandIndex <= storage.lastAppliedIndex()) {
return new UpdateCommandResult(true);
}
if (cmd.leaseStartTime() != null) {
long leaseStartTime = requireNonNull(cmd.leaseStartTime(), "Inconsistent lease information in command [cmd=" + cmd + "].");
long storageLeaseStartTime = storage.leaseStartTime();
if (leaseStartTime != storageLeaseStartTime) {
return new UpdateCommandResult(false, storageLeaseStartTime);
}
}
UUID txId = cmd.txId();
// TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper storage/raft index handling is required.
synchronized (safeTime) {
if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
storageUpdateHandler.handleUpdateAll(
txId,
cmd.rowsToUpdate(),
cmd.tablePartitionId().asTablePartitionId(),
!cmd.full(),
() -> storage.lastApplied(commandIndex, commandTerm),
cmd.full() ? cmd.safeTime() : null,
indexIdsAtRwTxBeginTs(catalogService, txId, storage.tableId())
);
updateTrackerIgnoringTrackerClosedException(safeTime, cmd.safeTime());
} else {
// We MUST bump information about last updated index+term.
// See a comment in #onWrite() for explanation.
advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
}
}
replicaTouch(txId, cmd.txCoordinatorId(), cmd.full() ? cmd.safeTime() : null, cmd.full());
return new UpdateCommandResult(true);
}
/**
* Handler for the {@link FinishTxCommand}.
*
* @param cmd Command.
* @param commandIndex Index of the RAFT command.
* @param commandTerm Term of the RAFT command.
* @return The actually stored transaction state {@link TransactionResult}.
* @throws IgniteInternalException if an exception occurred during a transaction state change.
*/
private @Nullable TransactionResult handleFinishTxCommand(FinishTxCommand cmd, long commandIndex, long commandTerm)
throws IgniteInternalException {
// Skips the write command because the storage has already executed it.
if (commandIndex <= txStateStorage.lastAppliedIndex()) {
return null;
}
UUID txId = cmd.txId();
TxState stateToSet = cmd.commit() ? COMMITTED : ABORTED;
TxMeta txMetaToSet = new TxMeta(
stateToSet,
fromPartitionIdMessage(cmd.partitionIds()),
cmd.commitTimestamp()
);
TxMeta txMetaBeforeCas = txStateStorage.get(txId);
boolean txStateChangeRes = txStateStorage.compareAndSet(
txId,
null,
txMetaToSet,
commandIndex,
commandTerm
);
// Assume that we handle the finish command only on the commit partition.
TablePartitionId commitPartitionId = new TablePartitionId(storage.tableId(), storage.partitionId());
markFinished(txId, cmd.commit(), cmd.commitTimestamp(), commitPartitionId);
LOG.debug("Finish the transaction txId = {}, state = {}, txStateChangeRes = {}", txId, txMetaToSet, txStateChangeRes);
if (!txStateChangeRes) {
onTxStateStorageCasFail(txId, txMetaBeforeCas, txMetaToSet);
}
return new TransactionResult(stateToSet, cmd.commitTimestamp());
}
private static List<TablePartitionId> fromPartitionIdMessage(List<TablePartitionIdMessage> partitionIds) {
List<TablePartitionId> list = new ArrayList<>(partitionIds.size());
for (TablePartitionIdMessage partitionIdMessage : partitionIds) {
list.add(partitionIdMessage.asTablePartitionId());
}
return list;
}
/**
* Handler for the {@link WriteIntentSwitchCommand}.
*
* @param cmd Command.
* @param commandIndex Index of the RAFT command.
* @param commandTerm Term of the RAFT command.
*/
private void handleWriteIntentSwitchCommand(WriteIntentSwitchCommand cmd, long commandIndex, long commandTerm) {
// Skips the write command because the storage has already executed it.
if (commandIndex <= storage.lastAppliedIndex()) {
return;
}
UUID txId = cmd.txId();
markFinished(txId, cmd.commit(), cmd.commitTimestamp(), null);
storageUpdateHandler.switchWriteIntents(
txId,
cmd.commit(),
cmd.commitTimestamp(),
() -> storage.lastApplied(commandIndex, commandTerm),
indexIdsAtRwTxBeginTs(catalogService, txId, storage.tableId())
);
}
/**
* Handler for the {@link SafeTimeSyncCommand}.
*
* @param cmd Command.
* @param commandIndex RAFT index of the command.
* @param commandTerm RAFT term of the command.
*/
private void handleSafeTimeSyncCommand(SafeTimeSyncCommand cmd, long commandIndex, long commandTerm) {
// Skips the write command because the storage has already executed it.
if (commandIndex <= storage.lastAppliedIndex()) {
return;
}
// We MUST bump information about last updated index+term.
// See a comment in #onWrite() for explanation.
advanceLastAppliedIndexConsistently(commandIndex, commandTerm);
}
private void advanceLastAppliedIndexConsistently(long commandIndex, long commandTerm) {
storage.runConsistently(locker -> {
storage.lastApplied(commandIndex, commandTerm);
return null;
});
}
@Override
public void onConfigurationCommitted(CommittedConfiguration config) {
// Skips the update because the storage has already recorded it.
if (config.index() <= storage.lastAppliedIndex()) {
return;
}
// Do the update under lock to make sure no snapshot is started concurrently with this update.
// Note that we do not need to protect from a concurrent command execution by this listener because
// configuration is committed in the same thread in which commands are applied.
storage.acquirePartitionSnapshotsReadLock();
try {
storage.runConsistently(locker -> {
storage.committedGroupConfiguration(
new RaftGroupConfiguration(config.peers(), config.learners(), config.oldPeers(), config.oldLearners())
);
storage.lastApplied(config.index(), config.term());
updateTrackerIgnoringTrackerClosedException(storageIndexTracker, config.index());
return null;
});
} finally {
storage.releasePartitionSnapshotsReadLock();
}
}
@Override
public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
// The max index here is required for local recovery and a possible scenario
// of false node failure when we actually have all required data. This might happen because we use the minimal index
// among storages on a node restart.
// Let's consider a more detailed example:
// 1) We don't propagate the maximal lastAppliedIndex among storages, and onSnapshotSave finishes, it leads to the raft log
// truncation until the maximal lastAppliedIndex.
// 2) Unexpected cluster restart happens.
// 3) Local recovery of a node is started, where we request data from the minimal lastAppliedIndex among storages, because
// some data for some node might not have been flushed before unexpected cluster restart.
// 4) When we try to restore data starting from the minimal lastAppliedIndex, we come to the situation
// that a raft node doesn't have such data, because the truncation until the maximal lastAppliedIndex from 1) has happened.
// 5) Node cannot finish local recovery.
long maxLastAppliedIndex = Math.max(storage.lastAppliedIndex(), txStateStorage.lastAppliedIndex());
long maxLastAppliedTerm = Math.max(storage.lastAppliedTerm(), txStateStorage.lastAppliedTerm());
storage.runConsistently(locker -> {
storage.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm);
return null;
});
txStateStorage.lastApplied(maxLastAppliedIndex, maxLastAppliedTerm);
updateTrackerIgnoringTrackerClosedException(storageIndexTracker, maxLastAppliedIndex);
CompletableFuture.allOf(storage.flush(), txStateStorage.flush())
.whenComplete((unused, throwable) -> doneClo.accept(throwable));
}
@Override
public boolean onSnapshotLoad(Path path) {
return true;
}
@Override
public void onShutdown() {
storage.close();
}
@Override
public void onLeaderStart() {
maxObservableSafeTime = clockService.now().addPhysicalTime(clockService.maxClockSkewMillis()).longValue();
}
@Override
public boolean onBeforeApply(Command command) {
// This method is synchronized by replication group specific monitor, see ActionRequestProcessor#handleRequest.
if (command instanceof SafeTimePropagatingCommand) {
SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) command;
long proposedSafeTime = cmd.safeTime().longValue();
// Because of clock.tick it's guaranteed that two different commands will have different safe timestamps.
// maxObservableSafeTime may match proposedSafeTime only if it is the command that was previously validated and then retried
// by raft client because of either TimeoutException or inner raft server recoverable exception.
if (proposedSafeTime >= maxObservableSafeTime) {
maxObservableSafeTime = proposedSafeTime;
} else {
throw new SafeTimeReorderException();
}
}
return false;
}
/**
* Returns underlying storage.
*/
@TestOnly
public MvPartitionStorage getMvStorage() {
return storage.getStorage();
}
/**
* Handler for the {@link BuildIndexCommand}.
*
* @param cmd Command.
* @param commandIndex RAFT index of the command.
* @param commandTerm RAFT term of the command.
*/
void handleBuildIndexCommand(BuildIndexCommand cmd, long commandIndex, long commandTerm) {
// Skips the write command because the storage has already executed it.
if (commandIndex <= storage.lastAppliedIndex()) {
return;
}
BuildIndexRowVersionChooser rowVersionChooser = createBuildIndexRowVersionChooser(cmd);
BinaryRowUpgrader binaryRowUpgrader = createBinaryRowUpgrader(cmd);
storage.runConsistently(locker -> {
List<UUID> rowUuids = new ArrayList<>(cmd.rowIds());
// Natural UUID order matches RowId order within the same partition.
Collections.sort(rowUuids);
Stream<BinaryRowAndRowId> buildIndexRowStream = createBuildIndexRowStream(
rowUuids,
locker,
rowVersionChooser,
binaryRowUpgrader
);
RowId nextRowIdToBuild = cmd.finish() ? null : toRowId(requireNonNull(last(rowUuids))).increment();
storageUpdateHandler.getIndexUpdateHandler().buildIndex(cmd.indexId(), buildIndexRowStream, nextRowIdToBuild);
storage.lastApplied(commandIndex, commandTerm);
return null;
});
if (cmd.finish()) {
LOG.info(
"Finish building the index: [tableId={}, partitionId={}, indexId={}]",
storage.tableId(), storage.partitionId(), cmd.indexId()
);
}
}
/**
* Handler for {@link PrimaryReplicaChangeCommand}.
*
* @param cmd Command.
* @param commandIndex Command index.
* @param commandTerm Command term.
*/
private void handlePrimaryReplicaChangeCommand(PrimaryReplicaChangeCommand cmd, long commandIndex, long commandTerm) {
// Skips the write command because the storage has already executed it.
if (commandIndex <= storage.lastAppliedIndex()) {
return;
}
storage.runConsistently(locker -> {
storage.updateLease(cmd.leaseStartTime());
storage.lastApplied(commandIndex, commandTerm);
return null;
});
}
/**
* Handler for {@link VacuumTxStatesCommand}.
*
* @param cmd Command.
* @param commandIndex Command index.
* @param commandTerm Command term.
*/
private void handleVacuumTxStatesCommand(VacuumTxStatesCommand cmd, long commandIndex, long commandTerm) {
// Skips the write command because the storage has already executed it.
if (commandIndex <= storage.lastAppliedIndex()) {
return;
}
for (UUID txId : cmd.txIds()) {
txStateStorage.remove(txId, commandIndex, commandTerm);
}
}
private static void onTxStateStorageCasFail(UUID txId, TxMeta txMetaBeforeCas, TxMeta txMetaToSet) {
String errorMsg = format("Failed to update tx state in the storage, transaction txId = {} because of inconsistent state,"
+ " expected state = {}, state to set = {}",
txId,
txMetaBeforeCas,
txMetaToSet
);
IgniteInternalException stateChangeException =
new UnexpectedTransactionStateException(
errorMsg,
new TransactionResult(txMetaBeforeCas.txState(), txMetaBeforeCas.commitTimestamp())
);
// Exception is explicitly logged because otherwise it can be lost if it did not occur on the leader.
LOG.error(errorMsg);
throw stateChangeException;
}
private static <T extends Comparable<T>> void updateTrackerIgnoringTrackerClosedException(
PendingComparableValuesTracker<T, Void> tracker,
T newValue
) {
try {
tracker.update(newValue, null);
} catch (TrackerClosedException ignored) {
// No-op.
}
}
private Stream<BinaryRowAndRowId> createBuildIndexRowStream(
List<UUID> rowUuids,
Locker locker,
BuildIndexRowVersionChooser rowVersionChooser,
BinaryRowUpgrader binaryRowUpgrader
) {
return rowUuids.stream()
.map(this::toRowId)
.peek(locker::lock)
.map(rowVersionChooser::chooseForBuildIndex)
.flatMap(Collection::stream)
.map(binaryRowAndRowId -> upgradeBinaryRow(binaryRowUpgrader, binaryRowAndRowId));
}
private RowId toRowId(UUID rowUuid) {
return new RowId(storageUpdateHandler.partitionId(), rowUuid);
}
private void replicaTouch(UUID txId, String txCoordinatorId, HybridTimestamp commitTimestamp, boolean full) {
txManager.updateTxMeta(txId, old -> new TxStateMeta(
full ? COMMITTED : PENDING,
txCoordinatorId,
old == null ? null : old.commitPartitionId(),
full ? commitTimestamp : null
));
}
private void markFinished(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp, @Nullable TablePartitionId partId) {
txManager.updateTxMeta(txId, old -> new TxStateMeta(
commit ? COMMITTED : ABORTED,
old == null ? null : old.txCoordinatorId(),
old == null ? partId : old.commitPartitionId(),
commit ? commitTimestamp : null
));
}
private BuildIndexRowVersionChooser createBuildIndexRowVersionChooser(BuildIndexCommand command) {
int indexCreationCatalogVersion = command.creationCatalogVersion();
Catalog indexCreationCatalog = catalogService.catalog(indexCreationCatalogVersion);
assert indexCreationCatalog != null : "indexId=" + command.indexId() + ", catalogVersion=" + indexCreationCatalogVersion;
int startBuildingIndexCatalogVersion = command.requiredCatalogVersion();
Catalog startBuildingIndexCatalog = catalogService.catalog(startBuildingIndexCatalogVersion);
assert startBuildingIndexCatalog != null : "indexId=" + command.indexId() + ", catalogVersion=" + startBuildingIndexCatalogVersion;
return new BuildIndexRowVersionChooser(storage, indexCreationCatalog.time(), startBuildingIndexCatalog.time());
}
private BinaryRowUpgrader createBinaryRowUpgrader(BuildIndexCommand command) {
int indexCreationCatalogVersion = command.creationCatalogVersion();
CatalogIndexDescriptor indexDescriptor = catalogService.index(command.indexId(), indexCreationCatalogVersion);
assert indexDescriptor != null : "indexId=" + command.indexId() + ", catalogVersion=" + indexCreationCatalogVersion;
CatalogTableDescriptor tableDescriptor = catalogService.table(indexDescriptor.tableId(), indexCreationCatalogVersion);
assert tableDescriptor != null : "tableId=" + indexDescriptor.tableId() + ", catalogVersion=" + indexCreationCatalogVersion;
SchemaDescriptor schema = schemaRegistry.schema(tableDescriptor.tableVersion());
return new BinaryRowUpgrader(schemaRegistry, schema);
}
private static BinaryRowAndRowId upgradeBinaryRow(BinaryRowUpgrader upgrader, BinaryRowAndRowId source) {
BinaryRow sourceBinaryRow = source.binaryRow();
BinaryRow upgradedBinaryRow = upgrader.upgrade(sourceBinaryRow);
return upgradedBinaryRow == sourceBinaryRow ? source : new BinaryRowAndRowId(upgradedBinaryRow, source.rowId());
}
}