blob: e595fcb10a61c25e48ae4aa64f6dcbe4c19aabe6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.pager.MultiPartitionPager;
import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.service.pager.SinglePartitionPager;
import org.apache.cassandra.transport.ProtocolVersion;
/**
* A {@code ReadQuery} for a single partition.
*/
public interface SinglePartitionReadQuery extends ReadQuery
{
    /**
     * Creates a group of single partition read queries over the given partition keys.
     * <p>
     * This delegates to {@code SinglePartitionReadCommand.Group#create}.
     *
     * @param metadata the table to query.
     * @param nowInSec the time in seconds to use as "now" for the queries.
     * @param columnFilter the column filter to use for the queries.
     * @param rowFilter the row filter to use for the queries.
     * @param limits the limits to use.
     * @param partitionKeys the keys of the partitions to query.
     * @param clusteringIndexFilter the clustering index filter to use for the queries.
     *
     * @return a newly created group of read queries.
     */
    public static Group<? extends SinglePartitionReadQuery> createGroup(TableMetadata metadata,
                                                                        int nowInSec,
                                                                        ColumnFilter columnFilter,
                                                                        RowFilter rowFilter,
                                                                        DataLimits limits,
                                                                        List<DecoratedKey> partitionKeys,
                                                                        ClusteringIndexFilter clusteringIndexFilter)
    {
        return SinglePartitionReadCommand.Group.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKeys, clusteringIndexFilter);
    }

    /**
     * Creates a new read query on a single partition.
     *
     * @param metadata the table to query.
     * @param nowInSec the time in seconds to use as "now" for this query.
     * @param key the partition key for the partition to query.
     * @param columnFilter the column filter to use for the query.
     * @param filter the clustering index filter to use for the query.
     *
     * @return a newly created read query. The returned query will use no row filter and have no limits.
     */
    public static SinglePartitionReadQuery create(TableMetadata metadata,
                                                  int nowInSec,
                                                  DecoratedKey key,
                                                  ColumnFilter columnFilter,
                                                  ClusteringIndexFilter filter)
    {
        return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
    }

    /**
     * Creates a new read query on a single partition.
     *
     * @param metadata the table to query.
     * @param nowInSec the time in seconds to use as "now" for this query.
     * @param columnFilter the column filter to use for the query.
     * @param rowFilter the row filter to use for the query.
     * @param limits the limits to use for the query.
     * @param partitionKey the partition key for the partition to query.
     * @param clusteringIndexFilter the clustering index filter to use for the query.
     *
     * @return a newly created read query.
     */
    public static SinglePartitionReadQuery create(TableMetadata metadata,
                                                  int nowInSec,
                                                  ColumnFilter columnFilter,
                                                  RowFilter rowFilter,
                                                  DataLimits limits,
                                                  DecoratedKey partitionKey,
                                                  ClusteringIndexFilter clusteringIndexFilter)
    {
        return SinglePartitionReadCommand.create(metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
    }

    /**
     * Returns the key of the partition queried by this {@code ReadQuery}.
     *
     * @return the key of the partition queried
     */
    DecoratedKey partitionKey();

    /**
     * Creates a new {@code SinglePartitionReadQuery} with the specified limits.
     *
     * @param newLimits the new limits
     * @return the new {@code SinglePartitionReadQuery}
     */
    SinglePartitionReadQuery withUpdatedLimit(DataLimits newLimits);

    /**
     * Returns a new {@code SinglePartitionReadQuery} suitable for paging from the last returned row.
     *
     * @param lastReturned the last row returned by the previous page. The newly created query
     * will only query rows that come after this one (in query order). This can be {@code null} if this
     * is the first page.
     * @param limits the limits to use for the page to query.
     *
     * @return the newly created query.
     */
    SinglePartitionReadQuery forPaging(Clustering<?> lastReturned, DataLimits limits);

    /**
     * Returns a {@link SinglePartitionPager} over this query.
     *
     * @param pagingState the paging state to resume from, passed as-is to the pager.
     * @param protocolVersion the native protocol version, passed as-is to the pager.
     * @return a new pager for this query.
     */
    @Override
    default SinglePartitionPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
    {
        return new SinglePartitionPager(this, pagingState, protocolVersion);
    }

    /**
     * Returns the clustering index filter of this query.
     *
     * @return the clustering index filter used by this query.
     */
    ClusteringIndexFilter clusteringIndexFilter();

    /**
     * Checks whether the given partition key is selected by this query.
     * <p>
     * The key must be equal to this query's partition key, and it must satisfy the partition key
     * restrictions of this query's row filter.
     *
     * @param key the partition key to test.
     * @return {@code true} if the key is selected by this query, {@code false} otherwise.
     */
    default boolean selectsKey(DecoratedKey key)
    {
        if (!this.partitionKey().equals(key))
            return false;

        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType);
    }

    /**
     * Checks whether the given clustering is selected by this query.
     * <p>
     * The static clustering counts as selected if and only if the column filter fetches at least
     * one static column. Any other clustering must be selected by the clustering index filter and
     * must satisfy the clustering key restrictions of the row filter.
     * <p>
     * Note that {@code key} is not consulted by this default implementation.
     *
     * @param key the partition key the clustering belongs to.
     * @param clustering the clustering to test.
     * @return {@code true} if the clustering is selected by this query, {@code false} otherwise.
     */
    default boolean selectsClustering(DecoratedKey key, Clustering<?> clustering)
    {
        if (clustering == Clustering.STATIC_CLUSTERING)
            return !columnFilter().fetchedColumns().statics.isEmpty();

        if (!clusteringIndexFilter().selects(clustering))
            return false;

        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
    }

    /**
     * Groups multiple single partition read queries.
     * <p>
     * The queries of a group are expected to differ only in the partition key they apply to, which is
     * why most accessors below simply delegate to the first query.
     */
    abstract class Group<T extends SinglePartitionReadQuery> implements ReadQuery
    {
        public final List<T> queries;
        private final DataLimits limits;
        private final int nowInSec;
        private final boolean selectsFullPartitions;

        /**
         * Creates a group over the given queries.
         * <p>
         * "now" and whether full partitions are selected are both taken from the first query. When
         * assertions are enabled, the constructor checks that the list is non-empty and that all
         * queries share the same "now".
         *
         * @param queries the (non-empty) queries to group.
         * @param limits the limits applying to the group as a whole.
         */
        public Group(List<T> queries, DataLimits limits)
        {
            assert !queries.isEmpty();
            this.queries = queries;
            this.limits = limits;
            T firstQuery = queries.get(0);
            this.nowInSec = firstQuery.nowInSec();
            this.selectsFullPartitions = firstQuery.selectsFullPartition();
            for (int i = 1; i < queries.size(); i++)
                assert queries.get(i).nowInSec() == nowInSec;
        }

        /** Returns the time in seconds used as "now", shared by all the queries of the group. */
        public int nowInSec()
        {
            return nowInSec;
        }

        /** Returns the limits of the group. */
        public DataLimits limits()
        {
            return limits;
        }

        /** Returns the table metadata of the first query of the group. */
        public TableMetadata metadata()
        {
            return queries.get(0).metadata();
        }

        @Override
        public boolean selectsFullPartition()
        {
            return selectsFullPartitions;
        }

        public ReadExecutionController executionController()
        {
            // Note that the only difference between the queries in a group must be the partition key on which
            // they apply. So as far as ReadOrderGroup is concerned, we can use any of the queries to start one.
            return queries.get(0).executionController();
        }

        /**
         * Executes the queries of the group locally, without sorting them by partition key. The group's
         * limits are applied to the filtered result.
         *
         * @param controller the {@code ReadExecutionController} protecting the read.
         * @return the filtered, limited partitions.
         */
        public PartitionIterator executeInternal(ReadExecutionController controller)
        {
            // Note that the only difference between the queries in a group must be the partition key on which
            // they apply, so the first query's metadata is representative.
            boolean enforceStrictLiveness = metadata().enforceStrictLiveness();
            return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec),
                                 nowInSec,
                                 selectsFullPartitions,
                                 enforceStrictLiveness);
        }

        /**
         * Executes the queries of the group locally, sorted by partition key.
         *
         * @param executionController the {@code ReadExecutionController} protecting the read.
         * @return the concatenation of the queries' results, in partition key order.
         */
        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
        {
            return executeLocally(executionController, true);
        }

        /**
         * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}.
         * <p>
         * If one of the queries fails, the iterators already opened for the preceding queries are closed
         * before the failure is rethrown, so they are not leaked.
         *
         * @param executionController - the {@code ReadExecutionController} protecting the read.
         * @param sort - whether to sort the inner queries by partition key, which is required for merging the
         * iterator later on. This is false when called by {@link ReadQuery#executeInternal(ReadExecutionController)}:
         * in that case it is safe not to sort, as there is no merging involved, and we don't want to change
         * the old behavior, which was to not sort by partition.
         *
         * @return - the iterator that can be used to retrieve the query result.
         */
        private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort)
        {
            List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(queries.size());
            try
            {
                for (T query : queries)
                    partitions.add(Pair.of(query.partitionKey(), query.executeLocally(executionController)));
            }
            catch (RuntimeException | Error e)
            {
                // Iterators opened before the failing query would otherwise never be closed.
                closeAllOnFailure(partitions, e);
                throw e;
            }

            if (sort)
                Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft()));

            return UnfilteredPartitionIterators.concat(partitions.stream().map(Pair::getRight).collect(Collectors.toList()));
        }

        /**
         * Closes every iterator in {@code partitions}, recording any exception thrown while closing as a
         * suppressed exception of {@code failure} so that the original failure stays the primary one.
         */
        private static void closeAllOnFailure(List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions,
                                              Throwable failure)
        {
            for (Pair<DecoratedKey, UnfilteredPartitionIterator> partition : partitions)
            {
                try
                {
                    partition.getRight().close();
                }
                catch (Throwable closeFailure)
                {
                    failure.addSuppressed(closeFailure);
                }
            }
        }

        /**
         * Returns a pager over the group: a {@link SinglePartitionPager} when the group holds a single query,
         * a {@link MultiPartitionPager} otherwise.
         */
        public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
        {
            if (queries.size() == 1)
                return new SinglePartitionPager(queries.get(0), pagingState, protocolVersion);

            return new MultiPartitionPager<T>(this, pagingState, protocolVersion);
        }

        /** Returns {@code true} if at least one query of the group selects {@code key}. */
        public boolean selectsKey(DecoratedKey key)
        {
            return Iterables.any(queries, c -> c.selectsKey(key));
        }

        /** Returns {@code true} if at least one query of the group selects {@code clustering} for {@code key}. */
        public boolean selectsClustering(DecoratedKey key, Clustering<?> clustering)
        {
            return Iterables.any(queries, c -> c.selectsClustering(key, clustering));
        }

        @Override
        public RowFilter rowFilter()
        {
            // Note that the only difference between the queries in a group must be the partition key on which
            // they apply.
            return queries.get(0).rowFilter();
        }

        @Override
        public ColumnFilter columnFilter()
        {
            // Note that the only difference between the queries in a group must be the partition key on which
            // they apply.
            return queries.get(0).columnFilter();
        }

        @Override
        public void trackWarnings()
        {
            queries.forEach(ReadQuery::trackWarnings);
        }

        @Override
        public String toString()
        {
            return queries.toString();
        }
    }
}