| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.cassandra.bridge; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.ObjectInputStream; |
| import java.io.ObjectOutputStream; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import java.math.BigInteger; |
| import java.nio.ByteBuffer; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.AbstractMap; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.EnumSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ExecutorService; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| import java.util.stream.Stream; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.esotericsoftware.kryo.Kryo; |
| import com.esotericsoftware.kryo.Serializer; |
| import org.apache.cassandra.analytics.reader.common.IndexIterator; |
| import org.apache.cassandra.analytics.stats.Stats; |
| import org.apache.cassandra.db.DecoratedKey; |
| import org.apache.cassandra.db.marshal.AbstractType; |
| import org.apache.cassandra.db.marshal.ByteBufferAccessor; |
| import org.apache.cassandra.db.marshal.CompositeType; |
| import org.apache.cassandra.db.rows.UnfilteredRowIterator; |
| import org.apache.cassandra.dht.IPartitioner; |
| import org.apache.cassandra.dht.Murmur3Partitioner; |
| import org.apache.cassandra.dht.RandomPartitioner; |
| import org.apache.cassandra.io.compress.ICompressor; |
| import org.apache.cassandra.io.compress.LZ4Compressor; |
| import org.apache.cassandra.io.sstable.CQLSSTableWriter; |
| import org.apache.cassandra.io.sstable.Descriptor; |
| import org.apache.cassandra.io.sstable.ISSTableScanner; |
| import org.apache.cassandra.io.sstable.SSTableTombstoneWriter; |
| import org.apache.cassandra.io.sstable.format.SSTableReader; |
| import org.apache.cassandra.io.sstable.format.bti.BtiReaderUtils; |
| import org.apache.cassandra.io.sstable.metadata.MetadataComponent; |
| import org.apache.cassandra.io.sstable.metadata.MetadataType; |
| import org.apache.cassandra.io.sstable.metadata.StatsMetadata; |
| import org.apache.cassandra.io.util.File; |
| import org.apache.cassandra.schema.Schema; |
| import org.apache.cassandra.schema.TableMetadata; |
| import org.apache.cassandra.schema.TableMetadataRef; |
| import org.apache.cassandra.spark.data.CassandraTypes; |
| import org.apache.cassandra.spark.data.CqlField; |
| import org.apache.cassandra.spark.data.CqlTable; |
| import org.apache.cassandra.spark.data.CqlType; |
| import org.apache.cassandra.spark.data.ReplicationFactor; |
| import org.apache.cassandra.spark.data.SSTable; |
| import org.apache.cassandra.spark.data.SSTablesSupplier; |
| import org.apache.cassandra.spark.data.TypeConverter; |
| import org.apache.cassandra.spark.data.complex.CqlTuple; |
| import org.apache.cassandra.spark.data.complex.CqlUdt; |
| import org.apache.cassandra.spark.data.partitioner.Partitioner; |
| import org.apache.cassandra.spark.reader.BigIndexReader; |
| import org.apache.cassandra.spark.reader.BtiIndexReader; |
| import org.apache.cassandra.spark.reader.CompactionStreamScanner; |
| import org.apache.cassandra.spark.reader.IndexEntry; |
| import org.apache.cassandra.spark.reader.ReaderUtils; |
| import org.apache.cassandra.spark.reader.RowData; |
| import org.apache.cassandra.spark.reader.SchemaBuilder; |
| import org.apache.cassandra.spark.reader.StreamScanner; |
| import org.apache.cassandra.spark.reader.SummaryDbUtils; |
| import org.apache.cassandra.spark.sparksql.CellIterator; |
| import org.apache.cassandra.spark.sparksql.RowIterator; |
| import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter; |
| import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter; |
| import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter; |
| import org.apache.cassandra.spark.sparksql.filters.SSTableTimeRangeFilter; |
| import org.apache.cassandra.spark.utils.Pair; |
| import org.apache.cassandra.spark.utils.SparkClassLoaderOverride; |
| import org.apache.cassandra.spark.utils.TimeProvider; |
| import org.apache.cassandra.tools.JsonTransformer; |
| import org.apache.cassandra.tools.Util; |
| import org.apache.cassandra.util.CompressionUtil; |
| import org.apache.cassandra.util.IntWrapper; |
| import org.apache.cassandra.utils.BloomFilter; |
| import org.apache.cassandra.utils.CompressionUtilImplementation; |
| import org.apache.cassandra.utils.TimeUUID; |
| import org.apache.cassandra.utils.TokenUtils; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| @SuppressWarnings("unused") |
| public class CassandraBridgeImplementation extends CassandraBridge |
| { |
| private static final Logger LOGGER = LoggerFactory.getLogger(CassandraBridgeImplementation.class); |
| |
| private final Map<Class<?>, Serializer<?>> kryoSerializers; |
| |
| static |
| { |
| setup(); |
| } |
| |
| public static synchronized void setup() |
| { |
| CassandraTypesImplementation.setup(BridgeInitializationParameters.fromEnvironment()); |
| } |
| |
| public CassandraBridgeImplementation() |
| { |
| // Cassandra-version-specific Kryo serializers |
| kryoSerializers = new LinkedHashMap<>(); |
| kryoSerializers.put(CqlField.class, new CqlField.Serializer(cassandraTypes())); |
| kryoSerializers.put(CqlTable.class, new CqlTable.Serializer(cassandraTypes())); |
| kryoSerializers.put(CqlUdt.class, new CqlUdt.Serializer(cassandraTypes())); |
| } |
| |
| public CassandraTypes cassandraTypes() |
| { |
| return CassandraTypesImplementation.INSTANCE; |
| } |
| |
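| // Builds the (possibly composite) partition key from its string-encoded column values and pairs it with the |
| // token computed under the given partitioner. Illustrative usage ("table" and the key values are hypothetical): |
| // bridge.getPartitionKey(table, Partitioner.Murmur3Partitioner, Arrays.asList("user-1", "2023")) |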
| @Override |
| public AbstractMap.SimpleEntry<ByteBuffer, BigInteger> getPartitionKey(@NotNull CqlTable table, |
| @NotNull Partitioner partitioner, |
| @NotNull List<String> keys) |
| { |
| Preconditions.checkArgument(!table.partitionKeys().isEmpty(), "Table must have at least one partition key"); |
| ByteBuffer partitionKey = buildPartitionKey(table, keys); |
| BigInteger partitionKeyTokenValue = hash(partitioner, partitionKey); |
| return new AbstractMap.SimpleEntry<>(partitionKey, partitionKeyTokenValue); |
| } |
| |
| @VisibleForTesting |
| public static ByteBuffer buildPartitionKey(@NotNull CqlTable table, @NotNull List<String> keys) |
| { |
| List<AbstractType<?>> partitionKeyColumnTypes = partitionKeyColumnTypes(table); |
| if (table.partitionKeys().size() == 1) |
| { |
| // Single partition key |
| return partitionKeyColumnTypes.get(0).fromString(keys.get(0)); |
| } |
| else |
| { |
| // Composite partition key |
| ByteBuffer[] buffers = new ByteBuffer[keys.size()]; |
| for (int index = 0; index < buffers.length; index++) |
| { |
| buffers[index] = partitionKeyColumnTypes.get(index).fromString(keys.get(index)); |
| } |
| return CompositeType.build(ByteBufferAccessor.instance, buffers); |
| } |
| } |
| |
| @VisibleForTesting |
| public static List<AbstractType<?>> partitionKeyColumnTypes(@NotNull CqlTable table) |
| { |
| return table.partitionKeys().stream() |
| .map(CqlField::type) |
| .map(type -> (CqlType) type) |
| .map(type -> type.dataType(true)) |
| .collect(Collectors.toList()); |
| } |
| |
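| // Opens every SSTable from the supplier with the requested filters applied and merges them through a |
| // compaction-style scanner so that rows for the same partition are reconciled across SSTables. |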
| @Override |
| public StreamScanner<RowData> getCompactionScanner(@NotNull CqlTable table, |
| @NotNull Partitioner partitioner, |
| @NotNull SSTablesSupplier ssTables, |
| @Nullable SparkRangeFilter sparkRangeFilter, |
| @NotNull Collection<PartitionKeyFilter> partitionKeyFilters, |
| @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter, |
| @Nullable PruneColumnFilter columnFilter, |
| @NotNull TimeProvider timeProvider, |
| boolean readIndexOffset, |
| boolean useIncrementalRepair, |
| @NotNull Stats stats) |
| { |
| // NOTE: SchemaBuilder is needed to initialize the keyspace if it is not already set in the Cassandra Schema instance |
| SchemaBuilder schemaBuilder = new SchemaBuilder(table, partitioner); |
| TableMetadata metadata = schemaBuilder.tableMetaData(); |
| return new CompactionStreamScanner(metadata, partitioner, timeProvider, ssTables.openAll((ssTable, isRepairPrimary) -> { |
| return org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable) |
| .withSparkRangeFilter(sparkRangeFilter) |
| .withPartitionKeyFilters(partitionKeyFilters) |
| .withTimeRangeFilter(sstableTimeRangeFilter) |
| .withColumnFilter(columnFilter) |
| .withReadIndexOffset(readIndexOffset) |
| .withStats(stats) |
| .useIncrementalRepair(useIncrementalRepair) |
| .isRepairPrimary(isRepairPrimary) |
| .build(); |
| })); |
| } |
| |
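| // Builds an iterator over per-partition index entries by reading the SSTable index component |
| // (Index.db for the big format, the BTI index otherwise), which is enough to estimate partition sizes. |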
| @Override |
| public StreamScanner<IndexEntry> getPartitionSizeIterator(@NotNull CqlTable table, |
| @NotNull Partitioner partitioner, |
| @NotNull SSTablesSupplier ssTables, |
| @Nullable SparkRangeFilter rangeFilter, |
| @NotNull TimeProvider timeProvider, |
| @NotNull Stats stats, |
| @NotNull ExecutorService executor) |
| { |
| // NOTE: SchemaBuilder is needed to initialize the keyspace if it is not already set in the Cassandra Schema instance |
| SchemaBuilder schemaBuilder = new SchemaBuilder(table, partitioner); |
| TableMetadata metadata = schemaBuilder.tableMetaData(); |
| return new IndexIterator<>(ssTables, stats, (ssTable, isRepairPrimary, consumer) -> { |
| if (ssTable.isBigFormat()) |
| { |
| return new BigIndexReader(ssTable, metadata, rangeFilter, stats, consumer); |
| } |
| return new BtiIndexReader(ssTable, metadata, rangeFilter, stats, consumer); |
| }); |
| } |
| |
| @Override |
| public CassandraVersion getVersion() |
| { |
| return CassandraVersion.FIVEZERO; |
| } |
| |
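| // Computes the token of a serialized partition key: RandomPartitioner tokens are already BigIntegers, while |
| // Murmur3Partitioner tokens are signed longs widened to BigInteger. |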
| @Override |
| public BigInteger hash(Partitioner partitioner, ByteBuffer key) |
| { |
| switch (partitioner) |
| { |
| case RandomPartitioner: |
| return RandomPartitioner.instance.getToken(key).getTokenValue(); |
| case Murmur3Partitioner: |
| return BigInteger.valueOf((long) Murmur3Partitioner.instance.getToken(key).getTokenValue()); |
| default: |
| throw new UnsupportedOperationException("Unexpected partitioner: " + partitioner); |
| } |
| } |
| |
| @Override |
| public UUID getTimeUUID() |
| { |
| return TimeUUID.Generator.nextTimeUUID().asUUID(); |
| } |
| |
| @Override |
| public CqlTable buildSchema(String createStatement, |
| String keyspace, |
| ReplicationFactor replicationFactor, |
| Partitioner partitioner, |
| Set<String> udts, |
| @Nullable UUID tableId, |
| int indexCount, |
| boolean enableCdc) |
| { |
| return new SchemaBuilder(createStatement, keyspace, replicationFactor, partitioner, cassandraTypes -> udts, tableId, indexCount, enableCdc).build(); |
| } |
| |
| @Override |
| public CompressionUtil compressionUtil() |
| { |
| return CompressionUtilImplementation.INSTANCE; |
| } |
| |
| @Override |
| public long lastRepairTime(String keyspace, String table, SSTable ssTable) throws IOException |
| { |
| Map<MetadataType, MetadataComponent> componentMap = ReaderUtils.deserializeStatsMetadata(keyspace, table, ssTable, EnumSet.of(MetadataType.STATS)); |
| StatsMetadata statsMetadata = (StatsMetadata) componentMap.get(MetadataType.STATS); |
| if (statsMetadata == null) |
| { |
| throw new IllegalStateException("Could not read StatsMetadata"); |
| } |
| return statsMetadata.repairedAt; |
| } |
| |
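| // For each requested token range, reports whether it intersects the SSTable's closed [firstToken, lastToken] |
| // range as derived from its summary or index component. |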
| @Override |
| public List<Boolean> overlaps(SSTable ssTable, |
| Partitioner partitioner, |
| int minIndexInterval, |
| int maxIndexInterval, |
| List<TokenRange> ranges) |
| { |
| SSTableSummary summary = getSSTableSummary(partitioner, ssTable, minIndexInterval, maxIndexInterval); |
| TokenRange sstableRange = TokenRange.closed(summary.firstToken, summary.lastToken); |
| return ranges.stream() |
| .map(range -> range.isConnected(sstableRange)) |
| .collect(Collectors.toList()); |
| } |
| |
| @Override |
| public Tokenizer tokenizer(Partitioner partitioner) |
| { |
| IPartitioner iPartitioner = getPartitioner(partitioner); |
| return partitionKey -> { |
| DecoratedKey decoratedKey = iPartitioner.decorateKey(partitionKey); |
| return TokenUtils.tokenToBigInteger(decoratedKey.getToken()); |
| }; |
| } |
| |
| @Override |
| public List<ByteBuffer> encodePartitionKeys(Partitioner partitioner, String keyspace, String createTableStmt, List<List<String>> keys) |
| { |
| CqlTable table = new SchemaBuilder(createTableStmt, keyspace, ReplicationFactor.simpleStrategy(1), partitioner).build(); |
| return keys.stream().map(key -> buildPartitionKey(table, key)).collect(Collectors.toList()); |
| } |
| |
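| // Exposes the SSTable's bloom filter as a simple presence predicate over serialized partition keys. |
| // A positive result may be a false positive; a negative result means the key is definitely absent. |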
| @Override |
| public org.apache.cassandra.bridge.BloomFilter openBloomFilter(Partitioner partitioner, |
| String keyspace, |
| String table, |
| SSTable ssTable) throws IOException |
| { |
| IPartitioner iPartitioner = getPartitioner(partitioner); |
| Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, table, ssTable); |
| // the returned filter is intentionally never closed: closing SharedCloseableImpl instances is known to cause SIGSEGV errors |
| BloomFilter filter = openBloomFilter(descriptor, ssTable); |
| return partitionKey -> { |
| DecoratedKey decoratedKey = iPartitioner.decorateKey(partitionKey); |
| return filter.isPresent(decoratedKey); |
| }; |
| } |
| |
| private BloomFilter openBloomFilter(Descriptor descriptor, SSTable ssTable) throws IOException |
| { |
| return ReaderUtils.readFilter(ssTable, descriptor); |
| } |
| |
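| // Determines which of the given partition keys are present in the SSTable. The bloom filter is consulted |
| // first as a cheap pre-filter; surviving candidates are then confirmed by a single sequential pass over the |
| // primary index, merge-joining index entries against the candidate keys sorted by token. |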
| @Override |
| public List<Boolean> contains(Partitioner partitioner, String keyspace, String table, SSTable ssTable, List<ByteBuffer> partitionKeys) throws IOException |
| { |
| if (partitionKeys.isEmpty()) |
| { |
| return Collections.emptyList(); |
| } |
| |
| IPartitioner iPartitioner = getPartitioner(partitioner); |
| List<DecoratedKey> decoratedKeys = partitionKeys.stream().map(iPartitioner::decorateKey).collect(Collectors.toList()); |
| Descriptor descriptor = ReaderUtils.constructDescriptor(keyspace, table, ssTable); |
| BloomFilter filter = openBloomFilter(descriptor, ssTable); |
| List<Boolean> result = decoratedKeys.stream().map(filter::isPresent).collect(Collectors.toList()); |
| if (result.stream().noneMatch(found -> found)) |
| { |
| // no matches in the bloom filter, so we can exit early |
| return result; |
| } |
| |
| // keys sorted by token, each paired with its index into the original partitionKeys list |
| List<Pair<BigInteger, Integer>> sortedByTokens = IntStream.range(0, decoratedKeys.size()) |
| .mapToObj(idx -> { |
| DecoratedKey key = decoratedKeys.get(idx); |
| BigInteger token = TokenUtils.tokenToBigInteger(key.getToken()); |
| return Pair.of(token, idx); |
| }) |
| .sorted(Comparator.comparing(Pair::getLeft)) |
| .collect(Collectors.toList()); |
| |
| IntWrapper position = new IntWrapper(); |
| Function<ByteBuffer, Boolean> consumer = buffer -> { |
| DecoratedKey key = iPartitioner.decorateKey(buffer); |
| BigInteger token = TokenUtils.tokenToBigInteger(key.getToken()); |
| |
| Pair<BigInteger, Integer> current = sortedByTokens.get(position.value); |
| int compare = token.compareTo(current.getLeft()); |
| while (compare > 0) |
| { |
| // the index has moved past this token without finding the key, so it is not present |
| result.set(current.getRight(), false); |
| position.value++; |
| if (position.value >= decoratedKeys.size()) |
| { |
| // every candidate key has been resolved, so we can stop scanning |
| return true; |
| } |
| current = sortedByTokens.get(position.value); |
| compare = token.compareTo(current.getLeft()); |
| } |
| |
| ByteBuffer currentKey = partitionKeys.get(current.getRight()); |
| if (compare == 0 && buffer.equals(currentKey)) // token and key both match |
| { |
| result.set(current.getRight(), true); |
| position.value++; |
| } |
| |
| // stop the scan once every candidate key has been resolved |
| return position.value >= decoratedKeys.size(); |
| }; |
| |
| if (ssTable.isBtiFormat()) |
| { |
| BtiReaderUtils.readPrimaryIndex(ssTable, iPartitioner, descriptor, 1.0, consumer); |
| } |
| else |
| { |
| try (InputStream primaryIndex = ssTable.openPrimaryIndexStream()) |
| { |
| if (primaryIndex == null) |
| { |
| throw new IOException("Could not read Index.db file"); |
| } |
| |
| ReaderUtils.readPrimaryIndex(primaryIndex, consumer); |
| } |
| } |
| |
| // mark the remaining keys, which the index scan never reached, as not present |
| IntStream.range(position.value, sortedByTokens.size()) |
| .forEach(i -> result.set(sortedByTokens.get(i).getRight(), false)); |
| |
| return result; |
| } |
| |
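| // Streams rows from the supplied SSTables, optionally restricted to the given partition keys and pruned to |
| // requiredColumns, and hands each row to rowConsumer as a column-name-to-value map. |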
| @Override |
| public void readPartitionKeys(Partitioner partitioner, |
| String keyspace, |
| String createStmt, |
| SSTablesSupplier ssTables, |
| @Nullable TokenRange tokenRange, |
| @Nullable List<ByteBuffer> partitionKeys, |
| @Nullable String[] requiredColumns, |
| @NotNull SSTableTimeRangeFilter sstableTimeRangeFilter, |
| Consumer<Map<String, Object>> rowConsumer) throws IOException |
| { |
| IPartitioner iPartitioner = getPartitioner(partitioner); |
| SchemaBuilder schemaBuilder = new SchemaBuilder(createStmt, keyspace, ReplicationFactor.simpleStrategy(1), partitioner); |
| TableMetadata metadata = schemaBuilder.tableMetaData(); |
| CqlTable table = schemaBuilder.build(); |
| List<BigInteger> tokens = partitionKeys == null ? Collections.emptyList() : toTokens(partitioner, partitionKeys); |
| List<PartitionKeyFilter> partitionKeyFilters = partitionKeys == null ? Collections.emptyList() : |
| IntStream |
| .range(0, partitionKeys.size()) |
| .mapToObj(i -> PartitionKeyFilter.create(partitionKeys.get(i), tokens.get(i))) |
| .sorted() |
| .collect(Collectors.toList()); |
| |
| try (CellIterator it = new CellIterator(0, |
| table, |
| Stats.DoNothingStats.INSTANCE, |
| TypeConverter.IDENTITY, |
| partitionKeyFilters, |
| sstableTimeRangeFilter, |
| (t) -> PruneColumnFilter.of(requiredColumns), |
| (partitionId1, partitionKeyFilters1, timeRangeFilter1, columnFilter1) -> |
| new CompactionStreamScanner( |
| metadata, |
| partitioner, |
| TimeProvider.DEFAULT, |
| ssTables.openAll((ssTable, isRepairPrimary) -> |
| org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable) |
| .withPartitionKeyFilters(partitionKeyFilters1) |
| .withTimeRangeFilter(timeRangeFilter1) |
| .build()) |
| )) |
| { |
| @Override |
| public boolean isInPartition(int partitionId, BigInteger token, ByteBuffer partitionKey) |
| { |
| return true; |
| } |
| |
| @Override |
| public boolean equals(CqlField field, Object obj1, Object obj2) |
| { |
| return Objects.equals(obj1, obj2); |
| } |
| }) |
| { |
| RowIterator<Map<String, Object>> rowIterator = RowIterator.rowMapIterator(it, Stats.DoNothingStats.INSTANCE, requiredColumns); |
| |
| while (rowIterator.next()) |
| { |
| rowConsumer.accept(rowIterator.get()); |
| } |
| } |
| } |
| |
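| // Writes rows into SSTable files under the given directory using the offline CQLSSTableWriter; any |
| // user-defined types referenced by the table are registered with the writer before rows are added. |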
| @Override |
| public synchronized void writeSSTable(Partitioner partitioner, |
| String keyspace, |
| String table, |
| Path directory, |
| String createStatement, |
| String insertStatement, |
| String updateStatement, |
| boolean upsert, |
| Set<CqlField.CqlUdt> udts, |
| Consumer<Writer> writer) |
| { |
| CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder() |
| .inDirectory(directory.toFile().getAbsolutePath()) |
| .forTable(createStatement) |
| .withPartitioner(getPartitioner(partitioner)) |
| .using(upsert ? updateStatement : insertStatement) |
| .withBufferSizeInMB(128); |
| |
| for (CqlField.CqlUdt udt : udts) |
| { |
| // Add user-defined types to CQL writer |
| String statement = udt.createStatement(cassandraTypes(), keyspace); |
| builder.withType(statement); |
| } |
| |
| try (CQLSSTableWriter ssTable = builder.build()) |
| { |
| writer.accept(values -> { |
| try |
| { |
| ssTable.addRow(values); |
| } |
| catch (IOException exception) |
| { |
| throw new RuntimeException(exception); |
| } |
| }); |
| } |
| catch (IOException exception) |
| { |
| throw new RuntimeException(exception); |
| } |
| } |
| |
| public static IPartitioner getPartitioner(Partitioner partitioner) |
| { |
| return CassandraTypesImplementation.getPartitioner(partitioner); |
| } |
| |
| @Override |
| public SSTableWriter getSSTableWriter(String inDirectory, |
| String partitioner, |
| String createStatement, |
| String insertStatement, |
| @NotNull Set<String> userDefinedTypeStatements, |
| int bufferSizeMB) |
| { |
| return new SSTableWriterImplementation(inDirectory, partitioner, createStatement, insertStatement, |
| userDefinedTypeStatements, bufferSizeMB); |
| } |
| |
| @Override |
| public SSTableSummary getSSTableSummary(@NotNull String keyspace, |
| @NotNull String table, |
| @NotNull SSTable ssTable) |
| { |
| TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table); |
| if (metadata == null) |
| { |
| throw new RuntimeException("Could not create table metadata needed for reading SSTable summaries for keyspace: " + keyspace); |
| } |
| return getSSTableSummary(metadata.partitioner, ssTable, metadata.params.minIndexInterval, metadata.params.maxIndexInterval); |
| } |
| |
| @Override |
| public SSTableSummary getSSTableSummary(@NotNull Partitioner partitioner, |
| @NotNull SSTable ssTable, |
| int minIndexInterval, |
| int maxIndexInterval) |
| { |
| return getSSTableSummary(getPartitioner(partitioner), ssTable, minIndexInterval, maxIndexInterval); |
| } |
| |
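| // Resolves the SSTable's first and last tokens, preferring the Summary.db component and falling back to |
| // reading the keys from the index when no summary is available. |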
| protected SSTableSummary getSSTableSummary(@NotNull IPartitioner partitioner, |
| @NotNull SSTable ssTable, |
| int minIndexInterval, |
| int maxIndexInterval) |
| { |
| try |
| { |
| SummaryDbUtils.Summary summary = SummaryDbUtils.readSummary(ssTable, partitioner, minIndexInterval, maxIndexInterval); |
| Pair<DecoratedKey, DecoratedKey> keys = summary != null |
| ? Pair.of(summary.first(), summary.last()) |
| : ReaderUtils.keysFromIndex(partitioner, ssTable); |
| if (keys == null) |
| { |
| throw new RuntimeException("Could not load SSTable first or last tokens for SSTable: " + ssTable.getDataFileName()); |
| } |
| DecoratedKey first = keys.left; |
| DecoratedKey last = keys.right; |
| BigInteger firstToken = ReaderUtils.tokenToBigInteger(first.getToken()); |
| BigInteger lastToken = ReaderUtils.tokenToBigInteger(last.getToken()); |
| return new SSTableSummary(firstToken, lastToken, getSSTablePrefix(ssTable.getDataFileName())); |
| } |
| catch (IOException exception) |
| { |
| throw new RuntimeException(exception); |
| } |
| } |
| |
| private String getSSTablePrefix(String dataFileName) |
| { |
| return dataFileName.substring(0, dataFileName.lastIndexOf('-') + 1); |
| } |
| |
| // Version-Specific Test Utility Methods |
| |
| @Override |
| @VisibleForTesting |
| public void writeTombstoneSSTable(Partitioner partitioner, |
| Path directory, |
| String createStatement, |
| String deleteStatement, |
| Consumer<Writer> consumer) |
| { |
| File cassFile = new File(directory.toFile()); |
| try (SSTableTombstoneWriter writer = SSTableTombstoneWriter.builder() |
| .inDirectory(cassFile) |
| .forTable(createStatement) |
| .withPartitioner(getPartitioner(partitioner)) |
| .using(deleteStatement) |
| .withBufferSizeInMB(128) |
| .build()) |
| { |
| consumer.accept(values -> { |
| try |
| { |
| writer.addRow(values); |
| } |
| catch (IOException exception) |
| { |
| throw new RuntimeException(exception); |
| } |
| }); |
| } |
| catch (IOException exception) |
| { |
| throw new RuntimeException(exception); |
| } |
| } |
| |
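| // Test utility: dumps the contents of an SSTable Data.db file as JSON, deriving the table metadata offline |
| // from the SSTable itself. |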
| @Override |
| @VisibleForTesting |
| public void sstableToJson(Path dataDbFile, OutputStream output) throws FileNotFoundException |
| { |
| if (!Files.exists(dataDbFile)) |
| { |
| throw new FileNotFoundException("Cannot find file " + dataDbFile.toAbsolutePath()); |
| } |
| File file = new File(dataDbFile.toFile()); |
| if (!Descriptor.isValidFile(file)) |
| { |
| throw new RuntimeException("Invalid sstable file"); |
| } |
| |
| Descriptor desc = Descriptor.fromFileWithComponent(file, false).left; |
| try |
| { |
| TableMetadataRef metadata = TableMetadataRef.forOfflineTools(Util.metadataFromSSTable(desc)); |
| SSTableReader ssTable = SSTableReader.openNoValidation(null, desc, metadata); |
| ISSTableScanner currentScanner = ssTable.getScanner(); |
| Stream<UnfilteredRowIterator> partitions = Util.iterToStream(currentScanner); |
| JsonTransformer.toJson(currentScanner, partitions, false, metadata.get(), output); |
| } |
| catch (IOException exception) |
| { |
| throw new RuntimeException(exception); |
| } |
| } |
| |
| @Override |
| @VisibleForTesting |
| public Object toTupleValue(CqlField.CqlTuple type, Object[] values) |
| { |
| return CqlTuple.toTupleValue(getVersion(), (CqlTuple) type, values); |
| } |
| |
| @Override |
| @VisibleForTesting |
| public Object toUserTypeValue(CqlField.CqlUdt type, Map<String, Object> values) |
| { |
| return CqlUdt.toUserTypeValue(getVersion(), (CqlUdt) type, values); |
| } |
| |
| // Compression Utils |
| |
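| // Compressed buffers are framed as a 4-byte uncompressed-length prefix followed by the LZ4-compressed block, |
| // so uncompress() can size its output buffer from the prefix alone. Illustrative round trip ("bridge" is a |
| // hypothetical instance of this class): |
| // ByteBuffer compressed = bridge.compress("hello".getBytes(StandardCharsets.UTF_8)); |
| // ByteBuffer original = bridge.uncompress(compressed); |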
| private static final ICompressor COMPRESSOR = LZ4Compressor.create(Collections.emptyMap()); |
| |
| @Override |
| public ByteBuffer compress(byte[] bytes) throws IOException |
| { |
| ByteBuffer input = COMPRESSOR.preferredBufferType().allocate(bytes.length); |
| input.put(bytes); |
| input.flip(); |
| return compress(input); |
| } |
| |
| @Override |
| public ByteBuffer compress(ByteBuffer input) throws IOException |
| { |
| int length = input.remaining(); // the uncompressed length is written as a 4-byte int prefix |
| // allocate 4 extra bytes for the uncompressed-length prefix |
| ByteBuffer output = COMPRESSOR.preferredBufferType().allocate(4 + COMPRESSOR.initialCompressedBufferLength(length)); |
| output.putInt(length); |
| COMPRESSOR.compress(input, output); |
| output.flip(); |
| return output; |
| } |
| |
| @Override |
| public ByteBuffer uncompress(byte[] bytes) throws IOException |
| { |
| ByteBuffer input = COMPRESSOR.preferredBufferType().allocate(bytes.length); |
| input.put(bytes); |
| input.flip(); |
| return uncompress(input); |
| } |
| |
| @Override |
| public ByteBuffer uncompress(ByteBuffer input) throws IOException |
| { |
| ByteBuffer output = COMPRESSOR.preferredBufferType().allocate(input.getInt()); |
| COMPRESSOR.uncompress(input, output); |
| output.flip(); |
| return output; |
| } |
| |
| // Kryo/Java (De-)Serialization |
| |
| @Override |
| public void kryoRegister(Kryo kryo) |
| { |
| kryoSerializers.forEach(kryo::register); |
| } |
| |
| @Override |
| public void javaSerialize(ObjectOutputStream out, Serializable object) |
| { |
| try |
| { |
| out.writeObject(object); |
| } |
| catch (IOException exception) |
| { |
| throw new RuntimeException(exception); |
| } |
| } |
| |
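| // Deserializes using this bridge's class loader (via SparkClassLoaderOverride) so that version-specific |
| // classes resolve correctly when running inside Spark. |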
| @Override |
| public <T> T javaDeserialize(ObjectInputStream in, Class<T> type) |
| { |
| try (SparkClassLoaderOverride override = new SparkClassLoaderOverride(in, getClass().getClassLoader())) |
| { |
| return type.cast(in.readObject()); |
| } |
| catch (IOException | ClassNotFoundException exception) |
| { |
| throw new RuntimeException(exception); |
| } |
| } |
| |
| public static String baseFilename(Descriptor descriptor) |
| { |
| // Note: descriptor.baseFilename() includes the directory portion, so return only the base file name without it |
| return descriptor.baseFile().name(); |
| } |
| } |