blob: b233793be009c86ce08dfab9a1d76e97d5e54c1d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.Time;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.utf8;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class LazyDownConversionRecordsTest {
/**
* Test the lazy down-conversion path in the presence of commit markers. When converting to V0 or V1, these batches
* are dropped. If there happen to be no more batches left to convert, we must get an overflow message batch after
* conversion.
*/
@Test
public void testConversionOfCommitMarker() throws IOException {
MemoryRecords recordsToConvert = MemoryRecords.withEndTransactionMarker(0, Time.SYSTEM.milliseconds(), RecordBatch.NO_PARTITION_LEADER_EPOCH,
1, (short) 1, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
MemoryRecords convertedRecords = convertRecords(recordsToConvert, (byte) 1, recordsToConvert.sizeInBytes());
ByteBuffer buffer = convertedRecords.buffer();
// read the offset and the batch length
buffer.getLong();
int sizeOfConvertedRecords = buffer.getInt();
// assert we got an overflow message batch
assertTrue(sizeOfConvertedRecords > buffer.limit());
assertFalse(convertedRecords.batchIterator().hasNext());
}
@RunWith(value = Parameterized.class)
public static class ParameterizedConversionTest {
private final CompressionType compressionType;
private final byte toMagic;
public ParameterizedConversionTest(CompressionType compressionType, byte toMagic) {
this.compressionType = compressionType;
this.toMagic = toMagic;
}
@Parameterized.Parameters(name = "compressionType={0}, toMagic={1}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
values.add(new Object[]{CompressionType.NONE, toMagic});
values.add(new Object[]{CompressionType.GZIP, toMagic});
}
return values;
}
/**
* Test the lazy down-conversion path.
*/
@Test
public void testConversion() throws IOException {
doTestConversion(false);
}
/**
* Test the lazy down-conversion path where the number of bytes we want to convert is much larger than the
* number of bytes we get after conversion. This causes overflow message batch(es) to be appended towards the
* end of the converted output.
*/
@Test
public void testConversionWithOverflow() throws IOException {
doTestConversion(true);
}
private void doTestConversion(boolean testConversionOverflow) throws IOException {
List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
new RecordHeader("headerKey2", "headerValue2".getBytes()),
new RecordHeader("headerKey3", "headerValue3".getBytes())};
List<SimpleRecord> records = asList(
new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
assertEquals("incorrect test setup", offsets.size(), records.size());
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
TimestampType.CREATE_TIME, 0L);
for (int i = 0; i < 3; i++)
builder.appendWithOffset(offsets.get(i), records.get(i));
builder.close();
builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
0L);
for (int i = 3; i < 6; i++)
builder.appendWithOffset(offsets.get(i), records.get(i));
builder.close();
builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
0L);
for (int i = 6; i < 10; i++)
builder.appendWithOffset(offsets.get(i), records.get(i));
builder.close();
buffer.flip();
MemoryRecords recordsToConvert = MemoryRecords.readableRecords(buffer);
int numBytesToConvert = recordsToConvert.sizeInBytes();
if (testConversionOverflow)
numBytesToConvert *= 2;
MemoryRecords convertedRecords = convertRecords(recordsToConvert, toMagic, numBytesToConvert);
verifyDownConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
}
}
private static MemoryRecords convertRecords(MemoryRecords recordsToConvert, byte toMagic, int bytesToConvert) throws IOException {
try (FileRecords inputRecords = FileRecords.open(tempFile())) {
inputRecords.append(recordsToConvert);
inputRecords.flush();
LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(new TopicPartition("test", 1),
inputRecords, toMagic, 0L, Time.SYSTEM);
LazyDownConversionRecordsSend lazySend = lazyRecords.toSend("foo");
File outputFile = tempFile();
FileChannel channel = FileChannel.open(outputFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE);
int written = 0;
while (written < bytesToConvert)
written += lazySend.writeTo(channel, written, bytesToConvert - written);
FileRecords convertedRecords = FileRecords.open(outputFile, true, (int) channel.size(), false);
ByteBuffer convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes());
convertedRecords.readInto(convertedRecordsBuffer, 0);
// cleanup
convertedRecords.close();
channel.close();
return MemoryRecords.readableRecords(convertedRecordsBuffer);
}
}
private static void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
List<Long> initialOffsets,
MemoryRecords downConvertedRecords,
CompressionType compressionType,
byte toMagic) {
int i = 0;
for (RecordBatch batch : downConvertedRecords.batches()) {
assertTrue("Magic byte should be lower than or equal to " + toMagic, batch.magic() <= toMagic);
if (batch.magic() == RecordBatch.MAGIC_VALUE_V0)
assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
else
assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
assertEquals("Compression type should not be affected by conversion", compressionType, batch.compressionType());
for (Record record : batch) {
assertTrue("Inner record should have magic " + toMagic, record.hasMagic(batch.magic()));
assertEquals("Offset should not change", initialOffsets.get(i).longValue(), record.offset());
assertEquals("Key should not change", utf8(initialRecords.get(i).key()), utf8(record.key()));
assertEquals("Value should not change", utf8(initialRecords.get(i).value()), utf8(record.value()));
assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
assertTrue(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
} else if (batch.magic() == RecordBatch.MAGIC_VALUE_V1) {
assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
assertTrue(record.hasTimestampType(TimestampType.CREATE_TIME));
assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
} else {
assertEquals("Timestamp should not change", initialRecords.get(i).timestamp(), record.timestamp());
assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
assertArrayEquals("Headers should not change", initialRecords.get(i).headers(), record.headers());
}
i += 1;
}
}
assertEquals(initialOffsets.size(), i);
}
}