blob: e107a97c098cf184f8d2822a07e1b891bea16303 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.nio.file.Path;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.configuration.Factory;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cache.eviction.EvictableEntry;
import org.apache.ignite.cache.eviction.EvictionFilter;
import org.apache.ignite.cache.eviction.EvictionPolicy;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TopologyValidator;
import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.managers.discovery.ClusterMetricsImpl;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.util.lang.GridNodePredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.metric.sql.SqlViewMetricExporterSpi;
import org.apache.ignite.spi.systemview.view.SqlTableView;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Assert;
import org.junit.Test;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toSet;
import static org.junit.Assert.assertNotEquals;
/**
* Tests for ignite SQL system views.
*/
public class SqlSystemViewsSelfTest extends AbstractIndexingCommonTest {
/** Metrics check attempts. */
private static final int METRICS_CHECK_ATTEMPTS = 10;
/** */
private boolean isPersistenceEnabled;
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
    super.beforeTest();

    // Start every test from a clean slate: remove persistence files possibly left by a previous test.
    cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
    stopAllGrids();

    cleanPersistenceDir();

    // Reset per-test flags so the next test starts with the default (non-persistent) configuration.
    isPersistenceEnabled = false;

    // Clear the custom indexing class that a test may have installed (see index rebuild test).
    GridQueryProcessor.idxCls = null;
}
/** @return System schema name. */
protected String systemSchemaName() {
    // Schema that exposes the SQL system views; subclasses may override to test a different schema name.
    return "SYS";
}
/**
* @param ignite Ignite.
* @param sql Sql.
* @param args Args.
*/
@SuppressWarnings("unchecked")
private List<List<?>> execSql(Ignite ignite, String sql, Object... args) {
    // Build the query first, attaching positional arguments only when any were supplied.
    SqlFieldsQuery qry = new SqlFieldsQuery(sql);

    if (!F.isEmpty(args))
        qry.setArgs(args);

    // Any cache serves as an entry point for SQL execution; use the default one.
    IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME);

    return cache.query(qry).getAll();
}
/**
* @param sql Sql.
* @param args Args.
*/
private List<List<?>> execSql(String sql, Object... args) {
    // Convenience overload: run the query on the default test grid.
    return execSql(grid(), sql, args);
}
/**
* @param sql Sql.
*/
private void assertSqlError(final String sql) {
    // The statement must fail with IgniteSQLException somewhere in the cause chain.
    Throwable t = GridTestUtils.assertThrowsWithCause((Callable<Void>)() -> {
        execSql(sql);

        return null;
    }, IgniteSQLException.class);

    IgniteSQLException sqlE = X.cause(t, IgniteSQLException.class);

    assert sqlE != null;

    // System views are read-only, so the expected error code is UNSUPPORTED_OPERATION.
    assertEquals(IgniteQueryErrorCode.UNSUPPORTED_OPERATION, sqlE.statusCode());
}
/**
* Test system views modifications.
*/
@Test
public void testModifications() throws Exception {
    startGrid(getConfiguration());

    String nodesTbl = systemSchemaName() + ".NODES";

    // All DDL statements against a system view must be rejected.
    assertSqlError("DROP TABLE " + nodesTbl);
    assertSqlError("TRUNCATE TABLE " + nodesTbl);
    assertSqlError("ALTER TABLE " + nodesTbl + " RENAME TO " + systemSchemaName() + ".N");
    assertSqlError("ALTER TABLE " + nodesTbl + " ADD COLUMN C VARCHAR");
    assertSqlError("ALTER TABLE " + nodesTbl + " DROP COLUMN NODE_ID");
    assertSqlError("ALTER TABLE " + nodesTbl + " RENAME COLUMN NODE_ID TO C");
    assertSqlError("CREATE INDEX IDX ON " + nodesTbl + "(NODE_ID)");

    // All DML statements must be rejected as well.
    assertSqlError("INSERT INTO " + nodesTbl + " (NODE_ID) VALUES ('-')");
    assertSqlError("UPDATE " + nodesTbl + " SET NODE_ID = '-'");
    assertSqlError("DELETE " + nodesTbl);
}
/**
* Test schemas system view.
* @throws Exception in case of failure.
*/
@Test
public void testSchemasView() throws Exception {
    IgniteEx srv = startGrid(getConfiguration().setSqlSchemas("PREDIFINED_SCHEMA_1"));

    IgniteEx client =
        startClientGrid(getConfiguration().setIgniteInstanceName("CLIENT").setSqlSchemas("PREDIFINED_SCHEMA_2"));

    srv.createCache(cacheConfiguration("TST1"));

    String schemasSql = "SELECT * FROM " + systemSchemaName() + ".SCHEMAS";

    List<List<?>> srvNodeSchemas = execSql(schemasSql);

    List<List<?>> clientNodeSchemas = execSql(client, schemasSql);

    // Parameterized sets (instead of raw types) so the comparison below is type-checked.
    // Server must see its own predefined schema, the cache schema and the built-in schemas.
    Set<String> expSchemasSrv = Sets.newHashSet("PREDIFINED_SCHEMA_1", "PUBLIC", "TST1", systemSchemaName());

    Set<String> schemasSrv = srvNodeSchemas.stream().map(f -> f.get(0)).map(String.class::cast).collect(toSet());

    Assert.assertEquals(expSchemasSrv, schemasSrv);

    // Client sees its own predefined schema instead of the server's one.
    Set<String> expSchemasCli = Sets.newHashSet("PREDIFINED_SCHEMA_2", "PUBLIC", "TST1", systemSchemaName());

    Set<String> schemasCli = clientNodeSchemas.stream().map(f -> f.get(0)).map(String.class::cast).collect(toSet());

    Assert.assertEquals(expSchemasCli, schemasCli);
}
/**
* Test indexes system view.
*
* @throws Exception in case of failure.
*/
@Test
public void testIndexesView() throws Exception {
    IgniteEx srv = startGrid(getConfiguration());

    IgniteEx client = startClientGrid(getConfiguration().setIgniteInstanceName("CLIENT"));

    srv.createCache(cacheConfiguration("TST1"));

    // Tables covering the interesting index layouts: custom affinity key, SQL-created indexes
    // with explicit inline size, composite PKs and multi-column secondary indexes.
    execSql("CREATE TABLE PUBLIC.AFF_CACHE (ID1 INT, ID2 INT, MY_VAL VARCHAR, PRIMARY KEY (ID1 DESC, ID2)) WITH \"affinity_key=ID2\"");

    execSql("CREATE TABLE CACHE_SQL (ID INT PRIMARY KEY, MY_VAL VARCHAR)");

    execSql("CREATE INDEX IDX_2 ON CACHE_SQL(ID DESC) INLINE_SIZE 13");

    execSql("CREATE TABLE PUBLIC.DFLT_CACHE (ID1 INT, ID2 INT, MY_VAL VARCHAR, PRIMARY KEY (ID1 DESC, ID2))");

    execSql("CREATE INDEX IDX_1 ON PUBLIC.DFLT_CACHE(ID2 DESC, ID1, MY_VAL DESC)");

    execSql("CREATE INDEX IDX_3 ON PUBLIC.DFLT_CACHE(MY_VAL)");

    execSql("CREATE TABLE PUBLIC.DFLT_AFF_CACHE (ID1 INT, ID2 INT, MY_VAL VARCHAR, PRIMARY KEY (ID1 DESC, ID2)) WITH \"affinity_key=ID1\"");

    execSql("CREATE INDEX IDX_AFF_1 ON PUBLIC.DFLT_AFF_CACHE(ID2 DESC, ID1, MY_VAL DESC)");

    String idxSql = "SELECT " +
        " CACHE_ID," +
        " CACHE_NAME," +
        " SCHEMA_NAME," +
        " TABLE_NAME," +
        " INDEX_NAME," +
        " INDEX_TYPE," +
        " COLUMNS," +
        " IS_PK," +
        " IS_UNIQUE," +
        " INLINE_SIZE" +
        " FROM " + systemSchemaName() + ".INDEXES ORDER BY TABLE_NAME, INDEX_NAME";

    List<List<?>> srvNodeIndexes = execSql(srv, idxSql);

    List<List<?>> clientNodeNodeIndexes = execSql(client, idxSql);

    // Client and server views of the INDEXES system view must agree.
    assertTrue(srvNodeIndexes.containsAll(clientNodeNodeIndexes));

    //ToDo: As of now we can see duplicates columns within index due to https://issues.apache.org/jira/browse/IGNITE-11125
    String[][] expectedResults = {
        {"-825022849", "SQL_PUBLIC_AFF_CACHE", "PUBLIC", "AFF_CACHE", "AFFINITY_KEY", "BTREE", "\"ID2\" ASC, \"ID1\" ASC", "false", "false", "10"},
        {"-825022849", "SQL_PUBLIC_AFF_CACHE", "PUBLIC", "AFF_CACHE", "_key_PK", "BTREE", "\"ID1\" ASC, \"ID2\" ASC", "true", "true", "10"},
        {"-825022849", "SQL_PUBLIC_AFF_CACHE", "PUBLIC", "AFF_CACHE", "_key_PK__SCAN_", "SCAN", "null", "false", "false", "0"},
        {"-825022849", "SQL_PUBLIC_AFF_CACHE", "PUBLIC", "AFF_CACHE", "_key_PK_hash", "HASH", "\"ID1\" ASC, \"ID2\" ASC, \"ID2\" ASC", "true", "true", "0"},
        {"707660652", "SQL_PUBLIC_CACHE_SQL", "PUBLIC", "CACHE_SQL", "IDX_2", "BTREE", "\"ID\" DESC, \"ID\" ASC", "false", "false", "13"},
        {"707660652", "SQL_PUBLIC_CACHE_SQL", "PUBLIC", "CACHE_SQL", "IDX_2_proxy", "BTREE", "\"ID\" DESC, \"ID\" ASC", "false", "false", "0"},
        {"707660652", "SQL_PUBLIC_CACHE_SQL", "PUBLIC", "CACHE_SQL", "_key_PK", "BTREE", "\"ID\" ASC", "true", "true", "5"},
        {"707660652", "SQL_PUBLIC_CACHE_SQL", "PUBLIC", "CACHE_SQL", "_key_PK__SCAN_", "SCAN", "null", "false", "false", "0"},
        {"707660652", "SQL_PUBLIC_CACHE_SQL", "PUBLIC", "CACHE_SQL", "_key_PK_hash", "HASH", "\"ID\" ASC", "true", "true", "0"},
        {"707660652", "SQL_PUBLIC_CACHE_SQL", "PUBLIC", "CACHE_SQL", "_key_PK_proxy", "BTREE", "\"ID\" ASC", "false", "false", "0"},
        {"1374144180", "SQL_PUBLIC_DFLT_AFF_CACHE", "PUBLIC", "DFLT_AFF_CACHE", "AFFINITY_KEY", "BTREE", "\"ID1\" ASC, \"ID2\" ASC", "false", "false", "10"},
        {"1374144180", "SQL_PUBLIC_DFLT_AFF_CACHE", "PUBLIC", "DFLT_AFF_CACHE", "IDX_AFF_1", "BTREE", "\"ID2\" DESC, \"ID1\" ASC, \"MY_VAL\" DESC", "false", "false", "10"},
        {"1374144180", "SQL_PUBLIC_DFLT_AFF_CACHE", "PUBLIC", "DFLT_AFF_CACHE", "_key_PK", "BTREE", "\"ID1\" ASC, \"ID2\" ASC", "true", "true", "10"},
        {"1374144180", "SQL_PUBLIC_DFLT_AFF_CACHE", "PUBLIC", "DFLT_AFF_CACHE", "_key_PK__SCAN_", "SCAN", "null", "false", "false", "0"},
        {"1374144180", "SQL_PUBLIC_DFLT_AFF_CACHE", "PUBLIC", "DFLT_AFF_CACHE", "_key_PK_hash", "HASH", "\"ID1\" ASC, \"ID2\" ASC, \"ID1\" ASC", "true", "true", "0"},
        {"1102275506", "SQL_PUBLIC_DFLT_CACHE", "PUBLIC", "DFLT_CACHE", "IDX_1", "BTREE", "\"ID2\" DESC, \"ID1\" ASC, \"MY_VAL\" DESC, \"ID1\" ASC, \"ID2\" ASC", "false", "false", "10"},
        {"1102275506", "SQL_PUBLIC_DFLT_CACHE", "PUBLIC", "DFLT_CACHE", "IDX_3", "BTREE", "\"MY_VAL\" ASC, \"ID1\" ASC, \"ID2\" ASC, \"ID1\" ASC, \"ID2\" ASC", "false", "false", "10"},
        {"1102275506", "SQL_PUBLIC_DFLT_CACHE", "PUBLIC", "DFLT_CACHE", "_key_PK", "BTREE", "\"ID1\" ASC, \"ID2\" ASC", "true", "true", "10"},
        {"1102275506", "SQL_PUBLIC_DFLT_CACHE", "PUBLIC", "DFLT_CACHE", "_key_PK__SCAN_", "SCAN", "null", "false", "false", "0"},
        {"1102275506", "SQL_PUBLIC_DFLT_CACHE", "PUBLIC", "DFLT_CACHE", "_key_PK_hash", "HASH", "\"ID1\" ASC, \"ID2\" ASC", "true", "true", "0"},
        {"2584860", "TST1", "TST1", "VALUECLASS", "TST1_INDEX", "BTREE", "\"KEY\" ASC, \"_KEY\" ASC", "false", "false", "10"},
        {"2584860", "TST1", "TST1", "VALUECLASS", "TST1_INDEX_proxy", "BTREE", "\"_KEY\" ASC, \"KEY\" ASC", "false", "false", "0"},
        {"2584860", "TST1", "TST1", "VALUECLASS", "_key_PK", "BTREE", "\"_KEY\" ASC", "true", "true", "10"},
        {"2584860", "TST1", "TST1", "VALUECLASS", "_key_PK__SCAN_", "SCAN", "null", "false", "false", "0"},
        {"2584860", "TST1", "TST1", "VALUECLASS", "_key_PK_hash", "HASH", "\"_KEY\" ASC", "true", "true", "0"},
        {"2584860", "TST1", "TST1", "VALUECLASS", "_key_PK_proxy", "BTREE", "\"KEY\" ASC", "false", "false", "0"}
    };

    // Fail fast on a row-count mismatch: the original loop bound only by the actual result size
    // silently ignored missing rows and threw ArrayIndexOutOfBoundsException on extra ones.
    assertEquals(expectedResults.length, srvNodeIndexes.size());

    for (int i = 0; i < srvNodeIndexes.size(); i++) {
        List<?> resRow = srvNodeIndexes.get(i);

        String[] expRow = expectedResults[i];

        assertEquals(expRow.length, resRow.size());

        // Compare cell-by-cell via string form so numeric/boolean columns match the literals above.
        for (int j = 0; j < expRow.length; j++)
            assertEquals(expRow[j], String.valueOf(resRow.get(j)));
    }
}
/**
* Tests {@link SqlTableView#isIndexRebuildInProgress()}.
*
* @throws Exception If failed.
*/
@Test
public void testTableViewDuringRebuilding() throws Exception {
    // Persistence is required so index files exist on disk and can be deleted before restart.
    isPersistenceEnabled = true;

    IgniteEx srv = startGrid(getConfiguration());

    srv.cluster().active(true);

    String cacheName1 = "CACHE_1";
    String cacheSqlName1 = "SQL_PUBLIC_" + cacheName1;

    String cacheName2 = "CACHE_2";
    String cacheSqlName2 = "SQL_PUBLIC_" + cacheName2;

    // Two SQL tables, each with a secondary index, so rebuild progress can be tracked per cache.
    execSql("CREATE TABLE " + cacheName1 + " (ID1 INT PRIMARY KEY, MY_VAL VARCHAR)");
    execSql("CREATE INDEX IDX_1 ON "+ cacheName1 + " (MY_VAL DESC)");

    execSql("CREATE TABLE " + cacheName2 + " (ID INT PRIMARY KEY, MY_VAL VARCHAR)");
    execSql("CREATE INDEX IDX_2 ON " + cacheName2 + " (ID DESC)");

    // Put data to create indexes.
    execSql("INSERT INTO " + cacheName1 + " VALUES(?, ?)", 1, "12345");
    execSql("INSERT INTO " + cacheName2 + " VALUES(?, ?)", 1, "12345");

    List<Path> idxPaths = getIndexBinPaths(cacheSqlName1);

    idxPaths.addAll(getIndexBinPaths(cacheSqlName2));

    // Stop the node and delete the index files so the restarted node is forced to rebuild indexes.
    stopAllGrids();

    idxPaths.forEach(idxPath -> assertTrue(U.delete(idxPath)));

    // BlockingIndexing keeps each cache's rebuild "in progress" until stopBlock() is called for it.
    GridQueryProcessor.idxCls = BlockingIndexing.class;

    srv = startGrid(getConfiguration());

    srv.cluster().active(true);

    // While indexing is blocked, both tables must report rebuild in progress.
    checkIndexRebuild(cacheName1, true);
    checkIndexRebuild(cacheName2, true);

    // Unblock the first cache and wait for its rebuild; the second must still be in progress.
    ((BlockingIndexing)srv.context().query().getIndexing()).stopBlock(cacheSqlName1);

    srv.cache(cacheSqlName1).indexReadyFuture().get(30_000);

    checkIndexRebuild(cacheName1, false);
    checkIndexRebuild(cacheName2, true);

    // Unblock the second cache as well; after that no table should report a rebuild.
    ((BlockingIndexing)srv.context().query().getIndexing()).stopBlock(cacheSqlName2);

    srv.cache(cacheSqlName2).indexReadyFuture().get(30_000);

    checkIndexRebuild(cacheName1, false);
    checkIndexRebuild(cacheName2, false);
}
/**
* Checks index rebuilding for given cache.
*
* @param cacheName Cache name.
* @param rebuild Is indexes rebuild in progress.
*/
private void checkIndexRebuild(String cacheName, boolean rebuild) {
    String idxSql = "SELECT IS_INDEX_REBUILD_IN_PROGRESS FROM " + systemSchemaName() + ".TABLES " +
        "WHERE TABLE_NAME = ?";

    List<List<?>> res = execSql(grid(), idxSql, cacheName);

    // The table must be visible in the TABLES view.
    assertFalse(res.isEmpty());

    // Every matching row must report exactly the expected rebuild state.
    for (List<?> row : res) {
        assertEquals(1, row.size());

        Boolean isIndexRebuildInProgress = (Boolean)row.get(0);

        assertTrue(isIndexRebuildInProgress == rebuild);
    }
}
/**
* @return Default cache configuration.
*/
protected CacheConfiguration<AbstractSchemaSelfTest.KeyClass, AbstractSchemaSelfTest.ValueClass> cacheConfiguration(String cacheName) throws Exception {
    // Parameterized configuration (instead of a raw type) so no unchecked conversion happens
    // when it is assigned to the generic return type.
    CacheConfiguration<AbstractSchemaSelfTest.KeyClass, AbstractSchemaSelfTest.ValueClass> ccfg =
        new CacheConfiguration<AbstractSchemaSelfTest.KeyClass, AbstractSchemaSelfTest.ValueClass>().setName(cacheName);

    QueryEntity entity = new QueryEntity();

    // Key/value are the schema-test helper classes; the whole key is also exposed as the "key" field.
    entity.setKeyType(AbstractSchemaSelfTest.KeyClass.class.getName());
    entity.setValueType(AbstractSchemaSelfTest.ValueClass.class.getName());

    entity.setKeyFieldName("key");
    entity.addQueryField("key", entity.getKeyType(), null);

    entity.addQueryField("id", Long.class.getName(), null);
    entity.addQueryField("field1", Long.class.getName(), null);

    entity.setKeyFields(Collections.singleton("id"));

    // Single sorted index on the key field, named "<cacheName>_index".
    entity.setIndexes(Collections.singletonList(
        new QueryIndex("key", true, cacheName + "_index")
    ));

    ccfg.setQueryEntities(Collections.singletonList(entity));

    return ccfg;
}
/**
* Test different query modes.
*/
@Test
public void testQueryModes() throws Exception {
    Ignite ignite = startGrid(0);
    startGrid(1);

    UUID nodeId = ignite.cluster().localNode().id();

    IgniteCache cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME);

    String sql = "SELECT NODE_ID FROM " + systemSchemaName() + ".NODES WHERE NODE_ORDER = 1";

    // The first node's id must be returned regardless of the query execution mode.
    for (SqlFieldsQuery qry : asList(
        new SqlFieldsQuery(sql).setDistributedJoins(true),
        new SqlFieldsQuery(sql).setReplicatedOnly(true),
        new SqlFieldsQuery(sql).setLocal(true)))
        assertEquals(nodeId, ((List<?>)cache.query(qry).getAll().get(0)).get(0));
}
/**
* Test Query history system view.
*/
@Test
@SuppressWarnings("unchecked")
public void testQueryHistoryMetricsModes() throws Exception {
    IgniteEx ignite = startGrid(0);

    final String SCHEMA_NAME = "TEST_SCHEMA";
    final long MAX_SLEEP = 500;
    final long MIN_SLEEP = 50;

    // Lower bound for the LAST_START_TIME check at the end of the test.
    long tsBeforeRun = System.currentTimeMillis();

    IgniteCache cache = ignite.createCache(
        new CacheConfiguration<>(DEFAULT_CACHE_NAME)
            .setIndexedTypes(Integer.class, String.class)
            .setSqlSchema(SCHEMA_NAME)
            .setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class)
    );

    cache.put(100, "200");

    // sleep_and_can_fail() is a test UDF controlled via SqlTestFunctions.sleepMs / .fail static fields.
    String sql = "SELECT \"STRING\"._KEY, \"STRING\"._VAL FROM \"STRING\" WHERE _key=100 AND sleep_and_can_fail()>0";

    // Run 1: fast successful execution (sets the minimum duration).
    GridTestUtils.SqlTestFunctions.sleepMs = MIN_SLEEP;
    GridTestUtils.SqlTestFunctions.fail = false;

    cache.query(new SqlFieldsQuery(sql).setSchema(SCHEMA_NAME)).getAll();

    // Run 2: slow successful execution (sets the maximum duration).
    GridTestUtils.SqlTestFunctions.sleepMs = MAX_SLEEP;
    GridTestUtils.SqlTestFunctions.fail = false;

    cache.query(new SqlFieldsQuery(sql).setSchema(SCHEMA_NAME)).getAll();

    // Run 3: failing execution (increments the FAILURES counter).
    GridTestUtils.SqlTestFunctions.sleepMs = MIN_SLEEP;
    GridTestUtils.SqlTestFunctions.fail = true;

    GridTestUtils.assertThrows(log,
        () ->
            cache.query(new SqlFieldsQuery(sql).setSchema(SCHEMA_NAME)).getAll()
        , CacheException.class,
        "Exception calling user-defined function");

    String sqlHist = "SELECT SCHEMA_NAME, SQL, LOCAL, EXECUTIONS, FAILURES, DURATION_MIN, DURATION_MAX, LAST_START_TIME " +
        "FROM " + systemSchemaName() + ".SQL_QUERIES_HISTORY ORDER BY LAST_START_TIME";

    // Execute the history query three times: the first two populate its own history entry
    // (EXECUTIONS = 2 at the moment the third run takes its snapshot).
    cache.query(new SqlFieldsQuery(sqlHist).setLocal(true)).getAll();
    cache.query(new SqlFieldsQuery(sqlHist).setLocal(true)).getAll();

    List<List<?>> res = cache.query(new SqlFieldsQuery(sqlHist).setLocal(true)).getAll();

    // Exactly two distinct entries: the UDF query and the history query itself.
    assertEquals(2, res.size());

    // Upper bound for the LAST_START_TIME check.
    long tsAfterRun = System.currentTimeMillis();

    List<?> firstRow = res.get(0);
    List<?> secondRow = res.get(1);

    //SCHEMA_NAME
    assertEquals(SCHEMA_NAME, firstRow.get(0));
    assertEquals(SCHEMA_NAME, secondRow.get(0));

    //SQL
    assertEquals(sql, firstRow.get(1));
    assertEquals(sqlHist, secondRow.get(1));

    // LOCAL flag
    assertEquals(false, firstRow.get(2));
    assertEquals(true, secondRow.get(2));

    // EXECUTIONS: UDF query ran 3 times (incl. the failed one), history query 2 times so far.
    assertEquals(3L, firstRow.get(3));
    assertEquals(2L, secondRow.get(3));

    //FAILURES
    assertEquals(1L, firstRow.get(4));
    assertEquals(0L, secondRow.get(4));

    //DURATION_MIN
    assertTrue((Long)firstRow.get(5) >= MIN_SLEEP);
    assertTrue((Long)firstRow.get(5) < (Long)firstRow.get(6));

    //DURATION_MAX
    assertTrue((Long)firstRow.get(6) >= MAX_SLEEP);

    //LAST_START_TIME must fall within the [tsBeforeRun, tsAfterRun] window.
    assertFalse(((Timestamp)firstRow.get(7)).before(new Timestamp(tsBeforeRun)));
    assertFalse(((Timestamp)firstRow.get(7)).after(new Timestamp(tsAfterRun)));
}
/**
* Test running queries system view.
*/
@Test
public void testRunningQueriesView() throws Exception {
    IgniteEx ignite = startGrid(0);

    IgniteCache cache = ignite.createCache(
        new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, String.class)
    );

    cache.put(100,"200");

    String sql = "SELECT SQL, QUERY_ID, SCHEMA_NAME, LOCAL, START_TIME, DURATION FROM " +
        systemSchemaName() + ".SQL_QUERIES";

    // A cursor that is left open stays registered in the SQL_QUERIES view until closed.
    FieldsQueryCursor notClosedFieldQryCursor = cache.query(new SqlFieldsQuery(sql).setLocal(true));

    List<?> cur = cache.query(new SqlFieldsQuery(sql).setLocal(true)).getAll();

    // Two running queries: the open cursor above plus this query itself.
    assertEquals(2, cur.size());

    List<?> res0 = (List<?>)cur.get(0);
    List<?> res1 = (List<?>)cur.get(1);

    // START_TIME must be close to "now" (within 3 seconds).
    Timestamp ts = (Timestamp)res0.get(4);

    Instant now = Instant.now();

    long diffInMillis = now.minusMillis(ts.getTime()).toEpochMilli();

    assertTrue(diffInMillis < 3000);

    assertEquals(sql, res0.get(0));

    assertEquals(sql, res1.get(0));

    assertTrue((Boolean)res0.get(3));

    // Query ids are "<nodeId>_<counter>" and must be unique per query.
    String id0 = (String)res0.get(1);
    String id1 = (String)res1.get(1);

    assertNotEquals(id0, id1);

    String qryPrefix = ignite.localNode().id() + "_";

    String qryId1 = qryPrefix + "1";
    String qryId2 = qryPrefix + "2";

    assertTrue(id0.equals(qryId1) || id1.equals(qryId1));

    assertTrue(id0.equals(qryId2) || id1.equals(qryId2));

    assertEquals(2, cache.query(new SqlFieldsQuery(sql)).getAll().size());

    // After closing the hanging cursor, only the querying query itself remains.
    notClosedFieldQryCursor.close();

    assertEquals(1, cache.query(new SqlFieldsQuery(sql)).getAll().size());

    cache.put(100,"200");

    QueryCursor notClosedQryCursor = cache.query(new SqlQuery<>(String.class, "_key=100"));

    // A SqlQuery is rewritten into this two-column fields query internally.
    String expSqlQry = "SELECT \"default\".\"STRING\"._KEY, \"default\".\"STRING\"._VAL FROM " +
        "\"default\".\"STRING\" WHERE _key=100";

    cur = cache.query(new SqlFieldsQuery(sql)).getAll();

    assertEquals(2, cur.size());

    res0 = (List<?>)cur.get(0);
    res1 = (List<?>)cur.get(1);

    assertTrue(expSqlQry, res0.get(0).equals(expSqlQry) || res1.get(0).equals(expSqlQry));

    assertFalse((Boolean)res0.get(3));

    assertFalse((Boolean)res1.get(3));

    notClosedQryCursor.close();

    // The SELECT below is the 7th query registered on this node, so it must find itself by id.
    sql = "SELECT SQL, QUERY_ID FROM " + systemSchemaName() + ".SQL_QUERIES WHERE QUERY_ID='" + qryPrefix + "7'";

    assertEquals(qryPrefix + "7", ((List<?>)cache.query(new SqlFieldsQuery(sql)).getAll().get(0)).get(1));

    // No query should have been running longer than 100 seconds.
    sql = "SELECT SQL FROM " + systemSchemaName() + ".SQL_QUERIES WHERE DURATION > 100000";

    assertTrue(cache.query(new SqlFieldsQuery(sql)).getAll().isEmpty());

    // Filtering by a non-existent query id must yield an empty result.
    sql = "SELECT SQL FROM " + systemSchemaName() + ".SQL_QUERIES WHERE QUERY_ID='UNKNOWN'";

    assertTrue(cache.query(new SqlFieldsQuery(sql)).getAll().isEmpty());
}
/**
* Test that we can't use cache tables and system views in the same query.
*/
@Test
public void testCacheToViewJoin() throws Exception {
    Ignite ignite = startGrid();

    // A simple cache with a single Integer -> String query entity.
    QueryEntity entity = new QueryEntity(Integer.class.getName(), String.class.getName());

    ignite.createCache(new CacheConfiguration<>().setName(DEFAULT_CACHE_NAME).setQueryEntities(
        Collections.singleton(entity)));

    // Mixing user cache tables and system views in one statement is prohibited.
    assertSqlError("SELECT * FROM \"" + DEFAULT_CACHE_NAME + "\".String JOIN " + systemSchemaName() + ".NODES ON 1=1");
}
/**
* @param rowData Row data.
* @param colTypes Column types.
*/
private void assertColumnTypes(List<?> rowData, Class<?>... colTypes) {
    int idx = 0;

    for (Class<?> expType : colTypes) {
        Object val = rowData.get(idx);

        // NULL cells carry no runtime type information, so only non-null values are checked.
        if (val != null)
            assertEquals("Column " + idx + " type", expType, val.getClass());

        idx++;
    }
}
/**
* Test nodes system view.
*
* @throws Exception If failed.
*/
@Test
public void testNodesViews() throws Exception {
Ignite igniteSrv = startGrid(getTestIgniteInstanceName(), getConfiguration().setMetricsUpdateFrequency(500L));
Ignite igniteCli =
startClientGrid(getTestIgniteInstanceName(1), getConfiguration().setMetricsUpdateFrequency(500L));
startGrid(getTestIgniteInstanceName(2), getConfiguration().setMetricsUpdateFrequency(500L).setDaemon(true));
UUID nodeId0 = igniteSrv.cluster().localNode().id();
awaitPartitionMapExchange();
List<List<?>> resAll = execSql("SELECT NODE_ID, CONSISTENT_ID, VERSION, IS_CLIENT, IS_DAEMON, " +
"NODE_ORDER, ADDRESSES, HOSTNAMES FROM " + systemSchemaName() + ".NODES");
assertColumnTypes(resAll.get(0), UUID.class, String.class, String.class, Boolean.class, Boolean.class,
Long.class, String.class, String.class);
assertEquals(3, resAll.size());
List<List<?>> resSrv = execSql(
"SELECT NODE_ID, NODE_ORDER FROM " +
systemSchemaName() + ".NODES WHERE IS_CLIENT = FALSE AND IS_DAEMON = FALSE"
);
assertEquals(1, resSrv.size());
assertEquals(nodeId0, resSrv.get(0).get(0));
assertEquals(1L, resSrv.get(0).get(1));
List<List<?>> resCli = execSql(
"SELECT NODE_ID, NODE_ORDER FROM " + systemSchemaName() + ".NODES WHERE IS_CLIENT = TRUE");
assertEquals(1, resCli.size());
assertEquals(nodeId(1), resCli.get(0).get(0));
assertEquals(2L, resCli.get(0).get(1));
List<List<?>> resDaemon = execSql(
"SELECT NODE_ID, NODE_ORDER FROM " + systemSchemaName() + ".NODES WHERE IS_DAEMON = TRUE");
assertEquals(1, resDaemon.size());
assertEquals(nodeId(2), resDaemon.get(0).get(0));
assertEquals(3L, resDaemon.get(0).get(1));
// Check index on ID column.
assertEquals(0, execSql("SELECT NODE_ID FROM " + systemSchemaName() + ".NODES WHERE NODE_ID = '-'").size());
assertEquals(1, execSql("SELECT NODE_ID FROM " + systemSchemaName() + ".NODES WHERE NODE_ID = ?",
nodeId0).size());
assertEquals(1, execSql("SELECT NODE_ID FROM " + systemSchemaName() + ".NODES WHERE NODE_ID = ?",
nodeId(2)).size());
// Check index on ID column with disjunction.
assertEquals(3, execSql("SELECT NODE_ID FROM " + systemSchemaName() + ".NODES WHERE NODE_ID = ? " +
"OR node_order=1 OR node_order=2 OR node_order=3", nodeId0).size());
// Check quick-count.
assertEquals(3L, execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".NODES").get(0).get(0));
// Check joins
assertEquals(nodeId0, execSql("SELECT N1.NODE_ID FROM " + systemSchemaName() + ".NODES N1 JOIN " +
systemSchemaName() + ".NODES N2 ON N1.NODE_ORDER = N2.NODE_ORDER JOIN " +
systemSchemaName() + ".NODES N3 ON N2.NODE_ID = N3.NODE_ID WHERE N3.NODE_ORDER = 1")
.get(0).get(0));
// Check sub-query
assertEquals(nodeId0, execSql("SELECT N1.NODE_ID FROM " + systemSchemaName() + ".NODES N1 " +
"WHERE NOT EXISTS (SELECT 1 FROM " + systemSchemaName() + ".NODES N2 WHERE N2.NODE_ID = N1.NODE_ID AND N2.NODE_ORDER <> 1)")
.get(0).get(0));
// Check node attributes view
String cliAttrName = IgniteNodeAttributes.ATTR_CLIENT_MODE;
assertColumnTypes(execSql("SELECT NODE_ID, NAME, VALUE FROM " + systemSchemaName() + ".NODE_ATTRIBUTES").get(0),
UUID.class, String.class, String.class);
assertEquals(1,
execSql("SELECT NODE_ID FROM " + systemSchemaName() + ".NODE_ATTRIBUTES WHERE NAME = ? AND VALUE = 'true'",
cliAttrName).size());
assertEquals(3,
execSql("SELECT NODE_ID FROM " + systemSchemaName() + ".NODE_ATTRIBUTES WHERE NAME = ?", cliAttrName).size());
assertEquals(1,
execSql("SELECT NODE_ID FROM " + systemSchemaName() + ".NODE_ATTRIBUTES WHERE NODE_ID = ? AND NAME = ? AND VALUE = 'true'",
nodeId(1), cliAttrName).size());
assertEquals(0,
execSql("SELECT NODE_ID FROM " + systemSchemaName() + ".NODE_ATTRIBUTES WHERE NODE_ID = '-' AND NAME = ?",
cliAttrName).size());
assertEquals(0,
execSql("SELECT NODE_ID FROM " + systemSchemaName() + ".NODE_ATTRIBUTES WHERE NODE_ID = ? AND NAME = '-'",
nodeId(1)).size());
// Check node metrics view.
String sqlAllMetrics = "SELECT NODE_ID, LAST_UPDATE_TIME, " +
"MAX_ACTIVE_JOBS, CUR_ACTIVE_JOBS, AVG_ACTIVE_JOBS, " +
"MAX_WAITING_JOBS, CUR_WAITING_JOBS, AVG_WAITING_JOBS, " +
"MAX_REJECTED_JOBS, CUR_REJECTED_JOBS, AVG_REJECTED_JOBS, TOTAL_REJECTED_JOBS, " +
"MAX_CANCELED_JOBS, CUR_CANCELED_JOBS, AVG_CANCELED_JOBS, TOTAL_CANCELED_JOBS, " +
"MAX_JOBS_WAIT_TIME, CUR_JOBS_WAIT_TIME, AVG_JOBS_WAIT_TIME, " +
"MAX_JOBS_EXECUTE_TIME, CUR_JOBS_EXECUTE_TIME, AVG_JOBS_EXECUTE_TIME, TOTAL_JOBS_EXECUTE_TIME, " +
"TOTAL_EXECUTED_JOBS, TOTAL_EXECUTED_TASKS, " +
"TOTAL_BUSY_TIME, TOTAL_IDLE_TIME, CUR_IDLE_TIME, BUSY_TIME_PERCENTAGE, IDLE_TIME_PERCENTAGE, " +
"TOTAL_CPU, CUR_CPU_LOAD, AVG_CPU_LOAD, CUR_GC_CPU_LOAD, " +
"HEAP_MEMORY_INIT, HEAP_MEMORY_USED, HEAP_MEMORY_COMMITED, HEAP_MEMORY_MAX, HEAP_MEMORY_TOTAL, " +
"NONHEAP_MEMORY_INIT, NONHEAP_MEMORY_USED, NONHEAP_MEMORY_COMMITED, NONHEAP_MEMORY_MAX, NONHEAP_MEMORY_TOTAL, " +
"UPTIME, JVM_START_TIME, NODE_START_TIME, LAST_DATA_VERSION, " +
"CUR_THREAD_COUNT, MAX_THREAD_COUNT, TOTAL_THREAD_COUNT, CUR_DAEMON_THREAD_COUNT, " +
"SENT_MESSAGES_COUNT, SENT_BYTES_COUNT, RECEIVED_MESSAGES_COUNT, RECEIVED_BYTES_COUNT, " +
"OUTBOUND_MESSAGES_QUEUE FROM " + systemSchemaName() + ".NODE_METRICS";
List<List<?>> resMetrics = execSql(sqlAllMetrics);
assertColumnTypes(resMetrics.get(0), UUID.class, Timestamp.class,
Integer.class, Integer.class, Float.class, // Active jobs.
Integer.class, Integer.class, Float.class, // Waiting jobs.
Integer.class, Integer.class, Float.class, Integer.class, // Rejected jobs.
Integer.class, Integer.class, Float.class, Integer.class, // Canceled jobs.
Long.class, Long.class, Long.class, // Jobs wait time.
Long.class, Long.class, Long.class, Long.class, // Jobs execute time.
Integer.class, Integer.class, // Executed jobs/task.
Long.class, Long.class, Long.class, Float.class, Float.class, // Busy/idle time.
Integer.class, Double.class, Double.class, Double.class, // CPU.
Long.class, Long.class, Long.class, Long.class, Long.class, // Heap memory.
Long.class, Long.class, Long.class, Long.class, Long.class, // Nonheap memory.
Long.class, Timestamp.class, Timestamp.class, Long.class, // Uptime.
Integer.class, Integer.class, Long.class, Integer.class, // Threads.
Integer.class, Long.class, Integer.class, Long.class, // Sent/received messages.
Integer.class); // Outbound message queue.
assertEquals(3, resAll.size());
// Check join with nodes.
assertEquals(3, execSql("SELECT NM.LAST_UPDATE_TIME FROM " + systemSchemaName() + ".NODES N " +
"JOIN " + systemSchemaName() + ".NODE_METRICS NM ON N.NODE_ID = NM.NODE_ID").size());
// Check index on NODE_ID column.
assertEquals(1, execSql("SELECT LAST_UPDATE_TIME FROM " + systemSchemaName() + ".NODE_METRICS WHERE NODE_ID = ?",
nodeId(1)).size());
// Check malformed value for indexed column.
assertEquals(0, execSql("SELECT LAST_UPDATE_TIME FROM " + systemSchemaName() + ".NODE_METRICS WHERE NODE_ID = ?",
"-").size());
// Check quick-count.
assertEquals(3L, execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".NODE_METRICS").get(0).get(0));
// Check metric values.
// Broadcast jobs to server and client nodes to get non zero metric values.
for (int i = 0; i < 100; i++) {
IgniteFuture<Void> fut = igniteSrv.compute(igniteSrv.cluster().forNodeId(nodeId0, nodeId(1)))
.broadcastAsync(
new IgniteRunnable() {
@Override public void run() {
Random rnd = new Random();
try {
doSleep(rnd.nextInt(100));
}
catch (Throwable ignore) {
// No-op.
}
}
});
if (i % 10 == 0)
fut.cancel();
}
doSleep(igniteSrv.configuration().getMetricsUpdateFrequency() * 3L);
for (Ignite grid : G.allGrids()) {
UUID nodeId = grid.cluster().localNode().id();
// Metrics for node must be collected from another node to avoid race and get consistent metrics snapshot.
Ignite ignite = F.eq(nodeId, nodeId0) ? igniteCli : igniteSrv;
for (int i = 0; i < METRICS_CHECK_ATTEMPTS; i++) {
ClusterMetrics metrics = ignite.cluster().node(nodeId).metrics();
assertTrue(metrics instanceof ClusterMetricsSnapshot);
resMetrics = execSql(ignite, sqlAllMetrics + " WHERE NODE_ID = ?", nodeId);
log.info("Check metrics for node " + grid.name() + ", attempt " + (i + 1));
if (metrics.getLastUpdateTime() == ((Timestamp)resMetrics.get(0).get(1)).getTime()) {
assertEquals(metrics.getMaximumActiveJobs(), resMetrics.get(0).get(2));
assertEquals(metrics.getCurrentActiveJobs(), resMetrics.get(0).get(3));
assertEquals(metrics.getAverageActiveJobs(), resMetrics.get(0).get(4));
assertEquals(metrics.getMaximumWaitingJobs(), resMetrics.get(0).get(5));
assertEquals(metrics.getCurrentWaitingJobs(), resMetrics.get(0).get(6));
assertEquals(metrics.getAverageWaitingJobs(), resMetrics.get(0).get(7));
assertEquals(metrics.getMaximumRejectedJobs(), resMetrics.get(0).get(8));
assertEquals(metrics.getCurrentRejectedJobs(), resMetrics.get(0).get(9));
assertEquals(metrics.getAverageRejectedJobs(), resMetrics.get(0).get(10));
assertEquals(metrics.getTotalRejectedJobs(), resMetrics.get(0).get(11));
assertEquals(metrics.getMaximumCancelledJobs(), resMetrics.get(0).get(12));
assertEquals(metrics.getCurrentCancelledJobs(), resMetrics.get(0).get(13));
assertEquals(metrics.getAverageCancelledJobs(), resMetrics.get(0).get(14));
assertEquals(metrics.getTotalCancelledJobs(), resMetrics.get(0).get(15));
assertEquals(metrics.getMaximumJobWaitTime(), resMetrics.get(0).get(16));
assertEquals(metrics.getCurrentJobWaitTime(), resMetrics.get(0).get(17));
assertEquals((long)metrics.getAverageJobWaitTime(), resMetrics.get(0).get(18));
assertEquals(metrics.getMaximumJobExecuteTime(), resMetrics.get(0).get(19));
assertEquals(metrics.getCurrentJobExecuteTime(), resMetrics.get(0).get(20));
assertEquals((long)metrics.getAverageJobExecuteTime(), resMetrics.get(0).get(21));
assertEquals(metrics.getTotalJobsExecutionTime(), resMetrics.get(0).get(22));
assertEquals(metrics.getTotalExecutedJobs(), resMetrics.get(0).get(23));
assertEquals(metrics.getTotalExecutedTasks(), resMetrics.get(0).get(24));
assertEquals(metrics.getTotalBusyTime(), resMetrics.get(0).get(25));
assertEquals(metrics.getTotalIdleTime(), resMetrics.get(0).get(26));
assertEquals(metrics.getCurrentIdleTime(), resMetrics.get(0).get(27));
assertEquals(metrics.getBusyTimePercentage(), resMetrics.get(0).get(28));
assertEquals(metrics.getIdleTimePercentage(), resMetrics.get(0).get(29));
assertEquals(metrics.getTotalCpus(), resMetrics.get(0).get(30));
assertEquals(metrics.getCurrentCpuLoad(), resMetrics.get(0).get(31));
assertEquals(metrics.getAverageCpuLoad(), resMetrics.get(0).get(32));
assertEquals(metrics.getCurrentGcCpuLoad(), resMetrics.get(0).get(33));
assertEquals(metrics.getHeapMemoryInitialized(), resMetrics.get(0).get(34));
assertEquals(metrics.getHeapMemoryUsed(), resMetrics.get(0).get(35));
assertEquals(metrics.getHeapMemoryCommitted(), resMetrics.get(0).get(36));
assertEquals(metrics.getHeapMemoryMaximum(), resMetrics.get(0).get(37));
assertEquals(metrics.getHeapMemoryTotal(), resMetrics.get(0).get(38));
assertEquals(metrics.getNonHeapMemoryInitialized(), resMetrics.get(0).get(39));
assertEquals(metrics.getNonHeapMemoryUsed(), resMetrics.get(0).get(40));
assertEquals(metrics.getNonHeapMemoryCommitted(), resMetrics.get(0).get(41));
assertEquals(metrics.getNonHeapMemoryMaximum(), resMetrics.get(0).get(42));
assertEquals(metrics.getNonHeapMemoryTotal(), resMetrics.get(0).get(43));
assertEquals(metrics.getUpTime(), resMetrics.get(0).get(44));
assertEquals(metrics.getStartTime(), ((Timestamp)resMetrics.get(0).get(45)).getTime());
assertEquals(metrics.getNodeStartTime(), ((Timestamp)resMetrics.get(0).get(46)).getTime());
assertEquals(metrics.getLastDataVersion(), resMetrics.get(0).get(47));
assertEquals(metrics.getCurrentThreadCount(), resMetrics.get(0).get(48));
assertEquals(metrics.getMaximumThreadCount(), resMetrics.get(0).get(49));
assertEquals(metrics.getTotalStartedThreadCount(), resMetrics.get(0).get(50));
assertEquals(metrics.getCurrentDaemonThreadCount(), resMetrics.get(0).get(51));
assertEquals(metrics.getSentMessagesCount(), resMetrics.get(0).get(52));
assertEquals(metrics.getSentBytesCount(), resMetrics.get(0).get(53));
assertEquals(metrics.getReceivedMessagesCount(), resMetrics.get(0).get(54));
assertEquals(metrics.getReceivedBytesCount(), resMetrics.get(0).get(55));
assertEquals(metrics.getOutboundMessagesQueueSize(), resMetrics.get(0).get(56));
break;
}
else {
log.info("Metrics was updated in background, will retry check");
if (i == METRICS_CHECK_ATTEMPTS - 1)
fail("Failed to check metrics, attempts limit reached (" + METRICS_CHECK_ATTEMPTS + ')');
}
}
}
}
/**
 * Verifies the BASELINE_NODES system view: initial content, the ONLINE flag
 * after a baseline node leaves, and visibility from a node outside the baseline.
 */
@Test
public void testBaselineViews() throws Exception {
    cleanPersistenceDir();

    Ignite node0 = startGrid(getTestIgniteInstanceName(), getPdsConfiguration("node0"));

    startGrid(getTestIgniteInstanceName(1), getPdsConfiguration("node1"));

    node0.cluster().active(true);

    List<List<?>> rows = execSql("SELECT CONSISTENT_ID, ONLINE FROM " +
        systemSchemaName() + ".BASELINE_NODES ORDER BY CONSISTENT_ID");

    assertColumnTypes(rows.get(0), String.class, Boolean.class);

    assertEquals(2, rows.size());

    // Both baseline nodes are alive; rows are ordered by consistent id.
    assertEquals("node0", rows.get(0).get(0));
    assertEquals(true, rows.get(0).get(1));

    assertEquals("node1", rows.get(1).get(0));
    assertEquals(true, rows.get(1).get(1));

    stopGrid(getTestIgniteInstanceName(1));

    // The stopped node stays in the baseline but must be reported as offline.
    rows = execSql("SELECT CONSISTENT_ID FROM " + systemSchemaName() + ".BASELINE_NODES WHERE ONLINE = false");

    assertEquals(1, rows.size());
    assertEquals("node1", rows.get(0).get(0));

    // A node started after activation is outside the baseline but sees the same view.
    Ignite nonBltNode = startGrid(getTestIgniteInstanceName(2), getPdsConfiguration("node2"));

    assertEquals(2, execSql(nonBltNode, "SELECT CONSISTENT_ID FROM " + systemSchemaName() + ".BASELINE_NODES").size());

    // The new node appears in NODES but not in BASELINE_NODES.
    rows = execSql("SELECT CONSISTENT_ID FROM " + systemSchemaName() + ".NODES N WHERE NOT EXISTS (SELECT 1 FROM " +
        systemSchemaName() + ".BASELINE_NODES B WHERE B.CONSISTENT_ID = N.CONSISTENT_ID)");

    assertEquals(1, rows.size());
    assertEquals("node2", rows.get(0).get(0));
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

    // Expose metrics through SQL views for all grids started by this test.
    cfg.setMetricExporterSpi(new SqlViewMetricExporterSpi());

    return cfg;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration() throws Exception {
    // Diamond instead of a raw CacheConfiguration to avoid unchecked raw-type usage.
    IgniteConfiguration cfg = super.getConfiguration()
        .setCacheConfiguration(new CacheConfiguration<>().setName(DEFAULT_CACHE_NAME))
        .setMetricExporterSpi(new SqlViewMetricExporterSpi());

    if (isPersistenceEnabled) {
        // Small (10 MB) persistent region is enough for these tests.
        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
            .setDefaultDataRegionConfiguration(
                new DataRegionConfiguration().setPersistenceEnabled(true).setMaxSize(10 * 1024 * 1024)
            )
        );
    }

    return cfg;
}
/**
 * Test IO statistics SQL system views for cache groups.
 *
 * @throws Exception If failed.
 */
@Test
public void testIoStatisticsViews() throws Exception {
    Ignite ignite = startGrid(getTestIgniteInstanceName(), getPdsConfiguration("node0"));

    ignite.cluster().active(true);

    execSql("CREATE TABLE TST(id INTEGER PRIMARY KEY, name VARCHAR, age integer)");

    // '(i + 1)' must be parenthesized: without parentheses the '+' binds to the
    // string on the left and concatenates "1" instead of adding (e.g. i = 5 -> 51).
    for (int i = 0; i < 500; i++)
        execSql("INSERT INTO TST(id, name, age) VALUES (" + i + ",'name-" + i + "'," + (i + 1) + ")");

    String sql1 = "SELECT CACHE_GROUP_ID, CACHE_GROUP_NAME, PHYSICAL_READS, LOGICAL_READS FROM " +
        systemSchemaName() + ".LOCAL_CACHE_GROUPS_IO";

    List<List<?>> res1 = execSql(sql1);

    // Group name -> logical reads. One row per cache group: the SQL table's group and the default cache.
    Map<?, ?> map = res1.stream().collect(Collectors.toMap(k -> k.get(1), v -> v.get(3)));

    assertEquals(2, map.size());

    assertTrue(map.containsKey("SQL_PUBLIC_TST"));

    // The inserts above must have produced logical reads on the table's group.
    assertTrue((Long)map.get("SQL_PUBLIC_TST") > 0);

    assertTrue(map.containsKey(DEFAULT_CACHE_NAME));

    // Filtering by group name returns exactly one row.
    sql1 = "SELECT CACHE_GROUP_ID, CACHE_GROUP_NAME, PHYSICAL_READS, LOGICAL_READS FROM " +
        systemSchemaName() + ".LOCAL_CACHE_GROUPS_IO WHERE CACHE_GROUP_NAME='SQL_PUBLIC_TST'";

    assertEquals(1, execSql(sql1).size());
}
/**
 * Simple test for {@link SqlTableView}.
 * Checks the full column set for a CREATE TABLE'd cache, the complete view content,
 * LIKE-filtering by cache name, and a join with the CACHES view.
 */
@Test
public void testTablesView() throws Exception {
IgniteEx ignite = startGrid(getConfiguration());
GridCacheProcessor cacheProc = ignite.context().cache();
// Table with explicit cache name, wrapped value and custom value type name.
execSql("CREATE TABLE CACHE_SQL (ID INT PRIMARY KEY, MY_VAL VARCHAR) WITH " +
"\"cache_name=cache_sql,template=partitioned,atomicity=atomic,wrap_value=true,value_type=random_name\"");
// Table with a composite key, affinity key column and unwrapped value.
execSql("CREATE TABLE PUBLIC.DFLT_CACHE (ID1 INT, ID2 INT, MY_VAL VARCHAR, PRIMARY KEY (ID1, ID2)) WITH"
+ "\"affinity_key=ID2,wrap_value=false,key_type=random_name\"");
int cacheSqlId = cacheProc.cacheDescriptor("cache_sql").cacheId();
int ddlTabId = cacheProc.cacheDescriptor("SQL_PUBLIC_DFLT_CACHE").cacheId();
List<List<?>> cacheSqlInfos = execSql("SELECT * FROM " + systemSchemaName() + ".TABLES WHERE " +
"TABLE_NAME = 'CACHE_SQL'");
// Expected row for the first table, column by column.
List<?> expRow = asList(
"CACHE_SQL", // TABLE_NAME
"PUBLIC", // SCHEMA_NAME
"cache_sql", // CACHE_NAME
cacheSqlId, // CACHE_ID
null, // AFFINITY_KEY_COLUMN
"ID", // KEY_ALIAS
null, // VALUE_ALIAS
"java.lang.Integer", // KEY_TYPE_NAME
"random_name", // VALUE_TYPE_NAME
false // IS_INDEX_REBUILD_IN_PROGRESS
);
assertEquals("Returned incorrect info. ", expRow, cacheSqlInfos.get(0));
// no more rows are expected.
assertEquals("Expected to return only one row", 1, cacheSqlInfos.size());
// The whole view must contain exactly the two created tables (order-insensitive check below).
List<List<?>> allInfos = execSql("SELECT * FROM " + systemSchemaName() + ".TABLES");
List<?> allExpRows = asList(
expRow,
asList(
"DFLT_CACHE", // TABLE_NAME
"PUBLIC", // SCHEMA_NAME
"SQL_PUBLIC_DFLT_CACHE", // CACHE_NAME
ddlTabId, // CACHE_ID
"ID2", // AFFINITY_KEY_COLUMN
null, // KEY_ALIAS
"MY_VAL", // VALUE_ALIAS
"random_name", // KEY_TYPE_NAME
"java.lang.String", // VALUE_TYPE_NAME
false // IS_INDEX_REBUILD_IN_PROGRESS
)
);
if (!F.eqNotOrdered(allExpRows, allInfos))
fail("Returned incorrect rows [expected=" + allExpRows + ", actual=" + allInfos + "].");
// Filter by cache name:
assertEquals(
Collections.singletonList(asList("DFLT_CACHE", "SQL_PUBLIC_DFLT_CACHE")),
execSql("SELECT TABLE_NAME, CACHE_NAME " +
"FROM " + systemSchemaName() + ".TABLES " +
"WHERE CACHE_NAME LIKE 'SQL\\_PUBLIC\\_%'"));
assertEquals(
Collections.singletonList(asList("CACHE_SQL", "cache_sql")),
execSql("SELECT TABLE_NAME, CACHE_NAME " +
"FROM " + systemSchemaName() + ".TABLES " +
"WHERE CACHE_NAME NOT LIKE 'SQL\\_PUBLIC\\_%'"));
// Join with CACHES view.
assertEquals(
asList(
asList("DFLT_CACHE", "SQL_PUBLIC_DFLT_CACHE", "SQL_PUBLIC_DFLT_CACHE"),
asList("CACHE_SQL", "cache_sql", "cache_sql")),
execSql("SELECT TABLE_NAME, TAB.CACHE_NAME, C.CACHE_NAME " +
"FROM " + systemSchemaName() + ".TABLES AS TAB JOIN " + systemSchemaName() + ".CACHES AS C " +
"ON TAB.CACHE_ID = C.CACHE_ID " +
"ORDER BY C.CACHE_NAME")
);
}
/**
 * Verify that if we drop or create table, TABLES system view reflects these changes.
 */
@Test
public void testTablesDropAndCreate() throws Exception {
    startGrid(getConfiguration());

    final String tabNamesQry = "SELECT TABLE_NAME, CACHE_NAME FROM " + systemSchemaName() + ".TABLES ORDER BY TABLE_NAME";

    assertTrue("Initially no tables expected", execSql(tabNamesQry).isEmpty());

    // Create tables one by one and check that the view grows accordingly.
    execSql("CREATE TABLE PUBLIC.TAB1 (ID INT PRIMARY KEY, VAL VARCHAR)");

    assertEquals(
        asList(asList("TAB1", "SQL_PUBLIC_TAB1")),
        execSql(tabNamesQry));

    execSql("CREATE TABLE PUBLIC.TAB2 (ID LONG PRIMARY KEY, VAL_STR VARCHAR) WITH \"cache_name=cache2\"");
    execSql("CREATE TABLE PUBLIC.TAB3 (ID LONG PRIMARY KEY, VAL_INT INT) WITH \"cache_name=cache3\" ");

    assertEquals(
        asList(
            asList("TAB1", "SQL_PUBLIC_TAB1"),
            asList("TAB2", "cache2"),
            asList("TAB3", "cache3")
        ),
        execSql(tabNamesQry));

    // Now drop tables and check that the view shrinks accordingly.
    execSql("DROP TABLE PUBLIC.TAB2");

    assertEquals(
        asList(
            asList("TAB1", "SQL_PUBLIC_TAB1"),
            asList("TAB3", "cache3")
        ),
        execSql(tabNamesQry));

    execSql("DROP TABLE PUBLIC.TAB3");

    assertEquals(
        asList(asList("TAB1", "SQL_PUBLIC_TAB1")),
        execSql(tabNamesQry));

    execSql("DROP TABLE PUBLIC.TAB1");

    assertTrue("All tables should be dropped", execSql(tabNamesQry).isEmpty());
}
/**
 * Dummy implementation of the mapper. Required to test "AFFINITY_KEY_COLUMN".
 * Maps every key to the same constant, so no single key column can be reported
 * as the affinity key column.
 */
static class ConstantMapper implements AffinityKeyMapper {
/** Serial version uid. */
private static final long serialVersionUID = 7018626316531791556L;
/** {@inheritDoc} */
@Override public Object affinityKey(Object key) {
// Constant affinity key regardless of the cache key.
return 1;
}
/** {@inheritDoc} */
@Override public void reset() {
//NO-op
}
}
/**
 * Check affinity column if custom affinity mapper is specified.
 */
@Test
public void testTablesNullAffinityKey() throws Exception {
    IgniteEx grid = startGrid(getConfiguration());

    // A custom mapper hides the real affinity key, so the view must expose NULL.
    AffinityKeyMapper constMapper = new ConstantMapper();

    grid.getOrCreateCache(defaultCacheConfiguration().setName("NO_KEY_FIELDS_CACHE").setAffinityMapper(constMapper)
        .setQueryEntities(Collections.singleton(
            // A cache with no key fields
            new QueryEntity(Object.class.getName(), "Object2")
                .addQueryField("name", String.class.getName(), null)
                .addQueryField("salary", Integer.class.getName(), null)
                .setTableName("NO_KEY_TABLE")
        )));

    List<List<String>> expRows = Collections.singletonList(asList("NO_KEY_TABLE", null));

    // Lookup by cache name.
    assertEquals(expRows,
        execSql("SELECT TABLE_NAME, AFFINITY_KEY_COLUMN " +
            "FROM " + systemSchemaName() + ".TABLES " +
            "WHERE CACHE_NAME = 'NO_KEY_FIELDS_CACHE'"));

    // Lookup by the NULL affinity column itself.
    assertEquals(expRows,
        execSql("SELECT TABLE_NAME, AFFINITY_KEY_COLUMN " +
            "FROM " + systemSchemaName() + ".TABLES " +
            "WHERE AFFINITY_KEY_COLUMN IS NULL"));
}
/**
 * Special test for key/val name and type. Covers most used cases.
 */
@Test
public void testTablesViewKeyVal() throws Exception {
    IgniteEx grid = startGrid(getConfiguration());

    // Case 1: non-SQL cache with an untyped key - no aliases expected.
    grid.getOrCreateCache(defaultCacheConfiguration().setName("NO_ALIAS_NON_SQL_KEY")
        .setQueryEntities(Collections.singleton(
            // A cache with no key fields
            new QueryEntity(Object.class.getName(), "Object2")
                .addQueryField("name", String.class.getName(), null)
                .addQueryField("salary", Integer.class.getName(), null)
                .setTableName("NO_ALIAS_NON_SQL_KEY")
        )));

    List<?> nonSqlAliases = execSql("SELECT KEY_ALIAS, VALUE_ALIAS FROM " + systemSchemaName() + ".TABLES " +
        "WHERE TABLE_NAME = 'NO_ALIAS_NON_SQL_KEY'").get(0);

    assertEquals(asList(null, null), nonSqlAliases);

    // Case 2: simple key plus unwrapped value - both columns act as aliases.
    execSql("CREATE TABLE PUBLIC.SIMPLE_KEY_SIMPLE_VAL (ID INT PRIMARY KEY, NAME VARCHAR) WITH \"wrap_value=false\"");

    List<?> simpleAliases = execSql("SELECT KEY_ALIAS, VALUE_ALIAS FROM " + systemSchemaName() + ".TABLES " +
        "WHERE TABLE_NAME = 'SIMPLE_KEY_SIMPLE_VAL'").get(0);

    assertEquals(asList("ID", "NAME"), simpleAliases);

    // Case 3: composite key and wrapped value - no aliases.
    execSql("CREATE TABLE PUBLIC.COMPLEX_KEY_COMPLEX_VAL " +
        "(ID1 INT, " +
        "ID2 INT, " +
        "VAL1 VARCHAR, " +
        "VAL2 VARCHAR, " +
        "PRIMARY KEY(ID1, ID2))");

    List<?> complexAliases = execSql("SELECT KEY_ALIAS, VALUE_ALIAS FROM " + systemSchemaName() + ".TABLES " +
        "WHERE TABLE_NAME = 'COMPLEX_KEY_COMPLEX_VAL'").get(0);

    assertEquals(asList(null, null), complexAliases);
}
/**
 * Test caches system views.
 * Covers the CACHES and CACHE_GROUPS views: column types, filtering, indexed
 * lookups, joins, and consistency across server, non-baseline and client nodes.
 */
@SuppressWarnings("ConstantConditions")
@Test
public void testCachesViews() throws Exception {
// Several named data regions to exercise the DATA_REGION_NAME column.
DataStorageConfiguration dsCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration().setName("def").setPersistenceEnabled(true))
.setDataRegionConfigurations(new DataRegionConfiguration().setName("dr1"),
new DataRegionConfiguration().setName("dr2"), new DataRegionConfiguration().setName("dr3"));
IgniteEx ignite0 = startGrid(getConfiguration().setDataStorageConfiguration(dsCfg))
Ignite ignite1 = startGrid(getConfiguration().setDataStorageConfiguration(dsCfg).setIgniteInstanceName("node1"));
ignite0.cluster().active(true);
// node2 is started after activation (outside the baseline); node3 is a client.
Ignite ignite2 = startGrid(getConfiguration().setDataStorageConfiguration(dsCfg).setIgniteInstanceName("node2"));
Ignite ignite3 =
startClientGrid(getConfiguration().setDataStorageConfiguration(dsCfg).setIgniteInstanceName("node3"));
// Two caches share group 'cache_grp' and a node filter.
ignite0.getOrCreateCache(new CacheConfiguration<>()
.setName("cache_atomic_part")
.setAtomicityMode(CacheAtomicityMode.ATOMIC)
.setCacheMode(CacheMode.PARTITIONED)
.setGroupName("cache_grp")
.setNodeFilter(new TestNodeFilter(ignite0.cluster().localNode()))
);
ignite0.getOrCreateCache(new CacheConfiguration<>()
.setName("cache_atomic_repl")
.setAtomicityMode(CacheAtomicityMode.ATOMIC)
.setCacheMode(CacheMode.REPLICATED)
.setDataRegionName("dr1")
.setTopologyValidator(new TestTopologyValidator())
);
ignite0.getOrCreateCache(new CacheConfiguration<>()
.setName("cache_tx_part")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setCacheMode(CacheMode.PARTITIONED)
.setGroupName("cache_grp")
.setNodeFilter(new TestNodeFilter(ignite0.cluster().localNode()))
);
ignite0.getOrCreateCache(new CacheConfiguration<>()
.setName("cache_tx_repl")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setCacheMode(CacheMode.REPLICATED)
.setDataRegionName("dr2")
.setEvictionFilter(new TestEvictionFilter())
.setEvictionPolicyFactory(new TestEvictionPolicyFactory())
.setOnheapCacheEnabled(true)
);
// Cache with a well-behaved custom node filter (never throws from toString()).
ignite0.getOrCreateCache(new CacheConfiguration<>()
.setName("cache_cust_node_filter")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setCacheMode(CacheMode.REPLICATED)
.setDataRegionName("dr3")
.setEvictionFilter(new TestEvictionFilter())
.setEvictionPolicyFactory(new TestEvictionPolicyFactory())
.setOnheapCacheEnabled(true)
.setNodeFilter(new CustomNodeFilter(Integer.MAX_VALUE))
);
// Cache whose node filter's toString() throws after the first call; the view
// must degrade gracefully (see the last assert of this test).
ignite0.getOrCreateCache(new CacheConfiguration<>()
.setName("cache_cust_err_node_filter")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setCacheMode(CacheMode.REPLICATED)
.setDataRegionName("dr3")
.setEvictionFilter(new TestEvictionFilter())
.setEvictionPolicyFactory(new TestEvictionPolicyFactory())
.setOnheapCacheEnabled(true)
.setNodeFilter(new CustomNodeFilter(1))
);
execSql("CREATE TABLE cache_sql (ID INT PRIMARY KEY, VAL VARCHAR) WITH " +
"\"cache_name=cache_sql,template=partitioned,atomicity=atomic\"");
awaitPartitionMapExchange();
// Select every CACHES column to validate the full column set and types.
List<List<?>> resAll = execSql("SELECT CACHE_GROUP_ID, CACHE_GROUP_NAME, CACHE_ID, CACHE_NAME, CACHE_TYPE," +
"CACHE_MODE, ATOMICITY_MODE, IS_ONHEAP_CACHE_ENABLED, IS_COPY_ON_READ, IS_LOAD_PREVIOUS_VALUE, " +
"IS_READ_FROM_BACKUP, PARTITION_LOSS_POLICY, NODE_FILTER, TOPOLOGY_VALIDATOR, IS_EAGER_TTL, " +
"WRITE_SYNCHRONIZATION_MODE, IS_INVALIDATE, IS_EVENTS_DISABLED, IS_STATISTICS_ENABLED, " +
"IS_MANAGEMENT_ENABLED, BACKUPS, AFFINITY, AFFINITY_MAPPER, " +
"REBALANCE_MODE, REBALANCE_BATCH_SIZE, REBALANCE_TIMEOUT, REBALANCE_DELAY, REBALANCE_THROTTLE, " +
"REBALANCE_BATCHES_PREFETCH_COUNT, REBALANCE_ORDER, " +
"EVICTION_FILTER, EVICTION_POLICY_FACTORY, " +
"IS_NEAR_CACHE_ENABLED, NEAR_CACHE_EVICTION_POLICY_FACTORY, NEAR_CACHE_START_SIZE, " +
"DEFAULT_LOCK_TIMEOUT, INTERCEPTOR, CACHE_STORE_FACTORY, " +
"IS_STORE_KEEP_BINARY, IS_READ_THROUGH, IS_WRITE_THROUGH, " +
"IS_WRITE_BEHIND_ENABLED, WRITE_BEHIND_COALESCING, WRITE_BEHIND_FLUSH_SIZE, " +
"WRITE_BEHIND_FLUSH_FREQUENCY, WRITE_BEHIND_FLUSH_THREAD_COUNT, WRITE_BEHIND_BATCH_SIZE, " +
"MAX_CONCURRENT_ASYNC_OPERATIONS, CACHE_LOADER_FACTORY, CACHE_WRITER_FACTORY, EXPIRY_POLICY_FACTORY, " +
"IS_SQL_ESCAPE_ALL, SQL_SCHEMA, SQL_INDEX_MAX_INLINE_SIZE, IS_SQL_ONHEAP_CACHE_ENABLED, " +
"SQL_ONHEAP_CACHE_MAX_SIZE, QUERY_DETAIL_METRICS_SIZE, QUERY_PARALLELISM, MAX_QUERY_ITERATORS_COUNT, " +
"DATA_REGION_NAME FROM " + systemSchemaName() + ".CACHES");
assertColumnTypes(resAll.get(0),
Integer.class, String.class, Integer.class, String.class, String.class,
String.class, String.class, Boolean.class, Boolean.class, Boolean.class,
Boolean.class, String.class, String.class, String.class, Boolean.class,
String.class, Boolean.class, Boolean.class, Boolean.class,
Boolean.class, Integer.class, String.class, String.class,
String.class, Integer.class, Long.class, Long.class, Long.class, // Rebalance.
Long.class, Integer.class,
String.class, String.class, // Eviction.
Boolean.class, String.class, Integer.class, // Near cache.
Long.class, String.class, String.class,
Boolean.class, Boolean.class, Boolean.class,
Boolean.class, Boolean.class, Integer.class, // Write-behind.
Long.class, Integer.class, Integer.class,
Integer.class, String.class, String.class, String.class,
Boolean.class, String.class, Integer.class, Boolean.class, // SQL.
Integer.class, Integer.class, Integer.class, Integer.class,
String.class);
// Filtering by mode/atomicity pinpoints the matching cache.
assertEquals("cache_tx_part", execSql("SELECT CACHE_NAME FROM " + systemSchemaName() + ".CACHES WHERE " +
"CACHE_MODE = 'PARTITIONED' AND ATOMICITY_MODE = 'TRANSACTIONAL' AND CACHE_NAME like 'cache%'").get(0).get(0))
assertEquals("cache_atomic_repl", execSql("SELECT CACHE_NAME FROM " + systemSchemaName() + ".CACHES WHERE " +
"CACHE_MODE = 'REPLICATED' AND ATOMICITY_MODE = 'ATOMIC' AND CACHE_NAME like 'cache%'").get(0).get(0));
assertEquals(2L, execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHES WHERE CACHE_GROUP_NAME = 'cache_grp'")
.get(0).get(0));
assertEquals("cache_atomic_repl", execSql("SELECT CACHE_NAME FROM " + systemSchemaName() + ".CACHES " +
"WHERE DATA_REGION_NAME = 'dr1'").get(0).get(0));
assertEquals("cache_tx_repl", execSql("SELECT CACHE_NAME FROM " + systemSchemaName() + ".CACHES " +
"WHERE DATA_REGION_NAME = 'dr2'").get(0).get(0));
assertEquals("PARTITIONED", execSql("SELECT CACHE_MODE FROM " + systemSchemaName() + ".CACHES " +
"WHERE CACHE_NAME = 'cache_atomic_part'").get(0).get(0));
assertEquals("USER", execSql("SELECT CACHE_TYPE FROM " + systemSchemaName() + ".CACHES WHERE CACHE_NAME = 'cache_sql'")
.get(0).get(0));
assertEquals(0L, execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHES WHERE CACHE_NAME = 'no_such_cache'").get(0)
.get(0));
assertEquals(0L, execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHES WHERE CACHE_NAME = '1'").get(0).get(0));
// Helper classes are rendered through their toString() in the view columns.
assertEquals("TestNodeFilter", execSql("SELECT NODE_FILTER FROM " + systemSchemaName() + ".CACHES WHERE CACHE_NAME = " +
"'cache_atomic_part'").get(0).get(0));
assertEquals("TestEvictionFilter", execSql("SELECT EVICTION_FILTER FROM " + systemSchemaName() + ".CACHES " +
"WHERE CACHE_NAME = 'cache_tx_repl'").get(0).get(0));
assertEquals("TestEvictionPolicyFactory", execSql("SELECT EVICTION_POLICY_FACTORY " +
"FROM " + systemSchemaName() + ".CACHES WHERE CACHE_NAME = 'cache_tx_repl'").get(0).get(0));
assertEquals("TestTopologyValidator", execSql("SELECT TOPOLOGY_VALIDATOR FROM " + systemSchemaName() + ".CACHES " +
"WHERE CACHE_NAME = 'cache_atomic_repl'").get(0).get(0));
// Check quick count.
assertEquals(execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHES").get(0).get(0),
execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHES WHERE CACHE_ID <> CACHE_ID + 1").get(0).get(0));
// Check that caches are the same on BLT, BLT filtered by node filter, non BLT and client nodes.
assertEquals(7L, execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHES WHERE CACHE_NAME like 'cache%'").get(0)
.get(0));
assertEquals(7L, execSql(ignite1, "SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHES WHERE CACHE_NAME like 'cache%'")
.get(0).get(0));
assertEquals(7L, execSql(ignite2, "SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHES WHERE CACHE_NAME like 'cache%'")
.get(0).get(0));
assertEquals(7L, execSql(ignite3, "SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHES WHERE CACHE_NAME like 'cache%'")
.get(0).get(0));
// Check cache groups.
resAll = execSql("SELECT CACHE_GROUP_ID, CACHE_GROUP_NAME, IS_SHARED, CACHE_COUNT, " +
"CACHE_MODE, ATOMICITY_MODE, AFFINITY, PARTITIONS_COUNT, " +
"NODE_FILTER, DATA_REGION_NAME, TOPOLOGY_VALIDATOR, PARTITION_LOSS_POLICY, " +
"REBALANCE_MODE, REBALANCE_DELAY, REBALANCE_ORDER, BACKUPS " +
"FROM " + systemSchemaName() + ".CACHE_GROUPS");
assertColumnTypes(resAll.get(0),
Integer.class, String.class, Boolean.class, Integer.class,
String.class, String.class, String.class, Integer.class,
String.class, String.class, String.class, String.class,
String.class, Long.class, Integer.class, Integer.class);
assertEquals(2, execSql("SELECT CACHE_COUNT FROM " + systemSchemaName() + ".CACHE_GROUPS " +
"WHERE CACHE_GROUP_NAME = 'cache_grp'").get(0).get(0));
assertEquals("cache_grp", execSql("SELECT CACHE_GROUP_NAME FROM " + systemSchemaName() + ".CACHE_GROUPS " +
"WHERE IS_SHARED = true AND CACHE_GROUP_NAME like 'cache%'").get(0).get(0));
// Check index on ID column.
assertEquals("cache_tx_repl", execSql("SELECT CACHE_GROUP_NAME FROM " + systemSchemaName() + ".CACHE_GROUPS " +
"WHERE CACHE_GROUP_ID = ?", ignite0.cachex("cache_tx_repl").context().groupId()).get(0).get(0));
assertEquals(0, execSql("SELECT CACHE_GROUP_ID FROM " + systemSchemaName() + ".CACHE_GROUPS WHERE CACHE_GROUP_ID = 0").size());
// Check join by indexed column.
assertEquals("cache_tx_repl", execSql("SELECT CG.CACHE_GROUP_NAME FROM " + systemSchemaName() + ".CACHES C JOIN " +
systemSchemaName() + ".CACHE_GROUPS CG ON C.CACHE_GROUP_ID = CG.CACHE_GROUP_ID WHERE C.CACHE_NAME = 'cache_tx_repl'")
.get(0).get(0));
// Check join by non-indexed column.
assertEquals("cache_grp", execSql("SELECT CG.CACHE_GROUP_NAME FROM " + systemSchemaName() + ".CACHES C JOIN " +
systemSchemaName() + ".CACHE_GROUPS CG ON C.CACHE_GROUP_NAME = CG.CACHE_GROUP_NAME WHERE C.CACHE_NAME = 'cache_tx_part'")
.get(0).get(0));
// Check configuration equality for cache and cache group views.
assertEquals(5L, execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHES C JOIN " +
systemSchemaName() + ".CACHE_GROUPS CG " +
"ON C.CACHE_NAME = CG.CACHE_GROUP_NAME WHERE C.CACHE_NAME like 'cache%' " +
"AND C.CACHE_MODE = CG.CACHE_MODE " +
"AND C.ATOMICITY_MODE = CG.ATOMICITY_MODE " +
"AND COALESCE(C.AFFINITY, '-') = COALESCE(CG.AFFINITY, '-') " +
"AND COALESCE(C.NODE_FILTER, '-') = COALESCE(CG.NODE_FILTER, '-') " +
"AND COALESCE(C.DATA_REGION_NAME, '-') = COALESCE(CG.DATA_REGION_NAME, '-') " +
"AND COALESCE(C.TOPOLOGY_VALIDATOR, '-') = COALESCE(CG.TOPOLOGY_VALIDATOR, '-') " +
"AND C.PARTITION_LOSS_POLICY = CG.PARTITION_LOSS_POLICY " +
"AND C.REBALANCE_MODE = CG.REBALANCE_MODE " +
"AND C.REBALANCE_DELAY = CG.REBALANCE_DELAY " +
"AND C.REBALANCE_ORDER = CG.REBALANCE_ORDER " +
"AND COALESCE(C.BACKUPS, -1) = COALESCE(CG.BACKUPS, -1)"
).get(0).get(0));
// Check quick count.
assertEquals(execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHE_GROUPS").get(0).get(0),
execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHE_GROUPS WHERE CACHE_GROUP_ID <> CACHE_GROUP_ID + 1")
.get(0).get(0));
// Check that cache groups are the same on different nodes.
assertEquals(6L, execSql("SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHE_GROUPS " +
"WHERE CACHE_GROUP_NAME like 'cache%'").get(0).get(0));
assertEquals(6L, execSql(ignite1, "SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHE_GROUPS " +
"WHERE CACHE_GROUP_NAME like 'cache%'").get(0).get(0));
assertEquals(6L, execSql(ignite2, "SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHE_GROUPS " +
"WHERE CACHE_GROUP_NAME like 'cache%'").get(0).get(0));
assertEquals(6L, execSql(ignite3, "SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHE_GROUPS " +
"WHERE CACHE_GROUP_NAME like 'cache%'").get(0).get(0));
assertEquals(5L, execSql(ignite0, "SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHE_GROUPS " +
"WHERE NODE_FILTER is NULL").get(0).get(0));
assertEquals(1L, execSql(ignite0, "SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHE_GROUPS " +
"WHERE NODE_FILTER = 'CUSTOM_NODE_FILTER'").get(0).get(0));
// The failing filter's toString() message must be surfaced in the column value.
assertEquals(1L, execSql(ignite0, "SELECT COUNT(*) FROM " + systemSchemaName() + ".CACHE_GROUPS " +
"WHERE NODE_FILTER like '%Oops... incorrect customer realization.'").get(0).get(0));
}
/**
 * Regression test. Verifies that duration metrics is able to be longer than 24 hours.
 *
 * @throws Exception If failed.
 */
@Test
public void testDurationMetricsCanBeLonger24Hours() throws Exception {
    Ignite ign = startGrid("MockedMetrics", getConfiguration().setMetricsUpdateFrequency(500));

    ClusterNode node = ign.cluster().localNode();

    assert node instanceof TcpDiscoveryNode : "Setup failed, test is incorrect.";

    // Get rid of metrics provider: current logic ignores metrics field if provider != null.
    setField(node, "metricsProvider", null);

    ClusterMetricsImpl original = getField(node, "metrics");

    // Substitute metrics with a mock that reports > 24h for every duration metric.
    // (Stray ';;' empty statement removed.)
    setField(node, "metrics", new MockedClusterMetrics(original));

    List<?> durationMetrics = execSql(ign,
        "SELECT " +
            "MAX_JOBS_WAIT_TIME, " +
            "CUR_JOBS_WAIT_TIME, " +
            "AVG_JOBS_WAIT_TIME, " +
            "MAX_JOBS_EXECUTE_TIME, " +
            "CUR_JOBS_EXECUTE_TIME, " +
            "AVG_JOBS_EXECUTE_TIME, " +
            "TOTAL_JOBS_EXECUTE_TIME, " +
            "TOTAL_BUSY_TIME, " +
            "TOTAL_IDLE_TIME, " +
            "CUR_IDLE_TIME, " +
            "UPTIME " +
            "FROM " + systemSchemaName() + ".NODE_METRICS").get(0);

    // All eleven duration columns must survive the > 24h value unclipped.
    List<Long> elevenExpVals = LongStream
        .generate(() -> MockedClusterMetrics.LONG_DURATION_MS)
        .limit(11)
        .boxed()
        .collect(Collectors.toList());

    assertEqualsCollections(elevenExpVals, durationMetrics);
}
/**
 * Mock for {@link ClusterMetricsImpl} that always returns big (more than 24h) duration for all duration metrics.
 * All other metrics keep the behaviour of the original instance's class.
 */
public static class MockedClusterMetrics extends ClusterMetricsImpl {
/** Some long (> 24h) duration. */
public static final long LONG_DURATION_MS = TimeUnit.DAYS.toMillis(365);
/**
 * Constructor.
 *
 * @param original - original cluster metrics object. Required to leave the original behaviour for not overriden
 * methods.
 */
public MockedClusterMetrics(ClusterMetricsImpl original) throws Exception {
// Rebuild over the same kernal context and node start time as the original instance.
super(
getField(original, "ctx"),
getField(original, "nodeStartTime"));
}
/** {@inheritDoc} */
@Override public long getMaximumJobWaitTime() {
return LONG_DURATION_MS;
}
/** {@inheritDoc} */
@Override public long getCurrentJobWaitTime() {
return LONG_DURATION_MS;
}
/** {@inheritDoc} */
@Override public double getAverageJobWaitTime() {
return LONG_DURATION_MS;
}
/** {@inheritDoc} */
@Override public long getMaximumJobExecuteTime() {
return LONG_DURATION_MS;
}
/** {@inheritDoc} */
@Override public long getCurrentJobExecuteTime() {
return LONG_DURATION_MS;
}
/** {@inheritDoc} */
@Override public double getAverageJobExecuteTime() {
return LONG_DURATION_MS;
}
/** {@inheritDoc} */
@Override public long getTotalJobsExecutionTime() {
return LONG_DURATION_MS;
}
/** {@inheritDoc} */
@Override public long getTotalBusyTime() {
return LONG_DURATION_MS;
}
/** {@inheritDoc} */
@Override public long getTotalIdleTime() {
return LONG_DURATION_MS;
}
/** {@inheritDoc} */
@Override public long getCurrentIdleTime() {
return LONG_DURATION_MS;
}
/** {@inheritDoc} */
@Override public long getUpTime() {
return LONG_DURATION_MS;
}
}
/**
 * Get field value using reflection.
 *
 * @param target Object containing the field.
 * @param fieldName Name of the field.
 * @return Field value, cast to the caller's expected type.
 * @throws Exception If the field is missing or inaccessible.
 */
@SuppressWarnings("unchecked") // Cast to T is the caller's responsibility by design.
private static <T> T getField(Object target, String fieldName) throws Exception {
    // Wildcard instead of a raw Class type.
    Class<?> clazz = target.getClass();

    // Note: getDeclaredField() does not search superclasses.
    Field fld = clazz.getDeclaredField(fieldName);

    fld.setAccessible(true);

    return (T)fld.get(target);
}
/**
 * Set field using reflection.
 *
 * @param target Object containing the field.
 * @param fieldName Name of the field.
 * @param val New field value.
 * @throws Exception If the field is missing or inaccessible.
 */
private static void setField(Object target, String fieldName, Object val) throws Exception {
    // Wildcard instead of a raw Class type.
    Class<?> clazz = target.getClass();

    // Note: getDeclaredField() does not search superclasses.
    Field fld = clazz.getDeclaredField(fieldName);

    fld.setAccessible(true);

    fld.set(target, val);
}
/**
 * Gets ignite configuration with persistence enabled.
 *
 * @param consistentId Consistent id to assign to the node.
 */
private IgniteConfiguration getPdsConfiguration(String consistentId) throws Exception {
    // 100 MB persistent default region.
    DataRegionConfiguration dfltRegion = new DataRegionConfiguration()
        .setMaxSize(100L * 1024L * 1024L)
        .setPersistenceEnabled(true);

    IgniteConfiguration cfg = getConfiguration()
        .setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(dfltRegion))
        .setConsistentId(consistentId);

    return cfg;
}
/**
 * Node filter delegating to {@link GridNodePredicate} for a single node.
 * The short {@code toString()} is what the NODE_FILTER view column exposes
 * (asserted in {@code testCachesViews()}).
 */
private static class TestNodeFilter extends GridNodePredicate {
/**
 * @param node Node.
 */
public TestNodeFilter(ClusterNode node) {
super(node);
}
/** {@inheritDoc} */
@Override public String toString() {
// Short name instead of the default predicate dump, for stable view assertions.
return "TestNodeFilter";
}
}
/**
 * Eviction filter that never permits eviction. Its {@code toString()} is what
 * the EVICTION_FILTER view column exposes (asserted in {@code testCachesViews()}).
 */
private static class TestEvictionFilter implements EvictionFilter<Object, Object> {
/** {@inheritDoc} */
@Override public boolean evictAllowed(Cache.Entry<Object, Object> entry) {
return false;
}
/** {@inheritDoc} */
@Override public String toString() {
return "TestEvictionFilter";
}
}
/**
 * Factory producing a no-op eviction policy. Its {@code toString()} is what the
 * EVICTION_POLICY_FACTORY view column exposes (asserted in {@code testCachesViews()}).
 */
private static class TestEvictionPolicyFactory implements Factory<EvictionPolicy<Object, Object>> {
/** {@inheritDoc} */
@Override public EvictionPolicy<Object, Object> create() {
// Anonymous class (not a lambda) so the policy stays serializable with the cache config.
return new EvictionPolicy<Object, Object>() {
@Override public void onEntryAccessed(boolean rmv, EvictableEntry<Object, Object> entry) {
// No-op.
}
};
}
/** {@inheritDoc} */
@Override public String toString() {
return "TestEvictionPolicyFactory";
}
}
/**
 * Topology validator that accepts any topology. Its {@code toString()} is what
 * the TOPOLOGY_VALIDATOR view column exposes (asserted in {@code testCachesViews()}).
 */
private static class TestTopologyValidator implements TopologyValidator {
/** {@inheritDoc} */
@Override public boolean validate(Collection<ClusterNode> nodes) {
return true;
}
/** {@inheritDoc} */
@Override public String toString() {
return "TestTopologyValidator";
}
}
/**
 * Node filter that accepts every node, but whose {@code toString()} starts throwing
 * after a configurable number of calls. Used to check that a misbehaving filter
 * representation surfaces in the NODE_FILTER view column instead of breaking the view.
 */
private static class CustomNodeFilter implements IgnitePredicate<ClusterNode> {
    /** Number of toString() calls allowed before the simulated failure. */
    private final int attemptsBeforeException;

    /** toString() invocations so far. Guarded by {@code this}. */
    private int attempts;

    /**
     * @param attemptsBeforeException Number of toString() calls allowed before the failure is simulated.
     */
    public CustomNodeFilter(int attemptsBeforeException) {
        this.attemptsBeforeException = attemptsBeforeException;
    }

    /** {@inheritDoc} */
    @Override public boolean apply(ClusterNode node) {
        return true;
    }

    /** {@inheritDoc} */
    @Override public synchronized String toString() {
        // Was a 'volatile int' post-increment, which is not atomic under concurrent
        // calls; a synchronized counter keeps the attempt count exact.
        if (attempts++ > attemptsBeforeException)
            throw new NullPointerException("Oops... incorrect customer realization.");

        return "CUSTOM_NODE_FILTER";
    }
}
}