blob: 49cb8fa03522d24b99e8b09cbeb9e39977e5e0c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.metric;
import java.lang.management.ManagementFactory;
import java.sql.Connection;
import java.text.DateFormat;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.management.DynamicMBean;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanFeatureInfo;
import javax.management.MBeanOperationInfo;
import javax.management.MBeanParameterInfo;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularDataSupport;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteJdbcThinDriver;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.client.thin.ProtocolVersion;
import org.apache.ignite.internal.managers.systemview.walker.CachePagesListViewWalker;
import org.apache.ignite.internal.metric.SystemViewSelfTest.TestPredicate;
import org.apache.ignite.internal.metric.SystemViewSelfTest.TestRunnable;
import org.apache.ignite.internal.metric.SystemViewSelfTest.TestTransformer;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext;
import org.apache.ignite.internal.processors.service.DummyService;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.RunnableX;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.managers.systemview.GridSystemViewManager.STREAM_POOL_QUEUE_VIEW;
import static org.apache.ignite.internal.managers.systemview.GridSystemViewManager.SYS_POOL_QUEUE_VIEW;
import static org.apache.ignite.internal.managers.systemview.ScanQuerySystemView.SCAN_QRY_SYS_VIEW;
import static org.apache.ignite.internal.metric.SystemViewSelfTest.TEST_PREDICATE;
import static org.apache.ignite.internal.metric.SystemViewSelfTest.TEST_TRANSFORMER;
import static org.apache.ignite.internal.processors.cache.CacheMetricsImpl.CACHE_METRICS;
import static org.apache.ignite.internal.processors.cache.ClusterCachesInfo.CACHES_VIEW;
import static org.apache.ignite.internal.processors.cache.ClusterCachesInfo.CACHE_GRPS_VIEW;
import static org.apache.ignite.internal.processors.cache.GridCacheProcessor.CACHE_GRP_PAGE_LIST_VIEW;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheGroupId;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.TXS_MON_LIST;
import static org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.CQ_SYS_VIEW;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.CPU_LOAD;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.CPU_LOAD_DESCRIPTION;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.GC_CPU_LOAD;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.GC_CPU_LOAD_DESCRIPTION;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.IGNITE_METRICS;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLI_CONN_VIEW;
import static org.apache.ignite.internal.processors.service.IgniteServiceProcessor.SVCS_VIEW;
import static org.apache.ignite.internal.processors.task.GridTaskProcessor.TASKS_VIEW;
import static org.apache.ignite.internal.util.IgniteUtils.toStringSafe;
import static org.apache.ignite.spi.metric.jmx.MetricRegistryMBean.searchHistogram;
import static org.apache.ignite.spi.systemview.jmx.SystemViewMBean.FILTER_OPERATION;
import static org.apache.ignite.spi.systemview.jmx.SystemViewMBean.VIEWS;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
import static org.apache.ignite.transactions.TransactionState.ACTIVE;
/** */
public class JmxExporterSpiTest extends AbstractExporterSpiTest {
/** */
private static IgniteEx ignite;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)));
JmxMetricExporterSpi jmxSpi = new JmxMetricExporterSpi();
jmxSpi.setExportFilter(mgrp -> !mgrp.name().startsWith(FILTERED_PREFIX));
cfg.setMetricExporterSpi(jmxSpi);
cfg.setClientMode(igniteInstanceName.startsWith("client"));
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
cleanPersistenceDir();
ignite = startGrid(0);
ignite.cluster().active(true);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
Collection<String> caches = ignite.cacheNames();
for (String cache : caches)
ignite.destroyCache(cache);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
stopAllGrids(true);
cleanPersistenceDir();
}
/** */
@Test
public void testSysJmxMetrics() throws Exception {
DynamicMBean sysMBean = metricRegistry(ignite.name(), null, SYS_METRICS);
Set<String> res = stream(sysMBean.getMBeanInfo().getAttributes())
.map(MBeanFeatureInfo::getName)
.collect(toSet());
assertTrue(res.contains(CPU_LOAD));
assertTrue(res.contains(GC_CPU_LOAD));
assertTrue(res.contains(metricName("memory", "heap", "init")));
assertTrue(res.contains(metricName("memory", "heap", "used")));
assertTrue(res.contains(metricName("memory", "nonheap", "committed")));
assertTrue(res.contains(metricName("memory", "nonheap", "max")));
Optional<MBeanAttributeInfo> cpuLoad = stream(sysMBean.getMBeanInfo().getAttributes())
.filter(a -> a.getName().equals(CPU_LOAD))
.findFirst();
assertTrue(cpuLoad.isPresent());
assertEquals(CPU_LOAD_DESCRIPTION, cpuLoad.get().getDescription());
Optional<MBeanAttributeInfo> gcCpuLoad = stream(sysMBean.getMBeanInfo().getAttributes())
.filter(a -> a.getName().equals(GC_CPU_LOAD))
.findFirst();
assertTrue(gcCpuLoad.isPresent());
assertEquals(GC_CPU_LOAD_DESCRIPTION, gcCpuLoad.get().getDescription());
}
/** */
@Test
public void testDataRegionJmxMetrics() throws Exception {
DynamicMBean dataRegionMBean = metricRegistry(ignite.name(), "io", "dataregion.default");
Set<String> res = stream(dataRegionMBean.getMBeanInfo().getAttributes())
.map(MBeanFeatureInfo::getName)
.collect(toSet());
assertTrue(res.containsAll(EXPECTED_ATTRIBUTES));
for (String metricName : res)
assertNotNull(metricName, dataRegionMBean.getAttribute(metricName));
}
/** */
@Test
public void testUnregisterRemovedRegistry() throws Exception {
String n = "cache-for-remove";
IgniteCache c = ignite.createCache(n);
DynamicMBean cacheBean = mbean(ignite, CACHE_METRICS, n);
assertNotNull(cacheBean);
ignite.destroyCache(n);
assertThrowsWithCause(() -> mbean(ignite, CACHE_METRICS, n), IgniteException.class);
}
/** */
@Test
public void testFilterAndExport() throws Exception {
createAdditionalMetrics(ignite);
assertThrowsWithCause(new RunnableX() {
@Override public void runx() throws Exception {
metricRegistry(ignite.name(), "filtered", "metric");
}
}, IgniteException.class);
DynamicMBean bean1 = metricRegistry(ignite.name(), "other", "prefix");
assertEquals(42L, bean1.getAttribute("test"));
assertEquals(43L, bean1.getAttribute("test2"));
DynamicMBean bean2 = metricRegistry(ignite.name(), "other", "prefix2");
assertEquals(44L, bean2.getAttribute("test3"));
}
/** */
@Test
public void testCachesView() throws Exception {
Set<String> cacheNames = new HashSet<>(Arrays.asList("cache-1", "cache-2"));
for (String name : cacheNames)
ignite.createCache(name);
TabularDataSupport data = systemView(CACHES_VIEW);
assertEquals(ignite.context().cache().cacheDescriptors().size(), data.size());
for (int i = 0; i < data.size(); i++) {
CompositeData row = data.get(new Object[] {i});
cacheNames.remove(row.get("cacheName"));
}
assertTrue(cacheNames.toString(), cacheNames.isEmpty());
}
/** */
@Test
public void testCacheGroupsView() throws Exception {
Set<String> grpNames = new HashSet<>(Arrays.asList("grp-1", "grp-2"));
for (String grpName : grpNames)
ignite.createCache(new CacheConfiguration<>("cache-" + grpName).setGroupName(grpName));
TabularDataSupport grps = systemView(CACHE_GRPS_VIEW);
assertEquals(ignite.context().cache().cacheGroupDescriptors().size(), grps.size());
for (Map.Entry entry : grps.entrySet()) {
CompositeData row = (CompositeData)entry.getValue();
grpNames.remove(row.get("cacheGroupName"));
}
assertTrue(grpNames.toString(), grpNames.isEmpty());
}
/** */
@Test
public void testServices() throws Exception {
ServiceConfiguration srvcCfg = new ServiceConfiguration();
srvcCfg.setName("service");
srvcCfg.setMaxPerNodeCount(1);
srvcCfg.setService(new DummyService());
ignite.services().deploy(srvcCfg);
TabularDataSupport srvs = systemView(SVCS_VIEW);
assertEquals(ignite.context().service().serviceDescriptors().size(), srvs.size());
CompositeData sysView = srvs.get(new Object[] {0});
assertEquals(srvcCfg.getName(), sysView.get("name"));
assertEquals(srvcCfg.getMaxPerNodeCount(), sysView.get("maxPerNodeCount"));
assertEquals(DummyService.class.getName(), sysView.get("serviceClass"));
}
/** */
@Test
public void testComputeBroadcast() throws Exception {
CyclicBarrier barrier = new CyclicBarrier(6);
for (int i = 0; i < 5; i++) {
ignite.compute().broadcastAsync(() -> {
try {
barrier.await();
barrier.await();
}
catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
}
barrier.await();
TabularDataSupport tasks = systemView(TASKS_VIEW);
assertEquals(5, tasks.size());
CompositeData t = tasks.get(new Object[] {0});
assertFalse((Boolean)t.get("internal"));
assertNull(t.get("affinityCacheName"));
assertEquals(-1, t.get("affinityPartitionId"));
assertTrue(t.get("taskClassName").toString().startsWith(getClass().getName()));
assertTrue(t.get("taskName").toString().startsWith(getClass().getName()));
assertEquals(ignite.localNode().id().toString(), t.get("taskNodeId"));
assertEquals("0", t.get("userVersion"));
barrier.await();
}
/** */
@Test
public void testClientsConnections() throws Exception {
String host = ignite.configuration().getClientConnectorConfiguration().getHost();
if (host == null)
host = ignite.configuration().getLocalHost();
int port = ignite.configuration().getClientConnectorConfiguration().getPort();
try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(host + ":" + port))) {
try (Connection conn = new IgniteJdbcThinDriver().connect("jdbc:ignite:thin://" + host, new Properties())) {
TabularDataSupport conns = systemView(CLI_CONN_VIEW);
Consumer<CompositeData> checkThin = c -> {
assertEquals("THIN", c.get("type"));
assertTrue(c.get("localAddress").toString().endsWith(Integer.toString(port)));
assertEquals(c.get("version"), ProtocolVersion.CURRENT_VER.toString());
};
Consumer<CompositeData> checkJdbc = c -> {
assertEquals("JDBC", c.get("type"));
assertTrue(c.get("localAddress").toString().endsWith(Integer.toString(port)));
assertEquals(c.get("version"), JdbcConnectionContext.CURRENT_VER.asString());
};
CompositeData c0 = conns.get(new Object[] {0});
CompositeData c1 = conns.get(new Object[] {1});
if (c0.get("type").equals("JDBC")) {
checkJdbc.accept(c0);
checkThin.accept(c1);
}
else {
checkJdbc.accept(c1);
checkThin.accept(c0);
}
assertEquals(2, conns.size());
}
}
boolean res = GridTestUtils.waitForCondition(() -> systemView(CLI_CONN_VIEW).isEmpty(), 5_000);
assertTrue(res);
}
/** */
@Test
public void testContinuousQuery() throws Exception {
try (IgniteEx remoteNode = startGrid(1)) {
IgniteCache<Integer, Integer> cache = ignite.createCache("cache-1");
assertEquals(0, systemView(CQ_SYS_VIEW).size());
assertEquals(0, systemView(remoteNode, CQ_SYS_VIEW).size());
try (QueryCursor qry = cache.query(new ContinuousQuery<>()
.setInitialQuery(new ScanQuery<>())
.setPageSize(100)
.setTimeInterval(1000)
.setLocalListener(evts -> {
// No-op.
})
.setRemoteFilterFactory(() -> evt -> true)
)) {
for (int i = 0; i < 100; i++)
cache.put(i, i);
checkContinuousQueryView(ignite, ignite);
checkContinuousQueryView(ignite, remoteNode);
}
assertEquals(0, systemView(CQ_SYS_VIEW).size());
assertEquals(0, systemView(remoteNode, CQ_SYS_VIEW).size());
}
}
/** */
private void checkContinuousQueryView(IgniteEx origNode, IgniteEx checkNode) {
TabularDataSupport qrys = systemView(checkNode, CQ_SYS_VIEW);
assertEquals(1, qrys.size());
for (int i = 0; i < qrys.size(); i++) {
CompositeData cq = qrys.get(new Object[] {i});
assertEquals("cache-1", cq.get("cacheName"));
assertEquals(100, cq.get("bufferSize"));
assertEquals(1000L, cq.get("interval"));
assertEquals(origNode.localNode().id().toString(), cq.get("nodeId"));
if (origNode.localNode().id().equals(checkNode.localNode().id()))
assertTrue(cq.get("localListener").toString().startsWith(getClass().getName()));
else
assertNull(cq.get("localListener"));
assertTrue(cq.get("remoteFilter").toString().startsWith(getClass().getName()));
assertNull(cq.get("localTransformedListener"));
assertNull(cq.get("remoteTransformer"));
}
}
/** */
public TabularDataSupport systemView(String name) {
return systemView(ignite, name);
}
/** */
public TabularDataSupport systemView(IgniteEx g, String name) {
try {
DynamicMBean caches = mbean(g, VIEWS, name);
MBeanAttributeInfo[] attrs = caches.getMBeanInfo().getAttributes();
assertEquals(1, attrs.length);
return (TabularDataSupport)caches.getAttribute(VIEWS);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
/** */
public TabularDataSupport filteredSystemView(IgniteEx g, String name, Map<String, Object> filter) {
try {
DynamicMBean mbean = mbean(g, VIEWS, name);
MBeanOperationInfo[] opers = mbean.getMBeanInfo().getOperations();
assertEquals(1, opers.length);
assertEquals(FILTER_OPERATION, opers[0].getName());
MBeanParameterInfo[] paramInfo = opers[0].getSignature();
Object params[] = new Object[paramInfo.length];
String signature[] = new String[paramInfo.length];
for (int i = 0; i < paramInfo.length; i++) {
params[i] = filter.get(paramInfo[i].getName());
signature[i] = paramInfo[i].getType();
}
return (TabularDataSupport)mbean.invoke(FILTER_OPERATION, params, signature);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
/** */
public static DynamicMBean mbean(IgniteEx g, String grp, String name) throws MalformedObjectNameException {
ObjectName mbeanName = U.makeMBeanName(g.name(), grp, name);
MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer();
if (!mbeanSrv.isRegistered(mbeanName))
throw new IgniteException("MBean not registered.");
return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, DynamicMBean.class, false);
}
/** */
@Test
public void testHistogramSearchByName() throws Exception {
MetricRegistry mreg = new MetricRegistry("test", name -> null, name -> null, null);
createTestHistogram(mreg);
assertEquals(Long.valueOf(1), searchHistogram("histogram_0_50", mreg));
assertEquals(Long.valueOf(2), searchHistogram("histogram_50_500", mreg));
assertEquals(Long.valueOf(3), searchHistogram("histogram_500_inf", mreg));
assertNull(searchHistogram("unknown", mreg));
assertNull(searchHistogram("unknown_0", mreg));
assertNull(searchHistogram("unknown_0_50", mreg));
assertNull(searchHistogram("unknown_test", mreg));
assertNull(searchHistogram("unknown_test_test", mreg));
assertNull(searchHistogram("unknown_0_inf", mreg));
assertNull(searchHistogram("histogram", mreg));
assertNull(searchHistogram("histogram_0", mreg));
assertNull(searchHistogram("histogram_0_100", mreg));
assertNull(searchHistogram("histogram_0_inf", mreg));
assertNull(searchHistogram("histogram_0_500", mreg));
}
/** */
@Test
public void testHistogramExport() throws Exception {
MetricRegistry mreg = ignite.context().metric().registry("histogramTest");
createTestHistogram(mreg);
DynamicMBean bean = metricRegistry(ignite.name(), null, "histogramTest");
MBeanAttributeInfo[] attrs = bean.getMBeanInfo().getAttributes();
assertEquals(3, attrs.length);
assertEquals(1L, bean.getAttribute("histogram_0_50"));
assertEquals(2L, bean.getAttribute("histogram_50_500"));
assertEquals(3L, bean.getAttribute("histogram_500_inf"));
}
/** */
@Test
public void testTransactions() throws Exception {
IgniteCache<Integer, Integer> cache = ignite.createCache(new CacheConfiguration<Integer, Integer>("c")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));
assertEquals(0, systemView(TXS_MON_LIST).size());
CountDownLatch latch = new CountDownLatch(1);
try {
AtomicInteger cntr = new AtomicInteger();
GridTestUtils.runMultiThreadedAsync(() -> {
try (Transaction tx = ignite.transactions().withLabel("test").txStart(PESSIMISTIC, REPEATABLE_READ)) {
cache.put(cntr.incrementAndGet(), cntr.incrementAndGet());
cache.put(cntr.incrementAndGet(), cntr.incrementAndGet());
latch.await();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 5, "xxx");
boolean res = waitForCondition(() -> systemView(TXS_MON_LIST).size() == 5, 10_000L);
assertTrue(res);
CompositeData txv = systemView(TXS_MON_LIST).get(new Object[] {0});
assertEquals(ignite.localNode().id().toString(), txv.get("localNodeId"));
assertEquals(REPEATABLE_READ.name(), txv.get("isolation"));
assertEquals(PESSIMISTIC.name(), txv.get("concurrency"));
assertEquals(ACTIVE.name(), txv.get("state"));
assertNotNull(txv.get("xid"));
assertFalse((boolean)txv.get("system"));
assertFalse((boolean)txv.get("implicit"));
assertFalse((boolean)txv.get("implicitSingle"));
assertTrue((boolean)txv.get("near"));
assertFalse((boolean)txv.get("dht"));
assertTrue((boolean)txv.get("colocated"));
assertTrue((boolean)txv.get("local"));
assertEquals("test", txv.get("label"));
assertFalse((boolean)txv.get("onePhaseCommit"));
assertFalse((boolean)txv.get("internal"));
assertEquals(0L, txv.get("timeout"));
assertTrue(((long)txv.get("startTime")) <= System.currentTimeMillis());
//Only pessimistic transactions are supported when MVCC is enabled.
if(Objects.equals(System.getProperty(IgniteSystemProperties.IGNITE_FORCE_MVCC_MODE_IN_TESTS), "true"))
return;
GridTestUtils.runMultiThreadedAsync(() -> {
try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
cache.put(cntr.incrementAndGet(), cntr.incrementAndGet());
cache.put(cntr.incrementAndGet(), cntr.incrementAndGet());
latch.await();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 5, "xxx");
res = waitForCondition(() -> systemView(TXS_MON_LIST).size() == 10, 10_000L);
assertTrue(res);
for (int i=0; i<9; i++) {
txv = systemView(TXS_MON_LIST).get(new Object[] {i});
if (PESSIMISTIC.name().equals(txv.get("concurrency")))
continue;
assertEquals(ignite.localNode().id().toString(), txv.get("localNodeId"));
assertEquals(SERIALIZABLE.name(), txv.get("isolation"));
assertEquals(OPTIMISTIC.name(), txv.get("concurrency"));
assertEquals(ACTIVE.name(), txv.get("state"));
assertNotNull(txv.get("xid"));
assertFalse((boolean)txv.get("system"));
assertFalse((boolean)txv.get("implicit"));
assertFalse((boolean)txv.get("implicitSingle"));
assertTrue((boolean)txv.get("near"));
assertFalse((boolean)txv.get("dht"));
assertTrue((boolean)txv.get("colocated"));
assertTrue((boolean)txv.get("local"));
assertNull(txv.get("label"));
assertFalse((boolean)txv.get("onePhaseCommit"));
assertFalse((boolean)txv.get("internal"));
assertEquals(0L, txv.get("timeout"));
assertTrue(((long)txv.get("startTime")) <= System.currentTimeMillis());
}
}
finally {
latch.countDown();
}
boolean res = waitForCondition(() -> systemView(TXS_MON_LIST).isEmpty(), 10_000L);
assertTrue(res);
}
/** */
@Test
public void testLocalScanQuery() throws Exception {
IgniteCache<Integer, Integer> cache1 = ignite.createCache(
new CacheConfiguration<Integer, Integer>("cache1")
.setGroupName("group1"));
int part = ignite.affinity("cache1").primaryPartitions(ignite.localNode())[0];
List<Integer> partKeys = partitionKeys(cache1, part, 11, 0);
for (Integer key : partKeys)
cache1.put(key, key);
TabularDataSupport qrySysView0 = systemView(SCAN_QRY_SYS_VIEW);
assertNotNull(qrySysView0);
assertEquals(0, qrySysView0.size());
QueryCursor<Integer> qryRes1 = cache1.query(
new ScanQuery<Integer, Integer>()
.setFilter(new TestPredicate())
.setLocal(true)
.setPartition(part)
.setPageSize(10),
new TestTransformer());
assertTrue(qryRes1.iterator().hasNext());
boolean res = waitForCondition(() -> !systemView(SCAN_QRY_SYS_VIEW).isEmpty(), 5_000);
assertTrue(res);
CompositeData view = systemView(SCAN_QRY_SYS_VIEW).get(new Object[] {0});
assertEquals(ignite.localNode().id().toString(), view.get("originNodeId"));
assertEquals(0L, view.get("queryId"));
assertEquals("cache1", view.get("cacheName"));
assertEquals(cacheId("cache1"), view.get("cacheId"));
assertEquals(cacheGroupId("cache1", "group1"), view.get("cacheGroupId"));
assertEquals("group1", view.get("cacheGroupName"));
assertTrue((Long)view.get("startTime") <= System.currentTimeMillis());
assertTrue((Long)view.get("duration") >= 0);
assertFalse((Boolean)view.get("canceled"));
assertEquals(TEST_PREDICATE, view.get("filter"));
assertTrue((Boolean)view.get("local"));
assertEquals(part, view.get("partition"));
assertEquals(toStringSafe(ignite.context().discovery().topologyVersionEx()), view.get("topology"));
assertEquals(TEST_TRANSFORMER, view.get("transformer"));
assertFalse((Boolean)view.get("keepBinary"));
assertEquals("null", view.get("subjectId"));
assertNull(view.get("taskName"));
qryRes1.close();
res = waitForCondition(() -> systemView(SCAN_QRY_SYS_VIEW).isEmpty(), 5_000);
assertTrue(res);
}
/** */
@Test
public void testScanQuery() throws Exception {
try(IgniteEx client1 = startGrid("client-1");
IgniteEx client2 = startGrid("client-2")) {
IgniteCache<Integer, Integer> cache1 = client1.createCache(
new CacheConfiguration<Integer, Integer>("cache1")
.setGroupName("group1"));
IgniteCache<Integer, Integer> cache2 = client2.createCache("cache2");
awaitPartitionMapExchange();
for (int i = 0; i < 100; i++) {
cache1.put(i, i);
cache2.put(i, i);
}
TabularDataSupport qrySysView0 = systemView(ignite, SCAN_QRY_SYS_VIEW);
assertNotNull(qrySysView0);
assertEquals(0, qrySysView0.size());
QueryCursor<Integer> qryRes1 = cache1.query(
new ScanQuery<Integer, Integer>()
.setFilter(new TestPredicate())
.setPageSize(10),
new TestTransformer());
QueryCursor<?> qryRes2 = cache2.withKeepBinary().query(new ScanQuery<>()
.setPageSize(20));
assertTrue(qryRes1.iterator().hasNext());
assertTrue(qryRes2.iterator().hasNext());
checkScanQueryView(client1, client2, ignite);
qryRes1.close();
qryRes2.close();
boolean res = waitForCondition(() -> systemView(ignite, SCAN_QRY_SYS_VIEW).isEmpty(), 5_000);
assertTrue(res);
}
}
/** @throws Exception If failed. */
@Test
public void testIgniteKernal() throws Exception {
DynamicMBean mbn = metricRegistry(ignite.name(), null, IGNITE_METRICS);
assertNotNull(mbn);
assertEquals(36, mbn.getMBeanInfo().getAttributes().length);
assertFalse(stream(mbn.getMBeanInfo().getAttributes()).anyMatch(a-> F.isEmpty(a.getDescription())));
assertFalse(F.isEmpty((String)mbn.getAttribute("fullVersion")));
assertFalse(F.isEmpty((String)mbn.getAttribute("copyright")));
assertFalse(F.isEmpty((String)mbn.getAttribute("osInformation")));
assertFalse(F.isEmpty((String)mbn.getAttribute("jdkInformation")));
assertFalse(F.isEmpty((String)mbn.getAttribute("vmName")));
assertFalse(F.isEmpty((String)mbn.getAttribute("discoverySpiFormatted")));
assertFalse(F.isEmpty((String)mbn.getAttribute("communicationSpiFormatted")));
assertFalse(F.isEmpty((String)mbn.getAttribute("deploymentSpiFormatted")));
assertFalse(F.isEmpty((String)mbn.getAttribute("checkpointSpiFormatted")));
assertFalse(F.isEmpty((String)mbn.getAttribute("collisionSpiFormatted")));
assertFalse(F.isEmpty((String)mbn.getAttribute("eventStorageSpiFormatted")));
assertFalse(F.isEmpty((String)mbn.getAttribute("failoverSpiFormatted")));
assertFalse(F.isEmpty((String)mbn.getAttribute("loadBalancingSpiFormatted")));
assertEquals(System.getProperty("user.name"), (String)mbn.getAttribute("osUser"));
assertNotNull(DateFormat.getDateTimeInstance().parse((String)mbn.getAttribute("startTimestampFormatted")));
assertNotNull(LocalTime.parse((String)mbn.getAttribute("uptimeFormatted")));
assertTrue((boolean)mbn.getAttribute("isRebalanceEnabled"));
assertTrue((boolean)mbn.getAttribute("isNodeInBaseline"));
assertTrue((boolean)mbn.getAttribute("active"));
assertTrue((long)mbn.getAttribute("startTimestamp") > 0);
assertTrue((long)mbn.getAttribute("uptime") > 0);
assertEquals(ignite.name(), (String)mbn.getAttribute("instanceName"));
assertEquals(Collections.emptyList(), mbn.getAttribute("userAttributesFormatted"));
assertEquals(Collections.emptyList(), mbn.getAttribute("lifecycleBeansFormatted"));
assertEquals(Collections.emptyMap(), mbn.getAttribute("longJVMPauseLastEvents"));
assertEquals(0L, mbn.getAttribute("longJVMPausesCount"));
assertEquals(0L, mbn.getAttribute("longJVMPausesTotalDuration"));
long clusterStateChangeTime = (long)mbn.getAttribute("lastClusterStateChangeTime");
assertTrue(0 < clusterStateChangeTime && clusterStateChangeTime < System.currentTimeMillis());
assertEquals(String.valueOf(ignite.configuration().getPublicThreadPoolSize()),
mbn.getAttribute("executorServiceFormatted"));
assertEquals(ignite.configuration().isPeerClassLoadingEnabled(), mbn.getAttribute("isPeerClassLoadingEnabled"));
assertTrue(((String)mbn.getAttribute("currentCoordinatorFormatted"))
.contains(ignite.localNode().id().toString()));
assertEquals(ignite.configuration().getIgniteHome(), (String)mbn.getAttribute("igniteHome"));
assertEquals(ignite.localNode().id(), mbn.getAttribute("localNodeId"));
assertEquals(ignite.configuration().getGridLogger().toString(),
(String)mbn.getAttribute("gridLoggerFormatted"));
assertEquals(ignite.configuration().getMBeanServer().toString(),
(String)mbn.getAttribute("mBeanServerFormatted"));
assertEquals(ClusterState.ACTIVE.toString(), mbn.getAttribute("clusterState"));
}
/** */
private void checkScanQueryView(IgniteEx client1, IgniteEx client2, IgniteEx server) throws Exception {
boolean res = waitForCondition(() -> systemView(server, SCAN_QRY_SYS_VIEW).size() > 1, 5_000);
assertTrue(res);
Consumer<CompositeData> cache1checker = view -> {
assertEquals(client1.localNode().id().toString(), view.get("originNodeId"));
assertTrue((Long)view.get("queryId") != 0);
assertEquals("cache1", view.get("cacheName"));
assertEquals(cacheId("cache1"), view.get("cacheId"));
assertEquals(cacheGroupId("cache1", "group1"), view.get("cacheGroupId"));
assertEquals("group1", view.get("cacheGroupName"));
assertTrue((Long)view.get("startTime") <= System.currentTimeMillis());
assertTrue((Long)view.get("duration") >= 0);
assertFalse((Boolean)view.get("canceled"));
assertEquals(TEST_PREDICATE, view.get("filter"));
assertFalse((Boolean)view.get("local"));
assertEquals(-1, view.get("partition"));
assertEquals(toStringSafe(client1.context().discovery().topologyVersionEx()), view.get("topology"));
assertEquals(TEST_TRANSFORMER, view.get("transformer"));
assertFalse((Boolean)view.get("keepBinary"));
assertEquals("null", view.get("subjectId"));
assertNull(view.get("taskName"));
assertEquals(10, view.get("pageSize"));
};
Consumer<CompositeData> cache2checker = view -> {
assertEquals(client2.localNode().id().toString(), view.get("originNodeId"));
assertTrue((Long)view.get("queryId") != 0);
assertEquals("cache2", view.get("cacheName"));
assertEquals(cacheId("cache2"), view.get("cacheId"));
assertEquals(cacheGroupId("cache2", null), view.get("cacheGroupId"));
assertEquals("cache2", view.get("cacheGroupName"));
assertTrue((Long)view.get("startTime") <= System.currentTimeMillis());
assertTrue((Long)view.get("duration") >= 0);
assertFalse((Boolean)view.get("canceled"));
assertNull(view.get("filter"));
assertFalse((Boolean)view.get("local"));
assertEquals(-1, view.get("partition"));
assertEquals(toStringSafe(client2.context().discovery().topologyVersionEx()), view.get("topology"));
assertNull(view.get("transformer"));
assertTrue((Boolean)view.get("keepBinary"));
assertEquals("null", view.get("subjectId"));
assertNull(view.get("taskName"));
assertEquals(20, view.get("pageSize"));
};
boolean found1 = false;
boolean found2 = false;
TabularDataSupport qrySysView = systemView(server, SCAN_QRY_SYS_VIEW);
for (int i=0; i < qrySysView.size(); i++) {
CompositeData view = systemView(SCAN_QRY_SYS_VIEW).get(new Object[] {i});
if ("cache2".equals(view.get("cacheName"))) {
cache2checker.accept(view);
found1 = true;
}
else {
cache1checker.accept(view);
found2 = true;
}
}
assertTrue(found1 && found2);
}
/** */
@Test
public void testSysStripedExecutor() throws Exception {
checkStripeExecutorView(ignite.context().getStripedExecutorService(),
SYS_POOL_QUEUE_VIEW,
"sys");
}
/** */
@Test
public void testStreamerStripedExecutor() throws Exception {
checkStripeExecutorView(ignite.context().getDataStreamerExecutorService(),
STREAM_POOL_QUEUE_VIEW,
"data-streamer");
}
/**
* Checks striped executor system view.
*
* @param execSvc Striped executor.
* @param viewName System view.
* @param poolName Executor name.
*/
private void checkStripeExecutorView(StripedExecutor execSvc, String viewName, String poolName) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
execSvc.execute(0, new TestRunnable(latch, 0));
execSvc.execute(0, new TestRunnable(latch, 1));
execSvc.execute(1, new TestRunnable(latch, 2));
execSvc.execute(1, new TestRunnable(latch, 3));
try {
boolean res = waitForCondition(() -> systemView(viewName).size() == 2, 5_000);
assertTrue(res);
TabularDataSupport view = systemView(viewName);
CompositeData row0 = view.get(new Object[] {0});
assertEquals(0, row0.get("stripeIndex"));
assertEquals(TestRunnable.class.getSimpleName() + '1', row0.get("description"));
assertEquals(poolName + "-stripe-0", row0.get("threadName"));
assertEquals(TestRunnable.class.getName(), row0.get("taskName"));
CompositeData row1 = view.get(new Object[] {1});
assertEquals(1, row1.get("stripeIndex"));
assertEquals(TestRunnable.class.getSimpleName() + '3', row1.get("description"));
assertEquals(poolName + "-stripe-1", row1.get("threadName"));
assertEquals(TestRunnable.class.getName(), row1.get("taskName"));
}
finally {
latch.countDown();
}
}
/** */
@Test
public void testPagesList() throws Exception {
String cacheName = "cacheFL";
IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(new CacheConfiguration<Integer, Integer>()
.setName(cacheName).setAffinity(new RendezvousAffinityFunction().setPartitions(2)));
// Put some data to cache to init cache partitions.
for (int i = 0; i < 10; i++)
cache.put(i, i);
TabularDataSupport view = filteredSystemView(ignite, CACHE_GRP_PAGE_LIST_VIEW, U.map(
CachePagesListViewWalker.CACHE_GROUP_ID_FILTER, cacheId(cacheName),
CachePagesListViewWalker.PARTITION_ID_FILTER, 0,
CachePagesListViewWalker.BUCKET_NUMBER_FILTER, 0
));
assertEquals(1, view.size());
view = filteredSystemView(ignite, CACHE_GRP_PAGE_LIST_VIEW, U.map(
CachePagesListViewWalker.CACHE_GROUP_ID_FILTER, cacheId(cacheName),
CachePagesListViewWalker.BUCKET_NUMBER_FILTER, 0
));
assertEquals(2, view.size());
}
/** */
private void createTestHistogram(MetricRegistry mreg) {
long[] bounds = new long[] {50, 500};
HistogramMetricImpl histogram = mreg.histogram("histogram", bounds, null);
histogram.value(10);
histogram.value(51);
histogram.value(60);
histogram.value(600);
histogram.value(600);
histogram.value(600);
}
}