blob: 52aa4dc2a39717b17ae4992fbd61ab2769d4f9c8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "query_execution/QueryContext.hpp"
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>
#include "catalog/CatalogDatabaseLite.hpp"
#include "catalog/CatalogRelationSchema.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/ExpressionFactories.hpp"
#include "expressions/table_generator/GeneratorFunction.pb.h"
#include "expressions/table_generator/GeneratorFunctionFactory.hpp"
#include "expressions/table_generator/GeneratorFunctionHandle.hpp"
#include "query_execution/QueryContext.pb.h"
#include "storage/AggregationOperationState.hpp"
#include "storage/HashTable.hpp"
#include "storage/HashTableFactory.hpp"
#include "storage/InsertDestination.hpp"
#include "storage/InsertDestination.pb.h"
#include "types/TypedValue.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/SortConfiguration.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
#include "utility/lip_filter/LIPFilterDeployment.hpp"
#include "utility/lip_filter/LIPFilterFactory.hpp"
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
using std::move;
using std::unique_ptr;
using std::vector;
namespace quickstep {
QueryContext::QueryContext(const serialization::QueryContext &proto,
const CatalogDatabaseLite &database,
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
DCHECK(ProtoIsValid(proto, database))
<< "Attempted to create QueryContext from an invalid proto description:\n"
<< proto.DebugString();
for (int i = 0; i < proto.aggregation_states_size(); ++i) {
PartitionedAggregationOperationStates partitioned_aggregation_states;
const serialization::QueryContext::AggregationOperationStateContext &aggr_state_context_proto =
proto.aggregation_states(i);
for (std::uint64_t j = 0; j < aggr_state_context_proto.num_partitions(); ++j) {
partitioned_aggregation_states.emplace_back(
AggregationOperationState::ReconstructFromProto(aggr_state_context_proto.aggregation_state(),
database,
storage_manager));
}
aggregation_states_.push_back(move(partitioned_aggregation_states));
}
for (int i = 0; i < proto.generator_functions_size(); ++i) {
const GeneratorFunctionHandle *func_handle =
GeneratorFunctionFactory::Instance().reconstructFromProto(proto.generator_functions(i));
DCHECK(func_handle != nullptr);
generator_functions_.emplace_back(
std::unique_ptr<const GeneratorFunctionHandle>(func_handle));
}
for (int i = 0; i < proto.join_hash_tables_size(); ++i) {
PartitionedJoinHashTables partitioned_join_hash_tables;
const serialization::QueryContext::HashTableContext &hash_table_context_proto = proto.join_hash_tables(i);
for (std::uint64_t j = 0; j < hash_table_context_proto.num_partitions(); ++j) {
partitioned_join_hash_tables.emplace_back(
JoinHashTableFactory::CreateResizableFromProto(hash_table_context_proto.join_hash_table(), storage_manager));
}
join_hash_tables_.push_back(move(partitioned_join_hash_tables));
}
for (int i = 0; i < proto.insert_destinations_size(); ++i) {
const serialization::InsertDestination &insert_destination_proto = proto.insert_destinations(i);
insert_destinations_.emplace_back(InsertDestination::ReconstructFromProto(
proto.query_id(),
insert_destination_proto,
database.getRelationSchemaById(insert_destination_proto.relation_id()),
storage_manager,
scheduler_client_id,
bus));
}
for (int i = 0; i < proto.lip_filters_size(); ++i) {
lip_filters_.emplace_back(
std::unique_ptr<LIPFilter>(
LIPFilterFactory::ReconstructFromProto(proto.lip_filters(i))));
}
for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) {
lip_deployments_.emplace_back(
std::make_unique<LIPFilterDeployment>(
proto.lip_filter_deployments(i), lip_filters_));
}
for (int i = 0; i < proto.predicates_size(); ++i) {
predicates_.emplace_back(
PredicateFactory::ReconstructFromProto(proto.predicates(i), database));
}
for (int i = 0; i < proto.scalar_groups_size(); ++i) {
vector<unique_ptr<const Scalar>> scalar_group;
const serialization::QueryContext::ScalarGroup &scalar_group_proto = proto.scalar_groups(i);
for (int j = 0; j < scalar_group_proto.scalars_size(); ++j) {
scalar_group.emplace_back(
ScalarFactory::ReconstructFromProto(scalar_group_proto.scalars(j), database));
}
scalar_groups_.push_back(move(scalar_group));
}
for (int i = 0; i < proto.sort_configs_size(); ++i) {
sort_configs_.emplace_back(
SortConfiguration::ReconstructFromProto(proto.sort_configs(i), database));
}
for (int i = 0; i < proto.tuples_size(); ++i) {
tuples_.emplace_back(Tuple::ReconstructFromProto(proto.tuples(i)));
}
for (int i = 0; i < proto.update_groups_size(); ++i) {
const serialization::QueryContext::UpdateGroup &update_group_proto = proto.update_groups(i);
std::unordered_map<attribute_id, std::unique_ptr<const Scalar>> update_group;
for (int j = 0; j < update_group_proto.update_assignments_size(); ++j) {
const serialization::QueryContext::UpdateGroup::UpdateAssignment &update_assignment_proto =
update_group_proto.update_assignments(j);
unique_ptr<const Scalar> scalar(
ScalarFactory::ReconstructFromProto(update_assignment_proto.scalar(), database));
update_group.emplace(update_assignment_proto.attribute_id(), move(scalar));
}
update_groups_.push_back(move(update_group));
}
for (int i = 0; i < proto.window_aggregation_states_size(); ++i) {
window_aggregation_states_.emplace_back(
WindowAggregationOperationState::ReconstructFromProto(proto.window_aggregation_states(i),
database,
storage_manager));
}
}
bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto,
const CatalogDatabaseLite &database) {
for (int i = 0; i < proto.aggregation_states_size(); ++i) {
if (!AggregationOperationState::ProtoIsValid(proto.aggregation_states(i).aggregation_state(), database)) {
return false;
}
}
// Each GeneratorFunctionHandle object is serialized as a function name with
// a list of arguments. Here checks that the arguments are valid TypedValue's.
for (int i = 0; i < proto.generator_functions_size(); ++i) {
const serialization::GeneratorFunctionHandle &func_proto = proto.generator_functions(i);
for (int j = 0; j < func_proto.args_size(); ++j) {
if (!TypedValue::ProtoIsValid(func_proto.args(j))) {
return false;
}
}
}
for (int i = 0; i < proto.join_hash_tables_size(); ++i) {
if (!JoinHashTableFactory::ProtoIsValid(proto.join_hash_tables(i).join_hash_table())) {
return false;
}
}
for (int i = 0; i < proto.insert_destinations_size(); ++i) {
const serialization::InsertDestination &insert_destination_proto = proto.insert_destinations(i);
const relation_id rel_id = insert_destination_proto.relation_id();
if (!database.hasRelationWithId(rel_id) ||
!InsertDestination::ProtoIsValid(insert_destination_proto,
database.getRelationSchemaById(rel_id))) {
return false;
}
}
for (int i = 0; i < proto.lip_filters_size(); ++i) {
if (!LIPFilterFactory::ProtoIsValid(proto.lip_filters(i))) {
return false;
}
}
for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) {
if (!LIPFilterDeployment::ProtoIsValid(proto.lip_filter_deployments(i))) {
return false;
}
}
for (int i = 0; i < proto.predicates_size(); ++i) {
if (!PredicateFactory::ProtoIsValid(proto.predicates(i), database)) {
return false;
}
}
for (int i = 0; i < proto.scalar_groups_size(); ++i) {
const serialization::QueryContext::ScalarGroup &scalar_group_proto = proto.scalar_groups(i);
for (int j = 0; j < scalar_group_proto.scalars_size(); ++j) {
if (!ScalarFactory::ProtoIsValid(scalar_group_proto.scalars(j), database)) {
return false;
}
}
}
for (int i = 0; i < proto.sort_configs_size(); ++i) {
if (!SortConfiguration::ProtoIsValid(proto.sort_configs(i), database)) {
return false;
}
}
for (int i = 0; i < proto.tuples_size(); ++i) {
if (!Tuple::ProtoIsValid(proto.tuples(i))) {
return false;
}
}
for (int i = 0; i < proto.update_groups_size(); ++i) {
const serialization::QueryContext::UpdateGroup &update_group_proto = proto.update_groups(i);
const relation_id rel_id = update_group_proto.relation_id();
if (!database.hasRelationWithId(rel_id)) {
return false;
}
const CatalogRelationSchema &rel = database.getRelationSchemaById(rel_id);
for (int j = 0; j < update_group_proto.update_assignments_size(); ++j) {
const serialization::QueryContext::UpdateGroup::UpdateAssignment &update_assignment_proto =
update_group_proto.update_assignments(j);
if (!rel.hasAttributeWithId(update_assignment_proto.attribute_id()) ||
!ScalarFactory::ProtoIsValid(update_assignment_proto.scalar(), database)) {
return false;
}
}
}
for (int i = 0; i < proto.window_aggregation_states_size(); ++i) {
if (!WindowAggregationOperationState::ProtoIsValid(proto.window_aggregation_states(i),
database)) {
return false;
}
}
return proto.IsInitialized();
}
std::size_t QueryContext::getJoinHashTablesMemoryBytes() const {
SpinSharedMutexSharedLock<false> lock(hash_tables_mutex_);
std::size_t memory = 0;
for (std::size_t hashtable_id = 0;
hashtable_id < join_hash_tables_.size();
++hashtable_id) {
for (std::size_t partition_num = 0;
partition_num < join_hash_tables_[hashtable_id].size();
++partition_num) {
if (join_hash_tables_[hashtable_id][partition_num] != nullptr) {
memory += join_hash_tables_[hashtable_id][partition_num]
->getHashTableMemorySizeBytes();
}
}
}
return memory;
}
std::size_t QueryContext::getAggregationStatesMemoryBytes() const {
SpinSharedMutexSharedLock<false> lock(aggregation_states_mutex_);
std::size_t memory = 0;
for (std::size_t agg_state_id = 0;
agg_state_id < aggregation_states_.size();
++agg_state_id) {
for (std::size_t part_id = 0;
part_id < aggregation_states_[agg_state_id].size();
++part_id) {
if (aggregation_states_[agg_state_id][part_id] != nullptr) {
memory += aggregation_states_[agg_state_id][part_id]->getMemoryConsumptionBytes();
}
}
}
return memory;
}
void QueryContext::getTempRelationIDs(
std::vector<relation_id> *temp_relation_ids) const {
SpinSharedMutexSharedLock<false> lock(insert_destinations_mutex_);
DCHECK(temp_relation_ids != nullptr);
for (std::size_t id = 0; id < insert_destinations_.size(); ++id) {
InsertDestination *curr_insert_dest = insert_destinations_[id].get();
DCHECK(curr_insert_dest != nullptr);
temp_relation_ids->emplace_back(curr_insert_dest->getRelation().getID());
}
}
} // namespace quickstep