blob: fb3cb4e20a9bb3a6f6b3e24a74b42d85c00fb849 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable.format;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.LongPredicate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.compaction.CompactionController;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.IVerifier;
import org.apache.cassandra.io.sstable.KeyIterator;
import org.apache.cassandra.io.sstable.KeyReader;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.IFilter;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.TimeUUID;
/**
 * Base verifier for sorted-table sstable formats.
 * <p>
 * {@link #verify()} runs a sequence of checks against a single sstable: format version, stats metadata, partition
 * index, bloom filter, (optionally) token ownership, the data digest and, when requested or when no digest is
 * available, a full pass over every partition in the data file. Most failures are reported through
 * {@link #markAndThrow(Throwable, boolean)}, which always throws.
 * <p>
 * The constructor opens the sstable's data file; callers must {@link #close()} the verifier to release it.
 *
 * @param <R> the concrete reader type being verified
 */
public abstract class SortedTableVerifier<R extends SSTableReaderWithFilter> implements IVerifier
{
    private static final Logger logger = LoggerFactory.getLogger(SortedTableVerifier.class);

    protected final ColumnFamilyStore cfs;
    protected final R sstable;

    /**
     * Guards {@link #dataFile}: {@link VerifyInfo} reads the file position/length under the read lock, while
     * {@link #close()} closes the file under the write lock.
     */
    protected final ReadWriteLock fileAccessLock;
    protected final RandomAccessReader dataFile;
    protected final VerifyInfo verifyInfo;
    protected final Options options;
    protected final boolean isOffline;

    /**
     * Given a keyspace, return the set of local and pending token ranges. By default {@link StorageService#getLocalAndPendingRanges(String)}
     * is expected, but for the standalone verifier case we can't use that, so this is here to allow the CLI to provide
     * the token ranges.
     */
    protected final Function<String, ? extends Collection<Range<Token>>> tokenLookup;

    /** Number of partitions successfully read by {@link #verifySSTable()}. */
    protected int goodRows;

    protected final OutputHandler outputHandler;

    /**
     * @param cfs           the table the sstable belongs to
     * @param sstable       the sstable to verify
     * @param outputHandler receives progress and diagnostic messages
     * @param isOffline     when true the data file is opened without the compaction rate limiter, and token
     *                      ownership is not checked
     * @param options       verification options; also supplies {@link #tokenLookup}
     */
    public SortedTableVerifier(ColumnFamilyStore cfs, R sstable, OutputHandler outputHandler, boolean isOffline, Options options)
    {
        this.cfs = cfs;
        this.sstable = sstable;
        this.outputHandler = outputHandler;
        this.fileAccessLock = new ReentrantReadWriteLock();
        // online verification shares disk bandwidth with compactions, so it is throttled by the same limiter
        this.dataFile = isOffline
                        ? sstable.openDataReader()
                        : sstable.openDataReader(CompactionManager.instance.getRateLimiter());
        this.verifyInfo = new VerifyInfo(dataFile, sstable, fileAccessLock.readLock());
        this.options = options;
        this.isOffline = isOffline;
        this.tokenLookup = options.tokenLookup;
    }

    /**
     * Loads the sstable's filter component and closes it again; this only checks that it can be read.
     *
     * @throws IOException if the filter component cannot be loaded
     */
    protected void deserializeBloomFilter(SSTableReader sstable) throws IOException
    {
        // a null resource is allowed by try-with-resources and is simply not closed
        try (IFilter filter = FilterComponent.load(sstable.descriptor))
        {
            if (filter != null)
                logger.trace("Filter loaded for {}", sstable);
        }
    }

    /** @return progress information for this verification, reported like a compaction. */
    public CompactionInfo.Holder getVerifyInfo()
    {
        return verifyInfo;
    }

    /**
     * Reports the sstable as invalid, attempting to mark it unrepaired first (if enabled in the options).
     * Always throws; see {@link #markAndThrow(Throwable, boolean)}.
     */
    protected void markAndThrow(Throwable cause)
    {
        markAndThrow(cause, true);
    }

    /**
     * Reports the sstable as invalid and always throws.
     * <p>
     * If both {@code mutateRepaired} and {@code options.mutateRepairStatus} are set, the sstable is first marked
     * unrepaired, and the tracker is notified of the status change. The resulting message only suggests a plain
     * (incremental) repair when that mutation actually succeeded. Otherwise the sstable may still be flagged as
     * repaired, so a full repair is requested.
     *
     * @param cause          the underlying failure, attached as the cause of the thrown exception
     * @param mutateRepaired whether this failure may reset the sstable's repaired status
     * @throws CorruptSSTableException if {@code options.invokeDiskFailurePolicy} is set
     * @throws RuntimeException        otherwise
     */
    protected void markAndThrow(Throwable cause, boolean mutateRepaired)
    {
        boolean markedUnrepaired = false;
        if (mutateRepaired && options.mutateRepairStatus) // if we are able to mutate repaired flag, an incremental repair should be enough
        {
            try
            {
                sstable.mutateRepairedAndReload(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getPendingRepair(), sstable.isTransient());
                cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
                markedUnrepaired = true;
            }
            catch (IOException ioe)
            {
                // best effort: still report the original problem below, but keep the reason the mutation failed
                outputHandler.warn(ioe, "Error mutating repairedAt for SSTable %s, as part of markAndThrow", sstable.getFilename());
            }
        }
        // only an sstable that really is marked unrepaired can be fixed by an incremental repair
        Exception e = new Exception(String.format("Invalid SSTable %s, please force %srepair", sstable.getFilename(), markedUnrepaired ? "" : "a full "), cause);
        if (options.invokeDiskFailurePolicy)
            throw new CorruptSSTableException(e, sstable.getFilename());
        else
            throw new RuntimeException(e);
    }

    /**
     * Runs the configured verification steps in order. Returns early (without the final success message) when
     * ownership checking finds no owned ranges, when {@code options.quick} is set, or when the digest check passes
     * and extended verification was not requested.
     */
    public void verify()
    {
        verifySSTableVersion();
        verifySSTableMetadata();
        verifyIndex();
        verifyBloomFilter();

        if (options.checkOwnsTokens && !isOffline && !(cfs.getPartitioner() instanceof LocalPartitioner))
        {
            if (verifyOwnedRanges() == 0)
                return;
        }

        if (options.quick)
            return;

        // a missing digest makes verifyDigest() return false, forcing the full data pass
        if (verifyDigest() && !options.extendedVerification)
            return;

        verifySSTable();
        outputHandler.output("Verify of %s succeeded. All %d rows read successfully", sstable, goodRows);
    }

    /** Checks that the bloom filter component can be loaded; failures go through {@link #markAndThrow(Throwable)}. */
    protected void verifyBloomFilter()
    {
        try
        {
            outputHandler.debug("Deserializing bloom filter for %s", sstable);
            deserializeBloomFilter(sstable);
        }
        catch (Throwable t)
        {
            outputHandler.warn(t);
            markAndThrow(t);
        }
    }

    /**
     * Loads the validation, stats and header metadata, and checks that the recorded partitioner matches the
     * reader's partitioner. Failures do not mutate the repaired status.
     */
    protected void verifySSTableMetadata()
    {
        outputHandler.output("Deserializing sstable metadata for %s ", sstable);
        try
        {
            StatsComponent statsComponent = StatsComponent.load(sstable.descriptor, MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER);
            if (statsComponent.validationMetadata() != null &&
                !statsComponent.validationMetadata().partitioner.equals(sstable.getPartitioner().getClass().getCanonicalName()))
                throw new IOException("Partitioner does not match validation metadata");
        }
        catch (Throwable t)
        {
            outputHandler.warn(t);
            markAndThrow(t, false);
        }
    }

    /**
     * Logs the sstable being verified and, if {@code options.checkVersion} is set, fails when its format version is
     * not the latest.
     *
     * @throws RuntimeException if the version check is enabled and fails
     */
    protected void verifySSTableVersion()
    {
        outputHandler.output("Verifying %s (%s)", sstable, FBUtilities.prettyPrintMemory(dataFile.length()));
        if (options.checkVersion && !sstable.descriptor.version.isLatestVersion())
        {
            String msg = String.format("%s is not the latest version, run upgradesstables", sstable);
            outputHandler.output(msg);
            // don't use markAndThrow here because we don't want a CorruptSSTableException for this.
            throw new RuntimeException(msg);
        }
    }

    /**
     * Checks that every key in the sstable falls within the ranges returned by {@link #tokenLookup}.
     *
     * @return the number of normalized owned ranges; 0 means there are no owned ranges and no keys were checked
     */
    protected int verifyOwnedRanges()
    {
        List<Range<Token>> ownedRanges = Collections.emptyList();
        outputHandler.debug("Checking that all tokens are owned by the current node");
        try (KeyIterator iter = sstable.keyIterator())
        {
            ownedRanges = Range.normalize(tokenLookup.apply(cfs.metadata.keyspace));
            if (ownedRanges.isEmpty())
                return 0;
            RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges);
            while (iter.hasNext())
            {
                DecoratedKey key = iter.next();
                rangeOwnHelper.validate(key);
            }
        }
        catch (Throwable t)
        {
            outputHandler.warn(t);
            markAndThrow(t);
        }

        return ownedRanges.size();
    }

    /**
     * Validates the data file against its stored digest, if one exists.
     *
     * @return {@code true} if a digest validator was available and validation completed, {@code false} if the
     *         digest is missing
     */
    protected boolean verifyDigest()
    {
        boolean passed = true;
        // Verify will use the Digest files, which works for both compressed and uncompressed sstables
        outputHandler.output("Checking computed hash of %s ", sstable);
        try
        {
            DataIntegrityMetadata.FileDigestValidator validator = sstable.maybeGetDigestValidator();

            if (validator != null)
            {
                validator.validate();
            }
            else
            {
                outputHandler.output("Data digest missing, assuming extended verification of disk values");
                passed = false;
            }
        }
        catch (IOException e)
        {
            outputHandler.warn(e);
            markAndThrow(e);
        }
        return passed;
    }

    /**
     * Reads every partition in the data file, walking the partition index alongside it.
     * <p>
     * For each partition it checks that:
     * <ul>
     *   <li>the key decodes;</li>
     *   <li>the key is in an owned range, when ownership checking applies;</li>
     *   <li>the partition can be iterated via {@link #verifyPartition};</li>
     *   <li>keys are in order;</li>
     *   <li>the key and its data offset agree with the index.</li>
     * </ul>
     * Stops with {@link CompactionInterruptedException} if a stop is requested.
     */
    protected void verifySSTable()
    {
        outputHandler.output("Extended Verify requested, proceeding to inspect values");

        try (VerifyController verifyController = new VerifyController(cfs);
             KeyReader indexIterator = sstable.keyReader())
        {
            if (indexIterator.dataPosition() != 0)
                markAndThrow(new RuntimeException("First row position from index != 0: " + indexIterator.dataPosition()));

            // offline there is no ring information, so ownership is not checked (an empty list disables the check below)
            List<Range<Token>> ownedRanges = isOffline ? Collections.emptyList() : Range.normalize(tokenLookup.apply(cfs.metadata().keyspace));
            RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges);
            DecoratedKey prevKey = null;

            while (!dataFile.isEOF())
            {

                if (verifyInfo.isStopRequested())
                    throw new CompactionInterruptedException(verifyInfo.getCompactionInfo());

                long rowStart = dataFile.getFilePointer();
                outputHandler.debug("Reading row at %d", rowStart);

                DecoratedKey key = null;
                try
                {
                    key = sstable.decorateKey(ByteBufferUtil.readWithShortLength(dataFile));
                }
                catch (Throwable th)
                {
                    markAndThrow(th);
                }

                if (options.checkOwnsTokens && ownedRanges.size() > 0 && !(cfs.getPartitioner() instanceof LocalPartitioner))
                {
                    try
                    {
                        rangeOwnHelper.validate(key);
                    }
                    catch (Throwable t)
                    {
                        outputHandler.warn(t, "Key %s in sstable %s not owned by local ranges %s", key, sstable, ownedRanges);
                        markAndThrow(t);
                    }
                }

                ByteBuffer currentIndexKey = indexIterator.key();
                long nextRowPositionFromIndex = 0;
                try
                {
                    nextRowPositionFromIndex = indexIterator.advance()
                                               ? indexIterator.dataPosition()
                                               : dataFile.length();
                }
                catch (Throwable th)
                {
                    markAndThrow(th);
                }

                long dataStart = dataFile.getFilePointer();
                // partition data follows the key, which is written with a 2-byte (short) length prefix
                long dataStartFromIndex = currentIndexKey == null
                                          ? -1
                                          : rowStart + 2 + currentIndexKey.remaining();

                long dataSize = nextRowPositionFromIndex - dataStartFromIndex;
                // avoid an NPE if key is null
                String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.getKey());
                outputHandler.debug("row %s is %s", keyName, FBUtilities.prettyPrintMemory(dataSize));

                try
                {
                    if (key == null || dataSize > dataFile.length())
                        markAndThrow(new RuntimeException(String.format("key = %s, dataSize=%d, dataFile.length() = %d", key, dataSize, dataFile.length())));

                    try (UnfilteredRowIterator iterator = SSTableIdentityIterator.create(sstable, dataFile, key))
                    {
                        verifyPartition(key, iterator);
                    }

                    if ((prevKey != null && prevKey.compareTo(key) > 0) || !key.getKey().equals(currentIndexKey) || dataStart != dataStartFromIndex)
                        markAndThrow(new RuntimeException("Key out of order: previous = " + prevKey + " : current = " + key));

                    goodRows++;
                    prevKey = key;

                    outputHandler.debug("Row %s at %s valid, moving to next row at %s ", goodRows, rowStart, nextRowPositionFromIndex);
                    dataFile.seek(nextRowPositionFromIndex);
                }
                catch (Throwable th)
                {
                    markAndThrow(th);
                }
            }
        }
        catch (Throwable t)
        {
            Throwables.throwIfUnchecked(t);
            throw new RuntimeException(t);
        }
    }

    /**
     * Format-specific check of a single partition's contents.
     *
     * @param key      the partition key read from the data file
     * @param iterator iterator over the partition's contents, positioned at the partition start
     */
    protected abstract void verifyPartition(DecoratedKey key, UnfilteredRowIterator iterator);

    /** Checks that the partition index can be read end to end; failures go through {@link #markAndThrow(Throwable)}. */
    protected void verifyIndex()
    {
        try
        {
            outputHandler.debug("Deserializing index for %s", sstable);
            deserializeIndex(sstable);
        }
        catch (Throwable t)
        {
            outputHandler.warn(t);
            markAndThrow(t);
        }
    }

    /**
     * Reads every key in the partition index and checks that the last one equals the sstable's last key.
     *
     * @throws CorruptSSTableException if the last index key does not match
     */
    private void deserializeIndex(SSTableReader sstable) throws IOException
    {
        try (KeyReader it = sstable.keyReader())
        {
            ByteBuffer last = it.key();
            while (it.advance()) last = it.key(); // no-op, just check if index is readable
            if (!Objects.equals(last, sstable.last.getKey()))
                throw new CorruptSSTableException(new IOException("Failed to read partition index"), it.toString());
        }
    }

    /** Closes the data file; the write lock excludes concurrent {@link VerifyInfo#getCompactionInfo()} calls. */
    @Override
    public void close()
    {
        fileAccessLock.writeLock().lock();
        try
        {
            FileUtils.closeQuietly(dataFile);
        }
        finally
        {
            fileAccessLock.writeLock().unlock();
        }
    }

    /**
     * Use the fact that check(...) is called with sorted tokens - we keep a pointer into the normalized ranges
     * and only bump the pointer if the key given is out of range. This is done to avoid calling .contains(..) many
     * times for each key (with vnodes for example)
     */
    @VisibleForTesting
    public static class RangeOwnHelper
    {
        private final List<Range<Token>> normalizedRanges;
        private int rangeIndex = 0;
        private DecoratedKey lastKey;

        /**
         * @param normalizedRanges the owned ranges; must already be normalized (checked by
         *                         {@code Range.assertNormalized})
         */
        public RangeOwnHelper(List<Range<Token>> normalizedRanges)
        {
            this.normalizedRanges = normalizedRanges;
            Range.assertNormalized(normalizedRanges);
        }

        /**
         * check if the given key is contained in any of the given ranges
         * <p>
         * Must be called in sorted order - key should be increasing
         *
         * @param key the key
         * @throws RuntimeException if the key is not contained
         */
        public void validate(DecoratedKey key)
        {
            if (!check(key))
                throw new RuntimeException("Key " + key + " is not contained in the given ranges");
        }

        /**
         * check if the given key is contained in any of the given ranges
         * <p>
         * Must be called in sorted order - key should be increasing
         *
         * @param key the key
         * @return {@code true} if the key is in a range (or there are no ranges at all), {@code false} otherwise
         * @throws IllegalStateException if called again after a previous call has already returned {@code false}
         */
        public boolean check(DecoratedKey key)
        {
            assert lastKey == null || key.compareTo(lastKey) > 0;
            lastKey = key;

            if (normalizedRanges.isEmpty()) // handle tests etc. where we don't have any ranges
                return true;

            if (rangeIndex > normalizedRanges.size() - 1)
                throw new IllegalStateException("RangeOwnHelper can only be used to find the first out-of-range-token");

            // keys are increasing, so ranges skipped here can never contain a later key
            while (!normalizedRanges.get(rangeIndex).contains(key.getToken()))
            {
                rangeIndex++;
                if (rangeIndex > normalizedRanges.size() - 1)
                    return false;
            }

            return true;
        }
    }

    /** Reports verification progress as a {@link OperationType#VERIFY} compaction over the data file. */
    protected static class VerifyInfo extends CompactionInfo.Holder
    {
        private final RandomAccessReader dataFile;
        private final SSTableReader sstable;
        private final TimeUUID verificationCompactionId;
        private final Lock fileReadLock;

        /**
         * @param dataFile     the data file whose position/length are reported as progress
         * @param sstable      the sstable being verified
         * @param fileReadLock held while reading {@code dataFile}, so it is not closed concurrently
         */
        public VerifyInfo(RandomAccessReader dataFile, SSTableReader sstable, Lock fileReadLock)
        {
            this.dataFile = dataFile;
            this.sstable = sstable;
            this.fileReadLock = fileReadLock;
            verificationCompactionId = TimeUUID.Generator.nextTimeUUID();
        }

        /**
         * @return progress as bytes read so far out of the data file length
         * @throws RuntimeException wrapping any failure to read the file position or length
         */
        public CompactionInfo getCompactionInfo()
        {
            fileReadLock.lock();
            try
            {
                return new CompactionInfo(sstable.metadata(),
                                          OperationType.VERIFY,
                                          dataFile.getFilePointer(),
                                          dataFile.length(),
                                          verificationCompactionId,
                                          ImmutableSet.of(sstable));
            }
            catch (Exception e)
            {
                // keep the underlying failure instead of throwing an empty RuntimeException
                throw new RuntimeException(e);
            }
            finally
            {
                fileReadLock.unlock();
            }
        }

        /** @return always {@code false}. */
        public boolean isGlobal()
        {
            return false;
        }
    }

    /** Compaction controller for verification whose purge evaluator never allows purging. */
    protected static class VerifyController extends CompactionController
    {
        public VerifyController(ColumnFamilyStore cfs)
        {
            super(cfs, Integer.MAX_VALUE);
        }

        @Override
        public LongPredicate getPurgeEvaluator(DecoratedKey key)
        {
            return time -> false;
        }
    }
}