| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/puffin/puffin-writer.h" |
| |
| #include <limits> |
| #include <string_view> |
| |
| #include <arpa/inet.h> |
| #include <rapidjson/document.h> |
| #include <rapidjson/stringbuffer.h> |
| #include <rapidjson/writer.h> |
| |
| #include "common/logging.h" |
| #include "common/object-pool.h" |
| #include "common/status.h" |
| #include "exec/blob-reader.h" |
| #include "exec/hdfs-table-writer.h" |
| #include "exec/output-partition.h" |
| #include "exec/puffin/blob.h" |
| #include "exec/table-sink-base.h" |
| #include "exprs/scalar-expr-evaluator.h" |
| #include "gutil/strings/substitute.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/io/request-context.h" |
| #include "runtime/mem-pool.h" |
| #include "runtime/row-batch.h" |
| #include "runtime/runtime-state.h" |
| #include "runtime/tuple-row.h" |
| #include "util/coding-util.h" |
| #include "util/hash-util.h" |
| #include "util/hdfs-util.h" |
| #include "util/runtime-profile-counters.h" |
| #include "util/time.h" |
| |
| // Initial capacity for the per-file position buffer in AppendRows. Caps the upfront |
| // allocation for the common case where many files each have few deleted rows. The vector |
| // grows geometrically if a single file has many deletions. Mirrors the convention in |
| // iceberg-delete-builder.cc. |
| static constexpr int POSITIONS_BUFFER_INITIAL_CAPACITY = 128; |
| |
| namespace impala { |
| |
| PuffinWriter::PuffinWriter(TableSinkBase* parent, RuntimeState* state, |
| OutputPartition* output, const HdfsPartitionDescriptor* partition, |
| const HdfsTableDescriptor* table_desc) |
| : HdfsTableWriter(parent, state, output, partition, table_desc), |
| current_offset_(0), |
| io_request_context_(nullptr), |
| obj_pool_(nullptr), |
| blob_mem_pool_(new MemPool(parent_->mem_tracker())) {} |
| |
| PuffinWriter::~PuffinWriter() {} |
| |
Status PuffinWriter::Init() {
  // Initialize the IO infrastructure for loading existing DVs. Actual DV loading is
  // deferred to Finalize(), after all rows have arrived.
| io_request_context_ = ExecEnv::GetInstance()->disk_io_mgr()->RegisterContext(); |
| obj_pool_ = state_->obj_pool(); |
| DCHECK(obj_pool_ != nullptr); |
| |
| // Wire data cache counters into the request context so HdfsFileReader can update |
| // per-context cache statistics when reading existing deletion vectors. |
| RuntimeProfile* profile = state_->runtime_profile(); |
| data_cache_hit_count_ = |
| ADD_COUNTER(profile, "DataCacheHitCount", TUnit::UNIT); |
| data_cache_partial_hit_count_ = |
| ADD_COUNTER(profile, "DataCachePartialHitCount", TUnit::UNIT); |
| data_cache_miss_count_ = |
| ADD_COUNTER(profile, "DataCacheMissCount", TUnit::UNIT); |
| data_cache_hit_bytes_ = |
| ADD_COUNTER(profile, "DataCacheHitBytes", TUnit::BYTES); |
| data_cache_miss_bytes_ = |
| ADD_COUNTER(profile, "DataCacheMissBytes", TUnit::BYTES); |
| io_request_context_->set_data_cache_hit_counter(data_cache_hit_count_); |
| io_request_context_->set_data_cache_partial_hit_counter(data_cache_partial_hit_count_); |
| io_request_context_->set_data_cache_miss_counter(data_cache_miss_count_); |
| io_request_context_->set_data_cache_hit_bytes_counter(data_cache_hit_bytes_); |
| io_request_context_->set_data_cache_miss_bytes_counter(data_cache_miss_bytes_); |
| |
| // Initialize deletion vector blob reader |
| dv_blob_reader_.reset(new DeletionVectorBlobReader()); |
| |
| // Initialize timer for tracking DV loading performance |
| load_dv_timer_ = ADD_TIMER(state_->runtime_profile(), "PuffinOldDVLoadTimer"); |
| |
| return Status::OK(); |
| } |
| |
| void PuffinWriter::Close() { |
| // Release blob reader |
| dv_blob_reader_.reset(); |
| |
| // Ensure blob memory is released |
| if (blob_mem_pool_ != nullptr) { |
| blob_mem_pool_->FreeAll(); |
| } |
| if (io_request_context_ != nullptr) { |
| ExecEnv::GetInstance()->disk_io_mgr()->UnregisterContext(io_request_context_.get()); |
| } |
| } |
| |
| Status PuffinWriter::InitNewFile() { |
| current_offset_ = MAGIC_SIZE; |
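  // Every Puffin file begins with the 4-byte magic. htonl() fixes the constant's
  // on-disk byte order (big-endian) so the written bytes are independent of host
  // endianness.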
| uint32_t magic_be = htonl(PUFFIN_MAGIC); |
| RETURN_IF_ERROR(Write(magic_be)); |
| |
| return Status::OK(); |
| } |
| |
| Status PuffinWriter::AppendRows( |
| RowBatch* batch, const std::vector<int32_t>& row_group_indices, bool* new_file) { |
| SCOPED_TIMER(parent_->encode_timer()); |
| *new_file = false; |
| |
| if (batch->num_rows() == 0) return Status::OK(); |
| |
  // We expect the row batch to have two columns: filepath (string) and position
  // (bigint), similar to how IcebergBufferedDeleteSink processes rows.
| DCHECK_GE(output_expr_evals_.size(), 2); |
| ScalarExprEvaluator* filepath_eval = output_expr_evals_[0]; |
| ScalarExprEvaluator* position_eval = output_expr_evals_[1]; |
| |
| // File paths are ordered, so rows for the same data file always arrive as a |
| // contiguous run. We accumulate positions for the current file into a buffer and |
| // flush via AddElements when the file changes or the batch ends. |
| std::vector<uint64_t> pending_positions; |
| pending_positions.reserve(POSITIONS_BUFFER_INITIAL_CAPACITY); |
| |
| auto flush_pending = [&]() { |
| if (pending_positions.empty()) return; |
| last_bitmap_it_->second.AddElements(pending_positions); |
| output_->current_file_rows += pending_positions.size(); |
| pending_positions.clear(); |
| }; |
| |
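  // Example: a batch with rows (f1,3), (f1,7), (f2,1) triggers two flushes: {3,7} is
  // added to f1's bitmap when the path changes to f2, and {1} is added to f2's
  // bitmap by the final flush_pending() call after the loop.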
| for (int i = 0; i < batch->num_rows(); ++i) { |
| TupleRow* row = batch->GetRow(i); |
| |
| // Extract filepath |
| impala_udf::StringVal filepath_sv = filepath_eval->GetStringVal(row); |
| DCHECK(!filepath_sv.is_null); |
    std::string_view filepath_sv_view(
        reinterpret_cast<const char*>(filepath_sv.ptr), filepath_sv.len);
| |
| // Extract position (row number) |
| impala_udf::BigIntVal position_bi = position_eval->GetBigIntVal(row); |
| DCHECK(!position_bi.is_null); |
| int64_t position = position_bi.val; |
| |
| // Use the cached iterator when the filepath matches the previous row, otherwise |
| // flush the accumulated positions for the outgoing file, then look up (or insert) |
| // the entry in the map and refresh the cache. |
| if (last_bitmap_it_ == deletion_vectors_.end() || |
| last_filepath_ != filepath_sv_view) { |
| flush_pending(); |
| last_filepath_.assign(filepath_sv_view.data(), filepath_sv_view.size()); |
| last_bitmap_it_ = deletion_vectors_.emplace( |
| std::piecewise_construct, |
| std::forward_as_tuple(last_filepath_), |
| std::forward_as_tuple()).first; |
| } |
| pending_positions.push_back(static_cast<uint64_t>(position)); |
| } |
| flush_pending(); // flush the final (or only) file's positions |
| |
| return Status::OK(); |
| } |
| |
| Status PuffinWriter::Finalize() { |
  // Load old deletion vectors ONLY for data files that have new deletes. This
  // optimizes for the common case where only a subset of files is modified:
  // - iterate over deletion_vectors_ (files with new deletes),
  // - look up each in referenced_deletion_vectors (files with old DVs),
  // - load the old DV only when a file appears in both.
| for (const auto& entry : deletion_vectors_) { |
| const std::string& data_file_path = entry.first; |
| |
    // Check whether this data file has an old DV that needs to be loaded and merged.
    // referenced_deletion_vectors is keyed by a 128-bit hash of the data file path.
| auto old_dv_it = output_->referenced_deletion_vectors->find( |
| THash128FromFilePath(data_file_path)); |
| if (old_dv_it != output_->referenced_deletion_vectors->end()) { |
| const TIcebergDeletionVector& old_dv_metadata = old_dv_it->second; |
| |
| VLOG(2) << "Loading old DV for data file with new deletes: " << data_file_path; |
| |
| // Load the old DV and merge it with new deletes |
| SCOPED_TIMER(load_dv_timer_); |
| RETURN_IF_ERROR(LoadExistingDeletionVector( |
| old_dv_metadata.path, data_file_path, |
| old_dv_metadata.content_offset, |
| old_dv_metadata.content_size_in_bytes)); |
| |
| // Track that this old DV is being replaced |
| output_->puffin_result.old_deletion_vectors[data_file_path] = old_dv_metadata; |
| } |
| } |
| |
| SCOPED_TIMER(parent_->hdfs_write_timer()); |
| |
| // Convert accumulated deletion vectors to blobs |
| for (auto& entry : deletion_vectors_) { |
| const std::string& data_file_path = entry.first; |
| RoaringBitmap64& bitmap = entry.second; |
| |
    // Add a deletion vector blob for this data file, using the default snapshot_id=0
    // and sequence_number=0. These should be set appropriately by the caller if
    // needed.
| RETURN_IF_ERROR(AddDeletionVector(data_file_path, bitmap, 0, 0)); |
| } |
| |
| // Write all buffered blobs to the file |
| for (const auto& blob : file_.GetBlobs()) { |
| if (blob.data.data != nullptr && blob.metadata.length > 0) { |
| RETURN_IF_ERROR(Write(blob.data.data, blob.metadata.length)); |
| } |
| } |
| |
| // Write the footer containing blob metadata |
| RETURN_IF_ERROR(WriteFooter()); |
| |
| output_->current_file_bytes = current_offset_; |
| |
| blob_mem_pool_->Clear(); |
| |
| return Status::OK(); |
| } |
| |
| Status PuffinWriter::AddBlob(puffin::Blob& blob) { |
| // Set the offset for this blob |
| blob.metadata.offset = current_offset_; |
| |
| // Add to the file structure |
| file_.AddBlob(blob); |
| |
| // Update the current offset for the next blob |
| current_offset_ += blob.data.length; |
| |
| return Status::OK(); |
| } |
| |
| Status PuffinWriter::AddDeletionVector(const std::string& data_file_path, |
| RoaringBitmap64& bitmap, int64_t snapshot_id, int64_t sequence_number) { |
| // Get the serialized size of the bitmap |
| bitmap.Optimize(); // Optimize the bitmap before serialization for better compression |
| size_t bitmap_serialized_size = bitmap.BitmapSizeInBytes(); |
| if (bitmap_serialized_size == 0) { |
| return Status("Cannot add deletion vector: bitmap is empty"); |
| } |
| |
| // Calculate the total size for the blob following the serialization pattern: |
| // 4 bytes: combined length (vector size + magic bytes size) |
| // 4 bytes: magic sequence (0xD1D33964) |
| // N bytes: serialized vector (bitmap) |
| // 4 bytes: CRC-32 checksum |
| const size_t magic_size = 4; |
| const size_t length_field_size = 4; |
| const size_t crc_size = 4; |
| const size_t combined_length = bitmap_serialized_size + magic_size; |
| const size_t total_blob_size = |
| length_field_size + magic_size + bitmap_serialized_size + crc_size; |
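  // For a bitmap that serializes to N bytes, the blob occupies N + 12 bytes on disk:
  // the 4-byte length field holding N + 4, the 4-byte magic, the N bitmap bytes, and
  // the 4-byte CRC computed over the magic and bitmap bytes (but not the length
  // field).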
| |
| DCHECK_GT(total_blob_size, 0); |
| // Allocate buffer for the complete serialized blob |
| uint8_t* data = blob_mem_pool_->TryAllocate(total_blob_size); |
| if (UNLIKELY(data == nullptr)) { |
| return parent_->mem_tracker()->MemLimitExceeded(state_, |
| "Failed to allocate memory for deletion vector blob serialization.", |
| total_blob_size); |
| } |
| uint8_t* write_ptr = data; |
| |
| uint32_t combined_length_be = htonl(static_cast<uint32_t>(combined_length)); |
| memcpy(write_ptr, &combined_length_be, length_field_size); |
| write_ptr += length_field_size; |
| |
| uint32_t magic_be = htonl(BLOB_MAGIC); |
| memcpy(write_ptr, &magic_be, magic_size); |
| write_ptr += magic_size; |
| |
| size_t written = bitmap.Serialize(write_ptr); |
| DCHECK_EQ(written, bitmap_serialized_size); |
| write_ptr += bitmap_serialized_size; |
| |
| const uint8_t* crc_start = data + length_field_size; |
| const size_t crc_data_length = magic_size + bitmap_serialized_size; |
| uint32_t crc = CRC32::ComputeChecksum(crc_start, crc_data_length); |
| // Write CRC as 4-byte big-endian integer |
| uint32_t crc_be = htonl(crc); |
| memcpy(write_ptr, &crc_be, crc_size); |
| |
| // Create blob data wrapper |
| puffin::BlobData blob_data = {data, total_blob_size}; |
| |
| // Create blob metadata |
| puffin::BlobMetadata metadata(puffin::BlobType::DELETION_VECTOR_V1, total_blob_size); |
| metadata.snapshot_id = snapshot_id; |
| metadata.sequence_number = sequence_number; |
| |
| // Store the data file path in properties |
| metadata.properties["referenced-data-file"] = data_file_path; |
| |
| // Create the deletion vector object to track the newly written blob |
| TIcebergDeletionVector new_dv; |
  // Note: 'path' will be set later in createIcebergDataFileString() when the final
  // path is known.
  new_dv.__set_path("");
| new_dv.__set_content_offset(current_offset_); |
| new_dv.__set_content_size_in_bytes(total_blob_size); |
| new_dv.__set_record_count(bitmap.Cardinality()); |
| |
| // Create blob and add it |
| puffin::Blob blob(metadata, blob_data); |
| Status status = AddBlob(blob); |
| |
| // Add the new deletion vector to the result, keyed by the data file path |
| output_->puffin_result.new_deletion_vectors[data_file_path] = new_dv; |
| return status; |
| } |
| |
| Status PuffinWriter::WriteFooter() { |
| std::string footer_json = SerializeFooterToJson(); |
| uint32_t magic_be = htonl(PUFFIN_MAGIC); |
| RETURN_IF_ERROR(Write(magic_be)); |
| RETURN_IF_ERROR(Write(footer_json.data(), footer_json.size())); |
| |
  // The footer payload size counts only the JSON bytes, excluding the surrounding
  // magic, size and flags fields.
  uint32_t footer_size = static_cast<uint32_t>(footer_json.size());
  RETURN_IF_ERROR(Write(footer_size));
  uint32_t flags = 0;
  RETURN_IF_ERROR(Write(flags)); // Flags field, reserved for future use.
| RETURN_IF_ERROR(Write(magic_be)); |
| current_offset_ += 4 /*leading magic*/ + footer_size |
| + 4 /*footer size field*/ + 4 /*flags*/ + 4 /*trailing magic*/; |
| |
| return Status::OK(); |
| } |
| |
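// Builds the footer's JSON payload. Abridged example of the output, assuming
// BlobTypeToString() returns the Puffin spec name "deletion-vector-v1" for DV blobs
// and using a hypothetical referenced data file path:
//
//   {"blobs":[{"type":"deletion-vector-v1","fields":[],"snapshot-id":0,
//              "sequence-number":0,"offset":4,"length":38,"compression-codec":"...",
//              "properties":{"referenced-data-file":"/path/to/data.parquet"}}],
//    "properties":{}}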
| std::string PuffinWriter::SerializeFooterToJson() { |
| using namespace rapidjson; |
| |
| // Create JSON document |
| Document doc; |
| doc.SetObject(); |
| Document::AllocatorType& allocator = doc.GetAllocator(); |
| |
| // Create blobs array |
| Value blobs_array(kArrayType); |
| for (const auto& blob : file_.GetBlobs()) { |
| Value blob_obj(kObjectType); |
| |
| // Add type |
| Value type_val; |
| type_val.SetString(puffin::BlobTypeToString(blob.metadata.type).c_str(), allocator); |
| blob_obj.AddMember("type", type_val, allocator); |
| |
| // Add fields array |
| Value fields_array(kArrayType); |
| for (int64_t field_id : blob.metadata.fields) { |
| fields_array.PushBack(field_id, allocator); |
| } |
| |
| blob_obj.AddMember("fields", fields_array, allocator); |
| blob_obj.AddMember("snapshot-id", blob.metadata.snapshot_id, allocator); |
| blob_obj.AddMember("sequence-number", blob.metadata.sequence_number, allocator); |
| blob_obj.AddMember("offset", blob.metadata.offset, allocator); |
| blob_obj.AddMember("length", static_cast<int64_t>(blob.metadata.length), allocator); |
| |
| Value codec_val; |
| codec_val.SetString( |
| puffin::CompressionCodecToString(blob.metadata.compression_codec).c_str(), |
| allocator); |
| blob_obj.AddMember("compression-codec", codec_val, allocator); |
| |
| Value props_obj(kObjectType); |
| for (const auto& prop : blob.metadata.properties) { |
| Value key(prop.first.c_str(), allocator); |
| Value val(prop.second.c_str(), allocator); |
| props_obj.AddMember(key, val, allocator); |
| } |
| blob_obj.AddMember("properties", props_obj, allocator); |
| |
| blobs_array.PushBack(blob_obj, allocator); |
| } |
| doc.AddMember("blobs", blobs_array, allocator); |
| |
| // Add file-level properties |
| Value file_props_obj(kObjectType); |
| for (const auto& prop : file_.GetFileMetadata()) { |
| Value key(prop.first.c_str(), allocator); |
| Value val(prop.second.c_str(), allocator); |
| file_props_obj.AddMember(key, val, allocator); |
| } |
| doc.AddMember("properties", file_props_obj, allocator); |
| |
| // Convert to string |
| rapidjson::StringBuffer buffer; |
| rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); |
| doc.Accept(writer); |
| |
| return std::string(buffer.GetString(), buffer.GetSize()); |
| } |
| |
Status PuffinWriter::ValidateBlobLocation(const std::string& file_path,
    int64_t content_offset, int64_t content_size) {
  // Validate that the offset is non-negative and the size is positive.
| if (content_offset < 0) { |
| return Status(strings::Substitute( |
| "Invalid content offset $0 for Puffin file '$1': must be non-negative", |
| content_offset, file_path)); |
| } |
| if (content_size <= 0) { |
| return Status(strings::Substitute( |
| "Invalid content size $0 for Puffin file '$1': must be positive", |
| content_size, file_path)); |
| } |
| |
| // Get file size to validate bounds |
| int64_t file_size; |
| RETURN_IF_ERROR(GetFileSize(output_->hdfs_connection, file_path.c_str(), &file_size)); |
| |
| // Validate that blob is within file bounds |
| if (content_offset + content_size > file_size) { |
| return Status(strings::Substitute( |
| "Blob location out of bounds in Puffin file '$0': " |
| "offset=$1 + size=$2 = $3 exceeds file size $4", |
| file_path, content_offset, content_size, |
| content_offset + content_size, file_size)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status PuffinWriter::LoadExistingDeletionVector( |
| const std::string& puffin_file_path, |
| const std::string& data_file_path, |
| int64_t content_offset, |
| int64_t content_size) { |
| |
| DCHECK(output_->hdfs_connection != nullptr) |
| << "HDFS connection must be initialized before loading DVs. " |
| << "Ensure Init() was called successfully."; |
| DCHECK(dv_blob_reader_ != nullptr) |
| << "DV blob reader not initialized"; |
| |
| VLOG(2) << "Loading existing deletion vector from Puffin file '" |
| << puffin_file_path << "' at offset " << content_offset |
| << " with size " << content_size << " for data file '" |
| << data_file_path << "'"; |
| |
| RETURN_IF_ERROR(ValidateBlobLocation( |
| puffin_file_path, content_offset, content_size)); |
| |
| RoaringBitmap64 loaded_dv; |
| Status load_status = dv_blob_reader_->Load( |
| io_request_context_.get(), parent_->mem_tracker(), obj_pool_, |
| puffin_file_path, content_offset, content_size, &loaded_dv); |
| |
| if (!load_status.ok()) { |
| return Status(strings::Substitute( |
| "Failed to load deletion vector from '$0': $1", |
| puffin_file_path, load_status.msg().msg())); |
| } |
| |
| VLOG(2) << "Successfully loaded DV with " << loaded_dv.Cardinality() |
| << " deleted positions"; |
| |
  // Merge the loaded old DV into the newly accumulated deletes; the resulting union
  // is the complete deletion vector for this data file.
  auto& target_dv = deletion_vectors_[data_file_path];
  target_dv.Or(loaded_dv);
| |
| return Status::OK(); |
| } |
| |
| } // namespace impala |