/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.LongPredicate;
import java.util.function.Function;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.net.MessageFlag;
import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.RTBoundCloser;
import org.apache.cassandra.db.transform.RTBoundValidator;
import org.apache.cassandra.db.transform.RTBoundValidator.Stage;
import org.apache.cassandra.db.transform.StoppingTransformation;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.exceptions.UnknownIndexException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.IndexNotAvailableException;
import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.SchemaProvider;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.TimeUUID;
import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.filter;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
/**
* General interface for storage-engine read commands (common to both range and
* single partition commands).
* <p>
* This contains all the information needed to do a local read.
*/
public abstract class ReadCommand extends AbstractReadQuery
{
private static final int TEST_ITERATION_DELAY_MILLIS = Integer.parseInt(System.getProperty("cassandra.test.read_iteration_delay_ms", "0"));
protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
// Expose the command currently being executed so that transitive calls can look it up.
// This is useful for a few reasons, but mainly because the CQL query is available here.
private static final FastThreadLocal<ReadCommand> COMMAND = new FastThreadLocal<>();
private final Kind kind;
private final boolean isDigestQuery;
private final boolean acceptsTransient;
// if a digest query, the version for which the digest is expected. Ignored if not a digest.
private int digestVersion;
private boolean trackWarnings;
@Nullable
private final IndexMetadata index;
protected static abstract class SelectionDeserializer
{
public abstract ReadCommand deserialize(DataInputPlus in,
int version,
boolean isDigest,
int digestVersion,
boolean acceptsTransient,
TableMetadata metadata,
int nowInSec,
ColumnFilter columnFilter,
RowFilter rowFilter,
DataLimits limits,
IndexMetadata index) throws IOException;
}
protected enum Kind
{
SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer),
PARTITION_RANGE (PartitionRangeReadCommand.selectionDeserializer);
private final SelectionDeserializer selectionDeserializer;
Kind(SelectionDeserializer selectionDeserializer)
{
this.selectionDeserializer = selectionDeserializer;
}
}
protected ReadCommand(Kind kind,
boolean isDigestQuery,
int digestVersion,
boolean acceptsTransient,
TableMetadata metadata,
int nowInSec,
ColumnFilter columnFilter,
RowFilter rowFilter,
DataLimits limits,
IndexMetadata index,
boolean trackWarnings)
{
super(metadata, nowInSec, columnFilter, rowFilter, limits);
if (acceptsTransient && isDigestQuery)
throw new IllegalArgumentException("Attempted to issue a digest response to transient replica");
this.kind = kind;
this.isDigestQuery = isDigestQuery;
this.digestVersion = digestVersion;
this.acceptsTransient = acceptsTransient;
this.index = index;
this.trackWarnings = trackWarnings;
}
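/**
* Returns the command currently executing on the calling thread, as published via the
* {@code COMMAND} thread-local by {@link #executeLocally(ReadExecutionController)}.
*
* @return the {@code ReadCommand} running on this thread, or {@code null} if none
*/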
public static ReadCommand getCommand()
{
return COMMAND.get();
}
protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
protected abstract long selectionSerializedSize(int version);
public abstract boolean isLimitedToOnePartition();
public abstract boolean isRangeRequest();
/**
* Creates a new <code>ReadCommand</code> instance with new limits.
*
* @param newLimits the new limits
* @return a new <code>ReadCommand</code> with the updated limits
*/
public abstract ReadCommand withUpdatedLimit(DataLimits newLimits);
/**
* The configured timeout for this command.
*
* @return the configured timeout for this command.
*/
public abstract long getTimeout(TimeUnit unit);
/**
* Whether this query is a digest one or not.
*
* @return Whether this query is a digest query.
*/
public boolean isDigestQuery()
{
return isDigestQuery;
}
/**
* If the query is a digest one, the requested digest version.
*
* @return the requested digest version if the query is a digest one; otherwise the
* returned value is undefined.
*/
public int digestVersion()
{
return digestVersion;
}
/**
* Sets the digest version, for when a digest for this command is requested.
* <p>
* Note that we allow setting this independently of setting the command as a digest query as
* this allows us to use the command as a carrier of the digest version even if we only call
* setIsDigestQuery on some copy of it.
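* <p>
* An illustrative sketch of that pattern (the {@code replica} variable is hypothetical):
* <pre>{@code
* ReadCommand digestCommand = command.copyAsDigestQuery(replica);
* digestCommand.setDigestVersion(digestVersion);
* }</pre>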
*
* @param digestVersion the digest version to use if this command is used for a digest query.
* @return this read command.
*/
public ReadCommand setDigestVersion(int digestVersion)
{
this.digestVersion = digestVersion;
return this;
}
/**
* @return Whether this query expects only a transient data response, or a full response
*/
public boolean acceptsTransient()
{
return acceptsTransient;
}
@Override
public void trackWarnings()
{
trackWarnings = true;
}
public boolean isTrackingWarnings()
{
return trackWarnings;
}
/**
* Index (metadata) chosen for this query. Can be null.
*
* @return index (metadata) chosen for this query
*/
@Nullable
public IndexMetadata indexMetadata()
{
return index;
}
/**
* The clustering index filter this command uses for the provided key.
* <p>
* Note that this method should only be called on a key actually queried by this command.
* In practice, it will almost always return the same filter, but for the sake of
* paging, the filter on the first key of a range command might be slightly different.
*
* @param key a partition key queried by this command.
*
* @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}.
*/
public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key);
/**
* Returns a copy of this command.
*
* @return a copy of this command.
*/
public abstract ReadCommand copy();
/**
* Returns a copy of this command with acceptsTransient set to true.
*/
public ReadCommand copyAsTransientQuery(Replica replica)
{
Preconditions.checkArgument(replica.isTransient(),
"Can't make a transient request on a full replica: " + replica);
return copyAsTransientQuery();
}
/**
* Returns a copy of this command with acceptsTransient set to true.
*/
public ReadCommand copyAsTransientQuery(Iterable<Replica> replicas)
{
if (any(replicas, Replica::isFull))
throw new IllegalArgumentException("Can't make a transient request on full replicas: " + Iterables.toString(filter(replicas, Replica::isFull)));
return copyAsTransientQuery();
}
protected abstract ReadCommand copyAsTransientQuery();
/**
* Returns a copy of this command with isDigestQuery set to true.
*/
public ReadCommand copyAsDigestQuery(Replica replica)
{
Preconditions.checkArgument(replica.isFull(),
"Can't make a digest request on a transient replica " + replica);
return copyAsDigestQuery();
}
/**
* Returns a copy of this command with isDigestQuery set to true.
*/
public ReadCommand copyAsDigestQuery(Iterable<Replica> replicas)
{
if (any(replicas, Replica::isTransient))
throw new IllegalArgumentException("Can't make a digest request on a transient replica " + Iterables.toString(filter(replicas, Replica::isTransient)));
return copyAsDigestQuery();
}
protected abstract ReadCommand copyAsDigestQuery();
protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);
/**
* Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*
* @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
*/
public abstract boolean isReversed();
@SuppressWarnings("resource")
public ReadResponse createResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi)
{
// validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
// ends are equal, and there are no dangling RT bounds in any partition.
iterator = RTBoundValidator.validate(iterator, Stage.PROCESSED, true);
return isDigestQuery()
? ReadResponse.createDigestResponse(iterator, this)
: ReadResponse.createDataResponse(iterator, this, rdi);
}
@SuppressWarnings("resource") // We don't need to close an empty iterator.
public ReadResponse createEmptyResponse()
{
UnfilteredPartitionIterator iterator = EmptyIterators.unfilteredPartition(metadata());
return isDigestQuery()
? ReadResponse.createDigestResponse(iterator, this)
: ReadResponse.createDataResponse(iterator, this, RepairedDataInfo.NO_OP_REPAIRED_DATA_INFO);
}
long indexSerializedSize(int version)
{
return null != index
? IndexMetadata.serializer.serializedSize(index, version)
: 0;
}
public Index getIndex(ColumnFamilyStore cfs)
{
return null != index
? cfs.indexManager.getIndex(index)
: null;
}
static IndexMetadata findIndex(TableMetadata table, RowFilter rowFilter)
{
if (table.indexes.isEmpty() || rowFilter.isEmpty())
return null;
ColumnFamilyStore cfs = Keyspace.openAndGetStore(table);
Index index = cfs.indexManager.getBestIndexFor(rowFilter);
return null != index
? index.getIndexMetadata()
: null;
}
/**
* If the index manager for the CFS determines that there's an applicable
* 2i that can be used to execute this command, call its (optional)
* validation method to check that nothing in this command's parameters
* violates the implementation-specific validation rules.
*/
public void maybeValidateIndex()
{
if (null != index)
IndexRegistry.obtain(metadata()).getIndex(index).validate(this);
}
/**
* Executes this command on the local host.
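* <p>
* A minimal, illustrative usage sketch ({@code command} stands in for any concrete
* {@code ReadCommand}):
* <pre>{@code
* try (ReadExecutionController controller = command.executionController();
*      UnfilteredPartitionIterator partitions = command.executeLocally(controller))
* {
*     // consume partitions; the try-with-resources closes both
* }
* }</pre>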
*
* @param executionController the execution controller spanning this command
*
* @return an iterator over the result of executing this command locally.
*/
@SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
// iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
{
long startTimeNanos = nanoTime();
COMMAND.set(this);
try
{
ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
Index index = getIndex(cfs);
Index.Searcher searcher = null;
if (index != null)
{
if (!cfs.indexManager.isIndexQueryable(index))
throw new IndexNotAvailableException(index);
searcher = index.searcherFor(this);
Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name);
}
UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
try
{
iterator = withQuerySizeTracking(iterator);
iterator = withStateTracking(iterator);
iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
// If we've used a 2ndary index, we know the result already satisfies the primary expression used, so
// no point in checking it again.
RowFilter filter = (null == searcher) ? rowFilter() : index.getPostIndexQueryFilter(rowFilter());
/*
* TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
* we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
* would be more efficient (the sooner we discard stuff we know we don't care about, the less useless
* processing we do on it).
*/
iterator = filter.filter(iterator, nowInSec());
// apply the limits/row counter; this transformation is a stopping one and will close the iterator as soon
// as the limit is reached; if that happens in the middle of an open RT, its end bound will not be included.
// If tracking repaired data, the counter is needed for overreading repaired data; otherwise we can
// optimise the case where this.limit = DataLimits.NONE, which skips an unnecessary transform
if (executionController.isTrackingRepairedStatus())
{
DataLimits.Counter limit =
limits().newCounter(nowInSec(), false, selectsFullPartition(), metadata().enforceStrictLiveness());
iterator = limit.applyTo(iterator);
// ensure that a consistent amount of repaired data is read on each replica. This causes silent
// overreading from the repaired data set, up to limits(). The extra data is not visible to
// the caller, only iterated to produce the repaired data digest.
iterator = executionController.getRepairedDataInfo().extend(iterator, limit);
}
else
{
iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
}
// because of the above, we need to append an artificial end bound if the source iterator was stopped short by a counter.
return RTBoundCloser.close(iterator);
}
catch (RuntimeException | Error e)
{
iterator.close();
throw e;
}
}
finally
{
COMMAND.set(null);
}
}
protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
public ReadExecutionController executionController(boolean trackRepairedStatus)
{
return ReadExecutionController.forCommand(this, trackRepairedStatus);
}
public ReadExecutionController executionController()
{
return ReadExecutionController.forCommand(this, false);
}
/**
* Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
* This also logs a warning or throws a TombstoneOverwhelmingException if appropriate.
*/
private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
{
class MetricRecording extends Transformation<UnfilteredRowIterator>
{
private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
private final boolean respectTombstoneThresholds = !SchemaConstants.isLocalSystemKeyspace(ReadCommand.this.metadata().keyspace);
private final boolean enforceStrictLiveness = metadata().enforceStrictLiveness();
private int liveRows = 0;
private int tombstones = 0;
private DecoratedKey currentKey;
@Override
public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
{
currentKey = iter.partitionKey();
return Transformation.apply(iter, this);
}
@Override
public Row applyToStatic(Row row)
{
return applyToRow(row);
}
@Override
public Row applyToRow(Row row)
{
boolean hasTombstones = false;
for (Cell<?> cell : row.cells())
{
if (!cell.isLive(ReadCommand.this.nowInSec()))
{
countTombstone(row.clustering());
hasTombstones = true; // avoids counting an extra tombstone if the whole row expired
}
}
if (row.hasLiveData(ReadCommand.this.nowInSec(), enforceStrictLiveness))
++liveRows;
else if (!row.primaryKeyLivenessInfo().isLive(ReadCommand.this.nowInSec())
&& row.hasDeletion(ReadCommand.this.nowInSec())
&& !hasTombstones)
{
// We're counting primary key deletions only here.
countTombstone(row.clustering());
}
return row;
}
@Override
public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
{
countTombstone(marker.clustering());
return marker;
}
private void countTombstone(ClusteringPrefix<?> clustering)
{
++tombstones;
if (tombstones > failureThreshold && respectTombstoneThresholds)
{
String query = ReadCommand.this.toCQLString();
Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
metric.tombstoneFailures.inc();
if (trackWarnings)
{
MessageParams.remove(ParamType.TOMBSTONE_WARNING);
MessageParams.add(ParamType.TOMBSTONE_FAIL, tombstones);
}
throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
}
}
@Override
public void onClose()
{
recordLatency(metric, nanoTime() - startTimeNanos);
metric.tombstoneScannedHistogram.update(tombstones);
metric.liveScannedHistogram.update(liveRows);
boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
if (warnTombstones)
{
String msg = String.format(
"Read %d live rows and %d tombstone cells for query %1.512s; token %s (see tombstone_warn_threshold)",
liveRows, tombstones, ReadCommand.this.toCQLString(), currentKey.getToken());
if (trackWarnings)
MessageParams.add(ParamType.TOMBSTONE_WARNING, tombstones);
else
ClientWarn.instance.warn(msg);
if (tombstones < failureThreshold)
{
metric.tombstoneWarnings.inc();
}
logger.warn(msg);
}
Tracing.trace("Read {} live rows and {} tombstone cells{}",
liveRows, tombstones,
(warnTombstones ? " (see tombstone_warn_threshold)" : ""));
}
}
return Transformation.apply(iter, new MetricRecording());
}
protected class CheckForAbort extends StoppingTransformation<UnfilteredRowIterator>
{
long lastChecked = 0;
protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
{
if (maybeAbort())
{
partition.close();
return null;
}
return Transformation.apply(partition, this);
}
protected Row applyToRow(Row row)
{
if (TEST_ITERATION_DELAY_MILLIS > 0)
maybeDelayForTesting();
return maybeAbort() ? null : row;
}
private boolean maybeAbort()
{
/**
* TODO: this is not a great way to abort early; why not expressly limit checks to 10ms intervals?
* The value returned by approxTime.now() is updated only every
* {@link org.apache.cassandra.utils.MonotonicClock.SampledClock.CHECK_INTERVAL_MS}, by default 2 millis. Since MonitorableImpl
* relies on approxTime, we don't need to check unless the approximate time has elapsed.
*/
if (lastChecked == approxTime.now())
return false;
lastChecked = approxTime.now();
if (isAborted())
{
stop();
return true;
}
return false;
}
private void maybeDelayForTesting()
{
if (!metadata().keyspace.startsWith("system"))
FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
}
}
private boolean shouldTrackSize(DataStorageSpec.LongBytesBound warnThresholdBytes, DataStorageSpec.LongBytesBound abortThresholdBytes)
{
return trackWarnings
&& !SchemaConstants.isSystemKeyspace(metadata().keyspace)
&& !(warnThresholdBytes == null && abortThresholdBytes == null);
}
private UnfilteredPartitionIterator withQuerySizeTracking(UnfilteredPartitionIterator iterator)
{
DataStorageSpec.LongBytesBound warnThreshold = DatabaseDescriptor.getLocalReadSizeWarnThreshold();
DataStorageSpec.LongBytesBound failThreshold = DatabaseDescriptor.getLocalReadSizeFailThreshold();
if (!shouldTrackSize(warnThreshold, failThreshold))
return iterator;
final long warnBytes = warnThreshold == null ? -1 : warnThreshold.toBytes();
final long failBytes = failThreshold == null ? -1 : failThreshold.toBytes();
class QuerySizeTracking extends Transformation<UnfilteredRowIterator>
{
private long sizeInBytes = 0;
@Override
public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
{
sizeInBytes += ObjectSizes.sizeOnHeapOf(iter.partitionKey().getKey());
return Transformation.apply(iter, this);
}
@Override
protected Row applyToStatic(Row row)
{
return applyToRow(row);
}
@Override
protected Row applyToRow(Row row)
{
addSize(row.unsharedHeapSize());
return row;
}
@Override
protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
{
addSize(marker.unsharedHeapSize());
return marker;
}
@Override
protected DeletionTime applyToDeletion(DeletionTime deletionTime)
{
addSize(deletionTime.unsharedHeapSize());
return deletionTime;
}
private void addSize(long size)
{
this.sizeInBytes += size;
if (failBytes != -1 && this.sizeInBytes >= failBytes)
{
String msg = String.format("Query %s attempted to read %d bytes but max allowed is %s; query aborted (see local_read_size_fail_threshold)",
ReadCommand.this.toCQLString(), this.sizeInBytes, failThreshold);
Tracing.trace(msg);
MessageParams.remove(ParamType.LOCAL_READ_SIZE_WARN);
MessageParams.add(ParamType.LOCAL_READ_SIZE_FAIL, this.sizeInBytes);
throw new LocalReadSizeTooLargeException(msg);
}
else if (warnBytes != -1 && this.sizeInBytes >= warnBytes)
{
MessageParams.add(ParamType.LOCAL_READ_SIZE_WARN, this.sizeInBytes);
}
}
@Override
protected void onClose()
{
ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata().id);
if (cfs != null)
cfs.metric.localReadSize.update(sizeInBytes);
}
}
iterator = Transformation.apply(iterator, new QuerySizeTracking());
return iterator;
}
protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionIterator iter)
{
return Transformation.apply(iter, new CheckForAbort());
}
/**
* Creates a message for this command.
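* <p>
* An illustrative dispatch sketch (the {@code endpoint} and {@code callback} values
* are hypothetical):
* <pre>{@code
* Message<ReadCommand> message = command.createMessage(false);
* MessagingService.instance().sendWithCallback(message, endpoint, callback);
* }</pre>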
*/
public Message<ReadCommand> createMessage(boolean trackRepairedData)
{
Message<ReadCommand> msg = trackRepairedData
? Message.outWithFlags(verb(), this, MessageFlag.CALL_BACK_ON_FAILURE, MessageFlag.TRACK_REPAIRED_DATA)
: Message.outWithFlag(verb(), this, MessageFlag.CALL_BACK_ON_FAILURE);
if (trackWarnings)
msg = msg.withFlag(MessageFlag.TRACK_WARNINGS);
return msg;
}
public abstract Verb verb();
protected abstract void appendCQLWhereClause(StringBuilder sb);
// Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
// can save us some bandwidth, and avoids making us throw a TombstoneOverwhelmingException for purgeable tombstones (which
// are to some extent an artefact of compaction lagging behind, and hence counting them is somewhat unintuitive).
protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator,
ColumnFamilyStore cfs,
ReadExecutionController controller)
{
class WithoutPurgeableTombstones extends PurgeFunction
{
public WithoutPurgeableTombstones()
{
super(nowInSec(), cfs.gcBefore(nowInSec()), controller.oldestUnrepairedTombstone(),
cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
iterator.metadata().enforceStrictLiveness());
}
protected LongPredicate getPurgeEvaluator()
{
return time -> true;
}
}
return Transformation.apply(iterator, new WithoutPurgeableTombstones());
}
/**
* Return the queried token(s) for logging
*/
public abstract String loggableTokens();
// Monitorable interface
public String name()
{
return toCQLString();
}
@SuppressWarnings("resource") // resultant iterators are closed by their callers
InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view, ReadExecutionController controller)
{
final BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge =
(unfilteredRowIterators, repairedDataInfo) -> {
UnfilteredRowIterator repaired = UnfilteredRowIterators.merge(unfilteredRowIterators);
return repairedDataInfo.withRepairedDataInfo(repaired);
};
// For single partition reads, after reading up to the command's DataLimit nothing extra is required.
// The merged & repaired row iterator will be consumed until it's exhausted or the RepairedDataInfo's
// internal counter is satisfied
final Function<UnfilteredRowIterator, UnfilteredPartitionIterator> postLimitPartitions =
(rows) -> EmptyIterators.unfilteredPartition(metadata());
return new InputCollector<>(view, controller, merge, postLimitPartitions);
}
@SuppressWarnings("resource") // resultant iterators are closed by their callers
InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view, ReadExecutionController controller)
{
final BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge =
(unfilteredPartitionIterators, repairedDataInfo) -> {
UnfilteredPartitionIterator repaired = UnfilteredPartitionIterators.merge(unfilteredPartitionIterators,
NOOP);
return repairedDataInfo.withRepairedDataInfo(repaired);
};
// Uses identity function to provide additional partitions to be consumed after the command's
// DataLimits are satisfied. The input to the function will be the iterator of merged, repaired partitions
// which we'll keep reading until the RepairedDataInfo's internal counter is satisfied.
return new InputCollector<>(view, controller, merge, Function.identity());
}
/**
* Handles the collation of the unfiltered row or partition iterators that comprise the
* input for a query. Separates them according to repaired status and, if repaired
* status is being tracked, handles the merging of the repaired iterators and their
* wrapping in a digest generator.
*
* Intentionally not AutoCloseable so we don't mistakenly use this in try-with-resources
* blocks, as that would prematurely close the underlying iterators.
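* <p>
* An illustrative sketch of the intended call sequence (the view/controller setup and
* the hypothetical {@code makeIterator} helper are elided):
* <pre>{@code
* InputCollector<UnfilteredRowIterator> collector = command.iteratorsForPartition(view, controller);
* for (SSTableReader sstable : view.sstables)
*     collector.addSSTableIterator(sstable, makeIterator(sstable));
* List<UnfilteredRowIterator> inputs = collector.finalizeIterators(cfs, nowInSec, oldestUnrepairedTombstone);
* }</pre>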
*/
static class InputCollector<T extends AutoCloseable>
{
final RepairedDataInfo repairedDataInfo;
private final boolean isTrackingRepairedStatus;
Set<SSTableReader> repairedSSTables;
BiFunction<List<T>, RepairedDataInfo, T> repairedMerger;
Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions;
List<T> repairedIters;
List<T> unrepairedIters;
InputCollector(ColumnFamilyStore.ViewFragment view,
ReadExecutionController controller,
BiFunction<List<T>, RepairedDataInfo, T> repairedMerger,
Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions)
{
this.repairedDataInfo = controller.getRepairedDataInfo();
this.isTrackingRepairedStatus = controller.isTrackingRepairedStatus();
if (isTrackingRepairedStatus)
{
for (SSTableReader sstable : view.sstables)
{
if (considerRepairedForTracking(sstable))
{
if (repairedSSTables == null)
repairedSSTables = Sets.newHashSetWithExpectedSize(view.sstables.size());
repairedSSTables.add(sstable);
}
}
}
if (repairedSSTables == null)
{
repairedIters = Collections.emptyList();
unrepairedIters = new ArrayList<>(view.sstables.size());
}
else
{
repairedIters = new ArrayList<>(repairedSSTables.size());
// when we're done collating, we'll merge the repaired iters and add the
// result to the unrepaired list, so size that list accordingly
unrepairedIters = new ArrayList<>((view.sstables.size() - repairedSSTables.size()) + Iterables.size(view.memtables) + 1);
}
this.repairedMerger = repairedMerger;
this.postLimitAdditionalPartitions = postLimitAdditionalPartitions;
}
void addMemtableIterator(T iter)
{
unrepairedIters.add(iter);
}
void addSSTableIterator(SSTableReader sstable, T iter)
{
if (repairedSSTables != null && repairedSSTables.contains(sstable))
repairedIters.add(iter);
else
unrepairedIters.add(iter);
}
@SuppressWarnings("resource") // the returned iterators are closed by the caller
List<T> finalizeIterators(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone)
{
if (repairedIters.isEmpty())
return unrepairedIters;
// merge the repaired data before returning, wrapping in a digest generator
repairedDataInfo.prepare(cfs, nowInSec, oldestUnrepairedTombstone);
T repairedIter = repairedMerger.apply(repairedIters, repairedDataInfo);
repairedDataInfo.finalize(postLimitAdditionalPartitions.apply(repairedIter));
unrepairedIters.add(repairedIter);
return unrepairedIters;
}
boolean isEmpty()
{
return repairedIters.isEmpty() && unrepairedIters.isEmpty();
}
// For tracking purposes we consider data repaired if the sstable is either:
// * marked repaired
// * marked pending, but the local session has been committed. This reduces the window
// whereby the tracking is affected by compaction backlog causing repaired sstables to
// remain in the pending state
// If an sstable is involved in a pending repair which is not yet committed, we mark the
// repaired data info inconclusive, as the same data on other replicas may be in a
// slightly different state.
private boolean considerRepairedForTracking(SSTableReader sstable)
{
if (!isTrackingRepairedStatus)
return false;
TimeUUID pendingRepair = sstable.getPendingRepair();
if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR)
{
if (ActiveRepairService.instance.consistent.local.isSessionFinalized(pendingRepair))
return true;
// In the edge case where compaction is backed up long enough for the session to
// timeout and be purged by LocalSessions::cleanup, consider the sstable unrepaired
// as it will be marked unrepaired when compaction catches up
if (!ActiveRepairService.instance.consistent.local.sessionExists(pendingRepair))
return false;
repairedDataInfo.markInconclusive();
}
return sstable.isRepaired();
}
void markInconclusive()
{
repairedDataInfo.markInconclusive();
}
public void close() throws Exception
{
FBUtilities.closeAll(unrepairedIters);
FBUtilities.closeAll(repairedIters);
}
}
@VisibleForTesting
public static class Serializer implements IVersionedSerializer<ReadCommand>
{
private final SchemaProvider schema;
public Serializer()
{
this(Schema.instance);
}
@VisibleForTesting
public Serializer(SchemaProvider schema)
{
this.schema = Objects.requireNonNull(schema, "schema");
}
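// Layout of the flags byte written after the kind byte:
//   0x01 - digest query
//   0x02 - thrift (legacy; no longer written, see isForThrift below)
//   0x04 - index metadata present
//   0x08 - accepts transient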
private static int digestFlag(boolean isDigest)
{
return isDigest ? 0x01 : 0;
}
private static boolean isDigest(int flags)
{
return (flags & 0x01) != 0;
}
private static boolean acceptsTransient(int flags)
{
return (flags & 0x08) != 0;
}
private static int acceptsTransientFlag(boolean acceptsTransient)
{
return acceptsTransient ? 0x08 : 0;
}
// We don't set this flag anymore, but still check whether we receive a
// command with it set, in case someone is using thrift in a mixed 3.0/4.0+
// cluster (which is unsupported). This is also a reminder not to
// re-use this flag until we drop 3.0/3.X compatibility (since it's
// used by those releases for thrift and would thus confuse things)
private static boolean isForThrift(int flags)
{
return (flags & 0x02) != 0;
}
private static int indexFlag(boolean hasIndex)
{
return hasIndex ? 0x04 : 0;
}
private static boolean hasIndex(int flags)
{
return (flags & 0x04) != 0;
}
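// Serialized layout: kind (1 byte), flags (1 byte), optional digest version (unsigned vint),
// table id, nowInSec (int), column filter, row filter, limits, optional index metadata,
// then the kind-specific selection (single partition key(s) or partition range).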
public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
{
out.writeByte(command.kind.ordinal());
out.writeByte(
digestFlag(command.isDigestQuery())
| indexFlag(null != command.indexMetadata())
| acceptsTransientFlag(command.acceptsTransient())
);
if (command.isDigestQuery())
out.writeUnsignedVInt(command.digestVersion());
command.metadata().id.serialize(out);
out.writeInt(command.nowInSec());
ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
RowFilter.serializer.serialize(command.rowFilter(), out, version);
DataLimits.serializer.serialize(command.limits(), out, version, command.metadata().comparator);
if (null != command.index)
IndexMetadata.serializer.serialize(command.index, out, version);
command.serializeSelection(out, version);
}
public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
{
Kind kind = Kind.values()[in.readByte()];
int flags = in.readByte();
boolean isDigest = isDigest(flags);
boolean acceptsTransient = acceptsTransient(flags);
// Shouldn't happen, or it's a user error (see comment above), but
// better to complain loudly than to do the wrong thing.
if (isForThrift(flags))
throw new IllegalStateException("Received a command with the thrift flag set. "
+ "This means thrift is in use in a mixed 3.0/3.X and 4.0+ cluster, "
+ "which is unsupported. Make sure to stop using thrift before "
+ "upgrading to 4.0");
boolean hasIndex = hasIndex(flags);
int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
TableMetadata metadata = schema.getExistingTableMetadata(TableId.deserialize(in));
int nowInSec = in.readInt();
ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata);
IndexMetadata index = hasIndex ? deserializeIndexMetadata(in, version, metadata) : null;
return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index);
}
private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException
{
try
{
return IndexMetadata.serializer.deserialize(in, version, metadata);
}
catch (UnknownIndexException e)
{
logger.info("Couldn't find a defined index on {}.{} with the id {}. " +
"If an index was just created, this is likely due to the schema not " +
"being fully propagated. Local read will proceed without using the " +
"index. Please wait for schema agreement after index creation.",
metadata.keyspace, metadata.name, e.indexId);
return null;
}
}
public long serializedSize(ReadCommand command, int version)
{
return 2 // kind + flags
+ (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
+ command.metadata().id.serializedSize()
+ TypeSizes.sizeof(command.nowInSec())
+ ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
+ RowFilter.serializer.serializedSize(command.rowFilter(), version)
+ DataLimits.serializer.serializedSize(command.limits(), version, command.metadata().comparator)
+ command.selectionSerializedSize(version)
+ command.indexSerializedSize(version);
}
}
}