| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package org.apache.ignite.internal.processors.query; |
| |
| import java.sql.Connection; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| import junit.framework.Assert; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cache.query.SqlFieldsQuery; |
| import org.apache.ignite.cache.query.annotations.QuerySqlFunction; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.managers.communication.GridIoMessage; |
| import org.apache.ignite.internal.processors.query.messages.GridQueryKillResponse; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; |
| import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; |
| import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; |
| import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.GridAbstractTest; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| |
| /** |
| * Test KILL QUERY requested from client which disconnected from cluster during cancellation in progress. |
| */ |
| |
| @RunWith(JUnit4.class) |
| public class KillQueryOnClientDisconnectTest extends GridCommonAbstractTest { |
| /** IP finder. */ |
| private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); |
| |
| /** Statement. */ |
| protected Statement stmt; |
| |
| /** Cancellation processing timeout. */ |
| public static final int TIMEOUT = 5000; |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| super.beforeTestsStarted(); |
| |
| startGrids(1); |
| startClientGrid(1); |
| |
| for (int i = 0; i < 1000; ++i) |
| grid(0).cache(GridAbstractTest.DEFAULT_CACHE_NAME).put(i, i); |
| } |
| |
| /** |
| * Called before execution of every test method in class. |
| * |
| * @throws Exception If failed. |
| */ |
| @Before |
| public void before() throws Exception { |
| TestSQLFunctions.init(); |
| |
| Connection conn = GridTestUtils.connect(grid(0), null); |
| |
| conn.setSchema('"' + GridAbstractTest.DEFAULT_CACHE_NAME + '"'); |
| |
| stmt = conn.createStatement(); |
| } |
| |
| /** */ |
| protected IgniteEx clientNode() { |
| IgniteEx clientNode = grid(1); |
| |
| assertTrue(clientNode.context().clientNode()); |
| |
| return clientNode; |
| } |
| |
| /** */ |
| protected IgniteEx serverNode() { |
| IgniteEx srvNode = grid(0); |
| |
| assertFalse(srvNode.context().clientNode()); |
| |
| return srvNode; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| CacheConfiguration<?, ?> cache = GridAbstractTest.defaultCacheConfiguration(); |
| |
| cache.setCacheMode(PARTITIONED); |
| cache.setBackups(1); |
| cache.setWriteSynchronizationMode(FULL_SYNC); |
| cache.setSqlFunctionClasses(TestSQLFunctions.class); |
| cache.setIndexedTypes(Integer.class, Integer.class); |
| |
| cfg.setCacheConfiguration(cache); |
| |
| TcpDiscoverySpi disco = new TcpDiscoverySpi(); |
| |
| disco.setIpFinder(IP_FINDER); |
| |
| cfg.setDiscoverySpi(disco); |
| |
| cfg.setCommunicationSpi(new TcpCommunicationSpi() { |
| /** {@inheritDoc} */ |
| @Override public void sendMessage(ClusterNode node, Message msg, |
| IgniteInClosure<IgniteException> ackC) { |
| if (GridIoMessage.class.isAssignableFrom(msg.getClass())) { |
| Message gridMsg = ((GridIoMessage)msg).message(); |
| |
| //Don't send Kill response and disconnect client node. |
| if (GridQueryKillResponse.class.isAssignableFrom(gridMsg.getClass())) { |
| grid(0).configuration().getDiscoverySpi().failNode(clientNode().cluster().localNode().id(), null); |
| |
| return; |
| } |
| } |
| |
| super.sendMessage(node, msg, ackC); |
| } |
| }); |
| |
| return cfg; |
| } |
| |
| /** |
| * Test client disconnect during cancellation request is processing. |
| * |
| * @throws Exception In case of failure. |
| */ |
| @Test |
| public void clientDisconnectFromCluster() throws Exception { |
| IgniteInternalFuture cancelRes = cancelAndCheckClientDisconnect(); |
| |
| GridTestUtils.assertThrows(log, () -> { |
| stmt.executeQuery("select * from Integer where _key in " + |
| "(select _key from Integer where awaitLatchCancelled() = 0) and shouldNotBeCalledInCaseOfCancellation()"); |
| |
| return null; |
| }, SQLException.class, "The query was cancelled while executing."); |
| |
| cancelRes.get(TIMEOUT); |
| } |
| |
| /** |
| * Cancels current query, actual cancel will wait <code>cancelLatch</code> to be releaseds. |
| * Checked that Cancellation wasn't complete due to clint was disconnected. |
| * |
| * @return <code>IgniteInternalFuture</code> to check whether exception was thrown. |
| */ |
| protected IgniteInternalFuture cancelAndCheckClientDisconnect() { |
| return GridTestUtils.runAsync(() -> { |
| try { |
| TestSQLFunctions.cancelLatch.await(); |
| |
| List<GridRunningQueryInfo> runningQueries = (List<GridRunningQueryInfo>)serverNode().context().query().runningQueries(-1); |
| |
| assertEquals(1, runningQueries.size()); |
| |
| IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { |
| clientNode().cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery("KILL QUERY '" + runningQueries.get(0).globalQueryId() + "'")); |
| }); |
| |
| doSleep(500); |
| |
| TestSQLFunctions.reqLatch.countDown(); |
| |
| GridTestUtils.assertThrows(log, () -> fut.get(TIMEOUT), IgniteCheckedException.class, "Failed to cancel query because local client node has been disconnected from the cluster"); |
| } |
| catch (Exception e) { |
| log.error("Unexpected exception.", e); |
| |
| Assert.fail("Unexpected exception"); |
| } |
| }); |
| } |
| |
| /** |
| * Utility class with custom SQL functions. |
| */ |
| public static class TestSQLFunctions { |
| /** Request latch. */ |
| static CountDownLatch reqLatch; |
| |
| /** Cancel latch. */ |
| static CountDownLatch cancelLatch; |
| |
| /** Suspend query latch. */ |
| static CountDownLatch suspendQryLatch; |
| |
| /** |
| * Recreate latches. |
| */ |
| static void init() { |
| reqLatch = new CountDownLatch(1); |
| |
| cancelLatch = new CountDownLatch(1); |
| |
| suspendQryLatch = new CountDownLatch(1); |
| } |
| |
| /** |
| * Releases cancelLatch that leeds to sending cancel Query and waits until cancel Query is fully processed. |
| * |
| * @return 0; |
| */ |
| @QuerySqlFunction |
| public static long awaitLatchCancelled() { |
| try { |
| cancelLatch.countDown(); |
| reqLatch.await(); |
| } |
| catch (Exception ignored) { |
| // No-op. |
| } |
| |
| return 0; |
| } |
| |
| /** |
| * Waits latch release. |
| * |
| * @return 0; |
| */ |
| @QuerySqlFunction |
| public static long awaitQuerySuspensionLatch() { |
| try { |
| suspendQryLatch.await(); |
| } |
| catch (Exception ignored) { |
| // No-op. |
| } |
| |
| return 0; |
| } |
| |
| /** |
| * If called fails with corresponding message. |
| * |
| * @return 0; |
| */ |
| @QuerySqlFunction |
| public static long shouldNotBeCalledInCaseOfCancellation() { |
| fail("Query wasn't actually cancelled."); |
| |
| return 0; |
| } |
| |
| /** |
| * @param v amount of milliseconds to sleep |
| * @return amount of milliseconds to sleep |
| */ |
| @QuerySqlFunction |
| public static int sleep_func(int v) { |
| try { |
| Thread.sleep(v); |
| } |
| catch (InterruptedException ignored) { |
| // No-op |
| } |
| return v; |
| } |
| } |
| } |