/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "AdvancedExpandStep.h"
#include <Columns/ColumnAggregateFunction.h>
#include <Columns/ColumnNullable.h>
#include <Columns/IColumn.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/castColumn.h>
#include <Operator/StreamingAggregatingStep.h>
#include <Processors/ResizeProcessor.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Poco/Logger.h>
#include <Common/AggregateUtil.h>
#include <Common/BlockTypeUtils.h>
#include <Common/CHUtil.h>
#include <Common/logger_useful.h>
#include <magic_enum.hpp>

namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
}

namespace local_engine
{
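/// Expand replicates every input row once per grouping set, so the step preserves neither the
/// number of rows nor sorting, and its pre-aggregation path merges the streams into one.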
static DB::ITransformingStep::Traits getTraits()
{
return DB::ITransformingStep::Traits{
{
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}};
}
AdvancedExpandStep::AdvancedExpandStep(
DB::ContextPtr context_,
const DB::SharedHeader & input_header_,
size_t grouping_keys_,
const DB::AggregateDescriptions & aggregate_descriptions_,
const ExpandField & project_set_exprs_)
: DB::ITransformingStep(input_header_, toShared(buildOutputHeader(*input_header_, project_set_exprs_)), getTraits())
, context(context_)
, grouping_keys(grouping_keys_)
, aggregate_descriptions(aggregate_descriptions_)
, project_set_exprs(project_set_exprs_)
{
}
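
/// The output header is determined entirely by the ExpandField's names and types; the input
/// header is not used.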
DB::Block AdvancedExpandStep::buildOutputHeader(const DB::Block &, const ExpandField & project_set_exprs_)
{
DB::ColumnsWithTypeAndName cols;
const auto & types = project_set_exprs_.getTypes();
const auto & names = project_set_exprs_.getNames();
chassert(names.size() == types.size());
for (size_t i = 0; i < project_set_exprs_.getExpandCols(); ++i)
cols.emplace_back(DB::ColumnWithTypeAndName(types[i], names[i]));
return DB::Block(std::move(cols));
}
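
/// Per input stream, the pipeline is wired as:
///
///   AdvancedExpandTransform --(out 0: high cardinality)--------------------------------+
///          |                                                                           +--> ResizeProcessor(2 -> 1)
///          +--(out 1: low cardinality)--> StreamingAggregatingTransform (partial) -----+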
void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & /*pipeline_settings*/)
{
DB::Names aggregate_grouping_keys;
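    /// Everything before the first aggregate-state column in the output header is a grouping key
    /// (grouping-set columns plus the grouping id); these become the keys of the pre-aggregation.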
for (size_t i = 0; i < output_header->columns(); ++i)
{
const auto & col = output_header->getByPosition(i);
if (typeid_cast<const DB::ColumnAggregateFunction *>(col.column.get()))
break;
aggregate_grouping_keys.push_back(col.name);
}
// partial to partial aggregate
auto params = AggregatorParamsHelper::buildParams(
context, aggregate_grouping_keys, aggregate_descriptions, AggregatorParamsHelper::Mode::PARTIAL_TO_PARTIAL);
auto input_header = input_headers.front();
auto build_transform = [&](DB::OutputPortRawPtrs outputs)
{
DB::Processors new_processors;
for (auto & output : outputs)
{
auto expand_processor
= std::make_shared<AdvancedExpandTransform>(input_header, *output_header, grouping_keys, project_set_exprs);
DB::connect(*output, expand_processor->getInputs().front());
new_processors.push_back(expand_processor);
auto expand_output_header = expand_processor->getOutputs().front().getSharedHeader();
            auto transform_params = std::make_shared<DB::AggregatingTransformParams>(expand_output_header, params, /* final */ false);
auto aggregate_processor = std::make_shared<StreamingAggregatingTransform>(context, expand_output_header, transform_params);
DB::connect(expand_processor->getOutputs().back(), aggregate_processor->getInputs().front());
new_processors.push_back(aggregate_processor);
            auto resize_processor = std::make_shared<DB::ResizeProcessor>(expand_output_header, /* num_inputs */ 2, /* num_outputs */ 1);
DB::connect(aggregate_processor->getOutputs().front(), resize_processor->getInputs().front());
DB::connect(expand_processor->getOutputs().front(), resize_processor->getInputs().back());
new_processors.push_back(resize_processor);
}
return new_processors;
};
pipeline.transform(build_transform);
}
void AdvancedExpandStep::describePipeline(DB::IQueryPlanStep::FormatSettings & settings) const
{
if (!processors.empty())
DB::IQueryPlanStep::describePipeline(processors, settings);
}
void AdvancedExpandStep::updateOutputHeader()
{
output_header = toShared(buildOutputHeader(*input_headers.front(), project_set_exprs));
}
/// AdvancedExpandTransform has two output ports. The 1st port carries high cardinality data that
/// bypasses pre-aggregation; the 2nd port carries low cardinality data that is fed into the
/// streaming aggregate.
AdvancedExpandTransform::AdvancedExpandTransform(
const DB::SharedHeader & input_header_, const DB::Block & output_header_, size_t grouping_keys_, const ExpandField & project_set_exprs_)
: DB::IProcessor({input_header_}, {output_header_, output_header_})
, grouping_keys(grouping_keys_)
, project_set_exprs(project_set_exprs_)
, input_header(input_header_)
{
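    /// A grouping-set row that selects all grouping keys keeps the input's cardinality; rows that
    /// replace some keys with literals (e.g. a rollup level) produce lower cardinality data and
    /// are routed to the pre-aggregation output port.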
for (size_t i = 0; i < project_set_exprs.getExpandRows(); ++i)
{
const auto & kinds = project_set_exprs.getKinds()[i];
size_t n = 0;
for (size_t k = 0; k < grouping_keys; ++k)
if (kinds[k] == EXPAND_FIELD_KIND_SELECTION)
n += 1;
need_to_aggregate.push_back((n != grouping_keys));
}
for (auto & port : outputs)
output_ports.push_back(&port);
}
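
/// Standard IProcessor state machine: pull one input chunk, then emit one output chunk per
/// grouping-set row before requesting the next chunk.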
DB::IProcessor::Status AdvancedExpandTransform::prepare()
{
auto & input = inputs.front();
if (isCancelled() || output_ports[0]->isFinished() || output_ports[1]->isFinished())
{
input.close();
output_ports[0]->finish();
output_ports[1]->finish();
return Status::Finished;
}
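    /// Route the chunk produced by the last work() call: need_to_aggregate selects port 1
    /// (streaming pre-aggregation), port 0 passes data through unchanged. After pushing, return
    /// Ready while more grouping-set rows remain for this chunk, NeedData once it is exhausted.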
    if (has_output)
    {
        const size_t port_index = need_to_aggregate[expand_expr_iterator - 1];
        auto & output_port = *output_ports[port_index];
        if (!output_port.canPush())
            return Status::PortFull;

        output_blocks[port_index] += 1;
        output_rows[port_index] += output_chunk.getNumRows();
        output_port.push(std::move(output_chunk));
        has_output = false;
        return expand_expr_iterator >= project_set_exprs.getExpandRows() ? Status::NeedData : Status::Ready;
    }
if (!has_input)
{
if (input.isFinished())
{
LOG_DEBUG(
getLogger("AdvancedExpandTransform"),
"Input rows/blocks={}/{}. output rows/blocks=[{}/{}, {}/{}]",
input_rows,
input_blocks,
output_rows[0],
output_blocks[0],
output_rows[1],
output_blocks[1]);
output_ports[0]->finish();
output_ports[1]->finish();
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
input_chunk = input.pull(true);
has_input = true;
expand_expr_iterator = 0;
input_blocks += 1;
input_rows += input_chunk.getNumRows();
}
return Status::Ready;
}
void AdvancedExpandTransform::work()
{
expandInputChunk();
}
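
/// Build the output chunk for the current grouping-set row from the buffered input chunk. Each
/// output column is either an input column cast to the output type or a constant literal,
/// depending on the row's field kinds.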
void AdvancedExpandTransform::expandInputChunk()
{
const auto & input_columns = input_chunk.getColumns();
const auto & types = project_set_exprs.getTypes();
const auto & kinds = project_set_exprs.getKinds()[expand_expr_iterator];
const auto & fields = project_set_exprs.getFields()[expand_expr_iterator];
size_t rows = input_chunk.getNumRows();
DB::Columns columns(types.size());
for (size_t col_i = 0; col_i < types.size(); ++col_i)
{
const auto & type = types[col_i];
const auto & kind = kinds[col_i];
const auto & field = fields[col_i];
if (kind == EXPAND_FIELD_KIND_SELECTION)
{
auto index = field.safeGet<Int32>();
const auto & input_column = input_columns[index];
DB::ColumnWithTypeAndName input_arg;
input_arg.column = input_column;
input_arg.type = input_header->getByPosition(index).type;
            /// The input column's type may differ from the output type (e.g. non-Nullable vs Nullable), so cast it.
columns[col_i] = DB::castColumn(input_arg, type);
}
else if (kind == EXPAND_FIELD_KIND_LITERAL)
{
            /// Materialize a constant column holding the literal value (e.g. NULL for a rolled-up grouping key).
auto column = type->createColumnConst(rows, field)->convertToFullColumnIfConst();
columns[col_i] = std::move(column);
}
else
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown ExpandFieldKind {}", magic_enum::enum_name(kind));
}
output_chunk = DB::Chunk(std::move(columns), rows);
has_output = true;
++expand_expr_iterator;
has_input = expand_expr_iterator < project_set_exprs.getExpandRows();
}
}