| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.service.pager; |
| |
| import java.nio.ByteBuffer; |
| import java.util.NoSuchElementException; |
| |
| import org.apache.cassandra.config.CFMetaData; |
| import org.apache.cassandra.db.*; |
| import org.apache.cassandra.db.aggregation.GroupingState; |
| import org.apache.cassandra.db.filter.DataLimits; |
| import org.apache.cassandra.db.partitions.PartitionIterator; |
| import org.apache.cassandra.db.rows.Row; |
| import org.apache.cassandra.db.rows.RowIterator; |
| import org.apache.cassandra.service.ClientState; |
| |
| /** |
| * {@code QueryPager} that takes care of fetching the pages for aggregation queries. |
| * <p> |
| * For aggregation/group by queries, the user page size is in number of groups. But each group could be composed of very |
| * many rows so to avoid running into OOMs, this pager will page internal queries into sub-pages. So each call to |
| * {@link fetchPage} may (transparently) yield multiple internal queries (sub-pages). |
| */ |
| public final class AggregationQueryPager implements QueryPager |
| { |
| private final DataLimits limits; |
| |
| // The sub-pager, used to retrieve the next sub-page. |
| private QueryPager subPager; |
| |
| public AggregationQueryPager(QueryPager subPager, DataLimits limits) |
| { |
| this.subPager = subPager; |
| this.limits = limits; |
| } |
| |
| @Override |
| public PartitionIterator fetchPage(int pageSize, |
| ConsistencyLevel consistency, |
| ClientState clientState, |
| long queryStartNanoTime) |
| { |
| if (limits.isGroupByLimit()) |
| return new GroupByPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime); |
| |
| return new AggregationPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime); |
| } |
| |
| @Override |
| public ReadExecutionController executionController() |
| { |
| return subPager.executionController(); |
| } |
| |
| @Override |
| public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) |
| { |
| if (limits.isGroupByLimit()) |
| return new GroupByPartitionIterator(pageSize, executionController, System.nanoTime()); |
| |
| return new AggregationPartitionIterator(pageSize, executionController, System.nanoTime()); |
| } |
| |
| @Override |
| public boolean isExhausted() |
| { |
| return subPager.isExhausted(); |
| } |
| |
| @Override |
| public int maxRemaining() |
| { |
| return subPager.maxRemaining(); |
| } |
| |
| @Override |
| public PagingState state() |
| { |
| return subPager.state(); |
| } |
| |
| @Override |
| public QueryPager withUpdatedLimit(DataLimits newLimits) |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * <code>PartitionIterator</code> that automatically fetch a new sub-page of data if needed when the current iterator is |
| * exhausted. |
| */ |
| public class GroupByPartitionIterator implements PartitionIterator |
| { |
| /** |
| * The top-level page size in number of groups. |
| */ |
| private final int pageSize; |
| |
| // For "normal" queries |
| private final ConsistencyLevel consistency; |
| private final ClientState clientState; |
| |
| // For internal queries |
| private final ReadExecutionController executionController; |
| |
| /** |
| * The <code>PartitionIterator</code> over the last page retrieved. |
| */ |
| private PartitionIterator partitionIterator; |
| |
| /** |
| * The next <code>RowIterator</code> to be returned. |
| */ |
| private RowIterator next; |
| |
| /** |
| * Specify if all the data have been returned. |
| */ |
| private boolean endOfData; |
| |
| /** |
| * Keeps track if the partitionIterator has been closed or not. |
| */ |
| private boolean closed; |
| |
| /** |
| * The key of the last partition processed. |
| */ |
| private ByteBuffer lastPartitionKey; |
| |
| /** |
| * The clustering of the last row processed |
| */ |
| private Clustering lastClustering; |
| |
| /** |
| * The initial amount of row remaining |
| */ |
| private int initialMaxRemaining; |
| |
| private long queryStartNanoTime; |
| |
| public GroupByPartitionIterator(int pageSize, |
| ConsistencyLevel consistency, |
| ClientState clientState, |
| long queryStartNanoTime) |
| { |
| this(pageSize, consistency, clientState, null, queryStartNanoTime); |
| } |
| |
| public GroupByPartitionIterator(int pageSize, |
| ReadExecutionController executionController, |
| long queryStartNanoTime) |
| { |
| this(pageSize, null, null, executionController, queryStartNanoTime); |
| } |
| |
| private GroupByPartitionIterator(int pageSize, |
| ConsistencyLevel consistency, |
| ClientState clientState, |
| ReadExecutionController executionController, |
| long queryStartNanoTime) |
| { |
| this.pageSize = handlePagingOff(pageSize); |
| this.consistency = consistency; |
| this.clientState = clientState; |
| this.executionController = executionController; |
| this.queryStartNanoTime = queryStartNanoTime; |
| } |
| |
| private int handlePagingOff(int pageSize) |
| { |
| // If the paging is off, the pageSize will be <= 0. So we need to replace |
| // it by DataLimits.NO_LIMIT |
| return pageSize <= 0 ? DataLimits.NO_LIMIT : pageSize; |
| } |
| |
| public final void close() |
| { |
| if (!closed) |
| { |
| closed = true; |
| partitionIterator.close(); |
| } |
| } |
| |
| public final boolean hasNext() |
| { |
| if (endOfData) |
| return false; |
| |
| if (next != null) |
| return true; |
| |
| fetchNextRowIterator(); |
| |
| return next != null; |
| } |
| |
| /** |
| * Loads the next <code>RowIterator</code> to be returned. |
| */ |
| private void fetchNextRowIterator() |
| { |
| if (partitionIterator == null) |
| { |
| initialMaxRemaining = subPager.maxRemaining(); |
| partitionIterator = fetchSubPage(pageSize); |
| } |
| |
| while (!partitionIterator.hasNext()) |
| { |
| partitionIterator.close(); |
| |
| int counted = initialMaxRemaining - subPager.maxRemaining(); |
| |
| if (isDone(pageSize, counted) || subPager.isExhausted()) |
| { |
| endOfData = true; |
| closed = true; |
| return; |
| } |
| |
| subPager = updatePagerLimit(subPager, limits, lastPartitionKey, lastClustering); |
| partitionIterator = fetchSubPage(computeSubPageSize(pageSize, counted)); |
| } |
| |
| next = partitionIterator.next(); |
| } |
| |
| protected boolean isDone(int pageSize, int counted) |
| { |
| return counted == pageSize; |
| } |
| |
| /** |
| * Updates the pager with the new limits if needed. |
| * |
| * @param pager the pager previoulsy used |
| * @param limits the DataLimits |
| * @param lastPartitionKey the partition key of the last row returned |
| * @param lastClustering the clustering of the last row returned |
| * @return the pager to use to query the next page of data |
| */ |
| protected QueryPager updatePagerLimit(QueryPager pager, |
| DataLimits limits, |
| ByteBuffer lastPartitionKey, |
| Clustering lastClustering) |
| { |
| GroupingState state = new GroupingState(lastPartitionKey, lastClustering); |
| DataLimits newLimits = limits.forGroupByInternalPaging(state); |
| return pager.withUpdatedLimit(newLimits); |
| } |
| |
| /** |
| * Computes the size of the next sub-page to retrieve. |
| * |
| * @param pageSize the top-level page size |
| * @param counted the number of result returned so far by the previous sub-pages |
| * @return the size of the next sub-page to retrieve |
| */ |
| protected int computeSubPageSize(int pageSize, int counted) |
| { |
| return pageSize - counted; |
| } |
| |
| /** |
| * Fetchs the next sub-page. |
| * |
| * @param subPageSize the sub-page size in number of groups |
| * @return the next sub-page |
| */ |
| private final PartitionIterator fetchSubPage(int subPageSize) |
| { |
| return consistency != null ? subPager.fetchPage(subPageSize, consistency, clientState, queryStartNanoTime) |
| : subPager.fetchPageInternal(subPageSize, executionController); |
| } |
| |
| public final RowIterator next() |
| { |
| if (!hasNext()) |
| throw new NoSuchElementException(); |
| |
| RowIterator iterator = new GroupByRowIterator(next); |
| lastPartitionKey = iterator.partitionKey().getKey(); |
| next = null; |
| return iterator; |
| } |
| |
| private class GroupByRowIterator implements RowIterator |
| { |
| /** |
| * The decorated <code>RowIterator</code>. |
| */ |
| private RowIterator rowIterator; |
| |
| /** |
| * Keeps track if the decorated iterator has been closed or not. |
| */ |
| private boolean closed; |
| |
| public GroupByRowIterator(RowIterator delegate) |
| { |
| this.rowIterator = delegate; |
| } |
| |
| public CFMetaData metadata() |
| { |
| return rowIterator.metadata(); |
| } |
| |
| public boolean isReverseOrder() |
| { |
| return rowIterator.isReverseOrder(); |
| } |
| |
| public PartitionColumns columns() |
| { |
| return rowIterator.columns(); |
| } |
| |
| public DecoratedKey partitionKey() |
| { |
| return rowIterator.partitionKey(); |
| } |
| |
| public Row staticRow() |
| { |
| Row row = rowIterator.staticRow(); |
| lastClustering = null; |
| return row; |
| } |
| |
| public boolean isEmpty() |
| { |
| return this.rowIterator.isEmpty() && !hasNext(); |
| } |
| |
| public void close() |
| { |
| if (!closed) |
| rowIterator.close(); |
| } |
| |
| public boolean hasNext() |
| { |
| if (rowIterator.hasNext()) |
| return true; |
| |
| DecoratedKey partitionKey = rowIterator.partitionKey(); |
| |
| rowIterator.close(); |
| |
| // Fetch the next RowIterator |
| GroupByPartitionIterator.this.hasNext(); |
| |
| // if the previous page was ending within the partition the |
| // next RowIterator is the continuation of this one |
| if (next != null && partitionKey.equals(next.partitionKey())) |
| { |
| rowIterator = next; |
| next = null; |
| return rowIterator.hasNext(); |
| } |
| |
| closed = true; |
| return false; |
| } |
| |
| public Row next() |
| { |
| Row row = this.rowIterator.next(); |
| lastClustering = row.clustering(); |
| return row; |
| } |
| } |
| } |
| |
| /** |
| * <code>PartitionIterator</code> for queries without Group By but with aggregates. |
| * <p>For maintaining backward compatibility we are forced to use the {@link org.apache.cassandra.db.filter.DataLimits.CQLLimits} instead of the |
| * {@link org.apache.cassandra.db.filter.DataLimits.CQLGroupByLimits}. Due to that pages need to be fetched in a different way.</p> |
| */ |
| public final class AggregationPartitionIterator extends GroupByPartitionIterator |
| { |
| public AggregationPartitionIterator(int pageSize, |
| ConsistencyLevel consistency, |
| ClientState clientState, |
| long queryStartNanoTime) |
| { |
| super(pageSize, consistency, clientState, queryStartNanoTime); |
| } |
| |
| public AggregationPartitionIterator(int pageSize, |
| ReadExecutionController executionController, |
| long queryStartNanoTime) |
| { |
| super(pageSize, executionController, queryStartNanoTime); |
| } |
| |
| @Override |
| protected QueryPager updatePagerLimit(QueryPager pager, |
| DataLimits limits, |
| ByteBuffer lastPartitionKey, |
| Clustering lastClustering) |
| { |
| return pager; |
| } |
| |
| @Override |
| protected boolean isDone(int pageSize, int counted) |
| { |
| return false; |
| } |
| |
| @Override |
| protected int computeSubPageSize(int pageSize, int counted) |
| { |
| return pageSize; |
| } |
| } |
| } |