/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl.aggregate;

import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;

import javax.inject.Named;

import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class StreamingAggTemplate implements StreamingAggregator {
  // NOTE(review): the logger is keyed on the StreamingAggregator interface rather than on this class —
  // confirm this is the intended log category.
  private static final Logger logger = LoggerFactory.getLogger(StreamingAggregator.class);
  // Compile-time switch guarding the verbose debug logging in this class.
  private static final boolean EXTRA_DEBUG = false;
  // Output row count at which the outgoing batch is considered full; overwritten in setup() with the
  // outputRowCount argument.
  private int maxOutputRows = ValueVector.MAX_ROW_COUNT;

  // Remembers a terminal outcome seen from upstream so that a later doWork() call returns it immediately.
  // It is assigned when incoming.next returns NONE, STOP or an unhandled outcome. Note that while skipping
  // empty leading batches a NONE is recorded here as OK_NEW_SCHEMA.
  private IterOutcome lastOutcome;

  // First batch after build schema phase. Within this class it is only cleared (in doWork's finally block).
  private boolean first = true;
  private boolean firstBatchForSchema; // true if the current batch came in with an OK_NEW_SCHEMA.
  private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set

  // Set when a schema change arrived while a group was still pending; the next doWork() call then
  // returns UPDATE_AGGREGATOR. It is not cleared anywhere in this class.
  private boolean newSchema;

  // End of all data
  private boolean done;

  // index in the incoming (sv4/sv2/vector)
  private int underlyingIndex;
  // The indexes below refer to the actual record indexes in input batch
  // (i.e if a selection vector the sv4/sv2 entry has been dereferenced or if a vector then the record index itself)
  private int previousIndex = -1;  // the last index that has been processed. Initialized to -1 every time a new
                                   // aggregate group begins (including every time a new data set begins)
  private int currentIndex = Integer.MAX_VALUE; // current index being processed; Integer.MAX_VALUE means
                                                // there is no current record (batch exhausted or empty)
  /**
   * Number of records added to the current aggregation group. Reset to 0 each time a group is written
   * to the outgoing batch.
   */
  private long addedRecordCount;
  // There are two outcomes from the aggregator. One is the aggregator's outcome defined in
  // StreamingAggregator.AggOutcome. The other is the IterOutcome that doWork() computes for the caller,
  // which is stored here and exposed through getOutcome().
  private IterOutcome outcome;
  // Number of aggregation groups added into the output batch
  private int outputCount;
  // The upstream batch whose records are being aggregated
  private RecordBatch incoming;
  // the Streaming Agg Batch that this aggregator belongs to
  private StreamingAggBatch outgoing;

  // Operator context; passed to InternalBatch when the previous incoming batch is retained
  private OperatorContext context;

  /**
   * Binds this aggregator to its operator context and to its input and output batches, records the
   * output batch capacity, and then delegates to {@link #setupInterior(RecordBatch, RecordBatch)}.
   *
   * @param context operator context, later handed to {@code InternalBatch}
   * @param incoming upstream batch whose records are aggregated
   * @param outgoing the streaming aggregate batch that owns this aggregator
   * @param outputRowCount number of output rows at which the outgoing batch is treated as full
   * @throws SchemaChangeException if {@code setupInterior} throws it
   */
  @Override
  public void setup(
      OperatorContext context,
      RecordBatch incoming,
      StreamingAggBatch outgoing,
      int outputRowCount) throws SchemaChangeException {
    this.maxOutputRows = outputRowCount;
    this.outgoing = outgoing;
    this.incoming = incoming;
    this.context = context;
    // Delegate last, once all of this instance's own state has been captured.
    setupInterior(incoming, outgoing);
  }

  /**
   * Calls {@code allocateNew()} on the value vector of every wrapper exposed by the outgoing batch.
   */
  private void allocateOutgoing() {
    for (VectorWrapper<?> wrapper : outgoing) {
      final ValueVector vector = wrapper.getValueVector();
      vector.allocateNew();
    }
  }

  /**
   * @return the {@link IterOutcome} recorded by the most recent {@link #doWork(IterOutcome)} call, or
   *         {@code null} if none has been recorded yet
   */
  @Override
  public IterOutcome getOutcome() {
    return this.outcome;
  }

  /**
   * @return the number of aggregation groups written to the outgoing batch since the last
   *         {@link #doWork(IterOutcome)} call reset the counter
   */
  @Override
  public int getOutputCount() {
    return this.outputCount;
  }

  /**
   * Runs one aggregation pass. Records of the current incoming batch, and of further batches fetched with
   * {@code outgoing.next(0, incoming)}, are folded into groups; each completed group is written to the
   * outgoing container. The pass ends when the output container is full, when upstream reports
   * EMIT / NONE / NOT_YET / a new schema, or when a terminal outcome is seen.
   *
   * <p>The {@link IterOutcome} to hand downstream is stored in {@code outcome} (see {@link #getOutcome()});
   * the returned {@link AggOutcome} tells the caller what to do next. As used here:
   * <ul>
   *   <li>{@code CLEANUP_AND_RETURN} - upstream is finished or stopped and nothing more will be produced;</li>
   *   <li>{@code UPDATE_AGGREGATOR} - a schema change was seen and no group is pending;</li>
   *   <li>{@code RETURN_OUTCOME} - an output batch (or NOT_YET) is ready and the data set continues;</li>
   *   <li>{@code RETURN_AND_RESET} - the pass ended because of an EMIT.</li>
   * </ul>
   *
   * @param outerOutcome the outcome with which the current incoming batch was received
   * @return the action the owning batch should take
   */
  @Override
  public AggOutcome doWork(IterOutcome outerOutcome) {
    // Either a previous pass already saw the end of data, or the caller itself reports NONE.
    if (done || outerOutcome == NONE) {
      outcome = IterOutcome.NONE;
      return AggOutcome.CLEANUP_AND_RETURN;
    }

    try { // outside block to ensure that first is set to false after the first run.
      outputCount = 0;
      // allocate outgoing since either this is the first time or if a subsequent time we would
      // have sent the previous outgoing batch to downstream operator
      allocateOutgoing();

      if (firstBatchForDataSet) {
        // Position on the first record of the batch, or mark "no current record" if the batch is empty.
        this.currentIndex = incoming.getRecordCount() == 0 ? Integer.MAX_VALUE : this.getVectorIndex(underlyingIndex);

        if (outerOutcome == OK_NEW_SCHEMA) {
          firstBatchForSchema = true;
        }
        // consume empty batches until we get one with data (unless we got an EMIT). If we got an emit
        // then this is the first batch, it was empty and we also got an emit.
        if (incoming.getRecordCount() == 0 ) {
          if (outerOutcome != EMIT) {
            outer:
            while (true) {
              IterOutcome out = outgoing.next(0, incoming);
              switch (out) {
                case OK_NEW_SCHEMA:
                  //lastOutcome = out;
                  firstBatchForSchema = true;
                  // no break: falls through to the OK handling below
                case OK:
                  if (incoming.getRecordCount() == 0) {
                    continue;
                  } else {
                    currentIndex = this.getVectorIndex(underlyingIndex);
                    break outer;
                  }
                case EMIT:
                  // Remember the EMIT so that the main loop below ends the pass after this batch.
                  outerOutcome = EMIT;
                  if (incoming.getRecordCount() == 0) {
                    // When we see an EMIT we let the  agg record batch know that it should either
                    // send out an EMIT or an OK_NEW_SCHEMA, followed by an EMIT. To do that we simply return
                    // RETURN_AND_RESET with the outcome so the record batch can take care of it.
                    return setOkAndReturnEmit();
                  } else {
                    currentIndex = this.getVectorIndex(underlyingIndex);
                    break outer;
                  }

                case NONE:
                  // Only empty batches were seen before the end of data: the outcome reported (and
                  // remembered in lastOutcome) is OK_NEW_SCHEMA rather than NONE.
                  out = IterOutcome.OK_NEW_SCHEMA;
                  // no break: falls through to the terminal handling below
                case STOP:
                default:
                  lastOutcome = out;
                  outcome = out;
                  done = true;
                  return AggOutcome.CLEANUP_AND_RETURN;
              } // switch (outcome)
            } // while empty batches are seen
          } else {
            // First batch of the data set is empty and arrived with EMIT.
            return setOkAndReturnEmit();
          }
        }
      }

      // Set by a previous pass that flushed the pending group after receiving OK_NEW_SCHEMA.
      // NOTE(review): newSchema is never cleared in this class — presumably the caller replaces this
      // aggregator after UPDATE_AGGREGATOR; confirm against StreamingAggBatch.
      if (newSchema) {
        return AggOutcome.UPDATE_AGGREGATOR;
      }

      // if the previous iteration has an outcome that was terminal, don't do anything.
      if (lastOutcome != null /*&& lastOutcome != IterOutcome.OK_NEW_SCHEMA*/) {
        outcome = lastOutcome;
        return AggOutcome.CLEANUP_AND_RETURN;
      }

      outside: while(true) {
        // loop through existing records, adding as necessary.
        if(!processRemainingRecordsInBatch()) {
          // output batch is full. Return.
          return setOkAndReturn(outerOutcome);
        }
        // if the current batch came with an EMIT, we're done since if we are here it means output batch consumed all
        // the rows in incoming batch
        if(outerOutcome == EMIT) {
          // output the last record
          outputToBatch(previousIndex);
          resetIndex();
          return setOkAndReturnEmit();
        }

        /**
         * Hold onto the previous incoming batch. When the incoming uses an
         * SV4, the InternalBatch DOES NOT clone or transfer the data. Instead,
         * it clones only the SV4, and assumes that the same hyper-list of
         * batches will be offered again after the next call to the incoming
         * next(). This is, in fact, how the SV4 works, so all is fine. The
         * trick to be aware of, however, is that this MUST BE TRUE even if
         * the incoming next() returns NONE: the incoming is obligated to continue
         * to offer the set of batches. That is, the incoming cannot try to be
         * tidy and release the batches on end-of-data or the following code
         * will fail, likely with an IndexOutOfBounds exception.
         */

        InternalBatch previous = new InternalBatch(incoming, context);

        try {
          // Fetch upstream batches until one produces a return or a "continue outside".
          while (true) {

            IterOutcome out = outgoing.next(0, incoming);
            if (EXTRA_DEBUG) {
              logger.debug("Received IterOutcome of {}", out);
            }
            switch (out) {
              case NONE:
                // End of data: remember it so that later calls return immediately.
                done = true;
                lastOutcome = out;
                if (firstBatchForDataSet && addedRecordCount == 0) {
                  return setOkAndReturn(NONE);
                } else if (addedRecordCount > 0) {
                  outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
                  // (output container full or not) as we are not going to insert any more records.
                  if (EXTRA_DEBUG) {
                    logger.debug("Received no more batches, returning.");
                  }
                  return setOkAndReturn(NONE);
                } else {
                  // not first batch and record Count == 0
                  outcome = out;
                  return AggOutcome.CLEANUP_AND_RETURN;
                }
                // EMIT is handled like OK, except that we do not loop back to process the
                // next incoming batch; we return instead
              case EMIT:
                if (incoming.getRecordCount() == 0) {
                  // Empty EMIT batch: only the group pending from the previous batch (if any) is output.
                  if (addedRecordCount > 0) {
                    outputToBatchPrev(previous, previousIndex, outputCount);
                  }
                } else {
                  resetIndex();
                  currentIndex = this.getVectorIndex(underlyingIndex);
                  // Compare the first record of the new batch with the last record of the retained batch.
                  if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                    if (EXTRA_DEBUG) {
                      logger.debug("New value was same as last value of previous batch, adding.");
                    }
                    addRecordInc(currentIndex);
                    previousIndex = currentIndex;
                    incIndex();
                    if (EXTRA_DEBUG) {
                      logger.debug("Continuing outside");
                    }
                  } else { // not the same
                    if (EXTRA_DEBUG) {
                      logger.debug("This is not the same as the previous, add record and continue outside.");
                    }
                    if (addedRecordCount > 0) {
                      if (outputToBatchPrev(previous, previousIndex, outputCount)) {
                        if (EXTRA_DEBUG) {
                          logger.debug("Output container is full. flushing it.");
                        }
                        return setOkAndReturn(EMIT);
                      }
                    }
                    // important to set the previous index to -1 since we start a new group
                    previousIndex = -1;
                  }
                  if (!processRemainingRecordsInBatch()) {
                    // output batch is full. Return.
                    return setOkAndReturn(EMIT);
                  }
                  outputToBatch(previousIndex); // currentIndex has been reset to int_max so use previous index.
                }
                resetIndex();
                return setOkAndReturnEmit();

              case NOT_YET:
                // Upstream has nothing available yet; pass the outcome straight through.
                this.outcome = out;
                return AggOutcome.RETURN_OUTCOME;

              case OK_NEW_SCHEMA:
                firstBatchForSchema = true;
                //lastOutcome = out;
                if (EXTRA_DEBUG) {
                  logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
                }
                if (addedRecordCount > 0) {
                  outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
                  // (output container full or not) as we are not going to insert anymore records.
                  if (EXTRA_DEBUG) {
                    logger.debug("Wrote out end of previous batch, returning.");
                  }
                  // The next doWork() call will return UPDATE_AGGREGATOR.
                  newSchema = true;
                  return setOkAndReturn(OK_NEW_SCHEMA);
                }
                // No group pending: the aggregator can be updated right away.
                cleanup();
                return AggOutcome.UPDATE_AGGREGATOR;
              case OK:
                resetIndex();
                if (incoming.getRecordCount() == 0) {
                  // Skip empty batches; the retained "previous" batch stays unchanged.
                  continue;
                } else {
                  currentIndex = this.getVectorIndex(underlyingIndex);
                  // Compare the first record of the new batch with the last record of the retained batch.
                  if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                    if (EXTRA_DEBUG) {
                      logger.debug("New value was same as last value of previous batch, adding.");
                    }
                    addRecordInc(currentIndex);
                    previousIndex = currentIndex;
                    incIndex();
                    if (EXTRA_DEBUG) {
                      logger.debug("Continuing outside");
                    }
                    continue outside;
                  } else { // not the same
                    if (EXTRA_DEBUG) {
                      logger.debug("This is not the same as the previous, add record and continue outside.");
                    }
                    if (addedRecordCount > 0) {
                      if (outputToBatchPrev(previous, previousIndex, outputCount)) {
                        if (EXTRA_DEBUG) {
                          logger.debug("Output container is full. flushing it.");
                        }
                        // A new group starts with the first record of the new batch on the next call.
                        previousIndex = -1;
                        return setOkAndReturn(OK);
                      }
                    }
                    previousIndex = -1;
                    continue outside;
                  }
                }
              case STOP:
              default:
                // Terminal or unhandled outcome: remember it and hand it back unchanged.
                lastOutcome = out;
                outcome = out;
                return AggOutcome.CLEANUP_AND_RETURN;
            }
          }
        } finally {
          // make sure to clear previous; this runs on every return and on "continue outside".
          // previous was assigned from "new" above, so the null check is purely defensive.
          if (previous != null) {
            previous.clear();
          }
        }
      }
    } finally {
      if (first) {
        first = false;
      }
    }
  }

  /**
   * @return {@code true} once {@link #doWork(IterOutcome)} has marked the aggregator as finished
   */
  @Override
  public boolean isDone() {
    return this.done;
  }

  /**
   * Walks the not-yet-consumed records of the current incoming batch. A record either extends the
   * current group or, when its keys differ from the previous record, causes the current group to be
   * written out before a new group is started with it.
   *
   * <p>If writing a group fills the output container, the walk stops without consuming the record that
   * triggered the write and {@code previousIndex} is reset to -1, so that the same record starts a new
   * group when processing resumes.
   *
   * @return {@code true} if every record of the batch was consumed, {@code false} if the walk stopped
   *         early because the output container is full
   */
  private boolean processRemainingRecordsInBatch() {
    while (underlyingIndex < incoming.getRecordCount()) {
      if (EXTRA_DEBUG) {
        logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
      }

      final boolean noPreviousRecord = (previousIndex == -1);
      if (noPreviousRecord) {
        if (EXTRA_DEBUG) {
          logger.debug("Adding the initial row's keys and values.");
        }
      } else if (isSame(previousIndex, currentIndex)) {
        if (EXTRA_DEBUG) {
          logger.debug("Values were found the same, adding.");
        }
      } else {
        if (EXTRA_DEBUG) {
          logger.debug("Values were different, outputting previous batch.");
        }
        final boolean containerFull = outputToBatch(previousIndex);
        if (containerFull) {
          if (EXTRA_DEBUG) {
            logger.debug("Output container has reached its capacity. Flushing it.");
          }
          // Leave the indexes on the unconsumed record; clearing previousIndex makes it the first
          // record of a new group in the subsequent doWork call.
          previousIndex = -1;
          return false;
        }
        // There is still room in the outgoing container, so carry on with this record.
        if (EXTRA_DEBUG) {
          logger.debug("Output successful.");
        }
      }

      // Common tail: the record joins the (possibly new) current group and the cursor advances.
      addRecordInc(currentIndex);
      previousIndex = currentIndex;
      incIndex();
    }
    return true;
  }

  /**
   * Advances the cursor by one record. {@code currentIndex} becomes the vector index of the new
   * position, or {@code Integer.MAX_VALUE} when the cursor has moved past the last record of the batch.
   */
  private final void incIndex() {
    underlyingIndex++;
    currentIndex = (underlyingIndex < incoming.getRecordCount())
        ? getVectorIndex(underlyingIndex)
        : Integer.MAX_VALUE;
  }

  /**
   * Rewinds the cursor to the start of a batch: no current record ({@code Integer.MAX_VALUE}) and
   * underlying position 0.
   */
  private final void resetIndex() {
    currentIndex = Integer.MAX_VALUE;
    underlyingIndex = 0;
  }

  /**
   * Finishes an output batch while the current data set is still in progress. The outcome to report is
   * OK_NEW_SCHEMA if a new schema is pending (the pending flag is then cleared), otherwise OK. The
   * outgoing container's value count is set to the number of groups written.
   *
   * @param seenOutcome the upstream outcome that led to this return
   * @return {@code RETURN_AND_RESET} if {@code seenOutcome} is EMIT, otherwise {@code RETURN_OUTCOME}
   */
  private final AggOutcome setOkAndReturn(IterOutcome seenOutcome) {
    firstBatchForDataSet = false;

    outcome = firstBatchForSchema ? OK_NEW_SCHEMA : OK;
    firstBatchForSchema = false;

    outgoing.getContainer().setValueCount(outputCount);

    if (seenOutcome == EMIT) {
      return AggOutcome.RETURN_AND_RESET;
    }
    return AggOutcome.RETURN_OUTCOME;
  }

  /**
   * Finishes an output batch at the end of a data set (an EMIT was seen). The next batch is flagged as
   * the first of a new data set and no group is left open. The outcome to report is OK_NEW_SCHEMA if a
   * new schema is pending (the pending flag is then cleared), otherwise EMIT. The outgoing container's
   * value count is set to the number of groups written.
   *
   * @return always {@code RETURN_AND_RESET}
   */
  private final AggOutcome setOkAndReturnEmit() {
    previousIndex = -1;
    firstBatchForDataSet = true;

    outcome = firstBatchForSchema ? OK_NEW_SCHEMA : EMIT;
    firstBatchForSchema = false;

    outgoing.getContainer().setValueCount(outputCount);
    return AggOutcome.RETURN_AND_RESET;
  }

  /**
   * Writes the current group to the next free output row, taking the group keys from the given record
   * of the current incoming batch, then resets the aggregate state for the next group.
   *
   * @param inIndex index of the record that supplies the group keys
   * @return {@code true} if the outgoing container has just reached {@code maxOutputRows}; callers that
   *         intend to write more groups must check this
   */
  private final boolean outputToBatch(int inIndex) {
    assert outputCount < maxOutputRows :
        "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update";

    outputRecordKeys(inIndex, outputCount);
    outputRecordValues(outputCount);
    if (EXTRA_DEBUG) {
      logger.debug("{} values output successfully", outputCount);
    }

    // Start the next group with clean aggregate state.
    resetValues();
    addedRecordCount = 0;
    outputCount++;

    final boolean containerFull = (maxOutputRows == outputCount);
    return containerFull;
  }

  /**
   * Writes the current group to the given output row, taking the group keys from a record of the
   * retained previous batch, then resets the aggregate state for the next group.
   *
   * @param b1 the retained previous incoming batch
   * @param inIndex index, within {@code b1}, of the record that supplies the group keys
   * @param outIndex output row to write
   * @return {@code true} if the outgoing container has just reached {@code maxOutputRows}; callers that
   *         intend to write more groups must check this
   */
  private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
    assert outputCount < maxOutputRows :
        "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update";

    outputRecordKeysPrev(b1, inIndex, outIndex);
    outputRecordValues(outIndex);

    // Start the next group with clean aggregate state.
    resetValues();
    addedRecordCount = 0;
    outputCount++;

    final boolean containerFull = (maxOutputRows == outputCount);
    return containerFull;
  }

  /**
   * Folds the given record into the current group and counts it.
   *
   * @param index index of the record to add
   */
  private void addRecordInc(int index) {
    addRecord(index);
    addedRecordCount += 1;
  }

  /**
   * Does nothing in this template. It is invoked from {@link #doWork(IterOutcome)} when a new schema
   * arrives and no group is pending.
   */
  @Override
  public void cleanup() {
  }

  /**
   * @return a one-line snapshot of the cursor and counter state, intended for diagnostics
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("StreamingAggTemplate[");
    text.append("underlyingIndex=").append(underlyingIndex);
    text.append(", previousIndex=").append(previousIndex);
    text.append(", currentIndex=").append(currentIndex);
    text.append(", addedRecordCount=").append(addedRecordCount);
    text.append(", outputCount=").append(outputCount);
    text.append("]");
    return text.toString();
  }

  /**
   * @return {@code true} if there is no current record, i.e. {@code currentIndex} holds the
   *         {@code Integer.MAX_VALUE} marker set when a batch is exhausted, empty or reset
   */
  @Override
  public boolean previousBatchProcessed() {
    return Integer.MAX_VALUE == currentIndex;
  }

  // Hooks supplied by the concrete aggregator. The descriptions below are derived from how this
  // template calls them; presumably the implementations are generated code — verify against the code generator.

  /** One-time initialisation; called at the end of {@code setup} with the same incoming and outgoing batches. */
  public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
  /** Called with the previous and the current record index; a {@code true} result makes the current record join the current group. */
  public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
  /** Like {@code isSame}, but the first index refers to a record of the retained previous batch {@code b1}. */
  public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
  /** Called once for every record that is added to the current group. */
  public abstract void addRecord(@Named("index") int index);
  /** Called when a group is written out, with the index of the record supplying the keys and the output row. */
  public abstract void outputRecordKeys(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
  /** Like {@code outputRecordKeys}, but the key record is taken from the retained previous batch. */
  public abstract void outputRecordKeysPrev(@Named("previous") InternalBatch previous, @Named("previousIndex") int previousIndex, @Named("outIndex") int outIndex);
  /** Called right after the keys of a group are written, with the same output row. */
  public abstract void outputRecordValues(@Named("outIndex") int outIndex);
  /** Maps a position in the incoming batch to the record index stored in {@code currentIndex}. */
  public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
  /** Called after each group is written out; this template ignores the returned value. */
  public abstract boolean resetValues();
}
