/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl.scan;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
import org.apache.drill.exec.physical.impl.protocol.OperatorExec;
import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implementation of the revised scan operator that uses a mutator aware of
 * batch sizes. This is the successor to {@link ScanBatch} and should be used
 * by all new scan implementations.
 * <p>
 * The basic concept is to split the scan operator into layers:
 * <ul>
 * <li>The {@code OperatorRecordBatch} which implements Drill's Volcano-like
 * protocol.</li>
 * <li>The scan operator "wrapper" (this class) which implements actions for the
 * operator record batch specifically for scan. It iterates over readers,
 * delegating semantic work to other classes.</li>
 * <li>The implementation of per-reader semantics in the two EVF versions and
 * other ad-hoc implementations.</li>
 * <li>The result set loader and related classes which pack values into
 * value vectors.</li>
 * <li>Value vectors, which store the data.</li>
 * </ul>
 * <p>
 * The layered format can be confusing. However, each layer is somewhat
 * complex, so dividing the work into layers keeps the overall complexity
 * somewhat under control.
 *
 * <h4>Scanner Framework</h4>
 *
 * Acts as an adapter between the operator protocol and the row reader
 * protocol.
 * <p>
 * The scan operator itself is simply a framework for handling a set of readers;
 * it knows nothing other than the interfaces of the components it works with;
 * delegating all knowledge of schemas, projection, reading and the like to
 * implementations of those interfaces. Because that work is complex, a set
 * of frameworks exist to handle most common use cases, but a specialized reader
 * can create a framework or reader from scratch.
 * <p>
 * Error handling in this class is minimal: the enclosing record batch iterator
 * is responsible for handling exceptions. Error handling relies on the fact
 * that the iterator will call {@code close()} regardless of which exceptions
 * are thrown.
 *
 * <h4>Protocol</h4>
 *
 * The scanner works directly with two other interfaces:
 * <p>
 * The {@link ScanOperatorEvents} implementation provides the set of readers to
 * use. This class can simply maintain a list, or can create the reader on
 * demand.
 * <p>
 * More subtly, the factory also handles projection issues and manages vectors
 * across subsequent readers. A number of factories are available for the most
 * common cases. Extend these to implement a version specific to a data source.
 * <p>
 * The {@link RowBatchReader} is a surprisingly minimal interface that
 * nonetheless captures the essence of reading a result set as a set of batches.
 * The factory implementations mentioned above implement this interface to provide
 * commonly-used services, the most important of which is access to a
 * {@code ResultSetLoader} to write values into value vectors.
 *
 * <h4>Schema Versions</h4>
 *
 * Readers may change schemas from time to time. To track such changes,
 * this implementation tracks a batch schema version, maintained by comparing
 * one schema with the next.
 * <p>
 * Readers can discover columns as they read data, such as with any JSON-based
 * format. In this case, the row set mutator also provides a schema version,
 * but a fine-grained one that changes each time a column is added.
 * <p>
 * The two schema versions serve different purposes and are not interchangeable.
 * For example, if a scan reads two files, both will build up their own schemas,
 * each increasing its internal version number as work proceeds. But, at the
 * end of each batch, the schemas may (and, in fact, should) be identical,
 * which is the schema version downstream operators care about.
 *
 * <h4>Empty Files and/or Empty Schemas</h4>
 *
 * A corner case occurs if the input is empty, such as a CSV file
 * that contains no data. The general rule is the following:
 *
 * <ul>
 * <li>If the reader is "early schema" (the schema is defined at
 * open time), then the result will be a single empty batch with
 * the schema defined. Example: a CSV file without headers; in this case,
 * we know the schema is always the single {@code columns} array.</li>
 * <li>If the reader is "late schema" (the schema is defined while the
 * data is read), then no batch is returned because there is no schema.
 * Example: a JSON file. It is not helpful to return a single batch
 * with no columns; such a batch will simply conflict with some other
 * non-empty-schema batch. It turns out that other DBs handle this
 * case gracefully: a query of the form<br><pre><code>
 * SELECT * FROM VALUES()</code></pre><br>
 * will produce an empty result: no schema, no data.</li>
 * <li>The hybrid case: the reader could provide an early schema,
 * but cannot do so. That is, the early schema contains no columns.
 * We treat this case identically to the late schema case. Example: a
 * CSV file with headers in which the header line is empty.</li>
 * </ul>
 */
public class ScanOperatorExec implements OperatorExec {

  private enum State {

    /**
     * The scan has been started, but next() has not yet been
     * called.
     */
    START,

    /**
     * A reader is active and has more batches to deliver.
     */
    READER,

    /**
     * All readers are completed and none returned any data, but
     * at least one reader provided a schema. An empty batch
     * was returned from buildSchema() or next(). The next call
     * to next() will be the last.
     */
    EMPTY,

    /**
     * All readers are complete; no more batches to deliver.
     * Or, hit the limit pushed down to the scan.
     * close() is not yet called.
     */
    END,

    /**
     * A fatal error occurred during the START or READER
     * states. No further calls to next() allowed. Waiting
     * for the call to close().
     */
    FAILED,

    /**
     * Scan operator is closed. All resources and state are
     * released. No further calls of any kind are allowed.
     */
    CLOSED
  }

  static final Logger logger = LoggerFactory.getLogger(ScanOperatorExec.class);

  /** Supplies the readers and is closed when the scan closes. */
  private final ScanOperatorEvents factory;

  /**
   * If true, a scan whose readers produced a schema but no rows returns
   * a single empty batch with that schema rather than no result at all.
   */
  private final boolean allowEmptyResult;
  protected final VectorContainerAccessor containerAccessor = new VectorContainerAccessor();
  private State state = State.START;
  protected OperatorContext context;

  /** Number of readers obtained from the factory so far. */
  private int readerCount;

  /** State for the currently active reader, or null if none is active. */
  private ReaderState readerState;

  public ScanOperatorExec(ScanOperatorEvents factory, boolean allowEmptyResult) {
    this.factory = factory;
    this.allowEmptyResult = allowEmptyResult;
  }

  @Override
  public void bind(OperatorContext context) {
    this.context = context;
    factory.bind(context);
  }

  @Override
  public BatchAccessor batchAccessor() { return containerAccessor; }

  @VisibleForTesting
  public OperatorContext context() { return context; }

  /**
   * Locates the first reader able to provide a schema.
   *
   * @return true if a schema (possibly with an empty batch) is available,
   * false if no reader produced a schema
   * @throws UserException if the factory supplied no readers at all
   */
  @Override
  public boolean buildSchema() {
    assert state == State.START;
    try {

      // Spin through readers looking for the first that has enough data
      // to provide a schema. Skips empty, missing or otherwise "null"
      // readers.
      nextAction(true);
      if (state != State.END) {
        return true;
      }

      // Reader count check done here because readers are passed as
      // an iterator, not list. We don't know the count until we've
      // seen EOF from the iterator.
      if (readerCount == 0) {

        // The scan operator cannot handle the case that no readers are
        // available: there is nothing to define a schema, yet the scanner
        // is required to return a schema.
        //
        // This exception can be softened if the operator stack is enhanced
        // to handle "empty batches" in the most extreme case: no schema,
        // no data.
        throw UserException.executionError(
            new ExecutionSetupException("A scan batch must contain at least one reader."))
          .build(logger);
      }
      return false;
    } catch (final Throwable t) {
      // Mirror next(): any failure during START leaves the scan in FAILED
      // so that only close() remains valid.
      markFailed();
      throw t;
    }
  }

  /**
   * Reads the next batch, advancing through readers as needed.
   *
   * @return true if a batch is available, false at end of data
   * @throws IllegalStateException if called after a failure or after close
   */
  @Override
  public boolean next() {
    try {
      switch (state) {

        case READER:
        case START: // Occurs if no schema batch
          // Read another batch from the list of row readers. Keeps opening,
          // reading from, and closing readers as needed to locate a batch, or
          // until all readers are exhausted. Terminates when a batch is read,
          // or all readers are exhausted.
          nextAction(false);
          return state != State.END;

        case EMPTY:
          state = State.END;
          return false;

        case END:
          return false;

        default:
          throw new IllegalStateException("Unexpected state: " + state);
      }
    } catch (final Throwable t) {
      markFailed();
      throw t;
    }
  }

  /**
   * Records that a fatal error occurred. A scan that is already closed
   * stays closed: moving it back to FAILED would let a later close()
   * release the factory a second time.
   */
  private void markFailed() {
    if (state != State.CLOSED) {
      state = State.FAILED;
    }
  }

  /**
   * Reads from the current reader, opening and closing readers as needed,
   * until a batch is available or all readers are exhausted.
   *
   * @param readSchema true to ask readers for a schema batch, false to
   * ask for a data batch
   */
  private void nextAction(boolean readSchema) {
    while (true) {

      // If have a reader, read a batch
      if (readerState != null) {
        boolean hasData;
        if (readSchema) {
          hasData = readerState.buildSchema();
        } else {
          hasData = readerState.next();
        }
        if (hasData) {
          break;
        }
        closeReader();
      }

      // Another reader available?
      if (!nextReader()) {
        finalizeResults();
        return;
      }
      state = State.READER;

      // Is the reader usable?
      if (!readerState.open()) {
        closeReader();
      }
    }
  }

  /**
   * The last reader is done. Check for the special case that no reader
   * returned any rows, but some reader provided a schema. In this case,
   * we can return an empty result set with a schema. Otherwise, we have
   * to return a null result set: no schema, no data. For the Volcano
   * iterator protocol, this means no return of OK_NEW_SCHEMA, just
   * an immediate return of NONE.
   */
  private void finalizeResults() {
    if (allowEmptyResult &&
        containerAccessor.batchCount() == 0 &&
        containerAccessor.schemaVersion() > 0) {

      // We've exhausted all readers, none had data, but at least
      // one had a schema. Any zero-sized batch produced by a reader
      // was cleared when closing the reader. Recreate a valid empty
      // batch here to return downstream.
      containerAccessor.container().setEmpty();
      state = State.EMPTY;
    } else {
      if (containerAccessor.container() != null) {
        containerAccessor.container().setRecordCount(0);
      }
      state = State.END;
    }
  }

  /**
   * Obtain the next available reader, if any, and wrap it in a new
   * {@link ReaderState}. The reader is not opened here; the caller
   * opens it.
   * @return true if another reader is active, false if no more
   * readers are available
   */
  private boolean nextReader() {

    // Get the next reader, if any.
    final RowBatchReader reader = factory.nextReader();
    if (reader == null) {
      return false;
    }
    readerCount++;
    readerState = new ReaderState(this, reader);
    return true;
  }

  /**
   * Close the current reader. The reader state is cleared even if
   * closing throws.
   */
  private void closeReader() {
    try {
      readerState.close();
    } finally {
      readerState = null;
    }
  }

  @Override
  public void cancel() {
    switch (state) {
    case FAILED:
    case CLOSED:
      break;
    default:
      state = State.FAILED;

      // Close early. closeAll() leaves the scan in the CLOSED state.
      closeAll();
    }
  }

  @Override
  public void close() {
    if (state == State.CLOSED) {
      return;
    }
    closeAll();
  }

  /**
   * Close reader and release row set mutator resources. May be called
   * twice: once when canceling, once when closing. Designed to be
   * safe on the second call: the state becomes CLOSED even if closing
   * the reader or the factory throws, so the factory is not closed
   * a second time.
   */
  private void closeAll() {

    // May throw an unchecked exception
    try {
      if (readerState != null) {
        closeReader();
      }
    } finally {
      try {
        factory.close();
      } finally {
        state = State.CLOSED;
      }
    }
  }
}
