/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Ticker;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.IdleReaderException;
import org.apache.distributedlog.util.Utils;




/**
 * Synchronous Log Reader based on {@link AsyncLogReader}.
 */
class BKSyncLogReader implements LogReader, AsyncNotification {

    private final BKDistributedLogManager bkdlm;
    private final BKLogReadHandler readHandler;
    private final AtomicReference<IOException> readerException =
            new AtomicReference<IOException>(null);
    private final int maxReadAheadWaitTime;
    private CompletableFuture<Void> closeFuture;
    private final Optional<Long> startTransactionId;
    private boolean positioned = false;
    private Entry.Reader currentEntry = null;

    // readahead reader
    ReadAheadEntryReader readAheadReader = null;

    // idle reader settings
    private final boolean shouldCheckIdleReader;
    private final int idleErrorThresholdMillis;

    // Stats
    private final Counter idleReaderError;

    BKSyncLogReader(DistributedLogConfiguration conf,
                    BKDistributedLogManager bkdlm,
                    DLSN startDLSN,
                    Optional<Long> startTransactionId,
                    StatsLogger statsLogger) throws IOException {
        this.bkdlm = bkdlm;
        this.readHandler = bkdlm.createReadHandler(
                Optional.<String>absent(),
                this,
                true);
        this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
        this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis();
        this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE;
        this.startTransactionId = startTransactionId;

        // start readahead
        startReadAhead(startDLSN);
        if (!startTransactionId.isPresent()) {
            positioned = true;
        }

        // Stats
        StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader");
        idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error");
    }

    private void startReadAhead(DLSN startDLSN) throws IOException {
        readAheadReader = new ReadAheadEntryReader(
                    bkdlm.getStreamName(),
                    startDLSN,
                    bkdlm.getConf(),
                    readHandler,
                    bkdlm.getReaderEntryStore(),
                    bkdlm.getScheduler(),
                    Ticker.systemTicker(),
                    bkdlm.alertStatsLogger);
        readHandler.registerListener(readAheadReader);
        readHandler.asyncStartFetchLogSegments()
                .thenApply(logSegments -> {
                    readAheadReader.addStateChangeNotification(BKSyncLogReader.this);
                    readAheadReader.start(logSegments.getValue());
                    return null;
                });
    }

    synchronized void releaseCurrentEntry() {
        if (null != currentEntry) {
            currentEntry.release();
            currentEntry = null;
        }
    }

    synchronized void checkClosedOrException() throws IOException {
        if (null != closeFuture) {
            throw new IOException("Reader is closed");
        }
        if (null != readerException.get()) {
            throw readerException.get();
        }
    }

    @VisibleForTesting
    ReadAheadEntryReader getReadAheadReader() {
        return readAheadReader;
    }

    @VisibleForTesting
    BKLogReadHandler getReadHandler() {
        return readHandler;
    }

    private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException {
        Entry.Reader entry = null;
        if (nonBlocking) {
            return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
        } else {
            while (!readAheadReader.isReadAheadCaughtUp()
                    && null == readerException.get()
                    && null == entry) {
                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
            }
            if (null != entry) {
                return entry;
            }
            // reader is caught up
            if (readAheadReader.isReadAheadCaughtUp()
                    && null == readerException.get()) {
                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
            }
            return entry;
        }
    }

    private void markReaderAsIdle() throws IdleReaderException {
        idleReaderError.inc();
        IdleReaderException ire = new IdleReaderException("Sync reader on stream "
                + readHandler.getFullyQualifiedName()
                + " is idle for more than " + idleErrorThresholdMillis + " ms");
        readerException.compareAndSet(null, ire);
        throw ire;
    }

    @Override
    public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
            throws IOException {
        checkClosedOrException();

        LogRecordWithDLSN record = doReadNext(nonBlocking);
        // no record is returned, check if the reader becomes idle
        if (null == record && shouldCheckIdleReader) {
            if (readAheadReader.getNumCachedEntries() <= 0
                    && readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
                markReaderAsIdle();
            }
        }
        return record;
    }

    private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException {
        LogRecordWithDLSN record = null;

        do {
            // fetch one record until we don't find any entry available in the readahead cache
            while (null == record) {
                if (null == currentEntry) {
                    currentEntry = readNextEntry(nonBlocking);
                    if (null == currentEntry) {
                        return null;
                    }
                }
                record = currentEntry.nextRecord();
                if (null == record) {
                    currentEntry = null;
                }
            }

            // check if we reached the end of stream
            if (record.isEndOfStream()) {
                EndOfStreamException eos = new EndOfStreamException("End of Stream Reached for "
                        + readHandler.getFullyQualifiedName());
                readerException.compareAndSet(null, eos);
                throw eos;
            }
            // skip control records
            if (record.isControl()) {
                record = null;
                continue;
            }
            if (!positioned) {
                if (record.getTransactionId() < startTransactionId.get()) {
                    record = null;
                    continue;
                } else {
                    positioned = true;
                    break;
                }
            } else {
                break;
            }
        } while (true);
        return record;
    }

    @Override
    public synchronized List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords)
            throws IOException {
        LinkedList<LogRecordWithDLSN> retList =
                new LinkedList<LogRecordWithDLSN>();

        int numRead = 0;
        LogRecordWithDLSN record = readNext(nonBlocking);
        while ((null != record)) {
            retList.add(record);
            numRead++;
            if (numRead >= numLogRecords) {
                break;
            }
            record = readNext(nonBlocking);
        }
        return retList;
    }

    @Override
    public CompletableFuture<Void> asyncClose() {
        CompletableFuture<Void> closePromise;
        synchronized (this) {
            if (null != closeFuture) {
                return closeFuture;
            }
            closeFuture = closePromise = new CompletableFuture<Void>();
            releaseCurrentEntry();
        }
        readHandler.unregisterListener(readAheadReader);
        readAheadReader.removeStateChangeNotification(this);
        FutureUtils.proxyTo(
            Utils.closeSequence(bkdlm.getScheduler(), true,
                    readAheadReader,
                    readHandler
            ),
            closePromise);
        return closePromise;
    }

    @Override
    public void close() throws IOException {
        Utils.ioResult(asyncClose());
    }

    //
    // Notification From ReadHandler
    //

    @Override
    public void notifyOnError(Throwable cause) {
        if (cause instanceof IOException) {
            readerException.compareAndSet(null, (IOException) cause);
        } else {
            readerException.compareAndSet(null, new IOException(cause));
        }
    }

    @Override
    public void notifyOnOperationComplete() {
        // no-op
    }
}
