| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.db; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Sets; |
| |
| import org.apache.cassandra.cache.IRowCacheEntry; |
| import org.apache.cassandra.cache.RowCacheKey; |
| import org.apache.cassandra.cache.RowCacheSentinel; |
| import org.apache.cassandra.concurrent.Stage; |
| import org.apache.cassandra.concurrent.StageManager; |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.config.ColumnDefinition; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.db.lifecycle.*; |
| import org.apache.cassandra.db.filter.*; |
| import org.apache.cassandra.db.partitions.*; |
| import org.apache.cassandra.db.rows.*; |
| import org.apache.cassandra.exceptions.RequestExecutionException; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.io.util.DataInputPlus; |
| import org.apache.cassandra.io.util.DataOutputPlus; |
| import org.apache.cassandra.metrics.TableMetrics; |
| import org.apache.cassandra.net.MessageOut; |
| import org.apache.cassandra.net.MessagingService; |
| import org.apache.cassandra.schema.IndexMetadata; |
| import org.apache.cassandra.service.CacheService; |
| import org.apache.cassandra.service.ClientState; |
| import org.apache.cassandra.service.StorageProxy; |
| import org.apache.cassandra.service.pager.*; |
| import org.apache.cassandra.thrift.ThriftResultsMerger; |
| import org.apache.cassandra.tracing.Tracing; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.SearchIterator; |
| import org.apache.cassandra.utils.btree.BTreeSet; |
| import org.apache.cassandra.utils.concurrent.OpOrder; |
| import org.apache.cassandra.utils.memory.HeapAllocator; |
| |
| |
| /** |
| * A read command that selects a (part of a) single partition. |
| */ |
| public class SinglePartitionReadCommand extends ReadCommand |
| { |
| protected static final SelectionDeserializer selectionDeserializer = new Deserializer(); |
| |
| private final DecoratedKey partitionKey; |
| private final ClusteringIndexFilter clusteringIndexFilter; |
| |
| private int oldestUnrepairedTombstone = Integer.MAX_VALUE; |
| |
| public SinglePartitionReadCommand(boolean isDigest, |
| int digestVersion, |
| boolean isForThrift, |
| CFMetaData metadata, |
| int nowInSec, |
| ColumnFilter columnFilter, |
| RowFilter rowFilter, |
| DataLimits limits, |
| DecoratedKey partitionKey, |
| ClusteringIndexFilter clusteringIndexFilter) |
| { |
| super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); |
| assert partitionKey.getPartitioner() == metadata.partitioner; |
| this.partitionKey = partitionKey; |
| this.clusteringIndexFilter = clusteringIndexFilter; |
| } |
| |
| /** |
| * Creates a new read command on a single partition. |
| * |
| * @param metadata the table to query. |
| * @param nowInSec the time in seconds to use as "now" for this query. |
| * @param columnFilter the column filter to use for the query. |
| * @param rowFilter the row filter to use for the query. |
| * @param limits the limits to use for the query. |
| * @param partitionKey the partition key for the partition to query. |
| * @param clusteringIndexFilter the clustering index filter to use for the query. |
| * |
| * @return a newly created read command. |
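| * <p> |
| * A minimal usage sketch (hedged; {@code metadata}, {@code key} and {@code filter} are assumed |
| * to be provided by the caller): |
| * <pre>{@code |
| * SinglePartitionReadCommand cmd = |
| * SinglePartitionReadCommand.create(metadata, FBUtilities.nowInSeconds(), ColumnFilter.all(metadata), |
| * RowFilter.NONE, DataLimits.NONE, key, filter); |
| * }</pre> |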
| */ |
| public static SinglePartitionReadCommand create(CFMetaData metadata, |
| int nowInSec, |
| ColumnFilter columnFilter, |
| RowFilter rowFilter, |
| DataLimits limits, |
| DecoratedKey partitionKey, |
| ClusteringIndexFilter clusteringIndexFilter) |
| { |
| return create(false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); |
| } |
| |
| /** |
| * Creates a new read command on a single partition for thrift. |
| * |
| * @param isForThrift whether the query is for thrift or not. |
| * @param metadata the table to query. |
| * @param nowInSec the time in seconds to use as "now" for this query. |
| * @param columnFilter the column filter to use for the query. |
| * @param rowFilter the row filter to use for the query. |
| * @param limits the limits to use for the query. |
| * @param partitionKey the partition key for the partition to query. |
| * @param clusteringIndexFilter the clustering index filter to use for the query. |
| * |
| * @return a newly created read command. |
| */ |
| public static SinglePartitionReadCommand create(boolean isForThrift, |
| CFMetaData metadata, |
| int nowInSec, |
| ColumnFilter columnFilter, |
| RowFilter rowFilter, |
| DataLimits limits, |
| DecoratedKey partitionKey, |
| ClusteringIndexFilter clusteringIndexFilter) |
| { |
| return new SinglePartitionReadCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); |
| } |
| |
| /** |
| * Creates a new read command on a single partition. |
| * |
| * @param metadata the table to query. |
| * @param nowInSec the time in seconds to use as "now" for this query. |
| * @param key the partition key for the partition to query. |
| * @param columnFilter the column filter to use for the query. |
| * @param filter the clustering index filter to use for the query. |
| * |
| * @return a newly created read command. The returned command will use no row filter and have no limits. |
| */ |
| public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter) |
| { |
| return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter); |
| } |
| |
| /** |
| * Creates a new read command that queries a single partition in its entirety. |
| * |
| * @param metadata the table to query. |
| * @param nowInSec the time in seconds to use as "now" for this query. |
| * @param key the partition key for the partition to query. |
| * |
| * @return a newly created read command that queries all the rows of {@code key}. |
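| * <p> |
| * For illustration, a sketch of reading a whole partition at consistency ONE ({@code metadata}, |
| * {@code key} and {@code clientState} are assumed to come from the caller): |
| * <pre>{@code |
| * SinglePartitionReadCommand cmd = |
| * SinglePartitionReadCommand.fullPartitionRead(metadata, FBUtilities.nowInSeconds(), key); |
| * try (PartitionIterator partitions = cmd.execute(ConsistencyLevel.ONE, clientState)) |
| * { |
| * // consume partitions |
| * } |
| * }</pre> |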
| */ |
| public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key) |
| { |
| return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL); |
| } |
| |
| /** |
| * Creates a new read command that queries a single partition in its entirety. |
| * |
| * @param metadata the table to query. |
| * @param nowInSec the time in seconds to use as "now" for this query. |
| * @param key the partition key for the partition to query. |
| * |
| * @return a newly created read command that queries all the rows of {@code key}. |
| */ |
| public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key) |
| { |
| return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL); |
| } |
| |
| /** |
| * Creates a new single partition slice command for the provided single slice. |
| * |
| * @param metadata the table to query. |
| * @param nowInSec the time in seconds to use as "now" for this query. |
| * @param key the partition key for the partition to query. |
| * @param slice the slice of rows to query. |
| * |
| * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will |
| * query every column of the table (without limit or row filtering) and be in forward order. |
| */ |
| public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice) |
| { |
| return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice)); |
| } |
| |
| /** |
| * Creates a new single partition slice command for the provided slices. |
| * |
| * @param metadata the table to query. |
| * @param nowInSec the time in seconds to use as "now" for this query. |
| * @param key the partition key for the partition to query. |
| * @param slices the slices of rows to query. |
| * |
| * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will |
| * query every column of the table (without limit or row filtering) and be in forward order. |
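| * <p> |
| * A hedged sketch; passing {@code Slices.ALL} here is equivalent to what {@code fullPartitionRead} does: |
| * <pre>{@code |
| * SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL); |
| * }</pre> |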
| */ |
| public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices) |
| { |
| ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false); |
| return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter); |
| } |
| |
| /** |
| * Creates a new single partition slice command for the provided slices. |
| * |
| * @param metadata the table to query. |
| * @param nowInSec the time in seconds to use as "now" for this query. |
| * @param key the partition key for the partition to query. |
| * @param slices the slices of rows to query. |
| * |
| * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will |
| * query every column of the table (without limit or row filtering) and be in forward order. |
| */ |
| public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices) |
| { |
| return create(metadata, nowInSec, metadata.decorateKey(key), slices); |
| } |
| |
| /** |
| * Creates a new single partition name command for the provided rows. |
| * |
| * @param metadata the table to query. |
| * @param nowInSec the time in seconds to use as "now" for this query. |
| * @param key the partition key for the partition to query. |
| * @param names the clustering for the rows to query. |
| * |
| * @return a newly created read command that queries the {@code names} in {@code key}. The returned query will |
| * query every column (without limit or row filtering) and be in forward order. |
| */ |
| public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, NavigableSet<Clustering> names) |
| { |
| ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false); |
| return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter); |
| } |
| |
| /** |
| * Creates a new single partition name command for the provided row. |
| * |
| * @param metadata the table to query. |
| * @param nowInSec the time in seconds to use as "now" for this query. |
| * @param key the partition key for the partition to query. |
| * @param name the clustering for the row to query. |
| * |
| * @return a newly created read command that queries {@code name} in {@code key}. The returned query will |
| * query every column (without limit or row filtering). |
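| * <p> |
| * For illustration (sketch; {@code name} is an existing {@code Clustering} for the row of interest), |
| * this is equivalent to passing {@code FBUtilities.singleton(name, metadata.comparator)} to the |
| * {@code NavigableSet} overload: |
| * <pre>{@code |
| * SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(metadata, nowInSec, key, name); |
| * }</pre> |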
| */ |
| public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Clustering name) |
| { |
| return create(metadata, nowInSec, key, FBUtilities.singleton(name, metadata.comparator)); |
| } |
| |
| public SinglePartitionReadCommand copy() |
| { |
| return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); |
| } |
| |
| public DecoratedKey partitionKey() |
| { |
| return partitionKey; |
| } |
| |
| public ClusteringIndexFilter clusteringIndexFilter() |
| { |
| return clusteringIndexFilter; |
| } |
| |
| public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key) |
| { |
| return clusteringIndexFilter; |
| } |
| |
| public long getTimeout() |
| { |
| return DatabaseDescriptor.getReadRpcTimeout(); |
| } |
| |
| public boolean selectsKey(DecoratedKey key) |
| { |
| if (!this.partitionKey().equals(key)) |
| return false; |
| |
| return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator()); |
| } |
| |
| public boolean selectsClustering(DecoratedKey key, Clustering clustering) |
| { |
| if (clustering == Clustering.STATIC_CLUSTERING) |
| return !columnFilter().fetchedColumns().statics.isEmpty(); |
| |
| if (!clusteringIndexFilter().selects(clustering)) |
| return false; |
| |
| return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering); |
| } |
| |
| /** |
| * Returns a new command suitable to paging from the last returned row. |
| * |
| * @param lastReturned the last row returned by the previous page. The newly created command |
| * will only query rows that come after it (in query order). This can be {@code null} if this |
| * is the first page. |
| * @param pageSize the size to use for the page to query. |
| * |
| * @return the newly created command. |
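| * <p> |
| * A hedged paging sketch (in practice {@link #getPager} drives this; {@code lastRow} is a |
| * hypothetical clustering returned by the previous page): |
| * <pre>{@code |
| * SinglePartitionReadCommand firstPage = cmd.forPaging(null, 100); |
| * SinglePartitionReadCommand nextPage = cmd.forPaging(lastRow, 100); |
| * }</pre> |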
| */ |
| public SinglePartitionReadCommand forPaging(Clustering lastReturned, int pageSize) |
| { |
| // We shouldn't have set the digest flag yet when reaching this point |
| assert !isDigestQuery(); |
| return create(isForThrift(), |
| metadata(), |
| nowInSec(), |
| columnFilter(), |
| rowFilter(), |
| limits().forPaging(pageSize), |
| partitionKey(), |
| lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false)); |
| } |
| |
| public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException |
| { |
| return StorageProxy.read(Group.one(this), consistency, clientState); |
| } |
| |
| public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion) |
| { |
| return getPager(this, pagingState, protocolVersion); |
| } |
| |
| private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, int protocolVersion) |
| { |
| return new SinglePartitionPager(command, pagingState, protocolVersion); |
| } |
| |
| protected void recordLatency(TableMetrics metric, long latencyNanos) |
| { |
| metric.readLatency.addNano(latencyNanos); |
| } |
| |
| @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail) |
| protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup) |
| { |
| UnfilteredRowIterator partition = cfs.isRowCacheEnabled() |
| ? getThroughCache(cfs, orderGroup.baseReadOpOrderGroup()) |
| : queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup()); |
| return new SingletonUnfilteredPartitionIterator(partition, isForThrift()); |
| } |
| |
| /** |
| * Fetches the rows requested if they are in the cache; if not, reads them from disk and caches the result when possible. |
| * <p> |
| * If the partition is cached and the given filter is within its bounds, we return the result |
| * from the cache; otherwise we read from disk. |
| * <p> |
| * If the partition is not cached, we figure out which filter is "biggest", read |
| * that from disk, then filter the result and either cache it or return it. |
| */ |
| private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp) |
| { |
| assert !cfs.isIndex(); // CASSANDRA-5732 |
| assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name); |
| |
| RowCacheKey key = new RowCacheKey(metadata().ksAndCFName, partitionKey()); |
| |
| // Attempt a sentinel-read-cache sequence. If a write invalidates our sentinel, we'll return our |
| // (now potentially obsolete) data, but won't cache it. See CASSANDRA-3862. |
| // TODO: don't evict entire partitions on writes (#2864) |
| IRowCacheEntry cached = CacheService.instance.rowCache.get(key); |
| if (cached != null) |
| { |
| if (cached instanceof RowCacheSentinel) |
| { |
| // Some other read is trying to cache the value, just do a normal non-caching read |
| Tracing.trace("Row cache miss (race)"); |
| cfs.metric.rowCacheMiss.inc(); |
| return queryMemtableAndDisk(cfs, readOp); |
| } |
| |
| CachedPartition cachedPartition = (CachedPartition)cached; |
| if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), cachedPartition, nowInSec())) |
| { |
| cfs.metric.rowCacheHit.inc(); |
| Tracing.trace("Row cache hit"); |
| UnfilteredRowIterator unfilteredRowIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), cachedPartition); |
| cfs.metric.updateSSTableIterated(0); |
| return unfilteredRowIterator; |
| } |
| |
| cfs.metric.rowCacheHitOutOfRange.inc(); |
| Tracing.trace("Ignoring row cache as cached value could not satisfy query"); |
| return queryMemtableAndDisk(cfs, readOp); |
| } |
| |
| cfs.metric.rowCacheMiss.inc(); |
| Tracing.trace("Row cache miss"); |
| |
| boolean cacheFullPartitions = metadata().params.caching.cacheAllRows(); |
| |
| // To be able to cache what we read, what we read must at least cover what the cache holds, that |
| // is the first 'rowsToCache' rows of the partition. We could systematically read those first |
| // 'rowsToCache' rows, but we'd then have to "extend" that read to whatever the user query needs |
| // beyond them, and that's not trivial with our existing filters. So currently we settle for |
| // caching what we read only if the user query does query the head of the partition, since that's |
| // the common case where we'll be able to use the cache anyway. One exception is when we cache |
| // full partitions, in which case we always read the whole partition and cache it. |
| if (cacheFullPartitions || clusteringIndexFilter().isHeadFilter()) |
| { |
| RowCacheSentinel sentinel = new RowCacheSentinel(); |
| boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel); |
| boolean sentinelReplaced = false; |
| |
| try |
| { |
| int rowsToCache = metadata().params.caching.rowsPerPartitionToCache(); |
| @SuppressWarnings("resource") // we close on exception or upon closing the result of this method |
| UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp); |
| try |
| { |
| // We want to cache only rowsToCache rows |
| CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec()); |
| if (sentinelSuccess && !toCache.isEmpty()) |
| { |
| Tracing.trace("Caching {} rows", toCache.rowCount()); |
| CacheService.instance.rowCache.replace(key, sentinel, toCache); |
| // Whether or not the previous replace has worked, our sentinel is not in the cache anymore |
| sentinelReplaced = true; |
| } |
| |
| // We then filter the result back down to what this query wants. |
| // Note that in the case where we don't cache full partitions, it's possible that the current query is interested in more |
| // than what we've cached, so we can't just use toCache. |
| UnfilteredRowIterator cacheIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), toCache); |
| if (cacheFullPartitions) |
| { |
| // Everything is guaranteed to be in 'toCache', we're done with 'iter' |
| assert !iter.hasNext(); |
| iter.close(); |
| return cacheIterator; |
| } |
| return UnfilteredRowIterators.concat(cacheIterator, clusteringIndexFilter().filterNotIndexed(columnFilter(), iter)); |
| } |
| catch (RuntimeException | Error e) |
| { |
| iter.close(); |
| throw e; |
| } |
| } |
| finally |
| { |
| if (sentinelSuccess && !sentinelReplaced) |
| cfs.invalidateCachedPartition(key); |
| } |
| } |
| |
| Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition"); |
| return queryMemtableAndDisk(cfs, readOp); |
| } |
| |
| /** |
| * Queries both the memtables and the sstables to fetch the result of this query. |
| * <p> |
| * Please note that this method: |
| * 1) does not check the row cache. |
| * 2) does not apply the query limit, nor the row filter (and so ignores secondary indexes). |
| * Those are applied in {@link ReadCommand#executeLocally}. |
| * 3) does not record some of the read metrics (latency, scanned cells histograms) nor |
| * throw TombstoneOverwhelmingException. |
| * It is publicly exposed because there are a few places where this is exactly what we want, |
| * but it should be used only where you know you don't need those things. |
| * <p> |
| * Also note that one must have "started" an {@code OpOrder.Group} on the queried table; it is |
| * required as a parameter to enforce that, even though the method doesn't explicitly use it. |
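| * <p> |
| * A minimal local-read sketch honoring those constraints ({@code cfs} and {@code cmd} are assumed |
| * to be provided by the caller): |
| * <pre>{@code |
| * try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); |
| * UnfilteredRowIterator iter = cmd.queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup())) |
| * { |
| * // consume iter; limits, row filter and read metrics are NOT applied here |
| * } |
| * }</pre> |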
| */ |
| public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, OpOrder.Group readOp) |
| { |
| Tracing.trace("Executing single-partition query on {}", cfs.name); |
| |
| boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap(); |
| return queryMemtableAndDiskInternal(cfs, copyOnHeap); |
| } |
| |
| @Override |
| protected int oldestUnrepairedTombstone() |
| { |
| return oldestUnrepairedTombstone; |
| } |
| |
| private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap) |
| { |
| /* |
| * We have 2 main strategies: |
| * 1) We query memtables and sstables simultaneously. This is our most generic strategy and the one we use |
| * unless we have a names filter that we know we can optimize further. |
| * 2) If we have a names filter (so we query specific rows), we can make a bet: that all the columns of all the queried |
| * rows will have data in the most recent sstable(s), thus saving us from reading older ones. This does imply we |
| * have a way to guarantee we have all the data for what is queried, which is only possible for names queries |
| * and only if we have neither collections nor counters (indeed, for a collection, we can't guarantee an older sstable |
| * won't have some elements that weren't in the most recent sstables, and counters are intrinsically a collection |
| * of shards so have the same problem). |
| */ |
| if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && queryNeitherCountersNorCollections()) |
| return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter()); |
| |
| Tracing.trace("Acquiring sstable references"); |
| ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey())); |
| |
| List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size()); |
| ClusteringIndexFilter filter = clusteringIndexFilter(); |
| |
| try |
| { |
| for (Memtable memtable : view.memtables) |
| { |
| Partition partition = memtable.getPartition(partitionKey()); |
| if (partition == null) |
| continue; |
| |
| @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator |
| UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition); |
| @SuppressWarnings("resource") // same as above |
| UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter; |
| oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime); |
| iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied); |
| } |
| /* |
| * We can't eliminate full sstables based on the timestamp of what we've already read like |
| * in collectTimeOrderedData, but we still want to eliminate sstables whose maxTimestamp < mostRecentTombstone |
| * we've read. We still rely on the sstable ordering by maxTimestamp since if |
| * maxTimestamp_s1 > maxTimestamp_s0, |
| * we're guaranteed that s1 cannot have a row tombstone such that |
| * timestamp(tombstone) > maxTimestamp_s0 |
| * since we necessarily have |
| * timestamp(tombstone) <= maxTimestamp_s1 |
| * In other words, iterating in maxTimestamp order allows us to do our mostRecentPartitionTombstone elimination |
| * in one pass, and minimizes the number of sstables for which we read a partition tombstone. |
| */ |
| int sstablesIterated = 0; |
| Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); |
| List<SSTableReader> skippedSSTables = null; |
| long mostRecentPartitionTombstone = Long.MIN_VALUE; |
| long minTimestamp = Long.MAX_VALUE; |
| int nonIntersectingSSTables = 0; |
| |
| for (SSTableReader sstable : view.sstables) |
| { |
| minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp()); |
| // if we've already seen a partition tombstone with a timestamp greater |
| // than the most recent update to this sstable, we can skip it |
| if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone) |
| break; |
| |
| if (!shouldInclude(sstable)) |
| { |
| nonIntersectingSSTables++; |
| // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely |
| if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE) |
| { |
| if (skippedSSTables == null) |
| skippedSSTables = new ArrayList<>(); |
| skippedSSTables.add(sstable); |
| } |
| continue; |
| } |
| |
| sstable.incrementReadCount(); |
| @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator |
| UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())); |
| if (!sstable.isRepaired()) |
| oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); |
| |
| iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter); |
| mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt()); |
| sstablesIterated++; |
| } |
| |
| int includedDueToTombstones = 0; |
| // Check for partition tombstones in the skipped sstables |
| if (skippedSSTables != null) |
| { |
| for (SSTableReader sstable : skippedSSTables) |
| { |
| if (sstable.getMaxTimestamp() <= minTimestamp) |
| continue; |
| |
| sstable.incrementReadCount(); |
| @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator |
| UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())); |
| if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp) |
| { |
| iterators.add(iter); |
| if (!sstable.isRepaired()) |
| oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); |
| includedDueToTombstones++; |
| sstablesIterated++; |
| } |
| else |
| { |
| iter.close(); |
| } |
| } |
| } |
| if (Tracing.isTracing()) |
| Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones", |
| nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones); |
| |
| cfs.metric.updateSSTableIterated(sstablesIterated); |
| |
| if (iterators.isEmpty()) |
| return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed()); |
| |
| Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated); |
| |
| @SuppressWarnings("resource") // Closed through the closing of the result of that method. |
| UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec()); |
| if (!merged.isEmpty()) |
| { |
| DecoratedKey key = merged.partitionKey(); |
| cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1); |
| } |
| |
| return merged; |
| } |
| catch (RuntimeException | Error e) |
| { |
| try |
| { |
| FBUtilities.closeAll(iterators); |
| } |
| catch (Exception suppressed) |
| { |
| e.addSuppressed(suppressed); |
| } |
| throw e; |
| } |
| } |
| |
| private boolean shouldInclude(SSTableReader sstable) |
| { |
| // If some static columns are queried, we should always include the sstable: the sstable's clustering-value stats |
| // don't tell us whether it contains static values in particular. |
| // TODO: we could record whether an sstable contains any static value at all. |
| if (!columnFilter().fetchedColumns().statics.isEmpty()) |
| return true; |
| |
| return clusteringIndexFilter().shouldInclude(sstable); |
| } |
| |
| private boolean queryNeitherCountersNorCollections() |
| { |
| for (ColumnDefinition column : columnFilter().fetchedColumns()) |
| { |
| if (column.type.isCollection() || column.type.isCounter()) |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Does a read by querying the memtable(s) first, and then each relevant sstable sequentially, in order of the sstables' |
| * max timestamp. |
| * |
| * This is used for names queries in the hope of only having to query the 1 or 2 most recent sstables and then knowing |
| * that nothing more recent could be in the older sstables (which we can only guarantee if we know exactly which rows |
| * we query, and if no collections or counters are included). |
| * This method assumes the filter is a {@code ClusteringIndexNamesFilter}. |
| */ |
| private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter) |
| { |
| Tracing.trace("Acquiring sstable references"); |
| ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey())); |
| |
| ImmutableBTreePartition result = null; |
| |
| Tracing.trace("Merging memtable contents"); |
| for (Memtable memtable : view.memtables) |
| { |
| Partition partition = memtable.getPartition(partitionKey()); |
| if (partition == null) |
| continue; |
| |
| try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition)) |
| { |
| if (iter.isEmpty()) |
| continue; |
| |
| UnfilteredRowIterator clonedFilter = copyOnHeap |
| ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) |
| : iter; |
| result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false); |
| } |
| } |
| |
| /* add the SSTables on disk */ |
| Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); |
| int sstablesIterated = 0; |
| boolean onlyUnrepaired = true; |
| // read sorted sstables |
| for (SSTableReader sstable : view.sstables) |
| { |
| // if we've already seen a partition tombstone with a timestamp greater |
| // than the most recent update to this sstable, we're done, since the rest of the sstables |
| // will also be older |
| if (result != null && sstable.getMaxTimestamp() < result.partitionLevelDeletion().markedForDeleteAt()) |
| break; |
| |
| long currentMaxTs = sstable.getMaxTimestamp(); |
| filter = reduceFilter(filter, result, currentMaxTs); |
| if (filter == null) |
| break; |
| |
| if (!shouldInclude(sstable)) |
| { |
| // This means that nothing queried by the filter can be in the sstable. One exception however is the top-level |
| // partition deletion: if it is set, it impacts everything and must be included. Getting that top-level partition |
| // deletion generally costs us a seek (unless the partition is indexed and is in the key cache), so we first check |
| // whether the sstable has any tombstone at all as a shortcut. |
| if (sstable.getSSTableMetadata().maxLocalDeletionTime == Integer.MAX_VALUE) |
| continue; // Means no tombstone at all, we can skip that sstable |
| |
| // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable. |
| sstable.incrementReadCount(); |
| try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())) |
| { |
| if (iter.partitionLevelDeletion().isLive()) |
| { |
| sstablesIterated++; |
| result = add(UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), Rows.EMPTY_STATIC_ROW, iter.partitionLevelDeletion(), filter.isReversed()), result, filter, sstable.isRepaired()); |
| } |
| } |
| continue; |
| } |
| |
| Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation); |
| sstable.incrementReadCount(); |
| try (UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()))) |
| { |
| if (iter.isEmpty()) |
| continue; |
| |
| if (sstable.isRepaired()) |
| onlyUnrepaired = false; |
| sstablesIterated++; |
| result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, sstable.isRepaired()); |
| } |
| } |
| |
| cfs.metric.updateSSTableIterated(sstablesIterated); |
| |
| if (result == null || result.isEmpty()) |
| return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false); |
| |
| DecoratedKey key = result.partitionKey(); |
| cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1); |
| |
| // "hoist up" the requested data into a more recent sstable |
| if (sstablesIterated > cfs.getMinimumCompactionThreshold() |
| && onlyUnrepaired |
| && !cfs.isAutoCompactionDisabled() |
| && cfs.getCompactionStrategyManager().shouldDefragment()) |
| { |
| // !!WARNING!! if we stop copying our data to a heap-managed object, |
| // we will need to track the lifetime of this mutation as well |
| Tracing.trace("Defragmenting requested data"); |
| |
| try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false)) |
| { |
| final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter)); |
| StageManager.getStage(Stage.MUTATION).execute(new Runnable() |
| { |
| public void run() |
| { |
| // skipping commitlog and index updates is fine since we're just de-fragmenting existing data |
| Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false); |
| } |
| }); |
| } |
| } |
| |
| return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed()); |
| } |
| |
| private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired) |
| { |
| if (!isRepaired) |
| oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.stats().minLocalDeletionTime); |
| |
| int maxRows = Math.max(filter.requestedRows().size(), 1); |
| if (result == null) |
| return ImmutableBTreePartition.create(iter, maxRows); |
| |
| try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, filter.isReversed())), nowInSec())) |
| { |
| return ImmutableBTreePartition.create(merged, maxRows); |
| } |
| } |
| |
| private ClusteringIndexNamesFilter reduceFilter(ClusteringIndexNamesFilter filter, Partition result, long sstableTimestamp) |
| { |
| if (result == null) |
| return filter; |
| |
| SearchIterator<Clustering, Row> searchIter = result.searchIterator(columnFilter(), false); |
| |
| PartitionColumns columns = columnFilter().fetchedColumns(); |
| NavigableSet<Clustering> clusterings = filter.requestedRows(); |
| |
| // We want to remove rows for which we have values for all requested columns. We have to deal with both static and regular rows. |
| // TODO: we could also remove a selected column if we've found values for every requested row but we'll leave |
| // that for later. |
| |
| boolean removeStatic = false; |
| if (!columns.statics.isEmpty()) |
| { |
| Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING); |
| removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp); |
| } |
| |
| NavigableSet<Clustering> toRemove = null; |
| for (Clustering clustering : clusterings) |
| { |
| if (!searchIter.hasNext()) |
| break; |
| |
| Row row = searchIter.next(clustering); |
| if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp)) |
| continue; |
| |
| if (toRemove == null) |
| toRemove = new TreeSet<>(result.metadata().comparator); |
| toRemove.add(clustering); |
| } |
| |
| if (!removeStatic && toRemove == null) |
| return filter; |
| |
| // Check if we have everything we need |
| boolean hasNoMoreStatic = columns.statics.isEmpty() || removeStatic; |
| boolean hasNoMoreClusterings = clusterings.isEmpty() || (toRemove != null && toRemove.size() == clusterings.size()); |
| if (hasNoMoreStatic && hasNoMoreClusterings) |
| return null; |
| |
| if (toRemove != null) |
| { |
| BTreeSet.Builder<Clustering> newClusterings = BTreeSet.builder(result.metadata().comparator); |
| newClusterings.addAll(Sets.difference(clusterings, toRemove)); |
| clusterings = newClusterings.build(); |
| } |
| return new ClusteringIndexNamesFilter(clusterings, filter.isReversed()); |
| } |
| |
| private boolean canRemoveRow(Row row, Columns requestedColumns, long sstableTimestamp) |
| { |
| // We can remove a row if it has data that is more recent than the next sstable to consider for the data that the query |
| // cares about. And the data we care about is 1) the row timestamp (since every query cares if the row exists or not) |
| // and 2) the requested columns. |
| if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp) |
| return false; |
| |
| for (ColumnDefinition column : requestedColumns) |
| { |
| Cell cell = row.getCell(column); |
| if (cell == null || cell.timestamp() <= sstableTimestamp) |
| return false; |
| } |
| return true; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s key=%s filter=%s, nowInSec=%d)", |
| metadata().ksName, |
| metadata().cfName, |
| columnFilter(), |
| rowFilter(), |
| limits(), |
| metadata().getKeyValidator().getString(partitionKey().getKey()), |
| clusteringIndexFilter.toString(metadata()), |
| nowInSec()); |
| } |
| |
| public MessageOut<ReadCommand> createMessage(int version) |
| { |
| return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30 ? legacyReadCommandSerializer : serializer); |
| } |
| |
| protected void appendCQLWhereClause(StringBuilder sb) |
| { |
| sb.append(" WHERE "); |
| |
| sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append(" = "); |
| DataRange.appendKeyString(sb, metadata().getKeyValidator(), partitionKey().getKey()); |
| |
| // We put the row filter first because the clustering index filter can end with "ORDER BY" |
| if (!rowFilter().isEmpty()) |
| sb.append(" AND ").append(rowFilter()); |
| |
| String filterString = clusteringIndexFilter().toCQLString(metadata()); |
| if (!filterString.isEmpty()) |
| sb.append(" AND ").append(filterString); |
| } |
| |
| protected void serializeSelection(DataOutputPlus out, int version) throws IOException |
| { |
| metadata().getKeyValidator().writeValue(partitionKey().getKey(), out); |
| ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), out, version); |
| } |
| |
| protected long selectionSerializedSize(int version) |
| { |
| return metadata().getKeyValidator().writtenLength(partitionKey().getKey()) |
| + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version); |
| } |
| |
| /** |
| * Groups multiple single partition read commands. |
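| * <p> |
| * For example (sketch), a single command can be wrapped and executed as: |
| * <pre>{@code |
| * SinglePartitionReadCommand.Group group = SinglePartitionReadCommand.Group.one(cmd); |
| * try (PartitionIterator partitions = group.execute(consistency, clientState)) |
| * { |
| * // consume partitions |
| * } |
| * }</pre> |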
| */ |
| public static class Group implements ReadQuery |
| { |
| public final List<SinglePartitionReadCommand> commands; |
| private final DataLimits limits; |
| private final int nowInSec; |
| |
| public Group(List<SinglePartitionReadCommand> commands, DataLimits limits) |
| { |
| assert !commands.isEmpty(); |
| this.commands = commands; |
| this.limits = limits; |
| this.nowInSec = commands.get(0).nowInSec(); |
| for (int i = 1; i < commands.size(); i++) |
| assert commands.get(i).nowInSec() == nowInSec; |
| } |
| |
| public static Group one(SinglePartitionReadCommand command) |
| { |
| return new Group(Collections.<SinglePartitionReadCommand>singletonList(command), command.limits()); |
| } |
| |
| public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException |
| { |
| return StorageProxy.read(this, consistency, clientState); |
| } |
| |
| public int nowInSec() |
| { |
| return nowInSec; |
| } |
| |
| public DataLimits limits() |
| { |
| return limits; |
| } |
| |
| public CFMetaData metadata() |
| { |
| return commands.get(0).metadata(); |
| } |
| |
| public ReadOrderGroup startOrderGroup() |
| { |
| // Note that the only difference between the commands in a group must be the partition key on which |
| // they apply. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one. |
| return commands.get(0).startOrderGroup(); |
| } |
| |
| public PartitionIterator executeInternal(ReadOrderGroup orderGroup) |
| { |
| List<PartitionIterator> partitions = new ArrayList<>(commands.size()); |
| for (SinglePartitionReadCommand cmd : commands) |
| partitions.add(cmd.executeInternal(orderGroup)); |
| |
| // Because we only enforce the limit per command, we need to enforce it globally. |
| return limits.filter(PartitionIterators.concat(partitions), nowInSec); |
| } |
| |
| public QueryPager getPager(PagingState pagingState, int protocolVersion) |
| { |
| if (commands.size() == 1) |
| return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion); |
| |
| return new MultiPartitionPager(this, pagingState, protocolVersion); |
| } |
| |
| public boolean selectsKey(DecoratedKey key) |
| { |
| return Iterables.any(commands, c -> c.selectsKey(key)); |
| } |
| |
| public boolean selectsClustering(DecoratedKey key, Clustering clustering) |
| { |
| return Iterables.any(commands, c -> c.selectsClustering(key, clustering)); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return commands.toString(); |
| } |
| } |
| |
| private static class Deserializer extends SelectionDeserializer |
| { |
| public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) |
| throws IOException |
| { |
| DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in, DatabaseDescriptor.getMaxValueSize())); |
| ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); |
| return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter); |
| } |
| } |
| } |