blob: 95c92673cbe678f081f9e7ed41a4e20eb4f85d41 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/tablet_info.h"
#include <butil/logging.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Partitions_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/descriptors.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <ostream>
#include <string>
#include <tuple>
#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/tablet_schema.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/large_int_value.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/primitive_type.h"
#include "runtime/raw_value.h"
#include "runtime/types.h"
#include "util/string_parser.hpp"
#include "util/string_util.h"
#include "vec/columns/column.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"
// NOLINTNEXTLINE(unused-includes)
#include "vec/exprs/vexpr_context.h" // IWYU pragma: keep
#include "vec/exprs/vliteral.h"
#include "vec/runtime/vdatetime_value.h"
namespace doris {
void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
pindex->set_id(index_id);
pindex->set_schema_hash(schema_hash);
for (auto* slot : slots) {
pindex->add_columns(slot->col_name());
}
for (auto* column : columns) {
column->to_schema_pb(pindex->add_columns_desc());
}
for (auto* index : indexes) {
index->to_schema_pb(pindex->add_indexes_desc());
}
}
bool VOlapTablePartKeyComparator::operator()(const BlockRowWithIndicator& lhs,
const BlockRowWithIndicator& rhs) const {
vectorized::Block* l_block = std::get<0>(lhs);
vectorized::Block* r_block = std::get<0>(rhs);
int32_t l_row = std::get<1>(lhs);
int32_t r_row = std::get<1>(rhs);
bool l_use_new = std::get<2>(lhs);
bool r_use_new = std::get<2>(rhs);
VLOG_TRACE << '\n' << l_block->dump_data() << '\n' << r_block->dump_data();
if (l_row == -1) {
return false;
} else if (r_row == -1) {
return true;
}
if (_param_locs.empty()) { // no transform, use origin column
for (auto slot_loc : _slot_locs) {
auto res = l_block->get_by_position(slot_loc).column->compare_at(
l_row, r_row, *r_block->get_by_position(slot_loc).column, -1);
if (res != 0) {
return res < 0;
}
}
} else { // use transformed column to compare
DCHECK(_slot_locs.size() == _param_locs.size())
<< _slot_locs.size() << ' ' << _param_locs.size();
const std::vector<uint16_t>* l_index = l_use_new ? &_param_locs : &_slot_locs;
const std::vector<uint16_t>* r_index = r_use_new ? &_param_locs : &_slot_locs;
for (int i = 0; i < _slot_locs.size(); i++) {
vectorized::ColumnPtr l_col = l_block->get_by_position((*l_index)[i]).column;
vectorized::ColumnPtr r_col = r_block->get_by_position((*r_index)[i]).column;
auto res = l_col->compare_at(l_row, r_row, *r_col, -1);
if (res != 0) {
return res < 0;
}
}
}
// equal, return false
return false;
}
Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_db_id = pschema.db_id();
_table_id = pschema.table_id();
_version = pschema.version();
if (pschema.has_unique_key_update_mode()) {
_unique_key_update_mode = pschema.unique_key_update_mode();
if (pschema.has_sequence_map_col_unique_id()) {
_sequence_map_col_uid = pschema.sequence_map_col_unique_id();
}
} else {
// for backward compatibility
if (pschema.has_partial_update() && pschema.partial_update()) {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
} else {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
}
}
_is_strict_mode = pschema.is_strict_mode();
if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
_auto_increment_column = pschema.auto_increment_column();
if (!_auto_increment_column.empty() && pschema.auto_increment_column_unique_id() == -1) {
return Status::InternalError(
"Auto increment column id is not set in FE. Maybe FE is an older version "
"different from BE.");
}
_auto_increment_column_unique_id = pschema.auto_increment_column_unique_id();
}
if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) {
if (pschema.has_partial_update_new_key_policy()) {
_partial_update_new_row_policy = pschema.partial_update_new_key_policy();
}
}
_timestamp_ms = pschema.timestamp_ms();
if (pschema.has_nano_seconds()) {
_nano_seconds = pschema.nano_seconds();
}
_timezone = pschema.timezone();
for (const auto& col : pschema.partial_update_input_columns()) {
_partial_update_input_columns.insert(col);
}
std::unordered_map<std::string, SlotDescriptor*> slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(pschema.tuple_desc()));
for (const auto& p_slot_desc : pschema.slot_descs()) {
auto* slot_desc = _obj_pool.add(new SlotDescriptor(p_slot_desc));
_tuple_desc->add_slot(slot_desc);
std::string data_type;
EnumToString(TPrimitiveType, to_thrift(slot_desc->col_type()), data_type);
std::string is_null_str = slot_desc->is_nullable() ? "true" : "false";
std::string data_type_str =
std::to_string(int64_t(TabletColumn::get_field_type_by_string(data_type)));
slots_map.emplace(to_lower(slot_desc->col_name()) + "+" + data_type_str + is_null_str,
slot_desc);
}
for (const auto& p_index : pschema.indexes()) {
auto* index = _obj_pool.add(new OlapTableIndexSchema());
index->index_id = p_index.id();
index->schema_hash = p_index.schema_hash();
for (const auto& pcolumn_desc : p_index.columns_desc()) {
if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS ||
_partial_update_input_columns.contains(pcolumn_desc.name())) {
std::string is_null_str = pcolumn_desc.is_nullable() ? "true" : "false";
std::string data_type_str = std::to_string(
int64_t(TabletColumn::get_field_type_by_string(pcolumn_desc.type())));
auto it = slots_map.find(to_lower(pcolumn_desc.name()) + "+" + data_type_str +
is_null_str);
if (it == std::end(slots_map)) {
std::string keys {};
for (const auto& [key, _] : slots_map) {
keys += fmt::format("{},", key);
}
LOG_EVERY_SECOND(WARNING) << fmt::format(
"[OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema)]: "
"unknown index column, column={}, type={}, data_type_str={}, "
"is_null_str={}, slots_map.keys()=[{}], {}\npschema={}",
pcolumn_desc.name(), pcolumn_desc.type(), data_type_str, is_null_str,
keys, debug_string(), pschema.ShortDebugString());
return Status::InternalError("unknown index column, column={}, type={}",
pcolumn_desc.name(), pcolumn_desc.type());
}
index->slots.emplace_back(it->second);
}
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_pb(pcolumn_desc);
index->columns.emplace_back(tc);
}
for (const auto& pindex_desc : p_index.indexes_desc()) {
TabletIndex* ti = _obj_pool.add(new TabletIndex());
ti->init_from_pb(pindex_desc);
index->indexes.emplace_back(ti);
}
_indexes.emplace_back(index);
}
std::sort(_indexes.begin(), _indexes.end(),
[](const OlapTableIndexSchema* lhs, const OlapTableIndexSchema* rhs) {
return lhs->index_id < rhs->index_id;
});
return Status::OK();
}
Status OlapTableSchemaParam::init_unique_key_update_mode(const TOlapTableSchemaParam& tschema) {
if (tschema.__isset.unique_key_update_mode) {
switch (tschema.unique_key_update_mode) {
case doris::TUniqueKeyUpdateMode::UPSERT: {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
break;
}
case doris::TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS: {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
break;
}
case doris::TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS: {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
break;
}
default: {
return Status::InternalError(
"Unknown unique_key_update_mode: {}, should be one of "
"UPSERT/UPDATE_FIXED_COLUMNS/UPDATE_FLEXIBLE_COLUMNS",
tschema.unique_key_update_mode);
}
}
if (tschema.__isset.sequence_map_col_unique_id) {
_sequence_map_col_uid = tschema.sequence_map_col_unique_id;
}
} else {
// for backward compatibility
if (tschema.__isset.is_partial_update && tschema.is_partial_update) {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
} else {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
}
}
return Status::OK();
}
Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
_db_id = tschema.db_id;
_table_id = tschema.table_id;
_version = tschema.version;
RETURN_IF_ERROR(init_unique_key_update_mode(tschema));
if (tschema.__isset.is_strict_mode) {
_is_strict_mode = tschema.is_strict_mode;
}
if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
_auto_increment_column = tschema.auto_increment_column;
if (!_auto_increment_column.empty() && tschema.auto_increment_column_unique_id == -1) {
return Status::InternalError(
"Auto increment column id is not set in FE. Maybe FE is an older version "
"different from BE.");
}
_auto_increment_column_unique_id = tschema.auto_increment_column_unique_id;
}
if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) {
if (tschema.__isset.partial_update_new_key_policy) {
switch (tschema.partial_update_new_key_policy) {
case doris::TPartialUpdateNewRowPolicy::APPEND: {
_partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::APPEND;
break;
}
case doris::TPartialUpdateNewRowPolicy::ERROR: {
_partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::ERROR;
break;
}
default: {
return Status::InvalidArgument(
"Unknown partial_update_new_key_behavior: {}, should be one of "
"'APPEND' or 'ERROR'",
tschema.partial_update_new_key_policy);
}
}
}
}
for (const auto& tcolumn : tschema.partial_update_input_columns) {
_partial_update_input_columns.insert(tcolumn);
}
std::unordered_map<std::string, SlotDescriptor*> slots_map;
_tuple_desc = _obj_pool.add(new TupleDescriptor(tschema.tuple_desc));
for (const auto& t_slot_desc : tschema.slot_descs) {
auto* slot_desc = _obj_pool.add(new SlotDescriptor(t_slot_desc));
_tuple_desc->add_slot(slot_desc);
std::string is_null_str = slot_desc->is_nullable() ? "true" : "false";
std::string data_type_str = std::to_string(int64_t(slot_desc->col_type()));
slots_map.emplace(to_lower(slot_desc->col_name()) + "+" + data_type_str + is_null_str,
slot_desc);
}
for (const auto& t_index : tschema.indexes) {
std::unordered_map<std::string, int32_t> index_slots_map;
auto* index = _obj_pool.add(new OlapTableIndexSchema());
index->index_id = t_index.id;
index->schema_hash = t_index.schema_hash;
for (const auto& tcolumn_desc : t_index.columns_desc) {
if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS ||
_partial_update_input_columns.contains(tcolumn_desc.column_name)) {
std::string is_null_str = tcolumn_desc.is_allow_null ? "true" : "false";
std::string data_type_str =
std::to_string(int64_t(thrift_to_type(tcolumn_desc.column_type.type)));
auto it = slots_map.find(to_lower(tcolumn_desc.column_name) + "+" + data_type_str +
is_null_str);
if (it == slots_map.end()) {
std::stringstream ss;
ss << tschema;
std::string keys {};
for (const auto& [key, _] : slots_map) {
keys += fmt::format("{},", key);
}
LOG_EVERY_SECOND(WARNING) << fmt::format(
"[OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema)]: "
"unknown index column, column={}, type={}, data_type_str={}, "
"is_null_str={}, slots_map.keys()=[{}], {}\ntschema={}",
tcolumn_desc.column_name, tcolumn_desc.column_type.type, data_type_str,
is_null_str, keys, debug_string(), ss.str());
return Status::InternalError("unknown index column, column={}, type={}",
tcolumn_desc.column_name,
tcolumn_desc.column_type.type);
}
index->slots.emplace_back(it->second);
}
index_slots_map.emplace(to_lower(tcolumn_desc.column_name), tcolumn_desc.col_unique_id);
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn_desc);
index->columns.emplace_back(tc);
}
if (t_index.__isset.indexes_desc) {
for (const auto& tindex_desc : t_index.indexes_desc) {
std::vector<int32_t> column_unique_ids(tindex_desc.columns.size());
for (size_t i = 0; i < tindex_desc.columns.size(); i++) {
auto it = index_slots_map.find(to_lower(tindex_desc.columns[i]));
if (it != index_slots_map.end()) {
column_unique_ids[i] = it->second;
}
}
TabletIndex* ti = _obj_pool.add(new TabletIndex());
ti->init_from_thrift(tindex_desc, column_unique_ids);
index->indexes.emplace_back(ti);
}
}
if (t_index.__isset.where_clause) {
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_tree(t_index.where_clause, index->where_clause));
}
_indexes.emplace_back(index);
}
std::sort(_indexes.begin(), _indexes.end(),
[](const OlapTableIndexSchema* lhs, const OlapTableIndexSchema* rhs) {
return lhs->index_id < rhs->index_id;
});
return Status::OK();
}
void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_db_id(_db_id);
pschema->set_table_id(_table_id);
pschema->set_version(_version);
pschema->set_unique_key_update_mode(_unique_key_update_mode);
if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
// for backward compatibility
pschema->set_partial_update(true);
}
pschema->set_partial_update_new_key_policy(_partial_update_new_row_policy);
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_auto_increment_column(_auto_increment_column);
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
pschema->set_nano_seconds(_nano_seconds);
pschema->set_sequence_map_col_unique_id(_sequence_map_col_uid);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
_tuple_desc->to_protobuf(pschema->mutable_tuple_desc());
for (auto* slot : _tuple_desc->slots()) {
slot->to_protobuf(pschema->add_slot_descs());
}
for (auto* index : _indexes) {
index->to_protobuf(pschema->add_indexes());
}
}
std::string OlapTableSchemaParam::debug_string() const {
std::stringstream ss;
ss << "tuple_desc=" << _tuple_desc->debug_string();
return ss.str();
}
VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptr<OlapTableSchemaParam>& schema,
const TOlapTablePartitionParam& t_param)
: _schema(schema),
_t_param(t_param),
_slots(_schema->tuple_desc()->slots()),
_mem_tracker(std::make_unique<MemTracker>("OlapTablePartitionParam")),
_part_type(t_param.partition_type) {
if (t_param.__isset.enable_automatic_partition && t_param.enable_automatic_partition) {
_is_auto_partition = true;
auto size = t_param.partition_function_exprs.size();
_part_func_ctx.resize(size);
_partition_function.resize(size);
DCHECK((t_param.partition_type == TPartitionType::RANGE_PARTITIONED && size == 1) ||
(t_param.partition_type == TPartitionType::LIST_PARTITIONED && size >= 1))
<< "now support only 1 partition column for auto range partitions. "
<< t_param.partition_type << " " << size;
for (int i = 0; i < size; ++i) {
Status st = vectorized::VExpr::create_expr_tree(t_param.partition_function_exprs[i],
_part_func_ctx[i]);
if (!st.ok()) {
throw Exception(Status::InternalError("Partition function expr is not valid"),
"Partition function expr is not valid");
}
_partition_function[i] = _part_func_ctx[i]->root();
}
}
if (t_param.__isset.enable_auto_detect_overwrite && t_param.enable_auto_detect_overwrite) {
_is_auto_detect_overwrite = true;
DCHECK(t_param.__isset.overwrite_group_id);
_overwrite_group_id = t_param.overwrite_group_id;
}
if (_is_auto_partition) {
// the nullable mode depends on partition_exprs. not column slots. so use them.
DCHECK(_partition_function.size() <= _slots.size())
<< _partition_function.size() << ", " << _slots.size();
// suppose (k0, [k1], [k2]), so get [k1, 0], [k2, 1]
std::map<std::string, int> partition_slots_map; // name to idx in part_exprs
for (size_t i = 0; i < t_param.partition_columns.size(); i++) {
partition_slots_map.emplace(t_param.partition_columns[i], i);
}
// here we rely on the same order and number of the _part_funcs and _slots in the prefix
// _part_block contains all slots of table.
for (auto* slot : _slots) {
// try to replace with partition expr.
if (auto it = partition_slots_map.find(slot->col_name());
it != partition_slots_map.end()) { // it's a partition column slot
auto& expr_type = _partition_function[it->second]->data_type();
_partition_block.insert({expr_type->create_column(), expr_type, slot->col_name()});
} else {
_partition_block.insert({slot->get_empty_mutable_column(),
slot->get_data_type_ptr(), slot->col_name()});
}
}
VLOG_TRACE << _partition_block.dump_structure();
} else {
// we insert all. but not all will be used. it will controlled by _partition_slot_locs
for (auto* slot : _slots) {
_partition_block.insert({slot->get_empty_mutable_column(), slot->get_data_type_ptr(),
slot->col_name()});
}
}
}
VOlapTablePartitionParam::~VOlapTablePartitionParam() {
_mem_tracker->release(_mem_usage);
}
Status VOlapTablePartitionParam::init() {
std::vector<std::string> slot_column_names;
for (auto* slot_desc : _schema->tuple_desc()->slots()) {
slot_column_names.emplace_back(slot_desc->col_name());
}
auto find_slot_locs = [&slot_column_names](const std::string& slot_name,
std::vector<uint16_t>& locs,
const std::string& column_type) {
auto it = std::find(slot_column_names.begin(), slot_column_names.end(), slot_name);
if (it == slot_column_names.end()) {
return Status::InternalError("{} column not found, column ={}", column_type, slot_name);
}
locs.emplace_back(it - slot_column_names.begin());
return Status::OK();
};
// here we find the partition columns. others maybe non-partition columns/special columns.
if (_t_param.__isset.partition_columns) {
for (auto& part_col : _t_param.partition_columns) {
RETURN_IF_ERROR(find_slot_locs(part_col, _partition_slot_locs, "partition"));
}
}
_partitions_map = std::make_unique<
std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>>(
VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs));
if (_t_param.__isset.distributed_columns) {
for (auto& col : _t_param.distributed_columns) {
RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs, "distributed"));
}
}
// for both auto/non-auto partition table.
_is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;
// initial partitions. if meet dummy partitions only for open BE nodes, not generate key of them for finding
for (const auto& t_part : _t_param.partitions) {
VOlapTablePartition* part = nullptr;
RETURN_IF_ERROR(generate_partition_from(t_part, part));
_partitions.emplace_back(part);
if (!_t_param.partitions_is_fake) {
if (_is_in_partition) {
for (auto& in_key : part->in_keys) {
_partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);
}
} else {
_partitions_map->emplace(
std::tuple {part->end_key.first, part->end_key.second, false}, part);
}
}
}
_mem_usage = _partition_block.allocated_bytes();
_mem_tracker->consume(_mem_usage);
return Status::OK();
}
bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
BlockRowWithIndicator key) const {
VOlapTablePartKeyComparator comparator(_partition_slot_locs, _transformed_slot_locs);
// we have used upper_bound to find to ensure key < part.right and this part is closest(right - key is min)
// now we only have to check (key >= part.left). the comparator(a,b) means a < b, so we use anti
return part->start_key.second == -1 /* spj: start_key.second == -1 means only single partition*/
|| !comparator(key, std::tuple {part->start_key.first, part->start_key.second, false});
}
// insert value into _partition_block's column
// NOLINTBEGIN(readability-function-size)
static Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos) {
auto column = std::move(*part_key->first->get_by_position(pos).column).mutate();
//TODO: use assert_cast before insert_data
switch (t_expr.node_type) {
case TExprNodeType::DATE_LITERAL: {
if (vectorized::DataTypeFactory::instance()
.create_data_type(t_expr.type)
->get_primitive_type() == TYPE_DATEV2) {
DateV2Value<DateV2ValueType> dt;
if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
t_expr.date_literal.value.size())) {
std::stringstream ss;
ss << "invalid date literal in partition column, date=" << t_expr.date_literal;
return Status::InternalError(ss.str());
}
column->insert_data(reinterpret_cast<const char*>(&dt), 0);
} else if (vectorized::DataTypeFactory::instance()
.create_data_type(t_expr.type)
->get_primitive_type() == TYPE_DATETIMEV2) {
DateV2Value<DateTimeV2ValueType> dt;
const int32_t scale =
t_expr.type.types.empty() ? -1 : t_expr.type.types.front().scalar_type.scale;
if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
t_expr.date_literal.value.size(), scale)) {
std::stringstream ss;
ss << "invalid date literal in partition column, date=" << t_expr.date_literal;
return Status::InternalError(ss.str());
}
column->insert_data(reinterpret_cast<const char*>(&dt), 0);
} else {
VecDateTimeValue dt;
if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
t_expr.date_literal.value.size())) {
std::stringstream ss;
ss << "invalid date literal in partition column, date=" << t_expr.date_literal;
return Status::InternalError(ss.str());
}
column->insert_data(reinterpret_cast<const char*>(&dt), 0);
}
break;
}
case TExprNodeType::INT_LITERAL: {
switch (t_expr.type.types[0].scalar_type.type) {
case TPrimitiveType::TINYINT: {
int8_t value = t_expr.int_literal.value;
column->insert_data(reinterpret_cast<const char*>(&value), 0);
break;
}
case TPrimitiveType::SMALLINT: {
int16_t value = t_expr.int_literal.value;
column->insert_data(reinterpret_cast<const char*>(&value), 0);
break;
}
case TPrimitiveType::INT: {
int32_t value = t_expr.int_literal.value;
column->insert_data(reinterpret_cast<const char*>(&value), 0);
break;
}
default:
int64_t value = t_expr.int_literal.value;
column->insert_data(reinterpret_cast<const char*>(&value), 0);
}
break;
}
case TExprNodeType::LARGE_INT_LITERAL: {
StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
auto value = StringParser::string_to_int<__int128>(t_expr.large_int_literal.value.c_str(),
t_expr.large_int_literal.value.size(),
&parse_result);
if (parse_result != StringParser::PARSE_SUCCESS) {
value = MAX_INT128;
}
column->insert_data(reinterpret_cast<const char*>(&value), 0);
break;
}
case TExprNodeType::STRING_LITERAL: {
int len = t_expr.string_literal.value.size();
const char* str_val = t_expr.string_literal.value.c_str();
column->insert_data(str_val, len);
break;
}
case TExprNodeType::BOOL_LITERAL: {
column->insert_data(reinterpret_cast<const char*>(&t_expr.bool_literal.value), 0);
break;
}
case TExprNodeType::NULL_LITERAL: {
// insert a null literal
if (!column->is_nullable()) {
// https://github.com/apache/doris/pull/39449 have forbid this cause. always add this check as protective measures
return Status::InternalError("The column {} is not null, can't insert into NULL value.",
part_key->first->get_by_position(pos).name);
}
column->insert_data(nullptr, 0);
break;
}
default: {
return Status::InternalError("unsupported partition column node type, type={}",
t_expr.node_type);
}
}
part_key->second = column->size() - 1;
return Status::OK();
}
// NOLINTEND(readability-function-size)
Status VOlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs,
BlockRow* part_key) {
for (int i = 0; i < t_exprs.size(); i++) {
RETURN_IF_ERROR(_create_partition_key(t_exprs[i], part_key, _partition_slot_locs[i]));
}
return Status::OK();
}
Status VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartition& t_part,
VOlapTablePartition*& part_result) {
DCHECK(part_result == nullptr);
// here we set the default value of partition bounds first! if it doesn't have some key, it will be -1.
part_result = _obj_pool.add(new VOlapTablePartition(&_partition_block));
part_result->id = t_part.id;
part_result->is_mutable = t_part.is_mutable;
// only load_to_single_tablet = true will set load_tablet_idx
if (t_part.__isset.load_tablet_idx) {
part_result->load_tablet_idx = t_part.load_tablet_idx;
}
if (_is_in_partition) {
for (const auto& keys : t_part.in_keys) {
RETURN_IF_ERROR(_create_partition_keys(
keys, &part_result->in_keys.emplace_back(&_partition_block, -1)));
}
if (t_part.__isset.is_default_partition && t_part.is_default_partition &&
_default_partition == nullptr) {
_default_partition = part_result;
}
} else { // range
if (t_part.__isset.start_keys) {
RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part_result->start_key));
}
// we generate the right bound but not insert into partition map
if (t_part.__isset.end_keys) {
RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part_result->end_key));
}
}
part_result->num_buckets = t_part.num_buckets;
auto num_indexes = _schema->indexes().size();
if (t_part.indexes.size() != num_indexes) {
return Status::InternalError(
"number of partition's index is not equal with schema's"
", num_part_indexes={}, num_schema_indexes={}",
t_part.indexes.size(), num_indexes);
}
part_result->indexes = t_part.indexes;
std::sort(part_result->indexes.begin(), part_result->indexes.end(),
[](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) {
return lhs.index_id < rhs.index_id;
});
// check index
for (int j = 0; j < num_indexes; ++j) {
if (part_result->indexes[j].index_id != _schema->indexes()[j]->index_id) {
return Status::InternalError(
"partition's index is not equal with schema's"
", part_index={}, schema_index={}",
part_result->indexes[j].index_id, _schema->indexes()[j]->index_id);
}
}
if (t_part.__isset.total_replica_num) {
part_result->total_replica_num = t_part.total_replica_num;
}
if (t_part.__isset.load_required_replica_num) {
part_result->load_required_replica_num = t_part.load_required_replica_num;
}
return Status::OK();
}
Status VOlapTablePartitionParam::add_partitions(
const std::vector<TOlapTablePartition>& partitions) {
for (const auto& t_part : partitions) {
auto* part = _obj_pool.add(new VOlapTablePartition(&_partition_block));
part->id = t_part.id;
part->is_mutable = t_part.is_mutable;
// we dont pass right keys when it's MAX_VALUE. so there's possibility we only have start_key but not end_key
// range partition
if (t_part.__isset.start_keys) {
RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part->start_key));
}
if (t_part.__isset.end_keys) {
RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part->end_key));
}
// list partition - we only set 1 value in 1 partition for new created ones
if (t_part.__isset.in_keys) {
for (const auto& keys : t_part.in_keys) {
RETURN_IF_ERROR(_create_partition_keys(
keys, &part->in_keys.emplace_back(&_partition_block, -1)));
}
if (t_part.__isset.is_default_partition && t_part.is_default_partition) {
_default_partition = part;
}
}
part->num_buckets = t_part.num_buckets;
auto num_indexes = _schema->indexes().size();
if (t_part.indexes.size() != num_indexes) {
return Status::InternalError(
"number of partition's index is not equal with schema's"
", num_part_indexes={}, num_schema_indexes={}",
t_part.indexes.size(), num_indexes);
}
part->indexes = t_part.indexes;
std::sort(part->indexes.begin(), part->indexes.end(),
[](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) {
return lhs.index_id < rhs.index_id;
});
// check index
for (int j = 0; j < num_indexes; ++j) {
if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) {
return Status::InternalError(
"partition's index is not equal with schema's"
", part_index={}, schema_index={}",
part->indexes[j].index_id, _schema->indexes()[j]->index_id);
}
}
_partitions.emplace_back(part);
// after _creating_partiton_keys
if (_is_in_partition) {
for (auto& in_key : part->in_keys) {
_partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);
}
} else {
_partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false},
part);
}
}
return Status::OK();
}
Status VOlapTablePartitionParam::replace_partitions(
std::vector<int64_t>& old_partition_ids,
const std::vector<TOlapTablePartition>& new_partitions) {
// remove old replaced partitions
DCHECK(old_partition_ids.size() == new_partitions.size());
// init and add new partitions. insert into _partitions
for (int i = 0; i < new_partitions.size(); i++) {
const auto& t_part = new_partitions[i];
// pair old_partition_ids and new_partitions one by one. TODO: sort to opt performance
VOlapTablePartition* old_part = nullptr;
auto old_part_id = old_partition_ids[i];
if (auto it = std::find_if(
_partitions.begin(), _partitions.end(),
[=](const VOlapTablePartition* lhs) { return lhs->id == old_part_id; });
it != _partitions.end()) {
old_part = *it;
} else {
return Status::InternalError("Cannot find old tablet {} in replacing", old_part_id);
}
auto* part = _obj_pool.add(new VOlapTablePartition(&_partition_block));
part->id = t_part.id;
part->is_mutable = t_part.is_mutable;
/// just substitute directly. no need to remove and reinsert keys.
// range partition
part->start_key = std::move(old_part->start_key);
part->end_key = std::move(old_part->end_key);
// list partition
part->in_keys = std::move(old_part->in_keys);
if (t_part.__isset.is_default_partition && t_part.is_default_partition) {
_default_partition = part;
}
part->num_buckets = t_part.num_buckets;
auto num_indexes = _schema->indexes().size();
if (t_part.indexes.size() != num_indexes) {
return Status::InternalError(
"number of partition's index is not equal with schema's"
", num_part_indexes={}, num_schema_indexes={}",
t_part.indexes.size(), num_indexes);
}
part->indexes = t_part.indexes;
std::sort(part->indexes.begin(), part->indexes.end(),
[](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) {
return lhs.index_id < rhs.index_id;
});
// check index
for (int j = 0; j < num_indexes; ++j) {
if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) {
return Status::InternalError(
"partition's index is not equal with schema's"
", part_index={}, schema_index={}",
part->indexes[j].index_id, _schema->indexes()[j]->index_id);
}
}
// add new partitions with new id.
_partitions.emplace_back(part);
VLOG_NOTICE << "params add new partition " << part->id;
// replace items in _partition_maps
if (_is_in_partition) {
for (auto& in_key : part->in_keys) {
(*_partitions_map)[std::tuple {in_key.first, in_key.second, false}] = part;
}
} else {
(*_partitions_map)[std::tuple {part->end_key.first, part->end_key.second, false}] =
part;
}
}
// remove old partitions by id
std::ranges::sort(old_partition_ids);
for (auto it = _partitions.begin(); it != _partitions.end();) {
if (std::ranges::binary_search(old_partition_ids, (*it)->id)) {
it = _partitions.erase(it);
} else {
it++;
}
}
return Status::OK();
}
} // namespace doris