IGNITE-22147 Fixed flaky ItTxResourcesVacuumTest.testRecoveryAfterPersistentStateVacuumized (#3689)
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index ccb91a8..95f2408 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -24,7 +24,6 @@
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.findTupleToBeHostedOnNode;
@@ -229,7 +228,7 @@
if (finishRequest.txId().equals(txId)) {
finishStartedFuture.complete(null);
- finishAllowedFuture.join();
+ joinWithTimeout(finishAllowedFuture);
}
}
@@ -250,7 +249,7 @@
// Check that the volatile state of the transaction is preserved.
assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
- finishAllowedFuture.complete(null);
+ assertTrue(finishAllowedFuture.complete(null));
assertThat(commitFut, willCompleteSuccessfully());
@@ -382,7 +381,8 @@
Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
- log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
+ log.info("Test: Commit partition [part={}, leaseholder={}, hostingNodes={}].", commitPartGrpId, commitPartitionLeaseholder.name(),
+ commitPartNodes);
// Some node that does not host the commit partition, will be the primary node for upserting another tuple.
IgniteImpl leaseholderForAnotherTuple = findNode(n -> !commitPartNodes.contains(n.name()));
@@ -405,7 +405,7 @@
log.info("Test: cleanup started.");
if (commitPartNodes.contains(n)) {
- cleanupAllowed.join();
+ joinWithTimeout(cleanupAllowed);
}
}
@@ -425,13 +425,13 @@
assertTxStateVacuumized(Set.of(leaseholderForAnotherTuple.name()), txId, commitPartId, false);
// Unblocking cleanup.
- cleanupAllowed.complete(null);
+ assertTrue(cleanupAllowed.complete(null));
assertThat(commitFut, willCompleteSuccessfully());
Transaction roTxAfter = beginReadOnlyTx(anyNode());
- waitForCondition(() -> volatileTxState(commitPartitionLeaseholder, txId) != null, 10_000);
+ waitForCleanupCompletion(commitPartNodes, txId);
triggerVacuum();
assertTxStateVacuumized(txId, commitPartId, true);
@@ -498,7 +498,7 @@
if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
cleanupStarted.complete(null);
- cleanupAllowedFut.join();
+ joinWithTimeout(cleanupAllowedFut);
}
return false;
@@ -512,7 +512,7 @@
transferPrimary(cluster.runningNodes().collect(toSet()), commitPartGrpId, commitPartNodes::contains);
- cleanupAllowedFut.complete(null);
+ assertTrue(cleanupAllowedFut.complete(null));
cleanupAllowed[0] = true;
@@ -570,7 +570,8 @@
Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
- log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
+ log.info("Test: Commit partition [part={}, leaseholder={}, hostingNodes={}].", commitPartGrpId, commitPartitionLeaseholder.name(),
+ commitPartNodes);
view.upsert(tx, tuple);
@@ -578,17 +579,20 @@
CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
boolean[] cleanupAllowed = new boolean[1];
- commitPartitionLeaseholder.dropMessages((n, msg) -> {
+ // Cleanup may be triggered by the primary replica reelection as well.
+ runningNodes().filter(n -> commitPartNodes.contains(n.name())).forEach(nd -> nd.dropMessages((n, msg) -> {
if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
cleanupStarted.complete(null);
- cleanupAllowedFut.join();
+ log.warn("Test: cleanup started.");
- return true;
+ joinWithTimeout(cleanupAllowedFut);
+
+ log.info("Test: cleanup resumed.");
}
return false;
- });
+ }));
Transaction roTxBefore = beginReadOnlyTx(anyNode());
@@ -596,6 +600,8 @@
waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
+ log.info("Test: state replicated.");
+
assertThat(cleanupStarted, willCompleteSuccessfully());
// Wait for volatile tx state vacuum. This is possible because tx finish is complete.
@@ -604,7 +610,7 @@
log.info("Test: volatile state vacuumized");
- cleanupAllowedFut.complete(null);
+ assertTrue(cleanupAllowedFut.complete(null));
cleanupAllowed[0] = true;
@@ -614,11 +620,7 @@
Transaction roTxAfter = beginReadOnlyTx(anyNode());
- waitForCondition(() -> {
- TxStateMeta txStateMeta = (TxStateMeta) volatileTxState(commitPartitionLeaseholder, txId);
-
- return txStateMeta != null && txStateMeta.cleanupCompletionTimestamp() != null;
- }, 10_000);
+ waitForCleanupCompletion(commitPartNodes, txId);
log.info("Test: cleanup completed.");
@@ -689,13 +691,7 @@
tx0.commitAsync();
- // Check that the final tx state COMMITTED is saved to the persistent tx storage.
- assertTrue(waitForCondition(() -> cluster.runningNodes().filter(n -> commitPartitionNodes.contains(n.name())).allMatch(n -> {
- TransactionMeta meta = persistentTxState(n, txId0, commitPartId);
-
- return meta != null && meta.txState() == COMMITTED;
- }), 10_000));
-
+ // Cleanup starts not earlier than the finish command is applied to commit partition group.
assertThat(cleanupStarted, willCompleteSuccessfully());
// Stop the first transaction coordinator.
@@ -737,8 +733,6 @@
*/
@Test
public void testRoReadTheCorrectDataInBetween() {
- setTxResourceTtl(0);
-
IgniteImpl node = anyNode();
String tableName = TABLE_NAME + "_1";
@@ -885,6 +879,28 @@
}
/**
+ * Wait for cleanup completion timestamp on any node of commit partition group.
+ *
+ * @param commitPartitionNodeNames Node names of nodes in commit partition group.
+ * @param txId Transaction id.
+ */
+ private void waitForCleanupCompletion(Set<String> commitPartitionNodeNames, UUID txId) throws InterruptedException {
+ Set<IgniteImpl> commitPartitionNodes = runningNodes().filter(n -> commitPartitionNodeNames.contains(n.name())).collect(toSet());
+
+ assertTrue(waitForCondition(() -> {
+ boolean res = false;
+
+ for (IgniteImpl node : commitPartitionNodes) {
+ TxStateMeta txStateMeta = (TxStateMeta) volatileTxState(node, txId);
+
+ res = res || txStateMeta != null && txStateMeta.cleanupCompletionTimestamp() != null;
+ }
+
+ return res;
+ }, 10_000));
+ }
+
+ /**
* Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on all nodes of the cluster.
*
* @param txId Transaction id.
@@ -1051,4 +1067,14 @@
.findFirst()
.get();
}
+
+ private void joinWithTimeout(CompletableFuture<?> future) {
+ future.orTimeout(60, TimeUnit.SECONDS)
+ .exceptionally(e -> {
+ log.error("Could not wait for the future.", e);
+
+ return null;
+ })
+ .join();
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 56f1813..1df76b7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -726,7 +726,9 @@
commit ? COMMITTED : ABORTED,
old == null ? null : old.txCoordinatorId(),
old == null ? partId : old.commitPartitionId(),
- commit ? commitTimestamp : null
+ commit ? commitTimestamp : null,
+ old == null ? null : old.initialVacuumObservationTimestamp(),
+ old == null ? null : old.cleanupCompletionTimestamp()
));
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index aa3aab5..52fef60 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -3922,7 +3922,9 @@
txState,
old == null ? null : old.txCoordinatorId(),
old == null ? null : old.commitPartitionId(),
- txState == COMMITTED ? commitTimestamp : null
+ txState == COMMITTED ? commitTimestamp : null,
+ old == null ? null : old.initialVacuumObservationTimestamp(),
+ old == null ? null : old.cleanupCompletionTimestamp()
));
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
index f73e9c9..1676be8 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
@@ -57,12 +57,12 @@
public void start() {
messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender, correlationId) -> {
if (msg instanceof FinishedTransactionsBatchMessage) {
- processTxCleanup((FinishedTransactionsBatchMessage) msg);
+ processFinishedTransactionsBatchMessage((FinishedTransactionsBatchMessage) msg);
}
});
}
- private void processTxCleanup(FinishedTransactionsBatchMessage closeCursorsMessage) {
+ private void processFinishedTransactionsBatchMessage(FinishedTransactionsBatchMessage closeCursorsMessage) {
asyncExecutor.execute(() -> closeCursorsMessage.transactions().forEach(resourcesRegistry::close));
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index 7af5d71..9796727 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -139,7 +139,7 @@
if (ex == null) {
msg = prepareResponse();
} else {
- msg = prepareErrorResponse(ex);
+ msg = prepareErrorResponse(txCleanupMessage.txId(), ex);
// Run durable cleanup for the partitions that we failed to cleanup properly.
// No need to wait on this future.
@@ -178,9 +178,10 @@
.build();
}
- private NetworkMessage prepareErrorResponse(Throwable th) {
+ private NetworkMessage prepareErrorResponse(UUID txId, Throwable th) {
return FACTORY
.txCleanupMessageErrorResponse()
+ .txId(txId)
.throwable(th)
.timestampLong(clockService.nowLong())
.build();
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 7c43ebf..dcf97ff 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -32,12 +32,15 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.util.CompletableFutures;
@@ -47,6 +50,9 @@
* Sends TX Cleanup request.
*/
public class TxCleanupRequestSender {
+ /** Logger. */
+ private final IgniteLogger log = Loggers.forClass(TxCleanupRequestSender.class);
+
/** Placement driver helper. */
private final PlacementDriverHelper placementDriverHelper;
@@ -86,6 +92,12 @@
if (result != null) {
onCleanupReplicated(result);
}
+
+ if (msg instanceof TxCleanupMessageErrorResponse) {
+ TxCleanupMessageErrorResponse response = (TxCleanupMessageErrorResponse) msg;
+
+ log.warn("Exception happened during transaction cleanup [txId={}].", response.throwable(), response.txId());
+ }
}
});
}
@@ -168,6 +180,9 @@
@Nullable HybridTimestamp commitTimestamp,
UUID txId
) {
+ // Start tracking the partitions we want to learn the replication confirmation from.
+ writeIntentsReplicated.put(txId, new CleanupContext(new HashSet<>(partitionIds), commit ? TxState.COMMITTED : TxState.ABORTED));
+
return placementDriverHelper.findPrimaryReplicas(partitionIds)
.thenCompose(partitionData -> {
cleanupPartitionsWithoutPrimary(commit, commitTimestamp, txId, partitionData.partitionsWithoutPrimary);
@@ -257,7 +272,7 @@
*/
private final TxState txState;
- public CleanupContext(Set<TablePartitionId> partitions, TxState txState) {
+ private CleanupContext(Set<TablePartitionId> partitions, TxState txState) {
this.partitions = partitions;
this.txState = txState;
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
index 5be7788..8539742 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.tx.message;
+import java.util.UUID;
import org.apache.ignite.internal.network.annotations.Marshallable;
import org.apache.ignite.internal.network.annotations.Transferable;
@@ -26,6 +27,13 @@
@Transferable(TxMessageGroup.TX_CLEANUP_MSG_ERR_RESPONSE)
public interface TxCleanupMessageErrorResponse extends TxCleanupMessageResponse {
/**
+ * Transaction id.
+ *
+ * @return Transaction id.
+ */
+ UUID txId();
+
+ /**
* Returns a {@link Throwable} that was thrown during handling a lock release message.
*
* @return {@link Throwable} that was thrown during handling a lock release message.