blob: e2b142db8d9f8680c804712b43fcf3d8ab595a42 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/sink/viceberg_delete_sink.h"
#include <fmt/format.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <zlib.h>
#include "common/logging.h"
#include "core/block/column_with_type_and_name.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/column/column_struct.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type_factory.hpp"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_string.h"
#include "core/data_type/data_type_struct.h"
#include "exec/common/endian.h"
#include "exprs/vexpr.h"
#include "format/table/iceberg_delete_file_reader_helper.h"
#include "format/transformer/vfile_format_transformer.h"
#include "io/file_factory.h"
#include "runtime/runtime_state.h"
#include "util/slice.h"
#include "util/string_util.h"
#include "util/uid_util.h"
namespace doris {
namespace {
class RewriteBitmapVisitor final : public IcebergPositionDeleteVisitor {
public:
RewriteBitmapVisitor(const std::string& referenced_data_file_path,
roaring::Roaring64Map* rows_to_delete)
: _referenced_data_file_path(referenced_data_file_path),
_rows_to_delete(rows_to_delete) {}
Status visit(const std::string& file_path, int64_t pos) override {
if (_rows_to_delete == nullptr) {
return Status::InvalidArgument("rows_to_delete is null");
}
if (file_path == _referenced_data_file_path) {
_rows_to_delete->add(static_cast<uint64_t>(pos));
}
return Status::OK();
}
private:
const std::string& _referenced_data_file_path;
roaring::Roaring64Map* _rows_to_delete;
};
Status load_rewritable_delete_rows(RuntimeState* state, RuntimeProfile* profile,
const std::string& referenced_data_file_path,
const std::vector<TIcebergDeleteFileDesc>& delete_files,
const std::map<std::string, std::string>& hadoop_conf,
TFileType::type file_type,
const std::vector<TNetworkAddress>& broker_addresses,
roaring::Roaring64Map* rows_to_delete) {
if (rows_to_delete == nullptr) {
return Status::InvalidArgument("rows_to_delete is null");
}
if (state == nullptr || profile == nullptr || delete_files.empty()) {
return Status::OK();
}
TFileScanRangeParams params =
build_iceberg_delete_scan_range_params(hadoop_conf, file_type, broker_addresses);
IcebergDeleteFileIOContext delete_file_io_ctx(state);
IcebergDeleteFileReaderOptions options;
options.state = state;
options.profile = profile;
options.scan_params = &params;
options.io_ctx = &delete_file_io_ctx.io_ctx;
options.batch_size = 102400;
for (const auto& delete_file : delete_files) {
if (is_iceberg_deletion_vector(delete_file)) {
RETURN_IF_ERROR(read_iceberg_deletion_vector(delete_file, options, rows_to_delete));
continue;
}
RewriteBitmapVisitor visitor(referenced_data_file_path, rows_to_delete);
RETURN_IF_ERROR(read_iceberg_position_delete_file(delete_file, options, &visitor));
}
return Status::OK();
}
} // namespace
VIcebergDeleteSink::VIcebergDeleteSink(const TDataSink& t_sink,
const VExprContextSPtrs& output_exprs,
std::shared_ptr<Dependency> dep,
std::shared_ptr<Dependency> fin_dep)
: AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
DCHECK(_t_sink.__isset.iceberg_delete_sink);
}
Status VIcebergDeleteSink::init_properties(ObjectPool* pool) {
const auto& delete_sink = _t_sink.iceberg_delete_sink;
_delete_type = delete_sink.delete_type;
if (_delete_type != TFileContent::POSITION_DELETES) {
return Status::NotSupported("Iceberg delete only supports position delete files");
}
// Get file format settings
if (delete_sink.__isset.file_format) {
_file_format_type = delete_sink.file_format;
}
if (delete_sink.__isset.compress_type) {
_compress_type = delete_sink.compress_type;
}
// Get output path and table location
if (delete_sink.__isset.output_path) {
_output_path = delete_sink.output_path;
}
if (delete_sink.__isset.table_location) {
_table_location = delete_sink.table_location;
}
// Get Hadoop configuration
if (delete_sink.__isset.hadoop_config) {
_hadoop_conf.insert(delete_sink.hadoop_config.begin(), delete_sink.hadoop_config.end());
}
if (delete_sink.__isset.file_type) {
_file_type = delete_sink.file_type;
}
if (delete_sink.__isset.broker_addresses) {
_broker_addresses.assign(delete_sink.broker_addresses.begin(),
delete_sink.broker_addresses.end());
}
// Get partition information
if (delete_sink.__isset.partition_spec_id) {
_partition_spec_id = delete_sink.partition_spec_id;
}
if (delete_sink.__isset.partition_data_json) {
_partition_data_json = delete_sink.partition_data_json;
}
if (delete_sink.__isset.format_version) {
_format_version = delete_sink.format_version;
}
// for merge old deletion vector and old position delete to a new deletion vector.
if (_format_version >= 3 && delete_sink.__isset.rewritable_delete_file_sets) {
for (const auto& delete_file_set : delete_sink.rewritable_delete_file_sets) {
if (!delete_file_set.__isset.referenced_data_file_path ||
!delete_file_set.__isset.delete_files ||
delete_file_set.referenced_data_file_path.empty() ||
delete_file_set.delete_files.empty()) {
continue;
}
_rewritable_delete_files.emplace(delete_file_set.referenced_data_file_path,
delete_file_set.delete_files);
}
}
return Status::OK();
}
Status VIcebergDeleteSink::open(RuntimeState* state, RuntimeProfile* profile) {
_state = state;
// Initialize counters
_written_rows_counter = ADD_COUNTER(profile, "RowsWritten", TUnit::UNIT);
_send_data_timer = ADD_TIMER(profile, "SendDataTime");
_write_delete_files_timer = ADD_TIMER(profile, "WriteDeleteFilesTime");
_delete_file_count_counter = ADD_COUNTER(profile, "DeleteFileCount", TUnit::UNIT);
_open_timer = ADD_TIMER(profile, "OpenTime");
_close_timer = ADD_TIMER(profile, "CloseTime");
SCOPED_TIMER(_open_timer);
if (_format_version < 3) {
RETURN_IF_ERROR(_init_position_delete_output_exprs());
}
LOG(INFO) << fmt::format(
"VIcebergDeleteSink opened: delete_type={}, output_path={}, format_version={}",
to_string(_delete_type), _output_path, _format_version);
return Status::OK();
}
Status VIcebergDeleteSink::write(RuntimeState* state, Block& block) {
SCOPED_TIMER(_send_data_timer);
if (block.rows() == 0) {
return Status::OK();
}
_row_count += block.rows();
if (_delete_type != TFileContent::POSITION_DELETES) {
return Status::NotSupported("Iceberg delete only supports position delete files");
}
// Extract $row_id column and group by file_path
RETURN_IF_ERROR(_collect_position_deletes(block, _file_deletions));
if (_written_rows_counter) {
COUNTER_UPDATE(_written_rows_counter, block.rows());
}
return Status::OK();
}
Status VIcebergDeleteSink::close(Status close_status) {
SCOPED_TIMER(_close_timer);
if (!close_status.ok()) {
LOG(WARNING) << fmt::format("VIcebergDeleteSink close with error: {}",
close_status.to_string());
return close_status;
}
if (_delete_type == TFileContent::POSITION_DELETES && !_file_deletions.empty()) {
SCOPED_TIMER(_write_delete_files_timer);
if (_format_version >= 3) {
RETURN_IF_ERROR(_write_deletion_vector_files(_file_deletions));
} else {
RETURN_IF_ERROR(_write_position_delete_files(_file_deletions));
}
}
// Update counters
if (_delete_file_count_counter) {
COUNTER_UPDATE(_delete_file_count_counter, _delete_file_count);
}
LOG(INFO) << fmt::format("VIcebergDeleteSink closed: rows={}, delete_files={}", _row_count,
_delete_file_count);
if (_state != nullptr) {
for (const auto& commit_data : _commit_data_list) {
_state->add_iceberg_commit_datas(commit_data);
}
}
return Status::OK();
}
int VIcebergDeleteSink::_get_row_id_column_index(const Block& block) {
// Find __DORIS_ICEBERG_ROWID_COL__ column in block
for (size_t i = 0; i < block.columns(); ++i) {
const auto& col_name = block.get_by_position(i).name;
if (col_name == doris::BeConsts::ICEBERG_ROWID_COL) {
return static_cast<int>(i);
}
}
return -1;
}
Status VIcebergDeleteSink::_collect_position_deletes(
const Block& block, std::map<std::string, IcebergFileDeletion>& file_deletions) {
// Find row id column
int row_id_col_idx = _get_row_id_column_index(block);
if (row_id_col_idx < 0) {
return Status::InternalError(
"__DORIS_ICEBERG_ROWID_COL__ column not found in block for position delete");
}
const auto& row_id_col = block.get_by_position(row_id_col_idx);
const IColumn* row_id_data = row_id_col.column.get();
const IDataType* row_id_type = row_id_col.type.get();
const auto* nullable_col = check_and_get_column<ColumnNullable>(row_id_data);
if (nullable_col != nullptr) {
row_id_data = nullable_col->get_nested_column_ptr().get();
}
const auto* nullable_type = check_and_get_data_type<DataTypeNullable>(row_id_type);
if (nullable_type != nullptr) {
row_id_type = nullable_type->get_nested_type().get();
}
const auto* struct_col = check_and_get_column<ColumnStruct>(row_id_data);
const auto* struct_type = check_and_get_data_type<DataTypeStruct>(row_id_type);
if (!struct_col || !struct_type) {
return Status::InternalError("__DORIS_ICEBERG_ROWID_COL__ column is not a struct column");
}
// __DORIS_ICEBERG_ROWID_COL__ struct:
// (file_path: STRING, row_position: BIGINT, partition_spec_id: INT, partition_data: STRING)
size_t field_count = struct_col->tuple_size();
if (field_count < 2) {
return Status::InternalError(
"__DORIS_ICEBERG_ROWID_COL__ struct must have at least 2 fields "
"(file_path, row_position)");
}
auto normalize = [](const std::string& name) { return doris::to_lower(name); };
int file_path_idx = -1;
int row_position_idx = -1;
int spec_id_idx = -1;
int partition_data_idx = -1;
const auto& field_names = struct_type->get_element_names();
for (size_t i = 0; i < field_names.size(); ++i) {
std::string name = normalize(field_names[i]);
if (file_path_idx < 0 && name == "file_path") {
file_path_idx = static_cast<int>(i);
} else if (row_position_idx < 0 && name == "row_position") {
row_position_idx = static_cast<int>(i);
} else if (spec_id_idx < 0 && name == "partition_spec_id") {
spec_id_idx = static_cast<int>(i);
} else if (partition_data_idx < 0 && name == "partition_data") {
partition_data_idx = static_cast<int>(i);
}
}
if (file_path_idx < 0 || row_position_idx < 0) {
return Status::InternalError(
"__DORIS_ICEBERG_ROWID_COL__ must contain standard fields file_path and "
"row_position");
}
if (field_count >= 3 && spec_id_idx < 0) {
return Status::InternalError(
"__DORIS_ICEBERG_ROWID_COL__ must use standard field name partition_spec_id");
}
if (field_count >= 4 && partition_data_idx < 0) {
return Status::InternalError(
"__DORIS_ICEBERG_ROWID_COL__ must use standard field name partition_data");
}
const auto* file_path_col = check_and_get_column<ColumnString>(
remove_nullable(struct_col->get_column_ptr(file_path_idx)).get());
const auto* row_position_col = check_and_get_column<ColumnVector<TYPE_BIGINT>>(
remove_nullable(struct_col->get_column_ptr(row_position_idx)).get());
if (!file_path_col || !row_position_col) {
return Status::InternalError(
"__DORIS_ICEBERG_ROWID_COL__ struct fields have incorrect types");
}
const ColumnVector<TYPE_INT>* spec_id_col = nullptr;
const ColumnString* partition_data_col = nullptr;
if (spec_id_idx >= 0 && spec_id_idx < static_cast<int>(field_count)) {
spec_id_col = check_and_get_column<ColumnVector<TYPE_INT>>(
remove_nullable(struct_col->get_column_ptr(spec_id_idx)).get());
if (!spec_id_col) {
return Status::InternalError(
"__DORIS_ICEBERG_ROWID_COL__ partition_spec_id has incorrect type");
}
}
if (partition_data_idx >= 0 && partition_data_idx < static_cast<int>(field_count)) {
partition_data_col = check_and_get_column<ColumnString>(
remove_nullable(struct_col->get_column_ptr(partition_data_idx)).get());
if (!partition_data_col) {
return Status::InternalError(
"__DORIS_ICEBERG_ROWID_COL__ partition_data has incorrect type");
}
}
// Group by file_path using roaring bitmap
for (size_t i = 0; i < block.rows(); ++i) {
std::string file_path = file_path_col->get_data_at(i).to_string();
int64_t row_position = row_position_col->get_element(i);
if (row_position < 0) {
return Status::InternalError("Invalid row_position {} in row_id column", row_position);
}
int32_t partition_spec_id = _partition_spec_id;
std::string partition_data_json = _partition_data_json;
if (spec_id_col != nullptr) {
partition_spec_id = spec_id_col->get_element(i);
}
if (partition_data_col != nullptr) {
partition_data_json = partition_data_col->get_data_at(i).to_string();
}
auto [iter, inserted] = file_deletions.emplace(
file_path, IcebergFileDeletion(partition_spec_id, partition_data_json));
if (!inserted) {
if (iter->second.partition_spec_id != partition_spec_id ||
iter->second.partition_data_json != partition_data_json) {
LOG(WARNING) << fmt::format(
"Mismatched partition info for file {}, existing spec_id={}, data={}, "
"new spec_id={}, data={}",
file_path, iter->second.partition_spec_id, iter->second.partition_data_json,
partition_spec_id, partition_data_json);
}
}
iter->second.rows_to_delete.add(static_cast<uint64_t>(row_position));
}
return Status::OK();
}
Status VIcebergDeleteSink::_write_position_delete_files(
const std::map<std::string, IcebergFileDeletion>& file_deletions) {
constexpr size_t kBatchSize = 4096;
for (const auto& [data_file_path, deletion] : file_deletions) {
if (deletion.rows_to_delete.isEmpty()) {
continue;
}
// Generate unique delete file path
std::string delete_file_path = _generate_delete_file_path(data_file_path);
// Create delete file writer
auto writer = VIcebergDeleteFileWriterFactory::create_writer(
TFileContent::POSITION_DELETES, delete_file_path, _file_format_type,
_compress_type);
// Build column names for position delete
std::vector<std::string> column_names = {"file_path", "pos"};
if (_position_delete_output_expr_ctxs.empty()) {
RETURN_IF_ERROR(_init_position_delete_output_exprs());
}
// Open writer
RETURN_IF_ERROR(writer->open(_state, _state->runtime_profile(),
_position_delete_output_expr_ctxs, column_names, _hadoop_conf,
_file_type, _broker_addresses));
// Build block with (file_path, pos) columns
std::vector<int64_t> positions;
positions.reserve(kBatchSize);
for (auto it = deletion.rows_to_delete.begin(); it != deletion.rows_to_delete.end(); ++it) {
positions.push_back(static_cast<int64_t>(*it));
if (positions.size() >= kBatchSize) {
Block delete_block;
RETURN_IF_ERROR(
_build_position_delete_block(data_file_path, positions, delete_block));
RETURN_IF_ERROR(writer->write(delete_block));
positions.clear();
}
}
if (!positions.empty()) {
Block delete_block;
RETURN_IF_ERROR(_build_position_delete_block(data_file_path, positions, delete_block));
RETURN_IF_ERROR(writer->write(delete_block));
}
// Set partition info on writer before close
writer->set_partition_info(deletion.partition_spec_id, deletion.partition_data_json);
// Close writer and collect commit data
TIcebergCommitData commit_data;
RETURN_IF_ERROR(writer->close(commit_data));
// Set referenced data file path
commit_data.__set_referenced_data_file_path(data_file_path);
_commit_data_list.push_back(commit_data);
_delete_file_count++;
VLOG(1) << fmt::format("Written position delete file: path={}, rows={}, referenced_file={}",
delete_file_path, commit_data.row_count, data_file_path);
}
return Status::OK();
}
Status VIcebergDeleteSink::_init_position_delete_output_exprs() {
if (!_position_delete_output_expr_ctxs.empty()) {
return Status::OK();
}
std::vector<TExpr> texprs;
texprs.reserve(2);
std::string empty_string;
TExprNode file_path_node =
create_texpr_node_from(&empty_string, PrimitiveType::TYPE_STRING, 0, 0);
file_path_node.__set_num_children(0);
file_path_node.__set_output_scale(0);
file_path_node.__set_is_nullable(false);
TExpr file_path_expr;
file_path_expr.nodes.emplace_back(std::move(file_path_node));
texprs.emplace_back(std::move(file_path_expr));
int64_t zero = 0;
TExprNode pos_node = create_texpr_node_from(&zero, PrimitiveType::TYPE_BIGINT, 0, 0);
pos_node.__set_num_children(0);
pos_node.__set_output_scale(0);
pos_node.__set_is_nullable(false);
TExpr pos_expr;
pos_expr.nodes.emplace_back(std::move(pos_node));
texprs.emplace_back(std::move(pos_expr));
RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, _position_delete_output_expr_ctxs));
return Status::OK();
}
Status VIcebergDeleteSink::_build_position_delete_block(const std::string& file_path,
const std::vector<int64_t>& positions,
Block& output_block) {
// Create file_path column (repeated for each position)
auto file_path_col = ColumnString::create();
for (size_t i = 0; i < positions.size(); ++i) {
file_path_col->insert_data(file_path.data(), file_path.size());
}
// Create pos column
auto pos_col = ColumnVector<TYPE_BIGINT>::create();
pos_col->get_data().assign(positions.begin(), positions.end());
// Build block
output_block.insert(ColumnWithTypeAndName(std::move(file_path_col),
std::make_shared<DataTypeString>(), "file_path"));
output_block.insert(
ColumnWithTypeAndName(std::move(pos_col), std::make_shared<DataTypeInt64>(), "pos"));
return Status::OK();
}
std::string VIcebergDeleteSink::_get_file_extension() const {
std::string compress_name;
switch (_compress_type) {
case TFileCompressType::SNAPPYBLOCK: {
compress_name = ".snappy";
break;
}
case TFileCompressType::ZLIB: {
compress_name = ".zlib";
break;
}
case TFileCompressType::ZSTD: {
compress_name = ".zstd";
break;
}
default: {
compress_name = "";
break;
}
}
std::string file_format_name;
switch (_file_format_type) {
case TFileFormatType::FORMAT_PARQUET: {
file_format_name = ".parquet";
break;
}
case TFileFormatType::FORMAT_ORC: {
file_format_name = ".orc";
break;
}
default: {
file_format_name = "";
break;
}
}
return fmt::format("{}{}", compress_name, file_format_name);
}
Status VIcebergDeleteSink::_write_deletion_vector_files(
const std::map<std::string, IcebergFileDeletion>& file_deletions) {
std::vector<DeletionVectorBlob> blobs;
for (const auto& [data_file_path, deletion] : file_deletions) {
if (deletion.rows_to_delete.isEmpty()) {
continue;
}
roaring::Roaring64Map merged_rows = deletion.rows_to_delete;
DeletionVectorBlob blob;
blob.delete_count = static_cast<int64_t>(merged_rows.cardinality());
auto previous_delete_it = _rewritable_delete_files.find(data_file_path);
if (previous_delete_it != _rewritable_delete_files.end()) {
roaring::Roaring64Map previous_rows;
RETURN_IF_ERROR(load_rewritable_delete_rows(
_state, _state->runtime_profile(), data_file_path, previous_delete_it->second,
_hadoop_conf, _file_type, _broker_addresses, &previous_rows));
merged_rows |= previous_rows;
}
size_t bitmap_size = merged_rows.getSizeInBytes();
blob.referenced_data_file = data_file_path;
blob.partition_spec_id = deletion.partition_spec_id;
blob.partition_data_json = deletion.partition_data_json;
blob.merged_count = static_cast<int64_t>(merged_rows.cardinality());
blob.content_size_in_bytes = static_cast<int64_t>(4 + 4 + bitmap_size + 4);
blob.blob_data.resize(static_cast<size_t>(blob.content_size_in_bytes));
merged_rows.write(blob.blob_data.data() + 8);
uint32_t total_length = static_cast<uint32_t>(4 + bitmap_size);
BigEndian::Store32(blob.blob_data.data(), total_length);
constexpr char DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'};
memcpy(blob.blob_data.data() + 4, DV_MAGIC, 4);
uint32_t crc = static_cast<uint32_t>(
::crc32(0, reinterpret_cast<const Bytef*>(blob.blob_data.data() + 4),
4 + (uInt)bitmap_size));
BigEndian::Store32(blob.blob_data.data() + 8 + bitmap_size, crc);
blobs.emplace_back(std::move(blob));
}
if (blobs.empty()) {
return Status::OK();
}
std::string puffin_path = _generate_puffin_file_path();
int64_t puffin_file_size = 0;
RETURN_IF_ERROR(_write_puffin_file(puffin_path, &blobs, &puffin_file_size));
for (const auto& blob : blobs) {
TIcebergCommitData commit_data;
commit_data.__set_file_path(puffin_path);
commit_data.__set_row_count(blob.merged_count);
commit_data.__set_affected_rows(blob.delete_count);
commit_data.__set_file_size(puffin_file_size);
commit_data.__set_file_content(TFileContent::DELETION_VECTOR);
commit_data.__set_content_offset(blob.content_offset);
commit_data.__set_content_size_in_bytes(blob.content_size_in_bytes);
commit_data.__set_referenced_data_file_path(blob.referenced_data_file);
if (blob.partition_spec_id != 0 || !blob.partition_data_json.empty()) {
commit_data.__set_partition_spec_id(blob.partition_spec_id);
commit_data.__set_partition_data_json(blob.partition_data_json);
}
_commit_data_list.push_back(commit_data);
_delete_file_count++;
}
return Status::OK();
}
Status VIcebergDeleteSink::_write_puffin_file(const std::string& puffin_path,
std::vector<DeletionVectorBlob>* blobs,
int64_t* out_file_size) {
DCHECK(blobs != nullptr);
DCHECK(!blobs->empty());
io::FSPropertiesRef fs_properties(_file_type);
fs_properties.properties = &_hadoop_conf;
if (!_broker_addresses.empty()) {
fs_properties.broker_addresses = &_broker_addresses;
}
io::FileDescription file_description = {.path = puffin_path, .fs_name {}};
auto fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
io::FileWriterOptions file_writer_options = {.used_by_s3_committer = false};
io::FileWriterPtr file_writer;
RETURN_IF_ERROR(fs->create_file(file_description.path, &file_writer, &file_writer_options));
constexpr char PUFFIN_MAGIC[] = {'\x50', '\x46', '\x41', '\x31'};
RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));
int64_t current_offset = 4;
for (auto& blob : *blobs) {
blob.content_offset = current_offset;
RETURN_IF_ERROR(file_writer->append(Slice(
reinterpret_cast<const uint8_t*>(blob.blob_data.data()), blob.blob_data.size())));
current_offset += static_cast<int64_t>(blob.blob_data.size());
}
RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));
std::string footer_json = _build_puffin_footer_json(*blobs);
RETURN_IF_ERROR(file_writer->append(
Slice(reinterpret_cast<const uint8_t*>(footer_json.data()), footer_json.size())));
char footer_size_buf[4];
LittleEndian::Store32(footer_size_buf, static_cast<uint32_t>(footer_json.size()));
RETURN_IF_ERROR(file_writer->append(
Slice(reinterpret_cast<const uint8_t*>(footer_size_buf), sizeof(footer_size_buf))));
char flags[4] = {0, 0, 0, 0};
RETURN_IF_ERROR(
file_writer->append(Slice(reinterpret_cast<const uint8_t*>(flags), sizeof(flags))));
RETURN_IF_ERROR(file_writer->append(Slice(reinterpret_cast<const uint8_t*>(PUFFIN_MAGIC), 4)));
RETURN_IF_ERROR(file_writer->close());
*out_file_size = current_offset + 4 + static_cast<int64_t>(footer_json.size()) + 4 + 4 + 4;
return Status::OK();
}
std::string VIcebergDeleteSink::_build_puffin_footer_json(
const std::vector<DeletionVectorBlob>& blobs) {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
writer.StartObject();
writer.Key("blobs");
writer.StartArray();
for (const auto& blob : blobs) {
writer.StartObject();
writer.Key("type");
writer.String("deletion-vector-v1");
writer.Key("fields");
writer.StartArray();
writer.EndArray();
writer.Key("snapshot-id");
writer.Int64(-1);
writer.Key("sequence-number");
writer.Int64(-1);
writer.Key("offset");
writer.Int64(blob.content_offset);
writer.Key("length");
writer.Int64(blob.content_size_in_bytes);
writer.Key("properties");
writer.StartObject();
writer.Key("referenced-data-file");
writer.String(blob.referenced_data_file.c_str(),
static_cast<rapidjson::SizeType>(blob.referenced_data_file.size()));
std::string cardinality = std::to_string(blob.merged_count);
writer.Key("cardinality");
writer.String(cardinality.c_str(), static_cast<rapidjson::SizeType>(cardinality.size()));
writer.EndObject();
writer.EndObject();
}
writer.EndArray();
writer.Key("properties");
writer.StartObject();
writer.Key("created-by");
writer.String("doris-puffin-v1");
writer.EndObject();
writer.EndObject();
return {buffer.GetString(), buffer.GetSize()};
}
std::string VIcebergDeleteSink::_generate_delete_file_path(
const std::string& referenced_data_file) {
// Generate unique delete file name using UUID
std::string uuid = generate_uuid_string();
std::string file_name;
std::string file_extension = _get_file_extension();
file_name =
fmt::format("delete_pos_{}_{}{}", uuid,
std::hash<std::string> {}(referenced_data_file) % 10000000, file_extension);
// Combine with output path or table location
std::string base_path = _output_path.empty() ? _table_location : _output_path;
// Ensure base path ends with /
if (!base_path.empty() && base_path.back() != '/') {
base_path += '/';
}
// Delete files are data files in Iceberg, write under data location
return fmt::format("{}{}", base_path, file_name);
}
std::string VIcebergDeleteSink::_generate_puffin_file_path() {
std::string uuid = generate_uuid_string();
std::string file_name = fmt::format("delete_dv_{}.puffin", uuid);
std::string base_path = _output_path.empty() ? _table_location : _output_path;
if (!base_path.empty() && base_path.back() != '/') {
base_path += '/';
}
return fmt::format("{}{}", base_path, file_name);
}
} // namespace doris