| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package kafka.coordinator.transaction |
| |
| import java.io.PrintStream |
| import java.nio.ByteBuffer |
| import java.nio.charset.StandardCharsets |
| import kafka.internals.generated.{TransactionLogKey, TransactionLogValue} |
| import org.apache.kafka.clients.consumer.ConsumerRecord |
| import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil} |
| import org.apache.kafka.common.record.{CompressionType, Record, RecordBatch} |
| import org.apache.kafka.common.{MessageFormatter, TopicPartition} |
| |
| import scala.collection.mutable |
| import scala.jdk.CollectionConverters._ |
| |
| /** |
| * Messages stored for the transaction topic represent the producer id and transactional status of the corresponding |
| * transactional id, which have versions for both the key and value fields. Key and value |
| * versions are used to evolve the message formats: |
| * |
| * key version 0: [transactionalId] |
| * -> value version 0: [producer_id, producer_epoch, expire_timestamp, status, [topic, [partition] ], timestamp] |
| */ |
| object TransactionLog { |
| |
| // enforce always using |
| // 1. cleanup policy = compact |
| // 2. compression = none |
| // 3. unclean leader election = disabled |
| // 4. required acks = -1 when writing |
| val EnforcedCompressionType: CompressionType = CompressionType.NONE |
| val EnforcedRequiredAcks: Short = (-1).toShort |
| |
| /** |
| * Generates the bytes for transaction log message key |
| * |
| * @return key bytes |
| */ |
| private[transaction] def keyToBytes(transactionalId: String): Array[Byte] = { |
| MessageUtil.toVersionPrefixedBytes(TransactionLogKey.HIGHEST_SUPPORTED_VERSION, |
| new TransactionLogKey().setTransactionalId(transactionalId)) |
| } |
| |
| /** |
| * Generates the payload bytes for transaction log message value |
| * |
| * @return value payload bytes |
| */ |
| private[transaction] def valueToBytes(txnMetadata: TxnTransitMetadata): Array[Byte] = { |
| if (txnMetadata.txnState == Empty && txnMetadata.topicPartitions.nonEmpty) |
| throw new IllegalStateException(s"Transaction is not expected to have any partitions since its state is ${txnMetadata.txnState}: $txnMetadata") |
| |
| val transactionPartitions = if (txnMetadata.txnState == Empty) null |
| else txnMetadata.topicPartitions |
| .groupBy(_.topic) |
| .map { case (topic, partitions) => |
| new TransactionLogValue.PartitionsSchema() |
| .setTopic(topic) |
| .setPartitionIds(partitions.map(tp => Integer.valueOf(tp.partition)).toList.asJava) |
| }.toList.asJava |
| |
| // Serialize with the highest supported non-flexible version |
| // until a tagged field is introduced or the version is bumped. |
| MessageUtil.toVersionPrefixedBytes(0, |
| new TransactionLogValue() |
| .setProducerId(txnMetadata.producerId) |
| .setProducerEpoch(txnMetadata.producerEpoch) |
| .setTransactionTimeoutMs(txnMetadata.txnTimeoutMs) |
| .setTransactionStatus(txnMetadata.txnState.id) |
| .setTransactionLastUpdateTimestampMs(txnMetadata.txnLastUpdateTimestamp) |
| .setTransactionStartTimestampMs(txnMetadata.txnStartTimestamp) |
| .setTransactionPartitions(transactionPartitions)) |
| } |
| |
| /** |
| * Decodes the transaction log messages' key |
| * |
| * @return the key |
| */ |
| def readTxnRecordKey(buffer: ByteBuffer): BaseKey = { |
| val version = buffer.getShort |
| if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION && version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) { |
| val value = new TransactionLogKey(new ByteBufferAccessor(buffer), version) |
| TxnKey( |
| version = version, |
| transactionalId = value.transactionalId |
| ) |
| } else { |
| UnknownKey(version) |
| } |
| } |
| |
| /** |
| * Decodes the transaction log messages' payload and retrieves the transaction metadata from it |
| * |
| * @return a transaction metadata object from the message |
| */ |
| def readTxnRecordValue(transactionalId: String, buffer: ByteBuffer): Option[TransactionMetadata] = { |
| // tombstone |
| if (buffer == null) None |
| else { |
| val version = buffer.getShort |
| if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) { |
| val value = new TransactionLogValue(new ByteBufferAccessor(buffer), version) |
| val transactionMetadata = new TransactionMetadata( |
| transactionalId = transactionalId, |
| producerId = value.producerId, |
| lastProducerId = RecordBatch.NO_PRODUCER_ID, |
| producerEpoch = value.producerEpoch, |
| lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH, |
| txnTimeoutMs = value.transactionTimeoutMs, |
| state = TransactionState.fromId(value.transactionStatus), |
| topicPartitions = mutable.Set.empty[TopicPartition], |
| txnStartTimestamp = value.transactionStartTimestampMs, |
| txnLastUpdateTimestamp = value.transactionLastUpdateTimestampMs) |
| |
| if (!transactionMetadata.state.equals(Empty)) |
| value.transactionPartitions.forEach(partitionsSchema => |
| transactionMetadata.addPartitions(partitionsSchema.partitionIds |
| .asScala |
| .map(partitionId => new TopicPartition(partitionsSchema.topic, partitionId)) |
| .toSet) |
| ) |
| Some(transactionMetadata) |
| } else throw new IllegalStateException(s"Unknown version $version from the transaction log message value") |
| } |
| } |
| |
| // Formatter for use with tools to read transaction log messages |
| class TransactionLogMessageFormatter extends MessageFormatter { |
| def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { |
| Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { |
| case txnKey: TxnKey => |
| val transactionalId = txnKey.transactionalId |
| val value = consumerRecord.value |
| val producerIdMetadata = if (value == null) |
| None |
| else |
| readTxnRecordValue(transactionalId, ByteBuffer.wrap(value)) |
| output.write(transactionalId.getBytes(StandardCharsets.UTF_8)) |
| output.write("::".getBytes(StandardCharsets.UTF_8)) |
| output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8)) |
| output.write("\n".getBytes(StandardCharsets.UTF_8)) |
| |
| case unknownKey: UnknownKey => |
| output.write(s"unknown::version=${unknownKey.version}\n".getBytes(StandardCharsets.UTF_8)) |
| } |
| } |
| } |
| |
| /** |
| * Exposed for printing records using [[kafka.tools.DumpLogSegments]] |
| */ |
| def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = { |
| TransactionLog.readTxnRecordKey(record.key) match { |
| case txnKey: TxnKey => |
| val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}" |
| |
| val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match { |
| case None => "<DELETE>" |
| |
| case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," + |
| s"producerEpoch:${txnMetadata.producerEpoch}," + |
| s"state=${txnMetadata.state}," + |
| s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," + |
| s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," + |
| s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}" |
| } |
| |
| (Some(keyString), Some(valueString)) |
| |
| case unknownKey: UnknownKey => |
| (Some(s"unknown::version=${unknownKey.version}"), None) |
| } |
| } |
| |
| } |
| |
| sealed trait BaseKey{ |
| def version: Short |
| def transactionalId: String |
| } |
| |
| case class TxnKey(version: Short, transactionalId: String) extends BaseKey { |
| override def toString: String = transactionalId |
| } |
| |
| case class UnknownKey(version: Short) extends BaseKey { |
| override def transactionalId: String = null |
| override def toString: String = transactionalId |
| } |
| |