blob: 9bd8ff93508a705d41edb6bed051f7c6af040fac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable.format.bti;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.sstable.KeyReader;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Throwables;
import static org.apache.cassandra.utils.FBUtilities.immutableListWithFilteredNulls;
/**
* Partition iterator for the BTI format.
* <p>
* As the index stores prefixes of keys, the slice returned by the underlying {@link PartitionIndex.IndexPosIterator}
* may start and end with entries that have the same prefix as the provided bounds, but be in the wrong relationship
* with them. To filter these out, we start by checking the first item during initialization, and by working one item
* ahead, so that we can recognize the end of the slice and check the last item before we return it.
*/
class PartitionIterator extends PartitionIndex.IndexPosIterator implements KeyReader
{
private final PartitionIndex partitionIndex;
private final IPartitioner partitioner;
private final PartitionPosition limit;
private final int exclusiveLimit;
private final FileHandle dataFile;
private final FileHandle rowIndexFile;
private FileDataInput dataInput;
private FileDataInput indexInput;
private DecoratedKey currentKey;
private TrieIndexEntry currentEntry;
private DecoratedKey nextKey;
private TrieIndexEntry nextEntry;
@SuppressWarnings({ "resource", "RedundantSuppression" })
static PartitionIterator create(PartitionIndex partitionIndex, IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile,
PartitionPosition left, int inclusiveLeft, PartitionPosition right, int exclusiveRight) throws IOException
{
PartitionIterator partitionIterator = null;
PartitionIndex partitionIndexCopy = null;
FileHandle dataFileCopy = null;
FileHandle rowIndexFileCopy = null;
try
{
partitionIndexCopy = partitionIndex.sharedCopy();
dataFileCopy = dataFile.sharedCopy();
rowIndexFileCopy = rowIndexFile.sharedCopy();
partitionIterator = new PartitionIterator(partitionIndexCopy, partitioner, rowIndexFileCopy, dataFileCopy, left, right, exclusiveRight);
partitionIterator.readNext();
// Because the index stores prefixes, the first value can be in any relationship with the left bound.
if (partitionIterator.nextKey != null && !(partitionIterator.nextKey.compareTo(left) > inclusiveLeft))
{
partitionIterator.readNext();
}
partitionIterator.advance();
return partitionIterator;
}
catch (IOException | RuntimeException ex)
{
if (partitionIterator != null)
{
partitionIterator.close();
}
else
{
Throwables.closeNonNullAndAddSuppressed(ex, rowIndexFileCopy, dataFileCopy, partitionIndexCopy);
}
throw ex;
}
}
static PartitionIterator create(PartitionIndex partitionIndex, IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile) throws IOException
{
return create(partitionIndex, partitioner, rowIndexFile, dataFile, partitionIndex.firstKey(), -1, partitionIndex.lastKey(), 0);
}
static PartitionIterator empty(PartitionIndex partitionIndex)
{
return new PartitionIterator(partitionIndex.sharedCopy());
}
private PartitionIterator(PartitionIndex partitionIndex, IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile,
PartitionPosition left, PartitionPosition right, int exclusiveRight)
{
super(partitionIndex, left, right);
this.partitionIndex = partitionIndex;
this.partitioner = partitioner;
this.limit = right;
this.exclusiveLimit = exclusiveRight;
this.rowIndexFile = rowIndexFile;
this.dataFile = dataFile;
}
private PartitionIterator(PartitionIndex partitionIndex)
{
super(partitionIndex, partitionIndex.firstKey(), partitionIndex.firstKey());
this.partitionIndex = partitionIndex;
this.partitioner = null;
this.limit = partitionIndex.firstKey();
this.exclusiveLimit = -1;
this.rowIndexFile = null;
this.dataFile = null;
this.currentEntry = null;
this.currentKey = null;
this.nextEntry = null;
this.nextKey = null;
}
@Override
public void close()
{
Throwable accum = null;
accum = Throwables.close(accum, immutableListWithFilteredNulls(partitionIndex, dataFile, rowIndexFile));
accum = Throwables.close(accum, immutableListWithFilteredNulls(dataInput, indexInput));
accum = Throwables.perform(accum, super::close);
Throwables.maybeFail(accum);
}
public DecoratedKey decoratedKey()
{
return currentKey;
}
public ByteBuffer key()
{
return currentKey.getKey();
}
@Override
public long dataPosition()
{
return currentEntry != null ? currentEntry.position : -1;
}
@Override
public long keyPositionForSecondaryIndex()
{
return dataPosition();
}
public TrieIndexEntry entry()
{
return currentEntry;
}
@Override
public boolean advance() throws IOException
{
currentKey = nextKey;
currentEntry = nextEntry;
if (currentKey != null)
{
readNext();
// if nextKey is null, then currentKey is the last key to be published, therefore check against any limit
// and suppress the partition if it is beyond the limit
if (nextKey == null && limit != null && currentKey.compareTo(limit) > exclusiveLimit)
{ // exclude last partition outside range
currentKey = null;
currentEntry = null;
return false;
}
return true;
}
return false;
}
private void readNext() throws IOException
{
long pos = nextIndexPos();
if (pos != PartitionIndex.NOT_FOUND)
{
if (pos >= 0)
{
seekIndexInput(pos);
nextKey = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(indexInput));
nextEntry = TrieIndexEntry.deserialize(indexInput, indexInput.getFilePointer());
}
else
{
pos = ~pos;
seekDataInput(pos);
nextKey = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dataInput));
nextEntry = new TrieIndexEntry(pos);
}
}
else
{
nextKey = null;
nextEntry = null;
}
}
private void seekIndexInput(long pos) throws IOException
{
if (indexInput == null)
indexInput = rowIndexFile.createReader(pos);
else
indexInput.seek(pos);
}
private void seekDataInput(long pos) throws IOException
{
if (dataInput == null)
dataInput = dataFile.createReader(pos);
else
dataInput.seek(pos);
}
@Override
public boolean isExhausted()
{
return currentKey == null;
}
@Override
public void reset()
{
go(root);
}
@Override
public String toString()
{
return String.format("BTI-PartitionIterator(%s)", partitionIndex.getFileHandle().path());
}
}