blob: 3856b71781f0c66ebed1db79a86396feb5c23e8c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "AggregateRelParser.h"
#include <memory>
#include <AggregateFunctions/Combinators/AggregateFunctionIf.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeTuple.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Operator/DefaultHashAggregateResult.h>
#include <Operator/GraceAggregatingStep.h>
#include <Operator/GraceMergingAggregatedStep.h>
#include <Operator/StreamingAggregatingStep.h>
#include <Parser/AdvancedParametersParseUtil.h>
#include <Parser/AggregateFunctionParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <boost/algorithm/string/join.hpp>
#include <google/protobuf/wrappers.pb.h>
#include <Common/CHUtil.h>
#include <Common/GlutenConfig.h>
/// Query settings read by the aggregation steps created in this file.
namespace DB::Setting
{
extern const SettingsUInt64 max_block_size;
extern const SettingsUInt64 aggregation_in_order_max_block_bytes;
extern const SettingsBool enable_memory_bound_merging_of_aggregation_results;
}

/// Error codes declared for use in this translation unit.
namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_TYPE;
}
namespace local_engine
{
using namespace DB;
/// Construct a parser for substrait AggregateRel nodes; all state lives in the RelParser base
/// and in the members initialized later by setup().
AggregateRelParser::AggregateRelParser(ParserContextPtr parser_context_)
    : RelParser(parser_context_)
{
}
/// Translate a substrait AggregateRel into ClickHouse query-plan steps appended to `query_plan`.
///
/// Depending on the stage flags computed by setup(), this appends:
///   - a pre-projection that evaluates the aggregate function arguments, then
///   - final stage:    a merging step followed by a post-projection, or
///   - complete stage: a complete-mode aggregating step followed by a post-projection, or
///   - otherwise:      a partial aggregating step.
/// When there are no grouping keys (and the stage requires it), a DefaultHashAggregateResultStep is
/// appended so that one row of default values is produced even for empty input.
///
/// @param query_plan  plan producing this aggregate's input; ownership is taken.
/// @param rel         substrait relation holding an `aggregate()`.
/// @param rel_stack_  stack of enclosing relations; its back() is inspected as the next relation.
/// @return the extended plan.
/// @throws DB::Exception(LOGICAL_ERROR) if the output header does not start with the grouping keys in order.
DB::QueryPlanPtr
AggregateRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
{
    rel_stack = &rel_stack_;
    setup(std::move(query_plan), rel);

    addPreProjection();
    LOG_TRACE(logger, "header after pre-projection is: {}", plan->getCurrentHeader()->dumpStructure());

    if (has_final_stage)
    {
        addMergingAggregatedStep();
        LOG_TRACE(logger, "header after merging is: {}", plan->getCurrentHeader()->dumpStructure());
        addPostProjection();
        LOG_TRACE(logger, "header after post-projection is: {}", plan->getCurrentHeader()->dumpStructure());
    }
    else if (has_complete_stage)
    {
        addCompleteModeAggregatedStep();
        LOG_TRACE(logger, "header after complete aggregate is: {}", plan->getCurrentHeader()->dumpStructure());
        addPostProjection();
        LOG_TRACE(logger, "header after post-projection is: {}", plan->getCurrentHeader()->dumpStructure());
    }
    else
    {
        addAggregatingStep();
        LOG_TRACE(logger, "header after aggregating is: {}", plan->getCurrentHeader()->dumpStructure());
    }

    /// Sanity check to help find bugs; don't remove it.
    /// The result columns must be laid out as: ordered grouping keys ++ ordered aggregate expression results.
    const auto & aggregation_output_header = *plan->getCurrentHeader();
    for (size_t i = 0; i < grouping_keys.size(); ++i)
    {
        const auto pos = aggregation_output_header.getPositionByName(grouping_keys[i]);
        if (pos != i)
            throw DB::Exception(
                DB::ErrorCodes::LOGICAL_ERROR,
                "The order of aggregation result columns is invalid: grouping key {} is at position {}, expected {}. Header: {}",
                grouping_keys[i],
                pos,
                i,
                aggregation_output_header.dumpStructure());
    }

    /// If there are no grouping keys, we still need to return one row with default values even if the input is empty.
    const auto & groupings = rel.aggregate().groupings();
    const bool no_grouping_keys = groupings.empty() || groupings[0].grouping_expressions().empty();
    if (no_grouping_keys && (has_final_stage || has_complete_stage || rel.aggregate().measures().empty()))
    {
        // Use the member logger like the rest of this function instead of a per-call registry lookup.
        LOG_TRACE(logger, "default aggregate result step");
        auto default_agg_result = std::make_unique<DefaultHashAggregateResultStep>(plan->getCurrentHeader());
        default_agg_result->setStepDescription("Default aggregate result");
        steps.push_back(default_agg_result.get());
        plan->addStep(std::move(default_agg_result));
    }
    // `plan` is a data member, so an explicit move is required to hand ownership back.
    return std::move(plan);
}
/// Initialize the per-relation parser state:
///  - take ownership of the input plan and remember the AggregateRel,
///  - derive has_first_stage / has_inter_stage / has_final_stage / has_complete_stage from the measure phases
///    (or, when there are no measures, from the advanced-extension optimization info and the next relation),
///  - resolve an AggregateFunctionParser and CH function name for every measure into `aggregates`,
///  - resolve the grouping key column names from the input header into `grouping_keys`.
/// @throws DB::Exception(LOGICAL_ERROR) if a final or complete phase is mixed with other phases.
/// @throws DB::Exception(BAD_ARGUMENTS) for an unresolvable function reference, an unsupported aggregate
///         function, a non-field grouping expression, an out-of-range grouping field or multiple groupings.
void AggregateRelParser::setup(DB::QueryPlanPtr query_plan, const substrait::Rel & rel)
{
    plan = std::move(query_plan);
    aggregate_rel = &rel.aggregate();

    std::set<substrait::AggregationPhase> phase_set;
    for (const auto & measure : aggregate_rel->measures())
        phase_set.insert(measure.measure().phase());

    has_first_stage = phase_set.contains(substrait::AggregationPhase::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE);
    has_inter_stage = phase_set.contains(substrait::AggregationPhase::AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE);
    has_final_stage = phase_set.contains(substrait::AggregationPhase::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT);
    has_complete_stage = phase_set.contains(substrait::AggregationPhase::AGGREGATION_PHASE_INITIAL_TO_RESULT);

    google::protobuf::StringValue raw_extra_params;
    raw_extra_params.ParseFromString(aggregate_rel->advanced_extension().optimization().value());
    auto extra_params = AggregateOptimizationInfo::parse(raw_extra_params.value());
    if (aggregate_rel->measures().empty())
    {
        /// For an aggregate without aggregate functions we use other ways to determine which stage it is.
        /// In general, when has_required_child_distribution_expressions is true, this aggregate stage is the final
        /// merge aggregated step. But there are exceptions.
        /// 1. when the query is a distinct-like aggregation, for example
        ///    select n_regionkey, count(distinct n_nationkey) from nation group by n_regionkey
        ///    There should be four aggregation steps.
        ///    step1: partial aggregation on keys (n_regionkey, n_nationkey) with empty aggregation functions.
        ///    step2: aggregation on keys (n_regionkey, n_nationkey) with empty aggregation functions after shuffle.
        ///    step3: aggregation on keys (n_regionkey) with partial aggregation count(n_nationkey)
        ///    step4: aggregation on keys (n_regionkey) with final aggregation merge count(n_nationkey) after shuffle
        ///    If step3 is followed by another aggregation, we must let has_final_stage = false. Otherwise it will
        ///    cause a column position mismatch.
        /// 2. the two aggregation stages are merged into one by rule `MergeTwoPhasesHashBaseAggregate`. In this case
        ///    we also let has_first_stage = false;
        /// FIXME: Really don't like this implementation. It's too easy to be broken.
        const bool next_step_is_agg
            = !rel_stack->empty() && rel_stack->back()->rel_type_case() == substrait::Rel::RelTypeCase::kAggregate;
        const bool is_last_stage = extra_params.has_required_child_distribution_expressions && !next_step_is_agg;
        has_first_stage = !extra_params.has_pre_partial_aggregate;
        has_final_stage = extra_params.has_pre_partial_aggregate && is_last_stage;
        has_complete_stage = !extra_params.has_pre_partial_aggregate && is_last_stage;
    }

    if (phase_set.size() > 1 && has_final_stage)
    {
        throw DB::Exception(
            DB::ErrorCodes::LOGICAL_ERROR, "AggregateRelParser: multiple aggregation phases with final stage are not supported");
    }
    if (phase_set.size() > 1 && has_complete_stage)
    {
        throw DB::Exception(
            DB::ErrorCodes::LOGICAL_ERROR, "AggregateRelParser: multiple aggregation phases with complete mode are not supported");
    }

    const auto & input_header = *plan->getCurrentHeader();
    for (const auto & measure : aggregate_rel->measures())
    {
        // Note: arguments are not touched here; they are parsed later by the function parser. Reading
        // `arguments(0)` eagerly would be out of range for zero-argument measures.
        const auto function_reference = measure.measure().function_reference();
        const auto signature_function_name = parseSignatureFunctionName(function_reference);
        if (!signature_function_name)
            throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown aggregate function reference: {}", function_reference);

        AggregateInfo agg_info;
        agg_info.signature_function_name = *signature_function_name;
        auto function_parser = AggregateFunctionParserFactory::instance().get(agg_info.signature_function_name, parser_context);
        if (!function_parser)
            throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported aggregate function: {}", agg_info.signature_function_name);
        /// Put function_parser, parser_func_info and function_name into agg_info for reducing repeated builds.
        agg_info.function_parser = function_parser;
        agg_info.parser_func_info = AggregateFunctionParser::CommonFunctionInfo(measure);
        agg_info.function_name = function_parser->getCHFunctionName(agg_info.parser_func_info);
        agg_info.measure = &measure;
        aggregates.push_back(agg_info);
    }

    if (aggregate_rel->groupings_size() == 1)
    {
        for (const auto & expr : aggregate_rel->groupings(0).grouping_expressions())
        {
            const auto field_index = SubstraitParserUtils::getStructFieldIndex(expr);
            if (!field_index)
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported group expression: {}", expr.DebugString());
            // Block::getByPosition does not bounds-check, so validate the index explicitly.
            if (static_cast<size_t>(*field_index) >= input_header.columns())
                throw Exception(
                    ErrorCodes::BAD_ARGUMENTS,
                    "Grouping field index {} is out of range for input header: {}",
                    *field_index,
                    input_header.dumpStructure());
            grouping_keys.push_back(input_header.getByPosition(*field_index).name);
        }
    }
    else if (aggregate_rel->groupings_size() != 0)
    {
        throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported multiple groupings");
    }
}
/// Projections for function arguments.
/// The projections are built by the function parsers.
/// Build the projection that computes the arguments of every aggregate function.
/// Each function parser adds its argument nodes to the DAG; some of them may be turned into CH
/// function parameters instead. The resulting argument names/types are recorded in each AggregateInfo.
/// An ExpressionStep is appended only if the DAG differs from the plain pass-through of the input.
void AggregateRelParser::addPreProjection()
{
    ActionsDAG projection_action{plan->getCurrentHeader()->getColumnsWithTypeAndName()};
    const std::string untouched_dag_dump = projection_action.dumpDAG();

    for (auto & info : aggregates)
    {
        const auto & parser = info.function_parser;
        auto argument_nodes = parser->parseFunctionArguments(info.parser_func_info, projection_action);
        // Some argument nodes may be dropped here because they become CH function parameters.
        info.params = parser->parseFunctionParameters(info.parser_func_info, argument_nodes, projection_action);
        for (const auto & node : argument_nodes)
        {
            info.arg_column_names.emplace_back(node->result_name);
            info.arg_column_types.emplace_back(node->result_type);
            projection_action.addOrReplaceInOutputs(*node);
        }
    }

    // Nothing was added: skip the projection entirely.
    if (projection_action.dumpDAG() == untouched_dag_dump)
        return;

    /// Avoid unnecessary evaluation
    projection_action.removeUnusedActions();
    auto projection_step = std::make_unique<DB::ExpressionStep>(plan->getCurrentHeader(), std::move(projection_action));
    projection_step->setStepDescription("Projection before aggregate");
    steps.emplace_back(projection_step.get());
    plan->addStep(std::move(projection_step));
}
/// Build one AggregateDescription per entry of `aggregates` and append it to `descriptions`.
/// The chosen result column name is also stored in AggregateInfo::measure_column_name.
///
/// Result column naming:
///  - INTERMEDIATE_TO_RESULT / INTERMEDIATE_TO_INTERMEDIATE: the single argument's name is reused;
///  - other phases: `func(params)(args)`, made unique via getUniqueName() (see issue #6878).
///
/// Function resolution:
///  - names not ending in "State": the function parser may apply CH combinators (e.g. `PartialMerge`, `If`);
///  - "State" functions in INITIAL_TO_INTERMEDIATE / INITIAL_TO_RESULT: built from the argument types;
///  - "State" functions in INTERMEDIATE_TO_RESULT: built from the nested argument types of the single
///    AggregateFunction-typed argument.
/// @throws DB::Exception(LOGICAL_ERROR) on a result-name collision, an intermediate-phase measure without exactly
///         one argument, or an unsupported phase for a state function.
/// @throws DB::Exception(BAD_ARGUMENTS / ILLEGAL_TYPE_OF_ARGUMENT) for malformed INTERMEDIATE_TO_RESULT state functions.
void AggregateRelParser::buildAggregateDescriptions(AggregateDescriptions & descriptions)
{
    const auto & current_plan_header = *plan->getCurrentHeader();
    // The header is only read, so capture it by reference instead of copying the whole Block into the lambda.
    auto build_result_column_name
        = [this, &current_plan_header](
              const String & function_name, const Array & params, const Strings & arg_names, substrait::AggregationPhase phase)
    {
        if (phase == substrait::AggregationPhase::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT
            || phase == substrait::AggregationPhase::AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE)
        {
            // Checked explicitly rather than with assert() so release builds never index an empty vector.
            if (arg_names.size() != 1)
                throw DB::Exception(
                    DB::ErrorCodes::LOGICAL_ERROR,
                    "Aggregate function {} in an intermediate phase expects exactly one argument, got {}",
                    function_name,
                    arg_names.size());
            return arg_names[0];
        }
        String result = function_name;
        if (!params.empty())
        {
            result += "(";
            for (size_t i = 0; i < params.size(); ++i)
            {
                if (i != 0)
                    result += ",";
                result += toString(params[i]);
            }
            result += ")";
        }
        result += "(";
        result += boost::algorithm::join(arg_names, ",");
        result += ")";
        // Make the name unique to avoid name collision (issue #6878).
        auto res = this->getUniqueName(result);
        // Just a check to catch a regression of this issue.
        if (current_plan_header.findByName(res))
            throw DB::Exception(
                DB::ErrorCodes::LOGICAL_ERROR, "Name ({}) collision in header: {}", res, current_plan_header.dumpStructure());
        return res;
    };

    for (auto & agg_info : aggregates)
    {
        AggregateDescription description;
        const auto & measure = agg_info.measure->measure();
        description.column_name
            = build_result_column_name(agg_info.function_name, agg_info.params, agg_info.arg_column_names, measure.phase());
        agg_info.measure_column_name = description.column_name;
        description.argument_names = agg_info.arg_column_names;
        DB::AggregateFunctionProperties properties;
        if (!agg_info.function_name.ends_with("State"))
        {
            // May apply `PartialMerge` and `If` on the original function.
            auto [combinator_function_name, combinator_function_arg_types] = agg_info.function_parser->tryApplyCHCombinator(
                agg_info.parser_func_info, agg_info.function_name, agg_info.arg_column_types);
            description.function
                = getAggregateFunction(combinator_function_name, combinator_function_arg_types, properties, agg_info.params);
        }
        else
        {
            // If the function is a state function, we don't need to apply `PartialMerge`.
            // In INITIAL_TO_INTERMEDIATE or INITIAL_TO_RESULT phase, we do arguments -> xxState.
            // In INTERMEDIATE_TO_RESULT phase, we do xxState -> xxState.
            if (agg_info.parser_func_info.phase == substrait::AggregationPhase::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE
                || agg_info.parser_func_info.phase == substrait::AggregationPhase::AGGREGATION_PHASE_INITIAL_TO_RESULT)
            {
                description.function = getAggregateFunction(agg_info.function_name, agg_info.arg_column_types, properties, agg_info.params);
            }
            else if (agg_info.parser_func_info.phase == substrait::AggregationPhase::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT)
            {
                if (agg_info.arg_column_types.size() != 1)
                    throw Exception(
                        DB::ErrorCodes::BAD_ARGUMENTS,
                        "Only support one argument aggregate function in phase {}",
                        agg_info.parser_func_info.phase);
                const auto * type = checkAndGetDataType<DataTypeAggregateFunction>(agg_info.arg_column_types[0].get());
                if (!type)
                    throw Exception(
                        ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
                        "Argument for function {} must have type AggregateFunction as arguments",
                        agg_info.function_name);
                auto nested_types = type->getArgumentsDataTypes();
                description.function = getAggregateFunction(agg_info.function_name, nested_types, properties, agg_info.params);
            }
            else
            {
                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unsupported phase for state function: {}", agg_info.function_name);
            }
        }
        description.parameters = agg_info.params;
        // `description` is not used afterwards; move it to avoid copying its strings/vectors.
        descriptions.emplace_back(std::move(description));
    }
}
void AggregateRelParser::addMergingAggregatedStep()
{
AggregateDescriptions aggregate_descriptions;
buildAggregateDescriptions(aggregate_descriptions);
const auto & settings = getContext()->getSettingsRef();
auto config = StreamingAggregateConfig::loadFromContext(getContext());
if (config.enable_streaming_aggregating)
{
auto params = AggregatorParamsHelper::buildParams(
getContext(), grouping_keys, aggregate_descriptions, AggregatorParamsHelper::Mode::PARTIAL_TO_FINISHED);
auto merging_step = std::make_unique<GraceMergingAggregatedStep>(getContext(), plan->getCurrentHeader(), params, false);
steps.emplace_back(merging_step.get());
plan->addStep(std::move(merging_step));
}
else
{
auto params = AggregatorParamsHelper::buildParams(
getContext(),
grouping_keys,
aggregate_descriptions,
AggregatorParamsHelper::Mode::PARTIAL_TO_FINISHED,
AggregatorParamsHelper::Algorithm::CHTwoStageAggregate);
/// We don't use the grouping set feature in CH, so grouping_sets_params_list should always be empty.
DB::GroupingSetsParamsList grouping_sets_params_list;
auto merging_step = std::make_unique<DB::MergingAggregatedStep>(
plan->getCurrentHeader(),
params,
grouping_sets_params_list,
true,
false,
1,
false,
settings[Setting::max_block_size],
settings[Setting::aggregation_in_order_max_block_bytes],
settings[Setting::enable_memory_bound_merging_of_aggregation_results]);
steps.emplace_back(merging_step.get());
plan->addStep(std::move(merging_step));
}
}
void AggregateRelParser::addCompleteModeAggregatedStep()
{
AggregateDescriptions aggregate_descriptions;
buildAggregateDescriptions(aggregate_descriptions);
const auto & settings = getContext()->getSettingsRef();
auto config = StreamingAggregateConfig::loadFromContext(getContext());
if (config.enable_streaming_aggregating)
{
auto params = AggregatorParamsHelper::buildParams(
getContext(), grouping_keys, aggregate_descriptions, AggregatorParamsHelper::Mode::INIT_TO_COMPLETED);
auto merging_step = std::make_unique<GraceMergingAggregatedStep>(getContext(), plan->getCurrentHeader(), params, true);
steps.emplace_back(merging_step.get());
plan->addStep(std::move(merging_step));
}
else
{
auto params = AggregatorParamsHelper::buildParams(
getContext(),
grouping_keys,
aggregate_descriptions,
AggregatorParamsHelper::Mode::INIT_TO_COMPLETED,
AggregatorParamsHelper::Algorithm::CHTwoStageAggregate);
auto aggregating_step = std::make_unique<AggregatingStep>(
plan->getCurrentHeader(),
params,
GroupingSetsParamsList(),
true,
settings[Setting::max_block_size],
settings[Setting::aggregation_in_order_max_block_bytes],
1,
1,
false,
false,
SortDescription(),
SortDescription(),
false,
false,
false);
steps.emplace_back(aggregating_step.get());
plan->addStep(std::move(aggregating_step));
}
}
void AggregateRelParser::addAggregatingStep()
{
AggregateDescriptions aggregate_descriptions;
buildAggregateDescriptions(aggregate_descriptions);
const auto & settings = getContext()->getSettingsRef();
auto config = StreamingAggregateConfig::loadFromContext(getContext());
bool is_distinct_aggreate = false;
if (!rel_stack->empty())
{
const auto & next_rel = *(rel_stack->back());
if (next_rel.rel_type_case() == substrait::Rel::RelTypeCase::kAggregate)
is_distinct_aggreate = true;
}
if (config.enable_streaming_aggregating)
{
// Disable spilling to disk.
// If group_by_two_level_threshold_bytes != 0, `Aggregator` will use memory usage as a condition to convert
// the hash table into a two level one. The method of determining the amount of memory used by the hash table is
// unreliable. It will appear that a small hash table is converted into a two level structure, resulting in a
// lot of small blocks. So we disable this condition, reamain `group_by_two_level_threshold` as the condition to
// convert a single level hash table into a two level one.
auto params = AggregatorParamsHelper::buildParams(
getContext(), grouping_keys, aggregate_descriptions, AggregatorParamsHelper::Mode::INIT_TO_PARTIAL);
if (!is_distinct_aggreate)
{
auto aggregating_step = std::make_unique<StreamingAggregatingStep>(getContext(), plan->getCurrentHeader(), params);
steps.emplace_back(aggregating_step.get());
plan->addStep(std::move(aggregating_step));
}
else
{
/// If it's an aggregation query involves distinct, for example,
/// select n_regionkey, count(distinct(n_name)), sum(n_nationkey) from nation group by n_regionkey
/// The running steps are as follow in general.
/// step1: partial aggregation on keys (n_regionkey, n_name) with empty aggregation functions.
/// step2: exchagne shuffle
/// step3: aggregation on keys (n_regionkey, n_name) with partial aggregation sum(n_nationkey)
/// step4: aggregation on keys (n_regionkey) with partial aggregation merge sum(n_nationkey), and partial aggregation count(n_name)
/// step5: exchange shuffle
/// step6: aggregation on keys (n_regionkey) with final aggregation merge sum(n_nationkey) and count(n_name)
/// We cannot use streaming aggregating strategy in step3. Otherwise it will generate multiple blocks with same n_name in them. This
/// will make the result for count(distinct(n_name)) wrong. step3 must finish all inputs before it puts any block into step4.
/// So we introduce GraceAggregatingStep here, it can handle mass data with high cardinality.
auto aggregating_step = std::make_unique<GraceAggregatingStep>(getContext(), plan->getCurrentHeader(), params, has_first_stage);
steps.emplace_back(aggregating_step.get());
plan->addStep(std::move(aggregating_step));
}
}
else
{
auto params = AggregatorParamsHelper::buildParams(
getContext(),
grouping_keys,
aggregate_descriptions,
AggregatorParamsHelper::Mode::INIT_TO_PARTIAL,
AggregatorParamsHelper::Algorithm::CHTwoStageAggregate);
auto aggregating_step = std::make_unique<AggregatingStep>(
plan->getCurrentHeader(),
params,
GroupingSetsParamsList(),
false,
settings[Setting::max_block_size],
settings[Setting::aggregation_in_order_max_block_bytes],
1,
1,
false,
false,
SortDescription(),
SortDescription(),
false,
false,
false);
steps.emplace_back(aggregating_step.get());
plan->addStep(std::move(aggregating_step));
}
}
/// Convert aggregate result columns to the types requested by their measures, if needed.
/// Called after the merging step (final stage) and after the complete-mode aggregating step (complete stage);
/// for any other stage it adds nothing.
///  - final stage:    matching DAG inputs are passed to convertNodeTypeIfNeeded with with-nullability = false;
///  - complete stage: matching DAG outputs are passed with with-nullability = true.
/// The decision whether a conversion is needed is left to each aggregate's function parser.
/// An ExpressionStep is appended only if the DAG actually changed.
void AggregateRelParser::addPostProjection()
{
    const auto & input_header = *plan->getCurrentHeader();
    ActionsDAG project_actions_dag{input_header.getColumnsWithTypeAndName()};
    const auto dag_footprint = project_actions_dag.dumpDAG();

    /// Run convertNodeTypeIfNeeded on every node in `candidates` named like an aggregate result column.
    /// `candidates` must be a snapshot, not the DAG's live input/output list: the callee receives and may
    /// mutate `project_actions_dag`, which could otherwise invalidate the iteration.
    auto convert_matching_nodes = [&](const auto & candidates, bool with_nullability)
    {
        for (const auto & agg_info : aggregates)
            for (const auto * node : candidates)
                if (node->result_name == agg_info.measure_column_name)
                    agg_info.function_parser->convertNodeTypeIfNeeded(
                        agg_info.parser_func_info, node, project_actions_dag, with_nullability);
    };

    if (has_final_stage)
    {
        const auto inputs_snapshot = project_actions_dag.getInputs();
        convert_matching_nodes(inputs_snapshot, false);
    }
    else if (has_complete_stage)
    {
        // In complete mode, nullability must be considered when converting the node type.
        const auto outputs_snapshot = project_actions_dag.getOutputs();
        convert_matching_nodes(outputs_snapshot, true);
    }

    if (project_actions_dag.dumpDAG() != dag_footprint)
    {
        QueryPlanStepPtr convert_step = std::make_unique<ExpressionStep>(plan->getCurrentHeader(), std::move(project_actions_dag));
        convert_step->setStepDescription("Post-projection for aggregate");
        steps.emplace_back(convert_step.get());
        plan->addStep(std::move(convert_step));
    }
}
/// Register AggregateRelParser as the parser for substrait relations of type `kAggregate`.
void registerAggregateParser(RelParserFactory & factory)
{
    factory.registerBuilder(
        substrait::Rel::RelTypeCase::kAggregate,
        [](ParserContextPtr parser_context) { return std::make_shared<AggregateRelParser>(parser_context); });
}
}