/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import com.google.common.base.Objects;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.dht.*;
/**
* Groups key range and column filter for range queries.
*
* The main "trick" of this class is that the column filter can only
* be obtained by providing the row key on which the column filter will
* be applied (which we always know before actually querying the columns).
*
* This allows the paging DataRange to return one filter for most rows but
* potentially different ones for the starting and stopping keys. It could
* also allow fancier things in the future, like column filters that
* depend on the actual key value :)
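*
* A minimal usage sketch (illustrative only; partitioner, rowKey and doQuery
* are assumed to come from the caller's context, not from this class):
* <pre>{@code
* DataRange range = DataRange.allData(partitioner);    // covers the full ring
* IDiskAtomFilter filter = range.columnFilter(rowKey); // per-row filter
* doQuery(range.keyRange(), filter);                   // hypothetical consumer
* }</pre>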
*/
public class DataRange
{
protected final AbstractBounds<RowPosition> keyRange;
protected IDiskAtomFilter columnFilter;
protected final boolean selectFullRow;
public DataRange(AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
{
this.keyRange = range;
this.columnFilter = columnFilter;
this.selectFullRow = columnFilter instanceof SliceQueryFilter
&& isFullRowSlice((SliceQueryFilter)columnFilter);
}
public static boolean isFullRowSlice(SliceQueryFilter filter)
{
return filter.slices.length == 1
&& filter.start().isEmpty()
&& filter.finish().isEmpty()
&& filter.count == Integer.MAX_VALUE;
}
public static DataRange allData(IPartitioner partitioner)
{
return forTokenRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
}
public static DataRange forTokenRange(Range<Token> keyRange)
{
return forKeyRange(Range.makeRowRange(keyRange));
}
public static DataRange forKeyRange(Range<RowPosition> keyRange)
{
return new DataRange(keyRange, new IdentityQueryFilter());
}
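/*
* Illustrative sketch (not part of this class): building a DataRange that
* covers a specific token range, where t1 and t2 are assumed Tokens supplied
* by the caller:
*
*     DataRange subRange = DataRange.forTokenRange(new Range<Token>(t1, t2));
*
* forTokenRange turns the token range into a row-position range via
* Range.makeRowRange and pairs it with an IdentityQueryFilter, which selects
* every column of every partition in the range.
*/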
public AbstractBounds<RowPosition> keyRange()
{
return keyRange;
}
public RowPosition startKey()
{
return keyRange.left;
}
public RowPosition stopKey()
{
return keyRange.right;
}
/**
* Returns true if tombstoned partitions should not be included in results or count towards the limit.
* See CASSANDRA-8490 for more details on why this is needed (and done this way).
*/
public boolean ignoredTombstonedPartitions()
{
if (!(columnFilter instanceof SliceQueryFilter))
return false;
return ((SliceQueryFilter) columnFilter).compositesToGroup == SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
}
// Whether the bounds of this DataRange actually wrap around.
public boolean isWrapAround()
{
// Only a Range can ever wrap around
return keyRange instanceof Range && ((Range<?>)keyRange).isWrapAround();
}
public boolean contains(RowPosition pos)
{
return keyRange.contains(pos);
}
public int getLiveCount(ColumnFamily data, long now)
{
return columnFilter instanceof SliceQueryFilter
? ((SliceQueryFilter)columnFilter).lastCounted()
: columnFilter.getLiveCount(data, now);
}
public boolean selectsFullRowFor(ByteBuffer rowKey)
{
return selectFullRow;
}
/**
* Returns a column filter that should be used for a particular row key. Note that in the case of paging,
* slice starts and ends may change depending on the row key.
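*
* Typical read-loop usage (sketch; keys and filterRow are hypothetical,
* assumed to come from the surrounding query code):
* <pre>{@code
* for (DecoratedKey key : keys)
*     filterRow(dataRange.columnFilter(key.getKey()));
* }</pre>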
*/
public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
{
return columnFilter;
}
/**
* Sets a new limit on the number of (grouped) cells to fetch. This is currently only used when the query limit applies
* to CQL3 rows.
*/
public void updateColumnsLimit(int count)
{
columnFilter.updateColumnsLimit(count);
}
public static class Paging extends DataRange
{
// The slice of columns that we want to fetch for each row, ignoring page start/end issues.
private final SliceQueryFilter sliceFilter;
private final CFMetaData cfm;
private final Comparator<Composite> comparator;
// used to restrict the start of the slice for the first partition in the range
private final Composite firstPartitionColumnStart;
// used to restrict the end of the slice for the last partition in the range
private final Composite lastPartitionColumnFinish;
// tracks the last key that we updated the filter for to avoid duplicating work
private ByteBuffer lastKeyFilterWasUpdatedFor;
private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite firstPartitionColumnStart,
Composite lastPartitionColumnFinish, CFMetaData cfm, Comparator<Composite> comparator)
{
super(range, filter);
// When using a paging range, we don't allow wrapped ranges, as it's unclear how to handle them properly.
// This is ok for now since we only need this in range slice queries, and the ranges are "unwrapped" in that case.
assert !(range instanceof Range) || !((Range<?>)range).isWrapAround() || range.right.isMinimum() : range;
this.sliceFilter = filter;
this.cfm = cfm;
this.comparator = comparator;
this.firstPartitionColumnStart = firstPartitionColumnStart;
this.lastPartitionColumnFinish = lastPartitionColumnFinish;
this.lastKeyFilterWasUpdatedFor = null;
}
public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite columnStart, Composite columnFinish, CFMetaData cfm)
{
this(range, filter, columnStart, columnFinish, cfm, filter.isReversed() ? cfm.comparator.reverseComparator() : cfm.comparator);
}
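/*
* Construction sketch (illustrative; bounds, filter, start, finish and cfm
* are assumed to come from the enclosing query):
*
*     DataRange page = new DataRange.Paging(bounds, filter, start, finish, cfm);
*
* start restricts the slice for the first partition of the page (e.g. to
* resume just after the previous page's last returned cell) and finish
* restricts the slice for the last partition.
*/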
@Override
public boolean selectsFullRowFor(ByteBuffer rowKey)
{
// If the initial filter is not the full-row filter, don't bother
if (!selectFullRow)
return false;
if (!equals(startKey(), rowKey) && !equals(stopKey(), rowKey))
return true;
return isFullRowSlice((SliceQueryFilter)columnFilter(rowKey));
}
private boolean equals(RowPosition pos, ByteBuffer rowKey)
{
return pos instanceof DecoratedKey && ((DecoratedKey)pos).getKey().equals(rowKey);
}
@Override
public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
{
/*
* We have that ugly hack that for slice queries, when we ask for
* the live count, we reach into the query filter to get the last
* counted number of columns to avoid recounting.
* Maybe we should just remove that hack, but in the meantime, we
* need to keep a reference to the last returned filter.
*/
if (equals(startKey(), rowKey) || equals(stopKey(), rowKey))
{
if (!rowKey.equals(lastKeyFilterWasUpdatedFor))
{
this.lastKeyFilterWasUpdatedFor = rowKey;
columnFilter = sliceFilter.withUpdatedSlices(slicesForKey(rowKey));
}
}
else
{
columnFilter = sliceFilter;
}
return columnFilter;
}
/** Returns true if the slice includes static columns, false otherwise. */
private boolean sliceIncludesStatics(ColumnSlice slice, boolean reversed, CFMetaData cfm)
{
return cfm.hasStaticColumns() &&
slice.includes(reversed ? cfm.comparator.reverseComparator() : cfm.comparator, cfm.comparator.staticPrefix().end());
}
private ColumnSlice[] slicesForKey(ByteBuffer key)
{
// Also note that firstPartitionColumnStart and lastPartitionColumnFinish, when used, only "restrict"
// the filter slices; they don't expand them. As such, we can ignore the case where they are empty
// (and we do), as it would screw up the logic below (see #6592)
Composite newStart = equals(startKey(), key) && !firstPartitionColumnStart.isEmpty() ? firstPartitionColumnStart : null;
Composite newFinish = equals(stopKey(), key) && !lastPartitionColumnFinish.isEmpty() ? lastPartitionColumnFinish : null;
// in the common case, we'll have the same number of slices
List<ColumnSlice> newSlices = new ArrayList<>(sliceFilter.slices.length);
// Check our slices to see if any fall before the page start (in which case they can be removed) or
// if they contain the page start (in which case they should start from the page start). However, if the
// slices would include static columns, we need to ensure they are also fetched, and so a separate
// slice for the static columns may be required.
// Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
// the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
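// Illustrative example (assuming the comparator orders the names shown alphabetically):
// with slices [(b, d], (f, h]] and a page start of c, the first slice is trimmed
// to (c, d] and the second is kept as-is; a slice (a, b] that ends before c would
// be dropped entirely (modulo the extra static-column slice handling below).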
for (ColumnSlice slice : sliceFilter.slices)
{
if (newStart != null)
{
if (slice.isBefore(comparator, newStart))
{
if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm))
newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
continue;
}
if (slice.includes(comparator, newStart))
{
if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm) && !newStart.equals(Composites.EMPTY))
newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
slice = new ColumnSlice(newStart, slice.finish);
}
// once we see a slice that either includes the page start or is after it, we can stop checking
// against the page start (because the slices are ordered)
newStart = null;
}
assert newStart == null;
if (newFinish != null && !slice.isBefore(comparator, newFinish))
{
if (slice.includes(comparator, newFinish))
newSlices.add(new ColumnSlice(slice.start, newFinish));
// In any case, we're done
break;
}
newSlices.add(slice);
}
return newSlices.toArray(new ColumnSlice[newSlices.size()]);
}
@Override
public void updateColumnsLimit(int count)
{
columnFilter.updateColumnsLimit(count);
sliceFilter.updateColumnsLimit(count);
}
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("keyRange", keyRange)
.add("sliceFilter", sliceFilter)
.add("columnFilter", columnFilter)
.add("firstPartitionColumnStart", firstPartitionColumnStart == null ? "null" : cfm.comparator.getString(firstPartitionColumnStart))
.add("lastPartitionColumnFinish", lastPartitionColumnFinish == null ? "null" : cfm.comparator.getString(lastPartitionColumnFinish))
.toString();
}
}
}