blob: fa9d47abd08d543af21c67403206330005f854d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.filter;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.transform.BasePartitions;
import org.apache.cassandra.db.transform.BaseRows;
import org.apache.cassandra.db.transform.StoppingTransformation;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
 * Object in charge of tracking whether we have fetched enough data for a given query.
 *
 * The reason this is not just a simple integer is that Thrift and CQL3 count
 * results in different ways. This class abstracts those differences.
 */
public abstract class DataLimits
{
public static final Serializer serializer = new Serializer();
public static final int NO_LIMIT = Integer.MAX_VALUE;
// The "no limits" instance: never considers cached data sufficient (so a read always
// goes through the normal path) and leaves every iterator untouched.
public static final DataLimits NONE = new CQLLimits(NO_LIMIT)
{
@Override
public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
// Without a limit we can never declare the cache "enough".
return false;
}
@Override
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter,
int nowInSec,
boolean countPartitionsWithOnlyStaticData)
{
// No limit to enforce: pass the iterator through unwrapped.
return iter;
}
@Override
public UnfilteredRowIterator filter(UnfilteredRowIterator iter,
int nowInSec,
boolean countPartitionsWithOnlyStaticData)
{
return iter;
}
@Override
public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return iter;
}
};
// We currently deal with distinct queries by querying full partitions but limiting the result at 1 row per
// partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
public static final DataLimits DISTINCT_NONE = new CQLLimits(NO_LIMIT, 1, true);
public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
/**
 * Creates CQL limits enforcing only a global row limit (no per-partition limit).
 *
 * @param cqlRowLimit the maximum number of rows to return.
 */
public static DataLimits cqlLimits(int cqlRowLimit)
{
return new CQLLimits(cqlRowLimit);
}
// mixed mode partition range scans on compact storage tables without clustering columns coordinated by 2.x are
// returned as one (cql) row per cell, but we need to count each partition as a single row. So we just return a
// CQLLimits instance that doesn't count rows towards its limit. See CASSANDRA-15072
/**
 * Creates CQL limits whose counter ignores individual rows and only counts whole
 * partitions (each partition counted via the static-row/partition-close path).
 * Used for 2.x mixed-mode compact-storage range scans — see CASSANDRA-15072.
 */
public static DataLimits legacyCompactStaticCqlLimits(int cqlRowLimits)
{
return new CQLLimits(cqlRowLimits) {
public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return new CQLCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness) {
public Row applyToRow(Row row)
{
// noop: only count full partitions
return row;
}
};
}
};
}
/**
 * Creates CQL limits enforcing both a global row limit and a per-partition row limit.
 */
public static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit)
{
return new CQLLimits(cqlRowLimit, perPartitionLimit);
}
/**
 * Creates limits for a DISTINCT query: at most {@code cqlRowLimit} rows overall,
 * one row per partition.
 */
public static DataLimits distinctLimits(int cqlRowLimit)
{
return CQLLimits.distinct(cqlRowLimit);
}
/**
 * Creates thrift limits, which count partitions and cells (not CQL rows).
 */
public static DataLimits thriftLimits(int partitionLimit, int cellPerPartitionLimit)
{
return new ThriftLimits(partitionLimit, cellPerPartitionLimit);
}
/**
 * Creates thrift limits for get_count over super columns, where each internal row
 * (== one super column) is counted instead of each cell.
 */
public static DataLimits superColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
{
return new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
}
// The concrete kind of this limits; drives serialization dispatch in Serializer.
public abstract Kind kind();
// Whether neither a global nor a per-partition limit is set.
public abstract boolean isUnlimited();
// Whether these are limits for a DISTINCT query.
public abstract boolean isDistinct();
// Returns limits equivalent to these but enforcing the provided page size.
public abstract DataLimits forPaging(int pageSize);
// Paging variant that also accounts for results already returned for the last
// partition key of the previous page.
public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining);
// Returns the limits to use when retrying a short read, fetching up to toFetch results.
public abstract DataLimits forShortReadRetry(int toFetch);
// Whether the cached partition contains enough live data (as of nowInSec) to
// satisfy these limits.
public abstract boolean hasEnoughLiveData(CachedPartition cached,
int nowInSec,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness);
/**
 * Returns a new {@code Counter} for this limits.
 *
 * @param nowInSec the current time in second (to decide what is expired or not).
 * @param assumeLiveData if true, the counter will assume that every row passed is live and won't
 * thus check for liveness, otherwise it will. This should be {@code true} when used on a
 * {@code RowIterator} (since it only returns live rows), false otherwise.
 * @param countPartitionsWithOnlyStaticData if {@code true} the partitions with only static data should be counted
 * as 1 valid row.
 * @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info,
 * normally retrieved from {@link CFMetaData#enforceStrictLiveness()}
 * @return a new {@code Counter} for this limits.
 */
public abstract Counter newCounter(int nowInSec,
boolean assumeLiveData,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness);
/**
 * The max number of results this limits enforces.
 * <p>
 * Note that the actual definition of "results" depends a bit: for CQL, it's always rows, but for
 * thrift, it means cells.
 *
 * @return the maximum number of results this limits enforces.
 */
public abstract int count();
// The maximum number of results this limits enforces per partition.
public abstract int perPartitionCount();
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter,
                                          int nowInSec,
                                          boolean countPartitionsWithOnlyStaticData)
{
    // Unfiltered iterators may contain dead data, so liveness is checked (assumeLiveData = false);
    // strict-liveness behavior comes from the table's metadata.
    boolean strictLiveness = iter.metadata().enforceStrictLiveness();
    Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, strictLiveness);
    return counter.applyTo(iter);
}
public UnfilteredRowIterator filter(UnfilteredRowIterator iter,
                                    int nowInSec,
                                    boolean countPartitionsWithOnlyStaticData)
{
    // Single-partition variant: same counting pass as the partition-iterator overload,
    // with liveness checked row by row (assumeLiveData = false).
    boolean strictLiveness = iter.metadata().enforceStrictLiveness();
    Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, strictLiveness);
    return counter.applyTo(iter);
}
public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
    // A (filtered) PartitionIterator only returns live rows, so the counter can
    // assume liveness (see the newCounter javadoc).
    Counter counter = newCounter(nowInSec, true, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
    return counter.applyTo(iter);
}
/**
* Estimate the number of results (the definition of "results" will be rows for CQL queries
* and partitions for thrift ones) that a full scan of the provided cfs would yield.
*/
public abstract float estimateTotalResults(ColumnFamilyStore cfs);
/**
 * A transformation that counts query results as they stream by and, when limits are
 * enforced, stops iteration once those limits are reached.
 */
public static abstract class Counter extends StoppingTransformation<BaseRowIterator<?>>
{
// false means we do not propagate our stop signals onto the iterator, we only count
private boolean enforceLimits = true;
// Switches this counter to pure counting mode: results are tallied but iteration
// is never halted.
public Counter onlyCount()
{
this.enforceLimits = false;
return this;
}
// Wraps the given iterator so its results are counted (and possibly stopped) as consumed.
public PartitionIterator applyTo(PartitionIterator partitions)
{
return Transformation.apply(partitions, this);
}
public UnfilteredPartitionIterator applyTo(UnfilteredPartitionIterator partitions)
{
return Transformation.apply(partitions, this);
}
public UnfilteredRowIterator applyTo(UnfilteredRowIterator partition)
{
return (UnfilteredRowIterator) applyToPartition(partition);
}
public RowIterator applyTo(RowIterator partition)
{
return (RowIterator) applyToPartition(partition);
}
/**
 * The number of results counted.
 * <p>
 * Note that the definition of "results" should be the same that for {@link #count}.
 *
 * @return the number of results counted.
 */
public abstract int counted();
// The number of results counted within the partition currently being processed.
public abstract int countedInCurrentPartition();
// Whether the global limit has been reached.
public abstract boolean isDone();
// Whether the limit has been reached for the current partition.
public abstract boolean isDoneForPartition();
@Override
protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
{
// Dispatch on the concrete iterator type so the matching Transformation overload is used.
return partition instanceof UnfilteredRowIterator ? Transformation.apply((UnfilteredRowIterator) partition, this)
: Transformation.apply((RowIterator) partition, this);
}
// called before we process a given partition
protected abstract void applyToPartition(DecoratedKey partitionKey, Row staticRow);
@Override
protected void attachTo(BasePartitions partitions)
{
// Only hook our stop signal into the iterator when limits are enforced;
// in onlyCount() mode we still count but never halt iteration.
if (enforceLimits)
super.attachTo(partitions);
if (isDone())
stop();
}
@Override
protected void attachTo(BaseRows rows)
{
if (enforceLimits)
super.attachTo(rows);
// Notify subclasses of the new partition before any of its rows are processed.
applyToPartition(rows.partitionKey(), rows.staticRow());
if (isDoneForPartition())
stopInPartition();
}
}
/**
* Limits used by CQL; this counts rows.
*/
private static class CQLLimits extends DataLimits
{
// Maximum total number of rows returned (NO_LIMIT if unbounded).
protected final int rowLimit;
// Maximum number of rows returned per partition (NO_LIMIT if unbounded).
protected final int perPartitionLimit;
// Whether the query is a distinct query or not.
protected final boolean isDistinct;
private CQLLimits(int rowLimit)
{
this(rowLimit, NO_LIMIT);
}
private CQLLimits(int rowLimit, int perPartitionLimit)
{
this(rowLimit, perPartitionLimit, false);
}
private CQLLimits(int rowLimit, int perPartitionLimit, boolean isDistinct)
{
this.rowLimit = rowLimit;
this.perPartitionLimit = perPartitionLimit;
this.isDistinct = isDistinct;
}
// DISTINCT is implemented as "at most 1 row per partition".
private static CQLLimits distinct(int rowLimit)
{
return new CQLLimits(rowLimit, 1, true);
}
public Kind kind()
{
return Kind.CQL_LIMIT;
}
public boolean isUnlimited()
{
return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT;
}
public boolean isDistinct()
{
return isDistinct;
}
public DataLimits forPaging(int pageSize)
{
// The page size replaces the global row limit; the per-partition limit is preserved.
return new CQLLimits(pageSize, perPartitionLimit, isDistinct);
}
public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
{
return new CQLPagingLimits(pageSize, perPartitionLimit, isDistinct, lastReturnedKey, lastReturnedKeyRemaining);
}
public DataLimits forShortReadRetry(int toFetch)
{
return new CQLLimits(toFetch, perPartitionLimit, isDistinct);
}
public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
// We want the number of rows that are currently live. Getting that precise number forces
// us to iterate the cached partition in general, but we can avoid that if:
// - The number of rows with at least one non-expiring cell is greater than what we ask,
// in which case we know we have enough live.
// - The number of rows is less than requested, in which case we know we won't have enough.
if (cached.rowsWithNonExpiringCells() >= rowLimit)
return true;
if (cached.rowCount() < rowLimit)
return false;
// Otherwise, we need to re-count
DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
UnfilteredRowIterator iter = counter.applyTo(cacheIter))
{
// Consume the iterator until we've counted enough
while (iter.hasNext())
iter.next();
return counter.isDone();
}
}
public Counter newCounter(int nowInSec,
boolean assumeLiveData,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness)
{
return new CQLCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
}
public int count()
{
return rowLimit;
}
public int perPartitionCount()
{
return perPartitionLimit;
}
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// TODO: we should start storing stats on the number of rows (instead of the number of cells, which
// is what getMeanColumns returns)
// NOTE(review): regulars.size() could be 0 for a table with no regular columns; float
// division then yields Infinity/NaN — confirm callers tolerate that.
float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
return rowsPerPartition * (cfs.estimateKeys());
}
protected class CQLCounter extends Counter
{
protected final int nowInSec;
protected final boolean assumeLiveData;
protected final boolean countPartitionsWithOnlyStaticData;
// Total rows counted so far across all partitions.
protected int rowCounted;
// Rows counted in the partition currently being processed.
protected int rowInCurrentPartition;
// Whether the current partition has a live static row not yet accounted for.
protected boolean hasLiveStaticRow;
private final boolean enforceStrictLiveness;
public CQLCounter(int nowInSec,
boolean assumeLiveData,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness)
{
this.nowInSec = nowInSec;
this.assumeLiveData = assumeLiveData;
this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
this.enforceStrictLiveness = enforceStrictLiveness;
}
@Override
public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
// Reset per-partition state at the start of every partition.
rowInCurrentPartition = 0;
hasLiveStaticRow = !staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec, enforceStrictLiveness));
}
@Override
public Row applyToRow(Row row)
{
if (assumeLiveData || row.hasLiveData(nowInSec, enforceStrictLiveness))
incrementRowCount();
return row;
}
@Override
public void onPartitionClose()
{
// Normally, we don't count static rows as from a CQL point of view, it will be merged with other
// rows in the partition. However, if we only have the static row, it will be returned as one row
// so count it.
if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowInCurrentPartition == 0)
incrementRowCount();
super.onPartitionClose();
}
// Bumps both counters and signals a stop when either limit is reached.
private void incrementRowCount()
{
if (++rowCounted >= rowLimit)
stop();
if (++rowInCurrentPartition >= perPartitionLimit)
stopInPartition();
}
public int counted()
{
return rowCounted;
}
public int countedInCurrentPartition()
{
return rowInCurrentPartition;
}
public boolean isDone()
{
return rowCounted >= rowLimit;
}
public boolean isDoneForPartition()
{
return isDone() || rowInCurrentPartition >= perPartitionLimit;
}
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder();
if (rowLimit != NO_LIMIT)
{
sb.append("LIMIT ").append(rowLimit);
if (perPartitionLimit != NO_LIMIT)
sb.append(' ');
}
if (perPartitionLimit != NO_LIMIT)
sb.append("PER PARTITION LIMIT ").append(perPartitionLimit);
return sb.toString();
}
}
/**
 * CQL limits for pages after the first: remembers the last partition key of the
 * previous page and how many rows may still be returned for it.
 */
private static class CQLPagingLimits extends CQLLimits
{
private final ByteBuffer lastReturnedKey;
private final int lastReturnedKeyRemaining;
public CQLPagingLimits(int rowLimit, int perPartitionLimit, boolean isDistinct, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
{
super(rowLimit, perPartitionLimit, isDistinct);
this.lastReturnedKey = lastReturnedKey;
this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
}
@Override
public Kind kind()
{
return Kind.CQL_PAGING_LIMIT;
}
@Override
public DataLimits forPaging(int pageSize)
{
// Already a paging limits instance; re-deriving paging limits is not supported.
throw new UnsupportedOperationException();
}
@Override
public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
{
throw new UnsupportedOperationException();
}
@Override
public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
}
// Counter that, when re-entering the partition the previous page stopped in, starts
// the per-partition count from what was already returned.
private class PagingAwareCounter extends CQLCounter
{
private PagingAwareCounter(int nowInSec,
boolean assumeLiveData,
boolean countPartitionsWithOnlyStaticData,
boolean enforceStrictLiveness)
{
super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
}
@Override
public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
if (partitionKey.getKey().equals(lastReturnedKey))
{
rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
// lastReturnedKey is the last key for which we're returned rows in the first page.
// So, since we know we have returned rows, we know we have accounted for the static row
// if any already, so force hasLiveStaticRow to false so we make sure to not count it
// once more.
hasLiveStaticRow = false;
}
else
{
super.applyToPartition(partitionKey, staticRow);
}
}
}
}
/**
 * Limits used by thrift; this counts partitions and cells.
 */
private static class ThriftLimits extends DataLimits
{
// Maximum number of partitions returned.
protected final int partitionLimit;
// Maximum number of cells returned per partition.
protected final int cellPerPartitionLimit;
private ThriftLimits(int partitionLimit, int cellPerPartitionLimit)
{
this.partitionLimit = partitionLimit;
this.cellPerPartitionLimit = cellPerPartitionLimit;
}
public Kind kind()
{
return Kind.THRIFT_LIMIT;
}
public boolean isUnlimited()
{
return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT;
}
public boolean isDistinct()
{
return false;
}
public DataLimits forPaging(int pageSize)
{
// We don't support paging on thrift in general but do use paging under the hood for get_count. For
// that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single
// partition). We do check that the partition limit is 1 however to make sure this is not misused
// (as this wouldn't work properly for range queries).
assert partitionLimit == 1;
return new ThriftLimits(partitionLimit, pageSize);
}
public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
{
throw new UnsupportedOperationException();
}
public DataLimits forShortReadRetry(int toFetch)
{
// Short read retries are always done for a single partition at a time, so it's ok to ignore the
// partition limit for those
return new ThriftLimits(1, toFetch);
}
public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
// We want the number of cells that are currently live. Getting that precise number forces
// us to iterate the cached partition in general, but we can avoid that if:
// - The number of non-expiring live cells is greater than the number of cells asked (we then
// know we have enough live cells).
// - The number of cells cached is less than requested, in which case we know we won't have enough.
if (cached.nonExpiringLiveCells() >= cellPerPartitionLimit)
return true;
if (cached.nonTombstoneCellCount() < cellPerPartitionLimit)
return false;
// Otherwise, we need to re-count
DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
UnfilteredRowIterator iter = counter.applyTo(cacheIter))
{
// Consume the iterator until we've counted enough
while (iter.hasNext())
iter.next();
return counter.isDone();
}
}
public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return new ThriftCounter(nowInSec, assumeLiveData);
}
public int count()
{
return partitionLimit * cellPerPartitionLimit;
}
public int perPartitionCount()
{
return cellPerPartitionLimit;
}
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// remember that getMeansColumns returns a number of cells: we should clean nomenclature
float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
return cellsPerPartition * cfs.estimateKeys();
}
// Counter for thrift: counts live cells within partitions, and partitions themselves.
protected class ThriftCounter extends Counter
{
protected final int nowInSec;
protected final boolean assumeLiveData;
protected int partitionsCounted;
protected int cellsCounted;
protected int cellsInCurrentPartition;
public ThriftCounter(int nowInSec, boolean assumeLiveData)
{
this.nowInSec = nowInSec;
this.assumeLiveData = assumeLiveData;
}
@Override
public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
// Reset the per-partition cell count; static-row cells count like any others.
cellsInCurrentPartition = 0;
if (!staticRow.isEmpty())
applyToRow(staticRow);
}
@Override
public Row applyToRow(Row row)
{
for (Cell cell : row.cells())
{
if (assumeLiveData || cell.isLive(nowInSec))
{
++cellsCounted;
if (++cellsInCurrentPartition >= cellPerPartitionLimit)
stopInPartition();
}
}
return row;
}
@Override
public void onPartitionClose()
{
// Partitions are counted when closed, so isDone() only flips between partitions.
if (++partitionsCounted >= partitionLimit)
stop();
super.onPartitionClose();
}
public int counted()
{
return cellsCounted;
}
public int countedInCurrentPartition()
{
return cellsInCurrentPartition;
}
public boolean isDone()
{
return partitionsCounted >= partitionLimit;
}
public boolean isDoneForPartition()
{
return isDone() || cellsInCurrentPartition >= cellPerPartitionLimit;
}
}
@Override
public String toString()
{
// This is not valid CQL, but that's ok since it's not used for CQL queries.
return String.format("THRIFT LIMIT (partitions=%d, cells_per_partition=%d)", partitionLimit, cellPerPartitionLimit);
}
}
/**
* Limits used for thrift get_count when we only want to count super columns.
*/
private static class SuperColumnCountingLimits extends ThriftLimits
{
private SuperColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
{
super(partitionLimit, cellPerPartitionLimit);
}
public Kind kind()
{
return Kind.SUPER_COLUMN_COUNTING_LIMIT;
}
public DataLimits forPaging(int pageSize)
{
// We don't support paging on thrift in general but do use paging under the hood for get_count. For
// that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single
// partition). We do check that the partition limit is 1 however to make sure this is not misused
// (as this wouldn't work properly for range queries).
assert partitionLimit == 1;
return new SuperColumnCountingLimits(partitionLimit, pageSize);
}
public DataLimits forShortReadRetry(int toFetch)
{
// Short read retries are always done for a single partition at a time, so it's ok to ignore the
// partition limit for those
return new SuperColumnCountingLimits(1, toFetch);
}
@Override
public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
return new SuperColumnCountingCounter(nowInSec, assumeLiveData, enforceStrictLiveness);
}
// Like ThriftCounter, but counts whole rows (== super columns) instead of cells.
protected class SuperColumnCountingCounter extends ThriftCounter
{
private final boolean enforceStrictLiveness;
public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
{
super(nowInSec, assumeLiveData);
this.enforceStrictLiveness = enforceStrictLiveness;
}
@Override
public Row applyToRow(Row row)
{
// In the internal format, a row == a super column, so that's what we want to count.
if (assumeLiveData || row.hasLiveData(nowInSec, enforceStrictLiveness))
{
++cellsCounted;
if (++cellsInCurrentPartition >= cellPerPartitionLimit)
stopInPartition();
}
return row;
}
}
}
/**
 * Serialization of {@link DataLimits}: a one-byte {@link Kind} ordinal followed by a
 * kind-specific payload (vint limits, plus the paging key/remaining-count for
 * CQL_PAGING_LIMIT). {@code serialize}, {@code deserialize} and {@code serializedSize}
 * must be kept in sync.
 */
public static class Serializer
{
    public void serialize(DataLimits limits, DataOutputPlus out, int version) throws IOException
    {
        out.writeByte(limits.kind().ordinal());
        switch (limits.kind())
        {
            case CQL_LIMIT:
            case CQL_PAGING_LIMIT:
                CQLLimits cqlLimits = (CQLLimits)limits;
                out.writeUnsignedVInt(cqlLimits.rowLimit);
                out.writeUnsignedVInt(cqlLimits.perPartitionLimit);
                out.writeBoolean(cqlLimits.isDistinct);
                if (limits.kind() == Kind.CQL_PAGING_LIMIT)
                {
                    CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
                    ByteBufferUtil.writeWithVIntLength(pagingLimits.lastReturnedKey, out);
                    out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
                }
                break;
            case THRIFT_LIMIT:
            case SUPER_COLUMN_COUNTING_LIMIT:
                ThriftLimits thriftLimits = (ThriftLimits)limits;
                out.writeUnsignedVInt(thriftLimits.partitionLimit);
                out.writeUnsignedVInt(thriftLimits.cellPerPartitionLimit);
                break;
            default:
                // Kept consistent with serializedSize(): fail loudly on an unhandled Kind
                // instead of silently writing only the kind byte.
                throw new AssertionError();
        }
    }

    public DataLimits deserialize(DataInputPlus in, int version) throws IOException
    {
        Kind kind = Kind.values()[in.readUnsignedByte()];
        switch (kind)
        {
            case CQL_LIMIT:
            case CQL_PAGING_LIMIT:
                int rowLimit = (int)in.readUnsignedVInt();
                int perPartitionLimit = (int)in.readUnsignedVInt();
                boolean isDistinct = in.readBoolean();
                if (kind == Kind.CQL_LIMIT)
                    return new CQLLimits(rowLimit, perPartitionLimit, isDistinct);
                // CQL_PAGING_LIMIT carries the last returned key and its remaining row count.
                ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
                int lastRemaining = (int)in.readUnsignedVInt();
                return new CQLPagingLimits(rowLimit, perPartitionLimit, isDistinct, lastKey, lastRemaining);
            case THRIFT_LIMIT:
            case SUPER_COLUMN_COUNTING_LIMIT:
                int partitionLimit = (int)in.readUnsignedVInt();
                int cellPerPartitionLimit = (int)in.readUnsignedVInt();
                return kind == Kind.THRIFT_LIMIT
                     ? new ThriftLimits(partitionLimit, cellPerPartitionLimit)
                     : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
        }
        throw new AssertionError();
    }

    public long serializedSize(DataLimits limits, int version)
    {
        long size = TypeSizes.sizeof((byte)limits.kind().ordinal());
        switch (limits.kind())
        {
            case CQL_LIMIT:
            case CQL_PAGING_LIMIT:
                CQLLimits cqlLimits = (CQLLimits)limits;
                size += TypeSizes.sizeofUnsignedVInt(cqlLimits.rowLimit);
                size += TypeSizes.sizeofUnsignedVInt(cqlLimits.perPartitionLimit);
                size += TypeSizes.sizeof(cqlLimits.isDistinct);
                if (limits.kind() == Kind.CQL_PAGING_LIMIT)
                {
                    CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
                    size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey);
                    size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
                }
                break;
            case THRIFT_LIMIT:
            case SUPER_COLUMN_COUNTING_LIMIT:
                ThriftLimits thriftLimits = (ThriftLimits)limits;
                size += TypeSizes.sizeofUnsignedVInt(thriftLimits.partitionLimit);
                size += TypeSizes.sizeofUnsignedVInt(thriftLimits.cellPerPartitionLimit);
                break;
            default:
                throw new AssertionError();
        }
        return size;
    }
}
}