/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.cassandra.io.sstable.format.bti; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.db.PartitionPosition; |
| import org.apache.cassandra.dht.IPartitioner; |
| import org.apache.cassandra.io.sstable.KeyReader; |
| import org.apache.cassandra.io.util.FileDataInput; |
| import org.apache.cassandra.io.util.FileHandle; |
| import org.apache.cassandra.utils.ByteBufferUtil; |
| import org.apache.cassandra.utils.Throwables; |
| |
| import static org.apache.cassandra.utils.FBUtilities.immutableListWithFilteredNulls; |
| |
| /** |
| * Partition iterator for the BTI format. |
| * <p> |
| * As the index stores prefixes of keys, the slice returned by the underlying {@link PartitionIndex.IndexPosIterator} |
| * may start and end with entries that have the same prefix as the provided bounds, but be in the wrong relationship |
| * with them. To filter these out, we start by checking the first item during initialization, and by working one item |
| * ahead, so that we can recognize the end of the slice and check the last item before we return it. |
| */ |
| class PartitionIterator extends PartitionIndex.IndexPosIterator implements KeyReader |
| { |
| private final PartitionIndex partitionIndex; |
| private final IPartitioner partitioner; |
| private final PartitionPosition limit; |
| private final int exclusiveLimit; |
| private final FileHandle dataFile; |
| private final FileHandle rowIndexFile; |
| |
| private FileDataInput dataInput; |
| private FileDataInput indexInput; |
| |
| private DecoratedKey currentKey; |
| private TrieIndexEntry currentEntry; |
| private DecoratedKey nextKey; |
| private TrieIndexEntry nextEntry; |
| |
| @SuppressWarnings({ "resource", "RedundantSuppression" }) |
| static PartitionIterator create(PartitionIndex partitionIndex, IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile, |
| PartitionPosition left, int inclusiveLeft, PartitionPosition right, int exclusiveRight) throws IOException |
| { |
| PartitionIterator partitionIterator = null; |
| PartitionIndex partitionIndexCopy = null; |
| FileHandle dataFileCopy = null; |
| FileHandle rowIndexFileCopy = null; |
| |
| try |
| { |
| partitionIndexCopy = partitionIndex.sharedCopy(); |
| dataFileCopy = dataFile.sharedCopy(); |
| rowIndexFileCopy = rowIndexFile.sharedCopy(); |
| |
| partitionIterator = new PartitionIterator(partitionIndexCopy, partitioner, rowIndexFileCopy, dataFileCopy, left, right, exclusiveRight); |
| |
| partitionIterator.readNext(); |
| // Because the index stores prefixes, the first value can be in any relationship with the left bound. |
| if (partitionIterator.nextKey != null && !(partitionIterator.nextKey.compareTo(left) > inclusiveLeft)) |
| { |
| partitionIterator.readNext(); |
| } |
| partitionIterator.advance(); |
| return partitionIterator; |
| } |
| catch (IOException | RuntimeException ex) |
| { |
| if (partitionIterator != null) |
| { |
| partitionIterator.close(); |
| } |
| else |
| { |
| Throwables.closeNonNullAndAddSuppressed(ex, rowIndexFileCopy, dataFileCopy, partitionIndexCopy); |
| } |
| throw ex; |
| } |
| } |
| |
| static PartitionIterator create(PartitionIndex partitionIndex, IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile) throws IOException |
| { |
| return create(partitionIndex, partitioner, rowIndexFile, dataFile, partitionIndex.firstKey(), -1, partitionIndex.lastKey(), 0); |
| } |
| |
| static PartitionIterator empty(PartitionIndex partitionIndex) |
| { |
| return new PartitionIterator(partitionIndex.sharedCopy()); |
| } |
| |
| private PartitionIterator(PartitionIndex partitionIndex, IPartitioner partitioner, FileHandle rowIndexFile, FileHandle dataFile, |
| PartitionPosition left, PartitionPosition right, int exclusiveRight) |
| { |
| super(partitionIndex, left, right); |
| this.partitionIndex = partitionIndex; |
| this.partitioner = partitioner; |
| this.limit = right; |
| this.exclusiveLimit = exclusiveRight; |
| this.rowIndexFile = rowIndexFile; |
| this.dataFile = dataFile; |
| } |
| |
| private PartitionIterator(PartitionIndex partitionIndex) |
| { |
| super(partitionIndex, partitionIndex.firstKey(), partitionIndex.firstKey()); |
| this.partitionIndex = partitionIndex; |
| this.partitioner = null; |
| this.limit = partitionIndex.firstKey(); |
| this.exclusiveLimit = -1; |
| this.rowIndexFile = null; |
| this.dataFile = null; |
| |
| this.currentEntry = null; |
| this.currentKey = null; |
| this.nextEntry = null; |
| this.nextKey = null; |
| } |
| |
| @Override |
| public void close() |
| { |
| Throwable accum = null; |
| accum = Throwables.close(accum, immutableListWithFilteredNulls(partitionIndex, dataFile, rowIndexFile)); |
| accum = Throwables.close(accum, immutableListWithFilteredNulls(dataInput, indexInput)); |
| accum = Throwables.perform(accum, super::close); |
| Throwables.maybeFail(accum); |
| } |
| |
| public DecoratedKey decoratedKey() |
| { |
| return currentKey; |
| } |
| |
| public ByteBuffer key() |
| { |
| return currentKey.getKey(); |
| } |
| |
| @Override |
| public long dataPosition() |
| { |
| return currentEntry != null ? currentEntry.position : -1; |
| } |
| |
| @Override |
| public long keyPositionForSecondaryIndex() |
| { |
| return dataPosition(); |
| } |
| |
| public TrieIndexEntry entry() |
| { |
| return currentEntry; |
| } |
| |
| @Override |
| public boolean advance() throws IOException |
| { |
| currentKey = nextKey; |
| currentEntry = nextEntry; |
| if (currentKey != null) |
| { |
| readNext(); |
| // if nextKey is null, then currentKey is the last key to be published, therefore check against any limit |
| // and suppress the partition if it is beyond the limit |
| if (nextKey == null && limit != null && currentKey.compareTo(limit) > exclusiveLimit) |
| { // exclude last partition outside range |
| currentKey = null; |
| currentEntry = null; |
| return false; |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| private void readNext() throws IOException |
| { |
| long pos = nextIndexPos(); |
| if (pos != PartitionIndex.NOT_FOUND) |
| { |
| if (pos >= 0) |
| { |
| seekIndexInput(pos); |
| nextKey = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(indexInput)); |
| nextEntry = TrieIndexEntry.deserialize(indexInput, indexInput.getFilePointer()); |
| } |
| else |
| { |
| pos = ~pos; |
| seekDataInput(pos); |
| nextKey = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dataInput)); |
| nextEntry = new TrieIndexEntry(pos); |
| } |
| } |
| else |
| { |
| nextKey = null; |
| nextEntry = null; |
| } |
| } |
| |
| private void seekIndexInput(long pos) throws IOException |
| { |
| if (indexInput == null) |
| indexInput = rowIndexFile.createReader(pos); |
| else |
| indexInput.seek(pos); |
| } |
| |
| private void seekDataInput(long pos) throws IOException |
| { |
| if (dataInput == null) |
| dataInput = dataFile.createReader(pos); |
| else |
| dataInput.seek(pos); |
| } |
| |
| @Override |
| public boolean isExhausted() |
| { |
| return currentKey == null; |
| } |
| |
| @Override |
| public void reset() |
| { |
| go(root); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return String.format("BTI-PartitionIterator(%s)", partitionIndex.getFileHandle().path()); |
| } |
| } |