blob: 167c51cd6fa713f60055d811e0f25bb78f75565c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/sink/viceberg_merge_sink.h"
#include <fmt/format.h>
#include "common/consts.h"
#include "common/exception.h"
#include "common/logging.h"
#include "core/block/block.h"
#include "core/column/column_nullable.h"
#include "core/column/column_vector.h"
#include "exec/sink/sink_common.h"
#include "exec/sink/viceberg_delete_sink.h"
#include "exec/sink/writer/iceberg/viceberg_table_writer.h"
#include "format/table/iceberg/schema.h"
#include "format/table/iceberg/schema_parser.h"
#include "runtime/runtime_state.h"
#include "util/string_util.h"
namespace doris {
namespace {} // namespace
VIcebergMergeSink::VIcebergMergeSink(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
std::shared_ptr<Dependency> dep,
std::shared_ptr<Dependency> fin_dep)
: AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
DCHECK(_t_sink.__isset.iceberg_merge_sink);
}
VIcebergMergeSink::~VIcebergMergeSink() = default;
Status VIcebergMergeSink::init_properties(ObjectPool* pool, const RowDescriptor& row_desc) {
RETURN_IF_ERROR(_build_inner_sinks());
_table_writer = std::make_unique<VIcebergTableWriter>(_table_sink, _table_output_expr_ctxs,
nullptr, nullptr);
_delete_writer = std::make_unique<VIcebergDeleteSink>(_delete_sink, _delete_output_expr_ctxs,
nullptr, nullptr);
RETURN_IF_ERROR(_table_writer->init_properties(pool, row_desc));
RETURN_IF_ERROR(_delete_writer->init_properties(pool));
return Status::OK();
}
Status VIcebergMergeSink::open(RuntimeState* state, RuntimeProfile* profile) {
_state = state;
_written_rows_counter = ADD_COUNTER(profile, "RowsWritten", TUnit::UNIT);
_insert_rows_counter = ADD_COUNTER(profile, "InsertRows", TUnit::UNIT);
_delete_rows_counter = ADD_COUNTER(profile, "DeleteRows", TUnit::UNIT);
_send_data_timer = ADD_TIMER(profile, "SendDataTime");
_open_timer = ADD_TIMER(profile, "OpenTime");
_close_timer = ADD_TIMER(profile, "CloseTime");
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(_prepare_output_layout());
RuntimeProfile* table_profile = profile->create_child("IcebergMergeTableWriter", true, true);
RuntimeProfile* delete_profile = profile->create_child("IcebergMergeDeleteWriter", true, true);
RETURN_IF_ERROR(_table_writer->open(state, table_profile));
RETURN_IF_ERROR(_delete_writer->open(state, delete_profile));
return Status::OK();
}
Status VIcebergMergeSink::write(RuntimeState* state, Block& block) {
SCOPED_TIMER(_send_data_timer);
if (block.rows() == 0) {
return Status::OK();
}
Block output_block;
RETURN_IF_ERROR(_projection_block(block, &output_block));
if (output_block.rows() == 0) {
return Status::OK();
}
_row_count += output_block.rows();
if (_operation_idx < 0 || _row_id_idx < 0) {
return Status::InternalError("Iceberg merge sink missing operation/row_id columns");
}
const auto& op_column = output_block.get_by_position(_operation_idx).column;
const auto* op_data = remove_nullable(op_column).get();
IColumn::Filter delete_filter(output_block.rows(), 0);
IColumn::Filter insert_filter(output_block.rows(), 0);
bool has_delete = false;
bool has_insert = false;
size_t delete_rows = 0;
size_t insert_rows = 0;
for (size_t i = 0; i < output_block.rows(); ++i) {
int8_t op = static_cast<int8_t>(op_data->get_int(i));
bool delete_op = is_delete_op(op);
bool insert_op = is_insert_op(op);
if (!delete_op && !insert_op) {
return Status::InternalError("Unknown Iceberg merge operation {}", op);
}
if (delete_op) {
delete_filter[i] = 1;
has_delete = true;
++_delete_row_count;
++delete_rows;
}
if (insert_op) {
insert_filter[i] = 1;
has_insert = true;
++_insert_row_count;
++insert_rows;
}
}
bool skip_io = false;
#ifdef BE_TEST
skip_io = _skip_io;
#endif
if (has_delete && !skip_io) {
Block delete_block = output_block;
std::vector<int> delete_indices {_row_id_idx};
delete_block.erase_not_in(delete_indices);
Block::filter_block_internal(&delete_block, delete_filter);
RETURN_IF_ERROR(_delete_writer->write(state, delete_block));
}
if (has_insert && !skip_io) {
if (_data_column_indices.empty()) {
return Status::InternalError("Iceberg merge sink has no data columns for insert");
}
Block insert_block = output_block;
insert_block.erase_not_in(_data_column_indices);
Block::filter_block_internal(&insert_block, insert_filter);
RETURN_IF_ERROR(_table_writer->write_prepared_block(insert_block));
}
if (_written_rows_counter != nullptr) {
COUNTER_UPDATE(_written_rows_counter, output_block.rows());
}
if (_insert_rows_counter != nullptr) {
COUNTER_UPDATE(_insert_rows_counter, insert_rows);
}
if (_delete_rows_counter != nullptr) {
COUNTER_UPDATE(_delete_rows_counter, delete_rows);
}
return Status::OK();
}
Status VIcebergMergeSink::close(Status close_status) {
SCOPED_TIMER(_close_timer);
if (!close_status.ok()) {
LOG(WARNING) << fmt::format("VIcebergMergeSink close with error: {}",
close_status.to_string());
if (_table_writer) {
static_cast<void>(_table_writer->close(close_status));
}
if (_delete_writer) {
static_cast<void>(_delete_writer->close(close_status));
}
return close_status;
}
Status table_status = Status::OK();
Status delete_status = Status::OK();
if (_table_writer) {
table_status = _table_writer->close(close_status);
}
if (_delete_writer) {
delete_status = _delete_writer->close(close_status);
}
if (_written_rows_counter != nullptr) {
COUNTER_SET(_written_rows_counter, static_cast<int64_t>(_row_count));
}
if (_insert_rows_counter != nullptr) {
COUNTER_SET(_insert_rows_counter, static_cast<int64_t>(_insert_row_count));
}
if (_delete_rows_counter != nullptr) {
COUNTER_SET(_delete_rows_counter, static_cast<int64_t>(_delete_row_count));
}
if (!table_status.ok()) {
return table_status;
}
return delete_status;
}
Status VIcebergMergeSink::_build_inner_sinks() {
if (!_t_sink.__isset.iceberg_merge_sink) {
return Status::InternalError("Missing iceberg merge sink config");
}
const auto& merge_sink = _t_sink.iceberg_merge_sink;
TIcebergTableSink table_sink;
if (merge_sink.__isset.db_name) {
table_sink.__set_db_name(merge_sink.db_name);
}
if (merge_sink.__isset.tb_name) {
table_sink.__set_tb_name(merge_sink.tb_name);
}
if (merge_sink.__isset.schema_json) {
table_sink.__set_schema_json(merge_sink.schema_json);
}
if (merge_sink.__isset.partition_specs_json) {
table_sink.__set_partition_specs_json(merge_sink.partition_specs_json);
}
if (merge_sink.__isset.partition_spec_id) {
table_sink.__set_partition_spec_id(merge_sink.partition_spec_id);
}
if (merge_sink.__isset.sort_fields) {
table_sink.__set_sort_fields(merge_sink.sort_fields);
}
if (merge_sink.__isset.file_format) {
table_sink.__set_file_format(merge_sink.file_format);
}
if (merge_sink.__isset.compression_type) {
table_sink.__set_compression_type(merge_sink.compression_type);
}
if (merge_sink.__isset.output_path) {
table_sink.__set_output_path(merge_sink.output_path);
}
if (merge_sink.__isset.original_output_path) {
table_sink.__set_original_output_path(merge_sink.original_output_path);
}
if (merge_sink.__isset.hadoop_config) {
table_sink.__set_hadoop_config(merge_sink.hadoop_config);
}
if (merge_sink.__isset.file_type) {
table_sink.__set_file_type(merge_sink.file_type);
}
if (merge_sink.__isset.broker_addresses) {
table_sink.__set_broker_addresses(merge_sink.broker_addresses);
}
_table_sink.__set_type(TDataSinkType::ICEBERG_TABLE_SINK);
_table_sink.__set_iceberg_table_sink(table_sink);
TIcebergDeleteSink delete_sink;
if (merge_sink.__isset.db_name) {
delete_sink.__set_db_name(merge_sink.db_name);
}
if (merge_sink.__isset.tb_name) {
delete_sink.__set_tb_name(merge_sink.tb_name);
}
if (merge_sink.__isset.delete_type) {
delete_sink.__set_delete_type(merge_sink.delete_type);
}
if (merge_sink.__isset.file_format) {
delete_sink.__set_file_format(merge_sink.file_format);
}
if (merge_sink.__isset.compression_type) {
delete_sink.__set_compress_type(merge_sink.compression_type);
}
if (merge_sink.__isset.output_path) {
delete_sink.__set_output_path(merge_sink.output_path);
}
if (merge_sink.__isset.table_location) {
delete_sink.__set_table_location(merge_sink.table_location);
}
if (merge_sink.__isset.hadoop_config) {
delete_sink.__set_hadoop_config(merge_sink.hadoop_config);
}
if (merge_sink.__isset.file_type) {
delete_sink.__set_file_type(merge_sink.file_type);
}
if (merge_sink.__isset.partition_spec_id_for_delete) {
delete_sink.__set_partition_spec_id(merge_sink.partition_spec_id_for_delete);
}
if (merge_sink.__isset.partition_data_json_for_delete) {
delete_sink.__set_partition_data_json(merge_sink.partition_data_json_for_delete);
}
if (merge_sink.__isset.broker_addresses) {
delete_sink.__set_broker_addresses(merge_sink.broker_addresses);
}
_delete_sink.__set_type(TDataSinkType::ICEBERG_DELETE_SINK);
_delete_sink.__set_iceberg_delete_sink(delete_sink);
return Status::OK();
}
Status VIcebergMergeSink::_prepare_output_layout() {
if (_vec_output_expr_ctxs.empty()) {
return Status::InternalError("Iceberg merge sink has empty output expressions");
}
std::string row_id_name = doris::to_lower(BeConsts::ICEBERG_ROWID_COL);
std::string op_name = doris::to_lower(kOperationColumnName);
_operation_idx = -1;
_row_id_idx = -1;
for (size_t i = 0; i < _vec_output_expr_ctxs.size(); ++i) {
std::string expr_name = doris::to_lower(_vec_output_expr_ctxs[i]->expr_name());
if (_operation_idx < 0 && expr_name == op_name) {
_operation_idx = static_cast<int>(i);
} else if (_row_id_idx < 0 && expr_name == row_id_name) {
_row_id_idx = static_cast<int>(i);
}
}
if (_operation_idx < 0) {
return Status::InternalError("Iceberg merge sink missing operation column");
}
if (_row_id_idx < 0) {
return Status::InternalError("Iceberg merge sink missing row_id column");
}
_data_column_indices.clear();
_table_output_expr_ctxs.clear();
for (size_t i = 0; i < _vec_output_expr_ctxs.size(); ++i) {
if (static_cast<int>(i) == _operation_idx || static_cast<int>(i) == _row_id_idx) {
continue;
}
_data_column_indices.push_back(static_cast<int>(i));
_table_output_expr_ctxs.emplace_back(_vec_output_expr_ctxs[i]);
}
return Status::OK();
}
} // namespace doris