blob: b13392360b25e54360644ab1ec03a2c0d68842dc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/arrow/block_convertor.h"
#include <arrow/array/builder_base.h>
#include <arrow/array/builder_binary.h>
#include <arrow/array/builder_decimal.h>
#include <arrow/array/builder_nested.h>
#include <arrow/array/builder_primitive.h>
#include <arrow/record_batch.h>
#include <arrow/status.h>
#include <arrow/type.h>
#include <arrow/util/decimal.h>
#include <arrow/visit_type_inline.h>
#include <arrow/visitor.h>
#include <cctz/time_zone.h>
#include <glog/logging.h>
#include <ctime>
#include <memory>
#include <utility>
#include <vector>
#include "common/status.h"
#include "util/arrow/row_batch.h"
#include "util/arrow/utils.h"
#include "vec/columns/column.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/runtime/vdatetime_value.h"
namespace arrow {
class Array;
} // namespace arrow
namespace doris {
// Converts a vectorized::Block into an arrow::RecordBatch, one column at a time.
//
// The converter only borrows its inputs: `block`, `schema` and `timezone_obj` are held by
// reference and must outlive the converter (convert_to_arrow_batch() uses it as a short-lived
// local). `pool` is handed to arrow::MakeBuilder() for builder allocations.
class FromBlockConverter {
public:
    FromBlockConverter(const vectorized::Block& block, const std::shared_ptr<arrow::Schema>& schema,
                       arrow::MemoryPool* pool, const cctz::time_zone& timezone_obj)
            : _block(block), _schema(schema), _pool(pool), _timezone_obj(timezone_obj) {}

    ~FromBlockConverter() = default;

    // Converts every column of `_block` into an arrow array and assembles them into `*out`.
    Status convert(std::shared_ptr<arrow::RecordBatch>* out);

private:
    const vectorized::Block& _block;
    const std::shared_ptr<arrow::Schema>& _schema;
    arrow::MemoryPool* _pool;

    // Per-column conversion state, reassigned by convert() for each field.
    // _cur_field_idx starts at an explicit "no field yet" sentinel (SIZE_MAX); this is the same
    // value the former `-1` initializer produced, without an implicit signed->unsigned conversion.
    size_t _cur_field_idx = static_cast<size_t>(-1);
    size_t _cur_start = 0;
    size_t _cur_rows = 0;
    vectorized::ColumnPtr _cur_col;
    vectorized::DataTypePtr _cur_type;
    // Non-owning: points at the builder owned by a local std::unique_ptr inside convert().
    arrow::ArrayBuilder* _cur_builder = nullptr;

    const cctz::time_zone& _timezone_obj;
    // One finished array per schema field; moved into the RecordBatch at the end of convert().
    std::vector<std::shared_ptr<arrow::Array>> _arrays;
};
// Converts each column of `_block` into an arrow array and assembles the arrays into a
// RecordBatch stored in `*out`. The builder type for each column comes from the matching field
// of `_schema`.
//
// Returns:
//   - InvalidArgument if the block's column count differs from the schema's field count;
//   - the translated arrow status if a builder cannot be created or finished;
//   - any error status returned by the column serde's write_column_to_arrow();
//   - InternalError if the column serde throws while writing into the builder.
// `*out` is only assigned after every column converted successfully.
Status FromBlockConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) {
    const size_t num_fields = _schema->num_fields();
    if (_block.columns() != num_fields) {
        return Status::InvalidArgument("number fields not match");
    }

    const size_t num_rows = _block.rows();
    _arrays.resize(num_fields);

    for (size_t idx = 0; idx < num_fields; ++idx) {
        const auto& column_with_type = _block.get_by_position(idx);
        _cur_field_idx = idx;
        _cur_start = 0;
        _cur_rows = num_rows;
        _cur_col = column_with_type.column;
        _cur_type = column_with_type.type;

        // Materialize a const column into a full column before serializing it.
        auto column = _cur_col->convert_to_full_column_if_const();

        auto arrow_type = _schema->field(idx)->type();
        // Large string columns are built as large_utf8 instead of utf8 (StringType).
        // Presumably MAX_ARROW_UTF8 reflects utf8's 32-bit offset limit -- TODO confirm.
        // NOTE(review): the RecordBatch below is still built with `_schema`, so for such a
        // column the batch schema reports utf8 while the array is large_utf8; downstream
        // consumers must tolerate this mismatch.
        if (arrow_type->id() == arrow::Type::STRING && column->byte_size() >= MAX_ARROW_UTF8) {
            arrow_type = arrow::large_utf8();
        }

        std::unique_ptr<arrow::ArrayBuilder> builder;
        auto arrow_st = arrow::MakeBuilder(_pool, arrow_type, &builder);
        if (!arrow_st.ok()) {
            return to_doris_status(arrow_st);
        }
        _cur_builder = builder.get();

        try {
            RETURN_IF_ERROR(_cur_type->get_serde()->write_column_to_arrow(
                    *column, nullptr, _cur_builder, _cur_start, _cur_start + _cur_rows,
                    _timezone_obj));
        } catch (const std::exception& e) {
            return Status::InternalError(
                    "Fail to convert block data to arrow data, type: {}, name: {}, error: {}",
                    _cur_type->get_name(), column_with_type.name, e.what());
        }

        arrow_st = _cur_builder->Finish(&_arrays[_cur_field_idx]);
        if (!arrow_st.ok()) {
            return to_doris_status(arrow_st);
        }
    }

    *out = arrow::RecordBatch::Make(_schema, num_rows, std::move(_arrays));
    return Status::OK();
}
// Converts `block` into an arrow RecordBatch laid out according to `schema`.
// Builder memory is allocated from `pool`, and `timezone_obj` is forwarded to each column's
// serde when its values are written to arrow. `*result` is only assigned if every column
// converts successfully; otherwise the failing Status is returned.
Status convert_to_arrow_batch(const vectorized::Block& block,
                              const std::shared_ptr<arrow::Schema>& schema, arrow::MemoryPool* pool,
                              std::shared_ptr<arrow::RecordBatch>* result,
                              const cctz::time_zone& timezone_obj) {
    // The converter merely borrows its arguments, so a temporary that lives for this single
    // full-expression is sufficient.
    return FromBlockConverter(block, schema, pool, timezone_obj).convert(result);
}
} // namespace doris