blob: 2dfe0e164d9810001f024de90ec450f8cacde99a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.metric;
import java.sql.Connection;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteJdbcThinDriver;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.metric.AbstractExporterSpiTest;
import org.apache.ignite.internal.metric.SystemViewSelfTest.TestPredicate;
import org.apache.ignite.internal.metric.SystemViewSelfTest.TestRunnable;
import org.apache.ignite.internal.metric.SystemViewSelfTest.TestTransformer;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.service.DummyService;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.spi.metric.sql.SqlViewMetricExporterSpi;
import org.apache.ignite.spi.systemview.view.SqlSchemaView;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import static java.util.Arrays.asList;
import static org.apache.ignite.internal.metric.SystemViewSelfTest.TEST_PREDICATE;
import static org.apache.ignite.internal.metric.SystemViewSelfTest.TEST_TRANSFORMER;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheGroupId;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
import static org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest.queryProcessor;
import static org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA;
import static org.apache.ignite.internal.processors.query.QueryUtils.SCHEMA_SYS;
import static org.apache.ignite.internal.processors.query.h2.SchemaManager.SQL_SCHEMA_VIEW;
import static org.apache.ignite.internal.util.IgniteUtils.toStringSafe;
import static org.apache.ignite.internal.util.lang.GridFunc.t;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/** */
public class SqlViewExporterSpiTest extends AbstractExporterSpiTest {
/** */
private static IgniteEx ignite0;
/** */
private static IgniteEx ignite1;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setConsistentId(igniteInstanceName);
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDataRegionConfigurations(
new DataRegionConfiguration().setName("in-memory").setMaxSize(100L * 1024 * 1024))
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)));
SqlViewMetricExporterSpi sqlSpi = new SqlViewMetricExporterSpi();
if (igniteInstanceName.endsWith("1"))
sqlSpi.setExportFilter(mgrp -> !mgrp.name().startsWith(FILTERED_PREFIX));
cfg.setMetricExporterSpi(sqlSpi);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
cleanPersistenceDir();
ignite0 = startGrid(0);
ignite1 = startGrid(1);
ignite0.cluster().active(true);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
Collection<String> caches = ignite0.cacheNames();
for (String cache : caches)
ignite0.destroyCache(cache);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
stopAllGrids(true);
cleanPersistenceDir();
}
/** */
@Test
public void testEmptyFilter() throws Exception {
List<List<?>> res = execute(ignite0, "SELECT * FROM SYS.METRICS");
assertNotNull(res);
assertFalse(res.isEmpty());
}
/** */
@Test
public void testDataRegionMetrics() throws Exception {
List<List<?>> res = execute(ignite0,
"SELECT REPLACE(name, 'io.dataregion.default.'), value, description FROM SYS.METRICS");
Set<String> names = new HashSet<>();
for (List<?> row : res) {
names.add((String)row.get(0));
assertNotNull(row.get(1));
}
for (String attr : EXPECTED_ATTRIBUTES)
assertTrue(attr + " should be exporterd via SQL view", names.contains(attr));
}
/** */
@Test
public void testFilterAndExport() throws Exception {
createAdditionalMetrics(ignite1);
List<List<?>> res = execute(ignite1,
"SELECT name, value, description FROM SYS.METRICS " +
"WHERE name LIKE 'other.prefix%' OR name LIKE '" + FILTERED_PREFIX + "%'");
Set<IgniteBiTuple<String, String>> expVals = new HashSet<>(asList(
t("other.prefix.test", "42"),
t("other.prefix.test2", "43"),
t("other.prefix2.test3", "44")
));
Set<IgniteBiTuple<String, String>> vals = new HashSet<>();
for (List<?> row : res)
vals.add(t((String)row.get(0), (String)row.get(1)));
assertEquals(expVals, vals);
}
/** */
@Test
public void testCachesView() throws Exception {
Set<String> cacheNames = new HashSet<>(asList("cache-1", "cache-2"));
for (String name : cacheNames)
ignite0.createCache(name);
List<List<?>> caches = execute(ignite0, "SELECT CACHE_NAME FROM SYS.CACHES");
assertEquals(ignite0.context().cache().cacheDescriptors().size(), caches.size());
for (List<?> row : caches)
cacheNames.remove(row.get(0));
assertTrue(cacheNames.toString(), cacheNames.isEmpty());
}
/** */
@Test
public void testCacheGroupsView() throws Exception {
Set<String> grpNames = new HashSet<>(asList("grp-1", "grp-2"));
for (String grpName : grpNames)
ignite0.createCache(new CacheConfiguration<>("cache-" + grpName).setGroupName(grpName));
List<List<?>> grps = execute(ignite0, "SELECT CACHE_GROUP_NAME FROM SYS.CACHE_GROUPS");
assertEquals(ignite0.context().cache().cacheGroupDescriptors().size(), grps.size());
for (List<?> row : grps)
grpNames.remove(row.get(0));
assertTrue(grpNames.toString(), grpNames.isEmpty());
}
/** */
@Test
public void testComputeBroadcast() throws Exception {
CyclicBarrier barrier = new CyclicBarrier(6);
for (int i = 0; i < 5; i++) {
ignite0.compute().broadcastAsync(() -> {
try {
barrier.await();
barrier.await();
}
catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
}
barrier.await();
List<List<?>> tasks = execute(ignite0,
"SELECT " +
" INTERNAL, " +
" AFFINITY_CACHE_NAME, " +
" AFFINITY_PARTITION_ID, " +
" TASK_CLASS_NAME, " +
" TASK_NAME, " +
" TASK_NODE_ID, " +
" USER_VERSION " +
"FROM SYS.TASKS");
assertEquals(5, tasks.size());
List<?> t = tasks.get(0);
assertFalse((Boolean)t.get(0));
assertNull(t.get(1));
assertEquals(-1, t.get(2));
assertTrue(t.get(3).toString().startsWith(getClass().getName()));
assertTrue(t.get(4).toString().startsWith(getClass().getName()));
assertEquals(ignite0.localNode().id(), t.get(5));
assertEquals("0", t.get(6));
barrier.await();
}
/** */
@Test
public void testServices() throws Exception {
ServiceConfiguration srvcCfg = new ServiceConfiguration();
srvcCfg.setName("service");
srvcCfg.setMaxPerNodeCount(1);
srvcCfg.setService(new DummyService());
ignite0.services().deploy(srvcCfg);
List<List<?>> srvs = execute(ignite0,
"SELECT " +
" NAME, " +
" SERVICE_ID, " +
" SERVICE_CLASS, " +
" TOTAL_COUNT, " +
" MAX_PER_NODE_COUNT, " +
" CACHE_NAME, " +
" AFFINITY_KEY, " +
" NODE_FILTER, " +
" STATICALLY_CONFIGURED, " +
" ORIGIN_NODE_ID " +
"FROM SYS.SERVICES");
assertEquals(ignite0.context().service().serviceDescriptors().size(), srvs.size());
List<?> sysView = srvs.iterator().next();
assertEquals(srvcCfg.getName(), sysView.get(0));
assertEquals(DummyService.class.getName(), sysView.get(2));
assertEquals(srvcCfg.getMaxPerNodeCount(), sysView.get(4));
}
/** */
@Test
public void testClientsConnections() throws Exception {
String host = ignite0.configuration().getClientConnectorConfiguration().getHost();
if (host == null)
host = ignite0.configuration().getLocalHost();
int port = ignite0.configuration().getClientConnectorConfiguration().getPort();
try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(host + ":" + port))) {
try (Connection conn = new IgniteJdbcThinDriver().connect("jdbc:ignite:thin://" + host, new Properties())) {
List<List<?>> conns = execute(ignite0, "SELECT * FROM SYS.CLIENT_CONNECTIONS");
assertEquals(2, conns.size());
}
}
}
/** */
@Test
public void testTransactions() throws Exception {
IgniteCache<Integer, Integer> cache = ignite0.createCache(new CacheConfiguration<Integer, Integer>("c")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
assertTrue(execute(ignite0, "SELECT * FROM SYS.TRANSACTIONS").isEmpty());
CountDownLatch latch1 = new CountDownLatch(10);
CountDownLatch latch2 = new CountDownLatch(1);
AtomicInteger cntr = new AtomicInteger();
GridTestUtils.runMultiThreadedAsync(() -> {
try (Transaction tx = ignite0.transactions().withLabel("test").txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.put(cntr.incrementAndGet(), cntr.incrementAndGet());
cache.put(cntr.incrementAndGet(), cntr.incrementAndGet());
latch1.countDown();
latch2.await();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 5, "xxx");
GridTestUtils.runMultiThreadedAsync(() -> {
try (Transaction tx = ignite0.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(cntr.incrementAndGet(), cntr.incrementAndGet());
cache.put(cntr.incrementAndGet(), cntr.incrementAndGet());
latch1.countDown();
latch2.await();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 5, "yyy");
latch1.await(5, TimeUnit.SECONDS);
List<List<?>> txs = execute(ignite0, "SELECT * FROM SYS.TRANSACTIONS");
assertEquals(10, txs.size());
latch2.countDown();
boolean res = waitForCondition(() -> execute(ignite0, "SELECT * FROM SYS.TRANSACTIONS").isEmpty(), 5_000);
assertTrue(res);
}
/** */
@Test
public void testSchemas() throws Exception {
try (IgniteEx g = startGrid(new IgniteConfiguration().setSqlSchemas("MY_SCHEMA", "ANOTHER_SCHEMA"))) {
SystemView<SqlSchemaView> schemasSysView = g.context().systemView().view(SQL_SCHEMA_VIEW);
Set<String> schemaFromSysView = new HashSet<>();
schemasSysView.forEach(v -> schemaFromSysView.add(v.name()));
HashSet<String> expSchemas = new HashSet<>(asList("MY_SCHEMA", "ANOTHER_SCHEMA", "SYS", "PUBLIC"));
assertEquals(schemaFromSysView, expSchemas);
List<List<?>> schemas = execute(g, "SELECT * FROM SYS.SCHEMAS");
schemaFromSysView.clear();
schemas.forEach(s -> schemaFromSysView.add(s.get(0).toString()));
assertEquals(schemaFromSysView, expSchemas);
}
}
/** */
@Test
public void testViews() throws Exception {
Set<String> expViews = new HashSet<>(asList(
"METRICS",
"SERVICES",
"CACHE_GROUPS",
"CACHES",
"TASKS",
"SQL_QUERIES_HISTORY",
"NODES",
"SCHEMAS",
"NODE_METRICS",
"BASELINE_NODES",
"INDEXES",
"LOCAL_CACHE_GROUPS_IO",
"SQL_QUERIES",
"SCAN_QUERIES",
"NODE_ATTRIBUTES",
"TABLES",
"CLIENT_CONNECTIONS",
"VIEWS",
"TABLE_COLUMNS",
"VIEW_COLUMNS",
"TRANSACTIONS",
"CONTINUOUS_QUERIES",
"STRIPED_THREADPOOL_QUEUE",
"DATASTREAM_THREADPOOL_QUEUE",
"DATA_REGION_PAGE_LISTS",
"CACHE_GROUP_PAGE_LISTS"
));
Set<String> actViews = new HashSet<>();
List<List<?>> res = execute(ignite0, "SELECT * FROM SYS.VIEWS");
for (List<?> row : res)
actViews.add(row.get(0).toString());
assertEquals(expViews, actViews);
}
/** */
@Test
public void testTable() throws Exception {
assertTrue(execute(ignite0, "SELECT * FROM SYS.TABLES").isEmpty());
execute(ignite0, "CREATE TABLE T1(ID LONG PRIMARY KEY, NAME VARCHAR)");
List<List<?>> res = execute(ignite0, "SELECT * FROM SYS.TABLES");
assertEquals(1, res.size());
List tbl = res.get(0);
int cacheId = cacheId("SQL_PUBLIC_T1");
String cacheName = "SQL_PUBLIC_T1";
assertEquals("T1", tbl.get(0)); // TABLE_NAME
assertEquals(DFLT_SCHEMA, tbl.get(1)); // SCHEMA_NAME
assertEquals(cacheName, tbl.get(2)); // CACHE_NAME
assertEquals(cacheId, tbl.get(3)); // CACHE_ID
assertNull(tbl.get(4)); // AFFINITY_KEY_COLUMN
assertEquals("ID", tbl.get(5)); // KEY_ALIAS
assertNull(tbl.get(6)); // VALUE_ALIAS
assertEquals("java.lang.Long", tbl.get(7)); // KEY_TYPE_NAME
assertNotNull(tbl.get(8)); // VALUE_TYPE_NAME
execute(ignite0, "CREATE TABLE T2(ID LONG PRIMARY KEY, NAME VARCHAR)");
assertEquals(2, execute(ignite0, "SELECT * FROM SYS.TABLES").size());
execute(ignite0, "DROP TABLE T1");
execute(ignite0, "DROP TABLE T2");
assertTrue(execute(ignite0, "SELECT * FROM SYS.TABLES").isEmpty());
}
/** */
@Test
public void testTableColumns() throws Exception {
assertTrue(execute(ignite0, "SELECT * FROM SYS.TABLE_COLUMNS").isEmpty());
execute(ignite0, "CREATE TABLE T1(ID LONG PRIMARY KEY, NAME VARCHAR(40))");
Set<?> actCols = execute(ignite0, "SELECT * FROM SYS.TABLE_COLUMNS")
.stream()
.map(l -> l.get(0))
.collect(Collectors.toSet());
assertEquals(new HashSet<>(asList("ID", "NAME", "_KEY", "_VAL")), actCols);
execute(ignite0, "CREATE TABLE T2(ID LONG PRIMARY KEY, NAME VARCHAR(50))");
List<List<?>> expRes = asList(
asList("ID", "T1", "PUBLIC", false, false, "null", true, true, -1, -1, Long.class.getName()),
asList("NAME", "T1", "PUBLIC", false, false, "null", true, false, 40, -1, String.class.getName()),
asList("_KEY", "T1", "PUBLIC", true, false, null, false, true, -1, -1, null),
asList("_VAL", "T1", "PUBLIC", false, false, null, true, false, -1, -1, null),
asList("ID", "T2", "PUBLIC", false, false, "null", true, true, -1, -1, Long.class.getName()),
asList("NAME", "T2", "PUBLIC", false, false, "null", true, false, 50, -1, String.class.getName()),
asList("_KEY", "T2", "PUBLIC", true, false, null, false, true, -1, -1, null),
asList("_VAL", "T2", "PUBLIC", false, false, null, true, false, -1, -1, null)
);
List<List<?>> res = execute(ignite0, "SELECT * FROM SYS.TABLE_COLUMNS ORDER BY TABLE_NAME, COLUMN_NAME");
assertEquals(expRes, res);
execute(ignite0, "DROP TABLE T1");
execute(ignite0, "DROP TABLE T2");
assertTrue(execute(ignite0, "SELECT * FROM SYS.TABLE_COLUMNS").isEmpty());
}
/** */
@Test
public void testViewColumns() throws Exception {
execute(ignite0, "SELECT * FROM SYS.VIEW_COLUMNS");
List<List<?>> expRes = asList(
asList("CONNECTION_ID", "CLIENT_CONNECTIONS", SCHEMA_SYS, "null", true, 19L, 0, Long.class.getName()),
asList("LOCAL_ADDRESS", "CLIENT_CONNECTIONS", SCHEMA_SYS, "null", true, (long)Integer.MAX_VALUE, 0,
String.class.getName()),
asList("REMOTE_ADDRESS", "CLIENT_CONNECTIONS", SCHEMA_SYS, "null", true, (long)Integer.MAX_VALUE, 0,
String.class.getName()),
asList("TYPE", "CLIENT_CONNECTIONS", SCHEMA_SYS, "null", true, (long)Integer.MAX_VALUE, 0,
String.class.getName()),
asList("USER", "CLIENT_CONNECTIONS", SCHEMA_SYS, "null", true, (long)Integer.MAX_VALUE, 0,
String.class.getName()),
asList("VERSION", "CLIENT_CONNECTIONS", SCHEMA_SYS, "null", true, (long)Integer.MAX_VALUE, 0,
String.class.getName())
);
List<List<?>> res = execute(ignite0, "SELECT * FROM SYS.VIEW_COLUMNS WHERE VIEW_NAME = 'CLIENT_CONNECTIONS'");
assertEquals(expRes, res);
}
/** */
@Test
public void testContinuousQuery() throws Exception {
IgniteCache<Integer, Integer> cache = ignite0.createCache("cache-1");
assertTrue(execute(ignite0, "SELECT * FROM SYS.CONTINUOUS_QUERIES").isEmpty());
assertTrue(execute(ignite1, "SELECT * FROM SYS.CONTINUOUS_QUERIES").isEmpty());
try (QueryCursor qry = cache.query(new ContinuousQuery<>()
.setInitialQuery(new ScanQuery<>())
.setPageSize(100)
.setTimeInterval(1000)
.setLocalListener(evts -> {
// No-op.
})
.setRemoteFilterFactory(() -> evt -> true)
)) {
for (int i = 0; i < 100; i++)
cache.put(i, i);
checkContinuouQueryView(ignite0, true);
checkContinuouQueryView(ignite1, false);
}
assertTrue(execute(ignite0, "SELECT * FROM SYS.CONTINUOUS_QUERIES").isEmpty());
assertTrue(execute(ignite1, "SELECT * FROM SYS.CONTINUOUS_QUERIES").isEmpty());
}
/** */
private void checkContinuouQueryView(IgniteEx g, boolean loc) {
List<List<?>> qrys = execute(g,
"SELECT " +
" CACHE_NAME, " +
" BUFFER_SIZE, " +
" INTERVAL, " +
" NODE_ID, " +
" LOCAL_LISTENER, " +
" REMOTE_FILTER, " +
" LOCAL_TRANSFORMED_LISTENER, " +
" REMOTE_TRANSFORMER " +
"FROM SYS.CONTINUOUS_QUERIES");
assertEquals(1, qrys.size());
List<?> cq = qrys.iterator().next();
assertEquals("cache-1", cq.get(0));
assertEquals(100, cq.get(1));
assertEquals(1000L, cq.get(2));
assertEquals(ignite0.localNode().id(), cq.get(3));
if (loc)
assertTrue(cq.get(4).toString().startsWith(getClass().getName()));
else
assertNull(cq.get(4));
assertTrue(cq.get(5).toString().startsWith(getClass().getName()));
assertNull(cq.get(6));
assertNull(cq.get(7));
}
/** */
private static final String SCAN_QRY_SELECT = "SELECT " +
" ORIGIN_NODE_ID," +
" QUERY_ID," +
" CACHE_NAME," +
" CACHE_ID," +
" CACHE_GROUP_ID," +
" CACHE_GROUP_NAME," +
" START_TIME," +
" DURATION," +
" CANCELED," +
" FILTER," +
" LOCAL," +
" PARTITION," +
" TOPOLOGY," +
" TRANSFORMER," +
" KEEP_BINARY," +
" SUBJECT_ID," +
" TASK_NAME, " +
" PAGE_SIZE" +
" FROM SYS.SCAN_QUERIES";
/** */
@Test
public void testLocalScanQuery() throws Exception {
IgniteCache<Integer, Integer> cache1 = ignite0.createCache(
new CacheConfiguration<Integer, Integer>("cache1")
.setGroupName("group1"));
int part = ignite0.affinity("cache1").primaryPartitions(ignite0.localNode())[0];
List<Integer> partKeys = partitionKeys(cache1, part, 11, 0);
for (Integer key : partKeys)
cache1.put(key, key);
assertEquals(0, execute(ignite0, SCAN_QRY_SELECT).size());
QueryCursor<Integer> qryRes1 = cache1.query(
new ScanQuery<Integer, Integer>()
.setFilter(new TestPredicate())
.setLocal(true)
.setPartition(part)
.setPageSize(10),
new TestTransformer());
assertTrue(qryRes1.iterator().hasNext());
boolean res = waitForCondition(() -> !execute(ignite0, SCAN_QRY_SELECT).isEmpty(), 5_000);
assertTrue(res);
List<?> view = execute(ignite0, SCAN_QRY_SELECT).get(0);
assertEquals(ignite0.localNode().id(), view.get(0));
assertEquals(0L, view.get(1));
assertEquals("cache1", view.get(2));
assertEquals(cacheId("cache1"), view.get(3));
assertEquals(cacheGroupId("cache1", "group1"), view.get(4));
assertEquals("group1", view.get(5));
assertTrue((Long)view.get(6) <= System.currentTimeMillis());
assertTrue((Long)view.get(7) >= 0);
assertFalse((Boolean)view.get(8));
assertEquals(TEST_PREDICATE, view.get(9));
assertTrue((Boolean)view.get(10));
assertEquals(part, view.get(11));
assertEquals(toStringSafe(ignite0.context().discovery().topologyVersionEx()), view.get(12));
assertEquals(TEST_TRANSFORMER, view.get(13));
assertFalse((Boolean)view.get(14));
assertNull(view.get(15));
assertNull(view.get(16));
qryRes1.close();
res = waitForCondition(() -> execute(ignite0, SCAN_QRY_SELECT).isEmpty(), 5_000);
assertTrue(res);
}
/** */
@Test
public void testScanQuery() throws Exception {
try(IgniteEx client1 = startClientGrid("client-1");
IgniteEx client2 = startClientGrid("client-2")) {
IgniteCache<Integer, Integer> cache1 = client1.createCache(
new CacheConfiguration<Integer, Integer>("cache1")
.setGroupName("group1"));
IgniteCache<Integer, Integer> cache2 = client2.createCache("cache2");
for (int i = 0; i < 100; i++) {
cache1.put(i, i);
cache2.put(i, i);
}
assertEquals(0, execute(ignite0, SCAN_QRY_SELECT).size());
assertEquals(0, execute(ignite1, SCAN_QRY_SELECT).size());
QueryCursor<Integer> qryRes1 = cache1.query(
new ScanQuery<Integer, Integer>()
.setFilter(new TestPredicate())
.setPageSize(10),
new TestTransformer());
QueryCursor<?> qryRes2 = cache2.withKeepBinary().query(new ScanQuery<>()
.setPageSize(20));
assertTrue(qryRes1.iterator().hasNext());
assertTrue(qryRes2.iterator().hasNext());
checkScanQueryView(client1, client2, ignite0);
checkScanQueryView(client1, client2, ignite1);
qryRes1.close();
qryRes2.close();
boolean res = waitForCondition(
() -> execute(ignite0, SCAN_QRY_SELECT).size() + execute(ignite1, SCAN_QRY_SELECT).size() == 0, 5_000);
assertTrue(res);
}
}
/** */
private void checkScanQueryView(IgniteEx client1, IgniteEx client2,
IgniteEx server) throws Exception {
boolean res = waitForCondition(() -> execute(server, SCAN_QRY_SELECT).size() > 1, 5_000);
assertTrue(res);
Consumer<List<?>> cache1checker = view -> {
assertEquals(client1.localNode().id(), view.get(0));
assertTrue((Long)view.get(1) != 0);
assertEquals("cache1", view.get(2));
assertEquals(cacheId("cache1"), view.get(3));
assertEquals(cacheGroupId("cache1", "group1"), view.get(4));
assertEquals("group1", view.get(5));
assertTrue((Long)view.get(6) <= System.currentTimeMillis());
assertTrue((Long)view.get(7) >= 0);
assertFalse((Boolean)view.get(8));
assertEquals(TEST_PREDICATE, view.get(9));
assertFalse((Boolean)view.get(10));
assertEquals(-1, view.get(11));
assertEquals(toStringSafe(client1.context().discovery().topologyVersionEx()), view.get(12));
assertEquals(TEST_TRANSFORMER, view.get(13));
assertFalse((Boolean)view.get(14));
assertNull(view.get(15));
assertNull(view.get(16));
assertEquals(10, view.get(17));
};
Consumer<List<?>> cache2checker = view -> {
assertEquals(client2.localNode().id(), view.get(0));
assertTrue((Long)view.get(1) != 0);
assertEquals("cache2", view.get(2));
assertEquals(cacheId("cache2"), view.get(3));
assertEquals(cacheGroupId("cache2", null), view.get(4));
assertEquals("cache2", view.get(5));
assertTrue((Long)view.get(6) <= System.currentTimeMillis());
assertTrue((Long)view.get(7) >= 0);
assertFalse((Boolean)view.get(8));
assertNull(view.get(9));
assertFalse((Boolean)view.get(10));
assertEquals(-1, view.get(11));
assertEquals(toStringSafe(client2.context().discovery().topologyVersionEx()), view.get(12));
assertNull(view.get(13));
assertTrue((Boolean)view.get(14));
assertNull(view.get(15));
assertNull(view.get(16));
assertEquals(20, view.get(17));
};
boolean found1 = false;
boolean found2 = false;
for (List<?> view : execute(server, SCAN_QRY_SELECT)) {
if ("cache2".equals(view.get(2))) {
cache2checker.accept(view);
found1 = true;
}
else {
cache1checker.accept(view);
found2 = true;
}
}
assertTrue(found1 && found2);
}
/** */
@Test
public void testStripedExecutor() throws Exception {
checkStripeExecutorView(ignite0.context().getStripedExecutorService(),
"STRIPED_THREADPOOL_QUEUE",
"sys");
}
/** */
@Test
public void testStreamerExecutor() throws Exception {
checkStripeExecutorView(ignite0.context().getDataStreamerExecutorService(),
"DATASTREAM_THREADPOOL_QUEUE",
"data-streamer");
}
/**
* Checks striped executor system view.
*
* @param execSvc Striped executor.
* @param view System view name.
* @param poolName Executor name.
*/
private void checkStripeExecutorView(StripedExecutor execSvc, String view, String poolName) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
execSvc.execute(0, new TestRunnable(latch, 0));
execSvc.execute(0, new TestRunnable(latch, 1));
execSvc.execute(1, new TestRunnable(latch, 2));
execSvc.execute(1, new TestRunnable(latch, 3));
try {
boolean res = waitForCondition(() -> execute(ignite0, "SELECT * FROM SYS." + view).size() == 2, 5_000);
assertTrue(res);
List<List<?>> stripedQueue = execute(ignite0, "SELECT * FROM SYS." + view);
List<?> row0 = stripedQueue.get(0);
assertEquals(0, row0.get(0));
assertEquals(TestRunnable.class.getSimpleName() + '1', row0.get(1));
assertEquals(poolName + "-stripe-0", row0.get(2));
assertEquals(TestRunnable.class.getName(), row0.get(3));
List<?> row1 = stripedQueue.get(1);
assertEquals(1, row1.get(0));
assertEquals(TestRunnable.class.getSimpleName() + '3', row1.get(1));
assertEquals(poolName + "-stripe-1", row1.get(2));
assertEquals(TestRunnable.class.getName(), row1.get(3));
}
finally {
latch.countDown();
}
}
/** */
@Test
public void testPagesList() throws Exception {
String cacheName = "cacheFL";
IgniteCache<Integer, byte[]> cache = ignite0.getOrCreateCache(new CacheConfiguration<Integer, byte[]>()
.setName(cacheName).setAffinity(new RendezvousAffinityFunction().setPartitions(1)));
GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ignite0.context().cache().context()
.database();
int pageSize = dbMgr.pageSize();
try {
dbMgr.enableCheckpoints(false).get();
int key = 0;
// Fill up different free-list buckets.
for (int j = 0; j < pageSize / 2; j++)
cache.put(key++, new byte[j + 1]);
// Put some pages to one bucket to overflow pages cache.
for (int j = 0; j < 1000; j++)
cache.put(key++, new byte[pageSize / 2]);
// Test filtering by 3 columns.
assertFalse(execute(ignite0, "SELECT * FROM SYS.CACHE_GROUP_PAGE_LISTS WHERE BUCKET_NUMBER = 0 " +
"AND PARTITION_ID = 0 AND CACHE_GROUP_ID = ?", cacheId(cacheName)).isEmpty());
// Test filtering with invalid cache group id.
assertTrue(execute(ignite0, "SELECT * FROM SYS.CACHE_GROUP_PAGE_LISTS WHERE CACHE_GROUP_ID = ?", -1)
.isEmpty());
// Test filtering with invalid partition id.
assertTrue(execute(ignite0, "SELECT * FROM SYS.CACHE_GROUP_PAGE_LISTS WHERE PARTITION_ID = ?", -1)
.isEmpty());
// Test filtering with invalid bucket number.
assertTrue(execute(ignite0, "SELECT * FROM SYS.CACHE_GROUP_PAGE_LISTS WHERE BUCKET_NUMBER = -1")
.isEmpty());
assertFalse(execute(ignite0, "SELECT * FROM SYS.CACHE_GROUP_PAGE_LISTS WHERE BUCKET_SIZE > 0 " +
"AND CACHE_GROUP_ID = ?", cacheId(cacheName)).isEmpty());
assertFalse(execute(ignite0, "SELECT * FROM SYS.CACHE_GROUP_PAGE_LISTS WHERE STRIPES_COUNT > 0 " +
"AND CACHE_GROUP_ID = ?", cacheId(cacheName)).isEmpty());
assertFalse(execute(ignite0, "SELECT * FROM SYS.CACHE_GROUP_PAGE_LISTS WHERE CACHED_PAGES_COUNT > 0 " +
"AND CACHE_GROUP_ID = ?", cacheId(cacheName)).isEmpty());
assertFalse(execute(ignite0, "SELECT * FROM SYS.DATA_REGION_PAGE_LISTS WHERE NAME LIKE 'in-memory%'")
.isEmpty());
assertEquals(0L, execute(ignite0, "SELECT COUNT(*) FROM SYS.DATA_REGION_PAGE_LISTS " +
"WHERE NAME LIKE 'in-memory%' AND BUCKET_SIZE > 0").get(0).get(0));
}
finally {
dbMgr.enableCheckpoints(true).get();
}
ignite0.cluster().active(false);
ignite0.cluster().active(true);
IgniteCache<Integer, Integer> cacheInMemory = ignite0.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
.setName("cacheFLInMemory").setDataRegionName("in-memory"));
cacheInMemory.put(0, 0);
// After activation/deactivation new view for data region pages lists should be created, check that new view
// correctly reflects changes in free-lists.
assertFalse(execute(ignite0, "SELECT * FROM SYS.DATA_REGION_PAGE_LISTS WHERE NAME LIKE 'in-memory%' AND " +
"BUCKET_SIZE > 0").isEmpty());
}
/**
* Execute query on given node.
*
* @param node Node.
* @param sql Statement.
*/
private List<List<?>> execute(Ignite node, String sql, Object... args) {
SqlFieldsQuery qry = new SqlFieldsQuery(sql)
.setArgs(args)
.setSchema("PUBLIC");
return queryProcessor(node).querySqlFields(qry, true).getAll();
}
}