/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.verify;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.visor.verify.VisorIdleVerifyDumpTaskArg;
import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * Task for collecting checksums of the primary and backup partitions of the specified caches. <br> Argument: set of
 * cache names; {@code null} triggers verification of all caches. <br> Result: path to the dump file listing all found
 * partitions. <br> Works reliably only on an idle cluster: if data in the cluster is updated concurrently, the task
 * may report false-positive conflicts.
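 * <p>
 * A minimal usage sketch (assumes a started {@code Ignite} instance; the constructor arguments of
 * {@link VisorIdleVerifyDumpTaskArg} differ between Ignite versions and are elided here; in practice the task is
 * typically invoked through the control utility's {@code idle_verify --dump} command):
 * <pre>{@code
 * // Returns the absolute path of the dump file written on the reducing node.
 * String dumpPath = ignite.compute().execute(VerifyBackupPartitionsDumpTask.class, dumpTaskArg);
 * }</pre>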
*/
@GridInternal
public class VerifyBackupPartitionsDumpTask extends ComputeTaskAdapter<VisorIdleVerifyTaskArg, String> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Time formatter for the dump file name. */
    private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss_SSS");

    /** Dump file name prefix; the full name adds a {@link #TIME_FORMATTER} timestamp and a {@code .txt} extension,
     * e.g. {@code idle-dump-2024-01-01T12-00-00_000.txt}. Visible for testing. */
    public static final String IDLE_DUMP_FILE_PREMIX = "idle-dump-";

    /** Delegate that maps verification jobs to nodes and computes the conflict result. */
    private final VerifyBackupPartitionsTaskV2 delegate = new VerifyBackupPartitionsTaskV2();

    /** Dump-specific task argument; {@code null} unless a {@link VisorIdleVerifyDumpTaskArg} was passed to {@code map()}. */
    private VisorIdleVerifyDumpTaskArg taskArg;

    /** Auto-injected Ignite instance. */
    @IgniteInstanceResource
    private Ignite ignite;

    /** {@inheritDoc} */
    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(
        List<ClusterNode> subgrid, VisorIdleVerifyTaskArg arg) throws IgniteException {
        // Remember the dump-specific options (e.g. skip-zeros) for use in reduce().
        if (arg instanceof VisorIdleVerifyDumpTaskArg)
            taskArg = (VisorIdleVerifyDumpTaskArg)arg;

        return delegate.map(subgrid, arg);
    }

    /** {@inheritDoc} */
    @Nullable @Override public String reduce(List<ComputeJobResult> results) throws IgniteException {
        // Group per-node partition hash records by partition key, ordered by (group ID, partition ID).
        Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes = new TreeMap<>(buildPartitionKeyComparator());

        for (ComputeJobResult res : results) {
            Map<PartitionKeyV2, PartitionHashRecordV2> nodeHashes = res.getData();

            for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> e : nodeHashes.entrySet()) {
                clusterHashes
                    .computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                    .add(e.getValue());
            }
        }

        // Reversed so that primary records come before backups within each partition.
        Comparator<PartitionHashRecordV2> recordComp = buildRecordComparator().reversed();

        Map<PartitionKeyV2, List<PartitionHashRecordV2>> partitions = new LinkedHashMap<>();

        int skippedRecords = 0;

        for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : clusterHashes.entrySet()) {
            if (needToAdd(entry.getValue())) {
                entry.getValue().sort(recordComp);

                partitions.put(entry.getKey(), entry.getValue());
            }
            else
                skippedRecords++;
        }

        return writeHashes(partitions, delegate.reduce(results), skippedRecords);
    }

    /**
     * Checks whether the given records should be added to the result.
     *
     * @param records Records to check.
     * @return {@code true} if the records should be added to the result, {@code false} if they may be skipped:
     *      with the skip-zeros option enabled, a partition whose copies are all empty and hash-identical is omitted.
     */
    private boolean needToAdd(List<PartitionHashRecordV2> records) {
        // Empty record sets are kept as-is; skipping applies only when the skip-zeros option is enabled.
        if (records.isEmpty() || (taskArg != null && !taskArg.isSkipZeros()))
            return true;

        PartitionHashRecordV2 record = records.get(0);

        if (record.updateCounter() != 0 || record.size() != 0)
            return true;

        int firstHash = record.partitionHash();

        // Skip only if every copy is empty and all partition hashes are equal.
        for (int i = 1; i < records.size(); i++) {
            record = records.get(i);

            if (record.partitionHash() != firstHash || record.updateCounter() != 0 || record.size() != 0)
                return true;
        }

        return false;
    }

/**
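     * A sketch of the produced file layout (bracketed values are placeholders; the layout mirrors the
     * {@code writer.write} calls below):
     * <pre>
     * idle_verify check has finished, found [N] partitions
     * [M] partitions were skipped
     * Cluster partitions:
     * Partition: [partition key]
     * Partition instances: [hash records, primaries first]
     * ...
     *
     * -----------------------------------
     *
     * [conflict check result]
     * </pre>
     *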
     * @param partitions Dump result.
     * @param conflictRes Conflict check result to append to the dump.
     * @param skippedRecords Number of partitions skipped because of the skip-zeros option.
     * @return Path where results are written.
     * @throws IgniteException If failed to write the file.
*/
private String writeHashes(
Map<PartitionKeyV2, List<PartitionHashRecordV2>> partitions,
IdleVerifyResultV2 conflictRes,
int skippedRecords
) throws IgniteException {
        // Fall back to /tmp when the Ignite work directory is not configured.
        File workDir = ignite.configuration().getWorkDirectory() == null
            ? new File("/tmp")
            : new File(ignite.configuration().getWorkDirectory());

        File out = new File(workDir, IDLE_DUMP_FILE_PREMIX + LocalDateTime.now().format(TIME_FORMATTER) + ".txt");

        ignite.log().info("IdleVerifyDumpTask will write output to " + out.getAbsolutePath());

try (BufferedWriter writer = new BufferedWriter(new FileWriter(out))) {
try {
                writer.write("idle_verify check has finished, found " + partitions.size() + " partitions\n");

                if (skippedRecords > 0)
                    writer.write(skippedRecords + " partitions were skipped\n");

                if (!F.isEmpty(partitions)) {
                    writer.write("Cluster partitions:\n");

                    for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : partitions.entrySet()) {
                        writer.write("Partition: " + entry.getKey() + "\n");
                        writer.write("Partition instances: " + entry.getValue() + "\n");
                    }

                    writer.write("\n\n-----------------------------------\n\n");

                    // Append the conflict check result produced by the delegate task.
                    conflictRes.print(str -> {
try {
writer.write(str);
}
catch (IOException e) {
                            throw new IgniteException("Failed to write partition conflicts.", e);
}
});
}
}
finally {
writer.flush();
}

            ignite.log().info("IdleVerifyDumpTask successfully wrote the dump to '" + out.getAbsolutePath() + "'");
}
catch (IOException | IgniteException e) {
ignite.log().error("Failed to write dump file: " + out.getAbsolutePath(), e);
throw new IgniteException(e);
        }

        return out.getAbsolutePath();
    }

    /**
     * @return Comparator for {@link PartitionHashRecordV2}: orders by the primary flag first, then by consistent ID.
     */
    @NotNull private Comparator<PartitionHashRecordV2> buildRecordComparator() {
        return (o1, o2) -> {
            int compare = Boolean.compare(o1.isPrimary(), o2.isPrimary());

            if (compare != 0)
                return compare;

            return o1.consistentId().toString().compareTo(o2.consistentId().toString());
        };
    }

    /**
     * @return Comparator for {@link PartitionKeyV2}: orders by group ID first, then by partition ID.
     */
    @NotNull private Comparator<PartitionKeyV2> buildPartitionKeyComparator() {
        return (o1, o2) -> {
            int compare = Integer.compare(o1.groupId(), o2.groupId());

            if (compare != 0)
                return compare;

            return Integer.compare(o1.partitionId(), o2.partitionId());
        };
    }
}