blob: f66134b0fa2fc652d5fdc73aa9e032f705e75954 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_
#include <cstddef>
#include <memory>
#include <queue>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryManagerBase.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
namespace quickstep {
class CatalogDatabaseLite;
class QueryHandle;
namespace serialization { class WorkOrderCompletionMessage; }
/** \addtogroup QueryExecution
* @{
*/
/**
* @brief A base class that ensures that a high level policy is maintained
* in sharing resources among concurrent queries.
**/
class PolicyEnforcerBase {
public:
/**
* @brief Constructor.
*
* @param catalog_database The CatalogDatabase used.
**/
explicit PolicyEnforcerBase(CatalogDatabaseLite *catalog_database);
/**
* @brief Virtual Destructor.
**/
virtual ~PolicyEnforcerBase() {
if (hasQueries()) {
LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or "
"waiting queries";
}
}
/**
* @brief Admit multiple queries in the system.
*
* @note In the current simple implementation, we only allow one active
* query in the system. Other queries will have to wait.
*
* @param query_handles A vector of QueryHandles for the queries to be
* admitted.
*
* @return True if all the queries were admitted, false if at least one query
* was not admitted.
**/
bool admitQueries(const std::vector<QueryHandle*> &query_handles);
/**
* @brief Remove a given query that is under execution.
*
* @note This function is made public so that it is possible for a query
* to be killed. Otherwise, it should only be used privately by the
* class.
*
* TODO(harshad) - Extend this function to support removal of waiting queries.
*
* @param query_id The ID of the query to be removed.
**/
void removeQuery(const std::size_t query_id);
/**
* @brief Process a message sent to the Foreman, which gets passed on to the
* policy enforcer.
*
* @param message The message.
**/
void processMessage(const TaggedMessage &tagged_message);
/**
* @brief Check if there are any queries to be executed.
*
* @return True if there is at least one active or waiting query, false if
* the policy enforcer doesn't have any query.
**/
inline bool hasQueries() const {
return !(admitted_queries_.empty() && waiting_queries_.empty());
}
/**
* @brief Check if the given query has profiling results.
*
* @note Even enabled profiling, not every query has profiling results.
* For example, CreateTable and CreateIndex do not produce work orders,
* so they do not have profiling results.
*
* @return True if it has profiling results, otherwise false.
**/
bool hasProfilingResults(const std::size_t query_id) const {
return workorder_time_recorder_.find(query_id) != workorder_time_recorder_.end();
}
/**
* @brief Get the profiling results for individual work order execution for a
* given query.
*
* @note This function should only be called if profiling individual work
* orders option is enabled.
*
* @param query_id The ID of the query for which the profiling results are
* requested.
*
* @return A vector of records, each being a single profiling entry.
**/
inline const std::vector<WorkOrderTimeEntry>& getProfilingResults(
const std::size_t query_id) const {
DCHECK(profile_individual_workorders_);
DCHECK(hasProfilingResults(query_id));
return workorder_time_recorder_.at(query_id);
}
/**
* @brief Admit a query to the system.
*
* @param query_handle The QueryHandle for the new query.
*
* @return Whether the query was admitted to the system.
**/
virtual bool admitQuery(QueryHandle *query_handle) = 0;
protected:
static constexpr std::size_t kMaxConcurrentQueries = 1;
/**
* @brief Add custom actions upon the completion of a query.
*
* @param query_manager The query manager.
**/
virtual void onQueryCompletion(QueryManagerBase *query_manager) {}
/**
* @brief Record the execution time for a finished WorkOrder.
*
* TODO(harshad) - Extend the functionality to rebuild work orders.
*
* @param proto The completion message proto sent after the WorkOrder
* execution.
**/
void recordTimeForWorkOrder(
const serialization::WorkOrderCompletionMessage &proto);
CatalogDatabaseLite *catalog_database_;
const bool profile_individual_workorders_;
// Key = query ID, value = QueryManagerBase* for the key query.
std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_;
// The queries which haven't been admitted yet.
std::queue<QueryHandle*> waiting_queries_;
WorkOrderTimeRecorder workorder_time_recorder_;
private:
/**
* @brief Decrement the number of queued workorders for the given worker by 1.
*
* @param proto The completion message proto received after the WorkOrder
* execution.
**/
virtual void decrementNumQueuedWorkOrders(
const serialization::WorkOrderCompletionMessage &proto) = 0;
DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerBase);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_