| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "csv_reader.h" |
| |
| #include <fmt/format.h> |
| #include <gen_cpp/PlanNodes_types.h> |
| #include <gen_cpp/Types_types.h> |
| #include <glog/logging.h> |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <map> |
| #include <memory> |
| #include <ostream> |
| #include <regex> |
| #include <utility> |
| |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/consts.h" |
| #include "common/status.h" |
| #include "exec/decompressor.h" |
| #include "exec/line_reader.h" |
| #include "io/file_factory.h" |
| #include "io/fs/broker_file_reader.h" |
| #include "io/fs/buffered_reader.h" |
| #include "io/fs/file_reader.h" |
| #include "io/fs/s3_file_reader.h" |
| #include "io/fs/tracing_file_reader.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/runtime_state.h" |
| #include "util/string_util.h" |
| #include "util/utf8_check.h" |
| #include "vec/core/block.h" |
| #include "vec/core/column_with_type_and_name.h" |
| #include "vec/data_types/data_type_factory.hpp" |
| #include "vec/exec/format/file_reader/new_plain_binary_line_reader.h" |
| #include "vec/exec/format/file_reader/new_plain_text_line_reader.h" |
| #include "vec/exec/scan/scanner.h" |
| |
| namespace doris { |
| class RuntimeProfile; |
| namespace vectorized { |
| class IColumn; |
| } // namespace vectorized |
| namespace io { |
| struct IOContext; |
| enum class FileCachePolicy : uint8_t; |
| } // namespace io |
| } // namespace doris |
| |
| namespace doris::vectorized { |
| #include "common/compile_check_begin.h" |
| |
| void EncloseCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) { |
| const char* data = line.data; |
| const auto& column_sep_positions = _text_line_reader_ctx->column_sep_positions(); |
| size_t value_start_offset = 0; |
| for (auto idx : column_sep_positions) { |
| process_value_func(data, value_start_offset, idx - value_start_offset, _trimming_char, |
| splitted_values); |
| value_start_offset = idx + _value_sep_len; |
| } |
| if (line.size >= value_start_offset) { |
| // process the last column |
| process_value_func(data, value_start_offset, line.size - value_start_offset, _trimming_char, |
| splitted_values); |
| } |
| } |
| |
| void PlainCsvTextFieldSplitter::_split_field_single_char(const Slice& line, |
| std::vector<Slice>* splitted_values) { |
| const char* data = line.data; |
| const size_t size = line.size; |
| size_t value_start = 0; |
| for (size_t i = 0; i < size; ++i) { |
| if (data[i] == _value_sep[0]) { |
| process_value_func(data, value_start, i - value_start, _trimming_char, splitted_values); |
| value_start = i + _value_sep_len; |
| } |
| } |
| process_value_func(data, value_start, size - value_start, _trimming_char, splitted_values); |
| } |
| |
void PlainCsvTextFieldSplitter::_split_field_multi_char(const Slice& line,
                                                        std::vector<Slice>* splitted_values) {
    // Split `line` on a multi-character separator using the Knuth-Morris-Pratt
    // algorithm: O(line.size + _value_sep_len) instead of a naive O(n * m) scan.
    size_t start = 0;  // point to the start pos of next col value.
    size_t curpos = 0; // point to the start pos of separator matching sequence.

    // value_sep : AAAA
    // line.data : 1234AAAA5678
    //          -> 1234,5678

    //  start   start
    //  ▼       ▼
    //  1234AAAA5678\0
    //      ▲       ▲
    //      curpos  curpos

    // Build the KMP failure table: next[i] is the (0-based, -1 = none) end
    // index of the longest proper prefix of _value_sep that is also a suffix
    // of _value_sep[0..i].
    std::vector<int> next(_value_sep_len);
    next[0] = -1;
    for (int i = 1, j = -1; i < _value_sep_len; i++) {
        while (j > -1 && _value_sep[i] != _value_sep[j + 1]) {
            j = next[j];
        }
        if (_value_sep[i] == _value_sep[j + 1]) {
            j++;
        }
        next[i] = j;
    }

    for (int i = 0, j = -1; i < line.size; i++) {
        // i : position in line
        // j : last matched position in _value_sep (-1 = nothing matched yet)
        while (j > -1 && line[i] != _value_sep[j + 1]) {
            j = next[j];
        }
        if (line[i] == _value_sep[j + 1]) {
            j++;
        }
        if (j == _value_sep_len - 1) {
            // Full separator matched; it occupies line[curpos .. i].
            curpos = i - _value_sep_len + 1;

            /*
             * column_separator : "xx"
             * data.csv : data1xxxxdata2
             *
             * Parse incorrectly:
             *    data1[xx]xxdata2
             *    data1x[xx]xdata2
             *    data1xx[xx]data2
             * The string "xxxx" is parsed into three "xx" delimiters.
             *
             * Parse correctly:
             *    data1[xx]xxdata2
             *    data1xx[xx]data2
             */

            // Only accept this match if it does not overlap a separator that
            // was already consumed (see the example above).
            if (curpos >= start) {
                process_value_func(line.data, start, curpos - start, _trimming_char,
                                   splitted_values);
                start = i + 1;
            }

            j = next[j];
        }
    }
    // Trailing field after the last accepted separator (may be empty).
    process_value_func(line.data, start, line.size - start, _trimming_char, splitted_values);
}
| |
| void PlainCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) { |
| if (is_single_char_delim) { |
| _split_field_single_char(line, splitted_values); |
| } else { |
| _split_field_multi_char(line, splitted_values); |
| } |
| } |
| |
CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
                     const TFileScanRangeParams& params, const TFileRangeDesc& range,
                     const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx,
                     std::shared_ptr<io::IOContext> io_ctx_holder)
        : _profile(profile),
          _params(params),
          _file_reader(nullptr),
          _line_reader(nullptr),
          _decompressor(nullptr),
          _state(state),
          _counter(counter),
          _range(range),
          _file_slot_descs(file_slot_descs),
          _line_reader_eof(false),
          _skip_lines(0),
          _io_ctx(io_ctx),
          _io_ctx_holder(std::move(io_ctx_holder)) {
    // If the caller only supplied the owning holder, borrow its raw pointer so
    // the rest of the reader can use _io_ctx uniformly.
    if (_io_ctx == nullptr && _io_ctx_holder) {
        _io_ctx = _io_ctx_holder.get();
    }
    _file_format_type = _params.format_type;
    _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
    if (_range.__isset.compress_type) {
        // for compatibility: prefer the per-range compress type when present
        _file_compress_type = _range.compress_type;
    } else {
        _file_compress_type = _params.compress_type;
    }
    _size = _range.size;

    _split_values.reserve(_file_slot_descs.size());
    _init_system_properties();
    _init_file_description();
    // One serde per file slot, used later to parse csv fields into columns.
    _serdes = vectorized::create_data_type_serdes(_file_slot_descs);
}
| |
| void CsvReader::_init_system_properties() { |
| if (_range.__isset.file_type) { |
| // for compatibility |
| _system_properties.system_type = _range.file_type; |
| } else { |
| _system_properties.system_type = _params.file_type; |
| } |
| _system_properties.properties = _params.properties; |
| _system_properties.hdfs_params = _params.hdfs_params; |
| if (_params.__isset.broker_addresses) { |
| _system_properties.broker_addresses.assign(_params.broker_addresses.begin(), |
| _params.broker_addresses.end()); |
| } |
| } |
| |
| void CsvReader::_init_file_description() { |
| _file_description.path = _range.path; |
| _file_description.file_size = _range.__isset.file_size ? _range.file_size : -1; |
| if (_range.__isset.fs_name) { |
| _file_description.fs_name = _range.fs_name; |
| } |
| } |
| |
Status CsvReader::init_reader(bool is_load) {
    // Configure skip-lines/start-offset handling, then build the file reader,
    // decompressor and line reader, and finally set up the column index maps.
    //
    // set the skip lines and start offset
    _start_offset = _range.start_offset;
    if (_start_offset == 0) {
        // check header typer first
        if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
            !_params.file_attributes.header_type.empty()) {
            std::string header_type = to_lower(_params.file_attributes.header_type);
            if (header_type == BeConsts::CSV_WITH_NAMES) {
                _skip_lines = 1;
            } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
                _skip_lines = 2;
            }
        } else if (_params.file_attributes.__isset.skip_lines) {
            _skip_lines = _params.file_attributes.skip_lines;
        }
    } else if (_start_offset != 0) {
        // NOTE(review): the second operand of the || below is unreachable —
        // whenever _file_compress_type == UNKNOWN, the first operand
        // (_file_compress_type != PLAIN) is already true, so the whole check
        // reduces to "compress type must be PLAIN". The dead clause may have
        // been intended to also allow UNKNOWN + FORMAT_CSV_PLAIN to be split;
        // confirm the intended semantics before changing it.
        if ((_file_compress_type != TFileCompressType::PLAIN) ||
            (_file_compress_type == TFileCompressType::UNKNOWN &&
             _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
            return Status::InternalError<false>("For now we do not support split compressed file");
        }
        // pre-read to promise first line skipped always read
        int64_t pre_read_len = std::min(
                static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
                _start_offset);
        _start_offset -= pre_read_len;
        _size += pre_read_len;
        // not first range will always skip one line
        _skip_lines = 1;
    }

    // Mark slots that are nullable strings so the per-cell fast path
    // (_deserialize_nullable_string) can bypass virtual serde dispatch.
    _use_nullable_string_opt.resize(_file_slot_descs.size());
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
        auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
        if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) {
            _use_nullable_string_opt[i] = 1;
        }
    }

    RETURN_IF_ERROR(_init_options());
    RETURN_IF_ERROR(_create_file_reader(false));
    RETURN_IF_ERROR(_create_decompressor());
    RETURN_IF_ERROR(_create_line_reader());

    _is_load = is_load;
    if (!_is_load) {
        // For query task, there are 2 slot mapping.
        // One is from file slot to values in line.
        // eg, the file_slot_descs is k1, k3, k5, and values in line are k1, k2, k3, k4, k5
        // the _col_idxs will save: 0, 2, 4
        // The other is from file slot to columns in output block
        // eg, the file_slot_descs is k1, k3, k5, and columns in block are p1, k1, k3, k5
        // where "p1" is the partition col which does not exist in file
        // the _file_slot_idx_map will save: 1, 2, 3
        DCHECK(_params.__isset.column_idxs);
        _col_idxs = _params.column_idxs;
        int idx = 0;
        for (const auto& slot_info : _params.required_slots) {
            if (slot_info.is_file_slot) {
                _file_slot_idx_map.push_back(idx);
            }
            idx++;
        }
    } else {
        // For load task, the column order is same as file column order
        int i = 0;
        for (const auto& desc [[maybe_unused]] : _file_slot_descs) {
            _col_idxs.push_back(i++);
        }
    }

    _line_reader_eof = false;
    return Status::OK();
}
| |
| // !FIXME: Here we should use MutableBlock |
Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    // Read up to batch_size lines into `block`. *eof is set when this call
    // produced no rows (the line reader is exhausted).
    if (_line_reader_eof) {
        *eof = true;
        return Status::OK();
    }

    const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE);
    size_t rows = 0;

    bool success = false;
    bool is_remove_bom = false;
    if (_push_down_agg_type == TPushAggOp::type::COUNT) {
        // COUNT(*) push-down: lines only need to be counted and validated,
        // never parsed into column values.
        while (rows < batch_size && !_line_reader_eof) {
            const uint8_t* ptr = nullptr;
            size_t size = 0;
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));

            // _skip_lines == 0 means this line is the actual data beginning line for the entire file
            // is_remove_bom means _remove_bom should only execute once
            if (_skip_lines == 0 && !is_remove_bom) {
                ptr = _remove_bom(ptr, size);
                is_remove_bom = true;
            }

            // _skip_lines > 0 means we do not need to remove bom
            if (_skip_lines > 0) {
                _skip_lines--;
                is_remove_bom = true;
                continue;
            }
            if (size == 0) {
                // An empty line still counts as a row when the session treats
                // empty csv lines as NULL rows.
                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
                    ++rows;
                }
                // Read empty line, continue
                continue;
            }

            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
            ++rows;
        }
        // No values were written; resize the columns so the block still
        // reports `rows` rows for the COUNT aggregation.
        auto mutate_columns = block->mutate_columns();
        for (auto& col : mutate_columns) {
            col->resize(rows);
        }
        block->set_columns(std::move(mutate_columns));
    } else {
        auto columns = block->mutate_columns();
        while (rows < batch_size && !_line_reader_eof) {
            const uint8_t* ptr = nullptr;
            size_t size = 0;
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));

            // _skip_lines == 0 means this line is the actual data beginning line for the entire file
            // is_remove_bom means _remove_bom should only execute once
            if (!is_remove_bom && _skip_lines == 0) {
                ptr = _remove_bom(ptr, size);
                is_remove_bom = true;
            }

            // _skip_lines > 0 means we do not remove bom
            if (_skip_lines > 0) {
                _skip_lines--;
                is_remove_bom = true;
                continue;
            }
            if (size == 0) {
                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
                    RETURN_IF_ERROR(_fill_empty_line(block, columns, &rows));
                }
                // Read empty line, continue
                continue;
            }

            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
            if (!success) {
                // Invalid (non-utf8) row was filtered inside _validate_line.
                continue;
            }
            RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows));
        }
        block->set_columns(std::move(columns));
    }

    *eof = (rows == 0);
    *read_rows = rows;

    return Status::OK();
}
| |
| Status CsvReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, |
| std::unordered_set<std::string>* missing_cols) { |
| for (const auto& slot : _file_slot_descs) { |
| name_to_type->emplace(slot->col_name(), slot->type()); |
| } |
| return Status::OK(); |
| } |
| |
| // init decompressor, file reader and line reader for parsing schema |
| Status CsvReader::init_schema_reader() { |
| _start_offset = _range.start_offset; |
| if (_start_offset != 0) { |
| return Status::InvalidArgument( |
| "start offset of TFileRangeDesc must be zero in get parsered schema"); |
| } |
| if (_params.file_type == TFileType::FILE_BROKER) { |
| return Status::InternalError<false>( |
| "Getting parsered schema from csv file do not support stream load and broker " |
| "load."); |
| } |
| |
| // csv file without names line and types line. |
| _read_line = 1; |
| _is_parse_name = false; |
| |
| if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type && |
| !_params.file_attributes.header_type.empty()) { |
| std::string header_type = to_lower(_params.file_attributes.header_type); |
| if (header_type == BeConsts::CSV_WITH_NAMES) { |
| _is_parse_name = true; |
| } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { |
| _read_line = 2; |
| _is_parse_name = true; |
| } |
| } |
| |
| RETURN_IF_ERROR(_init_options()); |
| RETURN_IF_ERROR(_create_file_reader(true)); |
| RETURN_IF_ERROR(_create_decompressor()); |
| RETURN_IF_ERROR(_create_line_reader()); |
| return Status::OK(); |
| } |
| |
| Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names, |
| std::vector<DataTypePtr>* col_types) { |
| if (_read_line == 1) { |
| if (!_is_parse_name) { //parse csv file without names and types |
| size_t col_nums = 0; |
| RETURN_IF_ERROR(_parse_col_nums(&col_nums)); |
| for (size_t i = 0; i < col_nums; ++i) { |
| col_names->emplace_back("c" + std::to_string(i + 1)); |
| } |
| } else { // parse csv file with names |
| RETURN_IF_ERROR(_parse_col_names(col_names)); |
| } |
| |
| for (size_t j = 0; j < col_names->size(); ++j) { |
| col_types->emplace_back( |
| DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true)); |
| } |
| } else { // parse csv file with names and types |
| RETURN_IF_ERROR(_parse_col_names(col_names)); |
| RETURN_IF_ERROR(_parse_col_types(col_names->size(), col_types)); |
| } |
| return Status::OK(); |
| } |
| |
// Fast path for nullable string slots: decides NULL-ness inline and writes the
// nested string column directly, skipping the nullable serde's virtual dispatch.
Status CsvReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
    auto& null_column = assert_cast<ColumnNullable&>(column);
    // Empty field -> NULL when empty_field_as_null is enabled.
    if (_empty_field_as_null) {
        if (slice.size == 0) {
            null_column.insert_data(nullptr, 0);
            return Status::OK();
        }
    }
    // Field equal to the configured null format (e.g. "\N") -> NULL. Note the
    // condition's side effect: trim_double_quotes() MUTATES `slice`, stripping
    // surrounding quotes; a quoted field is then never treated as null.
    if (_options.null_len > 0 && !(_options.converted_from_string && slice.trim_double_quotes())) {
        if (slice.compare(Slice(_options.null_format, _options.null_len)) == 0) {
            null_column.insert_data(nullptr, 0);
            return Status::OK();
        }
    }
    // The nested column is known to be a string column here, so a single
    // static string serde instance can deserialize it directly.
    static DataTypeStringSerDe stringSerDe(TYPE_STRING);
    auto st = stringSerDe.deserialize_one_cell_from_csv(null_column.get_nested_column(), slice,
                                                        _options);
    if (!st.ok()) {
        // fill null if fail
        null_column.insert_data(nullptr, 0); // 0 is meaningless here
        return Status::OK();
    }
    // fill not null if success
    null_column.get_null_map_data().push_back(0);
    return Status::OK();
}
| |
| Status CsvReader::_init_options() { |
| // get column_separator and line_delimiter |
| _value_separator = _params.file_attributes.text_params.column_separator; |
| _value_separator_length = _value_separator.size(); |
| _line_delimiter = _params.file_attributes.text_params.line_delimiter; |
| _line_delimiter_length = _line_delimiter.size(); |
| if (_params.file_attributes.text_params.__isset.enclose) { |
| _enclose = _params.file_attributes.text_params.enclose; |
| } |
| if (_params.file_attributes.text_params.__isset.escape) { |
| _escape = _params.file_attributes.text_params.escape; |
| } |
| |
| _trim_tailing_spaces = |
| (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query()); |
| |
| _options.escape_char = _escape; |
| _options.quote_char = _enclose; |
| |
| if (_params.file_attributes.text_params.collection_delimiter.empty()) { |
| _options.collection_delim = ','; |
| } else { |
| _options.collection_delim = _params.file_attributes.text_params.collection_delimiter[0]; |
| } |
| if (_params.file_attributes.text_params.mapkv_delimiter.empty()) { |
| _options.map_key_delim = ':'; |
| } else { |
| _options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0]; |
| } |
| |
| if (_params.file_attributes.text_params.__isset.null_format) { |
| _options.null_format = _params.file_attributes.text_params.null_format.data(); |
| _options.null_len = _params.file_attributes.text_params.null_format.length(); |
| } |
| |
| if (_params.file_attributes.__isset.trim_double_quotes) { |
| _trim_double_quotes = _params.file_attributes.trim_double_quotes; |
| } |
| _options.converted_from_string = _trim_double_quotes; |
| |
| if (_state != nullptr) { |
| _keep_cr = _state->query_options().keep_carriage_return; |
| } |
| |
| if (_params.file_attributes.text_params.__isset.empty_field_as_null) { |
| _empty_field_as_null = _params.file_attributes.text_params.empty_field_as_null; |
| } |
| return Status::OK(); |
| } |
| |
| Status CsvReader::_create_decompressor() { |
| if (_file_compress_type != TFileCompressType::UNKNOWN) { |
| RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor)); |
| } else { |
| RETURN_IF_ERROR(Decompressor::create_decompressor(_file_format_type, &_decompressor)); |
| } |
| |
| return Status::OK(); |
| } |
| |
// Build _file_reader according to the source type. Returns EndOfFile for empty
// regular files so callers can short-circuit instead of parsing nothing.
Status CsvReader::_create_file_reader(bool need_schema) {
    if (_params.file_type == TFileType::FILE_STREAM) {
        // Stream load: data arrives through a pipe keyed by the load id.
        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
                                                        need_schema));
    } else {
        _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
        io::FileReaderOptions reader_options =
                FileFactory::get_reader_options(_state, _file_description);
        io::FileReaderSPtr file_reader;
        // The two branches differ only in the IO-context argument: the holder
        // variant shares ownership with the created reader, while the raw
        // variant merely borrows _io_ctx.
        if (_io_ctx_holder) {
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
                    _profile, _system_properties, _file_description, reader_options,
                    io::DelegateReader::AccessMode::SEQUENTIAL,
                    std::static_pointer_cast<const io::IOContext>(_io_ctx_holder),
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
        } else {
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
                    _profile, _system_properties, _file_description, reader_options,
                    io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
        }
        // Wrap with a tracing reader when IO statistics are being collected.
        _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
                                                                        _io_ctx->file_reader_stats)
                               : file_reader;
    }
    // The empty-file check is skipped for stream/broker sources, whose size
    // may be unknown up front.
    if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
        _params.file_type != TFileType::FILE_BROKER) {
        return Status::EndOfFile("init reader failed, empty csv file: " + _range.path);
    }
    return Status::OK();
}
| |
| Status CsvReader::_create_line_reader() { |
| std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx; |
| if (_enclose == 0) { |
| text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>( |
| _line_delimiter, _line_delimiter_length, _keep_cr); |
| _fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>( |
| _trim_tailing_spaces, false, _value_separator, _value_separator_length, -1); |
| |
| } else { |
| // in load task, the _file_slot_descs is empty vector, so we need to set col_sep_num to 0 |
| size_t col_sep_num = _file_slot_descs.size() > 1 ? _file_slot_descs.size() - 1 : 0; |
| text_line_reader_ctx = std::make_shared<EncloseCsvLineReaderCtx>( |
| _line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length, |
| col_sep_num, _enclose, _escape, _keep_cr); |
| |
| _fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>( |
| _trim_tailing_spaces, true, |
| std::static_pointer_cast<EncloseCsvLineReaderCtx>(text_line_reader_ctx), |
| _value_separator_length, _enclose); |
| } |
| switch (_file_format_type) { |
| case TFileFormatType::FORMAT_CSV_PLAIN: |
| [[fallthrough]]; |
| case TFileFormatType::FORMAT_CSV_GZ: |
| [[fallthrough]]; |
| case TFileFormatType::FORMAT_CSV_BZ2: |
| [[fallthrough]]; |
| case TFileFormatType::FORMAT_CSV_LZ4FRAME: |
| [[fallthrough]]; |
| case TFileFormatType::FORMAT_CSV_LZ4BLOCK: |
| [[fallthrough]]; |
| case TFileFormatType::FORMAT_CSV_LZOP: |
| [[fallthrough]]; |
| case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: |
| [[fallthrough]]; |
| case TFileFormatType::FORMAT_CSV_DEFLATE: |
| _line_reader = |
| NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(), |
| text_line_reader_ctx, _size, _start_offset); |
| |
| break; |
| case TFileFormatType::FORMAT_PROTO: |
| _fields_splitter = std::make_unique<CsvProtoFieldSplitter>(); |
| _line_reader = NewPlainBinaryLineReader::create_unique(_file_reader); |
| break; |
| default: |
| return Status::InternalError<false>( |
| "Unknown format type, cannot init line reader in csv reader, type={}", |
| _file_format_type); |
| } |
| return Status::OK(); |
| } |
| |
// Deserialize a single csv field `slice` into `column` via the slot's serde,
// honoring the reader's parsing options (escape/quote/null handling).
Status CsvReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) {
    return serde->deserialize_one_cell_from_csv(column, slice, _options);
}
| |
// Split one csv line and deserialize each mapped field into its destination
// column. Invalid rows (wrong column count) are filtered silently (OK status,
// *rows unchanged).
Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
                                     std::vector<MutableColumnPtr>& columns, size_t* rows) {
    bool is_success = false;

    RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
    if (UNLIKELY(!is_success)) {
        // If not success, which means we met an invalid row, filter this row and return.
        return Status::OK();
    }

    for (int i = 0; i < _file_slot_descs.size(); ++i) {
        int col_idx = _col_idxs[i];
        // col idx is out of range, fill with null format
        auto value = col_idx < _split_values.size()
                             ? _split_values[col_idx]
                             : Slice(_options.null_format, _options.null_len);

        IColumn* col_ptr = columns[i].get();
        if (!_is_load) {
            // Query task: write straight into the output block instead of the
            // scratch columns.
            // block is a Block*, and get_by_position returns a ColumnPtr,
            // which is a const pointer. Therefore, using const_cast is permissible.
            col_ptr = const_cast<IColumn*>(
                    block->get_by_position(_file_slot_idx_map[i]).column.get());
        }

        if (_use_nullable_string_opt[i]) {
            // For load task, we always read "string" from file.
            // So serdes[i] here must be DataTypeNullableSerDe, and DataTypeNullableSerDe -> nested_serde must be DataTypeStringSerDe.
            // So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls.
            RETURN_IF_ERROR(_deserialize_nullable_string(*col_ptr, value));
        } else {
            RETURN_IF_ERROR(_deserialize_one_cell(_serdes[i], *col_ptr, value));
        }
    }
    ++(*rows);

    return Status::OK();
}
| |
| Status CsvReader::_fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, |
| size_t* rows) { |
| for (int i = 0; i < _file_slot_descs.size(); ++i) { |
| IColumn* col_ptr = columns[i].get(); |
| if (!_is_load) { |
| // block is a Block*, and get_by_position returns a ColumnPtr, |
| // which is a const pointer. Therefore, using const_cast is permissible. |
| col_ptr = const_cast<IColumn*>( |
| block->get_by_position(_file_slot_idx_map[i]).column.get()); |
| } |
| auto& null_column = assert_cast<ColumnNullable&>(*col_ptr); |
| null_column.insert_data(nullptr, 0); |
| } |
| ++(*rows); |
| return Status::OK(); |
| } |
| |
| Status CsvReader::_validate_line(const Slice& line, bool* success) { |
| if (!_is_proto_format && !validate_utf8(_params, line.data, line.size)) { |
| if (!_is_load) { |
| return Status::InternalError<false>("Only support csv data in utf8 codec"); |
| } else { |
| _counter->num_rows_filtered++; |
| *success = false; |
| RETURN_IF_ERROR(_state->append_error_msg_to_file( |
| [&]() -> std::string { return std::string(line.data, line.size); }, |
| [&]() -> std::string { |
| return "Invalid file encoding: all CSV files must be UTF-8 encoded"; |
| })); |
| return Status::OK(); |
| } |
| } |
| *success = true; |
| return Status::OK(); |
| } |
| |
// Split `line` into _split_values and, for load tasks only, verify the field
// count against the schema; mismatching rows are counted, logged and filtered
// (*success = false, OK status).
Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
    _split_line(line);

    if (_is_load) {
        // Only check for load task. For query task, the non exist column will be filled "null".
        // if actual column number in csv file is not equal to _file_slot_descs.size()
        // then filter this line.
        bool ignore_col = false;
        // ignore_csv_redundant_col tolerates MORE columns than slots (extras
        // are dropped) but still rejects rows with fewer columns.
        ignore_col = _params.__isset.file_attributes &&
                     _params.file_attributes.__isset.ignore_csv_redundant_col &&
                     _params.file_attributes.ignore_csv_redundant_col;

        if ((!ignore_col && _split_values.size() != _file_slot_descs.size()) ||
            (ignore_col && _split_values.size() < _file_slot_descs.size())) {
            _counter->num_rows_filtered++;
            *success = false;
            // Error-report path only: build a readable message, escaping the
            // separator/delimiter so tabs and newlines stay visible in logs.
            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                    [&]() -> std::string { return std::string(line.data, line.size); },
                    [&]() -> std::string {
                        fmt::memory_buffer error_msg;
                        fmt::format_to(error_msg,
                                       "Column count mismatch: expected {}, but found {}",
                                       _file_slot_descs.size(), _split_values.size());
                        std::string escaped_separator =
                                std::regex_replace(_value_separator, std::regex("\t"), "\\t");
                        std::string escaped_delimiter =
                                std::regex_replace(_line_delimiter, std::regex("\n"), "\\n");
                        fmt::format_to(error_msg, " (sep:{} delim:{}", escaped_separator,
                                       escaped_delimiter);
                        if (_enclose != 0) {
                            fmt::format_to(error_msg, " encl:{}", _enclose);
                        }
                        if (_escape != 0) {
                            fmt::format_to(error_msg, " esc:{}", _escape);
                        }
                        fmt::format_to(error_msg, ")");
                        return fmt::to_string(error_msg);
                    }));
            return Status::OK();
        }
    }

    *success = true;
    return Status::OK();
}
| |
// Discard the previous row's fields and split `line` into _split_values.
// The resulting Slices point into `line`'s buffer (zero copy).
void CsvReader::_split_line(const Slice& line) {
    _split_values.clear();
    _fields_splitter->split_line(line, &_split_values);
}
| |
| Status CsvReader::_parse_col_nums(size_t* col_nums) { |
| const uint8_t* ptr = nullptr; |
| size_t size = 0; |
| RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); |
| if (size == 0) { |
| return Status::InternalError<false>( |
| "The first line is empty, can not parse column numbers"); |
| } |
| if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) { |
| return Status::InternalError<false>("Only support csv data in utf8 codec"); |
| } |
| ptr = _remove_bom(ptr, size); |
| _split_line(Slice(ptr, size)); |
| *col_nums = _split_values.size(); |
| return Status::OK(); |
| } |
| |
| Status CsvReader::_parse_col_names(std::vector<std::string>* col_names) { |
| const uint8_t* ptr = nullptr; |
| size_t size = 0; |
| // no use of _line_reader_eof |
| RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); |
| if (size == 0) { |
| return Status::InternalError<false>("The first line is empty, can not parse column names"); |
| } |
| if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) { |
| return Status::InternalError<false>("Only support csv data in utf8 codec"); |
| } |
| ptr = _remove_bom(ptr, size); |
| _split_line(Slice(ptr, size)); |
| for (auto _split_value : _split_values) { |
| col_names->emplace_back(_split_value.to_string()); |
| } |
| return Status::OK(); |
| } |
| |
| // TODO(ftw): parse type |
| Status CsvReader::_parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types) { |
| // delete after. |
| for (size_t i = 0; i < col_nums; ++i) { |
| col_types->emplace_back(make_nullable(std::make_shared<DataTypeString>())); |
| } |
| |
| // 1. check _line_reader_eof |
| // 2. read line |
| // 3. check utf8 |
| // 4. check size |
| // 5. check _split_values.size must equal to col_nums. |
| // 6. fill col_types |
| return Status::OK(); |
| } |
| |
| const uint8_t* CsvReader::_remove_bom(const uint8_t* ptr, size_t& size) { |
| if (size >= 3 && ptr[0] == 0xEF && ptr[1] == 0xBB && ptr[2] == 0xBF) { |
| LOG(INFO) << "remove bom"; |
| size -= 3; |
| return ptr + 3; |
| } |
| return ptr; |
| } |
| |
| Status CsvReader::close() { |
| if (_line_reader) { |
| _line_reader->close(); |
| } |
| |
| if (_file_reader) { |
| RETURN_IF_ERROR(_file_reader->close()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris::vectorized |