/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.ignite.internal.processors.query;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.TextQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* Check query history metrics from a server node.
*/
public class SqlQueryHistorySelfTest extends GridCommonAbstractTest {
/** */
private static final int QUERY_HISTORY_SIZE = 3;
/** */
private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startTestGrid();
IgniteCache<Integer, String> cacheA = grid(0).cache("A");
IgniteCache<Integer, String> cacheB = grid(0).cache("B");
for (int i = 0; i < 100; i++) {
cacheA.put(i, String.valueOf(i));
cacheB.put(i, String.valueOf(i));
}
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
stopAllGrids();
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
((IgniteH2Indexing)queryNode().context().query().getIndexing()).runningQueryManager()
.resetQueryHistoryMetrics();
}
/**
* @param cacheName Cache name.
* @return Cache configuration.
*/
private CacheConfiguration<Integer, String> configureCache(String cacheName) {
CacheConfiguration<Integer, String> ccfg = defaultCacheConfiguration();
ccfg.setName(cacheName);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setIndexedTypes(Integer.class, String.class);
ccfg.setSqlFunctionClasses(Functions.class);
return ccfg;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(ipFinder);
cfg.setDiscoverySpi(disco);
cfg.setCacheConfiguration(configureCache("A"), configureCache("B"));
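// Limit the query history size so that eviction can be verified with just a few queries.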
cfg.setSqlQueryHistorySize(QUERY_HISTORY_SIZE);
return cfg;
}
/**
* Test metrics for JDBC.
*
* @throws Exception In case of error.
*/
@Test
public void testJdbcSelectQueryHistory() throws Exception {
String qry = "select * from A.String";
checkQueryMetrics(qry);
}
/**
* Test metrics for JDBC in case the result set is not fully read.
*
* @throws Exception In case of error.
*/
@Test
public void testJdbcSelectNotFullyFetchedQueryHistory() throws Exception {
String qry = "select * from A.String";
try (Connection conn = GridTestUtils.connect(queryNode(), null); Statement stmt = conn.createStatement()) {
stmt.setFetchSize(1);
ResultSet rs = stmt.executeQuery(qry);
assertTrue(rs.next());
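// The result set is not fully read, so the query should not be recorded in the history yet.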
checkMetrics(0, 0, 0, 0, true);
}
}
/**
* Test metrics for failed JDBC queries.
*/
@Test
public void testJdbcQueryHistoryFailed() {
try (Connection conn = GridTestUtils.connect(queryNode(), null); Statement stmt = conn.createStatement()) {
stmt.executeQuery("select * from A.String where A.fail()=1");
fail("Query should be failed.");
}
catch (Exception ignore) {
// No-op.
}
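// The failed query should be recorded with one execution and one failure.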
checkMetrics(1, 0, 1, 1, true);
}
/**
* Test metrics for JDBC in case of DDL and DML queries.
*
* @throws Exception In case of error.
*/
@Test
public void testJdbcQueryHistoryForDmlAndDdl() throws Exception {
List<String> cmds = Arrays.asList(
"create table TST(id int PRIMARY KEY, name varchar)",
"insert into TST(id) values(1)",
"commit"
);
try (Connection conn = GridTestUtils.connect(queryNode(), null); Statement stmt = conn.createStatement()) {
for (String cmd : cmds)
stmt.execute(cmd);
}
checkSeriesCommand(cmds);
}
/**
* @param cmds List of SQL commands.
* @throws IgniteInterruptedCheckedException In case of failure.
*/
private void checkSeriesCommand(List<String> cmds) throws IgniteInterruptedCheckedException {
waitingFor("size", QUERY_HISTORY_SIZE);
for (int i = 0; i < QUERY_HISTORY_SIZE; i++)
checkMetrics(QUERY_HISTORY_SIZE, i, 1, 0, false);
// Check that the collected metrics contain the correct entries: metrics for the last N queries.
Collection<QueryHistory> metrics = ((IgniteH2Indexing)queryNode().context().query().getIndexing())
.runningQueryManager().queryHistoryMetrics().values();
assertEquals(QUERY_HISTORY_SIZE, metrics.size());
Set<String> qries = metrics.stream().map(QueryHistory::query).collect(Collectors.toSet());
for (int i = 0; i < cmds.size(); i++)
assertTrue(qries.contains(cmds.get(QUERY_HISTORY_SIZE - 1 - i)));
}
/**
* Test metrics for SQL fields queries.
*/
@Test
public void testSqlFieldsQueryHistory() {
SqlFieldsQuery qry = new SqlFieldsQuery("select * from String");
checkQueryMetrics(qry);
}
/**
* Test metrics for SQL fields queries.
*
* @throws Exception In case of error.
*/
@Test
public void testSqlFieldsQueryHistoryNotFullyFetched() throws Exception {
SqlFieldsQuery qry = new SqlFieldsQuery("select * from String");
qry.setPageSize(10);
checkQueryNotFullyFetchedMetrics(qry, false);
}
/**
* Test metrics for failed SQL queries.
*/
@Test
public void testSqlFieldsQueryHistoryFailed() {
SqlFieldsQuery qry = new SqlFieldsQuery("select * from String where fail()=1");
checkQueryFailedMetrics(qry);
}
/**
* Test metrics for DML and DDL queries.
*
* @throws Exception In case of error.
*/
@Test
public void testQueryHistoryForDmlAndDdl() throws Exception {
IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
List<String> cmds = Arrays.asList(
"create table TST(id int PRIMARY KEY, name varchar)",
"insert into TST(id) values(1)",
"commit"
);
cmds.forEach((cmd) ->
cache.query(new SqlFieldsQuery(cmd)).getAll()
);
checkSeriesCommand(cmds);
}
/**
* Test metrics eviction.
*
* @throws Exception In case of error.
*/
@Test
public void testQueryHistoryEviction() throws Exception {
IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
cache.query(new SqlFieldsQuery("select * from String")).getAll();
cache.query(new SqlFieldsQuery("select count(*) from String")).getAll();
cache.query(new SqlFieldsQuery("select * from String limit 1")).getAll();
cache.query(new SqlFieldsQuery("select * from String limit 2")).getAll();
cache.query(new SqlQuery<>("String", "from String")).getAll();
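// Five distinct queries were executed, but the history keeps only QUERY_HISTORY_SIZE entries, so the two oldest ones should be evicted.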
waitingFor("size", QUERY_HISTORY_SIZE);
for (int i = 0; i < QUERY_HISTORY_SIZE; i++)
checkMetrics(QUERY_HISTORY_SIZE, i, 1, 0, false);
// Check that the collected metrics contain the correct entries: metrics for the last N queries.
Collection<QueryHistory> metrics = ((IgniteH2Indexing)queryNode().context().query().getIndexing())
.runningQueryManager().queryHistoryMetrics().values();
assertEquals(QUERY_HISTORY_SIZE, metrics.size());
Set<String> qries = metrics.stream().map(QueryHistory::query).collect(Collectors.toSet());
assertTrue(qries.contains("SELECT \"A\".\"STRING\"._KEY, \"A\".\"STRING\"._VAL from String"));
assertTrue(qries.contains("select * from String limit 2"));
assertTrue(qries.contains("select * from String limit 1"));
}
/**
* Test metrics when queries are executed from several threads.
*
* @throws Exception In case of error.
*/
@Test
public void testQueryHistoryMultithreaded() throws Exception {
IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
Collection<Worker> workers = new ArrayList<>();
int repeat = 10;
for (int k = 0; k < repeat; k++) {
// Execute as many distinct queries as the history size to avoid eviction.
for (int i = 1; i <= QUERY_HISTORY_SIZE; i++)
workers.add(new Worker(cache, new SqlFieldsQuery("select * from String limit " + i)));
}
for (Worker worker : workers)
worker.start();
for (Worker worker : workers)
worker.join();
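// Each of the QUERY_HISTORY_SIZE distinct queries was executed 'repeat' times, so all of them should remain in the history.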
for (int i = 0; i < QUERY_HISTORY_SIZE; i++)
checkMetrics(QUERY_HISTORY_SIZE, i, repeat, 0, false);
}
/**
* Test metrics for Scan queries.
*/
@Test
public void testScanQueryHistory() {
ScanQuery<Integer, String> qry = new ScanQuery<>();
checkNoQueryMetrics(qry);
}
/**
* Test metrics for SQL queries.
*/
@Test
public void testSqlQueryHistory() {
SqlQuery<Integer, String> qry = new SqlQuery<>("String", "from String");
checkQueryMetrics(qry);
}
/**
* Test metrics for SQL queries.
*
* @throws Exception In case of error.
*/
@Test
public void testSqlQueryHistoryNotFullyFetched() throws Exception {
SqlQuery<Integer, String> qry = new SqlQuery<>("String", "from String");
qry.setPageSize(10);
checkQueryNotFullyFetchedMetrics(qry, true);
}
/**
* Test metrics for Text queries.
*/
@Test
public void testTextQueryMetrics() {
TextQuery qry = new TextQuery<>("String", "1");
checkNoQueryMetrics(qry);
}
/**
* Test metrics for Text queries.
*
* @throws Exception In case of error.
*/
@Test
public void testTextQueryHistoryNotFullyFetched() throws Exception {
TextQuery qry = new TextQuery<>("String", "1");
qry.setPageSize(10);
checkQueryNotFullyFetchedMetrics(qry, true);
}
/**
* Test metrics for SQL cross cache queries.
*/
@Test
public void testSqlFieldsCrossCacheQueryHistory() {
SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".String");
checkQueryMetrics(qry);
}
/**
* Test metrics for SQL cross cache queries.
*
* @throws Exception In case of error.
*/
@Test
public void testSqlFieldsQueryHistoryCrossCacheQueryNotFullyFetched() throws Exception {
SqlFieldsQuery qry = new SqlFieldsQuery("select * from \"B\".String");
qry.setPageSize(10);
checkQueryNotFullyFetchedMetrics(qry, false);
}
/**
* Check metrics.
*
* @param sz Expected size of metrics.
* @param idx Index of metrics to check.
* @param execs Expected number of executions.
* @param failures Expected number of failures.
* @param first {@code true} if metrics are checked after the first execution of the query.
*/
private void checkMetrics(int sz, int idx, int execs, int failures,
boolean first) {
Collection<QueryHistory> metrics = ((IgniteH2Indexing)queryNode().context().query().getIndexing())
.runningQueryManager().queryHistoryMetrics().values();
assertNotNull(metrics);
assertEquals(sz, metrics.size());
if (sz == 0)
return;
QueryHistory m = new ArrayList<>(metrics).get(idx);
info("Metrics: " + m);
assertEquals("Executions", execs, m.executions());
assertEquals("Failures", failures, m.failures());
assertTrue(m.maximumTime() >= 0);
assertTrue(m.minimumTime() >= 0);
assertTrue(m.lastStartTime() > 0 && m.lastStartTime() <= System.currentTimeMillis());
if (first)
assertEquals("On first execution minTime == maxTime", m.minimumTime(), m.maximumTime());
}
/**
* @param qry Query.
* @throws SQLException In case of failure.
*/
private void checkQueryMetrics(String qry) throws SQLException {
runJdbcQuery(qry);
checkMetrics(1, 0, 1, 0, true);
// Execute again with the same parameters.
runJdbcQuery(qry);
checkMetrics(1, 0, 2, 0, false);
}
/**
* @param qry SQL query.
* @throws SQLException In case of failure.
*/
private void runJdbcQuery(String qry) throws SQLException {
try (Connection conn = GridTestUtils.connect(queryNode(), null); Statement stmt = conn.createStatement()) {
stmt.execute(qry);
}
}
/**
* @param qry Query.
*/
private void checkQueryMetrics(Query qry) {
IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
// Execute query.
cache.query(qry).getAll();
checkMetrics(1, 0, 1, 0, true);
// Execute again with the same parameters.
cache.query(qry).getAll();
checkMetrics(1, 0, 2, 0, false);
}
/**
* @param qry Query.
*/
private void checkNoQueryMetrics(Query qry) {
IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
// Execute query.
cache.query(qry).getAll();
checkMetrics(0, 0, 0, 0, true);
// Execute again with the same parameters.
cache.query(qry).getAll();
checkMetrics(0, 0, 0, 0, false);
}
/**
* @param qry Query.
* @param waitingForCompletion Whether to wait for query completion.
* @throws IgniteInterruptedCheckedException In case of failure.
*/
private void checkQueryNotFullyFetchedMetrics(Query qry, boolean waitingForCompletion)
throws IgniteInterruptedCheckedException {
IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
// Execute query.
cache.query(qry).iterator().next();
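// The query is not fully fetched, so it should not be present in the query history.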
if (waitingForCompletion)
waitingFor("executions", 1);
checkMetrics(0, 0, 0, 0, true);
// Execute again with the same parameters.
cache.query(qry).iterator().next();
if (waitingForCompletion)
waitingFor("executions", 2);
checkMetrics(0, 0, 0, 0, false);
}
/**
* @param qry Query.
*/
private void checkQueryFailedMetrics(Query qry) {
IgniteCache<Integer, String> cache = queryNode().context().cache().jcache("A");
try {
// Execute invalid query.
cache.query(qry).getAll();
}
catch (Exception ignored) {
// No-op.
}
checkMetrics(1, 0, 1, 1, true);
try {
// Execute invalid query again with the same parameters.
cache.query(qry).getAll();
}
catch (Exception ignored) {
// No-op.
}
checkMetrics(1, 0, 2, 2, false);
}
/**
* @param cond Condition to check.
* @param exp Expected value.
*/
private void waitingFor(final String cond, final int exp) throws IgniteInterruptedCheckedException {
GridTestUtils.waitForCondition(() -> {
Collection<QueryHistory> metrics = ((IgniteH2Indexing)queryNode().context().query().getIndexing())
.runningQueryManager().queryHistoryMetrics().values();
switch (cond) {
case "size":
return metrics.size() == exp;
case "executions":
int executions = 0;
for (QueryHistory m : metrics)
executions += m.executions();
return executions == exp;
default:
return true;
}
}, 2000);
}
/**
* @return Ignite instance for querying.
*/
protected IgniteEx queryNode() {
IgniteEx node = grid(0);
assertFalse(node.context().clientNode());
return node;
}
/**
* @throws Exception In case of failure.
*/
protected void startTestGrid() throws Exception {
startGrids(2);
}
/**
* SQL functions used in tests.
*/
public static class Functions {
/**
* SQL function that always throws an exception.
*/
@QuerySqlFunction
public static int fail() {
throw new IgniteSQLException("SQL function fail for test purposes");
}
}
/**
* Worker thread that executes a single query against the given cache.
*/
private static class Worker extends Thread {
/** Cache. */
private final IgniteCache cache;
/** Query. */
private final Query qry;
/**
* @param cache Cache.
* @param qry Query.
*/
Worker(IgniteCache cache, Query qry) {
this.cache = cache;
this.qry = qry;
}
/** {@inheritDoc} */
@Override public void run() {
cache.query(qry).getAll();
}
}
}