/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ErrorTimestampAwareReplicaResponse;
import org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.testframework.flow.TestFlowUtils;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/**
 * Integration tests for transaction recovery: abandoned transactions, coordinator failures, and cursor cleanup.
*/
@ExtendWith(SystemPropertiesExtension.class)
@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "500")
public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest {
private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
/** Table name. */
private static final String TABLE_NAME = "test_table";
private static final int PART_ID = 0;
@BeforeEach
@Override
public void setup(TestInfo testInfo) throws Exception {
super.setup(testInfo);
String zoneSql = "create zone test_zone with partitions=1, replicas=3, storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
String sql = "create table " + TABLE_NAME + " (key int primary key, val varchar(20)) with primary_zone='TEST_ZONE'";
cluster.doInSession(0, session -> {
executeUpdate(zoneSql, session);
executeUpdate(sql, session);
});
}
@Override
protected void customizeInitParameters(InitParametersBuilder builder) {
super.customizeInitParameters(builder);
builder.clusterConfiguration("{\n"
+ " \"transaction\": {\n"
+ " \"abandonedCheckTs\": 600000\n"
+ " \"attemptsObtainLock\": 0\n"
+ " }\n"
+ "}\n");
}
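/**
 * Tests that a dropped recovery message is re-sent: the first {@link TxRecoveryMessage} is lost, and another one is issued
 * after the abandoned transaction check timeout is lowered.
 */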
@Test
public void testMultipleRecoveryRequestsIssued() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
Transaction oldRwTx = node(0).transactions().begin();
UUID orphanTxId = startTransactionAndStopNode(txCrdNode);
CompletableFuture<UUID> recoveryTxMsgCaptureFut = new CompletableFuture<>();
AtomicInteger msgCount = new AtomicInteger();
commitPartNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TxRecoveryMessage) {
var recoveryTxMsg = (TxRecoveryMessage) msg;
recoveryTxMsgCaptureFut.complete(recoveryTxMsg.txId());
// Drop only the first recovery message to emulate message loss.
// Another one should be issued eventually.
return msgCount.incrementAndGet() == 1;
}
return false;
});
runConflictingTransaction(node(0), oldRwTx);
runConflictingTransaction(node(0), node(0).transactions().begin());
assertThat(recoveryTxMsgCaptureFut, willCompleteSuccessfully());
assertEquals(orphanTxId, recoveryTxMsgCaptureFut.join());
assertEquals(1, msgCount.get());
node(0).clusterConfiguration().getConfiguration(TransactionConfiguration.KEY).change(transactionChange ->
transactionChange.changeAbandonedCheckTs(1));
assertTrue(waitForCondition(() -> {
runConflictingTransaction(node(0), node(0).transactions().begin());
return msgCount.get() > 1;
}, 10_000));
}
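/**
 * Tests that a transaction abandoned by a stopped coordinator is aborted after a conflicting transaction triggers recovery.
 */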
@Test
public void testAbandonedTxIsAborted() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
UUID orphanTxId = startTransactionAndStopNode(txCrdNode);
CompletableFuture<UUID> recoveryTxMsgCaptureFut = new CompletableFuture<>();
AtomicInteger msgCount = new AtomicInteger();
commitPartNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TxRecoveryMessage) {
var recoveryTxMsg = (TxRecoveryMessage) msg;
recoveryTxMsgCaptureFut.complete(recoveryTxMsg.txId());
msgCount.incrementAndGet();
}
return false;
});
runConflictingTransaction(node(0), node(0).transactions().begin());
assertThat(recoveryTxMsgCaptureFut, willCompleteSuccessfully());
assertEquals(orphanTxId, recoveryTxMsgCaptureFut.join());
assertEquals(1, msgCount.get());
assertTrue(waitForCondition(() -> txStoredState(commitPartNode, orphanTxId) == TxState.ABORTED, 10_000));
}
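/**
 * Tests that write intent resolution from an RO transaction marks the orphan transaction as ABANDONED and eventually
 * aborts it when the coordinator is gone.
 */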
@Test
public void testWriteIntentRecoverNoCoordinator() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
UUID orphanTxId = startTransactionAndStopNode(txCrdNode);
AtomicInteger msgCount = new AtomicInteger();
IgniteImpl roCoordNode = node(0);
log.info("RO Transaction coordinator is chosen [node={}].", roCoordNode.name());
commitPartNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TxStateCommitPartitionRequest) {
msgCount.incrementAndGet();
assertEquals(TxState.ABANDONED, txVolatileState(commitPartNode, orphanTxId));
}
return false;
});
Transaction recoveryTxReadOnly = roCoordNode.transactions().begin(new TransactionOptions().readOnly(true));
runReadOnlyTransaction(roCoordNode, recoveryTxReadOnly);
assertEquals(1, msgCount.get());
assertTrue(waitForCondition(() -> txStoredState(commitPartNode, orphanTxId) == TxState.ABORTED, 10_000));
}
/**
* Coordinator is alive, no recovery expected.
*/
@Test
public void testWriteIntentNoRecovery() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
Transaction rwTransaction = createRwTransaction(txCrdNode);
AtomicInteger msgCount = new AtomicInteger();
IgniteImpl roCoordNode = node(0);
log.info("RO Transaction coordinator is chosen [node={}].", roCoordNode.name());
commitPartNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TxRecoveryMessage) {
msgCount.incrementAndGet();
}
return false;
});
Transaction recoveryTxReadOnly = roCoordNode.transactions().begin(new TransactionOptions().readOnly(true));
runReadOnlyTransaction(roCoordNode, recoveryTxReadOnly);
assertEquals(0, msgCount.get());
rwTransaction.commit();
UUID rwId = ((InternalTransaction) rwTransaction).id();
assertTrue(waitForCondition(() -> txStoredState(commitPartNode, rwId) == TxState.COMMITTED, 10_000));
}
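/**
 * Tests that when recovery has already been initiated via write intent resolution, a subsequent lock conflict does not
 * send an extra recovery message.
 */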
@Test
public void testWriteIntentRecoveryAndLockConflict() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
UUID orphanTxId = startTransactionAndStopNode(txCrdNode);
AtomicInteger stateMsgCount = new AtomicInteger();
AtomicInteger recoveryMsgCount = new AtomicInteger();
IgniteImpl roCoordNode = node(0);
log.info("RO Transaction coordinator is chosen [node={}].", roCoordNode.name());
CompletableFuture<UUID> txMsgCaptureFut = new CompletableFuture<>();
commitPartNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TxStateCommitPartitionRequest) {
stateMsgCount.incrementAndGet();
assertEquals(TxState.ABANDONED, txVolatileState(commitPartNode, orphanTxId));
txMsgCaptureFut.complete(((TxStateCommitPartitionRequest) msg).txId());
}
if (msg instanceof TxRecoveryMessage) {
recoveryMsgCount.incrementAndGet();
}
return false;
});
Transaction recoveryTxReadOnly = roCoordNode.transactions().begin(new TransactionOptions().readOnly(true));
RecordView view = roCoordNode.tables().table(TABLE_NAME).recordView();
view.getAsync(recoveryTxReadOnly, Tuple.create().set("key", 42));
assertThat(txMsgCaptureFut, willCompleteSuccessfully());
runConflictingTransaction(commitPartNode, commitPartNode.transactions().begin());
assertEquals(1, stateMsgCount.get());
assertEquals(0, recoveryMsgCount.get());
assertTrue(waitForCondition(() -> txStoredState(commitPartNode, orphanTxId) == TxState.ABORTED, 10_000));
}
/**
* Coordinator sends a commit message and dies. The message eventually reaches the commit partition and gets executed.
* The expected outcome of the transaction is COMMIT.
*/
@Test
public void testSendCommitAndDie() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
InternalTransaction orphanTx = (InternalTransaction) createRwTransaction(txCrdNode);
CompletableFuture<TxFinishReplicaRequest> finishRequestCaptureFut = new CompletableFuture<>();
AtomicReference<String> targetName = new AtomicReference<>();
// Intercept the commit message and prevent it from being sent. We will kill this node anyway.
txCrdNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TxFinishReplicaRequest) {
var finishTxMsg = (TxFinishReplicaRequest) msg;
finishRequestCaptureFut.complete(finishTxMsg);
targetName.set(nodeName);
return true;
}
return false;
});
// Initiate commit.
orphanTx.commitAsync();
assertThat(finishRequestCaptureFut, willCompleteSuccessfully());
// Stop old coordinator.
String txCrdNodeId = txCrdNode.id();
txCrdNode.stop();
assertTrue(waitForCondition(
() -> node(0).clusterNodes().stream().filter(n -> txCrdNodeId.equals(n.id())).count() == 0,
10_000)
);
// The state on the commit partition is still PENDING.
assertEquals(TxState.PENDING, txVolatileState(commitPartNode, orphanTx.id()));
// Continue the COMMIT message flow.
CompletableFuture<NetworkMessage> finishRequest =
bypassingThreadAssertions(() -> messaging(commitPartNode).invoke(targetName.get(), finishRequestCaptureFut.join(), 3000));
assertThat(finishRequest, willCompleteSuccessfully());
// The conflicting transaction should see an already committed TX.
runRwTransactionNoError(node(0), node(0).transactions().begin());
assertTrue(waitForCondition(() -> txStoredState(commitPartNode, orphanTx.id()) == TxState.COMMITTED, 10_000));
}
/**
* Coordinator sends a commit message and dies. Another tx initiates recovery and aborts this transaction.
* The commit message eventually reaches the commit partition and gets executed but the outcome is ABORTED.
*/
@Test
public void testCommitAndDieRecoveryFirst() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
InternalTransaction orphanTx = (InternalTransaction) createRwTransaction(txCrdNode);
CompletableFuture<TxFinishReplicaRequest> finishRequestCaptureFut = new CompletableFuture<>();
AtomicReference<String> targetName = new AtomicReference<>();
// Intercept the commit message and prevent it from being sent. We will kill this node anyway.
txCrdNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TxFinishReplicaRequest) {
var finishTxMsg = (TxFinishReplicaRequest) msg;
finishRequestCaptureFut.complete(finishTxMsg);
targetName.set(nodeName);
return true;
}
return false;
});
// Initiate commit.
orphanTx.commitAsync();
assertThat(finishRequestCaptureFut, willCompleteSuccessfully());
// Stop old coordinator.
String txCrdNodeId = txCrdNode.id();
txCrdNode.stop();
assertTrue(waitForCondition(
() -> node(0).clusterNodes().stream().filter(n -> txCrdNodeId.equals(n.id())).count() == 0,
10_000)
);
// The state on the commit partition is still PENDING.
assertEquals(TxState.PENDING, txVolatileState(commitPartNode, orphanTx.id()));
IgniteImpl newTxCoord = node(0);
runRwTransactionNoError(newTxCoord, newTxCoord.transactions().begin());
assertTrue(waitForCondition(() -> txStoredState(commitPartNode, orphanTx.id()) == TxState.ABORTED, 10_000));
CompletableFuture<NetworkMessage> commitRequest =
bypassingThreadAssertions(() -> messaging(commitPartNode).invoke(targetName.get(), finishRequestCaptureFut.join(), 3000));
assertThat(commitRequest, willCompleteSuccessfully());
NetworkMessage response = commitRequest.join();
assertInstanceOf(ErrorTimestampAwareReplicaResponse.class, response);
ErrorTimestampAwareReplicaResponse errorResponse = (ErrorTimestampAwareReplicaResponse) response;
assertInstanceOf(MismatchingTransactionOutcomeException.class, ExceptionUtils.unwrapCause(errorResponse.throwable()));
assertEquals(TxState.ABORTED, txStoredState(commitPartNode, orphanTx.id()));
}
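/**
 * Tests that recovery of an abandoned transaction is triggered exactly once even when several concurrent transactions
 * conflict with it, and that new transactions can commit afterwards.
 */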
@Test
public void testRecoveryIsTriggeredOnce() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
UUID orphanTxId = startTransactionAndStopNode(txCrdNode);
log.info("Orphan tx [id={}]", orphanTxId);
CompletableFuture<UUID> recoveryTxMsgCaptureFut = new CompletableFuture<>();
AtomicInteger msgCount = new AtomicInteger();
commitPartNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TxRecoveryMessage) {
var recoveryTxMsg = (TxRecoveryMessage) msg;
recoveryTxMsgCaptureFut.complete(recoveryTxMsg.txId());
msgCount.incrementAndGet();
}
return false;
});
IgniteImpl newCoordNode = node(0);
log.info("New transaction coordinator is chosen [node={}].", newCoordNode.name());
// Run RW transaction.
Transaction rwTx1 = commitPartNode.transactions().begin();
UUID rwTx1Id = ((InternalTransaction) rwTx1).id();
log.info("First concurrent tx [id={}]", rwTx1Id);
runConflictingTransaction(commitPartNode, rwTx1);
Transaction rwTx2 = newCoordNode.transactions().begin();
UUID rwTx2Id = ((InternalTransaction) rwTx2).id();
log.info("Second concurrent tx [id={}]", rwTx2Id);
runRwTransactionNoError(newCoordNode, rwTx2);
assertThat(recoveryTxMsgCaptureFut, willCompleteSuccessfully());
assertEquals(orphanTxId, recoveryTxMsgCaptureFut.join());
assertEquals(1, msgCount.get());
rwTx2.commit();
assertTrue(waitForCondition(() -> txStoredState(commitPartNode, orphanTxId) == TxState.ABORTED, 10_000));
assertTrue(waitForCondition(() -> txStoredState(commitPartNode, rwTx1Id) == TxState.ABORTED, 10_000));
Transaction rwTx3 = newCoordNode.transactions().begin();
log.info("Start RW tx {}", ((InternalTransaction) rwTx3).id());
runRwTransactionNoError(newCoordNode, rwTx3);
rwTx3.commit();
assertTrue(waitForCondition(() -> txStoredState(commitPartNode, ((InternalTransaction) rwTx3).id()) == TxState.COMMITTED, 10_000));
}
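/**
 * Tests that an attempt to finish an already committed transaction with a rollback fails with
 * {@link MismatchingTransactionOutcomeException}.
 */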
@Test
public void testFinishAlreadyFinishedTx() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
Transaction rwTx1 = createRwTransaction(txCrdNode);
rwTx1.commit();
UUID rwTx1Id = ((InternalTransaction) rwTx1).id();
assertTrue(waitForCondition(() -> txStoredState(commitPartNode, rwTx1Id) == TxState.COMMITTED, 10_000));
IgniteImpl txCrdNode2 = node(0);
CompletableFuture<Void> finish2 = txCrdNode2.txManager().finish(
new HybridTimestampTracker(),
((InternalTransaction) rwTx1).commitPartition(),
false,
Map.of(((InternalTransaction) rwTx1).commitPartition(), new IgniteBiTuple<>(txCrdNode2.node(), 0L)),
rwTx1Id
);
assertThat(finish2, willThrow(MismatchingTransactionOutcomeException.class));
}
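/**
 * Tests that a commit succeeds when the primary replica changes right after the first finish request is sent:
 * the lost finish request is retried on the new leaseholder.
 */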
@Test
public void testPrimaryFailureRightAfterCommitMsg() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator node is determined [node={}].", txCrdNode.name());
Transaction rwTx1 = createRwTransaction(txCrdNode);
CompletableFuture<?> commitMsgSentFut = new CompletableFuture<>();
CompletableFuture<?> cancelLeaseFuture = new CompletableFuture<>();
txCrdNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TxFinishReplicaRequest) {
boolean isFirst = !commitMsgSentFut.isDone();
if (isFirst) {
commitMsgSentFut.complete(null);
return true;
} else {
cancelLeaseFuture.join();
return false;
}
}
return false;
});
CompletableFuture<Void> commitFut = rwTx1.commitAsync();
assertThat(commitMsgSentFut, willCompleteSuccessfully());
cancelLease(commitPartNode, tblReplicationGrp);
waitAndGetLeaseholder(txCrdNode, tblReplicationGrp);
cancelLeaseFuture.complete(null);
assertThat(commitFut, willCompleteSuccessfully());
RecordView<Tuple> view = txCrdNode.tables().table(TABLE_NAME).recordView();
var rec = view.get(null, Tuple.create().set("key", 42));
assertNotNull(rec);
assertEquals((Integer) 42, rec.value("key"));
assertEquals("val1", rec.value("val"));
}
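/**
 * Tests that a commit completes even when the commit partition node stops, given that the only in-flight write has
 * already failed with an exception.
 */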
@Test
public void testPrimaryFailureWhileInflightInProgress() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator node is determined [node={}].", txCrdNode.name());
Transaction rwTx1 = createRwTransaction(txCrdNode);
txCrdNode.dropMessages((nodeName, msg) -> {
if (msg instanceof ReadWriteSingleRowReplicaRequest) {
return true;
}
return false;
});
assertThrows(TransactionException.class, () -> {
RecordView<Tuple> view = txCrdNode.tables().table(TABLE_NAME).recordView();
view.upsert(rwTx1, Tuple.create().set("key", 1).set("val", "val1"));
});
CompletableFuture<Void> commitFut = rwTx1.commitAsync();
commitPartNode.stop();
assertThat(commitFut, willCompleteSuccessfully());
}
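/**
 * Tests that a commit fails with {@link MismatchingTransactionOutcomeException} when the replica response completing a
 * write in-flight is lost and the primary replica lease is canceled; the write must not be applied.
 */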
@Test
public void testPrimaryFailureWhileInflightInProgressAfterFirstResponse() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator node is determined [node={}].", txCrdNode.name());
CompletableFuture<?> firstResponseSent = new CompletableFuture<>();
commitPartNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TimestampAwareReplicaResponse) {
TimestampAwareReplicaResponse response = (TimestampAwareReplicaResponse) msg;
if (response.result() == null) {
firstResponseSent.complete(null);
}
// This means this is the second response that finishes an in-flight future.
if (response.result() instanceof UUID) {
return true;
}
}
return false;
});
Transaction rwTx1 = createRwTransaction(txCrdNode);
CompletableFuture<Void> commitFut = rwTx1.commitAsync();
assertThat(firstResponseSent, willCompleteSuccessfully());
cancelLease(commitPartNode, tblReplicationGrp);
assertThat(commitFut, willThrow(MismatchingTransactionOutcomeException.class, 30, SECONDS));
RecordView<Tuple> view = txCrdNode.tables().table(TABLE_NAME).recordView();
var rec = view.get(null, Tuple.create().set("key", 42));
assertNull(rec);
}
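/**
 * Tests that a write conflicting with an abandoned transaction that left an open cursor triggers transaction recovery.
 */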
@Test
public void testTsRecoveryForCursor() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
preloadData(tbl, 10);
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl commitPartNode = findNodeByName(leaseholder);
log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
startTransactionWithCursorAndStopNode(txCrdNode, commitPartNode);
IgniteImpl newCoordNode = node(0);
log.info("New transaction coordinator is chosen [node={}].", newCoordNode.name());
CompletableFuture<Void> txMsgCaptureFut = new CompletableFuture<>();
commitPartNode.dropMessages((nodeName, msg) -> {
if (msg instanceof TxRecoveryMessage) {
txMsgCaptureFut.complete(null);
}
return false;
});
Transaction tx = newCoordNode.transactions().begin();
RecordView view = newCoordNode.tables().table(TABLE_NAME).recordView();
var opFut = view.upsertAsync(tx, Tuple.create().set("key", 42).set("val", "new"));
try {
opFut.get();
} catch (Exception ex) {
log.info("Expected conflict that have to start recovery: " + ex.getMessage());
}
assertThat(txMsgCaptureFut, willCompleteSuccessfully());
}
/**
 * Starts a read-write or read-only transaction, creates a cursor, and checks that the cursor is canceled after the tx coordinator leaves.
*
* @param readOnly Whether the tx is read only.
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCursorCleanup(boolean readOnly) throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
IgniteImpl targetNode = findNodeByName(leaseholder);
log.info("Transaction target node is determined [node={}].", targetNode.name());
IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
IgniteImpl thirdNode = findNode(0, initialNodes(), node -> node != txCrdNode && node != targetNode);
log.info("Another tx coordinator is chosen [node={}].", thirdNode.name());
// Preload data from the coordinator node to adjust the observable timestamp.
preloadData(txCrdNode.tables().table(TABLE_NAME), 10);
// Create a cursor that should remain because it is created from a node that stays in the cluster.
InternalTransaction rwTx = (InternalTransaction) thirdNode.transactions().begin();
scanSingleEntryAndLeaveCursorOpen(targetNode, unwrapTableImpl(thirdNode.tables().table(TABLE_NAME)), rwTx);
// Create a cursor that should be closed because its tx coordinator leaves the topology.
InternalTransaction tx = (InternalTransaction) txCrdNode.transactions().begin(new TransactionOptions().readOnly(readOnly));
startTransactionWithCursorAndStopNode(txCrdNode, targetNode, tx);
// Check that just one cursor remains.
assertTrue(waitForCondition(() -> targetNode.resourcesRegistry().resources().size() == 1, 3000));
}
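/**
 * Preloads the table with the given number of entries.
 *
 * @param table Table to fill.
 * @param entries Number of entries to insert.
 */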
private static void preloadData(Table table, int entries) {
RecordView<Tuple> view = table.recordView();
for (int i = 0; i < entries; i++) {
view.upsert(null, Tuple.create().set("key", i).set("val", "preload"));
}
}
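/**
 * Starts an RW transaction, opens a cursor on the target node, and stops the coordinator node.
 *
 * @param txCrdNode Tx coordinator node.
 * @param targetNode Node where the cursor should be created.
 * @return Transaction id.
 * @throws Exception If failed.
 */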
private UUID startTransactionWithCursorAndStopNode(IgniteImpl txCrdNode, IgniteImpl targetNode) throws Exception {
InternalTransaction rwTx = (InternalTransaction) txCrdNode.transactions().begin();
startTransactionWithCursorAndStopNode(txCrdNode, targetNode, rwTx);
return rwTx.id();
}
/**
* Starts a scan procedure and leaves it incomplete, then stops the coordinator node.
*
* @param txCrdNode Tx coordinator node.
* @param targetNode Node where the cursor should be created.
* @param tx Transaction.
*/
private void startTransactionWithCursorAndStopNode(IgniteImpl txCrdNode, IgniteImpl targetNode, InternalTransaction tx)
throws Exception {
scanSingleEntryAndLeaveCursorOpen(targetNode, unwrapTableImpl(txCrdNode.tables().table(TABLE_NAME)), tx);
String txCrdNodeId = txCrdNode.id();
txCrdNode.stop();
assertTrue(waitForCondition(
() -> node(0).clusterNodes().stream().filter(n -> txCrdNodeId.equals(n.id())).count() == 0,
10_000)
);
}
/**
 * Starts a scan procedure for a specific transaction and reads only the first row from the cursor.
*
* @param targetNode Node where the cursor should be created.
* @param tbl Scanned table.
* @param tx Transaction.
* @throws Exception If failed.
*/
private void scanSingleEntryAndLeaveCursorOpen(IgniteImpl targetNode, TableViewInternal tbl, InternalTransaction tx)
throws Exception {
int initialCursorsCount = targetNode.resourcesRegistry().resources().size();
Publisher<BinaryRow> publisher;
if (tx.isReadOnly()) {
String primary = waitAndGetLeaseholder(node(0), new TablePartitionId(tbl.tableId(), PART_ID));
ClusterNode primaryNode = node(0).clusterNodes().stream().filter(node -> node.name().equals(primary)).findAny().get();
publisher = tbl.internalTable().scan(PART_ID, tx.id(), tx.readTimestamp(), primaryNode, tx.coordinatorId());
} else {
publisher = tbl.internalTable().scan(PART_ID, tx);
}
List<BinaryRow> scannedRows = new ArrayList<>();
CompletableFuture<Void> scanned = new CompletableFuture<>();
Subscription subscription = TestFlowUtils.subscribeToPublisher(scannedRows, publisher, scanned);
subscription.request(1);
assertTrue(waitForCondition(() -> scannedRows.size() == 1, 10_000));
assertFalse(scanned.isDone());
assertEquals(initialCursorsCount + 1, targetNode.resourcesRegistry().resources().size());
}
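/**
 * Tests that a cursor opened by an RO transaction is closed after the transaction commits and the finished transactions
 * batch message reaches the node holding the cursor.
 */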
@Test
public void testCursorsClosedAfterTxClose() throws Exception {
TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetPrimaryReplica(node(0), tblReplicationGrp).getLeaseholder();
IgniteImpl txExecNode = findNodeByName(leaseholder);
log.info("Transaction will be executed on [node={}].", txExecNode.name());
IgniteImpl txCrdNode = findNode(1, initialNodes(), n -> !leaseholder.equals(n.name()));
log.info("Transaction coordinator is [node={}].", txCrdNode.name());
// Ensure there are no open cursors.
assertEquals(0, txExecNode.resourcesRegistry().resources().size());
preloadData(txCrdNode.tables().table(TABLE_NAME), 10);
CompletableFuture<Void> txRequestCaptureFut = new CompletableFuture<>();
InternalTransaction roTx = (InternalTransaction) txCrdNode.transactions().begin(new TransactionOptions().readOnly(true));
log.info("Run scan in RO [txId={}].", roTx.id());
txCrdNode.dropMessages((nodeName, msg) -> {
if (msg instanceof FinishedTransactionsBatchMessage) {
Collection<UUID> transactions = ((FinishedTransactionsBatchMessage) msg).transactions();
if (transactions.contains(roTx.id())) {
// Caught the request containing the finished tx.
txRequestCaptureFut.complete(null);
} else {
log.info("Received FinishedTransactionsBatchMessage without tx [txId={}]", roTx.id());
}
}
return false;
});
scanSingleEntryAndLeaveCursorOpen(txExecNode, unwrapTableImpl(txCrdNode.tables().table(TABLE_NAME)), roTx);
// After the RO scan there should be one open cursor.
assertEquals(1, txExecNode.resourcesRegistry().resources().size());
roTx.commit();
// Wait for the cursor cleanup message to arrive.
assertThat(txRequestCaptureFut, willCompleteSuccessfully());
// Now check that the cursor is closed.
assertTrue(waitForCondition(
() -> txExecNode.resourcesRegistry().resources().isEmpty(),
10_000)
);
}
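/**
 * Retrieves the messaging service of the given node.
 *
 * @param node Cluster node.
 * @return Messaging service.
 */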
private DefaultMessagingService messaging(IgniteImpl node) {
ClusterService coordinatorService = IgniteTestUtils.getFieldValue(node, IgniteImpl.class, "clusterSvc");
return (DefaultMessagingService) coordinatorService.messagingService();
}
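/**
 * Reads the volatile transaction state from the node's transaction manager.
 *
 * @param node Node to inspect.
 * @param txId Transaction id.
 * @return Transaction state, or {@code null} if the transaction is unknown.
 */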
private @Nullable TxState txVolatileState(IgniteImpl node, UUID txId) {
TxStateMeta txMeta = node.txManager().stateMeta(txId);
return txMeta == null ? null : txMeta.txState();
}
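/**
 * Reads the persisted transaction state from the transaction state storage.
 *
 * @param node Node to inspect.
 * @param txId Transaction id.
 * @return Transaction state, or {@code null} if no meta is stored.
 */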
private static @Nullable TxState txStoredState(IgniteImpl node, UUID txId) {
TxMeta txMeta = txStoredMeta(node, txId);
return txMeta == null ? null : txMeta.txState();
}
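/**
 * Reads the persisted transaction meta from the transaction state storage of partition 0.
 *
 * @param node Node to inspect.
 * @param txId Transaction id.
 * @return Transaction meta, or {@code null} if absent.
 */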
private static @Nullable TxMeta txStoredMeta(IgniteImpl node, UUID txId) {
InternalTable internalTable = unwrapTableImpl(node.tables().table(TABLE_NAME)).internalTable();
return bypassingThreadAssertions(() -> internalTable.txStateStorage().getTxStateStorage(0).get(txId));
}
/**
 * Runs a transaction that is expected to fail with a lock conflict exception.
*
* @param node Transaction coordinator node.
* @param rwTx A transaction to create a lock conflict with an abandoned one.
*/
private void runConflictingTransaction(IgniteImpl node, Transaction rwTx) {
RecordView view = node.tables().table(TABLE_NAME).recordView();
try {
view.upsert(rwTx, Tuple.create().set("key", 42).set("val", "val2"));
fail("Lock conflict have to be detected.");
} catch (Exception e) {
assertEquals(Transactions.ACQUIRE_LOCK_ERR, extractCodeFrom(e));
log.info("Expected lock conflict.", e);
}
}
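/**
 * Runs an RW upsert that is expected to succeed; if a lock conflict still occurs, it is tolerated and logged.
 *
 * @param node Transaction coordinator node.
 * @param rwTx Transaction to run the upsert in.
 */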
private void runRwTransactionNoError(IgniteImpl node, Transaction rwTx) {
RecordView view = node.tables().table(TABLE_NAME).recordView();
try {
view.upsert(rwTx, Tuple.create().set("key", 42).set("val", "val2"));
} catch (Exception e) {
assertEquals(Transactions.ACQUIRE_LOCK_ERR, extractCodeFrom(e));
log.info("Expected lock conflict.", e);
}
}
/**
 * Runs an RO transaction to trigger recovery on write intent resolution.
*
* @param node Transaction coordinator node.
* @param roTx A transaction to resolve write intents from the abandoned TX.
*/
private void runReadOnlyTransaction(IgniteImpl node, Transaction roTx) {
RecordView view = node.tables().table(TABLE_NAME).recordView();
try {
view.get(roTx, Tuple.create().set("key", 42));
} catch (Exception e) {
assertEquals(Transactions.ACQUIRE_LOCK_ERR, extractCodeFrom(e));
log.info("Expected lock conflict.", e);
}
}
/**
 * Starts a transaction, takes a lock, and stops the transaction coordinator. The stopped node leaves the transaction in the PENDING
 * state.
*
* @param node Transaction coordinator node.
* @return Transaction id.
* @throws InterruptedException If interrupted.
*/
private UUID startTransactionAndStopNode(IgniteImpl node) throws InterruptedException, ExecutionException {
Transaction rwTx1 = createRwTransaction(node);
String txCrdNodeId = node.id();
node.stop();
assertTrue(waitForCondition(
() -> node(0).clusterNodes().stream().filter(n -> txCrdNodeId.equals(n.id())).count() == 0,
10_000)
);
return ((InternalTransaction) rwTx1).id();
}
/**
 * Creates an RW transaction and performs an initial upsert in it.
 *
 * @param node Transaction coordinator node.
 * @return The started transaction.
*/
private Transaction createRwTransaction(IgniteImpl node) {
RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
Transaction rwTx1 = node.transactions().begin();
view.upsert(rwTx1, Tuple.create().set("key", 42).set("val", "val1"));
return rwTx1;
}
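/**
 * Finds the first node in the given index range that matches the filter.
 *
 * @param startRange Start node index, inclusive.
 * @param endRange End node index, exclusive.
 * @param filter Node filter.
 * @return Matching node.
 */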
private IgniteImpl findNode(int startRange, int endRange, Predicate<IgniteImpl> filter) {
return IntStream.range(startRange, endRange)
.mapToObj(this::node)
.filter(filter::test)
.findFirst()
.get();
}
private IgniteImpl findNodeByName(String leaseholder) {
return findNode(0, initialNodes(), n -> leaseholder.equals(n.name()));
}
private IgniteImpl nonPrimaryNode(String leaseholder) {
return findNode(1, initialNodes(), n -> !leaseholder.equals(n.name()));
}
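/**
 * Waits for the primary replica of the given replication group and returns its meta.
 *
 * @param node Node used to access the placement driver.
 * @param tblReplicationGrp Replication group id.
 * @return Primary replica meta.
 */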
private static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, ReplicationGroupId tblReplicationGrp) {
CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
node.clock().now(),
10,
SECONDS
);
assertThat(primaryReplicaFut, willCompleteSuccessfully());
return primaryReplicaFut.join();
}
private static String waitAndGetLeaseholder(IgniteImpl node, ReplicationGroupId tblReplicationGrp) {
return waitAndGetPrimaryReplica(node, tblReplicationGrp).getLeaseholder();
}
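/**
 * Cancels the lease of the given replication group by broadcasting a {@link StopLeaseProlongationMessage}.
 *
 * @param leaseholder Current leaseholder node.
 * @param groupId Replication group id.
 */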
private void cancelLease(IgniteImpl leaseholder, ReplicationGroupId groupId) {
StopLeaseProlongationMessage msg = PLACEMENT_DRIVER_MESSAGES_FACTORY
.stopLeaseProlongationMessage()
.groupId(groupId)
.build();
// Just send it to all nodes to avoid having to determine the exact placement driver active actor.
runningNodes().forEach(node -> leaseholder.sendFakeMessage(node.name(), msg));
}
}