blob: e4281724533e385fa1fd75a7a2ec5328c125d9f5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <vec/data_types/data_type_number.h>
#include <vec/exprs/vcolumn_ref.h>
#include <vec/exprs/vslot_ref.h>
#include <memory>
#include <string>
#include <vector>
#include "common/status.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
#include "vec/core/block.h"
#include "vec/core/column_numbers.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/columns_with_type_and_name.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exprs/lambda_function/lambda_function.h"
#include "vec/exprs/lambda_function/lambda_function_factory.h"
#include "vec/utils/util.hpp"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
class VExprContext;
// Iteration state used while expanding the input block into the lambda block: which outer
// row is being expanded and how far into that row's array the expansion has progressed.
struct LambdaArgs {
    // column ids (positions in the input block) of every outer slot the lambda body reads
    std::vector<int> output_slot_ref_indexs;
    // outer row currently being expanded into the lambda block
    int64_t current_row_idx{0};
    // elements of the current row's array already consumed; a row may be split across batches
    int64_t current_offset_in_array{0};
    // index of the current row's first element inside the nested data column
    size_t array_start{0};
    // number of elements in the current row's array
    int64_t cur_size{0};
    // offsets of the (first) array argument
    const ColumnArray::Offsets64* offsets_ptr{nullptr};
    // how many times the current outer row still has to be repeated (expanded)
    int current_repeat_times{0};
    // set once every element of the current row's array has been consumed
    bool current_row_eos{false};
};
class ArrayMapFunction : public LambdaFunction {
ENABLE_FACTORY_CREATOR(ArrayMapFunction);
public:
~ArrayMapFunction() override = default;
static constexpr auto name = "array_map";
static LambdaFunctionPtr create() { return std::make_shared<ArrayMapFunction>(); }
std::string get_name() const override { return name; }
Status execute(VExprContext* context, const vectorized::Block* block, size_t count,
ColumnPtr& result_column, const DataTypePtr& result_type,
const VExprSPtrs& children) const override {
LambdaArgs args_info;
// collect used slot ref in lambda function body
std::vector<int>& output_slot_ref_indexs = args_info.output_slot_ref_indexs;
_collect_slot_ref_column_id(children[0], output_slot_ref_indexs);
int gap = 0;
if (!output_slot_ref_indexs.empty()) {
auto max_id =
std::max_element(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end());
gap = *max_id + 1;
_set_column_ref_column_id(children[0], gap);
}
std::vector<std::string> names(gap);
DataTypes data_types(gap);
for (int i = 0; i < gap; ++i) {
if (_contains_column_id(output_slot_ref_indexs, i)) {
names[i] = block->get_by_position(i).name;
data_types[i] = block->get_by_position(i).type;
} else {
// padding some mock data to hold the position, like call block#rows function need
names[i] = "temp";
data_types[i] = std::make_shared<DataTypeUInt8>();
}
}
///* array_map(lambda,arg1,arg2,.....) *///
//1. child[1:end]->execute(src_block)
ColumnsWithTypeAndName arguments(children.size() - 1);
for (int i = 1; i < children.size(); ++i) {
ColumnPtr column;
RETURN_IF_ERROR(children[i]->execute_column(context, block, count, column));
arguments[i - 1].column = column;
arguments[i - 1].type = children[i]->execute_type(block);
arguments[i - 1].name = children[i]->expr_name();
}
// used for save column array outside null map
auto outside_null_map = ColumnUInt8::create(
arguments[0].column->convert_to_full_column_if_const()->size(), 0);
// offset column
MutableColumnPtr array_column_offset;
size_t nested_array_column_rows = 0;
ColumnPtr first_array_offsets = nullptr;
//2. get the result column from executed expr, and the needed is nested column of array
std::vector<ColumnPtr> lambda_datas(arguments.size());
for (int i = 0; i < arguments.size(); ++i) {
const auto& array_column_type_name = arguments[i];
auto column_array = array_column_type_name.column->convert_to_full_column_if_const();
auto type_array = array_column_type_name.type;
if (type_array->is_nullable()) {
// get the nullmap of nullable column
// hold the null column instead of a reference 'cause `column_array` will be assigned and freed below.
auto column_array_nullmap =
assert_cast<const ColumnNullable&>(*column_array).get_null_map_column_ptr();
// get the array column from nullable column
column_array = assert_cast<const ColumnNullable*>(column_array.get())
->get_nested_column_ptr();
// get the nested type from nullable type
type_array = assert_cast<const DataTypeNullable*>(array_column_type_name.type.get())
->get_nested_type();
// need to union nullmap from all columns
VectorizedUtils::update_null_map(
outside_null_map->get_data(),
assert_cast<const ColumnUInt8&>(*column_array_nullmap).get_data());
}
// here is the array column
const auto& col_array = assert_cast<const ColumnArray&>(*column_array);
const auto& col_type = assert_cast<const DataTypeArray&>(*type_array);
if (i == 0) {
nested_array_column_rows = col_array.get_data_ptr()->size();
first_array_offsets = col_array.get_offsets_ptr();
const auto& off_data = assert_cast<const ColumnArray::ColumnOffsets&>(
col_array.get_offsets_column());
array_column_offset = off_data.clone_resized(col_array.get_offsets_column().size());
args_info.offsets_ptr = &col_array.get_offsets();
} else {
// select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from array_test2;
// c_array1: [0,1,2,3,4,5,6,7,8,9]
const auto& array_offsets =
assert_cast<const ColumnArray::ColumnOffsets&>(*first_array_offsets)
.get_data();
if (nested_array_column_rows != col_array.get_data_ptr()->size() ||
(!array_offsets.empty() &&
memcmp(array_offsets.data(), col_array.get_offsets().data(),
sizeof(array_offsets[0]) * array_offsets.size()) != 0)) {
return Status::InvalidArgument(
"in array map function, the input column size "
"are "
"not equal completely, nested column data rows 1st size is {}, {}th "
"size is {}.",
nested_array_column_rows, i + 1, col_array.get_data_ptr()->size());
}
}
lambda_datas[i] = col_array.get_data_ptr();
names.push_back("R" + array_column_type_name.name);
data_types.push_back(col_type.get_nested_type());
}
// if column_array is NULL, we know the array_data_column will not write any data,
// so the column is empty. eg : (x) -> concat('|',x + "1"). if still execute the lambda function, will cause the bolck rows are not equal
// the x column is empty, but "|" is const literal, size of column is 1, so the block rows is 1, but the x column is empty, will be coredump.
if (std::any_of(lambda_datas.begin(), lambda_datas.end(),
[](const auto& v) { return v->empty(); })) {
DataTypePtr nested_type;
bool is_nullable = result_type->is_nullable();
if (is_nullable) {
nested_type =
assert_cast<const DataTypeNullable*>(result_type.get())->get_nested_type();
} else {
nested_type = result_type;
}
auto empty_nested_column = assert_cast<const DataTypeArray*>(nested_type.get())
->get_nested_type()
->create_column();
auto result_array_column = ColumnArray::create(std::move(empty_nested_column),
std::move(array_column_offset));
if (is_nullable) {
result_column = ColumnNullable::create(std::move(result_array_column),
std::move(outside_null_map));
} else {
result_column = std::move(result_array_column);
}
return Status::OK();
}
MutableColumnPtr result_col = nullptr;
DataTypePtr res_type;
//process first row
args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
args_info.cur_size =
(*args_info.offsets_ptr)[args_info.current_row_idx] - args_info.array_start;
// lambda block to exectute the lambda, and reuse the memory
Block lambda_block;
auto column_size = names.size();
MutableColumns columns(column_size);
do {
bool mem_reuse = lambda_block.mem_reuse();
for (int i = 0; i < column_size; i++) {
if (mem_reuse) {
columns[i] = lambda_block.get_by_position(i).column->assume_mutable();
} else {
if (_contains_column_id(output_slot_ref_indexs, i) || i >= gap) {
// TODO: maybe could create const column, so not insert_many_from when extand data
// but now here handle batch_size of array nested data every time, so maybe have different rows
columns[i] = data_types[i]->create_column();
} else {
columns[i] = data_types[i]
->create_column_const_with_default_value(0)
->assume_mutable();
}
}
}
// batch_size of array nested data every time inorder to avoid memory overflow
while (columns[gap]->size() < batch_size) {
long max_step = batch_size - columns[gap]->size();
long current_step = std::min(
max_step, (long)(args_info.cur_size - args_info.current_offset_in_array));
size_t pos = args_info.array_start + args_info.current_offset_in_array;
for (int i = 0; i < arguments.size() && current_step > 0; ++i) {
columns[gap + i]->insert_range_from(*lambda_datas[i], pos, current_step);
}
args_info.current_offset_in_array += current_step;
args_info.current_repeat_times += current_step;
if (args_info.current_offset_in_array >= args_info.cur_size) {
args_info.current_row_eos = true;
}
_extend_data(columns, block, args_info.current_repeat_times, gap,
args_info.current_row_idx, output_slot_ref_indexs);
args_info.current_repeat_times = 0;
if (args_info.current_row_eos) {
//current row is end of array, move to next row
args_info.current_row_idx++;
args_info.current_offset_in_array = 0;
if (args_info.current_row_idx >= count) {
break;
}
args_info.current_row_eos = false;
args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
args_info.cur_size = (*args_info.offsets_ptr)[args_info.current_row_idx] -
args_info.array_start;
}
}
if (!mem_reuse) {
for (int i = 0; i < column_size; ++i) {
lambda_block.insert(vectorized::ColumnWithTypeAndName(std::move(columns[i]),
data_types[i], names[i]));
}
}
//3. child[0]->execute(new_block)
ColumnPtr res_col;
RETURN_IF_ERROR(children[0]->execute_column(context, &lambda_block, lambda_block.rows(),
res_col));
res_col = res_col->convert_to_full_column_if_const();
res_type = children[0]->execute_type(&lambda_block);
if (!result_col) {
result_col = res_col->clone_empty();
}
result_col->insert_range_from(*res_col, 0, res_col->size());
lambda_block.clear_column_data(column_size);
} while (args_info.current_row_idx < count);
//4. get the result column after execution, reassemble it into a new array column, and return.
if (result_type->is_nullable()) {
if (res_type->is_nullable()) {
result_column = ColumnNullable::create(
ColumnArray::create(std::move(result_col), std::move(array_column_offset)),
std::move(outside_null_map));
} else {
// deal with eg: select array_map(x -> x is null, [null, 1, 2]);
// need to create the nested column null map for column array
auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);
result_column = ColumnNullable::create(
ColumnArray::create(ColumnNullable::create(std::move(result_col),
std::move(nested_null_map)),
std::move(array_column_offset)),
std::move(outside_null_map));
}
} else {
if (res_type->is_nullable()) {
result_column =
ColumnArray::create(std::move(result_col), std::move(array_column_offset));
} else {
auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);
result_column = ColumnArray::create(
ColumnNullable::create(std::move(result_col), std::move(nested_null_map)),
std::move(array_column_offset));
}
}
return Status::OK();
}
private:
bool _contains_column_id(const std::vector<int>& output_slot_ref_indexs, int id) const {
const auto it = std::find(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end(), id);
return it != output_slot_ref_indexs.end();
}
void _set_column_ref_column_id(VExprSPtr expr, int gap) const {
for (const auto& child : expr->children()) {
if (child->is_column_ref()) {
auto* ref = static_cast<VColumnRef*>(child.get());
ref->set_gap(gap);
} else {
_set_column_ref_column_id(child, gap);
}
}
}
void _collect_slot_ref_column_id(VExprSPtr expr,
std::vector<int>& output_slot_ref_indexs) const {
for (const auto& child : expr->children()) {
if (child->is_slot_ref()) {
const auto* ref = static_cast<VSlotRef*>(child.get());
output_slot_ref_indexs.push_back(ref->column_id());
} else {
_collect_slot_ref_column_id(child, output_slot_ref_indexs);
}
}
}
void _extend_data(std::vector<MutableColumnPtr>& columns, const Block* block,
int current_repeat_times, int size, int64_t current_row_idx,
const std::vector<int>& output_slot_ref_indexs) const {
if (!current_repeat_times || !size) {
return;
}
for (int i = 0; i < size; i++) {
if (_contains_column_id(output_slot_ref_indexs, i)) {
auto src_column =
block->get_by_position(i).column->convert_to_full_column_if_const();
columns[i]->insert_many_from(*src_column, current_row_idx, current_repeat_times);
} else {
// must be column const
DCHECK(is_column_const(*columns[i]));
columns[i]->resize(columns[i]->size() + current_repeat_times);
}
}
}
};
void register_function_array_map(doris::vectorized::LambdaFunctionFactory& factory) {
factory.register_function<ArrayMapFunction>();
}
#include "common/compile_check_end.h"
} // namespace doris::vectorized