blob: 47d4bd4248d282ba7eb718cd7da7b92d5762c3d3 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.log
import java.nio.ByteBuffer
import kafka.common.LongRef
import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec}
import org.apache.kafka.common.errors.InvalidTimestampException
import org.apache.kafka.common.record._
import scala.collection.mutable
import scala.collection.JavaConverters._
private[kafka] object LogValidator {
/**
 * Update the offsets for this message set and do further validation on messages including:
 * 1. Messages for compacted topics must have keys
 * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets
 *    starting from 0.
 * 3. When magic value = 1, validate and maybe overwrite timestamps of messages.
 *
 * This method will convert the messages in the following scenarios:
 * A. Magic value of a message = 0 and messageFormatVersion is 1
 * B. Magic value of a message = 1 and messageFormatVersion is 0
 *
 * If no format conversion or value overwriting is required for messages, this method will perform in-place
 * operations and avoid re-compression.
 *
 * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
 * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed.
 */
// Entry point: picks the validation / offset-assignment strategy from the compression codecs.
//
//  - Source and target both uncompressed, shallow magic does NOT match messageFormatVersion:
//    convertAndAssignOffsetsNonCompressed.
//  - Source and target both uncompressed, shallow magic already matches:
//    assignOffsetsNonCompressed.
//  - Any other codec combination: validateMessagesAndAssignOffsetsCompressed.
//
// Parameters:
//   records                   the message set to validate
//   offsetCounter             counter handed to the selected helper, which draws the assigned offsets from it
//   now                       timestamp handed to the helpers (used for timestamp validation / log append time)
//   sourceCodec, targetCodec  compression codecs of the incoming and the stored messages
//   compactedTopic            when true, the helpers reject records without a key
//   messageFormatVersion      magic value the stored messages must have
//   messageTimestampType      timestamp type handed to the helpers
//   messageTimestampDiffMaxMs maximum accepted |record timestamp - now| under CREATE_TIME
//
// Returns the ValidationAndOffsetAssignResult produced by the selected helper.
private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords,
                                                    offsetCounter: LongRef,
                                                    now: Long,
                                                    sourceCodec: CompressionCodec,
                                                    targetCodec: CompressionCodec,
                                                    compactedTopic: Boolean = false,
                                                    messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE,
                                                    messageTimestampType: TimestampType,
                                                    messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
  if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
    // check the magic value
    if (!records.hasMatchingShallowMagic(messageFormatVersion))
      convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, messageTimestampType,
        messageTimestampDiffMaxMs, messageFormatVersion)
    else
      // Do in-place validation, offset assignment and maybe set timestamp
      assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType,
        messageTimestampDiffMaxMs)
  } else {
    validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
      messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs)
  }
}
/**
 * Validates every shallow entry of `records` and hands it, together with the next offset taken from
 * `offsetCounter`, to a builder created for magic `toMagicValue` with `CompressionType.NONE`.
 *
 * NOTE(review): this copy of the method is incomplete. Several right-hand sides, the opening of the
 * result constructor call and all closing braces are missing; see the notes below and restore them
 * from the upstream source before compiling.
 *
 * @param records                   the message set to convert
 * @param offsetCounter             source of the assigned offsets; incremented once per shallow entry
 * @param compactedTopic            when true, records without a key are rejected by `validateKey`
 * @param now                       timestamp passed to the builder and to `validateTimestamp`
 * @param timestampType             timestamp type passed to the builder and to `validateTimestamp`
 * @param messageTimestampDiffMaxMs limit passed to `validateTimestamp`
 * @param toMagicValue              magic value passed to the builder
 * @return a result with `messageSizeMaybeChanged = true` whose timestamp fields are read from `info`
 */
private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords,
offsetCounter: LongRef,
compactedTopic: Boolean,
now: Long,
timestampType: TimestampType,
messageTimestampDiffMaxMs: Long,
toMagicValue: Byte): ValidationAndOffsetAssignResult = {
// NOTE(review): truncated expression. Only the start of a lambda over `logEntry` remains; presumably
// this computed the total byte size of the converted entries (it is used as a buffer capacity on
// the next line) - TODO confirm against upstream.
val sizeInBytesAfterConversion = { logEntry =>
// Buffer that backs the builder for the converted records.
val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
offsetCounter.value, now)
// Validate each shallow entry, then append it through the builder with a freshly assigned offset.
records.shallowEntries.asScala.foreach { logEntry =>
val record = logEntry.record
validateKey(record, compactedTopic)
validateTimestamp(record, now, timestampType, messageTimestampDiffMaxMs)
builder.convertAndAppendWithOffset(offsetCounter.getAndIncrement(), record)
// NOTE(review): the right-hand sides of the next two assignments are missing; presumably both are
// obtained from `builder` - TODO confirm against upstream.
val convertedRecords =
val info =
// NOTE(review): the `ValidationAndOffsetAssignResult(` opener that these named arguments belong to
// is missing.
validatedRecords = convertedRecords,
maxTimestamp = info.maxTimestamp,
shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
messageSizeMaybeChanged = true)
/**
 * Validates the shallow entries of `records`, draws one offset per entry from `offsetCounter` and
 * tracks the maximum record timestamp together with the offset it was seen at. The result carries
 * the same `records` instance and `messageSizeMaybeChanged = false`.
 *
 * NOTE(review): this copy of the method is incomplete. At least one statement body, the opening of
 * the result constructor call and all closing braces are missing; see the notes below.
 *
 * @param records            the message set to validate
 * @param offsetCounter      source of the assigned offsets; incremented once per shallow entry
 * @param now                timestamp used for validation and, under LOG_APPEND_TIME, as the max timestamp
 * @param compactedTopic     when true, records without a key are rejected by `validateKey`
 * @param timestampType      timestamp type in effect
 * @param timestampDiffMaxMs limit passed to `validateTimestamp`
 * @return the validation result described above
 */
private def assignOffsetsNonCompressed(records: MemoryRecords,
offsetCounter: LongRef,
now: Long,
compactedTopic: Boolean,
timestampType: TimestampType,
timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
var maxTimestamp = Record.NO_TIMESTAMP
var offsetOfMaxTimestamp = -1L
// Remembered before the loop advances the counter; reported as the max-timestamp offset under
// LOG_APPEND_TIME (see below).
val firstOffset = offsetCounter.value
for (entry <- records.shallowEntries.asScala) {
val record = entry.record
validateKey(record, compactedTopic)
// NOTE(review): `offset` is only read for max-timestamp tracking in the visible code; presumably a
// statement writing it into `entry` was lost here - TODO confirm against upstream.
val offset = offsetCounter.getAndIncrement()
// Timestamps are only validated and tracked for records whose magic is above V0.
if (record.magic > Record.MAGIC_VALUE_V0) {
validateTimestamp(record, now, timestampType, timestampDiffMaxMs)
// NOTE(review): the body of this `if` is missing (it is followed directly by `else if`).
if (timestampType == TimestampType.LOG_APPEND_TIME)
else if (record.timestamp > maxTimestamp) {
maxTimestamp = record.timestamp
offsetOfMaxTimestamp = offset
// Under LOG_APPEND_TIME the reported max timestamp is `now`, attributed to the first assigned offset.
if (timestampType == TimestampType.LOG_APPEND_TIME) {
maxTimestamp = now
offsetOfMaxTimestamp = firstOffset
// NOTE(review): the `ValidationAndOffsetAssignResult(` opener that these named arguments belong to
// is missing.
validatedRecords = records,
maxTimestamp = maxTimestamp,
shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp,
messageSizeMaybeChanged = false)
/**
 * We cannot do in place assignment in one of the following situations:
 * 1. Source and target compression codec are different
 * 2. When magic value to use is 0 because offsets need to be overwritten
 * 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.
 * 4. Message format conversion is needed.
 */
// Validation and offset assignment for the case where source or target messages are compressed.
//
// Walks the deep entries of `records`, validating each record and collecting a copy converted to
// `messageFormatVersion`. The `inPlaceAssignment` flag starts as
// (sourceCodec == targetCodec && messageFormatVersion > V0) and is cleared when an inner offset is
// not the expected one or when a record's magic differs from `messageFormatVersion`.
//  - flag cleared: a new message set is built from the collected records with offsets drawn from
//    `offsetCounter`; the result has messageSizeMaybeChanged = true.
//  - flag still set: `offsetCounter` is advanced by the number of collected records and the original
//    `records` instance is returned with messageSizeMaybeChanged = false.
//
// NOTE(review): this copy of the method is incomplete. Several right-hand sides and statement
// bodies, the opening of one result constructor call and all closing braces are missing, so the
// exact nesting of the statements inside the loop cannot be determined from this text alone.
def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
offsetCounter: LongRef,
now: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
compactedTopic: Boolean = false,
messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE,
messageTimestampType: TimestampType,
messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
// No in place assignment situation 1 and 2
var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0
var maxTimestamp = Record.NO_TIMESTAMP
// Inner offsets are compared against 0, 1, 2, ... in iteration order.
val expectedInnerOffset = new LongRef(0)
// Records converted to messageFormatVersion, in iteration order.
val validatedRecords = new mutable.ArrayBuffer[Record]
records.deepEntries(true, BufferSupplier.NO_CACHING).asScala.foreach { logEntry =>
val record = logEntry.record
validateKey(record, compactedTopic)
if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) {
// Validate the timestamp
validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs)
// Check if we need to overwrite offset, no in place assignment situation 3
if (logEntry.offset != expectedInnerOffset.getAndIncrement())
inPlaceAssignment = false
if (record.timestamp > maxTimestamp)
maxTimestamp = record.timestamp
// An entry that is itself compressed is rejected whenever the source codec is a compressing one.
if (sourceCodec != NoCompressionCodec && logEntry.isCompressed)
throw new InvalidMessageException("Compressed outer record should not have an inner record with a " +
s"compression attribute set: $record")
// No in place assignment situation 4
if (record.magic != messageFormatVersion)
inPlaceAssignment = false
validatedRecords += record.convert(messageFormatVersion)
if (!inPlaceAssignment) {
// NOTE(review): truncated expression. The collection being mapped and the start of the lambda are
// missing; presumably `validatedRecords` is mapped to log entries with fresh offsets - TODO confirm.
val entries = => LogEntry.create(offsetCounter.getAndIncrement(), record))
val builder = MemoryRecords.builderWithEntries(messageTimestampType, CompressionType.forId(targetCodec.codec),
now, entries.asJava)
// NOTE(review): the right-hand sides of the next two assignments are missing; presumably both are
// obtained from `builder` - TODO confirm against upstream.
val updatedRecords =
val info =
// NOTE(review): the `ValidationAndOffsetAssignResult(` opener that these named arguments belong to
// is missing.
validatedRecords = updatedRecords,
maxTimestamp = info.maxTimestamp,
shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
messageSizeMaybeChanged = true)
} else {
// ensure the inner messages are valid
// NOTE(review): no statement follows the comment above in this copy; the check it announces
// appears to be missing.
// we can update the wrapper message only and write the compressed payload as is
// NOTE(review): the right-hand side of `entry` is missing.
val entry =
// Offset of the last collected record: the counter is advanced by the record count, minus one.
val offset = offsetCounter.addAndGet(validatedRecords.size) - 1
val shallowTimestamp = if (messageTimestampType == TimestampType.LOG_APPEND_TIME) now else maxTimestamp
// NOTE(review): the bodies of both branches below are missing.
if (messageTimestampType == TimestampType.LOG_APPEND_TIME)
else if (messageTimestampType == TimestampType.CREATE_TIME)
ValidationAndOffsetAssignResult(validatedRecords = records,
maxTimestamp = shallowTimestamp,
shallowOffsetOfMaxTimestamp = offset,
messageSizeMaybeChanged = false)
/**
 * Rejects a record that has no key when the destination topic is compacted.
 *
 * @param record         the record to check
 * @param compactedTopic whether the destination topic is compacted
 * @throws InvalidMessageException if `compactedTopic` is true and `record` has no key
 */
private def validateKey(record: Record, compactedTopic: Boolean): Unit = {
  if (compactedTopic && !record.hasKey)
    throw new InvalidMessageException("Compacted topic cannot accept message without key.")
}
/**
 * This method validates the timestamps of a message.
 * If the message is using create time, this method checks if it is within acceptable range.
 */
// Performs two independent checks and throws InvalidTimestampException when either fails:
//  1. Under CREATE_TIME, the record timestamp must lie within `timestampDiffMaxMs` of `now`
//     (in either direction).
//  2. The record itself must not be marked with timestamp type LOG_APPEND_TIME; this check runs
//     regardless of the `timestampType` argument.
//
// Parameters:
//   record             the record whose timestamp and timestamp type are checked
//   now                reference time for the range check
//   timestampType      timestamp type in effect; the range check only applies to CREATE_TIME
//   timestampDiffMaxMs maximum accepted absolute difference between the record timestamp and `now`
private def validateTimestamp(record: Record,
                              now: Long,
                              timestampType: TimestampType,
                              timestampDiffMaxMs: Long): Unit = {
  if (timestampType == TimestampType.CREATE_TIME && math.abs(record.timestamp - now) > timestampDiffMaxMs)
    // The accepted interval is closed on both ends, so the message closes the bracket it opens.
    throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message is out of range. " +
      s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
  if (record.timestampType == TimestampType.LOG_APPEND_TIME)
    throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
      "timestamp type to LogAppendTime.")
}
/**
 * Outcome of validating a message set and assigning its offsets.
 *
 * @param validatedRecords            the message set to use after validation
 * @param maxTimestamp                the maximum timestamp reported for the message set
 * @param shallowOffsetOfMaxTimestamp the offset reported alongside `maxTimestamp`
 * @param messageSizeMaybeChanged     whether the size of the messages may differ from the input
 */
case class ValidationAndOffsetAssignResult(
  validatedRecords: MemoryRecords,
  maxTimestamp: Long,
  shallowOffsetOfMaxTimestamp: Long,
  messageSizeMaybeChanged: Boolean
)