| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.cassandra.spark.data; |
| |
| import java.io.IOException; |
| import java.io.ObjectInputStream; |
| import java.io.ObjectOutputStream; |
| import java.io.Serializable; |
| import java.math.BigInteger; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionStage; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Range; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.esotericsoftware.kryo.Kryo; |
| import com.esotericsoftware.kryo.io.Input; |
| import com.esotericsoftware.kryo.io.Output; |
| import org.apache.cassandra.bridge.BigNumberConfig; |
| import org.apache.cassandra.bridge.BigNumberConfigImpl; |
| import org.apache.cassandra.bridge.CassandraBridge; |
| import org.apache.cassandra.bridge.CassandraBridgeFactory; |
| import org.apache.cassandra.bridge.CassandraVersion; |
| import org.apache.cassandra.clients.ExecutorHolder; |
| import org.apache.cassandra.clients.Sidecar; |
| import org.apache.cassandra.clients.SidecarInstanceImpl; |
| import org.apache.cassandra.clients.SslConfig; |
| import org.apache.cassandra.sidecar.client.SidecarClient; |
| import org.apache.cassandra.sidecar.client.SidecarInstance; |
| import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider; |
| import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException; |
| import org.apache.cassandra.sidecar.common.NodeSettings; |
| import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse; |
| import org.apache.cassandra.sidecar.common.data.RingResponse; |
| import org.apache.cassandra.sidecar.common.data.SchemaResponse; |
| import org.apache.cassandra.spark.cdc.CommitLog; |
| import org.apache.cassandra.spark.cdc.TableIdLookup; |
| import org.apache.cassandra.spark.cdc.watermarker.Watermarker; |
| import org.apache.cassandra.spark.config.SchemaFeature; |
| import org.apache.cassandra.spark.config.SchemaFeatureSet; |
| import org.apache.cassandra.spark.data.partitioner.CassandraInstance; |
| import org.apache.cassandra.spark.data.partitioner.CassandraRing; |
| import org.apache.cassandra.spark.data.partitioner.ConsistencyLevel; |
| import org.apache.cassandra.spark.data.partitioner.Partitioner; |
| import org.apache.cassandra.spark.data.partitioner.TokenPartitioner; |
| import org.apache.cassandra.spark.sparksql.LastModifiedTimestampDecorator; |
| import org.apache.cassandra.spark.sparksql.RowBuilder; |
| import org.apache.cassandra.spark.stats.Stats; |
| import org.apache.cassandra.spark.utils.CqlUtils; |
| import org.apache.cassandra.spark.utils.MapUtils; |
| import org.apache.cassandra.spark.utils.ScalaFunctions; |
| import org.apache.cassandra.spark.utils.ThrowableUtils; |
| import org.apache.spark.sql.types.DataType; |
| import org.apache.spark.util.ShutdownHookManager; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.cassandra.spark.utils.Properties.NODE_STATUS_NOT_CONSIDERED; |
| |
/**
 * DataLayer implementation for the Cassandra Bulk Reader: discovers the cluster through the
 * Sidecar, snapshots the table, builds the CQL schema and token mapping, and serves SSTable
 * listings to Spark partitions. Serializable so it can be shipped to Spark executors
 * (both JDK and Kryo serialization paths are implemented below).
 */
public class CassandraDataLayer extends PartitionedDataLayer implements Serializable
{
    private static final long serialVersionUID = -9038926850642710787L;

    public static final Logger LOGGER = LoggerFactory.getLogger(CassandraDataLayer.class);
    // Caches snapshot file listings per (dc, instance, port, keyspace, table, snapshot) key so
    // multiple Spark partitions reading the same instance reuse a single Sidecar list call.
    private static final Cache<String, CompletableFuture<List<SSTable>>> SNAPSHOT_CACHE =
    CacheBuilder.newBuilder()
                .expireAfterAccess(15, TimeUnit.MINUTES)
                .maximumSize(128)
                .build();

    protected String snapshotName;
    protected String keyspace;
    protected String table;
    protected CassandraBridge bridge;                  // version-specific Cassandra bridge, set during init/deserialization
    protected Set<? extends SidecarInstance> clusterConfig;
    protected TokenPartitioner tokenPartitioner;
    protected Map<String, AvailabilityHint> availabilityHints;  // per-fqdn availability captured at snapshot time
    protected Sidecar.ClientConfig sidecarClientConfig;
    private SslConfig sslConfig;
    protected Map<String, BigNumberConfigImpl> bigNumberConfigMap;  // per-field decimal precision/scale overrides
    protected boolean enableStats;
    protected boolean readIndexOffset;
    protected boolean useIncrementalRepair;
    protected List<SchemaFeature> requestedFeatures;
    protected Map<String, ReplicationFactor> rfMap;    // replication factor per keyspace
    @Nullable
    protected String lastModifiedTimestampField;
    // volatile in order to publish the reference for visibility
    protected volatile CqlTable cqlTable;
    protected transient SidecarClient sidecar;         // rebuilt on each host via initInstanceMap()
    @VisibleForTesting
    transient Map<String, SidecarInstance> instanceMap;
| |
    /**
     * Builds a data layer from user-supplied reader options. Cluster discovery, snapshot creation
     * and schema loading are deferred to {@link #initialize(ClientConfig)}.
     *
     * @param options             user-facing reader options (keyspace, table, snapshot name, features)
     * @param sidecarClientConfig connection settings for the Sidecar client
     * @param sslConfig           optional SSL settings, or null for a plaintext connection
     */
    public CassandraDataLayer(@NotNull ClientConfig options,
                              @NotNull Sidecar.ClientConfig sidecarClientConfig,
                              @Nullable SslConfig sslConfig)
    {
        super(options.consistencyLevel(), options.datacenter());
        this.snapshotName = options.snapshotName();
        this.keyspace = options.keyspace();
        this.table = CqlUtils.cleanTableName(options.table());  // normalize user-supplied table name
        this.sidecarClientConfig = sidecarClientConfig;
        this.sslConfig = sslConfig;
        this.bigNumberConfigMap = options.bigNumberConfigMap();
        this.enableStats = options.enableStats();
        this.readIndexOffset = options.readIndexOffset();
        this.useIncrementalRepair = options.useIncrementalRepair();
        this.lastModifiedTimestampField = options.lastModifiedTimestampField();
        this.requestedFeatures = options.requestedFeatures();
    }
| |
| // For serialization |
| @VisibleForTesting |
| // CHECKSTYLE IGNORE: Constructor with many parameters |
    // For serialization. NOTE: parameter order is a contract with Serializer.read(...) below —
    // keep the two in sync when adding fields.
    @VisibleForTesting
    // CHECKSTYLE IGNORE: Constructor with many parameters
    CassandraDataLayer(@Nullable String keyspace,
                       @Nullable String table,
                       @NotNull String snapshotName,
                       @Nullable String datacenter,
                       @NotNull Sidecar.ClientConfig sidecarClientConfig,
                       @Nullable SslConfig sslConfig,
                       @NotNull CqlTable cqlTable,
                       @NotNull TokenPartitioner tokenPartitioner,
                       @NotNull CassandraVersion version,
                       @NotNull ConsistencyLevel consistencyLevel,
                       @NotNull Set<SidecarInstanceImpl> clusterConfig,
                       @NotNull Map<String, PartitionedDataLayer.AvailabilityHint> availabilityHints,
                       @NotNull Map<String, BigNumberConfigImpl> bigNumberConfigMap,
                       boolean enableStats,
                       boolean readIndexOffset,
                       boolean useIncrementalRepair,
                       @Nullable String lastModifiedTimestampField,
                       List<SchemaFeature> requestedFeatures,
                       @NotNull Map<String, ReplicationFactor> rfMap)
    {
        super(consistencyLevel, datacenter);
        this.snapshotName = snapshotName;
        this.keyspace = keyspace;
        this.table = table;
        this.cqlTable = cqlTable;
        this.tokenPartitioner = tokenPartitioner;
        this.bridge = CassandraBridgeFactory.get(version);  // bridge is transient w.r.t. version; rebuilt from the enum
        this.clusterConfig = clusterConfig;
        this.availabilityHints = availabilityHints;
        this.sidecarClientConfig = sidecarClientConfig;
        this.sslConfig = sslConfig;
        this.bigNumberConfigMap = bigNumberConfigMap;
        this.enableStats = enableStats;
        this.readIndexOffset = readIndexOffset;
        this.useIncrementalRepair = useIncrementalRepair;
        this.lastModifiedTimestampField = lastModifiedTimestampField;
        this.requestedFeatures = requestedFeatures;
        if (lastModifiedTimestampField != null)
        {
            // Re-apply the LMT column alias, which is not itself serialized
            aliasLastModifiedTimestamp(this.requestedFeatures, this.lastModifiedTimestampField);
        }
        this.rfMap = rfMap;
        initInstanceMap();  // rebuild transient instanceMap and Sidecar client on this host
    }
| |
    /**
     * Driver-side initialization: discovers the cluster, fires the ring/node-settings requests,
     * and delegates to {@link #initBulkReader} to snapshot the table and build schema and
     * token mappings. Must be called before the data layer is used for reading.
     *
     * @param options user-facing reader options
     */
    public void initialize(@NotNull ClientConfig options)
    {
        dialHome(options);

        LOGGER.info("Starting Cassandra Spark job snapshotName={} keyspace={} table={} dc={}",
                    snapshotName, keyspace, table, datacenter);

        // Load cluster config from Discovery
        clusterConfig = initializeClusterConfig(options);
        initInstanceMap();

        // Get cluster info from CassandraManager
        int effectiveNumberOfCores;
        CompletableFuture<RingResponse> ringFuture = sidecar.ring(keyspace);
        try
        {
            CompletableFuture<NodeSettings> nodeSettingsFuture = nodeSettingsFuture(clusterConfig, ringFuture);
            effectiveNumberOfCores = initBulkReader(options, nodeSettingsFuture, ringFuture);
        }
        catch (InterruptedException exception)
        {
            // Restore the interrupt flag before propagating so callers can still observe it
            Thread.currentThread().interrupt();
            throw new RuntimeException(exception);
        }
        catch (ExecutionException exception)
        {
            // Unwrap to the root cause so the user sees the real failure, not the future wrapper
            throw new RuntimeException(ThrowableUtils.rootCause(exception));
        }
        LOGGER.info("Initialized Cassandra Bulk Reader with effectiveNumberOfCores={}", effectiveNumberOfCores);
    }
| |
    /**
     * Core bulk-reader setup: optionally snapshots the table on every (in-DC) instance, registers a
     * shutdown hook for cleanup, fetches the CQL schema, builds the version-specific bridge/table
     * metadata, and constructs the token partitioner.
     *
     * @return the effective number of cores used to size the token partitioner
     * @throws ExecutionException   if any of the Sidecar futures fails
     * @throws InterruptedException if interrupted while waiting on the futures
     */
    private int initBulkReader(@NotNull ClientConfig options,
                               CompletableFuture<NodeSettings> nodeSettingsFuture,
                               CompletableFuture<RingResponse> ringFuture) throws ExecutionException, InterruptedException
    {
        Preconditions.checkArgument(keyspace != null, "Keyspace must be non-null for Cassandra Bulk Reader");
        Preconditions.checkArgument(table != null, "Table must be non-null for Cassandra Bulk Reader");
        CompletableFuture<Map<String, PartitionedDataLayer.AvailabilityHint>> snapshotFuture;
        if (options.createSnapshot())
        {
            // Use create snapshot request to capture instance availability hint
            LOGGER.info("Creating snapshot snapshotName={} keyspace={} table={} dc={}",
                        snapshotName, keyspace, table, datacenter);
            snapshotFuture = ringFuture.thenCompose(this::createSnapshot);
        }
        else
        {
            // No snapshot requested: proceed with no availability hints (all instances UNKNOWN)
            snapshotFuture = CompletableFuture.completedFuture(new HashMap<>());
        }
        // Clean up the snapshot and close the Sidecar client when the Spark job shuts down
        ShutdownHookManager.addShutdownHook(org.apache.spark.util.ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY(),
                                            ScalaFunctions.wrapLambda(() -> shutdownHook(options)));

        CompletableFuture<SchemaResponse> schemaFuture = sidecar.schema(keyspace);
        NodeSettings nodeSettings = nodeSettingsFuture.get();

        String cassandraVersion = getEffectiveCassandraVersionForRead(clusterConfig, nodeSettings);

        Partitioner partitioner = Partitioner.from(nodeSettings.partitioner());
        bridge = CassandraBridgeFactory.get(cassandraVersion);
        availabilityHints = snapshotFuture.get();

        // Extract the table schema, index count, UDTs and replication factor from the full keyspace schema
        String fullSchema = schemaFuture.get().schema();
        String createStmt = CqlUtils.extractTableSchema(fullSchema, keyspace, table);
        int indexCount = CqlUtils.extractIndexCount(fullSchema, keyspace, table);
        Set<String> udts = CqlUtils.extractUdts(fullSchema, keyspace);
        ReplicationFactor replicationFactor = CqlUtils.extractReplicationFactor(fullSchema, keyspace);
        rfMap = ImmutableMap.of(keyspace, replicationFactor);
        // Sizing may be expensive, so compute it concurrently with schema building below
        CompletableFuture<Integer> sizingFuture = CompletableFuture.supplyAsync(
                () -> getSizing(clusterConfig, replicationFactor, options).getEffectiveNumberOfCores(),
                ExecutorHolder.EXECUTOR_SERVICE);
        validateReplicationFactor(replicationFactor);
        udts.forEach(udt -> LOGGER.info("Adding schema UDT: '{}'", udt));

        cqlTable = bridge().buildSchema(createStmt, keyspace, replicationFactor, partitioner, udts, null, indexCount);
        CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor, ringFuture.get());

        int effectiveNumberOfCores = sizingFuture.get();
        tokenPartitioner = new TokenPartitioner(ring, options.defaultParallelism, effectiveNumberOfCores);
        return effectiveNumberOfCores;
    }
| |
| protected void shutdownHook(ClientConfig options) |
| { |
| // Preserves previous behavior, but we may just want to check for the clearSnapshot option in the future |
| if (options.clearSnapshot()) |
| { |
| if (options.createSnapshot()) |
| { |
| clearSnapshot(clusterConfig, options); |
| } |
| else |
| { |
| LOGGER.warn("Skipping clearing snapshot because it was not created by this job. " |
| + "Only the job that created the snapshot can clear it. " |
| + "snapshotName={} keyspace={} table={} dc={}", |
| snapshotName, keyspace, table, datacenter); |
| } |
| } |
| |
| try |
| { |
| sidecar.close(); |
| } |
| catch (Exception exception) |
| { |
| LOGGER.warn("Unable to close Sidecar", exception); |
| } |
| } |
| |
    /**
     * Fans out create-snapshot requests to every ring instance (restricted to this job's datacenter
     * when one is set) and collects a per-fqdn availability hint. Instances that are joining/down
     * are skipped; instances whose snapshot request exhausts retries are marked DOWN; unexpected
     * failures are marked UNKNOWN. The returned stage always completes with the hint map —
     * individual failures never fail the whole snapshot phase.
     */
    private CompletionStage<Map<String, AvailabilityHint>> createSnapshot(RingResponse ring)
    {
        // ConcurrentHashMap because completion callbacks run on Sidecar client threads
        Map<String, PartitionedDataLayer.AvailabilityHint> availabilityHints = new ConcurrentHashMap<>(ring.size());

        // Fire off create snapshot request across the entire cluster
        List<CompletableFuture<Void>> futures = ring
                .stream()
                .filter(ringEntry -> datacenter == null || datacenter.equals(ringEntry.datacenter()))
                .map(ringEntry -> {
                    PartitionedDataLayer.AvailabilityHint hint =
                            PartitionedDataLayer.AvailabilityHint.fromState(ringEntry.status(), ringEntry.state());

                    CompletableFuture<PartitionedDataLayer.AvailabilityHint> createSnapshotFuture;
                    if (NODE_STATUS_NOT_CONSIDERED.contains(ringEntry.state()))
                    {
                        // Don't ask joining/down nodes for a snapshot; keep the state-derived hint
                        LOGGER.warn("Skip snapshot creating when node is joining or down "
                                    + "snapshotName={} keyspace={} table={} datacenter={} fqdn={} status={} state={}",
                                    snapshotName, keyspace, table, datacenter, ringEntry.fqdn(), ringEntry.status(), ringEntry.state());
                        createSnapshotFuture = CompletableFuture.completedFuture(hint);
                    }
                    else
                    {
                        LOGGER.info("Creating snapshot on instance snapshotName={} keyspace={} table={} datacenter={} fqdn={}",
                                    snapshotName, keyspace, table, datacenter, ringEntry.fqdn());
                        SidecarInstance sidecarInstance = new SidecarInstanceImpl(ringEntry.fqdn(), sidecarClientConfig.port());
                        createSnapshotFuture = sidecar
                                .createSnapshot(sidecarInstance, keyspace, table, snapshotName)
                                .handle((resp, throwable) -> {
                                    if (throwable == null)
                                    {
                                        // Create snapshot succeeded
                                        return hint;
                                    }

                                    if (isExhausted(throwable))
                                    {
                                        // All retries failed: treat the instance as DOWN for replica selection
                                        LOGGER.warn("Failed to create snapshot on instance", throwable);
                                        return PartitionedDataLayer.AvailabilityHint.DOWN;
                                    }

                                    LOGGER.error("Unexpected error creating snapshot on instance", throwable);
                                    return PartitionedDataLayer.AvailabilityHint.UNKNOWN;
                                });
                    }

                    return createSnapshotFuture
                           .thenAccept(h -> availabilityHints.put(ringEntry.fqdn(), h));
                })
                .collect(Collectors.toList());

        return CompletableFuture
               .allOf(futures.toArray(new CompletableFuture[0]))
               // handle(...) (not thenApply) so the hint map is returned even if some future failed
               .handle((results, throwable) -> availabilityHints);
    }
| |
| private boolean isExhausted(@Nullable Throwable throwable) |
| { |
| return throwable != null && (throwable instanceof RetriesExhaustedException || isExhausted(throwable.getCause())); |
| } |
| |
    /** @return whether the cluster uses incremental repair, as configured in the reader options */
    @Override
    public boolean useIncrementalRepair()
    {
        return useIncrementalRepair;
    }
| |
    /** @return whether the reader should read SSTable index offsets, as configured in the reader options */
    @Override
    public boolean readIndexOffset()
    {
        return readIndexOffset;
    }
| |
    /**
     * Rebuilds the transient hostname-to-instance map and the Sidecar client from the current
     * cluster config. Called after construction/deserialization and whenever cluster config changes.
     *
     * @throws RuntimeException if the Sidecar client cannot be built
     */
    protected void initInstanceMap()
    {
        instanceMap = clusterConfig.stream().collect(Collectors.toMap(SidecarInstance::hostname, Function.identity()));
        try
        {
            sidecar = Sidecar.from(new SimpleSidecarInstancesProvider(new ArrayList<>(clusterConfig)),
                                   sidecarClientConfig,
                                   sslConfig);
        }
        catch (IOException ioException)
        {
            throw new RuntimeException("Unable to build sidecar client", ioException);
        }
        LOGGER.info("Initialized CassandraDataLayer instanceMap numInstances={}", instanceMap.size());
    }
| |
    /** @return the version-specific Cassandra bridge set during initialization/deserialization */
    @Override
    public CassandraBridge bridge()
    {
        return bridge;
    }
| |
    /** @return a no-op stats sink; this data layer does not publish reader statistics */
    @Override
    public Stats stats()
    {
        return Stats.DoNothingStats.INSTANCE;
    }
| |
    /** @return the schema features requested via the reader options */
    @Override
    public List<SchemaFeature> requestedFeatures()
    {
        return requestedFeatures;
    }
| |
    /** @return the Cassandra ring backing the token partitioner */
    @Override
    public CassandraRing ring()
    {
        return tokenPartitioner.ring();
    }
| |
    /** @return the token partitioner built during {@link #initialize(ClientConfig)} */
    @Override
    public TokenPartitioner tokenPartitioner()
    {
        return tokenPartitioner;
    }
| |
    /** @return the shared executor used for async work in this data layer */
    @Override
    protected ExecutorService executorService()
    {
        return ExecutorHolder.EXECUTOR_SERVICE;
    }
| |
    /** @return null — this data layer does not assign a job identifier */
    @Override
    public String jobId()
    {
        return null;
    }
| |
| @Override |
| public CqlTable cqlTable() |
| { |
| if (cqlTable == null) |
| { |
| throw new RuntimeException("Schema not initialized"); |
| } |
| return cqlTable; |
| } |
| |
    /** CDC is not supported by this data layer. */
    @Override
    public Watermarker cdcWatermarker()
    {
        throw new UnsupportedOperationException();
    }
| |
    /** CDC is not supported by this data layer. */
    @Override
    public Duration cdcWatermarkWindow()
    {
        throw new UnsupportedOperationException();
    }
| |
    /** CDC commit-log listing is not supported by this data layer. */
    @Override
    public CompletableFuture<List<CommitLog>> listCommitLogs(CassandraInstance instance)
    {
        throw new UnsupportedOperationException();
    }
| |
    /**
     * @param keyspace the keyspace name
     * @return the replication factor for the keyspace, or null if unknown to this job
     */
    @Override
    public ReplicationFactor replicationFactor(String keyspace)
    {
        return rfMap.get(keyspace);
    }
| |
    /** Table-id lookup (a CDC facility) is not supported by this data layer. */
    @Override
    public TableIdLookup tableIdLookup()
    {
        throw new UnsupportedOperationException();
    }
| |
| @Override |
| protected PartitionedDataLayer.AvailabilityHint getAvailability(CassandraInstance instance) |
| { |
| // Hint CassandraInstance availability to parent PartitionedDataLayer |
| PartitionedDataLayer.AvailabilityHint hint = availabilityHints.get(instance.nodeName()); |
| return hint != null ? hint : PartitionedDataLayer.AvailabilityHint.UNKNOWN; |
| } |
| |
| private String snapshotKey(SidecarInstance instance) |
| { |
| return String.format("%s/%s/%d/%s/%s/%s", |
| datacenter, instance.hostname(), instance.port(), keyspace, table, snapshotName); |
| } |
| |
    /**
     * Lists the snapshot SSTables available on one Cassandra instance. Listings are memoized in
     * SNAPSHOT_CACHE keyed per instance/snapshot, so concurrent Spark partitions share one Sidecar
     * request. NOTE: the token range is used only for logging — token filtering is not supported
     * by the list-snapshot endpoint.
     *
     * @throws IllegalStateException if the Cassandra instance is not in the known instance map
     */
    @Override
    public CompletableFuture<Stream<SSTable>> listInstance(int partitionId,
                                                           @NotNull Range<BigInteger> range,
                                                           @NotNull CassandraInstance instance)
    {
        SidecarInstance sidecarInstance = instanceMap.get(instance.nodeName());
        if (sidecarInstance == null)
        {
            throw new IllegalStateException("Could not find matching cassandra instance: " + instance.nodeName());
        }
        String key = snapshotKey(sidecarInstance);  // NOTE: We don't currently support token filtering in list snapshot
        LOGGER.info("Listing snapshot partition={} lowerBound={} upperBound={} "
                    + "instance={} port={} keyspace={} tableName={} snapshotName={}",
                    partitionId, range.lowerEndpoint(), range.upperEndpoint(),
                    sidecarInstance.hostname(), sidecarInstance.port(), keyspace, table, snapshotName);
        try
        {
            // The loader below runs only on a cache miss; hits reuse the in-flight/completed future
            return SNAPSHOT_CACHE.get(key, () -> {
                LOGGER.info("Listing instance snapshot partition={} lowerBound={} upperBound={} "
                            + "instance={} port={} keyspace={} tableName={} snapshotName={} cacheKey={}",
                            partitionId, range.lowerEndpoint(), range.upperEndpoint(),
                            sidecarInstance.hostname(), sidecarInstance.port(), keyspace, table, snapshotName, key);
                return sidecar.listSnapshotFiles(sidecarInstance, keyspace, table, snapshotName)
                              .thenApply(response -> collectSSTableList(sidecarInstance, response, partitionId));
            }).thenApply(Collection::stream);
        }
        catch (ExecutionException exception)
        {
            // Surface the root cause asynchronously rather than throwing from this method
            CompletableFuture<Stream<SSTable>> future = new CompletableFuture<>();
            future.completeExceptionally(ThrowableUtils.rootCause(exception));
            return future;
        }
    }
| |
    /**
     * Groups the flat snapshot file listing into SSTables. File names are split at the last '-'
     * into an SSTable base name and a component extension; files with no '-' (e.g. manifest.json)
     * and unknown component types are skipped. Each grouped component map becomes one
     * {@link SidecarProvisionedSSTable}.
     *
     * @throws IncompleteSSTableException if the response or its file list is missing
     */
    @SuppressWarnings("UnstableApiUsage")
    private List<SSTable> collectSSTableList(SidecarInstance sidecarInstance,
                                             ListSnapshotFilesResponse response,
                                             int partitionId)
    {
        if (response == null)
        {
            throw new IncompleteSSTableException();
        }
        List<ListSnapshotFilesResponse.FileInfo> snapshotFilesInfo = response.snapshotFilesInfo();
        if (snapshotFilesInfo == null)
        {
            throw new IncompleteSSTableException();
        }

        // Group SSTable components together
        Map<String, Map<FileType, ListSnapshotFilesResponse.FileInfo>> result = new LinkedHashMap<>(1024);
        for (ListSnapshotFilesResponse.FileInfo file : snapshotFilesInfo)
        {
            String fileName = file.fileName;
            int lastIndexOfDash = fileName.lastIndexOf('-');
            if (lastIndexOfDash < 0)
            {
                // E.g. dd manifest.json file
                continue;
            }
            String ssTableName = fileName.substring(0, lastIndexOfDash);
            try
            {
                FileType fileType = FileType.fromExtension(fileName.substring(lastIndexOfDash + 1));
                result.computeIfAbsent(ssTableName, k -> new LinkedHashMap<>())
                      .put(fileType, file);
            }
            catch (IllegalArgumentException ignore)
            {
                // Ignore unknown SSTable component types
            }
        }

        // Map to SSTable
        return result.values().stream()
                     .map(components -> new SidecarProvisionedSSTable(sidecar,
                                                                      sidecarClientConfig,
                                                                      sidecarInstance,
                                                                      keyspace,
                                                                      table,
                                                                      snapshotName,
                                                                      components,
                                                                      partitionId,
                                                                      stats()))
                     .collect(Collectors.toList());
    }
| |
    /** Hash over the same identity fields compared in {@link #equals(Object)}. */
    @Override
    public int hashCode()
    {
        return Objects.hash(super.hashCode(), cqlTable, snapshotName, keyspace, table, version());
    }
| |
    /**
     * Equality over job identity: table schema, snapshot name, keyspace/table, Cassandra version,
     * plus whatever the parent class compares. Transient client state is intentionally excluded.
     */
    @Override
    public boolean equals(Object other)
    {
        if (this == other)
        {
            return true;
        }
        if (other == null || this.getClass() != other.getClass() || !super.equals(other))
        {
            return false;
        }

        CassandraDataLayer that = (CassandraDataLayer) other;
        return cqlTable.equals(that.cqlTable)
            && snapshotName.equals(that.snapshotName)
            && keyspace.equals(that.keyspace)
            && table.equals(that.table)
            && version().equals(that.version());
    }
| |
    /** @return the per-field decimal precision/scale overrides keyed by field name */
    public Map<String, BigNumberConfigImpl> bigNumberConfigMap()
    {
        return bigNumberConfigMap;
    }
| |
| @Override |
| public BigNumberConfig bigNumberConfig(CqlField field) |
| { |
| BigNumberConfigImpl config = bigNumberConfigMap.get(field.name()); |
| return config != null ? config : BigNumberConfig.DEFAULT; |
| } |
| |
| /* Internal Cassandra SSTable */ |
| |
| @VisibleForTesting |
| public CassandraRing createCassandraRingFromRing(Partitioner partitioner, |
| ReplicationFactor replicationFactor, |
| RingResponse ring) |
| { |
| Collection<CassandraInstance> instances = ring |
| .stream() |
| .map(status -> new CassandraInstance(status.token(), status.fqdn(), status.datacenter())) |
| .collect(Collectors.toList()); |
| return new CassandraRing(partitioner, keyspace, replicationFactor, instances); |
| } |
| |
| // JDK Serialization |
| |
    /**
     * JDK deserialization fallback (Kryo is the preferred path). Field read order MUST mirror
     * {@link #writeObject(ObjectOutputStream)} exactly; update both together.
     */
    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
    {
        LOGGER.warn("Falling back to JDK deserialization");
        this.bridge = CassandraBridgeFactory.get(CassandraVersion.valueOf(in.readUTF()));
        this.snapshotName = in.readUTF();
        this.keyspace = readNullable(in);
        this.table = readNullable(in);
        // Argument order matches the field write order in writeObject
        this.sidecarClientConfig = Sidecar.ClientConfig.create(in.readInt(),
                                                               in.readInt(),
                                                               in.readLong(),
                                                               in.readLong(),
                                                               in.readLong(),
                                                               in.readLong(),
                                                               in.readInt(),
                                                               in.readInt(),
                                                               (Map<FileType, Long>) in.readObject(),
                                                               (Map<FileType, Long>) in.readObject());
        this.sslConfig = (SslConfig) in.readObject();

        this.cqlTable = bridge.javaDeserialize(in, CqlTable.class);  // Delegate (de-)serialization of version-specific objects to the Cassandra Bridge
        this.tokenPartitioner = (TokenPartitioner) in.readObject();
        this.clusterConfig = (Set<SidecarInstanceImpl>) in.readObject();
        this.availabilityHints = (Map<String, AvailabilityHint>) in.readObject();
        this.bigNumberConfigMap = (Map<String, BigNumberConfigImpl>) in.readObject();
        this.enableStats = in.readBoolean();
        this.readIndexOffset = in.readBoolean();
        this.useIncrementalRepair = in.readBoolean();
        this.lastModifiedTimestampField = readNullable(in);
        // Requested features are serialized as a count followed by each feature's option name
        int features = in.readShort();
        List<SchemaFeature> requestedFeatures = new ArrayList<>(features);
        for (int feature = 0; feature < features; feature++)
        {
            String featureName = in.readUTF();
            requestedFeatures.add(SchemaFeatureSet.valueOf(featureName.toUpperCase()));
        }
        this.requestedFeatures = requestedFeatures;
        // Has alias for last modified timestamp
        if (this.lastModifiedTimestampField != null)
        {
            aliasLastModifiedTimestamp(this.requestedFeatures, this.lastModifiedTimestampField);
        }
        this.rfMap = (Map<String, ReplicationFactor>) in.readObject();
        this.initInstanceMap();  // rebuild transient instanceMap and Sidecar client
    }
| |
    /**
     * JDK serialization fallback (Kryo is the preferred path). Field write order MUST mirror
     * {@link #readObject(ObjectInputStream)} exactly; update both together.
     * Transient client state (sidecar, instanceMap) is intentionally not written.
     */
    private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException
    {
        LOGGER.warn("Falling back to JDK serialization");
        out.writeUTF(this.version().name());
        out.writeUTF(this.snapshotName);
        writeNullable(out, this.keyspace);
        writeNullable(out, this.table);
        // Sidecar client config is flattened field-by-field; readObject reassembles it via create(...)
        out.writeInt(this.sidecarClientConfig.port());
        out.writeInt(this.sidecarClientConfig.maxRetries());
        out.writeLong(this.sidecarClientConfig.millisToSleep());
        out.writeLong(this.sidecarClientConfig.maxMillisToSleep());
        out.writeLong(this.sidecarClientConfig.maxBufferSize());
        out.writeLong(this.sidecarClientConfig.chunkBufferSize());
        out.writeInt(this.sidecarClientConfig.maxPoolSize());
        out.writeInt(this.sidecarClientConfig.timeoutSeconds());
        out.writeObject(this.sidecarClientConfig.maxBufferOverride());
        out.writeObject(this.sidecarClientConfig.chunkBufferOverride());
        out.writeObject(this.sslConfig);
        bridge.javaSerialize(out, this.cqlTable);  // Delegate (de-)serialization of version-specific objects to the Cassandra Bridge
        out.writeObject(this.tokenPartitioner);
        out.writeObject(this.clusterConfig);
        out.writeObject(this.availabilityHints);
        out.writeObject(this.bigNumberConfigMap);
        out.writeBoolean(this.enableStats);
        out.writeBoolean(this.readIndexOffset);
        out.writeBoolean(this.useIncrementalRepair);
        // If lastModifiedTimestampField exist, it aliases the LMT field
        writeNullable(out, this.lastModifiedTimestampField);
        // Write the list of requested features: first write the size, then write the feature names
        out.writeShort(this.requestedFeatures.size());
        for (SchemaFeature feature : requestedFeatures)
        {
            out.writeUTF(feature.optionName());
        }
        out.writeObject(this.rfMap);
    }
| |
| private static void writeNullable(ObjectOutputStream out, @Nullable String string) throws IOException |
| { |
| if (string == null) |
| { |
| out.writeBoolean(false); |
| } |
| else |
| { |
| out.writeBoolean(true); |
| out.writeUTF(string); |
| } |
| } |
| |
| @Nullable |
| private static String readNullable(ObjectInputStream in) throws IOException |
| { |
| if (in.readBoolean()) |
| { |
| return in.readUTF(); |
| } |
| return null; |
| } |
| |
| // Kryo Serialization |
| |
    /**
     * Kryo serializer for CassandraDataLayer — the preferred serialization path for shipping the
     * data layer to Spark executors. The field order in {@link #write} MUST match the argument
     * order of the deserialization constructor used by {@link #read}; update both together.
     */
    public static class Serializer extends com.esotericsoftware.kryo.Serializer<CassandraDataLayer>
    {
        @Override
        public void write(Kryo kryo, Output out, CassandraDataLayer dataLayer)
        {
            LOGGER.info("Serializing CassandraDataLayer with Kryo");
            out.writeString(dataLayer.keyspace);
            out.writeString(dataLayer.table);
            out.writeString(dataLayer.snapshotName);
            out.writeString(dataLayer.datacenter);
            // Sidecar client config is flattened field-by-field; read(...) reassembles via create(...)
            out.writeInt(dataLayer.sidecarClientConfig.port());
            out.writeInt(dataLayer.sidecarClientConfig.maxRetries());
            out.writeLong(dataLayer.sidecarClientConfig.millisToSleep());
            out.writeLong(dataLayer.sidecarClientConfig.maxMillisToSleep());
            out.writeLong(dataLayer.sidecarClientConfig.maxBufferSize());
            out.writeLong(dataLayer.sidecarClientConfig.chunkBufferSize());
            out.writeInt(dataLayer.sidecarClientConfig.maxPoolSize());
            out.writeInt(dataLayer.sidecarClientConfig.timeoutSeconds());
            kryo.writeObject(out, dataLayer.sidecarClientConfig.maxBufferOverride());
            kryo.writeObject(out, dataLayer.sidecarClientConfig.chunkBufferOverride());
            kryo.writeObjectOrNull(out, dataLayer.sslConfig, SslConfig.class);
            kryo.writeObject(out, dataLayer.cqlTable);
            kryo.writeObject(out, dataLayer.tokenPartitioner);
            kryo.writeObject(out, dataLayer.version());
            kryo.writeObject(out, dataLayer.consistencyLevel);
            kryo.writeObject(out, dataLayer.clusterConfig);
            kryo.writeObject(out, dataLayer.availabilityHints);
            out.writeBoolean(dataLayer.bigNumberConfigMap.isEmpty());  // Kryo fails to deserialize bigNumberConfigMap map if empty
            if (!dataLayer.bigNumberConfigMap.isEmpty())
            {
                kryo.writeObject(out, dataLayer.bigNumberConfigMap);
            }
            out.writeBoolean(dataLayer.enableStats);
            out.writeBoolean(dataLayer.readIndexOffset);
            out.writeBoolean(dataLayer.useIncrementalRepair);
            // If lastModifiedTimestampField exist, it aliases the LMT field
            out.writeString(dataLayer.lastModifiedTimestampField);
            // Write the list of requested features: first write the size, then write the feature names
            SchemaFeaturesListWrapper listWrapper = new SchemaFeaturesListWrapper();
            listWrapper.requestedFeatureNames = dataLayer.requestedFeatures.stream()
                                                                           .map(SchemaFeature::optionName)
                                                                           .collect(Collectors.toList());
            kryo.writeObject(out, listWrapper);
            kryo.writeObject(out, dataLayer.rfMap);
        }

        @SuppressWarnings("unchecked")
        @Override
        public CassandraDataLayer read(Kryo kryo, Input in, Class<CassandraDataLayer> type)
        {
            LOGGER.info("Deserializing CassandraDataLayer with Kryo");
            // Reads must occur in exactly the order fields were written above
            return new CassandraDataLayer(
                    in.readString(),
                    in.readString(),
                    in.readString(),
                    in.readString(),
                    Sidecar.ClientConfig.create(in.readInt(),
                                                in.readInt(),
                                                in.readLong(),
                                                in.readLong(),
                                                in.readLong(),
                                                in.readLong(),
                                                in.readInt(),
                                                in.readInt(),
                                                (Map<FileType, Long>) kryo.readObject(in, HashMap.class),
                                                (Map<FileType, Long>) kryo.readObject(in, HashMap.class)),
                    kryo.readObjectOrNull(in, SslConfig.class),
                    kryo.readObject(in, CqlTable.class),
                    kryo.readObject(in, TokenPartitioner.class),
                    kryo.readObject(in, CassandraVersion.class),
                    kryo.readObject(in, ConsistencyLevel.class),
                    kryo.readObject(in, HashSet.class),
                    (Map<String, PartitionedDataLayer.AvailabilityHint>) kryo.readObject(in, HashMap.class),
                    // The boolean guards the empty-map case that Kryo cannot round-trip (see write)
                    in.readBoolean() ? Collections.emptyMap()
                                     : (Map<String, BigNumberConfigImpl>) kryo.readObject(in, HashMap.class),
                    in.readBoolean(),
                    in.readBoolean(),
                    in.readBoolean(),
                    in.readString(),
                    kryo.readObject(in, SchemaFeaturesListWrapper.class).toList(),
                    kryo.readObject(in, HashMap.class));
        }

        // Wrapper only used internally for Kryo serialization/deserialization
        private static class SchemaFeaturesListWrapper
        {
            public List<String> requestedFeatureNames;  // CHECKSTYLE IGNORE: Public mutable field

            /** Resolves the serialized option names back to SchemaFeature instances. */
            public List<SchemaFeature> toList()
            {
                return requestedFeatureNames.stream()
                                            .map(name -> SchemaFeatureSet.valueOf(name.toUpperCase()))
                                            .collect(Collectors.toList());
            }
        }
    }
| |
| protected Set<? extends SidecarInstance> initializeClusterConfig(ClientConfig options) |
| { |
| return Arrays.stream(options.sidecarInstances().split(",")) |
| .map(hostname -> new SidecarInstanceImpl(hostname, options.sidecarPort())) |
| .collect(Collectors.toSet()); |
| } |
| |
    /**
     * Starts an asynchronous retrieval of node settings via the Sidecar client.
     * The base implementation ignores both parameters; they are part of the signature
     * so that subclasses can override with cluster- or ring-aware behavior.
     *
     * @param clusterConfig the cluster configuration (unused here)
     * @param ring          future resolving to the ring response (unused here)
     * @return a future resolving to the node settings
     */
    protected CompletableFuture<NodeSettings> nodeSettingsFuture(Set<? extends SidecarInstance> clusterConfig,
                                                                 CompletableFuture<RingResponse> ring)
    {
        return sidecar.nodeSettings();
    }
| |
    /**
     * Determines the Cassandra version to use when reading. The base implementation simply
     * returns the release version reported by the queried node; {@code clusterConfig} is
     * present so subclasses can override with cluster-wide version resolution.
     *
     * @param clusterConfig the cluster configuration (unused here)
     * @param nodeSettings  settings reported by a node
     * @return the effective Cassandra release version string
     */
    protected String getEffectiveCassandraVersionForRead(Set<? extends SidecarInstance> clusterConfig,
                                                         NodeSettings nodeSettings)
    {
        return nodeSettings.releaseVersion();
    }
| |
    /**
     * Hook invoked once with the resolved client configuration. The base implementation
     * only logs the options; subclasses may override to report usage telemetry.
     *
     * @param options the {@link ClientConfig} options
     */
    protected void dialHome(@NotNull ClientConfig options)
    {
        LOGGER.info("Dial home. clientConfig={}", options);
    }
| |
| protected void clearSnapshot(Set<? extends SidecarInstance> clusterConfig, @NotNull ClientConfig options) |
| { |
| LOGGER.info("Clearing snapshot at end of Spark job snapshotName={} keyspace={} table={} dc={}", |
| snapshotName, keyspace, table, datacenter); |
| CountDownLatch latch = new CountDownLatch(clusterConfig.size()); |
| try |
| { |
| for (SidecarInstance instance : clusterConfig) |
| { |
| sidecar.clearSnapshot(instance, keyspace, table, snapshotName).whenComplete((resp, throwable) -> { |
| try |
| { |
| if (throwable != null) |
| { |
| LOGGER.warn("Failed to clear snapshot on instance hostname={} port={} snapshotName={} keyspace={} table={} datacenter={}", |
| instance.hostname(), instance.port(), snapshotName, keyspace, table, datacenter, throwable); |
| } |
| } |
| finally |
| { |
| latch.countDown(); |
| } |
| }); |
| } |
| await(latch); |
| LOGGER.info("Snapshot cleared snapshotName={} keyspace={} table={} datacenter={}", |
| snapshotName, keyspace, table, datacenter); |
| } |
| catch (Throwable throwable) |
| { |
| LOGGER.warn("Unexpected exception clearing snapshot snapshotName={} keyspace={} table={} dc={}", |
| snapshotName, keyspace, table, datacenter, throwable); |
| } |
| } |
| |
| /** |
| * Returns the {@link Sizing} object based on the {@code sizing} option provided by the user, |
| * or {@link DefaultSizing} as the default sizing |
| * |
| * @param clusterConfig the cluster configuration |
| * @param replicationFactor the replication factor |
| * @param options the {@link ClientConfig} options |
| * @return the {@link Sizing} object based on the {@code sizing} option provided by the user |
| */ |
| protected Sizing getSizing(Set<? extends SidecarInstance> clusterConfig, |
| ReplicationFactor replicationFactor, |
| ClientConfig options) |
| { |
| return new DefaultSizing(options.numCores()); |
| } |
| |
| protected void await(CountDownLatch latch) |
| { |
| try |
| { |
| latch.await(); |
| } |
| catch (InterruptedException exception) |
| { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(exception); |
| } |
| } |
| |
| public static final class ClientConfig |
| { |
| public static final String SIDECAR_INSTANCES = "sidecar_instances"; |
| public static final String KEYSPACE_KEY = "keyspace"; |
| public static final String TABLE_KEY = "table"; |
| public static final String SNAPSHOT_NAME_KEY = "snapshotName"; |
| public static final String DC_KEY = "dc"; |
| public static final String CREATE_SNAPSHOT_KEY = "createSnapshot"; |
| public static final String CLEAR_SNAPSHOT_KEY = "clearSnapshot"; |
| public static final String DEFAULT_PARALLELISM_KEY = "defaultParallelism"; |
| public static final String NUM_CORES_KEY = "numCores"; |
| public static final String CONSISTENCY_LEVEL_KEY = "consistencyLevel"; |
| public static final String ENABLE_STATS_KEY = "enableStats"; |
| public static final String LAST_MODIFIED_COLUMN_NAME_KEY = "lastModifiedColumnName"; |
| public static final String READ_INDEX_OFFSET_KEY = "readIndexOffset"; |
| public static final String SIZING_KEY = "sizing"; |
| public static final String SIZING_DEFAULT = "default"; |
| public static final String MAX_PARTITION_SIZE_KEY = "maxPartitionSize"; |
| public static final String USE_INCREMENTAL_REPAIR = "useIncrementalRepair"; |
| public static final String ENABLE_EXPANSION_SHRINK_CHECK_KEY = "enableExpansionShrinkCheck"; |
| public static final String SIDECAR_PORT = "sidecar_port"; |
| public static final int DEFAULT_SIDECAR_PORT = 9043; |
| |
| private final String sidecarInstances; |
| @Nullable |
| private final String keyspace; |
| @Nullable |
| private final String table; |
| private final String snapshotName; |
| private final String datacenter; |
| private final boolean createSnapshot; |
| private final boolean clearSnapshot; |
| private final int defaultParallelism; |
| private final int numCores; |
| private final ConsistencyLevel consistencyLevel; |
| private final Map<String, BigNumberConfigImpl> bigNumberConfigMap; |
| private final boolean enableStats; |
| private final boolean readIndexOffset; |
| private final String sizing; |
| private final int maxPartitionSize; |
| private final boolean useIncrementalRepair; |
| private final List<SchemaFeature> requestedFeatures; |
| private final String lastModifiedTimestampField; |
| private final Boolean enableExpansionShrinkCheck; |
| private final int sidecarPort; |
| |
| private ClientConfig(Map<String, String> options) |
| { |
| this.sidecarInstances = MapUtils.getOrThrow(options, SIDECAR_INSTANCES, "sidecar_instances"); |
| this.keyspace = MapUtils.getOrThrow(options, KEYSPACE_KEY, "keyspace"); |
| this.table = MapUtils.getOrThrow(options, TABLE_KEY, "table"); |
| this.snapshotName = MapUtils.getOrDefault(options, SNAPSHOT_NAME_KEY, "sbr_" + UUID.randomUUID().toString().replace("-", "")); |
| this.datacenter = options.get(MapUtils.lowerCaseKey(DC_KEY)); |
| this.createSnapshot = MapUtils.getBoolean(options, CREATE_SNAPSHOT_KEY, true); |
| this.clearSnapshot = MapUtils.getBoolean(options, CLEAR_SNAPSHOT_KEY, createSnapshot); |
| this.defaultParallelism = MapUtils.getInt(options, DEFAULT_PARALLELISM_KEY, 1); |
| this.numCores = MapUtils.getInt(options, NUM_CORES_KEY, 1); |
| this.consistencyLevel = Optional.ofNullable(options.get(MapUtils.lowerCaseKey(CONSISTENCY_LEVEL_KEY))) |
| .map(ConsistencyLevel::valueOf) |
| .orElse(null); |
| this.bigNumberConfigMap = BigNumberConfigImpl.build(options); |
| this.enableStats = MapUtils.getBoolean(options, ENABLE_STATS_KEY, true); |
| this.readIndexOffset = MapUtils.getBoolean(options, READ_INDEX_OFFSET_KEY, true); |
| this.sizing = MapUtils.getOrDefault(options, SIZING_KEY, SIZING_DEFAULT); |
| this.maxPartitionSize = MapUtils.getInt(options, MAX_PARTITION_SIZE_KEY, 1); |
| this.useIncrementalRepair = MapUtils.getBoolean(options, USE_INCREMENTAL_REPAIR, true); |
| this.lastModifiedTimestampField = MapUtils.getOrDefault(options, LAST_MODIFIED_COLUMN_NAME_KEY, null); |
| this.enableExpansionShrinkCheck = MapUtils.getBoolean(options, ENABLE_EXPANSION_SHRINK_CHECK_KEY, false); |
| this.requestedFeatures = initRequestedFeatures(options); |
| this.sidecarPort = MapUtils.getInt(options, SIDECAR_PORT, DEFAULT_SIDECAR_PORT); |
| } |
| |
| public String sidecarInstances() |
| { |
| return sidecarInstances; |
| } |
| |
| @Nullable |
| public String keyspace() |
| { |
| return keyspace; |
| } |
| |
| @Nullable |
| public String table() |
| { |
| return table; |
| } |
| |
| public String snapshotName() |
| { |
| return snapshotName; |
| } |
| |
| public String datacenter() |
| { |
| return datacenter; |
| } |
| |
| public boolean createSnapshot() |
| { |
| return createSnapshot; |
| } |
| |
| public boolean clearSnapshot() |
| { |
| return clearSnapshot; |
| } |
| |
| public int getDefaultParallelism() |
| { |
| return defaultParallelism; |
| } |
| |
| public int numCores() |
| { |
| return numCores; |
| } |
| |
| public ConsistencyLevel consistencyLevel() |
| { |
| return consistencyLevel; |
| } |
| |
| public Map<String, BigNumberConfigImpl> bigNumberConfigMap() |
| { |
| return bigNumberConfigMap; |
| } |
| |
| public boolean enableStats() |
| { |
| return enableStats; |
| } |
| |
| public boolean readIndexOffset() |
| { |
| return readIndexOffset; |
| } |
| |
| public String sizing() |
| { |
| return sizing; |
| } |
| |
| public int maxPartitionSize() |
| { |
| return maxPartitionSize; |
| } |
| |
| public boolean useIncrementalRepair() |
| { |
| return useIncrementalRepair; |
| } |
| |
| public List<SchemaFeature> requestedFeatures() |
| { |
| return requestedFeatures; |
| } |
| |
| public String lastModifiedTimestampField() |
| { |
| return lastModifiedTimestampField; |
| } |
| |
| public Boolean enableExpansionShrinkCheck() |
| { |
| return enableExpansionShrinkCheck; |
| } |
| |
| public int sidecarPort() |
| { |
| return sidecarPort; |
| } |
| |
| public static ClientConfig create(Map<String, String> options) |
| { |
| return new ClientConfig(options); |
| } |
| |
| private List<SchemaFeature> initRequestedFeatures(Map<String, String> options) |
| { |
| Map<String, String> optionsCopy = new HashMap<>(options); |
| String lastModifiedColumnName = MapUtils.getOrDefault(options, LAST_MODIFIED_COLUMN_NAME_KEY, null); |
| if (lastModifiedColumnName != null) |
| { |
| optionsCopy.put(SchemaFeatureSet.LAST_MODIFIED_TIMESTAMP.optionName(), "true"); |
| } |
| List<SchemaFeature> requestedFeatures = SchemaFeatureSet.initializeFromOptions(optionsCopy); |
| if (lastModifiedColumnName != null) |
| { |
| // Create alias to LAST_MODIFICATION_TIMESTAMP |
| aliasLastModifiedTimestamp(requestedFeatures, lastModifiedColumnName); |
| } |
| return requestedFeatures; |
| } |
| } |
| |
| private static void aliasLastModifiedTimestamp(List<SchemaFeature> requestedFeatures, String alias) |
| { |
| SchemaFeature featureAlias = new SchemaFeature() |
| { |
| @Override |
| public String optionName() |
| { |
| return SchemaFeatureSet.LAST_MODIFIED_TIMESTAMP.optionName(); |
| } |
| |
| @Override |
| public String fieldName() |
| { |
| return alias; |
| } |
| |
| @Override |
| public DataType fieldDataType() |
| { |
| return SchemaFeatureSet.LAST_MODIFIED_TIMESTAMP.fieldDataType(); |
| } |
| |
| @Override |
| public RowBuilder decorate(RowBuilder builder) |
| { |
| return new LastModifiedTimestampDecorator(builder, alias); |
| } |
| |
| @Override |
| public boolean fieldNullable() |
| { |
| return SchemaFeatureSet.LAST_MODIFIED_TIMESTAMP.fieldNullable(); |
| } |
| }; |
| int index = requestedFeatures.indexOf(SchemaFeatureSet.LAST_MODIFIED_TIMESTAMP); |
| if (index >= 0) |
| { |
| requestedFeatures.set(index, featureAlias); |
| } |
| } |
| } |