blob: 2217b9833286cbe7022ffeec66251ce0cc1e639f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "format/transformer/iceberg_partition_function.h"
#include "common/cast_set.h"
#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "core/column/column_const.h"
#include "core/column/column_nullable.h"
#include "core/column/column_struct.h"
#include "core/data_type/data_type_struct.h"
#include "exec/sink/writer/iceberg/partition_transformers.h"
#include "format/table/iceberg/partition_spec.h"
#include "util/string_util.h"
namespace doris {
#include "common/compile_check_begin.h"
using HashValType = PartitionerBase::HashValType;
static void initialize_shuffle_hashes(std::vector<HashValType>& hashes, size_t rows,
ShuffleHashMethod method) {
hashes.resize(rows);
if (method == ShuffleHashMethod::CRC32C) {
constexpr HashValType CRC32C_SHUFFLE_SEED = 0x9E3779B9U;
std::fill(hashes.begin(), hashes.end(), CRC32C_SHUFFLE_SEED);
} else {
std::fill(hashes.begin(), hashes.end(), 0);
}
}
// Feeds `column` into the per-row hash buffer `result`, using the column's own
// hash-update routine for the selected method.
//
// `result` must point at a buffer with one slot per row of `column`; `type` is
// only consulted on the non-CRC32C path, where the column API needs the
// primitive type and the row count.
static void update_shuffle_hashes(const ColumnPtr& column, const DataTypePtr& type,
                                  HashValType* __restrict result, ShuffleHashMethod method) {
    if (method != ShuffleHashMethod::CRC32C) {
        const auto row_count = cast_set<HashValType>(column->size());
        column->update_crcs_with_value(result, type->get_primitive_type(), row_count);
        return;
    }
    // NOTE(review): the second argument is passed as nullptr; its meaning is
    // defined by the column API and is not visible from this file.
    column->update_crc32c_batch(result, nullptr);
}
// Converts raw hashes into channel ids in place: every entry ends up in the
// range [0, partition_count).
//
// For CRC32C the hash is first passed through crc32c_shuffle_mix(); all other
// methods take the plain remainder. Callers must guarantee that
// `partition_count` is non-zero.
static void apply_shuffle_channel_ids(std::vector<HashValType>& hashes, size_t partition_count,
                                      ShuffleHashMethod method) {
    // The method is the same for every row, so decide once and run a tight loop.
    if (method == ShuffleHashMethod::CRC32C) {
        for (auto& hash : hashes) {
            hash = crc32c_shuffle_mix(hash) % partition_count;
        }
        return;
    }
    for (auto& hash : hashes) {
        hash = hash % partition_count;
    }
}
// Stores the configuration only; no expression trees are built here.
//
// `partition_exprs` and `partition_fields` are taken by value and moved into
// the members, so the object keeps its own copies (clone() passes them on to
// the new instance). Expression contexts are created later by init().
IcebergInsertPartitionFunction::IcebergInsertPartitionFunction(
HashValType partition_count, ShuffleHashMethod hash_method,
std::vector<TExpr> partition_exprs, std::vector<TIcebergPartitionField> partition_fields)
: _partition_count(partition_count),
_hash_method(hash_method),
_partition_exprs(std::move(partition_exprs)),
_partition_fields_spec(std::move(partition_fields)) {}
// Builds the expression contexts used for partitioning.
//
// The expressions given to the constructor win; `texprs` is only used when the
// constructor received none. In addition, one InsertPartitionField is created
// per entry of the partition-field spec, each with its own context for the
// field's source expression. Returns the first error reported while creating
// an expression tree.
Status IcebergInsertPartitionFunction::init(const std::vector<TExpr>& texprs) {
    const auto& exprs = _partition_exprs.empty() ? texprs : _partition_exprs;
    if (!exprs.empty()) {
        RETURN_IF_ERROR(VExpr::create_expr_trees(exprs, _partition_expr_ctxs));
    }
    if (_partition_fields_spec.empty()) {
        return Status::OK();
    }

    _partition_fields.reserve(_partition_fields_spec.size());
    for (const auto& spec : _partition_fields_spec) {
        VExprContextSPtr source_ctx;
        RETURN_IF_ERROR(VExpr::create_expr_tree(spec.source_expr, source_ctx));

        InsertPartitionField entry;
        entry.transform = spec.transform;
        entry.expr_ctx = std::move(source_ctx);
        // Optional thrift members fall back to neutral defaults when unset.
        entry.source_id = spec.__isset.source_id ? spec.source_id : 0;
        entry.name = spec.__isset.name ? spec.name : "";
        _partition_fields.push_back(std::move(entry));
    }
    return Status::OK();
}
// Prepares the plain partition expressions and, when partition fields exist,
// the source-expression context of every field against `row_desc`.
Status IcebergInsertPartitionFunction::prepare(RuntimeState* state, const RowDescriptor& row_desc) {
    RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, row_desc));
    if (_partition_fields.empty()) {
        return Status::OK();
    }

    // VExpr::prepare works on a list of contexts, so gather the per-field ones.
    VExprContextSPtrs source_ctxs;
    source_ctxs.reserve(_partition_fields.size());
    for (size_t idx = 0; idx < _partition_fields.size(); ++idx) {
        source_ctxs.push_back(_partition_fields[idx].expr_ctx);
    }
    RETURN_IF_ERROR(VExpr::prepare(source_ctxs, state, row_desc));
    return Status::OK();
}
// Opens all expression contexts and creates one transformer per partition
// field.
//
// If creating any transformer throws a doris::Exception, the failure is logged,
// all partition fields are dropped and the function is flagged as "fallback to
// random"; open() itself still returns OK in that case. Errors from opening
// the expression contexts are returned to the caller.
Status IcebergInsertPartitionFunction::open(RuntimeState* state) {
    RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
    if (_partition_fields.empty()) {
        return Status::OK();
    }

    VExprContextSPtrs source_ctxs;
    source_ctxs.reserve(_partition_fields.size());
    for (size_t idx = 0; idx < _partition_fields.size(); ++idx) {
        source_ctxs.push_back(_partition_fields[idx].expr_ctx);
    }
    RETURN_IF_ERROR(VExpr::open(source_ctxs, state));

    bool transformer_failed = false;
    for (auto& entry : _partition_fields) {
        try {
            doris::iceberg::PartitionField partition_field(entry.source_id, 0, entry.name,
                                                           entry.transform);
            entry.transformer = PartitionColumnTransforms::create(
                    partition_field, entry.expr_ctx->root()->data_type());
        } catch (const doris::Exception& e) {
            LOG(WARNING) << "Merge partitioning fallback to RR: " << e.what();
            transformer_failed = true;
            break;
        }
    }
    if (transformer_failed) {
        // Applied after the loop so the vector is not modified while iterated.
        _fallback_to_random = true;
        _partition_fields.clear();
    }
    return Status::OK();
}
// Fills `partitions` with one channel id in [0, partition_count) per row of
// `block`.
//
// Hashes come from the transformed partition fields when any exist, otherwise
// from the plain partition expressions. Returns InternalError when the
// function fell back to random distribution in open() or when
// `partition_count` is zero. The runtime state is not used.
Status IcebergInsertPartitionFunction::get_partitions(RuntimeState* /*state*/, Block* block,
                                                      size_t partition_count,
                                                      std::vector<HashValType>& partitions) const {
    if (_fallback_to_random) {
        return Status::InternalError("Merge partitioning fallback to random");
    }
    if (partition_count == 0) {
        return Status::InternalError("Partition count is zero");
    }

    if (_partition_fields.empty()) {
        RETURN_IF_ERROR(_compute_hashes_with_exprs(block, partitions));
    } else {
        RETURN_IF_ERROR(_compute_hashes_with_transform(block, partitions));
    }

    // Reduce the raw hashes to channel ids.
    apply_shuffle_channel_ids(partitions, partition_count, _hash_method);
    return Status::OK();
}
// Creates an independent copy of this function in `function`.
//
// The copy receives the same configuration, clones of every expression
// context, the fallback flag, and -- for every partition field whose
// transformer already exists on this instance (i.e. open() has run) -- a freshly
// created transformer. Without that last step a clone of an opened function
// would carry null transformers and _compute_hashes_with_transform() would fail
// with "transform is not initialized" unless open() were called on the clone
// again. Sources that have not been opened yet produce clones without
// transformers, exactly as before.
//
// `function` takes ownership of the new object before any fallible step, so it
// is released by the caller's unique_ptr even when an error is returned.
// Returns the first error reported while cloning an expression context.
Status IcebergInsertPartitionFunction::clone(RuntimeState* state,
                                             std::unique_ptr<PartitionFunction>& function) const {
    auto* new_function = new IcebergInsertPartitionFunction(
            _partition_count, _hash_method, _partition_exprs, _partition_fields_spec);
    function.reset(new_function);
    RETURN_IF_ERROR(
            _clone_expr_ctxs(state, _partition_expr_ctxs, new_function->_partition_expr_ctxs));

    // Copy the flag before the field loop: the loop may still raise it.
    new_function->_fallback_to_random = _fallback_to_random;
    if (_partition_fields.empty()) {
        return Status::OK();
    }

    VExprContextSPtrs src_field_ctxs;
    src_field_ctxs.reserve(_partition_fields.size());
    for (const auto& field : _partition_fields) {
        src_field_ctxs.emplace_back(field.expr_ctx);
    }
    VExprContextSPtrs dst_field_ctxs;
    RETURN_IF_ERROR(_clone_expr_ctxs(state, src_field_ctxs, dst_field_ctxs));

    new_function->_partition_fields.reserve(dst_field_ctxs.size());
    for (size_t i = 0; i < dst_field_ctxs.size(); ++i) {
        const auto& src_field = _partition_fields[i];
        InsertPartitionField field;
        field.transform = src_field.transform;
        field.expr_ctx = dst_field_ctxs[i];
        field.source_id = src_field.source_id;
        field.name = src_field.name;
        if (src_field.transformer != nullptr) {
            // Transformers are created per instance (as in open()), bound to
            // the cloned context's result type.
            try {
                doris::iceberg::PartitionField partition_field(field.source_id, 0, field.name,
                                                               field.transform);
                field.transformer = PartitionColumnTransforms::create(
                        partition_field, field.expr_ctx->root()->data_type());
            } catch (const doris::Exception& e) {
                // Same policy as open(): give up on field-based partitioning.
                LOG(WARNING) << "Merge partitioning fallback to RR: " << e.what();
                new_function->_fallback_to_random = true;
                new_function->_partition_fields.clear();
                return Status::OK();
            }
        }
        new_function->_partition_fields.emplace_back(std::move(field));
    }
    return Status::OK();
}
// Computes one raw hash per row of `block` from the transformed partition
// fields and stores them in `partitions`.
//
// An empty block yields an empty `partitions`. Otherwise every field's source
// expression is executed first, the hash buffer is reset, and each field's
// transformer output is folded into the hashes in field order. Returns
// InternalError when there are no fields or a field has no transformer, and
// forwards any expression execution error.
Status IcebergInsertPartitionFunction::_compute_hashes_with_transform(
        Block* block, std::vector<HashValType>& partitions) const {
    const size_t row_count = block->rows();
    if (row_count == 0) {
        partitions.clear();
        return Status::OK();
    }
    const size_t field_count = _partition_fields.size();
    if (field_count == 0) {
        return Status::InternalError("Merge partitioning insert fields are empty");
    }

    // One integer per field as reported by execute(); it is handed to the
    // transformer together with the block (presumably a column position).
    std::vector<int> source_positions(field_count);
    for (size_t idx = 0; idx < field_count; ++idx) {
        RETURN_IF_ERROR(_partition_fields[idx].expr_ctx->execute(block, &source_positions[idx]));
    }

    initialize_shuffle_hashes(partitions, row_count, _hash_method);
    auto* __restrict row_hashes = partitions.data();
    for (size_t idx = 0; idx < field_count; ++idx) {
        const auto& entry = _partition_fields[idx];
        if (entry.transformer == nullptr) {
            return Status::InternalError("Merge partitioning transform is not initialized");
        }
        ColumnWithTypeAndName transformed = entry.transformer->apply(*block, source_positions[idx]);
        const auto& [data_column, same_for_all_rows] = unpack_if_const(transformed.column);
        // A const column carries a single value for the whole block, so it is
        // left out of the hash; rows are then told apart by the other fields.
        // NOTE(review): the same value arriving as a non-const column in
        // another block would be hashed, so channel choice can differ between
        // blocks -- confirm this is acceptable for the downstream writer.
        if (!same_for_all_rows) {
            update_shuffle_hashes(data_column, transformed.type, row_hashes, _hash_method);
        }
    }
    return Status::OK();
}
// Computes one raw hash per row of `block` from the plain partition
// expressions and stores them in `partitions`.
//
// An empty block yields an empty `partitions`. Returns InternalError when no
// partition expressions exist and forwards any expression execution error.
Status IcebergInsertPartitionFunction::_compute_hashes_with_exprs(
        Block* block, std::vector<HashValType>& partitions) const {
    const size_t row_count = block->rows();
    if (row_count == 0) {
        partitions.clear();
        return Status::OK();
    }
    const size_t expr_count = _partition_expr_ctxs.size();
    if (expr_count == 0) {
        return Status::InternalError("Merge partitioning insert exprs are empty");
    }

    // Evaluate every expression before touching the output buffer.
    std::vector<ColumnWithTypeAndName> evaluated(expr_count);
    for (size_t idx = 0; idx < expr_count; ++idx) {
        RETURN_IF_ERROR(_partition_expr_ctxs[idx]->execute(block, evaluated[idx]));
    }

    initialize_shuffle_hashes(partitions, row_count, _hash_method);
    auto* __restrict row_hashes = partitions.data();
    for (const auto& result : evaluated) {
        const auto& [data_column, same_for_all_rows] = unpack_if_const(result.column);
        // Const columns hold one value for the whole block and are skipped.
        if (!same_for_all_rows) {
            update_shuffle_hashes(data_column, result.type, row_hashes, _hash_method);
        }
    }
    return Status::OK();
}
// Clones every context of `src` into the slot with the same index in `dst`.
// `dst` is resized to match `src`; the first clone error is returned and
// leaves the remaining slots untouched.
Status IcebergInsertPartitionFunction::_clone_expr_ctxs(RuntimeState* state,
                                                        const VExprContextSPtrs& src,
                                                        VExprContextSPtrs& dst) const {
    const size_t ctx_count = src.size();
    dst.resize(ctx_count);
    for (size_t slot = 0; slot != ctx_count; ++slot) {
        const auto& source_ctx = src[slot];
        RETURN_IF_ERROR(source_ctx->clone(state, dst[slot]));
    }
    return Status::OK();
}
// Stores the configuration only; no expression trees are built here.
//
// `delete_exprs` is taken by value and moved into the member, so the object
// keeps its own copy (clone() passes it on to the new instance). Expression
// contexts are created later by init().
IcebergDeletePartitionFunction::IcebergDeletePartitionFunction(HashValType partition_count,
ShuffleHashMethod hash_method,
std::vector<TExpr> delete_exprs)
: _partition_count(partition_count),
_hash_method(hash_method),
_delete_exprs(std::move(delete_exprs)) {}
// Builds the expression contexts used to partition delete rows.
//
// The expressions given to the constructor win; `texprs` is only used when the
// constructor received none. Nothing is created when the chosen list is empty.
Status IcebergDeletePartitionFunction::init(const std::vector<TExpr>& texprs) {
    const auto& chosen_exprs = _delete_exprs.empty() ? texprs : _delete_exprs;
    if (chosen_exprs.empty()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(VExpr::create_expr_trees(chosen_exprs, _delete_partition_expr_ctxs));
    return Status::OK();
}
// Prepares the delete partition expressions against `row_desc` and reports the
// outcome of that step.
Status IcebergDeletePartitionFunction::prepare(RuntimeState* state, const RowDescriptor& row_desc) {
    RETURN_IF_ERROR(VExpr::prepare(_delete_partition_expr_ctxs, state, row_desc));
    return Status::OK();
}
// Opens the delete partition expressions and reports the outcome of that step.
Status IcebergDeletePartitionFunction::open(RuntimeState* state) {
    RETURN_IF_ERROR(VExpr::open(_delete_partition_expr_ctxs, state));
    return Status::OK();
}
// Fills `partitions` with one channel id in [0, partition_count) per row of
// `block`, derived from the delete partition expressions.
//
// Returns InternalError when `partition_count` is zero and forwards any error
// from hash computation. The runtime state is not used.
Status IcebergDeletePartitionFunction::get_partitions(RuntimeState* /*state*/, Block* block,
                                                      size_t partition_count,
                                                      std::vector<HashValType>& partitions) const {
    const bool has_channels = partition_count != 0;
    if (!has_channels) {
        return Status::InternalError("Partition count is zero");
    }
    RETURN_IF_ERROR(_compute_hashes(block, partitions));
    // Reduce the raw hashes to channel ids.
    apply_shuffle_channel_ids(partitions, partition_count, _hash_method);
    return Status::OK();
}
// Creates an independent copy of this function in `function`: same
// configuration plus clones of all delete expression contexts.
//
// `function` takes ownership of the new object before the contexts are cloned,
// so it is released by the caller's unique_ptr even when cloning fails.
Status IcebergDeletePartitionFunction::clone(RuntimeState* state,
                                             std::unique_ptr<PartitionFunction>& function) const {
    auto* copy = new IcebergDeletePartitionFunction(_partition_count, _hash_method, _delete_exprs);
    function.reset(copy);
    RETURN_IF_ERROR(
            _clone_expr_ctxs(state, _delete_partition_expr_ctxs, copy->_delete_partition_expr_ctxs));
    return Status::OK();
}
// Computes one raw hash per row of `block` from the delete partition
// expressions and stores them in `partitions`.
//
// An empty block yields an empty `partitions`. For every non-const result the
// column actually hashed is chosen by _get_delete_hash_column(), which may
// substitute a struct's file-path member for the struct itself. Returns
// InternalError when no delete expressions exist and forwards errors from
// expression execution and column selection.
Status IcebergDeletePartitionFunction::_compute_hashes(Block* block,
                                                       std::vector<HashValType>& partitions) const {
    const size_t row_count = block->rows();
    if (row_count == 0) {
        partitions.clear();
        return Status::OK();
    }
    const size_t expr_count = _delete_partition_expr_ctxs.size();
    if (expr_count == 0) {
        return Status::InternalError("Merge partitioning delete exprs are empty");
    }

    // Evaluate every expression before touching the output buffer.
    std::vector<ColumnWithTypeAndName> evaluated(expr_count);
    for (size_t idx = 0; idx < expr_count; ++idx) {
        RETURN_IF_ERROR(_delete_partition_expr_ctxs[idx]->execute(block, evaluated[idx]));
    }

    initialize_shuffle_hashes(partitions, row_count, _hash_method);
    auto* __restrict row_hashes = partitions.data();
    for (const auto& result : evaluated) {
        const auto& [data_column, same_for_all_rows] = unpack_if_const(result.column);
        if (same_for_all_rows) {
            // Const columns hold one value for the whole block and are skipped.
            continue;
        }
        // Both outputs are always assigned by the helper when it returns OK.
        ColumnPtr column_to_hash;
        DataTypePtr type_to_hash;
        RETURN_IF_ERROR(_get_delete_hash_column(result, &column_to_hash, &type_to_hash));
        update_shuffle_hashes(column_to_hash, type_to_hash, row_hashes, _hash_method);
    }
    return Status::OK();
}
// Chooses the column and type that should be hashed for a delete expression
// result.
//
// After looking through an optional nullable wrapper, a struct column paired
// with a struct type is replaced by the member selected by
// _find_file_path_index(). Anything else is returned unchanged (including its
// nullable wrapper). Returns InternalError when the selected member index is
// negative or not below the struct column's tuple size.
Status IcebergDeletePartitionFunction::_get_delete_hash_column(const ColumnWithTypeAndName& column,
                                                               ColumnPtr* out_column,
                                                               DataTypePtr* out_type) const {
    ColumnPtr inner_column = column.column;
    DataTypePtr inner_type = column.type;
    const auto* as_nullable = check_and_get_column<ColumnNullable>(inner_column.get());
    if (as_nullable != nullptr) {
        // Only the nested data is inspected; the null map is ignored here.
        inner_column = as_nullable->get_nested_column_ptr();
        inner_type = remove_nullable(inner_type);
    }

    const auto* as_struct_column = check_and_get_column<ColumnStruct>(inner_column.get());
    const auto* as_struct_type = check_and_get_data_type<DataTypeStruct>(inner_type.get());
    const bool is_struct = as_struct_column != nullptr && as_struct_type != nullptr;
    if (!is_struct) {
        *out_column = column.column;
        *out_type = column.type;
        return Status::OK();
    }

    const int member_idx = _find_file_path_index(*as_struct_type);
    if (!(member_idx >= 0 && member_idx < as_struct_column->tuple_size())) {
        return Status::InternalError("Row id struct missing file_path field");
    }
    *out_column = as_struct_column->get_column_ptr(member_idx);
    *out_type = as_struct_type->get_element(member_idx);
    return Status::OK();
}
// Returns the index of the struct member that holds the file path.
//
// Member names are lower-cased and compared against "file_path",
// "data_file_path" and "path"; the first member that matches any of them wins.
// When nothing matches, index 0 is used as long as the struct has at least one
// element, and -1 is returned for a struct without elements.
int IcebergDeletePartitionFunction::_find_file_path_index(const DataTypeStruct& struct_type) const {
    static constexpr const char* kPathFieldNames[] = {"file_path", "data_file_path", "path"};

    const auto& member_names = struct_type.get_element_names();
    for (size_t pos = 0; pos < member_names.size(); ++pos) {
        const std::string lowered = doris::to_lower(member_names[pos]);
        for (const char* candidate : kPathFieldNames) {
            if (lowered == candidate) {
                return static_cast<int>(pos);
            }
        }
    }
    // No recognised name: fall back to the first member if there is one.
    return struct_type.get_elements().empty() ? -1 : 0;
}
// Clones every context of `src` into the slot with the same index in `dst`.
// `dst` is resized to match `src`; the first clone error is returned and
// leaves the remaining slots untouched.
Status IcebergDeletePartitionFunction::_clone_expr_ctxs(RuntimeState* state,
                                                        const VExprContextSPtrs& src,
                                                        VExprContextSPtrs& dst) const {
    const size_t ctx_count = src.size();
    dst.resize(ctx_count);
    for (size_t slot = 0; slot != ctx_count; ++slot) {
        const auto& source_ctx = src[slot];
        RETURN_IF_ERROR(source_ctx->clone(state, dst[slot]));
    }
    return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris