/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.impl.logsegment;

import static org.junit.Assert.*;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
import org.apache.distributedlog.exceptions.ReadCancelledException;
import org.apache.distributedlog.injector.AsyncFailureInjector;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.util.ConfUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/**
 * Test Case for {@link BKLogSegmentEntryReader}.
 */
public class TestBKLogSegmentEntryReader extends TestDistributedLogBase {

    @Rule
    public TestName runtime = new TestName();

    private OrderedScheduler scheduler;
    private BookKeeperClient bkc;
    private ZooKeeperClient zkc;

@Before
public void setup() throws Exception {
super.setup();
zkc = ZooKeeperClientBuilder.newBuilder()
.name("test-zk")
.zkServers(bkutil.getZkServers())
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.zkAclId(conf.getZkAclId())
.build();
bkc = BookKeeperClientBuilder.newBuilder()
.name("test-bk")
.dlConfig(conf)
.ledgersPath("/ledgers")
.zkServers(bkutil.getZkServers())
.build();
scheduler = OrderedScheduler.newBuilder()
.name("test-bk-logsegment-entry-reader")
.corePoolSize(1)
.build();
    }

@After
public void teardown() throws Exception {
if (null != bkc) {
bkc.close();
}
if (null != scheduler) {
scheduler.shutdown();
}
if (null != zkc) {
zkc.close();
}
super.teardown();
}
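
    /**
     * Open a {@link BKLogSegmentEntryReader} over the given log segment,
     * starting at {@code startEntryId}.
     */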
BKLogSegmentEntryReader createEntryReader(LogSegmentMetadata segment,
long startEntryId,
DistributedLogConfiguration conf)
throws Exception {
LogSegmentEntryStore store = new BKLogSegmentEntryStore(
conf,
ConfUtils.getConstDynConf(conf),
zkc,
bkc,
scheduler,
null,
NullStatsLogger.INSTANCE,
AsyncFailureInjector.NULL);
return (BKLogSegmentEntryReader) Utils.ioResult(store.openReader(segment, startEntryId));
}
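
    /**
     * Write {@code numCompletedSegments} completed log segments, each containing
     * {@code segmentSize} data records. Every data record is followed by a
     * control record, and closing the writer completes each segment.
     */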
void generateCompletedLogSegments(DistributedLogManager dlm,
DistributedLogConfiguration conf,
long numCompletedSegments,
long segmentSize) throws Exception {
long txid = 1L;
for (long i = 0; i < numCompletedSegments; i++) {
AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
for (long j = 1; j <= segmentSize; j++) {
Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(txid);
ctrlRecord.setControl();
Utils.ioResult(writer.write(ctrlRecord));
}
Utils.close(writer);
}
}
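
    /**
     * Start an inprogress log segment and write {@code segmentSize} data records,
     * each followed by a control record. The writer is returned unclosed so the
     * segment stays inprogress.
     */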
AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm,
DistributedLogConfiguration conf,
long segmentSize) throws Exception {
AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
for (long i = 1L; i <= segmentSize; i++) {
Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(i);
ctrlRecord.setControl();
Utils.ioResult(writer.write(ctrlRecord));
}
return writer;
}
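
    /**
     * Read all entries of a completed log segment and verify transaction ids
     * and DLSNs until {@link EndOfLogSegmentException} is thrown.
     */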
@Test(timeout = 60000)
public void testReadEntriesFromCompleteLogSegment() throws Exception {
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
confLocal.addConfiguration(conf);
confLocal.setOutputBufferSize(0);
confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
confLocal.setImmediateFlushEnabled(false);
confLocal.setNumPrefetchEntriesPerLogSegment(10);
confLocal.setMaxPrefetchEntriesPerLogSegment(10);
DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
generateCompletedLogSegments(dlm, confLocal, 1, 20);
List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(segments.size() + " log segments found, expected only one",
                1, segments.size());
BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal);
reader.start();
boolean done = false;
long txId = 1L;
long entryId = 0L;
while (!done) {
Entry.Reader entryReader;
try {
entryReader = Utils.ioResult(reader.readNext(1)).get(0);
} catch (EndOfLogSegmentException eol) {
done = true;
continue;
}
LogRecordWithDLSN record = entryReader.nextRecord();
while (null != record) {
if (!record.isControl()) {
DLMTestUtil.verifyLogRecord(record);
assertEquals(txId, record.getTransactionId());
++txId;
}
DLSN dlsn = record.getDlsn();
assertEquals(1L, dlsn.getLogSegmentSequenceNo());
assertEquals(entryId, dlsn.getEntryId());
record = entryReader.nextRecord();
}
++entryId;
}
assertEquals(21, txId);
assertFalse(reader.hasCaughtUpOnInprogress());
Utils.close(reader);
}
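
    /**
     * Closing the reader should fail all outstanding read requests with
     * {@link ReadCancelledException}.
     */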
@Test(timeout = 60000)
public void testCloseReaderToCancelPendingReads() throws Exception {
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
confLocal.addConfiguration(conf);
confLocal.setNumPrefetchEntriesPerLogSegment(10);
confLocal.setMaxPrefetchEntriesPerLogSegment(10);
DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
DLMTestUtil.generateCompletedLogSegments(dlm, confLocal, 1, 20);
List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(segments.size() + " log segments found, expected only one",
                1, segments.size());
BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal);
List<CompletableFuture<List<Entry.Reader>>> futures = Lists.newArrayList();
for (int i = 0; i < 5; i++) {
futures.add(reader.readNext(1));
}
assertFalse("Reader should not be closed yet", reader.isClosed());
Utils.close(reader);
for (CompletableFuture<List<Entry.Reader>> future : futures) {
try {
Utils.ioResult(future);
fail("The read request should be cancelled");
} catch (ReadCancelledException rce) {
// expected
}
}
assertFalse(reader.hasCaughtUpOnInprogress());
assertTrue("Reader should be closed yet", reader.isClosed());
}
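
    /**
     * With a prefetch batch size (2) smaller than the max prefetch limit (10),
     * the read ahead queue should fill up to the max limit and be refilled to
     * that limit after an entry is consumed.
     */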
@Test(timeout = 60000)
public void testMaxPrefetchEntriesSmallBatch() throws Exception {
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
confLocal.addConfiguration(conf);
confLocal.setOutputBufferSize(0);
confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
confLocal.setImmediateFlushEnabled(false);
confLocal.setNumPrefetchEntriesPerLogSegment(2);
confLocal.setMaxPrefetchEntriesPerLogSegment(10);
DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
generateCompletedLogSegments(dlm, confLocal, 1, 20);
List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(segments.size() + " log segments found, expected only one",
                1, segments.size());
BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal);
reader.start();
// wait for the read ahead entries to become available
while (reader.readAheadEntries.size() < 10) {
TimeUnit.MILLISECONDS.sleep(10);
}
long txId = 1L;
long entryId = 0L;
assertEquals(10, reader.readAheadEntries.size());
assertEquals(10, reader.getNextEntryId());
assertFalse(reader.hasCaughtUpOnInprogress());
// read first entry
Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
LogRecordWithDLSN record = entryReader.nextRecord();
while (null != record) {
if (!record.isControl()) {
DLMTestUtil.verifyLogRecord(record);
assertEquals(txId, record.getTransactionId());
++txId;
}
DLSN dlsn = record.getDlsn();
assertEquals(1L, dlsn.getLogSegmentSequenceNo());
assertEquals(entryId, dlsn.getEntryId());
record = entryReader.nextRecord();
}
++entryId;
assertEquals(2L, txId);
// wait for the read ahead entries to become 10 again
while (reader.readAheadEntries.size() < 10) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals(10, reader.readAheadEntries.size());
assertEquals(11, reader.getNextEntryId());
assertFalse(reader.hasCaughtUpOnInprogress());
Utils.close(reader);
}
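
    /**
     * With a prefetch batch size (10) larger than the max prefetch limit (5),
     * the number of outstanding read ahead entries should be capped at the max limit.
     */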
@Test(timeout = 60000)
public void testMaxPrefetchEntriesLargeBatch() throws Exception {
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
confLocal.addConfiguration(conf);
confLocal.setOutputBufferSize(0);
confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
confLocal.setImmediateFlushEnabled(false);
confLocal.setNumPrefetchEntriesPerLogSegment(10);
confLocal.setMaxPrefetchEntriesPerLogSegment(5);
DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
generateCompletedLogSegments(dlm, confLocal, 1, 20);
List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(segments.size() + " log segments found, expected only one",
                1, segments.size());
BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal);
reader.start();
// wait for the read ahead entries to become available
while (reader.readAheadEntries.size() < 5) {
TimeUnit.MILLISECONDS.sleep(10);
}
long txId = 1L;
long entryId = 0L;
assertEquals(5, reader.readAheadEntries.size());
assertEquals(5, reader.getNextEntryId());
// read first entry
Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
LogRecordWithDLSN record = entryReader.nextRecord();
while (null != record) {
if (!record.isControl()) {
DLMTestUtil.verifyLogRecord(record);
assertEquals(txId, record.getTransactionId());
++txId;
}
DLSN dlsn = record.getDlsn();
assertEquals(1L, dlsn.getLogSegmentSequenceNo());
assertEquals(entryId, dlsn.getEntryId());
record = entryReader.nextRecord();
}
++entryId;
assertEquals(2L, txId);
        // wait for the read ahead entries to become 5 again
while (reader.readAheadEntries.size() < 5) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals(5, reader.readAheadEntries.size());
assertEquals(6, reader.getNextEntryId());
assertFalse(reader.hasCaughtUpOnInprogress());
Utils.close(reader);
}
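
    /**
     * When the log segment holds fewer entries than the max prefetch limit (20),
     * the reader should only prefetch up to the last add confirmed entry.
     */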
@Test(timeout = 60000)
public void testMaxPrefetchEntriesSmallSegment() throws Exception {
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
confLocal.addConfiguration(conf);
confLocal.setOutputBufferSize(0);
confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
confLocal.setImmediateFlushEnabled(false);
confLocal.setNumPrefetchEntriesPerLogSegment(10);
confLocal.setMaxPrefetchEntriesPerLogSegment(20);
DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
generateCompletedLogSegments(dlm, confLocal, 1, 5);
List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(segments.size() + " log segments found, expected only one",
                1, segments.size());
BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal);
reader.start();
// wait for the read ahead entries to become available
while (reader.readAheadEntries.size() < (reader.getLastAddConfirmed() + 1)) {
TimeUnit.MILLISECONDS.sleep(10);
}
long txId = 1L;
long entryId = 0L;
assertEquals((reader.getLastAddConfirmed() + 1), reader.readAheadEntries.size());
assertEquals((reader.getLastAddConfirmed() + 1), reader.getNextEntryId());
// read first entry
Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
LogRecordWithDLSN record = entryReader.nextRecord();
while (null != record) {
if (!record.isControl()) {
DLMTestUtil.verifyLogRecord(record);
assertEquals(txId, record.getTransactionId());
++txId;
}
DLSN dlsn = record.getDlsn();
assertEquals(1L, dlsn.getLogSegmentSequenceNo());
assertEquals(entryId, dlsn.getEntryId());
record = entryReader.nextRecord();
}
++entryId;
assertEquals(2L, txId);
assertEquals(reader.getLastAddConfirmed(), reader.readAheadEntries.size());
assertEquals((reader.getLastAddConfirmed() + 1), reader.getNextEntryId());
assertFalse(reader.hasCaughtUpOnInprogress());
Utils.close(reader);
}
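
    /**
     * Read entries from an inprogress log segment: the prefetched entries are
     * served first, then an outstanding long poll read is satisfied once the
     * writer commits more data.
     */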
@Test(timeout = 60000)
public void testReadEntriesFromInprogressSegment() throws Exception {
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
confLocal.addConfiguration(conf);
confLocal.setOutputBufferSize(0);
confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
confLocal.setImmediateFlushEnabled(false);
confLocal.setNumPrefetchEntriesPerLogSegment(20);
confLocal.setMaxPrefetchEntriesPerLogSegment(20);
DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
AsyncLogWriter writer = createInprogressLogSegment(dlm, confLocal, 5);
List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(segments.size() + " log segments found, expected only one",
                1, segments.size());
BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal);
reader.start();
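        // the inprogress segment has 5 data records interleaved with 5 control
        // records, i.e. 10 entries (ids 0-9); the last add confirmed piggybacked
        // on the last entry is therefore expected to be 8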
long expectedLastAddConfirmed = 8L;
        // wait until all prefetch requests have been sent out
while (reader.readAheadEntries.size() < expectedLastAddConfirmed + 2) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals(expectedLastAddConfirmed + 2, reader.getNextEntryId());
long txId = 1L;
long entryId = 0L;
while (true) {
Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
LogRecordWithDLSN record = entryReader.nextRecord();
while (null != record) {
if (!record.isControl()) {
DLMTestUtil.verifyLogRecord(record);
assertEquals(txId, record.getTransactionId());
++txId;
}
DLSN dlsn = record.getDlsn();
assertEquals(1L, dlsn.getLogSegmentSequenceNo());
assertEquals(entryId, dlsn.getEntryId());
record = entryReader.nextRecord();
}
++entryId;
if (entryId == expectedLastAddConfirmed + 1) {
break;
}
}
assertEquals(6L, txId);
CompletableFuture<List<Entry.Reader>> nextReadFuture = reader.readNext(1);
// write another record to commit previous writes
Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
// the long poll will be satisfied
List<Entry.Reader> nextReadEntries = Utils.ioResult(nextReadFuture);
assertEquals(1, nextReadEntries.size());
assertTrue(reader.hasCaughtUpOnInprogress());
Entry.Reader entryReader = nextReadEntries.get(0);
LogRecordWithDLSN record = entryReader.nextRecord();
assertNotNull(record);
assertTrue(record.isControl());
assertNull(entryReader.nextRecord());
        // once the read is advanced, we will prefetch the next record
while (reader.getNextEntryId() <= entryId) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals(entryId + 2, reader.getNextEntryId());
assertEquals(1, reader.readAheadEntries.size());
Utils.close(reader);
Utils.close(writer);
}
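
    /**
     * Read entries from an inprogress log segment while it transitions to
     * completed: once the reader is notified of the updated (completed) segment
     * metadata via {@link BKLogSegmentEntryReader#onLogSegmentMetadataUpdated},
     * the outstanding long poll should be cancelled and end of log segment signaled.
     */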
@Test(timeout = 60000)
public void testReadEntriesOnStateChange() throws Exception {
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
confLocal.addConfiguration(conf);
confLocal.setOutputBufferSize(0);
confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
confLocal.setImmediateFlushEnabled(false);
confLocal.setNumPrefetchEntriesPerLogSegment(20);
confLocal.setMaxPrefetchEntriesPerLogSegment(20);
DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
AsyncLogWriter writer = createInprogressLogSegment(dlm, confLocal, 5);
List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(segments.size() + " log segments found, expected only one",
                1, segments.size());
BKLogSegmentEntryReader reader = createEntryReader(segments.get(0), 0, confLocal);
reader.start();
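        // the inprogress segment has 5 data records interleaved with 5 control
        // records, i.e. 10 entries (ids 0-9); the last add confirmed piggybacked
        // on the last entry is therefore expected to be 8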
long expectedLastAddConfirmed = 8L;
        // wait until all prefetch requests have been sent out
while (reader.readAheadEntries.size() < expectedLastAddConfirmed + 2) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals(expectedLastAddConfirmed + 2, reader.getNextEntryId());
long txId = 1L;
long entryId = 0L;
while (true) {
Entry.Reader entryReader = Utils.ioResult(reader.readNext(1)).get(0);
LogRecordWithDLSN record = entryReader.nextRecord();
while (null != record) {
if (!record.isControl()) {
DLMTestUtil.verifyLogRecord(record);
assertEquals(txId, record.getTransactionId());
++txId;
}
DLSN dlsn = record.getDlsn();
assertEquals(1L, dlsn.getLogSegmentSequenceNo());
assertEquals(entryId, dlsn.getEntryId());
record = entryReader.nextRecord();
}
++entryId;
if (entryId == expectedLastAddConfirmed + 1) {
break;
}
}
assertEquals(6L, txId);
CompletableFuture<List<Entry.Reader>> nextReadFuture = reader.readNext(1);
// write another record to commit previous writes
Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
// the long poll will be satisfied
List<Entry.Reader> nextReadEntries = Utils.ioResult(nextReadFuture);
assertEquals(1, nextReadEntries.size());
Entry.Reader entryReader = nextReadEntries.get(0);
LogRecordWithDLSN record = entryReader.nextRecord();
assertNotNull(record);
assertTrue(record.isControl());
assertNull(entryReader.nextRecord());
        // once the read is advanced, we will prefetch the next record
while (reader.getNextEntryId() <= entryId) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals(entryId + 2, reader.getNextEntryId());
assertEquals(1, reader.readAheadEntries.size());
// advance the entry id
++entryId;
        // close the writer; the pending write will be committed
Utils.close(writer);
entryReader = Utils.ioResult(reader.readNext(1)).get(0);
record = entryReader.nextRecord();
assertNotNull(record);
assertFalse(record.isControl());
assertNull(entryReader.nextRecord());
while (reader.getNextEntryId() <= entryId + 1) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals(entryId + 2, reader.getNextEntryId());
assertEquals(1, reader.readAheadEntries.size());
// get the new log segment
List<LogSegmentMetadata> newSegments = dlm.getLogSegments();
assertEquals(1, newSegments.size());
assertFalse(newSegments.get(0).isInProgress());
reader.onLogSegmentMetadataUpdated(newSegments.get(0));
        // when the reader receives the new log segment metadata, the outstanding
        // long poll should be cancelled and end of log segment should be signaled correctly
try {
            // when we close the log segment, another control record is written,
            // so we keep reading until we reach the end of the log segment
Utils.ioResult(reader.readNext(1));
Utils.ioResult(reader.readNext(1));
fail("Should reach end of log segment");
} catch (EndOfLogSegmentException eol) {
// expected
}
Utils.close(reader);
}
}