IGNITE-22062 RO transaction does not close cursor when exception is thrown (#3620)

diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
index 768e7a2..2308c41 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
@@ -28,7 +28,6 @@
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -47,14 +46,6 @@
         return internalTbl.scan(part, tx.id(), internalTbl.CLOCK.now(), mock(ClusterNode.class), tx.coordinatorId());
     }
 
-    // TODO: IGNITE-17666 Use super test as is.
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17666")
-    @Override
-    @Test
-    public void testExceptionRowScanCursorHasNext() throws Exception {
-        super.testExceptionRowScanCursorHasNext();
-    }
-
     @Override
     protected InternalTransaction startTx() {
         return internalTbl.txManager().begin(HYBRID_TIMESTAMP_TRACKER, true);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index fddd5a6..4987f37 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -26,6 +26,7 @@
 import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET;
 import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET_ALL;
 import static org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
+import static org.apache.ignite.internal.util.CompletableFutures.completedOrFailedFuture;
 import static org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
@@ -1535,7 +1536,6 @@
                     ReadOnlyScanRetrieveBatchReplicaRequest request = tableMessagesFactory.readOnlyScanRetrieveBatchReplicaRequest()
                             .groupId(tablePartitionId)
                             .readTimestampLong(readTimestamp.longValue())
-                            // TODO: IGNITE-17666 Close cursor tx finish.
                             .transactionId(txId)
                             .scanId(scanId)
                             .batchSize(batchSize)
@@ -1550,8 +1550,14 @@
 
                     return replicaSvc.invoke(recipientNode, request);
                 },
-                // TODO: IGNITE-17666 Close cursor tx finish.
-                (intentionallyClose, fut) -> completeScan(txId, tablePartitionId, fut, recipientNode, intentionallyClose),
+                (intentionallyClose, scanId, th) -> completeScan(
+                        txId,
+                        tablePartitionId,
+                        scanId,
+                        th,
+                        recipientNode,
+                        intentionallyClose || th != null
+                ),
                 new ReadOnlyInflightBatchRequestTracker(transactionInflights, txId)
         );
     }
@@ -1595,21 +1601,22 @@
                         columnsToInclude,
                         implicit
                 ),
-                (intentionallyClose, fut) -> {
+                (intentionallyClose, scanId, th) -> {
                     CompletableFuture<Void> opFut;
 
                     if (implicit) {
-                        opFut = fut.thenApply(cursorId -> null);
+                        opFut = completedOrFailedFuture(null, th);
                     } else {
                         var replicationGrpId = new TablePartitionId(tableId, partId);
 
                         opFut = tx.enlistedNodeAndConsistencyToken(replicationGrpId) != null ? completeScan(
                                 tx.id(),
                                 replicationGrpId,
-                                fut,
+                                scanId,
+                                th,
                                 tx.enlistedNodeAndConsistencyToken(replicationGrpId).get1(),
                                 intentionallyClose
-                        ) : fut.thenApply(cursorId -> null);
+                        ) : completedOrFailedFuture(null, th);
                     }
 
                     return postEnlist(opFut, intentionallyClose, actualTx, implicit && !intentionallyClose);
@@ -1655,8 +1662,7 @@
 
                     return replicaSvc.invoke(recipient.node(), request);
                 },
-                // TODO: IGNITE-17666 Close cursor tx finish.
-                (intentionallyClose, fut) -> completeScan(txId, tablePartitionId, fut, recipient.node(), intentionallyClose),
+                (intentionallyClose, scanId, th) -> completeScan(txId, tablePartitionId, scanId, th, recipient.node(), intentionallyClose),
                 READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER
         );
     }
@@ -1666,32 +1672,47 @@
      *
      * @param txId Transaction id.
      * @param replicaGrpId Replication group id.
-     * @param scanIdFut Future to scan id.
+     * @param scanId Scan id.
+     * @param th An exception that may occur in the scan procedure or {@code null} when the procedure passes without an exception.
      * @param recipientNode Server node where the scan was started.
-     * @param intentionallyClose The flag is true when the scan was intentionally closed on the initiator side and false when the
-     *         scan cursor has no more entries to read.
+     * @param explicitCloseCursor True when the cursor should be closed explicitly.
      * @return The future.
      */
     private CompletableFuture<Void> completeScan(
             UUID txId,
             ReplicationGroupId replicaGrpId,
-            CompletableFuture<Long> scanIdFut,
+            Long scanId,
+            Throwable th,
             ClusterNode recipientNode,
-            boolean intentionallyClose
+            boolean explicitCloseCursor
     ) {
-        return scanIdFut.thenCompose(scanId -> {
-            if (intentionallyClose) {
-                ScanCloseReplicaRequest scanCloseReplicaRequest = tableMessagesFactory.scanCloseReplicaRequest()
-                        .groupId(replicaGrpId)
-                        .transactionId(txId)
-                        .scanId(scanId)
-                        .build();
+        CompletableFuture<Void> closeFut = nullCompletedFuture();
 
-                return replicaSvc.invoke(recipientNode, scanCloseReplicaRequest);
+        if (explicitCloseCursor) {
+            ScanCloseReplicaRequest scanCloseReplicaRequest = tableMessagesFactory.scanCloseReplicaRequest()
+                    .groupId(replicaGrpId)
+                    .transactionId(txId)
+                    .scanId(scanId)
+                    .build();
+
+            closeFut = replicaSvc.invoke(recipientNode, scanCloseReplicaRequest);
+        }
+
+        return closeFut.handle((unused, throwable) -> {
+            CompletableFuture<Void> fut = nullCompletedFuture();
+
+            if (th != null) {
+                if (throwable != null) {
+                    th.addSuppressed(throwable);
+                }
+
+                fut = failedFuture(th);
+            } else if (throwable != null) {
+                fut = failedFuture(throwable);
             }
 
-            return nullCompletedFuture();
-        });
+            return fut;
+        }).thenCompose(Function.identity());
     }
 
     /**
@@ -1882,7 +1903,7 @@
         private final BiFunction<Long, Integer, CompletableFuture<Collection<BinaryRow>>> retrieveBatch;
 
         /** The closure will be invoked before the cursor closed. */
-        BiFunction<Boolean, CompletableFuture<Long>, CompletableFuture<Void>> onClose;
+        IgniteTriFunction<Boolean, Long, Throwable, CompletableFuture<Void>> onClose;
 
         /** True when the publisher has a subscriber, false otherwise. */
         private final AtomicBoolean subscribed;
@@ -1899,7 +1920,7 @@
          */
         PartitionScanPublisher(
                 BiFunction<Long, Integer, CompletableFuture<Collection<BinaryRow>>> retrieveBatch,
-                BiFunction<Boolean, CompletableFuture<Long>, CompletableFuture<Void>> onClose,
+                IgniteTriFunction<Boolean, Long, Throwable, CompletableFuture<Void>> onClose,
                 InflightBatchRequestTracker inflightBatchRequestTracker
         ) {
             this.retrieveBatch = retrieveBatch;
@@ -1998,7 +2019,7 @@
                     return;
                 }
 
-                onClose.apply(intentionallyClose, t == null ? completedFuture(scanId) : failedFuture(t)).whenComplete((ignore, th) -> {
+                onClose.apply(intentionallyClose, scanId, t).whenComplete((ignore, th) -> {
                     if (th != null) {
                         subscriber.onError(th);
                     } else {
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 34affae..2df22fd 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -83,7 +83,6 @@
             TablePartitionId tablePartitionId,
             IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken
     ) {
-        // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
         return null;
     }
 
@@ -103,7 +102,6 @@
     }
 
     @Override
-    // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
     protected CompletableFuture<Void> finish(boolean commit) {
         return finish(commit, readTimestamp);
     }