blob: 02ee75e00bcdd19135d4ce76f900c4a51c99180c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.common.record;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(value = Parameterized.class)
public class MemoryRecordsBuilderTest {
private final CompressionType compressionType;
private final int bufferOffset;
public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionType) {
this.bufferOffset = bufferOffset;
this.compressionType = compressionType;
}
@Test
public void testCompressionRateV0() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
Record[] records = new Record[] {
Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()),
Record.create(Record.MAGIC_VALUE_V0, 1L, "b".getBytes(), "2".getBytes()),
Record.create(Record.MAGIC_VALUE_V0, 2L, "c".getBytes(), "3".getBytes()),
};
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V0, compressionType,
TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
int uncompressedSize = 0;
for (Record record : records) {
uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
builder.append(record);
}
MemoryRecords built = builder.build();
if (compressionType == CompressionType.NONE) {
assertEquals(1.0, builder.compressionRate(), 0.00001);
} else {
int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V0;
double computedCompressionRate = (double) compressedSize / uncompressedSize;
assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
}
}
@Test
public void testCompressionRateV1() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
Record[] records = new Record[] {
Record.create(Record.MAGIC_VALUE_V1, 0L, "a".getBytes(), "1".getBytes()),
Record.create(Record.MAGIC_VALUE_V1, 1L, "b".getBytes(), "2".getBytes()),
Record.create(Record.MAGIC_VALUE_V1, 2L, "c".getBytes(), "3".getBytes()),
};
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
int uncompressedSize = 0;
for (Record record : records) {
uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
builder.append(record);
}
MemoryRecords built = builder.build();
if (compressionType == CompressionType.NONE) {
assertEquals(1.0, builder.compressionRate(), 0.00001);
} else {
int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V1;
double computedCompressionRate = (double) compressedSize / uncompressedSize;
assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
}
}
@Test
public void buildUsingLogAppendTime() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(0L, "b".getBytes(), "2".getBytes());
builder.append(0L, "c".getBytes(), "3".getBytes());
MemoryRecords records = builder.build();
MemoryRecordsBuilder.RecordsInfo info = builder.info();
assertEquals(logAppendTime, info.maxTimestamp);
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
for (Record record : records.records()) {
assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
assertEquals(logAppendTime, record.timestamp());
}
}
@Test
public void convertUsingLogAppendTime() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity());
builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()));
builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes()));
builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes()));
MemoryRecords records = builder.build();
MemoryRecordsBuilder.RecordsInfo info = builder.info();
assertEquals(logAppendTime, info.maxTimestamp);
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
for (Record record : records.records()) {
assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
assertEquals(logAppendTime, record.timestamp());
}
}
@Test
public void buildUsingCreateTime() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(2L, "b".getBytes(), "2".getBytes());
builder.append(1L, "c".getBytes(), "3".getBytes());
MemoryRecords records = builder.build();
MemoryRecordsBuilder.RecordsInfo info = builder.info();
assertEquals(2L, info.maxTimestamp);
if (compressionType == CompressionType.NONE)
assertEquals(1L, info.shallowOffsetOfMaxTimestamp);
else
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
int i = 0;
long[] expectedTimestamps = new long[] {0L, 2L, 1L};
for (Record record : records.records()) {
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
assertEquals(expectedTimestamps[i++], record.timestamp());
}
}
@Test
public void testSmallWriteLimit() {
// with a small write limit, we always allow at least one record to be added
byte[] key = "foo".getBytes();
byte[] value = "bar".getBytes();
int writeLimit = 0;
ByteBuffer buffer = ByteBuffer.allocate(512);
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType,
TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, writeLimit);
assertFalse(builder.isFull());
assertTrue(builder.hasRoomFor(key, value));
builder.append(0L, key, value);
assertTrue(builder.isFull());
assertFalse(builder.hasRoomFor(key, value));
MemoryRecords memRecords = builder.build();
List<Record> records = TestUtils.toList(memRecords.records());
assertEquals(1, records.size());
Record record = records.get(0);
assertEquals(ByteBuffer.wrap(key), record.key());
assertEquals(ByteBuffer.wrap(value), record.value());
}
@Test
public void writePastLimit() {
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.position(bufferOffset);
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(1L, "b".getBytes(), "2".getBytes());
assertFalse(builder.hasRoomFor("c".getBytes(), "3".getBytes()));
builder.append(2L, "c".getBytes(), "3".getBytes());
MemoryRecords records = builder.build();
MemoryRecordsBuilder.RecordsInfo info = builder.info();
assertEquals(2L, info.maxTimestamp);
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
long i = 0L;
for (Record record : records.records()) {
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
assertEquals(i++, record.timestamp());
}
}
@Test(expected = IllegalArgumentException.class)
public void testAppendAtInvalidOffset() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null);
// offsets must increase monotonically
builder.appendWithOffset(0L, System.currentTimeMillis(), "b".getBytes(), null);
}
@Test(expected = IllegalArgumentException.class)
public void testAppendWithInvalidMagic() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
builder.append(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), null));
}
@Test
public void convertUsingCreateTime() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()));
builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes()));
builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes()));
MemoryRecords records = builder.build();
MemoryRecordsBuilder.RecordsInfo info = builder.info();
assertEquals(Record.NO_TIMESTAMP, info.maxTimestamp);
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
for (Record record : records.records()) {
assertEquals(TimestampType.CREATE_TIME, record.timestampType());
assertEquals(Record.NO_TIMESTAMP, record.timestamp());
}
}
@Parameterized.Parameters
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (int bufferOffset : Arrays.asList(0, 15))
for (CompressionType compressionType : CompressionType.values())
values.add(new Object[] {bufferOffset, compressionType});
return values;
}
}