blob: fc7790d0e1a465d51d29ac4f2f707ae2d1be6ae3 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "common/status.h"
#include "exec/blob-reader.h"
#include "exec/hdfs-table-writer.h"
#include "exec/puffin/blob.h"
#include "util/roaring-bitmap.h"
#include "util/thash128-util.h"
namespace impala {
/// Forward declarations. In this header these types are only used as raw-pointer or
/// std::unique_ptr member types of PuffinWriter.
class MemPool;
class ObjectPool;
namespace io {
class RequestContext;
} // namespace io
/// Writer for Puffin files containing deletion vectors and statistics.
/// Implements the Puffin file format specification.
/// Format: [Magic(4)][Blob1][Blob2]...[BlobN][Footer JSON][Footer Length(4)][Magic(4)]
///
/// Deleted row positions are accumulated per data file in 'deletion_vectors_' and
/// turned into deletion vector blobs when the file is finalized. Existing deletion
/// vectors can be merged in via LoadExistingDeletionVector().
class PuffinWriter : public HdfsTableWriter {
 public:
  PuffinWriter(TableSinkBase* parent, RuntimeState* state, OutputPartition* output,
      const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc);
  ~PuffinWriter() override;

  /// Initialize HDFS connection and allocate buffers.
  Status Init() override WARN_UNUSED_RESULT;

  /// Create new Puffin file and write magic bytes.
  Status InitNewFile() override WARN_UNUSED_RESULT;

  /// Consume the rows of 'batch' and record them in the per-data-file bitmaps held in
  /// 'deletion_vectors_'; the bitmaps are converted to blobs later, in Finalize().
  /// NOTE(review): the exact meaning of 'row_group_indices' and of the 'new_file'
  /// out-parameter follows the HdfsTableWriter contract, which is not visible in this
  /// header -- confirm against the base class.
  Status AppendRows(RowBatch* batch, const std::vector<int32_t>& row_group_indices,
      bool* new_file) override WARN_UNUSED_RESULT;

  /// Write footer and close file.
  Status Finalize() override WARN_UNUSED_RESULT;

  /// Release the resources held by this writer. Per the lifetime note on
  /// 'blob_mem_pool_', this ends the Init() -> Close() lifetime of that pool.
  void Close() override;

  /// This writer reports a default block size of 0.
  uint64_t default_block_size() const override { return 0; }

  /// Return file extension ("puffin", without a leading dot).
  std::string file_extension() const override { return "puffin"; }

  /// Add a blob to the in-memory buffer.
  /// 'blob' is taken by non-const reference.
  /// NOTE(review): presumably so its location within the file can be filled in --
  /// confirm in the implementation.
  Status AddBlob(puffin::Blob& blob) WARN_UNUSED_RESULT;

  /// Add a deletion vector for a specific data file.
  /// Creates a DeleteVector blob from the bitmap and adds it to the file.
  /// 'data_file_path' identifies which data file this DV applies to.
  /// 'bitmap' contains the deleted row positions.
  /// 'snapshot_id' and 'sequence_number' are Iceberg metadata; both default to 0.
  Status AddDeletionVector(const std::string& data_file_path,
      RoaringBitmap64& bitmap, int64_t snapshot_id = 0,
      int64_t sequence_number = 0) WARN_UNUSED_RESULT;

  /// Load an existing deletion vector from a Puffin file and merge it with
  /// in-memory deletion vectors for the same data file.
  ///
  /// The loaded DV is merged (union operation) with any existing DV for the same
  /// data_file_path in the deletion_vectors_ map. If no DV exists yet, it creates one.
  ///
  /// @param puffin_file_path Full HDFS path to the Puffin file containing the DV
  /// @param data_file_path Path to the data file this DV references
  /// @param content_offset Byte offset of the DV blob within the Puffin file
  /// @param content_size Size of the DV blob in bytes
  /// @return Status::OK() on success, error status on failure
  Status LoadExistingDeletionVector(
      const std::string& puffin_file_path,
      const std::string& data_file_path,
      int64_t content_offset,
      int64_t content_size) WARN_UNUSED_RESULT;

 private:
  /// Serialize footer to JSON and write to file.
  Status WriteFooter() WARN_UNUSED_RESULT;

  /// Serialize footer metadata to JSON string.
  std::string SerializeFooterToJson();

  /// Validate that the blob location is within file bounds.
  /// @param file_path Path to the Puffin file
  /// @param content_offset Offset of the blob
  /// @param content_size Size of the blob
  /// @return Status::OK() if valid, error status otherwise
  Status ValidateBlobLocation(const std::string& file_path,
      int64_t content_offset, int64_t content_size) WARN_UNUSED_RESULT;

  /// Puffin file structure containing all blobs.
  puffin::File file_;

  /// Track the current offset for blob writing. Default-initialized so the value is
  /// never indeterminate, even before the first file is started.
  int64_t current_offset_ = 0;

  /// Map from data file path to RoaringBitmap of deleted row positions.
  /// This is populated by AppendRows() and converted to deletion vector
  /// blobs in Finalize().
  std::map<std::string, RoaringBitmap64> deletion_vectors_;

  /// Cache of the last filepath seen in AppendRows() and its iterator into
  /// deletion_vectors_. Rows for the same data file tend to arrive in sequence, so
  /// this avoids a full map lookup on every row in the common case.
  /// 'last_bitmap_it_' must stay declared after 'deletion_vectors_': its default
  /// initializer reads that member, and members are initialized in declaration order.
  std::string last_filepath_;
  std::map<std::string, RoaringBitmap64>::iterator last_bitmap_it_ =
      deletion_vectors_.end();

  /// IO request context for loading existing deletion vectors.
  std::unique_ptr<io::RequestContext> io_request_context_;

  /// Object pool for scan range allocations. Non-owning raw pointer.
  ObjectPool* obj_pool_ = nullptr;

  /// Deletion vector blob reader for deserializing DV blobs.
  std::unique_ptr<DeletionVectorBlobReader> dv_blob_reader_;

  /// Memory pool for blob serialization buffers (tracked by parent's mem_tracker).
  /// Lifetime: Init() -> Close(). Cleared after each file write.
  std::unique_ptr<MemPool> blob_mem_pool_;

  /// Timer for loading existing deletion vectors.
  RuntimeProfile::Counter* load_dv_timer_ = nullptr;

  /// Data cache counters, wired into io_request_context_ so HdfsFileReader can
  /// update per-context cache statistics when loading existing deletion vectors.
  RuntimeProfile::Counter* data_cache_hit_count_ = nullptr;
  RuntimeProfile::Counter* data_cache_partial_hit_count_ = nullptr;
  RuntimeProfile::Counter* data_cache_miss_count_ = nullptr;
  RuntimeProfile::Counter* data_cache_hit_bytes_ = nullptr;
  RuntimeProfile::Counter* data_cache_miss_bytes_ = nullptr;

  /// Magic bytes "PFA1" as per Puffin spec. The constant holds the ASCII codes
  /// 'P' (0x50), 'F' (0x46), 'A' (0x41), '1' (0x31) from most to least significant
  /// byte.
  /// NOTE(review): the on-disk byte order depends on how the implementation writes
  /// this value -- confirm it is emitted most-significant byte first.
  static constexpr uint32_t PUFFIN_MAGIC = 0x50464131;

  /// Magic bytes for blob serialization.
  static constexpr uint32_t BLOB_MAGIC = 0xD1D33964;

  /// Size of magic bytes.
  static constexpr int MAGIC_SIZE = 4;

  /// Size of footer length field (4 bytes).
  static constexpr int FOOTER_LENGTH_SIZE = 4;
};
} // namespace impala