blob: 2d853ed64aec2eb67e753af3ef68061a72e1364a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.cache.Cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.commandline.cache.CacheSubcommands;
import org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.visor.verify.ValidateIndexesCheckSizeIssue;
import org.apache.ignite.internal.visor.verify.ValidateIndexesCheckSizeResult;
import org.apache.ignite.internal.visor.verify.VisorValidateIndexesTaskResult;
import org.apache.ignite.util.GridCommandHandlerIndexingUtils.Organization;
import org.apache.ignite.util.GridCommandHandlerIndexingUtils.Person;
import org.junit.Test;
import static java.lang.String.valueOf;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
import static org.apache.ignite.internal.commandline.CommandList.CACHE;
import static org.apache.ignite.internal.commandline.cache.CacheSubcommands.VALIDATE_INDEXES;
import static org.apache.ignite.internal.commandline.cache.argument.ValidateIndexesCommandArg.CHECK_SIZES;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.assertNotContains;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.CACHE_NAME;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.GROUP_NAME;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.breakCacheDataTree;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.breakSqlIndex;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.createAndFillCache;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.organizationEntity;
import static org.apache.ignite.util.GridCommandHandlerIndexingUtils.personEntity;
* Class for testing function of checking size of index and cache in
* {@link CacheSubcommands#VALIDATE_INDEXES}.
public class GridCommandHandlerIndexingCheckSizeTest extends GridCommandHandlerClusterByClassAbstractTest {
/** Entry count for entity. */
private static final int ENTRY_CNT = 100;
/** Non persistent data region name. */
private static final String NON_PERSIST_REGION = "non-persist";
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
createAndFillCache(client, CACHE_NAME, GROUP_NAME, null, queryEntities(), ENTRY_CNT);
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
new DataRegionConfiguration().setName(NON_PERSIST_REGION)
return cfg;
* Test checks that an error will be displayed checking cache size and
* index when cache is broken.
public void testCheckCacheSizeWhenBrokenCache() {
validateCheckSizesAfterBreakCacheDataTree(crd, CACHE_NAME, ENTRY_CNT);
* Test checks that cache size and index validation error will not be
* displayed if cache is broken, because argument
* {@link ValidateIndexesCommandArg#CHECK_SIZES} not used.
public void testNoCheckCacheSizeWhenBrokenCache() {
String cacheName = CACHE_NAME;
breakCacheDataTree(log, crd.cachex(cacheName), 1, null);
* Test checks that an error will be displayed checking cache size and
* index when index is broken.
* @throws Exception If failed.
public void testCheckCacheSizeWhenBrokenIdx() throws Exception {
validateCheckSizesAfterBreakSqlIndex(crd, CACHE_NAME, ENTRY_CNT);
* Test checks that cache size and index validation error will not be
* displayed if index is broken, because argument
* {@link ValidateIndexesCommandArg#CHECK_SIZES} not used.
* @throws Exception If failed.
public void testNoCheckCacheSizeWhenBrokenIdx() throws Exception {
String cacheName = CACHE_NAME;
breakSqlIndex(crd.cachex(cacheName), 1, null);
* Test that checks that there will be no errors when executing command
* "validate_indexes" with/without "--check-sizes" on the cache without
* {@link QueryEntity}.
public void testNoErrorOnCacheWithoutQueryEntity() {
String cacheName = DEFAULT_CACHE_NAME;
createAndFillCache(crd, cacheName, GROUP_NAME, null, emptyMap(), 0);
try (IgniteDataStreamer<Object, Object> streamer = crd.dataStreamer(cacheName)) {
for (int i = 0; i < ENTRY_CNT; i++)
streamer.addData(i, new Person(i, "p_" + i));
execVIWithNoErrCheck(cacheName, false);
execVIWithNoErrCheck(cacheName, true);
* Test that checks that there will be no errors when executing command
* "validate_indexes" with/without "--check-sizes" on the empty cache
* with {@link QueryEntity}.
public void testNoErrorOnEmptyCacheWithQueryEntity() {
String cacheName = DEFAULT_CACHE_NAME;
createAndFillCache(crd, cacheName, GROUP_NAME, null, queryEntities(), 0);
execVIWithNoErrCheck(cacheName, false);
execVIWithNoErrCheck(cacheName, true);
* Test checks that there will be no errors if there are entries without
* {@link QueryEntity} in cache.
public void testNoErrorOnCacheWithEntryWithoutQueryEntity() {
String cacheName = CACHE_NAME;
int cacheSize = crd.cachex(cacheName).size();
try (IgniteDataStreamer<Object, Object> streamer = crd.dataStreamer(cacheName)) {
for (int i = cacheSize; i < cacheSize + ENTRY_CNT; i++)
streamer.addData(i, i);
execVIWithNoErrCheck(cacheName, false);
execVIWithNoErrCheck(cacheName, true);
* Test checks that there will be no errors if there are entries without
* {@link QueryEntity} in cache, and also if there are null values.
public void testNoErrorOnCacheWithEntryWithoutQueryEntityAndWithNullValues() {
String cacheName = CACHE_NAME;
int cacheSize = crd.cachex(cacheName).size();
try (IgniteDataStreamer<Object, Object> streamer = crd.dataStreamer(cacheName)) {
int i = cacheSize;
for (; i < cacheSize + (ENTRY_CNT - 10); i++)
streamer.addData(i, i);
for (; i < cacheSize + ENTRY_CNT; i++)
streamer.addData(i, null);
execVIWithNoErrCheck(cacheName, false);
execVIWithNoErrCheck(cacheName, true);
* Test checks that an error will be displayed checking cache size and
* index when cache is broken. In {@link #NON_PERSIST_REGION} region.
public void testCheckCacheSizeWhenBrokenCacheInNonPersistRegion() {
IgniteEx node = crd;
String cacheName = CACHE_NAME + "_new";
createAndFillCache(node, cacheName, GROUP_NAME + "_new", NON_PERSIST_REGION, queryEntities(), ENTRY_CNT);
validateCheckSizesAfterBreakCacheDataTree(node, cacheName, ENTRY_CNT);
* Test checks that an error will be displayed checking cache size and
* index when index is broken. In {@link #NON_PERSIST_REGION} region.
* @throws Exception If failed.
public void testCheckCacheSizeWhenBrokenIdxInNonPersistRegion() throws Exception {
IgniteEx node = crd;
String cacheName = CACHE_NAME + "_new";
createAndFillCache(node, cacheName, GROUP_NAME + "_new", NON_PERSIST_REGION, queryEntities(), ENTRY_CNT);
validateCheckSizesAfterBreakSqlIndex(node, cacheName, ENTRY_CNT);
* Test checks that an error will be displayed checking cache size and
* index when cache is broken. In case of dynamic add a column and index.
public void testCheckCacheSizeWhenBrokenCacheWithDynamicAddColumnAndIndex() {
IgniteEx node = crd;
String cacheName = CACHE_NAME;
int addCnt = ENTRY_CNT;
addColumnAndIdx(node, cacheName, addCnt);
validateCheckSizesAfterBreakCacheDataTree(node, cacheName, ENTRY_CNT + addCnt);
* Test checks that an error will be displayed checking cache size and
* index when index is broken. In case of dynamic add a column and index.
* @throws Exception If failed.
public void testCheckCacheSizeWhenBrokenIdxWithDynamicAddColumnAndIndex() throws Exception {
IgniteEx node = crd;
String cacheName = CACHE_NAME;
int addCnt = ENTRY_CNT;
addColumnAndIdx(node, cacheName, addCnt);
validateCheckSizesAfterBreakSqlIndex(node, cacheName, ENTRY_CNT + addCnt);
* Test that checks that there will be no errors when executing command
* "validate_indexes" with/without "--check-sizes" on cache with
* {@link QueryEntity}.
public void testNoErrorOnCacheWithQueryEntity() {
String cacheName = CACHE_NAME;
execVIWithNoErrCheck(cacheName, false);
execVIWithNoErrCheck(cacheName, true);
* Adding the "address" column and index for {@link Person} and
* {@link Organization}, with new entries added for each of them.
* @param node Node.
* @param cacheName Cache name.
* @param addCnt How many entries add to table.
* */
private void addColumnAndIdx(IgniteEx node, String cacheName, int addCnt) {
IgniteCache<Object, Object> cache = node.cache(cacheName);
cache.query(new SqlFieldsQuery("alter table Person add column orgAddr varchar")).getAll();
cache.query(new SqlFieldsQuery("alter table Organization add column addr varchar")).getAll();
cache.query(new SqlFieldsQuery("create index p_o_addr on Person (orgAddr)")).getAll();
cache.query(new SqlFieldsQuery("create index o_addr on Organization (addr)")).getAll();
int key = node.cachex(cacheName).size();
try (IgniteDataStreamer<Object, Object> streamer = node.dataStreamer(cacheName)) {
ThreadLocalRandom rand = ThreadLocalRandom.current();
for (int i = 0; i < addCnt; i++) {
new Person(rand.nextInt(), valueOf(rand.nextLong())).orgAddr(valueOf(rand.nextLong()))
new Organization(rand.nextInt(), valueOf(rand.nextLong())).addr(valueOf(rand.nextLong()))
* Validation of cache and index sizes after breaking sql index.
* @param node Node.
* @param cacheName Cache name.
* @param entryCnt Entry count for table.
private void validateCheckSizesAfterBreakSqlIndex(IgniteEx node, String cacheName, int entryCnt) throws Exception {
Map<String, AtomicInteger> rmvIdxByTbl = new HashMap<>();
breakSqlIndex(node.cachex(cacheName), 0, row -> {
rmvIdxByTbl.computeIfAbsent(tableName(cacheName, row), s -> new AtomicInteger()).incrementAndGet();
return true;
assertEquals(rmvIdxByTbl.size(), queryEntities().size());
validateCheckSizes(node, cacheName, rmvIdxByTbl, ai -> entryCnt, ai -> entryCnt - ai.get());
* Validation of cache and index sizes after breaking CacheDataTree.
* @param node Node.
* @param cacheName Cache name.
* @param entryCnt Entry count for table.
private void validateCheckSizesAfterBreakCacheDataTree(IgniteEx node, String cacheName, int entryCnt) {
Map<String, AtomicInteger> rmvEntryByTbl = new HashMap<>();
breakCacheDataTree(log, node.cachex(cacheName), 1, (i, entry) -> {
rmvEntryByTbl.computeIfAbsent(tableName(cacheName, entry), s -> new AtomicInteger()).incrementAndGet();
return true;
assertEquals(rmvEntryByTbl.size(), queryEntities().size());
validateCheckSizes(node, cacheName, rmvEntryByTbl, ai -> entryCnt - ai.get(), ai -> entryCnt);
* Creating {@link QueryEntity}'s with filling functions.
* @return {@link QueryEntity}'s with filling functions.
private Map<QueryEntity, Function<Random, Object>> queryEntities() {
Map<QueryEntity, Function<Random, Object>> qryEntities = new HashMap<>();
qryEntities.put(personEntity(), rand -> new Person(rand.nextInt(), valueOf(rand.nextLong())));
qryEntities.put(organizationEntity(), rand -> new Organization(rand.nextInt(), valueOf(rand.nextLong())));
return qryEntities;
* Executing "validate_indexes" command with verify that there are
* no errors in result.
* @param cacheName Cache name.
* @param checkSizes Add argument "--check-sizes".
private void execVIWithNoErrCheck(String cacheName, boolean checkSizes) {
List<String> cmdWithArgs = new ArrayList<>(asList(CACHE.text(), VALIDATE_INDEXES.text(), cacheName));
if (checkSizes)
assertEquals(EXIT_CODE_OK, execute(cmdWithArgs));
String out = testOut.toString();
assertNotContains(log, out, "issues found (listed above)");
assertNotContains(log, out, "Size check");
* Check that if data is broken and option
* {@link ValidateIndexesCommandArg#CHECK_SIZES} is enabled, size check
* will not take place.
* @param cacheName Cache size.
private void checkNoCheckSizeInCaseBrokenData(String cacheName) {
execute(CACHE.text(), VALIDATE_INDEXES.text(), cacheName)
String out = testOut.toString();
assertContains(log, out, "issues found (listed above)");
assertNotContains(log, out, "Size check");
* Checking whether cache and index size check is correct.
* @param node Node.
* @param cacheName Cache name.
* @param rmvByTbl Number of deleted items per table.
* @param cacheSizeExp Function for getting expected cache size.
* @param idxSizeExp Function for getting expected index size.
private void validateCheckSizes(
IgniteEx node,
String cacheName,
Map<String, AtomicInteger> rmvByTbl,
Function<AtomicInteger, Integer> cacheSizeExp,
Function<AtomicInteger, Integer> idxSizeExp
) {
assertEquals(EXIT_CODE_OK, execute(CACHE.text(), VALIDATE_INDEXES.text(), cacheName, CHECK_SIZES.argName()));
String out = testOut.toString();
assertContains(log, out, "issues found (listed above)");
assertContains(log, out, "Size check");
Map<String, ValidateIndexesCheckSizeResult> valIdxCheckSizeResults =
assertEquals(rmvByTbl.size(), valIdxCheckSizeResults.size());
for (Map.Entry<String, AtomicInteger> rmvByTblEntry : rmvByTbl.entrySet()) {
ValidateIndexesCheckSizeResult checkSizeRes = valIdxCheckSizeResults.entrySet().stream()
.filter(e -> e.getKey().contains(rmvByTblEntry.getKey()))
assertEquals((int)cacheSizeExp.apply(rmvByTblEntry.getValue()), checkSizeRes.cacheSize());
Collection<ValidateIndexesCheckSizeIssue> issues = checkSizeRes.issues();
issues.forEach(issue -> {
assertEquals((int)idxSizeExp.apply(rmvByTblEntry.getValue()), issue.indexSize());
Throwable err = issue.error();
assertEquals("Cache and index size not same.", err.getMessage());
* Get table name for cache row.
* @param cacheName Cache name.
* @param cacheDataRow Cache row.
private String tableName(String cacheName, CacheDataRow cacheDataRow) {
try {
return crd.context().query().typeByValue(
catch (IgniteCheckedException e) {
throw new IgniteException(e);
* Get table name for cache entry.
* @param cacheName Cache name.
* @param cacheEntry Cache entry.
private <K, V> String tableName(String cacheName, Cache.Entry<K, V> cacheEntry) {
try {
return crd.context().query().typeByValue(
catch (IgniteCheckedException e) {
throw new IgniteException(e);