| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <rapidjson/allocators.h> |
| #include <rapidjson/document.h> |
| #include <rapidjson/encodings.h> |
| #include <rapidjson/rapidjson.h> |
| #include <simdjson/common_defs.h> |
| #include <simdjson/simdjson.h> // IWYU pragma: keep |
| |
| #include <memory> |
| #include <string> |
| #include <string_view> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <vector> |
| |
| #include "common/status.h" |
| #include "core/custom_allocator.h" |
| #include "core/string_ref.h" |
| #include "core/types.h" |
| #include "exprs/json_functions.h" |
| #include "format/generic_reader.h" |
| #include "format/line_reader.h" |
| #include "io/file_factory.h" |
| #include "io/fs/file_reader_writer_fwd.h" |
| #include "runtime/runtime_profile.h" |
| #include "util/decompressor.h" |
| |
| namespace simdjson::fallback::ondemand { |
| class object; |
| } // namespace simdjson::fallback::ondemand |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class SlotDescriptor; |
| class RuntimeState; |
| class TFileRangeDesc; |
| class TFileScanRangeParams; |
| |
| namespace io { |
| class FileSystem; |
| struct IOContext; |
| } // namespace io |
| |
| struct ScannerCounter; |
| class Block; |
| class IColumn; |
| |
| // Reader for JSON data exposed through the GenericReader interface (get_next_block, |
| // get_columns, init_schema_reader, get_parsed_schema). The parsing helpers and state declared |
| // below are built on simdjson's on-demand API; rapidjson document/allocator members are |
| // declared as well. |
| // |
| // NOTE: the object embeds two fixed buffers (`_value_buffer` 4MB + `_parse_buffer` 512KB), so |
| // each instance is larger than 4.5MB. |
| class NewJsonReader : public GenericReader { |
| ENABLE_FACTORY_CREATOR(NewJsonReader); |
| |
| public: |
| // Constructor that additionally receives the runtime state, the scanner counter and the |
| // caller-owned `scanner_eof` flag. |
| // The class keeps reference members (`_params`, `_range`, `_file_slot_descs`); assuming they are |
| // bound to the arguments of the same name, those arguments must outlive the reader. |
| // `io_ctx_holder` is an optional shared owner; presumably it keeps `io_ctx` alive - confirm in |
| // the .cpp. |
| NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, |
| const TFileScanRangeParams& params, const TFileRangeDesc& range, |
| const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof, |
| io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder = nullptr); |
| |
| // Overload without RuntimeState / ScannerCounter / scanner_eof. |
| // NOTE(review): presumably the entry point used for schema inference (init_schema_reader / |
| // get_parsed_schema) - confirm against the callers. |
| NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params, |
| const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs, |
| io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder = nullptr); |
| ~NewJsonReader() override = default; |
| |
| // Initialization entry point for reading data. |
| // `col_default_value_ctx` maps a column name to the expression context of its default value. |
| // `is_load`: see the comment on `_is_load` below. |
| Status init_reader( |
| const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx, |
| bool is_load); |
| // GenericReader interface. |
| Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; |
| Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, |
| std::unordered_set<std::string>* missing_cols) override; |
| Status init_schema_reader() override; |
| Status get_parsed_schema(std::vector<std::string>* col_names, |
| std::vector<DataTypePtr>* col_types) override; |
| |
| protected: |
| void _collect_profile_before_close() override; |
| |
| private: |
| // Setup helpers (summarised from their names - see the .cpp for the exact behavior): |
| // scan-range parameters, file system properties / file description, file and line readers, |
| // and parsing of the `jsonpaths` / `json_root` settings. |
| Status _get_range_params(); |
| void _init_system_properties(); |
| void _init_file_description(); |
| Status _open_file_reader(bool need_schema); |
| Status _open_line_reader(); |
| Status _parse_jsonpath_and_json_root(); |
| |
| Status _read_json_column(RuntimeState* state, Block& block, |
| const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, |
| bool* eof); |
| |
| Status _read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size); |
| |
| // StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a |
| // chunked transfer HTTP request, so all of the data has to be read before JSON parsing starts. |
| Status _read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size); |
| |
| // simdjson-based implementation; meant to replace the non-simdjson functions once it is ready. |
| Status _simdjson_init_reader(); |
| Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof, |
| simdjson::error_code* error); |
| Status _get_json_value(size_t* size, bool* eof, simdjson::error_code* error, |
| bool* is_empty_row); |
| Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row); |
| |
| Status _handle_simdjson_error(simdjson::simdjson_error& error, Block& block, size_t num_rows, |
| bool* eof); |
| |
| // The three `_simdjson_handle_*` functions below share one signature, which is also the |
| // signature of `_vhandle_json_callback`. |
| Status _simdjson_handle_simple_json(RuntimeState* state, Block& block, |
| const std::vector<SlotDescriptor*>& slot_descs, |
| bool* is_empty_row, bool* eof); |
| |
| Status _simdjson_handle_simple_json_write_columns( |
| Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, |
| bool* eof); |
| |
| Status _simdjson_handle_flat_array_complex_json(RuntimeState* state, Block& block, |
| const std::vector<SlotDescriptor*>& slot_descs, |
| bool* is_empty_row, bool* eof); |
| |
| Status _simdjson_handle_flat_array_complex_json_write_columns( |
| Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, |
| bool* eof); |
| |
| Status _simdjson_handle_nested_complex_json(RuntimeState* state, Block& block, |
| const std::vector<SlotDescriptor*>& slot_descs, |
| bool* is_empty_row, bool* eof); |
| |
| Status _simdjson_set_column_value(simdjson::ondemand::object* value, Block& block, |
| const std::vector<SlotDescriptor*>& slot_descs, bool* valid); |
| |
| template <bool use_string_cache> |
| Status _simdjson_write_data_to_column(simdjson::ondemand::value& value, |
| const DataTypePtr& type_desc, IColumn* column_ptr, |
| const std::string& column_name, DataTypeSerDeSPtr serde, |
| bool* valid); |
| |
| Status _simdjson_write_columns_by_jsonpath(simdjson::ondemand::object* value, |
| const std::vector<SlotDescriptor*>& slot_descs, |
| Block& block, bool* valid); |
| Status _append_error_msg(simdjson::ondemand::object* obj, std::string error_msg, |
| std::string col_name, bool* valid); |
| |
| size_t _column_index(const StringRef& name, size_t key_index); |
| |
| // Pointer to the selected `_simdjson_handle_*` member function. Null until one is assigned. |
| Status (NewJsonReader::*_vhandle_json_callback)(RuntimeState* state, Block& block, |
| const std::vector<SlotDescriptor*>& slot_descs, |
| bool* is_empty_row, bool* eof) = nullptr; |
| Status _get_column_default_value( |
| const std::vector<SlotDescriptor*>& slot_descs, |
| const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx); |
| |
| Status _fill_missing_column(SlotDescriptor* slot_desc, DataTypeSerDeSPtr serde, |
| IColumn* column_ptr, bool* valid); |
| |
| // FE adds skip_bitmap_col to _file_slot_descs iff the target olap table has a skip_bitmap_col |
| // and the current load is a flexible partial update. |
| // Flexible partial update can not be used when the user specifies jsonpaths, so the skip bitmap |
| // is only filled in `_simdjson_handle_simple_json` and `_vhandle_simple_json` (used when |
| // jsonpaths is not specified). |
| // NOTE(review): `_vhandle_simple_json` is not declared in this class; the reference looks stale. |
| bool _should_process_skip_bitmap_col() const { return skip_bitmap_col_idx != -1; } |
| void _append_empty_skip_bitmap_value(Block& block, size_t cur_row_count); |
| void _set_skip_bitmap_mark(SlotDescriptor* slot_desc, IColumn* column_ptr, Block& block, |
| size_t cur_row_count, bool* valid); |
| |
| // ======Data members====== |
| // Scalar and pointer members carry in-class default initializers so that no constructor path |
| // can leave them indeterminate; a constructor mem-initializer still takes precedence. |
| RuntimeState* _state = nullptr; |
| RuntimeProfile* _profile = nullptr; |
| ScannerCounter* _counter = nullptr; |
| // Reference members: the referenced objects are not owned and must outlive this reader. |
| const TFileScanRangeParams& _params; |
| const TFileRangeDesc& _range; |
| io::FileSystemProperties _system_properties; |
| io::FileDescription _file_description; |
| const std::vector<SlotDescriptor*>& _file_slot_descs; |
| |
| io::FileReaderSPtr _file_reader; |
| std::unique_ptr<LineReader> _line_reader; |
| bool _reader_eof = false; |
| std::unique_ptr<Decompressor> _decompressor; |
| // No in-class default is given for this member; it has to be assigned before it is read. |
| TFileCompressType::type _file_compress_type; |
| |
| // When the fetched range does not start at offset 0, the first line is always skipped. |
| bool _skip_first_line = false; |
| |
| std::string _line_delimiter; |
| size_t _line_delimiter_length = 0; |
| |
| uint32_t _next_row = 0; |
| size_t _total_rows = 0; |
| |
| std::string _jsonpaths; |
| std::string _json_root; |
| bool _read_json_by_line = false; |
| bool _strip_outer_array = false; |
| bool _num_as_string = false; |
| bool _fuzzy_parse = false; |
| |
| std::vector<std::vector<JsonPath>> _parsed_jsonpaths; |
| std::vector<JsonPath> _parsed_json_root; |
| bool _parsed_from_json_root = false; // to avoid parsing json root multiple times |
| |
| // Fixed in-object buffers. |
| // NOTE(review): presumably the backing storage handed to `_value_allocator` / |
| // `_parse_allocator` - confirm in the constructor. |
| char _value_buffer[4 * 1024 * 1024]; // 4MB |
| char _parse_buffer[512 * 1024]; // 512KB |
| |
| using Document = rapidjson::GenericDocument<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>, |
| rapidjson::MemoryPoolAllocator<>>; |
| rapidjson::MemoryPoolAllocator<> _value_allocator; |
| rapidjson::MemoryPoolAllocator<> _parse_allocator; |
| Document _origin_json_doc; // origin json document object from parsed json string |
| // Original note: "_json_doc equals _final_json_doc iff not set `json_root`". |
| // NOTE(review): `_final_json_doc` is not a member of this class; presumably `_origin_json_doc` |
| // is meant - confirm. |
| rapidjson::Value* _json_doc = nullptr; |
| std::unordered_map<std::string, int> _name_map; |
| |
| // Caller-owned flag passed to the first constructor overload (not owned by the reader). |
| bool* _scanner_eof = nullptr; |
| |
| size_t _current_offset = 0; |
| |
| io::IOContext* _io_ctx = nullptr; |
| std::shared_ptr<io::IOContext> _io_ctx_holder; |
| |
| RuntimeProfile::Counter* _read_timer = nullptr; |
| |
| // ======SIMD JSON====== |
| // name mapping |
| /// Hash table match `field name -> position in the block`. NOTE You can use perfect hash map. |
| using NameMap = phmap::flat_hash_map<StringRef, size_t, StringRefHash>; |
| NameMap _slot_desc_index; |
| /// Cached search results for previous row (keyed as index in JSON object) - used as a hint. |
| std::vector<NameMap::iterator> _prev_positions; |
| /// Set of columns which were already met in the row. Exception is thrown if there is more than one column with the same name. |
| std::vector<UInt8> _seen_columns; |
| // simdjson |
| DorisUniqueBufferPtr<uint8_t> _json_str_ptr; |
| const uint8_t* _json_str = nullptr; |
| // Initial buffer size is 8MB; `_padded_size` starts as that size plus simdjson's padding. |
| static constexpr size_t _init_buffer_size = 1024 * 1024 * 8; |
| size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING; |
| size_t _original_doc_size = 0; |
| std::string _simdjson_ondemand_padding_buffer; |
| std::string _simdjson_ondemand_unscape_padding_buffer; |
| // char _simdjson_ondemand_padding_buffer[_padded_size]; |
| simdjson::ondemand::document _original_json_doc; |
| simdjson::ondemand::value _json_value; |
| // for strip outer array |
| // `_array_iter` points into `_array` |
| simdjson::ondemand::array_iterator _array_iter; |
| simdjson::ondemand::array _array; |
| std::unique_ptr<simdjson::ondemand::parser> _ondemand_json_parser; |
| // column to default value string map |
| std::unordered_map<std::string, std::string> _col_default_value_map; |
| |
| // From the simdjson documentation: |
| // ``` |
| // Important: a value should be consumed once. Calling get_string() twice on the same value is an error. |
| // ``` |
| // The string_views are therefore cached to avoid multiple get_string() calls. |
| // Both functors below work on the identity of the view (the address of its first character, |
| // plus its length for equality), not on the characters it refers to. |
| struct StringViewHash { |
| size_t operator()(const std::string_view& str) const { |
| return std::hash<int64_t>()(reinterpret_cast<int64_t>(str.data())); |
| } |
| }; |
| struct StringViewEqual { |
| bool operator()(const std::string_view& lhs, const std::string_view& rhs) const { |
| return lhs.data() == rhs.data() && lhs.size() == rhs.size(); |
| } |
| }; |
| std::unordered_map<std::string_view, std::string_view, StringViewHash, StringViewEqual> |
| _cached_string_values; |
| |
| // -1 means "no skip bitmap column"; see `_should_process_skip_bitmap_col()`. |
| int32_t skip_bitmap_col_idx {-1}; |
| |
| // Used to indicate whether it is a stream load. When loading, only data will be inserted into columnString. |
| // If an illegal value is encountered during the load process, `_append_error_msg` should be called |
| // instead of directly returning `Status::DataQualityError`. |
| bool _is_load = true; |
| |
| // In hive : create table xxx ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'; |
| // Hive will not allow you to create columns with the same name but different case, including field names inside |
| // structs, and will automatically convert uppercase names in create sql to lowercase. However, when Hive loads data |
| // to a table, the column names in the data may be uppercase, and there may be multiple columns with |
| // the same name but different capitalization. We follow the behavior of hive: convert all column names |
| // in the data to lowercase, and use the last one as the insertion value. |
| bool _is_hive_table = false; |
| |
| // hive : org.openx.data.jsonserde.JsonSerDe, `ignore.malformed.json` prop. |
| // If the variable is true, `null` will be inserted for illegal json format instead of returning an error. |
| bool _openx_json_ignore_malformed = false; |
| |
| DataTypeSerDeSPtrs _serdes; |
| DataTypeSerDe::FormatOptions _serde_options; |
| }; |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris |