/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.record;

import java.lang.reflect.Array;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

public class VectorContainer implements VectorAccessible {

  private final BufferAllocator allocator;
  protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
  private BatchSchema schema;
  private int recordCount;
  private boolean initialized;
  // private BufferAllocator allocator;
  // Schema has changed since last built. Must rebuild schema
  private boolean schemaChanged = true;

  public VectorContainer() {
    allocator = null;
  }

  public VectorContainer(OperatorContext oContext) {
    this(oContext.getAllocator());
  }

  public VectorContainer(BufferAllocator allocator) {
    this.allocator = allocator;
  }

  /**
   * Create a new vector container given a pre-defined schema. Creates the
   * corresponding vectors, but does not allocate memory for them. Call
   * {@link #allocateNew()} or {@link #allocateNewSafe()} to allocate
   * memory.
   * <p>
   * Note that this method does the equivalent of {@link #buildSchema(SelectionVectorMode)}
   * using the schema provided.
   *
   * @param allocator allocator to be used to allocate memory later
   * @param schema the schema that defines the vectors to create
   */

  public VectorContainer(BufferAllocator allocator, BatchSchema schema) {
    this.allocator = allocator;
    for (MaterializedField field : schema) {
      addOrGet(field, null);
    }
    this.schema = schema;
    schemaChanged = false;
  }

  @Override
  public String toString() {
    return super.toString()
        + "[recordCount = " + recordCount
        + ", schemaChanged = " + schemaChanged
        + ", schema = " + schema
        + ", wrappers = " + wrappers
        + ", ...]";
  }

  public BufferAllocator getAllocator() { return allocator; }

  public boolean isSchemaChanged() { return schemaChanged; }

  /**
   * Indicate the schema changed. Normally set by mutating this container.
   * If schemas are built externally, call this if the schema contained
   * here is different than the one provided in a previous batch. (Some
   * operators don't trust OK_NEW_SCHEMA, and use the schema changed
   * flag for the "real" truth.
   */

  public void schemaChanged() {
    schemaChanged = true;
  }

  public void addHyperList(List<ValueVector> vectors) {
    addHyperList(vectors, true);
  }

  public void addHyperList(List<ValueVector> vectors, boolean releasable) {
    schema = null;
    ValueVector[] vv = new ValueVector[vectors.size()];
    for (int i = 0; i < vv.length; i++) {
      vv[i] = vectors.get(i);
    }
    add(vv, releasable);
  }

  /**
   * Transfer vectors from containerIn to this.
   */
  public void transferIn(VectorContainer containerIn) {
    rawTransferIn(containerIn);
    setRecordCount(containerIn.getRecordCount());
  }

  @VisibleForTesting
  public void rawTransferIn(VectorContainer containerIn) {
    Preconditions.checkArgument(wrappers.size() == containerIn.wrappers.size());
    for (int i = 0; i < wrappers.size(); ++i) {
      containerIn.wrappers.get(i).transfer(wrappers.get(i));
    }
  }

  /**
   * Transfer vectors from this to containerOut
   */
  public void transferOut(VectorContainer containerOut) {
    Preconditions.checkArgument(this.wrappers.size() == containerOut.wrappers.size());
    for (int i = 0; i < this.wrappers.size(); ++i) {
      this.wrappers.get(i).transfer(containerOut.wrappers.get(i));
    }
  }

  public <T extends ValueVector> T addOrGet(MaterializedField field) {
    return addOrGet(field, null);
  }

  /**
   * This method should be called with MaterializedField which also has correct children field list specially when
   * the field type is MAP. Otherwise after calling this method if caller is not creating TransferPair on the
   * ValueVector, then the new ValueVector will not have information about it's list of children MaterializedField.
   * @param field
   * @param callBack
   * @param <T>
   * @return
   */
  @SuppressWarnings("unchecked")
  public <T extends ValueVector> T addOrGet(final MaterializedField field, final SchemaChangeCallBack callBack) {
    final TypedFieldId id = getValueVectorId(SchemaPath.getSimplePath(field.getName()));
    final ValueVector vector;
    if (id != null) {
      vector = getValueAccessorById(id.getFieldIds()).getValueVector();
      if (id.getFieldIds().length == 1 && !vector.getField().isEquivalent(field)) {
        final ValueVector newVector = TypeHelper.getNewVector(field, this.getAllocator(), callBack);
        replace(vector, newVector);
        return (T) newVector;
      }
    } else {
      vector = TypeHelper.getNewVector(field, this.getAllocator(), callBack);
      add(vector);
    }
    return (T) vector;
  }

  public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
    MaterializedField field = MaterializedField.create(name, type);
    return addOrGet(field);
  }

  /**
   * Get a set of transferred clones of this container. Note that this guarantees that the vectors in the cloned
   * container have the same TypedFieldIds as the existing container, allowing interchangeability in generated code. In
   * the case of hyper vectors, this container actually doesn't do a full transfer, rather creating a clone vector
   * wrapper only.
   *
   * @param incoming
   *          The RecordBatch iterator the contains the batch we should take over.
   * @return A cloned vector container.
   */
  public static VectorContainer getTransferClone(VectorAccessible incoming, OperatorContext oContext) {
    VectorContainer vc = new VectorContainer(oContext);
    for (VectorWrapper<?> w : incoming) {
      vc.cloneAndTransfer(w);
    }
    return vc;
  }

  public static VectorContainer getTransferClone(VectorAccessible incoming, BufferAllocator allocator) {
    VectorContainer vc = new VectorContainer(allocator);
    for (VectorWrapper<?> w : incoming) {
      vc.cloneAndTransfer(w);
    }
    return vc;
  }

  public static VectorContainer getTransferClone(VectorAccessible incoming, VectorWrapper<?>[] ignoreWrappers, OperatorContext oContext) {
    Iterable<VectorWrapper<?>> wrappers = incoming;
    if (ignoreWrappers != null) {
      final List<VectorWrapper<?>> ignored = Lists.newArrayList(ignoreWrappers);
      final Set<VectorWrapper<?>> resultant = Sets.newLinkedHashSet(incoming);
      resultant.removeAll(ignored);
      wrappers = resultant;
    }

    final VectorContainer vc = new VectorContainer(oContext);
    for (VectorWrapper<?> w : wrappers) {
      vc.cloneAndTransfer(w);
    }

    return vc;
  }

  private void cloneAndTransfer(VectorWrapper<?> wrapper) {
    wrappers.add(wrapper.cloneAndTransfer(getAllocator()));
  }

  public void addCollection(Iterable<ValueVector> vectors) {
    schema = null;
    for (ValueVector vv : vectors) {
      wrappers.add(SimpleVectorWrapper.create(vv));
    }
  }

  /**
    * This works with non-hyper {@link VectorContainer}s which have no selection vectors.
    * Appends a row taken from a source {@link VectorContainer} to this {@link VectorContainer}.
    * @param srcContainer The {@link VectorContainer} to copy a row from.
    * @param srcIndex The index of the row to copy from the source {@link VectorContainer}.
    * @return Position one above where the row was appended
    */
  public int appendRow(VectorContainer srcContainer, int srcIndex) {
    for (int vectorIndex = 0; vectorIndex < wrappers.size(); vectorIndex++) {
      ValueVector destVector = wrappers.get(vectorIndex).getValueVector();
      ValueVector srcVector = srcContainer.wrappers.get(vectorIndex).getValueVector();
      destVector.copyEntry(recordCount, srcVector, srcIndex);
    }
    return incRecordCount();
  }

  public TypedFieldId add(ValueVector vv) {
    schemaChanged();
    schema = null;
    int i = wrappers.size();
    wrappers.add(SimpleVectorWrapper.create(vv));
    return new TypedFieldId.Builder().finalType(vv.getField().getType())
        .addId(i)
        .build();
  }

  public ValueVector getLast() {
    int sz = wrappers.size();
    if ( sz == 0 ) { return null; }
    return wrappers.get(sz - 1).getValueVector();
  }
  public void add(ValueVector[] hyperVector) {
    add(hyperVector, true);
  }

  public void add(ValueVector[] hyperVector, boolean releasable) {
    assert hyperVector.length != 0;
    schemaChanged();
    schema = null;
    Class<?> clazz = hyperVector[0].getClass();
    ValueVector[] c = (ValueVector[]) Array.newInstance(clazz, hyperVector.length);
    System.arraycopy(hyperVector, 0, c, 0, hyperVector.length);
    // todo: work with a merged schema.
    wrappers.add(HyperVectorWrapper.create(hyperVector[0].getField(), c, releasable));
  }

  public void remove(ValueVector v) {
    schema = null;
    schemaChanged();
    for (Iterator<VectorWrapper<?>> iter = wrappers.iterator(); iter.hasNext();) {
      VectorWrapper<?> w = iter.next();
      if (!w.isHyper() && v == w.getValueVector()) {
        w.clear();
        iter.remove();
        return;
      }
    }
    throw new IllegalStateException("You attempted to remove a vector that didn't exist.");
  }

  private void replace(ValueVector old, ValueVector newVector) {
    schema = null;
    schemaChanged();
    int i = 0;
    for (VectorWrapper<?> w : wrappers){
      if (!w.isHyper() && old == w.getValueVector()) {
        w.clear();
        wrappers.set(i, new SimpleVectorWrapper<>(newVector));
        return;
      }
      i++;
    }
    throw new IllegalStateException("You attempted to remove a vector that didn't exist.");
  }

  @Override
  public TypedFieldId getValueVectorId(SchemaPath path) {
    for (int i = 0; i < wrappers.size(); i++) {
      VectorWrapper<?> va = wrappers.get(i);
      TypedFieldId id = va.getFieldIdIfMatches(i, path);
      if (id != null) {
        return id;
      }
    }

    return null;
  }

  public VectorWrapper<?> getValueVector(int index) {
    return wrappers.get(index);
  }

  @Override
  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
    Preconditions.checkArgument(fieldIds.length >= 1);
    VectorWrapper<?> va = wrappers.get(fieldIds[0]);

    if (va == null) {
      return null;
    }

    if (fieldIds.length == 1 && clazz != null && !clazz.isAssignableFrom(va.getVectorClass())) {
      throw new IllegalStateException(String.format(
          "Failure while reading vector.  Expected vector class of %s but was holding vector class %s, field= %s ",
          clazz.getCanonicalName(), va.getVectorClass().getCanonicalName(), va.getField()));
    }

    return va.getChildWrapper(fieldIds);
  }

  private VectorWrapper<?> getValueAccessorById(int... fieldIds) {
    Preconditions.checkArgument(fieldIds.length >= 1);
    VectorWrapper<?> va = wrappers.get(fieldIds[0]);

    if (va == null) {
      return null;
    }
    return va.getChildWrapper(fieldIds);
  }

  public boolean hasSchema() {
    return schema != null;
  }

  @Override
  public BatchSchema getSchema() {
    Preconditions
        .checkNotNull(schema,
            "Schema is currently null.  You must call buildSchema(SelectionVectorMode) before this container can return a schema.");
    return schema;
  }

  public void buildSchema(SelectionVectorMode mode) {
    SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(mode);
    for (VectorWrapper<?> v : wrappers) {
      bldr.addField(v.getField());
    }
    schema = bldr.build();
    schemaChanged = false;
  }

  @Override
  public Iterator<VectorWrapper<?>> iterator() {
    return wrappers.iterator();
  }

  public void clear() {
    zeroVectors();
    removeAll();
  }

  public void removeAll() {
    wrappers.clear();
    schema = null;
  }

  public void setRecordCount(int recordCount) {
    this.recordCount = recordCount;
    this.initialized = true;
  }

  /**
   * Increment the record count
   * @return the new record count
   */
  public int incRecordCount() {
    initialized = true;
    return ++recordCount;
  }

  @Override
  public int getRecordCount() {
    Preconditions.checkState(hasRecordCount(), "Record count not set for this vector container");
    return recordCount;
  }

  public boolean hasRecordCount() { return initialized; }

  @Override
  public SelectionVector2 getSelectionVector2() {
    throw new UnsupportedOperationException();
  }

  @Override
  public SelectionVector4 getSelectionVector4() {
    throw new UnsupportedOperationException();
  }

  /**
   * Clears the contained vectors.  (See {@link ValueVector#clear}).
   * Note that the name <tt>zeroVector()</tt> in a value vector is
   * used for the action to set all vectors to zero. Here it means
   * to free the vector's memory. Sigh...
   */

  public void zeroVectors() {
    VectorAccessibleUtilities.clear(this);
  }

  public int getNumberOfColumns() {
    return wrappers.size();
  }

  public void allocate(int recordCount) {
    for (VectorWrapper<?> w : wrappers) {
      ValueVector v = w.getValueVector();
      v.setInitialCapacity(recordCount);
      v.allocateNew();
    }
  }

  public void allocateNew() {
    for (VectorWrapper<?> w : wrappers) {
      w.getValueVector().allocateNew();
    }
  }

  public boolean allocateNewSafe() {
    for (VectorWrapper<?> w : wrappers) {
      if (!w.getValueVector().allocateNewSafe()) {
        return false;
      }
    }
    return true;
  }

  public void setValueCount(int valueCount) {
    VectorAccessibleUtilities.setValueCount(this, valueCount);
    setRecordCount(valueCount);
  }

  /**
   * Merge two batches to create a single, combined, batch. Vectors
   * appear in the order defined by {@link BatchSchema#merge(BatchSchema)}.
   * The two batches must have identical row counts. The pattern is that
   * this container is the main part of the record batch, the other
   * represents new columns to merge.
   * <p>
   * Reference counts on the underlying buffers are <b>unchanged</b>.
   * The client code is assumed to abandon the two input containers in
   * favor of the merged container.
   *
   * @param otherContainer the container to merge with this one
   * @return a new, merged, container
   */
  public VectorContainer merge(VectorContainer otherContainer) {
    if (recordCount != otherContainer.recordCount) {
      throw new IllegalArgumentException();
    }
    VectorContainer merged = new VectorContainer(allocator);
    merged.schema = schema.merge(otherContainer.schema);
    merged.recordCount = recordCount;
    merged.wrappers.addAll(wrappers);
    merged.wrappers.addAll(otherContainer.wrappers);
    merged.schemaChanged = false;
    merged.initialized = true;
    return merged;
  }

  /**
   * Exchange buffers between two identical vector containers.
   * The schemas must be identical in both column schemas and
   * order. That is, after this call, data is exchanged between
   * the containers. Requires that both containers be owned
   * by the same allocator.
   *
   * @param other the target container with buffers to swap
   */

  public void exchange(VectorContainer other) {
    assert schema.isEquivalent(other.schema);
    assert wrappers.size() == other.wrappers.size();
    assert allocator != null  &&  allocator == other.allocator;
    for (int i = 0; i < wrappers.size(); i++) {
      wrappers.get(i).getValueVector().exchange(
          other.wrappers.get(i).getValueVector());
    }
    int temp = recordCount;
    recordCount = other.recordCount;
    other.recordCount = temp;
    boolean temp2 = schemaChanged;
    schemaChanged = other.schemaChanged;
    other.schemaChanged = temp2;
  }

  /**
   * This method create a pretty string for a record in the {@link VectorContainer}.
   * @param index The index of the record of interest.
   * @return The string representation of a record.
   */
  public String prettyPrintRecord(int index) {
    final StringBuilder sb = new StringBuilder();
    String separator = "";
    sb.append("[");

    for (VectorWrapper<?> vectorWrapper: wrappers) {
      sb.append(separator);
      separator = ", ";
      final String columnName = vectorWrapper.getField().getName();
      final Object value = vectorWrapper.getValueVector().getAccessor().getObject(index);

      // "columnName" = 11
      sb.append("\"").append(columnName).append("\" = ").append(value);
    }

    sb.append("]");
    return sb.toString();
  }

  /**
   * Safely set this container to an empty batch. An empty batch is not
   * fully empty: offset vectors must contain a single 0 entry in their
   * first position.
   */
  public void setEmpty() {
    // May not be needed; retaining for safety.
    zeroVectors();
    // Better to only allocate minimum-size offset vectors,
    // but no good way to do that presently.
    allocateNew();
    // The "fill empties" logic will set the zero
    // in the offset vectors that need it.
    setValueCount(0);
  }

  public void copySchemaFrom(VectorAccessible other) {
    for (VectorWrapper<?> wrapper : other) {
      addOrGet(wrapper.getField());
    }
  }

  public void buildFrom(BatchSchema sourceSchema) {
    for (MaterializedField field : sourceSchema) {
      add(TypeHelper.getNewVector(field, allocator));
    }
  }

  public void allocatePrecomputedChildCount(int valueCount,
      int bytesPerValue, int childValCount) {
    for (VectorWrapper<?> w : wrappers) {
      AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(),
          valueCount, bytesPerValue, childValCount);
    }
    setEmpty();
  }
}
