blob: fd34c024482eb119babf010efccd89378634dd46 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable.format;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredSource;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.IVerifier;
import org.apache.cassandra.io.sstable.KeyIterator;
import org.apache.cassandra.io.sstable.KeyReader;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableIdFactory;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReadsListener;
import org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.CheckedFunction;
import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.SelfRefCounted;
import org.apache.cassandra.utils.concurrent.SharedCloseable;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;
import static org.apache.cassandra.utils.concurrent.SharedCloseable.sharedCopyOrNull;
/**
* An SSTableReader can be constructed in a number of places, but typically is either read from disk at startup, or
* constructed from a flushed memtable, or after compaction to replace some existing sstables. However once created,
* an sstablereader may also be modified.
* <p>
* A reader's {@link OpenReason} describes its current stage in its lifecycle. Note that in parallel to this, there are
* two different Descriptor types; TMPLINK and FINAL; the latter corresponds to {@link OpenReason#NORMAL} state readers
* and all readers that replace a {@link OpenReason#NORMAL} one. TMPLINK is used for {@link OpenReason#EARLY} state
* readers and no others.
* <p>
* When a reader is being compacted, if the result is large its replacement may be opened as {@link OpenReason#EARLY}
* before compaction completes in order to present the result to consumers earlier. In this case the reader will itself
* be changed to a {@link OpenReason#MOVED_START} state, where its start no longer represents its on-disk minimum key.
* This is to permit reads to be directed to only one reader when the two represent the same data.
* The {@link OpenReason#EARLY} file can represent a compaction result that is either partially complete and still
* in-progress, or a complete and immutable sstable that is part of a larger macro compaction action that has not yet
* fully completed.
* <p>
 * Currently ALL compaction results at least briefly go through an {@link OpenReason#EARLY} open state prior to completion,
 * regardless of whether early opening is enabled.
* <p>
* Since a reader can be created multiple times over the same shared underlying resources, and the exact resources it
* shares between each instance differ subtly, we track the lifetime of any underlying resource with its own reference
* count, which each instance takes a {@link Ref} to. Each instance then tracks references to itself, and once these
* all expire it releases all its {@link Ref} to these underlying resources.
* <p>
* There is some shared cleanup behaviour needed only once all readers in a certain stage of their lifecycle
* (i.e. {@link OpenReason#EARLY} or {@link OpenReason#NORMAL} opening), and some that must only occur once all readers
* of any kind over a single logical sstable have expired. These are managed by the {@link InstanceTidier} and
* {@link GlobalTidy} classes at the bottom, and are effectively managed as another resource each instance tracks its
* own {@link Ref} instance to, to ensure all of these resources are cleaned up safely and can be debugged otherwise.
* <p>
* TODO: fill in details about Tracker and lifecycle interactions for tools, and for compaction strategies
*/
public abstract class SSTableReader extends SSTable implements UnfilteredSource, SelfRefCounted<SSTableReader>
{
private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
// Read-hotness (activity) tracking is ON by default and turned off via the DISABLE_* system
// property. The property is a *disable* flag, so it must be negated here: without the '!' the
// flag's meaning is inverted and tracking would run exactly when the operator disabled it.
private static final boolean TRACK_ACTIVITY = !CassandraRelevantProperties.DISABLE_SSTABLE_ACTIVITY_TRACKING.getBoolean();
// Executor persisting read meters; null in client/tool mode (see initSyncExecutor()).
private static final ScheduledExecutorPlus syncExecutor = initSyncExecutor();
/**
 * Creates the scheduled executor used to persist per-sstable read meters, or returns
 * {@code null} when running inside a client tool (no background threads allowed there).
 */
private static ScheduledExecutorPlus initSyncExecutor()
{
    // Do NOT start this thread pool in client mode
    if (DatabaseDescriptor.isClientOrToolInitialized())
        return null;
    ScheduledExecutorPlus executor = executorFactory().scheduled("read-hotness-tracker");
    // Immediately remove readMeter sync task when cancelled.
    // TODO: should we set this by default on all scheduled executors?
    if (executor instanceof ScheduledThreadPoolExecutor)
        ((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true);
    return executor;
}
// Throttles read-meter persistence across all sstables (permits per second).
private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
public static final Comparator<SSTableReader> maxTimestampAscending = Comparator.comparingLong(SSTableReader::getMaxTimestamp);
public static final Comparator<SSTableReader> maxTimestampDescending = maxTimestampAscending.reversed();
// it's just an object, which we use regular Object equality on; we introduce a special class just for easy recognition
public static final class UniqueIdentifier
{
}
// Identity token distinguishing reader instances that may share the same descriptor/resources.
public final UniqueIdentifier instanceId = new UniqueIdentifier();
// Orders readers by their first (minimum) partition key.
public static final Comparator<SSTableReader> sstableComparator = Comparator.comparing(o -> o.first);
public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
// Orders readers by sstable id (generation).
public static final Comparator<SSTableReader> idComparator = Comparator.comparing(t -> t.descriptor.id, SSTableIdFactory.COMPARATOR);
public static final Comparator<SSTableReader> idReverseComparator = idComparator.reversed();
// Orders readers by on-disk (possibly compressed) length.
public static final Comparator<SSTableReader> sizeComparator = (o1, o2) -> Longs.compare(o1.onDiskLength(), o2.onDiskLength());
/**
 * maxDataAge is a timestamp in local server time (e.g. Global.currentTimeMillis) which represents an upper bound
 * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
 * later than maxDataAge.
 * <p>
 * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
 * <p>
 * When a new sstable is flushed, maxDataAge is set to the time of creation.
 * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
 * <p>
 * The age is in milliseconds since epoch and is local to this host.
 */
public final long maxDataAge;
/**
 * Describes the stage of this reader in its lifecycle; see the class javadoc for the transitions
 * between stages and how they interact with early-open compaction.
 */
public enum OpenReason
{
    /**
     * <ul>
     * <li>From {@code None} - Reader has been read from disk, either at startup or from a flushed memtable</li>
     * <li>From {@link #EARLY} - Reader is the final result of a compaction</li>
     * <li>From {@link #MOVED_START} - Reader WAS being compacted, but this failed and it has been restored
     * to {@code NORMAL} status</li>
     * </ul>
     */
    NORMAL,
    /**
     * <ul>
     * <li>From {@code None} - Reader is a compaction replacement that is either incomplete and has been opened
     * to represent its partial result status, or has been finished but the compaction it is a part of has not
     * yet completed fully</li>
     * <li>From {@link #EARLY} - Same as from {@code None}, only it is not the first time it has been
     * opened early</li>
     * </ul>
     */
    EARLY,
    /**
     * From:
     * <ul>
     * <li>From {@link #NORMAL} - Reader has seen low traffic and the amount of memory available for index summaries
     * is constrained, so its index summary has been downsampled</li>
     * <li>From {@link #METADATA_CHANGE} - Same</li>
     * </ul>
     */
    METADATA_CHANGE,
    /**
     * <ul>
     * <li>From {@link #NORMAL} - Reader is being compacted. This compaction has not finished, but the compaction
     * result is either partially or fully opened, to either partially or fully replace this reader. This reader's
     * start key has been updated to represent this, so that reads only hit one or the other reader.</li>
     * </ul>
     */
    MOVED_START
}
public final OpenReason openReason;
// Handle to this sstable's data file (a shared, reference-counted resource).
protected final FileHandle dfile;
// technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
// but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
// NOTE(review): the comment above mentions "isCompacted" but the field below is the suspect flag — confirm the intent.
public final AtomicBoolean isSuspect = new AtomicBoolean(false);
// not final since we need to be able to change level on a file.
protected volatile StatsMetadata sstableMetadata;
public final SerializationHeader header;
// Per-instance cleanup; releases this instance's refs to shared resources when the last self ref expires.
private final InstanceTidier tidy;
private final Ref<SSTableReader> selfRef;
// Read-rate meter for hotness tracking; set elsewhere — may be null for offline readers (TODO confirm).
private RestorableMeter readMeter;
// Probability of verifying checksums on read; propagated to compression metadata in setCrcCheckChance().
private volatile double crcCheckChance;
// First/last partition keys covered by this reader (first may exceed the on-disk minimum for MOVED_START).
public final DecoratedKey first;
public final DecoratedKey last;
// Token bounds of [first, last], or null when the keys are out of order (see the constructor and validate()).
public final AbstractBounds<Token> bounds;
/**
 * Calculate approximate key count.
 * If cardinality estimators are available on all given sstables they are merged and used for the
 * estimate; otherwise (estimator missing or unreadable) the index summaries are used instead.
 *
 * @param sstables SSTables to calculate key count
 * @return estimated key count, or -1 when no sstables were supplied
 */
public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)
{
    if (Iterables.isEmpty(sstables))
        return -1;
    ICardinality merged = null;
    boolean estimatorUsable = true;
    for (SSTableReader reader : sstables)
    {
        // EARLY readers share data with their source sstables; counting them would double-count keys.
        if (reader.openReason == OpenReason.EARLY)
            continue;
        try
        {
            CompactionMetadata compactionMetadata = StatsComponent.load(reader.descriptor).compactionMetadata();
            // If we can't load the CompactionMetadata, we are forced to estimate the keys using the index
            // summary. (CASSANDRA-10676)
            if (compactionMetadata == null)
            {
                logger.warn("Reading cardinality from Statistics.db failed for {}", reader.getFilename());
                estimatorUsable = false;
                break;
            }
            merged = merged == null
                     ? compactionMetadata.cardinalityEstimator
                     : merged.merge(compactionMetadata.cardinalityEstimator);
        }
        catch (IOException e)
        {
            logger.warn("Reading cardinality from Statistics.db failed.", e);
            estimatorUsable = false;
            break;
        }
        catch (CardinalityMergeException e)
        {
            logger.warn("Cardinality merge failed.", e);
            estimatorUsable = false;
            break;
        }
    }
    long count = (estimatorUsable && merged != null) ? merged.cardinality() : -1;
    // if something went wrong above or cardinality is not available, calculate using index summary
    if (count < 0)
    {
        count = 0;
        for (SSTableReader reader : sstables)
            count += reader.estimatedKeys();
    }
    return count;
}
/** Opens an sstable with metadata and components resolved by the loading builder. */
public static SSTableReader open(SSTable.Owner owner, Descriptor descriptor)
{
    // Equivalent to open(owner, descriptor, null): neither components nor metadata supplied.
    return open(owner, descriptor, null, null);
}
/** Opens an sstable with the given table metadata, discovering components from disk. */
public static SSTableReader open(SSTable.Owner owner, Descriptor desc, TableMetadataRef metadata)
{
    // Delegates straight to the full overload with the defaults validate=true, isOffline=false.
    return open(owner, desc, null, metadata, true, false);
}
/** Opens an sstable for normal online use: validated and with hotness tracking enabled. */
public static SSTableReader open(SSTable.Owner owner, Descriptor descriptor, Set<Component> components, TableMetadataRef metadata)
{
    boolean validate = true;
    boolean isOffline = false;
    return open(owner, descriptor, components, metadata, validate, isOffline);
}
// use only for offline or "Standalone" operations
public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, ColumnFamilyStore cfs)
{
    boolean validate = false;  // standalone tools skip the (limited) corruption checks
    boolean isOffline = true;  // no BF/summary regeneration, no hotness tracking
    return open(cfs, descriptor, components, cfs.metadata, validate, isOffline);
}
// use only for offline or "Standalone" operations
public static SSTableReader openNoValidation(SSTable.Owner owner, Descriptor descriptor, TableMetadataRef metadata)
{
    boolean validate = false;  // standalone tools skip the (limited) corruption checks
    boolean isOffline = true;  // no BF/summary regeneration, no hotness tracking
    return open(owner, descriptor, null, metadata, validate, isOffline);
}
/**
 * Open SSTable reader to be used in batch mode (such as sstableloader): validated, but offline,
 * so no auxiliary structures are regenerated and no hotness is tracked.
 */
public static SSTableReader openForBatch(SSTable.Owner owner, Descriptor descriptor, Set<Component> components, TableMetadataRef metadata)
{
    boolean validate = true;
    boolean isOffline = true;
    return open(owner, descriptor, components, metadata, validate, isOffline);
}
/**
 * Open an SSTable for reading.
 *
 * @param owner owning entity
 * @param descriptor SSTable to open
 * @param components Components included with this SSTable
 * @param metadata for this SSTables CF
 * @param validate Check SSTable for corruption (limited)
 * @param isOffline Whether we are opening this SSTable "offline", for example from an external tool or not for inclusion in queries (validations)
 *                  This stops regenerating BF + Summaries and also disables tracking of hotness for the SSTable.
 * @return {@link SSTableReader}
 */
public static SSTableReader open(Owner owner,
                                 Descriptor descriptor,
                                 Set<Component> components,
                                 TableMetadataRef metadata,
                                 boolean validate,
                                 boolean isOffline)
{
    boolean online = !isOffline;
    return descriptor.getFormat()
                     .getReaderFactory()
                     .loadingBuilder(descriptor, metadata, components)
                     .build(owner, validate, online);
}
/**
 * Opens every (descriptor, components) entry in parallel on a temporary pool, skipping sstables
 * that turn out to be corrupt or unreadable, and blocks until all opens have finished.
 */
public static Collection<SSTableReader> openAll(SSTable.Owner owner, Set<Map.Entry<Descriptor, Set<Component>>> entries,
                                                final TableMetadataRef metadata)
{
    // Thread-safe collection: readers are added concurrently by the pool tasks below.
    final Collection<SSTableReader> sstables = newBlockingQueue();
    ExecutorPlus executor = executorFactory().pooled("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
    try
    {
        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
        {
            Runnable runnable = () -> {
                SSTableReader sstable;
                try
                {
                    sstable = open(owner, entry.getKey(), entry.getValue(), metadata);
                }
                catch (CorruptSSTableException ex)
                {
                    // Skip this sstable but keep opening the rest.
                    JVMStabilityInspector.inspectThrowable(ex);
                    logger.error("Corrupt sstable {}; skipping table", entry, ex);
                    return;
                }
                catch (FSError ex)
                {
                    JVMStabilityInspector.inspectThrowable(ex);
                    logger.error("Cannot read sstable {}; file system error, skipping table", entry, ex);
                    return;
                }
                sstables.add(sstable);
            };
            executor.submit(runnable);
        }
    }
    finally
    {
        // Stop accepting new tasks; already-submitted opens keep running.
        executor.shutdown();
    }
    try
    {
        // Effectively "wait forever" for the batch to complete.
        executor.awaitTermination(7, TimeUnit.DAYS);
    }
    catch (InterruptedException e)
    {
        throw new UncheckedInterruptedException(e);
    }
    return sstables;
}
/**
 * Builds the reader from an already-populated {@link Builder}, adopting its shared resources and
 * registering this instance with the reference-counting machinery ({@link InstanceTidier}/{@link Ref}).
 */
protected SSTableReader(Builder<?, ?> builder, Owner owner)
{
    super(builder, owner);
    this.sstableMetadata = builder.getStatsMetadata();
    this.header = builder.getSerializationHeader();
    this.dfile = builder.getDataFile();
    this.maxDataAge = builder.getMaxDataAge();
    this.openReason = builder.getOpenReason();
    this.first = builder.getFirst();
    this.last = builder.getLast();
    // bounds is left null for out-of-order keys so that validate() can reject them later.
    this.bounds = AbstractBounds.strictlyWrapsAround(first.getToken(), last.getToken())
                  ? null // this will cause the validation to fail, but the reader is opened with no validation,
                         // e.g. for scrubbing, we should accept screwed bounds
                  : AbstractBounds.bounds(first.getToken(), true, last.getToken(), true);
    tidy = new InstanceTidier(descriptor, owner);
    selfRef = new Ref<>(this, tidy);
}
@Override
public DecoratedKey getFirst()
{
    // Note: for MOVED_START readers this may be later than the on-disk minimum key.
    return first;
}
@Override
public DecoratedKey getLast()
{
    return last;
}
@Override
public AbstractBounds<Token> getBounds()
{
    // bounds is null when first > last (out-of-order sstable); see the constructor.
    return Objects.requireNonNull(bounds, "Bounds were not created because the sstable is out of order");
}
/**
 * @return a checksum validator for this sstable's data file, or {@code null} when no CRC
 *         component exists on disk. The returned validator must be closed by the caller.
 */
public DataIntegrityMetadata.ChecksumValidator maybeGetChecksumValidator() throws IOException
{
    File crcFile = descriptor.fileFor(Components.CRC);
    if (!crcFile.exists())
        return null;
    return new DataIntegrityMetadata.ChecksumValidator(descriptor.fileFor(Components.DATA), crcFile);
}
/**
 * @return a full-file digest validator for this sstable's data file, or {@code null} when no
 *         digest component exists on disk. The returned validator must be closed by the caller.
 */
public DataIntegrityMetadata.FileDigestValidator maybeGetDigestValidator() throws IOException
{
    File digestFile = descriptor.fileFor(Components.DIGEST);
    if (!digestFile.exists())
        return null;
    return new DataIntegrityMetadata.FileDigestValidator(descriptor.fileFor(Components.DATA), digestFile);
}
/** @return the summed on-disk (possibly compressed) length of the given sstables, in bytes. */
public static long getTotalBytes(Iterable<SSTableReader> sstables)
{
    long total = 0L;
    for (SSTableReader reader : sstables)
        total += reader.onDiskLength();
    return total;
}
/** @return the summed uncompressed data length of the given sstables, in bytes. */
public static long getTotalUncompressedBytes(Iterable<SSTableReader> sstables)
{
    long total = 0L;
    for (SSTableReader reader : sstables)
        total += reader.uncompressedLength();
    return total;
}
/**
 * Equality is defined purely by the sstable {@link Descriptor}: two reader instances over the
 * same sstable files are equal even if their lifecycle state differs.
 */
@Override
public boolean equals(Object that)
{
    return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
}
@Override
public int hashCode()
{
    // Must stay consistent with equals(): both delegate to the descriptor.
    return this.descriptor.hashCode();
}
/** @return the path of this sstable's data file. */
public String getFilename()
{
    return dfile.path();
}
/** Wires this reader up for online use; currently propagates the owner's CRC check chance. */
public void setupOnline()
{
    owner().ifPresent(o -> setCrcCheckChance(o.getCrcCheckChance()));
}
/**
 * Execute provided task with sstable lock to avoid racing with index summary redistribution, SEE CASSANDRA-15861.
 *
 * @param task to be guarded by sstable lock
 */
public <R, E extends Exception> R runWithLock(CheckedFunction<Descriptor, R, E> task) throws E
{
    // tidy.global is the per-sstable lock shared by all reader instances over the same files.
    synchronized (tidy.global)
    {
        return task.apply(descriptor);
    }
}
/** Marks this reader instance as superseded by a newer instance over the same sstable; may only happen once. */
public void setReplaced()
{
    synchronized (tidy.global)
    {
        assert !tidy.isReplaced;
        tidy.isReplaced = true;
    }
}
/** @return whether this reader instance has been replaced (see {@link #setReplaced()}). */
public boolean isReplaced()
{
    synchronized (tidy.global)
    {
        return tidy.isReplaced;
    }
}
/**
 * Registers a task to run when this sstable is closed; tasks accumulate and run in
 * registration order. The runnable passed to this method must not be an anonymous or
 * non-static inner class. It can be a lambda or a method reference provided that it does
 * not retain a reference chain to this reader.
 */
public void runOnClose(final Runnable runOnClose)
{
    if (runOnClose == null)
        return;
    synchronized (tidy.global)
    {
        final Runnable previous = tidy.runOnClose;
        tidy.runOnClose = previous == null
                          ? runOnClose
                          : () -> { previous.run(); runOnClose.run(); };
    }
}
/**
 * The method sets fields specific to this {@link SSTableReader} and the parent {@link SSTable} on the provided
 * {@link Builder}. The method is intended to be called from the overloaded {@code unbuildTo} method in subclasses.
 *
 * @param builder the builder on which the fields should be set
 * @param sharedCopy whether the {@link SharedCloseable} resources should be passed as shared copies or directly;
 *                   note that the method will overwrite the fields representing {@link SharedCloseable} only if
 *                   they are not set in the builder yet (the relevant fields in the builder are {@code null}).
 * @return the same instance of builder as provided
 */
protected final <B extends Builder<?, B>> B unbuildTo(B builder, boolean sharedCopy)
{
    B b = super.unbuildTo(builder, sharedCopy);
    // Only install the data file if the caller has not already supplied one (see javadoc above).
    if (builder.getDataFile() == null)
        b.setDataFile(sharedCopy ? sharedCopyOrNull(dfile) : dfile);
    b.setStatsMetadata(sstableMetadata);
    b.setSerializationHeader(header);
    b.setMaxDataAge(maxDataAge);
    b.setOpenReason(openReason);
    b.setFirst(first);
    b.setLast(last);
    b.setSuspected(isSuspect.get());
    return b;
}
/** Returns a new reader instance whose start is restored to the given key (undoing a moved start). */
public abstract SSTableReader cloneWithRestoredStart(DecoratedKey restoredStart);
/** Returns a new reader instance whose effective start is {@code newStart} (used for {@link OpenReason#MOVED_START}). */
public abstract SSTableReader cloneWithNewStart(DecoratedKey newStart);
/** @return the read-rate meter for this sstable; presumably null for offline readers — TODO confirm. */
public RestorableMeter getReadMeter()
{
    return readMeter;
}
/**
 * All the resources which should be released upon closing this sstable reader are registered with in
 * {@link GlobalTidy}. This method lets close a provided resource explicitly any time and unregister it from
 * {@link GlobalTidy} so that it is not tried to be released twice.
 *
 * @param closeable a resource to be closed
 */
protected void closeInternalComponent(AutoCloseable closeable)
{
    synchronized (tidy.global)
    {
        // The resource must still be registered; closing an unregistered one is a programming error.
        Preconditions.checkState(tidy.closeables.remove(closeable));
        try
        {
            closeable.close();
        }
        catch (Exception ex)
        {
            throw new RuntimeException("Failed to close " + closeable, ex);
        }
    }
}
/**
 * This method is expected to close the components which occupy memory but are not needed when we just want to
 * stream the components (for example, when SSTable is opened with SSTableLoader). The method should call
 * {@link #closeInternalComponent(AutoCloseable)} for each such component. Leaving the implementation empty is
 * valid given there are no such resources to release.
 */
public abstract void releaseInMemoryComponents();
/**
 * Perform any validation needed for the reader upon creation before returning it from the {@link Builder}.
 * Rejects readers whose keys are out of order or whose token bounds could not be constructed.
 */
public void validate()
{
    boolean keysOutOfOrder = this.first.compareTo(this.last) > 0;
    if (keysOutOfOrder || bounds == null)
    {
        throw new CorruptSSTableException(new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last)), getFilename());
    }
}
/**
 * Returns the compression metadata for this sstable. Note that the compression metadata is a resource
 * and should not be closed by the caller.
 * TODO do not return a closeable resource or return a shared copy
 *
 * @throws IllegalStateException if the sstable is not compressed
 */
public CompressionMetadata getCompressionMetadata()
{
    if (compression)
        return dfile.compressionMetadata().get();
    throw new IllegalStateException(this + " is not compressed");
}
/**
 * Returns the amount of memory in bytes used off heap by the compression meta-data,
 * or zero when the sstable is not compressed.
 *
 * @return the amount of memory in bytes used off heap by the compression meta-data
 */
public long getCompressionMetadataOffHeapSize()
{
    return compression ? getCompressionMetadata().offHeapSize() : 0;
}
/**
 * Calculates an estimate of the number of keys in the sstable represented by this reader.
 */
public abstract long estimatedKeys();
/**
 * Calculates an estimate of the number of keys for the given ranges in the sstable represented by this reader.
 */
public abstract long estimatedKeysForRanges(Collection<Range<Token>> ranges);
/**
 * Returns whether methods like {@link #estimatedKeys()} or {@link #estimatedKeysForRanges(Collection)} can return
 * sensible estimations.
 */
public abstract boolean isEstimationInformative();
/**
 * Returns sample keys for the provided token range.
 */
public abstract Iterable<DecoratedKey> getKeySamples(final Range<Token> range);
/**
 * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
 *
 * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
 */
*/
public List<PartitionPositionBounds> getPositionsForRanges(Collection<Range<Token>> ranges)
{
// use the index to determine a minimal section for each range
List<PartitionPositionBounds> positions = new ArrayList<>();
for (Range<Token> range : Range.normalize(ranges))
{
assert !range.isWrapAround() || range.right.isMinimum();
// truncate the range so it at most covers the sstable
AbstractBounds<PartitionPosition> bounds = Range.makeRowRange(range);
PartitionPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
PartitionPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0)
continue;
long left = getPosition(leftBound, Operator.GT);
long right = (rightBound.compareTo(last) > 0)
? uncompressedLength()
: getPosition(rightBound, Operator.GT);
if (left == right)
// empty range
continue;
assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right);
positions.add(new PartitionPositionBounds(left, right));
}
return positions;
}
/**
 * Retrieves the position while updating the key cache and the stats.
 *
 * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 * allow key selection by token bounds but only if op != * EQ
 * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 */
public final long getPosition(PartitionPosition key, Operator op)
{
    return getPosition(key, op, SSTableReadsListener.NOOP_LISTENER);
}
/** Same as {@link #getPosition(PartitionPosition, Operator)} but notifying the given listener of internal events. */
public final long getPosition(PartitionPosition key, Operator op, SSTableReadsListener listener)
{
    return getPosition(key, op, true, listener);
}
/** Same as {@link #getPosition(PartitionPosition, Operator)} but with explicit control over stats/cache updates. */
public final long getPosition(PartitionPosition key,
                              Operator op,
                              boolean updateStats)
{
    return getPosition(key, op, updateStats, SSTableReadsListener.NOOP_LISTENER);
}
/**
 * Retrieve a position in data file according to the provided key and operator.
 *
 * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 * allow key selection by token bounds but only if op != * EQ
 * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 * @param updateStats true if updating stats and cache
 * @param listener a listener used to handle internal events
 * @return the position in the data file of the matched partition, or -1 if no matching key exists
 */
protected long getPosition(PartitionPosition key,
                           Operator op,
                           boolean updateStats,
                           SSTableReadsListener listener)
{
    AbstractRowIndexEntry rie = getRowIndexEntry(key, op, updateStats, listener);
    return rie != null ? rie.position : -1;
}
/**
 * Retrieve an index entry for the partition found according to the provided key and operator.
 *
 * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 * allow key selection by token bounds but only if op != * EQ
 * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 * @param updateStats true if updating stats and cache
 * @param listener a listener used to handle internal events
 * @return The index entry corresponding to the key, or null if the key is not present
 */
@VisibleForTesting
protected abstract AbstractRowIndexEntry getRowIndexEntry(PartitionPosition key,
                                                          Operator op,
                                                          boolean updateStats,
                                                          SSTableReadsListener listener);
/**
 * Creates a row iterator positioned at {@code dataPosition} in the given (already-open) data file;
 * the caller retains ownership of {@code file}.
 */
public UnfilteredRowIterator simpleIterator(FileDataInput file, DecoratedKey key, long dataPosition, boolean tombstoneOnly)
{
    return SSTableIdentityIterator.create(this, file, dataPosition, key, tombstoneOnly);
}
/**
 * Returns a {@link KeyReader} over all keys in the sstable. The returned reader is a resource
 * that must be closed by the caller.
 */
public abstract KeyReader keyReader() throws IOException;
/**
 * Returns a {@link KeyIterator} over all keys in the sstable, backed by a freshly-opened
 * {@link KeyReader}; the iterator must be closed by the caller.
 */
public KeyIterator keyIterator() throws IOException
{
    KeyReader reader = keyReader();
    return new KeyIterator(reader, getPartitioner(), uncompressedLength(), new ReentrantReadWriteLock());
}
/**
 * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
 */
public abstract DecoratedKey firstKeyBeyond(PartitionPosition token);
/**
 * Returns the length in bytes of the (uncompressed) data for this SSTable. For compressed files, this is not
 * the same thing as the on disk size (see {@link #onDiskLength()}).
 */
public long uncompressedLength()
{
    return dfile.dataLength();
}
/**
 * The length in bytes of the on disk size for this SSTable. For compressed files, this is not the same thing
 * as the data length (see {@link #uncompressedLength()}).
 */
public long onDiskLength()
{
    return dfile.onDiskLength;
}
/** @return the current CRC check chance (see {@link #setCrcCheckChance(double)}). */
@VisibleForTesting
public double getCrcCheckChance()
{
    return crcCheckChance;
}
/**
 * Set the value of CRC check chance. The argument supplied is obtained from the property of the owning CFS.
 * Called when either the SSTR is initialized, or the CFS's property is updated via JMX.
 */
public void setCrcCheckChance(double crcCheckChance)
{
    this.crcCheckChance = crcCheckChance;
    // Propagate to the compression layer, which performs the actual per-read CRC sampling.
    dfile.compressionMetadata().ifPresent(compressionMetadata -> compressionMetadata.parameters.setCrcCheckChance(crcCheckChance));
}
/**
 * Mark the sstable as obsolete, i.e., compacted into newer sstables.
 * <p>
 * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere except for
 * threads holding a reference.
 * <p>
 * Calling it multiple times is usually buggy.
 */
public void markObsolete(Runnable tidier)
{
    if (logger.isTraceEnabled())
        logger.trace("Marking {} compacted", getFilename());
    synchronized (tidy.global)
    {
        // A replaced reader must not be marked obsolete, and obsoletion may only happen once.
        assert !tidy.isReplaced;
        assert tidy.global.obsoletion == null : this + " was already marked compacted";
        tidy.global.obsoletion = tidier;
        // No point persisting read rates for an sstable that is going away.
        tidy.global.stopReadMeterPersistence();
    }
}
/** @return whether {@link #markObsolete(Runnable)} has been called on this sstable. */
public boolean isMarkedCompacted()
{
    return tidy.global.obsoletion != null;
}
/** Flags this sstable as suspect (e.g. after a read error) so it can be excluded from reads. */
public void markSuspect()
{
    if (logger.isTraceEnabled())
        logger.trace("Marking {} as a suspect to be excluded from reads.", getFilename());
    isSuspect.set(true);
}
/** Clears the suspect flag; test-only escape hatch. */
@VisibleForTesting
public void unmarkSuspect()
{
    isSuspect.set(false);
}
/** @return whether this sstable has been flagged as suspect (see {@link #markSuspect()}). */
public boolean isMarkedSuspect()
{
    return isSuspect.get();
}
/**
 * Direct I/O SSTableScanner over a defined range of tokens; a {@code null} range means the
 * entire sstable.
 *
 * @param range the range of keys to cover
 * @return A Scanner for seeking over the rows of the SSTable.
 */
public ISSTableScanner getScanner(Range<Token> range)
{
    return range == null ? getScanner() : getScanner(Collections.singletonList(range));
}
/**
 * Direct I/O SSTableScanner over the entirety of the sstable.
 *
 * @return A Scanner over the full content of the SSTable.
 */
public abstract ISSTableScanner getScanner();

/**
 * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 *
 * @param ranges the ranges of keys to cover
 * @return A Scanner for seeking over the rows of the SSTable.
 */
public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges);

/**
 * Direct I/O SSTableScanner over an iterator of bounds.
 *
 * @param rangeIterator the keys to cover
 * @return A Scanner for seeking over the rows of the SSTable.
 */
public abstract ISSTableScanner getScanner(Iterator<AbstractBounds<PartitionPosition>> rangeIterator);
/**
 * Create a {@link FileDataInput} for the data file of the sstable represented by this reader. This method returns
 * a newly opened resource which must be closed by the caller.
 *
 * @param position the data input will be opened and seek to this position
 * @return a freshly opened reader over the data file, positioned at {@code position}
 */
public FileDataInput getFileDataInput(long position)
{
    return dfile.createReader(position);
}
/**
 * Tests if the sstable contains data newer than the given age param (in localhost currentMillis time).
 * This works in conjunction with maxDataAge which is an upper bound on the data in the sstable represented
 * by this reader.
 *
 * @param timestampMillis the timestamp to compare against, in local milliseconds
 * @return {@code true} iff this sstable contains data that's newer than the given timestamp
 */
public boolean newSince(long timestampMillis)
{
    // Strictly newer: data written exactly at the given timestamp does not count.
    return timestampMillis < maxDataAge;
}
/** Hard-links all components of this sstable into the given snapshot directory, without rate limiting. */
public void createLinks(String snapshotDirectoryPath)
{
    createLinks(snapshotDirectoryPath, null);
}

/** Hard-links all components of this sstable into the given snapshot directory, optionally rate limited. */
public void createLinks(String snapshotDirectoryPath, RateLimiter rateLimiter)
{
    createLinks(descriptor, components, snapshotDirectoryPath, rateLimiter);
}

/** Hard-links the given components into the given snapshot directory, without rate limiting. */
public static void createLinks(Descriptor descriptor, Set<Component> components, String snapshotDirectoryPath)
{
    createLinks(descriptor, components, snapshotDirectoryPath, null);
}
/**
 * Hard-links each existing component file of the given descriptor into the snapshot directory.
 *
 * @param descriptor identifies the sstable whose components are linked
 * @param components the components to link; components without an on-disk file are skipped
 * @param snapshotDirectoryPath the directory the hard links are created in
 * @param limiter optional rate limiter applied per linked component; may be {@code null}
 */
public static void createLinks(Descriptor descriptor, Set<Component> components, String snapshotDirectoryPath, RateLimiter limiter)
{
    for (Component component : components)
    {
        File sourceFile = descriptor.fileFor(component);
        // Skip components that were never written; not all of them are mandatory.
        if (sourceFile.exists())
        {
            if (limiter != null)
                limiter.acquire();
            File targetLink = new File(snapshotDirectoryPath, sourceFile.name());
            FileUtils.createHardLink(sourceFile, targetLink);
        }
    }
}
/** @return whether this sstable carries a repairedAt timestamp, i.e. has been incrementally repaired */
public boolean isRepaired()
{
    return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
}
/**
 * Reads the key stored at the position saved in SASI.
 * <p>
 * When SASI is created, it uses key locations retrieved from {@link KeyReader#keyPositionForSecondaryIndex()}.
 * This method is to read the key stored at such position. It is up to the concrete SSTable format implementation
 * what that position means and which file it refers to. The only requirement is that it is consistent with what
 * {@link KeyReader#keyPositionForSecondaryIndex()} returns.
 *
 * @return key if found, {@code null} otherwise
 */
public abstract DecoratedKey keyAtPositionFromSecondaryIndex(long keyPositionFromSecondaryIndex) throws IOException;
/** @return whether the stats metadata records a pending repair session for this sstable */
public boolean isPendingRepair()
{
    return sstableMetadata.pendingRepair != ActiveRepairService.NO_PENDING_REPAIR;
}

/** @return the pending repair session id from the stats metadata (may be the no-op sentinel) */
public TimeUUID getPendingRepair()
{
    return sstableMetadata.pendingRepair;
}

/** @return the repairedAt timestamp from the stats metadata */
public long getRepairedAt()
{
    return sstableMetadata.repairedAt;
}

/** @return the isTransient flag from the stats metadata */
public boolean isTransient()
{
    return sstableMetadata.isTransient;
}
/**
 * @return whether any of the given token ranges intersects the [first, last] token bounds of this sstable
 */
public boolean intersects(Collection<Range<Token>> ranges)
{
    Bounds<Token> covered = new Bounds<>(first.getToken(), last.getToken());
    for (Range<Token> candidate : ranges)
    {
        if (candidate.intersects(covered))
            return true;
    }
    return false;
}
/**
 * TODO: Move someplace reusable
 */
public abstract static class Operator
{
    public static final Operator EQ = new Equals();
    public static final Operator GE = new GreaterThanOrEqualTo();
    public static final Operator GT = new GreaterThan();

    /**
     * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
     * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
     */
    public abstract int apply(int comparison);

    static final class Equals extends Operator
    {
        @Override
        public int apply(int comparison)
        {
            // Negating the comparison maps "equal" to 0 (match) and either inequality to "cannot match forward".
            return -comparison;
        }
    }

    static final class GreaterThanOrEqualTo extends Operator
    {
        @Override
        public int apply(int comparison)
        {
            return comparison < 0 ? 1 : 0;
        }
    }

    static final class GreaterThan extends Operator
    {
        @Override
        public int apply(int comparison)
        {
            return comparison <= 0 ? 1 : 0;
        }
    }
}
/** @return histogram of partition sizes, from the stats metadata */
public EstimatedHistogram getEstimatedPartitionSize()
{
    return sstableMetadata.estimatedPartitionSize;
}

/** @return histogram of cell counts per partition, from the stats metadata */
public EstimatedHistogram getEstimatedCellPerPartitionCount()
{
    return sstableMetadata.estimatedCellPerPartitionCount;
}

/** @return estimated ratio of droppable tombstones for the given gc grace threshold */
public double getEstimatedDroppableTombstoneRatio(int gcBefore)
{
    return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
}

/** @return estimated count of tombstones droppable before the given gc grace threshold */
public double getDroppableTombstonesBefore(int gcBefore)
{
    return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
}

/** @return compression ratio recorded in the stats metadata */
public double getCompressionRatio()
{
    return sstableMetadata.compressionRatio;
}

/** @return minimum write timestamp across the sstable's data */
public long getMinTimestamp()
{
    return sstableMetadata.minTimestamp;
}

/** @return maximum write timestamp across the sstable's data */
public long getMaxTimestamp()
{
    return sstableMetadata.maxTimestamp;
}

/** @return minimum local deletion time across the sstable's data */
public int getMinLocalDeletionTime()
{
    return sstableMetadata.minLocalDeletionTime;
}

/** @return maximum local deletion time across the sstable's data */
public int getMaxLocalDeletionTime()
{
    return sstableMetadata.maxLocalDeletionTime;
}
/**
 * Whether the sstable may contain tombstones or if it is guaranteed to not contain any.
 * <p>
 * Note that having that method return {@code false} guarantees the sstable has no tombstones whatsoever (so no
 * cell tombstone, no range tombstone marker and no expiring columns), but having it return {@code true} doesn't
 * guarantee it contains any as it may simply have non-expired cells.
 */
public boolean mayHaveTombstones()
{
    // If minLocalDeletionTime still holds its default (Cell.NO_DELETION_TIME, which is bigger than any
    // valid deletion time), nothing in this sstable ever recorded a deletion.
    int minDeletionTime = getMinLocalDeletionTime();
    return minDeletionTime != Cell.NO_DELETION_TIME;
}
/** @return minimum TTL across the sstable's data, from the stats metadata */
public int getMinTTL()
{
    return sstableMetadata.minTTL;
}

/** @return maximum TTL across the sstable's data, from the stats metadata */
public int getMaxTTL()
{
    return sstableMetadata.maxTTL;
}

/** @return total number of set columns recorded in the stats metadata */
public long getTotalColumnsSet()
{
    return sstableMetadata.totalColumnsSet;
}

/** @return total number of rows recorded in the stats metadata */
public long getTotalRows()
{
    return sstableMetadata.totalRows;
}
/**
 * @return the average number of set columns per row, -1 when the row count is unknown (negative),
 *         and 0 when the sstable contains no rows
 */
public int getAvgColumnSetPerRow()
{
    long totalRows = sstableMetadata.totalRows;
    if (totalRows < 0)
        return -1;
    if (totalRows == 0)
        return 0;
    return (int) (sstableMetadata.totalColumnsSet / totalRows);
}
/** @return the leveled-compaction level recorded in the stats metadata */
public int getSSTableLevel()
{
    return sstableMetadata.sstableLevel;
}
/**
 * Mutate sstable level with a lock to avoid racing with entire-sstable-streaming and then reload sstable metadata.
 *
 * @param newLevel the new leveled-compaction level to persist
 * @throws IOException if rewriting the metadata on disk fails
 */
public void mutateLevelAndReload(int newLevel) throws IOException
{
    // tidy.global is the lock shared by all instances of this logical sstable.
    synchronized (tidy.global)
    {
        descriptor.getMetadataSerializer().mutateLevel(descriptor, newLevel);
        reloadSSTableMetadata();
    }
}
/**
 * Mutate sstable repair metadata with a lock to avoid racing with entire-sstable-streaming and then reload
 * sstable metadata.
 *
 * @param newRepairedAt the new repairedAt timestamp to persist
 * @param newPendingRepair the new pending repair session id to persist
 * @param isTransient the new transient flag to persist
 * @throws IOException if rewriting the metadata on disk fails
 */
public void mutateRepairedAndReload(long newRepairedAt, TimeUUID newPendingRepair, boolean isTransient) throws IOException
{
    // tidy.global is the lock shared by all instances of this logical sstable.
    synchronized (tidy.global)
    {
        descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, newRepairedAt, newPendingRepair, isTransient);
        reloadSSTableMetadata();
    }
}
/**
 * Reloads the sstable metadata from disk.
 * <p>
 * Called after level is changed on sstable, for example if the sstable is dropped to L0
 * <p>
 * Might be possible to remove in future versions
 *
 * @throws IOException if reading the stats component from disk fails
 */
public void reloadSSTableMetadata() throws IOException
{
    this.sstableMetadata = StatsComponent.load(descriptor).statsMetadata();
}

/** @return the stats metadata currently cached for this sstable */
public StatsMetadata getSSTableMetadata()
{
    return sstableMetadata;
}
/**
 * Opens a new rate-limited reader over the data file; the caller is responsible for closing it.
 *
 * @param limiter the rate limiter to apply; must not be {@code null}
 */
public RandomAccessReader openDataReader(RateLimiter limiter)
{
    assert limiter != null;
    return dfile.createReader(limiter);
}

/** Opens a new unthrottled reader over the data file; the caller is responsible for closing it. */
public RandomAccessReader openDataReader()
{
    return dfile.createReader();
}
/**
 * Best-effort request to drop the OS page cache for the data file up to the position of the given key.
 */
public void trySkipFileCacheBefore(DecoratedKey key)
{
    long keyPosition = getPosition(key, SSTableReader.Operator.GE);
    // A negative position means the key was not found; in that case drop nothing (length 0).
    NativeLibrary.trySkipCache(descriptor.fileFor(Components.DATA).absolutePath(), 0, Math.max(keyPosition, 0));
}
/** @return the channel backing the data file */
public ChannelProxy getDataChannel()
{
    return dfile.channel;
}

/**
 * @return last modified time for data component. 0 if given component does not exist or IO error occurs.
 */
public long getDataCreationTime()
{
    return descriptor.fileFor(Components.DATA).lastModified();
}
/**
 * Increment the total read count and read rate for this SSTable. This should not be incremented for non-query reads,
 * like compaction.
 */
public void incrementReadCount()
{
    // Snapshot the field first: readMeter is (re)assigned in setup()/overrideReadMeter(), so checking
    // and then dereferencing the field directly could race and throw NPE.
    RestorableMeter meter = readMeter;
    if (meter != null)
        meter.mark();
}
/** @return the encoding stats recorded in this sstable's stats metadata */
public EncodingStats stats()
{
    // We could return sstable.header.stats(), but this may not be as accurate as the actual sstable stats (see
    // SerializationHeader.make() for details) so we use the latter instead.
    return sstableMetadata.encodingStats;
}
/** @return a new reference to this reader, or {@code null} if it has already been fully released */
public Ref<SSTableReader> tryRef()
{
    return selfRef.tryRef();
}

/** @return the reader's own reference; does not take a new one */
public Ref<SSTableReader> selfRef()
{
    return selfRef;
}

/** @return a new reference to this reader; fails if it has already been fully released */
public Ref<SSTableReader> ref()
{
    return selfRef.ref();
}
/**
 * Collects the closeable resources owned by this instance; subclasses extend this with their own handles.
 */
protected List<AutoCloseable> setupInstance(boolean trackHotness)
{
    return Collections.singletonList(dfile);
}

/**
 * Wires this reader into its shared tidy state and (optionally) read-hotness tracking.
 */
public void setup(boolean trackHotness)
{
    assert tidy.closeables == null;
    // Hotness tracking is only honoured when globally enabled.
    boolean hotness = trackHotness && TRACK_ACTIVITY;
    tidy.setup(this, hotness, setupInstance(hotness));
    this.readMeter = tidy.global.readMeter;
}
@VisibleForTesting
public void overrideReadMeter(RestorableMeter readMeter)
{
    // Overrides the meter both on this instance and on the shared global state, so all
    // instances of this logical sstable observe the replacement.
    this.readMeter = tidy.global.readMeter = readMeter;
}
/**
 * Registers this reader, its global tidy reference and any shared closeable components with the
 * given identity collection (used for leak detection / ref accounting).
 */
public void addTo(Ref.IdentityCollection identities)
{
    identities.add(this);
    identities.add(tidy.globalRef);
    for (AutoCloseable closeable : tidy.closeables)
    {
        if (closeable instanceof SharedCloseable)
            ((SharedCloseable) closeable).addTo(identities);
    }
}
/**
 * The method verifies whether the sstable may contain the provided key. The method does approximation using
 * Bloom filter if it is present and if it is not, performs accurate check in the index.
 *
 * @param key the key assumed to fall within this sstable's [first, last] range
 * @return {@code false} only if the key is guaranteed to be absent
 */
public abstract boolean mayContainAssumingKeyIsInRange(DecoratedKey key);
/**
 * One instance per SSTableReader we create.
 * <p>
 * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example),
 * but there can only be one GlobalTidy for one single logical sstable.
 * <p>
 * When the InstanceTidier cleans up, it releases its reference to its GlobalTidy; when all InstanceTidiers
 * for that type have run, the GlobalTidy cleans up.
 */
protected static final class InstanceTidier implements Tidy
{
    private final Descriptor descriptor;
    // Weak so a lingering tidier does not keep the owner alive; may already be cleared when tidy() runs.
    private final WeakReference<Owner> owner;
    private List<? extends AutoCloseable> closeables;
    private Runnable runOnClose;
    private boolean isReplaced = false;

    // a reference to our shared tidy instance, that
    // we will release when we are ourselves released
    private Ref<GlobalTidy> globalRef;
    private GlobalTidy global;

    // Volatile: written last in setup() and read first in tidy(), so tidy() never observes partial state.
    private volatile boolean setup;

    public void setup(SSTableReader reader, boolean trackHotness, Collection<? extends AutoCloseable> closeables)
    {
        // get a new reference to the shared descriptor-type tidy
        this.globalRef = GlobalTidy.get(reader);
        this.global = globalRef.get();
        if (trackHotness)
            global.ensureReadMeter();
        this.closeables = new ArrayList<>(closeables);
        // to avoid tidy seeing partial state, set setup=true at the end
        this.setup = true;
    }

    private InstanceTidier(Descriptor descriptor, Owner owner)
    {
        this.descriptor = descriptor;
        this.owner = new WeakReference<>(owner);
    }

    @Override
    public void tidy()
    {
        if (logger.isTraceEnabled())
            logger.trace("Running instance tidier for {} with setup {}", descriptor, setup);

        // don't try to cleanup if the sstablereader was never fully constructed
        if (!setup)
            return;

        // Issue a read-ordering barrier (when an owner is still around) so in-flight reads finish
        // before the resources are actually closed by the async task below.
        final OpOrder.Barrier barrier;
        Owner owner = this.owner.get();
        if (owner != null)
        {
            barrier = owner.newReadOrderingBarrier();
            barrier.issue();
        }
        else
        {
            barrier = null;
        }

        // The actual cleanup runs asynchronously; any failure in one step is logged and merged so
        // the remaining steps still execute.
        ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
        {
            public void run()
            {
                if (logger.isTraceEnabled())
                    logger.trace("Async instance tidier for {}, before barrier", descriptor);

                // Wait for all reads started before the barrier was issued to complete.
                if (barrier != null)
                    barrier.await();

                if (logger.isTraceEnabled())
                    logger.trace("Async instance tidier for {}, after barrier", descriptor);

                Throwable exceptions = null;
                if (runOnClose != null) try
                {
                    runOnClose.run();
                }
                catch (RuntimeException | Error ex)
                {
                    logger.error("Failed to run on-close listeners for sstable " + descriptor.baseFile(), ex);
                    exceptions = ex;
                }

                // Close every per-instance resource, accumulating (not rethrowing) failures.
                Throwable closeExceptions = Throwables.close(null, Iterables.filter(closeables, Objects::nonNull));
                if (closeExceptions != null)
                {
                    logger.error("Failed to close some sstable components of " + descriptor.baseFile(), closeExceptions);
                    exceptions = Throwables.merge(exceptions, closeExceptions);
                }

                // Release our share of the global tidy; the last release triggers GlobalTidy.tidy().
                try
                {
                    globalRef.release();
                }
                catch (RuntimeException | Error ex)
                {
                    logger.error("Failed to release the global ref of " + descriptor.baseFile(), ex);
                    exceptions = Throwables.merge(exceptions, ex);
                }

                if (exceptions != null)
                    JVMStabilityInspector.inspectThrowable(exceptions);

                if (logger.isTraceEnabled())
                    logger.trace("Async instance tidier for {}, completed", descriptor);
            }

            @Override
            public String toString()
            {
                return "Tidy " + descriptor.ksname + '.' + descriptor.cfname + '-' + descriptor.id;
            }
        });
    }

    @Override
    public String name()
    {
        return descriptor.toString();
    }
}
/**
 * One instance per logical sstable. This both tracks shared cleanup and some shared state related
 * to the sstable's lifecycle.
 * <p>
 * All InstanceTidiers, on setup(), ask the static get() method for their shared state,
 * and stash a reference to it to be released when they are. Once all such references are
 * released, this shared tidy will be performed.
 */
static final class GlobalTidy implements Tidy
{
    // Sentinel "no scheduled future" value, shared to avoid allocating an empty WeakReference per sstable.
    static final WeakReference<ScheduledFuture<?>> NULL = new WeakReference<>(null);
    // keyed by descriptor, mapping to the shared GlobalTidy for that descriptor
    static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>();

    private final Descriptor desc;
    // the readMeter that is shared between all instances of the sstable, and can be overridden in all of them
    // at once also, for testing purposes
    private RestorableMeter readMeter;
    // the scheduled persistence of the readMeter, that we will cancel once all instances of this logical
    // sstable have been released
    private WeakReference<ScheduledFuture<?>> readMeterSyncFuture = NULL;
    // shared state managing if the logical sstable has been compacted; this is used in cleanup
    private volatile Runnable obsoletion;

    GlobalTidy(final SSTableReader reader)
    {
        this.desc = reader.descriptor;
    }

    void ensureReadMeter()
    {
        // Idempotent: only the first call creates the meter and schedules persistence.
        if (readMeter != null)
            return;

        // Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
        // the read meter when in client mode.
        // Also, do not track read rates when running in client or tools mode (syncExecutor isn't available in these modes)
        if (!TRACK_ACTIVITY || SchemaConstants.isLocalSystemKeyspace(desc.ksname) || DatabaseDescriptor.isClientOrToolInitialized())
        {
            readMeter = null;
            readMeterSyncFuture = NULL;
            return;
        }

        readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.id);
        // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
        readMeterSyncFuture = new WeakReference<>(syncExecutor.scheduleAtFixedRate(this::maybePersistSSTableReadMeter, 1, 5, TimeUnit.MINUTES));
    }

    void maybePersistSSTableReadMeter()
    {
        // Skip persistence once the sstable has been obsoleted, or when globally disabled.
        if (obsoletion == null && DatabaseDescriptor.getSStableReadRatePersistenceEnabled())
        {
            meterSyncThrottle.acquire();
            SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.id, readMeter);
        }
    }

    private void stopReadMeterPersistence()
    {
        ScheduledFuture<?> readMeterSyncFutureLocal = readMeterSyncFuture.get();
        if (readMeterSyncFutureLocal != null)
        {
            readMeterSyncFutureLocal.cancel(true);
            readMeterSyncFuture = NULL;
        }
    }

    public void tidy()
    {
        lookup.remove(desc);

        // Runs the obsoletion hook installed by markObsolete(), if any.
        if (obsoletion != null)
            obsoletion.run();

        // don't ideally want to dropPageCache for the file until all instances have been released
        for (Component c : desc.discoverComponents())
            NativeLibrary.trySkipCache(desc.fileFor(c).absolutePath(), 0, 0);
    }

    public String name()
    {
        return desc.toString();
    }

    // get a new reference to the shared GlobalTidy for this sstable
    @SuppressWarnings("resource")
    public static Ref<GlobalTidy> get(SSTableReader sstable)
    {
        Descriptor descriptor = sstable.descriptor;

        // Loop handles the race between taking a ref on an existing GlobalTidy and that tidy being
        // released concurrently (tryRef returning null): remove the dead entry and retry.
        while (true)
        {
            Ref<GlobalTidy> ref = lookup.get(descriptor);
            if (ref == null)
            {
                final GlobalTidy tidy = new GlobalTidy(sstable);
                ref = new Ref<>(tidy, tidy);
                Ref<GlobalTidy> ex = lookup.putIfAbsent(descriptor, ref);
                if (ex == null)
                    return ref;
                ref = ex;
            }

            Ref<GlobalTidy> newRef = ref.tryRef();
            if (newRef != null)
                return newRef;

            // raced with tidy
            lookup.remove(descriptor, ref);
        }
    }
}
@VisibleForTesting
public static void resetTidying()
{
    // Test hook: drops all shared tidy state so subsequent opens start from a clean slate.
    GlobalTidy.lookup.clear();
}
/**
 * An immutable pair of byte positions into the data file delimiting a partition span.
 */
public static class PartitionPositionBounds
{
    public final long lowerPosition;
    public final long upperPosition;

    public PartitionPositionBounds(long lower, long upper)
    {
        this.lowerPosition = lower;
        this.upperPosition = upper;
    }

    @Override
    public final int hashCode()
    {
        // Long.hashCode(x) is exactly (int)(x ^ (x >>> 32)), i.e. the same fold the previous
        // hand-written implementation performed on each position.
        return 31 * (Long.hashCode(lowerPosition) ^ Long.hashCode(upperPosition));
    }

    @Override
    public final boolean equals(Object o)
    {
        if (!(o instanceof PartitionPositionBounds))
            return false;
        PartitionPositionBounds other = (PartitionPositionBounds) o;
        return lowerPosition == other.lowerPosition && upperPosition == other.upperPosition;
    }
}
/**
 * An immutable pair of index positions delimiting a span of index entries.
 */
public static class IndexesBounds
{
    public final int lowerPosition;
    public final int upperPosition;

    public IndexesBounds(int lower, int upper)
    {
        this.lowerPosition = lower;
        this.upperPosition = upper;
    }

    @Override
    public final int hashCode()
    {
        // The previous implementation (31 * lowerPosition * upperPosition) collapsed to 0 whenever either
        // bound was 0 and was symmetric in its arguments; the conventional 31 * a + b fold avoids both.
        return 31 * lowerPosition + upperPosition;
    }

    @Override
    public final boolean equals(Object o)
    {
        if (!(o instanceof IndexesBounds))
            return false;
        IndexesBounds that = (IndexesBounds) o;
        return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition;
    }
}
/**
 * Moves the sstable in oldDescriptor to a new place (with generation etc) in newDescriptor.
 * <p>
 * All components given will be moved/renamed
 *
 * @param cfs the table the sstable is imported into
 * @param oldDescriptor the current location of the sstable
 * @param newDescriptor the target location of the sstable
 * @param components the components to move/copy
 * @param copyData when {@code true}, hardlink (falling back to copy) instead of renaming
 * @return the newly opened reader over the relocated sstable
 */
public static SSTableReader moveAndOpenSSTable(ColumnFamilyStore cfs, Descriptor oldDescriptor, Descriptor newDescriptor, Set<Component> components, boolean copyData)
{
    if (!oldDescriptor.isCompatible())
        throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
                                                 oldDescriptor.getFormat().getLatestVersion(),
                                                 oldDescriptor));

    // Refuse to touch files that belong to a live reader (either location).
    boolean isLive = cfs.getLiveSSTables().stream().anyMatch(r -> r.descriptor.equals(newDescriptor)
                                                                  || r.descriptor.equals(oldDescriptor));
    if (isLive)
    {
        String message = String.format("Can't move and open a file that is already in use in the table %s -> %s", oldDescriptor, newDescriptor);
        logger.error(message);
        throw new RuntimeException(message);
    }

    // Never overwrite an existing data file at the target location.
    if (newDescriptor.fileFor(Components.DATA).exists())
    {
        String msg = String.format("File %s already exists, can't move the file there", newDescriptor.fileFor(Components.DATA));
        logger.error(msg);
        throw new RuntimeException(msg);
    }

    if (copyData)
    {
        try
        {
            logger.info("Hardlinking new SSTable {} to {}", oldDescriptor, newDescriptor);
            hardlink(oldDescriptor, newDescriptor, components);
        }
        catch (FSWriteError ex)
        {
            // Hardlinks can fail e.g. across filesystems; degrade to a full copy.
            logger.warn("Unable to hardlink new SSTable {} to {}, falling back to copying", oldDescriptor, newDescriptor, ex);
            copy(oldDescriptor, newDescriptor, components);
        }
    }
    else
    {
        logger.info("Moving new SSTable {} to {}", oldDescriptor, newDescriptor);
        rename(oldDescriptor, newDescriptor, components);
    }

    SSTableReader reader;
    try
    {
        reader = open(cfs, newDescriptor, components, cfs.metadata);
    }
    catch (Throwable t)
    {
        logger.error("Aborting import of sstables. {} was corrupt", newDescriptor);
        throw new RuntimeException(newDescriptor + " is corrupt, can't import", t);
    }
    return reader;
}
/**
 * Shuts down the read-meter sync executor and clears shared tidy state, waiting up to the given timeout.
 *
 * @throws InterruptedException if interrupted while waiting for the executor to terminate
 * @throws TimeoutException if the executor does not terminate within the timeout
 */
public static void shutdownBlocking(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
    ExecutorUtils.shutdownNowAndWait(timeout, unit, syncExecutor);
    resetTidying();
}
/**
 * @return the physical size on disk of all components for this SSTable in bytes
 */
public long bytesOnDisk()
{
    return bytesOnDisk(false);
}

/**
 * @return the total logical/uncompressed size in bytes of all components for this SSTable
 */
public long logicalBytesOnDisk()
{
    return bytesOnDisk(true);
}
/**
 * Sums the component sizes of this sstable.
 *
 * @param logical when {@code true}, count the data component's uncompressed length instead of its file size
 */
private long bytesOnDisk(boolean logical)
{
    long total = 0;
    for (Component component : components)
    {
        // Only the data file is compressible; for it the "logical" size is the uncompressed data length.
        boolean useUncompressedLength = logical && component == Components.DATA && compression;
        if (useUncompressedLength)
            total += getCompressionMetadata().dataLength;
        else
            total += descriptor.fileFor(component).length();
    }
    return total;
}
@VisibleForTesting
public void maybePersistSSTableReadMeter()
{
    // Delegates to the shared per-logical-sstable state; see GlobalTidy.maybePersistSSTableReadMeter.
    tidy.global.maybePersistSSTableReadMeter();
}
/**
 * Returns a new verifier for this sstable. Note that the reader must match the provided cfs.
 *
 * @param cfs the table this reader belongs to
 * @param outputHandler sink for verification progress and errors
 * @param isOffline whether verification runs outside a live node
 * @param options verification options
 */
public abstract IVerifier getVerifier(ColumnFamilyStore cfs,
                                      OutputHandler outputHandler,
                                      boolean isOffline,
                                      IVerifier.Options options);
/**
 * A method to be called by {@link #getPosition(PartitionPosition, Operator, boolean, SSTableReadsListener)}
 * and {@link #getRowIndexEntry(PartitionPosition, Operator, boolean, SSTableReadsListener)} methods when
 * a searched key is found. It adds a trace message and notify the provided listener.
 */
// NOTE(review): op and updateStats are unused here but are part of the protected signature —
// presumably consumed by format-specific overrides; confirm before removing.
protected void notifySelected(SSTableReadsListener.SelectionReason reason, SSTableReadsListener localListener, Operator op, boolean updateStats, AbstractRowIndexEntry entry)
{
    reason.trace(descriptor, entry);

    if (localListener != null)
        localListener.onSSTableSelected(this, reason);
}

/**
 * A method to be called by {@link #getPosition(PartitionPosition, Operator, boolean, SSTableReadsListener)}
 * and {@link #getRowIndexEntry(PartitionPosition, Operator, boolean, SSTableReadsListener)} methods when
 * a searched key is not found. It adds a trace message and notify the provided listener.
 */
protected void notifySkipped(SSTableReadsListener.SkippingReason reason, SSTableReadsListener localListener, Operator op, boolean updateStats)
{
    reason.trace(descriptor);

    if (localListener != null)
        localListener.onSSTableSkipped(this, reason);
}
/**
 * A builder of this sstable reader. It should be extended for each implementation of {@link SSTableReader} with
 * the implementation specific fields.
 *
 * @param <R> type of the reader the builder creates
 * @param <B> type of this builder
 */
public abstract static class Builder<R extends SSTableReader, B extends Builder<R, B>> extends SSTable.Builder<R, B>
{
    private long maxDataAge;
    private StatsMetadata statsMetadata;
    private OpenReason openReason;
    private SerializationHeader serializationHeader;
    private FileHandle dataFile;
    private DecoratedKey first;
    private DecoratedKey last;
    private boolean suspected;

    public Builder(Descriptor descriptor)
    {
        super(descriptor);
    }

    /** Sets the upper bound on the age of the sstable's data; must be non-negative. */
    public B setMaxDataAge(long maxDataAge)
    {
        Preconditions.checkArgument(maxDataAge >= 0);
        this.maxDataAge = maxDataAge;
        return (B) this; // self-typed builder: B is the concrete builder type
    }

    /** Sets the stats metadata; must not be null. */
    public B setStatsMetadata(StatsMetadata statsMetadata)
    {
        Preconditions.checkNotNull(statsMetadata);
        this.statsMetadata = statsMetadata;
        return (B) this;
    }

    /** Sets the reason the reader is being opened; must not be null. */
    public B setOpenReason(OpenReason openReason)
    {
        Preconditions.checkNotNull(openReason);
        this.openReason = openReason;
        return (B) this;
    }

    public B setSerializationHeader(SerializationHeader serializationHeader)
    {
        this.serializationHeader = serializationHeader;
        return (B) this;
    }

    public B setDataFile(FileHandle dataFile)
    {
        this.dataFile = dataFile;
        return (B) this;
    }

    /** Sets the first key; a retainable copy is stored so the builder does not pin transient buffers. */
    public B setFirst(DecoratedKey first)
    {
        this.first = first != null ? first.retainable() : null;
        return (B) this;
    }

    /** Sets the last key; a retainable copy is stored so the builder does not pin transient buffers. */
    public B setLast(DecoratedKey last)
    {
        this.last = last != null ? last.retainable() : null;
        return (B) this;
    }

    /** Marks the reader-to-be as suspect; {@link #build} will call {@link SSTableReader#markSuspect()}. */
    public B setSuspected(boolean suspected)
    {
        this.suspected = suspected;
        return (B) this;
    }

    public long getMaxDataAge()
    {
        return maxDataAge;
    }

    public StatsMetadata getStatsMetadata()
    {
        return statsMetadata;
    }

    public OpenReason getOpenReason()
    {
        return openReason;
    }

    public SerializationHeader getSerializationHeader()
    {
        return serializationHeader;
    }

    public FileHandle getDataFile()
    {
        return dataFile;
    }

    public DecoratedKey getFirst()
    {
        return first;
    }

    public DecoratedKey getLast()
    {
        return last;
    }

    public boolean isSuspected()
    {
        return suspected;
    }

    /** Creates the concrete reader instance from the builder's state; called by {@link #build}. */
    protected abstract R buildInternal(Owner owner);

    /**
     * Builds and initialises the reader. On any failure during setup/validation, the reader's
     * self-ref is released before rethrowing so resources are not leaked.
     */
    public R build(Owner owner, boolean validate, boolean online)
    {
        R reader = buildInternal(owner);

        try
        {
            if (isSuspected())
                reader.markSuspect();

            reader.setup(online);

            if (validate)
                reader.validate();
        }
        catch (RuntimeException | Error ex)
        {
            JVMStabilityInspector.inspectThrowable(ex);
            reader.selfRef().release();
            throw ex;
        }

        return reader;
    }
}
}