// blob: 373d9c7c84c3f6c649c4a64618c3a90e5fa0bd87 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/runtime/vorc_transformer.h"
#include <glog/logging.h>
#include <stdlib.h>
#include <string.h>
#include <exception>
#include <ostream>
#include "common/cast_set.h"
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "orc/Int128.hh"
#include "orc/MemoryPool.hh"
#include "orc/OrcFile.hh"
#include "orc/Type.hh"
#include "orc/Vector.hh"
#include "runtime/define_primitive_type.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/binary_cast.hpp"
#include "util/debug_util.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_complex.h"
#include "vec/columns/column_decimal.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_struct.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
#include "vec/common/pod_array.h"
#include "vec/common/string_ref.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_struct.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/runtime/vdatetime_value.h"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
VOrcOutputStream::VOrcOutputStream(doris::io::FileWriter* file_writer)
: _file_writer(file_writer), _cur_pos(0), _written_len(0), _name("VOrcOutputStream") {}
/// Closes the stream if nobody closed it explicitly. Never lets an exception
/// escape.
VOrcOutputStream::~VOrcOutputStream() {
    if (_is_closed) {
        return;
    }
    try {
        close();
    } catch (...) {
        // Normally close() is called explicitly before the destructor runs.
        // When the task is cancelled that explicit call is skipped and the
        // destructor is reached directly, so close() is attempted here.
        // Letting its exception escape from a destructor would crash the BE,
        // and close() has already logged the failure before throwing, so the
        // error is intentionally dropped.
    }
}
void VOrcOutputStream::close() {
if (!_is_closed) {
Defer defer {[this] { _is_closed = true; }};
Status st = _file_writer->close();
if (!st.ok()) {
LOG(WARNING) << "close orc output stream failed: " << st;
throw std::runtime_error(st.to_string());
}
}
}
void VOrcOutputStream::write(const void* data, size_t length) {
if (!_is_closed) {
Status st = _file_writer->append({static_cast<const uint8_t*>(data), length});
if (!st.ok()) {
LOG(WARNING) << "Write to ORC file failed: " << st;
// When a write error occurs,
// the error needs to be thrown to the upper layer.
// so that fe can get the exception.
throw std::runtime_error(st.to_string());
}
_cur_pos += length;
_written_len += length;
}
}
void VOrcOutputStream::set_written_len(int64_t written_len) {
_written_len = written_len;
}
/// Stores the writer, column names, schema string and optional iceberg schema,
/// then prepares the ORC writer options.
///
/// @param state             runtime state; its timezone is applied to the ORC writer options
/// @param file_writer       destination file writer
/// @param output_vexpr_ctxs output expressions, forwarded to the base transformer
/// @param schema            ORC schema string; may be empty (see open())
/// @param column_names      names used for the top-level ORC fields when `schema` is empty
/// @param output_object_data forwarded to the base transformer
/// @param compress_type     requested compression, mapped by set_compression_type()
/// @param iceberg_schema    optional iceberg schema; may be nullptr
VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
                                 const VExprContextSPtrs& output_vexpr_ctxs, std::string schema,
                                 std::vector<std::string> column_names, bool output_object_data,
                                 TFileCompressType::type compress_type,
                                 const iceberg::Schema* iceberg_schema)
        : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
          _file_writer(file_writer),
          _column_names(std::move(column_names)),
          _write_options(new orc::WriterOptions()),
          _schema_str(std::move(schema)),
          _iceberg_schema(iceberg_schema) {
    orc::WriterOptions& writer_options = *_write_options;
    writer_options.setTimezoneName(_state->timezone());
    writer_options.setUseTightNumericVector(true);
    set_compression_type(compress_type);
}
/// Builds the ORC schema and creates the ORC writer.
///
/// When a schema string was supplied it is parsed directly. Otherwise a struct
/// schema is assembled with one field per output expression, named after the
/// matching entry of `_column_names`; when an iceberg schema is present, each
/// field is built against the iceberg root field at the same position.
///
/// @return Status::OK() on success; an InternalError (or the status carried by
///         a doris::Exception) when the schema cannot be built, when fewer
///         column names / iceberg fields than output expressions are
///         available, or when the writer cannot be created.
Status VOrcTransformer::open() {
    if (!_schema_str.empty()) {
        try {
            _schema = orc::Type::buildTypeFromString(_schema_str);
        } catch (const std::exception& e) {
            return Status::InternalError("Orc build schema from \"{}\" failed: {}", _schema_str,
                                         e.what());
        }
    } else {
        const size_t column_count = _output_vexpr_ctxs.size();
        // Every output expression is paired with a column name by index below,
        // so reject a short name list instead of reading out of bounds.
        if (_column_names.size() < column_count) {
            return Status::InternalError(
                    "Orc schema needs {} column names but only {} were provided", column_count,
                    _column_names.size());
        }
        const std::vector<iceberg::NestedField>* nested_fields = nullptr;
        if (_iceberg_schema != nullptr) {
            const iceberg::StructType& iceberg_root_struct_type = _iceberg_schema->root_struct();
            nested_fields = &iceberg_root_struct_type.fields();
            // Iceberg root fields are matched to output expressions by position.
            if (nested_fields->size() < column_count) {
                return Status::InternalError(
                        "Orc schema needs {} iceberg fields but only {} were provided",
                        column_count, nested_fields->size());
            }
        }
        _schema = orc::createStructType();
        for (size_t i = 0; i < column_count; ++i) {
            VExprSPtr column_expr = _output_vexpr_ctxs[i]->root();
            try {
                std::unique_ptr<orc::Type> orc_type = _build_orc_type(
                        column_expr->data_type(), nested_fields ? &(*nested_fields)[i] : nullptr);
                _schema->addStructField(_column_names[i], std::move(orc_type));
            } catch (doris::Exception& e) {
                return e.to_status();
            } catch (const std::exception& e) {
                // Failures raised by the ORC library are reported as a Status,
                // matching the schema-string branch above.
                return Status::InternalError("Orc build schema for column \"{}\" failed: {}",
                                             _column_names[i], e.what());
            }
        }
    }
    _output_stream = std::make_unique<VOrcOutputStream>(_file_writer);
    try {
        _write_options->setMemoryPool(ExecEnv::GetInstance()->orc_memory_pool());
        _writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options);
        _writer->addUserMetadata("CreatedBy", doris::get_short_version());
    } catch (const std::exception& e) {
        return Status::InternalError("failed to create writer: {}", e.what());
    }
    return Status::OK();
}
/// Translates the requested file compression into an ORC compression kind and
/// stores it in the writer options.
///
/// PLAIN maps to NONE, SNAPPYBLOCK to SNAPPY and ZSTD to ZSTD. ZLIB and every
/// other value map to ZLIB.
void VOrcTransformer::set_compression_type(const TFileCompressType::type& compress_type) {
    // ZLIB is both the explicit ZLIB mapping and the fallback.
    orc::CompressionKind kind = orc::CompressionKind::CompressionKind_ZLIB;
    switch (compress_type) {
    case TFileCompressType::PLAIN:
        kind = orc::CompressionKind::CompressionKind_NONE;
        break;
    case TFileCompressType::SNAPPYBLOCK:
        kind = orc::CompressionKind::CompressionKind_SNAPPY;
        break;
    case TFileCompressType::ZSTD:
        kind = orc::CompressionKind::CompressionKind_ZSTD;
        break;
    default:
        break;
    }
    _write_options->setCompression(kind);
}
/// Recursively converts a Doris data type into the corresponding ORC type.
///
/// @param data_type    Doris type to convert; nullable wrappers are stripped
///                     where the concrete type is inspected
/// @param nested_field matching iceberg field, or nullptr; when set, its id and
///                     required flag are attached to the ORC type as attributes
///                     and its children are passed down to nested types
/// @return the newly built ORC type
/// @throws doris::Exception (INTERNAL_ERROR) for a primitive type with no
///         mapping below
std::unique_ptr<orc::Type> VOrcTransformer::_build_orc_type(
        const DataTypePtr& data_type, const iceberg::NestedField* nested_field) {
    std::unique_ptr<orc::Type> orc_type;
    const auto primitive_type = data_type->get_primitive_type();
    switch (primitive_type) {
    case TYPE_BOOLEAN:
        orc_type = orc::createPrimitiveType(orc::BOOLEAN);
        break;
    case TYPE_TINYINT:
        orc_type = orc::createPrimitiveType(orc::BYTE);
        break;
    case TYPE_SMALLINT:
        orc_type = orc::createPrimitiveType(orc::SHORT);
        break;
    case TYPE_IPV4:
    case TYPE_INT:
        orc_type = orc::createPrimitiveType(orc::INT);
        break;
    case TYPE_BIGINT:
        orc_type = orc::createPrimitiveType(orc::LONG);
        orc_type->setAttribute(ICEBERG_LONG_TYPE, "LONG");
        break;
    case TYPE_FLOAT:
        orc_type = orc::createPrimitiveType(orc::FLOAT);
        break;
    case TYPE_DOUBLE:
        orc_type = orc::createPrimitiveType(orc::DOUBLE);
        break;
    case TYPE_CHAR:
    case TYPE_VARCHAR: {
        // CHAR and VARCHAR differ only in the ORC kind; both carry the
        // declared length of the Doris string type.
        const orc::TypeKind char_kind = (primitive_type == TYPE_CHAR) ? orc::CHAR : orc::VARCHAR;
        orc_type = orc::createCharType(
                char_kind,
                assert_cast<const DataTypeString*>(remove_nullable(data_type).get())->len());
        break;
    }
    case TYPE_STRING:
    case TYPE_IPV6:
    case TYPE_BINARY:
        orc_type = orc::createPrimitiveType(orc::STRING);
        break;
    case TYPE_DATEV2:
        orc_type = orc::createPrimitiveType(orc::DATE);
        break;
    case TYPE_DATETIMEV2:
        orc_type = orc::createPrimitiveType(orc::TIMESTAMP);
        break;
    case TYPE_DECIMAL32:
    case TYPE_DECIMAL64:
    case TYPE_DECIMAL128I:
        // All decimal widths map to one ORC decimal with the same
        // precision and scale.
        orc_type = orc::createDecimalType(data_type->get_precision(), data_type->get_scale());
        break;
    case TYPE_VARBINARY:
        orc_type = orc::createPrimitiveType(orc::BINARY);
        break;
    case TYPE_STRUCT: {
        orc_type = orc::createStructType();
        const auto* struct_type =
                assert_cast<const DataTypeStruct*>(remove_nullable(data_type).get());
        const size_t element_count = struct_type->get_elements().size();
        for (size_t pos = 0; pos < element_count; ++pos) {
            // Iceberg child fields are matched to struct elements by position.
            const iceberg::NestedField* child_field = nullptr;
            if (nested_field != nullptr) {
                child_field = &nested_field->field_type()->as_struct_type()->fields()[pos];
            }
            orc_type->addStructField(struct_type->get_element_name(pos),
                                     _build_orc_type(struct_type->get_element(pos), child_field));
        }
        break;
    }
    case TYPE_ARRAY: {
        const auto* array_type =
                assert_cast<const DataTypeArray*>(remove_nullable(data_type).get());
        const iceberg::NestedField* element_field = nullptr;
        if (nested_field != nullptr) {
            element_field = &nested_field->field_type()->as_list_type()->element_field();
        }
        orc_type = orc::createListType(
                _build_orc_type(array_type->get_nested_type(), element_field));
        break;
    }
    case TYPE_MAP: {
        const auto* map_type = assert_cast<const DataTypeMap*>(remove_nullable(data_type).get());
        const iceberg::NestedField* key_field = nullptr;
        const iceberg::NestedField* value_field = nullptr;
        if (nested_field != nullptr) {
            key_field = &nested_field->field_type()->as_map_type()->key_field();
            value_field = &nested_field->field_type()->as_map_type()->value_field();
        }
        orc_type = orc::createMapType(_build_orc_type(map_type->get_key_type(), key_field),
                                      _build_orc_type(map_type->get_value_type(), value_field));
        break;
    }
    default:
        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
                               "Unsupported type {} to build orc type", data_type->get_name());
    }
    if (nested_field != nullptr) {
        orc_type->setAttribute(ORC_ICEBERG_ID_KEY, std::to_string(nested_field->field_id()));
        // NOTE(review): if is_required() returns bool, std::to_string writes
        // "1"/"0" here rather than "true"/"false" - confirm this is what
        // readers of the attribute expect.
        orc_type->setAttribute(ORC_ICEBERG_REQUIRED_KEY,
                               std::to_string(nested_field->is_required()));
    }
    return orc_type;
}
/// Asks the ORC writer for a new row batch with capacity `sz`.
std::unique_ptr<orc::ColumnVectorBatch> VOrcTransformer::_create_row_batch(size_t sz) {
    std::unique_ptr<orc::ColumnVectorBatch> batch = _writer->createRowBatch(sz);
    return batch;
}
/// Returns the length reported by the output stream, or 0 when no output
/// stream exists.
int64_t VOrcTransformer::written_len() {
    // written_len() is called from VFileResultWriter::_close_file_writer, and
    // _output_stream can still be null at that point if building _schema in
    // open() failed.
    if (!_output_stream) {
        return 0;
    }
    return _output_stream->getLength();
}
/// Closes the ORC writer first and then the output stream, skipping whichever
/// of the two was never created.
///
/// @return Status::OK() on success, or an IOError carrying the exception
///         message when either close throws.
Status VOrcTransformer::close() {
    try {
        if (_writer) {
            _writer->close();
        }
        if (_output_stream != nullptr) {
            _output_stream->close();
        }
    } catch (const std::exception& close_error) {
        return Status::IOError(close_error.what());
    }
    return Status::OK();
}
/// Serializes every column of `block` into a fresh ORC row batch and hands the
/// batch to the ORC writer. An empty block is a no-op.
///
/// @param block rows to write; column `i` is written into top-level ORC field `i`
/// @return Status::OK() on success; an InternalError when the writer has not
///         been created, when the row batch does not match the block, when a
///         column fails to serialize, or when the ORC library throws.
Status VOrcTransformer::write(const Block& block) {
    if (block.rows() == 0) {
        return Status::OK();
    }
    // The writer only exists after a successful open().
    if (_writer == nullptr) {
        return Status::InternalError("Orc writer is not created, open() must succeed first");
    }
    // Buffer used by date/datetime/datev2/datetimev2/largeint type
    Arena arena;
    int sz = cast_set<int>(block.rows());
    try {
        // Batch creation is inside the try so that an exception raised while
        // allocating it is reported as a Status like any other write failure.
        auto row_batch = _create_row_batch(sz);
        auto* root = dynamic_cast<orc::StructVectorBatch*>(row_batch.get());
        if (root == nullptr) {
            return Status::InternalError("Orc root row batch is not a struct batch");
        }
        // Columns are matched to ORC fields by position; refuse to index past
        // the fields the schema provides.
        if (block.columns() > root->fields.size()) {
            return Status::InternalError("Block has {} columns but orc schema has only {} fields",
                                         block.columns(), root->fields.size());
        }
        DataTypeSerDe::FormatOptions options;
        options.timezone = &_state->timezone_obj();
        for (size_t i = 0; i < block.columns(); i++) {
            const auto& col = block.get_by_position(i);
            const auto& raw_column = col.column;
            // Nested child batches must be sized before the serde fills them.
            RETURN_IF_ERROR(_resize_row_batch(col.type, *raw_column, root->fields[i]));
            RETURN_IF_ERROR(_serdes[i]->write_column_to_orc(_state->timezone(), *raw_column,
                                                            nullptr, root->fields[i], 0, sz, arena,
                                                            options));
        }
        root->numElements = sz;
        _writer->add(*row_batch);
        _cur_written_rows += sz;
    } catch (const std::exception& e) {
        LOG(WARNING) << "Orc write error: " << e.what();
        return Status::InternalError(e.what());
    }
    return Status::OK();
}
/// Recursively resizes the child batches of a nested ORC batch (struct, map,
/// array) to the sizes of the corresponding nested Doris columns. Batches of
/// any other type are left untouched.
///
/// @param type          Doris type of `column`; may be nullable
/// @param column        Doris column; a nullable wrapper is unwrapped before
///                      the nested column is inspected
/// @param orc_col_batch ORC batch that will receive the column's data
/// @return Status::OK() on success; an InternalError when `orc_col_batch` is
///         not the batch kind that `type` requires.
Status VOrcTransformer::_resize_row_batch(const DataTypePtr& type, const IColumn& column,
                                          orc::ColumnVectorBatch* orc_col_batch) {
    auto real_type = remove_nullable(type);
    switch (type->get_primitive_type()) {
    case TYPE_STRUCT: {
        auto* struct_batch = dynamic_cast<orc::StructVectorBatch*>(orc_col_batch);
        if (struct_batch == nullptr) {
            return Status::InternalError("Orc batch for struct column is not a struct batch");
        }
        const auto& struct_col =
                column.is_nullable()
                        ? assert_cast<const ColumnStruct&>(
                                  assert_cast<const ColumnNullable&>(column).get_nested_column())
                        : assert_cast<const ColumnStruct&>(column);
        const auto* struct_type = assert_cast<const vectorized::DataTypeStruct*>(real_type.get());
        // Child batches are matched to struct elements by position.
        size_t idx = 0;
        for (auto* child : struct_batch->fields) {
            const IColumn& child_column = struct_col.get_column(idx);
            child->resize(child_column.size());
            auto child_type = struct_type->get_element(idx);
            ++idx;
            RETURN_IF_ERROR(_resize_row_batch(child_type, child_column, child));
        }
        break;
    }
    case TYPE_MAP: {
        auto* map_batch = dynamic_cast<orc::MapVectorBatch*>(orc_col_batch);
        if (map_batch == nullptr) {
            return Status::InternalError("Orc batch for map column is not a map batch");
        }
        const auto& map_column =
                column.is_nullable()
                        ? assert_cast<const ColumnMap&>(
                                  assert_cast<const ColumnNullable&>(column).get_nested_column())
                        : assert_cast<const ColumnMap&>(column);
        const auto* map_type = assert_cast<const vectorized::DataTypeMap*>(real_type.get());
        // key of map
        const IColumn& nested_keys_column = map_column.get_keys();
        map_batch->keys->resize(nested_keys_column.size());
        auto key_type = map_type->get_key_type();
        RETURN_IF_ERROR(_resize_row_batch(key_type, nested_keys_column, map_batch->keys.get()));
        // value of map
        const IColumn& nested_values_column = map_column.get_values();
        map_batch->elements->resize(nested_values_column.size());
        auto value_type = map_type->get_value_type();
        RETURN_IF_ERROR(
                _resize_row_batch(value_type, nested_values_column, map_batch->elements.get()));
        break;
    }
    case TYPE_ARRAY: {
        auto* list_batch = dynamic_cast<orc::ListVectorBatch*>(orc_col_batch);
        if (list_batch == nullptr) {
            return Status::InternalError("Orc batch for array column is not a list batch");
        }
        const auto& array_col =
                column.is_nullable()
                        ? assert_cast<const ColumnArray&>(
                                  assert_cast<const ColumnNullable&>(column).get_nested_column())
                        : assert_cast<const ColumnArray&>(column);
        const IColumn& nested_column = array_col.get_data();
        list_batch->elements->resize(nested_column.size());
        auto child_type =
                assert_cast<const vectorized::DataTypeArray*>(real_type.get())->get_nested_type();
        RETURN_IF_ERROR(_resize_row_batch(child_type, nested_column, list_batch->elements.get()));
        // Explicit break: this case previously fell through into `default`.
        break;
    }
    default:
        // Non-nested types have no child batches to resize.
        break;
    }
    return Status::OK();
}
} // namespace doris::vectorized