IGNITE-15346 Fix exception message of closed the GridCloseableIteratorAdapter. (#9376)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
index 895f5ae..7f6f8d9 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
@@ -69,9 +69,9 @@
// No-op. Prevent cache destroy from super class.
}
- /** */
+ /** @throws Exception If failed. */
@Test
- public void testCancelScanQuery() {
+ public void testCancelScanQuery() throws Exception {
doTestScanQueryCancel(client, srvs, args -> {
int res = execute("--kill", "scan", args.get1().toString(), args.get2(), args.get3().toString());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCloseableIteratorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCloseableIteratorAdapter.java
index 3386649..9b2b53f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridCloseableIteratorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridCloseableIteratorAdapter.java
@@ -35,8 +35,11 @@
/** {@inheritDoc} */
@Override public final T nextX() throws IgniteCheckedException {
- if (!hasNextX())
+ if (!hasNextX()) {
+ checkClosed();
+
throw new NoSuchElementException();
+ }
return onNext();
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
index 8bb08bd..d60afb9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsMXBeanTest.java
@@ -139,9 +139,9 @@
cleanPersistenceDir();
}
- /** */
+ /** @throws Exception If failed. */
@Test
- public void testCancelScanQuery() {
+ public void testCancelScanQuery() throws Exception {
doTestScanQueryCancel(startCli, srvs, args ->
qryMBean.cancelScan(args.get1().toString(), args.get2(), args.get3()));
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
index cc03bc0..7dae6b5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsSQLTest.java
@@ -105,9 +105,9 @@
cache.put(i, i);
}
- /** */
+ /** @throws Exception If failed. */
@Test
- public void testCancelScanQuery() {
+ public void testCancelScanQuery() throws Exception {
doTestScanQueryCancel(startCli, srvs,
args -> execute(killCli, KILL_SCAN_QRY + " '" + args.get1() + "' '" + args.get2() + "' " + args.get3()));
}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
index 64b7c85..7e5450d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/util/KillCommandsTests.java
@@ -20,9 +20,11 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -37,6 +39,7 @@
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.util.typedef.T3;
@@ -47,10 +50,13 @@
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.spi.systemview.view.ServiceView;
import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
+import static org.apache.ignite.internal.managers.systemview.ScanQuerySystemView.SCAN_QRY_SYS_VIEW;
import static org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest.queryProcessor;
import static org.apache.ignite.internal.processors.service.IgniteServiceProcessor.SVCS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.util.KillCommandsSQLTest.execute;
@@ -82,6 +88,12 @@
/** Latch to block compute task execution. */
private static CountDownLatch computeLatch;
+ /** Scan query filter latch. */
+ private static volatile CountDownLatch filterLatch;
+
+ /** Scan query cancel latch. */
+ private static volatile CountDownLatch cancelLatch;
+
/**
* Test cancel of the scan query.
*
@@ -89,7 +101,28 @@
* @param srvs Server nodes.
* @param qryCanceler Query cancel closure.
*/
- public static void doTestScanQueryCancel(IgniteEx cli, List<IgniteEx> srvs, Consumer<T3<UUID, String, Long>> qryCanceler) {
+ public static void doTestScanQueryCancel(
+ IgniteEx cli,
+ List<IgniteEx> srvs,
+ Consumer<T3<UUID, String, Long>> qryCanceler
+ ) throws Exception {
+ checkScanQueryCancelBeforeFetching(cli, srvs, qryCanceler);
+
+ checkScanQueryCancelDuringFetching(cli, srvs, qryCanceler);
+ }
+
+ /**
+ * Checks cancel of the scan query before fetching.
+ *
+ * @param cli Client node.
+ * @param srvs Server nodes.
+ * @param qryCanceler Query cancel closure.
+ */
+ public static void checkScanQueryCancelBeforeFetching(
+ IgniteEx cli,
+ List<IgniteEx> srvs,
+ Consumer<T3<UUID, String, Long>> qryCanceler
+ ) throws Exception {
IgniteCache<Object, Object> cache = cli.cache(DEFAULT_CACHE_NAME);
QueryCursor<Cache.Entry<Object, Object>> qry1 = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
@@ -98,14 +131,7 @@
// Fetch first entry and therefore caching first page.
assertNotNull(iter1.next());
- List<List<?>> scanQries0 = execute(srvs.get(0),
- "SELECT ORIGIN_NODE_ID, CACHE_NAME, QUERY_ID FROM SYS.SCAN_QUERIES");
-
- assertEquals(1, scanQries0.size());
-
- UUID originNodeId = (UUID)scanQries0.get(0).get(0);
- String cacheName = (String)scanQries0.get(0).get(1);
- long qryId = (Long)scanQries0.get(0).get(2);
+ T3<UUID, String, Long> qryInfo = scanQuery(srvs.get(0));
// Opens second query.
QueryCursor<Cache.Entry<Object, Object>> qry2 = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
@@ -115,7 +141,7 @@
assertNotNull(iter2.next());
// Cancel first query.
- qryCanceler.accept(new T3<>(originNodeId, cacheName, qryId));
+ qryCanceler.accept(qryInfo);
// Fetch all cached entries. It's size equal to the {@code PAGE_SZ * NODES_CNT}.
for (int i = 0; i < PAGE_SZ * srvs.size() - 1; i++)
@@ -128,6 +154,64 @@
for (int i = 0; i < PAGE_SZ * PAGE_SZ - 1; i++)
assertNotNull(iter2.next());
+ checkScanQueryResources(cli, srvs, qryInfo.get3());
+
+ qry2.close();
+ }
+
+ /**
+ * Checks cancel of the scan query during fetching.
+ *
+ * @param cli Client node.
+ * @param srvs Server nodes.
+ * @param qryCanceler Query cancel closure.
+ */
+ private static void checkScanQueryCancelDuringFetching(
+ IgniteEx cli,
+ List<IgniteEx> srvs,
+ Consumer<T3<UUID, String, Long>> qryCanceler
+ ) throws Exception {
+ filterLatch = new CountDownLatch(1);
+ cancelLatch = new CountDownLatch(1);
+
+ IgniteCache<Object, Object> cache = cli.cache(DEFAULT_CACHE_NAME);
+
+ QueryCursor<Cache.Entry<Object, Object>> qry = cache.query(new ScanQuery<>().setFilter((o, o2) -> {
+ try {
+ filterLatch.countDown();
+
+ cancelLatch.await(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+
+ return true;
+ }));
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync((Runnable)() -> qry.iterator().next());
+
+ assertTrue(filterLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
+
+ T3<UUID, String, Long> qryInfo = scanQuery(srvs.get(0));
+
+ qryCanceler.accept(qryInfo);
+
+ cancelLatch.countDown();
+
+ assertThrowsAnyCause(null, fut::get, NoSuchElementException.class, "Iterator has been closed.");
+
+ checkScanQueryResources(cli, srvs, qryInfo.get3());
+ }
+
+ /**
+ * Checks scan query resources.
+ *
+ * @param cli Client node.
+ * @param srvs Server nodes.
+ * @param qryId Query ID to check.
+ */
+ private static void checkScanQueryResources(IgniteEx cli, List<IgniteEx> srvs, long qryId) {
// Checking all server node objects cleared after cancel.
for (int i = 0; i < srvs.size(); i++) {
IgniteEx ignite = srvs.get(i);
@@ -152,6 +236,23 @@
}
/**
+ * Gets scan query info.
+ *
+ * @param node Node to get query info.
+ * @return Tuple of scan query info.
+ */
+ private static T3<UUID, String, Long> scanQuery(IgniteEx node) throws IgniteCheckedException {
+ assertTrue(waitForCondition(() -> node.context().systemView().view(SCAN_QRY_SYS_VIEW).size() > 0, TIMEOUT));
+
+ List<List<?>> qry = execute(node,
+ "SELECT ORIGIN_NODE_ID, CACHE_NAME, QUERY_ID FROM SYS.SCAN_QUERIES");
+
+ assertEquals(1, qry.size());
+
+ return new T3<>((UUID)qry.get(0).get(0), (String)qry.get(0).get(1), (Long)qry.get(0).get(2));
+ }
+
+ /**
* Test cancel of the compute task.
*
* @param cli Client node that starts tasks.