blob: 02623eb913611d018e8e1978b5fa5ae77b64d267 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.pager;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnCounter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for {@link QueryPager} implementations.
 * <p>
 * Subclasses perform the actual query ({@link #queryNextPage}) and remember where the previous page ended
 * ({@link #recordLast}, {@link #containsPreviousLast}). This class handles page sizing, live-data counting,
 * trimming of over-long pages and tracking of how many rows may still be returned.
 */
abstract class AbstractQueryPager implements QueryPager
{
    private static final Logger logger = LoggerFactory.getLogger(AbstractQueryPager.class);

    private final ConsistencyLevel consistencyLevel;
    private final boolean localQuery;

    protected final CFMetaData cfm;
    protected final IDiskAtomFilter columnFilter;
    private final long timestamp;

    // Maximum number of live rows still to be returned over all future pages (starts at 'toFetch').
    private int remaining;
    // Set once a query returns less than was asked for (or nothing at all).
    private boolean exhausted;
    // Result of the last recordLast() call: when true, the next query asks for one extra row because the
    // previous page's last row may be returned again (see containsPreviousLast()).
    private boolean shouldFetchExtraRow;

    protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
                                 int toFetch,
                                 boolean localQuery,
                                 String keyspace,
                                 String columnFamily,
                                 IDiskAtomFilter columnFilter,
                                 long timestamp)
    {
        this(consistencyLevel, toFetch, localQuery, Schema.instance.getCFMetaData(keyspace, columnFamily), columnFilter, timestamp);
    }

    protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
                                 int toFetch,
                                 boolean localQuery,
                                 CFMetaData cfm,
                                 IDiskAtomFilter columnFilter,
                                 long timestamp)
    {
        this.consistencyLevel = consistencyLevel;
        this.localQuery = localQuery;

        this.cfm = cfm;
        this.columnFilter = columnFilter;
        this.timestamp = timestamp;

        this.remaining = toFetch;
    }

    /**
     * Fetches the next page of results.
     *
     * @param pageSize the number of live rows wanted for this page (further capped by {@link #maxRemaining()})
     * @return the rows of the next page; an empty list once the pager is exhausted
     */
    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
    {
        if (isExhausted())
            return Collections.emptyList();

        int currentPageSize = nextPageSize(pageSize);
        List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery));

        if (rows.isEmpty())
        {
            logger.debug("Got empty set of rows, considering pager exhausted");
            exhausted = true;
            return Collections.emptyList();
        }

        int liveCount = getPageLiveCount(rows);
        logger.debug("Fetched {} live rows", liveCount);

        // Because SP.getRangeSlice doesn't trim the result (see SP.trim()), liveCount may be greater than what asked
        // (currentPageSize). This would throw off the paging logic so we trim the excess. It's not extremely efficient
        // but most of the time there should be nothing or very little to trim.
        if (liveCount > currentPageSize)
        {
            rows = discardLast(rows, liveCount - currentPageSize);
            liveCount = currentPageSize;
        }

        remaining -= liveCount;

        // If we've got less than requested, there is no more query to do (but
        // we still need to return the current page)
        if (liveCount < currentPageSize)
        {
            logger.debug("Got result ({}) smaller than page size ({}), considering pager exhausted", liveCount, currentPageSize);
            exhausted = true;
        }

        // The trimming above empties the page when currentPageSize is 0 (non-positive pageSize). There is then
        // no first row to inspect and no progress can be made, so stop instead of failing on rows.get(0).
        if (rows.isEmpty())
        {
            logger.debug("No rows left after trimming, considering pager exhausted");
            exhausted = true;
            return Collections.emptyList();
        }

        // If it's not the first query and the first column is the last one returned (likely
        // but not certain since paging can race with deletes/expiration), then remove the
        // first column.
        if (containsPreviousLast(rows.get(0)))
        {
            rows = discardFirst(rows);
            remaining++;
        }
        // Otherwise, if 'shouldFetchExtraRow' was set, we queried for one more than the page size,
        // so if the page is full, trim the last entry
        else if (shouldFetchExtraRow && !exhausted)
        {
            // We've asked for one more than necessary
            rows = discardLast(rows);
            remaining++;
        }

        logger.debug("Remaining rows to page: {}", remaining);

        if (!isExhausted())
        {
            if (rows.isEmpty())
            {
                // Only the previous page's last row came back: re-issuing the same query would return it
                // again, so stop rather than fail on rows.get(-1) or loop forever.
                logger.debug("No rows left after trimming, considering pager exhausted");
                exhausted = true;
            }
            else
            {
                shouldFetchExtraRow = recordLast(rows.get(rows.size() - 1));
            }
        }

        return rows;
    }

    /**
     * Returns {@code result} without the rows whose column family is null or has no columns. The input list
     * is returned as-is when no such row exists; otherwise a new list is built.
     */
    private List<Row> filterEmpty(List<Row> result)
    {
        for (Row row : result)
        {
            if (row.cf == null || !row.cf.hasColumns())
            {
                List<Row> newResult = new ArrayList<Row>(result.size() - 1);
                for (Row row2 : result)
                {
                    if (row2.cf == null || !row2.cf.hasColumns())
                        continue;
                    newResult.add(row2);
                }
                return newResult;
            }
        }
        return result;
    }

    /**
     * Restores the counters saved from a previous pager, so paging can resume where it stopped.
     */
    protected void restoreState(int remaining, boolean shouldFetchExtraRow)
    {
        this.remaining = remaining;
        this.shouldFetchExtraRow = shouldFetchExtraRow;
    }

    /** @return true once a query came back short/empty or no rows remain to be fetched */
    public boolean isExhausted()
    {
        return exhausted || remaining == 0;
    }

    /** @return the maximum number of live rows that may still be returned */
    public int maxRemaining()
    {
        return remaining;
    }

    public long timestamp()
    {
        return timestamp;
    }

    /**
     * Number of live rows to ask for: the page size capped by {@code remaining}, plus one when the previous
     * page's last row may be returned again.
     */
    private int nextPageSize(int pageSize)
    {
        return Math.min(remaining, pageSize) + (shouldFetchExtraRow ? 1 : 0);
    }

    /** @return a new counter for this pager's column filter, comparator and timestamp */
    public ColumnCounter columnCounter()
    {
        return columnFilter.columnCounter(cfm.comparator, timestamp);
    }

    /**
     * Performs the actual query for the next page.
     *
     * @param pageSize the number of live rows to ask for
     */
    protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException;

    /**
     * Checks to see if the first row of a new page contains the last row from the previous page.
     * @param first the first row of the new page
     * @return true if <code>first</code> contains the last row from the previous page and it is live, false otherwise
     */
    protected abstract boolean containsPreviousLast(Row first);

    /**
     * Saves the paging state by recording the last seen partition key and cell name (where applicable).
     * @param last the last row in the current page
     * @return true if an extra row should be fetched in the next page, false otherwise
     */
    protected abstract boolean recordLast(Row last);

    /** @return true if the query returns cells in reverse comparator order */
    protected abstract boolean isReversed();

    private List<Row> discardFirst(List<Row> rows)
    {
        return discardFirst(rows, 1);
    }

    /**
     * Removes the first {@code toDiscard} live units of data from the front of the page (in query order).
     *
     * @return the trimmed page; an empty list when the page holds less than {@code toDiscard} live data
     */
    @VisibleForTesting
    List<Row> discardFirst(List<Row> rows, int toDiscard)
    {
        if (toDiscard == 0 || rows.isEmpty())
            return rows;

        int i = 0;
        DecoratedKey firstKey = null;
        ColumnFamily firstCf = null;
        while (toDiscard > 0 && i < rows.size())
        {
            Row first = rows.get(i++);
            firstKey = first.key;
            firstCf = first.cf.cloneMeShallow(isReversed());
            // For a reversed query the "first" data in query order is at the end of the comparator order.
            toDiscard -= isReversed()
                         ? discardLast(first.cf, toDiscard, firstCf)
                         : discardFirst(first.cf, toDiscard, firstCf);
        }

        // If there is less live data than to discard, all is discarded
        if (toDiscard > 0)
            return Collections.<Row>emptyList();

        // i is the index of the first row that we are sure to keep. On top of that,
        // we also keep firstCf if it hasn't been fully emptied by the last iteration above.
        int count = firstCf.getColumnCount();
        int newSize = rows.size() - (count == 0 ? i : i - 1);
        List<Row> newRows = new ArrayList<Row>(newSize);
        if (count != 0)
            newRows.add(new Row(firstKey, firstCf));
        newRows.addAll(rows.subList(i, rows.size()));

        return newRows;
    }

    private List<Row> discardLast(List<Row> rows)
    {
        return discardLast(rows, 1);
    }

    /**
     * Removes the last {@code toDiscard} live units of data from the end of the page (in query order).
     *
     * @return the trimmed page; an empty list when the page holds less than {@code toDiscard} live data
     */
    @VisibleForTesting
    List<Row> discardLast(List<Row> rows, int toDiscard)
    {
        if (toDiscard == 0 || rows.isEmpty())
            return rows;

        int i = rows.size() - 1;
        DecoratedKey lastKey = null;
        ColumnFamily lastCf = null;
        while (toDiscard > 0 && i >= 0)
        {
            Row last = rows.get(i--);
            lastKey = last.key;
            lastCf = last.cf.cloneMeShallow(isReversed());
            // For a reversed query the "last" data in query order is at the start of the comparator order.
            toDiscard -= isReversed()
                         ? discardFirst(last.cf, toDiscard, lastCf)
                         : discardLast(last.cf, toDiscard, lastCf);
        }

        // If there is less live data than to discard, all is discarded
        if (toDiscard > 0)
            return Collections.<Row>emptyList();

        // i is the index of the last row that we are sure to keep. On top of that,
        // we also keep lastCf if it hasn't been fully emptied by the last iteration above.
        int count = lastCf.getColumnCount();
        int newSize = count == 0 ? i + 1 : i + 2;
        List<Row> newRows = new ArrayList<Row>(newSize);
        newRows.addAll(rows.subList(0, i + 1));
        if (count != 0)
            newRows.add(new Row(lastKey, lastCf));

        return newRows;
    }

    /** Sums the live count of every row of the page, using a fresh counter per row. */
    private int getPageLiveCount(List<Row> page)
    {
        int count = 0;
        for (Row row : page)
            count += columnCounter().countAll(row.cf).live();
        return count;
    }

    /**
     * Copies into {@code newCf} what remains of {@code cf} after dropping its first {@code toDiscard} live
     * cells in comparator order.
     *
     * @return how much live data was discarded
     */
    private int discardFirst(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
    {
        boolean isReversed = isReversed();
        DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
        return isReversed
               ? discardTail(cf, toDiscard, newCf, cf.reverseIterator(), tester)
               : discardHead(toDiscard, newCf, cf.iterator(), tester);
    }

    /**
     * Copies into {@code newCf} what remains of {@code cf} after dropping its last {@code toDiscard} live
     * cells in comparator order.
     *
     * @return how much live data was discarded
     */
    private int discardLast(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
    {
        boolean isReversed = isReversed();
        DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
        return isReversed
               ? discardHead(toDiscard, newCf, cf.reverseIterator(), tester)
               : discardTail(cf, toDiscard, newCf, cf.iterator(), tester);
    }

    /**
     * Skips the first {@code toDiscard} live, non-static cells of {@code iter} and copies everything after
     * them (plus any static cells seen while skipping) into {@code copy}.
     *
     * @return the amount of live data discarded, capped at {@code toDiscard}; a row holding only static
     *         cells counts as 1
     */
    private int discardHead(int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester)
    {
        ColumnCounter counter = columnCounter();

        List<Cell> staticCells = new ArrayList<>(cfm.staticColumns().size());

        // Discard the first 'toDiscard' live, non-static cells
        while (iter.hasNext())
        {
            Cell c = iter.next();

            // if it's a static column, don't count it and save it to add to the trimmed results
            ColumnDefinition columnDef = cfm.getColumnDefinition(c.name());
            if (columnDef != null && columnDef.kind == ColumnDefinition.Kind.STATIC)
            {
                staticCells.add(c);
                continue;
            }

            counter.count(c, tester);

            // once we've discarded the required amount, add the saved static cells and the rest
            if (counter.live() > toDiscard)
            {
                for (Cell staticCell : staticCells)
                    copy.addColumn(staticCell);

                copy.addColumn(c);
                while (iter.hasNext())
                    copy.addColumn(iter.next());
                // the iterator is now drained; leave explicitly rather than relying on the loop test
                break;
            }
        }
        int live = counter.live();
        // We want to take into account the row even if it was containing only static columns
        if (live == 0 && !staticCells.isEmpty())
            live = 1;
        return Math.min(live, toDiscard);
    }

    /**
     * Copies into {@code copy} the cells of {@code iter} up to (excluding) the point where more than
     * {@code liveCount - toDiscard} live cells have been seen, i.e. drops the last {@code toDiscard} live ones.
     *
     * @return the amount of live data discarded, capped at the live count of {@code cf}
     */
    private int discardTail(ColumnFamily cf, int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester)
    {
        // Redoing the counting like that is not extremely efficient.
        // This is called only for reversed slices or in the case of a race between
        // paging and a deletion (pretty unlikely), so this is probably acceptable.
        int liveCount = columnCounter().countAll(cf).live();

        if (liveCount == toDiscard)
            return toDiscard;

        ColumnCounter counter = columnCounter();
        // Discard the last 'toDiscard' live (so stop adding as soon as we're past 'liveCount - toDiscard')
        while (iter.hasNext())
        {
            Cell c = iter.next();
            counter.count(c, tester);
            if (counter.live() > liveCount - toDiscard)
                break;

            copy.addColumn(c);
        }
        return Math.min(liveCount, toDiscard);
    }

    /**
     * Returns the first non-static cell in the ColumnFamily. This is necessary to avoid recording a static column
     * as the "last" cell seen in a reversed query. Because we will always query static columns alongside the normal
     * data for a page, they are not a good indicator of where paging should resume. When we begin the next page, we
     * need to start from the last non-static cell.
     *
     * @return the first non-static cell, or null if {@code cf} only holds static cells (or nothing)
     */
    protected Cell firstNonStaticCell(ColumnFamily cf)
    {
        for (Cell cell : cf)
        {
            ColumnDefinition def = cfm.getColumnDefinition(cell.name());
            if (def == null || def.kind != ColumnDefinition.Kind.STATIC)
                return cell;
        }
        return null;
    }

    /**
     * Returns the first cell of {@code cf} in reverse sorted order, i.e. its last cell. Presumably fails if
     * {@code cf} has no cells — callers should ensure it is non-empty.
     */
    protected static Cell lastCell(ColumnFamily cf)
    {
        return cf.getReverseSortedColumns().iterator().next();
    }
}