blob: 366ab6167871565f17485d287aad74c603cb528b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_
#include <cstddef>
#include <memory>
#include <unordered_set>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryExecutionState.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/DAG.hpp"
#include "utility/ExecutionDAGVisualizer.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
/** \addtogroup QueryExecution
* @{
*/
/**
* @brief A base class that manages the execution of a query including
* generation of new work orders, and keeping track of the query
* execution state.
**/
class QueryManagerBase {
 public:
  // Index type used to address operator nodes in the query plan DAG.
  using dag_node_index = DAG<RelationalOperator, bool>::size_type_nodes;

  /**
   * @brief Possible results reported by queryStatus().
   *
   * @note If an operator and the whole query both finish as a result of the
   *       same message, kQueryExecuted is reported in preference to
   *       kOperatorExecuted.
   **/
  enum class QueryStatusCode {
    kOperatorExecuted = 0,  // One operator of the query finished executing.
    kQueryExecuted,         // The whole query finished executing.
    kNone                   // Neither of the above happened.
  };

  /**
   * @brief Constructor.
   *
   * @param query_handle The QueryHandle object describing this query.
   **/
  explicit QueryManagerBase(QueryHandle *query_handle);

  /**
   * @brief Virtual destructor.
   **/
  virtual ~QueryManagerBase() = default;

  /**
   * @brief Read-only access to the query handle.
   *
   * @return A pointer to the QueryHandle held by this manager.
   **/
  const QueryHandle* query_handle() const {
    return query_handle_.get();
  }

  /**
   * @brief Read-only access to the execution state of this query.
   *
   * @return A const reference to the QueryExecutionState of this query.
   **/
  const QueryExecutionState& getQueryExecutionState() const {
    return *query_exec_state_;
  }

  /**
   * @brief Handle a message reporting that a (normal) WorkOrder completed.
   *
   * @param op_index The index, in the query DAG, of the operator node that
   *        the completed WorkOrder belongs to.
   * @param part_id The partition id.
   **/
  void processWorkOrderCompleteMessage(const dag_node_index op_index,
                                       const partition_id part_id);

  /**
   * @brief Handle a message reporting that a RebuildWorkOrder completed.
   *
   * @param op_index The index, in the query DAG, of the operator node that
   *        the completed RebuildWorkOrder belongs to.
   * @param part_id The partition id.
   **/
  void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
                                              const partition_id part_id);

  /**
   * @brief Handle a data pipeline message.
   *
   * @param op_index The index, in the query DAG, of the operator node
   *        associated with the pipelined block.
   * @param block The block id.
   * @param rel_id The ID of the relation that produced 'block'.
   * @param part_id The partition ID of 'block', if any. By default, a block
   *        belongs to the only partition (i.e. the relation is not
   *        partitioned).
   **/
  void processDataPipelineMessage(const dag_node_index op_index,
                                  const block_id block,
                                  const relation_id rel_id,
                                  const partition_id part_id = 0);

  /**
   * @brief Collect every work order that the relational operator can
   *        currently provide, and keep them internally.
   *
   * @param index The index, in the query plan DAG, of the relational operator
   *        to be processed.
   *
   * @return Whether the operator generated any work order.
   **/
  virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0;

  /**
   * @brief Handle a feedback message sent by a work order and pass it on to
   *        the relational operator.
   *
   * @param op_index The index, in the query DAG, of the operator node the
   *        feedback message is addressed to.
   * @param message The feedback message coming from the work order.
   **/
  void processFeedbackMessage(const dag_node_index op_index,
                              const WorkOrder::FeedbackMessage &message);

  /**
   * @brief Report the status of the query once an incoming message has been
   *        processed.
   *
   * @param op_index The index, in the query DAG, of the operator node the
   *        incoming message referred to.
   *
   * @return The QueryStatusCode determined after processing the message.
   **/
  QueryStatusCode queryStatus(const dag_node_index op_index);

  /**
   * @brief Access the execution DAG visualizer.
   *
   * @return A pointer to the execution DAG visualizer.
   **/
  ExecutionDAGVisualizer* dag_visualizer() {
    return dag_visualizer_.get();
  }

 protected:
  /**
   * @brief Perform the bookkeeping for an operator that has finished:
   *        1. Flag the given relational operator as "done".
   *        2. For each dependent of the operator, check whether the operator
   *           is done producing output; if so, tell the dependent that no
   *           further input will arrive from this operator.
   *        3. Check whether all blocking dependencies of the dependents are
   *           met; if so, fetch their normal work orders.
   *
   * @param index The index of the given relational operator in the DAG.
   **/
  void markOperatorFinished(const dag_node_index index);

  /**
   * @brief Tell whether every blocking dependency of the node at the given
   *        index has finished executing.
   *
   * @note A dependency is blocking when the link to its dependent is a
   *       pipeline breaker, i.e. the output of the dependency cannot be
   *       streamed to the dependent.
   *
   * @param node_index The index of the node in the query DAG.
   *
   * @return True if all blocking dependencies have finished their execution,
   *         false otherwise.
   **/
  bool checkAllBlockingDependenciesMet(
      const dag_node_index node_index) const {
    // The node is unblocked once its set of blocking dependencies is empty.
    const std::unordered_set<dag_node_index> &pending_dependencies =
        blocking_dependencies_[node_index];
    return pending_dependencies.empty();
  }

  /**
   * @brief Tell whether a given operator requires the rebuild operation.
   *
   * @param index The index of the given operator in the DAG.
   *
   * @return True if the rebuild operation is required, false otherwise.
   **/
  bool checkRebuildRequired(const dag_node_index index) const {
    return query_exec_state_->isRebuildRequired(index);
  }

  /**
   * @brief Tell whether the rebuild operation of a given operator has already
   *        been initiated.
   *
   * @param index The index of the given operator in the DAG.
   *
   * @return True if the rebuild operation has been initiated, false
   *         otherwise.
   **/
  bool checkRebuildInitiated(const dag_node_index index) const {
    return query_exec_state_->hasRebuildInitiated(index);
  }

  /**
   * @brief Report the current memory consumption of the query, in bytes.
   *
   * @note The value is a best guess, taken at the time of the call. This base
   *       implementation always returns 0.
   **/
  virtual std::size_t getQueryMemoryConsumptionBytes() const {
    return 0;
  }

  std::unique_ptr<QueryHandle> query_handle_;

  const std::size_t query_id_;

  DAG<RelationalOperator, bool> *query_dag_;  // Owned by 'query_handle_'.
  const dag_node_index num_operators_in_dag_;

  // Per node: the dependents that receive its output.
  std::vector<std::vector<dag_node_index>> output_consumers_;

  // Per node: its pipeline breaking dependencies (if any).
  std::vector<std::unordered_set<dag_node_index>> blocking_dependencies_;

  std::vector<dag_node_index> non_dependent_operators_;

  std::unique_ptr<QueryExecutionState> query_exec_state_;

  std::unique_ptr<ExecutionDAGVisualizer> dag_visualizer_;

 private:
  /**
   * @brief Tell whether the normal execution of the given operator is over.
   *
   * @note The normal execution of an operator is over when:
   *       1. All of its normal (i.e. non rebuild) WorkOrders have finished
   *          execution.
   *       2. The operator is done generating work orders.
   *       3. All of the dependencies of the operator have been met.
   *
   * @param index The index of the given operator in the DAG.
   *
   * @return True if the normal execution of the given operator is over, false
   *         otherwise.
   **/
  virtual bool checkNormalExecutionOver(const dag_node_index index) const = 0;

  /**
   * @brief Start the rebuild process for the partially filled blocks produced
   *        while executing the given operator.
   *
   * @param index The index of the given operator in the DAG.
   *
   * @return True if the rebuild is over immediately, i.e. the operator did
   *         not generate any rebuild WorkOrders, false otherwise.
   **/
  virtual bool initiateRebuild(const dag_node_index index) = 0;

  /**
   * @brief Tell whether the rebuild operation of a given operator is over.
   *
   * @param index The index of the given operator in the DAG.
   *
   * @return True if the rebuild operation is over, false otherwise.
   **/
  virtual bool checkRebuildOver(const dag_node_index index) const = 0;

  // Per node: its pipeline breaking dependents (if any).
  std::vector<std::vector<dag_node_index>> blocking_dependents_;

  std::vector<std::unordered_set<dag_node_index>> non_blocking_dependencies_;

  DISALLOW_COPY_AND_ASSIGN(QueryManagerBase);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_