blob: be5cfc1bc0ed290b0c1c24cf15731a9d3359617c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_
#define QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <utility>
#include <vector>
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
/** \addtogroup QueryExecution
* @{
*/
/**
* @brief A class which keeps the metadata about the workers.
*
* @note The number of workorders per worker used in this class are as viewed by
* Foreman. An alternative implementation can use TMB to query the queue
* lengths of each worker.
*
* @note This class is intended to be used only by Foreman thread. Therefore
* none of the methods in this class are thread-safe.
**/
class WorkerDirectory {
public:
/**
* @brief Constructor.
*
* @param num_workers The number of workers.
* @param client_ids A vector of TMB client IDs for the workers.
* @param numa_node_ids A vector of NUMA node IDs where the workers are
* pinned. If a worker is not pinned to any NUMA node, the NUMA node ID
* for that worker is -1.
**/
WorkerDirectory(const std::size_t num_workers,
const std::vector<client_id> &client_ids,
const std::vector<int> &numa_node_ids)
: num_workers_(num_workers),
num_queued_workorders_(num_workers, 0),
numa_node_ids_(numa_node_ids),
client_ids_(client_ids) {
DEBUG_ASSERT(num_workers > 0);
DEBUG_ASSERT(client_ids.size() == num_workers);
DEBUG_ASSERT(numa_node_ids.size() == num_workers);
}
/**
* @brief Get the number of worker threads.
*
* @return The number of worker threads at the time the function is called.
**/
inline const std::size_t getNumWorkers() const {
return num_workers_;
}
/**
* @brief Get the number of queued workorders for the given worker.
*
* @note The queued number of workorders consist of the workorders waiting for
* execution and the workorder being executed by the specified worker
* at the time this function is called.
*
* @param worker_thread_index The logical ID of the given worker.
*
* @return The number of queued workorders.
**/
inline const std::size_t getNumQueuedWorkOrders(
const std::size_t worker_thread_index) const {
DEBUG_ASSERT(worker_thread_index < num_workers_);
return num_queued_workorders_[worker_thread_index];
}
/**
* @brief Increment the number of queued workorders for the given worker by 1.
*
* @param worker_thread_index The logical ID of the given worker.
**/
inline void incrementNumQueuedWorkOrders(const std::size_t worker_thread_index) {
DEBUG_ASSERT(worker_thread_index < num_workers_);
++num_queued_workorders_[worker_thread_index];
}
/**
* @brief Decrement the number of queued workorders for the given worker by 1.
*
* @param worker_thread_index The logical ID of the given worker.
**/
inline void decrementNumQueuedWorkOrders(const std::size_t worker_thread_index) {
DEBUG_ASSERT(worker_thread_index < num_workers_);
DEBUG_ASSERT(num_queued_workorders_[worker_thread_index] >= 1);
--num_queued_workorders_[worker_thread_index];
}
/**
* @brief Get the NUMA node where the specified worker is pinned to.
*
* @param worker_thread_index The logical ID of the given worker.
*
* @return The NUMA node ID where the given worker is pinned. If the worker
* hasn't been pinned to any NUMA node, this value is -1.
**/
inline int getNUMANode(const std::size_t worker_thread_index) const {
DEBUG_ASSERT(worker_thread_index < num_workers_);
return numa_node_ids_[worker_thread_index];
}
/**
* @brief Get the TMB client ID of the specified worker.
*
* @param worker_thread_index The logical ID of the given worker.
*
* @return The TMB client ID of the given worker.
**/
inline const client_id getClientID(const std::size_t worker_thread_index) const {
DEBUG_ASSERT(worker_thread_index < num_workers_);
return client_ids_[worker_thread_index];
}
/**
* @brief Generate address of a worker.
*
* @param worker_thread_index The logical ID of the given worker.
*
* @return TMB Address of the given worker.
**/
inline Address getWorkerAddress(std::size_t worker_thread_index) const {
DEBUG_ASSERT(worker_thread_index < num_workers_);
Address worker_address;
worker_address.AddRecipient(client_ids_[worker_thread_index]);
return worker_address;
}
/**
* @brief Add metadata about a new worker.
*
* @param cid The TMB client ID of the new worker.
* @param numa_node_id The NUMA node ID where the new worker is pinned.
**/
inline void addWorker(const client_id cid, const int numa_node_id) {
++num_workers_;
num_queued_workorders_.push_back(0);
numa_node_ids_.push_back(numa_node_id);
client_ids_.push_back(cid);
}
/**
* @brief Get information about the least loaded worker at the time the
* function is called.
*
* TODO(harshad) This method performs a linear search. Using a dynamic
* priority queue to track worker loads can help this method
* execute faster.
* TODO(harshad) Extend this method to find least loaded workers on a given
* NUMA node.
*
* @note If there are multiple workers with the least amount of load, the
* worker with the smaller logical ID is returned.
*
* @return A pair: First element is the logical ID of the least loaded worker
* and second element is the number of queued workorders for this
* worker at the time the function is called.
**/
std::pair<std::size_t, std::size_t> getLeastLoadedWorker() const {
std::vector<std::size_t>::const_iterator min_element_iter =
std::min_element(std::begin(num_queued_workorders_),
std::end(num_queued_workorders_));
DEBUG_ASSERT(min_element_iter != num_queued_workorders_.end());
const std::size_t least_loaded_worker_thread_index =
std::distance(num_queued_workorders_.begin(), min_element_iter);
return std::make_pair(least_loaded_worker_thread_index, *min_element_iter);
}
/**
* @brief Get information about the most loaded worker at the time the
* function is called.
*
* TODO(harshad) This method performs a linear search. Using a dynamic
* priority queue to track worker loads can help this method
* execute faster.
* TODO(harshad) Extend this method to find most loaded workers on a given
* NUMA node.
*
* @note If there are multiple workers with the maximum amount of load, the
* worker with the smaller logical ID is returned.
*
* @return A pair: First element is the logical ID of the most loaded worker
* and second element is the number of queued workorders for this
* worker at the time the function is called.
**/
std::pair<std::size_t, std::size_t> getMostLoadedWorker() const {
std::vector<std::size_t>::const_iterator max_element_iter =
std::max_element(std::begin(num_queued_workorders_),
std::end(num_queued_workorders_));
DEBUG_ASSERT(max_element_iter != num_queued_workorders_.end());
const std::size_t most_loaded_worker_thread_index =
std::distance(num_queued_workorders_.begin(), max_element_iter);
return std::make_pair(most_loaded_worker_thread_index, *max_element_iter);
}
private:
std::size_t num_workers_;
// The number of WorkOrders queued for execution per worker.
std::vector<std::size_t> num_queued_workorders_;
// The NUMA node IDs where the workers are pinned to. If a worker is not
// pinned, the ID is -1.
std::vector<int> numa_node_ids_;
// The vector of client IDs
std::vector<client_id> client_ids_;
DISALLOW_COPY_AND_ASSIGN(WorkerDirectory);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_QUERY_EXECUTION_WORKER_DIRECTORY_HPP_