blob: e9bff30dba927f3344fcd16a99729199c1b281d4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/vtablet_block_convertor.h"
#include <fmt/format.h>
#include <gen_cpp/FrontendService.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/common.h>
#include <algorithm>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/consts.h"
#include "common/status.h"
#include "olap/olap_common.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/primitive_type.h"
#include "runtime/runtime_state.h"
#include "service/brpc.h"
#include "util/binary_cast.hpp"
#include "util/brpc_client_cache.h"
#include "util/thread.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_decimal.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_struct.h"
#include "vec/common/assert_cast.h"
#include "vec/core/block.h"
#include "vec/core/types.h"
#include "vec/core/wide_integer_to_string.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_struct.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/function_helpers.h"
#include "vec/functions/simple_function_factory.h"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
// !FIXME: Here we should consider using MutableBlock, due to potential data reorganization
/// Produces the block that will be written to the tablets.
///
/// Steps, in order:
///  1. copy `input_block` into `block` (or, when `output_vexpr_ctxs` is non-empty,
///     fill `block` from the output expressions evaluated over `input_block`);
///  2. fill auto-increment values when needed (see the branch comments below);
///  3. validate `rows` rows against the output tuple schema, recording invalid rows
///     in `_filter_map`;
///  4. coerce column nullability to match the destination slots.
///
/// @param rows               number of rows to validate; `_filter_map` is sized to it.
/// @param has_filtered_rows  set to true iff validation flagged at least one row.
/// @return the first error from expression evaluation, auto-inc filling or validation.
Status OlapTableBlockConvertor::validate_and_convert_block(
        RuntimeState* state, vectorized::Block* input_block,
        std::shared_ptr<vectorized::Block>& block, vectorized::VExprContextSPtrs output_vexpr_ctxs,
        size_t rows, bool& has_filtered_rows) {
    DCHECK(input_block->rows() > 0);
    block = vectorized::Block::create_shared(input_block->get_columns_with_type_and_name());

    const bool has_output_exprs = !output_vexpr_ctxs.empty();
    if (has_output_exprs) {
        // Evaluate the output expressions vectorized to speed up the load.
        RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
                output_vexpr_ctxs, *input_block, block.get()));
    }

    // Auto-increment handling, e.g. for table schema k1, v1, v2(auto inc):
    //  - partial update that lists the auto-inc column (insert into t (k1, v2) ...):
    //    nothing to do;
    //  - partial update that omits it (insert into t (k1, v1) ...): append a freshly
    //    generated auto-inc column;
    //  - non-partial load whose output schema has an auto-inc column: fill its values.
    const bool auto_inc_in_output = _auto_inc_col_idx.has_value();
    if (_is_partial_update_and_auto_inc && !auto_inc_in_output) {
        RETURN_IF_ERROR(_partial_update_fill_auto_inc_cols(block.get(), rows));
    } else if (!_is_partial_update_and_auto_inc && auto_inc_in_output) {
        RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows));
    }

    int filtered_rows = 0;
    {
        SCOPED_RAW_TIMER(&_validate_data_ns);
        _filter_map.assign(rows, 0);
        const Status validate_status = _validate_data(state, block.get(), rows, filtered_rows);
        // Account for filtered rows even when validation itself failed.
        _num_filtered_rows += filtered_rows;
        has_filtered_rows = filtered_rows > 0;
        if (!validate_status.ok()) {
            return validate_status;
        }
        _convert_to_dest_desc_block(block.get());
    }
    return Status::OK();
}
/// Configures auto-increment handling for this convertor.
///
/// In partial-update mode the buffer for `auto_increment_column_unique_id` is attached
/// directly. Otherwise the output tuple is searched for its first auto-increment slot.
/// If one exists, its index is remembered and a buffer keyed by that slot's unique id is
/// attached, with a batch size of at least `batch_size`.
void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size,
                                                bool is_partial_update_and_auto_inc,
                                                int32_t auto_increment_column_unique_id) {
    _batch_size = batch_size;

    if (is_partial_update_and_auto_inc) {
        _is_partial_update_and_auto_inc = is_partial_update_and_auto_inc;
        _auto_inc_id_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
                db_id, table_id, auto_increment_column_unique_id);
        return;
    }

    const auto& slots = _output_tuple_desc->slots();
    const auto auto_inc_slot = std::find_if(slots.begin(), slots.end(), [](auto* slot_desc) {
        return slot_desc->is_auto_increment();
    });
    if (auto_inc_slot == slots.end()) {
        // No auto-increment column in the output schema: nothing to set up.
        return;
    }

    _auto_inc_col_idx = static_cast<size_t>(auto_inc_slot - slots.begin());
    _auto_inc_id_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
            db_id, table_id, (*auto_inc_slot)->col_unique_id());
    _auto_inc_id_buffer->set_batch_size_at_least(_batch_size);
}
/// Returns (and memoizes) the smallest (`is_min == true`) or largest representable
/// DecimalV2Value for the precision/scale of `type`.
///
/// Both the cache key and the computed bound are derived from the non-nullable type.
/// Previously the lookup used `remove_nullable(type)` while the bound and the stored key
/// used `type` itself. A Nullable(Decimal) whose wrapper does not report the nested
/// precision/scale could then miss the cache on every call and be given a bound built
/// from the wrong precision/scale.
template <bool is_min>
DecimalV2Value OlapTableBlockConvertor::_get_decimalv2_min_or_max(const DataTypePtr& type) {
    const auto decimal_type = remove_nullable(type);
    const std::pair<int, int> key {static_cast<int>(decimal_type->get_precision()),
                                   static_cast<int>(decimal_type->get_scale())};

    std::map<std::pair<int, int>, DecimalV2Value>& cache =
            is_min ? _min_decimalv2_val : _max_decimalv2_val;
    if (auto iter = cache.find(key); iter != cache.end()) {
        return iter->second;
    }

    // Not cached yet: compute the bound once and remember it for later calls.
    DecimalV2Value value;
    if constexpr (is_min) {
        value.to_min_decimal(key.first, key.second);
    } else {
        value.to_max_decimal(key.first, key.second);
    }
    cache.emplace(key, value);
    return value;
}
/// Returns (and memoizes per precision) the minimum (`IsMin`) or maximum value that a
/// DecimalV3 column of `type`'s precision may hold.
///
/// One cache per decimal width (32/64/128/256 bits) is used; any DecimalType other than
/// Decimal32/Decimal64/Decimal128V3 falls through to the 256-bit cache.
template <typename DecimalType, bool IsMin>
DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const DataTypePtr& type) {
    using NativeType = typename DecimalType::NativeType;

    std::map<int, NativeType>* cache = nullptr;
    if constexpr (std::is_same_v<DecimalType, vectorized::Decimal32>) {
        cache = IsMin ? &_min_decimal32_val : &_max_decimal32_val;
    } else if constexpr (std::is_same_v<DecimalType, vectorized::Decimal64>) {
        cache = IsMin ? &_min_decimal64_val : &_max_decimal64_val;
    } else if constexpr (std::is_same_v<DecimalType, vectorized::Decimal128V3>) {
        cache = IsMin ? &_min_decimal128_val : &_max_decimal128_val;
    } else {
        cache = IsMin ? &_min_decimal256_val : &_max_decimal256_val;
    }

    const auto precision = type->get_precision();
    if (const auto hit = cache->find(precision); hit != cache->end()) {
        return DecimalType(hit->second);
    }

    NativeType bound;
    if constexpr (IsMin) {
        bound = vectorized::min_decimal_value<DecimalType::PType>(precision);
    } else {
        bound = vectorized::max_decimal_value<DecimalType::PType>(precision);
    }
    cache->emplace(precision, bound);
    return DecimalType(bound);
}
/// Validates one column against its destination type, recursing into ARRAY / MAP /
/// STRUCT children, and flags every invalid block row in `_filter_map`.
///
/// For each violation the row is flagged and "<error_prefix><message>" is passed to
/// `RuntimeState::append_error_msg_to_file`.
///
/// @param block        block owning the column; for top-level string columns the
///                     (possibly truncated) column is written back at `slot_index`.
/// @param type         destination type of `column`.
/// @param column       column to check. Nested children may be replaced in place
///                     (string truncation in non-strict mode, DecimalV2 rounding,
///                     map key de-duplication).
/// @param error_prefix accumulated prefix such as "column_name[c], ARRAY type failed: ".
/// @param row_count    number of entries of `column` to check.
/// @param rows         optional mapping from entry j of `column` to the block row it
///                     belongs to; nullptr means entry j is block row j. Every access to
///                     `_filter_map` must go through this mapping, because nested child
///                     columns can be longer than the block.
/// @return the first error from error-file appending or from the substring function.
Status OlapTableBlockConvertor::_internal_validate_column(
        RuntimeState* state, vectorized::Block* block, const DataTypePtr& type,
        vectorized::ColumnPtr column, size_t slot_index, fmt::memory_buffer& error_prefix,
        const size_t row_count, vectorized::IColumn::Permutation* rows) {
    DCHECK((rows == nullptr) || (rows->size() == row_count));
    fmt::memory_buffer error_msg;
    // Flags block row `row` as filtered, appends "<error_prefix><error_msg>" to the error
    // file, and resets `error_msg` for the next violation.
    auto set_invalid_and_append_error_msg = [&](size_t row) {
        _filter_map[row] = true;
        auto ret = state->append_error_msg_to_file([]() -> std::string { return ""; },
                                                   [&error_prefix, &error_msg]() -> std::string {
                                                       return fmt::to_string(error_prefix) +
                                                              fmt::to_string(error_msg);
                                                   });
        error_msg.clear();
        return ret;
    };

    const auto* column_ptr = vectorized::check_and_get_column<vectorized::ColumnNullable>(*column);
    const auto& real_column_ptr =
            column_ptr == nullptr ? column : (column_ptr->get_nested_column_ptr());
    const auto* null_map = column_ptr == nullptr ? nullptr : column_ptr->get_null_map_data().data();
    // Entry j (belonging to block row `row`) needs checking only if the row is not
    // already filtered and the entry is not NULL.
    auto need_to_validate = [](size_t j, size_t row, const std::vector<char>& filter_map,
                               const unsigned char* null_map) {
        return !filter_map[row] && (null_map == nullptr || null_map[j] == 0);
    };

    // Checks string lengths against min(schema length, soft limit).
    // May replace `orig_column` with a truncated copy (via `substring`) in non-strict mode.
    auto string_column_checker = [&state, &error_msg, need_to_validate,
                                  set_invalid_and_append_error_msg](
                                         vectorized::ColumnPtr& orig_column,
                                         const DataTypePtr& orig_type,
                                         vectorized::IColumn::Permutation* rows,
                                         const std::vector<char>& filter_map) {
        int limit = config::string_type_length_soft_limit_bytes;
        int len = -1;
        // when type.len is negative, std::min will return overflow value, so we need to check it
        const auto* type_str =
                check_and_get_data_type<DataTypeString>(remove_nullable(orig_type).get());
        if (type_str) {
            if (type_str->len() >= 0) {
                len = type_str->len();
                limit = std::min(limit, type_str->len());
            }
        }

        const auto* tmp_column_ptr =
                vectorized::check_and_get_column<vectorized::ColumnNullable>(*orig_column);
        const auto& tmp_real_column_ptr =
                tmp_column_ptr == nullptr ? orig_column : (tmp_column_ptr->get_nested_column_ptr());
        const auto* column_string =
                assert_cast<const vectorized::ColumnString*>(tmp_real_column_ptr.get());
        const auto* null_map =
                tmp_column_ptr == nullptr ? nullptr : tmp_column_ptr->get_null_map_data().data();
        const auto* __restrict offsets = column_string->get_offsets().data();
        int invalid_count = 0;
        size_t row_count = orig_column->size();
        // Fast pre-scan: only walk rows individually if something is over the limit.
        for (int64_t j = 0; j < row_count; ++j) {
            invalid_count += (offsets[j] - offsets[j - 1]) > limit;
        }
        if (invalid_count) {
            // For string column, if in non-strict load mode(for both insert stmt and stream load),
            // truncate the string to schema len.
            // After truncation, still need to check if byte len of each row exceed the schema len,
            // because currently the schema len is defined in bytes, and substring works by unit of chars.
            // This is a workaround for now, need to improve it after better support of multi-byte chars.
            if (type_str && !state->enable_insert_strict()) {
                ColumnsWithTypeAndName argument_template;
                auto input_type = remove_nullable(orig_type);
                auto pos_type = DataTypeFactory::instance().create_data_type(
                        FieldType::OLAP_FIELD_TYPE_INT, 0, 0);
                auto len_type = DataTypeFactory::instance().create_data_type(
                        FieldType::OLAP_FIELD_TYPE_INT, 0, 0);
                argument_template.emplace_back(nullptr, input_type, "string column");
                argument_template.emplace_back(nullptr, pos_type, "pos column");
                argument_template.emplace_back(nullptr, len_type, "len column");
                auto func = SimpleFunctionFactory::instance().get_function(
                        "substring", argument_template, input_type, {}, state->be_exec_version());
                if (!func) {
                    return Status::InternalError("get function substring failed");
                }
                auto pos_column = pos_type->create_column_const(row_count, to_field<TYPE_INT>(1));
                auto len_column =
                        len_type->create_column_const(row_count, to_field<TYPE_INT>(limit));
                Block tmp_block({{remove_nullable(orig_column), input_type, "string column"},
                                 {pos_column, pos_type, "pos"},
                                 {len_column, len_type, "len"},
                                 {nullptr, input_type, "result"}});
                RETURN_IF_ERROR(func->execute(nullptr, tmp_block, {0, 1, 2}, 3, row_count));
                column_string = assert_cast<const vectorized::ColumnString*>(
                        tmp_block.get_by_position(3).column.get());
                orig_column =
                        orig_column->is_nullable()
                                ? ColumnNullable::create(tmp_block.get_by_position(3).column,
                                                         tmp_column_ptr->get_null_map_column_ptr())
                                : std::move(tmp_block.get_by_position(3).column);
            }
            for (size_t j = 0; j < row_count; ++j) {
                auto row = rows ? (*rows)[j] : j;
                if (need_to_validate(j, row, filter_map, null_map)) {
                    auto str_val = column_string->get_data_at(j);
                    bool invalid = str_val.size > limit;
                    if (invalid) {
                        if (str_val.size > len) {
                            fmt::format_to(error_msg, "{}",
                                           "the length of input is too long than schema. ");
                            fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ",
                                           str_val.to_prefix(32));
                            fmt::format_to(error_msg, "schema length: {}; ", len);
                            fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
                        } else if (str_val.size > limit) {
                            fmt::format_to(
                                    error_msg, "{}",
                                    "the length of input string is too long than vec schema. ");
                            fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ",
                                           str_val.to_prefix(32));
                            fmt::format_to(error_msg, "schema length: {}; ", len);
                            fmt::format_to(error_msg, "limit length: {}; ", limit);
                            fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
                        }
                        RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
                    }
                }
            }
        }
        return Status::OK();
    };

    switch (type->get_primitive_type()) {
    case TYPE_CHAR:
    case TYPE_VARCHAR:
    case TYPE_STRING: {
        RETURN_IF_ERROR(string_column_checker(column, type, rows, _filter_map));
        block->get_by_position(slot_index).column = std::move(column);
        break;
    }
    case TYPE_JSONB: {
        const auto* column_string =
                assert_cast<const vectorized::ColumnString*>(real_column_ptr.get());
        for (size_t j = 0; j < row_count; ++j) {
            // Map entry j back to its block row: when this column is the data of an
            // ARRAY/MAP, j can exceed the size of _filter_map.
            auto row = rows ? (*rows)[j] : j;
            if (!_filter_map[row]) {
                if (type->is_nullable() && column_ptr && column_ptr->is_null_at(j)) {
                    continue;
                }
                auto str_val = column_string->get_data_at(j);
                bool invalid = str_val.size == 0;
                if (invalid) {
                    error_msg.clear();
                    fmt::format_to(error_msg, "{}", "jsonb with size 0 is invalid");
                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
                }
            }
        }
        break;
    }
    case TYPE_DECIMALV2: {
        // column_decimal utilizes the ColumnPtr from the block* block in _validate_data and can be modified.
        auto* column_decimal = const_cast<vectorized::ColumnDecimal128V2*>(
                assert_cast<const vectorized::ColumnDecimal128V2*>(real_column_ptr.get()));
        const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type);
        const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type);
        // Use the non-nullable type's scale for both the "needs rounding" test and the
        // rounding itself so the two agree.
        const auto target_scale = remove_nullable(type)->get_scale();
        for (size_t j = 0; j < row_count; ++j) {
            auto row = rows ? (*rows)[j] : j;
            if (need_to_validate(j, row, _filter_map, null_map)) {
                auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>(
                        column_decimal->get_data()[j]);
                bool invalid = false;
                if (dec_val.greater_than_scale(target_scale)) {
                    auto code = dec_val.round(&dec_val, target_scale, HALF_UP);
                    column_decimal->get_data()[j] = dec_val.value();
                    if (code != E_DEC_OK) {
                        fmt::format_to(error_msg, "round one decimal failed.value={}; ",
                                       dec_val.to_string());
                        invalid = true;
                    }
                }
                if (dec_val > max_decimalv2 || dec_val < min_decimalv2) {
                    fmt::format_to(error_msg, "{}", "decimal value is not valid for definition");
                    fmt::format_to(error_msg, ", value={}", dec_val.to_string());
                    fmt::format_to(error_msg, ", precision={}, scale={}", type->get_precision(),
                                   type->get_scale());
                    fmt::format_to(error_msg, ", min={}, max={}; ", min_decimalv2.to_string(),
                                   max_decimalv2.to_string());
                    invalid = true;
                }
                if (invalid) {
                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
                }
            }
        }
        break;
    }
    case TYPE_DECIMAL32: {
// Range check for DecimalV3 columns: a cheap pre-scan over all entries, then a per-row
// pass (honoring `rows`, `_filter_map` and NULLs) only if something is out of range.
#define CHECK_VALIDATION_FOR_DECIMALV3(DecimalType)                                               \
    auto column_decimal = assert_cast<const vectorized::ColumnDecimal<DecimalType::PType>*>(     \
            real_column_ptr.get());                                                               \
    const auto& max_decimal = _get_decimalv3_min_or_max<DecimalType, false>(type);               \
    const auto& min_decimal = _get_decimalv3_min_or_max<DecimalType, true>(type);                \
    const auto* __restrict datas = column_decimal->get_data().data();                            \
    int invalid_count = 0;                                                                        \
    for (int j = 0; j < row_count; ++j) {                                                         \
        const auto dec_val = datas[j];                                                            \
        invalid_count += dec_val > max_decimal || dec_val < min_decimal;                          \
    }                                                                                             \
    if (invalid_count) {                                                                          \
        for (size_t j = 0; j < row_count; ++j) {                                                  \
            auto row = rows ? (*rows)[j] : j;                                                     \
            if (need_to_validate(j, row, _filter_map, null_map)) {                                \
                auto dec_val = column_decimal->get_data()[j];                                     \
                bool invalid = false;                                                             \
                if (dec_val > max_decimal || dec_val < min_decimal) {                             \
                    fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); \
                    fmt::format_to(error_msg, ", value={}", dec_val.value);                       \
                    fmt::format_to(error_msg, ", precision={}, scale={}", type->get_precision(),  \
                                   type->get_scale());                                            \
                    fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal.value,            \
                                   max_decimal.value);                                            \
                    invalid = true;                                                               \
                }                                                                                 \
                if (invalid) {                                                                    \
                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));                       \
                }                                                                                 \
            }                                                                                     \
        }                                                                                         \
    }
        CHECK_VALIDATION_FOR_DECIMALV3(vectorized::Decimal32);
        break;
    }
    case TYPE_DECIMAL64: {
        CHECK_VALIDATION_FOR_DECIMALV3(vectorized::Decimal64);
        break;
    }
    case TYPE_DECIMAL128I: {
        CHECK_VALIDATION_FOR_DECIMALV3(vectorized::Decimal128V3);
        break;
    }
    case TYPE_DECIMAL256: {
        CHECK_VALIDATION_FOR_DECIMALV3(vectorized::Decimal256);
        break;
    }
#undef CHECK_VALIDATION_FOR_DECIMALV3
    case TYPE_ARRAY: {
        const auto* column_array =
                assert_cast<const vectorized::ColumnArray*>(real_column_ptr.get());
        const auto* type_array =
                assert_cast<const vectorized::DataTypeArray*>(remove_nullable(type).get());
        auto nested_type = type_array->get_nested_type();
        const auto& offsets = column_array->get_offsets();
        // Map every flattened element to the block row of the array that contains it.
        vectorized::IColumn::Permutation permutation(offsets.back());
        for (size_t r = 0; r < row_count; ++r) {
            for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
                permutation[c] = rows ? (*rows)[r] : r;
            }
        }
        fmt::format_to(error_prefix, "ARRAY type failed: ");
        auto data_column_ptr = column_array->get_data_ptr();
        switch (nested_type->get_primitive_type()) {
        case TYPE_CHAR:
        case TYPE_VARCHAR:
        case TYPE_STRING: {
            RETURN_IF_ERROR(
                    string_column_checker(data_column_ptr, nested_type, &permutation, _filter_map));
            const_cast<vectorized::ColumnArray*>(column_array)->get_data_ptr() =
                    std::move(data_column_ptr);
            break;
        }
        default:
            RETURN_IF_ERROR(_validate_column(state, block, nested_type, data_column_ptr, slot_index,
                                             error_prefix, permutation.size(), &permutation));
            break;
        }
        break;
    }
    case TYPE_MAP: {
        const auto* column_map = assert_cast<const vectorized::ColumnMap*>(real_column_ptr.get());
        // column_map utilizes the ColumnPtr from the block* block in _validate_data and can be modified.
        RETURN_IF_ERROR((const_cast<ColumnMap*>(column_map))->deduplicate_keys(true));
        const auto* type_map =
                assert_cast<const vectorized::DataTypeMap*>(remove_nullable(type).get());
        auto key_type = type_map->get_key_type();
        auto val_type = type_map->get_value_type();
        const auto& offsets = column_map->get_offsets();
        // Map every flattened key/value entry to the block row of the map that contains it.
        vectorized::IColumn::Permutation permutation(offsets.back());
        for (size_t r = 0; r < row_count; ++r) {
            for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
                permutation[c] = rows ? (*rows)[r] : r;
            }
        }
        fmt::format_to(error_prefix, "MAP type failed: ");
        switch (key_type->get_primitive_type()) {
        case TYPE_CHAR:
        case TYPE_VARCHAR:
        case TYPE_STRING: {
            auto key_column_ptr = column_map->get_keys_ptr();
            RETURN_IF_ERROR(
                    string_column_checker(key_column_ptr, key_type, &permutation, _filter_map));
            const_cast<vectorized::ColumnMap*>(column_map)->get_keys_ptr() =
                    std::move(key_column_ptr);
            break;
        }
        default:
            RETURN_IF_ERROR(_validate_column(state, block, key_type, column_map->get_keys_ptr(),
                                             slot_index, error_prefix, permutation.size(),
                                             &permutation));
            break;
        }
        switch (val_type->get_primitive_type()) {
        case TYPE_CHAR:
        case TYPE_VARCHAR:
        case TYPE_STRING: {
            auto value_column_ptr = column_map->get_values_ptr();
            RETURN_IF_ERROR(
                    string_column_checker(value_column_ptr, val_type, &permutation, _filter_map));
            const_cast<vectorized::ColumnMap*>(column_map)->get_values_ptr() =
                    std::move(value_column_ptr);
            break;
        }
        default:
            RETURN_IF_ERROR(_validate_column(state, block, val_type, column_map->get_values_ptr(),
                                             slot_index, error_prefix, permutation.size(),
                                             &permutation));
            break;
        }
        break;
    }
    case TYPE_STRUCT: {
        const auto column_struct =
                assert_cast<const vectorized::ColumnStruct*>(real_column_ptr.get());
        const auto* type_struct =
                assert_cast<const vectorized::DataTypeStruct*>(remove_nullable(type).get());
        DCHECK(type_struct->get_elements().size() == column_struct->tuple_size());
        fmt::format_to(error_prefix, "STRUCT type failed: ");
        for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
            auto element_type = type_struct->get_element(sc);
            // Each element column has exactly one entry per struct value, so the struct's
            // own row mapping applies unchanged. Forwarding `rows` (instead of nullptr)
            // keeps _filter_map indexed by block row when this struct is nested in an
            // ARRAY or MAP, where the element columns can be longer than the block.
            switch (element_type->get_primitive_type()) {
            case TYPE_CHAR:
            case TYPE_VARCHAR:
            case TYPE_STRING: {
                auto element_column_ptr = column_struct->get_column_ptr(sc);
                RETURN_IF_ERROR(string_column_checker(element_column_ptr, element_type, rows,
                                                      _filter_map));
                const_cast<vectorized::ColumnStruct*>(column_struct)->get_column_ptr(sc) =
                        std::move(element_column_ptr);
                break;
            }
            default:
                RETURN_IF_ERROR(_validate_column(state, block, element_type,
                                                 column_struct->get_column_ptr(sc), slot_index,
                                                 error_prefix,
                                                 column_struct->get_column_ptr(sc)->size(), rows));
                break;
            }
        }
        break;
    }
    case TYPE_AGG_STATE: {
        auto* column_string = vectorized::check_and_get_column<ColumnString>(*real_column_ptr);
        if (column_string) {
            RETURN_IF_ERROR(string_column_checker(column, type, rows, _filter_map));
        }
        break;
    }
    default:
        break;
    }

    // Reject NULL values in columns that must not contain them. Only two cases:
    // 1. the column is nullable but the destination type is not nullable;
    // 2. the destination type is BITMAP.
    if ((!type->is_nullable() || type->get_primitive_type() == TYPE_BITMAP) && column_ptr) {
        for (int j = 0; j < row_count; ++j) {
            auto row = rows ? (*rows)[j] : j;
            if (null_map[j] && !_filter_map[row]) {
                fmt::format_to(error_msg, "null value for not null column, type={}",
                               type->get_name());
                RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
            }
        }
    }
    return Status::OK();
}
/// Validates every output slot column of `block` and marks invalid rows in `_filter_map`.
///
/// Const columns are expanded to full columns first. Each column's errors are prefixed
/// with "column_name[<name>], ".
///
/// @param rows           number of rows covered by `_filter_map`.
/// @param filtered_rows  set to the number of rows flagged in `_filter_map[0, rows)`.
///                       It is computed on every exit path, including error returns.
Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, vectorized::Block* block,
                                               const size_t rows, int& filtered_rows) {
    filtered_rows = 0;
    Defer count_filtered {[&] {
        for (size_t row = 0; row < rows; ++row) {
            filtered_rows += _filter_map[row];
        }
    }};

    const auto& slots = _output_tuple_desc->slots();
    for (size_t slot_idx = 0; slot_idx < slots.size(); ++slot_idx) {
        SlotDescriptor* slot_desc = slots[slot_idx];
        auto& entry = block->get_by_position(slot_idx);
        entry.column = entry.column->convert_to_full_column_if_const();

        fmt::memory_buffer error_prefix;
        fmt::format_to(error_prefix, "column_name[{}], ", slot_desc->col_name());
        RETURN_IF_ERROR(_validate_column(state, block, slot_desc->type(), entry.column, slot_idx,
                                         error_prefix, rows));
    }
    return Status::OK();
}
/// Aligns the nullability of each block column with its destination slot.
///
/// If a slot is nullable but the column is not, the column and its type are wrapped as
/// nullable. If the column is nullable but the slot is not, the nested (non-nullable)
/// column and type are unwrapped. Only the first min(#slots, #columns) positions are
/// touched.
void OlapTableBlockConvertor::_convert_to_dest_desc_block(doris::vectorized::Block* block) {
    const auto& slots = _output_tuple_desc->slots();
    const size_t common = std::min<size_t>(slots.size(), block->columns());
    for (size_t pos = 0; pos < common; ++pos) {
        auto& entry = block->get_by_position(pos);
        const bool slot_nullable = slots[pos]->is_nullable();
        if (slot_nullable == entry.type->is_nullable()) {
            continue;
        }
        if (slot_nullable) {
            entry.type = vectorized::make_nullable(entry.type);
            entry.column = vectorized::make_nullable(entry.column);
            continue;
        }
        entry.type = assert_cast<const vectorized::DataTypeNullable&>(*entry.type)
                             .get_nested_type();
        entry.column = assert_cast<const vectorized::ColumnNullable&>(*entry.column)
                               .get_nested_column_ptr();
    }
}
/// Replaces the auto-increment column at `_auto_inc_col_idx` with a non-nullable Int64
/// column. NULL inputs get freshly generated ids; non-NULL inputs keep their value.
///
/// - Const column (literal input): either every row gets a generated id (NULL literal)
///   or every row repeats the literal value.
/// - Nullable column: only NULL rows get generated ids.
/// - Any other column type is left untouched.
///
/// @return an error if requesting ids from the auto-inc buffer fails.
Status OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, size_t rows) {
    const size_t col_idx = _auto_inc_col_idx.value();
    SlotDescriptor* slot = _output_tuple_desc->slots()[col_idx];
    DCHECK(slot->type()->get_primitive_type() == PrimitiveType::TYPE_BIGINT);
    DCHECK(!slot->is_nullable());

    // Requests `count` ids from the shared buffer and feeds the returned ranges to the
    // local allocator, from which next_id() then draws.
    auto acquire_ids = [this](size_t count) -> Status {
        std::vector<std::pair<int64_t, size_t>> ranges;
        RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(count, &ranges));
        for (const auto& [start, length] : ranges) {
            _auto_inc_id_allocator.insert_ids(start, length);
        }
        return Status::OK();
    };

    auto filled_column = vectorized::ColumnInt64::create();
    vectorized::ColumnInt64::Container& filled = filled_column->get_data();
    vectorized::ColumnPtr source = block->get_by_position(col_idx).column;

    const auto* const_source = check_and_get_column<vectorized::ColumnConst>(source.get());
    const auto* nullable_source =
            const_source != nullptr
                    ? nullptr
                    : check_and_get_column<vectorized::ColumnNullable>(source.get());
    if (const_source == nullptr && nullable_source == nullptr) {
        return Status::OK();
    }

    if (const_source != nullptr) {
        // Statements like "insert into tbl1 select null,col1,... from tbl2" or
        // "insert into tbl1 select 1,col1,... from tbl2" produce a ColumnConst here.
        if (const_source->is_null_at(0)) {
            // All inputs are NULL literals: every row gets a generated id.
            RETURN_IF_ERROR(acquire_ids(rows));
            for (size_t i = 0; i < rows; i++) {
                filled.emplace_back(_auto_inc_id_allocator.next_id());
            }
        } else {
            // All inputs are the same int64 literal: repeat it.
            int64_t literal = const_source->get_int(0);
            filled.resize_fill(rows, literal);
        }
    } else {
        auto nested = nullable_source->get_nested_column_ptr();
        const auto& is_null = nullable_source->get_null_map_data();
        filled.reserve(rows);
        size_t null_rows = 0;
        for (size_t i = 0; i < rows; i++) {
            null_rows += is_null[i];
        }
        RETURN_IF_ERROR(acquire_ids(null_rows));
        for (size_t i = 0; i < rows; i++) {
            if (is_null[i] != 0) {
                filled.emplace_back(_auto_inc_id_allocator.next_id());
            } else {
                filled.emplace_back(nested->get_int(i));
            }
        }
    }

    block->get_by_position(col_idx).column = std::move(filled_column);
    block->get_by_position(col_idx).type = remove_nullable(slot->type());
    return Status::OK();
}
/// For partial updates whose input omits the auto-increment column: appends a new
/// Int64 column named BeConsts::PARTIAL_UPDATE_AUTO_INC_COL holding one freshly
/// generated id per row.
///
/// @return an error if requesting ids from the auto-inc buffer fails.
Status OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(vectorized::Block* block,
                                                                   size_t rows) {
    auto id_column = vectorized::ColumnInt64::create();
    vectorized::ColumnInt64::Container& ids = id_column->get_data();

    // Every row needs a new id, so request exactly `rows` of them.
    std::vector<std::pair<int64_t, size_t>> id_ranges;
    RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(rows, &id_ranges));
    for (const auto& [start, length] : id_ranges) {
        _auto_inc_id_allocator.insert_ids(start, length);
    }

    for (size_t row = 0; row < rows; ++row) {
        ids.emplace_back(_auto_inc_id_allocator.next_id());
    }

    block->insert(vectorized::ColumnWithTypeAndName(std::move(id_column),
                                                    std::make_shared<DataTypeInt64>(),
                                                    BeConsts::PARTIAL_UPDATE_AUTO_INC_COL));
    return Status::OK();
}
} // namespace doris::vectorized