blob: 2b2581a904a4cde53a3a14c957307b9b6073c6d2 [file] [log] [blame]
/**
* Copyright 2011-2015 Quickstep Technologies LLC.
* Copyright 2015-2016 Pivotal Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
#include "query_execution/Foreman.hpp"
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "catalog/PartitionScheme.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/WorkerDirectory.hpp"
#include "query_execution/WorkerMessage.hpp"
#include "relational_operators/RebuildWorkOrder.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/InsertDestination.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "threading/ThreadUtil.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
#include "tmb/message_bus.h"
#include "tmb/tagged_message.h"
using std::move;
using std::pair;
using std::size_t;
using std::vector;
namespace quickstep {
void Foreman::initialize() {
if (cpu_id_ >= 0) {
// We can pin the foreman thread to a CPU if specified.
ThreadUtil::BindToCPU(cpu_id_);
}
initializeState();
DEBUG_ASSERT(query_dag_ != nullptr);
const dag_node_index dag_size = query_dag_->size();
// Collect all the workorders from all the relational operators in the DAG.
for (dag_node_index index = 0; index < dag_size; ++index) {
if (checkAllBlockingDependenciesMet(index)) {
query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
processOperator(index, false);
}
}
// Dispatch the WorkOrders generated so far.
dispatchWorkerMessages(0, 0);
}
// Handles a worker's report that a normal (non-rebuild) WorkOrder of the
// operator at 'op_index' completed on the worker at 'worker_thread_index'.
// Updates the queued-WorkOrder bookkeeping, advances the operator toward
// (possible) rebuild and completion, gives eligible dependents a chance to
// make progress, and finally dispatches newly schedulable WorkOrders.
void Foreman::processWorkOrderCompleteMessage(const dag_node_index op_index,
                                              const size_t worker_thread_index) {
  query_exec_state_->decrementNumQueuedWorkOrders(op_index);
  // As the given worker finished executing a WorkOrder, decrement its number
  // of queued WorkOrders.
  workers_->decrementNumQueuedWorkOrders(worker_thread_index);
  // Check if new work orders are available and fetch them if so.
  fetchNormalWorkOrders(op_index);
  if (checkRebuildRequired(op_index)) {
    if (checkNormalExecutionOver(op_index)) {
      if (!checkRebuildInitiated(op_index)) {
        if (initiateRebuild(op_index)) {
          // Rebuild initiated and completed right away (no partially filled
          // blocks to rebuild).
          markOperatorFinished(op_index);
        } else {
          // Rebuild under progress.
        }
      } else if (checkRebuildOver(op_index)) {
        // Rebuild was under progress and now it is over.
        markOperatorFinished(op_index);
      }
    } else {
      // Normal execution under progress for this operator.
    }
  } else if (checkOperatorExecutionOver(op_index)) {
    // Rebuild not required for this operator and its normal execution is
    // complete.
    markOperatorFinished(op_index);
  }
  for (const pair<dag_node_index, bool> &dependent_link :
       query_dag_->getDependents(op_index)) {
    const dag_node_index dependent_op_index = dependent_link.first;
    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
      // Process the dependent operator (of the operator whose WorkOrder
      // was just executed) for which all the dependencies have been met.
      processOperator(dependent_op_index, true);
    }
  }
  // Dispatch the WorkerMessages to the workers. We prefer to start the search
  // for the schedulable WorkOrders beginning from 'op_index'. The first
  // candidate worker to receive the next WorkOrder is the one that sent the
  // response message to Foreman.
  dispatchWorkerMessages(worker_thread_index, op_index);
}
// A worker finished one rebuild WorkOrder of operator 'op_index'.
void Foreman::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index,
                                                     const size_t worker_thread_index) {
  // One fewer rebuild WorkOrder pending for this operator, and one fewer
  // WorkOrder queued on the responding worker.
  query_exec_state_->decrementNumRebuildWorkOrders(op_index);
  workers_->decrementNumQueuedWorkOrders(worker_thread_index);

  if (checkRebuildOver(op_index)) {
    // That was the last rebuild WorkOrder: the operator is fully done.
    markOperatorFinished(op_index);

    // Give every dependent whose blocking dependencies are now all satisfied
    // a chance to make progress.
    for (const pair<dag_node_index, bool> &link : query_dag_->getDependents(op_index)) {
      if (checkAllBlockingDependenciesMet(link.first)) {
        processOperator(link.first, true);
      }
    }
  }

  // Dispatch work, preferring operator 'op_index' and offering the first
  // WorkOrder to the worker that just reported back.
  dispatchWorkerMessages(worker_thread_index, op_index);
}
// A block of relation 'rel_id' was produced by operator 'op_index' and can be
// streamed to that operator's pipelined consumers.
void Foreman::processDataPipelineMessage(const dag_node_index op_index,
                                         const block_id block,
                                         const relation_id rel_id) {
  // 'output_consumers_[op_index]' holds exactly those dependents of the
  // producing operator that are eligible to receive streamed input.
  for (const dag_node_index consumer : output_consumers_[op_index]) {
    query_dag_->getNodePayloadMutable(consumer)->feedInputBlock(block, rel_id);
    // The freshly fed input may have unlocked new WorkOrders; pick them up.
    fetchNormalWorkOrders(consumer);
  }

  // Dispatch schedulable WorkOrders, scanning operators starting at
  // 'op_index'.
  // TODO(zuyu): Improve the data locality for the next WorkOrder.
  dispatchWorkerMessages(0, op_index);
}
// Routes a WorkOrder feedback message straight to the relational operator it
// targets (identified by the rel_op_index in the message header).
void Foreman::processFeedbackMessage(const WorkOrder::FeedbackMessage &msg) {
  query_dag_->getNodePayloadMutable(msg.header().rel_op_index)->receiveFeedbackMessage(msg);
}
// Main event loop of the Foreman thread. Blocks on the TMB until the next
// message arrives, decodes it by message type, and delegates to the matching
// handler. The loop exits once QueryExecutionState reports that the whole
// query has finished, after which cleanUp() releases per-query resources.
void Foreman::run() {
  // Initialize before for Foreman eventloop.
  initialize();
  // Event loop
  while (!query_exec_state_->hasQueryExecutionFinished()) {
    // Receive() causes this thread to sleep until next message is received.
    AnnotatedMessage annotated_msg = bus_->Receive(foreman_client_id_, 0, true);
    const TaggedMessage &tagged_message = annotated_msg.tagged_message;
    switch (tagged_message.message_type()) {
      case kWorkOrderCompleteMessage: {
        // A worker finished executing a normal WorkOrder.
        serialization::WorkOrderCompletionMessage proto;
        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
        processWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
        break;
      }
      case kRebuildWorkOrderCompleteMessage: {
        // A worker finished executing a rebuild WorkOrder.
        serialization::WorkOrderCompletionMessage proto;
        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
        processRebuildWorkOrderCompleteMessage(proto.operator_index(), proto.worker_thread_index());
        break;
      }
      case kCatalogRelationNewBlockMessage: {
        // A new block was created for a relation; record it in the catalog
        // (and in the partition scheme, when the block belongs to a partition).
        serialization::CatalogRelationNewBlockMessage proto;
        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
        const block_id block = proto.block_id();
        CatalogRelation *relation =
            static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id());
        relation->addBlock(block);
        if (proto.has_partition_id()) {
          relation->getPartitionSchemeMutable()->addBlockToPartition(proto.partition_id(), block);
        }
        break;
      }
      case kDataPipelineMessage: {
        // Possible message senders include InsertDestinations and some
        // operators which modify existing blocks.
        serialization::DataPipelineMessage proto;
        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
        processDataPipelineMessage(proto.operator_index(), proto.block_id(), proto.relation_id());
        break;
      }
      case kWorkOrdersAvailableMessage: {
        // An operator announced that it has new WorkOrders ready.
        serialization::WorkOrdersAvailableMessage proto;
        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
        const dag_node_index op_index = proto.operator_index();
        // Check if new work orders are available.
        fetchNormalWorkOrders(op_index);
        // Dispatch the WorkerMessages to the workers. We prefer to start the search
        // for the schedulable WorkOrders beginning from 'op_index'. The first
        // candidate worker to receive the next WorkOrder is the one that sent the
        // response message to Foreman.
        // TODO(zuyu): Improve the data locality for the next WorkOrder.
        dispatchWorkerMessages(0, op_index);
        break;
      }
      case kWorkOrderFeedbackMessage: {
        // An executing WorkOrder sent feedback to its relational operator.
        WorkOrder::FeedbackMessage msg(const_cast<void *>(tagged_message.message()),
                                       tagged_message.message_bytes());
        processFeedbackMessage(msg);
        break;
      }
      default:
        LOG(FATAL) << "Unknown message type to Foreman";
    }
  }
  // Clean up before exiting.
  cleanUp();
}
void Foreman::dispatchWorkerMessages(
const size_t start_worker_index,
const dag_node_index start_operator_index) {
// Loop over all workers. Stopping criteria:
// 1. Every worker has been assigned exactly max_msgs_per_worker_ workorders.
// OR 2. No schedulable workorders at this time.
size_t done_workers_count = 0;
for (size_t curr_worker = start_worker_index;
done_workers_count < workers_->getNumWorkers();
curr_worker = (curr_worker + 1) % workers_->getNumWorkers()) {
if (workers_->getNumQueuedWorkOrders(curr_worker) < max_msgs_per_worker_) {
std::unique_ptr<WorkerMessage> msg;
msg.reset(getNextWorkerMessage(
start_operator_index, workers_->getNUMANode(curr_worker)));
if (msg.get() != nullptr) {
sendWorkerMessage(curr_worker, *msg);
workers_->incrementNumQueuedWorkOrders(curr_worker);
} else {
// No schedulable workorder at this point.
++done_workers_count;
}
} else {
// curr_worker already has been assigned max_msgs_per_worker workorders.
++done_workers_count;
}
}
}
void Foreman::initializeState() {
const dag_node_index dag_size = query_dag_->size();
output_consumers_.resize(dag_size);
blocking_dependencies_.resize(dag_size);
query_exec_state_.reset(new QueryExecutionState(dag_size));
workorders_container_.reset(new WorkOrdersContainer(dag_size, num_numa_nodes_));
for (dag_node_index node_index = 0; node_index < dag_size; ++node_index) {
const QueryContext::insert_destination_id insert_destination_index =
query_dag_->getNodePayload(node_index).getInsertDestinationID();
if (insert_destination_index != QueryContext::kInvalidInsertDestinationId) {
// Rebuild is necessary whenever InsertDestination is present.
query_exec_state_->setRebuildRequired(node_index);
query_exec_state_->setRebuildStatus(node_index, 0, false);
}
for (const pair<dag_node_index, bool> &dependent_link :
query_dag_->getDependents(node_index)) {
const dag_node_index dependent_op_index = dependent_link.first;
if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
// The link is not a pipeline-breaker. Streaming of blocks is possible
// between these two operators.
output_consumers_[node_index].push_back(dependent_op_index);
} else {
// The link is a pipeline-breaker. Streaming of blocks is not possible
// between these two operators.
blocking_dependencies_[dependent_op_index].push_back(node_index);
}
}
}
}
// TODO(harshad) : The default policy may execute remote WorkOrders for an
// operator with a lower index even when there are local WorkOrders available for
// an operator with higher index. We should examine if avoiding this behavior
// has any benefits with respect to execution time and/or memory pressure.
//
// Returns a heap-allocated WorkerMessage wrapping the next schedulable
// WorkOrder (caller takes ownership), or nullptr when nothing is schedulable.
// Policy: circular scan over operators starting at 'start_operator_index';
// for each unfinished operator, NUMA-local work is preferred when
// 'numa_node' is not -1, and normal WorkOrders are preferred over rebuilds.
WorkerMessage* Foreman::getNextWorkerMessage(
    const dag_node_index start_operator_index, const int numa_node) {
  const dag_node_index num_operators = query_dag_->size();
  dag_node_index index = start_operator_index;
  for (dag_node_index checked = 0; checked < num_operators; ++checked) {
    if (!query_exec_state_->hasExecutionFinished(index)) {
      if (numa_node != -1) {
        // Prefer work local to the worker's NUMA node: first a normal
        // WorkOrder, then a rebuild WorkOrder.
        WorkOrder *local_normal =
            workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
        if (local_normal != nullptr) {
          query_exec_state_->incrementNumQueuedWorkOrders(index);
          return WorkerMessage::WorkOrderMessage(local_normal, index);
        }
        WorkOrder *local_rebuild =
            workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
        if (local_rebuild != nullptr) {
          return WorkerMessage::RebuildWorkOrderMessage(local_rebuild, index);
        }
      }
      // Either no WorkOrder on the preferred NUMA node, or no NUMA
      // preference: take any normal WorkOrder for this operator, falling
      // back to any rebuild WorkOrder.
      WorkOrder *any_normal = workorders_container_->getNormalWorkOrder(index);
      if (any_normal != nullptr) {
        query_exec_state_->incrementNumQueuedWorkOrders(index);
        return WorkerMessage::WorkOrderMessage(any_normal, index);
      }
      WorkOrder *any_rebuild = workorders_container_->getRebuildWorkOrder(index);
      if (any_rebuild != nullptr) {
        return WorkerMessage::RebuildWorkOrderMessage(any_rebuild, index);
      }
    }
    index = (index + 1) % num_operators;
  }
  // No WorkOrders available right now.
  return nullptr;
}
// Sends 'message' (a normal or rebuild WorkOrder execution request) over the
// TMB to the worker thread at 'worker_thread_index'. Aborts on an unknown
// message type or a failed send.
void Foreman::sendWorkerMessage(const std::size_t worker_thread_index,
                                const WorkerMessage &message) {
  message_type_id type;
  if (message.getType() == WorkerMessage::kRebuildWorkOrder) {
    type = kRebuildWorkOrderMessage;
  } else if (message.getType() == WorkerMessage::kWorkOrder) {
    type = kWorkOrderMessage;
  } else {
    FATAL_ERROR("Invalid WorkerMessageType");
  }
  TaggedMessage worker_tagged_message(&message, sizeof(message), type);
  const tmb::MessageBus::SendStatus send_status =
      QueryExecutionUtil::SendTMBMessage(bus_,
                                         foreman_client_id_,
                                         workers_->getClientID(worker_thread_index),
                                         move(worker_tagged_message));
  // Bug fix: the recipient is a Worker, not another Foreman — the old error
  // message mislabeled the destination.
  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) <<
      "Message could not be sent from Foreman with TMB client ID "
      << foreman_client_id_ << " to Worker with TMB client ID "
      << workers_->getClientID(worker_thread_index);
}
// Polls the operator at 'index' for new normal WorkOrders, which the
// operator deposits into 'workorders_container_'. Returns true iff this call
// produced at least one new WorkOrder. Once an operator reports that it has
// finished generating WorkOrders, it is never polled again.
bool Foreman::fetchNormalWorkOrders(const dag_node_index index) {
  bool generated_new_workorders = false;
  if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) {
    // Do not fetch any work units until all blocking dependencies are met.
    // The relational operator is not aware of blocking dependencies for
    // uncorrelated scalar queries.
    if (!checkAllBlockingDependenciesMet(index)) {
      return false;
    }
    const size_t num_pending_workorders_before =
        workorders_container_->getNumNormalWorkOrders(index);
    // getAllWorkOrders() returns true once the operator is done generating.
    const bool done_generation =
        query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(),
                                                                   query_context_.get(),
                                                                   storage_manager_,
                                                                   foreman_client_id_,
                                                                   bus_);
    if (done_generation) {
      query_exec_state_->setDoneGenerationWorkOrders(index);
    }
    // TODO(shoban): It would be a good check to see if operator is making
    // useful progress, i.e., the operator either generates work orders to
    // execute or still has pending work orders executing. However, this will not
    // work if Foreman polls operators without feeding data. This check can be
    // enabled, if Foreman is refactored to call getAllWorkOrders() only when
    // pending work orders are completed or new input blocks feed.
    generated_new_workorders =
        (num_pending_workorders_before <
         workorders_container_->getNumNormalWorkOrders(index));
  }
  return generated_new_workorders;
}
// Drives the operator at 'index' toward completion: first tries to fetch new
// normal WorkOrders; if none were generated, checks whether normal execution
// (and, when required, the rebuild phase) has finished and marks the
// operator done accordingly. When 'recursively_check_dependents' is true,
// each dependent whose blocking dependencies are all met is processed the
// same way after this operator is marked finished.
void Foreman::processOperator(const dag_node_index index,
                              const bool recursively_check_dependents) {
  if (fetchNormalWorkOrders(index)) {
    // Fetched work orders. Return to wait for the generated work orders to
    // execute, and skip the execution-finished checks.
    return;
  }
  if (checkNormalExecutionOver(index)) {
    if (checkRebuildRequired(index)) {
      if (!checkRebuildInitiated(index)) {
        // Rebuild hasn't started, initiate it.
        if (initiateRebuild(index)) {
          // Rebuild initiated and completed right away.
          markOperatorFinished(index);
        } else {
          // Rebuild WorkOrders have been generated.
          return;
        }
      } else if (checkRebuildOver(index)) {
        // Rebuild had been initiated and it is over.
        markOperatorFinished(index);
      }
    } else {
      // Rebuild is not required and normal execution over, mark finished.
      markOperatorFinished(index);
    }
    // If we reach here, that means the operator has been marked as finished.
    if (recursively_check_dependents) {
      for (const pair<dag_node_index, bool> &dependent_link :
           query_dag_->getDependents(index)) {
        const dag_node_index dependent_op_index = dependent_link.first;
        if (checkAllBlockingDependenciesMet(dependent_op_index)) {
          processOperator(dependent_op_index, true);
        }
      }
    }
  }
}
// Records the operator at 'index' as finished, flushes its catalog updates,
// and notifies every dependent that its input feed has ended (and, when all
// of a dependent's blocking dependencies are now met, that it may start).
void Foreman::markOperatorFinished(const dag_node_index index) {
  query_exec_state_->setExecutionFinished(index);

  RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
  op->updateCatalogOnCompletion();

  const relation_id output_rel = op->getOutputRelationID();
  for (const pair<dag_node_index, bool> &link : query_dag_->getDependents(index)) {
    const dag_node_index dependent_index = link.first;
    RelationalOperator *dependent =
        query_dag_->getNodePayloadMutable(dependent_index);
    if (output_rel >= 0) {
      // No more input blocks of 'output_rel' will arrive from this operator.
      dependent->doneFeedingInputBlocks(output_rel);
    }
    if (checkAllBlockingDependenciesMet(dependent_index)) {
      dependent->informAllBlockingDependenciesMet();
    }
  }
}
// Kicks off the rebuild phase for the operator at 'index' by generating its
// rebuild WorkOrders. Returns true when no rebuild WorkOrders were produced,
// i.e. the rebuild is already complete.
bool Foreman::initiateRebuild(const dag_node_index index) {
  // A rebuild must be required, must not have started yet, and no rebuild
  // WorkOrders may already exist for this operator.
  DEBUG_ASSERT(!workorders_container_->hasRebuildWorkOrder(index));
  DEBUG_ASSERT(checkRebuildRequired(index));
  DEBUG_ASSERT(!checkRebuildInitiated(index));

  getRebuildWorkOrders(index, workorders_container_.get());

  query_exec_state_->setRebuildStatus(
      index, workorders_container_->getNumRebuildWorkOrders(index), true);

  return query_exec_state_->getNumRebuildWorkOrders(index) == 0;
}
void Foreman::getRebuildWorkOrders(const dag_node_index index, WorkOrdersContainer *container) {
const RelationalOperator &op = query_dag_->getNodePayload(index);
const QueryContext::insert_destination_id insert_destination_index = op.getInsertDestinationID();
if (insert_destination_index == QueryContext::kInvalidInsertDestinationId) {
return;
}
vector<MutableBlockReference> partially_filled_block_refs;
DCHECK(query_context_ != nullptr);
InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index);
DCHECK(insert_destination != nullptr);
insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
for (vector<MutableBlockReference>::size_type i = 0;
i < partially_filled_block_refs.size();
++i) {
container->addRebuildWorkOrder(
new RebuildWorkOrder(move(partially_filled_block_refs[i]),
index,
op.getOutputRelationID(),
foreman_client_id_,
bus_),
index);
}
}
} // namespace quickstep