/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.bson;

import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.expr.holders.BigIntHolder;
import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.Float8Holder;
import org.apache.drill.exec.expr.holders.IntHolder;
import org.apache.drill.exec.expr.holders.VarBinaryHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import org.apache.drill.exec.vector.complex.writer.TimeStampWriter;
import com.google.common.base.Preconditions;
import org.bson.BsonBinary;
import org.bson.BsonReader;
import org.bson.BsonType;
import org.joda.time.DateTime;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class BsonRecordReader {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BsonRecordReader.class);
  public final static int MAX_RECORD_SIZE = 128 * 1024;
  private final List<SchemaPath> columns;
  private boolean atLeastOneWrite = false;
  private final boolean readNumbersAsDouble;
  protected DrillBuf workBuf;
  private String currentFieldName;
  // Used for error context
  private BsonReader reader;

  public BsonRecordReader(DrillBuf managedBuf, boolean allTextMode, boolean readNumbersAsDouble) {
    this(managedBuf, GroupScan.ALL_COLUMNS, readNumbersAsDouble);
  }

  public BsonRecordReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean readNumbersAsDouble) {
    assert Preconditions.checkNotNull(columns).size() > 0 : "bson record reader requires at least a column";
    this.readNumbersAsDouble = readNumbersAsDouble;
    this.workBuf = managedBuf;
    this.columns = columns;
  }

  public void write(ComplexWriter writer, BsonReader reader) throws IOException {
    this.reader = reader;
    reader.readStartDocument();
    BsonType readBsonType = reader.getCurrentBsonType();
    switch (readBsonType) {
    case DOCUMENT:
      writeToListOrMap(reader, new MapOrListWriterImpl(writer.rootAsMap()), false, null);
      break;
    default:
      throw new DrillRuntimeException("Root object must be DOCUMENT type. Found: " + readBsonType);
    }
  }

  private void writeToListOrMap(BsonReader reader, final MapOrListWriterImpl writer, boolean isList, String fieldName) {
    writer.start();
    // If isList is true, then filedName can be null as it is not required while
    // writing
    while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
      if (!isList) {
        fieldName = reader.readName();
      }
      BsonType currentBsonType = reader.getCurrentBsonType();
      switch (currentBsonType) {
      case INT32:
        int readInt32 = reader.readInt32();
        if (readNumbersAsDouble) {
          writeDouble(readInt32, writer, fieldName, isList);
        } else {
          writeInt32(readInt32, writer, fieldName, isList);
        }
        atLeastOneWrite = true;
        break;
      case INT64:
        long readInt64 = reader.readInt64();
        if (readNumbersAsDouble) {
          writeDouble(readInt64, writer, fieldName, isList);
        } else {
          writeInt64(readInt64, writer, fieldName, isList);
        }
        atLeastOneWrite = true;
        break;
      case ARRAY:
        reader.readStartArray();
        writeToListOrMap(reader, (MapOrListWriterImpl) writer.list(fieldName), true, fieldName);
        atLeastOneWrite = true;
        break;
      case BINARY:
        // handle types
        writeBinary(reader, writer, fieldName, isList);
        atLeastOneWrite = true;
        break;
      case BOOLEAN:
        boolean readBoolean = reader.readBoolean();
        writeBoolean(readBoolean, writer, fieldName, isList);
        atLeastOneWrite = true;
        break;
      case DATE_TIME:
        long readDateTime = reader.readDateTime();
        writeDateTime(readDateTime, writer, fieldName, isList);
        atLeastOneWrite = true;
        break;
      case DOCUMENT:
        reader.readStartDocument();
        // To handle nested Documents.
        MapOrListWriterImpl _writer = writer;
        if (!isList) {
          _writer = (MapOrListWriterImpl) writer.map(fieldName);
        } else {
          _writer = (MapOrListWriterImpl) writer.listoftmap(fieldName);
        }
        writeToListOrMap(reader, _writer, false, fieldName);
        atLeastOneWrite = true;
        break;
      case DOUBLE:
        double readDouble = reader.readDouble();
        writeDouble(readDouble, writer, fieldName, isList);
        atLeastOneWrite = true;
        break;
      case JAVASCRIPT:
        final String readJavaScript = reader.readJavaScript();
        writeString(readJavaScript, writer, fieldName, isList);
        atLeastOneWrite = true;
        break;
      case JAVASCRIPT_WITH_SCOPE:
        final String readJavaScriptWithScopeString = reader.readJavaScriptWithScope();
        writeString(readJavaScriptWithScopeString, writer, fieldName, isList);
        atLeastOneWrite = true;
        break;
      case NULL:
        // just read and ignore.
        reader.readNull();
        break;
      case OBJECT_ID:
        writeObjectId(reader, writer, fieldName, isList);
        atLeastOneWrite = true;
        break;
      case STRING:
        final String readString = reader.readString();
        writeString(readString, writer, fieldName, isList);
        atLeastOneWrite = true;
        break;
      case SYMBOL:
        final String readSymbol = reader.readSymbol();
        writeString(readSymbol, writer, fieldName, isList);
        atLeastOneWrite = true;
        break;
      case TIMESTAMP:
        int time = reader.readTimestamp().getTime();
        writeTimeStamp(time, writer, fieldName, isList);
        atLeastOneWrite = true;
        break;
      case DECIMAL128:
         BigDecimal readBigDecimalAsDecimal128 = reader.readDecimal128().bigDecimalValue();
         writeDecimal128(readBigDecimalAsDecimal128, writer, fieldName, isList);
         atLeastOneWrite = true;
         break;
      default:
        // Didn't handled REGULAR_EXPRESSION and DB_POINTER types
        throw new DrillRuntimeException("UnSupported Bson type: " + currentBsonType);
      }
    }
    if (!isList) {
      reader.readEndDocument();
    } else {
      reader.readEndArray();
    }
  }

  private void writeBinary(BsonReader reader, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
    final VarBinaryHolder vb = new VarBinaryHolder();
    BsonBinary readBinaryData = reader.readBinaryData();
    byte[] data = readBinaryData.getData();
    Byte type = readBinaryData.getType();
    // Based on specified binary type, cast it accordingly
    switch (type.intValue()) {
    case 1:
      // Double 1
      writeDouble(ByteBuffer.wrap(data).getDouble(), writer, fieldName, isList);
      break;
    case 2:
      // String 2
      writeString(new String(data, StandardCharsets.UTF_8), writer, fieldName, isList);
      break;
    case 8:
      // Boolean 8
      boolean boolValue = (data == null || data.length == 0) ? false : data[0] != 0x00;
      writeBoolean(boolValue, writer, fieldName, isList);
      break;
    case 9:
      // Date 9
      writeDateTime(ByteBuffer.wrap(data).getLong(), writer, fieldName, isList);
      break;
    case 13:
      // JavaScript 13
      writeString(new String(data, StandardCharsets.UTF_8), writer, fieldName, isList);
      break;
    case 14:
      // Symbol 14
      writeString(new String(data, StandardCharsets.UTF_8), writer, fieldName, isList);
      break;
    case 15:
      // JavaScript (with scope) 15
      writeString(new String(data, StandardCharsets.UTF_8), writer, fieldName, isList);
      break;
    case 16:
      // 32-bit integer 16
      writeInt32(ByteBuffer.wrap(data).getInt(), writer, fieldName, isList);
      break;
    case 17:
      // Timestamp 17
      writeTimeStamp(ByteBuffer.wrap(data).getInt(), writer, fieldName, isList);
      break;
    case 18:
      // 64-bit integer 18
      writeInt64(ByteBuffer.wrap(data).getInt(), writer, fieldName, isList);
      break;
    default:
      // In case of Object(3)/Binary data (5)/Object id (7) or in other case
      // considering as VarBinary
      final byte[] bytes = readBinaryData.getData();
      writeBinary(writer, fieldName, isList, vb, bytes);
      break;
    }
  }

  private void writeTimeStamp(int timestamp, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
    DateTime dateTime = new DateTime(timestamp*1000L);
    TimeStampWriter t;
    if (isList == false) {
      t = writer.map.timeStamp(fieldName);
    } else {
      t = writer.list.timeStamp();
    }
    t.writeTimeStamp(dateTime.withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis());
  }

  private void writeString(String readString, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
    byte[] strBytes = readString.getBytes(StandardCharsets.UTF_8);
    int length = strBytes.length;
    ensure(length);
    workBuf.setBytes(0, strBytes);
    final VarCharHolder vh = new VarCharHolder();
    vh.buffer = workBuf;
    vh.start = 0;
    vh.end = length;
    if (isList == false) {
      writer.varChar(fieldName).write(vh);
    } else {
      writer.list.varChar().write(vh);
    }
  }

  private void writeObjectId(BsonReader reader, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
    final VarBinaryHolder vObj = new VarBinaryHolder();
    final byte[] objBytes = reader.readObjectId().toByteArray();
    writeBinary(writer, fieldName, isList, vObj, objBytes);
  }

  private void writeDouble(double readDouble, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
    final Float8Holder f8h = new Float8Holder();
    f8h.value = readDouble;
    if (isList == false) {
      writer.float8(fieldName).write(f8h);
    } else {
      writer.list.float8().write(f8h);
    }
  }

  private void writeDateTime(long readDateTime, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
    DateTime date = new DateTime(readDateTime);
    TimeStampWriter dt;
    if (isList == false) {
      dt = writer.map.timeStamp(fieldName);
    } else {
      dt = writer.list.timeStamp();
    }
    dt.writeTimeStamp(date.withZoneRetainFields(org.joda.time.DateTimeZone.UTC).getMillis());
  }

  private void writeBoolean(boolean readBoolean, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
    final BitHolder bit = new BitHolder();
    bit.value = readBoolean ? 1 : 0;
    if (isList == false) {
      writer.bit(fieldName).write(bit);
    } else {
      writer.list.bit().write(bit);
    }
  }

  private void writeBinary(final MapOrListWriterImpl writer, String fieldName, boolean isList,
      final VarBinaryHolder vb, final byte[] bytes) {
    ensure(bytes.length);
    workBuf.setBytes(0, bytes);
    vb.buffer = workBuf;
    vb.start = 0;
    vb.end = bytes.length;
    if (isList == false) {
      writer.binary(fieldName).write(vb);
    } else {
      writer.list.varBinary().write(vb);
    }
  }

  private void writeInt64(long readInt64, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
    final BigIntHolder bh = new BigIntHolder();
    bh.value = readInt64;
    if (isList == false) {
      writer.bigInt(fieldName).write(bh);
    } else {
      writer.list.bigInt().write(bh);
    }
  }

  private void writeInt32(int readInt32, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
    final IntHolder ih = new IntHolder();
    ih.value = readInt32;
    if (isList == false) {
      writer.integer(fieldName).write(ih);
    } else {
      writer.list.integer().write(ih);
    }
  }

    private void writeDecimal128(BigDecimal readBigDecimal, final MapOrListWriterImpl writer, String fieldName, boolean isList) {
        if (isList) {
            writer.list.varDecimal().writeVarDecimal(readBigDecimal);
        } else {
            writer.varDecimal(fieldName, readBigDecimal.precision(), readBigDecimal.scale()).writeVarDecimal(readBigDecimal);
        }
    }


    public void ensureAtLeastOneField(ComplexWriter writer) {
    if (!atLeastOneWrite) {
      // if we had no columns, create one empty one so we can return some data
      // for count purposes.
      SchemaPath sp = columns.get(0);
      PathSegment root = sp.getRootSegment();
      BaseWriter.MapWriter fieldWriter = writer.rootAsMap();
      while (root.getChild() != null && !root.getChild().isArray()) {
        fieldWriter = fieldWriter.map(root.getNameSegment().getPath());
        root = root.getChild();
      }
      fieldWriter.integer(root.getNameSegment().getPath());
    }
  }

  private void ensure(final int length) {
    workBuf = workBuf.reallocIfNeeded(length);
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder("BsonRecordReader[");
    if (reader != null) {
      sb.append("Name=")
          .append(reader.getCurrentName())
          .append(", Type=")
          .append(reader.getCurrentBsonType());
    }
    sb.append(']');
    return sb.toString();
  }
}
