KAFKA-10438: Lazy initialization of record header to reduce memory usage (#9223)
There are no checks on the header key so instantiating key (bytes to string) is unnecessary.
One implication is that conversion failures will be detected a bit later, but this is consistent
with how we handle the header value.
**JMH RESULT**
1. ops: +12%
1. The optimization of memory usage is very small as the cost of creating extra ```ByteBuffer``` is
almost same to byte array copy (used to construct ```String```). Using large key results in better
improvement but I don't think large key is common case.
**BEFORE**
```
Benchmark (bufferSupplierStr) (bytes) (compressionType) (headerKeySize) (maxBatchSize) (maxHeaderSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
RecordBatchIterationBenchmark.measureValidation NO_CACHING RANDOM NONE 10 200 5 1000 2 thrpt 15 2035938.174 ± 1653.566 ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm NO_CACHING RANDOM NONE 10 200 5 1000 2 thrpt 15 2040.000 ± 0.001 B/op
```
```
Benchmark (bufferSupplierStr) (bytes) (compressionType) (headerKeySize) (maxBatchSize) (maxHeaderSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
RecordBatchIterationBenchmark.measureValidation NO_CACHING RANDOM NONE 30 200 5 1000 2 thrpt 15 1979193.376 ± 1239.286 ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm NO_CACHING RANDOM NONE 30 200 5 1000 2 thrpt 15 2120.000 ± 0.001 B/op
```
**AFTER**
```
Benchmark (bufferSupplierStr) (bytes) (compressionType) (headerKeySize) (maxBatchSize) (maxHeaderSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
RecordBatchIterationBenchmark.measureValidation NO_CACHING RANDOM NONE 10 200 5 1000 2 thrpt 15 2289115.973 ± 2661.856 ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm NO_CACHING RANDOM NONE 10 200 5 1000 2 thrpt 15 2032.000 ± 0.001 B/op
```
```
Benchmark (bufferSupplierStr) (bytes) (compressionType) (headerKeySize) (maxBatchSize) (maxHeaderSize) (messageSize) (messageVersion) Mode Cnt Score Error Units
RecordBatchIterationBenchmark.measureValidation NO_CACHING RANDOM NONE 30 200 5 1000 2 thrpt 15 2222625.706 ± 908.358 ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm NO_CACHING RANDOM NONE 30 200 5 1000 2 thrpt 15 2040.000 ± 0.001 B/op
```
Reviewers: Ismael Juma <ismael@juma.me.uk>
diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
index 3b73c93..d042494 100644
--- a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java
@@ -24,7 +24,8 @@
import org.apache.kafka.common.utils.Utils;
public class RecordHeader implements Header {
- private final String key;
+ private ByteBuffer keyBuffer;
+ private String key;
private ByteBuffer valueBuffer;
private byte[] value;
@@ -34,13 +35,16 @@
this.value = value;
}
- public RecordHeader(String key, ByteBuffer valueBuffer) {
- Objects.requireNonNull(key, "Null header keys are not permitted");
- this.key = key;
+ public RecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
+ this.keyBuffer = Objects.requireNonNull(keyBuffer, "Null header keys are not permitted");
this.valueBuffer = valueBuffer;
}
public String key() {
+ if (key == null) {
+ key = Utils.utf8(keyBuffer, keyBuffer.remaining());
+ keyBuffer = null;
+ }
return key;
}
@@ -60,20 +64,20 @@
return false;
RecordHeader header = (RecordHeader) o;
- return Objects.equals(key, header.key) &&
+ return Objects.equals(key(), header.key()) &&
Arrays.equals(value(), header.value());
}
@Override
public int hashCode() {
- int result = key != null ? key.hashCode() : 0;
+ int result = key() != null ? key().hashCode() : 0;
result = 31 * result + Arrays.hashCode(value());
return result;
}
@Override
public String toString() {
- return "RecordHeader(key = " + key + ", value = " + Arrays.toString(value()) + ")";
+ return "RecordHeader(key = " + key() + ", value = " + Arrays.toString(value()) + ")";
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 976b556..665ad54 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -535,7 +535,8 @@
if (headerKeySize < 0)
throw new InvalidRecordException("Invalid negative header key size " + headerKeySize);
- String headerKey = Utils.utf8(buffer, headerKeySize);
+ ByteBuffer headerKeyBuffer = buffer.slice();
+ headerKeyBuffer.limit(headerKeySize);
buffer.position(buffer.position() + headerKeySize);
ByteBuffer headerValue = null;
@@ -546,7 +547,7 @@
buffer.position(buffer.position() + headerValueSize);
}
- headers[i] = new RecordHeader(headerKey, headerValue);
+ headers[i] = new RecordHeader(headerKeyBuffer, headerValue);
}
return headers;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index 822b3b9..11854dc 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -270,7 +270,7 @@
DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
}
- @Test(expected = StringIndexOutOfBoundsException.class)
+ @Test(expected = InvalidRecordException.class)
public void testInvalidHeaderKey() {
byte attributes = 0;
long timestampDelta = 2;
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 8e17446..48b9f57 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -262,7 +262,7 @@
recordConversionStats = recordConversionStats)
}
- private def assignOffsetsNonCompressed(records: MemoryRecords,
+ def assignOffsetsNonCompressed(records: MemoryRecords,
topicPartition: TopicPartition,
offsetCounter: LongRef,
now: Long,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
new file mode 100644
index 0000000..834652e
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/BaseRecordBatchBenchmark.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.record;
+
+import kafka.server.BrokerTopicStats;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE;
+
+@State(Scope.Benchmark)
+public abstract class BaseRecordBatchBenchmark {
+ private static final int MAX_HEADER_SIZE = 5;
+ private static final int HEADER_KEY_SIZE = 30;
+
+ private final Random random = new Random(0);
+
+ final int batchCount = 100;
+
+ public enum Bytes {
+ RANDOM, ONES
+ }
+
+ @Param(value = {"1", "2", "10", "50", "200", "500"})
+ private int maxBatchSize = 200;
+
+ @Param(value = {"1", "2"})
+ byte messageVersion = CURRENT_MAGIC_VALUE;
+
+ @Param(value = {"100", "1000", "10000", "100000"})
+ private int messageSize = 1000;
+
+ @Param(value = {"RANDOM", "ONES"})
+ private Bytes bytes = Bytes.RANDOM;
+
+ @Param(value = {"NO_CACHING", "CREATE"})
+ private String bufferSupplierStr = "NO_CACHING";
+
+ // zero starting offset is much faster for v1 batches, but that will almost never happen
+ int startingOffset;
+
+ // Used by measureSingleMessage
+ ByteBuffer singleBatchBuffer;
+
+ // Used by measureVariableBatchSize
+ ByteBuffer[] batchBuffers;
+ BufferSupplier bufferSupplier;
+ final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+
+ @Setup
+ public void init() {
+ // For v0 batches a zero starting offset is much faster but that will almost never happen.
+ // For v2 batches we use starting offset = 0 as these batches are relative to the base
+ // offset and measureValidation will mutate these batches between iterations
+ startingOffset = messageVersion == 2 ? 0 : 42;
+
+ if (bufferSupplierStr.equals("NO_CACHING")) {
+ bufferSupplier = BufferSupplier.NO_CACHING;
+ } else if (bufferSupplierStr.equals("CREATE")) {
+ bufferSupplier = BufferSupplier.create();
+ } else {
+ throw new IllegalArgumentException("Unsupported buffer supplier " + bufferSupplierStr);
+ }
+ singleBatchBuffer = createBatch(1);
+
+ batchBuffers = new ByteBuffer[batchCount];
+ for (int i = 0; i < batchCount; ++i) {
+ int size = random.nextInt(maxBatchSize) + 1;
+ batchBuffers[i] = createBatch(size);
+ }
+ }
+
+ private static Header[] createHeaders() {
+ char[] headerChars = new char[HEADER_KEY_SIZE];
+ Arrays.fill(headerChars, 'a');
+ String headerKey = new String(headerChars);
+ byte[] headerValue = new byte[0];
+ return IntStream.range(0, MAX_HEADER_SIZE).mapToObj(index -> new Header() {
+ @Override
+ public String key() {
+ return headerKey;
+ }
+
+ @Override
+ public byte[] value() {
+ return headerValue;
+ }
+ }).toArray(Header[]::new);
+ }
+
+ abstract CompressionType compressionType();
+
+ private ByteBuffer createBatch(int batchSize) {
+ // Magic v1 does not support record headers
+ Header[] headers = messageVersion < RecordBatch.MAGIC_VALUE_V2 ? Record.EMPTY_HEADERS : createHeaders();
+ byte[] value = new byte[messageSize];
+ final ByteBuffer buf = ByteBuffer.allocate(
+ AbstractRecords.estimateSizeInBytesUpperBound(messageVersion, compressionType(), new byte[0], value,
+ headers) * batchSize
+ );
+
+ final MemoryRecordsBuilder builder =
+ MemoryRecords.builder(buf, messageVersion, compressionType(), TimestampType.CREATE_TIME, startingOffset);
+
+ for (int i = 0; i < batchSize; ++i) {
+ switch (bytes) {
+ case ONES:
+ Arrays.fill(value, (byte) 1);
+ break;
+ case RANDOM:
+ random.nextBytes(value);
+ break;
+ }
+
+ builder.append(0, null, value, headers);
+ }
+ return builder.build().buffer();
+ }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
new file mode 100644
index 0000000..f176e06
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/CompressedRecordBatchValidationBenchmark.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.record;
+
+import kafka.api.ApiVersion;
+import kafka.common.LongRef;
+import kafka.log.AppendOrigin;
+import kafka.log.LogValidator;
+import kafka.message.CompressionCodec;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Time;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+public class CompressedRecordBatchValidationBenchmark extends BaseRecordBatchBenchmark {
+
+ @Param(value = {"LZ4", "SNAPPY", "GZIP", "ZSTD"})
+ private CompressionType compressionType = CompressionType.LZ4;
+
+ @Override
+ CompressionType compressionType() {
+ return compressionType;
+ }
+
+ @Benchmark
+ public void measureValidateMessagesAndAssignOffsetsCompressed(Blackhole bh) {
+ MemoryRecords records = MemoryRecords.readableRecords(singleBatchBuffer.duplicate());
+ LogValidator.validateMessagesAndAssignOffsetsCompressed(records, new TopicPartition("a", 0),
+ new LongRef(startingOffset), Time.SYSTEM, System.currentTimeMillis(),
+ CompressionCodec.getCompressionCodec(compressionType.id),
+ CompressionCodec.getCompressionCodec(compressionType.id),
+ false, messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, 0,
+ new AppendOrigin.Client$(),
+ ApiVersion.latestVersion(),
+ brokerTopicStats);
+ }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
index d7f2a04..8aaa2d5 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
@@ -16,148 +16,36 @@
*/
package org.apache.kafka.jmh.record;
-import kafka.api.ApiVersion;
-import kafka.common.LongRef;
-import kafka.log.AppendOrigin;
-import kafka.log.LogValidator;
-import kafka.message.CompressionCodec;
-import kafka.server.BrokerTopicStats;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.AbstractRecords;
-import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.CloseableIterator;
-import org.apache.kafka.common.utils.Time;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Random;
-
-import static org.apache.kafka.common.record.RecordBatch.CURRENT_MAGIC_VALUE;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
-public class RecordBatchIterationBenchmark {
-
- private final Random random = new Random(0);
- private final int batchCount = 100;
-
- public enum Bytes {
- RANDOM, ONES
- }
-
- @Param(value = {"1", "2", "10", "50", "200", "500"})
- private int maxBatchSize = 200;
+public class RecordBatchIterationBenchmark extends BaseRecordBatchBenchmark {
@Param(value = {"LZ4", "SNAPPY", "GZIP", "ZSTD", "NONE"})
private CompressionType compressionType = CompressionType.NONE;
- @Param(value = {"1", "2"})
- private byte messageVersion = CURRENT_MAGIC_VALUE;
-
- @Param(value = {"100", "1000", "10000", "100000"})
- private int messageSize = 1000;
-
- @Param(value = {"RANDOM", "ONES"})
- private Bytes bytes = Bytes.RANDOM;
-
- @Param(value = {"NO_CACHING", "CREATE"})
- private String bufferSupplierStr;
-
- // zero starting offset is much faster for v1 batches, but that will almost never happen
- private int startingOffset;
-
- // Used by measureSingleMessage
- private ByteBuffer singleBatchBuffer;
-
- // Used by measureVariableBatchSize
- private ByteBuffer[] batchBuffers;
- private int[] batchSizes;
- private BufferSupplier bufferSupplier;
- private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
-
- @Setup
- public void init() {
- brokerTopicStats = new BrokerTopicStats();
-
- // For v0 batches a zero starting offset is much faster but that will almost never happen.
- // For v2 batches we use starting offset = 0 as these batches are relative to the base
- // offset and measureValidation will mutate these batches between iterations
- startingOffset = messageVersion == 2 ? 0 : 42;
-
- if (bufferSupplierStr.equals("NO_CACHING")) {
- bufferSupplier = BufferSupplier.NO_CACHING;
- } else if (bufferSupplierStr.equals("CREATE")) {
- bufferSupplier = BufferSupplier.create();
- } else {
- throw new IllegalArgumentException("Unsupported buffer supplier " + bufferSupplierStr);
- }
- singleBatchBuffer = createBatch(1);
-
- batchBuffers = new ByteBuffer[batchCount];
- batchSizes = new int[batchCount];
- for (int i = 0; i < batchCount; ++i) {
- int size = random.nextInt(maxBatchSize) + 1;
- batchBuffers[i] = createBatch(size);
- batchSizes[i] = size;
- }
- }
-
- private ByteBuffer createBatch(int batchSize) {
- byte[] value = new byte[messageSize];
- final ByteBuffer buf = ByteBuffer.allocate(
- AbstractRecords.estimateSizeInBytesUpperBound(messageVersion, compressionType, new byte[0], value,
- Record.EMPTY_HEADERS) * batchSize
- );
-
- final MemoryRecordsBuilder builder =
- MemoryRecords.builder(buf, messageVersion, compressionType, TimestampType.CREATE_TIME, startingOffset);
-
- for (int i = 0; i < batchSize; ++i) {
- switch (bytes) {
- case ONES:
- Arrays.fill(value, (byte) 1);
- break;
- case RANDOM:
- random.nextBytes(value);
- break;
- }
-
- builder.append(0, null, value);
- }
- return builder.build().buffer();
- }
-
- @Benchmark
- public void measureValidation(Blackhole bh) throws IOException {
- MemoryRecords records = MemoryRecords.readableRecords(singleBatchBuffer.duplicate());
- LogValidator.validateMessagesAndAssignOffsetsCompressed(records, new TopicPartition("a", 0),
- new LongRef(startingOffset), Time.SYSTEM, System.currentTimeMillis(),
- CompressionCodec.getCompressionCodec(compressionType.id),
- CompressionCodec.getCompressionCodec(compressionType.id),
- false, messageVersion, TimestampType.CREATE_TIME, Long.MAX_VALUE, 0,
- new AppendOrigin.Client$(),
- ApiVersion.latestVersion(),
- brokerTopicStats);
+ @Override
+ CompressionType compressionType() {
+ return compressionType;
}
@Benchmark
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java
new file mode 100644
index 0000000..001837e
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/UncompressedRecordBatchValidationBenchmark.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.record;
+
+import kafka.common.LongRef;
+import kafka.log.AppendOrigin;
+import kafka.log.LogValidator;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.TimestampType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+public class UncompressedRecordBatchValidationBenchmark extends BaseRecordBatchBenchmark {
+
+ @Override
+ CompressionType compressionType() {
+ return CompressionType.NONE;
+ }
+
+ @Benchmark
+ public void measureAssignOffsetsNonCompressed(Blackhole bh) {
+ MemoryRecords records = MemoryRecords.readableRecords(singleBatchBuffer.duplicate());
+ LogValidator.assignOffsetsNonCompressed(records, new TopicPartition("a", 0),
+ new LongRef(startingOffset), System.currentTimeMillis(), false,
+ TimestampType.CREATE_TIME, Long.MAX_VALUE, 0,
+ new AppendOrigin.Client$(), messageVersion, brokerTopicStats);
+ }
+}