| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| **/ |
| |
| #include "query_execution/ForemanDistributed.hpp" |
| |
| #include <cstddef> |
| #include <cstdio> |
| #include <cstdlib> |
| #include <memory> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include "catalog/Catalog.pb.h" |
| #include "catalog/CatalogDatabase.hpp" |
| #include "catalog/CatalogRelation.hpp" |
| #include "catalog/CatalogTypedefs.hpp" |
| #include "cli/Flags.hpp" |
| #include "query_execution/AdmitRequestMessage.hpp" |
| #include "query_execution/BlockLocator.hpp" |
| #include "query_execution/BlockLocatorUtil.hpp" |
| #include "query_execution/PolicyEnforcerBase.hpp" |
| #include "query_execution/PolicyEnforcerDistributed.hpp" |
| #include "query_execution/QueryContext.hpp" |
| #include "query_execution/QueryExecutionMessages.pb.h" |
| #include "query_execution/QueryExecutionTypedefs.hpp" |
| #include "query_execution/QueryExecutionUtil.hpp" |
| #include "query_execution/ShiftbossDirectory.hpp" |
| #include "relational_operators/WorkOrder.pb.h" |
| #include "storage/DataExchangerAsync.hpp" |
| #include "storage/StorageBlockInfo.hpp" |
| #include "storage/StorageManager.hpp" |
| #include "threading/ThreadUtil.hpp" |
| #include "utility/EqualsAnyConstant.hpp" |
| |
| #include "glog/logging.h" |
| |
| #include "tmb/address.h" |
| #include "tmb/id_typedefs.h" |
| #include "tmb/message_bus.h" |
| #include "tmb/message_style.h" |
| #include "tmb/tagged_message.h" |
| |
| using std::make_unique; |
| using std::move; |
| using std::size_t; |
| using std::unique_ptr; |
| using std::vector; |
| |
| using tmb::AnnotatedMessage; |
| using tmb::MessageBus; |
| using tmb::TaggedMessage; |
| using tmb::client_id; |
| |
| namespace quickstep { |
| |
| namespace S = serialization; |
| |
| class QueryHandle; |
| |
ForemanDistributed::ForemanDistributed(
    const BlockLocator &block_locator,
    MessageBus *bus,
    CatalogDatabaseLite *catalog_database,
    QueryProcessor *query_processor,
    const int cpu_id)
    : ForemanBase(bus, cpu_id),
      block_locator_(block_locator),
      catalog_database_(DCHECK_NOTNULL(catalog_database)) {
  // Message types this Foreman thread sends on the TMB.
  const std::vector<QueryExecutionMessageType> sender_message_types{
      kBlockDomainRegistrationMessage,
      kShiftbossRegistrationResponseMessage,
      kQueryInitiateMessage,
      kWorkOrderMessage,
      kInitiateRebuildMessage,
      kQueryTeardownMessage,
      kQueryExecutionSuccessMessage,
      kCommandResponseMessage,
      kPoisonMessage};

  for (const auto message_type : sender_message_types) {
    bus_->RegisterClientAsSender(foreman_client_id_, message_type);
  }

  // Message types this Foreman thread receives on the TMB.
  const std::vector<QueryExecutionMessageType> receiver_message_types{
      kBlockDomainRegistrationResponseMessage,
      kShiftbossRegistrationMessage,
      kAdmitRequestMessage,
      kQueryInitiateResponseMessage,
      kCatalogRelationNewBlockMessage,
      kDataPipelineMessage,
      kInitiateRebuildResponseMessage,
      kWorkOrderCompleteMessage,
      kRebuildWorkOrderCompleteMessage,
      kWorkOrderFeedbackMessage,
      kPoisonMessage};

  for (const auto message_type : receiver_message_types) {
    bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
  }

  // Obtain a block domain from the BlockLocator (also yielding the locator's
  // client id) and construct this node's StorageManager bound to that domain.
  client_id locator_client_id;
  storage_manager_ = make_unique<StorageManager>(
      FLAGS_storage_path,
      block_locator::getBlockDomain(data_exchanger_.network_address(), foreman_client_id_, &locator_client_id, bus_),
      locator_client_id, bus_);

  // Point the data exchanger at this node's StorageManager and start it
  // before any queries can be admitted.
  data_exchanger_.set_storage_manager(storage_manager_.get());
  data_exchanger_.start();

  policy_enforcer_ = make_unique<PolicyEnforcerDistributed>(
      foreman_client_id_, catalog_database_, query_processor, storage_manager_.get(), &shiftboss_directory_, bus_);
}
| |
void ForemanDistributed::run() {
  if (cpu_id_ >= 0) {
    // We can pin the foreman thread to a CPU if specified.
    ThreadUtil::BindToCPU(cpu_id_);
  }

  // Block until at least one Shiftboss has registered, so that scheduled
  // WorkOrders always have a destination.
  if (shiftboss_directory_.empty()) {
    const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
    const TaggedMessage &tagged_message = annotated_message.tagged_message;
    DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
    DLOG(INFO) << "ForemanDistributed received ShiftbossRegistrationMessage from Client " << annotated_message.sender;

    S::ShiftbossRegistrationMessage proto;
    CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));

    processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
    DCHECK_EQ(1u, shiftboss_directory_.size());
  }

  // Event loop.
  for (;;) {
    // Receive() causes this thread to sleep until next message is received.
    const AnnotatedMessage annotated_message =
        bus_->Receive(foreman_client_id_, 0, true);
    const TaggedMessage &tagged_message = annotated_message.tagged_message;
    const tmb::message_type_id message_type = tagged_message.message_type();
    DLOG(INFO) << "ForemanDistributed received " << QueryExecutionUtil::MessageTypeToString(message_type)
               << " from Client " << annotated_message.sender;
    switch (message_type) {
      case kShiftbossRegistrationMessage: {
        // A late-joining Shiftboss: add it to the directory and reply.
        S::ShiftbossRegistrationMessage proto;
        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));

        processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
        break;
      }
      case kAdmitRequestMessage: {
        // The payload is the in-memory AdmitRequestMessage object itself
        // (not a serialized proto), hence the direct cast.
        const AdmitRequestMessage *request_message =
            static_cast<const AdmitRequestMessage*>(tagged_message.message());

        if (!policy_enforcer_->admitQueries(request_message->getQueryHandles())) {
          LOG(WARNING) << "The scheduler could not admit all the queries";
        }
        break;
      }
      case kQueryInitiateResponseMessage: {
        // Parsed only to validate the payload; no further action is taken here.
        S::QueryInitiateResponseMessage proto;
        CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
        break;
      }
      case kCatalogRelationNewBlockMessage:  // Fall through
      case kDataPipelineMessage:
      case kRebuildWorkOrderCompleteMessage:
      case kWorkOrderCompleteMessage:
      case kWorkOrderFeedbackMessage: {
        // Progress/bookkeeping messages handled generically by the policy enforcer.
        policy_enforcer_->processMessage(tagged_message);
        break;
      }
      case kInitiateRebuildResponseMessage: {
        // A unique case in the distributed version.
        static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->
            processInitiateRebuildResponseMessage(tagged_message);
        break;
      }
      case kPoisonMessage: {
        if (policy_enforcer_->hasQueries()) {
          LOG(WARNING) << "ForemanDistributed thread exiting while some queries are "
                          "under execution or waiting to be admitted";
        }

        // Shutdown all Shiftbosses.
        tmb::Address shiftboss_addresses;
        for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
          shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
        }

        tmb::MessageStyle broadcast_style;
        broadcast_style.Broadcast(true);

        TaggedMessage poison_message(kPoisonMessage);

        const MessageBus::SendStatus send_status =
            bus_->Send(foreman_client_id_,
                       shiftboss_addresses,
                       broadcast_style,
                       move(poison_message));
        DCHECK(send_status == MessageBus::SendStatus::kOK);
        return;
      }
      default:
        LOG(FATAL) << "Unknown message type to ForemanDistributed";
    }

    // After handling a state-changing message, fetch any WorkOrders that have
    // become schedulable and dispatch them to Shiftbosses.
    if (canCollectNewMessages(message_type)) {
      vector<unique_ptr<S::WorkOrderMessage>> new_messages;
      static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->
          getWorkOrderProtoMessages(&new_messages);
      dispatchWorkOrderMessages(new_messages);
    }
  }
}
| |
| bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) { |
| return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type, |
| kCatalogRelationNewBlockMessage, |
| kWorkOrderFeedbackMessage); |
| } |
| |
// Returns true if 'proto' carries an aggregation-related WorkOrder, in which
// case '*shiftboss_index_for_aggregation' is set (via the policy enforcer) to
// the Shiftboss that should execute it; returns false for all other types.
bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage &proto,
                                                       const size_t next_shiftboss_index_to_schedule,
                                                       size_t *shiftboss_index_for_aggregation) {
  const S::WorkOrder &work_order_proto = proto.work_order();
  // Filled in per work-order type below; 'block' stays kInvalidBlockId for
  // types that carry no input-block hint.
  QueryContext::aggregation_state_id aggr_state_index;
  partition_id part_id;
  vector<QueryContext::lip_filter_id> lip_filter_indexes;
  block_id block = kInvalidBlockId;

  switch (work_order_proto.work_order_type()) {
    case S::AGGREGATION:
      aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
      part_id = work_order_proto.GetExtension(S::AggregationWorkOrder::partition_id);

      // Copy out the repeated lip_filter_indexes extension.
      for (int i = 0; i < work_order_proto.ExtensionSize(S::AggregationWorkOrder::lip_filter_indexes); ++i) {
        lip_filter_indexes.push_back(work_order_proto.GetExtension(S::AggregationWorkOrder::lip_filter_indexes, i));
      }

      block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id);
      break;
    case S::BUILD_AGGREGATION_EXISTENCE_MAP:
      aggr_state_index = work_order_proto.GetExtension(S::BuildAggregationExistenceMapWorkOrder::aggr_state_index);
      part_id = work_order_proto.GetExtension(S::BuildAggregationExistenceMapWorkOrder::partition_id);
      break;
    case S::INITIALIZE_AGGREGATION:
      aggr_state_index = work_order_proto.GetExtension(S::InitializeAggregationWorkOrder::aggr_state_index);
      part_id = work_order_proto.GetExtension(S::InitializeAggregationWorkOrder::partition_id);
      break;
    case S::FINALIZE_AGGREGATION:
      aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index);
      part_id = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::partition_id);
      break;
    case S::DESTROY_AGGREGATION_STATE:
      aggr_state_index = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::aggr_state_index);
      part_id = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::partition_id);
      break;
    default:
      // Not aggregation-related.
      return false;
  }

  // Delegate the placement decision to the policy enforcer, which keys it on
  // the query's aggregation state and partition.
  static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation(
      proto.query_id(), aggr_state_index, part_id, lip_filter_indexes, block_locator_, block,
      next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation);

  return true;
}
| |
| bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &proto, |
| const size_t next_shiftboss_index_to_schedule, |
| size_t *shiftboss_index_for_hash_join) { |
| const S::WorkOrder &work_order_proto = proto.work_order(); |
| QueryContext::join_hash_table_id join_hash_table_index; |
| partition_id part_id; |
| vector<QueryContext::lip_filter_id> lip_filter_indexes; |
| block_id block = kInvalidBlockId; |
| |
| switch (work_order_proto.work_order_type()) { |
| case S::BUILD_HASH: |
| join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index); |
| part_id = work_order_proto.GetExtension(S::BuildHashWorkOrder::partition_id); |
| |
| for (int i = 0; i < work_order_proto.ExtensionSize(S::BuildHashWorkOrder::lip_filter_indexes); ++i) { |
| lip_filter_indexes.push_back(work_order_proto.GetExtension(S::BuildHashWorkOrder::lip_filter_indexes, i)); |
| } |
| |
| block = work_order_proto.GetExtension(S::BuildHashWorkOrder::block_id); |
| break; |
| case S::HASH_JOIN: |
| join_hash_table_index = work_order_proto.GetExtension(S::HashJoinWorkOrder::join_hash_table_index); |
| part_id = work_order_proto.GetExtension(S::HashJoinWorkOrder::partition_id); |
| break; |
| case S::DESTROY_HASH: |
| join_hash_table_index = work_order_proto.GetExtension(S::DestroyHashWorkOrder::join_hash_table_index); |
| part_id = work_order_proto.GetExtension(S::DestroyHashWorkOrder::partition_id); |
| break; |
| default: |
| return false; |
| } |
| |
| static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForHashJoin( |
| proto.query_id(), join_hash_table_index, part_id, lip_filter_indexes, block_locator_, block, |
| next_shiftboss_index_to_schedule, shiftboss_index_for_hash_join); |
| |
| return true; |
| } |
| |
| bool ForemanDistributed::isLipRelatedWorkOrder(const S::WorkOrderMessage &proto, |
| const size_t next_shiftboss_index_to_schedule, |
| size_t *shiftboss_index_for_lip) { |
| const S::WorkOrder &work_order_proto = proto.work_order(); |
| vector<QueryContext::lip_filter_id> lip_filter_indexes; |
| partition_id part_id; |
| block_id block = kInvalidBlockId; |
| |
| switch (work_order_proto.work_order_type()) { |
| case S::BUILD_LIP_FILTER: |
| for (int i = 0; i < work_order_proto.ExtensionSize(S::BuildLIPFilterWorkOrder::lip_filter_indexes); ++i) { |
| lip_filter_indexes.push_back(work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::lip_filter_indexes, i)); |
| } |
| part_id = work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::partition_id); |
| block = work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::build_block_id); |
| break; |
| case S::SELECT: |
| for (int i = 0; i < work_order_proto.ExtensionSize(S::SelectWorkOrder::lip_filter_indexes); ++i) { |
| lip_filter_indexes.push_back(work_order_proto.GetExtension(S::SelectWorkOrder::lip_filter_indexes, i)); |
| } |
| part_id = work_order_proto.GetExtension(S::SelectWorkOrder::partition_id); |
| block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id); |
| break; |
| default: |
| return false; |
| } |
| |
| static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForLip( |
| proto.query_id(), lip_filter_indexes, part_id, block_locator_, block, next_shiftboss_index_to_schedule, |
| shiftboss_index_for_lip); |
| |
| return true; |
| } |
| |
| namespace { |
| |
| constexpr size_t kDefaultShiftbossIndex = 0u; |
| |
| bool hasBlockLocalityInfo(const serialization::WorkOrder &work_order_proto, |
| const BlockLocator &block_locator, |
| std::size_t *shiftboss_index_for_block) { |
| block_id block = kInvalidBlockId; |
| switch (work_order_proto.work_order_type()) { |
| case S::SAVE_BLOCKS: { |
| block = work_order_proto.GetExtension(S::SaveBlocksWorkOrder::block_id); |
| break; |
| } |
| case S::SORT_RUN_GENERATION: { |
| block = work_order_proto.GetExtension(S::SortRunGenerationWorkOrder::block_id); |
| break; |
| } |
| default: |
| return false; |
| } |
| |
| DCHECK_NE(block, kInvalidBlockId); |
| return block_locator.getBlockLocalityInfo(block, shiftboss_index_for_block); |
| } |
| |
| } // namespace |
| |
| void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) { |
| static size_t shiftboss_index = kDefaultShiftbossIndex; |
| |
| PolicyEnforcerDistributed* policy_enforcer_dist = static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get()); |
| for (const auto &message : messages) { |
| DCHECK(message != nullptr); |
| const S::WorkOrderMessage &proto = *message; |
| const S::WorkOrder &work_order_proto = proto.work_order(); |
| size_t shiftboss_index_for_particular_work_order_type; |
| if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) { |
| // Always schedule the single-node query to the same Shiftboss. |
| shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex; |
| } else if (isLipRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { |
| } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { |
| } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { |
| } else if (work_order_proto.work_order_type() == S::NESTED_LOOP_JOIN) { |
| static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForNestedLoopsJoin( |
| proto.query_id(), work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::nested_loops_join_index), |
| work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::partition_id), block_locator_, |
| work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::left_block_id), |
| work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::right_block_id), |
| shiftboss_index, &shiftboss_index_for_particular_work_order_type); |
| } else if (hasBlockLocalityInfo(work_order_proto, block_locator_, |
| &shiftboss_index_for_particular_work_order_type)) { |
| } else { |
| shiftboss_index_for_particular_work_order_type = shiftboss_index; |
| } |
| |
| sendWorkOrderMessage(shiftboss_index_for_particular_work_order_type, proto); |
| shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type); |
| |
| if (shiftboss_index == shiftboss_index_for_particular_work_order_type && |
| shiftboss_directory_.hasReachedCapacity(shiftboss_index)) { |
| shiftboss_index = (shiftboss_index + 1) % shiftboss_directory_.size(); |
| } else { |
| // NOTE(zuyu): This is not the exact round-robin scheduling, as in this case, |
| // <shiftboss_index_for_particular_work_order_type> might be scheduled one |
| // more WorkOrder for an Aggregation or a HashJoin. |
| } |
| } |
| } |
| |
| void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index, |
| const S::WorkOrderMessage &proto) { |
| const size_t proto_length = proto.ByteSize(); |
| char *proto_bytes = static_cast<char*>(malloc(proto_length)); |
| CHECK(proto.SerializeToArray(proto_bytes, proto_length)); |
| |
| TaggedMessage message(static_cast<const void*>(proto_bytes), |
| proto_length, |
| kWorkOrderMessage); |
| free(proto_bytes); |
| |
| const client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index); |
| DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage to Shiftboss with Client " << shiftboss_client_id; |
| const MessageBus::SendStatus send_status = |
| QueryExecutionUtil::SendTMBMessage(bus_, |
| foreman_client_id_, |
| shiftboss_client_id, |
| move(message)); |
| CHECK(send_status == MessageBus::SendStatus::kOK); |
| } |
| |
| void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id, |
| std::FILE *out) const { |
| const std::vector<WorkOrderTimeEntry> &recorded_times = |
| policy_enforcer_->getProfilingResults(query_id); |
| fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out); |
| for (const auto &workorder_entry : recorded_times) { |
| const std::size_t worker_id = workorder_entry.worker_id; |
| fprintf(out, |
| "%lu,%lu,%lu,%lu\n", |
| query_id, |
| worker_id, |
| workorder_entry.operator_id, // Operator ID. |
| workorder_entry.end_time - workorder_entry.start_time); // Time. |
| } |
| } |
| |
| void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shiftboss_client_id, |
| const std::size_t work_order_capacity) { |
| S::ShiftbossRegistrationResponseMessage proto; |
| proto.set_shiftboss_index(shiftboss_directory_.size()); |
| proto.mutable_catalog_database()->MergeFrom(static_cast<CatalogDatabase*>(catalog_database_)->getProto()); |
| |
| const size_t proto_length = proto.ByteSize(); |
| char *proto_bytes = static_cast<char*>(malloc(proto_length)); |
| CHECK(proto.SerializeToArray(proto_bytes, proto_length)); |
| |
| TaggedMessage message(static_cast<const void*>(proto_bytes), |
| proto_length, |
| kShiftbossRegistrationResponseMessage); |
| free(proto_bytes); |
| |
| shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity); |
| |
| DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage to Shiftboss with Client " |
| << shiftboss_client_id; |
| const MessageBus::SendStatus send_status = |
| QueryExecutionUtil::SendTMBMessage(bus_, |
| foreman_client_id_, |
| shiftboss_client_id, |
| move(message)); |
| CHECK(send_status == MessageBus::SendStatus::kOK); |
| } |
| |
| } // namespace quickstep |