/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog;

import static com.google.common.base.Charsets.UTF_8;
import static org.apache.distributedlog.DLMTestUtil.validateFutureFailed;
import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
import static org.junit.Assert.*;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.Assert;

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAccessor;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.feature.FixedValueFeature;
import org.apache.bookkeeper.stats.NullStatsLogger;

import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.LogWriter;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;

import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.IdleReaderException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.OverCapacityException;
import org.apache.distributedlog.exceptions.ReadCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.distributedlog.lock.DistributedLock;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.SimplePermitLimiter;


import org.apache.distributedlog.util.Utils;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;




/**
 * Test Cases for AsyncReaderWriter.
 */
public class TestAsyncReaderWriter extends TestDistributedLogBase {
    // Logger shared by the tests in this class and by its nested listener classes.
    static final Logger LOG = LoggerFactory.getLogger(TestAsyncReaderWriter.class);

    // Base configuration for this test class. It is populated in the constructor and
    // most test methods load it into a fresh per-test configuration before tweaking it.
    protected DistributedLogConfiguration testConf;

    public TestAsyncReaderWriter() {
        this.testConf = new DistributedLogConfiguration();
        this.testConf.loadConf(conf);
        this.testConf.setReaderIdleErrorThresholdMillis(1200000);
        this.testConf.setReadAheadWaitTimeOnEndOfStream(20);
    }

    // JUnit rule exposing the name of the currently running test method; tests in this
    // class call getMethodName() on it to derive the log stream name.
    @Rule
    public TestName runtime = new TestName();

    /**
     * Verify that control records can be written through the async writer and that readers
     * never surface them: only user records are returned when the log is read back.
     */
    @Test(timeout = 60000)
    public void testWriteControlRecord() throws Exception {
        final String streamName = runtime.getMethodName();
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(1024);

        final int numSegments = 3;
        final int userRecordsPerSegment = 9;

        // Write 3 log segments. Each log segment holds one control record followed by nine user records.
        DistributedLogManager writeDlm = createNewDLM(confLocal, streamName);
        int nextTxId = 1;
        for (long segmentSeqNo = 1; segmentSeqNo <= numSegments; segmentSeqNo++) {
            BKAsyncLogWriter segmentWriter = (BKAsyncLogWriter) (writeDlm.startAsyncLogSegmentNonPartitioned());

            // The control record is the first thing written to the segment, so it must land
            // at entry 0 / slot 0 of that segment.
            LogRecord controlRecord = new LogRecord(nextTxId++, "control".getBytes(UTF_8));
            DLSN controlDlsn = Utils.ioResult(segmentWriter.writeControlRecord(controlRecord));
            assertEquals(segmentSeqNo, controlDlsn.getLogSegmentSequenceNo());
            assertEquals(0, controlDlsn.getEntryId());
            assertEquals(0, controlDlsn.getSlotId());

            for (int written = 0; written < userRecordsPerSegment; written++) {
                final LogRecord userRecord = DLMTestUtil.getLargeLogRecordInstance(nextTxId++);
                Utils.ioResult(segmentWriter.write(userRecord));
            }
            segmentWriter.closeAndComplete();
        }
        writeDlm.close();

        // Read everything back: control records must be skipped, only user records are returned.
        DistributedLogManager readDlm = createNewDLM(confLocal, streamName);
        LogReader reader = readDlm.getInputStream(1);

        long userRecordsRead = 0;
        // Transaction id 1 belongs to the first control record, so the first user record carries id 2.
        long expectedTxId = 2;
        for (LogRecord current = reader.readNext(false); null != current; current = reader.readNext(false)) {
            DLMTestUtil.verifyLargeLogRecord(current);
            userRecordsRead++;
            assertEquals(expectedTxId, current.getTransactionId());
            // An id that is a multiple of ten is the last user record of a segment; the id right
            // after it was consumed by the next segment's control record and must be jumped over.
            expectedTxId += (expectedTxId % 10 == 0) ? 2 : 1;
        }
        reader.close();

        assertEquals(numSegments * userRecordsPerSegment, userRecordsRead);
        assertEquals(numSegments * userRecordsPerSegment, readDlm.getLogRecordCount());
        readDlm.close();
    }

    /**
     * Writes one record larger than the configured max log segment size, then submits records
     * larger than {@code MAX_LOGRECORD_SIZE}. The first oversized write is expected to fail with
     * {@link LogRecordTooLongException}; the two writes issued after it are expected to fail
     * with {@link WriteException}.
     */
    @Test(timeout = 60000)
    public void testAsyncWritePendingWritesAbortedWhenLedgerRollTriggerFails() throws Exception {
        final String streamName = runtime.getMethodName();
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(1024);
        confLocal.setMaxLogSegmentBytes(1024);
        confLocal.setLogSegmentRollingIntervalMinutes(0);
        final DistributedLogManager dlm = createNewDLM(confLocal, streamName);
        final BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());

        int txid = 1;

        // Write one record larger than max seg size. Ledger doesn't roll until next write.
        CompletableFuture<DLSN> firstWrite = writer.write(DLMTestUtil.getLogRecordInstance(txid++, 2048));
        DLSN firstDlsn = Utils.ioResult(firstWrite, 10, TimeUnit.SECONDS);
        assertEquals(1, firstDlsn.getLogSegmentSequenceNo());

        // The first record exceeding the maximum record size is rejected as too long.
        CompletableFuture<DLSN> tooLongWrite =
                writer.write(DLMTestUtil.getLogRecordInstance(txid++, MAX_LOGRECORD_SIZE + 1));
        validateFutureFailed(tooLongWrite, LogRecordTooLongException.class);

        // The next two writes of the same size are expected to surface a WriteException instead.
        for (int attempt = 0; attempt < 2; attempt++) {
            CompletableFuture<DLSN> rejectedWrite =
                    writer.write(DLMTestUtil.getLogRecordInstance(txid++, MAX_LOGRECORD_SIZE + 1));
            validateFutureFailed(rejectedWrite, WriteException.class);
        }

        writer.closeAndComplete();
        dlm.close();
    }

    /**
     * Test Case: Simple Async Writes. Writes 30 records (3 log segments, 10 records each).
     * Every write must be acknowledged with the expected log segment sequence number and
     * entry id, and the last record of the log must carry the maximum acknowledged DLSN.
     *
     * @throws Exception on any failure
     */
    @Test(timeout = 60000)
    public void testSimpleAsyncWrite() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(1024);

        int numLogSegments = 3;
        int numRecordsPerLogSegment = 10;

        DistributedLogManager dlm = createNewDLM(confLocal, name);

        // One count per write; released on success AND on failure so the await below cannot hang.
        final CountDownLatch syncLatch = new CountDownLatch(numLogSegments * numRecordsPerLogSegment);
        final AtomicBoolean errorsFound = new AtomicBoolean(false);
        final AtomicReference<DLSN> maxDLSN = new AtomicReference<DLSN>(DLSN.InvalidDLSN);
        int txid = 1;
        for (long i = 0; i < numLogSegments; i++) {
            final long currentLogSegmentSeqNo = i + 1;
            BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());
            for (long j = 0; j < numRecordsPerLogSegment; j++) {
                final long currentEntryId = j;
                final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
                CompletableFuture<DLSN> dlsnFuture = writer.write(record);
                dlsnFuture.whenComplete(new FutureEventListener<DLSN>() {
                    @Override
                    public void onSuccess(DLSN value) {
                        if (value.getLogSegmentSequenceNo() != currentLogSegmentSeqNo) {
                            LOG.debug("LogSegmentSequenceNumber: {}, Expected {}",
                                    value.getLogSegmentSequenceNo(), currentLogSegmentSeqNo);
                            errorsFound.set(true);
                        }

                        if (value.getEntryId() != currentEntryId) {
                            LOG.debug("EntryId: {}, Expected {}", value.getEntryId(), currentEntryId);
                            errorsFound.set(true);
                        }

                        // Track the largest DLSN acknowledged so far.
                        if (value.compareTo(maxDLSN.get()) > 0) {
                            maxDLSN.set(value);
                        }

                        syncLatch.countDown();
                        LOG.debug("SyncLatch: {}", syncLatch.getCount());
                    }
                    @Override
                    public void onFailure(Throwable cause) {
                        LOG.error("Encountered exception on writing record {} in log segment {}",
                                new Object[]{currentEntryId, currentLogSegmentSeqNo, cause});
                        errorsFound.set(true);
                        // Release the latch for failed writes too; otherwise syncLatch.await() would
                        // block until the test timeout instead of failing fast on errorsFound.
                        syncLatch.countDown();
                    }
                });
            }
            writer.closeAndComplete();
        }

        syncLatch.await();
        assertFalse("Should not encounter any errors for async writes", errorsFound.get());

        LogRecordWithDLSN last = dlm.getLastLogRecord();
        // Expected value first, actual second, so a failure report reads correctly.
        assertEquals("Last DLSN" + last.getDlsn() + " isn't the maximum DLSN " + maxDLSN.get(),
                maxDLSN.get(), last.getDlsn());
        assertEquals(last.getDlsn(), dlm.getLastDLSN());
        assertEquals(last.getDlsn(), Utils.ioResult(dlm.getLastDLSNAsync()));
        DLMTestUtil.verifyLargeLogRecord(last);

        dlm.close();
    }

    /**
     * Write records into <i>numLogSegments</i> log segments using the synchronous writer.
     * Each log segment receives <i>numRecordsPerLogSegment</i> records and is closed and
     * completed before the next one is started. Transaction ids are assigned consecutively.
     *
     * @param dlm
     *          distributedlog manager
     * @param numLogSegments
     *          number of log segments
     * @param numRecordsPerLogSegment
     *          number records per log segment
     * @param startTxId
     *          start tx id
     * @param emptyRecord
     *          when true, records come from {@code DLMTestUtil.getEmptyLogRecordInstance};
     *          otherwise from {@code DLMTestUtil.getLargeLogRecordInstance}
     * @return next tx id
     * @throws IOException
     *          if starting a log segment, writing or closing fails
     */
    private static long writeRecords(DistributedLogManager dlm,
                                     int numLogSegments,
                                     int numRecordsPerLogSegment,
                                     long startTxId,
                                     boolean emptyRecord) throws IOException {
        long nextTxId = startTxId;
        for (int segment = 0; segment < numLogSegments; segment++) {
            final BKSyncLogWriter segmentWriter = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
            for (int written = 0; written < numRecordsPerLogSegment; written++) {
                final long recordTxId = nextTxId++;
                final LogRecord record = emptyRecord
                        ? DLMTestUtil.getEmptyLogRecordInstance(recordTxId)
                        : DLMTestUtil.getLargeLogRecordInstance(recordTxId);
                segmentWriter.write(record);
            }
            segmentWriter.closeAndComplete();
        }
        return nextTxId;
    }

    /**
     * Write <code>numRecords</code> records to a single log segment, starting with
     * <code>startTxId</code>. The writer is flushed and committed after every
     * <code>flushPerNumRecords</code> records and once more after the last record; it is then
     * closed with {@code close()}.
     *
     * @param dlm
     *          distributedlog manager
     * @param numRecords
     *          num records to write
     * @param startTxId
     *          start tx id
     * @param flushPerNumRecords
     *          number records to flush; used as a modulus, so it must not be zero
     * @param emptyRecord
     *          when true, records come from {@code DLMTestUtil.getEmptyLogRecordInstance};
     *          otherwise from {@code DLMTestUtil.getLargeLogRecordInstance}
     * @return next tx id
     * @throws IOException
     *          if starting the log segment, writing, flushing, committing or closing fails
     */
    private static long writeLogSegment(DistributedLogManager dlm,
                                        int numRecords,
                                        long startTxId,
                                        int flushPerNumRecords,
                                        boolean emptyRecord) throws IOException {
        long nextTxId = startTxId;
        final LogWriter segmentWriter = dlm.startLogSegmentNonPartitioned();

        long writtenCount = 0;
        while (writtenCount < numRecords) {
            final long recordTxId = nextTxId++;
            final LogRecord record = emptyRecord
                    ? DLMTestUtil.getEmptyLogRecordInstance(recordTxId)
                    : DLMTestUtil.getLargeLogRecordInstance(recordTxId);
            segmentWriter.write(record);
            writtenCount++;

            // Periodic flush point: every flushPerNumRecords-th record.
            final boolean atFlushPoint = (writtenCount % flushPerNumRecords) == 0;
            if (atFlushPoint) {
                segmentWriter.flush();
                segmentWriter.commit();
            }
        }

        // Final flush/commit so that any records written since the last flush point are covered.
        segmentWriter.flush();
        segmentWriter.commit();
        segmentWriter.close();
        return nextTxId;
    }

    /**
     * Asynchronously reads one record from <code>reader</code>, verifies it, and re-arms itself for
     * the following record until <code>syncLatch</code> reaches zero.
     *
     * <p>Each verified record counts <code>syncLatch</code> down by one. <code>completionLatch</code>
     * is counted down exactly once per chain: when <code>syncLatch</code> reaches zero, when the
     * read fails, or when verification of a record fails. In the two failure cases
     * <code>errorsFound</code> is set to true and the chain stops.
     *
     * @param reader
     *          async reader to read from
     * @param startPosition
     *          the returned record's DLSN must be greater than or equal to this position
     * @param startSequenceId
     *          when <code>monotonic</code>, the exact sequence id expected for the next record;
     *          otherwise a value the record's (negative) sequence id must exceed
     * @param monotonic
     *          whether sequence ids are expected to increase by one per record
     * @param syncLatch
     *          counted down once per successfully verified record
     * @param completionLatch
     *          counted down when the read chain finishes, successfully or not
     * @param errorsFound
     *          set to true when a read or a verification fails
     */
    private static void readNext(final AsyncLogReader reader,
                                 final DLSN startPosition,
                                 final long startSequenceId,
                                 final boolean monotonic,
                                 final CountDownLatch syncLatch,
                                 final CountDownLatch completionLatch,
                                 final AtomicBoolean errorsFound) {
        CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
        record.whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
            @Override
            public void onSuccess(LogRecordWithDLSN value) {
                try {
                    if (monotonic) {
                        assertEquals(startSequenceId, value.getSequenceId());
                    } else {
                        assertTrue(value.getSequenceId() < 0);
                        assertTrue(value.getSequenceId() > startSequenceId);
                    }
                    LOG.info("Received record {} from {}", value, reader.getStreamName());
                    assertFalse(value.isControl());
                    assertEquals(0, value.getDlsn().getSlotId());
                    assertTrue(value.getDlsn().compareTo(startPosition) >= 0);
                    DLMTestUtil.verifyLargeLogRecord(value);
                } catch (Exception | AssertionError exc) {
                    // JUnit assertions throw AssertionError, which is an Error rather than an
                    // Exception. It has to be caught here as well: otherwise a failed assertion
                    // escapes the callback, completionLatch is never released and the waiting
                    // test blocks until its timeout instead of reporting the failure.
                    LOG.error("Exception Encountered when verifying log record {} : ", value.getDlsn(), exc);
                    errorsFound.set(true);
                    completionLatch.countDown();
                    return;
                }
                syncLatch.countDown();
                if (syncLatch.getCount() <= 0) {
                    // All expected records were read and verified.
                    completionLatch.countDown();
                } else {
                    // Chain the next read, advancing the expected position and sequence id.
                    TestAsyncReaderWriter.readNext(
                            reader,
                            value.getDlsn().getNextDLSN(),
                            monotonic ? value.getSequenceId() + 1 : value.getSequenceId(),
                            monotonic,
                            syncLatch,
                            completionLatch,
                            errorsFound);
                }
            }
            @Override
            public void onFailure(Throwable cause) {
                LOG.error("Encountered Exception on reading {}", reader.getStreamName(), cause);
                errorsFound.set(true);
                completionLatch.countDown();
            }
        });
    }

    /**
     * Shared body of the simple async read tests: writes 3 log segments of 10 records plus one
     * more segment of 5 records (flushed every 2 records), then reads all of them back through
     * an async reader and checks that no read or verification error was reported.
     *
     * @param name
     *          stream name
     * @param confLocal
     *          configuration to use; its output buffer size and read-ahead settings are
     *          overridden by this method
     * @throws Exception on any failure
     */
    void simpleAsyncReadTest(String name, DistributedLogConfiguration confLocal) throws Exception {
        confLocal.setOutputBufferSize(1024);
        confLocal.setReadAheadWaitTime(10);
        confLocal.setReadAheadBatchSize(10);
        final DistributedLogManager dlm = createNewDLM(confLocal, name);

        final int numLogSegments = 3;
        final int numRecordsPerLogSegment = 10;

        // Write 30 records: 3 log segments, 10 records per log segment
        final long txIdAfterSegments = writeRecords(dlm, numLogSegments, numRecordsPerLogSegment, 1L, false);
        // Write another log segment with 5 records and flush every 2 records
        final long nextTxId = writeLogSegment(dlm, 5, txIdAfterSegments, 2, false);

        // Transaction ids started at 1, so (nextTxId - 1) records were written in total.
        final int totalRecords = (int) (nextTxId - 1);

        final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InvalidDLSN);
        final CountDownLatch recordsLatch = new CountDownLatch(totalRecords);
        final CountDownLatch doneLatch = new CountDownLatch(1);
        final AtomicBoolean readErrors = new AtomicBoolean(false);

        final boolean monotonic =
                LogSegmentMetadata.supportsSequenceId(confLocal.getDLLedgerMetadataLayoutVersion());
        final long firstSequenceId = monotonic ? 0L : Long.MIN_VALUE;
        TestAsyncReaderWriter.readNext(
                reader,
                DLSN.InvalidDLSN,
                firstSequenceId,
                monotonic,
                recordsLatch,
                doneLatch,
                readErrors);

        // doneLatch is released on both success and failure; check the error flag before
        // waiting on the per-record latch, which only completes on success.
        doneLatch.await();
        assertFalse("Errors encountered on reading records", readErrors.get());
        recordsLatch.await();

        Utils.close(reader);
        dlm.close();
    }

    /**
     * Runs {@code simpleAsyncReadTest} with a fresh copy of the base test configuration.
     */
    @Test(timeout = 60000)
    public void testSimpleAsyncRead() throws Exception {
        final DistributedLogConfiguration readConf = new DistributedLogConfiguration();
        readConf.loadConf(testConf);
        simpleAsyncReadTest(runtime.getMethodName(), readConf);
    }

    /**
     * Runs {@code simpleAsyncReadTest} with task execution stats enabled and a task execution
     * warn time of 1000 microseconds.
     */
    @Test(timeout = 60000)
    public void testSimpleAsyncReadWriteWithMonitoredFuturePool() throws Exception {
        final DistributedLogConfiguration monitoredConf = new DistributedLogConfiguration();
        monitoredConf.loadConf(testConf);
        monitoredConf.setTaskExecutionWarnTimeMicros(1000);
        monitoredConf.setEnableTaskExecutionStats(true);
        simpleAsyncReadTest(runtime.getMethodName(), monitoredConf);
    }

    /**
     * Writes 3 log segments of 20 records with immediate flush enabled and reads them back with
     * {@code readBulk(20)}, checking that transaction ids come back in order and that every bulk
     * read returns at least one record.
     */
    @Test(timeout = 60000)
    public void testBulkAsyncRead() throws Exception {
        final String streamName = "distrlog-bulkasyncread";
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setReadAheadWaitTime(10);
        confLocal.setReadAheadMaxRecords(10000);
        confLocal.setReadAheadBatchSize(10);

        final int numLogSegments = 3;
        final int numRecordsPerLogSegment = 20;
        final int totalRecords = numLogSegments * numRecordsPerLogSegment;

        final DistributedLogManager dlm = createNewDLM(confLocal, streamName);
        writeRecords(dlm, numLogSegments, numRecordsPerLogSegment, 1L, false);

        final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN);
        int expectedTxID = 1;
        int numReads = 0;
        // NOTE(review): reading stops as soon as the next expected id reaches totalRecords, i.e. once
        // (totalRecords - 1) records have been consumed; the final record is not required to be read.
        while (expectedTxID < totalRecords) {
            final List<LogRecordWithDLSN> batch = Utils.ioResult(reader.readBulk(20));
            LOG.info("Bulk read {} entries.", batch.size());

            assertTrue(batch.size() >= 1);
            for (LogRecordWithDLSN readRecord : batch) {
                assertEquals(expectedTxID, readRecord.getTransactionId());
                ++expectedTxID;
            }
            ++numReads;
        }

        // we expect bulk read works
        // NOTE(review): because the loop above ends after at most 59 records and each read returns
        // at least one record, numReads can never reach 60, so this assertion cannot fail as written.
        assertTrue(numReads < 60);

        Utils.close(reader);
        dlm.close();
    }

    /**
     * Writes 3 log segments of 20 records with a large output buffer (1024000) and expects each
     * {@code readBulk(20)} call to return exactly 20 records with consecutive transaction ids.
     */
    @Test(timeout = 60000)
    public void testBulkAsyncReadWithWriteBatch() throws Exception {
        final String streamName = "distrlog-bulkasyncread-with-writebatch";
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setOutputBufferSize(1024000);
        confLocal.setReadAheadWaitTime(10);
        confLocal.setReadAheadMaxRecords(10000);
        confLocal.setReadAheadBatchSize(10);

        final DistributedLogManager dlm = createNewDLM(confLocal, streamName);

        final int numLogSegments = 3;
        final int numRecordsPerLogSegment = 20;

        writeRecords(dlm, numLogSegments, numRecordsPerLogSegment, 1L, false);

        final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN);
        int expectedTxID = 1;
        for (int segment = 0; segment < numLogSegments; segment++) {
            // since we batched 20 entries into single bookkeeper entry
            // we should be able to read 20 entries as a batch.
            final List<LogRecordWithDLSN> batch = Utils.ioResult(reader.readBulk(20));
            assertEquals(numRecordsPerLogSegment, batch.size());
            for (LogRecordWithDLSN readRecord : batch) {
                assertEquals(expectedTxID, readRecord.getTransactionId());
                ++expectedTxID;
            }
        }

        Utils.close(reader);
        dlm.close();
    }

    /**
     * Writes empty records (3 log segments of 10 records plus one segment of 5 records flushed
     * every 2 records) and reads them back one by one through an async reader, checking that
     * each record is empty, sits at slot 0 and has a DLSN strictly greater than the previous one.
     */
    @Test(timeout = 60000)
    public void testAsyncReadEmptyRecords() throws Exception {
        final String name = "distrlog-simpleasyncreadempty";
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setReadAheadWaitTime(10);
        confLocal.setReadAheadBatchSize(10);
        final DistributedLogManager dlm = createNewDLM(confLocal, name);

        final int numLogSegments = 3;
        final int numRecordsPerLogSegment = 10;

        // write 3 log segments, 10 records per log segment
        final long txIdAfterSegments = writeRecords(dlm, numLogSegments, numRecordsPerLogSegment, 1L, true);
        // write another log segment with 5 records and flush every 2 records
        final long nextTxId = writeLogSegment(dlm, 5, txIdAfterSegments, 2, true);
        // Transaction ids started at 1, so (nextTxId - 1) records were written.
        final long expectedRecords = nextTxId - 1;

        final AsyncLogReader asyncReader = dlm.getAsyncLogReader(DLSN.InvalidDLSN);
        assertEquals("Expected stream name = " + name + " but " + asyncReader.getStreamName() + " found",
                name, asyncReader.getStreamName());

        long recordsRead = 0;
        DLSN previousDlsn = DLSN.InvalidDLSN;
        LogRecordWithDLSN current = Utils.ioResult(asyncReader.readNext());
        while (current != null) {
            DLMTestUtil.verifyEmptyLogRecord(current);
            final DLSN currentDlsn = current.getDlsn();
            assertEquals(0, currentDlsn.getSlotId());
            assertTrue(currentDlsn.compareTo(previousDlsn) > 0);
            previousDlsn = current.getDlsn();
            recordsRead++;
            // Stop before issuing another read once everything that was written has been seen.
            if (recordsRead >= expectedRecords) {
                break;
            }
            current = Utils.ioResult(asyncReader.readNext());
        }
        assertEquals(expectedRecords, recordsRead);

        Utils.close(asyncReader);
        dlm.close();
    }

    /**
     * Test async reads that start from a given position in the log rather than from its
     * beginning: the reader is opened at DLSN (2, 2, 4) and the first record is verified
     * against position (2, 3, 0).
     *
     * @throws Exception on any failure
     */
    @Test(timeout = 60000)
    public void testSimpleAsyncReadPosition() throws Exception {
        final String streamName = runtime.getMethodName();
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(1024);
        confLocal.setReadAheadWaitTime(10);
        confLocal.setReadAheadBatchSize(10);
        final DistributedLogManager dlm = createNewDLM(confLocal, streamName);

        final int numLogSegments = 3;
        final int numRecordsPerLogSegment = 10;

        // write 3 log segments, 10 records per log segment
        final long txIdAfterSegments = writeRecords(dlm, numLogSegments, numRecordsPerLogSegment, 1L, false);
        // write another log segment with 5 records
        final long nextTxId = writeLogSegment(dlm, 5, txIdAfterSegments, Integer.MAX_VALUE, false);

        // The reader is expected to deliver (nextTxId - 14) records from the requested position.
        final int expectedRecords = (int) (nextTxId - 14);
        final CountDownLatch recordsLatch = new CountDownLatch(expectedRecords);
        final CountDownLatch doneLatch = new CountDownLatch(1);
        final AtomicBoolean readErrors = new AtomicBoolean(false);

        final DLSN openPosition = new DLSN(2, 2, 4);
        final AsyncLogReader reader = dlm.getAsyncLogReader(openPosition);
        assertEquals(streamName, reader.getStreamName());

        final boolean monotonic =
                LogSegmentMetadata.supportsSequenceId(confLocal.getDLLedgerMetadataLayoutVersion());
        final DLSN firstExpectedPosition = new DLSN(2, 3, 0);
        final long firstSequenceId = monotonic ? 13L : Long.MIN_VALUE;
        TestAsyncReaderWriter.readNext(
                reader,
                firstExpectedPosition,
                firstSequenceId,
                monotonic,
                recordsLatch,
                doneLatch,
                readErrors);

        // doneLatch is released on both success and failure, so check the error flag first.
        doneLatch.await();
        assertFalse("Errors found on reading records", readErrors.get());
        recordsLatch.await();

        Utils.close(reader);
        dlm.close();
    }

    /**
     * Test concurrent write/read of entries with immediate flush disabled.
     *
     * @throws Exception on any failure
     */
    @Test(timeout = 60000)
    public void testSimpleAsyncReadWrite() throws Exception {
        final boolean immediateFlush = false;
        testSimpleAsyncReadWriteInternal(runtime.getMethodName(), immediateFlush);
    }

    /**
     * Test concurrent write/read of entries with immediate flush enabled.
     *
     * @throws Exception on any failure
     */
    @Test(timeout = 60000)
    public void testSimpleAsyncReadWriteImmediateFlush() throws Exception {
        final boolean immediateFlush = true;
        testSimpleAsyncReadWriteInternal(runtime.getMethodName(), immediateFlush);
    }

    /**
     * Test that entries written with a log segment metadata version that predates enveloped
     * entries can be read correctly by a reader supporting both formats.
     *
     * <p>NOTE: An older reader cannot read enveloped entries, so there is no test case covering
     * the opposite scenario.
     *
     * @throws Exception on any failure
     */
    @Test(timeout = 60000)
    public void testNoEnvelopeWriterEnvelopeReader() throws Exception {
        // The layout version immediately preceding the one that introduced enveloped entries.
        final int preEnvelopeVersion =
                LogSegmentMetadata.LogSegmentMetadataVersion.VERSION_V4_ENVELOPED_ENTRIES.value - 1;
        testSimpleAsyncReadWriteInternal(runtime.getMethodName(), true, preEnvelopeVersion);
    }

    static class WriteFutureEventListener implements FutureEventListener<DLSN> {
        private final LogRecord record;
        private final long currentLogSegmentSeqNo;
        private final long currentEntryId;
        private final CountDownLatch syncLatch;
        private final AtomicBoolean errorsFound;
        private final boolean verifyEntryId;

        WriteFutureEventListener(LogRecord record,
                                 long currentLogSegmentSeqNo,
                                 long currentEntryId,
                                 CountDownLatch syncLatch,
                                 AtomicBoolean errorsFound,
                                 boolean verifyEntryId) {
            this.record = record;
            this.currentLogSegmentSeqNo = currentLogSegmentSeqNo;
            this.currentEntryId = currentEntryId;
            this.syncLatch = syncLatch;
            this.errorsFound = errorsFound;
            this.verifyEntryId = verifyEntryId;
        }

        /**
         * Invoked if the computation completes successfully.
         */
        @Override
        public void onSuccess(DLSN value) {
            if (value.getLogSegmentSequenceNo() != currentLogSegmentSeqNo) {
                LOG.error("Ledger Seq No: {}, Expected: {}", value.getLogSegmentSequenceNo(), currentLogSegmentSeqNo);
                errorsFound.set(true);
            }

            if (verifyEntryId && value.getEntryId() != currentEntryId) {
                LOG.error("EntryId: {}, Expected: {}", value.getEntryId(), currentEntryId);
                errorsFound.set(true);
            }
            syncLatch.countDown();
        }

        /**
         * Invoked if the computation completes unsuccessfully.
         */
        @Override
        public void onFailure(Throwable cause) {
            LOG.error("Encountered failures on writing record as (lid = {}, eid = {}) :",
                    new Object[]{currentLogSegmentSeqNo, currentEntryId, cause});
            errorsFound.set(true);
            syncLatch.countDown();
        }
    }

    /**
     * Runs the concurrent async read/write scenario using the current log segment metadata
     * layout version.
     *
     * @param name stream name
     * @param immediateFlush whether immediate flush is enabled
     * @throws Exception on any failure
     */
    void testSimpleAsyncReadWriteInternal(String name, boolean immediateFlush)
            throws Exception {
        final int currentLayoutVersion = LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION;
        testSimpleAsyncReadWriteInternal(name, immediateFlush, currentLayoutVersion);
    }

    /**
     * Writes 3 log segments of 10 records through the async writer while an async reader, opened
     * before any write is issued, consumes them; then checks that neither the writes nor the
     * reads reported an error. The read chain is kicked off right after the very first write
     * has been submitted.
     *
     * @param name stream name
     * @param immediateFlush whether immediate flush is enabled
     * @param logSegmentVersion log segment metadata layout version to configure
     * @throws Exception on any failure
     */
    void testSimpleAsyncReadWriteInternal(String name, boolean immediateFlush,
                                          int logSegmentVersion) throws Exception {
        final DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setReadAheadWaitTime(10);
        confLocal.setReadAheadBatchSize(10);
        confLocal.setOutputBufferSize(1024);
        confLocal.setDLLedgerMetadataLayoutVersion(logSegmentVersion);
        confLocal.setImmediateFlushEnabled(immediateFlush);
        final DistributedLogManager dlm = createNewDLM(confLocal, name);

        final int numLogSegments = 3;
        final int numRecordsPerLogSegment = 10;
        final int totalRecords = numLogSegments * numRecordsPerLogSegment;

        // Reader side: one count per record, plus a "chain finished" latch and an error flag.
        final CountDownLatch readLatch = new CountDownLatch(totalRecords);
        final CountDownLatch readDoneLatch = new CountDownLatch(1);
        final AtomicBoolean readErrors = new AtomicBoolean(false);
        // Writer side: one count per write acknowledgement (success or failure) and an error flag.
        final CountDownLatch writeLatch = new CountDownLatch(totalRecords);
        final AtomicBoolean writeErrors = new AtomicBoolean(false);

        final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InvalidDLSN);
        assertEquals(name, reader.getStreamName());

        int txid = 1;
        for (int segment = 0; segment < numLogSegments; segment++) {
            final long expectedSegmentSeqNo = segment + 1;
            final BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());
            for (int entry = 0; entry < numRecordsPerLogSegment; entry++) {
                final long expectedEntryId = entry;
                final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
                writer.write(record).whenComplete(new WriteFutureEventListener(
                        record, expectedSegmentSeqNo, expectedEntryId, writeLatch, writeErrors, true));

                final boolean firstWriteSubmitted = segment == 0 && entry == 0;
                if (firstWriteSubmitted) {
                    // Start the read chain only once, after the first write has been issued.
                    final boolean monotonic = LogSegmentMetadata.supportsSequenceId(logSegmentVersion);
                    final long firstSequenceId = monotonic ? 0L : Long.MIN_VALUE;
                    TestAsyncReaderWriter.readNext(
                            reader,
                            DLSN.InvalidDLSN,
                            firstSequenceId,
                            monotonic,
                            readLatch,
                            readDoneLatch,
                            readErrors);
                }
            }
            writer.closeAndComplete();
        }

        writeLatch.await();
        assertFalse("All writes should succeed", writeErrors.get());

        // readDoneLatch is released on both success and failure, so check the error flag before
        // waiting on the per-record latch.
        readDoneLatch.await();
        assertFalse("All reads should succeed", readErrors.get());
        readLatch.await();

        Utils.close(reader);
        dlm.close();
    }


    /**
     * Test Case: start reading before the stream exists, then write to it.
     *
     * <p>A {@code TestReader} is started first and given a 500ms head start so that it is
     * likely to fail and retry. Afterwards {@code numLogSegments} log segments of
     * {@code numRecordsPerLogSegment} records each are written. The test then waits for all
     * writes and reads to finish, asserts that neither side reported an error, and asserts
     * that the reader was positioned more than once.
     *
     * @throws Exception if a log operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testSimpleAsyncReadWriteStartEmpty() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setReadAheadWaitTime(10);
        confLocal.setReadAheadBatchSize(10);
        confLocal.setOutputBufferSize(1024);

        // Single source of truth for loop bounds and latch counts, so they cannot drift apart.
        final int numLogSegments = 3;
        final int numRecordsPerLogSegment = 10;
        final int numRecords = numLogSegments * numRecordsPerLogSegment;

        DistributedLogManager dlm = createNewDLM(confLocal, name);

        final CountDownLatch readerReadyLatch = new CountDownLatch(1);
        final CountDownLatch readerDoneLatch = new CountDownLatch(1);
        final CountDownLatch readerSyncLatch = new CountDownLatch(numRecords);

        final TestReader reader = new TestReader(
                "test-reader",
                dlm,
                DLSN.InitialDLSN,
                false,
                0,
                readerReadyLatch,
                readerSyncLatch,
                readerDoneLatch);

        reader.start();

        // Increase the probability of reader failure and retry
        Thread.sleep(500);

        final AtomicBoolean writeErrors = new AtomicBoolean(false);
        final CountDownLatch writeLatch = new CountDownLatch(numRecords);

        int txid = 1;
        for (long i = 0; i < numLogSegments; i++) {
            final long currentLogSegmentSeqNo = i + 1;
            BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());
            for (long j = 0; j < numRecordsPerLogSegment; j++) {
                final long currentEntryId = j;
                final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
                CompletableFuture<DLSN> dlsnFuture = writer.write(record);
                dlsnFuture.whenComplete(new WriteFutureEventListener(
                        record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
            }
            writer.closeAndComplete();
        }

        writeLatch.await();
        assertFalse("All writes should succeed", writeErrors.get());

        readerDoneLatch.await();
        assertFalse("Should not encounter errors during reading", reader.areErrorsFound());
        readerSyncLatch.await();

        // The condition requires at least two positionings (the initial one plus a retry).
        assertTrue("Should position reader more than once", reader.getNumReaderPositions().get() > 1);
        reader.stop();
        dlm.close();
    }


    /**
     * Test Case: start reading when the streams don't exist, with the log managers opened
     * from a shared {@code Namespace}.
     *
     * <p>One {@code TestReader} is started per stream; once every reader has signalled the
     * ready latch, {@code numLogSegments} log segments of {@code numRecordsPerLogSegment}
     * records each are written to every stream. The test then checks that each reader read
     * exactly that many records without errors.
     *
     * @throws Exception if a log operation fails or the test thread is interrupted
     * @see <a href="https://issues.apache.org/jira/browse/DL-42">DL-42</a>
     */
    @DistributedLogAnnotations.FlakyTest
    @Ignore
    @Test(timeout = 120000)
    public void testSimpleAsyncReadWriteStartEmptyFactory() throws Exception {
        // Number of streams; a larger value (50) was used previously.
        final int count = 1;
        String name = runtime.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setReadAheadWaitTime(10);
        confLocal.setReadAheadBatchSize(10);
        confLocal.setOutputBufferSize(1024);

        // Single source of truth for loop bounds and latch counts, so they cannot drift apart.
        final int numLogSegments = 3;
        final int numRecordsPerLogSegment = 1;
        final int numRecordsPerStream = numLogSegments * numRecordsPerLogSegment;

        URI uri = createDLMURI("/" + name);
        ensureURICreated(uri);
        Namespace namespace = NamespaceBuilder.newBuilder()
                .conf(confLocal).uri(uri).build();
        final DistributedLogManager[] dlms = new DistributedLogManager[count];
        final TestReader[] readers = new TestReader[count];
        final CountDownLatch readyLatch = new CountDownLatch(count);
        final CountDownLatch[] syncLatches = new CountDownLatch[count];
        final CountDownLatch[] readerDoneLatches = new CountDownLatch[count];
        for (int s = 0; s < count; s++) {
            dlms[s] = namespace.openLog(name + String.format("%d", s));
            readerDoneLatches[s] = new CountDownLatch(1);
            syncLatches[s] = new CountDownLatch(numRecordsPerStream);
            readers[s] = new TestReader("reader-" + s,
                    dlms[s], DLSN.InitialDLSN, false, 0, readyLatch, syncLatches[s], readerDoneLatches[s]);
            readers[s].start();
        }

        // wait all readers were positioned at least once
        readyLatch.await();

        // One completion per record per stream.
        final CountDownLatch writeLatch = new CountDownLatch(numRecordsPerStream * count);
        final AtomicBoolean writeErrors = new AtomicBoolean(false);

        int txid = 1;
        for (long i = 0; i < numLogSegments; i++) {
            final long currentLogSegmentSeqNo = i + 1;
            BKAsyncLogWriter[] writers = new BKAsyncLogWriter[count];
            for (int s = 0; s < count; s++) {
                writers[s] = (BKAsyncLogWriter) (dlms[s].startAsyncLogSegmentNonPartitioned());
            }
            for (long j = 0; j < numRecordsPerLogSegment; j++) {
                final long currentEntryId = j;
                // The same record instance is written to every stream.
                final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
                for (int s = 0; s < count; s++) {
                    CompletableFuture<DLSN> dlsnFuture = writers[s].write(record);
                    dlsnFuture.whenComplete(new WriteFutureEventListener(
                            record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
                }
            }
            for (int s = 0; s < count; s++) {
                writers[s].closeAndComplete();
            }
        }

        writeLatch.await();
        assertFalse("All writes should succeed", writeErrors.get());

        for (int s = 0; s < count; s++) {
            readerDoneLatches[s].await();
            assertFalse("Reader " + s + " should not encounter errors", readers[s].areErrorsFound());
            syncLatches[s].await();
            assertEquals(numRecordsPerStream, readers[s].getNumReads().get());
            assertTrue("Reader " + s + " should position at least once", readers[s].getNumReaderPositions().get() > 0);
        }

        for (int s = 0; s < count; s++) {
            readers[s].stop();
            dlms[s].close();
        }
        // Release the namespace once every log manager opened from it has been closed.
        namespace.close();
    }

    /**
     * Test Case: write {@code numLogSegments} log segments while a {@code TestReader} reads
     * them, and verify that all records are written and read without reported errors.
     *
     * <p>The reader is constructed with {@code true} as its fourth argument, where the
     * start-empty tests pass {@code false}; judging by the test name this presumably turns
     * on error simulation in the reader (not visible from this method).
     *
     * <p>Flaky test fixed: readers need to be added to the pendingReaders.
     *
     * @throws Exception if a log operation fails or the test thread is interrupted
     */
    @Test(timeout = 300000)
    public void testSimpleAsyncReadWriteSimulateErrors() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setReadAheadWaitTime(10);
        confLocal.setReadAheadBatchSize(10);
        confLocal.setOutputBufferSize(1024);
        DistributedLogManager dlm = createNewDLM(confLocal, name);

        final int numLogSegments = 5;
        final int numRecordsPerLogSegment = 10;
        final int numRecords = numLogSegments * numRecordsPerLogSegment;

        final CountDownLatch doneLatch = new CountDownLatch(1);
        final CountDownLatch syncLatch = new CountDownLatch(numRecords);

        TestReader reader = new TestReader(
                "test-reader",
                dlm,
                DLSN.InitialDLSN,
                true,
                0,
                // The ready latch is not awaited by this test.
                new CountDownLatch(1),
                syncLatch,
                doneLatch);

        reader.start();

        final CountDownLatch writeLatch = new CountDownLatch(numRecords);
        final AtomicBoolean writeErrors = new AtomicBoolean(false);

        int txid = 1;
        for (long i = 0; i < numLogSegments; i++) {
            final long currentLogSegmentSeqNo = i + 1;
            BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());
            for (long j = 0; j < numRecordsPerLogSegment; j++) {
                final long currentEntryId = j;
                final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
                CompletableFuture<DLSN> dlsnFuture = writer.write(record);
                dlsnFuture.whenComplete(new WriteFutureEventListener(
                        record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
            }
            writer.closeAndComplete();
        }

        writeLatch.await();
        assertFalse("All writes should succeed", writeErrors.get());

        doneLatch.await();
        assertFalse("Should not encounter errors during reading", reader.areErrorsFound());
        syncLatch.await();

        // The condition requires at least two positionings, so the message says so.
        assertTrue("Should position reader more than once", reader.getNumReaderPositions().get() > 1);
        reader.stop();
        dlm.close();
    }

    /**
     * Test Case: write slowly (50ms between records) with periodic flush enabled while an
     * async reader, started right after the first write is issued, reads the records back.
     *
     * <p>The test waits for all writes and reads to complete and asserts that neither side
     * reported an error.
     *
     * @throws Exception if a log operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testSimpleAsyncReadWritePiggyBack() throws Exception {
        String name = runtime.getMethodName();

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setEnableReadAhead(true);
        confLocal.setReadAheadWaitTime(500);
        confLocal.setReadAheadBatchSize(10);
        confLocal.setReadAheadMaxRecords(100);
        confLocal.setOutputBufferSize(1024);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(100);
        DistributedLogManager dlm = createNewDLM(confLocal, name);

        final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InvalidDLSN);

        // Single source of truth for loop bounds and latch counts, so they cannot drift apart.
        final int numLogSegments = 3;
        final int numRecordsPerLogSegment = 10;
        final int numRecords = numLogSegments * numRecordsPerLogSegment;

        final CountDownLatch readLatch = new CountDownLatch(numRecords);
        final CountDownLatch readDoneLatch = new CountDownLatch(1);
        final AtomicBoolean readErrors = new AtomicBoolean(false);
        final CountDownLatch writeLatch = new CountDownLatch(numRecords);
        final AtomicBoolean writeErrors = new AtomicBoolean(false);

        int txid = 1;
        for (long i = 0; i < numLogSegments; i++) {
            final long currentLogSegmentSeqNo = i + 1;
            BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());
            for (long j = 0; j < numRecordsPerLogSegment; j++) {
                Thread.sleep(50);
                final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
                CompletableFuture<DLSN> dlsnFuture = writer.write(record);
                dlsnFuture.whenComplete(new WriteFutureEventListener(
                        record, currentLogSegmentSeqNo, j, writeLatch, writeErrors, false));
                if (i == 0 && j == 0) {
                    // Kick off the read chain once, right after the very first write is issued.
                    boolean monotonic =
                            LogSegmentMetadata.supportsSequenceId(confLocal.getDLLedgerMetadataLayoutVersion());
                    TestAsyncReaderWriter.readNext(
                            reader,
                            DLSN.InvalidDLSN,
                            monotonic ? 0L : Long.MIN_VALUE,
                            monotonic,
                            readLatch,
                            readDoneLatch,
                            readErrors);
                }
            }
            writer.closeAndComplete();
        }

        writeLatch.await();
        assertFalse("All writes should succeed", writeErrors.get());

        readDoneLatch.await();
        assertFalse("All reads should succeed", readErrors.get());
        readLatch.await();

        Utils.close(reader);
        dlm.close();
    }

    /**
     * Test Case: closing a reader cancels its pending {@code readNext} and makes any later
     * {@code readNext} fail with {@code ReadCancelledException}.
     *
     * @throws Exception if a log operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testCancelReadRequestOnReaderClosed() throws Exception {
        final String name = "distrlog-cancel-read-requests-on-reader-closed";

        // Write a single record into a completed log segment.
        DistributedLogManager dlm = createNewDLM(testConf, name);
        BKAsyncLogWriter logWriter = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());
        logWriter.write(DLMTestUtil.getLogRecordInstance(1L));
        logWriter.closeAndComplete();

        // Consume that record, so the next read has nothing to return and stays pending.
        final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN);
        LogRecordWithDLSN firstRecord = Utils.ioResult(reader.readNext());
        assertEquals(1L, firstRecord.getTransactionId());
        DLMTestUtil.verifyLogRecord(firstRecord);

        final CountDownLatch readFinished = new CountDownLatch(1);
        final AtomicBoolean cancelled = new AtomicBoolean(false);
        Thread readThread = new Thread(() -> {
            try {
                Utils.ioResult(reader.readNext());
            } catch (ReadCancelledException rce) {
                cancelled.set(true);
            } catch (Throwable t) {
                LOG.error("Receive unexpected exception on reading stream {} : ", name, t);
            }
            readFinished.countDown();
        }, "read-thread");
        readThread.start();

        // Give the background read time to be issued before the reader is closed.
        Thread.sleep(1000);

        // close reader should cancel the pending read next
        Utils.close(reader);

        readFinished.await();
        readThread.join();

        assertTrue("Read request should be cancelled.", cancelled.get());

        // closed reader should reject any readNext
        try {
            Utils.ioResult(reader.readNext());
            fail("Reader should reject readNext if it is closed.");
        } catch (ReadCancelledException expected) {
            // expected
        }

        dlm.close();
    }

    /**
     * Test Case: with immediate flush enabled and a minimum delay of 100ms between flushes,
     * the entry id of the last record is bounded by the elapsed time, not by the number of
     * records written.
     *
     * @throws Exception if a log operation fails or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testAsyncWriteWithMinDelayBetweenFlushes() throws Exception {
        final String streamName = "distrlog-asyncwrite-mindelay";
        final int count = 5000;

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setMinDelayBetweenImmediateFlushMs(100);
        DistributedLogManager dlm = createNewDLM(confLocal, streamName);

        final Thread testThread = Thread.currentThread();
        final CountDownLatch ackLatch = new CountDownLatch(count);
        BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());
        Stopwatch stopwatch = Stopwatch.createStarted();
        for (int txid = 1; txid <= count; txid++) {
            Thread.sleep(1);
            writer.write(DLMTestUtil.getLogRecordInstance(txid)).whenComplete(new FutureEventListener<DLSN>() {
                @Override
                public void onSuccess(DLSN value) {
                    ackLatch.countDown();
                    LOG.debug("SyncLatch: {} ; DLSN: {} ", ackLatch.getCount(), value);
                }

                @Override
                public void onFailure(Throwable cause) {
                    // A failed write is signalled to the test thread via interruption.
                    testThread.interrupt();
                }
            });
        }

        // Only wait for the acknowledgements if no write has failed so far.
        boolean allAcked = false;
        if (!Thread.interrupted()) {
            try {
                allAcked = ackLatch.await(10, TimeUnit.SECONDS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }

        // Abort, not graceful close, since the latter will
        // flush as well, and may add an entry.
        writer.abort();

        stopwatch.stop();
        assertFalse(Thread.interrupted());
        assertTrue(allAcked);

        LogRecordWithDLSN last = dlm.getLastLogRecord();
        final long lastEntryId = last.getDlsn().getEntryId();
        final long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
        LOG.info("Last Entry {}; elapsed time {}", lastEntryId, elapsedMs);

        // Regardless of how many records we wrote; the number of BK entries should always be bounded by the min delay.
        // Since there are two flush processes--data flush and control flush, and since control flush may also end up
        // flushing data if data is available, the upper bound is 2*(time/min_delay + 1)
        assertTrue(lastEntryId <= (elapsedMs / confLocal.getMinDelayBetweenImmediateFlushMs() + 1) * 2);
        DLMTestUtil.verifyLogRecord(last);

        dlm.close();
    }

    /**
     * Test Case: after a second client takes over the stream, a write on the first writer
     * eventually fails with {@code LockingException}.
     *
     * @throws Exception if a log operation fails unexpectedly or the test thread is interrupted
     */
    @Test(timeout = 60000)
    public void testAsyncWriteWithMinDelayBetweenFlushesFlushFailure() throws Exception {
        final String streamName = runtime.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setMinDelayBetweenImmediateFlushMs(1);

        URI uri = createDLMURI("/" + streamName);
        ensureURICreated(uri);

        // Two namespaces with different client ids, both pointing at the same stream.
        Namespace ownerNamespace = NamespaceBuilder.newBuilder()
                .conf(confLocal).uri(uri).clientId("gabbagoo").build();
        DistributedLogManager dlm = ownerNamespace.openLog(streamName);
        Namespace stealerNamespace = NamespaceBuilder.newBuilder()
                .conf(confLocal).uri(uri).clientId("tortellini").build();
        DistributedLogManager stealerDlm = stealerNamespace.openLog(streamName);

        int nextTxId = 1;
        BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());

        // First write succeeds since the lock isn't checked until transmit, which is scheduled
        Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(nextTxId++)));
        writer.flushAndCommit();

        // Close the first writer's lock so that another client can acquire it.
        BKLogSegmentWriter segmentWriter = writer.getCachedLogWriter();
        DistributedLock segmentLock = segmentWriter.getLock();
        Utils.ioResult(segmentLock.asyncClose());

        // Get second writer, steal lock
        BKAsyncLogWriter stealingWriter = (BKAsyncLogWriter) (stealerDlm.startAsyncLogSegmentNonPartitioned());

        try {
            // Succeeds, kicks off scheduled flush
            writer.write(DLMTestUtil.getLogRecordInstance(nextTxId++));

            // Give the scheduled flush time to run; the following write is the one expected to fail
            Thread.sleep(100);
            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(nextTxId++)));
            fail("should have thrown");
        } catch (LockingException expected) {
            LOG.debug("caught exception ", expected);
        }

        writer.close();
        dlm.close();
    }

    /**
     * Issues 1000 writes back to back under the given outstanding-write limits and checks
     * whether they are accepted or rejected.
     *
     * @param stream value passed to {@code setPerWriterOutstandingWriteLimit}
     * @param global permit count for a {@code SimplePermitLimiter} handed to the log manager;
     *               when it is not greater than {@code -1} the log manager is created
     *               without an explicit limiter
     * @param shouldFail {@code true} if every write is expected to be rejected with
     *                   {@code OverCapacityException}, {@code false} if every write is
     *                   expected to succeed
     * @throws Exception if a write fails with anything other than {@code OverCapacityException}
     */
    public void writeRecordsWithOutstandingWriteLimit(int stream, int global, boolean shouldFail) throws Exception {
        final int numWrites = 1000;
        final String streamName = runtime.getMethodName();

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setPerWriterOutstandingWriteLimit(stream);
        confLocal.setOutstandingWriteLimitDarkmode(false);

        final DistributedLogManager dlm = global > -1
                ? createNewDLM(confLocal, streamName,
                        new SimplePermitLimiter(false, global, new NullStatsLogger(), true,
                                new FixedValueFeature("", 0)))
                : createNewDLM(confLocal, streamName);

        BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());

        // Issue every write before waiting on any of them.
        List<CompletableFuture<DLSN>> pending = new ArrayList<>(numWrites);
        for (int issued = 0; issued < numWrites; issued++) {
            pending.add(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
        }

        for (CompletableFuture<DLSN> write : pending) {
            try {
                Utils.ioResult(write);
                // A successful write is only acceptable when no rejection is expected.
                assertFalse("should fail due to no outstanding writes permitted", shouldFail);
            } catch (OverCapacityException rejected) {
                assertTrue(shouldFail);
            }
        }
        writer.closeAndComplete();
        dlm.close();
    }

    /**
     * Passes {@code -1} for both the per-stream and the global limit and expects every
     * write to succeed.
     */
    @Test(timeout = 60000)
    public void testOutstandingWriteLimitNoLimit() throws Exception {
        final int streamLimit = -1;
        final int globalLimit = -1;
        writeRecordsWithOutstandingWriteLimit(streamLimit, globalLimit, false);
    }

    /**
     * Passes {@code Integer.MAX_VALUE} for both the per-stream and the global limit and
     * expects every write to succeed.
     */
    @Test(timeout = 60000)
    public void testOutstandingWriteLimitVeryHighLimit() throws Exception {
        final int streamLimit = Integer.MAX_VALUE;
        final int globalLimit = Integer.MAX_VALUE;
        writeRecordsWithOutstandingWriteLimit(streamLimit, globalLimit, false);
    }

    /**
     * Passes a per-stream limit of {@code 0} with a global limit of
     * {@code Integer.MAX_VALUE} and expects every write to be rejected.
     */
    @Test(timeout = 60000)
    public void testOutstandingWriteLimitBlockAllStreamLimit() throws Exception {
        final int streamLimit = 0;
        final int globalLimit = Integer.MAX_VALUE;
        writeRecordsWithOutstandingWriteLimit(streamLimit, globalLimit, true);
    }

    /**
     * Passes a global limit of {@code 0} with a per-stream limit of
     * {@code Integer.MAX_VALUE} and expects every write to be rejected.
     */
    @Test(timeout = 60000)
    public void testOutstandingWriteLimitBlockAllGlobalLimit() throws Exception {
        final int streamLimit = Integer.MAX_VALUE;
        final int globalLimit = 0;
        writeRecordsWithOutstandingWriteLimit(streamLimit, globalLimit, true);
    }

    /**
     * Test Case: with a per-writer outstanding write limit of {@code 0} but darkmode
     * enabled, all 1000 writes are expected to succeed.
     *
     * @throws Exception if any write fails
     */
    @Test(timeout = 60000)
    public void testOutstandingWriteLimitBlockAllLimitWithDarkmode() throws Exception {
        final int numWrites = 1000;

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setPerWriterOutstandingWriteLimit(0);
        confLocal.setOutstandingWriteLimitDarkmode(true);

        DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
        BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());

        // Issue every write before waiting on any of them.
        List<CompletableFuture<DLSN>> pending = new ArrayList<>(numWrites);
        for (int issued = 0; issued < numWrites; issued++) {
            pending.add(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
        }
        // Any rejected write surfaces here as an exception and fails the test.
        for (CompletableFuture<DLSN> write : pending) {
            Utils.ioResult(write);
        }
        writer.closeAndComplete();
        dlm.close();
    }

    /**
     * Test Case: once the underlying ledger has been fenced, both a further write and
     * {@code closeAndComplete} fail with {@code BKTransmitException}, and the log segment
     * stays in progress.
     *
     * @throws Exception if a log operation fails unexpectedly
     */
    @Test(timeout = 60000)
    public void testCloseAndCompleteLogSegmentWhenStreamIsInError() throws Exception {
        final String streamName = "distrlog-close-and-complete-logsegment-when-stream-is-in-error";
        final int numInitialRecords = 5;

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);

        BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(confLocal, streamName);
        BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());

        long nextTxId = 1L;
        for (int written = 0; written < numInitialRecords; written++) {
            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(nextTxId++)));
        }

        BKLogSegmentWriter segmentWriter = writer.getCachedLogWriter();
        BKNamespaceDriver driver = (BKNamespaceDriver) dlm.getNamespaceDriver();

        // fence the ledger
        final byte[] digestPassword = confLocal.getBKDigestPW().getBytes(UTF_8);
        driver.getReaderBKC().get().openLedger(
                segmentWriter.getLogSegmentId(), BookKeeper.DigestType.CRC32, digestPassword);

        try {
            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(nextTxId++)));
            fail("Should fail write to a fenced ledger with BKTransmitException");
        } catch (BKTransmitException expected) {
            // expected
        }

        try {
            writer.closeAndComplete();
            fail("Should fail to complete a log segment when its ledger is fenced");
        } catch (BKTransmitException expected) {
            // expected
        }

        // The single segment must not have been completed.
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(1, segments.size());
        LogSegmentMetadata onlySegment = segments.get(0);
        assertTrue(onlySegment.isInProgress());

        dlm.close();
    }

    /**
     * Test Case: once the underlying ledger has been fenced, {@code closeAndComplete} fails
     * with an {@code IOException} and the log segment stays in progress.
     *
     * @throws Exception if a log operation fails unexpectedly
     */
    @Test(timeout = 60000)
    public void testCloseAndCompleteLogSegmentWhenCloseFailed() throws Exception {
        final String streamName = "distrlog-close-and-complete-logsegment-when-close-failed";
        final int numInitialRecords = 5;

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);

        BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(confLocal, streamName);
        BKAsyncLogWriter writer = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());

        long nextTxId = 1L;
        for (int written = 0; written < numInitialRecords; written++) {
            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(nextTxId++)));
        }

        BKLogSegmentWriter segmentWriter = writer.getCachedLogWriter();
        BKNamespaceDriver driver = (BKNamespaceDriver) dlm.getNamespaceDriver();

        // fence the ledger
        final byte[] digestPassword = confLocal.getBKDigestPW().getBytes(UTF_8);
        driver.getReaderBKC().get().openLedger(
                segmentWriter.getLogSegmentId(), BookKeeper.DigestType.CRC32, digestPassword);

        try {
            // insert a write to detect the fencing state, to make test more robust.
            // Its result is deliberately not awaited.
            writer.write(DLMTestUtil.getLogRecordInstance(nextTxId++));
            writer.closeAndComplete();
            fail("Should fail to complete a log segment when its ledger is fenced");
        } catch (IOException expected) {
            // expected
            LOG.error("Failed to close and complete log segment {} : ",
                    segmentWriter.getFullyQualifiedLogSegment(), expected);
        }

        // The single segment must not have been completed.
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(1, segments.size());
        LogSegmentMetadata onlySegment = segments.get(0);
        assertTrue(onlySegment.isInProgress());

        dlm.close();
    }

    /**
     * Shared driver for the idle-reader tests.
     *
     * <p>A background task writes {@code numSegments} log segments of {@code segmentSize}
     * records each, while the calling thread reads them back through an async reader and
     * records whether an {@code IdleReaderException} was raised. The expected outcome is:
     * <ul>
     *   <li>{@code simulateReaderStall}: the exception is raised;</li>
     *   <li>otherwise, with {@code heartBeatUsingControlRecs}: no exception and all
     *       {@code segmentSize * numSegments} records are read;</li>
     *   <li>otherwise: the exception is raised after exactly {@code segmentSize} records.</li>
     * </ul>
     *
     * @param name name of the stream to create
     * @param idleReaderErrorThreshold reader idle error threshold in milliseconds; must be
     *                                 greater than 300 when {@code heartBeatUsingControlRecs}
     *                                 is set, because the heartbeat interval is derived as
     *                                 {@code threshold - 300}
     * @param heartBeatUsingControlRecs whether the writer emits control records between
     *                                  segments; otherwise it sleeps for twice the threshold
     *                                  after completing each segment
     * @param simulateReaderStall whether processing of read requests is disabled on the reader
     * @throws Exception if a log operation fails unexpectedly or the test thread is interrupted
     */
    private void testAsyncReadIdleErrorInternal(String name,
                                                final int idleReaderErrorThreshold,
                                                final boolean heartBeatUsingControlRecs,
                                                final boolean simulateReaderStall) throws Exception {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setReadAheadBatchSize(1);
        confLocal.setReadAheadMaxRecords(1);
        confLocal.setReaderIdleWarnThresholdMillis(0);
        confLocal.setReaderIdleErrorThresholdMillis(idleReaderErrorThreshold);
        final DistributedLogManager dlm = createNewDLM(confLocal, name);
        final Thread currentThread = Thread.currentThread();
        final int segmentSize = 3;
        final int numSegments = 3;
        final CountDownLatch latch = new CountDownLatch(1);
        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            executor.schedule(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            int txid = 1;
                            for (long i = 0; i < numSegments; i++) {
                                BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
                                for (long j = 1; j <= segmentSize; j++) {
                                    writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++));
                                    if ((i == 0) && (j == 1)) {
                                        // Let the reader start once the very first record is written.
                                        latch.countDown();
                                    }
                                }

                                if (heartBeatUsingControlRecs) {
                                    // There should be a control record such that
                                    // wait time + commit time (BK) < Idle Reader Threshold
                                    int threadSleepTime = idleReaderErrorThreshold
                                        - 200 // BK commitTime
                                        - 100; //safety margin

                                    for (int iter = 1; iter <= (2 * idleReaderErrorThreshold / threadSleepTime);
                                         iter++) {
                                        Thread.sleep(threadSleepTime);
                                        writer.write(DLMTestUtil.getLargeLogRecordInstance(txid, true));
                                        writer.flush();
                                    }
                                    Thread.sleep(threadSleepTime);
                                }

                                writer.closeAndComplete();
                                if (!heartBeatUsingControlRecs) {
                                    // Stay silent long enough for the reader to hit its idle threshold.
                                    Thread.sleep(2 * idleReaderErrorThreshold);
                                }
                            }
                        } catch (Exception exc) {
                            // Writer failures only matter while the test is still running.
                            if (!executor.isShutdown()) {
                                currentThread.interrupt();
                            }
                        }
                    }
                }, 0, TimeUnit.MILLISECONDS);

            latch.await();
            BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
            if (simulateReaderStall) {
                reader.disableProcessingReadRequests();
            }
            boolean exceptionEncountered = false;
            int recordCount = 0;
            try {
                while (true) {
                    CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
                    Utils.ioResult(record);
                    recordCount++;

                    if (recordCount >= segmentSize * numSegments) {
                        break;
                    }
                }
            } catch (IdleReaderException exc) {
                exceptionEncountered = true;
            }

            if (simulateReaderStall) {
                assertTrue(exceptionEncountered);
            } else if (heartBeatUsingControlRecs) {
                assertFalse(exceptionEncountered);
                assertEquals(segmentSize * numSegments, recordCount);
            } else {
                assertTrue(exceptionEncountered);
                assertEquals(segmentSize, recordCount);
            }
            assertFalse(currentThread.isInterrupted());
            Utils.close(reader);
        } finally {
            // Always release the executor, even when an assertion or a read fails;
            // otherwise its non-daemon worker thread outlives the test.
            executor.shutdown();
        }
    }

    /**
     * Runs the idle-reader scenario with a 500ms error threshold, control-record heartbeats
     * enabled and no simulated reader stall.
     */
    @Test(timeout = 10000)
    public void testAsyncReadIdleControlRecord() throws Exception {
        final int idleErrorThresholdMs = 500;
        testAsyncReadIdleErrorInternal(
                "distrlog-async-reader-idle-error-control", idleErrorThresholdMs, true, false);
    }

    /**
     * Runs the idle-reader scenario with a 1000ms error threshold, without control-record
     * heartbeats and without a simulated reader stall.
     */
    @Test(timeout = 10000)
    public void testAsyncReadIdleError() throws Exception {
        final int idleErrorThresholdMs = 1000;
        testAsyncReadIdleErrorInternal(
                "distrlog-async-reader-idle-error", idleErrorThresholdMs, false, false);
    }

    /**
     * Runs the idle-reader scenario with a 1000ms error threshold, control-record heartbeats
     * enabled and a simulated reader stall.
     */
    @Test(timeout = 10000)
    public void testAsyncReadIdleError2() throws Exception {
        final int idleErrorThresholdMs = 1000;
        testAsyncReadIdleErrorInternal(
                "distrlog-async-reader-idle-error-2", idleErrorThresholdMs, true, true);
    }

    /**
     * Test Case: after two attempts to start a log segment fail during recovery of
     * incomplete log segments (forced through a failpoint), a third attempt without the
     * failpoint succeeds and the previously aborted segment is no longer in progress.
     *
     * @throws Exception if a log operation fails unexpectedly
     */
    @Test(timeout = 60000)
    public void testReleaseLockAfterFailedToRecover() throws Exception {
        final String streamName = "release-lock-after-failed-to-recover";
        final int numFailedAttempts = 2;

        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(testConf);
        confLocal.setLockTimeout(0);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setOutputBufferSize(0);

        DistributedLogManager dlm = createNewDLM(confLocal, streamName);

        // Leave an incomplete log segment behind by aborting the writer after one record.
        BKAsyncLogWriter abortedWriter =
                (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());
        Utils.ioResult(abortedWriter.write(DLMTestUtil.getLogRecordInstance(1L)));
        abortedWriter.abort();

        for (int attempt = 0; attempt < numFailedAttempts; attempt++) {
            FailpointUtils.setFailpoint(
                    FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments,
                    FailpointUtils.FailPointActions.FailPointAction_Throw);

            try {
                dlm.startAsyncLogSegmentNonPartitioned();
                fail("Should fail during recovering incomplete log segments");
            } catch (IOException expected) {
                // expected;
            } finally {
                FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
            }
        }

        // With the failpoint removed, starting a new writer must succeed.
        BKAsyncLogWriter recoveredWriter = (BKAsyncLogWriter) (dlm.startAsyncLogSegmentNonPartitioned());

        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(1, segments.size());
        LogSegmentMetadata onlySegment = segments.get(0);
        assertFalse(onlySegment.isInProgress());

        recoveredWriter.close();
        dlm.close();
    }

    /**
     * Reads records spanning several log segments with the read-ahead log segments
     * notification disabled on the reader, while a background task writes those segments.
     *
     * <p>The reader must receive all {@code segmentSize * numSegments} records without hitting an
     * {@link IdleReaderException}, and the writer task must not have interrupted the test thread
     * (which it does when it fails while the executor is still running).
     */
    @DistributedLogAnnotations.FlakyTest
    @Test(timeout = 60000)
    public void testAsyncReadMissingLogSegmentsNotification() throws Exception {
        String name = "distrlog-async-reader-missing-zk-notification";
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setReadAheadBatchSize(1);
        confLocal.setReadAheadMaxRecords(1);
        confLocal.setReadLACLongPollTimeout(49);
        confLocal.setReaderIdleWarnThresholdMillis(100);
        confLocal.setReaderIdleErrorThresholdMillis(20000);
        final DistributedLogManager dlm = createNewDLM(confLocal, name);
        final Thread currentThread = Thread.currentThread();
        final int segmentSize = 10;
        final int numSegments = 3;
        // released by the writer once the very first record has been written
        final CountDownLatch latch = new CountDownLatch(1);
        // released by the reader once it has read its first record
        final CountDownLatch readLatch = new CountDownLatch(1);
        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.schedule(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            int txid = 1;
                            for (long i = 0; i < numSegments; i++) {
                                BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
                                for (long j = 1; j <= segmentSize; j++) {
                                    writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++));
                                    if ((i == 0) && (j == 1)) {
                                        latch.countDown();
                                    } else {
                                        // wait for reader to start
                                        readLatch.await();
                                    }
                                }
                                writer.closeAndComplete();
                                Thread.sleep(100);
                            }
                        } catch (Exception exc) {
                            if (!executor.isShutdown()) {
                                // surface the cause instead of only signalling it via interrupt
                                LOG.error("Failed to write log segments", exc);
                                currentThread.interrupt();
                            }
                        }
                    }
                }, 0, TimeUnit.MILLISECONDS);

        final int expectedRecords = segmentSize * numSegments;
        try {
            latch.await();
            BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
            reader.disableReadAheadLogSegmentsNotification();
            boolean exceptionEncountered = false;
            int recordCount = 0;
            try {
                while (true) {
                    CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
                    Utils.ioResult(record);
                    if (recordCount == 0) {
                        readLatch.countDown();
                    }
                    recordCount++;

                    if (recordCount >= expectedRecords) {
                        break;
                    }
                }
            } catch (IdleReaderException exc) {
                exceptionEncountered = true;
            }
            assertFalse("Reader should not become idle", exceptionEncountered);
            assertEquals("Unexpected number of records read", expectedRecords, recordCount);
            assertFalse("Writer task should not have interrupted the test thread",
                    currentThread.isInterrupted());
            Utils.close(reader);
        } finally {
            // Always mark the executor as shut down, even when an assertion failed, so that a
            // late writer failure can no longer interrupt this thread after the test is over.
            executor.shutdown();
        }
    }

    /**
     * Verifies that {@code getLastTxId()} follows every acknowledged write, and that a second
     * writer started on the same log reports the last written transaction id.
     */
    @Test(timeout = 60000)
    public void testGetLastTxId() throws Exception {
        final String logName = runtime.getMethodName();
        final DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.addConfiguration(testConf);
        localConf.setOutputBufferSize(0);
        localConf.setImmediateFlushEnabled(true);

        final DistributedLogManager manager = createNewDLM(localConf, logName);
        final AsyncLogWriter firstWriter = manager.startAsyncLogSegmentNonPartitioned();

        final int numRecords = 10;
        int txId = 0;
        while (txId < numRecords) {
            Utils.ioResult(firstWriter.write(DLMTestUtil.getLogRecordInstance(txId)));
            assertEquals("last tx id should become " + txId,
                    txId, firstWriter.getLastTxId());
            txId++;
        }

        // open a writer to recover the inprogress log segment
        final int expectedLastTxId = numRecords - 1;
        final AsyncLogWriter recoverWriter = manager.startAsyncLogSegmentNonPartitioned();
        assertEquals("recovered last tx id should be " + expectedLastTxId,
                expectedLastTxId, recoverWriter.getLastTxId());
    }

    /**
     * Verifies that the read-ahead reader keeps its cache bounded while a log is consumed.
     *
     * <p>With read-ahead max records {@code maxRecords} and batch size {@code batchSize}, the
     * number of cached entries reported by the read-ahead reader must never exceed
     * {@code maxRecords + batchSize - 1}, both right after the first read and after each
     * subsequent read.
     */
    @Test(timeout = 60000)
    public void testMaxReadAheadRecords() throws Exception {
        int maxRecords = 1;
        int batchSize = 8;
        int maxAllowedCachedRecords = maxRecords + batchSize - 1;

        String name = runtime.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(Integer.MAX_VALUE);
        confLocal.setReadAheadMaxRecords(maxRecords);
        confLocal.setReadAheadBatchSize(batchSize);

        DistributedLogManager dlm = createNewDLM(confLocal, name);
        AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();

        int numRecords = 40;
        for (int i = 1; i <= numRecords; i++) {
            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
            assertEquals("last tx id should become " + i,
                    i, writer.getLastTxId());
        }
        // follow the data records with a control record
        LogRecord record = DLMTestUtil.getLogRecordInstance(numRecords);
        record.setControl();
        Utils.ioResult(writer.write(record));

        BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
        record = Utils.ioResult(reader.readNext());
        LOG.info("Read record {}", record);
        assertEquals(1L, record.getTransactionId());

        assertNotNull(reader.getReadAheadReader());
        int initialCachedEntries = reader.getReadAheadReader().getNumCachedEntries();
        assertTrue("Should cache at most " + maxAllowedCachedRecords + " records but already found "
                + initialCachedEntries + " records after reading the first record",
                initialCachedEntries <= maxAllowedCachedRecords);

        for (int i = 2; i <= numRecords; i++) {
            record = Utils.ioResult(reader.readNext());
            LOG.info("Read record {}", record);
            assertEquals((long) i, record.getTransactionId());
            // give read-ahead a moment to refill the cache before sampling its size
            TimeUnit.MILLISECONDS.sleep(20);
            int numCachedEntries = reader.getReadAheadReader().getNumCachedEntries();
            // report the bound that is actually enforced, not the batch size
            assertTrue("Should cache at most " + maxAllowedCachedRecords + " records but already found "
                    + numCachedEntries + " records when reading " + i + "th record",
                    numCachedEntries <= maxAllowedCachedRecords);
        }
        Utils.close(reader);
        // release the writer and the manager as the sibling tests do
        Utils.close(writer);
        dlm.close();
    }

    /**
     * Verifies end-of-stream handling on both the writer and the reader side.
     *
     * <p>After ten records are written the stream is marked as ended (twice, which must be
     * accepted). A further write must fail with {@link EndOfStreamException}. A reader must
     * return the ten records and then fail with {@link EndOfStreamException} as well.
     */
    @Test(timeout = 60000)
    public void testMarkEndOfStream() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);

        DistributedLogManager dlm = createNewDLM(confLocal, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();

        final int numRecords = 10;
        int i = 1;
        for (; i <= numRecords; i++) {
            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
            assertEquals("last tx id should become " + i,
                    i, writer.getLastTxId());
        }

        Utils.ioResult(writer.markEndOfStream());

        // Multiple end of streams are ok.
        Utils.ioResult(writer.markEndOfStream());

        try {
            // i is numRecords + 1 here: the first transaction id past the end of stream
            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
            fail("Should have thrown");
        } catch (EndOfStreamException ex) {
            // expected: writes are rejected once the stream has been ended
        }

        BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
        for (int j = 0; j < numRecords; j++) {
            LogRecord record = Utils.ioResult(reader.readNext());
            assertEquals(j + 1, record.getTransactionId());
        }

        try {
            Utils.ioResult(reader.readNext());
            fail("Should have thrown");
        } catch (EndOfStreamException ex) {
            // expected: the reader reached the end-of-stream marker
        }
        Utils.close(reader);
        // release the writer and the manager as the sibling tests do
        writer.close();
        dlm.close();
    }

    /**
     * Verifies end-of-stream handling when the stream is ended before any record is written.
     *
     * <p>Writing after {@code markEndOfStream()} must fail with {@link EndOfStreamException},
     * and a reader positioned at the initial DLSN must fail the same way on its first read.
     */
    @Test(timeout = 60000)
    public void testMarkEndOfStreamAtBeginningOfSegment() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(true);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);

        DistributedLogManager dlm = createNewDLM(confLocal, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(writer.markEndOfStream());
        try {
            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1)));
            fail("Should have thrown");
        } catch (EndOfStreamException ex) {
            // expected: writes are rejected once the stream has been ended
        }
        writer.close();

        BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
        try {
            Utils.ioResult(reader.readNext());
            fail("Should have thrown");
        } catch (EndOfStreamException ex) {
            // expected: there is nothing to read before the end-of-stream marker
        }
        Utils.close(reader);
        // release the manager as the sibling tests do
        dlm.close();
    }

    /**
     * Verifies that a bulk read with an effectively unbounded wait time holds back until the
     * requested number of records is available.
     *
     * <p>Only one record exists when {@code readBulk(2, Long.MAX_VALUE, ...)} and a following
     * {@code readNext()} are issued. After more records are written, the bulk read must return
     * the records with transaction ids 1 and 2, and the single read must return transaction id 3.
     */
    @Test(timeout = 60000)
    public void testBulkReadWaitingMoreRecords() throws Exception {
        final String logName = runtime.getMethodName();
        final DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.addConfiguration(testConf);
        localConf.setOutputBufferSize(0);
        localConf.setImmediateFlushEnabled(false);
        localConf.setPeriodicFlushFrequencyMilliSeconds(0);

        final DistributedLogManager manager = createNewDLM(localConf, logName);
        final BKAsyncLogWriter logWriter = (BKAsyncLogWriter) manager.startAsyncLogSegmentNonPartitioned();

        // one data record, followed by a control record carrying the same transaction id
        Utils.ioResult(logWriter.write(DLMTestUtil.getLogRecordInstance(1L)));
        final LogRecord firstControl = DLMTestUtil.getLogRecordInstance(1L);
        firstControl.setControl();
        Utils.ioResult(logWriter.write(firstControl));

        final BKAsyncLogReader logReader = (BKAsyncLogReader) manager.getAsyncLogReader(DLSN.InitialDLSN);
        final CompletableFuture<List<LogRecordWithDLSN>> pendingBulk =
                logReader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        final CompletableFuture<LogRecordWithDLSN> pendingSingle = logReader.readNext();

        // write another records
        for (long txid = 2L; txid <= 6L; txid++) {
            Utils.ioResult(logWriter.write(DLMTestUtil.getLogRecordInstance(txid)));
            final LogRecord control = DLMTestUtil.getLogRecordInstance(txid);
            control.setControl();
            Utils.ioResult(logWriter.write(control));
        }

        final List<LogRecordWithDLSN> bulk = Utils.ioResult(pendingBulk);
        assertEquals(2, bulk.size());
        assertEquals(1L, bulk.get(0).getTransactionId());
        assertEquals(2L, bulk.get(1).getTransactionId());
        for (LogRecordWithDLSN bulkRecord : bulk) {
            DLMTestUtil.verifyLogRecord(bulkRecord);
        }

        final LogRecordWithDLSN single = Utils.ioResult(pendingSingle);
        assertEquals(3L, single.getTransactionId());
        DLMTestUtil.verifyLogRecord(single);

        Utils.close(logReader);
        logWriter.close();
        manager.close();
    }

    /**
     * Verifies that a bulk read with a zero wait time returns what is available instead of
     * waiting for the requested number of records.
     *
     * <p>Only one record exists when {@code readBulk(2, 0, ...)} and a following
     * {@code readNext()} are issued. The bulk read must return just the record with transaction
     * id 1; once more records are written, the single read must return transaction id 2.
     */
    @Test(timeout = 60000)
    public void testBulkReadNotWaitingMoreRecords() throws Exception {
        final String logName = runtime.getMethodName();
        final DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.addConfiguration(testConf);
        localConf.setOutputBufferSize(0);
        localConf.setImmediateFlushEnabled(false);
        localConf.setPeriodicFlushFrequencyMilliSeconds(0);

        final DistributedLogManager manager = createNewDLM(localConf, logName);
        final BKAsyncLogWriter logWriter = (BKAsyncLogWriter) manager.startAsyncLogSegmentNonPartitioned();

        // one data record, followed by a control record carrying the same transaction id
        Utils.ioResult(logWriter.write(DLMTestUtil.getLogRecordInstance(1L)));
        final LogRecord firstControl = DLMTestUtil.getLogRecordInstance(1L);
        firstControl.setControl();
        Utils.ioResult(logWriter.write(firstControl));

        final BKAsyncLogReader logReader = (BKAsyncLogReader) manager.getAsyncLogReader(DLSN.InitialDLSN);
        final CompletableFuture<List<LogRecordWithDLSN>> pendingBulk =
                logReader.readBulk(2, 0, TimeUnit.MILLISECONDS);
        final CompletableFuture<LogRecordWithDLSN> pendingSingle = logReader.readNext();

        final List<LogRecordWithDLSN> bulk = Utils.ioResult(pendingBulk);
        assertEquals(1, bulk.size());
        assertEquals(1L, bulk.get(0).getTransactionId());
        for (LogRecordWithDLSN bulkRecord : bulk) {
            DLMTestUtil.verifyLogRecord(bulkRecord);
        }

        // write another records
        for (long txid = 2L; txid <= 6L; txid++) {
            Utils.ioResult(logWriter.write(DLMTestUtil.getLogRecordInstance(txid)));
            final LogRecord control = DLMTestUtil.getLogRecordInstance(txid);
            control.setControl();
            Utils.ioResult(logWriter.write(control));
        }

        final LogRecordWithDLSN single = Utils.ioResult(pendingSingle);
        assertEquals(2L, single.getTransactionId());
        DLMTestUtil.verifyLogRecord(single);

        Utils.close(logReader);
        logWriter.close();
        manager.close();
    }

    /**
     * Reads a log with injected broken read-ahead entries while skipping of broken entries is
     * enabled and position gap detection is disabled.
     *
     * <p>The 27 records that are read must all come from entries whose id is not a multiple
     * of 10.
     */
    @Test(timeout = 60000)
    public void testReadBrokenEntries() throws Exception {
        final String logName = runtime.getMethodName();
        final DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.loadConf(testConf);

        localConf.setOutputBufferSize(0);
        localConf.setPeriodicFlushFrequencyMilliSeconds(0);
        localConf.setImmediateFlushEnabled(true);
        localConf.setReadAheadWaitTime(10);
        localConf.setReadAheadBatchSize(1);
        localConf.setPositionGapDetectionEnabled(false);
        localConf.setReadAheadSkipBrokenEntries(true);
        localConf.setEIInjectReadAheadBrokenEntries(true);
        final DistributedLogManager manager = createNewDLM(localConf, logName);

        final int numLogSegments = 3;
        final int numRecordsPerLogSegment = 10;
        writeRecords(manager, numLogSegments, numRecordsPerLogSegment, 1L, false);

        final AsyncLogReader logReader = manager.getAsyncLogReader(DLSN.InvalidDLSN);

        // 3 segments, 10 records each, immediate flush, batch size 1, so just the first
        // record in each ledger is discarded, for 30 - 3 = 27 records.
        final int expectedReadable = 27;
        int readSoFar = 0;
        while (readSoFar < expectedReadable) {
            final LogRecordWithDLSN read = Utils.ioResult(logReader.readNext());
            assertTrue(read.getDlsn().getEntryId() % 10 != 0);
            readSoFar++;
        }

        Utils.close(logReader);
        manager.close();
    }

    /**
     * Reads a log with injected broken read-ahead entries while both skipping of broken entries
     * and position gap detection are enabled.
     *
     * <p>Unlike the variant without gap detection, reading is expected to fail with
     * {@link DLIllegalStateException} before 30 records have been returned.
     */
    @Test(timeout = 60000)
    public void testReadBrokenEntriesWithGapDetection() throws Exception {
        final String logName = runtime.getMethodName();
        final DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.loadConf(testConf);

        localConf.setOutputBufferSize(0);
        localConf.setPeriodicFlushFrequencyMilliSeconds(0);
        localConf.setImmediateFlushEnabled(true);
        localConf.setReadAheadWaitTime(10);
        localConf.setReadAheadBatchSize(1);
        localConf.setPositionGapDetectionEnabled(true);
        localConf.setReadAheadSkipBrokenEntries(true);
        localConf.setEIInjectReadAheadBrokenEntries(true);
        final DistributedLogManager manager = createNewDLM(localConf, logName);

        // a single log segment holding 100 records
        final int numLogSegments = 1;
        final int numRecordsPerLogSegment = 100;
        writeRecords(manager, numLogSegments, numRecordsPerLogSegment, 1L, false);

        final AsyncLogReader logReader = manager.getAsyncLogReader(DLSN.InvalidDLSN);

        try {
            // Attempt up to 30 reads; with gap detection on, one of them must fail.
            int readSoFar = 0;
            while (readSoFar < 30) {
                final LogRecordWithDLSN read = Utils.ioResult(logReader.readNext());
                assertTrue(read.getDlsn().getEntryId() % 10 != 0);
                readSoFar++;
            }
            fail("should have thrown");
        } catch (DLIllegalStateException expected) {
            // expected outcome of this test
        }

        Utils.close(logReader);
        manager.close();
    }

    /**
     * Reads a single 100-record log segment with injected broken read-ahead entries, skipping
     * of broken entries enabled, gap detection disabled and a read-ahead batch size of 5.
     *
     * <p>The 50 records that are read must all come from entries whose id is not a multiple
     * of 10.
     */
    @Test(timeout = 60000)
    public void testReadBrokenEntriesAndLargeBatchSize() throws Exception {
        final String logName = runtime.getMethodName();
        final DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.loadConf(testConf);

        localConf.setOutputBufferSize(0);
        localConf.setPeriodicFlushFrequencyMilliSeconds(0);
        localConf.setImmediateFlushEnabled(true);
        localConf.setReadAheadWaitTime(10);
        localConf.setReadAheadBatchSize(5);
        localConf.setPositionGapDetectionEnabled(false);
        localConf.setReadAheadSkipBrokenEntries(true);
        localConf.setEIInjectReadAheadBrokenEntries(true);
        final DistributedLogManager manager = createNewDLM(localConf, logName);

        final int numLogSegments = 1;
        final int numRecordsPerLogSegment = 100;
        writeRecords(manager, numLogSegments, numRecordsPerLogSegment, 1L, false);

        final AsyncLogReader logReader = manager.getAsyncLogReader(DLSN.InvalidDLSN);

        // Every 10th record broken. Reading 5 at once, beginning from 0:
        // 1. range 0-4 will be corrupted and discarded
        // 2. ranges 1-5, 2-6, 3-7, 4-8, 5-9 will be ok
        // 3. ranges 6-10, 7-11, 8-12, 9-13 will be bad
        // And so on, so 5 records in each 10 will be discarded, for 50 good records.
        final int expectedReadable = 50;
        int readSoFar = 0;
        while (readSoFar < expectedReadable) {
            final LogRecordWithDLSN read = Utils.ioResult(logReader.readNext());
            assertTrue(read.getDlsn().getEntryId() % 10 != 0);
            readSoFar++;
        }

        Utils.close(logReader);
        manager.close();
    }

    /**
     * Reads three 5-record log segments with injected broken read-ahead entries, skipping of
     * broken entries enabled, gap detection disabled and a read-ahead batch size of 8 (larger
     * than a segment).
     *
     * <p>The 12 records that are read must all come from entries whose id is not a multiple
     * of 10.
     */
    @Test(timeout = 60000)
    public void testReadBrokenEntriesAndLargeBatchSizeCrossSegment() throws Exception {
        final String logName = runtime.getMethodName();
        final DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.loadConf(testConf);

        localConf.setOutputBufferSize(0);
        localConf.setPeriodicFlushFrequencyMilliSeconds(0);
        localConf.setImmediateFlushEnabled(true);
        localConf.setReadAheadWaitTime(10);
        localConf.setReadAheadBatchSize(8);
        localConf.setPositionGapDetectionEnabled(false);
        localConf.setReadAheadSkipBrokenEntries(true);
        localConf.setEIInjectReadAheadBrokenEntries(true);
        final DistributedLogManager manager = createNewDLM(localConf, logName);

        final int numLogSegments = 3;
        final int numRecordsPerLogSegment = 5;
        writeRecords(manager, numLogSegments, numRecordsPerLogSegment, 1L, false);

        final AsyncLogReader logReader = manager.getAsyncLogReader(DLSN.InvalidDLSN);

        // Every 10th record broken. Reading 8 at once, beginning from 0:
        // 1. range 0-7 will be corrupted and discarded
        // 2. range 1-8 will be good, but only contain 4 records
        // And so on for the next segment, so 4 records in each segment, for 12 good records
        final int expectedReadable = 12;
        int readSoFar = 0;
        while (readSoFar < expectedReadable) {
            final LogRecordWithDLSN read = Utils.ioResult(logReader.readNext());
            assertTrue(read.getDlsn().getEntryId() % 10 != 0);
            readSoFar++;
        }

        Utils.close(logReader);
        manager.close();
    }

    /**
     * Verifies that a per-stream dynamic configuration can override the ensemble size used for
     * the ledgers of a log.
     *
     * <p>A log opened with the namespace configuration must create a ledger with the default
     * ensemble size, while a log opened with a dynamic configuration that sets the ensemble
     * size to the default minus one must create a ledger with that smaller ensemble size.
     */
    @Test(timeout = 60000)
    public void testCreateLogStreamWithDifferentReplicationFactor() throws Exception {
        final String logName = runtime.getMethodName();
        final DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.addConfiguration(testConf);
        localConf.setOutputBufferSize(0);
        localConf.setImmediateFlushEnabled(false);
        localConf.setPeriodicFlushFrequencyMilliSeconds(0);

        // dynamic configuration that lowers the ensemble size by one
        final ConcurrentBaseConfiguration constConf = new ConcurrentConstConfiguration(localConf);
        final DynamicDistributedLogConfiguration dynamicConf =
                new DynamicDistributedLogConfiguration(constConf);
        dynamicConf.setProperty(DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE,
                DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT - 1);

        final URI namespaceUri = createDLMURI("/" + logName);
        ensureURICreated(namespaceUri);
        final Namespace ns = NamespaceBuilder.newBuilder()
                .conf(localConf).uri(namespaceUri).build();

        // use the pool
        DistributedLogManager manager = ns.openLog(logName + "-pool");
        AsyncLogWriter logWriter = manager.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(logWriter.write(DLMTestUtil.getLogRecordInstance(1L)));
        List<LogSegmentMetadata> logSegments = manager.getLogSegments();
        assertEquals(1, logSegments.size());
        long segmentLedgerId = logSegments.get(0).getLogSegmentId();
        LedgerHandle ledger = ((BKNamespaceDriver) ns.getNamespaceDriver()).getReaderBKC().get()
                .openLedgerNoRecovery(
                        segmentLedgerId,
                        BookKeeper.DigestType.CRC32,
                        localConf.getBKDigestPW().getBytes(UTF_8));
        LedgerMetadata ledgerMetadata = BookKeeperAccessor.getLedgerMetadata(ledger);
        assertEquals(DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT,
                ledgerMetadata.getEnsembleSize());
        ledger.close();
        Utils.close(logWriter);
        manager.close();

        // use customized configuration
        manager = ns.openLog(
                logName + "-custom",
                java.util.Optional.empty(),
                java.util.Optional.of(dynamicConf),
                java.util.Optional.empty());
        logWriter = manager.startAsyncLogSegmentNonPartitioned();
        Utils.ioResult(logWriter.write(DLMTestUtil.getLogRecordInstance(1L)));
        logSegments = manager.getLogSegments();
        assertEquals(1, logSegments.size());
        segmentLedgerId = logSegments.get(0).getLogSegmentId();
        ledger = ((BKNamespaceDriver) ns.getNamespaceDriver()).getReaderBKC().get()
                .openLedgerNoRecovery(
                        segmentLedgerId,
                        BookKeeper.DigestType.CRC32,
                        localConf.getBKDigestPW().getBytes(UTF_8));
        ledgerMetadata = BookKeeperAccessor.getLedgerMetadata(ledger);
        assertEquals(DistributedLogConfiguration.BKDL_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT - 1,
                ledgerMetadata.getEnsembleSize());
        ledger.close();
        Utils.close(logWriter);
        manager.close();
        ns.close();
    }

    /**
     * Writes plain records and one record set into the same log segment and verifies both the
     * DLSNs returned to the writer and what readers see with and without record set
     * deserialization.
     *
     * <p>Layout written by this test: five plain records (transaction ids 1-5), one record set
     * with transaction id 6 holding five payloads, then five plain records (transaction ids
     * 11-15). The expected DLSNs are entries 0-4, entry 5 (slots 0-4 for the set's records) and
     * entries 6-10.
     */
    @Test(timeout = 60000)
    public void testWriteRecordSet() throws Exception {
        String name = runtime.getMethodName();
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(testConf);
        confLocal.setOutputBufferSize(0);
        confLocal.setImmediateFlushEnabled(false);
        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);

        URI uri = createDLMURI("/" + name);
        ensureURICreated(uri);

        DistributedLogManager dlm = createNewDLM(confLocal, name);
        BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
        // write first 5 records individually; their futures are collected in write order
        List<CompletableFuture<DLSN>> writeFutures = Lists.newArrayList();
        for (int i = 0; i < 5; i++) {
            LogRecord record = DLMTestUtil.getLogRecordInstance(1L + i);
            writeFutures.add(writer.write(record));
        }
        List<CompletableFuture<DLSN>> recordSetFutures = Lists.newArrayList();
        // write another 5 records
        // (packed into a single LZ4 record set; each payload gets its own promise)
        final LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(4096, CompressionCodec.Type.LZ4);
        for (int i = 0; i < 5; i++) {
            LogRecord record = DLMTestUtil.getLogRecordInstance(6L + i);
            CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
            recordSetWriter.writeRecord(ByteBuffer.wrap(record.getPayload()), writePromise);
            recordSetFutures.add(writePromise);
        }
        // the whole set is written as one log record with transaction id 6, flagged as a record set
        final ByteBuf recordSetBuffer = recordSetWriter.getBuffer();
        LogRecord setRecord = new LogRecord(6L, recordSetBuffer);
        setRecord.setRecordSet();
        CompletableFuture<DLSN> writeRecordSetFuture = writer.write(setRecord);
        // hand the outcome of the set write to the record set writer
        // (the per-record promises are asserted on further below)
        writeRecordSetFuture.whenComplete(new FutureEventListener<DLSN>() {
            @Override
            public void onSuccess(DLSN dlsn) {
                recordSetWriter.completeTransmit(
                        dlsn.getLogSegmentSequenceNo(),
                        dlsn.getEntryId(),
                        dlsn.getSlotId());
            }

            @Override
            public void onFailure(Throwable cause) {
                recordSetWriter.abortTransmit(cause);
            }
        });
        writeFutures.add(writeRecordSetFuture);
        Utils.ioResult(writeRecordSetFuture);
        // write last 5 records
        for (int i = 0; i < 5; i++) {
            LogRecord record = DLMTestUtil.getLogRecordInstance(11L + i);
            CompletableFuture<DLSN> writeFuture = writer.write(record);
            writeFutures.add(writeFuture);
            // make sure get log record count returns the right count
            if (i == 0) {
                Utils.ioResult(writeFuture);
                assertEquals(10, dlm.getLogRecordCount());
            }
        }

        // writeFutures holds 11 results: 5 plain records, the record set, 5 plain records
        List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writeFutures));

        for (int i = 0; i < 5; i++) {
            Assert.assertEquals(new DLSN(1L, i, 0L), writeResults.get(i));
        }
        Assert.assertEquals(new DLSN(1L, 5L, 0L), writeResults.get(5));
        for (int i = 0; i < 5; i++) {
            Assert.assertEquals(new DLSN(1L, 6L + i, 0L), writeResults.get(6 + i));
        }
        // records inside the set share entry 5 and are told apart by slot id
        List<DLSN> recordSetWriteResults = Utils.ioResult(FutureUtils.collect(recordSetFutures));
        for (int i = 0; i < 5; i++) {
            Assert.assertEquals(new DLSN(1L, 5L, i), recordSetWriteResults.get(i));
        }

        Utils.ioResult(writer.flushAndCommit());

        // Reader 1: record sets are deserialized on read, so 15 individual records are expected
        DistributedLogConfiguration readConf1 = new DistributedLogConfiguration();
        readConf1.addConfiguration(confLocal);
        readConf1.setDeserializeRecordSetOnReads(true);

        DistributedLogManager readDLM1 = createNewDLM(readConf1, name);
        AsyncLogReader reader1 = readDLM1.getAsyncLogReader(DLSN.InitialDLSN);
        for (int i = 0; i < 15; i++) {
            LogRecordWithDLSN record = Utils.ioResult(reader1.readNext());
            if (i < 5) {
                // first five plain records
                assertEquals(new DLSN(1L, i, 0L), record.getDlsn());
                assertEquals(1L + i, record.getTransactionId());
            } else if (i >= 10) {
                // last five plain records
                assertEquals(new DLSN(1L, 6L + i - 10, 0L), record.getDlsn());
                assertEquals(11L + i - 10, record.getTransactionId());
            } else {
                // records unpacked from the set: same entry, slot per record, set's transaction id
                assertEquals(new DLSN(1L, 5L, i - 5), record.getDlsn());
                assertEquals(6L, record.getTransactionId());
            }
            assertEquals(i + 1, record.getPositionWithinLogSegment());
            assertArrayEquals(DLMTestUtil.generatePayload(i + 1), record.getPayload());
        }
        Utils.close(reader1);
        readDLM1.close();

        // Reader 2: record sets are not deserialized, so the set is returned as one record (11 total)
        DistributedLogConfiguration readConf2 = new DistributedLogConfiguration();
        readConf2.addConfiguration(confLocal);
        readConf2.setDeserializeRecordSetOnReads(false);

        DistributedLogManager readDLM2 = createNewDLM(readConf2, name);
        AsyncLogReader reader2 = readDLM2.getAsyncLogReader(DLSN.InitialDLSN);
        for (int i = 0; i < 11; i++) {
            LogRecordWithDLSN record = Utils.ioResult(reader2.readNext());
            LOG.info("Read record {}", record);
            if (i < 5) {
                // first five plain records
                assertEquals(new DLSN(1L, i, 0L), record.getDlsn());
                assertEquals(1L + i, record.getTransactionId());
                assertEquals(i + 1, record.getPositionWithinLogSegment());
                assertArrayEquals(DLMTestUtil.generatePayload(i + 1), record.getPayload());
            } else if (i >= 6L) {
                // last five plain records; positions continue at 11 after the set
                assertEquals(new DLSN(1L, 6L + i - 6, 0L), record.getDlsn());
                assertEquals(11L + i - 6, record.getTransactionId());
                assertEquals(11 + i - 6, record.getPositionWithinLogSegment());
                assertArrayEquals(DLMTestUtil.generatePayload(11L + i - 6), record.getPayload());
            } else {
                // i == 5: the still-packed record set carrying five records
                assertEquals(new DLSN(1L, 5L, 0), record.getDlsn());
                assertEquals(6L, record.getTransactionId());
                assertEquals(6, record.getPositionWithinLogSegment());
                assertTrue(record.isRecordSet());
                assertEquals(5, LogRecordSet.numRecords(record));
            }
        }
        Utils.close(reader2);
        readDLM2.close();
    }

    /**
     * Verifies that a reader fails with {@link IdleReaderException} when the writer's periodic
     * keep-alive is disabled (set to 0 ms) and the reader idle error threshold is 40 ms.
     */
    @Test(timeout = 60000)
    public void testIdleReaderExceptionWhenKeepAliveIsDisabled() throws Exception {
        final String logName = runtime.getMethodName();
        final DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.addConfiguration(testConf);
        localConf.setOutputBufferSize(0);
        localConf.setImmediateFlushEnabled(false);
        localConf.setPeriodicFlushFrequencyMilliSeconds(0);
        localConf.setPeriodicKeepAliveMilliSeconds(0);
        localConf.setReadLACLongPollTimeout(9);
        localConf.setReaderIdleWarnThresholdMillis(20);
        localConf.setReaderIdleErrorThresholdMillis(40);

        final URI logUri = createDLMURI("/" + logName);
        ensureURICreated(logUri);

        final DistributedLogManager manager = createNewDLM(localConf, logName);
        final BKAsyncLogWriter logWriter = (BKAsyncLogWriter) Utils.ioResult(manager.openAsyncLogWriter());
        // the write is issued without waiting for its result
        logWriter.write(DLMTestUtil.getLogRecordInstance(1L));

        final AsyncLogReader logReader = Utils.ioResult(manager.openAsyncLogReader(DLSN.InitialDLSN));
        try {
            Utils.ioResult(logReader.readNext());
            fail("Should fail when stream is idle");
        } catch (IdleReaderException expected) {
            // expected
        }

        Utils.close(logReader);
        logWriter.close();
        manager.close();
    }

    /**
     * Verifies that a reader receives the written record rather than failing as idle when the
     * writer's periodic keep-alive is enabled (1000 ms) and the reader idle error threshold is
     * 4000 ms.
     */
    @Test(timeout = 60000)
    public void testIdleReaderExceptionWhenKeepAliveIsEnabled() throws Exception {
        final String logName = runtime.getMethodName();
        final DistributedLogConfiguration localConf = new DistributedLogConfiguration();
        localConf.addConfiguration(testConf);
        localConf.setOutputBufferSize(0);
        localConf.setImmediateFlushEnabled(false);
        localConf.setPeriodicFlushFrequencyMilliSeconds(0);
        localConf.setPeriodicKeepAliveMilliSeconds(1000);
        localConf.setReadLACLongPollTimeout(999);
        localConf.setReaderIdleWarnThresholdMillis(2000);
        localConf.setReaderIdleErrorThresholdMillis(4000);

        final URI logUri = createDLMURI("/" + logName);
        ensureURICreated(logUri);

        final DistributedLogManager manager = createNewDLM(localConf, logName);
        final BKAsyncLogWriter logWriter = (BKAsyncLogWriter) Utils.ioResult(manager.openAsyncLogWriter());
        // the write is issued without waiting for its result
        logWriter.write(DLMTestUtil.getLogRecordInstance(1L));

        final AsyncLogReader logReader = Utils.ioResult(manager.openAsyncLogReader(DLSN.InitialDLSN));
        final LogRecordWithDLSN firstRecord = Utils.ioResult(logReader.readNext());
        assertEquals(1L, firstRecord.getTransactionId());
        DLMTestUtil.verifyLogRecord(firstRecord);

        Utils.close(logReader);
        logWriter.close();
        manager.close();
    }
}
