blob: 98fa6083a17890743eab67402406d47483a3f358 [file] [log] [blame]
/**
* Copyright 2015-2016 Pivotal Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
#include "query_execution/Shiftboss.hpp"
#include <cstddef>
#include <cstdlib>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/WorkerMessage.hpp"
#include "relational_operators/RebuildWorkOrder.hpp"
#include "relational_operators/WorkOrderFactory.hpp"
#include "storage/InsertDestination.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "threading/ThreadUtil.hpp"
#include "glog/logging.h"
#include "tmb/address.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/message_style.h"
#include "tmb/tagged_message.h"
using std::free;
using std::malloc;
using std::move;
using std::size_t;
using std::string;
using std::unique_ptr;
using std::vector;
using tmb::TaggedMessage;
namespace quickstep {
class WorkOrder;
void Shiftboss::run() {
if (cpu_id_ >= 0) {
// We can pin the shiftboss thread to a CPU if specified.
ThreadUtil::BindToCPU(cpu_id_);
}
for (;;) {
// Receive() is a blocking call, causing this thread to sleep until next
// message is received.
AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true));
LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') received the typed '" << annotated_message.tagged_message.message_type()
<< "' message from client " << annotated_message.sender;
switch (annotated_message.tagged_message.message_type()) {
case kShiftbossRegistrationResponseMessage: {
foreman_client_id_ = annotated_message.sender;
break;
}
case kShiftbossInitiateMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
serialization::ShiftbossInitiateMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
processShiftbossInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context());
break;
}
case kWorkOrderMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
serialization::WorkOrderMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(),
database_cache_,
query_context_.get(),
storage_manager_,
foreman_client_id_,
shiftboss_client_id_,
bus_);
unique_ptr<WorkerMessage> worker_message(
WorkerMessage::WorkOrderMessage(work_order, proto.operator_index()));
TaggedMessage worker_tagged_message(worker_message.get(),
sizeof(*worker_message),
kWorkOrderMessage);
const size_t worker_index = getSchedulableWorker();
LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage
<< "') from Foreman to worker " << worker_index;
QueryExecutionUtil::SendTMBMessage(bus_,
shiftboss_client_id_,
workers_->getClientID(worker_index),
move(worker_tagged_message));
break;
}
case kInitiateRebuildMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
serialization::InitiateRebuildMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
processInitiateRebuildMessage(proto.operator_index(),
proto.insert_destination_index(),
proto.relation_id());
break;
}
case kWorkOrderCompleteMessage: // Fall through.
case kRebuildWorkOrderCompleteMessage:
case kDataPipelineMessage:
case kWorkOrdersAvailableMessage:
case kWorkOrderFeedbackMessage: {
LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') forwarded typed '" << annotated_message.tagged_message.message_type()
<< "' message from worker (client " << annotated_message.sender << ") to Foreman";
DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
QueryExecutionUtil::SendTMBMessage(bus_,
shiftboss_client_id_,
foreman_client_id_,
move(annotated_message.tagged_message));
break;
}
case kQueryResultRelationMessage: {
const TaggedMessage &tagged_message = annotated_message.tagged_message;
serialization::QueryResultRelationMessage proto;
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
for (int i = 0; i < proto.blocks_size(); ++i) {
const block_id block = proto.blocks(i);
storage_manager_->saveBlockOrBlob(block);
if (storage_manager_->blockOrBlobIsLoaded(block)) {
// NOTE(zuyu): eviction is required to avoid accesses to the query
// result relation schema in CatalogDatabaseCache, for all query
// optimizer execution generator unit tests and the single-process
// Quickstep CLI.
storage_manager_->evictBlockOrBlob(block);
}
}
serialization::QueryResultRelationResponseMessage ack_proto;
ack_proto.set_relation_id(proto.relation_id());
const size_t ack_proto_length = ack_proto.ByteSize();
char *ack_proto_bytes = static_cast<char*>(malloc(ack_proto_length));
CHECK(ack_proto.SerializeToArray(ack_proto_bytes, ack_proto_length));
TaggedMessage ack_message(static_cast<const void*>(ack_proto_bytes),
ack_proto_length,
kQueryResultRelationResponseMessage);
free(ack_proto_bytes);
LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') sent QueryResultRelationResponseMessage (typed '" << kQueryResultRelationResponseMessage
<< ") to Foreman";
QueryExecutionUtil::SendTMBMessage(bus_,
shiftboss_client_id_,
foreman_client_id_,
move(ack_message));
break;
}
case kPoisonMessage: {
LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') forwarded PoisonMessage (typed '" << kPoisonMessage
<< "') from Foreman to all workers";
tmb::MessageStyle broadcast_style;
broadcast_style.Broadcast(true);
tmb::MessageBus::SendStatus send_status =
bus_->Send(shiftboss_client_id_,
worker_addresses_,
broadcast_style,
move(annotated_message.tagged_message));
DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
return;
}
default: {
LOG(FATAL) << "Unknown TMB message type";
}
}
}
}
/**
 * @brief Pick the next worker that can accept another work order.
 *
 * Scans workers round-robin, beginning at start_worker_index_, and returns
 * the first one whose queued work-order count is below max_msgs_per_worker_.
 * If every worker is at capacity the scan keeps cycling (busy-waits) until
 * one frees up. On return, start_worker_index_ points just past the chosen
 * worker so the next call continues the rotation.
 *
 * @return Index of the selected worker.
 **/
size_t Shiftboss::getSchedulableWorker() {
  const size_t worker_count = workers_->getNumWorkers();

  size_t candidate = start_worker_index_;
  while (workers_->getNumQueuedWorkOrders(candidate) >= max_msgs_per_worker_) {
    candidate = (candidate + 1) % worker_count;
  }

  // Resume the next search from the worker after the one chosen.
  start_worker_index_ = (candidate + 1) % worker_count;
  // TODO(zuyu): workers_->incrementNumQueuedWorkOrders(candidate);
  // But we need a WorkOrder queue first.
  return candidate;
}
void Shiftboss::registerWithForeman() {
LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
<< "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage
<< "') to all";
tmb::Address all_addresses;
all_addresses.All(true);
tmb::MessageStyle style;
serialization::ShiftbossRegistrationMessage proto;
proto.set_work_order_capacity(getWorkOrderCapacity());
const size_t proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
TaggedMessage message(static_cast<const void*>(proto_bytes),
proto_length,
kShiftbossRegistrationMessage);
free(proto_bytes);
tmb::MessageBus::SendStatus send_status =
bus_->Send(shiftboss_client_id_, all_addresses, style, move(message));
DCHECK(send_status == tmb::MessageBus::SendStatus::kOK);
}
/**
 * @brief Prepare this Shiftboss for a new query and acknowledge to Foreman.
 *
 * Updates the local catalog cache from the supplied proto, replaces
 * query_context_ with a QueryContext built from query_context_proto, and then
 * sends a ShiftbossInitiateResponseMessage tagged with query_id to the Foreman.
 *
 * @param query_id The query being initiated; echoed in the response.
 * @param catalog_database_cache_proto Catalog state to merge into database_cache_.
 * @param query_context_proto Serialized QueryContext for the query.
 **/
void Shiftboss::processShiftbossInitiateMessage(
    const std::size_t query_id,
    const serialization::CatalogDatabase &catalog_database_cache_proto,
    const serialization::QueryContext &query_context_proto) {
  // The catalog cache must be current before the QueryContext, which is built
  // against it, is reconstructed.
  database_cache_.update(catalog_database_cache_proto);
  query_context_.reset(new QueryContext(query_context_proto,
                                        database_cache_,
                                        storage_manager_,
                                        foreman_client_id_,
                                        shiftboss_client_id_,
                                        bus_));

  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
            << "') sent ShiftbossInitiateResponseMessage (typed '"
            << kShiftbossInitiateResponseMessage << "') to Foreman";

  serialization::ShiftbossInitiateResponseMessage response;
  response.set_query_id(query_id);

  // The scratch buffer is released automatically; TaggedMessage keeps its own copy.
  const size_t response_size = response.ByteSize();
  std::unique_ptr<char, void (*)(void*)> response_buffer(
      static_cast<char*>(std::malloc(response_size)), &std::free);
  CHECK(response.SerializeToArray(response_buffer.get(), response_size));
  TaggedMessage response_message(static_cast<const void*>(response_buffer.get()),
                                 response_size,
                                 kShiftbossInitiateResponseMessage);

  QueryExecutionUtil::SendTMBMessage(bus_,
                                     shiftboss_client_id_,
                                     foreman_client_id_,
                                     move(response_message));
}
/**
 * @brief Start the rebuild phase for one insert destination.
 *
 * Collects the partially filled blocks of the given insert destination, tells
 * the Foreman how many RebuildWorkOrders will follow, and then dispatches one
 * RebuildWorkOrder per partially filled block to a schedulable worker.
 *
 * @param op_index Index of the operator whose output is being rebuilt.
 * @param dest_index Insert destination (within query_context_) to rebuild.
 * @param rel_id Relation the rebuilt blocks belong to.
 **/
void Shiftboss::processInitiateRebuildMessage(const std::size_t op_index,
                                              const QueryContext::insert_destination_id dest_index,
                                              const relation_id rel_id) {
  DCHECK_NE(foreman_client_id_, tmb::kClientIdNone);
  DCHECK(query_context_ != nullptr);

  InsertDestination *insert_destination = query_context_->getInsertDestination(dest_index);
  DCHECK(insert_destination != nullptr);

  vector<MutableBlockReference> partially_filled_block_refs;
  insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);

  LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
            << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage
            << "') to Foreman";

  // Announce the number of RebuildWorkOrders before dispatching them.
  serialization::InitiateRebuildResponseMessage proto;
  proto.set_operator_index(op_index);
  proto.set_num_rebuild_work_orders(partially_filled_block_refs.size());
  const size_t proto_length = proto.ByteSize();
  char *proto_bytes = static_cast<char*>(malloc(proto_length));
  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
  TaggedMessage ack_message(static_cast<const void*>(proto_bytes),
                            proto_length,
                            kInitiateRebuildResponseMessage);
  // TaggedMessage copies the payload, so the scratch buffer can go now.
  free(proto_bytes);
  QueryExecutionUtil::SendTMBMessage(bus_,
                                     shiftboss_client_id_,
                                     foreman_client_id_,
                                     move(ack_message));

  // One RebuildWorkOrder per partially filled block. Each reference is moved
  // out exactly once (previously every iteration moved element [0], so every
  // work order after the first received a moved-from reference).
  for (MutableBlockReference &partially_filled_block_ref : partially_filled_block_refs) {
    // NOTE(zuyu): Worker releases the memory after the execution of
    // RebuildWorkOrder on the Worker.
    WorkOrder *rebuild_work_order =
        new RebuildWorkOrder(move(partially_filled_block_ref),
                             op_index,
                             rel_id,
                             foreman_client_id_,
                             shiftboss_client_id_,
                             bus_);
    // Only the WorkerMessage wrapper is freed here; its bytes are copied into
    // the TaggedMessage and the WorkOrder pointer travels with them.
    unique_ptr<WorkerMessage> worker_message(
        WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index));
    TaggedMessage worker_tagged_message(worker_message.get(),
                                        sizeof(*worker_message),
                                        kRebuildWorkOrderMessage);
    const size_t worker_index = getSchedulableWorker();
    LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_
              << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage
              << "') to worker " << worker_index;
    QueryExecutionUtil::SendTMBMessage(bus_,
                                       shiftboss_client_id_,
                                       workers_->getClientID(worker_index),
                                       move(worker_tagged_message));
  }
}
} // namespace quickstep