/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.table.NodeUtils.transferPrimary;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.findTupleToBeHostedOnNode;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.partitionAssignment;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.partitionIdForTuple;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.table;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.tableId;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.txId;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.waitAndGetPrimaryReplica;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.tx.TransactionMeta;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxCleanupMessage;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
/**
* Integration tests for tx resources vacuum.
*/
@ExtendWith(SystemPropertiesExtension.class)
@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "1000")
public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
/** Table name. */
private static final String TABLE_NAME = "test_table";
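/** Initial tuple inserted by the tests. */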
private static final Tuple INITIAL_TUPLE = Tuple.create().set("key", 1L).set("val", "1");
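/** Produces the next tuple: increments the key by 1 and derives the matching string value. */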
private static final Function<Tuple, Tuple> NEXT_TUPLE = t -> Tuple.create()
.set("key", t.longValue("key") + 1)
.set("val", "" + (t.longValue("key") + 1));
private static final int REPLICAS = 2;
/** Nodes bootstrap configuration pattern. */
private static final String NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+ " network: {\n"
+ " port: {},\n"
+ " nodeFinder: {\n"
+ " netClusterNodes: [ {} ]\n"
+ " }\n"
+ " },\n"
+ " clientConnector: { port:{} },\n"
+ " rest.port: {},\n"
+ " raft: { responseTimeout: 30000 },"
+ " compute.threadPoolSize: 1\n"
+ "}";
private ExecutorService txStateStorageExecutor;
@BeforeEach
@Override
public void setup(TestInfo testInfo) throws Exception {
super.setup(testInfo);
String zoneSql = "create zone test_zone with partitions=20, replicas=" + REPLICAS
+ ", storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
String sql = "create table " + TABLE_NAME + " (key bigint primary key, val varchar(20)) with primary_zone='TEST_ZONE'";
cluster.doInSession(0, session -> {
executeUpdate(zoneSql, session);
executeUpdate(sql, session);
});
txStateStorageExecutor = Executors.newSingleThreadExecutor(IgniteThreadFactory.create("test", "tx-state-storage-test-pool", log,
ThreadOperation.STORAGE_READ));
}
@Override
@AfterEach
public void tearDown() {
shutdownAndAwaitTermination(txStateStorageExecutor, 10, TimeUnit.SECONDS);
super.tearDown();
}
@Override
protected void customizeInitParameters(InitParametersBuilder builder) {
super.customizeInitParameters(builder);
builder.clusterConfiguration("{"
+ " transaction: {"
+ " txnResourceTtl: 0"
+ " },"
+ " replication: {"
+ " rpcTimeout: 30000"
+ " },"
+ "}");
}
/**
* Returns node bootstrap config template.
*
* @return Node bootstrap config template.
*/
@Override
protected String getNodeBootstrapConfigTemplate() {
return NODE_BOOTSTRAP_CFG_TEMPLATE;
}
/**
* Simple TTL-triggered vacuum test, also checking that PENDING and FINISHING states are not removed.
*
* <ul>
* <li>Run a transaction;</li>
* <li>Run a parallel transaction;</li>
* <li>Insert values within both transactions;</li>
* <li>Commit the parallel transaction and wait for vacuum of its state;</li>
* <li>Run another parallel transaction;</li>
* <li>Check that the volatile PENDING state of the transaction is preserved;</li>
* <li>Block {@link TxFinishReplicaRequest} for the pending transaction;</li>
* <li>Start the tx commit;</li>
* <li>While the state is FINISHING, commit the parallel transaction and wait for vacuum of its state;</li>
* <li>Check that the volatile state of the transaction is preserved;</li>
* <li>Unblock {@link TxFinishReplicaRequest};</li>
* <li>Check that both volatile and persistent states are vacuumized;</li>
* <li>Check that the committed value is correct.</li>
* </ul>
*/
@Test
public void testVacuum() throws InterruptedException {
// We should test the TTL-triggered vacuum.
setTxResourceTtl(1);
IgniteImpl node = anyNode();
RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
// Put some value into the table.
Transaction tx = node.transactions().begin();
Transaction parallelTx1 = node.transactions().begin();
UUID txId = txId(tx);
UUID parallelTx1Id = txId(parallelTx1);
log.info("Test: Loading the data [tx={}].", txId);
Tuple tuple = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
Tuple tupleForParallelTx = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, NEXT_TUPLE.apply(tuple), NEXT_TUPLE, true);
int partIdForParallelTx = partitionIdForTuple(anyNode(), TABLE_NAME, tupleForParallelTx, parallelTx1);
int partId = partitionIdForTuple(node, TABLE_NAME, tuple, tx);
Set<String> nodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), partId));
view.upsert(tx, tuple);
view.upsert(parallelTx1, tupleForParallelTx);
parallelTx1.commit();
waitForTxStateVacuum(nodes, parallelTx1Id, partIdForParallelTx, true, 10_000);
// Check that the volatile PENDING state of the transaction is preserved.
assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
Transaction parallelTx2 = node.transactions().begin();
UUID parallelTx2Id = txId(parallelTx2);
view.upsert(parallelTx2, tupleForParallelTx);
CompletableFuture<Void> finishStartedFuture = new CompletableFuture<>();
CompletableFuture<Void> finishAllowedFuture = new CompletableFuture<>();
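// Delay TxFinishReplicaRequest for this tx until finishAllowedFuture completes, keeping the tx in the FINISHING state.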
node.dropMessages((n, msg) -> {
if (msg instanceof TxFinishReplicaRequest) {
TxFinishReplicaRequest finishRequest = (TxFinishReplicaRequest) msg;
if (finishRequest.txId().equals(txId)) {
finishStartedFuture.complete(null);
finishAllowedFuture.join();
}
}
return false;
});
Transaction roTxBefore = beginReadOnlyTx(anyNode());
CompletableFuture<Void> commitFut = runAsync(tx::commit);
assertThat(finishStartedFuture, willCompleteSuccessfully());
// While the state is FINISHING, commit the parallel transaction and wait for vacuum of its state.
assertEquals(FINISHING, volatileTxState(node, txId).txState());
parallelTx2.commit();
waitForTxStateVacuum(nodes, parallelTx2Id, partIdForParallelTx, true, 10_000);
// Check that the volatile state of the transaction is preserved.
assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
finishAllowedFuture.complete(null);
assertThat(commitFut, willCompleteSuccessfully());
log.info("Test: Tx committed [tx={}].", txId);
Transaction roTxAfter = beginReadOnlyTx(anyNode());
waitForTxStateReplication(nodes, txId, partId, 10_000);
// Check that both volatile and persistent states are vacuumized.
waitForTxStateVacuum(txId, partId, true, 10_000);
// Trying to read the value.
Tuple keyRec = Tuple.create().set("key", tuple.longValue("key"));
checkValueReadOnly(view, roTxBefore, keyRec, null);
checkValueReadOnly(view, roTxAfter, keyRec, tuple);
}
/**
* Check that the ABANDONED transaction state is preserved until recovery.
*
* <ul>
* <li>Start a transaction from a coordinator that is not included in the commit partition group;</li>
* <li>Start a parallel transaction;</li>
* <li>Find a tuple for parallel tx that would be hosted on the same partition as a tuple for the abandoned tx;</li>
* <li>Insert values within both transactions;</li>
* <li>Commit the parallel transaction;</li>
* <li>Stop the tx coordinator;</li>
* <li>Wait for tx state of parallel tx to be vacuumized;</li>
* <li>Check that the volatile state of the transaction is preserved;</li>
* <li>Try to read the value using another transaction, which starts the tx recovery;</li>
* <li>Check that abandoned tx is rolled back and thus the value is null;</li>
* <li>Check that the abandoned transaction is recovered; its volatile and persistent states are vacuumized.</li>
* </ul>
*/
@Test
public void testAbandonedTxnsAreNotVacuumizedUntilRecovered() throws InterruptedException {
setTxResourceTtl(1);
IgniteImpl leaseholder = cluster.node(0);
Tuple tuple = findTupleToBeHostedOnNode(leaseholder, TABLE_NAME, null, INITIAL_TUPLE, NEXT_TUPLE, true);
int partId = partitionIdForTuple(anyNode(), TABLE_NAME, tuple, null);
TablePartitionId groupId = new TablePartitionId(tableId(anyNode(), TABLE_NAME), partId);
Set<String> txNodes = partitionAssignment(anyNode(), groupId);
IgniteImpl abandonedTxCoord = findNode(n -> !txNodes.contains(n.name()));
RecordView<Tuple> view = abandonedTxCoord.tables().table(TABLE_NAME).recordView();
Transaction abandonedTx = abandonedTxCoord.transactions().begin();
UUID abandonedTxId = txId(abandonedTx);
Transaction parallelTx = abandonedTxCoord.transactions().begin();
UUID parallelTxId = txId(parallelTx);
// Find a tuple hosted on the same partition.
Tuple tupleForParallelTx = tuple;
int partIdForParallelTx = -1;
while (partIdForParallelTx != partId) {
tupleForParallelTx = findTupleToBeHostedOnNode(leaseholder, TABLE_NAME, null, NEXT_TUPLE.apply(tupleForParallelTx), NEXT_TUPLE,
true);
partIdForParallelTx = partitionIdForTuple(anyNode(), TABLE_NAME, tupleForParallelTx, parallelTx);
}
view.upsert(abandonedTx, tuple);
view.upsert(parallelTx, tupleForParallelTx);
parallelTx.commit();
stopNode(abandonedTxCoord.name());
waitForTxStateVacuum(txNodes, parallelTxId, partIdForParallelTx, true, 10_000);
// Check that the volatile state of the transaction is preserved.
assertTrue(checkVolatileTxStateOnNodes(txNodes, abandonedTxId));
// Try to read the value using another transaction, which starts the tx recovery.
RecordView<Tuple> viewLh = leaseholder.tables().table(TABLE_NAME).recordView();
Tuple value = viewLh.get(null, Tuple.create().set("key", tuple.longValue("key")));
// Check that abandoned tx is rolled back and thus the value is null.
assertNull(value);
// Check that the abandoned transaction is recovered; its volatile and persistent states are vacuumized.
// Wait for it, because we don't have the recovery completion future.
waitForTxStateVacuum(txNodes, abandonedTxId, partId, true, 10_000);
}
/**
* Check that the tx state on commit partition is vacuumized only when cleanup is completed.
*
* <ul>
* <li>Start a transaction;</li>
* <li>Generate some tuple and define on which nodes it would be hosted;</li>
* <li>Choose one more node that doesn't host the first tuple and choose a tuple that will be sent on this node as primary;</li>
* <li>Upsert both tuples within a transaction;</li>
* <li>Block {@link TxCleanupMessage}-s from commit partition primary;</li>
* <li>Start a tx commit;</li>
* <li>Wait for vacuum completion on a node that doesn't host the commit partition;</li>
* <li>Unblock {@link TxCleanupMessage}-s;</li>
* <li>Wait for the tx state vacuum on the commit partition group.</li>
* </ul>
*/
@Test
@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "0")
public void testVacuumWithCleanupDelay() throws InterruptedException {
IgniteImpl node = anyNode();
RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
// Put some value into the table.
Transaction tx = node.transactions().begin();
UUID txId = txId(tx);
log.info("Test: Loading the data [tx={}].", txId);
// Generate some tuple and define on which nodes it would be hosted.
Tuple tuple0 = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
int commitPartId = partitionIdForTuple(node, TABLE_NAME, tuple0, tx);
TablePartitionId commitPartGrpId = new TablePartitionId(tableId(node, TABLE_NAME), commitPartId);
ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));
Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
// Some node that does not host the commit partition will be the primary node for upserting another tuple.
IgniteImpl leaseholderForAnotherTuple = findNode(n -> !commitPartNodes.contains(n.name()));
log.info("Test: leaseholderForAnotherTuple={}", leaseholderForAnotherTuple.name());
Tuple tuple1 = findTupleToBeHostedOnNode(leaseholderForAnotherTuple, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
// Upsert both tuples within a transaction.
view.upsert(tx, tuple0);
view.upsert(tx, tuple1);
CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
CompletableFuture<Void> cleanupAllowed = new CompletableFuture<>();
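// Signal when cleanup starts and delay cleanup messages sent to the nodes hosting the commit partition.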
commitPartitionLeaseholder.dropMessages((n, msg) -> {
if (msg instanceof TxCleanupMessage) {
cleanupStarted.complete(null);
log.info("Test: cleanup started.");
if (commitPartNodes.contains(n)) {
cleanupAllowed.join();
}
}
return false;
});
Transaction roTxBefore = beginReadOnlyTx(anyNode());
CompletableFuture<Void> commitFut = tx.commitAsync();
waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
assertThat(cleanupStarted, willCompleteSuccessfully());
// Check the vacuum result on a node that doesn't host the commit partition.
triggerVacuum();
assertTxStateVacuumized(Set.of(leaseholderForAnotherTuple.name()), txId, commitPartId, false);
// Unblocking cleanup.
cleanupAllowed.complete(null);
assertThat(commitFut, willCompleteSuccessfully());
Transaction roTxAfter = beginReadOnlyTx(anyNode());
waitForCondition(() -> volatileTxState(commitPartitionLeaseholder, txId) != null, 10_000);
triggerVacuum();
assertTxStateVacuumized(txId, commitPartId, true);
// Trying to read the values.
Tuple key0 = Tuple.create().set("key", tuple0.longValue("key"));
Tuple key1 = Tuple.create().set("key", tuple1.longValue("key"));
checkValueReadOnly(view, roTxBefore, key0, null);
checkValueReadOnly(view, roTxAfter, key0, tuple0);
checkValueReadOnly(view, roTxBefore, key1, null);
checkValueReadOnly(view, roTxAfter, key1, tuple1);
}
/**
* Check that the tx state on commit partition is vacuumized only when cleanup is completed.
*
* <ul>
* <li>Start a transaction;</li>
* <li>Upsert a value;</li>
* <li>Block {@link TxCleanupMessage}-s;</li>
* <li>Start a tx commit;</li>
* <li>Transfer the primary replica;</li>
* <li>Unblock the {@link TxCleanupMessage}-s;</li>
* <li>Ensure that tx states are finally vacuumized.</li>
* </ul>
*/
@Test
public void testCommitPartitionPrimaryChangesBeforeVacuum() throws InterruptedException {
// We can't leave TTL as 0 here, because the primary replica is changed during cleanup, which means
// WriteIntentSwitchReplicaRequest will not be processed on the primary. Removing the tx state instantly would cause incorrect
// tx recovery and a write intent switch with the tx state as ABORTED.
setTxResourceTtl(1);
IgniteImpl node = anyNode();
RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
// Put some value into the table.
Transaction tx = node.transactions().begin();
UUID txId = txId(tx);
log.info("Test: Loading the data [tx={}].", txId);
Tuple tuple = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
int commitPartId = partitionIdForTuple(node, TABLE_NAME, tuple, tx);
TablePartitionId commitPartGrpId = new TablePartitionId(tableId(node, TABLE_NAME), commitPartId);
ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));
Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
view.upsert(tx, tuple);
CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
boolean[] cleanupAllowed = new boolean[1];
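// Signal when cleanup starts and hold the cleanup messages until the primary replica is transferred.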
commitPartitionLeaseholder.dropMessages((n, msg) -> {
if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
cleanupStarted.complete(null);
cleanupAllowedFut.join();
}
return false;
});
Transaction roTxBefore = beginReadOnlyTx(anyNode());
CompletableFuture<Void> commitFut = tx.commitAsync();
assertThat(cleanupStarted, willCompleteSuccessfully());
transferPrimary(cluster.runningNodes().collect(toSet()), commitPartGrpId, commitPartNodes::contains);
cleanupAllowedFut.complete(null);
cleanupAllowed[0] = true;
assertThat(commitFut, willCompleteSuccessfully());
log.info("Test: tx committed.");
waitForTxStateVacuum(txId, commitPartId, true, 10_000);
Transaction roTxAfter = beginReadOnlyTx(anyNode());
log.info("Test: checking values.");
// Trying to read the value.
Tuple key = Tuple.create().set("key", tuple.longValue("key"));
checkValueReadOnly(view, roTxBefore, key, null);
checkValueReadOnly(view, roTxAfter, key, tuple);
}
/**
* Check that the tx state on commit partition is vacuumized only when cleanup is completed.
*
* <ul>
* <li>Start a transaction;</li>
* <li>Upsert a tuple;</li>
* <li>Block {@link TxCleanupMessage}-s from commit partition primary;</li>
* <li>Start a tx commit;</li>
* <li>Wait for tx cleanup to start;</li>
* <li>Wait for volatile tx state vacuum;</li>
* <li>Unblock {@link TxCleanupMessage}-s;</li>
* <li>Wait for the tx state vacuum on the commit partition group.</li>
* </ul>
*/
@Test
@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "0")
public void testVacuumPersistentStateAfterCleanupDelayAndVolatileStateVacuum() throws InterruptedException {
IgniteImpl node = anyNode();
RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
// Put some value into the table.
Transaction tx = node.transactions().begin();
UUID txId = txId(tx);
log.info("Test: Loading the data [tx={}].", txId);
Tuple tuple = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
int commitPartId = partitionIdForTuple(node, TABLE_NAME, tuple, tx);
TablePartitionId commitPartGrpId = new TablePartitionId(tableId(node, TABLE_NAME), commitPartId);
ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));
Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
view.upsert(tx, tuple);
CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
boolean[] cleanupAllowed = new boolean[1];
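// Signal when cleanup starts and drop the cleanup messages (return true) until allowed, so the cleanup cannot complete.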
commitPartitionLeaseholder.dropMessages((n, msg) -> {
if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
cleanupStarted.complete(null);
cleanupAllowedFut.join();
return true;
}
return false;
});
Transaction roTxBefore = beginReadOnlyTx(anyNode());
CompletableFuture<Void> commitFut = tx.commitAsync();
waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
assertThat(cleanupStarted, willCompleteSuccessfully());
// Wait for volatile tx state vacuum. This is possible because tx finish is complete.
triggerVacuum();
assertTxStateVacuumized(txId, commitPartId, false);
log.info("Test: volatile state vacuumized");
cleanupAllowedFut.complete(null);
cleanupAllowed[0] = true;
assertThat(commitFut, willCompleteSuccessfully());
log.info("Test: commit completed.");
Transaction roTxAfter = beginReadOnlyTx(anyNode());
waitForCondition(() -> {
TxStateMeta txStateMeta = (TxStateMeta) volatileTxState(commitPartitionLeaseholder, txId);
return txStateMeta != null && txStateMeta.cleanupCompletionTimestamp() != null;
}, 10_000);
log.info("Test: cleanup completed.");
triggerVacuum();
assertTxStateVacuumized(txId, commitPartId, true);
// Trying to read the data.
Tuple key = Tuple.create().set("key", tuple.longValue("key"));
checkValueReadOnly(view, roTxBefore, key, null);
checkValueReadOnly(view, roTxAfter, key, tuple);
}
/**
* Checks that tx recovery doesn't change the tx finish result from COMMITTED to ABORTED once it has been saved in the persistent storage.
*
* <ul>
* <li>Start a transaction tx0;</li>
* <li>Upsert some value;</li>
* <li>Block {@link TxCleanupMessage}-s;</li>
* <li>Start the commit of tx0 and wait for the tx state COMMITTED to be replicated to the persistent storage;</li>
* <li>Stop the tx0's coordinator;</li>
* <li>Wait for tx0's state vacuum;</li>
* <li>Try to get the data that has been committed by tx0, ensure the data is correct.</li>
* </ul>
*/
@Test
public void testRecoveryAfterPersistentStateVacuumized() throws InterruptedException {
// This node isn't going to be stopped, so let it be node 0.
IgniteImpl commitPartitionLeaseholder = cluster.node(0);
Tuple tuple0 = findTupleToBeHostedOnNode(commitPartitionLeaseholder, TABLE_NAME, null, INITIAL_TUPLE, NEXT_TUPLE, true);
int commitPartId = partitionIdForTuple(commitPartitionLeaseholder, TABLE_NAME, tuple0, null);
Set<String> commitPartitionNodes = partitionAssignment(commitPartitionLeaseholder,
new TablePartitionId(tableId(commitPartitionLeaseholder, TABLE_NAME), commitPartId));
// Choose some node that doesn't host the partition as a tx coordinator.
IgniteImpl coord0 = findNode(n -> !commitPartitionNodes.contains(n.name()));
RecordView<Tuple> view0 = coord0.tables().table(TABLE_NAME).recordView();
// Put some value into the table.
Transaction tx0 = coord0.transactions().begin();
UUID txId0 = txId(tx0);
log.info("Test: Transaction 0 [tx={}].", txId0);
log.info("Test: Commit partition of transaction 0 [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(),
commitPartitionNodes);
view0.upsert(tx0, tuple0);
CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
commitPartitionLeaseholder.dropMessages((n, msg) -> {
if (msg instanceof TxCleanupMessage) {
cleanupStarted.complete(null);
}
return false;
});
log.info("Test: Committing the transaction 0 [tx={}].", txId0);
tx0.commitAsync();
// Check that the final tx state COMMITTED is saved to the persistent tx storage.
assertTrue(waitForCondition(() -> cluster.runningNodes().filter(n -> commitPartitionNodes.contains(n.name())).allMatch(n -> {
TransactionMeta meta = persistentTxState(n, txId0, commitPartId);
return meta != null && meta.txState() == COMMITTED;
}), 10_000));
assertThat(cleanupStarted, willCompleteSuccessfully());
// Stop the first transaction coordinator.
stopNode(coord0.name());
// No cleanup happened, waiting for vacuum on the remaining nodes that participated in tx0.
waitForTxStateVacuum(txId0, commitPartId, true, 10_000);
// Preparing to run another tx.
IgniteImpl coord1 = anyNode();
RecordView<Tuple> view1 = coord1.tables().table(TABLE_NAME).recordView();
// Another tx should get the data committed by tx 0.
Tuple keyTuple = Tuple.create().set("key", tuple0.longValue("key"));
Tuple tx0Data = view1.get(null, keyTuple);
assertEquals(tuple0.longValue("key"), tx0Data.longValue("key"));
assertEquals(tuple0.stringValue("val"), tx0Data.stringValue("val"));
// Waiting for vacuum, because there is no recovery future here.
waitForTxStateVacuum(txId0, commitPartId, true, 10_000);
}
/**
* Check that RO txns read the correct data consistent with commit timestamps.
*
* <ul>
* <li>For this test, create another zone and table with number of replicas that is equal to number of nodes;</li>
* <li>Start RO tx 1;</li>
* <li>Upsert (k1, v1) within RW tx 1 and commit it;</li>
* <li>Start RO tx 2;</li>
* <li>Upsert (k1, v2) within RW tx 2 and commit it;</li>
* <li>Start RO tx 3;</li>
* <li>Wait for vacuum of the states of RW tx 1 and RW tx 2;</li>
* <li>Read the data by k1 within RO tx 1, should be null;</li>
* <li>Read the data by k1 within RO tx 2, should be v1;</li>
* <li>Read the data by k1 within RO tx 3, should be v2.</li>
* </ul>
*/
@Test
public void testRoReadTheCorrectDataInBetween() {
setTxResourceTtl(0);
IgniteImpl node = anyNode();
String tableName = TABLE_NAME + "_1";
// For this test, create another zone and table with number of replicas that is equal to number of nodes.
String zoneSql = "create zone test_zone_1 with partitions=20, replicas=" + initialNodes()
+ ", storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
String sql = "create table " + tableName + " (key bigint primary key, val varchar(20)) with primary_zone='TEST_ZONE_1'";
cluster.doInSession(0, session -> {
executeUpdate(zoneSql, session);
executeUpdate(sql, session);
});
Transaction roTx1 = beginReadOnlyTx(node);
Tuple t1 = Tuple.create().set("key", 1L).set("val", "val1");
Tuple t2 = Tuple.create().set("key", 1L).set("val", "val2");
RecordView<Tuple> view = table(node, tableName).recordView();
Transaction rwTx1 = node.transactions().begin();
view.upsert(rwTx1, t1);
rwTx1.commit();
UUID rwTxId1 = txId(rwTx1);
Transaction roTx2 = beginReadOnlyTx(node);
Transaction rwTx2 = node.transactions().begin();
view.upsert(rwTx2, t2);
rwTx2.commit();
UUID rwTxId2 = txId(rwTx2);
Transaction roTx3 = beginReadOnlyTx(node);
triggerVacuum();
assertTxStateVacuumized(rwTxId1, tableName, partitionIdForTuple(node, tableName, t1, rwTx1), true);
assertTxStateVacuumized(rwTxId2, tableName, partitionIdForTuple(node, tableName, t2, rwTx2), true);
Tuple keyRec = Tuple.create().set("key", 1L);
checkValueReadOnly(view, roTx1, keyRec, null);
checkValueReadOnly(view, roTx2, keyRec, t1);
checkValueReadOnly(view, roTx3, keyRec, t2);
}
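/** Begins a read-only transaction on the given node. */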
private static Transaction beginReadOnlyTx(IgniteImpl node) {
return node.transactions().begin(new TransactionOptions().readOnly(true));
}
/**
* Check value using given read only tx.
*
* @param view Record view.
* @param readOnlyTx RO tx.
* @param keyTuple Key tuple.
* @param expected Expected tuple.
*/
private static void checkValueReadOnly(RecordView<Tuple> view, Transaction readOnlyTx, Tuple keyTuple, @Nullable Tuple expected) {
Tuple actual = view.get(readOnlyTx, keyTuple);
if (expected == null) {
assertNull(actual);
} else {
assertEquals(expected.stringValue("val"), actual.stringValue("val"));
}
}
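/**
* Sets the {@code txnResourceTtl} value in the cluster configuration.
*
* @param ttl TTL value for tx resources.
*/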
private void setTxResourceTtl(long ttl) {
CompletableFuture<Void> changeFuture = anyNode().clusterConfiguration().change(c ->
c.changeRoot(TransactionConfiguration.KEY).changeTxnResourceTtl(ttl));
assertThat(changeFuture, willCompleteSuccessfully());
}
/**
* To use it, the tx resource TTL should be set to {@code 0}, see {@link #setTxResourceTtl(long)}.
*/
private void triggerVacuum() {
runningNodes().forEach(node -> {
log.info("Test: triggering vacuum manually on node: " + node.name());
CompletableFuture<Void> vacuumFut = node.txManager().vacuum();
assertThat(vacuumFut, willCompleteSuccessfully());
});
}
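/** Returns {@code true} if each of the given nodes holds a volatile state for the given tx. */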
private boolean checkVolatileTxStateOnNodes(Set<String> nodeConsistentIds, UUID txId) {
return cluster.runningNodes()
.filter(n -> nodeConsistentIds.contains(n.name()))
.allMatch(n -> volatileTxState(n, txId) != null);
}
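/** Returns {@code true} if each of the given nodes holds a persistent state for the given tx. */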
private boolean checkPersistentTxStateOnNodes(Set<String> nodeConsistentIds, UUID txId, int partId) {
return cluster.runningNodes()
.filter(n -> nodeConsistentIds.contains(n.name()))
.allMatch(n -> persistentTxState(n, txId, partId) != null);
}
/**
* Waits for persistent tx state to be replicated on the given nodes.
*
* @param nodeConsistentIds Node names.
* @param txId Transaction id.
* @param partId Commit partition id.
* @param timeMs Time to wait.
*/
private void waitForTxStateReplication(Set<String> nodeConsistentIds, UUID txId, int partId, long timeMs)
throws InterruptedException {
assertTrue(waitForCondition(() -> checkPersistentTxStateOnNodes(nodeConsistentIds, txId, partId), timeMs));
}
/**
* Waits for vacuum of volatile (and if needed, persistent) state of the given tx on all nodes of the cluster.
*
* @param txId Transaction id.
* @param partId Commit partition id to check the persistent tx state storage of this partition.
* @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
* @param timeMs Time to wait.
*/
private void waitForTxStateVacuum(UUID txId, int partId, boolean checkPersistent, long timeMs) throws InterruptedException {
waitForTxStateVacuum(cluster.runningNodes().map(IgniteImpl::name).collect(toSet()), txId, partId, checkPersistent, timeMs);
}
/**
* Waits for vacuum of volatile (and if needed, persistent) state of the given tx on the given nodes.
*
* @param nodeConsistentIds Node names.
* @param txId Transaction id.
* @param partId Commit partition id to check the persistent tx state storage of this partition.
* @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
* @param timeMs Time to wait.
*/
private void waitForTxStateVacuum(Set<String> nodeConsistentIds, UUID txId, int partId, boolean checkPersistent, long timeMs)
throws InterruptedException {
boolean r = waitForCondition(() -> txStateIsAbsent(nodeConsistentIds, txId, TABLE_NAME, partId, checkPersistent, false), timeMs);
if (!r) {
logCurrentTxState(nodeConsistentIds, txId, TABLE_NAME, partId);
}
assertTrue(r);
}
/**
* Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on all nodes of the cluster.
*
* @param txId Transaction id.
* @param partId Commit partition id to check the persistent tx state storage of this partition.
* @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
*/
private void assertTxStateVacuumized(UUID txId, int partId, boolean checkPersistent) {
assertTxStateVacuumized(txId, TABLE_NAME, partId, checkPersistent);
}
/**
* Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on all nodes of the cluster.
*
* @param txId Transaction id.
* @param tableName Table name of the table that commit partition belongs to.
* @param partId Commit partition id to check the persistent tx state storage of this partition.
* @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
*/
private void assertTxStateVacuumized(UUID txId, String tableName, int partId, boolean checkPersistent) {
Set<String> allNodes = cluster.runningNodes().map(IgniteImpl::name).collect(toSet());
assertTxStateVacuumized(allNodes, txId, tableName, partId, checkPersistent);
}
/**
* Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on the given nodes. Uses default
* {@link #TABLE_NAME}.
*
* @param nodeConsistentIds Node names.
* @param txId Transaction id.
* @param partId Commit partition id to check the persistent tx state storage of this partition.
* @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
*/
private void assertTxStateVacuumized(Set<String> nodeConsistentIds, UUID txId, int partId, boolean checkPersistent) {
assertTxStateVacuumized(nodeConsistentIds, txId, TABLE_NAME, partId, checkPersistent);
}
/**
* Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on the given nodes.
*
* @param nodeConsistentIds Node names.
* @param txId Transaction id.
* @param tableName Table name of the table that commit partition belongs to.
* @param partId Commit partition id to check the persistent tx state storage of this partition.
* @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
*/
private void assertTxStateVacuumized(Set<String> nodeConsistentIds, UUID txId, String tableName, int partId, boolean checkPersistent) {
boolean result = txStateIsAbsent(nodeConsistentIds, txId, tableName, partId, checkPersistent, true);
if (!result) {
triggerVacuum();
result = txStateIsAbsent(nodeConsistentIds, txId, tableName, partId, checkPersistent, true);
if (!result) {
logCurrentTxState(nodeConsistentIds, txId, tableName, partId);
}
}
assertTrue(result);
}
/**
* Checks whether the tx state is absent on all of the given nodes.
*
* @param nodeConsistentIds Set of node names to check.
* @param txId Transaction id.
* @param tableName Table name of the table that commit partition belongs to.
* @param partId Commit partition id.
* @param checkPersistent Whether the persistent state should be checked.
* @param checkCpPrimaryOnly If {@code true}, the persistent state should be checked only on the commit partition primary,
* otherwise it would be checked on every given node.
* @return {@code true} if tx state is absent, {@code false} otherwise. Call {@link #logCurrentTxState(Set, UUID, String, int)}
* for details.
*/
private boolean txStateIsAbsent(
Set<String> nodeConsistentIds,
UUID txId,
String tableName,
int partId,
boolean checkPersistent,
boolean checkCpPrimaryOnly
) {
boolean result = true;
String cpPrimaryId = null;
if (checkCpPrimaryOnly) {
IgniteImpl node = anyNode();
TablePartitionId tablePartitionId = new TablePartitionId(tableId(node, tableName), partId);
CompletableFuture<ReplicaMeta> replicaFut = node.placementDriver().getPrimaryReplica(tablePartitionId, node.clock().now());
assertThat(replicaFut, willCompleteSuccessfully());
ReplicaMeta replicaMeta = replicaFut.join();
// The test doesn't make sense if there is no primary right now.
assertNotNull(replicaMeta);
cpPrimaryId = replicaMeta.getLeaseholderId();
}
for (Iterator<IgniteImpl> iterator = cluster.runningNodes().iterator(); iterator.hasNext();) {
IgniteImpl node = iterator.next();
if (!nodeConsistentIds.contains(node.name())) {
continue;
}
result = result
&& volatileTxState(node, txId) == null
&& (!checkPersistent || !node.id().equals(cpPrimaryId) || persistentTxState(node, txId, partId) == null);
}
return result;
}
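/** Logs the volatile and persistent states of the given tx on the given nodes; useful when a vacuum wait fails. */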
private void logCurrentTxState(Set<String> nodeConsistentIds, UUID txId, String table, int partId) {
cluster.runningNodes().filter(n -> nodeConsistentIds.contains(n.name())).forEach(node -> {
log.info("Test: volatile state [tx={}, node={}, state={}].", txId, node.name(), volatileTxState(node, txId));
log.info("Test: persistent state [tx={}, node={}, state={}].", txId, node.name(), persistentTxState(node, txId, table, partId));
});
}
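/** Returns the first running node of the cluster. */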
private IgniteImpl anyNode() {
return runningNodes().findFirst().orElseThrow();
}
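/** Returns the volatile tx state meta from the node's tx manager, or {@code null} if there is none. */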
@Nullable
private static TransactionMeta volatileTxState(IgniteImpl node, UUID txId) {
TxManagerImpl txManager = (TxManagerImpl) node.txManager();
return txManager.stateMeta(txId);
}
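/** Reads the persistent tx state for the default {@link #TABLE_NAME}. */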
@Nullable
private TransactionMeta persistentTxState(IgniteImpl node, UUID txId, int partId) {
return persistentTxState(node, txId, TABLE_NAME, partId);
}
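/** Reads the tx state from the persistent tx state storage of the given partition, on the dedicated storage-read executor. */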
@Nullable
private TransactionMeta persistentTxState(IgniteImpl node, UUID txId, String tableName, int partId) {
TransactionMeta[] meta = new TransactionMeta[1];
Future<?> f = txStateStorageExecutor.submit(() -> {
TxStateStorage txStateStorage = table(node, tableName).internalTable().txStateStorage().getTxStateStorage(partId);
assertNotNull(txStateStorage);
meta[0] = txStateStorage.get(txId);
});
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return meta[0];
}
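/** Returns the first running node matching the given predicate. */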
private IgniteImpl findNode(Predicate<IgniteImpl> filter) {
return cluster.runningNodes()
.filter(n -> n != null && filter.test(n))
.findFirst()
.get();
}
}