| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "vec/runtime/vcsv_transformer.h" |
| |
| #include <glog/logging.h> |
| |
| #include <cstdlib> |
| #include <cstring> |
| |
| #include "common/cast_set.h" |
| #include "common/status.h" |
| #include "io/fs/file_writer.h" |
| #include "runtime/primitive_type.h" |
| #include "runtime/types.h" |
| #include "util/faststring.h" |
| #include "vec/columns/column_string.h" |
| #include "vec/common/string_buffer.hpp" |
| #include "vec/core/column_with_type_and_name.h" |
| #include "vec/data_types/serde/data_type_serde.h" |
| #include "vec/exec/format/csv/csv_reader.h" |
| #include "vec/exprs/vexpr.h" |
| #include "vec/exprs/vexpr_context.h" |
| |
| namespace doris::vectorized { |
| #include "common/compile_check_begin.h" |
| |
| static const unsigned char bom[] = {0xEF, 0xBB, 0xBF}; |
| |
| VCSVTransformer::VCSVTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, |
| const VExprContextSPtrs& output_vexpr_ctxs, |
| bool output_object_data, std::string_view header_type, |
| std::string_view header, std::string_view column_separator, |
| std::string_view line_delimiter, bool with_bom, |
| TFileCompressType::type compress_type, |
| const THiveSerDeProperties* hive_serde_properties) |
| : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data), |
| _column_separator(column_separator), |
| _line_delimiter(line_delimiter), |
| _file_writer(file_writer), |
| _with_bom(with_bom), |
| _compress_type(compress_type), |
| _is_text_format(hive_serde_properties != nullptr) { |
| if (!header.empty()) { |
| _csv_header = header; |
| if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { |
| _csv_header += _gen_csv_header_types(); |
| } |
| } else { |
| _csv_header = ""; |
| } |
| |
| _options.timezone = &state->timezone_obj(); |
| if (_is_text_format) { |
| _options.field_delim = hive_serde_properties->field_delim; |
| _options.collection_delim = hive_serde_properties->collection_delim[0]; |
| _options.map_key_delim = hive_serde_properties->mapkv_delim[0]; |
| if (hive_serde_properties->__isset.escape_char) { |
| _options.escape_char = hive_serde_properties->escape_char[0]; |
| } |
| _options.null_format = hive_serde_properties->null_format.data(); |
| _options.null_len = cast_set<int>(hive_serde_properties->null_format.length()); |
| // The list of separators + escapeChar are the bytes required to be escaped. |
| if (_options.escape_char != 0) { |
| _options.need_escape[_options.escape_char & 0xff] = true; |
| } |
| for (int i = 0; i <= 153; i++) { |
| _options.need_escape[_options.get_collection_delimiter(i) & 0xff] = true; |
| } |
| } |
| } |
| |
| Status VCSVTransformer::open() { |
| RETURN_IF_ERROR(get_block_compression_codec(_compress_type, &_compress_codec)); |
| if (_with_bom) { |
| Slice bom_slice(reinterpret_cast<const char*>(bom), sizeof(bom)); |
| if (_compress_codec) { |
| faststring compressed_data; |
| RETURN_IF_ERROR(_compress_codec->compress(bom_slice, &compressed_data)); |
| RETURN_IF_ERROR( |
| _file_writer->append(Slice(compressed_data.data(), compressed_data.size()))); |
| } else { |
| RETURN_IF_ERROR(_file_writer->append(bom_slice)); |
| } |
| } |
| if (!_csv_header.empty()) { |
| Slice header(_csv_header.data(), _csv_header.size()); |
| if (_compress_codec) { |
| faststring compressed_data; |
| RETURN_IF_ERROR(_compress_codec->compress(header, &compressed_data)); |
| RETURN_IF_ERROR( |
| _file_writer->append(Slice(compressed_data.data(), compressed_data.size()))); |
| } else { |
| return _file_writer->append(header); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| int64_t VCSVTransformer::written_len() { |
| return _file_writer->bytes_appended(); |
| } |
| |
| Status VCSVTransformer::close() { |
| return _file_writer->close(); |
| } |
| |
| Status VCSVTransformer::write(const Block& block) { |
| auto ser_col = ColumnString::create(); |
| ser_col->reserve(block.columns()); |
| VectorBufferWriter buffer_writer(*ser_col.get()); |
| for (int i = 0; i < block.rows(); i++) { |
| for (size_t col_id = 0; col_id < block.columns(); col_id++) { |
| if (col_id != 0) { |
| buffer_writer.write(_column_separator.data(), _column_separator.size()); |
| } |
| Status st = _is_text_format ? _serdes[col_id]->serialize_one_cell_to_hive_text( |
| *(block.get_by_position(col_id).column), i, |
| buffer_writer, _options) |
| : _serdes[col_id]->serialize_one_cell_to_json( |
| *(block.get_by_position(col_id).column), i, |
| buffer_writer, _options); |
| if (!st.ok()) { |
| // VectorBufferWriter must do commit before deconstruct, |
| // or it may throw DCHECK failure. |
| buffer_writer.commit(); |
| return st; |
| } |
| } |
| buffer_writer.write(_line_delimiter.data(), _line_delimiter.size()); |
| buffer_writer.commit(); |
| } |
| return _flush_plain_text_outstream(*ser_col.get()); |
| } |
| |
| Status VCSVTransformer::_flush_plain_text_outstream(ColumnString& ser_col) { |
| if (ser_col.byte_size() == 0) { |
| return Status::OK(); |
| } |
| |
| Slice append_data(ser_col.get_chars().data(), ser_col.get_chars().size()); |
| |
| if (_compress_codec) { |
| faststring compressed_data; |
| RETURN_IF_ERROR(_compress_codec->compress(append_data, &compressed_data)); |
| RETURN_IF_ERROR( |
| _file_writer->append(Slice(compressed_data.data(), compressed_data.size()))); |
| } else { |
| RETURN_IF_ERROR(_file_writer->append(append_data)); |
| } |
| |
| // clear the stream |
| ser_col.clear(); |
| return Status::OK(); |
| } |
| |
| std::string VCSVTransformer::_gen_csv_header_types() { |
| std::string types; |
| int num_columns = (int)_output_vexpr_ctxs.size(); |
| for (int i = 0; i < num_columns; ++i) { |
| types += type_to_string(_output_vexpr_ctxs[i]->root()->data_type()->get_primitive_type()); |
| if (i < num_columns - 1) { |
| types += _column_separator; |
| } |
| } |
| types += _line_delimiter; |
| return types; |
| } |
| } // namespace doris::vectorized |