blob: afc6596d55b248dd000da0938e11f9ba3caf5ef3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ExpandTransform.h"
#include <memory>
#include <Columns/ColumnsNumber.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/castColumn.h>
#include <Processors/IProcessor.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
/// Declaration of the error code used by the exceptions thrown in this file.
/// NOTE(review): the definition is presumably provided by the ClickHouse core library — confirm.
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
namespace local_engine
{
/// Constructs the expand processor.
///
/// @param input_             header handed to the base IProcessor for the input side
/// @param output_            header handed to the base IProcessor for the output side
/// @param project_set_exprs_ expand description (types, kinds, fields); copied into the member
ExpandTransform::ExpandTransform(
    const DB::SharedHeader & input_,
    const DB::SharedHeader & output_,
    const ExpandField & project_set_exprs_)
    : DB::IProcessor({input_}, {output_})
    , project_set_exprs(project_set_exprs_)
{
}
/// Port bookkeeping step: decides whether the processor is finished, has to wait,
/// or can run work() on the currently held input chunk.
///
/// @return Finished  when the output is finished, the processor is cancelled,
///                   or the input is exhausted and no chunk is being expanded;
///         PortFull  when the output cannot accept data or a chunk was just pushed;
///         NeedData  when a new input chunk is required but not available yet;
///         Ready     when there is an input chunk with expansions still to produce.
ExpandTransform::Status ExpandTransform::prepare()
{
    auto & out_port = outputs.front();
    auto & in_port = inputs.front();

    /// Nobody will consume further output: stop reading the input as well.
    if (out_port.isFinished() || isCancelled())
    {
        in_port.close();
        return Status::Finished;
    }

    /// The output port cannot accept a chunk right now, so do not ask for more input.
    if (!out_port.canPush())
    {
        in_port.setNotNeeded();
        return Status::PortFull;
    }

    /// Hand over the chunk produced by the previous work() call first.
    if (has_output)
    {
        out_port.push(std::move(output_chunk));
        has_output = false;
        return Status::PortFull;
    }

    /// The current input chunk still has expansions left to generate.
    if (has_input)
        return Status::Ready;

    if (in_port.isFinished())
    {
        out_port.finish();
        return Status::Finished;
    }

    in_port.setNeeded();
    if (!in_port.hasData())
        return Status::NeedData;

    /// A fresh input chunk arrived: restart the expansion counter for it.
    input_chunk = in_port.pull(true);
    expand_expr_iterator = 0;
    has_input = true;
    return Status::Ready;
}
/// Builds one output chunk from the currently held input chunk, using the expansion
/// selected by `expand_expr_iterator`, then advances the iterator.
///
/// For every output column the expansion either selects an input column (cast to the
/// output type) or materializes a literal value repeated for each input row.
/// After the call `has_output` is set, and `has_input` stays true only while further
/// expansions remain for this input chunk.
///
/// @throws DB::Exception (LOGICAL_ERROR) if the iterator is past the last expansion,
///         if a selection index does not address an existing input column, or if the
///         field kind is not recognized.
void ExpandTransform::work()
{
    if (expand_expr_iterator >= project_set_exprs.getExpandRows())
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "expand_expr_iterator >= project_set_exprs.getExpandRows()");

    const auto & input_header = getInputs().front().getHeader();
    const auto & input_columns = input_chunk.getColumns();
    const auto & types = project_set_exprs.getTypes();
    const auto & kinds = project_set_exprs.getKinds()[expand_expr_iterator];
    const auto & fields = project_set_exprs.getFields()[expand_expr_iterator];
    size_t rows = input_chunk.getNumRows();

    DB::Columns columns(types.size());
    for (size_t col_i = 0; col_i < types.size(); ++col_i)
    {
        const auto & type = types[col_i];
        const auto & kind = kinds[col_i];
        const auto & field = fields[col_i];

        if (kind == EXPAND_FIELD_KIND_SELECTION)
        {
            auto index = field.safeGet<Int32>();

            /// The index comes from the plan; reject values that would read outside
            /// the input chunk or the input header instead of invoking undefined behavior.
            if (index < 0 || static_cast<size_t>(index) >= input_columns.size()
                || static_cast<size_t>(index) >= input_header.columns())
                throw DB::Exception(
                    DB::ErrorCodes::LOGICAL_ERROR,
                    "Expand selection index {} is out of range: input chunk has {} columns, input header has {} columns",
                    index,
                    input_columns.size(),
                    input_header.columns());

            const auto & input_column = input_columns[index];

            DB::ColumnWithTypeAndName input_arg;
            input_arg.column = input_column;
            input_arg.type = input_header.getByPosition(index).type;

            /// The input column's type may differ from the output type
            /// (e.g. the input may be non-Nullable), so cast it to the output type.
            columns[col_i] = DB::castColumn(input_arg, type);
        }
        else if (kind == EXPAND_FIELD_KIND_LITERAL)
        {
            /// Create a constant column holding the field value and materialize it to a full column.
            auto column = type->createColumnConst(rows, field)->convertToFullColumnIfConst();
            columns[col_i] = std::move(column);
        }
        else
            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown ExpandFieldKind {}", magic_enum::enum_name(kind));
    }

    output_chunk = DB::Chunk(std::move(columns), rows);
    has_output = true;

    /// Keep the input chunk around while there are more expansions to generate from it.
    ++expand_expr_iterator;
    has_input = expand_expr_iterator < project_set_exprs.getExpandRows();
}
}