| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.db; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.BiFunction; |
| import java.util.function.Function; |
| import java.util.function.LongPredicate; |
| import java.util.stream.Collectors; |
| |
| import javax.annotation.Nullable; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Sets; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import accord.primitives.Seekable; |
| import accord.utils.Invariants; |
| |
| import org.apache.cassandra.config.CassandraRelevantProperties; |
| import org.apache.cassandra.config.Config; |
| import org.apache.cassandra.config.DataStorageSpec; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.cql3.CqlBuilder; |
| import org.apache.cassandra.cql3.statements.SelectOptions; |
| import org.apache.cassandra.db.filter.ClusteringIndexFilter; |
| import org.apache.cassandra.db.filter.ColumnFilter; |
| import org.apache.cassandra.db.filter.DataLimits; |
| import org.apache.cassandra.db.filter.LocalReadSizeTooLargeException; |
| import org.apache.cassandra.db.filter.RowFilter; |
| import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; |
| import org.apache.cassandra.db.partitions.PurgeFunction; |
| import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; |
| import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; |
| import org.apache.cassandra.db.rows.Cell; |
| import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker; |
| import org.apache.cassandra.db.rows.RangeTombstoneMarker; |
| import org.apache.cassandra.db.rows.Row; |
| import org.apache.cassandra.db.rows.Rows; |
| import org.apache.cassandra.db.rows.UnfilteredRowIterator; |
| import org.apache.cassandra.db.rows.UnfilteredRowIterators; |
| import org.apache.cassandra.db.transform.BasePartitions; |
| import org.apache.cassandra.db.transform.BaseRows; |
| import org.apache.cassandra.db.transform.RTBoundCloser; |
| import org.apache.cassandra.db.transform.RTBoundValidator; |
| import org.apache.cassandra.db.transform.RTBoundValidator.Stage; |
| import org.apache.cassandra.db.transform.StoppingTransformation; |
| import org.apache.cassandra.db.transform.Transformation; |
| import org.apache.cassandra.exceptions.CoordinatorBehindException; |
| import org.apache.cassandra.exceptions.QueryCancelledException; |
| import org.apache.cassandra.exceptions.UnknownIndexException; |
| import org.apache.cassandra.exceptions.UnknownTableException; |
| import org.apache.cassandra.index.Index; |
| import org.apache.cassandra.index.IndexRegistry; |
| import org.apache.cassandra.io.IVersionedSerializer; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.io.util.DataInputPlus; |
| import org.apache.cassandra.io.util.DataOutputPlus; |
| import org.apache.cassandra.locator.Replica; |
| import org.apache.cassandra.metrics.TCMMetrics; |
| import org.apache.cassandra.metrics.TableMetrics; |
| import org.apache.cassandra.net.Message; |
| import org.apache.cassandra.net.MessageFlag; |
| import org.apache.cassandra.net.MessagingService; |
| import org.apache.cassandra.net.ParamType; |
| import org.apache.cassandra.net.Verb; |
| import org.apache.cassandra.schema.IndexMetadata; |
| import org.apache.cassandra.schema.Schema; |
| import org.apache.cassandra.schema.SchemaConstants; |
| import org.apache.cassandra.schema.SchemaProvider; |
| import org.apache.cassandra.schema.TableId; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.service.ActiveRepairService; |
| import org.apache.cassandra.service.ClientWarn; |
| import org.apache.cassandra.service.accord.serializers.TableMetadatas; |
| import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter; |
| import org.apache.cassandra.tcm.ClusterMetadata; |
| import org.apache.cassandra.tcm.Epoch; |
| import org.apache.cassandra.tracing.Tracing; |
| import org.apache.cassandra.transport.Dispatcher; |
| import org.apache.cassandra.utils.CassandraUInt; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.NoSpamLogger; |
| import org.apache.cassandra.utils.ObjectSizes; |
| import org.apache.cassandra.utils.TimeUUID; |
| |
| import io.netty.util.concurrent.FastThreadLocal; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.collect.Iterables.any; |
| import static com.google.common.collect.Iterables.filter; |
| import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP; |
| import static org.apache.cassandra.utils.Clock.Global.nanoTime; |
| import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; |
| |
| /** |
| * General interface for storage-engine read commands (common to both range and |
| * single partition commands). |
| * <p> |
| * This contains all the informations needed to do a local read. |
| */ |
| public abstract class ReadCommand extends AbstractReadQuery |
| { |
    // Artificial per-row/per-partition delay (millis) used by tests to simulate slow reads; 0 disables it
    // (see maybeSlowDownForTesting / DelayInjector).
    private static final int TEST_ITERATION_DELAY_MILLIS = CassandraRelevantProperties.TEST_READ_ITERATION_DELAY_MS.getInt();

    protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
    // Shared serializer for all ReadCommand kinds; dispatches on Kind during deserialization.
    public static final Serializer serializer = new Serializer();
| |
    /**
     * Controls whether executing this read non-transactionally must first verify that the read could not
     * conflict with a transaction system managing the underlying data (checked in executeLocally via
     * {@code ConsensusRequestRouter.validateSafeToReadNonTransactionally}).
     */
    public enum PotentialTxnConflicts
    {
        /**
         * Check for and raise an error if this operation should have been transactionally managed. For use
         * by queries that aren't issued by a transaction system managing potential conflicts in contexts where
         * conflicts would be a problem.
         */
        DISALLOW(false),

        /**
         * Don't check or raise an error if this operation could conflict with transactions. For use when the thing
         * being managed doesn't support transactions or the operation is being done by a transaction that is already
         * managing any potential conflicts.
         */
        ALLOW(true);

        // True when the read may proceed without the transactional-conflict check.
        public final boolean allowed;

        PotentialTxnConflicts(boolean allowed)
        {
            this.allowed = allowed;
        }
    }
| |
    // Expose the active command running so transitive calls can lookup this command.
    // This is useful for a few reasons, but mainly because the CQL query is here.
    // Set/cleared around executeLocally; read via getCommand().
    private static final FastThreadLocal<ReadCommand> COMMAND = new FastThreadLocal<>();

    private final Kind kind;

    private final boolean isDigestQuery;
    private final boolean acceptsTransient;
    // Epoch of the cluster metadata at the time this command was serialized (see serializedAtEpoch()).
    private final Epoch serializedAtEpoch;
    private final PotentialTxnConflicts potentialTxnConflicts;

    // if a digest query, the version for which the digest is expected. Ignored if not a digest.
    private int digestVersion;

    // Whether size/tombstone warnings are tracked and relayed to the coordinator/client.
    // Mutable: can be switched on after construction via trackWarnings().
    private boolean trackWarnings;

    protected final DataRange dataRange;

    // Index query plan chosen for this command; null when no index applies.
    @Nullable
    private final Index.QueryPlan indexQueryPlan;
| |
    /**
     * Deserializes the kind-specific "selection" portion of a command (the part that differs between
     * single-partition and partition-range reads), given the common fields already decoded by the
     * shared serializer.
     */
    protected static abstract class SelectionDeserializer
    {
        public abstract ReadCommand deserialize(DataInputPlus in,
                                                int version,
                                                Epoch serializedAtEpoch,
                                                boolean isDigest,
                                                int digestVersion,
                                                boolean acceptsTransient,
                                                PotentialTxnConflicts potentialTxnConflicts,
                                                TableMetadata metadata,
                                                long nowInSec,
                                                ColumnFilter columnFilter,
                                                RowFilter rowFilter,
                                                DataLimits limits,
                                                Index.QueryPlan indexQueryPlan) throws IOException;
    }
| |
    /**
     * Discriminates the two concrete command shapes and carries their deserializers.
     * The second (Accord) deserializer is keyed by a {@code Seekable}; PARTITION_RANGE ignores
     * the key and reuses its plain deserializer.
     */
    protected enum Kind
    {
        SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer, SinglePartitionReadCommand.accordSelectionDeserializer),
        PARTITION_RANGE (PartitionRangeReadCommand.selectionDeserializer, ignore -> PartitionRangeReadCommand.selectionDeserializer);

        private final SelectionDeserializer selectionDeserializer;
        private final Function<Seekable, SelectionDeserializer> accordSelectionDeserializer;

        Kind(SelectionDeserializer selectionDeserializer, Function<Seekable, SelectionDeserializer> accordSelectionDeserializer)
        {
            this.selectionDeserializer = selectionDeserializer;
            this.accordSelectionDeserializer = accordSelectionDeserializer;
        }
    }
| |
    /**
     * @param serializedAtEpoch epoch of the cluster metadata when this command was (de)serialized
     * @param isDigestQuery whether a digest rather than full data is requested; incompatible with
     *        {@code acceptsTransient} (transient replicas cannot answer digests)
     * @param digestVersion digest version to use if {@code isDigestQuery}
     * @param potentialTxnConflicts whether a transactional-conflict check is required at execution
     * @throws IllegalArgumentException if both a digest response and a transient response are requested
     */
    protected ReadCommand(Epoch serializedAtEpoch,
                          Kind kind,
                          boolean isDigestQuery,
                          int digestVersion,
                          boolean acceptsTransient,
                          PotentialTxnConflicts potentialTxnConflicts,
                          TableMetadata metadata,
                          long nowInSec,
                          ColumnFilter columnFilter,
                          RowFilter rowFilter,
                          DataLimits limits,
                          Index.QueryPlan indexQueryPlan,
                          boolean trackWarnings,
                          DataRange dataRange)
    {
        super(metadata, nowInSec, columnFilter, rowFilter, limits);
        if (acceptsTransient && isDigestQuery)
            throw new IllegalArgumentException("Attempted to issue a digest response to transient replica");

        this.kind = kind;
        this.isDigestQuery = isDigestQuery;
        this.digestVersion = digestVersion;
        this.acceptsTransient = acceptsTransient;
        this.indexQueryPlan = indexQueryPlan;
        this.potentialTxnConflicts = potentialTxnConflicts;
        this.trackWarnings = trackWarnings;
        this.serializedAtEpoch = serializedAtEpoch;
        this.dataRange = dataRange;
    }
| |
    /**
     * The command currently executing on this thread (set for the duration of executeLocally),
     * or {@code null} if none.
     */
    public static ReadCommand getCommand()
    {
        return COMMAND.get();
    }
| |
    // Serializes the kind-specific "selection" portion of the command (counterpart of SelectionDeserializer).
    protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
    // Variant that omits the partition key — presumably for paths where the key is carried
    // separately (TODO confirm against the Accord serializer usage).
    protected abstract void serializeSelectionWithoutKey(DataOutputPlus out, int version) throws IOException;
    // Serialized size of the selection portion, matching serializeSelection.
    protected abstract long selectionSerializedSize(int version);

    /** Whether this command can touch at most one partition (true for single-partition reads). */
    public abstract boolean isLimitedToOnePartition();

    /** Whether this command is a partition-range request. */
    public abstract boolean isRangeRequest();
| |
    /**
     * Creates a new <code>ReadCommand</code> instance with new limits.
     *
     * @param newLimits the new limits
     * @return a new <code>ReadCommand</code> with the updated limits
     */
    public abstract ReadCommand withUpdatedLimit(DataLimits newLimits);

    /**
     * The configured timeout for this command.
     *
     * @param unit the unit in which to express the timeout
     * @return the configured timeout for this command.
     */
    public abstract long getTimeout(TimeUnit unit);
| |
    /**
     * Whether this query is a digest one or not.
     *
     * @return Whether this query is a digest query.
     */
    public boolean isDigestQuery()
    {
        return isDigestQuery;
    }

    /**
     * The cluster metadata epoch at which this read command was serialized.
     *
     * @return the epoch captured when serializing this command.
     */
    public Epoch serializedAtEpoch()
    {
        return serializedAtEpoch;
    }

    /**
     * If the query is a digest one, the requested digest version.
     *
     * @return the requested digest version if the query is a digest. Otherwise, this can return
     * anything.
     */
    public int digestVersion()
    {
        return digestVersion;
    }
| |
    /**
     * Sets the digest version, for when digest for that command is requested.
     * <p>
     * Note that we allow setting this independently of setting the command as a digest query as
     * this allows us to use the command as a carrier of the digest version even if we only call
     * setIsDigestQuery on some copy of it.
     *
     * @param digestVersion the version to use for the digest if this command is used for a digest query.
     * @return this read command.
     */
    public ReadCommand setDigestVersion(int digestVersion)
    {
        this.digestVersion = digestVersion;
        return this;
    }
| |
    /**
     * @return Whether this query expects only a transient data response, or a full response
     */
    public boolean acceptsTransient()
    {
        return acceptsTransient;
    }

    /** Enables warning tracking for this command (one-way switch; see trackWarnings field). */
    @Override
    public void trackWarnings()
    {
        trackWarnings = true;
    }

    /** @return whether warnings (tombstones, read sizes) are being tracked for this command. */
    public boolean isTrackingWarnings()
    {
        return trackWarnings;
    }
| |
| /** |
| * Index query plan chosen for this query. Can be null. |
| * |
| * @return index query plan chosen for this query |
| */ |
| @Override |
| @Nullable |
| public Index.QueryPlan indexQueryPlan() |
| { |
| return indexQueryPlan; |
| } |
| |
| @Override |
| public boolean isTopK() |
| { |
| return indexQueryPlan != null && indexQueryPlan.isTopK(); |
| } |
| |
| @VisibleForTesting |
| public Index.Searcher indexSearcher() |
| { |
| return indexQueryPlan == null ? null : indexQueryPlan.searcherFor(this); |
| } |
| |
    /**
     * The clustering index filter this command to use for the provided key.
     * <p>
     * Note that that method should only be called on a key actually queried by this command
     * and in practice, this will almost always return the same filter, but for the sake of
     * paging, the filter on the first key of a range command might be slightly different.
     *
     * @param key a partition key queried by this command.
     *
     * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}.
     */
    public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key);

    /** @return the data range this command covers. */
    @Override
    public DataRange dataRange()
    {
        return dataRange;
    }
| |
| /** |
| * Returns a copy of this command. |
| * |
| * @return a copy of this command. |
| */ |
| public abstract ReadCommand copy(); |
| |
| /** |
| * Returns a copy of this command with acceptsTransient set to true. |
| */ |
| public ReadCommand copyAsTransientQuery(Replica replica) |
| { |
| checkArgument(replica.isTransient(), |
| "Can't make a transient request on a full replica: " + replica); |
| return copyAsTransientQuery(); |
| } |
| |
| /** |
| * Returns a copy of this command with acceptsTransient set to true. |
| */ |
| public ReadCommand copyAsTransientQuery(Iterable<Replica> replicas) |
| { |
| if (any(replicas, Replica::isFull)) |
| throw new IllegalArgumentException("Can't make a transient request on full replicas: " + Iterables.toString(filter(replicas, Replica::isFull))); |
| return copyAsTransientQuery(); |
| } |
| |
| protected abstract ReadCommand copyAsTransientQuery(); |
| |
| /** |
| * Returns a copy of this command with isDigestQuery set to true. |
| */ |
| public ReadCommand copyAsDigestQuery(Replica replica) |
| { |
| checkArgument(replica.isFull(), |
| "Can't make a digest request on a transient replica " + replica); |
| return copyAsDigestQuery(); |
| } |
| |
| /** |
| * Returns a copy of this command with isDigestQuery set to true. |
| */ |
| public ReadCommand copyAsDigestQuery(Iterable<Replica> replicas) |
| { |
| if (any(replicas, Replica::isTransient)) |
| throw new IllegalArgumentException("Can't make a digest request on a transient replica " + Iterables.toString(filter(replicas, Replica::isTransient))); |
| |
| return copyAsDigestQuery(); |
| } |
| |
| protected abstract ReadCommand copyAsDigestQuery(); |
| |
    // Performs the raw storage read (memtables + sstables) for this command, without any of the
    // post-processing transformations applied by executeLocally.
    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);

    /**
     * Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
     *
     * @return whether the underlying {@code ClusteringIndexFilter} is reversed or not.
     */
    public abstract boolean isReversed();
| |
| public ReadResponse createResponse(UnfilteredPartitionIterator iterator, RepairedDataInfo rdi) |
| { |
| // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both |
| // ends equal, and there are no dangling RT bound in any partition. |
| iterator = RTBoundValidator.validate(iterator, Stage.PROCESSED, true); |
| |
| return isDigestQuery() |
| ? ReadResponse.createDigestResponse(iterator, this) |
| : ReadResponse.createDataResponse(iterator, this, rdi); |
| } |
| |
| public ReadResponse createEmptyResponse() |
| { |
| UnfilteredPartitionIterator iterator = EmptyIterators.unfilteredPartition(metadata()); |
| |
| return isDigestQuery() |
| ? ReadResponse.createDigestResponse(iterator, this) |
| : ReadResponse.createDataResponse(iterator, this, RepairedDataInfo.NO_OP_REPAIRED_DATA_INFO); |
| } |
| |
| long indexSerializedSize(int version) |
| { |
| return null != indexQueryPlan |
| ? IndexMetadata.serializer.serializedSize(indexQueryPlan.getFirst().getIndexMetadata(), version) |
| : 0; |
| } |
| |
| static Index.QueryPlan findIndexQueryPlan(TableMetadata table, RowFilter rowFilter) |
| { |
| if (table.indexes.isEmpty() || rowFilter.isEmpty()) |
| return null; |
| |
| ColumnFamilyStore cfs = Keyspace.openAndGetStore(table); |
| |
| return cfs.indexManager.getBestIndexQueryPlanFor(rowFilter); |
| } |
| |
| /** |
| * If the index manager for the CFS determines that there's an applicable |
| * 2i that can be used to execute this command, call its (optional) |
| * validation method to check that nothing in this command's parameters |
| * violates the implementation specific validation rules. |
| */ |
| @Override |
| public void maybeValidateIndex(SelectOptions selectOptions) |
| { |
| if (null != indexQueryPlan) |
| { |
| indexQueryPlan.validate(this); |
| } |
| |
| selectOptions.validate(metadata(), IndexRegistry.obtain(metadata()), indexQueryPlan()); |
| } |
| |
    /**
     * Executes this command on the local host.
     *
     * @param executionController the execution controller spanning this command
     *
     * @return an iterator over the result of executing this command locally.
     */
    // Resources opened during execution are released by closing the returned iterator: each wrapping
    // transformation closes the original result iterator it was applied to.
    public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
    {
        return executeLocally(executionController, null);
    }
| |
    // ClusterMetadata is null on startup when there are local reads from system tables before it's initialized
    public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, @Nullable ClusterMetadata cm)
    {
        long startTimeNanos = nanoTime();

        // Publish this command for the duration of execution so transitive calls can find it (see getCommand()).
        COMMAND.set(this);
        try
        {
            ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
            // Fail fast if this non-transactional read might conflict with transactionally managed data.
            if (!potentialTxnConflicts.allowed)
                ConsensusRequestRouter.validateSafeToReadNonTransactionally(this, cm);
            Index.QueryPlan indexQueryPlan = indexQueryPlan();

            Index.Searcher searcher = null;
            if (indexQueryPlan != null)
            {
                cfs.indexManager.checkQueryability(indexQueryPlan);

                searcher = indexQueryPlan.searcherFor(this);
                Tracing.trace("Executing read on {}.{} using index{} {}",
                              cfs.metadata.keyspace,
                              cfs.metadata.name,
                              indexQueryPlan.getIndexes().size() == 1 ? "" : "es",
                              indexQueryPlan.getIndexes()
                                            .stream()
                                            .map(i -> i.getIndexMetadata().name)
                                            .collect(Collectors.joining(",")));
            }

            // Read either via the index searcher (when an index applies) or directly from storage.
            UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);
            iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);

            try
            {
                // Layer the post-processing transformations; the order is significant as each wrapper
                // observes the output of the previous one.
                iterator = withQuerySizeTracking(iterator);
                iterator = maybeSlowDownForTesting(iterator);
                iterator = withQueryCancellation(iterator);
                iterator = maybeRecordPurgeableTombstones(iterator, cfs);
                iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
                iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);

                // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
                // no point in checking it again.
                RowFilter filter = (null == searcher) ? rowFilter() : indexQueryPlan.postIndexQueryFilter();

                /*
                 * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
                 * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
                 * would be more efficient (the sooner we discard stuff we know we don't care, the less useless
                 * processing we do on it).
                 */
                iterator = filter.filter(iterator, nowInSec());

                // apply the limits/row counter; this transformation is stopping and would close the iterator as soon
                // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
                // If tracking repaired data, the counter is needed for overreading repaired data, otherwise we can
                // optimise the case where this.limit = DataLimits.NONE which skips an unnecessary transform
                if (executionController.isTrackingRepairedStatus())
                {
                    DataLimits.Counter limit =
                    limits().newCounter(nowInSec(), false, selectsFullPartition(), metadata().enforceStrictLiveness());
                    iterator = limit.applyTo(iterator);
                    // ensure that a consistent amount of repaired data is read on each replica. This causes silent
                    // overreading from the repaired data set, up to limits(). The extra data is not visible to
                    // the caller, only iterated to produce the repaired data digest.
                    iterator = executionController.getRepairedDataInfo().extend(iterator, limit);
                }
                else
                {
                    iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());
                }

                // because of the above, we need to append an artificial end bound if the source iterator was stopped short by a counter.
                return RTBoundCloser.close(iterator);
            }
            catch (RuntimeException | Error e)
            {
                // On failure we must close the (possibly wrapped) iterator ourselves, since it is never returned.
                iterator.close();
                throw e;
            }
        }
        finally
        {
            COMMAND.set(null);
        }
    }
| |
| protected abstract void recordLatency(TableMetrics metric, long latencyNanos); |
| |
| public ReadExecutionController executionController(boolean trackRepairedStatus) |
| { |
| return ReadExecutionController.forCommand(this, trackRepairedStatus); |
| } |
| |
| public ReadExecutionController executionController() |
| { |
| return ReadExecutionController.forCommand(this, false); |
| } |
| |
| public PotentialTxnConflicts potentialTxnConflicts() |
| { |
| return potentialTxnConflicts; |
| } |
| |
    /**
     * Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
     * It also logs a warning or throws a {@link TombstoneOverwhelmingException} when the configured
     * tombstone thresholds are exceeded.
     */
    private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
    {
        class MetricRecording extends Transformation<UnfilteredRowIterator>
        {
            private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
            private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();

            // Local system keyspaces are exempt from tombstone warn/fail thresholds.
            private final boolean respectTombstoneThresholds = !SchemaConstants.isLocalSystemKeyspace(ReadCommand.this.metadata().keyspace);
            private final boolean enforceStrictLiveness = metadata().enforceStrictLiveness();

            // Running totals across all partitions; lastReported* let onPartitionClose compute per-partition deltas.
            private int liveRows = 0;
            private int lastReportedLiveRows = 0;
            private int tombstones = 0;
            private int lastReportedTombstones = 0;

            private DecoratedKey currentKey;

            @Override
            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
            {
                currentKey = iter.partitionKey();
                return Transformation.apply(iter, this);
            }

            @Override
            public Row applyToStatic(Row row)
            {
                // Static rows are counted exactly like regular rows.
                return applyToRow(row);
            }

            @Override
            public Row applyToRow(Row row)
            {
                boolean hasTombstones = false;
                for (Cell<?> cell : row.cells())
                {
                    if (!cell.isLive(ReadCommand.this.nowInSec()))
                    {
                        countTombstone(row.clustering());
                        hasTombstones = true; // allows to avoid counting an extra tombstone if the whole row expired
                    }
                }

                if (row.hasLiveData(ReadCommand.this.nowInSec(), enforceStrictLiveness))
                    ++liveRows;
                else if (!row.primaryKeyLivenessInfo().isLive(ReadCommand.this.nowInSec())
                         && row.hasDeletion(ReadCommand.this.nowInSec())
                         && !hasTombstones)
                {
                    // We're counting primary key deletions only here.
                    countTombstone(row.clustering());
                }

                return row;
            }

            @Override
            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
            {
                countTombstone(marker.clustering());
                return marker;
            }

            // Bumps the tombstone count and aborts the query once the failure threshold is crossed.
            private void countTombstone(ClusteringPrefix<?> clustering)
            {
                ++tombstones;
                if (tombstones > failureThreshold && respectTombstoneThresholds)
                {
                    String query = ReadCommand.this.toCQLString();
                    Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
                    metric.tombstoneFailures.inc();
                    if (trackWarnings)
                    {
                        // A failure supersedes any previously recorded warning for this message.
                        MessageParams.remove(ParamType.TOMBSTONE_WARNING);
                        MessageParams.add(ParamType.TOMBSTONE_FAIL, tombstones);
                    }
                    throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
                }
            }

            @Override
            protected void onPartitionClose()
            {
                // Record per-partition deltas in the "top partitions" samplers.
                int lr = liveRows - lastReportedLiveRows;
                int ts = tombstones - lastReportedTombstones;

                if (lr > 0)
                    metric.topReadPartitionRowCount.addSample(currentKey.getKey(), lr);

                if (ts > 0)
                    metric.topReadPartitionTombstoneCount.addSample(currentKey.getKey(), ts);

                lastReportedLiveRows = liveRows;
                lastReportedTombstones = tombstones;
            }

            @Override
            public void onClose()
            {
                recordLatency(metric, nanoTime() - startTimeNanos);

                metric.tombstoneScannedHistogram.update(tombstones);
                metric.liveScannedHistogram.update(liveRows);

                boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
                if (warnTombstones)
                {
                    // %1.512s truncates the CQL string to at most 512 characters in the log message.
                    String msg = String.format(
                    "Read %d live rows and %d tombstone cells for query %1.512s; token %s (see tombstone_warn_threshold)",
                    liveRows, tombstones, ReadCommand.this.toCQLString(), currentKey.getToken());
                    if (trackWarnings)
                        MessageParams.add(ParamType.TOMBSTONE_WARNING, tombstones);
                    else
                        ClientWarn.instance.warn(msg);
                    // Only count as a warning if we did not already abort (failure metric incremented elsewhere).
                    if (tombstones < failureThreshold)
                    {
                        metric.tombstoneWarnings.inc();
                    }

                    logger.warn(msg);
                }

                Tracing.trace("Read {} live rows and {} tombstone cells{}",
                              liveRows, tombstones,
                              (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
            }
        }

        return Transformation.apply(iter, new MetricRecording());
    }
| |
| private boolean shouldTrackSize(DataStorageSpec.LongBytesBound warnThresholdBytes, DataStorageSpec.LongBytesBound abortThresholdBytes) |
| { |
| return trackWarnings |
| && !SchemaConstants.isSystemKeyspace(metadata().keyspace) |
| && !(warnThresholdBytes == null && abortThresholdBytes == null); |
| } |
| |
    /**
     * Wraps the iterator to accumulate the on-heap size of everything read, warning or aborting the
     * query when the configured local read-size thresholds are crossed. Returns the iterator
     * unchanged when tracking does not apply (see shouldTrackSize).
     */
    private UnfilteredPartitionIterator withQuerySizeTracking(UnfilteredPartitionIterator iterator)
    {
        DataStorageSpec.LongBytesBound warnThreshold = DatabaseDescriptor.getLocalReadSizeWarnThreshold();
        DataStorageSpec.LongBytesBound failThreshold = DatabaseDescriptor.getLocalReadSizeFailThreshold();
        if (!shouldTrackSize(warnThreshold, failThreshold))
            return iterator;
        // -1 marks an unconfigured threshold.
        final long warnBytes = warnThreshold == null ? -1 : warnThreshold.toBytes();
        final long failBytes = failThreshold == null ? -1 : failThreshold.toBytes();
        class QuerySizeTracking extends Transformation<UnfilteredRowIterator>
        {
            private long sizeInBytes = 0;

            @Override
            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
            {
                sizeInBytes += ObjectSizes.sizeOnHeapOf(iter.partitionKey().getKey());
                return Transformation.apply(iter, this);
            }

            @Override
            protected Row applyToStatic(Row row)
            {
                // The shared empty static row singleton contributes nothing.
                if (row == Rows.EMPTY_STATIC_ROW)
                    return row;

                return applyToRow(row);
            }

            @Override
            protected Row applyToRow(Row row)
            {
                addSize(row.unsharedHeapSize());
                return row;
            }

            @Override
            protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
            {
                addSize(marker.unsharedHeapSize());
                return marker;
            }

            @Override
            protected DeletionTime applyToDeletion(DeletionTime deletionTime)
            {
                addSize(deletionTime.unsharedHeapSize());
                return deletionTime;
            }

            // Accumulates size and enforces the fail (abort) and warn thresholds, in that order.
            private void addSize(long size)
            {
                this.sizeInBytes += size;
                if (failBytes != -1 && this.sizeInBytes >= failBytes)
                {
                    String msg = String.format("Query %s attempted to read %d bytes but max allowed is %s; query aborted  (see local_read_size_fail_threshold)",
                                               ReadCommand.this.toCQLString(), this.sizeInBytes, failThreshold);
                    Tracing.trace(msg);
                    // A failure supersedes any previously recorded size warning.
                    MessageParams.remove(ParamType.LOCAL_READ_SIZE_WARN);
                    MessageParams.add(ParamType.LOCAL_READ_SIZE_FAIL, this.sizeInBytes);
                    throw new LocalReadSizeTooLargeException(msg);
                }
                else if (warnBytes != -1 && this.sizeInBytes >= warnBytes)
                {
                    MessageParams.add(ParamType.LOCAL_READ_SIZE_WARN, this.sizeInBytes);
                }
            }

            @Override
            protected void onClose()
            {
                ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata().id);
                if (cfs != null)
                    cfs.metric.localReadSize.update(sizeInBytes);
            }
        }

        iterator = Transformation.apply(iterator, new QuerySizeTracking());
        return iterator;
    }
| |
| private class QueryCancellationChecker extends StoppingTransformation<UnfilteredRowIterator> |
| { |
| long lastCheckedAt = 0; |
| |
| @Override |
| protected void attachTo(BasePartitions partitions) |
| { |
| Preconditions.checkArgument(this.partitions == null || this.partitions == partitions, |
| "Attempted to attach 2nd different BasePartitions in StoppingTransformation; this is a bug."); |
| this.partitions = partitions; |
| } |
| |
| @Override |
| protected void attachTo(BaseRows rows) |
| { |
| Preconditions.checkArgument(this.rows == null || this.rows == rows, |
| "Attempted to attach 2nd different BaseRows in StoppingTransformation; this is a bug."); |
| this.rows = rows; |
| } |
| |
| @Override |
| protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) |
| { |
| maybeCancel(); |
| return Transformation.apply(partition, this); |
| } |
| |
| @Override |
| protected Row applyToRow(Row row) |
| { |
| maybeCancel(); |
| return row; |
| } |
| |
| private void maybeCancel() |
| { |
| /* |
| * The value returned by approxTime.now() is updated only every |
| * {@link org.apache.cassandra.utils.MonotonicClock.SampledClock.CHECK_INTERVAL_MS}, by default 2 millis. |
| * Since MonitorableImpl relies on approxTime, we don't need to check unless the approximate time has elapsed. |
| */ |
| if (lastCheckedAt == approxTime.now()) |
| return; |
| lastCheckedAt = approxTime.now(); |
| |
| if (isAborted()) |
| { |
| stop(); |
| throw new QueryCancelledException(ReadCommand.this); |
| } |
| } |
| } |
| |
| private UnfilteredPartitionIterator withQueryCancellation(UnfilteredPartitionIterator iter) |
| { |
| return Transformation.apply(iter, new QueryCancellationChecker()); |
| } |
| |
| /** |
| * A transformation used for simulating slow queries by tests. |
| */ |
| private static class DelayInjector extends Transformation<UnfilteredRowIterator> |
| { |
| @Override |
| protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) |
| { |
| FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS); |
| return Transformation.apply(partition, this); |
| } |
| |
| @Override |
| protected Row applyToRow(Row row) |
| { |
| FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS); |
| return row; |
| } |
| } |
| |
| private UnfilteredPartitionIterator maybeSlowDownForTesting(UnfilteredPartitionIterator iter) |
| { |
| if (TEST_ITERATION_DELAY_MILLIS > 0 && !SchemaConstants.isSystemKeyspace(metadata().keyspace)) |
| return Transformation.apply(iter, new DelayInjector()); |
| else |
| return iter; |
| } |
| |
| /** |
| * Creates a message for this command. |
| */ |
| public Message<ReadCommand> createMessage(boolean trackRepairedData, Dispatcher.RequestTime requestTime) |
| { |
| List<MessageFlag> flags = new ArrayList<>(3); |
| flags.add(MessageFlag.CALL_BACK_ON_FAILURE); |
| if (trackWarnings) |
| flags.add(MessageFlag.TRACK_WARNINGS); |
| if (trackRepairedData) |
| flags.add(MessageFlag.TRACK_REPAIRED_DATA); |
| |
| return Message.outWithFlags(verb(), |
| this, |
| requestTime, |
| flags); |
| } |
| |
    /** Whether the given sstable can contain data selected by this command. */
    protected abstract boolean intersects(SSTableReader sstable);
| |
| protected boolean hasRequiredStatics(SSTableReader sstable) { |
| // If some static columns are queried, we should always include the sstable: the clustering values stats of the sstable |
| // don't tell us if the sstable contains static values in particular. |
| return !columnFilter().fetchedColumns().statics.isEmpty() && sstable.header.hasStatic(); |
| } |
| |
    /** Whether the sstable's metadata records any partition-level deletions. */
    protected boolean hasPartitionLevelDeletions(SSTableReader sstable)
    {
        return sstable.getSSTableMetadata().hasPartitionLevelDeletions;
    }
| |
    /** The messaging {@link Verb} under which this command is sent (see {@code createMessage}). */
    public abstract Verb verb();
| |
    /** Appends the CQL WHERE clause corresponding to this command's restrictions to the given builder. */
    protected abstract void appendCQLWhereClause(CqlBuilder builder);
| // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it |
| // can save us some bandwith, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which |
| // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive). |
| protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, |
| ColumnFamilyStore cfs, |
| ReadExecutionController controller) |
| { |
| class WithoutPurgeableTombstones extends PurgeFunction |
| { |
| public WithoutPurgeableTombstones() |
| { |
| super(nowInSec(), cfs.gcBefore(nowInSec()), controller.oldestUnrepairedTombstone(), |
| cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(), |
| iterator.metadata().enforceStrictLiveness()); |
| } |
| |
| protected LongPredicate getPurgeEvaluator() |
| { |
| return time -> true; |
| } |
| } |
| return Transformation.apply(iterator, new WithoutPurgeableTombstones()); |
| } |
| |
| |
| /** |
| * Wraps the provided iterator so that metrics on count of purgeable tombstones are tracked and traced. |
| * It tracks only tombstones with localDeletionTime < now - gc_grace_period. |
| * Other (non-purgeable) tombstones will be tracked by regular Cassandra logic later. |
| */ |
| private UnfilteredPartitionIterator maybeRecordPurgeableTombstones(UnfilteredPartitionIterator iter, |
| ColumnFamilyStore cfs) |
| { |
| class PurgeableTombstonesMetricRecording extends Transformation<UnfilteredRowIterator> |
| { |
| private int purgeableTombstones = 0; |
| |
| @Override |
| public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) |
| { |
| if (!iter.partitionLevelDeletion().isLive()) |
| purgeableTombstones++; |
| return Transformation.apply(iter, this); |
| } |
| |
| @Override |
| public Row applyToStatic(Row row) |
| { |
| return applyToRow(row); |
| } |
| |
| @Override |
| public Row applyToRow(Row row) |
| { |
| final long nowInSec = nowInSec(); |
| boolean hasTombstones = false; |
| |
| if (isPurgeableCellTombstonesTrackingEnabled()) |
| { |
| for (Cell<?> cell : row.cells()) |
| { |
| if (!cell.isLive(nowInSec) && isPurgeable(cell.localDeletionTime(), nowInSec)) |
| { |
| purgeableTombstones++; |
| hasTombstones = true; // allows to avoid counting an extra tombstone if the whole row expired |
| } |
| } |
| } |
| |
| // we replicate the logic is used for non-purged tombstones metric here |
| if (!row.primaryKeyLivenessInfo().isLive(nowInSec) |
| && row.hasDeletion(nowInSec) |
| && isPurgeable(row.deletion().time(), nowInSec) |
| && !hasTombstones) |
| { |
| // We're counting primary key deletions only here. |
| purgeableTombstones++; |
| } |
| |
| return row; |
| } |
| |
| @Override |
| public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) |
| { |
| final long nowInSec = nowInSec(); |
| |
| // for boundary markers - increment metric only if both - close and open - markers are purgeable |
| if (marker.isBoundary()) |
| { |
| countIfBothPurgeable(marker.closeDeletionTime(false), |
| marker.openDeletionTime(false), |
| nowInSec); |
| } |
| // for bound markers - just increment if it is purgeable |
| else if (marker instanceof RangeTombstoneBoundMarker) |
| { |
| countIfPurgeable(((RangeTombstoneBoundMarker) marker).deletionTime(), nowInSec); |
| } |
| |
| return marker; |
| } |
| |
| @Override |
| public void onClose() |
| { |
| cfs.metric.purgeableTombstoneScannedHistogram.update(purgeableTombstones); |
| if (purgeableTombstones > 0) |
| Tracing.trace("Read {} purgeable tombstone cells", purgeableTombstones); |
| } |
| |
| /** |
| * Increments if both - close and open - deletion times less than (now - gc_grace_period) |
| */ |
| private void countIfBothPurgeable(DeletionTime closeDeletionTime, |
| DeletionTime openDeletionTime, |
| long nowInSec) |
| { |
| if (isPurgeable(closeDeletionTime, nowInSec) && isPurgeable(openDeletionTime, nowInSec)) |
| purgeableTombstones++; |
| } |
| |
| /** |
| * Increments if deletion time less than (now - gc_grace_period) |
| */ |
| private void countIfPurgeable(DeletionTime deletionTime, |
| long nowInSec) |
| { |
| if (isPurgeable(deletionTime, nowInSec)) |
| purgeableTombstones++; |
| } |
| |
| /** |
| * Checks that deletion time < now - gc_grace_period |
| */ |
| private boolean isPurgeable(DeletionTime deletionTime, |
| long nowInSec) |
| { |
| return isPurgeable(deletionTime.localDeletionTime(), nowInSec); |
| } |
| |
| /** |
| * Checks that deletion time < now - gc_grace_period |
| */ |
| private boolean isPurgeable(long localDeletionTime, |
| long nowInSec) |
| { |
| return localDeletionTime < cfs.gcBefore(nowInSec); |
| } |
| |
| private boolean isPurgeableCellTombstonesTrackingEnabled() |
| { |
| return DatabaseDescriptor.getPurgeableTobmstonesMetricGranularity() == Config.TombstonesMetricGranularity.cell; |
| } |
| } |
| |
| if (DatabaseDescriptor.getPurgeableTobmstonesMetricGranularity() != Config.TombstonesMetricGranularity.disabled) |
| return Transformation.apply(iter, new PurgeableTombstonesMetricRecording()); |
| else |
| return iter; |
| } |
| |
| /** |
| * Return the queried token(s) for logging |
| */ |
| public abstract String loggableTokens(); |
| |
    // Monitorable interface
    /** The name used when reporting this command via the Monitorable interface: its CQL representation. */
    public String name()
    {
        return toCQLString();
    }
| |
| InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view, ReadExecutionController controller) |
| { |
| final BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge = |
| (unfilteredRowIterators, repairedDataInfo) -> { |
| UnfilteredRowIterator repaired = UnfilteredRowIterators.merge(unfilteredRowIterators); |
| return repairedDataInfo.withRepairedDataInfo(repaired); |
| }; |
| |
| // For single partition reads, after reading up to the command's DataLimit nothing extra is required. |
| // The merged & repaired row iterator will be consumed until it's exhausted or the RepairedDataInfo's |
| // internal counter is satisfied |
| final Function<UnfilteredRowIterator, UnfilteredPartitionIterator> postLimitPartitions = |
| (rows) -> EmptyIterators.unfilteredPartition(metadata()); |
| return new InputCollector<>(view, controller, merge, postLimitPartitions); |
| } |
| |
| InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view, ReadExecutionController controller) |
| { |
| final BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge = |
| (unfilteredPartitionIterators, repairedDataInfo) -> { |
| UnfilteredPartitionIterator repaired = UnfilteredPartitionIterators.merge(unfilteredPartitionIterators, |
| NOOP); |
| return repairedDataInfo.withRepairedDataInfo(repaired); |
| }; |
| |
| // Uses identity function to provide additional partitions to be consumed after the command's |
| // DataLimits are satisfied. The input to the function will be the iterator of merged, repaired partitions |
| // which we'll keep reading until the RepairedDataInfo's internal counter is satisfied. |
| return new InputCollector<>(view, controller, merge, Function.identity()); |
| } |
| |
| /** |
| * Handles the collation of unfiltered row or partition iterators that comprise the |
| * input for a query. Separates them according to repaired status and of repaired |
| * status is being tracked, handles the merge and wrapping in a digest generator of |
| * the repaired iterators. |
| * |
| * Intentionally not AutoCloseable so we don't mistakenly use this in ARM blocks |
| * as this prematurely closes the underlying iterators |
| */ |
| static class InputCollector<T extends AutoCloseable> |
| { |
| final RepairedDataInfo repairedDataInfo; |
| private final boolean isTrackingRepairedStatus; |
| Set<SSTableReader> repairedSSTables; |
| BiFunction<List<T>, RepairedDataInfo, T> repairedMerger; |
| Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions; |
| List<T> repairedIters; |
| List<T> unrepairedIters; |
| |
| InputCollector(ColumnFamilyStore.ViewFragment view, |
| ReadExecutionController controller, |
| BiFunction<List<T>, RepairedDataInfo, T> repairedMerger, |
| Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions) |
| { |
| this.repairedDataInfo = controller.getRepairedDataInfo(); |
| this.isTrackingRepairedStatus = controller.isTrackingRepairedStatus(); |
| |
| if (isTrackingRepairedStatus) |
| { |
| for (SSTableReader sstable : view.sstables) |
| { |
| if (considerRepairedForTracking(sstable)) |
| { |
| if (repairedSSTables == null) |
| repairedSSTables = Sets.newHashSetWithExpectedSize(view.sstables.size()); |
| repairedSSTables.add(sstable); |
| } |
| } |
| } |
| if (repairedSSTables == null) |
| { |
| repairedIters = Collections.emptyList(); |
| unrepairedIters = new ArrayList<>(view.sstables.size()); |
| } |
| else |
| { |
| repairedIters = new ArrayList<>(repairedSSTables.size()); |
| // when we're done collating, we'll merge the repaired iters and add the |
| // result to the unrepaired list, so size that list accordingly |
| unrepairedIters = new ArrayList<>((view.sstables.size() - repairedSSTables.size()) + Iterables.size(view.memtables) + 1); |
| } |
| this.repairedMerger = repairedMerger; |
| this.postLimitAdditionalPartitions = postLimitAdditionalPartitions; |
| } |
| |
| void addMemtableIterator(T iter) |
| { |
| unrepairedIters.add(iter); |
| } |
| |
| void addSSTableIterator(SSTableReader sstable, T iter) |
| { |
| if (repairedSSTables != null && repairedSSTables.contains(sstable)) |
| repairedIters.add(iter); |
| else |
| unrepairedIters.add(iter); |
| } |
| |
| List<T> finalizeIterators(ColumnFamilyStore cfs, long nowInSec, long oldestUnrepairedTombstone) |
| { |
| if (repairedIters.isEmpty()) |
| return unrepairedIters; |
| |
| // merge the repaired data before returning, wrapping in a digest generator |
| repairedDataInfo.prepare(cfs, nowInSec, oldestUnrepairedTombstone); |
| T repairedIter = repairedMerger.apply(repairedIters, repairedDataInfo); |
| repairedDataInfo.finalize(postLimitAdditionalPartitions.apply(repairedIter)); |
| unrepairedIters.add(repairedIter); |
| return unrepairedIters; |
| } |
| |
| boolean isEmpty() |
| { |
| return repairedIters.isEmpty() && unrepairedIters.isEmpty(); |
| } |
| |
| // For tracking purposes we consider data repaired if the sstable is either: |
| // * marked repaired |
| // * marked pending, but the local session has been committed. This reduces the window |
| // whereby the tracking is affected by compaction backlog causing repaired sstables to |
| // remain in the pending state |
| // If an sstable is involved in a pending repair which is not yet committed, we mark the |
| // repaired data info inconclusive, as the same data on other replicas may be in a |
| // slightly different state. |
| private boolean considerRepairedForTracking(SSTableReader sstable) |
| { |
| if (!isTrackingRepairedStatus) |
| return false; |
| |
| TimeUUID pendingRepair = sstable.getPendingRepair(); |
| if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR) |
| { |
| if (ActiveRepairService.instance().consistent.local.isSessionFinalized(pendingRepair)) |
| return true; |
| |
| // In the edge case where compaction is backed up long enough for the session to |
| // timeout and be purged by LocalSessions::cleanup, consider the sstable unrepaired |
| // as it will be marked unrepaired when compaction catches up |
| if (!ActiveRepairService.instance().consistent.local.sessionExists(pendingRepair)) |
| return false; |
| |
| repairedDataInfo.markInconclusive(); |
| } |
| |
| return sstable.isRepaired(); |
| } |
| |
| void markInconclusive() |
| { |
| repairedDataInfo.markInconclusive(); |
| } |
| |
| public void close() throws Exception |
| { |
| FBUtilities.closeAll(unrepairedIters); |
| FBUtilities.closeAll(repairedIters); |
| } |
| } |
| |
    /**
     * Serializer for {@link ReadCommand}, covering the standard messaging path as well as the
     * Accord-specific variants ({@code serializeForAccord}/{@code deserializeForAccord}).
     */
    @VisibleForTesting
    public static class Serializer implements IVersionedSerializer<ReadCommand>
    {
        private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 10L, TimeUnit.SECONDS);
        // NOTE(review): this statement is not referenced anywhere in this class as visible here —
        // confirm whether it is still needed or is a leftover.
        private static final NoSpamLogger.NoSpamLogStatement schemaMismatchStmt =
            noSpamLogger.getStatement("Schema epoch mismatch during read command deserialization. " +
                                      "TableId: {}, remote epoch: {}, local epoch: {}", 10L, TimeUnit.SECONDS);

        // Bit flags packed into the single flags byte written by serializeHeader().
        private static final int IS_DIGEST = 0x01;
        private static final int IS_FOR_THRIFT = 0x02;
        private static final int HAS_INDEX = 0x04;
        private static final int ACCEPTS_TRANSIENT = 0x08;
        private static final int NEEDS_RECONCILIATION = 0x10;
        private static final int ALLOWS_POTENTIAL_TXN_CONFLICTS = 0x20;

        // Injectable for tests; production uses Schema.instance.
        private final SchemaProvider schema;

        public Serializer()
        {
            this(Schema.instance);
        }

        @VisibleForTesting
        public Serializer(SchemaProvider schema)
        {
            this.schema = Objects.requireNonNull(schema, "schema");
        }

        private static int digestFlag(boolean isDigest)
        {
            return isDigest ? IS_DIGEST : 0;
        }

        private static boolean isDigest(int flags)
        {
            return (flags & IS_DIGEST) != 0;
        }

        private static boolean acceptsTransient(int flags)
        {
            return (flags & ACCEPTS_TRANSIENT) != 0;
        }

        private static int acceptsTransientFlag(boolean acceptsTransient)
        {
            return acceptsTransient ? ACCEPTS_TRANSIENT : 0;
        }

        // We don't set this flag anymore, but still look if we receive a
        // command with it set in case someone is using thrift a mixed 3.0/4.0+
        // cluster (which is unsupported). This is also a reminder for not
        // re-using this flag until we drop 3.0/3.X compatibility (since it's
        // used by these release for thrift and would thus confuse things)
        private static boolean isForThrift(int flags)
        {
            return (flags & IS_FOR_THRIFT) != 0;
        }

        private static int indexFlag(boolean hasIndex)
        {
            return hasIndex ? HAS_INDEX : 0;
        }

        private static boolean hasIndex(int flags)
        {
            return (flags & HAS_INDEX) != 0;
        }

        private static int needsReconciliationFlag(boolean needsReconciliation)
        {
            return needsReconciliation ? NEEDS_RECONCILIATION : 0;
        }

        private static boolean needsReconciliation(int flags)
        {
            return (flags & NEEDS_RECONCILIATION) != 0;
        }

        private static int potentialTxnConflicts(PotentialTxnConflicts potentialTxnConflicts)
        {
            return potentialTxnConflicts.allowed ? ALLOWS_POTENTIAL_TXN_CONFLICTS : 0;
        }

        private static PotentialTxnConflicts potentialTxnConflicts(int flags)
        {
            return (flags & ALLOWS_POTENTIAL_TXN_CONFLICTS) != 0 ? PotentialTxnConflicts.ALLOW : PotentialTxnConflicts.DISALLOW;
        }

        // Writes the 2-byte header: command kind ordinal followed by the packed flags byte.
        private void serializeHeader(ReadCommand command, DataOutputPlus out, int version) throws IOException
        {
            out.writeByte(command.kind.ordinal());
            out.writeByte(
                digestFlag(command.isDigestQuery())
                | indexFlag(null != command.indexQueryPlan())
                | acceptsTransientFlag(command.acceptsTransient())
                | needsReconciliationFlag(command.rowFilter().needsReconciliation())
                | potentialTxnConflicts(command.potentialTxnConflicts)
            );
        }

        // Writes the column filter, row filter, limits and (if present) the index identifying the query plan.
        private void serializeFiltersAndLimits(ReadCommand command, DataOutputPlus out, int version) throws IOException
        {
            ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
            RowFilter.serializer.serialize(command.rowFilter(), out, version);
            DataLimits.serializer.serialize(command.limits(), out, version, command.metadata().comparator);
            // Using the name of one of the indexes in the plan to identify the index group because we want
            // to keep compatibility with legacy nodes. Each replica can create its own different index query plan
            // from the index name.
            if (null != command.indexQueryPlan)
                IndexMetadata.serializer.serialize(command.indexQueryPlan.getFirst().getIndexMetadata(), out, version);
        }

        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
        {
            serializeHeader(command, out, version);
            if (command.isDigestQuery())
                out.writeUnsignedVInt32(command.digestVersion());
            command.metadata().id.serialize(out);
            // The schema epoch is only part of the wire format from VERSION_60 on.
            if (version >= MessagingService.VERSION_60)
                Epoch.serializer.serialize(command.serializedAtEpoch, out);
            // nowInSec is written as an unsigned int from VERSION_50 on, a signed int before.
            out.writeInt(version >= MessagingService.VERSION_50 ? CassandraUInt.fromLong(command.nowInSec()) : (int) command.nowInSec());
            serializeFiltersAndLimits(command, out, version);
            command.serializeSelection(out, version);
        }

        // Accord path: the table is identified via TableMetadatas and no digest/epoch/nowInSec is written.
        public void serializeForAccord(ReadCommand command, TableMetadatas tables, DataOutputPlus out, int version) throws IOException
        {
            Invariants.require(!command.isDigestQuery);
            serializeHeader(command, out, version);
            tables.serialize(command.metadata(), out);
            serializeFiltersAndLimits(command, out, version);
            command.serializeSelectionWithoutKey(out, version);
        }

        // Shared tail of deserialization: filters, limits, optional index query plan, then the
        // kind-specific selection via the provided deserializer.
        private ReadCommand deserialize(SelectionDeserializer deserializer, int flags, Epoch schemaVersion, int digestVersion, long nowInSec, TableMetadata tableMetadata, DataInputPlus in, int version) throws IOException
        {
            boolean isDigest = isDigest(flags);
            boolean acceptsTransient = acceptsTransient(flags);
            PotentialTxnConflicts potentialTxnConflicts = potentialTxnConflicts(flags);
            boolean hasIndex = hasIndex(flags);
            boolean needsReconciliation = needsReconciliation(flags);

            ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, tableMetadata);
            RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, tableMetadata, needsReconciliation);
            DataLimits limits = DataLimits.serializer.deserialize(in, version, tableMetadata);
            Index.QueryPlan indexQueryPlan = null;
            if (hasIndex)
            {
                // The index may be unknown locally (see deserializeIndexMetadata); in that case
                // the read proceeds without an index query plan.
                IndexMetadata index = deserializeIndexMetadata(in, version, tableMetadata);
                Index.Group indexGroup = Keyspace.openAndGetStore(tableMetadata).indexManager.getIndexGroup(index);
                if (indexGroup != null)
                    indexQueryPlan = indexGroup.queryPlanFor(rowFilter);
            }

            return deserializer.deserialize(in, version, schemaVersion, isDigest, digestVersion, acceptsTransient, potentialTxnConflicts, tableMetadata, nowInSec, columnFilter, rowFilter, limits, indexQueryPlan);
        }

        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
        {
            Kind kind = Kind.values()[in.readByte()];
            int flags = in.readByte();
            // Shouldn't happen or it's a user error (see comment above) but
            // better complain loudly than doing the wrong thing.
            if (isForThrift(flags))
                throw new IllegalStateException("Received a command with the thrift flag set. "
                                                + "This means thrift is in use in a mixed 3.0/3.X and 4.0+ cluster, "
                                                + "which is unsupported. Make sure to stop using thrift before "
                                                + "upgrading to 4.0");

            int digestVersion = isDigest(flags) ? in.readUnsignedVInt32() : 0;
            TableId tableId = TableId.deserialize(in);

            Epoch schemaVersion = Epoch.EMPTY;
            if (version >= MessagingService.VERSION_60)
                schemaVersion = Epoch.serializer.deserialize(in);
            TableMetadata tableMetadata;
            try
            {
                tableMetadata = schema.getExistingTableMetadata(tableId);
            }
            catch (UnknownTableException e)
            {
                // If the table is unknown but our local epoch is behind the sender's, signal the
                // coordinator to retry once we've caught up rather than failing outright.
                ClusterMetadata metadata = ClusterMetadata.current();
                Epoch localCurrentEpoch = metadata.epoch;
                if (schemaVersion != null && localCurrentEpoch.isAfter(schemaVersion))
                {
                    TCMMetrics.instance.coordinatorBehindSchema.mark();
                    throw new CoordinatorBehindException(e.getMessage());
                }
                throw e;
            }
            long nowInSec = version >= MessagingService.VERSION_50 ? CassandraUInt.toLong(in.readInt()) : in.readInt();
            return deserialize(kind.selectionDeserializer, flags, schemaVersion, digestVersion, nowInSec, tableMetadata, in, version);
        }

        // Counterpart of serializeForAccord: digestVersion and nowInSec are fixed to 0.
        public ReadCommand deserializeForAccord(Seekable key, TableMetadatas tables, DataInputPlus in, int version) throws IOException
        {
            Kind kind = Kind.values()[in.readByte()];
            int flags = in.readByte();
            if (isDigest(flags) || isForThrift(flags) || acceptsTransient(flags))
                throw new IllegalStateException("Received an Accord command with a digest/thrift/transient flag set.");

            TableMetadata tableMetadata = tables.deserialize(in);

            return deserialize(kind.accordSelectionDeserializer.apply(key), flags, tableMetadata.epoch, 0, 0, tableMetadata, in, version);
        }

        // Returns null (rather than failing the read) when the index is not yet known locally,
        // e.g. because schema propagation of a freshly created index hasn't completed.
        private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException
        {
            try
            {
                return IndexMetadata.serializer.deserialize(in, version, metadata);
            }
            catch (UnknownIndexException e)
            {
                logger.info("Couldn't find a defined index on {}.{} with the id {}. " +
                            "If an index was just created, this is likely due to the schema not " +
                            "being fully propagated. Local read will proceed without using the " +
                            "index. Please wait for schema agreement after index creation.",
                            metadata.keyspace, metadata.name, e.indexId);
                return null;
            }
        }

        public long serializedSize(ReadCommand command, int version)
        {
            // NOTE(review): serialize() writes command.serializedAtEpoch while this sizes
            // command.metadata().epoch — confirm the two always serialize to the same size.
            return 2 // kind + flags
                   + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
                   + command.metadata().id.serializedSize()
                   + (version >= MessagingService.VERSION_60 ? Epoch.serializer.serializedSize(command.metadata().epoch) : 0)
                   + TypeSizes.INT_SIZE // command.nowInSec() is serialized as uint
                   + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
                   + RowFilter.serializer.serializedSize(command.rowFilter(), version)
                   + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata().comparator)
                   + command.selectionSerializedSize(version)
                   + command.indexSerializedSize(version);
        }

        public long serializedSizeForAccord(ReadCommand command, TableMetadatas tables, int version)
        {
            return 2 // kind + flags
                   + tables.serializedSize(command.metadata())
                   + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
                   + RowFilter.serializer.serializedSize(command.rowFilter(), version)
                   + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata().comparator)
                   + command.selectionSerializedSize(version)
                   + command.indexSerializedSize(version);
        }
    }
| } |