/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl.protocol;

import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * State machine that drives the operator executable. Converts
 * between the iterator protocol and the operator executable protocol.
 * Implemented as a separate class in anticipation of eventually
 * changing the record batch (iterator) protocol.
 *
 * <h4>Schema-Only Batch</h4>
 *
 * The scan operator is designed to provide an initial, empty, schema-only
 * batch. At the time that this code was written, it was (mis-?) understood
 * that Drill used a "fast schema" path that provided a schema-only batch
 * as the first batch. However, it turns out that most operators fail when
 * presented with an empty batch: many do not properly set the offset
 * vector for variable-width vectors to an initial 0 position, causing all
 * kinds of issues.
 * <p>
 * To work around this issue, the code defaults to *not* providing the
 * schema batch.
 */

public class OperatorDriver {

  private static final Logger logger = LoggerFactory.getLogger(OperatorDriver.class);

  /**
   * Life-cycle states of the driver. Only {@link #START} and {@link #RUN}
   * cause {@link OperatorDriver#next()} to do work; in every other state
   * <tt>next()</tt> simply reports that no more data is available.
   */

  public enum State {

    /**
     * Before the first call to next().
     */

    START,

    /**
     * Attempting to start the operator.
     */

    STARTING,

    /**
     * Read from readers.
     */

    RUN,

    /**
     * No more data to deliver.
     */

    END,

    /**
     * An error occurred.
     */

    FAILED,

    /**
     * Operation was cancelled. No more batches will be returned,
     * but close() has not yet been called.
     */

    CANCELED,

    /**
     * close() has been called and resources are released. No more
     * batches will be returned. As far as next() is concerned, this
     * state behaves the same as END, FAILED and CANCELED; it is kept
     * distinct so that close() and cancel() can tell that the
     * resources have already been released.
     */

    CLOSED
  }

  private OperatorDriver.State state = State.START;

  /**
   * Operator context. The driver "owns" the context and is responsible
   * for closing it.
   */

  private final OperatorContext opContext;
  private final OperatorExec operatorExec;
  private final BatchAccessor batchAccessor;

  /**
   * Schema version reported by the batch accessor the last time a
   * new schema was sent downstream; -1 before the first batch.
   */

  private int schemaVersion;
  private final boolean enableSchemaBatch;

  /**
   * Create a driver for the given operator executable.
   *
   * @param opContext operator context; closed by {@link #close()}
   * @param opExec the operator executable to drive; its batch accessor
   * is captured here and exposed via {@link #batchAccessor()}
   * @param enableSchemaBatch if true, the first call to {@link #next()}
   * asks the operator to build a schema only; if false, the first call
   * fetches the first data batch directly
   */

  public OperatorDriver(OperatorContext opContext, OperatorExec opExec, boolean enableSchemaBatch) {
    this.opContext = opContext;
    this.operatorExec = opExec;
    batchAccessor = operatorExec.batchAccessor();
    this.enableSchemaBatch = enableSchemaBatch;
  }

  /**
   * Get the next batch. Performs initialization on the first call.
   * Any exception cancels the operator (best effort), moves the driver
   * to the FAILED state, and is rethrown as a {@link UserException}.
   *
   * @return the iteration outcome to send downstream
   * @throws UserException if the operator fails; non-user exceptions
   * are wrapped as execution errors labeled with the operator class
   */

  public IterOutcome next() {
    try {
      switch (state) {
      case START:
        return start();
      case RUN:
        return doNext();
      default:
        logger.debug("Extra call to next() in state {}: {}", state, operatorLabel());
        return IterOutcome.NONE;
      }
    } catch (UserException e) {

      // Cancel before changing the state: cancelSilently() only
      // forwards the request while the state is STARTING or RUN.

      cancelSilently();
      state = State.FAILED;
      throw e;
    } catch (Throwable t) {
      cancelSilently();
      state = State.FAILED;
      throw UserException.executionError(t)
        .addContext("Exception thrown from", operatorLabel())
        .build(logger);
    }
  }

  /**
   * Cancels the operator before reaching EOF. Has no effect once the
   * driver is closed; otherwise the driver ends up in the CANCELED state.
   */

  public void cancel() {

    // Once closed, stay closed. Moving from CLOSED back to CANCELED
    // would defeat the guard in close() and allow the operator and its
    // context to be closed a second time.

    if (state == State.CLOSED) {
      return;
    }
    try {
      switch (state) {
      case START:
      case RUN:
        cancelSilently();
        break;
      default:
        break;
      }
    } finally {
      state = State.CANCELED;
    }
  }

  /**
   * Start the operator executor. If the schema batch is disabled, simply
   * fetch the first data batch. Otherwise ask the executor to build its
   * schema and report it downstream.
   *
   * @return result of the first batch, which should contain
   * only a schema (when the schema batch is enabled), or EOF
   */

  private IterOutcome start() {
    state = State.STARTING;
    schemaVersion = -1;
    if (!enableSchemaBatch) {
      return doNext();
    }
    if (operatorExec.buildSchema()) {
      schemaVersion = batchAccessor.schemaVersion();

      // Report schema change.

      batchAccessor.container().schemaChanged();
      state = State.RUN;
      return IterOutcome.OK_NEW_SCHEMA;
    } else {
      state = State.END;
      return IterOutcome.NONE;
    }
  }

  /**
   * Fetch a record batch, detecting EOF and a new schema.
   * @return the <tt>IterOutcome</tt> for the above cases
   */

  private IterOutcome doNext() {
    if (! operatorExec.next()) {
      state = State.END;
      return IterOutcome.NONE;
    }
    int newVersion = batchAccessor.schemaVersion();
    boolean schemaChanged = newVersion != schemaVersion;

    // Set the container schema changed based on whether the
    // current schema differs from that the last time through
    // this method. That is, we take "schema changed" to be
    // "schema changed since last call to next." The result hides
    // trivial changes within this operator.

    if (schemaChanged) {
      batchAccessor.container().schemaChanged();
    }

    // The very first batch is always reported as a new schema,
    // even if the version number happens not to have changed.

    if (state == State.STARTING || schemaChanged) {
      schemaVersion = newVersion;
      state = State.RUN;
      return IterOutcome.OK_NEW_SCHEMA;
    }
    state = State.RUN;
    return IterOutcome.OK;
  }

  /**
   * Implement a cancellation, and ignore any exception that is
   * thrown. We're already in trouble here, no need to keep track
   * of additional things that go wrong.
   * <p>
   * Cancellation is done only if the operator is doing work.
   * The request is not propagated if either the operator never
   * started, or is already finished.
   */

  private void cancelSilently() {
    try {
      if (state == State.STARTING || state == State.RUN) {
        operatorExec.cancel();
      }
    } catch (Throwable t) {
      // Ignore; we're already in a bad state.
      logger.error("Exception thrown from cancel() for {}", operatorLabel(), t);
    }
  }

  /**
   * Label used to identify the operator in log and error messages.
   * @return the canonical class name of the operator executable
   */

  private String operatorLabel() {
    return operatorExec.getClass().getCanonicalName();
  }

  /**
   * Close the operator executable and then the operator context.
   * Calling this method again after the driver is closed does nothing.
   * The context is closed, and the driver marked CLOSED, even if
   * closing the operator fails.
   *
   * @throws UserException if the operator fails to close; non-user
   * exceptions are wrapped as execution errors labeled with the
   * operator class
   */

  public void close() {
    if (state == State.CLOSED) {
      return;
    }
    try {
      operatorExec.close();
    } catch (UserException e) {
      throw e;
    } catch (Throwable t) {
      throw UserException.executionError(t)
        .addContext("Exception thrown from", operatorLabel())
        .build(logger);
    } finally {

      // Record the CLOSED state even if the context itself fails to
      // close, so that a repeated close() does not re-close the operator.

      try {
        opContext.close();
      } finally {
        state = State.CLOSED;
      }
    }
  }

  /**
   * @return the batch accessor obtained from the operator executable
   * at construction time
   */

  public BatchAccessor batchAccessor() {
    return batchAccessor;
  }

  /**
   * @return the operator context owned by this driver
   */

  public OperatorContext operatorContext() {
    return opContext;
  }
}
