blob: f4985d9ac42c851e0ca4e97fbf883415bc7a6721 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "iceberg/avro/avro_reader.h"
#include <memory>
#include <arrow/array/builder_base.h>
#include <arrow/c/bridge.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/type.h>
#include <avro/DataFile.hh>
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/arrow/metadata_column_util_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_direct_decoder_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/avro/avro_stream_internal.h"
#include "iceberg/metadata_columns.h"
#include "iceberg/name_mapping.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
namespace iceberg::avro {
namespace {
// Opens `options.path` through the Arrow filesystem behind `options.io` and wraps
// the resulting file in an AvroInputStream; `buffer_size` is forwarded to the
// AvroInputStream constructor.
//
// Returns InvalidArgument when no FileIO is supplied, or the Arrow error status
// converted to an iceberg error when the file cannot be opened.
Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const ReaderOptions& options,
                                                           int64_t buffer_size) {
  // Without this check a missing FileIO would be dereferenced below.
  if (options.io == nullptr) {
    return InvalidArgument("FileIO is required by Avro reader");
  }
  ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
  if (options.length) {
    // A known size lets Arrow skip querying the file size when opening it.
    file_info.set_size(options.length.value());
  }
  auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, io->fs()->OpenInputFile(file_info));
  // Hand the shared_ptr over instead of copying it (avoids an atomic refcount bump).
  return std::make_unique<AvroInputStream>(std::move(file), buffer_size);
}
// Returns true when one of the top-level fields of `schema` carries the field id
// of the `_pos` (file position) metadata column. Nested fields are not inspected.
bool HasRowPositionColumn(const Schema& schema) {
  const auto pos_field_id = MetadataColumns::kFilePositionColumnId;
  for (const auto& candidate : schema.fields()) {
    if (candidate.field_id() == pos_field_id) return true;
  }
  return false;
}
// Strategy interface for pulling records out of an Avro data file into an Arrow
// builder. AvroReader::Impl drives an instance in this order:
//   Init -> InitWithSchema -> InitReadContext -> (HasMore, DecodeNext)* -> Close
class AvroReadBackend {
 public:
  virtual ~AvroReadBackend() = default;

  // Takes ownership of `input_stream`, opens the Avro container on top of it and
  // returns the schema stored in the file.
  virtual Result<::avro::ValidSchema> Init(
      std::unique_ptr<AvroInputStream> input_stream) = 0;

  // Completes setup with the file schema chosen by the caller and, when `split`
  // is present, positions the reader for that split.
  virtual Status InitWithSchema(const ::avro::ValidSchema& file_schema,
                                const std::optional<Split>& split) = 0;

  // Prepares any per-read state that depends on `reader_schema`.
  virtual void InitReadContext(const ::avro::ValidSchema& reader_schema) = 0;

  // Returns true when another record is available for DecodeNext().
  virtual bool HasMore() = 0;

  // Appends the current record, projected onto `read_schema`, to `builder`.
  virtual Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema,
                            const arrow::MetadataColumnContext& metadata_context,
                            ::arrow::ArrayBuilder* builder) = 0;

  // Returns true once reading has moved past the sync point for `split_end`.
  virtual bool IsPastSync(int64_t split_end) const = 0;

  // Key/value metadata from the Avro file header.
  virtual const ::avro::Metadata& GetMetadata() const = 0;

  // Schema used to decode records.
  virtual const ::avro::ValidSchema& GetReaderSchema() const = 0;

  // Releases the underlying reader; Closed() reports whether this happened.
  virtual void Close() = 0;
  virtual bool Closed() const = 0;
};
// Read backend that bypasses ::avro::GenericDatum: each record is decoded
// straight from the Avro decoder into the Arrow builder.
class DirectDecoderBackend : public AvroReadBackend {
 public:
  // Wraps `stream` in a DataFileReaderBase and reports the file's data schema.
  // The reader is initialized later, by InitWithSchema().
  Result<::avro::ValidSchema> Init(std::unique_ptr<AvroInputStream> stream) override {
    reader_ = std::make_unique<::avro::DataFileReaderBase>(std::move(stream));
    const auto& data_schema = reader_->dataSchema();
    return data_schema;
  }

  // Initializes the reader with `file_schema` as its reader schema; for a split,
  // moves to the next sync point after the split offset.
  Status InitWithSchema(const ::avro::ValidSchema& file_schema,
                        const std::optional<Split>& split) override {
    reader_->init(file_schema);
    if (!split.has_value()) {
      return {};
    }
    reader_->sync(split->offset);
    return {};
  }

  // No intermediate datum is needed, so there is nothing to prepare.
  void InitReadContext(const ::avro::ValidSchema& /*reader_schema*/) override {}

  bool HasMore() override { return reader_->hasMore(); }

  // Accounts for one object of the current block, then decodes it into `builder`
  // using the reusable scratch state in decode_context_.
  Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema,
                    const arrow::MetadataColumnContext& metadata_context,
                    ::arrow::ArrayBuilder* builder) override {
    reader_->decr();
    const auto root = GetReaderSchema().root();
    auto& decoder = reader_->decoder();
    return DecodeAvroToBuilder(root, decoder, projection, read_schema, metadata_context,
                               builder, decode_context_);
  }

  bool IsPastSync(int64_t split_end) const override {
    return reader_->pastSync(split_end);
  }

  const ::avro::Metadata& GetMetadata() const override { return reader_->metadata(); }

  const ::avro::ValidSchema& GetReaderSchema() const override {
    return reader_->readerSchema();
  }

  // Idempotent: closes and drops the reader only if one is still held.
  void Close() override {
    if (!reader_) return;
    reader_->close();
    reader_.reset();
  }

  bool Closed() const override { return !reader_; }

 private:
  std::unique_ptr<::avro::DataFileReaderBase> reader_;
  // Scratch buffers reused across DecodeNext() calls.
  DecodeContext decode_context_;
};
// Read backend that materializes each record as an ::avro::GenericDatum and then
// appends that datum to the Arrow builder.
class GenericDatumBackend : public AvroReadBackend {
 public:
  // Opens a DataFileReader<GenericDatum> over `input_stream` and returns the
  // file's data schema.
  Result<::avro::ValidSchema> Init(
      std::unique_ptr<AvroInputStream> input_stream) override {
    reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
        std::move(input_stream));
    return reader_->dataSchema();
  }

  // DataFileReader initializes itself in its constructor (avro-cpp), so only the
  // optional split positioning is applied here and `file_schema` is unused.
  // NOTE(review): when a name mapping was applied, `file_schema` differs from the
  // schema this reader decodes with; decoding presumably relies on field
  // positions rather than ids — confirm.
  Status InitWithSchema(const ::avro::ValidSchema& /*file_schema*/,
                        const std::optional<Split>& split) override {
    if (split) {
      reader_->sync(split->offset);
    }
    return {};
  }

  // Allocates the datum that every record is read into.
  void InitReadContext(const ::avro::ValidSchema& reader_schema) override {
    datum_ = std::make_unique<::avro::GenericDatum>(reader_schema);
  }

  // Reads the next record into datum_ and returns false at end of input, so a
  // true result means DecodeNext() can consume datum_ directly.
  bool HasMore() override { return reader_->read(*datum_); }

  // Appends the record held in datum_ (filled by HasMore()) to `builder`.
  Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema,
                    const arrow::MetadataColumnContext& metadata_context,
                    ::arrow::ArrayBuilder* builder) override {
    return AppendDatumToBuilder(GetReaderSchema().root(), *datum_, projection,
                                read_schema, metadata_context, builder);
  }

  bool IsPastSync(int64_t split_end) const override {
    return reader_->pastSync(split_end);
  }

  const ::avro::Metadata& GetMetadata() const override { return reader_->metadata(); }

  const ::avro::ValidSchema& GetReaderSchema() const override {
    return reader_->readerSchema();
  }

  // Idempotent: closes and drops the reader only if one is still held.
  void Close() override {
    if (reader_) {
      reader_->close();
      reader_.reset();
    }
  }

  bool Closed() const override { return reader_ == nullptr; }

 private:
  std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_;
  // Reusable GenericDatum that each record is read into.
  std::unique_ptr<::avro::GenericDatum> datum_;
};
// Per-read state that AvroReader::Impl creates lazily on the first Next() or
// Schema() call.
struct ReadContext {
  // Arrow schema describing the record batches handed out by the reader.
  std::shared_ptr<::arrow::Schema> arrow_schema_{};
  // Builder that accumulates decoded rows until a batch is emitted.
  std::shared_ptr<::arrow::ArrayBuilder> builder_{};
};
} // namespace
// TODO(gang.wu): collect basic reader metrics
//
// Implementation behind AvroReader. It owns the read backend, the projection of
// the requested schema onto the file schema, and the lazily created Arrow
// builder used to assemble record batches.
class AvroReader::Impl {
 public:
  // Opens the Avro file described by `options` and prepares it for reading.
  //
  // The steps are:
  // - validate the options;
  // - resolve field ids in the file schema, applying `options.name_mapping` when
  //   the file carries none;
  // - project the read schema onto the file schema;
  // - position the backend at `options.split` when one is given.
  // backend_ is only published on success, so a failed Open() leaves the reader
  // in the "not opened" state instead of exposing a half-initialized backend.
  Status Open(const ReaderOptions& options) {
    // TODO(gangwu): perhaps adding a ReaderOptions::Validate() method
    if (options.projection == nullptr) {
      return InvalidArgument("Projected schema is required by Avro reader");
    }
    batch_size_ = options.properties.Get(ReaderProperties::kBatchSize);
    // A non-positive batch size would make Next() return no rows at all.
    if (batch_size_ <= 0) {
      return InvalidArgument("Batch size must be positive, got {}", batch_size_);
    }
    read_schema_ = options.projection;

    // Reject unsupported combinations before doing any I/O. `_pos` values are
    // produced by counting rows from 0, which is wrong when reading starts at a
    // non-zero split offset.
    if (options.split && options.split->offset != 0 &&
        HasRowPositionColumn(*read_schema_)) {
      return NotSupported(
          "Reading '_pos' metadata column with split is not supported for Avro files.");
    }

    // Open the input stream and adapt to the avro interface.
    ICEBERG_ASSIGN_OR_RAISE(
        auto input_stream,
        CreateInputStream(options,
                          options.properties.Get(ReaderProperties::kAvroBufferSize)));

    // Create the appropriate backend based on configuration.
    std::unique_ptr<AvroReadBackend> backend;
    if (options.properties.Get(ReaderProperties::kAvroSkipDatum)) {
      backend = std::make_unique<DirectDecoderBackend>();
    } else {
      backend = std::make_unique<GenericDatumBackend>();
    }
    ICEBERG_ASSIGN_OR_RAISE(auto file_schema, backend->Init(std::move(input_stream)));

    // Validate field ids in the file schema.
    HasIdVisitor has_id_visitor;
    ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema));
    if (has_id_visitor.HasNoIds()) {
      // Apply field IDs based on name mapping if available
      if (options.name_mapping) {
        ICEBERG_ASSIGN_OR_RAISE(
            auto new_root_node,
            MakeAvroNodeWithFieldIds(file_schema.root(), *options.name_mapping));
        // Update the file schema to use the new schema with field IDs
        file_schema = ::avro::ValidSchema(new_root_node);
      } else {
        return InvalidSchema(
            "Avro file schema has no field IDs and no name mapping provided");
      }
    } else if (!has_id_visitor.AllHaveIds()) {
      return InvalidSchema("Not all fields in the Avro file schema have field IDs");
    }

    // Project the read schema on top of the file schema.
    ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, file_schema.root(),
                                                 /*prune_source=*/false));
    ICEBERG_RETURN_UNEXPECTED(backend->InitWithSchema(file_schema, options.split));
    if (options.split) {
      split_end_ = options.split->offset + options.split->length;
    }
    metadata_context_ = {.file_path = options.path, .next_file_pos = 0};
    backend_ = std::move(backend);
    return {};
  }

  // Reads up to batch_size_ rows and returns them as one exported Arrow struct
  // array, or std::nullopt when no rows remain in the file/split.
  Result<std::optional<ArrowArray>> Next() {
    if (!context_) {
      ICEBERG_RETURN_UNEXPECTED(InitReadContext());
    }
    while (context_->builder_->length() < batch_size_) {
      // Stop at the split boundary or at end of input, whichever comes first.
      if (IsPastSync() || !backend_->HasMore()) {
        break;
      }
      ICEBERG_RETURN_UNEXPECTED(backend_->DecodeNext(
          projection_, *read_schema_, metadata_context_, context_->builder_.get()));
      metadata_context_.next_file_pos++;
    }
    return ConvertBuilderToArrowArray();
  }

  // Releases the backend and the read context. Safe to call when Open() failed
  // or when the reader is already closed.
  Status Close() {
    if (backend_) {
      backend_->Close();
    }
    context_.reset();
    return {};
  }

  // Exports the Arrow schema of the batches produced by Next().
  Result<ArrowSchema> Schema() {
    if (!context_) {
      ICEBERG_RETURN_UNEXPECTED(InitReadContext());
    }
    ArrowSchema arrow_schema;
    auto export_result = ::arrow::ExportSchema(*context_->arrow_schema_, &arrow_schema);
    if (!export_result.ok()) {
      return InvalidSchema("Failed to export the arrow schema: {}",
                           export_result.message());
    }
    return arrow_schema;
  }

  // Returns the Avro file metadata with values converted to strings.
  Result<std::unordered_map<std::string, std::string>> Metadata() {
    if (!backend_ || backend_->Closed()) {
      return Invalid("Reader is not opened");
    }
    const auto& metadata = backend_->GetMetadata();
    std::unordered_map<std::string, std::string> metadata_map;
    metadata_map.reserve(metadata.size());
    for (const auto& [key, value] : metadata) {
      metadata_map.insert_or_assign(key, std::string(value.begin(), value.end()));
    }
    return metadata_map;
  }

 private:
  // Builds the Arrow schema and builder for read_schema_ and prepares the backend
  // for decoding. context_ is assigned only after every step succeeded, so a
  // failure cannot leave behind a context whose builder_ is null.
  Status InitReadContext() {
    // Without an open backend there is no reader schema to decode with.
    if (!backend_ || backend_->Closed()) {
      return Invalid("Reader is not opened");
    }
    auto context = std::make_unique<ReadContext>();
    ArrowSchema arrow_schema;
    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema));
    auto import_result = ::arrow::ImportSchema(&arrow_schema);
    if (!import_result.ok()) {
      return InvalidSchema("Failed to import the arrow schema: {}",
                           import_result.status().message());
    }
    context->arrow_schema_ = import_result.MoveValueUnsafe();
    auto arrow_struct_type =
        std::make_shared<::arrow::StructType>(context->arrow_schema_->fields());
    auto builder_result = ::arrow::MakeBuilder(arrow_struct_type);
    if (!builder_result.ok()) {
      return InvalidSchema("Failed to make the arrow builder: {}",
                           builder_result.status().message());
    }
    context->builder_ = builder_result.MoveValueUnsafe();
    backend_->InitReadContext(backend_->GetReaderSchema());
    context_ = std::move(context);
    return {};
  }

  // Finishes the builder into an array and exports it through the Arrow C data
  // interface; returns std::nullopt if no rows were appended.
  Result<std::optional<ArrowArray>> ConvertBuilderToArrowArray() {
    if (context_->builder_->length() == 0) {
      return std::nullopt;
    }
    auto builder_result = context_->builder_->Finish();
    if (!builder_result.ok()) {
      return InvalidArrowData("Failed to finish the arrow array builder: {}",
                              builder_result.status().message());
    }
    auto array = builder_result.MoveValueUnsafe();
    ArrowArray arrow_array;
    auto export_result = ::arrow::ExportArray(*array, &arrow_array);
    if (!export_result.ok()) {
      return InvalidArrowData("Failed to export the arrow array: {}",
                              export_result.message());
    }
    return arrow_array;
  }

  // True when reading a split and the backend has moved past its end.
  bool IsPastSync() const {
    if (!split_end_) {
      return false;
    }
    return backend_->IsPastSync(split_end_.value());
  }

  // Max number of rows in the record batch to read.
  int64_t batch_size_{};
  // The end of the split to read and used to terminate the reading.
  std::optional<int64_t> split_end_;
  // The schema to read.
  std::shared_ptr<::iceberg::Schema> read_schema_;
  // The projection result to apply to the read schema.
  SchemaProjection projection_;
  // The metadata column context for populating _file and _pos columns.
  arrow::MetadataColumnContext metadata_context_;
  // The read backend to read data into Arrow; null until Open() succeeds.
  std::unique_ptr<AvroReadBackend> backend_;
  // The context to keep track of the reading progress.
  std::unique_ptr<ReadContext> context_;
};
// Defined out of line here, where AvroReader::Impl is a complete type, so that
// impl_ (presumably a std::unique_ptr<Impl>) can be destroyed.
AvroReader::~AvroReader() = default;
// Returns the next record batch, or an error if Open() was never called.
Result<std::optional<ArrowArray>> AvroReader::Next() {
  // impl_ is created only by Open(); avoid dereferencing it before that.
  if (!impl_) {
    return Invalid("Reader is not opened");
  }
  return impl_->Next();
}
// Returns the exported Arrow schema of the batches, or an error if Open() was
// never called.
Result<ArrowSchema> AvroReader::Schema() {
  // impl_ is created only by Open(); avoid dereferencing it before that.
  if (!impl_) {
    return Invalid("Reader is not opened");
  }
  return impl_->Schema();
}
// Returns the Avro file metadata, or an error if Open() was never called.
Result<std::unordered_map<std::string, std::string>> AvroReader::Metadata() {
  // impl_ is created only by Open(); avoid dereferencing it before that.
  if (!impl_) {
    return Invalid("Reader is not opened");
  }
  return impl_->Metadata();
}
// Opens the reader with `options`. Every call starts from a brand-new Impl, which
// replaces (and destroys) any previous one.
Status AvroReader::Open(const ReaderOptions& options) {
  impl_ = std::make_unique<Impl>();
  auto open_status = impl_->Open(options);
  return open_status;
}
// Closes the reader. Closing a reader that was never opened is a no-op.
Status AvroReader::Close() {
  if (!impl_) {
    return {};
  }
  return impl_->Close();
}
void RegisterReader() {
static ReaderFactoryRegistry avro_reader_register(
FileFormatType::kAvro,
[]() -> Result<std::unique_ptr<Reader>> { return std::make_unique<AvroReader>(); });
}
} // namespace iceberg::avro