blob: 5aee65474108125ec4080dc2d091c360c89a0cea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "GroupLimitRelParser.h"
#include <algorithm>
#include <memory>
#include <unordered_set>
#include <utility>
#include <Columns/IColumn.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/Serializations/ISerialization.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/WindowDescription.h>
#include <Operator/BranchStep.h>
#include <Operator/GraceMergingAggregatedStep.h>
#include <Operator/WindowGroupLimitStep.h>
#include <Parser/AdvancedParametersParseUtil.h>
#include <Parser/RelParsers/SortParsingUtils.h>
#include <Parser/RelParsers/SortRelParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <google/protobuf/repeated_field.h>
#include <google/protobuf/wrappers.pb.h>
#include <Common/AggregateUtil.h>
#include <Common/ArrayJoinHelper.h>
#include <Common/GlutenConfig.h>
#include <Common/PlanUtil.h>
#include <Common/QueryContext.h>
#include <Common/logger_useful.h>
namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace DB
{
namespace Setting
{
extern const SettingsMaxThreads max_threads;
}
}
namespace local_engine
{
using namespace DB;
/// Constructs the dispatching group-limit parser; the parser context is simply handed to the RelParser base.
GroupLimitRelParser::GroupLimitRelParser(ParserContextPtr parser_context_)
    : RelParser(parser_context_)
{
}
/// Parses a WindowGroupLimit relation by delegating to one of two concrete parsers.
///
/// The optimization info stored in the relation's advanced extension selects the strategy:
/// when `is_aggregate_group_limit` is set, AggregateGroupLimitRelParser is used, otherwise
/// WindowGroupLimitRelParser. The steps reported by the chosen parser are assigned to `steps`.
///
/// @param current_plan_ plan built so far; it is moved into the delegate parser.
/// @param rel           relation whose `windowgrouplimit` payload is read.
/// @param rel_stack_    stack of relations, forwarded unchanged to the delegate parser.
/// @return the plan produced by the delegate parser.
DB::QueryPlanPtr
GroupLimitRelParser::parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
{
    // Bind by reference: copying the whole protobuf message is not needed just to read it.
    const auto & win_rel_def = rel.windowgrouplimit();
    google::protobuf::StringValue optimize_info_str;
    // NOTE(review): the result of ParseFromString is not checked, matching the existing behavior.
    optimize_info_str.ParseFromString(win_rel_def.advanced_extension().optimization().value());
    const auto optimization_info = WindowGroupOptimizationInfo::parse(optimize_info_str.value());

    if (optimization_info.is_aggregate_group_limit)
    {
        AggregateGroupLimitRelParser aggregate_group_limit_parser(parser_context);
        auto plan = aggregate_group_limit_parser.parse(std::move(current_plan_), rel, rel_stack_);
        steps = aggregate_group_limit_parser.getSteps();
        // Returning the local by name lets the compiler elide or move; an explicit std::move would only get in the way.
        return plan;
    }

    WindowGroupLimitRelParser window_parser(parser_context);
    auto plan = window_parser.parse(std::move(current_plan_), rel, rel_stack_);
    steps = window_parser.getSteps();
    return plan;
}
/// Collects the struct-field indices referenced by the partition expressions.
///
/// Expressions that resolve to a struct field contribute their index, in input order.
/// Literal expressions are skipped. Anything else is rejected.
///
/// @param expressions partition expressions of the relation.
/// @return the referenced field indices.
/// @throws DB::Exception with BAD_ARGUMENTS for an expression that is neither a field reference nor a literal.
static std::vector<size_t> parsePartitionFields(const google::protobuf::RepeatedPtrField<substrait::Expression> & expressions)
{
    std::vector<size_t> fields;
    fields.reserve(static_cast<size_t>(expressions.size()));
    for (const auto & expr : expressions)
    {
        if (const auto field_index = SubstraitParserUtils::getStructFieldIndex(expr))
            fields.push_back(*field_index);
        else if (!expr.has_literal())
            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown expression: {}", expr.DebugString());
    }
    return fields;
}
/// Collects the struct-field indices referenced by the sort fields.
///
/// Sort fields whose expression is a literal are skipped; sort fields whose expression resolves
/// to a struct field contribute that index, in input order. Anything else is rejected.
///
/// @param sort_fields sort specification of the relation.
/// @return the referenced field indices.
/// @throws DB::Exception with BAD_ARGUMENTS for an expression that is neither a literal nor a field reference.
std::vector<size_t> parseSortFields(const google::protobuf::RepeatedPtrField<substrait::SortField> & sort_fields)
{
    std::vector<size_t> fields;
    fields.reserve(static_cast<size_t>(sort_fields.size()));
    // Iterate by const reference: iterating by value would copy every SortField message.
    for (const auto & sort_field : sort_fields)
    {
        const auto & expr = sort_field.expr();
        if (expr.has_literal())
            continue;
        if (const auto field_index = SubstraitParserUtils::getStructFieldIndex(expr))
            fields.push_back(*field_index);
        else
            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown expression: {}", expr.DebugString());
    }
    return fields;
}
/// Constructs the window-based group-limit parser; the parser context is handed to the RelParser base.
WindowGroupLimitRelParser::WindowGroupLimitRelParser(ParserContextPtr parser_context_)
    : RelParser(parser_context_)
{
}
/// Appends a single WindowGroupLimitStep to the plan.
///
/// The window function name comes from the optimization info in the relation's advanced
/// extension; partition and sort field indices and the limit come from the relation itself.
/// The created step is also recorded in `steps`.
///
/// @param current_plan_ plan built so far; it is taken over by this parser.
/// @param rel           relation whose `windowgrouplimit` payload is read.
/// @param rel_stack_    not used by this function.
/// @return the plan with the window group limit step appended.
DB::QueryPlanPtr
WindowGroupLimitRelParser::parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
{
    // Bind by reference: copying the whole protobuf message is not needed just to read it.
    const auto & win_rel_def = rel.windowgrouplimit();
    google::protobuf::StringValue optimize_info_str;
    optimize_info_str.ParseFromString(win_rel_def.advanced_extension().optimization().value());
    const auto optimization_info = WindowGroupOptimizationInfo::parse(optimize_info_str.value());
    window_function_name = optimization_info.window_function;
    current_plan = std::move(current_plan_);

    const auto partition_fields = parsePartitionFields(win_rel_def.partition_expressions());
    const auto sort_fields = parseSortFields(win_rel_def.sorts());
    // NOTE(review): the limit is cast without validation here; a negative value would wrap around.
    const size_t limit = static_cast<size_t>(win_rel_def.limit());

    auto window_group_limit_step = std::make_unique<WindowGroupLimitStep>(
        current_plan->getCurrentHeader(), window_function_name, partition_fields, sort_fields, limit);
    window_group_limit_step->setStepDescription("Window group limit");
    steps.emplace_back(window_group_limit_step.get());
    current_plan->addStep(std::move(window_group_limit_step));

    // `current_plan` is not a local variable of this function, so the explicit move is required here.
    return std::move(current_plan);
}
/// Constructs the aggregate-based group-limit parser; the parser context is handed to the RelParser base.
AggregateGroupLimitRelParser::AggregateGroupLimitRelParser(ParserContextPtr parser_context_)
    : RelParser(parser_context_)
{
}
/// Decides which branch to take from the approximate cardinality of the partition keys.
///
/// For every chunk, a 32-bit weak hash is computed per row over the partition key columns, and
/// all hash values are gathered into a set. The size of that set is used as the approximate
/// number of distinct keys (hash collisions make it an underestimate).
///
/// @param partition_keys      positions of the partition key columns inside each chunk.
/// @param high_card_threshold upper bound of `distinct / (total_rows + 1)` for branch 0.
/// @param chunks              sampled chunks to inspect.
/// @return 0 when the ratio is less than or equal to the threshold, 1 otherwise.
size_t selectBranchOnPartitionKeysCardinality(
    const std::vector<size_t> & partition_keys, double high_card_threshold, const std::list<DB::Chunk> & chunks)
{
    size_t total_rows = 0;
    std::unordered_set<UInt32> ids;
    for (const auto & chunk : chunks)
    {
        const size_t rows = chunk.getNumRows();
        total_rows += rows;

        DB::WeakHash32 hash(rows);
        const auto & cols = chunk.getColumns();
        for (const auto key : partition_keys)
            hash.update(cols[key]->getWeakHash32());

        const auto & data = hash.getData();
        for (size_t n = 0; n < rows; ++n)
            ids.insert(data[n]);
    }
    LOG_DEBUG(
        getLogger("AggregateGroupLimitRelParser"),
        "Approximate distinct keys {}, total rows: {}, threshold: {}",
        ids.size(),
        total_rows,
        high_card_threshold);

    // The +1 keeps the denominator non-zero when no rows were sampled.
    const double distinct_ratio = ids.size() * 1.0 / (total_rows + 1);
    return distinct_ratio <= high_card_threshold ? 0 : 1;
}
/// Builds a "top-k per partition" plan with two alternative branches behind a branch step.
///
/// A StaticBranchStep picks a branch through selectBranchOnPartitionKeysCardinality(). Two
/// sub-plans are built: an aggregation based one (added first to the branch list) and a
/// sort + window + filter based one (added second). The window sub-plan gets a converting
/// projection so that its output matches the aggregation sub-plan's header by position.
/// A UniteBranchesStep is then added on top of both sub-plans.
///
/// @param current_plan_ plan built so far; it is taken over by this parser.
/// @param rel           relation whose `windowgrouplimit` payload is read.
/// @param rel_stack_    not used by this function.
/// @return the plan with the branch and unite steps appended.
/// @throws DB::Exception with BAD_ARGUMENTS when the window function is unsupported or the limit is below 1.
DB::QueryPlanPtr AggregateGroupLimitRelParser::parse(
    DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
{
    // Calculate the window's top-k by aggregation:
    // 1. Add a pre-projection that packs the required columns into a tuple argument for the aggregate function.
    // 2. Collect the sorting direction of each sorting field and pass them as the aggregate function's parameters.
    // 3. Add an aggregation step.
    // 4. Add a post-projection that explodes the aggregate function's result, since the result is an array.
    current_plan = std::move(current_plan_);
    input_header = current_plan->getCurrentHeader();
    win_rel_def = &rel.windowgrouplimit();
    google::protobuf::StringValue optimize_info_str;
    optimize_info_str.ParseFromString(win_rel_def->advanced_extension().optimization().value());
    auto optimization_info = WindowGroupOptimizationInfo::parse(optimize_info_str.value());

    // Validate the signed protobuf value: once it has been cast to an unsigned type,
    // a negative limit would wrap around to a huge value and slip through a `< 1` check.
    const auto requested_limit = win_rel_def->limit();
    limit = static_cast<size_t>(requested_limit);
    aggregate_function_name = getAggregateFunctionName(optimization_info.window_function);
    if (requested_limit < 1)
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid limit: {}", requested_limit);

    auto win_config = WindowConfig::loadFromContext(getContext());
    auto high_card_threshold = win_config.aggregate_topk_high_cardinality_threshold;

    // Aggregation doesn't perform well on high cardinality keys, so there are two execution paths:
    // - if the partition keys are low cardinality, run it by aggregation;
    // - if the partition keys are high cardinality, run it by window.
    auto partition_fields = parsePartitionFields(win_rel_def->partition_expressions());
    auto branch_in_header = current_plan->getCurrentHeader();
    auto branch_step = std::make_unique<StaticBranchStep>(
        getContext(),
        branch_in_header,
        2,
        win_config.aggregate_topk_sample_rows,
        [partition_fields, high_card_threshold](const std::list<DB::Chunk> & chunks) -> size_t
        { return selectBranchOnPartitionKeysCardinality(partition_fields, high_card_threshold, chunks); });
    branch_step->setStepDescription("Window TopK");
    steps.push_back(branch_step.get());
    current_plan->addStep(std::move(branch_step));

    // If all partition keys are low cardinality keys, use aggregation to get the top-k of each partition.
    auto aggregation_plan = BranchStepHelper::createSubPlan(branch_in_header, 1);
    collectPartitionAndSortFields();
    prePrejectionForAggregateArguments(*aggregation_plan);
    addGroupLmitAggregationStep(*aggregation_plan);
    postProjectionForExplodingArrays(*aggregation_plan);
    LOG_DEBUG(getLogger("AggregateGroupLimitRelParser"), "Aggregate topk plan:\n{}", PlanUtil::explainPlan(*aggregation_plan));

    // Otherwise fall back to sort + window + filter.
    auto window_plan = BranchStepHelper::createSubPlan(branch_in_header, 1);
    addSortStep(*window_plan);
    addWindowLimitStep(*window_plan);
    // Both branches must expose the same header; convert the window branch to the aggregation branch's header by position.
    auto convert_actions_dag = DB::ActionsDAG::makeConvertingActions(
        window_plan->getCurrentHeader()->getColumnsWithTypeAndName(),
        aggregation_plan->getCurrentHeader()->getColumnsWithTypeAndName(),
        DB::ActionsDAG::MatchColumnsMode::Position);
    auto convert_step = std::make_unique<DB::ExpressionStep>(window_plan->getCurrentHeader(), std::move(convert_actions_dag));
    convert_step->setStepDescription("Rename rank column name");
    window_plan->addStep(std::move(convert_step));
    LOG_DEBUG(getLogger("AggregateGroupLimitRelParser"), "Window topk plan:\n{}", PlanUtil::explainPlan(*window_plan));

    std::vector<DB::QueryPlanPtr> branch_plans;
    branch_plans.emplace_back(std::move(aggregation_plan));
    branch_plans.emplace_back(std::move(window_plan));
    auto unite_branches_step = std::make_unique<UniteBranchesStep>(getContext(), branch_in_header, std::move(branch_plans), 1);
    unite_branches_step->setStepDescription("Unite TopK branches");
    steps.push_back(unite_branches_step.get());
    current_plan->addStep(std::move(unite_branches_step));

    // `current_plan` is not a local variable of this function, so the explicit move is required here.
    return std::move(current_plan);
}
/// Maps a window function name to the aggregate function that emulates it.
/// Only "row_number" is handled; every other name raises BAD_ARGUMENTS.
String AggregateGroupLimitRelParser::getAggregateFunctionName(const String & window_function_name)
{
    if (window_function_name != "row_number")
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported window function: {}", window_function_name);

    return "rowNumGroupArraySorted";
}
void AggregateGroupLimitRelParser::collectPartitionAndSortFields()
{
partition_fields = parsePartitionFields(win_rel_def->partition_expressions());
auto full_sort_fields = parseSortFields(win_rel_def->sorts());
std::set<size_t> partition_fields_set(partition_fields.begin(), partition_fields.end());
std::set<size_t> full_sort_fields_set(full_sort_fields.begin(), full_sort_fields.end());
std::set<size_t> selected_sort_fields_set;
// Remove partition keys from sort keys
std::set_difference(
full_sort_fields_set.begin(),
full_sort_fields_set.end(),
partition_fields_set.begin(),
partition_fields_set.end(),
std::inserter(selected_sort_fields_set, selected_sort_fields_set.begin()));
if (selected_sort_fields_set.empty())
{
// FIXME: support empty sort keys.
sort_fields.push_back(*partition_fields_set.begin());
}
else
{
sort_fields = std::vector<size_t>(selected_sort_fields_set.begin(), selected_sort_fields_set.end());
}
}
// Pack the aggregate function's arguments into a single named tuple column.
//
// Partition key columns are kept as they are and recorded as grouping keys. Every column that is
// not a partition key, plus every sort column, becomes an element of the tuple. The tuple is cast
// to a named tuple type so that the element names follow the input column names.
void AggregateGroupLimitRelParser::prePrejectionForAggregateArguments(DB::QueryPlan & plan)
{
    DB::ActionsDAG projection_dag(input_header->getColumnsWithTypeAndName());
    const std::set<size_t> partition_positions(partition_fields.begin(), partition_fields.end());
    const std::set<size_t> sort_positions(sort_fields.begin(), sort_fields.end());
    DB::NameSet kept_column_names;

    // Split the input columns into grouping keys and tuple elements.
    DB::DataTypes element_types;
    Strings element_names;
    DB::ActionsDAG::NodeRawConstPtrs element_nodes;
    const size_t num_input_columns = input_header->columns();
    for (size_t pos = 0; pos < num_input_columns; ++pos)
    {
        const auto & column = input_header->getByPosition(pos);
        const bool is_partition_key = partition_positions.count(pos) != 0;
        if (is_partition_key)
        {
            kept_column_names.insert(column.name);
            aggregate_grouping_keys.push_back(column.name);
        }

        // A partition key only goes into the tuple when it is also used as a sort key.
        if (is_partition_key && sort_positions.count(pos) == 0)
            continue;

        element_types.push_back(column.type);
        element_names.push_back(column.name);
        element_nodes.push_back(projection_dag.getInputs()[pos]);
    }

    // tuple(...) followed by a CAST to the named tuple type; both nodes share the same result name.
    aggregate_tuple_column_name = expression_parser->getUniqueName("aggregate_data_tuple");
    const auto * plain_tuple_node = expression_parser->toFunctionNode(projection_dag, "tuple", element_nodes, aggregate_tuple_column_name);
    const auto named_tuple_type = std::make_shared<DB::DataTypeTuple>(element_types, element_names);
    DB::ActionsDAG::NodeRawConstPtrs cast_arguments;
    cast_arguments.push_back(plain_tuple_node);
    cast_arguments.push_back(
        expression_parser->addConstColumn(projection_dag, std::make_shared<DataTypeString>(), named_tuple_type->getName()));
    const auto * named_tuple_node = expression_parser->toFunctionNode(projection_dag, "CAST", cast_arguments, aggregate_tuple_column_name);
    projection_dag.addOrReplaceInOutputs(*named_tuple_node);
    kept_column_names.insert(named_tuple_node->result_name);

    projection_dag.removeUnusedActions(kept_column_names);
    LOG_DEBUG(
        getLogger("AggregateGroupLimitRelParser"),
        "Projection for building tuples for aggregate function:\n{}",
        projection_dag.dumpDAG());

    auto projection_step = std::make_unique<DB::ExpressionStep>(input_header, std::move(projection_dag));
    projection_step->setStepDescription("Pre-projection for aggregate group limit arguments");
    plan.addStep(std::move(projection_step));
}
/// Describes the aggregate function call that computes the per-group top-k.
///
/// The function takes the tuple column as its only argument. Its parameters are the limit and a
/// textual description of the sort columns, restricted to the columns selected in `sort_fields`
/// and kept in the order of the relation's sort specification.
DB::AggregateDescription AggregateGroupLimitRelParser::buildAggregateDescription(DB::QueryPlan & plan)
{
    DB::AggregateDescription description;
    description.column_name = aggregate_tuple_column_name;
    description.argument_names = {aggregate_tuple_column_name};
    description.parameters.push_back(static_cast<UInt32>(limit));

    // Names of the sort columns that were selected for the aggregate function.
    std::set<String> selected_sort_names;
    for (const auto position : sort_fields)
        selected_sort_names.insert(input_header->getByPosition(position).name);

    // Keep only the selected columns from the full sort specification.
    const auto all_sort_columns = parseSortFields(*input_header, win_rel_def->sorts());
    DB::SortDescription selected_sort_columns;
    for (const auto & sort_column : all_sort_columns)
    {
        if (selected_sort_names.count(sort_column.column_name) != 0)
            selected_sort_columns.push_back(sort_column);
    }
    auto sort_directions = buildSQLLikeSortDescription(selected_sort_columns);
    description.parameters.push_back(sort_directions);

    DB::DataTypes argument_types;
    argument_types.push_back(plan.getCurrentHeader()->getByName(aggregate_tuple_column_name).type);
    DB::AggregateFunctionProperties properties;
    description.function = getAggregateFunction(aggregate_function_name, argument_types, properties, description.parameters);
    return description;
}
/// Appends the aggregation step that computes the per-group top-k.
///
/// The step groups by `aggregate_grouping_keys` and evaluates the single aggregate built by
/// buildAggregateDescription(), using the INIT_TO_COMPLETED aggregator mode.
void AggregateGroupLimitRelParser::addGroupLmitAggregationStep(DB::QueryPlan & plan)
{
    DB::AggregateDescriptions agg_descs = {buildAggregateDescription(plan)};
    auto params = AggregatorParamsHelper::buildParams(
        getContext(), aggregate_grouping_keys, agg_descs, AggregatorParamsHelper::Mode::INIT_TO_COMPLETED);
    auto agg_step = std::make_unique<GraceMergingAggregatedStep>(getContext(), plan.getCurrentHeader(), params, true);
    plan.addStep(std::move(agg_step));
    LOG_DEBUG(getLogger("AggregateGroupLimitRelParser"), "Plan after add group limit:\n{}", PlanUtil::explainPlan(plan));
}
/// Turns the aggregation result back into ordinary rows and columns.
///
/// Three steps are appended to the plan:
/// 1. an array join on the last column (the aggregate result), giving one tuple per row;
/// 2. a projection that extracts every tuple element into its own column, named after the element;
/// 3. a projection that reorders the columns by name into the input header's layout, followed by
///    the last column of the untupled header (the window function result).
///
/// @throws DB::Exception with BAD_ARGUMENTS when the last column after the array join is not a tuple.
void AggregateGroupLimitRelParser::postProjectionForExplodingArrays(DB::QueryPlan & plan)
{
    const auto & header = *plan.getCurrentHeader();
    /// Flatten the array column; the aggregate result is taken from the last position.
    auto agg_result_index = header.columns() - 1;
    auto array_join_actions_dag = ArrayJoinHelper::applyArrayJoinOnOneColumn(header, agg_result_index);
    // The steps returned by the helper are not used any further in this function.
    [[maybe_unused]] auto new_steps = ArrayJoinHelper::addArrayJoinStep(getContext(), plan, array_join_actions_dag, false);

    const auto & array_join_output_header = *plan.getCurrentHeader();
    DB::ActionsDAG flatten_actions_dag(array_join_output_header.getColumnsWithTypeAndName());
    // Every column except the last one is passed through unchanged.
    DB::Names flatten_output_column_names;
    for (size_t i = 0; i < array_join_output_header.columns() - 1; ++i)
    {
        const auto & col = array_join_output_header.getByPosition(i);
        flatten_output_column_names.push_back(col.name);
    }

    auto last_column = array_join_output_header.getByPosition(array_join_output_header.columns() - 1);
    const auto * tuple_datatype = typeid_cast<const DB::DataTypeTuple *>(last_column.type.get());
    // Fail with a clear message instead of dereferencing a null pointer when the cast does not match.
    if (!tuple_datatype)
        throw DB::Exception(
            DB::ErrorCodes::BAD_ARGUMENTS,
            "Expected a tuple column as the aggregation result, but got type {}",
            last_column.type->getName());

    const auto & field_names = tuple_datatype->getElementNames();
    DB::DataTypePtr tuple_index_type = std::make_shared<DB::DataTypeUInt32>();
    const auto * tuple_node = flatten_actions_dag.getInputs().back();
    for (size_t i = 0; i < field_names.size(); ++i)
    {
        DB::ActionsDAG::NodeRawConstPtrs tuple_index_args;
        tuple_index_args.push_back(tuple_node);
        // tupleElement is called with a 1-based element index.
        tuple_index_args.push_back(expression_parser->addConstColumn(flatten_actions_dag, tuple_index_type, i + 1));
        const auto * field_node = expression_parser->toFunctionNode(flatten_actions_dag, "tupleElement", tuple_index_args, field_names[i]);
        flatten_actions_dag.addOrReplaceInOutputs(*field_node);
        flatten_output_column_names.push_back(field_node->result_name);
    }
    flatten_actions_dag.removeUnusedActions(flatten_output_column_names);
    LOG_DEBUG(getLogger("AggregateGroupLimitRelParser"), "Actions dag for untupling aggregate result:\n{}", flatten_actions_dag.dumpDAG());
    auto flatten_expression_step = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(), std::move(flatten_actions_dag));
    flatten_expression_step->setStepDescription("Untuple the aggregation result");
    plan.addStep(std::move(flatten_expression_step));

    const auto & flatten_tuple_output_header = plan.getCurrentHeader();
    auto window_result_column = flatten_tuple_output_header->getByPosition(flatten_tuple_output_header->columns() - 1);
    /// The result column is put at the end of the header.
    auto output_header{*input_header};
    output_header.insert(window_result_column);
    auto adjust_pos_actions_dag = DB::ActionsDAG::makeConvertingActions(
        flatten_tuple_output_header->getColumnsWithTypeAndName(),
        output_header.getColumnsWithTypeAndName(),
        DB::ActionsDAG::MatchColumnsMode::Name);
    LOG_DEBUG(getLogger("AggregateGroupLimitRelParser"), "Actions dag for replacing columns:\n{}", adjust_pos_actions_dag.dumpDAG());
    auto adjust_pos_expression_step = std::make_unique<DB::ExpressionStep>(flatten_tuple_output_header, std::move(adjust_pos_actions_dag));
    adjust_pos_expression_step->setStepDescription("Adjust position of the output columns");
    plan.addStep(std::move(adjust_pos_expression_step));
}
/// Appends a full sorting step ordered by the partition expressions first and the sort fields second.
/// External sorting is considered worthwhile once the thread group's memory usage ratio exceeds
/// the configured `spill_mem_ratio`.
void AggregateGroupLimitRelParser::addSortStep(DB::QueryPlan & plan)
{
    const auto & current_header = *plan.getCurrentHeader();

    // Partition keys lead the sort description, the ordering keys follow.
    auto combined_sort_description = parseSortFields(current_header, win_rel_def->partition_expressions());
    const auto order_by_description = parseSortFields(current_header, win_rel_def->sorts());
    combined_sort_description.insert(combined_sort_description.end(), order_by_description.begin(), order_by_description.end());

    DB::SortingStep::Settings sorting_settings(getContext()->getSettingsRef());
    const double spill_mem_ratio = MemoryConfig::loadFromContext(getContext()).spill_mem_ratio;
    sorting_settings.worth_external_sort = [spill_mem_ratio]() -> bool { return currentThreadGroupMemoryUsageRatio() > spill_mem_ratio; };

    auto sort_step = std::make_unique<DB::SortingStep>(plan.getCurrentHeader(), combined_sort_description, 0, sorting_settings);
    sort_step->setStepDescription("Sorting step");
    plan.addStep(std::move(sort_step));
}
/// Builds the window frame used for the given window function.
///
/// Only "row_number" is supported. Its frame starts from a default-constructed DB::WindowFrame
/// (presumably [unbounded preceding, current row] — confirm against DB::WindowFrame) and is then
/// switched to a ROWS frame whose begin boundary is an offset of 1.
///
/// @throws DB::Exception with BAD_ARGUMENTS for any other function name.
static DB::WindowFrame buildWindowFrame(const std::string & ch_function_name)
{
    if (ch_function_name != "row_number")
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown window function: {}", ch_function_name);

    DB::WindowFrame frame;
    frame.type = DB::WindowFrame::FrameType::ROWS;
    frame.begin_type = DB::WindowFrame::BoundaryType::Offset;
    frame.begin_offset = 1;
    return frame;
}
/// Builds the window function description for the given window function.
///
/// Only "row_number" is supported: the output column is named after the function, no function
/// node is attached, and the aggregate function is resolved without arguments or parameters.
///
/// @throws DB::Exception with BAD_ARGUMENTS for any other function name.
static DB::WindowFunctionDescription buildWindowFunctionDescription(const std::string & ch_function_name)
{
    if (ch_function_name != "row_number")
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown window function: {}", ch_function_name);

    DB::WindowFunctionDescription description;
    description.column_name = ch_function_name;
    description.function_node = nullptr;
    DB::AggregateFunctionProperties agg_props;
    auto agg_func = RelParser::getAggregateFunction(ch_function_name, {}, agg_props, {});
    description.aggregate_function = agg_func;
    return description;
}
// TODO: WindowGroupLimitStep has bad performance, need to improve it. So we still use window + filter here.
//
// Appends a window step that evaluates the window function over (partition by, order by), then a
// filter step that keeps the rows whose window result (the last column) is <= limit. The filter
// column itself is removed from the output.
void AggregateGroupLimitRelParser::addWindowLimitStep(DB::QueryPlan & plan)
{
    google::protobuf::StringValue serialized_info;
    serialized_info.ParseFromString(win_rel_def->advanced_extension().optimization().value());
    const auto function_name = WindowGroupOptimizationInfo::parse(serialized_info.value()).window_function;

    // Describe the window: frame, partitioning, ordering and the combined sort description.
    const auto & window_input_header = plan.getCurrentHeader();
    DB::WindowDescription window_description;
    window_description.frame = buildWindowFrame(function_name);
    window_description.partition_by = parseSortFields(*window_input_header, win_rel_def->partition_expressions());
    window_description.order_by = parseSortFields(*window_input_header, win_rel_def->sorts());
    auto & full_sort = window_description.full_sort_description;
    full_sort = window_description.partition_by;
    full_sort.insert(full_sort.end(), window_description.order_by.begin(), window_description.order_by.end());

    DB::WriteBufferFromOwnString name_buffer;
    name_buffer << "partition by " << DB::dumpSortDescription(window_description.partition_by);
    name_buffer << " order by " << DB::dumpSortDescription(window_description.order_by);
    name_buffer << " " << window_description.frame.toString();
    window_description.window_name = name_buffer.str();

    window_description.window_functions.push_back(buildWindowFunctionDescription(function_name));

    auto window_step = std::make_unique<WindowStep>(window_input_header, window_description, window_description.window_functions, false);
    window_step->setStepDescription("Window (" + window_description.window_name + ")");
    plan.addStep(std::move(window_step));

    // Filter on `window result <= limit`; the window result is the last input of the DAG.
    const auto & ranked_header = plan.getCurrentHeader();
    DB::ActionsDAG filter_dag(ranked_header->getColumnsWithTypeAndName());
    const auto * rank_node = filter_dag.getInputs().back();
    const auto * limit_node = expression_parser->addConstColumn(filter_dag, std::make_shared<DB::DataTypeInt32>(), limit);
    const auto * keep_row_node = expression_parser->toFunctionNode(filter_dag, "lessOrEquals", {rank_node, limit_node});
    const auto filter_column_name = keep_row_node->result_name;
    filter_dag.addOrReplaceInOutputs(*keep_row_node);

    auto limit_filter_step = std::make_unique<DB::FilterStep>(ranked_header, std::move(filter_dag), filter_column_name, true);
    plan.addStep(std::move(limit_filter_step));
}
/// Registers GroupLimitRelParser as the builder for kWindowGroupLimit relations.
void registerWindowGroupLimitRelParser(RelParserFactory & factory)
{
    factory.registerBuilder(
        substrait::Rel::RelTypeCase::kWindowGroupLimit,
        [](ParserContextPtr parser_context) { return std::make_shared<GroupLimitRelParser>(parser_context); });
}
}