| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "paimon/core/io/field_mapping_reader.h" |
| |
| #include <cassert> |
| #include <cstddef> |
| #include <utility> |
| |
| #include "arrow/api.h" |
| #include "arrow/array/array_base.h" |
| #include "arrow/array/array_nested.h" |
| #include "arrow/array/util.h" |
| #include "arrow/c/abi.h" |
| #include "arrow/c/bridge.h" |
| #include "arrow/scalar.h" |
| #include "arrow/util/checked_cast.h" |
| #include "fmt/format.h" |
| #include "paimon/common/data/binary_string.h" |
| #include "paimon/common/types/data_field.h" |
| #include "paimon/common/utils/arrow/mem_utils.h" |
| #include "paimon/common/utils/arrow/status_utils.h" |
| #include "paimon/core/casting/cast_executor.h" |
| #include "paimon/core/casting/casting_utils.h" |
| #include "paimon/core/utils/field_mapping.h" |
| #include "paimon/memory/bytes.h" |
| #include "paimon/reader/batch_reader.h" |
| |
| namespace paimon { |
| class MemoryPool; |
| |
| FieldMappingReader::FieldMappingReader(int32_t field_count, |
| std::unique_ptr<FileBatchReader>&& reader, |
| const BinaryRow& partition, |
| std::unique_ptr<FieldMapping>&& mapping, |
| const std::shared_ptr<MemoryPool>& pool) |
| : field_count_(field_count), |
| arrow_pool_(GetArrowPool(pool)), |
| reader_(std::move(reader)), |
| partition_(partition), |
| partition_info_(mapping->partition_info), |
| non_partition_info_(mapping->non_partition_info), |
| non_exist_field_info_(mapping->non_exist_field_info) { |
| if (non_exist_field_info_ != std::nullopt || partition_info_ != std::nullopt) { |
| need_mapping_ = true; |
| } |
| |
| for (int32_t i = 0; |
| i < static_cast<int32_t>(non_partition_info_.idx_in_target_read_schema.size()); i++) { |
| if (i != non_partition_info_.idx_in_target_read_schema[i]) { |
| need_mapping_ = true; |
| } |
| if (non_partition_info_.cast_executors[i] != nullptr) { |
| need_casting_ = true; |
| } |
| // Field name change (RENAME COLUMN) also requires mapping: data schema |
| // carries the file's physical name while read schema carries the |
| // post-rename logical name. If we skipped mapping, the inner reader's |
| // batch would be passed through with the old physical name and the |
| // consumer's name-based lookup against the read schema would fail. |
| if (non_partition_info_.non_partition_data_schema[i].Name() != |
| non_partition_info_.non_partition_read_schema[i].Name()) { |
| need_mapping_ = true; |
| } |
| } |
| } |
| |
| Result<std::shared_ptr<arrow::Array>> FieldMappingReader::CastNonPartitionArrayIfNeed( |
| const std::shared_ptr<arrow::Array>& src_array) const { |
| if (!need_casting_) { |
| return src_array; |
| } |
| auto* struct_array = arrow::internal::checked_cast<arrow::StructArray*>(src_array.get()); |
| int32_t field_count = struct_array->num_fields(); |
| assert(static_cast<size_t>(field_count) == non_partition_info_.cast_executors.size()); |
| arrow::ArrayVector casted_array; |
| std::vector<std::string> casted_field_names; |
| casted_array.reserve(field_count); |
| casted_field_names.reserve(field_count); |
| for (int32_t i = 0; i < field_count; i++) { |
| if (non_partition_info_.cast_executors[i] != nullptr) { |
| auto single_column_array = struct_array->field(i); |
| // if src array is dict, cast to string first |
| auto dict_array = |
| std::dynamic_pointer_cast<arrow::DictionaryArray>(single_column_array); |
| if (dict_array) { |
| PAIMON_ASSIGN_OR_RAISE( |
| single_column_array, |
| CastingUtils::Cast(dict_array, /*target_type=*/arrow::utf8(), |
| arrow::compute::CastOptions::Safe(), arrow_pool_.get())); |
| } |
| PAIMON_ASSIGN_OR_RAISE( |
| std::shared_ptr<arrow::Array> casted, |
| non_partition_info_.cast_executors[i]->Cast( |
| single_column_array, non_partition_info_.non_partition_read_schema[i].Type(), |
| arrow_pool_.get())); |
| casted_array.push_back(casted); |
| casted_field_names.push_back(non_partition_info_.non_partition_data_schema[i].Name()); |
| } else { |
| // read and data type may both be string type, but after adapter transform, type may be |
| // dictionary, need reconstruct struct type |
| casted_array.push_back(struct_array->field(i)); |
| casted_field_names.push_back(non_partition_info_.non_partition_data_schema[i].Name()); |
| } |
| } |
| PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array, |
| arrow::StructArray::Make(casted_array, casted_field_names)); |
| return arrow_array; |
| } |
| |
| Result<BatchReader::ReadBatchWithBitmap> FieldMappingReader::NextBatchWithBitmap() { |
| PAIMON_ASSIGN_OR_RAISE(ReadBatchWithBitmap non_partition_result_with_bitmap, |
| reader_->NextBatchWithBitmap()); |
| if (!need_mapping_ && !need_casting_) { |
| return non_partition_result_with_bitmap; |
| } |
| if (BatchReader::IsEofBatch(non_partition_result_with_bitmap)) { |
| // read finish |
| partition_array_.reset(); |
| non_exist_array_.reset(); |
| return non_partition_result_with_bitmap; |
| } |
| auto& [non_partition_result, bitmap] = non_partition_result_with_bitmap; |
| auto& [c_array, c_schema] = non_partition_result; |
| PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> non_partition_array, |
| arrow::ImportArray(c_array.get(), c_schema.get())); |
| |
| arrow::ArrayVector target_array(field_count_); |
| std::vector<std::string> target_field_names(field_count_); |
| // mapping non-partition array |
| PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> casted_non_partition_array, |
| CastNonPartitionArrayIfNeed(non_partition_array)); |
| MappingFields(casted_non_partition_array, non_partition_info_.non_partition_read_schema, |
| non_partition_info_.idx_in_target_read_schema, &target_array, |
| &target_field_names); |
| |
| // mapping partition array |
| if (partition_info_ != std::nullopt) { |
| if (!partition_array_ || partition_array_->length() < non_partition_array->length()) { |
| PAIMON_ASSIGN_OR_RAISE(partition_array_, |
| GeneratePartitionArray(non_partition_array->length())); |
| } |
| auto trim_partition_array = partition_array_->Slice(0, non_partition_array->length()); |
| MappingFields(trim_partition_array, partition_info_.value().partition_read_schema, |
| partition_info_.value().idx_in_target_read_schema, &target_array, |
| &target_field_names); |
| } |
| // mapping non-exist array |
| if (non_exist_field_info_ != std::nullopt) { |
| if (!non_exist_array_ || non_exist_array_->length() < non_partition_array->length()) { |
| PAIMON_ASSIGN_OR_RAISE(non_exist_array_, |
| GenerateNonExistArray(non_partition_array->length())); |
| } |
| auto trim_non_exist_array = non_exist_array_->Slice(0, non_partition_array->length()); |
| MappingFields(trim_non_exist_array, non_exist_field_info_.value().non_exist_read_schema, |
| non_exist_field_info_.value().idx_in_target_read_schema, &target_array, |
| &target_field_names); |
| } |
| |
| // construct target array |
| PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array, |
| arrow::StructArray::Make(target_array, target_field_names)); |
| std::unique_ptr<ArrowArray> target_c_arrow_array = std::make_unique<ArrowArray>(); |
| std::unique_ptr<ArrowSchema> target_c_schema = std::make_unique<ArrowSchema>(); |
| PAIMON_RETURN_NOT_OK_FROM_ARROW( |
| arrow::ExportArray(*arrow_array, target_c_arrow_array.get(), target_c_schema.get())); |
| auto target_batch = std::make_pair(std::move(target_c_arrow_array), std::move(target_c_schema)); |
| return std::make_pair(std::move(target_batch), std::move(bitmap)); |
| } |
| |
| Result<std::shared_ptr<arrow::Array>> FieldMappingReader::GenerateSinglePartitionArray( |
| int32_t idx, int32_t batch_size) const { |
| const auto& type = partition_info_.value().partition_read_schema[idx].Type(); |
| if (partition_.IsNullAt(partition_info_.value().idx_in_partition[idx])) { |
| // for null partition value |
| PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( |
| std::shared_ptr<arrow::Array> null_array, |
| arrow::MakeArrayOfNull(type, batch_size, arrow_pool_.get())); |
| return null_array; |
| } |
| auto type_id = type->id(); |
| std::shared_ptr<arrow::Scalar> scalar; |
| switch (type_id) { |
| case arrow::Type::type::BOOL: { |
| bool value = partition_.GetBoolean(partition_info_.value().idx_in_partition[idx]); |
| scalar = std::make_shared<arrow::BooleanScalar>(value); |
| break; |
| } |
| case arrow::Type::type::INT8: { |
| int8_t value = partition_.GetByte(partition_info_.value().idx_in_partition[idx]); |
| scalar = std::make_shared<arrow::Int8Scalar>(value); |
| break; |
| } |
| case arrow::Type::type::INT16: { |
| int16_t value = partition_.GetShort(partition_info_.value().idx_in_partition[idx]); |
| scalar = std::make_shared<arrow::Int16Scalar>(value); |
| break; |
| } |
| case arrow::Type::type::INT32: { |
| int32_t value = partition_.GetInt(partition_info_.value().idx_in_partition[idx]); |
| scalar = std::make_shared<arrow::Int32Scalar>(value); |
| break; |
| } |
| case arrow::Type::type::INT64: { |
| int64_t value = partition_.GetLong(partition_info_.value().idx_in_partition[idx]); |
| scalar = std::make_shared<arrow::Int64Scalar>(value); |
| break; |
| } |
| case arrow::Type::type::FLOAT: { |
| float value = partition_.GetFloat(partition_info_.value().idx_in_partition[idx]); |
| scalar = std::make_shared<arrow::FloatScalar>(value); |
| break; |
| } |
| case arrow::Type::type::DOUBLE: { |
| double value = partition_.GetDouble(partition_info_.value().idx_in_partition[idx]); |
| scalar = std::make_shared<arrow::DoubleScalar>(value); |
| break; |
| } |
| case arrow::Type::type::STRING: { |
| BinaryString value = |
| partition_.GetString(partition_info_.value().idx_in_partition[idx]); |
| scalar = std::make_shared<arrow::StringScalar>(value.ToString()); |
| break; |
| } |
| case arrow::Type::type::BINARY: { |
| auto value = partition_.GetBinary(partition_info_.value().idx_in_partition[idx]); |
| std::string value_str(value->data(), value->size()); |
| scalar = std::make_shared<arrow::BinaryScalar>(value_str); |
| break; |
| } |
| case arrow::Type::type::DATE32: { |
| int32_t value = partition_.GetDate(partition_info_.value().idx_in_partition[idx]); |
| scalar = std::make_shared<arrow::Date32Scalar>(value); |
| break; |
| } |
| default: |
| return Status::Invalid( |
| fmt::format("Not support arrow type {} for partition", |
| partition_info_.value().partition_read_schema[idx].Type()->ToString())); |
| } |
| PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( |
| std::shared_ptr<arrow::Array> arrow_array, |
| arrow::MakeArrayFromScalar(*scalar, batch_size, arrow_pool_.get())); |
| return arrow_array; |
| } |
| |
| Result<std::shared_ptr<arrow::Array>> FieldMappingReader::GeneratePartitionArray( |
| int32_t batch_size) const { |
| arrow::ArrayVector partition_array; |
| std::vector<std::string> partition_field_names; |
| partition_array.reserve(partition_info_.value().partition_read_schema.size()); |
| partition_field_names.reserve(partition_info_.value().partition_read_schema.size()); |
| for (size_t i = 0; i < partition_info_.value().partition_read_schema.size(); i++) { |
| PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> single_partition_array, |
| GenerateSinglePartitionArray(i, batch_size)); |
| partition_array.push_back(single_partition_array); |
| partition_field_names.push_back(partition_info_.value().partition_read_schema[i].Name()); |
| } |
| PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( |
| std::shared_ptr<arrow::Array> arrow_array, |
| arrow::StructArray::Make(partition_array, partition_field_names)); |
| return arrow_array; |
| } |
| |
| Result<std::shared_ptr<arrow::Array>> FieldMappingReader::GenerateNonExistArray( |
| int32_t batch_size) const { |
| arrow::ArrayVector non_exist_array; |
| std::vector<std::string> non_exist_field_names; |
| non_exist_array.reserve(non_exist_field_info_.value().non_exist_read_schema.size()); |
| non_exist_field_names.reserve(non_exist_field_info_.value().non_exist_read_schema.size()); |
| for (const auto& non_exist_field : non_exist_field_info_.value().non_exist_read_schema) { |
| PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( |
| std::shared_ptr<arrow::Array> null_array, |
| arrow::MakeArrayOfNull(non_exist_field.Type(), batch_size, arrow_pool_.get())); |
| non_exist_array.push_back(null_array); |
| non_exist_field_names.push_back(non_exist_field.Name()); |
| } |
| PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( |
| std::shared_ptr<arrow::Array> arrow_array, |
| arrow::StructArray::Make(non_exist_array, non_exist_field_names)); |
| return arrow_array; |
| } |
| |
| void FieldMappingReader::MappingFields(const std::shared_ptr<arrow::Array>& data_array, |
| const std::vector<DataField>& read_fields_of_data_array, |
| const std::vector<int32_t>& idx_in_target_schema, |
| arrow::ArrayVector* target_array, |
| std::vector<std::string>* target_field_names) { |
| auto* struct_array = arrow::internal::checked_cast<arrow::StructArray*>(data_array.get()); |
| assert(struct_array); |
| assert(struct_array->fields().size() == idx_in_target_schema.size()); |
| for (size_t i = 0; i < idx_in_target_schema.size(); i++) { |
| // target type may be string type, but after adapter transform, type may be dictionary, |
| // need reconstruct struct type |
| (*target_array)[idx_in_target_schema[i]] = struct_array->field(i); |
| (*target_field_names)[idx_in_target_schema[i]] = read_fields_of_data_array[i].Name(); |
| } |
| } |
| |
| } // namespace paimon |