blob: 6ac1462b55f378b01ece9ae1e58cf7e434f872cd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Columns/ColumnVariant.cpp
// and modified by Doris
#include "vec/columns/column_variant.h"
#include <assert.h>
#include <fmt/core.h>
#include <fmt/format.h>
#include <glog/logging.h>
#include <parallel_hashmap/phmap.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <optional>
#include <sstream>
#include <unordered_map>
#include <vector>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "exprs/json_functions.h"
#include "olap/olap_common.h"
#include "runtime/jsonb_value.h"
#include "runtime/primitive_type.h"
#include "util/defer_op.h"
#include "util/jsonb_utils.h"
#include "util/simd/bits.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/common/arena.h"
#include "vec/common/assert_cast.h"
#include "vec/common/field_visitors.h"
#include "vec/common/schema_util.h"
#include "vec/common/string_buffer.hpp"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
#include "vec/data_types/convert_field_to_type.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nothing.h"
#include "vec/data_types/get_least_supertype.h"
#include "vec/json/path_in_data.h"
#ifdef __AVX2__
#include "util/jsonb_parser_simd.h"
#else
#include "util/jsonb_parser.h"
#endif
namespace doris::vectorized {
namespace {
// Builds `type` wrapped in `num_dimensions` levels of Array; every level
// (including the scalar) is made Nullable when `is_nullable` is set.
// INVALID_TYPE yields DataTypeNothing as the scalar.
DataTypePtr create_array_of_type(PrimitiveType type, size_t num_dimensions, bool is_nullable,
                                 int precision = -1, int scale = -1) {
    DataTypePtr result;
    if (type == PrimitiveType::INVALID_TYPE) {
        result = std::make_shared<DataTypeNothing>();
        if (is_nullable) {
            result = make_nullable(result);
        }
    } else {
        result = DataTypeFactory::instance().create_data_type(type, is_nullable, precision, scale);
    }
    for (size_t dim = 0; dim < num_dimensions; ++dim) {
        result = std::make_shared<DataTypeArray>(result);
        // wrap each array level with nullable as well
        if (is_nullable) {
            result = make_nullable(result);
        }
    }
    return result;
}
// Returns the innermost array's element type (which may itself be Nullable),
// or `type` unchanged when no Array wrapper is present.
DataTypePtr get_base_type_of_array(const DataTypePtr& type) {
    /// Work on raw pointers to avoid extra copying of type pointers.
    auto strip_nullable = [](const auto* t) {
        if (const auto* nullable = typeid_cast<const DataTypeNullable*>(t)) {
            return nullable->get_nested_type().get();
        }
        return t;
    };
    const DataTypeArray* innermost_array = nullptr;
    const auto* cursor = strip_nullable(type.get());
    while (const auto* as_array = typeid_cast<const DataTypeArray*>(cursor)) {
        innermost_array = as_array;
        cursor = strip_nullable(as_array->get_nested_type().get());
    }
    return innermost_array ? innermost_array->get_nested_type() : type;
}
// Counts how many Array wrappers enclose the scalar type, looking through
// Nullable at every level.
size_t get_number_of_dimensions(const IDataType& type) {
    auto strip_nullable = [](const auto* t) {
        if (const auto* nullable = typeid_cast<const DataTypeNullable*>(t)) {
            return nullable->get_nested_type().get();
        }
        return t;
    };
    size_t depth = 0;
    const auto* cursor = strip_nullable(&type);
    while (const auto* as_array = typeid_cast<const DataTypeArray*>(cursor)) {
        ++depth;
        cursor = strip_nullable(as_array->get_nested_type().get());
    }
    return depth;
}
} // namespace
// current nested level is 2, inside column object
constexpr int CURRENT_SERIALIZE_NESTING_LEVEL = 2;
DataTypeSerDeSPtr ColumnVariant::Subcolumn::generate_data_serdes(DataTypePtr type, bool is_root) {
    // For the root column, there is no path, so there is no need to add the
    // extra '"' quoting level — it serializes one nesting level shallower.
    const int nesting_level =
            is_root ? CURRENT_SERIALIZE_NESTING_LEVEL - 1 : CURRENT_SERIALIZE_NESTING_LEVEL;
    return type->get_serde(nesting_level);
}
// Wraps an existing materialized column as a single-part subcolumn; `type`
// becomes the least common type immediately.
ColumnVariant::Subcolumn::Subcolumn(MutableColumnPtr&& data_, DataTypePtr type, bool is_nullable_,
                                    bool is_root_)
        : least_common_type(type),
          is_nullable(is_nullable_),
          is_root(is_root_),
          num_rows(data_->size()) {
    data.push_back(std::move(data_));
    data_types.push_back(type);
    data_serdes.push_back(generate_data_serdes(type, is_root));
    // data / data_types / data_serdes are parallel vectors and must stay in sync
    DCHECK_EQ(data.size(), data_types.size());
    DCHECK_EQ(data.size(), data_serdes.size());
}
// Creates a subcolumn holding `size_` default rows. No column part is
// materialized yet; the rows are only counted in num_of_defaults_in_prefix.
ColumnVariant::Subcolumn::Subcolumn(size_t size_, bool is_nullable_, bool is_root_)
        : least_common_type(std::make_shared<DataTypeNothing>()),
          is_nullable(is_nullable_),
          num_of_defaults_in_prefix(size_),
          is_root(is_root_),
          num_rows(size_) {}
// Total number of logical rows, including tail defaults buffered in
// current_num_of_defaults that have not been materialized yet.
// (Dropped the redundant injected-class-name qualifier `Subcolumn::Subcolumn::`.)
size_t ColumnVariant::Subcolumn::size() const {
    return num_rows + current_num_of_defaults;
}
// Sum of the payload byte sizes of all materialized parts.
size_t ColumnVariant::Subcolumn::Subcolumn::byteSize() const {
    size_t total = 0;
    for (size_t i = 0; i < data.size(); ++i) {
        total += data[i]->byte_size();
    }
    return total;
}
// Sum of the allocated (capacity) bytes of all materialized parts.
size_t ColumnVariant::Subcolumn::Subcolumn::allocatedBytes() const {
    size_t total = 0;
    for (size_t i = 0; i < data.size(); ++i) {
        total += data[i]->allocated_bytes();
    }
    return total;
}
// Inserts a field that already carries its type information, so FieldInfo is
// built from the supplied metadata instead of re-inspecting the value.
void ColumnVariant::Subcolumn::insert(FieldWithDataType field) {
    FieldInfo info;
    info.precision = field.precision;
    info.scale = field.scale;
    info.scalar_type_id = field.field->get_type();
    // if the field is nested type is valid, we need to use the base scalar type id
    // which means the field is array
    if (field.base_scalar_type_id != PrimitiveType::INVALID_TYPE) {
        info.scalar_type_id = field.base_scalar_type_id;
    }
    info.num_dimensions = field.num_dimensions;
    insert(std::move(*field.field), info);
}
// Inserts a field whose type info is derived by inspecting the value itself.
void ColumnVariant::Subcolumn::insert(Field field) {
    FieldInfo info;
    schema_util::get_field_info(field, &info);
    insert(std::move(field), info);
}
// Opens a new empty data part of `type` and promotes `type` to the subcolumn's
// least common type.
void ColumnVariant::Subcolumn::add_new_column_part(DataTypePtr type) {
    data.push_back(type->create_column());
    least_common_type = LeastCommonType {type, is_root};
    data_types.push_back(type);
    data_serdes.push_back(generate_data_serdes(type, is_root));
    // parallel vectors must stay in sync
    DCHECK_EQ(data.size(), data_types.size());
    DCHECK_EQ(data.size(), data_serdes.size());
}
// Inserts `field` described by `info`, widening the least common type when the
// incoming value's scalar type or dimensionality conflicts with the current one.
void ColumnVariant::Subcolumn::insert(Field field, FieldInfo info) {
    auto base_type = info.scalar_type_id;
    // A typeless scalar is just a default row.
    if (base_type == PrimitiveType::INVALID_TYPE && info.num_dimensions == 0) {
        insert_default();
        return;
    }
    ++num_rows;
    auto column_dim = least_common_type.get_dimensions();
    auto value_dim = info.num_dimensions;
    // A still-typeless column adopts the value's dimensionality, and a typeless
    // value adopts the column's.
    if (least_common_type.get_base()->get_primitive_type() == INVALID_TYPE) {
        column_dim = value_dim;
    }
    if (base_type == PrimitiveType::INVALID_TYPE) {
        value_dim = column_dim;
    }
    bool type_changed = false;
    // Mismatched dimensionality or nesting >= 2 cannot be kept as a plain array
    // type, so fall back to the most common type (JSONB).
    if (value_dim != column_dim || info.num_dimensions >= 2) {
        // Deduce to JSONB
        VLOG_DEBUG << fmt::format(
                "Dimension of types mismatched between inserted value and column, "
                "expected:{}, but meet:{} for type:{}",
                column_dim, value_dim, least_common_type.get()->get_name());
        base_type = MOST_COMMON_TYPE_ID;
        value_dim = 0;
        type_changed = true;
    }
    // Currently we support specify predefined schema for other types include decimal, datetime ...etc
    // so we should set specified info to create correct types, and those predefined types are static and
    // no conflict, so we can set them directly.
    auto base_data_type =
            create_array_of_type(base_type, value_dim, is_nullable, info.precision, info.scale);
    if (data.empty()) {
        add_new_column_part(base_data_type);
    } else if ((least_common_type.get_base_type_id() != base_type &&
                base_type != PrimitiveType::INVALID_TYPE) ||
               type_changed) {
        // Only integer-to-integer widening opens a new part here; the common
        // supertype is computed through the JSONB-aware resolver.
        if (schema_util::is_conversion_required_between_integers(
                    base_type, least_common_type.get_base_type_id())) {
            DataTypePtr least_type;
            get_least_supertype_jsonb(DataTypes {base_data_type, least_common_type.get()},
                                      &least_type);
            if (!least_type->equals(*base_data_type)) {
                type_changed = true;
            }
            add_new_column_part(least_type);
        }
    }
    // 1. type changed means encounter different type, we need to convert it to the least common type
    // 2. need_convert means the type is not the same as the least common type, we need to convert it
    if (type_changed || info.need_convert) {
        Field new_field;
        convert_field_to_type(field, *least_common_type.get(), &new_field);
        field = new_field;
    }
    data.back()->insert(field);
}
// Builds a nullable scalar of `type` wrapped in `num_dimensions` nullable
// array levels; INVALID_TYPE maps to DataTypeNothing with no wrapping.
static DataTypePtr create_array(PrimitiveType type, size_t num_dimensions) {
    if (type == PrimitiveType::INVALID_TYPE) {
        return std::make_shared<DataTypeNothing>();
    }
    auto result = DataTypeFactory::instance().create_data_type(type, true);
    size_t remaining = num_dimensions;
    while (remaining-- > 0) {
        result = make_nullable(std::make_shared<DataTypeArray>(result));
    }
    return result;
}
// Builds an empty Array field nested (num_dimensions - 1) levels deep; throws
// for zero dimensions.
Array create_empty_array_field(size_t num_dimensions) {
    if (num_dimensions == 0) {
        throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
                               "Cannot create array field with 0 dimensions");
    }
    Array array;
    Array* current_array = &array;
    for (size_t i = 1; i < num_dimensions; ++i) {
        // NOTE(review): a TYPE_NULL field is pushed and then accessed as Array&.
        // Presumably Doris' Field permits this; the ClickHouse original pushes
        // an empty Array here instead — confirm against Field::get semantics.
        current_array->push_back(Field::create_field<TYPE_NULL>(Null()));
        current_array = &current_array->back().get<Array&>();
    }
    return array;
}
// Recreates column with default scalar values and keeps sizes of arrays.
static ColumnPtr recreate_column_with_default_values(const ColumnPtr& column,
                                                     PrimitiveType scalar_type,
                                                     size_t num_dimensions) {
    const auto* column_array = check_and_get_column<ColumnArray>(remove_nullable(column).get());
    if (column_array != nullptr && num_dimensions != 0) {
        // Recurse into the nested data column while reusing the original
        // offsets, so per-row array lengths are preserved.
        return make_nullable(ColumnArray::create(
                recreate_column_with_default_values(column_array->get_data_ptr(), scalar_type,
                                                    num_dimensions - 1),
                IColumn::mutate(column_array->get_offsets_ptr())));
    }
    // Leaf level: build a default-valued column of the target type with the
    // same row count.
    return create_array(scalar_type, num_dimensions)
            ->create_column()
            ->clone_resized(column->size());
}
// Returns a copy of this subcolumn whose parts are replaced by default values
// of the type described in `field_info`, preserving each part's array sizes.
ColumnVariant::Subcolumn ColumnVariant::Subcolumn::clone_with_default_values(
        const FieldInfo& field_info) const {
    Subcolumn new_subcolumn(*this);
    new_subcolumn.least_common_type = LeastCommonType {
            create_array(field_info.scalar_type_id, field_info.num_dimensions), is_root};
    // size_t index: avoids the signed/unsigned comparison the old `int i` made
    // against data.size().
    for (size_t i = 0; i < new_subcolumn.data.size(); ++i) {
        new_subcolumn.data[i] = recreate_column_with_default_values(
                new_subcolumn.data[i], field_info.scalar_type_id, field_info.num_dimensions);
        new_subcolumn.data_types[i] = create_array_of_type(field_info.scalar_type_id,
                                                           field_info.num_dimensions, is_nullable);
        // serdes must mirror data_types; parts are never the root serde here
        new_subcolumn.data_serdes[i] = generate_data_serdes(new_subcolumn.data_types[i], false);
    }
    return new_subcolumn;
}
// Returns the most recently inserted field, or an empty Field when no part
// has been materialized yet.
Field ColumnVariant::Subcolumn::get_last_field() const {
    if (data.empty()) {
        return Field();
    }
    const auto& tail = data.back();
    assert(!tail->empty());
    return (*tail)[tail->size() - 1];
}
// Appends rows [start, start + length) of `src` to this subcolumn, first
// widening this subcolumn's least common type if the two types disagree.
void ColumnVariant::Subcolumn::insert_range_from(const Subcolumn& src, size_t start,
                                                 size_t length) {
    if (start + length > src.size()) {
        throw doris::Exception(
                ErrorCode::OUT_OF_BOUND,
                "Invalid range for insert_range_from: start={}, length={}, src.size={}", start,
                length, src.size());
    }
    size_t end = start + length;
    num_rows += length;
    // Make sure the destination has a part typed widely enough for src's data.
    if (data.empty()) {
        add_new_column_part(src.get_least_common_type());
    } else if (!least_common_type.get()->equals(*src.get_least_common_type())) {
        DataTypePtr new_least_common_type;
        get_least_supertype_jsonb(DataTypes {least_common_type.get(), src.get_least_common_type()},
                                  &new_least_common_type);
        if (!new_least_common_type->equals(*least_common_type.get())) {
            add_new_column_part(std::move(new_least_common_type));
        }
    }
    // The whole range falls inside src's default prefix: emit defaults only.
    if (end <= src.num_of_defaults_in_prefix) {
        data.back()->insert_many_defaults(length);
        return;
    }
    // Emit the leading piece of the range that overlaps src's default prefix.
    if (start < src.num_of_defaults_in_prefix) {
        data.back()->insert_many_defaults(src.num_of_defaults_in_prefix - start);
    }
    // Appends rows [from, from + n) of one src part, casting to this
    // subcolumn's least common type when the part's type differs.
    auto insert_from_part = [&](const auto& column, const auto& column_type, size_t from,
                                size_t n) {
        if (from + n > column->size()) {
            throw doris::Exception(
                    ErrorCode::OUT_OF_BOUND,
                    "Invalid range for insert_range_from: from={}, n={}, column.size={}", from, n,
                    column->size());
        }
        if (column_type->equals(*least_common_type.get())) {
            data.back()->insert_range_from(*column, from, n);
            return;
        }
        /// If we need to insert large range, there is no sense to cut part of column and cast it.
        /// Casting of all column and inserting from it can be faster.
        /// Threshold is just a guess.
        if (n * 3 >= column->size()) {
            ColumnPtr casted_column;
            Status st = schema_util::cast_column({column, column_type, ""}, least_common_type.get(),
                                                 &casted_column);
            if (!st.ok()) {
                throw doris::Exception(ErrorCode::INVALID_ARGUMENT, st.to_string());
            }
            data.back()->insert_range_from(*casted_column, from, n);
            return;
        }
        // Small range: cut just the slice, then cast it.
        auto casted_column = column->cut(from, n);
        Status st = schema_util::cast_column({casted_column, column_type, ""},
                                             least_common_type.get(), &casted_column);
        if (!st.ok()) {
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, st.to_string());
        }
        data.back()->insert_range_from(*casted_column, 0, n);
    };
    size_t pos = 0;
    size_t processed_rows = src.num_of_defaults_in_prefix;
    /// Find the first part of the column that intersects the range.
    while (pos < src.data.size() && processed_rows + src.data[pos]->size() < start) {
        processed_rows += src.data[pos]->size();
        ++pos;
    }
    /// Insert from the first part of column.
    if (pos < src.data.size() && processed_rows < start) {
        size_t part_start = start - processed_rows;
        size_t part_length = std::min(src.data[pos]->size() - part_start, end - start);
        insert_from_part(src.data[pos], src.data_types[pos], part_start, part_length);
        processed_rows += src.data[pos]->size();
        ++pos;
    }
    /// Insert from the parts of column in the middle of range.
    while (pos < src.data.size() && processed_rows + src.data[pos]->size() < end) {
        insert_from_part(src.data[pos], src.data_types[pos], 0, src.data[pos]->size());
        processed_rows += src.data[pos]->size();
        ++pos;
    }
    /// Insert from the last part of column if needed.
    if (pos < src.data.size() && processed_rows < end) {
        size_t part_end = end - processed_rows;
        insert_from_part(src.data[pos], src.data_types[pos], 0, part_end);
    }
}
// Finalized means: no buffered defaults anywhere and at most one data part.
bool ColumnVariant::Subcolumn::is_finalized() const {
    return current_num_of_defaults == 0 && num_of_defaults_in_prefix == 0 && data.size() <= 1;
}
// Grows with default rows or shrinks via pop_back until exactly n rows remain.
void ColumnVariant::Subcolumn::resize(size_t n) {
    if (n > num_rows) {
        insert_many_defaults(n - num_rows);
    } else if (n < num_rows) {
        pop_back(num_rows - n);
    }
    // n == num_rows: already the requested size, nothing to do
}
// Applies `func` (ColumnPtr -> ColumnPtr) to the root, every subcolumn and the
// sparse column, producing a new ColumnVariant from the results. Works on a
// finalized copy when this column is not finalized yet.
template <typename Func>
MutableColumnPtr ColumnVariant::apply_for_columns(Func&& func) const {
    if (!is_finalized()) {
        auto finalized = clone_finalized();
        auto& finalized_object = assert_cast<ColumnVariant&>(*finalized);
        return finalized_object.apply_for_columns(std::forward<Func>(func));
    }
    auto new_root = func(get_root())->assume_mutable();
    auto res = ColumnVariant::create(_max_subcolumns_count, get_root_type(), std::move(new_root));
    for (const auto& subcolumn : subcolumns) {
        // the root was already handled above
        if (subcolumn->data.is_root) {
            continue;
        }
        auto new_subcolumn = func(subcolumn->data.get_finalized_column_ptr());
        if (!res->add_sub_column(subcolumn->path, new_subcolumn->assume_mutable(),
                                 subcolumn->data.get_least_common_type())) {
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "add path {} is error",
                                   subcolumn->path.get_path());
        }
    }
    auto sparse_column = func(serialized_sparse_column);
    res->serialized_sparse_column = sparse_column->assume_mutable();
    // func may change the row count (e.g. a filter), so derive it from the result
    res->num_rows = res->serialized_sparse_column->size();
    ENABLE_CHECK_CONSISTENCY(res.get());
    return res;
}
// Resizes the whole variant to n rows: grows with default rows, or truncates
// every subcolumn and the sparse column in lockstep.
void ColumnVariant::resize(size_t n) {
    if (n == num_rows) {
        return;
    }
    if (n > num_rows) {
        insert_many_defaults(n - num_rows);
    } else {
        for (auto& subcolumn : subcolumns) {
            subcolumn->data.pop_back(num_rows - n);
        }
        serialized_sparse_column->pop_back(num_rows - n);
    }
    num_rows = n;
    ENABLE_CHECK_CONSISTENCY(this);
}
// Collapses all parts (and buffered defaults) into a single column of the
// least common type. In WRITE_MODE the root is forced to MostCommonType.
void ColumnVariant::Subcolumn::finalize(FinalizeMode mode) {
    // Materialize any defaults buffered at the tail first.
    if (current_num_of_defaults) {
        insert_many_defaults(current_num_of_defaults);
        current_num_of_defaults = 0;
    }
    // Fast path: already a single part with no default prefix.
    if (!is_root && data.size() == 1 && num_of_defaults_in_prefix == 0) {
        data[0] = data[0]->convert_to_full_column_if_const();
        return;
    }
    DataTypePtr to_type = least_common_type.get();
    if (mode == FinalizeMode::WRITE_MODE && is_root) {
        // Root always JSONB type in write mode
        to_type = is_nullable ? make_nullable(std::make_shared<MostCommonType>())
                              : std::make_shared<MostCommonType>();
        least_common_type = LeastCommonType {to_type, is_root};
    }
    auto result_column = to_type->create_column();
    // The default prefix becomes leading default rows of the result.
    if (num_of_defaults_in_prefix) {
        result_column->insert_many_defaults(num_of_defaults_in_prefix);
    }
    for (size_t i = 0; i < data.size(); ++i) {
        auto& part = data[i];
        auto from_type = data_types[i];
        part = part->convert_to_full_column_if_const();
        size_t part_size = part->size();
        // Cast parts whose type differs from the target before appending.
        if (!from_type->equals(*to_type)) {
            ColumnPtr ptr;
            Status st = schema_util::cast_column({part, from_type, ""}, to_type, &ptr);
            if (!st.ok()) {
                throw doris::Exception(ErrorCode::INVALID_ARGUMENT, st.to_string());
            }
            part = ptr->convert_to_full_column_if_const();
        }
        result_column->insert_range_from(*part, 0, part_size);
    }
    // Replace the part lists with the single merged part.
    data = {std::move(result_column)};
    data_types = {std::move(to_type)};
    data_serdes = {(generate_data_serdes(data_types[0], is_root))};
    DCHECK_EQ(data.size(), data_types.size());
    DCHECK_EQ(data.size(), data_serdes.size());
    num_of_defaults_in_prefix = 0;
}
// Appends one default row. Defaults arriving before the first materialized
// part are only counted, not stored.
void ColumnVariant::Subcolumn::insert_default() {
    if (!data.empty()) {
        data.back()->insert_default();
    } else {
        ++num_of_defaults_in_prefix;
    }
    ++num_rows;
}
// Appends `length` default rows; see insert_default() for the prefix handling.
void ColumnVariant::Subcolumn::insert_many_defaults(size_t length) {
    if (!data.empty()) {
        data.back()->insert_many_defaults(length);
    } else {
        num_of_defaults_in_prefix += length;
    }
    num_rows += length;
}
// Removes the last n rows. Whole parts that become empty are dropped from the
// back; anything still left of n after all parts are gone comes out of the
// default prefix.
void ColumnVariant::Subcolumn::pop_back(size_t n) {
    if (n > size()) {
        throw doris::Exception(ErrorCode::OUT_OF_BOUND,
                               "Invalid number of elements to pop: {}, size: {}", n, size());
    }
    num_rows -= n;
    size_t num_removed = 0;
    // Walk parts back-to-front, truncating or discarding until n rows are gone.
    for (auto it = data.rbegin(); it != data.rend(); ++it) {
        if (n == 0) {
            break;
        }
        auto& column = *it;
        if (n < column->size()) {
            column->pop_back(n);
            n = 0;
        } else {
            ++num_removed;
            n -= column->size();
        }
    }
    size_t sz = data.size() - num_removed;
    data.resize(sz);
    data_types.resize(sz);
    data_serdes.resize(sz);
    // need to update least_common_type when pop_back a column from the last
    least_common_type = sz > 0 ? LeastCommonType {data_types[sz - 1]}
                               : LeastCommonType {std::make_shared<DataTypeNothing>()};
    // Remaining n (rows not covered by any part) shrinks the default prefix.
    num_of_defaults_in_prefix -= n;
}
// Mutable access to the single merged part; valid only after finalize().
IColumn& ColumnVariant::Subcolumn::get_finalized_column() {
    if (is_finalized()) {
        return *data[0];
    }
    throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn is not finalized");
}
// Read-only access to the single merged part; valid only after finalize().
const IColumn& ColumnVariant::Subcolumn::get_finalized_column() const {
    if (is_finalized()) {
        return *data[0];
    }
    throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn is not finalized");
}
// Shared-pointer (const) access to the single merged part; finalize() first.
const ColumnPtr& ColumnVariant::Subcolumn::get_finalized_column_ptr() const {
    if (is_finalized()) {
        return data[0];
    }
    throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn is not finalized");
}
// Shared-pointer (mutable) access to the single merged part; finalize() first.
ColumnPtr& ColumnVariant::Subcolumn::get_finalized_column_ptr() {
    if (is_finalized()) {
        return data[0];
    }
    throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn is not finalized");
}
// Strips the Nullable wrapper from the single finalized part and from the
// cached least common type.
void ColumnVariant::Subcolumn::remove_nullable() {
    if (!is_finalized()) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn is not finalized");
    }
    data[0] = doris::vectorized::remove_nullable(data[0]);
    least_common_type.remove_nullable();
}
// Caches the derived properties of a least common type: its base (innermost
// scalar) type, number of array dimensions, serde and primitive-type ids.
// Note: member `type` is declared before `base_type`, so reading it in the
// initializer list after std::move(type_) is well-defined.
ColumnVariant::Subcolumn::LeastCommonType::LeastCommonType(DataTypePtr type_, bool is_root)
        : type(std::move(type_)),
          base_type(get_base_type_of_array(type)),
          num_dimensions(get_number_of_dimensions(*type)) {
    least_common_type_serder = Subcolumn::generate_data_serdes(type, is_root);
    type_id = type->get_primitive_type();
    base_type_id = base_type->get_primitive_type();
}
// Creates an empty variant containing only an empty root subcolumn.
ColumnVariant::ColumnVariant(int32_t max_subcolumns_count)
        : is_nullable(true), num_rows(0), _max_subcolumns_count(max_subcolumns_count) {
    subcolumns.create_root(Subcolumn(0, is_nullable, true /*root*/));
    ENABLE_CHECK_CONSISTENCY(this);
}
// Creates a variant with a pre-populated root column; the sparse column is
// resized to match the root's row count. num_rows is captured from
// root_column before the move in the body runs.
ColumnVariant::ColumnVariant(int32_t max_subcolumns_count, DataTypePtr root_type,
                             MutableColumnPtr&& root_column)
        : is_nullable(true),
          num_rows(root_column->size()),
          _max_subcolumns_count(max_subcolumns_count) {
    subcolumns.create_root(
            Subcolumn(std::move(root_column), root_type, is_nullable, true /*root*/));
    serialized_sparse_column->resize(num_rows);
    ENABLE_CHECK_CONSISTENCY(this);
}
// Takes ownership of a pre-built subcolumn tree; row count is derived from the
// first subcolumn. Throws when the tree exceeds max_subcolumns_count (+1 for
// the root).
ColumnVariant::ColumnVariant(int32_t max_subcolumns_count, Subcolumns&& subcolumns_)
        : is_nullable(true),
          subcolumns(std::move(subcolumns_)),
          num_rows(subcolumns.empty() ? 0 : (*subcolumns.begin())->data.size()),
          _max_subcolumns_count(max_subcolumns_count) {
    // BUG FIX: check the member `subcolumns`. The parameter `subcolumns_` was
    // moved from in the initializer list above, so its size() here was
    // unspecified (typically 0) and the limit check never fired.
    if (max_subcolumns_count && subcolumns.size() > max_subcolumns_count + 1) {
        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
                               "unmatched max subcolumns count, max subcolumns count: {}, but "
                               "subcolumns count: {}",
                               max_subcolumns_count, subcolumns.size());
    }
    serialized_sparse_column->resize(num_rows);
}
// Creates a variant pre-filled with `size` default (null) rows.
ColumnVariant::ColumnVariant(int32_t max_subcolumns_count, size_t size)
        : is_nullable(true), num_rows(0), _max_subcolumns_count(max_subcolumns_count) {
    subcolumns.create_root(Subcolumn(0, is_nullable, true /*root*/));
    insert_many_defaults(size);
    ENABLE_CHECK_CONSISTENCY(this);
}
// Validates the invariant that every subcolumn and the sparse column hold
// exactly num_rows rows; throws INTERNAL_ERROR on any mismatch.
// (Fixed the misspelled "expeted" and a stray ":," in the error messages.)
void ColumnVariant::check_consistency() const {
    CHECK(subcolumns.get_root() != nullptr);
    for (const auto& leaf : subcolumns) {
        if (num_rows != leaf->data.size()) {
            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
                                   "unmatched column: {}, expected rows: {}, but meet: {}",
                                   leaf->path.get_path(), num_rows, leaf->data.size());
        }
    }
    if (num_rows != serialized_sparse_column->size()) {
        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
                               "unmatched sparse column, expected rows: {}, but meet: {}", num_rows,
                               serialized_sparse_column->size());
    }
}
// Logical row count; num_rows is kept in sync by every mutation path.
size_t ColumnVariant::size() const {
    ENABLE_CHECK_CONSISTENCY(this);
    return num_rows;
}
// Returns a copy truncated or default-extended to new_size rows.
MutableColumnPtr ColumnVariant::clone_resized(size_t new_size) const {
    if (new_size == 0) {
        return ColumnVariant::create(_max_subcolumns_count);
    }
    // Take the ColumnPtr by const reference: the old by-value parameter copied
    // the shared_ptr (atomic refcount traffic) once per subcolumn.
    return apply_for_columns(
            [&](const ColumnPtr& column) { return column->clone_resized(new_size); });
}
// Payload bytes across all subcolumns plus the sparse column.
size_t ColumnVariant::byte_size() const {
    size_t total = serialized_sparse_column->byte_size();
    for (const auto& entry : subcolumns) {
        total += entry->data.byteSize();
    }
    return total;
}
// Allocated (capacity) bytes across all subcolumns plus the sparse column.
size_t ColumnVariant::allocated_bytes() const {
    size_t total = serialized_sparse_column->allocated_bytes();
    for (const auto& entry : subcolumns) {
        total += entry->data.allocatedBytes();
    }
    return total;
}
// Applies `callback` to every materialized part of every subcolumn and to the
// sparse column. The callback may replace or filter the columns, so num_rows
// is refreshed from the sparse column afterwards.
void ColumnVariant::for_each_subcolumn(ColumnCallback callback) {
    for (auto& entry : subcolumns) {
        for (auto& part : entry->data.data) {
            callback(part);
        }
    }
    callback(serialized_sparse_column);
    // callback may be filter, so the row count may be changed
    num_rows = serialized_sparse_column->size();
    ENABLE_CHECK_CONSISTENCY(this);
}
// Inserts row n of `src`, which must be a ColumnVariant.
// NOTE(review): check_and_get_column returns nullptr on a type mismatch and is
// dereferenced unchecked here — presumably callers guarantee the type; confirm.
void ColumnVariant::insert_from(const IColumn& src, size_t n) {
    const auto* src_v = check_and_get_column<ColumnVariant>(src);
    return try_insert((*src_v)[n]);
}
// Inserts one variant row: each key/value of the VariantMap goes to its
// subcolumn (creating it when absent); subcolumns that did not appear in this
// row get a default, and the sparse column gets a default entry.
void ColumnVariant::try_insert(const Field& field) {
    DCHECK(field.get_type() == PrimitiveType::TYPE_VARIANT ||
           field.get_type() == PrimitiveType::TYPE_NULL)
            << "not a variant field " << field.get_type();
    size_t old_size = size();
    const auto& object = field.get<const VariantMap&>();
    for (const auto& [key, value] : object) {
        if (!has_subcolumn(key)) {
            bool succ = add_sub_column(key, old_size);
            if (!succ) {
                throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
                                       "Failed to add sub column {}", key.get_path());
            }
        }
        auto* subcolumn = get_subcolumn(key);
        if (!subcolumn) {
            // BUG FIX: the exception object was constructed but never thrown,
            // so execution fell through to a null-pointer dereference below.
            throw doris::Exception(doris::ErrorCode::INVALID_ARGUMENT,
                                   fmt::format("Failed to find sub column {}", key.get_path()));
        }
        subcolumn->insert(value);
    }
    // Pad subcolumns that received nothing in this row with a default value.
    for (auto& entry : subcolumns) {
        if (old_size == entry->data.size()) {
            bool inserted = UNLIKELY(entry->path.has_nested_part() &&
                                     try_insert_default_from_nested(entry));
            if (!inserted) {
                entry->data.insert_default();
            }
        }
    }
    serialized_sparse_column->insert_default();
    ++num_rows;
    ENABLE_CHECK_CONSISTENCY(this);
}
// Appends a single default row across all subcolumns and the sparse column.
void ColumnVariant::insert_default() {
    for (auto& entry : subcolumns) {
        entry->data.insert_default();
    }
    serialized_sparse_column->insert_default();
    num_rows += 1;
    ENABLE_CHECK_CONSISTENCY(this);
}
// Appends `length` default rows to every subcolumn; the sparse column is
// grown via resize (appending default-constructed entries).
void ColumnVariant::insert_many_defaults(size_t length) {
    for (auto& entry : subcolumns) {
        entry->data.insert_many_defaults(length);
    }
    serialized_sparse_column->resize(num_rows + length);
    num_rows += length;
    ENABLE_CHECK_CONSISTENCY(this);
}
// Returns true when row n is NULL: rows in the default prefix are null, and a
// subcolumn that never saw a concrete type is all-null; otherwise the owning
// part's null map decides.
bool ColumnVariant::Subcolumn::is_null_at(size_t n) const {
    if (least_common_type.get_base_type_id() == PrimitiveType::INVALID_TYPE) {
        return true;
    }
    size_t ind = n;
    if (ind < num_of_defaults_in_prefix) {
        return true;
    }
    ind -= num_of_defaults_in_prefix;
    // Walk the parts to locate the one containing the row.
    for (const auto& part : data) {
        if (ind < part->size()) {
            const auto* nullable = check_and_get_column<ColumnNullable>(part.get());
            // A non-nullable part cannot contain NULL.
            return nullable ? nullable->is_null_at(ind) : false;
        }
        ind -= part->size();
    }
    throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting field is out of range",
                           n);
}
// Reads row n into `res` along with its concrete data type. Typeless
// subcolumns and rows in the default prefix yield an empty Field.
void ColumnVariant::Subcolumn::get(size_t n, FieldWithDataType& res) const {
    if (least_common_type.get_base_type_id() == PrimitiveType::INVALID_TYPE) {
        res = FieldWithDataType(Field());
        return;
    }
    // Finalized: a single part typed as the least common type.
    if (is_finalized()) {
        res = least_common_type.get()->get_field_with_data_type(*data[0], n);
        return;
    }
    size_t ind = n;
    if (ind < num_of_defaults_in_prefix) {
        res = FieldWithDataType(Field());
        return;
    }
    ind -= num_of_defaults_in_prefix;
    // Locate the part containing the row; each part keeps its own type.
    for (size_t i = 0; i < data.size(); ++i) {
        const auto& part = data[i];
        const auto& part_type = data_types[i];
        if (ind < part->size()) {
            res = part_type->get_field_with_data_type(*part, ind);
            return;
        }
        ind -= part->size();
    }
    throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting field is out of range",
                           n);
}
// Serializes row `row` of this subcolumn into the sparse key/value columns.
// NULL and default rows write nothing (the sparse format omits them).
void ColumnVariant::Subcolumn::serialize_to_sparse_column(ColumnString* key, std::string_view path,
                                                          ColumnString* value, size_t row) {
    // no need insert
    if (least_common_type.get_base_type_id() == PrimitiveType::INVALID_TYPE) {
        return;
    }
    // no need insert
    if (row < num_of_defaults_in_prefix) {
        return;
    }
    // remove default
    row -= num_of_defaults_in_prefix;
    for (size_t i = 0; i < data.size(); ++i) {
        const auto& part = data[i];
        // parts are stored as Nullable columns (cast unchecked in release)
        const auto& nullable_col =
                assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>(*part);
        size_t current_column_size = nullable_col.get_null_map_data().size();
        if (row < current_column_size) {
            // no need null in sparse column
            if (!nullable_col.is_null_at(row)) {
                // insert key
                key->insert_data(path.data(), path.size());
                // every subcolumn is always Nullable
                auto nullable_serde =
                        std::static_pointer_cast<DataTypeNullableSerDe>(data_serdes[i]);
                // insert value: append the binary cell then close the offset
                ColumnString::Chars& chars = value->get_chars();
                nullable_serde->get_nested_serde()->write_one_cell_to_binary(
                        nullable_col.get_nested_column(), chars, row);
                value->get_offsets().push_back(chars.size());
            }
            return;
        }
        row -= current_column_size;
    }
    throw doris::Exception(ErrorCode::OUT_OF_BOUND,
                           "Index ({}) for serialize to sparse column is out of range", row);
}
// Decodes one value from the sparse-column binary encoding. `data` points just
// past the one-byte type tag `type`; the decoded value is written to `res` and
// its type metadata (scalar type id, dimensions, precision/scale) to
// `info_res`. Returns the first byte after the consumed payload.
// Encoding: fixed-width types are raw in-memory copies; strings and JSONB are
// size_t-length-prefixed; arrays are a size_t count followed by tagged
// elements; decimals and datetimev2 carry precision/scale bytes first.
const char* parse_binary_from_sparse_column(FieldType type, const char* data, Field& res,
                                            FieldInfo& info_res) {
    info_res.scalar_type_id = TabletColumn::get_primitive_type_by_field_type(type);
    const char* end = data;
    switch (type) {
    case FieldType::OLAP_FIELD_TYPE_STRING: {
        // length-prefixed bytes
        size_t size = *reinterpret_cast<const size_t*>(data);
        data += sizeof(size_t);
        res = Field::create_field<TYPE_STRING>(String(data, size));
        end = data + size;
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_TINYINT: {
        res = Field::create_field<TYPE_TINYINT>(Int8(*reinterpret_cast<const Int8*>(data)));
        end = data + sizeof(Int8);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_SMALLINT: {
        res = Field::create_field<TYPE_SMALLINT>(Int16(*reinterpret_cast<const Int16*>(data)));
        end = data + sizeof(Int16);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_INT: {
        res = Field::create_field<TYPE_INT>(Int32(*reinterpret_cast<const Int32*>(data)));
        end = data + sizeof(Int32);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_BIGINT: {
        res = Field::create_field<TYPE_BIGINT>(Int64(*reinterpret_cast<const Int64*>(data)));
        end = data + sizeof(Int64);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_LARGEINT: {
        res = Field::create_field<TYPE_LARGEINT>(
                Int128(reinterpret_cast<const PackedInt128*>(data)->value));
        end = data + sizeof(PackedInt128);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_FLOAT: {
        res = Field::create_field<TYPE_FLOAT>(Float32(*reinterpret_cast<const Float32*>(data)));
        end = data + sizeof(Float32);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_DOUBLE: {
        res = Field::create_field<TYPE_DOUBLE>(Float64(*reinterpret_cast<const Float64*>(data)));
        end = data + sizeof(Float64);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_JSONB: {
        // length-prefixed JSONB document
        size_t size = *reinterpret_cast<const size_t*>(data);
        data += sizeof(size_t);
        res = Field::create_field<TYPE_JSONB>(JsonbField(data, size));
        end = data + size;
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_ARRAY: {
        // element count, then one (tag, payload) pair per element, decoded
        // recursively; dimensions accumulate across nesting levels
        const size_t size = *reinterpret_cast<const size_t*>(data);
        data += sizeof(size_t);
        res = Field::create_field<TYPE_ARRAY>(Array(size));
        auto& array = res.get<Array>();
        info_res.num_dimensions++;
        // tracks the last non-NONE element type [sic: "filed" is a long-standing typo]
        FieldType nested_filed_type = FieldType::OLAP_FIELD_TYPE_NONE;
        for (size_t i = 0; i < size; ++i) {
            Field nested_field;
            const auto nested_type =
                    static_cast<FieldType>(*reinterpret_cast<const uint8_t*>(data++));
            data = parse_binary_from_sparse_column(nested_type, data, nested_field, info_res);
            array[i] = std::move(nested_field);
            if (nested_type != FieldType::OLAP_FIELD_TYPE_NONE) {
                nested_filed_type = nested_type;
            }
        }
        // the array's scalar type is taken from its elements, not the array tag
        info_res.scalar_type_id = TabletColumn::get_primitive_type_by_field_type(nested_filed_type);
        end = data;
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_IPV4: {
        res = Field::create_field<TYPE_IPV4>(IPv4(*reinterpret_cast<const IPv4*>(data)));
        end = data + sizeof(IPv4);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_IPV6: {
        res = Field::create_field<TYPE_IPV6>(reinterpret_cast<const PackedUInt128*>(data)->value);
        end = data + sizeof(PackedUInt128);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_DATEV2: {
        res = Field::create_field<TYPE_DATEV2>(*reinterpret_cast<const UInt32*>(data));
        end = data + sizeof(UInt32);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_DATETIMEV2: {
        // one scale byte precedes the packed datetime value
        const uint8_t scale = *reinterpret_cast<const uint8_t*>(data);
        data += sizeof(uint8_t);
        res = Field::create_field<TYPE_DATETIMEV2>(*reinterpret_cast<const UInt64*>(data));
        info_res.precision = -1;
        info_res.scale = static_cast<int>(scale);
        end = data + sizeof(UInt64);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_DECIMAL32: {
        // precision and scale bytes precede the raw decimal value
        const uint8_t precision = *reinterpret_cast<const uint8_t*>(data);
        data += sizeof(uint8_t);
        const uint8_t scale = *reinterpret_cast<const uint8_t*>(data);
        data += sizeof(uint8_t);
        res = Field::create_field<TYPE_DECIMAL32>(Decimal32(*reinterpret_cast<const Int32*>(data)));
        info_res.precision = static_cast<int>(precision);
        info_res.scale = static_cast<int>(scale);
        end = data + sizeof(Int32);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_DECIMAL64: {
        const uint8_t precision = *reinterpret_cast<const uint8_t*>(data);
        data += sizeof(uint8_t);
        const uint8_t scale = *reinterpret_cast<const uint8_t*>(data);
        data += sizeof(uint8_t);
        res = Field::create_field<TYPE_DECIMAL64>(Decimal64(*reinterpret_cast<const Int64*>(data)));
        info_res.precision = static_cast<int>(precision);
        info_res.scale = static_cast<int>(scale);
        end = data + sizeof(Int64);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_DECIMAL128I: {
        const uint8_t precision = *reinterpret_cast<const uint8_t*>(data);
        data += sizeof(uint8_t);
        const uint8_t scale = *reinterpret_cast<const uint8_t*>(data);
        data += sizeof(uint8_t);
        res = Field::create_field<TYPE_DECIMAL128I>(
                Decimal128V3(reinterpret_cast<const PackedInt128*>(data)->value));
        info_res.precision = static_cast<int>(precision);
        info_res.scale = static_cast<int>(scale);
        end = data + sizeof(PackedInt128);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_DECIMAL256: {
        const uint8_t precision = *reinterpret_cast<const uint8_t*>(data);
        data += sizeof(uint8_t);
        const uint8_t scale = *reinterpret_cast<const uint8_t*>(data);
        data += sizeof(uint8_t);
        res = Field::create_field<TYPE_DECIMAL256>(
                Decimal256(*reinterpret_cast<const wide::Int256*>(data)));
        info_res.precision = static_cast<int>(precision);
        info_res.scale = static_cast<int>(scale);
        end = data + sizeof(wide::Int256);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_BOOL: {
        res = Field::create_field<TYPE_BOOLEAN>(*reinterpret_cast<const uint8_t*>(data));
        end = data + sizeof(uint8_t);
        break;
    }
    case FieldType::OLAP_FIELD_TYPE_NONE: {
        // NONE carries no payload: an empty field, nothing consumed
        res = Field();
        end = data;
        break;
    }
    default:
        throw doris::Exception(ErrorCode::OUT_OF_BOUND,
                               "Type ({}) for deserialize_from_sparse_column is invalid", type);
    }
    return end;
}
// Deserialize one sparse-column cell back into a Field plus its type info.
// Binary layout of a cell (the inverse of serialize_to_sparse_column):
//   [1-byte FieldType tag][type-specific payload ...]
// @param value the sparse "values" ColumnString
// @param row   row index inside `value`
// @return the decoded Field and a FieldInfo describing its scalar type id,
//         precision/scale (decimals, datetimev2) and array dimensionality
std::pair<Field, FieldInfo> ColumnVariant::deserialize_from_sparse_column(const ColumnString* value,
                                                                          size_t row) {
    const auto& data_ref = value->get_data_at(row);
    const char* data = data_ref.data;
    // Expect at least the 1-byte type tag plus some payload.
    DCHECK(data_ref.size > 1);
    const FieldType type = static_cast<FieldType>(*reinterpret_cast<const uint8_t*>(data++));
    Field res;
    FieldInfo info_res = {
            .scalar_type_id = TabletColumn::get_primitive_type_by_field_type(type),
            .have_nulls = false,
            .need_convert = false,
            .num_dimensions = 0,
    };
    const char* end = parse_binary_from_sparse_column(type, data, res, info_res);
    // The parser must consume the cell exactly — no trailing or missing bytes.
    DCHECK_EQ(end - data_ref.data, data_ref.size)
            << "FieldType: " << (int)type << " data_ref.size: " << data_ref.size << " end: " << end
            << " data: " << data;
    return {std::move(res), std::move(info_res)};
}
// Convenience accessor: materialize row n as a Field via get().
Field ColumnVariant::operator[](size_t n) const {
    Field field;
    get(n, field);
    return field;
}
// Materialize row `n` into `res` as a VariantMap of path -> typed value.
// Values are gathered from both the dense subcolumns and the serialized
// sparse column; null entries are skipped (null and absent are equivalent
// for Variant), and a row with no visible paths collapses to an empty Field.
// @throws doris::Exception(OUT_OF_BOUND) when n >= size()
void ColumnVariant::get(size_t n, Field& res) const {
    if (UNLIKELY(n >= size())) {
        throw doris::Exception(ErrorCode::OUT_OF_BOUND,
                               "Index ({}) for getting field is out of range for size {}", n,
                               size());
    }
    res = Field::create_field<TYPE_VARIANT>(VariantMap());
    auto& object = res.get<VariantMap&>();
    // Dense part: one map entry per materialized subcolumn.
    for (const auto& entry : subcolumns) {
        FieldWithDataType field;
        entry->data.get(n, field);
        // Notice: we treat null as empty field, since we do not distinguish null and empty for Variant type.
        if (field.field->get_type() == PrimitiveType::TYPE_NULL) {
            continue;
        }
        object.try_emplace(PathInData(entry->path), std::move(field));
    }
    // Sparse part: decode the binary-encoded [path, value] pairs of this row.
    const auto& [path, value] = get_sparse_data_paths_and_values();
    const auto& sparse_column_offsets = serialized_sparse_column_offsets();
    // NOTE(review): n - 1 wraps for n == 0; presumably the offsets array
    // supports index -1 via padding (ClickHouse-style PaddedPODArray) — confirm.
    size_t offset = sparse_column_offsets[n - 1];
    size_t end = sparse_column_offsets[n];
    // Iterator over [path, binary value]
    for (size_t i = offset; i != end; ++i) {
        const StringRef path_data = path->get_data_at(i);
        auto data = ColumnVariant::deserialize_from_sparse_column(value, i);
        object.try_emplace(
                PathInData(path_data),
                FieldWithDataType(std::move(data.first), data.second.precision, data.second.scale,
                                  data.second.scalar_type_id, data.second.num_dimensions));
    }
    // A fully-null row is represented as an empty Field.
    if (object.empty()) {
        res = Field();
    }
}
// Add a subcolumn whose path contains a Nested part (array-of-objects).
// If a sibling Nested subcolumn already exists, the new one is cloned from
// one of its leaves so array offsets stay aligned with the existing data;
// otherwise a subcolumn of `new_size` default rows is added.
// @throws doris::Exception on a non-nested path, a duplicate path, or a
//         row-count mismatch with the column
void ColumnVariant::add_nested_subcolumn(const PathInData& key, const FieldInfo& field_info,
                                         size_t new_size) {
    if (!key.has_nested_part()) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                               "Cannot add Nested subcolumn, because path doesn't contain Nested");
    }
    bool inserted = false;
    /// We find node that represents the same Nested type as @key.
    const auto* nested_node = subcolumns.find_best_match(key);
    // TODO a better way to handle following case:
    // {"a" : {"b" : [{"x" : 10}]}}
    // {"a" : {"b" : {"c": [{"y" : 10}]}}}
    // maybe a.b.c.y should not follow from a.b's nested data
    // If we have a nested subcolumn and it contains nested node in it's path
    if (nested_node &&
        Subcolumns::find_parent(nested_node, [](const auto& node) { return node.is_nested(); })) {
        /// Find any leaf of Nested subcolumn.
        const auto* leaf = Subcolumns::find_leaf(nested_node, [&](const auto&) { return true; });
        assert(leaf);
        /// Recreate subcolumn with default values and the same sizes of arrays.
        auto new_subcolumn = leaf->data.clone_with_default_values(field_info);
        /// It's possible that we have already inserted value from current row
        /// to this subcolumn. So, adjust size to expected.
        if (new_subcolumn.size() > new_size) {
            new_subcolumn.pop_back(new_subcolumn.size() - new_size);
        }
        assert(new_subcolumn.size() == new_size);
        inserted = subcolumns.add(key, new_subcolumn);
    } else {
        /// If node was not found just add subcolumn with empty arrays.
        inserted = subcolumns.add(key, Subcolumn(new_size, is_nullable));
    }
    if (!inserted) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Subcolumn '{}' already exists",
                               key.get_path());
    }
    // The first subcolumn fixes the row count; later ones must agree with it.
    if (num_rows == 0) {
        num_rows = new_size;
    } else if (new_size != num_rows) {
        throw doris::Exception(
                ErrorCode::INTERNAL_ERROR,
                "Required size of subcolumn {} ({}) is inconsistent with column size ({})",
                key.get_path(), new_size, num_rows);
    }
}
void ColumnVariant::set_num_rows(size_t n) {
num_rows = n;
}
// Try to register `path` as a new dense subcolumn.
// Typed and nested paths are always added. Other paths are added only while
// the dense budget (_max_subcolumns_count, 0 meaning unlimited) is not yet
// exhausted; a false return tells the caller to spill to the sparse column.
bool ColumnVariant::try_add_new_subcolumn(const PathInData& path) {
    DCHECK(_max_subcolumns_count >= 0) << "max subcolumns count is: " << _max_subcolumns_count;
    if (subcolumns.get_root() == nullptr || path.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "column object has no root or path is empty");
    }
    if (path.get_is_typed() || path.has_nested_part()) {
        return add_sub_column(path, num_rows);
    }
    // 1 for root, nested_path_count for nested path
    int subcolumns_size = subcolumns.size() - nested_path_count - 1;
    if (!_max_subcolumns_count || subcolumns_size < _max_subcolumns_count) {
        return add_sub_column(path, num_rows);
    }
    return false;
}
void ColumnVariant::insert_range_from(const IColumn& src, size_t start, size_t length) {
const auto& src_object = assert_cast<const ColumnVariant&>(src);
ENABLE_CHECK_CONSISTENCY(&src_object);
ENABLE_CHECK_CONSISTENCY(this);
// First, insert src subcolumns
// We can reach the limit of subcolumns, and in this case
// the rest of subcolumns from src will be inserted into sparse column.
std::map<std::string_view, Subcolumn> src_path_and_subcoumn_for_sparse_column;
int idx_hint = 0;
for (const auto& entry : src_object.subcolumns) {
// Check if we already have such dense column path.
if (auto* subcolumn = get_subcolumn(entry->path, idx_hint); subcolumn != nullptr) {
subcolumn->insert_range_from(entry->data, start, length);
} else if (try_add_new_subcolumn(entry->path)) {
subcolumn = get_subcolumn(entry->path);
DCHECK(subcolumn != nullptr);
subcolumn->insert_range_from(entry->data, start, length);
} else {
CHECK(!entry->path.has_nested_part());
src_path_and_subcoumn_for_sparse_column.emplace(entry->path.get_path(), entry->data);
}
++idx_hint;
}
// Paths in sparse column are sorted, so paths from src_dense_column_path_for_sparse_column should be inserted properly
// to keep paths sorted. Let's sort them in advance.
std::vector<std::pair<std::string_view, Subcolumn>> sorted_src_subcolumn_for_sparse_column;
auto it = src_path_and_subcoumn_for_sparse_column.begin();
auto end = src_path_and_subcoumn_for_sparse_column.end();
while (it != end) {
sorted_src_subcolumn_for_sparse_column.emplace_back(it->first, it->second);
++it;
}
insert_from_sparse_column_and_fill_remaing_dense_column(
src_object, std::move(sorted_src_subcolumn_for_sparse_column), start, length);
num_rows += length;
// finalize();
ENABLE_CHECK_CONSISTENCY(this);
}
// Append rows [start, start + length) of src's sparse column to ours, merging
// in `sorted_src_subcolumn_for_sparse_column` — src dense subcolumns (already
// sorted by path) that must be spilled here because the dense budget is full.
// Fast path: when src has no sparse entries in the range, only the spilled
// subcolumns (or plain defaults) are appended. Otherwise each row merges two
// sorted path streams, routing a src sparse path into a matching dense
// subcolumn when one exists. Dense subcolumns untouched by a row receive
// default values so all columns keep the same length.
void ColumnVariant::insert_from_sparse_column_and_fill_remaing_dense_column(
        const ColumnVariant& src,
        std::vector<std::pair<std::string_view, Subcolumn>>&&
                sorted_src_subcolumn_for_sparse_column,
        size_t start, size_t length) {
    /// Check if src object doesn't have any paths in serialized sparse column.
    const auto& src_serialized_sparse_column_offsets = src.serialized_sparse_column_offsets();
    // NOTE(review): start - 1 wraps when start == 0; presumably the offsets
    // array supports index -1 via padding (ClickHouse-style) — confirm.
    if (src_serialized_sparse_column_offsets[start - 1] ==
        src_serialized_sparse_column_offsets[start + length - 1]) {
        size_t current_size = num_rows;
        /// If no src subcolumns should be inserted into sparse column, insert defaults.
        if (sorted_src_subcolumn_for_sparse_column.empty()) {
            serialized_sparse_column->resize(num_rows + length);
        } else {
            // Otherwise insert required src dense columns into sparse column.
            auto [sparse_column_keys, sparse_column_values] = get_sparse_data_paths_and_values();
            auto& sparse_column_offsets = serialized_sparse_column_offsets();
            for (size_t i = start; i != start + length; ++i) {
                // Paths in sorted_src_subcolumn_for_sparse_column are already sorted.
                for (auto& [path, subcolumn] : sorted_src_subcolumn_for_sparse_column) {
                    subcolumn.serialize_to_sparse_column(sparse_column_keys, path,
                                                         sparse_column_values, i);
                }
                // TODO add dcheck
                sparse_column_offsets.push_back(sparse_column_keys->size());
            }
        }
        // Insert default values in all remaining dense columns.
        for (const auto& entry : subcolumns) {
            if (entry->data.size() == current_size) {
                entry->data.insert_many_defaults(length);
            }
        }
        return;
    }
    // Src object column contains some paths in serialized sparse column in specified range.
    // Iterate over this range and insert all required paths into serialized sparse column or subcolumns.
    const auto& [src_sparse_column_path, src_sparse_column_values] =
            src.get_sparse_data_paths_and_values();
    auto [sparse_column_path, sparse_column_values] = get_sparse_data_paths_and_values();
    auto& sparse_column_offsets = serialized_sparse_column_offsets();
    for (size_t row = start; row != start + length; ++row) {
        size_t current_size = sparse_column_offsets.size();
        // Use a separate index to iterate over the sorted sorted_src_subcolumn_for_sparse_column.
        size_t sorted_src_subcolumn_for_sparse_column_idx = 0;
        size_t sorted_src_subcolumn_for_sparse_column_size =
                sorted_src_subcolumn_for_sparse_column.size();
        size_t offset = src_serialized_sparse_column_offsets[row - 1];
        size_t end = src_serialized_sparse_column_offsets[row];
        // Iterator over [path, binary value]
        for (size_t i = offset; i != end; ++i) {
            const StringRef src_sparse_path_string = src_sparse_column_path->get_data_at(i);
            const std::string_view src_sparse_path(src_sparse_path_string);
            // Check if we have this path in subcolumns.
            const PathInData column_path(src_sparse_path);
            if (auto* subcolumn = get_subcolumn(column_path); subcolumn != nullptr) {
                // Deserialize binary value into subcolumn from src serialized sparse column data.
                const auto& data =
                        ColumnVariant::deserialize_from_sparse_column(src_sparse_column_values, i);
                subcolumn->insert(data.first, data.second);
            } else {
                // Before inserting this path into sparse column check if we need to
                // insert subcolumns from sorted_src_subcolumn_for_sparse_column before.
                while (sorted_src_subcolumn_for_sparse_column_idx <
                               sorted_src_subcolumn_for_sparse_column_size &&
                       sorted_src_subcolumn_for_sparse_column
                                       [sorted_src_subcolumn_for_sparse_column_idx]
                                               .first < src_sparse_path) {
                    auto& [src_path, src_subcolumn] = sorted_src_subcolumn_for_sparse_column
                            [sorted_src_subcolumn_for_sparse_column_idx++];
                    src_subcolumn.serialize_to_sparse_column(sparse_column_path, src_path,
                                                             sparse_column_values, row);
                }
                /// Insert path and value from src sparse column to our sparse column.
                sparse_column_path->insert_from(*src_sparse_column_path, i);
                sparse_column_values->insert_from(*src_sparse_column_values, i);
            }
        }
        // Insert remaining dynamic paths from src_dynamic_paths_for_sparse_data.
        while (sorted_src_subcolumn_for_sparse_column_idx <
               sorted_src_subcolumn_for_sparse_column_size) {
            auto& [src_path, src_subcolumn] = sorted_src_subcolumn_for_sparse_column
                    [sorted_src_subcolumn_for_sparse_column_idx++];
            src_subcolumn.serialize_to_sparse_column(sparse_column_path, src_path,
                                                     sparse_column_values, row);
        }
        // Close the row: record the end offset of this row's sparse entries.
        sparse_column_offsets.push_back(sparse_column_path->size());
        // Insert default values in all remaining dense columns.
        for (const auto& entry : subcolumns) {
            if (entry->data.size() == current_size) {
                entry->data.insert_default();
            }
        }
    }
}
// Return a new column with rows reordered by `perm`, truncated to `limit`
// rows (0 meaning "all rows"); the permutation must cover the result size.
MutableColumnPtr ColumnVariant::permute(const Permutation& perm, size_t limit) const {
    // Clamp the requested limit to the actual row count.
    limit = (limit == 0) ? num_rows : std::min(num_rows, limit);
    if (perm.size() < limit) {
        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
                               "Size of permutation ({}) is less than required ({})", perm.size(),
                               limit);
        __builtin_unreachable();
    }
    // Only possible when the column itself is empty.
    if (limit == 0) {
        return ColumnVariant::create(_max_subcolumns_count);
    }
    return apply_for_columns([&](const ColumnPtr column) { return column->permute(perm, limit); });
}
// Remove the last `length` rows from every dense subcolumn and from the
// sparse column, then shrink the cached row count to match.
void ColumnVariant::pop_back(size_t length) {
    for (auto& node : subcolumns) {
        node->data.pop_back(length);
    }
    serialized_sparse_column->pop_back(length);
    num_rows -= length;
    ENABLE_CHECK_CONSISTENCY(this);
}
// Look up the subcolumn stored under `key`; returns nullptr when absent.
const ColumnVariant::Subcolumn* ColumnVariant::get_subcolumn(const PathInData& key) const {
    if (const auto* node = subcolumns.find_leaf(key)) {
        return &node->data;
    }
    VLOG_DEBUG << "There is no subcolumn " << key.get_path();
    return nullptr;
}
// Serialize the value at row `n` of this subcolumn as JSON into `output`.
// Rows with an unresolved least-common type and rows inside the default
// prefix are both rendered as the JSON null literal; otherwise the row is
// located in one of the (possibly non-finalized) parts and delegated to the
// corresponding serde.
// @return number of bytes written to `output`
// @throws doris::Exception(OUT_OF_BOUND) if n is past the subcolumn's rows
size_t ColumnVariant::Subcolumn::serialize_text_json(size_t n, BufferWritable& output,
                                                     DataTypeSerDe::FormatOptions opt) const {
    // Merged previously-duplicated branches: an unknown type and a row in the
    // default prefix both emit the null literal.
    if (least_common_type.get_base_type_id() == PrimitiveType::INVALID_TYPE ||
        n < num_of_defaults_in_prefix) {
        output.write(DataTypeSerDe::NULL_IN_COMPLEX_TYPE.data(),
                     DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size());
        return DataTypeSerDe::NULL_IN_COMPLEX_TYPE.size();
    }
    // Walk the parts to find which one holds row n.
    size_t ind = n - num_of_defaults_in_prefix;
    for (size_t i = 0; i < data.size(); ++i) {
        const auto& part = data[i];
        const auto& part_type_serde = data_serdes[i];
        if (ind < part->size()) {
            return part_type_serde->serialize_one_cell_to_json(*part, ind, output, opt);
        }
        ind -= part->size();
    }
    throw doris::Exception(ErrorCode::OUT_OF_BOUND,
                           "Index ({}) for serializing JSON is out of range", n);
}
// Cached lookup: fields usually arrive in the same order on every row, so
// remember (path, subcolumn) per position and probe that slot before falling
// back to the regular tree search.
const ColumnVariant::Subcolumn* ColumnVariant::get_subcolumn_with_cache(const PathInData& key,
                                                                        size_t key_index) const {
    // Optimization by caching the order of fields (which is almost always the same)
    // and a quick check to match the next expected field, instead of searching the hash table.
    if (_prev_positions.size() > key_index && _prev_positions[key_index].second != nullptr &&
        key == _prev_positions[key_index].first) {
        return _prev_positions[key_index].second;
    }
    const auto* subcolumn = get_subcolumn(key);
    if (key_index >= _prev_positions.size()) {
        _prev_positions.resize(key_index + 1);
    }
    // Only cache hits; a miss leaves the slot's pointer null so the fast
    // path above cannot return stale data.
    if (subcolumn != nullptr) {
        _prev_positions[key_index] = std::make_pair(key, subcolumn);
    }
    return subcolumn;
}
// Mutable variant of the cached lookup; strips constness for the caller.
ColumnVariant::Subcolumn* ColumnVariant::get_subcolumn(const PathInData& key, size_t key_index) {
    const auto* found = get_subcolumn_with_cache(key, key_index);
    return const_cast<ColumnVariant::Subcolumn*>(found);
}
// Cached lookup: `key_index` hints at the expected position of `key`.
const ColumnVariant::Subcolumn* ColumnVariant::get_subcolumn(const PathInData& key,
                                                             size_t key_index) const {
    const auto* found = get_subcolumn_with_cache(key, key_index);
    return found;
}
// Mutable lookup of the subcolumn stored under `key`; nullptr when absent.
ColumnVariant::Subcolumn* ColumnVariant::get_subcolumn(const PathInData& key) {
    const auto* node = subcolumns.find_leaf(key);
    if (node != nullptr) {
        return &const_cast<Subcolumns::Node*>(node)->data;
    }
    VLOG_DEBUG << "There is no subcolumn " << key.get_path();
    return nullptr;
}
// A path exists iff it resolves to a leaf in the subcolumn tree.
bool ColumnVariant::has_subcolumn(const PathInData& key) const {
    const auto* leaf = subcolumns.find_leaf(key);
    return leaf != nullptr;
}
// Add a subcolumn backed by an existing column `subcolumn` of type `type`.
// Handles three cases: creating the root, promoting/updating the root to a
// scalar, and adding a regular leaf. Returns false on a duplicate path or a
// row-count mismatch with the column.
bool ColumnVariant::add_sub_column(const PathInData& key, MutableColumnPtr&& subcolumn,
                                   DataTypePtr type) {
    size_t new_size = subcolumn->size();
    if (key.empty() && subcolumns.empty()) {
        // create root
        subcolumns.create_root(Subcolumn(std::move(subcolumn), type, is_nullable, true));
        num_rows = new_size;
        return true;
    }
    if (key.empty() && (!subcolumns.get_root()->is_scalar() || is_null_root() ||
                        subcolumns.get_root()->data.get_least_common_type()->get_primitive_type() ==
                                INVALID_TYPE)) {
        bool root_is_scalar = subcolumns.get_root()->is_scalar();
        // update root to scalar
        subcolumns.get_mutable_root()->modify_to_scalar(
                Subcolumn(std::move(subcolumn), type, is_nullable, true));
        if (!root_is_scalar) {
            // the root just became a scalar leaf; register it in the leaves list
            subcolumns.add_leaf(subcolumns.get_root_ptr());
        }
        if (num_rows == 0) {
            num_rows = new_size;
        }
        return true;
    }
    bool inserted = subcolumns.add(key, Subcolumn(std::move(subcolumn), type, is_nullable));
    if (!inserted) {
        VLOG_DEBUG << "Duplicated sub column " << key.get_path();
        return false;
    }
    if (num_rows == 0) {
        num_rows = new_size;
    } else if (new_size != num_rows) {
        VLOG_DEBUG << "Size of subcolumn is inconsistent with column";
        return false;
    }
    // keep the bookkeeping counters in sync with the path kinds
    if (key.get_is_typed()) {
        typed_path_count++;
    }
    if (key.has_nested_part()) {
        nested_path_count++;
    }
    return true;
}
// Add a subcolumn of `new_size` default rows under `key`.
// Mirrors the column-backed overload: creates the root, promotes a
// non-scalar root, or adds a regular leaf. Returns false on a duplicate
// path or a row-count mismatch with the column.
bool ColumnVariant::add_sub_column(const PathInData& key, size_t new_size) {
    if (key.empty() && subcolumns.empty()) {
        // create root
        subcolumns.create_root(Subcolumn(new_size, is_nullable, true));
        num_rows = new_size;
        return true;
    }
    if (key.empty() && (!subcolumns.get_root()->is_scalar())) {
        // update none scalar root column to scalar node
        subcolumns.get_mutable_root()->modify_to_scalar(Subcolumn(new_size, is_nullable, true));
        subcolumns.add_leaf(subcolumns.get_root_ptr());
        if (num_rows == 0) {
            num_rows = new_size;
        }
        return true;
    }
    bool inserted = subcolumns.add(key, Subcolumn(new_size, is_nullable));
    if (!inserted) {
        VLOG_DEBUG << "Duplicated sub column " << key.get_path();
        return false;
    }
    if (num_rows == 0) {
        num_rows = new_size;
    } else if (new_size != num_rows) {
        VLOG_DEBUG << "Size of subcolumn is inconsistent with column";
        return false;
    }
    // keep the bookkeeping counters in sync with the path kinds
    if (key.get_is_typed()) {
        typed_path_count++;
    }
    if (key.has_nested_part()) {
        nested_path_count++;
    }
    return true;
}
// The column is finalized iff every subcolumn has been finalized.
bool ColumnVariant::is_finalized() const {
    for (const auto& entry : subcolumns) {
        if (!entry->data.is_finalized()) {
            return false;
        }
    }
    return true;
}
// Wrap a finalized top-level Array column into Nullable, marking rows whose
// array is default (empty) as null. Variant does not distinguish an empty
// array from null, so this eliminates that conflict at present.
void ColumnVariant::Subcolumn::wrapp_array_nullable() {
    // Wrap array with nullable, treat empty array as null to eliminate conflict at present
    auto& result_column = get_finalized_column_ptr();
    if (is_column<vectorized::ColumnArray>(result_column.get()) && !result_column->is_nullable()) {
        auto new_null_map = ColumnUInt8::create();
        new_null_map->reserve(result_column->size());
        auto& null_map_data = new_null_map->get_data();
        const auto* array = static_cast<const ColumnArray*>(result_column.get());
        for (size_t i = 0; i < array->size(); ++i) {
            // default (empty) array rows become null
            null_map_data.push_back(array->is_default_at(i));
        }
        result_column = ColumnNullable::create(std::move(result_column), std::move(new_null_map));
        // keep the stored type and least-common type in sync with the column
        data_types[0] = make_nullable(data_types[0]);
        least_common_type = LeastCommonType {data_types[0], is_root};
    }
}
// Render row `row` as a JSON string into `*output`.
// Scalar variants serialize the root value directly; otherwise the full
// JSON-object formatting path is used. The text is staged in a temporary
// ColumnString and then copied out.
void ColumnVariant::serialize_one_row_to_string(int64_t row, std::string* output) const {
    auto tmp_col = ColumnString::create();
    VectorBufferWriter write_buffer(*tmp_col.get());
    if (is_scalar_variant()) {
        subcolumns.get_root()->data.serialize_text_json(row, write_buffer);
    } else {
        // TODO preallocate memory
        serialize_one_row_to_json_format(row, write_buffer, nullptr);
    }
    write_buffer.commit();
    auto str_ref = tmp_col->get_data_at(0);
    // assign() writes directly into *output, avoiding the temporary
    // std::string that `*output = std::string(...)` would construct.
    output->assign(str_ref.data, str_ref.size);
}
// Render row `row` as JSON directly into `output`.
void ColumnVariant::serialize_one_row_to_string(int64_t row, BufferWritable& output) const {
    if (is_scalar_variant()) {
        // Scalar variant: all data lives in the root subcolumn.
        subcolumns.get_root()->data.serialize_text_json(row, output);
    } else {
        serialize_one_row_to_json_format(row, output, nullptr);
    }
}
/// Struct that represents elements of the JSON path.
/// "a.b.c" -> ["a", "b", "c"]
struct PathElements {
    explicit PathElements(const String& path) {
        const char* begin = path.data();
        const char* finish = begin + path.size();
        // Start of the token currently being scanned.
        const char* token_start = begin;
        for (const char* cur = begin; cur != finish; ++cur) {
            if (*cur == '.') {
                elements.emplace_back(token_start, size_t(cur - token_start));
                token_start = cur + 1;
            }
        }
        // The final token (or the whole path when it contains no dots).
        elements.emplace_back(token_start, size_t(finish - token_start));
    }
    size_t size() const { return elements.size(); }
    std::vector<std::string_view> elements;
};
/// Struct that represents a prefix of a JSON path. Used during output of the JSON object.
struct Prefix {
    /// Shrink current prefix to the common prefix of current prefix and specified path.
    /// For example, if current prefix is a.b.c.d and path is a.b.e, then shrink the prefix to a.b.
    void shrink_to_common_prefix(const PathElements& path_elements) {
        /// The last element of path_elements (the key itself) never joins the prefix.
        const size_t max_len = std::min(elements.size(), path_elements.elements.size() - 1);
        size_t common = 0;
        while (common < max_len && elements[common].first == path_elements.elements[common]) {
            ++common;
        }
        elements.resize(common);
    }
    /// Check is_first flag in current object.
    bool is_first_in_current_object() const {
        return elements.empty() ? root_is_first_flag : elements.back().second;
    }
    /// Set flag is_first = false in current object.
    void set_not_first_in_current_object() {
        if (elements.empty()) {
            root_is_first_flag = false;
            return;
        }
        elements.back().second = false;
    }
    size_t size() const { return elements.size(); }
    /// Elements of the prefix: (path element, is_first flag in this prefix).
    /// is_first flag indicates if we already serialized some key in the object with such prefix.
    std::vector<std::pair<std::string_view, bool>> elements;
    bool root_is_first_flag = true;
};
// skip empty nested json:
// 1. nested array with only nulls, eg. [null, null]. TODO: find a better way to
//    distinguish an array-of-nulls from a genuinely null value.
// 2. type is nothing (least common type never resolved)
bool ColumnVariant::Subcolumn::is_empty_nested(size_t row) const {
    PrimitiveType base_type_id = least_common_type.get_base_type_id();
    const DataTypePtr& type = least_common_type.get();
    // check if it is empty nested json array, then skip
    if (base_type_id == PrimitiveType::TYPE_VARIANT) {
        DCHECK(type->equals(*ColumnVariant::NESTED_TYPE));
        FieldWithDataType field;
        get(row, field);
        if (field.field->get_type() == PrimitiveType::TYPE_ARRAY) {
            const auto& array = field.field->get<Array>();
            // An array containing only nulls (or nothing) is treated as empty.
            return std::all_of(array.begin(), array.end(), [](const auto& elem) {
                return elem.get_type() == PrimitiveType::TYPE_NULL;
            });
        }
    }
    // skip nothing type
    return base_type_id == PrimitiveType::INVALID_TYPE;
}
// Decide whether row `nrow` should be rendered from the root value alone.
// True only when the root exists and is non-null at this row, every non-root
// subcolumn is null at this row, and the stored root payload is non-empty.
bool ColumnVariant::is_visible_root_value(size_t nrow) const {
    if (is_null_root()) {
        return false;
    }
    const auto* root = subcolumns.get_root();
    if (root->data.is_null_at(nrow)) {
        return false;
    }
    if (root->data.least_common_type.get_base_type_id() == PrimitiveType::TYPE_VARIANT) {
        // nested field
        return !root->data.is_empty_nested(nrow);
    }
    for (const auto& subcolumn : subcolumns) {
        if (subcolumn->data.is_root) {
            continue; // Skip the root column
        }
        // If any non-root subcolumn is NOT null, set serialize_root to false and exit early
        if (!subcolumn->data.is_null_at(nrow)) {
            return false;
        }
    }
    // Locate the part that holds row nrow (rows in the default prefix are
    // not covered by any part).
    size_t ind = nrow - root->data.num_of_defaults_in_prefix;
    // null value as empty json, todo: think a better way to distinguish empty json and null json.
    // NOTE(review): assumes root parts expose get_data_at() (string-like
    // storage, presumably JSONB) — confirm against the root's column type.
    for (const auto& part : root->data.data) {
        if (ind < part->size()) {
            return !part->get_data_at(ind).empty();
        }
        ind -= part->size();
    }
    throw doris::Exception(ErrorCode::OUT_OF_BOUND, "Index ({}) for getting field is out of range",
                           nrow);
}
void ColumnVariant::serialize_one_row_to_json_format(int64_t row_num, BufferWritable& output,
bool* is_null) const {
// root is not eighther null or empty, we should only process root value
if (is_visible_root_value(row_num)) {
subcolumns.get_root()->data.serialize_text_json(row_num, output);
return;
}
const auto& column_map = assert_cast<const ColumnMap&>(*serialized_sparse_column);
const auto& sparse_data_offsets = column_map.get_offsets();
const auto [sparse_data_paths, sparse_data_values] = get_sparse_data_paths_and_values();
size_t sparse_data_offset = sparse_data_offsets[static_cast<ssize_t>(row_num) - 1];
size_t sparse_data_end = sparse_data_offsets[static_cast<ssize_t>(row_num)];
// We need to convert the set of paths in this row to a JSON object.
// To do it, we first collect all the paths from current row, then we sort them
// and construct the resulting JSON object by iterating over sorted list of paths.
// For example:
// b.c, a.b, a.a, b.e, g, h.u.t -> a.a, a.b, b.c, b.e, g, h.u.t -> {"a" : {"a" : ..., "b" : ...}, "b" : {"c" : ..., "e" : ...}, "g" : ..., "h" : {"u" : {"t" : ...}}}.
std::vector<String> sorted_paths;
std::unordered_map<std::string, Subcolumn> subcolumn_path_map;
sorted_paths.reserve(get_subcolumns().size() + (sparse_data_end - sparse_data_offset));
for (const auto& subcolumn : get_subcolumns()) {
// Skip root value, we have already processed it
if (subcolumn->data.is_root) {
continue;
}
// skip empty nested value
if (subcolumn->data.is_empty_nested(row_num)) {
continue;
}
/// We consider null value and absence of the path in a row as equivalent cases, because we cannot actually distinguish them.
/// So, we don't output null values at all.
if (!subcolumn->data.is_null_at(row_num)) {
sorted_paths.emplace_back(subcolumn->path.get_path());
}
subcolumn_path_map.emplace(subcolumn->path.get_path(), subcolumn->data);
}
for (size_t i = sparse_data_offset; i != sparse_data_end; ++i) {
auto path = sparse_data_paths->get_data_at(i).to_string();
sorted_paths.emplace_back(path);
}
std::sort(sorted_paths.begin(), sorted_paths.end());
writeChar('{', output);
size_t index_in_sparse_data_values = sparse_data_offset;
// current_prefix represents the path of the object we are currently serializing keys in.
Prefix current_prefix;
for (const auto& path : sorted_paths) {
PathElements path_elements(path);
// Change prefix to common prefix between current prefix and current path.
// If prefix changed (it can only decrease), close all finished objects.
// For example:
// Current prefix: a.b.c.d
// Current path: a.b.e.f
// It means now we have : {..., "a" : {"b" : {"c" : {"d" : ...
// Common prefix will be a.b, so it means we should close objects a.b.c.d and a.b.c: {..., "a" : {"b" : {"c" : {"d" : ...}}
// and continue serializing keys in object a.b
size_t prev_prefix_size = current_prefix.size();
current_prefix.shrink_to_common_prefix(path_elements);
size_t prefix_size = current_prefix.size();
if (prefix_size != prev_prefix_size) {
size_t objects_to_close = prev_prefix_size - prefix_size;
for (size_t i = 0; i != objects_to_close; ++i) {
writeChar('}', output);
}
}
// Now we are inside object that has common prefix with current path.
// We should go inside all objects in current path.
// From the example above we should open object a.b.e:
// {..., "a" : {"b" : {"c" : {"d" : ...}}, "e" : {
if (prefix_size + 1 < path_elements.size()) {
for (size_t i = prefix_size; i != path_elements.size() - 1; ++i) {
/// Write comma before the key if it's not the first key in this prefix.
if (!current_prefix.is_first_in_current_object()) {
writeChar(',', output);
} else {
current_prefix.set_not_first_in_current_object();
}
writeJSONString(path_elements.elements[i], output);
writeCString(":{", output);
// Update current prefix.
current_prefix.elements.emplace_back(path_elements.elements[i], true);
}
}
// Write comma before the key if it's not the first key in this prefix.
if (!current_prefix.is_first_in_current_object()) {
writeChar(',', output);
} else {
current_prefix.set_not_first_in_current_object();
}
writeJSONString(path_elements.elements.back(), output);
writeCString(":", output);
// Serialize value of current path.
if (auto subcolumn_it = subcolumn_path_map.find(path);
subcolumn_it != subcolumn_path_map.end()) {
DataTypeSerDe::FormatOptions options;
options.escape_char = '\\';
subcolumn_it->second.serialize_text_json(row_num, output, options);
} else {
// To serialize value stored in shared data we should first deserialize it from binary format.
Subcolumn tmp_subcolumn(0, true);
const auto& data = ColumnVariant::deserialize_from_sparse_column(
sparse_data_values, index_in_sparse_data_values++);
tmp_subcolumn.insert(data.first, data.second);
DataTypeSerDe::FormatOptions options;
options.escape_char = '\\';
tmp_subcolumn.serialize_text_json(0, output, options);
}
}
// Close all remaining open objects.
for (size_t i = 0; i != current_prefix.elements.size(); ++i) {
writeChar('}', output);
}
writeChar('}', output);
}
// Count rows across all parts whose null flag is 0 (i.e. non-null values).
size_t ColumnVariant::Subcolumn::get_non_null_value_size() const {
    size_t non_null_count = 0;
    for (const auto& part : data) {
        // Each part is Nullable; scan its null map for zero entries.
        const auto& null_map = assert_cast<const ColumnNullable&>(*part).get_null_map_data();
        non_null_count += simd::count_zero_num((int8_t*)null_map.data(), null_map.size());
    }
    return non_null_count;
}
// Rebuild the sparse column from the given leftover subcolumns: for each of
// the num_rows rows, every (path, value) pair is serialized in path order.
// Requires the column to be finalized; the previous sparse data is dropped.
Status ColumnVariant::serialize_sparse_columns(
        std::map<std::string_view, Subcolumn>&& remaing_subcolumns) {
    CHECK(is_finalized());
    clear_sparse_column();
    if (remaing_subcolumns.empty()) {
        // Nothing to spill: materialize num_rows empty map entries.
        serialized_sparse_column->resize(num_rows);
        return Status::OK();
    }
    serialized_sparse_column->reserve(num_rows);
    auto [key_column, value_column] = get_sparse_data_paths_and_values();
    auto& offsets = serialized_sparse_column_offsets();
    for (size_t row = 0; row < num_rows; ++row) {
        // std::map iteration keeps the paths sorted, as the format requires.
        for (auto& [path, subcolumn] : remaing_subcolumns) {
            subcolumn.serialize_to_sparse_column(key_column, path, value_column, row);
        }
        // TODO add dcheck
        offsets.push_back(key_column->size());
    }
    CHECK_EQ(serialized_sparse_column->size(), num_rows);
    return Status::OK();
}
// Flatten a finalized Nested subcolumn into per-field array subcolumns:
// for every leaf inside the nested variant, build an array column over that
// leaf which shares the original offsets and null maps, and add it to
// `subcolumns` under the combined path (entry path + leaf path).
void ColumnVariant::unnest(Subcolumns::NodePtr& entry, Subcolumns& subcolumns) const {
    entry->data.finalize();
    // Peel the finalized column: Nullable(Array(Nullable(Variant))).
    auto nested_column = entry->data.get_finalized_column_ptr()->assume_mutable();
    auto* nested_column_nullable = assert_cast<ColumnNullable*>(nested_column.get());
    auto* nested_column_array =
            assert_cast<ColumnArray*>(nested_column_nullable->get_nested_column_ptr().get());
    auto& offset = nested_column_array->get_offsets_ptr();
    auto* nested_object_nullable = assert_cast<ColumnNullable*>(
            nested_column_array->get_data_ptr()->assume_mutable().get());
    auto& nested_object_column =
            assert_cast<ColumnVariant&>(nested_object_nullable->get_nested_column());
    PathInData nested_path = entry->path;
    for (auto& nested_entry : nested_object_column.subcolumns) {
        // Skip leaves whose type never resolved (held only NULLs).
        if (nested_entry->data.least_common_type.get_base_type_id() ==
            PrimitiveType::INVALID_TYPE) {
            continue;
        }
        nested_entry->data.finalize();
        PathInDataBuilder path_builder;
        // format nested path
        path_builder.append(nested_path.get_parts(), false);
        path_builder.append(nested_entry->path.get_parts(), true);
        // Rebuild Array over the leaf, reusing the shared offsets and the
        // element-level null map of the nested object.
        auto subnested_column = ColumnArray::create(
                ColumnNullable::create(nested_entry->data.get_finalized_column_ptr(),
                                       nested_object_nullable->get_null_map_column_ptr()),
                offset);
        // Outer nullability mirrors the original nested column's null map.
        auto nullable_subnested_column = ColumnNullable::create(
                std::move(subnested_column), nested_column_nullable->get_null_map_column_ptr());
        auto type = make_nullable(
                std::make_shared<DataTypeArray>(nested_entry->data.least_common_type.get()));
        Subcolumn subcolumn(nullable_subnested_column->assume_mutable(), type, is_nullable);
        subcolumns.add(path_builder.build(), subcolumn);
    }
}
// Drop all sparse data. Debug builds first verify the sparse column is
// logically empty (last offset == 0, i.e. no row has any sparse entries).
void ColumnVariant::clear_sparse_column() {
#ifndef NDEBUG
    auto& sparse_column_offsets = serialized_sparse_column_offsets();
    // NOTE(review): num_rows - 1 wraps when num_rows == 0; presumably the
    // offsets array tolerates index -1 via padding — confirm.
    if (sparse_column_offsets[num_rows - 1] != 0) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "sprase column has nonnull value");
    }
#endif
    serialized_sparse_column->clear();
}
// Finalizes the column: compacts every subcolumn's accumulated parts into one
// column each. Subcolumns containing only NULLs are dropped. In WRITE_MODE,
// Nested-typed subcolumns are additionally unnested into flat array
// subcolumns (see unnest()). Recomputes nested_path_count afterwards.
void ColumnVariant::finalize(FinalizeMode mode) {
    // Fast path: an already-finalized column needs no work in READ_MODE.
    if (is_finalized() && mode == FinalizeMode::READ_MODE) {
        _prev_positions.clear();
        ENABLE_CHECK_CONSISTENCY(this);
        return;
    }
    Subcolumns new_subcolumns;
    // The root subcolumn must always exist; a missing root is a logic error.
    if (auto* root = subcolumns.get_mutable_root(); root == nullptr) {
        CHECK(false);
    } else {
        root->data.finalize(mode);
        new_subcolumns.create_root(root->data);
    }
    // finalize all subcolumns
    for (auto&& entry : subcolumns) {
        if (entry->data.is_root) {
            continue;
        }
        const auto& least_common_type = entry->data.get_least_common_type();
        /// Do not add subcolumns, which consists only from NULLs
        /// (their least common type was never inferred).
        if (get_base_type_of_array(least_common_type)->get_primitive_type() ==
            PrimitiveType::INVALID_TYPE) {
            continue;
        }
        // unnest all nested columns, add them to new_subcolumns
        // (only done when writing; reads keep the Nested representation)
        if (mode == FinalizeMode::WRITE_MODE && least_common_type->equals(*NESTED_TYPE)) {
            // reset counter
            unnest(entry, new_subcolumns);
            continue;
        }
        entry->data.finalize(mode);
        entry->data.wrapp_array_nullable();
        new_subcolumns.add(entry->path, entry->data);
    }
    // after unnest need to recalculate nested_path_count in new_subcolumns
    nested_path_count =
            std::count_if(new_subcolumns.begin(), new_subcolumns.end(),
                          [](const auto& entry) { return entry->path.has_nested_part(); });
    std::swap(subcolumns, new_subcolumns);
    _prev_positions.clear();
    ENABLE_CHECK_CONSISTENCY(this);
}
// Casts every subcolumn whose path matches a user-declared typed path to its
// declared storage type, replacing the finalized column plus its data type,
// serde and least-common-type metadata in place.
// @param typed_paths map from flattened path -> declared subcolumn info.
// @return error status if casting to the storage type fails.
Status ColumnVariant::convert_typed_path_to_storage_type(
        const std::unordered_map<std::string, TabletSchema::SubColumnInfo>& typed_paths) {
    for (auto&& entry : subcolumns) {
        if (auto it = typed_paths.find(entry->path.get_path()); it != typed_paths.end()) {
            // Only a finalized subcolumn (exactly one compact part) can be cast in place.
            CHECK(entry->data.is_finalized());
            vectorized::DataTypePtr storage_type =
                    vectorized::DataTypeFactory::instance().create_data_type(it->second.column);
            vectorized::DataTypePtr finalized_type = entry->data.get_least_common_type();
            auto current_column = entry->data.get_finalized_column_ptr()->get_ptr();
            if (!storage_type->equals(*finalized_type)) {
                RETURN_IF_ERROR(vectorized::schema_util::cast_column(
                        {current_column, finalized_type, ""}, storage_type, &current_column));
            }
            // fixed log formatting: "from type" previously ran into the type name
            VLOG_DEBUG << "convert " << entry->path.get_path() << " from type "
                       << finalized_type->get_name() << " to " << storage_type->get_name();
            // Keep part, type and serde metadata in sync after the cast.
            entry->data.data[0] = current_column;
            entry->data.data_types[0] = storage_type;
            entry->data.data_serdes[0] = Subcolumn::generate_data_serdes(storage_type, false);
            entry->data.least_common_type = Subcolumn::LeastCommonType {storage_type, false};
        }
    }
    return Status::OK();
}
// Decides which subcolumns stay materialized and which are demoted into the
// serialized sparse column, keeping at most _max_subcolumns_count materialized
// (the root, nested paths and — unless variant_enable_typed_paths_to_sparse —
// typed paths are always kept and do not count against the limit).
// Subcolumns with the most non-null values are kept; the rest go sparse.
// All-null subcolumns are dropped entirely.
Status ColumnVariant::pick_subcolumns_to_sparse_column(
        const std::unordered_map<std::string, TabletSchema::SubColumnInfo>& typed_paths,
        bool variant_enable_typed_paths_to_sparse) {
    DCHECK(_max_subcolumns_count >= 0) << "max subcolumns count is: " << _max_subcolumns_count;
    // no need to pick subcolumns to sparse column, all subcolumns will be picked
    if (_max_subcolumns_count == 0) {
        return Status::OK();
    }
    // the root column always exists in subcolumns (hence the -1);
    // nested_path_count is the number of nested-path subcolumns, never demoted
    int64_t current_subcolumns_count = subcolumns.size() - 1 - nested_path_count;
    // typed paths are exempt from the limit unless they may be demoted too
    if (!variant_enable_typed_paths_to_sparse) {
        current_subcolumns_count -= typed_paths.size();
    }
    bool need_pick_subcolumn_to_sparse_column = current_subcolumns_count > _max_subcolumns_count;
    if (!need_pick_subcolumn_to_sparse_column) {
        return Status::OK();
    }
    Subcolumns new_subcolumns;
    if (auto* root = subcolumns.get_mutable_root(); root == nullptr) {
        CHECK(false);
    } else {
        new_subcolumns.create_root(root->data);
    }
    // paths selected to remain materialized
    std::set<std::string_view> selected_path;
    // candidate paths ranked by their number of non-null values
    std::unordered_map<std::string_view, size_t> none_null_value_sizes;
    // 1. get the none null value sizes
    for (auto&& entry : subcolumns) {
        // root column is already in the subcolumns
        if (entry->data.is_root) {
            continue;
        }
        // typed and nested paths bypass the ranking: always materialized
        if ((!variant_enable_typed_paths_to_sparse &&
             typed_paths.find(entry->path.get_path()) != typed_paths.end()) ||
            entry->path.has_nested_part()) {
            VLOG_DEBUG << "pick " << entry->path.get_path() << " as typed column";
            new_subcolumns.add(entry->path, entry->data);
            continue;
        }
        size_t size = entry->data.get_non_null_value_size();
        // all-null subcolumns are dropped (neither materialized nor sparse)
        if (size == 0) {
            continue;
        }
        none_null_value_sizes[entry->path.get_path()] = size;
    }
    // 2. sort by the size, descending
    std::vector<std::pair<std::string_view, size_t>> sorted_by_size(none_null_value_sizes.begin(),
                                                                    none_null_value_sizes.end());
    std::sort(sorted_by_size.begin(), sorted_by_size.end(),
              [](const auto& a, const auto& b) { return a.second > b.second; });
    // 3. pick config::variant_max_subcolumns_count selected subcolumns
    for (size_t i = 0; i < std::min(size_t(_max_subcolumns_count), sorted_by_size.size()); ++i) {
        selected_path.insert(sorted_by_size[i].first);
    }
    std::map<std::string_view, Subcolumn> remaing_subcolumns;
    // add selected subcolumns to new_subcolumns, otherwise add to remaining_subcolumns
    for (auto&& entry : subcolumns) {
        if (entry->data.is_root) {
            continue;
        }
        if (selected_path.find(entry->path.get_path()) != selected_path.end()) {
            new_subcolumns.add(entry->path, entry->data);
            VLOG_DEBUG << "pick " << entry->path.get_path() << " as sub column";
        } else if (none_null_value_sizes.find(entry->path.get_path()) !=
                   none_null_value_sizes.end()) {
            VLOG_DEBUG << "pick " << entry->path.get_path() << " as sparse column";
            CHECK(!entry->path.has_nested_part());
            remaing_subcolumns.emplace(entry->path.get_path(), entry->data);
        }
    }
    ENABLE_CHECK_CONSISTENCY(this);
    RETURN_IF_ERROR(serialize_sparse_columns(std::move(remaing_subcolumns)));
    std::swap(subcolumns, new_subcolumns);
    ENABLE_CHECK_CONSISTENCY(this);
    return Status::OK();
}
// Convenience overload: finalize for reading (no unnesting of Nested paths).
void ColumnVariant::finalize() {
    // finalize(FinalizeMode) returns void, so the former static_cast<void>
    // discarded nothing and has been removed.
    finalize(FinalizeMode::READ_MODE);
}
// Casts the root subcolumn to `expected_root_type` when its least common type
// differs, updating the root's part, type metadata and row count.
// NOTE(review): the Status from cast_column is discarded; presumably a cast to
// the expected root type cannot fail here — confirm, since a failure would
// leave casted_column unset before it is dereferenced below.
void ColumnVariant::ensure_root_node_type(const DataTypePtr& expected_root_type) {
    auto& root = subcolumns.get_mutable_root()->data;
    if (!root.get_least_common_type()->equals(*expected_root_type)) {
        // make sure the root type is always as expected
        ColumnPtr casted_column;
        static_cast<void>(
                schema_util::cast_column(ColumnWithTypeAndName {root.get_finalized_column_ptr(),
                                                                root.get_least_common_type(), ""},
                                         expected_root_type, &casted_column));
        root.data[0] = casted_column;
        root.data_types[0] = expected_root_type;
        root.least_common_type = Subcolumn::LeastCommonType {expected_root_type, true};
        root.num_rows = casted_column->size();
    }
}
// A variant counts as empty when it has no subcolumns at all, or when the only
// entry present is the placeholder dummy column.
bool ColumnVariant::empty() const {
    if (subcolumns.empty()) {
        return true;
    }
    return subcolumns.begin()->get()->path.get_path() == COLUMN_NAME_DUMMY;
}
// Returns a new ColumnVariant containing only rows whose filter byte is
// non-zero. A non-finalized column is first cloned into a finalized copy.
// `count` is a size hint for the result (-1 means unknown).
ColumnPtr ColumnVariant::filter(const Filter& filter, ssize_t count) const {
    if (!is_finalized()) {
        auto finalized = clone_finalized();
        auto& finalized_object = assert_cast<ColumnVariant&>(*finalized);
        return finalized_object.filter(filter, count);
    }
    // Empty source: result is an empty variant sized by the filter's 1-count.
    if (num_rows == 0) {
        auto res = ColumnVariant::create(_max_subcolumns_count, count_bytes_in_filter(filter));
        ENABLE_CHECK_CONSISTENCY(res.get());
        return res;
    }
    // Filter the root, then every non-root subcolumn, then the sparse column.
    auto new_root = get_root()->filter(filter, count)->assume_mutable();
    auto new_column =
            ColumnVariant::create(_max_subcolumns_count, get_root_type(), std::move(new_root));
    for (const auto& entry : subcolumns) {
        if (entry->data.is_root) {
            continue;
        }
        auto subcolumn = entry->data.get_finalized_column().filter(filter, -1);
        new_column->add_sub_column(entry->path, subcolumn->assume_mutable(),
                                   entry->data.get_least_common_type());
    }
    new_column->serialized_sparse_column = serialized_sparse_column->filter(filter, count);
    ENABLE_CHECK_CONSISTENCY(new_column.get());
    return new_column;
}
// Replicates each row according to the cumulative `offsets` array and returns
// the expanded column. An empty source yields an empty variant.
ColumnPtr ColumnVariant::replicate(const IColumn::Offsets& offsets) const {
    // offsets must describe exactly num_rows source rows
    column_match_offsets_size(num_rows, offsets.size());
    if (num_rows == 0) {
        return ColumnVariant::create(_max_subcolumns_count);
    }
    auto replicate_part = [&offsets](const ColumnPtr part) { return part->replicate(offsets); };
    return apply_for_columns(replicate_part);
}
// In-place filter: keeps only rows whose filter byte is non-zero and returns
// the number of surviving rows. Finalizes the column first if needed.
size_t ColumnVariant::filter(const Filter& filter) {
    if (!is_finalized()) {
        finalize();
    }
    size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
    if (count == 0) {
        clear();
    } else {
        // Update every subcolumn's cached row count first...
        for (auto& subcolumn : subcolumns) {
            subcolumn->data.num_rows = count;
        }
        // ...then filter each physical part not already at the target size.
        for_each_subcolumn([&](auto& part) {
            if (part->size() != count) {
                if (part->is_exclusive()) {
                    // Exclusively-owned parts can be filtered in place.
                    const auto result_size = part->filter(filter);
                    // (removed an unreachable CHECK_EQ that followed this throw)
                    if (result_size != count) {
                        throw Exception(ErrorCode::INTERNAL_ERROR,
                                        "result_size not equal with filter_size, result_size={}, "
                                        "filter_size={}",
                                        result_size, count);
                    }
                } else {
                    // Shared parts must be replaced by a filtered copy.
                    part = part->filter(filter, count);
                }
            }
        });
    }
    num_rows = count;
    ENABLE_CHECK_CONSISTENCY(this);
    return count;
}
// Clears the payload of every subcolumn part while keeping the subcolumn tree
// (paths/types) intact — unlike clear(), which drops all subcolumns.
void ColumnVariant::clear_column_data() {
    for (auto& entry : subcolumns) {
        for (auto& part : entry->data.data) {
            // Each part must be exclusively owned; clearing a shared part
            // would mutate columns referenced elsewhere.
            DCHECK_EQ(part->use_count(), 1);
            // rvalue deref obtains a mutable reference through the COW wrapper
            (*std::move(part)).clear();
        }
        entry->data.num_of_defaults_in_prefix = 0;
        entry->data.current_num_of_defaults = 0;
        entry->data.num_rows = 0;
    }
    serialized_sparse_column->clear();
    num_rows = 0;
    ENABLE_CHECK_CONSISTENCY(this);
}
void ColumnVariant::clear() {
Subcolumns empty;
// we must keep root column exist
empty.create_root(Subcolumn(0, is_nullable, true));
std::swap(empty, subcolumns);
serialized_sparse_column->clear();
num_rows = 0;
_prev_positions.clear();
ENABLE_CHECK_CONSISTENCY(this);
}
// Installs `column` (with `type`) as the root subcolumn. If the variant was
// empty, its row count is taken from the column; the sparse column is padded
// with defaults to match.
void ColumnVariant::create_root(const DataTypePtr& type, MutableColumnPtr&& column) {
    if (num_rows == 0) {
        num_rows = column->size();
    }
    // empty path {} designates the root
    add_sub_column({}, std::move(column), type);
    if (serialized_sparse_column->empty()) {
        serialized_sparse_column->resize(num_rows);
    }
    ENABLE_CHECK_CONSISTENCY(this);
}
// Returns a process-wide cached Nullable(MostCommonType) instance shared by
// all callers.
const DataTypePtr& ColumnVariant::get_most_common_type() {
    static const auto most_common = make_nullable(std::make_shared<MostCommonType>());
    return most_common;
}
// True when the root subcolumn is absent or effectively empty: no default
// prefix and either no data parts or a never-inferred (INVALID) type.
bool ColumnVariant::is_null_root() const {
    const auto* root = subcolumns.get_root();
    if (root == nullptr) {
        return true;
    }
    const auto& data = root->data;
    return data.num_of_defaults_in_prefix == 0 &&
           (data.data.empty() || data.get_least_common_type()->get_primitive_type() ==
                                         PrimitiveType::INVALID_TYPE);
}
// True when the variant degenerates to a single scalar root column:
// non-null root, exactly one leaf, scalar root type, and no sparse data.
bool ColumnVariant::is_scalar_variant() const {
    // NOTE(review): data() yields the raw offsets pointer, and the index below
    // is num_rows - 1; when num_rows == 0 this reads one element before the
    // pointer — presumably safe via the padded-offsets convention
    // (offsets[-1] == 0), but confirm.
    const auto& sparse_offsets = serialized_sparse_column_offsets().data();
    // Only root itself is scalar, and no sparse data
    return !is_null_root() && subcolumns.get_leaves().size() == 1 &&
           subcolumns.get_root()->is_scalar() &&
           sparse_offsets[num_rows - 1] == 0; // no sparse data
}
// Canonical type of a Nested subcolumn: Nullable(Array(Nullable(Variant))).
const DataTypePtr ColumnVariant::NESTED_TYPE = std::make_shared<vectorized::DataTypeNullable>(
        std::make_shared<vectorized::DataTypeArray>(std::make_shared<vectorized::DataTypeNullable>(
                std::make_shared<vectorized::DataTypeVariant>(0))));
// Nested data represented as Array(Nullable(Jsonb)) instead of a variant.
const DataTypePtr ColumnVariant::NESTED_TYPE_AS_ARRAY_OF_JSONB =
        std::make_shared<vectorized::DataTypeArray>(std::make_shared<vectorized::DataTypeNullable>(
                std::make_shared<vectorized::DataTypeJsonb>()));
// Returns the root subcolumn's least common type accumulated so far.
DataTypePtr ColumnVariant::get_root_type() const {
    const auto* root = subcolumns.get_root();
    return root->data.get_least_common_type();
}
// Inserts the rows of `src` referenced by [indices_begin, indices_end).
// When `src` is a finalized scalar variant, rows are bulk-copied between root
// columns; otherwise falls back to per-row try_insert. Always finalizes at
// the end.
void ColumnVariant::insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
                                        const uint32_t* indices_end) {
    // optimize when src and this column are scalar variant, since try_insert is inefficiency
    const auto* src_v = check_and_get_column<ColumnVariant>(src);
    bool src_can_do_quick_insert =
            src_v != nullptr && src_v->is_scalar_variant() && src_v->is_finalized();
    // num_rows == 0 means this column is empty, we not need to check it type
    if (num_rows == 0 && src_can_do_quick_insert) {
        // add a new root column, and insert from src root column
        clear();
        add_sub_column({}, src_v->get_root()->clone_empty(), src_v->get_root_type());
        get_subcolumns().get_mutable_root()->data.num_rows = indices_end - indices_begin;
        get_root()->insert_indices_from(*src_v->get_root(), indices_begin, indices_end);
        serialized_sparse_column->insert_many_defaults(indices_end - indices_begin);
        num_rows += indices_end - indices_begin;
    } else if (src_can_do_quick_insert && is_scalar_variant() &&
               src_v->get_root_type()->equals(*get_root_type())) {
        // both sides are scalar variants with identical root types: bulk copy
        get_root()->insert_indices_from(*src_v->get_root(), indices_begin, indices_end);
        serialized_sparse_column->insert_many_defaults(indices_end - indices_begin);
        get_subcolumns().get_mutable_root()->data.num_rows += indices_end - indices_begin;
        num_rows += indices_end - indices_begin;
    } else {
        // generic slow path: row-by-row insertion via Field conversion
        for (const auto* x = indices_begin; x != indices_end; ++x) {
            try_insert(src[*x]);
        }
    }
    finalize();
}
// NOTE: superseded row-by-row implementation of insert_indices_from, kept for
// reference only; the active version adds bulk fast paths for scalar variants.
// void ColumnVariant::insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
//                                        const uint32_t* indices_end) {
//     for (const auto* x = indices_begin; x != indices_end; ++x) {
//         ColumnVariant::insert_from(src, *x);
//     }
//     finalize();
//     ENABLE_CHECK_CONSISTENCY(this);
// }
// Invokes `callback` on every column part of every subcolumn, then on the
// serialized sparse column. A non-finalized column is cloned and finalized
// first so the callback always sees compact parts.
// NOTE: the misspelled name "imutable" is part of the public interface and is
// kept as-is for callers.
template <typename Func>
void ColumnVariant::for_each_imutable_column(Func&& callback) const {
    if (!is_finalized()) {
        auto finalized = clone_finalized();
        auto& finalized_object = assert_cast<ColumnVariant&>(*finalized);
        finalized_object.for_each_imutable_column(callback);
        return;
    }
    for (const auto& entry : subcolumns) {
        for (auto& part : entry->data.data) {
            callback(part);
        }
    }
    callback(serialized_sparse_column);
}
// Feeds row `n` of every underlying column part into `hash`.
void ColumnVariant::update_hash_with_value(size_t n, SipHash& hash) const {
    for_each_imutable_column(
            [&](const ColumnPtr column) { return column->update_hash_with_value(n, hash); });
}
// Updates one hash per row across all parts. The caller-provided null_data is
// not forwarded (nullptr is passed) — NOTE(review): presumably null handling
// is delegated to each part's own null map; confirm against callers.
void ColumnVariant::update_hashes_with_value(uint64_t* __restrict hashes,
                                             const uint8_t* __restrict null_data) const {
    for_each_imutable_column([&](const ColumnPtr column) {
        return column->update_hashes_with_value(hashes, nullptr);
    });
}
// Folds rows [start, end) of every part into the xxHash accumulator `hash`.
// null_data is not forwarded (nullptr is passed) — see note on
// update_hashes_with_value.
void ColumnVariant::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
                                             const uint8_t* __restrict null_data) const {
    for_each_imutable_column([&](const ColumnPtr column) {
        return column->update_xxHash_with_value(start, end, hash, nullptr);
    });
}
// Updates per-row CRCs for `rows` rows starting at `offset` across all parts.
// null_data is not forwarded (nullptr is passed) — see note on
// update_hashes_with_value.
void ColumnVariant::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType type,
                                           uint32_t rows, uint32_t offset,
                                           const uint8_t* __restrict null_data) const {
    for_each_imutable_column([&](const ColumnPtr column) {
        return column->update_crcs_with_value(hash, type, rows, offset, nullptr);
    });
}
// Folds rows [start, end) of every part into the CRC accumulator `hash`.
// null_data is not forwarded (nullptr is passed) — see note on
// update_hashes_with_value.
void ColumnVariant::update_crc_with_value(size_t start, size_t end, uint32_t& hash,
                                          const uint8_t* __restrict null_data) const {
    for_each_imutable_column([&](const ColumnPtr column) {
        return column->update_crc_with_value(start, end, hash, nullptr);
    });
}
// Human-readable dump: row count plus structure/type/path of each finalized
// subcolumn.
std::string ColumnVariant::debug_string() const {
    std::stringstream out;
    out << get_name() << "(num_row = " << num_rows;
    for (const auto& entry : subcolumns) {
        const auto& sub = entry->data;
        if (!sub.is_finalized()) {
            continue; // non-finalized subcolumns are omitted from the dump
        }
        out << "[column:" << sub.data[0]->dump_structure()
            << ",type:" << sub.data_types[0]->get_name() << ",path:" << entry->path.get_path()
            << "],";
    }
    out << ")";
    return out.str();
}
// Validates internal consistency and rejects duplicated subcolumn paths,
// e.g. {"a.b" : 123, "a" : {"b" : 123}} flattens to the same path twice.
// @return InvalidJsonPath on a duplicate, OK otherwise.
Status ColumnVariant::sanitize() const {
    RETURN_IF_CATCH_EXCEPTION(check_consistency());
    std::unordered_set<std::string> subcolumn_set;
    // deduplicate subcolumns, example {"a.b" : 123, "a" : {"b" : 123}}
    for (const auto& subcolumn : subcolumns) {
        if (subcolumn->data.is_root) {
            continue;
        }
        // insert() returns {it, false} when the path already exists, so a
        // separate contains() lookup is unnecessary.
        if (!subcolumn_set.insert(subcolumn->path.get_path()).second) {
            return Status::InvalidJsonPath("may contains duplicated entry : {}",
                                           subcolumn->path.get_path());
        }
    }
    VLOG_DEBUG << "sanitized " << debug_string();
    return Status::OK();
}
// Returns a fresh subcolumn holding rows [start, start + length) of this one.
ColumnVariant::Subcolumn ColumnVariant::Subcolumn::cut(size_t start, size_t length) const {
    Subcolumn slice(0, is_nullable);
    slice.insert_range_from(*this, start, length);
    return slice;
}
// Finds a leaf under the same Nested prefix as `entry` that already holds more
// rows (i.e. a sibling that received data first). Returns nullptr when no such
// leaf exists or when the found leaf's type was never inferred.
const ColumnVariant::Subcolumns::Node* ColumnVariant::get_leaf_of_the_same_nested(
        const Subcolumns::NodePtr& entry) const {
    const auto* leaf = subcolumns.get_leaf_of_the_same_nested(
            entry->path,
            [&](const Subcolumns::Node& node) { return node.data.size() > entry->data.size(); });
    // an INVALID_TYPE leaf carries no usable shape information
    if (leaf && leaf->data.get_least_common_typeBase()->get_primitive_type() == INVALID_TYPE) {
        return nullptr;
    }
    return leaf;
}
// Pads `entry` with default rows derived from a longer sibling leaf of the
// same Nested prefix, so array sizes stay aligned across siblings.
// Returns false when no suitable sibling leaf exists.
bool ColumnVariant::try_insert_many_defaults_from_nested(const Subcolumns::NodePtr& entry) const {
    const auto* leaf = get_leaf_of_the_same_nested(entry);
    if (!leaf) {
        return false;
    }
    size_t old_size = entry->data.size();
    // shape info used to build type-correct defaults for this entry
    FieldInfo field_info = {
            .scalar_type_id = entry->data.least_common_type.get_base_type_id(),
            .num_dimensions = entry->data.get_dimensions(),
    };
    /// Cut the needed range from the found leaf
    /// and replace scalar values to the correct
    /// default values for given entry.
    auto new_subcolumn = leaf->data.cut(old_size, leaf->data.size() - old_size)
                                 .clone_with_default_values(field_info);
    entry->data.insert_range_from(new_subcolumn, 0, new_subcolumn.size());
    ENABLE_CHECK_CONSISTENCY(this);
    return true;
}
/// Visitor that keeps @num_dimensions_to_keep dimensions in arrays
/// and replaces all scalars or nested arrays to @replacement at that level.
class FieldVisitorReplaceScalars : public StaticVisitor<Field> {
public:
    FieldVisitorReplaceScalars(const Field& replacement_, size_t num_dimensions_to_keep_)
            : replacement(replacement_), num_dimensions_to_keep(num_dimensions_to_keep_) {}
    // Arrays are recursed into until the dimension budget is exhausted; any
    // scalar — or an array beyond the budget — becomes `replacement`.
    template <PrimitiveType T>
    Field apply(const typename PrimitiveTypeTraits<T>::NearestFieldType& x) const {
        if constexpr (T == TYPE_ARRAY) {
            if (num_dimensions_to_keep == 0) {
                return replacement;
            }
            const size_t size = x.size();
            Array res(size);
            for (size_t i = 0; i < size; ++i) {
                res[i] = apply_visitor(
                        FieldVisitorReplaceScalars(replacement, num_dimensions_to_keep - 1), x[i]);
            }
            return Field::create_field<TYPE_ARRAY>(res);
        } else {
            return replacement;
        }
    }
private:
    // Held by reference: the caller must keep the replacement Field alive for
    // the visitor's lifetime.
    const Field& replacement;
    size_t num_dimensions_to_keep;
};
// Inserts one default row into `entry`, shaped like the last value of a longer
// sibling leaf under the same Nested prefix: the sibling's array structure is
// preserved while its scalars are replaced by entry's default value.
// Returns false when no sibling leaf exists or its last value is NULL.
bool ColumnVariant::try_insert_default_from_nested(const Subcolumns::NodePtr& entry) const {
    const auto* leaf = get_leaf_of_the_same_nested(entry);
    if (!leaf) {
        return false;
    }
    auto last_field = leaf->data.get_last_field();
    if (last_field.is_null()) {
        return false;
    }
    size_t leaf_num_dimensions = leaf->data.get_dimensions();
    size_t entry_num_dimensions = entry->data.get_dimensions();
    // If entry has more dimensions than the leaf, the scalar default becomes a
    // correspondingly nested empty array.
    auto default_scalar = entry_num_dimensions > leaf_num_dimensions
                                  ? Field::create_field<TYPE_ARRAY>(create_empty_array_field(
                                            entry_num_dimensions - leaf_num_dimensions))
                                  : entry->data.get_least_common_type()->get_default();
    auto default_field = apply_visitor(
            FieldVisitorReplaceScalars(default_scalar, leaf_num_dimensions), last_field);
    entry->data.insert(std::move(default_field));
    return true;
}
// Binary-searches the path keys of the sparse column within [start, end) for
// the first entry >= `path` and returns its index. Assumes the keys are
// sorted within that range (one row's path range in the sparse map).
size_t ColumnVariant::find_path_lower_bound_in_sparse_data(StringRef path,
                                                           const ColumnString& sparse_data_paths,
                                                           size_t start, size_t end) {
    // Simple random access iterator over values in ColumnString in specified range.
    // Provides just enough of the random-access interface for std::lower_bound
    // (advance via +=, distance via operator-).
    class Iterator {
    public:
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-local-typedefs"
        using difference_type = std::ptrdiff_t;
        using value_type = StringRef;
        using iterator_category = std::random_access_iterator_tag;
        using pointer = StringRef*;
        using reference = StringRef&;
#pragma GCC diagnostic pop
        Iterator() = delete;
        Iterator(const ColumnString* data_, size_t index_) : data(data_), index(index_) {}
        Iterator(const Iterator& rhs) = default;
        Iterator& operator=(const Iterator& rhs) = default;
        inline Iterator& operator+=(difference_type rhs) {
            index += rhs;
            return *this;
        }
        inline StringRef operator*() const { return data->get_data_at(index); }
        inline Iterator& operator++() {
            ++index;
            return *this;
        }
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-member-function"
#endif
        inline Iterator& operator--() {
            --index;
            return *this;
        }
#ifdef __clang__
#pragma clang diagnostic pop
#endif
        inline difference_type operator-(const Iterator& rhs) const { return index - rhs.index; }
        const ColumnString* data;
        size_t index;
    };
    Iterator start_it(&sparse_data_paths, start);
    Iterator end_it(&sparse_data_paths, end);
    auto it = std::lower_bound(start_it, end_it, path);
    return it.index;
}
// Extracts the values of one `path` from the sparse (path -> serialized value)
// map column over rows [start, end) and appends them to `subcolumn`. When
// `null_map` is provided, rows where the path is absent are marked null.
void ColumnVariant::fill_path_column_from_sparse_data(Subcolumn& subcolumn, NullMap* null_map,
                                                      StringRef path,
                                                      const ColumnPtr& sparse_data_column,
                                                      size_t start, size_t end) {
    const auto& sparse_data_map = assert_cast<const ColumnMap&>(*sparse_data_column);
    const auto& sparse_data_offsets = sparse_data_map.get_offsets();
    size_t first_offset = sparse_data_offsets[static_cast<ssize_t>(start) - 1];
    size_t last_offset = sparse_data_offsets[static_cast<ssize_t>(end) - 1];
    // Check if we have at least one row with data.
    // If the whole range is empty, fill defaults (all null) in one shot.
    if (first_offset == last_offset) {
        if (null_map) {
            null_map->resize_fill(end - start, 1);
        }
        subcolumn.insert_many_defaults(end - start);
        return;
    }
    const auto& sparse_data_paths = assert_cast<const ColumnString&>(sparse_data_map.get_keys());
    const auto& sparse_data_values = assert_cast<const ColumnString&>(sparse_data_map.get_values());
    for (size_t i = start; i != end; ++i) {
        size_t paths_start = sparse_data_offsets[static_cast<ssize_t>(i) - 1];
        size_t paths_end = sparse_data_offsets[static_cast<ssize_t>(i)];
        // Paths are searched via lower_bound within this row's key range.
        auto lower_bound_path_index = ColumnVariant::find_path_lower_bound_in_sparse_data(
                path, sparse_data_paths, paths_start, paths_end);
        bool is_null = false;
        if (lower_bound_path_index != paths_end &&
            sparse_data_paths.get_data_at(lower_bound_path_index) == path) {
            // auto value_data = sparse_data_values.get_data_at(lower_bound_path_index);
            // ReadBufferFromMemory buf(value_data.data, value_data.size);
            // dynamic_serialization->deserializeBinary(path_column, buf, getFormatSettings());
            const auto& data = ColumnVariant::deserialize_from_sparse_column(
                    &sparse_data_values, lower_bound_path_index);
            subcolumn.insert(data.first, data.second);
            is_null = false;
        } else {
            // path absent in this row: default value + null flag
            subcolumn.insert_default();
            is_null = true;
        }
        if (null_map) {
            null_map->push_back(is_null);
        }
    }
}
// Creates a copy of this column: the subcolumn tree is copied entry by entry,
// and the sparse column is obtained through the COW mutate protocol.
// Throws if a path cannot be re-added or the root ends up missing.
MutableColumnPtr ColumnVariant::clone() const {
    auto res = ColumnVariant::create(_max_subcolumns_count);
    Subcolumns new_subcolumns;
    for (const auto& subcolumn : subcolumns) {
        auto new_subcolumn = subcolumn->data;
        if (subcolumn->data.is_root) {
            new_subcolumns.create_root(std::move(new_subcolumn));
        } else if (!new_subcolumns.add(subcolumn->path, std::move(new_subcolumn))) {
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "add path {} is error in clone()",
                                   subcolumn->path.get_path());
        }
    }
    if (!new_subcolumns.get_root()) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "root is nullptr in clone()");
    }
    res->subcolumns = std::move(new_subcolumns);
    // COW mutate: yields a mutable sparse column, copying only when shared.
    auto&& column = serialized_sparse_column->get_ptr();
    auto sparse_column = std::move(*column).mutate();
    res->serialized_sparse_column = sparse_column->assume_mutable();
    res->set_num_rows(num_rows);
    ENABLE_CHECK_CONSISTENCY(res.get());
    return res;
}
} // namespace doris::vectorized