| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package kafka.log |
| |
| import java.nio.ByteBuffer |
| |
| import kafka.common.LongRef |
| import kafka.message.{CompressionCodec, InvalidMessageException, NoCompressionCodec} |
| import org.apache.kafka.common.errors.InvalidTimestampException |
| import org.apache.kafka.common.record._ |
| |
| import scala.collection.mutable |
| import scala.collection.JavaConverters._ |
| |
private[kafka] object LogValidator {

  /**
   * Update the offsets for this message set and do further validation on messages including:
   * 1. Messages for compacted topics must have keys
   * 2. When magic value = 1, the deep entries of a compressed message set are expected to carry consecutive
   *    offsets starting from 0. If they do not, the message set is rebuilt instead of being updated in place.
   * 3. When magic value = 1, validate and maybe overwrite timestamps of messages.
   *
   * This method will convert the messages in the following scenarios:
   * A. Magic value of a message = 0 and messageFormatVersion is 1
   * B. Magic value of a message = 1 and messageFormatVersion is 0
   *
   * If no format conversion or value overwriting is required for messages, this method will perform in-place
   * operations and avoid re-compression.
   *
   * @param records                   the message set to validate; it may be mutated in place
   * @param offsetCounter             source of the offsets to assign; it is advanced once per message
   * @param now                       the time used as log append time and as the reference point of the
   *                                  create-time range check
   * @param sourceCodec               the compression codec of the incoming message set
   * @param targetCodec               the compression codec the validated message set should use
   * @param compactedTopic            whether messages without a key must be rejected
   * @param messageFormatVersion      the magic value the validated messages should have
   * @param messageTimestampType      the timestamp type to apply to the validated messages
   * @param messageTimestampDiffMaxMs the largest accepted distance between a create-time timestamp and `now`
   * @return a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
   *         of the shallow message with the max timestamp and a boolean indicating whether the message sizes
   *         may have changed.
   */
  private[kafka] def validateMessagesAndAssignOffsets(records: MemoryRecords,
                                                      offsetCounter: LongRef,
                                                      now: Long,
                                                      sourceCodec: CompressionCodec,
                                                      targetCodec: CompressionCodec,
                                                      compactedTopic: Boolean = false,
                                                      messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE,
                                                      messageTimestampType: TimestampType,
                                                      messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
    if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
      // check the magic value
      if (!records.hasMatchingShallowMagic(messageFormatVersion))
        convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, messageTimestampType,
          messageTimestampDiffMaxMs, messageFormatVersion)
      else
        // Do in-place validation, offset assignment and maybe set timestamp
        assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType,
          messageTimestampDiffMaxMs)
    } else {
      validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
        messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs)
    }
  }

  /**
   * Validate an uncompressed message set whose shallow entries do not all have the target magic value, and copy
   * it into a newly allocated buffer with every record converted to `toMagicValue` and given a fresh offset.
   *
   * The result always reports `messageSizeMaybeChanged = true`; the maximum timestamp and its offset are taken
   * from the builder.
   */
  private def convertAndAssignOffsetsNonCompressed(records: MemoryRecords,
                                                   offsetCounter: LongRef,
                                                   compactedTopic: Boolean,
                                                   now: Long,
                                                   timestampType: TimestampType,
                                                   messageTimestampDiffMaxMs: Long,
                                                   toMagicValue: Byte): ValidationAndOffsetAssignResult = {
    // Size the destination buffer from the converted size of every record
    val sizeInBytesAfterConversion = records.shallowEntries.asScala.map { logEntry =>
      logEntry.record.convertedSize(toMagicValue)
    }.sum

    val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
    val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
      offsetCounter.value, now)

    records.shallowEntries.asScala.foreach { logEntry =>
      val record = logEntry.record
      validateKey(record, compactedTopic)
      // Only validate timestamps of records whose magic is above 0, as the in-place and compressed paths do.
      // Magic 0 records presumably carry no timestamp, so range-checking them under CREATE_TIME would reject
      // every up-converted message whenever a finite max difference is configured.
      if (record.magic > Record.MAGIC_VALUE_V0)
        validateTimestamp(record, now, timestampType, messageTimestampDiffMaxMs)
      builder.convertAndAppendWithOffset(offsetCounter.getAndIncrement(), record)
    }

    val convertedRecords = builder.build()
    val info = builder.info
    ValidationAndOffsetAssignResult(
      validatedRecords = convertedRecords,
      maxTimestamp = info.maxTimestamp,
      shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
      messageSizeMaybeChanged = false || true)
  }

  /**
   * Validate an uncompressed message set in place: every shallow entry gets the next offset from
   * `offsetCounter` and, for records with magic above 0, its timestamp is validated and, under
   * LOG_APPEND_TIME, overwritten with `now`.
   *
   * The input `records` instance is returned as the validated message set with
   * `messageSizeMaybeChanged = false`.
   */
  private def assignOffsetsNonCompressed(records: MemoryRecords,
                                         offsetCounter: LongRef,
                                         now: Long,
                                         compactedTopic: Boolean,
                                         timestampType: TimestampType,
                                         timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
    var maxTimestamp = Record.NO_TIMESTAMP
    var offsetOfMaxTimestamp = -1L
    val firstOffset = offsetCounter.value

    for (entry <- records.shallowEntries.asScala) {
      val record = entry.record
      validateKey(record, compactedTopic)

      val offset = offsetCounter.getAndIncrement()
      entry.setOffset(offset)

      if (record.magic > Record.MAGIC_VALUE_V0) {
        validateTimestamp(record, now, timestampType, timestampDiffMaxMs)

        if (timestampType == TimestampType.LOG_APPEND_TIME)
          entry.setLogAppendTime(now)
        else if (record.timestamp > maxTimestamp) {
          // Track the largest create-time timestamp and the offset it was seen at
          maxTimestamp = record.timestamp
          offsetOfMaxTimestamp = offset
        }
      }
    }

    // Under log append time every message shares `now`, so the first offset is reported as the max's offset
    if (timestampType == TimestampType.LOG_APPEND_TIME) {
      maxTimestamp = now
      offsetOfMaxTimestamp = firstOffset
    }

    ValidationAndOffsetAssignResult(
      validatedRecords = records,
      maxTimestamp = maxTimestamp,
      shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp,
      messageSizeMaybeChanged = false)
  }

  /**
   * Validate a message set when the source or the target codec is a compression codec.
   *
   * Every deep entry is checked for validity, for a key (on compacted topics) and, when both the record and the
   * target format have magic above 0, for an acceptable timestamp. Depending on the outcome the wrapper message
   * is updated in place or the whole message set is rebuilt with the target codec.
   *
   * We cannot do in place assignment in one of the following situations:
   * 1. Source and target compression codec are different
   * 2. When magic value to use is 0 because offsets need to be overwritten
   * 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.
   * 4. Message format conversion is needed.
   *
   * @throws InvalidMessageException if a deep entry of a compressed source is itself marked as compressed, or a
   *                                 message for a compacted topic has no key
   */
  def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
                                                 offsetCounter: LongRef,
                                                 now: Long,
                                                 sourceCodec: CompressionCodec,
                                                 targetCodec: CompressionCodec,
                                                 compactedTopic: Boolean = false,
                                                 messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE,
                                                 messageTimestampType: TimestampType,
                                                 messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
    // No in place assignment situation 1 and 2
    var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0

    var maxTimestamp = Record.NO_TIMESTAMP
    val expectedInnerOffset = new LongRef(0)
    val validatedRecords = new mutable.ArrayBuffer[Record]

    records.deepEntries(true, BufferSupplier.NO_CACHING).asScala.foreach { logEntry =>
      val record = logEntry.record
      record.ensureValid()
      validateKey(record, compactedTopic)

      if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) {
        // Validate the timestamp
        validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs)
        // Check if we need to overwrite offset, no in place assignment situation 3
        if (logEntry.offset != expectedInnerOffset.getAndIncrement())
          inPlaceAssignment = false
        if (record.timestamp > maxTimestamp)
          maxTimestamp = record.timestamp
      }

      if (sourceCodec != NoCompressionCodec && logEntry.isCompressed)
        throw new InvalidMessageException("Compressed outer record should not have an inner record with a " +
          s"compression attribute set: $record")

      // No in place assignment situation 4
      if (record.magic != messageFormatVersion)
        inPlaceAssignment = false

      validatedRecords += record.convert(messageFormatVersion)
    }

    if (!inPlaceAssignment) {
      // Rebuild the message set: assign a fresh offset to every converted record and write with the target codec
      val entries = validatedRecords.map(record => LogEntry.create(offsetCounter.getAndIncrement(), record))
      val builder = MemoryRecords.builderWithEntries(messageTimestampType, CompressionType.forId(targetCodec.codec),
        now, entries.asJava)
      val updatedRecords = builder.build()
      val info = builder.info
      ValidationAndOffsetAssignResult(
        validatedRecords = updatedRecords,
        maxTimestamp = info.maxTimestamp,
        shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
        messageSizeMaybeChanged = true)
    } else {
      // we can update the wrapper message only and write the compressed payload as is
      // NOTE(review): this takes the first shallow entry unconditionally, so it assumes `records` is non-empty
      // and holds a single wrapper message - confirm against callers.
      val entry = records.shallowEntries.iterator.next()
      // The wrapper message carries the offset of the last inner message
      val offset = offsetCounter.addAndGet(validatedRecords.size) - 1
      entry.setOffset(offset)

      val shallowTimestamp = if (messageTimestampType == TimestampType.LOG_APPEND_TIME) now else maxTimestamp
      if (messageTimestampType == TimestampType.LOG_APPEND_TIME)
        entry.setLogAppendTime(shallowTimestamp)
      else if (messageTimestampType == TimestampType.CREATE_TIME)
        entry.setCreateTime(shallowTimestamp)

      ValidationAndOffsetAssignResult(validatedRecords = records,
        maxTimestamp = shallowTimestamp,
        shallowOffsetOfMaxTimestamp = offset,
        messageSizeMaybeChanged = false)
    }
  }

  /**
   * Reject a record without a key when the topic is compacted.
   *
   * @throws InvalidMessageException if `compactedTopic` is true and the record has no key
   */
  private def validateKey(record: Record, compactedTopic: Boolean): Unit = {
    if (compactedTopic && !record.hasKey)
      throw new InvalidMessageException("Compacted topic cannot accept message without key.")
  }

  /**
   * This method validates the timestamps of a message.
   * If the message is using create time, this method checks if it is within acceptable range.
   * A record that already declares LOG_APPEND_TIME as its timestamp type is always rejected.
   *
   * @throws InvalidTimestampException if the timestamp is further than `timestampDiffMaxMs` away from `now`
   *                                   under CREATE_TIME, or if the record's timestamp type is LOG_APPEND_TIME
   */
  private def validateTimestamp(record: Record,
                                now: Long,
                                timestampType: TimestampType,
                                timestampDiffMaxMs: Long): Unit = {
    if (timestampType == TimestampType.CREATE_TIME && math.abs(record.timestamp - now) > timestampDiffMaxMs)
      throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message is out of range. " +
        s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
    if (record.timestampType == TimestampType.LOG_APPEND_TIME)
      throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
        "timestamp type to LogAppendTime.")
  }

  /**
   * Outcome of validating a message set and assigning its offsets.
   *
   * @param validatedRecords            the validated message set, either the input instance or a rebuilt copy
   * @param maxTimestamp                the maximum timestamp of the validated messages
   * @param shallowOffsetOfMaxTimestamp the offset of the shallow message with the max timestamp
   * @param messageSizeMaybeChanged     whether the message sizes may differ from those of the input
   */
  case class ValidationAndOffsetAssignResult(validatedRecords: MemoryRecords,
                                             maxTimestamp: Long,
                                             shallowOffsetOfMaxTimestamp: Long,
                                             messageSizeMaybeChanged: Boolean)

}