| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| **/ |
| |
| #include "query_optimizer/ExecutionGenerator.hpp" |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <cstddef> |
| #include <functional> |
| #include <memory> |
| #include <numeric> |
| #include <sstream> |
| #include <string> |
| #include <type_traits> |
| #include <unordered_map> |
| |
| #include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED. |
| |
| #ifdef QUICKSTEP_DISTRIBUTED |
| #include <unordered_set> |
| #endif |
| |
| #include <utility> |
| #include <vector> |
| |
| #ifdef QUICKSTEP_DISTRIBUTED |
| #include "catalog/Catalog.pb.h" |
| #endif |
| |
| #include "catalog/CatalogAttribute.hpp" |
| #include "catalog/CatalogDatabase.hpp" |
| #include "catalog/CatalogRelation.hpp" |
| #include "catalog/CatalogRelationSchema.hpp" |
| #include "catalog/CatalogTypedefs.hpp" |
| #include "catalog/PartitionScheme.hpp" |
| #include "catalog/PartitionSchemeHeader.hpp" |
| #include "cli/Flags.hpp" |
| #include "expressions/Expressions.pb.h" |
| #include "expressions/aggregation/AggregateFunction.hpp" |
| #include "expressions/aggregation/AggregateFunction.pb.h" |
| #include "expressions/aggregation/AggregationID.hpp" |
| #include "expressions/predicate/Predicate.hpp" |
| #include "expressions/scalar/Scalar.hpp" |
| #include "expressions/scalar/ScalarAttribute.hpp" |
| #include "expressions/window_aggregation/WindowAggregateFunction.hpp" |
| #include "expressions/window_aggregation/WindowAggregateFunction.pb.h" |
| #include "query_execution/QueryContext.hpp" |
| #include "query_execution/QueryContext.pb.h" |
| #include "query_optimizer/LIPFilterGenerator.hpp" |
| #include "query_optimizer/OptimizerContext.hpp" |
| #include "query_optimizer/QueryHandle.hpp" |
| #include "query_optimizer/QueryPlan.hpp" |
| #include "query_optimizer/cost_model/SimpleCostModel.hpp" |
| #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" |
| #include "query_optimizer/expressions/AggregateFunction.hpp" |
| #include "query_optimizer/expressions/Alias.hpp" |
| #include "query_optimizer/expressions/AttributeReference.hpp" |
| #include "query_optimizer/expressions/ComparisonExpression.hpp" |
| #include "query_optimizer/expressions/ExpressionType.hpp" |
| #include "query_optimizer/expressions/PatternMatcher.hpp" |
| #include "query_optimizer/expressions/Scalar.hpp" |
| #include "query_optimizer/expressions/ScalarLiteral.hpp" |
| #include "query_optimizer/expressions/WindowAggregateFunction.hpp" |
| #include "query_optimizer/physical/Aggregate.hpp" |
| #include "query_optimizer/physical/CopyFrom.hpp" |
| #include "query_optimizer/physical/CopyTo.hpp" |
| #include "query_optimizer/physical/CreateIndex.hpp" |
| #include "query_optimizer/physical/CreateTable.hpp" |
| #include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp" |
| #include "query_optimizer/physical/DeleteTuples.hpp" |
| #include "query_optimizer/physical/DropTable.hpp" |
| #include "query_optimizer/physical/FilterJoin.hpp" |
| #include "query_optimizer/physical/HashJoin.hpp" |
| #include "query_optimizer/physical/InsertSelection.hpp" |
| #include "query_optimizer/physical/InsertTuple.hpp" |
| #include "query_optimizer/physical/LIPFilterConfiguration.hpp" |
| #include "query_optimizer/physical/NestedLoopsJoin.hpp" |
| #include "query_optimizer/physical/PartitionSchemeHeader.hpp" |
| #include "query_optimizer/physical/PatternMatcher.hpp" |
| #include "query_optimizer/physical/Physical.hpp" |
| #include "query_optimizer/physical/PhysicalType.hpp" |
| #include "query_optimizer/physical/Sample.hpp" |
| #include "query_optimizer/physical/Selection.hpp" |
| #include "query_optimizer/physical/SharedSubplanReference.hpp" |
| #include "query_optimizer/physical/Sort.hpp" |
| #include "query_optimizer/physical/TableGenerator.hpp" |
| #include "query_optimizer/physical/TableReference.hpp" |
| #include "query_optimizer/physical/TopLevelPlan.hpp" |
| #include "query_optimizer/physical/UnionAll.hpp" |
| #include "query_optimizer/physical/UpdateTable.hpp" |
| #include "query_optimizer/physical/WindowAggregate.hpp" |
| #include "relational_operators/AggregationOperator.hpp" |
| #include "relational_operators/BuildAggregationExistenceMapOperator.hpp" |
| #include "relational_operators/BuildHashOperator.hpp" |
| #include "relational_operators/BuildLIPFilterOperator.hpp" |
| #include "relational_operators/CreateIndexOperator.hpp" |
| #include "relational_operators/CreateTableOperator.hpp" |
| #include "relational_operators/DeleteOperator.hpp" |
| #include "relational_operators/DestroyAggregationStateOperator.hpp" |
| #include "relational_operators/DestroyHashOperator.hpp" |
| #include "relational_operators/DropTableOperator.hpp" |
| #include "relational_operators/FinalizeAggregationOperator.hpp" |
| #include "relational_operators/HashJoinOperator.hpp" |
| #include "relational_operators/InitializeAggregationOperator.hpp" |
| #include "relational_operators/InsertOperator.hpp" |
| #include "relational_operators/NestedLoopsJoinOperator.hpp" |
| #include "relational_operators/RelationalOperator.hpp" |
| #include "relational_operators/SampleOperator.hpp" |
| #include "relational_operators/SaveBlocksOperator.hpp" |
| #include "relational_operators/SelectOperator.hpp" |
| #include "relational_operators/SortMergeRunOperator.hpp" |
| #include "relational_operators/SortRunGenerationOperator.hpp" |
| #include "relational_operators/TableExportOperator.hpp" |
| #include "relational_operators/TableGeneratorOperator.hpp" |
| #include "relational_operators/TextScanOperator.hpp" |
| #include "relational_operators/UnionAllOperator.hpp" |
| #include "relational_operators/UpdateOperator.hpp" |
| #include "relational_operators/WindowAggregationOperator.hpp" |
| #include "storage/AggregationOperationState.pb.h" |
| #include "storage/HashTable.pb.h" |
| #include "storage/HashTableFactory.hpp" |
| #include "storage/InsertDestination.pb.h" |
| #include "storage/StorageBlockLayout.hpp" |
| #include "storage/StorageBlockLayout.pb.h" |
| #include "storage/StorageConstants.hpp" |
| #include "storage/SubBlockTypeRegistry.hpp" |
| #include "types/Type.hpp" |
| #include "types/Type.pb.h" |
| #include "types/TypedValue.hpp" |
| #include "types/TypedValue.pb.h" |
| #include "types/containers/Tuple.pb.h" |
| #include "utility/BarrieredReadWriteConcurrentBitVector.hpp" |
| #include "utility/SqlError.hpp" |
| |
| #include "gflags/gflags.h" |
| #include "glog/logging.h" |
| |
| using std::atomic; |
| using std::make_unique; |
| using std::move; |
| using std::pair; |
| using std::static_pointer_cast; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::vector; |
| |
| namespace quickstep { |
| namespace optimizer { |
| |
// Command-line flag naming the hash table implementation used for hash joins.
// Values are checked by ValidateHashTableImplTypeString, which is registered
// as the flag's validator right below.
| DEFINE_string(join_hashtable_type, "SeparateChaining", |
| "HashTable implementation to use for hash joins (valid options " |
| "are SeparateChaining or LinearOpenAddressing)"); |
// The dummy variable only captures the return value of RegisterFlagValidator()
// so that the registration call can appear in a namespace-scope initializer.
| static const volatile bool join_hashtable_type_dummy |
| = gflags::RegisterFlagValidator(&FLAGS_join_hashtable_type, |
| &ValidateHashTableImplTypeString); |
| |
// Command-line flag naming the hash table implementation used for aggregates
// with GROUP BY. It shares the validator used by join_hashtable_type.
| DEFINE_string(aggregate_hashtable_type, "SeparateChaining", |
| "HashTable implementation to use for aggregates with GROUP BY " |
| "(valid options are SeparateChaining or LinearOpenAddressing)"); |
// Same registration idiom as for join_hashtable_type_dummy.
| static const volatile bool aggregate_hashtable_type_dummy |
| = gflags::RegisterFlagValidator(&FLAGS_aggregate_hashtable_type, |
| &ValidateHashTableImplTypeString); |
| |
// Boolean flag (default: true) described as parallelizing the loading of data
// files. It is not read in the part of the file visible here.
| DEFINE_bool(parallelize_load, true, "Parallelize loading data files."); |
| |
// Flag validator that accepts strictly positive values only.
// 'flagname' belongs to the validator signature but is not inspected.
static bool ValidateNumAggregationPartitions(const char *flagname, int value) {
  const bool is_positive = (0 < value);
  return is_positive;
}
// Command-line flag holding the number of partitions used for partitioned
// aggregation (default: 41). ValidateNumAggregationPartitions, registered
// below, rejects non-positive values.
| DEFINE_int32(num_aggregation_partitions, |
| 41, |
| "The number of partitions in PartitionedHashTablePool used for " |
| "performing the aggregation"); |
// The dummy variable only captures the return value of RegisterFlagValidator()
// so that the registration call can appear in a namespace-scope initializer.
| static const volatile bool num_aggregation_partitions_dummy |
| = gflags::RegisterFlagValidator(&FLAGS_num_aggregation_partitions, &ValidateNumAggregationPartitions); |
| |
// Threshold (default: 100000) that CheckAggregatePartitioned() compares with
// the estimated number of groups. No validator is registered for this flag.
| DEFINE_uint64(partition_aggregation_num_groups_threshold, |
| 100000, |
| "The threshold used for deciding whether the aggregation is done " |
| "in a partitioned way or not"); |
| |
| namespace E = ::quickstep::optimizer::expressions; |
| namespace P = ::quickstep::optimizer::physical; |
| namespace S = ::quickstep::serialization; |
| |
| namespace { |
| |
| size_t CacheLineAlignedBytes(const size_t actual_bytes) { |
| return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes; |
| } |
| |
| size_t CalculateNumInitializationPartitionsForCollisionFreeVectorTable(const size_t memory_size) { |
| // At least 1 partition, at most (#workers * 2) partitions. |
| return std::max(1uL, std::min(memory_size / kCollisonFreeVectorInitBlobSize, |
| static_cast<size_t>(2 * FLAGS_num_workers))); |
| } |
| |
| void CalculateCollisionFreeAggregationInfo( |
| const size_t num_entries, const vector<pair<AggregationID, vector<const Type *>>> &group_by_aggrs_info, |
| S::CollisionFreeVectorInfo *collision_free_vector_info) { |
| size_t memory_size = CacheLineAlignedBytes( |
| BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries)); |
| |
| for (std::size_t i = 0; i < group_by_aggrs_info.size(); ++i) { |
| const auto &group_by_aggr_info = group_by_aggrs_info[i]; |
| |
| size_t state_size = 0; |
| switch (group_by_aggr_info.first) { |
| case AggregationID::kCount: { |
| state_size = sizeof(atomic<size_t>); |
| break; |
| } |
| case AggregationID::kSum: { |
| const vector<const Type *> &argument_types = group_by_aggr_info.second; |
| DCHECK_EQ(1u, argument_types.size()); |
| switch (argument_types.front()->getTypeID()) { |
| case TypeID::kInt: |
| case TypeID::kLong: |
| state_size = sizeof(atomic<std::int64_t>); |
| break; |
| case TypeID::kFloat: |
| case TypeID::kDouble: |
| state_size = sizeof(atomic<double>); |
| break; |
| default: |
| LOG(FATAL) << "No support by CollisionFreeVector"; |
| } |
| break; |
| } |
| default: |
| LOG(FATAL) << "No support by CollisionFreeVector"; |
| } |
| |
| collision_free_vector_info->add_state_offsets(memory_size); |
| memory_size += CacheLineAlignedBytes(state_size * num_entries); |
| } |
| |
| collision_free_vector_info->set_memory_size(memory_size); |
| collision_free_vector_info->set_num_init_partitions( |
| CalculateNumInitializationPartitionsForCollisionFreeVectorTable(memory_size)); |
| } |
| |
| size_t CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(const size_t num_entries) { |
| // Set finalization segment size as 4096 entries. |
| constexpr size_t kFinalizeSegmentSize = 4uL * 1024L; |
| |
| // At least 1 partition, at most (#workers * 2) partitions. |
| return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, |
| static_cast<size_t>(2 * FLAGS_num_workers))); |
| } |
| |
| bool CheckAggregatePartitioned(const std::size_t num_aggregate_functions, |
| const std::vector<bool> &is_distincts, |
| const std::vector<attribute_id> &group_by_attrs, |
| const std::size_t estimated_num_groups) { |
| // If there's no aggregation, return false. |
| if (num_aggregate_functions == 0) { |
| return false; |
| } |
| // If there is only only aggregate function, we allow distinct aggregation. |
| // Otherwise it can't be partitioned with distinct aggregation. |
| if (num_aggregate_functions > 1) { |
| for (const bool distinct : is_distincts) { |
| if (distinct) { |
| return false; |
| } |
| } |
| } |
| // There's no distinct aggregation involved, Check if there's at least one |
| // GROUP BY operation. |
| if (group_by_attrs.empty()) { |
| return false; |
| } |
| |
| // Currently we require that all the group-by keys are ScalarAttributes for |
| // the convenient of implementing copy elision. |
| // TODO(jianqiao): relax this requirement. |
| for (const attribute_id group_by_attr : group_by_attrs) { |
| if (group_by_attr == kInvalidAttributeID) { |
| return false; |
| } |
| } |
| |
| // Currently we always use partitioned aggregation to parallelize distinct |
| // aggregation. |
| const bool all_distinct = std::accumulate(is_distincts.begin(), is_distincts.end(), |
| !is_distincts.empty(), std::logical_and<bool>()); |
| if (all_distinct) { |
| return true; |
| } |
| |
| // There are GROUP BYs without DISTINCT. Check if the estimated number of |
| // groups is large enough to warrant a partitioned aggregation. |
| return estimated_num_groups >= FLAGS_partition_aggregation_num_groups_threshold; |
| } |
| |
| } // namespace |
| |
// Namespace-scope definition of the static constexpr data member
// CatalogRelationInfo::kInvalidOperatorIndex. No initializer is given here, so
// the value presumably comes from the in-class declaration, which is not part
// of this file (TODO confirm against ExecutionGenerator.hpp).
| constexpr QueryPlan::DAGNodeIndex ExecutionGenerator::CatalogRelationInfo::kInvalidOperatorIndex; |
| |
| void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) { |
| CHECK(P::SomeTopLevelPlan::MatchesWithConditionalCast(physical_plan, &top_level_physical_plan_)) |
| << "The physical plan must be rooted by a TopLevelPlan"; |
| |
| cost_model_for_aggregation_.reset( |
| new cost::StarSchemaSimpleCostModel(top_level_physical_plan_->shared_subplans())); |
| cost_model_for_hash_join_.reset( |
| new cost::SimpleCostModel(top_level_physical_plan_->shared_subplans())); |
| |
| const auto &lip_filter_configuration = |
| top_level_physical_plan_->lip_filter_configuration(); |
| if (lip_filter_configuration != nullptr) { |
| lip_filter_generator_.reset(new LIPFilterGenerator(lip_filter_configuration)); |
| } |
| |
| const CatalogRelation *result_relation = nullptr; |
| |
| try { |
| for (const P::PhysicalPtr &shared_subplan : top_level_physical_plan_->shared_subplans()) { |
| generatePlanInternal(shared_subplan); |
| } |
| generatePlanInternal(top_level_physical_plan_->plan()); |
| |
| // Deploy LIPFilters if enabled. |
| if (lip_filter_generator_ != nullptr) { |
| lip_filter_generator_->deployLIPFilters(execution_plan_, query_context_proto_); |
| } |
| |
| // Set the query result relation if the input plan exists in physical_to_execution_map_, |
| // which indicates the plan is the result of a SELECT query. |
| const std::unordered_map<P::PhysicalPtr, CatalogRelationInfo>::const_iterator it = |
| physical_to_output_relation_map_.find(top_level_physical_plan_->plan()); |
| if (it != physical_to_output_relation_map_.end()) { |
| result_relation = it->second.relation; |
| } |
| |
| #ifdef QUICKSTEP_DEBUG |
| std::unordered_set<relation_id> temp_relations; |
| for (const CatalogRelationInfo &temporary_relation_info : temporary_relation_info_vec_) { |
| temp_relations.insert(temporary_relation_info.relation->getID()); |
| } |
| DCHECK_EQ(temporary_relation_info_vec_.size(), temp_relations.size()); |
| #endif |
| } catch (...) { |
| // Drop all temporary relations. |
| dropAllTemporaryRelations(); |
| throw; |
| } |
| |
| // Add one DropTableOperator per temporary relation, except for the result relation, if any. |
| // NOTE(zuyu): the Cli shell drops the result relation after printing, if enabled. |
| for (const CatalogRelationInfo &temporary_relation_info : temporary_relation_info_vec_) { |
| const CatalogRelation *temporary_relation = temporary_relation_info.relation; |
| if (temporary_relation == result_relation) { |
| query_handle_->setQueryResultRelation( |
| catalog_database_->getRelationByIdMutable(result_relation->getID())); |
| continue; |
| } |
| const QueryPlan::DAGNodeIndex drop_table_index = |
| execution_plan_->addRelationalOperator( |
| new DropTableOperator(query_handle_->query_id(), |
| *temporary_relation, |
| catalog_database_, |
| false /* only_drop_blocks */)); |
| DCHECK(!temporary_relation_info.isStoredRelation()); |
| execution_plan_->addDependenciesForDropOperator( |
| drop_table_index, |
| temporary_relation_info.producer_operator_index); |
| } |
| |
| #ifdef QUICKSTEP_DISTRIBUTED |
| catalog_database_cache_proto_->set_name(catalog_database_->getName()); |
| |
| LOG(INFO) << "CatalogDatabaseCache proto has " << referenced_relation_ids_.size() << " relation(s)"; |
| for (const relation_id rel_id : referenced_relation_ids_) { |
| const CatalogRelationSchema &relation = |
| catalog_database_->getRelationSchemaById(rel_id); |
| LOG(INFO) << "RelationSchema " << rel_id |
| << ", name: " << relation.getName() |
| << ", " << relation.size() << " attribute(s)"; |
| catalog_database_cache_proto_->add_relations()->MergeFrom(relation.getProto()); |
| } |
| #endif |
| } |
| |
| void ExecutionGenerator::generatePlanInternal( |
| const P::PhysicalPtr &physical_plan) { |
| // Generate the execution plan in bottom-up. |
| for (const P::PhysicalPtr &child : physical_plan->children()) { |
| generatePlanInternal(child); |
| } |
| |
| // If enabled, collect attribute substitution map for LIPFilterGenerator. |
| if (lip_filter_generator_ != nullptr) { |
| lip_filter_generator_->registerAttributeMap(physical_plan, attribute_substitution_map_); |
| } |
| |
| switch (physical_plan->getPhysicalType()) { |
| case P::PhysicalType::kAggregate: |
| return convertAggregate( |
| std::static_pointer_cast<const P::Aggregate>(physical_plan)); |
| case P::PhysicalType::kCrossReferenceCoalesceAggregate: |
| return convertCrossReferenceCoalesceAggregate( |
| std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan)); |
| case P::PhysicalType::kCopyFrom: |
| return convertCopyFrom( |
| std::static_pointer_cast<const P::CopyFrom>(physical_plan)); |
| case P::PhysicalType::kCopyTo: |
| return convertCopyTo( |
| std::static_pointer_cast<const P::CopyTo>(physical_plan)); |
| case P::PhysicalType::kCreateIndex: |
| return convertCreateIndex( |
| std::static_pointer_cast<const P::CreateIndex>(physical_plan)); |
| case P::PhysicalType::kCreateTable: |
| return convertCreateTable( |
| std::static_pointer_cast<const P::CreateTable>(physical_plan)); |
| case P::PhysicalType::kDeleteTuples: |
| return convertDeleteTuples( |
| std::static_pointer_cast<const P::DeleteTuples>(physical_plan)); |
| case P::PhysicalType::kDropTable: |
| return convertDropTable( |
| std::static_pointer_cast<const P::DropTable>(physical_plan)); |
| case P::PhysicalType::kFilterJoin: |
| return convertFilterJoin( |
| std::static_pointer_cast<const P::FilterJoin>(physical_plan)); |
| case P::PhysicalType::kHashJoin: |
| return convertHashJoin( |
| std::static_pointer_cast<const P::HashJoin>(physical_plan)); |
| case P::PhysicalType::kInsertSelection: |
| return convertInsertSelection( |
| std::static_pointer_cast<const P::InsertSelection>(physical_plan)); |
| case P::PhysicalType::kInsertTuple: |
| return convertInsertTuple( |
| std::static_pointer_cast<const P::InsertTuple>(physical_plan)); |
| case P::PhysicalType::kNestedLoopsJoin: |
| return convertNestedLoopsJoin( |
| std::static_pointer_cast<const P::NestedLoopsJoin>(physical_plan)); |
| case P::PhysicalType::kSample: |
| return convertSample( |
| std::static_pointer_cast<const P::Sample>(physical_plan)); |
| case P::PhysicalType::kSelection: |
| return convertSelection( |
| std::static_pointer_cast<const P::Selection>(physical_plan)); |
| case P::PhysicalType::kSharedSubplanReference: |
| return convertSharedSubplanReference( |
| std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan)); |
| case P::PhysicalType::kSort: |
| return convertSort( |
| std::static_pointer_cast<const P::Sort>(physical_plan)); |
| case P::PhysicalType::kTableGenerator: |
| return convertTableGenerator( |
| std::static_pointer_cast<const P::TableGenerator>(physical_plan)); |
| case P::PhysicalType::kTableReference: |
| return convertTableReference( |
| std::static_pointer_cast<const P::TableReference>(physical_plan)); |
| case P::PhysicalType::kUnionAll: |
| return convertUnionAll( |
| std::static_pointer_cast<const P::UnionAll>(physical_plan)); |
| case P::PhysicalType::kUpdateTable: |
| return convertUpdateTable( |
| std::static_pointer_cast<const P::UpdateTable>(physical_plan)); |
| case P::PhysicalType::kWindowAggregate: |
| return convertWindowAggregate( |
| std::static_pointer_cast<const P::WindowAggregate>(physical_plan)); |
| default: |
| LOG(FATAL) << "Unknown physical plan node " |
| << physical_plan->getShortString(); |
| } |
| } |
| |
| std::string ExecutionGenerator::getNewRelationName() { |
| std::ostringstream out; |
| out << OptimizerContext::kInternalTemporaryRelationNamePrefix |
| << query_handle_->query_id() << "_" << rel_id_; |
| ++rel_id_; |
| return out.str(); |
| } |
| |
| void ExecutionGenerator::createTemporaryCatalogRelation( |
| const P::PhysicalPtr &physical, |
| const CatalogRelation **catalog_relation_output, |
| S::InsertDestination *insert_destination_proto) { |
| auto catalog_relation = |
| make_unique<CatalogRelation>(catalog_database_, getNewRelationName(), -1 /* id */, true /* is_temporary*/); |
| |
| const P::PartitionSchemeHeader *partition_scheme_header = physical->getOutputPartitionSchemeHeader(); |
| if (partition_scheme_header) { |
| PartitionSchemeHeader::PartitionAttributeIds output_partition_attr_ids; |
| for (const auto &partition_equivalent_expr_ids : partition_scheme_header->partition_expr_ids) { |
| DCHECK(!partition_equivalent_expr_ids.empty()); |
| const E::ExprId partition_expr_id = *partition_equivalent_expr_ids.begin(); |
| DCHECK(attribute_substitution_map_.find(partition_expr_id) != attribute_substitution_map_.end()); |
| // Use the attribute id from the input relation. |
| // NOTE(zuyu): The following line should be before changing |
| // 'attribute_substitution_map_' with the output attributes. |
| output_partition_attr_ids.push_back(attribute_substitution_map_[partition_expr_id]->getID()); |
| } |
| |
| const size_t num_partition = partition_scheme_header->num_partitions; |
| unique_ptr<PartitionSchemeHeader> output_partition_scheme_header; |
| switch (partition_scheme_header->partition_type) { |
| case P::PartitionSchemeHeader::PartitionType::kHash: |
| output_partition_scheme_header = |
| make_unique<HashPartitionSchemeHeader>(num_partition, move(output_partition_attr_ids)); |
| break; |
| case P::PartitionSchemeHeader::PartitionType::kRandom: |
| output_partition_scheme_header = |
| make_unique<RandomPartitionSchemeHeader>(num_partition); |
| break; |
| case P::PartitionSchemeHeader::PartitionType::kRange: |
| LOG(FATAL) << "Unimplemented"; |
| default: |
| LOG(FATAL) << "Unknown partition type"; |
| } |
| auto output_partition_scheme = make_unique<PartitionScheme>(output_partition_scheme_header.release()); |
| |
| insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE); |
| insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme) |
| ->MergeFrom(output_partition_scheme->getProto()); |
| |
| catalog_relation->setPartitionScheme(output_partition_scheme.release()); |
| } else { |
| insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL); |
| } |
| |
| attribute_id aid = 0; |
| for (const E::NamedExpressionPtr &project_expression : |
| physical->getOutputAttributes()) { |
| // The attribute name is simply set to the attribute id to make it distinct. |
| auto catalog_attribute = |
| make_unique<CatalogAttribute>(catalog_relation.get(), std::to_string(aid), |
| project_expression->getValueType(), aid, |
| project_expression->attribute_alias()); |
| attribute_substitution_map_[project_expression->id()] = |
| catalog_attribute.get(); |
| catalog_relation->addAttribute(catalog_attribute.release()); |
| ++aid; |
| } |
| |
| *catalog_relation_output = catalog_relation.get(); |
| const relation_id output_rel_id = catalog_database_->addRelation( |
| catalog_relation.release()); |
| |
| insert_destination_proto->set_relation_id(output_rel_id); |
| |
| #ifdef QUICKSTEP_DISTRIBUTED |
| referenced_relation_ids_.insert(output_rel_id); |
| #endif |
| } |
| |
| void ExecutionGenerator::dropAllTemporaryRelations() { |
| for (const CatalogRelationInfo &temporary_relation_info : |
| temporary_relation_info_vec_) { |
| DCHECK_EQ(temporary_relation_info.relation->size_blocks(), 0u); |
| catalog_database_->dropRelationById(temporary_relation_info.relation->getID()); |
| } |
| } |
| |
| void ExecutionGenerator::convertNamedExpressions( |
| const std::vector<E::NamedExpressionPtr> &named_expressions, |
| S::QueryContext::ScalarGroup *scalar_group_proto) { |
| for (const E::NamedExpressionPtr &project_expression : named_expressions) { |
| unique_ptr<const Scalar> execution_scalar; |
| E::AliasPtr alias; |
| if (E::SomeAlias::MatchesWithConditionalCast(project_expression, &alias)) { |
| E::ScalarPtr scalar; |
| // We have not added aggregate expressions yet, |
| // so all child expressions of an Alias should be a Scalar. |
| CHECK(E::SomeScalar::MatchesWithConditionalCast(alias->expression(), &scalar)) |
| << alias->toString(); |
| execution_scalar.reset(scalar->concretize(attribute_substitution_map_)); |
| } else { |
| execution_scalar.reset(project_expression->concretize(attribute_substitution_map_)); |
| } |
| |
| scalar_group_proto->add_scalars()->MergeFrom(execution_scalar->getProto()); |
| } |
| } |
| |
| Predicate* ExecutionGenerator::convertPredicate( |
| const expressions::PredicatePtr &optimizer_predicate) const { |
| return optimizer_predicate->concretize(attribute_substitution_map_); |
| } |
| |
| void ExecutionGenerator::convertTableReference( |
| const P::TableReferencePtr &physical_table_reference) { |
| // TableReference is not converted to an execution operator; |
| // instead it just provides CatalogRelation info for its |
| // parent (e.g. the substitution map from an AttributeReference |
| // to a CatalogAttribute). |
| const CatalogRelation *catalog_relation = physical_table_reference->relation(); |
| |
| #ifdef QUICKSTEP_DISTRIBUTED |
| referenced_relation_ids_.insert(catalog_relation->getID()); |
| #endif |
| |
| const std::vector<E::AttributeReferencePtr> &attribute_references = |
| physical_table_reference->attribute_list(); |
| DCHECK_EQ(attribute_references.size(), catalog_relation->size()); |
| |
| for (CatalogRelation::size_type i = 0; i < catalog_relation->size(); ++i) { |
| attribute_substitution_map_.emplace(attribute_references[i]->id(), |
| catalog_relation->getAttributeById(i)); |
| } |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_table_reference), |
| std::forward_as_tuple(CatalogRelationInfo::kInvalidOperatorIndex, |
| catalog_relation)); |
| } |
| |
| void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) { |
| // Create InsertDestination proto. |
| const CatalogRelation *output_relation = nullptr; |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = |
| query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation(physical_sample, |
| &output_relation, |
| insert_destination_proto); |
| |
| // Create and add a Sample operator. |
| const CatalogRelationInfo *input_relation_info = |
| findRelationInfoOutputByPhysical(physical_sample->input()); |
| DCHECK(input_relation_info != nullptr); |
| |
| SampleOperator *sample_op = |
| new SampleOperator(query_handle_->query_id(), |
| *input_relation_info->relation, |
| *output_relation, |
| insert_destination_index, |
| input_relation_info->isStoredRelation(), |
| physical_sample->is_block_sample(), |
| physical_sample->percentage()); |
| const QueryPlan::DAGNodeIndex sample_index = |
| execution_plan_->addRelationalOperator(sample_op); |
| insert_destination_proto->set_relational_op_index(sample_index); |
| |
| if (!input_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(sample_index, |
| input_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_sample), |
| std::forward_as_tuple(sample_index, |
| output_relation)); |
| temporary_relation_info_vec_.emplace_back(sample_index, output_relation); |
| } |
| |
| bool ExecutionGenerator::convertSimpleProjection( |
| const QueryContext::scalar_group_id project_expressions_group_index, |
| std::vector<attribute_id> *attributes) const { |
| const S::QueryContext::ScalarGroup &scalar_group_proto = |
| query_context_proto_->scalar_groups(project_expressions_group_index); |
| |
| for (int i = 0; i < scalar_group_proto.scalars_size(); ++i) { |
| if (scalar_group_proto.scalars(i).data_source() != S::Scalar::ATTRIBUTE) { |
| return false; |
| } |
| } |
| |
| for (int i = 0; i < scalar_group_proto.scalars_size(); ++i) { |
| attributes->push_back( |
| scalar_group_proto.scalars(i).GetExtension(S::ScalarAttribute::attribute_id)); |
| } |
| |
| return true; |
| } |
| |
| void ExecutionGenerator::convertSelection( |
| const P::SelectionPtr &physical_selection) { |
| const P::PhysicalPtr &input = physical_selection->input(); |
| const CatalogRelationInfo *input_relation_info = |
| findRelationInfoOutputByPhysical(input); |
| DCHECK(input_relation_info != nullptr); |
| const CatalogRelation &input_relation = *input_relation_info->relation; |
| |
| // Check if the Selection is only for renaming columns. |
| if (physical_selection->filter_predicate() == nullptr) { |
| const P::PartitionSchemeHeader *physical_select_partition_scheme_header = |
| physical_selection->getOutputPartitionSchemeHeader(); |
| const P::PartitionSchemeHeader *physical_input_partition_scheme_header = input->getOutputPartitionSchemeHeader(); |
| |
| const bool are_same_physical_partition_scheme_headers = |
| (!physical_select_partition_scheme_header && !physical_input_partition_scheme_header) || |
| (physical_select_partition_scheme_header && physical_input_partition_scheme_header && |
| physical_select_partition_scheme_header->equal(*physical_input_partition_scheme_header)); |
| |
| const std::vector<E::AttributeReferencePtr> input_attributes = input->getOutputAttributes(); |
| |
| const std::vector<E::NamedExpressionPtr> &project_expressions = |
| physical_selection->project_expressions(); |
| if (project_expressions.size() == input_attributes.size() && are_same_physical_partition_scheme_headers) { |
| bool has_different_attrs = false; |
| for (std::size_t attr_idx = 0; attr_idx < input_attributes.size(); ++attr_idx) { |
| if (project_expressions[attr_idx]->id() != input_attributes[attr_idx]->id()) { |
| has_different_attrs = true; |
| break; |
| } |
| } |
| if (!has_different_attrs) { |
| if (!input_relation_info->isStoredRelation()) { |
| CatalogRelation *catalog_relation = |
| const_cast<CatalogRelation*>(input_relation_info->relation); |
| for (std::size_t attr_idx = 0; attr_idx < project_expressions.size(); ++attr_idx) { |
| CatalogAttribute *catalog_attribute = |
| catalog_relation->getAttributeByIdMutable(attr_idx); |
| DCHECK(catalog_attribute != nullptr); |
| catalog_attribute->setDisplayName( |
| project_expressions[attr_idx]->attribute_alias()); |
| } |
| physical_to_output_relation_map_.emplace(physical_selection, |
| *input_relation_info); |
| return; |
| } |
| } |
| } |
| } |
| |
| // Convert the project expressions proto. |
| const QueryContext::scalar_group_id project_expressions_group_index = |
| query_context_proto_->scalar_groups_size(); |
| convertNamedExpressions(physical_selection->project_expressions(), |
| query_context_proto_->add_scalar_groups()); |
| |
| // Convert the predicate proto. |
| QueryContext::predicate_id execution_predicate_index = QueryContext::kInvalidPredicateId; |
| if (physical_selection->filter_predicate()) { |
| execution_predicate_index = query_context_proto_->predicates_size(); |
| |
| unique_ptr<const Predicate> execution_predicate(convertPredicate(physical_selection->filter_predicate())); |
| query_context_proto_->add_predicates()->MergeFrom(execution_predicate->getProto()); |
| } |
| |
| // Create InsertDestination proto. |
| const CatalogRelation *output_relation = nullptr; |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation(physical_selection, |
| &output_relation, |
| insert_destination_proto); |
| |
| // Create and add a Select operator. |
| const bool has_repartition = physical_selection->hasRepartition(); |
| |
| // Use the "simple" form of the selection operator (a pure projection that |
| // doesn't require any expression evaluation or intermediate copies) if |
| // possible. |
| std::vector<attribute_id> attributes; |
| SelectOperator *op = |
| convertSimpleProjection(project_expressions_group_index, &attributes) |
| ? new SelectOperator(query_handle_->query_id(), |
| input_relation, |
| has_repartition, |
| *output_relation, |
| insert_destination_index, |
| execution_predicate_index, |
| move(attributes), |
| input_relation_info->isStoredRelation()) |
| : new SelectOperator(query_handle_->query_id(), |
| input_relation, |
| has_repartition, |
| *output_relation, |
| insert_destination_index, |
| execution_predicate_index, |
| project_expressions_group_index, |
| input_relation_info->isStoredRelation()); |
| |
| const QueryPlan::DAGNodeIndex select_index = |
| execution_plan_->addRelationalOperator(op); |
| insert_destination_proto->set_relational_op_index(select_index); |
| |
| if (!input_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(select_index, |
| input_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } else if (input_relation.isTemporary()) { |
| // NOTE(zuyu): drop the temp relation created by EliminateEmptyNode rule. |
| temporary_relation_info_vec_.emplace_back(select_index, &input_relation); |
| } |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_selection), |
| std::forward_as_tuple(select_index, |
| output_relation)); |
| temporary_relation_info_vec_.emplace_back(select_index, output_relation); |
| |
| if (lip_filter_generator_ != nullptr) { |
| lip_filter_generator_->addSelectionInfo(physical_selection, select_index); |
| } |
| } |
| |
| void ExecutionGenerator::convertSharedSubplanReference(const physical::SharedSubplanReferencePtr &physical_plan) { |
| const std::unordered_map<physical::PhysicalPtr, CatalogRelationInfo>::const_iterator found_it = |
| physical_to_output_relation_map_.find( |
| top_level_physical_plan_->shared_subplan_at(physical_plan->subplan_id())); |
| if (found_it != physical_to_output_relation_map_.end()) { |
| physical_to_output_relation_map_.emplace(physical_plan, found_it->second); |
| |
| // Propagate the (ExprId -> CatalogAttribute) mapping. |
| const std::vector<E::AttributeReferencePtr> &referenced_attributes = |
| physical_plan->referenced_attributes(); |
| const std::vector<E::AttributeReferencePtr> &output_attributes = |
| physical_plan->output_attributes(); |
| for (std::size_t i = 0; i < referenced_attributes.size(); ++i) { |
| attribute_substitution_map_[output_attributes[i]->id()] = |
| attribute_substitution_map_[referenced_attributes[i]->id()]; |
| } |
| } |
| } |
| |
| void ExecutionGenerator::convertFilterJoin(const P::FilterJoinPtr &physical_plan) { |
| const P::PhysicalPtr &probe_physical = physical_plan->left(); |
| P::PhysicalPtr build_physical = physical_plan->right(); |
| |
| // Let B denote the build side child. If B is also a FilterJoin, then the |
| // actual "concrete" input relation is B's probe side child, and B's build |
| // side becomes a LIPFilter that is attached to the BuildLIPFilterOperator |
| // created below. |
| P::FilterJoinPtr filter_join; |
| if (P::SomeFilterJoin::MatchesWithConditionalCast(build_physical, &filter_join)) { |
| build_physical = filter_join->left(); |
| DCHECK(build_physical->getPhysicalType() != P::PhysicalType::kFilterJoin); |
| } |
| |
| // Convert the predicate proto. |
| QueryContext::predicate_id build_side_predicate_index = QueryContext::kInvalidPredicateId; |
| if (physical_plan->build_side_filter_predicate()) { |
| build_side_predicate_index = query_context_proto_->predicates_size(); |
| |
| std::unique_ptr<const Predicate> build_side_predicate( |
| convertPredicate(physical_plan->build_side_filter_predicate())); |
| query_context_proto_->add_predicates()->MergeFrom(build_side_predicate->getProto()); |
| } |
| |
| const CatalogRelationInfo *probe_relation_info = |
| findRelationInfoOutputByPhysical(probe_physical); |
| const CatalogRelationInfo *build_relation_info = |
| findRelationInfoOutputByPhysical(build_physical); |
| |
| const CatalogRelation &build_relation = *build_relation_info->relation; |
| |
| // Create a BuildLIPFilterOperator for the FilterJoin. This operator builds |
| // LIP filters that are applied properly in downstream operators to achieve |
| // the filter-join semantics. |
| const QueryPlan::DAGNodeIndex build_filter_operator_index = |
| execution_plan_->addRelationalOperator( |
| new BuildLIPFilterOperator( |
| query_handle_->query_id(), |
| build_relation, |
| build_side_predicate_index, |
| build_relation_info->isStoredRelation())); |
| |
| if (!build_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(build_filter_operator_index, |
| build_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_plan), |
| std::forward_as_tuple(probe_relation_info->producer_operator_index, |
| probe_relation_info->relation)); |
| |
| DCHECK(lip_filter_generator_ != nullptr); |
| lip_filter_generator_->addFilterJoinInfo(physical_plan, |
| build_filter_operator_index); |
| } |
| |
| void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) { |
| // HashJoin is converted to three operators: |
| // BuildHash, HashJoin, DestroyHash. The second is the primary operator. |
| |
| const P::PhysicalPtr &probe_physical = physical_plan->left(); |
| const P::PhysicalPtr &build_physical = physical_plan->right(); |
| |
| std::vector<attribute_id> probe_attribute_ids; |
| std::vector<attribute_id> build_attribute_ids; |
| |
| const std::size_t build_cardinality = |
| cost_model_for_hash_join_->estimateCardinality(build_physical); |
| |
| bool any_probe_attributes_nullable = false; |
| bool any_build_attributes_nullable = false; |
| |
| const std::vector<E::AttributeReferencePtr> &left_join_attributes = |
| physical_plan->left_join_attributes(); |
| for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) { |
| const CatalogAttribute *probe_catalog_attribute |
| = attribute_substitution_map_[left_join_attribute->id()]; |
| probe_attribute_ids.emplace_back(probe_catalog_attribute->getID()); |
| |
| if (probe_catalog_attribute->getType().isNullable()) { |
| any_probe_attributes_nullable = true; |
| } |
| } |
| |
| const std::vector<E::AttributeReferencePtr> &right_join_attributes = |
| physical_plan->right_join_attributes(); |
| for (const E::AttributeReferencePtr &right_join_attribute : right_join_attributes) { |
| const CatalogAttribute *build_catalog_attribute |
| = attribute_substitution_map_[right_join_attribute->id()]; |
| build_attribute_ids.emplace_back(build_catalog_attribute->getID()); |
| |
| if (build_catalog_attribute->getType().isNullable()) { |
| any_build_attributes_nullable = true; |
| } |
| } |
| |
| // Remember key types for call to SimplifyHashTableImplTypeProto() below. |
| std::vector<const Type*> key_types; |
| for (std::vector<E::AttributeReferencePtr>::size_type attr_idx = 0; |
| attr_idx < left_join_attributes.size(); |
| ++attr_idx) { |
| const Type &left_attribute_type = left_join_attributes[attr_idx]->getValueType(); |
| const Type &right_attribute_type = right_join_attributes[attr_idx]->getValueType(); |
| if (left_attribute_type.getTypeID() != right_attribute_type.getTypeID()) { |
| THROW_SQL_ERROR() << "Equality join predicate between two attributes of different types " |
| "is not allowed in HashJoin"; |
| } |
| key_types.push_back(&left_attribute_type); |
| } |
| |
| // Convert the residual predicate proto. |
| QueryContext::predicate_id residual_predicate_index = QueryContext::kInvalidPredicateId; |
| if (physical_plan->residual_predicate()) { |
| residual_predicate_index = query_context_proto_->predicates_size(); |
| |
| unique_ptr<const Predicate> residual_predicate(convertPredicate(physical_plan->residual_predicate())); |
| query_context_proto_->add_predicates()->MergeFrom(residual_predicate->getProto()); |
| } |
| |
| // Convert the build predicate proto. |
| QueryContext::predicate_id build_predicate_index = QueryContext::kInvalidPredicateId; |
| if (physical_plan->build_predicate()) { |
| build_predicate_index = query_context_proto_->predicates_size(); |
| |
| unique_ptr<const Predicate> build_predicate(convertPredicate(physical_plan->build_predicate())); |
| query_context_proto_->add_predicates()->MergeFrom(build_predicate->getProto()); |
| } |
| |
| // Convert the project expressions proto. |
| const QueryContext::scalar_group_id project_expressions_group_index = |
| query_context_proto_->scalar_groups_size(); |
| convertNamedExpressions(physical_plan->project_expressions(), |
| query_context_proto_->add_scalar_groups()); |
| |
| const CatalogRelationInfo *build_relation_info = |
| findRelationInfoOutputByPhysical(build_physical); |
| const CatalogRelationInfo *probe_relation_info = |
| findRelationInfoOutputByPhysical(probe_physical); |
| |
| // Create a vector that indicates whether each project expression is using |
| // attributes from the build relation as input. This information is required |
| // by the current implementation of hash left outer join |
| std::unique_ptr<std::vector<bool>> is_selection_on_build; |
| if (physical_plan->join_type() == P::HashJoin::JoinType::kLeftOuterJoin) { |
| is_selection_on_build.reset( |
| new std::vector<bool>( |
| E::MarkExpressionsReferingAnyAttribute( |
| physical_plan->project_expressions(), |
| build_physical->getOutputAttributes()))); |
| } |
| |
| const CatalogRelation *build_relation = build_relation_info->relation; |
| const CatalogRelation *probe_relation = probe_relation_info->relation; |
| |
| // FIXME(quickstep-team): Add support for self-join. |
| // We check to see if the build_predicate is null as certain queries that |
| // support hash-select fuse will result in the first check being true. |
| if (build_relation == probe_relation && physical_plan->build_predicate() == nullptr) { |
| THROW_SQL_ERROR() << "Self-join is not supported"; |
| } |
| |
| // Create join hash table proto. |
| const QueryContext::join_hash_table_id join_hash_table_index = |
| query_context_proto_->join_hash_tables_size(); |
| S::QueryContext::HashTableContext *hash_table_context_proto = |
| query_context_proto_->add_join_hash_tables(); |
| |
| const std::size_t probe_num_partitions = probe_relation->getNumPartitions(); |
| hash_table_context_proto->set_num_partitions(probe_num_partitions); |
| |
| S::HashTable *hash_table_proto = hash_table_context_proto->mutable_join_hash_table(); |
| |
| // SimplifyHashTableImplTypeProto() switches the hash table implementation |
| // from SeparateChaining to SimpleScalarSeparateChaining when there is a |
| // single scalar key type with a reversible hash function. |
| hash_table_proto->set_hash_table_impl_type( |
| SimplifyHashTableImplTypeProto( |
| HashTableImplTypeProtoFromString(FLAGS_join_hashtable_type), |
| key_types)); |
| |
| for (const attribute_id build_attribute : build_attribute_ids) { |
| hash_table_proto->add_key_types()->MergeFrom( |
| build_relation->getAttributeById(build_attribute)->getType().getProto()); |
| } |
| |
| hash_table_proto->set_estimated_num_entries(build_cardinality); |
| |
| // Create three operators. |
| const QueryPlan::DAGNodeIndex build_operator_index = |
| execution_plan_->addRelationalOperator( |
| new BuildHashOperator( |
| query_handle_->query_id(), |
| *build_relation, |
| build_relation_info->isStoredRelation(), |
| build_attribute_ids, |
| any_build_attributes_nullable, |
| probe_num_partitions, |
| join_hash_table_index, |
| build_predicate_index)); |
| |
| // Create InsertDestination proto. |
| const CatalogRelation *output_relation = nullptr; |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation(physical_plan, |
| &output_relation, |
| insert_destination_proto); |
| |
| // Get JoinType |
| HashJoinOperator::JoinType join_type; |
| switch (physical_plan->join_type()) { |
| case P::HashJoin::JoinType::kInnerJoin: |
| join_type = HashJoinOperator::JoinType::kInnerJoin; |
| break; |
| case P::HashJoin::JoinType::kLeftSemiJoin: |
| join_type = HashJoinOperator::JoinType::kLeftSemiJoin; |
| break; |
| case P::HashJoin::JoinType::kLeftAntiJoin: |
| join_type = HashJoinOperator::JoinType::kLeftAntiJoin; |
| break; |
| case P::HashJoin::JoinType::kLeftOuterJoin: |
| join_type = HashJoinOperator::JoinType::kLeftOuterJoin; |
| break; |
| default: |
| LOG(FATAL) << "Invalid physical::HashJoin::JoinType: " |
| << static_cast<typename std::underlying_type<P::HashJoin::JoinType>::type>( |
| physical_plan->join_type()); |
| } |
| |
| // Create hash join operator |
| const QueryPlan::DAGNodeIndex join_operator_index = |
| execution_plan_->addRelationalOperator( |
| new HashJoinOperator( |
| query_handle_->query_id(), |
| *build_relation, |
| *probe_relation, |
| probe_relation_info->isStoredRelation(), |
| probe_attribute_ids, |
| any_probe_attributes_nullable, |
| probe_num_partitions, |
| physical_plan->hasRepartition(), |
| *output_relation, |
| insert_destination_index, |
| join_hash_table_index, |
| residual_predicate_index, |
| project_expressions_group_index, |
| is_selection_on_build.get(), |
| join_type)); |
| insert_destination_proto->set_relational_op_index(join_operator_index); |
| |
| const QueryPlan::DAGNodeIndex destroy_operator_index = |
| execution_plan_->addRelationalOperator(new DestroyHashOperator( |
| query_handle_->query_id(), probe_num_partitions, join_hash_table_index)); |
| |
| if (!build_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(build_operator_index, |
| build_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| // Add the dependency for the producer operator of the build relation |
| // to prevent the build relation from being destroyed until after the join |
| // is complete (see QueryPlan::addDependenciesForDropOperator(), which |
| // makes the drop operator for the temporary relation dependent on all its |
| // consumers having finished). |
| execution_plan_->addDirectDependency(join_operator_index, |
| build_relation_info->producer_operator_index, |
| true /* is_pipeline_breaker */); |
| } |
| if (!probe_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(join_operator_index, |
| probe_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| execution_plan_->addDirectDependency(join_operator_index, |
| build_operator_index, |
| true /* is_pipeline_breaker */); |
| execution_plan_->addDirectDependency(destroy_operator_index, |
| join_operator_index, |
| true /* is_pipeline_breaker */); |
| |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_plan), |
| std::forward_as_tuple(join_operator_index, |
| output_relation)); |
| temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation); |
| |
| if (lip_filter_generator_ != nullptr) { |
| lip_filter_generator_->addHashJoinInfo(physical_plan, |
| build_operator_index, |
| join_operator_index); |
| } |
| } |
| |
| void ExecutionGenerator::convertNestedLoopsJoin( |
| const P::NestedLoopsJoinPtr &physical_plan) { |
| // NestedLoopsJoin is converted to a NestedLoopsJoin operator. |
| |
| // Convert the join predicate proto. |
| const QueryContext::predicate_id execution_join_predicate_index = query_context_proto_->predicates_size(); |
| if (physical_plan->join_predicate()) { |
| unique_ptr<const Predicate> execution_join_predicate(convertPredicate(physical_plan->join_predicate())); |
| query_context_proto_->add_predicates()->MergeFrom(execution_join_predicate->getProto()); |
| } else { |
| query_context_proto_->add_predicates()->set_predicate_type(S::Predicate::TRUE); |
| } |
| |
| // Convert the project expressions proto. |
| const QueryContext::scalar_group_id project_expressions_group_index = |
| query_context_proto_->scalar_groups_size(); |
| convertNamedExpressions(physical_plan->project_expressions(), |
| query_context_proto_->add_scalar_groups()); |
| |
| const CatalogRelationInfo *left_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->left()); |
| const CatalogRelation &left_relation = *left_relation_info->relation; |
| const CatalogRelationInfo *right_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->right()); |
| const CatalogRelation &right_relation = *right_relation_info->relation; |
| |
| // FIXME(quickstep-team): Add support for self-join. |
| if (left_relation.getID() == right_relation.getID()) { |
| THROW_SQL_ERROR() << "NestedLoopsJoin does not support self-join yet"; |
| } |
| |
| const std::size_t num_partitions = left_relation.getNumPartitions(); |
| |
| #ifdef QUICKSTEP_DEBUG |
| if (right_relation.hasPartitionScheme()) { |
| DCHECK_EQ(num_partitions, right_relation.getNumPartitions()); |
| } |
| #endif |
| |
| const std::size_t nested_loops_join_index = |
| query_context_proto_->num_partitions_for_nested_loops_joins_size(); |
| query_context_proto_->add_num_partitions_for_nested_loops_joins(num_partitions); |
| |
| // Create InsertDestination proto. |
| const CatalogRelation *output_relation = nullptr; |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation(physical_plan, |
| &output_relation, |
| insert_destination_proto); |
| |
| // Create and add a NestedLoopsJoin operator. |
| const QueryPlan::DAGNodeIndex join_operator_index = |
| execution_plan_->addRelationalOperator( |
| new NestedLoopsJoinOperator(query_handle_->query_id(), |
| nested_loops_join_index, |
| left_relation, |
| right_relation, |
| num_partitions, |
| physical_plan->hasRepartition(), |
| *output_relation, |
| insert_destination_index, |
| execution_join_predicate_index, |
| project_expressions_group_index, |
| left_relation_info->isStoredRelation(), |
| right_relation_info->isStoredRelation())); |
| insert_destination_proto->set_relational_op_index(join_operator_index); |
| |
| if (!left_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(join_operator_index, |
| left_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| if (!right_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(join_operator_index, |
| right_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_plan), |
| std::forward_as_tuple(join_operator_index, |
| output_relation)); |
| temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation); |
| } |
| |
| void ExecutionGenerator::convertCopyFrom( |
| const P::CopyFromPtr &physical_plan) { |
| // CopyFrom is converted to a TextScan and a SaveBlocks. |
| |
| const CatalogRelation *output_relation = physical_plan->catalog_relation(); |
| const relation_id output_rel_id = output_relation->getID(); |
| |
| #ifdef QUICKSTEP_DISTRIBUTED |
| referenced_relation_ids_.insert(output_rel_id); |
| #endif |
| |
| // Create InsertDestination proto. |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); |
| |
| if (output_relation->hasPartitionScheme()) { |
| insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE); |
| insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme) |
| ->MergeFrom(output_relation->getPartitionScheme()->getProto()); |
| } else { |
| insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL); |
| |
| const vector<block_id> blocks(output_relation->getBlocksSnapshot()); |
| for (const block_id block : blocks) { |
| insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block); |
| } |
| } |
| |
| insert_destination_proto->set_relation_id(output_rel_id); |
| insert_destination_proto->mutable_layout()->MergeFrom( |
| output_relation->getDefaultStorageBlockLayout().getDescription()); |
| |
| const QueryPlan::DAGNodeIndex scan_operator_index = |
| execution_plan_->addRelationalOperator( |
| new TextScanOperator( |
| query_handle_->query_id(), |
| physical_plan->file_name(), |
| physical_plan->options(), |
| *output_relation, |
| insert_destination_index)); |
| insert_destination_proto->set_relational_op_index(scan_operator_index); |
| |
| CatalogRelation *mutable_output_relation = |
| catalog_database_->getRelationByIdMutable(output_rel_id); |
| const QueryPlan::DAGNodeIndex save_blocks_operator_index = |
| execution_plan_->addRelationalOperator( |
| new SaveBlocksOperator(query_handle_->query_id(), mutable_output_relation)); |
| execution_plan_->addDirectDependency(save_blocks_operator_index, |
| scan_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| |
| void ExecutionGenerator::convertCopyTo(const P::CopyToPtr &physical_plan) { |
| // CopyTo is converted to a TableExport operator. |
| |
| const CatalogRelation *input_relation; |
| bool input_relation_is_stored; |
| |
| const P::PhysicalPtr &input = physical_plan->input(); |
| P::TableReferencePtr table_reference; |
| const CatalogRelationInfo *input_relation_info = nullptr; |
| if (P::SomeTableReference::MatchesWithConditionalCast(input, &table_reference)) { |
| input_relation = table_reference->relation(); |
| input_relation_is_stored = true; |
| } else { |
| input_relation_info = findRelationInfoOutputByPhysical(input); |
| input_relation = input_relation_info->relation; |
| input_relation_is_stored = false; |
| } |
| |
| DCHECK(input_relation != nullptr); |
| const QueryPlan::DAGNodeIndex table_export_operator_index = |
| execution_plan_->addRelationalOperator( |
| new TableExportOperator(query_handle_->query_id(), |
| *input_relation, |
| input_relation_is_stored, |
| physical_plan->file_name(), |
| physical_plan->options())); |
| if (!input_relation_is_stored) { |
| DCHECK(input_relation_info != nullptr); |
| execution_plan_->addDirectDependency(table_export_operator_index, |
| input_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } else if (input_relation->isTemporary()) { |
| // NOTE(zuyu): drop the temp relation created by EliminateEmptyNode rule. |
| temporary_relation_info_vec_.emplace_back(table_export_operator_index, input_relation); |
| } |
| } |
| |
| void ExecutionGenerator::convertCreateIndex( |
| const P::CreateIndexPtr &physical_plan) { |
| // CreateIndex is converted to a CreateIndex operator. |
| const CatalogRelationInfo *input_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->input()); |
| CatalogRelation *input_relation = |
| catalog_database_->getRelationByIdMutable( |
| input_relation_info->relation->getID()); |
| |
| // Check if any index with the specified name already exists. |
| if (input_relation->hasIndexWithName(physical_plan->index_name())) { |
| THROW_SQL_ERROR() << "The relation " << input_relation->getName() |
| << " already has an index named "<< physical_plan->index_name(); |
| } |
| |
| DCHECK_GT(physical_plan->index_attributes().size(), 0u); |
| |
| // Convert attribute references to a vector of pointers to catalog attributes. |
| std::vector<const CatalogAttribute*> index_attributes; |
| for (const E::AttributeReferencePtr &attribute : physical_plan->index_attributes()) { |
| const CatalogAttribute *catalog_attribute |
| = input_relation->getAttributeByName(attribute->attribute_name()); |
| DCHECK(catalog_attribute != nullptr); |
| index_attributes.emplace_back(catalog_attribute); |
| } |
| |
| // Create a copy of index description and add all the specified attributes to it. |
| IndexSubBlockDescription index_description(*physical_plan->index_description()); |
| for (const CatalogAttribute* catalog_attribute : index_attributes) { |
| index_description.add_indexed_attribute_ids(catalog_attribute->getID()); |
| } |
| if (input_relation->hasIndexWithDescription(index_description)) { |
| // Check if the given index description already exists in the relation. |
| THROW_SQL_ERROR() << "The relation " << input_relation->getName() |
| << " already defines this index on the given attribute(s)."; |
| } |
| if (!SubBlockTypeRegistry::IndexDescriptionIsValid(*input_relation, index_description)) { |
| // Check if the given index description is valid. |
| THROW_SQL_ERROR() << "The index with given properties cannot be created."; |
| } |
| execution_plan_->addRelationalOperator( |
| new CreateIndexOperator(query_handle_->query_id(), |
| input_relation, |
| physical_plan->index_name(), |
| std::move(index_description))); |
| } |
| |
| void ExecutionGenerator::convertCreateTable( |
| const P::CreateTablePtr &physical_plan) { |
| // CreateTable is converted to a CreateTable operator. |
| |
| std::unique_ptr<CatalogRelation> catalog_relation(new CatalogRelation( |
| catalog_database_, |
| physical_plan->relation_name(), |
| -1 /* id */, |
| false /* is_temporary*/)); |
| attribute_id aid = 0; |
| for (const E::AttributeReferencePtr &attribute : |
| physical_plan->attributes()) { |
| std::unique_ptr<CatalogAttribute> catalog_attribute(new CatalogAttribute( |
| catalog_relation.get(), |
| attribute->attribute_name(), |
| attribute->getValueType(), |
| aid, |
| attribute->attribute_alias())); |
| catalog_relation->addAttribute(catalog_attribute.release()); |
| ++aid; |
| } |
| |
| // If specified, set the physical block type as the users'. Otherwise, |
| // the system uses the default layout. |
| if (physical_plan->block_properties()) { |
| if (!StorageBlockLayout::DescriptionIsValid(*catalog_relation, |
| *physical_plan->block_properties())) { |
| THROW_SQL_ERROR() << "BLOCKPROPERTIES is invalid."; |
| } |
| |
| std::unique_ptr<StorageBlockLayout> layout( |
| new StorageBlockLayout(*catalog_relation, *physical_plan->block_properties())); |
| layout->finalize(); |
| catalog_relation->setDefaultStorageBlockLayout(layout.release()); |
| } |
| |
| if (physical_plan->partition_scheme_header_proto()) { |
| catalog_relation->setPartitionScheme(new PartitionScheme( |
| PartitionSchemeHeader::ReconstructFromProto(*physical_plan->partition_scheme_header_proto()))); |
| } |
| |
| execution_plan_->addRelationalOperator( |
| new CreateTableOperator(query_handle_->query_id(), |
| catalog_relation.release(), |
| catalog_database_)); |
| } |
| |
| void ExecutionGenerator::convertDeleteTuples( |
| const P::DeleteTuplesPtr &physical_plan) { |
| // If there is a selection predicate and the predicate value |
| // is not statically true, DeleteTuples is converted to |
| // a DeleteOperator and a SaveBlocksOperator; if there is not |
| // a selection predicate or the predicate value is statically true, |
| // it is converted to a DropTableOperator; otherwise, the predicate |
| // value is statically false, so no operator needs to be created. |
| |
| unique_ptr<const Predicate> execution_predicate; |
| if (physical_plan->predicate()) { |
| execution_predicate.reset(convertPredicate(physical_plan->predicate())); |
| } |
| |
| const CatalogRelationInfo *input_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->input()); |
| DCHECK(input_relation_info != nullptr); |
| |
| const CatalogRelation *input_relation = input_relation_info->relation; |
| |
| if (execution_predicate == nullptr || |
| (execution_predicate->hasStaticResult() && |
| execution_predicate->getStaticResult())) { |
| const QueryPlan::DAGNodeIndex drop_table_index = |
| execution_plan_->addRelationalOperator( |
| new DropTableOperator(query_handle_->query_id(), |
| *input_relation, |
| catalog_database_, |
| true /* only_drop_blocks */)); |
| if (!input_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(drop_table_index, |
| input_relation_info->producer_operator_index, |
| true /* is_pipeline_breaker */); |
| } |
| } else if (!execution_predicate->hasStaticResult()) { |
| const QueryContext::predicate_id execution_predicate_index = query_context_proto_->predicates_size(); |
| query_context_proto_->add_predicates()->MergeFrom(execution_predicate->getProto()); |
| |
| const QueryPlan::DAGNodeIndex delete_tuples_index = |
| execution_plan_->addRelationalOperator( |
| new DeleteOperator(query_handle_->query_id(), |
| *input_relation, |
| execution_predicate_index, |
| input_relation_info->isStoredRelation())); |
| |
| if (!input_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(delete_tuples_index, |
| input_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| |
| CatalogRelation *mutable_relation = |
| catalog_database_->getRelationByIdMutable(input_relation->getID()); |
| const QueryPlan::DAGNodeIndex save_blocks_index = |
| execution_plan_->addRelationalOperator( |
| new SaveBlocksOperator(query_handle_->query_id(), mutable_relation)); |
| execution_plan_->addDirectDependency(save_blocks_index, |
| delete_tuples_index, |
| false /* is_pipeline_breaker */); |
| } |
| } |
| |
| void ExecutionGenerator::convertDropTable( |
| const P::DropTablePtr &physical_plan) { |
| // DropTable is converted to a DropTable operator. |
| const CatalogRelation &catalog_relation = *physical_plan->catalog_relation(); |
| |
| #ifdef QUICKSTEP_DISTRIBUTED |
| referenced_relation_ids_.insert(catalog_relation.getID()); |
| #endif |
| |
| execution_plan_->addRelationalOperator( |
| new DropTableOperator(query_handle_->query_id(), |
| catalog_relation, |
| catalog_database_)); |
| } |
| |
| void ExecutionGenerator::convertInsertTuple( |
| const P::InsertTuplePtr &physical_plan) { |
| // InsertTuple is converted to an Insert and a SaveBlocks. |
| #ifdef QUICKSTEP_DISTRIBUTED |
| query_handle_->set_is_single_node_query(); |
| #endif // QUICKSTEP_DISTRIBUTED |
| |
| const CatalogRelationInfo *input_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->input()); |
| const CatalogRelation &input_relation = |
| *catalog_database_->getRelationById( |
| input_relation_info->relation->getID()); |
| |
| // Construct the tuple proto to be inserted. |
| std::vector<QueryContext::tuple_id> tuple_indexes; |
| |
| for (const std::vector<expressions::ScalarLiteralPtr> &tuple : physical_plan->column_values()) { |
| const QueryContext::tuple_id tuple_index = query_context_proto_->tuples_size(); |
| S::Tuple *tuple_proto = query_context_proto_->add_tuples(); |
| for (const E::ScalarLiteralPtr &literal : tuple) { |
| tuple_proto->add_attribute_values()->MergeFrom(literal->value().getProto()); |
| } |
| tuple_indexes.push_back(tuple_index); |
| } |
| |
| // FIXME(qzeng): A better way is using a traits struct to look up whether a storage |
| // block supports ad-hoc insertion instead of hard-coding the block types. |
| const StorageBlockLayout &storage_block_layout = |
| input_relation.getDefaultStorageBlockLayout(); |
| if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() == |
| TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE || |
| storage_block_layout.getDescription().tuple_store_description().sub_block_type() == |
| TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) { |
| THROW_SQL_ERROR() << "INSERT statement is not supported for the relation " |
| << input_relation.getName() |
| << ", because its storage blocks do not support ad-hoc insertion"; |
| } |
| |
| // Create InsertDestination proto. |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); |
| |
| insert_destination_proto->set_relation_id(input_relation.getID()); |
| insert_destination_proto->mutable_layout()->MergeFrom( |
| input_relation.getDefaultStorageBlockLayout().getDescription()); |
| |
| if (input_relation.hasPartitionScheme()) { |
| insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE); |
| insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme) |
| ->MergeFrom(input_relation.getPartitionScheme()->getProto()); |
| } else { |
| insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL); |
| |
| const vector<block_id> blocks(input_relation.getBlocksSnapshot()); |
| for (const block_id block : blocks) { |
| insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block); |
| } |
| } |
| |
| const QueryPlan::DAGNodeIndex insert_operator_index = |
| execution_plan_->addRelationalOperator( |
| new InsertOperator(query_handle_->query_id(), |
| input_relation, |
| insert_destination_index, |
| tuple_indexes)); |
| insert_destination_proto->set_relational_op_index(insert_operator_index); |
| |
| CatalogRelation *mutable_relation = |
| catalog_database_->getRelationByIdMutable(input_relation.getID()); |
| const QueryPlan::DAGNodeIndex save_blocks_index = |
| execution_plan_->addRelationalOperator( |
| new SaveBlocksOperator(query_handle_->query_id(), mutable_relation)); |
| if (!input_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(insert_operator_index, |
| input_relation_info->producer_operator_index, |
| true /* is_pipeline_breaker */); |
| } |
| execution_plan_->addDirectDependency(save_blocks_index, |
| insert_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| |
| void ExecutionGenerator::convertInsertSelection( |
| const P::InsertSelectionPtr &physical_plan) { |
| // InsertSelection is converted to a Select and a SaveBlocks. |
| |
| const CatalogRelationInfo *destination_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->destination()); |
| const CatalogRelation &destination_relation = *destination_relation_info->relation; |
| |
| // FIXME(qzeng): A better way is using a traits struct to look up whether a storage |
| // block supports ad-hoc insertion instead of hard-coding the block types. |
| const StorageBlockLayout &storage_block_layout = |
| destination_relation.getDefaultStorageBlockLayout(); |
| if (storage_block_layout.getDescription().tuple_store_description().sub_block_type() == |
| TupleStorageSubBlockDescription::COMPRESSED_COLUMN_STORE |
| || storage_block_layout.getDescription().tuple_store_description().sub_block_type() == |
| TupleStorageSubBlockDescription::COMPRESSED_PACKED_ROW_STORE) { |
| THROW_SQL_ERROR() << "INSERT statement is not supported for the relation " |
| << destination_relation.getName() |
| << ", because its storage blocks do not support ad-hoc insertion"; |
| } |
| |
| // Create InsertDestination proto. |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); |
| insert_destination_proto->set_relation_id(destination_relation.getID()); |
| insert_destination_proto->mutable_layout()->MergeFrom( |
| destination_relation.getDefaultStorageBlockLayout().getDescription()); |
| |
| if (destination_relation.hasPartitionScheme()) { |
| insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE); |
| insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme) |
| ->MergeFrom(destination_relation.getPartitionScheme()->getProto()); |
| } else { |
| insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL); |
| |
| const vector<block_id> blocks(destination_relation.getBlocksSnapshot()); |
| for (const block_id block : blocks) { |
| insert_destination_proto->AddExtension(S::BlockPoolInsertDestination::blocks, block); |
| } |
| } |
| |
| const CatalogRelationInfo *selection_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->selection()); |
| const CatalogRelation &selection_relation = *selection_relation_info->relation; |
| |
| // Prepare the attributes, which are output columns of the selection relation. |
| std::vector<attribute_id> attributes; |
| for (E::AttributeReferencePtr attr_ref : physical_plan->selection()->getOutputAttributes()) { |
| unique_ptr<const Scalar> attribute(attr_ref->concretize(attribute_substitution_map_)); |
| |
| DCHECK_EQ(Scalar::kAttribute, attribute->getDataSource()); |
| attributes.push_back( |
| static_cast<const ScalarAttribute*>(attribute.get())->getAttribute().getID()); |
| } |
| |
| const P::PartitionSchemeHeader *input_physical_partition_scheme_header = |
| physical_plan->selection()->getOutputPartitionSchemeHeader(); |
| const P::PartitionSchemeHeader *output_physical_partition_scheme_header = |
| physical_plan->destination()->getOutputPartitionSchemeHeader(); |
| const bool has_repartition = |
| (input_physical_partition_scheme_header && output_physical_partition_scheme_header) |
| ? input_physical_partition_scheme_header->equal(*output_physical_partition_scheme_header) |
| : (input_physical_partition_scheme_header || output_physical_partition_scheme_header); |
| |
| // Create the select operator. |
| // TODO(jianqiao): This select operator is actually redundant. That is, |
| // we may directly set physical_plan_->selection()'s output relation to be |
| // destination_relation, instead of creating an intermediate selection_relation |
| // and then copy the data into destination_relation. One way to achieve this |
| // optimization is to enable specifying a specific output relation for each |
| // physical plan by modifying class Physical. |
| SelectOperator *insert_selection_op = |
| new SelectOperator(query_handle_->query_id(), |
| selection_relation, |
| has_repartition, |
| destination_relation, |
| insert_destination_index, |
| QueryContext::kInvalidPredicateId, |
| move(attributes), |
| selection_relation_info->isStoredRelation()); |
| |
| const QueryPlan::DAGNodeIndex insert_selection_index = |
| execution_plan_->addRelationalOperator(insert_selection_op); |
| insert_destination_proto->set_relational_op_index(insert_selection_index); |
| |
| CatalogRelation *mutable_relation = |
| catalog_database_->getRelationByIdMutable(destination_relation.getID()); |
| const QueryPlan::DAGNodeIndex save_blocks_index = |
| execution_plan_->addRelationalOperator( |
| new SaveBlocksOperator(query_handle_->query_id(), mutable_relation)); |
| |
| if (!selection_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(insert_selection_index, |
| selection_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } else if (selection_relation.isTemporary()) { |
| // NOTE(zuyu): drop the temp relation created by EliminateEmptyNode rule. |
| temporary_relation_info_vec_.emplace_back(insert_selection_index, &selection_relation); |
| } |
| execution_plan_->addDirectDependency(save_blocks_index, |
| insert_selection_index, |
| false /* is_pipeline_breaker */); |
| } |
| |
| void ExecutionGenerator::convertUnionAll( |
| const P::UnionAllPtr &physical_unionall) { |
| const CatalogRelation *output_relation = nullptr; |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = |
| query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation(physical_unionall, |
| &output_relation, |
| insert_destination_proto); |
| |
| const std::vector<P::PhysicalPtr> &operands = physical_unionall->operands(); |
| std::vector<const CatalogRelation*> input_relations; |
| std::vector<bool> is_stored_relation; |
| std::vector<std::vector<attribute_id>> select_attribute_ids; |
| std::vector<QueryPlan::DAGNodeIndex> dependency_operator_index; |
| |
| for (const auto &operand : operands) { |
| const CatalogRelationInfo *input_relation_info = |
| findRelationInfoOutputByPhysical(operand); |
| DCHECK(input_relation_info != nullptr); |
| input_relations.push_back(input_relation_info->relation); |
| is_stored_relation.push_back(input_relation_info->isStoredRelation()); |
| dependency_operator_index.push_back(input_relation_info->producer_operator_index); |
| |
| const QueryContext::scalar_group_id project_expressions_group_index = |
| query_context_proto_->scalar_groups_size(); |
| convertNamedExpressions( |
| E::ToNamedExpressions(operand->getOutputAttributes()), |
| query_context_proto_->add_scalar_groups()); |
| std::vector<attribute_id> select_attribute_id; |
| convertSimpleProjection(project_expressions_group_index, &select_attribute_id); |
| select_attribute_ids.push_back(std::move(select_attribute_id)); |
| } |
| |
| UnionAllOperator *union_all = |
| new UnionAllOperator(query_handle_->query_id(), |
| input_relations, |
| *output_relation, |
| insert_destination_index, |
| is_stored_relation, |
| select_attribute_ids); |
| |
| const QueryPlan::DAGNodeIndex union_all_index = |
| execution_plan_->addRelationalOperator(union_all); |
| insert_destination_proto->set_relational_op_index(union_all_index); |
| |
| for (std::size_t relation_id = 0; relation_id < is_stored_relation.size(); ++relation_id) { |
| if (!is_stored_relation[relation_id]) { |
| execution_plan_->addDirectDependency(union_all_index, |
| dependency_operator_index[relation_id], |
| false /* is_pipeline_breaker */); |
| } |
| } |
| |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_unionall), |
| std::forward_as_tuple(union_all_index, |
| output_relation)); |
| temporary_relation_info_vec_.emplace_back(union_all_index, output_relation); |
| } |
| |
| void ExecutionGenerator::convertUpdateTable( |
| const P::UpdateTablePtr &physical_plan) { |
| // UpdateTable is converted to an Update and a SaveBlocks. |
| |
| const CatalogRelationInfo *input_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->input()); |
| DCHECK(input_relation_info != nullptr); |
| |
| const CatalogRelation *input_relation = input_relation_info->relation; |
| const relation_id input_rel_id = input_relation->getID(); |
| |
| // Create InsertDestination proto. |
| const QueryContext::insert_destination_id relocation_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *relocation_destination_proto = query_context_proto_->add_insert_destinations(); |
| |
| if (input_relation->hasPartitionScheme()) { |
| relocation_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE); |
| relocation_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme) |
| ->MergeFrom(input_relation->getPartitionScheme()->getProto()); |
| } else { |
| relocation_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL); |
| } |
| relocation_destination_proto->set_relation_id(input_rel_id); |
| |
| // Convert the predicate proto. |
| QueryContext::predicate_id execution_predicate_index = QueryContext::kInvalidPredicateId; |
| if (physical_plan->predicate()) { |
| execution_predicate_index = query_context_proto_->predicates_size(); |
| |
| unique_ptr<const Predicate> execution_predicate(convertPredicate(physical_plan->predicate())); |
| query_context_proto_->add_predicates()->MergeFrom(execution_predicate->getProto()); |
| } |
| |
| // Convert assignment expressions as a UpdateGroup proto. |
| const vector<E::AttributeReferencePtr> &assignees = physical_plan->assignees(); |
| const vector<E::ScalarPtr> &assignment_expressions = physical_plan->assignment_expressions(); |
| |
| DCHECK_EQ(assignees.size(), assignment_expressions.size()) |
| << physical_plan->toString(); |
| |
| const QueryContext::update_group_id update_group_index = query_context_proto_->update_groups_size(); |
| S::QueryContext::UpdateGroup *update_group_proto = query_context_proto_->add_update_groups(); |
| update_group_proto->set_relation_id(input_rel_id); |
| |
| for (vector<E::AttributeReferencePtr>::size_type i = 0; i < assignees.size(); ++i) { |
| unique_ptr<const Scalar> attribute( |
| assignees[i]->concretize(attribute_substitution_map_)); |
| DCHECK_EQ(Scalar::kAttribute, attribute->getDataSource()) |
| << assignees[i]->toString(); |
| |
| S::QueryContext::UpdateGroup::UpdateAssignment *update_assignment_proto = |
| update_group_proto->add_update_assignments(); |
| |
| update_assignment_proto->set_attribute_id( |
| static_cast<const ScalarAttribute*>(attribute.get())->getAttribute().getID()); |
| |
| unique_ptr<const Scalar> value( |
| assignment_expressions[i]->concretize(attribute_substitution_map_)); |
| update_assignment_proto->mutable_scalar()->MergeFrom(value->getProto()); |
| } |
| |
| const QueryPlan::DAGNodeIndex update_operator_index = |
| execution_plan_->addRelationalOperator(new UpdateOperator( |
| query_handle_->query_id(), |
| *input_relation, |
| relocation_destination_index, |
| execution_predicate_index, |
| update_group_index)); |
| relocation_destination_proto->set_relational_op_index(update_operator_index); |
| |
| const QueryPlan::DAGNodeIndex save_blocks_index = |
| execution_plan_->addRelationalOperator( |
| new SaveBlocksOperator(query_handle_->query_id(), |
| catalog_database_->getRelationByIdMutable(input_rel_id))); |
| if (!input_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(update_operator_index, |
| input_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| execution_plan_->addDirectDependency(save_blocks_index, |
| update_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| |
| void ExecutionGenerator::convertAggregate( |
| const P::AggregatePtr &physical_plan) { |
| const CatalogRelationInfo *input_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->input()); |
| const CatalogRelation &input_relation = *input_relation_info->relation; |
| const size_t num_partitions = input_relation.getNumPartitions(); |
| |
| // Create aggr state proto. |
| const QueryContext::aggregation_state_id aggr_state_index = |
| query_context_proto_->aggregation_states_size(); |
| S::QueryContext::AggregationOperationStateContext *aggr_state_context_proto = |
| query_context_proto_->add_aggregation_states(); |
| aggr_state_context_proto->set_num_partitions(num_partitions); |
| |
| S::AggregationOperationState *aggr_state_proto = |
| aggr_state_context_proto->mutable_aggregation_state(); |
| aggr_state_proto->set_relation_id(input_relation.getID()); |
| |
| bool use_parallel_initialization = false; |
| |
| std::vector<const Type*> group_by_types; |
| std::vector<attribute_id> group_by_attrs; |
| for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) { |
| unique_ptr<const Scalar> execution_group_by_expression; |
| E::AliasPtr alias; |
| if (E::SomeAlias::MatchesWithConditionalCast(grouping_expression, &alias)) { |
| E::ScalarPtr scalar; |
| // NOTE(zuyu): For aggregate expressions, all child expressions of an |
| // Alias should be a Scalar. |
| CHECK(E::SomeScalar::MatchesWithConditionalCast(alias->expression(), &scalar)) |
| << alias->toString(); |
| execution_group_by_expression.reset(scalar->concretize(attribute_substitution_map_)); |
| } else { |
| execution_group_by_expression.reset( |
| grouping_expression->concretize(attribute_substitution_map_)); |
| } |
| aggr_state_proto->add_group_by_expressions()->MergeFrom(execution_group_by_expression->getProto()); |
| group_by_types.push_back(&execution_group_by_expression->getType()); |
| group_by_attrs.push_back(execution_group_by_expression->getAttributeIdForValueAccessor()); |
| } |
| |
| const auto &aggregate_expressions = physical_plan->aggregate_expressions(); |
| vector<bool> is_distincts; |
| vector<pair<AggregationID, vector<const Type *>>> group_by_aggrs_info; |
| for (const E::AliasPtr &named_aggregate_expression : aggregate_expressions) { |
| const E::AggregateFunctionPtr unnamed_aggregate_expression = |
| std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression()); |
| |
| // Add a new entry in 'aggregates'. |
| S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates(); |
| |
| // Set the AggregateFunction. |
| const AggregateFunction &aggr_func = unnamed_aggregate_expression->getAggregate(); |
| aggr_proto->mutable_function()->MergeFrom(aggr_func.getProto()); |
| |
| // Add each of the aggregate's arguments. |
| vector<const Type *> argument_types; |
| for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) { |
| unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_)); |
| argument_types.push_back(&concretized_argument->getType()); |
| |
| aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto()); |
| } |
| |
| if (!group_by_types.empty()) { |
| group_by_aggrs_info.emplace_back(aggr_func.getAggregationID(), move(argument_types)); |
| } |
| |
| // Set whether it is a DISTINCT aggregation. |
| const bool is_distinct = unnamed_aggregate_expression->is_distinct(); |
| aggr_proto->set_is_distinct(is_distinct); |
| is_distincts.push_back(is_distinct); |
| |
| // Add distinctify hash table impl type if it is a DISTINCT aggregation. |
| if (unnamed_aggregate_expression->is_distinct()) { |
| const std::vector<E::ScalarPtr> &arguments = unnamed_aggregate_expression->getArguments(); |
| DCHECK_GE(arguments.size(), 1u); |
| // Right now only SeparateChaining implementation is supported. |
| aggr_state_proto->add_distinctify_hash_table_impl_types( |
| serialization::HashTableImplType::SEPARATE_CHAINING); |
| } |
| } |
| |
| bool aggr_state_is_partitioned = false; |
| std::size_t aggr_state_num_partitions = 1u; |
| if (!group_by_types.empty()) { |
| const std::size_t estimated_num_groups = |
| cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan); |
| |
| std::size_t max_num_groups; |
| if (cost_model_for_aggregation_ |
| ->canUseCollisionFreeAggregation(physical_plan, |
| estimated_num_groups, |
| &max_num_groups)) { |
| // First option: use array-based aggregation if applicable. |
| aggr_state_proto->set_hash_table_impl_type( |
| serialization::HashTableImplType::COLLISION_FREE_VECTOR); |
| aggr_state_proto->set_estimated_num_entries(max_num_groups); |
| use_parallel_initialization = true; |
| aggr_state_num_partitions = CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(max_num_groups); |
| |
| DCHECK(!group_by_aggrs_info.empty()); |
| CalculateCollisionFreeAggregationInfo(max_num_groups, group_by_aggrs_info, |
| aggr_state_proto->mutable_collision_free_vector_info()); |
| } else { |
| if (cost_model_for_aggregation_->canUseTwoPhaseCompactKeyAggregation( |
| physical_plan, estimated_num_groups)) { |
| // Second option: use thread-private compact-key aggregation if applicable. |
| aggr_state_proto->set_hash_table_impl_type( |
| serialization::HashTableImplType::THREAD_PRIVATE_COMPACT_KEY); |
| } else { |
| // Otherwise, use SeparateChaining. |
| aggr_state_proto->set_hash_table_impl_type( |
| serialization::HashTableImplType::SEPARATE_CHAINING); |
| if (CheckAggregatePartitioned(aggregate_expressions.size(), is_distincts, group_by_attrs, |
| estimated_num_groups)) { |
| aggr_state_is_partitioned = true; |
| aggr_state_num_partitions = FLAGS_num_aggregation_partitions; |
| } |
| } |
| aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups)); |
| } |
| } else { |
| aggr_state_proto->set_estimated_num_entries(1uL); |
| } |
| aggr_state_proto->set_is_partitioned(aggr_state_is_partitioned); |
| aggr_state_proto->set_num_partitions(aggr_state_num_partitions); |
| |
| if (physical_plan->filter_predicate() != nullptr) { |
| unique_ptr<const Predicate> predicate(convertPredicate(physical_plan->filter_predicate())); |
| aggr_state_proto->mutable_predicate()->MergeFrom(predicate->getProto()); |
| } |
| |
| const QueryPlan::DAGNodeIndex aggregation_operator_index = |
| execution_plan_->addRelationalOperator( |
| new AggregationOperator( |
| query_handle_->query_id(), |
| input_relation, |
| input_relation_info->isStoredRelation(), |
| aggr_state_index, |
| num_partitions)); |
| |
| if (!input_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(aggregation_operator_index, |
| input_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } else if (input_relation.isTemporary()) { |
| // NOTE(zuyu): drop the temp relation created by EliminateEmptyNode rule. |
| temporary_relation_info_vec_.emplace_back(aggregation_operator_index, &input_relation); |
| } |
| |
| if (use_parallel_initialization) { |
| DCHECK(aggr_state_proto->has_collision_free_vector_info()); |
| |
| const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index = |
| execution_plan_->addRelationalOperator( |
| new InitializeAggregationOperator( |
| query_handle_->query_id(), |
| aggr_state_index, |
| num_partitions, |
| aggr_state_proto->collision_free_vector_info().num_init_partitions())); |
| |
| execution_plan_->addDirectDependency(aggregation_operator_index, |
| initialize_aggregation_operator_index, |
| true /* is_pipeline_breaker */); |
| } |
| |
| // Create InsertDestination proto. |
| const CatalogRelation *output_relation = nullptr; |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation(physical_plan, |
| &output_relation, |
| insert_destination_proto); |
| |
| const QueryPlan::DAGNodeIndex finalize_aggregation_operator_index = |
| execution_plan_->addRelationalOperator( |
| new FinalizeAggregationOperator(query_handle_->query_id(), |
| aggr_state_index, |
| num_partitions, |
| physical_plan->hasRepartition(), |
| aggr_state_num_partitions, |
| *output_relation, |
| insert_destination_index)); |
| |
| insert_destination_proto->set_relational_op_index(finalize_aggregation_operator_index); |
| |
| execution_plan_->addDirectDependency(finalize_aggregation_operator_index, |
| aggregation_operator_index, |
| true /* is_pipeline_breaker */); |
| |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_plan), |
| std::forward_as_tuple(finalize_aggregation_operator_index, output_relation)); |
| temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index, |
| output_relation); |
| |
| const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index = |
| execution_plan_->addRelationalOperator( |
| new DestroyAggregationStateOperator(query_handle_->query_id(), |
| aggr_state_index, |
| num_partitions)); |
| |
| execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index, |
| finalize_aggregation_operator_index, |
| true); |
| |
| if (lip_filter_generator_ != nullptr) { |
| lip_filter_generator_->addAggregateInfo(physical_plan, |
| aggregation_operator_index); |
| } |
| } |
| |
| void ExecutionGenerator::convertCrossReferenceCoalesceAggregate( |
| const P::CrossReferenceCoalesceAggregatePtr &physical_plan) { |
| DCHECK_EQ(1u, physical_plan->left_join_attributes().size()); |
| DCHECK_EQ(1u, physical_plan->right_join_attributes().size()); |
| |
| const CatalogRelationInfo *left_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->left_child()); |
| const CatalogRelationInfo *right_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->right_child()); |
| const CatalogRelation &right_relation = *right_relation_info->relation; |
| |
| // TODO(quickstep-team): Support partitioned aggregation. |
| CHECK(!right_relation.hasPartitionScheme()); |
| const std::size_t num_partitions = 1u; |
| |
| // Create aggr state proto. |
| const QueryContext::aggregation_state_id aggr_state_index = |
| query_context_proto_->aggregation_states_size(); |
| S::QueryContext::AggregationOperationStateContext *aggr_state_context_proto = |
| query_context_proto_->add_aggregation_states(); |
| aggr_state_context_proto->set_num_partitions(num_partitions); |
| |
| S::AggregationOperationState *aggr_state_proto = |
| aggr_state_context_proto->mutable_aggregation_state(); |
| aggr_state_proto->set_relation_id(right_relation.getID()); |
| |
| // Group by the right join attribute. |
| std::unique_ptr<const Scalar> execution_group_by_expression( |
| physical_plan->right_join_attributes().front()->concretize( |
| attribute_substitution_map_)); |
| aggr_state_proto->add_group_by_expressions()->MergeFrom( |
| execution_group_by_expression->getProto()); |
| |
| aggr_state_proto->set_hash_table_impl_type( |
| serialization::HashTableImplType::COLLISION_FREE_VECTOR); |
| |
| const size_t estimated_num_entries = physical_plan->group_by_key_value_range(); |
| aggr_state_proto->set_estimated_num_entries(estimated_num_entries); |
| |
| const size_t aggr_state_num_partitions = |
| CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(estimated_num_entries); |
| aggr_state_proto->set_num_partitions(aggr_state_num_partitions); |
| |
| if (physical_plan->right_filter_predicate() != nullptr) { |
| std::unique_ptr<const Predicate> predicate( |
| convertPredicate(physical_plan->right_filter_predicate())); |
| aggr_state_proto->mutable_predicate()->MergeFrom(predicate->getProto()); |
| } |
| |
| vector<pair<AggregationID, vector<const Type *>>> group_by_aggrs_info; |
| for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) { |
| const E::AggregateFunctionPtr unnamed_aggregate_expression = |
| std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression()); |
| |
| // Add a new entry in 'aggregates'. |
| S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates(); |
| |
| // Set the AggregateFunction. |
| const AggregateFunction &aggr_func = unnamed_aggregate_expression->getAggregate(); |
| aggr_proto->mutable_function()->MergeFrom(aggr_func.getProto()); |
| |
| // Add each of the aggregate's arguments. |
| vector<const Type *> argument_types; |
| for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) { |
| unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_)); |
| argument_types.push_back(&concretized_argument->getType()); |
| |
| aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto()); |
| } |
| |
| group_by_aggrs_info.emplace_back(aggr_func.getAggregationID(), move(argument_types)); |
| |
| // Set whether it is a DISTINCT aggregation. |
| DCHECK(!unnamed_aggregate_expression->is_distinct()); |
| aggr_proto->set_is_distinct(false); |
| } |
| |
| CalculateCollisionFreeAggregationInfo(estimated_num_entries, group_by_aggrs_info, |
| aggr_state_proto->mutable_collision_free_vector_info()); |
| |
| const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index = |
| execution_plan_->addRelationalOperator( |
| new InitializeAggregationOperator( |
| query_handle_->query_id(), |
| aggr_state_index, |
| num_partitions, |
| aggr_state_proto->collision_free_vector_info().num_init_partitions())); |
| |
| const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index = |
| execution_plan_->addRelationalOperator( |
| new BuildAggregationExistenceMapOperator( |
| query_handle_->query_id(), |
| *left_relation_info->relation, |
| physical_plan->left_join_attributes().front()->id(), |
| left_relation_info->isStoredRelation(), |
| aggr_state_index, |
| num_partitions)); |
| |
| if (!left_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index, |
| left_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| |
| const QueryPlan::DAGNodeIndex aggregation_operator_index = |
| execution_plan_->addRelationalOperator( |
| new AggregationOperator( |
| query_handle_->query_id(), |
| right_relation, |
| right_relation_info->isStoredRelation(), |
| aggr_state_index, |
| num_partitions)); |
| |
| if (!right_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(aggregation_operator_index, |
| right_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| |
| // Build aggregation existence map once initialization is done. |
| execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index, |
| initialize_aggregation_operator_index, |
| true /* is_pipeline_breaker */); |
| |
| // Start aggregation after building existence map. |
| execution_plan_->addDirectDependency(aggregation_operator_index, |
| build_aggregation_existence_map_operator_index, |
| true /* is_pipeline_breaker */); |
| |
| |
| // Create InsertDestination proto. |
| const CatalogRelation *output_relation = nullptr; |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation(physical_plan, |
| &output_relation, |
| insert_destination_proto); |
| |
| const QueryPlan::DAGNodeIndex finalize_aggregation_operator_index = |
| execution_plan_->addRelationalOperator( |
| new FinalizeAggregationOperator(query_handle_->query_id(), |
| aggr_state_index, |
| num_partitions, |
| physical_plan->hasRepartition(), |
| aggr_state_num_partitions, |
| *output_relation, |
| insert_destination_index)); |
| |
| insert_destination_proto->set_relational_op_index(finalize_aggregation_operator_index); |
| |
| execution_plan_->addDirectDependency(finalize_aggregation_operator_index, |
| aggregation_operator_index, |
| true /* is_pipeline_breaker */); |
| |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_plan), |
| std::forward_as_tuple(finalize_aggregation_operator_index, output_relation)); |
| temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index, |
| output_relation); |
| |
| const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index = |
| execution_plan_->addRelationalOperator( |
| new DestroyAggregationStateOperator(query_handle_->query_id(), |
| aggr_state_index, |
| num_partitions)); |
| |
| execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index, |
| finalize_aggregation_operator_index, |
| true); |
| } |
| |
| void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) { |
| // Create sort configuration for run generation. |
| vector<bool> sort_ordering(physical_sort->sort_ascending()); |
| vector<bool> sort_null_ordering(physical_sort->nulls_first_flags()); |
| PtrVector<Scalar> sort_run_gen_attributes; |
| for (const E::AttributeReferencePtr &sort_attribute : |
| physical_sort->sort_attributes()) { |
| sort_run_gen_attributes.push_back( |
| sort_attribute->concretize(attribute_substitution_map_)); |
| } |
| const SortConfiguration sort_run_gen_config(sort_run_gen_attributes, |
| std::move(sort_ordering), |
| std::move(sort_null_ordering)); |
| const QueryContext::sort_config_id sort_run_gen_config_id = |
| query_context_proto_->sort_configs_size(); |
| S::SortConfiguration *sort_run_gen_config_proto = |
| query_context_proto_->add_sort_configs(); |
| sort_run_gen_config_proto->MergeFrom(sort_run_gen_config.getProto()); |
| |
| // Create SortRunGenerationOperator. |
| const CatalogRelation *initial_runs_relation; |
| const QueryContext::insert_destination_id initial_runs_destination_id = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *initial_runs_destination_proto = |
| query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation( |
| physical_sort, &initial_runs_relation, initial_runs_destination_proto); |
| |
| const CatalogRelationInfo *input_relation_info = |
| findRelationInfoOutputByPhysical(physical_sort->input()); |
| const QueryPlan::DAGNodeIndex run_generator_index = |
| execution_plan_->addRelationalOperator(new SortRunGenerationOperator( |
| query_handle_->query_id(), |
| *input_relation_info->relation, |
| *initial_runs_relation, |
| initial_runs_destination_id, |
| sort_run_gen_config_id, |
| input_relation_info->isStoredRelation())); |
| if (!input_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(run_generator_index, |
| input_relation_info->producer_operator_index, |
| false /* is_pipeline_breaker */); |
| } |
| temporary_relation_info_vec_.emplace_back(run_generator_index, |
| initial_runs_relation); |
| initial_runs_destination_proto->set_relational_op_index(run_generator_index); |
| |
| // Create sort configuration for run merging. |
| sort_ordering = physical_sort->sort_ascending(); |
| sort_null_ordering = physical_sort->nulls_first_flags(); |
| PtrVector<Scalar> sort_merge_run_attributes; |
| for (const E::AttributeReferencePtr &sort_attribute : |
| physical_sort->sort_attributes()) { |
| sort_merge_run_attributes.push_back( |
| sort_attribute->concretize(attribute_substitution_map_)); |
| } |
| const SortConfiguration sort_merge_run_config(sort_merge_run_attributes, |
| std::move(sort_ordering), |
| std::move(sort_null_ordering)); |
| const QueryContext::sort_config_id sort_merge_run_config_id = |
| query_context_proto_->sort_configs_size(); |
| query_context_proto_->add_sort_configs()->MergeFrom( |
| sort_merge_run_config.getProto()); |
| |
| // Create SortMergeRunOperator. |
| const CatalogRelation *merged_runs_relation; |
| const QueryContext::insert_destination_id merged_runs_destination_id = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *merged_runs_destination_proto = |
| query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation(physical_sort, |
| &merged_runs_relation, |
| merged_runs_destination_proto); |
| const CatalogRelation *sorted_relation; |
| const QueryContext::insert_destination_id sorted_output_destination_id = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *sorted_output_destination_proto = |
| query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation(physical_sort, |
| &sorted_relation, |
| sorted_output_destination_proto); |
| |
| // TODO(qzeng): Make the merge factor configurable. |
| const QueryPlan::DAGNodeIndex merge_run_operator_index = |
| execution_plan_->addRelationalOperator(new SortMergeRunOperator( |
| query_handle_->query_id(), |
| *initial_runs_relation, |
| *sorted_relation, |
| sorted_output_destination_id, |
| *merged_runs_relation, |
| merged_runs_destination_id, |
| sort_merge_run_config_id, |
| 64 /* merge_factor */, |
| physical_sort->limit(), |
| false /* input_relation_is_stored */)); |
| |
| execution_plan_->addDirectDependency(merge_run_operator_index, |
| run_generator_index, |
| false /* is_pipeline_breaker */); |
| merged_runs_destination_proto->set_relational_op_index(merge_run_operator_index); |
| sorted_output_destination_proto->set_relational_op_index(merge_run_operator_index); |
| |
| // Do not add merged_runs_relation into 'temporary_relation_info_vec_' |
| // and create the DropTableOperator for it at the end. Instead, add the drop |
| // operator right here, because the relation won't be used by any other operator. |
| const QueryPlan::DAGNodeIndex drop_merged_runs_index = |
| execution_plan_->addRelationalOperator( |
| new DropTableOperator( |
| query_handle_->query_id(), |
| *merged_runs_relation, |
| catalog_database_, |
| false /* only_drop_blocks */)); |
| execution_plan_->addDirectDependency( |
| drop_merged_runs_index, |
| merge_run_operator_index, |
| true /* is_pipeline_breaker */); |
| |
| temporary_relation_info_vec_.emplace_back(merge_run_operator_index, |
| sorted_relation); |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_sort), |
| std::forward_as_tuple(merge_run_operator_index, |
| sorted_relation)); |
| } |
| |
| void ExecutionGenerator::convertTableGenerator( |
| const P::TableGeneratorPtr &physical_tablegen) { |
| // Create InsertDestination proto. |
| const CatalogRelation *output_relation = nullptr; |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = |
| query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation(physical_tablegen, |
| &output_relation, |
| insert_destination_proto); |
| |
| // Create GeneratorFunctionHandle proto |
| const QueryContext::generator_function_id generator_function_index = |
| query_context_proto_->generator_functions_size(); |
| query_context_proto_->add_generator_functions()->MergeFrom( |
| physical_tablegen->generator_function_handle()->getProto()); |
| |
| TableGeneratorOperator *op = |
| new TableGeneratorOperator(query_handle_->query_id(), |
| *output_relation, |
| insert_destination_index, |
| generator_function_index); |
| |
| const QueryPlan::DAGNodeIndex tablegen_index = |
| execution_plan_->addRelationalOperator(op); |
| insert_destination_proto->set_relational_op_index(tablegen_index); |
| |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_tablegen), |
| std::forward_as_tuple(tablegen_index, |
| output_relation)); |
| temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation); |
| } |
| |
| void ExecutionGenerator::convertWindowAggregate( |
| const P::WindowAggregatePtr &physical_plan) { |
| // Create window_aggregation_operation_state proto. |
| const QueryContext::window_aggregation_state_id window_aggr_state_index = |
| query_context_proto_->window_aggregation_states_size(); |
| S::WindowAggregationOperationState *window_aggr_state_proto = |
| query_context_proto_->add_window_aggregation_states(); |
| |
| // Get input. |
| const CatalogRelationInfo *input_relation_info = |
| findRelationInfoOutputByPhysical(physical_plan->input()); |
| const CatalogRelation &input_relation = *input_relation_info->relation; |
| DCHECK_EQ(1u, input_relation.getNumPartitions()); |
| window_aggr_state_proto->set_input_relation_id(input_relation.getID()); |
| |
| // Get window aggregate function expression. |
| const E::AliasPtr &named_window_aggregate_expression = |
| physical_plan->window_aggregate_expression(); |
| const E::WindowAggregateFunctionPtr &window_aggregate_function = |
| std::static_pointer_cast<const E::WindowAggregateFunction>( |
| named_window_aggregate_expression->expression()); |
| |
| // Set the WindowAggregateFunction. |
| window_aggr_state_proto->mutable_function()->MergeFrom( |
| window_aggregate_function->window_aggregate().getProto()); |
| |
| // Set the arguments. |
| for (const E::ScalarPtr &argument : window_aggregate_function->arguments()) { |
| unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_)); |
| window_aggr_state_proto->add_arguments()->MergeFrom(concretized_argument->getProto()); |
| } |
| |
| // Set partition keys. |
| const E::WindowInfo &window_info = window_aggregate_function->window_info(); |
| for (const E::ScalarPtr &partition_by_attribute |
| : window_info.partition_by_attributes) { |
| unique_ptr<const Scalar> concretized_partition_by_attribute( |
| partition_by_attribute->concretize(attribute_substitution_map_)); |
| window_aggr_state_proto->add_partition_by_attributes() |
| ->MergeFrom(concretized_partition_by_attribute->getProto()); |
| } |
| |
| // Set order keys. |
| for (const E::ScalarPtr &order_by_attribute |
| : window_info.order_by_attributes) { |
| unique_ptr<const Scalar> concretized_order_by_attribute( |
| order_by_attribute->concretize(attribute_substitution_map_)); |
| window_aggr_state_proto->add_order_by_attributes() |
| ->MergeFrom(concretized_order_by_attribute->getProto()); |
| } |
| |
| // Set window frame info. |
| if (window_info.frame_info == nullptr) { |
| // If the frame is not specified, use the default setting: |
| // 1. If ORDER BY key is specified, use cumulative aggregation: |
| // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. |
| // 2. If ORDER BY key is not specified either, use the whole partition: |
| // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. |
| window_aggr_state_proto->set_is_row(true); // frame mode: ROWS. |
| window_aggr_state_proto->set_num_preceding(-1); // UNBOUNDED PRECEDING. |
| window_aggr_state_proto->set_num_following( |
| window_info.order_by_attributes.empty() |
| ? -1 // UNBOUNDED FOLLOWING. |
| : 0); // CURRENT ROW. |
| } else { |
| const E::WindowFrameInfo *window_frame_info = window_info.frame_info; |
| window_aggr_state_proto->set_is_row(window_frame_info->is_row); |
| window_aggr_state_proto->set_num_preceding(window_frame_info->num_preceding); |
| window_aggr_state_proto->set_num_following(window_frame_info->num_following); |
| } |
| |
| // Create InsertDestination proto. |
| const CatalogRelation *output_relation = nullptr; |
| const QueryContext::insert_destination_id insert_destination_index = |
| query_context_proto_->insert_destinations_size(); |
| S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); |
| createTemporaryCatalogRelation(physical_plan, |
| &output_relation, |
| insert_destination_proto); |
| |
| const QueryPlan::DAGNodeIndex window_aggregation_operator_index = |
| execution_plan_->addRelationalOperator( |
| new WindowAggregationOperator(query_handle_->query_id(), |
| input_relation, |
| *output_relation, |
| window_aggr_state_index, |
| insert_destination_index)); |
| |
| // TODO(Shixuan): Once parallelism is introduced, the is_pipeline_breaker |
| // could be set to false. |
| if (!input_relation_info->isStoredRelation()) { |
| execution_plan_->addDirectDependency(window_aggregation_operator_index, |
| input_relation_info->producer_operator_index, |
| true /* is_pipeline_breaker */); |
| } else if (input_relation.isTemporary()) { |
| // NOTE(zuyu): drop the temp relation created by EliminateEmptyNode rule. |
| temporary_relation_info_vec_.emplace_back(window_aggregation_operator_index, &input_relation); |
| } |
| |
| insert_destination_proto->set_relational_op_index(window_aggregation_operator_index); |
| |
| // Add to map and temp_relation_info_vec. |
| physical_to_output_relation_map_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(physical_plan), |
| std::forward_as_tuple(window_aggregation_operator_index, output_relation)); |
| temporary_relation_info_vec_.emplace_back(window_aggregation_operator_index, |
| output_relation); |
| } |
| |
| } // namespace optimizer |
| } // namespace quickstep |