/**
 *   Copyright 2015 Pivotal Software, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 **/

#ifndef QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_
#define QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_

#include <algorithm>
#include <cstddef>
#include <iterator>
#include <utility>
#include <vector>

#include "query_execution/QueryExecutionTypedefs.hpp"
#include "utility/Macros.hpp"

namespace quickstep {

/** \addtogroup QueryExecution
 *  @{
 */
/**
 * @brief A class which keeps the metadata about the workers.
 *
 * @note The number of workorders per worker tracked by this class is the
 *       count as viewed by Foreman. An alternative implementation can use TMB
 *       to query the queue lengths of each worker.
 *
 * @note This class is intended to be used only by the Foreman thread.
 *       Therefore none of the methods in this class are thread-safe.
 **/
class WorkerDirectory {
 public:
  /**
   * @brief Constructor.
   *
   * @note The three size relationships below are only checked through
   *       DEBUG_ASSERT; callers are responsible for passing consistent
   *       arguments.
   *
   * @param num_workers The number of workers. Must be greater than zero.
   * @param client_ids A vector of TMB client IDs for the workers. Must have
   *        exactly num_workers entries.
   * @param numa_node_ids A vector of NUMA node IDs where the workers are
   *        pinned. If a worker is not pinned to any NUMA node, the NUMA node ID
   *        for that worker is -1. Must have exactly num_workers entries.
   **/
  WorkerDirectory(const std::size_t num_workers,
                  const std::vector<client_id> &client_ids,
                  const std::vector<int> &numa_node_ids)
    : num_workers_(num_workers),
      num_queued_workorders_(num_workers, 0),
      numa_node_ids_(numa_node_ids),
      client_ids_(client_ids) {
    DEBUG_ASSERT(num_workers > 0);
    DEBUG_ASSERT(client_ids.size() == num_workers);
    DEBUG_ASSERT(numa_node_ids.size() == num_workers);
  }

  /**
   * @brief Get the number of worker threads.
   *
   * @return The number of worker threads at the time the function is called.
   **/
  inline std::size_t getNumWorkers() const {
    return num_workers_;
  }

  /**
   * @brief Get the number of queued workorders for the given worker.
   *
   * @note The queued number of workorders consist of the workorders waiting for
   *       execution and the workorder being executed by the specified worker
   *       at the time this function is called.
   *
   * @param worker_id The logical ID of the given worker. Must be less than
   *        getNumWorkers().
   *
   * @return The number of queued workorders.
   **/
  inline std::size_t getNumQueuedWorkOrders(
      const std::size_t worker_id) const {
    DEBUG_ASSERT(worker_id < num_workers_);
    return num_queued_workorders_[worker_id];
  }

  /**
   * @brief Increment the number of queued workorders for the given worker by 1.
   *
   * @param worker_id The logical ID of the given worker. Must be less than
   *        getNumWorkers().
   **/
  inline void incrementNumQueuedWorkOrders(const std::size_t worker_id) {
    DEBUG_ASSERT(worker_id < num_workers_);
    ++num_queued_workorders_[worker_id];
  }

  /**
   * @brief Decrement the number of queued workorders for the given worker by 1.
   *
   * @note The counter is unsigned. Decrementing a worker whose count is
   *       already zero is a caller error that is only caught by DEBUG_ASSERT;
   *       otherwise the counter wraps around.
   *
   * @param worker_id The logical ID of the given worker. Must be less than
   *        getNumWorkers().
   **/
  inline void decrementNumQueuedWorkOrders(const std::size_t worker_id) {
    DEBUG_ASSERT(worker_id < num_workers_);
    DEBUG_ASSERT(num_queued_workorders_[worker_id] >= 1);
    --num_queued_workorders_[worker_id];
  }

  /**
   * @brief Get the NUMA node where the specified worker is pinned to.
   *
   * @param worker_id The logical ID of the given worker. Must be less than
   *        getNumWorkers().
   *
   * @return The NUMA node ID where the given worker is pinned. If the worker
   *         hasn't been pinned to any NUMA node, this value is -1.
   **/
  inline int getNUMANode(const std::size_t worker_id) const {
    DEBUG_ASSERT(worker_id < num_workers_);
    return numa_node_ids_[worker_id];
  }

  /**
   * @brief Get the TMB client ID of the specified worker.
   *
   * @param worker_id The logical ID of the given worker. Must be less than
   *        getNumWorkers().
   *
   * @return The TMB client ID of the given worker.
   **/
  inline client_id getClientID(const std::size_t worker_id) const {
    DEBUG_ASSERT(worker_id < num_workers_);
    return client_ids_[worker_id];
  }

  /**
   * @brief Generate address of a worker.
   *
   * @param worker_id The logical ID of the given worker. Must be less than
   *        getNumWorkers().
   *
   * @return TMB Address whose only recipient is the client ID of the given
   *         worker.
   **/
  inline Address getWorkerAddress(const std::size_t worker_id) const {
    DEBUG_ASSERT(worker_id < num_workers_);
    Address worker_address;
    worker_address.AddRecipient(client_ids_[worker_id]);
    return worker_address;
  }

  /**
   * @brief Add metadata about a new worker.
   *
   * The new worker's entries are appended to the end of each per-worker
   * vector and it starts with zero queued workorders.
   *
   * @param cid The TMB client ID of the new worker.
   * @param numa_node_id The NUMA node ID where the new worker is pinned.
   **/
  inline void addWorker(const client_id cid, const int numa_node_id) {
    ++num_workers_;
    num_queued_workorders_.push_back(0);
    numa_node_ids_.push_back(numa_node_id);
    client_ids_.push_back(cid);
  }

  /**
   * @brief Get information about the least loaded worker at the time the
   *        function is called.
   *
   * TODO(harshad) This method performs a linear search. Using a dynamic
   *               priority queue to track worker loads can help this method
   *               execute faster.
   * TODO(harshad) Extend this method to find least loaded workers on a given
   *               NUMA node.
   *
   * @note If there are multiple workers with the least amount of load, the
   *       worker with the smaller logical ID is returned.
   *
   * @return A pair: First element is the logical ID of the least loaded worker
   *         and second element is the number of queued workorders for this
   *         worker at the time the function is called.
   **/
  std::pair<std::size_t, std::size_t> getLeastLoadedWorker() const {
    // std::min_element yields the first of several equal minima, which gives
    // the "smaller logical ID wins" tie-break documented above.
    const std::vector<std::size_t>::const_iterator min_element_iter =
        std::min_element(num_queued_workorders_.begin(),
                         num_queued_workorders_.end());
    // An end iterator would mean the directory holds no workers at all.
    DEBUG_ASSERT(min_element_iter != num_queued_workorders_.end());
    // The iterator never precedes begin(), so the signed distance is
    // non-negative and the conversion to an index is lossless.
    const std::size_t least_loaded_worker_id = static_cast<std::size_t>(
        std::distance(num_queued_workorders_.begin(), min_element_iter));
    return std::make_pair(least_loaded_worker_id, *min_element_iter);
  }

  /**
   * @brief Get information about the most loaded worker at the time the
   *        function is called.
   *
   * TODO(harshad) This method performs a linear search. Using a dynamic
   *               priority queue to track worker loads can help this method
   *               execute faster.
   * TODO(harshad) Extend this method to find most loaded workers on a given
   *               NUMA node.
   *
   * @note If there are multiple workers with the maximum amount of load, the
   *       worker with the smaller logical ID is returned.
   *
   * @return A pair: First element is the logical ID of the most loaded worker
   *         and second element is the number of queued workorders for this
   *         worker at the time the function is called.
   **/
  std::pair<std::size_t, std::size_t> getMostLoadedWorker() const {
    // std::max_element yields the first of several equal maxima, which gives
    // the "smaller logical ID wins" tie-break documented above.
    const std::vector<std::size_t>::const_iterator max_element_iter =
        std::max_element(num_queued_workorders_.begin(),
                         num_queued_workorders_.end());
    // An end iterator would mean the directory holds no workers at all.
    DEBUG_ASSERT(max_element_iter != num_queued_workorders_.end());
    // The iterator never precedes begin(), so the signed distance is
    // non-negative and the conversion to an index is lossless.
    const std::size_t most_loaded_worker_id = static_cast<std::size_t>(
        std::distance(num_queued_workorders_.begin(), max_element_iter));
    return std::make_pair(most_loaded_worker_id, *max_element_iter);
  }

 private:
  // The number of workers tracked. Starts at the constructor argument and is
  // incremented by addWorker() alongside the three vectors below.
  std::size_t num_workers_;

  // The number of WorkOrders queued for execution per worker, indexed by the
  // worker's logical ID.
  std::vector<std::size_t> num_queued_workorders_;

  // The NUMA node IDs where the workers are pinned to, indexed by the worker's
  // logical ID. If a worker is not pinned, the ID is -1.
  std::vector<int> numa_node_ids_;

  // The TMB client IDs of the workers, indexed by the worker's logical ID.
  std::vector<client_id> client_ids_;

  DISALLOW_COPY_AND_ASSIGN(WorkerDirectory);
};

/** @} */

}  // namespace quickstep

#endif  // QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_
