IGNITE-15346 Fix exception message of closed the GridCloseableIteratorAdapter. (#9376)

diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
index 895f5ae..7f6f8d9 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
@@ -69,9 +69,9 @@
         // No-op. Prevent cache destroy from super class.
     }
 
-    /** */
+    /** @throws Exception If failed. */
     @Test
-    public void testCancelScanQuery() {
+    public void testCancelScanQuery() throws Exception {
         doTestScanQueryCancel(client, srvs, args -> {
             int res = execute("--kill", "scan", args.get1().toString(), args.get2(), args.get3().toString());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCloseableIteratorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCloseableIteratorAdapter.java
index 3386649..9b2b53f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCloseableIteratorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCloseableIteratorAdapter.java
@@ -35,8 +35,11 @@
 
     /** {@inheritDoc} */
     @Override public final T nextX() throws IgniteCheckedException {
-        if (!hasNextX())
+        if (!hasNextX()) {
+            checkClosed();
+
             throw new NoSuchElementException();
+        }
 
         return onNext();
     }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
index 8bb08bd..d60afb9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
@@ -139,9 +139,9 @@
         cleanPersistenceDir();
     }
 
-    /** */
+    /** @throws Exception If failed. */
     @Test
-    public void testCancelScanQuery() {
+    public void testCancelScanQuery() throws Exception {
         doTestScanQueryCancel(startCli, srvs, args ->
             qryMBean.cancelScan(args.get1().toString(), args.get2(), args.get3()));
     }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
index cc03bc0..7dae6b5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
@@ -105,9 +105,9 @@
             cache.put(i, i);
     }
 
-    /** */
+    /** @throws Exception If failed. */
     @Test
-    public void testCancelScanQuery() {
+    public void testCancelScanQuery() throws Exception {
         doTestScanQueryCancel(startCli, srvs,
             args -> execute(killCli, KILL_SCAN_QRY + " '" + args.get1() + "' '" + args.get2() + "' " + args.get3()));
     }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
index 64b7c85..7e5450d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
@@ -20,9 +20,11 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -37,6 +39,7 @@
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.util.typedef.T3;
@@ -47,10 +50,13 @@
 import org.apache.ignite.services.ServiceContext;
 import org.apache.ignite.spi.systemview.view.ServiceView;
 import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
 
+import static org.apache.ignite.internal.managers.systemview.ScanQuerySystemView.SCAN_QRY_SYS_VIEW;
 import static org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest.queryProcessor;
 import static org.apache.ignite.internal.processors.service.IgniteServiceProcessor.SVCS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.apache.ignite.util.KillCommandsSQLTest.execute;
@@ -82,6 +88,12 @@
     /** Latch to block compute task execution. */
     private static CountDownLatch computeLatch;
 
+    /** Scan query filter latch. */
+    private static volatile CountDownLatch filterLatch;
+
+    /** Scan query cancel latch. */
+    private static volatile CountDownLatch cancelLatch;
+
     /**
      * Test cancel of the scan query.
      *
@@ -89,7 +101,28 @@
      * @param srvs Server nodes.
      * @param qryCanceler Query cancel closure.
      */
-    public static void doTestScanQueryCancel(IgniteEx cli, List<IgniteEx> srvs, Consumer<T3<UUID, String, Long>> qryCanceler) {
+    public static void doTestScanQueryCancel(
+        IgniteEx cli,
+        List<IgniteEx> srvs,
+        Consumer<T3<UUID, String, Long>> qryCanceler
+    ) throws Exception {
+        checkScanQueryCancelBeforeFetching(cli, srvs, qryCanceler);
+
+        checkScanQueryCancelDuringFetching(cli, srvs, qryCanceler);
+    }
+
+    /**
+     * Checks cancel of the scan query before fetching.
+     *
+     * @param cli Client node.
+     * @param srvs Server nodes.
+     * @param qryCanceler Query cancel closure.
+     */
+    public static void checkScanQueryCancelBeforeFetching(
+        IgniteEx cli,
+        List<IgniteEx> srvs,
+        Consumer<T3<UUID, String, Long>> qryCanceler
+    ) throws Exception {
         IgniteCache<Object, Object> cache = cli.cache(DEFAULT_CACHE_NAME);
 
         QueryCursor<Cache.Entry<Object, Object>> qry1 = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
@@ -98,14 +131,7 @@
         // Fetch first entry and therefore caching first page.
         assertNotNull(iter1.next());
 
-        List<List<?>> scanQries0 = execute(srvs.get(0),
-            "SELECT ORIGIN_NODE_ID, CACHE_NAME, QUERY_ID FROM SYS.SCAN_QUERIES");
-
-        assertEquals(1, scanQries0.size());
-
-        UUID originNodeId = (UUID)scanQries0.get(0).get(0);
-        String cacheName = (String)scanQries0.get(0).get(1);
-        long qryId = (Long)scanQries0.get(0).get(2);
+        T3<UUID, String, Long> qryInfo = scanQuery(srvs.get(0));
 
         // Opens second query.
         QueryCursor<Cache.Entry<Object, Object>> qry2 = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
@@ -115,7 +141,7 @@
         assertNotNull(iter2.next());
 
         // Cancel first query.
-        qryCanceler.accept(new T3<>(originNodeId, cacheName, qryId));
+        qryCanceler.accept(qryInfo);
 
         // Fetch all cached entries. It's size equal to the {@code PAGE_SZ * NODES_CNT}.
         for (int i = 0; i < PAGE_SZ * srvs.size() - 1; i++)
@@ -128,6 +154,64 @@
         for (int i = 0; i < PAGE_SZ * PAGE_SZ - 1; i++)
             assertNotNull(iter2.next());
 
+        checkScanQueryResources(cli, srvs, qryInfo.get3());
+
+        qry2.close();
+    }
+
+    /**
+     * Checks cancel of the scan query during fetching.
+     *
+     * @param cli Client node.
+     * @param srvs Server nodes.
+     * @param qryCanceler Query cancel closure.
+     */
+    private static void checkScanQueryCancelDuringFetching(
+        IgniteEx cli,
+        List<IgniteEx> srvs,
+        Consumer<T3<UUID, String, Long>> qryCanceler
+    ) throws Exception {
+        filterLatch = new CountDownLatch(1);
+        cancelLatch = new CountDownLatch(1);
+
+        IgniteCache<Object, Object> cache = cli.cache(DEFAULT_CACHE_NAME);
+
+        QueryCursor<Cache.Entry<Object, Object>> qry = cache.query(new ScanQuery<>().setFilter((o, o2) -> {
+            try {
+                filterLatch.countDown();
+
+                cancelLatch.await(TIMEOUT, TimeUnit.MILLISECONDS);
+            }
+            catch (Exception ignored) {
+                // No-op.
+            }
+
+            return true;
+        }));
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync((Runnable)() -> qry.iterator().next());
+
+        assertTrue(filterLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
+
+        T3<UUID, String, Long> qryInfo = scanQuery(srvs.get(0));
+
+        qryCanceler.accept(qryInfo);
+
+        cancelLatch.countDown();
+
+        assertThrowsAnyCause(null, fut::get, NoSuchElementException.class, "Iterator has been closed.");
+
+        checkScanQueryResources(cli, srvs, qryInfo.get3());
+    }
+
+    /**
+     * Checks scan query resources.
+     *
+     * @param cli Client node.
+     * @param srvs Server nodes.
+     * @param qryId Query ID to check.
+     */
+    private static void checkScanQueryResources(IgniteEx cli, List<IgniteEx> srvs, long qryId) {
         // Checking all server node objects cleared after cancel.
         for (int i = 0; i < srvs.size(); i++) {
             IgniteEx ignite = srvs.get(i);
@@ -152,6 +236,23 @@
     }
 
     /**
+     * Gets scan query info.
+     *
+     * @param node Node to get query info.
+     * @return Tuple of scan query info.
+     */
+    private static T3<UUID, String, Long> scanQuery(IgniteEx node) throws IgniteCheckedException {
+        assertTrue(waitForCondition(() -> node.context().systemView().view(SCAN_QRY_SYS_VIEW).size() > 0, TIMEOUT));
+
+        List<List<?>> qry = execute(node,
+            "SELECT ORIGIN_NODE_ID, CACHE_NAME, QUERY_ID FROM SYS.SCAN_QUERIES");
+
+        assertEquals(1, qry.size());
+
+        return new T3<>((UUID)qry.get(0).get(0), (String)qry.get(0).get(1), (Long)qry.get(0).get(2));
+    }
+
+    /**
      * Test cancel of the compute task.
      *
      * @param cli Client node that starts tasks.