blob: 0ceddf25b3feee01e34fe6bc09d5e943aa6cf7cf [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/common/schema_util.h"
#include <assert.h>
#include <fmt/format.h>
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/MasterService_types.h>
#include <gen_cpp/Status_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <algorithm>
#include <cstdint>
#include <memory>
#include <ostream>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/status.h"
#include "exprs/json_functions.h"
#include "olap/olap_common.h"
#include "olap/tablet_schema.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "udf/udf.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_object.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/typeid_cast.h"
#include "vec/core/block.h"
#include "vec/core/column_numbers.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_jsonb.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_object.h"
#include "vec/data_types/data_type_string.h"
#include "vec/data_types/get_least_supertype.h"
#include "vec/functions/function.h"
#include "vec/functions/simple_function_factory.h"
#include "vec/json/parse2column.h"
#include "vec/json/path_in_data.h"
namespace doris::vectorized::schema_util {
size_t get_number_of_dimensions(const IDataType& type) {
if (const auto* type_array = typeid_cast<const DataTypeArray*>(&type)) {
return type_array->get_number_of_dimensions();
}
return 0;
}
size_t get_number_of_dimensions(const IColumn& column) {
if (const auto* column_array = check_and_get_column<ColumnArray>(column)) {
return column_array->get_number_of_dimensions();
}
return 0;
}
DataTypePtr get_base_type_of_array(const DataTypePtr& type) {
/// Get raw pointers to avoid extra copying of type pointers.
const DataTypeArray* last_array = nullptr;
const auto* current_type = type.get();
while (const auto* type_array = typeid_cast<const DataTypeArray*>(current_type)) {
current_type = type_array->get_nested_type().get();
last_array = type_array;
}
return last_array ? last_array->get_nested_type() : type;
}
Array create_empty_array_field(size_t num_dimensions) {
DCHECK(num_dimensions > 0);
Array array;
Array* current_array = &array;
for (size_t i = 1; i < num_dimensions; ++i) {
current_array->push_back(Array());
current_array = &current_array->back().get<Array&>();
}
return array;
}
bool is_conversion_required_between_integers(const IDataType& lhs, const IDataType& rhs) {
WhichDataType which_lhs(lhs);
WhichDataType which_rhs(rhs);
bool is_native_int = which_lhs.is_native_int() && which_rhs.is_native_int();
bool is_native_uint = which_lhs.is_native_uint() && which_rhs.is_native_uint();
return (is_native_int || is_native_uint) &&
lhs.get_size_of_value_in_memory() <= rhs.get_size_of_value_in_memory();
}
bool is_conversion_required_between_integers(FieldType lhs, FieldType rhs) {
// We only support signed integers for semi-structure data at present
// TODO add unsigned integers
if (lhs == FieldType::OLAP_FIELD_TYPE_BIGINT) {
return !(rhs == FieldType::OLAP_FIELD_TYPE_TINYINT ||
rhs == FieldType::OLAP_FIELD_TYPE_SMALLINT ||
rhs == FieldType::OLAP_FIELD_TYPE_INT || rhs == FieldType::OLAP_FIELD_TYPE_BIGINT);
}
if (lhs == FieldType::OLAP_FIELD_TYPE_INT) {
return !(rhs == FieldType::OLAP_FIELD_TYPE_TINYINT ||
rhs == FieldType::OLAP_FIELD_TYPE_SMALLINT ||
rhs == FieldType::OLAP_FIELD_TYPE_INT);
}
if (lhs == FieldType::OLAP_FIELD_TYPE_SMALLINT) {
return !(rhs == FieldType::OLAP_FIELD_TYPE_TINYINT ||
rhs == FieldType::OLAP_FIELD_TYPE_SMALLINT);
}
if (lhs == FieldType::OLAP_FIELD_TYPE_TINYINT) {
return !(rhs == FieldType::OLAP_FIELD_TYPE_TINYINT);
}
return true;
}
Status cast_column(const ColumnWithTypeAndName& arg, const DataTypePtr& type, ColumnPtr* result) {
ColumnsWithTypeAndName arguments {
arg, {type->create_column_const_with_default_value(1), type, type->get_name()}};
auto function = SimpleFunctionFactory::instance().get_function("CAST", arguments, type);
if (!function) {
return Status::InternalError("Not found cast function {} to {}", arg.type->get_name(),
type->get_name());
}
Block tmp_block {arguments};
size_t result_column = tmp_block.columns();
auto ctx = FunctionContext::create_context(nullptr, {}, {});
// We convert column string to jsonb type just add a string jsonb field to dst column instead of parse
// each line in original string column.
ctx->set_string_as_jsonb_string(true);
tmp_block.insert({nullptr, type, arg.name});
RETURN_IF_ERROR(
function->execute(ctx.get(), tmp_block, {0}, result_column, arg.column->size()));
*result = tmp_block.get_by_position(result_column).column->convert_to_full_column_if_const();
return Status::OK();
}
void get_column_by_type(const vectorized::DataTypePtr& data_type, const std::string& name,
TabletColumn& column, const ExtraInfo& ext_info) {
column.set_name(name);
column.set_type(data_type->get_storage_field_type());
if (ext_info.unique_id >= 0) {
column.set_unique_id(ext_info.unique_id);
}
if (ext_info.parent_unique_id >= 0) {
column.set_parent_unique_id(ext_info.parent_unique_id);
}
if (!ext_info.path_info.empty()) {
column.set_path_info(ext_info.path_info);
}
if (data_type->is_nullable()) {
const auto& real_type = static_cast<const DataTypeNullable&>(*data_type);
column.set_is_nullable(true);
get_column_by_type(real_type.get_nested_type(), name, column, {});
return;
}
if (data_type->get_type_id() == TypeIndex::Array) {
TabletColumn child;
get_column_by_type(assert_cast<const DataTypeArray*>(data_type.get())->get_nested_type(),
"", child, {});
column.set_length(TabletColumn::get_field_length_by_type(TPrimitiveType::ARRAY, 0));
column.add_sub_column(child);
column.set_default_value("[]");
return;
}
// size is not fixed when type is string or json
if (WhichDataType(*data_type).is_string() || WhichDataType(*data_type).is_json()) {
return;
}
if (WhichDataType(*data_type).is_simple()) {
column.set_length(data_type->get_size_of_value_in_memory());
return;
}
// TODO handle more types like struct/date/datetime/decimal...
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
}
TabletColumn get_column_by_type(const vectorized::DataTypePtr& data_type, const std::string& name,
const ExtraInfo& ext_info) {
TabletColumn result;
get_column_by_type(data_type, name, result, ext_info);
return result;
}
TabletColumn get_least_type_column(const TabletColumn& original, const DataTypePtr& new_type,
const ExtraInfo& ext_info, bool* changed) {
TabletColumn result_column;
vectorized::DataTypePtr original_type = original.get_vec_type();
vectorized::DataTypePtr common_type;
vectorized::get_least_supertype<vectorized::LeastSupertypeOnError::Jsonb>(
vectorized::DataTypes {original_type, new_type}, &common_type);
if (!original_type->equals(*common_type)) {
// update to common type
*changed = true;
vectorized::schema_util::get_column_by_type(common_type, original.name(), result_column,
ext_info);
} else {
*changed = false;
result_column = original;
result_column.set_parent_unique_id(ext_info.parent_unique_id);
result_column.set_unique_id(ext_info.unique_id);
result_column.set_path_info(ext_info.path_info);
}
return result_column;
}
void update_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
TabletSchemaSPtr& common_schema, int32_t variant_col_unique_id) {
// Types of subcolumns by path from all tuples.
std::unordered_map<PathInData, DataTypes, PathInData::Hash> subcolumns_types;
for (const TabletSchemaSPtr& schema : schemas) {
for (const TabletColumn& col : schema->columns()) {
// Get subcolumns of this variant
if (!col.path_info().empty() && col.parent_unique_id() > 0 &&
col.parent_unique_id() == variant_col_unique_id) {
subcolumns_types[col.path_info()].push_back(
DataTypeFactory::instance().create_data_type(col, col.is_nullable()));
}
}
}
PathsInData tuple_paths;
DataTypes tuple_types;
// Get the least common type for all paths.
for (const auto& [key, subtypes] : subcolumns_types) {
assert(!subtypes.empty());
if (key.get_path() == ColumnObject::COLUMN_NAME_DUMMY) {
continue;
}
size_t first_dim = get_number_of_dimensions(*subtypes[0]);
tuple_paths.emplace_back(key);
for (size_t i = 1; i < subtypes.size(); ++i) {
if (first_dim != get_number_of_dimensions(*subtypes[i])) {
tuple_types.emplace_back(make_nullable(std::make_shared<DataTypeJsonb>()));
LOG(INFO) << fmt::format(
"Uncompatible types of subcolumn '{}': {} and {}, cast to JSONB",
key.get_path(), subtypes[0]->get_name(), subtypes[i]->get_name());
break;
}
}
if (tuple_paths.size() == tuple_types.size()) {
continue;
}
DataTypePtr common_type;
get_least_supertype<LeastSupertypeOnError::Jsonb>(subtypes, &common_type);
if (!common_type->is_nullable()) {
common_type = make_nullable(common_type);
}
tuple_types.emplace_back(common_type);
}
CHECK_EQ(tuple_paths.size(), tuple_types.size());
std::string variant_col_name = common_schema->column_by_uid(variant_col_unique_id).name();
// Append all common type columns of this variant
for (int i = 0; i < tuple_paths.size(); ++i) {
TabletColumn common_column;
// const std::string& column_name = variant_col_name + "." + tuple_paths[i].get_path();
get_column_by_type(tuple_types[i], tuple_paths[i].get_path(), common_column,
ExtraInfo {.unique_id = -1,
.parent_unique_id = variant_col_unique_id,
.path_info = tuple_paths[i]});
// set ColumnType::VARIANT to occupy _field_path_to_index
common_schema->append_column(common_column, TabletSchema::ColumnType::VARIANT);
}
}
void inherit_tablet_index(TabletSchemaSPtr& schema) {
std::unordered_map<int32_t, TabletIndex> variants_index_meta;
// Get all variants tablet index metas if exist
for (const auto& col : schema->columns()) {
auto index_meta = schema->get_inverted_index(col.unique_id(), "");
if (col.is_variant_type() && index_meta != nullptr) {
variants_index_meta.emplace(col.unique_id(), *index_meta);
}
}
// Add index meta if extracted column is missing index meta
for (const auto& col : schema->columns()) {
if (!col.is_extracted_column()) {
continue;
}
auto it = variants_index_meta.find(col.parent_unique_id());
// variant has no index meta, ignore
if (it == variants_index_meta.end()) {
continue;
}
auto index_meta = schema->get_inverted_index(col);
// add index meta
TabletIndex index_info = it->second;
index_info.set_escaped_escaped_index_suffix_path(col.path_info().get_path());
if (index_meta != nullptr) {
// already exist
schema->update_index(col, index_info);
} else {
schema->append_index(index_info);
}
}
}
Status get_least_common_schema(const std::vector<TabletSchemaSPtr>& schemas,
const TabletSchemaSPtr& base_schema, TabletSchemaSPtr& output_schema,
bool check_schema_size) {
std::vector<int32_t> variant_column_unique_id;
// Construct a schema excluding the extracted columns and gather unique identifiers for variants.
// Ensure that the output schema also excludes these extracted columns. This approach prevents
// duplicated paths following the update_least_common_schema process.
auto build_schema_without_extracted_columns = [&](const TabletSchemaSPtr& base_schema) {
output_schema = std::make_shared<TabletSchema>();
output_schema->copy_from(*base_schema);
// Merge columns from other schemas
output_schema->clear_columns();
// Get all columns without extracted columns and collect variant col unique id
for (const TabletColumn& col : base_schema->columns()) {
if (col.is_variant_type()) {
variant_column_unique_id.push_back(col.unique_id());
}
if (!col.is_extracted_column()) {
output_schema->append_column(col);
}
}
};
if (base_schema == nullptr) {
// Pick tablet schema with max schema version
auto max_version_schema =
*std::max_element(schemas.cbegin(), schemas.cend(),
[](const TabletSchemaSPtr a, const TabletSchemaSPtr b) {
return a->schema_version() < b->schema_version();
});
CHECK(max_version_schema);
build_schema_without_extracted_columns(max_version_schema);
} else {
// use input base_schema schema as base schema
build_schema_without_extracted_columns(base_schema);
}
for (int32_t unique_id : variant_column_unique_id) {
update_least_common_schema(schemas, output_schema, unique_id);
}
inherit_tablet_index(output_schema);
if (check_schema_size &&
output_schema->columns().size() > config::variant_max_merged_tablet_schema_size) {
return Status::DataQualityError("Reached max column size limit {}",
config::variant_max_merged_tablet_schema_size);
}
return Status::OK();
}
Status parse_and_encode_variant_columns(Block& block, const std::vector<int>& variant_pos) {
try {
// Parse each variant column from raw string column
RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_pos));
vectorized::schema_util::finalize_variant_columns(block, variant_pos,
false /*not ingore sparse*/);
vectorized::schema_util::encode_variant_sparse_subcolumns(block, variant_pos);
} catch (const doris::Exception& e) {
// TODO more graceful, max_filter_ratio
LOG(WARNING) << "encounter execption " << e.to_string();
return Status::InternalError(e.to_string());
}
return Status::OK();
}
Status parse_variant_columns(Block& block, const std::vector<int>& variant_pos) {
for (int i = 0; i < variant_pos.size(); ++i) {
const auto& column_ref = block.get_by_position(variant_pos[i]).column;
bool is_nullable = column_ref->is_nullable();
const auto& column = remove_nullable(column_ref);
const auto& var = assert_cast<const ColumnObject&>(*column.get());
var.assume_mutable_ref().finalize();
if (!var.is_scalar_variant()) {
// already parsed
continue;
}
ColumnPtr raw_json_column;
if (WhichDataType(remove_nullable(var.get_root_type())).is_json()) {
// TODO more efficient way to parse jsonb type, currently we just convert jsonb to
// json str and parse them into variant
RETURN_IF_ERROR(cast_column({var.get_root(), var.get_root_type(), ""},
var.get_root()->is_nullable()
? make_nullable(std::make_shared<DataTypeString>())
: std::make_shared<DataTypeString>(),
&raw_json_column));
if (raw_json_column->is_nullable()) {
raw_json_column = assert_cast<const ColumnNullable*>(raw_json_column.get())
->get_nested_column_ptr();
}
} else {
const auto& root = *var.get_root();
raw_json_column =
root.is_nullable()
? assert_cast<const ColumnNullable&>(root).get_nested_column_ptr()
: var.get_root();
}
MutableColumnPtr variant_column = ColumnObject::create(true);
parse_json_to_variant(*variant_column.get(),
assert_cast<const ColumnString&>(*raw_json_column));
// Wrap variant with nullmap if it is nullable
ColumnPtr result = variant_column->get_ptr();
if (is_nullable) {
const auto& null_map =
assert_cast<const ColumnNullable&>(*column_ref).get_null_map_column_ptr();
result = ColumnNullable::create(result, null_map);
}
block.get_by_position(variant_pos[i]).column = result;
// block.get_by_position(variant_pos[i]).type = std::make_shared<DataTypeObject>("json", true);
}
return Status::OK();
}
void finalize_variant_columns(Block& block, const std::vector<int>& variant_pos,
bool ignore_sparse) {
for (int i = 0; i < variant_pos.size(); ++i) {
auto& column_ref = block.get_by_position(variant_pos[i]).column->assume_mutable_ref();
auto& column =
column_ref.is_nullable()
? assert_cast<ColumnObject&>(
assert_cast<ColumnNullable&>(column_ref).get_nested_column())
: assert_cast<ColumnObject&>(column_ref);
column.finalize(ignore_sparse);
}
}
void encode_variant_sparse_subcolumns(Block& block, const std::vector<int>& variant_pos) {
for (int i = 0; i < variant_pos.size(); ++i) {
auto& column_ref = block.get_by_position(variant_pos[i]).column->assume_mutable_ref();
auto& column =
column_ref.is_nullable()
? assert_cast<ColumnObject&>(
assert_cast<ColumnNullable&>(column_ref).get_nested_column())
: assert_cast<ColumnObject&>(column_ref);
// Make sure the root node is jsonb storage type
auto expected_root_type = make_nullable(std::make_shared<ColumnObject::MostCommonType>());
column.ensure_root_node_type(expected_root_type);
column.merge_sparse_to_root_column();
}
}
// ---------------------------
Status extract(ColumnPtr source, const PathInData& path, MutableColumnPtr& dst) {
auto type_string = std::make_shared<DataTypeString>();
std::string jsonpath = path.to_jsonpath();
bool is_nullable = source->is_nullable();
auto json_type = is_nullable ? make_nullable(std::make_shared<DataTypeJsonb>())
: std::make_shared<DataTypeJsonb>();
ColumnsWithTypeAndName arguments {
{source, json_type, ""},
{type_string->create_column_const(1, Field(jsonpath.data(), jsonpath.size())),
type_string, ""}};
auto function =
SimpleFunctionFactory::instance().get_function("jsonb_extract", arguments, json_type);
if (!function) {
return Status::InternalError("Not found function jsonb_extract");
}
Block tmp_block {arguments};
vectorized::ColumnNumbers argnum;
argnum.emplace_back(0);
argnum.emplace_back(1);
size_t result_column = tmp_block.columns();
tmp_block.insert({nullptr, json_type, ""});
RETURN_IF_ERROR(function->execute(nullptr, tmp_block, argnum, result_column, source->size()));
dst = tmp_block.get_by_position(result_column).column->assume_mutable();
return Status::OK();
}
// ---------------------------
std::string dump_column(DataTypePtr type, const ColumnPtr& col) {
Block tmp;
tmp.insert(ColumnWithTypeAndName {col, type, col->get_name()});
return tmp.dump_data(0, tmp.rows());
}
// ---------------------------
} // namespace doris::vectorized::schema_util