blob: a6c9fa9ef2e2bea1cebe266e12a0a5a2e68a0106 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.repair;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.metrics.TopPartitionTracker;
import org.apache.cassandra.repair.state.ValidationState;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.MerkleTrees;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
public class ValidationManager
{
private static final Logger logger = LoggerFactory.getLogger(ValidationManager.class);
public static final ValidationManager instance = new ValidationManager();
private ValidationManager() {}
private static MerkleTrees createMerkleTrees(ValidationPartitionIterator validationIterator, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
{
MerkleTrees trees = new MerkleTrees(cfs.getPartitioner());
long allPartitions = validationIterator.estimatedPartitions();
Map<Range<Token>, Long> rangePartitionCounts = validationIterator.getRangePartitionCounts();
// The repair coordinator must hold RF trees in memory at once, so a given validation compaction can only
// use 1 / RF of the allowed space.
long availableBytes = (DatabaseDescriptor.getRepairSessionSpaceInMiB() * 1048576) /
cfs.keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
for (Range<Token> range : ranges)
{
long numPartitions = rangePartitionCounts.get(range);
double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0;
// determine max tree depth proportional to range size to avoid blowing up memory with multiple tress,
// capping at a depth that does not exceed our memory budget (CASSANDRA-11390, CASSANDRA-14096)
int rangeAvailableBytes = Math.max(1, (int) (rangeOwningRatio * availableBytes));
// Try to estimate max tree depth that fits the space budget assuming hashes of 256 bits = 32 bytes
// note that estimatedMaxDepthForBytes cannot return a number lower than 1
int estimatedMaxDepth = MerkleTree.estimatedMaxDepthForBytes(cfs.getPartitioner(), rangeAvailableBytes, 32);
int maxDepth = rangeOwningRatio > 0
? Math.min(estimatedMaxDepth, DatabaseDescriptor.getRepairSessionMaxTreeDepth())
: 0;
// determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263)
int depth = numPartitions > 0 ? (int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
trees.addMerkleTree((int) Math.pow(2, depth), range);
}
if (logger.isDebugEnabled())
{
// MT serialize may take time
logger.debug("Created {} merkle trees with merkle trees size {}, {} partitions, {} bytes", trees.ranges().size(), trees.size(), allPartitions, MerkleTrees.serializer.serializedSize(trees, 0));
}
return trees;
}
private static ValidationPartitionIterator getValidationIterator(TableRepairManager repairManager, Validator validator, TopPartitionTracker.Collector topPartitionCollector) throws IOException, NoSuchRepairSessionException
{
RepairJobDesc desc = validator.desc;
return repairManager.getValidationIterator(desc.ranges, desc.parentSessionId, desc.sessionId, validator.isIncremental, validator.nowInSec, topPartitionCollector);
}
/**
* Performs a readonly "compaction" of all sstables in order to validate complete rows,
* but without writing the merge result
*/
@SuppressWarnings("resource")
private void doValidation(ColumnFamilyStore cfs, Validator validator) throws IOException, NoSuchRepairSessionException
{
// this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped
// mid-validation, or to attempt to validate a droped CFS. this is just a best effort to avoid useless work,
// particularly in the scenario where a validation is submitted before the drop, and there are compactions
// started prior to the drop keeping some sstables alive. Since validationCompaction can run
// concurrently with other compactions, it would otherwise go ahead and scan those again.
ValidationState state = validator.state;
if (!cfs.isValid())
{
state.phase.skip(String.format("Table %s is not valid", cfs));
return;
}
TopPartitionTracker.Collector topPartitionCollector = null;
if (cfs.topPartitions != null && DatabaseDescriptor.topPartitionsEnabled() && isTopPartitionSupported(validator))
topPartitionCollector = new TopPartitionTracker.Collector(validator.desc.ranges);
// Create Merkle trees suitable to hold estimated partitions for the given ranges.
// We blindly assume that a partition is evenly distributed on all sstables for now.
long start = nanoTime();
try (ValidationPartitionIterator vi = getValidationIterator(cfs.getRepairManager(), validator, topPartitionCollector))
{
state.phase.start(vi.estimatedPartitions(), vi.getEstimatedBytes());
MerkleTrees trees = createMerkleTrees(vi, validator.desc.ranges, cfs);
// validate the CF as we iterate over it
validator.prepare(cfs, trees, topPartitionCollector);
while (vi.hasNext())
{
try (UnfilteredRowIterator partition = vi.next())
{
validator.add(partition);
state.partitionsProcessed++;
state.bytesRead = vi.getBytesRead();
if (state.partitionsProcessed % 1024 == 0) // update every so often
state.updated();
}
}
validator.complete();
}
finally
{
cfs.metric.bytesValidated.update(state.estimatedTotalBytes);
cfs.metric.partitionsValidated.update(state.partitionsProcessed);
if (topPartitionCollector != null)
cfs.topPartitions.merge(topPartitionCollector);
}
if (logger.isDebugEnabled())
{
long duration = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start);
logger.debug("Validation of {} partitions (~{}) finished in {} msec, for {}",
state.partitionsProcessed,
FBUtilities.prettyPrintMemory(state.estimatedTotalBytes),
duration,
validator.desc);
}
}
private static boolean isTopPartitionSupported(Validator validator)
{
// supported: --validate, --full, --full --preview
switch (validator.getPreviewKind())
{
case NONE:
return !validator.isIncremental;
case ALL:
case REPAIRED:
return true;
case UNREPAIRED:
return false;
default:
throw new AssertionError("Unknown preview kind: " + validator.getPreviewKind());
}
}
/**
* Does not mutate data, so is not scheduled.
*/
public Future<?> submitValidation(ColumnFamilyStore cfs, Validator validator)
{
Callable<Object> validation = new Callable<Object>()
{
public Object call() throws IOException
{
try (TableMetrics.TableTimer.Context c = cfs.metric.validationTime.time())
{
doValidation(cfs, validator);
}
catch (PreviewRepairConflictWithIncrementalRepairException | NoSuchRepairSessionException | CompactionInterruptedException e)
{
validator.fail(e);
logger.warn(e.getMessage());
}
catch (Throwable e)
{
// we need to inform the remote end of our failure, otherwise it will hang on repair forever
validator.fail(e);
logger.error("Validation failed.", e);
throw e;
}
return this;
}
};
return cfs.getRepairManager().submitValidation(validation);
}
}