| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "vec/runtime/vparquet_writer.h" |
| |
#include <arrow/io/type_fwd.h>
#include <glog/logging.h>
#include <math.h>
#include <parquet/column_writer.h>
#include <parquet/platform.h>
#include <parquet/schema.h>
#include <parquet/type_fwd.h>
#include <parquet/types.h>
#include <time.h>

#include <algorithm>
#include <cstdint>
#include <exception>
#include <memory>
#include <ostream>
#include <string>
#include <vector>

#include "common/status.h"
#include "gutil/endian.h"
#include "io/fs/file_writer.h"
#include "olap/olap_common.h"
#include "runtime/decimalv2_value.h"
#include "runtime/define_primitive_type.h"
#include "runtime/types.h"
#include "util/binary_cast.hpp"
#include "util/mysql_global.h"
#include "util/types.h"
#include "vec/columns/column.h"
#include "vec/columns/column_complex.h"
#include "vec/columns/column_decimal.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/function_helpers.h"
#include "vec/runtime/vdatetime_value.h"
| |
| namespace doris::vectorized { |
| |
// Reference day for DATE columns: write() stores dates as the number of days after it.
static const std::string epoch_date_str {"1970-01-01"};
// Unix-second cutoff below which write() applies `timestamp_diff` to DATETIME values.
static constexpr int64_t timestamp_threshold = -2177481943;
// Seconds (5 min 43 s) subtracted from DATETIME values older than `timestamp_threshold`.
static constexpr int64_t timestamp_diff = 343;
| |
| ParquetOutputStream::ParquetOutputStream(doris::io::FileWriter* file_writer) |
| : _file_writer(file_writer), _cur_pos(0), _written_len(0) { |
| set_mode(arrow::io::FileMode::WRITE); |
| } |
| |
| ParquetOutputStream::~ParquetOutputStream() { |
| arrow::Status st = Close(); |
| if (!st.ok()) { |
| LOG(WARNING) << "close parquet file error: " << st.ToString(); |
| } |
| } |
| |
| arrow::Status ParquetOutputStream::Write(const void* data, int64_t nbytes) { |
| if (_is_closed) { |
| return arrow::Status::OK(); |
| } |
| size_t written_len = nbytes; |
| Status st = _file_writer->append({static_cast<const uint8_t*>(data), written_len}); |
| if (!st.ok()) { |
| return arrow::Status::IOError(st.to_string()); |
| } |
| _cur_pos += written_len; |
| _written_len += written_len; |
| return arrow::Status::OK(); |
| } |
| |
| arrow::Result<int64_t> ParquetOutputStream::Tell() const { |
| return _cur_pos; |
| } |
| |
| arrow::Status ParquetOutputStream::Close() { |
| if (!_is_closed) { |
| Defer defer {[this] { _is_closed = true; }}; |
| Status st = _file_writer->close(); |
| if (!st.ok()) { |
| LOG(WARNING) << "close parquet output stream failed: " << st; |
| return arrow::Status::IOError(st.to_string()); |
| } |
| } |
| return arrow::Status::OK(); |
| } |
| |
| int64_t ParquetOutputStream::get_written_len() const { |
| return _written_len; |
| } |
| |
| void ParquetOutputStream::set_written_len(int64_t written_len) { |
| _written_len = written_len; |
| } |
| |
| void ParquetBuildHelper::build_schema_repetition_type( |
| parquet::Repetition::type& parquet_repetition_type, |
| const TParquetRepetitionType::type& column_repetition_type) { |
| switch (column_repetition_type) { |
| case TParquetRepetitionType::REQUIRED: { |
| parquet_repetition_type = parquet::Repetition::REQUIRED; |
| break; |
| } |
| case TParquetRepetitionType::REPEATED: { |
| parquet_repetition_type = parquet::Repetition::REPEATED; |
| break; |
| } |
| case TParquetRepetitionType::OPTIONAL: { |
| parquet_repetition_type = parquet::Repetition::OPTIONAL; |
| break; |
| } |
| default: |
| parquet_repetition_type = parquet::Repetition::UNDEFINED; |
| } |
| } |
| |
| void ParquetBuildHelper::build_schema_data_type(parquet::Type::type& parquet_data_type, |
| const TParquetDataType::type& column_data_type) { |
| switch (column_data_type) { |
| case TParquetDataType::BOOLEAN: { |
| parquet_data_type = parquet::Type::BOOLEAN; |
| break; |
| } |
| case TParquetDataType::INT32: { |
| parquet_data_type = parquet::Type::INT32; |
| break; |
| } |
| case TParquetDataType::INT64: { |
| parquet_data_type = parquet::Type::INT64; |
| break; |
| } |
| case TParquetDataType::INT96: { |
| parquet_data_type = parquet::Type::INT96; |
| break; |
| } |
| case TParquetDataType::BYTE_ARRAY: { |
| parquet_data_type = parquet::Type::BYTE_ARRAY; |
| break; |
| } |
| case TParquetDataType::FLOAT: { |
| parquet_data_type = parquet::Type::FLOAT; |
| break; |
| } |
| case TParquetDataType::DOUBLE: { |
| parquet_data_type = parquet::Type::DOUBLE; |
| break; |
| } |
| case TParquetDataType::FIXED_LEN_BYTE_ARRAY: { |
| parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY; |
| break; |
| } |
| default: |
| parquet_data_type = parquet::Type::UNDEFINED; |
| } |
| } |
| |
| void ParquetBuildHelper::build_schema_data_logical_type( |
| std::shared_ptr<const parquet::LogicalType>& parquet_data_logical_type_ptr, |
| const TParquetDataLogicalType::type& column_data_logical_type, int* primitive_length, |
| const TypeDescriptor& type_desc) { |
| switch (column_data_logical_type) { |
| case TParquetDataLogicalType::DECIMAL: { |
| DCHECK(type_desc.precision != -1 && type_desc.scale != -1) |
| << "precision and scale: " << type_desc.precision << " " << type_desc.scale; |
| if (type_desc.type == TYPE_DECIMAL32) { |
| *primitive_length = 4; |
| } else if (type_desc.type == TYPE_DECIMAL64) { |
| *primitive_length = 8; |
| } else if (type_desc.type == TYPE_DECIMAL128I) { |
| *primitive_length = 16; |
| } else { |
| throw parquet::ParquetException( |
| "the logical decimal now only support in decimalv3, maybe error of " + |
| type_desc.debug_string()); |
| } |
| parquet_data_logical_type_ptr = |
| parquet::LogicalType::Decimal(type_desc.precision, type_desc.scale); |
| break; |
| } |
| case TParquetDataLogicalType::STRING: { |
| parquet_data_logical_type_ptr = parquet::LogicalType::String(); |
| break; |
| } |
| case TParquetDataLogicalType::DATE: { |
| parquet_data_logical_type_ptr = parquet::LogicalType::Date(); |
| break; |
| } |
| case TParquetDataLogicalType::TIMESTAMP: { |
| parquet_data_logical_type_ptr = |
| parquet::LogicalType::Timestamp(true, parquet::LogicalType::TimeUnit::MILLIS, true); |
| break; |
| } |
| default: { |
| parquet_data_logical_type_ptr = parquet::LogicalType::None(); |
| } |
| } |
| } |
| |
| void ParquetBuildHelper::build_compression_type( |
| parquet::WriterProperties::Builder& builder, |
| const TParquetCompressionType::type& compression_type) { |
| switch (compression_type) { |
| case TParquetCompressionType::SNAPPY: { |
| builder.compression(parquet::Compression::SNAPPY); |
| break; |
| } |
| case TParquetCompressionType::GZIP: { |
| builder.compression(parquet::Compression::GZIP); |
| break; |
| } |
| case TParquetCompressionType::BROTLI: { |
| builder.compression(parquet::Compression::BROTLI); |
| break; |
| } |
| case TParquetCompressionType::ZSTD: { |
| builder.compression(parquet::Compression::ZSTD); |
| break; |
| } |
| case TParquetCompressionType::LZ4: { |
| builder.compression(parquet::Compression::LZ4); |
| break; |
| } |
| case TParquetCompressionType::LZO: { |
| builder.compression(parquet::Compression::LZO); |
| break; |
| } |
| case TParquetCompressionType::BZ2: { |
| builder.compression(parquet::Compression::BZ2); |
| break; |
| } |
| case TParquetCompressionType::UNCOMPRESSED: { |
| builder.compression(parquet::Compression::UNCOMPRESSED); |
| break; |
| } |
| default: |
| builder.compression(parquet::Compression::UNCOMPRESSED); |
| } |
| } |
| |
| void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& builder, |
| const TParquetVersion::type& parquet_version) { |
| switch (parquet_version) { |
| case TParquetVersion::PARQUET_1_0: { |
| builder.version(parquet::ParquetVersion::PARQUET_1_0); |
| break; |
| } |
| case TParquetVersion::PARQUET_2_LATEST: { |
| builder.version(parquet::ParquetVersion::PARQUET_2_LATEST); |
| break; |
| } |
| default: |
| builder.version(parquet::ParquetVersion::PARQUET_1_0); |
| } |
| } |
| |
| VParquetWriterWrapper::VParquetWriterWrapper(doris::io::FileWriter* file_writer, |
| const VExprContextSPtrs& output_vexpr_ctxs, |
| const std::vector<TParquetSchema>& parquet_schemas, |
| const TParquetCompressionType::type& compression_type, |
| const bool& parquet_disable_dictionary, |
| const TParquetVersion::type& parquet_version, |
| bool output_object_data) |
| : VFileWriterWrapper(output_vexpr_ctxs, output_object_data), |
| _rg_writer(nullptr), |
| _parquet_schemas(parquet_schemas), |
| _compression_type(compression_type), |
| _parquet_disable_dictionary(parquet_disable_dictionary), |
| _parquet_version(parquet_version) { |
| _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer)); |
| } |
| |
| Status VParquetWriterWrapper::parse_properties() { |
| try { |
| parquet::WriterProperties::Builder builder; |
| ParquetBuildHelper::build_compression_type(builder, _compression_type); |
| ParquetBuildHelper::build_version(builder, _parquet_version); |
| if (_parquet_disable_dictionary) { |
| builder.disable_dictionary(); |
| } else { |
| builder.enable_dictionary(); |
| } |
| _properties = builder.build(); |
| } catch (const parquet::ParquetException& e) { |
| return Status::InternalError("parquet writer parse properties error: {}", e.what()); |
| } |
| return Status::OK(); |
| } |
| |
| Status VParquetWriterWrapper::parse_schema() { |
| parquet::schema::NodeVector fields; |
| parquet::Repetition::type parquet_repetition_type; |
| parquet::Type::type parquet_physical_type; |
| std::shared_ptr<const parquet::LogicalType> parquet_data_logical_type; |
| int primitive_length = -1; |
| for (int idx = 0; idx < _parquet_schemas.size(); ++idx) { |
| primitive_length = -1; |
| ParquetBuildHelper::build_schema_repetition_type( |
| parquet_repetition_type, _parquet_schemas[idx].schema_repetition_type); |
| ParquetBuildHelper::build_schema_data_type(parquet_physical_type, |
| _parquet_schemas[idx].schema_data_type); |
| ParquetBuildHelper::build_schema_data_logical_type( |
| parquet_data_logical_type, _parquet_schemas[idx].schema_data_logical_type, |
| &primitive_length, _output_vexpr_ctxs[idx]->root()->type()); |
| try { |
| fields.push_back(parquet::schema::PrimitiveNode::Make( |
| _parquet_schemas[idx].schema_column_name, parquet_repetition_type, |
| parquet_data_logical_type, parquet_physical_type, primitive_length)); |
| } catch (const parquet::ParquetException& e) { |
| LOG(WARNING) << "parquet writer parse schema error: " << e.what(); |
| return Status::InternalError("parquet writer parse schema error: {}", e.what()); |
| } |
| _schema = std::static_pointer_cast<parquet::schema::GroupNode>( |
| parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields)); |
| } |
| return Status::OK(); |
| } |
| |
// Returns InvalidArgument naming the runtime column type. Meant to expand inside
// VParquetWriterWrapper::write(), where `raw_column` is in scope.
#define RETURN_WRONG_TYPE \
    return Status::InvalidArgument("Invalid column type: {}", raw_column->get_name());

// Writes column `i` of a fixed-width numeric type through parquet::WRITER.
// Expects `col`, `null_map`, `nullable`, `sz`, `def_level` and `single_def_level` in
// scope, as set up in write().
// - With a null map, rows are written one at a time; NULL rows get definition level 0.
// - Otherwise the whole column is written in a single WriteBatch.
// A column whose runtime type is not COLUMN_TYPE is rejected on both paths; previously
// the null-map path skipped the column silently.
#define DISPATCH_PARQUET_NUMERIC_WRITER(WRITER, COLUMN_TYPE, NATIVE_TYPE)                         \
    parquet::RowGroupWriter* rgWriter = get_rg_writer();                                         \
    parquet::WRITER* col_writer = static_cast<parquet::WRITER*>(rgWriter->column(i));            \
    if (null_map != nullptr) {                                                                   \
        auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();                 \
        if (const auto* column = check_and_get_column<const COLUMN_TYPE>(col)) {                 \
            for (size_t row_id = 0; row_id < sz; row_id++) {                                     \
                if (null_data[row_id] != 0) {                                                    \
                    single_def_level = 0;                                                        \
                }                                                                                \
                col_writer->WriteBatch(                                                          \
                        1, &single_def_level, nullptr,                                           \
                        reinterpret_cast<const NATIVE_TYPE*>(&column->get_data()[row_id]));      \
                single_def_level = 1;                                                            \
            }                                                                                    \
        } else {                                                                                 \
            RETURN_WRONG_TYPE                                                                    \
        }                                                                                        \
    } else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \
        col_writer->WriteBatch(                                                                  \
                sz, nullable ? def_level.data() : nullptr, nullptr,                              \
                reinterpret_cast<const NATIVE_TYPE*>(not_nullable_column->get_data().data()));   \
    } else {                                                                                     \
        RETURN_WRONG_TYPE                                                                        \
    }

// Writes column `i` as BYTE_ARRAY values, using each row's raw bytes from
// get_data_at(). Uses the same in-scope names as DISPATCH_PARQUET_NUMERIC_WRITER.
// With a null map, NULL rows get definition level 0. Without one, the column must be
// COLUMN_TYPE or the write is rejected.
#define DISPATCH_PARQUET_COMPLEX_WRITER(COLUMN_TYPE)                                            \
    parquet::RowGroupWriter* rgWriter = get_rg_writer();                                        \
    parquet::ByteArrayWriter* col_writer =                                                      \
            static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));                        \
    if (null_map != nullptr) {                                                                  \
        auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();                \
        for (size_t row_id = 0; row_id < sz; row_id++) {                                        \
            if (null_data[row_id] != 0) {                                                       \
                single_def_level = 0;                                                           \
                parquet::ByteArray value;                                                       \
                col_writer->WriteBatch(1, &single_def_level, nullptr, &value);                  \
                single_def_level = 1;                                                           \
            } else {                                                                            \
                const auto& tmp = col->get_data_at(row_id);                                     \
                parquet::ByteArray value;                                                       \
                value.ptr = reinterpret_cast<const uint8_t*>(tmp.data);                         \
                value.len = tmp.size;                                                           \
                col_writer->WriteBatch(1, &single_def_level, nullptr, &value);                  \
            }                                                                                   \
        }                                                                                       \
    } else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \
        for (size_t row_id = 0; row_id < sz; row_id++) {                                        \
            const auto& tmp = not_nullable_column->get_data_at(row_id);                         \
            parquet::ByteArray value;                                                           \
            value.ptr = reinterpret_cast<const uint8_t*>(tmp.data);                             \
            value.len = tmp.size;                                                               \
            col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr, &value); \
        }                                                                                       \
    } else {                                                                                    \
        RETURN_WRONG_TYPE                                                                       \
    }
| |
| Status VParquetWriterWrapper::write(const Block& block) { |
| if (block.rows() == 0) { |
| return Status::OK(); |
| } |
| size_t sz = block.rows(); |
| try { |
| for (size_t i = 0; i < block.columns(); i++) { |
| auto& raw_column = block.get_by_position(i).column; |
| auto nullable = raw_column->is_nullable(); |
| const auto col = nullable ? reinterpret_cast<const ColumnNullable*>( |
| block.get_by_position(i).column.get()) |
| ->get_nested_column_ptr() |
| .get() |
| : block.get_by_position(i).column.get(); |
| auto null_map = nullable && reinterpret_cast<const ColumnNullable*>( |
| block.get_by_position(i).column.get()) |
| ->has_null() |
| ? reinterpret_cast<const ColumnNullable*>( |
| block.get_by_position(i).column.get()) |
| ->get_null_map_column_ptr() |
| : nullptr; |
| auto& type = block.get_by_position(i).type; |
| |
| std::vector<int16_t> def_level(sz); |
| // For scalar type, definition level == 1 means this value is not NULL. |
| std::fill(def_level.begin(), def_level.end(), 1); |
| int16_t single_def_level = 1; |
| switch (_output_vexpr_ctxs[i]->root()->type().type) { |
| case TYPE_BOOLEAN: { |
| DISPATCH_PARQUET_NUMERIC_WRITER(BoolWriter, ColumnVector<UInt8>, bool) |
| break; |
| } |
| case TYPE_BIGINT: { |
| DISPATCH_PARQUET_NUMERIC_WRITER(Int64Writer, ColumnVector<Int64>, int64_t) |
| break; |
| } |
| case TYPE_LARGEINT: { |
| parquet::RowGroupWriter* rgWriter = get_rg_writer(); |
| parquet::ByteArrayWriter* col_writer = |
| static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); |
| parquet::ByteArray value; |
| if (null_map != nullptr) { |
| auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| if (null_data[row_id] != 0) { |
| single_def_level = 0; |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| single_def_level = 1; |
| } else { |
| const int128_t tmp = assert_cast<const ColumnVector<Int128>&>(*col) |
| .get_data()[row_id]; |
| std::string value_str = fmt::format("{}", tmp); |
| value.ptr = reinterpret_cast<const uint8_t*>(value_str.data()); |
| value.len = value_str.length(); |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| } |
| } |
| } else if (const auto* not_nullable_column = |
| check_and_get_column<const ColumnVector<Int128>>(col)) { |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| const int128_t tmp = not_nullable_column->get_data()[row_id]; |
| std::string value_str = fmt::format("{}", tmp); |
| value.ptr = reinterpret_cast<const uint8_t*>(value_str.data()); |
| value.len = value_str.length(); |
| col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr, |
| &value); |
| } |
| } else { |
| RETURN_WRONG_TYPE |
| } |
| break; |
| } |
| case TYPE_FLOAT: { |
| DISPATCH_PARQUET_NUMERIC_WRITER(FloatWriter, ColumnVector<Float32>, float_t) |
| break; |
| } |
| case TYPE_DOUBLE: { |
| DISPATCH_PARQUET_NUMERIC_WRITER(DoubleWriter, ColumnVector<Float64>, double_t) |
| break; |
| } |
| case TYPE_TINYINT: |
| case TYPE_SMALLINT: { |
| parquet::RowGroupWriter* rgWriter = get_rg_writer(); |
| parquet::Int32Writer* col_writer = |
| static_cast<parquet::Int32Writer*>(rgWriter->column(i)); |
| if (null_map != nullptr) { |
| auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); |
| if (const auto* int16_column = |
| check_and_get_column<const ColumnVector<Int16>>(col)) { |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| if (null_data[row_id] != 0) { |
| single_def_level = 0; |
| } |
| const int32_t tmp = int16_column->get_data()[row_id]; |
| col_writer->WriteBatch(1, &single_def_level, nullptr, |
| reinterpret_cast<const int32_t*>(&tmp)); |
| single_def_level = 1; |
| } |
| } else if (const auto* int8_column = |
| check_and_get_column<const ColumnVector<Int8>>(col)) { |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| if (null_data[row_id] != 0) { |
| single_def_level = 0; |
| } |
| const int32_t tmp = int8_column->get_data()[row_id]; |
| col_writer->WriteBatch(1, &single_def_level, nullptr, |
| reinterpret_cast<const int32_t*>(&tmp)); |
| single_def_level = 1; |
| } |
| } else { |
| RETURN_WRONG_TYPE |
| } |
| } else if (const auto& int16_column = |
| check_and_get_column<const ColumnVector<Int16>>(col)) { |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| const int32_t tmp = int16_column->get_data()[row_id]; |
| col_writer->WriteBatch(1, nullable ? def_level.data() : nullptr, nullptr, |
| reinterpret_cast<const int32_t*>(&tmp)); |
| } |
| } else if (const auto& int8_column = |
| check_and_get_column<const ColumnVector<Int8>>(col)) { |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| const int32_t tmp = int8_column->get_data()[row_id]; |
| col_writer->WriteBatch(1, nullable ? def_level.data() : nullptr, nullptr, |
| reinterpret_cast<const int32_t*>(&tmp)); |
| } |
| } else { |
| RETURN_WRONG_TYPE |
| } |
| break; |
| } |
| case TYPE_INT: { |
| DISPATCH_PARQUET_NUMERIC_WRITER(Int32Writer, ColumnVector<Int32>, Int32) |
| break; |
| } |
| case TYPE_DATETIME: { |
| parquet::RowGroupWriter* rgWriter = get_rg_writer(); |
| parquet::Int64Writer* col_writer = |
| static_cast<parquet::Int64Writer*>(rgWriter->column(i)); |
| if (null_map != nullptr) { |
| auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| def_level[row_id] = null_data[row_id] == 0; |
| } |
| int64_t tmp_data[sz]; |
| int idx = 0; |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| if (null_data[row_id] == 0) { |
| VecDateTimeValue datetime_value = binary_cast<Int64, VecDateTimeValue>( |
| assert_cast<const ColumnVector<Int64>&>(*col) |
| .get_data()[row_id]); |
| if (!datetime_value.unix_timestamp(&tmp_data[idx], |
| TimezoneUtils::default_time_zone)) { |
| return Status::InternalError("get unix timestamp error."); |
| } |
| // -2177481943 represent '1900-12-31 23:54:17' |
| // but -2177481944 represent '1900-12-31 23:59:59' |
| // so for timestamp <= -2177481944, we subtract 343 (5min 43s) |
| if (tmp_data[idx] < timestamp_threshold) { |
| tmp_data[idx] -= timestamp_diff; |
| } |
| // convert seconds to MILLIS seconds |
| tmp_data[idx] *= 1000; |
| ++idx; |
| } |
| } |
| col_writer->WriteBatch(sz, def_level.data(), nullptr, |
| reinterpret_cast<const int64_t*>(tmp_data)); |
| } else if (const auto* not_nullable_column = |
| check_and_get_column<const ColumnVector<Int64>>(col)) { |
| std::vector<int64_t> res(sz); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| VecDateTimeValue datetime_value = binary_cast<Int64, VecDateTimeValue>( |
| not_nullable_column->get_data()[row_id]); |
| |
| if (!datetime_value.unix_timestamp(&res[row_id], |
| TimezoneUtils::default_time_zone)) { |
| return Status::InternalError("get unix timestamp error."); |
| }; |
| // -2177481943 represent '1900-12-31 23:54:17' |
| // but -2177481944 represent '1900-12-31 23:59:59' |
| // so for timestamp <= -2177481944, we subtract 343 (5min 43s) |
| if (res[row_id] < timestamp_threshold) { |
| res[row_id] -= timestamp_diff; |
| } |
| // convert seconds to MILLIS seconds |
| res[row_id] *= 1000; |
| } |
| col_writer->WriteBatch(sz, nullable ? def_level.data() : nullptr, nullptr, |
| reinterpret_cast<const int64_t*>(res.data())); |
| } else { |
| RETURN_WRONG_TYPE |
| } |
| break; |
| } |
| case TYPE_DATE: { |
| parquet::RowGroupWriter* rgWriter = get_rg_writer(); |
| parquet::Int64Writer* col_writer = |
| static_cast<parquet::Int64Writer*>(rgWriter->column(i)); |
| if (null_map != nullptr) { |
| auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| def_level[row_id] = null_data[row_id] == 0; |
| } |
| VecDateTimeValue epoch_date; |
| if (!epoch_date.from_date_str(epoch_date_str.c_str(), |
| epoch_date_str.length())) { |
| return Status::InternalError("create epoch date from string error"); |
| } |
| int32_t days_from_epoch = epoch_date.daynr(); |
| int32_t tmp_data[sz]; |
| int idx = 0; |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| if (null_data[row_id] == 0) { |
| int32_t days = binary_cast<Int64, VecDateTimeValue>( |
| assert_cast<const ColumnVector<Int64>&>(*col) |
| .get_data()[row_id]) |
| .daynr(); |
| tmp_data[idx++] = days - days_from_epoch; |
| } |
| } |
| col_writer->WriteBatch(sz, def_level.data(), nullptr, |
| reinterpret_cast<const int64_t*>(tmp_data)); |
| } else if (check_and_get_column<const ColumnVector<Int64>>(col)) { |
| VecDateTimeValue epoch_date; |
| if (!epoch_date.from_date_str(epoch_date_str.c_str(), |
| epoch_date_str.length())) { |
| return Status::InternalError("create epoch date from string error"); |
| } |
| int32_t days_from_epoch = epoch_date.daynr(); |
| std::vector<int32_t> res(sz); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| int32_t days = binary_cast<Int64, VecDateTimeValue>( |
| assert_cast<const ColumnVector<Int64>&>(*col) |
| .get_data()[row_id]) |
| .daynr(); |
| res[row_id] = days - days_from_epoch; |
| } |
| col_writer->WriteBatch(sz, nullable ? def_level.data() : nullptr, nullptr, |
| reinterpret_cast<const int64_t*>(res.data())); |
| } else { |
| RETURN_WRONG_TYPE |
| } |
| break; |
| } |
| case TYPE_DATEV2: { |
| parquet::RowGroupWriter* rgWriter = get_rg_writer(); |
| parquet::ByteArrayWriter* col_writer = |
| static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); |
| parquet::ByteArray value; |
| if (null_map != nullptr) { |
| auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| if (null_data[row_id] != 0) { |
| single_def_level = 0; |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| single_def_level = 1; |
| } else { |
| char buffer[30]; |
| int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; |
| value.ptr = reinterpret_cast<const uint8_t*>(buffer); |
| value.len = binary_cast<UInt32, DateV2Value<DateV2ValueType>>( |
| assert_cast<const ColumnVector<UInt32>&>(*col) |
| .get_data()[row_id]) |
| .to_buffer(buffer, output_scale); |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| } |
| } |
| } else if (const auto* not_nullable_column = |
| check_and_get_column<const ColumnVector<UInt32>>(col)) { |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| char buffer[30]; |
| int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; |
| value.ptr = reinterpret_cast<const uint8_t*>(buffer); |
| value.len = binary_cast<UInt32, DateV2Value<DateV2ValueType>>( |
| not_nullable_column->get_data()[row_id]) |
| .to_buffer(buffer, output_scale); |
| col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr, |
| &value); |
| } |
| } else { |
| RETURN_WRONG_TYPE |
| } |
| break; |
| } |
| case TYPE_DATETIMEV2: { |
| parquet::RowGroupWriter* rgWriter = get_rg_writer(); |
| parquet::ByteArrayWriter* col_writer = |
| static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); |
| parquet::ByteArray value; |
| if (null_map != nullptr) { |
| auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| if (null_data[row_id] != 0) { |
| single_def_level = 0; |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| single_def_level = 1; |
| } else { |
| char buffer[30]; |
| int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; |
| value.ptr = reinterpret_cast<const uint8_t*>(buffer); |
| value.len = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>( |
| assert_cast<const ColumnVector<UInt64>&>(*col) |
| .get_data()[row_id]) |
| .to_buffer(buffer, output_scale); |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| } |
| } |
| } else if (const auto* not_nullable_column = |
| check_and_get_column<const ColumnVector<UInt64>>(col)) { |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| char buffer[30]; |
| int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; |
| value.ptr = reinterpret_cast<const uint8_t*>(buffer); |
| value.len = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>( |
| not_nullable_column->get_data()[row_id]) |
| .to_buffer(buffer, output_scale); |
| col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr, |
| &value); |
| } |
| } else { |
| RETURN_WRONG_TYPE |
| } |
| break; |
| } |
| case TYPE_OBJECT: { |
| if (_output_object_data) { |
| DISPATCH_PARQUET_COMPLEX_WRITER(ColumnBitmap) |
| } else { |
| RETURN_WRONG_TYPE |
| } |
| break; |
| } |
| case TYPE_HLL: { |
| if (_output_object_data) { |
| DISPATCH_PARQUET_COMPLEX_WRITER(ColumnHLL) |
| } else { |
| RETURN_WRONG_TYPE |
| } |
| break; |
| } |
| case TYPE_CHAR: |
| case TYPE_VARCHAR: |
| case TYPE_STRING: { |
| DISPATCH_PARQUET_COMPLEX_WRITER(ColumnString) |
| break; |
| } |
| case TYPE_DECIMALV2: { |
| parquet::RowGroupWriter* rgWriter = get_rg_writer(); |
| parquet::ByteArrayWriter* col_writer = |
| static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); |
| parquet::ByteArray value; |
| if (null_map != nullptr) { |
| auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| if (null_data[row_id] != 0) { |
| single_def_level = 0; |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| single_def_level = 1; |
| } else { |
| const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>( |
| col->get_data_at(row_id).data) |
| ->value); |
| char decimal_buffer[MAX_DECIMAL_WIDTH]; |
| int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; |
| value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer); |
| value.len = decimal_val.to_buffer(decimal_buffer, output_scale); |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| } |
| } |
| } else if (const auto* not_nullable_column = |
| check_and_get_column<const ColumnDecimal128>(col)) { |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| const DecimalV2Value decimal_val( |
| reinterpret_cast<const PackedInt128*>( |
| not_nullable_column->get_data_at(row_id).data) |
| ->value); |
| char decimal_buffer[MAX_DECIMAL_WIDTH]; |
| int output_scale = _output_vexpr_ctxs[i]->root()->type().scale; |
| value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer); |
| value.len = decimal_val.to_buffer(decimal_buffer, output_scale); |
| col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr, |
| &value); |
| } |
| } else { |
| RETURN_WRONG_TYPE |
| } |
| break; |
| } |
| case TYPE_DECIMAL32: { |
| parquet::RowGroupWriter* rgWriter = get_rg_writer(); |
| parquet::FixedLenByteArrayWriter* col_writer = |
| static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i)); |
| parquet::FixedLenByteArray value; |
| auto decimal_type = check_and_get_data_type<DataTypeDecimal<Decimal32>>( |
| remove_nullable(type).get()); |
| DCHECK(decimal_type); |
| if (null_map != nullptr) { |
| auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); |
| const auto& data_column = assert_cast<const ColumnDecimal32&>(*col); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| if (null_data[row_id] != 0) { |
| single_def_level = 0; |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| single_def_level = 1; |
| } else { |
| auto data = data_column.get_element(row_id); |
| auto big_endian = bswap_32(data); |
| value.ptr = reinterpret_cast<const uint8_t*>(&big_endian); |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| } |
| } |
| } else { |
| const auto& data_column = assert_cast<const ColumnDecimal32&>(*col); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| auto data = data_column.get_element(row_id); |
| auto big_endian = bswap_32(data); |
| value.ptr = reinterpret_cast<const uint8_t*>(&big_endian); |
| col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr, |
| &value); |
| } |
| } |
| break; |
| } |
| case TYPE_DECIMAL64: { |
| parquet::RowGroupWriter* rgWriter = get_rg_writer(); |
| parquet::FixedLenByteArrayWriter* col_writer = |
| static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i)); |
| parquet::FixedLenByteArray value; |
| auto decimal_type = check_and_get_data_type<DataTypeDecimal<Decimal64>>( |
| remove_nullable(type).get()); |
| DCHECK(decimal_type); |
| if (null_map != nullptr) { |
| auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); |
| const auto& data_column = assert_cast<const ColumnDecimal64&>(*col); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| if (null_data[row_id] != 0) { |
| single_def_level = 0; |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| single_def_level = 1; |
| } else { |
| auto data = data_column.get_element(row_id); |
| auto big_endian = bswap_64(data); |
| value.ptr = reinterpret_cast<const uint8_t*>(&big_endian); |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| } |
| } |
| } else { |
| const auto& data_column = assert_cast<const ColumnDecimal64&>(*col); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| auto data = data_column.get_element(row_id); |
| auto big_endian = bswap_64(data); |
| value.ptr = reinterpret_cast<const uint8_t*>(&big_endian); |
| col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr, |
| &value); |
| } |
| } |
| break; |
| } |
| case TYPE_DECIMAL128I: { |
| parquet::RowGroupWriter* rgWriter = get_rg_writer(); |
| parquet::FixedLenByteArrayWriter* col_writer = |
| static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i)); |
| parquet::FixedLenByteArray value; |
| auto decimal_type = check_and_get_data_type<DataTypeDecimal<Decimal128I>>( |
| remove_nullable(type).get()); |
| DCHECK(decimal_type); |
| if (null_map != nullptr) { |
| auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); |
| const auto& data_column = assert_cast<const ColumnDecimal128I&>(*col); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| if (null_data[row_id] != 0) { |
| single_def_level = 0; |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| single_def_level = 1; |
| } else { |
| auto data = data_column.get_element(row_id); |
| auto big_endian = gbswap_128(data); |
| value.ptr = reinterpret_cast<const uint8_t*>(&big_endian); |
| col_writer->WriteBatch(1, &single_def_level, nullptr, &value); |
| } |
| } |
| } else { |
| const auto& data_column = assert_cast<const ColumnDecimal128I&>(*col); |
| for (size_t row_id = 0; row_id < sz; row_id++) { |
| auto data = data_column.get_element(row_id); |
| auto big_endian = gbswap_128(data); |
| value.ptr = reinterpret_cast<const uint8_t*>(&big_endian); |
| col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr, |
| &value); |
| } |
| } |
| break; |
| } |
| default: { |
| return Status::InvalidArgument( |
| "Invalid expression type: {}", |
| _output_vexpr_ctxs[i]->root()->type().debug_string()); |
| } |
| } |
| } |
| } catch (const std::exception& e) { |
| LOG(WARNING) << "Parquet write error: " << e.what(); |
| return Status::InternalError(e.what()); |
| } |
| _cur_written_rows += sz; |
| return Status::OK(); |
| } |
| |
| Status VParquetWriterWrapper::prepare() { |
| RETURN_IF_ERROR(parse_properties()); |
| RETURN_IF_ERROR(parse_schema()); |
| try { |
| _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, _properties); |
| } catch (const parquet::ParquetStatusException& e) { |
| LOG(WARNING) << "parquet file writer open error: " << e.what(); |
| return Status::InternalError("parquet file writer open error: {}", e.what()); |
| } |
| if (_writer == nullptr) { |
| return Status::InternalError("Failed to create file writer"); |
| } |
| return Status::OK(); |
| } |
| |
| parquet::RowGroupWriter* VParquetWriterWrapper::get_rg_writer() { |
| if (_rg_writer == nullptr) { |
| _rg_writer = _writer->AppendBufferedRowGroup(); |
| } |
| if (_cur_written_rows > _max_row_per_group) { |
| _rg_writer->Close(); |
| _rg_writer = _writer->AppendBufferedRowGroup(); |
| _cur_written_rows = 0; |
| } |
| return _rg_writer; |
| } |
| |
| int64_t VParquetWriterWrapper::written_len() { |
| return _outstream->get_written_len(); |
| } |
| |
| Status VParquetWriterWrapper::close() { |
| try { |
| if (_rg_writer != nullptr) { |
| LOG(INFO) << "--ftw: _rg_writer->Close()"; |
| _rg_writer->Close(); |
| _rg_writer = nullptr; |
| } |
| if (_writer != nullptr) { |
| _writer->Close(); |
| } |
| arrow::Status st = _outstream->Close(); |
| if (!st.ok()) { |
| LOG(WARNING) << "close parquet file error: " << st.ToString(); |
| return Status::IOError(st.ToString()); |
| } |
| } catch (const std::exception& e) { |
| _rg_writer = nullptr; |
| LOG(WARNING) << "Parquet writer close error: " << e.what(); |
| return Status::IOError(e.what()); |
| } |
| return Status::OK(); |
| } |
| |
| } // namespace doris::vectorized |