blob: 15945b437e2c86678a9e81323dcb075ff9fa44a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query;
import javax.cache.Cache;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CyclicBarrier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SpiQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingSpi;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.SystemPropertiesRule;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TestRule;
import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
 * Ensures that SQL, scan and SPI queries are executed in the dedicated query thread pool
 * and that starvation of that pool is reported in the log.
 */
public class IgniteQueryDedicatedPoolTest extends GridCommonAbstractTest {
    /** Class rule. */
    @ClassRule public static final TestRule classRule = new SystemPropertiesRule();

    /** Name of the cache for test. */
    private static final String CACHE_NAME = "query_pool_test";

    /** Logger that lets the tests register listeners for log messages; set as grid logger of every started node. */
    private static ListeningTestLogger testLog;

    /** Query thread pool size; when {@code null} the pool size is not configured explicitly. */
    private Integer qryPoolSize;

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        testLog = new ListeningTestLogger(false, log);
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName)
            .setGridLogger(testLog);

        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);

        // The Integer table is queried by the SQL test; Byte entries are stored by the SPI query test.
        ccfg.setIndexedTypes(Integer.class, Integer.class);
        ccfg.setIndexedTypes(Byte.class, Byte.class);

        // Makes the static currentPolicy() function of this class callable from SQL.
        ccfg.setSqlFunctionClasses(IgniteQueryDedicatedPoolTest.class);
        ccfg.setName(CACHE_NAME);

        cfg.setCacheConfiguration(ccfg);

        // The node named "client" is the only one started in client mode.
        if ("client".equals(gridName))
            cfg.setClientMode(true);

        cfg.setIndexingSpi(new TestIndexingSpi());

        if (nonNull(qryPoolSize))
            cfg.setQueryThreadPoolSize(qryPoolSize);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        super.afterTest();

        stopAllGrids();

        testLog.clearListeners();
    }

    /**
     * Tests that SQL queries involving actual network IO are executed in dedicated pool.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSqlQueryUsesDedicatedThreadPool() throws Exception {
        startGrid("server");

        try (Ignite client = startGrid("client")) {
            IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME);

            // Exactly one row is stored so that the select returns one row:
            // the SQL function is called once per each row of the result.
            cache.put(1, 1);

            // The query has to refer to a cache table explicitly in order to be executed
            // in a non-local, distributed manner.
            SqlFieldsQuery qry = new SqlFieldsQuery("select currentPolicy() from Integer");

            try (QueryCursor<List<?>> cursor = cache.query(qry)) {
                List<List<?>> result = cursor.getAll();

                assertEquals(1, result.size());

                Byte plc = (Byte)result.get(0).get(0);

                assertNotNull(plc);
                assertEquals(GridIoPolicy.QUERY_POOL, (byte)plc);
            }
        }
    }

    /**
     * Tests that Scan queries are executed in dedicated pool.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryUsesDedicatedThreadPool() throws Exception {
        startGrid("server");

        try (Ignite client = startGrid("client")) {
            IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME);

            cache.put(0, 0);

            // The filter accepts an entry only when it is evaluated under the query pool policy,
            // so the single stored entry is returned only in that case.
            ScanQuery<Object, Object> qry = new ScanQuery<Object, Object>(
                (key, val) -> F.eq(GridIoManager.currentPolicy(), GridIoPolicy.QUERY_POOL));

            try (QueryCursor<Cache.Entry<Object, Object>> cursor = cache.query(qry)) {
                assertEquals(1, cursor.getAll().size());
            }
        }
    }

    /**
     * Tests that SPI queries are executed in dedicated pool.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSpiQueryUsesDedicatedThreadPool() throws Exception {
        startGrid("server");

        try (Ignite client = startGrid("client")) {
            IgniteCache<Byte, Byte> cache = client.cache(CACHE_NAME);

            // Store every byte value in [0, Byte.MAX_VALUE) as both key and value.
            for (byte b = 0; b < Byte.MAX_VALUE; ++b)
                cache.put(b, b);

            try (QueryCursor<Cache.Entry<Byte, Byte>> cursor = cache.query(new SpiQuery<Byte, Byte>())) {
                List<Cache.Entry<Byte, Byte>> all = cursor.getAll();

                assertEquals(1, all.size());
                assertEquals(GridIoPolicy.QUERY_POOL, (byte)all.get(0).getValue());
            }
        }
    }

    /**
     * Test for messages about query pool starvation in the logs.
     *
     * @throws Exception If failed.
     */
    @Test
    @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "10")
    public void testContainsStarvationQryPoolInLog() throws Exception {
        checkStarvationQryPoolInLog(
            10_000,
            "Possible thread pool starvation detected (no task completed in last 10ms, is query thread pool size " +
                "large enough?)",
            true
        );
    }

    /**
     * Test to verify that there are no query pool starvation messages in log.
     *
     * @throws Exception If failed.
     */
    @Test
    @WithSystemProperty(key = IGNITE_STARVATION_CHECK_INTERVAL, value = "0")
    public void testNotContainsStarvationQryPoolInLog() throws Exception {
        checkStarvationQryPoolInLog(
            1_000,
            "Possible thread pool starvation detected (no task completed in",
            false
        );
    }

    /**
     * Check messages about starvation query pool in log.
     * <p>
     * Starts a server and a client with a query pool of size 1, then runs two concurrent scan queries
     * whose filter sleeps, and waits for the given message to show up (or not) in the log.
     *
     * @param checkTimeout Check timeout, in milliseconds.
     * @param findLogMsg Log message of interest.
     * @param contains Expect whether or not messages are in log.
     * @throws Exception If failed.
     */
    private void checkStarvationQryPoolInLog(long checkTimeout, String findLogMsg, boolean contains) throws Exception {
        assertNotNull(findLogMsg);

        // Must be set before the nodes are started: it is read in getConfiguration().
        qryPoolSize = 1;

        startGrid("server");

        IgniteEx clientNode = startGrid("client");

        IgniteCache<Integer, Integer> cache = clientNode.cache(CACHE_NAME);

        cache.put(0, 0);

        int qrySize = 2;

        // Lets both queries start at the same moment.
        CyclicBarrier barrier = new CyclicBarrier(qrySize);

        LogListener logLsnr = LogListener.matches(findLogMsg).build();

        testLog.registerListener(logLsnr);

        // The returned futures are not awaited: the test only observes the log.
        for (int i = 0; i < qrySize; i++) {
            runAsync(() -> {
                barrier.await();

                cache.query(new ScanQuery<>((o, o2) -> {
                        doSleep(500);

                        return true;
                    })
                ).getAll();

                return null;
            });
        }

        if (contains)
            assertTrue(waitForCondition(logLsnr::check, checkTimeout));
        else
            assertFalse(waitForCondition(logLsnr::check, checkTimeout));
    }

    /**
     * Custom SQL function that exposes {@link GridIoManager#currentPolicy()} as seen from inside the query executor.
     *
     * @return Current IO policy.
     */
    @SuppressWarnings("unused")
    @QuerySqlFunction(alias = "currentPolicy")
    public static Byte currentPolicy() {
        return GridIoManager.currentPolicy();
    }

    /**
     * Indexing SPI implementation for test: remembers every stored key-value pair and answers any query
     * depending on whether the key {@link GridIoPolicy#QUERY_POOL} has been stored.
     */
    private static class TestIndexingSpi extends IgniteSpiAdapter implements IndexingSpi {
        /** Index. */
        private final SortedMap<Object, Object> idx = new TreeMap<>();

        /** {@inheritDoc} */
        @Override public void spiStart(@Nullable String gridName) {
            // No-op.
        }

        /** {@inheritDoc} */
        @Override public void spiStop() {
            // No-op.
        }

        /**
         * Ignores all arguments.
         *
         * @return Iterator over a single entry {@code (QUERY_POOL, QUERY_POOL)} if the index contains the key
         *      {@link GridIoPolicy#QUERY_POOL}, empty iterator otherwise.
         */
        @Override public Iterator<Cache.Entry<?, ?>> query(@Nullable String cacheName, Collection<Object> params,
            @Nullable IndexingQueryFilter filters) {
            return idx.containsKey(GridIoPolicy.QUERY_POOL) ?
                Collections.<Cache.Entry<?, ?>>singletonList(
                    new CacheEntryImpl<>(GridIoPolicy.QUERY_POOL, GridIoPolicy.QUERY_POOL)).iterator()
                : Collections.<Cache.Entry<?, ?>>emptyList().iterator();
        }

        /** {@inheritDoc} */
        @Override public void store(@Nullable String cacheName, Object key, Object val, long expirationTime) {
            idx.put(key, val);
        }

        /** {@inheritDoc} */
        @Override public void remove(@Nullable String cacheName, Object key) {
            // No-op.
        }
    }
}