| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.twitter.distributedlog; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import com.google.common.base.Optional; |
| import com.google.common.collect.Lists; |
| import com.twitter.distributedlog.annotations.DistributedLogAnnotations; |
| import com.twitter.distributedlog.config.ConcurrentBaseConfiguration; |
| import com.twitter.distributedlog.config.ConcurrentConstConfiguration; |
| import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; |
| import com.twitter.distributedlog.exceptions.BKTransmitException; |
| import com.twitter.distributedlog.exceptions.LockingException; |
| import com.twitter.distributedlog.io.CompressionCodec; |
| import com.twitter.distributedlog.util.Utils; |
| import com.twitter.util.Promise; |
| import org.apache.bookkeeper.client.BookKeeper; |
| import org.apache.bookkeeper.client.BookKeeperAccessor; |
| import org.apache.bookkeeper.client.LedgerHandle; |
| import org.apache.bookkeeper.client.LedgerMetadata; |
| import org.apache.bookkeeper.feature.FixedValueFeature; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.junit.Ignore; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.base.Stopwatch; |
| import com.twitter.distributedlog.exceptions.DLIllegalStateException; |
| import com.twitter.distributedlog.exceptions.EndOfStreamException; |
| import com.twitter.distributedlog.exceptions.IdleReaderException; |
| import com.twitter.distributedlog.exceptions.LogRecordTooLongException; |
| import com.twitter.distributedlog.exceptions.OverCapacityException; |
| import com.twitter.distributedlog.exceptions.ReadCancelledException; |
| import com.twitter.distributedlog.exceptions.WriteException; |
| import com.twitter.distributedlog.lock.DistributedLock; |
| import com.twitter.distributedlog.namespace.DistributedLogNamespace; |
| import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; |
| import com.twitter.distributedlog.util.FailpointUtils; |
| import com.twitter.distributedlog.util.FutureUtils; |
| import com.twitter.distributedlog.util.SimplePermitLimiter; |
| import com.twitter.util.Await; |
| import com.twitter.util.Duration; |
| import com.twitter.util.Future; |
| import com.twitter.util.FutureEventListener; |
| |
| import junit.framework.Assert; |
| import static com.google.common.base.Charsets.UTF_8; |
| import static com.twitter.distributedlog.DLMTestUtil.validateFutureFailed; |
| import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; |
| import static org.junit.Assert.*; |
| |
| public class TestAsyncReaderWriter extends TestDistributedLogBase { |
/** Logger shared by the tests and the nested listener classes in this file. */
static final Logger LOG = LoggerFactory.getLogger(TestAsyncReaderWriter.class);

/** Base configuration that each test copies (via {@code loadConf}) before applying its own overrides. */
protected DistributedLogConfiguration testConf;

/**
 * Builds {@link #testConf} as a copy of the {@code conf} visible to this class
 * (presumably inherited from {@code TestDistributedLogBase} -- confirm) with the
 * reader idle error threshold set to 1,200,000 ms.
 */
public TestAsyncReaderWriter() {
    DistributedLogConfiguration base = new DistributedLogConfiguration();
    base.loadConf(conf);
    base.setReaderIdleErrorThresholdMillis(1200000);
    testConf = base;
}

/** JUnit rule exposing the running test method's name; most tests use it as the stream name. */
@Rule
public TestName runtime = new TestName();
| |
| /** |
| * Test writing control records to writers: writers should be able to write control records, and |
| * the readers should skip control records while reading. |
| */ |
| @Test(timeout = 60000) |
| public void testWriteControlRecord() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setOutputBufferSize(1024); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| |
| // Write 3 log segments. For each log segments, write one control record and nine user records. |
| int txid = 1; |
| for (long i = 0; i < 3; i++) { |
| final long currentLogSegmentSeqNo = i + 1; |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| DLSN dlsn = Await.result(writer.writeControlRecord(new LogRecord(txid++, "control".getBytes(UTF_8)))); |
| assertEquals(currentLogSegmentSeqNo, dlsn.getLogSegmentSequenceNo()); |
| assertEquals(0, dlsn.getEntryId()); |
| assertEquals(0, dlsn.getSlotId()); |
| for (long j = 1; j < 10; j++) { |
| final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); |
| Await.result(writer.write(record)); |
| } |
| writer.closeAndComplete(); |
| } |
| dlm.close(); |
| |
| // Read all the written data: It should skip control records and only return user records. |
| DistributedLogManager readDlm = createNewDLM(confLocal, name); |
| LogReader reader = readDlm.getInputStream(1); |
| |
| long numTrans = 0; |
| long expectedTxId = 2; |
| LogRecord record = reader.readNext(false); |
| while (null != record) { |
| DLMTestUtil.verifyLargeLogRecord(record); |
| numTrans++; |
| assertEquals(expectedTxId, record.getTransactionId()); |
| if (expectedTxId % 10 == 0) { |
| expectedTxId += 2; |
| } else { |
| ++expectedTxId; |
| } |
| record = reader.readNext(false); |
| } |
| assertEquals(3 * 9, numTrans); |
| assertEquals(3 * 9, readDlm.getLogRecordCount()); |
| readDlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testAsyncWritePendingWritesAbortedWhenLedgerRollTriggerFails() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setOutputBufferSize(1024); |
| confLocal.setMaxLogSegmentBytes(1024); |
| confLocal.setLogSegmentRollingIntervalMinutes(0); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| |
| // Write one record larger than max seg size. Ledger doesn't roll until next write. |
| int txid = 1; |
| LogRecord record = DLMTestUtil.getLogRecordInstance(txid++, 2048); |
| Future<DLSN> result = writer.write(record); |
| DLSN dlsn = Await.result(result, Duration.fromSeconds(10)); |
| assertEquals(1, dlsn.getLogSegmentSequenceNo()); |
| |
| record = DLMTestUtil.getLogRecordInstance(txid++, MAX_LOGRECORD_SIZE + 1); |
| result = writer.write(record); |
| validateFutureFailed(result, LogRecordTooLongException.class); |
| |
| record = DLMTestUtil.getLogRecordInstance(txid++, MAX_LOGRECORD_SIZE + 1); |
| result = writer.write(record); |
| validateFutureFailed(result, WriteException.class); |
| |
| record = DLMTestUtil.getLogRecordInstance(txid++, MAX_LOGRECORD_SIZE + 1); |
| result = writer.write(record); |
| validateFutureFailed(result, WriteException.class); |
| |
| writer.closeAndComplete(); |
| dlm.close(); |
| } |
| |
| /** |
| * Test Case: Simple Async Writes. Writes 30 records. They should be written correctly. |
| * @throws Exception |
| */ |
| @Test(timeout = 60000) |
| public void testSimpleAsyncWrite() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setOutputBufferSize(1024); |
| |
| int numLogSegments = 3; |
| int numRecordsPerLogSegment = 10; |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| |
| final CountDownLatch syncLatch = new CountDownLatch(numLogSegments * numRecordsPerLogSegment); |
| final AtomicBoolean errorsFound = new AtomicBoolean(false); |
| final AtomicReference<DLSN> maxDLSN = new AtomicReference<DLSN>(DLSN.InvalidDLSN); |
| int txid = 1; |
| for (long i = 0; i < numLogSegments; i++) { |
| final long currentLogSegmentSeqNo = i + 1; |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| for (long j = 0; j < numRecordsPerLogSegment; j++) { |
| final long currentEntryId = j; |
| final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); |
| Future<DLSN> dlsnFuture = writer.write(record); |
| dlsnFuture.addEventListener(new FutureEventListener<DLSN>() { |
| @Override |
| public void onSuccess(DLSN value) { |
| if(value.getLogSegmentSequenceNo() != currentLogSegmentSeqNo) { |
| LOG.debug("LogSegmentSequenceNumber: {}, Expected {}", value.getLogSegmentSequenceNo(), currentLogSegmentSeqNo); |
| errorsFound.set(true); |
| } |
| |
| if(value.getEntryId() != currentEntryId) { |
| LOG.debug("EntryId: {}, Expected {}", value.getEntryId(), currentEntryId); |
| errorsFound.set(true); |
| } |
| |
| if (value.compareTo(maxDLSN.get()) > 0) { |
| maxDLSN.set(value); |
| } |
| |
| syncLatch.countDown(); |
| LOG.debug("SyncLatch: {}", syncLatch.getCount()); |
| } |
| @Override |
| public void onFailure(Throwable cause) { |
| LOG.error("Encountered exception on writing record {} in log segment {}", currentEntryId, currentLogSegmentSeqNo); |
| errorsFound.set(true); |
| } |
| }); |
| } |
| writer.closeAndComplete(); |
| } |
| |
| syncLatch.await(); |
| assertFalse("Should not encounter any errors for async writes", errorsFound.get()); |
| |
| LogRecordWithDLSN last = dlm.getLastLogRecord(); |
| assertEquals("Last DLSN" + last.getDlsn() + " isn't the maximum DLSN " + maxDLSN.get(), |
| last.getDlsn(), maxDLSN.get()); |
| assertEquals(last.getDlsn(), dlm.getLastDLSN()); |
| assertEquals(last.getDlsn(), Await.result(dlm.getLastDLSNAsync())); |
| DLMTestUtil.verifyLargeLogRecord(last); |
| |
| dlm.close(); |
| } |
| |
| /** |
| * Write records into <i>numLogSegments</i> log segments. Each log segment has <i>numRecordsPerLogSegment</i> records. |
| * |
| * @param dlm |
| * distributedlog manager |
| * @param numLogSegments |
| * number of log segments |
| * @param numRecordsPerLogSegment |
| * number records per log segment |
| * @param startTxId |
| * start tx id |
| * @return next tx id |
| */ |
| private static long writeRecords(DistributedLogManager dlm, |
| int numLogSegments, |
| int numRecordsPerLogSegment, |
| long startTxId, |
| boolean emptyRecord) throws IOException { |
| long txid = startTxId; |
| for (long i = 0; i < numLogSegments; i++) { |
| BKSyncLogWriter writer = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= numRecordsPerLogSegment; j++) { |
| if (emptyRecord) { |
| writer.write(DLMTestUtil.getEmptyLogRecordInstance(txid++)); |
| } else { |
| writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++)); |
| } |
| } |
| writer.closeAndComplete(); |
| } |
| return txid; |
| } |
| |
| /** |
| * Write <code>numRecords</code> records to the log, starting with <code>startTxId</code>. |
| * It flushes every <code>flushPerNumRecords</code> records. |
| * |
| * @param dlm |
| * distributedlog manager |
| * @param numRecords |
| * num records to write |
| * @param startTxId |
| * start tx id |
| * @param flushPerNumRecords |
| * number records to flush |
| * @return next tx id |
| * @throws IOException |
| */ |
| private static long writeLogSegment(DistributedLogManager dlm, |
| int numRecords, |
| long startTxId, |
| int flushPerNumRecords, |
| boolean emptyRecord) throws IOException { |
| long txid = startTxId; |
| LogWriter writer = dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= numRecords; j++) { |
| if (emptyRecord) { |
| writer.write(DLMTestUtil.getEmptyLogRecordInstance(txid++)); |
| } else { |
| writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++)); |
| } |
| if (j % flushPerNumRecords == 0 ) { |
| writer.setReadyToFlush(); |
| writer.flushAndSync(); |
| } |
| } |
| writer.setReadyToFlush(); |
| writer.flushAndSync(); |
| writer.close(); |
| return txid; |
| } |
| |
| private static void readNext(final AsyncLogReader reader, |
| final DLSN startPosition, |
| final long startSequenceId, |
| final boolean monotonic, |
| final CountDownLatch syncLatch, |
| final CountDownLatch completionLatch, |
| final AtomicBoolean errorsFound) { |
| Future<LogRecordWithDLSN> record = reader.readNext(); |
| record.addEventListener(new FutureEventListener<LogRecordWithDLSN>() { |
| @Override |
| public void onSuccess(LogRecordWithDLSN value) { |
| try { |
| if (monotonic) { |
| assertEquals(startSequenceId, value.getSequenceId()); |
| } else { |
| assertTrue(value.getSequenceId() < 0); |
| assertTrue(value.getSequenceId() > startSequenceId); |
| } |
| LOG.debug("Recevied record {} from {}", value.getDlsn(), reader.getStreamName()); |
| assertTrue(!value.isControl()); |
| assertTrue(value.getDlsn().getSlotId() == 0); |
| assertTrue(value.getDlsn().compareTo(startPosition) >= 0); |
| DLMTestUtil.verifyLargeLogRecord(value); |
| } catch (Exception exc) { |
| LOG.debug("Exception Encountered when verifying log record {} : ", value.getDlsn(), exc); |
| errorsFound.set(true); |
| completionLatch.countDown(); |
| return; |
| } |
| syncLatch.countDown(); |
| if (syncLatch.getCount() <= 0) { |
| completionLatch.countDown(); |
| } else { |
| TestAsyncReaderWriter.readNext( |
| reader, |
| value.getDlsn().getNextDLSN(), |
| monotonic ? value.getSequenceId() + 1 : value.getSequenceId(), |
| monotonic, |
| syncLatch, |
| completionLatch, |
| errorsFound); |
| } |
| } |
| @Override |
| public void onFailure(Throwable cause) { |
| LOG.debug("Encountered Exception on reading {}", reader.getStreamName(), cause); |
| errorsFound.set(true); |
| completionLatch.countDown(); |
| } |
| }); |
| } |
| |
| void simpleAsyncReadTest(String name, DistributedLogConfiguration confLocal) throws Exception { |
| confLocal.setOutputBufferSize(1024); |
| confLocal.setReadAheadWaitTime(10); |
| confLocal.setReadAheadBatchSize(10); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| |
| int numLogSegments = 3; |
| int numRecordsPerLogSegment = 10; |
| |
| // Write 30 records: 3 log segments, 10 records per log segment |
| long txid = 1L; |
| txid = writeRecords(dlm, numLogSegments, numRecordsPerLogSegment, txid, false); |
| // Write another log segment with 5 records and flush every 2 records |
| txid = writeLogSegment(dlm, 5, txid, 2, false); |
| |
| final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InvalidDLSN); |
| final CountDownLatch syncLatch = new CountDownLatch((int) (txid - 1)); |
| final CountDownLatch completionLatch = new CountDownLatch(1); |
| final AtomicBoolean errorsFound = new AtomicBoolean(false); |
| |
| boolean monotonic = LogSegmentMetadata.supportsSequenceId(confLocal.getDLLedgerMetadataLayoutVersion()); |
| TestAsyncReaderWriter.readNext( |
| reader, |
| DLSN.InvalidDLSN, |
| monotonic ? 0L : Long.MIN_VALUE, |
| monotonic, |
| syncLatch, |
| completionLatch, |
| errorsFound); |
| |
| completionLatch.await(); |
| assertFalse("Errors encountered on reading records", errorsFound.get()); |
| syncLatch.await(); |
| |
| Utils.close(reader); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testSimpleAsyncRead() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| simpleAsyncReadTest(name, confLocal); |
| } |
| |
| @Test(timeout = 60000) |
| public void testSimpleAsyncReadWriteWithMonitoredFuturePool() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setTaskExecutionWarnTimeMicros(1000); |
| confLocal.setEnableTaskExecutionStats(true); |
| simpleAsyncReadTest(name, confLocal); |
| } |
| |
| @Test(timeout = 60000) |
| public void testBulkAsyncRead() throws Exception { |
| String name = "distrlog-bulkasyncread"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(conf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setReadAheadWaitTime(10); |
| confLocal.setReadAheadMaxRecords(10000); |
| confLocal.setReadAheadBatchSize(10); |
| |
| int numLogSegments = 3; |
| int numRecordsPerLogSegment = 20; |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| writeRecords(dlm, numLogSegments, numRecordsPerLogSegment, 1L, false); |
| |
| final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| int expectedTxID = 1; |
| int numReads = 0; |
| while (expectedTxID <= numLogSegments * numRecordsPerLogSegment) { |
| if (expectedTxID == numLogSegments * numRecordsPerLogSegment) { |
| break; |
| } |
| List<LogRecordWithDLSN> records = Await.result(reader.readBulk(20)); |
| LOG.info("Bulk read {} entries.", records.size()); |
| |
| assertTrue(records.size() >= 1); |
| for (LogRecordWithDLSN record : records) { |
| assertEquals(expectedTxID, record.getTransactionId()); |
| ++expectedTxID; |
| } |
| ++numReads; |
| } |
| |
| // we expect bulk read works |
| assertTrue(numReads < 60); |
| |
| Utils.close(reader); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testBulkAsyncReadWithWriteBatch() throws Exception { |
| String name = "distrlog-bulkasyncread-with-writebatch"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(conf); |
| confLocal.setOutputBufferSize(1024000); |
| confLocal.setReadAheadWaitTime(10); |
| confLocal.setReadAheadMaxRecords(10000); |
| confLocal.setReadAheadBatchSize(10); |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| |
| int numLogSegments = 3; |
| int numRecordsPerLogSegment = 20; |
| |
| writeRecords(dlm, numLogSegments, numRecordsPerLogSegment, 1L, false); |
| |
| final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| int expectedTxID = 1; |
| for (long i = 0; i < 3; i++) { |
| // since we batched 20 entries into single bookkeeper entry |
| // we should be able to read 20 entries as a batch. |
| List<LogRecordWithDLSN> records = Await.result(reader.readBulk(20)); |
| assertEquals(20, records.size()); |
| for (LogRecordWithDLSN record : records) { |
| assertEquals(expectedTxID, record.getTransactionId()); |
| ++expectedTxID; |
| } |
| } |
| |
| Utils.close(reader); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testAsyncReadEmptyRecords() throws Exception { |
| String name = "distrlog-simpleasyncreadempty"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setReadAheadWaitTime(10); |
| confLocal.setReadAheadBatchSize(10); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| |
| int numLogSegments = 3; |
| int numRecordsPerLogSegment = 10; |
| |
| long txid = 1L; |
| // write 3 log segments, 10 records per log segment |
| txid = writeRecords(dlm, numLogSegments, numRecordsPerLogSegment, txid, true); |
| // write another log segment with 5 records and flush every 2 records |
| txid = writeLogSegment(dlm, 5, txid, 2, true); |
| |
| AsyncLogReader asyncReader = dlm.getAsyncLogReader(DLSN.InvalidDLSN); |
| assertEquals("Expected stream name = " + name + " but " + asyncReader.getStreamName() + " found", |
| name, asyncReader.getStreamName()); |
| long numTrans = 0; |
| DLSN lastDLSN = DLSN.InvalidDLSN; |
| LogRecordWithDLSN record = Await.result(asyncReader.readNext()); |
| while (null != record) { |
| DLMTestUtil.verifyEmptyLogRecord(record); |
| assertEquals(0, record.getDlsn().getSlotId()); |
| assertTrue(record.getDlsn().compareTo(lastDLSN) > 0); |
| lastDLSN = record.getDlsn(); |
| numTrans++; |
| if (numTrans >= (txid - 1)) { |
| break; |
| } |
| record = Await.result(asyncReader.readNext()); |
| } |
| assertEquals((txid - 1), numTrans); |
| Utils.close(asyncReader); |
| dlm.close(); |
| } |
| |
| /** |
| * Test Async Read by positioning to a given position in the log |
| * @throws Exception |
| */ |
| @Test(timeout = 60000) |
| public void testSimpleAsyncReadPosition() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setOutputBufferSize(1024); |
| confLocal.setReadAheadWaitTime(10); |
| confLocal.setReadAheadBatchSize(10); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| |
| int numLogSegments = 3; |
| int numRecordsPerLogSegment = 10; |
| |
| long txid = 1L; |
| // write 3 log segments, 10 records per log segment |
| txid = writeRecords(dlm, numLogSegments, numRecordsPerLogSegment, txid, false); |
| // write another log segment with 5 records |
| txid = writeLogSegment(dlm, 5, txid, Integer.MAX_VALUE, false); |
| |
| final CountDownLatch syncLatch = new CountDownLatch((int)(txid - 14)); |
| final CountDownLatch doneLatch = new CountDownLatch(1); |
| final AtomicBoolean errorsFound = new AtomicBoolean(false); |
| final AsyncLogReader reader = dlm.getAsyncLogReader(new DLSN(2, 2, 4)); |
| assertEquals(name, reader.getStreamName()); |
| |
| boolean monotonic = LogSegmentMetadata.supportsSequenceId(confLocal.getDLLedgerMetadataLayoutVersion()); |
| TestAsyncReaderWriter.readNext( |
| reader, |
| new DLSN(2, 3, 0), |
| monotonic ? 13L : Long.MIN_VALUE, |
| monotonic, |
| syncLatch, |
| doneLatch, |
| errorsFound); |
| |
| doneLatch.await(); |
| assertFalse("Errors found on reading records", errorsFound.get()); |
| syncLatch.await(); |
| |
| Utils.close(reader); |
| dlm.close(); |
| } |
| |
| /** |
| * Test write/read entries when immediate flush is disabled. |
| * @throws Exception |
| */ |
| @Test(timeout = 60000) |
| public void testSimpleAsyncReadWrite() throws Exception { |
| testSimpleAsyncReadWriteInternal(runtime.getMethodName(), false); |
| } |
| |
| /** |
| * Test write/read entries when immediate flush is enabled. |
| * |
| * @throws Exception |
| */ |
| @Test(timeout = 60000) |
| public void testSimpleAsyncReadWriteImmediateFlush() throws Exception { |
| testSimpleAsyncReadWriteInternal(runtime.getMethodName(), true); |
| } |
| |
| /** |
| * Test if entries written using log segment metadata that doesn't support enveloping |
| * can be read correctly by a reader supporting both. |
| * |
| * NOTE: An older reader cannot read enveloped entry, so we don't have a test case covering |
| * the other scenario. |
| * |
| * @throws Exception |
| */ |
| @Test(timeout = 60000) |
| public void testNoEnvelopeWriterEnvelopeReader() throws Exception { |
| testSimpleAsyncReadWriteInternal(runtime.getMethodName(), true, |
| LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value - 1); |
| } |
| |
| static class WriteFutureEventListener implements FutureEventListener<DLSN> { |
| private final LogRecord record; |
| private final long currentLogSegmentSeqNo; |
| private final long currentEntryId; |
| private final CountDownLatch syncLatch; |
| private final AtomicBoolean errorsFound; |
| private final boolean verifyEntryId; |
| |
| WriteFutureEventListener(LogRecord record, |
| long currentLogSegmentSeqNo, |
| long currentEntryId, |
| CountDownLatch syncLatch, |
| AtomicBoolean errorsFound, |
| boolean verifyEntryId) { |
| this.record = record; |
| this.currentLogSegmentSeqNo = currentLogSegmentSeqNo; |
| this.currentEntryId = currentEntryId; |
| this.syncLatch = syncLatch; |
| this.errorsFound = errorsFound; |
| this.verifyEntryId = verifyEntryId; |
| } |
| |
| /** |
| * Invoked if the computation completes successfully |
| */ |
| @Override |
| public void onSuccess(DLSN value) { |
| if(value.getLogSegmentSequenceNo() != currentLogSegmentSeqNo) { |
| LOG.error("Ledger Seq No: {}, Expected: {}", value.getLogSegmentSequenceNo(), currentLogSegmentSeqNo); |
| errorsFound.set(true); |
| } |
| |
| if(verifyEntryId && value.getEntryId() != currentEntryId) { |
| LOG.error("EntryId: {}, Expected: {}", value.getEntryId(), currentEntryId); |
| errorsFound.set(true); |
| } |
| syncLatch.countDown(); |
| } |
| |
| /** |
| * Invoked if the computation completes unsuccessfully |
| */ |
| @Override |
| public void onFailure(Throwable cause) { |
| LOG.error("Encountered failures on writing record as (lid = {}, eid = {}) :", |
| new Object[]{currentLogSegmentSeqNo, currentEntryId, cause}); |
| errorsFound.set(true); |
| syncLatch.countDown(); |
| } |
| } |
| |
| void testSimpleAsyncReadWriteInternal(String name, boolean immediateFlush) |
| throws Exception { |
| testSimpleAsyncReadWriteInternal(name, immediateFlush, |
| LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION); |
| } |
| |
| void testSimpleAsyncReadWriteInternal(String name, boolean immediateFlush, |
| int logSegmentVersion) throws Exception { |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setReadAheadWaitTime(10); |
| confLocal.setReadAheadBatchSize(10); |
| confLocal.setOutputBufferSize(1024); |
| confLocal.setDLLedgerMetadataLayoutVersion(logSegmentVersion); |
| confLocal.setImmediateFlushEnabled(immediateFlush); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| |
| int numLogSegments = 3; |
| int numRecordsPerLogSegment = 10; |
| |
| final CountDownLatch readLatch = new CountDownLatch(numLogSegments * numRecordsPerLogSegment); |
| final CountDownLatch readDoneLatch = new CountDownLatch(1); |
| final AtomicBoolean readErrors = new AtomicBoolean(false); |
| final CountDownLatch writeLatch = new CountDownLatch(numLogSegments * numRecordsPerLogSegment); |
| final AtomicBoolean writeErrors = new AtomicBoolean(false); |
| final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InvalidDLSN); |
| assertEquals(name, reader.getStreamName()); |
| |
| int txid = 1; |
| for (long i = 0; i < 3; i++) { |
| final long currentLogSegmentSeqNo = i + 1; |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| for (long j = 0; j < 10; j++) { |
| final long currentEntryId = j; |
| final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); |
| Future<DLSN> dlsnFuture = writer.write(record); |
| dlsnFuture.addEventListener(new WriteFutureEventListener( |
| record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true)); |
| if (i == 0 && j == 0) { |
| boolean monotonic = LogSegmentMetadata.supportsSequenceId(logSegmentVersion); |
| TestAsyncReaderWriter.readNext( |
| reader, |
| DLSN.InvalidDLSN, |
| monotonic ? 0L : Long.MIN_VALUE, |
| monotonic, |
| readLatch, |
| readDoneLatch, |
| readErrors); |
| } |
| } |
| writer.closeAndComplete(); |
| } |
| |
| writeLatch.await(); |
| assertFalse("All writes should succeed", writeErrors.get()); |
| |
| readDoneLatch.await(); |
| assertFalse("All reads should succeed", readErrors.get()); |
| readLatch.await(); |
| |
| Utils.close(reader); |
| dlm.close(); |
| } |
| |
| |
| /** |
| * Test Case: starting reading when the streams don't exist. |
| * |
| * @throws Exception |
| */ |
| @Test(timeout = 60000) |
| public void testSimpleAsyncReadWriteStartEmpty() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setReadAheadWaitTime(10); |
| confLocal.setReadAheadBatchSize(10); |
| confLocal.setOutputBufferSize(1024); |
| |
| int numLogSegments = 3; |
| int numRecordsPerLogSegment = 10; |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| |
| final CountDownLatch readerReadyLatch = new CountDownLatch(1); |
| final CountDownLatch readerDoneLatch = new CountDownLatch(1); |
| final CountDownLatch readerSyncLatch = new CountDownLatch(numLogSegments * numRecordsPerLogSegment); |
| |
| final TestReader reader = new TestReader( |
| "test-reader", |
| dlm, |
| DLSN.InitialDLSN, |
| false, |
| 0, |
| readerReadyLatch, |
| readerSyncLatch, |
| readerDoneLatch); |
| |
| reader.start(); |
| |
| // Increase the probability of reader failure and retry |
| Thread.sleep(500); |
| |
| final AtomicBoolean writeErrors = new AtomicBoolean(false); |
| final CountDownLatch writeLatch = new CountDownLatch(30); |
| |
| int txid = 1; |
| for (long i = 0; i < 3; i++) { |
| final long currentLogSegmentSeqNo = i + 1; |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| for (long j = 0; j < 10; j++) { |
| final long currentEntryId = j; |
| final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); |
| Future<DLSN> dlsnFuture = writer.write(record); |
| dlsnFuture.addEventListener(new WriteFutureEventListener( |
| record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true)); |
| } |
| writer.closeAndComplete(); |
| } |
| |
| writeLatch.await(); |
| assertFalse("All writes should succeed", writeErrors.get()); |
| |
| readerDoneLatch.await(); |
| assertFalse("Should not encounter errors during reading", reader.areErrorsFound()); |
| readerSyncLatch.await(); |
| |
| assertTrue("Should position reader at least once", reader.getNumReaderPositions().get() > 1); |
| dlm.close(); |
| } |
| |
| |
| /** |
| * Test Case: starting reading when the streams don't exist. |
| * {@link https://issues.apache.org/jira/browse/DL-42} |
| */ |
| @DistributedLogAnnotations.FlakyTest |
| @Ignore |
| @Test(timeout = 120000) |
| public void testSimpleAsyncReadWriteStartEmptyFactory() throws Exception { |
| int count = 50; |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setReadAheadWaitTime(10); |
| confLocal.setReadAheadBatchSize(10); |
| confLocal.setOutputBufferSize(1024); |
| |
| int numLogSegments = 3; |
| int numRecordsPerLogSegment = 1; |
| |
| URI uri = createDLMURI("/" + name); |
| ensureURICreated(uri); |
| DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() |
| .conf(confLocal).uri(uri).build(); |
| final DistributedLogManager[] dlms = new DistributedLogManager[count]; |
| final TestReader[] readers = new TestReader[count]; |
| final CountDownLatch readyLatch = new CountDownLatch(count); |
| final CountDownLatch[] syncLatches = new CountDownLatch[count]; |
| final CountDownLatch[] readerDoneLatches = new CountDownLatch[count]; |
| for (int s = 0; s < count; s++) { |
| dlms[s] = namespace.openLog(name + String.format("%d", s)); |
| readerDoneLatches[s] = new CountDownLatch(1); |
| syncLatches[s] = new CountDownLatch(numLogSegments * numRecordsPerLogSegment); |
| readers[s] = new TestReader("reader-" + s, |
| dlms[s], DLSN.InitialDLSN, false, 0, readyLatch, syncLatches[s], readerDoneLatches[s]); |
| readers[s].start(); |
| } |
| |
| // wait all readers were positioned at least once |
| readyLatch.await(); |
| |
| final CountDownLatch writeLatch = new CountDownLatch(3 * count); |
| final AtomicBoolean writeErrors = new AtomicBoolean(false); |
| |
| int txid = 1; |
| for (long i = 0; i < 3; i++) { |
| final long currentLogSegmentSeqNo = i + 1; |
| BKAsyncLogWriter[] writers = new BKAsyncLogWriter[count]; |
| for (int s = 0; s < count; s++) { |
| writers[s] = (BKAsyncLogWriter)(dlms[s].startAsyncLogSegmentNonPartitioned()); |
| } |
| for (long j = 0; j < 1; j++) { |
| final long currentEntryId = j; |
| final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); |
| for (int s = 0; s < count; s++) { |
| Future<DLSN> dlsnFuture = writers[s].write(record); |
| dlsnFuture.addEventListener(new WriteFutureEventListener( |
| record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true)); |
| } |
| } |
| for (int s = 0; s < count; s++) { |
| writers[s].closeAndComplete(); |
| } |
| } |
| |
| writeLatch.await(); |
| assertFalse("All writes should succeed", writeErrors.get()); |
| |
| for (int s = 0; s < count; s++) { |
| readerDoneLatches[s].await(); |
| assertFalse("Reader " + s + " should not encounter errors", readers[s].areErrorsFound()); |
| syncLatches[s].await(); |
| assertEquals(numLogSegments * numRecordsPerLogSegment, readers[s].getNumReads().get()); |
| assertTrue("Reader " + s + " should position at least once", readers[s].getNumReaderPositions().get() > 0); |
| } |
| |
| for (int s = 0; s < count; s++) { |
| readers[s].stop(); |
| dlms[s].close(); |
| } |
| } |
| |
| /** |
| * Flaky test fixed: readers need to be added to the pendingReaders |
| * @throws Exception |
| */ |
| @Test(timeout = 300000) |
| public void testSimpleAsyncReadWriteSimulateErrors() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setReadAheadWaitTime(10); |
| confLocal.setReadAheadBatchSize(10); |
| confLocal.setOutputBufferSize(1024); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| |
| int numLogSegments = 20; |
| int numRecordsPerLogSegment = 10; |
| |
| final CountDownLatch doneLatch = new CountDownLatch(1); |
| final CountDownLatch syncLatch = new CountDownLatch(numLogSegments * numRecordsPerLogSegment); |
| |
| TestReader reader = new TestReader( |
| "test-reader", |
| dlm, |
| DLSN.InitialDLSN, |
| true, |
| 0, |
| new CountDownLatch(1), |
| syncLatch, |
| doneLatch); |
| |
| reader.start(); |
| |
| final CountDownLatch writeLatch = new CountDownLatch(200); |
| final AtomicBoolean writeErrors = new AtomicBoolean(false); |
| |
| int txid = 1; |
| for (long i = 0; i < numLogSegments; i++) { |
| final long currentLogSegmentSeqNo = i + 1; |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| for (long j = 0; j < numRecordsPerLogSegment; j++) { |
| final long currentEntryId = j; |
| final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); |
| Future<DLSN> dlsnFuture = writer.write(record); |
| dlsnFuture.addEventListener(new WriteFutureEventListener( |
| record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true)); |
| } |
| writer.closeAndComplete(); |
| } |
| |
| writeLatch.await(); |
| assertFalse("All writes should succeed", writeErrors.get()); |
| |
| doneLatch.await(); |
| assertFalse("Should not encounter errors during reading", reader.areErrorsFound()); |
| syncLatch.await(); |
| |
| assertTrue("Should position reader at least once", reader.getNumReaderPositions().get() > 1); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testSimpleAsyncReadWritePiggyBack() throws Exception { |
| String name = runtime.getMethodName(); |
| |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setEnableReadAhead(true); |
| confLocal.setReadAheadWaitTime(500); |
| confLocal.setReadAheadBatchSize(10); |
| confLocal.setReadAheadMaxRecords(100); |
| confLocal.setOutputBufferSize(1024); |
| confLocal.setPeriodicFlushFrequencyMilliSeconds(100); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| |
| final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InvalidDLSN); |
| |
| int numLogSegments = 3; |
| int numRecordsPerLogSegment = 10; |
| |
| final CountDownLatch readLatch = new CountDownLatch(30); |
| final CountDownLatch readDoneLatch = new CountDownLatch(1); |
| final AtomicBoolean readErrors = new AtomicBoolean(false); |
| final CountDownLatch writeLatch = new CountDownLatch(30); |
| final AtomicBoolean writeErrors = new AtomicBoolean(false); |
| |
| int txid = 1; |
| for (long i = 0; i < numLogSegments; i++) { |
| final long currentLogSegmentSeqNo = i + 1; |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| for (long j = 0; j < numRecordsPerLogSegment; j++) { |
| Thread.sleep(50); |
| final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); |
| Future<DLSN> dlsnFuture = writer.write(record); |
| dlsnFuture.addEventListener(new WriteFutureEventListener( |
| record, currentLogSegmentSeqNo, j, writeLatch, writeErrors, false)); |
| if (i == 0 && j == 0) { |
| boolean monotonic = LogSegmentMetadata.supportsSequenceId(confLocal.getDLLedgerMetadataLayoutVersion()); |
| TestAsyncReaderWriter.readNext( |
| reader, |
| DLSN.InvalidDLSN, |
| monotonic ? 0L : Long.MIN_VALUE, |
| monotonic, |
| readLatch, |
| readDoneLatch, |
| readErrors); |
| } |
| } |
| writer.closeAndComplete(); |
| } |
| |
| writeLatch.await(); |
| assertFalse("All writes should succeed", writeErrors.get()); |
| |
| readDoneLatch.await(); |
| assertFalse("All reads should succeed", readErrors.get()); |
| readLatch.await(); |
| |
| Utils.close(reader); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testCancelReadRequestOnReaderClosed() throws Exception { |
| final String name = "distrlog-cancel-read-requests-on-reader-closed"; |
| |
| DistributedLogManager dlm = createNewDLM(testConf, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| writer.write(DLMTestUtil.getLogRecordInstance(1L)); |
| writer.closeAndComplete(); |
| |
| final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| LogRecordWithDLSN record = Await.result(reader.readNext()); |
| assertEquals(1L, record.getTransactionId()); |
| DLMTestUtil.verifyLogRecord(record); |
| |
| final CountDownLatch readLatch = new CountDownLatch(1); |
| final AtomicBoolean receiveExpectedException = new AtomicBoolean(false); |
| Thread readThread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| Await.result(reader.readNext()); |
| } catch (ReadCancelledException rce) { |
| receiveExpectedException.set(true); |
| } catch (Throwable t) { |
| LOG.error("Receive unexpected exception on reading stream {} : ", name, t); |
| } |
| readLatch.countDown(); |
| } |
| }, "read-thread"); |
| readThread.start(); |
| |
| Thread.sleep(1000); |
| |
| // close reader should cancel the pending read next |
| Utils.close(reader); |
| |
| readLatch.await(); |
| readThread.join(); |
| |
| assertTrue("Read request should be cancelled.", receiveExpectedException.get()); |
| |
| // closed reader should reject any readNext |
| try { |
| Await.result(reader.readNext()); |
| fail("Reader should reject readNext if it is closed."); |
| } catch (ReadCancelledException rce) { |
| // expected |
| } |
| |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testAsyncWriteWithMinDelayBetweenFlushes() throws Exception { |
| String name = "distrlog-asyncwrite-mindelay"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setMinDelayBetweenImmediateFlushMs(100); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| final Thread currentThread = Thread.currentThread(); |
| final int COUNT = 5000; |
| final CountDownLatch syncLatch = new CountDownLatch(COUNT); |
| int txid = 1; |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| Stopwatch executionTime = Stopwatch.createStarted(); |
| for (long i = 0; i < COUNT; i++) { |
| Thread.sleep(1); |
| final LogRecord record = DLMTestUtil.getLogRecordInstance(txid++); |
| Future<DLSN> dlsnFuture = writer.write(record); |
| dlsnFuture.addEventListener(new FutureEventListener<DLSN>() { |
| @Override |
| public void onSuccess(DLSN value) { |
| syncLatch.countDown(); |
| LOG.debug("SyncLatch: {} ; DLSN: {} ", syncLatch.getCount(), value); |
| } |
| @Override |
| public void onFailure(Throwable cause) { |
| currentThread.interrupt(); |
| } |
| }); |
| } |
| |
| boolean success = false; |
| if (!(Thread.interrupted())) { |
| try { |
| success = syncLatch.await(10, TimeUnit.SECONDS); |
| } catch (InterruptedException exc) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| // Abort, not graceful close, since the latter will |
| // flush as well, and may add an entry. |
| writer.abort(); |
| |
| executionTime.stop(); |
| assert(!(Thread.interrupted())); |
| assert(success); |
| |
| LogRecordWithDLSN last = dlm.getLastLogRecord(); |
| LOG.info("Last Entry {}; elapsed time {}", last.getDlsn().getEntryId(), executionTime.elapsed(TimeUnit.MILLISECONDS)); |
| |
| // Regardless of how many records we wrote; the number of BK entries should always be bounded by the min delay. |
| // Since there are two flush processes--data flush and control flush, and since control flush may also end up flushing |
| // data if data is available, the upper bound is 2*(time/min_delay + 1) |
| assertTrue(last.getDlsn().getEntryId() <= ((executionTime.elapsed(TimeUnit.MILLISECONDS) / confLocal.getMinDelayBetweenImmediateFlushMs() + 1))*2); |
| DLMTestUtil.verifyLogRecord(last); |
| |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testAsyncWriteWithMinDelayBetweenFlushesFlushFailure() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setMinDelayBetweenImmediateFlushMs(1); |
| |
| URI uri = createDLMURI("/" + name); |
| ensureURICreated(uri); |
| |
| DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() |
| .conf(confLocal).uri(uri).clientId("gabbagoo").build(); |
| DistributedLogManager dlm = namespace.openLog(name); |
| DistributedLogNamespace namespace1 = DistributedLogNamespaceBuilder.newBuilder() |
| .conf(confLocal).uri(uri).clientId("tortellini").build(); |
| DistributedLogManager dlm1 = namespace1.openLog(name); |
| |
| int txid = 1; |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| |
| // First write succeeds since lock isnt checked until transmit, which is scheduled |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); |
| writer.flushAndCommit(); |
| |
| BKLogSegmentWriter perStreamWriter = writer.getCachedLogWriter(); |
| DistributedLock lock = perStreamWriter.getLock(); |
| FutureUtils.result(lock.asyncClose()); |
| |
| // Get second writer, steal lock |
| BKAsyncLogWriter writer2 = (BKAsyncLogWriter)(dlm1.startAsyncLogSegmentNonPartitioned()); |
| |
| try { |
| // Succeeds, kicks off scheduked flush |
| writer.write(DLMTestUtil.getLogRecordInstance(txid++)); |
| |
| // Succeeds, kicks off scheduled flush |
| Thread.sleep(100); |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); |
| fail("should have thrown"); |
| } catch (LockingException ex) { |
| LOG.debug("caught exception ", ex); |
| } |
| |
| writer.close(); |
| dlm.close(); |
| } |
| |
| public void writeRecordsWithOutstandingWriteLimit(int stream, int global, boolean shouldFail) throws Exception { |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setPerWriterOutstandingWriteLimit(stream); |
| confLocal.setOutstandingWriteLimitDarkmode(false); |
| DistributedLogManager dlm; |
| if (global > -1) { |
| dlm = createNewDLM(confLocal, runtime.getMethodName(), |
| new SimplePermitLimiter(false, global, new NullStatsLogger(), true, new FixedValueFeature("", 0))); |
| } else { |
| dlm = createNewDLM(confLocal, runtime.getMethodName()); |
| } |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(1000); |
| for (int i = 0; i < 1000; i++) { |
| results.add(writer.write(DLMTestUtil.getLogRecordInstance(1L))); |
| } |
| for (Future<DLSN> result : results) { |
| try { |
| Await.result(result); |
| if (shouldFail) { |
| fail("should fail due to no outstanding writes permitted"); |
| } |
| } catch (OverCapacityException ex) { |
| assertTrue(shouldFail); |
| } |
| } |
| writer.closeAndComplete(); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testOutstandingWriteLimitNoLimit() throws Exception { |
| writeRecordsWithOutstandingWriteLimit(-1, -1, false); |
| } |
| |
| @Test(timeout = 60000) |
| public void testOutstandingWriteLimitVeryHighLimit() throws Exception { |
| writeRecordsWithOutstandingWriteLimit(Integer.MAX_VALUE, Integer.MAX_VALUE, false); |
| } |
| |
| @Test(timeout = 60000) |
| public void testOutstandingWriteLimitBlockAllStreamLimit() throws Exception { |
| writeRecordsWithOutstandingWriteLimit(0, Integer.MAX_VALUE, true); |
| } |
| |
| @Test(timeout = 60000) |
| public void testOutstandingWriteLimitBlockAllGlobalLimit() throws Exception { |
| writeRecordsWithOutstandingWriteLimit(Integer.MAX_VALUE, 0, true); |
| } |
| |
| @Test(timeout = 60000) |
| public void testOutstandingWriteLimitBlockAllLimitWithDarkmode() throws Exception { |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setPerWriterOutstandingWriteLimit(0); |
| confLocal.setOutstandingWriteLimitDarkmode(true); |
| DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName()); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(1000); |
| for (int i = 0; i < 1000; i++) { |
| results.add(writer.write(DLMTestUtil.getLogRecordInstance(1L))); |
| } |
| for (Future<DLSN> result : results) { |
| Await.result(result); |
| } |
| writer.closeAndComplete(); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testCloseAndCompleteLogSegmentWhenStreamIsInError() throws Exception { |
| String name = "distrlog-close-and-complete-logsegment-when-stream-is-in-error"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| |
| BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| |
| long txId = 1L; |
| for (int i = 0; i < 5; i++) { |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); |
| } |
| |
| BKLogSegmentWriter logWriter = writer.getCachedLogWriter(); |
| |
| // fence the ledger |
| dlm.getWriterBKC().get().openLedger(logWriter.getLogSegmentId(), |
| BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8)); |
| |
| try { |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); |
| fail("Should fail write to a fenced ledger with BKTransmitException"); |
| } catch (BKTransmitException bkte) { |
| // expected |
| } |
| |
| try { |
| writer.closeAndComplete(); |
| fail("Should fail to complete a log segment when its ledger is fenced"); |
| } catch (BKTransmitException bkte) { |
| // expected |
| } |
| |
| List<LogSegmentMetadata> segments = dlm.getLogSegments(); |
| assertEquals(1, segments.size()); |
| assertTrue(segments.get(0).isInProgress()); |
| |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testCloseAndCompleteLogSegmentWhenCloseFailed() throws Exception { |
| String name = "distrlog-close-and-complete-logsegment-when-close-failed"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| |
| BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| |
| long txId = 1L; |
| for (int i = 0; i < 5; i++) { |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++))); |
| } |
| |
| BKLogSegmentWriter logWriter = writer.getCachedLogWriter(); |
| |
| // fence the ledger |
| dlm.getWriterBKC().get().openLedger(logWriter.getLogSegmentId(), |
| BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8)); |
| |
| try { |
| // insert a write to detect the fencing state, to make test more robust. |
| writer.write(DLMTestUtil.getLogRecordInstance(txId++)); |
| writer.closeAndComplete(); |
| fail("Should fail to complete a log segment when its ledger is fenced"); |
| } catch (IOException ioe) { |
| // expected |
| LOG.error("Failed to close and complete log segment {} : ", logWriter.getFullyQualifiedLogSegment(), ioe); |
| } |
| |
| List<LogSegmentMetadata> segments = dlm.getLogSegments(); |
| assertEquals(1, segments.size()); |
| assertTrue(segments.get(0).isInProgress()); |
| |
| dlm.close(); |
| } |
| |
| private void testAsyncReadIdleErrorInternal(String name, |
| final int idleReaderErrorThreshold, |
| final boolean heartBeatUsingControlRecs, |
| final boolean simulateReaderStall) throws Exception { |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setReadAheadBatchSize(1); |
| confLocal.setReadAheadMaxRecords(1); |
| confLocal.setReaderIdleWarnThresholdMillis(50); |
| confLocal.setReaderIdleErrorThresholdMillis(idleReaderErrorThreshold); |
| final DistributedLogManager dlm = createNewDLM(confLocal, name); |
| final Thread currentThread = Thread.currentThread(); |
| final int segmentSize = 3; |
| final int numSegments = 3; |
| final CountDownLatch latch = new CountDownLatch(1); |
| final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); |
| executor.schedule( |
| new Runnable() { |
| @Override |
| public void run() { |
| try { |
| int txid = 1; |
| for (long i = 0; i < numSegments; i++) { |
| long start = txid; |
| BKSyncLogWriter writer = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= segmentSize; j++) { |
| writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++)); |
| if ((i == 0) && (j == 1)) { |
| latch.countDown(); |
| } |
| } |
| |
| if (heartBeatUsingControlRecs) { |
| // There should be a control record such that |
| // wait time + commit time (BK) < Idle Reader Threshold |
| int threadSleepTime = idleReaderErrorThreshold |
| - 200 // BK commitTime |
| - 100; //safety margin |
| |
| for (int iter = 1; iter <= (2 * idleReaderErrorThreshold / threadSleepTime) ; iter++) { |
| Thread.sleep(threadSleepTime); |
| writer.write(DLMTestUtil.getLargeLogRecordInstance(txid, true)); |
| writer.setReadyToFlush(); |
| } |
| Thread.sleep(threadSleepTime); |
| } |
| |
| writer.closeAndComplete(); |
| if (!heartBeatUsingControlRecs) { |
| Thread.sleep(2 * idleReaderErrorThreshold); |
| } |
| } |
| } catch (Exception exc) { |
| if (!executor.isShutdown()) { |
| currentThread.interrupt(); |
| } |
| } |
| } |
| }, 0, TimeUnit.MILLISECONDS); |
| |
| latch.await(); |
| BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| if (simulateReaderStall) { |
| reader.disableProcessingReadRequests(); |
| } |
| boolean exceptionEncountered = false; |
| int recordCount = 0; |
| try { |
| while (true) { |
| Future<LogRecordWithDLSN> record = reader.readNext(); |
| Await.result(record); |
| recordCount++; |
| |
| if (recordCount >= segmentSize * numSegments) { |
| break; |
| } |
| } |
| } catch (IdleReaderException exc) { |
| exceptionEncountered = true; |
| } |
| |
| if (simulateReaderStall) { |
| assertTrue(exceptionEncountered); |
| } else if (heartBeatUsingControlRecs) { |
| assertFalse(exceptionEncountered); |
| Assert.assertEquals(segmentSize * numSegments, recordCount); |
| } else { |
| assertTrue(exceptionEncountered); |
| Assert.assertEquals(segmentSize, recordCount); |
| } |
| assertFalse(currentThread.isInterrupted()); |
| executor.shutdown(); |
| } |
| |
| @Test(timeout = 10000) |
| public void testAsyncReadIdleControlRecord() throws Exception { |
| String name = "distrlog-async-reader-idle-error-control"; |
| testAsyncReadIdleErrorInternal(name, 500, true, false); |
| } |
| |
| @Test(timeout = 10000) |
| public void testAsyncReadIdleError() throws Exception { |
| String name = "distrlog-async-reader-idle-error"; |
| testAsyncReadIdleErrorInternal(name, 1000, false, false); |
| } |
| |
| @Test(timeout = 10000) |
| public void testAsyncReadIdleError2() throws Exception { |
| String name = "distrlog-async-reader-idle-error-2"; |
| testAsyncReadIdleErrorInternal(name, 1000, true, true); |
| } |
| |
| @Test(timeout = 60000) |
| public void testReleaseLockAfterFailedToRecover() throws Exception { |
| String name = "release-lock-after-failed-to-recover"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(testConf); |
| confLocal.setLockTimeout(0); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setOutputBufferSize(0); |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = |
| (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned()); |
| |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(1L))); |
| writer.abort(); |
| |
| for (int i = 0; i < 2; i++) { |
| FailpointUtils.setFailpoint( |
| FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments, |
| FailpointUtils.FailPointActions.FailPointAction_Throw); |
| |
| try { |
| dlm.startAsyncLogSegmentNonPartitioned(); |
| fail("Should fail during recovering incomplete log segments"); |
| } catch (IOException ioe) { |
| // expected; |
| } finally { |
| FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments); |
| } |
| } |
| |
| writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned()); |
| |
| List<LogSegmentMetadata> segments = dlm.getLogSegments(); |
| assertEquals(1, segments.size()); |
| assertFalse(segments.get(0).isInProgress()); |
| |
| writer.close(); |
| dlm.close(); |
| } |
| |
| @DistributedLogAnnotations.FlakyTest |
| @Ignore |
| @Test(timeout = 60000) |
| public void testAsyncReadMissingZKNotification() throws Exception { |
| String name = "distrlog-async-reader-missing-zk-notification"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setReadAheadBatchSize(1); |
| confLocal.setReadAheadMaxRecords(1); |
| confLocal.setReaderIdleWarnThresholdMillis(100); |
| confLocal.setReaderIdleErrorThresholdMillis(20000); |
| final DistributedLogManager dlm = createNewDLM(confLocal, name); |
| final Thread currentThread = Thread.currentThread(); |
| final int segmentSize = 10; |
| final int numSegments = 3; |
| final CountDownLatch latch = new CountDownLatch(1); |
| final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); |
| executor.schedule( |
| new Runnable() { |
| @Override |
| public void run() { |
| try { |
| int txid = 1; |
| for (long i = 0; i < numSegments; i++) { |
| long start = txid; |
| BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= segmentSize; j++) { |
| writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++)); |
| if ((i == 0) && (j == 1)) { |
| latch.countDown(); |
| } |
| } |
| writer.closeAndComplete(); |
| Thread.sleep(100); |
| } |
| } catch (Exception exc) { |
| if (!executor.isShutdown()) { |
| currentThread.interrupt(); |
| } |
| } |
| } |
| }, 0, TimeUnit.MILLISECONDS); |
| |
| latch.await(); |
| BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN)dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| reader.disableReadAheadZKNotification(); |
| boolean exceptionEncountered = false; |
| int recordCount = 0; |
| try { |
| while (true) { |
| Future<LogRecordWithDLSN> record = reader.readNext(); |
| Await.result(record); |
| recordCount++; |
| |
| if (recordCount >= segmentSize * numSegments) { |
| break; |
| } |
| } |
| } catch (IdleReaderException exc) { |
| exceptionEncountered = true; |
| } |
| assert(!exceptionEncountered); |
| Assert.assertEquals(recordCount, segmentSize * numSegments); |
| assert(!currentThread.isInterrupted()); |
| executor.shutdown(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testGetLastTxId() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); |
| |
| int numRecords = 10; |
| for (int i = 0; i < numRecords; i++) { |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i))); |
| assertEquals("last tx id should become " + i, |
| i, writer.getLastTxId()); |
| } |
| // open a writer to recover the inprogress log segment |
| AsyncLogWriter recoverWriter = dlm.startAsyncLogSegmentNonPartitioned(); |
| assertEquals("recovered last tx id should be " + (numRecords - 1), |
| numRecords - 1, recoverWriter.getLastTxId()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testMaxReadAheadRecords() throws Exception { |
| int maxRecords = 1; |
| int batchSize = 8; |
| int maxAllowedCachedRecords = maxRecords + batchSize - 1; |
| |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(false); |
| confLocal.setPeriodicFlushFrequencyMilliSeconds(Integer.MAX_VALUE); |
| confLocal.setReadAheadMaxRecords(maxRecords); |
| confLocal.setReadAheadBatchSize(batchSize); |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); |
| |
| int numRecords = 40; |
| for (int i = 1; i <= numRecords; i++) { |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i))); |
| assertEquals("last tx id should become " + i, |
| i, writer.getLastTxId()); |
| } |
| LogRecord record = DLMTestUtil.getLogRecordInstance(numRecords); |
| record.setControl(); |
| Await.result(writer.write(record)); |
| |
| BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| record = Await.result(reader.readNext()); |
| LOG.info("Read record {}", record); |
| assertEquals(1L, record.getTransactionId()); |
| |
| assertNotNull(reader.bkLedgerManager.readAheadWorker); |
| assertTrue(reader.bkLedgerManager.readAheadCache.getNumCachedRecords() <= maxAllowedCachedRecords); |
| |
| for (int i = 2; i <= numRecords; i++) { |
| record = Await.result(reader.readNext()); |
| LOG.info("Read record {}", record); |
| assertEquals((long) i, record.getTransactionId()); |
| TimeUnit.MILLISECONDS.sleep(20); |
| int numCachedRecords = reader.bkLedgerManager.readAheadCache.getNumCachedRecords(); |
| assertTrue("Should cache less than " + batchSize + " records but already found " |
| + numCachedRecords + " records when reading " + i + "th record", |
| numCachedRecords <= maxAllowedCachedRecords); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testMarkEndOfStream() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setPeriodicFlushFrequencyMilliSeconds(0); |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); |
| |
| final int NUM_RECORDS = 10; |
| int i = 1; |
| for (; i <= NUM_RECORDS; i++) { |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i))); |
| assertEquals("last tx id should become " + i, |
| i, writer.getLastTxId()); |
| } |
| |
| Await.result(writer.markEndOfStream()); |
| |
| // Multiple end of streams are ok. |
| Await.result(writer.markEndOfStream()); |
| |
| try { |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i))); |
| fail("Should have thrown"); |
| } catch (EndOfStreamException ex) { |
| } |
| |
| BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| LogRecord record = null; |
| for (int j = 0; j < NUM_RECORDS; j++) { |
| record = Await.result(reader.readNext()); |
| assertEquals(j+1, record.getTransactionId()); |
| } |
| |
| try { |
| record = Await.result(reader.readNext()); |
| fail("Should have thrown"); |
| } catch (EndOfStreamException ex) { |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testMarkEndOfStreamAtBeginningOfSegment() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setPeriodicFlushFrequencyMilliSeconds(0); |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); |
| Await.result(writer.markEndOfStream()); |
| try { |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(1))); |
| fail("Should have thrown"); |
| } catch (EndOfStreamException ex) { |
| } |
| |
| BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| try { |
| LogRecord record = Await.result(reader.readNext()); |
| fail("Should have thrown"); |
| } catch (EndOfStreamException ex) { |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testBulkReadWaitingMoreRecords() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(false); |
| confLocal.setPeriodicFlushFrequencyMilliSeconds(0); |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); |
| FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L))); |
| LogRecord controlRecord = DLMTestUtil.getLogRecordInstance(1L); |
| controlRecord.setControl(); |
| FutureUtils.result(writer.write(controlRecord)); |
| |
| BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS); |
| Future<LogRecordWithDLSN> readFuture = reader.readNext(); |
| |
| // write another records |
| for (int i = 0; i < 5; i++) { |
| long txid = 2L + i; |
| FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid))); |
| controlRecord = DLMTestUtil.getLogRecordInstance(txid); |
| controlRecord.setControl(); |
| FutureUtils.result(writer.write(controlRecord)); |
| } |
| |
| List<LogRecordWithDLSN> bulkReadRecords = FutureUtils.result(bulkReadFuture); |
| assertEquals(2, bulkReadRecords.size()); |
| assertEquals(1L, bulkReadRecords.get(0).getTransactionId()); |
| assertEquals(2L, bulkReadRecords.get(1).getTransactionId()); |
| for (LogRecordWithDLSN record : bulkReadRecords) { |
| DLMTestUtil.verifyLogRecord(record); |
| } |
| LogRecordWithDLSN record = FutureUtils.result(readFuture); |
| assertEquals(3L, record.getTransactionId()); |
| DLMTestUtil.verifyLogRecord(record); |
| |
| Utils.close(reader); |
| writer.close(); |
| dlm.close(); |
| } |
| |
| /** |
| * Issues a two-record bulk read with a wait time of 0 ms while only one user record |
| * has been written and checks that it completes with just that record (txid 1). A |
| * single read queued behind it is then expected to yield txid 2 once more records |
| * have been written. |
| */ |
| @Test(timeout = 60000) |
| public void testBulkReadNotWaitingMoreRecords() throws Exception { |
| final String streamName = runtime.getMethodName(); |
| final DistributedLogConfiguration conf = new DistributedLogConfiguration(); |
| conf.addConfiguration(testConf); |
| conf.setOutputBufferSize(0); |
| conf.setImmediateFlushEnabled(false); |
| conf.setPeriodicFlushFrequencyMilliSeconds(0); |
| |
| final DistributedLogManager manager = createNewDLM(conf, streamName); |
| final BKAsyncLogWriter logWriter = (BKAsyncLogWriter) manager.startAsyncLogSegmentNonPartitioned(); |
| |
| // one user record, followed by a control record carrying the same txid |
| FutureUtils.result(logWriter.write(DLMTestUtil.getLogRecordInstance(1L))); |
| final LogRecord firstControl = DLMTestUtil.getLogRecordInstance(1L); |
| firstControl.setControl(); |
| FutureUtils.result(logWriter.write(firstControl)); |
| |
| final BKAsyncLogReaderDLSN logReader = (BKAsyncLogReaderDLSN) manager.getAsyncLogReader(DLSN.InitialDLSN); |
| final Future<List<LogRecordWithDLSN>> pendingBulkRead = |
| logReader.readBulk(2, 0, TimeUnit.MILLISECONDS); |
| final Future<LogRecordWithDLSN> pendingSingleRead = logReader.readNext(); |
| |
| // the bulk read is awaited before anything else is written |
| final List<LogRecordWithDLSN> bulkRecords = FutureUtils.result(pendingBulkRead); |
| assertEquals(1, bulkRecords.size()); |
| assertEquals(1L, bulkRecords.get(0).getTransactionId()); |
| for (int idx = 0; idx < bulkRecords.size(); idx++) { |
| DLMTestUtil.verifyLogRecord(bulkRecords.get(idx)); |
| } |
| |
| // write txids 2..6, each user record followed by its control record |
| for (long txid = 2L; txid < 7L; txid++) { |
| FutureUtils.result(logWriter.write(DLMTestUtil.getLogRecordInstance(txid))); |
| final LogRecord control = DLMTestUtil.getLogRecordInstance(txid); |
| control.setControl(); |
| FutureUtils.result(logWriter.write(control)); |
| } |
| |
| final LogRecordWithDLSN singleRecord = FutureUtils.result(pendingSingleRead); |
| assertEquals(2L, singleRecord.getTransactionId()); |
| DLMTestUtil.verifyLogRecord(singleRecord); |
| |
| Utils.close(logReader); |
| logWriter.close(); |
| manager.close(); |
| } |
| |
| /** |
| * Reads a three-segment log (ten records per segment) with broken-entry injection and |
| * broken-entry skipping enabled and position gap detection disabled, and checks that |
| * 27 records can be read, none of which has an entry id that is a multiple of ten. |
| */ |
| @Test(timeout = 60000) |
| public void testReadBrokenEntries() throws Exception { |
| final String streamName = runtime.getMethodName(); |
| final DistributedLogConfiguration conf = new DistributedLogConfiguration(); |
| conf.loadConf(testConf); |
| |
| conf.setOutputBufferSize(0); |
| conf.setPeriodicFlushFrequencyMilliSeconds(0); |
| conf.setImmediateFlushEnabled(true); |
| conf.setReadAheadWaitTime(10); |
| conf.setReadAheadBatchSize(1); |
| conf.setPositionGapDetectionEnabled(false); |
| conf.setReadAheadSkipBrokenEntries(true); |
| conf.setEIInjectReadAheadBrokenEntries(true); |
| final DistributedLogManager manager = createNewDLM(conf, streamName); |
| |
| final int segmentCount = 3; |
| final int recordsPerSegment = 10; |
| writeRecords(manager, segmentCount, recordsPerSegment, 1L, false); |
| |
| final AsyncLogReader logReader = manager.getAsyncLogReader(DLSN.InvalidDLSN); |
| |
| // With immediate flush and a read-ahead batch size of 1, only the first record of |
| // each ledger is discarded, which leaves 30 - 3 = 27 readable records. |
| final int readableRecords = segmentCount * (recordsPerSegment - 1); |
| for (int read = 0; read < readableRecords; read++) { |
| final LogRecordWithDLSN next = Await.result(logReader.readNext()); |
| final long entryId = next.getDlsn().getEntryId(); |
| assertFalse(entryId % 10 == 0); |
| } |
| |
| Utils.close(logReader); |
| manager.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testReadBrokenEntriesWithGapDetection() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(testConf); |
| |
| confLocal.setOutputBufferSize(0); |
| confLocal.setPeriodicFlushFrequencyMilliSeconds(0); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setReadAheadWaitTime(10); |
| confLocal.setReadAheadBatchSize(1); |
| confLocal.setPositionGapDetectionEnabled(true); |
| confLocal.setReadAheadSkipBrokenEntries(true); |
| confLocal.setEIInjectReadAheadBrokenEntries(true); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| |
| int numLogSegments = 1; |
| int numRecordsPerLogSegment = 100; |
| |
| long txid = 1L; |
| txid = writeRecords(dlm, numLogSegments, numRecordsPerLogSegment, txid, false); |
| |
| AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InvalidDLSN); |
| |
| try { |
| // 3 segments, 10 records each, immediate flush, batch size 1, so just the first |
| // record in each ledger is discarded, for 30 - 3 = 27 records. |
| for (int i = 0; i < 30; i++) { |
| LogRecordWithDLSN record = Await.result(reader.readNext()); |
| assertFalse(record.getDlsn().getEntryId() % 10 == 0); |
| } |
| fail("should have thrown"); |
| } catch (DLIllegalStateException e) { |
| } |
| |
| reader.asyncClose(); |
| dlm.close(); |
| } |
| |
| /** |
| * Reads a single-segment log of 100 records with broken-entry injection and |
| * broken-entry skipping enabled, position gap detection disabled and a read-ahead |
| * batch size of 5, and checks that 50 records can be read, none of which has an |
| * entry id that is a multiple of ten. |
| */ |
| @Test(timeout = 60000) |
| public void testReadBrokenEntriesAndLargeBatchSize() throws Exception { |
| final String streamName = runtime.getMethodName(); |
| final DistributedLogConfiguration conf = new DistributedLogConfiguration(); |
| conf.loadConf(testConf); |
| |
| conf.setOutputBufferSize(0); |
| conf.setPeriodicFlushFrequencyMilliSeconds(0); |
| conf.setImmediateFlushEnabled(true); |
| conf.setReadAheadWaitTime(10); |
| conf.setReadAheadBatchSize(5); |
| conf.setPositionGapDetectionEnabled(false); |
| conf.setReadAheadSkipBrokenEntries(true); |
| conf.setEIInjectReadAheadBrokenEntries(true); |
| final DistributedLogManager manager = createNewDLM(conf, streamName); |
| |
| final int segmentCount = 1; |
| final int recordsPerSegment = 100; |
| writeRecords(manager, segmentCount, recordsPerSegment, 1L, false); |
| |
| final AsyncLogReader logReader = manager.getAsyncLogReader(DLSN.InvalidDLSN); |
| |
| // Every 10th record is broken. Reading 5 at a time, starting from 0: |
| // 1. range 0-4 is corrupted and discarded |
| // 2. ranges 1-5, 2-6, 3-7, 4-8 and 5-9 are fine |
| // 3. ranges 6-10, 7-11, 8-12 and 9-13 are bad |
| // and so on: 5 of every 10 records are discarded, leaving 50 good records. |
| final int goodRecords = 50; |
| for (int read = 0; read < goodRecords; read++) { |
| final LogRecordWithDLSN next = Await.result(logReader.readNext()); |
| final long entryId = next.getDlsn().getEntryId(); |
| assertFalse(entryId % 10 == 0); |
| } |
| |
| Utils.close(logReader); |
| manager.close(); |
| } |
| |
| /** |
| * Reads a three-segment log (five records per segment) with broken-entry injection |
| * and broken-entry skipping enabled, position gap detection disabled and a read-ahead |
| * batch size of 8, and checks that 12 records can be read, none of which has an entry |
| * id that is a multiple of ten. |
| */ |
| @Test(timeout = 60000) |
| public void testReadBrokenEntriesAndLargeBatchSizeCrossSegment() throws Exception { |
| final String streamName = runtime.getMethodName(); |
| final DistributedLogConfiguration conf = new DistributedLogConfiguration(); |
| conf.loadConf(testConf); |
| |
| conf.setOutputBufferSize(0); |
| conf.setPeriodicFlushFrequencyMilliSeconds(0); |
| conf.setImmediateFlushEnabled(true); |
| conf.setReadAheadWaitTime(10); |
| conf.setReadAheadBatchSize(8); |
| conf.setPositionGapDetectionEnabled(false); |
| conf.setReadAheadSkipBrokenEntries(true); |
| conf.setEIInjectReadAheadBrokenEntries(true); |
| final DistributedLogManager manager = createNewDLM(conf, streamName); |
| |
| final int segmentCount = 3; |
| final int recordsPerSegment = 5; |
| writeRecords(manager, segmentCount, recordsPerSegment, 1L, false); |
| |
| final AsyncLogReader logReader = manager.getAsyncLogReader(DLSN.InvalidDLSN); |
| |
| // Every 10th record is broken. Reading 8 at a time, starting from 0: |
| // 1. range 0-7 is corrupted and discarded |
| // 2. range 1-8 is good, but only contains 4 records |
| // The same happens in each following segment: 4 records per segment, 12 in total. |
| final int goodRecords = 12; |
| for (int read = 0; read < goodRecords; read++) { |
| final LogRecordWithDLSN next = Await.result(logReader.readNext()); |
| final long entryId = next.getDlsn().getEntryId(); |
| assertFalse(entryId % 10 == 0); |
| } |
| |
| Utils.close(logReader); |
| manager.close(); |
| } |
| |
| /** |
| * Opens one log with the namespace-wide configuration and a second log with a dynamic |
| * configuration whose ensemble size is one below the default, writes one record to |
| * each, and checks the ensemble size stored in the metadata of the resulting ledger: |
| * the default for the first log, the default minus one for the second. |
| */ |
| @Test(timeout = 60000) |
| public void testCreateLogStreamWithDifferentReplicationFactor() throws Exception { |
| final String baseName = runtime.getMethodName(); |
| final DistributedLogConfiguration conf = new DistributedLogConfiguration(); |
| conf.addConfiguration(testConf); |
| conf.setOutputBufferSize(0); |
| conf.setImmediateFlushEnabled(false); |
| conf.setPeriodicFlushFrequencyMilliSeconds(0); |
| |
| // per-stream override: ensemble size one smaller than the default |
| final ConcurrentBaseConfiguration constConf = new ConcurrentConstConfiguration(conf); |
| final DynamicDistributedLogConfiguration reducedEnsembleConf = |
| new DynamicDistributedLogConfiguration(constConf); |
| reducedEnsembleConf.setProperty( |
| DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE, |
| DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT - 1); |
| |
| final URI namespaceUri = createDLMURI("/" + baseName); |
| ensureURICreated(namespaceUri); |
| final DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() |
| .conf(conf) |
| .uri(namespaceUri) |
| .build(); |
| |
| // first log: opened without any per-stream configuration |
| final DistributedLogManager poolLog = namespace.openLog(baseName + "-pool"); |
| final AsyncLogWriter poolWriter = poolLog.startAsyncLogSegmentNonPartitioned(); |
| FutureUtils.result(poolWriter.write(DLMTestUtil.getLogRecordInstance(1L))); |
| final List<LogSegmentMetadata> poolSegments = poolLog.getLogSegments(); |
| assertEquals(1, poolSegments.size()); |
| final long poolLedgerId = poolSegments.get(0).getLedgerId(); |
| final LedgerHandle poolLedger = ((BKDistributedLogNamespace) namespace).getReaderBKC() |
| .get().openLedgerNoRecovery(poolLedgerId, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8)); |
| final LedgerMetadata poolMetadata = BookKeeperAccessor.getLedgerMetadata(poolLedger); |
| assertEquals(DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT, poolMetadata.getEnsembleSize()); |
| poolLedger.close(); |
| Utils.close(poolWriter); |
| poolLog.close(); |
| |
| // second log: opened with the reduced-ensemble dynamic configuration |
| final DistributedLogManager customLog = namespace.openLog( |
| baseName + "-custom", |
| Optional.<DistributedLogConfiguration>absent(), |
| Optional.of(reducedEnsembleConf)); |
| final AsyncLogWriter customWriter = customLog.startAsyncLogSegmentNonPartitioned(); |
| FutureUtils.result(customWriter.write(DLMTestUtil.getLogRecordInstance(1L))); |
| final List<LogSegmentMetadata> customSegments = customLog.getLogSegments(); |
| assertEquals(1, customSegments.size()); |
| final long customLedgerId = customSegments.get(0).getLedgerId(); |
| final LedgerHandle customLedger = ((BKDistributedLogNamespace) namespace).getReaderBKC() |
| .get().openLedgerNoRecovery(customLedgerId, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8)); |
| final LedgerMetadata customMetadata = BookKeeperAccessor.getLedgerMetadata(customLedger); |
| assertEquals(DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT - 1, customMetadata.getEnsembleSize()); |
| customLedger.close(); |
| Utils.close(customWriter); |
| customLog.close(); |
| |
| namespace.close(); |
| } |
| |
| /** |
| * Writes five plain records (txids 1-5), one record-set record that packs five |
| * payloads (txid 6), and five more plain records (txids 11-15), then verifies: |
| * <ul> |
| * <li>the DLSNs handed back to the writer, including the per-slot DLSNs delivered to |
| * the record-set promises;</li> |
| * <li>a reader that deserializes record sets on read sees 15 records;</li> |
| * <li>a reader that does not deserialize record sets sees 11 records, with the set |
| * surfacing as one record at position 6.</li> |
| * </ul> |
| * All readers, the writer and the log managers are closed at the end so the test does |
| * not leave open handles behind. |
| */ |
| @Test(timeout = 60000) |
| public void testWriteRecordSet() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(false); |
| confLocal.setPeriodicFlushFrequencyMilliSeconds(0); |
| |
| URI uri = createDLMURI("/" + name); |
| ensureURICreated(uri); |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); |
| List<Future<DLSN>> writeFutures = Lists.newArrayList(); |
| // write first 5 records |
| for (int i = 0; i < 5; i++) { |
| LogRecord record = DLMTestUtil.getLogRecordInstance(1L + i); |
| writeFutures.add(writer.write(record)); |
| } |
| List<Future<DLSN>> recordSetFutures = Lists.newArrayList(); |
| // pack another 5 records into a single record set |
| final LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(4096, CompressionCodec.Type.LZ4); |
| for (int i = 0; i < 5; i++) { |
| LogRecord record = DLMTestUtil.getLogRecordInstance(6L + i); |
| Promise<DLSN> writePromise = new Promise<DLSN>(); |
| recordSetWriter.writeRecord(ByteBuffer.wrap(record.getPayload()), writePromise); |
| recordSetFutures.add(writePromise); |
| } |
| // copy the serialized set out of its buffer and write it as one record with txid 6 |
| final ByteBuffer recordSetBuffer = recordSetWriter.getBuffer(); |
| byte[] data = new byte[recordSetBuffer.remaining()]; |
| recordSetBuffer.get(data); |
| LogRecord setRecord = new LogRecord(6L, data); |
| setRecord.setRecordSet(); |
| Future<DLSN> writeRecordSetFuture = writer.write(setRecord); |
| // forward the outcome of the set write to the record-set writer, which holds the |
| // per-record promises collected in recordSetFutures |
| writeRecordSetFuture.addEventListener(new FutureEventListener<DLSN>() { |
| @Override |
| public void onSuccess(DLSN dlsn) { |
| recordSetWriter.completeTransmit( |
| dlsn.getLogSegmentSequenceNo(), |
| dlsn.getEntryId(), |
| dlsn.getSlotId()); |
| } |
| |
| @Override |
| public void onFailure(Throwable cause) { |
| recordSetWriter.abortTransmit(cause); |
| } |
| }); |
| writeFutures.add(writeRecordSetFuture); |
| FutureUtils.result(writeRecordSetFuture); |
| // write last 5 records |
| for (int i = 0; i < 5; i++) { |
| LogRecord record = DLMTestUtil.getLogRecordInstance(11L + i); |
| Future<DLSN> writeFuture = writer.write(record); |
| writeFutures.add(writeFuture); |
| // make sure get log record count returns the right count: |
| // 5 plain records + 5 records inside the set = 10 |
| // NOTE(review): the record just acknowledged is not part of the expected |
| // count; presumably it is not yet visible to getLogRecordCount - confirm. |
| if (i == 0) { |
| FutureUtils.result(writeFuture); |
| assertEquals(10, dlm.getLogRecordCount()); |
| } |
| } |
| |
| List<DLSN> writeResults = FutureUtils.result(Future.collect(writeFutures)); |
| |
| // entries 0-4: first plain records; entry 5: the record set; entries 6-10: the rest |
| for (int i = 0; i < 5; i++) { |
| Assert.assertEquals(new DLSN(1L, i, 0L), writeResults.get(i)); |
| } |
| Assert.assertEquals(new DLSN(1L, 5L, 0L), writeResults.get(5)); |
| for (int i = 0; i < 5; i++) { |
| Assert.assertEquals(new DLSN(1L, 6L + i, 0L), writeResults.get(6 + i)); |
| } |
| // records inside the set share entry 5 and differ only by slot id |
| List<DLSN> recordSetWriteResults = Await.result(Future.collect(recordSetFutures)); |
| for (int i = 0; i < 5; i++) { |
| Assert.assertEquals(new DLSN(1L, 5L, i), recordSetWriteResults.get(i)); |
| } |
| |
| FutureUtils.result(writer.flushAndCommit()); |
| |
| // reader 1: record sets are deserialized, so all 15 records are seen one by one |
| DistributedLogConfiguration readConf1 = new DistributedLogConfiguration(); |
| readConf1.addConfiguration(confLocal); |
| readConf1.setDeserializeRecordSetOnReads(true); |
| |
| DistributedLogManager readDLM1 = createNewDLM(readConf1, name); |
| AsyncLogReader reader1 = readDLM1.getAsyncLogReader(DLSN.InitialDLSN); |
| for (int i = 0; i < 15; i++) { |
| LogRecordWithDLSN record = FutureUtils.result(reader1.readNext()); |
| if (i < 5) { |
| assertEquals(new DLSN(1L, i, 0L), record.getDlsn()); |
| assertEquals(1L + i, record.getTransactionId()); |
| } else if (i >= 10) { |
| assertEquals(new DLSN(1L, 6L + i - 10, 0L), record.getDlsn()); |
| assertEquals(11L + i - 10, record.getTransactionId()); |
| } else { |
| assertEquals(new DLSN(1L, 5L, i - 5), record.getDlsn()); |
| assertEquals(6L, record.getTransactionId()); |
| } |
| assertEquals(i+1, record.getPositionWithinLogSegment()); |
| assertArrayEquals(DLMTestUtil.generatePayload(i+1), record.getPayload()); |
| } |
| Utils.close(reader1); |
| readDLM1.close(); |
| |
| // reader 2: record sets are left as-is, so the set shows up as a single record |
| DistributedLogConfiguration readConf2 = new DistributedLogConfiguration(); |
| readConf2.addConfiguration(confLocal); |
| readConf2.setDeserializeRecordSetOnReads(false); |
| |
| DistributedLogManager readDLM2 = createNewDLM(readConf2, name); |
| AsyncLogReader reader2 = readDLM2.getAsyncLogReader(DLSN.InitialDLSN); |
| for (int i = 0; i < 11; i++) { |
| LogRecordWithDLSN record = FutureUtils.result(reader2.readNext()); |
| LOG.info("Read record {}", record); |
| if (i < 5) { |
| assertEquals(new DLSN(1L, i, 0L), record.getDlsn()); |
| assertEquals(1L + i, record.getTransactionId()); |
| assertEquals(i + 1, record.getPositionWithinLogSegment()); |
| assertArrayEquals(DLMTestUtil.generatePayload(i+1), record.getPayload()); |
| } else if (i >= 6) { |
| assertEquals(new DLSN(1L, 6L + i - 6, 0L), record.getDlsn()); |
| assertEquals(11L + i - 6, record.getTransactionId()); |
| assertEquals(11 + i - 6, record.getPositionWithinLogSegment()); |
| assertArrayEquals(DLMTestUtil.generatePayload(11L + i - 6), record.getPayload()); |
| } else { |
| assertEquals(new DLSN(1L, 5L, 0), record.getDlsn()); |
| assertEquals(6L, record.getTransactionId()); |
| assertEquals(6, record.getPositionWithinLogSegment()); |
| assertTrue(record.isRecordSet()); |
| assertEquals(5, LogRecordSet.numRecords(record)); |
| } |
| } |
| Utils.close(reader2); |
| readDLM2.close(); |
| |
| // the writer is closed last so both readers run against the still-open writer, |
| // exactly as they did before the cleanup was added |
| writer.close(); |
| dlm.close(); |
| } |
| |
| /** |
| * With the periodic keep-alive disabled (0 ms) and the reader idle thresholds set to |
| * 20 ms (warn) and 40 ms (error), a read issued after a single write is expected to |
| * fail with {@code IdleReaderException}. |
| * |
| * <p>The reader, writer and log manager are closed afterwards so the test does not |
| * leave open handles behind. |
| */ |
| @Test(timeout = 60000) |
| public void testIdleReaderExceptionWhenKeepAliveIsDisabled() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(false); |
| confLocal.setPeriodicFlushFrequencyMilliSeconds(0); |
| confLocal.setPeriodicKeepAliveMilliSeconds(0); |
| confLocal.setReaderIdleWarnThresholdMillis(20); |
| confLocal.setReaderIdleErrorThresholdMillis(40); |
| |
| URI uri = createDLMURI("/" + name); |
| ensureURICreated(uri); |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter()); |
| // the write future is deliberately not awaited; the test only cares about what |
| // the reader observes afterwards |
| writer.write(DLMTestUtil.getLogRecordInstance(1L)); |
| |
| AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN)); |
| try { |
| FutureUtils.result(reader.readNext()); |
| fail("Should fail when stream is idle"); |
| } catch (IdleReaderException ire) { |
| // expected |
| } |
| |
| Utils.close(reader); |
| writer.close(); |
| dlm.close(); |
| } |
| |
| /** |
| * With the periodic keep-alive set to 1000 ms and the reader idle thresholds set to |
| * 2000 ms (warn) and 4000 ms (error), a read issued after a single write is expected |
| * to return that record (txid 1) rather than fail. |
| * |
| * <p>The reader, writer and log manager are closed afterwards so the test does not |
| * leave open handles behind. |
| */ |
| @Test(timeout = 60000) |
| public void testIdleReaderExceptionWhenKeepAliveIsEnabled() throws Exception { |
| String name = runtime.getMethodName(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(testConf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setImmediateFlushEnabled(false); |
| confLocal.setPeriodicFlushFrequencyMilliSeconds(0); |
| confLocal.setPeriodicKeepAliveMilliSeconds(1000); |
| confLocal.setReaderIdleWarnThresholdMillis(2000); |
| confLocal.setReaderIdleErrorThresholdMillis(4000); |
| |
| URI uri = createDLMURI("/" + name); |
| ensureURICreated(uri); |
| |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter()); |
| // the write future is deliberately not awaited; the reader below is what proves |
| // the record became readable |
| writer.write(DLMTestUtil.getLogRecordInstance(1L)); |
| |
| AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN)); |
| LogRecordWithDLSN record = FutureUtils.result(reader.readNext()); |
| assertEquals(1L, record.getTransactionId()); |
| DLMTestUtil.verifyLogRecord(record); |
| |
| Utils.close(reader); |
| writer.close(); |
| dlm.close(); |
| } |
| } |