| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| **/ |
| |
| #include "query_optimizer/LIPFilterGenerator.hpp" |
| |
| #include <map> |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| |
| #include "catalog/CatalogAttribute.hpp" |
| #include "query_execution/QueryContext.pb.h" |
| #include "query_optimizer/physical/LIPFilterConfiguration.hpp" |
| #include "relational_operators/RelationalOperator.hpp" |
| #include "types/Type.hpp" |
| #include "utility/lip_filter/LIPFilter.hpp" |
| #include "utility/lip_filter/LIPFilter.pb.h" |
| |
| #include "glog/logging.h" |
| |
| namespace quickstep { |
| namespace optimizer { |
| |
| namespace E = ::quickstep::optimizer::expressions; |
| namespace P = ::quickstep::optimizer::physical; |
| |
| void LIPFilterGenerator::registerAttributeMap( |
| const P::PhysicalPtr &node, |
| const std::unordered_map<E::ExprId, const CatalogAttribute *> &attribute_substitution_map) { |
| // Check if a builder is attached to node. |
| const auto &build_info_map = lip_filter_configuration_->getBuildInfoMap(); |
| const auto build_it = build_info_map.find(node); |
| if (build_it != build_info_map.end()) { |
| auto &map_entry = attribute_map_[node]; |
| for (const auto &info : build_it->second) { |
| E::ExprId attr_id = info->build_attribute()->id(); |
| map_entry.emplace(attr_id, attribute_substitution_map.at(attr_id)); |
| } |
| } |
| // Check if a prober is attached to node. |
| const auto &probe_info_map = lip_filter_configuration_->getProbeInfoMap(); |
| const auto probe_it = probe_info_map.find(node); |
| if (probe_it != probe_info_map.end()) { |
| auto &map_entry = attribute_map_[node]; |
| for (const auto &info : probe_it->second) { |
| E::ExprId attr_id = info->probe_attribute()->id(); |
| map_entry.emplace(attr_id, attribute_substitution_map.at(attr_id)); |
| } |
| } |
| } |
| |
| void LIPFilterGenerator::deployLIPFilters(QueryPlan *execution_plan, |
| serialization::QueryContext *query_context_proto) { |
| lip_filter_builder_map_.clear(); |
| lip_filter_deployment_protos_.clear(); |
| |
| // Deploy builders |
| const auto &build_info_map = lip_filter_configuration_->getBuildInfoMap(); |
| for (const auto &info : builder_infos_) { |
| const auto build_it = build_info_map.find(info.builder_node); |
| if (build_it != build_info_map.end()) { |
| deployBuilderInternal(execution_plan, |
| query_context_proto, |
| info.builder_node, |
| info.builder_operator_index, |
| build_it->second); |
| } |
| } |
| |
| // Deploy probers |
| const auto &probe_info_map = lip_filter_configuration_->getProbeInfoMap(); |
| for (const auto &info : prober_infos_) { |
| const auto probe_it = probe_info_map.find(info.prober_node); |
| if (probe_it != probe_info_map.end()) { |
| deployProberInteral(execution_plan, |
| query_context_proto, |
| info.prober_node, |
| info.prober_operator_index, |
| probe_it->second); |
| } |
| } |
| |
| // Attach LIPFilterDeployment information to the RelationalOperators. |
| for (const auto &entry : lip_filter_deployment_protos_) { |
| RelationalOperator *relop = |
| execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(entry.first); |
| relop->deployLIPFilters(entry.second.first); |
| } |
| } |
| |
| void LIPFilterGenerator::deployBuilderInternal( |
| QueryPlan *execution_plan, |
| serialization::QueryContext *query_context_proto, |
| const physical::PhysicalPtr &builder_node, |
| const QueryPlan::DAGNodeIndex builder_operator_index, |
| const std::vector<physical::LIPFilterBuildInfoPtr> &build_info_vec) { |
| auto *lip_filter_deployment_info_proto = |
| getLIPFilterDeploymentProto(builder_operator_index, query_context_proto); |
| |
| const auto &builder_attribute_map = attribute_map_.at(builder_node); |
| for (const auto &info : build_info_vec) { |
| // Add the LIPFilter information into query context. |
| const QueryContext::lip_filter_id lip_filter_id = query_context_proto->lip_filters_size(); |
| serialization::LIPFilter *lip_filter_proto = query_context_proto->add_lip_filters(); |
| const CatalogAttribute *target_attr = |
| builder_attribute_map.at(info->build_attribute()->id()); |
| const Type &attr_type = target_attr->getType(); |
| |
| switch (info->filter_type()) { |
| case LIPFilterType::kSingleIdentityHashFilter: { |
| DCHECK(!attr_type.isVariableLength()); |
| const P::SingleIdentityHashFilterBuildInfo &sihf_info = |
| *std::static_pointer_cast<const P::SingleIdentityHashFilterBuildInfo>(info); |
| lip_filter_proto->set_lip_filter_type( |
| serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER); |
| lip_filter_proto->SetExtension(serialization::SingleIdentityHashFilter::filter_cardinality, |
| sihf_info.filter_cardinality()); |
| lip_filter_proto->SetExtension(serialization::SingleIdentityHashFilter::attribute_size, |
| attr_type.minimumByteLength()); |
| break; |
| } |
| case LIPFilterType::kBitVectorExactFilter: { |
| DCHECK(!attr_type.isVariableLength()); |
| const P::BitVectorExactFilterBuildInfo &bvef_info = |
| *std::static_pointer_cast<const P::BitVectorExactFilterBuildInfo>(info); |
| lip_filter_proto->set_lip_filter_type( |
| serialization::LIPFilterType::BIT_VECTOR_EXACT_FILTER); |
| lip_filter_proto->SetExtension(serialization::BitVectorExactFilter::min_value, |
| bvef_info.min_value()); |
| lip_filter_proto->SetExtension(serialization::BitVectorExactFilter::max_value, |
| bvef_info.max_value()); |
| lip_filter_proto->SetExtension(serialization::BitVectorExactFilter::attribute_size, |
| attr_type.minimumByteLength()); |
| lip_filter_proto->SetExtension(serialization::BitVectorExactFilter::is_anti_filter, |
| bvef_info.is_anti_filter()); |
| break; |
| } |
| default: |
| LOG(FATAL) << "Unsupported LIPFilter type"; |
| break; |
| } |
| |
| // Register the builder information which is needed later by the probers. |
| lip_filter_builder_map_.emplace( |
| std::make_pair(info->build_attribute()->id(), builder_node), |
| std::make_pair(lip_filter_id, builder_operator_index)); |
| |
| // Add the builder deployment information into query context. |
| auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_build_entries(); |
| lip_filter_entry_proto->set_lip_filter_id(lip_filter_id); |
| lip_filter_entry_proto->set_attribute_id(target_attr->getID()); |
| lip_filter_entry_proto->mutable_attribute_type()->CopyFrom(attr_type.getProto()); |
| } |
| } |
| |
| void LIPFilterGenerator::deployProberInteral( |
| QueryPlan *execution_plan, |
| serialization::QueryContext *query_context_proto, |
| const physical::PhysicalPtr &prober_node, |
| const QueryPlan::DAGNodeIndex prober_operator_index, |
| const std::vector<physical::LIPFilterProbeInfoPtr> &probe_info_vec) { |
| auto *lip_filter_deployment_info_proto = |
| getLIPFilterDeploymentProto(prober_operator_index, query_context_proto); |
| |
| const auto &prober_attribute_map = attribute_map_.at(prober_node); |
| for (const auto &info : probe_info_vec) { |
| // Find the corresponding builder for the to-be-probed LIPFilter. |
| const auto &builder_info = |
| lip_filter_builder_map_.at( |
| std::make_pair(info->build_attribute()->id(), info->builder())); |
| const CatalogAttribute *target_attr = |
| prober_attribute_map.at(info->probe_attribute()->id()); |
| |
| // Add the prober deployment information into query context. |
| auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_probe_entries(); |
| lip_filter_entry_proto->set_lip_filter_id(builder_info.first); |
| lip_filter_entry_proto->set_attribute_id(target_attr->getID()); |
| lip_filter_entry_proto->mutable_attribute_type()->CopyFrom( |
| target_attr->getType().getProto()); |
| |
| // A prober must wait until the corresponding builder has completed building |
| // the LIPFilter. |
| execution_plan->addOrUpgradeDirectDependency(prober_operator_index, |
| builder_info.second, |
| true /* is_pipeline_breaker */); |
| } |
| } |
| |
| serialization::LIPFilterDeployment* LIPFilterGenerator::getLIPFilterDeploymentProto( |
| const QueryPlan::DAGNodeIndex op_index, |
| serialization::QueryContext *query_context_proto) { |
| const auto proto_it = lip_filter_deployment_protos_.find(op_index); |
| if (proto_it == lip_filter_deployment_protos_.end()) { |
| const int lip_deployment_index = |
| query_context_proto->lip_filter_deployments_size(); |
| auto *lip_filter_deployment_proto = |
| query_context_proto->add_lip_filter_deployments(); |
| lip_filter_deployment_protos_.emplace( |
| op_index, std::make_pair(lip_deployment_index, lip_filter_deployment_proto)); |
| return lip_filter_deployment_proto; |
| } else { |
| return proto_it->second.second; |
| } |
| } |
| |
| } // namespace optimizer |
| } // namespace quickstep |