/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl.window;

import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.WindowPOP;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.util.List;


/**
 * WindowFramer implementation that supports the FRAME clause.
 * <br>According to the SQL specification, FIRST_VALUE, LAST_VALUE and all aggregate functions support the FRAME clause.
 * This class will handle such functions even if the FRAME clause is not present.
 */
public abstract class FrameSupportTemplate implements WindowFramer {

  private static final Logger logger = LoggerFactory.getLogger(FrameSupportTemplate.class);

  // container received in setup(); passed as the "outgoing" argument to most generated setup methods
  private VectorContainer container;
  // container created in setup() with one vector per field of "container"; cleared in cleanup()
  private VectorContainer internal;
  // batches received in setup(); doWork() always processes the first element
  private List<WindowDataBatch> batches;
  private int outputCount; // number of rows in currently/last processed batch

  // batch being processed by the current/last call to doWork()
  private WindowDataBatch current;

  // index (within its own batch) of the last row aggregated by aggregatePeers()
  private int frameLastRow;

  // true when at least one window function needs to process all batches of a partition before passing any batch downstream
  private boolean requireFullPartition;

  private long remainingRows; // num unprocessed rows in current partition
  private long remainingPeers; // num unprocessed peer rows in current frame
  private boolean partialPartition; // true if remainingRows only accounts for the current batch and more batches are expected for the current partition

  // operator configuration; read here for the frame units (ROWS vs RANGE) and the frame start/end bounds
  private WindowPOP popConfig;

  /**
   * Stores the collaborators this framer works with and allocates the internal container.
   *
   * @param batches list of batches to process; {@link #doWork()} reads its first element
   * @param container container whose fields are mirrored into the internal container
   * @param oContext operator context used to create the internal container
   * @param requireFullPartition true when a whole partition must be available before any row is processed
   * @param popConfig window operator configuration (frame units and bounds)
   * @throws SchemaChangeException declared by the interface; not thrown directly by this implementation
   */
  @Override
  public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext,
                    final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException {
    this.batches = batches;
    this.container = container;

    // allocateInternal() iterates over this.container, so it has to run after the assignment above
    this.internal = new VectorContainer(oContext);
    allocateInternal();

    this.popConfig = popConfig;
    this.requireFullPartition = requireFullPartition;
    this.outputCount = 0;
  }

  /**
   * Adds (or fetches) one vector in {@code internal} for every field of {@code container} and allocates it.
   */
  private void allocateInternal() {
    for (final VectorWrapper<?> wrapper : container) {
      final ValueVector vector = internal.addOrGet(wrapper.getField());
      vector.allocateNew();
    }
  }

  /**
   * @return true when the current partition has no unprocessed rows left and no further batches are
   *         expected for it
   */
  private boolean isPartitionDone() {
    if (partialPartition) {
      // more batches of this partition are still expected
      return false;
    }
    return remainingRows == 0;
  }

  /**
   * Processes every row of the first batch in {@code batches}, partition by partition.
   *
   * @throws DrillException if processing a partition fails
   */
  @Override
  public void doWork() throws DrillException {
    current = batches.get(0);
    setupSaveFirstValue(current, internal);
    outputCount = current.getRecordCount();

    int nextRow = 0;
    while (nextRow < outputCount) {
      if (isPartitionDone()) {
        // nothing is pending: the row at nextRow starts a new partition
        newPartition(current, nextRow);
      } else {
        // a partition left unfinished by a previous call to doWork() continues in this batch
        assert nextRow == 0 : "pending partitions are only expected at the start of the batch";
        logger.trace("we have a pending partition {}", remainingRows);

        if (!requireFullPartition) {
          // only part of the partition was counted previously, add the rows found in this batch
          updatePartitionSize(nextRow);
        }
      }

      nextRow = processPartition(nextRow);

      if (isPartitionDone()) {
        reset();
      }
    }
  }

  /**
   * Starts a new partition at the given row: clears the row/peer counters, computes the partition
   * size, then lets the generated code set up the partition and save its first value.
   *
   * @param current batch containing the first row of the partition
   * @param currentRow index of the first row of the partition
   * @throws SchemaChangeException if the generated partition setup fails
   */
  private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException {
    // counters must be zeroed first: updatePartitionSize() adds to remainingRows
    remainingPeers = 0;
    remainingRows = 0;
    updatePartitionSize(currentRow);

    setupPartition(current, container);
    saveFirstValue(currentRow);
  }

  /**
   * Resets the window functions and every {@link BaseDataValueVector} held by the internal container.
   */
  private void reset() {
    resetValues();

    for (final VectorWrapper<?> wrapper : internal) {
      final ValueVector vector = wrapper.getValueVector();
      if (!(vector instanceof BaseDataValueVector)) {
        continue;
      }
      ((BaseDataValueVector) vector).reset();
    }
  }

  /**
   * Computes and writes the aggregation values for the rows of the current batch that belong to the
   * current partition, using the ROWS or RANGE strategy depending on the frame units.
   *
   * @param currentRow first unprocessed row
   * @return index of next unprocessed row
   * @throws DrillException if it can't write into the container
   */
  private int processPartition(final int currentRow) throws DrillException {
    logger.trace("{} rows remaining to process, currentRow: {}, outputCount: {}", remainingRows, currentRow, outputCount);

    setupWriteFirstValue(internal, container);

    return popConfig.isFrameUnitsRows() ? processROWS(currentRow) : processRANGE(currentRow);
  }

  /**
   * Processes rows of the current batch one at a time (ROWS frame units): every row is aggregated,
   * written out, and used as its own "last value" row.
   *
   * @param row first unprocessed row
   * @return index of next unprocessed row
   * @throws DrillException if the generated setup code fails
   */
  private int processROWS(int row) throws DrillException {
    //TODO (DRILL-4413) we only need to call these once per batch
    setupEvaluatePeer(current, container);
    setupReadLastValue(current, container);

    // stop at the end of the batch or as soon as the partition is exhausted
    for (; row < outputCount && !isPartitionDone(); remainingRows--, row++) {
      logger.trace("aggregating row {}", row);
      evaluatePeer(row);

      outputRow(row);
      writeLastValue(row, row);
    }

    return row;
  }

  /**
   * Processes rows of the current batch using RANGE frame units: peer rows share one frame, so the
   * frame is aggregated once and its result is written for each of its peer rows.
   *
   * @param row first unprocessed row
   * @return index of next unprocessed row
   * @throws DrillException if aggregating the peer rows fails
   */
  private int processRANGE(int row) throws DrillException {
    for (; row < outputCount && !isPartitionDone(); row++) {
      final boolean startsNewFrame = remainingPeers == 0;
      if (startsNewFrame) {
        if (popConfig.getStart().isCurrent()) {
          // the frame starts at the current row: discard what was accumulated for the previous frame
          reset();
          saveFirstValue(row);
        }

        // aggregate the whole frame once; the following peer rows reuse the result
        remainingPeers = aggregatePeers(row);
      }

      outputRow(row);
      writeLastValue(frameLastRow, row);

      remainingRows--;
      remainingPeers--;
    }

    return row;
  }

  /**
   * Adds to {@code remainingRows} the number of rows that belong to the partition starting at the
   * specified row of the first batch, and updates {@code partialPartition}.
   * If {@code !requireFullPartition}, only the rows of the first batch are counted.
   *
   * @param start index, in the current batch, of the row the count starts from
   */
  private void updatePartitionSize(final int start) {
    logger.trace("compute partition size starting from {} on {} batches", start, batches.size());

    long counted = 0;
    int idx = start;

    // the first batch is scanned from start, subsequent batches from their first row;
    // scanning stops at the first row of the next partition or after the very last batch
    scan:
    for (final WindowDataBatch batch : batches) {
      final int rowsInBatch = batch.getRecordCount();

      while (idx < rowsInBatch) {
        if (!isSamePartition(start, current, idx, batch)) {
          break scan;
        }
        idx++;
        counted++;
      }

      if (!requireFullPartition) {
        // we are only interested in the first batch's records
        break;
      }

      idx = 0;
    }

    if (requireFullPartition) {
      partialPartition = false;
    } else {
      // this is the last batch of the current partition if any of the following holds
      final boolean endsInsideBatch = idx < outputCount;      // partition ends before the end of the batch
      final boolean lastBatch = endsInsideBatch
        || batches.size() == 1                                  // it's the last available batch
        || !isSamePartition(start, current, 0, batches.get(1)); // next batch contains a different partition

      partialPartition = !lastBatch;
    }

    remainingRows += counted;
  }

  /**
   * Aggregates all rows of the frame that starts at the given row of the current batch.
   * <br>With an unbounded frame end, the frame contains every remaining row of the partition
   * ({@code remainingRows}); otherwise it contains the consecutive rows for which
   * {@link #isPeer} returns true when compared against the starting row.
   * <br>Also records the index of the last aggregated row in {@code frameLastRow} and sets up
   * last-value reading on the batch that contains it.
   *
   * @param start starting row of the current frame
   * @return num peer rows for current row
   * @throws SchemaChangeException if the generated setup code fails for one of the batches
   */
  private long aggregatePeers(final int start) throws SchemaChangeException {
    logger.trace("aggregating rows starting from {}", start);

    final boolean unboundedFollowing = popConfig.getEnd().isUnbounded();
    VectorAccessible last = current;
    long length = 0;

    // a single frame can include rows from multiple batches
    // start processing first batch and, if necessary, move to next batches
    frame:
    for (WindowDataBatch batch : batches) {
      setupEvaluatePeer(batch, container);
      final int recordCount = batch.getRecordCount();

      // for every remaining row in the partition, count it if it's a peer row
      for (int row = (batch == current) ? start : 0; row < recordCount; row++, length++) {
        final boolean frameEnded = unboundedFollowing
          ? length >= remainingRows
          : !isPeer(start, current, row, batch);

        if (frameEnded) {
          // the frame is a contiguous run of rows: once its end is found no later batch can
          // contribute, so leave both loops instead of re-checking the first row of every
          // remaining batch (and possibly aggregating a row that lies past the end of the frame)
          break frame;
        }

        evaluatePeer(row);
        last = batch;
        frameLastRow = row;
      }
    }

    setupReadLastValue(last, container);

    return length;
  }

  /**
   * @return number of rows in the batch processed by the current/last call to {@link #doWork()},
   *         0 if no batch has been processed since {@code setup()}
   */
  @Override
  public int getOutputCount() {
    return this.outputCount;
  }

  /**
   * Clears the internal container.
   * <br>NOTE(review): an earlier comment stated this method is needed for code generation;
   * presumably the generated subclass relies on this concrete implementation - confirm.
   */
  @Override
  public void cleanup() {
    logger.trace("clearing internal");
    this.internal.clear();
  }

  /**
   * @return a description of the framer's processing state, intended for debugging
   */
  @Override
  public String toString() {
    final StringBuilder description = new StringBuilder("FrameSupportTemplate[internal=");
    description.append(internal);
    description.append(", outputCount=").append(outputCount);
    description.append(", current=").append(current);
    description.append(", frameLastRow=").append(frameLastRow);
    description.append(", remainingRows=").append(remainingRows);
    description.append(", partialPartition=").append(partialPartition);
    description.append("]");
    return description.toString();
  }

  /**
   * called once for each peer row of the current frame.
   * <br>The index refers to a row of the batch passed to the most recent call to
   * {@link #setupEvaluatePeer}.
   * @param index of row to aggregate
   */
  public abstract void evaluatePeer(@Named("index") int index);

  /**
   * Called before {@link #evaluatePeer} whenever the rows to aggregate come from a different batch.
   * This class always passes the batch being read as incoming and the container received in
   * {@code setup()} as outgoing.
   * @param incoming batch the subsequent evaluatePeer() indices refer to
   * @param outgoing container received in setup()
   * @throws SchemaChangeException presumably when the generated code can't bind to the batch's vectors - confirm
   */
  public abstract void setupEvaluatePeer(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;

  /**
   * Called before {@link #writeLastValue} with the batch that contains the last row of the frame
   * as incoming and the container received in {@code setup()} as outgoing.
   * @param incoming batch containing the frame's last row
   * @param outgoing container received in setup()
   * @throws SchemaChangeException presumably when the generated code can't bind to the batch's vectors - confirm
   */
  public abstract void setupReadLastValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;

  /**
   * Called once for each processed row. For ROWS frames both arguments are the processed row;
   * for RANGE frames index is the last row of the frame and outIndex the processed row.
   * @param index row holding the frame's last value, in the batch passed to setupReadLastValue()
   * @param outIndex row being processed in the current batch
   */
  public abstract void writeLastValue(@Named("index") int index, @Named("outIndex") int outIndex);

  /**
   * Called at the start of every {@code doWork()} with the current batch as incoming and the
   * internal container as outgoing.
   * @param incoming batch the subsequent saveFirstValue() indices refer to
   * @param outgoing internal container (presumably where the first value is stored - confirm)
   * @throws SchemaChangeException presumably when the generated code can't bind to the batch's vectors - confirm
   */
  public abstract void setupSaveFirstValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;

  /**
   * Called with the first row of each new partition and, for RANGE frames that start at the
   * current row, with the first row of each new frame.
   * @param index row of the current batch holding the first value
   */
  public abstract void saveFirstValue(@Named("index") int index);

  /**
   * Called every time a partition (or part of one) is about to be processed, with the internal
   * container as incoming and the container received in {@code setup()} as outgoing.
   * @param incoming internal container
   * @param outgoing container received in setup()
   */
  public abstract void setupWriteFirstValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);

  /**
   * called once for each row after we evaluate all peer rows. Used to write a value in the row.
   * It is invoked before {@link #writeLastValue} for the same row.
   *
   * @param outIndex index of row
   */
  public abstract void outputRow(@Named("outIndex") int outIndex);

  /**
   * Called once per partition, before processing the partition. Used to setup read/write vectors.
   * It is not called again when a partition continues in a following batch.
   * @param incoming batch we will read from
   * @param outgoing batch we will be writing to
   *
   * @throws SchemaChangeException presumably when the generated code can't bind to the batch's vectors - confirm
   */
  public abstract void setupPartition(@Named("incoming") WindowDataBatch incoming,
                                      @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;

  /**
   * reset all window functions
   * @return value defined by the generated implementation; this class ignores it
   */
  public abstract boolean resetValues();

  /**
   * compares two rows, which may come from different batches or from the same one, and checks whether
   * they have the same value for the partition by expression
   * @param b1Index index of first row
   * @param b1 batch for first row
   * @param b2Index index of second row
   * @param b2 batch for second row
   * @return true if the rows are in the same partition
   */
  public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
                                          @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);

  /**
   * compares two rows, which may come from different batches or from the same one, and checks whether
   * they have the same value for the order by expression
   * @param b1Index index of first row
   * @param b1 batch for first row
   * @param b2Index index of second row
   * @param b2 batch for second row
   * @return true if the rows are peers, i.e. they have the same value for the order by expression
   */
  public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
                                 @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
}
