blob: 822d8387c2a4ffe823d8dec109e5d64935f4d898 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.filter;
import java.nio.ByteBuffer;
import java.io.DataInput;
import java.io.IOException;
import java.util.*;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
/**
 * An {@link IDiskAtomFilter} that selects the cells of a partition falling inside one or more
 * contiguous {@link ColumnSlice} ranges, optionally iterating in reversed order, and limited to
 * {@code count} results as measured by a {@link ColumnCounter} (cells, or CQL3 rows when
 * {@code compositesToGroup >= 0}).
 *
 * Instances are mostly immutable; {@code count} is the one mutable field (see
 * {@link #updateColumnsLimit(int)}) and is {@code volatile} for that reason. The nested
 * {@link Serializer} defines the wire format used between nodes.
 */
public class SliceQueryFilter implements IDiskAtomFilter
{
    private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class);
    /**
     * A special value for compositesToGroup that indicates that partitioned tombstones should not be included in results
     * or count towards the limit. See CASSANDRA-8490 for more details on why this is needed (and done this way).
     **/
    public static final int IGNORE_TOMBSTONED_PARTITIONS = -2;
    // The slices to query; all assumed to run in the same direction as 'reversed'.
    public final ColumnSlice[] slices;
    // True if iteration is from the end of the partition toward the start.
    public final boolean reversed;
    // Maximum number of results to return; mutable (see updateColumnsLimit), hence volatile.
    public volatile int count;
    // Number of composite components to group cells by when counting "rows";
    // < 0 means count raw cells, 0 means group everything, see columnCounter().
    public final int compositesToGroup;
    // Not serialized, just a ack for range slices to find the number of live column counted, even when we group
    private ColumnCounter columnCounter;

    /** Convenience constructor for a single slice given by its start/finish bounds. */
    public SliceQueryFilter(Composite start, Composite finish, boolean reversed, int count)
    {
        this(new ColumnSlice(start, finish), reversed, count);
    }

    /** Single-slice constructor with an explicit composite grouping level. */
    public SliceQueryFilter(Composite start, Composite finish, boolean reversed, int count, int compositesToGroup)
    {
        this(new ColumnSlice(start, finish), reversed, count, compositesToGroup);
    }

    public SliceQueryFilter(ColumnSlice slice, boolean reversed, int count)
    {
        this(new ColumnSlice[]{ slice }, reversed, count);
    }

    public SliceQueryFilter(ColumnSlice slice, boolean reversed, int count, int compositesToGroup)
    {
        this(new ColumnSlice[]{ slice }, reversed, count, compositesToGroup);
    }

    /**
     * Constructor that accepts multiple slices. All slices are assumed to be in the same direction (forward or
     * reversed).
     */
    public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count)
    {
        // -1 compositesToGroup == count individual cells, not grouped CQL3 rows
        this(slices, reversed, count, -1);
    }

    public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count, int compositesToGroup)
    {
        this.slices = slices;
        this.reversed = reversed;
        this.count = count;
        this.compositesToGroup = compositesToGroup;
    }

    /** Returns a copy sharing this filter's slices array (the array itself is not cloned). */
    public SliceQueryFilter cloneShallow()
    {
        return new SliceQueryFilter(slices, reversed, count, compositesToGroup);
    }

    /** Returns a copy of this filter with {@code count} replaced by {@code newCount}. */
    public SliceQueryFilter withUpdatedCount(int newCount)
    {
        return new SliceQueryFilter(slices, reversed, newCount, compositesToGroup);
    }

    /** Returns a copy of this filter with its slices replaced by {@code newSlices}. */
    public SliceQueryFilter withUpdatedSlices(ColumnSlice[] newSlices)
    {
        return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup);
    }

    /** Returns true if the slice includes static columns, false otherwise. */
    private boolean sliceIncludesStatics(ColumnSlice slice, CFMetaData cfm)
    {
        // The static prefix sorts first in the comparator; use the reverse comparator when the
        // filter is reversed so that 'includes' is evaluated in iteration order.
        return cfm.hasStaticColumns() &&
               slice.includes(reversed ? cfm.comparator.reverseComparator() : cfm.comparator, cfm.comparator.staticPrefix().end());
    }

    /** Returns true if any of this filter's slices covers the static columns of {@code cfm}. */
    public boolean hasStaticSlice(CFMetaData cfm)
    {
        for (ColumnSlice slice : slices)
            if (sliceIncludesStatics(slice, cfm))
                return true;
        return false;
    }

    /**
     * Splits this filter into two SliceQueryFilters: one that slices only the static columns, and one that slices the
     * remainder of the normal data.
     *
     * This should only be called when the filter is reversed and the filter is known to cover static columns (through
     * hasStaticSlice()).
     *
     * @return a pair of (static, normal) SliceQueryFilters
     */
    public Pair<SliceQueryFilter, SliceQueryFilter> splitOutStaticSlice(CFMetaData cfm)
    {
        assert reversed;
        Composite staticSliceEnd = cfm.comparator.staticPrefix().end();
        List<ColumnSlice> nonStaticSlices = new ArrayList<>(slices.length);
        for (ColumnSlice slice : slices)
        {
            // Truncate any slice that reaches into the statics so it stops at the static boundary;
            // the statics themselves are handled by the dedicated filter built below.
            if (sliceIncludesStatics(slice, cfm))
                nonStaticSlices.add(new ColumnSlice(slice.start, staticSliceEnd));
            else
                nonStaticSlices.add(slice);
        }
        return Pair.create(
            new SliceQueryFilter(staticSliceEnd, Composites.EMPTY, true, count, compositesToGroup),
            new SliceQueryFilter(nonStaticSlices.toArray(new ColumnSlice[nonStaticSlices.size()]), true, count, compositesToGroup));
    }

    /**
     * Returns a copy of this filter whose slices have been adjusted for resuming a paged query at
     * {@code newStart}: slices entirely before the new start are dropped, the slice containing it
     * is shortened to begin there, and later slices are kept unchanged.
     */
    public SliceQueryFilter withUpdatedStart(Composite newStart, CFMetaData cfm)
    {
        Comparator<Composite> cmp = reversed ? cfm.comparator.reverseComparator() : cfm.comparator;
        // Check our slices to see if any fall before the new start (in which case they can be removed) or
        // if they contain the new start (in which case they should start from the page start). However, if the
        // slices would include static columns, we need to ensure they are also fetched, and so a separate
        // slice for the static columns may be required.
        // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
        // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
        List<ColumnSlice> newSlices = new ArrayList<>();
        boolean pastNewStart = false;
        for (ColumnSlice slice : slices)
        {
            // Once we've located the slice containing newStart, the remaining slices pass through unchanged.
            if (pastNewStart)
            {
                newSlices.add(slice);
                continue;
            }
            if (slice.isBefore(cmp, newStart))
            {
                // Dropped slice: if it covered the statics, keep a statics-only slice so they're still fetched.
                if (!reversed && sliceIncludesStatics(slice, cfm))
                    newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
                continue;
            }
            else if (slice.includes(cmp, newStart))
            {
                if (!reversed && sliceIncludesStatics(slice, cfm) && !newStart.isEmpty())
                    newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
                newSlices.add(new ColumnSlice(newStart, slice.finish));
            }
            else
            {
                newSlices.add(slice);
            }
            pastNewStart = true;
        }
        return withUpdatedSlices(newSlices.toArray(new ColumnSlice[newSlices.size()]));
    }

    /** Iterates the cells of {@code cf} selected by this filter's slices, honoring 'reversed'. */
    public Iterator<Cell> getColumnIterator(ColumnFamily cf)
    {
        assert cf != null;
        return reversed ? cf.reverseIterator(slices) : cf.iterator(slices);
    }

    /**
     * Wraps {@link #getColumnIterator(ColumnFamily)} in an {@link OnDiskAtomIterator} carrying the
     * partition key and column family; close() is a no-op since nothing is held open.
     */
    public OnDiskAtomIterator getColumnIterator(final DecoratedKey key, final ColumnFamily cf)
    {
        assert cf != null;
        final Iterator<Cell> iter = getColumnIterator(cf);
        return new OnDiskAtomIterator()
        {
            public ColumnFamily getColumnFamily()
            {
                return cf;
            }

            public DecoratedKey getKey()
            {
                return key;
            }

            public boolean hasNext()
            {
                return iter.hasNext();
            }

            public OnDiskAtom next()
            {
                return iter.next();
            }

            // In-memory iteration holds no resources, so there is nothing to release.
            public void close() throws IOException { }

            public void remove()
            {
                throw new UnsupportedOperationException();
            }
        };
    }

    /** Iterates the sstable's atoms for {@code key}, letting the sstable seek to the slices. */
    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
    {
        return sstable.iterator(key, slices, reversed);
    }

    /** Variant of the above that reads from an already-positioned file using a row index entry. */
    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
    {
        return sstable.iterator(file, key, slices, reversed, indexEntry);
    }

    /** Cell ordering matching this filter's iteration direction. */
    public Comparator<Cell> getColumnComparator(CellNameType comparator)
    {
        return reversed ? comparator.columnReverseComparator() : comparator.columnComparator(false);
    }

    /**
     * Consumes the (already merged) cell stream into {@code container}, counting live cells and
     * tombstones, stopping once {@code count} live results have been gathered, and enforcing the
     * configured tombstone warn/failure thresholds.
     *
     * @throws TombstoneOverwhelmingException if more tombstones are scanned than the failure threshold allows
     */
    public void collectReducedColumns(ColumnFamily container, Iterator<Cell> reducedColumns, DecoratedKey key, int gcBefore, long now)
    {
        columnCounter = columnCounter(container.getComparator(), now);
        DeletionInfo.InOrderTester tester = container.deletionInfo().inOrderTester(reversed);
        while (reducedColumns.hasNext())
        {
            Cell cell = reducedColumns.next();
            if (logger.isTraceEnabled())
                logger.trace("collecting {} of {}: {}", columnCounter.live(), count, cell.getString(container.getComparator()));
            // An expired tombstone will be immediately discarded in memory, and needn't be counted.
            // Neither should be any cell shadowed by a range- or a partition tombstone.
            if (cell.getLocalDeletionTime() < gcBefore || !columnCounter.count(cell, tester))
                continue;
            // count() above may push live() one past the limit; the overshoot is corrected in lastCounted().
            if (columnCounter.live() > count)
                break;
            if (respectTombstoneThresholds() && columnCounter.tombstones() > DatabaseDescriptor.getTombstoneFailureThreshold())
            {
                Tracing.trace("Scanned over {} tombstones; query aborted (see tombstone_failure_threshold); slices={}",
                              DatabaseDescriptor.getTombstoneFailureThreshold(), getSlicesInfo(container));
                throw new TombstoneOverwhelmingException(columnCounter.tombstones(),
                                                         count,
                                                         container.metadata().ksName,
                                                         container.metadata().cfName,
                                                         container.getComparator().getString(cell.name()),
                                                         getSlicesInfo(container),
                                                         container.metadata().getKeyValidator().getString(key.getKey()));
            }
            container.appendColumn(cell);
        }
        boolean warnTombstones = logger.isWarnEnabled() && respectTombstoneThresholds() && columnCounter.tombstones() > DatabaseDescriptor.getTombstoneWarnThreshold();
        if (warnTombstones)
        {
            // Warn both the client (ClientWarn) and the server log when past the warn threshold.
            String msg = String.format("Read %d live and %d tombstone cells in %s.%s for key: %1.512s (see tombstone_warn_threshold). %d columns were requested, slices=%1.512s",
                                       columnCounter.live(),
                                       columnCounter.tombstones(),
                                       container.metadata().ksName,
                                       container.metadata().cfName,
                                       container.metadata().getKeyValidator().getString(key.getKey()),
                                       count,
                                       getSlicesInfo(container));
            ClientWarn.instance.warn(msg);
            logger.warn(msg);
        }
        Tracing.trace("Read {} live and {} tombstone cells{}",
                      columnCounter.live(),
                      columnCounter.tombstones(),
                      warnTombstones ? " (see tombstone_warn_threshold)" : "");
    }

    /** Renders each slice as "[start-finish]" for log/trace messages. */
    private String getSlicesInfo(ColumnFamily container)
    {
        StringBuilder sb = new StringBuilder();
        CellNameType type = container.metadata().comparator;
        for (ColumnSlice sl : slices)
        {
            assert sl != null;
            sb.append('[');
            sb.append(type.getString(sl.start));
            sb.append('-');
            sb.append(type.getString(sl.finish));
            sb.append(']');
        }
        return sb.toString();
    }

    // Hook for subclasses that must never abort on tombstones (e.g. internal/system reads) — they override to return false.
    protected boolean respectTombstoneThresholds()
    {
        return true;
    }

    /** Counts the live results (cells or grouped rows) across the whole of {@code cf} at time {@code now}. */
    public int getLiveCount(ColumnFamily cf, long now)
    {
        return columnCounter(cf.getComparator(), now).countAll(cf).live();
    }

    /**
     * Builds the counter matching this filter's result unit:
     * plain cells when compositesToGroup < 0, whole-partition grouping when it is 0,
     * otherwise a prefix-grouping counter (reversed-aware) over compositesToGroup components.
     */
    public ColumnCounter columnCounter(CellNameType comparator, long now)
    {
        if (compositesToGroup < 0)
            return new ColumnCounter(now);
        else if (compositesToGroup == 0)
            return new ColumnCounter.GroupByPrefix(now, null, 0);
        else if (reversed)
            return new ColumnCounter.GroupByPrefixReversed(now, comparator, compositesToGroup);
        else
            return new ColumnCounter.GroupByPrefix(now, comparator, compositesToGroup);
    }

    /**
     * Returns a copy of {@code cf} truncated to at most {@code trimTo} live results (as measured
     * by this filter's counter), preserving the deletion info; returns {@code cf} unchanged when
     * it cannot possibly exceed the limit.
     */
    public ColumnFamily trim(ColumnFamily cf, int trimTo, long now)
    {
        // each cell can increment the count by at most one, so if we have fewer cells than trimTo, we can skip trimming
        if (cf.getColumnCount() < trimTo)
            return cf;
        ColumnCounter counter = columnCounter(cf.getComparator(), now);
        ColumnFamily trimmedCf = cf.getFactory().create(cf.metadata(), reversed, trimTo);
        // Carry over partition/range deletion info so shadowing still applies to the trimmed copy.
        trimmedCf.delete(cf);
        Collection<Cell> cells = reversed
                                 ? cf.getReverseSortedColumns()
                                 : cf.getSortedColumns();
        DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(reversed);
        for (Iterator<Cell> iter = cells.iterator(); iter.hasNext(); )
        {
            Cell cell = iter.next();
            counter.count(cell, tester);
            if (counter.live() > trimTo)
            {
                break;
            }
            else
            {
                trimmedCf.addColumn(cell);
            }
        }
        return trimmedCf;
    }

    /** Start bound of the first slice. */
    public Composite start()
    {
        return this.slices[0].start;
    }

    /** Finish bound of the last slice. */
    public Composite finish()
    {
        return this.slices[slices.length - 1].finish;
    }

    /** Replaces the start bound in place; only valid for single-slice filters. */
    public void setStart(Composite start)
    {
        assert slices.length == 1;
        this.slices[0] = new ColumnSlice(start, this.slices[0].finish);
    }

    /** Live results gathered by the last collectReducedColumns() call, capped at {@code count}. */
    public int lastCounted()
    {
        // If we have a slice limit set, columnCounter.live() can overcount by one because we have to call
        // columnCounter.count() before we can tell if we've exceeded the slice limit (and accordingly, should not
        // add the cells to returned container). To deal with this overcounting, we take the min of the slice
        // limit and the counter's count.
        return columnCounter == null ? 0 : Math.min(columnCounter.live(), count);
    }

    /** Tombstones scanned by the last collectReducedColumns() call (0 if never run). */
    public int lastTombstones()
    {
        return columnCounter == null ? 0 : columnCounter.tombstones();
    }

    /** Raw (uncapped) live count from the last collectReducedColumns() call (0 if never run). */
    public int lastLive()
    {
        return columnCounter == null ? 0 : columnCounter.live();
    }

    @Override
    public String toString()
    {
        return "SliceQueryFilter [reversed=" + reversed + ", slices=" + Arrays.toString(slices) + ", count=" + count + ", toGroup = " + compositesToGroup + "]";
    }

    public boolean isReversed()
    {
        return reversed;
    }

    /** Mutates the result limit in place (count is volatile so readers see the update). */
    public void updateColumnsLimit(int newLimit)
    {
        count = newLimit;
    }

    /** True if any slice could select a cell whose name starts with {@code prefix}. */
    public boolean maySelectPrefix(CType type, Composite prefix)
    {
        for (ColumnSlice slice : slices)
            if (slice.includes(type, prefix))
                return true;
        return false;
    }

    /**
     * True if {@code sstable} may contain cells selected by this filter, judged from its
     * min/max column name metadata; conservatively true when that metadata is absent.
     */
    public boolean shouldInclude(SSTableReader sstable)
    {
        List<ByteBuffer> minColumnNames = sstable.getSSTableMetadata().minColumnNames;
        List<ByteBuffer> maxColumnNames = sstable.getSSTableMetadata().maxColumnNames;
        CellNameType comparator = sstable.metadata.comparator;
        if (minColumnNames.isEmpty() || maxColumnNames.isEmpty())
            return true;
        for (ColumnSlice slice : slices)
            if (slice.intersects(minColumnNames, maxColumnNames, comparator, reversed))
                return true;
        return false;
    }

    /** True if this is a single forward slice starting at the beginning of the partition. */
    public boolean isHeadFilter()
    {
        return slices.length == 1 && slices[0].start.isEmpty() && !reversed;
    }

    public boolean countCQL3Rows(CellNameType comparator)
    {
        // If comparator is dense a cell == a CQL3 rows so we're always counting CQL3 rows
        // in particular. Otherwise, we do so only if we group the cells into CQL rows.
        return comparator.isDense() || compositesToGroup >= 0;
    }

    /** True if {@code cf} alone can answer this filter, so later sources need not be consulted. */
    public boolean isFullyCoveredBy(ColumnFamily cf, long now)
    {
        // cf is the beginning of a partition. It covers this filter if:
        //   1) either this filter requests the head of the partition and request less
        //      than what cf has to offer (note: we do need to use getLiveCount() for that
        //      as it knows if the filter count cells or CQL3 rows).
        //   2) the start and finish bound of this filter are included in cf.
        if (isHeadFilter() && count <= getLiveCount(cf, now))
            return true;
        if (start().isEmpty() || finish().isEmpty() || !cf.hasColumns())
            return false;
        // Normalize bounds to comparator order regardless of the filter's direction.
        Composite low = isReversed() ? finish() : start();
        Composite high = isReversed() ? start() : finish();
        CellName first = cf.iterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name();
        CellName last = cf.reverseIterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name();
        return cf.getComparator().compare(first, low) <= 0
            && cf.getComparator().compare(high, last) <= 0;
    }

    /**
     * Wire serializer for SliceQueryFilter. Writes: slice count, each slice (via the CType's
     * slice serializer), the reversed flag, the count, and compositesToGroup.
     */
    public static class Serializer implements IVersionedSerializer<SliceQueryFilter>
    {
        // NOTE(review): never reassigned after construction — could be declared final.
        private CType type;

        public Serializer(CType type)
        {
            this.type = type;
        }

        public void serialize(SliceQueryFilter f, DataOutputPlus out, int version) throws IOException
        {
            out.writeInt(f.slices.length);
            for (ColumnSlice slice : f.slices)
                type.sliceSerializer().serialize(slice, out, version);
            out.writeBoolean(f.reversed);
            // Read the volatile count once so the serialized value is consistent.
            int count = f.count;
            out.writeInt(count);
            out.writeInt(f.compositesToGroup);
        }

        public SliceQueryFilter deserialize(DataInput in, int version) throws IOException
        {
            ColumnSlice[] slices;
            slices = new ColumnSlice[in.readInt()];
            for (int i = 0; i < slices.length; i++)
                slices[i] = type.sliceSerializer().deserialize(in, version);
            boolean reversed = in.readBoolean();
            int count = in.readInt();
            int compositesToGroup = in.readInt();
            return new SliceQueryFilter(slices, reversed, count, compositesToGroup);
        }

        public long serializedSize(SliceQueryFilter f, int version)
        {
            TypeSizes sizes = TypeSizes.NATIVE;
            int size = 0;
            size += sizes.sizeof(f.slices.length);
            for (ColumnSlice slice : f.slices)
                size += type.sliceSerializer().serializedSize(slice, version);
            size += sizes.sizeof(f.reversed);
            size += sizes.sizeof(f.count);
            size += sizes.sizeof(f.compositesToGroup);
            return size;
        }
    }

    /**
     * Iterates the range tombstones of {@code source} that fall within this filter's slices,
     * visiting slices in iteration order (i.e. last-to-first when reversed, with each slice's
     * bounds swapped so the DeletionInfo range iterator runs the right way).
     */
    public Iterator<RangeTombstone> getRangeTombstoneIterator(final ColumnFamily source)
    {
        final DeletionInfo delInfo = source.deletionInfo();
        if (!delInfo.hasRanges() || slices.length == 0)
            // NOTE(review): Iterators.emptyIterator() is deprecated in newer Guava;
            // Collections.emptyIterator() is the drop-in replacement.
            return Iterators.emptyIterator();
        return new AbstractIterator<RangeTombstone>()
        {
            // Index into 'slices' in iteration order (mapped through 'reversed' in currentRangeIter).
            private int sliceIdx = 0;
            private Iterator<RangeTombstone> sliceIter = currentRangeIter();

            protected RangeTombstone computeNext()
            {
                // Drain the current slice's tombstones, then advance to the next slice until exhausted.
                while (true)
                {
                    if (sliceIter.hasNext())
                        return sliceIter.next();
                    if (!nextSlice())
                        return endOfData();
                    sliceIter = currentRangeIter();
                }
            }

            private Iterator<RangeTombstone> currentRangeIter()
            {
                ColumnSlice slice = slices[reversed ? (slices.length - 1 - sliceIdx) : sliceIdx];
                return reversed ? delInfo.rangeIterator(slice.finish, slice.start)
                                : delInfo.rangeIterator(slice.start, slice.finish);
            }

            private boolean nextSlice()
            {
                return ++sliceIdx < slices.length;
            }
        };
    }
}