blob: 9a42ff115dfe55405fa7b0a4e33bc712577e06ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.ignite.DataRegionMetrics;
import org.apache.ignite.DataStorageMetrics;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.PAX;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
import org.apache.ignite.spi.metric.HistogramMetric;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static java.util.Collections.emptyList;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl.DATASTORAGE_METRIC_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
*
*/
public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest {
/** */
private static final String GROUP1 = "grp1";
/** */
private static final String NO_PERSISTENCE = "no-persistence";
/** */
private final ListeningTestLogger listeningLog = new ListeningTestLogger(log);
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
cfg.setConsistentId(gridName);
long maxRegionSize = 20L * 1024 * 1024;
DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setMaxSize(maxRegionSize)
.setPersistenceEnabled(true)
.setMetricsEnabled(true)
.setName("dflt-plc"))
.setDataRegionConfigurations(new DataRegionConfiguration()
.setMaxSize(maxRegionSize)
.setPersistenceEnabled(false)
.setMetricsEnabled(true)
.setName(NO_PERSISTENCE))
.setWalMode(WALMode.LOG_ONLY)
.setMetricsEnabled(true);
cfg.setDataStorageConfiguration(memCfg);
cfg.setBinaryConfiguration(new BinaryConfiguration().setCompactFooter(false));
cfg.setCacheConfiguration(
cacheConfiguration(GROUP1, "cache", PARTITIONED, ATOMIC, 1, null),
cacheConfiguration(null, "cache-np", PARTITIONED, ATOMIC, 1, NO_PERSISTENCE));
cfg.setGridLogger(listeningLog);
return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
super.afterTest();
}
/**
* @param grpName Cache group name.
* @param name Cache name.
* @param cacheMode Cache mode.
* @param atomicityMode Atomicity mode.
* @param backups Backups number.
* @return Cache configuration.
*/
private CacheConfiguration cacheConfiguration(
String grpName,
String name,
CacheMode cacheMode,
CacheAtomicityMode atomicityMode,
int backups,
String dataRegName
) {
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setName(name);
ccfg.setGroupName(grpName);
ccfg.setAtomicityMode(atomicityMode);
ccfg.setBackups(backups);
ccfg.setCacheMode(cacheMode);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setDataRegionName(dataRegName);
ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
if (NO_PERSISTENCE.equals(dataRegName))
ccfg.setDiskPageCompression(null);
return ccfg;
}
/**
* @throws Exception if failed.
*/
@Test
public void testPersistenceMetrics() throws Exception {
final IgniteEx ig = startGrid(0);
ig.active(true);
try {
IgniteCache<Object, Object> cache = ig.cache("cache");
for (int i = 0; i < 10; i++)
cache.put(i, new Person("first-" + i, "last-" + i));
IgniteCache<Object, Object> cacheNp = ig.cache("cache-np");
for (int i = 0; i < 10; i++)
cacheNp.put(i, new Person("first-" + i, "last-" + i));
DataRegionMetrics memMetrics = ig.dataRegionMetrics("dflt-plc");
assertNotNull(memMetrics);
assertTrue(memMetrics.getDirtyPages() > 0);
assertTrue(memMetrics.getPagesFillFactor() > 0);
memMetrics = ig.dataRegionMetrics("no-persistence");
assertNotNull(memMetrics);
assertTrue(memMetrics.getTotalAllocatedPages() > 0);
assertTrue(memMetrics.getPagesFillFactor() > 0);
ig.context().cache().context().database().waitForCheckpoint("test");
assertTrue(waitForCondition(new PAX() {
@Override public boolean applyx() {
DataStorageMetrics pMetrics = ig.dataStorageMetrics();
assertNotNull(pMetrics);
return pMetrics.getLastCheckpointTotalPagesNumber() != 0 &&
pMetrics.getLastCheckpointDataPagesNumber() != 0;
}
}, 10_000));
}
finally {
stopAllGrids();
}
}
/** @throws Exception if failed. */
@Test
public void testCheckpointMetrics() throws Exception {
Pattern cpPtrn = Pattern.compile("^Checkpoint started .*" +
"checkpointBeforeLockTime=(\\d+)ms, " +
"checkpointLockWait=(\\d+)ms, " +
"checkpointListenersExecuteTime=(\\d+)ms, " +
"checkpointLockHoldTime=(\\d+)ms, " +
"walCpRecordFsyncDuration=(\\d+)ms, " +
"writeCheckpointEntryDuration=(\\d+)ms, " +
"splitAndSortCpPagesDuration=(\\d+)ms");
AtomicLong expLastCpBeforeLockDuration = new AtomicLong();
AtomicLong expLastCpLockWaitDuration = new AtomicLong();
AtomicLong expLastCpListenersExecuteDuration = new AtomicLong();
AtomicLong expLastCpLockHoldDuration = new AtomicLong();
AtomicLong expLastCpWalRecordFsyncDuration = new AtomicLong();
AtomicLong expLastCpWriteEntryDuration = new AtomicLong();
AtomicLong expLastCpSplitAndSortPagesDuration = new AtomicLong();
AtomicInteger cpCnt = new AtomicInteger();
listeningLog.registerListener(s -> {
Matcher matcher = cpPtrn.matcher(s);
if (!matcher.find())
return;
expLastCpBeforeLockDuration.set(Long.parseLong(matcher.group(1)));
expLastCpLockWaitDuration.set(Long.parseLong(matcher.group(2)));
expLastCpListenersExecuteDuration.set(Long.parseLong(matcher.group(3)));
expLastCpLockHoldDuration.set(Long.parseLong(matcher.group(4)));
expLastCpWalRecordFsyncDuration.set(Long.parseLong(matcher.group(5)));
expLastCpWriteEntryDuration.set(Long.parseLong(matcher.group(6)));
expLastCpSplitAndSortPagesDuration.set(Long.parseLong(matcher.group(7)));
cpCnt.incrementAndGet();
});
IgniteEx node = startGrid(0);
node.cluster().state(ACTIVE);
GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)node.context().cache().context().database();
db.checkpointReadLock();
try {
waitForCondition(() -> cpCnt.get() > 0, getTestTimeout());
MetricRegistry mreg = node.context().metric().registry(DATASTORAGE_METRIC_PREFIX);
AtomicLongMetric lastCpBeforeLockDuration = mreg.findMetric("LastCheckpointBeforeLockDuration");
AtomicLongMetric lastCpLockWaitDuration = mreg.findMetric("LastCheckpointLockWaitDuration");
AtomicLongMetric lastCpListenersExecuteDuration = mreg.findMetric("LastCheckpointListenersExecuteDuration");
AtomicLongMetric lastCpLockHoldDuration = mreg.findMetric("LastCheckpointLockHoldDuration");
AtomicLongMetric lastCpWalRecordFsyncDuration = mreg.findMetric("LastCheckpointWalRecordFsyncDuration");
AtomicLongMetric lastCpWriteEntryDuration = mreg.findMetric("LastCheckpointWriteEntryDuration");
AtomicLongMetric lastCpSplitAndSortPagesDuration =
mreg.findMetric("LastCheckpointSplitAndSortPagesDuration");
HistogramMetric cpBeforeLockHistogram = mreg.findMetric("CheckpointBeforeLockHistogram");
HistogramMetric cpLockWaitHistogram = mreg.findMetric("CheckpointLockWaitHistogram");
HistogramMetric cpListenersExecuteHistogram = mreg.findMetric("CheckpointListenersExecuteHistogram");
HistogramMetric cpMarkHistogram = mreg.findMetric("CheckpointMarkHistogram");
HistogramMetric cpLockHoldHistogram = mreg.findMetric("CheckpointLockHoldHistogram");
HistogramMetric cpPagesWriteHistogram = mreg.findMetric("CheckpointPagesWriteHistogram");
HistogramMetric cpFsyncHistogram = mreg.findMetric("CheckpointFsyncHistogram");
HistogramMetric cpWalRecordFsyncHistogram = mreg.findMetric("CheckpointWalRecordFsyncHistogram");
HistogramMetric cpWriteEntryHistogram = mreg.findMetric("CheckpointWriteEntryHistogram");
HistogramMetric cpSplitAndSortPagesHistogram = mreg.findMetric("CheckpointSplitAndSortPagesHistogram");
HistogramMetric cpHistogram = mreg.findMetric("CheckpointHistogram");
waitForCondition(() -> cpCnt.get() == Arrays.stream(cpHistogram.value()).sum(), getTestTimeout());
assertEquals(cpCnt.get(), Arrays.stream(cpBeforeLockHistogram.value()).sum());
assertEquals(cpCnt.get(), Arrays.stream(cpLockWaitHistogram.value()).sum());
assertEquals(cpCnt.get(), Arrays.stream(cpListenersExecuteHistogram.value()).sum());
assertEquals(cpCnt.get(), Arrays.stream(cpMarkHistogram.value()).sum());
assertEquals(cpCnt.get(), Arrays.stream(cpLockHoldHistogram.value()).sum());
assertEquals(cpCnt.get(), Arrays.stream(cpPagesWriteHistogram.value()).sum());
assertEquals(cpCnt.get(), Arrays.stream(cpFsyncHistogram.value()).sum());
assertEquals(cpCnt.get(), Arrays.stream(cpWalRecordFsyncHistogram.value()).sum());
assertEquals(cpCnt.get(), Arrays.stream(cpWriteEntryHistogram.value()).sum());
assertEquals(cpCnt.get(), Arrays.stream(cpSplitAndSortPagesHistogram.value()).sum());
assertEquals(expLastCpBeforeLockDuration.get(), lastCpBeforeLockDuration.value());
assertEquals(expLastCpLockWaitDuration.get(), lastCpLockWaitDuration.value());
assertEquals(expLastCpListenersExecuteDuration.get(), lastCpListenersExecuteDuration.value());
assertEquals(expLastCpLockHoldDuration.get(), lastCpLockHoldDuration.value());
assertEquals(expLastCpWalRecordFsyncDuration.get(), lastCpWalRecordFsyncDuration.value());
assertEquals(expLastCpWriteEntryDuration.get(), lastCpWriteEntryDuration.value());
assertEquals(expLastCpSplitAndSortPagesDuration.get(), lastCpSplitAndSortPagesDuration.value());
}
finally {
db.checkpointReadUnlock();
}
}
/**
* Checking that the metrics of the total logged bytes are working correctly.
*
* @throws Exception If failed.
*/
@Test
public void testWalWrittenBytes() throws Exception {
IgniteEx n = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> {
cfg.getDataStorageConfiguration().setWalSegmentSize((int)(2 * U.MB));
return cfg;
});
n.cluster().state(ACTIVE);
awaitPartitionMapExchange();
for (int i = 0; i < 10; i++)
n.cache("cache").put(ThreadLocalRandom.current().nextLong(), new byte[(int)(32 * U.KB)]);
WALDisableContext walDisableCtx = n.context().cache().context().walState().walDisableContext();
assertNotNull(walDisableCtx);
setFieldValue(walDisableCtx, "disableWal", true);
assertTrue(walDisableCtx.check());
assertNull(walMgr(n).log(new DataRecord(emptyList())));
assertEquals(-1, walMgr(n).lastArchivedSegment());
long exp = walMgr(n).lastWritePointer().fileOffset() - HEADER_RECORD_SIZE;
assertEquals(exp, dbMgr(n).persistentStoreMetrics().getWalWrittenBytes());
assertEquals(exp, dsMetricsMXBean(n).getWalWrittenBytes());
assertEquals(exp, ((LongAdderMetric)dsMetricRegistry(n).findMetric("WalWrittenBytes")).value());
}
/**
* Checking that the metrics of the total size compressed segment are working correctly.
*
* @throws Exception If failed.
*/
@Test
public void testWalCompressedBytes() throws Exception {
IgniteEx n0 = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> {
cfg.getDataStorageConfiguration().setWalCompactionEnabled(true).setWalSegmentSize((int)(2 * U.MB));
return cfg;
});
n0.cluster().state(ACTIVE);
awaitPartitionMapExchange();
while (walMgr(n0).lastArchivedSegment() < 3)
n0.cache("cache").put(ThreadLocalRandom.current().nextLong(), new byte[(int)(32 * U.KB)]);
waitForCondition(
() -> walMgr(n0).lastArchivedSegment() == walMgr(n0).lastCompactedSegment(),
getTestTimeout()
);
assertCorrectWalCompressedBytesMetrics(n0);
stopAllGrids();
IgniteEx n1 = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> {
cfg.getDataStorageConfiguration().setWalCompactionEnabled(true);
return cfg;
});
n1.cluster().state(ACTIVE);
awaitPartitionMapExchange();
assertCorrectWalCompressedBytesMetrics(n1);
}
/**
*
*/
static class Person implements Serializable {
/** */
@GridToStringInclude
@QuerySqlField(index = true, groups = "full_name")
private String fName;
/** */
@GridToStringInclude
@QuerySqlField(index = true, groups = "full_name")
private String lName;
/**
* @param fName First name.
* @param lName Last name.
*/
public Person(String fName, String lName) {
this.fName = fName;
this.lName = lName;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(Person.class, this);
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
Person person = (Person)o;
return Objects.equals(fName, person.fName) &&
Objects.equals(lName, person.lName);
}
/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(fName, lName);
}
}
/**
* Getting DATASTORAGE_METRIC_PREFIX metric registry.
*
* @param n Node.
* @return Group of metrics.
*/
private MetricRegistry dsMetricRegistry(IgniteEx n) {
return n.context().metric().registry(DATASTORAGE_METRIC_PREFIX);
}
/**
* Getting data storage MXBean.
*
* @param n Node.
* @return MXBean.
*/
private DataStorageMetricsMXBean dsMetricsMXBean(IgniteEx n) {
return getMxBean(n.name(), "Persistent Store", "DataStorageMetrics", DataStorageMetricsMXBean.class);
}
/**
* Check that the metric of the total size compressed segment is working correctly.
*
* @param n Node.
*/
private void assertCorrectWalCompressedBytesMetrics(IgniteEx n) {
long exp = Arrays.stream(walMgr(n).walArchiveFiles()).filter(FileDescriptor::isCompressed)
.mapToLong(fd -> fd.file().length()).sum();
assertEquals(exp, dbMgr(n).persistentStoreMetrics().getWalCompressedBytes());
assertEquals(exp, dsMetricsMXBean(n).getWalCompressedBytes());
assertEquals(exp, ((LongAdderMetric)dsMetricRegistry(n).findMetric("WalCompressedBytes")).value());
}
}