/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.rpc;

import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;


public class IoTDBRpcDataSet {

  public static final String TIMESTAMP_STR = "Time";
  public static final String VALUE_IS_NULL = "The value got by %s (column name) is NULL.";
  public static final int START_INDEX = 2;
  public String sql;
  public boolean isClosed = false;
  public TSIService.Iface client;
  public List<String> columnNameList; // no deduplication
  public List<TSDataType> columnTypeList; // no deduplication
  public Map<String, Integer> columnOrdinalMap; // used because the server returns deduplicated columns
  public List<TSDataType> columnTypeDeduplicatedList; // deduplicated from columnTypeList
  public int fetchSize;
  public boolean emptyResultSet = false;
  public boolean hasCachedRecord = false;
  public boolean lastReadWasNull;


  public byte[][] values; // used to cache the current row record value
  // column size
  public int columnSize;

  public long sessionId;
  public long queryId;
  public boolean ignoreTimeStamp;

  public int rowsIndex = 0; // used to record the row index in current TSQueryDataSet

  public TSQueryDataSet tsQueryDataSet = null;
  public byte[] time; // used to cache the current time value
  public byte[] currentBitmap; // used to cache the current bitmap for every column
  public static final int FLAG = 0x80; // used to do `and` operation with bitmap to judge whether the value is null

  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  public IoTDBRpcDataSet(String sql, List<String> columnNameList, List<String> columnTypeList,
      Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp,
      long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet,
      int fetchSize) {
    this.sessionId = sessionId;
    this.ignoreTimeStamp = ignoreTimeStamp;
    this.sql = sql;
    this.queryId = queryId;
    this.client = client;
    this.fetchSize = fetchSize;
    columnSize = columnNameList.size();

    this.columnNameList = new ArrayList<>();
    this.columnTypeList = new ArrayList<>();
    if (!ignoreTimeStamp) {
      this.columnNameList.add(TIMESTAMP_STR);
      this.columnTypeList.add(TSDataType.INT64);
    }
    // deduplicate and map
    this.columnOrdinalMap = new HashMap<>();
    if (!ignoreTimeStamp) {
      this.columnOrdinalMap.put(TIMESTAMP_STR, 1);
    }

    // deduplicate and map
    if (columnNameIndex != null) {
      this.columnTypeDeduplicatedList = new ArrayList<>(columnNameIndex.size());
      for (int i = 0; i < columnNameIndex.size(); i++) {
        columnTypeDeduplicatedList.add(null);
      }
      for (int i = 0; i < columnNameList.size(); i++) {
        String name = columnNameList.get(i);
        this.columnNameList.add(name);
        this.columnTypeList.add(TSDataType.valueOf(columnTypeList.get(i)));
        if (!columnOrdinalMap.containsKey(name)) {
          int index = columnNameIndex.get(name);
          columnOrdinalMap.put(name, index + START_INDEX);
          columnTypeDeduplicatedList.set(index, TSDataType.valueOf(columnTypeList.get(i)));
        }
      }
    } else {
      this.columnTypeDeduplicatedList = new ArrayList<>();
      int index = START_INDEX;
      for (int i = 0; i < columnNameList.size(); i++) {
        String name = columnNameList.get(i);
        this.columnNameList.add(name);
        this.columnTypeList.add(TSDataType.valueOf(columnTypeList.get(i)));
        if (!columnOrdinalMap.containsKey(name)) {
          columnOrdinalMap.put(name, index++);
          columnTypeDeduplicatedList.add(TSDataType.valueOf(columnTypeList.get(i)));
        }
      }
    }

    time = new byte[Long.BYTES];
    currentBitmap = new byte[columnTypeDeduplicatedList.size()];
    values = new byte[columnTypeDeduplicatedList.size()][];
    for (int i = 0; i < values.length; i++) {
      TSDataType dataType = columnTypeDeduplicatedList.get(i);
      switch (dataType) {
        case BOOLEAN:
          values[i] = new byte[1];
          break;
        case INT32:
          values[i] = new byte[Integer.BYTES];
          break;
        case INT64:
          values[i] = new byte[Long.BYTES];
          break;
        case FLOAT:
          values[i] = new byte[Float.BYTES];
          break;
        case DOUBLE:
          values[i] = new byte[Double.BYTES];
          break;
        case TEXT:
          values[i] = null;
          break;
        default:
          throw new UnSupportedDataTypeException(
              String
                  .format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
      }
    }
    this.tsQueryDataSet = queryDataSet;
  }

  public void close() throws StatementExecutionException, TException {
    if (isClosed) {
      return;
    }
    if (client != null) {
      try {
        TSCloseOperationReq closeReq = new TSCloseOperationReq(sessionId);
        closeReq.setQueryId(queryId);
        TSStatus closeResp = client.closeOperation(closeReq);
        RpcUtils.verifySuccess(closeResp);
      } catch (StatementExecutionException e) {
        throw new StatementExecutionException(
            "Error occurs for close operation in server side because ", e);
      } catch (TException e) {
        throw new TException("Error occurs when connecting to server for close operation ", e);
      }
    }
    client = null;
    isClosed = true;
  }

  public boolean next() throws StatementExecutionException, IoTDBConnectionException {
    if (hasCachedResults()) {
      constructOneRow();
      return true;
    }
    if (emptyResultSet) {
      return false;
    }
    if (fetchResults()) {
      constructOneRow();
      return true;
    }
    return false;
  }

  public boolean fetchResults() throws StatementExecutionException, IoTDBConnectionException {
    rowsIndex = 0;
    TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true);
    try {
      TSFetchResultsResp resp = client.fetchResults(req);

      RpcUtils.verifySuccess(resp.getStatus());
      if (!resp.hasResultSet) {
        emptyResultSet = true;
      } else {
        tsQueryDataSet = resp.getQueryDataSet();
      }
      return resp.hasResultSet;
    } catch (TException e) {
      throw new IoTDBConnectionException(
          "Cannot fetch result from server, because of network connection: {} ", e);
    }
  }

  public boolean hasCachedResults() {
    return (tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining());
  }

  public void constructOneRow() {
    tsQueryDataSet.time.get(time);
    for (int i = 0; i < tsQueryDataSet.bitmapList.size(); i++) {
      ByteBuffer bitmapBuffer = tsQueryDataSet.bitmapList.get(i);
      // another new 8 row, should move the bitmap buffer position to next byte
      if (rowsIndex % 8 == 0) {
        currentBitmap[i] = bitmapBuffer.get();
      }
      if (!isNull(i, rowsIndex)) {
        ByteBuffer valueBuffer = tsQueryDataSet.valueList.get(i);
        TSDataType dataType = columnTypeDeduplicatedList.get(i);
        switch (dataType) {
          case BOOLEAN:
          case INT32:
          case INT64:
          case FLOAT:
          case DOUBLE:
            valueBuffer.get(values[i]);
            break;
          case TEXT:
            int length = valueBuffer.getInt();
            values[i] = ReadWriteIOUtils.readBytes(valueBuffer, length);
            break;
          default:
            throw new UnSupportedDataTypeException(
                String
                    .format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
        }
      }
    }
    rowsIndex++;
    hasCachedRecord = true;
  }

  public boolean isNull(int columnIndex) throws StatementExecutionException {
    int index = columnOrdinalMap.get(findColumnNameByIndex(columnIndex)) - START_INDEX;
    // time column will never be null
    if (index < 0) {
      return true;
    }
    return isNull(index, rowsIndex - 1);
  }

  public boolean isNull(String columnName) throws StatementExecutionException {
    int index = columnOrdinalMap.get(columnName) - START_INDEX;
    // time column will never be null
    if (index < 0) {
      return true;
    }
    return isNull(index, rowsIndex - 1);
  }

  private boolean isNull(int index, int rowNum) {
    byte bitmap = currentBitmap[index];
    int shift = rowNum % 8;
    return ((FLAG >>> shift) & (bitmap & 0xff)) == 0;
  }

  public boolean getBoolean(int columnIndex) throws StatementExecutionException {
    return getBoolean(findColumnNameByIndex(columnIndex));
  }

  public boolean getBoolean(String columnName) throws StatementExecutionException {
    checkRecord();
    int index = columnOrdinalMap.get(columnName) - START_INDEX;
    if (!isNull(index, rowsIndex - 1)) {
      lastReadWasNull = false;
      return BytesUtils.bytesToBool(values[index]);
    } else {
      lastReadWasNull = true;
      return false;
    }
  }

  public double getDouble(int columnIndex) throws StatementExecutionException {
    return getDouble(findColumnNameByIndex(columnIndex));
  }

  public double getDouble(String columnName) throws StatementExecutionException {
    checkRecord();
    int index = columnOrdinalMap.get(columnName) - START_INDEX;
    if (!isNull(index, rowsIndex - 1)) {
      lastReadWasNull = false;
      return BytesUtils.bytesToDouble(values[index]);
    } else {
      lastReadWasNull = true;
      return 0;
    }
  }

  public float getFloat(int columnIndex) throws StatementExecutionException {
    return getFloat(findColumnNameByIndex(columnIndex));
  }

  public float getFloat(String columnName) throws StatementExecutionException {
    checkRecord();
    int index = columnOrdinalMap.get(columnName) - START_INDEX;
    if (!isNull(index, rowsIndex - 1)) {
      lastReadWasNull = false;
      return BytesUtils.bytesToFloat(values[index]);
    } else {
      lastReadWasNull = true;
      return 0;
    }
  }

  public int getInt(int columnIndex) throws StatementExecutionException {
    return getInt(findColumnNameByIndex(columnIndex));
  }

  public int getInt(String columnName) throws StatementExecutionException {
    checkRecord();
    int index = columnOrdinalMap.get(columnName) - START_INDEX;
    if (!isNull(index, rowsIndex - 1)) {
      lastReadWasNull = false;
      return BytesUtils.bytesToInt(values[index]);
    } else {
      lastReadWasNull = true;
      return 0;
    }
  }

  public long getLong(int columnIndex) throws StatementExecutionException {
    return getLong(findColumnNameByIndex(columnIndex));
  }

  public long getLong(String columnName) throws StatementExecutionException {
    checkRecord();
    if (columnName.equals(TIMESTAMP_STR)) {
      return BytesUtils.bytesToLong(time);
    }
    int index = columnOrdinalMap.get(columnName) - START_INDEX;
    if (!isNull(index, rowsIndex - 1)) {
      lastReadWasNull = false;
      return BytesUtils.bytesToLong(values[index]);
    } else {
      lastReadWasNull = true;
      return 0;
    }
  }

  public Object getObject(int columnIndex) throws StatementExecutionException {
    return getObject(findColumnNameByIndex(columnIndex));
  }

  public Object getObject(String columnName) throws StatementExecutionException {
    return getValueByName(columnName);
  }

  public String getString(int columnIndex) throws StatementExecutionException {
    return getString(findColumnNameByIndex(columnIndex));
  }

  public String getString(String columnName) throws StatementExecutionException {
    return getValueByName(columnName);
  }

  public Timestamp getTimestamp(int columnIndex) throws StatementExecutionException {
    return new Timestamp(getLong(columnIndex));
  }

  public Timestamp getTimestamp(String columnName) throws StatementExecutionException {
    return getTimestamp(findColumn(columnName));
  }

  public int findColumn(String columnName) {
    return columnOrdinalMap.get(columnName);
  }

  public String getValueByName(String columnName) throws StatementExecutionException {
    checkRecord();
    if (columnName.equals(TIMESTAMP_STR)) {
      return String.valueOf(BytesUtils.bytesToLong(time));
    }
    int index = columnOrdinalMap.get(columnName) - START_INDEX;
    if (index < 0 || index >= values.length || isNull(index, rowsIndex - 1)) {
      lastReadWasNull = true;
      return null;
    }
    lastReadWasNull = false;
    return getString(index, columnTypeDeduplicatedList.get(index), values);
  }

  public String getString(int index, TSDataType tsDataType, byte[][] values) {
    switch (tsDataType) {
      case BOOLEAN:
        return String.valueOf(BytesUtils.bytesToBool(values[index]));
      case INT32:
        return String.valueOf(BytesUtils.bytesToInt(values[index]));
      case INT64:
        return String.valueOf(BytesUtils.bytesToLong(values[index]));
      case FLOAT:
        return String.valueOf(BytesUtils.bytesToFloat(values[index]));
      case DOUBLE:
        return String.valueOf(BytesUtils.bytesToDouble(values[index]));
      case TEXT:
        return new String(values[index]);
      default:
        return null;
    }
  }

  public String findColumnNameByIndex(int columnIndex) throws StatementExecutionException {
    if (columnIndex <= 0) {
      throw new StatementExecutionException("column index should start from 1");
    }
    if (columnIndex > columnNameList.size()) {
      throw new StatementExecutionException(
          String.format("column index %d out of range %d", columnIndex, columnNameList.size()));
    }
    return columnNameList.get(columnIndex - 1);
  }

  public void checkRecord() throws StatementExecutionException {
    if (Objects.isNull(tsQueryDataSet)) {
      throw new StatementExecutionException("No record remains");
    }
  }

  public void setTsQueryDataSet(TSQueryDataSet tsQueryDataSet) {
    this.tsQueryDataSet = tsQueryDataSet;
  }
}
