/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 **/

#ifndef QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_
#define QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_

#include <cstddef>
#include <list>
#include <memory>
#include <queue>
#include <vector>

#include "query_execution/WorkOrderSelectionPolicy.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "utility/Macros.hpp"
#include "utility/PtrVector.hpp"

#include "glog/logging.h"

namespace quickstep {

/** \addtogroup QueryExecution
 *  @{
 */

 /**
  * @brief A container to hold the WorkOrders for a given query.
  * @note This container stays alive during the lifetime of the query.
  * @note The NUMA node indexing is assumed to start from 0.
  **/
class WorkOrdersContainer {
 public:
  /**
   * @brief Constructor
   *
   * @param num_operators Number of operators in the query DAG.
   * @param num_numa_nodes Number of NUMA nodes in the system.
   **/
  WorkOrdersContainer(const std::size_t num_operators,
                      const std::size_t num_numa_nodes)
      : num_operators_(num_operators), num_numa_nodes_(num_numa_nodes) {
    DEBUG_ASSERT(num_operators != 0);
    for (std::size_t op = 0; op < num_operators; ++op) {
      normal_workorders_.push_back(
          new OperatorWorkOrdersContainer(num_numa_nodes_));
      rebuild_workorders_.push_back(
          new OperatorWorkOrdersContainer(num_numa_nodes_));
    }
  }

  /**
   * @brief Destructor.
   *
   * @note If the query is executed normally, we should never encounter a
   *       situation where at the time of deletion the WorkOrdersContainer has
   *       pending WorkOrders.
   **/
  ~WorkOrdersContainer();

  /**
   * @brief Check if there are some pending WorkOrders for the given operator.
   *
   * @param operator_index Index of the operator.
   *
   * @return If there are pending WorkOrders.
   **/
  inline bool hasNormalWorkOrder(const std::size_t operator_index) const {
    DCHECK_LT(operator_index, num_operators_);
    return normal_workorders_[operator_index].hasWorkOrder();
  }

  /**
   * @brief Check if there are some pending normal (i.e. non-rebuild) WorkOrders
   *        for the given operator which prefer the specified NUMA node.
   *
   * @param operator_index Index of the operator.
   * @param numa_node_id The NUMA node.
   *
   * @return If there are pending WorkOrders for the given operator which prefer
   *         numa_node_id.
   **/
  inline bool hasNormalWorkOrderForNUMANode(
      const std::size_t operator_index, const int numa_node_id) const {
    DCHECK_LT(operator_index, num_operators_);
    DCHECK_GE(numa_node_id, 0);
    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
    return normal_workorders_[operator_index].hasWorkOrderForNUMANode(
        numa_node_id);
  }

  /**
   * @brief Check if there are some pending rebuild WorkOrders for the given
   *        operator.
   *
   * @param operator_index Index of the operator.
   *
   * @return If there are pending rebuild WorkOrders.
   **/
  inline bool hasRebuildWorkOrder(const std::size_t operator_index) const {
    DCHECK_LT(operator_index, num_operators_);
    return rebuild_workorders_[operator_index].hasWorkOrder();
  }

  /**
   * @brief Check if there are some pending rebuild WorkOrders for the given
   *        operator which prefer the specified NUMA node.
   *
   * @param operator_index Index of the operator.
   * @param numa_node_id The NUMA node.
   *
   * @return If there are pending rebuild WorkOrders for the given operator which
   *         prefer numa_node_id.
   **/
  inline bool hasRebuildWorkOrderForNUMANode(
      const std::size_t operator_index, const int numa_node_id) const {
    DCHECK_LT(operator_index, num_operators_);
    DCHECK_GE(numa_node_id, 0);
    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
    return rebuild_workorders_[operator_index].hasWorkOrderForNUMANode(
        numa_node_id);
  }

  /**
   * @brief Get a normal (non-rebuild) WorkOrder for a given operator which
   *        prefer the given NUMA node.
   *
   * @param operator_index The index of the operator.
   * @param numa_node_id The NUMA node.
   *
   * @return A WorkOrder which prefers numa_node_id. If no such WorkOrder is
   *         available, return nullptr. The caller is responsible for taking the
   *         ownership.
   **/
  WorkOrder* getNormalWorkOrderForNUMANode(const std::size_t operator_index,
                                           const int numa_node_id) {
    DCHECK_LT(operator_index, num_operators_);
    DCHECK_GE(numa_node_id, 0);
    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
    return normal_workorders_[operator_index].getWorkOrderForNUMANode(
        numa_node_id);
  }

  /**
   * @brief Get a normal (non-rebuild) WorkOrder for a given operator.
   *
   * @param operator_index The index of the operator.
   * @param prefer_single_NUMA_node If true, first try to get workorders which
   *        prefer exactly one NUMA node over workorders which list more than
   *        one NUMA node as their preference.
   *
   * @return A WorkOrder. If no WorkOrder is available, returns nullptr. The
   *         caller is responsible for taking the ownership.
   **/
  WorkOrder* getNormalWorkOrder(const std::size_t operator_index,
                                const bool prefer_single_NUMA_node = true) {
    DCHECK_LT(operator_index, num_operators_);
    return normal_workorders_[operator_index].getWorkOrder(
        prefer_single_NUMA_node);
  }

  WorkOrder* getNextWorkOrder(std::size_t *operator_index, bool *is_rebuild) {
    if (rebuild_work_orders_policy_.hasWorkOrder()) {
      *operator_index = rebuild_work_orders_policy_.getOperatorIndexForNextWorkOrder();
      *is_rebuild = true;
      return rebuild_workorders_[*operator_index].getWorkOrder();
    }

    if (normal_work_orders_policy_.hasWorkOrder()) {
      *operator_index = normal_work_orders_policy_.getOperatorIndexForNextWorkOrder();
      *is_rebuild = false;
      return normal_workorders_[*operator_index].getWorkOrder();
    }

    return nullptr;
  }

  /**
   * @brief Get a rebuild WorkOrder for a given operator whch prefer the
   *        specified NUMA node.
   *
   * @param operator_index The index of the operator.
   * @param numa_node_id The NUMA node.
   *
   * @return A WorkOrder that prefers numa_node_id. If no such WorkOrder is
   *         available, return nullptr. The caller is responsible for taking the
   *         ownership.
   **/
  WorkOrder* getRebuildWorkOrderForNUMANode(const std::size_t operator_index,
                                            const int numa_node_id) {
    DCHECK_LT(operator_index, num_operators_);
    DCHECK_GE(numa_node_id, 0);
    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
    return rebuild_workorders_[operator_index].getWorkOrderForNUMANode(
        numa_node_id);
  }

  /**
   * @brief Get a rebuild WorkOrder for the given operator
   *
   * @param operator_index The index of the operator.
   * @param prefer_single_NUMA_node If true, first try to get workorders which
   *        prefer exactly one NUMA node over workorders which list more than
   *        one NUMA node as their preference.
   *
   * @return A WorkOrder. If no WorkOrder is available, returns nullptr. The
   *         caller is responsible for taking the ownership.
   **/
  WorkOrder* getRebuildWorkOrder(const std::size_t operator_index,
                                 const bool prefer_single_NUMA_node = true) {
    DCHECK_LT(operator_index, num_operators_);
    return rebuild_workorders_[operator_index].getWorkOrder(
        prefer_single_NUMA_node);
  }

  /**
   * @brief Add a normal (non-rebuild) WorkOrder generated from a given
   *        operator.
   *
   * @note Take the ownership of \p workorder.
   * @note The workorder to be added contains information about its preferred
   *       NUMA nodes. This information is used to insert the WorkOrder
   *       appropriately.
   *
   * @param workorder A pointer to the WorkOrder to be added.
   * @param operator_index The index of the operator in the query DAG.
   **/
  void addNormalWorkOrder(WorkOrder *workorder, const std::size_t operator_index) {
    DCHECK(workorder != nullptr);
    DCHECK_LT(operator_index, num_operators_);
    normal_workorders_[operator_index].addWorkOrder(workorder);
    normal_work_orders_policy_.addWorkOrder(operator_index);
  }

  /**
   * @brief Add a rebuild WorkOrder generated from a given operator.
   *
   * @note Take the ownership of \p workorder.
   * @note The workorder to be added contains information about its preferred
   *       NUMA nodes. This information is used to insert the WorkOrder
   *       appropriately.
   *
   * @param workorder A pointer to the WorkOrder to be added.
   * @param operator_index The index of the operator in the query DAG.
   **/
  void addRebuildWorkOrder(WorkOrder *workorder,
                           const std::size_t operator_index) {
    DCHECK(workorder != nullptr);
    DCHECK_LT(operator_index, num_operators_);
    rebuild_workorders_[operator_index].addWorkOrder(workorder);
    rebuild_work_orders_policy_.addWorkOrder(operator_index);
  }

  /**
   * @brief Get the number of pending normal WorkOrders for a given operator
   *        which prefer the specified NUMA node.
   *
   * @param operator_index The index of the operator.
   * @param numa_node_id The NUMA node.
   *
   * @return The number of pending WorkOrders which prefer numa_node_id.
   **/
  inline std::size_t getNumNormalWorkOrdersForNUMANode(
      const std::size_t operator_index, const int numa_node_id) const {
    DCHECK_LT(operator_index, num_operators_);
    DCHECK_GE(numa_node_id, 0);
    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
    return normal_workorders_[operator_index].getNumWorkOrdersForNUMANode(
        numa_node_id);
  }

  /**
   * @brief Get the number of all pending normal (i.e. non-rebuild) WorkOrders
   *        for a given operator.
   *
   * @param operator_index The index of the operator.
   *
   * @return The number of pending normal WorkOrders.
   **/
  inline std::size_t getNumNormalWorkOrders(
      const std::size_t operator_index) const {
    DCHECK_LT(operator_index, num_operators_);
    return normal_workorders_[operator_index].getNumWorkOrders();
  }

  /**
   * @brief Get the number of pending rebuild WorkOrders for a given operator
   *        which prefer the specified NUMA node.
   *
   * @param operator_index The index of the operator.
   * @param numa_node_id The NUMA node.
   *
   * @return The number of pending WorkOrders which prefer numa_node_id.
   **/
  inline std::size_t getNumRebuildWorkOrdersForNUMANode(
      const std::size_t operator_index, const int numa_node_id) const {
    DCHECK_LT(operator_index, num_operators_);
    DCHECK_GE(numa_node_id, 0);
    DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
    return rebuild_workorders_[operator_index].getNumWorkOrdersForNUMANode(
        numa_node_id);
  }

  /**
   * @brief Get the number of all pending rebuild WorkOrders for a given
   *        operator.
   *
   * @param operator_index The index of the operator.
   *
   * @return The number of pending rebuild WorkOrders.
   **/
  inline std::size_t getNumRebuildWorkOrders(
      const std::size_t operator_index) const {
    DCHECK_LT(operator_index, num_operators_);
    return rebuild_workorders_[operator_index].getNumWorkOrders();
  }

  /**
   * @brief Get the total number of work orders for the given operator.
   *
   * @param operator_index The index of the operator.
   *
   * @return The total number of WorkOrders (normal + rebuild).
   **/
  inline std::size_t getNumTotalWorkOrders(
      const std::size_t operator_index) const {
    return getNumNormalWorkOrders(operator_index) +
           getNumRebuildWorkOrders(operator_index);
  }

 private:
  /**
   * @brief An internal queue-based container structure to hold the WorkOrders.
   **/
  class InternalQueueContainer {
   public:
    InternalQueueContainer() {
    }

    inline void addWorkOrder(WorkOrder *workorder) {
      workorders_.emplace(std::unique_ptr<WorkOrder>(workorder));
    }

    inline WorkOrder* getWorkOrder() {
      if (workorders_.empty()) {
        return nullptr;
      }

      WorkOrder *work_order = workorders_.front().release();
      workorders_.pop();
      return work_order;
    }

    inline bool hasWorkOrder() const {
      return !workorders_.empty();
    }

    inline std::size_t getNumWorkOrders() const {
      return workorders_.size();
    }

   private:
    std::queue<std::unique_ptr<WorkOrder>> workorders_;

    DISALLOW_COPY_AND_ASSIGN(InternalQueueContainer);
  };

  /**
   * @brief An internal list-based container structure to hold the WorkOrders.
   *
   * @note We use this class for WorkOrders whose inputs come from multiple NUMA
   *       nodes e.g. a HashJoinWorkOrder whose probe block and the hash table
   *       may reside on two different NUMA nodes.
   **/
  class InternalListContainer {
   public:
    InternalListContainer() {
    }

    inline void addWorkOrder(WorkOrder *workorder) {
      workorders_.emplace_back(std::unique_ptr<WorkOrder>(workorder));
    }

    inline WorkOrder* getWorkOrder() {
      if (workorders_.empty()) {
        return nullptr;
      }

      WorkOrder *work_order = workorders_.front().release();
      workorders_.pop_front();
      return work_order;
    }

    /**
     * @note This method has O(N) complexity.
     **/
    WorkOrder* getWorkOrderForNUMANode(const int numa_node);

    inline bool hasWorkOrder() const {
      return !workorders_.empty();
    }

    /**
     * @brief Check if numa_node is in the set of preferred NUMA nodes for at
     *        least one WorkOrder in this container.
     *
     * @note This method has O(N) complexity.
     **/
    bool hasWorkOrderForNUMANode(const int numa_node) const;

    inline std::size_t getNumWorkOrders() const {
      return workorders_.size();
    }

    /**
     * @brief Return the number of WorkOrders that list the numa_node as one of
     *        the preferred nodes of execution.
     **/
    std::size_t getNumWorkOrdersForNUMANode(const int numa_node) const;

   private:
    std::list<std::unique_ptr<WorkOrder>> workorders_;

    DISALLOW_COPY_AND_ASSIGN(InternalListContainer);
  };

  /**
   * @brief A container to hold all the WorkOrders generated by one operator.
   **/
  class OperatorWorkOrdersContainer {
   public:
    explicit OperatorWorkOrdersContainer(const std::size_t num_numa_nodes)
        : num_numa_nodes_(num_numa_nodes) {
      for (std::size_t numa_node = 0; numa_node < num_numa_nodes; ++numa_node) {
        single_numa_node_workorders_.push_back(new InternalQueueContainer());
      }
    }

    void addWorkOrder(WorkOrder *workorder);

    bool hasWorkOrderForNUMANode(const int numa_node_id) const {
      DCHECK_GE(numa_node_id, 0);
      DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
      return single_numa_node_workorders_[numa_node_id].hasWorkOrder() ||
             multiple_numa_nodes_workorders_.hasWorkOrderForNUMANode(
                 numa_node_id);
    }

    bool hasWorkOrder() const {
      if (!(numa_agnostic_workorders_.hasWorkOrder() ||
            multiple_numa_nodes_workorders_.hasWorkOrder())) {
        for (std::size_t i = 0; i < num_numa_nodes_; ++i) {
          if (hasWorkOrderForNUMANode(i)) {
            return true;
          }
        }
        return false;
      }
      return true;
    }

    std::size_t getNumWorkOrdersForNUMANode(
        const int numa_node_id) const {
      DCHECK_GE(numa_node_id, 0);
      DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
      return single_numa_node_workorders_[numa_node_id].getNumWorkOrders() +
             multiple_numa_nodes_workorders_.getNumWorkOrdersForNUMANode(
                 numa_node_id);
    }

    inline std::size_t getNumWorkOrders() const {
      std::size_t num_workorders = numa_agnostic_workorders_.getNumWorkOrders();
      // for each NUMA node involved ..
      for (PtrVector<InternalQueueContainer>::const_iterator it =
               single_numa_node_workorders_.begin();
           it != single_numa_node_workorders_.end();
           ++it) {
        // Add the number of workorders for the NUMA node.
        num_workorders += it->getNumWorkOrders();
      }
      // Add the number of workorders who have multiple NUMA nodes as input.
      num_workorders += multiple_numa_nodes_workorders_.getNumWorkOrders();
      return num_workorders;
    }

    WorkOrder* getWorkOrderForNUMANode(const int numa_node_id) {
      DCHECK_GE(numa_node_id, 0);
      DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
      WorkOrder *work_order = single_numa_node_workorders_[numa_node_id].getWorkOrder();
      if (work_order == nullptr) {
        work_order = multiple_numa_nodes_workorders_.getWorkOrderForNUMANode(
            numa_node_id);
      }
      return work_order;
    }

    WorkOrder* getWorkOrder(const bool prefer_single_NUMA_node = true);

   private:
    WorkOrder* getSingleNUMANodeWorkOrderHelper();

    const std::size_t num_numa_nodes_;

    // A container to store NUMA agnostic workorders.
    InternalQueueContainer numa_agnostic_workorders_;

    // A vector of containers to store workorders which prefer exactly one NUMA
    // node for execution.
    PtrVector<InternalQueueContainer> single_numa_node_workorders_;

    // A container to store workorders which prefer more than one NUMA node for
    // execution.
    InternalListContainer multiple_numa_nodes_workorders_;
    // Note that no workorder should be shared among the *_workorders_ structures.

    DISALLOW_COPY_AND_ASSIGN(OperatorWorkOrdersContainer);
  };

  const std::size_t num_operators_;
  const std::size_t num_numa_nodes_;

  PtrVector<OperatorWorkOrdersContainer> normal_workorders_;
  PtrVector<OperatorWorkOrdersContainer> rebuild_workorders_;

  LifoWorkOrderSelectionPolicy normal_work_orders_policy_;
  FifoWorkOrderSelectionPolicy rebuild_work_orders_policy_;

  DISALLOW_COPY_AND_ASSIGN(WorkOrdersContainer);
};

/** @} */

}  // namespace quickstep


#endif  // QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_
