/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.api;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.io.AsyncCloseable;

/**
 * <i>LogReader</i> is a `synchronous` reader reading records from a DL log.
 *
 * <h3>Lifecycle of a Reader</h3>
 *
 * A reader is a <i>sequential</i> reader that read records from a DL log starting
 * from a given position. The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)}
 * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}.
 * <p>
 * After the reader is open, it could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)}
 * to read records out the log from provided position.
 * <p>
 * Closing the reader (via {@link #close()} will release all the resources occupied
 * by this reader instance.
 * <p>
 * Exceptions could be thrown during reading records. Once the exception is thrown,
 * the reader is set to an error state and it isn't usable anymore. It is the application's
 * responsibility to handle the exceptions and re-create readers if necessary.
 * <p>
 * Example:
 * <pre>
 * DistributedLogManager dlm = ...;
 * long nextTxId = ...;
 * LogReader reader = dlm.getInputStream(nextTxId);
 *
 * while (true) { // keep reading & processing records
 *     LogRecord record;
 *     try {
 *         record = reader.readNext(false);
 *         nextTxId = record.getTransactionId();
 *         // process the record
 *         ...
 *     } catch (IOException ioe) {
 *         // handle the exception
 *         ...
 *         reader = dlm.getInputStream(nextTxId + 1);
 *     }
 * }
 *
 * </pre>
 *
 * <h3>Read Records</h3>
 *
 * Reading records from an <i>endless</i> log in `synchronous` way isn't as
 * trivial as in `asynchronous` way (via {@link AsyncLogReader}. Because it
 * lacks of callback mechanism. LogReader introduces a flag `nonBlocking` on
 * controlling the <i>waiting</i> behavior on `synchronous` reads.
 *
 * <h4>Blocking vs NonBlocking</h4>
 *
 * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records
 * before returning read calls. While <i>NonBlocking</i> (nonBlocking = true)
 * means the reads will only check readahead cache and return whatever records
 * available in the readahead cache.
 * <p>
 * The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is
 * catching up with writer (there are records in the log), the read call will
 * wait until records are read and returned. If the reader is caught up with
 * writer (there are no more records in the log at read time), the read call
 * will wait for a small period of time (defined in
 * {@link DistributedLogConfiguration#getReadAheadWaitTime()} and return whatever
 * records available in the readahead cache. In other words, if a reader sees
 * no record on blocking reads, it means the reader is `caught-up` with the
 * writer.
 * <p>
 * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated
 * state machines. Applications could use <i>blocking</i> reads till caught up
 * with latest data. Once they are caught up with latest data, they could start
 * serving their service and turn to <i>non-blocking</i> read mode and tail read
 * data from the logs.
 * <p>
 * See examples below.
 *
 * <h4>Read Single Record</h4>
 *
 * {@link #readNext(boolean)} is reading individual records from a DL log.
 *
 * <pre>
 * LogReader reader = ...
 *
 * // keep reading records in blocking way until no records available in the log
 * LogRecord record = reader.readNext(false);
 * while (null != record) {
 *     // process the record
 *     ...
 *     // read next record
 *     records = reader.readNext(false);
 * }
 *
 * ...
 *
 * // reader is caught up with writer, doing non-blocking reads to tail the log
 * while (true) {
 *     record = reader.readNext(true)
 *     // process the new records
 *     ...
 * }
 * </pre>
 *
 * <h4>Read Batch of Records</h4>
 *
 * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records
 * from a DL log.
 *
 * <pre>
 * LogReader reader = ...
 * int N = 10;
 *
 * // keep reading N records in blocking way until no records available in the log
 * List<LogRecord> records = reader.readBulk(false, N);
 * while (!records.isEmpty()) {
 *     // process the list of records
 *     ...
 *     if (records.size() < N) { // no more records available in the log
 *         break;
 *     }
 *     // read next N records
 *     records = reader.readBulk(false, N);
 * }
 *
 * ...
 *
 * // reader is caught up with writer, doing non-blocking reads to tail the log
 * while (true) {
 *     records = reader.readBulk(true, N)
 *     // process the new records
 *     ...
 * }
 *
 * </pre>
 *
 * <p>
 * NOTE: Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing
 * the {@link AsyncCloseable} interface so the reader could be closed asynchronously
 *
 * @see AsyncLogReader
 */
@Public
@Evolving
public interface LogReader extends Closeable, AsyncCloseable {

    /**
     * Read the next log record from the stream.
     * <p>
     * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling
     * records from read ahead cache. It would return <i>null</i> if there isn't any records
     * available in the read ahead cache.
     * <p>
     * If <i>nonBlocking</i> is set to false, it would does blocking call. The call will
     * block until return a record if there are records in the stream (aka catching up).
     * Otherwise it would wait up to {@link DistributedLogConfiguration#getReadAheadWaitTime()}
     * milliseconds and return null if there isn't any more records in the stream.
     *
     * @param nonBlocking should the read make blocking calls to the backend or rely on the
     * readAhead cache
     * @return an operation from the stream or null if at end of stream
     * @throws IOException if there is an error reading from the stream
     */
    public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException;

    /**
     * Read the next <i>numLogRecords</i> log records from the stream
     *
     * @param nonBlocking should the read make blocking calls to the backend or rely on the
     * readAhead cache
     * @param numLogRecords maximum number of log records returned by this call.
     * @return an operation from the stream or empty list if at end of stream
     * @throws IOException if there is an error reading from the stream
     * @see #readNext(boolean)
     */
    public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException;
}
