/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.bridge;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableTombstoneWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.SimpleSnitch;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.CqlType;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.SSTablesSupplier;
import org.apache.cassandra.spark.data.complex.CqlCollection;
import org.apache.cassandra.spark.data.complex.CqlFrozen;
import org.apache.cassandra.spark.data.complex.CqlList;
import org.apache.cassandra.spark.data.complex.CqlMap;
import org.apache.cassandra.spark.data.complex.CqlSet;
import org.apache.cassandra.spark.data.complex.CqlTuple;
import org.apache.cassandra.spark.data.complex.CqlUdt;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.data.types.Ascii;
import org.apache.cassandra.spark.data.types.BigInt;
import org.apache.cassandra.spark.data.types.Blob;
import org.apache.cassandra.spark.data.types.Boolean;
import org.apache.cassandra.spark.data.types.Counter;
import org.apache.cassandra.spark.data.types.Date;
import org.apache.cassandra.spark.data.types.Decimal;
import org.apache.cassandra.spark.data.types.Double;
import org.apache.cassandra.spark.data.types.Duration;
import org.apache.cassandra.spark.data.types.Empty;
import org.apache.cassandra.spark.data.types.Float;
import org.apache.cassandra.spark.data.types.Inet;
import org.apache.cassandra.spark.data.types.Int;
import org.apache.cassandra.spark.data.types.SmallInt;
import org.apache.cassandra.spark.data.types.Text;
import org.apache.cassandra.spark.data.types.Time;
import org.apache.cassandra.spark.data.types.TimeUUID;
import org.apache.cassandra.spark.data.types.Timestamp;
import org.apache.cassandra.spark.data.types.TinyInt;
import org.apache.cassandra.spark.data.types.VarChar;
import org.apache.cassandra.spark.data.types.VarInt;
import org.apache.cassandra.spark.reader.CompactionStreamScanner;
import org.apache.cassandra.spark.reader.IndexEntry;
import org.apache.cassandra.spark.reader.IndexReader;
import org.apache.cassandra.spark.reader.Rid;
import org.apache.cassandra.spark.reader.SchemaBuilder;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.spark.reader.common.IndexIterator;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.spark.stats.Stats;
import org.apache.cassandra.spark.utils.SparkClassLoaderOverride;
import org.apache.cassandra.spark.utils.TimeProvider;
import org.apache.cassandra.tools.JsonTransformer;
import org.apache.cassandra.tools.Util;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@SuppressWarnings("unused")
public class CassandraBridgeImplementation extends CassandraBridge
{
private static volatile boolean setup = false;
private final Map<String, CqlField.NativeType> nativeTypes;
private final Map<Class<?>, Serializer<?>> kryoSerializers;
static
{
CassandraBridgeImplementation.setup();
}
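/**
 * One-time initialization of the embedded Cassandra code in client mode: disables MBean registration,
 * sets Murmur3Partitioner as the default partitioner, tunes a few config values, points the data file
 * directories at a fresh temp directory and marks keyspaces as initialized, so SSTables can be read
 * and written without a running node. Guarded by the volatile setup flag, so repeat calls are no-ops.
 */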
public static synchronized void setup()
{
if (!CassandraBridgeImplementation.setup)
{
// We never want to enable MBean registration in the Cassandra code we use, so disable it here
System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
Config.setClientMode(true);
// When we create a TableStreamScanner, we will set the partitioner directly on the table metadata
// using the supplied IIndexStreamScanner.Partitioner. CFMetaData::compile requires a partitioner to
// be set in DatabaseDescriptor before we can do that though, so we set one here in preparation.
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
DatabaseDescriptor.clientInitialization();
Config config = DatabaseDescriptor.getRawConfig();
config.memtable_flush_writers = 8;
config.diagnostic_events_enabled = false;
config.max_mutation_size_in_kb = config.commitlog_segment_size_in_mb * 1024 / 2;
config.concurrent_compactors = 4;
Path tempDirectory;
try
{
tempDirectory = Files.createTempDirectory(UUID.randomUUID().toString());
}
catch (IOException exception)
{
throw new RuntimeException(exception);
}
config.data_file_directories = new String[]{tempDirectory.toString()};
DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch());
Keyspace.setInitialized();
setup = true;
}
}
public CassandraBridgeImplementation()
{
// Cassandra-version-specific Kryo serializers
kryoSerializers = new LinkedHashMap<>();
kryoSerializers.put(CqlField.class, new CqlField.Serializer(this));
kryoSerializers.put(CqlTable.class, new CqlTable.Serializer(this));
kryoSerializers.put(CqlUdt.class, new CqlUdt.Serializer(this));
nativeTypes = allTypes().stream().collect(Collectors.toMap(CqlField.CqlType::name, Function.identity()));
}
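/**
 * Serializes the given partition key values and returns the key buffer together with the token it
 * hashes to under the supplied partitioner.
 */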
@Override
public AbstractMap.SimpleEntry<ByteBuffer, BigInteger> getPartitionKey(@NotNull CqlTable table,
@NotNull Partitioner partitioner,
@NotNull List<String> keys)
{
Preconditions.checkArgument(table.partitionKeys().size() > 0);
ByteBuffer partitionKey = buildPartitionKey(table, keys);
BigInteger partitionKeyTokenValue = hash(partitioner, partitionKey);
return new AbstractMap.SimpleEntry<>(partitionKey, partitionKeyTokenValue);
}
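/**
 * Serializes the string-encoded partition key values into a single key buffer: a single-column key is
 * converted directly by its column type, while a composite key is assembled from the per-column buffers
 * via CompositeType. For example (illustrative), a table with partition key (a int, b text) and keys
 * ["1", "x"] yields a CompositeType buffer built from the two serialized values.
 */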
@VisibleForTesting
public static ByteBuffer buildPartitionKey(@NotNull CqlTable table, @NotNull List<String> keys)
{
List<AbstractType<?>> partitionKeyColumnTypes = partitionKeyColumnTypes(table);
if (table.partitionKeys().size() == 1)
{
// Single partition key
return partitionKeyColumnTypes.get(0).fromString(keys.get(0));
}
else
{
// Composite partition key
ByteBuffer[] buffers = new ByteBuffer[keys.size()];
for (int index = 0; index < buffers.length; index++)
{
buffers[index] = partitionKeyColumnTypes.get(index).fromString(keys.get(index));
}
return CompositeType.build(ByteBufferAccessor.instance, buffers);
}
}
@VisibleForTesting
public static List<AbstractType<?>> partitionKeyColumnTypes(@NotNull CqlTable table)
{
return table.partitionKeys().stream()
.map(CqlField::type)
.map(type -> (CqlType) type)
.map(type -> type.dataType(true))
.collect(Collectors.toList());
}
@Override
public TimeProvider timeProvider()
{
return FBUtilities::nowInSeconds;
}
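/**
 * Opens all supplied SSTables with the given range, partition-key and column filters applied and wraps
 * them in a CompactionStreamScanner that merges them into a single compacted stream of rows.
 */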
@Override
public StreamScanner<Rid> getCompactionScanner(@NotNull CqlTable table,
@NotNull Partitioner partitioner,
@NotNull SSTablesSupplier ssTables,
@Nullable SparkRangeFilter sparkRangeFilter,
@NotNull Collection<PartitionKeyFilter> partitionKeyFilters,
@Nullable PruneColumnFilter columnFilter,
@NotNull TimeProvider timeProvider,
boolean readIndexOffset,
boolean useIncrementalRepair,
@NotNull Stats stats)
{
// NOTE: Need to use SchemaBuilder to init keyspace if not already set in Cassandra Schema instance
SchemaBuilder schemaBuilder = new SchemaBuilder(table, partitioner);
TableMetadata metadata = schemaBuilder.tableMetaData();
return new CompactionStreamScanner(metadata, partitioner, timeProvider, ssTables.openAll((ssTable, isRepairPrimary) -> {
return org.apache.cassandra.spark.reader.SSTableReader.builder(metadata, ssTable)
.withSparkRangeFilter(sparkRangeFilter)
.withPartitionKeyFilters(partitionKeyFilters)
.withColumnFilter(columnFilter)
.withReadIndexOffset(readIndexOffset)
.withStats(stats)
.useIncrementalRepair(useIncrementalRepair)
.isRepairPrimary(isRepairPrimary)
.build();
}));
}
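/**
 * Builds an IndexIterator that reads the SSTable index files through IndexReader, emitting IndexEntry
 * records so that per-partition sizes can be estimated without scanning the data files.
 */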
@Override
public StreamScanner<IndexEntry> getPartitionSizeIterator(@NotNull CqlTable table,
@NotNull Partitioner partitioner,
@NotNull SSTablesSupplier ssTables,
@Nullable SparkRangeFilter rangeFilter,
@NotNull TimeProvider timeProvider,
@NotNull Stats stats,
@NotNull ExecutorService executor)
{
// NOTE: Need to use SchemaBuilder to init keyspace if not already set in Cassandra Schema instance
SchemaBuilder schemaBuilder = new SchemaBuilder(table, partitioner);
TableMetadata metadata = schemaBuilder.tableMetaData();
return new IndexIterator<>(ssTables, stats, (ssTable, isRepairPrimary, consumer) -> new IndexReader(ssTable, metadata, rangeFilter, stats, consumer));
}
@Override
public CassandraVersion getVersion()
{
return CassandraVersion.FOURZERO;
}
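/**
 * Hashes a serialized partition key to its token: RandomPartitioner tokens are already BigIntegers,
 * while Murmur3Partitioner tokens are longs and are widened to BigInteger.
 */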
@Override
public BigInteger hash(Partitioner partitioner, ByteBuffer key)
{
switch (partitioner)
{
case RandomPartitioner:
return RandomPartitioner.instance.getToken(key).getTokenValue();
case Murmur3Partitioner:
return BigInteger.valueOf((long) Murmur3Partitioner.instance.getToken(key).getTokenValue());
default:
throw new UnsupportedOperationException("Unexpected partitioner: " + partitioner);
}
}
@Override
public UUID getTimeUUID()
{
return UUIDGen.getTimeUUID();
}
@Override
public CqlTable buildSchema(String createStatement,
String keyspace,
ReplicationFactor replicationFactor,
Partitioner partitioner,
Set<String> udts,
@Nullable UUID tableId,
int indexCount)
{
return new SchemaBuilder(createStatement, keyspace, replicationFactor, partitioner, bridge -> udts, tableId, indexCount).build();
}
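/**
 * Returns the identifier unchanged if it is already quoted, otherwise delegates to
 * ColumnIdentifier.maybeQuote to add quotes only when the identifier requires them.
 */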
@Override
public String maybeQuoteIdentifier(String identifier)
{
if (isAlreadyQuoted(identifier))
{
return identifier;
}
return ColumnIdentifier.maybeQuote(identifier);
}
// CQL Type Parser
@Override
public Map<String, ? extends CqlField.NativeType> nativeTypeNames()
{
return nativeTypes;
}
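/**
 * Deserializes a CQL type from a Kryo Input using its InternalType discriminator; collection, frozen
 * and UDT types read their element types recursively.
 */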
@Override
public CqlField.CqlType readType(CqlField.CqlType.InternalType type, Input input)
{
switch (type)
{
case NativeCql:
return nativeType(input.readString());
case Set:
case List:
case Map:
case Tuple:
return CqlCollection.read(type, input, this);
case Frozen:
return CqlFrozen.build(CqlField.CqlType.read(input, this));
case Udt:
return CqlUdt.read(input, this);
default:
throw new IllegalStateException("Unknown CQL type " + type + ", cannot deserialize");
}
}
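// Singleton accessors for the supported native CQL types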
@Override
public Ascii ascii()
{
return Ascii.INSTANCE;
}
@Override
public Blob blob()
{
return Blob.INSTANCE;
}
@Override
public Boolean bool()
{
return Boolean.INSTANCE;
}
@Override
public Counter counter()
{
return Counter.INSTANCE;
}
@Override
public BigInt bigint()
{
return BigInt.INSTANCE;
}
@Override
public Date date()
{
return Date.INSTANCE;
}
@Override
public Decimal decimal()
{
return Decimal.INSTANCE;
}
@Override
public Double aDouble()
{
return Double.INSTANCE;
}
@Override
public Duration duration()
{
return Duration.INSTANCE;
}
@Override
public Empty empty()
{
return Empty.INSTANCE;
}
@Override
public Float aFloat()
{
return Float.INSTANCE;
}
@Override
public Inet inet()
{
return Inet.INSTANCE;
}
@Override
public Int aInt()
{
return Int.INSTANCE;
}
@Override
public SmallInt smallint()
{
return SmallInt.INSTANCE;
}
@Override
public Text text()
{
return Text.INSTANCE;
}
@Override
public Time time()
{
return Time.INSTANCE;
}
@Override
public Timestamp timestamp()
{
return Timestamp.INSTANCE;
}
@Override
public TimeUUID timeuuid()
{
return TimeUUID.INSTANCE;
}
@Override
public TinyInt tinyint()
{
return TinyInt.INSTANCE;
}
@Override
public org.apache.cassandra.spark.data.types.UUID uuid()
{
return org.apache.cassandra.spark.data.types.UUID.INSTANCE;
}
@Override
public VarChar varchar()
{
return VarChar.INSTANCE;
}
@Override
public VarInt varint()
{
return VarInt.INSTANCE;
}
@Override
public CqlField.CqlType collection(String name, CqlField.CqlType... types)
{
return CqlCollection.build(name, types);
}
@Override
public CqlList list(CqlField.CqlType type)
{
return CqlCollection.list(type);
}
@Override
public CqlSet set(CqlField.CqlType type)
{
return CqlCollection.set(type);
}
@Override
public CqlMap map(CqlField.CqlType keyType, CqlField.CqlType valueType)
{
return CqlCollection.map(keyType, valueType);
}
@Override
public CqlTuple tuple(CqlField.CqlType... types)
{
return CqlCollection.tuple(types);
}
@Override
public CqlField.CqlType frozen(CqlField.CqlType type)
{
return CqlFrozen.build(type);
}
@Override
public CqlField.CqlUdtBuilder udt(String keyspace, String name)
{
return CqlUdt.builder(keyspace, name);
}
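/**
 * Writes rows into a new SSTable in the given directory using CQLSSTableWriter: the writer is built from
 * the CREATE TABLE statement plus any UDT create statements, and every invocation of the supplied Writer
 * appends one row via the insert statement (or the update statement when upserting).
 */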
@Override
public synchronized void writeSSTable(Partitioner partitioner,
String keyspace,
String table,
Path directory,
String createStatement,
String insertStatement,
String updateStatement,
boolean upsert,
Set<CqlField.CqlUdt> udts,
Consumer<Writer> writer)
{
CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
.inDirectory(directory.toFile())
.forTable(createStatement)
.withPartitioner(getPartitioner(partitioner))
.using(upsert ? updateStatement : insertStatement)
.withBufferSizeInMB(128);
for (CqlField.CqlUdt udt : udts)
{
// Add user-defined types to CQL writer
String statement = udt.createStatement(this, keyspace);
builder.withType(statement);
}
// TODO: Remove once CQLSSTableWriter.Builder synchronizes on the schema (see CASSANDRA-TBD)
// builder.build() updates the schema, so we need to synchronize here
try (CQLSSTableWriter ssTable = CassandraSchema.apply(s -> builder.build()))
{
writer.accept(values -> {
try
{
ssTable.addRow(values);
}
catch (IOException exception)
{
throw new RuntimeException(exception);
}
});
}
catch (IOException exception)
{
throw new RuntimeException(exception);
}
}
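/**
 * Maps the Spark-side Partitioner enum onto the corresponding Cassandra IPartitioner instance.
 */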
public static IPartitioner getPartitioner(Partitioner partitioner)
{
return partitioner == Partitioner.Murmur3Partitioner ? Murmur3Partitioner.instance : RandomPartitioner.instance;
}
@Override
public SSTableWriter getSSTableWriter(String inDirectory,
String partitioner,
String createStatement,
String insertStatement,
RowBufferMode rowBufferMode,
int bufferSizeMB)
{
return new SSTableWriterImplementation(inDirectory, partitioner, createStatement, insertStatement, rowBufferMode, bufferSizeMB);
}
// Version-Specific Test Utility Methods
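/**
 * Test-only counterpart of writeSSTable that writes tombstones: rows are added through an
 * SSTableTombstoneWriter driven by a DELETE statement instead of an INSERT.
 */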
@Override
@VisibleForTesting
public void writeTombstoneSSTable(Partitioner partitioner,
Path directory,
String createStatement,
String deleteStatement,
Consumer<Writer> consumer)
{
try (SSTableTombstoneWriter writer = SSTableTombstoneWriter.builder()
.inDirectory(directory.toFile())
.forTable(createStatement)
.withPartitioner(getPartitioner(partitioner))
.using(deleteStatement)
.withBufferSizeInMB(128)
.build())
{
consumer.accept(values -> {
try
{
writer.addRow(values);
}
catch (IOException exception)
{
throw new RuntimeException(exception);
}
});
}
catch (IOException exception)
{
throw new RuntimeException(exception);
}
}
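/**
 * Test utility that dumps a Data.db SSTable file as JSON using Cassandra's JsonTransformer, the same
 * transformer used by the sstabledump tool.
 */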
@Override
@VisibleForTesting
public void sstableToJson(Path dataDbFile, OutputStream output) throws FileNotFoundException
{
if (!Files.exists(dataDbFile))
{
throw new FileNotFoundException("Cannot find file " + dataDbFile.toAbsolutePath());
}
if (!Descriptor.isValidFile(dataDbFile.toFile()))
{
throw new RuntimeException("Invalid sstable file");
}
Descriptor desc = Descriptor.fromFilename(dataDbFile.toAbsolutePath().toString());
try
{
TableMetadataRef metadata = TableMetadataRef.forOfflineTools(Util.metadataFromSSTable(desc));
SSTableReader ssTable = SSTableReader.openNoValidation(desc, metadata);
ISSTableScanner currentScanner = ssTable.getScanner();
Stream<UnfilteredRowIterator> partitions = Util.iterToStream(currentScanner);
JsonTransformer.toJson(currentScanner, partitions, false, metadata.get(), output);
}
catch (IOException exception)
{
throw new RuntimeException(exception);
}
}
@Override
@VisibleForTesting
public Object toTupleValue(CqlField.CqlTuple type, Object[] values)
{
return CqlTuple.toTupleValue(getVersion(), (CqlTuple) type, values);
}
@Override
@VisibleForTesting
public Object toUserTypeValue(CqlField.CqlUdt type, Map<String, Object> values)
{
return CqlUdt.toUserTypeValue(getVersion(), (CqlUdt) type, values);
}
// Compression Utils
private static final ICompressor COMPRESSOR = LZ4Compressor.create(Collections.emptyMap());
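/**
 * Compresses the input with LZ4, prefixing the output with the uncompressed length as a 4-byte int so
 * that uncompress(ByteBuffer) can read that header to size its output buffer.
 */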
@Override
public ByteBuffer compress(byte[] bytes) throws IOException
{
ByteBuffer input = COMPRESSOR.preferredBufferType().allocate(bytes.length);
input.put(bytes);
input.flip();
return compress(input);
}
@Override
public ByteBuffer compress(ByteBuffer input) throws IOException
{
int length = input.remaining(); // Store uncompressed length as a 4-byte int
// 4 extra bytes to store uncompressed length
ByteBuffer output = COMPRESSOR.preferredBufferType().allocate(4 + COMPRESSOR.initialCompressedBufferLength(length));
output.putInt(length);
COMPRESSOR.compress(input, output);
output.flip();
return output;
}
@Override
public ByteBuffer uncompress(byte[] bytes) throws IOException
{
ByteBuffer input = COMPRESSOR.preferredBufferType().allocate(bytes.length);
input.put(bytes);
input.flip();
return uncompress(input);
}
@Override
public ByteBuffer uncompress(ByteBuffer input) throws IOException
{
ByteBuffer output = COMPRESSOR.preferredBufferType().allocate(input.getInt());
COMPRESSOR.uncompress(input, output);
output.flip();
return output;
}
// Kryo/Java (De-)Serialization
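/**
 * Registers the version-specific Kryo serializers (CqlField, CqlTable, CqlUdt) configured in the
 * constructor with the given Kryo instance.
 */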
@Override
public void kryoRegister(Kryo kryo)
{
kryoSerializers.forEach(kryo::register);
}
@Override
public void javaSerialize(ObjectOutputStream out, Serializable object)
{
try
{
out.writeObject(object);
}
catch (IOException exception)
{
throw new RuntimeException(exception);
}
}
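/**
 * Reads an object from the stream while SparkClassLoaderOverride temporarily points class resolution at
 * this bridge's class loader, then casts it to the requested type.
 */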
@Override
public <T> T javaDeserialize(ObjectInputStream in, Class<T> type)
{
try (SparkClassLoaderOverride override = new SparkClassLoaderOverride(in, getClass().getClassLoader()))
{
return type.cast(in.readObject());
}
catch (IOException | ClassNotFoundException exception)
{
throw new RuntimeException(exception);
}
}
}