blob: 911d72e96ee983cd96febd11bf09d19b0de4e0d3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable.format.bti;
import java.io.IOException;
import java.util.NoSuchElementException;
import com.carrotsearch.hppc.LongStack;
import org.apache.cassandra.db.ClusteringBound;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.UnfilteredValidation;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
import org.apache.cassandra.io.sstable.AbstractSSTableIterator;
import org.apache.cassandra.io.sstable.format.bti.RowIndexReader.IndexInfo;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileHandle;
/**
* Unfiltered row iterator over a BTI SSTable that returns rows in reverse order.
*/
class SSTableReversedIterator extends AbstractSSTableIterator<TrieIndexEntry>
{
/**
* The index of the slice being processed.
*/
private int slice;
public SSTableReversedIterator(BtiTableReader sstable,
FileDataInput file,
DecoratedKey key,
TrieIndexEntry indexEntry,
Slices slices,
ColumnFilter columns,
FileHandle ifile)
{
super(sstable, file, key, indexEntry, slices, columns, ifile);
}
protected Reader createReaderInternal(TrieIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile)
{
if (indexEntry.isIndexed())
return new ReverseIndexedReader(indexEntry, file, shouldCloseFile);
else
return new ReverseReader(file, shouldCloseFile);
}
public boolean isReverseOrder()
{
return true;
}
protected int nextSliceIndex()
{
int next = slice;
slice++;
return slices.size() - (next + 1);
}
protected boolean hasMoreSlices()
{
return slice < slices.size();
}
/**
* Reverse iteration is performed by going through an index block (or the whole partition if not indexed) forwards
* and storing the positions of each entry that falls within the slice in a stack. Reverse iteration then pops out
* positions and reads the entries.
* <p>
* Note: The earlier version of this was constructing an in-memory view of the block instead, which gives better
* performance on bigger queries and index blocks (due to not having to read disk again). With the lower
* granularity of the tries it makes better sense to store as little as possible as the beginning of the block
* should very rarely be in other page/chunk cache locations. This has the benefit of being able to answer small
* queries (esp. LIMIT 1) faster and with less GC churn.
*/
private class ReverseReader extends AbstractReader
{
final LongStack rowOffsets = new LongStack();
RangeTombstoneMarker blockOpenMarker, blockCloseMarker;
private Unfiltered next = null;
private boolean foundLessThan;
private long startPos = -1;
private ReverseReader(FileDataInput file, boolean shouldCloseFile)
{
super(file, shouldCloseFile);
}
@Override
public void setForSlice(Slice slice) throws IOException
{
// read full row and filter
if (startPos == -1)
startPos = file.getFilePointer();
else
seekToPosition(startPos);
fillOffsets(slice, true, true, Long.MAX_VALUE);
}
@Override
protected boolean hasNextInternal() throws IOException
{
if (next != null)
return true;
next = computeNext();
return next != null;
}
@Override
protected Unfiltered nextInternal() throws IOException
{
if (!hasNextInternal())
throw new NoSuchElementException();
Unfiltered toReturn = next;
next = null;
return toReturn;
}
private Unfiltered computeNext() throws IOException
{
Unfiltered toReturn;
do
{
if (blockCloseMarker != null)
{
toReturn = blockCloseMarker;
blockCloseMarker = null;
return toReturn;
}
while (!rowOffsets.isEmpty())
{
seekToPosition(rowOffsets.pop());
boolean hasNext = deserializer.hasNext();
assert hasNext : "Data file changed after offset collection pass";
toReturn = deserializer.readNext();
UnfilteredValidation.maybeValidateUnfiltered(toReturn, metadata(), key, sstable);
// We may get empty row for the same reason expressed on UnfilteredSerializer.deserializeOne.
if (!toReturn.isEmpty())
return toReturn;
}
}
while (!foundLessThan && advanceIndexBlock());
// open marker to be output only as slice is finished
if (blockOpenMarker != null)
{
toReturn = blockOpenMarker;
blockOpenMarker = null;
return toReturn;
}
return null;
}
protected boolean advanceIndexBlock() throws IOException
{
return false;
}
void fillOffsets(Slice slice, boolean filterStart, boolean filterEnd, long stopPosition) throws IOException
{
filterStart &= !slice.start().equals(ClusteringBound.BOTTOM);
filterEnd &= !slice.end().equals(ClusteringBound.TOP);
ClusteringBound<?> start = slice.start();
long currentPosition = file.getFilePointer();
foundLessThan = false;
// This is a copy of handlePreSliceData which also checks currentPosition < stopPosition.
// Not extracted to method as we need both marker and currentPosition.
if (filterStart)
{
while (currentPosition < stopPosition && deserializer.hasNext() && deserializer.compareNextTo(start) <= 0)
{
if (deserializer.nextIsRow())
deserializer.skipNext();
else
updateOpenMarker((RangeTombstoneMarker) deserializer.readNext());
currentPosition = file.getFilePointer();
foundLessThan = true;
}
}
// We've reached the beginning of our queried slice. If we have an open marker
// we should return that at the end of the slice to close the deletion.
if (openMarker != null)
blockOpenMarker = new RangeTombstoneBoundMarker(start, openMarker);
// Now deserialize everything until we reach our requested end (if we have one)
// See SSTableIterator.ForwardRead.computeNext() for why this is a strict inequality below: this is the same
// reasoning here.
while (currentPosition < stopPosition && deserializer.hasNext()
&& (!filterEnd || deserializer.compareNextTo(slice.end()) < 0))
{
rowOffsets.push(currentPosition);
if (deserializer.nextIsRow())
deserializer.skipNext();
else
updateOpenMarker((RangeTombstoneMarker) deserializer.readNext());
currentPosition = file.getFilePointer();
}
// If we have an open marker, we should output that first, unless end is not being filtered
// (i.e. it's either top (where a marker can't be open) or we placed that marker during previous block).
if (openMarker != null && filterEnd)
{
// If we have no end and still an openMarker, this means we're indexed and the marker is closed in a following block.
blockCloseMarker = new RangeTombstoneBoundMarker(slice.end(), openMarker);
openMarker = null;
}
}
}
private class ReverseIndexedReader extends ReverseReader
{
private RowIndexReverseIterator indexReader;
private final TrieIndexEntry indexEntry;
private final long basePosition;
private Slice currentSlice;
private long currentBlockStart;
public ReverseIndexedReader(AbstractRowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile)
{
super(file, shouldCloseFile);
basePosition = indexEntry.position;
this.indexEntry = (TrieIndexEntry) indexEntry;
}
@Override
public void close() throws IOException
{
if (indexReader != null)
indexReader.close();
super.close();
}
@Override
public void setForSlice(Slice slice) throws IOException
{
currentSlice = slice;
ClusteringComparator comparator = metadata.comparator;
if (indexReader != null)
indexReader.close();
indexReader = new RowIndexReverseIterator(ifile,
indexEntry,
comparator.asByteComparable(slice.end()));
gotoBlock(indexReader.nextIndexInfo(), true, Long.MAX_VALUE);
}
boolean gotoBlock(IndexInfo indexInfo, boolean filterEnd, long blockEnd) throws IOException
{
blockOpenMarker = null;
blockCloseMarker = null;
rowOffsets.clear();
if (indexInfo == null)
return false;
currentBlockStart = basePosition + indexInfo.offset;
openMarker = indexInfo.openDeletion;
seekToPosition(currentBlockStart);
fillOffsets(currentSlice, true, filterEnd, blockEnd);
return !rowOffsets.isEmpty();
}
@Override
protected boolean advanceIndexBlock() throws IOException
{
return gotoBlock(indexReader.nextIndexInfo(), false, currentBlockStart);
}
}
}