blob: 9271a3fa9e1202584ea1893962ef6a4ae8dde8b6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.toNullableArray;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(value = Parameterized.class)
public class MemoryRecordsTest {
private CompressionType compression;
private byte magic;
private long firstOffset;
public MemoryRecordsTest(byte magic, long firstOffset, CompressionType compression) {
this.magic = magic;
this.compression = compression;
this.firstOffset = firstOffset;
}
@Test
public void testIterator() {
MemoryRecordsBuilder builder1 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
MemoryRecordsBuilder builder2 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
List<Record> list = asList(
Record.create(magic, 1L, "a".getBytes(), "1".getBytes()),
Record.create(magic, 2L, "b".getBytes(), "2".getBytes()),
Record.create(magic, 3L, "c".getBytes(), "3".getBytes()),
Record.create(magic, 4L, null, "4".getBytes()),
Record.create(magic, 5L, "e".getBytes(), null),
Record.create(magic, 6L, null, null));
for (int i = 0; i < list.size(); i++) {
Record r = list.get(i);
builder1.append(r);
builder2.append(i + 1, toNullableArray(r.key()), toNullableArray(r.value()));
}
MemoryRecords recs1 = builder1.build();
MemoryRecords recs2 = builder2.build();
for (int iteration = 0; iteration < 2; iteration++) {
for (MemoryRecords recs : asList(recs1, recs2)) {
Iterator<LogEntry> iter = recs.deepEntries().iterator();
for (int i = 0; i < list.size(); i++) {
assertTrue(iter.hasNext());
LogEntry entry = iter.next();
assertEquals(firstOffset + i, entry.offset());
assertEquals(list.get(i), entry.record());
entry.record().ensureValid();
}
assertFalse(iter.hasNext());
}
}
}
@Test
public void testHasRoomForMethod() {
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME);
builder.append(Record.create(magic, 0L, "a".getBytes(), "1".getBytes()));
assertTrue(builder.hasRoomFor("b".getBytes(), "2".getBytes()));
builder.close();
assertFalse(builder.hasRoomFor("b".getBytes(), "2".getBytes()));
}
@Test
public void testFilterTo() {
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
builder.append(10L, null, "a".getBytes());
builder.close();
builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
builder.append(11L, "1".getBytes(), "b".getBytes());
builder.append(12L, null, "c".getBytes());
builder.close();
builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L);
builder.append(13L, null, "d".getBytes());
builder.append(20L, "4".getBytes(), "e".getBytes());
builder.append(15L, "5".getBytes(), "f".getBytes());
builder.close();
builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L);
builder.append(16L, "6".getBytes(), "g".getBytes());
builder.close();
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
filtered.flip();
assertEquals(7, result.messagesRead);
assertEquals(4, result.messagesRetained);
assertEquals(buffer.limit(), result.bytesRead);
assertEquals(filtered.limit(), result.bytesRetained);
if (magic > 0) {
assertEquals(20L, result.maxTimestamp);
if (compression == CompressionType.NONE)
assertEquals(4L, result.shallowOffsetOfMaxTimestamp);
else
assertEquals(5L, result.shallowOffsetOfMaxTimestamp);
}
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
List<Long> expectedOffsets = compression == CompressionType.NONE ? asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L);
assertEquals(expectedOffsets.size(), shallowEntries.size());
for (int i = 0; i < expectedOffsets.size(); i++) {
LogEntry shallowEntry = shallowEntries.get(i);
assertEquals(expectedOffsets.get(i).longValue(), shallowEntry.offset());
assertEquals(magic, shallowEntry.record().magic());
assertEquals(compression, shallowEntry.record().compressionType());
assertEquals(magic == Record.MAGIC_VALUE_V0 ? TimestampType.NO_TIMESTAMP_TYPE : TimestampType.CREATE_TIME,
shallowEntry.record().timestampType());
}
List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries());
assertEquals(4, deepEntries.size());
LogEntry first = deepEntries.get(0);
assertEquals(1L, first.offset());
assertEquals(Record.create(magic, 11L, "1".getBytes(), "b".getBytes()), first.record());
LogEntry second = deepEntries.get(1);
assertEquals(4L, second.offset());
assertEquals(Record.create(magic, 20L, "4".getBytes(), "e".getBytes()), second.record());
LogEntry third = deepEntries.get(2);
assertEquals(5L, third.offset());
assertEquals(Record.create(magic, 15L, "5".getBytes(), "f".getBytes()), third.record());
LogEntry fourth = deepEntries.get(3);
assertEquals(6L, fourth.offset());
assertEquals(Record.create(magic, 16L, "6".getBytes(), "g".getBytes()), fourth.record());
}
@Test
public void testFilterToPreservesLogAppendTime() {
long logAppendTime = System.currentTimeMillis();
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
TimestampType.LOG_APPEND_TIME, 0L, logAppendTime);
builder.append(10L, null, "a".getBytes());
builder.close();
builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, logAppendTime);
builder.append(11L, "1".getBytes(), "b".getBytes());
builder.append(12L, null, "c".getBytes());
builder.close();
builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, logAppendTime);
builder.append(13L, null, "d".getBytes());
builder.append(14L, "4".getBytes(), "e".getBytes());
builder.append(15L, "5".getBytes(), "f".getBytes());
builder.close();
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size());
for (LogEntry shallowEntry : shallowEntries) {
assertEquals(compression, shallowEntry.record().compressionType());
if (magic > Record.MAGIC_VALUE_V0) {
assertEquals(TimestampType.LOG_APPEND_TIME, shallowEntry.record().timestampType());
assertEquals(logAppendTime, shallowEntry.record().timestamp());
}
}
}
@Parameterized.Parameters
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (long firstOffset : asList(0L, 57L))
for (byte magic : asList(Record.MAGIC_VALUE_V0, Record.MAGIC_VALUE_V1))
for (CompressionType type: CompressionType.values())
values.add(new Object[] {magic, firstOffset, type});
return values;
}
private static class RetainNonNullKeysFilter implements MemoryRecords.LogEntryFilter {
@Override
public boolean shouldRetain(LogEntry entry) {
return entry.record().hasKey();
}
}
}