/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.reader;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.commitlog.BufferingCommitLogReader;
import org.apache.cassandra.db.commitlog.PartitionUpdateWrapper;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.cdc.CommitLog;
import org.apache.cassandra.spark.cdc.CommitLogProvider;
import org.apache.cassandra.spark.cdc.watermarker.Watermarker;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.sparksql.RangeTombstoneMarkerImplementation;
import org.apache.cassandra.spark.sparksql.filters.CdcOffsetFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.spark.stats.Stats;
import org.apache.cassandra.spark.utils.FutureUtils;
import org.apache.cassandra.spark.utils.TimeProvider;
import org.apache.spark.TaskContext;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
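/**
 * Builds a {@link StreamScanner} over CDC mutations read from the CommitLogs supplied by a
 * {@link CommitLogProvider}. CommitLogs are grouped per Cassandra instance and read asynchronously;
 * the resulting {@link PartitionUpdateWrapper}s are de-duplicated across replicas and only published
 * once {@code minimumReplicasPerMutation} copies have been observed.
 *
 * <p>Illustrative usage only; the argument names below are placeholders for values supplied by the
 * caller's job configuration:
 * <pre>{@code
 * StreamScanner scanner = new CdcScannerBuilder(partitionId, table, partitioner, commitLogProvider, stats,
 *                                               sparkRangeFilter, offsetFilter, minimumReplicasPerMutation,
 *                                               watermarker, jobId, executorService, timeProvider).build();
 * }</pre>
 */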
public class CdcScannerBuilder
{
private static final Logger LOGGER = LoggerFactory.getLogger(CdcScannerBuilder.class);
// Match both legacy and new version of CommitLogs, for example: CommitLog-12345.log and CommitLog-4-12345.log
private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile("CommitLog(-\\d+)*-(\\d+)\\.log");
private static final CompletableFuture<BufferingCommitLogReader.Result> NO_OP_FUTURE =
CompletableFuture.completedFuture(null);
final TableMetadata table;
final Partitioner partitioner;
final Stats stats;
final Map<CassandraInstance, CompletableFuture<List<PartitionUpdateWrapper>>> futures;
final int minimumReplicasPerMutation;
@Nullable
private final SparkRangeFilter sparkRangeFilter;
@Nullable
private final CdcOffsetFilter offsetFilter;
@NotNull
final Watermarker watermarker;
private final int partitionId;
private final long startTimeNanos;
@NotNull
private final TimeProvider timeProvider;
// CHECKSTYLE IGNORE: Constructor with many parameters
public CdcScannerBuilder(int partitionId,
TableMetadata table,
Partitioner partitioner,
CommitLogProvider commitLogs,
Stats stats,
@Nullable SparkRangeFilter sparkRangeFilter,
@Nullable CdcOffsetFilter offsetFilter,
int minimumReplicasPerMutation,
@NotNull Watermarker jobWatermarker,
@NotNull String jobId,
@NotNull ExecutorService executorService,
@NotNull TimeProvider timeProvider)
{
this.table = table;
this.partitioner = partitioner;
this.stats = stats;
this.sparkRangeFilter = sparkRangeFilter;
this.offsetFilter = offsetFilter;
this.watermarker = jobWatermarker.instance(jobId);
Preconditions.checkArgument(minimumReplicasPerMutation >= 1,
"minimumReplicasPerMutation should be at least 1");
this.minimumReplicasPerMutation = minimumReplicasPerMutation;
this.startTimeNanos = System.nanoTime();
this.timeProvider = timeProvider;
Map<CassandraInstance, List<CommitLog>> logs = commitLogs
.logs()
.collect(Collectors.groupingBy(CommitLog::instance, Collectors.toList()));
Map<CassandraInstance, CommitLog.Marker> markers = logs.keySet().stream()
.map(watermarker::highWaterMark)
.filter(Objects::nonNull)
.collect(Collectors.toMap(CommitLog.Marker::instance, Function.identity()));
this.partitionId = partitionId;
LOGGER.info("Opening CdcScanner numInstances={} start={} maxAgeMicros={} partitionId={} listLogsTimeNanos={}",
logs.size(),
offsetFilter != null ? offsetFilter.start().getTimestampMicros() : null,
offsetFilter != null ? offsetFilter.maxAgeMicros() : null,
partitionId,
System.nanoTime() - startTimeNanos);
this.futures = logs.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> openInstanceAsync(entry.getValue(),
markers.get(entry.getKey()),
executorService)));
}
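/**
 * Determines whether a CommitLog can be skipped because it precedes the high-water mark already read.
 *
 * @param log           the CommitLog to check
 * @param highwaterMark the previously recorded high-water mark, or null if none exists
 * @return true if the log's segmentId is below the high-water mark, or if the segmentId cannot be
 *         extracted from the filename; false if there is no high-water mark or the log is new enough to read
 */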
private static boolean skipCommitLog(@NotNull CommitLog log, @Nullable CommitLog.Marker highwaterMark)
{
if (highwaterMark == null)
{
return false;
}
Long segmentId = extractSegmentId(log);
if (segmentId != null)
{
// Skip this CommitLog if its segmentId is lower than the previously read high-water mark segmentId
return segmentId < highwaterMark.segmentId();
}
return true;
}
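/**
 * Extracts the segmentId from a CommitLog's filename.
 *
 * @param log the CommitLog
 * @return the segmentId, or null if it cannot be parsed from the filename
 */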
@Nullable
public static Long extractSegmentId(@NotNull CommitLog log)
{
return extractSegmentId(log.name());
}
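/**
 * Extracts the segmentId from a CommitLog filename, matching both legacy and current naming schemes.
 * For example, both {@code "CommitLog-12345.log"} and {@code "CommitLog-4-12345.log"} yield {@code 12345L}.
 *
 * @param filename the CommitLog filename
 * @return the segmentId, or null if the filename does not match the expected pattern
 */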
@Nullable
public static Long extractSegmentId(@NotNull String filename)
{
Matcher matcher = CdcScannerBuilder.COMMIT_LOG_FILE_PATTERN.matcher(filename);
if (matcher.matches())
{
try
{
return Long.parseLong(matcher.group(2));
}
catch (NumberFormatException exception)
{
LOGGER.error("Could not parse commit log segmentId name={}", filename, exception);
return null;
}
}
LOGGER.error("Could not parse commit log filename name={}", filename);
return null; // Cannot extract segment id
}
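/**
 * Asynchronously reads all CommitLogs for a single Cassandra instance and combines the results into one
 * future containing the list of partition updates. On success the instance's high-water mark is advanced
 * to the highest marker read, so the next attempt resumes from that position; on failure it is left unchanged.
 */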
private CompletableFuture<List<PartitionUpdateWrapper>> openInstanceAsync(@NotNull List<CommitLog> logs,
@Nullable CommitLog.Marker highWaterMark,
@NotNull ExecutorService executorService)
{
// Read all CommitLogs on the instance asynchronously and combine into a single future;
// if we fail to read any CommitLog on the instance, the whole instance fails
List<CompletableFuture<BufferingCommitLogReader.Result>> futures = logs.stream()
.map(log -> openReaderAsync(log, highWaterMark, executorService))
.collect(Collectors.toList());
return FutureUtils.combine(futures)
.thenApply(result -> {
// Update the high-water mark on success; if the instance fails we don't update
// the high-water mark, so the next attempt resumes from the original position
result.stream()
.map(BufferingCommitLogReader.Result::marker)
.max(CommitLog.Marker::compareTo)
.ifPresent(watermarker::updateHighWaterMark);
// Combine all updates into single list
return result.stream()
.map(BufferingCommitLogReader.Result::updates)
.flatMap(Collection::stream)
.collect(Collectors.toList());
});
}
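/**
 * Schedules an asynchronous read of a single CommitLog, returning an already-completed future with a
 * null result if the log can be skipped because it precedes the high-water mark.
 */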
private CompletableFuture<BufferingCommitLogReader.Result> openReaderAsync(@NotNull CommitLog log,
@Nullable CommitLog.Marker highWaterMark,
@NotNull ExecutorService executorService)
{
if (skipCommitLog(log, highWaterMark))
{
return NO_OP_FUTURE;
}
return CompletableFuture.supplyAsync(() -> openReader(log, highWaterMark), executorService);
}
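/**
 * Reads a single CommitLog with a {@link BufferingCommitLogReader}.
 *
 * @return the read result, or null if the log is not readable
 */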
@Nullable
private BufferingCommitLogReader.Result openReader(@NotNull CommitLog log, @Nullable CommitLog.Marker highWaterMark)
{
long startTimeNanos = System.nanoTime();
LOGGER.info("Opening BufferingCommitLogReader instance={} log={} high='{}' partitionId={}",
log.instance().nodeName(), log.name(), highWaterMark, partitionId);
try (BufferingCommitLogReader reader =
new BufferingCommitLogReader(table, offsetFilter, log, sparkRangeFilter, highWaterMark, partitionId))
{
if (reader.isReadable())
{
return reader.result();
}
}
finally
{
LOGGER.info("Finished reading log on instance instance={} log={} partitionId={} timeNanos={}",
log.instance().nodeName(), log.name(), partitionId, System.nanoTime() - startTimeNanos);
}
return null;
}
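/**
 * Blocks on the per-instance futures to collect all CommitLog mutations, filters them down to those that
 * have reached {@code minimumReplicasPerMutation} replica copies, and returns a {@link StreamScanner}
 * backed by a sorted collection of the result. Also registers a listener on the current Spark
 * {@link TaskContext} to persist the watermark state once the task completes successfully.
 */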
public StreamScanner build()
{
// Block on futures to read all CommitLog mutations and pass over to SortedStreamScanner
List<PartitionUpdateWrapper> updates = futures.values().stream()
.map(future -> FutureUtils.await(future, throwable -> LOGGER.warn("Failed to read instance with error",
throwable)))
.filter(FutureUtils.FutureResult::isSuccess)
.map(FutureUtils.FutureResult::value)
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList());
futures.clear();
schedulePersist();
Collection<PartitionUpdateWrapper> filtered = filterValidUpdates(updates);
LOGGER.info("Opened CdcScanner start={} maxAgeMicros={} partitionId={} timeNanos={}",
offsetFilter != null ? offsetFilter.start().getTimestampMicros() : null,
offsetFilter != null ? offsetFilter.maxAgeMicros() : null,
partitionId,
System.nanoTime() - startTimeNanos);
return new SortedStreamScanner(table, partitioner, filtered, timeProvider);
}
/**
* A stream scanner that is backed by a sorted collection of {@link PartitionUpdateWrapper}
*/
private static class SortedStreamScanner extends AbstractStreamScanner
{
private final Queue<PartitionUpdateWrapper> updates;
SortedStreamScanner(@NotNull TableMetadata metadata,
@NotNull Partitioner partitionerType,
@NotNull Collection<PartitionUpdateWrapper> updates,
@NotNull TimeProvider timeProvider)
{
super(metadata, partitionerType, timeProvider);
this.updates = new PriorityQueue<>(PartitionUpdateWrapper::compareTo);
this.updates.addAll(updates);
}
@Override
UnfilteredPartitionIterator initializePartitions()
{
return new UnfilteredPartitionIterator()
{
private PartitionUpdateWrapper next;
@Override
public TableMetadata metadata()
{
return metadata;
}
@Override
public void close()
{
// Do nothing
}
@Override
public boolean hasNext()
{
if (next == null)
{
next = updates.poll();
}
return next != null;
}
@Override
public UnfilteredRowIterator next()
{
PartitionUpdate update = next.partitionUpdate();
next = null;
return update.unfilteredIterator();
}
};
}
@Override
public void close()
{
updates.clear();
}
@Override
protected void handleRowTombstone(Row row)
{
// Prepare clustering data to be consumed next
columnData = new ClusteringColumnDataState(row.clustering());
rid.setTimestamp(row.deletion().time().markedForDeleteAt());
// Flag was reset at org.apache.cassandra.spark.sparksql.SparkCellIterator.getNext
rid.setRowDeletion(true);
}
@Override
protected void handlePartitionTombstone(UnfilteredRowIterator partition)
{
rid.setPartitionKeyCopy(partition.partitionKey().getKey(),
ReaderUtils.tokenToBigInteger(partition.partitionKey().getToken()));
rid.setTimestamp(partition.partitionLevelDeletion().markedForDeleteAt());
// Flag was reset at org.apache.cassandra.spark.sparksql.SparkCellIterator.getNext
rid.setPartitionDeletion(true);
}
@Override
protected void handleCellTombstone()
{
rid.setValueCopy(null);
}
@Override
protected void handleCellTombstoneInComplex(Cell<?> cell)
{
if (cell.column().type instanceof ListType)
{
LOGGER.warn("Unable to process element deletions inside a List type. Skipping...");
return;
}
CellPath path = cell.path();
if (0 < path.size()) // Size can either be 0 (EmptyCellPath) or 1 (SingleItemCellPath)
{
rid.addCellTombstoneInComplex(path.get(0));
}
}
@Override
protected void handleRangeTombstone(RangeTombstoneMarker marker)
{
rid.addRangeTombstoneMarker(new RangeTombstoneMarkerImplementation(marker));
}
}
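/**
 * Registers a task completion listener that persists the watermark state once the Spark task has
 * successfully read all CommitLogs; on task failure the watermark is not persisted.
 */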
private void schedulePersist()
{
// Add task listener to persist Watermark on task success
TaskContext.get().addTaskCompletionListener(context -> {
if (context.isCompleted() && context.fetchFailed().isEmpty())
{
LOGGER.info("Persisting Watermark on task completion partitionId={}", partitionId);
// Once we have read all CommitLogs we can persist the watermark state
watermarker.persist(offsetFilter != null ? offsetFilter.maxAgeMicros() : null);
}
else
{
LOGGER.warn("Not persisting Watermark due to task failure partitionId={}",
partitionId, context.fetchFailed().get());
}
});
}
/**
 * Filter out invalid updates, i.e. updates that have not been received from a sufficient number of replicas.
 *
 * @param updates a collection of PartitionUpdateWrappers
 * @return a new collection containing only the valid updates
 */
private Collection<PartitionUpdateWrapper> filterValidUpdates(Collection<PartitionUpdateWrapper> updates)
{
// Only filter if more than one replica copy is required per mutation
if (minimumReplicasPerMutation == 1 || updates.isEmpty())
{
return updates;
}
Map<PartitionUpdateWrapper, List<PartitionUpdateWrapper>> replicaCopies = updates.stream()
.collect(Collectors.groupingBy(update -> update, Collectors.toList()));
return replicaCopies.values().stream()
.filter(this::filter) // Discard PartitionUpdate without enough replicas
.map(update -> update.get(0)) // Deduplicate the valid updates down to a single copy
.collect(Collectors.toList());
}
private boolean filter(List<PartitionUpdateWrapper> updates)
{
return filter(updates, minimumReplicasPerMutation, watermarker, stats);
}
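/**
 * Decides whether a mutation, represented by the list of replica copies received for it in this batch,
 * has reached sufficient replica copies to be published. Replica copies recorded by the watermarker from
 * earlier batches are counted as well; if the total is still below {@code minimumReplicasPerMutation},
 * the count is recorded and the mutation is deferred to a later batch.
 *
 * @return true if the mutation should be published, false if it should be deferred
 */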
static boolean filter(List<PartitionUpdateWrapper> updates,
int minimumReplicasPerMutation,
Watermarker watermarker,
Stats stats)
{
if (updates.isEmpty())
{
throw new IllegalStateException("Should not receive an empty list of updates");
}
PartitionUpdateWrapper update = updates.get(0);
PartitionUpdate partitionUpdate = update.partitionUpdate();
int numReplicas = updates.size() + watermarker.replicaCount(update);
if (numReplicas < minimumReplicasPerMutation)
{
// Insufficient replica copies to publish, so record the replica count and handle in a subsequent round
LOGGER.warn("Ignoring the partition update (partition key: '{}') for this batch "
+ "due to insufficient replicas received: {} required, {} received.",
partitionUpdate != null ? partitionUpdate.partitionKey() : "unknown",
minimumReplicasPerMutation, numReplicas);
watermarker.recordReplicaCount(update, numReplicas);
stats.insufficientReplicas(update, updates.size(), minimumReplicasPerMutation);
return false;
}
// Sufficient replica copies to publish
if (updates.stream().anyMatch(watermarker::seenBefore))
{
// Mutation previously marked as late, now we have sufficient replica copies to publish,
// so clear watermark and publish now
LOGGER.info("Achieved consistency level for late partition update (partition key: '{}'). {} received.",
partitionUpdate != null ? partitionUpdate.partitionKey() : "unknown", numReplicas);
watermarker.untrackReplicaCount(update);
stats.lateMutationPublished(update);
return true;
}
// We haven't seen this mutation before and have achieved the consistency level, so publish
stats.publishedMutation(update);
return true;
}
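/**
 * An {@link ISSTableScanner} adapter that exposes a single in-memory {@link PartitionUpdate} as an
 * {@link UnfilteredRowIterator}. Size and position metrics return 0 because the scanner is not backed
 * by on-disk SSTables.
 */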
public static class CdcScanner implements ISSTableScanner
{
final TableMetadata tableMetadata;
final PartitionUpdate update;
UnfilteredRowIterator it;
public CdcScanner(TableMetadata tableMetadata, PartitionUpdate update)
{
this.tableMetadata = tableMetadata;
this.update = update;
this.it = update.unfilteredIterator();
}
@Override
public long getLengthInBytes()
{
return 0;
}
@Override
public long getCompressedLengthInBytes()
{
return 0;
}
@Override
public long getCurrentPosition()
{
return 0;
}
@Override
public long getBytesScanned()
{
return 0;
}
@Override
public Set<SSTableReader> getBackingSSTables()
{
return Collections.emptySet();
}
@Override
public TableMetadata metadata()
{
return tableMetadata;
}
@Override
public void close()
{
}
@Override
public boolean hasNext()
{
return it != null;
}
@Override
public UnfilteredRowIterator next()
{
UnfilteredRowIterator result = it;
it = null;
return result;
}
}
}