blob: 38aa89918edbfd0ccd6976b740d6327920833ed7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.visor.verify;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cache.query.index.Index;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRowImpl;
import org.apache.ignite.internal.cache.query.index.sorted.inline.IndexQueryContext;
import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexImpl;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.CorruptedTreeException;
import org.apache.ignite.internal.processors.cache.verify.GridNotIdleException;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility;
import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptyMap;
import static java.util.Collections.newSetFromMap;
import static java.util.Collections.shuffle;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.GRID_NOT_IDLE_MSG;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.compareUpdateCounters;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.formatUpdateCountersDiff;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.getUpdateCountersSnapshot;
import static org.apache.ignite.internal.util.IgniteUtils.error;
/**
* Closure that locally validates indexes of given caches.
* Validation consists of four checks:
* 1. If entry is present in cache data tree, it's reachable from all cache SQL indexes
* 2. If entry is present in cache SQL index, it can be dereferenced with link from index
* 3. If entry is present in cache SQL index, it's present in cache data tree
* 4. If size of cache and index on same table are not same
*/
public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndexesJobResult> {
    /** Serialization version. */
    private static final long serialVersionUID = 0L;

    /** Exception message throwing when closure was cancelled. */
    public static final String CANCELLED_MSG = "Closure of index validation was cancelled.";

    /** Auto-injected Ignite instance. Transient: not serialized with the closure. */
    @IgniteInstanceResource
    private transient IgniteEx ignite;

    /** Injected logger. */
    @LoggerResource
    private IgniteLogger log;

    /** Cache names to validate; {@code null} means all suitable cache groups. */
    private final Set<String> cacheNames;

    /** If provided only first K elements will be validated. */
    private final int checkFirst;

    /** If provided only each Kth element will be validated. */
    private final int checkThrough;

    /** Check CRC of pages stored on disk. */
    private final boolean checkCrc;

    /** Check that index size and cache size are same. */
    private final boolean checkSizes;

    /** Counter of processed partitions. */
    private final AtomicInteger processedPartitions = new AtomicInteger(0);

    /** Total partitions. */
    private volatile int totalPartitions;

    /** Counter of processed indexes. */
    private final AtomicInteger processedIndexes = new AtomicInteger(0);

    /** Counter of integrity checked indexes. */
    private final AtomicInteger integrityCheckedIndexes = new AtomicInteger(0);

    /** Counter of calculated sizes of caches per partitions. */
    private final AtomicInteger processedCacheSizePartitions = new AtomicInteger(0);

    /** Counter of calculated index sizes. */
    private final AtomicInteger processedIdxSizes = new AtomicInteger(0);

    /** Total indexes. */
    private volatile int totalIndexes;

    /** Total cache groups. */
    private volatile int totalCacheGrps;

    /** Last progress print timestamp (ms), used to throttle progress logging. */
    private final AtomicLong lastProgressPrintTs = new AtomicLong(0);

    /** Calculation executor. */
    private volatile ExecutorService calcExecutor;

    /** Group cache ids when calculating cache size was an error. */
    private final Set<Integer> failCalcCacheSizeGrpIds = newSetFromMap(new ConcurrentHashMap<>());

    /** Validate index context. */
    private final ValidateIndexesContext validateCtx;
    /**
     * Constructor.
     *
     * @param validateCtx Context of validate index closure (used for cooperative cancellation).
     * @param cacheNames Cache names to validate; {@code null} means all suitable cache groups.
     * @param checkFirst If positive only first K elements will be validated.
     * @param checkThrough If positive only each Kth element will be validated.
     * @param checkCrc Check CRC sum on stored pages on disk.
     * @param checkSizes Check that index size and cache size are same.
     */
    public ValidateIndexesClosure(
        ValidateIndexesContext validateCtx,
        Set<String> cacheNames,
        int checkFirst,
        int checkThrough,
        boolean checkCrc,
        boolean checkSizes
    ) {
        this.validateCtx = validateCtx;
        this.cacheNames = cacheNames;
        this.checkFirst = checkFirst;
        this.checkThrough = checkThrough;
        this.checkCrc = checkCrc;
        this.checkSizes = checkSizes;
    }
/** {@inheritDoc} */
@Override public VisorValidateIndexesJobResult call() throws Exception {
calcExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
try {
return call0();
}
finally {
calcExecutor.shutdown();
}
}
/**
* Detect groups for further analysis.
*
* @return Collection of group id.
*/
private Set<Integer> collectGroupIds() {
Set<Integer> grpIds = new HashSet<>();
Set<String> missingCaches = new HashSet<>();
if (cacheNames != null) {
for (String cacheName : cacheNames) {
DynamicCacheDescriptor desc = ignite.context().cache().cacheDescriptor(cacheName);
if (desc == null)
missingCaches.add(cacheName);
else
if (ignite.context().cache().cacheGroup(desc.groupId()).affinityNode())
grpIds.add(desc.groupId());
}
if (!missingCaches.isEmpty()) {
String errStr = "The following caches do not exist: " + String.join(", ", missingCaches);
throw new IgniteException(errStr);
}
}
else {
Collection<CacheGroupContext> groups = ignite.context().cache().cacheGroups();
for (CacheGroupContext grp : groups) {
if (!grp.systemCache() && !grp.isLocal() && grp.affinityNode())
grpIds.add(grp.groupId());
}
}
return grpIds;
}
    /**
     * Performs the actual validation: collects groups, runs partition and index checks
     * (and, when enabled, size checks) in parallel on {@link #calcExecutor} and gathers
     * the results.
     *
     * @return Aggregated job result.
     */
    private VisorValidateIndexesJobResult call0() {
        if (validateCtx.isCancelled())
            throw new IgniteException(CANCELLED_MSG);

        Set<Integer> grpIds = collectGroupIds();

        // Update counters per partition per group, snapshotted before any check starts.
        final Map<Integer, Map<Integer, PartitionUpdateCounter>> partsWithCntrsPerGrp =
            getUpdateCountersSnapshot(ignite, grpIds);

        IdleVerifyUtility.IdleChecker idleChecker = new IdleVerifyUtility.IdleChecker(ignite, partsWithCntrsPerGrp);

        List<T2<CacheGroupContext, GridDhtLocalPartition>> partArgs = new ArrayList<>();

        List<T2<GridCacheContext, InlineIndexImpl>> idxArgs = new ArrayList<>();

        totalCacheGrps = grpIds.size();

        // CRC integrity check; groups that failed it are excluded from further validation.
        Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = integrityCheckIndexesPartitions(grpIds, idleChecker);

        for (Integer grpId : grpIds) {
            CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);

            if (isNull(grpCtx) || integrityCheckResults.containsKey(grpId))
                continue;

            for (GridDhtLocalPartition part : grpCtx.topology().localPartitions())
                partArgs.add(new T2<>(grpCtx, part));

            for (GridCacheContext ctx : grpCtx.caches()) {
                String cacheName = ctx.name();

                if (cacheNames == null || cacheNames.contains(cacheName)) {
                    Collection<Index> idxs = ignite.context().indexProcessor().indexes(cacheName);

                    for (Index idx : idxs) {
                        InlineIndexImpl idx0 = idx.unwrap(InlineIndexImpl.class);

                        if (idx0 != null)
                            idxArgs.add(new T2<>(ctx, idx0));
                    }
                }
            }
        }

        // To decrease contention on same indexes.
        shuffle(partArgs);
        shuffle(idxArgs);

        totalPartitions = partArgs.size();
        totalIndexes = idxArgs.size();

        List<Future<Map<PartitionKey, ValidateIndexesPartitionResult>>> procPartFutures = new ArrayList<>(partArgs.size());

        List<Future<Map<String, ValidateIndexesPartitionResult>>> procIdxFutures = new ArrayList<>(idxArgs.size());

        List<T3<CacheGroupContext, GridDhtLocalPartition, Future<CacheSize>>> cacheSizeFutures = new ArrayList<>(partArgs.size());

        List<T3<GridCacheContext, InlineIndexImpl, Future<T2<Throwable, Long>>>> idxSizeFutures = new ArrayList<>(idxArgs.size());

        partArgs.forEach(k -> procPartFutures.add(processPartitionAsync(k.get1(), k.get2())));

        idxArgs.forEach(k -> procIdxFutures.add(processIndexAsync(k, idleChecker)));

        if (checkSizes) {
            for (T2<CacheGroupContext, GridDhtLocalPartition> partArg : partArgs) {
                CacheGroupContext cacheGrpCtx = partArg.get1();
                GridDhtLocalPartition locPart = partArg.get2();

                cacheSizeFutures.add(new T3<>(cacheGrpCtx, locPart, calcCacheSizeAsync(cacheGrpCtx, locPart)));
            }

            for (T2<GridCacheContext, InlineIndexImpl> idxArg : idxArgs) {
                GridCacheContext cacheCtx = idxArg.get1();
                InlineIndexImpl idx = idxArg.get2();

                idxSizeFutures.add(new T3<>(cacheCtx, idx, calcIndexSizeAsync(cacheCtx, idx, idleChecker)));
            }
        }

        Map<PartitionKey, ValidateIndexesPartitionResult> partResults = new HashMap<>();

        Map<String, ValidateIndexesPartitionResult> idxResults = new HashMap<>();

        Map<String, ValidateIndexesCheckSizeResult> checkSizeResults = new HashMap<>();

        // Index of the first not-yet-consumed future in each list; used to cancel the rest on failure.
        int curPart = 0;
        int curIdx = 0;
        int curCacheSize = 0;
        int curIdxSize = 0;

        try {
            for (; curPart < procPartFutures.size(); curPart++) {
                Future<Map<PartitionKey, ValidateIndexesPartitionResult>> fut = procPartFutures.get(curPart);

                Map<PartitionKey, ValidateIndexesPartitionResult> partRes = fut.get();

                // Keep only results that actually contain issues.
                if (!partRes.isEmpty() && partRes.entrySet().stream().anyMatch(e -> !e.getValue().issues().isEmpty()))
                    partResults.putAll(partRes);
            }

            for (; curIdx < procIdxFutures.size(); curIdx++) {
                Future<Map<String, ValidateIndexesPartitionResult>> fut = procIdxFutures.get(curIdx);

                Map<String, ValidateIndexesPartitionResult> idxRes = fut.get();

                if (!idxRes.isEmpty() && idxRes.entrySet().stream().anyMatch(e -> !e.getValue().issues().isEmpty()))
                    idxResults.putAll(idxRes);
            }

            if (checkSizes) {
                for (; curCacheSize < cacheSizeFutures.size(); curCacheSize++)
                    cacheSizeFutures.get(curCacheSize).get3().get();

                for (; curIdxSize < idxSizeFutures.size(); curIdxSize++)
                    idxSizeFutures.get(curIdxSize).get3().get();

                checkSizes(cacheSizeFutures, idxSizeFutures, checkSizeResults);

                // Verify the grid stayed idle during the size checks by comparing counter snapshots.
                Map<Integer, Map<Integer, PartitionUpdateCounter>> partsWithCntrsPerGrpAfterChecks =
                    getUpdateCountersSnapshot(ignite, grpIds);

                List<Integer> diff = compareUpdateCounters(ignite, partsWithCntrsPerGrp, partsWithCntrsPerGrpAfterChecks);

                if (!F.isEmpty(diff)) {
                    String res = formatUpdateCountersDiff(ignite, diff);

                    if (!res.isEmpty())
                        throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[" + res + "]");
                }
            }

            log.warning("ValidateIndexesClosure finished: processed " + totalPartitions + " partitions and "
                + totalIndexes + " indexes.");
        }
        catch (InterruptedException | ExecutionException e) {
            // Cancel everything that has not been consumed yet.
            for (int j = curPart; j < procPartFutures.size(); j++)
                procPartFutures.get(j).cancel(false);

            for (int j = curIdx; j < procIdxFutures.size(); j++)
                procIdxFutures.get(j).cancel(false);

            for (int j = curCacheSize; j < cacheSizeFutures.size(); j++)
                cacheSizeFutures.get(j).get3().cancel(false);

            for (int j = curIdxSize; j < idxSizeFutures.size(); j++)
                idxSizeFutures.get(j).get3().cancel(false);

            throw unwrapFutureException(e);
        }

        if (validateCtx.isCancelled())
            throw new IgniteException(CANCELLED_MSG);

        return new VisorValidateIndexesJobResult(
            partResults,
            idxResults,
            integrityCheckResults.values(),
            checkSizeResults
        );
    }
/**
* @param grpIds Group ids.
* @param idleChecker Idle check closure.
*/
private Map<Integer, IndexIntegrityCheckIssue> integrityCheckIndexesPartitions(
Set<Integer> grpIds,
IgniteInClosure<Integer> idleChecker) {
if (!checkCrc)
return Collections.emptyMap();
List<Future<T2<Integer, IndexIntegrityCheckIssue>>> integrityCheckFutures = new ArrayList<>(grpIds.size());
Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = new HashMap<>();
int curFut = 0;
IgniteCacheDatabaseSharedManager db = ignite.context().cache().context().database();
try {
for (Integer grpId: grpIds) {
final CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);
if (grpCtx == null || !grpCtx.persistenceEnabled()) {
integrityCheckedIndexes.incrementAndGet();
continue;
}
Future<T2<Integer, IndexIntegrityCheckIssue>> checkFut =
calcExecutor.submit(new Callable<T2<Integer, IndexIntegrityCheckIssue>>() {
@Override public T2<Integer, IndexIntegrityCheckIssue> call() {
IndexIntegrityCheckIssue issue = integrityCheckIndexPartition(grpCtx, idleChecker);
return new T2<>(grpCtx.groupId(), issue);
}
});
integrityCheckFutures.add(checkFut);
}
for (Future<T2<Integer, IndexIntegrityCheckIssue>> fut : integrityCheckFutures) {
T2<Integer, IndexIntegrityCheckIssue> res = fut.get();
if (res.getValue() != null)
integrityCheckResults.put(res.getKey(), res.getValue());
}
}
catch (InterruptedException | ExecutionException e) {
for (int j = curFut; j < integrityCheckFutures.size(); j++)
integrityCheckFutures.get(j).cancel(false);
throw unwrapFutureException(e);
}
return integrityCheckResults;
}
/**
* @param gctx Cache group context.
* @param idleChecker Idle check closure.
*/
private IndexIntegrityCheckIssue integrityCheckIndexPartition(
CacheGroupContext gctx,
IgniteInClosure<Integer> idleChecker
) {
GridKernalContext ctx = ignite.context();
GridCacheSharedContext<?, ?> cctx = ctx.cache().context();
try {
FilePageStoreManager pageStoreMgr = (FilePageStoreManager)cctx.pageStore();
if (pageStoreMgr != null && gctx.persistenceEnabled()) {
checkPartitionsPageCrcSum(() -> (FilePageStore)pageStoreMgr.getStore(gctx.groupId(), INDEX_PARTITION),
INDEX_PARTITION, FLAG_IDX);
}
idleChecker.apply(gctx.groupId());
return null;
}
catch (Throwable t) {
log.error("Integrity check of index partition of cache group " + gctx.cacheOrGroupName() + " failed", t);
return new IndexIntegrityCheckIssue(gctx.cacheOrGroupName(), t);
}
finally {
integrityCheckedIndexes.incrementAndGet();
printProgressIfNeeded(() -> "Current progress of ValidateIndexesClosure: checked integrity of "
+ integrityCheckedIndexes.get() + " index partitions of " + totalCacheGrps + " cache groups");
}
}
/**
* @param grpCtx Group context.
* @param part Local partition.
*/
private Future<Map<PartitionKey, ValidateIndexesPartitionResult>> processPartitionAsync(
CacheGroupContext grpCtx,
GridDhtLocalPartition part
) {
return calcExecutor.submit(new Callable<Map<PartitionKey, ValidateIndexesPartitionResult>>() {
@Override public Map<PartitionKey, ValidateIndexesPartitionResult> call() {
return processPartition(grpCtx, part);
}
});
}
    /**
     * Validates all rows of one local partition against the SQL indexes of its group:
     * each row read from the partition must be findable in every matching index.
     *
     * @param grpCtx Group context.
     * @param part Local partition.
     * @return Singleton map of partition key to validation result, or an empty map if the
     *      closure was cancelled, the partition could not be reserved, is not in
     *      {@code OWNING} state, or processing failed.
     */
    private Map<PartitionKey, ValidateIndexesPartitionResult> processPartition(
        CacheGroupContext grpCtx,
        GridDhtLocalPartition part
    ) {
        // Reserve the partition so it is not evicted while it is being iterated.
        if (validateCtx.isCancelled() || !part.reserve())
            return emptyMap();

        ValidateIndexesPartitionResult partRes;

        try {
            if (part.state() != OWNING)
                return emptyMap();

            // Snapshot the update counter to detect concurrent updates afterwards.
            @Nullable PartitionUpdateCounter updCntr = part.dataStore().partUpdateCounter();

            PartitionUpdateCounter updateCntrBefore = updCntr == null ? null : updCntr.copy();

            partRes = new ValidateIndexesPartitionResult();

            boolean hasMvcc = grpCtx.caches().stream().anyMatch(GridCacheContext::mvccEnabled);

            if (hasMvcc) {
                // With MVCC each cache must be iterated under its own MVCC snapshot.
                for (GridCacheContext<?, ?> context : grpCtx.caches()) {
                    IndexQueryContext qryCtx = mvccQueryContext(context);

                    GridIterator<CacheDataRow> iterator = grpCtx.offheap().cachePartitionIterator(
                        context.cacheId(),
                        part.id(),
                        qryCtx.mvccSnapshot(),
                        null
                    );

                    processPartIterator(grpCtx, partRes, qryCtx, iterator);
                }
            }
            else
                processPartIterator(grpCtx, partRes, null, grpCtx.offheap().partitionIterator(part.id()));

            // The update counter must not have changed: the grid is expected to be idle.
            PartitionUpdateCounter updateCntrAfter = part.dataStore().partUpdateCounter();

            if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) {
                throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + grpCtx.cacheOrGroupName() +
                    ", grpId=" + grpCtx.groupId() + ", partId=" + part.id() + "] changed during index validation " +
                    "[before=" + updateCntrBefore + ", after=" + updateCntrAfter + "]");
            }
        }
        catch (IgniteCheckedException e) {
            error(log, "Failed to process partition [grpId=" + grpCtx.groupId() +
                ", partId=" + part.id() + "]", e);

            return emptyMap();
        }
        finally {
            part.release();

            printProgressOfIndexValidationIfNeeded();
        }

        PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName());

        processedPartitions.incrementAndGet();

        return Collections.singletonMap(partKey, partRes);
    }
    /**
     * Process partition iterator: for each cache data row checks that it can be found
     * in every SQL index defined for its table. Stops early when enough issues are
     * reported or the closure is cancelled.
     *
     * @param grpCtx Cache group context.
     * @param partRes Result object issues are reported to.
     * @param qryCtx Index query context ({@code null} when MVCC is not used).
     * @param it Partition iterator.
     * @throws IgniteCheckedException If iteration fails.
     */
    private void processPartIterator(
        CacheGroupContext grpCtx,
        ValidateIndexesPartitionResult partRes,
        IndexQueryContext qryCtx,
        GridIterator<CacheDataRow> it
    ) throws IgniteCheckedException {
        boolean enoughIssues = false;

        GridQueryProcessor qryProcessor = ignite.context().query();

        // checkFirst limits how many rows are validated; checkThrough samples every Kth row.
        final boolean skipConditions = checkFirst > 0 || checkThrough > 0;
        final boolean bothSkipConditions = checkFirst > 0 && checkThrough > 0;

        long current = 0;
        long processedNumber = 0;

        while (it.hasNextX() && !validateCtx.isCancelled()) {
            if (enoughIssues)
                break;

            CacheDataRow row = it.nextX();

            if (skipConditions) {
                if (bothSkipConditions) {
                    // Sample every Kth row until the first K sampled rows are processed.
                    if (processedNumber > checkFirst)
                        break;
                    else if (current++ % checkThrough > 0)
                        continue;
                    else
                        processedNumber++;
                }
                else {
                    if (checkFirst > 0) {
                        if (current++ > checkFirst)
                            break;
                    }
                    else {
                        if (current++ % checkThrough > 0)
                            continue;
                    }
                }
            }

            // cacheId == 0 means the group contains a single cache.
            int cacheId = row.cacheId() == 0 ? grpCtx.groupId() : row.cacheId();

            GridCacheContext<?, ?> cacheCtx = row.cacheId() == 0 ?
                grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(row.cacheId());

            if (cacheCtx == null)
                throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);

            // Zero link means the row is invalid (possibly deleted concurrently).
            if (row.link() == 0L) {
                String errMsg = "Invalid partition row, possibly deleted";

                log.error(errMsg);

                IndexValidationIssue is = new IndexValidationIssue(null, cacheCtx.name(), null,
                    new IgniteCheckedException(errMsg));

                enoughIssues |= partRes.reportIssue(is);

                continue;
            }

            QueryTypeDescriptorImpl res = qryProcessor.typeByValue(
                cacheCtx.name(),
                cacheCtx.cacheObjectContext(),
                row.key(),
                row.value(),
                true
            );

            if (res == null)
                continue; // Tolerate - (k, v) is just not indexed.

            Collection<Index> indexes = ignite.context().indexProcessor().indexes(cacheCtx.name());

            for (Index idx : indexes) {
                if (validateCtx.isCancelled())
                    break;

                InlineIndexImpl idx0 = idx.unwrap(InlineIndexImpl.class);

                if (idx0 == null)
                    continue;

                // Skip indexes that belong to other tables of the same cache.
                if (!F.eq(idx0.indexDefinition().idxName().tableName(), res.tableName()))
                    continue;

                IndexRow idxRow = new IndexRowImpl(idx0.segment(0).rowHandler(), row);

                try {
                    // Point lookup of the data row in the index.
                    GridCursor<IndexRow> cursor = idx0.find(idxRow, idxRow, true, true, qryCtx);

                    if (cursor == null || !cursor.next())
                        throw new IgniteCheckedException("Key is present in CacheDataTree, but can't be found in SQL index.");
                }
                catch (Throwable t) {
                    Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
                        grpCtx.cacheObjectContext(), row.key(), true, true);

                    IndexValidationIssue is = new IndexValidationIssue(
                        o.toString(), cacheCtx.name(), idx.name(), t);

                    log.error("Failed to lookup key: " + is.toString(), t);

                    enoughIssues |= partRes.reportIssue(is);
                }
            }
        }
    }
/**
* Get QueryContext for MVCC snapshot.
*
* @param cctx Cache context.
* @return QueryContext for MVCC snapshot.
* @throws IgniteCheckedException If failed.
*/
private IndexQueryContext mvccQueryContext(GridCacheContext<?, ?> cctx) throws IgniteCheckedException {
boolean mvccEnabled = cctx.mvccEnabled();
if (mvccEnabled) {
MvccQueryTracker tracker = MvccUtils.mvccTracker(cctx, true);
MvccSnapshot mvccSnapshot = tracker.snapshot();
return new IndexQueryContext(cacheName -> null, null, mvccSnapshot);
}
return null;
}
/**
*
*/
private void printProgressOfIndexValidationIfNeeded() {
printProgressIfNeeded(() -> "Current progress of ValidateIndexesClosure: processed " +
processedPartitions.get() + " of " + totalPartitions + " partitions, " +
processedIndexes.get() + " of " + totalIndexes + " SQL indexes" +
(checkSizes ? ", " + processedCacheSizePartitions.get() + " of " + totalPartitions +
" calculate cache size per partitions, " + processedIdxSizes.get() + " of " + totalIndexes +
"calculate index size" : ""));
}
/**
*
*/
private void printProgressIfNeeded(Supplier<String> msgSup) {
long curTs = U.currentTimeMillis();
long lastTs = lastProgressPrintTs.get();
if (curTs - lastTs >= 60_000 && lastProgressPrintTs.compareAndSet(lastTs, curTs))
log.warning(msgSup.get());
}
/**
* @param cacheCtxWithIdx Cache context and appropriate index.
* @param idleChecker Idle check closure.
*/
private Future<Map<String, ValidateIndexesPartitionResult>> processIndexAsync(
T2<GridCacheContext, InlineIndexImpl> cacheCtxWithIdx,
IgniteInClosure<Integer> idleChecker
) {
return calcExecutor.submit(new Callable<Map<String, ValidateIndexesPartitionResult>>() {
/** {@inheritDoc} */
@Override public Map<String, ValidateIndexesPartitionResult> call() {
BPlusTree.suspendFailureDiagnostic.set(true);
try {
return processIndex(cacheCtxWithIdx, idleChecker);
}
finally {
BPlusTree.suspendFailureDiagnostic.set(false);
}
}
});
}
    /**
     * Validates one SQL index: iterates all index rows and checks that each one can be
     * dereferenced to an existing entry in the cache data tree.
     *
     * @param cacheCtxWithIdx Cache context and appropriate index.
     * @param idleChecker Idle check closure, applied per group when iteration completes.
     * @return Singleton map of a unique index name to its validation result, or an empty
     *      map if the closure was cancelled.
     */
    private Map<String, ValidateIndexesPartitionResult> processIndex(
        T2<GridCacheContext, InlineIndexImpl> cacheCtxWithIdx,
        IgniteInClosure<Integer> idleChecker
    ) {
        if (validateCtx.isCancelled())
            return emptyMap();

        GridCacheContext ctx = cacheCtxWithIdx.get1();

        InlineIndexImpl idx = cacheCtxWithIdx.get2();

        ValidateIndexesPartitionResult idxValidationRes = new ValidateIndexesPartitionResult();

        boolean enoughIssues = false;

        GridCursor<IndexRow> cursor = null;

        try {
            // Full scan of the index.
            cursor = idx.find(null, null, true, true, mvccQueryContext(cacheCtxWithIdx.get1()));

            if (cursor == null)
                throw new IgniteCheckedException("Can't iterate through index: " + idx);
        }
        catch (Throwable t) {
            IndexValidationIssue is = new IndexValidationIssue(null, ctx.name(), idx.name(), t);

            log.error("Find in index failed: " + is.toString());

            idxValidationRes.reportIssue(is);

            enoughIssues = true;
        }

        // checkFirst limits how many rows are validated; checkThrough samples every Kth row.
        final boolean skipConditions = checkFirst > 0 || checkThrough > 0;
        final boolean bothSkipConditions = checkFirst > 0 && checkThrough > 0;

        long current = 0;
        long processedNumber = 0;

        // Last successfully read key; reported when a corrupted tree is detected.
        KeyCacheObject previousKey = null;

        while (!enoughIssues && !validateCtx.isCancelled()) {
            KeyCacheObject key = null;

            try {
                try {
                    if (!cursor.next())
                        break;
                }
                catch (Exception e) {
                    // Tree corruption surfaces as a dedicated issue with the last good key.
                    if (X.hasCause(e, CorruptedTreeException.class))
                        throw new IgniteCheckedException("Key is present in SQL index, but is missing in corresponding " +
                            "data page. Previous successfully read key: " +
                            CacheObjectUtils.unwrapBinaryIfNeeded(ctx.cacheObjectContext(), previousKey, true, true),
                            X.cause(e, CorruptedTreeException.class)
                        );

                    throw e;
                }

                IndexRow idxRow = cursor.get();

                if (skipConditions) {
                    if (bothSkipConditions) {
                        // Sample every Kth row until the first K sampled rows are processed.
                        if (processedNumber > checkFirst)
                            break;
                        else if (current++ % checkThrough > 0)
                            continue;
                        else
                            processedNumber++;
                    }
                    else {
                        if (checkFirst > 0) {
                            if (current++ > checkFirst)
                                break;
                        }
                        else {
                            if (current++ % checkThrough > 0)
                                continue;
                        }
                    }
                }

                key = idxRow.cacheDataRow().key();

                if (idxRow.cacheDataRow().link() != 0L) {
                    // Dereference the index row back into the cache data tree.
                    CacheDataRow cacheDataStoreRow = ctx.group().offheap().read(ctx, key);

                    if (cacheDataStoreRow == null)
                        throw new IgniteCheckedException("Key is present in SQL index, but can't be found in CacheDataTree.");
                }
                else
                    throw new IgniteCheckedException("Invalid index row, possibly deleted " + idxRow);
            }
            catch (Throwable t) {
                Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
                    ctx.cacheObjectContext(), key, true, true);

                IndexValidationIssue is = new IndexValidationIssue(
                    String.valueOf(o), ctx.name(), idx.name(), t);

                log.error("Failed to lookup key: " + is.toString());

                enoughIssues |= idxValidationRes.reportIssue(is);
            }
            finally {
                previousKey = key;
            }
        }

        CacheGroupContext group = ctx.group();

        String uniqueIdxName = String.format(
            "[cacheGroup=%s, cacheGroupId=%s, cache=%s, cacheId=%s, idx=%s]",
            group.name(),
            group.groupId(),
            ctx.name(),
            ctx.cacheId(),
            idx.name()
        );

        idleChecker.apply(group.groupId());

        processedIndexes.incrementAndGet();

        printProgressOfIndexValidationIfNeeded();

        return Collections.singletonMap(uniqueIdxName, idxValidationRes);
    }
/**
* @param e Future result exception.
* @return Unwrapped exception.
*/
private IgniteException unwrapFutureException(Exception e) {
assert e instanceof InterruptedException || e instanceof ExecutionException : "Expecting either InterruptedException " +
"or ExecutionException";
if (e instanceof InterruptedException)
return new IgniteInterruptedException((InterruptedException)e);
else if (e.getCause() instanceof IgniteException)
return (IgniteException)e.getCause();
else
return new IgniteException(e.getCause());
}
/**
* Asynchronous calculation of caches size with divided by tables.
*
* @param grpCtx Cache group context.
* @param locPart Local partition.
* @return Future with cache sizes.
*/
private Future<CacheSize> calcCacheSizeAsync(
CacheGroupContext grpCtx,
GridDhtLocalPartition locPart
) {
return calcExecutor.submit(() -> {
return calcCacheSize(grpCtx, locPart);
});
}
    /**
     * Calculation of caches size with divided by tables.
     *
     * @param grpCtx Cache group context.
     * @param locPart Local partition.
     * @return Cache size representation object; carries the error instead of sizes when
     *      calculation failed, and {@code null} sizes when the group is already marked failed.
     */
    private CacheSize calcCacheSize(CacheGroupContext grpCtx, GridDhtLocalPartition locPart) {
        try {
            if (validateCtx.isCancelled())
                return new CacheSize(null, emptyMap());

            // Snapshot the update counter to detect concurrent updates afterwards.
            @Nullable PartitionUpdateCounter updCntr = locPart.dataStore().partUpdateCounter();

            PartitionUpdateCounter updateCntrBefore = updCntr == null ? updCntr : updCntr.copy();

            int grpId = grpCtx.groupId();

            // Skip groups that already failed on another partition.
            if (failCalcCacheSizeGrpIds.contains(grpId))
                return new CacheSize(null, null);

            boolean reserve = false;

            int partId = locPart.id();

            try {
                // Reserve the partition so it is not evicted while it is being iterated.
                if (!(reserve = locPart.reserve()))
                    throw new IgniteException("Can't reserve partition");

                if (locPart.state() != OWNING)
                    throw new IgniteException("Partition not in state " + OWNING);

                // cacheId -> (tableName -> row count).
                Map<Integer, Map<String, AtomicLong>> cacheSizeByTbl = new HashMap<>();

                GridIterator<CacheDataRow> partIter = grpCtx.offheap().partitionIterator(partId);

                GridQueryProcessor qryProcessor = ignite.context().query();

                while (partIter.hasNextX() && !failCalcCacheSizeGrpIds.contains(grpId)) {
                    CacheDataRow cacheDataRow = partIter.nextX();

                    // cacheId == 0 means the group contains a single cache.
                    int cacheId = cacheDataRow.cacheId();

                    GridCacheContext cacheCtx = cacheId == 0 ?
                        grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(cacheId);

                    if (cacheCtx == null)
                        throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);

                    if (cacheDataRow.link() == 0L)
                        throw new IgniteException("Contains invalid partition row, possibly deleted");

                    String cacheName = cacheCtx.name();

                    QueryTypeDescriptorImpl qryTypeDesc = qryProcessor.typeByValue(
                        cacheName,
                        cacheCtx.cacheObjectContext(),
                        cacheDataRow.key(),
                        cacheDataRow.value(),
                        true
                    );

                    if (isNull(qryTypeDesc))
                        continue; // Tolerate - (k, v) is just not indexed.

                    String tableName = qryTypeDesc.tableName();

                    cacheSizeByTbl.computeIfAbsent(cacheCtx.cacheId(), i -> new HashMap<>())
                        .computeIfAbsent(tableName, s -> new AtomicLong()).incrementAndGet();
                }

                // The update counter must not have changed: the grid is expected to be idle.
                PartitionUpdateCounter updateCntrAfter = locPart.dataStore().partUpdateCounter();

                if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) {
                    throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + grpCtx.cacheOrGroupName() +
                        ", grpId=" + grpCtx.groupId() + ", partId=" + locPart.id() + "] changed during size " +
                        "calculation [updCntrBefore=" + updateCntrBefore + ", updCntrAfter=" + updateCntrAfter + "]");
                }

                return new CacheSize(null, cacheSizeByTbl);
            }
            catch (Throwable t) {
                IgniteException cacheSizeErr = new IgniteException("Cache size calculation error [" +
                    cacheGrpInfo(grpCtx) + ", locParId=" + partId + ", err=" + t.getMessage() + "]", t);

                error(log, cacheSizeErr);

                // Mark the whole group failed so other partition tasks stop early.
                failCalcCacheSizeGrpIds.add(grpId);

                return new CacheSize(cacheSizeErr, null);
            }
            finally {
                if (reserve)
                    locPart.release();
            }
        }
        finally {
            processedCacheSizePartitions.incrementAndGet();

            printProgressOfIndexValidationIfNeeded();
        }
    }
/**
 * Submits asynchronous calculation of the index size for a cache
 * to the calculation executor.
 *
 * @param cacheCtx Cache context.
 * @param idx Index.
 * @param idleChecker Idle check closure.
 * @return Future with index size.
 */
private Future<T2<Throwable, Long>> calcIndexSizeAsync(
    GridCacheContext cacheCtx,
    InlineIndexImpl idx,
    IgniteInClosure<Integer> idleChecker
) {
    // Off-load the potentially long index scan onto the calculation pool.
    Callable<T2<Throwable, Long>> task = () -> calcIndexSize(cacheCtx, idx, idleChecker);

    return calcExecutor.submit(task);
}
/**
 * Calculation of the index size for cache.
 * <p>
 * Returns immediately with a zero size if validation has been cancelled or
 * the cache-size calculation for the group has already failed (in that case
 * comparing the index size would be meaningless).
 *
 * @param cacheCtx Cache context.
 * @param idx Index.
 * @param idleChecker Idle check closure, invoked with the group id after a
 *      successful count to verify the cluster stayed idle.
 * @return Tuple contains exception if it happened and size of index.
 */
private T2<Throwable, Long> calcIndexSize(
    GridCacheContext cacheCtx,
    InlineIndexImpl idx,
    IgniteInClosure<Integer> idleChecker
) {
    if (validateCtx.isCancelled())
        return new T2<>(null, 0L);

    try {
        // Skip groups whose cache-size calculation already failed.
        if (failCalcCacheSizeGrpIds.contains(cacheCtx.groupId()))
            return new T2<>(null, 0L);

        // NOTE(review): removed unused local `cacheName` (was assigned from
        // cacheCtx.name() but never referenced in this method).
        String tblName = idx.indexDefinition().idxName().tableName();
        String idxName = idx.indexDefinition().idxName().idxName();

        try {
            long indexSize = idx.totalCount();

            // Ensure no updates happened while counting; throws if not idle.
            idleChecker.apply(cacheCtx.groupId());

            return new T2<>(null, indexSize);
        }
        catch (Throwable t) {
            Throwable idxSizeErr = new IgniteException("Index size calculation error [" +
                cacheGrpInfo(cacheCtx.group()) + ", " + cacheInfo(cacheCtx) + ", tableName=" +
                tblName + ", idxName=" + idxName + ", err=" + t.getMessage() + "]", t);

            error(log, idxSizeErr);

            // Report the error in the tuple instead of rethrowing so the
            // caller can aggregate failures across all indexes.
            return new T2<>(idxSizeErr, 0L);
        }
    }
    finally {
        // Progress accounting runs regardless of success or failure.
        processedIdxSizes.incrementAndGet();
        printProgressOfIndexValidationIfNeeded();
    }
}
/**
 * Builds a human-readable description of a cache group.
 *
 * @param cacheGrpCtx Cache group context.
 * @return Cache group info string.
 */
private String cacheGrpInfo(CacheGroupContext cacheGrpCtx) {
    return new StringBuilder("cacheGrpName=")
        .append(cacheGrpCtx.name())
        .append(", cacheGrpId=")
        .append(cacheGrpCtx.groupId())
        .toString();
}
/**
 * Builds a human-readable description of a cache.
 *
 * @param cacheCtx Cache context.
 * @return Cache info string.
 */
private String cacheInfo(GridCacheContext cacheCtx) {
    return new StringBuilder("cacheName=")
        .append(cacheCtx.name())
        .append(", cacheId=")
        .append(cacheCtx.cacheId())
        .toString();
}
/**
 * Checking size of records in cache and indexes with a record into
 * {@code checkSizeRes} if they are not equal.
 *
 * @param cacheSizesFutures Futures calculating size of records in caches.
 * @param idxSizeFutures Futures calculating size of indexes of caches.
 * @param checkSizeRes Result of size check.
 */
private void checkSizes(
    List<T3<CacheGroupContext, GridDhtLocalPartition, Future<CacheSize>>> cacheSizesFutures,
    List<T3<GridCacheContext, InlineIndexImpl, Future<T2<Throwable, Long>>>> idxSizeFutures,
    Map<String, ValidateIndexesCheckSizeResult> checkSizeRes
) throws ExecutionException, InterruptedException {
    if (!checkSizes)
        return;

    // Cache sizes aggregated per group over all its partitions.
    Map<Integer, CacheSize> totalSizePerGrp = new HashMap<>();

    // Pass 1: collect per-partition results, recording failures and
    // merging successful partition sizes into the per-group totals.
    for (T3<CacheGroupContext, GridDhtLocalPartition, Future<CacheSize>> partFut : cacheSizesFutures) {
        CacheGroupContext grpCtx = partFut.get1();
        int grpId = grpCtx.groupId();

        CacheSize partSize = partFut.get3().get();
        Throwable partErr = partSize.err;

        if (failCalcCacheSizeGrpIds.contains(grpId) && nonNull(partErr)) {
            checkSizeRes.computeIfAbsent(
                cacheGrpInfo(grpCtx),
                s -> new ValidateIndexesCheckSizeResult(0, new ArrayList<>())
            ).issues().add(new ValidateIndexesCheckSizeIssue(null, 0, partErr));
        }
        else {
            totalSizePerGrp.computeIfAbsent(grpId, i -> new CacheSize(null, new HashMap<>()))
                .merge(partSize.cacheSizePerTbl);
        }
    }

    // Pass 2: compare every index size against the table's cache size.
    for (T3<GridCacheContext, InlineIndexImpl, Future<T2<Throwable, Long>>> idxFut : idxSizeFutures) {
        GridCacheContext cacheCtx = idxFut.get1();
        int grpId = cacheCtx.groupId();

        // Groups whose cache-size calculation failed cannot be compared.
        if (failCalcCacheSizeGrpIds.contains(grpId))
            continue;

        InlineIndexImpl idx = idxFut.get2();
        String tblName = idx.indexDefinition().idxName().tableName();

        AtomicLong tblSizeCntr = totalSizePerGrp.get(grpId).cacheSizePerTbl
            .getOrDefault(cacheCtx.cacheId(), emptyMap()).get(tblName);

        long cacheSizeByTbl = isNull(tblSizeCntr) ? 0L : tblSizeCntr.get();

        T2<Throwable, Long> idxSizeRes = idxFut.get3().get();

        Throwable err = idxSizeRes.get1();
        long idxSize = idxSizeRes.get2();

        // A size mismatch is reported as an error even if the index
        // calculation itself succeeded.
        if (isNull(err) && idxSize != cacheSizeByTbl)
            err = new IgniteException("Cache and index size not same.");

        if (nonNull(err)) {
            checkSizeRes.computeIfAbsent(
                "[" + cacheGrpInfo(cacheCtx.group()) + ", " + cacheInfo(cacheCtx) + ", tableName=" + tblName + "]",
                s -> new ValidateIndexesCheckSizeResult(cacheSizeByTbl, new ArrayList<>()))
                .issues().add(new ValidateIndexesCheckSizeIssue(idx.name(), idxSize, err));
        }
    }
}
/**
 * Container class for calculating the size of cache, divided by tables.
 */
private static class CacheSize {
    /** Error calculating size of the cache. */
    final Throwable err;

    /** Table split cache size, {@code Map<CacheId, Map<TableName, Size>>}. */
    final Map<Integer, Map<String, AtomicLong>> cacheSizePerTbl;

    /**
     * Constructor.
     *
     * @param err Error calculating size of the cache.
     * @param cacheSizePerTbl Table split cache size.
     */
    public CacheSize(
        @Nullable Throwable err,
        @Nullable Map<Integer, Map<String, AtomicLong>> cacheSizePerTbl
    ) {
        this.err = err;
        this.cacheSizePerTbl = cacheSizePerTbl;
    }

    /**
     * Merging cache sizes separated by tables.
     *
     * @param other Other table split cache size.
     */
    void merge(Map<Integer, Map<String, AtomicLong>> other) {
        assert nonNull(cacheSizePerTbl);

        other.forEach((cacheId, sizePerTbl) -> {
            Map<String, AtomicLong> target = cacheSizePerTbl.computeIfAbsent(cacheId, i -> new HashMap<>());

            sizePerTbl.forEach((tbl, size) ->
                target.computeIfAbsent(tbl, s -> new AtomicLong()).addAndGet(size.get()));
        });
    }
}
}