/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.cassandra.db.commitlog;

import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;

import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.FileSegmentInputStream;
import org.apache.cassandra.io.util.RebufferingInputStream;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashSet;

import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;

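/**
 * Replays commit log segments at startup, re-applying any mutations that are not already
 * covered by the sstables on disk or by a truncation record.
 *
 * A sketch of the expected call sequence (variable names illustrative; the real call site
 * is CommitLog's recovery path):
 * <pre>
 *     CommitLogReplayer replayer = CommitLogReplayer.construct(commitLog);
 *     replayer.recover(segmentFiles);            // replay each segment in order
 *     int replayed = replayer.blockForWrites();  // wait for mutations, then flush
 * </pre>
 */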
public class CommitLogReplayer
{
    static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";
    private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
    private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);
    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;

    private final Set<Keyspace> keyspacesRecovered;
    private final List<Future<?>> futures;
    private final Map<UUID, AtomicInteger> invalidMutations;
    private final AtomicInteger replayedCount;
    private final Map<UUID, IntervalSet<ReplayPosition>> cfPersisted;
    private final ReplayPosition globalPosition;
    private final CRC32 checksum;
    private byte[] buffer;
    private byte[] uncompressedBuffer;

    private final ReplayFilter replayFilter;
    private final CommitLogArchiver archiver;

    CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, IntervalSet<ReplayPosition>> cfPersisted, ReplayFilter replayFilter)
    {
        this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
        this.futures = new ArrayList<Future<?>>();
        this.buffer = new byte[4096];
        this.uncompressedBuffer = new byte[4096];
        this.invalidMutations = new HashMap<UUID, AtomicInteger>();
        // count the number of replayed mutations; incremented from the mutation stage, so it must be an atomic, shareable counter
        this.replayedCount = new AtomicInteger();
        this.checksum = new CRC32();
        this.cfPersisted = cfPersisted;
        this.globalPosition = globalPosition;
        this.replayFilter = replayFilter;
        this.archiver = commitLog.archiver;
    }

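    /**
     * Builds a replayer for the given commit log: computes, for each known table, the
     * intervals of replay positions already persisted (from sstable metadata and truncation
     * records), and from those derives the earliest global position at which replay must begin.
     */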
    public static CommitLogReplayer construct(CommitLog commitLog)
    {
        // compute per-CF and global replay intervals
        Map<UUID, IntervalSet<ReplayPosition>> cfPersisted = new HashMap<>();
        ReplayFilter replayFilter = ReplayFilter.create();
        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
        {
            // but, if we've truncated the cf in question, then we need to start replay after the truncation
            ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
            if (truncatedAt != null)
            {
                // Point in time restore is taken to mean that the tables need to be recovered even if they were
                // deleted at a later point in time. Any truncation record after that point must thus be cleared prior
                // to recovery (CASSANDRA-9195).
                long restoreTime = commitLog.archiver.restorePointInTime;
                long truncatedTime = SystemKeyspace.getTruncatedAt(cfs.metadata.cfId);
                if (truncatedTime > restoreTime)
                {
                    if (replayFilter.includes(cfs.metadata))
                    {
                        logger.info("Restore point in time is before latest truncation of table {}.{}. Clearing truncation record.",
                                    cfs.metadata.ksName,
                                    cfs.metadata.cfName);
                        SystemKeyspace.removeTruncationRecord(cfs.metadata.cfId);
                        truncatedAt = null;
                    }
                }
            }

            IntervalSet<ReplayPosition> filter = persistedIntervals(cfs.getLiveSSTables(), truncatedAt);
            cfPersisted.put(cfs.metadata.cfId, filter);
        }
        ReplayPosition globalPosition = firstNotCovered(cfPersisted.values());
        logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));
        return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
    }

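    /**
     * Returns true if the segment can be skipped entirely: a zeroed first sync marker
     * (end == 0 and CRC == 0) means the segment was pre-allocated but never synced, so it
     * contains no data. Pre-2.1 segments have no sync markers and are never skipped here.
     */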
    private static boolean shouldSkip(File file) throws IOException, ConfigurationException
    {
        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
        if (desc.version < CommitLogDescriptor.VERSION_21)
        {
            return false;
        }
        try (ChannelProxy channel = new ChannelProxy(file);
             RandomAccessReader reader = RandomAccessReader.open(channel))
        {
            CommitLogDescriptor.readHeader(reader);
            int end = reader.readInt();
            long filecrc = reader.readInt() & 0xffffffffL;
            return end == 0 && filecrc == 0;
        }
    }

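    /**
     * Drops empty segments from the set of files to replay; a file whose header cannot be
     * read is kept, leaving recover() to report the problem in context.
     */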
    private static List<File> filterCommitLogFiles(File[] toFilter)
    {
        List<File> filtered = new ArrayList<>(toFilter.length);
        for (File file : toFilter)
        {
            try
            {
                if (shouldSkip(file))
                {
                    logger.info("Skipping playback of empty log: {}", file.getName());
                }
                else
                {
                    filtered.add(file);
                }
            }
            catch (Exception e)
            {
                // let recover deal with it
                filtered.add(file);
            }
        }

        return filtered;
    }

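    /**
     * Replays each commit log file in order. Only the last (most recent) segment may
     * legitimately end mid-write, so truncation is tolerated only for that file.
     */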
    public void recover(File[] clogs) throws IOException
    {
        List<File> filteredLogs = filterCommitLogFiles(clogs);

        int i = 0;
        for (File clog : filteredLogs)
        {
            i++;
            recover(clog, i == filteredLogs.size());
        }
    }

    /**
     * A set of known safe-to-discard commit log replay positions, based on
     * the range covered by on disk sstables and those prior to the most recent truncation record
     */
    public static IntervalSet<ReplayPosition> persistedIntervals(Iterable<SSTableReader> onDisk, ReplayPosition truncatedAt)
    {
        IntervalSet.Builder<ReplayPosition> builder = new IntervalSet.Builder<>();
        for (SSTableReader reader : onDisk)
            builder.addAll(reader.getSSTableMetadata().commitLogIntervals);

        if (truncatedAt != null)
            builder.add(ReplayPosition.NONE, truncatedAt);
        return builder.build();
    }

    /**
     * Find the earliest commit log position that is not covered by the known flushed ranges for some table.
     *
     * For efficiency this assumes that the first contiguously flushed interval we know of contains the moment that the
     * given table was constructed* and hence we can start replay from the end of that interval.
     *
     * If such an interval is not known, we must replay from the beginning.
     *
     * * This assumption fails only if the very first flush of a table stalled or failed, while a second or
     *   later flush succeeded. The chances of this happening are very low, and if the assumption does prove
     *   to be incorrect during replay, there is little chance that the affected deployment is in production.
     */
    public static ReplayPosition firstNotCovered(Collection<IntervalSet<ReplayPosition>> ranges)
    {
        return ranges.stream()
                     .map(intervals -> Iterables.getFirst(intervals.ends(), ReplayPosition.NONE))
                     .min(Ordering.natural())
                     .get(); // iteration is per known-CF, there must be at least one.
    }

    public int blockForWrites()
    {
        for (Map.Entry<UUID, AtomicInteger> entry : invalidMutations.entrySet())
            logger.warn(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));

        // wait for all the writes to finish on the mutation stage
        FBUtilities.waitOnFutures(futures);
        logger.trace("Finished waiting on mutations from recovery");

        // flush replayed keyspaces
        futures.clear();
        boolean flushingSystem = false;
        for (Keyspace keyspace : keyspacesRecovered)
        {
            if (keyspace.getName().equals(SystemKeyspace.NAME))
                flushingSystem = true;

            futures.addAll(keyspace.flush());
        }

        // also flush the batchlog in case of any MV updates
        if (!flushingSystem)
            futures.add(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceFlush());

        FBUtilities.waitOnFutures(futures);
        return replayedCount.get();
    }

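    /**
     * Reads and validates the sync marker at the given offset: a 4-byte pointer to the end of
     * the synced section followed by a 4-byte CRC computed over the segment id and the marker
     * position. Returns the section end (which is where the next marker begins), or -1 if the
     * marker is missing, zeroed (clean end of segment), or fails validation.
     */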
    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException
    {
        if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
        {
            // There was no room in the segment to write a final header. No data could be present here.
            return -1;
        }
        reader.seek(offset);
        CRC32 crc = new CRC32();
        updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
        updateChecksumInt(crc, (int) (descriptor.id >>> 32));
        updateChecksumInt(crc, (int) reader.getPosition());
        int end = reader.readInt();
        long filecrc = reader.readInt() & 0xffffffffL;
        if (crc.getValue() != filecrc)
        {
            if (end != 0 || filecrc != 0)
            {
                handleReplayError(false, null,
                                  "Encountered bad header at position %d of commit log %s, with invalid CRC. " +
                                  "The end of segment marker should be zero.",
                                  offset, reader.getPath());
            }
            return -1;
        }
        else if (end < offset || end > reader.length())
        {
            handleReplayError(tolerateTruncation, null,
                              "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
                              offset, reader.getPath());
            return -1;
        }
        return end;
    }

    abstract static class ReplayFilter
    {
        public abstract Iterable<PartitionUpdate> filter(Mutation mutation);

        public abstract boolean includes(CFMetaData metadata);

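        /**
         * Builds a filter from the {@code cassandra.replayList} system property, a
         * comma-separated list of fully qualified table names, e.g.
         * {@code -Dcassandra.replayList=ks1.table1,ks1.table2}; when the property is absent,
         * everything is replayed.
         */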
        public static ReplayFilter create()
        {
            // If no replay list is supplied, replay everything.
            if (System.getProperty("cassandra.replayList") == null)
                return new AlwaysReplayFilter();

            Multimap<String, String> toReplay = HashMultimap.create();
            for (String rawPair : System.getProperty("cassandra.replayList").split(","))
            {
                String[] pair = rawPair.trim().split("\\.");
                if (pair.length != 2)
                    throw new IllegalArgumentException("Each table to be replayed must be fully qualified with keyspace name, e.g., 'system.peers'");

                Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]);
                if (ks == null)
                    throw new IllegalArgumentException("Unknown keyspace " + pair[0]);
                ColumnFamilyStore cfs = ks.getColumnFamilyStore(pair[1]);
                if (cfs == null)
                    throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1]));

                toReplay.put(pair[0], pair[1]);
            }
            return new CustomReplayFilter(toReplay);
        }
    }

    private static class AlwaysReplayFilter extends ReplayFilter
    {
        public Iterable<PartitionUpdate> filter(Mutation mutation)
        {
            return mutation.getPartitionUpdates();
        }

        public boolean includes(CFMetaData metadata)
        {
            return true;
        }
    }

    private static class CustomReplayFilter extends ReplayFilter
    {
        private Multimap<String, String> toReplay;

        public CustomReplayFilter(Multimap<String, String> toReplay)
        {
            this.toReplay = toReplay;
        }

        public Iterable<PartitionUpdate> filter(Mutation mutation)
        {
            // Multimap.get never returns null; an empty collection means nothing to replay for this keyspace
            final Collection<String> cfNames = toReplay.get(mutation.getKeyspaceName());
            if (cfNames.isEmpty())
                return Collections.emptySet();

            return Iterables.filter(mutation.getPartitionUpdates(), new Predicate<PartitionUpdate>()
            {
                public boolean apply(PartitionUpdate upd)
                {
                    return cfNames.contains(upd.metadata().cfName);
                }
            });
        }

        public boolean includes(CFMetaData metadata)
        {
            return toReplay.containsEntry(metadata.ksName, metadata.cfName);
        }
    }

    /**
     * consult the known-persisted ranges for our sstables;
     * if the position is covered by one of them it does not need to be replayed
     *
     * @return true iff replay is necessary
     */
    private boolean shouldReplay(UUID cfId, ReplayPosition position)
    {
        return !cfPersisted.get(cfId).contains(position);
    }

    @SuppressWarnings("resource")
    public void recover(File file, boolean tolerateTruncation) throws IOException
    {
        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
        try (ChannelProxy channel = new ChannelProxy(file);
             RandomAccessReader reader = RandomAccessReader.open(channel))
        {
            if (desc.version < CommitLogDescriptor.VERSION_21)
            {
                if (logAndCheckIfShouldSkip(file, desc))
                    return;
                if (globalPosition.segment == desc.id)
                    reader.seek(globalPosition.position);
                replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation);
                return;
            }

            final long segmentId = desc.id;
            try
            {
                desc = CommitLogDescriptor.readHeader(reader);
            }
            catch (IOException e)
            {
                desc = null;
            }
            if (desc == null)
            {
                // Presumably a failed CRC or other IO error occurred, which may be ok if it's the last segment
                // where we tolerate (and expect) truncation
                handleReplayError(tolerateTruncation, null, "Could not read commit log descriptor in file %s", file);
                return;
            }
            if (segmentId != desc.id)
            {
                handleReplayError(false, null, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file);
                // continue processing if ignored.
            }

            if (logAndCheckIfShouldSkip(file, desc))
                return;

            ICompressor compressor = null;
            if (desc.compression != null)
            {
                try
                {
                    compressor = CompressionParams.createCompressor(desc.compression);
                }
                catch (ConfigurationException e)
                {
                    handleReplayError(false, null, "Unknown compression: %s", e.getMessage());
                    return;
                }
            }

            assert reader.length() <= Integer.MAX_VALUE;
            int end = (int) reader.getFilePointer();
            int replayEnd = end;
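            // `end` starts at the first sync marker, located immediately after the descriptor header;
            // each marker records where its synced section ends, which is also where the next marker begins.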

            while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0)
            {
                int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE;

                if (logger.isTraceEnabled())
                    logger.trace("Replaying {} between {} and {}", file, reader.getFilePointer(), end);
                if (compressor != null)
                {
                    int uncompressedLength = reader.readInt();
                    replayEnd = replayPos + uncompressedLength;
                }
                else
                {
                    replayEnd = end;
                }

                if (segmentId == globalPosition.segment && replayEnd < globalPosition.position)
                    // Skip over flushed section.
                    continue;

                FileDataInput sectionReader = reader;
                String errorContext = desc.fileName();
                // In the uncompressed case the last non-fully-flushed section can be anywhere in the file.
                boolean tolerateErrorsInSection = tolerateTruncation;
                if (compressor != null)
                {
                    // In the compressed case we know if this is the last section.
                    tolerateErrorsInSection &= end == reader.length() || end < 0;

                    int start = (int) reader.getFilePointer();
                    try
                    {
                        int compressedLength = end - start;
                        if (logger.isTraceEnabled())
                            logger.trace("Decompressing {} between replay positions {} and {}",
                                         file,
                                         replayPos,
                                         replayEnd);
                        if (compressedLength > buffer.length)
                            buffer = new byte[(int) (1.2 * compressedLength)];
                        reader.readFully(buffer, 0, compressedLength);
                        int uncompressedLength = replayEnd - replayPos;
                        if (uncompressedLength > uncompressedBuffer.length)
                            uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
                        compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
                        sectionReader = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos);
                        errorContext = "compressed section at " + start + " in " + errorContext;
                    }
                    catch (IOException | ArrayIndexOutOfBoundsException e)
                    {
                        handleReplayError(tolerateErrorsInSection, e,
                                          "Unexpected exception decompressing section at %d",
                                          start);
                        continue;
                    }
                }

                if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
                    break;
            }
            logger.debug("Finished reading {}", file);
        }
    }

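    /**
     * Logs the segment about to be replayed and returns true if the entire segment precedes
     * the global replay position, i.e. everything in it is already known to be flushed.
     */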
    public boolean logAndCheckIfShouldSkip(File file, CommitLogDescriptor desc)
    {
        logger.debug("Replaying {} (CL version {}, messaging version {}, compression {})",
                     file.getPath(),
                     desc.version,
                     desc.getMessagingVersion(),
                     desc.compression);

        if (globalPosition.segment > desc.id)
        {
            logger.trace("skipping replay of fully-flushed {}", file);
            return true;
        }
        return false;
    }

    /**
     * Replays a sync section containing a list of mutations.
     *
     * @return Whether replay should continue with the next section.
     */
    private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException
    {
        /* read the log entries, deserialize each Mutation and apply it */
        while (reader.getFilePointer() < end && !reader.isEOF())
        {
            long mutationStart = reader.getFilePointer();
            if (logger.isTraceEnabled())
                logger.trace("Reading mutation at {}", mutationStart);

            long claimedCRC32;
            int serializedSize;
            try
            {
                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
                // However, it's possible with 2.1-era commitlogs that the last mutation ended less than 4 bytes
                // from the end of the file, which means that we'd be unable to read a full int and would instead
                // hit EOF here
                if (end - reader.getFilePointer() < 4)
                {
                    logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
                    return false;
                }

                // any of the reads may hit EOF
                serializedSize = reader.readInt();
                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
                {
                    logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
                    return false;
                }

                // Mutation must be at LEAST 10 bytes:
                //   3 each for a non-empty Keyspace and Key (including the
                //   2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
                // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
                if (serializedSize < 10)
                {
                    handleReplayError(tolerateErrors, null,
                                      "Invalid mutation size %d at %d in %s",
                                      serializedSize, mutationStart, errorContext);
                    return false;
                }

                long claimedSizeChecksum;
                if (desc.version < CommitLogDescriptor.VERSION_21)
                    claimedSizeChecksum = reader.readLong();
                else
                    claimedSizeChecksum = reader.readInt() & 0xffffffffL;
                checksum.reset();
                if (desc.version < CommitLogDescriptor.VERSION_20)
                    checksum.update(serializedSize);
                else
                    updateChecksumInt(checksum, serializedSize);

                if (checksum.getValue() != claimedSizeChecksum)
                {
                    handleReplayError(tolerateErrors, null,
                                      "Mutation size checksum failure at %d in %s",
                                      mutationStart, errorContext);
                    return false;
                }
                // ok.

                if (serializedSize > buffer.length)
                    buffer = new byte[(int) (1.2 * serializedSize)];
                reader.readFully(buffer, 0, serializedSize);
                if (desc.version < CommitLogDescriptor.VERSION_21)
                    claimedCRC32 = reader.readLong();
                else
                    claimedCRC32 = reader.readInt() & 0xffffffffL;
            }
            catch (EOFException eof)
            {
                handleReplayError(tolerateErrors, eof,
                                  "Unexpected end of segment at %d in %s",
                                  mutationStart, errorContext);
                return false; // last CL entry didn't get completely written. that's ok.
            }

            checksum.update(buffer, 0, serializedSize);
            if (claimedCRC32 != checksum.getValue())
            {
                handleReplayError(tolerateErrors, null,
                                  "Mutation checksum failure at %d in %s",
                                  mutationStart, errorContext);
                continue;
            }
            replayMutation(buffer, serializedSize, (int) reader.getFilePointer(), desc);
        }
        return true;
    }

    /**
     * Deserializes and replays a commit log entry.
     */
    void replayMutation(byte[] inputBuffer, int size,
                        final int entryLocation, final CommitLogDescriptor desc) throws IOException
    {

        final Mutation mutation;
        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
        {
            mutation = Mutation.serializer.deserialize(bufIn,
                                                       desc.getMessagingVersion(),
                                                       SerializationHelper.Flag.LOCAL);
            // doublecheck that what we read is [still] valid for the current schema
            for (PartitionUpdate upd : mutation.getPartitionUpdates())
                upd.validate();
        }
        catch (UnknownColumnFamilyException ex)
        {
            if (ex.cfId == null)
                return;
            AtomicInteger i = invalidMutations.get(ex.cfId);
            if (i == null)
            {
                i = new AtomicInteger(1);
                invalidMutations.put(ex.cfId, i);
            }
            else
                i.incrementAndGet();
            return;
        }
        catch (Throwable t)
        {
            JVMStabilityInspector.inspectThrowable(t);
            File f = File.createTempFile("mutation", "dat");

            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
            {
                out.write(inputBuffer, 0, size);
            }

            // The checksum passed, so the error isn't from a partial write; don't tolerate it.
            handleReplayError(false, t,
                              "Unexpected error deserializing mutation; saved to %s. " +
                              "This may be caused by replaying a mutation against a table with the same name but incompatible schema.",
                              f.getAbsolutePath());
            return;
        }

        if (logger.isTraceEnabled())
            logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");

        Runnable runnable = new WrappedRunnable()
        {
            public void runMayThrow()
            {
                if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
                    return;
                if (pointInTimeExceeded(mutation))
                    return;

                final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());

                // Rebuild the mutation, omitting column families that
                //    a) the user has requested that we ignore,
                //    b) have already been flushed,
                // or c) are part of a cf that was dropped.
                // Keep in mind that the cf.name() is suspect; do everything based on the cfId instead.
                Mutation newMutation = null;
                for (PartitionUpdate update : replayFilter.filter(mutation))
                {
                    if (Schema.instance.getCF(update.metadata().cfId) == null)
                        continue; // dropped

                    // replay if current segment is newer than last flushed one or,
                    // if it is the last known segment, if we are after the replay position
                    if (shouldReplay(update.metadata().cfId, new ReplayPosition(desc.id, entryLocation)))
                    {
                        if (newMutation == null)
                            newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
                        newMutation.add(update);
                        replayedCount.incrementAndGet();
                    }
                }
                if (newMutation != null)
                {
                    assert !newMutation.isEmpty();

                    Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false);
                    keyspacesRecovered.add(keyspace);
                }
            }
        };
        futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
        if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
        {
            FBUtilities.waitOnFutures(futures);
            futures.clear();
        }
    }

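    /**
     * Returns true if any update in the mutation is timestamped after the archiver's restore
     * point in time, in which case a point-in-time restore must not replay it.
     */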
    protected boolean pointInTimeExceeded(Mutation fm)
    {
        long restoreTarget = archiver.restorePointInTime;

        for (PartitionUpdate upd : fm.getPartitionUpdates())
        {
            if (archiver.precision.toMillis(upd.maxTimestamp()) > restoreTarget)
                return true;
        }
        return false;
    }

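    /**
     * Reports a replay error. "Permissible" errors (expected truncation of the last segment)
     * are logged and ignored; otherwise the error is fatal unless the
     * {@value #IGNORE_REPLAY_ERRORS_PROPERTY} system property is set or the configured commit
     * failure policy elects to continue.
     */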
    static void handleReplayError(boolean permissible, Throwable t, String message, Object... messageArgs) throws IOException
    {
        String msg = String.format(message, messageArgs);
        IOException e = new CommitLogReplayException(msg, t);
        if (permissible)
            logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", e);
        else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY))
            logger.error("Ignoring commit log replay error", e);
        else if (!CommitLog.handleCommitError("Failed commit log replay", e))
        {
            logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring " +
                         "commit log replay problems, specify -D" + IGNORE_REPLAY_ERRORS_PROPERTY + "=true " +
                         "on the command line");
            throw e;
        }
    }

    @SuppressWarnings("serial")
    public static class CommitLogReplayException extends IOException
    {
        public CommitLogReplayException(String message, Throwable cause)
        {
            super(message, cause);
        }

        public CommitLogReplayException(String message)
        {
            super(message);
        }
    }
}