blob: deff2064c7dcef1920e4b35e2c971a062cffdb5a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "query_execution/QueryManagerSingleNode.hpp"
#include <algorithm>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/WorkerMessage.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "relational_operators/RebuildWorkOrder.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "storage/InsertDestination.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageManager.hpp"
#include "utility/DAG.hpp"
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
namespace quickstep {
class WorkOrder;
QueryManagerSingleNode::QueryManagerSingleNode(
const tmb::client_id foreman_client_id,
const std::size_t num_numa_nodes,
QueryHandle *query_handle,
CatalogDatabaseLite *catalog_database,
StorageManager *storage_manager,
tmb::MessageBus *bus)
: QueryManagerBase(query_handle),
foreman_client_id_(foreman_client_id),
storage_manager_(DCHECK_NOTNULL(storage_manager)),
bus_(DCHECK_NOTNULL(bus)),
query_context_(new QueryContext(query_handle->getQueryContextProto(),
*catalog_database,
storage_manager_,
foreman_client_id_,
bus_)),
workorders_container_(
new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)),
estimated_memory_consumption_in_bytes_(0) {
updateActiveOperators();
}
WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
const dag_node_index start_operator_index, const numa_node_id numa_node) {
WorkerMessage *worker_message = getNextWorkerMessageFromActiveOperators(numa_node);
if (worker_message != nullptr) {
return worker_message;
}
// We didn't find a work order. Try activating new operators.
updateActiveOperators();
worker_message = getNextWorkerMessageFromActiveOperators(numa_node);
if (worker_message == nullptr) {
std::cout << "No workorder even after activating new operators\n";
}
return worker_message;
}
WorkerMessage* QueryManagerSingleNode::getNextWorkerMessageFromActiveOperators(
const numa_node_id numa_node) {
WorkOrder *work_order = nullptr;
for (dag_node_index index : active_operators_) {
if (checkAllBlockingDependenciesMet(index)) {
if (numa_node != kAnyNUMANodeID) {
// First try to get a normal WorkOrder from the specified NUMA node.
work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
if (work_order != nullptr) {
// A WorkOrder found on the given NUMA node.
query_exec_state_->incrementNumQueuedWorkOrders(index);
return WorkerMessage::WorkOrderMessage(work_order, index);
} else {
// Normal workorder not found on this node. Look for a rebuild workorder
// on this NUMA node.
work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
if (work_order != nullptr) {
return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
}
}
}
// Either no workorder found on the given NUMA node, or numa_node is
// 'kAnyNUMANodeID'.
// Try to get a normal WorkOrder from other NUMA nodes.
work_order = workorders_container_->getNormalWorkOrder(index);
if (work_order != nullptr) {
query_exec_state_->incrementNumQueuedWorkOrders(index);
return WorkerMessage::WorkOrderMessage(work_order, index);
} else {
// Normal WorkOrder not found, look for a RebuildWorkOrder.
work_order = workorders_container_->getRebuildWorkOrder(index);
if (work_order != nullptr) {
return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
}
}
}
}
return nullptr;
}
bool QueryManagerSingleNode::fetchNormalWorkOrders(const dag_node_index index) {
bool generated_new_workorders = false;
if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
// Do not fetch any work units until all blocking dependencies are met.
// The releational operator is not aware of blocking dependencies for
// uncorrelated scalar queries.
if (!checkAllBlockingDependenciesMet(index)) {
return false;
}
if (std::find(active_operators_.begin(), active_operators_.end(), index) == active_operators_.end()) {
// This operator is not yet active.
std::cout << "Operator " << index << " not active yet\n";
return false;
}
const size_t num_pending_workorders_before =
workorders_container_->getNumNormalWorkOrders(index);
const bool done_generation =
query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
query_context_.get(),
storage_manager_,
foreman_client_id_,
bus_);
if (done_generation) {
query_exec_state_->setDoneGenerationWorkOrders(index);
}
// TODO(shoban): It would be a good check to see if operator is making
// useful progress, i.e., the operator either generates work orders to
// execute or still has pending work orders executing. However, this will not
// work if Foreman polls operators without feeding data. This check can be
// enabled, if Foreman is refactored to call getAllWorkOrders() only when
// pending work orders are completed or new input blocks feed.
generated_new_workorders =
(num_pending_workorders_before <
workorders_container_->getNumNormalWorkOrders(index));
}
return generated_new_workorders;
}
bool QueryManagerSingleNode::initiateRebuild(const dag_node_index index) {
DCHECK(!workorders_container_->hasRebuildWorkOrder(index));
DCHECK(checkRebuildRequired(index));
DCHECK(!checkRebuildInitiated(index));
getRebuildWorkOrders(index, workorders_container_.get());
query_exec_state_->setRebuildStatus(
index, workorders_container_->getNumRebuildWorkOrders(index), true);
return (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
}
void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
WorkOrdersContainer *container) {
const RelationalOperator &op = query_dag_->getNodePayload(index);
const QueryContext::insert_destination_id insert_destination_index = op.getInsertDestinationID();
if (insert_destination_index == QueryContext::kInvalidInsertDestinationId) {
return;
}
std::vector<MutableBlockReference> partially_filled_block_refs;
DCHECK(query_context_ != nullptr);
InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index);
DCHECK(insert_destination != nullptr);
insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
for (std::vector<MutableBlockReference>::size_type i = 0;
i < partially_filled_block_refs.size();
++i) {
container->addRebuildWorkOrder(
new RebuildWorkOrder(query_id_,
std::move(partially_filled_block_refs[i]),
index,
op.getOutputRelationID(),
foreman_client_id_,
bus_),
index);
}
}
void QueryManagerSingleNode::activateOperator(const dag_node_index index) {
// DCHECK(checkAllBlockingDependenciesMet(index));
// It is okay to call the line below multiple times.
query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
query_context_->activateOperator(
query_handle()->getQueryContextProto(), index, storage_manager_);
processOperator(index, false);
}
void QueryManagerSingleNode::updateActiveOperators() {
// Mark the active operators in the DAG.
for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
if (!query_exec_state_->hasExecutionFinished(index)) {
if (!isOperatorActive(index)) {
// This is an inactive operator.
const std::size_t estimated_memory_for_operator = query_context_->getHashTableSize(index);
// NOTE(harshad) - We might get in trouble in the following situation:
// If there's a build operator in the leaf of the query plan DAG, that
// has huge memory requirements (greater than the threshold). That
// operator won't be activated thereby leading the query execution in a
// deadlock.
if (canActivateOperator(estimated_memory_for_operator)) {
estimated_memory_consumption_in_bytes_ += estimated_memory_for_operator;
active_operators_.push_back(index);
activateOperator(index);
if (estimated_memory_for_operator > 0) {
std::cout << "Activated operator: " << index << " Total active: " << active_operators_.size() << "\n";
}
} else {
std::cout << "operator: " << index << " not activated Total active: " << active_operators_.size() << "\n";
}
}
}
}
}
bool QueryManagerSingleNode::isOperatorActive(const dag_node_index index) const {
auto iter = std::find(active_operators_.begin(), active_operators_.end(), index);
return iter != active_operators_.end();
}
bool QueryManagerSingleNode::canActivateOperator(const std::size_t estimated_memory_for_operator) const {
const std::size_t total_memory_consumption = estimated_memory_for_operator + estimated_memory_consumption_in_bytes_;
const std::size_t estimated_num_slots = StorageManager::SlotsNeededForBytes(total_memory_consumption);
const std::size_t threshold_num_slots = kAdmissionNumSlotsThreshold * storage_manager_->getMaxBufferPoolSlots();
if (estimated_memory_for_operator > 0) {
std::cout << "Memory estimate: " << estimated_memory_for_operator;
std::cout << " estimated slots: " << estimated_num_slots << " threshold: " << threshold_num_slots << "\n";
}
return estimated_num_slots < threshold_num_slots;
}
void QueryManagerSingleNode::markOperatorFinished(const dag_node_index index) {
query_exec_state_->setExecutionFinished(index);
RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
op->updateCatalogOnCompletion();
const relation_id output_rel = op->getOutputRelationID();
for (const std::pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {
const dag_node_index dependent_op_index = dependent_link.first;
RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index);
// Signal dependent operator that current operator is done feeding input blocks.
if (output_rel >= 0) {
dependent_op->doneFeedingInputBlocks(output_rel);
}
if (checkAllBlockingDependenciesMet(dependent_op_index)) {
dependent_op->informAllBlockingDependenciesMet();
}
}
// Reduce the estimate for this operator.
const std::size_t estimated_memory_for_operator = query_context_->getHashTableSize(index);
estimated_memory_consumption_in_bytes_ -= estimated_memory_for_operator;
std::cout << "Operator " << index << " over, decremented: " << estimated_memory_for_operator << " bytes Total active: " << active_operators_.size();
std::cout << " Current slots: " << StorageManager::SlotsNeededForBytes(estimated_memory_consumption_in_bytes_) << "\n";
// Remove this operator from the active operators list.
auto iter =
std::find(active_operators_.begin(), active_operators_.end(), index);
if (iter != active_operators_.end()) {
active_operators_.erase(iter);
} else {
DLOG(WARNING) << "Marking operator " << index << " as finished which wasn't active";
}
}
} // namespace quickstep