IGNITE-22062 RO transaction does not close cursor when exception is thrown (#3620)
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
index 768e7a2..2308c41 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
@@ -28,7 +28,6 @@
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -47,14 +46,6 @@
return internalTbl.scan(part, tx.id(), internalTbl.CLOCK.now(), mock(ClusterNode.class), tx.coordinatorId());
}
- // TODO: IGNITE-17666 Use super test as is.
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17666")
- @Override
- @Test
- public void testExceptionRowScanCursorHasNext() throws Exception {
- super.testExceptionRowScanCursorHasNext();
- }
-
@Override
protected InternalTransaction startTx() {
return internalTbl.txManager().begin(HYBRID_TIMESTAMP_TRACKER, true);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index fddd5a6..4987f37 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -26,6 +26,7 @@
import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET;
import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET_ALL;
import static org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
+import static org.apache.ignite.internal.util.CompletableFutures.completedOrFailedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
@@ -1535,7 +1536,6 @@
ReadOnlyScanRetrieveBatchReplicaRequest request = tableMessagesFactory.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(tablePartitionId)
.readTimestampLong(readTimestamp.longValue())
- // TODO: IGNITE-17666 Close cursor tx finish.
.transactionId(txId)
.scanId(scanId)
.batchSize(batchSize)
@@ -1550,8 +1550,14 @@
return replicaSvc.invoke(recipientNode, request);
},
- // TODO: IGNITE-17666 Close cursor tx finish.
- (intentionallyClose, fut) -> completeScan(txId, tablePartitionId, fut, recipientNode, intentionallyClose),
+ (intentionallyClose, scanId, th) -> completeScan(
+ txId,
+ tablePartitionId,
+ scanId,
+ th,
+ recipientNode,
+ intentionallyClose || th != null
+ ),
new ReadOnlyInflightBatchRequestTracker(transactionInflights, txId)
);
}
@@ -1595,21 +1601,22 @@
columnsToInclude,
implicit
),
- (intentionallyClose, fut) -> {
+ (intentionallyClose, scanId, th) -> {
CompletableFuture<Void> opFut;
if (implicit) {
- opFut = fut.thenApply(cursorId -> null);
+ opFut = completedOrFailedFuture(null, th);
} else {
var replicationGrpId = new TablePartitionId(tableId, partId);
opFut = tx.enlistedNodeAndConsistencyToken(replicationGrpId) != null ? completeScan(
tx.id(),
replicationGrpId,
- fut,
+ scanId,
+ th,
tx.enlistedNodeAndConsistencyToken(replicationGrpId).get1(),
intentionallyClose
- ) : fut.thenApply(cursorId -> null);
+ ) : completedOrFailedFuture(null, th);
}
return postEnlist(opFut, intentionallyClose, actualTx, implicit && !intentionallyClose);
@@ -1655,8 +1662,7 @@
return replicaSvc.invoke(recipient.node(), request);
},
- // TODO: IGNITE-17666 Close cursor tx finish.
- (intentionallyClose, fut) -> completeScan(txId, tablePartitionId, fut, recipient.node(), intentionallyClose),
+ (intentionallyClose, scanId, th) -> completeScan(txId, tablePartitionId, scanId, th, recipient.node(), intentionallyClose),
READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER
);
}
@@ -1666,32 +1672,47 @@
*
* @param txId Transaction id.
* @param replicaGrpId Replication group id.
- * @param scanIdFut Future to scan id.
+ * @param scanId Scan id.
+ * @param th An exception that may occur in the scan procedure or {@code null} when the procedure passes without an exception.
* @param recipientNode Server node where the scan was started.
- * @param intentionallyClose The flag is true when the scan was intentionally closed on the initiator side and false when the
- * scan cursor has no more entries to read.
+ * @param explicitCloseCursor True when the cursor should be closed explicitly.
* @return The future.
*/
private CompletableFuture<Void> completeScan(
UUID txId,
ReplicationGroupId replicaGrpId,
- CompletableFuture<Long> scanIdFut,
+ Long scanId,
+ Throwable th,
ClusterNode recipientNode,
- boolean intentionallyClose
+ boolean explicitCloseCursor
) {
- return scanIdFut.thenCompose(scanId -> {
- if (intentionallyClose) {
- ScanCloseReplicaRequest scanCloseReplicaRequest = tableMessagesFactory.scanCloseReplicaRequest()
- .groupId(replicaGrpId)
- .transactionId(txId)
- .scanId(scanId)
- .build();
+ CompletableFuture<Void> closeFut = nullCompletedFuture();
- return replicaSvc.invoke(recipientNode, scanCloseReplicaRequest);
+ if (explicitCloseCursor) {
+ ScanCloseReplicaRequest scanCloseReplicaRequest = tableMessagesFactory.scanCloseReplicaRequest()
+ .groupId(replicaGrpId)
+ .transactionId(txId)
+ .scanId(scanId)
+ .build();
+
+ closeFut = replicaSvc.invoke(recipientNode, scanCloseReplicaRequest);
+ }
+
+ return closeFut.handle((unused, throwable) -> {
+ CompletableFuture<Void> fut = nullCompletedFuture();
+
+ if (th != null) {
+ if (throwable != null) {
+ th.addSuppressed(throwable);
+ }
+
+ fut = failedFuture(th);
+ } else if (throwable != null) {
+ fut = failedFuture(throwable);
}
- return nullCompletedFuture();
- });
+ return fut;
+ }).thenCompose(Function.identity());
}
/**
@@ -1882,7 +1903,7 @@
private final BiFunction<Long, Integer, CompletableFuture<Collection<BinaryRow>>> retrieveBatch;
/** The closure will be invoked before the cursor closed. */
- BiFunction<Boolean, CompletableFuture<Long>, CompletableFuture<Void>> onClose;
+ IgniteTriFunction<Boolean, Long, Throwable, CompletableFuture<Void>> onClose;
/** True when the publisher has a subscriber, false otherwise. */
private final AtomicBoolean subscribed;
@@ -1899,7 +1920,7 @@
*/
PartitionScanPublisher(
BiFunction<Long, Integer, CompletableFuture<Collection<BinaryRow>>> retrieveBatch,
- BiFunction<Boolean, CompletableFuture<Long>, CompletableFuture<Void>> onClose,
+ IgniteTriFunction<Boolean, Long, Throwable, CompletableFuture<Void>> onClose,
InflightBatchRequestTracker inflightBatchRequestTracker
) {
this.retrieveBatch = retrieveBatch;
@@ -1998,7 +2019,7 @@
return;
}
- onClose.apply(intentionallyClose, t == null ? completedFuture(scanId) : failedFuture(t)).whenComplete((ignore, th) -> {
+ onClose.apply(intentionallyClose, scanId, t).whenComplete((ignore, th) -> {
if (th != null) {
subscriber.onError(th);
} else {
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 34affae..2df22fd 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -83,7 +83,6 @@
TablePartitionId tablePartitionId,
IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken
) {
- // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
return null;
}
@@ -103,7 +102,6 @@
}
@Override
- // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
protected CompletableFuture<Void> finish(boolean commit) {
return finish(commit, readTimestamp);
}