/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog;

import static org.junit.Assert.*;
import com.google.common.base.Optional;
import com.google.common.base.Ticker;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
import org.apache.distributedlog.injector.AsyncFailureInjector;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.util.ConfUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;


/**
 * Test Case {@link ReadAheadEntryReader}.
 */
public class TestReadAheadEntryReader extends TestDistributedLogBase {

    private static final int MAX_CACHED_ENTRIES = 5;
    private static final int NUM_PREFETCH_ENTRIES = 10;

    @Rule
    public TestName runtime = new TestName();
    private DistributedLogConfiguration baseConf;
    private OrderedScheduler scheduler;
    private BookKeeperClient bkc;
    private ZooKeeperClient zkc;

    @Before
    public void setup() throws Exception {
        super.setup();
        baseConf = new DistributedLogConfiguration();
        baseConf.addConfiguration(conf);
        baseConf.setOutputBufferSize(0);
        baseConf.setPeriodicFlushFrequencyMilliSeconds(0);
        baseConf.setImmediateFlushEnabled(false);
        baseConf.setReadAheadMaxRecords(MAX_CACHED_ENTRIES);
        baseConf.setNumPrefetchEntriesPerLogSegment(NUM_PREFETCH_ENTRIES);
        baseConf.setMaxPrefetchEntriesPerLogSegment(NUM_PREFETCH_ENTRIES);
        zkc = ZooKeeperClientBuilder.newBuilder()
                .name("test-zk")
                .zkServers(bkutil.getZkServers())
                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
                .zkAclId(conf.getZkAclId())
                .build();
        bkc = BookKeeperClientBuilder.newBuilder()
                .name("test-bk")
                .dlConfig(conf)
                .ledgersPath("/ledgers")
                .zkServers(bkutil.getZkServers())
                .build();
        scheduler = OrderedScheduler.newBuilder()
                .name("test-read-ahead-entry-reader")
                .corePoolSize(1)
                .build();
    }

    @After
    public void teardown() throws Exception {
        if (null != bkc) {
            bkc.close();
        }
        if (null != scheduler) {
            scheduler.shutdown();
        }
        if (null != zkc) {
            zkc.close();
        }
        super.teardown();
    }

    private ReadAheadEntryReader createEntryReader(String streamName,
                                                   DLSN fromDLSN,
                                                   BKDistributedLogManager dlm,
                                                   DistributedLogConfiguration conf)
            throws Exception {
        BKLogReadHandler readHandler = dlm.createReadHandler(
                Optional.<String>absent(),
                true);
        LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore(
                conf,
                ConfUtils.getConstDynConf(conf),
                zkc,
                bkc,
                scheduler,
                null,
                NullStatsLogger.INSTANCE,
                AsyncFailureInjector.NULL);
        return new ReadAheadEntryReader(
                streamName,
                fromDLSN,
                conf,
                readHandler,
                entryStore,
                scheduler,
                Ticker.systemTicker(),
                new AlertStatsLogger(NullStatsLogger.INSTANCE, "test-alert"));
    }

    private void ensureOrderSchedulerEmpty(String streamName) throws Exception {
        final CompletableFuture<Void> promise = new CompletableFuture<Void>();
        scheduler.submit(streamName, new Runnable() {
            @Override
            public void run() {
                FutureUtils.complete(promise, null);
            }
        });
        Utils.ioResult(promise);
    }

    void generateCompletedLogSegments(DistributedLogManager dlm,
                                      long numCompletedSegments,
                                      long segmentSize) throws Exception {
        generateCompletedLogSegments(dlm, numCompletedSegments, segmentSize, 1L);
    }

    void generateCompletedLogSegments(DistributedLogManager dlm,
                                      long numCompletedSegments,
                                      long segmentSize,
                                      long startTxId) throws Exception {

        long txid = startTxId;
        for (long i = 0; i < numCompletedSegments; i++) {
            AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
            for (long j = 1; j <= segmentSize; j++) {
                Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
                LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(txid);
                ctrlRecord.setControl();
                Utils.ioResult(writer.write(ctrlRecord));
            }
            Utils.close(writer);
        }
    }

    AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm,
                                              DistributedLogConfiguration conf,
                                              long segmentSize) throws Exception {
        AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
        for (long i = 1L; i <= segmentSize; i++) {
            Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
            LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(i);
            ctrlRecord.setControl();
            Utils.ioResult(writer.write(ctrlRecord));
        }
        return writer;
    }

    void expectAlreadyTruncatedTransactionException(ReadAheadEntryReader reader,
                                                    String errMsg)
            throws Exception {
        try {
            reader.checkLastException();
            fail(errMsg);
        } catch (AlreadyTruncatedTransactionException atte) {
            // expected
        }
    }

    void expectIllegalStateException(ReadAheadEntryReader reader,
                                     String errMsg)
            throws Exception {
        try {
            reader.checkLastException();
            fail(errMsg);
        } catch (DLIllegalStateException le) {
            // expected
        }
    }

    void expectNoException(ReadAheadEntryReader reader) throws Exception {
        reader.checkLastException();
    }

    //
    // Test Positioning
    //

    @Test(timeout = 60000)
    public void testStartWithEmptySegmentList() throws Exception {
        String streamName = runtime.getMethodName();
        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);
        ReadAheadEntryReader readAheadEntryReader =
                createEntryReader(streamName, DLSN.InitialDLSN, dlm, baseConf);

        readAheadEntryReader.start(Lists.<LogSegmentMetadata>newArrayList());

        ensureOrderSchedulerEmpty(streamName);
        assertFalse("ReadAhead should not be initialized with empty segment list",
                readAheadEntryReader.isInitialized());
        assertTrue("ReadAhead should be empty when it isn't initialized",
                readAheadEntryReader.isCacheEmpty());
        assertFalse("ReadAhead should not be marked as caught up when it isn't initialized",
                readAheadEntryReader.isReadAheadCaughtUp());

        // generate list of log segments
        generateCompletedLogSegments(dlm, 1, MAX_CACHED_ENTRIES / 2 + 1);
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(segments.size() + " log segments found, expected to be only one",
                1, segments.size());

        // notify the readahead reader with new segment lsit
        readAheadEntryReader.onSegmentsUpdated(segments);

        // check the reader state after initialization
        ensureOrderSchedulerEmpty(streamName);
        assertTrue("ReadAhead should be initialized with non-empty segment list",
                readAheadEntryReader.isInitialized());
        assertNotNull("current segment reader should be initialized",
                readAheadEntryReader.getCurrentSegmentReader());
        assertEquals("current segment sequence number should be " + segments.get(0).getLogSegmentSequenceNumber(),
                segments.get(0).getLogSegmentSequenceNumber(), readAheadEntryReader.getCurrentSegmentSequenceNumber());
        assertNull("there should be no next segment reader",
                readAheadEntryReader.getNextSegmentReader());
        assertTrue("there should be no remaining segment readers",
                readAheadEntryReader.getSegmentReaders().isEmpty());

        Utils.close(readAheadEntryReader);
        dlm.close();
    }

    @Test(timeout = 60000)
    public void testInitializeMultipleClosedLogSegments0() throws Exception {
        // 5 completed log segments, start from the begin
        testInitializeMultipleClosedLogSegments(5, DLSN.InitialDLSN, 0);
    }

    @Test(timeout = 60000)
    public void testInitializeMultipleClosedLogSegments1() throws Exception {
        // 5 completed log segments, start from the 4th segment and it should skip first 3 log segments
        testInitializeMultipleClosedLogSegments(5, new DLSN(4L, 0L, 0L), 3);
    }

    private void testInitializeMultipleClosedLogSegments(
            int numLogSegments,
            DLSN fromDLSN,
            int expectedCurrentSegmentIdx
    ) throws Exception {
        String streamName = runtime.getMethodName();
        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);

        // generate list of log segments
        generateCompletedLogSegments(dlm, 1, MAX_CACHED_ENTRIES / 2 + 1, 1L);
        generateCompletedLogSegments(dlm, numLogSegments - 1, 1, MAX_CACHED_ENTRIES + 2);
        List<LogSegmentMetadata> segments = dlm.getLogSegments();
        assertEquals(segments.size() + " log segments found, expected to be " + numLogSegments,
                numLogSegments, segments.size());

        ReadAheadEntryReader readAheadEntryReader =
                createEntryReader(streamName, fromDLSN, dlm, baseConf);
        readAheadEntryReader.start(segments);

        ensureOrderSchedulerEmpty(streamName);
        assertTrue("ReadAhead should be initialized with non-empty segment list",
                readAheadEntryReader.isInitialized());
        assertNotNull("current segment reader should be initialized",
                readAheadEntryReader.getCurrentSegmentReader());
        assertTrue("current segment reader should be open and started",
                readAheadEntryReader.getCurrentSegmentReader().isReaderOpen()
                        && readAheadEntryReader.getCurrentSegmentReader().isReaderStarted());
        assertEquals("current segment reader should read " + segments.get(expectedCurrentSegmentIdx),
                segments.get(expectedCurrentSegmentIdx),
                readAheadEntryReader.getCurrentSegmentReader().getSegment());
        assertEquals("current segment sequence number should be "
                + segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
                segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
                readAheadEntryReader.getCurrentSegmentSequenceNumber());
        assertNull("next segment reader should not be initialized since it is a closed log segment",
                readAheadEntryReader.getNextSegmentReader());
        assertEquals("there should be " + (numLogSegments - (expectedCurrentSegmentIdx + 1))
                + " remaining segment readers",
                numLogSegments - (expectedCurrentSegmentIdx + 1),
                readAheadEntryReader.getSegmentReaders().size());
        int segmentIdx = expectedCurrentSegmentIdx + 1;
        for (ReadAheadEntryReader.SegmentReader reader : readAheadEntryReader.getSegmentReaders()) {
            LogSegmentMetadata expectedSegment = segments.get(segmentIdx);
            assertEquals("Segment should " + expectedSegment,
                    expectedSegment, reader.getSegment());
            assertTrue("Segment reader for " + expectedSegment + " should be open",
                    reader.isReaderOpen());
            assertFalse("Segment reader for " + expectedSegment + " should not be started",
                    reader.isReaderStarted());
            ++segmentIdx;
        }

        Utils.close(readAheadEntryReader);
        dlm.close();
    }

    @Test(timeout = 60000)
    public void testPositioningAtInvalidLogSegment() throws Exception {
        String streamName = runtime.getMethodName();
        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);

        // generate list of log segments
        generateCompletedLogSegments(dlm, 3, 3);
        AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
        Utils.ioResult(writer.truncate(new DLSN(2L, 1L, 0L)));

        List<LogSegmentMetadata> segments = dlm.getLogSegments();

        // positioning on a truncated log segment (segment 1)
        ReadAheadEntryReader readAheadEntryReader =
                createEntryReader(streamName, DLSN.InitialDLSN, dlm, baseConf);
        readAheadEntryReader.start(segments);
        // ensure initialization to complete
        ensureOrderSchedulerEmpty(streamName);
        expectNoException(readAheadEntryReader);
        Entry.Reader entryReader =
                readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        assertEquals(2L, entryReader.getLSSN());
        assertEquals(1L, entryReader.getEntryId());
        entryReader.release();
        Utils.close(readAheadEntryReader);

        // positioning on a partially truncated log segment (segment 2) before min active dlsn
        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 0L, 0L), dlm, baseConf);
        readAheadEntryReader.start(segments);
        // ensure initialization to complete
        ensureOrderSchedulerEmpty(streamName);
        expectNoException(readAheadEntryReader);
        entryReader =
                readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        assertEquals(2L, entryReader.getLSSN());
        assertEquals(1L, entryReader.getEntryId());
        entryReader.release();
        Utils.close(readAheadEntryReader);

        // positioning on a partially truncated log segment (segment 2) after min active dlsn
        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 2L, 0L), dlm, baseConf);
        readAheadEntryReader.start(segments);
        // ensure initialization to complete
        ensureOrderSchedulerEmpty(streamName);
        expectNoException(readAheadEntryReader);
        entryReader =
                readAheadEntryReader.getNextReadAheadEntry(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        assertEquals(2L, entryReader.getLSSN());
        assertEquals(2L, entryReader.getEntryId());
        entryReader.release();
        Utils.close(readAheadEntryReader);

        Utils.close(writer);
        dlm.close();
    }

    @Test(timeout = 60000)
    public void testPositioningIgnoreTruncationStatus() throws Exception {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(baseConf);
        confLocal.setIgnoreTruncationStatus(true);

        String streamName = runtime.getMethodName();
        BKDistributedLogManager dlm = createNewDLM(confLocal, streamName);

        // generate list of log segments
        generateCompletedLogSegments(dlm, 3, 2);
        AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
        Utils.ioResult(writer.truncate(new DLSN(2L, 1L, 0L)));

        List<LogSegmentMetadata> segments = dlm.getLogSegments();

        // positioning on a truncated log segment (segment 1)
        ReadAheadEntryReader readAheadEntryReader =
                createEntryReader(streamName, DLSN.InitialDLSN, dlm, confLocal);
        readAheadEntryReader.start(segments);
        // ensure initialization to complete
        ensureOrderSchedulerEmpty(streamName);
        expectNoException(readAheadEntryReader);
        Utils.close(readAheadEntryReader);

        // positioning on a partially truncated log segment (segment 2) before min active dlsn
        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 0L, 0L), dlm, confLocal);
        readAheadEntryReader.start(segments);
        // ensure initialization to complete
        ensureOrderSchedulerEmpty(streamName);
        expectNoException(readAheadEntryReader);
        Utils.close(readAheadEntryReader);

        // positioning on a partially truncated log segment (segment 2) after min active dlsn
        readAheadEntryReader = createEntryReader(streamName, new DLSN(2L, 1L, 0L), dlm, confLocal);
        readAheadEntryReader.start(segments);
        // ensure initialization to complete
        ensureOrderSchedulerEmpty(streamName);
        expectNoException(readAheadEntryReader);
        Utils.close(readAheadEntryReader);

        Utils.close(writer);
        dlm.close();
    }

    //
    // Test Reinitialization
    //

    @Test(timeout = 60000)
    public void testLogSegmentSequenceNumberGap() throws Exception {
        String streamName = runtime.getMethodName();
        BKDistributedLogManager dlm = createNewDLM(baseConf, streamName);

        // generate list of log segments
        generateCompletedLogSegments(dlm, 3, 2);
        List<LogSegmentMetadata> segments = dlm.getLogSegments();

        ReadAheadEntryReader readAheadEntryReader =
                createEntryReader(streamName, DLSN.InitialDLSN, dlm, baseConf);
        readAheadEntryReader.start(segments.subList(0, 1));
        int expectedCurrentSegmentIdx = 0;
        ensureOrderSchedulerEmpty(streamName);
        assertTrue("ReadAhead should be initialized with non-empty segment list",
                readAheadEntryReader.isInitialized());
        assertNotNull("current segment reader should be initialized",
                readAheadEntryReader.getCurrentSegmentReader());
        assertTrue("current segment reader should be open and started",
                readAheadEntryReader.getCurrentSegmentReader().isReaderOpen()
                        && readAheadEntryReader.getCurrentSegmentReader().isReaderStarted());
        assertEquals("current segment reader should read " + segments.get(expectedCurrentSegmentIdx),
                segments.get(expectedCurrentSegmentIdx),
                readAheadEntryReader.getCurrentSegmentReader().getSegment());
        assertEquals("current segment sequence number should be "
                + segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
                segments.get(expectedCurrentSegmentIdx).getLogSegmentSequenceNumber(),
                readAheadEntryReader.getCurrentSegmentSequenceNumber());
        assertNull("next segment reader should not be initialized since it is a closed log segment",
                readAheadEntryReader.getNextSegmentReader());

        readAheadEntryReader.onSegmentsUpdated(segments.subList(2, 3));
        ensureOrderSchedulerEmpty(streamName);
        expectIllegalStateException(readAheadEntryReader,
                "inconsistent log segment found");

        Utils.close(readAheadEntryReader);
        dlm.close();
    }

}
