/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.ignite.internal.processors.query;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservationManager;
import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapResult;
import org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapper;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static java.util.Arrays.stream;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
/**
* Test KILL QUERY requested from the same node where the query was executed.
*/
@SuppressWarnings({"ThrowableNotThrown", "AssertWithSideEffects"})
@RunWith(Parameterized.class)
// We need to set this threshold bigger than partitions count to force partition pruning for the BETWEEN case.
// see org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor.tryExtractBetween
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN, value = "21")
public class KillQueryTest extends GridCommonAbstractTest {
/** Generates values for the {@link #asyncCancel} parameter. */
@Parameterized.Parameters(name = "asyncCancel = {0}")
public static Iterable<Object[]> valuesForAsync() {
return Arrays.asList(new Object[][] {
{true},
{false}
});
}
/** Whether current test execution should use the async or non-async cancel mechanism. */
@Parameterized.Parameter
public boolean asyncCancel;
/** IP finder. */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** A CSV file with 20,000 records. */
private static final String BULKLOAD_20_000_LINE_CSV_FILE =
Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload20_000.csv")).
getAbsolutePath();
/** Max table rows. */
private static final int MAX_ROWS = 10000;
/** Cancellation processing timeout. */
public static final int TIMEOUT = 5000;
/** Nodes count. */
protected static final byte NODES_COUNT = 3;
/** Timeout for checking async result. */
public static final int CHECK_RESULT_TIMEOUT = 1_000;
/** Number of partitions in the test cache. Keep it small to have enough rows in each partition. */
public static final int PARTS_CNT = 20;
/** Connection. */
private Connection conn;
/** Statement. */
private Statement stmt;
/** Ignite. */
protected IgniteEx ignite;
/** Ignite instance for kill request. */
private IgniteEx igniteForKillRequest;
/** Node configuration counter. */
private static int cntr;
/** Table count. */
private static AtomicInteger tblCnt = new AtomicInteger();
/** Barrier. Needed to test unsupported cancellation. Doesn't block threads (parties=1) by default. */
private static volatile CyclicBarrier barrier = new CyclicBarrier(1);
/** Allows blocking messages issued FROM the client node. */
private static TestRecordingCommunicationSpi clientBlocker;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
CacheConfiguration<?, ?> cache = GridAbstractTest.defaultCacheConfiguration();
cache.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
cache.setCacheMode(PARTITIONED);
cache.setBackups(1);
cache.setWriteSynchronizationMode(FULL_SYNC);
cache.setSqlFunctionClasses(TestSQLFunctions.class);
cache.setIndexedTypes(Integer.class, Integer.class, Long.class, Long.class, String.class, Person.class);
cfg.setCacheConfiguration(cache);
TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
cfg.setCommunicationSpi(commSpi);
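// Make the last started node a client and remember its communication SPI, so tests can block messages sent from it.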
if (++cntr == NODES_COUNT) {
cfg.setClientMode(true);
clientBlocker = commSpi;
}
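// The custom discovery SPI below delays cache-change and schema-change custom messages on the test barrier,
// so DDL execution can be suspended at a known point.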
cfg.setDiscoverySpi(new TcpDiscoverySpi() {
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
if (msg instanceof CustomMessageWrapper) {
DiscoveryCustomMessage delegate = ((CustomMessageWrapper)msg).delegate();
if (delegate instanceof DynamicCacheChangeBatch) {
try {
awaitTimeout();
}
catch (Exception e) {
log.error(e.getMessage(), e);
}
}
else if (delegate instanceof SchemaProposeDiscoveryMessage) {
try {
awaitTimeout();
}
catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
super.sendCustomEvent(msg);
}
}.setIpFinder(IP_FINDER));
return cfg;
}
/**
* Creates and populates a new cache that is used in the distributed join scenario: we have a table of Persons with
* an autogenerated PK. The join filter is based on the Person.id column, which is not collocated with the PK. The
* result size of such a join (with an equality condition) is {@link #MAX_ROWS} rows.
*
* @param cacheName name of the created cache.
* @param shift integer to avoid collocation; put a different value for different caches.
*/
private void createJoinCache(String cacheName, int shift) {
CacheConfiguration<Long, Person> ccfg = GridAbstractTest.defaultCacheConfiguration();
ccfg.setName(cacheName);
ccfg.setCacheMode(PARTITIONED);
ccfg.setBackups(1);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setSqlFunctionClasses(TestSQLFunctions.class);
ccfg.setQueryEntities(Collections.singleton(
new QueryEntity(Integer.class.getName(), Person.class.getName())
.setTableName("PERSON")
.setKeyFieldName("rec_id") // PK
.addQueryField("rec_id", Integer.class.getName(), null)
.addQueryField("id", Integer.class.getName(), null)
.addQueryField("lastName", String.class.getName(), null)
.setIndexes(Collections.singleton(new QueryIndex("id", true, "idx_" + cacheName)))
));
grid(0).createCache(ccfg);
try (IgniteDataStreamer<Object, Object> ds = grid(0).dataStreamer(cacheName)) {
for (int recordId = 0; recordId < MAX_ROWS; recordId++) {
// If two caches have the same PK, the FK fields ("id") will be different.
int intTabIdFK = (recordId + shift) % MAX_ROWS;
ds.addData(recordId,
new Person(intTabIdFK,
"Name_" + recordId,
"LastName_" + recordId,
42));
}
}
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
cntr = 0;
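// Substitute the indexing implementation with the mocked one before the grids start.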
GridQueryProcessor.idxCls = MockedIndexing.class;
startGrids(NODES_COUNT);
// Let's set the baseline topology manually. Doing so, we are sure that partitions are distributed between our 2
// server nodes, not owned by only one node.
awaitPartitionMapExchange(true, true, null);
long curTop = grid(0).cluster().topologyVersion();
grid(0).cluster().baselineAutoAdjustEnabled(false);
grid(0).cluster().setBaselineTopology(curTop);
awaitPartitionMapExchange(true, true, null);
// Populate data.
try (IgniteDataStreamer<Object, Object> ds = grid(0).dataStreamer(GridAbstractTest.DEFAULT_CACHE_NAME)) {
for (int i = 0; i < MAX_ROWS; ++i) {
ds.addData(i, i);
ds.addData((long)i, (long)i);
}
}
createJoinCache("PERS1", 1);
createJoinCache("PERS2", 2);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
GridQueryProcessor.idxCls = null;
super.afterTestsStopped();
}
/**
* @return Ignite node which will be used to execute kill query.
*/
protected IgniteEx getKillRequestNode() {
return grid(0);
}
/**
* Called before execution of every test method in class.
*
* @throws Exception If failed.
*/
@Before
public void before() throws Exception {
TestSQLFunctions.reset();
newBarrier(1);
tblCnt.incrementAndGet();
conn = GridTestUtils.connect(grid(0), null);
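// Quote the schema name: the cache name is case-sensitive.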
conn.setSchema('"' + GridAbstractTest.DEFAULT_CACHE_NAME + '"');
stmt = conn.createStatement();
ignite = grid(0);
igniteForKillRequest = getKillRequestNode();
MockedIndexing.resetToDefault();
}
/**
* Called after execution of every test method in class.
*
* @throws Exception If failed.
*/
@After
public void after() throws Exception {
MockedIndexing.resetToDefault();
clientBlocker.stopBlock(false);
if (stmt != null && !stmt.isClosed()) {
stmt.close();
assert stmt.isClosed();
}
conn.close();
assertTrue(ignite.context().query().runningQueries(-1).isEmpty());
}
/**
* Tries to cancel a COPY FROM command, then checks that such cancellation is unsupported.
*
* 1) Run the COPY query and get it suspended in the middle. 2) Try to cancel it and get the expected exception.
* 3) Wake up the COPY. 4) Check that the COPY is done.
*/
@Test
public void testBulkLoadCancellationUnsupported() throws Exception {
String path = Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload1.csv"))
.getAbsolutePath();
String createTab = "CREATE TABLE " + currentTestTableName() +
"(id integer primary key, age integer, firstName varchar, lastname varchar)";
String copy = "COPY FROM '" + path + "'" +
" INTO " + currentTestTableName() +
" (id, age, firstName, lastName)" +
" format csv charset 'ascii'";
// It's important to COPY from the client node: in this case the data streamer doesn't perform local updates, so
// it sends communication messages which we can hold.
IgniteEx clientNode = grid(NODES_COUNT - 1);
try (Connection clConn = GridTestUtils.connect(clientNode, null);
final Statement client = clConn.createStatement()) {
client.execute(createTab);
// Suspend further copy query by holding data streamer messages.
clientBlocker.blockMessages((dstNode, msg) -> msg instanceof DataStreamerRequest);
IgniteInternalFuture<Boolean> copyIsDone = GridTestUtils.runAsync(() -> client.execute(copy));
// Wait for at least one streamer message, which means the copy has started.
clientBlocker.waitForBlocked(1, TIMEOUT);
// Query can be found only on the connected node.
String globQryId = findOneRunningQuery(copy, clientNode);
GridTestUtils.assertThrowsAnyCause(log,
() -> igniteForKillRequest.cache(DEFAULT_CACHE_NAME).query(createKillQuery(globQryId, asyncCancel)),
CacheException.class,
"Query doesn't support cancellation");
// Releases copy.
clientBlocker.stopBlock(true);
copyIsDone.get(TIMEOUT);
int tabSize = clientNode.cache(DEFAULT_CACHE_NAME)
.query(new SqlFieldsQuery("SELECT * FROM " + currentTestTableName() + " ").setSchema("PUBLIC"))
.getAll()
.size();
assertEquals("COPY command inserted incorrect number of rows.", 1, tabSize);
}
}
/**
* Finds the global id of the specified query on the specified node. Exactly one result is expected.
*
* @param query Query text to find id for.
* @param node Handle to the node which initiated the query.
*/
private String findOneRunningQuery(String query, IgniteEx node) {
List<GridRunningQueryInfo> qryList = findQueriesOnNode(query, node);
assertEquals("Expected only one running query: " + query + "\nBut found: " + qryList, 1, qryList.size());
return qryList.get(0).globalQueryId();
}
/**
* Finds queries that have the specified text and were initially run on the specified node.
*
* @param query text of the query to find.
* @param node server node that runs the reduce query.
*/
private List<GridRunningQueryInfo> findQueriesOnNode(String query, IgniteEx node) {
List<GridRunningQueryInfo> allQrs = (List<GridRunningQueryInfo>)node.context().query().runningQueries(-1);
return allQrs.stream()
.filter(q -> q.query().equals(query))
.collect(Collectors.toList());
}
/**
* Trying to cancel CREATE TABLE command.
*/
@Test
public void testCreateTableCancellationUnsupported() throws Exception {
checkCancellationUnsupported(Collections.<String>emptyList(),
"CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)",
asyncCancel);
}
/**
* Trying to cancel ALTER TABLE command.
*/
@Test
public void testAlterTableCancellationUnsupported() throws Exception {
checkCancellationUnsupported(Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)"),
"ALTER TABLE " + currentTestTableName() + " ADD COLUMN COL VARCHAR",
asyncCancel);
}
/**
* Trying to cancel CREATE INDEX command.
*/
@Test
public void testCreateIndexCancellationUnsupported() throws Exception {
checkCancellationUnsupported(Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)"),
"CREATE INDEX " + currentTestTableName() + "_IDX ON " + currentTestTableName() + "(name, id)",
asyncCancel);
}
/**
* Trying to cancel DROP INDEX command.
*/
@Test
public void testDropIndexCancellationUnsupported() throws Exception {
checkCancellationUnsupported(
Arrays.asList("CREATE TABLE " + currentTestTableName() + " (id INTEGER PRIMARY KEY, name VARCHAR)",
"CREATE INDEX " + currentTestTableName() + "_IDX ON " + currentTestTableName() + "(name, id)"),
"DROP INDEX " + currentTestTableName() + "_IDX",
asyncCancel);
}
/**
* Gets a test table name that is unique per test, but stable within one test run.
*
* @return Generated test table name, unique per test but stable within one test run.
*/
private String currentTestTableName() {
return "TST_TABLE_" + tblCnt.get();
}
/**
* Checks that execution of {@code sqlCmd} can't be cancelled because cancellation is not supported for it.
*
* @param prepareSteps Preparation SQLs before start test.
* @param sqlCmd Command which can't be cancelled.
* @param async Execute cancellation in ASYNC mode.
* @throws Exception In case of failure.
*/
private void checkCancellationUnsupported(List<String> prepareSteps, String sqlCmd,
boolean async) throws Exception {
for (String sql : prepareSteps) {
try {
stmt.execute(sql);
}
catch (SQLException e) {
throw new IgniteException(e);
}
}
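// The DDL statement and the checker in cancelQueryWithBarrier() rendezvous on this 2-party barrier
// (the discovery SPI in getConfiguration() awaits it on DDL custom messages).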
newBarrier(2);
IgniteInternalFuture cancelRes = cancelQueryWithBarrier(sqlCmd,
"Query doesn't support cancellation",
async);
stmt.execute(sqlCmd);
cancelRes.get(TIMEOUT);
}
/**
* Trying to cancel non-existing query.
*/
@Test
public void testKillUnknownQry() {
UUID nodeId = ignite.localNode().id();
GridTestUtils.assertThrows(log, () -> {
igniteForKillRequest.cache(DEFAULT_CACHE_NAME)
.query(createKillQuery(nodeId, Long.MAX_VALUE, asyncCancel));
return null;
}, CacheException.class, "Query with provided ID doesn't exist [nodeId=" + nodeId);
}
/**
* Trying to cancel query on unknown node.
*/
@Test
public void testKillQryUnknownNode() {
GridTestUtils.assertThrows(log, () -> {
igniteForKillRequest.cache(DEFAULT_CACHE_NAME)
.query(createKillQuery(UUID.randomUUID(), Long.MAX_VALUE, asyncCancel));
return null;
}, CacheException.class, "Failed to cancel query, node is not alive");
}
/**
* Trying to kill already killed query. No exceptions expected.
*/
@Test
public void testKillAlreadyKilledQuery() throws Exception {
IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
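// The query below suspends itself in awaitLatchCancelled() until reqLatch is released by the cancel routine.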
FieldsQueryCursor<List<?>> cur = cache.query(new SqlFieldsQuery("select * from Integer where awaitLatchCancelled() = 0"));
List<GridRunningQueryInfo> runningQueries = (List<GridRunningQueryInfo>)ignite.context().query().runningQueries(-1);
GridRunningQueryInfo runQryInfo = runningQueries.get(0);
SqlFieldsQuery killQry = createKillQuery(runQryInfo.globalQueryId(), asyncCancel);
IgniteCache<Object, Object> reqCache = igniteForKillRequest.cache(DEFAULT_CACHE_NAME);
IgniteInternalFuture killFut = cancel(1, asyncCancel);
GridTestUtils.assertThrows(log,
() -> cur.iterator().next(),
QueryCancelledException.class,
"The query was cancelled while executing");
killFut.get(CHECK_RESULT_TIMEOUT);
GridTestUtils.assertThrows(log,
() -> reqCache.query(killQry),
CacheException.class,
"Query with provided ID doesn't exist");
cur.close();
}
/**
* @param nodeId Node id.
* @param qryId Node query id.
*/
private SqlFieldsQuery createKillQuery(UUID nodeId, long qryId, boolean async) {
return createKillQuery(nodeId + "_" + qryId, async);
}
/**
* @param globalQryId Global query id.
*/
private SqlFieldsQuery createKillQuery(String globalQryId, boolean async) {
return new SqlFieldsQuery("KILL QUERY" + (async ? " ASYNC" : "") + " '" + globalQryId + "'");
}
/**
* Trying to cancel long running query. No exceptions expected.
*/
@Test
public void testCancelQuery() throws Exception {
IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
GridTestUtils.assertThrows(log, () -> {
stmt.executeQuery("select * from Integer where _key in " +
"(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()");
return null;
}, SQLException.class, "The query was cancelled while executing.");
// Ensures that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
}
/**
* Trying to cancel a long-running query when partition pruning does its job. It's important to set {@link
* IgniteSystemProperties#IGNITE_SQL_MAX_EXTRACTED_PARTS_FROM_BETWEEN} bigger than the partition count {@link
* #PARTS_CNT}.
*/
@Test
public void testCancelQueryPartitionPruning() throws Exception {
IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
final int ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL = 400;
GridTestUtils.assertThrows(log, () -> {
stmt.executeQuery("select * from Integer where _key between 1000 and 2000 " +
"and awaitLatchCancelled() = 0 " +
"and shouldNotBeCalledMoreThan(" + ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL + ")");
return null;
}, SQLException.class, "The query was cancelled while executing.");
// Ensures that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
}
/**
* Checks that a local query can be cancelled using either the async or non-async method. The local query is
* performed using the cache.query() API with the "local" property set to "true".
*/
@Test
public void testCancelLocalQueryNative() throws Exception {
IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
GridTestUtils.assertThrows(log, () -> {
ignite.cache(DEFAULT_CACHE_NAME).query(
new SqlFieldsQuery("select * from Integer where _key in " +
"(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()")
.setLocal(true)
).getAll();
return null;
}, IgniteException.class, "The query was cancelled while executing.");
// Ensures that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
}
/**
* Checks that a distributed query can be cancelled.
*/
@Test
public void testCancelDistributeJoin() throws Exception {
IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
final int ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL = MAX_ROWS - 1;
GridTestUtils.assertThrows(log, () -> {
ignite.cache(DEFAULT_CACHE_NAME).query(
new SqlFieldsQuery("SELECT p1.rec_id, p1.id, p2.rec_id " +
"FROM PERS1.Person p1 JOIN PERS2.Person p2 " +
"ON p1.id = p2.id " +
"AND shouldNotBeCalledMoreThan(" + ROWS_ALLOWED_TO_PROCESS_AFTER_CANCEL + ")" +
"AND awaitLatchCancelled() = 0")
.setDistributedJoins(true)
).getAll();
return null;
}, CacheException.class, "The query was cancelled while executing.");
// Ensures that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
}
/**
* Trying to asynchronously cancel a long-running multiple-statements query. No exceptions expected.
*/
@Test
public void testKillMultipleStatementsQuery() throws Exception {
try (Statement anotherStatement = conn.createStatement()) {
anotherStatement.setFetchSize(1);
String sql = "select * from Integer";
ResultSet rs = anotherStatement.executeQuery(sql);
assert rs.next();
IgniteInternalFuture cancelRes = cancel(3, asyncCancel, sql);
GridTestUtils.assertThrows(log, () -> {
// Execute a long-running multiple-statements query.
stmt.execute(
"select 100 from Integer;"
+ "select _key from Integer where awaitLatchCancelled() = 0;");
return null;
}, SQLException.class, "The query was cancelled while executing");
assert rs.next() : "The other cursor mustn't be closed";
// Ensures that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
}
}
/**
* Trying to cancel a long-running batch query. No exceptions expected.
*
* @throws Exception If failed.
*/
@Test
public void testCancelBatchQuery() throws Exception {
try (Statement stmt2 = conn.createStatement()) {
stmt2.setFetchSize(1);
String sql = "SELECT * from Integer";
ResultSet rs = stmt2.executeQuery(sql);
Assert.assertTrue(rs.next());
IgniteInternalFuture cancelRes = cancel(2, asyncCancel, sql);
GridTestUtils.assertThrows(log, () -> {
stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30)");
stmt.addBatch("update Long set _val = _val + 1 where awaitLatchCancelled() = 0");
stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30)");
stmt.addBatch("update Long set _val = _val + 1 where shouldNotBeCalledInCaseOfCancellation()");
stmt.executeBatch();
return null;
}, SQLException.class, "The query was cancelled while executing");
Assert.assertTrue("The other cursor mustn't be closed", rs.next());
// Ensures that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
}
}
/**
* Checks that if a query hangs (because the reducer spins retrying to reserve partitions, but they can't be
* reserved), we are still able to cancel it. The mocked indexing used here simulates a 100% inability to reserve.
*/
@Test
public void testCancelQueryIfPartitionsCantBeReservedOnMapNodes() throws Exception {
// Releases the thread that kills the query when map nodes receive the query request. At least one map node receiving it is ok.
GridMessageListener qryStarted = (node, msg, plc) -> {
if (msg instanceof GridH2QueryRequest)
TestSQLFunctions.cancelLatch.countDown();
};
for (int i = 0; i < NODES_COUNT; i++)
grid(i).context().io().addMessageListener(GridTopic.TOPIC_QUERY, qryStarted);
// Suspends distributed queries on the map nodes.
MockedIndexing.failReservations = true;
try {
IgniteInternalFuture cancelFut = cancel(1, asyncCancel);
GridTestUtils.assertThrows(log, () -> {
ignite.cache(DEFAULT_CACHE_NAME).query(
new SqlFieldsQuery("select * from Integer where _val <> 42")
).getAll();
return null;
}, CacheException.class, "The query was cancelled while executing.");
cancelFut.get(CHECK_RESULT_TIMEOUT);
}
finally {
for (int i = 0; i < NODES_COUNT; i++)
grid(i).context().io().removeMessageListener(GridTopic.TOPIC_QUERY, qryStarted);
}
}
/**
* Checks that if a query hangs because the reducer cannot get nodes for partitions, it can still be killed.
*/
@Test
public void testCancelQueryIfUnableToGetNodesForPartitions() throws Exception {
// Force query to spin retrying to get nodes for partitions.
MockedIndexing.retryNodePartMapping = true;
String select = "select * from Integer where _val <> 42";
IgniteInternalFuture runQueryFut = GridTestUtils.runAsync(() ->
ignite.cache(DEFAULT_CACHE_NAME).query(
new SqlFieldsQuery(select)
).getAll());
boolean gotOneFrozenSelect = GridTestUtils.waitForCondition(
() -> findQueriesOnNode(select, ignite).size() == 1, TIMEOUT);
if (!gotOneFrozenSelect) {
if (runQueryFut.isDone())
printFuturesException("Got exception getting running the query.", runQueryFut);
Assert.fail("Failed to wait for query to be in running queries list exactly one time " +
"[select=" + select + ", node=" + ignite.localNode().id() + ", timeout=" + TIMEOUT + "ms].");
}
SqlFieldsQuery killQry = createKillQuery(findOneRunningQuery(select, ignite), asyncCancel);
ignite.cache(DEFAULT_CACHE_NAME).query(killQry);
GridTestUtils.assertThrowsAnyCause(
log,
() -> runQueryFut.get(CHECK_RESULT_TIMEOUT),
CacheException.class,
"The query was cancelled while executing.");
}
/**
* Prints to the log an exception that has been caught on another thread and put into the specified future.
*
* @param msg message to log with the exception.
* @param fut future containing the exception.
*/
private void printFuturesException(String msg, IgniteInternalFuture fut) {
try {
fut.get(TIMEOUT);
}
catch (Exception e) {
log.error(msg, e);
}
}
/**
* Tests that if the user specified partitions for a query explicitly, such a query is cancellable.
*
* We check 3 scenarios in which the partitions belong to: 1) only the first node <br/> 2) only the second node <br/>
* 3) some to the first, the others to the second <br/>
*/
@Test
public void testCancelQueryWithPartitions() throws Exception {
Affinity<Object> aff = ignite.affinity(DEFAULT_CACHE_NAME);
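// Each of the 2 server nodes owns roughly PARTS_CNT / 2 primary partitions, so take half of a node's partitions.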
int halfOfNodeParts = PARTS_CNT / 4;
int[] firstParts = stream(aff.primaryPartitions(grid(0).localNode())).limit(halfOfNodeParts).toArray();
int[] secondParts = stream(aff.primaryPartitions(grid(1).localNode())).limit(halfOfNodeParts).toArray();
int[] mixedParts = IntStream.concat(
stream(firstParts).limit(halfOfNodeParts),
stream(secondParts).limit(halfOfNodeParts)
).toArray();
checkPartitions(firstParts);
checkPartitions(secondParts);
checkPartitions(mixedParts);
}
/**
* Tests that if the user specified partitions for a query explicitly, such a query is cancellable.
*
* @param partitions user-specified partitions; may contain partitions that are mapped to one or both nodes.
*/
public void checkPartitions(int[] partitions) throws Exception {
TestSQLFunctions.reset();
IgniteInternalFuture cancelRes = cancel(1, asyncCancel);
GridTestUtils.assertThrows(log, () -> {
ignite.cache(DEFAULT_CACHE_NAME).query(
new SqlFieldsQuery("select * from Integer where _key in " +
"(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()")
.setPartitions(partitions)
).getAll();
return null;
}, CacheException.class, "The query was cancelled while executing.");
// Ensures that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
}
/**
* Waits until all map parts are finished on the specified node. Not needed when IGN-13862 is done.
*
* @param node node on which to wait for map request completion.
*/
private void ensureMapQueriesHasFinished(IgniteEx node) throws Exception {
boolean noTasksInQryPool = GridTestUtils.waitForCondition(() -> queryPoolIsEmpty(node), TIMEOUT);
Assert.assertTrue("Node " + node.localNode().id() + " has not finished its tasks in the query pool",
noTasksInQryPool);
}
/**
* @param node node whose query pool to check.
* @return {@code True} if {@link GridIoPolicy#QUERY_POOL} is empty, meaning no queries are currently executing
* and none are queued; {@code false} otherwise.
*/
private boolean queryPoolIsEmpty(IgniteEx node) {
ThreadPoolExecutor qryPool = (ThreadPoolExecutor)node.context().getQueryExecutorService();
return qryPool.getQueue().isEmpty() && qryPool.getActiveCount() == 0;
}
/**
* Cancels the current query, which waits on the barrier.
*
* @param qry Query to try to cancel.
* @param expErrMsg Expected error message during cancellation.
* @param async Execute cancellation in ASYNC mode.
* @return <code>IgniteInternalFuture</code> to check whether exception was thrown.
*/
private IgniteInternalFuture cancelQueryWithBarrier(String qry, String expErrMsg, boolean async) {
return GridTestUtils.runAsync(() -> {
try {
List<GridRunningQueryInfo> runningQueries = new ArrayList<>();
GridTestUtils.waitForCondition(() -> {
List<GridRunningQueryInfo> r = (List<GridRunningQueryInfo>)ignite.context().query()
.runningQueries(-1);
runningQueries.addAll(r.stream().filter(q -> q.query().equals(qry)).collect(Collectors.toList()));
return !runningQueries.isEmpty();
}, TIMEOUT);
assertFalse(runningQueries.isEmpty());
for (GridRunningQueryInfo runningQuery : runningQueries) {
GridTestUtils.assertThrowsAnyCause(log,
() -> igniteForKillRequest.cache(DEFAULT_CACHE_NAME).query(createKillQuery(runningQuery.globalQueryId(), async)),
CacheException.class, expErrMsg);
}
}
catch (Exception e) {
log.error("Unexpected exception.", e);
Assert.fail("Unexpected exception");
}
finally {
try {
awaitTimeout();
}
catch (Exception e) {
log.error("Unexpected exception.", e);
Assert.fail("Unexpected exception");
}
}
});
}
/**
* Cancels the current query; the actual cancel will wait for <code>cancelLatch</code> to be released.
*
* @return <code>IgniteInternalFuture</code> to check whether exception was thrown.
*/
private IgniteInternalFuture cancel(int expQryNum, boolean async, String... skipSqls) {
return GridTestUtils.runAsync(() -> {
try {
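// Wait until the cancel latch is released, signalling that the target query has started.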
TestSQLFunctions.cancelLatch.await();
List<GridRunningQueryInfo> runningQueries = (List<GridRunningQueryInfo>)ignite.context().query().runningQueries(-1);
List<IgniteInternalFuture> res = new ArrayList<>();
for (GridRunningQueryInfo runningQuery : runningQueries) {
if (Stream.of(skipSqls).noneMatch((skipSql -> runningQuery.query().equals(skipSql))))
res.add(GridTestUtils.runAsync(() -> {
igniteForKillRequest.cache(DEFAULT_CACHE_NAME).query(createKillQuery(runningQuery.globalQueryId(), async));
}
));
}
// This sleep is required to wait for the kill queries to get started.
doSleep(500);
if (expQryNum != runningQueries.size())
log.error("Found running queries are incorrect, " +
"expected only " + expQryNum + " queries. Found : " + runningQueries);
assertEquals(expQryNum, runningQueries.size());
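// Release the suspended query so it can proceed and observe the cancellation.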
TestSQLFunctions.reqLatch.countDown();
for (IgniteInternalFuture fut : res)
fut.get(TIMEOUT);
// Currently a cancelled query returns (unblocks the caller code) without waiting for the map parts of the
// query to be finished. We need to wait for them. This is a workaround for IGN-13862, because we
// observe side effects if the map parts of a cancelled query are still running.
ensureMapQueriesHasFinished(grid(0));
ensureMapQueriesHasFinished(grid(1));
}
catch (Exception e) {
log.error("Unexpected exception.", e);
Assert.fail("Unexpected exception");
}
});
}
/**
* Fills the server thread pool with <code>qryCnt</code> queries. The given queries will wait for
* <code>suspendQryLatch</code> to be released.
*
* @param statements Statements.
* @param qryCnt Number of queries to execute.
* @return <code>IgniteInternalFuture</code> in order to check whether exception was thrown or not.
*/
private IgniteInternalFuture<Long> fillServerThreadPool(List<Statement> statements, int qryCnt) {
AtomicInteger idx = new AtomicInteger(0);
return GridTestUtils.runMultiThreadedAsync(() -> {
try {
statements.get(idx.getAndIncrement()).executeQuery(
"select * from Integer where awaitQuerySuspensionLatch();");
}
catch (SQLException e) {
log.error("Unexpected exception.", e);
Assert.fail("Unexpected exception");
}
}, qryCnt, "ThreadName");
}
/**
* Creates and sets a new CyclicBarrier for the function.
*
* @param parties the number of threads that must invoke the await method before the barrier is tripped.
*/
private static void newBarrier(int parties) {
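// Reset releases any threads still waiting on the old barrier (they get a BrokenBarrierException).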
if (barrier != null)
barrier.reset();
barrier = new CyclicBarrier(parties);
}
/**
* @throws InterruptedException In case of failure.
* @throws TimeoutException In case of failure.
* @throws BrokenBarrierException In case of failure.
*/
private static void awaitTimeout() throws InterruptedException, TimeoutException, BrokenBarrierException {
barrier.await(TIMEOUT, TimeUnit.MILLISECONDS);
}
/**
* Utility class with custom SQL functions.
*/
public static class TestSQLFunctions {
/** Request latch. */
static volatile CountDownLatch reqLatch;
/** Cancel latch. */
static volatile CountDownLatch cancelLatch;
/** Suspend query latch. */
static volatile CountDownLatch suspendQryLatch;
/** How many times the function {@link #shouldNotBeCalledMoreThan} has been called so far. */
static volatile AtomicInteger funCallCnt;
/**
* Recreates latches. Old latches are released.
*/
static void reset() {
releaseLatches(reqLatch, cancelLatch, suspendQryLatch);
reqLatch = new CountDownLatch(1);
cancelLatch = new CountDownLatch(1);
suspendQryLatch = new CountDownLatch(1);
funCallCnt = new AtomicInteger(0);
}
/**
* @param latches latches to release. Our latches have initial count = 1.
*/
private static void releaseLatches(CountDownLatch... latches) {
for (CountDownLatch l : latches) {
if (l != null)
l.countDown();
}
}
/**
* Releases cancelLatch, which leads to sending the cancel query, and waits until the cancel query is fully processed.
*
* @return 0;
*/
@QuerySqlFunction
public static long awaitLatchCancelled() {
try {
cancelLatch.countDown();
reqLatch.await();
}
catch (Exception ignored) {
// No-op.
}
return 0;
}
/**
* Waits for the latch release.
*
* @return 0;
*/
@QuerySqlFunction
public static long awaitQuerySuspensionLatch() {
try {
suspendQryLatch.await();
}
catch (Exception ignored) {
// No-op.
}
return 0;
}
/**
* Asserts that this function has not been called more than the specified number of times; otherwise we fail.
* Intended to check that the query is cancelled: since the cancel is not instant (the query continues to process
* some number of rows), it must not process all the rows in the table (in case of a full scan, of course).
*
* @param times deadline times.
* @return always {@code true}.
*/
@QuerySqlFunction
public static boolean shouldNotBeCalledMoreThan(int times) {
if (funCallCnt.incrementAndGet() >= times)
fail("Query is running too long since it was canceled.");
return true;
}
/**
* If called, fails with the corresponding message.
*
* @return 0;
*/
@QuerySqlFunction
public static long shouldNotBeCalledInCaseOfCancellation() {
fail("Query wasn't actually cancelled.");
return 0;
}
/**
* @param v amount of milliseconds to sleep.
* @return amount of milliseconds to sleep.
*/
@QuerySqlFunction
public static int sleep_func(int v) {
try {
Thread.sleep(v);
}
catch (InterruptedException ignored) {
// No-op
}
return v;
}
}
/**
* Person.
*/
static class Person implements Serializable {
/** ID. */
@QuerySqlField
private final int id;
/** First name. */
@QuerySqlField
private final String firstName;
/** Last name. */
@QuerySqlField
private final String lastName;
/** Age. */
@QuerySqlField
private final int age;
/**
* @param id ID.
* @param firstName First name.
* @param lastName Last name.
* @param age Age.
*/
Person(int id, String firstName, String lastName, int age) {
assert !F.isEmpty(firstName);
assert !F.isEmpty(lastName);
assert age > 0;
this.id = id;
this.firstName = firstName;
this.lastName = lastName;
this.age = age;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Person person = (Person)o;
if (id != person.id)
return false;
if (age != person.age)
return false;
if (!Objects.equals(firstName, person.firstName))
return false;
return Objects.equals(lastName, person.lastName);
}
/** {@inheritDoc} */
@Override public int hashCode() {
int result = id;
result = 31 * result + (firstName != null ? firstName.hashCode() : 0);
result = 31 * result + (lastName != null ? lastName.hashCode() : 0);
result = 31 * result + age;
return result;
}
}
/**
* Mocked indexing used to suspend mapper or reducer code on demand. It simulates never-ending PMEs, unstable
* topologies, etc.
*/
static class MockedIndexing extends IgniteH2Indexing {
/**
* While this flag is set to {@code true}, no partitions can be reserved. Acts like normal indexing by
* default.
*/
static volatile boolean failReservations;
/**
* While this flag is set to {@code true}, the reducer is not able to get nodes for the given partitions because
* the mapper says "retry later".
*/
static volatile boolean retryNodePartMapping;
/**
* Result of mapping partitions to nodes that indicates that the caller should retry the request later.
*/
private static final ReducePartitionMapResult RETRY_RESULT = new ReducePartitionMapResult(null, null, null);
/**
* Resets to the default behaviour: disables all mock-added logic.
*/
static void resetToDefault() {
failReservations = false;
retryNodePartMapping = false;
}
/**
* Sets up mock objects in this indexing, just after super initialization is done.
*/
@Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException {
super.start(ctx, busyLock);
setReservationManager(new PartitionReservationManager(super.ctx) {
/** {@inheritDoc} */
@Override public PartitionReservation reservePartitions(@Nullable List<Integer> cacheIds,
AffinityTopologyVersion reqTopVer, int[] explicitParts, UUID nodeId, long reqId)
throws IgniteCheckedException {
return failReservations ? new PartitionReservation(null,
"[TESTS]: Failed to reserve partitions for the testing purpose!") :
super.reservePartitions(cacheIds, reqTopVer, explicitParts, nodeId, reqId);
}
});
setMapper(new ReducePartitionMapper(ctx, ctx.log(GridReduceQueryExecutor.class)) {
/** {@inheritDoc} */
@Override public ReducePartitionMapResult nodesForPartitions(List<Integer> cacheIds,
AffinityTopologyVersion topVer, int[] parts, boolean isReplicatedOnly, long qryId) {
return retryNodePartMapping ? RETRY_RESULT :
super.nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly, qryId);
}
});
}
/**
* Injects a custom {@link PartitionReservationManager} into the indexing instance.
*/
protected void setReservationManager(PartitionReservationManager mockMgr) {
try {
Field partReservationMgrFld = IgniteH2Indexing.class.getDeclaredField("partReservationMgr");
partReservationMgrFld.setAccessible(true);
partReservationMgrFld.set(this, mockMgr);
}
catch (Exception rethrown) {
throw new RuntimeException(rethrown);
}
}
/**
* Injects a custom {@link ReducePartitionMapper} into the reducer of this indexing instance.
*/
private void setMapper(ReducePartitionMapper mock) {
try {
GridReduceQueryExecutor rdcExec = this.reduceQueryExecutor();
Field mapperFld = GridReduceQueryExecutor.class.getDeclaredField("mapper");
mapperFld.setAccessible(true);
mapperFld.set(rdcExec, mock);
}
catch (Exception rethrown) {
throw new RuntimeException(rethrown);
}
}
}
}