| /** |
| * Copyright 2011-2015 Quickstep Technologies LLC. |
| * Copyright 2015-2016 Pivotal Software, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| **/ |
| |
| #include "query_execution/Foreman.hpp" |
| |
| #include <cstddef> |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| |
| #include "catalog/CatalogDatabase.hpp" |
| #include "catalog/CatalogRelation.hpp" |
| #include "catalog/CatalogTypedefs.hpp" |
| #include "catalog/PartitionScheme.hpp" |
| #include "query_execution/QueryContext.hpp" |
| #include "query_execution/QueryExecutionMessages.pb.h" |
| #include "query_execution/QueryExecutionTypedefs.hpp" |
| #include "query_execution/QueryExecutionUtil.hpp" |
| #include "query_execution/WorkerDirectory.hpp" |
| #include "query_execution/WorkerMessage.hpp" |
| #include "relational_operators/RebuildWorkOrder.hpp" |
| #include "relational_operators/RelationalOperator.hpp" |
| #include "relational_operators/WorkOrder.hpp" |
| #include "storage/InsertDestination.hpp" |
| #include "storage/StorageBlock.hpp" |
| #include "storage/StorageBlockInfo.hpp" |
| #include "threading/ThreadUtil.hpp" |
| #include "utility/Macros.hpp" |
| |
| #include "glog/logging.h" |
| |
| #include "tmb/message_bus.h" |
| #include "tmb/tagged_message.h" |
| |
| using std::move; |
| using std::pair; |
| using std::size_t; |
| using std::vector; |
| |
| namespace quickstep { |
| |
| void Foreman::initialize() { |
| if (cpu_id_ >= 0) { |
| // We can pin the foreman thread to a CPU if specified. |
| ThreadUtil::BindToCPU(cpu_id_); |
| } |
| initializeState(); |
| |
| DEBUG_ASSERT(query_dag_ != nullptr); |
| const dag_node_index dag_size = query_dag_->size(); |
| |
| // Collect all the workorders from all the relational operators in the DAG. |
| for (dag_node_index index = 0; index < dag_size; ++index) { |
| if (checkAllBlockingDependenciesMet(index)) { |
| query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet(); |
| processOperator(index, false); |
| } |
| } |
| |
| // Dispatch the WorkOrders generated so far. |
| dispatchWorkerMessages(0, 0); |
| } |
| |
| void Foreman::processWorkOrderCompleteMessage(const dag_node_index op_index, |
| const size_t worker_id) { |
| query_exec_state_->decrementNumQueuedWorkOrders(op_index); |
| |
| // As the given worker finished executing a WorkOrder, decrement its number |
| // of queued WorkOrders. |
| workers_->decrementNumQueuedWorkOrders(worker_id); |
| |
| // Check if new work orders are available and fetch them if so. |
| fetchNormalWorkOrders(op_index); |
| |
| if (checkRebuildRequired(op_index)) { |
| if (checkNormalExecutionOver(op_index)) { |
| if (!checkRebuildInitiated(op_index)) { |
| if (initiateRebuild(op_index)) { |
| // Rebuild initiated and completed right away. |
| markOperatorFinished(op_index); |
| } else { |
| // Rebuild under progress. |
| } |
| } else if (checkRebuildOver(op_index)) { |
| // Rebuild was under progress and now it is over. |
| markOperatorFinished(op_index); |
| } |
| } else { |
| // Normal execution under progress for this operator. |
| } |
| } else if (checkOperatorExecutionOver(op_index)) { |
| // Rebuild not required for this operator and its normal execution is |
| // complete. |
| markOperatorFinished(op_index); |
| } |
| |
| for (const pair<dag_node_index, bool> &dependent_link : |
| query_dag_->getDependents(op_index)) { |
| const dag_node_index dependent_op_index = dependent_link.first; |
| if (checkAllBlockingDependenciesMet(dependent_op_index)) { |
| // Process the dependent operator (of the operator whose WorkOrder |
| // was just executed) for which all the dependencies have been met. |
| processOperator(dependent_op_index, true); |
| } |
| } |
| |
| // Dispatch the WorkerMessages to the workers. We prefer to start the search |
| // for the schedulable WorkOrders beginning from 'op_index'. The first |
| // candidate worker to receive the next WorkOrder is the one that sent the |
| // response message to Foreman. |
| dispatchWorkerMessages(worker_id, op_index); |
| } |
| |
| void Foreman::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index, |
| const size_t worker_id) { |
| query_exec_state_->decrementNumRebuildWorkOrders(op_index); |
| workers_->decrementNumQueuedWorkOrders(worker_id); |
| |
| if (checkRebuildOver(op_index)) { |
| markOperatorFinished(op_index); |
| |
| for (const pair<dag_node_index, bool> &dependent_link : |
| query_dag_->getDependents(op_index)) { |
| const dag_node_index dependent_op_index = dependent_link.first; |
| if (checkAllBlockingDependenciesMet(dependent_op_index)) { |
| processOperator(dependent_op_index, true); |
| } |
| } |
| } |
| |
| // Dispatch the WorkerMessages to the workers. We prefer to start the search |
| // for the schedulable WorkOrders beginning from 'op_index'. The first |
| // candidate worker to receive the next WorkOrder is the one that sent the |
| // response message to Foreman. |
| dispatchWorkerMessages(worker_id, op_index); |
| } |
| |
| void Foreman::processDataPipelineMessage(const dag_node_index op_index, |
| const block_id block, |
| const relation_id rel_id) { |
| for (const dag_node_index consumer_index : |
| output_consumers_[op_index]) { |
| // Feed the streamed block to the consumer. Note that 'output_consumers_' |
| // only contain those dependents of operator with index = op_index which are |
| // eligible to receive streamed input. |
| query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id); |
| // Because of the streamed input just fed, check if there are any new |
| // WorkOrders available and if so, fetch them. |
| fetchNormalWorkOrders(consumer_index); |
| } |
| |
| // Dispatch the WorkerMessages to the workers. We prefer to start the search |
| // for the schedulable WorkOrders beginning from 'op_index'. The first |
| // candidate worker to receive the next WorkOrder is the one that sent the |
| // response message to Foreman. |
| // TODO(zuyu): Improve the data locality for the next WorkOrder. |
| dispatchWorkerMessages(0, op_index); |
| } |
| |
| void Foreman::processFeedbackMessage(const WorkOrder::FeedbackMessage &msg) { |
| RelationalOperator *op = |
| query_dag_->getNodePayloadMutable(msg.header().rel_op_index); |
| op->receiveFeedbackMessage(msg); |
| } |
| |
| void Foreman::run() { |
| // Initialize before for Foreman eventloop. |
| initialize(); |
| |
| // Event loop |
| while (!query_exec_state_->hasQueryExecutionFinished()) { |
| // Receive() causes this thread to sleep until next message is received. |
| AnnotatedMessage annotated_msg = bus_->Receive(foreman_client_id_, 0, true); |
| const TaggedMessage &tagged_message = annotated_msg.tagged_message; |
| switch (tagged_message.message_type()) { |
| case kWorkOrderCompleteMessage: { |
| serialization::WorkOrderCompletionMessage proto; |
| CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); |
| |
| processWorkOrderCompleteMessage(proto.operator_index(), proto.worker_id()); |
| break; |
| } |
| case kRebuildWorkOrderCompleteMessage: { |
| serialization::WorkOrderCompletionMessage proto; |
| CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); |
| |
| processRebuildWorkOrderCompleteMessage(proto.operator_index(), proto.worker_id()); |
| break; |
| } |
| case kCatalogRelationNewBlockMessage: { |
| serialization::CatalogRelationNewBlockMessage proto; |
| CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); |
| |
| const block_id block = proto.block_id(); |
| |
| CatalogRelation *relation = |
| static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id()); |
| relation->addBlock(block); |
| |
| if (proto.has_partition_id()) { |
| relation->getPartitionSchemeMutable()->addBlockToPartition(proto.partition_id(), block); |
| } |
| break; |
| } |
| case kDataPipelineMessage: { |
| // Possible message senders include InsertDestinations and some |
| // operators which modify existing blocks. |
| serialization::DataPipelineMessage proto; |
| CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); |
| |
| processDataPipelineMessage(proto.operator_index(), proto.block_id(), proto.relation_id()); |
| break; |
| } |
| case kWorkOrdersAvailableMessage: { |
| serialization::WorkOrdersAvailableMessage proto; |
| CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); |
| |
| const dag_node_index op_index = proto.operator_index(); |
| |
| // Check if new work orders are available. |
| fetchNormalWorkOrders(op_index); |
| |
| // Dispatch the WorkerMessages to the workers. We prefer to start the search |
| // for the schedulable WorkOrders beginning from 'op_index'. The first |
| // candidate worker to receive the next WorkOrder is the one that sent the |
| // response message to Foreman. |
| // TODO(zuyu): Improve the data locality for the next WorkOrder. |
| dispatchWorkerMessages(0, op_index); |
| break; |
| } |
| case kWorkOrderFeedbackMessage: { |
| WorkOrder::FeedbackMessage msg(const_cast<void *>(tagged_message.message()), |
| tagged_message.message_bytes()); |
| processFeedbackMessage(msg); |
| break; |
| } |
| default: |
| LOG(FATAL) << "Unknown message type to Foreman"; |
| } |
| } |
| |
| // Clean up before exiting. |
| cleanUp(); |
| } |
| |
| void Foreman::dispatchWorkerMessages( |
| const size_t start_worker_index, |
| const dag_node_index start_operator_index) { |
| // Loop over all workers. Stopping criteria: |
| // 1. Every worker has been assigned exactly max_msgs_per_worker_ workorders. |
| // OR 2. No schedulable workorders at this time. |
| size_t done_workers_count = 0; |
| for (size_t curr_worker = start_worker_index; |
| done_workers_count < workers_->getNumWorkers(); |
| curr_worker = (curr_worker + 1) % workers_->getNumWorkers()) { |
| if (workers_->getNumQueuedWorkOrders(curr_worker) < max_msgs_per_worker_) { |
| std::unique_ptr<WorkerMessage> msg; |
| msg.reset(getNextWorkerMessage( |
| start_operator_index, workers_->getNUMANode(curr_worker))); |
| if (msg.get() != nullptr) { |
| sendWorkerMessage(curr_worker, *msg); |
| workers_->incrementNumQueuedWorkOrders(curr_worker); |
| } else { |
| // No schedulable workorder at this point. |
| ++done_workers_count; |
| } |
| } else { |
| // curr_worker already has been assigned max_msgs_per_worker workorders. |
| ++done_workers_count; |
| } |
| } |
| } |
| |
| void Foreman::initializeState() { |
| const dag_node_index dag_size = query_dag_->size(); |
| |
| output_consumers_.resize(dag_size); |
| blocking_dependencies_.resize(dag_size); |
| |
| query_exec_state_.reset(new QueryExecutionState(dag_size)); |
| workorders_container_.reset(new WorkOrdersContainer(dag_size, num_numa_nodes_)); |
| |
| for (dag_node_index node_index = 0; node_index < dag_size; ++node_index) { |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_dag_->getNodePayload(node_index).getInsertDestinationID(); |
| if (insert_destination_index != QueryContext::kInvalidInsertDestinationId) { |
| // Rebuild is necessary whenever InsertDestination is present. |
| query_exec_state_->setRebuildRequired(node_index); |
| query_exec_state_->setRebuildStatus(node_index, 0, false); |
| } |
| |
| for (const pair<dag_node_index, bool> &dependent_link : |
| query_dag_->getDependents(node_index)) { |
| const dag_node_index dependent_op_index = dependent_link.first; |
| if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) { |
| // The link is not a pipeline-breaker. Streaming of blocks is possible |
| // between these two operators. |
| output_consumers_[node_index].push_back(dependent_op_index); |
| } else { |
| // The link is a pipeline-breaker. Streaming of blocks is not possible |
| // between these two operators. |
| blocking_dependencies_[dependent_op_index].push_back(node_index); |
| } |
| } |
| } |
| } |
| |
| // TODO(harshad) : The default policy may execute remote WorkOrders for an |
| // operator with a lower index even when there are local WorkOrders available for |
| // an operator with higher index. We should examine if avoiding this behavior |
| // has any benefits with respect to execution time and/or memory pressure. |
| WorkerMessage* Foreman::getNextWorkerMessage( |
| const dag_node_index start_operator_index, const int numa_node) { |
| // Default policy: Operator with lowest index first. |
| WorkOrder *work_order = nullptr; |
| size_t num_operators_checked = 0; |
| for (dag_node_index index = start_operator_index; |
| num_operators_checked < query_dag_->size(); |
| index = (index + 1) % query_dag_->size(), ++num_operators_checked) { |
| if (query_exec_state_->hasExecutionFinished(index)) { |
| continue; |
| } |
| if (numa_node != -1) { |
| // First try to get a normal WorkOrder from the specified NUMA node. |
| work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node); |
| if (work_order != nullptr) { |
| // A WorkOrder found on the given NUMA node. |
| query_exec_state_->incrementNumQueuedWorkOrders(index); |
| return WorkerMessage::WorkOrderMessage(work_order, index); |
| } else { |
| // Normal workorder not found on this node. Look for a rebuild workorder |
| // on this NUMA node. |
| work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node); |
| if (work_order != nullptr) { |
| return WorkerMessage::RebuildWorkOrderMessage(work_order, index); |
| } |
| } |
| } |
| // Either no workorder found on the given NUMA node, or numa_node is -1. |
| // Try to get a normal WorkOrder from other NUMA nodes. |
| work_order = workorders_container_->getNormalWorkOrder(index); |
| if (work_order != nullptr) { |
| query_exec_state_->incrementNumQueuedWorkOrders(index); |
| return WorkerMessage::WorkOrderMessage(work_order, index); |
| } else { |
| // Normal WorkOrder not found, look for a RebuildWorkOrder. |
| work_order = workorders_container_->getRebuildWorkOrder(index); |
| if (work_order != nullptr) { |
| return WorkerMessage::RebuildWorkOrderMessage(work_order, index); |
| } |
| } |
| } |
| // No WorkOrders available right now. |
| return nullptr; |
| } |
| |
| void Foreman::sendWorkerMessage(const std::size_t worker_id, |
| const WorkerMessage &message) { |
| message_type_id type; |
| if (message.getType() == WorkerMessage::kRebuildWorkOrder) { |
| type = kRebuildWorkOrderMessage; |
| } else if (message.getType() == WorkerMessage::kWorkOrder) { |
| type = kWorkOrderMessage; |
| } else { |
| FATAL_ERROR("Invalid WorkerMessageType"); |
| } |
| TaggedMessage worker_tagged_message(&message, sizeof(message), type); |
| |
| const tmb::MessageBus::SendStatus send_status = |
| QueryExecutionUtil::SendTMBMessage(bus_, |
| foreman_client_id_, |
| workers_->getClientID(worker_id), |
| move(worker_tagged_message)); |
| CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << |
| "Message could not be sent from Foreman with TMB client ID " |
| << foreman_client_id_ << " to Foreman with TMB client ID " |
| << workers_->getClientID(worker_id); |
| } |
| |
| bool Foreman::fetchNormalWorkOrders(const dag_node_index index) { |
| bool generated_new_workorders = false; |
| if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) { |
| // Do not fetch any work units until all blocking dependencies are met. |
| // The releational operator is not aware of blocking dependencies for |
| // uncorrelated scalar queries. |
| if (!checkAllBlockingDependenciesMet(index)) { |
| return false; |
| } |
| const size_t num_pending_workorders_before = |
| workorders_container_->getNumNormalWorkOrders(index); |
| const bool done_generation = |
| query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(), |
| query_context_.get(), |
| storage_manager_, |
| foreman_client_id_, |
| agent_client_id_, |
| bus_); |
| if (done_generation) { |
| query_exec_state_->setDoneGenerationWorkOrders(index); |
| } |
| |
| // TODO(shoban): It would be a good check to see if operator is making |
| // useful progress, i.e., the operator either generates work orders to |
| // execute or still has pending work orders executing. However, this will not |
| // work if Foreman polls operators without feeding data. This check can be |
| // enabled, if Foreman is refactored to call getAllWorkOrders() only when |
| // pending work orders are completed or new input blocks feed. |
| |
| generated_new_workorders = |
| (num_pending_workorders_before < |
| workorders_container_->getNumNormalWorkOrders(index)); |
| } |
| return generated_new_workorders; |
| } |
| |
| void Foreman::processOperator(const dag_node_index index, |
| const bool recursively_check_dependents) { |
| if (fetchNormalWorkOrders(index)) { |
| // Fetched work orders. Return to wait for the generated work orders to |
| // execute, and skip the execution-finished checks. |
| return; |
| } |
| |
| if (checkNormalExecutionOver(index)) { |
| if (checkRebuildRequired(index)) { |
| if (!checkRebuildInitiated(index)) { |
| // Rebuild hasn't started, initiate it. |
| if (initiateRebuild(index)) { |
| // Rebuild initiated and completed right away. |
| markOperatorFinished(index); |
| } else { |
| // Rebuild WorkOrders have been generated. |
| return; |
| } |
| } else if (checkRebuildOver(index)) { |
| // Rebuild had been initiated and it is over. |
| markOperatorFinished(index); |
| } |
| } else { |
| // Rebuild is not required and normal execution over, mark finished. |
| markOperatorFinished(index); |
| } |
| // If we reach here, that means the operator has been marked as finished. |
| if (recursively_check_dependents) { |
| for (const pair<dag_node_index, bool> &dependent_link : |
| query_dag_->getDependents(index)) { |
| const dag_node_index dependent_op_index = dependent_link.first; |
| if (checkAllBlockingDependenciesMet(dependent_op_index)) { |
| processOperator(dependent_op_index, true); |
| } |
| } |
| } |
| } |
| } |
| |
| void Foreman::markOperatorFinished(const dag_node_index index) { |
| query_exec_state_->setExecutionFinished(index); |
| |
| RelationalOperator *op = query_dag_->getNodePayloadMutable(index); |
| op->updateCatalogOnCompletion(); |
| |
| const relation_id output_rel = op->getOutputRelationID(); |
| for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) { |
| const dag_node_index dependent_op_index = dependent_link.first; |
| RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index); |
| // Signal dependent operator that current operator is done feeding input blocks. |
| if (output_rel >= 0) { |
| dependent_op->doneFeedingInputBlocks(output_rel); |
| } |
| if (checkAllBlockingDependenciesMet(dependent_op_index)) { |
| dependent_op->informAllBlockingDependenciesMet(); |
| } |
| } |
| } |
| |
| bool Foreman::initiateRebuild(const dag_node_index index) { |
| DEBUG_ASSERT(!workorders_container_->hasRebuildWorkOrder(index)); |
| DEBUG_ASSERT(checkRebuildRequired(index)); |
| DEBUG_ASSERT(!checkRebuildInitiated(index)); |
| |
| getRebuildWorkOrders(index, workorders_container_.get()); |
| |
| query_exec_state_->setRebuildStatus( |
| index, workorders_container_->getNumRebuildWorkOrders(index), true); |
| |
| return (query_exec_state_->getNumRebuildWorkOrders(index) == 0); |
| } |
| |
| void Foreman::getRebuildWorkOrders(const dag_node_index index, WorkOrdersContainer *container) { |
| const RelationalOperator &op = query_dag_->getNodePayload(index); |
| const QueryContext::insert_destination_id insert_destination_index = op.getInsertDestinationID(); |
| |
| if (insert_destination_index == QueryContext::kInvalidInsertDestinationId) { |
| return; |
| } |
| |
| vector<MutableBlockReference> partially_filled_block_refs; |
| |
| DCHECK(query_context_ != nullptr); |
| InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index); |
| DCHECK(insert_destination != nullptr); |
| |
| insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs); |
| |
| for (vector<MutableBlockReference>::size_type i = 0; |
| i < partially_filled_block_refs.size(); |
| ++i) { |
| container->addRebuildWorkOrder( |
| new RebuildWorkOrder(move(partially_filled_block_refs[i]), |
| index, |
| op.getOutputRelationID(), |
| foreman_client_id_, |
| agent_client_id_, |
| bus_), |
| index); |
| } |
| } |
| |
| } // namespace quickstep |