blob: f4bd1eaf72aa9ad9a749f83405316d271a2fdd01 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable.format.big;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.utils.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.AbstractBounds.Boundary;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import static org.apache.cassandra.dht.AbstractBounds.isEmpty;
import static org.apache.cassandra.dht.AbstractBounds.maxLeft;
import static org.apache.cassandra.dht.AbstractBounds.minRight;
public class BigTableScanner implements ISSTableScanner
{
private final AtomicBoolean isClosed = new AtomicBoolean(false);
protected final RandomAccessReader dfile;
protected final RandomAccessReader ifile;
public final SSTableReader sstable;
private final Iterator<AbstractBounds<PartitionPosition>> rangeIterator;
private AbstractBounds<PartitionPosition> currentRange;
private final ColumnFilter columns;
private final DataRange dataRange;
private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
private final boolean isForThrift;
private final SSTableReadsListener listener;
private long startScan = -1;
private long bytesScanned = 0;
protected Iterator<UnfilteredRowIterator> iterator;
// Full scan of the sstables
public static ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter)
{
return new BigTableScanner(sstable,
ColumnFilter.all(sstable.metadata),
limiter,
Iterators.singletonIterator(fullRange(sstable)));
}
public static ISSTableScanner getScanner(SSTableReader sstable,
ColumnFilter columns,
DataRange dataRange,
RateLimiter limiter,
boolean isForThrift,
SSTableReadsListener listener)
{
return new BigTableScanner(sstable, columns, dataRange, limiter, isForThrift, makeBounds(sstable, dataRange).iterator(), listener);
}
public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
{
// We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(tokenRanges);
if (positions.isEmpty())
return new EmptySSTableScanner(sstable);
return new BigTableScanner(sstable,
ColumnFilter.all(sstable.metadata),
limiter,
makeBounds(sstable, tokenRanges).iterator());
}
public static ISSTableScanner getScanner(SSTableReader sstable, Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
{
return new BigTableScanner(sstable, ColumnFilter.all(sstable.metadata), null, rangeIterator);
}
private BigTableScanner(SSTableReader sstable,
ColumnFilter columns,
RateLimiter limiter,
Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
{
this(sstable, columns, null, limiter, false, rangeIterator, SSTableReadsListener.NOOP_LISTENER);
}
private BigTableScanner(SSTableReader sstable,
ColumnFilter columns,
DataRange dataRange,
RateLimiter limiter,
boolean isForThrift,
Iterator<AbstractBounds<PartitionPosition>> rangeIterator,
SSTableReadsListener listener)
{
assert sstable != null;
this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
this.ifile = sstable.openIndexReader();
this.sstable = sstable;
this.columns = columns;
this.dataRange = dataRange;
this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
sstable.descriptor.version,
sstable.header);
this.isForThrift = isForThrift;
this.rangeIterator = rangeIterator;
this.listener = listener;
}
private static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
{
List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(tokenRanges.size());
for (Range<Token> range : Range.normalize(tokenRanges))
addRange(sstable, Range.makeRowRange(range), boundsList);
return boundsList;
}
private static List<AbstractBounds<PartitionPosition>> makeBounds(SSTableReader sstable, DataRange dataRange)
{
List<AbstractBounds<PartitionPosition>> boundsList = new ArrayList<>(2);
addRange(sstable, dataRange.keyRange(), boundsList);
return boundsList;
}
private static AbstractBounds<PartitionPosition> fullRange(SSTableReader sstable)
{
return new Bounds<PartitionPosition>(sstable.first, sstable.last);
}
private static void addRange(SSTableReader sstable, AbstractBounds<PartitionPosition> requested, List<AbstractBounds<PartitionPosition>> boundsList)
{
if (requested instanceof Range && ((Range)requested).isWrapAround())
{
if (requested.right.compareTo(sstable.first) >= 0)
{
// since we wrap, we must contain the whole sstable prior to stopKey()
Boundary<PartitionPosition> left = new Boundary<PartitionPosition>(sstable.first, true);
Boundary<PartitionPosition> right;
right = requested.rightBoundary();
right = minRight(right, sstable.last, true);
if (!isEmpty(left, right))
boundsList.add(AbstractBounds.bounds(left, right));
}
if (requested.left.compareTo(sstable.last) <= 0)
{
// since we wrap, we must contain the whole sstable after dataRange.startKey()
Boundary<PartitionPosition> right = new Boundary<PartitionPosition>(sstable.last, true);
Boundary<PartitionPosition> left;
left = requested.leftBoundary();
left = maxLeft(left, sstable.first, true);
if (!isEmpty(left, right))
boundsList.add(AbstractBounds.bounds(left, right));
}
}
else
{
assert requested.left.compareTo(requested.right) <= 0 || requested.right.isMinimum();
Boundary<PartitionPosition> left, right;
left = requested.leftBoundary();
right = requested.rightBoundary();
left = maxLeft(left, sstable.first, true);
// apparently isWrapAround() doesn't count Bounds that extend to the limit (min) as wrapping
right = requested.right.isMinimum() ? new Boundary<PartitionPosition>(sstable.last, true)
: minRight(right, sstable.last, true);
if (!isEmpty(left, right))
boundsList.add(AbstractBounds.bounds(left, right));
}
}
private void seekToCurrentRangeStart()
{
long indexPosition = sstable.getIndexScanPosition(currentRange.left);
ifile.seek(indexPosition);
try
{
while (!ifile.isEOF())
{
indexPosition = ifile.getFilePointer();
DecoratedKey indexDecoratedKey = sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
if (indexDecoratedKey.compareTo(currentRange.left) > 0 || currentRange.contains(indexDecoratedKey))
{
// Found, just read the dataPosition and seek into index and data files
long dataPosition = RowIndexEntry.Serializer.readPosition(ifile, sstable.descriptor.version);
ifile.seek(indexPosition);
dfile.seek(dataPosition);
break;
}
else
{
RowIndexEntry.Serializer.skip(ifile, sstable.descriptor.version);
}
}
}
catch (IOException e)
{
sstable.markSuspect();
throw new CorruptSSTableException(e, sstable.getFilename());
}
}
public void close()
{
try
{
if (isClosed.compareAndSet(false, true))
FileUtils.close(dfile, ifile);
}
catch (IOException e)
{
sstable.markSuspect();
throw new CorruptSSTableException(e, sstable.getFilename());
}
}
public long getLengthInBytes()
{
return dfile.length();
}
public long getCurrentPosition()
{
return dfile.getFilePointer();
}
public long getBytesScanned()
{
return bytesScanned;
}
public long getCompressedLengthInBytes()
{
return sstable.onDiskLength();
}
public String getBackingFiles()
{
return sstable.toString();
}
public boolean isForThrift()
{
return isForThrift;
}
public CFMetaData metadata()
{
return sstable.metadata;
}
public boolean hasNext()
{
if (iterator == null)
iterator = createIterator();
return iterator.hasNext();
}
public UnfilteredRowIterator next()
{
if (iterator == null)
iterator = createIterator();
return iterator.next();
}
public void remove()
{
throw new UnsupportedOperationException();
}
private Iterator<UnfilteredRowIterator> createIterator()
{
listener.onScanningStarted(sstable);
return new KeyScanningIterator();
}
protected class KeyScanningIterator extends AbstractIterator<UnfilteredRowIterator>
{
private DecoratedKey nextKey;
private RowIndexEntry nextEntry;
private DecoratedKey currentKey;
private RowIndexEntry currentEntry;
protected UnfilteredRowIterator computeNext()
{
try
{
if (nextEntry == null)
{
do
{
if (startScan != -1)
bytesScanned += dfile.getFilePointer() - startScan;
// we're starting the first range or we just passed the end of the previous range
if (!rangeIterator.hasNext())
return endOfData();
currentRange = rangeIterator.next();
seekToCurrentRangeStart();
startScan = dfile.getFilePointer();
if (ifile.isEOF())
return endOfData();
currentKey = sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
currentEntry = rowIndexEntrySerializer.deserialize(ifile, ifile.getFilePointer());
} while (!currentRange.contains(currentKey));
}
else
{
// we're in the middle of a range
currentKey = nextKey;
currentEntry = nextEntry;
}
if (ifile.isEOF())
{
nextEntry = null;
nextKey = null;
}
else
{
// we need the position of the start of the next key, regardless of whether it falls in the current range
nextKey = sstable.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
nextEntry = rowIndexEntrySerializer.deserialize(ifile, ifile.getFilePointer());
if (!currentRange.contains(nextKey))
{
nextKey = null;
nextEntry = null;
}
}
/*
* For a given partition key, we want to avoid hitting the data
* file unless we're explicitely asked to. This is important
* for PartitionRangeReadCommand#checkCacheFilter.
*/
return new LazilyInitializedUnfilteredRowIterator(currentKey)
{
protected UnfilteredRowIterator initializeIterator()
{
if (startScan != -1)
bytesScanned += dfile.getFilePointer() - startScan;
try
{
if (dataRange == null)
{
dfile.seek(currentEntry.position);
startScan = dfile.getFilePointer();
ByteBufferUtil.skipShortLength(dfile); // key
return SSTableIdentityIterator.create(sstable, dfile, partitionKey());
}
else
{
startScan = dfile.getFilePointer();
}
ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(partitionKey());
return sstable.iterator(dfile, partitionKey(), currentEntry, filter.getSlices(BigTableScanner.this.metadata()), columns, filter.isReversed(), isForThrift);
}
catch (CorruptSSTableException | IOException e)
{
sstable.markSuspect();
throw new CorruptSSTableException(e, sstable.getFilename());
}
}
};
}
catch (CorruptSSTableException | IOException e)
{
sstable.markSuspect();
throw new CorruptSSTableException(e, sstable.getFilename());
}
}
}
@Override
public String toString()
{
return getClass().getSimpleName() + "(" +
"dfile=" + dfile +
" ifile=" + ifile +
" sstable=" + sstable +
")";
}
public static class EmptySSTableScanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner
{
private final SSTableReader sstable;
public EmptySSTableScanner(SSTableReader sstable)
{
this.sstable = sstable;
}
public long getLengthInBytes()
{
return 0;
}
public long getCurrentPosition()
{
return 0;
}
public long getBytesScanned()
{
return 0;
}
public long getCompressedLengthInBytes()
{
return 0;
}
public String getBackingFiles()
{
return sstable.getFilename();
}
public boolean isForThrift()
{
return false;
}
public CFMetaData metadata()
{
return sstable.metadata;
}
public boolean hasNext()
{
return false;
}
public UnfilteredRowIterator next()
{
return null;
}
}
}