/**
 *   Copyright 2011-2015 Quickstep Technologies LLC.
 *   Copyright 2015-2016 Pivotal Software, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 **/

#ifndef QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <exception>
#include <string>

#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlob.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/Macros.hpp"
#include "utility/ThreadSafeQueue.hpp"

#include "glog/logging.h"

#include "tmb/id_typedefs.h"

namespace tmb { class MessageBus; }

namespace quickstep {

class CatalogRelationSchema;
class InsertDestination;
class StorageManager;
class WorkOrderProtosContainer;
class WorkOrdersContainer;

/** \addtogroup RelationalOperators
 *  @{
 */

/**
 * @brief Exception thrown when a text file can't be opened for reading.
 *
 * The message returned by what() has the form
 * "TextScanReadError: Unable to read file <filename> for text scan".
 **/
class TextScanReadError : public std::exception {
 public:
  /**
   * @brief Constructor.
   *
   * @param filename The name of the file which could not be read. It is
   *        embedded verbatim in the message returned by what().
   **/
  explicit TextScanReadError(const std::string &filename)
      : message_("TextScanReadError: Unable to read file ") {
    message_.append(filename);
    message_.append(" for text scan");
  }

  // 'noexcept' replaces the deprecated dynamic exception specification
  // 'throw()' (removed from the language in C++20).
  ~TextScanReadError() noexcept override {}

  /**
   * @brief Get a human-readable description of this error.
   *
   * @return A NUL-terminated string owned by this exception object. The
   *         pointer refers to this object's internal message buffer.
   **/
  const char* what() const noexcept override {
    return message_.c_str();
  }

 private:
  // Fully formatted error message, built once in the constructor.
  std::string message_;
};

/**
 * @brief Exception thrown when improperly formatted data is encountered while
 *        scanning a text file.
 *
 * The message returned by what() has the form
 * "TextScanFormatError: Malformed data encountered during text scan (<reason>)".
 **/
class TextScanFormatError : public std::exception {
 public:
  /**
   * @brief Constructor.
   *
   * @param reason A short description of what was wrong with the data. It is
   *        embedded verbatim, in parentheses, in the message returned by
   *        what().
   **/
  // Taken by const reference (not by value) so that constructing the
  // exception does not make an extra copy of the reason string.
  explicit TextScanFormatError(const std::string &reason)
      : message_("TextScanFormatError: Malformed data encountered during text scan (") {
    message_.append(reason);
    message_.push_back(')');
  }

  // 'noexcept' replaces the deprecated dynamic exception specification
  // 'throw()' (removed from the language in C++20).
  ~TextScanFormatError() noexcept override {}

  /**
   * @brief Get a human-readable description of this error.
   *
   * @return A NUL-terminated string owned by this exception object. The
   *         pointer refers to this object's internal message buffer.
   **/
  const char* what() const noexcept override {
    return message_.c_str();
  }

 private:
  // Fully formatted error message, built once in the constructor.
  std::string message_;
};

/**
 * @brief Plain descriptor for a blob of text data: the ID of the blob plus
 *        the size of the text associated with it.
 */
struct TextBlob {
  block_id blob_id;  // ID of the blob holding the text.
  std::size_t size;  // Size of the text data.

  /**
   * @brief Constructor.
   *
   * @param text_blob_id The ID of the blob holding the text.
   * @param text_size The size of the text data.
   **/
  TextBlob(const block_id text_blob_id, const std::size_t text_size)
      : blob_id(text_blob_id),
        size(text_size) {
  }
};

/**
 * @brief A relational operator that loads tuples from text file(s) matching a
 *        pattern and inserts them into an output relation.
 **/
class TextScanOperator : public RelationalOperator {
 public:
  // Feedback message types declared by this operator (see
  // receiveFeedbackMessage()).
  enum FeedbackMessageType : WorkOrder::FeedbackMessageType {
    kNewTextBlobMessage,
    kSplitWorkOrderCompletionMessage,
  };

  /**
   * @brief Constructor.
   *
   * @param file_pattern The glob-like pattern naming the source file(s) to
   *        load. The pattern may contain * (wildcard matching multiple
   *        characters) and ? (wildcard matching a single character). If a
   *        plain file name is given, just that single file is loaded.
   * @param field_terminator The character which separates attribute values
   *        in the text file.
   * @param process_escape_sequences Whether to decode escape sequences in the
   *        text file.
   * @param parallelize_load Whether to parallelize the load by splitting the
   *        file into blobs and generating a separate work order for each
   *        blob.
   * @param output_relation The output relation.
   * @param output_destination_index The index of the InsertDestination in the
   *        QueryContext to insert tuples.
   **/
  TextScanOperator(const std::string &file_pattern,
                   const char field_terminator,
                   const bool process_escape_sequences,
                   const bool parallelize_load,
                   const CatalogRelation &output_relation,
                   const QueryContext::insert_destination_id output_destination_index)
      : file_pattern_(file_pattern),
        field_terminator_(field_terminator),
        process_escape_sequences_(process_escape_sequences),
        parallelize_load_(parallelize_load),
        output_relation_(output_relation),
        output_destination_index_(output_destination_index) {
  }

  ~TextScanOperator() override {}

  // Defined out of line; see the RelationalOperator interface for the
  // contract of this override.
  bool getAllWorkOrders(WorkOrdersContainer *container,
                        QueryContext *query_context,
                        StorageManager *storage_manager,
                        const tmb::client_id foreman_client_id,
                        const tmb::client_id agent_client_id,
                        tmb::MessageBus *bus) override;

  // Defined out of line; see the RelationalOperator interface for the
  // contract of this override.
  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;

  // Returns the InsertDestination index that was passed to the constructor.
  QueryContext::insert_destination_id getInsertDestinationID() const override {
    return output_destination_index_;
  }

  // Returns the ID of the output relation that was passed to the constructor.
  const relation_id getOutputRelationID() const override {
    return output_relation_.getID();
  }

  // Defined out of line; presumably handles the FeedbackMessageType values
  // declared above -- confirm against the implementation.
  void receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) override;

 private:
  // Load parameters, fixed at construction.
  const std::string file_pattern_;
  const char field_terminator_;
  const bool process_escape_sequences_;
  const bool parallelize_load_;

  // Where the loaded tuples go.
  const CatalogRelation &output_relation_;
  const QueryContext::insert_destination_id output_destination_index_;

  // Bookkeeping for the parallel (split) load path. All counters start at
  // zero.
  ThreadSafeQueue<TextBlob> text_blob_queue_;
  std::atomic<std::uint32_t> num_done_split_work_orders_{0};
  std::uint32_t num_split_work_orders_ = 0;

  // Indicates if work order to load file is generated for non-parallel load, and
  // if work order to split file to blobs is generated for parallel load.
  bool work_generated_ = false;

  DISALLOW_COPY_AND_ASSIGN(TextScanOperator);
};

/**
 * @brief A WorkOrder produced by TextScanOperator.
 *
 * A TextScanWorkOrder can be constructed from either a file name or a text
 * blob (ID and size); see the two constructors below. All non-inline member
 * functions are defined outside this header.
 **/
class TextScanWorkOrder : public WorkOrder {
 public:
  /**
   * @brief Constructor for scanning a text file.
   *
   * @param filename The name of the text file to bulk insert.
   * @param field_terminator The character which separates attribute values
   *        in the text file.
   * @param process_escape_sequences Whether to decode escape sequences in the
   *        text file.
   * @param output_destination The InsertDestination to insert tuples.
   * @param storage_manager The StorageManager to use.
   **/
  TextScanWorkOrder(
      const std::string &filename,
      const char field_terminator,
      const bool process_escape_sequences,
      InsertDestination *output_destination,
      StorageManager *storage_manager);

  /**
   * @brief Constructor for scanning text held in a blob.
   *
   * @param text_blob Blob ID containing the data to be scanned.
   * @param text_size Size of the data in the blob.
   * @param field_terminator The character which separates attribute values in
   *        the text file.
   * @param process_escape_sequences Whether to decode escape sequences in the
   *        text file.
   * @param output_destination The InsertDestination to write the read tuples.
   * @param storage_manager The StorageManager to use.
   */
  TextScanWorkOrder(
      const block_id text_blob,
      const std::size_t text_size,
      const char field_terminator,
      const bool process_escape_sequences,
      InsertDestination *output_destination,
      StorageManager *storage_manager);

  ~TextScanWorkOrder() override {}

  // FIXME(chasseur): Rollback if an exception is thrown after some tuples have
  // already been inserted.
  /**
   * @brief Execute this work order.
   *
   * @exception TextScanReadError The text file could not be opened for
   *            reading.
   * @exception TextScanFormatError Malformed data was encountered in the
   *            text file.
   * @exception TupleTooLargeForBlock A tuple in the text file was too large
   *            to fit in a StorageBlock.
   **/
  void execute() override;

 private:
  // Parse up to three octal digits (0-7) starting at '*start_pos' in
  // 'row_string' as a char literal. '*start_pos' will be modified to
  // the first position AFTER the parsed octal digits.
  static char ParseOctalLiteral(const std::string &row_string,
                                std::size_t *start_pos);

  // Parse up to two hexadecimal digits (0-F, case insensitive) starting at
  // '*start_pos' in 'row_string' as a char literal. '*start_pos' will be
  // modified to the first position AFTER the parsed hexadecimal digits.
  static char ParseHexLiteral(const std::string &row_string,
                              std::size_t *start_pos);

  // Read the next text row from the open FILE stream '*file' into
  // '*row_string'. Returns false if end-of-file is reached and there are no
  // more rows, true if a row string was successfully read. For ease of
  // parsing, '*row_string' has the trailing row-terminator removed and
  // replaced with a field-terminator.
  bool readRowFromFile(FILE *file, std::string *row_string) const;

  // Read the next text row from blob memory starting at '*start_pos' and
  // ending at 'end_pos' into '*row_string'. Returns false if the end of the
  // blob is reached and there are no more rows, true if a row was
  // successfully read. For ease of parsing, '*row_string' has the trailing
  // row-terminator removed and replaced with a field-terminator. After the
  // call, '*start_pos' points to the first character AFTER the read row in
  // the blob.
  bool readRowFromBlob(const char **start_pos,
                       const char *end_pos,
                       std::string *row_string) const;

  // Trim a row-terminator (newline or carriage-return + newline) off the end
  // of '*row_string'. Returns true if the row-terminator was successfully
  // removed, false if '*row_string' did not end in a row-terminator.
  bool removeRowTerminator(std::string *row_string) const;

  // Extract a field string starting at '*start_pos' in 'row_string' into
  // '*field_string'. This method also expands escape sequences if
  // 'process_escape_sequences_' is true. Returns true if a field string was
  // successfully extracted, false in the special case where the NULL-literal
  // string "\N" was found. Throws TextScanFormatError if text was malformed.
  bool extractFieldString(const std::string &row_string,
                          std::size_t *start_pos,
                          std::string *field_string) const;

  // Make a tuple by parsing all of the individual fields specified in
  // 'row_string'. NOTE(review): 'relation' is presumably the schema that
  // determines how each field is parsed -- confirm against the definition.
  Tuple parseRow(const std::string &row_string, const CatalogRelationSchema &relation) const;

  // NOTE(review): presumably true when constructed from a file name and false
  // when constructed from a blob -- the constructors are defined outside this
  // header, so confirm there.
  const bool is_file_;
  // Name of the text file (see the file-name constructor).
  const std::string filename_;
  // Character which separates attribute values in the text.
  const char field_terminator_;
  // Blob ID and data size (see the blob constructor).
  const block_id text_blob_;
  const std::size_t text_size_;
  // Whether to decode escape sequences in the text.
  const bool process_escape_sequences_;

  // Raw pointers supplied at construction. NOTE(review): ownership is not
  // visible from this header; presumably non-owning -- confirm.
  InsertDestination *output_destination_;
  StorageManager *storage_manager_;

  DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
};

/**
 * @brief A WorkOrder to split the file into blobs of text that can be processed
 *        separately.
 **/
class TextSplitWorkOrder : public WorkOrder {
 public:
  /**
   * @brief Constructor.
   *
   * @param filename File to split into row-aligned blobs.
   * @param process_escape_sequences Whether to decode escape sequences in the
   *        text file.
   * @param storage_manager The StorageManager to use. Checked with
   *        DCHECK_NOTNULL.
   * @param operator_index Operator index of the current operator. This is used
   *        to send new-work available message to Foreman.
   * @param foreman_client_id The TMB client ID of the foreman thread.
   * @param agent_client_id The TMB client ID of the agent that sends messages
   *        to Foreman.
   * @param bus A pointer to the TMB. Checked with DCHECK_NOTNULL.
   */
  // The bus type is spelled 'tmb::MessageBus', matching the forward
  // declaration at the top of this header and TextScanOperator's usage,
  // rather than depending on an unqualified alias from another header.
  TextSplitWorkOrder(const std::string &filename,
                     const bool process_escape_sequences,
                     StorageManager *storage_manager,
                     const std::size_t operator_index,
                     const tmb::client_id foreman_client_id,
                     const tmb::client_id agent_client_id,
                     tmb::MessageBus *bus)
      : filename_(filename),
        process_escape_sequences_(process_escape_sequences),
        storage_manager_(DCHECK_NOTNULL(storage_manager)),
        operator_index_(operator_index),
        foreman_client_id_(foreman_client_id),
        agent_client_id_(agent_client_id),
        bus_(DCHECK_NOTNULL(bus)) {}

  /**
   * @brief Execute this work order.
   *
   * @exception TextScanReadError The text file could not be opened for
   *            reading.
   */
  void execute() override;

 private:
  // Allocate a new blob.
  void allocateBlob();

  // Find the last row terminator in current blob.
  std::size_t findLastRowTerminator();

  // Send the blob info to its operator via TMB.
  void sendBlobInfoToOperator(const bool write_row_aligned);

  // Get the writeable address (unwritten chunk) in current blob, i.e. the
  // start of the blob's mutable memory advanced by 'written_' bytes.
  inline char* writeableBlobAddress() {
    return static_cast<char*>(text_blob_->getMemoryMutable()) + written_;
  }

  // Number of bytes remaining to be written ('blob_size_' - 'written_').
  inline std::size_t remainingBlobBytes() const {
    return blob_size_ - written_;
  }

  const std::string filename_;  // File to split.
  const bool process_escape_sequences_;

  StorageManager *storage_manager_;

  const std::size_t operator_index_;  // Operator index.
  const tmb::client_id foreman_client_id_, agent_client_id_;
  tmb::MessageBus *bus_;

  MutableBlobReference text_blob_;  // Mutable reference to current blob.
  // Current blob ID. Value-initialized so that it never holds an
  // indeterminate value before a blob ID is assigned to it.
  block_id text_blob_id_{};
  std::size_t written_ = 0;  // Bytes written in current blob.
  std::size_t blob_size_ = 0;  // Size of the current blob.

  DISALLOW_COPY_AND_ASSIGN(TextSplitWorkOrder);
};

/** @} */

}  // namespace quickstep

#endif  // QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_
