blob: 39546c6c190a7b0d2aa9281bb447300ff56a1f26 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "query_optimizer/rules/Partition.hpp"
#include <cstddef>
#include <cstdint>
#include <memory>
#include <unordered_set>
#include <utility>
#include <vector>
#include "query_optimizer/OptimizerContext.hpp"
#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/ExpressionUtil.hpp"
#include "query_optimizer/expressions/NamedExpression.hpp"
#include "query_optimizer/expressions/PatternMatcher.hpp"
#include "query_optimizer/physical/HashJoin.hpp"
#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
#include "query_optimizer/physical/PatternMatcher.hpp"
#include "query_optimizer/physical/Physical.hpp"
#include "query_optimizer/physical/PhysicalType.hpp"
#include "query_optimizer/physical/Selection.hpp"
#include "query_optimizer/physical/TableReference.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
#include "utility/Cast.hpp"
#include "utility/EqualsAnyConstant.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
using std::make_unique;
using std::move;
using std::size_t;
using std::static_pointer_cast;
using std::unordered_set;
using std::vector;
namespace quickstep {
namespace optimizer {
namespace E = expressions;
namespace P = physical;
// Flag validator for --num_repartitions: accepts the value only when it asks
// for at least two partitions.
static bool ValidateNumRepartitions(const char *flagname, std::uint64_t value) {
  // The flag name is part of the validator signature but is not needed here.
  (void) flagname;

  constexpr std::uint64_t kMinNumRepartitions = 2u;
  return value >= kMinNumRepartitions;
}
// Command-line flag (default 4) read by needsRepartitionForHashJoin() as the
// partition count when both sides of a hash join have to be repartitioned.
DEFINE_uint64(num_repartitions, 4, "Number of repartitions for a join.");
// Attaches ValidateNumRepartitions to the flag. The returned bool is stored in
// a file-local static only so that the registration call runs as part of this
// translation unit's static initialization; the variable itself is not read
// anywhere in this file.
static const volatile bool num_repartitions_dummy
= gflags::RegisterFlagValidator(&FLAGS_num_repartitions, &ValidateNumRepartitions);
// Sets up the rule for the plan rooted at <input>: the input must be a
// TopLevelPlan (the process aborts otherwise), and a fresh cost model is built
// from that plan's shared subplans.
void Partition::init(const P::PhysicalPtr &input) {
  P::TopLevelPlanPtr top_level_plan;
  CHECK(P::SomeTopLevelPlan::MatchesWithConditionalCast(input, &top_level_plan));

  const auto &shared_subplans = top_level_plan->shared_subplans();
  cost_model_ = make_unique<cost::StarSchemaSimpleCostModel>(shared_subplans);
}
namespace {
// Returns true for the physical node types that HashRepartition() wraps in a
// new Selection instead of overwriting the node's own output partition scheme
// header.
bool needsSelection(const P::PhysicalType physical_type) {
  switch (physical_type) {
    case P::PhysicalType::kSharedSubplanReference:  // Fall through.
    case P::PhysicalType::kSort:
    case P::PhysicalType::kTableReference:
    case P::PhysicalType::kUnionAll:
      return true;
    default:
      return false;
  }
}
// Builds a plan node whose output partition scheme header describes a hash
// partitioning on <repartition_attributes> with <num_repartitions> partitions.
//
// Depending on the type of <node>, the result is either a new Selection placed
// on top of <node> that forwards all of its output attributes unfiltered, or a
// copy of <node> carrying the new header.
P::PhysicalPtr HashRepartition(const P::PhysicalPtr &node,
                               const vector<E::AttributeReferencePtr> &repartition_attributes,
                               const size_t num_repartitions) {
  // One entry per repartition attribute, each holding only that attribute's id.
  P::PartitionSchemeHeader::PartitionExprIds partition_expr_ids;
  for (const auto &attribute : repartition_attributes) {
    partition_expr_ids.push_back({ attribute->id() });
  }

  auto hash_scheme_header = make_unique<P::PartitionSchemeHeader>(
      P::PartitionSchemeHeader::PartitionType::kHash, num_repartitions, move(partition_expr_ids));

  if (!needsSelection(node->getPhysicalType())) {
    // Overwrite the output partition scheme header of the node. The raw
    // pointer is released to the callee (presumably it takes ownership).
    return node->copyWithNewOutputPartitionSchemeHeader(hash_scheme_header.release());
  }

  // Add a Selection node on top that carries the new header.
  return P::Selection::Create(node,
                              CastSharedPtrVector<E::NamedExpression>(node->getOutputAttributes()),
                              nullptr /* filter_predicate */,
                              hash_scheme_header.release());
}
} // namespace
// Applies the partition rule to a single physical node and returns the
// (possibly rewritten) node.
//
//  - kHashJoin: if at least one input has an output partition scheme header,
//    decides via needsRepartitionForHashJoin() which inputs must be hash
//    repartitioned on their join attributes, rewrites those inputs, and
//    attaches an output partition scheme header to the join.
//  - kSelection: if the input is hash partitioned, derives the output
//    partition scheme header from the partition expressions that survive the
//    projection.
//  - Any other node type, or a node for which nothing changes, is returned
//    as is.
P::PhysicalPtr Partition::applyToNode(const P::PhysicalPtr &node) {
  // Will be used for aggregations.
  (void) optimizer_context_;

  switch (node->getPhysicalType()) {
    case P::PhysicalType::kHashJoin: {
      const P::HashJoinPtr hash_join = static_pointer_cast<const P::HashJoin>(node);

      P::PhysicalPtr left = hash_join->left();
      const P::PartitionSchemeHeader *left_partition_scheme_header =
          left->getOutputPartitionSchemeHeader();

      P::PhysicalPtr right = hash_join->right();
      const P::PartitionSchemeHeader *right_partition_scheme_header =
          right->getOutputPartitionSchemeHeader();

      // Neither input is partitioned: leave the join untouched.
      if (!left_partition_scheme_header && !right_partition_scheme_header) {
        break;
      }

      const auto &left_join_attributes = hash_join->left_join_attributes();
      const auto &right_join_attributes = hash_join->right_join_attributes();

      bool left_needs_repartition = false;
      bool right_needs_repartition = false;
      size_t num_partitions = 1u;

      needsRepartitionForHashJoin(left_partition_scheme_header, left_join_attributes,
                                  right_partition_scheme_header, right_join_attributes,
                                  &left_needs_repartition, &right_needs_repartition, &num_partitions);

      // Hash repartition.
      if (left_needs_repartition) {
        left = HashRepartition(left, left_join_attributes, num_partitions);
      }
      if (right_needs_repartition) {
        right = HashRepartition(right, right_join_attributes, num_partitions);
      }

      unordered_set<E::ExprId> project_expr_ids;
      for (const E::AttributeReferencePtr &project_expression : hash_join->getOutputAttributes()) {
        project_expr_ids.insert(project_expression->id());
      }

      // For every join-attribute pair, record whichever of the two sides is
      // still visible in the join output as equivalent partition expressions.
      P::PartitionSchemeHeader::PartitionExprIds output_repartition_expr_ids;
      for (size_t i = 0; i < left_join_attributes.size(); ++i) {
        const E::ExprId left_join_id = left_join_attributes[i]->id();
        const E::ExprId right_join_id = right_join_attributes[i]->id();

        output_repartition_expr_ids.emplace_back();

        if (project_expr_ids.count(left_join_id)) {
          output_repartition_expr_ids.back().insert(left_join_id);
        }

        if (project_expr_ids.count(right_join_id)) {
          output_repartition_expr_ids.back().insert(right_join_id);
        }

        if (output_repartition_expr_ids.back().empty()) {
          // Some partition attribute will be projected out, so we use
          // the input partition id as the output partition id.
          output_repartition_expr_ids.clear();
          break;
        }
      }

      auto output_partition_scheme_header = make_unique<P::PartitionSchemeHeader>(
          P::PartitionSchemeHeader::PartitionType::kHash, num_partitions, move(output_repartition_expr_ids));
      if (left_needs_repartition || right_needs_repartition) {
        // At least one input was replaced, so the join has to be rebuilt.
        return P::HashJoin::Create(left, right, left_join_attributes, right_join_attributes,
                                   hash_join->residual_predicate(),
                                   hash_join->project_expressions(),
                                   hash_join->join_type(),
                                   output_partition_scheme_header.release());
      } else if (left_partition_scheme_header) {
        // Inputs are reused as they are; only the join's header changes.
        return hash_join->copyWithNewOutputPartitionSchemeHeader(output_partition_scheme_header.release());
      }
      // Otherwise the header built above is discarded with the unique_ptr and
      // the join is returned unchanged.
      break;
    }
    case P::PhysicalType::kSelection: {
      const P::SelectionPtr selection = static_pointer_cast<const P::Selection>(node);

      const P::PartitionSchemeHeader *input_partition_scheme_header =
          selection->input()->getOutputPartitionSchemeHeader();
      if (input_partition_scheme_header && input_partition_scheme_header->isHashPartition()) {
        unordered_set<E::ExprId> project_expr_ids;
        for (const E::AttributeReferencePtr &project_expression : selection->getOutputAttributes()) {
          project_expr_ids.insert(project_expression->id());
        }

        // Keep, per group of equivalent partition expressions, only the ids
        // that are still part of the selection's output.
        const auto &input_partition_expr_ids = input_partition_scheme_header->partition_expr_ids;
        P::PartitionSchemeHeader::PartitionExprIds output_partition_expr_ids;
        for (const auto &equivalent_expr_ids : input_partition_expr_ids) {
          P::PartitionSchemeHeader::EquivalentPartitionExprIds output_equivalent_partition_expr_ids;
          for (const E::ExprId expr_id : equivalent_expr_ids) {
            if (project_expr_ids.find(expr_id) != project_expr_ids.end()) {
              output_equivalent_partition_expr_ids.insert(expr_id);
            }
          }

          if (output_equivalent_partition_expr_ids.empty()) {
            // Some partition attribute will be projected out, so we use
            // the input partition id as the output partition id. Keeping only
            // the remaining groups would advertise a hash partitioning on a
            // strict subset of the attributes the data was actually
            // partitioned on, which is the same situation the hash join case
            // above handles by clearing the ids.
            output_partition_expr_ids.clear();
            break;
          }
          output_partition_expr_ids.push_back(move(output_equivalent_partition_expr_ids));
        }

        // Only copy the selection when the header would actually differ from
        // the input's.
        if (input_partition_expr_ids != output_partition_expr_ids) {
          auto output_partition_scheme_header = make_unique<P::PartitionSchemeHeader>(
              P::PartitionSchemeHeader::PartitionType::kHash,
              input_partition_scheme_header->num_partitions,
              move(output_partition_expr_ids));
          return selection->copyWithNewOutputPartitionSchemeHeader(output_partition_scheme_header.release());
        }
      }
      break;
    }
    default:
      break;
  }

  return node;
}
// Decides, for a hash join with at least one partitioned input, which inputs
// have to be hash repartitioned on their join attributes and how many
// partitions the join will run with.
//
// Inputs: the output partition scheme headers of both sides (either may be
// null, but not both) and the join attributes of both sides.
// Outputs: <*left_needs_repartition>, <*right_needs_repartition> and
// <*num_partitions>; all three are always written.
void Partition::needsRepartitionForHashJoin(
    const P::PartitionSchemeHeader *left_partition_scheme_header,
    const vector<E::AttributeReferencePtr> &left_join_attributes,
    const P::PartitionSchemeHeader *right_partition_scheme_header,
    const vector<E::AttributeReferencePtr> &right_join_attributes,
    bool *left_needs_repartition,
    bool *right_needs_repartition,
    size_t *num_partitions) {
  DCHECK(left_partition_scheme_header || right_partition_scheme_header);

  // True when <header> is a hash partition scheme that reports itself as
  // reusable for the ids of <join_attributes>.
  const auto matches_join_keys = [](const P::PartitionSchemeHeader *header,
                                    const vector<E::AttributeReferencePtr> &join_attributes) -> bool {
    if (!header->isHashPartition()) {
      return false;
    }

    unordered_set<E::ExprId> join_expr_ids;
    for (const auto &join_attribute : join_attributes) {
      join_expr_ids.insert(join_attribute->id());
    }
    return header->reusablePartitionScheme(join_expr_ids);
  };

  // Left side: a partitioned input is kept only if its scheme matches the
  // join attributes; an unpartitioned input has to follow the right side.
  bool repartition_left = false;
  size_t partition_count = 1u;
  if (left_partition_scheme_header) {
    partition_count = left_partition_scheme_header->num_partitions;
    repartition_left = !matches_join_keys(left_partition_scheme_header, left_join_attributes);
  } else if (right_partition_scheme_header) {
    repartition_left = true;
    partition_count = right_partition_scheme_header->num_partitions;
  }

  // Right side: its scheme is reused only if it matches the join attributes
  // and either the left side is repartitioned anyway or both sides already
  // agree on the number of partitions.
  bool repartition_right = false;
  if (right_partition_scheme_header) {
    const bool reuse_right =
        matches_join_keys(right_partition_scheme_header, right_join_attributes) &&
        (repartition_left || partition_count == right_partition_scheme_header->num_partitions);
    if (reuse_right) {
      partition_count = right_partition_scheme_header->num_partitions;
    } else {
      repartition_right = true;
    }
  } else if (repartition_left) {
    // TODO(quickstep-team): use a cost model to choose the broadcast hash join
    // or repartitioned hash join if the right side has no partitions while the
    // left side needs to repartition. For now, we always use the latter.
    repartition_right = true;
  }

  // With both sides repartitioned, the partition count comes from the flag.
  if (repartition_left && repartition_right) {
    partition_count = FLAGS_num_repartitions;
  }

  *left_needs_repartition = repartition_left;
  *right_needs_repartition = repartition_right;
  *num_partitions = partition_count;

  DCHECK_NE(1u, *num_partitions);
}
} // namespace optimizer
} // namespace quickstep