blob: 47f54c55fdf7587f5863143b21209671158c93ee [file] [log] [blame]
/**
* Copyright 2016, Quickstep Research Group, Computer Sciences Department,
* University of Wisconsin—Madison.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
#include <cstddef>
#include <memory>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionState.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "utility/DAG.hpp"
#include "utility/Macros.hpp"
#include "tmb/message_bus.h"
#include "tmb/tagged_message.h"
namespace quickstep {
class CatalogDatabaseLite;
class ForemanMessage;
class QueryHandle;
class StorageManager;
class WorkerMessage;
/** \addtogroup QueryExecution
* @{
*/
/**
* @brief A class that manages the execution of a query, including generation
* of new work orders and keeping track of the query execution state.
**/
class QueryManager {
public:
typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index;
/**
* @brief Return codes for processMessage() function.
*
* @note When both operator and query get executed, kQueryExecuted takes
* precedence over kOperatorExecuted.
**/
enum class QueryStatusCode {
kOperatorExecuted = 0, // An operator in the query finished execution.
kQueryExecuted, // The query got executed.
kNone // None of the above.
};
/**
* @brief Constructor.
*
* @param foreman_client_id The TMB client ID of the foreman thread.
* @param num_numa_nodes The number of NUMA nodes used by the system.
* @param query_handle The QueryHandle object for this query.
* @param catalog_database The CatalogDatabse used by the query.
* @param storage_manager The StorageManager used by the query.
* @param bus The TMB used for communication.
**/
QueryManager(const tmb::client_id foreman_client_id,
const std::size_t num_numa_nodes,
QueryHandle *query_handle,
CatalogDatabaseLite *catalog_database,
StorageManager *storage_manager,
tmb::MessageBus *bus);
/**
* @brief Get the next workorder to be excuted, wrapped in a WorkerMessage.
*
* @param start_operator_index Begin the search for the schedulable WorkOrder
* with the operator at this index.
* @param numa_node The next WorkOrder should preferably have its input(s)
* from this numa_node. This is a hint and not a binding requirement.
*
* @return A pointer to the WorkerMessage. If there's no WorkOrder to be
* executed, return NULL.
**/
WorkerMessage *getNextWorkerMessage(
const dag_node_index start_operator_index,
const numa_node_id node_id = -1);
/**
* @brief Process a message sent to the QueryManager.
*
* @param tagged_message TaggedMessage sent to the QueryManager.
*
* @return QueryStatusCode as determined after the message is processed.
**/
QueryStatusCode processMessage(const TaggedMessage &tagged_message);
/**
* @brief Get the QueryExecutionState for this query.
**/
inline const QueryExecutionState& getQueryExecutionState() const {
return *query_exec_state_;
}
/**
* @brief Get a pointer to the QueryContext.
**/
inline QueryContext* getQueryContextMutable() {
return query_context_.get();
}
private:
/**
* @brief Process the received WorkOrder complete message.
*
* @param node_index The index of the specified operator node in the query DAG
* for the completed WorkOrder.
**/
void processWorkOrderCompleteMessage(const dag_node_index op_index);
/**
* @brief Process the received RebuildWorkOrder complete message.
*
* @param node_index The index of the specified operator node in the query DAG
* for the completed RebuildWorkOrder.
**/
void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index);
/**
* @brief Process a current relational operator: Get its workorders and store
* them in the WorkOrdersContainer for this query. If the operator can
* be marked as done, do so.
*
* @param index The index of the relational operator to be processed in the
* query plan DAG.
* @param recursively_check_dependents If an operator is done, should we
* call processOperator on its dependents recursively.
**/
void processOperator(const dag_node_index index,
const bool recursively_check_dependents);
/**
* @brief Process the received data pipeline message.
*
* @param node_index The index of the specified operator node in the query DAG
* for the pipelining block.
* @param block The block id.
* @param rel_id The ID of the relation that produced 'block'.
**/
void processDataPipelineMessage(const dag_node_index op_index,
const block_id block,
const relation_id rel_id);
/**
* @brief Process the received work order feedback message and notify
* relational operator.
*
* @param message Feedback message from work order.
**/
void processFeedbackMessage(const WorkOrder::FeedbackMessage &message);
/**
* @brief Fetch all work orders currently available in relational operator and
* store them internally.
*
* @param index The index of the relational operator to be processed in the
* query plan DAG.
*
* @return Whether any work order was generated by op.
**/
bool fetchNormalWorkOrders(const dag_node_index index);
/**
* @brief This function does the following things:
* 1. Mark the given relational operator as "done".
* 2. For all the dependents of this operator, check if all of their
* blocking dependencies are met. If so inform them that the blocking
* dependencies are met.
* 3. Check if the given operator is done producing output. If it's
* done, inform the dependents that they won't receive input anymore
* from the given operator.
*
* @param index The index of the given relational operator in the DAG.
**/
void markOperatorFinished(const dag_node_index index);
/**
* @brief Check if all the dependencies of the node at specified index have
* finished their execution.
*
* @note This function's true return value is a pre-requisite for calling
* getRebuildWorkOrders()
*
* @param node_index The index of the specified node in the query DAG.
*
* @return True if all the dependencies have finished their execution. False
* otherwise.
**/
inline bool checkAllDependenciesMet(const dag_node_index node_index) const {
for (const dag_node_index dependency_index :
query_dag_->getDependencies(node_index)) {
// If at least one of the dependencies is not met, return false.
if (!query_exec_state_->hasExecutionFinished(dependency_index)) {
return false;
}
}
return true;
}
/**
* @brief Check if all the blocking dependencies of the node at specified
* index have finished their execution.
*
* @note A blocking dependency is the one which is pipeline breaker. Output of
* a dependency can't be streamed to its dependent if the link between
* them is pipeline breaker.
*
* @param node_index The index of the specified node in the query DAG.
*
* @return True if all the blocking dependencies have finished their
* execution. False otherwise.
**/
inline bool checkAllBlockingDependenciesMet(
const dag_node_index node_index) const {
for (const dag_node_index blocking_dependency_index :
blocking_dependencies_[node_index]) {
if (!query_exec_state_->hasExecutionFinished(
blocking_dependency_index)) {
return false;
}
}
return true;
}
/**
* @brief Check if the execution of the given operator is over.
*
* @param index The index of the given operator in the DAG.
*
* @return True if the execution of the given operator is over, false
* otherwise.
**/
inline bool checkOperatorExecutionOver(const dag_node_index index) const {
if (checkRebuildRequired(index)) {
return (checkNormalExecutionOver(index) && checkRebuildOver(index));
} else {
return checkNormalExecutionOver(index);
}
}
/**
* @brief Check if the given operator's normal execution is over.
*
* @note The conditions for a given operator's normal execution to get over:
* 1. All of its normal (i.e. non rebuild) WorkOrders have finished
* execution.
* 2. The operator is done generating work orders.
* 3. All of the dependencies of the given operator have been met.
*
* @param index The index of the given operator in the DAG.
*
* @return True if the normal execution of the given operator is over, false
* otherwise.
**/
inline bool checkNormalExecutionOver(const dag_node_index index) const {
return (checkAllDependenciesMet(index) &&
!workorders_container_->hasNormalWorkOrder(index) &&
query_exec_state_->getNumQueuedWorkOrders(index) == 0 &&
query_exec_state_->hasDoneGenerationWorkOrders(index));
}
/**
* @brief Check if the rebuild operation is required for a given operator.
*
* @param index The index of the given operator in the DAG.
*
* @return True if the rebuild operation is required, false otherwise.
**/
inline bool checkRebuildRequired(const dag_node_index index) const {
return query_exec_state_->isRebuildRequired(index);
}
/**
* @brief Check if the rebuild operation for a given operator is over.
*
* @param index The index of the given operator in the DAG.
*
* @return True if the rebuild operation is over, false otherwise.
**/
inline bool checkRebuildOver(const dag_node_index index) const {
return query_exec_state_->hasRebuildInitiated(index) &&
!workorders_container_->hasRebuildWorkOrder(index) &&
(query_exec_state_->getNumRebuildWorkOrders(index) == 0);
}
/**
* @brief Check if the rebuild operation for a given operator has been
* initiated.
*
* @param index The index of the given operator in the DAG.
*
* @return True if the rebuild operation has been initiated, false otherwise.
**/
inline bool checkRebuildInitiated(const dag_node_index index) const {
return query_exec_state_->hasRebuildInitiated(index);
}
/**
* @brief Initiate the rebuild process for partially filled blocks generated
* during the execution of the given operator.
*
* @param index The index of the given operator in the DAG.
*
* @return True if the rebuild is over immediately, i.e. the operator didn't
* generate any rebuild WorkOrders, false otherwise.
**/
bool initiateRebuild(const dag_node_index index);
/**
* @brief Get the rebuild WorkOrders for an operator.
*
* @note This function should be called only once, when all the normal
* WorkOrders generated by an operator finish their execution.
*
* @param index The index of the operator in the query plan DAG.
* @param container A pointer to a WorkOrdersContainer to be used to store the
* generated WorkOrders.
**/
void getRebuildWorkOrders(const dag_node_index index,
WorkOrdersContainer *container);
const tmb::client_id foreman_client_id_;
const std::size_t query_id_;
CatalogDatabaseLite *catalog_database_;
StorageManager *storage_manager_;
tmb::MessageBus *bus_;
DAG<RelationalOperator, bool> *query_dag_;
std::unique_ptr<QueryContext> query_context_;
// For all nodes, store their receiving dependents.
std::vector<std::vector<dag_node_index>> output_consumers_;
// For all nodes, store their pipeline breaking dependencies (if any).
std::vector<std::vector<dag_node_index>> blocking_dependencies_;
std::unique_ptr<QueryExecutionState> query_exec_state_;
std::unique_ptr<WorkOrdersContainer> workorders_container_;
DISALLOW_COPY_AND_ASSIGN(QueryManager);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_