/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 **/

#ifndef QUICKSTEP_RELATIONAL_OPERATORS_WORK_UNIT_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_WORK_UNIT_HPP_

#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <utility>
#include <vector>

#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "utility/Macros.hpp"

#include "glog/logging.h"

#include "tmb/address.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/message_style.h"
#include "tmb/tagged_message.h"

namespace quickstep {

/** \addtogroup RelationalOperators
 *  @{
 */

/**
 * @brief A single unit of work in a query plan, produced by a
 *        RelationalOperator. Where possible, WorkOrders should be of
 *        single-block granularity to maximize the opportunity for parallelism.
 **/
class WorkOrder {
 public:
  /**
   * @brief Type of feedback message sent to relational operator.
   *
   * @note This is a per-operator type that is set and understood only by the
   *       relational operator.
   */
  typedef std::uint16_t FeedbackMessageType;

  /**
   * @brief Header struct used in feedback message.
   *
   * @note This is a per-operator type that is set and understood only by the
   *       relational operator.
   */
  struct FeedbackMessageHeader {
    std::size_t query_id;
    std::size_t rel_op_index;
    std::size_t payload_size;
    FeedbackMessageType payload_type;

    /**
     * @brief Header constructor.
     *
     * @param query_id The ID of the query.
     * @param relational_op_index Index of the relation operator.
     * @param payload_size Size of the payload of the message.
     * @param payload_type Type of payload.
     */
    FeedbackMessageHeader(const std::size_t query_id,
                          const std::size_t relational_op_index,
                          const std::size_t payload_size,
                          const FeedbackMessageType payload_type)
        : query_id(query_id),
          rel_op_index(relational_op_index),
          payload_size(payload_size),
          payload_type(payload_type) {}
  };

  /**
   * @brief A generic tagged message that can be sent from work order to
   *        relational operator.
   **/
  class FeedbackMessage {
   public:
    /**
     * @brief Feedback message constructor.
     *
     * @param type Type of the message.
     * @param query_id The ID of the query.
     * @param rel_op_index Relational operator index.
     * @param payload Blob of payload.
     * @param payload_size Size of the payload blob.
     * @param ownership Whether to take ownership of the payload blob.
     */
    FeedbackMessage(const FeedbackMessageType type,
                    const std::size_t query_id,
                    const std::size_t rel_op_index,
                    void *payload,
                    const std::size_t payload_size,
                    const bool ownership = true)
        : header_(query_id, rel_op_index, payload_size, type),
          payload_(payload),
          ownership_(ownership) {}

    /**
     * @brief Deserializing feedback message constructor.
     *
     * @param serialized_bytes Serialized byte stream of feedback message.
     * @param num_bytes Number of bytes in stream.
     * @param copy_payload Copy payload and take ownership.
     */
    FeedbackMessage(void *serialized_bytes,
                    const std::size_t num_bytes,
                    const bool copy_payload = false)
        : header_(*static_cast<const FeedbackMessageHeader *>(serialized_bytes)),
          ownership_(copy_payload) {
      CHECK_EQ(num_bytes, sizeof(header_) + header_.payload_size);
      if (copy_payload) {
        payload_ = static_cast<char *>(std::malloc(header_.payload_size));
        std::memcpy(payload_,
                    static_cast<const char *>(serialized_bytes) + sizeof(header_),
                    header_.payload_size);
      } else {
        payload_ = static_cast<char *>(serialized_bytes) + sizeof(header_);
      }
    }

    /**
     * @brief Move constructor.
     *
     * @param orig Message to be moved.
     */
    FeedbackMessage(FeedbackMessage &&orig)
        : header_(orig.header_),
          payload_(orig.payload_),
          ownership_(orig.ownership_) {
      orig.header_.payload_size = 0;
      orig.payload_ = nullptr;
    }

    /**
     * @brief Move assignment.
     *
     * @param orig Message to be moved.
     */
    FeedbackMessage& operator=(FeedbackMessage &&orig) {
      if (this != &orig) {
        if (ownership_) {
          std::free(payload_);
        }

        header_ = orig.header_;
        payload_ = orig.payload_;
        ownership_ = orig.ownership_;
        orig.payload_ = nullptr;
        orig.header_.payload_size = 0;
      }
      return *this;
    }

    /**
     * @brief Destructor.
     */
    virtual ~FeedbackMessage() {
      if (ownership_) {
        std::free(payload_);
      }
    }

    /**
     * @brief Serialize the feedback message into a byte stream.
     *
     * @return A pair containing the message stream and the message size.
     *
     * @note The caller is responsible for freeing the byte stream.
     */
    std::pair<void*, std::size_t> serializeMessage() const {
      char *msg = static_cast<char *>(
          std::malloc(sizeof(header_) + header_.payload_size));
      CHECK(nullptr != msg) << "Unable to allocate byte stream.";
      std::memcpy(msg, &header_, sizeof(header_));
      std::memcpy(msg + sizeof(header_), payload_, header_.payload_size);
      return std::make_pair(msg, sizeof(header_) + header_.payload_size);
    }

    /**
     * @brief Message type accessor.
     */
    FeedbackMessageType type() const { return header_.payload_type; }

    /**
     * @brief Header accessor.
     */
    const FeedbackMessageHeader& header() const { return header_; }

    /**
     * @brief Payload accessor.
     */
    const void* payload() const { return payload_; }

    /**
     * @brief Payload size accessor.
     */
    std::size_t payload_size() const { return header_.payload_size; }

    /**
     * @brief Ownership.
     */
    bool ownership() const { return ownership_; }

    /**
     * @brief Drop ownership.
     *
     * @note The entity calling this should take responsibility of freeing the
     * payload.
     */
    void dropOwnership() {
      ownership_ = false;
    }

   private:
    FeedbackMessageHeader header_;
    void* payload_;
    bool ownership_;

    DISALLOW_COPY_AND_ASSIGN(FeedbackMessage);
  };

  /**
   * @brief Virtual destructor.
   **/
  virtual ~WorkOrder() {}

  /**
   * @brief Run this WorkOrder in the calling thread.
   **/
  virtual void execute() = 0;

  /**
   * @brief Get the preferred NUMA node(s) where this WorkOrder should be
   *        executed.
   *
   * @return A vector of preferred NUMA nodes. An empty vector indicates that
   *         the WorkOrder can be executed on any NUMA node.
   **/
  const std::vector<int>& getPreferredNUMANodes() const {
    return preferred_numa_nodes_;
  }

  /**
   * @brief Send message to relational operator.
   *
   * @param bus A pointer to the TMB.
   * @param sender_id The client ID of the sender.
   * @param receiver_id The client ID of the receiver.
   * @param feedback_msg Feedback message to be sent to relational operator.
   **/
  static void SendFeedbackMessage(tmb::MessageBus *bus,
                                  tmb::client_id sender_id,
                                  tmb::client_id receiver_id,
                                  const FeedbackMessage &feedback_msg) {
    std::pair<void *, std::size_t> stream = feedback_msg.serializeMessage();

    tmb::TaggedMessage msg;
    msg.acquire_message(stream.first, stream.second, kWorkOrderFeedbackMessage);

    tmb::Address receiver_address;
    receiver_address.AddRecipient(receiver_id);

    tmb::MessageStyle single_receiver_style;

    DCHECK(bus != nullptr);
    DLOG(INFO) << "WorkOrder sent WorkOrderFeedbackMessage to Scheduler with Client " << receiver_id;
    const tmb::MessageBus::SendStatus send_status =
        bus->Send(sender_id,
                  receiver_address,
                  single_receiver_style,
                  std::move(msg));
    CHECK(send_status == tmb::MessageBus::SendStatus::kOK);
  }

  /**
   * @brief Get the ID of the query which this WorkOder belongs to.
   **/
  inline std::size_t getQueryID() const {
    return query_id_;
  }

  /**
   * @brief Get the partition id.
   *
   * @return The partition id.
   */
  partition_id getPartitionId() const {
    return partition_id_;
  }

 protected:
  /**
   * @brief Constructor.
   *
   * @param query_id The ID of the query to which this WorkOrder belongs.
   * @param part_id The partition id.
   **/
  explicit WorkOrder(const std::size_t query_id,
                     const partition_id part_id = 0)
      : query_id_(query_id),
        partition_id_(part_id) {}

  const std::size_t query_id_;
  const partition_id partition_id_;

  // A vector of preferred NUMA node IDs where this workorder should be executed.
  // These node IDs typically indicate the NUMA node IDs of the input(s) of the
  // workorder. Derived classes should ensure that there are no duplicate entries
  // in this vector.
  std::vector<int> preferred_numa_nodes_;

 private:
  DISALLOW_COPY_AND_ASSIGN(WorkOrder);
};

/** @} */

}  // namespace quickstep

#endif  // QUICKSTEP_RELATIONAL_OPERATORS_WORK_UNIT_HPP_
