| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| **/ |
| |
| #ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_ |
| #define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_ |
| |
| #include <cstddef> |
| #include <memory> |
| #include <vector> |
| |
| #include "catalog/CatalogTypedefs.hpp" |
| #include "query_execution/QueryExecutionState.hpp" |
| #include "relational_operators/RelationalOperator.hpp" |
| #include "relational_operators/WorkOrder.hpp" |
| #include "storage/StorageBlockInfo.hpp" |
| #include "utility/DAG.hpp" |
| #include "utility/Macros.hpp" |
| |
| namespace quickstep { |
| |
| class QueryHandle; |
| |
| /** \addtogroup QueryExecution |
| * @{ |
| */ |
| |
| /** |
| * @brief A base class that manages the execution of a query including |
| * generation of new work orders, and keeping track of the query |
| * execution state. |
| **/ |
| class QueryManagerBase { |
| public: |
| // Index type used to address operator nodes in the query plan DAG. |
| using dag_node_index = DAG<RelationalOperator, bool>::size_type_nodes; |
| |
| /** |
| * @brief Status values reported by queryStatus(). |
| * |
| * @note If an operator and the whole query finish at the same time, |
| * kQueryExecuted is reported in preference to kOperatorExecuted. |
| **/ |
| enum class QueryStatusCode { |
| kOperatorExecuted = 0, // Some operator of the query finished executing. |
| kQueryExecuted, // The query as a whole finished executing. |
| kNone // Neither of the two events above occurred. |
| }; |
| |
| /** |
| * @brief Constructor. |
| * |
| * @param query_handle The QueryHandle object describing this query. |
| **/ |
| explicit QueryManagerBase(QueryHandle *query_handle); |
| |
| /** |
| * @brief Virtual destructor, so derived query managers can be destroyed |
| * through a pointer to this base class. |
| **/ |
| virtual ~QueryManagerBase() {} |
| |
| /** |
| * @brief Accessor for the handle of the managed query. |
| * |
| * @return The (const) QueryHandle pointer stored in this manager. |
| **/ |
| const QueryHandle* query_handle() const { |
| return query_handle_; |
| } |
| |
| /** |
| * @brief Accessor for the execution state tracked for this query. |
| * |
| * @return A const reference to the QueryExecutionState held by this manager. |
| **/ |
| const QueryExecutionState& getQueryExecutionState() const { |
| return *query_exec_state_; |
| } |
| |
| /** |
| * @brief Handle a message announcing that a WorkOrder has completed. |
| * |
| * @param op_index The index, in the query DAG, of the operator node to which |
| * the completed WorkOrder belongs. |
| **/ |
| void processWorkOrderCompleteMessage(const dag_node_index op_index); |
| |
| /** |
| * @brief Handle a message announcing that a RebuildWorkOrder has completed. |
| * |
| * @param op_index The index, in the query DAG, of the operator node to which |
| * the completed RebuildWorkOrder belongs. |
| **/ |
| void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index); |
| |
| /** |
| * @brief Handle a data pipeline message. |
| * |
| * @param op_index The index, in the query DAG, of the operator node |
| * associated with the pipelined block. |
| * @param block The ID of the pipelined block. |
| * @param rel_id The ID of the relation that produced 'block'. |
| **/ |
| void processDataPipelineMessage(const dag_node_index op_index, |
| const block_id block, |
| const relation_id rel_id); |
| |
| /** |
| * @brief Collect every work order that the relational operator can currently |
| * provide and keep them inside the manager. |
| * |
| * @param index The position of the relational operator in the query plan |
| * DAG. |
| * |
| * @return True if the operator produced at least one work order, false |
| * otherwise. |
| **/ |
| virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0; |
| |
| /** |
| * @brief Handle a feedback message sent by a work order and pass it on to |
| * the relational operator. |
| * |
| * @param op_index The index, in the query DAG, of the operator node the |
| * feedback message is addressed to. |
| * @param message The feedback message coming from the work order. |
| **/ |
| void processFeedbackMessage(const dag_node_index op_index, |
| const WorkOrder::FeedbackMessage &message); |
| |
| /** |
| * @brief Report the status of the query once an incoming message has been |
| * processed. |
| * |
| * @param op_index The index, in the query DAG, of the operator node the |
| * incoming message referred to. |
| * |
| * @return The QueryStatusCode that applies after processing the message. |
| **/ |
| QueryStatusCode queryStatus(const dag_node_index op_index); |
| |
| protected: |
| /** |
| * @brief Process one relational operator: obtain its work orders and place |
| * them in the WorkOrdersContainer of this query. The operator is |
| * marked as done if it qualifies for that. |
| * |
| * @param index The position of the relational operator in the query plan |
| * DAG. |
| * @param recursively_check_dependents Whether processOperator should also be |
| * invoked, recursively, on the dependents of an operator that is |
| * done. |
| **/ |
| void processOperator(const dag_node_index index, |
| const bool recursively_check_dependents); |
| |
| /** |
| * @brief Performs the bookkeeping for a finished operator: |
| * 1. Flags the given relational operator as "done". |
| * 2. For each dependent of the operator, checks whether all of its |
| * blocking dependencies are now met and, if so, notifies it. |
| * 3. If the operator has finished producing output, tells its |
| * dependents that no further input will arrive from it. |
| * |
| * @param index The position of the relational operator in the DAG. |
| **/ |
| void markOperatorFinished(const dag_node_index index); |
| |
| /** |
| * @brief Tell whether every dependency of the given node has finished |
| * executing. |
| * |
| * @note A true result from this function is a pre-requisite for calling |
| * getRebuildWorkOrders(). |
| * |
| * @param node_index The index of the node in the query DAG. |
| * |
| * @return True when all dependencies have finished executing, false as soon |
| * as one unfinished dependency is found. |
| **/ |
| bool checkAllDependenciesMet(const dag_node_index node_index) const { |
| for (const dag_node_index dependency : |
| query_dag_->getDependencies(node_index)) { |
| const bool dependency_finished = |
| query_exec_state_->hasExecutionFinished(dependency); |
| // A single unfinished dependency is enough to answer "no". |
| if (!dependency_finished) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * @brief Tell whether every blocking dependency of the given node has |
| * finished executing. |
| * |
| * @note A dependency is blocking when the link to its dependent is a |
| * pipeline breaker, i.e. the output of the dependency cannot be |
| * streamed to the dependent. |
| * |
| * @param node_index The index of the node in the query DAG. |
| * |
| * @return True when all blocking dependencies have finished executing, false |
| * otherwise. |
| **/ |
| bool checkAllBlockingDependenciesMet( |
| const dag_node_index node_index) const { |
| for (const dag_node_index blocker : blocking_dependencies_[node_index]) { |
| const bool blocker_finished = |
| query_exec_state_->hasExecutionFinished(blocker); |
| if (!blocker_finished) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * @brief Tell whether the given operator has completely finished executing. |
| * |
| * @note The operator is over once its normal execution is over and, if a |
| * rebuild is required for it, the rebuild is over as well. |
| * |
| * @param index The position of the operator in the DAG. |
| * |
| * @return True if the execution of the operator is over, false otherwise. |
| **/ |
| bool checkOperatorExecutionOver(const dag_node_index index) const { |
| if (!checkNormalExecutionOver(index)) { |
| return false; |
| } |
| // Normal execution is over; only a required rebuild can still be pending. |
| if (!checkRebuildRequired(index)) { |
| return true; |
| } |
| return checkRebuildOver(index); |
| } |
| |
| /** |
| * @brief Tell whether the given operator needs a rebuild operation. |
| * |
| * @param index The position of the operator in the DAG. |
| * |
| * @return True if a rebuild is required, false otherwise. |
| **/ |
| bool checkRebuildRequired(const dag_node_index index) const { |
| return query_exec_state_->isRebuildRequired(index); |
| } |
| |
| /** |
| * @brief Tell whether the rebuild operation of the given operator has |
| * already been started. |
| * |
| * @param index The position of the operator in the DAG. |
| * |
| * @return True if the rebuild has been initiated, false otherwise. |
| **/ |
| bool checkRebuildInitiated(const dag_node_index index) const { |
| return query_exec_state_->hasRebuildInitiated(index); |
| } |
| |
| // Handle of the query managed by this object. |
| const QueryHandle *query_handle_; |
| |
| // Identifier of the managed query. |
| const std::size_t query_id_; |
| |
| // The query plan DAG. Owned by 'query_handle_', not by this class. |
| DAG<RelationalOperator, bool> *query_dag_; |
| const dag_node_index num_operators_in_dag_; |
| |
| // Per node: the dependents that receive its output. |
| std::vector<std::vector<dag_node_index>> output_consumers_; |
| |
| // Per node: its pipeline breaking dependencies, if it has any. |
| std::vector<std::vector<dag_node_index>> blocking_dependencies_; |
| |
| // Execution state of the managed query. |
| std::unique_ptr<QueryExecutionState> query_exec_state_; |
| |
| private: |
| /** |
| * @brief Tell whether the normal execution of the given operator is over. |
| * |
| * @note Normal execution of an operator is over when all of the following |
| * hold: |
| * 1. Every normal (i.e. non rebuild) WorkOrder of the operator has |
| * finished executing. |
| * 2. The operator has no more work orders to generate. |
| * 3. All dependencies of the operator have been met. |
| * |
| * @param index The position of the operator in the DAG. |
| * |
| * @return True if the normal execution of the operator is over, false |
| * otherwise. |
| **/ |
| virtual bool checkNormalExecutionOver(const dag_node_index index) const = 0; |
| |
| /** |
| * @brief Start the rebuild process for the partially filled blocks that were |
| * produced while the given operator executed. |
| * |
| * @param index The position of the operator in the DAG. |
| * |
| * @return True if the rebuild is over right away, i.e. the operator did not |
| * generate any rebuild WorkOrders, false otherwise. |
| **/ |
| virtual bool initiateRebuild(const dag_node_index index) = 0; |
| |
| /** |
| * @brief Tell whether the rebuild operation of the given operator is over. |
| * |
| * @param index The position of the operator in the DAG. |
| * |
| * @return True if the rebuild operation is over, false otherwise. |
| **/ |
| virtual bool checkRebuildOver(const dag_node_index index) const = 0; |
| |
| DISALLOW_COPY_AND_ASSIGN(QueryManagerBase); |
| }; |
| |
| /** @} */ |
| |
| } // namespace quickstep |
| |
| #endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_ |