blob: f76b264a86954bbe5f7e5524194ec24b4b66597a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.jdbc.thin;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
/**
* Statement cancel test.
*/
@SuppressWarnings({"ThrowableNotThrown", "AssertWithSideEffects"})
public class JdbcThinStatementCancelSelfTest extends JdbcThinAbstractSelfTest {
/** IP finder. */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** URL. */
private static final String URL = "jdbc:ignite:thin://127.0.0.1/";
/** A CSV file with one record. */
private static final String BULKLOAD_20_000_LINE_CSV_FILE =
Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload20_000.csv")).
getAbsolutePath();
/** Max table rows. */
private static final int MAX_ROWS = 10000;
/** Server thread pull size. */
private static final int SERVER_THREAD_POOL_SIZE = 4;
/** Cancellation processing timeout. */
public static final int TIMEOUT = 5000;
/** Nodes count. */
private static final byte NODES_COUNT = 3;
/** Timeout for checking async result. */
public static final int CHECK_RESULT_TIMEOUT = 1_000;
/** Connection. */
private Connection conn;
/** Statement. */
private Statement stmt;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
CacheConfiguration<?, ?> cache = defaultCacheConfiguration();
cache.setCacheMode(PARTITIONED);
cache.setBackups(1);
cache.setWriteSynchronizationMode(FULL_SYNC);
cache.setSqlFunctionClasses(TestSQLFunctions.class);
cache.setIndexedTypes(Integer.class, Integer.class, Long.class, Long.class, String.class,
JdbcThinAbstractDmlStatementSelfTest.Person.class);
cfg.setCacheConfiguration(cache);
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(IP_FINDER);
cfg.setDiscoverySpi(disco);
cfg.setClientConnectorConfiguration(new ClientConnectorConfiguration().
setThreadPoolSize(SERVER_THREAD_POOL_SIZE));
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGridsMultiThreaded(NODES_COUNT);
for (int i = 0; i < MAX_ROWS; ++i)
grid(0).cache(DEFAULT_CACHE_NAME).put(i, i);
for (int i = 0; i < MAX_ROWS; ++i)
grid(0).cache(DEFAULT_CACHE_NAME).put((long)i, (long)i);
}
/**
* Called before execution of every test method in class.
*
* @throws Exception If failed.
*/
@Before
public void before() throws Exception {
TestSQLFunctions.init();
conn = DriverManager.getConnection(URL);
conn.setSchema('"' + DEFAULT_CACHE_NAME + '"');
stmt = conn.createStatement();
assert stmt != null;
assert !stmt.isClosed();
}
/**
* Called after execution of every test method in class.
*
* @throws Exception If failed.
*/
@After
public void after() throws Exception {
if (stmt != null && !stmt.isClosed()) {
stmt.close();
assert stmt.isClosed();
}
conn.close();
assert stmt.isClosed();
assert conn.isClosed();
}
/**
* Trying to cancel stament without query. In given case cancel is noop, so no exception expected.
*/
@Test
public void testCancelingStmtWithoutQuery() {
try {
stmt.cancel();
}
catch (Exception e) {
log.error("Unexpected exception.", e);
fail("Unexpected exception");
}
}
/**
* Trying to retrieve result set of a canceled query.
* SQLException with message "The query was cancelled while executing." expected.
*
* @throws Exception If failed.
*/
@Test
public void testResultSetRetrievalInCanceledStatement() throws Exception {
stmt.execute("SELECT 1; SELECT 2; SELECT 3;");
assertNotNull(stmt.getResultSet());
stmt.cancel();
GridTestUtils.assertThrows(log, () -> {
stmt.getResultSet();
return null;
}, SQLException.class, "The query was cancelled while executing.");
}
/**
* Trying to cancel already cancelled query.
* No exceptions exceped.
*
* @throws Exception If failed.
*/
@Test
public void testCancelCanceledQuery() throws Exception {
stmt.execute("SELECT 1;");
assertNotNull(stmt.getResultSet());
stmt.cancel();
stmt.cancel();
GridTestUtils.assertThrows(log, () -> {
stmt.getResultSet();
return null;
}, SQLException.class, "The query was cancelled while executing.");
}
/**
* Trying to cancel closed query.
* SQLException with message "Statement is closed." expected.
*
* @throws Exception If failed.
*/
@Test
public void testCancelClosedStmt() throws Exception {
stmt.close();
GridTestUtils.assertThrows(log, () -> {
stmt.cancel();
return null;
}, SQLException.class, "Statement is closed.");
}
/**
* Trying to call <code>resultSet.next()</code> on a canceled query.
* SQLException with message "The query was cancelled while executing." expected.
*
* @throws Exception If failed.
*/
@Test
public void testResultSetNextAfterCanceling() throws Exception {
stmt.setFetchSize(10);
ResultSet rs = stmt.executeQuery("select * from Integer");
assert rs.next();
stmt.cancel();
GridTestUtils.assertThrows(log, () -> {
rs.next();
return null;
}, SQLException.class, "The query was cancelled while executing.");
}
/**
* Ensure that it's possible to execute new query on cancelled statement.
*
* @throws Exception If failed.
*/
@Test
public void testCancelAnotherStmt() throws Exception {
stmt.setFetchSize(10);
ResultSet rs = stmt.executeQuery("select * from Integer");
assert rs.next();
stmt.cancel();
ResultSet rs2 = stmt.executeQuery("select * from Integer order by _val");
assert rs2.next() : "The other cursor mustn't be closed";
}
/**
* Ensure that stament cancel doesn't effect another statement workflow, created by the same connection.
*
* @throws Exception If failed.
*/
@Test
public void testCancelAnotherStmtResultSet() throws Exception {
try (Statement anotherStmt = conn.createStatement()) {
ResultSet rs1 = stmt.executeQuery("select * from Integer WHERE _key % 2 = 0");
ResultSet rs2 = anotherStmt.executeQuery("select * from Integer WHERE _key % 2 <> 0");
stmt.cancel();
GridTestUtils.assertThrows(log, () -> {
rs1.next();
return null;
}, SQLException.class, "The query was cancelled while executing.");
assert rs2.next() : "The other cursor mustn't be closed";
}
}
/**
* Trying to cancel long running query. No exceptions expected.
* In order to guarantee correct concurrent processing of query itself and it's cancellation request
* two latches and some other stuff is used.
* For more details see <code>TestSQLFunctions#awaitLatchCancelled()</code>
* and <code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
*
* @throws Exception If failed.
*/
@Test
public void testCancelQuery() throws Exception {
IgniteInternalFuture cancelRes = cancel(stmt);
GridTestUtils.assertThrows(log, () -> {
stmt.executeQuery("select * from Integer where _key in " +
"(select abs(_key) from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()");
return null;
}, SQLException.class, "The query was cancelled while executing.");
// Ensures that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
}
/**
* Trying close canceling query. No exceptions expected.
* In order to guarantee correct concurrent processing of query itself and it's cancellation request
* two latches and some other stuff is used.
* For more details see <code>TestSQLFunctions#awaitLatchCancelled()</code>
* and <code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
*
* @throws Exception If failed.
*/
@Test
public void testCloseCancelingQuery() throws Exception {
IgniteInternalFuture res = GridTestUtils.runAsync(() -> {
try {
TestSQLFunctions.cancelLatch.await();
long cancelCntrBeforeCancel = ClientListenerProcessor.CANCEL_COUNTER.get();
stmt.cancel();
try {
GridTestUtils.waitForCondition(
() -> ClientListenerProcessor.CANCEL_COUNTER.get() == cancelCntrBeforeCancel + 1, TIMEOUT);
}
catch (IgniteInterruptedCheckedException ignored) {
// No-op.
}
assertEquals(cancelCntrBeforeCancel + 1, ClientListenerProcessor.CANCEL_COUNTER.get());
// Nothing expected here, cause query was already marked as canceled.
stmt.close();
TestSQLFunctions.reqLatch.countDown();
}
catch (Exception e) {
log.error("Unexpected exception.", e);
fail("Unexpected exception");
}
});
GridTestUtils.assertThrows(log, () -> {
stmt.executeQuery("select * from Integer where _key in " +
"(select abs(_key) from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()");
return null;
}, SQLException.class, "The query was cancelled while executing.");
// Ensures that there were no exceptions within async cancellation process.
res.get(CHECK_RESULT_TIMEOUT);
}
/**
* Trying to cancel long running multiple statments query. No exceptions expected.
* In order to guarantee correct concurrent processing of query itself and it's cancellation request
* two latches and some other stuff is used.
* For more details see <code>TestSQLFunctions#awaitLatchCancelled()</code>
* and <code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
*
* @throws Exception If failed.
*/
@Test
public void testCancelMultipleStatementsQuery() throws Exception {
try (Statement anotherStatment = conn.createStatement()) {
anotherStatment.setFetchSize(1);
ResultSet rs = anotherStatment.executeQuery("select * from Integer");
assert rs.next();
IgniteInternalFuture cancelRes = cancel(stmt);
GridTestUtils.assertThrows(log, () -> {
// Executes multiple long running query
stmt.execute(
"select 100 from Integer;"
+ "select _key from Integer where awaitLatchCancelled() = 0;"
+ "select 100 from Integer I1 join Integer I2;"
+ "select * from Integer where shouldNotBeCalledInCaseOfCancellation()");
return null;
}, SQLException.class, "The query was cancelled while executing");
assert rs.next() : "The other cursor mustn't be closed";
// Ensures that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
}
}
/**
* Trying to cancel long running batch query. No exceptions expected.
* In order to guarantee correct concurrent processing of query itself and it's cancellation request
* two latches and some other stuff is used.
* For more details see <code>TestSQLFunctions#awaitLatchCancelled()</code>
* and <code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
*
* @throws Exception If failed.
*/
@Test
public void testCancelBatchQuery() throws Exception {
try (Statement stmt2 = conn.createStatement()) {
stmt2.setFetchSize(1);
ResultSet rs = stmt2.executeQuery("SELECT * from Integer");
assert rs.next();
IgniteInternalFuture cancelRes = cancel(stmt);
GridTestUtils.assertThrows(log, () -> {
stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30) OR _key < " + MAX_ROWS);
stmt.addBatch("update Long set _val = _val + 1 where awaitLatchCancelled() = 0");
stmt.addBatch("update Long set _val = _val + 1 where _key < sleep_func (30) OR _key < " + MAX_ROWS);
stmt.addBatch("update Long set _val = _val + 1 where shouldNotBeCalledInCaseOfCancellation()");
stmt.executeBatch();
return null;
}, java.sql.SQLException.class, "The query was cancelled while executing");
assert rs.next() : "The other cursor mustn't be closed";
// Ensures that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
}
}
/**
* Trying to cancel long running query in situation that there's no worker for cancel query,
* cause server thread pool is full. No exceptions expected.
* In order to guarantee correct concurrent processing of query itself and it's cancellation request
* thress latches and some other stuff is used.
* For more details see <code>TestSQLFunctions#awaitLatchCancelled()</code>,
* <code>TestSQLFunctions#awaitQuerySuspensionLatch()</code>
* and <code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
*
* @throws Exception If failed.
*/
@Test
public void testCancelAgainstFullServerThreadPool() throws Exception {
List<Statement> statements = Collections.synchronizedList(new ArrayList<>());
List<Connection> connections = Collections.synchronizedList(new ArrayList<>());
// Prepares connections and statemens in order to use them for filling thread pool with pseuso-infine quries.
for (int i = 0; i < SERVER_THREAD_POOL_SIZE; i++) {
Connection yaConn = DriverManager.getConnection(URL);
yaConn.setSchema('"' + DEFAULT_CACHE_NAME + '"');
connections.add(yaConn);
Statement yaStmt = yaConn.createStatement();
statements.add(yaStmt);
}
try {
IgniteInternalFuture cancelRes = cancel(statements.get(SERVER_THREAD_POOL_SIZE - 1));
// Completely fills server thread pool.
IgniteInternalFuture<Long> fillPoolRes = fillServerThreadPool(statements, SERVER_THREAD_POOL_SIZE - 1);
GridTestUtils.assertThrows(log, () -> {
statements.get(SERVER_THREAD_POOL_SIZE - 1).executeQuery(
"select * from Integer where _key in " +
"(select abs(_key) from Integer where awaitLatchCancelled() = 0) and" +
" shouldNotBeCalledInCaseOfCancellation()");
return null;
}, SQLException.class, "The query was cancelled while executing.");
// Releases queries in thread pool.
TestSQLFunctions.suspendQryLatch.countDown();
// Ensures that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
// Ensures that there were no exceptions within async thread pool filling process.
fillPoolRes.get(CHECK_RESULT_TIMEOUT);
}
finally {
for (Statement statement : statements)
statement.close();
for (Connection connection : connections)
connection.close();
}
}
/**
* Trying to cancel fetch query in situation that there's no worker for cancel query,
* cause server thread pool is full. No exceptions expected.
* In order to guarantee correct concurrent processing of query itself and it's cancellation request
* thress latches and some other stuff is used.
* For more details see <code>TestSQLFunctions#awaitLatchCancelled()</code>,
* <code>TestSQLFunctions#awaitQuerySuspensionLatch()</code>
* and <code>JdbcThinStatementCancelSelfTest#cancel(java.sql.Statement)</code>.
*
* @throws Exception If failed.
*/
@Test
public void testCancelFetchAgainstFullServerThreadPool() throws Exception {
stmt.setFetchSize(1);
ResultSet rs = stmt.executeQuery("SELECT * from Integer");
rs.next();
List<Statement> statements = Collections.synchronizedList(new ArrayList<>());
List<Connection> connections = Collections.synchronizedList(new ArrayList<>());
// Prepares connections and statemens in order to use them for filling thread pool with pseuso-infine quries.
for (int i = 0; i < SERVER_THREAD_POOL_SIZE; i++) {
Connection yaConn = DriverManager.getConnection(URL);
yaConn.setSchema('"' + DEFAULT_CACHE_NAME + '"');
connections.add(yaConn);
Statement yaStmt = yaConn.createStatement();
statements.add(yaStmt);
}
try {
// Completely fills server thread pool.
IgniteInternalFuture<Long> fillPoolRes = fillServerThreadPool(statements,
SERVER_THREAD_POOL_SIZE - 1);
IgniteInternalFuture fetchRes = GridTestUtils.runAsync(() -> {
GridTestUtils.assertThrows(log, () -> {
rs.next();
return null;
}, SQLException.class, "The query was cancelled while executing.");
});
stmt.cancel();
// Ensures that there were no exceptions within async data fetching process.
fetchRes.get(CHECK_RESULT_TIMEOUT);
// Releases queries in thread pool.
TestSQLFunctions.suspendQryLatch.countDown();
// Ensure that there were no exceptions within async thread pool filling process.
fillPoolRes.get(CHECK_RESULT_TIMEOUT);
}
finally {
for (Statement statement : statements)
statement.close();
for (Connection connection : connections)
connection.close();
}
}
/**
* Trying to cancel long running file upload. No exceptions expected.
*
* @throws Exception If failed.
*/
@Test
public void testCancellingLongRunningFileUpload() throws Exception {
IgniteInternalFuture cancelRes = GridTestUtils.runAsync(() -> {
try {
Thread.sleep(200);
stmt.cancel();
}
catch (Exception e) {
log.error("Unexpected exception.", e);
fail("Unexpected exception");
}
});
GridTestUtils.assertThrows(log, () -> {
stmt.executeUpdate(
"copy from '" + BULKLOAD_20_000_LINE_CSV_FILE + "' into Person" +
" (_key, age, firstName, lastName)" +
" format csv");
return null;
}, SQLException.class, "The query was cancelled while executing.");
// Ensure that there were no exceptions within async cancellation process.
cancelRes.get(CHECK_RESULT_TIMEOUT);
}
/**
* Cancels current query, actual cancel will wait <code>cancelLatch</code> to be releaseds.
*
* @return <code>IgniteInternalFuture</code> to check whether exception was thrown.
*/
private IgniteInternalFuture cancel(Statement stmt) {
return GridTestUtils.runAsync(() -> {
try {
TestSQLFunctions.cancelLatch.await();
long cancelCntrBeforeCancel = ClientListenerProcessor.CANCEL_COUNTER.get();
stmt.cancel();
try {
GridTestUtils.waitForCondition(
() -> ClientListenerProcessor.CANCEL_COUNTER.get() == cancelCntrBeforeCancel + 1, TIMEOUT);
}
catch (IgniteInterruptedCheckedException ignored) {
// No-op.
}
assertEquals(cancelCntrBeforeCancel + 1, ClientListenerProcessor.CANCEL_COUNTER.get());
TestSQLFunctions.reqLatch.countDown();
}
catch (Exception e) {
log.error("Unexpected exception.", e);
fail("Unexpected exception");
}
});
}
/**
* Fills Server Thread Pool with <code>qryCnt</code> queries. Given queries will wait for
* <code>suspendQryLatch</code> to be released.
*
* @param statements Statements.
* @param qryCnt Number of queries to execute.
* @return <code>IgniteInternalFuture</code> in order to check whether exception was thrown or not.
*/
private IgniteInternalFuture<Long> fillServerThreadPool(List<Statement> statements, int qryCnt) {
AtomicInteger idx = new AtomicInteger(0);
return GridTestUtils.runMultiThreadedAsync(() -> {
try {
statements.get(idx.getAndIncrement()).executeQuery(
"select * from Integer where awaitQuerySuspensionLatch();");
}
catch (SQLException e) {
log.error("Unexpected exception.", e);
fail("Unexpected exception");
}
}, qryCnt, "ThreadName");
}
/**
* Utility class with custom SQL functions.
*/
public static class TestSQLFunctions {
/** Request latch. */
static CountDownLatch reqLatch;
/** Cancel latch. */
static CountDownLatch cancelLatch;
/** Suspend query latch. */
static CountDownLatch suspendQryLatch;
/**
* Recreate latches.
*/
static void init() {
reqLatch = new CountDownLatch(1);
cancelLatch = new CountDownLatch(1);
suspendQryLatch = new CountDownLatch(1);
}
/**
* Releases cancelLatch that leeds to sending cancel Query and waits until cancel Query is fully processed.
*
* @return 0;
*/
@QuerySqlFunction
public static long awaitLatchCancelled() {
try {
cancelLatch.countDown();
reqLatch.await();
}
catch (Exception ignored) {
// No-op.
}
return 0;
}
/**
* Waits latch release.
*
* @return 0;
*/
@QuerySqlFunction
public static long awaitQuerySuspensionLatch() {
try {
suspendQryLatch.await();
}
catch (Exception ignored) {
// No-op.
}
return 0;
}
/**
* If called fails with corresponding message.
*
* @return 0;
*/
@QuerySqlFunction
public static long shouldNotBeCalledInCaseOfCancellation() {
fail("Query wasn't actually cancelled.");
return 0;
}
/**
*
* @param v amount of milliseconds to sleep
* @return amount of milliseconds to sleep
*/
@QuerySqlFunction
public static int sleep_func(int v) {
try {
Thread.sleep(v);
}
catch (InterruptedException ignored) {
// No-op
}
return v;
}
}
}