blob: 51d05529adff3b5b78aefa403b9e4c42d0a2fc76 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;
import com.google.common.base.Optional;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.Await;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.twitter.util.TimeoutException;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
/**
* Test {@link BKLogReadHandler}
*/
public class TestBKLogReadHandler extends TestDistributedLogBase {
static final Logger LOG = LoggerFactory.getLogger(TestBKLogReadHandler.class);
@Rule
public TestName runtime = new TestName();
private void prepareLogSegments(String name, int numSegments, int numEntriesPerSegment) throws Exception {
DLMTestUtil.BKLogPartitionWriteHandlerAndClients bkdlmAndClients = createNewBKDLM(conf, name);
long txid = 1;
for (int sid = 0; sid < numSegments; ++sid) {
BKLogSegmentWriter out = bkdlmAndClients.getWriteHandler().startLogSegment(txid);
for (int eid = 0; eid < numEntriesPerSegment; ++eid) {
LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid);
out.write(record);
++txid;
}
FutureUtils.result(out.asyncClose());
bkdlmAndClients.getWriteHandler().completeAndCloseLogSegment(
out.getLogSegmentSequenceNumber(),
out.getLogSegmentId(),
1 + sid * numEntriesPerSegment,
(sid + 1) * numEntriesPerSegment,
numEntriesPerSegment);
}
bkdlmAndClients.close();
}
private void prepareLogSegmentsNonPartitioned(String name, int numSegments, int numEntriesPerSegment) throws Exception {
DistributedLogManager dlm = createNewDLM(conf, name);
long txid = 1;
for (int sid = 0; sid < numSegments; ++sid) {
LogWriter out = dlm.startLogSegmentNonPartitioned();
for (int eid = 0; eid < numEntriesPerSegment; ++eid) {
LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid);
out.write(record);
++txid;
}
out.close();
}
dlm.close();
}
@Test(timeout = 60000)
public void testGetLedgerList() throws Exception {
String dlName = runtime.getMethodName();
prepareLogSegments(dlName, 3, 3);
BKDistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = dlm.createReadHandler();
List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, false, LogSegmentMetadata.COMPARATOR, false);
List<LogSegmentMetadata> ledgerList2 = readHandler.getFilteredLedgerList(true, false);
List<LogSegmentMetadata> ledgerList3 = readHandler.getLedgerList(false, false, LogSegmentMetadata.COMPARATOR, false);
assertEquals(3, ledgerList.size());
assertEquals(3, ledgerList2.size());
assertEquals(3, ledgerList3.size());
for (int i=0; i<3; i++) {
assertEquals(ledgerList3.get(i), ledgerList2.get(i));
}
}
@Test(timeout = 60000)
public void testForceGetLedgerList() throws Exception {
String dlName = runtime.getMethodName();
prepareLogSegments(dlName, 3, 3);
BKDistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = dlm.createReadHandler();
List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(true, false, LogSegmentMetadata.COMPARATOR, false);
final AtomicReference<List<LogSegmentMetadata>> resultHolder =
new AtomicReference<List<LogSegmentMetadata>>(null);
final CountDownLatch latch = new CountDownLatch(1);
readHandler.asyncGetLedgerList(LogSegmentMetadata.COMPARATOR, null, new BookkeeperInternalCallbacks.GenericCallback<List<LogSegmentMetadata>>() {
@Override
public void operationComplete(int rc, List<LogSegmentMetadata> result) {
resultHolder.set(result);
latch.countDown();
}
});
latch.await();
List<LogSegmentMetadata> newLedgerList = resultHolder.get();
assertNotNull(newLedgerList);
LOG.info("Force sync get list : {}", ledgerList);
LOG.info("Async get list : {}", newLedgerList);
assertEquals(3, ledgerList.size());
assertEquals(3, newLedgerList.size());
for (int i=0; i<3; i++) {
assertEquals(ledgerList.get(i), newLedgerList.get(i));
}
}
@Test(timeout = 60000)
public void testGetFilteredLedgerListInWriteHandler() throws Exception {
String dlName = runtime.getMethodName();
prepareLogSegments(dlName, 11, 3);
BKDistributedLogManager dlm = createNewDLM(conf, dlName);
// Get full list.
BKLogWriteHandler writeHandler0 = dlm.createWriteHandler(false);
List<LogSegmentMetadata> cachedFullLedgerList =
writeHandler0.getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
assertTrue(cachedFullLedgerList.size() <= 1);
List<LogSegmentMetadata> fullLedgerList = writeHandler0.getFullLedgerListDesc(false, false);
assertEquals(11, fullLedgerList.size());
// Get filtered list.
BKLogWriteHandler writeHandler1 = dlm.createWriteHandler(false);
List<LogSegmentMetadata> filteredLedgerListDesc = writeHandler1.getFilteredLedgerListDesc(false, false);
assertEquals(1, filteredLedgerListDesc.size());
assertEquals(fullLedgerList.get(0), filteredLedgerListDesc.get(0));
List<LogSegmentMetadata> filteredLedgerList = writeHandler1.getFilteredLedgerList(false, false);
assertEquals(1, filteredLedgerList.size());
assertEquals(fullLedgerList.get(0), filteredLedgerList.get(0));
}
@Test(timeout = 60000)
public void testGetFirstDLSNWithOpenLedger() throws Exception {
String dlName = runtime.getMethodName();
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
confLocal.loadConf(conf);
confLocal.setImmediateFlushEnabled(true);
confLocal.setOutputBufferSize(0);
int numEntriesPerSegment = 100;
DistributedLogManager dlm1 = createNewDLM(confLocal, dlName);
long txid = 1;
ArrayList<Future<DLSN>> futures = new ArrayList<Future<DLSN>>(numEntriesPerSegment);
AsyncLogWriter out = dlm1.startAsyncLogSegmentNonPartitioned();
for (int eid = 0; eid < numEntriesPerSegment; ++eid) {
futures.add(out.write(DLMTestUtil.getLogRecordInstance(txid)));
++txid;
}
for (Future<DLSN> future : futures) {
Await.result(future);
}
BKLogReadHandler readHandler =
((BKDistributedLogManager) dlm1).createReadHandler();
DLSN last = dlm1.getLastDLSN();
assertEquals(new DLSN(1,99,0), last);
DLSN first = Await.result(dlm1.getFirstDLSNAsync());
assertEquals(new DLSN(1,0,0), first);
Utils.close(out);
}
@Test(timeout = 60000)
public void testGetFirstDLSNNoLogSegments() throws Exception {
String dlName = runtime.getMethodName();
BKDistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = dlm.createReadHandler();
Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
try {
Await.result(futureRecord);
fail("should have thrown exception");
} catch (LogNotFoundException ex) {
}
}
@Test(timeout = 60000)
public void testGetFirstDLSNWithLogSegments() throws Exception {
String dlName = runtime.getMethodName();
prepareLogSegments(dlName, 3, 3);
BKDistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = dlm.createReadHandler();
Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
try {
LogRecordWithDLSN record = Await.result(futureRecord);
assertEquals(new DLSN(1, 0, 0), record.getDlsn());
} catch (Exception ex) {
fail("should not have thrown exception: " + ex);
}
}
@Test(timeout = 60000)
public void testGetFirstDLSNAfterCleanTruncation() throws Exception {
String dlName = runtime.getMethodName();
prepareLogSegmentsNonPartitioned(dlName, 3, 10);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler =
((BKDistributedLogManager) dlm).createReadHandler();
AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
Future<Boolean> futureSuccess = writer.truncate(new DLSN(2, 0, 0));
Boolean success = Await.result(futureSuccess);
assertTrue(success);
Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
LogRecordWithDLSN record = Await.result(futureRecord);
assertEquals(new DLSN(2, 0, 0), record.getDlsn());
}
@Test(timeout = 60000)
public void testGetFirstDLSNAfterPartialTruncation() throws Exception {
String dlName = runtime.getMethodName();
prepareLogSegmentsNonPartitioned(dlName, 3, 10);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler =
((BKDistributedLogManager) dlm).createReadHandler();
AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
// Only truncates at ledger boundary.
Future<Boolean> futureSuccess = writer.truncate(new DLSN(2, 5, 0));
Boolean success = Await.result(futureSuccess);
assertTrue(success);
Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
LogRecordWithDLSN record = Await.result(futureRecord);
assertEquals(new DLSN(2, 0, 0), record.getDlsn());
}
@Test(timeout = 60000)
public void testGetLogRecordCountEmptyLedger() throws Exception {
String dlName = runtime.getMethodName();
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
Future<Long> count = null;
count = readHandler.asyncGetLogRecordCount(DLSN.InitialDLSN);
try {
Await.result(count);
fail("log is empty, should have returned log empty ex");
} catch (LogNotFoundException ex) {
}
}
@Test(timeout = 60000)
public void testGetLogRecordCountTotalCount() throws Exception {
String dlName = runtime.getMethodName();
prepareLogSegmentsNonPartitioned(dlName, 11, 3);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
Future<Long> count = null;
count = readHandler.asyncGetLogRecordCount(DLSN.InitialDLSN);
assertEquals(33, Await.result(count).longValue());
}
@Test(timeout = 60000)
public void testGetLogRecordCountAtLedgerBoundary() throws Exception {
String dlName = runtime.getMethodName();
prepareLogSegmentsNonPartitioned(dlName, 11, 3);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
Future<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(2, 0, 0));
assertEquals(30, Await.result(count).longValue());
count = readHandler.asyncGetLogRecordCount(new DLSN(3, 0, 0));
assertEquals(27, Await.result(count).longValue());
}
@Test(timeout = 60000)
public void testGetLogRecordCountPastEnd() throws Exception {
String dlName = runtime.getMethodName();
prepareLogSegmentsNonPartitioned(dlName, 11, 3);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
Future<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(12, 0, 0));
assertEquals(0, Await.result(count).longValue());
}
@Test(timeout = 60000)
public void testGetLogRecordCountLastRecord() throws Exception {
String dlName = runtime.getMethodName();
prepareLogSegmentsNonPartitioned(dlName, 11, 3);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
Future<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(11, 2, 0));
assertEquals(1, Await.result(count).longValue());
}
@Test(timeout = 60000)
public void testGetLogRecordCountInteriorRecords() throws Exception {
String dlName = runtime.getMethodName();
prepareLogSegmentsNonPartitioned(dlName, 5, 10);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
Future<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(3, 5, 0));
assertEquals(25, Await.result(count).longValue());
count = readHandler.asyncGetLogRecordCount(new DLSN(2, 5, 0));
assertEquals(35, Await.result(count).longValue());
}
@Test(timeout = 60000)
public void testGetLogRecordCountWithControlRecords() throws Exception {
DistributedLogManager dlm = createNewDLM(conf, runtime.getMethodName());
long txid = 1;
txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 5, 5, txid);
txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 0, 10, txid);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
Future<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
assertEquals(15, Await.result(count).longValue());
}
@Test(timeout = 60000)
public void testGetLogRecordCountWithAllControlRecords() throws Exception {
DistributedLogManager dlm = createNewDLM(conf, runtime.getMethodName());
long txid = 1;
txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 5, 0, txid);
txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 10, 0, txid);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
Future<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
assertEquals(0, Await.result(count).longValue());
}
@Test(timeout = 60000)
public void testGetLogRecordCountWithSingleInProgressLedger() throws Exception {
String streamName = runtime.getMethodName();
BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName);
AsyncLogWriter out = bkdlm.startAsyncLogSegmentNonPartitioned();
int txid = 1;
Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
BKLogReadHandler readHandler = bkdlm.createReadHandler();
List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, false, LogSegmentMetadata.COMPARATOR, false);
assertEquals(1, ledgerList.size());
assertTrue(ledgerList.get(0).isInProgress());
Future<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
assertEquals(2, Await.result(count).longValue());
Utils.close(out);
}
@Test(timeout = 60000)
public void testGetLogRecordCountWithCompletedAndInprogressLedgers() throws Exception {
String streamName = runtime.getMethodName();
BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName);
long txid = 1;
txid += DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, txid);
AsyncLogWriter out = bkdlm.startAsyncLogSegmentNonPartitioned();
Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
BKLogReadHandler readHandler = bkdlm.createReadHandler();
List<LogSegmentMetadata> ledgerList = readHandler.getLedgerList(false, false, LogSegmentMetadata.COMPARATOR, false);
assertEquals(2, ledgerList.size());
assertFalse(ledgerList.get(0).isInProgress());
assertTrue(ledgerList.get(1).isInProgress());
Future<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
assertEquals(7, Await.result(count).longValue());
Utils.close(out);
}
@Test(timeout = 60000)
public void testLockStreamWithMissingLog() throws Exception {
String streamName = runtime.getMethodName();
BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName);
BKLogReadHandler readHandler = bkdlm.createReadHandler();
try {
Await.result(readHandler.lockStream());
fail("Should fail lock stream if log not found");
} catch (LogNotFoundException ex) {
}
BKLogReadHandler subscriberReadHandler = bkdlm.createReadHandler(Optional.of("test-subscriber"));
try {
Await.result(subscriberReadHandler.lockStream());
fail("Subscriber should fail lock stream if log not found");
} catch (LogNotFoundException ex) {
// expected
}
}
@Test(timeout = 60000)
public void testLockStreamDifferentSubscribers() throws Exception {
String streamName = runtime.getMethodName();
BKDistributedLogManager bkdlm = createNewDLM(conf, streamName);
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1);
BKLogReadHandler readHandler = bkdlm.createReadHandler();
Await.result(readHandler.lockStream());
// two subscribers could lock stream in parallel
BKDistributedLogManager bkdlm10 = createNewDLM(conf, streamName);
BKLogReadHandler s10Handler =
bkdlm10.createReadHandler(Optional.of("s1"));
Await.result(s10Handler.lockStream());
BKDistributedLogManager bkdlm20 = createNewDLM(conf, streamName);
BKLogReadHandler s20Handler =
bkdlm20.createReadHandler(Optional.of("s2"));
Await.result(s20Handler.lockStream());
readHandler.asyncClose();
bkdlm.close();
s10Handler.asyncClose();
bkdlm10.close();
s20Handler.asyncClose();
bkdlm20.close();
}
@Test(timeout = 60000)
public void testLockStreamSameSubscriber() throws Exception {
String streamName = runtime.getMethodName();
BKDistributedLogManager bkdlm = createNewDLM(conf, streamName);
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1);
BKLogReadHandler readHandler = bkdlm.createReadHandler();
Await.result(readHandler.lockStream());
// same subscrbiers couldn't lock stream in parallel
BKDistributedLogManager bkdlm10 = createNewDLM(conf, streamName);
BKLogReadHandler s10Handler =
bkdlm10.createReadHandler(Optional.of("s1"));
Await.result(s10Handler.lockStream());
BKDistributedLogManager bkdlm11 = createNewDLM(conf, streamName);
BKLogReadHandler s11Handler =
bkdlm11.createReadHandler(Optional.of("s1"));
try {
Await.result(s11Handler.lockStream(), Duration.apply(10000, TimeUnit.MILLISECONDS));
fail("Should fail lock stream using same subscriber id");
} catch (OwnershipAcquireFailedException oafe) {
// expected
} catch (TimeoutException te) {
// expected.
}
readHandler.asyncClose();
bkdlm.close();
s10Handler.asyncClose();
bkdlm10.close();
s11Handler.asyncClose();
bkdlm11.close();
}
}