/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class FileRecordsTest {
private Record[] records = new Record[] {
Record.create("abcd".getBytes()),
Record.create("efgh".getBytes()),
Record.create("ijkl".getBytes())
};
private FileRecords fileRecords;
@Before
public void setup() throws IOException {
this.fileRecords = createFileRecords(records);
}
/**
* Test that the cached size variable matches the actual file size as we append messages
*/
@Test
public void testFileSize() throws IOException {
assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
for (int i = 0; i < 20; i++) {
fileRecords.append(MemoryRecords.withRecords(Record.create("abcd".getBytes())));
assertEquals(fileRecords.channel().size(), fileRecords.sizeInBytes());
}
}
/**
* Test that adding invalid bytes to the end of the log doesn't break iteration
*/
@Test
public void testIterationOverPartialAndTruncation() throws IOException {
testPartialWrite(0, fileRecords);
testPartialWrite(2, fileRecords);
testPartialWrite(4, fileRecords);
testPartialWrite(5, fileRecords);
testPartialWrite(6, fileRecords);
}
private void testPartialWrite(int size, FileRecords fileRecords) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(size);
for (int i = 0; i < size; i++)
buffer.put((byte) 0);
buffer.rewind();
fileRecords.channel().write(buffer);
// appending those bytes should not change the contents
TestUtils.checkEquals(Arrays.asList(records), fileRecords.records());
}
/**
* Iterating over the file does file reads but shouldn't change the position of the underlying FileChannel.
*/
@Test
public void testIterationDoesntChangePosition() throws IOException {
long position = fileRecords.channel().position();
TestUtils.checkEquals(Arrays.asList(records), fileRecords.records());
assertEquals(position, fileRecords.channel().position());
}
/**
* Test a simple append and read.
*/
@Test
public void testRead() throws IOException {
FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes());
TestUtils.checkEquals(fileRecords.shallowEntries(), read.shallowEntries());
List<LogEntry> items = shallowEntries(read);
LogEntry second = items.get(1);
read = fileRecords.read(second.sizeInBytes(), fileRecords.sizeInBytes());
assertEquals("Try a read starting from the second message",
items.subList(1, 3), shallowEntries(read));
read = fileRecords.read(second.sizeInBytes(), second.sizeInBytes());
assertEquals("Try a read of a single message starting from the second message",
Collections.singletonList(second), shallowEntries(read));
}
/**
 * Test the FileRecords.searchForOffsetWithSize API.
*/
@Test
public void testSearch() throws IOException {
// append a new message with a high offset
Record lastMessage = Record.create("test".getBytes());
fileRecords.append(MemoryRecords.withRecords(50L, lastMessage));
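        // offsets 0-2 come from the fixture; the appended record sits at offset 50, leaving a gap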
List<LogEntry> entries = shallowEntries(fileRecords);
int position = 0;
int message1Size = entries.get(0).sizeInBytes();
assertEquals("Should be able to find the first message by its offset",
new FileRecords.LogEntryPosition(0L, position, message1Size),
fileRecords.searchForOffsetWithSize(0, 0));
position += message1Size;
int message2Size = entries.get(1).sizeInBytes();
assertEquals("Should be able to find second message when starting from 0",
new FileRecords.LogEntryPosition(1L, position, message2Size),
fileRecords.searchForOffsetWithSize(1, 0));
assertEquals("Should be able to find second message starting from its offset",
new FileRecords.LogEntryPosition(1L, position, message2Size),
fileRecords.searchForOffsetWithSize(1, position));
position += message2Size + entries.get(2).sizeInBytes();
int message4Size = entries.get(3).sizeInBytes();
assertEquals("Should be able to find fourth message from a non-existant offset",
new FileRecords.LogEntryPosition(50L, position, message4Size),
fileRecords.searchForOffsetWithSize(3, position));
assertEquals("Should be able to find fourth message by correct offset",
new FileRecords.LogEntryPosition(50L, position, message4Size),
fileRecords.searchForOffsetWithSize(50, position));
}
/**
 * Test that reads (slices) obey the requested start position and size
*/
@Test
public void testIteratorWithLimits() throws IOException {
LogEntry entry = shallowEntries(fileRecords).get(1);
int start = fileRecords.searchForOffsetWithSize(1, 0).position;
int size = entry.sizeInBytes();
FileRecords slice = fileRecords.read(start, size);
assertEquals(Collections.singletonList(entry), shallowEntries(slice));
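        // a read one byte short of the full entry size should expose no entries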
FileRecords slice2 = fileRecords.read(start, size - 1);
assertEquals(Collections.emptyList(), shallowEntries(slice2));
}
/**
* Test the truncateTo method lops off messages and appropriately updates the size
*/
@Test
public void testTruncate() throws IOException {
LogEntry entry = shallowEntries(fileRecords).get(0);
int end = fileRecords.searchForOffsetWithSize(1, 0).position;
fileRecords.truncateTo(end);
assertEquals(Collections.singletonList(entry), shallowEntries(fileRecords));
assertEquals(entry.sizeInBytes(), fileRecords.sizeInBytes());
}
/**
* Test that truncateTo only calls truncate on the FileChannel if the size of the
* FileChannel is bigger than the target size. This is important because some JVMs
 * change the mtime of the file even when the truncation would be a no-op.
*/
@Test
public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException {
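        // size() is stubbed to 42 so the target size equals the current size; no truncate()
        // expectation is registered, so the mock fails the test if truncateTo() calls it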
FileChannel channelMock = EasyMock.createMock(FileChannel.class);
EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
EasyMock.expect(channelMock.position(42L)).andReturn(null);
EasyMock.replay(channelMock);
FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
fileRecords.truncateTo(42);
EasyMock.verify(channelMock);
}
/**
* Expect a KafkaException if targetSize is bigger than the size of
* the FileRecords.
*/
@Test
public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOException {
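        // size() again reports 42, so truncating to 43 must fail before the channel is touched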
FileChannel channelMock = EasyMock.createMock(FileChannel.class);
EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
EasyMock.expect(channelMock.position(42L)).andReturn(null);
EasyMock.replay(channelMock);
FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
try {
fileRecords.truncateTo(43);
fail("Should throw KafkaException");
} catch (KafkaException e) {
// expected
}
EasyMock.verify(channelMock);
}
/**
 * @see #testTruncateNotCalledIfSizeIsSameAsTargetSize
*/
@Test
public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException {
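        // the target size (23) is smaller than the reported size (42), so truncate(23L) and
        // position(23L) must each be called exactly once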
FileChannel channelMock = EasyMock.createMock(FileChannel.class);
EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
EasyMock.expect(channelMock.position(42L)).andReturn(null).once();
EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once();
EasyMock.expect(channelMock.position(23L)).andReturn(null).once();
EasyMock.replay(channelMock);
FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
fileRecords.truncateTo(23);
EasyMock.verify(channelMock);
}
/**
 * Test opening a new FileRecords with preallocation enabled
*/
@Test
public void testPreallocateTrue() throws IOException {
File temp = tempFile();
FileRecords fileRecords = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
long position = fileRecords.channel().position();
int size = fileRecords.sizeInBytes();
assertEquals(0, position);
assertEquals(0, size);
assertEquals(512 * 1024 * 1024, temp.length());
}
/**
 * Test opening a new FileRecords with preallocation disabled
*/
@Test
public void testPreallocateFalse() throws IOException {
File temp = tempFile();
FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, false);
long position = set.channel().position();
int size = set.sizeInBytes();
assertEquals(0, position);
assertEquals(0, size);
assertEquals(0, temp.length());
}
/**
 * Test that a preallocated FileRecords which was cleanly shut down is truncated to the end of the valid data on close.
*/
@Test
public void testPreallocateClearShutdown() throws IOException {
File temp = tempFile();
FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
set.append(MemoryRecords.withRecords(records));
int oldPosition = (int) set.channel().position();
int oldSize = set.sizeInBytes();
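        // the fixture fileRecords was built from the same three records, so the sizes should match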
assertEquals(fileRecords.sizeInBytes(), oldPosition);
assertEquals(fileRecords.sizeInBytes(), oldSize);
set.close();
File tempReopen = new File(temp.getAbsolutePath());
FileRecords setReopen = FileRecords.open(tempReopen, true, 512 * 1024 * 1024, true);
int position = (int) setReopen.channel().position();
int size = setReopen.sizeInBytes();
assertEquals(oldPosition, position);
assertEquals(oldPosition, size);
assertEquals(oldPosition, tempReopen.length());
}
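    /**
     * Test that format conversion of a slice ending in a partial message yields no entries
     * but leaves the partial bytes in place.
     */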
@Test
public void testFormatConversionWithPartialMessage() throws IOException {
LogEntry entry = shallowEntries(fileRecords).get(1);
int start = fileRecords.searchForOffsetWithSize(1, 0).position;
int size = entry.sizeInBytes();
FileRecords slice = fileRecords.read(start, size - 1);
Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0);
assertTrue("No message should be there", shallowEntries(messageV0).isEmpty());
assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
}
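    /**
     * Test up conversion of non-compressed v0 entries to the v1 message format.
     */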
@Test
public void testConvertNonCompressedToMagic1() throws IOException {
List<LogEntry> entries = Arrays.asList(
LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())),
LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes())));
MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries);
        // In practice Kafka only down-converts, but up conversion should work as well.
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(records);
fileRecords.flush();
Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1);
verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
}
}
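    /**
     * Test up conversion of compressed (GZIP) v0 entries to the v1 message format.
     */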
@Test
public void testConvertCompressedToMagic1() throws IOException {
List<LogEntry> entries = Arrays.asList(
LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k1".getBytes(), "hello".getBytes())),
LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "k2".getBytes(), "goodbye".getBytes())));
MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries);
// up conversion for compressed messages
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(records);
fileRecords.flush();
Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1);
verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
}
}
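    /**
     * Test down conversion of non-compressed v1 entries to the v0 message format.
     */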
@Test
public void testConvertNonCompressedToMagic0() throws IOException {
List<LogEntry> entries = Arrays.asList(
LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())),
LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes())));
MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.NONE, entries);
// down conversion for non-compressed messages
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(records);
fileRecords.flush();
Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0);
verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
}
}
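    /**
     * Test down conversion of compressed (GZIP) v1 entries to the v0 message format.
     */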
@Test
public void testConvertCompressedToMagic0() throws IOException {
List<LogEntry> entries = Arrays.asList(
LogEntry.create(0L, Record.create(Record.MAGIC_VALUE_V1, 1L, "k1".getBytes(), "hello".getBytes())),
LogEntry.create(2L, Record.create(Record.MAGIC_VALUE_V1, 2L, "k2".getBytes(), "goodbye".getBytes())));
MemoryRecords records = MemoryRecords.withLogEntries(CompressionType.GZIP, entries);
// down conversion for compressed messages
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(records);
fileRecords.flush();
Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0);
verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
}
}
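    // Check that each converted deep entry has the expected magic byte and that offsets,
    // keys and values are preserved by the conversion.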
private void verifyConvertedMessageSet(List<LogEntry> initialEntries, Records convertedRecords, byte magicByte) {
int i = 0;
for (LogEntry logEntry : deepEntries(convertedRecords)) {
assertEquals("magic byte should be " + magicByte, magicByte, logEntry.record().magic());
assertEquals("offset should not change", initialEntries.get(i).offset(), logEntry.offset());
assertEquals("key should not change", initialEntries.get(i).record().key(), logEntry.record().key());
assertEquals("payload should not change", initialEntries.get(i).record().value(), logEntry.record().value());
i += 1;
}
}
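    // Collect the shallow entries (compressed sets appear as a single wrapper entry) into a list.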
private static List<LogEntry> shallowEntries(Records buffer) {
return TestUtils.toList(buffer.shallowEntries());
}
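    // Collect the deep entries (individual records, decompressed if necessary) into a list.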
private static List<LogEntry> deepEntries(Records buffer) {
return TestUtils.toList(buffer.deepEntries());
}
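    // Create a FileRecords backed by a temp file, seeded with the given records and flushed to disk.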
    private FileRecords createFileRecords(Record... records) throws IOException {
FileRecords fileRecords = FileRecords.open(tempFile());
fileRecords.append(MemoryRecords.withRecords(records));
fileRecords.flush();
return fileRecords;
}
}