| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "format/arrow/arrow_block_convertor.h" |
| |
| #include <arrow/array/builder_base.h> |
| #include <arrow/array/builder_binary.h> |
| #include <arrow/array/builder_decimal.h> |
| #include <arrow/array/builder_nested.h> |
| #include <arrow/array/builder_primitive.h> |
| #include <arrow/record_batch.h> |
| #include <arrow/status.h> |
| #include <arrow/type.h> |
| #include <arrow/util/decimal.h> |
| #include <arrow/visit_type_inline.h> |
| #include <arrow/visitor.h> |
| #include <cctz/time_zone.h> |
| #include <glog/logging.h> |
| |
| #include <ctime> |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/status.h" |
| #include "core/block/column_with_type_and_name.h" |
| #include "core/column/column.h" |
| #include "core/data_type/data_type.h" |
| #include "core/data_type/data_type_array.h" |
| #include "core/data_type/data_type_nullable.h" |
| #include "core/value/vdatetime_value.h" |
| #include "format/arrow/arrow_row_batch.h" |
| #include "format/arrow/arrow_utils.h" |
| |
| namespace arrow { |
| class Array; |
| } // namespace arrow |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| |
| Status FromBlockToRecordBatchConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) { |
| int num_fields = _schema->num_fields(); |
| if (_block.columns() != num_fields) { |
| return Status::InvalidArgument("number fields not match"); |
| } |
| |
| // Calculate actual row range to convert |
| size_t actual_start = _row_range_start; |
| size_t actual_rows = _row_range_end > 0 ? (_row_range_end - _row_range_start) |
| : (_block.rows() - _row_range_start); |
| |
| // Validate range |
| if (actual_start + actual_rows > _block.rows()) { |
| return Status::InvalidArgument( |
| "Row range out of bounds: start={}, num_rows={}, block_rows={}", actual_start, |
| actual_rows, _block.rows()); |
| } |
| |
| _arrays.resize(num_fields); |
| |
| for (int idx = 0; idx < num_fields; ++idx) { |
| _cur_field_idx = idx; |
| _cur_start = actual_start; |
| _cur_rows = actual_rows; |
| _cur_col = _block.get_by_position(idx).column; |
| _cur_type = _block.get_by_position(idx).type; |
| auto column = _cur_col->convert_to_full_column_if_const(); |
| auto arrow_type = _schema->field(idx)->type(); |
| if (arrow_type->name() == "utf8" && column->byte_size() >= MAX_ARROW_UTF8) { |
| arrow_type = arrow::large_utf8(); |
| } |
| std::unique_ptr<arrow::ArrayBuilder> builder; |
| auto arrow_st = arrow::MakeBuilder(_pool, arrow_type, &builder); |
| if (!arrow_st.ok()) { |
| return to_doris_status(arrow_st); |
| } |
| _cur_builder = builder.get(); |
| try { |
| RETURN_IF_ERROR(_cur_type->get_serde()->write_column_to_arrow( |
| *column, nullptr, _cur_builder, _cur_start, _cur_start + _cur_rows, |
| _timezone_obj)); |
| } catch (std::exception& e) { |
| return Status::InternalError( |
| "Fail to convert block data to arrow data, type: {}, name: {}, error: {}", |
| _cur_type->get_name(), _block.get_by_position(idx).name, e.what()); |
| } |
| arrow_st = _cur_builder->Finish(&_arrays[_cur_field_idx]); |
| if (!arrow_st.ok()) { |
| return to_doris_status(arrow_st); |
| } |
| } |
| *out = arrow::RecordBatch::Make(_schema, actual_rows, std::move(_arrays)); |
| return Status::OK(); |
| } |
| |
| Status FromRecordBatchToBlockConverter::convert(Block* block) { |
| DCHECK(block); |
| int num_fields = _batch->num_columns(); |
| if ((size_t)num_fields != _types.size()) { |
| return Status::InvalidArgument("number fields not match"); |
| } |
| |
| int64_t num_rows = _batch->num_rows(); |
| _columns.reserve(num_fields); |
| |
| for (int idx = 0; idx < num_fields; ++idx) { |
| auto doris_type = _types[idx]; |
| auto doris_column = doris_type->create_column(); |
| auto arrow_column = _batch->column(idx); |
| DCHECK_EQ(arrow_column->length(), num_rows); |
| RETURN_IF_ERROR(doris_type->get_serde()->read_column_from_arrow( |
| *doris_column, &*arrow_column, 0, num_rows, _timezone_obj)); |
| _columns.emplace_back(std::move(doris_column), std::move(doris_type), std::to_string(idx)); |
| } |
| |
| block->swap(_columns); |
| return Status::OK(); |
| } |
| |
| Status convert_to_arrow_batch(const Block& block, const std::shared_ptr<arrow::Schema>& schema, |
| arrow::MemoryPool* pool, std::shared_ptr<arrow::RecordBatch>* result, |
| const cctz::time_zone& timezone_obj) { |
| FromBlockToRecordBatchConverter converter(block, schema, pool, timezone_obj); |
| return converter.convert(result); |
| } |
| |
| Status convert_to_arrow_batch(const Block& block, const std::shared_ptr<arrow::Schema>& schema, |
| arrow::MemoryPool* pool, std::shared_ptr<arrow::RecordBatch>* result, |
| const cctz::time_zone& timezone_obj, size_t start_row, |
| size_t end_row) { |
| FromBlockToRecordBatchConverter converter(block, schema, pool, timezone_obj, start_row, |
| end_row); |
| return converter.convert(result); |
| } |
| |
| Status convert_from_arrow_batch(const std::shared_ptr<arrow::RecordBatch>& batch, |
| const DataTypes& types, Block* block, |
| const cctz::time_zone& timezone_obj) { |
| FromRecordBatchToBlockConverter converter(batch, types, timezone_obj); |
| return converter.convert(block); |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris |