/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.reader;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.ClusteringPrefix;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.BloomFilterSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.vint.VIntCoding;
import org.jetbrains.annotations.NotNull;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
@SuppressWarnings("WeakerAccess")
public final class ReaderUtils
{
private static final int CHECKSUM_LENGTH = 4; // CRC32
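// SerializationHeader.Component does not expose a public constructor for the raw component types,
// so the 5-argument constructor is looked up reflectively and made accessible in the static block below;
// it is invoked in deserializeSerializationHeader()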
private static final Constructor<?> SERIALIZATION_HEADER =
Arrays.stream(SerializationHeader.Component.class.getDeclaredConstructors())
.filter(constructor -> constructor.getParameterCount() == 5)
.findFirst()
.orElseThrow(() -> new RuntimeException("Could not find SerializationHeader.Component constructor"));
public static final ByteBuffer SUPER_COLUMN_MAP_COLUMN = ByteBufferUtil.EMPTY_BYTE_BUFFER;
static
{
SERIALIZATION_HEADER.setAccessible(true);
}
private ReaderUtils()
{
throw new IllegalStateException(getClass() + " is a static utility class and shall not be instantiated");
}
public static long tokenToLong(Token token)
{
if (token instanceof Murmur3Partitioner.LongToken)
{
return (long) token.getTokenValue();
}
if (token instanceof RandomPartitioner.BigIntegerToken)
{
return ((RandomPartitioner.BigIntegerToken) token).getTokenValue().longValue();
}
throw new UnsupportedOperationException("Unexpected token type: " + token.getClass().getName());
}
public static BigInteger tokenToBigInteger(Token token)
{
if (token instanceof Murmur3Partitioner.LongToken)
{
return BigInteger.valueOf((long) token.getTokenValue());
}
if (token instanceof RandomPartitioner.BigIntegerToken)
{
return ((RandomPartitioner.BigIntegerToken) token).getTokenValue();
}
throw new UnsupportedOperationException("Unexpected token type: " + token.getClass().getName());
}
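/**
 * Build the composite cell name for the given clustering prefix, column name and optional collection element,
 * handling non-compound, dense, and super-column table layouts
 */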
static ByteBuffer encodeCellName(TableMetadata metadata,
ClusteringPrefix clustering,
ByteBuffer columnName,
ByteBuffer collectionElement)
{
boolean isStatic = clustering == Clustering.STATIC_CLUSTERING;
if (!TableMetadata.Flag.isCompound(metadata.flags))
{
if (isStatic)
{
return columnName;
}
assert clustering.size() == 1 : "Expected clustering size to be 1, but was " + clustering.size();
return clustering.bufferAt(0);
}
// We use comparator.size() rather than clustering.size() because of static clusterings
int clusteringSize = metadata.comparator.size();
int size = clusteringSize + (TableMetadata.Flag.isDense(metadata.flags) ? 0 : 1)
+ (collectionElement == null ? 0 : 1);
if (TableMetadata.Flag.isSuper(metadata.flags))
{
size = clusteringSize + 1;
}
ByteBuffer[] values = new ByteBuffer[size];
for (int index = 0; index < clusteringSize; index++)
{
if (isStatic)
{
values[index] = ByteBufferUtil.EMPTY_BYTE_BUFFER;
continue;
}
ByteBuffer value = clustering.bufferAt(index);
// We can have null (only for dense compound tables for backward compatibility reasons),
// but that means we're done and should stop there as far as building the composite is concerned
if (value == null)
{
return CompositeType.build(ByteBufferAccessor.instance, Arrays.copyOfRange(values, 0, index));
}
values[index] = value;
}
if (TableMetadata.Flag.isSuper(metadata.flags))
{
// We need to set the "column" (in thrift terms) name, i.e. the value corresponding to the subcomparator.
// What it is depends on whether this is a cell for a declared "static" column
// or a "dynamic" column part of the super-column internal map.
assert columnName != null; // This should never be null for super columns
values[clusteringSize] = columnName.equals(SUPER_COLUMN_MAP_COLUMN)
? collectionElement
: columnName;
}
else
{
if (!TableMetadata.Flag.isDense(metadata.flags))
{
values[clusteringSize] = columnName;
}
if (collectionElement != null)
{
values[clusteringSize + 1] = collectionElement;
}
}
return CompositeType.build(ByteBufferAccessor.instance, isStatic, values);
}
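/**
 * Read the SSTable's Index.db component to find its first and last partition keys
 *
 * @param metadata table metadata providing the partitioner used to decorate the keys
 * @param ssTable  SSTable to read
 * @return pair of first and last decorated keys, or a (null, null) pair if no primary index is available
 * @throws IOException if an I/O error occurs while reading the index
 */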
public static Pair<DecoratedKey, DecoratedKey> keysFromIndex(@NotNull TableMetadata metadata,
@NotNull SSTable ssTable) throws IOException
{
try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
{
if (primaryIndex != null)
{
IPartitioner partitioner = metadata.partitioner;
Pair<ByteBuffer, ByteBuffer> keys = readPrimaryIndex(primaryIndex, true, Collections.emptyList());
return Pair.create(partitioner.decorateKey(keys.left), partitioner.decorateKey(keys.right));
}
}
return Pair.create(null, null);
}
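/**
 * Check the SSTable's Index.db component against the given partition key filters
 *
 * @param ssTable SSTable to check
 * @param filters partition key filters to look for
 * @return false if the filters are empty or the primary index rules out every filter key,
 *         true if a filter key may be present in this SSTable
 * @throws IOException if an I/O error occurs while reading the index
 */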
static boolean anyFilterKeyInIndex(@NotNull SSTable ssTable,
@NotNull List<PartitionKeyFilter> filters) throws IOException
{
if (filters.isEmpty())
{
return false;
}
try (InputStream primaryIndex = ssTable.openPrimaryIndexStream())
{
if (primaryIndex != null)
{
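// readPrimaryIndex() returns a (null, null) pair as soon as any filter matches an index key,
// so non-null first/last keys mean the whole index was scanned without finding a filter key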
Pair<ByteBuffer, ByteBuffer> keys = readPrimaryIndex(primaryIndex, false, filters);
if (keys.left != null || keys.right != null)
{
return false;
}
}
}
return true;
}
static Map<MetadataType, MetadataComponent> deserializeStatsMetadata(SSTable ssTable,
Descriptor descriptor) throws IOException
{
try (InputStream statsStream = ssTable.openStatsStream())
{
return deserializeStatsMetadata(statsStream,
EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS, MetadataType.HEADER),
descriptor);
}
}
/**
* Deserialize Statistics.db file to pull out metadata components needed for SSTable deserialization
*
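* <p>A minimal usage sketch (assumes the stream comes from {@link SSTable#openStatsStream()} and only the
* STATS component is required; the {@code StatsMetadata} cast is for illustration):
* <pre>{@code
* try (InputStream in = ssTable.openStatsStream())
* {
*     Map<MetadataType, MetadataComponent> components =
*             deserializeStatsMetadata(in, EnumSet.of(MetadataType.STATS), descriptor);
*     StatsMetadata stats = (StatsMetadata) components.get(MetadataType.STATS);
* }
* }</pre>
*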
* @param is input stream for Statistics.db file
* @param selectedTypes enum of MetadataType to deserialize
* @param descriptor SSTable file descriptor
* @return map of MetadataComponent for each requested MetadataType
* @throws IOException if an I/O error occurs while reading the Statistics.db stream
*/
static Map<MetadataType, MetadataComponent> deserializeStatsMetadata(InputStream is,
EnumSet<MetadataType> selectedTypes,
Descriptor descriptor) throws IOException
{
DataInputStream in = new DataInputPlus.DataInputStreamPlus(is);
boolean isChecksummed = descriptor.version.hasMetadataChecksum();
CRC32 crc = new CRC32();
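// Statistics.db starts with a table of contents: a component count followed by (ordinal, offset) pairs,
// with CRC32 checksums interleaved when the format version has metadata checksums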
int count = in.readInt();
updateChecksumInt(crc, count);
maybeValidateChecksum(crc, in, descriptor);
int[] ordinals = new int[count];
int[] offsets = new int[count];
int[] lengths = new int[count];
for (int index = 0; index < count; index++)
{
ordinals[index] = in.readInt();
updateChecksumInt(crc, ordinals[index]);
offsets[index] = in.readInt();
updateChecksumInt(crc, offsets[index]);
}
maybeValidateChecksum(crc, in, descriptor);
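// Component lengths are derived from consecutive offsets; the final component runs to the
// end of the file and is handled separately below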
for (int index = 0; index < count - 1; index++)
{
lengths[index] = offsets[index + 1] - offsets[index];
}
MetadataType[] allMetadataTypes = MetadataType.values();
Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class);
for (int index = 0; index < count - 1; index++)
{
MetadataType type = allMetadataTypes[ordinals[index]];
if (!selectedTypes.contains(type))
{
in.skipBytes(lengths[index]);
continue;
}
byte[] bytes = new byte[isChecksummed ? lengths[index] - CHECKSUM_LENGTH : lengths[index]];
in.readFully(bytes);
crc.reset();
crc.update(bytes);
maybeValidateChecksum(crc, in, descriptor);
components.put(type, deserializeMetadataComponent(descriptor.version, bytes, type));
}
MetadataType type = allMetadataTypes[ordinals[count - 1]];
if (!selectedTypes.contains(type))
{
return components;
}
// We do not have in.bytesRemaining() (as in FileDataInput),
// so we need to read the remaining bytes to get the final component
byte[] remainingBytes = ByteBufferUtils.readRemainingBytes(in, 256);
byte[] bytes;
if (descriptor.version.hasMetadataChecksum())
{
ByteBuffer buffer = ByteBuffer.wrap(remainingBytes);
int length = buffer.remaining() - CHECKSUM_LENGTH;
bytes = new byte[length];
buffer.get(bytes, 0, length);
crc.reset();
crc.update(bytes);
validateChecksum(crc, buffer.getInt(), descriptor);
}
else
{
bytes = remainingBytes;
}
components.put(type, deserializeMetadataComponent(descriptor.version, bytes, type));
return components;
}
private static void maybeValidateChecksum(CRC32 crc, DataInputStream in, Descriptor descriptor) throws IOException
{
if (descriptor.version.hasMetadataChecksum())
{
validateChecksum(crc, in.readInt(), descriptor);
}
}
private static void validateChecksum(CRC32 crc, int expectedChecksum, Descriptor descriptor)
{
int actualChecksum = (int) crc.getValue();
if (actualChecksum != expectedChecksum)
{
String filename = descriptor.filenameFor(Component.STATS);
throw new CorruptSSTableException(new IOException("Checksums do not match for " + filename), filename);
}
}
private static MetadataComponent deserializeValidationMetaData(@NotNull DataInputBuffer in) throws IOException
{
return new ValidationMetadata(in.readUTF(), in.readDouble());
}
private static MetadataComponent deserializeMetadataComponent(@NotNull Version version,
@NotNull byte[] buffer,
@NotNull MetadataType type) throws IOException
{
DataInputBuffer in = new DataInputBuffer(buffer);
if (type == MetadataType.HEADER)
{
return deserializeSerializationHeader(in);
}
else if (type == MetadataType.VALIDATION)
{
return deserializeValidationMetaData(in);
}
return type.serializer.deserialize(version, in);
}
private static MetadataComponent deserializeSerializationHeader(@NotNull DataInputBuffer in) throws IOException
{
// We need to deserialize data type class names using shaded package names
EncodingStats stats = EncodingStats.serializer.deserialize(in);
AbstractType<?> keyType = readType(in);
int size = (int) in.readUnsignedVInt();
List<AbstractType<?>> clusteringTypes = new ArrayList<>(size);
for (int index = 0; index < size; ++index)
{
clusteringTypes.add(readType(in));
}
Map<ByteBuffer, AbstractType<?>> staticColumns = new LinkedHashMap<>();
Map<ByteBuffer, AbstractType<?>> regularColumns = new LinkedHashMap<>();
readColumnsWithType(in, staticColumns);
readColumnsWithType(in, regularColumns);
try
{
// TODO: We should expose this code in Cassandra to make it easier to do this with unit tests in Cassandra
return (SerializationHeader.Component) SERIALIZATION_HEADER.newInstance(keyType,
clusteringTypes,
staticColumns,
regularColumns,
stats);
}
catch (InstantiationException | IllegalAccessException | InvocationTargetException exception)
{
throw new RuntimeException(exception);
}
}
private static void readColumnsWithType(@NotNull DataInputPlus in,
@NotNull Map<ByteBuffer, AbstractType<?>> typeMap) throws IOException
{
int length = (int) in.readUnsignedVInt();
for (int index = 0; index < length; index++)
{
ByteBuffer name = ByteBufferUtil.readWithVIntLength(in);
typeMap.put(name, readType(in));
}
}
private static AbstractType<?> readType(@NotNull DataInputPlus in) throws IOException
{
return TypeParser.parse(UTF8Type.instance.compose(ByteBufferUtil.readWithVIntLength(in)));
}
/**
 * Read the primary Index.db file, scanning through all partitions to find the first and last partition keys
 *
 * @param primaryIndex     input stream for Index.db file
 * @param readFirstLastKey true to read through the entire index for the first and last keys;
 *                         false to return a (null, null) pair as soon as any filter matches an index key
 * @param filters          partition key filters tested against each key when {@code readFirstLastKey} is false
 * @return pair of first and last partition keys, or a (null, null) pair if a filter matched
 * @throws IOException if an I/O error occurs while reading the index
 */
@SuppressWarnings("InfiniteLoopStatement")
static Pair<ByteBuffer, ByteBuffer> readPrimaryIndex(@NotNull InputStream primaryIndex,
boolean readFirstLastKey,
@NotNull List<PartitionKeyFilter> filters) throws IOException
{
ByteBuffer firstKey = null;
ByteBuffer lastKey = null;
try (DataInputStream dis = new DataInputStream(primaryIndex))
{
byte[] last = null;
try
{
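// The number of index entries is not known up front, so keep reading until the
// EOFException below signals the end of the stream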
while (true)
{
int length = dis.readUnsignedShort();
byte[] buffer = new byte[length];
dis.readFully(buffer);
if (firstKey == null)
{
firstKey = ByteBuffer.wrap(buffer);
}
last = buffer;
ByteBuffer key = ByteBuffer.wrap(last);
if (!readFirstLastKey && filters.stream().anyMatch(filter -> filter.filter(key)))
{
return Pair.create(null, null);
}
// Read position and skip promoted index
skipRowIndexEntry(dis);
}
}
catch (EOFException ignored)
{
}
if (last != null)
{
lastKey = ByteBuffer.wrap(last);
}
}
return Pair.create(firstKey, lastKey);
}
static void skipRowIndexEntry(DataInputStream dis) throws IOException
{
readPosition(dis);
skipPromotedIndex(dis);
}
static int vIntSize(long value)
{
return VIntCoding.computeUnsignedVIntSize(value);
}
static void writePosition(long value, ByteBuffer buffer)
{
VIntCoding.writeUnsignedVInt(value, buffer);
}
static long readPosition(DataInputStream dis) throws IOException
{
return VIntCoding.readUnsignedVInt(dis);
}
/**
 * @return the total number of bytes consumed from the stream: the size vint itself plus any skipped promoted index bytes
 */
public static int skipPromotedIndex(DataInputStream dis) throws IOException
{
final long val = VIntCoding.readUnsignedVInt(dis);
final int size = (int) val;
if (size > 0)
{
ByteBufferUtils.skipBytesFully(dis, size);
}
return Math.max(size, 0) + VIntCoding.computeUnsignedVIntSize(val);
}
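/**
 * Narrow the given partition key filters to those whose keys may be present according to the SSTable's
 * bloom filter; if the filter component cannot be found, all filters are returned unchanged
 *
 * @param ssTable             SSTable to check
 * @param partitioner         partitioner used to decorate the filter keys
 * @param descriptor          SSTable file descriptor
 * @param partitionKeyFilters partition key filters to narrow
 * @return the filters whose keys pass the bloom filter check
 * @throws IOException if an I/O error occurs while reading the filter component
 */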
static List<PartitionKeyFilter> filterKeyInBloomFilter(
@NotNull SSTable ssTable,
@NotNull IPartitioner partitioner,
Descriptor descriptor,
@NotNull List<PartitionKeyFilter> partitionKeyFilters) throws IOException
{
try
{
BloomFilter bloomFilter = SSTableCache.INSTANCE.bloomFilter(ssTable, descriptor);
return partitionKeyFilters.stream()
.filter(filter -> bloomFilter.isPresent(partitioner.decorateKey(filter.key())))
.collect(Collectors.toList());
}
catch (Exception exception)
{
if (exception instanceof FileNotFoundException)
{
return partitionKeyFilters;
}
throw exception;
}
}
static BloomFilter readFilter(@NotNull SSTable ssTable, boolean hasOldBfFormat) throws IOException
{
try (InputStream filterStream = ssTable.openFilterStream())
{
if (filterStream != null)
{
try (DataInputStream dis = new DataInputStream(filterStream))
{
return BloomFilterSerializer.deserialize(dis, hasOldBfFormat);
}
}
}
throw new FileNotFoundException("Could not open filter component for SSTable");
}
}