blob: 99e24c876b5e7ecbb880dfd99423be72dfbeedf4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import com.google.common.collect.Iterables;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.BaseRowIterator;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.pager.*;
import org.apache.cassandra.thrift.ThriftResultsMerger;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
/**
* A read command that selects a (part of a) range of partitions.
*/
public class PartitionRangeReadCommand extends ReadCommand
{
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
private final DataRange dataRange;
private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
public PartitionRangeReadCommand(boolean isDigest,
int digestVersion,
boolean isForThrift,
CFMetaData metadata,
int nowInSec,
ColumnFilter columnFilter,
RowFilter rowFilter,
DataLimits limits,
DataRange dataRange,
Optional<IndexMetadata> index)
{
super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
this.dataRange = dataRange;
this.index = index;
}
public PartitionRangeReadCommand(CFMetaData metadata,
int nowInSec,
ColumnFilter columnFilter,
RowFilter rowFilter,
DataLimits limits,
DataRange dataRange,
Optional<IndexMetadata> index)
{
this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index);
}
/**
* Creates a new read command that query all the data in the table.
*
* @param metadata the table to query.
* @param nowInSec the time in seconds to use are "now" for this query.
*
* @return a newly created read command that queries everything in the table.
*/
public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec)
{
return new PartitionRangeReadCommand(metadata,
nowInSec,
ColumnFilter.all(metadata),
RowFilter.NONE,
DataLimits.NONE,
DataRange.allData(metadata.partitioner),
Optional.empty());
}
public DataRange dataRange()
{
return dataRange;
}
public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
{
return dataRange.clusteringIndexFilter(key);
}
public boolean isNamesQuery()
{
return dataRange.isNamesQuery();
}
public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
{
return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index);
}
public PartitionRangeReadCommand copy()
{
return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
}
public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
{
return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
}
public long getTimeout()
{
return DatabaseDescriptor.getRangeRpcTimeout();
}
public boolean selectsKey(DecoratedKey key)
{
if (!dataRange().contains(key))
return false;
return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
}
public boolean selectsClustering(DecoratedKey key, Clustering clustering)
{
if (clustering == Clustering.STATIC_CLUSTERING)
return !columnFilter().fetchedColumns().statics.isEmpty();
if (!dataRange().clusteringIndexFilter(key).selects(clustering))
return false;
return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
}
public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
{
return StorageProxy.getRangeSlice(this, consistency);
}
public QueryPager getPager(PagingState pagingState, int protocolVersion)
{
return new PartitionRangeQueryPager(this, pagingState, protocolVersion);
}
protected void recordLatency(TableMetrics metric, long latencyNanos)
{
metric.rangeLatency.addNano(latencyNanos);
}
protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
{
ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));
// fetch data from current memtable, historical memtables, and SSTables in the correct order.
final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
try
{
for (Memtable memtable : view.memtables)
{
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
}
for (SSTableReader sstable : view.sstables)
{
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift());
iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
if (!sstable.isRepaired())
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
}
return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs);
}
catch (RuntimeException | Error e)
{
try
{
FBUtilities.closeAll(iterators);
}
catch (Exception suppressed)
{
e.addSuppressed(suppressed);
}
throw e;
}
}
@Override
protected int oldestUnrepairedTombstone()
{
return oldestUnrepairedTombstone;
}
private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
{
class CacheFilter extends Transformation
{
@Override
public BaseRowIterator applyToPartition(BaseRowIterator iter)
{
// Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done
// (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage.
DecoratedKey dk = iter.partitionKey();
// Check if this partition is in the rowCache and if it is, if it covers our filter
CachedPartition cached = cfs.getRawCachedPartition(dk);
ClusteringIndexFilter filter = dataRange().clusteringIndexFilter(dk);
if (cached != null && cfs.isFilterFullyCoveredBy(filter, limits(), cached, nowInSec()))
{
// We won't use 'iter' so close it now.
iter.close();
return filter.getUnfilteredRowIterator(columnFilter(), cached);
}
return iter;
}
}
return Transformation.apply(iter, new CacheFilter());
}
public MessageOut<ReadCommand> createMessage(int version)
{
return dataRange().isPaging()
? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, pagedRangeSerializer)
: new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, rangeSliceSerializer);
}
protected void appendCQLWhereClause(StringBuilder sb)
{
if (dataRange.isUnrestricted() && rowFilter().isEmpty())
return;
sb.append(" WHERE ");
// We put the row filter first because the data range can end by "ORDER BY"
if (!rowFilter().isEmpty())
{
sb.append(rowFilter());
if (!dataRange.isUnrestricted())
sb.append(" AND ");
}
if (!dataRange.isUnrestricted())
sb.append(dataRange.toCQLString(metadata()));
}
/**
* Allow to post-process the result of the query after it has been reconciled on the coordinator
* but before it is passed to the CQL layer to return the ResultSet.
*
* See CASSANDRA-8717 for why this exists.
*/
public PartitionIterator postReconciliationProcessing(PartitionIterator result)
{
ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
Index index = getIndex(cfs);
return index == null ? result : index.postProcessorFor(this).apply(result, this);
}
@Override
public String toString()
{
return String.format("Read(%s.%s columns=%s rowfilter=%s limits=%s %s)",
metadata().ksName,
metadata().cfName,
columnFilter(),
rowFilter(),
limits(),
dataRange().toString(metadata()));
}
protected void serializeSelection(DataOutputPlus out, int version) throws IOException
{
DataRange.serializer.serialize(dataRange(), out, version, metadata());
}
protected long selectionSerializedSize(int version)
{
return DataRange.serializer.serializedSize(dataRange(), version, metadata());
}
private static class Deserializer extends SelectionDeserializer
{
public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
throws IOException
{
DataRange range = DataRange.serializer.deserialize(in, version, metadata);
return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
}
}
}