/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cassandra.spark.reader;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.bridge.TokenRange;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.UnfilteredDeserializer;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.IndexSummary;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.DroppedColumn;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.spark.reader.common.RawInputStream;
import org.apache.cassandra.spark.reader.common.SSTableStreamException;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.spark.stats.Stats;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
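
/**
 * Reads a single Cassandra SSTable on a Spark worker, optionally restricted by a Spark worker token
 * range, partition key filters and a prune-column filter, and exposes it as an {@link ISSTableScanner}.
 *
 * <p>A minimal usage sketch (the {@code metadata}, {@code ssTable} and {@code rangeFilter} variables
 * below are hypothetical placeholders, not defined in this file):
 * <pre>{@code
 * SSTableReader reader = SSTableReader.builder(metadata, ssTable)
 *                                     .withSparkRangeFilter(rangeFilter)
 *                                     .withStats(Stats.DoNothingStats.INSTANCE)
 *                                     .build();
 * if (!reader.ignore())
 * {
 *     try (ISSTableScanner scanner = reader.scanner())
 *     {
 *         while (scanner.hasNext())
 *         {
 *             UnfilteredRowIterator partition = scanner.next();
 *             // consume partition rows...
 *         }
 *     }
 * }
 * }</pre>
 */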
@SuppressWarnings("unused")
public class SSTableReader implements SparkSSTableReader, Scannable
{
private static final Logger LOGGER = LoggerFactory.getLogger(SSTableReader.class);
private final TableMetadata metadata;
@NotNull
private final SSTable ssTable;
private final StatsMetadata statsMetadata;
@NotNull
private final Version version;
@NotNull
private final DecoratedKey first;
@NotNull
private final DecoratedKey last;
@NotNull
private final BigInteger firstToken;
@NotNull
private final BigInteger lastToken;
private final SerializationHeader header;
private final DeserializationHelper helper;
@NotNull
private final AtomicReference<SSTableStreamReader> reader = new AtomicReference<>(null);
@Nullable
private final SparkRangeFilter sparkRangeFilter;
@NotNull
private final List<PartitionKeyFilter> partitionKeyFilters;
@NotNull
private final Stats stats;
@Nullable
private Long startOffset = null;
private Long openedNanos = null;
@NotNull
private final Function<StatsMetadata, Boolean> isRepaired;
public static class Builder
{
@NotNull
final TableMetadata metadata;
@NotNull
final SSTable ssTable;
@Nullable
PruneColumnFilter columnFilter = null;
boolean readIndexOffset = true;
@NotNull
Stats stats = Stats.DoNothingStats.INSTANCE;
boolean useIncrementalRepair = true;
boolean isRepairPrimary = false;
Function<StatsMetadata, Boolean> isRepaired = stats -> stats.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
@Nullable
SparkRangeFilter sparkRangeFilter = null;
@NotNull
final List<PartitionKeyFilter> partitionKeyFilters = new ArrayList<>();
Builder(@NotNull TableMetadata metadata, @NotNull SSTable ssTable)
{
this.metadata = metadata;
this.ssTable = ssTable;
}
public Builder withSparkRangeFilter(@Nullable SparkRangeFilter sparkRangeFilter)
{
this.sparkRangeFilter = sparkRangeFilter;
return this;
}
public Builder withPartitionKeyFilters(@NotNull Collection<PartitionKeyFilter> partitionKeyFilters)
{
this.partitionKeyFilters.addAll(partitionKeyFilters);
return this;
}
public Builder withPartitionKeyFilter(@NotNull PartitionKeyFilter partitionKeyFilter)
{
partitionKeyFilters.add(partitionKeyFilter);
return this;
}
public Builder withColumnFilter(@Nullable PruneColumnFilter columnFilter)
{
this.columnFilter = columnFilter;
return this;
}
public Builder withReadIndexOffset(boolean readIndexOffset)
{
this.readIndexOffset = readIndexOffset;
return this;
}
public Builder withStats(@NotNull Stats stats)
{
this.stats = stats;
return this;
}
public Builder useIncrementalRepair(boolean useIncrementalRepair)
{
this.useIncrementalRepair = useIncrementalRepair;
return this;
}
public Builder isRepairPrimary(boolean isRepairPrimary)
{
this.isRepairPrimary = isRepairPrimary;
return this;
}
public Builder withIsRepairedFunction(Function<StatsMetadata, Boolean> isRepaired)
{
this.isRepaired = isRepaired;
return this;
}
public SSTableReader build() throws IOException
{
return new SSTableReader(metadata,
ssTable,
sparkRangeFilter,
partitionKeyFilters,
columnFilter,
readIndexOffset,
stats,
useIncrementalRepair,
isRepairPrimary,
isRepaired);
}
}
public static Builder builder(@NotNull TableMetadata metadata, @NotNull SSTable ssTable)
{
return new Builder(metadata, ssTable);
}
// CHECKSTYLE IGNORE: Constructor with many parameters
public SSTableReader(@NotNull TableMetadata metadata,
@NotNull SSTable ssTable,
@Nullable SparkRangeFilter sparkRangeFilter,
@NotNull List<PartitionKeyFilter> partitionKeyFilters,
@Nullable PruneColumnFilter columnFilter,
boolean readIndexOffset,
@NotNull Stats stats,
boolean useIncrementalRepair,
boolean isRepairPrimary,
@NotNull Function<StatsMetadata, Boolean> isRepaired) throws IOException
{
long startTimeNanos = System.nanoTime();
long now;
this.ssTable = ssTable;
this.stats = stats;
this.isRepaired = isRepaired;
this.sparkRangeFilter = sparkRangeFilter;
File file = constructFilename(metadata.keyspace, metadata.name, ssTable.getDataFileName());
Descriptor descriptor = Descriptor.fromFilename(file);
this.version = descriptor.version;
SummaryDbUtils.Summary summary = null;
Pair<DecoratedKey, DecoratedKey> keys = Pair.create(null, null);
try
{
now = System.nanoTime();
summary = SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
stats.readSummaryDb(ssTable, System.nanoTime() - now);
keys = Pair.create(summary.first(), summary.last());
}
catch (IOException exception)
{
LOGGER.warn("Failed to read Summary.db file ssTable='{}'", ssTable, exception);
}
if (keys.left == null || keys.right == null)
{
LOGGER.warn("Could not load first and last key from Summary.db file, so attempting Index.db fileName={}",
ssTable.getDataFileName());
now = System.nanoTime();
keys = SSTableCache.INSTANCE.keysFromIndex(metadata, ssTable);
stats.readIndexDb(ssTable, System.nanoTime() - now);
}
if (keys.left == null || keys.right == null)
{
throw new IOException("Could not load SSTable first or last tokens");
}
this.first = keys.left;
this.last = keys.right;
this.firstToken = ReaderUtils.tokenToBigInteger(first.getToken());
this.lastToken = ReaderUtils.tokenToBigInteger(last.getToken());
TokenRange readerRange = range();
List<PartitionKeyFilter> matchingKeyFilters = partitionKeyFilters.stream()
.filter(filter -> readerRange.contains(filter.token()))
.collect(Collectors.toList());
boolean overlapsSparkRange = sparkRangeFilter == null || SparkSSTableReader.overlaps(this, sparkRangeFilter.tokenRange());
if (!overlapsSparkRange // SSTable doesn't overlap with Spark worker token range
|| (matchingKeyFilters.isEmpty() && !partitionKeyFilters.isEmpty())) // No matching partition key filters overlap with SSTable
{
this.partitionKeyFilters = Collections.emptyList();
stats.skippedSSTable(sparkRangeFilter, partitionKeyFilters, firstToken, lastToken);
LOGGER.info("Ignoring SSTableReader with firstToken={} lastToken={}, does not overlap with any filter",
firstToken, lastToken);
statsMetadata = null;
header = null;
helper = null;
this.metadata = null;
return;
}
if (!matchingKeyFilters.isEmpty())
{
List<PartitionKeyFilter> matchInBloomFilter =
ReaderUtils.filterKeyInBloomFilter(ssTable, metadata.partitioner, descriptor, matchingKeyFilters);
this.partitionKeyFilters = ImmutableList.copyOf(matchInBloomFilter);
// Check if required keys are actually present
if (matchInBloomFilter.isEmpty() || !ReaderUtils.anyFilterKeyInIndex(ssTable, matchInBloomFilter))
{
if (matchInBloomFilter.isEmpty())
{
stats.missingInBloomFilter();
}
else
{
stats.missingInIndex();
}
LOGGER.info("Ignoring SSTable {}, no match found in index file for key filters",
this.ssTable.getDataFileName());
statsMetadata = null;
header = null;
helper = null;
this.metadata = null;
return;
}
}
else
{
this.partitionKeyFilters = ImmutableList.copyOf(partitionKeyFilters);
}
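        // Load the SSTable metadata components (validation, stats and serialization header) via the SSTableCache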
Map<MetadataType, MetadataComponent> componentMap = SSTableCache.INSTANCE.componentMapFromStats(ssTable, descriptor);
ValidationMetadata validation = (ValidationMetadata) componentMap.get(MetadataType.VALIDATION);
if (validation != null && !validation.partitioner.equals(metadata.partitioner.getClass().getName()))
{
throw new IllegalStateException("Partitioner in ValidationMetadata does not match TableMetaData: "
+ validation.partitioner + " vs. " + metadata.partitioner.getClass().getName());
}
this.statsMetadata = (StatsMetadata) componentMap.get(MetadataType.STATS);
SerializationHeader.Component headerComp = (SerializationHeader.Component) componentMap.get(MetadataType.HEADER);
if (headerComp == null)
{
throw new IOException("Cannot read SSTable if cannot deserialize stats header info");
}
if (useIncrementalRepair && !isRepairPrimary && isRepaired())
{
stats.skippedRepairedSSTable(ssTable, statsMetadata.repairedAt);
LOGGER.info("Ignoring repaired SSTable on non-primary repair replica ssTable='{}' repairedAt={}",
ssTable, statsMetadata.repairedAt);
header = null;
helper = null;
this.metadata = null;
return;
}
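        // Detect columns present in the serialization header but absent from the current schema; they are
        // treated as dropped columns so the table metadata can be rebuilt to resolve the header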
Set<String> columnNames = Streams.concat(metadata.columns().stream(),
metadata.staticColumns().stream())
.map(column -> column.name.toString())
.collect(Collectors.toSet());
Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
droppedColumns.putAll(buildDroppedColumns(metadata.keyspace,
metadata.name,
ssTable,
headerComp.getRegularColumns(),
columnNames,
ColumnMetadata.Kind.REGULAR));
droppedColumns.putAll(buildDroppedColumns(metadata.keyspace,
metadata.name,
ssTable,
headerComp.getStaticColumns(),
columnNames,
ColumnMetadata.Kind.STATIC));
if (!droppedColumns.isEmpty())
{
LOGGER.info("Rebuilding table metadata with dropped columns numDroppedColumns={} ssTable='{}'",
droppedColumns.size(), ssTable);
metadata = metadata.unbuild().droppedColumns(droppedColumns).build();
}
this.header = headerComp.toHeader(metadata);
this.helper = new DeserializationHelper(metadata,
MessagingService.VERSION_30,
DeserializationHelper.Flag.FROM_REMOTE,
buildColumnFilter(metadata, columnFilter));
this.metadata = metadata;
if (readIndexOffset && summary != null)
{
SummaryDbUtils.Summary finalSummary = summary;
extractRange(sparkRangeFilter, partitionKeyFilters)
.ifPresent(range -> readOffsets(finalSummary.summary(), range));
}
else
{
LOGGER.warn("Reading SSTable without looking up start/end offset, performance will potentially be degraded");
}
        // Open the SSTableStreamReader now so SSTables are opened in parallel inside the thread pool
        // and are buffered and ready to go by the time the CompactionIterator starts reading
reader.set(new SSTableStreamReader());
stats.openedSSTable(ssTable, System.nanoTime() - startTimeNanos);
this.openedNanos = System.nanoTime();
}
/**
     * Constructs the full file path for a given combination of keyspace, table, and data file name,
     * adjusting for data files with non-standard names that are prefixed with the keyspace and table.
*
* @param keyspace Name of the keyspace
* @param table Name of the table
* @param filename Name of the data file
* @return A full file path, adjusted for non-standard file names
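     *
     * <p>For illustration (hypothetical names): with keyspace {@code ks}, table {@code tbl} and data file
     * name {@code ks-tbl-nb-1-big-Data.db}, the keyspace/table prefix is stripped and the returned path is
     * {@code ./ks/tbl/nb-1-big-Data.db}; a standard name such as {@code nb-1-big-Data.db} is returned under
     * the same {@code ./ks/tbl} directory unchanged.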
*/
@VisibleForTesting
@NotNull
static File constructFilename(@NotNull String keyspace, @NotNull String table, @NotNull String filename)
{
String[] components = filename.split("-");
if (components.length == 6
&& components[0].equals(keyspace)
&& components[1].equals(table))
{
filename = filename.substring(keyspace.length() + table.length() + 2);
}
return new File(String.format("./%s/%s", keyspace, table), filename);
}
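    /**
     * Identifies columns that appear in the SSTable serialization header but are missing from the current
     * schema, returning them as {@link DroppedColumn} entries (with a synthetic drop time of roughly one
     * hour ago) so the table metadata can be rebuilt and the serialization header resolved against it.
     */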
private static Map<ByteBuffer, DroppedColumn> buildDroppedColumns(String keyspace,
String table,
SSTable ssTable,
Map<ByteBuffer, AbstractType<?>> columns,
Set<String> columnNames,
ColumnMetadata.Kind kind)
{
Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
for (Map.Entry<ByteBuffer, AbstractType<?>> entry : columns.entrySet())
{
            String colName = UTF8Type.instance.getString(entry.getKey());
if (!columnNames.contains(colName))
{
AbstractType<?> type = entry.getValue();
LOGGER.warn("Dropped column found colName={} sstable='{}'", colName, ssTable);
ColumnMetadata column = new ColumnMetadata(keyspace,
table,
ColumnIdentifier.getInterned(colName, true),
type,
ColumnMetadata.NO_POSITION,
kind);
long droppedTime = TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())
- TimeUnit.MINUTES.toMicros(60);
droppedColumns.put(entry.getKey(), new DroppedColumn(column, droppedTime));
}
}
return droppedColumns;
}
/**
     * Merges all the partition key filters to give the token range we care about.
     * If there are no partition key filters, falls back to the Spark worker token range.
*
* @param sparkRangeFilter optional spark range filter
* @param partitionKeyFilters list of partition key filters
* @return the token range we care about for this Spark worker
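     *
     * <p>For illustration (assuming {@code TokenRange::merge} yields the smallest range covering both
     * inputs): filters covering tokens [10, 20] and [30, 40] reduce to [10, 40]; with no partition key
     * filters the Spark worker range is returned, and with neither an empty Optional is returned.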
*/
public static Optional<TokenRange> extractRange(@Nullable SparkRangeFilter sparkRangeFilter,
@NotNull List<PartitionKeyFilter> partitionKeyFilters)
{
Optional<TokenRange> partitionKeyRange = partitionKeyFilters.stream()
.map(PartitionKeyFilter::tokenRange)
.reduce(TokenRange::merge);
return partitionKeyRange.isPresent()
? partitionKeyRange
: Optional.ofNullable(sparkRangeFilter != null ? sparkRangeFilter.tokenRange() : null);
}
/**
     * Reads the Data.db start offset by using the Summary.db index summary to binary search into Index.db,
     * then reading the corresponding offset from Index.db.
*
* @param indexSummary Summary.db index summary
* @param range token range we care about for this Spark worker
*/
private void readOffsets(IndexSummary indexSummary, TokenRange range)
{
try
{
            // If startOffset is null we failed to find an overlapping token in the Index.db file.
            // This is unlikely, because we already pre-filter the SSTable by its start-end token range,
            // but in that situation we fall back to reading the entire Data.db file to be safe, even though it hurts performance.
startOffset = IndexDbUtils.findDataDbOffset(indexSummary, range, metadata.partitioner, ssTable, stats);
if (startOffset == null)
{
LOGGER.error("Failed to find Data.db start offset, performance will be degraded sstable='{}'", ssTable);
}
}
catch (IOException exception)
{
LOGGER.warn("IOException finding SSTable offsets, cannot skip directly to start offset in Data.db. "
+ "Performance will be degraded.", exception);
}
}
/**
* Build a ColumnFilter if we need to prune any columns for more efficient deserialization of the SSTable
*
* @param metadata TableMetadata object
* @param columnFilter prune column filter
     * @return a ColumnFilter when at least one column can be pruned while deserializing the SSTable,
     *         otherwise null
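     *
     * <p>For illustration (hypothetical schema): with columns {@code a}, {@code b} and {@code c} and a
     * {@link PruneColumnFilter} that only includes {@code a}, the returned filter reads just {@code a};
     * if the filter includes all three columns, {@code null} is returned and rows are deserialized in full.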
*/
@Nullable
private static ColumnFilter buildColumnFilter(TableMetadata metadata, @Nullable PruneColumnFilter columnFilter)
{
if (columnFilter == null)
{
return null;
}
List<ColumnMetadata> include = metadata.columns().stream()
.filter(column -> columnFilter.includeColumn(column.name.toString()))
.collect(Collectors.toList());
if (include.size() == metadata.columns().size())
{
return null; // No columns pruned
}
return ColumnFilter.allRegularColumnsBuilder(metadata, false)
.addAll(include)
.build();
}
public SSTable sstable()
{
return ssTable;
}
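    /**
     * @return true when no stream reader is available, either because the constructor skipped this SSTable
     *         (no overlap with the Spark token range or partition key filters, no key match in the bloom
     *         filter or index, or a repaired SSTable on a non-primary repair replica) or because
     *         {@link #scanner()} has already consumed it
     */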
public boolean ignore()
{
return reader.get() == null;
}
@Override
public int hashCode()
{
return Objects.hash(metadata.keyspace, metadata.name, ssTable);
}
@Override
public boolean equals(Object other)
{
return other instanceof SSTableReader
&& this.metadata.keyspace.equals(((SSTableReader) other).metadata.keyspace)
&& this.metadata.name.equals(((SSTableReader) other).metadata.name)
&& this.ssTable.equals(((SSTableReader) other).ssTable);
}
public boolean isRepaired()
{
return isRepaired.apply(statsMetadata);
}
public DecoratedKey first()
{
return first;
}
public DecoratedKey last()
{
return last;
}
public long getMinTimestamp()
{
return statsMetadata.minTimestamp;
}
public long getMaxTimestamp()
{
return statsMetadata.maxTimestamp;
}
public StatsMetadata getSSTableMetadata()
{
return statsMetadata;
}
@Override
public ISSTableScanner scanner()
{
ISSTableScanner result = reader.getAndSet(null);
if (result == null)
{
throw new IllegalStateException("SSTableStreamReader cannot be re-used");
}
return result;
}
@Override
@NotNull
public BigInteger firstToken()
{
return firstToken;
}
@Override
@NotNull
public BigInteger lastToken()
{
return lastToken;
}
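    /**
     * Streams partitions sequentially from the Data.db file (decompressed when a compression info
     * component is present), skipping partitions that fall outside the Spark worker token range or do not
     * match the partition key filters and, when a Data.db start offset was found, stopping early once
     * partition tokens pass the upper bound of the Spark range.
     */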
public class SSTableStreamReader implements ISSTableScanner
{
private final DataInputStream dis;
private final DataInputPlus in;
final RawInputStream dataStream;
private DecoratedKey key;
private DeletionTime partitionLevelDeletion;
private SSTableSimpleIterator iterator;
private Row staticRow;
@Nullable
private final BigInteger lastToken;
private long lastTimeNanos = System.nanoTime();
SSTableStreamReader() throws IOException
{
lastToken = sparkRangeFilter != null ? sparkRangeFilter.tokenRange().upperEndpoint() : null;
try (@Nullable InputStream compressionInfoInputStream = ssTable.openCompressionStream())
{
DataInputStream dataInputStream = new DataInputStream(ssTable.openDataStream());
if (compressionInfoInputStream != null)
{
dataStream = CompressedRawInputStream.fromInputStream(ssTable,
dataInputStream,
compressionInfoInputStream,
version.hasMaxCompressedLength(),
stats);
}
else
{
dataStream = new RawInputStream(dataInputStream, new byte[64 * 1024], stats);
}
}
dis = new DataInputStream(dataStream);
if (startOffset != null)
{
// Skip to start offset, if known, of first in-range partition
ByteBufferUtils.skipFully(dis, startOffset);
assert dataStream.position() == startOffset;
LOGGER.info("Using Data.db start offset to skip ahead startOffset={} sstable='{}'",
startOffset, ssTable);
stats.skippedDataDbStartOffset(startOffset);
}
in = new DataInputPlus.DataInputStreamPlus(dis);
}
@Override
public TableMetadata metadata()
{
return metadata;
}
public boolean overlapsSparkTokenRange(BigInteger token)
{
return sparkRangeFilter == null || sparkRangeFilter.overlaps(token);
}
public boolean overlapsPartitionFilters(DecoratedKey key)
{
return partitionKeyFilters.isEmpty()
|| partitionKeyFilters.stream().anyMatch(filter -> filter.matches(key.getKey()));
}
public boolean overlaps(DecoratedKey key, BigInteger token)
{
return overlapsSparkTokenRange(token) && overlapsPartitionFilters(key);
}
@Override
public boolean hasNext()
{
try
{
while (true)
{
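                    // Read the next partition header: partition key, partition-level deletion and static row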
key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
iterator = SSTableSimpleIterator.create(metadata, in, header, helper, partitionLevelDeletion);
staticRow = iterator.readStaticRow();
BigInteger token = ReaderUtils.tokenToBigInteger(key.getToken());
if (overlaps(key, token))
{
// Partition overlaps with filters
long now = System.nanoTime();
stats.nextPartition(now - lastTimeNanos);
lastTimeNanos = now;
return true;
}
if (lastToken != null && startOffset != null && lastToken.compareTo(token) < 0)
{
                        // Partition no longer overlaps the Spark token range, so we have finished reading this SSTable
stats.skippedDataDbEndOffset(dataStream.position() - startOffset);
return false;
}
stats.skippedPartition(key.getKey(), ReaderUtils.tokenToBigInteger(key.getToken()));
// Skip partition efficiently without deserializing
UnfilteredDeserializer deserializer = UnfilteredDeserializer.create(metadata, in, header, helper);
while (deserializer.hasNext())
{
deserializer.skipNext();
}
}
}
catch (EOFException exception)
{
return false;
}
catch (IOException exception)
{
stats.corruptSSTable(exception, metadata.keyspace, metadata.name, ssTable);
LOGGER.warn("IOException reading sstable keyspace={} table={} dataFileName={} ssTable='{}'",
metadata.keyspace, metadata.name, ssTable.getDataFileName(), ssTable, exception);
throw new SSTableStreamException(exception);
}
catch (Throwable throwable)
{
stats.corruptSSTable(throwable, metadata.keyspace, metadata.name, ssTable);
LOGGER.error("Error reading sstable keyspace={} table={} dataFileName={} ssTable='{}'",
metadata.keyspace, metadata.name, ssTable.getDataFileName(), ssTable, throwable);
throw new RuntimeException(ThrowableUtils.rootCause(throwable));
}
}
@Override
public UnfilteredRowIterator next()
{
return new UnfilteredIterator();
}
@Override
public void close()
{
LOGGER.debug("Closing SparkSSTableReader {}", ssTable);
try
{
dis.close();
if (openedNanos != null)
{
stats.closedSSTable(System.nanoTime() - openedNanos);
}
}
catch (IOException exception)
{
LOGGER.warn("IOException closing SSTable DataInputStream", exception);
}
}
@Override
public long getLengthInBytes()
{
            // This is mostly used to report compaction info for metrics or via JMX, so we can ignore it here
return 0;
}
@Override
public long getCompressedLengthInBytes()
{
return 0;
}
@Override
public long getCurrentPosition()
{
            // This is mostly used to report compaction info for metrics or via JMX, so we can ignore it here
return 0;
}
@Override
public long getBytesScanned()
{
return 0;
}
@Override
public Set<org.apache.cassandra.io.sstable.format.SSTableReader> getBackingSSTables()
{
return Collections.emptySet();
}
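        /**
         * Exposes the rows of the current partition as an {@link UnfilteredRowIterator}, delegating to the
         * underlying {@link SSTableSimpleIterator} and re-wrapping any {@link IOException} hidden inside an
         * {@link IOError} as an {@link SSTableStreamException}.
         */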
private class UnfilteredIterator implements UnfilteredRowIterator
{
@Override
public RegularAndStaticColumns columns()
{
return metadata.regularAndStaticColumns();
}
@Override
public TableMetadata metadata()
{
return metadata;
}
@Override
public boolean isReverseOrder()
{
return false;
}
@Override
public DecoratedKey partitionKey()
{
return key;
}
@Override
public DeletionTime partitionLevelDeletion()
{
return partitionLevelDeletion;
}
@Override
public Row staticRow()
{
return staticRow;
}
@Override
public EncodingStats stats()
{
return header.stats();
}
@Override
public boolean hasNext()
{
try
{
return iterator.hasNext();
}
catch (IOError error)
{
// SSTableSimpleIterator::computeNext wraps IOException in IOError, so we catch those,
// try to extract the IOException and re-wrap it in an SSTableStreamException,
// which we can then process in TableStreamScanner
if (error.getCause() instanceof IOException)
{
throw new SSTableStreamException((IOException) error.getCause());
}
// Otherwise, just throw the IOError and deal with it further up the stack
throw error;
}
}
@Override
public Unfiltered next()
{
                // NOTE: In practice we know that any IOException will be thrown by hasNext(),
                // because that is where the actual reading happens, so we don't bother
                // catching IOError here (unlike what we do in hasNext)
return iterator.next();
}
@Override
public void close()
{
}
}
}
}