| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.log |
| |
| import java.io.{File, IOException} |
| import java.lang.{Long => JLong} |
| import java.nio.file.{Files, NoSuchFileException} |
| import java.text.NumberFormat |
| import java.util.Map.{Entry => JEntry} |
| import java.util.Optional |
| import java.util.concurrent.atomic._ |
| import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit} |
| import java.util.regex.Pattern |
| |
| import com.yammer.metrics.core.Gauge |
| import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0} |
| import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException} |
| import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec} |
| import kafka.metrics.KafkaMetricsGroup |
| import kafka.server.checkpoints.LeaderEpochCheckpointFile |
| import kafka.server.epoch.LeaderEpochFileCache |
| import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch} |
| import kafka.utils._ |
| import org.apache.kafka.common.errors._ |
| import org.apache.kafka.common.record.FileRecords.TimestampAndOffset |
| import org.apache.kafka.common.record._ |
| import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction |
| import org.apache.kafka.common.requests.{EpochEndOffset, ListOffsetRequest} |
| import org.apache.kafka.common.utils.{Time, Utils} |
| import org.apache.kafka.common.{KafkaException, TopicPartition} |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable.{ArrayBuffer, ListBuffer} |
| import scala.collection.{Seq, Set, mutable} |
| |
| object LogAppendInfo { |
| val UnknownLogAppendInfo = LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L, |
| RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) |
| |
| def unknownLogAppendInfoWithLogStartOffset(logStartOffset: Long): LogAppendInfo = |
| LogAppendInfo(None, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset, |
| RecordConversionStats.EMPTY, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, -1L) |
| } |
| |
| /** |
| * Struct to hold various quantities we compute about each message set before appending to the log |
| * |
| * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending |
| * to the follower. |
| * @param lastOffset The last offset in the message set |
| * @param maxTimestamp The maximum timestamp of the message set. |
| * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp. |
 * @param logAppendTime The log append time (if used) of the message set, otherwise RecordBatch.NO_TIMESTAMP
| * @param logStartOffset The start offset of the log at the time of this append. |
| * @param recordConversionStats Statistics collected during record processing, `null` if `assignOffsets` is `false` |
 * @param sourceCodec The source codec used in the message set (sent by the producer)
 * @param targetCodec The target codec of the message set (after applying the broker compression configuration, if any)
| * @param shallowCount The number of shallow messages |
| * @param validBytes The number of valid bytes |
| * @param offsetsMonotonic Are the offsets in this message set monotonically increasing |
| * @param lastOffsetOfFirstBatch The last offset of the first batch |
| */ |
| case class LogAppendInfo(var firstOffset: Option[Long], |
| var lastOffset: Long, |
| var maxTimestamp: Long, |
| var offsetOfMaxTimestamp: Long, |
| var logAppendTime: Long, |
| var logStartOffset: Long, |
| var recordConversionStats: RecordConversionStats, |
| sourceCodec: CompressionCodec, |
| targetCodec: CompressionCodec, |
| shallowCount: Int, |
| validBytes: Int, |
| offsetsMonotonic: Boolean, |
| lastOffsetOfFirstBatch: Long) { |
| /** |
| * Get the first offset if it exists, else get the last offset of the first batch |
   * For magic versions 2 and newer, this method will return the first offset. For magic versions
| * older than 2, we use the last offset of the first batch as an approximation of the first |
| * offset to avoid decompressing the data. |
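   * For example (illustrative): for a v2 batch spanning offsets 10..14 this returns 10, while for a v0/v1
   * compressed wrapper over the same records it returns 14, the last offset of the first batch.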
| */ |
| def firstOrLastOffsetOfFirstBatch: Long = firstOffset.getOrElse(lastOffsetOfFirstBatch) |
| |
| /** |
| * Get the (maximum) number of messages described by LogAppendInfo |
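   * For example (illustrative): firstOffset = Some(100) and lastOffset = 104 describe at most 104 - 100 + 1 = 5 messages.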
| * @return Maximum possible number of messages described by LogAppendInfo |
| */ |
| def numMessages: Long = { |
| firstOffset match { |
      case Some(firstOffsetVal) if firstOffsetVal >= 0 && lastOffset >= 0 => lastOffset - firstOffsetVal + 1
| case _ => 0 |
| } |
| } |
| } |
| |
| /** |
| * Container class which represents a snapshot of the significant offsets for a partition. This allows fetching |
| * of these offsets atomically without the possibility of a leader change affecting their consistency relative |
| * to each other. See [[kafka.cluster.Partition.fetchOffsetSnapshot()]]. |
| */ |
| case class LogOffsetSnapshot(logStartOffset: Long, |
| logEndOffset: LogOffsetMetadata, |
| highWatermark: LogOffsetMetadata, |
| lastStableOffset: LogOffsetMetadata) |
| |
| /** |
| * Another container which is used for lower level reads using [[kafka.cluster.Partition.readRecords()]]. |
| */ |
| case class LogReadInfo(fetchedData: FetchDataInfo, |
| highWatermark: Long, |
| logStartOffset: Long, |
| logEndOffset: Long, |
| lastStableOffset: Long) |
| |
| /** |
| * A class used to hold useful metadata about a completed transaction. This is used to build |
| * the transaction index after appending to the log. |
| * |
| * @param producerId The ID of the producer |
| * @param firstOffset The first offset (inclusive) of the transaction |
| * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the |
| * COMMIT/ABORT control record which indicates the transaction's completion. |
| * @param isAborted Whether or not the transaction was aborted |
| */ |
| case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean) { |
| override def toString: String = { |
| "CompletedTxn(" + |
| s"producerId=$producerId, " + |
| s"firstOffset=$firstOffset, " + |
| s"lastOffset=$lastOffset, " + |
| s"isAborted=$isAborted)" |
| } |
| } |
| |
| /** |
 * A class used to hold the parameters required to decide whether to roll a new log segment.
| */ |
| case class RollParams(maxSegmentMs: Long, |
| maxSegmentBytes: Int, |
| maxTimestampInMessages: Long, |
| maxOffsetInMessages: Long, |
| messagesSize: Int, |
| now: Long) |
| |
| object RollParams { |
| def apply(config: LogConfig, appendInfo: LogAppendInfo, messagesSize: Int, now: Long): RollParams = { |
| new RollParams(config.maxSegmentMs, |
| config.segmentSize, |
| appendInfo.maxTimestamp, |
| appendInfo.lastOffset, |
| messagesSize, now) |
| } |
| } |
| |
| /** |
| * An append-only log for storing messages. |
| * |
| * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment. |
| * |
| * New log segments are created according to a configurable policy that controls the size in bytes or time interval |
| * for a given segment. |
| * |
| * @param dir The directory in which log segments are created. |
| * @param config The log configuration settings |
| * @param logStartOffset The earliest offset allowed to be exposed to kafka client. |
| * The logStartOffset can be updated by : |
| * - user's DeleteRecordsRequest |
| * - broker's log retention |
| * - broker's log truncation |
| * The logStartOffset is used to decide the following: |
| * - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted. |
| * It may trigger log rolling if the active segment is deleted. |
| * - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset, |
| * we make sure that logStartOffset <= log's highWatermark |
| * Other activities such as log cleaning are not affected by logStartOffset. |
 * @param recoveryPoint The offset at which to begin recovery, i.e. the first offset which has not been flushed to disk
| * @param scheduler The thread pool scheduler used for background actions |
| * @param brokerTopicStats Container for Broker Topic Yammer Metrics |
| * @param time The time instance used for checking the clock |
| * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired |
 * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
 * @param topicPartition The topic partition associated with this log
 * @param producerStateManager The ProducerStateManager instance used to maintain the log's idempotent/transactional state
 * @param logDirFailureChannel The channel used to signal that this log's directory has gone offline
| */ |
| @threadsafe |
| class Log(@volatile var dir: File, |
| @volatile var config: LogConfig, |
| @volatile var logStartOffset: Long, |
| @volatile var recoveryPoint: Long, |
| scheduler: Scheduler, |
| brokerTopicStats: BrokerTopicStats, |
| val time: Time, |
| val maxProducerIdExpirationMs: Int, |
| val producerIdExpirationCheckIntervalMs: Int, |
| val topicPartition: TopicPartition, |
| val producerStateManager: ProducerStateManager, |
| logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup { |
| |
| import kafka.log.Log._ |
| |
| this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] " |
| |
| /* A lock that guards all modifications to the log */ |
| private val lock = new Object |
| // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers() |
  // After the memory mapped buffer is closed, no disk IO operation should be performed for this log
| @volatile private var isMemoryMappedBufferClosed = false |
| |
| /* last time it was flushed */ |
| private val lastFlushedTime = new AtomicLong(time.milliseconds) |
| |
| def initFileSize: Int = { |
| if (config.preallocate) |
| config.segmentSize |
| else |
| 0 |
| } |
| |
| def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = { |
| val oldConfig = this.config |
| this.config = newConfig |
| if (updatedKeys.contains(LogConfig.MessageFormatVersionProp)) { |
| val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion |
| val newRecordVersion = newConfig.messageFormatVersion.recordVersion |
| if (newRecordVersion.precedes(oldRecordVersion)) |
| warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.") |
| initializeLeaderEpochCache() |
| } |
| } |
| |
| private def checkIfMemoryMappedBufferClosed(): Unit = { |
| if (isMemoryMappedBufferClosed) |
| throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed") |
| } |
| |
| @volatile private var nextOffsetMetadata: LogOffsetMetadata = _ |
| |
| /* The earliest offset which is part of an incomplete transaction. This is used to compute the |
| * last stable offset (LSO) in ReplicaManager. Note that it is possible that the "true" first unstable offset |
| * gets removed from the log (through record or segment deletion). In this case, the first unstable offset |
| * will point to the log start offset, which may actually be either part of a completed transaction or not |
| * part of a transaction at all. However, since we only use the LSO for the purpose of restricting the |
| * read_committed consumer to fetching decided data (i.e. committed, aborted, or non-transactional), this |
| * temporary abuse seems justifiable and saves us from scanning the log after deletion to find the first offsets |
| * of each ongoing transaction in order to compute a new first unstable offset. It is possible, however, |
| * that this could result in disagreement between replicas depending on when they began replicating the log. |
| * In the worst case, the LSO could be seen by a consumer to go backwards. |
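   * For example (illustrative): if an open transaction starts at offset 5 and retention advances the log start
   * offset to 10, the first unstable offset is clamped to 10 instead of rescanning the log for the true value.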
| */ |
| @volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None |
| |
| /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are |
| * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark |
| * equals the log end offset (which may never happen for a partition under consistent load). This is needed to |
| * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark. |
| */ |
| @volatile private var replicaHighWatermark: Option[Long] = None |
| |
| /* the actual segments of the log */ |
| private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] |
| |
| // Visible for testing |
| @volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None |
| |
| locally { |
| val startMs = time.milliseconds |
| |
| // create the log directory if it doesn't exist |
| Files.createDirectories(dir.toPath) |
| |
| initializeLeaderEpochCache() |
| |
| val nextOffset = loadSegments() |
| |
| /* Calculate the offset of the next message */ |
| nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size) |
| |
| leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset)) |
| |
| logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset) |
| |
| // The earliest leader epoch may not be flushed during a hard failure. Recover it here. |
| leaderEpochCache.foreach(_.truncateFromStart(logStartOffset)) |
| |
| // Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here |
| // from scratch. |
| if (!producerStateManager.isEmpty) |
| throw new IllegalStateException("Producer state must be empty during log initialization") |
| loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile) |
| |
| info(s"Completed load of log with ${segments.size} segments, log start offset $logStartOffset and " + |
| s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms") |
| } |
| |
| private val tags = { |
| val maybeFutureTag = if (isFuture) Map("is-future" -> "true") else Map.empty[String, String] |
| Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++ maybeFutureTag |
| } |
| |
| newGauge("NumLogSegments", |
| new Gauge[Int] { |
| def value = numberOfSegments |
| }, |
| tags) |
| |
| newGauge("LogStartOffset", |
| new Gauge[Long] { |
| def value = logStartOffset |
| }, |
| tags) |
| |
| newGauge("LogEndOffset", |
| new Gauge[Long] { |
| def value = logEndOffset |
| }, |
| tags) |
| |
| newGauge("Size", |
| new Gauge[Long] { |
| def value = size |
| }, |
| tags) |
| |
| scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { |
| lock synchronized { |
| producerStateManager.removeExpiredProducers(time.milliseconds) |
| } |
| }, period = producerIdExpirationCheckIntervalMs, delay = producerIdExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) |
| |
| /** The name of this log */ |
  def name: String = dir.getName
| |
| def recordVersion: RecordVersion = config.messageFormatVersion.recordVersion |
| |
| private def initializeLeaderEpochCache(): Unit = lock synchronized { |
| val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir) |
| |
| def newLeaderEpochFileCache(): LeaderEpochFileCache = { |
| val checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel) |
| new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile) |
| } |
| |
| if (recordVersion.precedes(RecordVersion.V2)) { |
| val currentCache = if (leaderEpochFile.exists()) |
| Some(newLeaderEpochFileCache()) |
| else |
| None |
| |
| if (currentCache.exists(_.nonEmpty)) |
| warn(s"Deleting non-empty leader epoch cache due to incompatible message format $recordVersion") |
| |
| Files.deleteIfExists(leaderEpochFile.toPath) |
| leaderEpochCache = None |
| } else { |
| leaderEpochCache = Some(newLeaderEpochFileCache()) |
| } |
| } |
| |
| /** |
   * Removes any temporary files found in the log directory, and creates a list of all .swap files which could be swapped
   * in place of existing segment(s). For log splitting, we know that any .swap file whose base offset is at or above the
   * smallest base offset among the .clean files could be part of an incomplete split operation. Such .swap files are
   * also deleted by this method.
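   * Temporary files carry the ".deleted", ".cleaned" or ".swap" suffix appended to the segment file name; for
   * example, an illustrative cleaned segment file would be named "00000000000000368769.log.cleaned".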
| * @return Set of .swap files that are valid to be swapped in as segment files |
| */ |
| private def removeTempFilesAndCollectSwapFiles(): Set[File] = { |
| |
| def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = { |
| info(s"Deleting index files with suffix $suffix for baseFile $baseFile") |
| val offset = offsetFromFile(baseFile) |
| Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath) |
| Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath) |
| Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath) |
| } |
| |
| var swapFiles = Set[File]() |
| var cleanFiles = Set[File]() |
| var minCleanedFileOffset = Long.MaxValue |
| |
| for (file <- dir.listFiles if file.isFile) { |
| if (!file.canRead) |
| throw new IOException(s"Could not read file $file") |
| val filename = file.getName |
| if (filename.endsWith(DeletedFileSuffix)) { |
| debug(s"Deleting stray temporary file ${file.getAbsolutePath}") |
| Files.deleteIfExists(file.toPath) |
| } else if (filename.endsWith(CleanedFileSuffix)) { |
| minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset) |
| cleanFiles += file |
| } else if (filename.endsWith(SwapFileSuffix)) { |
        // we crashed in the middle of a swap operation, to recover:
        // if it is a log file, delete the corresponding index files and complete the swap operation later;
        // if it is an index file, just delete the index files, since they will be rebuilt
| val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) |
| info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.") |
| if (isIndexFile(baseFile)) { |
| deleteIndicesIfExist(baseFile) |
| } else if (isLogFile(baseFile)) { |
| deleteIndicesIfExist(baseFile) |
| swapFiles += file |
| } |
| } |
| } |
| |
    // KAFKA-6264: Delete all .swap files whose base offset is greater than or equal to the minimum .cleaned segment
    // offset. Such .swap files could be part of an incomplete split operation. See Log#splitOverflowedSegment
| // for more details about the split operation. |
| val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset) |
| invalidSwapFiles.foreach { file => |
| debug(s"Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset") |
| val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) |
| deleteIndicesIfExist(baseFile, SwapFileSuffix) |
| Files.deleteIfExists(file.toPath) |
| } |
| |
| // Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files |
| cleanFiles.foreach { file => |
| debug(s"Deleting stray .clean file ${file.getAbsolutePath}") |
| Files.deleteIfExists(file.toPath) |
| } |
| |
| validSwapFiles |
| } |
| |
| /** |
   * This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded.
| * It is possible that we encounter a segment with index offset overflow in which case the LogSegmentOffsetOverflowException |
| * will be thrown. Note that any segments that were opened before we encountered the exception will remain open and the |
| * caller is responsible for closing them appropriately, if needed. |
| * @throws LogSegmentOffsetOverflowException if the log directory contains a segment with messages that overflow the index offset |
| */ |
| private def loadSegmentFiles(): Unit = { |
| // load segments in ascending order because transactional data from one segment may depend on the |
| // segments that come before it |
| for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) { |
| if (isIndexFile(file)) { |
| // if it is an index file, make sure it has a corresponding .log file |
| val offset = offsetFromFile(file) |
| val logFile = Log.logFile(dir, offset) |
| if (!logFile.exists) { |
| warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.") |
| Files.deleteIfExists(file.toPath) |
| } |
| } else if (isLogFile(file)) { |
| // if it's a log file, load the corresponding log segment |
| val baseOffset = offsetFromFile(file) |
| val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists() |
| val segment = LogSegment.open(dir = dir, |
| baseOffset = baseOffset, |
| config, |
| time = time, |
| fileAlreadyExists = true) |
| |
| try segment.sanityCheck(timeIndexFileNewlyCreated) |
| catch { |
| case _: NoSuchFileException => |
| error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " + |
| "recovering segment and rebuilding index files...") |
| recoverSegment(segment) |
| case e: CorruptIndexException => |
| warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " + |
| s"to ${e.getMessage}}, recovering segment and rebuilding index files...") |
| recoverSegment(segment) |
| } |
| addSegment(segment) |
| } |
| } |
| } |
| |
| /** |
| * Recover the given segment. |
| * @param segment Segment to recover |
| * @param leaderEpochCache Optional cache for updating the leader epoch during recovery |
| * @return The number of bytes truncated from the segment |
| * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index offset overflow |
| */ |
| private def recoverSegment(segment: LogSegment, |
| leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = lock synchronized { |
| val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) |
| rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false, producerStateManager) |
| val bytesTruncated = segment.recover(producerStateManager, leaderEpochCache) |
| // once we have recovered the segment's data, take a snapshot to ensure that we won't |
| // need to reload the same segment again while recovering another segment. |
| producerStateManager.takeSnapshot() |
| bytesTruncated |
| } |
| |
| /** |
| * This method does not need to convert IOException to KafkaStorageException because it is only called before all logs |
| * are loaded. |
| * @throws LogSegmentOffsetOverflowException if the swap file contains messages that cause the log segment offset to |
| * overflow. Note that this is currently a fatal exception as we do not have |
| * a way to deal with it. The exception is propagated all the way up to |
| * KafkaServer#startup which will cause the broker to shut down if we are in |
| * this situation. This is expected to be an extremely rare scenario in practice, |
| * and manual intervention might be required to get out of it. |
| */ |
| private def completeSwapOperations(swapFiles: Set[File]): Unit = { |
| for (swapFile <- swapFiles) { |
| val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) |
| val baseOffset = offsetFromFile(logFile) |
| val swapSegment = LogSegment.open(swapFile.getParentFile, |
| baseOffset = baseOffset, |
| config, |
| time = time, |
| fileSuffix = SwapFileSuffix) |
| info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.") |
| recoverSegment(swapSegment) |
| |
| // We create swap files for two cases: |
| // (1) Log cleaning where multiple segments are merged into one, and |
| // (2) Log splitting where one segment is split into multiple. |
| // |
      // Both of these mean that the resultant swap segments must be composed of the original set, i.e. the swap segment
| // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion |
| // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to |
| // do a replace with an existing segment. |
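      // For example (illustrative): a recovered swap segment covering offsets [100, 300) replaces any existing
      // segments in that range whose readNextOffset is past 100; if all such segments were already deleted,
      // the .swap file is simply renamed to .log below.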
| val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment => |
| segment.readNextOffset > swapSegment.baseOffset |
| } |
| replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true) |
| } |
| } |
| |
| /** |
| * Load the log segments from the log files on disk and return the next offset. |
| * This method does not need to convert IOException to KafkaStorageException because it is only called before all logs |
| * are loaded. |
| * @throws LogSegmentOffsetOverflowException if we encounter a .swap file with messages that overflow index offset; or when |
| * we find an unexpected number of .log files with overflow |
| */ |
| private def loadSegments(): Long = { |
| // first do a pass through the files in the log directory and remove any temporary files |
| // and find any interrupted swap operations |
| val swapFiles = removeTempFilesAndCollectSwapFiles() |
| |
| // Now do a second pass and load all the log and index files. |
| // We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When |
| // this happens, restart loading segment files from scratch. |
| retryOnOffsetOverflow { |
| // In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry |
| // loading of segments. In that case, we also need to close all segments that could have been left open in previous |
| // call to loadSegmentFiles(). |
| logSegments.foreach(_.close()) |
| segments.clear() |
| loadSegmentFiles() |
| } |
| |
| // Finally, complete any interrupted swap operations. To be crash-safe, |
| // log files that are replaced by the swap segment should be renamed to .deleted |
| // before the swap file is restored as the new segment file. |
| completeSwapOperations(swapFiles) |
| |
| if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) { |
| val nextOffset = retryOnOffsetOverflow { |
| recoverLog() |
| } |
| |
| // reset the index size of the currently active log segment to allow more entries |
| activeSegment.resizeIndexes(config.maxIndexSize) |
| nextOffset |
| } else { |
| if (logSegments.isEmpty) { |
| addSegment(LogSegment.open(dir = dir, |
| baseOffset = 0, |
| config, |
| time = time, |
| fileAlreadyExists = false, |
| initFileSize = this.initFileSize, |
| preallocate = false)) |
| } |
| 0 |
| } |
| } |
| |
| private def updateLogEndOffset(messageOffset: Long) { |
| nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size) |
| } |
| |
| /** |
| * Recover the log segments and return the next offset after recovery. |
| * This method does not need to convert IOException to KafkaStorageException because it is only called before all |
| * logs are loaded. |
| * @throws LogSegmentOffsetOverflowException if we encountered a legacy segment with offset overflow |
| */ |
| private def recoverLog(): Long = { |
| // if we have the clean shutdown marker, skip recovery |
| if (!hasCleanShutdownFile) { |
| // okay we need to actually recover this log |
| val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator |
| while (unflushed.hasNext) { |
| val segment = unflushed.next |
| info(s"Recovering unflushed segment ${segment.baseOffset}") |
| val truncatedBytes = |
| try { |
| recoverSegment(segment, leaderEpochCache) |
| } catch { |
| case _: InvalidOffsetException => |
| val startOffset = segment.baseOffset |
| warn("Found invalid offset during recovery. Deleting the corrupt segment and " + |
| s"creating an empty one with starting offset $startOffset") |
| segment.truncateTo(startOffset) |
| } |
| if (truncatedBytes > 0) { |
| // we had an invalid message, delete all remaining log |
| warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}") |
| unflushed.foreach(deleteSegment) |
| } |
| } |
| } |
| |
| if (logSegments.nonEmpty) { |
| val logEndOffset = activeSegment.readNextOffset |
| if (logEndOffset < logStartOffset) { |
| warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " + |
| "This could happen if segment files were deleted from the file system.") |
| logSegments.foreach(deleteSegment) |
| } |
| } |
| |
| if (logSegments.isEmpty) { |
| // no existing segments, create a new mutable segment beginning at logStartOffset |
| addSegment(LogSegment.open(dir = dir, |
| baseOffset = logStartOffset, |
| config, |
| time = time, |
| fileAlreadyExists = false, |
| initFileSize = this.initFileSize, |
| preallocate = config.preallocate)) |
| } |
| |
| recoveryPoint = activeSegment.readNextOffset |
| recoveryPoint |
| } |
| |
| // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be |
| // free of all side-effects, i.e. it must not update any log-specific state. |
| private def rebuildProducerState(lastOffset: Long, |
| reloadFromCleanShutdown: Boolean, |
| producerStateManager: ProducerStateManager): Unit = lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
| val messageFormatVersion = config.messageFormatVersion.recordVersion.value |
| val segments = logSegments |
| val offsetsToSnapshot = |
| if (segments.nonEmpty) { |
| val nextLatestSegmentBaseOffset = lowerSegment(segments.last.baseOffset).map(_.baseOffset) |
| Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset), Some(lastOffset)) |
| } else { |
| Seq(Some(lastOffset)) |
| } |
| info(s"Loading producer state till offset $lastOffset with message format version $messageFormatVersion") |
| |
| // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being |
| // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case, |
| // but we have to be careful not to assume too much in the presence of broker failures. The two most common |
| // upgrade cases in which we expect to find no snapshots are the following: |
| // |
| // 1. The broker has been upgraded, but the topic is still on the old message format. |
| // 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown. |
| // |
| // If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end |
| // offset (see below). The next time the log is reloaded, we will load producer state using this snapshot |
| // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state |
| // from the first segment. |
| if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 || |
| (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) { |
| // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the |
| // last two segments and the last offset. This should avoid the full scan in the case that the log needs |
| // truncation. |
| offsetsToSnapshot.flatten.foreach { offset => |
| producerStateManager.updateMapEndOffset(offset) |
| producerStateManager.takeSnapshot() |
| } |
| } else { |
| val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset |
| producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds()) |
| |
| // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end |
| // offset (which would be the case on first startup) and there were active producers prior to truncation |
| // (which could be the case if truncating after initial loading). If there weren't, then truncating |
| // shouldn't change that fact (although it could cause a producerId to expire earlier than expected), |
      // and we can skip the loading. This is an optimization for users who are not yet using
      // idempotent/transactional features.
| if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) { |
| logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment => |
| val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset) |
| producerStateManager.updateMapEndOffset(startOffset) |
| |
| if (offsetsToSnapshot.contains(Some(segment.baseOffset))) |
| producerStateManager.takeSnapshot() |
| |
| val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue) |
| if (fetchDataInfo != null) |
| loadProducersFromLog(producerStateManager, fetchDataInfo.records) |
| } |
| } |
| producerStateManager.updateMapEndOffset(lastOffset) |
| producerStateManager.takeSnapshot() |
| } |
| } |
| |
| private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized { |
| rebuildProducerState(lastOffset, reloadFromCleanShutdown, producerStateManager) |
| updateFirstUnstableOffset() |
| } |
| |
| private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = { |
| val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo] |
| val completedTxns = ListBuffer.empty[CompletedTxn] |
| records.batches.asScala.foreach { batch => |
| if (batch.hasProducerId) { |
| val maybeCompletedTxn = updateProducers(batch, loadedProducers, isFromClient = false) |
| maybeCompletedTxn.foreach(completedTxns += _) |
| } |
| } |
| loadedProducers.values.foreach(producerStateManager.update) |
| completedTxns.foreach(producerStateManager.completeTxn) |
| } |
| |
| private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized { |
| producerStateManager.activeProducers.map { case (producerId, producerIdEntry) => |
| (producerId, producerIdEntry.lastSeq) |
| } |
| } |
| |
| private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized { |
| producerStateManager.activeProducers.map { case (producerId, producerIdEntry) => |
      val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0) Some(producerIdEntry.lastDataOffset) else None
| val lastRecord = LastRecord(lastDataOffset, producerIdEntry.producerEpoch) |
| producerId -> lastRecord |
| } |
| } |
| |
| /** |
| * Check if we have the "clean shutdown" file |
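   * The marker is written by the broker on a controlled shutdown; when present, segment recovery is skipped
   * (see recoverLog).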
| */ |
| private def hasCleanShutdownFile: Boolean = new File(dir.getParentFile, CleanShutdownFile).exists() |
| |
| /** |
| * The number of segments in the log. |
   * Take care! This is an O(n) operation.
| */ |
| def numberOfSegments: Int = segments.size |
| |
| /** |
| * Close this log. |
| * The memory mapped buffer for index files of this log will be left open until the log is deleted. |
| */ |
| def close() { |
| debug("Closing log") |
| lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
| maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") { |
| // We take a snapshot at the last written offset to hopefully avoid the need to scan the log |
| // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization |
| // (the clean shutdown file is written after the logs are all closed). |
| producerStateManager.takeSnapshot() |
| logSegments.foreach(_.close()) |
| } |
| } |
| } |
| |
| /** |
| * Rename the directory of the log |
| * |
| * @throws KafkaStorageException if rename fails |
| */ |
| def renameDir(name: String) { |
| lock synchronized { |
| maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") { |
| val renamedDir = new File(dir.getParent, name) |
| Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath) |
| if (renamedDir != dir) { |
| dir = renamedDir |
| logSegments.foreach(_.updateDir(renamedDir)) |
| producerStateManager.logDir = dir |
| // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference |
| // the checkpoint file in renamed log directory |
| initializeLeaderEpochCache() |
| } |
| } |
| } |
| } |
| |
| /** |
   * Close file handlers used by the log but don't write to disk. This is called if the log directory is offline
| */ |
| def closeHandlers() { |
| debug("Closing handlers") |
| lock synchronized { |
| logSegments.foreach(_.closeHandlers()) |
| isMemoryMappedBufferClosed = true |
| } |
| } |
| |
| /** |
| * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs |
| * |
| * @param records The records to append |
   * @param leaderEpoch The partition's leader epoch to be applied to the messages when offsets are assigned
   * @param isFromClient Whether or not this append is from a producer
   * @param interBrokerProtocolVersion Inter-broker message protocol version
| * @throws KafkaStorageException If the append fails due to an I/O error. |
| * @return Information about the appended messages including the first and last offset. |
| */ |
| def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true, |
| interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = { |
| append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch) |
| } |
| |
| /** |
| * Append this message set to the active segment of the log without assigning offsets or Partition Leader Epochs |
| * |
| * @param records The records to append |
| * @throws KafkaStorageException If the append fails due to an I/O error. |
| * @return Information about the appended messages including the first and last offset. |
| */ |
| def appendAsFollower(records: MemoryRecords): LogAppendInfo = { |
| append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1) |
| } |
| |
| /** |
| * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary. |
| * |
| * This method will generally be responsible for assigning offsets to the messages, |
   * however, if assignOffsets=false is passed, we will only check that the existing offsets are valid.
| * |
| * @param records The log records to append |
| * @param isFromClient Whether or not this append is from a producer |
| * @param interBrokerProtocolVersion Inter-broker message protocol version |
| * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given |
| * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader |
| * @throws KafkaStorageException If the append fails due to an I/O error. |
   * @throws OffsetsOutOfOrderException If out of order offsets are found in 'records'
   * @throws UnexpectedAppendOffsetException If the first or last offset of the append is less than the next offset
| * @return Information about the appended messages including the first and last offset. |
| */ |
| private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = { |
| maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") { |
| val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient) |
| |
| // return if we have no valid messages or if this is a duplicate of the last appended entry |
| if (appendInfo.shallowCount == 0) |
| return appendInfo |
| |
      // trim any invalid bytes or partial messages before appending them to the on-disk log
| var validRecords = trimInvalidBytes(records, appendInfo) |
| |
| // they are valid, insert them in the log |
| lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
| if (assignOffsets) { |
| // assign offsets to the message set |
| val offset = new LongRef(nextOffsetMetadata.messageOffset) |
| appendInfo.firstOffset = Some(offset.value) |
| val now = time.milliseconds |
| val validateAndOffsetAssignResult = try { |
| LogValidator.validateMessagesAndAssignOffsets(validRecords, |
| offset, |
| time, |
| now, |
| appendInfo.sourceCodec, |
| appendInfo.targetCodec, |
| config.compact, |
| config.messageFormatVersion.recordVersion.value, |
| config.messageTimestampType, |
| config.messageTimestampDifferenceMaxMs, |
| leaderEpoch, |
| isFromClient, |
| interBrokerProtocolVersion) |
| } catch { |
| case e: IOException => |
| throw new KafkaException(s"Error validating messages while appending to log $name", e) |
| } |
| validRecords = validateAndOffsetAssignResult.validatedRecords |
| appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp |
| appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp |
| appendInfo.lastOffset = offset.value - 1 |
| appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats |
| if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) |
| appendInfo.logAppendTime = now |
| |
| // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message |
| // format conversion) |
| if (validateAndOffsetAssignResult.messageSizeMaybeChanged) { |
| for (batch <- validRecords.batches.asScala) { |
| if (batch.sizeInBytes > config.maxMessageSize) { |
| // we record the original message set size instead of the trimmed size |
| // to be consistent with pre-compression bytesRejectedRate recording |
| brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) |
| brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) |
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to " +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
| } |
| } |
| } |
| } else { |
| // we are taking the offsets we are given |
| if (!appendInfo.offsetsMonotonic) |
| throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " + |
| records.records.asScala.map(_.offset)) |
| |
| if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) { |
| // we may still be able to recover if the log is empty |
| // one example: fetching from log start offset on the leader which is not batch aligned, |
| // which may happen as a result of AdminClient#deleteRecords() |
| val firstOffset = appendInfo.firstOffset match { |
| case Some(offset) => offset |
| case None => records.batches.asScala.head.baseOffset() |
| } |
| |
| val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch" |
| throw new UnexpectedAppendOffsetException( |
| s"Unexpected offset in append to $topicPartition. $firstOrLast " + |
| s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " + |
| s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" + |
| s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset", |
| firstOffset, appendInfo.lastOffset) |
| } |
| } |
| |
| // update the epoch cache with the epoch stamped onto the message by the leader |
| validRecords.batches.asScala.foreach { batch => |
| if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) { |
| maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset) |
| } else { |
| // In partial upgrade scenarios, we may get a temporary regression to the message format. In |
| // order to ensure the safety of leader election, we clear the epoch cache so that we revert |
| // to truncation by high watermark after the next leader election. |
| leaderEpochCache.filter(_.nonEmpty).foreach { cache => |
| warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}") |
| cache.clearAndFlush() |
| } |
| } |
| } |
| |
        // check that the message set size does not exceed config.segmentSize
| if (validRecords.sizeInBytes > config.segmentSize) { |
| throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " + |
| s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.") |
| } |
| |
| // now that we have valid records, offsets assigned, and timestamps updated, we need to |
| // validate the idempotent/transactional state of the producers and collect some metadata |
| val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient) |
| maybeDuplicate.foreach { duplicate => |
| appendInfo.firstOffset = Some(duplicate.firstOffset) |
| appendInfo.lastOffset = duplicate.lastOffset |
| appendInfo.logAppendTime = duplicate.timestamp |
| appendInfo.logStartOffset = logStartOffset |
| return appendInfo |
| } |
| |
| // maybe roll the log if this segment is full |
| val segment = maybeRoll(validRecords.sizeInBytes, appendInfo) |
| |
| val logOffsetMetadata = LogOffsetMetadata( |
| messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch, |
| segmentBaseOffset = segment.baseOffset, |
| relativePositionInSegment = segment.size) |
| |
| segment.append(largestOffset = appendInfo.lastOffset, |
| largestTimestamp = appendInfo.maxTimestamp, |
| shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp, |
| records = validRecords) |
| |
| // Increment the log end offset. We do this immediately after the append because a |
| // write to the transaction index below may fail and we want to ensure that the offsets |
| // of future appends still grow monotonically. The resulting transaction index inconsistency |
| // will be cleaned up after the log directory is recovered. Note that the end offset of the |
| // ProducerStateManager will not be updated and the last stable offset will not advance |
| // if the append to the transaction index fails. |
| updateLogEndOffset(appendInfo.lastOffset + 1) |
| |
| // update the producer state |
| for ((_, producerAppendInfo) <- updatedProducers) { |
| producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata) |
| producerStateManager.update(producerAppendInfo) |
| } |
| |
| // update the transaction index with the true last stable offset. The last offset visible |
| // to consumers using READ_COMMITTED will be limited by this value and the high watermark. |
| for (completedTxn <- completedTxns) { |
| val lastStableOffset = producerStateManager.lastStableOffset(completedTxn) |
| segment.updateTxnIndex(completedTxn, lastStableOffset) |
| producerStateManager.completeTxn(completedTxn) |
| } |
| |
| // always update the last producer id map offset so that the snapshot reflects the current offset |
| // even if there isn't any idempotent data being written |
| producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1) |
| |
| // update the first unstable offset (which is used to compute LSO) |
| updateFirstUnstableOffset() |
| |
| trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " + |
| s"first offset: ${appendInfo.firstOffset}, " + |
| s"next offset: ${nextOffsetMetadata.messageOffset}, " + |
| s"and messages: $validRecords") |
| |
| if (unflushedMessages >= config.flushInterval) |
| flush() |
| |
| appendInfo |
| } |
| } |
| } |
| |
| def maybeAssignEpochStartOffset(leaderEpoch: Int, startOffset: Long): Unit = { |
| leaderEpochCache.foreach { cache => |
| cache.assign(leaderEpoch, startOffset) |
| } |
| } |
| |
| def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch) |
| |
| def endOffsetForEpoch(leaderEpoch: Int): Option[OffsetAndEpoch] = { |
| leaderEpochCache.flatMap { cache => |
| val (foundEpoch, foundOffset) = cache.endOffsetFor(leaderEpoch) |
| if (foundOffset == EpochEndOffset.UNDEFINED_EPOCH_OFFSET) |
| None |
| else |
| Some(OffsetAndEpoch(foundOffset, foundEpoch)) |
| } |
| } |
| |
| def onHighWatermarkIncremented(highWatermark: Long): Unit = { |
| lock synchronized { |
| replicaHighWatermark = Some(highWatermark) |
| producerStateManager.onHighWatermarkUpdated(highWatermark) |
| updateFirstUnstableOffset() |
| } |
| } |
| |
| private def updateFirstUnstableOffset(): Unit = lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
    val updatedFirstUnstableOffset = producerStateManager.firstUnstableOffset match {
| case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset => |
| val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset) |
| val segment = segments.floorEntry(offset).getValue |
| val position = segment.translateOffset(offset) |
| Some(LogOffsetMetadata(offset, segment.baseOffset, position.position)) |
| case other => other |
| } |
| |
    if (updatedFirstUnstableOffset != this.firstUnstableOffset) {
      debug(s"First unstable offset updated to $updatedFirstUnstableOffset")
      this.firstUnstableOffset = updatedFirstUnstableOffset
| } |
| } |
| |
| /** |
| * Increment the log start offset if the provided offset is larger. |
| */ |
| def maybeIncrementLogStartOffset(newLogStartOffset: Long) { |
| // We don't have to write the log start offset to log-start-offset-checkpoint immediately. |
    // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shut down
    // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
| maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") { |
| lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
| if (newLogStartOffset > logStartOffset) { |
| info(s"Incrementing log start offset to $newLogStartOffset") |
| logStartOffset = newLogStartOffset |
| leaderEpochCache.foreach(_.truncateFromStart(logStartOffset)) |
| producerStateManager.truncateHead(logStartOffset) |
| updateFirstUnstableOffset() |
| } |
| } |
| } |
| } |
| |
| private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean): |
| (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[BatchMetadata]) = { |
| val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo] |
| val completedTxns = ListBuffer.empty[CompletedTxn] |
| for (batch <- records.batches.asScala if batch.hasProducerId) { |
| val maybeLastEntry = producerStateManager.lastEntry(batch.producerId) |
| |
| // if this is a client produce request, there will be up to 5 batches which could have been duplicated. |
| // If we find a duplicate, we return the metadata of the appended batch to the client. |
| if (isFromClient) { |
| maybeLastEntry.flatMap(_.findDuplicateBatch(batch)).foreach { duplicate => |
| return (updatedProducers, completedTxns.toList, Some(duplicate)) |
| } |
| } |
| |
| val maybeCompletedTxn = updateProducers(batch, updatedProducers, isFromClient = isFromClient) |
| maybeCompletedTxn.foreach(completedTxns += _) |
| } |
| (updatedProducers, completedTxns.toList, None) |
| } |
| |
| /** |
| * Validate the following: |
| * <ol> |
| * <li> each message matches its CRC |
| * <li> each message size is valid |
| * <li> that the sequence numbers of the incoming record batches are consistent with the existing state and with each other. |
| * </ol> |
| * |
| * Also compute the following quantities: |
| * <ol> |
| * <li> First offset in the message set |
| * <li> Last offset in the message set |
| * <li> Number of messages |
| * <li> Number of valid bytes |
| * <li> Whether the offsets are monotonically increasing |
| * <li> Whether any compression codec is used (if many are used, then the last one is given) |
| * </ol> |
| */ |
| private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = { |
| var shallowMessageCount = 0 |
| var validBytesCount = 0 |
| var firstOffset: Option[Long] = None |
| var lastOffset = -1L |
| var sourceCodec: CompressionCodec = NoCompressionCodec |
| var monotonic = true |
| var maxTimestamp = RecordBatch.NO_TIMESTAMP |
| var offsetOfMaxTimestamp = -1L |
| var readFirstMessage = false |
| var lastOffsetOfFirstBatch = -1L |
| |
| for (batch <- records.batches.asScala) { |
| // we only validate V2 and higher to avoid potential compatibility issues with older clients |
| if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0) |
| throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " + |
| s"be 0, but it is ${batch.baseOffset}") |
| |
| // update the first offset if on the first message. For magic versions older than 2, we use the last offset |
| // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message). |
| // For magic version 2, we can get the first offset directly from the batch header. |
      // When appending to the leader, we will update LogAppendInfo.firstOffset with the correct value. In the follower
      // case, validation will be more lenient.
| // Also indicate whether we have the accurate first offset or not |
| if (!readFirstMessage) { |
| if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) |
| firstOffset = Some(batch.baseOffset) |
| lastOffsetOfFirstBatch = batch.lastOffset |
| readFirstMessage = true |
| } |
| |
| // check that offsets are monotonically increasing |
| if (lastOffset >= batch.lastOffset) |
| monotonic = false |
| |
| // update the last offset seen |
| lastOffset = batch.lastOffset |
| |
| // Check if the message sizes are valid. |
| val batchSize = batch.sizeInBytes |
| if (batchSize > config.maxMessageSize) { |
| brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) |
| brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) |
| throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " + |
| s"which exceeds the maximum configured value of ${config.maxMessageSize}.") |
| } |
| |
| // check the validity of the message by checking CRC |
| batch.ensureValid() |
| |
| if (batch.maxTimestamp > maxTimestamp) { |
| maxTimestamp = batch.maxTimestamp |
| offsetOfMaxTimestamp = lastOffset |
| } |
| |
| shallowMessageCount += 1 |
| validBytesCount += batchSize |
| |
| val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id) |
| if (messageCodec != NoCompressionCodec) |
| sourceCodec = messageCodec |
| } |
| |
| // Apply broker-side compression if any |
| val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec) |
| LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset, |
| RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch) |
| } |
| |
| private def updateProducers(batch: RecordBatch, |
| producers: mutable.Map[Long, ProducerAppendInfo], |
| isFromClient: Boolean): Option[CompletedTxn] = { |
| val producerId = batch.producerId |
| val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, isFromClient)) |
| appendInfo.append(batch) |
| } |
| |
| /** |
| * Trim any invalid bytes from the end of this message set (if there are any) |
| * |
| * @param records The records to trim |
| * @param info The general information of the message set |
| * @return A trimmed message set. This may be the same as what was passed in or it may not. |
| */ |
| private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = { |
| val validBytes = info.validBytes |
| if (validBytes < 0) |
| throw new CorruptRecordException(s"Cannot append record batch with illegal length $validBytes to " + |
| s"log for $topicPartition. A possible cause is a corrupted produce request.") |
| if (validBytes == records.sizeInBytes) { |
| records |
| } else { |
| // trim invalid bytes |
| val validByteBuffer = records.buffer.duplicate() |
| validByteBuffer.limit(validBytes) |
| MemoryRecords.readableRecords(validByteBuffer) |
| } |
| } |
| |
| /** |
| * Read messages from the log. |
| * |
| * @param startOffset The offset to begin reading at |
| * @param maxLength The maximum number of bytes to read |
| * @param maxOffset The offset to read up to, exclusive. (i.e. this offset is NOT included in the resulting message set)
| * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists) |
| * @param includeAbortedTxns Whether or not to lookup aborted transactions for fetched data |
| * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset |
| * @return The fetch data information including fetch starting offset metadata and messages read. |
| */ |
| def read(startOffset: Long, |
| maxLength: Int, |
| maxOffset: Option[Long], |
| minOneMessage: Boolean, |
| includeAbortedTxns: Boolean): FetchDataInfo = { |
| maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") { |
| trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes") |
| |
| // Because we don't use the lock for reading, the synchronization is a little bit tricky.
| // We create local variables to avoid race conditions with updates to the log.
| val currentNextOffsetMetadata = nextOffsetMetadata |
| val next = currentNextOffsetMetadata.messageOffset |
| if (startOffset == next) { |
| val abortedTransactions = |
| if (includeAbortedTxns) Some(List.empty[AbortedTransaction]) |
| else None |
| return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false, |
| abortedTransactions = abortedTransactions) |
| } |
| |
| var segmentEntry = segments.floorEntry(startOffset) |
| |
| // return error on attempt to read beyond the log end offset or read below log start offset |
| if (startOffset > next || segmentEntry == null || startOffset < logStartOffset) |
| throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " + |
| s"but we only have log segments in the range $logStartOffset to $next.") |
| |
| // Do the read on the segment with a base offset less than the target offset,
| // but if that segment doesn't contain any messages with an offset greater than that,
| // continue to read from successive segments until we get some messages or we reach the end of the log.
| while (segmentEntry != null) { |
| val segment = segmentEntry.getValue |
| |
| // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
| // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
| // cause OffsetOutOfRangeException. To solve that, we cap the reading up to the exposed position instead of the
| // log end of the active segment.
| val maxPosition = { |
| if (segmentEntry == segments.lastEntry) { |
| val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong |
| // Check the segment again in case a new segment has just rolled out. |
| if (segmentEntry != segments.lastEntry) |
| // New log segment has rolled out, we can read up to the file end. |
| segment.size |
| else |
| exposedPos |
| } else { |
| segment.size |
| } |
| } |
| val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage) |
| if (fetchInfo == null) { |
| segmentEntry = segments.higherEntry(segmentEntry.getKey) |
| } else { |
| return if (includeAbortedTxns) |
| addAbortedTransactions(startOffset, segmentEntry, fetchInfo) |
| else |
| fetchInfo |
| } |
| } |
| |
| // Okay, we are beyond the end of the last segment with no data fetched although the start offset is in range;
| // this can happen when all messages with an offset larger than the start offset have been deleted.
| // In this case, we will return an empty set with the log end offset metadata.
| FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY) |
| } |
| } |
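| 
| // Usage sketch (hypothetical caller with a `log: Log` in scope): fetch up to 64 KiB starting at the
| // log start offset, without transactional metadata, and walk the returned batches:
| //
| //   val fetchInfo = log.read(startOffset = log.logStartOffset, maxLength = 64 * 1024,
| //     maxOffset = None, minOneMessage = true, includeAbortedTxns = false)
| //   for (batch <- fetchInfo.records.batches.asScala)
| //     println(s"batch offsets ${batch.baseOffset}..${batch.lastOffset}")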
| |
| private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = { |
| val segmentEntry = segments.floorEntry(startOffset) |
| val allAbortedTxns = ListBuffer.empty[AbortedTxn] |
| def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns |
| collectAbortedTransactions(logStartOffset, upperBoundOffset, segmentEntry, accumulator) |
| allAbortedTxns.toList |
| } |
| |
| private def addAbortedTransactions(startOffset: Long, segmentEntry: JEntry[JLong, LogSegment], |
| fetchInfo: FetchDataInfo): FetchDataInfo = { |
| val fetchSize = fetchInfo.records.sizeInBytes |
| val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, |
| fetchInfo.fetchOffsetMetadata.relativePositionInSegment) |
| val upperBoundOffset = segmentEntry.getValue.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse { |
| val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey) |
| if (nextSegmentEntry != null) |
| nextSegmentEntry.getValue.baseOffset |
| else |
| logEndOffset |
| } |
| |
| val abortedTransactions = ListBuffer.empty[AbortedTransaction] |
| def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction) |
| collectAbortedTransactions(startOffset, upperBoundOffset, segmentEntry, accumulator) |
| |
| FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata, |
| records = fetchInfo.records, |
| firstEntryIncomplete = fetchInfo.firstEntryIncomplete, |
| abortedTransactions = Some(abortedTransactions.toList)) |
| } |
| |
| private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long, |
| startingSegmentEntry: JEntry[JLong, LogSegment], |
| accumulator: List[AbortedTxn] => Unit): Unit = { |
| var segmentEntry = startingSegmentEntry |
| while (segmentEntry != null) { |
| val searchResult = segmentEntry.getValue.collectAbortedTxns(startOffset, upperBoundOffset) |
| accumulator(searchResult.abortedTransactions) |
| if (searchResult.isComplete) |
| return |
| segmentEntry = segments.higherEntry(segmentEntry.getKey) |
| } |
| } |
| |
| /** |
| * Get an offset based on the given timestamp.
| * The offset returned is the offset of the first message whose timestamp is greater than or equal to the
| * given timestamp.
| *
| * If no such message is found, the log end offset is returned.
| *
| * NOTE: OffsetRequest V0 does not use this method. Its behavior remains the same as before, i.e. it only
| * gives back a timestamp based on the last modification time of the log segments.
| *
| * @param targetTimestamp The given timestamp for offset fetching.
| * @return The offset of the first message whose timestamp is greater than or equal to the given timestamp.
| * None if no such message is found.
| */ |
| def fetchOffsetByTimestamp(targetTimestamp: Long): Option[TimestampAndOffset] = { |
| maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") { |
| debug(s"Searching offset for timestamp $targetTimestamp") |
| |
| if (config.messageFormatVersion < KAFKA_0_10_0_IV0 && |
| targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP && |
| targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP) |
| throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " + |
| s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " + |
| s"required version $KAFKA_0_10_0_IV0") |
| |
| // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides |
| // constant time access while being safe to use with concurrent collections unlike `toArray`. |
| val segmentsCopy = logSegments.toBuffer |
| // For the earliest and latest, we do not need to return the timestamp. |
| if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) { |
| // The first cached epoch usually corresponds to the log start offset, but we have to verify this since |
| // it may not be true following a message format version bump as the epoch will not be available for |
| // log entries written in the older format. |
| val earliestEpochEntry = leaderEpochCache.flatMap(_.earliestEntry) |
| val epochOpt = earliestEpochEntry match { |
| case Some(entry) if entry.startOffset <= logStartOffset => Optional.of[Integer](entry.epoch) |
| case _ => Optional.empty[Integer]() |
| } |
| return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt)) |
| } else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) { |
| val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer]) |
| val epochOptional = Optional.ofNullable(latestEpochOpt.orNull) |
| return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional)) |
| } |
| |
| val targetSeg = { |
| // Get all the segments whose largest timestamp is smaller than target timestamp |
| val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp) |
| // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one. |
| if (earlierSegs.length < segmentsCopy.length) |
| Some(segmentsCopy(earlierSegs.length)) |
| else |
| None |
| } |
| |
| targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset)) |
| } |
| } |
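| 
| // Illustrative sketch of the takeWhile-based segment selection above (plain Scala, hypothetical
| // timestamps): with segments whose largest timestamps are [10, 20, 30] and targetTimestamp = 25,
| // the first two segments are skipped and the segment at index 2 is searched:
| //
| //   val largestTimestamps = Vector(10L, 20L, 30L)
| //   val earlierSegs = largestTimestamps.takeWhile(_ < 25L)    // Vector(10L, 20L)
| //   val targetIdx = if (earlierSegs.length < largestTimestamps.length) Some(earlierSegs.length) else None
| //   // targetIdx == Some(2)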
| |
| def legacyFetchOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = { |
| // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides |
| // constant time access while being safe to use with concurrent collections unlike `toArray`. |
| val segments = logSegments.toBuffer |
| val lastSegmentHasSize = segments.last.size > 0 |
| |
| val offsetTimeArray = |
| if (lastSegmentHasSize) |
| new Array[(Long, Long)](segments.length + 1) |
| else |
| new Array[(Long, Long)](segments.length) |
| |
| for (i <- segments.indices) |
| offsetTimeArray(i) = (math.max(segments(i).baseOffset, logStartOffset), segments(i).lastModified) |
| if (lastSegmentHasSize) |
| offsetTimeArray(segments.length) = (logEndOffset, time.milliseconds) |
| |
| var startIndex = -1 |
| timestamp match { |
| case ListOffsetRequest.LATEST_TIMESTAMP => |
| startIndex = offsetTimeArray.length - 1 |
| case ListOffsetRequest.EARLIEST_TIMESTAMP => |
| startIndex = 0 |
| case _ => |
| var isFound = false |
| debug("Offset time array = " + offsetTimeArray.map(o => "%d, %d".format(o._1, o._2)).mkString(", "))
| startIndex = offsetTimeArray.length - 1 |
| while (startIndex >= 0 && !isFound) { |
| if (offsetTimeArray(startIndex)._2 <= timestamp) |
| isFound = true |
| else |
| startIndex -= 1 |
| } |
| } |
| |
| val retSize = maxNumOffsets.min(startIndex + 1) |
| val ret = new Array[Long](retSize) |
| for (j <- 0 until retSize) { |
| ret(j) = offsetTimeArray(startIndex)._1 |
| startIndex -= 1 |
| } |
| // ensure that the returned seq is in descending order of offsets |
| ret.toSeq.sortBy(-_) |
| } |
| |
| /** |
| * Given a message offset, find its corresponding offset metadata in the log. |
| * If the message offset is out of range, return None to the caller. |
| */ |
| def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = { |
| try { |
| val fetchDataInfo = read(offset, |
| maxLength = 1, |
| maxOffset = None, |
| minOneMessage = false, |
| includeAbortedTxns = false) |
| Some(fetchDataInfo.fetchOffsetMetadata) |
| } catch { |
| case _: OffsetOutOfRangeException => None |
| } |
| } |
| |
| /** |
| * Delete any log segments matching the given predicate function, |
| * starting with the oldest segment and moving forward until a segment doesn't match. |
| * |
| * @param predicate A function that takes in a candidate log segment and the next higher segment |
| * (if there is one) and returns true iff it is deletable |
| * @return The number of segments deleted |
| */ |
| private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = { |
| lock synchronized { |
| val deletable = deletableSegments(predicate) |
| if (deletable.nonEmpty) |
| info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason") |
| deleteSegments(deletable) |
| } |
| } |
| |
| private def deleteSegments(deletable: Iterable[LogSegment]): Int = { |
| maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") { |
| val numToDelete = deletable.size |
| if (numToDelete > 0) { |
| // we must always have at least one segment, so if we are going to delete all the segments, create a new one first |
| if (segments.size == numToDelete) |
| roll() |
| lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
| // remove the segments for lookups |
| deletable.foreach(deleteSegment) |
| maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset) |
| } |
| } |
| numToDelete |
| } |
| } |
| |
| /** |
| * Find segments starting from the oldest until the user-supplied predicate is false or the segment |
| * containing the current high watermark is reached. We do not delete segments with offsets at or beyond |
| * the high watermark to ensure that the log start offset can never exceed it. If the high watermark |
| * has not yet been initialized, no segments are eligible for deletion. |
| * |
| * A final segment that is empty will never be returned (since we would just end up re-creating it). |
| * |
| * @param predicate A function that takes in a candidate log segment and the next higher segment |
| * (if there is one) and returns true iff it is deletable |
| * @return the segments ready to be deleted |
| */ |
| private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = { |
| if (segments.isEmpty || replicaHighWatermark.isEmpty) { |
| Seq.empty |
| } else { |
| val highWatermark = replicaHighWatermark.get |
| val deletable = ArrayBuffer.empty[LogSegment] |
| var segmentEntry = segments.firstEntry |
| while (segmentEntry != null) { |
| val segment = segmentEntry.getValue |
| val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey) |
| val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null) |
| (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false) |
| else |
| (null, logEndOffset, segment.size == 0) |
| |
| if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) { |
| deletable += segment |
| segmentEntry = nextSegmentEntry |
| } else { |
| segmentEntry = null |
| } |
| } |
| deletable |
| } |
| } |
| |
| /** |
| * If topic deletion is enabled, delete any log segments that have either expired due to time-based retention
| * or that push the total log size above retentionSize.
| *
| * Whether or not deletion is enabled, delete any log segments that are before the log start offset.
| */ |
| def deleteOldSegments(): Int = { |
| if (config.delete) { |
| deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments() |
| } else { |
| deleteLogStartOffsetBreachedSegments() |
| } |
| } |
| |
| private def deleteRetentionMsBreachedSegments(): Int = { |
| if (config.retentionMs < 0) return 0 |
| val startMs = time.milliseconds |
| deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs, |
| reason = s"retention time ${config.retentionMs}ms breach") |
| } |
| |
| private def deleteRetentionSizeBreachedSegments(): Int = { |
| if (config.retentionSize < 0 || size < config.retentionSize) return 0 |
| var diff = size - config.retentionSize |
| def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { |
| if (diff - segment.size >= 0) { |
| diff -= segment.size |
| true |
| } else { |
| false |
| } |
| } |
| |
| deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach") |
| } |
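| 
| // Worked example of the size walk above (hypothetical numbers): with size = 1000 and
| // retentionSize = 400, diff starts at 600. Segment sizes [300, 300, 300, 100] delete the first
| // two segments (600 - 300 = 300, then 300 - 300 = 0) and stop at the third (0 - 300 < 0),
| // leaving 400 bytes in the log:
| //
| //   var diff = 1000L - 400L
| //   val deleted = Seq(300L, 300L, 300L, 100L).takeWhile { s =>
| //     val deletable = diff - s >= 0
| //     if (deletable) diff -= s
| //     deletable
| //   }                                               // deleted == Seq(300, 300)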
| |
| private def deleteLogStartOffsetBreachedSegments(): Int = { |
| def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = |
| nextSegmentOpt.exists(_.baseOffset <= logStartOffset) |
| |
| deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach") |
| } |
| |
| def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix) |
| |
| /** |
| * The size of the log in bytes |
| */ |
| def size: Long = Log.sizeInBytes(logSegments) |
| |
| /** |
| * The offset metadata of the next message that will be appended to the log |
| */ |
| def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata |
| |
| /** |
| * The offset of the next message that will be appended to the log |
| */ |
| def logEndOffset: Long = nextOffsetMetadata.messageOffset |
| |
| /** |
| * Roll the log over to a new empty log segment if necessary. |
| * |
| * @param messagesSize The message set size in bytes.
| * @param appendInfo log append information
| * The log segment will be rolled if one of the following conditions is met:
| * <ol>
| * <li> The logSegment is full
| * <li> The maxTime has elapsed since the timestamp of the first message in the segment (or since the create time if
| * the first message does not have a timestamp)
| * <li> The index is full |
| * </ol> |
| * @return The currently active segment after (perhaps) rolling to a new segment |
| */ |
| private def maybeRoll(messagesSize: Int, appendInfo: LogAppendInfo): LogSegment = { |
| val segment = activeSegment |
| val now = time.milliseconds |
| |
| val maxTimestampInMessages = appendInfo.maxTimestamp |
| val maxOffsetInMessages = appendInfo.lastOffset |
| |
| if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) { |
| debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}, " +
| s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " + |
| s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " + |
| s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).") |
| |
| /* |
| maxOffsetInMessages - Integer.MAX_VALUE is a heuristic value for the first offset in the set of messages.
| Since the offsets in a message set will not differ by more than Integer.MAX_VALUE, this is guaranteed to be <=
| the real first offset in the set. Determining the true first offset in the set requires decompression, which the
| follower is trying to avoid during log append. Prior behavior assigned the new segment baseOffset = logEndOffset
| of the old segment. This was problematic when two consecutive messages differed in offset by
| Integer.MAX_VALUE.toLong + 2 or more. In that case, the prior behavior would roll a new log segment whose
| base offset was too low to contain the next message. This edge case is possible when a replica is recovering a
| highly compacted topic from scratch.
| Note that this is only required for pre-V2 message formats because these do not store the first message offset |
| in the header. |
| */ |
| appendInfo.firstOffset match { |
| case Some(firstOffset) => roll(Some(firstOffset)) |
| case None => roll(Some(maxOffsetInMessages - Integer.MAX_VALUE)) |
| } |
| } else { |
| segment |
| } |
| } |
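| 
| // Worked example of the pre-V2 heuristic above (hypothetical offsets): if the append's lastOffset is
| // 5,000,000,000 and its first offset is unknown, the rolled segment gets base offset
| // 5,000,000,000 - 2,147,483,647 = 2,852,516,353. Since offsets within one append cannot differ by more
| // than Integer.MAX_VALUE, the true first offset is at least this value, so the new segment can always
| // hold the incoming messages without offset overflow relative to its base offset.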
| |
| /** |
| * Roll the log over to a new active segment starting with the current logEndOffset. |
| * This will trim the index to the exact size of the number of entries it currently contains. |
| * |
| * @return The newly rolled segment |
| */ |
| def roll(expectedNextOffset: Option[Long] = None): LogSegment = { |
| maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") { |
| val start = time.hiResClockMs() |
| lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
| val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset) |
| val logFile = Log.logFile(dir, newOffset) |
| |
| if (segments.containsKey(newOffset)) { |
| // segment with the same base offset already exists and loaded |
| if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) { |
| // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
| // active segment of size zero because one of the indexes is "full" (due to _maxEntries == 0).
| warn(s"Trying to roll a new log segment with start offset $newOffset " + |
| s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " + |
| s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," + |
| s" size of offset index: ${activeSegment.offsetIndex.entries}.") |
| deleteSegment(activeSegment) |
| } else { |
| throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" + |
| s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " + |
| s"segment is ${segments.get(newOffset)}.") |
| } |
| } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) { |
| throw new KafkaException( |
| s"Trying to roll a new log segment for topic partition $topicPartition with " + |
| s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment") |
| } else { |
| val offsetIdxFile = offsetIndexFile(dir, newOffset) |
| val timeIdxFile = timeIndexFile(dir, newOffset) |
| val txnIdxFile = transactionIndexFile(dir, newOffset) |
| |
| for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) { |
| warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first") |
| Files.delete(file.toPath) |
| } |
| |
| Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment()) |
| } |
| |
| // take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot |
| // offset align with the new segment offset since this ensures we can recover the segment by beginning |
| // with the corresponding snapshot file and scanning the segment data. Because the segment base offset |
| // may actually be ahead of the current producer state end offset (which corresponds to the log end offset), |
| // we manually override the state offset here prior to taking the snapshot. |
| producerStateManager.updateMapEndOffset(newOffset) |
| producerStateManager.takeSnapshot() |
| |
| val segment = LogSegment.open(dir, |
| baseOffset = newOffset, |
| config, |
| time = time, |
| fileAlreadyExists = false, |
| initFileSize = initFileSize, |
| preallocate = config.preallocate) |
| addSegment(segment) |
| // We need to update the segment base offset and append position data of the metadata when log rolls. |
| // The next offset should not change. |
| updateLogEndOffset(nextOffsetMetadata.messageOffset) |
| // schedule an asynchronous flush of the old segment |
| scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L) |
| |
| info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.") |
| |
| segment |
| } |
| } |
| } |
| |
| /** |
| * The number of messages appended to the log since the last flush |
| */ |
| def unflushedMessages: Long = this.logEndOffset - this.recoveryPoint |
| |
| /** |
| * Flush all log segments |
| */ |
| def flush(): Unit = flush(this.logEndOffset) |
| |
| /** |
| * Flush log segments for all offsets up to offset-1 |
| * |
| * @param offset The offset to flush up to (non-inclusive); the new recovery point |
| */ |
| def flush(offset: Long) : Unit = { |
| maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") { |
| if (offset <= this.recoveryPoint) |
| return |
| debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " + |
| s"unflushed: $unflushedMessages") |
| for (segment <- logSegments(this.recoveryPoint, offset)) |
| segment.flush() |
| |
| lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
| if (offset > this.recoveryPoint) { |
| this.recoveryPoint = offset |
| lastFlushedTime.set(time.milliseconds) |
| } |
| } |
| } |
| } |
| |
| /** |
| * Cleanup old producer snapshots after the recovery point is checkpointed. It is useful to retain |
| * the snapshots from the recent segments in case we need to truncate and rebuild the producer state. |
| * Otherwise, we would always need to rebuild from the earliest segment. |
| * |
| * More specifically: |
| * |
| * 1. We always retain the producer snapshot from the last two segments. This solves the common case |
| * of truncating to an offset within the active segment, and the rarer case of truncating to the previous segment. |
| * |
| * 2. We only delete snapshots for offsets less than the recovery point. The recovery point is checkpointed |
| * periodically and it can be behind after a hard shutdown. Since recovery starts from the recovery point, the logic |
| * of rebuilding the producer snapshots in one pass and without loading older segments is simpler if we always |
| * have a producer snapshot for all segments being recovered. |
| * |
| * @return The minimum snapshot offset that was retained.
| */ |
| def deleteSnapshotsAfterRecoveryPointCheckpoint(): Long = { |
| val minOffsetToRetain = minSnapshotsOffsetToRetain |
| producerStateManager.deleteSnapshotsBefore(minOffsetToRetain) |
| minOffsetToRetain |
| } |
| |
| // Visible for testing, see `deleteSnapshotsAfterRecoveryPointCheckpoint()` for details |
| private[log] def minSnapshotsOffsetToRetain: Long = { |
| lock synchronized { |
| val twoSegmentsMinOffset = lowerSegment(activeSegment.baseOffset).getOrElse(activeSegment).baseOffset |
| // Prefer segment base offset |
| val recoveryPointOffset = lowerSegment(recoveryPoint).map(_.baseOffset).getOrElse(recoveryPoint) |
| math.min(recoveryPointOffset, twoSegmentsMinOffset) |
| } |
| } |
| |
| private def lowerSegment(offset: Long): Option[LogSegment] = |
| Option(segments.lowerEntry(offset)).map(_.getValue) |
| |
| /** |
| * Completely delete this log directory and all contents from the file system with no delay |
| */ |
| private[log] def delete() { |
| maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") { |
| lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
| removeLogMetrics() |
| logSegments.foreach(_.deleteIfExists()) |
| segments.clear() |
| leaderEpochCache.foreach(_.clear()) |
| Utils.delete(dir) |
| // File handlers will be closed if this log is deleted |
| isMemoryMappedBufferClosed = true |
| } |
| } |
| } |
| |
| // visible for testing |
| private[log] def takeProducerSnapshot(): Unit = lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
| producerStateManager.takeSnapshot() |
| } |
| |
| // visible for testing |
| private[log] def latestProducerSnapshotOffset: Option[Long] = lock synchronized { |
| producerStateManager.latestSnapshotOffset |
| } |
| |
| // visible for testing |
| private[log] def oldestProducerSnapshotOffset: Option[Long] = lock synchronized { |
| producerStateManager.oldestSnapshotOffset |
| } |
| |
| // visible for testing |
| private[log] def latestProducerStateEndOffset: Long = lock synchronized { |
| producerStateManager.mapEndOffset |
| } |
| |
| /** |
| * Truncate this log so that it ends with the greatest offset < targetOffset. |
| * |
| * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete. |
| * @return True iff targetOffset < logEndOffset |
| */ |
| private[log] def truncateTo(targetOffset: Long): Boolean = { |
| maybeHandleIOException(s"Error while truncating log to offset $targetOffset for $topicPartition in dir ${dir.getParent}") { |
| if (targetOffset < 0) |
| throw new IllegalArgumentException(s"Cannot truncate partition $topicPartition to a negative offset (%d).".format(targetOffset)) |
| if (targetOffset >= logEndOffset) { |
| info(s"Truncating to $targetOffset has no effect as the largest offset in the log is ${logEndOffset - 1}") |
| false |
| } else { |
| info(s"Truncating to offset $targetOffset") |
| lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
| if (segments.firstEntry.getValue.baseOffset > targetOffset) { |
| truncateFullyAndStartAt(targetOffset) |
| } else { |
| val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) |
| deletable.foreach(deleteSegment) |
| activeSegment.truncateTo(targetOffset) |
| updateLogEndOffset(targetOffset) |
| this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) |
| this.logStartOffset = math.min(targetOffset, this.logStartOffset) |
| leaderEpochCache.foreach(_.truncateFromEnd(targetOffset)) |
| loadProducerState(targetOffset, reloadFromCleanShutdown = false) |
| } |
| true |
| } |
| } |
| } |
| } |
| |
| /** |
| * Delete all data in the log and start at the new offset |
| * |
| * @param newOffset The new offset to start the log with |
| */ |
| private[log] def truncateFullyAndStartAt(newOffset: Long) { |
| maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") { |
| debug(s"Truncate and start at offset $newOffset") |
| lock synchronized { |
| checkIfMemoryMappedBufferClosed() |
| val segmentsToDelete = logSegments.toList |
| segmentsToDelete.foreach(deleteSegment) |
| addSegment(LogSegment.open(dir, |
| baseOffset = newOffset, |
| config = config, |
| time = time, |
| fileAlreadyExists = false, |
| initFileSize = initFileSize, |
| preallocate = config.preallocate)) |
| updateLogEndOffset(newOffset) |
| leaderEpochCache.foreach(_.clearAndFlush()) |
| |
| producerStateManager.truncate() |
| producerStateManager.updateMapEndOffset(newOffset) |
| updateFirstUnstableOffset() |
| |
| this.recoveryPoint = math.min(newOffset, this.recoveryPoint) |
| this.logStartOffset = newOffset |
| } |
| } |
| } |
| |
| /** |
| * The time this log is last known to have been fully flushed to disk |
| */ |
| def lastFlushTime: Long = lastFlushedTime.get |
| |
| /** |
| * The active segment that is currently taking appends |
| */ |
| def activeSegment = segments.lastEntry.getValue |
| |
| /** |
| * All the log segments in this log ordered from oldest to newest |
| */ |
| def logSegments: Iterable[LogSegment] = segments.values.asScala |
| |
| /** |
| * Get all segments beginning with the segment that includes "from" and ending with the segment
| * that includes offsets up to "to-1", or the end of the log (if to > logEndOffset).
| */ |
| def logSegments(from: Long, to: Long): Iterable[LogSegment] = { |
| lock synchronized { |
| val view = Option(segments.floorKey(from)).map { floor => |
| segments.subMap(floor, to) |
| }.getOrElse(segments.headMap(to)) |
| view.values.asScala |
| } |
| } |
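| 
| // Illustrative sketch (standalone, hypothetical base offsets) of how floorKey/subMap pick the view:
| // with segments based at 0, 100 and 200, logSegments(150, 250) floors 150 down to 100 and returns
| // the segments based at 100 and 200:
| //
| //   import java.util.concurrent.ConcurrentSkipListMap
| //   val segs = new ConcurrentSkipListMap[java.lang.Long, String]()
| //   segs.put(0L, "seg0"); segs.put(100L, "seg100"); segs.put(200L, "seg200")
| //   val floor = segs.floorKey(150L)                 // 100
| //   val view = segs.subMap(floor, 250L)             // {100=seg100, 200=seg200} (from inclusive, to exclusive)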
| |
| override def toString = "Log(" + dir + ")" |
| |
| /** |
| * This method performs an asynchronous log segment delete by doing the following: |
| * <ol> |
| * <li>It removes the segment from the segment map so that it will no longer be used for reads. |
| * <li>It renames the index and log files by appending .deleted to the respective file name |
| * <li>It schedules an asynchronous delete operation to occur in the future |
| * </ol> |
| * This allows reads to happen concurrently without synchronization and without the possibility of physically |
| * deleting a file while it is being read from. |
| * |
| * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded |
| * or the immediate caller will catch and handle IOException |
| * |
| * @param segment The log segment to schedule for deletion |
| */ |
| private def deleteSegment(segment: LogSegment) { |
| info(s"Scheduling log segment [baseOffset ${segment.baseOffset}, size ${segment.size}] for deletion.") |
| lock synchronized { |
| segments.remove(segment.baseOffset) |
| asyncDeleteSegment(segment) |
| } |
| } |
| |
| /** |
| * Perform an asynchronous delete on the given file. |
| * |
| * This method assumes that the file exists, and it is not thread-safe.
| * |
| * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because |
| * it is either called before all logs are loaded or the caller will catch and handle IOException |
| * |
| * @throws IOException if the file can't be renamed and still exists |
| */ |
| private def asyncDeleteSegment(segment: LogSegment) { |
| segment.changeFileSuffixes("", Log.DeletedFileSuffix) |
| def deleteSeg() { |
| info(s"Deleting segment ${segment.baseOffset}") |
| maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") { |
| segment.deleteIfExists() |
| } |
| } |
| scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs) |
| } |
| |
| /** |
| * Swap one or more new segment in place and delete one or more existing segments in a crash-safe manner. The old |
| * segments will be asynchronously deleted. |
| * |
| * This method does not need to convert IOException to KafkaStorageException because it is either called before all logs are loaded |
| * or the caller will catch and handle IOException |
| * |
| * The sequence of operations is: |
| * <ol> |
| * <li> Cleaner creates one or more new segments with suffix .cleaned and invokes replaceSegments(). |
| * If broker crashes at this point, the clean-and-swap operation is aborted and |
| * the .cleaned files are deleted on recovery in loadSegments(). |
| * <li> New segments are renamed .swap. If the broker crashes before all segments were renamed to .swap, the |
| * clean-and-swap operation is aborted - .cleaned as well as .swap files are deleted on recovery in |
| * loadSegments(). We detect this situation by maintaining a specific order in which files are renamed from |
| * .cleaned to .swap. Basically, files are renamed in descending order of offsets. On recovery, all .swap files |
| * whose offset is greater than the minimum-offset .cleaned file are deleted.
| * <li> If the broker crashes after all new segments were renamed to .swap, the operation is completed, the swap |
| * operation is resumed on recovery as described in the next step. |
| * <li> Old segment files are renamed to .deleted and asynchronous delete is scheduled. |
| * If the broker crashes, any .deleted files left behind are deleted on recovery in loadSegments(). |
| * replaceSegments() is then invoked to complete the swap with newSegment recreated from |
| * the .swap file and oldSegments containing segments which were not renamed before the crash. |
| * <li> Swap segment(s) are renamed to replace the existing segments, completing this operation. |
| * If the broker crashes, any .deleted files which may be left behind are deleted |
| * on recovery in loadSegments(). |
| * </ol> |
| * |
| * @param newSegments The new log segments to add to the log
| * @param oldSegments The old log segments to delete from the log |
| * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash |
| */ |
| private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) { |
| lock synchronized { |
| val sortedNewSegments = newSegments.sortBy(_.baseOffset) |
| // Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments |
| // but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment() |
| // multiple times for the same segment. |
| val sortedOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset)).sortBy(_.baseOffset) |
| |
| checkIfMemoryMappedBufferClosed() |
| // need to do this in two phases to be crash safe AND do the delete asynchronously |
| // if we crash in the middle of this we complete the swap in loadSegments() |
| if (!isRecoveredSwapFile) |
| sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)) |
| sortedNewSegments.reverse.foreach(addSegment(_)) |
| |
| // delete the old files |
| for (seg <- sortedOldSegments) { |
| // remove the index entry |
| if (seg.baseOffset != sortedNewSegments.head.baseOffset) |
| segments.remove(seg.baseOffset) |
| // delete segment |
| asyncDeleteSegment(seg) |
| } |
| // okay we are safe now, remove the swap suffix |
| sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, "")) |
| } |
| } |
| |
| /**
| * This function does not acquire Log.lock. The caller has to make sure log segments don't get deleted during
| * this call, and must also ensure that this function is not called on the same segment in parallel.
| *
| * Currently, it is only used by LogCleaner threads to compact non-active segments, holding LogCleanerManager's
| * lock to ensure that no other log cleaner thread or the retention thread can work on the same segment.
| */
| private[log] def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] = { |
| segments.map { segment => segment.getFirstBatchTimestamp() }
| } |
| |
| /** |
| * remove deleted log metrics |
| */ |
| private[log] def removeLogMetrics(): Unit = { |
| removeMetric("NumLogSegments", tags) |
| removeMetric("LogStartOffset", tags) |
| removeMetric("LogEndOffset", tags) |
| removeMetric("Size", tags) |
| } |
| |
| /** |
| * Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it. |
| * @param segment The segment to add |
| */ |
| @threadsafe |
| def addSegment(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment) |
| |
| private def maybeHandleIOException[T](msg: => String)(fun: => T): T = { |
| try { |
| fun |
| } catch { |
| case e: IOException => |
| logDirFailureChannel.maybeAddOfflineLogDir(dir.getParent, msg, e) |
| throw new KafkaStorageException(msg, e) |
| } |
| } |
| |
| private[log] def retryOnOffsetOverflow[T](fn: => T): T = { |
| while (true) { |
| try { |
| return fn |
| } catch { |
| case e: LogSegmentOffsetOverflowException => |
| info(s"Caught segment overflow error: ${e.getMessage}. Split segment and retry.") |
| splitOverflowedSegment(e.segment) |
| } |
| } |
| throw new IllegalStateException() |
| } |
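| 
| // Usage sketch (hypothetical operation name): wrap any segment operation that may throw
| // LogSegmentOffsetOverflowException so the offending segment is split and the operation retried:
| //
| //   val result = retryOnOffsetOverflow {
| //     recoverSomeSegment()   // hypothetical body; throws LogSegmentOffsetOverflowException on overflow
| //   }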
| |
| /** |
| * Split a segment into one or more segments such that there is no offset overflow in any of them. The |
| * resulting segments will contain the exact same messages that are present in the input segment. On successful |
| * completion of this method, the input segment will be deleted and will be replaced by the resulting new segments. |
| * See replaceSegments for recovery logic, in case the broker dies in the middle of this operation. |
| * <p>Note that this method assumes we have already determined that the segment passed in contains records that cause |
| * offset overflow.</p> |
| * <p>The split logic overloads the use of .cleaned files that LogCleaner typically uses to make the process of replacing
| * the input segment with multiple new segments atomic and recoverable in the event of a crash. See replaceSegments |
| * and completeSwapOperations for the implementation to make this operation recoverable on crashes.</p> |
| * @param segment Segment to split |
| * @return List of new segments that replace the input segment |
| */ |
| private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = { |
| require(isLogFile(segment.log.file), s"Cannot split file ${segment.log.file.getAbsoluteFile}") |
| require(segment.hasOverflow, "Split operation is only permitted for segments with overflow") |
| |
| info(s"Splitting overflowed segment $segment") |
| |
| val newSegments = ListBuffer[LogSegment]() |
| try { |
| var position = 0 |
| val sourceRecords = segment.log |
| |
| while (position < sourceRecords.sizeInBytes) { |
| val firstBatch = sourceRecords.batchesFrom(position).asScala.head |
| val newSegment = LogCleaner.createNewCleanedSegment(this, firstBatch.baseOffset) |
| newSegments += newSegment |
| |
| val bytesAppended = newSegment.appendFromFile(sourceRecords, position) |
| if (bytesAppended == 0) |
| throw new IllegalStateException(s"Failed to append records from position $position in $segment") |
| |
| position += bytesAppended |
| } |
| |
| // prepare new segments |
| var totalSizeOfNewSegments = 0 |
| newSegments.foreach { splitSegment => |
| splitSegment.onBecomeInactiveSegment() |
| splitSegment.flush() |
| splitSegment.lastModified = segment.lastModified |
| totalSizeOfNewSegments += splitSegment.log.sizeInBytes |
| } |
| // size of all the new segments combined must equal size of the original segment |
| if (totalSizeOfNewSegments != segment.log.sizeInBytes) |
| throw new IllegalStateException("Inconsistent segment sizes after split" + |
| s" before: ${segment.log.sizeInBytes} after: $totalSizeOfNewSegments") |
| |
| // replace old segment with new ones |
| info(s"Replacing overflowed segment $segment with split segments $newSegments") |
| replaceSegments(newSegments.toList, List(segment), isRecoveredSwapFile = false) |
| newSegments.toList |
| } catch { |
| case e: Exception => |
| newSegments.foreach { splitSegment => |
| splitSegment.close() |
| splitSegment.deleteIfExists() |
| } |
| throw e |
| } |
| } |
| } |
| |
| /** |
| * Helper functions for logs |
| */ |
| object Log { |
| |
| /** a log file */ |
| val LogFileSuffix = ".log" |
| |
| /** an index file */ |
| val IndexFileSuffix = ".index" |
| |
| /** a time index file */ |
| val TimeIndexFileSuffix = ".timeindex" |
| |
| val ProducerSnapshotFileSuffix = ".snapshot" |
| |
| /** an (aborted) txn index */ |
| val TxnIndexFileSuffix = ".txnindex" |
| |
| /** a file that is scheduled to be deleted */ |
| val DeletedFileSuffix = ".deleted" |
| |
| /** A temporary file that is being used for log cleaning */ |
| val CleanedFileSuffix = ".cleaned" |
| |
| /** A temporary file used when swapping files into the log */ |
| val SwapFileSuffix = ".swap" |
| |
| /** Clean shutdown file that indicates the broker was cleanly shut down in 0.8 and higher.
| * This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be |
| * avoided by passing in the recovery point, however finding the correct position to do this |
| * requires accessing the offset index which may not be safe in an unclean shutdown. |
| * For more information see the discussion in PR#2104 |
| */ |
| val CleanShutdownFile = ".kafka_cleanshutdown" |
| |
| /** a directory that is scheduled to be deleted */ |
| val DeleteDirSuffix = "-delete" |
| |
| /** a directory that is used for future partition */ |
| val FutureDirSuffix = "-future" |
| |
| private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix") |
| private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix") |
| |
| val UnknownLogStartOffset = -1L |
| |
| def apply(dir: File, |
| config: LogConfig, |
| logStartOffset: Long, |
| recoveryPoint: Long, |
| scheduler: Scheduler, |
| brokerTopicStats: BrokerTopicStats, |
| time: Time = Time.SYSTEM, |
| maxProducerIdExpirationMs: Int, |
| producerIdExpirationCheckIntervalMs: Int, |
| logDirFailureChannel: LogDirFailureChannel): Log = { |
| val topicPartition = Log.parseTopicPartitionName(dir) |
| val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) |
| new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs, |
| producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel) |
| } |
| |
| /** |
| * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros |
| * so that ls sorts the files numerically. |
| * |
| * @param offset The offset to use in the file name |
| * @return The filename |
| */ |
| def filenamePrefixFromOffset(offset: Long): String = { |
| val nf = NumberFormat.getInstance() |
| nf.setMinimumIntegerDigits(20) |
| nf.setMaximumFractionDigits(0) |
| nf.setGroupingUsed(false) |
| nf.format(offset) |
| } |
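| 
| // Example: the prefix is always 20 digits, so lexicographic file order matches numeric offset order:
| //
| //   Log.filenamePrefixFromOffset(42L)          // "00000000000000000042"
| //   Log.filenamePrefixFromOffset(5000000000L)  // "00000000005000000000"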
| |
| /** |
| * Construct a log file name in the given dir with the given base offset and the given suffix |
| * |
| * @param dir The directory in which the log will reside |
| * @param offset The base offset of the log file |
| * @param suffix The suffix to be appended to the file name (e.g. "", ".deleted", ".cleaned", ".swap", etc.) |
| */ |
| def logFile(dir: File, offset: Long, suffix: String = ""): File = |
| new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix + suffix) |
| |
| /** |
| * Return a directory name to rename the log directory to for async deletion. |
| * The name will be in the following format: "topic-partitionId.uniqueId-delete". |
| * If the topic name is too long, it will be truncated to prevent the total name |
| * from exceeding 255 characters. |
| */ |
| def logDeleteDirName(topicPartition: TopicPartition): String = { |
| val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "") |
| val suffix = s"-${topicPartition.partition()}.${uniqueId}${DeleteDirSuffix}" |
| val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size) |
| s"${topicPartition.topic().substring(0, prefixLength)}${suffix}" |
| } |
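| 
| // Example (illustrative; the uniqueId is a random UUID with its dashes stripped, 32 hex characters):
| //
| //   Log.logDeleteDirName(new TopicPartition("my-topic", 0))
| //   // e.g. "my-topic-0.3fa85f6457174562b3fc2c963f66afa6-delete"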
| |
| /** |
| * Return a future directory name for the given topic partition. The name will be in the following |
| * format: topic-partition.uniqueId-future where topic, partition and uniqueId are variables. |
| */ |
| def logFutureDirName(topicPartition: TopicPartition): String = { |
| logDirNameWithSuffix(topicPartition, FutureDirSuffix) |
| } |
| |
| private def logDirNameWithSuffix(topicPartition: TopicPartition, suffix: String): String = { |
| val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "") |
| s"${logDirName(topicPartition)}.$uniqueId$suffix" |
| } |
| |
| /** |
| * Return a directory name for the given topic partition. The name will be in the following |
| * format: topic-partition where topic, partition are variables. |
| */ |
| def logDirName(topicPartition: TopicPartition): String = { |
| s"${topicPartition.topic}-${topicPartition.partition}" |
| } |
| |
| /** |
| * Construct an index file name in the given dir using the given base offset and the given suffix |
| * |
| * @param dir The directory in which the log will reside |
| * @param offset The base offset of the log file |
| * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.) |
| */ |
| def offsetIndexFile(dir: File, offset: Long, suffix: String = ""): File = |
| new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix + suffix) |
| |
| /** |
| * Construct a time index file name in the given dir using the given base offset and the given suffix |
| * |
| * @param dir The directory in which the log will reside |
| * @param offset The base offset of the log file |
| * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.) |
| */ |
| def timeIndexFile(dir: File, offset: Long, suffix: String = ""): File = |
| new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix + suffix) |
| |
| def deleteFileIfExists(file: File, suffix: String = ""): Unit = |
| Files.deleteIfExists(new File(file.getPath + suffix).toPath) |
| |
| /** |
| * Construct a producer id snapshot file using the given offset. |
| * |
| * @param dir The directory in which the log will reside |
| * @param offset The last offset (exclusive) included in the snapshot |
| */ |
| def producerSnapshotFile(dir: File, offset: Long): File = |
| new File(dir, filenamePrefixFromOffset(offset) + ProducerSnapshotFileSuffix) |
| |
| /** |
| * Construct a transaction index file name in the given dir using the given base offset and the given suffix |
| * |
| * @param dir The directory in which the log will reside |
| * @param offset The base offset of the log file |
| * @param suffix The suffix to be appended to the file name ("", ".deleted", ".cleaned", ".swap", etc.) |
| */ |
| def transactionIndexFile(dir: File, offset: Long, suffix: String = ""): File = |
| new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix + suffix) |
| |
| def offsetFromFileName(filename: String): Long = { |
| filename.substring(0, filename.indexOf('.')).toLong |
| } |
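| 
| // Example: the base offset is everything before the first dot, so all per-segment files parse the same way:
| //
| //   Log.offsetFromFileName("00000000000000000042.log")        // 42L
| //   Log.offsetFromFileName("00000000000000000042.timeindex")  // 42L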
| |
| def offsetFromFile(file: File): Long = { |
| offsetFromFileName(file.getName) |
| } |
| |
| /** |
| * Calculate a log's size (in bytes) based on its log segments |
| * |
| * @param segments The log segments to calculate the size of |
| * @return Sum of the log segments' sizes (in bytes) |
| */ |
| def sizeInBytes(segments: Iterable[LogSegment]): Long = |
| segments.map(_.size.toLong).sum |
| |
| /** |
| * Parse the topic and partition out of the directory name of a log |
| */ |
| def parseTopicPartitionName(dir: File): TopicPartition = { |
| if (dir == null) |
| throw new KafkaException("dir should not be null") |
| |
| def exception(dir: File): KafkaException = { |
| new KafkaException(s"Found directory ${dir.getCanonicalPath}, '${dir.getName}' is not in the form of " + |
| "topic-partition or topic-partition.uniqueId-delete (if marked for deletion).\n" + |
| "Kafka's log directories (and children) should only contain Kafka topic data.") |
| } |
| |
| val dirName = dir.getName |
| if (dirName == null || dirName.isEmpty || !dirName.contains('-')) |
| throw exception(dir) |
| if (dirName.endsWith(DeleteDirSuffix) && !DeleteDirPattern.matcher(dirName).matches || |
| dirName.endsWith(FutureDirSuffix) && !FutureDirPattern.matcher(dirName).matches) |
| throw exception(dir) |
| |
| val name: String = |
| if (dirName.endsWith(DeleteDirSuffix) || dirName.endsWith(FutureDirSuffix)) dirName.substring(0, dirName.lastIndexOf('.')) |
| else dirName |
| |
| val index = name.lastIndexOf('-') |
| val topic = name.substring(0, index) |
| val partitionString = name.substring(index + 1) |
| if (topic.isEmpty || partitionString.isEmpty) |
| throw exception(dir) |
| |
| val partition = |
| try partitionString.toInt |
| catch { case _: NumberFormatException => throw exception(dir) } |
| |
| new TopicPartition(topic, partition) |
| } |
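| 
| // Example (illustrative): the partition is parsed from after the last dash, so topic names that
| // themselves contain dashes are handled correctly:
| //
| //   Log.parseTopicPartitionName(new File("/logs/my-topic-42"))  // TopicPartition("my-topic", 42)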
| |
| private def isIndexFile(file: File): Boolean = { |
| val filename = file.getName |
| filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix) || filename.endsWith(TxnIndexFileSuffix) |
| } |
| |
| private def isLogFile(file: File): Boolean = |
| file.getPath.endsWith(LogFileSuffix) |
| |
| } |