blob: cecfcf3f0dcf540cb29794a9949e44f779bd20e7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "format/json/new_json_reader.h"
#include <fmt/format.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <rapidjson/error/en.h>
#include <rapidjson/reader.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <simdjson/simdjson.h> // IWYU pragma: keep
#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <cstring>
#include <map>
#include <memory>
#include <string_view>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/status.h"
#include "core/assert_cast.h"
#include "core/block/column_with_type_and_name.h"
#include "core/column/column.h"
#include "core/column/column_array.h"
#include "core/column/column_map.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/column/column_struct.h"
#include "core/custom_allocator.h"
#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_factory.hpp"
#include "core/data_type/data_type_map.h"
#include "core/data_type/data_type_number.h" // IWYU pragma: keep
#include "core/data_type/data_type_struct.h"
#include "core/data_type/define_primitive_type.h"
#include "exec/scan/scanner.h"
#include "exprs/json_functions.h"
#include "format/file_reader/new_plain_text_line_reader.h"
#include "io/file_factory.h"
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/stream_load_pipe.h"
#include "io/fs/tracing_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "util/slice.h"
namespace doris::io {
struct IOContext;
enum class FileCachePolicy : uint8_t;
} // namespace doris::io
namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;
NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof,
io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder)
: _vhandle_json_callback(nullptr),
_state(state),
_profile(profile),
_counter(counter),
_params(params),
_range(range),
_file_slot_descs(file_slot_descs),
_file_reader(nullptr),
_line_reader(nullptr),
_reader_eof(false),
_decompressor(nullptr),
_skip_first_line(false),
_next_row(0),
_total_rows(0),
_value_allocator(_value_buffer, sizeof(_value_buffer)),
_parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
_origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator),
_scanner_eof(scanner_eof),
_current_offset(0),
_io_ctx(io_ctx),
_io_ctx_holder(std::move(io_ctx_holder)) {
if (_io_ctx == nullptr && _io_ctx_holder) {
_io_ctx = _io_ctx_holder.get();
}
_read_timer = ADD_TIMER(_profile, "ReadTime");
if (_range.__isset.compress_type) {
// for compatibility
_file_compress_type = _range.compress_type;
} else {
_file_compress_type = _params.compress_type;
}
_init_system_properties();
_init_file_description();
}
NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs,
io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder)
: _vhandle_json_callback(nullptr),
_state(nullptr),
_profile(profile),
_params(params),
_range(range),
_file_slot_descs(file_slot_descs),
_line_reader(nullptr),
_reader_eof(false),
_decompressor(nullptr),
_skip_first_line(false),
_next_row(0),
_total_rows(0),
_value_allocator(_value_buffer, sizeof(_value_buffer)),
_parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
_origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator),
_io_ctx(io_ctx),
_io_ctx_holder(std::move(io_ctx_holder)) {
if (_io_ctx == nullptr && _io_ctx_holder) {
_io_ctx = _io_ctx_holder.get();
}
if (_range.__isset.compress_type) {
// for compatibility
_file_compress_type = _range.compress_type;
} else {
_file_compress_type = _params.compress_type;
}
_init_system_properties();
_init_file_description();
}
void NewJsonReader::_init_system_properties() {
if (_range.__isset.file_type) {
// for compatibility
_system_properties.system_type = _range.file_type;
} else {
_system_properties.system_type = _params.file_type;
}
_system_properties.properties = _params.properties;
_system_properties.hdfs_params = _params.hdfs_params;
if (_params.__isset.broker_addresses) {
_system_properties.broker_addresses.assign(_params.broker_addresses.begin(),
_params.broker_addresses.end());
}
}
void NewJsonReader::_init_file_description() {
_file_description.path = _range.path;
_file_description.file_size = _range.__isset.file_size ? _range.file_size : -1;
if (_range.__isset.fs_name) {
_file_description.fs_name = _range.fs_name;
}
if (_range.__isset.file_cache_admission) {
_file_description.file_cache_admission = _range.file_cache_admission;
}
}
Status NewJsonReader::init_reader(
const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx,
bool is_load) {
_is_load = is_load;
// generate _col_default_value_map
RETURN_IF_ERROR(_get_column_default_value(_file_slot_descs, col_default_value_ctx));
//use serde insert data to column.
for (auto* slot_desc : _file_slot_descs) {
_serdes.emplace_back(slot_desc->get_data_type_ptr()->get_serde());
}
// create decompressor.
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
RETURN_IF_ERROR(_simdjson_init_reader());
return Status::OK();
}
Status NewJsonReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (_reader_eof) {
*eof = true;
return Status::OK();
}
const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE);
while (block->rows() < batch_size && !_reader_eof) {
if (UNLIKELY(_read_json_by_line && _skip_first_line)) {
size_t size = 0;
const uint8_t* line_ptr = nullptr;
RETURN_IF_ERROR(_line_reader->read_line(&line_ptr, &size, &_reader_eof, _io_ctx));
_skip_first_line = false;
continue;
}
bool is_empty_row = false;
RETURN_IF_ERROR(
_read_json_column(_state, *block, _file_slot_descs, &is_empty_row, &_reader_eof));
if (is_empty_row) {
// Read empty row, just continue
continue;
}
++(*read_rows);
}
return Status::OK();
}
Status NewJsonReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
for (const auto& slot : _file_slot_descs) {
name_to_type->emplace(slot->col_name(), slot->type());
}
return Status::OK();
}
// init decompressor, file reader and line reader for parsing schema
Status NewJsonReader::init_schema_reader() {
RETURN_IF_ERROR(_get_range_params());
// create decompressor.
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
RETURN_IF_ERROR(_open_file_reader(true));
if (_read_json_by_line) {
RETURN_IF_ERROR(_open_line_reader());
}
// generate _parsed_jsonpaths and _parsed_json_root
RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
return Status::OK();
}
Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) {
bool eof = false;
const uint8_t* json_str = nullptr;
DorisUniqueBufferPtr<uint8_t> json_str_ptr;
size_t size = 0;
if (_line_reader != nullptr) {
RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof, _io_ctx));
} else {
size_t read_size = 0;
RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &read_size));
json_str = json_str_ptr.get();
size = read_size;
if (read_size == 0) {
eof = true;
}
}
if (size == 0 || eof) {
return Status::EndOfFile("Empty file.");
}
// clear memory here.
_value_allocator.Clear();
_parse_allocator.Clear();
bool has_parse_error = false;
// parse jsondata to JsonDoc
// As the issue: https://github.com/Tencent/rapidjson/issues/1458
// Now, rapidjson only support uint64_t, So lagreint load cause bug. We use kParseNumbersAsStringsFlag.
if (_num_as_string) {
has_parse_error =
_origin_json_doc.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, size)
.HasParseError();
} else {
has_parse_error = _origin_json_doc.Parse((char*)json_str, size).HasParseError();
}
if (has_parse_error) {
return Status::DataQualityError(
"Parse json data for JsonDoc failed. code: {}, error info: {}",
_origin_json_doc.GetParseError(),
rapidjson::GetParseError_En(_origin_json_doc.GetParseError()));
}
// set json root
if (!_parsed_json_root.empty()) {
_json_doc = JsonFunctions::get_json_object_from_parsed_json(
_parsed_json_root, &_origin_json_doc, _origin_json_doc.GetAllocator());
if (_json_doc == nullptr) {
return Status::DataQualityError("JSON Root not found.");
}
} else {
_json_doc = &_origin_json_doc;
}
if (_json_doc->IsArray() && !_strip_outer_array) {
return Status::DataQualityError(
"JSON data is array-object, `strip_outer_array` must be TRUE.");
}
if (!_json_doc->IsArray() && _strip_outer_array) {
return Status::DataQualityError(
"JSON data is not an array-object, `strip_outer_array` must be FALSE.");
}
rapidjson::Value* objectValue = nullptr;
if (_json_doc->IsArray()) {
if (_json_doc->Size() == 0) {
// may be passing an empty json, such as "[]"
return Status::InternalError<false>("Empty first json line");
}
objectValue = &(*_json_doc)[0];
} else {
objectValue = _json_doc;
}
if (!objectValue->IsObject()) {
return Status::DataQualityError("JSON data is not an object. but: {}",
objectValue->GetType());
}
// use jsonpaths to col_names
if (!_parsed_jsonpaths.empty()) {
for (auto& _parsed_jsonpath : _parsed_jsonpaths) {
size_t len = _parsed_jsonpath.size();
if (len == 0) {
return Status::InvalidArgument("It's invalid jsonpaths.");
}
std::string key = _parsed_jsonpath[len - 1].key;
col_names->emplace_back(key);
col_types->emplace_back(
DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true));
}
return Status::OK();
}
for (int i = 0; i < objectValue->MemberCount(); ++i) {
auto it = objectValue->MemberBegin() + i;
col_names->emplace_back(it->name.GetString());
col_types->emplace_back(make_nullable(std::make_shared<DataTypeString>()));
}
return Status::OK();
}
Status NewJsonReader::_get_range_params() {
if (!_params.__isset.file_attributes) {
return Status::InternalError<false>("BE cat get file_attributes");
}
// get line_delimiter
if (_params.file_attributes.__isset.text_params &&
_params.file_attributes.text_params.__isset.line_delimiter) {
_line_delimiter = _params.file_attributes.text_params.line_delimiter;
_line_delimiter_length = _line_delimiter.size();
}
if (_params.file_attributes.__isset.jsonpaths) {
_jsonpaths = _params.file_attributes.jsonpaths;
}
if (_params.file_attributes.__isset.json_root) {
_json_root = _params.file_attributes.json_root;
}
if (_params.file_attributes.__isset.read_json_by_line) {
_read_json_by_line = _params.file_attributes.read_json_by_line;
}
if (_params.file_attributes.__isset.strip_outer_array) {
_strip_outer_array = _params.file_attributes.strip_outer_array;
}
if (_params.file_attributes.__isset.num_as_string) {
_num_as_string = _params.file_attributes.num_as_string;
}
if (_params.file_attributes.__isset.fuzzy_parse) {
_fuzzy_parse = _params.file_attributes.fuzzy_parse;
}
if (_range.table_format_params.table_format_type == "hive") {
_is_hive_table = true;
}
if (_params.file_attributes.__isset.openx_json_ignore_malformed) {
_openx_json_ignore_malformed = _params.file_attributes.openx_json_ignore_malformed;
}
return Status::OK();
}
static Status ignore_malformed_json_append_null(Block& block) {
for (auto& column : block.get_columns()) {
if (!column->is_nullable()) [[unlikely]] {
return Status::DataQualityError("malformed json, but the column `{}` is not nullable.",
column->get_name());
}
static_cast<ColumnNullable*>(column->assume_mutable().get())->insert_default();
}
return Status::OK();
}
Status NewJsonReader::_open_file_reader(bool need_schema) {
int64_t start_offset = _range.start_offset;
if (start_offset != 0) {
start_offset -= 1;
}
_current_offset = start_offset;
if (_params.file_type == TFileType::FILE_STREAM) {
// Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
need_schema));
} else {
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
io::FileReaderSPtr file_reader;
if (_io_ctx_holder) {
file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::SEQUENTIAL,
std::static_pointer_cast<const io::IOContext>(_io_ctx_holder),
io::PrefetchRange(_range.start_offset, _range.size)));
} else {
file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
io::PrefetchRange(_range.start_offset, _range.size)));
}
_file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
_io_ctx->file_reader_stats)
: file_reader;
}
return Status::OK();
}
Status NewJsonReader::_open_line_reader() {
int64_t size = _range.size;
if (_range.start_offset != 0) {
// When we fetch range doesn't start from 0, size will += 1.
size += 1;
_skip_first_line = true;
} else {
_skip_first_line = false;
}
_line_reader = NewPlainTextLineReader::create_unique(
_profile, _file_reader, _decompressor.get(),
std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length,
false),
size, _current_offset);
return Status::OK();
}
Status NewJsonReader::_parse_jsonpath_and_json_root() {
// parse jsonpaths
if (!_jsonpaths.empty()) {
rapidjson::Document jsonpaths_doc;
if (!jsonpaths_doc.Parse(_jsonpaths.c_str(), _jsonpaths.length()).HasParseError()) {
if (!jsonpaths_doc.IsArray()) {
return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths);
}
for (int i = 0; i < jsonpaths_doc.Size(); i++) {
const rapidjson::Value& path = jsonpaths_doc[i];
if (!path.IsString()) {
return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths);
}
std::string json_path = path.GetString();
// $ -> $. in json_path
if (UNLIKELY(json_path.size() == 1 && json_path[0] == '$')) {
json_path.insert(1, ".");
}
std::vector<JsonPath> parsed_paths;
JsonFunctions::parse_json_paths(json_path, &parsed_paths);
_parsed_jsonpaths.push_back(std::move(parsed_paths));
}
} else {
return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths);
}
}
// parse jsonroot
if (!_json_root.empty()) {
std::string json_root = _json_root;
// $ -> $. in json_root
if (json_root.size() == 1 && json_root[0] == '$') {
json_root.insert(1, ".");
}
JsonFunctions::parse_json_paths(json_root, &_parsed_json_root);
}
return Status::OK();
}
Status NewJsonReader::_read_json_column(RuntimeState* state, Block& block,
const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
return (this->*_vhandle_json_callback)(state, block, slot_descs, is_empty_row, eof);
}
Status NewJsonReader::_read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf,
size_t* read_size) {
switch (_params.file_type) {
case TFileType::FILE_LOCAL:
[[fallthrough]];
case TFileType::FILE_HDFS:
case TFileType::FILE_HTTP:
[[fallthrough]];
case TFileType::FILE_S3: {
size_t file_size = _file_reader->size();
*file_buf = make_unique_buffer<uint8_t>(file_size);
Slice result(file_buf->get(), file_size);
RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, read_size, _io_ctx));
_current_offset += *read_size;
break;
}
case TFileType::FILE_STREAM: {
RETURN_IF_ERROR(_read_one_message_from_pipe(file_buf, read_size));
break;
}
default: {
return Status::NotSupported<false>("no supported file reader type: {}", _params.file_type);
}
}
return Status::OK();
}
Status NewJsonReader::_read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf,
size_t* read_size) {
auto* stream_load_pipe = dynamic_cast<io::StreamLoadPipe*>(_file_reader.get());
// first read: read from the pipe once.
RETURN_IF_ERROR(stream_load_pipe->read_one_message(file_buf, read_size));
// When the file is not chunked, the entire file has already been read.
if (!stream_load_pipe->is_chunked_transfer()) {
return Status::OK();
}
std::vector<uint8_t> buf;
uint64_t cur_size = 0;
// second read: continuously read data from the pipe until all data is read.
DorisUniqueBufferPtr<uint8_t> read_buf;
size_t read_buf_size = 0;
while (true) {
RETURN_IF_ERROR(stream_load_pipe->read_one_message(&read_buf, &read_buf_size));
if (read_buf_size == 0) {
break;
} else {
buf.insert(buf.end(), read_buf.get(), read_buf.get() + read_buf_size);
cur_size += read_buf_size;
read_buf_size = 0;
read_buf.reset();
}
}
// No data is available during the second read.
if (cur_size == 0) {
return Status::OK();
}
DorisUniqueBufferPtr<uint8_t> total_buf = make_unique_buffer<uint8_t>(cur_size + *read_size);
// copy the data during the first read
memcpy(total_buf.get(), file_buf->get(), *read_size);
// copy the data during the second read
memcpy(total_buf.get() + *read_size, buf.data(), cur_size);
*file_buf = std::move(total_buf);
*read_size += cur_size;
return Status::OK();
}
// ---------SIMDJSON----------
// simdjson, replace none simdjson function if it is ready
Status NewJsonReader::_simdjson_init_reader() {
RETURN_IF_ERROR(_get_range_params());
RETURN_IF_ERROR(_open_file_reader(false));
if (LIKELY(_read_json_by_line)) {
RETURN_IF_ERROR(_open_line_reader());
}
// generate _parsed_jsonpaths and _parsed_json_root
RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
//improve performance
if (_parsed_jsonpaths.empty()) { // input is a simple json-string
_vhandle_json_callback = &NewJsonReader::_simdjson_handle_simple_json;
} else { // input is a complex json-string and a json-path
if (_strip_outer_array) {
_vhandle_json_callback = &NewJsonReader::_simdjson_handle_flat_array_complex_json;
} else {
_vhandle_json_callback = &NewJsonReader::_simdjson_handle_nested_complex_json;
}
}
_ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
for (int i = 0; i < _file_slot_descs.size(); ++i) {
_slot_desc_index[StringRef {_file_slot_descs[i]->col_name()}] = i;
if (_file_slot_descs[i]->is_skip_bitmap_col()) {
skip_bitmap_col_idx = i;
}
}
_simdjson_ondemand_padding_buffer.resize(_padded_size);
_simdjson_ondemand_unscape_padding_buffer.resize(_padded_size);
return Status::OK();
}
Status NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, Block& block,
size_t num_rows, bool* eof) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Parse json data failed. code: {}, error info: {}", error.error(),
error.what());
_counter->num_rows_filtered++;
// Before continuing to process other rows, we need to first clean the fail parsed row.
for (int i = 0; i < block.columns(); ++i) {
auto column = block.get_by_position(i).column->assume_mutable();
if (column->size() > num_rows) {
column->pop_back(column->size() - num_rows);
}
}
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return std::string(_simdjson_ondemand_padding_buffer.data(), _original_doc_size);
},
[&]() -> std::string { return fmt::to_string(error_msg); }));
return Status::OK();
}
Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Block& block,
const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
// simple json
size_t size = 0;
simdjson::error_code error;
size_t num_rows = block.rows();
try {
// step1: get and parse buf to get json doc
RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error));
if (size == 0 || *eof) {
*is_empty_row = true;
return Status::OK();
}
// step2: get json value by json doc
Status st = _get_json_value(&size, eof, &error, is_empty_row);
if (st.is<DATA_QUALITY_ERROR>()) {
if (_is_load) {
return Status::OK();
} else if (_openx_json_ignore_malformed) {
RETURN_IF_ERROR(ignore_malformed_json_append_null(block));
return Status::OK();
}
}
RETURN_IF_ERROR(st);
if (*is_empty_row || *eof) {
return Status::OK();
}
// step 3: write columns by json value
RETURN_IF_ERROR(
_simdjson_handle_simple_json_write_columns(block, slot_descs, is_empty_row, eof));
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
// When _scanner_eof is true and valid is false, it means that we have encountered
// unqualified data and decided to stop the scan.
*is_empty_row = true;
return Status::OK();
}
}
return Status::OK();
}
Status NewJsonReader::_simdjson_handle_simple_json_write_columns(
Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
bool* eof) {
simdjson::ondemand::object objectValue;
size_t num_rows = block.rows();
bool valid = false;
try {
if (_json_value.type() == simdjson::ondemand::json_type::array) {
_array = _json_value.get_array();
if (_array.count_elements() == 0) {
// may be passing an empty json, such as "[]"
RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json line", "", nullptr));
if (*_scanner_eof) {
*is_empty_row = true;
return Status::OK();
}
return Status::OK();
}
_array_iter = _array.begin();
while (true) {
objectValue = *_array_iter;
RETURN_IF_ERROR(
_simdjson_set_column_value(&objectValue, block, slot_descs, &valid));
if (!valid) {
if (*_scanner_eof) {
// When _scanner_eof is true and valid is false, it means that we have encountered
// unqualified data and decided to stop the scan.
*is_empty_row = true;
return Status::OK();
}
}
++_array_iter;
if (_array_iter == _array.end()) {
// Hint to read next json doc
break;
}
}
} else {
objectValue = _json_value;
RETURN_IF_ERROR(_simdjson_set_column_value(&objectValue, block, slot_descs, &valid));
if (!valid) {
if (*_scanner_eof) {
*is_empty_row = true;
return Status::OK();
}
}
*is_empty_row = false;
}
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (!valid) {
if (*_scanner_eof) {
*is_empty_row = true;
return Status::OK();
}
}
}
return Status::OK();
}
Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
// array complex json
size_t size = 0;
simdjson::error_code error;
size_t num_rows = block.rows();
try {
// step1: get and parse buf to get json doc
RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error));
if (size == 0 || *eof) {
*is_empty_row = true;
return Status::OK();
}
// step2: get json value by json doc
Status st = _get_json_value(&size, eof, &error, is_empty_row);
if (st.is<DATA_QUALITY_ERROR>()) {
return Status::OK();
}
RETURN_IF_ERROR(st);
if (*is_empty_row) {
return Status::OK();
}
// step 3: write columns by json value
RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block, slot_descs,
is_empty_row, eof));
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
// When _scanner_eof is true and valid is false, it means that we have encountered
// unqualified data and decided to stop the scan.
*is_empty_row = true;
return Status::OK();
}
}
return Status::OK();
}
Status NewJsonReader::_simdjson_handle_flat_array_complex_json_write_columns(
Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
bool* eof) {
// Advance one row in array list, if it is the endpoint, stop advance and break the loop
#define ADVANCE_ROW() \
++_array_iter; \
if (_array_iter == _array.end()) { \
break; \
}
simdjson::ondemand::object cur;
size_t num_rows = block.rows();
try {
bool valid = true;
_array = _json_value.get_array();
_array_iter = _array.begin();
while (true) {
cur = (*_array_iter).get_object();
// extract root
if (!_parsed_from_json_root && !_parsed_json_root.empty()) {
simdjson::ondemand::value val;
Status st = JsonFunctions::extract_from_object(cur, _parsed_json_root, &val);
if (UNLIKELY(!st.ok())) {
if (st.is<NOT_FOUND>()) {
RETURN_IF_ERROR(_append_error_msg(nullptr, st.to_string(), "", nullptr));
ADVANCE_ROW();
continue;
}
return st;
}
if (val.type() != simdjson::ondemand::json_type::object) {
RETURN_IF_ERROR(_append_error_msg(nullptr, "Not object item", "", nullptr));
ADVANCE_ROW();
continue;
}
cur = val.get_object();
}
RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur, slot_descs, block, &valid));
ADVANCE_ROW();
if (!valid) {
continue; // process next line
}
*is_empty_row = false;
}
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
// When _scanner_eof is true and valid is false, it means that we have encountered
// unqualified data and decided to stop the scan.
*is_empty_row = true;
return Status::OK();
}
}
return Status::OK();
}
Status NewJsonReader::_simdjson_handle_nested_complex_json(
RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row, bool* eof) {
// nested complex json
while (true) {
size_t num_rows = block.rows();
simdjson::ondemand::object cur;
size_t size = 0;
simdjson::error_code error;
try {
RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error));
if (size == 0 || *eof) {
*is_empty_row = true;
return Status::OK();
}
Status st = _get_json_value(&size, eof, &error, is_empty_row);
if (st.is<DATA_QUALITY_ERROR>()) {
continue; // continue to read next
}
RETURN_IF_ERROR(st);
if (*is_empty_row) {
return Status::OK();
}
*is_empty_row = false;
bool valid = true;
if (_json_value.type() != simdjson::ondemand::json_type::object) {
RETURN_IF_ERROR(_append_error_msg(nullptr, "Not object item", "", nullptr));
continue;
}
cur = _json_value.get_object();
st = _simdjson_write_columns_by_jsonpath(&cur, slot_descs, block, &valid);
if (!st.ok()) {
RETURN_IF_ERROR(_append_error_msg(nullptr, st.to_string(), "", nullptr));
// Before continuing to process other rows, we need to first clean the fail parsed row.
for (int i = 0; i < block.columns(); ++i) {
auto column = block.get_by_position(i).column->assume_mutable();
if (column->size() > num_rows) {
column->pop_back(column->size() - num_rows);
}
}
continue;
}
if (!valid) {
// there is only one line in this case, so if it return false, just set is_empty_row true
// so that the caller will continue reading next line.
*is_empty_row = true;
}
break; // read a valid row
} catch (simdjson::simdjson_error& e) {
RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
if (*_scanner_eof) {
// When _scanner_eof is true and valid is false, it means that we have encountered
// unqualified data and decided to stop the scan.
*is_empty_row = true;
return Status::OK();
}
continue;
}
}
return Status::OK();
}
size_t NewJsonReader::_column_index(const StringRef& name, size_t key_index) {
/// Optimization by caching the order of fields (which is almost always the same)
/// and a quick check to match the next expected field, instead of searching the hash table.
if (_prev_positions.size() > key_index && name == _prev_positions[key_index]->first) {
return _prev_positions[key_index]->second;
}
auto it = _slot_desc_index.find(name);
if (it != _slot_desc_index.end()) {
if (key_index < _prev_positions.size()) {
_prev_positions[key_index] = it;
}
return it->second;
}
return size_t(-1);
}
Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* value, Block& block,
const std::vector<SlotDescriptor*>& slot_descs,
bool* valid) {
// set
_seen_columns.assign(block.columns(), false);
size_t cur_row_count = block.rows();
bool has_valid_value = false;
// iterate through object, simdjson::ondemond will parsing on the fly
size_t key_index = 0;
for (auto field : *value) {
std::string_view key = field.unescaped_key();
StringRef name_ref(key.data(), key.size());
std::string key_string;
if (_is_hive_table) {
key_string = name_ref.to_string();
std::transform(key_string.begin(), key_string.end(), key_string.begin(), ::tolower);
name_ref = StringRef(key_string);
}
const size_t column_index = _column_index(name_ref, key_index++);
if (UNLIKELY(ssize_t(column_index) < 0)) {
// This key is not exist in slot desc, just ignore
continue;
}
if (column_index == skip_bitmap_col_idx) {
continue;
}
if (_seen_columns[column_index]) {
if (_is_hive_table) {
//Since value can only be traversed once,
// we can only insert the original value first, then delete it, and then reinsert the new value
block.get_by_position(column_index).column->assume_mutable()->pop_back(1);
} else {
continue;
}
}
simdjson::ondemand::value val = field.value();
auto* column_ptr = block.get_by_position(column_index).column->assume_mutable().get();
RETURN_IF_ERROR(_simdjson_write_data_to_column<false>(
val, slot_descs[column_index]->type(), column_ptr,
slot_descs[column_index]->col_name(), _serdes[column_index], valid));
if (!(*valid)) {
return Status::OK();
}
_seen_columns[column_index] = true;
has_valid_value = true;
}
if (!has_valid_value && _is_load) {
std::string col_names;
for (auto* slot_desc : slot_descs) {
col_names.append(slot_desc->col_name() + ", ");
}
RETURN_IF_ERROR(_append_error_msg(value,
"There is no column matching jsonpaths in the json file, "
"columns:[{}], please check columns "
"and jsonpaths:" +
_jsonpaths,
col_names, valid));
return Status::OK();
}
if (_should_process_skip_bitmap_col()) {
_append_empty_skip_bitmap_value(block, cur_row_count);
}
// fill missing slot
int nullcount = 0;
for (size_t i = 0; i < slot_descs.size(); ++i) {
if (_seen_columns[i]) {
continue;
}
if (i == skip_bitmap_col_idx) {
continue;
}
auto* slot_desc = slot_descs[i];
auto* column_ptr = block.get_by_position(i).column->assume_mutable().get();
// Quick path to insert default value, instead of using default values in the value map.
if (!_should_process_skip_bitmap_col() &&
(_col_default_value_map.empty() ||
_col_default_value_map.find(slot_desc->col_name()) == _col_default_value_map.end())) {
column_ptr->insert_default();
continue;
}
if (column_ptr->size() < cur_row_count + 1) {
DCHECK(column_ptr->size() == cur_row_count);
if (_should_process_skip_bitmap_col()) {
// not found, skip this column in flexible partial update
if (slot_desc->is_key() && !slot_desc->is_auto_increment()) {
RETURN_IF_ERROR(
_append_error_msg(value,
"The key columns can not be ommited in flexible "
"partial update, missing key column: {}",
slot_desc->col_name(), valid));
// remove this line in block
for (size_t index = 0; index < block.columns(); ++index) {
auto column = block.get_by_position(index).column->assume_mutable();
if (column->size() != cur_row_count) {
DCHECK(column->size() == cur_row_count + 1);
column->pop_back(1);
DCHECK(column->size() == cur_row_count);
}
}
return Status::OK();
}
_set_skip_bitmap_mark(slot_desc, column_ptr, block, cur_row_count, valid);
column_ptr->insert_default();
} else {
RETURN_IF_ERROR(_fill_missing_column(slot_desc, _serdes[i], column_ptr, valid));
if (!(*valid)) {
return Status::OK();
}
}
++nullcount;
}
DCHECK(column_ptr->size() == cur_row_count + 1);
}
// There is at least one valid value here
DCHECK(nullcount < block.columns());
*valid = true;
return Status::OK();
}
template <bool use_string_cache>
Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
const DataTypePtr& type_desc,
IColumn* column_ptr,
const std::string& column_name,
DataTypeSerDeSPtr serde, bool* valid) {
ColumnNullable* nullable_column = nullptr;
IColumn* data_column_ptr = column_ptr;
DataTypeSerDeSPtr data_serde = serde;
if (column_ptr->is_nullable()) {
nullable_column = reinterpret_cast<ColumnNullable*>(column_ptr);
data_column_ptr = nullable_column->get_nested_column().get_ptr().get();
data_serde = serde->get_nested_serdes()[0];
// kNullType will put 1 into the Null map, so there is no need to push 0 for kNullType.
if (value.type() == simdjson::ondemand::json_type::null) {
nullable_column->insert_default();
*valid = true;
return Status::OK();
}
} else if (value.type() == simdjson::ondemand::json_type::null) [[unlikely]] {
if (_is_load) {
RETURN_IF_ERROR(_append_error_msg(
nullptr, "Json value is null, but the column `{}` is not nullable.",
column_name, valid));
return Status::OK();
} else {
return Status::DataQualityError(
"Json value is null, but the column `{}` is not nullable.", column_name);
}
}
auto primitive_type = type_desc->get_primitive_type();
if (_is_load || !is_complex_type(primitive_type)) {
if (value.type() == simdjson::ondemand::json_type::string) {
std::string_view value_string;
if constexpr (use_string_cache) {
const auto cache_key = value.raw_json().value();
if (_cached_string_values.contains(cache_key)) {
value_string = _cached_string_values[cache_key];
} else {
value_string = value.get_string();
_cached_string_values.emplace(cache_key, value_string);
}
} else {
DCHECK(_cached_string_values.empty());
value_string = value.get_string();
}
Slice slice {value_string.data(), value_string.size()};
RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
_serde_options));
} else if (value.type() == simdjson::ondemand::json_type::boolean) {
const char* str_value = nullptr;
// insert "1"/"0" , not "true"/"false".
if (value.get_bool()) {
str_value = (char*)"1";
} else {
str_value = (char*)"0";
}
Slice slice {str_value, 1};
RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
_serde_options));
} else {
// Maybe we can `switch (value->GetType()) case: kNumberType`.
// Note that `if (value->IsInt())`, but column is FloatColumn.
std::string_view json_str = simdjson::to_json_string(value);
Slice slice {json_str.data(), json_str.size()};
RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
_serde_options));
}
} else if (primitive_type == TYPE_STRUCT) {
if (value.type() != simdjson::ondemand::json_type::object) [[unlikely]] {
return Status::DataQualityError(
"Json value isn't object, but the column `{}` is struct.", column_name);
}
const auto* type_struct =
assert_cast<const DataTypeStruct*>(remove_nullable(type_desc).get());
auto sub_col_size = type_struct->get_elements().size();
simdjson::ondemand::object struct_value = value.get_object();
auto sub_serdes = data_serde->get_nested_serdes();
auto* struct_column_ptr = assert_cast<ColumnStruct*>(data_column_ptr);
std::map<std::string, size_t> sub_col_name_to_idx;
for (size_t sub_col_idx = 0; sub_col_idx < sub_col_size; sub_col_idx++) {
sub_col_name_to_idx.emplace(type_struct->get_element_name(sub_col_idx), sub_col_idx);
}
std::vector<bool> has_value(sub_col_size, false);
for (simdjson::ondemand::field sub : struct_value) {
std::string_view sub_key_view = sub.unescaped_key();
std::string sub_key(sub_key_view.data(), sub_key_view.length());
std::transform(sub_key.begin(), sub_key.end(), sub_key.begin(), ::tolower);
if (sub_col_name_to_idx.find(sub_key) == sub_col_name_to_idx.end()) [[unlikely]] {
continue;
}
size_t sub_column_idx = sub_col_name_to_idx[sub_key];
auto sub_column_ptr = struct_column_ptr->get_column(sub_column_idx).get_ptr();
if (has_value[sub_column_idx]) [[unlikely]] {
// Since struct_value can only be traversed once, we can only insert
// the original value first, then delete it, and then reinsert the new value.
sub_column_ptr->pop_back(1);
}
has_value[sub_column_idx] = true;
const auto& sub_col_type = type_struct->get_element(sub_column_idx);
RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
sub.value(), sub_col_type, sub_column_ptr.get(), column_name + "." + sub_key,
sub_serdes[sub_column_idx], valid));
}
//fill missing subcolumn
for (size_t sub_col_idx = 0; sub_col_idx < sub_col_size; sub_col_idx++) {
if (has_value[sub_col_idx]) {
continue;
}
auto sub_column_ptr = struct_column_ptr->get_column(sub_col_idx).get_ptr();
if (sub_column_ptr->is_nullable()) {
sub_column_ptr->insert_default();
continue;
} else [[unlikely]] {
return Status::DataQualityError(
"Json file structColumn miss field {} and this column isn't nullable.",
column_name + "." + type_struct->get_element_name(sub_col_idx));
}
}
} else if (primitive_type == TYPE_MAP) {
if (value.type() != simdjson::ondemand::json_type::object) [[unlikely]] {
return Status::DataQualityError("Json value isn't object, but the column `{}` is map.",
column_name);
}
simdjson::ondemand::object object_value = value.get_object();
auto sub_serdes = data_serde->get_nested_serdes();
auto* map_column_ptr = assert_cast<ColumnMap*>(data_column_ptr);
size_t field_count = 0;
for (simdjson::ondemand::field member_value : object_value) {
auto f = [](std::string_view key_view, const DataTypePtr& type_desc,
IColumn* column_ptr, DataTypeSerDeSPtr serde,
DataTypeSerDe::FormatOptions serde_options, bool* valid) {
auto* data_column_ptr = column_ptr;
auto data_serde = serde;
if (column_ptr->is_nullable()) {
auto* nullable_column = static_cast<ColumnNullable*>(column_ptr);
nullable_column->get_null_map_data().push_back(0);
data_column_ptr = nullable_column->get_nested_column().get_ptr().get();
data_serde = serde->get_nested_serdes()[0];
}
Slice slice(key_view.data(), key_view.length());
RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
serde_options));
return Status::OK();
};
RETURN_IF_ERROR(f(member_value.unescaped_key(),
assert_cast<const DataTypeMap*>(remove_nullable(type_desc).get())
->get_key_type(),
map_column_ptr->get_keys_ptr()->assume_mutable()->get_ptr().get(),
sub_serdes[0], _serde_options, valid));
simdjson::ondemand::value field_value = member_value.value();
RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
field_value,
assert_cast<const DataTypeMap*>(remove_nullable(type_desc).get())
->get_value_type(),
map_column_ptr->get_values_ptr()->assume_mutable()->get_ptr().get(),
column_name + ".value", sub_serdes[1], valid));
field_count++;
}
auto& offsets = map_column_ptr->get_offsets();
offsets.emplace_back(offsets.back() + field_count);
} else if (primitive_type == TYPE_ARRAY) {
if (value.type() != simdjson::ondemand::json_type::array) [[unlikely]] {
return Status::DataQualityError("Json value isn't array, but the column `{}` is array.",
column_name);
}
simdjson::ondemand::array array_value = value.get_array();
auto sub_serdes = data_serde->get_nested_serdes();
auto* array_column_ptr = assert_cast<ColumnArray*>(data_column_ptr);
int field_count = 0;
for (simdjson::ondemand::value sub_value : array_value) {
RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
sub_value,
assert_cast<const DataTypeArray*>(remove_nullable(type_desc).get())
->get_nested_type(),
array_column_ptr->get_data().get_ptr().get(), column_name + ".element",
sub_serdes[0], valid));
field_count++;
}
auto& offsets = array_column_ptr->get_offsets();
offsets.emplace_back(offsets.back() + field_count);
} else {
return Status::InternalError("Not support load to complex column.");
}
//We need to finally set the nullmap of column_nullable to keep the size consistent with data_column
if (nullable_column && value.type() != simdjson::ondemand::json_type::null) {
nullable_column->get_null_map_data().push_back(0);
}
*valid = true;
return Status::OK();
}
Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::string error_msg,
std::string col_name, bool* valid) {
std::string err_msg;
if (!col_name.empty()) {
fmt::memory_buffer error_buf;
fmt::format_to(error_buf, error_msg, col_name, _jsonpaths);
err_msg = fmt::to_string(error_buf);
} else {
err_msg = error_msg;
}
_counter->num_rows_filtered++;
if (valid != nullptr) {
// current row is invalid
*valid = false;
}
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
if (!obj) {
return "";
}
std::string_view str_view;
(void)!obj->raw_json().get(str_view);
return std::string(str_view.data(), str_view.size());
},
[&]() -> std::string { return err_msg; }));
return Status::OK();
}
Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
simdjson::error_code* error) {
SCOPED_TIMER(_read_timer);
// step1: read buf from pipe.
if (_line_reader != nullptr) {
RETURN_IF_ERROR(_line_reader->read_line(&_json_str, size, eof, _io_ctx));
} else {
size_t length = 0;
RETURN_IF_ERROR(_read_one_message(&_json_str_ptr, &length));
_json_str = _json_str_ptr.get();
*size = length;
if (length == 0) {
*eof = true;
}
}
if (*eof) {
return Status::OK();
}
// step2: init json parser iterate.
if (*size + simdjson::SIMDJSON_PADDING > _padded_size) {
// For efficiency reasons, simdjson requires a string with a few bytes (simdjson::SIMDJSON_PADDING) at the end.
// Hence, a re-allocation is needed if the space is not enough.
_simdjson_ondemand_padding_buffer.resize(*size + simdjson::SIMDJSON_PADDING);
_simdjson_ondemand_unscape_padding_buffer.resize(*size + simdjson::SIMDJSON_PADDING);
_padded_size = *size + simdjson::SIMDJSON_PADDING;
}
// trim BOM since simdjson does not handle UTF-8 Unicode (with BOM)
if (*size >= 3 && static_cast<char>(_json_str[0]) == '\xEF' &&
static_cast<char>(_json_str[1]) == '\xBB' && static_cast<char>(_json_str[2]) == '\xBF') {
// skip the first three BOM bytes
_json_str += 3;
*size -= 3;
}
memcpy(&_simdjson_ondemand_padding_buffer.front(), _json_str, *size);
_original_doc_size = *size;
*error = _ondemand_json_parser
->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size),
_padded_size)
.get(_original_json_doc);
return Status::OK();
}
Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* is_empty_row) {
if (size == 0 || eof) {
*is_empty_row = true;
return Status::OK();
}
if (!_parsed_jsonpaths.empty() && _strip_outer_array) {
_total_rows = _json_value.count_elements().value();
_next_row = 0;
if (_total_rows == 0) {
// meet an empty json array.
*is_empty_row = true;
}
}
return Status::OK();
}
Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
bool* is_empty_row) {
SCOPED_TIMER(_read_timer);
auto return_quality_error = [&](fmt::memory_buffer& error_msg,
const std::string& doc_info) -> Status {
_counter->num_rows_filtered++;
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return doc_info; },
[&]() -> std::string { return fmt::to_string(error_msg); }));
if (*_scanner_eof) {
// Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means
// we meet enough invalid rows and the scanner should be stopped.
// So we set eof to true and return OK, the caller will stop the process as we meet the end of file.
*eof = true;
return Status::OK();
}
return Status::DataQualityError(fmt::to_string(error_msg));
};
if (*error != simdjson::error_code::SUCCESS) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
*error, simdjson::error_message(*error));
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
auto type_res = _original_json_doc.type();
if (type_res.error() != simdjson::error_code::SUCCESS) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
type_res.error(), simdjson::error_message(type_res.error()));
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
simdjson::ondemand::json_type type = type_res.value();
if (type != simdjson::ondemand::json_type::object &&
type != simdjson::ondemand::json_type::array) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Not an json object or json array");
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
if (!_parsed_json_root.empty() && type == simdjson::ondemand::json_type::object) {
try {
// set json root
// if it is an array at top level, then we should iterate the entire array in
// ::_simdjson_handle_flat_array_complex_json
simdjson::ondemand::object object = _original_json_doc;
Status st = JsonFunctions::extract_from_object(object, _parsed_json_root, &_json_value);
if (!st.ok()) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}", st.to_string());
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
_parsed_from_json_root = true;
} catch (simdjson::simdjson_error& e) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "Encounter error while extract_from_object, error: {}",
e.what());
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
} else {
_json_value = _original_json_doc;
}
if (_json_value.type() == simdjson::ondemand::json_type::array && !_strip_outer_array) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is array-object, `strip_outer_array` must be TRUE.");
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
if (_json_value.type() != simdjson::ondemand::json_type::array && _strip_outer_array) {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}",
"JSON data is not an array-object, `strip_outer_array` must be FALSE.");
return return_quality_error(error_msg, std::string((char*)_json_str, *size));
}
RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row));
return Status::OK();
}
Status NewJsonReader::_simdjson_write_columns_by_jsonpath(
simdjson::ondemand::object* value, const std::vector<SlotDescriptor*>& slot_descs,
Block& block, bool* valid) {
// write by jsonpath
bool has_valid_value = false;
Defer clear_defer([this]() { _cached_string_values.clear(); });
for (size_t i = 0; i < slot_descs.size(); i++) {
auto* slot_desc = slot_descs[i];
auto* column_ptr = block.get_by_position(i).column->assume_mutable().get();
simdjson::ondemand::value json_value;
Status st;
if (i < _parsed_jsonpaths.size()) {
st = JsonFunctions::extract_from_object(*value, _parsed_jsonpaths[i], &json_value);
if (!st.ok() && !st.is<NOT_FOUND>()) {
return st;
}
}
if (i < _parsed_jsonpaths.size() && JsonFunctions::is_root_path(_parsed_jsonpaths[i])) {
// Indicate that the jsonpath is "$" or "$.", read the full root json object, insert the original doc directly
ColumnNullable* nullable_column = nullptr;
IColumn* target_column_ptr = nullptr;
if (slot_desc->is_nullable()) {
nullable_column = assert_cast<ColumnNullable*>(column_ptr);
target_column_ptr = &nullable_column->get_nested_column();
nullable_column->get_null_map_data().push_back(0);
}
auto* column_string = assert_cast<ColumnString*>(target_column_ptr);
column_string->insert_data(_simdjson_ondemand_padding_buffer.data(),
_original_doc_size);
has_valid_value = true;
} else if (i >= _parsed_jsonpaths.size() || st.is<NOT_FOUND>()) {
// not match in jsondata, filling with default value
RETURN_IF_ERROR(_fill_missing_column(slot_desc, _serdes[i], column_ptr, valid));
if (!(*valid)) {
return Status::OK();
}
} else {
RETURN_IF_ERROR(_simdjson_write_data_to_column<true>(json_value, slot_desc->type(),
column_ptr, slot_desc->col_name(),
_serdes[i], valid));
if (!(*valid)) {
return Status::OK();
}
has_valid_value = true;
}
}
if (!has_valid_value) {
// there is no valid value in json line but has filled with default value before
// so remove this line in block
std::string col_names;
for (int i = 0; i < block.columns(); ++i) {
auto column = block.get_by_position(i).column->assume_mutable();
column->pop_back(1);
}
for (auto* slot_desc : slot_descs) {
col_names.append(slot_desc->col_name() + ", ");
}
RETURN_IF_ERROR(_append_error_msg(value,
"There is no column matching jsonpaths in the json file, "
"columns:[{}], please check columns "
"and jsonpaths:" +
_jsonpaths,
col_names, valid));
return Status::OK();
}
*valid = true;
return Status::OK();
}
Status NewJsonReader::_get_column_default_value(
const std::vector<SlotDescriptor*>& slot_descs,
const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx) {
for (auto* slot_desc : slot_descs) {
auto it = col_default_value_ctx.find(slot_desc->col_name());
if (it != col_default_value_ctx.end() && it->second != nullptr) {
const auto& ctx = it->second;
// NULL_LITERAL means no valid value of current column
if (ctx->root()->node_type() == TExprNodeType::type::NULL_LITERAL) {
continue;
}
ColumnWithTypeAndName result;
RETURN_IF_ERROR(ctx->execute_const_expr(result));
DCHECK(result.column->size() == 1);
_col_default_value_map.emplace(slot_desc->col_name(),
result.column->get_data_at(0).to_string());
}
}
return Status::OK();
}
Status NewJsonReader::_fill_missing_column(SlotDescriptor* slot_desc, DataTypeSerDeSPtr serde,
IColumn* column_ptr, bool* valid) {
auto col_value = _col_default_value_map.find(slot_desc->col_name());
if (col_value == _col_default_value_map.end()) {
if (slot_desc->is_nullable()) {
auto* nullable_column = static_cast<ColumnNullable*>(column_ptr);
nullable_column->insert_default();
} else {
if (_is_load) {
RETURN_IF_ERROR(_append_error_msg(
nullptr, "The column `{}` is not nullable, but it's not found in jsondata.",
slot_desc->col_name(), valid));
} else {
return Status::DataQualityError(
"The column `{}` is not nullable, but it's not found in jsondata.",
slot_desc->col_name());
}
}
} else {
const std::string& v_str = col_value->second;
Slice column_default_value {v_str};
RETURN_IF_ERROR(serde->deserialize_one_cell_from_json(*column_ptr, column_default_value,
_serde_options));
}
*valid = true;
return Status::OK();
}
void NewJsonReader::_append_empty_skip_bitmap_value(Block& block, size_t cur_row_count) {
auto* skip_bitmap_nullable_col_ptr = assert_cast<ColumnNullable*>(
block.get_by_position(skip_bitmap_col_idx).column->assume_mutable().get());
auto* skip_bitmap_col_ptr =
assert_cast<ColumnBitmap*>(skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get());
DCHECK(skip_bitmap_nullable_col_ptr->size() == cur_row_count);
// should append an empty bitmap for every row wheather this line misses columns
skip_bitmap_nullable_col_ptr->get_null_map_data().push_back(0);
skip_bitmap_col_ptr->insert_default();
DCHECK(skip_bitmap_col_ptr->size() == cur_row_count + 1);
}
void NewJsonReader::_set_skip_bitmap_mark(SlotDescriptor* slot_desc, IColumn* column_ptr,
Block& block, size_t cur_row_count, bool* valid) {
// we record the missing column's column unique id in skip bitmap
// to indicate which columns need to do the alignment process
auto* skip_bitmap_nullable_col_ptr = assert_cast<ColumnNullable*>(
block.get_by_position(skip_bitmap_col_idx).column->assume_mutable().get());
auto* skip_bitmap_col_ptr =
assert_cast<ColumnBitmap*>(skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get());
DCHECK(skip_bitmap_col_ptr->size() == cur_row_count + 1);
auto& skip_bitmap = skip_bitmap_col_ptr->get_data().back();
skip_bitmap.add(slot_desc->col_unique_id());
}
void NewJsonReader::_collect_profile_before_close() {
if (_line_reader != nullptr) {
_line_reader->collect_profile_before_close();
}
if (_file_reader != nullptr) {
_file_reader->collect_profile_before_close();
}
}
#include "common/compile_check_end.h"
} // namespace doris