/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "query_execution/ForemanDistributed.hpp"
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "catalog/Catalog.pb.h"
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "cli/Flags.hpp"
#include "query_execution/AdmitRequestMessage.hpp"
#include "query_execution/BlockLocator.hpp"
#include "query_execution/BlockLocatorUtil.hpp"
#include "query_execution/PolicyEnforcerBase.hpp"
#include "query_execution/PolicyEnforcerDistributed.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/ShiftbossDirectory.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/DataExchangerAsync.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "threading/ThreadUtil.hpp"
#include "utility/EqualsAnyConstant.hpp"
#include "glog/logging.h"
#include "tmb/address.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/message_style.h"
#include "tmb/tagged_message.h"
using std::make_unique;
using std::move;
using std::size_t;
using std::unique_ptr;
using std::vector;
using tmb::AnnotatedMessage;
using tmb::MessageBus;
using tmb::TaggedMessage;
using tmb::client_id;
namespace quickstep {
namespace S = serialization;
class QueryHandle;
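// Set up the distributed Foreman: register it with the TMB as a sender and
// receiver for the message types it exchanges with Shiftbosses, create a local
// StorageManager backed by the BlockLocator, start the data exchanger, and
// construct the distributed policy enforcer.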
ForemanDistributed::ForemanDistributed(
const BlockLocator &block_locator,
MessageBus *bus,
CatalogDatabaseLite *catalog_database,
QueryProcessor *query_processor,
const int cpu_id)
: ForemanBase(bus, cpu_id),
block_locator_(block_locator),
catalog_database_(DCHECK_NOTNULL(catalog_database)) {
const std::vector<QueryExecutionMessageType> sender_message_types{
kBlockDomainRegistrationMessage,
kShiftbossRegistrationResponseMessage,
kQueryInitiateMessage,
kWorkOrderMessage,
kInitiateRebuildMessage,
kQueryTeardownMessage,
kQueryExecutionSuccessMessage,
kCommandResponseMessage,
kPoisonMessage};
for (const auto message_type : sender_message_types) {
bus_->RegisterClientAsSender(foreman_client_id_, message_type);
}
const std::vector<QueryExecutionMessageType> receiver_message_types{
kBlockDomainRegistrationResponseMessage,
kShiftbossRegistrationMessage,
kAdmitRequestMessage,
kQueryInitiateResponseMessage,
kCatalogRelationNewBlockMessage,
kDataPipelineMessage,
kInitiateRebuildResponseMessage,
kWorkOrderCompleteMessage,
kRebuildWorkOrderCompleteMessage,
kWorkOrderFeedbackMessage,
kPoisonMessage};
for (const auto message_type : receiver_message_types) {
bus_->RegisterClientAsReceiver(foreman_client_id_, message_type);
}
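// Register a block domain for this Foreman with the BlockLocator, and create a
// local StorageManager that uses it.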
client_id locator_client_id;
storage_manager_ = make_unique<StorageManager>(
FLAGS_storage_path,
block_locator::getBlockDomain(data_exchanger_.network_address(), foreman_client_id_, &locator_client_id, bus_),
locator_client_id, bus_);
data_exchanger_.set_storage_manager(storage_manager_.get());
data_exchanger_.start();
policy_enforcer_ = make_unique<PolicyEnforcerDistributed>(
foreman_client_id_, catalog_database_, query_processor, storage_manager_.get(), &shiftboss_directory_, bus_);
}
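// Main event loop of the Foreman thread: wait for the first Shiftboss to
// register, then repeatedly receive messages, delegate them to the
// PolicyEnforcer, and dispatch newly available WorkOrders to Shiftbosses.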
void ForemanDistributed::run() {
if (cpu_id_ >= 0) {
// We can pin the foreman thread to a CPU if specified.
ThreadUtil::BindToCPU(cpu_id_);
}
// Ensure that at least one Shiftboss has registered.
if (shiftboss_directory_.empty()) {
const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_message.tagged_message;
DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type());
DLOG(INFO) << "ForemanDistributed received ShiftbossRegistrationMessage from Client " << annotated_message.sender;
S::ShiftbossRegistrationMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
DCHECK_EQ(1u, shiftboss_directory_.size());
}
// Event loop
for (;;) {
// Receive() causes this thread to sleep until the next message is received.
const AnnotatedMessage annotated_message =
bus_->Receive(foreman_client_id_, 0, true);
const TaggedMessage &tagged_message = annotated_message.tagged_message;
const tmb::message_type_id message_type = tagged_message.message_type();
DLOG(INFO) << "ForemanDistributed received " << QueryExecutionUtil::MessageTypeToString(message_type)
<< " from Client " << annotated_message.sender;
switch (message_type) {
case kShiftbossRegistrationMessage: {
S::ShiftbossRegistrationMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity());
break;
}
case kAdmitRequestMessage: {
const AdmitRequestMessage *request_message =
static_cast<const AdmitRequestMessage*>(tagged_message.message());
if (!policy_enforcer_->admitQueries(request_message->getQueryHandles())) {
LOG(WARNING) << "The scheduler could not admit all the queries";
}
break;
}
case kQueryInitiateResponseMessage: {
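// The response is parsed only to validate it; nothing further is done with it here.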
S::QueryInitiateResponseMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
break;
}
case kCatalogRelationNewBlockMessage: // Fall through
case kDataPipelineMessage:
case kRebuildWorkOrderCompleteMessage:
case kWorkOrderCompleteMessage:
case kWorkOrderFeedbackMessage: {
policy_enforcer_->processMessage(tagged_message);
break;
}
case kInitiateRebuildResponseMessage: {
// This case is unique to the distributed version.
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->
processInitiateRebuildResponseMessage(tagged_message);
break;
}
case kPoisonMessage: {
if (policy_enforcer_->hasQueries()) {
LOG(WARNING) << "ForemanDistributed thread exiting while some queries are "
"under execution or waiting to be admitted";
}
// Shut down all Shiftbosses.
tmb::Address shiftboss_addresses;
for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) {
shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i));
}
tmb::MessageStyle broadcast_style;
broadcast_style.Broadcast(true);
TaggedMessage poison_message(kPoisonMessage);
const MessageBus::SendStatus send_status =
bus_->Send(foreman_client_id_,
shiftboss_addresses,
broadcast_style,
move(poison_message));
DCHECK(send_status == MessageBus::SendStatus::kOK);
return;
}
default:
LOG(FATAL) << "Unknown message type to ForemanDistributed";
}
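// After handling the message, collect any WorkOrders that have become
// schedulable from the PolicyEnforcer and dispatch them to Shiftbosses.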
if (canCollectNewMessages(message_type)) {
vector<unique_ptr<S::WorkOrderMessage>> new_messages;
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->
getWorkOrderProtoMessages(&new_messages);
dispatchWorkOrderMessages(new_messages);
}
}
}
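// Whether the Foreman should try to collect new WorkOrder messages from the
// PolicyEnforcer after processing a message of the given type.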
bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) {
return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type,
kCatalogRelationNewBlockMessage,
kWorkOrderFeedbackMessage);
}
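// If the WorkOrder is aggregation-related, determine the Shiftboss that should
// execute it (via PolicyEnforcerDistributed) and return true; otherwise return
// false.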
bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage &proto,
const size_t next_shiftboss_index_to_schedule,
size_t *shiftboss_index_for_aggregation) {
const S::WorkOrder &work_order_proto = proto.work_order();
QueryContext::aggregation_state_id aggr_state_index;
partition_id part_id;
vector<QueryContext::lip_filter_id> lip_filter_indexes;
block_id block = kInvalidBlockId;
switch (work_order_proto.work_order_type()) {
case S::AGGREGATION:
aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
part_id = work_order_proto.GetExtension(S::AggregationWorkOrder::partition_id);
for (int i = 0; i < work_order_proto.ExtensionSize(S::AggregationWorkOrder::lip_filter_indexes); ++i) {
lip_filter_indexes.push_back(work_order_proto.GetExtension(S::AggregationWorkOrder::lip_filter_indexes, i));
}
block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id);
break;
case S::BUILD_AGGREGATION_EXISTENCE_MAP:
aggr_state_index = work_order_proto.GetExtension(S::BuildAggregationExistenceMapWorkOrder::aggr_state_index);
part_id = work_order_proto.GetExtension(S::BuildAggregationExistenceMapWorkOrder::partition_id);
break;
case S::INITIALIZE_AGGREGATION:
aggr_state_index = work_order_proto.GetExtension(S::InitializeAggregationWorkOrder::aggr_state_index);
part_id = work_order_proto.GetExtension(S::InitializeAggregationWorkOrder::partition_id);
break;
case S::FINALIZE_AGGREGATION:
aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index);
part_id = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::partition_id);
break;
case S::DESTROY_AGGREGATION_STATE:
aggr_state_index = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::aggr_state_index);
part_id = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::partition_id);
break;
default:
return false;
}
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation(
proto.query_id(), aggr_state_index, part_id, lip_filter_indexes, block_locator_, block,
next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation);
return true;
}
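// If the WorkOrder builds, probes, or destroys a join hash table, determine
// the Shiftboss that should execute it and return true; otherwise return
// false.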
bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &proto,
const size_t next_shiftboss_index_to_schedule,
size_t *shiftboss_index_for_hash_join) {
const S::WorkOrder &work_order_proto = proto.work_order();
QueryContext::join_hash_table_id join_hash_table_index;
partition_id part_id;
vector<QueryContext::lip_filter_id> lip_filter_indexes;
block_id block = kInvalidBlockId;
switch (work_order_proto.work_order_type()) {
case S::BUILD_HASH:
join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index);
part_id = work_order_proto.GetExtension(S::BuildHashWorkOrder::partition_id);
for (int i = 0; i < work_order_proto.ExtensionSize(S::BuildHashWorkOrder::lip_filter_indexes); ++i) {
lip_filter_indexes.push_back(work_order_proto.GetExtension(S::BuildHashWorkOrder::lip_filter_indexes, i));
}
block = work_order_proto.GetExtension(S::BuildHashWorkOrder::block_id);
break;
case S::HASH_JOIN:
join_hash_table_index = work_order_proto.GetExtension(S::HashJoinWorkOrder::join_hash_table_index);
part_id = work_order_proto.GetExtension(S::HashJoinWorkOrder::partition_id);
break;
case S::DESTROY_HASH:
join_hash_table_index = work_order_proto.GetExtension(S::DestroyHashWorkOrder::join_hash_table_index);
part_id = work_order_proto.GetExtension(S::DestroyHashWorkOrder::partition_id);
break;
default:
return false;
}
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForHashJoin(
proto.query_id(), join_hash_table_index, part_id, lip_filter_indexes, block_locator_, block,
next_shiftboss_index_to_schedule, shiftboss_index_for_hash_join);
return true;
}
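// If the WorkOrder is LIP-filter-related (BUILD_LIP_FILTER or SELECT),
// determine the Shiftboss that should execute it and return true; otherwise
// return false.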
bool ForemanDistributed::isLipRelatedWorkOrder(const S::WorkOrderMessage &proto,
const size_t next_shiftboss_index_to_schedule,
size_t *shiftboss_index_for_lip) {
const S::WorkOrder &work_order_proto = proto.work_order();
vector<QueryContext::lip_filter_id> lip_filter_indexes;
partition_id part_id;
block_id block = kInvalidBlockId;
switch (work_order_proto.work_order_type()) {
case S::BUILD_LIP_FILTER:
for (int i = 0; i < work_order_proto.ExtensionSize(S::BuildLIPFilterWorkOrder::lip_filter_indexes); ++i) {
lip_filter_indexes.push_back(work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::lip_filter_indexes, i));
}
part_id = work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::partition_id);
block = work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::build_block_id);
break;
case S::SELECT:
for (int i = 0; i < work_order_proto.ExtensionSize(S::SelectWorkOrder::lip_filter_indexes); ++i) {
lip_filter_indexes.push_back(work_order_proto.GetExtension(S::SelectWorkOrder::lip_filter_indexes, i));
}
part_id = work_order_proto.GetExtension(S::SelectWorkOrder::partition_id);
block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
break;
default:
return false;
}
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForLip(
proto.query_id(), lip_filter_indexes, part_id, block_locator_, block, next_shiftboss_index_to_schedule,
shiftboss_index_for_lip);
return true;
}
namespace {
constexpr size_t kDefaultShiftbossIndex = 0u;
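// For WorkOrders that operate on a single known block (SAVE_BLOCKS,
// SORT_RUN_GENERATION), look up which Shiftboss holds that block. Returns true
// and sets *shiftboss_index_for_block only if the locator knows where the
// block resides.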
bool hasBlockLocalityInfo(const serialization::WorkOrder &work_order_proto,
const BlockLocator &block_locator,
std::size_t *shiftboss_index_for_block) {
block_id block = kInvalidBlockId;
switch (work_order_proto.work_order_type()) {
case S::SAVE_BLOCKS: {
block = work_order_proto.GetExtension(S::SaveBlocksWorkOrder::block_id);
break;
}
case S::SORT_RUN_GENERATION: {
block = work_order_proto.GetExtension(S::SortRunGenerationWorkOrder::block_id);
break;
}
default:
return false;
}
DCHECK_NE(block, kInvalidBlockId);
return block_locator.getBlockLocalityInfo(block, shiftboss_index_for_block);
}
} // namespace
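// Dispatch each pending WorkOrder to a Shiftboss: single-node queries always
// go to the default Shiftboss; otherwise operator-specific placement (LIP
// filter, aggregation, hash join, nested-loops join) or block locality decides,
// falling back to the current round-robin Shiftboss, which advances once it
// reaches its WorkOrder capacity.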
void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) {
static size_t shiftboss_index = kDefaultShiftbossIndex;
PolicyEnforcerDistributed* policy_enforcer_dist = static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get());
for (const auto &message : messages) {
DCHECK(message != nullptr);
const S::WorkOrderMessage &proto = *message;
const S::WorkOrder &work_order_proto = proto.work_order();
size_t shiftboss_index_for_particular_work_order_type;
if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) {
// Always schedule all WorkOrders of a single-node query to the same (default) Shiftboss.
shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
} else if (isLipRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (work_order_proto.work_order_type() == S::NESTED_LOOP_JOIN) {
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForNestedLoopsJoin(
proto.query_id(), work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::nested_loops_join_index),
work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::partition_id), block_locator_,
work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::left_block_id),
work_order_proto.GetExtension(S::NestedLoopsJoinWorkOrder::right_block_id),
shiftboss_index, &shiftboss_index_for_particular_work_order_type);
} else if (hasBlockLocalityInfo(work_order_proto, block_locator_,
&shiftboss_index_for_particular_work_order_type)) {
} else {
shiftboss_index_for_particular_work_order_type = shiftboss_index;
}
sendWorkOrderMessage(shiftboss_index_for_particular_work_order_type, proto);
shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type);
if (shiftboss_index == shiftboss_index_for_particular_work_order_type &&
shiftboss_directory_.hasReachedCapacity(shiftboss_index)) {
shiftboss_index = (shiftboss_index + 1) % shiftboss_directory_.size();
} else {
// NOTE(zuyu): This is not exact round-robin scheduling: in this case,
// <shiftboss_index_for_particular_work_order_type> may be assigned one more
// WorkOrder for an Aggregation or a HashJoin.
}
}
}
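// Serialize the WorkOrderMessage proto and send it over the TMB to the
// Shiftboss at the given index.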
void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index,
const S::WorkOrderMessage &proto) {
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
proto_length,
kWorkOrderMessage);
free(proto_bytes);
const client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index);
DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage to Shiftboss with Client " << shiftboss_client_id;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
shiftboss_client_id,
move(message));
CHECK(send_status == MessageBus::SendStatus::kOK);
}
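// Write the per-WorkOrder profiling results of a query to 'out' as CSV.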
void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id,
std::FILE *out) const {
const std::vector<WorkOrderTimeEntry> &recorded_times =
policy_enforcer_->getProfilingResults(query_id);
fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out);
for (const auto &workorder_entry : recorded_times) {
const std::size_t worker_id = workorder_entry.worker_id;
fprintf(out,
"%lu,%lu,%lu,%lu\n",
query_id,
worker_id,
workorder_entry.operator_id, // Operator ID.
workorder_entry.end_time - workorder_entry.start_time); // Time.
}
}
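// Record the newly registered Shiftboss and reply with its assigned index and
// a copy of the catalog database.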
void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shiftboss_client_id,
const std::size_t work_order_capacity) {
S::ShiftbossRegistrationResponseMessage proto;
proto.set_shiftboss_index(shiftboss_directory_.size());
proto.mutable_catalog_database()->MergeFrom(static_cast<CatalogDatabase*>(catalog_database_)->getProto());
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
proto_length,
kShiftbossRegistrationResponseMessage);
free(proto_bytes);
shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity);
DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage to Shiftboss with Client "
<< shiftboss_client_id;
const MessageBus::SendStatus send_status =
QueryExecutionUtil::SendTMBMessage(bus_,
foreman_client_id_,
shiftboss_client_id,
move(message));
CHECK(send_status == MessageBus::SendStatus::kOK);
}
} // namespace quickstep