blob: 16b28850ce2b355bf68522e223788fd85f047cc6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "text_reader.h"
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <cstddef>
#include <vector>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "exec/line_reader.h"
#include "io/file_factory.h"
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/s3_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/file_reader/new_plain_text_line_reader.h"
#include "vec/exec/scan/scanner.h"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
// Splits one text line into field slices appended to `splitted_values`.
// A separator of exactly one byte takes the single-character scan; every
// other separator length is handed to the multi-character matcher.
void HiveTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
    if (_value_sep_len != 1) {
        _split_field_multi_char(line, splitted_values);
        return;
    }
    _split_field_single_char(line, splitted_values);
}
// Splits `line` on the one-byte separator `_value_sep[0]` and hands each field
// (start offset + length) to process_value_func, which appends to
// `splitted_values`.
//
// Escape handling: when `_escape_char` is non-zero, an escape byte and the byte
// that follows it are consumed as a pair, so the escaped byte can never act as
// a separator. Consuming the pair (instead of only looking at the byte before
// a separator) keeps an escaped escape character followed by a real separator,
// e.g. `a\\,b`, splitting into two fields rather than being glued together.
// The separator test runs first, so a separator is never swallowed as the
// start of an escape pair. A trailing escape byte with nothing after it is
// treated as ordinary data.
// NOTE(review): this mirrors how Hive's LazyStruct scans fields — confirm
// against the Hive version you target.
void HiveTextFieldSplitter::_split_field_single_char(const Slice& line,
                                                     std::vector<Slice>* splitted_values) {
    const char* data = line.data;
    const size_t size = line.size;
    const char separator = _value_sep[0];
    const bool escaping_enabled = _escape_char != 0;

    size_t value_start = 0;
    size_t i = 0;
    while (i < size) {
        if (data[i] == separator) {
            // Unescaped separator: close the current field.
            process_value_func(data, value_start, i - value_start, _trimming_char,
                               splitted_values);
            value_start = i + _value_sep_len;
            ++i;
        } else if (escaping_enabled && data[i] == _escape_char && i + 1 < size) {
            // Skip the escape byte together with the byte it protects.
            i += 2;
        } else {
            ++i;
        }
    }
    // Whatever follows the last separator (possibly empty) is the final field.
    process_value_func(data, value_start, size - value_start, _trimming_char, splitted_values);
}
// Splits `line` on the multi-byte separator `_value_sep` using a KMP search and
// hands each field (start offset + length) to process_value_func, which appends
// to `splitted_values`.
//
// - An empty separator cannot match anything meaningful, so the whole line is
//   emitted as a single field (previously this path indexed an empty table).
// - A separator occurrence whose preceding byte equals a non-zero
//   `_escape_char` is treated as escaped and does not split.
// - A match that overlaps an already-consumed separator (its start lies before
//   the current field start) is ignored.
// All indices are size_t so that long lines are not truncated.
void HiveTextFieldSplitter::_split_field_multi_char(const Slice& line,
                                                    std::vector<Slice>* splitted_values) {
    const char* data = line.data;
    const size_t size = line.size;
    const size_t sep_len = _value_sep_len;

    if (sep_len == 0) {
        // Defensive: nothing to search for, the line is one field.
        process_value_func(data, 0, size, _trimming_char, splitted_values);
        return;
    }

    // border[i] = length of the longest proper prefix of the separator that is
    // also a suffix of separator[0..i] (the classic KMP failure table).
    std::vector<size_t> border(sep_len, 0);
    for (size_t i = 1, k = 0; i < sep_len; ++i) {
        while (k > 0 && _value_sep[i] != _value_sep[k]) {
            k = border[k - 1];
        }
        if (_value_sep[i] == _value_sep[k]) {
            ++k;
        }
        border[i] = k;
    }

    // KMP search: `matched` is the number of separator bytes currently matched.
    size_t start = 0;
    size_t matched = 0;
    for (size_t i = 0; i < size; ++i) {
        while (matched > 0 && data[i] != _value_sep[matched]) {
            matched = border[matched - 1];
        }
        if (data[i] == _value_sep[matched]) {
            ++matched;
        }
        if (matched != sep_len) {
            continue;
        }

        // Full separator found, ending at i.
        const size_t curpos = i + 1 - sep_len;
        // Fall back so the search keeps going (and `matched` stays < sep_len).
        matched = border[matched - 1];

        if (_escape_char != 0 && curpos > 0 && data[curpos - 1] == _escape_char) {
            // Escaped separator: part of the field content.
            continue;
        }
        if (curpos >= start) {
            process_value_func(data, start, curpos - start, _trimming_char, splitted_values);
            start = curpos + sep_len;
        }
    }
    // Whatever follows the last separator (possibly empty) is the final field.
    process_value_func(data, start, size - start, _trimming_char, splitted_values);
}
// Constructs a TextReader by forwarding every argument unchanged to the
// CsvReader base-class constructor; the constructor body itself does nothing.
TextReader::TextReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx)
: CsvReader(state, profile, counter, params, range, file_slot_descs, io_ctx) {}
// Copies the text-format settings from the scan-range parameters into the
// reader's members and into `_options` (used when deserializing cells).
// Always returns Status::OK().
Status TextReader::_init_options() {
    const auto& text_params = _params.file_attributes.text_params;

    // Column separator and line delimiter, together with their byte lengths.
    _value_separator = text_params.column_separator;
    _value_separator_length = _value_separator.size();
    _line_delimiter = text_params.line_delimiter;
    _line_delimiter_length = _line_delimiter.size();

    // The escape character is optional; keep the current `_escape` when unset.
    if (text_params.__isset.escape) {
        _escape = text_params.escape;
    }
    _options.escape_char = _escape;

    // Only the first byte of each nested-type delimiter is used.
    _options.collection_delim = text_params.collection_delimiter[0];
    _options.map_key_delim = text_params.mapkv_delimiter[0];

    // The null marker is referenced in place (pointer + length), not copied.
    _options.null_format = text_params.null_format.data();
    _options.null_len = text_params.null_format.length();

    return Status::OK();
}
// Deserializes a single cell from `slice` into `column` through the serde's
// Hive-text entry point, passing the reader's `_options`; the serde's status
// is returned as-is.
Status TextReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) {
    auto& cell_serde = *serde;
    return cell_serde.deserialize_one_cell_from_hive_text(column, slice, _options);
}
// Builds the two helpers used to read a file: the field splitter
// (`_fields_splitter`) and the line reader (`_line_reader`).
// Always returns Status::OK().
Status TextReader::_create_line_reader() {
    // Line-reader context configured with the line delimiter and its length.
    std::shared_ptr<TextLineReaderContextIf> line_ctx = std::make_shared<PlainTextLineReaderCtx>(
            _line_delimiter, _line_delimiter_length, false);

    // Field splitter configured with the column separator and escape character.
    _fields_splitter = std::make_unique<HiveTextFieldSplitter>(
            false, false, _value_separator, _value_separator_length, -1, _escape);

    // Line reader over the file reader / decompressor for the assigned range.
    _line_reader = NewPlainTextLineReader::create_unique(
            _profile, _file_reader, _decompressor.get(), line_ctx, _size, _start_offset);

    return Status::OK();
}
// Line validation hook for text files. `line` is not inspected: every line is
// reported as valid through `*success`, and Status::OK() is always returned.
Status TextReader::_validate_line(const Slice& line, bool* success) {
// Text files do not need a UTF-8 check.
*success = true;
return Status::OK();
}
// Deserializes one cell into a nullable string column.
// - A cell equal to the configured null marker becomes NULL.
// - Otherwise the nested column is filled through the string serde's Hive-text
//   path; if that fails the cell becomes NULL, if it succeeds a "not null"
//   entry is appended to the null map.
// Always returns Status::OK(); a failed cell is turned into NULL, not an error.
Status TextReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
    auto& nullable = assert_cast<ColumnNullable&>(column);

    const Slice null_token(_options.null_format, _options.null_len);
    if (slice.compare(null_token) == 0) {
        nullable.insert_data(nullptr, 0);
        return Status::OK();
    }

    static DataTypeStringSerDe string_serde(TYPE_STRING);
    auto status = string_serde.deserialize_one_cell_from_hive_text(nullable.get_nested_column(),
                                                                   slice, _options);
    if (status.ok()) {
        // Nested value written: mark the row as not null.
        nullable.get_null_map_data().push_back(0);
    } else {
        // Deserialization failed: store NULL instead (the length 0 is meaningless here).
        nullable.insert_data(nullptr, 0);
    }
    return Status::OK();
}
#include "common/compile_check_end.h"
} // namespace doris::vectorized