KAFKA-10438: Lazy initialization of record header to reduce memory usage (#9223)

There are no checks on the header key, so eagerly instantiating the key (converting the bytes to a String) is unnecessary. Instead, the raw key bytes are kept as a ByteBuffer and decoded lazily on first access, as sketched below. One implication is that conversion failures are detected a bit later, but this is consistent with how we handle the header value.
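A minimal, self-contained sketch of the lazy-decode pattern (not the exact patch; the real change to `RecordHeader` is in the diff below, which uses `Utils.utf8`):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LazyHeaderKeySketch {
    private ByteBuffer keyBuffer; // raw key bytes, kept until first access
    private String key;           // decoded lazily

    LazyHeaderKeySketch(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    public String key() {
        if (key == null) {
            byte[] bytes = new byte[keyBuffer.remaining()];
            keyBuffer.get(bytes);
            key = new String(bytes, StandardCharsets.UTF_8); // conversion failures surface here
            keyBuffer = null; // release the buffer once the String is materialized
        }
        return key;
    }

    public static void main(String[] args) {
        ByteBuffer raw = ByteBuffer.wrap("header-key".getBytes(StandardCharsets.UTF_8));
        LazyHeaderKeySketch header = new LazyHeaderKeySketch(raw);
        System.out.println(header.key()); // bytes are decoded only at this point
    }
}
```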

**JMH RESULTS**

1. Throughput (ops/s): +12%
1. The memory-usage improvement is very small, because the cost of creating the extra ```ByteBuffer``` is almost the same as the cost of the byte array copy used to construct the ```String```. A larger key yields a bigger improvement, but I don't think large keys are the common case (see the sketch after this list).
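A quick illustration of why the allocation saving is small for short keys (hypothetical class and sizes, not taken from the benchmark): the eager path copies the key bytes into a `String`, while the lazy path still allocates a `ByteBuffer` view of roughly comparable footprint, so most of the win comes from skipping the UTF-8 decode rather than from avoiding allocation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HeaderKeyAllocationSketch {
    public static void main(String[] args) {
        byte[] payload = "short-key-and-more-data".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        int headerKeySize = 9; // length of "short-key"

        // Eager path (before): copy the key bytes and decode them into a String right away.
        ByteBuffer eager = buffer.duplicate();
        byte[] copy = new byte[headerKeySize];
        eager.get(copy);
        String eagerKey = new String(copy, StandardCharsets.UTF_8);

        // Lazy path (after): allocate only a ByteBuffer view now; decoding happens
        // later, and only if key() is ever called.
        ByteBuffer lazyView = buffer.slice();
        lazyView.limit(headerKeySize);

        System.out.println(eagerKey + " / " + lazyView.remaining() + " bytes deferred");
    }
}
```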

**BEFORE**
```
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (headerKeySize)  (maxBatchSize)  (maxHeaderSize)  (messageSize)  (messageVersion)   Mode  Cnt        Score      Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM               NONE               10             200                5           1000                 2  thrpt   15  2035938.174 ± 1653.566   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM               NONE               10             200                5           1000                 2  thrpt   15     2040.000 ±    0.001    B/op
```

```
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (headerKeySize)  (maxBatchSize)  (maxHeaderSize)  (messageSize)  (messageVersion)   Mode  Cnt        Score      Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM               NONE               30             200                5           1000                 2  thrpt   15  1979193.376 ± 1239.286   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM               NONE               30             200                5           1000                 2  thrpt   15     2120.000 ±    0.001    B/op
```


**AFTER**

```
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (headerKeySize)  (maxBatchSize)  (maxHeaderSize)  (messageSize)  (messageVersion)   Mode  Cnt        Score      Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM               NONE               10             200                5           1000                 2  thrpt   15  2289115.973 ± 2661.856   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM               NONE               10             200                5           1000                 2  thrpt   15     2032.000 ±    0.001    B/op
```

```
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (headerKeySize)  (maxBatchSize)  (maxHeaderSize)  (messageSize)  (messageVersion)   Mode  Cnt        Score     Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM               NONE               30             200                5           1000                 2  thrpt   15  2222625.706 ± 908.358   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM               NONE               30             200                5           1000                 2  thrpt   15     2040.000 ±   0.001    B/op
```

Reviewers: Ismael Juma <ismael@juma.me.uk>
diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
index 3b73c93..d042494 100644
--- a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
@@ -24,7 +24,8 @@
 import org.apache.kafka.common.utils.Utils;
 
 public class RecordHeader implements Header {
-    private final String key;
+    private ByteBuffer keyBuffer;
+    private String key;
     private ByteBuffer valueBuffer;
     private byte[] value;
 
@@ -34,13 +35,16 @@
         this.value = value;
     }
 
-    public RecordHeader(String key, ByteBuffer valueBuffer) {
-        Objects.requireNonNull(key, "Null header keys are not permitted");
-        this.key = key;
+    public RecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
+        this.keyBuffer = Objects.requireNonNull(keyBuffer, "Null header keys are not permitted");
         this.valueBuffer = valueBuffer;
     }
     
     public String key() {
+        if (key == null) {
+            key = Utils.utf8(keyBuffer, keyBuffer.remaining());
+            keyBuffer = null;
+        }
         return key;
     }
 
@@ -60,20 +64,20 @@
             return false;
 
         RecordHeader header = (RecordHeader) o;
-        return Objects.equals(key, header.key) &&
+        return Objects.equals(key(), header.key()) &&
                Arrays.equals(value(), header.value());
     }
 
     @Override
     public int hashCode() {
-        int result = key != null ? key.hashCode() : 0;
+        int result = key() != null ? key().hashCode() : 0;
         result = 31 * result + Arrays.hashCode(value());
         return result;
     }
 
     @Override
     public String toString() {
-        return "RecordHeader(key = " + key + ", value = " + Arrays.toString(value()) + ")";
+        return "RecordHeader(key = " + key() + ", value = " + Arrays.toString(value()) + ")";
     }
 
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 976b556..665ad54 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -535,7 +535,8 @@
             if (headerKeySize < 0)
                 throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
 
-            String headerKey = Utils.utf8(buffer, headerKeySize);
+            ByteBuffer headerKeyBuffer = buffer.slice();
+            headerKeyBuffer.limit(headerKeySize);
             buffer.position(buffer.position() + headerKeySize);
 
             ByteBuffer headerValue = null;
@@ -546,7 +547,7 @@
                 buffer.position(buffer.position() + headerValueSize);
             }
 
-            headers[i] = new RecordHeader(headerKey, headerValue);
+            headers[i] = new RecordHeader(headerKeyBuffer, headerValue);
         }
 
         return headers;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index 822b3b9..11854dc 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -270,7 +270,7 @@
         DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
     }
 
-    @Test(expected = StringIndexOutOfBoundsException.class)
+    @Test(expected = InvalidRecordException.class)
     public void testInvalidHeaderKey() {
         byte attributes = 0;
         long timestampDelta = 2;
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 8e17446..48b9f57 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -262,7 +262,7 @@
       recordConversionStats = recordConversionStats)
   }
 
-  private def assignOffsetsNonCompressed(records: MemoryRecords,
+  def assignOffsetsNonCompressed(records: MemoryRecords,
                                          topicPartition: TopicPartition,
                                          offsetCounter: LongRef,
                                          now: Long,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
new file mode 100644
index 0000000..834652e
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.record;
+
+import kafka.server.BrokerTopicStats;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE;
+
+@State(Scope.Benchmark)
+public abstract class BaseRecordBatchBenchmark {
+    private static final int MAX_HEADER_SIZE = 5;
+    private static final int HEADER_KEY_SIZE = 30;
+
+    private final Random random = new Random(0);
+
+    final int batchCount = 100;
+
+    public enum Bytes {
+        RANDOM, ONES
+    }
+
+    @Param(value = {"1", "2", "10", "50", "200", "500"})
+    private int maxBatchSize = 200;
+
+    @Param(value = {"1", "2"})
+    byte messageVersion = CURRENT_MAGIC_VALUE;
+
+    @Param(value = {"100", "1000", "10000", "100000"})
+    private int messageSize = 1000;
+
+    @Param(value = {"RANDOM", "ONES"})
+    private Bytes bytes = Bytes.RANDOM;
+
+    @Param(value = {"NO_CACHING", "CREATE"})
+    private String bufferSupplierStr = "NO_CACHING";
+
+    // zero starting offset is much faster for v1 batches, but that will almost never happen
+    int startingOffset;
+
+    // Used by measureSingleMessage
+    ByteBuffer singleBatchBuffer;
+
+    // Used by measureVariableBatchSize
+    ByteBuffer[] batchBuffers;
+    BufferSupplier bufferSupplier;
+    final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+
+    @Setup
+    public void init() {
+        // For v0 batches a zero starting offset is much faster but that will almost never happen.
+        // For v2 batches we use starting offset = 0 as these batches are relative to the base
+        // offset and measureValidation will mutate these batches between iterations
+        startingOffset = messageVersion == 2 ? 0 : 42;
+
+        if (bufferSupplierStr.equals("NO_CACHING")) {
+            bufferSupplier = BufferSupplier.NO_CACHING;
+        } else if (bufferSupplierStr.equals("CREATE")) {
+            bufferSupplier = BufferSupplier.create();
+        } else {
+            throw new IllegalArgumentException("Unsupported buffer supplier " + bufferSupplierStr);
+        }
+        singleBatchBuffer = createBatch(1);
+
+        batchBuffers = new ByteBuffer[batchCount];
+        for (int i = 0; i < batchCount; ++i) {
+            int size = random.nextInt(maxBatchSize) + 1;
+            batchBuffers[i] = createBatch(size);
+        }
+    }
+
+    private static Header[] createHeaders() {
+        char[] headerChars = new char[HEADER_KEY_SIZE];
+        Arrays.fill(headerChars, 'a');
+        String headerKey = new String(headerChars);
+        byte[] headerValue = new byte[0];
+        return IntStream.range(0, MAX_HEADER_SIZE).mapToObj(index -> new Header() {
+            @Override
+            public String key() {
+                return headerKey;
+            }
+
+            @Override
+            public byte[] value() {
+                return headerValue;
+            }
+        }).toArray(Header[]::new);
+    }
+
+    abstract CompressionType compressionType();
+
+    private ByteBuffer createBatch(int batchSize) {
+        // Magic v1 does not support record headers
+        Header[] headers = messageVersion < RecordBatch.MAGIC_VALUE_V2 ? Record.EMPTY_HEADERS : createHeaders();
+        byte[] value = new byte[messageSize];
+        final ByteBuffer buf = ByteBuffer.allocate(
+            AbstractRecords.estimateSizeInBytesUpperBound(messageVersion, compressionType(), new byte[0], value,
+                    headers) * batchSize
+        );
+
+        final MemoryRecordsBuilder builder =
+            MemoryRecords.builder(buf, messageVersion, compressionType(), TimestampType.CREATE_TIME, startingOffset);
+
+        for (int i = 0; i < batchSize; ++i) {
+            switch (bytes) {
+                case ONES:
+                    Arrays.fill(value, (byte) 1);
+                    break;
+                case RANDOM:
+                    random.nextBytes(value);
+                    break;
+            }
+
+            builder.append(0, null, value, headers);
+        }
+        return builder.build().buffer();
+    }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
new file mode 100644
index 0000000..f176e06
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.record;
+
+import kafka.api.ApiVersion;
+import kafka.common.LongRef;
+import kafka.log.AppendOrigin;
+import kafka.log.LogValidator;
+import kafka.message.CompressionCodec;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Time;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+public class CompressedRecordBatchValidationBenchmark extends BaseRecordBatchBenchmark {
+
+    @Param(value = {"LZ4", "SNAPPY", "GZIP", "ZSTD"})
+    private CompressionType compressionType = CompressionType.LZ4;
+
+    @Override
+    CompressionType compressionType() {
+        return compressionType;
+    }
+
+    @Benchmark
+    public void measureValidateMessagesAndAssignOffsetsCompressed(Blackhole bh) {
+        MemoryRecords records = MemoryRecords.readableRecords(singleBatchBuffer.duplicate());
+        LogValidator.validateMessagesAndAssignOffsetsCompressed(records, new TopicPartition("a", 0),
+                new LongRef(startingOffset), Time.SYSTEM, System.currentTimeMillis(),
+                CompressionCodec.getCompressionCodec(compressionType.id),
+                CompressionCodec.getCompressionCodec(compressionType.id),
+                false,  messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, 0,
+                new AppendOrigin.Client$(),
+                ApiVersion.latestVersion(),
+                brokerTopicStats);
+    }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
index d7f2a04..8aaa2d5 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
@@ -16,148 +16,36 @@
  */
 package org.apache.kafka.jmh.record;
 
-import kafka.api.ApiVersion;
-import kafka.common.LongRef;
-import kafka.log.AppendOrigin;
-import kafka.log.LogValidator;
-import kafka.message.CompressionCodec;
-import kafka.server.BrokerTopicStats;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.AbstractRecords;
-import org.apache.kafka.common.record.BufferSupplier;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.CloseableIterator;
-import org.apache.kafka.common.utils.Time;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Random;
-
-import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE;
 
 @State(Scope.Benchmark)
 @Fork(value = 1)
 @Warmup(iterations = 5)
 @Measurement(iterations = 15)
-public class RecordBatchIterationBenchmark {
-
-    private final Random random = new Random(0);
-    private final int batchCount = 100;
-
-    public enum Bytes {
-        RANDOM, ONES
-    }
-
-    @Param(value = {"1", "2", "10", "50", "200", "500"})
-    private int maxBatchSize = 200;
+public class RecordBatchIterationBenchmark extends BaseRecordBatchBenchmark {
 
     @Param(value = {"LZ4", "SNAPPY", "GZIP", "ZSTD", "NONE"})
     private CompressionType compressionType = CompressionType.NONE;
 
-    @Param(value = {"1", "2"})
-    private byte messageVersion = CURRENT_MAGIC_VALUE;
-
-    @Param(value = {"100", "1000", "10000", "100000"})
-    private int messageSize = 1000;
-
-    @Param(value = {"RANDOM", "ONES"})
-    private Bytes bytes = Bytes.RANDOM;
-
-    @Param(value = {"NO_CACHING", "CREATE"})
-    private String bufferSupplierStr;
-
-    // zero starting offset is much faster for v1 batches, but that will almost never happen
-    private int startingOffset;
-
-    // Used by measureSingleMessage
-    private ByteBuffer singleBatchBuffer;
-
-    // Used by measureVariableBatchSize
-    private ByteBuffer[] batchBuffers;
-    private int[] batchSizes;
-    private BufferSupplier bufferSupplier;
-    private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
-
-    @Setup
-    public void init() {
-        brokerTopicStats = new BrokerTopicStats();
-
-        // For v0 batches a zero starting offset is much faster but that will almost never happen.
-        // For v2 batches we use starting offset = 0 as these batches are relative to the base
-        // offset and measureValidation will mutate these batches between iterations
-        startingOffset = messageVersion == 2 ? 0 : 42;
-
-        if (bufferSupplierStr.equals("NO_CACHING")) {
-            bufferSupplier = BufferSupplier.NO_CACHING;
-        } else if (bufferSupplierStr.equals("CREATE")) {
-            bufferSupplier = BufferSupplier.create();
-        } else {
-            throw new IllegalArgumentException("Unsupported buffer supplier " + bufferSupplierStr);
-        }
-        singleBatchBuffer = createBatch(1);
-
-        batchBuffers = new ByteBuffer[batchCount];
-        batchSizes = new int[batchCount];
-        for (int i = 0; i < batchCount; ++i) {
-            int size = random.nextInt(maxBatchSize) + 1;
-            batchBuffers[i] = createBatch(size);
-            batchSizes[i] = size;
-        }
-    }
-
-    private ByteBuffer createBatch(int batchSize) {
-        byte[] value = new byte[messageSize];
-        final ByteBuffer buf = ByteBuffer.allocate(
-            AbstractRecords.estimateSizeInBytesUpperBound(messageVersion, compressionType, new byte[0], value,
-                    Record.EMPTY_HEADERS) * batchSize
-        );
-
-        final MemoryRecordsBuilder builder =
-            MemoryRecords.builder(buf, messageVersion, compressionType, TimestampType.CREATE_TIME, startingOffset);
-
-        for (int i = 0; i < batchSize; ++i) {
-            switch (bytes) {
-                case ONES:
-                    Arrays.fill(value, (byte) 1);
-                    break;
-                case RANDOM:
-                    random.nextBytes(value);
-                    break;
-            }
-
-            builder.append(0, null, value);
-        }
-        return builder.build().buffer();
-    }
-
-    @Benchmark
-    public void measureValidation(Blackhole bh) throws IOException {
-        MemoryRecords records = MemoryRecords.readableRecords(singleBatchBuffer.duplicate());
-        LogValidator.validateMessagesAndAssignOffsetsCompressed(records, new TopicPartition("a", 0),
-                new LongRef(startingOffset), Time.SYSTEM, System.currentTimeMillis(),
-                CompressionCodec.getCompressionCodec(compressionType.id),
-                CompressionCodec.getCompressionCodec(compressionType.id),
-                false,  messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, 0,
-                new AppendOrigin.Client$(),
-                ApiVersion.latestVersion(),
-                brokerTopicStats);
+    @Override
+    CompressionType compressionType() {
+        return compressionType;
     }
 
     @Benchmark
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java
new file mode 100644
index 0000000..001837e
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.record;
+
+import kafka.common.LongRef;
+import kafka.log.AppendOrigin;
+import kafka.log.LogValidator;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.TimestampType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+public class UncompressedRecordBatchValidationBenchmark extends BaseRecordBatchBenchmark {
+
+    @Override
+    CompressionType compressionType() {
+        return CompressionType.NONE;
+    }
+
+    @Benchmark
+    public void measureAssignOffsetsNonCompressed(Blackhole bh) {
+        MemoryRecords records = MemoryRecords.readableRecords(singleBatchBuffer.duplicate());
+        LogValidator.assignOffsetsNonCompressed(records, new TopicPartition("a", 0),
+                new LongRef(startingOffset), System.currentTimeMillis(), false,
+                TimestampType.CREATE_TIME, Long.MAX_VALUE, 0,
+                new AppendOrigin.Client$(), messageVersion, brokerTopicStats);
+    }
+}