blob: 1cc4a81f476b9445242f1a6d667f218a6776596b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import static org.apache.kafka.common.record.DefaultRecordBatch.RECORDS_COUNT_OFFSET;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
 * Tests for {@link DefaultRecordBatch}: header round-tripping, sequence arithmetic
 * (including wrap-around at {@code Integer.MAX_VALUE}), in-place header mutation
 * (last offset, partition leader epoch, log append time), size accounting, control
 * batches, and rejection of corrupted batches (bad length, bad CRC, bad record count).
 */
public class DefaultRecordBatchTest {

    /**
     * An empty header written via {@code writeEmptyHeader} must read back every field
     * intact for all combinations of timestamp type, transactional flag, and control flag.
     * With no records, {@code firstTimestamp} is expected to be {@code NO_TIMESTAMP}.
     */
    @Test
    public void testWriteEmptyHeader() {
        long producerId = 23423L;
        short producerEpoch = 145;
        int baseSequence = 983;
        long baseOffset = 15L;
        long lastOffset = 37L;
        int partitionLeaderEpoch = 15;
        long timestamp = System.currentTimeMillis();
        for (TimestampType timestampType : Arrays.asList(TimestampType.CREATE_TIME, TimestampType.LOG_APPEND_TIME)) {
            for (boolean isTransactional : Arrays.asList(true, false)) {
                for (boolean isControlBatch : Arrays.asList(true, false)) {
                    ByteBuffer buffer = ByteBuffer.allocate(2048);
                    DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId,
                            producerEpoch, baseSequence, baseOffset, lastOffset, partitionLeaderEpoch, timestampType,
                            timestamp, isTransactional, isControlBatch);
                    buffer.flip();
                    DefaultRecordBatch batch = new DefaultRecordBatch(buffer);
                    assertEquals(producerId, batch.producerId());
                    assertEquals(producerEpoch, batch.producerEpoch());
                    assertEquals(baseSequence, batch.baseSequence());
                    // last sequence is derived from the offset delta
                    assertEquals(baseSequence + ((int) (lastOffset - baseOffset)), batch.lastSequence());
                    assertEquals(baseOffset, batch.baseOffset());
                    assertEquals(lastOffset, batch.lastOffset());
                    assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
                    assertEquals(isTransactional, batch.isTransactional());
                    assertEquals(timestampType, batch.timestampType());
                    assertEquals(timestamp, batch.maxTimestamp());
                    assertEquals(RecordBatch.NO_TIMESTAMP, batch.firstTimestamp());
                    assertEquals(isControlBatch, batch.isControlBatch());
                }
            }
        }
    }

    /**
     * A batch built without producer state must report the NO_PRODUCER_* / NO_SEQUENCE
     * sentinels while still exposing correct offsets, max timestamp, and valid records.
     */
    @Test
    public void buildDefaultRecordBatch() {
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
                TimestampType.CREATE_TIME, 1234567L);
        builder.appendWithOffset(1234567, 1L, "a".getBytes(), "v".getBytes());
        builder.appendWithOffset(1234568, 2L, "b".getBytes(), "v".getBytes());
        MemoryRecords records = builder.build();
        for (MutableRecordBatch batch : records.batches()) {
            assertTrue(batch.isValid());
            assertEquals(1234567, batch.baseOffset());
            assertEquals(1234568, batch.lastOffset());
            assertEquals(2L, batch.maxTimestamp());
            assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
            assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
            assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
            assertEquals(RecordBatch.NO_SEQUENCE, batch.lastSequence());
            for (Record record : batch) {
                assertTrue(record.isValid());
            }
        }
    }

    /**
     * A batch built with producer id/epoch/sequence must carry them through, with the
     * last sequence advanced by the number of appended records minus one.
     */
    @Test
    public void buildDefaultRecordBatchWithProducerId() {
        long pid = 23423L;
        short epoch = 145;
        int baseSequence = 983;
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
                TimestampType.CREATE_TIME, 1234567L, RecordBatch.NO_TIMESTAMP, pid, epoch, baseSequence);
        builder.appendWithOffset(1234567, 1L, "a".getBytes(), "v".getBytes());
        builder.appendWithOffset(1234568, 2L, "b".getBytes(), "v".getBytes());
        MemoryRecords records = builder.build();
        for (MutableRecordBatch batch : records.batches()) {
            assertTrue(batch.isValid());
            assertEquals(1234567, batch.baseOffset());
            assertEquals(1234568, batch.lastOffset());
            assertEquals(2L, batch.maxTimestamp());
            assertEquals(pid, batch.producerId());
            assertEquals(epoch, batch.producerEpoch());
            assertEquals(baseSequence, batch.baseSequence());
            assertEquals(baseSequence + 1, batch.lastSequence());
            for (Record record : batch) {
                assertTrue(record.isValid());
            }
        }
    }

    /**
     * Sequences wrap around Integer.MAX_VALUE rather than overflowing: starting at
     * MAX_VALUE - 1, the third record's sequence (and the batch's last sequence) is 0.
     */
    @Test
    public void buildDefaultRecordBatchWithSequenceWrapAround() {
        long pid = 23423L;
        short epoch = 145;
        int baseSequence = Integer.MAX_VALUE - 1;
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
                TimestampType.CREATE_TIME, 1234567L, RecordBatch.NO_TIMESTAMP, pid, epoch, baseSequence);
        builder.appendWithOffset(1234567, 1L, "a".getBytes(), "v".getBytes());
        builder.appendWithOffset(1234568, 2L, "b".getBytes(), "v".getBytes());
        builder.appendWithOffset(1234569, 3L, "c".getBytes(), "v".getBytes());
        MemoryRecords records = builder.build();
        List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
        assertEquals(1, batches.size());
        RecordBatch batch = batches.get(0);
        assertEquals(pid, batch.producerId());
        assertEquals(epoch, batch.producerEpoch());
        assertEquals(baseSequence, batch.baseSequence());
        assertEquals(0, batch.lastSequence());
        List<Record> allRecords = TestUtils.toList(batch);
        assertEquals(3, allRecords.size());
        assertEquals(Integer.MAX_VALUE - 1, allRecords.get(0).sequence());
        assertEquals(Integer.MAX_VALUE, allRecords.get(1).sequence());
        assertEquals(0, allRecords.get(2).sequence());
    }

    /**
     * The static {@code sizeInBytes} estimate for a record list must match the size of an
     * actual uncompressed batch built from the same records (null keys/values and headers
     * included).
     */
    @Test
    public void testSizeInBytes() {
        Header[] headers = new Header[] {
            new RecordHeader("foo", "value".getBytes()),
            new RecordHeader("bar", (byte[]) null)
        };
        long timestamp = System.currentTimeMillis();
        SimpleRecord[] records = new SimpleRecord[] {
            new SimpleRecord(timestamp, "key".getBytes(), "value".getBytes()),
            new SimpleRecord(timestamp + 30000, null, "value".getBytes()),
            new SimpleRecord(timestamp + 60000, "key".getBytes(), null),
            new SimpleRecord(timestamp + 60000, "key".getBytes(), "value".getBytes(), headers)
        };
        int actualSize = MemoryRecords.withRecords(CompressionType.NONE, records).sizeInBytes();
        assertEquals(actualSize, DefaultRecordBatch.sizeInBytes(Arrays.asList(records)));
    }

    /**
     * Corrupting the batch length field must make the batch invalid and cause
     * {@code ensureValid} to throw.
     */
    @Test(expected = InvalidRecordException.class)
    public void testInvalidRecordSize() {
        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
                CompressionType.NONE, TimestampType.CREATE_TIME,
                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
        ByteBuffer buffer = records.buffer();
        // absolute put: overwrite the length field without moving the buffer position
        buffer.putInt(DefaultRecordBatch.LENGTH_OFFSET, 10);
        DefaultRecordBatch batch = new DefaultRecordBatch(buffer);
        assertFalse(batch.isValid());
        batch.ensureValid();
    }

    /** A record count larger than the actual count must fail during iteration (uncompressed). */
    @Test(expected = InvalidRecordException.class)
    public void testInvalidRecordCountTooManyNonCompressedV2() {
        long now = System.currentTimeMillis();
        drainBatch(recordsWithInvalidRecordCount(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.NONE, 5));
    }

    /** A record count smaller than the actual count must fail during iteration (uncompressed). */
    @Test(expected = InvalidRecordException.class)
    public void testInvalidRecordCountTooLittleNonCompressedV2() {
        long now = System.currentTimeMillis();
        drainBatch(recordsWithInvalidRecordCount(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.NONE, 2));
    }

    /** A record count larger than the actual count must fail during iteration (compressed). */
    @Test(expected = InvalidRecordException.class)
    public void testInvalidRecordCountTooManyCompressedV2() {
        long now = System.currentTimeMillis();
        drainBatch(recordsWithInvalidRecordCount(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.GZIP, 5));
    }

    /** A record count smaller than the actual count must fail during iteration (compressed). */
    @Test(expected = InvalidRecordException.class)
    public void testInvalidRecordCountTooLittleCompressedV2() {
        long now = System.currentTimeMillis();
        drainBatch(recordsWithInvalidRecordCount(RecordBatch.MAGIC_VALUE_V2, now, CompressionType.GZIP, 2));
    }

    /**
     * Corrupting a CRC-covered field (last offset delta) must make the batch invalid and
     * cause {@code ensureValid} to throw.
     */
    @Test(expected = InvalidRecordException.class)
    public void testInvalidCrc() {
        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
                CompressionType.NONE, TimestampType.CREATE_TIME,
                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
        ByteBuffer buffer = records.buffer();
        buffer.putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, 23);
        DefaultRecordBatch batch = new DefaultRecordBatch(buffer);
        assertFalse(batch.isValid());
        batch.ensureValid();
    }

    /**
     * {@code setLastOffset} rewrites the base offset in place (only the base offset is
     * stored; the last offset is base + delta), shifting every record's offset while
     * keeping the batch valid — offsets are excluded from the CRC.
     */
    @Test
    public void testSetLastOffset() {
        SimpleRecord[] simpleRecords = new SimpleRecord[] {
            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
                CompressionType.NONE, TimestampType.CREATE_TIME, simpleRecords);
        long lastOffset = 500L;
        long firstOffset = lastOffset - simpleRecords.length + 1;
        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
        batch.setLastOffset(lastOffset);
        assertEquals(lastOffset, batch.lastOffset());
        assertEquals(firstOffset, batch.baseOffset());
        assertTrue(batch.isValid());
        List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
        assertEquals(1, recordBatches.size());
        assertEquals(lastOffset, recordBatches.get(0).lastOffset());
        long offset = firstOffset;
        for (Record record : records.records())
            assertEquals(offset++, record.offset());
    }

    /**
     * {@code setPartitionLeaderEpoch} mutates the header in place while keeping the batch
     * valid — the partition leader epoch is excluded from the CRC.
     */
    @Test
    public void testSetPartitionLeaderEpoch() {
        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
                CompressionType.NONE, TimestampType.CREATE_TIME,
                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
        int leaderEpoch = 500;
        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
        batch.setPartitionLeaderEpoch(leaderEpoch);
        assertEquals(leaderEpoch, batch.partitionLeaderEpoch());
        assertTrue(batch.isValid());
        List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
        assertEquals(1, recordBatches.size());
        assertEquals(leaderEpoch, recordBatches.get(0).partitionLeaderEpoch());
    }

    /**
     * Switching a batch to {@code LOG_APPEND_TIME} must update the batch timestamp type
     * and max timestamp, and every record must then report the log append time.
     */
    @Test
    public void testSetLogAppendTime() {
        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
                CompressionType.NONE, TimestampType.CREATE_TIME,
                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
        long logAppendTime = 15L;
        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
        batch.setMaxTimestamp(TimestampType.LOG_APPEND_TIME, logAppendTime);
        assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
        assertEquals(logAppendTime, batch.maxTimestamp());
        assertTrue(batch.isValid());
        List<MutableRecordBatch> recordBatches = Utils.toList(records.batches().iterator());
        assertEquals(1, recordBatches.size());
        assertEquals(logAppendTime, recordBatches.get(0).maxTimestamp());
        assertEquals(TimestampType.LOG_APPEND_TIME, recordBatches.get(0).timestampType());
        for (Record record : records.records())
            assertEquals(logAppendTime, record.timestamp());
    }

    /** Setting the timestamp type to NO_TIMESTAMP_TYPE is rejected. */
    @Test(expected = IllegalArgumentException.class)
    public void testSetNoTimestampTypeNotAllowed() {
        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
                CompressionType.NONE, TimestampType.CREATE_TIME,
                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
        batch.setMaxTimestamp(TimestampType.NO_TIMESTAMP_TYPE, RecordBatch.NO_TIMESTAMP);
    }

    /**
     * An end-transaction (COMMIT) control marker must round-trip: the batch is flagged as
     * a control batch and the single record deserializes back to the original marker.
     */
    @Test
    public void testReadAndWriteControlBatch() {
        long producerId = 1L;
        short producerEpoch = 0;
        int coordinatorEpoch = 15;
        ByteBuffer buffer = ByteBuffer.allocate(128);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
                CompressionType.NONE, TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, producerId,
                producerEpoch, RecordBatch.NO_SEQUENCE, true, true, RecordBatch.NO_PARTITION_LEADER_EPOCH,
                buffer.remaining());
        EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
        builder.appendEndTxnMarker(System.currentTimeMillis(), marker);
        MemoryRecords records = builder.build();
        List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
        assertEquals(1, batches.size());
        MutableRecordBatch batch = batches.get(0);
        assertTrue(batch.isControlBatch());
        List<Record> logRecords = TestUtils.toList(records.records());
        assertEquals(1, logRecords.size());
        Record commitRecord = logRecords.get(0);
        assertEquals(marker, EndTransactionMarker.deserialize(commitRecord));
    }

    /**
     * The streaming (decompress-as-you-go) iterator must yield exactly the same records
     * as the eager iterator for a compressed batch.
     */
    @Test
    public void testStreamingIteratorConsistency() {
        MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2, 0L,
                CompressionType.GZIP, TimestampType.CREATE_TIME,
                new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
                new SimpleRecord(3L, "c".getBytes(), "3".getBytes()));
        DefaultRecordBatch batch = new DefaultRecordBatch(records.buffer());
        try (CloseableIterator<Record> streamingIterator = batch.streamingIterator(BufferSupplier.create())) {
            TestUtils.checkEquals(streamingIterator, batch.iterator());
        }
    }

    /** Sequence increment wraps modulo Integer.MAX_VALUE + 1. */
    @Test
    public void testIncrementSequence() {
        assertEquals(10, DefaultRecordBatch.incrementSequence(5, 5));
        assertEquals(0, DefaultRecordBatch.incrementSequence(Integer.MAX_VALUE, 1));
        assertEquals(4, DefaultRecordBatch.incrementSequence(Integer.MAX_VALUE - 5, 10));
    }

    /** Sequence decrement wraps back to Integer.MAX_VALUE below zero. */
    @Test
    public void testDecrementSequence() {
        assertEquals(0, DefaultRecordBatch.decrementSequence(5, 5));
        assertEquals(Integer.MAX_VALUE, DefaultRecordBatch.decrementSequence(0, 1));
    }

    /**
     * Fully iterates a batch. Record-count validation is performed as part of iteration
     * (the same path LogValidator.validateMessagesAndAssignOffsets exercises), so a batch
     * with a corrupted count throws InvalidRecordException from this loop.
     */
    private static void drainBatch(DefaultRecordBatch batch) {
        for (Record record : batch) {
            record.isValid();
        }
    }

    /**
     * Builds a three-record batch and then overwrites its record-count header field with
     * {@code invalidCount} so iteration-time validation fails.
     *
     * @param magicValue   batch magic (primitive byte — was boxed {@code Byte}, which forced
     *                     needless autoboxing at call sites)
     * @param timestamp    timestamp applied to all three records
     * @param codec        compression type for the batch
     * @param invalidCount bogus record count to stamp into the header
     */
    private static DefaultRecordBatch recordsWithInvalidRecordCount(byte magicValue, long timestamp,
                                                                    CompressionType codec, int invalidCount) {
        ByteBuffer buf = ByteBuffer.allocate(512);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L);
        builder.appendWithOffset(0, timestamp, null, "hello".getBytes());
        builder.appendWithOffset(1, timestamp, null, "there".getBytes());
        builder.appendWithOffset(2, timestamp, null, "beautiful".getBytes());
        MemoryRecords records = builder.build();
        ByteBuffer buffer = records.buffer();
        buffer.position(0);
        // absolute put to corrupt the count field; position reset so the batch reads from the start
        buffer.putInt(RECORDS_COUNT_OFFSET, invalidCount);
        buffer.position(0);
        return new DefaultRecordBatch(buffer);
    }
}