/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.verify;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptyMap;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.GRID_NOT_IDLE_MSG;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
/**
* Task for comparing update counters and checksums between primary and backup partitions of specified caches.
* <br>
* Argument: Set of cache names; {@code null} triggers verification for all caches.
* <br>
* Result: {@link IdleVerifyResultV2} with conflict partitions.
* <br>
* Works properly only on an idle cluster: false positive conflict reports are possible if data in the cluster is
* being concurrently updated.
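* <br>
* A minimal usage sketch (it assumes execution from internal server-side code, a {@code VisorIdleVerifyTaskArg}
* constructor that accepts the set of cache names to verify, and the {@code IdleVerifyResultV2.hasConflicts()}
* check; adjust to the APIs available in your version):
* <pre>{@code
* IgniteEx ignite = ...;
*
* IdleVerifyResultV2 res = ignite.compute().execute(
*     new VerifyBackupPartitionsTaskV2(),
*     new VisorIdleVerifyTaskArg(Collections.singleton("myCache")));
*
* if (res.hasConflicts())
*     System.err.println("Conflicting partitions were found, the cluster may be inconsistent.");
* }</pre>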
*/
@GridInternal
public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVerifyTaskArg, IdleVerifyResultV2> {
/** First version of Ignite that is capable of executing Idle Verify V2. */
public static final IgniteProductVersion V2_SINCE_VER = IgniteProductVersion.fromString("2.5.3");
/** Injected logger. */
@LoggerResource
private IgniteLogger log;
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(
List<ClusterNode> subgrid,
VisorIdleVerifyTaskArg arg
) throws IgniteException {
Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
for (ClusterNode node : subgrid)
jobs.put(new VerifyBackupPartitionsJobV2(arg), node);
return jobs;
}
/** {@inheritDoc} */
@Nullable @Override public IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
return reduce0(results);
}
/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(
ComputeJobResult res,
List<ComputeJobResult> rcvd
) throws IgniteException {
try {
ComputeJobResultPolicy superRes = super.result(res, rcvd);
// Deny failover.
if (superRes == ComputeJobResultPolicy.FAILOVER) {
superRes = ComputeJobResultPolicy.WAIT;
if (log != null) {
log.warning("VerifyBackupPartitionsJobV2 failed on node " +
"[consistentId=" + res.getNode().consistentId() + "]", res.getException());
}
}
return superRes;
}
catch (IgniteException e) {
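// If result handling itself failed, do not fail the whole task: keep waiting for the remaining
// job results so that per-node exceptions can be aggregated in reduce.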
return ComputeJobResultPolicy.WAIT;
}
}
/**
* @param results Received results of broadcast remote requests.
* @return Idle verify job result constructed from results of remote executions.
*/
public static IdleVerifyResultV2 reduce0(List<ComputeJobResult> results) {
Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes = new HashMap<>();
Map<ClusterNode, Exception> ex = new HashMap<>();
for (ComputeJobResult res : results) {
if (res.getException() != null) {
ex.put(res.getNode(), res.getException());
continue;
}
Map<PartitionKeyV2, PartitionHashRecordV2> nodeHashes = res.getData();
for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> e : nodeHashes.entrySet()) {
List<PartitionHashRecordV2> records = clusterHashes.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
records.add(e.getValue());
}
}
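// If at least one node returned results, report the collected hashes along with per-node exceptions;
// otherwise the result contains only the exceptions.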
if (results.size() != ex.size())
return new IdleVerifyResultV2(clusterHashes, ex);
else
return new IdleVerifyResultV2(ex);
}
/**
* Job that collects update counters and hashes of local partitions.
*/
private static class VerifyBackupPartitionsJobV2 extends ComputeJobAdapter {
/** */
private static final long serialVersionUID = 0L;
/** Ignite instance. */
@IgniteInstanceResource
private IgniteEx ignite;
/** Injected logger. */
@LoggerResource
private IgniteLogger log;
/** Idle verify arguments. */
private VisorIdleVerifyTaskArg arg;
/** Counter of processed partitions. */
private final AtomicInteger completionCntr = new AtomicInteger(0);
/**
* @param arg Argument.
*/
public VerifyBackupPartitionsJobV2(VisorIdleVerifyTaskArg arg) {
this.arg = arg;
}
/** {@inheritDoc} */
@Override public Map<PartitionKeyV2, PartitionHashRecordV2> execute() throws IgniteException {
try {
ignite.context().cache().context().database().waitForCheckpoint("VerifyBackupPartitions");
}
catch (IgniteCheckedException e) {
throw new IgniteException(
"Failed to wait for checkpoint before executing verify backup partitions task", e);
}
Set<Integer> grpIds = getGroupIds();
completionCntr.set(0);
List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>> partHashCalcFuts =
calcPartitionHashAsync(grpIds);
Map<PartitionKeyV2, PartitionHashRecordV2> res = new HashMap<>();
List<IgniteException> exceptions = new ArrayList<>();
long lastProgressLogTs = U.currentTimeMillis();
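// Poll the futures with a short timeout so that progress can be logged periodically
// while partition hashes are being calculated.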
for (int i = 0; i < partHashCalcFuts.size(); ) {
Future<Map<PartitionKeyV2, PartitionHashRecordV2>> fut = partHashCalcFuts.get(i);
try {
Map<PartitionKeyV2, PartitionHashRecordV2> partHash = fut.get(100, TimeUnit.MILLISECONDS);
res.putAll(partHash);
i++;
}
catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof IgniteException && !(e.getCause() instanceof GridNotIdleException)) {
exceptions.add((IgniteException)e.getCause());
i++;
continue;
}
for (int j = i + 1; j < partHashCalcFuts.size(); j++)
partHashCalcFuts.get(j).cancel(false);
if (e instanceof InterruptedException)
throw new IgniteInterruptedException((InterruptedException)e);
else
throw new IgniteException(e.getCause());
}
catch (TimeoutException ignored) {
if (U.currentTimeMillis() - lastProgressLogTs > 3 * 60 * 1000L) {
lastProgressLogTs = U.currentTimeMillis();
log.warning("idle_verify is still running, processed " + completionCntr.get() + " of " +
partHashCalcFuts.size() + " local partitions");
}
}
}
if (!F.isEmpty(exceptions))
throw new IdleVerifyException(exceptions);
return res;
}
/**
* Class that applies the cache filtering chain.
*/
private class CachesFiltering {
/**
* Initially all cache groups are included.
*/
private final Set<CacheGroupContext> filteredCacheGroups;
/** */
public CachesFiltering(Collection<CacheGroupContext> cacheGroups) {
filteredCacheGroups = new HashSet<>(cacheGroups);
}
/**
* Applies a filtering closure to the current set of cache groups.
*
* @param closure Filtering closure that removes non-matching groups from the set.
* @return This instance for chaining.
*/
public CachesFiltering filter(IgniteInClosure<Set<CacheGroupContext>> closure) {
closure.apply(filteredCacheGroups);
return this;
}
/**
* Returns the resulting set of cache group ids.
*
* @return Set of filtered cache group ids.
*/
public Set<Integer> result() {
Set<Integer> res = new HashSet<>();
for (CacheGroupContext cacheGrp : filteredCacheGroups)
res.add(cacheGrp.groupId());
return res;
}
}
/** */
private List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>> calcPartitionHashAsync(
Set<Integer> grpIds
) {
List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>> partHashCalcFutures = new ArrayList<>();
for (Integer grpId : grpIds) {
CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);
if (grpCtx == null)
continue;
List<GridDhtLocalPartition> parts = grpCtx.topology().localPartitions();
for (GridDhtLocalPartition part : parts)
partHashCalcFutures.add(calculatePartitionHashAsync(grpCtx, part));
}
return partHashCalcFutures;
}
/** */
private Set<Integer> getGroupIds() {
Collection<CacheGroupContext> cacheGroups = ignite.context().cache().cacheGroups();
Set<Integer> grpIds = new CachesFiltering(cacheGroups)
.filter(this::filterByCacheNames)
.filter(this::filterByCacheFilter)
.filter(this::filterByExcludeCaches)
.result();
if (F.isEmpty(grpIds))
throw new NoMatchingCachesException();
return grpIds;
}
/**
* Filters cache groups by exclude regexps.
*
* @param cachesToFilter cache groups to filter
*/
private void filterByExcludeCaches(Set<CacheGroupContext> cachesToFilter) {
if (!F.isEmpty(arg.excludeCaches())) {
Set<Pattern> excludedNamesPatterns = new HashSet<>();
for (String excluded : arg.excludeCaches())
excludedNamesPatterns.add(Pattern.compile(excluded));
cachesToFilter.removeIf(grp -> doesGrpMatchOneOfPatterns(grp, excludedNamesPatterns));
}
}
/**
* Filters cache groups by the cache filter option. System caches (unless explicitly requested via the filter) and
* local caches are removed as well.
*
* @param cachesToFilter cache groups to filter
*/
private void filterByCacheFilter(Set<CacheGroupContext> cachesToFilter) {
cachesToFilter.removeIf(grp -> !doesGrpMatchFilter(grp));
}
/**
* Filters cache groups by a whitelist of cache name regexps. If no whitelist is specified, all cache groups are kept.
*
* @param cachesToFilter cache groups to filter
*/
private void filterByCacheNames(Set<CacheGroupContext> cachesToFilter) {
if (!F.isEmpty(arg.caches())) {
Set<Pattern> cacheNamesPatterns = new HashSet<>();
for (String cacheNameRegexp : arg.caches())
cacheNamesPatterns.add(Pattern.compile(cacheNameRegexp));
cachesToFilter.removeIf(grp -> !doesGrpMatchOneOfPatterns(grp, cacheNamesPatterns));
}
}
/**
* Checks whether the given cache group matches the configured cache filter.
*
* @param grp Cache group.
* @return {@code True} if at least one cache in the group matches the filter.
*/
private boolean doesGrpMatchFilter(CacheGroupContext grp) {
for (GridCacheContext cacheCtx : grp.caches()) {
DynamicCacheDescriptor desc = ignite.context().cache().cacheDescriptor(cacheCtx.name());
if (isCacheMatchFilter(desc))
return true;
}
return false;
}
/**
* Checks whether the name of the given cache group, or the name of any of its caches, matches at least one
* regexp from the given set.
*
* @param grp Cache group.
* @param patterns Compiled regexp patterns.
* @return {@code True} if the group or any of its caches matches at least one of the patterns.
*/
private boolean doesGrpMatchOneOfPatterns(CacheGroupContext grp, Set<Pattern> patterns) {
for (Pattern pattern : patterns) {
if (grp.name() != null && pattern.matcher(grp.name()).matches())
return true;
for (GridCacheContext cacheCtx : grp.caches())
if (cacheCtx.name() != null && pattern.matcher(cacheCtx.name()).matches())
return true;
}
return false;
}
/**
* @param desc Cache descriptor.
* @return {@code True} if the cache matches the configured cache filter.
*/
private boolean isCacheMatchFilter(DynamicCacheDescriptor desc) {
DataStorageConfiguration dsCfg = ignite.context().config().getDataStorageConfiguration();
CacheConfiguration cc = desc.cacheConfiguration();
switch (arg.cacheFilterEnum()) {
case DEFAULT:
return desc.cacheType().userCache() || !F.isEmpty(arg.caches());
case USER:
return desc.cacheType().userCache();
case SYSTEM:
return !desc.cacheType().userCache();
case NOT_PERSISTENT:
return desc.cacheType().userCache() && !GridCacheUtils.isPersistentCache(cc, dsCfg);
case PERSISTENT:
return desc.cacheType().userCache() && GridCacheUtils.isPersistentCache(cc, dsCfg);
case ALL:
return true;
default:
throw new IgniteException("Illegal cache filter: " + arg.cacheFilterEnum());
}
}
/**
* @param gctx Group context.
* @param part Local partition.
* @return Future that yields a single-entry map with the partition hash record, or an empty map if the partition
* could not be reserved or produced no hash.
*/
private Future<Map<PartitionKeyV2, PartitionHashRecordV2>> calculatePartitionHashAsync(
final CacheGroupContext gctx,
final GridDhtLocalPartition part
) {
return ForkJoinPool.commonPool().submit(() -> {
Map<PartitionKeyV2, PartitionHashRecordV2> res = emptyMap();
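// Skip this partition if it cannot be reserved (e.g. it is concurrently being evicted).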
if (!part.reserve())
return res;
try {
PartitionUpdateCounter updCntr = part.dataStore().partUpdateCounter();
PartitionUpdateCounter updateCntrBefore = updCntr == null ? null : updCntr.copy();
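// If CRC check is requested and the group is persistent, verify page checksums of the partition store before hashing.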
if (arg.checkCrc() && gctx.persistenceEnabled()) {
FilePageStoreManager pageStoreMgr =
(FilePageStoreManager)ignite.context().cache().context().pageStore();
checkPartitionsPageCrcSum(() -> (FilePageStore)pageStoreMgr.getStore(gctx.groupId(), part.id()),
part.id(), FLAG_DATA);
}
PartitionKeyV2 key = new PartitionKeyV2(gctx.groupId(), part.id(), gctx.cacheOrGroupName());
PartitionHashRecordV2 hash = calculatePartitionHash(key,
updateCntrBefore == null ? 0 : updateCntrBefore.comparableState(),
ignite.context().discovery().localNode().consistentId(),
part.state(),
part.primary(gctx.topology().readyTopologyVersion()),
part.dataStore().fullSize(),
gctx.offheap().partitionIterator(part.id()));
if (hash != null)
res = Collections.singletonMap(key, hash);
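// Re-read the update counter: if it changed while the hash was being calculated,
// the node was not idle and the result cannot be trusted.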
PartitionUpdateCounter updateCntrAfter = part.dataStore().partUpdateCounter();
if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) {
throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + gctx.cacheOrGroupName() +
", grpId=" + gctx.groupId() + ", partId=" + part.id() + "] changed during size " +
"calculation [updCntrBefore=" + updateCntrBefore + ", updCntrAfter=" + updateCntrAfter + "]");
}
}
catch (IgniteCheckedException e) {
U.error(log, "Can't calculate partition hash [grpId=" + gctx.groupId() +
", partId=" + part.id() + "]", e);
throw new IgniteException("Can't calculate partition hash [grpId=" + gctx.groupId() +
", partId=" + part.id() + "]", e);
}
finally {
part.release();
}
completionCntr.incrementAndGet();
return res;
});
}
}
}