/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.ignite.internal.processors.query;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.query.messages.GridQueryKillResponse;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
 * Tests {@code KILL QUERY} requested from a client node which gets disconnected from the cluster while
 * the cancellation is still in progress.
 * <p>
 * The communication SPI configured in {@link #getConfiguration(String)} never sends a
 * {@link GridQueryKillResponse}: instead of sending it, the SPI asks the discovery SPI of {@code grid(0)}
 * to fail the client node.
 */
@RunWith(JUnit4.class)
public class KillQueryOnClientDisconnectTest extends GridCommonAbstractTest {
    /** IP finder. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** Cancellation processing timeout, passed to {@code IgniteInternalFuture#get(long)}. */
    public static final int TIMEOUT = 5000;

    /** Statement used to run the query that is going to be killed. */
    protected Statement stmt;

    /** JDBC connection which {@link #stmt} was created from; kept only to be closed after the test. */
    private Connection conn;

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        startGrids(1);

        startClientGrid(1);

        for (int i = 0; i < 1000; ++i)
            grid(0).cache(GridAbstractTest.DEFAULT_CACHE_NAME).put(i, i);
    }

    /**
     * Called before execution of every test method in class: recreates the latches used by the custom SQL
     * functions and opens a fresh JDBC connection and statement against {@code grid(0)}.
     *
     * @throws Exception If failed.
     */
    @Before
    public void before() throws Exception {
        TestSQLFunctions.init();

        conn = GridTestUtils.connect(grid(0), null);

        conn.setSchema('"' + GridAbstractTest.DEFAULT_CACHE_NAME + '"');

        stmt = conn.createStatement();
    }

    /**
     * Closes the JDBC statement and connection opened in {@link #before()} so that they do not leak
     * between test methods.
     *
     * @throws Exception If failed.
     */
    @Override protected void afterTest() throws Exception {
        // try-with-resources skips null resources and closes in reverse order: statement first, then connection.
        try (Connection ignoredConn = conn; Statement ignoredStmt = stmt) {
            stmt = null;
            conn = null;
        }
        finally {
            super.afterTest();
        }
    }

    /**
     * @return Node {@code grid(1)}, after asserting that it is a client node.
     */
    protected IgniteEx clientNode() {
        IgniteEx clientNode = grid(1);

        assertTrue(clientNode.context().clientNode());

        return clientNode;
    }

    /**
     * @return Node {@code grid(0)}, after asserting that it is not a client node.
     */
    protected IgniteEx serverNode() {
        IgniteEx srvNode = grid(0);

        assertFalse(srvNode.context().clientNode());

        return srvNode;
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        CacheConfiguration<?, ?> cache = GridAbstractTest.defaultCacheConfiguration();

        cache.setCacheMode(PARTITIONED);
        cache.setBackups(1);
        cache.setWriteSynchronizationMode(FULL_SYNC);
        cache.setSqlFunctionClasses(TestSQLFunctions.class);
        cache.setIndexedTypes(Integer.class, Integer.class);

        cfg.setCacheConfiguration(cache);

        TcpDiscoverySpi disco = new TcpDiscoverySpi();

        disco.setIpFinder(IP_FINDER);

        cfg.setDiscoverySpi(disco);

        cfg.setCommunicationSpi(new TcpCommunicationSpi() {
            /** {@inheritDoc} */
            @Override public void sendMessage(ClusterNode node, Message msg,
                IgniteInClosure<IgniteException> ackC) {
                if (msg instanceof GridIoMessage) {
                    Message gridMsg = ((GridIoMessage)msg).message();

                    // Don't send Kill response and disconnect client node.
                    if (gridMsg instanceof GridQueryKillResponse) {
                        grid(0).configuration().getDiscoverySpi()
                            .failNode(clientNode().cluster().localNode().id(), null);

                        return;
                    }
                }

                super.sendMessage(node, msg, ackC);
            }
        });

        return cfg;
    }

    /**
     * Tests client disconnect while the cancellation request is being processed.
     *
     * @throws Exception In case of failure.
     */
    @Test
    public void clientDisconnectFromCluster() throws Exception {
        IgniteInternalFuture<?> cancelRes = cancelAndCheckClientDisconnect();

        GridTestUtils.assertThrows(log, () -> {
            stmt.executeQuery("select * from Integer where _key in " +
                "(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()");

            return null;
        }, SQLException.class, "The query was cancelled while executing.");

        cancelRes.get(TIMEOUT);
    }

    /**
     * Asynchronously cancels the current query. The task first waits for {@code cancelLatch}, i.e. until
     * the query has reached {@link TestSQLFunctions#awaitLatchCancelled()}, then issues {@code KILL QUERY}
     * from the client node and releases {@code reqLatch} so that the query can proceed.
     * Checks that the cancellation request fails because the client has been disconnected.
     *
     * @return <code>IgniteInternalFuture</code> to check whether exception was thrown.
     */
    protected IgniteInternalFuture cancelAndCheckClientDisconnect() {
        return GridTestUtils.runAsync(() -> {
            try {
                TestSQLFunctions.cancelLatch.await();

                List<GridRunningQueryInfo> runningQueries =
                    (List<GridRunningQueryInfo>)serverNode().context().query().runningQueries(-1);

                assertEquals(1, runningQueries.size());

                IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
                    clientNode().cache(DEFAULT_CACHE_NAME).query(
                        new SqlFieldsQuery("KILL QUERY '" + runningQueries.get(0).globalQueryId() + "'"));
                });

                doSleep(500);

                TestSQLFunctions.reqLatch.countDown();

                GridTestUtils.assertThrows(log, () -> fut.get(TIMEOUT), IgniteCheckedException.class,
                    "Failed to cancel query because local client node has been disconnected from the cluster");
            }
            catch (Exception e) {
                log.error("Unexpected exception.", e);

                Assert.fail("Unexpected exception");
            }
            finally {
                // Always release the query: if a check above failed before the regular countDown(),
                // awaitLatchCancelled() would otherwise block forever and hang the test.
                // Counting down an already released latch has no effect.
                TestSQLFunctions.reqLatch.countDown();
            }
        });
    }

    /**
     * Utility class with custom SQL functions.
     */
    public static class TestSQLFunctions {
        /** Request latch. Volatile because it is recreated by the test thread and read by query threads. */
        static volatile CountDownLatch reqLatch;

        /** Cancel latch. Volatile because it is recreated by the test thread and read by query threads. */
        static volatile CountDownLatch cancelLatch;

        /** Suspend query latch. Volatile because it is recreated by the test thread and read by query threads. */
        static volatile CountDownLatch suspendQryLatch;

        /**
         * Recreate latches.
         */
        static void init() {
            reqLatch = new CountDownLatch(1);

            cancelLatch = new CountDownLatch(1);

            suspendQryLatch = new CountDownLatch(1);
        }

        /**
         * Releases {@code cancelLatch}, which leads to sending the cancel query request, and then waits
         * on {@code reqLatch} until the test allows the query to proceed.
         *
         * @return 0;
         */
        @QuerySqlFunction
        public static long awaitLatchCancelled() {
            try {
                cancelLatch.countDown();

                reqLatch.await();
            }
            catch (InterruptedException ignored) {
                // Keep the interruption visible to the code that runs the query.
                Thread.currentThread().interrupt();
            }
            catch (Exception ignored) {
                // No-op.
            }

            return 0;
        }

        /**
         * Waits until {@code suspendQryLatch} is released.
         *
         * @return 0;
         */
        @QuerySqlFunction
        public static long awaitQuerySuspensionLatch() {
            try {
                suspendQryLatch.await();
            }
            catch (InterruptedException ignored) {
                // Keep the interruption visible to the code that runs the query.
                Thread.currentThread().interrupt();
            }
            catch (Exception ignored) {
                // No-op.
            }

            return 0;
        }

        /**
         * If called, fails with the corresponding message.
         *
         * @return 0;
         */
        @QuerySqlFunction
        public static long shouldNotBeCalledInCaseOfCancellation() {
            fail("Query wasn't actually cancelled.");

            return 0;
        }

        /**
         * @param v Amount of milliseconds to sleep.
         * @return The passed amount of milliseconds.
         */
        @QuerySqlFunction
        public static int sleep_func(int v) {
            try {
                Thread.sleep(v);
            }
            catch (InterruptedException ignored) {
                // Keep the interruption visible to the code that runs the query.
                Thread.currentThread().interrupt();
            }

            return v;
        }
    }
}