blob: dcdd27a6686323094d1405ff955570f92690f8e1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "query_optimizer/rules/ReduceGroupByAttributes.hpp"
#include <algorithm>
#include <map>
#include <vector>
#include <unordered_set>
#include <utility>
#include "catalog/CatalogRelation.hpp"
#include "query_optimizer/OptimizerContext.hpp"
#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/ExpressionUtil.hpp"
#include "query_optimizer/expressions/NamedExpression.hpp"
#include "query_optimizer/physical/Aggregate.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
#include "query_optimizer/physical/PatternMatcher.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/physical/PhysicalType.hpp"
#include "query_optimizer/physical/TableReference.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
#include "query_optimizer/rules/PruneColumns.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
namespace quickstep {
namespace optimizer {
// A stored relation's GROUP BY attributes are only considered for reduction
// (group by one key attribute, then re-attach the others with a HashJoin above
// the Aggregate) when strictly MORE than this many of them appear in the
// grouping list -- the check is `attributes.size() <= threshold => skip`, so
// the default of 3 requires at least 4 attributes from the same relation.
DEFINE_uint64(reduce_group_by_attributes_threshold, 3u,
"The threshold for a stored relation's number of attributes in a "
"group-by clause for the ReduceGroupByAttributes optimization "
"rule to pull the stored relation up the aggregation");
// Short aliases for the optimizer's expression and physical-plan namespaces.
namespace E = ::quickstep::optimizer::expressions;
namespace P = ::quickstep::optimizer::physical;
/**
 * Entry point of the rule.
 *
 * @param input The whole physical plan; must be a TopLevelPlan.
 * @return The rewritten plan, or `input` itself when nothing was changed.
 */
P::PhysicalPtr ReduceGroupByAttributes::apply(const P::PhysicalPtr &input) {
  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);

  // Reset all per-invocation state. The attribute -> TableReference mapping
  // is rebuilt while traversing `input`, so entries recorded during an
  // earlier apply() call on a different plan must not leak into this one
  // (the cost model is already re-created for the same reason).
  source_.clear();
  cost_model_.reset(new cost::StarSchemaSimpleCostModel(
      std::static_pointer_cast<const P::TopLevelPlan>(input)->shared_subplans()));

  P::PhysicalPtr output = applyInternal(input);
  // The rewrite projects every non-key attribute of each hoisted table out of
  // the new HashJoins; prune whatever is unused, but only if the plan changed.
  if (output != input) {
    output = PruneColumns().apply(output);
  }
  return output;
}
/**
 * Recursively rewrites the subtree rooted at `input` in post-order: all
 * children are processed (left to right) before `input` itself is handed to
 * applyToNode(), so TableReference mappings below a node are recorded in
 * source_ before that node is examined.
 *
 * @param input Root of the subtree to rewrite.
 * @return The rewritten subtree.
 */
P::PhysicalPtr ReduceGroupByAttributes::applyInternal(const P::PhysicalPtr &input) {
  const auto &original_children = input->children();

  std::vector<P::PhysicalPtr> rewritten_children;
  rewritten_children.reserve(original_children.size());
  for (const P::PhysicalPtr &child : original_children) {
    rewritten_children.emplace_back(applyInternal(child));
  }

  // Only build a new node when some child was actually replaced; otherwise
  // keep the original node so apply() can detect "no change" by pointer.
  const bool children_unchanged = (rewritten_children == original_children);
  const P::PhysicalPtr node =
      children_unchanged ? input : input->copyWithNewChildren(rewritten_children);
  return applyToNode(node);
}
/**
 * Rewrites a single node whose children have already been processed.
 *
 * - TableReference: records, for each output attribute, the TableReference
 *   it comes from (in source_), and returns the node unchanged.
 * - Aggregate with more than one grouping expression: for every source table
 *   that contributes more than FLAGS_reduce_group_by_attributes_threshold
 *   grouping attributes and has an attribute the cost model reports as
 *   unique, groups by that key attribute only, then re-attaches the table's
 *   other attributes through an inner HashJoin on the key placed on top of
 *   the (now narrower) Aggregate.
 * - Any other node: returned unchanged.
 *
 * @param input The node to rewrite.
 * @return The rewritten node, or `input` when the rule does not apply.
 */
P::PhysicalPtr ReduceGroupByAttributes::applyToNode(const P::PhysicalPtr &input) {
  P::TableReferencePtr table_reference;
  if (P::SomeTableReference::MatchesWithConditionalCast(input, &table_reference)) {
    // Collect the attributes-to-TableReference mapping info.
    for (const auto &attr : table_reference->attribute_list()) {
      source_.emplace(attr->id(), std::make_pair(table_reference, attr));
    }
    return input;
  }

  P::AggregatePtr aggregate;
  if (!P::SomeAggregate::MatchesWithConditionalCast(input, &aggregate) ||
      aggregate->grouping_expressions().size() <= 1u) {
    return input;
  }

  // Divide the group-by attributes into groups based on their source table.
  // Tables are remembered in order of first appearance in the GROUP BY list:
  // iterating a std::map keyed by shared_ptr would order them by heap
  // address, making the generated HashJoin chain differ from run to run.
  // A grouping attribute listed more than once is counted only once.
  std::vector<P::TableReferencePtr> source_tables;
  std::map<P::TableReferencePtr, std::vector<E::AttributeReferencePtr>> table_attributes;
  std::unordered_set<E::ExprId> seen_grouping_attr_ids;
  for (const auto &expr : aggregate->grouping_expressions()) {
    const auto source_it = source_.find(expr->id());
    if (source_it == source_.end() ||
        !seen_grouping_attr_ids.emplace(expr->id()).second) {
      continue;
    }
    const P::TableReferencePtr &table = source_it->second.first;
    std::vector<E::AttributeReferencePtr> &attributes = table_attributes[table];
    if (attributes.empty()) {
      source_tables.emplace_back(table);
    }
    attributes.emplace_back(source_it->second.second);
  }

  std::unordered_set<E::ExprId> erased_grouping_attr_ids;
  std::vector<std::pair<P::TableReferencePtr, E::AttributeReferencePtr>> hoisted_tables;

  // For each group (i.e. each source table), if it is profitable then we pull
  // the table up the aggregation.
  for (const P::TableReferencePtr &table : source_tables) {
    const std::vector<E::AttributeReferencePtr> &attributes = table_attributes.at(table);
    // TODO(jianqiao): find a cost-based metric instead of hard-coding the
    // threshold number of group-by attributes.
    if (attributes.size() <= FLAGS_reduce_group_by_attributes_threshold) {
      continue;
    }

    std::vector<AttributeInfo> attr_infos;
    attr_infos.reserve(attributes.size());
    for (const auto &attr : attributes) {
      attr_infos.emplace_back(attr,
                              cost_model_->impliesUniqueAttributes(table, {attr}),
                              !attr->getValueType().isVariableLength(),
                              attr->getValueType().maximumByteLength());
    }

    // Only the best-ranked candidate is needed, so a linear scan under the
    // same ranking (AttributeInfo::IsBetterThan) replaces a full sort.
    // `attributes` is non-empty, so `attr_infos` is too.
    const auto best_it = std::min_element(
        attr_infos.begin(), attr_infos.end(),
        [](const AttributeInfo &lhs, const AttributeInfo &rhs) {
          return AttributeInfo::IsBetterThan(&lhs, &rhs);
        });
    const AttributeInfo &best_candidate = *best_it;
    if (!best_candidate.is_unique) {
      // Cannot find a key attribute. Give up pulling this table up.
      continue;
    }

    // NOTE(review): the table is re-attached below with an INNER join on this
    // key. If the key can be NULL in the Aggregate's input (e.g. the table is
    // on the nullable side of an outer join), groups with a NULL key would be
    // dropped -- confirm that impliesUniqueAttributes()/the plan shape rule
    // this out.
    const E::AttributeReferencePtr key_attribute = best_candidate.attribute;
    hoisted_tables.emplace_back(table, key_attribute);

    for (const auto &attr : attributes) {
      if (attr->id() != key_attribute->id()) {
        erased_grouping_attr_ids.emplace(attr->id());
      }
    }
  }

  if (erased_grouping_attr_ids.empty()) {
    return input;
  }

  // Reconstruct the Aggregate node with reduced group-by attributes and then
  // construct HashJoin nodes on top of the Aggregate.
  std::vector<E::NamedExpressionPtr> reduced_grouping_expressions;
  for (const auto &expr : aggregate->grouping_expressions()) {
    if (erased_grouping_attr_ids.find(expr->id()) == erased_grouping_attr_ids.end()) {
      reduced_grouping_expressions.emplace_back(expr);
    }
  }

  const P::AggregatePtr new_aggregate =
      P::Aggregate::Create(aggregate->input(),
                           reduced_grouping_expressions,
                           aggregate->aggregate_expressions(),
                           aggregate->filter_predicate());

  P::PhysicalPtr output = new_aggregate;
  // Each HashJoin forwards everything produced by its probe side plus the
  // non-key attributes of the hoisted table. Those keep their original
  // ExprIds, so parents that referenced the erased grouping attributes still
  // resolve. The accumulated list is reused as the next join's probe output.
  std::vector<E::NamedExpressionPtr> project_expressions =
      E::ToNamedExpressions(output->getOutputAttributes());

  for (const auto &pair : hoisted_tables) {
    const P::TableReferencePtr &source_table = pair.first;
    const E::AttributeReferencePtr &probe_attribute = pair.second;

    // The build side re-scans the stored relation. Its copy of the key column
    // gets a fresh ExprId so it does not clash with the probe-side key, which
    // is still an output of the Aggregate.
    E::AttributeReferencePtr build_attribute;
    std::vector<E::AttributeReferencePtr> new_attribute_list;
    for (const auto &attr : source_table->attribute_list()) {
      if (attr->id() == probe_attribute->id()) {
        build_attribute =
            E::AttributeReference::Create(optimizer_context_->nextExprId(),
                                          attr->attribute_name(),
                                          attr->attribute_alias(),
                                          attr->relation_name(),
                                          attr->getValueType(),
                                          E::AttributeReferenceScope::kLocal);
        new_attribute_list.emplace_back(build_attribute);
      } else {
        new_attribute_list.emplace_back(attr);
        project_expressions.emplace_back(attr);
      }
    }

    DCHECK(build_attribute != nullptr);
    const P::TableReferencePtr build_side_table =
        P::TableReference::Create(source_table->relation(),
                                  source_table->relation()->getName(),
                                  new_attribute_list);

    output = P::HashJoin::Create(output,
                                 build_side_table,
                                 {probe_attribute},
                                 {build_attribute},
                                 nullptr,
                                 project_expressions,
                                 P::HashJoin::JoinType::kInnerJoin);
  }

  return output;
}
} // namespace optimizer
} // namespace quickstep