/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.filter;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.aggregation.GroupMaker;
import org.apache.cassandra.db.aggregation.GroupingState;
import org.apache.cassandra.db.aggregation.AggregationSpecification;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.transform.BasePartitions;
import org.apache.cassandra.db.transform.BaseRows;
import org.apache.cassandra.db.transform.StoppingTransformation;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Object in charge of tracking whether we have fetched enough data for a given query.
*
* The reason this is not just a simple integer is that Thrift and CQL3 count
* results in different ways. This is what abstracts those differences.
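*
* An illustrative usage sketch (the {@code iter} and {@code nowInSec} values are assumed
* to come from the surrounding read path):
* <pre>{@code
* DataLimits limits = DataLimits.cqlLimits(100, 10); // LIMIT 100 PER PARTITION LIMIT 10
* UnfilteredPartitionIterator limited = limits.filter(iter, nowInSec, false);
* }</pre>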
*/
public abstract class DataLimits
{
public static final Serializer serializer = new Serializer();
public static final int NO_LIMIT = Integer.MAX_VALUE;
public static final DataLimits NONE = new CQLLimits(NO_LIMIT)
{
@Override
public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return false;
}
@Override
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter,
int nowInSec,
boolean countPartitionsWithOnlyStaticData)
{
return iter;
}
@Override
public UnfilteredRowIterator filter(UnfilteredRowIterator iter,
int nowInSec,
boolean countPartitionsWithOnlyStaticData)
{
return iter;
}
@Override
public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return iter;
}
};
// We currently deal with distinct queries by querying full partitions but limiting the result at 1 row per
// partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
public static final DataLimits DISTINCT_NONE = new CQLLimits(NO_LIMIT, 1, true);
public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT, CQL_GROUP_BY_LIMIT, CQL_GROUP_BY_PAGING_LIMIT }
public static DataLimits cqlLimits(int cqlRowLimit)
{
return cqlRowLimit == NO_LIMIT ? NONE : new CQLLimits(cqlRowLimit);
}
// mixed mode partition range scans on compact storage tables without clustering columns coordinated by 2.x are
// returned as one (cql) row per cell, but we need to count each partition as a single row. So we just return a
// CQLLimits instance that doesn't count rows towards its limit. See CASSANDRA-15072
public static DataLimits legacyCompactStaticCqlLimits(int cqlRowLimits)
{
return new CQLLimits(cqlRowLimits) {
public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return new CQLCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness) {
public Row applyToRow(Row row)
{
// noop: only count full partitions
return row;
}
};
}
};
}
public static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit)
{
return cqlRowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT
? NONE
: new CQLLimits(cqlRowLimit, perPartitionLimit);
}
private static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit, boolean isDistinct)
{
return cqlRowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT && !isDistinct
? NONE
: new CQLLimits(cqlRowLimit, perPartitionLimit, isDistinct);
}
public static DataLimits groupByLimits(int groupLimit,
int groupPerPartitionLimit,
int rowLimit,
AggregationSpecification groupBySpec)
{
return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
}
public static DataLimits distinctLimits(int cqlRowLimit)
{
return CQLLimits.distinct(cqlRowLimit);
}
public static DataLimits thriftLimits(int partitionLimit, int cellPerPartitionLimit)
{
return new ThriftLimits(partitionLimit, cellPerPartitionLimit);
}
public static DataLimits superColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
{
return new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
}
public abstract Kind kind();
public abstract boolean isUnlimited();
public abstract boolean isDistinct();
public boolean isGroupByLimit()
{
return false;
}
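/**
 * Whether the data counted by the given counter was exhausted, that is, whether fewer results
 * than the limit were counted (in which case there is nothing more to fetch).
 */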
public boolean isExhausted(Counter counter)
{
return counter.counted() < count();
}
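/**
 * Returns equivalent limits for fetching a single page of at most {@code pageSize} results.
 */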
public abstract DataLimits forPaging(int pageSize);
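/**
 * Returns equivalent limits for fetching a page of at most {@code pageSize} results, resuming
 * from the partition of {@code lastReturnedKey}, for which {@code lastReturnedKeyRemaining}
 * results may still be returned without breaching the per-partition limit.
 */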
public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining);
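/**
 * Returns the limits to use when retrying a single-partition read after a short read, so as
 * to fetch {@code toFetch} more results.
 */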
public abstract DataLimits forShortReadRetry(int toFetch);
/**
* Creates a <code>DataLimits</code> instance to be used for internally paginating GROUP BY queries.
*
* @param state the <code>GroupMaker</code> state
* @return a <code>DataLimits</code> instance to be used for internally paginating GROUP BY queries
*/
public DataLimits forGroupByInternalPaging(GroupingState state)
{
throw new UnsupportedOperationException();
}
public abstract boolean hasEnoughLiveData(CachedPartition cached,
int nowInSec,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness);
/**
* Returns a new {@code Counter} for this limits.
*
* @param nowInSec the current time in seconds (to decide what is expired or not).
* @param assumeLiveData if true, the counter will assume that every row passed is live and
* thus won't check for liveness; otherwise it will. This should be {@code true} when used on a
* {@code RowIterator} (since it only returns live rows), false otherwise.
* @param countPartitionsWithOnlyStaticData if {@code true} the partitions with only static data should be counted
* as 1 valid row.
* @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info,
* normally retrieved from {@link CFMetaData#enforceStrictLiveness()}
* @return a new {@code Counter} for this limits.
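*
* <p>An illustrative sketch of counting live rows without enforcing the limits; the
* {@code limits}, {@code iter}, {@code nowInSec} and {@code enforceStrictLiveness} values
* are assumed to come from the surrounding read path:
* <pre>{@code
* DataLimits.Counter counter = limits.newCounter(nowInSec, false, false, enforceStrictLiveness)
*                                    .onlyCount();
* try (UnfilteredPartitionIterator counted = counter.applyTo(iter))
* {
*     while (counted.hasNext())
*     {
*         try (UnfilteredRowIterator partition = counted.next())
*         {
*             while (partition.hasNext())
*                 partition.next();
*         }
*     }
* }
* int liveRows = counter.counted();
* }</pre>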
*/
public abstract Counter newCounter(int nowInSec,
boolean assumeLiveData,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness);
/**
* The max number of results this limits enforces.
* <p>
* Note that the actual definition of "results" depends on the kind of limits: for CQL it is
* always rows, but for Thrift it is cells.
*
* @return the maximum number of results this limits enforces.
*/
public abstract int count();
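/**
 * The maximum number of results per partition this limits enforces (with the same definition
 * of "results" as for {@link #count}).
 */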
public abstract int perPartitionCount();
/**
* Returns equivalent limits but where any internal state kept to track where we are in paging
* and/or grouping is discarded.
*/
public abstract DataLimits withoutState();
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter,
int nowInSec,
boolean countPartitionsWithOnlyStaticData)
{
return this.newCounter(nowInSec,
false,
countPartitionsWithOnlyStaticData,
iter.metadata().enforceStrictLiveness())
.applyTo(iter);
}
public UnfilteredRowIterator filter(UnfilteredRowIterator iter,
int nowInSec,
boolean countPartitionsWithOnlyStaticData)
{
return this.newCounter(nowInSec,
false,
countPartitionsWithOnlyStaticData,
iter.metadata().enforceStrictLiveness())
.applyTo(iter);
}
public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return this.newCounter(nowInSec, true, countPartitionsWithOnlyStaticData, enforceStrictLiveness).applyTo(iter);
}
/**
* Estimate the number of results (the definition of "results" will be rows for CQL queries
* and cells for thrift ones) that a full scan of the provided cfs would yield.
*/
public abstract float estimateTotalResults(ColumnFamilyStore cfs);
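/**
 * A {@link StoppingTransformation} that counts the results it sees and, unless
 * {@link #onlyCount} has been called, stops the underlying iterator as soon as the limits
 * it implements are reached, both globally and within each partition.
 */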
public static abstract class Counter extends StoppingTransformation<BaseRowIterator<?>>
{
protected final int nowInSec;
protected final boolean assumeLiveData;
private final boolean enforceStrictLiveness;
// false means we do not propagate our stop signals onto the iterator, we only count
private boolean enforceLimits = true;
protected Counter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
{
this.nowInSec = nowInSec;
this.assumeLiveData = assumeLiveData;
this.enforceStrictLiveness = enforceStrictLiveness;
}
public Counter onlyCount()
{
this.enforceLimits = false;
return this;
}
public PartitionIterator applyTo(PartitionIterator partitions)
{
return Transformation.apply(partitions, this);
}
public UnfilteredPartitionIterator applyTo(UnfilteredPartitionIterator partitions)
{
return Transformation.apply(partitions, this);
}
public UnfilteredRowIterator applyTo(UnfilteredRowIterator partition)
{
return (UnfilteredRowIterator) applyToPartition(partition);
}
public RowIterator applyTo(RowIterator partition)
{
return (RowIterator) applyToPartition(partition);
}
/**
* The number of results counted.
* <p>
* Note that the definition of "results" should be the same as for {@link #count}.
*
* @return the number of results counted.
*/
public abstract int counted();
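/**
 * The number of results counted in the current partition (with the same definition of
 * "results" as for {@link #counted}).
 */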
public abstract int countedInCurrentPartition();
/**
* The number of rows counted.
*
* @return the number of rows counted.
*/
public abstract int rowCounted();
/**
* The number of rows counted in the current partition.
*
* @return the number of rows counted in the current partition.
*/
public abstract int rowCountedInCurrentPartition();
public abstract boolean isDone();
public abstract boolean isDoneForPartition();
protected boolean isLive(Row row)
{
return assumeLiveData || row.hasLiveData(nowInSec, enforceStrictLiveness);
}
@Override
protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
{
return partition instanceof UnfilteredRowIterator ? Transformation.apply((UnfilteredRowIterator) partition, this)
: Transformation.apply((RowIterator) partition, this);
}
// called before we process a given partition
protected abstract void applyToPartition(DecoratedKey partitionKey, Row staticRow);
@Override
protected void attachTo(BasePartitions partitions)
{
if (enforceLimits)
super.attachTo(partitions);
if (isDone())
stop();
}
@Override
protected void attachTo(BaseRows rows)
{
if (enforceLimits)
super.attachTo(rows);
applyToPartition(rows.partitionKey(), rows.staticRow());
if (isDoneForPartition())
stopInPartition();
}
@Override
public void onClose()
{
super.onClose();
}
}
/**
* Limits used by CQL; this counts rows.
*/
private static class CQLLimits extends DataLimits
{
protected final int rowLimit;
protected final int perPartitionLimit;
// Whether the query is a distinct query or not.
protected final boolean isDistinct;
private CQLLimits(int rowLimit)
{
this(rowLimit, NO_LIMIT);
}
private CQLLimits(int rowLimit, int perPartitionLimit)
{
this(rowLimit, perPartitionLimit, false);
}
private CQLLimits(int rowLimit, int perPartitionLimit, boolean isDistinct)
{
this.rowLimit = rowLimit;
this.perPartitionLimit = perPartitionLimit;
this.isDistinct = isDistinct;
}
private static CQLLimits distinct(int rowLimit)
{
return new CQLLimits(rowLimit, 1, true);
}
public Kind kind()
{
return Kind.CQL_LIMIT;
}
public boolean isUnlimited()
{
return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT;
}
public boolean isDistinct()
{
return isDistinct;
}
public DataLimits forPaging(int pageSize)
{
return new CQLLimits(pageSize, perPartitionLimit, isDistinct);
}
public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
{
return new CQLPagingLimits(pageSize, perPartitionLimit, isDistinct, lastReturnedKey, lastReturnedKeyRemaining);
}
public DataLimits forShortReadRetry(int toFetch)
{
return new CQLLimits(toFetch, perPartitionLimit, isDistinct);
}
public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
// We want the number of rows that are currently live. Getting that precise number forces
// us to iterate the cached partition in general, but we can avoid that if:
// - The number of rows with at least one non-expiring cell is greater than what we ask for,
// in which case we know we have enough live rows.
// - The number of rows is less than requested, in which case we know we won't have enough.
if (cached.rowsWithNonExpiringCells() >= rowLimit)
return true;
if (cached.rowCount() < rowLimit)
return false;
// Otherwise, we need to re-count
DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
UnfilteredRowIterator iter = counter.applyTo(cacheIter))
{
// Consume the iterator until we've counted enough
while (iter.hasNext())
iter.next();
return counter.isDone();
}
}
public Counter newCounter(int nowInSec,
boolean assumeLiveData,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness)
{
return new CQLCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
}
public int count()
{
return rowLimit;
}
public int perPartitionCount()
{
return perPartitionLimit;
}
public DataLimits withoutState()
{
return this;
}
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// TODO: we should start storing stats on the number of rows (instead of the number of cells, which
// is what getMeanColumns returns)
float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
return rowsPerPartition * (cfs.estimateKeys());
}
protected class CQLCounter extends Counter
{
protected int rowCounted;
protected int rowInCurrentPartition;
protected final boolean countPartitionsWithOnlyStaticData;
protected boolean hasLiveStaticRow;
public CQLCounter(int nowInSec,
boolean assumeLiveData,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness)
{
super(nowInSec, assumeLiveData, enforceStrictLiveness);
this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
}
@Override
public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
rowInCurrentPartition = 0;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
}
@Override
public Row applyToRow(Row row)
{
if (isLive(row))
incrementRowCount();
return row;
}
@Override
public void onPartitionClose()
{
// Normally, we don't count the static row since, from a CQL point of view, it will be merged
// with the other rows in the partition. However, if we only have the static row, it will be
// returned as one row, so count it.
if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowInCurrentPartition == 0)
incrementRowCount();
super.onPartitionClose();
}
protected void incrementRowCount()
{
if (++rowCounted >= rowLimit)
stop();
if (++rowInCurrentPartition >= perPartitionLimit)
stopInPartition();
}
public int counted()
{
return rowCounted;
}
public int countedInCurrentPartition()
{
return rowInCurrentPartition;
}
public int rowCounted()
{
return rowCounted;
}
public int rowCountedInCurrentPartition()
{
return rowInCurrentPartition;
}
public boolean isDone()
{
return rowCounted >= rowLimit;
}
public boolean isDoneForPartition()
{
return isDone() || rowInCurrentPartition >= perPartitionLimit;
}
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder();
if (rowLimit != NO_LIMIT)
{
sb.append("LIMIT ").append(rowLimit);
if (perPartitionLimit != NO_LIMIT)
sb.append(' ');
}
if (perPartitionLimit != NO_LIMIT)
sb.append("PER PARTITION LIMIT ").append(perPartitionLimit);
return sb.toString();
}
}
private static class CQLPagingLimits extends CQLLimits
{
private final ByteBuffer lastReturnedKey;
private final int lastReturnedKeyRemaining;
public CQLPagingLimits(int rowLimit, int perPartitionLimit, boolean isDistinct, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
{
super(rowLimit, perPartitionLimit, isDistinct);
this.lastReturnedKey = lastReturnedKey;
this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
}
@Override
public Kind kind()
{
return Kind.CQL_PAGING_LIMIT;
}
@Override
public DataLimits forPaging(int pageSize)
{
throw new UnsupportedOperationException();
}
@Override
public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
{
throw new UnsupportedOperationException();
}
@Override
public DataLimits withoutState()
{
return new CQLLimits(rowLimit, perPartitionLimit, isDistinct);
}
@Override
public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
}
private class PagingAwareCounter extends CQLCounter
{
private PagingAwareCounter(int nowInSec,
boolean assumeLiveData,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness)
{
super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
}
@Override
public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
if (partitionKey.getKey().equals(lastReturnedKey))
{
rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
// lastReturnedKey is the last key for which we've returned rows in the first page.
// So, since we know we have returned rows, we know we have accounted for the static row
// if any already, so force hasLiveStaticRow to false so we make sure to not count it
// once more.
hasLiveStaticRow = false;
}
else
{
super.applyToPartition(partitionKey, staticRow);
}
}
}
}
/**
* <code>CQLLimits</code> used for GROUP BY queries or queries with aggregates.
* <p>Internally, GROUP BY queries are always paginated by number of rows to avoid running out of
* memory. Consequently, the limits keep track of the number of rows as well as the number of
* groups.</p>
* <p>A group can only be counted once the next group or the end of the data is reached.</p>
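* <p>For example (illustrative), a {@code SELECT ... GROUP BY pk LIMIT 10} would be backed by
* {@code DataLimits.groupByLimits(10, NO_LIMIT, NO_LIMIT, spec)}, where {@code spec} is the
* query's {@link AggregationSpecification}.</p>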
*/
private static class CQLGroupByLimits extends CQLLimits
{
/**
* The <code>GroupMaker</code> state
*/
protected final GroupingState state;
/**
* The GROUP BY specification
*/
protected final AggregationSpecification groupBySpec;
/**
* The limit on the number of groups
*/
protected final int groupLimit;
/**
* The limit on the number of groups per partition
*/
protected final int groupPerPartitionLimit;
public CQLGroupByLimits(int groupLimit,
int groupPerPartitionLimit,
int rowLimit,
AggregationSpecification groupBySpec)
{
this(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec, GroupingState.EMPTY_STATE);
}
private CQLGroupByLimits(int groupLimit,
int groupPerPartitionLimit,
int rowLimit,
AggregationSpecification groupBySpec,
GroupingState state)
{
super(rowLimit, NO_LIMIT, false);
this.groupLimit = groupLimit;
this.groupPerPartitionLimit = groupPerPartitionLimit;
this.groupBySpec = groupBySpec;
this.state = state;
}
@Override
public Kind kind()
{
return Kind.CQL_GROUP_BY_LIMIT;
}
@Override
public boolean isGroupByLimit()
{
return true;
}
public boolean isUnlimited()
{
return groupLimit == NO_LIMIT && groupPerPartitionLimit == NO_LIMIT && rowLimit == NO_LIMIT;
}
public DataLimits forShortReadRetry(int toFetch)
{
return new CQLLimits(toFetch);
}
@Override
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// For the moment, we return the estimated number of rows as we have no good way of estimating
// the number of groups that will be returned. Hopefully, we should be able to fix
// that problem at some point.
return super.estimateTotalResults(cfs);
}
@Override
public DataLimits forPaging(int pageSize)
{
return new CQLGroupByLimits(pageSize,
groupPerPartitionLimit,
rowLimit,
groupBySpec,
state);
}
@Override
public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
{
return new CQLGroupByPagingLimits(pageSize,
groupPerPartitionLimit,
rowLimit,
groupBySpec,
state,
lastReturnedKey,
lastReturnedKeyRemaining);
}
@Override
public DataLimits forGroupByInternalPaging(GroupingState state)
{
return new CQLGroupByLimits(rowLimit,
groupPerPartitionLimit,
rowLimit,
groupBySpec,
state);
}
@Override
public Counter newCounter(int nowInSec,
boolean assumeLiveData,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness)
{
return new GroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
}
@Override
public int count()
{
return groupLimit;
}
@Override
public int perPartitionCount()
{
return groupPerPartitionLimit;
}
@Override
public DataLimits withoutState()
{
return state == GroupingState.EMPTY_STATE
? this
: new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder();
if (groupLimit != NO_LIMIT)
{
sb.append("GROUP LIMIT ").append(groupLimit);
if (groupPerPartitionLimit != NO_LIMIT || rowLimit != NO_LIMIT)
sb.append(' ');
}
if (groupPerPartitionLimit != NO_LIMIT)
{
sb.append("GROUP PER PARTITION LIMIT ").append(groupPerPartitionLimit);
if (rowLimit != NO_LIMIT)
sb.append(' ');
}
if (rowLimit != NO_LIMIT)
{
sb.append("LIMIT ").append(rowLimit);
}
return sb.toString();
}
@Override
public boolean isExhausted(Counter counter)
{
return ((GroupByAwareCounter) counter).rowCounted < rowLimit
&& counter.counted() < groupLimit;
}
protected class GroupByAwareCounter extends Counter
{
private final GroupMaker groupMaker;
protected final boolean countPartitionsWithOnlyStaticData;
/**
* The key of the partition being processed.
*/
protected DecoratedKey currentPartitionKey;
/**
* The number of rows counted so far.
*/
protected int rowCounted;
/**
* The number of rows counted so far in the current partition.
*/
protected int rowCountedInCurrentPartition;
/**
* The number of groups counted so far. A group is counted only once it is complete
* (e.g. the next one has been reached).
*/
protected int groupCounted;
/**
* The number of groups in the current partition.
*/
protected int groupInCurrentPartition;
protected boolean hasGroupStarted;
protected boolean hasLiveStaticRow;
protected boolean hasReturnedRowsFromCurrentPartition;
private GroupByAwareCounter(int nowInSec,
boolean assumeLiveData,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness)
{
super(nowInSec, assumeLiveData, enforceStrictLiveness);
this.groupMaker = groupBySpec.newGroupMaker(state);
this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
// If the end of the partition was reached at the same time as the row limit, the last group might
// not have been counted yet. Because of that, we need to guess, based on the state, whether the
// previous group is still open.
hasGroupStarted = state.hasClustering();
}
@Override
public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
if (partitionKey.getKey().equals(state.partitionKey()))
{
// The only case where we could have state.partitionKey() equal to the partition key
// is if some of the partition rows have been returned in the previous page but the
// partition was not exhausted (as the state partition key has not been updated yet).
// Since we know we have returned rows, we know we have accounted for
// the static row if any already, so force hasLiveStaticRow to false so we make sure to not count it
// once more.
hasLiveStaticRow = false;
hasReturnedRowsFromCurrentPartition = true;
hasGroupStarted = true;
}
else
{
// We need to increment our count of groups if we have reached a new one, but only if some new
// content has been added since we closed our last group (that is, if hasGroupStarted). Note that
// we may get here with hasGroupStarted == false in the following cases:
// * the partition limit was reached for the previous partition
// * the previous partition contained only a static row
// * the rows of the last group of the previous partition were all marked as deleted
if (hasGroupStarted && groupMaker.isNewGroup(partitionKey, Clustering.STATIC_CLUSTERING))
{
incrementGroupCount();
// If we detect, before starting the new partition, that we are done, we need to increase
// the per partition group count of the previous partition as the next page will start from
// there.
if (isDone())
incrementGroupInCurrentPartitionCount();
hasGroupStarted = false;
}
hasReturnedRowsFromCurrentPartition = false;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
}
currentPartitionKey = partitionKey;
// If we are done, we need to preserve the groupInCurrentPartition and rowCountedInCurrentPartition
// because the pager needs to retrieve the count associated with the last value it has returned.
if (!isDone())
{
groupInCurrentPartition = 0;
rowCountedInCurrentPartition = 0;
}
}
@Override
protected Row applyToStatic(Row row)
{
// It's possible that we're "done" if the partition we just started bumped the number of groups (in
// applyToPartition() above), in which case Transformation will still call this method. In that case, we
// want to ignore the static row; it should (and will) be returned with the next page/group if need be.
if (isDone())
{
hasLiveStaticRow = false; // The row has not been returned
return Rows.EMPTY_STATIC_ROW;
}
return row;
}
@Override
public Row applyToRow(Row row)
{
// We want to check if the row belongs to a new group even if it has been deleted. The goal is
// to minimize the chances of having to go through the same data twice if we detect on the next
// non-deleted row that we have reached the limit.
if (groupMaker.isNewGroup(currentPartitionKey, row.clustering()))
{
if (hasGroupStarted)
{
incrementGroupCount();
incrementGroupInCurrentPartitionCount();
}
hasGroupStarted = false;
}
// That row may have made us increment the group count, which may mean we're done for this partition, in
// which case we shouldn't count this row (it won't be returned).
if (isDoneForPartition())
{
hasGroupStarted = false;
return null;
}
if (isLive(row))
{
hasGroupStarted = true;
incrementRowCount();
hasReturnedRowsFromCurrentPartition = true;
}
return row;
}
@Override
public int counted()
{
return groupCounted;
}
@Override
public int countedInCurrentPartition()
{
return groupInCurrentPartition;
}
@Override
public int rowCounted()
{
return rowCounted;
}
@Override
public int rowCountedInCurrentPartition()
{
return rowCountedInCurrentPartition;
}
protected void incrementRowCount()
{
rowCountedInCurrentPartition++;
if (++rowCounted >= rowLimit)
stop();
}
private void incrementGroupCount()
{
groupCounted++;
if (groupCounted >= groupLimit)
stop();
}
private void incrementGroupInCurrentPartitionCount()
{
groupInCurrentPartition++;
if (groupInCurrentPartition >= groupPerPartitionLimit)
stopInPartition();
}
@Override
public boolean isDoneForPartition()
{
return isDone() || groupInCurrentPartition >= groupPerPartitionLimit;
}
@Override
public boolean isDone()
{
return groupCounted >= groupLimit;
}
@Override
public void onPartitionClose()
{
// Normally, we don't count the static row since, from a CQL point of view, it will be merged
// with the other rows in the partition. However, if we only have the static row, it will be
// returned as one group, so count it.
if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && !hasReturnedRowsFromCurrentPartition)
{
incrementRowCount();
incrementGroupCount();
incrementGroupInCurrentPartitionCount();
hasGroupStarted = false;
}
super.onPartitionClose();
}
@Override
public void onClose()
{
// Groups are only counted when the end of the group is reached.
// The end of a group is detected in 2 ways:
// 1) a new group is reached
// 2) the end of the data is reached
// We know that the end of the data is reached if the group limit has not been reached
// and the number of rows counted is smaller than the internal page size.
if (hasGroupStarted && groupCounted < groupLimit && rowCounted < rowLimit)
{
incrementGroupCount();
incrementGroupInCurrentPartitionCount();
}
super.onClose();
}
}
}
private static class CQLGroupByPagingLimits extends CQLGroupByLimits
{
private final ByteBuffer lastReturnedKey;
private final int lastReturnedKeyRemaining;
public CQLGroupByPagingLimits(int groupLimit,
int groupPerPartitionLimit,
int rowLimit,
AggregationSpecification groupBySpec,
GroupingState state,
ByteBuffer lastReturnedKey,
int lastReturnedKeyRemaining)
{
super(groupLimit,
groupPerPartitionLimit,
rowLimit,
groupBySpec,
state);
this.lastReturnedKey = lastReturnedKey;
this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
}
@Override
public Kind kind()
{
return Kind.CQL_GROUP_BY_PAGING_LIMIT;
}
@Override
public DataLimits forPaging(int pageSize)
{
throw new UnsupportedOperationException();
}
@Override
public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
{
throw new UnsupportedOperationException();
}
@Override
public DataLimits forGroupByInternalPaging(GroupingState state)
{
throw new UnsupportedOperationException();
}
@Override
public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
assert state == GroupingState.EMPTY_STATE || lastReturnedKey.equals(state.partitionKey());
return new PagingGroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
}
@Override
public DataLimits withoutState()
{
return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
}
private class PagingGroupByAwareCounter extends GroupByAwareCounter
{
private PagingGroupByAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
}
@Override
public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
if (partitionKey.getKey().equals(lastReturnedKey))
{
currentPartitionKey = partitionKey;
groupInCurrentPartition = groupPerPartitionLimit - lastReturnedKeyRemaining;
hasReturnedRowsFromCurrentPartition = true;
hasLiveStaticRow = false;
hasGroupStarted = state.hasClustering();
}
else
{
super.applyToPartition(partitionKey, staticRow);
}
}
}
}
/**
* Limits used by thrift; this counts partitions and cells.
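* <p>For instance (illustrative), a Thrift read asking for at most 100 partitions with at most
* 1000 cells each could be expressed as {@code DataLimits.thriftLimits(100, 1000)}.</p>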
*/
private static class ThriftLimits extends DataLimits
{
protected final int partitionLimit;
protected final int cellPerPartitionLimit;
private ThriftLimits(int partitionLimit, int cellPerPartitionLimit)
{
this.partitionLimit = partitionLimit;
this.cellPerPartitionLimit = cellPerPartitionLimit;
}
public Kind kind()
{
return Kind.THRIFT_LIMIT;
}
public boolean isUnlimited()
{
return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT;
}
public boolean isDistinct()
{
return false;
}
public DataLimits forPaging(int pageSize)
{
// We don't support paging on thrift in general but do use paging under the hood for get_count. For
// that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single
// partition). We do check, however, that the partition limit is 1 to make sure this is not misused
// (as this wouldn't work properly for range queries).
assert partitionLimit == 1;
return new ThriftLimits(partitionLimit, pageSize);
}
public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
{
throw new UnsupportedOperationException();
}
public DataLimits forShortReadRetry(int toFetch)
{
// Short read retries are always done for a single partition at a time, so it's ok to ignore the
// partition limit for those
return new ThriftLimits(1, toFetch);
}
public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
// We want the number of cells that are currently live. Getting that precise number forces
// us to iterate the cached partition in general, but we can avoid that if:
// - The number of non-expiring live cells is greater than the number of cells asked for (we then
// know we have enough live cells).
// - The number of cells cached is less than requested, in which case we know we won't have enough.
if (cached.nonExpiringLiveCells() >= cellPerPartitionLimit)
return true;
if (cached.nonTombstoneCellCount() < cellPerPartitionLimit)
return false;
// Otherwise, we need to re-count
DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
UnfilteredRowIterator iter = counter.applyTo(cacheIter))
{
// Consume the iterator until we've counted enough
while (iter.hasNext())
iter.next();
return counter.isDone();
}
}
public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return new ThriftCounter(nowInSec, assumeLiveData, enforceStrictLiveness);
}
public int count()
{
return partitionLimit * cellPerPartitionLimit;
}
public int perPartitionCount()
{
return cellPerPartitionLimit;
}
public DataLimits withoutState()
{
return this;
}
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// remember that getMeanColumns returns a number of cells: we should clean up the nomenclature
float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
return cellsPerPartition * cfs.estimateKeys();
}
protected class ThriftCounter extends Counter
{
protected int partitionsCounted;
protected int cellsCounted;
protected int cellsInCurrentPartition;
public ThriftCounter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
{
super(nowInSec, assumeLiveData, enforceStrictLiveness);
}
@Override
public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
cellsInCurrentPartition = 0;
if (!staticRow.isEmpty())
applyToRow(staticRow);
}
@Override
public Row applyToRow(Row row)
{
for (Cell cell : row.cells())
{
if (assumeLiveData || cell.isLive(nowInSec))
{
++cellsCounted;
if (++cellsInCurrentPartition >= cellPerPartitionLimit)
stopInPartition();
}
}
return row;
}
@Override
public void onPartitionClose()
{
if (++partitionsCounted >= partitionLimit)
stop();
super.onPartitionClose();
}
public int counted()
{
return cellsCounted;
}
public int countedInCurrentPartition()
{
return cellsInCurrentPartition;
}
public int rowCounted()
{
throw new UnsupportedOperationException();
}
public int rowCountedInCurrentPartition()
{
throw new UnsupportedOperationException();
}
public boolean isDone()
{
return partitionsCounted >= partitionLimit;
}
public boolean isDoneForPartition()
{
return isDone() || cellsInCurrentPartition >= cellPerPartitionLimit;
}
}
@Override
public String toString()
{
// This is not valid CQL, but that's ok since it's not used for CQL queries.
return String.format("THRIFT LIMIT (partitions=%d, cells_per_partition=%d)", partitionLimit, cellPerPartitionLimit);
}
}
/**
* Limits used for thrift get_count when we only want to count super columns.
*/
private static class SuperColumnCountingLimits extends ThriftLimits
{
private SuperColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
{
super(partitionLimit, cellPerPartitionLimit);
}
public Kind kind()
{
return Kind.SUPER_COLUMN_COUNTING_LIMIT;
}
public DataLimits forPaging(int pageSize)
{
// We don't support paging on thrift in general but do use paging under the hood for get_count. For
// that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single
// partition). We do check, however, that the partition limit is 1 to make sure this is not misused
// (as this wouldn't work properly for range queries).
assert partitionLimit == 1;
return new SuperColumnCountingLimits(partitionLimit, pageSize);
}
public DataLimits forShortReadRetry(int toFetch)
{
// Short read retries are always done for a single partition at a time, so it's ok to ignore the
// partition limit for those
return new SuperColumnCountingLimits(1, toFetch);
}
@Override
public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return new SuperColumnCountingCounter(nowInSec, assumeLiveData, enforceStrictLiveness);
}
protected class SuperColumnCountingCounter extends ThriftCounter
{
private final boolean enforceStrictLiveness;
public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
{
super(nowInSec, assumeLiveData, enforceStrictLiveness);
this.enforceStrictLiveness = enforceStrictLiveness;
}
@Override
public Row applyToRow(Row row)
{
// In the internal format, a row == a super column, so that's what we want to count.
if (isLive(row))
{
++cellsCounted;
if (++cellsInCurrentPartition >= cellPerPartitionLimit)
stopInPartition();
}
return row;
}
}
}
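/**
 * Serializer for {@link DataLimits}. As a rough sketch of the format (derived from the
 * methods below): a single byte holding the {@link Kind} ordinal, followed by kind-specific
 * fields, mostly encoded as unsigned vints:
 * <pre>
 * CQL_LIMIT / CQL_PAGING_LIMIT:
 *   rowLimit, perPartitionLimit, isDistinct (boolean)
 *   [paging only: lastReturnedKey (vint length + bytes), lastReturnedKeyRemaining]
 * CQL_GROUP_BY_LIMIT / CQL_GROUP_BY_PAGING_LIMIT:
 *   groupLimit, groupPerPartitionLimit, rowLimit, AggregationSpecification, GroupingState
 *   [paging only: lastReturnedKey, lastReturnedKeyRemaining]
 * THRIFT_LIMIT / SUPER_COLUMN_COUNTING_LIMIT:
 *   partitionLimit, cellPerPartitionLimit
 * </pre>
 */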
public static class Serializer
{
public void serialize(DataLimits limits, DataOutputPlus out, int version, ClusteringComparator comparator) throws IOException
{
out.writeByte(limits.kind().ordinal());
switch (limits.kind())
{
case CQL_LIMIT:
case CQL_PAGING_LIMIT:
CQLLimits cqlLimits = (CQLLimits)limits;
out.writeUnsignedVInt(cqlLimits.rowLimit);
out.writeUnsignedVInt(cqlLimits.perPartitionLimit);
out.writeBoolean(cqlLimits.isDistinct);
if (limits.kind() == Kind.CQL_PAGING_LIMIT)
{
CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
ByteBufferUtil.writeWithVIntLength(pagingLimits.lastReturnedKey, out);
out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
}
break;
case CQL_GROUP_BY_LIMIT:
case CQL_GROUP_BY_PAGING_LIMIT:
CQLGroupByLimits groupByLimits = (CQLGroupByLimits) limits;
out.writeUnsignedVInt(groupByLimits.groupLimit);
out.writeUnsignedVInt(groupByLimits.groupPerPartitionLimit);
out.writeUnsignedVInt(groupByLimits.rowLimit);
AggregationSpecification groupBySpec = groupByLimits.groupBySpec;
AggregationSpecification.serializer.serialize(groupBySpec, out, version);
GroupingState.serializer.serialize(groupByLimits.state, out, version, comparator);
if (limits.kind() == Kind.CQL_GROUP_BY_PAGING_LIMIT)
{
CQLGroupByPagingLimits pagingLimits = (CQLGroupByPagingLimits) groupByLimits;
ByteBufferUtil.writeWithVIntLength(pagingLimits.lastReturnedKey, out);
out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
}
break;
case THRIFT_LIMIT:
case SUPER_COLUMN_COUNTING_LIMIT:
ThriftLimits thriftLimits = (ThriftLimits)limits;
out.writeUnsignedVInt(thriftLimits.partitionLimit);
out.writeUnsignedVInt(thriftLimits.cellPerPartitionLimit);
break;
}
}
public DataLimits deserialize(DataInputPlus in, int version, ClusteringComparator comparator) throws IOException
{
Kind kind = Kind.values()[in.readUnsignedByte()];
switch (kind)
{
case CQL_LIMIT:
case CQL_PAGING_LIMIT:
{
int rowLimit = (int) in.readUnsignedVInt();
int perPartitionLimit = (int) in.readUnsignedVInt();
boolean isDistinct = in.readBoolean();
if (kind == Kind.CQL_LIMIT)
return cqlLimits(rowLimit, perPartitionLimit, isDistinct);
ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
int lastRemaining = (int) in.readUnsignedVInt();
return new CQLPagingLimits(rowLimit, perPartitionLimit, isDistinct, lastKey, lastRemaining);
}
case CQL_GROUP_BY_LIMIT:
case CQL_GROUP_BY_PAGING_LIMIT:
{
int groupLimit = (int) in.readUnsignedVInt();
int groupPerPartitionLimit = (int) in.readUnsignedVInt();
int rowLimit = (int) in.readUnsignedVInt();
AggregationSpecification groupBySpec = AggregationSpecification.serializer.deserialize(in, version, comparator);
GroupingState state = GroupingState.serializer.deserialize(in, version, comparator);
if (kind == Kind.CQL_GROUP_BY_LIMIT)
return new CQLGroupByLimits(groupLimit,
groupPerPartitionLimit,
rowLimit,
groupBySpec,
state);
ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
int lastRemaining = (int) in.readUnsignedVInt();
return new CQLGroupByPagingLimits(groupLimit,
groupPerPartitionLimit,
rowLimit,
groupBySpec,
state,
lastKey,
lastRemaining);
}
case THRIFT_LIMIT:
case SUPER_COLUMN_COUNTING_LIMIT:
int partitionLimit = (int) in.readUnsignedVInt();
int cellPerPartitionLimit = (int) in.readUnsignedVInt();
return kind == Kind.THRIFT_LIMIT
? new ThriftLimits(partitionLimit, cellPerPartitionLimit)
: new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
}
throw new AssertionError();
}
public long serializedSize(DataLimits limits, int version, ClusteringComparator comparator)
{
long size = TypeSizes.sizeof((byte) limits.kind().ordinal());
switch (limits.kind())
{
case CQL_LIMIT:
case CQL_PAGING_LIMIT:
CQLLimits cqlLimits = (CQLLimits) limits;
size += TypeSizes.sizeofUnsignedVInt(cqlLimits.rowLimit);
size += TypeSizes.sizeofUnsignedVInt(cqlLimits.perPartitionLimit);
size += TypeSizes.sizeof(cqlLimits.isDistinct);
if (limits.kind() == Kind.CQL_PAGING_LIMIT)
{
CQLPagingLimits pagingLimits = (CQLPagingLimits) cqlLimits;
size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey);
size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
}
break;
case CQL_GROUP_BY_LIMIT:
case CQL_GROUP_BY_PAGING_LIMIT:
CQLGroupByLimits groupByLimits = (CQLGroupByLimits) limits;
size += TypeSizes.sizeofUnsignedVInt(groupByLimits.groupLimit);
size += TypeSizes.sizeofUnsignedVInt(groupByLimits.groupPerPartitionLimit);
size += TypeSizes.sizeofUnsignedVInt(groupByLimits.rowLimit);
AggregationSpecification groupBySpec = groupByLimits.groupBySpec;
size += AggregationSpecification.serializer.serializedSize(groupBySpec, version);
size += GroupingState.serializer.serializedSize(groupByLimits.state, version, comparator);
if (limits.kind() == Kind.CQL_GROUP_BY_PAGING_LIMIT)
{
CQLGroupByPagingLimits pagingLimits = (CQLGroupByPagingLimits) groupByLimits;
size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey);
size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
}
break;
case THRIFT_LIMIT:
case SUPER_COLUMN_COUNTING_LIMIT:
ThriftLimits thriftLimits = (ThriftLimits) limits;
size += TypeSizes.sizeofUnsignedVInt(thriftLimits.partitionLimit);
size += TypeSizes.sizeofUnsignedVInt(thriftLimits.cellPerPartitionLimit);
break;
default:
throw new AssertionError();
}
return size;
}
}
}