blob: 374c96df9645f8c193f618312a2a66dc66fa3f1a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "query_execution/QueryManagerBase.hpp"
#include <memory>
#include <utility>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryPlan.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
using std::pair;
namespace quickstep {
// Boolean command-line flag (gflags) that defaults to false. In the code
// visible in this file it is consulted in the QueryManagerBase constructor to
// decide whether an ExecutionDAGVisualizer is created for the query.
DEFINE_bool(visualize_execution_dag, false,
"If true, visualize the execution plan DAG into a graph in DOT "
"format (DOT is a plain text graph description language) which is "
"then printed via stderr.");
// Sets up the per-operator bookkeeping for the plan DAG of 'query_handle':
// which operators require a rebuild, which operators have no dependencies,
// and, for every producer -> consumer link, whether the link is a
// pipeline-breaker (blocking) or allows blocks to be streamed.
//
// @param query_handle Handle of the query to manage; must not be null, and
//        its query plan and plan DAG must not be null either (DCHECK-ed).
QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
    : query_handle_(DCHECK_NOTNULL(query_handle)),
      query_id_(query_handle->query_id()),
      query_dag_(DCHECK_NOTNULL(
          DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
      num_operators_in_dag_(query_dag_->size()),
      output_consumers_(num_operators_in_dag_),
      blocking_dependencies_(num_operators_in_dag_),
      query_exec_state_(new QueryExecutionState(num_operators_in_dag_)),
      blocking_dependents_(num_operators_in_dag_),
      non_blocking_dependencies_(num_operators_in_dag_) {
  // The visualizer is only created when the command-line flag asks for it.
  if (FLAGS_visualize_execution_dag) {
    dag_visualizer_ =
        std::make_unique<quickstep::ExecutionDAGVisualizer>(query_handle_->getQueryPlan());
  }

  for (dag_node_index producer = 0; producer < num_operators_in_dag_; ++producer) {
    // Any operator that has an InsertDestination needs a rebuild phase.
    const QueryContext::insert_destination_id destination_id =
        query_dag_->getNodePayload(producer).getInsertDestinationID();
    if (destination_id != QueryContext::kInvalidInsertDestinationId) {
      query_exec_state_->setRebuildRequired(producer);
    }

    // Remember the operators that do not depend on any other operator.
    if (query_dag_->getDependencies(producer).empty()) {
      non_dependent_operators_.push_back(producer);
    }

    // Classify every outgoing link of this operator.
    for (const auto &link : query_dag_->getDependents(producer)) {
      const dag_node_index consumer = link.first;

      if (!query_dag_->getLinkMetadata(producer, consumer)) {
        // Not a pipeline-breaker: blocks can be streamed from the producer
        // to the consumer.
        non_blocking_dependencies_[consumer].insert(producer);
        output_consumers_[producer].push_back(consumer);
        continue;
      }

      // Pipeline-breaker: no streaming of blocks between the two operators.
      blocking_dependencies_[consumer].insert(producer);
      blocking_dependents_[producer].push_back(consumer);
    }
  }
}
// Reports how far execution has progressed, from the point of view of the
// operator at 'op_index'.
//
// @param op_index DAG index of the operator of interest.
// @return kQueryExecuted if the whole query has finished, otherwise
//         kOperatorExecuted if this operator has finished, otherwise kNone.
QueryManagerBase::QueryStatusCode QueryManagerBase::queryStatus(
    const dag_node_index op_index) {
  // Completion of the whole query outranks completion of a single operator,
  // so the per-operator state is consulted only while the query is running.
  if (!query_exec_state_->hasQueryExecutionFinished()) {
    return query_exec_state_->hasExecutionFinished(op_index)
               ? QueryStatusCode::kOperatorExecuted
               : QueryStatusCode::kNone;
  }
  return QueryStatusCode::kQueryExecuted;
}
// Delivers a feedback message to the operator at 'op_index' and, unless that
// operator has already finished generating work orders, fetches its normal
// work orders.
//
// @param op_index DAG index of the operator the message is addressed to.
// @param msg The feedback message to hand to the operator.
void QueryManagerBase::processFeedbackMessage(
    const dag_node_index op_index, const WorkOrder::FeedbackMessage &msg) {
  query_dag_->getNodePayloadMutable(op_index)->receiveFeedbackMessage(msg);

  // Only an operator that is still generating work orders is polled again.
  if (!query_exec_state_->hasDoneGenerationWorkOrders(op_index)) {
    fetchNormalWorkOrders(op_index);
  }
}
// Handles the completion of one normal work order of the operator at
// 'op_index': updates the queued work order count and, once normal execution
// is over, either starts the rebuild phase or marks the operator finished.
//
// @param op_index DAG index of the operator that produced the work order.
// @param part_id Partition of the completed work order (not used here).
void QueryManagerBase::processWorkOrderCompleteMessage(
    const dag_node_index op_index,
    const partition_id part_id) {
  query_exec_state_->decrementNumQueuedWorkOrders(op_index);

  // Nothing more to do while normal execution is still in progress.
  if (checkNormalExecutionOver(op_index)) {
    bool ready_to_finish = true;

    if (checkRebuildRequired(op_index)) {
      DCHECK(!checkRebuildInitiated(op_index));
      // False means the rebuild is still in progress; true means it was
      // initiated and completed right away.
      ready_to_finish = initiateRebuild(op_index);
    }

    if (ready_to_finish) {
      markOperatorFinished(op_index);
    }
  }
}
// Handles the completion of one rebuild work order of the operator at
// 'op_index' and marks the operator finished once its rebuild is over.
//
// @param op_index DAG index of the operator that produced the work order.
// @param part_id Partition of the completed work order (not used here).
void QueryManagerBase::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
                                                              const partition_id part_id) {
  query_exec_state_->decrementNumRebuildWorkOrders(op_index);

  if (checkRebuildOver(op_index)) {
    markOperatorFinished(op_index);
  }
}
// Forwards a block produced by the operator at 'op_index' to every consumer
// that is eligible for streamed input, and fetches new work orders from each
// consumer whose blocking dependencies are all met.
//
// @param op_index DAG index of the operator that produced the block.
// @param block ID of the produced block.
// @param rel_id ID of the relation the block belongs to.
// @param part_id Partition the block belongs to.
void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
                                                  const block_id block,
                                                  const relation_id rel_id,
                                                  const partition_id part_id) {
  // 'output_consumers_' holds only those dependents of the producer that may
  // receive streamed input.
  for (const dag_node_index streaming_consumer : output_consumers_[op_index]) {
    auto *consumer_op = query_dag_->getNodePayloadMutable(streaming_consumer);
    consumer_op->feedInputBlock(block, rel_id, part_id);

    // The block just fed may have made new WorkOrders available; they can be
    // fetched only when no blocking dependency is outstanding.
    if (!checkAllBlockingDependenciesMet(streaming_consumer)) {
      continue;
    }
    fetchNormalWorkOrders(streaming_consumer);
  }
}
// Records that the operator at 'index' has finished, removes it from the
// outstanding-dependency sets of its dependents, and then tries to make
// progress on each dependent. May call itself recursively for a dependent
// that turns out to be finished as well.
//
// @param index DAG index of the operator that has finished.
void QueryManagerBase::markOperatorFinished(const dag_node_index index) {
query_exec_state_->setExecutionFinished(index);
// 'blocking_dependents_' holds the dependents reached through pipeline-breaking
// links (see the constructor); this operator no longer blocks them.
for (const dag_node_index dependent_op_index : blocking_dependents_[index]) {
blocking_dependencies_[dependent_op_index].erase(index);
}
// 'output_consumers_' holds the dependents reached through non-pipeline-breaking
// links; drop this operator from their non-blocking dependency sets too.
for (const dag_node_index dependent_op_index : output_consumers_[index]) {
non_blocking_dependencies_[dependent_op_index].erase(index);
}
RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
op->updateCatalogOnCompletion();
const relation_id output_rel = op->getOutputRelationID();
// Visit every dependent, regardless of whether its link is a pipeline-breaker.
for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {
const dag_node_index dependent_op_index = dependent_link.first;
// NOTE(review): a negative relation ID presumably means the operator has no
// output relation - confirm against RelationalOperator::getOutputRelationID().
if (output_rel >= 0) {
// Signal dependent operator that current operator is done feeding input blocks.
query_dag_->getNodePayloadMutable(dependent_op_index)->doneFeedingInputBlocks(output_rel);
}
if (checkAllBlockingDependenciesMet(dependent_op_index)) {
// Process the dependent operator (of the operator whose WorkOrder
// was just executed) for which all the dependencies have been met.
//
// The condition below relies on short-circuit evaluation, and its operands
// have side effects: fetchNormalWorkOrders() is always called first, and the
// remaining checks run only if it returns false. The dependent is then
// finished recursively when it has no outstanding non-blocking dependencies,
// its normal execution is over, and either no rebuild is required or
// initiateRebuild() returns true.
if (!fetchNormalWorkOrders(dependent_op_index) &&
non_blocking_dependencies_[dependent_op_index].empty() &&
checkNormalExecutionOver(dependent_op_index) &&
(!checkRebuildRequired(dependent_op_index) || initiateRebuild(dependent_op_index))) {
markOperatorFinished(dependent_op_index);
}
}
}
}
} // namespace quickstep