/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.db.commitlog;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import javax.annotation.concurrent.NotThreadSafe;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.AbstractBTreePartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.io.util.CdcRandomAccessReader;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.RebufferingInputStream;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.spark.cdc.watermarker.Watermarker;
import org.apache.cassandra.spark.exceptions.TransportFailureException;
import org.apache.cassandra.spark.reader.ReaderUtils;
import org.apache.cassandra.spark.sparksql.filters.CdcOffsetFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
import org.apache.cassandra.spark.utils.LoggerHelper;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
/**
* Copied and refactored from org.apache.cassandra.db.commitlog.CommitLogReader
* to read from a generic source not tied to java.io.File and the local file system.
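*
* <p>Illustrative usage, as a hedged sketch rather than a prescribed API: the three-argument
* constructor shown is the {@code @VisibleForTesting} convenience overload, and {@code table},
* {@code log} and {@code watermarker} are assumed to be supplied by the caller.
* <pre>{@code
* try (BufferingCommitLogReader reader = new BufferingCommitLogReader(table, log, watermarker))
* {
*     if (reader.isReadable())
*     {
*         BufferingCommitLogReader.Result result = reader.result();
*         List<PartitionUpdateWrapper> updates = result.updates();
*         org.apache.cassandra.spark.cdc.CommitLog.Marker marker = result.marker();
*     }
* }
* }</pre>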
*/
@NotThreadSafe
public class BufferingCommitLogReader implements CommitLogReadHandler, AutoCloseable, Comparable<BufferingCommitLogReader>
{
private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
@VisibleForTesting
public static final int ALL_MUTATIONS = -1;
private final TableMetadata table;
private final org.apache.cassandra.spark.cdc.CommitLog log;
@Nullable
final CdcOffsetFilter offsetFilter;
private final CRC32 checksum;
List<PartitionUpdateWrapper> updates;
@Nullable
private final SparkRangeFilter sparkRangeFilter;
private byte[] buffer;
private RandomAccessReader reader;
private CommitLogDescriptor descriptor = null;
private final ReadStatusTracker statusTracker;
private int position = 0;
@NotNull
private final org.apache.cassandra.spark.cdc.CommitLog.Marker highWaterMark;
private final LoggerHelper logger;
@VisibleForTesting
public BufferingCommitLogReader(@NotNull TableMetadata table,
@NotNull org.apache.cassandra.spark.cdc.CommitLog log,
@NotNull Watermarker watermarker)
{
this(table, null, log, null, watermarker.highWaterMark(log.instance()), 0);
}
public BufferingCommitLogReader(@NotNull TableMetadata table,
@Nullable CdcOffsetFilter offsetFilter,
@NotNull org.apache.cassandra.spark.cdc.CommitLog log,
@Nullable SparkRangeFilter sparkRangeFilter,
@Nullable org.apache.cassandra.spark.cdc.CommitLog.Marker highWaterMark,
int partitionId)
{
this.table = table;
this.offsetFilter = offsetFilter;
this.log = log;
this.updates = new ArrayList<>();
this.sparkRangeFilter = sparkRangeFilter;
this.statusTracker = new ReadStatusTracker(ALL_MUTATIONS, false);
this.checksum = new CRC32();
this.buffer = new byte[CdcRandomAccessReader.DEFAULT_BUFFER_SIZE];
this.reader = BufferingCommitLogReader.reader(log);
this.logger = new LoggerHelper(LoggerFactory.getLogger(BufferingCommitLogReader.class),
"instance", log.instance().nodeName(),
"dc", log.instance().dataCenter(),
"log", log.name(),
"size", log.maxOffset(),
"partitionId", partitionId);
this.highWaterMark = highWaterMark != null ? highWaterMark : log.zeroMarker();
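// Read eagerly: the constructor buffers all matching updates up-front, so result() can be served without further I/O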
try
{
readHeader();
if (skip())
{
// If we can skip this CommitLog, close immediately
close();
}
else
{
read();
}
}
catch (Throwable throwable)
{
close();
if (isNotFoundError(throwable))
{
return;
}
logger.warn("Exception reading CommitLog", throwable);
throw new RuntimeException(throwable);
}
}
public static RandomAccessReader reader(org.apache.cassandra.spark.cdc.CommitLog log)
{
return new CdcRandomAccessReader(log);
}
private void readHeader() throws IOException
{
long startTimeNanos = System.nanoTime();
try
{
descriptor = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
}
catch (IOException exception)
{
// Let recover deal with it
logger.warn("IOException reading CommitLog header", exception);
}
if (descriptor == null)
{
// Don't care about whether or not the handler thinks we can continue. We can't without a descriptor.
// Whether or not we can continue depends on whether this is the last segment.
handleUnrecoverableError(new CommitLogReadException(
String.format("Could not read commit log descriptor in file %s", log.name()),
CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
false));
}
else
{
logger.info("Read log header", "segmentId", descriptor.id,
"compression", descriptor.compression,
"version", descriptor.version,
"messagingVersion", descriptor.getMessagingVersion(),
"timeNanos", System.nanoTime() - startTimeNanos);
}
}
private void read()
{
try
{
readCommitLogSegment();
}
catch (Throwable throwable)
{
if (isNotFoundError(throwable))
{
return;
}
Throwable cause = ThrowableUtils.rootCause(throwable);
logger.warn("Exception reading CommitLog", cause);
throw new RuntimeException(cause);
}
}
private boolean isNotFoundError(Throwable throwable)
{
TransportFailureException transportEx = ThrowableUtils.rootCause(throwable, TransportFailureException.class);
if (transportEx != null && transportEx.isNotFound())
{
// The underlying CommitLog may have been removed before/during reading;
// this should only happen when the CommitLog is old and eligible for removal
logger.warn("CommitLog not found, assuming removed by underlying storage", transportEx);
return true;
}
return false;
}
/**
* Reads mutations from file, handing them off to handler
*
* @throws IOException IOException
*/
private void readCommitLogSegment() throws IOException
{
long startTimeNanos = System.nanoTime();
SeekableCommitLogSegmentReader segmentReader;
try
{
segmentReader = new SeekableCommitLogSegmentReader(this, descriptor, reader, logger, false);
}
catch (Exception exception)
{
handleUnrecoverableError(new CommitLogReadException(
String.format("Unable to create segment reader for commit log file: %s", exception),
CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR,
false));
return;
}
try
{
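// If this segment is the one referenced by the high-water mark, seek past the mutations already consumed by a previous read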
if (descriptor.id == highWaterMark.segmentId() && reader.getFilePointer() < highWaterMark.position())
{
segmentReader.seek(highWaterMark.position());
}
for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
{
// Only tolerate truncation if we allow it in both global and segment
// statusTracker.tolerateErrorsInSection = tolerateTruncation && syncSegment.toleratesErrorsInSection;
statusTracker.errorContext = String.format("Next section at %d in %s",
syncSegment.fileStartPosition, descriptor.fileName());
readSection(syncSegment.input, syncSegment.endPosition, descriptor);
// Track the position at end of previous section after successfully reading mutations,
// so we can update highwater mark after reading
position = (int) reader.getFilePointer();
if (!statusTracker.shouldContinue())
{
break;
}
}
}
// Unfortunately CommitLogSegmentReader.SegmentIterator (for-loop) cannot throw a checked exception,
// so we check to see if a RuntimeException is wrapping an IOException
catch (RuntimeException exception)
{
if (exception.getCause() instanceof IOException)
{
throw (IOException) exception.getCause();
}
throw exception;
}
logger.info("Finished reading commit log", "updates", updates.size(),
"timeNanos", System.nanoTime() - startTimeNanos);
}
public boolean skip() throws IOException
{
if (shouldSkip(reader))
{
logger.info("Skipping playback of empty log");
return true;
}
// Just transform from the file name (no reading of headers) to determine version
long segmentIdFromFilename = CommitLogDescriptor.fromFileName(log.name()).id;
if (segmentIdFromFilename != descriptor.id)
{
CommitLogReadException readException = new CommitLogReadException(
String.format("Segment id mismatch (filename %d, descriptor %d) in file %s",
segmentIdFromFilename, descriptor.id, log.name()),
CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR,
false);
return shouldSkipSegmentOnError(readException);
}
return shouldSkipSegmentId();
}
/**
* Peek at the next 8 bytes to determine whether the end of the file has been reached.
* It should <b>only</b> be called immediately after reading the CommitLog header.
*
* @return true to skip; otherwise, return false
* @throws IOException io exception
*/
private static boolean shouldSkip(RandomAccessReader reader) throws IOException
{
try
{
reader.mark(); // Mark position
int end = reader.readInt();
long filecrc = reader.readInt() & 0xFFFFFFFFL;
return end == 0 && filecrc == 0;
}
catch (EOFException exception)
{
// No data to read
return true;
}
finally
{
// Return to marked position before reading mutations
reader.reset();
}
}
/**
* Any segment with id >= highWaterMark.segmentId() is a candidate for reading
*/
private boolean shouldSkipSegmentId()
{
logger.debug("Reading commit log", "version", descriptor.version,
"messagingVersion", descriptor.getMessagingVersion(),
"compression", descriptor.compression);
if (highWaterMark.segmentId() > descriptor.id)
{
logger.info("Skipping read of fully-flushed log", "segmentId", descriptor.id,
"minSegmentId", highWaterMark.segmentId());
return true;
}
else
{
return false;
}
}
/**
* Reads a section of a file containing mutations
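*
* <p>Each mutation entry within a section is read in the serialized form sketched below;
* the size and the payload each carry their own CRC32:
* <pre>
*     int    serializedSize  (0, i.e. LEGACY_END_OF_SEGMENT_MARKER, terminates the segment)
*     int    CRC32 of serializedSize
*     byte[] mutation payload, serializedSize bytes
*     int    CRC32 of the payload
* </pre>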
*
* @param reader FileDataInput / logical buffer containing CommitLog mutations
* @param end logical numeric end of the segment being read
* @param desc Descriptor for CommitLog serialization
*/
private void readSection(FileDataInput reader, int end, CommitLogDescriptor desc) throws IOException
{
while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
{
int mutationStart = (int) reader.getFilePointer();
logger.trace("Reading mutation at", "position", mutationStart);
long claimedCRC32;
int serializedSize;
try
{
// We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
// of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
// However, it's possible with 2.1 era CommitLogs that the last mutation ended less than 4 bytes
// from the end of the file, which means that we'll be unable to read a full int and instead
// read an EOF here.
if (end - reader.getFilePointer() < 4)
{
logger.trace("Not enough bytes left for another mutation in this CommitLog section, continuing");
statusTracker.requestTermination();
return;
}
// Any of the reads may hit EOF
serializedSize = reader.readInt();
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
{
logger.trace("Encountered end of segment marker at", "position", reader.getFilePointer());
statusTracker.requestTermination();
return;
}
// Mutation must be at LEAST 10 bytes:
// 3 for a non-empty Keyspace
// 3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
// 4 bytes for column count
// This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
if (serializedSize < 10)
{
if (shouldSkipSegmentOnError(new CommitLogReadException(
String.format("Invalid mutation size %d at %d in %s",
serializedSize, mutationStart, statusTracker.errorContext),
CommitLogReadErrorReason.MUTATION_ERROR,
statusTracker.tolerateErrorsInSection)))
{
statusTracker.requestTermination();
}
return;
}
long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader);
checksum.reset();
CommitLogFormat.updateChecksum(checksum, serializedSize);
if (checksum.getValue() != claimedSizeChecksum)
{
if (shouldSkipSegmentOnError(new CommitLogReadException(
String.format("Mutation size checksum failure at %d in %s",
mutationStart, statusTracker.errorContext),
CommitLogReadErrorReason.MUTATION_ERROR,
statusTracker.tolerateErrorsInSection)))
{
statusTracker.requestTermination();
}
return;
}
if (serializedSize > buffer.length)
{
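// Grow the buffer with 20% headroom to reduce re-allocations for subsequent large mutations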
buffer = new byte[(int) (1.2 * serializedSize)];
}
reader.readFully(buffer, 0, serializedSize);
claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader);
}
catch (EOFException exception)
{
if (shouldSkipSegmentOnError(new CommitLogReadException(
String.format("Unexpected end of segment at %d in %s",
mutationStart, statusTracker.errorContext),
CommitLogReadErrorReason.EOF,
statusTracker.tolerateErrorsInSection)))
{
statusTracker.requestTermination();
}
return;
}
checksum.update(buffer, 0, serializedSize);
if (claimedCRC32 != checksum.getValue())
{
if (shouldSkipSegmentOnError(new CommitLogReadException(
String.format("Mutation checksum failure at %d in %s",
mutationStart, statusTracker.errorContext),
CommitLogReadErrorReason.MUTATION_ERROR,
statusTracker.tolerateErrorsInSection)))
{
statusTracker.requestTermination();
}
continue;
}
int mutationPosition = (int) reader.getFilePointer();
readMutationInternal(buffer, serializedSize, mutationPosition, desc);
statusTracker.addProcessedMutation();
}
}
/**
* Deserializes and passes a Mutation to the ICommitLogReadHandler requested
*
* @param inputBuffer raw byte array w/Mutation data
* @param size deserialized size of mutation
* @param mutationPosition filePointer offset of end of mutation within CommitLogSegment
* @param descriptor CommitLogDescriptor being worked on
*/
@VisibleForTesting
private void readMutationInternal(byte[] inputBuffer,
int size,
int mutationPosition,
CommitLogDescriptor descriptor) throws IOException
{
// For now, we need to go through the motions of deserializing the mutation to determine its size and move
// the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment
Mutation mutation;
try (RebufferingInputStream bufferIn = new DataInputBuffer(inputBuffer, 0, size))
{
mutation = Mutation.serializer.deserialize(bufferIn,
descriptor.getMessagingVersion(),
DeserializationHelper.Flag.LOCAL);
// Double-check that what we read is still valid for the current schema
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
update.validate();
}
}
catch (UnknownTableException exception)
{
if (exception.id == null)
{
return;
}
// We see many unknown table exception logs when we skip over mutations from other tables
logger.trace("Invalid mutation", exception);
return;
}
catch (Throwable throwable)
{
JVMStabilityInspector.inspectThrowable(throwable);
Path path = Files.createTempFile("mutation", "dat");
try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(path)))
{
out.write(inputBuffer, 0, size);
}
// The checksum passed, so the data is not corrupt; a deserialization failure here cannot be tolerated
handleUnrecoverableError(new CommitLogReadException(
String.format("Unexpected error deserializing mutation; saved to %s. "
+ "This may be caused by replaying a mutation against a table with the same name but incompatible schema. "
+ "Exception follows: %s", path, throwable),
CommitLogReadErrorReason.MUTATION_ERROR,
false));
return;
}
logger.trace("Read mutation for", () -> "keyspace", mutation::getKeyspaceName,
() -> "key", mutation::key,
() -> "mutation", () -> mutation.getPartitionUpdates().stream()
.map(AbstractBTreePartition::toString)
.collect(Collectors.joining(", ", "{", "}")));
handleMutation(mutation, size, mutationPosition, descriptor);
}
public boolean isReadable()
{
return updates != null;
}
@Override
public void close()
{
if (updates == null)
{
return;
}
try
{
reader.close();
reader = null;
updates = null;
}
catch (Throwable throwable)
{
logger.error("Unexpected exception closing reader", throwable);
}
}
@Override
public int compareTo(@NotNull BufferingCommitLogReader that)
{
return Long.compare(this.descriptor.id, that.descriptor.id);
}
/**
* Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code
*/
private static class CommitLogFormat
{
public static long calculateClaimedChecksum(FileDataInput input) throws IOException
{
return input.readInt() & 0xFFFFFFFFL;
}
public static void updateChecksum(CRC32 checksum, int serializedSize)
{
updateChecksumInt(checksum, serializedSize);
}
public static long calculateClaimedCRC32(FileDataInput input) throws IOException
{
return input.readInt() & 0xFFFFFFFFL;
}
}
private static class ReadStatusTracker
{
private int mutationsLeft;
public String errorContext = ""; // CHECKSTYLE IGNORE: Public mutable field
public boolean tolerateErrorsInSection; // CHECKSTYLE IGNORE: Public mutable field
private boolean error;
ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
{
this.error = false;
this.mutationsLeft = mutationLimit;
this.tolerateErrorsInSection = tolerateErrorsInSection;
}
public void addProcessedMutation()
{
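// ALL_MUTATIONS (-1) is a sentinel meaning no mutation limit, so the counter is never decremented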
if (mutationsLeft == ALL_MUTATIONS)
{
return;
}
--mutationsLeft;
}
public boolean shouldContinue()
{
return !error && mutationsLeft != 0;
}
public void requestTermination()
{
error = true;
}
}
/**
* @return result object wrapping the list of buffered updates and the final high-water mark position
*/
public Result result()
{
return new Result(this);
}
public static final class Result
{
private final List<PartitionUpdateWrapper> updates;
private final org.apache.cassandra.spark.cdc.CommitLog.Marker marker;
private Result(BufferingCommitLogReader reader)
{
updates = reader.updates;
marker = reader.log.markerAt(reader.descriptor.id, reader.position);
}
public List<PartitionUpdateWrapper> updates()
{
return updates;
}
public org.apache.cassandra.spark.cdc.CommitLog.Marker marker()
{
return marker;
}
}
// CommitLog Read Handler
public boolean shouldSkipSegmentOnError(CommitLogReadException exception)
{
logger.warn("CommitLog error on shouldSkipSegment", exception);
return false;
}
public void handleUnrecoverableError(CommitLogReadException exception) throws IOException
{
logger.error("CommitLog unrecoverable error", exception);
statusTracker.requestTermination();
throw exception;
}
public void handleMutation(Mutation mutation, int size, int mutationPosition, CommitLogDescriptor descriptor)
{
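// Filter to updates for the target table and Spark token range, pair each with its max timestamp,
// drop updates outside the CDC time window, then buffer the survivors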
mutation.getPartitionUpdates().stream()
.filter(this::filter)
.map(update -> Pair.create(update, maxTimestamp(update)))
.filter(this::withinTimeWindow)
.map(this::wrapUpdate)
.forEach(updates::add);
}
private long maxTimestamp(PartitionUpdate update)
{
// For a single-row deletion, take the tombstone's markedForDeleteAt timestamp rather than update.maxTimestamp()
if (update.rowCount() == 1 && !update.lastRow().deletion().isLive())
{
return update.lastRow().deletion().time().markedForDeleteAt();
}
else
{
return update.maxTimestamp();
}
}
private PartitionUpdateWrapper wrapUpdate(Pair<PartitionUpdate, Long> update)
{
return wrapUpdate(update.left, update.right);
}
private PartitionUpdateWrapper wrapUpdate(PartitionUpdate update, long maxTimestampMicros)
{
return new PartitionUpdateWrapper(table, update, maxTimestampMicros);
}
/**
* @param update the partition update
* @return true if this is a mutation we are looking for
*/
private boolean filter(PartitionUpdate update)
{
return isTable(update) && withinRange(update);
}
private boolean isTable(PartitionUpdate update)
{
return update.metadata().keyspace.equals(table.keyspace)
&& update.metadata().name.equals(table.name);
}
private boolean withinTimeWindow(Pair<PartitionUpdate, Long> update)
{
boolean shouldInclude = withinTimeWindow(update.right);
if (!shouldInclude)
{
logger.info("Exclude the update due to out of the allowed time window.", "update", update.left);
}
return shouldInclude;
}
private boolean withinTimeWindow(long maxTimestampMicros)
{
if (offsetFilter == null)
{
return true;
}
return offsetFilter.overlaps(maxTimestampMicros);
}
/**
* @param update a CommitLog PartitionUpdate
* @return true if PartitionUpdate overlaps with the Spark worker token range
*/
private boolean withinRange(PartitionUpdate update)
{
if (sparkRangeFilter == null)
{
return true;
}
BigInteger token = ReaderUtils.tokenToBigInteger(update.partitionKey().getToken());
return !sparkRangeFilter.skipPartition(token);
}
}