blob: 522915f064034a48d7b82076dddd6e5f3622c726 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Supplier;
import java.util.List;
import java.util.Random;
import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
import static org.apache.kafka.common.utils.Utils.utf8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
@RunWith(value = Parameterized.class)
public class MemoryRecordsBuilderTest {
// Compression codec under test; supplied by the Parameterized runner.
private final CompressionType compressionType;
// Initial position applied to each test buffer, to verify builders honor a non-zero start offset.
private final int bufferOffset;
// Clock passed to down-conversion calls; wall-clock system time.
private final Time time;
public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionType) {
this.bufferOffset = bufferOffset;
this.compressionType = compressionType;
this.time = Time.SYSTEM;
}
@Test
public void testWriteEmptyRecordSet() {
    // Building with no appended records must emit zero bytes and leave the buffer
    // position untouched. For ZSTD with a pre-v2 magic the builder is expected to
    // reject the combination instead.
    // Note: the previous assumeAtLeastV2OrNotZstd(magic) call skipped every ZSTD
    // run, making the else-branch below dead code; it is removed so the branch
    // this test explicitly handles can actually execute.
    byte magic = RecordBatch.MAGIC_VALUE_V0;
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    Supplier<MemoryRecordsBuilder> builderSupplier = () -> new MemoryRecordsBuilder(buffer, magic,
            compressionType, TimestampType.CREATE_TIME, 0L, 0L,
            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
            false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
    if (compressionType != CompressionType.ZSTD) {
        MemoryRecords records = builderSupplier.get().build();
        assertEquals(0, records.sizeInBytes());
        assertEquals(bufferOffset, buffer.position());
    } else {
        Exception e = assertThrows(IllegalArgumentException.class, () -> builderSupplier.get().build());
        // assertEquals takes (expected, actual) in that order; the original had them swapped.
        assertEquals("ZStandard compression is not supported for magic " + magic, e.getMessage());
    }
}
@Test
public void testWriteTransactionalRecordSet() {
    // A single append on a transactional builder should yield exactly one batch
    // with the transactional flag set.
    ByteBuffer buf = ByteBuffer.allocate(128);
    buf.position(bufferOffset);
    long producerId = 9809;
    short producerEpoch = 15;
    int baseSequence = 2342;
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buf, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
            TimestampType.CREATE_TIME, 0L, 0L, producerId, producerEpoch, baseSequence, true, false,
            RecordBatch.NO_PARTITION_LEADER_EPOCH, buf.capacity());
    builder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes());
    List<MutableRecordBatch> batches = Utils.toList(builder.build().batches().iterator());
    assertEquals(1, batches.size());
    assertTrue(batches.get(0).isTransactional());
}
@Test
public void testWriteTransactionalNotAllowedMagicV0() {
    // Transactional batches require message format v2; constructing a
    // transactional builder with magic v0 must be rejected.
    // Uses assertThrows (already used elsewhere in this class) instead of the
    // coarser @Test(expected = ...) mechanism.
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    long pid = 9809;
    short epoch = 15;
    int sequence = 2342;
    assertThrows(IllegalArgumentException.class,
        () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME,
            0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()));
}
@Test
public void testWriteTransactionalNotAllowedMagicV1() {
    // Transactional batches require message format v2; magic v1 must be rejected
    // at construction time. assertThrows replaces @Test(expected = ...) for
    // consistency with the rest of this class.
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    long pid = 9809;
    short epoch = 15;
    int sequence = 2342;
    assertThrows(IllegalArgumentException.class,
        () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
            0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()));
}
@Test
public void testWriteControlBatchNotAllowedMagicV0() {
    // Control batches require message format v2; constructing a control-batch
    // builder with magic v0 must be rejected.
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    long pid = 9809;
    short epoch = 15;
    int sequence = 2342;
    assertThrows(IllegalArgumentException.class,
        () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME,
            0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()));
}
@Test
public void testWriteControlBatchNotAllowedMagicV1() {
    // Control batches require message format v2; magic v1 must be rejected at
    // construction time.
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    long pid = 9809;
    short epoch = 15;
    int sequence = 2342;
    assertThrows(IllegalArgumentException.class,
        () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
            0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()));
}
@Test
public void testWriteTransactionalWithInvalidPID() {
    // A transactional batch must carry a real producer id; NO_PRODUCER_ID is invalid.
    // Construction and close() are both inside the lambda so the check matches the
    // old @Test(expected = ...) semantics regardless of where the validation fires.
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    long pid = RecordBatch.NO_PRODUCER_ID;
    short epoch = 15;
    int sequence = 2342;
    assertThrows(IllegalArgumentException.class, () -> {
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
            0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        builder.close();
    });
}
@Test
public void testWriteIdempotentWithInvalidEpoch() {
    // An idempotent/transactional batch needs a valid producer epoch;
    // NO_PRODUCER_EPOCH must be rejected.
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    long pid = 9809;
    short epoch = RecordBatch.NO_PRODUCER_EPOCH;
    int sequence = 2342;
    assertThrows(IllegalArgumentException.class, () -> {
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
            0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        builder.close();
    });
}
@Test
public void testWriteIdempotentWithInvalidBaseSequence() {
    // An idempotent/transactional batch needs a valid base sequence;
    // NO_SEQUENCE must be rejected.
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    long pid = 9809;
    short epoch = 15;
    int sequence = RecordBatch.NO_SEQUENCE;
    assertThrows(IllegalArgumentException.class, () -> {
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
            0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        builder.close();
    });
}
@Test
public void testWriteEndTxnMarkerNonTransactionalBatch() {
    // End-transaction markers may only be appended to transactional batches.
    // The builder itself (non-transactional control batch) is valid; only the
    // marker append is expected to throw, so the assertion is pinned to it.
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    long pid = 9809;
    short epoch = 15;
    int sequence = RecordBatch.NO_SEQUENCE;
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
        0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
    assertThrows(IllegalArgumentException.class,
        () -> builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, new EndTransactionMarker(ControlRecordType.ABORT, 0)));
}
@Test
public void testWriteEndTxnMarkerNonControlBatch() {
    // End-transaction markers may only be appended to control batches.
    // The builder (transactional, non-control) is valid; only the marker append
    // is expected to throw, so the assertion is pinned to it.
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    long pid = 9809;
    short epoch = 15;
    int sequence = RecordBatch.NO_SEQUENCE;
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
        0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
    assertThrows(IllegalArgumentException.class,
        () -> builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, new EndTransactionMarker(ControlRecordType.ABORT, 0)));
}
@Test
public void testCompressionRateV0() {
    // The builder's reported compression ratio must be 1.0 when uncompressed,
    // otherwise match compressed-payload-size / uncompressed-size.
    byte magic = RecordBatch.MAGIC_VALUE_V0;
    assumeAtLeastV2OrNotZstd(magic);
    ByteBuffer buf = ByteBuffer.allocate(1024);
    buf.position(bufferOffset);
    LegacyRecord[] legacyRecords = {
            LegacyRecord.create(magic, 0L, "a".getBytes(), "1".getBytes()),
            LegacyRecord.create(magic, 1L, "b".getBytes(), "2".getBytes()),
            LegacyRecord.create(magic, 2L, "c".getBytes(), "3".getBytes()),
    };
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buf, magic, compressionType,
            TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
            RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buf.capacity());
    int rawSize = 0;
    for (LegacyRecord rec : legacyRecords) {
        rawSize += rec.sizeInBytes() + Records.LOG_OVERHEAD;
        builder.append(rec);
    }
    MemoryRecords built = builder.build();
    if (compressionType == CompressionType.NONE) {
        assertEquals(1.0, builder.compressionRatio(), 0.00001);
    } else {
        // Strip the wrapper record overhead to get the compressed payload size.
        int compressedPayload = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0;
        assertEquals((double) compressedPayload / rawSize, builder.compressionRatio(), 0.00001);
    }
}
@Test
public void testEstimatedSizeInBytes() {
    // The size estimate must grow strictly with every append and, once built,
    // agree exactly with the final record-set size.
    ByteBuffer buf = ByteBuffer.allocate(1024);
    buf.position(bufferOffset);
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buf, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
            TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
            RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buf.capacity());
    int lastEstimate = 0;
    for (int i = 0; i < 10; i++) {
        builder.append(new SimpleRecord(i, ("" + i).getBytes()));
        int estimate = builder.estimatedSizeInBytes();
        assertTrue(estimate > lastEstimate);
        lastEstimate = estimate;
    }
    int estimateBeforeBuild = builder.estimatedSizeInBytes();
    MemoryRecords records = builder.build();
    assertEquals(records.sizeInBytes(), builder.estimatedSizeInBytes());
    // Without compression the pre-build estimate is already exact.
    if (compressionType == CompressionType.NONE)
        assertEquals(records.sizeInBytes(), estimateBeforeBuild);
}
@Test
public void testCompressionRateV1() {
    // Same as testCompressionRateV0 but for the v1 message format (and its
    // larger per-record overhead).
    byte magic = RecordBatch.MAGIC_VALUE_V1;
    assumeAtLeastV2OrNotZstd(magic);
    ByteBuffer buf = ByteBuffer.allocate(1024);
    buf.position(bufferOffset);
    LegacyRecord[] legacyRecords = {
            LegacyRecord.create(magic, 0L, "a".getBytes(), "1".getBytes()),
            LegacyRecord.create(magic, 1L, "b".getBytes(), "2".getBytes()),
            LegacyRecord.create(magic, 2L, "c".getBytes(), "3".getBytes()),
    };
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buf, magic, compressionType,
            TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
            RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buf.capacity());
    int rawSize = 0;
    for (LegacyRecord rec : legacyRecords) {
        rawSize += rec.sizeInBytes() + Records.LOG_OVERHEAD;
        builder.append(rec);
    }
    MemoryRecords built = builder.build();
    if (compressionType == CompressionType.NONE) {
        assertEquals(1.0, builder.compressionRatio(), 0.00001);
    } else {
        int compressedPayload = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V1;
        assertEquals((double) compressedPayload / rawSize, builder.compressionRatio(), 0.00001);
    }
}
@Test
public void buildUsingLogAppendTime() {
    // With LOG_APPEND_TIME every record's timestamp is replaced by the builder's
    // log-append time, regardless of the client-supplied value.
    byte magic = RecordBatch.MAGIC_VALUE_V1;
    assumeAtLeastV2OrNotZstd(magic);
    ByteBuffer buf = ByteBuffer.allocate(1024);
    buf.position(bufferOffset);
    long appendTime = System.currentTimeMillis();
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buf, magic, compressionType,
            TimestampType.LOG_APPEND_TIME, 0L, appendTime, RecordBatch.NO_PRODUCER_ID,
            RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
            RecordBatch.NO_PARTITION_LEADER_EPOCH, buf.capacity());
    builder.append(0L, "a".getBytes(), "1".getBytes());
    builder.append(0L, "b".getBytes(), "2".getBytes());
    builder.append(0L, "c".getBytes(), "3".getBytes());
    MemoryRecords records = builder.build();
    MemoryRecordsBuilder.RecordsInfo info = builder.info();
    assertEquals(appendTime, info.maxTimestamp);
    // Compressed output reports the wrapper's last offset (2); uncompressed
    // reports the first record holding the max timestamp (0).
    if (compressionType != CompressionType.NONE)
        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
    else
        assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
    for (RecordBatch batch : records.batches()) {
        assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
        for (Record record : batch)
            assertEquals(appendTime, record.timestamp());
    }
}
@Test
public void buildUsingCreateTime() {
    // With CREATE_TIME the client-supplied timestamps are preserved verbatim,
    // and info() must report the maximum among them.
    byte magic = RecordBatch.MAGIC_VALUE_V1;
    assumeAtLeastV2OrNotZstd(magic);
    ByteBuffer buf = ByteBuffer.allocate(1024);
    buf.position(bufferOffset);
    long appendTime = System.currentTimeMillis();
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buf, magic, compressionType,
            TimestampType.CREATE_TIME, 0L, appendTime, RecordBatch.NO_PRODUCER_ID,
            RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
            RecordBatch.NO_PARTITION_LEADER_EPOCH, buf.capacity());
    builder.append(0L, "a".getBytes(), "1".getBytes());
    builder.append(2L, "b".getBytes(), "2".getBytes());
    builder.append(1L, "c".getBytes(), "3".getBytes());
    MemoryRecords records = builder.build();
    MemoryRecordsBuilder.RecordsInfo info = builder.info();
    assertEquals(2L, info.maxTimestamp);
    // Uncompressed: offset of the record with the max timestamp (1).
    // Compressed: last offset of the wrapper batch (2).
    if (compressionType == CompressionType.NONE)
        assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
    else
        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
    long[] expectedTimestamps = {0L, 2L, 1L};
    int idx = 0;
    for (RecordBatch batch : records.batches()) {
        assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
        for (Record record : batch)
            assertEquals(expectedTimestamps[idx++], record.timestamp());
    }
}
@Test
public void testAppendedChecksumConsistency() {
    // The checksum returned by append() must match the one read back from the
    // built records, across all magic versions.
    assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V0);
    assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V1);
    ByteBuffer buffer = ByteBuffer.allocate(512);
    byte[] magics = {RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2};
    for (byte magic : magics) {
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID,
                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
                RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
        Long appendChecksum = builder.append(1L, "key".getBytes(), "value".getBytes());
        List<Record> records = TestUtils.toList(builder.build().records());
        assertEquals(1, records.size());
        assertEquals(appendChecksum, records.get(0).checksumOrNull());
    }
}
@Test
public void testSmallWriteLimit() {
    // Even a zero write limit must admit one record; afterwards the builder
    // reports full and refuses further room.
    byte[] key = "foo".getBytes();
    byte[] value = "bar".getBytes();
    ByteBuffer buffer = ByteBuffer.allocate(512);
    int writeLimit = 0;
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
            TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID,
            RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
            RecordBatch.NO_PARTITION_LEADER_EPOCH, writeLimit);
    assertFalse(builder.isFull());
    assertTrue(builder.hasRoomFor(0L, key, value, Record.EMPTY_HEADERS));
    builder.append(0L, key, value);
    // The limit takes effect once the first record is in.
    assertTrue(builder.isFull());
    assertFalse(builder.hasRoomFor(0L, key, value, Record.EMPTY_HEADERS));
    List<Record> records = TestUtils.toList(builder.build().records());
    assertEquals(1, records.size());
    Record only = records.get(0);
    assertEquals(ByteBuffer.wrap(key), only.key());
    assertEquals(ByteBuffer.wrap(value), only.value());
}
@Test
public void writePastLimit() {
    // The capacity limit is soft: once hasRoomFor reports false, an append must
    // still succeed (the buffer grows past the limit).
    byte magic = RecordBatch.MAGIC_VALUE_V1;
    assumeAtLeastV2OrNotZstd(magic);
    ByteBuffer buf = ByteBuffer.allocate(64);
    buf.position(bufferOffset);
    long appendTime = System.currentTimeMillis();
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buf, magic, compressionType,
            TimestampType.CREATE_TIME, 0L, appendTime, RecordBatch.NO_PRODUCER_ID,
            RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
            RecordBatch.NO_PARTITION_LEADER_EPOCH, buf.capacity());
    builder.setEstimatedCompressionRatio(0.5f);
    builder.append(0L, "a".getBytes(), "1".getBytes());
    builder.append(1L, "b".getBytes(), "2".getBytes());
    // No room is reported, yet the following append is still accepted.
    assertFalse(builder.hasRoomFor(2L, "c".getBytes(), "3".getBytes(), Record.EMPTY_HEADERS));
    builder.append(2L, "c".getBytes(), "3".getBytes());
    MemoryRecords records = builder.build();
    MemoryRecordsBuilder.RecordsInfo info = builder.info();
    assertEquals(2L, info.maxTimestamp);
    assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
    long expectedTimestamp = 0L;
    for (RecordBatch batch : records.batches()) {
        assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
        for (Record record : batch)
            assertEquals(expectedTimestamp++, record.timestamp());
    }
}
@Test
public void testAppendAtInvalidOffset() {
    // Offsets must increase monotonically. The first append at offset 0 is
    // valid; the assertion is pinned on the second, non-increasing append so a
    // failure of the first one cannot mask the real check (unlike the old
    // @Test(expected = ...) form, which accepted the exception from anywhere).
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.position(bufferOffset);
    long logAppendTime = System.currentTimeMillis();
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
        TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
        false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
    builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null);
    assertThrows(IllegalArgumentException.class,
        () -> builder.appendWithOffset(0L, System.currentTimeMillis(), "b".getBytes(), null));
}
@Test
public void convertV2ToV1UsingMixedCreateAndLogAppendTime() {
// Builds a v2 log of [LOG_APPEND_TIME data batch][abort marker][CREATE_TIME data
// batch][commit marker], down-converts it to v1, and verifies the surviving
// records, per-batch timestamp types and the reported conversion stats.
// ZSTD cannot be down-converted, so that case expects an exception instead.
ByteBuffer buffer = ByteBuffer.allocate(512);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
compressionType, TimestampType.LOG_APPEND_TIME, 0L);
builder.append(10L, "1".getBytes(), "a".getBytes());
builder.close();
// Track the byte count of data batches only; txn markers are dropped by the
// down-conversion and must be excluded from the stats comparison below.
int sizeExcludingTxnMarkers = buffer.position();
MemoryRecords.writeEndTransactionalMarker(buffer, 1L, System.currentTimeMillis(), 0, 15L, (short) 0,
new EndTransactionMarker(ControlRecordType.ABORT, 0));
int position = buffer.position();
builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
TimestampType.CREATE_TIME, 1L);
builder.append(12L, "2".getBytes(), "b".getBytes());
builder.append(13L, "3".getBytes(), "c".getBytes());
builder.close();
sizeExcludingTxnMarkers += buffer.position() - position;
MemoryRecords.writeEndTransactionalMarker(buffer, 14L, System.currentTimeMillis(), 0, 1L, (short) 0,
new EndTransactionMarker(ControlRecordType.COMMIT, 0));
buffer.flip();
// Deferred so the ZSTD branch can assert on the thrown exception.
Supplier<ConvertedRecords<MemoryRecords>> convertedRecordsSupplier = () ->
MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
if (compressionType != CompressionType.ZSTD) {
ConvertedRecords<MemoryRecords> convertedRecords = convertedRecordsSupplier.get();
MemoryRecords records = convertedRecords.records();
// Transactional markers are skipped when down converting to V1, so exclude them from size
verifyRecordsProcessingStats(convertedRecords.recordConversionStats(),
3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers);
List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
// Compressed output keeps one wrapper per source batch (2); uncompressed
// yields one v1 batch per record (3, the markers having been dropped).
if (compressionType != CompressionType.NONE) {
assertEquals(2, batches.size());
assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
} else {
assertEquals(3, batches.size());
assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType());
}
List<Record> logRecords = Utils.toList(records.records().iterator());
assertEquals(3, logRecords.size());
assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
} else {
Exception e = assertThrows(UnsupportedCompressionTypeException.class, convertedRecordsSupplier::get);
assertEquals("Down-conversion of zstandard-compressed batches is not supported", e.getMessage());
}
}
@Test
public void convertToV1WithMixedV0AndV2Data() {
// Builds a log containing a v0 batch ("1") followed by a v2 batch ("2", "3"),
// then down-converts to v1 twice: first with firstOffset=0 (everything
// convertible is converted) and then with firstOffset=2 (batches entirely
// below that offset are copied through unconverted where possible).
assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V0);
assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V1);
ByteBuffer buffer = ByteBuffer.allocate(512);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0,
compressionType, TimestampType.NO_TIMESTAMP_TYPE, 0L);
builder.append(RecordBatch.NO_TIMESTAMP, "1".getBytes(), "a".getBytes());
builder.close();
builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
TimestampType.CREATE_TIME, 1L);
builder.append(11L, "2".getBytes(), "b".getBytes());
builder.append(12L, "3".getBytes(), "c".getBytes());
builder.close();
buffer.flip();
// First pass: convert from offset 0. Only the v2 batch (2 records) needs
// converting; the v0 batch passes through unchanged.
ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
.downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
MemoryRecords records = convertedRecords.records();
verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), 3, 2,
records.sizeInBytes(), buffer.limit());
List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
// Compressed output keeps one batch per source batch; uncompressed splits the
// converted v2 batch into one v1 batch per record.
if (compressionType != CompressionType.NONE) {
assertEquals(2, batches.size());
assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
assertEquals(0, batches.get(0).baseOffset());
assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
assertEquals(1, batches.get(1).baseOffset());
} else {
assertEquals(3, batches.size());
assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
assertEquals(0, batches.get(0).baseOffset());
assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
assertEquals(1, batches.get(1).baseOffset());
assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(2).magic());
assertEquals(2, batches.get(2).baseOffset());
}
List<Record> logRecords = Utils.toList(records.records().iterator());
assertEquals("1", utf8(logRecords.get(0).key()));
assertEquals("2", utf8(logRecords.get(1).key()));
assertEquals("3", utf8(logRecords.get(2).key()));
// Second pass: convert from offset 2. In the uncompressed case only the
// record at offset >= 2 is converted, so record "2" is dropped from the split.
convertedRecords = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L, time);
records = convertedRecords.records();
batches = Utils.toList(records.batches().iterator());
logRecords = Utils.toList(records.records().iterator());
if (compressionType != CompressionType.NONE) {
assertEquals(2, batches.size());
assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
assertEquals(0, batches.get(0).baseOffset());
assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
assertEquals(1, batches.get(1).baseOffset());
assertEquals("1", utf8(logRecords.get(0).key()));
assertEquals("2", utf8(logRecords.get(1).key()));
assertEquals("3", utf8(logRecords.get(2).key()));
verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), 3, 2,
records.sizeInBytes(), buffer.limit());
} else {
assertEquals(2, batches.size());
assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
assertEquals(0, batches.get(0).baseOffset());
assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
assertEquals(2, batches.get(1).baseOffset());
assertEquals("1", utf8(logRecords.get(0).key()));
assertEquals("3", utf8(logRecords.get(1).key()));
verifyRecordsProcessingStats(convertedRecords.recordConversionStats(), 3, 1,
records.sizeInBytes(), buffer.limit());
}
}
@Test
public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() {
    // Once aborted, the builder must refuse to build.
    byte magic = RecordBatch.MAGIC_VALUE_V0;
    assumeAtLeastV2OrNotZstd(magic);
    ByteBuffer buf = ByteBuffer.allocate(128);
    buf.position(bufferOffset);
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buf, magic, compressionType,
            TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
            RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buf.capacity());
    builder.abort();
    assertThrows(IllegalStateException.class, builder::build);
}
@Test
public void shouldResetBufferToInitialPositionOnAbort() {
    // Aborting must rewind the underlying buffer to the position it had when
    // the builder was created.
    byte magic = RecordBatch.MAGIC_VALUE_V0;
    assumeAtLeastV2OrNotZstd(magic);
    ByteBuffer buf = ByteBuffer.allocate(128);
    buf.position(bufferOffset);
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buf, magic, compressionType,
            TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
            RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buf.capacity());
    builder.append(0L, "a".getBytes(), "1".getBytes());
    builder.abort();
    assertEquals(bufferOffset, builder.buffer().position());
}
@Test
public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() {
    // Once aborted, the builder must refuse to close.
    // Replaces the try/fail/catch idiom with assertThrows, consistent with
    // shouldThrowIllegalStateExceptionOnBuildWhenAborted above.
    byte magic = RecordBatch.MAGIC_VALUE_V0;
    assumeAtLeastV2OrNotZstd(magic);
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
        TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
        false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
    builder.abort();
    assertThrows(IllegalStateException.class, builder::close);
}
@Test
public void shouldThrowIllegalStateExceptionOnAppendWhenAborted() {
    // Once aborted, the builder must refuse further appends.
    // Replaces the try/fail/catch idiom with assertThrows, consistent with
    // shouldThrowIllegalStateExceptionOnBuildWhenAborted above.
    byte magic = RecordBatch.MAGIC_VALUE_V0;
    assumeAtLeastV2OrNotZstd(magic);
    ByteBuffer buffer = ByteBuffer.allocate(128);
    buffer.position(bufferOffset);
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
        TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
        false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
    builder.abort();
    assertThrows(IllegalStateException.class, () -> builder.append(0L, "a".getBytes(), "1".getBytes()));
}
@Parameterized.Parameters(name = "bufferOffset={0}, compression={1}")
public static Collection<Object[]> data() {
    // Cartesian product of buffer start offsets and all compression codecs,
    // with the offset varying slowest (same ordering as before).
    List<Object[]> params = new ArrayList<>();
    for (int offset : Arrays.asList(0, 15)) {
        for (CompressionType type : CompressionType.values()) {
            params.add(new Object[] {offset, type});
        }
    }
    return params;
}
@Test
public void testBuffersDereferencedOnClose() {
// Heuristic leak check: repeatedly build large compressed record sets while
// retaining the builders, and watch heap usage. If closed builders release
// their internal compression buffers, memory should plateau and the loop
// exits early; if usage keeps growing past the per-iteration threshold for
// 100 iterations, the final assertion fails.
// NOTE(review): relies on System.gc() and Runtime memory accounting, so it is
// inherently approximate and could be flaky on some JVMs.
Runtime runtime = Runtime.getRuntime();
int payloadLen = 1024 * 1024;
ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2);
byte[] key = new byte[0];
byte[] value = new byte[payloadLen];
new Random().nextBytes(value); // Use random payload so that compressed buffer is large
List<MemoryRecordsBuilder> builders = new ArrayList<>(100);
long startMem = 0;
long memUsed = 0;
int iterations = 0;
while (iterations++ < 100) {
buffer.rewind();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
RecordBatch.NO_PARTITION_LEADER_EPOCH, 0);
builder.append(1L, new byte[0], value);
builder.build();
builders.add(builder);
System.gc();
memUsed = runtime.totalMemory() - runtime.freeMemory() - startMem;
// Ignore memory usage during initialization
if (iterations == 2)
startMem = memUsed;
else if (iterations > 2 && memUsed < (iterations - 2) * 1024)
break;
}
assertTrue("Memory usage too high: " + memUsed, iterations < 100);
}
// Asserts that down-conversion stats are plausible for the given conversion:
// the converted-record count matches, conversion time was recorded, and the
// temporary memory used falls in the range implied by how many records needed
// converting and whether compression was involved.
private void verifyRecordsProcessingStats(RecordConversionStats processingStats, int numRecords,
int numRecordsConverted, long finalBytes, long preConvertedBytes) {
assertNotNull("Records processing info is null", processingStats);
assertEquals(numRecordsConverted, processingStats.numRecordsConverted());
// Since nanoTime accuracy on build machines may not be sufficient to measure small conversion times,
// only check if the value >= 0. Default is -1, so this checks if time has been recorded.
assertTrue("Processing time not recorded: " + processingStats, processingStats.conversionTimeNanos() >= 0);
long tempBytes = processingStats.temporaryMemoryBytes();
if (compressionType == CompressionType.NONE) {
// Uncompressed: temp memory is bounded by the input and output record sets.
if (numRecordsConverted == 0)
assertEquals(finalBytes, tempBytes);
else if (numRecordsConverted == numRecords)
assertEquals(preConvertedBytes + finalBytes, tempBytes);
else {
// Partial conversion: strictly between the two extremes above.
assertTrue(String.format("Unexpected temp bytes %d final %d pre %d", tempBytes, finalBytes, preConvertedBytes),
tempBytes > finalBytes && tempBytes < finalBytes + preConvertedBytes);
}
} else {
// Compressed: decompression means temp memory must exceed the compressed payload size.
long compressedBytes = finalBytes - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0;
assertTrue(String.format("Uncompressed size expected temp=%d, compressed=%d", tempBytes, compressedBytes),
tempBytes > compressedBytes);
}
}
// Skips the current test when the ZSTD codec is paired with a pre-v2 magic,
// since that combination is rejected by the builder.
private void assumeAtLeastV2OrNotZstd(byte magic) {
    assumeTrue(magic >= MAGIC_VALUE_V2 || compressionType != CompressionType.ZSTD);
}
}