blob: fef3dd4eb6cd7ef74edcbc69c422fb4df30f5d64 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog;
import static com.google.common.base.Charsets.UTF_8;
import static org.apache.distributedlog.EnvelopedEntry.HEADER_LENGTH;
import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.Entry.Reader;
import org.apache.distributedlog.Entry.Writer;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Test;
/**
* Test Case of {@link Entry}.
*/
public class TestEntry {
@Test(timeout = 20000)
public void testEmptyRecordSet() throws Exception {
Writer writer = Entry.newEntry(
"test-empty-record-set",
1024,
true,
CompressionCodec.Type.NONE);
assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
ByteBuf buffer = writer.getBuffer();
EnvelopedEntryReader reader = (EnvelopedEntryReader) Entry.newBuilder()
.setEntry(buffer)
.setLogSegmentInfo(1L, 0L)
.setEntryId(0L)
.buildReader();
int refCnt = reader.getSrcBuf().refCnt();
assertFalse(reader.isExhausted());
Assert.assertNull("Empty record set should return null",
reader.nextRecord());
assertTrue(reader.isExhausted());
assertEquals(refCnt - 1, reader.getSrcBuf().refCnt());
// read next record again (to test release buffer)
Assert.assertNull("Empty record set should return null",
reader.nextRecord());
assertEquals(refCnt - 1, reader.getSrcBuf().refCnt());
buffer.release();
}
@Test(timeout = 20000)
public void testWriteTooLongRecord() throws Exception {
Writer writer = Entry.newEntry(
"test-write-too-long-record",
1024,
true,
CompressionCodec.Type.NONE);
assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
LogRecord largeRecord = new LogRecord(1L, new byte[MAX_LOGRECORD_SIZE + 1]);
try {
writer.writeRecord(largeRecord, new CompletableFuture<DLSN>());
Assert.fail("Should fail on writing large record");
} catch (LogRecordTooLongException lrtle) {
// expected
}
assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
ByteBuf buffer = writer.getBuffer();
assertEquals("zero bytes", HEADER_LENGTH, buffer.readableBytes());
buffer.release();
}
@Test(timeout = 20000)
public void testWriteRecords() throws Exception {
Writer writer = Entry.newEntry(
"test-write-records",
1024,
true,
CompressionCodec.Type.NONE);
assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
// write first 5 records
for (int i = 0; i < 5; i++) {
LogRecord record = new LogRecord(i, ("record-" + i).getBytes(UTF_8));
record.setPositionWithinLogSegment(i);
CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
writer.writeRecord(record, writePromise);
writePromiseList.add(writePromise);
assertEquals((i + 1) + " records", (i + 1), writer.getNumRecords());
}
// write large record
LogRecord largeRecord = new LogRecord(1L, new byte[MAX_LOGRECORD_SIZE + 1]);
try {
writer.writeRecord(largeRecord, new CompletableFuture<DLSN>());
Assert.fail("Should fail on writing large record");
} catch (LogRecordTooLongException lrtle) {
// expected
}
assertEquals("5 records", 5, writer.getNumRecords());
// write another 5 records
for (int i = 0; i < 5; i++) {
LogRecord record = new LogRecord(i + 5, ("record-" + (i + 5)).getBytes(UTF_8));
record.setPositionWithinLogSegment(i + 5);
CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
writer.writeRecord(record, writePromise);
writePromiseList.add(writePromise);
assertEquals((i + 6) + " records", (i + 6), writer.getNumRecords());
}
ByteBuf buffer = writer.getBuffer();
buffer.retain();
// Test transmit complete
writer.completeTransmit(1L, 1L);
List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList));
for (int i = 0; i < 10; i++) {
assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
}
// Test reading from buffer
Reader reader = Entry.newBuilder()
.setEntry(buffer)
.setLogSegmentInfo(1L, 1L)
.setEntryId(0L)
.setEnvelopeEntry(true)
.buildReader();
buffer.release();
LogRecordWithDLSN record = reader.nextRecord();
int numReads = 0;
long expectedTxid = 0L;
while (null != record) {
assertEquals(expectedTxid, record.getTransactionId());
assertEquals(expectedTxid, record.getSequenceId());
assertEquals(new DLSN(1L, 0L, expectedTxid), record.getDlsn());
++numReads;
++expectedTxid;
record = reader.nextRecord();
}
assertEquals(10, numReads);
reader.release();
}
@Test(timeout = 20000)
public void testWriteRecordSet() throws Exception {
Writer writer = Entry.newEntry(
"test-write-recordset",
1024,
true,
CompressionCodec.Type.NONE);
assertEquals("zero bytes", HEADER_LENGTH, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
// write first 5 records
for (int i = 0; i < 5; i++) {
LogRecord record = new LogRecord(i, ("record-" + i).getBytes(UTF_8));
record.setPositionWithinLogSegment(i);
CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
writer.writeRecord(record, writePromise);
writePromiseList.add(writePromise);
assertEquals((i + 1) + " records", (i + 1), writer.getNumRecords());
}
final LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(1024, CompressionCodec.Type.NONE);
List<CompletableFuture<DLSN>> recordSetPromiseList = Lists.newArrayList();
// write another 5 records as a batch
for (int i = 0; i < 5; i++) {
ByteBuffer record = ByteBuffer.wrap(("record-" + (i + 5)).getBytes(UTF_8));
CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
recordSetWriter.writeRecord(record, writePromise);
recordSetPromiseList.add(writePromise);
assertEquals((i + 1) + " records", (i + 1), recordSetWriter.getNumRecords());
}
final ByteBuf recordSetBuffer = recordSetWriter.getBuffer();
LogRecord setRecord = new LogRecord(5L, recordSetBuffer);
setRecord.setPositionWithinLogSegment(5);
setRecord.setRecordSet();
CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
writePromise.whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN dlsn) {
recordSetWriter.completeTransmit(
dlsn.getLogSegmentSequenceNo(),
dlsn.getEntryId(),
dlsn.getSlotId());
}
@Override
public void onFailure(Throwable cause) {
recordSetWriter.abortTransmit(cause);
}
});
writer.writeRecord(setRecord, writePromise);
writePromiseList.add(writePromise);
// write last 5 records
for (int i = 0; i < 5; i++) {
LogRecord record = new LogRecord(i + 10, ("record-" + (i + 10)).getBytes(UTF_8));
record.setPositionWithinLogSegment(i + 10);
writePromise = new CompletableFuture<DLSN>();
writer.writeRecord(record, writePromise);
writePromiseList.add(writePromise);
assertEquals((i + 11) + " records", (i + 11), writer.getNumRecords());
}
ByteBuf buffer = writer.getBuffer();
buffer.retain();
// Test transmit complete
writer.completeTransmit(1L, 1L);
List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList));
for (int i = 0; i < 5; i++) {
assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
}
assertEquals(new DLSN(1L, 1L, 5), writeResults.get(5));
for (int i = 0; i < 5; i++) {
assertEquals(new DLSN(1L, 1L, (10 + i)), writeResults.get(6 + i));
}
List<DLSN> recordSetWriteResults = Utils.ioResult(FutureUtils.collect(recordSetPromiseList));
for (int i = 0; i < 5; i++) {
assertEquals(new DLSN(1L, 1L, (5 + i)), recordSetWriteResults.get(i));
}
// Test reading from buffer
verifyReadResult(buffer, 1L, 1L, 1L, true,
new DLSN(1L, 1L, 2L), 3, 5, 5,
new DLSN(1L, 1L, 2L), 2L);
verifyReadResult(buffer, 1L, 1L, 1L, true,
new DLSN(1L, 1L, 7L), 0, 3, 5,
new DLSN(1L, 1L, 7L), 7L);
verifyReadResult(buffer, 1L, 1L, 1L, true,
new DLSN(1L, 1L, 12L), 0, 0, 3,
new DLSN(1L, 1L, 12L), 12L);
verifyReadResult(buffer, 1L, 1L, 1L, false,
new DLSN(1L, 1L, 2L), 3, 5, 5,
new DLSN(1L, 1L, 2L), 2L);
verifyReadResult(buffer, 1L, 1L, 1L, false,
new DLSN(1L, 1L, 7L), 0, 3, 5,
new DLSN(1L, 1L, 7L), 7L);
verifyReadResult(buffer, 1L, 1L, 1L, false,
new DLSN(1L, 1L, 12L), 0, 0, 3,
new DLSN(1L, 1L, 12L), 12L);
buffer.release();
}
void verifyReadResult(ByteBuf data,
long lssn, long entryId, long startSequenceId,
boolean deserializeRecordSet,
DLSN skipTo,
int firstNumRecords,
int secondNumRecords,
int lastNumRecords,
DLSN expectedDLSN,
long expectedTxId) throws Exception {
Reader reader = Entry.newBuilder()
.setEntry(data)
.setLogSegmentInfo(lssn, startSequenceId)
.setEntryId(entryId)
.deserializeRecordSet(deserializeRecordSet)
.buildReader();
reader.skipTo(skipTo);
LogRecordWithDLSN record;
for (int i = 0; i < firstNumRecords; i++) { // first
record = reader.nextRecord();
assertNotNull(record);
assertEquals(expectedDLSN, record.getDlsn());
assertEquals(expectedTxId, record.getTransactionId());
assertNotNull("record " + record + " payload is null",
record.getPayloadBuf());
assertEquals("record-" + expectedTxId, new String(record.getPayload(), UTF_8));
expectedDLSN = expectedDLSN.getNextDLSN();
++expectedTxId;
}
boolean verifyDeserializedRecords = true;
if (firstNumRecords > 0) {
verifyDeserializedRecords = deserializeRecordSet;
}
if (verifyDeserializedRecords) {
long txIdOfRecordSet = 5;
for (int i = 0; i < secondNumRecords; i++) {
record = reader.nextRecord();
assertNotNull(record);
assertEquals(expectedDLSN, record.getDlsn());
assertEquals(txIdOfRecordSet, record.getTransactionId());
assertNotNull("record " + record + " payload is null",
record.getPayload());
assertEquals("record-" + expectedTxId, new String(record.getPayload(), UTF_8));
expectedDLSN = expectedDLSN.getNextDLSN();
++expectedTxId;
}
} else {
record = reader.nextRecord();
assertNotNull(record);
assertEquals(expectedDLSN, record.getDlsn());
assertEquals(expectedTxId, record.getTransactionId());
for (int i = 0; i < secondNumRecords; i++) {
expectedDLSN = expectedDLSN.getNextDLSN();
++expectedTxId;
}
}
for (int i = 0; i < lastNumRecords; i++) {
record = reader.nextRecord();
assertNotNull(record);
assertEquals(expectedDLSN, record.getDlsn());
assertEquals(expectedTxId, record.getTransactionId());
assertNotNull("record " + record + " payload is null",
record.getPayload());
assertEquals("record-" + expectedTxId, new String(record.getPayload(), UTF_8));
expectedDLSN = expectedDLSN.getNextDLSN();
++expectedTxId;
}
}
}