blob: 9a13b4810c04f458a3e2c2931e1e02bb23ab988d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "query_optimizer/rules/AttachLIPFilters.hpp"
#include <algorithm>
#include <cstdint>
#include <map>
#include <set>
#include <unordered_set>
#include <unordered_map>
#include <vector>
#include <utility>
#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
#include "query_optimizer/physical/Aggregate.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
#include "query_optimizer/physical/PatternMatcher.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/physical/PhysicalType.hpp"
#include "query_optimizer/physical/Selection.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
#include "utility/lip_filter/LIPFilter.hpp"
#include "glog/logging.h"
namespace quickstep {
namespace optimizer {
namespace E = ::quickstep::optimizer::expressions;
namespace P = ::quickstep::optimizer::physical;
// Entry point of the rule. Expects `input` to be a TopLevelPlan. Walks the
// plan, collects LIP filter build/probe information into
// lip_filter_configuration_, and returns a copy of the plan carrying that
// configuration. If nothing was collected, `input` is returned unchanged.
P::PhysicalPtr AttachLIPFilters::apply(const P::PhysicalPtr &input) {
  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);

  const P::TopLevelPlanPtr top_level_plan =
      std::static_pointer_cast<const P::TopLevelPlan>(input);

  // The cost model is rebuilt for every invocation from the plan's shared
  // subplans; it supplies the cardinality/selectivity/min-max estimates used
  // by the rest of this rule.
  cost_model_.reset(
      new cost::StarSchemaSimpleCostModel(top_level_plan->shared_subplans()));

  // Seed the working configuration: start empty, or from a clone of whatever
  // configuration the plan already carries.
  const P::LIPFilterConfigurationPtr &existing_configuration =
      top_level_plan->lip_filter_configuration();
  if (existing_configuration == nullptr) {
    lip_filter_configuration_.reset(new P::LIPFilterConfiguration());
  } else {
    lip_filter_configuration_.reset(existing_configuration->clone());
  }

  std::set<E::ExprId> filtered_attribute_ids;
  attachLIPFilters(NodeList(input), &filtered_attribute_ids);

  // No build info and no probe info: hand back the original plan untouched.
  if (lip_filter_configuration_->getBuildInfoMap().empty() &&
      lip_filter_configuration_->getProbeInfoMap().empty()) {
    return input;
  }

  // Ownership of the working configuration moves into the new plan.
  return top_level_plan->copyWithLIPFilterConfiguration(
      P::LIPFilterConfigurationPtr(lip_filter_configuration_.release()));
}
// Recursively visits the subtree rooted at path.node (children first) and,
// for every HashJoin / Selection / Aggregate node whose probe-side input is
// estimated to hold at least 100000 tuples, registers LIP filter build and
// probe information in lip_filter_configuration_.
//
// @param path The node being visited together with the chain of its
//        ancestors.
// @param already_filtered_attributes In/out set of build-side (source)
//        attribute ids for which a filter has already been attached in this
//        subtree; a source attribute is attached at most once per set.
void AttachLIPFilters::attachLIPFilters(
    const NodeList &path,
    std::set<expressions::ExprId> *already_filtered_attributes) {
  // Largest (max - min) value range for which an exact bit-vector filter is
  // used instead of a hash-based one.
  // TODO(jianqiao): Add this threshold as a gflag (together with
  // InjectJoinFilters::kMaxFilterSize).
  constexpr std::uint64_t kMaxExactFilterRange = 1000000000u;

  const P::PhysicalPtr &node = path.node;

  // First process child nodes. Each child collects into its own set, which
  // is then merged into the caller's set.
  for (const auto &child : node->children()) {
    std::set<E::ExprId> child_filtered_attributes;
    attachLIPFilters(path.cons(child), &child_filtered_attributes);
    already_filtered_attributes->insert(child_filtered_attributes.begin(),
                                        child_filtered_attributes.end());
  }

  // Attach LIP filters to HashJoin/Selection/Aggregate nodes. Any other node
  // type leaves probe_child null and is skipped.
  P::PhysicalPtr probe_child = nullptr;
  switch (node->getPhysicalType()) {
    case P::PhysicalType::kHashJoin:
      probe_child = std::static_pointer_cast<const P::HashJoin>(node)->left();
      break;
    case P::PhysicalType::kSelection:
      probe_child = std::static_pointer_cast<const P::Selection>(node)->input();
      break;
    case P::PhysicalType::kAggregate:
      probe_child = std::static_pointer_cast<const P::Aggregate>(node)->input();
      break;
    default:
      break;
  }

  if (probe_child == nullptr ||
      cost_model_->estimateCardinality(probe_child) < 100000) {
    return;
  }

  const auto &candidate_lip_filters = getProbeSideInfo(path.cons(probe_child));
  if (candidate_lip_filters.empty()) {
    return;
  }

  // Keep only the best candidate per probe attribute.
  // NOTE(review): the map is keyed by the AttributeReferencePtr itself, so
  // candidates are grouped per pointer value rather than per ExprId --
  // presumably one object is shared per attribute; confirm.
  std::map<E::AttributeReferencePtr, LIPFilterInfoPtr> selected_filters;
  for (const auto &info : candidate_lip_filters) {
    auto it = selected_filters.find(info->attribute);
    if (it == selected_filters.end()) {
      selected_filters.emplace(info->attribute, info);
    } else if (LIPFilterInfo::isBetterThan(*info, *it->second)) {
      it->second = info;
    }
  }

  for (const auto &pair : selected_filters) {
    const E::AttributeReferencePtr &probe_attribute = pair.first;
    const LIPFilterInfoPtr &filter = pair.second;

    const E::ExprId source_attr_id = filter->source_attribute->id();
    if (already_filtered_attributes->find(source_attr_id)
            != already_filtered_attributes->end()) {
      // A filter on this source attribute was already attached below.
      continue;
    }

    std::int64_t min_cpp_value = 0;
    std::int64_t max_cpp_value = 0;
    bool use_exact_filter = false;
    if (findExactMinMaxValuesForAttributeHelper(filter->source,
                                                filter->source_attribute,
                                                &min_cpp_value,
                                                &max_cpp_value)) {
      DCHECK_GE(max_cpp_value, min_cpp_value);
      if (max_cpp_value >= min_cpp_value) {
        // Subtract in unsigned arithmetic: for max >= min the result is the
        // exact range, whereas the signed difference can overflow (undefined
        // behaviour) when the two values are far apart.
        const std::uint64_t value_range =
            static_cast<std::uint64_t>(max_cpp_value) -
            static_cast<std::uint64_t>(min_cpp_value);
        use_exact_filter = (value_range <= kMaxExactFilterRange);
      }
    }

    if (use_exact_filter) {
      lip_filter_configuration_->addBuildInfo(
          P::BitVectorExactFilterBuildInfo::Create(
              filter->source_attribute,
              min_cpp_value,
              max_cpp_value,
              false),
          filter->source);
    } else {
      // Explicit template argument keeps this well-formed on platforms where
      // std::size_t is not `unsigned long`.
      lip_filter_configuration_->addBuildInfo(
          P::SingleIdentityHashFilterBuildInfo::Create(
              filter->source_attribute,
              std::max<std::size_t>(64u, filter->estimated_cardinality * 8u)),
          filter->source);
    }

    lip_filter_configuration_->addProbeInfo(
        P::LIPFilterProbeInfo::Create(
            probe_attribute,
            filter->source_attribute,
            filter->source),
        node);
    already_filtered_attributes->emplace(source_attr_id);
  }
}
// Returns the candidate LIP filters that can be built at or below path.node,
// computing them on first request and caching the result in build_side_info_
// (keyed by node).
//
// Candidates come from two places:
//   1. everything reported by the children, when the node is an Aggregate,
//      Selection or HashJoin;
//   2. the build-side join attributes of the parent node, when that parent
//      is a hash join whose build side is estimated to be selective.
//
// NOTE(review): unlike getProbeSideInfo(), candidates inherited from the
// children are not restricted to this node's output attributes. The original
// code built such an id set here but never consulted it; the dead
// computation has been dropped without changing which candidates are
// returned. Confirm whether filtering was intended.
const std::vector<AttachLIPFilters::LIPFilterInfoPtr>& AttachLIPFilters
    ::getBuildSideInfo(const NodeList &path) {
  const P::PhysicalPtr &node = path.node;

  const auto cached_it = build_side_info_.find(node);
  if (cached_it != build_side_info_.end()) {
    return cached_it->second;
  }

  std::vector<LIPFilterInfoPtr> lip_filters;

  // 1. Gather candidate LIP filters propagated from descendant nodes.
  switch (node->getPhysicalType()) {
    case P::PhysicalType::kAggregate:
    case P::PhysicalType::kSelection:
    case P::PhysicalType::kHashJoin: {
      for (const P::PhysicalPtr &child : node->children()) {
        for (const LIPFilterInfoPtr &info : getBuildSideInfo(path.cons(child))) {
          lip_filters.emplace_back(info);
        }
      }
      break;
    }
    default:
      break;
  }

  // 2. Consider the parent physical node. If it is a HashJoin,
  //    then each build-side join attribute is a candidate LIP filter
  //    which can be built by the BuildHashOperator that corresponds
  //    to the parent HashJoin node.
  const auto *parent_path = path.cdr();
  P::HashJoinPtr hash_join;
  if (parent_path != nullptr &&
      P::SomeHashJoin::MatchesWithConditionalCast(parent_path->node, &hash_join)) {
    const P::PhysicalPtr &build_node = hash_join->right();
    // TODO(jianqiao): consider probe-side info to allow cascading propagation.
    const double selectivity = cost_model_->estimateSelectivity(build_node);
    // Only consider attributes that are selective.
    if (selectivity < 1.0) {
      const std::size_t cardinality = cost_model_->estimateCardinality(build_node);
      for (const auto &attr : hash_join->right_join_attributes()) {
        // NOTE(jianqiao): currently we only consider attributes of primitive
        // fixed-length types.
        if (TypedValue::HashIsReversible(attr->getValueType().getTypeID())) {
          lip_filters.emplace_back(
              std::make_shared<LIPFilterInfo>(attr,
                                              parent_path->node,
                                              path.depth,
                                              selectivity,
                                              cardinality));
        }
      }
    }
  }

  // emplace() yields the stored entry (new or pre-existing), so no further
  // lookup is needed to return a reference into the cache.
  return build_side_info_.emplace(node, std::move(lip_filters)).first->second;
}
const std::vector<AttachLIPFilters::LIPFilterInfoPtr>& AttachLIPFilters
::getProbeSideInfo(const NodeList &path) {
const P::PhysicalPtr &node = path.node;
if (probe_side_info_.find(node) == probe_side_info_.end()) {
std::vector<LIPFilterInfoPtr> lip_filters;
if (path.cdr() != nullptr) {
// 1. Gather candidate LIP filters propagated from ancestor nodes.
const auto &parent_lip_filters = getProbeSideInfo(*path.cdr());
if (!parent_lip_filters.empty()) {
std::unordered_set<E::ExprId> output_attribute_ids;
for (const auto &attr : node->getOutputAttributes()) {
output_attribute_ids.emplace(attr->id());
}
for (const auto &info : parent_lip_filters) {
if (output_attribute_ids.find(info->attribute->id()) != output_attribute_ids.end()) {
lip_filters.emplace_back(info);
}
}
}
// 2. Consider the parent physical node. If it is an InnerHashJoin or
// LeftSemiHashJoin, then we can propagate the build-side LIP filters
// to the probe-side.
P::HashJoinPtr hash_join;
if (P::SomeHashJoin::MatchesWithConditionalCast(path.cdr()->node, &hash_join) &&
(hash_join->join_type() == P::HashJoin::JoinType::kInnerJoin ||
hash_join->join_type() == P::HashJoin::JoinType::kLeftSemiJoin)) {
const P::PhysicalPtr &build_side_child = hash_join->right();
std::unordered_map<E::ExprId, E::AttributeReferencePtr> join_attribute_pairs;
for (std::size_t i = 0; i < hash_join->left_join_attributes().size(); ++i) {
const E::AttributeReferencePtr probe_side_join_attribute =
hash_join->left_join_attributes()[i];
const E::AttributeReferencePtr build_side_join_attribute =
hash_join->right_join_attributes()[i];
join_attribute_pairs.emplace(build_side_join_attribute->id(),
probe_side_join_attribute);
}
for (const auto &info : getBuildSideInfo(path.cdr()->cons(build_side_child))) {
const auto pair_it = join_attribute_pairs.find(info->attribute->id());
if (pair_it != join_attribute_pairs.end()) {
lip_filters.emplace_back(
std::make_shared<LIPFilterInfo>(pair_it->second,
info->source,
info->depth,
info->estimated_selectivity,
info->estimated_cardinality,
info->attribute));
}
}
}
}
probe_side_info_.emplace(node, std::move(lip_filters));
}
return probe_side_info_.at(node);
}
// Looks up the min and max value statistics of `attribute` in `physical_plan`
// through the cost model and, when both are present, exact, and of type Int
// or Long, stores them widened to 64-bit integers.
//
// @param physical_plan The plan node the statistics are looked up for.
// @param attribute The attribute whose value range is requested.
// @param min_cpp_value Receives the minimum value; written only when the
//        function returns true.
// @param max_cpp_value Receives the maximum value; written only when the
//        function returns true.
// @return true if exact min and max values were found and stored; false if
//         either statistic is null or inexact, or the attribute's type is
//         neither Int nor Long.
bool AttachLIPFilters::findExactMinMaxValuesForAttributeHelper(
    const physical::PhysicalPtr &physical_plan,
    const expressions::AttributeReferencePtr &attribute,
    std::int64_t *min_cpp_value,
    std::int64_t *max_cpp_value) const {
  // Default to "not exact" so that a flag the cost model leaves unset is
  // treated conservatively instead of being read while indeterminate.
  bool min_value_is_exact = false;
  bool max_value_is_exact = false;

  const TypedValue min_value =
      cost_model_->findMinValueStat(physical_plan, attribute, &min_value_is_exact);
  const TypedValue max_value =
      cost_model_->findMaxValueStat(physical_plan, attribute, &max_value_is_exact);

  if (min_value.isNull() || max_value.isNull() ||
      !min_value_is_exact || !max_value_is_exact) {
    return false;
  }

  switch (attribute->getValueType().getTypeID()) {
    case TypeID::kInt: {
      *min_cpp_value = min_value.getLiteral<int>();
      *max_cpp_value = max_value.getLiteral<int>();
      return true;
    }
    case TypeID::kLong: {
      *min_cpp_value = min_value.getLiteral<std::int64_t>();
      *max_cpp_value = max_value.getLiteral<std::int64_t>();
      return true;
    }
    default:
      return false;
  }
}
} // namespace optimizer
} // namespace quickstep