blob: 29eb29951b90908d57761bb030e87b17000ca046 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.IndexSummary;
import org.apache.cassandra.io.sstable.KeyIterator;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.DataIntegrityMetadata.FileDigestValidator;
import org.apache.cassandra.io.util.FileInputStreamPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilterSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.IFilter;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.TimeUUID;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.LongPredicate;
import org.apache.cassandra.io.util.File;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
public class Verifier implements Closeable
{
private final ColumnFamilyStore cfs;
private final SSTableReader sstable;
private final CompactionController controller;
private final ReadWriteLock fileAccessLock;
private final RandomAccessReader dataFile;
private final RandomAccessReader indexFile;
private final VerifyInfo verifyInfo;
private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
private final Options options;
private final boolean isOffline;
/**
* Given a keyspace, return the set of local and pending token ranges. By default {@link StorageService#getLocalAndPendingRanges(String)}
* is expected, but for the standalone verifier case we can't use that, so this is here to allow the CLI to provide
* the token ranges.
*/
private final Function<String, ? extends Collection<Range<Token>>> tokenLookup;
private int goodRows;
private final OutputHandler outputHandler;
private FileDigestValidator validator;
public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, boolean isOffline, Options options)
{
this(cfs, sstable, new OutputHandler.LogOutput(), isOffline, options);
}
public Verifier(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler, boolean isOffline, Options options)
{
this.cfs = cfs;
this.sstable = sstable;
this.outputHandler = outputHandler;
this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(cfs.metadata(), sstable.descriptor.version, sstable.header);
this.controller = new VerifyController(cfs);
this.fileAccessLock = new ReentrantReadWriteLock();
this.dataFile = isOffline
? sstable.openDataReader()
: sstable.openDataReader(CompactionManager.instance.getRateLimiter());
this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)));
this.verifyInfo = new VerifyInfo(dataFile, sstable, fileAccessLock.readLock());
this.options = options;
this.isOffline = isOffline;
this.tokenLookup = options.tokenLookup;
}
public void verify()
{
boolean extended = options.extendedVerification;
long rowStart = 0;
outputHandler.output(String.format("Verifying %s (%s)", sstable, FBUtilities.prettyPrintMemory(dataFile.length())));
if (options.checkVersion && !sstable.descriptor.version.isLatestVersion())
{
String msg = String.format("%s is not the latest version, run upgradesstables", sstable);
outputHandler.output(msg);
// don't use markAndThrow here because we don't want a CorruptSSTableException for this.
throw new RuntimeException(msg);
}
outputHandler.output(String.format("Deserializing sstable metadata for %s ", sstable));
try
{
EnumSet<MetadataType> types = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER);
Map<MetadataType, MetadataComponent> sstableMetadata = sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, types);
if (sstableMetadata.containsKey(MetadataType.VALIDATION) &&
!((ValidationMetadata)sstableMetadata.get(MetadataType.VALIDATION)).partitioner.equals(sstable.getPartitioner().getClass().getCanonicalName()))
throw new IOException("Partitioner does not match validation metadata");
}
catch (Throwable t)
{
outputHandler.warn(t);
markAndThrow(t, false);
}
try
{
outputHandler.debug("Deserializing index for "+sstable);
deserializeIndex(sstable);
}
catch (Throwable t)
{
outputHandler.warn(t);
markAndThrow(t);
}
try
{
outputHandler.debug("Deserializing index summary for "+sstable);
deserializeIndexSummary(sstable);
}
catch (Throwable t)
{
outputHandler.output("Index summary is corrupt - if it is removed it will get rebuilt on startup "+sstable.descriptor.filenameFor(Component.SUMMARY));
outputHandler.warn(t);
markAndThrow(t, false);
}
try
{
outputHandler.debug("Deserializing bloom filter for "+sstable);
deserializeBloomFilter(sstable);
}
catch (Throwable t)
{
outputHandler.warn(t);
markAndThrow(t);
}
if (options.checkOwnsTokens && !isOffline && !(cfs.getPartitioner() instanceof LocalPartitioner))
{
outputHandler.debug("Checking that all tokens are owned by the current node");
try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
{
List<Range<Token>> ownedRanges = Range.normalize(tokenLookup.apply(cfs.metadata.keyspace));
if (ownedRanges.isEmpty())
return;
RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges);
while (iter.hasNext())
{
DecoratedKey key = iter.next();
rangeOwnHelper.validate(key);
}
}
catch (Throwable t)
{
outputHandler.warn(t);
markAndThrow(t);
}
}
if (options.quick)
return;
// Verify will use the Digest files, which works for both compressed and uncompressed sstables
outputHandler.output(String.format("Checking computed hash of %s ", sstable));
try
{
validator = null;
if (new File(sstable.descriptor.filenameFor(Component.DIGEST)).exists())
{
validator = DataIntegrityMetadata.fileDigestValidator(sstable.descriptor);
validator.validate();
}
else
{
outputHandler.output("Data digest missing, assuming extended verification of disk values");
extended = true;
}
}
catch (IOException e)
{
outputHandler.warn(e);
markAndThrow(e);
}
finally
{
FileUtils.closeQuietly(validator);
}
if (!extended)
return;
outputHandler.output("Extended Verify requested, proceeding to inspect values");
try
{
ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
{
long firstRowPositionFromIndex = rowIndexEntrySerializer.deserializePositionAndSkip(indexFile);
if (firstRowPositionFromIndex != 0)
markAndThrow(new RuntimeException("firstRowPositionFromIndex != 0: "+firstRowPositionFromIndex));
}
List<Range<Token>> ownedRanges = isOffline ? Collections.emptyList() : Range.normalize(tokenLookup.apply(cfs.metadata().keyspace));
RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(ownedRanges);
DecoratedKey prevKey = null;
while (!dataFile.isEOF())
{
if (verifyInfo.isStopRequested())
throw new CompactionInterruptedException(verifyInfo.getCompactionInfo());
rowStart = dataFile.getFilePointer();
outputHandler.debug("Reading row at " + rowStart);
DecoratedKey key = null;
try
{
key = sstable.decorateKey(ByteBufferUtil.readWithShortLength(dataFile));
}
catch (Throwable th)
{
throwIfFatal(th);
// check for null key below
}
if (options.checkOwnsTokens && ownedRanges.size() > 0 && !(cfs.getPartitioner() instanceof LocalPartitioner))
{
try
{
rangeOwnHelper.validate(key);
}
catch (Throwable t)
{
outputHandler.warn(String.format("Key %s in sstable %s not owned by local ranges %s", key, sstable, ownedRanges), t);
markAndThrow(t);
}
}
ByteBuffer currentIndexKey = nextIndexKey;
long nextRowPositionFromIndex = 0;
try
{
nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
nextRowPositionFromIndex = indexFile.isEOF()
? dataFile.length()
: rowIndexEntrySerializer.deserializePositionAndSkip(indexFile);
}
catch (Throwable th)
{
markAndThrow(th);
}
long dataStart = dataFile.getFilePointer();
long dataStartFromIndex = currentIndexKey == null
? -1
: rowStart + 2 + currentIndexKey.remaining();
long dataSize = nextRowPositionFromIndex - dataStartFromIndex;
// avoid an NPE if key is null
String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.getKey());
outputHandler.debug(String.format("row %s is %s", keyName, FBUtilities.prettyPrintMemory(dataSize)));
assert currentIndexKey != null || indexFile.isEOF();
try
{
if (key == null || dataSize > dataFile.length())
markAndThrow(new RuntimeException(String.format("key = %s, dataSize=%d, dataFile.length() = %d", key, dataSize, dataFile.length())));
//mimic the scrub read path, intentionally unused
try (UnfilteredRowIterator iterator = SSTableIdentityIterator.create(sstable, dataFile, key))
{
}
if ( (prevKey != null && prevKey.compareTo(key) > 0) || !key.getKey().equals(currentIndexKey) || dataStart != dataStartFromIndex )
markAndThrow(new RuntimeException("Key out of order: previous = "+prevKey + " : current = " + key));
goodRows++;
prevKey = key;
outputHandler.debug(String.format("Row %s at %s valid, moving to next row at %s ", goodRows, rowStart, nextRowPositionFromIndex));
dataFile.seek(nextRowPositionFromIndex);
}
catch (Throwable th)
{
markAndThrow(th);
}
}
}
catch (Throwable t)
{
throw Throwables.propagate(t);
}
finally
{
controller.close();
}
outputHandler.output("Verify of " + sstable + " succeeded. All " + goodRows + " rows read successfully");
}
/**
* Use the fact that check(..) is called with sorted tokens - we keep a pointer in to the normalized ranges
* and only bump the pointer if the key given is out of range. This is done to avoid calling .contains(..) many
* times for each key (with vnodes for example)
*/
@VisibleForTesting
public static class RangeOwnHelper
{
private final List<Range<Token>> normalizedRanges;
private int rangeIndex = 0;
private DecoratedKey lastKey;
public RangeOwnHelper(List<Range<Token>> normalizedRanges)
{
this.normalizedRanges = normalizedRanges;
Range.assertNormalized(normalizedRanges);
}
/**
* check if the given key is contained in any of the given ranges
*
* Must be called in sorted order - key should be increasing
*
* @param key the key
* @throws RuntimeException if the key is not contained
*/
public void validate(DecoratedKey key)
{
if (!check(key))
throw new RuntimeException("Key " + key + " is not contained in the given ranges");
}
/**
* check if the given key is contained in any of the given ranges
*
* Must be called in sorted order - key should be increasing
*
* @param key the key
* @return boolean
*/
public boolean check(DecoratedKey key)
{
assert lastKey == null || key.compareTo(lastKey) > 0;
lastKey = key;
if (normalizedRanges.isEmpty()) // handle tests etc where we don't have any ranges
return true;
if (rangeIndex > normalizedRanges.size() - 1)
throw new IllegalStateException("RangeOwnHelper can only be used to find the first out-of-range-token");
while (!normalizedRanges.get(rangeIndex).contains(key.getToken()))
{
rangeIndex++;
if (rangeIndex > normalizedRanges.size() - 1)
return false;
}
return true;
}
}
private void deserializeIndex(SSTableReader sstable) throws IOException
{
try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))))
{
long indexSize = primaryIndex.length();
while ((primaryIndex.getFilePointer()) != indexSize)
{
ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
RowIndexEntry.Serializer.skip(primaryIndex, sstable.descriptor.version);
}
}
}
private void deserializeIndexSummary(SSTableReader sstable) throws IOException
{
File file = new File(sstable.descriptor.filenameFor(Component.SUMMARY));
TableMetadata metadata = cfs.metadata();
try (DataInputStream iStream = new DataInputStream(Files.newInputStream(file.toPath())))
{
try (IndexSummary indexSummary = IndexSummary.serializer.deserialize(iStream,
cfs.getPartitioner(),
metadata.params.minIndexInterval,
metadata.params.maxIndexInterval))
{
ByteBufferUtil.readWithLength(iStream);
ByteBufferUtil.readWithLength(iStream);
}
}
}
private void deserializeBloomFilter(SSTableReader sstable) throws IOException
{
File bfPath = new File(sstable.descriptor.filenameFor(Component.FILTER));
if (bfPath.exists())
{
try (FileInputStreamPlus stream = bfPath.newInputStream();
IFilter bf = BloomFilterSerializer.deserialize(stream, sstable.descriptor.version.hasOldBfFormat()))
{
}
}
}
public void close()
{
fileAccessLock.writeLock().lock();
try
{
FileUtils.closeQuietly(dataFile);
FileUtils.closeQuietly(indexFile);
}
finally
{
fileAccessLock.writeLock().unlock();
}
}
private void throwIfFatal(Throwable th)
{
if (th instanceof Error && !(th instanceof AssertionError || th instanceof IOError))
throw (Error) th;
}
private void markAndThrow(Throwable cause)
{
markAndThrow(cause, true);
}
private void markAndThrow(Throwable cause, boolean mutateRepaired)
{
if (mutateRepaired && options.mutateRepairStatus) // if we are able to mutate repaired flag, an incremental repair should be enough
{
try
{
sstable.mutateRepairedAndReload(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getPendingRepair(), sstable.isTransient());
cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
}
catch(IOException ioe)
{
outputHandler.output("Error mutating repairedAt for SSTable " + sstable.getFilename() + ", as part of markAndThrow");
}
}
Exception e = new Exception(String.format("Invalid SSTable %s, please force %srepair", sstable.getFilename(), (mutateRepaired && options.mutateRepairStatus) ? "" : "a full "), cause);
if (options.invokeDiskFailurePolicy)
throw new CorruptSSTableException(e, sstable.getFilename());
else
throw new RuntimeException(e);
}
public CompactionInfo.Holder getVerifyInfo()
{
return verifyInfo;
}
private static class VerifyInfo extends CompactionInfo.Holder
{
private final RandomAccessReader dataFile;
private final SSTableReader sstable;
private final TimeUUID verificationCompactionId;
private final Lock fileReadLock;
public VerifyInfo(RandomAccessReader dataFile, SSTableReader sstable, Lock fileReadLock)
{
this.dataFile = dataFile;
this.sstable = sstable;
this.fileReadLock = fileReadLock;
verificationCompactionId = nextTimeUUID();
}
public CompactionInfo getCompactionInfo()
{
fileReadLock.lock();
try
{
return new CompactionInfo(sstable.metadata(),
OperationType.VERIFY,
dataFile.getFilePointer(),
dataFile.length(),
verificationCompactionId,
ImmutableSet.of(sstable));
}
catch (Exception e)
{
throw new RuntimeException();
}
finally
{
fileReadLock.unlock();
}
}
public boolean isGlobal()
{
return false;
}
}
private static class VerifyController extends CompactionController
{
public VerifyController(ColumnFamilyStore cfs)
{
super(cfs, Integer.MAX_VALUE);
}
@Override
public LongPredicate getPurgeEvaluator(DecoratedKey key)
{
return time -> false;
}
}
public static Options.Builder options()
{
return new Options.Builder();
}
public static class Options
{
public final boolean invokeDiskFailurePolicy;
public final boolean extendedVerification;
public final boolean checkVersion;
public final boolean mutateRepairStatus;
public final boolean checkOwnsTokens;
public final boolean quick;
public final Function<String, ? extends Collection<Range<Token>>> tokenLookup;
private Options(boolean invokeDiskFailurePolicy, boolean extendedVerification, boolean checkVersion, boolean mutateRepairStatus, boolean checkOwnsTokens, boolean quick, Function<String, ? extends Collection<Range<Token>>> tokenLookup)
{
this.invokeDiskFailurePolicy = invokeDiskFailurePolicy;
this.extendedVerification = extendedVerification;
this.checkVersion = checkVersion;
this.mutateRepairStatus = mutateRepairStatus;
this.checkOwnsTokens = checkOwnsTokens;
this.quick = quick;
this.tokenLookup = tokenLookup;
}
@Override
public String toString()
{
return "Options{" +
"invokeDiskFailurePolicy=" + invokeDiskFailurePolicy +
", extendedVerification=" + extendedVerification +
", checkVersion=" + checkVersion +
", mutateRepairStatus=" + mutateRepairStatus +
", checkOwnsTokens=" + checkOwnsTokens +
", quick=" + quick +
'}';
}
public static class Builder
{
private boolean invokeDiskFailurePolicy = false; // invoking disk failure policy can stop the node if we find a corrupt stable
private boolean extendedVerification = false;
private boolean checkVersion = false;
private boolean mutateRepairStatus = false; // mutating repair status can be dangerous
private boolean checkOwnsTokens = false;
private boolean quick = false;
private Function<String, ? extends Collection<Range<Token>>> tokenLookup = StorageService.instance::getLocalAndPendingRanges;
public Builder invokeDiskFailurePolicy(boolean param)
{
this.invokeDiskFailurePolicy = param;
return this;
}
public Builder extendedVerification(boolean param)
{
this.extendedVerification = param;
return this;
}
public Builder checkVersion(boolean param)
{
this.checkVersion = param;
return this;
}
public Builder mutateRepairStatus(boolean param)
{
this.mutateRepairStatus = param;
return this;
}
public Builder checkOwnsTokens(boolean param)
{
this.checkOwnsTokens = param;
return this;
}
public Builder quick(boolean param)
{
this.quick = param;
return this;
}
public Builder tokenLookup(Function<String, ? extends Collection<Range<Token>>> tokenLookup)
{
this.tokenLookup = tokenLookup;
return this;
}
public Options build()
{
return new Options(invokeDiskFailurePolicy, extendedVerification, checkVersion, mutateRepairStatus, checkOwnsTokens, quick, tokenLookup);
}
}
}
}