blob: 7e5450db24e622ec0e5bbbbf4ec74551e3377766 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.util;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.event.CacheEntryEvent;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.spi.systemview.view.ServiceView;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import static org.apache.ignite.internal.managers.systemview.ScanQuerySystemView.SCAN_QRY_SYS_VIEW;
import static org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest.queryProcessor;
import static org.apache.ignite.internal.processors.service.IgniteServiceProcessor.SVCS_VIEW;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.util.KillCommandsSQLTest.execute;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* General tests for the cancel command.
*/
class KillCommandsTests {
/** Service name. */
public static final String SVC_NAME = "my-svc";
/** Cache name. */
public static final String DEFAULT_CACHE_NAME = "default";
/** Page size. */
public static final int PAGE_SZ = 5;
/** Number of pages to insert. */
public static final int PAGES_CNT = 1000;
/** Operations timeout. */
public static final int TIMEOUT = 10_000;
/** Latch to block compute task execution. */
private static CountDownLatch computeLatch;
/** Scan query filter latch. */
private static volatile CountDownLatch filterLatch;
/** Scan query cancel latch. */
private static volatile CountDownLatch cancelLatch;
/**
* Test cancel of the scan query.
*
* @param cli Client node.
* @param srvs Server nodes.
* @param qryCanceler Query cancel closure.
*/
public static void doTestScanQueryCancel(
IgniteEx cli,
List<IgniteEx> srvs,
Consumer<T3<UUID, String, Long>> qryCanceler
) throws Exception {
checkScanQueryCancelBeforeFetching(cli, srvs, qryCanceler);
checkScanQueryCancelDuringFetching(cli, srvs, qryCanceler);
}
/**
* Checks cancel of the scan query before fetching.
*
* @param cli Client node.
* @param srvs Server nodes.
* @param qryCanceler Query cancel closure.
*/
public static void checkScanQueryCancelBeforeFetching(
IgniteEx cli,
List<IgniteEx> srvs,
Consumer<T3<UUID, String, Long>> qryCanceler
) throws Exception {
IgniteCache<Object, Object> cache = cli.cache(DEFAULT_CACHE_NAME);
QueryCursor<Cache.Entry<Object, Object>> qry1 = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
Iterator<Cache.Entry<Object, Object>> iter1 = qry1.iterator();
// Fetch first entry and therefore caching first page.
assertNotNull(iter1.next());
T3<UUID, String, Long> qryInfo = scanQuery(srvs.get(0));
// Opens second query.
QueryCursor<Cache.Entry<Object, Object>> qry2 = cache.query(new ScanQuery<>().setPageSize(PAGE_SZ));
Iterator<Cache.Entry<Object, Object>> iter2 = qry2.iterator();
// Fetch first entry and therefore caching first page.
assertNotNull(iter2.next());
// Cancel first query.
qryCanceler.accept(qryInfo);
// Fetch all cached entries. It's size equal to the {@code PAGE_SZ * NODES_CNT}.
for (int i = 0; i < PAGE_SZ * srvs.size() - 1; i++)
assertNotNull(iter1.next());
// Fetch of the next page should throw the exception.
assertThrowsWithCause(iter1::next, IgniteCheckedException.class);
// Checking that second query works fine after canceling first.
for (int i = 0; i < PAGE_SZ * PAGE_SZ - 1; i++)
assertNotNull(iter2.next());
checkScanQueryResources(cli, srvs, qryInfo.get3());
qry2.close();
}
/**
* Checks cancel of the scan query during fetching.
*
* @param cli Client node.
* @param srvs Server nodes.
* @param qryCanceler Query cancel closure.
*/
private static void checkScanQueryCancelDuringFetching(
IgniteEx cli,
List<IgniteEx> srvs,
Consumer<T3<UUID, String, Long>> qryCanceler
) throws Exception {
filterLatch = new CountDownLatch(1);
cancelLatch = new CountDownLatch(1);
IgniteCache<Object, Object> cache = cli.cache(DEFAULT_CACHE_NAME);
QueryCursor<Cache.Entry<Object, Object>> qry = cache.query(new ScanQuery<>().setFilter((o, o2) -> {
try {
filterLatch.countDown();
cancelLatch.await(TIMEOUT, TimeUnit.MILLISECONDS);
}
catch (Exception ignored) {
// No-op.
}
return true;
}));
IgniteInternalFuture<?> fut = GridTestUtils.runAsync((Runnable)() -> qry.iterator().next());
assertTrue(filterLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
T3<UUID, String, Long> qryInfo = scanQuery(srvs.get(0));
qryCanceler.accept(qryInfo);
cancelLatch.countDown();
assertThrowsAnyCause(null, fut::get, NoSuchElementException.class, "Iterator has been closed.");
checkScanQueryResources(cli, srvs, qryInfo.get3());
}
/**
* Checks scan query resources.
*
* @param cli Client node.
* @param srvs Server nodes.
* @param qryId Query ID to check.
*/
private static void checkScanQueryResources(IgniteEx cli, List<IgniteEx> srvs, long qryId) {
// Checking all server node objects cleared after cancel.
for (int i = 0; i < srvs.size(); i++) {
IgniteEx ignite = srvs.get(i);
int cacheId = CU.cacheId(DEFAULT_CACHE_NAME);
GridCacheContext<?, ?> ctx = ignite.context().cache().context().cacheContext(cacheId);
ConcurrentMap<UUID, ? extends GridCacheQueryManager<?, ?>.RequestFutureMap> qryIters =
ctx.queries().queryIterators();
assertTrue(qryIters.size() <= 1);
if (qryIters.isEmpty())
return;
GridCacheQueryManager<?, ?>.RequestFutureMap futs = qryIters.get(cli.localNode().id());
assertNotNull(futs);
assertFalse(futs.containsKey(qryId));
}
}
/**
* Gets scan query info.
*
* @param node Node to get query info.
* @return Tuple of scan query info.
*/
private static T3<UUID, String, Long> scanQuery(IgniteEx node) throws IgniteCheckedException {
assertTrue(waitForCondition(() -> node.context().systemView().view(SCAN_QRY_SYS_VIEW).size() > 0, TIMEOUT));
List<List<?>> qry = execute(node,
"SELECT ORIGIN_NODE_ID, CACHE_NAME, QUERY_ID FROM SYS.SCAN_QUERIES");
assertEquals(1, qry.size());
return new T3<>((UUID)qry.get(0).get(0), (String)qry.get(0).get(1), (Long)qry.get(0).get(2));
}
/**
* Test cancel of the compute task.
*
* @param cli Client node that starts tasks.
* @param srvs Server nodes.
* @param qryCanceler Query cancel closure.
*/
public static void doTestCancelComputeTask(IgniteEx cli, List<IgniteEx> srvs, Consumer<String> qryCanceler)
throws Exception {
computeLatch = new CountDownLatch(1);
IgniteFuture<Collection<Integer>> fut = cli.compute().broadcastAsync(() -> {
computeLatch.await();
return 1;
});
try {
String[] id = new String[1];
boolean res = waitForCondition(() -> {
for (IgniteEx srv : srvs) {
List<List<?>> tasks = execute(srv, "SELECT SESSION_ID FROM SYS.JOBS");
if (tasks.size() == 1)
id[0] = (String)tasks.get(0).get(0);
else
return false;
}
return true;
}, TIMEOUT);
assertTrue(res);
qryCanceler.accept(id[0]);
for (IgniteEx srv : srvs) {
res = waitForCondition(() -> {
List<List<?>> tasks = execute(srv, "SELECT SESSION_ID FROM SYS.JOBS");
return tasks.isEmpty();
}, TIMEOUT);
assertTrue(srv.configuration().getIgniteInstanceName(), res);
}
assertThrowsWithCause(() -> fut.get(TIMEOUT), IgniteException.class);
} finally {
computeLatch.countDown();
}
}
/**
* Test cancel of the transaction.
*
* @param cli Client node.
* @param srvs Server nodes.
* @param txCanceler Transaction cancel closure.
*/
public static void doTestCancelTx(IgniteEx cli, List<IgniteEx> srvs, Consumer<String> txCanceler) {
IgniteCache<Object, Object> cache = cli.cache(DEFAULT_CACHE_NAME);
// See e.g. KillCommandsMxBeanTest
int testKey = (PAGES_CNT * PAGE_SZ) + 42;
try (Transaction tx = cli.transactions().txStart()) {
cache.put(testKey, 1);
List<List<?>> txs = execute(cli, "SELECT xid FROM SYS.TRANSACTIONS");
assertEquals(1, txs.size());
String xid = (String)txs.get(0).get(0);
txCanceler.accept(xid);
assertThrowsWithCause(tx::commit, IgniteException.class);
for (int i = 0; i < srvs.size(); i++) {
txs = execute(srvs.get(i), "SELECT xid FROM SYS.TRANSACTIONS");
assertEquals(0, txs.size());
}
}
assertNull(cache.get(testKey));
}
/**
* Test cancel of the service.
*
* @param startCli Client node to start service.
* @param killCli Client node to kill service.
* @param srv Server node.
* @param svcCanceler Service cancel closure.
*/
public static void doTestCancelService(IgniteEx startCli, IgniteEx killCli, IgniteEx srv,
Consumer<String> svcCanceler) throws Exception {
ServiceConfiguration scfg = new ServiceConfiguration();
scfg.setName(SVC_NAME);
scfg.setMaxPerNodeCount(1);
scfg.setNodeFilter(srv.cluster().predicate());
scfg.setService(new TestServiceImpl());
startCli.services().deploy(scfg);
SystemView<ServiceView> svcView = srv.context().systemView().view(SVCS_VIEW);
SystemView<ServiceView> killCliSvcView = killCli.context().systemView().view(SVCS_VIEW);
boolean res = waitForCondition(() -> svcView.size() == 1 && killCliSvcView.size() == 1, TIMEOUT);
assertTrue(res);
TestService svc = startCli.services().serviceProxy(SVC_NAME, TestService.class, true);
assertNotNull(svc);
svcCanceler.accept(SVC_NAME);
res = waitForCondition(() -> svcView.size() == 0, TIMEOUT);
assertTrue(res);
}
/**
* Test cancel of the SQL query.
*
* @param cli Client node.
* @param qryCanceler Query cancel closure.
*/
public static void doTestCancelSQLQuery(IgniteEx cli, Consumer<String> qryCanceler) {
String qryStr = "SELECT * FROM \"default\".Integer";
SqlFieldsQuery qry = new SqlFieldsQuery(qryStr).setPageSize(PAGE_SZ);
Iterator<List<?>> iter = queryProcessor(cli).querySqlFields(qry, true).iterator();
assertNotNull(iter.next());
List<List<?>> sqlQries = execute(cli, "SELECT * FROM SYS.SQL_QUERIES ORDER BY START_TIME");
assertEquals(2, sqlQries.size());
String qryId = (String)sqlQries.get(0).get(0);
assertEquals(qryStr, sqlQries.get(0).get(1));
qryCanceler.accept(qryId);
for (int i = 0; i < PAGE_SZ - 2; i++)
assertNotNull(iter.next());
assertThrowsWithCause(iter::next, CacheException.class);
}
/**
* Test cancel of the continuous query.
*
* @param cli Client node.
* @param srvs Server nodes.
* @param qryCanceler Query cancel closure.
*/
public static void doTestCancelContinuousQuery(IgniteEx cli, List<IgniteEx> srvs,
BiConsumer<UUID, UUID> qryCanceler) throws Exception {
IgniteCache<Object, Object> cache = cli.cache(DEFAULT_CACHE_NAME);
ContinuousQuery<Integer, Integer> cq = new ContinuousQuery<>();
AtomicInteger cntr = new AtomicInteger();
cq.setInitialQuery(new ScanQuery<>());
cq.setTimeInterval(1_000L);
cq.setPageSize(PAGE_SZ);
cq.setLocalListener(events -> {
for (CacheEntryEvent<? extends Integer, ? extends Integer> e : events) {
assertNotNull(e);
cntr.incrementAndGet();
}
});
cache.query(cq);
for (int i = 0; i < PAGE_SZ * PAGE_SZ; i++)
cache.put(i, i);
boolean res = waitForCondition(() -> cntr.get() == PAGE_SZ * PAGE_SZ, TIMEOUT);
assertTrue(res);
List<List<?>> cqQries = execute(cli,
"SELECT NODE_ID, ROUTINE_ID FROM SYS.CONTINUOUS_QUERIES");
assertEquals(1, cqQries.size());
UUID nodeId = (UUID)cqQries.get(0).get(0);
UUID routineId = (UUID)cqQries.get(0).get(1);
qryCanceler.accept(nodeId, routineId);
long cnt = cntr.get();
for (int i = 0; i < PAGE_SZ * PAGE_SZ; i++)
cache.put(i, i);
res = waitForCondition(() -> cntr.get() > cnt, TIMEOUT);
assertFalse(res);
for (int i = 0; i < srvs.size(); i++) {
IgniteEx srv = srvs.get(i);
res = waitForCondition(() -> execute(srv,
"SELECT ROUTINE_ID FROM SYS.CONTINUOUS_QUERIES").isEmpty(), TIMEOUT);
assertTrue(srv.configuration().getIgniteInstanceName(), res);
}
}
/** */
public interface TestService extends Service {
/** */
public void doTheJob();
}
/** */
public static class TestServiceImpl implements TestService {
/** {@inheritDoc} */
@Override public void cancel(ServiceContext ctx) {
// No-op.
}
/** {@inheritDoc} */
@Override public void init(ServiceContext ctx) {
// No-op.
}
/** {@inheritDoc} */
@Override public void execute(ServiceContext ctx) {
// No-op.
}
/** {@inheritDoc} */
@Override public void doTheJob() {
// No-op.
}
}
}