| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <arrow/array/builder_binary.h> |
| #include <arrow/array/builder_decimal.h> |
| #include <arrow/array/builder_nested.h> |
| #include <arrow/array/builder_primitive.h> |
| #include <arrow/type.h> |
| #include <arrow/util/decimal.h> |
| #include <avro/Decoder.hh> |
| #include <avro/Node.hh> |
| #include <avro/NodeImpl.hh> |
| #include <avro/Types.hh> |
| |
| #include "iceberg/arrow/arrow_status_internal.h" |
| #include "iceberg/avro/avro_direct_decoder_internal.h" |
| #include "iceberg/avro/avro_schema_util_internal.h" |
| #include "iceberg/metadata_columns.h" |
| #include "iceberg/schema.h" |
| #include "iceberg/util/checked_cast.h" |
| #include "iceberg/util/macros.h" |
| |
| namespace iceberg::avro { |
| |
| using ::iceberg::arrow::ToErrorKind; |
| |
| namespace { |
| |
| /// \brief Forward declaration for mutual recursion. |
| Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, |
| const FieldProjection& projection, |
| const SchemaField& projected_field, |
| const arrow::MetadataColumnContext& metadata_context, |
| ::arrow::ArrayBuilder* array_builder, DecodeContext& ctx); |
| |
| /// \brief Skip an Avro value based on its schema without decoding |
| Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder) { |
| switch (avro_node->type()) { |
| case ::avro::AVRO_NULL: |
| decoder.decodeNull(); |
| return {}; |
| |
| case ::avro::AVRO_BOOL: |
| decoder.decodeBool(); |
| return {}; |
| |
| case ::avro::AVRO_INT: |
| decoder.decodeInt(); |
| return {}; |
| |
| case ::avro::AVRO_LONG: |
| decoder.decodeLong(); |
| return {}; |
| |
| case ::avro::AVRO_FLOAT: |
| decoder.decodeFloat(); |
| return {}; |
| |
| case ::avro::AVRO_DOUBLE: |
| decoder.decodeDouble(); |
| return {}; |
| |
| case ::avro::AVRO_STRING: |
| decoder.skipString(); |
| return {}; |
| |
| case ::avro::AVRO_BYTES: |
| decoder.skipBytes(); |
| return {}; |
| |
| case ::avro::AVRO_FIXED: |
| decoder.skipFixed(avro_node->fixedSize()); |
| return {}; |
| |
| case ::avro::AVRO_RECORD: { |
| // Skip all fields in order |
| for (size_t i = 0; i < avro_node->leaves(); ++i) { |
| ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node->leafAt(i), decoder)); |
| } |
| return {}; |
| } |
| |
| case ::avro::AVRO_ENUM: |
| decoder.decodeEnum(); |
| return {}; |
| |
| case ::avro::AVRO_ARRAY: { |
| const auto& element_node = avro_node->leafAt(0); |
| // skipArray() returns count like arrayStart(), must handle all blocks |
| int64_t block_count = decoder.skipArray(); |
| while (block_count > 0) { |
| for (int64_t i = 0; i < block_count; ++i) { |
| ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(element_node, decoder)); |
| } |
| block_count = decoder.arrayNext(); |
| } |
| return {}; |
| } |
| |
| case ::avro::AVRO_MAP: { |
| const auto& value_node = avro_node->leafAt(1); |
| // skipMap() returns count like mapStart(), must handle all blocks |
| int64_t block_count = decoder.skipMap(); |
| while (block_count > 0) { |
| for (int64_t i = 0; i < block_count; ++i) { |
| decoder.skipString(); // Skip key (always string in Avro maps) |
| ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(value_node, decoder)); |
| } |
| block_count = decoder.mapNext(); |
| } |
| return {}; |
| } |
| |
| case ::avro::AVRO_UNION: { |
| const size_t branch_index = decoder.decodeUnionIndex(); |
| // Validate branch index |
| const size_t num_branches = avro_node->leaves(); |
| if (branch_index >= num_branches) { |
| return InvalidArgument("Union branch index {} out of range [0, {})", branch_index, |
| num_branches); |
| } |
| return SkipAvroValue(avro_node->leafAt(branch_index), decoder); |
| } |
| |
| default: |
| return InvalidArgument("Unsupported Avro type for skipping: {}", |
| ToString(avro_node)); |
| } |
| } |
| |
| /// \brief Decode Avro record directly to Arrow struct builder. |
| Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, |
| const std::span<const FieldProjection>& projections, |
| const StructType& struct_type, |
| const arrow::MetadataColumnContext& metadata_context, |
| ::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) { |
| if (avro_node->type() != ::avro::AVRO_RECORD) { |
| return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node)); |
| } |
| |
| auto* struct_builder = internal::checked_cast<::arrow::StructBuilder*>(array_builder); |
| ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append()); |
| |
| // Build a map from Avro field index to projection index (cached per struct schema) |
| // -1 means the field should be skipped |
| const FieldProjection* cache_key = projections.data(); |
| auto cache_it = ctx.avro_to_projection_cache.find(cache_key); |
| std::vector<int>* avro_to_projection; |
| |
| if (cache_it != ctx.avro_to_projection_cache.end()) { |
| // Use cached mapping |
| avro_to_projection = &cache_it->second; |
| } else { |
| // Build and cache the mapping |
| auto [inserted_it, inserted] = ctx.avro_to_projection_cache.emplace( |
| cache_key, std::vector<int>(avro_node->leaves(), -1)); |
| avro_to_projection = &inserted_it->second; |
| |
| for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { |
| const auto& field_projection = projections[proj_idx]; |
| if (field_projection.kind == FieldProjection::Kind::kProjected) { |
| size_t avro_field_index = std::get<size_t>(field_projection.from); |
| (*avro_to_projection)[avro_field_index] = static_cast<int>(proj_idx); |
| } |
| } |
| } |
| |
| // Read all Avro fields in order (must maintain decoder position) |
| for (size_t avro_idx = 0; avro_idx < avro_node->leaves(); ++avro_idx) { |
| int proj_idx = (*avro_to_projection)[avro_idx]; |
| |
| if (proj_idx < 0) { |
| // Skip this field - not in projection |
| const auto& avro_field_node = avro_node->leafAt(avro_idx); |
| ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_field_node, decoder)); |
| } else { |
| // Decode this field |
| const auto& field_projection = projections[proj_idx]; |
| const auto& expected_field = struct_type.fields()[proj_idx]; |
| const auto& avro_field_node = avro_node->leafAt(avro_idx); |
| auto* field_builder = struct_builder->field_builder(proj_idx); |
| |
| ICEBERG_RETURN_UNEXPECTED( |
| DecodeFieldToBuilder(avro_field_node, decoder, field_projection, expected_field, |
| metadata_context, field_builder, ctx)); |
| } |
| } |
| |
| // Handle null fields (fields in projection but not in Avro) |
| for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { |
| const auto& field_projection = projections[proj_idx]; |
| const auto& expected_field = struct_type.fields()[proj_idx]; |
| auto* field_builder = struct_builder->field_builder(static_cast<int>(proj_idx)); |
| if (field_projection.kind == FieldProjection::Kind::kNull) { |
| ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); |
| } else if (field_projection.kind == FieldProjection::Kind::kMetadata) { |
| int32_t field_id = expected_field.field_id(); |
| if (field_id == MetadataColumns::kFilePathColumnId) { |
| auto string_builder = |
| internal::checked_cast<::arrow::StringBuilder*>(field_builder); |
| ICEBERG_ARROW_RETURN_NOT_OK(string_builder->Append(metadata_context.file_path)); |
| } else if (field_id == MetadataColumns::kFilePositionColumnId) { |
| auto int_builder = internal::checked_cast<::arrow::Int64Builder*>(field_builder); |
| ICEBERG_ARROW_RETURN_NOT_OK(int_builder->Append(metadata_context.next_file_pos)); |
| } else { |
| return NotSupported("Unsupported metadata column field id: {}", field_id); |
| } |
| } else if (field_projection.kind != FieldProjection::Kind::kProjected) { |
| return InvalidArgument("Unsupported field projection kind: {}", |
| static_cast<int>(field_projection.kind)); |
| } |
| } |
| return {}; |
| } |
| |
| /// \brief Decode Avro array directly to Arrow list builder. |
| Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, |
| const FieldProjection& element_projection, |
| const ListType& list_type, |
| const arrow::MetadataColumnContext& metadata_context, |
| ::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) { |
| if (avro_node->type() != ::avro::AVRO_ARRAY) { |
| return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node)); |
| } |
| |
| auto* list_builder = internal::checked_cast<::arrow::ListBuilder*>(array_builder); |
| ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append()); |
| |
| auto* value_builder = list_builder->value_builder(); |
| const auto& element_node = avro_node->leafAt(0); |
| const auto& element_field = list_type.fields().back(); |
| |
| // Read array block count |
| int64_t block_count = decoder.arrayStart(); |
| while (block_count != 0) { |
| for (int64_t i = 0; i < block_count; ++i) { |
| ICEBERG_RETURN_UNEXPECTED( |
| DecodeFieldToBuilder(element_node, decoder, element_projection, element_field, |
| metadata_context, value_builder, ctx)); |
| } |
| block_count = decoder.arrayNext(); |
| } |
| |
| return {}; |
| } |
| |
| /// \brief Decode Avro map directly to Arrow map builder. |
| Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, |
| const FieldProjection& key_projection, |
| const FieldProjection& value_projection, |
| const MapType& map_type, |
| const arrow::MetadataColumnContext& metadata_context, |
| ::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) { |
| auto* map_builder = internal::checked_cast<::arrow::MapBuilder*>(array_builder); |
| |
| if (avro_node->type() == ::avro::AVRO_MAP) { |
| // Handle regular Avro map: map<string, value> |
| const auto& key_node = avro_node->leafAt(0); |
| const auto& value_node = avro_node->leafAt(1); |
| const auto& key_field = map_type.key(); |
| const auto& value_field = map_type.value(); |
| |
| ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); |
| auto* key_builder = map_builder->key_builder(); |
| auto* item_builder = map_builder->item_builder(); |
| |
| // Read map block count |
| int64_t block_count = decoder.mapStart(); |
| while (block_count != 0) { |
| for (int64_t i = 0; i < block_count; ++i) { |
| ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, |
| key_field, metadata_context, |
| key_builder, ctx)); |
| ICEBERG_RETURN_UNEXPECTED( |
| DecodeFieldToBuilder(value_node, decoder, value_projection, value_field, |
| metadata_context, item_builder, ctx)); |
| } |
| block_count = decoder.mapNext(); |
| } |
| |
| return {}; |
| } else if (avro_node->type() == ::avro::AVRO_ARRAY && HasMapLogicalType(avro_node)) { |
| // Handle array-based map: list<struct<key, value>> |
| const auto& key_field = map_type.key(); |
| const auto& value_field = map_type.value(); |
| |
| ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); |
| auto* key_builder = map_builder->key_builder(); |
| auto* item_builder = map_builder->item_builder(); |
| |
| const auto& record_node = avro_node->leafAt(0); |
| if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 2) { |
| return InvalidArgument( |
| "Array-based map must contain records with exactly 2 fields, got: {}", |
| ToString(record_node)); |
| } |
| const auto& key_node = record_node->leafAt(0); |
| const auto& value_node = record_node->leafAt(1); |
| |
| // Read array block count |
| int64_t block_count = decoder.arrayStart(); |
| while (block_count != 0) { |
| for (int64_t i = 0; i < block_count; ++i) { |
| ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, |
| key_field, metadata_context, |
| key_builder, ctx)); |
| ICEBERG_RETURN_UNEXPECTED( |
| DecodeFieldToBuilder(value_node, decoder, value_projection, value_field, |
| metadata_context, item_builder, ctx)); |
| } |
| block_count = decoder.arrayNext(); |
| } |
| |
| return {}; |
| } else { |
| return InvalidArgument("Expected Avro map or array with map logical type, got: {}", |
| ToString(avro_node)); |
| } |
| } |
| |
| /// \brief Decode nested Avro data directly to Arrow array builder. |
| Status DecodeNestedValueToBuilder(const ::avro::NodePtr& avro_node, |
| ::avro::Decoder& decoder, |
| const std::span<const FieldProjection>& projections, |
| const NestedType& projected_type, |
| const arrow::MetadataColumnContext& metadata_context, |
| ::arrow::ArrayBuilder* array_builder, |
| DecodeContext& ctx) { |
| switch (projected_type.type_id()) { |
| case TypeId::kStruct: { |
| const auto& struct_type = internal::checked_cast<const StructType&>(projected_type); |
| return DecodeStructToBuilder(avro_node, decoder, projections, struct_type, |
| metadata_context, array_builder, ctx); |
| } |
| |
| case TypeId::kList: { |
| if (projections.size() != 1) { |
| return InvalidArgument("Expected 1 projection for list, got: {}", |
| projections.size()); |
| } |
| const auto& list_type = internal::checked_cast<const ListType&>(projected_type); |
| return DecodeListToBuilder(avro_node, decoder, projections[0], list_type, |
| metadata_context, array_builder, ctx); |
| } |
| |
| case TypeId::kMap: { |
| if (projections.size() != 2) { |
| return InvalidArgument("Expected 2 projections for map, got: {}", |
| projections.size()); |
| } |
| const auto& map_type = internal::checked_cast<const MapType&>(projected_type); |
| return DecodeMapToBuilder(avro_node, decoder, projections[0], projections[1], |
| map_type, metadata_context, array_builder, ctx); |
| } |
| |
| default: |
| return InvalidArgument("Unsupported nested type: {}", projected_type.ToString()); |
| } |
| } |
| |
| Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node, |
| ::avro::Decoder& decoder, |
| const SchemaField& projected_field, |
| ::arrow::ArrayBuilder* array_builder, |
| DecodeContext& ctx) { |
| const auto& projected_type = *projected_field.type(); |
| if (!projected_type.is_primitive()) { |
| return InvalidArgument("Expected primitive type, got: {}", projected_type.ToString()); |
| } |
| |
| switch (projected_type.type_id()) { |
| case TypeId::kBoolean: { |
| if (avro_node->type() != ::avro::AVRO_BOOL) { |
| return InvalidArgument("Expected Avro boolean for boolean field, got: {}", |
| ToString(avro_node)); |
| } |
| auto* builder = internal::checked_cast<::arrow::BooleanBuilder*>(array_builder); |
| bool value = decoder.decodeBool(); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); |
| return {}; |
| } |
| |
| case TypeId::kInt: { |
| if (avro_node->type() != ::avro::AVRO_INT) { |
| return InvalidArgument("Expected Avro int for int field, got: {}", |
| ToString(avro_node)); |
| } |
| auto* builder = internal::checked_cast<::arrow::Int32Builder*>(array_builder); |
| int32_t value = decoder.decodeInt(); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); |
| return {}; |
| } |
| |
| case TypeId::kLong: { |
| auto* builder = internal::checked_cast<::arrow::Int64Builder*>(array_builder); |
| if (avro_node->type() == ::avro::AVRO_LONG) { |
| int64_t value = decoder.decodeLong(); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); |
| } else if (avro_node->type() == ::avro::AVRO_INT) { |
| int32_t value = decoder.decodeInt(); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<int64_t>(value))); |
| } else { |
| return InvalidArgument("Expected Avro int/long for long field, got: {}", |
| ToString(avro_node)); |
| } |
| return {}; |
| } |
| |
| case TypeId::kFloat: { |
| if (avro_node->type() != ::avro::AVRO_FLOAT) { |
| return InvalidArgument("Expected Avro float for float field, got: {}", |
| ToString(avro_node)); |
| } |
| auto* builder = internal::checked_cast<::arrow::FloatBuilder*>(array_builder); |
| float value = decoder.decodeFloat(); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); |
| return {}; |
| } |
| |
| case TypeId::kDouble: { |
| auto* builder = internal::checked_cast<::arrow::DoubleBuilder*>(array_builder); |
| if (avro_node->type() == ::avro::AVRO_DOUBLE) { |
| double value = decoder.decodeDouble(); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); |
| } else if (avro_node->type() == ::avro::AVRO_FLOAT) { |
| float value = decoder.decodeFloat(); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<double>(value))); |
| } else { |
| return InvalidArgument("Expected Avro float/double for double field, got: {}", |
| ToString(avro_node)); |
| } |
| return {}; |
| } |
| |
| case TypeId::kString: { |
| if (avro_node->type() != ::avro::AVRO_STRING) { |
| return InvalidArgument("Expected Avro string for string field, got: {}", |
| ToString(avro_node)); |
| } |
| auto* builder = internal::checked_cast<::arrow::StringBuilder*>(array_builder); |
| decoder.decodeString(ctx.string_scratch); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.string_scratch)); |
| return {}; |
| } |
| |
| case TypeId::kBinary: { |
| if (avro_node->type() != ::avro::AVRO_BYTES) { |
| return InvalidArgument("Expected Avro bytes for binary field, got: {}", |
| ToString(avro_node)); |
| } |
| auto* builder = internal::checked_cast<::arrow::BinaryBuilder*>(array_builder); |
| decoder.decodeBytes(ctx.bytes_scratch); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append( |
| ctx.bytes_scratch.data(), static_cast<int32_t>(ctx.bytes_scratch.size()))); |
| return {}; |
| } |
| |
| case TypeId::kFixed: { |
| if (avro_node->type() != ::avro::AVRO_FIXED) { |
| return InvalidArgument("Expected Avro fixed for fixed field, got: {}", |
| ToString(avro_node)); |
| } |
| const auto& fixed_type = internal::checked_cast<const FixedType&>(projected_type); |
| auto* builder = |
| internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder); |
| |
| ctx.bytes_scratch.resize(fixed_type.length()); |
| decoder.decodeFixed(fixed_type.length(), ctx.bytes_scratch); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.bytes_scratch.data())); |
| return {}; |
| } |
| |
| case TypeId::kUuid: { |
| if (avro_node->type() != ::avro::AVRO_FIXED || |
| avro_node->logicalType().type() != ::avro::LogicalType::UUID) { |
| return InvalidArgument("Expected Avro fixed for uuid field, got: {}", |
| ToString(avro_node)); |
| } |
| |
| auto* builder = |
| internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder); |
| |
| ctx.bytes_scratch.resize(16); |
| decoder.decodeFixed(16, ctx.bytes_scratch); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx.bytes_scratch.data())); |
| return {}; |
| } |
| |
| case TypeId::kDecimal: { |
| if (avro_node->type() != ::avro::AVRO_FIXED || |
| avro_node->logicalType().type() != ::avro::LogicalType::DECIMAL) { |
| return InvalidArgument( |
| "Expected Avro fixed with DECIMAL logical type for decimal field, got: {}", |
| ToString(avro_node)); |
| } |
| |
| size_t byte_width = avro_node->fixedSize(); |
| auto* builder = internal::checked_cast<::arrow::Decimal128Builder*>(array_builder); |
| |
| ctx.bytes_scratch.resize(byte_width); |
| decoder.decodeFixed(byte_width, ctx.bytes_scratch); |
| ICEBERG_ARROW_ASSIGN_OR_RETURN( |
| auto decimal, ::arrow::Decimal128::FromBigEndian(ctx.bytes_scratch.data(), |
| ctx.bytes_scratch.size())); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(decimal)); |
| return {}; |
| } |
| |
| case TypeId::kDate: { |
| if (avro_node->type() != ::avro::AVRO_INT || |
| avro_node->logicalType().type() != ::avro::LogicalType::DATE) { |
| return InvalidArgument( |
| "Expected Avro int with DATE logical type for date field, got: {}", |
| ToString(avro_node)); |
| } |
| auto* builder = internal::checked_cast<::arrow::Date32Builder*>(array_builder); |
| int32_t value = decoder.decodeInt(); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); |
| return {}; |
| } |
| |
| case TypeId::kTime: { |
| if (avro_node->type() != ::avro::AVRO_LONG || |
| avro_node->logicalType().type() != ::avro::LogicalType::TIME_MICROS) { |
| return InvalidArgument( |
| "Expected Avro long with TIME_MICROS for time field, got: {}", |
| ToString(avro_node)); |
| } |
| auto* builder = internal::checked_cast<::arrow::Time64Builder*>(array_builder); |
| int64_t value = decoder.decodeLong(); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); |
| return {}; |
| } |
| |
| case TypeId::kTimestamp: |
| case TypeId::kTimestampTz: { |
| if (avro_node->type() != ::avro::AVRO_LONG || |
| avro_node->logicalType().type() != ::avro::LogicalType::TIMESTAMP_MICROS) { |
| return InvalidArgument( |
| "Expected Avro long with TIMESTAMP_MICROS for timestamp field, got: {}", |
| ToString(avro_node)); |
| } |
| auto* builder = internal::checked_cast<::arrow::TimestampBuilder*>(array_builder); |
| int64_t value = decoder.decodeLong(); |
| ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); |
| return {}; |
| } |
| |
| default: |
| return InvalidArgument("Unsupported primitive type {} to decode from avro node {}", |
| projected_field.type()->ToString(), ToString(avro_node)); |
| } |
| } |
| |
| /// \brief Dispatch to appropriate handlers based on the projection kind. |
| Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, |
| const FieldProjection& projection, |
| const SchemaField& projected_field, |
| const arrow::MetadataColumnContext& metadata_context, |
| ::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) { |
| if (avro_node->type() == ::avro::AVRO_UNION) { |
| const size_t branch_index = decoder.decodeUnionIndex(); |
| |
| // Validate branch index |
| const size_t num_branches = avro_node->leaves(); |
| if (branch_index >= num_branches) { |
| return InvalidArgument("Union branch index {} out of range [0, {})", branch_index, |
| num_branches); |
| } |
| |
| const auto& branch_node = avro_node->leafAt(branch_index); |
| if (branch_node->type() == ::avro::AVRO_NULL) { |
| ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull()); |
| return {}; |
| } else { |
| return DecodeFieldToBuilder(branch_node, decoder, projection, projected_field, |
| metadata_context, array_builder, ctx); |
| } |
| } |
| |
| const auto& projected_type = *projected_field.type(); |
| if (projected_type.is_primitive()) { |
| return DecodePrimitiveValueToBuilder(avro_node, decoder, projected_field, |
| array_builder, ctx); |
| } else { |
| const auto& nested_type = internal::checked_cast<const NestedType&>(projected_type); |
| return DecodeNestedValueToBuilder(avro_node, decoder, projection.children, |
| nested_type, metadata_context, array_builder, ctx); |
| } |
| } |
| |
| } // namespace |
| |
| Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, |
| const SchemaProjection& projection, |
| const Schema& projected_schema, |
| const arrow::MetadataColumnContext& metadata_context, |
| ::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) { |
| return DecodeNestedValueToBuilder(avro_node, decoder, projection.fields, |
| projected_schema, metadata_context, array_builder, |
| ctx); |
| } |
| |
| } // namespace iceberg::avro |