blob: 8f1d53f109a1d99c19979849f5b0f43b2a55c72f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.visor.verify;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.tree.CorruptedTreeException;
import org.apache.ignite.internal.processors.cache.verify.GridNotIdleException;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility;
import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndexBase;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.h2.engine.Session;
import org.h2.index.Cursor;
import org.h2.index.Index;
import org.h2.message.DbException;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
/**
* Closure that locally validates indexes of given caches.
* Validation consists of three checks:
* 1. If entry is present in cache data tree, it's reachable from all cache SQL indexes
* 2. If entry is present in cache SQL index, it can be dereferenced with link from index
* 3. If entry is present in cache SQL index, it's present in cache data tree
*/
public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndexesJobResult> {
/** */
private static final long serialVersionUID = 0L;
/** Ignite. */
@IgniteInstanceResource
private transient IgniteEx ignite;
/** Injected logger. */
@LoggerResource
private IgniteLogger log;
/** Cache names. */
private Set<String> cacheNames;
/** If provided only first K elements will be validated. */
private final int checkFirst;
/** If provided only each Kth element will be validated. */
private final int checkThrough;
/** Counter of processed partitions. */
private final AtomicInteger processedPartitions = new AtomicInteger(0);
/** Total partitions. */
private volatile int totalPartitions;
/** Counter of processed indexes. */
private final AtomicInteger processedIndexes = new AtomicInteger(0);
/** Counter of integrity checked indexes. */
private final AtomicInteger integrityCheckedIndexes = new AtomicInteger(0);
/** Total indexes. */
private volatile int totalIndexes;
/** Total cache groups. */
private volatile int totalCacheGrps;
/** Last progress print timestamp. */
private final AtomicLong lastProgressPrintTs = new AtomicLong(0);
/** Calculation executor. */
private volatile ExecutorService calcExecutor;
/**
 * Creates a closure that validates SQL indexes on the node it is executed on.
 *
 * @param cacheNames Names of the caches to validate; when {@code null}, every cache group that is neither
 *      a system nor a local one is validated.
 * @param checkFirst If positive only first K elements will be validated.
 * @param checkThrough If positive only each Kth element will be validated.
 */
public ValidateIndexesClosure(Set<String> cacheNames, int checkFirst, int checkThrough) {
    this.checkThrough = checkThrough;
    this.checkFirst = checkFirst;
    this.cacheNames = cacheNames;
}
/** {@inheritDoc} */
@Override public VisorValidateIndexesJobResult call() throws Exception {
    // One worker thread per processor reported by the runtime.
    int workers = Runtime.getRuntime().availableProcessors();

    calcExecutor = Executors.newFixedThreadPool(workers);

    try {
        return call0();
    }
    finally {
        // Release the pool whether validation succeeded or failed.
        calcExecutor.shutdown();
    }
}
/**
 * Performs the validation itself: resolves the cache groups to check, runs the integrity check of their index
 * partitions, then validates every local partition and every SQL tree index of the groups that passed it.
 *
 * @return Job result with per-partition results, per-index results and integrity check issues. Partitions and
 *      indexes without issues are not included.
 * @throws IgniteException If some of the requested caches do not exist or one of the validation tasks failed.
 */
private VisorValidateIndexesJobResult call0() {
    Set<Integer> grpIds = new HashSet<>();

    if (cacheNames == null) {
        // No explicit cache list: take every group except system and local ones.
        for (CacheGroupContext grp : ignite.context().cache().cacheGroups()) {
            if (grp.systemCache() || grp.isLocal())
                continue;

            grpIds.add(grp.groupId());
        }
    }
    else {
        Set<String> unknownCaches = new HashSet<>();

        for (String cacheName : cacheNames) {
            DynamicCacheDescriptor desc = ignite.context().cache().cacheDescriptor(cacheName);

            if (desc != null)
                grpIds.add(desc.groupId());
            else
                unknownCaches.add(cacheName);
        }

        if (!unknownCaches.isEmpty())
            throw new IgniteException("The following caches do not exist: " + String.join(", ", unknownCaches));
    }

    totalCacheGrps = grpIds.size();

    Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = integrityCheckIndexesPartitions(grpIds);

    List<T2<CacheGroupContext, GridDhtLocalPartition>> partArgs = new ArrayList<>();
    List<T2<GridCacheContext, Index>> idxArgs = new ArrayList<>();

    for (Integer grpId : grpIds) {
        CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);

        // Groups that are gone or whose index partition failed the integrity check are not validated further.
        if (grpCtx == null || integrityCheckResults.containsKey(grpId))
            continue;

        for (GridDhtLocalPartition part : grpCtx.topology().localPartitions())
            partArgs.add(new T2<>(grpCtx, part));

        GridQueryProcessor qry = ignite.context().query();

        IgniteH2Indexing indexing = (IgniteH2Indexing)qry.getIndexing();

        for (GridCacheContext ctx : grpCtx.caches()) {
            if (cacheNames != null && !cacheNames.contains(ctx.name()))
                continue;

            Collection<GridQueryTypeDescriptor> types = qry.types(ctx.name());

            if (F.isEmpty(types))
                continue;

            for (GridQueryTypeDescriptor type : types) {
                GridH2Table tbl = indexing.schemaManager().dataTable(ctx.name(), type.tableName());

                if (tbl == null)
                    continue;

                for (Index idx : tbl.getIndexes()) {
                    if (idx instanceof H2TreeIndexBase)
                        idxArgs.add(new T2<>(ctx, idx));
                }
            }
        }
    }

    // To decrease contention on same indexes.
    Collections.shuffle(partArgs);
    Collections.shuffle(idxArgs);

    totalPartitions = partArgs.size();
    totalIndexes = idxArgs.size();

    List<Future<Map<PartitionKey, ValidateIndexesPartitionResult>>> procPartFutures = new ArrayList<>();
    List<Future<Map<String, ValidateIndexesPartitionResult>>> procIdxFutures = new ArrayList<>();

    for (T2<CacheGroupContext, GridDhtLocalPartition> arg : partArgs)
        procPartFutures.add(processPartitionAsync(arg.get1(), arg.get2()));

    for (T2<GridCacheContext, Index> arg : idxArgs)
        procIdxFutures.add(processIndexAsync(arg.get1(), arg.get2()));

    Map<PartitionKey, ValidateIndexesPartitionResult> partResults = new HashMap<>();
    Map<String, ValidateIndexesPartitionResult> idxResults = new HashMap<>();

    // Positions of the futures currently awaited; used to cancel the remaining ones on failure.
    int curPart = 0;
    int curIdx = 0;

    try {
        while (curPart < procPartFutures.size()) {
            Map<PartitionKey, ValidateIndexesPartitionResult> partRes = procPartFutures.get(curPart).get();

            // Keep only results that actually carry issues.
            if (partRes.values().stream().anyMatch(res -> !res.issues().isEmpty()))
                partResults.putAll(partRes);

            curPart++;
        }

        while (curIdx < procIdxFutures.size()) {
            Map<String, ValidateIndexesPartitionResult> idxRes = procIdxFutures.get(curIdx).get();

            if (idxRes.values().stream().anyMatch(res -> !res.issues().isEmpty()))
                idxResults.putAll(idxRes);

            curIdx++;
        }

        log.warning("ValidateIndexesClosure finished: processed " + totalPartitions +
            " partitions and " + totalIndexes + " indexes.");
    }
    catch (InterruptedException | ExecutionException e) {
        for (int i = curPart; i < procPartFutures.size(); i++)
            procPartFutures.get(i).cancel(false);

        for (int i = curIdx; i < procIdxFutures.size(); i++)
            procIdxFutures.get(i).cancel(false);

        throw unwrapFutureException(e);
    }

    return new VisorValidateIndexesJobResult(partResults, idxResults, integrityCheckResults.values());
}
/**
 * Runs the page CRC integrity check of the index partition for every given cache group that exists and has
 * persistence enabled. While the check is running a checkpoint listener is registered that raises a flag if a
 * checkpoint containing pages begins; the flag is passed down to the per-group check.
 *
 * @param grpIds Group ids.
 * @return Integrity check issues keyed by cache group id; groups without issues are absent.
 * @throws GridNotIdleException If a checkpoint is in progress when the check starts.
 * @throws IgniteException If the check was interrupted or one of the check tasks failed.
 */
private Map<Integer, IndexIntegrityCheckIssue> integrityCheckIndexesPartitions(Set<Integer> grpIds) {
    List<Future<T2<Integer, IndexIntegrityCheckIssue>>> integrityCheckFutures = new ArrayList<>(grpIds.size());

    Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = new HashMap<>();

    // Position of the future currently awaited; on failure everything from here on is cancelled.
    int curFut = 0;

    IgniteCacheDatabaseSharedManager db = ignite.context().cache().context().database();

    DbCheckpointListener lsnr = null;

    try {
        AtomicBoolean cpFlag = new AtomicBoolean();

        if (db instanceof GridCacheDatabaseSharedManager) {
            lsnr = new DbCheckpointListener() {
                @Override public void onMarkCheckpointBegin(Context ctx) {
                    /* No-op. */
                }

                @Override public void onCheckpointBegin(Context ctx) {
                    if (ctx.hasPages())
                        cpFlag.set(true);
                }

                @Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException {
                    /* No-op. */
                }
            };

            ((GridCacheDatabaseSharedManager)db).addCheckpointListener(lsnr);

            if (IdleVerifyUtility.isCheckpointNow(db))
                throw new GridNotIdleException(IdleVerifyUtility.CLUSTER_NOT_IDLE_MSG);
        }

        for (Integer grpId : grpIds) {
            final CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);

            // Nothing to check on disk for such groups, but they still count towards the progress.
            if (grpCtx == null || !grpCtx.persistenceEnabled()) {
                integrityCheckedIndexes.incrementAndGet();

                continue;
            }

            Callable<T2<Integer, IndexIntegrityCheckIssue>> task = () -> {
                IndexIntegrityCheckIssue issue = integrityCheckIndexPartition(grpCtx, cpFlag);

                return new T2<>(grpCtx.groupId(), issue);
            };

            integrityCheckFutures.add(calcExecutor.submit(task));
        }

        for (; curFut < integrityCheckFutures.size(); curFut++) {
            T2<Integer, IndexIntegrityCheckIssue> res = integrityCheckFutures.get(curFut).get();

            if (res.getValue() != null)
                integrityCheckResults.put(res.getKey(), res.getValue());
        }
    }
    catch (InterruptedException | ExecutionException e) {
        for (int j = curFut; j < integrityCheckFutures.size(); j++)
            integrityCheckFutures.get(j).cancel(false);

        throw unwrapFutureException(e);
    }
    finally {
        if (db instanceof GridCacheDatabaseSharedManager && lsnr != null)
            ((GridCacheDatabaseSharedManager)db).removeCheckpointListener(lsnr);
    }

    return integrityCheckResults;
}
/**
 * Verifies page CRC sums of the index partition of the given cache group.
 *
 * @param gctx Cache group context.
 * @param cpFlag Checkpoint status flag.
 * @return {@code null} if the check passed, otherwise an issue describing the failure.
 * @throws GridNotIdleException If the check failed while the checkpoint flag was raised.
 */
private IndexIntegrityCheckIssue integrityCheckIndexPartition(CacheGroupContext gctx, AtomicBoolean cpFlag) {
    GridKernalContext kctx = ignite.context();

    GridCacheSharedContext sharedCtx = kctx.cache().context();

    try {
        FilePageStoreManager storeMgr = (FilePageStoreManager)sharedCtx.pageStore();

        IdleVerifyUtility.checkPartitionsPageCrcSum(storeMgr, gctx, INDEX_PARTITION, FLAG_IDX, cpFlag);

        return null;
    }
    catch (Throwable err) {
        // A failure with the checkpoint flag raised is reported as a non-idle cluster, not as an index issue.
        if (cpFlag.get())
            throw new GridNotIdleException("Checkpoint with dirty pages started! Cluster not idle!", err);

        String grpName = gctx.cacheOrGroupName();

        log.error("Integrity check of index partition of cache group " + grpName + " failed", err);

        return new IndexIntegrityCheckIssue(grpName, err);
    }
    finally {
        integrityCheckedIndexes.incrementAndGet();

        String progress = "Current progress of ValidateIndexesClosure: checked integrity of " +
            integrityCheckedIndexes.get() + " index partitions of " + totalCacheGrps + " cache groups";

        printProgressIfNeeded(progress);
    }
}
/**
 * Schedules validation of a single local partition on the calculation executor.
 *
 * @param grpCtx Group context.
 * @param part Local partition.
 * @return Future of the partition validation result.
 */
private Future<Map<PartitionKey, ValidateIndexesPartitionResult>> processPartitionAsync(
    final CacheGroupContext grpCtx,
    final GridDhtLocalPartition part
) {
    Callable<Map<PartitionKey, ValidateIndexesPartitionResult>> task = () -> processPartition(grpCtx, part);

    return calcExecutor.submit(task);
}
/**
 * Iterates over the rows of the given partition and checks that each indexed row can be found in every SQL tree
 * index of its table. Rows are sampled according to {@code checkFirst} and {@code checkThrough}: at most
 * {@code checkFirst} rows are validated and/or only every {@code checkThrough}-th row is validated.
 * <p>
 * Every partition handed to this method is counted as processed for progress reporting, including partitions
 * that are skipped (not reserved, not in {@code OWNING} state) or whose iteration failed.
 *
 * @param grpCtx Group context.
 * @param part Local partition.
 * @return Singleton map with the validation result of the partition, or an empty map if the partition could not
 *      be reserved, is not in {@code OWNING} state or its iteration failed with {@link IgniteCheckedException}.
 * @throws IgniteException If {@code typeByValue} cannot be looked up or invoked, or a row refers to an unknown
 *      cache.
 */
private Map<PartitionKey, ValidateIndexesPartitionResult> processPartition(
    CacheGroupContext grpCtx,
    GridDhtLocalPartition part
) {
    if (!part.reserve()) {
        // Skipped partitions still count, otherwise the reported progress can never reach the total.
        processedPartitions.incrementAndGet();

        return Collections.emptyMap();
    }

    ValidateIndexesPartitionResult partRes;

    try {
        if (part.state() != GridDhtPartitionState.OWNING)
            return Collections.emptyMap();

        long updateCntrBefore = part.updateCounter();

        long partSize = part.dataStore().fullSize();

        GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id());

        Object consId = ignite.context().discovery().localNode().consistentId();

        boolean isPrimary = part.primary(grpCtx.topology().readyTopologyVersion());

        partRes = new ValidateIndexesPartitionResult(updateCntrBefore, partSize, isPrimary, consId, null);

        // Set from the return value of reportIssue(); presumably signals that enough issues were collected.
        boolean enoughIssues = false;

        GridQueryProcessor qryProcessor = ignite.context().query();

        // typeByValue is looked up reflectively and made accessible below.
        Method m;

        try {
            m = GridQueryProcessor.class.getDeclaredMethod("typeByValue", String.class,
                CacheObjectContext.class, KeyCacheObject.class, CacheObject.class, boolean.class);
        }
        catch (NoSuchMethodException e) {
            log.error("Failed to invoke typeByValue", e);

            throw new IgniteException(e);
        }

        m.setAccessible(true);

        final boolean skipConditions = checkFirst > 0 || checkThrough > 0;
        final boolean bothSkipConditions = checkFirst > 0 && checkThrough > 0;

        // Number of rows seen so far (used for every-Kth sampling and for the first-K limit when used alone).
        long current = 0;

        // Number of rows actually validated when both sampling conditions are active.
        long processedNumber = 0;

        while (it.hasNextX()) {
            if (enoughIssues)
                break;

            CacheDataRow row = it.nextX();

            if (skipConditions) {
                if (bothSkipConditions) {
                    // Stop once exactly checkFirst rows have been validated.
                    if (processedNumber >= checkFirst)
                        break;
                    else if (current++ % checkThrough > 0)
                        continue;
                    else
                        processedNumber++;
                }
                else {
                    if (checkFirst > 0) {
                        // Stop once exactly checkFirst rows have been validated.
                        if (current++ >= checkFirst)
                            break;
                    }
                    else {
                        if (current++ % checkThrough > 0)
                            continue;
                    }
                }
            }

            // Cache id 0 in a row is treated as "single cache of the group".
            int cacheId = row.cacheId() == 0 ? grpCtx.groupId() : row.cacheId();

            GridCacheContext cacheCtx = row.cacheId() == 0 ?
                grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(row.cacheId());

            if (cacheCtx == null)
                throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);

            if (row.link() == 0L) {
                String errMsg = "Invalid partition row, possibly deleted";

                log.error(errMsg);

                IndexValidationIssue is = new IndexValidationIssue(null, cacheCtx.name(), null,
                    new IgniteCheckedException(errMsg));

                enoughIssues |= partRes.reportIssue(is);

                continue;
            }

            try {
                QueryTypeDescriptorImpl res = (QueryTypeDescriptorImpl)m.invoke(
                    qryProcessor, cacheCtx.name(), cacheCtx.cacheObjectContext(), row.key(), row.value(), true);

                if (res == null)
                    continue; // Tolerate - (k, v) is just not indexed.

                IgniteH2Indexing indexing = (IgniteH2Indexing)qryProcessor.getIndexing();

                GridH2Table gridH2Tbl = indexing.schemaManager().dataTable(cacheCtx.name(), res.tableName());

                if (gridH2Tbl == null)
                    continue; // Tolerate - (k, v) is just not indexed.

                GridH2RowDescriptor gridH2RowDesc = gridH2Tbl.rowDescriptor();

                H2CacheRow h2Row = gridH2RowDesc.createRow(row);

                ArrayList<Index> indexes = gridH2Tbl.getIndexes();

                for (Index idx : indexes) {
                    if (!(idx instanceof H2TreeIndexBase))
                        continue;

                    try {
                        Cursor cursor = idx.find((Session)null, h2Row, h2Row);

                        if (cursor == null || !cursor.next())
                            throw new IgniteCheckedException("Key is present in CacheDataTree, but can't be found in SQL index.");
                    }
                    catch (Throwable t) {
                        Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
                            grpCtx.cacheObjectContext(), row.key(), true, true);

                        // String.valueOf() keeps the report intact even if the unwrapped key is null.
                        IndexValidationIssue is = new IndexValidationIssue(
                            String.valueOf(o), cacheCtx.name(), idx.getName(), t);

                        log.error("Failed to lookup key: " + is.toString(), t);

                        enoughIssues |= partRes.reportIssue(is);
                    }
                }
            }
            catch (IllegalAccessException e) {
                log.error("Failed to invoke typeByValue", e);

                throw new IgniteException(e);
            }
            catch (InvocationTargetException e) {
                Throwable target = e.getTargetException();

                log.error("Failed to invoke typeByValue", target);

                throw new IgniteException(target);
            }
        }
    }
    catch (IgniteCheckedException e) {
        U.error(log, "Failed to process partition [grpId=" + grpCtx.groupId() +
            ", partId=" + part.id() + "]", e);

        return Collections.emptyMap();
    }
    finally {
        part.release();

        // Count before printing so that the reported progress includes this partition.
        processedPartitions.incrementAndGet();

        printProgressOfIndexValidationIfNeeded();
    }

    PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName());

    return Collections.singletonMap(partKey, partRes);
}
/**
 * Reports how many partitions and SQL indexes have been processed so far, subject to the throttling done by
 * {@link #printProgressIfNeeded(String)}.
 */
private void printProgressOfIndexValidationIfNeeded() {
    String partsProgress = processedPartitions.get() + " of " + totalPartitions + " partitions, ";

    String idxsProgress = processedIndexes.get() + " of " + totalIndexes + " SQL indexes";

    printProgressIfNeeded("Current progress of ValidateIndexesClosure: processed " + partsProgress + idxsProgress);
}
/**
 * Logs the given message at warning level unless a progress message was already printed within the last
 * 60 seconds.
 *
 * @param msg Progress message.
 */
private void printProgressIfNeeded(String msg) {
    long now = U.currentTimeMillis();

    long prev = lastProgressPrintTs.get();

    if (now - prev < 60_000)
        return;

    // Only the thread that wins the CAS prints, so concurrent callers do not duplicate the message.
    if (lastProgressPrintTs.compareAndSet(prev, now))
        log.warning(msg);
}
/**
 * Schedules validation of a single SQL index on the calculation executor.
 *
 * @param ctx Context.
 * @param idx Index.
 * @return Future of the index validation result.
 */
private Future<Map<String, ValidateIndexesPartitionResult>> processIndexAsync(GridCacheContext ctx, Index idx) {
    Callable<Map<String, ValidateIndexesPartitionResult>> task = () -> processIndex(ctx, idx);

    return calcExecutor.submit(task);
}
/**
 * Iterates over the given SQL index and checks that every index row has a non-zero link and that its key can be
 * read from the cache data store. Rows are sampled according to {@code checkFirst} and {@code checkThrough}:
 * at most {@code checkFirst} rows are validated and/or only every {@code checkThrough}-th row is validated.
 *
 * @param ctx Context.
 * @param idx Index.
 * @return Singleton map from a unique index description (group, cache and index names with ids) to the
 *      validation result of the index.
 */
private Map<String, ValidateIndexesPartitionResult> processIndex(GridCacheContext ctx, Index idx) {
    Object consId = ignite.context().discovery().localNode().consistentId();

    ValidateIndexesPartitionResult idxValidationRes = new ValidateIndexesPartitionResult(
        -1, -1, true, consId, idx.getName());

    // Set from the return value of reportIssue(); presumably signals that enough issues were collected.
    boolean enoughIssues = false;

    Cursor cursor = null;

    try {
        cursor = idx.find((Session)null, null, null);

        if (cursor == null)
            throw new IgniteCheckedException("Can't iterate through index: " + idx);
    }
    catch (Throwable t) {
        IndexValidationIssue is = new IndexValidationIssue(null, ctx.name(), idx.getName(), t);

        log.error("Find in index failed: " + is.toString(), t);

        idxValidationRes.reportIssue(is);

        // Without a cursor there is nothing to iterate over.
        enoughIssues = true;
    }

    final boolean skipConditions = checkFirst > 0 || checkThrough > 0;
    final boolean bothSkipConditions = checkFirst > 0 && checkThrough > 0;

    // Number of rows seen so far (used for every-Kth sampling and for the first-K limit when used alone).
    long current = 0;

    // Number of rows actually validated when both sampling conditions are active.
    long processedNumber = 0;

    // Last key that was read from the index without an error; used for diagnostics only.
    KeyCacheObject previousKey = null;

    while (!enoughIssues) {
        KeyCacheObject h2key = null;

        try {
            try {
                if (!cursor.next())
                    break;
            }
            catch (DbException e) {
                if (X.hasCause(e, CorruptedTreeException.class))
                    throw new IgniteCheckedException("Key is present in SQL index, but is missing in corresponding " +
                        "data page. Previous successfully read key: " +
                        CacheObjectUtils.unwrapBinaryIfNeeded(ctx.cacheObjectContext(), previousKey, true, true),
                        X.cause(e, CorruptedTreeException.class)
                    );

                throw e;
            }

            H2CacheRow h2Row = (H2CacheRow)cursor.get();

            // Read the key before sampling so that rows skipped by sampling are still remembered as read.
            h2key = (KeyCacheObject)h2Row.key();

            if (skipConditions) {
                if (bothSkipConditions) {
                    // Stop once exactly checkFirst rows have been validated.
                    if (processedNumber >= checkFirst)
                        break;
                    else if (current++ % checkThrough > 0)
                        continue;
                    else
                        processedNumber++;
                }
                else {
                    if (checkFirst > 0) {
                        // Stop once exactly checkFirst rows have been validated.
                        if (current++ >= checkFirst)
                            break;
                    }
                    else {
                        if (current++ % checkThrough > 0)
                            continue;
                    }
                }
            }

            if (h2Row.link() != 0L) {
                CacheDataRow cacheDataStoreRow = ctx.group().offheap().read(ctx, h2key);

                if (cacheDataStoreRow == null)
                    throw new IgniteCheckedException("Key is present in SQL index, but can't be found in CacheDataTree.");
            }
            else
                throw new IgniteCheckedException("Invalid index row, possibly deleted " + h2Row);
        }
        catch (Throwable t) {
            Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
                ctx.cacheObjectContext(), h2key, true, true);

            IndexValidationIssue is = new IndexValidationIssue(
                String.valueOf(o), ctx.name(), idx.getName(), t);

            log.error("Failed to lookup key: " + is.toString(), t);

            enoughIssues |= idxValidationRes.reportIssue(is);
        }
        finally {
            // Keep the last key that was actually read; a failed read must not wipe it out.
            if (h2key != null)
                previousKey = h2key;
        }
    }

    CacheGroupContext group = ctx.group();

    String uniqueIdxName = String.format(
        "[cacheGroup=%s, cacheGroupId=%s, cache=%s, cacheId=%s, idx=%s]",
        group.name(),
        group.groupId(),
        ctx.name(),
        ctx.cacheId(),
        idx.getName()
    );

    processedIndexes.incrementAndGet();

    printProgressOfIndexValidationIfNeeded();

    return Collections.singletonMap(uniqueIdxName, idxValidationRes);
}
/**
 * Converts an exception thrown by {@code Future.get()} into an {@link IgniteException}: an interruption becomes
 * {@link IgniteInterruptedException}, an {@link IgniteException} cause is returned as is and any other cause
 * is wrapped.
 *
 * @param e Future result exception.
 * @return Unwrapped exception.
 */
private IgniteException unwrapFutureException(Exception e) {
    assert e instanceof InterruptedException || e instanceof ExecutionException : "Expecting either InterruptedException " +
        "or ExecutionException";

    if (e instanceof InterruptedException)
        return new IgniteInterruptedException((InterruptedException)e);

    Throwable cause = e.getCause();

    return cause instanceof IgniteException ? (IgniteException)cause : new IgniteException(cause);
}
}