/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 **/

#ifndef QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_

#include <cctype>
#include <cstddef>
#include <exception>
#include <string>
#include <vector>

#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/Macros.hpp"

#include "glog/logging.h"

#include "tmb/id_typedefs.h"

namespace tmb { class MessageBus; }

namespace quickstep {

class CatalogRelationSchema;
class InsertDestination;
class StorageManager;
class WorkOrderProtosContainer;
class WorkOrdersContainer;

namespace serialization { class WorkOrder; }

/** \addtogroup RelationalOperators
 *  @{
 */

/**
 * @brief Exception thrown when a text file can't be opened for reading.
 **/
class TextScanReadError : public std::exception {
 public:
  /**
   * @brief Constructor.
   *
   * @param filename The name of the file that could not be read. It is
   *        embedded in the message returned by what().
   **/
  explicit TextScanReadError(const std::string &filename)
      : message_("TextScanReadError: Unable to read file ") {
    message_.append(filename);
    message_.append(" for text scan");
  }

  ~TextScanReadError() override = default;

  /**
   * @brief Get the human-readable description of this error.
   *
   * @return A NUL-terminated message owned by this exception object.
   **/
  const char* what() const noexcept override {
    return message_.c_str();
  }

 private:
  // Full message, assembled once in the constructor.
  std::string message_;
};

/**
 * @brief Exception thrown when improperly formatted data is encountered while
 *        scanning a text file.
 **/
class TextScanFormatError : public std::exception {
 public:
  /**
   * @brief Constructor.
   *
   * @param reason A description of what was malformed. It is placed inside
   *        the parentheses at the end of the message returned by what().
   **/
  explicit TextScanFormatError(const std::string &reason)
      : message_("TextScanFormatError: Malformed data encountered during text scan (") {
    message_.append(reason);
    message_.push_back(')');
  }

  ~TextScanFormatError() override = default;

  /**
   * @brief Get the human-readable description of this error.
   *
   * @return A NUL-terminated message owned by this exception object.
   **/
  const char* what() const noexcept override {
    return message_.c_str();
  }

 private:
  // Full message, assembled once in the constructor.
  std::string message_;
};

/**
 * @brief An operator which reads tuples from text file(s) and inserts them
 *        into a relation.
 **/
class TextScanOperator : public RelationalOperator {
 public:
  /**
   * @brief Constructor
   *
   * @param query_id The ID of the query to which this operator belongs.
   * @param file_pattern The glob-like file pattern of the sources to load. The
   *        pattern may contain * (wildcard for multiple chars) and ?
   *        (wildcard for single char). If a plain file name is given, only
   *        that single file is loaded.
   * @param field_terminator The character which separates attribute values in
   *        the text file.
   * @param process_escape_sequences Whether to decode escape sequences in the
   *        text file.
   * @param output_relation The output relation. Only a reference is stored,
   *        so it must outlive this operator.
   * @param output_destination_index The index of the InsertDestination in the
   *        QueryContext to insert tuples.
   **/
  TextScanOperator(const std::size_t query_id,
                   const std::string &file_pattern,
                   const char field_terminator,
                   const bool process_escape_sequences,
                   const CatalogRelation &output_relation,
                   const QueryContext::insert_destination_id output_destination_index)
      : RelationalOperator(
            query_id,
            1u,
            output_relation.getNumPartitions() != 1u /* has_repartition */,
            output_relation.getNumPartitions()),
        file_pattern_(file_pattern),
        field_terminator_(field_terminator),
        process_escape_sequences_(process_escape_sequences),
        output_relation_(output_relation),
        output_destination_index_(output_destination_index),
        work_generated_(false) {}

  ~TextScanOperator() override = default;

  /**
   * @return The operator type tag, always kTextScan.
   **/
  OperatorType getOperatorType() const override { return kTextScan; }

  /**
   * @return The fixed display name "TextScanOperator".
   **/
  std::string getName() const override { return "TextScanOperator"; }

  // Defined out of line, in the corresponding source file.
  bool getAllWorkOrders(WorkOrdersContainer *container,
                        QueryContext *query_context,
                        StorageManager *storage_manager,
                        const tmb::client_id scheduler_client_id,
                        tmb::MessageBus *bus) override;

  // Defined out of line, in the corresponding source file.
  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;

  /**
   * @return The InsertDestination index supplied at construction.
   **/
  QueryContext::insert_destination_id getInsertDestinationID() const override {
    return output_destination_index_;
  }

  /**
   * @return The ID of the output relation supplied at construction.
   **/
  const relation_id getOutputRelationID() const override {
    return output_relation_.getID();
  }

 private:
  // Defined out of line. NOTE(review): presumably builds the serialized
  // WorkOrder for one segment of \p filename -- confirm against the source
  // file.
  serialization::WorkOrder* createWorkOrderProto(const std::string &filename,
                                                 const std::size_t text_offset,
                                                 const std::size_t text_segment_size);

  // Scan configuration, fixed at construction.
  const std::string file_pattern_;
  const char field_terminator_;
  const bool process_escape_sequences_;

  // Not owned; referenced for the lifetime of this operator.
  const CatalogRelation &output_relation_;
  const QueryContext::insert_destination_id output_destination_index_;

  // Starts out false. NOTE(review): presumably flipped once work orders have
  // been produced -- confirm against the source file.
  bool work_generated_;

  DISALLOW_COPY_AND_ASSIGN(TextScanOperator);
};

/**
 * @brief A WorkOrder produced by TextScanOperator
 **/
class TextScanWorkOrder : public WorkOrder {
 public:
  /**
   * @brief Constructor
   *
   * @param query_id The ID of the query to which this WorkOrder belongs.
   * @param filename The name of the text file to bulk insert.
   * @param text_offset The start position in the text file to start text scan.
   * @param text_segment_size The size of text segment to be scanned.
   * @param field_terminator The character which separates attribute values in
   *        the text file.
   * @param process_escape_sequences Whether to decode escape sequences in the
   *        text file.
   * @param output_destination The InsertDestination to insert tuples. Must
   *        not be null (checked with DCHECK_NOTNULL).
   * @param hdfs The HDFS connector via libhdfs3. Not owned; defaults to
   *        nullptr.
   **/
  TextScanWorkOrder(
      const std::size_t query_id,
      const std::string &filename,
      const std::size_t text_offset,
      const std::size_t text_segment_size,
      const char field_terminator,
      const bool process_escape_sequences,
      InsertDestination *output_destination,
      void *hdfs = nullptr)
      : WorkOrder(query_id),
        filename_(filename),
        text_offset_(text_offset),
        text_segment_size_(text_segment_size),
        field_terminator_(field_terminator),
        process_escape_sequences_(process_escape_sequences),
        output_destination_(DCHECK_NOTNULL(output_destination)),
        hdfs_(hdfs) {}

  ~TextScanWorkOrder() override {}

  // FIXME(chasseur): Rollback if an exception is thrown after some tuples have
  // already been inserted.
  /**
   * @exception TextScanReadError The text file could not be opened for
   *            reading.
   * @exception TextScanFormatError Malformed data was encountered in the
   *            text file.
   * @exception TupleTooLargeForBlock A tuple in the text file was too large
   *            to fit in a StorageBlock.
   **/
  void execute() override;

 private:
  /**
   * @brief Extract a field string starting at \p *field_ptr. This method also
   *        expands escape sequences if \p process_escape_sequences_ is true.
   *        Throws TextScanFormatError if text was malformed.
   *
   * @param field_ptr \p *field_ptr points to the current position of the input
   *        char stream for parsing. The overall char stream must end with a
   *        newline character. After the call, \p *field_ptr will be modified to
   *        the start position of the NEXT field string.
   * @param is_null_literal OUTPUT parameter. Set to true if the NULL-literal
   *        string "\N" was found.
   * @param has_reached_end_of_line OUTPUT parameter. Set to true if the newline
   *        character was encountered.
   * @param field_string OUTPUT parameter. Set to the extracted field string.
   */
  void extractFieldString(const char **field_ptr,
                          bool *is_null_literal,
                          bool *has_reached_end_of_line,
                          std::string *field_string) const;

  /**
   * @brief This method helps incorporate fault tolerance while ingesting data.
   *        It is called whenever a faulty row is encountered and it is
   *        required to move \p *field_ptr to the next row.
   *
   * @param field_ptr \p *field_ptr points to the current position of the input
   *        char stream while parsing a faulty row. After the call, \p *field_ptr
   *        will be modified to the start position of the NEXT record in the
   *        stream.
   */
  void skipFaultyRow(const char **field_ptr) const;

  /**
   * @brief Make a tuple by parsing all of the individual fields from a char stream.
   *
   * @param row_ptr \p *row_ptr points to the current position of the input
   *        char stream for parsing. The overall char stream must end with a
   *        newline character. After the call, \p *row_ptr will be modified to
   *        the start position of the NEXT text row.
   * @param relation The relation schema for the tuple.
   * @param is_faulty OUTPUT parameter. Set to true if the row is faulty.
   * @return The tuple parsed from the char stream.
   */
  std::vector<TypedValue> parseRow(const char **row_ptr,
                                   const CatalogRelationSchema &relation,
                                   bool *is_faulty) const;

  /**
   * @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as
   *        a char literal. \p *literal_ptr will be modified to the last position
   *        of the parsed octal digits.
   *
   * @note If no octal digit is found at \p *literal_ptr, zero is returned and
   *       \p *literal_ptr ends up one position BEFORE where it started.
   *
   * @param literal_ptr \p *literal_ptr points to the current position of the
   *        input char stream for parsing. The overall char stream must end with
   *        a newline character.
   * @return The char literal from the parsed octal digits.
   */
  inline static char ParseOctalLiteral(const char **literal_ptr) {
    int value = 0;
    const char *ptr = *literal_ptr;
    for (int i = 0; i < 3; ++i, ++ptr) {
      // Any non-octal character (including the terminating newline) yields a
      // value outside [0, 8) and stops the scan.
      const int char_value = *ptr - '0';
      if ((char_value >= 0) && (char_value < 8)) {
        value = value * 8 + char_value;
      } else {
        break;
      }
    }
    // ptr is one past the last consumed digit; step back onto that digit.
    *literal_ptr = ptr - 1;
    // Three octal digits can encode up to 0777, so the narrowing is explicit.
    return static_cast<char>(value);
  }

  /**
   * @brief Parse up to two hexadecimal digits (0-F, case insensitive) starting
   *        at \p *literal_ptr as a char literal. \p *literal_ptr will be modified
   *        to the last position of the parsed hexadecimal digits.
   *
   * @note If no hexadecimal digit is found at \p *literal_ptr, zero is
   *       returned and \p *literal_ptr ends up one position BEFORE where it
   *       started.
   *
   * @param literal_ptr \p *literal_ptr points to the current position of the
   *        input char stream for parsing. The overall char stream must end with
   *        a newline character.
   * @return The char literal from the parsed hexadecimal digits.
   */
  inline static char ParseHexLiteral(const char **literal_ptr) {
    int value = 0;
    const char *ptr = *literal_ptr;
    for (int i = 0; i < 2; ++i, ++ptr) {
      const char c = *ptr;
      int char_value;
      // Use explicit range checks rather than std::isdigit(): passing a
      // negative char (any byte >= 0x80 where char is signed) to the <cctype>
      // classification functions is undefined behavior, and the scanned text
      // may contain arbitrary bytes.
      if (c >= '0' && c <= '9') {
        char_value = c - '0';
      } else if (c >= 'a' && c <= 'f') {
        char_value = c - 'a' + 10;
      } else if (c >= 'A' && c <= 'F') {
        char_value = c - 'A' + 10;
      } else {
        break;
      }
      value = value * 16 + char_value;
    }
    // ptr is one past the last consumed digit; step back onto that digit.
    *literal_ptr = ptr - 1;
    return static_cast<char>(value);
  }

  // Description of the text segment to scan, fixed at construction.
  const std::string filename_;
  const std::size_t text_offset_;
  const std::size_t text_segment_size_;
  const char field_terminator_;
  const bool process_escape_sequences_;

  InsertDestination *output_destination_;

  // Not owned.
  void *hdfs_;

  DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
};

/** @} */

}  // namespace quickstep

#endif  // QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_
