blob: f9d7d98895593f45abee3dde937405a566876b26 [file] [log] [blame]
/**
* Copyright 2011-2015 Quickstep Technologies LLC.
* Copyright 2015-2016 Pivotal Software, Inc.
* Copyright 2016, Quickstep Research Group, Computer Sciences Department,
* University of Wisconsin—Madison.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
#include "query_optimizer/ExecutionGenerator.hpp"

#include <algorithm>
#include <cstddef>
#include <memory>
#include <sstream>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogRelationSchema.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/Expressions.pb.h"
#include "expressions/aggregation/AggregateFunction.hpp"
#include "expressions/aggregation/AggregateFunction.pb.h"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryContext.pb.h"
#include "query_optimizer/OptimizerContext.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryPlan.hpp"
#include "query_optimizer/cost_model/SimpleCostModel.hpp"
#include "query_optimizer/expressions/AggregateFunction.hpp"
#include "query_optimizer/expressions/Alias.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/ComparisonExpression.hpp"
#include "query_optimizer/expressions/ExpressionType.hpp"
#include "query_optimizer/expressions/PatternMatcher.hpp"
#include "query_optimizer/expressions/Scalar.hpp"
#include "query_optimizer/expressions/ScalarLiteral.hpp"
#include "query_optimizer/physical/CopyFrom.hpp"
#include "query_optimizer/physical/CreateIndex.hpp"
#include "query_optimizer/physical/CreateTable.hpp"
#include "query_optimizer/physical/DeleteTuples.hpp"
#include "query_optimizer/physical/DropTable.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
#include "query_optimizer/physical/InsertSelection.hpp"
#include "query_optimizer/physical/InsertTuple.hpp"
#include "query_optimizer/physical/NestedLoopsJoin.hpp"
#include "query_optimizer/physical/PatternMatcher.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/physical/PhysicalType.hpp"
#include "query_optimizer/physical/Sample.hpp"
#include "query_optimizer/physical/Selection.hpp"
#include "query_optimizer/physical/SharedSubplanReference.hpp"
#include "query_optimizer/physical/Sort.hpp"
#include "query_optimizer/physical/TableGenerator.hpp"
#include "query_optimizer/physical/TableReference.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
#include "query_optimizer/physical/UpdateTable.hpp"
#include "relational_operators/AggregationOperator.hpp"
#include "relational_operators/BuildHashOperator.hpp"
#include "relational_operators/CreateIndexOperator.hpp"
#include "relational_operators/CreateTableOperator.hpp"
#include "relational_operators/DeleteOperator.hpp"
#include "relational_operators/DestroyHashOperator.hpp"
#include "relational_operators/DropTableOperator.hpp"
#include "relational_operators/FinalizeAggregationOperator.hpp"
#include "relational_operators/HashJoinOperator.hpp"
#include "relational_operators/InsertOperator.hpp"
#include "relational_operators/NestedLoopsJoinOperator.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/SampleOperator.hpp"
#include "relational_operators/SaveBlocksOperator.hpp"
#include "relational_operators/SelectOperator.hpp"
#include "relational_operators/SortMergeRunOperator.hpp"
#include "relational_operators/SortRunGenerationOperator.hpp"
#include "relational_operators/TableGeneratorOperator.hpp"
#include "relational_operators/TextScanOperator.hpp"
#include "relational_operators/UpdateOperator.hpp"
#include "storage/AggregationOperationState.pb.h"
#include "storage/HashTable.pb.h"
#include "storage/HashTableFactory.hpp"
#include "storage/InsertDestination.pb.h"
#include "storage/StorageBlockLayout.hpp"
#include "storage/StorageBlockLayout.pb.h"
#include "types/Type.hpp"
#include "types/Type.pb.h"
#include "types/TypedValue.hpp"
#include "types/TypedValue.pb.h"
#include "types/containers/Tuple.pb.h"
#include "utility/SqlError.hpp"

#include "gflags/gflags.h"
#include "glog/logging.h"
using std::move;
using std::static_pointer_cast;
using std::unique_ptr;
using std::unordered_map;
using std::vector;
namespace quickstep {
namespace optimizer {
// Command-line flag naming the HashTable implementation used for hash joins.
// It is read in convertHashJoin() when the join hash table proto is built.
DEFINE_string(join_hashtable_type, "SeparateChaining",
"HashTable implementation to use for hash joins (valid options "
"are SeparateChaining or LinearOpenAddressing)");
// This constant exists only so that RegisterFlagValidator() is called while
// file-scope statics are initialized; nothing in the visible code reads it.
static const bool join_hashtable_type_dummy
= gflags::RegisterFlagValidator(&FLAGS_join_hashtable_type,
&ValidateHashTableImplTypeString);
// Command-line flag naming the HashTable implementation used for aggregates
// that have a GROUP BY clause.
DEFINE_string(aggregate_hashtable_type, "LinearOpenAddressing",
"HashTable implementation to use for aggregates with GROUP BY "
"(valid options are SeparateChaining or LinearOpenAddressing)");
// As above: the value only serves to register the validator for the flag.
static const bool aggregate_hashtable_type_dummy
= gflags::RegisterFlagValidator(&FLAGS_aggregate_hashtable_type,
&ValidateHashTableImplTypeString);
// Command-line flag forwarded to the TextScanOperator created in
// convertCopyFrom(); defaults to true.
DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
// Short aliases used throughout this file:
//   E - optimizer expressions, P - physical plan nodes, S - serialization
//   (protobuf) messages.
namespace E = ::quickstep::optimizer::expressions;
namespace P = ::quickstep::optimizer::physical;
namespace S = ::quickstep::serialization;
// Out-of-class definition of the static constexpr member. The constant is
// passed to std::forward_as_tuple() in convertTableReference(), which binds it
// by reference, so a definition is presumably required here for pre-C++17
// builds.
constexpr QueryPlan::DAGNodeIndex ExecutionGenerator::CatalogRelationInfo::kInvalidOperatorIndex;
// Entry point: converts a physical plan, which must be rooted by a
// TopLevelPlan, into execution operators, then appends one DropTableOperator
// per temporary relation that is not the query result.
//
// If the conversion throws, every temporary relation recorded so far is
// dropped from the catalog and the exception is rethrown to the caller.
void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) {
  CHECK(P::SomeTopLevelPlan::MatchesWithConditionalCast(physical_plan, &top_level_physical_plan_))
      << "The physical plan must be rooted by a TopLevelPlan";

  const CatalogRelation *query_result = nullptr;
  try {
    // Shared subplans go first; convertSharedSubplanReference() later looks up
    // their entries in physical_to_output_relation_map_.
    for (const P::PhysicalPtr &subplan : top_level_physical_plan_->shared_subplans()) {
      generatePlanInternal(subplan);
    }
    generatePlanInternal(top_level_physical_plan_->plan());

    // The root plan has an entry in physical_to_output_relation_map_ only when
    // it produces a relation (i.e. the query is a SELECT); that relation is
    // the query result.
    const auto root_output_it =
        physical_to_output_relation_map_.find(top_level_physical_plan_->plan());
    if (root_output_it != physical_to_output_relation_map_.end()) {
      query_result = root_output_it->second.relation;
    }
  } catch (...) {
    // Do not leave half-built temporary relations behind in the catalog.
    dropAllTemporaryRelations();
    throw;
  }

  // NOTE(zuyu): the Cli shell drops the result relation after printing, if
  // enabled, so no DropTableOperator is generated for it here.
  for (const CatalogRelationInfo &info : temporary_relation_info_vec_) {
    const CatalogRelation *relation = info.relation;
    if (relation != query_result) {
      const QueryPlan::DAGNodeIndex drop_op_index =
          execution_plan_->addRelationalOperator(
              new DropTableOperator(*relation,
                                    optimizer_context_->catalog_database(),
                                    false /* only_drop_blocks */));
      DCHECK(!info.isStoredRelation());
      execution_plan_->addDependenciesForDropOperator(
          drop_op_index,
          info.producer_operator_index);
    } else {
      query_handle_->setQueryResultRelation(
          optimizer_context_->catalog_database()->getRelationByIdMutable(query_result->getID()));
    }
  }
}
// Recursively converts a physical plan subtree. Children are converted before
// their parent (bottom-up), then the node itself is dispatched on its
// PhysicalType to the matching convert*() method. An unrecognized type is a
// fatal error.
void ExecutionGenerator::generatePlanInternal(
    const P::PhysicalPtr &physical_plan) {
  for (const P::PhysicalPtr &child_plan : physical_plan->children()) {
    generatePlanInternal(child_plan);
  }

  switch (physical_plan->getPhysicalType()) {
    case P::PhysicalType::kAggregate:
      convertAggregate(
          std::static_pointer_cast<const P::Aggregate>(physical_plan));
      break;
    case P::PhysicalType::kCopyFrom:
      convertCopyFrom(
          std::static_pointer_cast<const P::CopyFrom>(physical_plan));
      break;
    case P::PhysicalType::kCreateIndex:
      convertCreateIndex(
          std::static_pointer_cast<const P::CreateIndex>(physical_plan));
      break;
    case P::PhysicalType::kCreateTable:
      convertCreateTable(
          std::static_pointer_cast<const P::CreateTable>(physical_plan));
      break;
    case P::PhysicalType::kDeleteTuples:
      convertDeleteTuples(
          std::static_pointer_cast<const P::DeleteTuples>(physical_plan));
      break;
    case P::PhysicalType::kDropTable:
      convertDropTable(
          std::static_pointer_cast<const P::DropTable>(physical_plan));
      break;
    case P::PhysicalType::kHashJoin:
      convertHashJoin(
          std::static_pointer_cast<const P::HashJoin>(physical_plan));
      break;
    case P::PhysicalType::kInsertSelection:
      convertInsertSelection(
          std::static_pointer_cast<const P::InsertSelection>(physical_plan));
      break;
    case P::PhysicalType::kInsertTuple:
      convertInsertTuple(
          std::static_pointer_cast<const P::InsertTuple>(physical_plan));
      break;
    case P::PhysicalType::kNestedLoopsJoin:
      convertNestedLoopsJoin(
          std::static_pointer_cast<const P::NestedLoopsJoin>(physical_plan));
      break;
    case P::PhysicalType::kSample:
      convertSample(
          std::static_pointer_cast<const P::Sample>(physical_plan));
      break;
    case P::PhysicalType::kSelection:
      convertSelection(
          std::static_pointer_cast<const P::Selection>(physical_plan));
      break;
    case P::PhysicalType::kSharedSubplanReference:
      convertSharedSubplanReference(
          std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan));
      break;
    case P::PhysicalType::kSort:
      convertSort(
          std::static_pointer_cast<const P::Sort>(physical_plan));
      break;
    case P::PhysicalType::kTableGenerator:
      convertTableGenerator(
          std::static_pointer_cast<const P::TableGenerator>(physical_plan));
      break;
    case P::PhysicalType::kTableReference:
      convertTableReference(
          std::static_pointer_cast<const P::TableReference>(physical_plan));
      break;
    case P::PhysicalType::kUpdateTable:
      convertUpdateTable(
          std::static_pointer_cast<const P::UpdateTable>(physical_plan));
      break;
    default:
      LOG(FATAL) << "Unknown physical plan node "
                 << physical_plan->getShortString();
  }
}
// Builds the name for the next temporary relation:
//   <internal temporary prefix><query id>_<counter>
// The counter (rel_id_) is advanced on every call, so successive names
// generated by this object differ.
std::string ExecutionGenerator::getNewRelationName() {
  // Take the current counter value, then advance it for the next caller.
  const auto relation_suffix = rel_id_;
  ++rel_id_;

  std::ostringstream name_stream;
  name_stream << OptimizerContext::kInternalTemporaryRelationNamePrefix
              << optimizer_context_->query_id() << "_" << relation_suffix;
  return name_stream.str();
}
// Creates a temporary CatalogRelation holding the output of 'physical',
// registers it in the catalog database, and fills in the given
// InsertDestination proto so that it targets the new relation.
//
//   physical                 - plan node whose output attributes define the
//                              schema of the new relation.
//   catalog_relation_output  - receives a pointer to the new relation; the
//                              relation object itself is handed to the
//                              catalog database via addRelation().
//   insert_destination_proto - set to a BLOCK_POOL destination referring to
//                              the id returned by addRelation().
//
// Side effect: attribute_substitution_map_ is updated so that every output
// expression id of 'physical' maps to the corresponding new CatalogAttribute.
void ExecutionGenerator::createTemporaryCatalogRelation(
    const P::PhysicalPtr &physical,
    const CatalogRelation **catalog_relation_output,
    S::InsertDestination *insert_destination_proto) {
  std::unique_ptr<CatalogRelation> temp_relation(
      new CatalogRelation(optimizer_context_->catalog_database(),
                          getNewRelationName(),
                          -1 /* id */,
                          true /* is_temporary*/));

  attribute_id next_attribute_id = 0;
  for (const E::NamedExpressionPtr &output_expression :
       physical->getOutputAttributes()) {
    // Using the numeric id as the attribute name keeps the names distinct;
    // the user-visible alias is passed separately as the last argument.
    std::unique_ptr<CatalogAttribute> new_attribute(
        new CatalogAttribute(temp_relation.get(),
                             std::to_string(next_attribute_id),
                             output_expression->getValueType(),
                             next_attribute_id,
                             output_expression->attribute_alias()));
    attribute_substitution_map_[output_expression->id()] = new_attribute.get();
    temp_relation->addAttribute(new_attribute.release());
    ++next_attribute_id;
  }

  // Publish the raw pointer before the smart pointer gives up the object.
  *catalog_relation_output = temp_relation.get();
  const relation_id new_relation_id =
      optimizer_context_->catalog_database()->addRelation(temp_relation.release());

  insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
  insert_destination_proto->set_relation_id(new_relation_id);
}
void ExecutionGenerator::dropAllTemporaryRelations() {
CatalogDatabase *catalog_database = optimizer_context_->catalog_database();
for (const CatalogRelationInfo &temporary_relation_info :
temporary_relation_info_vec_) {
DCHECK_EQ(temporary_relation_info.relation->size_blocks(), 0u);
catalog_database->dropRelationById(temporary_relation_info.relation->getID());
}
}
// Concretizes each optimizer named expression into an execution Scalar and
// appends its serialized form to 'scalar_group_proto', preserving the order of
// 'named_expressions'.
//
// For an Alias the aliased child expression is concretized (it must be a
// Scalar, otherwise the CHECK fails); any other named expression is
// concretized directly.
void ExecutionGenerator::convertNamedExpressions(
    const std::vector<E::NamedExpressionPtr> &named_expressions,
    S::QueryContext::ScalarGroup *scalar_group_proto) {
  for (const E::NamedExpressionPtr &named_expression : named_expressions) {
    E::AliasPtr alias;
    const bool is_alias =
        E::SomeAlias::MatchesWithConditionalCast(named_expression, &alias);

    std::unique_ptr<const Scalar> concretized;
    if (!is_alias) {
      concretized.reset(named_expression->concretize(attribute_substitution_map_));
    } else {
      // We have not added aggregate expressions yet,
      // so all child expressions of an Alias should be a Scalar.
      E::ScalarPtr aliased_scalar;
      CHECK(E::SomeScalar::MatchesWithConditionalCast(alias->expression(), &aliased_scalar))
          << alias->toString();
      concretized.reset(aliased_scalar->concretize(attribute_substitution_map_));
    }

    scalar_group_proto->add_scalars()->CopyFrom(concretized->getProto());
  }
}
// Concretizes an optimizer predicate into an execution Predicate using the
// current attribute substitution map. The callers in this file wrap the
// returned raw pointer in a unique_ptr.
Predicate* ExecutionGenerator::convertPredicate(
    const expressions::PredicatePtr &optimizer_predicate) const {
  Predicate *execution_predicate =
      optimizer_predicate->concretize(attribute_substitution_map_);
  return execution_predicate;
}
// A TableReference produces no execution operator. It only publishes
// information that parent nodes need:
//   - attribute_substitution_map_ entries from each AttributeReference id to
//     the CatalogAttribute with the same position in the relation, and
//   - a physical_to_output_relation_map_ entry for the referenced relation,
//     recorded with kInvalidOperatorIndex as its producer.
void ExecutionGenerator::convertTableReference(
    const P::TableReferencePtr &physical_table_reference) {
  const CatalogRelation *relation = physical_table_reference->relation();
  const std::vector<E::AttributeReferencePtr> &references =
      physical_table_reference->attribute_list();

  const CatalogRelation::size_type num_attributes = relation->size();
  DCHECK_EQ(references.size(), num_attributes);

  for (CatalogRelation::size_type pos = 0; pos < num_attributes; ++pos) {
    attribute_substitution_map_.emplace(references[pos]->id(),
                                        relation->getAttributeById(pos));
  }

  physical_to_output_relation_map_.emplace(
      std::piecewise_construct,
      std::forward_as_tuple(physical_table_reference),
      std::forward_as_tuple(CatalogRelationInfo::kInvalidOperatorIndex,
                            relation));
}
// Converts a physical Sample into a SampleOperator that writes into a new
// temporary relation. The new relation is recorded both as the output of
// 'physical_sample' and as a temporary relation to be dropped later.
void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) {
  // The insert destination index is the slot the proto is about to occupy.
  const QueryContext::insert_destination_id destination_id =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *destination_proto =
      query_context_proto_->add_insert_destinations();

  const CatalogRelation *sampled_relation = nullptr;
  createTemporaryCatalogRelation(physical_sample,
                                 &sampled_relation,
                                 destination_proto);

  const CatalogRelationInfo *input_info =
      findRelationInfoOutputByPhysical(physical_sample->input());
  DCHECK(input_info != nullptr);
  const bool input_is_stored = input_info->isStoredRelation();

  const QueryPlan::DAGNodeIndex sample_op_index =
      execution_plan_->addRelationalOperator(
          new SampleOperator(*input_info->relation,
                             *sampled_relation,
                             destination_id,
                             input_is_stored,
                             physical_sample->is_block_sample(),
                             physical_sample->percentage()));
  destination_proto->set_relational_op_index(sample_op_index);

  // Only an input produced by another operator needs a DAG edge; a stored
  // relation has no producer operator.
  if (!input_is_stored) {
    execution_plan_->addDirectDependency(sample_op_index,
                                         input_info->producer_operator_index,
                                         false /* is_pipeline_breaker */);
  }

  physical_to_output_relation_map_.emplace(
      std::piecewise_construct,
      std::forward_as_tuple(physical_sample),
      std::forward_as_tuple(sample_op_index,
                            sampled_relation));
  temporary_relation_info_vec_.emplace_back(sample_op_index, sampled_relation);
}
// Checks whether the scalar group at 'project_expressions_group_index'
// consists solely of plain attribute references.
//
// Returns true and appends the referenced attribute ids, in order, to
// '*attributes' when every scalar has data source ATTRIBUTE. Returns false and
// leaves '*attributes' untouched otherwise.
bool ExecutionGenerator::convertSimpleProjection(
    const QueryContext::scalar_group_id project_expressions_group_index,
    std::vector<attribute_id> *attributes) const {
  const S::QueryContext::ScalarGroup &group =
      query_context_proto_->scalar_groups(project_expressions_group_index);
  const int num_scalars = group.scalars_size();

  // Gather ids into a scratch vector so that the caller's vector is only
  // modified once the whole group is known to be a pure projection.
  std::vector<attribute_id> projected_ids;
  for (int pos = 0; pos < num_scalars; ++pos) {
    const auto &scalar_proto = group.scalars(pos);
    if (scalar_proto.data_source() != S::Scalar::ATTRIBUTE) {
      return false;
    }
    projected_ids.push_back(
        scalar_proto.GetExtension(S::ScalarAttribute::attribute_id));
  }

  attributes->insert(attributes->end(), projected_ids.begin(), projected_ids.end());
  return true;
}
// Converts a physical Selection.
//
// Fast path: a Selection with no filter whose project expressions have the
// same ids, in the same order, as the input's output attributes only renames
// columns. If the input is a temporary (non-stored) relation, no operator is
// created: the display names of the input relation's attributes are updated
// in place and the Selection is mapped to the input's CatalogRelationInfo.
//
// General path: serializes the project expressions and the optional filter
// predicate into the query context proto, creates a temporary output relation,
// and adds a SelectOperator that reads the input relation. The output relation
// is recorded as a temporary relation produced by that operator.
void ExecutionGenerator::convertSelection(
const P::SelectionPtr &physical_selection) {
// Check if the Selection is only for renaming columns.
if (physical_selection->filter_predicate() == nullptr) {
const std::vector<E::AttributeReferencePtr> input_attributes =
physical_selection->input()->getOutputAttributes();
const std::vector<E::NamedExpressionPtr> &project_expressions =
physical_selection->project_expressions();
if (project_expressions.size() == input_attributes.size()) {
// Compare expression ids position by position; any mismatch means the
// Selection does more than rename.
bool has_different_attrs = false;
for (std::size_t attr_idx = 0; attr_idx < input_attributes.size(); ++attr_idx) {
if (project_expressions[attr_idx]->id() != input_attributes[attr_idx]->id()) {
has_different_attrs = true;
break;
}
}
if (!has_different_attrs) {
const std::unordered_map<P::PhysicalPtr, CatalogRelationInfo>::const_iterator input_catalog_rel_it =
physical_to_output_relation_map_.find(physical_selection->input());
DCHECK(input_catalog_rel_it != physical_to_output_relation_map_.end());
// Only a temporary relation is renamed in place; for a stored relation
// control falls through to the general path below.
if (!input_catalog_rel_it->second.isStoredRelation()) {
// The map stores a pointer-to-const, so constness is cast away in order
// to change the display names.
CatalogRelation *catalog_relation =
const_cast<CatalogRelation*>(input_catalog_rel_it->second.relation);
for (std::size_t attr_idx = 0; attr_idx < project_expressions.size(); ++attr_idx) {
CatalogAttribute *catalog_attribute =
catalog_relation->getAttributeByIdMutable(attr_idx);
DCHECK(catalog_attribute != nullptr);
catalog_attribute->setDisplayName(
project_expressions[attr_idx]->attribute_alias());
}
// The Selection shares its input's relation info (same relation, same
// producer); nothing is added to temporary_relation_info_vec_ here.
physical_to_output_relation_map_.emplace(physical_selection,
input_catalog_rel_it->second);
return;
}
}
}
}
// Convert the project expressions proto.
// The group index is taken before the group is added, so it refers to the
// slot filled by add_scalar_groups() below.
const QueryContext::scalar_group_id project_expressions_group_index =
query_context_proto_->scalar_groups_size();
convertNamedExpressions(physical_selection->project_expressions(),
query_context_proto_->add_scalar_groups());
// Convert the predicate proto.
// Without a filter the index stays at kInvalidPredicateId.
QueryContext::predicate_id execution_predicate_index = QueryContext::kInvalidPredicateId;
if (physical_selection->filter_predicate()) {
execution_predicate_index = query_context_proto_->predicates_size();
unique_ptr<const Predicate> execution_predicate(convertPredicate(physical_selection->filter_predicate()));
query_context_proto_->add_predicates()->CopyFrom(execution_predicate->getProto());
}
// Create InsertDestination proto.
const CatalogRelation *output_relation = nullptr;
const QueryContext::insert_destination_id insert_destination_index =
query_context_proto_->insert_destinations_size();
S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
createTemporaryCatalogRelation(physical_selection,
&output_relation,
insert_destination_proto);
// Create and add a Select operator.
const CatalogRelationInfo *input_relation_info =
findRelationInfoOutputByPhysical(physical_selection->input());
DCHECK(input_relation_info != nullptr);
// Use the "simple" form of the selection operator (a pure projection that
// doesn't require any expression evaluation or intermediate copies) if
// possible.
// In the simple case the attribute-id vector is released to the operator
// (presumably the operator takes ownership -- confirm against
// SelectOperator); otherwise the unique_ptr frees it at the end of scope.
std::unique_ptr<std::vector<attribute_id>> attributes(new std::vector<attribute_id>());
SelectOperator *op
= convertSimpleProjection(project_expressions_group_index, attributes.get())
? new SelectOperator(*input_relation_info->relation,
*output_relation,
insert_destination_index,
execution_predicate_index,
attributes.release(),
input_relation_info->isStoredRelation())
: new SelectOperator(*input_relation_info->relation,
*output_relation,
insert_destination_index,
execution_predicate_index,
project_expressions_group_index,
input_relation_info->isStoredRelation());
const QueryPlan::DAGNodeIndex select_index =
execution_plan_->addRelationalOperator(op);
insert_destination_proto->set_relational_op_index(select_index);
// A dependency edge is only added when the input is produced by another
// operator, i.e. it is not a stored relation.
if (!input_relation_info->isStoredRelation()) {
execution_plan_->addDirectDependency(select_index,
input_relation_info->producer_operator_index,
false /* is_pipeline_breaker */);
}
// Publish the output relation for parent nodes and register it as a
// temporary relation produced by the select operator.
physical_to_output_relation_map_.emplace(
std::piecewise_construct,
std::forward_as_tuple(physical_selection),
std::forward_as_tuple(select_index,
output_relation));
temporary_relation_info_vec_.emplace_back(select_index, output_relation);
}
// A SharedSubplanReference produces no operator of its own. If the referenced
// shared subplan has a recorded output relation, the same relation info is
// recorded for the reference node; otherwise nothing is recorded.
void ExecutionGenerator::convertSharedSubplanReference(const physical::SharedSubplanReferencePtr &physical_plan) {
  const auto subplan_output_it =
      physical_to_output_relation_map_.find(
          top_level_physical_plan_->shared_subplan_at(physical_plan->subplan_id()));
  if (subplan_output_it == physical_to_output_relation_map_.end()) {
    return;
  }
  physical_to_output_relation_map_.emplace(physical_plan, subplan_output_it->second);
}
// Converts a physical HashJoin into three execution operators: BuildHash,
// HashJoin (the primary operator, which writes into a new temporary relation)
// and DestroyHash.
//
// The left child starts as the probe side and the right child as the build
// side; the two are swapped when the cost model estimates the probe side to be
// smaller, so the hash table is built on the smaller input.
//
// Throws a SQL error when a pair of join attributes has different type ids or
// when both sides resolve to the same relation (self-join is unsupported).
// Aborts via CHECK when a join attribute has no entry in
// attribute_substitution_map_.
void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
  // HashJoin is converted to three operators:
  //     BuildHash, HashJoin, DestroyHash. The second is the primary operator.

  P::PhysicalPtr probe_physical = physical_plan->left();
  P::PhysicalPtr build_physical = physical_plan->right();

  std::vector<attribute_id> probe_attribute_ids;
  std::vector<attribute_id> build_attribute_ids;

  bool any_probe_attributes_nullable = false;
  bool any_build_attributes_nullable = false;

  // Resolves a join attribute to its catalog attribute. A lookup with find()
  // is used instead of operator[] because the latter would silently insert a
  // null entry for an unknown id, which would then be dereferenced.
  const auto find_catalog_attribute =
      [this](const E::AttributeReferencePtr &join_attribute) -> const CatalogAttribute* {
    const auto it = attribute_substitution_map_.find(join_attribute->id());
    CHECK(it != attribute_substitution_map_.end())
        << "Join attribute " << join_attribute->attribute_name()
        << " has no catalog attribute in the substitution map";
    return it->second;
  };

  const std::vector<E::AttributeReferencePtr> &left_join_attributes =
      physical_plan->left_join_attributes();
  for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) {
    const CatalogAttribute *probe_catalog_attribute =
        find_catalog_attribute(left_join_attribute);
    probe_attribute_ids.emplace_back(probe_catalog_attribute->getID());

    if (probe_catalog_attribute->getType().isNullable()) {
      any_probe_attributes_nullable = true;
    }
  }

  const std::vector<E::AttributeReferencePtr> &right_join_attributes =
      physical_plan->right_join_attributes();
  for (const E::AttributeReferencePtr &right_join_attribute : right_join_attributes) {
    const CatalogAttribute *build_catalog_attribute =
        find_catalog_attribute(right_join_attribute);
    build_attribute_ids.emplace_back(build_catalog_attribute->getID());

    if (build_catalog_attribute->getType().isNullable()) {
      any_build_attributes_nullable = true;
    }
  }

  // The loop below indexes both lists with the same position.
  DCHECK_EQ(left_join_attributes.size(), right_join_attributes.size());

  // Remember key types for call to SimplifyHashTableImplTypeProto() below.
  std::vector<const Type*> key_types;
  for (std::vector<E::AttributeReferencePtr>::size_type attr_idx = 0;
       attr_idx < left_join_attributes.size();
       ++attr_idx) {
    const Type &left_attribute_type = left_join_attributes[attr_idx]->getValueType();
    const Type &right_attribute_type = right_join_attributes[attr_idx]->getValueType();
    if (left_attribute_type.getTypeID() != right_attribute_type.getTypeID()) {
      THROW_SQL_ERROR() << "Equality join predicate between two attributes of different types "
                           "is not allowed in HashJoin";
    }
    key_types.push_back(&left_attribute_type);
  }

  // Choose the smaller table as the inner build table,
  // and the other one as the outer probe table.
  std::size_t probe_cardinality = cost_model_->estimateCardinality(probe_physical);
  std::size_t build_cardinality = cost_model_->estimateCardinality(build_physical);
  if (probe_cardinality < build_cardinality) {
    // Switch the probe and build physical nodes.
    std::swap(probe_physical, build_physical);
    std::swap(probe_cardinality, build_cardinality);
    std::swap(probe_attribute_ids, build_attribute_ids);
    std::swap(any_probe_attributes_nullable, any_build_attributes_nullable);
  }

  // Convert the residual predicate proto.
  QueryContext::predicate_id residual_predicate_index = QueryContext::kInvalidPredicateId;
  if (physical_plan->residual_predicate()) {
    residual_predicate_index = query_context_proto_->predicates_size();

    unique_ptr<const Predicate> residual_predicate(convertPredicate(physical_plan->residual_predicate()));
    query_context_proto_->add_predicates()->CopyFrom(residual_predicate->getProto());
  }

  // Convert the project expressions proto.
  const QueryContext::scalar_group_id project_expressions_group_index =
      query_context_proto_->scalar_groups_size();
  convertNamedExpressions(physical_plan->project_expressions(),
                          query_context_proto_->add_scalar_groups());

  const CatalogRelationInfo *build_relation_info =
      findRelationInfoOutputByPhysical(build_physical);
  const CatalogRelationInfo *probe_operator_info =
      findRelationInfoOutputByPhysical(probe_physical);
  // Same sanity check that the other converters apply to their inputs.
  DCHECK(build_relation_info != nullptr);
  DCHECK(probe_operator_info != nullptr);

  // FIXME(quickstep-team): Add support for self-join.
  if (build_relation_info->relation == probe_operator_info->relation) {
    THROW_SQL_ERROR() << "Self-join is not supported";
  }

  // Create join hash table proto.
  const QueryContext::join_hash_table_id join_hash_table_index =
      query_context_proto_->join_hash_tables_size();
  S::HashTable *hash_table_proto = query_context_proto_->add_join_hash_tables();

  // SimplifyHashTableImplTypeProto() switches the hash table implementation
  // from SeparateChaining to SimpleScalarSeparateChaining when there is a
  // single scalar key type with a reversible hash function.
  hash_table_proto->set_hash_table_impl_type(
      SimplifyHashTableImplTypeProto(
          HashTableImplTypeProtoFromString(FLAGS_join_hashtable_type),
          key_types));

  const CatalogRelationSchema *build_relation = build_relation_info->relation;
  for (const attribute_id build_attribute : build_attribute_ids) {
    hash_table_proto->add_key_types()->CopyFrom(
        build_relation->getAttributeById(build_attribute)->getType().getProto());
  }

  hash_table_proto->set_estimated_num_entries(build_cardinality);

  // Create three operators.
  const QueryPlan::DAGNodeIndex build_operator_index =
      execution_plan_->addRelationalOperator(
          new BuildHashOperator(
              *build_relation_info->relation,
              build_relation_info->isStoredRelation(),
              build_attribute_ids,
              any_build_attributes_nullable,
              join_hash_table_index));

  // Create InsertDestination proto.
  const CatalogRelation *output_relation = nullptr;
  const QueryContext::insert_destination_id insert_destination_index =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
  createTemporaryCatalogRelation(physical_plan,
                                 &output_relation,
                                 insert_destination_proto);

  const QueryPlan::DAGNodeIndex join_operator_index =
      execution_plan_->addRelationalOperator(
          new HashJoinOperator(
              *build_relation_info->relation,
              *probe_operator_info->relation,
              probe_operator_info->isStoredRelation(),
              probe_attribute_ids,
              any_probe_attributes_nullable,
              *output_relation,
              insert_destination_index,
              join_hash_table_index,
              residual_predicate_index,
              project_expressions_group_index));
  insert_destination_proto->set_relational_op_index(join_operator_index);

  const QueryPlan::DAGNodeIndex destroy_operator_index =
      execution_plan_->addRelationalOperator(
          new DestroyHashOperator(join_hash_table_index));

  if (!build_relation_info->isStoredRelation()) {
    execution_plan_->addDirectDependency(build_operator_index,
                                         build_relation_info->producer_operator_index,
                                         false /* is_pipeline_breaker */);
    // Add the dependency for the producer operator of the build relation
    // to prevent the build relation from being destroyed until after the join
    // is complete (see QueryPlan::addDependenciesForDropOperator(), which
    // makes the drop operator for the temporary relation dependent on all its
    // consumers having finished).
    execution_plan_->addDirectDependency(join_operator_index,
                                         build_relation_info->producer_operator_index,
                                         true /* is_pipeline_breaker */);
  }
  if (!probe_operator_info->isStoredRelation()) {
    execution_plan_->addDirectDependency(join_operator_index,
                                         probe_operator_info->producer_operator_index,
                                         false /* is_pipeline_breaker */);
  }
  // The join probes only after the hash table is fully built, and the hash
  // table is destroyed only after the join has finished.
  execution_plan_->addDirectDependency(join_operator_index,
                                       build_operator_index,
                                       true /* is_pipeline_breaker */);
  execution_plan_->addDirectDependency(destroy_operator_index,
                                       join_operator_index,
                                       true /* is_pipeline_breaker */);

  physical_to_output_relation_map_.emplace(
      std::piecewise_construct,
      std::forward_as_tuple(physical_plan),
      std::forward_as_tuple(join_operator_index,
                            output_relation));
  temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
}
// Converts a physical NestedLoopsJoin into a single NestedLoopsJoinOperator
// that writes into a new temporary relation.
//
// A join predicate is always serialized: the plan's own predicate when it has
// one, otherwise a constant TRUE predicate. Throws a SQL error when both
// inputs resolve to the same relation (self-join is unsupported).
void ExecutionGenerator::convertNestedLoopsJoin(
    const P::NestedLoopsJoinPtr &physical_plan) {
  // NestedLoopsJoin is converted to a NestedLoopsJoin operator.

  // Convert the join predicate proto. The index is taken before the predicate
  // is added, so it refers to the slot filled by add_predicates() below.
  const QueryContext::predicate_id execution_join_predicate_index = query_context_proto_->predicates_size();
  if (physical_plan->join_predicate()) {
    unique_ptr<const Predicate> execution_join_predicate(convertPredicate(physical_plan->join_predicate()));
    query_context_proto_->add_predicates()->CopyFrom(execution_join_predicate->getProto());
  } else {
    query_context_proto_->add_predicates()->set_predicate_type(S::Predicate::TRUE);
  }

  // Convert the project expressions proto.
  const QueryContext::scalar_group_id project_expressions_group_index =
      query_context_proto_->scalar_groups_size();
  convertNamedExpressions(physical_plan->project_expressions(),
                          query_context_proto_->add_scalar_groups());

  const CatalogRelationInfo *left_relation_info =
      findRelationInfoOutputByPhysical(physical_plan->left());
  const CatalogRelationInfo *right_relation_info =
      findRelationInfoOutputByPhysical(physical_plan->right());
  // Same sanity check that the other converters apply to their inputs; both
  // pointers are dereferenced right below.
  DCHECK(left_relation_info != nullptr);
  DCHECK(right_relation_info != nullptr);

  // FIXME(quickstep-team): Add support for self-join.
  if (left_relation_info->relation == right_relation_info->relation) {
    THROW_SQL_ERROR() << "NestedLoopsJoin does not support self-join yet";
  }

  // Create InsertDestination proto.
  const CatalogRelation *output_relation = nullptr;
  const QueryContext::insert_destination_id insert_destination_index =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();
  createTemporaryCatalogRelation(physical_plan,
                                 &output_relation,
                                 insert_destination_proto);

  // Create and add a NestedLoopsJoin operator.
  const QueryPlan::DAGNodeIndex join_operator_index =
      execution_plan_->addRelationalOperator(new NestedLoopsJoinOperator(
          *left_relation_info->relation,
          *right_relation_info->relation,
          *output_relation,
          insert_destination_index,
          execution_join_predicate_index,
          project_expressions_group_index,
          left_relation_info->isStoredRelation(),
          right_relation_info->isStoredRelation()));
  insert_destination_proto->set_relational_op_index(join_operator_index);

  // Dependency edges are only needed for inputs produced by another operator.
  if (!left_relation_info->isStoredRelation()) {
    execution_plan_->addDirectDependency(join_operator_index,
                                         left_relation_info->producer_operator_index,
                                         false /* is_pipeline_breaker */);
  }
  if (!right_relation_info->isStoredRelation()) {
    execution_plan_->addDirectDependency(join_operator_index,
                                         right_relation_info->producer_operator_index,
                                         false /* is_pipeline_breaker */);
  }

  physical_to_output_relation_map_.emplace(
      std::piecewise_construct,
      std::forward_as_tuple(physical_plan),
      std::forward_as_tuple(join_operator_index,
                            output_relation));
  temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
}
// Converts a physical CopyFrom into two operators: a TextScanOperator that
// loads the file into the target relation, followed by a SaveBlocksOperator
// that depends on the scan.
void ExecutionGenerator::convertCopyFrom(
    const P::CopyFromPtr &physical_plan) {
  const CatalogRelation *target_relation = physical_plan->catalog_relation();

  // Describe where the scanned tuples go: a BLOCK_POOL destination on the
  // target relation, using the relation's default block layout and seeded
  // with a snapshot of the blocks the relation has at this point.
  const QueryContext::insert_destination_id destination_id =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *destination_proto = query_context_proto_->add_insert_destinations();

  destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
  destination_proto->set_relation_id(target_relation->getID());
  destination_proto->mutable_layout()->MergeFrom(
      target_relation->getDefaultStorageBlockLayout().getDescription());

  const std::vector<block_id> existing_blocks(
      physical_plan->catalog_relation()->getBlocksSnapshot());
  for (const block_id existing_block : existing_blocks) {
    destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, existing_block);
  }

  const QueryPlan::DAGNodeIndex text_scan_index =
      execution_plan_->addRelationalOperator(
          new TextScanOperator(
              physical_plan->file_name(),
              physical_plan->column_delimiter(),
              physical_plan->escape_strings(),
              FLAGS_parallelize_load,
              *target_relation,
              destination_id));
  destination_proto->set_relational_op_index(text_scan_index);

  const QueryPlan::DAGNodeIndex save_blocks_index =
      execution_plan_->addRelationalOperator(
          new SaveBlocksOperator());
  execution_plan_->addDirectDependency(save_blocks_index,
                                       text_scan_index,
                                       false /* is_pipeline_breaker */);
}
void ExecutionGenerator::convertCreateIndex(
    const P::CreateIndexPtr &physical_plan) {
  // A CreateIndex plan is lowered to a single CreateIndexOperator that works
  // on the mutable catalog entry of the input relation.
  const CatalogRelationInfo *source_info =
      findRelationInfoOutputByPhysical(physical_plan->input());
  CatalogRelation *indexed_relation =
      optimizer_context_->catalog_database()->getRelationByIdMutable(
          source_info->relation->getID());

  // Reject an index name that the relation already uses.
  if (indexed_relation->hasIndexWithName(physical_plan->index_name())) {
    THROW_SQL_ERROR() << "The relation " << indexed_relation->getName()
                      << " already has an index named "
                      << physical_plan->index_name();
  }

  DCHECK_GT(physical_plan->index_attributes().size(), 0u);

  // Work on a copy of the plan's index description and append the catalog id
  // of every referenced attribute, looked up by name in the input relation.
  IndexSubBlockDescription description(*physical_plan->index_description());
  for (const E::AttributeReferencePtr &attribute_ref :
       physical_plan->index_attributes()) {
    const CatalogAttribute *indexed_attribute =
        indexed_relation->getAttributeByName(attribute_ref->attribute_name());
    DCHECK(indexed_attribute != nullptr);
    description.add_indexed_attribute_ids(indexed_attribute->getID());
  }

  // Reject a description that duplicates an index the relation already has.
  if (indexed_relation->hasIndexWithDescription(description)) {
    THROW_SQL_ERROR() << "The relation " << indexed_relation->getName()
                      << " already defines this index on the given attribute(s).";
  }

  // Reject a description that is not fully initialized.
  if (!description.IsInitialized()) {
    THROW_SQL_ERROR() << "The index with given properties cannot be created.";
  }

  CreateIndexOperator *create_index =
      new CreateIndexOperator(indexed_relation,
                              physical_plan->index_name(),
                              std::move(description));
  execution_plan_->addRelationalOperator(create_index);
}
void ExecutionGenerator::convertCreateTable(
    const P::CreateTablePtr &physical_plan) {
  // A CreateTable plan is lowered to a single CreateTableOperator, which is
  // handed the fully described, non-temporary relation.
  std::unique_ptr<CatalogRelation> new_relation(
      new CatalogRelation(optimizer_context_->catalog_database(),
                          physical_plan->relation_name(),
                          -1 /* id */,
                          false /* is_temporary*/));

  // Attribute ids are handed out sequentially from 0, in plan order.
  attribute_id next_attribute_id = 0;
  for (const E::AttributeReferencePtr &column : physical_plan->attributes()) {
    CatalogAttribute *new_attribute =
        new CatalogAttribute(new_relation.get(),
                             column->attribute_name(),
                             column->getValueType(),
                             next_attribute_id,
                             column->attribute_alias());
    new_relation->addAttribute(new_attribute);
    ++next_attribute_id;
  }

  // A user-specified block layout replaces the default one, but only after
  // it has been validated against the new relation.
  if (physical_plan->block_properties()) {
    const bool layout_is_valid =
        StorageBlockLayout::DescriptionIsValid(*new_relation,
                                               *physical_plan->block_properties());
    if (!layout_is_valid) {
      THROW_SQL_ERROR() << "BLOCKPROPERTIES is invalid.";
    }

    std::unique_ptr<StorageBlockLayout> custom_layout(
        new StorageBlockLayout(*new_relation,
                               *physical_plan->block_properties()));
    custom_layout->finalize();
    new_relation->setDefaultStorageBlockLayout(custom_layout.release());
  }

  CreateTableOperator *create_table =
      new CreateTableOperator(new_relation.release(),
                              optimizer_context_->catalog_database());
  execution_plan_->addRelationalOperator(create_table);
}
void ExecutionGenerator::convertDeleteTuples(
    const P::DeleteTuplesPtr &physical_plan) {
  // Three outcomes are possible:
  //  - no predicate, or a predicate that is statically true: only a
  //    DropTableOperator (dropping blocks only) is created;
  //  - a predicate without a static result: a DeleteOperator followed by a
  //    SaveBlocksOperator is created;
  //  - a predicate that is statically false: no operator is created.
  unique_ptr<const Predicate> delete_predicate;
  if (physical_plan->predicate()) {
    delete_predicate.reset(convertPredicate(physical_plan->predicate()));
  }

  const CatalogRelationInfo *target_info =
      findRelationInfoOutputByPhysical(physical_plan->input());
  DCHECK(target_info != nullptr);

  const bool deletes_every_tuple =
      delete_predicate == nullptr ||
      (delete_predicate->hasStaticResult() &&
       delete_predicate->getStaticResult());

  if (deletes_every_tuple) {
    DropTableOperator *drop_blocks =
        new DropTableOperator(*target_info->relation,
                              optimizer_context_->catalog_database(),
                              true /* only_drop_blocks */);
    const QueryPlan::DAGNodeIndex drop_blocks_index =
        execution_plan_->addRelationalOperator(drop_blocks);
    if (!target_info->isStoredRelation()) {
      execution_plan_->addDirectDependency(drop_blocks_index,
                                           target_info->producer_operator_index,
                                           true /* is_pipeline_breaker */);
    }
    return;
  }

  if (delete_predicate->hasStaticResult()) {
    // Statically false: nothing can match, so nothing is added to the plan.
    return;
  }

  // Register the predicate in the query context. Its id is the number of
  // predicates present before it is appended.
  const QueryContext::predicate_id predicate_index =
      query_context_proto_->predicates_size();
  query_context_proto_->add_predicates()->CopyFrom(
      delete_predicate->getProto());

  DeleteOperator *delete_op =
      new DeleteOperator(*target_info->relation,
                         predicate_index,
                         target_info->isStoredRelation());
  const QueryPlan::DAGNodeIndex delete_index =
      execution_plan_->addRelationalOperator(delete_op);
  if (!target_info->isStoredRelation()) {
    execution_plan_->addDirectDependency(delete_index,
                                         target_info->producer_operator_index,
                                         false /* is_pipeline_breaker */);
  }

  const QueryPlan::DAGNodeIndex save_index =
      execution_plan_->addRelationalOperator(new SaveBlocksOperator());
  execution_plan_->addDirectDependency(save_index,
                                       delete_index,
                                       false /* is_pipeline_breaker */);
}
void ExecutionGenerator::convertDropTable(
    const P::DropTablePtr &physical_plan) {
  // A DropTable plan is lowered to exactly one DropTableOperator on the
  // plan's catalog relation.
  DropTableOperator *drop_table =
      new DropTableOperator(*physical_plan->catalog_relation(),
                            optimizer_context_->catalog_database());
  execution_plan_->addRelationalOperator(drop_table);
}
void ExecutionGenerator::convertInsertTuple(
    const P::InsertTuplePtr &physical_plan) {
  // An InsertTuple plan is lowered to an InsertOperator followed by a
  // SaveBlocksOperator.
  const CatalogRelationInfo *target_info =
      findRelationInfoOutputByPhysical(physical_plan->input());
  const CatalogRelation &target_relation =
      *optimizer_context_->catalog_database()->getRelationById(
          target_info->relation->getID());

  // Serialize the literal column values into a new tuple proto. The tuple id
  // is the number of tuples present before the new one is appended.
  const QueryContext::tuple_id new_tuple_index =
      query_context_proto_->tuples_size();
  S::Tuple *new_tuple_proto = query_context_proto_->add_tuples();
  for (const E::ScalarLiteralPtr &column_value :
       physical_plan->column_values()) {
    new_tuple_proto->add_attribute_values()->CopyFrom(
        column_value->value().getProto());
  }

  // FIXME(qzeng): A better way is using a traits struct to look up whether a
  // storage block supports ad-hoc insertion instead of hard-coding the block
  // types.
  const StorageBlockLayout &default_layout =
      target_relation.getDefaultStorageBlockLayout();
  const auto tuple_store_type =
      default_layout.getDescription().tuple_store_description().sub_block_type();
  const bool is_compressed_store =
      tuple_store_type == TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE ||
      tuple_store_type == TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE;
  if (is_compressed_store) {
    THROW_SQL_ERROR() << "INSERT statement is not supported for the relation "
                      << target_relation.getName()
                      << ", because its storage blocks do not support ad-hoc insertion";
  }

  // Describe the block-pool insert destination for the target relation.
  const QueryContext::insert_destination_id destination_id =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *destination_proto =
      query_context_proto_->add_insert_destinations();
  destination_proto->set_insert_destination_type(
      S::InsertDestinationType::BLOCK_POOL);
  destination_proto->set_relation_id(target_relation.getID());
  destination_proto->mutable_layout()->MergeFrom(
      default_layout.getDescription());

  const vector<block_id> snapshot(target_relation.getBlocksSnapshot());
  for (const block_id snapshot_block : snapshot) {
    destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks,
                                    snapshot_block);
  }

  InsertOperator *insert_op = new InsertOperator(target_relation,
                                                 destination_id,
                                                 new_tuple_index);
  const QueryPlan::DAGNodeIndex insert_index =
      execution_plan_->addRelationalOperator(insert_op);
  destination_proto->set_relational_op_index(insert_index);

  const QueryPlan::DAGNodeIndex save_index =
      execution_plan_->addRelationalOperator(new SaveBlocksOperator());

  // When the target is produced by another operator, the insert must wait
  // for that producer (pipeline-breaking dependency).
  if (!target_info->isStoredRelation()) {
    execution_plan_->addDirectDependency(insert_index,
                                         target_info->producer_operator_index,
                                         true /* is_pipeline_breaker */);
  }
  execution_plan_->addDirectDependency(save_index,
                                       insert_index,
                                       false /* is_pipeline_breaker */);
}
void ExecutionGenerator::convertInsertSelection(
    const P::InsertSelectionPtr &physical_plan) {
  // InsertSelection is converted to a Select and a SaveBlocks. The Select
  // copies the output columns of the selection relation into the destination
  // relation through a block-pool insert destination.
  const CatalogRelationInfo *destination_relation_info =
      findRelationInfoOutputByPhysical(physical_plan->destination());
  const CatalogRelation &destination_relation = *destination_relation_info->relation;

  // FIXME(qzeng): A better way is using a traits struct to look up whether a storage
  // block supports ad-hoc insertion instead of hard-coding the block types.
  const StorageBlockLayout &storage_block_layout =
      destination_relation.getDefaultStorageBlockLayout();
  if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
          TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE
      || storage_block_layout.getDescription().tuple_store_description().sub_block_type() ==
             TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) {
    THROW_SQL_ERROR() << "INSERT statement is not supported for the relation "
                      << destination_relation.getName()
                      << ", because its storage blocks do not support ad-hoc insertion";
  }

  // Create InsertDestination proto. The id is the number of insert
  // destinations present before the new proto is appended.
  const QueryContext::insert_destination_id insert_destination_index =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations();

  insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
  insert_destination_proto->set_relation_id(destination_relation.getID());
  insert_destination_proto->mutable_layout()->MergeFrom(
      destination_relation.getDefaultStorageBlockLayout().getDescription());

  // Record a snapshot of the destination relation's blocks in the proto.
  const vector<block_id> blocks(destination_relation.getBlocksSnapshot());
  for (const block_id block : blocks) {
    insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block);
  }

  const CatalogRelationInfo *selection_relation_info =
      findRelationInfoOutputByPhysical(physical_plan->selection());

  // Prepare the attributes, which are output columns of the selection relation.
  // The loop variable is bound by const reference (as in the other loops of
  // this file) so that the attribute-reference pointer is not copied on every
  // iteration.
  std::unique_ptr<std::vector<attribute_id>> attributes(new std::vector<attribute_id>());
  for (const E::AttributeReferencePtr &attr_ref : physical_plan->selection()->getOutputAttributes()) {
    unique_ptr<const Scalar> attribute(attr_ref->concretize(attribute_substitution_map_));

    DCHECK_EQ(Scalar::kAttribute, attribute->getDataSource());
    attributes->emplace_back(
        static_cast<const ScalarAttribute*>(attribute.get())->getAttribute().getID());
  }

  // Create the select operator.
  // TODO(jianqiao): This select operator is actually redundant. That is,
  // we may directly set physical_plan_->selection()'s output relation to be
  // destination_relation, instead of creating an intermediate selection_relation
  // and then copy the data into destination_relation. One way to achieve this
  // optimization is to enable specifying a specific output relation for each
  // physical plan by modifying class Physical.
  SelectOperator *insert_selection_op =
      new SelectOperator(*selection_relation_info->relation,
                         destination_relation,
                         insert_destination_index,
                         QueryContext::kInvalidPredicateId,
                         attributes.release(),
                         selection_relation_info->isStoredRelation());

  const QueryPlan::DAGNodeIndex insert_selection_index =
      execution_plan_->addRelationalOperator(insert_selection_op);
  insert_destination_proto->set_relational_op_index(insert_selection_index);

  const QueryPlan::DAGNodeIndex save_blocks_index =
      execution_plan_->addRelationalOperator(new SaveBlocksOperator());

  // If the selection relation is produced by another operator, the select
  // depends on that producer (non pipeline-breaking).
  if (!selection_relation_info->isStoredRelation()) {
    execution_plan_->addDirectDependency(insert_selection_index,
                                         selection_relation_info->producer_operator_index,
                                         false /* is_pipeline_breaker */);
  }
  execution_plan_->addDirectDependency(save_blocks_index,
                                       insert_selection_index,
                                       false /* is_pipeline_breaker */);
}
void ExecutionGenerator::convertUpdateTable(
    const P::UpdateTablePtr &physical_plan) {
  // An UpdateTable plan is lowered to an UpdateOperator followed by a
  // SaveBlocksOperator.
  const CatalogRelationInfo *target_info =
      findRelationInfoOutputByPhysical(physical_plan->input());
  DCHECK(target_info != nullptr);

  const relation_id target_relation_id = target_info->relation->getID();

  // Register the block-pool insert destination handed to the update operator
  // as its relocation destination.
  const QueryContext::insert_destination_id relocation_id =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *relocation_proto =
      query_context_proto_->add_insert_destinations();
  relocation_proto->set_insert_destination_type(
      S::InsertDestinationType::BLOCK_POOL);
  relocation_proto->set_relation_id(target_relation_id);

  // Register the optional predicate; without one the invalid id is passed on.
  QueryContext::predicate_id predicate_index =
      QueryContext::kInvalidPredicateId;
  if (physical_plan->predicate()) {
    predicate_index = query_context_proto_->predicates_size();
    unique_ptr<const Predicate> update_predicate(
        convertPredicate(physical_plan->predicate()));
    query_context_proto_->add_predicates()->CopyFrom(
        update_predicate->getProto());
  }

  // Serialize the assignments as one UpdateGroup proto. Assignee i is paired
  // with assignment expression i.
  const vector<E::AttributeReferencePtr> &targets = physical_plan->assignees();
  const vector<E::ScalarPtr> &new_values =
      physical_plan->assignment_expressions();
  DCHECK_EQ(targets.size(), new_values.size())
      << physical_plan->toString();

  const QueryContext::update_group_id group_index =
      query_context_proto_->update_groups_size();
  S::QueryContext::UpdateGroup *group_proto =
      query_context_proto_->add_update_groups();
  group_proto->set_relation_id(target_relation_id);

  const vector<E::AttributeReferencePtr>::size_type num_assignments =
      targets.size();
  for (vector<E::AttributeReferencePtr>::size_type pos = 0;
       pos < num_assignments;
       ++pos) {
    unique_ptr<const Scalar> target_scalar(
        targets[pos]->concretize(attribute_substitution_map_));
    DCHECK_EQ(Scalar::kAttribute, target_scalar->getDataSource())
        << targets[pos]->toString();

    S::QueryContext::UpdateGroup::UpdateAssignment *assignment_proto =
        group_proto->add_update_assignments();
    const ScalarAttribute *target_attribute =
        static_cast<const ScalarAttribute*>(target_scalar.get());
    assignment_proto->set_attribute_id(
        target_attribute->getAttribute().getID());

    unique_ptr<const Scalar> value_scalar(
        new_values[pos]->concretize(attribute_substitution_map_));
    assignment_proto->mutable_scalar()->CopyFrom(value_scalar->getProto());
  }

  UpdateOperator *update_op = new UpdateOperator(
      *optimizer_context_->catalog_database()->getRelationById(target_relation_id),
      relocation_id,
      predicate_index,
      group_index);
  const QueryPlan::DAGNodeIndex update_index =
      execution_plan_->addRelationalOperator(update_op);
  relocation_proto->set_relational_op_index(update_index);

  const QueryPlan::DAGNodeIndex save_index =
      execution_plan_->addRelationalOperator(new SaveBlocksOperator());

  if (!target_info->isStoredRelation()) {
    execution_plan_->addDirectDependency(update_index,
                                         target_info->producer_operator_index,
                                         false /* is_pipeline_breaker */);
  }
  execution_plan_->addDirectDependency(save_index,
                                       update_index,
                                       false /* is_pipeline_breaker */);
}
void ExecutionGenerator::convertAggregate(
    const P::AggregatePtr &physical_plan) {
  // An Aggregate plan is lowered to an AggregationOperator and a
  // FinalizeAggregationOperator that share one aggregation state; the
  // finalize step writes into a temporary output relation.
  const QueryContext::aggregation_state_id state_index =
      query_context_proto_->aggregation_states_size();
  S::AggregationOperationState *state_proto =
      query_context_proto_->add_aggregation_states();

  const CatalogRelationInfo *source_info =
      findRelationInfoOutputByPhysical(physical_plan->input());
  state_proto->set_relation_id(source_info->relation->getID());

  // One 'aggregates' entry per aggregate expression: the function, each of
  // its concretized arguments, and the DISTINCT flag.
  for (const E::AliasPtr &aliased_aggregate :
       physical_plan->aggregate_expressions()) {
    const E::AggregateFunctionPtr aggregate_function =
        std::static_pointer_cast<const E::AggregateFunction>(
            aliased_aggregate->expression());

    S::Aggregate *aggregate_proto = state_proto->add_aggregates();
    aggregate_proto->mutable_function()->CopyFrom(
        aggregate_function->getAggregate().getProto());

    for (const E::ScalarPtr &function_argument :
         aggregate_function->getArguments()) {
      unique_ptr<const Scalar> execution_argument(
          function_argument->concretize(attribute_substitution_map_));
      aggregate_proto->add_argument()->CopyFrom(execution_argument->getProto());
    }

    aggregate_proto->set_is_distinct(aggregate_function->is_distinct());
  }

  // Concretize the GROUP BY expressions and remember their types, which are
  // used below when choosing the hash table implementation.
  std::vector<const Type*> key_types;
  for (const E::NamedExpressionPtr &group_by_expression :
       physical_plan->grouping_expressions()) {
    unique_ptr<const Scalar> execution_key;

    E::AliasPtr key_alias;
    if (E::SomeAlias::MatchesWithConditionalCast(group_by_expression, &key_alias)) {
      // NOTE(zuyu): For aggregate expressions, all child expressions of an
      // Alias should be a Scalar.
      E::ScalarPtr aliased_scalar;
      CHECK(E::SomeScalar::MatchesWithConditionalCast(key_alias->expression(),
                                                      &aliased_scalar))
          << key_alias->toString();
      execution_key.reset(
          aliased_scalar->concretize(attribute_substitution_map_));
    } else {
      execution_key.reset(
          group_by_expression->concretize(attribute_substitution_map_));
    }

    state_proto->add_group_by_expressions()->CopyFrom(execution_key->getProto());
    key_types.push_back(&execution_key->getType());
  }

  // Attach the optional filter predicate.
  if (physical_plan->filter_predicate() != nullptr) {
    unique_ptr<const Predicate> filter(
        convertPredicate(physical_plan->filter_predicate()));
    state_proto->mutable_predicate()->CopyFrom(filter->getProto());
  }

  state_proto->set_estimated_num_entries(
      cost_model_->estimateCardinality(physical_plan));

  if (!key_types.empty()) {
    // SimplifyHashTableImplTypeProto() switches the hash table implementation
    // from SeparateChaining to SimpleScalarSeparateChaining when there is a
    // single scalar key type with a reversible hash function.
    state_proto->set_hash_table_impl_type(
        SimplifyHashTableImplTypeProto(
            HashTableImplTypeProtoFromString(FLAGS_aggregate_hashtable_type),
            key_types));
  }

  AggregationOperator *aggregation_op =
      new AggregationOperator(*source_info->relation,
                              source_info->isStoredRelation(),
                              state_index);
  const QueryPlan::DAGNodeIndex aggregation_index =
      execution_plan_->addRelationalOperator(aggregation_op);
  if (!source_info->isStoredRelation()) {
    execution_plan_->addDirectDependency(aggregation_index,
                                         source_info->producer_operator_index,
                                         false /* is_pipeline_breaker */);
  }

  // Create the temporary output relation together with its insert
  // destination proto.
  const CatalogRelation *result_relation = nullptr;
  const QueryContext::insert_destination_id destination_id =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *destination_proto =
      query_context_proto_->add_insert_destinations();
  createTemporaryCatalogRelation(physical_plan,
                                 &result_relation,
                                 destination_proto);

  FinalizeAggregationOperator *finalize_op =
      new FinalizeAggregationOperator(state_index,
                                      *result_relation,
                                      destination_id);
  const QueryPlan::DAGNodeIndex finalize_index =
      execution_plan_->addRelationalOperator(finalize_op);
  destination_proto->set_relational_op_index(finalize_index);

  // The finalize step is a pipeline breaker with respect to the aggregation.
  execution_plan_->addDirectDependency(finalize_index,
                                       aggregation_index,
                                       true /* is_pipeline_breaker */);

  // Publish the finalize operator as the producer of this plan's output and
  // register the output as a temporary relation.
  physical_to_output_relation_map_.emplace(
      std::piecewise_construct,
      std::forward_as_tuple(physical_plan),
      std::forward_as_tuple(finalize_index, result_relation));
  temporary_relation_info_vec_.emplace_back(finalize_index, result_relation);
}
void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
  // Sort is converted to a SortRunGenerationOperator followed by a
  // SortMergeRunOperator. Three temporary relations are created: one for the
  // initial runs, one for the merged runs, and one for the sorted output.
  // The merged-runs relation is dropped by a dedicated DropTableOperator
  // added here; the other two are registered in
  // 'temporary_relation_info_vec_'.

  // Create sort configuration for run generation.
  vector<bool> sort_ordering(physical_sort->sort_ascending());
  vector<bool> sort_null_ordering(physical_sort->nulls_first_flags());
  PtrVector<Scalar> sort_run_gen_attributes;
  for (const E::AttributeReferencePtr &sort_attribute :
       physical_sort->sort_attributes()) {
    sort_run_gen_attributes.push_back(
        sort_attribute->concretize(attribute_substitution_map_));
  }
  const SortConfiguration sort_run_gen_config(sort_run_gen_attributes,
                                              std::move(sort_ordering),
                                              std::move(sort_null_ordering));
  const QueryContext::sort_config_id sort_run_gen_config_id =
      query_context_proto_->sort_configs_size();
  S::SortConfiguration *sort_run_gen_config_proto =
      query_context_proto_->add_sort_configs();
  sort_run_gen_config_proto->CopyFrom(sort_run_gen_config.getProto());

  // Create SortRunGenerationOperator.
  // The out-parameter is initialized to nullptr, matching the other
  // converters in this file, so it never holds an indeterminate value.
  const CatalogRelation *initial_runs_relation = nullptr;
  const QueryContext::insert_destination_id initial_runs_destination_id =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *initial_runs_destination_proto =
      query_context_proto_->add_insert_destinations();
  createTemporaryCatalogRelation(
      physical_sort, &initial_runs_relation, initial_runs_destination_proto);

  const CatalogRelationInfo *input_relation_info =
      findRelationInfoOutputByPhysical(physical_sort->input());
  const QueryPlan::DAGNodeIndex run_generator_index =
      execution_plan_->addRelationalOperator(
          new SortRunGenerationOperator(*input_relation_info->relation,
                                        *initial_runs_relation,
                                        initial_runs_destination_id,
                                        sort_run_gen_config_id,
                                        input_relation_info->isStoredRelation()));
  if (!input_relation_info->isStoredRelation()) {
    execution_plan_->addDirectDependency(run_generator_index,
                                         input_relation_info->producer_operator_index,
                                         false /* is_pipeline_breaker */);
  }
  temporary_relation_info_vec_.emplace_back(run_generator_index,
                                            initial_runs_relation);
  initial_runs_destination_proto->set_relational_op_index(run_generator_index);

  // Create sort configuration for run merging.
  // The two flag vectors were passed through std::move above, so they are
  // re-populated from the plan before being used again.
  sort_ordering = physical_sort->sort_ascending();
  sort_null_ordering = physical_sort->nulls_first_flags();
  PtrVector<Scalar> sort_merge_run_attributes;
  for (const E::AttributeReferencePtr &sort_attribute :
       physical_sort->sort_attributes()) {
    sort_merge_run_attributes.push_back(
        sort_attribute->concretize(attribute_substitution_map_));
  }
  const SortConfiguration sort_merge_run_config(sort_merge_run_attributes,
                                                std::move(sort_ordering),
                                                std::move(sort_null_ordering));
  const QueryContext::sort_config_id sort_merge_run_config_id =
      query_context_proto_->sort_configs_size();
  S::SortConfiguration *sort_merge_run_config_proto =
      query_context_proto_->add_sort_configs();
  sort_merge_run_config_proto->CopyFrom(sort_merge_run_config.getProto());

  // Create SortMergeRunOperator.
  const CatalogRelation *merged_runs_relation = nullptr;
  const QueryContext::insert_destination_id merged_runs_destination_id =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *merged_runs_destination_proto =
      query_context_proto_->add_insert_destinations();
  createTemporaryCatalogRelation(physical_sort,
                                 &merged_runs_relation,
                                 merged_runs_destination_proto);
  const CatalogRelation *sorted_relation = nullptr;
  const QueryContext::insert_destination_id sorted_output_destination_id =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *sorted_output_destination_proto =
      query_context_proto_->add_insert_destinations();
  createTemporaryCatalogRelation(physical_sort,
                                 &sorted_relation,
                                 sorted_output_destination_proto);

  // TODO(qzeng): Make the merge factor configurable.
  const QueryPlan::DAGNodeIndex merge_run_operator_index =
      execution_plan_->addRelationalOperator(
          new SortMergeRunOperator(*initial_runs_relation,
                                   *sorted_relation,
                                   sorted_output_destination_id,
                                   *merged_runs_relation,
                                   merged_runs_destination_id,
                                   sort_merge_run_config_id,
                                   64 /* merge_factor */,
                                   physical_sort->limit(),
                                   false /* input_relation_is_stored */));
  execution_plan_->addDirectDependency(merge_run_operator_index,
                                       run_generator_index,
                                       false /* is_pipeline_breaker */);

  // Both destinations written by the merge operator point back at it.
  merged_runs_destination_proto->set_relational_op_index(merge_run_operator_index);
  sorted_output_destination_proto->set_relational_op_index(merge_run_operator_index);

  // Do not add merged_runs_relation into 'temporary_relation_info_vec_'
  // and create the DropTableOperator for it at the end. Instead, add the drop
  // operator right here, because the relation won't be used by any other operator.
  const QueryPlan::DAGNodeIndex drop_merged_runs_index =
      execution_plan_->addRelationalOperator(
          new DropTableOperator(
              *merged_runs_relation,
              optimizer_context_->catalog_database(),
              false /* only_drop_blocks */));
  execution_plan_->addDirectDependency(
      drop_merged_runs_index,
      merge_run_operator_index,
      true /* is_pipeline_breaker */);

  // The merge operator is the producer of the sorted output relation.
  temporary_relation_info_vec_.emplace_back(merge_run_operator_index,
                                            sorted_relation);
  physical_to_output_relation_map_.emplace(
      std::piecewise_construct,
      std::forward_as_tuple(physical_sort),
      std::forward_as_tuple(merge_run_operator_index,
                            sorted_relation));
}
void ExecutionGenerator::convertTableGenerator(
    const P::TableGeneratorPtr &physical_tablegen) {
  // A TableGenerator plan is lowered to one TableGeneratorOperator that
  // writes into a temporary relation.

  // Create the temporary output relation together with its insert
  // destination proto. The destination id is read before the proto is added.
  const QueryContext::insert_destination_id destination_id =
      query_context_proto_->insert_destinations_size();
  S::InsertDestination *destination_proto =
      query_context_proto_->add_insert_destinations();
  const CatalogRelation *generated_relation = nullptr;
  createTemporaryCatalogRelation(physical_tablegen,
                                 &generated_relation,
                                 destination_proto);

  // Register the generator function handle in the query context.
  const QueryContext::generator_function_id function_id =
      query_context_proto_->generator_functions_size();
  query_context_proto_->add_generator_functions()->CopyFrom(
      physical_tablegen->generator_function_handle()->getProto());

  const QueryPlan::DAGNodeIndex generator_index =
      execution_plan_->addRelationalOperator(
          new TableGeneratorOperator(*generated_relation,
                                     destination_id,
                                     function_id));
  destination_proto->set_relational_op_index(generator_index);

  // Publish the generator as the producer of this plan's output and register
  // the output as a temporary relation.
  physical_to_output_relation_map_.emplace(
      std::piecewise_construct,
      std::forward_as_tuple(physical_tablegen),
      std::forward_as_tuple(generator_index, generated_relation));
  temporary_relation_info_vec_.emplace_back(generator_index,
                                            generated_relation);
}
} // namespace optimizer
} // namespace quickstep