blob: 35bc926bacb15d7328fb48e7fa866057f695bd4f [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "iceberg/data/delete_loader.h"
#include <string>
#include <vector>
#include "iceberg/arrow_c_data_guard_internal.h"
#include "iceberg/deletes/position_delete_index.h"
#include "iceberg/file_reader.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/metadata_columns.h"
#include "iceberg/row/arrow_array_wrapper.h"
#include "iceberg/schema.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/struct_like_set.h"
namespace iceberg {
namespace {
/// \brief Build the projection schema for reading position delete files.
std::shared_ptr<Schema> PosDeleteSchema() {
return std::make_shared<Schema>(std::vector<SchemaField>{
MetadataColumns::kDeleteFilePath,
MetadataColumns::kDeleteFilePos,
});
}
/// \brief Open a delete file with the given projection schema.
Result<std::unique_ptr<Reader>> OpenDeleteFile(const DataFile& file,
std::shared_ptr<Schema> projection,
const std::shared_ptr<FileIO>& io) {
ReaderOptions options{
.path = file.file_path,
.length = static_cast<size_t>(file.file_size_in_bytes),
.io = io,
.projection = std::move(projection),
};
return ReaderFactoryRegistry::Open(file.file_format, options);
}
} // namespace
DeleteLoader::DeleteLoader(std::shared_ptr<FileIO> io) : io_(std::move(io)) {}
DeleteLoader::~DeleteLoader() = default;
Status DeleteLoader::LoadPositionDelete(const DataFile& file, PositionDeleteIndex& index,
std::string_view data_file_path) const {
// TODO(gangwu): push down path filter to open the file.
ICEBERG_ASSIGN_OR_RAISE(auto reader, OpenDeleteFile(file, PosDeleteSchema(), io_));
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader->Schema());
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
while (true) {
ICEBERG_ASSIGN_OR_RAISE(auto batch_opt, reader->Next());
if (!batch_opt.has_value()) break;
auto& batch = batch_opt.value();
internal::ArrowArrayGuard batch_guard(&batch);
ICEBERG_ASSIGN_OR_RAISE(
auto row, ArrowArrayStructLike::Make(arrow_schema, batch, /*row_index=*/0));
for (int64_t i = 0; i < batch.length; ++i) {
if (i > 0) {
ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
}
// Field 0: file_path
ICEBERG_ASSIGN_OR_RAISE(auto path_scalar, row->GetField(0));
auto path = std::get<std::string_view>(path_scalar);
if (path == data_file_path) {
// Field 1: pos
ICEBERG_ASSIGN_OR_RAISE(auto pos_scalar, row->GetField(1));
index.Delete(std::get<int64_t>(pos_scalar));
}
}
}
return reader->Close();
}
Status DeleteLoader::LoadDV(const DataFile& file, PositionDeleteIndex& index) const {
return NotSupported("Loading deletion vectors is not yet supported");
}
Result<PositionDeleteIndex> DeleteLoader::LoadPositionDeletes(
std::span<const std::shared_ptr<DataFile>> delete_files,
std::string_view data_file_path) const {
PositionDeleteIndex index;
for (const auto& file : delete_files) {
if (file->referenced_data_file.has_value() &&
file->referenced_data_file.value() != data_file_path) {
continue;
}
if (file->IsDeletionVector()) {
ICEBERG_RETURN_UNEXPECTED(LoadDV(*file, index));
continue;
}
ICEBERG_PRECHECK(file->content == DataFile::Content::kPositionDeletes,
"Expected position delete file but got content type {}",
ToString(file->content));
ICEBERG_RETURN_UNEXPECTED(LoadPositionDelete(*file, index, data_file_path));
}
return index;
}
Result<std::unique_ptr<UncheckedStructLikeSet>> DeleteLoader::LoadEqualityDeletes(
std::span<const std::shared_ptr<DataFile>> delete_files,
const StructType& equality_type) const {
auto eq_set = std::make_unique<UncheckedStructLikeSet>(equality_type);
std::shared_ptr<Schema> projection = equality_type.ToSchema();
for (const auto& file : delete_files) {
ICEBERG_PRECHECK(file->content == DataFile::Content::kEqualityDeletes,
"Expected equality delete file but got content type {}",
static_cast<int>(file->content));
ICEBERG_ASSIGN_OR_RAISE(auto reader, OpenDeleteFile(*file, projection, io_));
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader->Schema());
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
while (true) {
ICEBERG_ASSIGN_OR_RAISE(auto batch_opt, reader->Next());
if (!batch_opt.has_value()) break;
auto& batch = batch_opt.value();
internal::ArrowArrayGuard batch_guard(&batch);
ICEBERG_ASSIGN_OR_RAISE(
auto row, ArrowArrayStructLike::Make(arrow_schema, batch, /*row_index=*/0));
for (int64_t i = 0; i < batch.length; ++i) {
if (i > 0) {
ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
}
ICEBERG_RETURN_UNEXPECTED(eq_set->Insert(*row));
}
}
ICEBERG_RETURN_UNEXPECTED(reader->Close());
}
return eq_set;
}
} // namespace iceberg