blob: 11b9d706815046df1e8ab1de5cb2697cb29e8b2e [file] [log] [blame]
/**
* Copyright 2016, Quickstep Research Group, Computer Sciences Department,
* University of Wisconsin—Madison.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
#include "query_execution/PriorityPolicyEnforcer.hpp"
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <memory>
#include <queue>
#include <set>
#include <unordered_map>
#include <utility>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/Learner.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryManager.hpp"
#include "query_execution/WorkerDirectory.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageManager.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
namespace quickstep {
// Upper bound on the number of WorkerMessages collected per call to
// getWorkerMessages(); both the probabilistic dispatch loop and the
// highest-priority-first loop stop once this many messages are gathered.
DEFINE_uint64(max_msgs_per_dispatch_round, 40, "Maximum number of messages that"
" can be allocated in a single round of dispatch of messages to"
" the workers.");
// When true, getWorkerMessages() delegates to getWorkerMessagesHPF(), which
// drains priority levels starting from the largest value instead of letting
// the learner pick levels at random.
DEFINE_bool(highest_priority_first, false, "Pick queries from the highest priority level first");
// Selects the template flavour (<true> = dynamic, <false> = static) of the
// learner's completion-feedback and random-query-picking calls.
DEFINE_bool(dynamic_probabilities_in_learner, true, "Whether the learner should have dynamic probabilities or static probabilities");
// Constructs the enforcer. The pointer arguments are stored as raw pointers
// (NOTE(review): presumably non-owning — confirm against the header), the
// committed-memory counter used for admission control starts at zero, and the
// Learner that drives probabilistic scheduling is created here.
PriorityPolicyEnforcer::PriorityPolicyEnforcer(const tmb::client_id foreman_client_id,
const std::size_t num_numa_nodes,
CatalogDatabaseLite *catalog_database,
StorageManager *storage_manager,
WorkerDirectory *worker_directory,
tmb::MessageBus *bus,
const bool profile_individual_workorders)
: foreman_client_id_(foreman_client_id),
num_numa_nodes_(num_numa_nodes),
catalog_database_(catalog_database),
storage_manager_(storage_manager),
worker_directory_(worker_directory),
bus_(bus),
profile_individual_workorders_(profile_individual_workorders),
committed_memory_(0) {
// Every admitted query is registered with this learner (see admitQuery()).
learner_.reset(new Learner());
}
// Admits 'query_handle' for execution.
//
// If the memory budget does not allow the query in, admitted queries are
// suspended (largest memory footprint first) until it fits. If no suspendable
// victim exists, the query is waitlisted; processMessage() retries waitlisted
// queries when a running query finishes.
//
// Returns true if the query was admitted. Returns false if a query with the
// same ID is already admitted, or if the query was waitlisted.
bool PriorityPolicyEnforcer::admitQuery(QueryHandle *query_handle) {
  const std::size_t query_id = query_handle->query_id();
  // Reject duplicates before anything with side effects: the memory check
  // below reserves memory in committed_memory_ and the loop may suspend other
  // queries, neither of which must happen for a query we will not admit.
  if (admitted_queries_.find(query_id) != admitted_queries_.end()) {
    LOG(ERROR) << "Query with the same ID " << query_id << " exists";
    return false;
  }
  // Find victim queries to be suspended until the new query fits.
  while (!admissionMemoryCheck(query_handle)) {
    const std::pair<int, std::size_t> victim_query = getQueryWithHighestMemoryFootprint();
    if (victim_query.first == kInvalidQueryID) {
      // No admitted query has a positive memory footprint, so suspending
      // cannot free anything. Waitlist the query instead of looping forever.
      std::cout << "Query " << query_id << " waitlisted\n";
      if (query_id_to_handle_.find(query_id) == query_id_to_handle_.end()) {
        // This query was not waitlisted earlier.
        query_id_to_handle_[query_id] = query_handle;
        waiting_queries_.push(query_handle);
      }
      return false;
    }
    // Move the victim from admitted to suspended.
    suspendQuery(victim_query.first);
  }
  // Ok to admit the query; admissionMemoryCheck() already reserved its memory.
  admitted_queries_[query_id].reset(
      new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
                       catalog_database_, storage_manager_, bus_));
  std::cout << "Admitted query with ID: " << query_handle->query_id()
            << " priority: " << query_handle->query_priority() << "\n";
  priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
  learner_->addQuery(*query_handle);
  query_handle->setAdmissionTime();
  query_id_to_handle_[query_handle->query_id()] = query_handle;
  LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
  return true;
}
// Tries to bring a previously suspended query back into execution.
//
// A fresh QueryManager is created for the query. Unlike admitQuery(), no
// other query is suspended to make room: if the memory budget does not allow
// the query in, it simply stays suspended.
//
// Returns true if the query was admitted. Returns false if a query with the
// same ID is already admitted, or if there is not enough memory yet.
bool PriorityPolicyEnforcer::admitSuspendedQuery(QueryHandle *query_handle) {
  const std::size_t query_id = query_handle->query_id();
  // Check for duplicates first: admissionMemoryCheck() reserves memory on
  // success, and that reservation must not happen for a rejected query.
  if (admitted_queries_.find(query_id) != admitted_queries_.end()) {
    LOG(ERROR) << "Query with the same ID " << query_id << " exists";
    return false;
  }
  if (!admissionMemoryCheck(query_handle)) {
    // Let the query be in the suspended mode; it will have to wait.
    return false;
  }
  // Ok to admit the query; its memory has been reserved above.
  admitted_queries_[query_id].reset(
      new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle,
                       catalog_database_, storage_manager_, bus_));
  std::cout << "Admitted query with ID: " << query_handle->query_id()
            << " priority: " << query_handle->query_priority() << "\n";
  priority_query_ids_[query_handle->query_priority()].emplace_back(query_id);
  learner_->addQuery(*query_handle);
  query_handle->setAdmissionTime();
  query_id_to_handle_[query_handle->query_id()] = query_handle;
  LOG(INFO) << "Query " << query_handle->query_id() << " mem estimate: " << query_handle->getEstimatedMaxMemoryInBytes() << " bytes";
  return true;
}
// Handles a message from a worker or from the storage layer.
//
// The message is decoded just enough to find its query ID (and, for WorkOrder
// completion messages, the operator index and worker thread). It is then
// forwarded to the query's QueryManager, and the returned status drives the
// scheduling state:
//  - kQueryExecuted: the query is removed, then either a suspended query or
//    the earliest waitlisted query is given a chance to be admitted.
//  - kOperatorExecuted: the finished operator is removed from the learner.
// Messages for a query that is not admitted are forwarded to its suspended
// QueryManager, if it has one, and are otherwise ignored.
void PriorityPolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
  // TODO(harshad) : Provide processXMessage() public functions in
  // QueryManager, so that we need to extract message from the
  // TaggedMessage only once.
  std::size_t query_id = 0;
  // Only the (rebuild) WorkOrder completion messages provide an operator index
  // here. For all other message types operator_id stays unset, and it must
  // not be passed to the learner even if the QueryManager reports
  // kOperatorExecuted (previously an uninitialized read).
  std::size_t operator_id = 0;
  bool has_operator_id = false;
  switch (tagged_message.message_type()) {
    case kWorkOrderCompleteMessage: {
      serialization::NormalWorkOrderCompletionMessage proto;
      // Note: This proto message contains the time it took to execute the
      // WorkOrder. It can be accessed in this scope.
      CHECK(proto.ParseFromArray(tagged_message.message(),
                                 tagged_message.message_bytes()));
      query_id = proto.query_id();
      operator_id = proto.operator_index();
      has_operator_id = true;
      worker_directory_->decrementNumQueuedWorkOrders(
          proto.worker_thread_index());
      if (!hasQuerySuspended(query_id)) {
        // Add completion feedback for non-suspended queries.
        if (FLAGS_dynamic_probabilities_in_learner) {
          learner_->addCompletionFeedback<true>(proto);
        } else {
          learner_->addCompletionFeedback<false>(proto);
        }
      }
      if (profile_individual_workorders_) {
        recordTimeForWorkOrder(proto);
      }
      break;
    }
    case kRebuildWorkOrderCompleteMessage: {
      serialization::RebuildWorkOrderCompletionMessage proto;
      // Note: This proto message contains the time it took to execute the
      // rebuild WorkOrder. It can be accessed in this scope.
      CHECK(proto.ParseFromArray(tagged_message.message(),
                                 tagged_message.message_bytes()));
      query_id = proto.query_id();
      operator_id = proto.operator_index();
      has_operator_id = true;
      worker_directory_->decrementNumQueuedWorkOrders(
          proto.worker_thread_index());
      break;
    }
    case kCatalogRelationNewBlockMessage: {
      serialization::CatalogRelationNewBlockMessage proto;
      CHECK(proto.ParseFromArray(tagged_message.message(),
                                 tagged_message.message_bytes()));
      query_id = proto.query_id();
      break;
    }
    case kDataPipelineMessage: {
      serialization::DataPipelineMessage proto;
      CHECK(proto.ParseFromArray(tagged_message.message(),
                                 tagged_message.message_bytes()));
      query_id = proto.query_id();
      break;
    }
    case kWorkOrdersAvailableMessage: {
      serialization::WorkOrdersAvailableMessage proto;
      CHECK(proto.ParseFromArray(tagged_message.message(),
                                 tagged_message.message_bytes()));
      query_id = proto.query_id();
      break;
    }
    case kWorkOrderFeedbackMessage: {
      WorkOrder::FeedbackMessage msg(
          const_cast<void *>(tagged_message.message()),
          tagged_message.message_bytes());
      query_id = msg.header().query_id;
      break;
    }
    default:
      LOG(FATAL) << "Unknown message type found in PriorityPolicyEnforcer";
  }

  // Use find() rather than operator[]: for a query that is not admitted (for
  // example one suspended while some of its WorkOrders were still in flight),
  // operator[] would insert and then dereference a null QueryManager.
  const auto admitted_it = admitted_queries_.find(query_id);
  if (admitted_it == admitted_queries_.end()) {
    const auto suspended_it = suspended_query_managers_.find(query_id);
    if (suspended_it != suspended_query_managers_.end() &&
        suspended_it->second != nullptr) {
      // Keep the suspended QueryManager's bookkeeping consistent, but make
      // no scheduling decisions for a query that is not running.
      // NOTE(review): the returned status is deliberately ignored here.
      suspended_it->second->processMessage(tagged_message);
    } else {
      LOG(WARNING) << "Ignoring message for query " << query_id
                   << " which is neither admitted nor suspended";
    }
    return;
  }

  const QueryManager::QueryStatusCode return_code =
      admitted_it->second->processMessage(tagged_message);
  // NOTE: kQueryExecuted takes precedence over kOperatorExecuted.
  if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) {
    removeQuery(query_id);
    if (!suspended_queries_.empty()) {
      // Admit a suspended query.
      QueryHandle *suspended_query = suspended_queries_.back();
      if (admitSuspendedQuery(suspended_query)) {
        std::cout << "Admitting suspended query " << suspended_query->query_id() << " back\n";
        suspended_queries_.pop_back();
        suspended_query_managers_.erase(suspended_query->query_id());
        return;
      }
    }
    if (!waiting_queries_.empty()) {
      // Admit the earliest waiting query.
      QueryHandle *new_query = waiting_queries_.front();
      if (admitQuery(new_query)) {
        std::cout << "Removing Q " << new_query->query_id() << " from waitlist\n";
        waiting_queries_.pop();
      }
    }
  } else if (return_code == QueryManager::QueryStatusCode::kOperatorExecuted &&
             has_operator_id) {
    learner_->removeOperator(query_id, operator_id);
  }
}
void PriorityPolicyEnforcer::getWorkerMessages(
std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
if (!FLAGS_highest_priority_first) {
// Iterate over admitted queries until either there are no more
// messages available, or the maximum number of messages have
// been collected.
DCHECK(worker_messages->empty());
std::unordered_map<std::size_t, bool> finished_queries_ids;
if (learner_->hasActiveQueries()) {
// Key = priority level. Value = Whether we have already checked the
std::unordered_map<std::size_t, bool> checked_priority_levels;
// While there are more priority levels to be checked ..
while (checked_priority_levels.size() < priority_query_ids_.size() && worker_messages->size() < FLAGS_max_msgs_per_dispatch_round) {
const int chosen_priority_level = learner_->pickRandomPriorityLevel();
if (chosen_priority_level == kInvalidPriorityLevel) {
DLOG(INFO) << "No valid priority level chosen";
break;
} else if (checked_priority_levels.find(static_cast<std::size_t>(
chosen_priority_level)) != checked_priority_levels.end()) {
continue;
} else {
WorkerMessage *next_worker_message =
getNextWorkerMessageFromPriorityLevel(chosen_priority_level,
&finished_queries_ids);
if (next_worker_message != nullptr) {
worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
} else {
checked_priority_levels[static_cast<std::size_t>(chosen_priority_level)] = true;
}
}
}
} else {
DLOG(INFO) << "No active queries in the learner at this point.";
return;
}
for (auto finished_qid_pair : finished_queries_ids) {
removeQuery(finished_qid_pair.first);
}
} else {
getWorkerMessagesHPF(worker_messages);
}
}
void PriorityPolicyEnforcer::getWorkerMessagesHPF(std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) {
// Iterate over admitted queries until either there are no more
// messages available, or the maximum number of messages have
// been collected.
DCHECK(worker_messages->empty());
if (learner_->hasActiveQueries()) {
const std::set<std::size_t> &priority_set = learner_->getSetOfPriorityLevels();
std::unordered_map<std::size_t, bool> finished_queries_ids;
auto priority_set_reverse_it = priority_set.rbegin();
if (priority_set.empty()) {
DLOG(INFO) << "No priority level available";
return;
}
while (worker_messages->size() < FLAGS_max_msgs_per_dispatch_round && priority_set_reverse_it != priority_set.rend()) {
WorkerMessage *next_worker_message = getNextWorkerMessageFromPriorityLevel(*priority_set_reverse_it, &finished_queries_ids);
if (next_worker_message != nullptr) {
worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message));
} else {
// No work orders available in the current priority level, check the
// next largest priority level.
++priority_set_reverse_it;
}
}
for (auto finished_qid_pair : finished_queries_ids) {
removeQuery(finished_qid_pair.first);
}
}
}
void PriorityPolicyEnforcer::removeQuery(const std::size_t query_id) {
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) {
LOG(WARNING) << "Removing query with ID " << query_id
<< " that hasn't finished its execution";
}
admitted_queries_.erase(query_id);
// Remove the query from priority_query_ids_ structure.
const int query_priority = learner_->getQueryPriority(query_id);
DCHECK(query_priority != kInvalidPriorityLevel);
const std::size_t query_priority_unsigned =
static_cast<std::size_t>(query_priority);
std::vector<std::size_t> *query_ids_for_priority_level =
&priority_query_ids_[query_priority_unsigned];
query_ids_for_priority_level->erase(
std::remove(query_ids_for_priority_level->begin(),
query_ids_for_priority_level->end(),
query_id));
if (query_ids_for_priority_level->empty()) {
// No more queries for the given priority level. Remove the entry.
priority_query_ids_.erase(query_priority_unsigned);
}
query_id_to_handle_[query_id]->setCompletionTime();
const std::size_t estimated_memory_bytes = query_id_to_handle_[query_id]->getEstimatedMaxMemoryInBytes();
committed_memory_ -= estimated_memory_bytes;
// Remove the query from the learner.
learner_->removeQuery(query_id);
// TODO(harshad) - Admit waiting queries, if any.
DLOG(INFO) << "Removed query: " << query_id << " with priority: " << query_priority;
}
void PriorityPolicyEnforcer::suspendQuery(const std::size_t query_id) {
suspended_query_managers_[query_id].reset(admitted_queries_[query_id].release());
suspended_queries_.push_back(query_id_to_handle_[query_id]);
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
admitted_queries_.erase(query_id);
// Remove the query from priority_query_ids_ structure.
const int query_priority = learner_->getQueryPriority(query_id);
DCHECK(query_priority != kInvalidPriorityLevel);
const std::size_t query_priority_unsigned =
static_cast<std::size_t>(query_priority);
std::vector<std::size_t> *query_ids_for_priority_level =
&priority_query_ids_[query_priority_unsigned];
query_ids_for_priority_level->erase(
std::remove(query_ids_for_priority_level->begin(),
query_ids_for_priority_level->end(),
query_id));
if (query_ids_for_priority_level->empty()) {
// No more queries for the given priority level. Remove the entry.
priority_query_ids_.erase(query_priority_unsigned);
}
// TODO(harshad) - Support actually evicting the memory used up by the suspended query.
const std::size_t estimated_memory_bytes = query_id_to_handle_[query_id]->getEstimatedMaxMemoryInBytes();
committed_memory_ -= estimated_memory_bytes;
// Remove the query from the learner.
learner_->removeQuery(query_id);
std::cout << "Suspended query: " << query_id << " with priority: " << query_priority;
}
// Offers every handle in 'query_handles' to admitQuery(), including the ones
// after a failure. Returns true only if every single admission succeeded.
bool PriorityPolicyEnforcer::admitQueries(
    const std::vector<QueryHandle*> &query_handles) {
  bool all_admitted = true;
  for (QueryHandle *handle : query_handles) {
    // admitQuery() is evaluated first so that a previous failure never
    // short-circuits the admission attempt for this handle.
    all_admitted = admitQuery(handle) && all_admitted;
  }
  return all_admitted;
}
// Appends a (worker thread, operator, execution time in microseconds) record
// taken from a WorkOrder completion message to the per-query profile list.
void PriorityPolicyEnforcer::recordTimeForWorkOrder(
    const serialization::NormalWorkOrderCompletionMessage &proto) {
  const std::size_t query_id = proto.query_id();
  // operator[] creates the query's record list the first time it is seen.
  auto &records_for_query = workorder_time_recorder_[query_id];
  records_for_query.emplace_back(proto.worker_thread_index(),
                                 proto.operator_index(),
                                 proto.execution_time_in_microseconds());
}
// Returns the next schedulable WorkerMessage from some query in
// 'priority_level', or nullptr if no query in that level has one right now.
// Ownership of the returned message passes to the caller.
//
// Queries are chosen through the learner. When the learner picks a query
// that was already checked during this call, the level is scanned for any
// query not yet checked. Queries found to have finished are recorded in
// 'finished_queries_ids'; the caller removes them later.
WorkerMessage* PriorityPolicyEnforcer::getNextWorkerMessageFromPriorityLevel(
    const std::size_t priority_level,
    std::unordered_map<std::size_t, bool> *finished_queries_ids) {
  // find() instead of operator[]: operator[] would insert an empty entry for
  // an unknown level, inflating priority_query_ids_.size(), which
  // getWorkerMessages() uses as its loop bound.
  const auto level_it = priority_query_ids_.find(priority_level);
  if (level_it == priority_query_ids_.end()) {
    return nullptr;
  }
  const std::vector<std::size_t> &query_ids_in_level = level_it->second;

  // Key = query ID from the given priority level, value = whether we have
  // already tried to fetch a WorkerMessage from it during this call.
  std::unordered_map<std::size_t, bool> checked_query_ids;

  // Asks the QueryManager of admitted query 'qid' for its next WorkerMessage.
  // Returns the message, or nullptr after marking the query as checked (and
  // as finished, if its execution is complete).
  const auto try_query = [this, &checked_query_ids, finished_queries_ids](
      const std::size_t qid) -> WorkerMessage* {
    const auto manager_it = admitted_queries_.find(qid);
    DCHECK(manager_it != admitted_queries_.end());
    if (manager_it == admitted_queries_.end() || manager_it->second == nullptr) {
      // Not expected; skip the query rather than dereference null.
      checked_query_ids[qid] = true;
      return nullptr;
    }
    QueryManager *query_manager = manager_it->second.get();
    std::unique_ptr<WorkerMessage> next_worker_message(
        query_manager->getNextWorkerMessage(0, kAnyNUMANodeID));
    if (next_worker_message != nullptr) {
      return next_worker_message.release();
    }
    // This query doesn't have any WorkerMessage right now. Mark as checked.
    checked_query_ids[qid] = true;
    if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
      (*finished_queries_ids)[qid] = true;
    }
    return nullptr;
  };

  // While there are more queries to be checked ..
  while (checked_query_ids.size() < query_ids_in_level.size()) {
    int chosen_query_id;
    if (FLAGS_dynamic_probabilities_in_learner) {
      chosen_query_id = learner_->pickRandomQueryFromPriorityLevel<true>(priority_level);
    } else {
      chosen_query_id = learner_->pickRandomQueryFromPriorityLevel<false>(priority_level);
    }
    if (chosen_query_id == kInvalidQueryID) {
      // No query available at this time in this priority level.
      return nullptr;
    }
    const std::size_t chosen_qid = static_cast<std::size_t>(chosen_query_id);
    if (checked_query_ids.find(chosen_qid) == checked_query_ids.end()) {
      // We haven't seen this query earlier. Check if it has any schedulable
      // WorkOrder.
      WorkerMessage *next_worker_message = try_query(chosen_qid);
      if (next_worker_message != nullptr) {
        return next_worker_message;
      }
    } else {
      // The learner picked a query we already checked. Find a query from the
      // same priority level that is not present in checked_query_ids.
      for (const std::size_t qid : query_ids_in_level) {
        if (checked_query_ids.find(qid) != checked_query_ids.end()) {
          continue;
        }
        if (finished_queries_ids->find(qid) != finished_queries_ids->end()) {
          // Already known to be finished (e.g. by an earlier call in this
          // dispatch round), so there is nothing to schedule. It must still
          // be marked as checked: otherwise checked_query_ids can never reach
          // the size of the level, and the loop keeps spinning until the
          // learner happens to pick this query.
          checked_query_ids[qid] = true;
          continue;
        }
        WorkerMessage *next_worker_message = try_query(qid);
        if (next_worker_message != nullptr) {
          return next_worker_message;
        }
      }
    }
  }
  return nullptr;
}
bool PriorityPolicyEnforcer::admissionMemoryCheck(const QueryHandle *query_handle) {
if (admitted_queries_.empty()) {
// No query running in the system, let the query in.
return true;
}
const std::size_t estimated_memory_requirement_bytes = query_handle->getEstimatedMaxMemoryInBytes();
const std::size_t estimated_slots = StorageManager::SlotsNeededForBytes(estimated_memory_requirement_bytes);
const std::size_t current_slots = StorageManager::SlotsNeededForBytes(storage_manager_->getMemorySize());
const std::size_t committed_slots = StorageManager::SlotsNeededForBytes(committed_memory_);
if (std::max(committed_slots, current_slots) + estimated_slots < storage_manager_->getMaxBufferPoolSlots()) {
committed_memory_ += estimated_memory_requirement_bytes;
return true;
}
return false;
}
// Current memory footprint of admitted query 'query_id'. It is the sum of
// the bytes its temporary relations occupy (from its QueryHandle) and the
// bytes reported by its QueryManager.
const std::size_t PriorityPolicyEnforcer::getMemoryForQueryInBytes(const std::size_t query_id) {
  DCHECK(query_id_to_handle_.find(query_id) != query_id_to_handle_.end());
  const std::size_t temp_relation_bytes =
      query_id_to_handle_[query_id]->getMemoryTempRelationsBytes();
  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
  const std::size_t query_manager_bytes =
      admitted_queries_[query_id]->getMemoryBytes();
  return temp_relation_bytes + query_manager_bytes;
}
// Returns (query ID, footprint in bytes) for the admitted query with the
// strictly largest memory footprint. Returns (kInvalidQueryID, 0) when no
// admitted query has a positive footprint.
const std::pair<int, std::size_t> PriorityPolicyEnforcer::getQueryWithHighestMemoryFootprint() {
  int heaviest_query_id = kInvalidQueryID;
  std::size_t heaviest_footprint = 0;
  for (const auto &admitted_entry : admitted_queries_) {
    const std::size_t footprint = getMemoryForQueryInBytes(admitted_entry.first);
    if (footprint > heaviest_footprint) {
      heaviest_footprint = footprint;
      heaviest_query_id = static_cast<int>(admitted_entry.first);
    }
  }
  return std::make_pair(heaviest_query_id, heaviest_footprint);
}
// True if a handle with ID 'query_id' is currently in suspended_queries_.
bool PriorityPolicyEnforcer::hasQuerySuspended(const std::size_t query_id) const {
  return std::any_of(suspended_queries_.begin(), suspended_queries_.end(),
                     [query_id](const QueryHandle *suspended_handle) {
                       return suspended_handle->query_id() == query_id;
                     });
}
} // namespace quickstep