/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.commandline.indexreader;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Handler;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.commandline.ProgressPrinter;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.processors.cache.persistence.IndexStorageImpl;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.GridStringBuilder;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.util.GridCommandHandlerAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.Objects.requireNonNull;
import static java.util.regex.Pattern.compile;
import static java.util.stream.Collectors.joining;
import static org.apache.ignite.internal.commandline.indexreader.IgniteIndexReader.ERROR_PREFIX;
import static org.apache.ignite.internal.commandline.indexreader.IgniteIndexReader.HORIZONTAL_SCAN_NAME;
import static org.apache.ignite.internal.commandline.indexreader.IgniteIndexReader.META_TREE_NAME;
import static org.apache.ignite.internal.commandline.indexreader.IgniteIndexReader.RECURSIVE_TRAVERSE_NAME;
import static org.apache.ignite.internal.commandline.indexreader.IgniteIndexReader.normalizePageId;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageIndex;
import static org.apache.ignite.internal.pagemem.PageIdUtils.partId;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_GRP_DIR_PREFIX;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.assertNotContains;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
/**
* Class for testing {@link IgniteIndexReader}.
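* <p>
* A minimal usage sketch mirroring {@link #runIndexReader} below (argument names here are illustrative):
* <pre>
* try (IgniteIndexReader reader = new IgniteIndexReader(
*     PAGE_SIZE, PART_CNT, PAGE_STORE_VER, dir, idxFilter, checkParts, log)) {
*     reader.readIndex();
* }
* </pre>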
*/
public class IgniteIndexReaderTest extends GridCommandHandlerAbstractTest {
/** Page size. */
protected static final int PAGE_SIZE = 4096;
/** Partitions count. */
protected static final int PART_CNT = RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
/** Version of file page stores. */
private static final int PAGE_STORE_VER = 2;
/** Cache group name. */
protected static final String CACHE_GROUP_NAME = "defaultGroup";
/** Cache without indexes. */
private static final String EMPTY_CACHE_NAME = "empty";
/** Cache group without indexes. */
private static final String EMPTY_CACHE_GROUP_NAME = "emptyGroup";
/** Cache with static query configuration. */
private static final String QUERY_CACHE_NAME = "query";
/** Cache group with static query configuration. */
private static final String QUERY_CACHE_GROUP_NAME = "queryGroup";
/** Count of tables that will be created for test. */
private static final int CREATED_TABLES_CNT = 3;
/** Line delimiter. */
private static final String LINE_DELIM = System.lineSeparator();
/** Common part of regexp for single index output validation. Format args: index name, minimum page stat rows, items count (see {@link #checkIdxRegex}). */
private static final String CHECK_IDX_PTRN_COMMON =
"<PREFIX>Index tree: I \\[idxName=[\\-_0-9]{1,20}_%s##H2Tree.0, pageId=[0-9a-f]{16}\\]" +
LINE_DELIM + "<PREFIX>---- Page stat:" +
LINE_DELIM + "<PREFIX>Type.*Pages.*Free space.*" +
LINE_DELIM + "<PREFIX>([0-9a-zA-Z]{1,50}.*[0-9]{1,5}" +
LINE_DELIM + "<PREFIX>){%s,1000}---- Count of items found in leaf pages: %s(" +
LINE_DELIM + "<PREFIX>---- Inline usage statistics \\[inlineSize=[0-9]{1,3} bytes\\]" +
LINE_DELIM + "<PREFIX>.*Used bytes.*Entries count(" +
LINE_DELIM + "<PREFIX>.*[0-9]{1,10}){0,64})?" +
LINE_DELIM;
/** Regexp to validate output of correct index. */
private static final String CHECK_IDX_PTRN_CORRECT =
CHECK_IDX_PTRN_COMMON + "<PREFIX>No errors occurred while traversing.";
/** Regexp to validate output of corrupted index. */
private static final String CHECK_IDX_PTRN_WITH_ERRORS =
"<PREFIX>" + ERROR_PREFIX + "---- Errors:" +
LINE_DELIM + "<PREFIX>" + ERROR_PREFIX + "Page id: [0-9]{1,30}, exceptions: Failed to read page.*";
/** Work directory, containing cache group directories. */
private static File workDir;
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
cleanPersistenceDir();
prepareWorkDir();
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
injectTestSystemOut();
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName).setDataStorageConfiguration(
new DataStorageConfiguration()
.setPageSize(PAGE_SIZE)
.setWalSegmentSize(4 * 1024 * 1024)
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
.setInitialSize(20 * 1024L * 1024L)
.setMaxSize(64 * 1024L * 1024L)
)
).setCacheConfiguration(
new CacheConfiguration<>(DEFAULT_CACHE_NAME)
.setGroupName(CACHE_GROUP_NAME)
.setSqlSchema(QueryUtils.DFLT_SCHEMA),
new CacheConfiguration<>(EMPTY_CACHE_NAME)
.setGroupName(EMPTY_CACHE_GROUP_NAME),
new CacheConfiguration<>(QUERY_CACHE_NAME)
.setGroupName(QUERY_CACHE_GROUP_NAME)
.setQueryEntities(asList(
new QueryEntity(Integer.class, TestClass1.class)
.addQueryField("id", Integer.class.getName(), null)
.addQueryField("f", Integer.class.getName(), null)
.addQueryField("s", String.class.getName(), null)
.setIndexes(singleton(new QueryIndex("f")))
.setTableName("QT1"),
new QueryEntity(Integer.class, TestClass2.class)
.addQueryField("id", Integer.class.getName(), null)
.addQueryField("f", Integer.class.getName(), null)
.addQueryField("s", String.class.getName(), null)
.setIndexes(singleton(new QueryIndex("s")))
.setTableName("QT2")
))
);
}
/**
* Creating and filling {@link #workDir}.
*
* @throws Exception If fails.
*/
protected void prepareWorkDir() throws Exception {
try (IgniteEx node = startGrid(0)) {
populateData(node, null);
workDir = ((FilePageStoreManager)node.context().cache().context().pageStore()).workDir();
}
}
/**
* Filling node with data.
*
* @param node Node.
* @param insert {@code True} to only insert data (creating tables and indexes),
* {@code false} to insert, update and delete data and drop indexes, {@code null} for all of the above at once.
* @throws Exception If fails.
*/
protected void populateData(IgniteEx node, @Nullable Boolean insert) throws Exception {
requireNonNull(node);
IgniteClusterEx cluster = node.cluster();
if (!cluster.active())
cluster.active(true);
IgniteCache<Integer, Object> qryCache = node.cache(QUERY_CACHE_NAME);
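// TestClass1 keys: 0..99 when insert is null, 0..79 when true, 70..99 when false.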
int s = isNull(insert) || insert ? 0 : 70;
int e = isNull(insert) || !insert ? 100 : 80;
for (int i = s; i < e; i++)
qryCache.put(i, new TestClass1(i, valueOf(i)));
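// TestClass2 keys: 0..69 when insert is null, 0..59 when true, 50..69 when false.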
s = isNull(insert) || insert ? 0 : 50;
e = isNull(insert) || !insert ? 70 : 60;
for (int i = s; i < e; i++)
qryCache.put(i, new TestClass2(i, valueOf(i)));
IgniteCache<Integer, Integer> cache = node.cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < CREATED_TABLES_CNT; i++)
createAndFillTable(cache, TableInfo.generate(i), insert);
forceCheckpoint(node);
}
/**
* Get data directory for cache group.
*
* @param cacheGrpName Cache group name.
* @return Directory name.
*/
protected String dataDir(String cacheGrpName) {
return CACHE_GRP_DIR_PREFIX + cacheGrpName;
}
/**
* @param workDir Working directory.
* @param cacheGrp Cache group.
* @return Tuple that consists of a leaf page id of some index tree, and some link to data.
* @throws IgniteCheckedException If failed.
*/
private IgniteBiTuple<Long, Long> findPagesForAnyCacheKey(File workDir, String cacheGrp) throws IgniteCheckedException {
File dir = new File(workDir, dataDir(cacheGrp));
// Capture the id of any leaf page of an index tree during the scans below.
AtomicLong anyLeafId = new AtomicLong();
Logger log = createTestLogger();
IgniteIndexReader reader0 = new IgniteIndexReader(PAGE_SIZE, PART_CNT, PAGE_STORE_VER, dir, null, false, log) {
@Override ScanContext createContext(
String idxName,
FilePageStore store,
ItemStorage items,
String prefix,
ProgressPrinter printer
) {
GridTuple3<Integer, Integer, String> parsed;
if (idxName != null)
parsed = cacheAndTypeId(idxName);
else
parsed = new GridTuple3<>(UNKNOWN_CACHE, 0, null);
return new ScanContext(parsed.get1(), idxName, inlineFieldsCount(parsed), store, items, log, prefix, printer) {
@Override public void onLeafPage(long pageId, List<Object> data) {
super.onLeafPage(pageId, data);
anyLeafId.set(pageId);
}
};
}
};
try (IgniteIndexReader reader = reader0) {
long[] partitionRoots = reader.partitionRoots(PageIdAllocator.META_PAGE_ID);
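// partitionRoots[0] is the root of the meta tree that maps index names to their root page ids.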
ItemsListStorage<IndexStorageImpl.IndexItem> idxItemStorage = new ItemsListStorage<>();
reader.recursiveTreeScan(partitionRoots[0], META_TREE_NAME, idxItemStorage, null);
// Take any index item.
IndexStorageImpl.IndexItem idxItem = idxItemStorage.iterator().next();
ItemsListStorage<CacheAwareLink> linkStorage = new ItemsListStorage<>();
reader.recursiveTreeScan(
normalizePageId(idxItem.pageId()),
idxItem.nameString(),
linkStorage,
null
);
// Take any link.
long link = linkStorage.store.get(0).link;
return new IgniteBiTuple<>(anyLeafId.get(), link);
}
}
/**
* Corrupts partition file.
*
* @param workDir Work directory.
* @param pageToCorrupt Page to corrupt.
* @throws Exception If failed.
*/
private void corruptFile(File workDir, long pageToCorrupt) throws Exception {
int partId = partId(pageToCorrupt);
int pageIdxCorrupt = pageIndex(pageToCorrupt);
String fileName = partId == INDEX_PARTITION ? INDEX_FILE_NAME : format(PART_FILE_TEMPLATE, partId);
File cacheWorkDir = new File(workDir, dataDir(CACHE_GROUP_NAME));
File file = new File(cacheWorkDir, fileName);
File backup = new File(cacheWorkDir, fileName + ".backup");
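// Back the file up once so it can be restored after the test (see restoreFile).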
if (!backup.exists())
Files.copy(file.toPath(), backup.toPath());
ByteBuffer buf = GridUnsafe.allocateBuffer(PAGE_SIZE);
try (FileChannel c = new RandomAccessFile(file, "rw").getChannel()) {
byte[] trash = new byte[PAGE_SIZE];
ThreadLocalRandom.current().nextBytes(trash);
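// The page's physical position in the file may differ from its page index (e.g. in snapshot files), so locate it by scanning.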
int pageIdx = realPageIdxInFile(c, pageIdxCorrupt);
buf.rewind();
buf.put(trash);
buf.rewind();
c.write(buf, pageIdx * PAGE_SIZE);
}
finally {
GridUnsafe.freeBuffer(buf);
}
}
/**
* @param c Opened file channel.
* @param pageIdxToFind Page index to find. A page's physical position may differ from its page index in snapshot files.
* @return Physical position of the page in the file, or {@code -1} if the page is not found.
* @throws IOException If failed.
*/
protected int realPageIdxInFile(FileChannel c, int pageIdxToFind) throws IOException {
ByteBuffer buf = GridUnsafe.allocateBuffer(PAGE_SIZE);
long addr = GridUnsafe.bufferAddress(buf);
try {
int pageCnt = 0;
while (true) {
buf.rewind();
if (c.read(buf) == -1)
break;
buf.rewind();
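// Read the page id from the page header and compare its page index part with the requested one.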
long pageId = PageIO.getPageId(addr);
int pageIdx = pageIndex(pageId);
if (pageIdx == pageIdxToFind)
return pageCnt;
pageCnt++;
}
}
finally {
GridUnsafe.freeBuffer(buf);
}
return -1;
}
/**
* Restores a corrupted file from its backup, if the backup exists.
*
* @param workDir Work directory.
* @param partId Partition id.
* @throws IOException If failed.
*/
private void restoreFile(File workDir, int partId) throws IOException {
String fileName = partId == INDEX_PARTITION ? INDEX_FILE_NAME : format(PART_FILE_TEMPLATE, partId);
File cacheWorkDir = new File(workDir, dataDir(CACHE_GROUP_NAME));
File backupFiles = new File(cacheWorkDir, fileName + ".backup");
if (!backupFiles.exists())
return;
Path backupFilesPath = backupFiles.toPath();
Files.copy(backupFilesPath, new File(cacheWorkDir, fileName).toPath(), REPLACE_EXISTING);
Files.delete(backupFilesPath);
}
/**
* Generates fields for an SQL table.
*
* @param cnt Count of fields.
* @return List of pairs, first is field name, second is field type.
*/
private static List<IgnitePair<String>> fields(int cnt) {
List<IgnitePair<String>> res = new LinkedList<>();
for (int i = 0; i < cnt; i++)
res.add(new IgnitePair<>("f" + i, i % 2 == 0 ? "integer" : "varchar(100)"));
return res;
}
/**
* Generates indexes for given table and fields.
*
* @param tblName Table name.
* @param fields Fields list, returned by {@link #fields}.
* @return List of pairs, first is index name, second is a comma-separated list of fields covered by the index.
*/
private static List<IgnitePair<String>> idxs(String tblName, List<IgnitePair<String>> fields) {
List<IgnitePair<String>> res = fields.stream()
.map(f -> new IgnitePair<>(tblName + "_" + f.get1() + "_idx", f.get1()))
.collect(Collectors.toCollection(LinkedList::new));
// Add one multicolumn index.
if (fields.size() > 1) {
res.add(new IgnitePair<>(
tblName + "_" + fields.get(0).get1() + "_" + fields.get(1).get1() + "_idx",
fields.get(0).get1() + "," + fields.get(1).get1()
));
}
return res;
}
/**
* Creates an SQL table with indexes and fills it with some data.
*
* @param cache Ignite cache.
* @param info Table info.
* @param insert {@code True} to create the table and indexes and insert data,
* {@code false} to insert, update and delete data and drop an index, {@code null} for all of the above at once.
*/
private static void createAndFillTable(IgniteCache<?, ?> cache, TableInfo info, @Nullable Boolean insert) {
String idxToDelName = info.tblName + "_idx_to_delete";
List<IgnitePair<String>> fields = fields(info.fieldsCnt);
List<IgnitePair<String>> idxs = idxs(info.tblName, fields);
String strFields = fields.stream().map(f -> f.get1() + " " + f.get2()).collect(joining(", "));
if (isNull(insert) || insert) {
query(
cache,
"create table " + info.tblName + " (id integer primary key, " + strFields + ") with " +
"\"CACHE_NAME=" + info.tblName + ", CACHE_GROUP=" + CACHE_GROUP_NAME + "\""
);
for (IgnitePair<String> idx : idxs)
query(cache, format("create index %s on %s (%s)", idx.get1(), info.tblName, idx.get2()));
query(cache, format("create index %s on %s (%s)", idxToDelName, info.tblName, fields.get(0).get1()));
}
int s = isNull(insert) || insert ? 0 : info.rec - 10;
int e = isNull(insert) || !insert ? info.rec : info.rec - 10;
for (int i = s; i < e; i++)
insertQuery(cache, info.tblName, fields, i);
if (nonNull(insert) && !insert) {
for (int i = s - 10; i < s; i++)
updateQuery(cache, info.tblName, fields, i);
}
if (isNull(insert) || !insert) {
for (int i = info.rec - info.del; i < info.rec; i++)
query(cache, "delete from " + info.tblName + " where id = " + i);
}
if (isNull(insert) || !insert)
query(cache, "drop index " + idxToDelName);
}
/**
* Performs an insert query.
*
* @param cache Ignite cache.
* @param tblName Table name.
* @param fields List of fields.
* @param cntr Counter which is used to generate data.
*/
private static void insertQuery(IgniteCache<?, ?> cache, String tblName, List<IgnitePair<String>> fields, int cntr) {
GridStringBuilder q = new GridStringBuilder().a("insert into ").a(tblName).a(" (id, ");
q.a(fields.stream().map(IgniteBiTuple::get1).collect(joining(", ")));
q.a(") values (");
q.a(fields.stream().map(f -> "?").collect(joining(", ", "?, ", ")")));
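// Resulting query: "insert into <tbl> (id, f0, f1, ...) values (?, ?, ...)".
// Even parameter positions get the int counter, odd positions its string form.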
Object[] paramVals = new Object[fields.size() + 1];
for (int i = 0; i < fields.size() + 1; i++)
paramVals[i] = (i % 2 == 0) ? cntr : valueOf(cntr);
query(cache, q.toString(), paramVals);
}
/**
* Performs an update query.
*
* @param cache Ignite cache.
* @param tblName Table name.
* @param fields List of fields.
* @param cntr Counter which is used to generate data.
*/
private static void updateQuery(IgniteCache<?, ?> cache, String tblName, List<IgnitePair<String>> fields, int cntr) {
GridStringBuilder q = new GridStringBuilder().a("update ").a(tblName).a(" set ")
.a(fields.stream().map(IgniteBiTuple::get1).collect(joining("=?, ", "", "=?")))
.a(" where id=?");
Object[] paramVals = new Object[fields.size() + 1];
for (int i = 0; i < fields.size() + 1; i++)
paramVals[i] = (i % 2 == 0) ? cntr : valueOf(cntr);
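// Move the id value to the last position to match the trailing "where id=?" parameter.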
Object id = paramVals[0];
paramVals[0] = paramVals[paramVals.length - 1];
paramVals[paramVals.length - 1] = id;
query(cache, q.toString(), paramVals);
}
/**
* Performs a query.
*
* @param cache Ignite cache.
* @param qry Query string.
* @return Result.
*/
private static List<List<?>> query(IgniteCache<?, ?> cache, String qry) {
return cache.query(new SqlFieldsQuery(qry)).getAll();
}
/**
* Performs a query.
*
* @param cache Ignite cache.
* @param qry Query string.
* @param args Query arguments.
* @return Result.
*/
private static List<List<?>> query(IgniteCache<?, ?> cache, String qry, Object... args) {
return cache.query(new SqlFieldsQuery(qry).setArgs(args)).getAll();
}
/**
* Checks the index reader output.
*
* @param output Output.
* @param treesCnt Count of B+ trees.
* @param travErrCnt Count of errors that can occur during traversal.
* @param pageListsErrCnt Count of errors that can occur during page lists scan.
* @param seqErrCnt Count of errors that can occur during sequential scan, or {@code -1} if orphan pages are not reported.
* @param idxReadingErr Whether index file reading errors should be present.
* @param partReadingErr Whether partition file reading errors should be present.
* @param idxSizeConsistent Whether index size should be consistent.
*/
private void checkOutput(
String output,
int treesCnt,
int travErrCnt,
int pageListsErrCnt,
int seqErrCnt,
boolean idxReadingErr,
boolean partReadingErr,
boolean idxSizeConsistent
) {
assertFound(output, RECURSIVE_TRAVERSE_NAME + "Total trees:.*" + treesCnt);
assertFound(output, HORIZONTAL_SCAN_NAME + "Total trees:.*" + treesCnt);
assertFound(output, RECURSIVE_TRAVERSE_NAME + "Total errors during trees traversal:.*" + travErrCnt);
assertFound(output, HORIZONTAL_SCAN_NAME + "Total errors during trees traversal:.*" + travErrCnt);
assertFound(output, "Total errors during lists scan:.*" + pageListsErrCnt);
if (idxSizeConsistent)
assertContains(log, output, "No index size consistency errors found.");
else
assertContains(log, output, "Index size inconsistency");
if (seqErrCnt >= 0)
assertFound(output, "Total errors occurred during sequential scan:.*" + seqErrCnt);
else
assertContains(log, output, "Orphan pages were not reported due to --indexes filter.");
if (travErrCnt == 0 && pageListsErrCnt == 0 && seqErrCnt == 0)
assertNotContains(log, output, ERROR_PREFIX);
if (idxReadingErr)
assertContains(log, output, ERROR_PREFIX + "Errors detected while reading index.bin");
if (partReadingErr)
assertContains(log, output, ERROR_PREFIX + "Errors detected while reading partition files:");
}
/**
* Checks output info for indexes.
*
* @param output Output string.
* @param info Table info, which is used to calculate index info.
* @param corruptedIdx Whether some index is corrupted.
*/
private void checkIdxs(String output, TableInfo info, boolean corruptedIdx) {
List<IgnitePair<String>> fields = fields(info.fieldsCnt);
List<IgnitePair<String>> idxs = idxs(info.tblName, fields);
int entriesCnt = info.rec - info.del;
idxs.stream().map(IgniteBiTuple::get1)
.forEach(idx -> {
checkIdx(output, RECURSIVE_TRAVERSE_NAME, idx.toUpperCase(), entriesCnt, corruptedIdx);
checkIdx(output, HORIZONTAL_SCAN_NAME, idx.toUpperCase(), entriesCnt, corruptedIdx);
});
}
/**
* Checks output info for single index.
*
* @param output Output string.
* @param traversePrefix Traverse prefix.
* @param idx Index name.
* @param entriesCnt Count of entries that should be present in index.
* @param canBeCorrupted Whether index can be corrupted.
*/
private void checkIdx(String output, String traversePrefix, String idx, int entriesCnt, boolean canBeCorrupted) {
Pattern ptrnCorrect = compile(checkIdxRegex(traversePrefix, false, idx, 1, valueOf(entriesCnt)));
Matcher mCorrect = ptrnCorrect.matcher(output);
if (canBeCorrupted) {
Pattern ptrnCorrupted = compile(checkIdxRegex(traversePrefix, true, idx, 0, "[0-9]{1,4}"));
Matcher mCorrupted = ptrnCorrupted.matcher(output);
assertTrue("could not find index " + idx + ":\n" + output, mCorrect.find() || mCorrupted.find());
}
else
assertTrue("could not find index " + idx + ":\n" + output, mCorrect.find());
}
/**
* Returns regexp string for index check.
*
* @param traversePrefix Traverse prefix.
* @param withErrors Whether errors should be present.
* @param idxName Index name.
* @param minimumPageStatSize Minimum size of page stats for index.
* @param itemsCnt Count of data entries.
* @return Regexp string.
*/
private String checkIdxRegex(
String traversePrefix,
boolean withErrors,
String idxName,
int minimumPageStatSize,
String itemsCnt
) {
return format(
withErrors ? CHECK_IDX_PTRN_WITH_ERRORS : CHECK_IDX_PTRN_CORRECT,
idxName,
minimumPageStatSize,
itemsCnt
).replace("<PREFIX>", traversePrefix);
}
/**
* Runs index reader on given cache group.
*
* @param workDir Work directory.
* @param cacheGrp Cache group name.
* @param idxs Indexes to check.
* @param checkParts Whether to check cache data tree in partitions.
* @return Index reader output.
* @throws IgniteCheckedException If failed.
*/
private String runIndexReader(
File workDir,
String cacheGrp,
@Nullable String[] idxs,
boolean checkParts
) throws IgniteCheckedException {
testOut.reset();
Logger logger = createTestLogger();
IgniteIndexReader reader0 = new IgniteIndexReader(
PAGE_SIZE,
PART_CNT,
PAGE_STORE_VER,
new File(workDir, dataDir(cacheGrp)),
isNull(idxs) ? null : idx -> Arrays.stream(idxs).anyMatch(idx::endsWith),
checkParts,
logger
) {
/** {@inheritDoc} */
@Override ProgressPrinter createProgressPrinter(String caption, long total) {
return new ProgressPrinter(System.err, caption, total);
}
};
try (IgniteIndexReader reader = reader0) {
reader.readIndex();
}
// Flush all Logger handlers to make log data available to test.
Arrays.stream(logger.getHandlers()).forEach(Handler::flush);
return testOut.toString();
}
/**
* Test checks correctness of index.
*
* Steps:
* 1)Run {@link IgniteIndexReader} for {@link #CACHE_GROUP_NAME};
* 2)Check that there are no errors in output and 19 B+trees were found;
* 3)Check that all indexes are present in output.
*
* @throws IgniteCheckedException If failed.
*/
@Test
public void testCorrectIdx() throws IgniteCheckedException {
checkCorrectIdx(workDir);
}
/**
* Test checks correctness of index and consistency of partitions.
*
* Steps:
* 1)Run {@link IgniteIndexReader} for {@link #CACHE_GROUP_NAME} with partitions check;
* 2)Check that there are no errors in output and 19 B+trees were found;
* 3)Check that all indexes are present in output.
*
* @throws IgniteCheckedException If failed.
*/
@Test
public void testCorrectIdxWithCheckParts() throws IgniteCheckedException {
checkCorrectIdxWithCheckParts(workDir);
}
/**
* Test verifies that specific indexes being checked are correct.
*
* Steps:
* 1)Run {@link IgniteIndexReader} for {@link #CACHE_GROUP_NAME} with filter by indexes;
* 2)Check that there are no errors in output and 3 B+trees were found;
* 3)Check filtered indexes are present in output.
*
* @throws IgniteCheckedException If failed.
*/
@Test
public void testCorrectIdxWithFilter() throws IgniteCheckedException {
checkCorrectIdxWithFilter(workDir);
}
/**
* Test checks whether the index of an empty group is correct.
*
* Steps:
* 1)Run {@link IgniteIndexReader} for {@link #EMPTY_CACHE_GROUP_NAME};
* 2)Check that there are no errors in output and 1 B+tree was found;
* 3)Run {@link IgniteIndexReader} for a non-existent cache group;
* 4)Check that an exception is thrown.
*
* @throws IgniteCheckedException If failed.
*/
@Test
public void testEmpty() throws IgniteCheckedException {
checkEmpty(workDir);
}
/**
* Test for finding corrupted pages in index.
*
* Steps:
* 1)Corrupt a page in index.bin and back the file up;
* 2)Run {@link IgniteIndexReader} for {@link #CACHE_GROUP_NAME};
* 3)Check that 19 B+trees were found and errors were detected during recursive traversal and sequential scan;
* 4)Check that all indexes are present in output;
* 5)Restore index.bin from backup.
*
* @throws Exception If failed.
*/
@Test
public void testCorruptedIdx() throws Exception {
checkCorruptedIdx(Collections.singletonList(workDir));
}
/**
* Test for finding corrupted pages in index and checking for consistency in partitions.
*
* Steps:
* 1)Corrupt an index.bin page that is not in the reuse list and back the file up;
* 2)Run {@link IgniteIndexReader} for {@link #CACHE_GROUP_NAME} with partitions check;
* 3)Check that there are errors when checking partitions and no errors in the reuse list scan;
* 4)Restore index.bin from backup.
*
* @throws Exception If failed.
*/
@Test
public void testCorruptedIdxWithCheckParts() throws Exception {
checkCorruptedIdxWithCheckParts(Collections.singletonList(workDir));
}
/**
* Test for finding corrupted page in partition.
*
* Steps:
* 1)Corrupt a page in part-0.bin and back the file up;
* 2)Run {@link IgniteIndexReader} for {@link #CACHE_GROUP_NAME};
* 3)Check that 19 B+trees were found and errors occur when reading the corrupted page from the partition;
* 4)Check that all indexes are present in output;
* 5)Restore part-0.bin from backup.
*
* @throws Exception If failed.
*/
@Test
public void testCorruptedPart() throws Exception {
checkCorruptedPart(Collections.singletonList(workDir));
}
/**
* Test for finding corrupted pages in index and partition.
*
* Steps:
* 1)Corrupt a page in index.bin and a page in part-0.bin, and back both files up;
* 2)Run {@link IgniteIndexReader} for {@link #CACHE_GROUP_NAME};
* 3)Check that 19 B+trees were found and errors occur when reading the corrupted pages from the index and the partition;
* 4)Check that all indexes are present in output;
* 5)Restore index.bin and part-0.bin from backup.
*
* @throws Exception If failed.
*/
@Test
public void testCorruptedIdxAndPart() throws Exception {
checkCorruptedIdxAndPart(Collections.singletonList(workDir));
}
/**
* Test checks correctness of index of {@link #QUERY_CACHE_GROUP_NAME}.
*
* Steps:
* 1)Run {@link IgniteIndexReader} for {@link #QUERY_CACHE_GROUP_NAME};
* 2)Check that there are no errors in output and 5 B+trees were found.
*
* @throws IgniteCheckedException If failed.
*/
@Test
public void testQryCacheGroup() throws IgniteCheckedException {
checkQryCacheGroup(workDir);
}
/**
* Checks whether corrupted pages are found in index and partition.
* The index and partition {@code 0} will be corrupted in each work directory,
* and the first work directory will be used to start {@link IgniteIndexReader}.
*
* @param workDirs Work directories.
* @throws Exception If failed.
*/
protected void checkCorruptedIdxAndPart(List<File> workDirs) throws Exception {
A.ensure(nonNull(workDirs) && !workDirs.isEmpty(), "empty workDirs");
try {
for (File dir : workDirs) {
IgniteBiTuple<Long, Long> pagesToCorrupt = findPagesForAnyCacheKey(dir, CACHE_GROUP_NAME);
corruptFile(dir, pagesToCorrupt.get1());
corruptFile(dir, pagesToCorrupt.get2());
}
String output = runIndexReader(workDirs.get(0), CACHE_GROUP_NAME, null, false);
boolean idxReadingErr = isReportIdxAndPartFilesReadingErr();
boolean partReadingErr = isReportIdxAndPartFilesReadingErr();
checkOutput(output, 19, 25, 0, 1, idxReadingErr, partReadingErr, false);
for (int i = 0; i < CREATED_TABLES_CNT; i++)
checkIdxs(output, TableInfo.generate(i), true);
}
finally {
for (File dir : workDirs) {
restoreFile(dir, INDEX_PARTITION);
restoreFile(dir, 0);
}
}
}
/**
* Checks whether a corrupted page is found in a partition.
* Partition {@code 0} will be corrupted in each work directory,
* and the first work directory will be used to start {@link IgniteIndexReader}.
*
* @param workDirs Work directories.
* @throws Exception If failed.
*/
protected void checkCorruptedPart(List<File> workDirs) throws Exception {
A.ensure(nonNull(workDirs) && !workDirs.isEmpty(), "empty workDirs");
try {
for (File dir : workDirs) {
IgniteBiTuple<Long, Long> pagesToCorrupt = findPagesForAnyCacheKey(dir, CACHE_GROUP_NAME);
corruptFile(dir, pagesToCorrupt.get2());
}
String output = runIndexReader(workDirs.get(0), CACHE_GROUP_NAME, null, false);
boolean partReadingErr = isReportIdxAndPartFilesReadingErr();
checkOutput(output, 19, 23, 0, 0, false, partReadingErr, true);
for (int i = 0; i < CREATED_TABLES_CNT; i++)
checkIdxs(output, TableInfo.generate(i), true);
}
finally {
for (File dir : workDirs)
restoreFile(dir, 0);
}
}
/**
* Checks for corrupted pages in the index and for partition consistency.
* The index will be corrupted in each work directory, and the first work directory will be used to start {@link IgniteIndexReader}.
*
* @param workDirs Work directories.
* @throws Exception If failed.
*/
protected void checkCorruptedIdxWithCheckParts(List<File> workDirs) throws Exception {
A.ensure(nonNull(workDirs) && !workDirs.isEmpty(), "empty workDirs");
try {
for (File dir : workDirs) {
IgniteBiTuple<Long, Long> pagesToCorrupt = findPagesForAnyCacheKey(dir, CACHE_GROUP_NAME);
corruptFile(dir, pagesToCorrupt.get1());
}
String output = runIndexReader(workDirs.get(0), CACHE_GROUP_NAME, null, true);
// The pattern requires two or more digits, i.e. more than 9 errors and problem partitions.
assertFound(output, "Partition check finished, total errors: [0-9]{2,5}, total problem partitions: [0-9]{2,5}");
assertFound(output, "Total errors during lists scan:.*0");
}
finally {
for (File dir : workDirs)
restoreFile(dir, INDEX_PARTITION);
}
}
/** Asserts that the given regexp pattern is found in the output. */
private void assertFound(String output, String pattern) {
assertTrue(output, compile(pattern).matcher(output).find());
}
/**
* Checks whether corrupted pages are found in the index.
* The index will be corrupted in each work directory, and the first work directory will be used to start {@link IgniteIndexReader}.
*
* @param workDirs Work directories.
* @throws Exception If failed.
*/
protected void checkCorruptedIdx(List<File> workDirs) throws Exception {
A.ensure(nonNull(workDirs) && !workDirs.isEmpty(), "empty workDirs");
try {
for (File dir : workDirs) {
IgniteBiTuple<Long, Long> pagesToCorrupt = findPagesForAnyCacheKey(dir, CACHE_GROUP_NAME);
corruptFile(dir, pagesToCorrupt.get1());
}
String output = runIndexReader(workDirs.get(0), CACHE_GROUP_NAME, null, false);
// 1 corrupted page detected while traversing, and 1 index size inconsistency error.
int travErrCnt = 2;
int seqErrCnt = 1;
boolean idxReadingErr = isReportIdxAndPartFilesReadingErr();
checkOutput(output, 19, travErrCnt, 0, seqErrCnt, idxReadingErr, false, false);
for (int i = 0; i < CREATED_TABLES_CNT; i++)
checkIdxs(output, TableInfo.generate(i), true);
}
finally {
for (File dir : workDirs)
restoreFile(dir, INDEX_PARTITION);
}
}
/**
* Checking correctness of index for {@link #QUERY_CACHE_GROUP_NAME}.
*
* @param workDir Work directory.
* @throws IgniteCheckedException If failed.
*/
protected void checkQryCacheGroup(File workDir) throws IgniteCheckedException {
String output = runIndexReader(workDir, QUERY_CACHE_GROUP_NAME, null, false);
checkOutput(output, 5, 0, 0, 0, false, false, true);
}
/**
* Checking correctness of index.
*
* @param workDir Work directory.
* @throws IgniteCheckedException If failed.
*/
protected void checkCorrectIdx(File workDir) throws IgniteCheckedException {
String output = runIndexReader(workDir, CACHE_GROUP_NAME, null, false);
checkOutput(output, 19, 0, 0, 0, false, false, true);
for (int i = 0; i < CREATED_TABLES_CNT; i++)
checkIdxs(output, TableInfo.generate(i), false);
}
/**
* Checking correctness of index and consistency of partitions with it.
*
* @param workDir Work directory.
* @throws IgniteCheckedException If failed.
*/
protected void checkCorrectIdxWithCheckParts(File workDir) throws IgniteCheckedException {
String output = runIndexReader(workDir, CACHE_GROUP_NAME, null, true);
checkOutput(output, 19, 0, 0, 0, false, false, true);
for (int i = 0; i < CREATED_TABLES_CNT; i++)
checkIdxs(output, TableInfo.generate(i), false);
assertContains(log, output, "Partitions check detected no errors.");
assertContains(log, output, "Partition check finished, total errors: 0, total problem partitions: 0");
}
/**
* Checking that specific indexes being checked are correct.
*
* @param workDir Work directory.
* @throws IgniteCheckedException If failed.
*/
protected void checkCorrectIdxWithFilter(File workDir) throws IgniteCheckedException {
String[] idxsToCheck = {"T2_F1_IDX##H2Tree%0", "T2_F2_IDX##H2Tree%0"};
String output = runIndexReader(workDir, CACHE_GROUP_NAME, idxsToCheck, false);
checkOutput(output, 3, 0, 0, -1, false, false, true);
Set<String> idxSet = new HashSet<>(asList(idxsToCheck));
for (int i = 0; i < CREATED_TABLES_CNT; i++) {
TableInfo info = TableInfo.generate(i);
List<IgnitePair<String>> fields = fields(info.fieldsCnt);
List<IgnitePair<String>> idxs = idxs(info.tblName, fields);
int entriesCnt = info.rec - info.del;
idxs.stream().map(IgniteBiTuple::get1)
.filter(idxSet::contains)
.forEach(idx -> {
checkIdx(output, RECURSIVE_TRAVERSE_NAME, idx.toUpperCase(), entriesCnt, false);
checkIdx(output, HORIZONTAL_SCAN_NAME, idx.toUpperCase(), entriesCnt, false);
});
}
}
/**
* Validating index of an empty group.
*
* @param workDir Work directory.
* @throws IgniteCheckedException If failed.
*/
protected void checkEmpty(File workDir) throws IgniteCheckedException {
// Check output for empty cache group.
String output = runIndexReader(workDir, EMPTY_CACHE_GROUP_NAME, null, false);
checkOutput(output, 1, 0, 0, 0, false, false, true);
// Create an empty directory and try to check it.
String newCleanGrp = "noCache";
File cleanDir = new File(workDir, dataDir(newCleanGrp));
try {
cleanDir.mkdir();
assertThrows(
log,
() -> runIndexReader(workDir, newCleanGrp, null, false),
IgniteCheckedException.class,
null
);
}
finally {
U.delete(cleanDir);
}
}
/**
* @return Flag indicating whether index and partition file reading errors should be present in output.
*/
protected boolean isReportIdxAndPartFilesReadingErr() {
return false;
}
/** Descriptor of an SQL table created and filled by the test. */
private static class TableInfo {
/** Table name. */
final String tblName;
/** Fields count. */
final int fieldsCnt;
/** Count of records that should be inserted. */
final int rec;
/** Count of records that should be deleted after insert.*/
final int del;
/** */
public TableInfo(String tblName, int fieldsCnt, int rec, int del) {
this.tblName = tblName;
this.fieldsCnt = fieldsCnt;
this.rec = rec;
this.del = del;
}
/**
* Generates some table info for given int.
* @param i Some integer.
* @return Table info.
*/
public static TableInfo generate(int i) {
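// For i = 0, 1, 2: T0 (3 fields, 1700 rows, 0 deleted), T1 (4, 1200, 250), T2 (5, 700, 500); the pattern repeats modulo 3.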
return new TableInfo("T" + i, 3 + (i % 3), 1700 - (i % 3) * 500, (i % 3) * 250);
}
}
/** Test value class with integer and string query fields. */
private static class TestClass1 {
/** */
private final Integer f;
/** */
private final String s;
/** */
public TestClass1(Integer f, String s) {
this.f = f;
this.s = s;
}
}
/** Test value class extending {@link TestClass1}. */
private static class TestClass2 extends TestClass1 {
/** */
public TestClass2(Integer f, String s) {
super(f, s);
}
}
}