blob: fd34c024482eb119babf010efccd89378634dd46 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredSource;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.SelfRefCounted;
import org.apache.cassandra.utils.concurrent.SharedCloseable;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;
import static org.apache.cassandra.utils.concurrent.SharedCloseable.sharedCopyOrNull;
/**
* An SSTableReader can be constructed in a number of places, but typically is either read from disk at startup, or
* constructed from a flushed memtable, or after compaction to replace some existing sstables. However once created,
* an sstablereader may also be modified.
* <p>
* A reader's {@link OpenReason} describes its current stage in its lifecycle. Note that in parallel to this, there are
* two different Descriptor types; TMPLINK and FINAL; the latter corresponds to {@link OpenReason#NORMAL} state readers
* and all readers that replace a {@link OpenReason#NORMAL} one. TMPLINK is used for {@link OpenReason#EARLY} state
* readers and no others.
* <p>
* When a reader is being compacted, if the result is large its replacement may be opened as {@link OpenReason#EARLY}
* before compaction completes in order to present the result to consumers earlier. In this case the reader will itself
* be changed to a {@link OpenReason#MOVED_START} state, where its start no longer represents its on-disk minimum key.
* This is to permit reads to be directed to only one reader when the two represent the same data.
* The {@link OpenReason#EARLY} file can represent a compaction result that is either partially complete and still
* in-progress, or a complete and immutable sstable that is part of a larger macro compaction action that has not yet
* fully completed.
* <p>
* Currently ALL compaction results at least briefly go through an {@link OpenReason#EARLY} open state prior to completion,
* regardless of whether early opening is enabled.
* <p>
* Since a reader can be created multiple times over the same shared underlying resources, and the exact resources it
* shares between each instance differ subtly, we track the lifetime of any underlying resource with its own reference
* count, which each instance takes a {@link Ref} to. Each instance then tracks references to itself, and once these
* all expire it releases all its {@link Ref} to these underlying resources.
* <p>
* There is some shared cleanup behaviour needed only once all readers in a certain stage of their lifecycle
* (i.e. {@link OpenReason#EARLY} or {@link OpenReason#NORMAL} opening) have expired, and some that must only occur once
* all readers of any kind over a single logical sstable have expired. These are managed by the {@link InstanceTidier}
* and {@link GlobalTidy} classes at the bottom, and are effectively treated as one more resource to which each
* instance holds its own {@link Ref}, so that all of these resources are cleaned up safely and can otherwise be debugged.
* <p>
* TODO: fill in details about Tracker and lifecycle interactions for tools, and for compaction strategies
*/
public abstract class SSTableReader extends SSTable implements UnfilteredSource, SelfRefCounted<SSTableReader>
private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
private static final boolean TRACK_ACTIVITY = CassandraRelevantProperties.DISABLE_SSTABLE_ACTIVITY_TRACKING.getBoolean();
private static final ScheduledExecutorPlus syncExecutor = initSyncExecutor();
private static ScheduledExecutorPlus initSyncExecutor()
if (DatabaseDescriptor.isClientOrToolInitialized())
return null;
// Do NOT start this thread pool in client mode
ScheduledExecutorPlus syncExecutor = executorFactory().scheduled("read-hotness-tracker");
// Immediately remove readMeter sync task when cancelled.
// TODO: should we set this by default on all scheduled executors?
if (syncExecutor instanceof ScheduledThreadPoolExecutor)
((ScheduledThreadPoolExecutor) syncExecutor).setRemoveOnCancelPolicy(true);
return syncExecutor;
private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
public static final Comparator<SSTableReader> maxTimestampAscending = Comparator.comparingLong(SSTableReader::getMaxTimestamp);
public static final Comparator<SSTableReader> maxTimestampDescending = maxTimestampAscending.reversed();
// it's just an object, which we use regular Object equality on; we introduce a special class just for easy recognition
public static final class UniqueIdentifier
public final UniqueIdentifier instanceId = new UniqueIdentifier();
public static final Comparator<SSTableReader> sstableComparator = Comparator.comparing(o -> o.first);
public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
public static final Comparator<SSTableReader> idComparator = Comparator.comparing(t ->, SSTableIdFactory.COMPARATOR);
public static final Comparator<SSTableReader> idReverseComparator = idComparator.reversed();
public static final Comparator<SSTableReader> sizeComparator = (o1, o2) ->, o2.onDiskLength());
* maxDataAge is a timestamp in local server time (e.g. Global.currentTimeMilli) which represents an upper bound
* to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
* later than maxDataAge.
* <p>
* The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
* <p>
* When a new sstable is flushed, maxDataAge is set to the time of creation.
* When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
* <p>
* The age is in milliseconds since epoc and is local to this host.
public final long maxDataAge;
/**
 * The lifecycle stage of a reader. Each constant lists the states from which a reader can enter it.
 */
public enum OpenReason
{
    /**
     * <ul>
     * <li>From {@code None} - Reader has been read from disk, either at startup or from a flushed memtable</li>
     * <li>From {@link #EARLY} - Reader is the final result of a compaction</li>
     * <li>From {@link #MOVED_START} - Reader WAS being compacted, but this failed and it has been restored
     * to {@code NORMAL} status</li>
     * </ul>
     */
    NORMAL,

    /**
     * <ul>
     * <li>From {@code None} - Reader is a compaction replacement that is either incomplete and has been opened
     * to represent its partial result status, or has been finished but the compaction it is a part of has not
     * yet completed fully</li>
     * <li>From {@link #EARLY} - Same as from {@code None}, only it is not the first time it has been
     * opened early</li>
     * </ul>
     */
    EARLY,

    /**
     * <ul>
     * <li>From {@link #NORMAL} - Reader has seen low traffic and the amount of memory available for index summaries
     * is constrained, so its index summary has been downsampled</li>
     * <li>From {@link #METADATA_CHANGE} - Same</li>
     * </ul>
     */
    METADATA_CHANGE,

    /**
     * <ul>
     * <li>From {@link #NORMAL} - Reader is being compacted. This compaction has not finished, but the compaction
     * result is either partially or fully opened, to either partially or fully replace this reader. This reader's
     * start key has been updated to represent this, so that reads only hit one or the other reader.</li>
     * </ul>
     */
    MOVED_START
}
public final OpenReason openReason;

protected final FileHandle dfile;

// Set by markSuspect() and cleared by unmarkSuspect(); per markSuspect()'s log message, a suspect sstable is one
// to be excluded from reads.
public final AtomicBoolean isSuspect = new AtomicBoolean(false);

// not final since we need to be able to change level on a file.
protected volatile StatsMetadata sstableMetadata;

public final SerializationHeader header;

// Per-instance tidier, and the self reference whose release triggers it.
private final InstanceTidier tidy;
private final Ref<SSTableReader> selfRef;

// Not assigned by the constructor, so it may be null.
private RestorableMeter readMeter;

// Updated via setCrcCheckChance().
private volatile double crcCheckChance;

public final DecoratedKey first;
public final DecoratedKey last;
// null when the first and last tokens strictly wrap around (an out-of-order sstable opened without validation);
// getBounds() then throws and validate() fails.
public final AbstractBounds<Token> bounds;
* Calculate approximate key count.
* If cardinality estimator is available on all given sstables, then this method use them to estimate
* key count.
* If not, then this uses index summaries.
* @param sstables SSTables to calculate key count
* @return estimated key count
public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)
long count = -1;
if (Iterables.isEmpty(sstables))
return count;
boolean failed = false;
ICardinality cardinality = null;
for (SSTableReader sstable : sstables)
if (sstable.openReason == OpenReason.EARLY)
CompactionMetadata metadata = StatsComponent.load(sstable.descriptor).compactionMetadata();
// If we can't load the CompactionMetadata, we are forced to estimate the keys using the index
// summary. (CASSANDRA-10676)
if (metadata == null)
logger.warn("Reading cardinality from Statistics.db failed for {}", sstable.getFilename());
failed = true;
if (cardinality == null)
cardinality = metadata.cardinalityEstimator;
cardinality = cardinality.merge(metadata.cardinalityEstimator);
catch (IOException e)
logger.warn("Reading cardinality from Statistics.db failed.", e);
failed = true;
catch (CardinalityMergeException e)
logger.warn("Cardinality merge failed.", e);
failed = true;
if (cardinality != null && !failed)
count = cardinality.cardinality();
// if something went wrong above or cardinality is not available, calculate using index summary
if (count < 0)
count = 0;
for (SSTableReader sstable : sstables)
count += sstable.estimatedKeys();
return count;
public static SSTableReader open(SSTable.Owner owner, Descriptor descriptor)
return open(owner, descriptor, null);
public static SSTableReader open(SSTable.Owner owner, Descriptor desc, TableMetadataRef metadata)
return open(owner, desc, null, metadata);
public static SSTableReader open(SSTable.Owner owner, Descriptor descriptor, Set<Component> components, TableMetadataRef metadata)
return open(owner, descriptor, components, metadata, true, false);
// use only for offline or "Standalone" operations
public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, ColumnFamilyStore cfs)
return open(cfs, descriptor, components, cfs.metadata, false, true);
// use only for offline or "Standalone" operations
public static SSTableReader openNoValidation(SSTable.Owner owner, Descriptor descriptor, TableMetadataRef metadata)
return open(owner, descriptor, null, metadata, false, true);
* Open SSTable reader to be used in batch mode(such as sstableloader).
public static SSTableReader openForBatch(SSTable.Owner owner, Descriptor descriptor, Set<Component> components, TableMetadataRef metadata)
return open(owner, descriptor, components, metadata, true, true);
* Open an SSTable for reading
* @param owner owning entity
* @param descriptor SSTable to open
* @param components Components included with this SSTable
* @param metadata for this SSTables CF
* @param validate Check SSTable for corruption (limited)
* @param isOffline Whether we are opening this SSTable "offline", for example from an external tool or not for inclusion in queries (validations)
* This stops regenerating BF + Summaries and also disables tracking of hotness for the SSTable.
* @return {@link SSTableReader}
public static SSTableReader open(Owner owner,
Descriptor descriptor,
Set<Component> components,
TableMetadataRef metadata,
boolean validate,
boolean isOffline)
SSTableReaderLoadingBuilder<?, ?> builder = descriptor.getFormat().getReaderFactory().loadingBuilder(descriptor, metadata, components);
return, validate, !isOffline);
public static Collection<SSTableReader> openAll(SSTable.Owner owner, Set<Map.Entry<Descriptor, Set<Component>>> entries,
final TableMetadataRef metadata)
final Collection<SSTableReader> sstables = newBlockingQueue();
ExecutorPlus executor = executorFactory().pooled("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
Runnable runnable = () -> {
SSTableReader sstable;
sstable = open(owner, entry.getKey(), entry.getValue(), metadata);
catch (CorruptSSTableException ex)
logger.error("Corrupt sstable {}; skipping table", entry, ex);
catch (FSError ex)
logger.error("Cannot read sstable {}; file system error, skipping table", entry, ex);
executor.awaitTermination(7, TimeUnit.DAYS);
catch (InterruptedException e)
throw new UncheckedInterruptedException(e);
return sstables;
protected SSTableReader(Builder<?, ?> builder, Owner owner)
super(builder, owner);
this.sstableMetadata = builder.getStatsMetadata();
this.header = builder.getSerializationHeader();
this.dfile = builder.getDataFile();
this.maxDataAge = builder.getMaxDataAge();
this.openReason = builder.getOpenReason();
this.first = builder.getFirst();
this.last = builder.getLast();
this.bounds = AbstractBounds.strictlyWrapsAround(first.getToken(), last.getToken())
? null // this will cause the validation to fail, but the reader is opened with no validation,
// e.g. for scrubbing, we should accept screwed bounds
: AbstractBounds.bounds(first.getToken(), true, last.getToken(), true);
tidy = new InstanceTidier(descriptor, owner);
selfRef = new Ref<>(this, tidy);
public DecoratedKey getFirst()
return first;
public DecoratedKey getLast()
return last;
public AbstractBounds<Token> getBounds()
return Objects.requireNonNull(bounds, "Bounds were not created because the sstable is out of order");
public DataIntegrityMetadata.ChecksumValidator maybeGetChecksumValidator() throws IOException
if (descriptor.fileFor(Components.CRC).exists())
return new DataIntegrityMetadata.ChecksumValidator(descriptor.fileFor(Components.DATA), descriptor.fileFor(Components.CRC));
return null;
public DataIntegrityMetadata.FileDigestValidator maybeGetDigestValidator() throws IOException
if (descriptor.fileFor(Components.DIGEST).exists())
return new DataIntegrityMetadata.FileDigestValidator(descriptor.fileFor(Components.DATA), descriptor.fileFor(Components.DIGEST));
return null;
public static long getTotalBytes(Iterable<SSTableReader> sstables)
long sum = 0;
for (SSTableReader sstable : sstables)
sum += sstable.onDiskLength();
return sum;
public static long getTotalUncompressedBytes(Iterable<SSTableReader> sstables)
long sum = 0;
for (SSTableReader sstable : sstables)
sum += sstable.uncompressedLength();
return sum;
public boolean equals(Object that)
return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
public int hashCode()
return this.descriptor.hashCode();
public String getFilename()
return dfile.path();
public void setupOnline()
owner().ifPresent(o -> setCrcCheckChance(o.getCrcCheckChance()));
/**
* Executes the provided task while holding the sstable lock, to avoid racing with index summary redistribution
* (see CASSANDRA-15861).
* @param task the task to run while the sstable lock is held; it receives this reader's descriptor
*/
// Applies task to this reader's descriptor inside a synchronized block and returns its result.
public <R, E extends Exception> R runWithLock(CheckedFunction<Descriptor, R, E> task) throws E
// NOTE(review): the lock expression after "synchronized (" and the method/block braces are missing here. Per the
// Javadoc this is the sstable lock that guards against index summary redistribution (CASSANDRA-15861), presumably
// a monitor shared by every reader of the same sstable -- TODO confirm and restore before compiling.
synchronized (
return task.apply(descriptor);
// Records on the instance tidier that this reader has been replaced; asserts it was not already replaced.
public void setReplaced()
// NOTE(review): lock expression and braces are missing after "synchronized (" -- restore before compiling.
synchronized (
assert !tidy.isReplaced;
tidy.isReplaced = true;
// Returns whether setReplaced() has been called; presumably read under the same lock setReplaced() uses.
public boolean isReplaced()
// NOTE(review): lock expression and braces are missing after "synchronized (" -- restore before compiling.
synchronized (
return tidy.isReplaced;
/**
* The runnable passed to this method must not be an anonymous or non-static inner class, since such classes keep an
* implicit reference to their enclosing instance. It can be a lambda or a method reference, provided that it does not
* retain a reference chain to this reader.
*/
// Registers a callback in the instance tidier's runOnClose slot. If a callback is already registered, the slot is
// meant to be replaced by a lambda that runs both.
public void runOnClose(final Runnable runOnClose)
// NOTE(review): the statement guarded by this null check is missing (presumably an early return), so as written
// the check would guard the synchronized block instead.
if (runOnClose == null)
// NOTE(review): lock expression and braces are missing after "synchronized (" -- restore before compiling.
synchronized (
final Runnable existing = tidy.runOnClose;
if (existing == null)
tidy.runOnClose = runOnClose;
// NOTE(review): an "else" and the lambda body have been lost; presumably the lambda runs `existing` and then the
// new `runOnClose` -- TODO confirm the order before restoring.
tidy.runOnClose = () -> {;;
* The method sets fields specific to this {@link SSTableReader} and the parent {@link SSTable} on the provided
* {@link Builder}. The method is intended to be called from the overloaded {@code unbuildTo} method in subclasses.
* @param builder the builder on which the fields should be set
* @param sharedCopy whether the {@link SharedCloseable} resources should be passed as shared copies or directly;
* note that the method will overwrite the fields representing {@link SharedCloseable} only if
* they are not set in the builder yet (the relevant fields in the builder are {@code null}).
* @return the same instance of builder as provided
protected final <B extends Builder<?, B>> B unbuildTo(B builder, boolean sharedCopy)
B b = super.unbuildTo(builder, sharedCopy);
if (builder.getDataFile() == null)
b.setDataFile(sharedCopy ? sharedCopyOrNull(dfile) : dfile);
return b;
public abstract SSTableReader cloneWithRestoredStart(DecoratedKey restoredStart);
public abstract SSTableReader cloneWithNewStart(DecoratedKey newStart);
public RestorableMeter getReadMeter()
return readMeter;
/**
* All the resources which should be released upon closing this sstable reader are registered with the reader's
* {@link InstanceTidier}. This method closes the provided resource explicitly, at any time, and unregisters it there
* so that no attempt is made to release it twice.
* @param closeable a resource to be closed
*/
// Removes closeable from the tidier's registered closeables and closes it; a close failure is rethrown as a
// RuntimeException that includes the resource.
protected void closeInternalComponent(AutoCloseable closeable)
// NOTE(review): lock expression and braces are missing after "synchronized (" -- restore before compiling.
synchronized (
// NOTE(review): `removed` is never used in what remains, and the `try { closeable.close(); }` that the catch
// below belongs to is missing. Presumably `removed` should be checked (failing if the resource was not
// registered) before closing -- TODO confirm.
boolean removed = tidy.closeables.remove(closeable);
catch (Exception ex)
throw new RuntimeException("Failed to close " + closeable, ex);
* This method is expected to close the components which occupy memory but are not needed when we just want to
* stream the components (for example, when SSTable is opened with SSTableLoader). The method should call
* {@link #closeInternalComponent(AutoCloseable)} for each such component. Leaving the implementation empty is
* valid given there are not such resources to release.
public abstract void releaseInMemoryComponents();
* Perform any validation needed for the reader upon creation before returning it from the {@link Builder}.
public void validate()
if (this.first.compareTo(this.last) > 0 || bounds == null)
throw new CorruptSSTableException(new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last)), getFilename());
* Returns the compression metadata for this sstable. Note that the compression metdata is a resource and should not
* be closed by the caller.
* TODO do not return a closeable resource or return a shared copy
* @throws IllegalStateException if the sstable is not compressed
public CompressionMetadata getCompressionMetadata()
if (!compression)
throw new IllegalStateException(this + " is not compressed");
return dfile.compressionMetadata().get();
* Returns the amount of memory in bytes used off heap by the compression meta-data.
* @return the amount of memory in bytes used off heap by the compression meta-data
public long getCompressionMetadataOffHeapSize()
if (!compression)
return 0;
return getCompressionMetadata().offHeapSize();
* Calculates an estimate of the number of keys in the sstable represented by this reader.
public abstract long estimatedKeys();
* Calculates an estimate of the number of keys for the given ranges in the sstable represented by this reader.
public abstract long estimatedKeysForRanges(Collection<Range<Token>> ranges);
* Returns whether methods like {@link #estimatedKeys()} or {@link #estimatedKeysForRanges(Collection)} can return
* sensible estimations.
public abstract boolean isEstimationInformative();
* Returns sample keys for the provided token range.
public abstract Iterable<DecoratedKey> getKeySamples(final Range<Token> range);
* Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
* @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
public List<PartitionPositionBounds> getPositionsForRanges(Collection<Range<Token>> ranges)
// use the index to determine a minimal section for each range
List<PartitionPositionBounds> positions = new ArrayList<>();
for (Range<Token> range : Range.normalize(ranges))
assert !range.isWrapAround() || range.right.isMinimum();
// truncate the range so it at most covers the sstable
AbstractBounds<PartitionPosition> bounds = Range.makeRowRange(range);
PartitionPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
PartitionPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0)
long left = getPosition(leftBound, Operator.GT);
long right = (rightBound.compareTo(last) > 0)
? uncompressedLength()
: getPosition(rightBound, Operator.GT);
if (left == right)
// empty range
assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right);
positions.add(new PartitionPositionBounds(left, right));
return positions;
* Retrieves the position while updating the key cache and the stats.
* @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
* allow key selection by token bounds but only if op != * EQ
* @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
public final long getPosition(PartitionPosition key, Operator op)
return getPosition(key, op, SSTableReadsListener.NOOP_LISTENER);
public final long getPosition(PartitionPosition key, Operator op, SSTableReadsListener listener)
return getPosition(key, op, true, listener);
public final long getPosition(PartitionPosition key,
Operator op,
boolean updateStats)
return getPosition(key, op, updateStats, SSTableReadsListener.NOOP_LISTENER);
* Retrieve a position in data file according to the provided key and operator.
* @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
* allow key selection by token bounds but only if op != * EQ
* @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
* @param updateStats true if updating stats and cache
* @param listener a listener used to handle internal events
* @return The index entry corresponding to the key, or null if the key is not present
protected long getPosition(PartitionPosition key,
Operator op,
boolean updateStats,
SSTableReadsListener listener)
AbstractRowIndexEntry rie = getRowIndexEntry(key, op, updateStats, listener);
return rie != null ? rie.position : -1;
* Retrieve an index entry for the partition found according to the provided key and operator.
* @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
* allow key selection by token bounds but only if op != * EQ
* @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
* @param updateStats true if updating stats and cache
* @param listener a listener used to handle internal events
* @return The index entry corresponding to the key, or null if the key is not present
protected abstract AbstractRowIndexEntry getRowIndexEntry(PartitionPosition key,
Operator op,
boolean updateStats,
SSTableReadsListener listener);
public UnfilteredRowIterator simpleIterator(FileDataInput file, DecoratedKey key, long dataPosition, boolean tombstoneOnly)
return SSTableIdentityIterator.create(this, file, dataPosition, key, tombstoneOnly);
* Returns a {@link KeyReader} over all keys in the sstable.
public abstract KeyReader keyReader() throws IOException;
* Returns a {@link KeyIterator} over all keys in the sstable.
public KeyIterator keyIterator() throws IOException
return new KeyIterator(keyReader(), getPartitioner(), uncompressedLength(), new ReentrantReadWriteLock());
* Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
public abstract DecoratedKey firstKeyBeyond(PartitionPosition token);
* Returns the length in bytes of the (uncompressed) data for this SSTable. For compressed files, this is not
* the same thing as the on disk size (see {@link #onDiskLength()}).
public long uncompressedLength()
return dfile.dataLength();
* The length in bytes of the on disk size for this SSTable. For compressed files, this is not the same thing
* as the data length (see {@link #uncompressedLength()}).
public long onDiskLength()
return dfile.onDiskLength;
public double getCrcCheckChance()
return crcCheckChance;
* Set the value of CRC check chance. The argument supplied is obtained from the property of the owning CFS.
* Called when either the SSTR is initialized, or the CFS's property is updated via JMX
public void setCrcCheckChance(double crcCheckChance)
this.crcCheckChance = crcCheckChance;
dfile.compressionMetadata().ifPresent(metadata -> metadata.parameters.setCrcCheckChance(crcCheckChance));
/**
* Marks the sstable as obsolete, i.e. compacted into newer sstables.
* <p>
* When calling this method, the caller must ensure that the SSTableReader is no longer referenced anywhere except by
* threads that hold a reference to it.
* <p>
* Calling it more than once is a bug; with assertions enabled, a second call fails.
*/
// Records tidier as this sstable's obsoletion action. Asserts that the reader has not been replaced and has not
// already been marked compacted.
public void markObsolete(Runnable tidier)
if (logger.isTraceEnabled())
logger.trace("Marking {} compacted", getFilename());
// NOTE(review): lock expression and braces are missing after "synchronized (" -- restore before compiling.
synchronized (
assert !tidy.isReplaced;
// NOTE(review): the next line is truncated. The slot being null-checked and assigned (where the obsoletion action
// is stored) is missing from both the assert and the assignment, and any statement after the final ';' is lost --
// TODO restore.
assert == null : this + " was already marked compacted"; = tidier;;
// Returns whether an obsoletion action has been recorded, i.e. whether markObsolete() has been called.
public boolean isMarkedCompacted()
// NOTE(review): the null-checked expression is missing; it must read the same slot that markObsolete() assigns.
return != null;
public void markSuspect()
if (logger.isTraceEnabled())
logger.trace("Marking {} as a suspect to be excluded from reads.", getFilename());
public void unmarkSuspect()
public boolean isMarkedSuspect()
return isSuspect.get();
* Direct I/O SSTableScanner over a defined range of tokens.
* @param range the range of keys to cover
* @return A Scanner for seeking over the rows of the SSTable.
public ISSTableScanner getScanner(Range<Token> range)
if (range == null)
return getScanner();
return getScanner(Collections.singletonList(range));
* Direct I/O SSTableScanner over the entirety of the sstable..
* @return A Scanner over the full content of the SSTable.
public abstract ISSTableScanner getScanner();
* Direct I/O SSTableScanner over a defined collection of ranges of tokens.
* @param ranges the range of keys to cover
* @return A Scanner for seeking over the rows of the SSTable.
public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges);
* Direct I/O SSTableScanner over an iterator of bounds.
* @param rangeIterator the keys to cover
* @return A Scanner for seeking over the rows of the SSTable.
public abstract ISSTableScanner getScanner(Iterator<AbstractBounds<PartitionPosition>> rangeIterator);
* Create a {@link FileDataInput} for the data file of the sstable represented by this reader. This method returns
* a newly opened resource which must be closed by the caller.
* @param position the data input will be opened and seek to this position
public FileDataInput getFileDataInput(long position)
return dfile.createReader(position);
* Tests if the sstable contains data newer than the given age param (in localhost currentMillis time).
* This works in conjunction with maxDataAge which is an upper bound on the data in the sstable represented
* by this reader.
* @return {@code true} iff this sstable contains data that's newer than the given timestamp
public boolean newSince(long timestampMillis)
return maxDataAge > timestampMillis;
public void createLinks(String snapshotDirectoryPath)
createLinks(snapshotDirectoryPath, null);
public void createLinks(String snapshotDirectoryPath, RateLimiter rateLimiter)
createLinks(descriptor, components, snapshotDirectoryPath, rateLimiter);
public static void createLinks(Descriptor descriptor, Set<Component> components, String snapshotDirectoryPath)
createLinks(descriptor, components, snapshotDirectoryPath, null);
// Creates a hard link in snapshotDirectoryPath for each component's file of the sstable, optionally throttled by
// limiter.
public static void createLinks(Descriptor descriptor, Set<Component> components, String snapshotDirectoryPath, RateLimiter limiter)
for (Component component : components)
File sourceFile = descriptor.fileFor(component);
// NOTE(review): the statement for this check is missing; presumably `continue`, so absent components are skipped.
if (!sourceFile.exists())
// NOTE(review): the statement for this check is missing; presumably a permit is taken from limiter here
// (e.g. limiter.acquire()) so that linking is rate limited -- TODO confirm.
if (null != limiter)
// NOTE(review): the second constructor argument (the link's file name inside the snapshot directory) is missing.
File targetLink = new File(snapshotDirectoryPath,;
FileUtils.createHardLink(sourceFile, targetLink);
public boolean isRepaired()
return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
* Reads the key stored at the position saved in SASI.
* <p>
* When SASI is created, it uses key locations retrieved from {@link KeyReader#keyPositionForSecondaryIndex()}.
* This method is to read the key stored at such position. It is up to the concrete SSTable format implementation
* what that position means and which file it refers. The only requirement is that it is consistent with what
* {@link KeyReader#keyPositionForSecondaryIndex()} returns.
* @return key if found, {@code null} otherwise
public abstract DecoratedKey keyAtPositionFromSecondaryIndex(long keyPositionFromSecondaryIndex) throws IOException;
public boolean isPendingRepair()
return sstableMetadata.pendingRepair != ActiveRepairService.NO_PENDING_REPAIR;
public TimeUUID getPendingRepair()
return sstableMetadata.pendingRepair;
public long getRepairedAt()
return sstableMetadata.repairedAt;
public boolean isTransient()
return sstableMetadata.isTransient;
public boolean intersects(Collection<Range<Token>> ranges)
Bounds<Token> range = new Bounds<>(first.getToken(), last.getToken());
return Iterables.any(ranges, r -> r.intersects(range));
* TODO: Move someplace reusable
public abstract static class Operator
public static final Operator EQ = new Equals();
public static final Operator GE = new GreaterThanOrEqualTo();
public static final Operator GT = new GreaterThan();
* @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
* @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
public abstract int apply(int comparison);
final static class Equals extends Operator
public int apply(int comparison)
return -comparison;
final static class GreaterThanOrEqualTo extends Operator
public int apply(int comparison)
return comparison >= 0 ? 0 : 1;
final static class GreaterThan extends Operator
public int apply(int comparison)
return comparison > 0 ? 0 : 1;
public EstimatedHistogram getEstimatedPartitionSize()
return sstableMetadata.estimatedPartitionSize;
public EstimatedHistogram getEstimatedCellPerPartitionCount()
return sstableMetadata.estimatedCellPerPartitionCount;
public double getEstimatedDroppableTombstoneRatio(int gcBefore)
return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
public double getDroppableTombstonesBefore(int gcBefore)
return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
public double getCompressionRatio()
return sstableMetadata.compressionRatio;
public long getMinTimestamp()
return sstableMetadata.minTimestamp;
public long getMaxTimestamp()
return sstableMetadata.maxTimestamp;
public int getMinLocalDeletionTime()
return sstableMetadata.minLocalDeletionTime;
public int getMaxLocalDeletionTime()
return sstableMetadata.maxLocalDeletionTime;
* Whether the sstable may contain tombstones or if it is guaranteed to not contain any.
* <p>
* Note that having that method return {@code false} guarantees the sstable has no tombstones whatsoever (so no
* cell tombstone, no range tombstone maker and no expiring columns), but having it return {@code true} doesn't
* guarantee it contains any as it may simply have non-expired cells.
public boolean mayHaveTombstones()
// A sstable is guaranteed to have no tombstones if minLocalDeletionTime is still set to its default,
// Cell.NO_DELETION_TIME, which is bigger than any valid deletion times.
return getMinLocalDeletionTime() != Cell.NO_DELETION_TIME;
public int getMinTTL()
return sstableMetadata.minTTL;
public int getMaxTTL()
return sstableMetadata.maxTTL;
public long getTotalColumnsSet()
return sstableMetadata.totalColumnsSet;
public long getTotalRows()
return sstableMetadata.totalRows;
public int getAvgColumnSetPerRow()
return sstableMetadata.totalRows < 0
? -1
: (sstableMetadata.totalRows == 0 ? 0 : (int) (sstableMetadata.totalColumnsSet / sstableMetadata.totalRows));
public int getSSTableLevel()
return sstableMetadata.sstableLevel;
// NOTE(review): this method is damaged in this copy. The javadoc delimiters are missing, and the
// `synchronized (` line has lost its lock expression. Also, no metadata reload is visible after the
// mutation, although the description promises one. Restore from upstream.
* Mutate sstable level with a lock to avoid racing with entire-sstable-streaming and then reload sstable metadata
public void mutateLevelAndReload(int newLevel) throws IOException
synchronized (
// Writes the new level through the descriptor's metadata serializer.
descriptor.getMetadataSerializer().mutateLevel(descriptor, newLevel);
// NOTE(review): this method is damaged in this copy. The javadoc delimiters are missing, and the
// `synchronized (` line has lost its lock expression. Also, no metadata reload is visible after the
// mutation, although the description promises one. Restore from upstream.
* Mutate sstable repair metadata with a lock to avoid racing with entire-sstable-streaming and then reload sstable metadata
public void mutateRepairedAndReload(long newRepairedAt, TimeUUID newPendingRepair, boolean isTransient) throws IOException
synchronized (
// Writes repairedAt / pendingRepair / isTransient through the descriptor's metadata serializer.
descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, newRepairedAt, newPendingRepair, isTransient);
* Reloads the sstable metadata from disk.
* <p>
* Called after level is changed on sstable, for example if the sstable is dropped to L0
* <p>
* Might be possible to remove in future versions
* @throws IOException
public void reloadSSTableMetadata() throws IOException
this.sstableMetadata = StatsComponent.load(descriptor).statsMetadata();
public StatsMetadata getSSTableMetadata()
return sstableMetadata;
public RandomAccessReader openDataReader(RateLimiter limiter)
assert limiter != null;
return dfile.createReader(limiter);
public RandomAccessReader openDataReader()
return dfile.createReader();
public void trySkipFileCacheBefore(DecoratedKey key)
long position = getPosition(key, SSTableReader.Operator.GE);
NativeLibrary.trySkipCache(descriptor.fileFor(Components.DATA).absolutePath(), 0, position < 0 ? 0 : position);
// NOTE(review): the body of this method is missing in this copy. Presumably it returns the channel of the
// data file handle (dfile). Restore from upstream.
public ChannelProxy getDataChannel()
* @return last modified time for data component. 0 if given component does not exist or IO error occurs.
public long getDataCreationTime()
return descriptor.fileFor(Components.DATA).lastModified();
// NOTE(review): the statement guarded by `if (readMeter != null)` is missing in this copy (presumably
// marking the meter), and the javadoc delimiters were lost. Restore from upstream.
* Increment the total read count and read rate for this SSTable. This should not be incremented for non-query reads,
* like compaction.
public void incrementReadCount()
// readMeter may be null (presumably when read activity is not tracked), hence the guard.
if (readMeter != null)
public EncodingStats stats()
// We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
// SerializationHeader.make() for details) so we use the latter instead.
return sstableMetadata.encodingStats;
public Ref<SSTableReader> tryRef()
return selfRef.tryRef();
public Ref<SSTableReader> selfRef()
return selfRef;
public Ref<SSTableReader> ref()
return selfRef.ref();
protected List<AutoCloseable> setupInstance(boolean trackHotness)
return Collections.singletonList(dfile);
// Completes initialisation of this reader. It hands the tidier the resources returned by setupInstance();
// the tidier closes those when the reader is tidied. The assert checks that the tidier has no closeables
// yet, i.e. setup has not already run.
public void setup(boolean trackHotness)
assert tidy.closeables == null;
// Hotness tracking is only honoured when activity tracking is globally enabled.
trackHotness &= TRACK_ACTIVITY;
tidy.setup(this, trackHotness, setupInstance(trackHotness));
// NOTE(review): the right-hand side of this assignment is missing in this copy (presumably the shared
// read meter held by the tidier's GlobalTidy). Restore from upstream.
this.readMeter =;
// Replaces the read meter used by this reader. The shared meter in GlobalTidy is documented as
// overridable for testing purposes.
// NOTE(review): the assignment below is garbled in this copy (`= =`). It presumably also updated the shared
// meter of the GlobalTidy before assigning this reader's field. Restore from upstream.
public void overrideReadMeter(RestorableMeter readMeter)
this.readMeter = = readMeter;
// Adds to `identities` every resource held by this reader's tidier that is a SharedCloseable (presumably
// for Ref's reachability/leak accounting). Other closeables are ignored.
// NOTE(review): the end of the lambda (`});`) is not visible in this copy. Confirm against upstream that
// nothing else (e.g. adding the reader itself) was lost.
public void addTo(Ref.IdentityCollection identities)
tidy.closeables.forEach(c -> {
if (c instanceof SharedCloseable)
((SharedCloseable) c).addTo(identities);
* The method verifies whether the sstable may contain the provided key. The method does approximation using
* Bloom filter if it is present and if it is not, performs accurate check in the index.
public abstract boolean mayContainAssumingKeyIsInRange(DecoratedKey key);
* One instance per SSTableReader we create.
* <p>
* We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example),
* but there can only be one GlobalTidy for one single logical sstable.
* <p>
* When the InstanceTidier cleans up, it releases its reference to its GlobalTidy; when all InstanceTidiers
* for that logical sstable have run, the GlobalTidy cleans up.
// Per-reader cleanup handle. setup() wires it to the shared GlobalTidy and records the resources to close;
// tidy() does no work unless setup() has completed.
// NOTE(review): several statements of this class were lost in this copy (flagged below). Restore from
// upstream before building.
protected static final class InstanceTidier implements Tidy
// Identifies the sstable in log messages, toString() and name().
private final Descriptor descriptor;
// Held weakly so the tidier does not keep its owner reachable; tidy() only takes a read barrier while the owner is alive.
private final WeakReference<Owner> owner;
// Resources closed asynchronously by tidy(); null entries are skipped there.
private List<? extends AutoCloseable> closeables;
// Optional hook run before resources are closed; its failures are logged and reported with the other cleanup errors.
private Runnable runOnClose;
// NOTE(review): not referenced anywhere in this visible chunk.
private boolean isReplaced = false;
// a reference to our shared tidy instance, that
// we will release when we are ourselves released
private Ref<GlobalTidy> globalRef;
// Presumably the GlobalTidy obtained from globalRef (its assignment in setup() is garbled in this copy).
private GlobalTidy global;
// Written last in setup() and checked first by tidy(); volatile so the final write is published.
private volatile boolean setup;
// Obtains a new reference to the shared GlobalTidy for `reader` and takes a private copy of `closeables`.
// NOTE(review): the `global` assignment on the second statement has lost its left-hand side. Also, the
// statement originally guarded by `if (trackHotness)` is not visible in this copy.
public void setup(SSTableReader reader, boolean trackHotness, Collection<? extends AutoCloseable> closeables)
// get a new reference to the shared descriptor-type tidy
this.globalRef = GlobalTidy.get(reader); = globalRef.get();
if (trackHotness)
this.closeables = new ArrayList<>(closeables);
// to avoid tidy seeing partial state, set setup=true at the end
this.setup = true;
// Only records identity; resources are attached later by setup().
private InstanceTidier(Descriptor descriptor, Owner owner)
this.descriptor = descriptor;
this.owner = new WeakReference<>(owner);
// Releases this instance's resources. If the owner is still reachable, a new read-ordering barrier is
// taken first. The cleanup itself is submitted to ScheduledExecutors.nonPeriodicTasks, so it runs
// asynchronously.
public void tidy()
if (logger.isTraceEnabled())
logger.trace("Running instance tidier for {} with setup {}", descriptor, setup);
// don't try to cleanup if the sstablereader was never fully constructed
// NOTE(review): the statement guarded by `if (!setup)` is not visible in this copy (presumably `return;`).
if (!setup)
final OpOrder.Barrier barrier;
Owner owner = this.owner.get();
if (owner != null)
barrier = owner.newReadOrderingBarrier();
// NOTE(review): an `else` appears to be missing before this assignment in this copy.
barrier = null;
ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
// Background cleanup. Judging by its trace/error messages, it waits on the barrier (if any), runs the
// on-close hook, closes all resources and releases the global ref, merging every failure into `exceptions`.
public void run()
if (logger.isTraceEnabled())
logger.trace("Async instance tidier for {}, before barrier", descriptor);
// NOTE(review): the statement guarded by `if (barrier != null)` is missing in this copy (presumably awaiting the barrier).
if (barrier != null)
if (logger.isTraceEnabled())
logger.trace("Async instance tidier for {}, after barrier", descriptor);
Throwable exceptions = null;
// NOTE(review): the body of this `try` (presumably running runOnClose) is missing in this copy.
if (runOnClose != null) try
catch (RuntimeException | Error ex)
logger.error("Failed to run on-close listeners for sstable " + descriptor.baseFile(), ex);
exceptions = ex;
// Close every non-null resource; failures come back as a Throwable instead of being thrown.
Throwable closeExceptions = Throwables.close(null, Iterables.filter(closeables, Objects::nonNull));
if (closeExceptions != null)
logger.error("Failed to close some sstable components of " + descriptor.baseFile(), closeExceptions);
exceptions = Throwables.merge(exceptions, closeExceptions);
// NOTE(review): the `try` this `catch` belongs to (presumably releasing globalRef) is missing in this copy.
catch (RuntimeException | Error ex)
logger.error("Failed to release the global ref of " + descriptor.baseFile(), ex);
exceptions = Throwables.merge(exceptions, ex);
// NOTE(review): the handling of the accumulated `exceptions` under this check is missing in this copy.
if (exceptions != null)
if (logger.isTraceEnabled())
logger.trace("Async instance tidier for {}, completed", descriptor);
// NOTE(review): the last operand of this concatenation is missing in this copy (presumably the descriptor's id).
public String toString()
return "Tidy " + descriptor.ksname + '.' + descriptor.cfname + '-' +;
public String name()
return descriptor.toString();
* One instance per logical sstable. This both tracks shared cleanup and some shared state related
* to the sstable's lifecycle.
* <p>
* All InstanceTidiers, on setup(), ask the static get() method for their shared state,
* and stash a reference to it to be released when they are. Once all such references are
* released, this shared tidy will be performed.
// Shared per-sstable cleanup and read-meter state. Instances are handed out by get() and registered in `lookup`.
// NOTE(review): several statements of this class were lost in this copy (flagged below). Restore from
// upstream before building.
static final class GlobalTidy implements Tidy
// Sentinel used instead of a null WeakReference when no read-meter sync is scheduled.
static final WeakReference<ScheduledFuture<?>> NULL = new WeakReference<>(null);
// keyed by descriptor, mapping to the shared GlobalTidy for that descriptor
static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>();
private final Descriptor desc;
// the readMeter that is shared between all instances of the sstable, and can be overridden in all of them
// at once also, for testing purposes
private RestorableMeter readMeter;
// the scheduled persistence of the readMeter, that we will cancel once all instances of this logical
// sstable have been released
private WeakReference<ScheduledFuture<?>> readMeterSyncFuture = NULL;
// shared state managing if the logical sstable has been compacted; this is used in cleanup
private volatile Runnable obsoletion;
GlobalTidy(final SSTableReader reader)
this.desc = reader.descriptor;
// Lazily creates the shared read meter and schedules its periodic persistence on syncExecutor. It
// leaves the meter null when read activity is not tracked (see the conditions below).
// NOTE(review): the statement guarded by `if (readMeter != null)` (presumably an early return), the
// if/else structure, and the last argument of getSSTableReadMeter(...) are missing in this copy.
void ensureReadMeter()
if (readMeter != null)
// Don't track read rates for tables in local system keyspaces, and don't load or persist the read meter
// when running in client or tools mode: syncExecutor isn't available in those modes.
if (!TRACK_ACTIVITY || SchemaConstants.isLocalSystemKeyspace(desc.ksname) || DatabaseDescriptor.isClientOrToolInitialized())
readMeter = null;
readMeterSyncFuture = NULL;
readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname,;
// sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
readMeterSyncFuture = new WeakReference<>(syncExecutor.scheduleAtFixedRate(this::maybePersistSSTableReadMeter, 1, 5, TimeUnit.MINUTES));
// Persists the read meter via SystemKeyspace unless the sstable has been marked obsolete (obsoletion set)
// or read-rate persistence is disabled in the configuration.
// NOTE(review): one argument of persistSSTableReadMeter(...) is missing in this copy (note the `,,`).
void maybePersistSSTableReadMeter()
if (obsoletion == null && DatabaseDescriptor.getSStableReadRatePersistenceEnabled())
SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname,, readMeter);
// Stops the scheduled read-meter sync, if one is still referenced, and resets the future to the NULL sentinel.
// NOTE(review): the statement guarded by the null check (presumably cancelling the future) is missing in this copy.
private void stopReadMeterPersistence()
ScheduledFuture<?> readMeterSyncFutureLocal = readMeterSyncFuture.get();
if (readMeterSyncFutureLocal != null)
readMeterSyncFuture = NULL;
// Shared cleanup, performed once every InstanceTidier reference has been released.
// NOTE(review): BUG in this copy: `if (obsoletion != null);` ends with an empty statement, so the
// obsoletion runnable is never invoked here. The call (presumably running `obsoletion`) appears to have
// been lost, as does any removal from `lookup` or stopping of read-meter persistence. Restore from upstream.
public void tidy()
if (obsoletion != null);
// don't ideally want to dropPageCache for the file until all instances have been released
for (Component c : desc.discoverComponents())
NativeLibrary.trySkipCache(desc.fileFor(c).absolutePath(), 0, 0);
public String name()
return desc.toString();
// get a new reference to the shared GlobalTidy for this sstable
// Loops until it can return a reference:
// - If no entry exists, it publishes a new GlobalTidy with putIfAbsent.
// - Otherwise it tries to take a new reference on the existing entry.
// - If tryRef fails, the entry raced with tidy, so the stale entry is removed and the lookup is retried.
// NOTE(review): when putIfAbsent loses the race, the freshly created Ref is dropped without being
// released. Confirm that Ref leak detection tolerates this.
public static Ref<GlobalTidy> get(SSTableReader sstable)
Descriptor descriptor = sstable.descriptor;
while (true)
Ref<GlobalTidy> ref = lookup.get(descriptor);
if (ref == null)
final GlobalTidy tidy = new GlobalTidy(sstable);
ref = new Ref<>(tidy, tidy);
Ref<GlobalTidy> ex = lookup.putIfAbsent(descriptor, ref);
if (ex == null)
return ref;
ref = ex;
Ref<GlobalTidy> newRef = ref.tryRef();
if (newRef != null)
return newRef;
// raced with tidy
lookup.remove(descriptor, ref);
// Presumably a testing hook that discards all shared GlobalTidy state (GlobalTidy.lookup).
// NOTE(review): the method body is missing in this copy. Restore from upstream.
public static void resetTidying()
public static class PartitionPositionBounds
public final long lowerPosition;
public final long upperPosition;
public PartitionPositionBounds(long lower, long upper)
this.lowerPosition = lower;
this.upperPosition = upper;
public final int hashCode()
int hashCode = (int) lowerPosition ^ (int) (lowerPosition >>> 32);
return 31 * (hashCode ^ (int) ((int) upperPosition ^ (upperPosition >>> 32)));
public final boolean equals(Object o)
if (!(o instanceof PartitionPositionBounds))
return false;
PartitionPositionBounds that = (PartitionPositionBounds) o;
return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition;
public static class IndexesBounds
public final int lowerPosition;
public final int upperPosition;
public IndexesBounds(int lower, int upper)
this.lowerPosition = lower;
this.upperPosition = upper;
public final int hashCode()
return 31 * lowerPosition * upperPosition;
public final boolean equals(Object o)
if (!(o instanceof IndexesBounds))
return false;
IndexesBounds that = (IndexesBounds) o;
return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition;
// Steps performed by this method:
// 1. Validates the move:
//    - the old descriptor must be compatible;
//    - neither descriptor may belong to a live sstable of `cfs`;
//    - no data file may already exist at the new location.
// 2. Places the files:
//    - with `copyData`, hardlinks them, falling back to copying on FSWriteError;
//    - otherwise, renames them.
// 3. Opens the reader at `newDescriptor`, wrapping any failure in a RuntimeException.
// NOTE(review): this method is damaged in this copy:
//    - the format arguments of the first exception are truncated;
//    - the two logging calls starting with `{"` have lost their `logger.` prefix;
//    - the `try` / `else` lines matching the catches and branches are not visible.
// Restore from upstream.
* Moves the sstable in oldDescriptor to a new place (with generation etc) in newDescriptor.
* <p>
* All components given will be moved/renamed
public static SSTableReader moveAndOpenSSTable(ColumnFamilyStore cfs, Descriptor oldDescriptor, Descriptor newDescriptor, Set<Component> components, boolean copyData)
if (!oldDescriptor.isCompatible())
throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
// Refuse to touch files that back an sstable currently live in this table, under either descriptor.
boolean isLive = cfs.getLiveSSTables().stream().anyMatch(r -> r.descriptor.equals(newDescriptor)
|| r.descriptor.equals(oldDescriptor));
if (isLive)
String message = String.format("Can't move and open a file that is already in use in the table %s -> %s", oldDescriptor, newDescriptor);
throw new RuntimeException(message);
// Never overwrite an existing data file at the destination.
if (newDescriptor.fileFor(Components.DATA).exists())
String msg = String.format("File %s already exists, can't move the file there", newDescriptor.fileFor(Components.DATA));
throw new RuntimeException(msg);
if (copyData)
{"Hardlinking new SSTable {} to {}", oldDescriptor, newDescriptor);
hardlink(oldDescriptor, newDescriptor, components);
// Hardlinking can fail (FSWriteError); fall back to a full copy in that case.
catch (FSWriteError ex)
logger.warn("Unable to hardlink new SSTable {} to {}, falling back to copying", oldDescriptor, newDescriptor, ex);
copy(oldDescriptor, newDescriptor, components);
{"Moving new SSTable {} to {}", oldDescriptor, newDescriptor);
rename(oldDescriptor, newDescriptor, components);
SSTableReader reader;
reader = open(cfs, newDescriptor, components, cfs.metadata);
// Any failure to open is reported as corruption; the original Throwable is kept as the cause.
catch (Throwable t)
logger.error("Aborting import of sstables. {} was corrupt", newDescriptor);
throw new RuntimeException(newDescriptor + " is corrupt, can't import", t);
return reader;
public static void shutdownBlocking(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
ExecutorUtils.shutdownNowAndWait(timeout, unit, syncExecutor);
* @return the physical size on disk of all components for this SSTable in bytes
public long bytesOnDisk()
return bytesOnDisk(false);
* @return the total logical/uncompressed size in bytes of all components for this SSTable
public long logicalBytesOnDisk()
return bytesOnDisk(true);
private long bytesOnDisk(boolean logical)
long bytes = 0;
for (Component component : components)
// Only the data file is compressable.
bytes += logical && component == Components.DATA && compression
? getCompressionMetadata().dataLength
: descriptor.fileFor(component).length();
return bytes;
// NOTE(review): the method body is missing in this copy. Presumably it delegates to the shared GlobalTidy's
// maybePersistSSTableReadMeter(), which writes the read meter through SystemKeyspace. Confirm against
// upstream.
public void maybePersistSSTableReadMeter()
* Returns a new verifier for this sstable. Note that the reader must match the provided cfs.
public abstract IVerifier getVerifier(ColumnFamilyStore cfs,
OutputHandler outputHandler,
boolean isOffline,
IVerifier.Options options);
* A method to be called by {@link #getPosition(PartitionPosition, Operator, boolean, SSTableReadsListener)}
* and {@link #getRowIndexEntry(PartitionPosition, Operator, boolean, SSTableReadsListener)} methods when
* a searched key is found. It adds a trace message and notify the provided listener.
protected void notifySelected(SSTableReadsListener.SelectionReason reason, SSTableReadsListener localListener, Operator op, boolean updateStats, AbstractRowIndexEntry entry)
reason.trace(descriptor, entry);
if (localListener != null)
localListener.onSSTableSelected(this, reason);
* A method to be called by {@link #getPosition(PartitionPosition, Operator, boolean, SSTableReadsListener)}
* and {@link #getRowIndexEntry(PartitionPosition, Operator, boolean, SSTableReadsListener)} methods when
* a searched key is not found. It adds a trace message and notify the provided listener.
protected void notifySkipped(SSTableReadsListener.SkippingReason reason, SSTableReadsListener localListener, Operator op, boolean updateStats)
if (localListener != null)
localListener.onSSTableSkipped(this, reason);
* A builder of this sstable reader. It should be extended for each implementation of {@link SSTableReader} with
* the implementation specific fields.
* @param <R> type of the reader the builder creates
* @param <B> type of this builder
// Fluent builder base. Each setter stores a value and returns `this` cast to B, the concrete builder type
// bound by `B extends Builder<R, B>`, so chained calls keep the subclass type.
public abstract static class Builder<R extends SSTableReader, B extends Builder<R, B>> extends SSTable.Builder<R, B>
private long maxDataAge;
private StatsMetadata statsMetadata;
private OpenReason openReason;
private SerializationHeader serializationHeader;
private FileHandle dataFile;
private DecoratedKey first;
private DecoratedKey last;
private boolean suspected;
// NOTE(review): the constructor body is not visible in this copy (presumably it passes `descriptor` to the
// superclass constructor).
public Builder(Descriptor descriptor)
// Rejects negative ages via Preconditions.checkArgument.
public B setMaxDataAge(long maxDataAge)
Preconditions.checkArgument(maxDataAge >= 0);
this.maxDataAge = maxDataAge;
return (B) this;
public B setStatsMetadata(StatsMetadata statsMetadata)
this.statsMetadata = statsMetadata;
return (B) this;
public B setOpenReason(OpenReason openReason)
this.openReason = openReason;
return (B) this;
public B setSerializationHeader(SerializationHeader serializationHeader)
this.serializationHeader = serializationHeader;
return (B) this;
public B setDataFile(FileHandle dataFile)
this.dataFile = dataFile;
return (B) this;
// Stores the key's retainable() form; null is accepted and stored as null.
public B setFirst(DecoratedKey first)
this.first = first != null ? first.retainable() : null;
return (B) this;
// Same as setFirst, for the last key.
public B setLast(DecoratedKey last)
this.last = last != null ? last.retainable() : null;
return (B) this;
public B setSuspected(boolean suspected)
this.suspected = suspected;
return (B) this;
public long getMaxDataAge()
return maxDataAge;
public StatsMetadata getStatsMetadata()
return statsMetadata;
public OpenReason getOpenReason()
return openReason;
public SerializationHeader getSerializationHeader()
return serializationHeader;
public FileHandle getDataFile()
return dataFile;
public DecoratedKey getFirst()
return first;
public DecoratedKey getLast()
return last;
public boolean isSuspected()
return suspected;
// Creates the concrete reader from the collected state; implemented by each sstable format's builder.
protected abstract R buildInternal(Owner owner);
// Builds the reader via buildInternal(owner). RuntimeExceptions and Errors raised while post-processing
// the new reader are caught and rethrown.
// NOTE(review): this method is damaged in this copy. The following are not visible:
//   - the statements guarded by `if (isSuspected())` and `if (validate)`;
//   - the `try` matching the `catch`;
//   - any cleanup before the rethrow;
//   - any use of `online`.
// Restore from upstream.
public R build(Owner owner, boolean validate, boolean online)
R reader = buildInternal(owner);
if (isSuspected())
if (validate)
catch (RuntimeException | Error ex)
throw ex;
return reader;