| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.cql3.statements; |
| |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| |
| import com.google.common.base.Objects; |
| import com.google.common.base.Predicate; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Iterators; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.cassandra.auth.Permission; |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.config.ColumnDefinition; |
| import org.apache.cassandra.cql3.*; |
| import org.apache.cassandra.cql3.functions.Function; |
| import org.apache.cassandra.cql3.restrictions.StatementRestrictions; |
| import org.apache.cassandra.cql3.selection.RawSelector; |
| import org.apache.cassandra.cql3.selection.Selection; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.composites.*; |
| import org.apache.cassandra.db.filter.*; |
| import org.apache.cassandra.db.index.SecondaryIndexManager; |
| import org.apache.cassandra.db.marshal.CollectionType; |
| import org.apache.cassandra.db.marshal.CompositeType; |
| import org.apache.cassandra.db.marshal.Int32Type; |
| import org.apache.cassandra.dht.AbstractBounds; |
| import org.apache.cassandra.exceptions.*; |
| import org.apache.cassandra.serializers.MarshalException; |
| import org.apache.cassandra.service.ClientState; |
| import org.apache.cassandra.service.ClientWarn; |
| import org.apache.cassandra.service.QueryState; |
| import org.apache.cassandra.service.StorageProxy; |
| import org.apache.cassandra.service.pager.Pageable; |
| import org.apache.cassandra.service.pager.QueryPager; |
| import org.apache.cassandra.service.pager.QueryPagers; |
| import org.apache.cassandra.thrift.ThriftValidation; |
| import org.apache.cassandra.transport.messages.ResultMessage; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| import org.apache.cassandra.utils.FBUtilities; |
| |
| import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; |
| import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull; |
| import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue; |
| import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; |
| import static org.apache.cassandra.utils.ByteBufferUtil.UNSET_BYTE_BUFFER; |
| |
| /** |
| * Encapsulates a completely parsed SELECT query, including the target |
| * column family, expression, result count, and ordering clause. |
| * |
| * A number of public methods here are only used internally. However, |
| * many of these are made accessible for the benefit of custom |
| * QueryHandler implementations, so before reducing their accessibility |
| * due consideration should be given. |
| */ |
| public class SelectStatement implements CQLStatement |
| { |
// Logger used to report potentially expensive aggregation queries (see pageAggregateQuery).
private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);

// Page size used to page aggregation queries internally when the client did not ask for paging (see getPageSize).
private static final int DEFAULT_COUNT_PAGE_SIZE = 10000;

// Value returned by getBoundTerms(); RawStatement.prepare() passes the size of the bound variable specifications.
private final int boundTerms;
public final CFMetaData cfm;
public final Parameters parameters;
private final Selection selection;
// The LIMIT term, or null when the statement has no LIMIT (getLimit then returns Integer.MAX_VALUE).
private final Term limit;

private final StatementRestrictions restrictions;

// Passed to the slice filters as their "reversed" flag; process() reverses the built result set when it is set.
private final boolean isReversed;

/**
 * The comparator used to order results when multiple keys are selected (using IN).
 */
private final Comparator<List<ByteBuffer>> orderingComparator;

// Used by forSelection below
private static final Parameters defaultParameters = new Parameters(Collections.<ColumnIdentifier.Raw, Boolean>emptyMap(), false, false, false);
| |
/**
 * Creates a SELECT statement from its already prepared components.
 *
 * @param cfm the metadata of the queried table
 * @param boundTerms the value reported by {@link #getBoundTerms()}
 * @param parameters the raw query parameters (orderings, DISTINCT, JSON, ...)
 * @param selection the selected columns/functions
 * @param restrictions the restrictions built from the WHERE clause
 * @param isReversed whether slices must be queried in reversed order
 * @param orderingComparator the comparator used for post-query ordering, may be {@code null}
 * @param limit the LIMIT term, or {@code null} if there is no LIMIT
 */
public SelectStatement(CFMetaData cfm,
                       int boundTerms,
                       Parameters parameters,
                       Selection selection,
                       StatementRestrictions restrictions,
                       boolean isReversed,
                       Comparator<List<ByteBuffer>> orderingComparator,
                       Term limit)
{
    // Assignments follow the order of the constructor parameters.
    this.cfm = cfm;
    this.boundTerms = boundTerms;
    this.parameters = parameters;
    this.selection = selection;
    this.restrictions = restrictions;
    this.isReversed = isReversed;
    this.orderingComparator = orderingComparator;
    this.limit = limit;
}
| |
| public Iterable<Function> getFunctions() |
| { |
| return Iterables.concat(selection.getFunctions(), |
| restrictions.getFunctions(), |
| limit != null ? limit.getFunctions() : Collections.<Function>emptySet()); |
| } |
| |
| // Creates a simple select based on the given selection. |
| // Note that the results select statement should not be used for actual queries, but only for processing already |
| // queried data through processColumnFamily. |
| static SelectStatement forSelection(CFMetaData cfm, Selection selection) |
| { |
| return new SelectStatement(cfm, |
| 0, |
| defaultParameters, |
| selection, |
| StatementRestrictions.empty(cfm), |
| false, |
| null, |
| null); |
| } |
| |
| public ResultSet.ResultMetadata getResultMetadata() |
| { |
| return selection.getResultMetadata(parameters.isJson); |
| } |
| |
| public int getBoundTerms() |
| { |
| return boundTerms; |
| } |
| |
| public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException |
| { |
| state.hasColumnFamilyAccess(keyspace(), columnFamily(), Permission.SELECT); |
| for (Function function : getFunctions()) |
| state.ensureHasPermission(Permission.EXECUTE, function); |
| } |
| |
| public void validate(ClientState state) throws InvalidRequestException |
| { |
| // Nothing to do, all validation has been done by RawStatement.prepare() |
| } |
| |
| public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException |
| { |
| ConsistencyLevel cl = options.getConsistency(); |
| checkNotNull(cl, "Invalid empty consistency level"); |
| |
| cl.validateForRead(keyspace()); |
| |
| int limit = getLimit(options); |
| long now = System.currentTimeMillis(); |
| Pageable command = getPageableCommand(options, limit, now); |
| int pageSize = getPageSize(options); |
| |
| if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) |
| return execute(command, options, limit, now, state); |
| |
| QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState()); |
| return execute(pager, options, limit, now, pageSize); |
| } |
| |
| private Pageable getPageableCommand(QueryOptions options, int limit, long now) throws RequestValidationException |
| { |
| int limitForQuery = updateLimitForQuery(limit); |
| if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing()) |
| return getRangeCommand(options, limitForQuery, now); |
| |
| List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now); |
| return commands == null ? null : new Pageable.ReadCommands(commands, limitForQuery); |
| } |
| |
| public Pageable getPageableCommand(QueryOptions options) throws RequestValidationException |
| { |
| return getPageableCommand(options, getLimit(options), System.currentTimeMillis()); |
| } |
| |
| private int getPageSize(QueryOptions options) |
| { |
| int pageSize = options.getPageSize(); |
| |
| // An aggregation query will never be paged for the user, but we always page it internally to avoid OOM. |
| // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default |
| // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707). |
| if (selection.isAggregate() && pageSize <= 0) |
| pageSize = DEFAULT_COUNT_PAGE_SIZE; |
| |
| return pageSize; |
| } |
| |
| private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state) |
| throws RequestValidationException, RequestExecutionException |
| { |
| List<Row> rows; |
| if (command == null) |
| { |
| rows = Collections.<Row>emptyList(); |
| } |
| else |
| { |
| rows = command instanceof Pageable.ReadCommands |
| ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency(), state.getClientState()) |
| : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency()); |
| } |
| |
| return processResults(rows, options, limit, now); |
| } |
| |
| private ResultMessage.Rows execute(QueryPager pager, QueryOptions options, int limit, long now, int pageSize) |
| throws RequestValidationException, RequestExecutionException |
| { |
| if (selection.isAggregate()) |
| return pageAggregateQuery(pager, options, pageSize, now); |
| |
| // We can't properly do post-query ordering if we page (see #6722) |
| checkFalse(needsPostQueryOrdering(), |
| "Cannot page queries with both ORDER BY and a IN restriction on the partition key;" |
| + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query"); |
| |
| List<Row> page = pager.fetchPage(pageSize); |
| ResultMessage.Rows msg = processResults(page, options, limit, now); |
| |
| if (!pager.isExhausted()) |
| msg.result.metadata.setHasMorePages(pager.state()); |
| |
| return msg; |
| } |
| |
| private ResultMessage.Rows pageAggregateQuery(QueryPager pager, QueryOptions options, int pageSize, long now) |
| throws RequestValidationException, RequestExecutionException |
| { |
| if (!restrictions.hasPartitionKeyRestrictions()) |
| { |
| logger.warn("Aggregation query used without partition key"); |
| ClientWarn.instance.warn("Aggregation query used without partition key"); |
| } |
| else if (restrictions.keyIsInRelation()) |
| { |
| logger.warn("Aggregation query used on multiple partition keys (IN restriction)"); |
| ClientWarn.instance.warn("Aggregation query used on multiple partition keys (IN restriction)"); |
| } |
| |
| Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson); |
| while (!pager.isExhausted()) |
| { |
| for (Row row : pager.fetchPage(pageSize)) |
| { |
| // Not columns match the query, skip |
| if (row.cf == null) |
| continue; |
| |
| processColumnFamily(row.key.getKey(), row.cf, options, now, result); |
| } |
| } |
| return new ResultMessage.Rows(result.build(options.getProtocolVersion())); |
| } |
| |
| public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException |
| { |
| ResultSet rset = process(rows, options, limit, now); |
| return new ResultMessage.Rows(rset); |
| } |
| |
| static List<Row> readLocally(String keyspaceName, List<ReadCommand> cmds) |
| { |
| Keyspace keyspace = Keyspace.open(keyspaceName); |
| List<Row> rows = new ArrayList<Row>(cmds.size()); |
| for (ReadCommand cmd : cmds) |
| rows.add(cmd.getRow(keyspace)); |
| return rows; |
| } |
| |
| public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException |
| { |
| return executeInternal(state, options, System.currentTimeMillis()); |
| } |
| |
| public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, long now) throws RequestExecutionException, RequestValidationException |
| { |
| int limit = getLimit(options); |
| Pageable command = getPageableCommand(options, limit, now); |
| int pageSize = getPageSize(options); |
| |
| if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) |
| { |
| List<Row> rows = command == null |
| ? Collections.<Row>emptyList() |
| : (command instanceof Pageable.ReadCommands |
| ? readLocally(keyspace(), ((Pageable.ReadCommands)command).commands) |
| : ((RangeSliceCommand)command).executeLocally()); |
| |
| return processResults(rows, options, limit, now); |
| } |
| |
| QueryPager pager = QueryPagers.localPager(command); |
| return execute(pager, options, limit, now, pageSize); |
| } |
| |
| public ResultSet process(List<Row> rows) throws InvalidRequestException |
| { |
| QueryOptions options = QueryOptions.DEFAULT; |
| return process(rows, options, getLimit(options), System.currentTimeMillis()); |
| } |
| |
| public String keyspace() |
| { |
| return cfm.ksName; |
| } |
| |
| public String columnFamily() |
| { |
| return cfm.cfName; |
| } |
| |
| /** |
| * May be used by custom QueryHandler implementations |
| */ |
| public Selection getSelection() |
| { |
| return selection; |
| } |
| |
| /** |
| * May be used by custom QueryHandler implementations |
| */ |
| public StatementRestrictions getRestrictions() |
| { |
| return restrictions; |
| } |
| |
| private List<ReadCommand> getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException |
| { |
| Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options); |
| |
| List<ReadCommand> commands = new ArrayList<>(keys.size()); |
| |
| IDiskAtomFilter filter = makeFilter(options, limit); |
| if (filter == null) |
| return null; |
| |
| // Note that we use the total limit for every key, which is potentially inefficient. |
| // However, IN + LIMIT is not a very sensible choice. |
| for (ByteBuffer key : keys) |
| { |
| QueryProcessor.validateKey(key); |
| // We should not share the slice filter amongst the commands (hence the cloneShallow), due to |
| // SliceQueryFilter not being immutable due to its columnCounter used by the lastCounted() method |
| // (this is fairly ugly and we should change that but that's probably not a tiny refactor to do that cleanly) |
| commands.add(ReadCommand.create(keyspace(), ByteBufferUtil.clone(key), columnFamily(), now, filter.cloneShallow())); |
| } |
| |
| return commands; |
| } |
| |
| private RangeSliceCommand getRangeCommand(QueryOptions options, int limit, long now) throws RequestValidationException |
| { |
| IDiskAtomFilter filter = makeFilter(options, limit); |
| if (filter == null) |
| return null; |
| |
| List<IndexExpression> expressions = getValidatedIndexExpressions(options); |
| // The LIMIT provided by the user is the number of CQL row he wants returned. |
| // We want to have getRangeSlice to count the number of columns, not the number of keys. |
| AbstractBounds<RowPosition> keyBounds = restrictions.getPartitionKeyBounds(options); |
| return keyBounds == null |
| ? null |
| : new RangeSliceCommand(keyspace(), columnFamily(), now, filter, keyBounds, expressions, limit, !parameters.isDistinct, false); |
| } |
| |
| private ColumnSlice makeStaticSlice() |
| { |
| // Note: we could use staticPrefix.start() for the start bound, but EMPTY gives us the |
| // same effect while saving a few CPU cycles. |
| return isReversed |
| ? new ColumnSlice(cfm.comparator.staticPrefix().end(), Composites.EMPTY) |
| : new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()); |
| } |
| |
| private IDiskAtomFilter makeFilter(QueryOptions options, int limit) |
| throws InvalidRequestException |
| { |
| int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size(); |
| if (parameters.isDistinct) |
| { |
| // For distinct, we only care about fetching the beginning of each partition. If we don't have |
| // static columns, we in fact only care about the first cell, so we query only that (we don't "group"). |
| // If we do have static columns, we do need to fetch the first full group (to have the static columns values). |
| |
| // See the comments on IGNORE_TOMBSTONED_PARTITIONS and CASSANDRA-8490 for why we use a special value for |
| // DISTINCT queries on the partition key only. |
| toGroup = selection.containsStaticColumns() ? toGroup : SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS; |
| return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, toGroup); |
| } |
| else if (restrictions.isColumnRange()) |
| { |
| List<Composite> startBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.START, options); |
| List<Composite> endBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.END, options); |
| assert startBounds.size() == endBounds.size(); |
| |
| // Handles fetching static columns. Note that for 2i, the filter is just used to restrict |
| // the part of the index to query so adding the static slice would be useless and confusing. |
| // For 2i, static columns are retrieve in CompositesSearcher with each index hit. |
| ColumnSlice staticSlice = selection.containsStaticColumns() && !restrictions.usesSecondaryIndexing() |
| ? makeStaticSlice() |
| : null; |
| |
| // The case where startBounds == 1 is common enough that it's worth optimizing |
| if (startBounds.size() == 1) |
| { |
| ColumnSlice slice = new ColumnSlice(startBounds.get(0), endBounds.get(0)); |
| if (slice.isAlwaysEmpty(cfm.comparator, isReversed)) |
| return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup); |
| |
| if (staticSlice == null) |
| return sliceFilter(slice, limit, toGroup); |
| |
| if (isReversed) |
| return slice.includes(cfm.comparator.reverseComparator(), staticSlice.start) |
| ? sliceFilter(new ColumnSlice(slice.start, staticSlice.finish), limit, toGroup) |
| : sliceFilter(new ColumnSlice[]{ slice, staticSlice }, limit, toGroup); |
| else |
| return slice.includes(cfm.comparator, staticSlice.finish) |
| ? sliceFilter(new ColumnSlice(staticSlice.start, slice.finish), limit, toGroup) |
| : sliceFilter(new ColumnSlice[]{ staticSlice, slice }, limit, toGroup); |
| } |
| |
| List<ColumnSlice> l = new ArrayList<ColumnSlice>(startBounds.size()); |
| for (int i = 0; i < startBounds.size(); i++) |
| { |
| ColumnSlice slice = new ColumnSlice(startBounds.get(i), endBounds.get(i)); |
| if (!slice.isAlwaysEmpty(cfm.comparator, isReversed)) |
| l.add(slice); |
| } |
| |
| if (l.isEmpty()) |
| return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup); |
| if (staticSlice == null) |
| return sliceFilter(l.toArray(new ColumnSlice[l.size()]), limit, toGroup); |
| |
| // The slices should not overlap. We know the slices built from startBounds/endBounds don't, but if there is |
| // a static slice, it could overlap with the 2nd slice. Check for it and correct if that's the case |
| ColumnSlice[] slices; |
| if (isReversed) |
| { |
| if (l.get(l.size() - 1).includes(cfm.comparator.reverseComparator(), staticSlice.start)) |
| { |
| slices = l.toArray(new ColumnSlice[l.size()]); |
| slices[slices.length-1] = new ColumnSlice(slices[slices.length-1].start, Composites.EMPTY); |
| } |
| else |
| { |
| slices = l.toArray(new ColumnSlice[l.size()+1]); |
| slices[slices.length-1] = staticSlice; |
| } |
| } |
| else |
| { |
| if (l.get(0).includes(cfm.comparator, staticSlice.finish)) |
| { |
| slices = new ColumnSlice[l.size()]; |
| slices[0] = new ColumnSlice(Composites.EMPTY, l.get(0).finish); |
| for (int i = 1; i < l.size(); i++) |
| slices[i] = l.get(i); |
| } |
| else |
| { |
| slices = new ColumnSlice[l.size()+1]; |
| slices[0] = staticSlice; |
| for (int i = 0; i < l.size(); i++) |
| slices[i+1] = l.get(i); |
| } |
| } |
| return sliceFilter(slices, limit, toGroup); |
| } |
| else |
| { |
| SortedSet<CellName> cellNames = getRequestedColumns(options); |
| if (cellNames == null) // in case of IN () for the last column of the key |
| return null; |
| QueryProcessor.validateCellNames(cellNames, cfm.comparator); |
| return new NamesQueryFilter(cellNames, true); |
| } |
| } |
| |
| private SliceQueryFilter sliceFilter(ColumnSlice slice, int limit, int toGroup) |
| { |
| return sliceFilter(new ColumnSlice[]{ slice }, limit, toGroup); |
| } |
| |
| private SliceQueryFilter sliceFilter(ColumnSlice[] slices, int limit, int toGroup) |
| { |
| assert ColumnSlice.validateSlices(slices, cfm.comparator, isReversed) : String.format("Invalid slices: " + Arrays.toString(slices) + (isReversed ? " (reversed)" : "")); |
| return new SliceQueryFilter(slices, isReversed, limit, toGroup); |
| } |
| |
| /** |
| * May be used by custom QueryHandler implementations |
| */ |
| public int getLimit(QueryOptions options) throws InvalidRequestException |
| { |
| if (limit != null) |
| { |
| ByteBuffer b = checkNotNull(limit.bindAndGet(options), "Invalid null value of limit"); |
| // treat UNSET limit value as 'unlimited' |
| if (b == UNSET_BYTE_BUFFER) |
| return Integer.MAX_VALUE; |
| try |
| { |
| Int32Type.instance.validate(b); |
| int l = Int32Type.instance.compose(b); |
| checkTrue(l > 0, "LIMIT must be strictly positive"); |
| return l; |
| } |
| catch (MarshalException e) |
| { |
| throw new InvalidRequestException("Invalid limit value"); |
| } |
| } |
| return Integer.MAX_VALUE; |
| } |
| |
| private int updateLimitForQuery(int limit) |
| { |
| // If the query is for an aggregate, we do not want to limit the number of rows retrieved. The LIMIT |
| // clause apply to the number of rows returned to the user and not to the number of rows retrieved. |
| if (selection.isAggregate()) |
| return Integer.MAX_VALUE; |
| // Internally, we don't support exclusive bounds for slices. Instead, we query one more element if necessary |
| // and exclude it later (in processColumnFamily) |
| return restrictions.isNonCompositeSliceWithExclusiveBounds() && limit != Integer.MAX_VALUE |
| ? limit + 1 |
| : limit; |
| } |
| |
| private SortedSet<CellName> getRequestedColumns(QueryOptions options) throws InvalidRequestException |
| { |
| // Note: getRequestedColumns don't handle static columns, but due to CASSANDRA-5762 |
| // we always do a slice for CQL3 tables, so it's ok to ignore them here |
| assert !restrictions.isColumnRange(); |
| SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator); |
| for (Composite composite : restrictions.getClusteringColumnsAsComposites(options)) |
| columns.addAll(addSelectedColumns(composite)); |
| return columns; |
| } |
| |
| private SortedSet<CellName> addSelectedColumns(Composite prefix) |
| { |
| if (cfm.comparator.isDense()) |
| { |
| return FBUtilities.singleton(cfm.comparator.create(prefix, null), cfm.comparator); |
| } |
| else |
| { |
| SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator); |
| |
| // We need to query the selected column as well as the marker |
| // column (for the case where the row exists but has no columns outside the PK) |
| // Two exceptions are "static CF" (non-composite non-compact CF) and "super CF" |
| // that don't have marker and for which we must query all columns instead |
| if (cfm.comparator.isCompound() && !cfm.isSuper()) |
| { |
| // marker |
| columns.add(cfm.comparator.rowMarker(prefix)); |
| |
| // selected columns |
| for (ColumnDefinition def : selection.getColumns()) |
| if (def.isRegular() || def.isStatic()) |
| columns.add(cfm.comparator.create(prefix, def)); |
| } |
| else |
| { |
| // We now that we're not composite so we can ignore static columns |
| for (ColumnDefinition def : cfm.regularColumns()) |
| columns.add(cfm.comparator.create(prefix, def)); |
| } |
| return columns; |
| } |
| } |
| |
| /** |
| * May be used by custom QueryHandler implementations |
| */ |
| public List<IndexExpression> getValidatedIndexExpressions(QueryOptions options) throws InvalidRequestException |
| { |
| if (!restrictions.usesSecondaryIndexing()) |
| return Collections.emptyList(); |
| |
| ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily()); |
| SecondaryIndexManager secondaryIndexManager = cfs.indexManager; |
| |
| List<IndexExpression> expressions = restrictions.getIndexExpressions(secondaryIndexManager, options); |
| secondaryIndexManager.validateIndexSearchersForQuery(expressions); |
| |
| return expressions; |
| } |
| |
| private CellName makeExclusiveSliceBound(Bound bound, CellNameType type, QueryOptions options) throws InvalidRequestException |
| { |
| // clusteringColumnBounds may reverse bound if clustering order is reversed |
| // but areRequestedBoundsInclusive checks for Restriction::isInclusive and never |
| // reverses the order. In order to avoid inconsistencies and check inclusive |
| // bounds correctly, we need to check for column order and reverse it. See CASSANDRA-10988 |
| if (restrictions.areRequestedBoundsInclusive(reverseBoundIfNeeded(bound))) |
| return null; |
| |
| return type.makeCellName(restrictions.getClusteringColumnsBounds(bound, options).get(0)); |
| } |
| |
| /** |
| * Reverses the specified bound if the non-compound clustering column is a reversed one. |
| * @param bound bound to reverse |
| * @return the bound reversed if the column type was a reversed one or the original bound |
| */ |
| private Bound reverseBoundIfNeeded(Bound bound) |
| { |
| assert !cfm.comparator.isCompound(); |
| |
| List<ColumnDefinition> columnDefs = cfm.clusteringColumns(); |
| return columnDefs.get(columnDefs.size() - 1).isReversedType() ? bound.reverse() : bound; |
| } |
| |
| private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final QueryOptions options) throws InvalidRequestException |
| { |
| final CellNameType type = cfm.comparator; |
| |
| final CellName excludedStart = makeExclusiveSliceBound(Bound.START, type, options); |
| final CellName excludedEnd = makeExclusiveSliceBound(Bound.END, type, options); |
| |
| return Iterators.filter(cells, new Predicate<Cell>() |
| { |
| public boolean apply(Cell c) |
| { |
| // For dynamic CF, the column could be out of the requested bounds (because we don't support strict bounds internally (unless |
| // the comparator is composite that is)), filter here |
| return !((excludedStart != null && type.compare(c.name(), excludedStart) == 0) |
| || (excludedEnd != null && type.compare(c.name(), excludedEnd) == 0)); |
| } |
| }); |
| } |
| |
| private ResultSet process(List<Row> rows, QueryOptions options, int limit, long now) throws InvalidRequestException |
| { |
| Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson); |
| for (Row row : rows) |
| { |
| // Not columns match the query, skip |
| if (row.cf == null) |
| continue; |
| |
| processColumnFamily(row.key.getKey(), row.cf, options, now, result); |
| } |
| |
| ResultSet cqlRows = result.build(options.getProtocolVersion()); |
| |
| orderResults(cqlRows); |
| |
| // Internal calls always return columns in the comparator order, even when reverse was set |
| if (isReversed) |
| cqlRows.reverse(); |
| |
| // Trim result if needed to respect the user limit |
| cqlRows.trim(limit); |
| return cqlRows; |
| } |
| |
| // Used by ModificationStatement for CAS operations |
| void processColumnFamily(ByteBuffer key, ColumnFamily cf, QueryOptions options, long now, Selection.ResultSetBuilder result) |
| throws InvalidRequestException |
| { |
| CFMetaData cfm = cf.metadata(); |
| ByteBuffer[] keyComponents = null; |
| if (cfm.getKeyValidator() instanceof CompositeType) |
| { |
| keyComponents = ((CompositeType)cfm.getKeyValidator()).split(key); |
| } |
| else |
| { |
| keyComponents = new ByteBuffer[]{ key }; |
| } |
| |
| Iterator<Cell> cells = cf.getSortedColumns().iterator(); |
| if (restrictions.isNonCompositeSliceWithExclusiveBounds()) |
| cells = applySliceRestriction(cells, options); |
| |
| int protocolVersion = options.getProtocolVersion(); |
| CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells); |
| |
| // If there is static columns but there is no non-static row, then provided the select was a full |
| // partition selection (i.e. not a 2ndary index search and there was no condition on clustering columns) |
| // then we want to include the static columns in the result set (and we're done). |
| CQL3Row staticRow = iter.getStaticRow(); |
| if (staticRow != null && !iter.hasNext() && !restrictions.usesSecondaryIndexing() && restrictions.hasNoClusteringColumnsRestriction()) |
| { |
| result.newRow(protocolVersion); |
| for (ColumnDefinition def : selection.getColumns()) |
| { |
| switch (def.kind) |
| { |
| case PARTITION_KEY: |
| result.add(keyComponents[def.position()]); |
| break; |
| case STATIC: |
| addValue(result, def, staticRow, options); |
| break; |
| default: |
| result.add((ByteBuffer)null); |
| } |
| } |
| return; |
| } |
| |
| while (iter.hasNext()) |
| { |
| CQL3Row cql3Row = iter.next(); |
| |
| // Respect requested order |
| result.newRow(protocolVersion); |
| // Respect selection order |
| for (ColumnDefinition def : selection.getColumns()) |
| { |
| switch (def.kind) |
| { |
| case PARTITION_KEY: |
| result.add(keyComponents[def.position()]); |
| break; |
| case CLUSTERING_COLUMN: |
| result.add(cql3Row.getClusteringColumn(def.position())); |
| break; |
| case COMPACT_VALUE: |
| result.add(cql3Row.getColumn(null)); |
| break; |
| case REGULAR: |
| addValue(result, def, cql3Row, options); |
| break; |
| case STATIC: |
| addValue(result, def, staticRow, options); |
| break; |
| } |
| } |
| } |
| } |
| |
| private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row, QueryOptions options) |
| { |
| if (row == null) |
| { |
| result.add((ByteBuffer)null); |
| return; |
| } |
| |
| if (def.type.isMultiCell()) |
| { |
| List<Cell> cells = row.getMultiCellColumn(def.name); |
| ByteBuffer buffer = cells == null |
| ? null |
| : ((CollectionType)def.type).serializeForNativeProtocol(def, cells, options.getProtocolVersion()); |
| result.add(buffer); |
| return; |
| } |
| |
| result.add(row.getColumn(def.name)); |
| } |
| |
| private boolean needsPostQueryOrdering() |
| { |
| // We need post-query ordering only for queries with IN on the partition key and an ORDER BY. |
| return restrictions.keyIsInRelation() && !parameters.orderings.isEmpty(); |
| } |
| |
| /** |
| * Orders results when multiple keys are selected (using IN) |
| */ |
| private void orderResults(ResultSet cqlRows) |
| { |
| if (cqlRows.size() == 0 || !needsPostQueryOrdering()) |
| return; |
| |
| Collections.sort(cqlRows.rows, orderingComparator); |
| } |
| |
| public static class RawStatement extends CFStatement |
| { |
private final Parameters parameters;
private final List<RawSelector> selectClause;
private final List<Relation> whereClause;
private final Term.Raw limit;

/**
 * Creates the raw (unprepared) form of a SELECT statement.
 *
 * @param cfName the name of the queried table
 * @param parameters the query parameters
 * @param selectClause the raw selectors
 * @param whereClause the relations of the WHERE clause; null is treated as an empty list
 * @param limit the raw LIMIT term, may be null
 */
public RawStatement(CFName cfName, Parameters parameters, List<RawSelector> selectClause, List<Relation> whereClause, Term.Raw limit)
{
    super(cfName);
    this.parameters = parameters;
    this.selectClause = selectClause;
    if (whereClause == null)
        this.whereClause = Collections.<Relation>emptyList();
    else
        this.whereClause = whereClause;
    this.limit = limit;
}
| |
| public ParsedStatement.Prepared prepare() throws InvalidRequestException |
| { |
| CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily()); |
| VariableSpecifications boundNames = getBoundVariables(); |
| |
| Selection selection = selectClause.isEmpty() |
| ? Selection.wildcard(cfm) |
| : Selection.fromSelectors(cfm, selectClause); |
| |
| StatementRestrictions restrictions = prepareRestrictions(cfm, boundNames, selection); |
| |
| if (parameters.isDistinct) |
| validateDistinctSelection(cfm, selection, restrictions); |
| |
| Comparator<List<ByteBuffer>> orderingComparator = null; |
| boolean isReversed = false; |
| |
| if (!parameters.orderings.isEmpty()) |
| { |
| verifyOrderingIsAllowed(restrictions); |
| orderingComparator = getOrderingComparator(cfm, selection, restrictions); |
| isReversed = isReversed(cfm); |
| } |
| |
| if (isReversed) |
| restrictions.reverse(); |
| |
| checkNeedsFiltering(restrictions); |
| |
| SelectStatement stmt = new SelectStatement(cfm, |
| boundNames.size(), |
| parameters, |
| selection, |
| restrictions, |
| isReversed, |
| orderingComparator, |
| prepareLimit(boundNames)); |
| |
| return new ParsedStatement.Prepared(stmt, boundNames, boundNames.getPartitionKeyBindIndexes(cfm)); |
| } |
| |
| /** |
| * Prepares the restrictions. |
| * |
| * @param cfm the column family meta data |
| * @param boundNames the variable specifications |
| * @param selection the selection |
| * @return the restrictions |
| * @throws InvalidRequestException if a problem occurs while building the restrictions |
| */ |
| private StatementRestrictions prepareRestrictions(CFMetaData cfm, |
| VariableSpecifications boundNames, |
| Selection selection) throws InvalidRequestException |
| { |
| try |
| { |
| return new StatementRestrictions(cfm, |
| whereClause, |
| boundNames, |
| selection.containsOnlyStaticColumns(), |
| selection.containsACollection(), |
| parameters.allowFiltering); |
| } |
| catch (UnrecognizedEntityException e) |
| { |
| if (containsAlias(e.entity)) |
| throw invalidRequest("Aliases aren't allowed in the where clause ('%s')", e.relation); |
| throw e; |
| } |
| } |
| |
| /** Returns a Term for the limit or null if no limit is set */ |
| private Term prepareLimit(VariableSpecifications boundNames) throws InvalidRequestException |
| { |
| if (limit == null) |
| return null; |
| |
| Term prepLimit = limit.prepare(keyspace(), limitReceiver()); |
| prepLimit.collectMarkerSpecification(boundNames); |
| return prepLimit; |
| } |
| |
| private static void verifyOrderingIsAllowed(StatementRestrictions restrictions) throws InvalidRequestException |
| { |
| checkFalse(restrictions.usesSecondaryIndexing(), "ORDER BY with 2ndary indexes is not supported."); |
| checkFalse(restrictions.isKeyRange(), "ORDER BY is only supported when the partition key is restricted by an EQ or an IN."); |
| } |
| |
| private static void validateDistinctSelection(CFMetaData cfm, |
| Selection selection, |
| StatementRestrictions restrictions) |
| throws InvalidRequestException |
| { |
| checkFalse(restrictions.hasClusteringColumnsRestriction() || restrictions.hasNonPrimaryKeyRestrictions(), |
| "SELECT DISTINCT with WHERE clause only supports restriction by partition key."); |
| |
| Collection<ColumnDefinition> requestedColumns = selection.getColumns(); |
| for (ColumnDefinition def : requestedColumns) |
| checkFalse(!def.isPartitionKey() && !def.isStatic(), |
| "SELECT DISTINCT queries must only request partition key columns and/or static columns (not %s)", |
| def.name); |
| |
| // If it's a key range, we require that all partition key columns are selected so we don't have to bother |
| // with post-query grouping. |
| if (!restrictions.isKeyRange()) |
| return; |
| |
| for (ColumnDefinition def : cfm.partitionKeyColumns()) |
| checkTrue(requestedColumns.contains(def), |
| "SELECT DISTINCT queries must request all the partition key columns (missing %s)", def.name); |
| } |
| |
| private void handleUnrecognizedOrderingColumn(ColumnIdentifier column) throws InvalidRequestException |
| { |
| checkFalse(containsAlias(column), "Aliases are not allowed in order by clause ('%s')", column); |
| checkFalse(true, "Order by on unknown column %s", column); |
| } |
| |
| private Comparator<List<ByteBuffer>> getOrderingComparator(CFMetaData cfm, |
| Selection selection, |
| StatementRestrictions restrictions) |
| throws InvalidRequestException |
| { |
| if (!restrictions.keyIsInRelation()) |
| return null; |
| |
| Map<ColumnIdentifier, Integer> orderingIndexes = getOrderingIndex(cfm, selection); |
| |
| List<Integer> idToSort = new ArrayList<Integer>(); |
| List<Comparator<ByteBuffer>> sorters = new ArrayList<Comparator<ByteBuffer>>(); |
| |
| for (ColumnIdentifier.Raw raw : parameters.orderings.keySet()) |
| { |
| ColumnIdentifier identifier = raw.prepare(cfm); |
| ColumnDefinition orderingColumn = cfm.getColumnDefinition(identifier); |
| idToSort.add(orderingIndexes.get(orderingColumn.name)); |
| sorters.add(orderingColumn.type); |
| } |
| return idToSort.size() == 1 ? new SingleColumnComparator(idToSort.get(0), sorters.get(0)) |
| : new CompositeComparator(sorters, idToSort); |
| } |
| |
| private Map<ColumnIdentifier, Integer> getOrderingIndex(CFMetaData cfm, Selection selection) |
| throws InvalidRequestException |
| { |
| // If we order post-query (see orderResults), the sorted column needs to be in the ResultSet for sorting, |
| // even if we don't |
| // ultimately ship them to the client (CASSANDRA-4911). |
| Map<ColumnIdentifier, Integer> orderingIndexes = new HashMap<>(); |
| for (ColumnIdentifier.Raw raw : parameters.orderings.keySet()) |
| { |
| ColumnIdentifier column = raw.prepare(cfm); |
| final ColumnDefinition def = cfm.getColumnDefinition(column); |
| if (def == null) |
| handleUnrecognizedOrderingColumn(column); |
| int index = selection.getResultSetIndex(def); |
| if (index < 0) |
| index = selection.addColumnForOrdering(def); |
| orderingIndexes.put(def.name, index); |
| } |
| return orderingIndexes; |
| } |
| |
| private boolean isReversed(CFMetaData cfm) throws InvalidRequestException |
| { |
| Boolean[] reversedMap = new Boolean[cfm.clusteringColumns().size()]; |
| int i = 0; |
| for (Map.Entry<ColumnIdentifier.Raw, Boolean> entry : parameters.orderings.entrySet()) |
| { |
| ColumnIdentifier column = entry.getKey().prepare(cfm); |
| boolean reversed = entry.getValue(); |
| |
| ColumnDefinition def = cfm.getColumnDefinition(column); |
| if (def == null) |
| handleUnrecognizedOrderingColumn(column); |
| |
| checkTrue(def.isClusteringColumn(), |
| "Order by is currently only supported on the clustered columns of the PRIMARY KEY, got %s", column); |
| |
| checkTrue(i++ == def.position(), |
| "Order by currently only support the ordering of columns following their declared order in the PRIMARY KEY"); |
| |
| reversedMap[def.position()] = (reversed != def.isReversedType()); |
| } |
| |
| // Check that all boolean in reversedMap, if set, agrees |
| Boolean isReversed = null; |
| for (Boolean b : reversedMap) |
| { |
| // Column on which order is specified can be in any order |
| if (b == null) |
| continue; |
| |
| if (isReversed == null) |
| { |
| isReversed = b; |
| continue; |
| } |
| checkTrue(isReversed.equals(b), "Unsupported order by relation"); |
| } |
| assert isReversed != null; |
| return isReversed; |
| } |
| |
| /** If ALLOW FILTERING was not specified, this verifies that it is not needed */ |
| private void checkNeedsFiltering(StatementRestrictions restrictions) throws InvalidRequestException |
| { |
| // non-key-range non-indexed queries cannot involve filtering underneath |
| if (!parameters.allowFiltering && (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())) |
| { |
| // We will potentially filter data if either: |
| // - Have more than one IndexExpression |
| // - Have no index expression and the column filter is not the identity |
| checkFalse(restrictions.needFiltering(), |
| StatementRestrictions.REQUIRES_ALLOW_FILTERING_MESSAGE); |
| } |
| |
| // We don't internally support exclusive slice bounds on non-composite tables. To deal with it we do an |
| // inclusive slice and remove post-query the value that shouldn't be returned. One problem however is that |
| // if there is a user limit, that limit may make the query return before the end of the slice is reached, |
| // in which case, once we'll have removed bound post-query, we might end up with less results than |
| // requested which would be incorrect. For single-partition query, this is not a problem, we just ask for |
| // one more result (see updateLimitForQuery()) since that's enough to compensate for that problem. For key |
| // range however, each returned row may include one result that will have to be trimmed, so we would have |
| // to bump the query limit by N where N is the number of rows we will return, but we don't know that in |
| // advance. So, since we currently don't have a good way to handle such query, we refuse it (#7059) rather |
| // than answering with something that is wrong. |
| if (restrictions.isNonCompositeSliceWithExclusiveBounds() && restrictions.isKeyRange() && limit != null) |
| { |
| SingleColumnRelation rel = findInclusiveClusteringRelationForCompact(restrictions.cfm); |
| throw invalidRequest("The query requests a restriction of rows with a strict bound (%s) over a range of partitions. " |
| + "This is not supported by the underlying storage engine for COMPACT tables if a LIMIT is provided. " |
| + "Please either make the condition non strict (%s) or remove the user LIMIT", rel, rel.withNonStrictOperator()); |
| } |
| } |
| |
| private SingleColumnRelation findInclusiveClusteringRelationForCompact(CFMetaData cfm) |
| { |
| for (Relation r : whereClause) |
| { |
| // We only call this when sliceRestriction != null, i.e. for compact table with non composite comparator, |
| // so it can't be a MultiColumnRelation. |
| SingleColumnRelation rel = (SingleColumnRelation)r; |
| |
| if (cfm.getColumnDefinition(rel.getEntity().prepare(cfm)).isClusteringColumn() |
| && (rel.operator() == Operator.GT || rel.operator() == Operator.LT)) |
| return rel; |
| } |
| |
| // We're not supposed to call this method unless we know this can't happen |
| throw new AssertionError(); |
| } |
| |
| private boolean containsAlias(final ColumnIdentifier name) |
| { |
| return Iterables.any(selectClause, new Predicate<RawSelector>() |
| { |
| public boolean apply(RawSelector raw) |
| { |
| return name.equals(raw.alias); |
| } |
| }); |
| } |
| |
| private ColumnSpecification limitReceiver() |
| { |
| return new ColumnSpecification(keyspace(), columnFamily(), new ColumnIdentifier("[limit]", true), Int32Type.instance); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return Objects.toStringHelper(this) |
| .add("name", cfName) |
| .add("selectClause", selectClause) |
| .add("whereClause", whereClause) |
| .add("isDistinct", parameters.isDistinct) |
| .toString(); |
| } |
| } |
| |
| public static class Parameters |
| { |
| // Public because CASSANDRA-9858 |
| public final Map<ColumnIdentifier.Raw, Boolean> orderings; |
| public final boolean isDistinct; |
| public final boolean allowFiltering; |
| public final boolean isJson; |
| |
| public Parameters(Map<ColumnIdentifier.Raw, Boolean> orderings, |
| boolean isDistinct, |
| boolean allowFiltering, |
| boolean isJson) |
| { |
| this.orderings = orderings; |
| this.isDistinct = isDistinct; |
| this.allowFiltering = allowFiltering; |
| this.isJson = isJson; |
| } |
| } |
| |
| private static abstract class ColumnComparator<T> implements Comparator<T> |
| { |
| protected final int compare(Comparator<ByteBuffer> comparator, ByteBuffer aValue, ByteBuffer bValue) |
| { |
| if (aValue == null) |
| return bValue == null ? 0 : -1; |
| |
| return bValue == null ? 1 : comparator.compare(aValue, bValue); |
| } |
| } |
| |
| /** |
| * Used in orderResults(...) method when single 'ORDER BY' condition where given |
| */ |
| private static class SingleColumnComparator extends ColumnComparator<List<ByteBuffer>> |
| { |
| private final int index; |
| private final Comparator<ByteBuffer> comparator; |
| |
| public SingleColumnComparator(int columnIndex, Comparator<ByteBuffer> orderer) |
| { |
| index = columnIndex; |
| comparator = orderer; |
| } |
| |
| public int compare(List<ByteBuffer> a, List<ByteBuffer> b) |
| { |
| return compare(comparator, a.get(index), b.get(index)); |
| } |
| } |
| |
| /** |
| * Used in orderResults(...) method when multiple 'ORDER BY' conditions where given |
| */ |
| private static class CompositeComparator extends ColumnComparator<List<ByteBuffer>> |
| { |
| private final List<Comparator<ByteBuffer>> orderTypes; |
| private final List<Integer> positions; |
| |
| private CompositeComparator(List<Comparator<ByteBuffer>> orderTypes, List<Integer> positions) |
| { |
| this.orderTypes = orderTypes; |
| this.positions = positions; |
| } |
| |
| public int compare(List<ByteBuffer> a, List<ByteBuffer> b) |
| { |
| for (int i = 0; i < positions.size(); i++) |
| { |
| Comparator<ByteBuffer> type = orderTypes.get(i); |
| int columnPos = positions.get(i); |
| |
| int comparison = compare(type, a.get(columnPos), b.get(columnPos)); |
| |
| if (comparison != 0) |
| return comparison; |
| } |
| |
| return 0; |
| } |
| } |
| } |