blob: e8d5ff8d5cb2d75f0d0cc43287071bfae14479f7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_
#define QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_
#include <cstddef>
#include <list>
#include <memory>
#include <queue>
#include <vector>
#include "query_execution/WorkOrderSelectionPolicy.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "utility/Macros.hpp"
#include "utility/PtrVector.hpp"
#include "glog/logging.h"
namespace quickstep {
/** \addtogroup QueryExecution
* @{
*/
/**
* @brief A container to hold the WorkOrders for a given query.
* @note This container stays alive during the lifetime of the query.
* @note The NUMA node indexing is assumed to start from 0.
**/
class WorkOrdersContainer {
public:
/**
* @brief Constructor
*
* @param num_operators Number of operators in the query DAG.
* @param num_numa_nodes Number of NUMA nodes in the system.
**/
WorkOrdersContainer(const std::size_t num_operators,
const std::size_t num_numa_nodes)
: num_operators_(num_operators), num_numa_nodes_(num_numa_nodes) {
DEBUG_ASSERT(num_operators != 0);
for (std::size_t op = 0; op < num_operators; ++op) {
normal_workorders_.push_back(
new OperatorWorkOrdersContainer(num_numa_nodes_));
rebuild_workorders_.push_back(
new OperatorWorkOrdersContainer(num_numa_nodes_));
}
}
/**
* @brief Destructor.
*
* @note If the query is executed normally, we should never encounter a
* situation where at the time of deletion the WorkOrdersContainer has
* pending WorkOrders.
**/
~WorkOrdersContainer();
/**
* @brief Check if there are some pending WorkOrders for the given operator.
*
* @param operator_index Index of the operator.
*
* @return If there are pending WorkOrders.
**/
inline bool hasNormalWorkOrder(const std::size_t operator_index) const {
DCHECK_LT(operator_index, num_operators_);
return normal_workorders_[operator_index].hasWorkOrder();
}
/**
* @brief Check if there are some pending normal (i.e. non-rebuild) WorkOrders
* for the given operator which prefer the specified NUMA node.
*
* @param operator_index Index of the operator.
* @param numa_node_id The NUMA node.
*
* @return If there are pending WorkOrders for the given operator which prefer
* numa_node_id.
**/
inline bool hasNormalWorkOrderForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
DCHECK_LT(operator_index, num_operators_);
DCHECK_GE(numa_node_id, 0);
DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return normal_workorders_[operator_index].hasWorkOrderForNUMANode(
numa_node_id);
}
/**
* @brief Check if there are some pending rebuild WorkOrders for the given
* operator.
*
* @param operator_index Index of the operator.
*
* @return If there are pending rebuild WorkOrders.
**/
inline bool hasRebuildWorkOrder(const std::size_t operator_index) const {
DCHECK_LT(operator_index, num_operators_);
return rebuild_workorders_[operator_index].hasWorkOrder();
}
/**
* @brief Check if there are some pending rebuild WorkOrders for the given
* operator which prefer the specified NUMA node.
*
* @param operator_index Index of the operator.
* @param numa_node_id The NUMA node.
*
* @return If there are pending rebuild WorkOrders for the given operator which
* prefer numa_node_id.
**/
inline bool hasRebuildWorkOrderForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
DCHECK_LT(operator_index, num_operators_);
DCHECK_GE(numa_node_id, 0);
DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return rebuild_workorders_[operator_index].hasWorkOrderForNUMANode(
numa_node_id);
}
/**
* @brief Get a normal (non-rebuild) WorkOrder for a given operator which
* prefer the given NUMA node.
*
* @param operator_index The index of the operator.
* @param numa_node_id The NUMA node.
*
* @return A WorkOrder which prefers numa_node_id. If no such WorkOrder is
* available, return nullptr. The caller is responsible for taking the
* ownership.
**/
WorkOrder* getNormalWorkOrderForNUMANode(const std::size_t operator_index,
const int numa_node_id) {
DCHECK_LT(operator_index, num_operators_);
DCHECK_GE(numa_node_id, 0);
DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return normal_workorders_[operator_index].getWorkOrderForNUMANode(
numa_node_id);
}
/**
* @brief Get a normal (non-rebuild) WorkOrder for a given operator.
*
* @param operator_index The index of the operator.
* @param prefer_single_NUMA_node If true, first try to get workorders which
* prefer exactly one NUMA node over workorders which list more than
* one NUMA node as their preference.
*
* @return A WorkOrder. If no WorkOrder is available, returns nullptr. The
* caller is responsible for taking the ownership.
**/
WorkOrder* getNormalWorkOrder(const std::size_t operator_index,
const bool prefer_single_NUMA_node = true) {
DCHECK_LT(operator_index, num_operators_);
return normal_workorders_[operator_index].getWorkOrder(
prefer_single_NUMA_node);
}
WorkOrder* getNextWorkOrder(std::size_t *operator_index, bool *is_rebuild) {
if (rebuild_work_orders_policy_.hasWorkOrder()) {
*operator_index = rebuild_work_orders_policy_.getOperatorIndexForNextWorkOrder();
*is_rebuild = true;
return rebuild_workorders_[*operator_index].getWorkOrder();
}
if (normal_work_orders_policy_.hasWorkOrder()) {
*operator_index = normal_work_orders_policy_.getOperatorIndexForNextWorkOrder();
*is_rebuild = false;
return normal_workorders_[*operator_index].getWorkOrder();
}
return nullptr;
}
/**
* @brief Get a rebuild WorkOrder for a given operator whch prefer the
* specified NUMA node.
*
* @param operator_index The index of the operator.
* @param numa_node_id The NUMA node.
*
* @return A WorkOrder that prefers numa_node_id. If no such WorkOrder is
* available, return nullptr. The caller is responsible for taking the
* ownership.
**/
WorkOrder* getRebuildWorkOrderForNUMANode(const std::size_t operator_index,
const int numa_node_id) {
DCHECK_LT(operator_index, num_operators_);
DCHECK_GE(numa_node_id, 0);
DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return rebuild_workorders_[operator_index].getWorkOrderForNUMANode(
numa_node_id);
}
/**
* @brief Get a rebuild WorkOrder for the given operator
*
* @param operator_index The index of the operator.
* @param prefer_single_NUMA_node If true, first try to get workorders which
* prefer exactly one NUMA node over workorders which list more than
* one NUMA node as their preference.
*
* @return A WorkOrder. If no WorkOrder is available, returns nullptr. The
* caller is responsible for taking the ownership.
**/
WorkOrder* getRebuildWorkOrder(const std::size_t operator_index,
const bool prefer_single_NUMA_node = true) {
DCHECK_LT(operator_index, num_operators_);
return rebuild_workorders_[operator_index].getWorkOrder(
prefer_single_NUMA_node);
}
/**
* @brief Add a normal (non-rebuild) WorkOrder generated from a given
* operator.
*
* @note Take the ownership of \p workorder.
* @note The workorder to be added contains information about its preferred
* NUMA nodes. This information is used to insert the WorkOrder
* appropriately.
*
* @param workorder A pointer to the WorkOrder to be added.
* @param operator_index The index of the operator in the query DAG.
**/
void addNormalWorkOrder(WorkOrder *workorder, const std::size_t operator_index) {
DCHECK(workorder != nullptr);
DCHECK_LT(operator_index, num_operators_);
normal_workorders_[operator_index].addWorkOrder(workorder);
normal_work_orders_policy_.addWorkOrder(operator_index);
}
/**
* @brief Add a rebuild WorkOrder generated from a given operator.
*
* @note Take the ownership of \p workorder.
* @note The workorder to be added contains information about its preferred
* NUMA nodes. This information is used to insert the WorkOrder
* appropriately.
*
* @param workorder A pointer to the WorkOrder to be added.
* @param operator_index The index of the operator in the query DAG.
**/
void addRebuildWorkOrder(WorkOrder *workorder,
const std::size_t operator_index) {
DCHECK(workorder != nullptr);
DCHECK_LT(operator_index, num_operators_);
rebuild_workorders_[operator_index].addWorkOrder(workorder);
rebuild_work_orders_policy_.addWorkOrder(operator_index);
}
/**
* @brief Get the number of pending normal WorkOrders for a given operator
* which prefer the specified NUMA node.
*
* @param operator_index The index of the operator.
* @param numa_node_id The NUMA node.
*
* @return The number of pending WorkOrders which prefer numa_node_id.
**/
inline std::size_t getNumNormalWorkOrdersForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
DCHECK_LT(operator_index, num_operators_);
DCHECK_GE(numa_node_id, 0);
DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return normal_workorders_[operator_index].getNumWorkOrdersForNUMANode(
numa_node_id);
}
/**
* @brief Get the number of all pending normal (i.e. non-rebuild) WorkOrders
* for a given operator.
*
* @param operator_index The index of the operator.
*
* @return The number of pending normal WorkOrders.
**/
inline std::size_t getNumNormalWorkOrders(
const std::size_t operator_index) const {
DCHECK_LT(operator_index, num_operators_);
return normal_workorders_[operator_index].getNumWorkOrders();
}
/**
* @brief Get the number of pending rebuild WorkOrders for a given operator
* which prefer the specified NUMA node.
*
* @param operator_index The index of the operator.
* @param numa_node_id The NUMA node.
*
* @return The number of pending WorkOrders which prefer numa_node_id.
**/
inline std::size_t getNumRebuildWorkOrdersForNUMANode(
const std::size_t operator_index, const int numa_node_id) const {
DCHECK_LT(operator_index, num_operators_);
DCHECK_GE(numa_node_id, 0);
DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return rebuild_workorders_[operator_index].getNumWorkOrdersForNUMANode(
numa_node_id);
}
/**
* @brief Get the number of all pending rebuild WorkOrders for a given
* operator.
*
* @param operator_index The index of the operator.
*
* @return The number of pending rebuild WorkOrders.
**/
inline std::size_t getNumRebuildWorkOrders(
const std::size_t operator_index) const {
DCHECK_LT(operator_index, num_operators_);
return rebuild_workorders_[operator_index].getNumWorkOrders();
}
/**
* @brief Get the total number of work orders for the given operator.
*
* @param operator_index The index of the operator.
*
* @return The total number of WorkOrders (normal + rebuild).
**/
inline std::size_t getNumTotalWorkOrders(
const std::size_t operator_index) const {
return getNumNormalWorkOrders(operator_index) +
getNumRebuildWorkOrders(operator_index);
}
private:
/**
* @brief An internal queue-based container structure to hold the WorkOrders.
**/
class InternalQueueContainer {
public:
InternalQueueContainer() {
}
inline void addWorkOrder(WorkOrder *workorder) {
workorders_.emplace(std::unique_ptr<WorkOrder>(workorder));
}
inline WorkOrder* getWorkOrder() {
if (workorders_.empty()) {
return nullptr;
}
WorkOrder *work_order = workorders_.front().release();
workorders_.pop();
return work_order;
}
inline bool hasWorkOrder() const {
return !workorders_.empty();
}
inline std::size_t getNumWorkOrders() const {
return workorders_.size();
}
private:
std::queue<std::unique_ptr<WorkOrder>> workorders_;
DISALLOW_COPY_AND_ASSIGN(InternalQueueContainer);
};
/**
* @brief An internal list-based container structure to hold the WorkOrders.
*
* @note We use this class for WorkOrders whose inputs come from multiple NUMA
* nodes e.g. a HashJoinWorkOrder whose probe block and the hash table
* may reside on two different NUMA nodes.
**/
class InternalListContainer {
public:
InternalListContainer() {
}
inline void addWorkOrder(WorkOrder *workorder) {
workorders_.emplace_back(std::unique_ptr<WorkOrder>(workorder));
}
inline WorkOrder* getWorkOrder() {
if (workorders_.empty()) {
return nullptr;
}
WorkOrder *work_order = workorders_.front().release();
workorders_.pop_front();
return work_order;
}
/**
* @note This method has O(N) complexity.
**/
WorkOrder* getWorkOrderForNUMANode(const int numa_node);
inline bool hasWorkOrder() const {
return !workorders_.empty();
}
/**
* @brief Check if numa_node is in the set of preferred NUMA nodes for at
* least one WorkOrder in this container.
*
* @note This method has O(N) complexity.
**/
bool hasWorkOrderForNUMANode(const int numa_node) const;
inline std::size_t getNumWorkOrders() const {
return workorders_.size();
}
/**
* @brief Return the number of WorkOrders that list the numa_node as one of
* the preferred nodes of execution.
**/
std::size_t getNumWorkOrdersForNUMANode(const int numa_node) const;
private:
std::list<std::unique_ptr<WorkOrder>> workorders_;
DISALLOW_COPY_AND_ASSIGN(InternalListContainer);
};
/**
* @brief A container to hold all the WorkOrders generated by one operator.
**/
class OperatorWorkOrdersContainer {
public:
explicit OperatorWorkOrdersContainer(const std::size_t num_numa_nodes)
: num_numa_nodes_(num_numa_nodes) {
for (std::size_t numa_node = 0; numa_node < num_numa_nodes; ++numa_node) {
single_numa_node_workorders_.push_back(new InternalQueueContainer());
}
}
void addWorkOrder(WorkOrder *workorder);
bool hasWorkOrderForNUMANode(const int numa_node_id) const {
DCHECK_GE(numa_node_id, 0);
DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return single_numa_node_workorders_[numa_node_id].hasWorkOrder() ||
multiple_numa_nodes_workorders_.hasWorkOrderForNUMANode(
numa_node_id);
}
bool hasWorkOrder() const {
if (!(numa_agnostic_workorders_.hasWorkOrder() ||
multiple_numa_nodes_workorders_.hasWorkOrder())) {
for (std::size_t i = 0; i < num_numa_nodes_; ++i) {
if (hasWorkOrderForNUMANode(i)) {
return true;
}
}
return false;
}
return true;
}
std::size_t getNumWorkOrdersForNUMANode(
const int numa_node_id) const {
DCHECK_GE(numa_node_id, 0);
DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
return single_numa_node_workorders_[numa_node_id].getNumWorkOrders() +
multiple_numa_nodes_workorders_.getNumWorkOrdersForNUMANode(
numa_node_id);
}
inline std::size_t getNumWorkOrders() const {
std::size_t num_workorders = numa_agnostic_workorders_.getNumWorkOrders();
// for each NUMA node involved ..
for (PtrVector<InternalQueueContainer>::const_iterator it =
single_numa_node_workorders_.begin();
it != single_numa_node_workorders_.end();
++it) {
// Add the number of workorders for the NUMA node.
num_workorders += it->getNumWorkOrders();
}
// Add the number of workorders who have multiple NUMA nodes as input.
num_workorders += multiple_numa_nodes_workorders_.getNumWorkOrders();
return num_workorders;
}
WorkOrder* getWorkOrderForNUMANode(const int numa_node_id) {
DCHECK_GE(numa_node_id, 0);
DCHECK_LT(static_cast<std::size_t>(numa_node_id), num_numa_nodes_);
WorkOrder *work_order = single_numa_node_workorders_[numa_node_id].getWorkOrder();
if (work_order == nullptr) {
work_order = multiple_numa_nodes_workorders_.getWorkOrderForNUMANode(
numa_node_id);
}
return work_order;
}
WorkOrder* getWorkOrder(const bool prefer_single_NUMA_node = true);
private:
WorkOrder* getSingleNUMANodeWorkOrderHelper();
const std::size_t num_numa_nodes_;
// A container to store NUMA agnostic workorders.
InternalQueueContainer numa_agnostic_workorders_;
// A vector of containers to store workorders which prefer exactly one NUMA
// node for execution.
PtrVector<InternalQueueContainer> single_numa_node_workorders_;
// A container to store workorders which prefer more than one NUMA node for
// execution.
InternalListContainer multiple_numa_nodes_workorders_;
// Note that no workorder should be shared among the *_workorders_ structures.
DISALLOW_COPY_AND_ASSIGN(OperatorWorkOrdersContainer);
};
const std::size_t num_operators_;
const std::size_t num_numa_nodes_;
PtrVector<OperatorWorkOrdersContainer> normal_workorders_;
PtrVector<OperatorWorkOrdersContainer> rebuild_workorders_;
LifoWorkOrderSelectionPolicy normal_work_orders_policy_;
FifoWorkOrderSelectionPolicy rebuild_work_orders_policy_;
DISALLOW_COPY_AND_ASSIGN(WorkOrdersContainer);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_QUERY_EXECUTION_WORKORDERS_CONTAINER_HPP_