| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <rapidjson/allocators.h> |
| #include <rapidjson/document.h> |
| #include <rapidjson/encodings.h> |
| #include <rapidjson/pointer.h> |
| #include <rapidjson/rapidjson.h> |
| #include <rapidjson/stringbuffer.h> |
| #include <rapidjson/writer.h> |
| #include <re2/re2.h> |
| #include <stdint.h> |
| #include <stdlib.h> |
| #include <string.h> |
| |
| #include <algorithm> |
| #include <boost/iterator/iterator_facade.hpp> |
| #include <boost/token_functions.hpp> |
| #include <boost/tokenizer.hpp> |
| #include <memory> |
| #include <string> |
| #include <string_view> |
| #include <type_traits> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/cast_set.h" |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/exception.h" |
| #include "common/status.h" |
| #include "exprs/json_functions.h" |
| #include "util/jsonb_parser_simd.h" |
| #include "util/string_parser.hpp" |
| #include "util/string_util.h" |
| #include "vec/aggregate_functions/aggregate_function.h" |
| #include "vec/columns/column.h" |
| #include "vec/columns/column_nullable.h" |
| #include "vec/columns/column_string.h" |
| #include "vec/columns/column_vector.h" |
| #include "vec/common/assert_cast.h" |
| #include "vec/common/string_ref.h" |
| #include "vec/core/block.h" |
| #include "vec/core/column_numbers.h" |
| #include "vec/core/column_with_type_and_name.h" |
| #include "vec/core/types.h" |
| #include "vec/data_types/data_type.h" |
| #include "vec/data_types/data_type_nullable.h" |
| #include "vec/data_types/data_type_number.h" |
| #include "vec/data_types/data_type_string.h" |
| #include "vec/functions/function.h" |
| #include "vec/functions/function_totype.h" |
| #include "vec/functions/simple_function_factory.h" |
| #include "vec/io/io_helper.h" |
| #include "vec/utils/stringop_substring.h" |
| #include "vec/utils/template_helpers.hpp" |
| |
| namespace doris { |
| class FunctionContext; |
| } // namespace doris |
| |
| namespace doris::vectorized { |
| #include "common/compile_check_begin.h" |
| static const re2::RE2 JSON_PATTERN("^([^\\\"\\[\\]]*)(?:\\[([0-9]+|\\*)\\])?"); |
| |
| template <typename T, typename U> |
| void char_split(std::vector<T>& res, const U& var, char p) { |
| int start = 0; |
| int pos = start; |
| int end = var.length(); |
| while (pos < end) { |
| while (var[pos] != p && pos < end) { |
| pos++; |
| } |
| res.emplace_back(&var[start], pos - start); |
| pos++; |
| start = pos; |
| } |
| } |
| |
| // T = std::vector<std::string> |
| // TODO: update RE2 to support std::vector<std::string_view> |
| template <typename T> |
| void get_parsed_paths(const T& path_exprs, std::vector<JsonPath>* parsed_paths) { |
| if (path_exprs.empty()) { |
| return; |
| } |
| |
| if (path_exprs[0] != "$") { |
| parsed_paths->emplace_back("", -1, false); |
| } else { |
| parsed_paths->emplace_back("$", -1, true); |
| } |
| |
| for (int i = 1; i < path_exprs.size(); i++) { |
| std::string col; |
| std::string index; |
| if (UNLIKELY(!RE2::FullMatch(path_exprs[i], JSON_PATTERN, &col, &index))) { |
| parsed_paths->emplace_back("", -1, false); |
| } else { |
| int idx = -1; |
| if (!index.empty()) { |
| if (index == "*") { |
| idx = -2; |
| } else { |
| idx = atoi(index.c_str()); |
| } |
| } |
| parsed_paths->emplace_back(col, idx, true); |
| } |
| } |
| } |
| |
| rapidjson::Value* match_value(const std::vector<JsonPath>& parsed_paths, rapidjson::Value* document, |
| rapidjson::Document::AllocatorType& mem_allocator, |
| bool is_insert_null = false) { |
| rapidjson::Value* root = document; |
| rapidjson::Value* array_obj = nullptr; |
| for (int i = 1; i < parsed_paths.size(); i++) { |
| if (root == nullptr || root->IsNull()) { |
| return nullptr; |
| } |
| |
| if (UNLIKELY(!parsed_paths[i].is_valid)) { |
| return nullptr; |
| } |
| |
| const std::string& col = parsed_paths[i].key; |
| int index = parsed_paths[i].idx; |
| if (LIKELY(!col.empty())) { |
| if (root->IsArray()) { |
| array_obj = static_cast<rapidjson::Value*>( |
| mem_allocator.Malloc(sizeof(rapidjson::Value))); |
| array_obj->SetArray(); |
| bool is_null = true; |
| |
| // if array ,loop the array,find out all Objects,then find the results from the objects |
| for (int j = 0; j < root->Size(); j++) { |
| rapidjson::Value* json_elem = &((*root)[j]); |
| |
| if (json_elem->IsArray() || json_elem->IsNull()) { |
| continue; |
| } else { |
| if (!json_elem->IsObject()) { |
| continue; |
| } |
| if (!json_elem->HasMember(col.c_str())) { |
| if (is_insert_null) { // not found item, then insert a null object. |
| is_null = false; |
| rapidjson::Value nullObject(rapidjson::kNullType); |
| array_obj->PushBack(nullObject, mem_allocator); |
| } |
| continue; |
| } |
| rapidjson::Value* obj = &((*json_elem)[col.c_str()]); |
| if (obj->IsArray()) { |
| is_null = false; |
| for (int k = 0; k < obj->Size(); k++) { |
| array_obj->PushBack((*obj)[k], mem_allocator); |
| } |
| } else if (!obj->IsNull()) { |
| is_null = false; |
| array_obj->PushBack(*obj, mem_allocator); |
| } |
| } |
| } |
| |
| root = is_null ? &(array_obj->SetNull()) : array_obj; |
| } else if (root->IsObject()) { |
| if (!root->HasMember(col.c_str())) { |
| return nullptr; |
| } else { |
| root = &((*root)[col.c_str()]); |
| } |
| } else { |
| // root is not a nested type, return NULL |
| return nullptr; |
| } |
| } |
| |
| if (UNLIKELY(index != -1)) { |
| // judge the rapidjson:Value, which base the top's result, |
| // if not array return NULL;else get the index value from the array |
| if (root->IsArray()) { |
| if (root->IsNull()) { |
| return nullptr; |
| } else if (index == -2) { |
| // [*] |
| array_obj = static_cast<rapidjson::Value*>( |
| mem_allocator.Malloc(sizeof(rapidjson::Value))); |
| array_obj->SetArray(); |
| |
| for (int j = 0; j < root->Size(); j++) { |
| rapidjson::Value v; |
| v.CopyFrom((*root)[j], mem_allocator); |
| array_obj->PushBack(v, mem_allocator); |
| } |
| root = array_obj; |
| } else if (index >= root->Size()) { |
| return nullptr; |
| } else { |
| root = &((*root)[index]); |
| } |
| } else { |
| return nullptr; |
| } |
| } |
| } |
| return root; |
| } |
| |
| template <JsonFunctionType fntype> |
| rapidjson::Value* get_json_object(std::string_view json_string, std::string_view path_string, |
| rapidjson::Document* document) { |
| std::vector<JsonPath>* parsed_paths; |
| std::vector<JsonPath> tmp_parsed_paths; |
| |
| //Cannot use '\' as the last character, return NULL |
| if (path_string.back() == '\\') { |
| document->SetNull(); |
| return document; |
| } |
| |
| try { |
| #ifdef USE_LIBCPP |
| std::string s(path_string); |
| auto tok = get_json_token(s); |
| #else |
| auto tok = get_json_token(path_string); |
| #endif |
| std::vector<std::string> paths(tok.begin(), tok.end()); |
| get_parsed_paths(paths, &tmp_parsed_paths); |
| if (tmp_parsed_paths.empty()) { |
| return document; |
| } |
| } catch (boost::escaped_list_error&) { |
| // meet unknown escape sequence, example '$.name\k' |
| return document; |
| } |
| |
| parsed_paths = &tmp_parsed_paths; |
| |
| if (!(*parsed_paths)[0].is_valid) { |
| return document; |
| } |
| |
| if (UNLIKELY((*parsed_paths).size() == 1)) { |
| if (fntype == JSON_FUN_STRING) { |
| document->SetString(json_string.data(), |
| cast_set<rapidjson::SizeType>(json_string.size()), |
| document->GetAllocator()); |
| } else { |
| return document; |
| } |
| } |
| |
| document->Parse(json_string.data(), json_string.size()); |
| if (UNLIKELY(document->HasParseError())) { |
| // VLOG_CRITICAL << "Error at offset " << document->GetErrorOffset() << ": " |
| // << GetParseError_En(document->GetParseError()); |
| document->SetNull(); |
| return document; |
| } |
| |
| return match_value(*parsed_paths, document, document->GetAllocator()); |
| } |
| |
| template <typename NumberType> |
| struct GetJsonNumberType { |
| using ReturnType = typename NumberType::ReturnType; |
| using ColumnType = typename NumberType::ColumnType; |
| using Container = typename ColumnType::Container; |
| |
| static DataTypes get_variadic_argument_types_impl() { |
| return {std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()}; |
| } |
| |
| static void get_json_impl(rapidjson::Value*& root, const std::string_view& json_string, |
| const std::string_view& path_string, rapidjson::Document& document, |
| typename NumberType::T& res, UInt8& null_map) { |
| if constexpr (std::is_same_v<double, typename NumberType::T>) { |
| root = get_json_object<JSON_FUN_DOUBLE>(json_string, path_string, &document); |
| handle_result<double>(root, res, null_map); |
| } else if constexpr (std::is_same_v<int32_t, typename NumberType::T>) { |
| root = get_json_object<JSON_FUN_DOUBLE>(json_string, path_string, &document); |
| handle_result<int32_t>(root, res, null_map); |
| } else if constexpr (std::is_same_v<int64_t, typename NumberType::T>) { |
| root = get_json_object<JSON_FUN_DOUBLE>(json_string, path_string, &document); |
| handle_result<int64_t>(root, res, null_map); |
| } |
| } |
| static void vector_vector(FunctionContext* context, const ColumnString::Chars& ldata, |
| const ColumnString::Offsets& loffsets, |
| const ColumnString::Chars& rdata, |
| const ColumnString::Offsets& roffsets, Container& res, |
| NullMap& null_map) { |
| size_t size = loffsets.size(); |
| res.resize(size); |
| for (size_t i = 0; i < size; ++i) { |
| const char* l_raw_str = reinterpret_cast<const char*>(&ldata[loffsets[i - 1]]); |
| int l_str_size = loffsets[i] - loffsets[i - 1]; |
| const char* r_raw_str = reinterpret_cast<const char*>(&rdata[roffsets[i - 1]]); |
| int r_str_size = roffsets[i] - roffsets[i - 1]; |
| if (null_map[i]) { |
| res[i] = 0; |
| continue; |
| } |
| |
| std::string_view json_string(l_raw_str, l_str_size); |
| std::string_view path_string(r_raw_str, r_str_size); |
| rapidjson::Document document; |
| rapidjson::Value* root = nullptr; |
| |
| get_json_impl(root, json_string, path_string, document, res[i], null_map[i]); |
| } |
| } |
| static void vector_scalar(FunctionContext* context, const ColumnString::Chars& ldata, |
| const ColumnString::Offsets& loffsets, const StringRef& rdata, |
| Container& res, NullMap& null_map) { |
| size_t size = loffsets.size(); |
| res.resize(size); |
| std::string_view path_string(rdata.data, rdata.size); |
| for (size_t i = 0; i < size; ++i) { |
| const char* l_raw_str = reinterpret_cast<const char*>(&ldata[loffsets[i - 1]]); |
| int l_str_size = loffsets[i] - loffsets[i - 1]; |
| if (null_map[i]) { |
| res[i] = 0; |
| continue; |
| } |
| std::string_view json_string(l_raw_str, l_str_size); |
| |
| rapidjson::Document document; |
| rapidjson::Value* root = nullptr; |
| |
| get_json_impl(root, json_string, path_string, document, res[i], null_map[i]); |
| } |
| } |
| static void scalar_vector(FunctionContext* context, const StringRef& ldata, |
| const ColumnString::Chars& rdata, |
| const ColumnString::Offsets& roffsets, Container& res, |
| NullMap& null_map) { |
| size_t size = roffsets.size(); |
| res.resize(size); |
| std::string_view json_string(ldata.data, ldata.size); |
| for (size_t i = 0; i < size; ++i) { |
| const char* r_raw_str = reinterpret_cast<const char*>(&rdata[roffsets[i - 1]]); |
| int r_str_size = roffsets[i] - roffsets[i - 1]; |
| if (null_map[i]) { |
| res[i] = 0; |
| continue; |
| } |
| std::string_view path_string(r_raw_str, r_str_size); |
| rapidjson::Document document; |
| rapidjson::Value* root = nullptr; |
| |
| get_json_impl(root, json_string, path_string, document, res[i], null_map[i]); |
| } |
| } |
| |
| template <typename T> |
| requires std::is_same_v<double, T> |
| static void handle_result(rapidjson::Value* root, T& res, uint8_t& res_null) { |
| if (root == nullptr || root->IsNull()) { |
| res = 0; |
| res_null = 1; |
| } else if (root->IsInt()) { |
| res = root->GetInt(); |
| } else if (root->IsInt64()) { |
| res = static_cast<double>(root->GetInt64()); |
| } else if (root->IsDouble()) { |
| res = root->GetDouble(); |
| } else { |
| res_null = 1; |
| } |
| } |
| |
| template <typename T> |
| requires std::is_same_v<T, int32_t> |
| static void handle_result(rapidjson::Value* root, int32_t& res, uint8_t& res_null) { |
| if (root != nullptr && root->IsInt()) { |
| res = root->GetInt(); |
| } else { |
| res_null = 1; |
| } |
| } |
| |
| template <typename T> |
| requires std::is_same_v<T, int64_t> |
| static void handle_result(rapidjson::Value* root, int64_t& res, uint8_t& res_null) { |
| if (root != nullptr && root->IsInt64()) { |
| res = root->GetInt64(); |
| } else { |
| res_null = 1; |
| } |
| } |
| }; |
| |
| // Helper Class |
| struct JsonNumberTypeDouble { |
| using T = Float64; |
| using ReturnType = DataTypeFloat64; |
| using ColumnType = ColumnFloat64; |
| }; |
| |
| struct JsonNumberTypeInt { |
| using T = int32_t; |
| using ReturnType = DataTypeInt32; |
| using ColumnType = ColumnInt32; |
| }; |
| |
| struct JsonNumberTypeBigInt { |
| using T = int64_t; |
| using ReturnType = DataTypeInt64; |
| using ColumnType = ColumnInt64; |
| }; |
| |
| struct GetJsonDouble : public GetJsonNumberType<JsonNumberTypeDouble> { |
| static constexpr auto name = "get_json_double"; |
| using ReturnType = typename JsonNumberTypeDouble::ReturnType; |
| using ColumnType = typename JsonNumberTypeDouble::ColumnType; |
| }; |
| |
| struct GetJsonInt : public GetJsonNumberType<JsonNumberTypeInt> { |
| static constexpr auto name = "get_json_int"; |
| using ReturnType = typename JsonNumberTypeInt::ReturnType; |
| using ColumnType = typename JsonNumberTypeInt::ColumnType; |
| }; |
| |
| struct GetJsonBigInt : public GetJsonNumberType<JsonNumberTypeBigInt> { |
| static constexpr auto name = "get_json_bigint"; |
| using ReturnType = typename JsonNumberTypeBigInt::ReturnType; |
| using ColumnType = typename JsonNumberTypeBigInt::ColumnType; |
| }; |
| |
| struct GetJsonString { |
| static constexpr auto name = "get_json_string"; |
| using ReturnType = DataTypeString; |
| using ColumnType = ColumnString; |
| using Chars = ColumnString::Chars; |
| using Offsets = ColumnString::Offsets; |
| static void vector_vector(FunctionContext* context, const Chars& ldata, const Offsets& loffsets, |
| const Chars& rdata, const Offsets& roffsets, Chars& res_data, |
| Offsets& res_offsets, NullMap& null_map) { |
| size_t input_rows_count = loffsets.size(); |
| res_offsets.resize(input_rows_count); |
| |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| if (null_map[i]) { |
| StringOP::push_null_string(i, res_data, res_offsets, null_map); |
| continue; |
| } |
| int l_size = loffsets[i] - loffsets[i - 1]; |
| const auto l_raw = reinterpret_cast<const char*>(&ldata[loffsets[i - 1]]); |
| int r_size = roffsets[i] - roffsets[i - 1]; |
| const auto r_raw = reinterpret_cast<const char*>(&rdata[roffsets[i - 1]]); |
| |
| std::string_view json_string(l_raw, l_size); |
| std::string_view path_string(r_raw, r_size); |
| |
| execute_impl(json_string, path_string, res_data, res_offsets, null_map, i); |
| } |
| } |
| static void vector_scalar(FunctionContext* context, const Chars& ldata, const Offsets& loffsets, |
| const StringRef& rdata, Chars& res_data, Offsets& res_offsets, |
| NullMap& null_map) { |
| size_t input_rows_count = loffsets.size(); |
| res_offsets.resize(input_rows_count); |
| |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| if (null_map[i]) { |
| StringOP::push_null_string(i, res_data, res_offsets, null_map); |
| continue; |
| } |
| int l_size = loffsets[i] - loffsets[i - 1]; |
| const auto l_raw = reinterpret_cast<const char*>(&ldata[loffsets[i - 1]]); |
| |
| std::string_view json_string(l_raw, l_size); |
| std::string_view path_string(rdata.data, rdata.size); |
| |
| execute_impl(json_string, path_string, res_data, res_offsets, null_map, i); |
| } |
| } |
| static void scalar_vector(FunctionContext* context, const StringRef& ldata, const Chars& rdata, |
| const Offsets& roffsets, Chars& res_data, Offsets& res_offsets, |
| NullMap& null_map) { |
| size_t input_rows_count = roffsets.size(); |
| res_offsets.resize(input_rows_count); |
| |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| if (null_map[i]) { |
| StringOP::push_null_string(i, res_data, res_offsets, null_map); |
| continue; |
| } |
| int r_size = roffsets[i] - roffsets[i - 1]; |
| const auto r_raw = reinterpret_cast<const char*>(&rdata[roffsets[i - 1]]); |
| |
| std::string_view json_string(ldata.data, ldata.size); |
| std::string_view path_string(r_raw, r_size); |
| |
| execute_impl(json_string, path_string, res_data, res_offsets, null_map, i); |
| } |
| } |
| |
| static void execute_impl(const std::string_view& json_string, |
| const std::string_view& path_string, Chars& res_data, |
| Offsets& res_offsets, NullMap& null_map, size_t index_now) { |
| rapidjson::Document document; |
| rapidjson::Value* root = nullptr; |
| |
| root = get_json_object<JSON_FUN_STRING>(json_string, path_string, &document); |
| const int max_string_len = DEFAULT_MAX_JSON_SIZE; |
| |
| if (root == nullptr || root->IsNull()) { |
| StringOP::push_null_string(index_now, res_data, res_offsets, null_map); |
| } else if (root->IsString()) { |
| const auto ptr = root->GetString(); |
| size_t len = strnlen(ptr, max_string_len); |
| StringOP::push_value_string(std::string_view(ptr, len), index_now, res_data, |
| res_offsets); |
| } else { |
| rapidjson::StringBuffer buf; |
| rapidjson::Writer<rapidjson::StringBuffer> writer(buf); |
| root->Accept(writer); |
| |
| const auto ptr = buf.GetString(); |
| size_t len = strnlen(ptr, max_string_len); |
| StringOP::push_value_string(std::string_view(ptr, len), index_now, res_data, |
| res_offsets); |
| } |
| } |
| static DataTypes get_variadic_argument_types_impl() { |
| return {std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()}; |
| } |
| }; |
| |
| template <int flag> |
| struct JsonParser { |
| //string |
| static void update_value(StringParser::ParseResult& result, rapidjson::Value& value, |
| StringRef data, rapidjson::Document::AllocatorType& allocator) { |
| value.SetString(data.data, cast_set<rapidjson::SizeType>(data.size), allocator); |
| } |
| }; |
| |
| template <> |
| struct JsonParser<'0'> { |
| // null |
| static void update_value(StringParser::ParseResult& result, rapidjson::Value& value, |
| StringRef data, rapidjson::Document::AllocatorType& allocator) { |
| value.SetNull(); |
| } |
| }; |
| |
| template <> |
| struct JsonParser<'1'> { |
| // bool |
| static void update_value(StringParser::ParseResult& result, rapidjson::Value& value, |
| StringRef data, rapidjson::Document::AllocatorType& allocator) { |
| DCHECK(data.size == 1 || strncmp(data.data, "true", 4) == 0 || |
| strncmp(data.data, "false", 5) == 0); |
| value.SetBool(*data.data == '1' || *data.data == 't'); |
| } |
| }; |
| |
| template <> |
| struct JsonParser<'2'> { |
| // int |
| static void update_value(StringParser::ParseResult& result, rapidjson::Value& value, |
| StringRef data, rapidjson::Document::AllocatorType& allocator) { |
| value.SetInt(StringParser::string_to_int<int32_t>(data.data, data.size, &result)); |
| } |
| }; |
| |
| template <> |
| struct JsonParser<'3'> { |
| // double |
| static void update_value(StringParser::ParseResult& result, rapidjson::Value& value, |
| StringRef data, rapidjson::Document::AllocatorType& allocator) { |
| value.SetDouble(StringParser::string_to_float<double>(data.data, data.size, &result)); |
| } |
| }; |
| |
| template <> |
| struct JsonParser<'4'> { |
| // time |
| static void update_value(StringParser::ParseResult& result, rapidjson::Value& value, |
| StringRef data, rapidjson::Document::AllocatorType& allocator) { |
| // remove double quotes, "xxx" -> xxx |
| value.SetString(data.data + 1, cast_set<rapidjson::SizeType>(data.size - 2), allocator); |
| } |
| }; |
| |
| template <> |
| struct JsonParser<'5'> { |
| // bigint |
| static void update_value(StringParser::ParseResult& result, rapidjson::Value& value, |
| StringRef data, rapidjson::Document::AllocatorType& allocator) { |
| value.SetInt64(StringParser::string_to_int<int64_t>(data.data, data.size, &result)); |
| } |
| }; |
| |
| template <> |
| struct JsonParser<'7'> { |
| // json string |
| static void update_value(StringParser::ParseResult& result, rapidjson::Value& value, |
| StringRef data, rapidjson::Document::AllocatorType& allocator) { |
| rapidjson::Document document; |
| JsonbValue* json_val = JsonbDocument::createValue(data.data, data.size); |
| convert_jsonb_to_rapidjson(*json_val, document, allocator); |
| value.CopyFrom(document, allocator); |
| } |
| }; |
| |
| template <int flag, typename Impl> |
| struct ExecuteReducer { |
| template <typename... TArgs> |
| static void run(TArgs&&... args) { |
| Impl::template execute_type<JsonParser<flag>>(std::forward<TArgs>(args)...); |
| } |
| }; |
| |
| struct FunctionJsonArrayImpl { |
| static constexpr auto name = "json_array"; |
| |
| static constexpr auto must_not_null = false; |
| template <int flag> |
| using Reducer = ExecuteReducer<flag, FunctionJsonArrayImpl>; |
| |
| static void execute_parse(const std::string& type_flags, |
| const std::vector<const ColumnString*>& data_columns, |
| std::vector<rapidjson::Value>& objects, |
| rapidjson::Document::AllocatorType& allocator, |
| const std::vector<const ColumnUInt8*>& nullmaps) { |
| for (int i = 0; i < data_columns.size() - 1; i++) { |
| constexpr_int_match<'0', '7', Reducer>::run(type_flags[i], objects, allocator, |
| data_columns[i], nullmaps[i]); |
| } |
| } |
| |
| template <typename TypeImpl> |
| static void execute_type(std::vector<rapidjson::Value>& objects, |
| rapidjson::Document::AllocatorType& allocator, |
| const ColumnString* data_column, const ColumnUInt8* nullmap) { |
| StringParser::ParseResult result; |
| rapidjson::Value value; |
| |
| for (int i = 0; i < objects.size(); i++) { |
| if (nullmap != nullptr && nullmap->get_data()[i]) { |
| JsonParser<'0'>::update_value(result, value, data_column->get_data_at(i), |
| allocator); |
| } else { |
| TypeImpl::update_value(result, value, data_column->get_data_at(i), allocator); |
| } |
| objects[i].PushBack(value, allocator); |
| } |
| } |
| }; |
| |
| struct FunctionJsonObjectImpl { |
| static constexpr auto name = "json_object"; |
| static constexpr auto must_not_null = true; |
| template <int flag> |
| using Reducer = ExecuteReducer<flag, FunctionJsonObjectImpl>; |
| |
| static void execute_parse(std::string type_flags, |
| const std::vector<const ColumnString*>& data_columns, |
| std::vector<rapidjson::Value>& objects, |
| rapidjson::Document::AllocatorType& allocator, |
| const std::vector<const ColumnUInt8*>& nullmaps) { |
| for (auto& array_object : objects) { |
| array_object.SetObject(); |
| } |
| |
| for (int i = 0; i + 1 < data_columns.size() - 1; i += 2) { |
| // last is for old type definition |
| constexpr_int_match<'0', '7', Reducer>::run(type_flags[i + 1], objects, allocator, |
| data_columns[i], data_columns[i + 1], |
| nullmaps[i + 1]); |
| } |
| } |
| |
| template <typename TypeImpl> |
| static void execute_type(std::vector<rapidjson::Value>& objects, |
| rapidjson::Document::AllocatorType& allocator, |
| const ColumnString* key_column, const ColumnString* value_column, |
| const ColumnUInt8* nullmap) { |
| StringParser::ParseResult result; |
| rapidjson::Value key; |
| rapidjson::Value value; |
| for (int i = 0; i < objects.size(); i++) { |
| JsonParser<'6'>::update_value(result, key, key_column->get_data_at(i), |
| allocator); // key always is string |
| if (nullmap != nullptr && nullmap->get_data()[i]) { |
| JsonParser<'0'>::update_value(result, value, value_column->get_data_at(i), |
| allocator); |
| } else { |
| TypeImpl::update_value(result, value, value_column->get_data_at(i), allocator); |
| } |
| objects[i].AddMember(key, value, allocator); |
| } |
| } |
| }; |
| |
| template <typename SpecificImpl> |
| class FunctionJsonAlwaysNotNullable : public IFunction { |
| public: |
| using IFunction::execute; |
| |
| static constexpr auto name = SpecificImpl::name; |
| |
| static FunctionPtr create() { |
| return std::make_shared<FunctionJsonAlwaysNotNullable<SpecificImpl>>(); |
| } |
| |
| bool use_default_implementation_for_nulls() const override { return false; } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 0; } |
| |
| bool is_variadic() const override { return true; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return std::make_shared<DataTypeString>(); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| auto result_column = ColumnString::create(); |
| |
| std::vector<ColumnPtr> column_ptrs; // prevent converted column destruct |
| std::vector<const ColumnString*> data_columns; |
| std::vector<const ColumnUInt8*> nullmaps; |
| for (int i = 0; i < arguments.size(); i++) { |
| auto column = block.get_by_position(arguments[i]).column; |
| column_ptrs.push_back(column->convert_to_full_column_if_const()); |
| const ColumnNullable* col_nullable = |
| check_and_get_column<ColumnNullable>(column_ptrs.back().get()); |
| if (col_nullable) { |
| const ColumnUInt8* col_nullmap = check_and_get_column<ColumnUInt8>( |
| col_nullable->get_null_map_column_ptr().get()); |
| nullmaps.push_back(col_nullmap); |
| const ColumnString* col = check_and_get_column<ColumnString>( |
| col_nullable->get_nested_column_ptr().get()); |
| data_columns.push_back(col); |
| } else { |
| nullmaps.push_back(nullptr); |
| data_columns.push_back(assert_cast<const ColumnString*>(column_ptrs.back().get())); |
| } |
| } |
| if (SpecificImpl::must_not_null) { |
| RETURN_IF_ERROR(check_keys_all_not_null(nullmaps, input_rows_count, arguments.size())); |
| } |
| execute(data_columns, *assert_cast<ColumnString*>(result_column.get()), input_rows_count, |
| nullmaps); |
| block.get_by_position(result).column = std::move(result_column); |
| return Status::OK(); |
| } |
| |
| static void execute(const std::vector<const ColumnString*>& data_columns, |
| ColumnString& result_column, size_t input_rows_count, |
| const std::vector<const ColumnUInt8*> nullmaps) { |
| std::string type_flags = data_columns.back()->get_data_at(0).to_string(); |
| |
| rapidjson::Document document; |
| rapidjson::Document::AllocatorType& allocator = document.GetAllocator(); |
| |
| std::vector<rapidjson::Value> objects; |
| for (int i = 0; i < input_rows_count; i++) { |
| objects.emplace_back(rapidjson::kArrayType); |
| } |
| |
| SpecificImpl::execute_parse(type_flags, data_columns, objects, allocator, nullmaps); |
| |
| rapidjson::StringBuffer buf; |
| rapidjson::Writer<rapidjson::StringBuffer> writer(buf); |
| |
| for (int i = 0; i < input_rows_count; i++) { |
| buf.Clear(); |
| writer.Reset(buf); |
| objects[i].Accept(writer); |
| result_column.insert_data(buf.GetString(), buf.GetSize()); |
| } |
| } |
| |
| static Status check_keys_all_not_null(const std::vector<const ColumnUInt8*>& nullmaps, |
| size_t size, size_t args) { |
| for (int i = 0; i < args; i += 2) { |
| const auto* null_map = nullmaps[i]; |
| if (null_map) { |
| auto not_null_num = |
| simd::count_zero_num((int8_t*)null_map->get_data().data(), size); |
| if (not_null_num < size) { |
| return Status::InternalError( |
| "function {} can not input null value , JSON documents may not contain " |
| "NULL member names. input size is {}:{}", |
| name, size, not_null_num); |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| struct FunctionJsonQuoteImpl { |
| static constexpr auto name = "json_quote"; |
| |
| static DataTypePtr get_return_type_impl(const DataTypes& arguments) { |
| if (!arguments.empty() && arguments[0] && arguments[0]->is_nullable()) { |
| return make_nullable(std::make_shared<DataTypeString>()); |
| } |
| return std::make_shared<DataTypeString>(); |
| } |
| static void execute(const std::vector<const ColumnString*>& data_columns, |
| ColumnString& result_column, size_t input_rows_count) { |
| rapidjson::Document document; |
| rapidjson::Document::AllocatorType& allocator = document.GetAllocator(); |
| |
| rapidjson::Value value; |
| |
| rapidjson::StringBuffer buf; |
| |
| for (int i = 0; i < input_rows_count; i++) { |
| StringRef data = data_columns[0]->get_data_at(i); |
| value.SetString(data.data, cast_set<rapidjson::SizeType>(data.size), allocator); |
| |
| buf.Clear(); |
| rapidjson::Writer<rapidjson::StringBuffer> writer(buf); |
| value.Accept(writer); |
| result_column.insert_data(buf.GetString(), buf.GetSize()); |
| } |
| } |
| }; |
| |
| struct JsonExtractName { |
| static constexpr auto name = "json_extract"; |
| }; |
| |
| struct JsonExtractNoQuotesName { |
| static constexpr auto name = "json_extract_no_quotes"; |
| }; |
| |
| template <typename Name, bool remove_quotes> |
| struct FunctionJsonExtractImpl { |
| static constexpr auto name = Name::name; |
| |
| static rapidjson::Value parse_json(const ColumnString* json_col, const ColumnString* path_col, |
| rapidjson::Document::AllocatorType& allocator, |
| const size_t row, const size_t col, |
| std::vector<bool>& column_is_consts) { |
| rapidjson::Value value; |
| rapidjson::Document document; |
| |
| const auto obj = json_col->get_data_at(index_check_const(row, column_is_consts[0])); |
| std::string_view json_string(obj.data, obj.size); |
| const auto path = path_col->get_data_at(index_check_const(row, column_is_consts[col])); |
| std::string_view path_string(path.data, path.size); |
| auto* root = get_json_object<JSON_FUN_STRING>(json_string, path_string, &document); |
| if (root != nullptr) { |
| value.CopyFrom(*root, allocator); |
| } |
| return value; |
| } |
| |
| static rapidjson::Value* get_document(const ColumnString* path_col, |
| rapidjson::Document* document, |
| std::vector<JsonPath>& parsed_paths, const size_t row, |
| bool is_const_column) { |
| const auto path = path_col->get_data_at(index_check_const(row, is_const_column)); |
| std::string_view path_string(path.data, path.size); |
| //Cannot use '\' as the last character, return NULL |
| if (path_string.back() == '\\') { |
| document->SetNull(); |
| return nullptr; |
| } |
| |
| #ifdef USE_LIBCPP |
| std::string s(path_string); |
| auto tok = get_json_token(s); |
| #else |
| auto tok = get_json_token(path_string); |
| #endif |
| // TODO: here maybe could use std::vector<std::string_view> or std::span |
| std::vector<std::string> paths(tok.begin(), tok.end()); |
| get_parsed_paths(paths, &parsed_paths); |
| if (parsed_paths.empty()) { |
| return nullptr; |
| } |
| if (!(parsed_paths)[0].is_valid) { |
| return nullptr; |
| } |
| return document; |
| } |
| |
| static void execute(const std::vector<const ColumnString*>& data_columns, |
| ColumnString& result_column, NullMap& null_map, size_t input_rows_count, |
| std::vector<bool>& column_is_consts) { |
| rapidjson::Document document; |
| rapidjson::Document::AllocatorType& allocator = document.GetAllocator(); |
| rapidjson::StringBuffer buf; |
| rapidjson::Writer<rapidjson::StringBuffer> writer(buf); |
| const auto* json_col = data_columns[0]; |
| auto insert_result_lambda = [&](rapidjson::Value& value, size_t row) { |
| if (value.IsNull()) { |
| null_map[row] = 1; |
| result_column.insert_default(); |
| } else { |
| // Check if the value is a string |
| if (value.IsString() && remove_quotes) { |
| // Get the string value without quotes |
| const char* str_ptr = value.GetString(); |
| size_t len = value.GetStringLength(); |
| // Insert without quotes |
| result_column.insert_data(str_ptr, len); |
| } else { |
| // Write value as string for other types |
| buf.Clear(); |
| writer.Reset(buf); |
| value.Accept(writer); |
| result_column.insert_data(buf.GetString(), buf.GetSize()); |
| } |
| } |
| }; |
| if (data_columns.size() == 2) { |
| rapidjson::Value value; |
| if (column_is_consts[1]) { |
| std::vector<JsonPath> parsed_paths; |
| auto* root = get_document(data_columns[1], &document, parsed_paths, 0, |
| column_is_consts[1]); |
| for (size_t row = 0; row < input_rows_count; row++) { |
| if (root != nullptr) { |
| const auto& obj = json_col->get_data_at(row); |
| std::string_view json_string(obj.data, obj.size); |
| if (UNLIKELY((parsed_paths).size() == 1)) { |
| document.SetString(json_string.data(), |
| cast_set<rapidjson::SizeType>(json_string.size()), |
| allocator); |
| } |
| document.Parse(json_string.data(), json_string.size()); |
| if (UNLIKELY(document.HasParseError())) { |
| null_map[row] = 1; |
| result_column.insert_default(); |
| continue; |
| } |
| auto* root_val = match_value(parsed_paths, &document, allocator); |
| if (root_val != nullptr) { |
| value.CopyFrom(*root_val, allocator); |
| } else { |
| rapidjson::Value tmp; |
| value.Swap(tmp); |
| } |
| } |
| insert_result_lambda(value, row); |
| } |
| } else { |
| for (size_t row = 0; row < input_rows_count; row++) { |
| value = parse_json(json_col, data_columns[1], allocator, row, 1, |
| column_is_consts); |
| insert_result_lambda(value, row); |
| } |
| } |
| |
| } else { |
| rapidjson::Value value; |
| value.SetArray(); |
| value.Reserve(cast_set<rapidjson::SizeType>(data_columns.size() - 1), allocator); |
| for (size_t row = 0; row < input_rows_count; row++) { |
| value.Clear(); |
| for (size_t col = 1; col < data_columns.size(); ++col) { |
| value.PushBack(parse_json(json_col, data_columns[col], allocator, row, col, |
| column_is_consts), |
| allocator); |
| } |
| insert_result_lambda(value, row); |
| } |
| } |
| } |
| }; |
| |
| template <typename Impl> |
| class FunctionJson : public IFunction { |
| public: |
| static constexpr auto name = Impl::name; |
| |
| static FunctionPtr create() { return std::make_shared<FunctionJson<Impl>>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 0; } |
| |
| bool is_variadic() const override { return true; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return Impl::get_return_type_impl(arguments); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| auto result_column = ColumnString::create(); |
| |
| std::vector<ColumnPtr> column_ptrs; // prevent converted column destruct |
| std::vector<const ColumnString*> data_columns; |
| for (int i = 0; i < arguments.size(); i++) { |
| column_ptrs.push_back( |
| block.get_by_position(arguments[i]).column->convert_to_full_column_if_const()); |
| data_columns.push_back(assert_cast<const ColumnString*>(column_ptrs.back().get())); |
| } |
| |
| Impl::execute(data_columns, *assert_cast<ColumnString*>(result_column.get()), |
| input_rows_count); |
| block.get_by_position(result).column = std::move(result_column); |
| return Status::OK(); |
| } |
| }; |
| |
| template <typename Impl> |
| class FunctionJsonNullable : public IFunction { |
| public: |
| static constexpr auto name = Impl::name; |
| static FunctionPtr create() { return std::make_shared<FunctionJsonNullable<Impl>>(); } |
| String get_name() const override { return name; } |
| size_t get_number_of_arguments() const override { return 0; } |
| bool is_variadic() const override { return true; } |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return make_nullable(std::make_shared<DataTypeString>()); |
| } |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| auto result_column = ColumnString::create(); |
| auto null_map = ColumnUInt8::create(input_rows_count, 0); |
| std::vector<const ColumnString*> data_columns; |
| std::vector<bool> column_is_consts; |
| for (int i = 0; i < arguments.size(); i++) { |
| ColumnPtr arg_col; |
| bool arg_const; |
| std::tie(arg_col, arg_const) = |
| unpack_if_const(block.get_by_position(arguments[i]).column); |
| column_is_consts.push_back(arg_const); |
| data_columns.push_back(assert_cast<const ColumnString*>(arg_col.get())); |
| } |
| Impl::execute(data_columns, *assert_cast<ColumnString*>(result_column.get()), |
| null_map->get_data(), input_rows_count, column_is_consts); |
| block.replace_by_position( |
| result, ColumnNullable::create(std::move(result_column), std::move(null_map))); |
| return Status::OK(); |
| } |
| }; |
| |
| using FunctionGetJsonDouble = FunctionBinaryStringOperateToNullType<GetJsonDouble>; |
| using FunctionGetJsonInt = FunctionBinaryStringOperateToNullType<GetJsonInt>; |
| using FunctionGetJsonBigInt = FunctionBinaryStringOperateToNullType<GetJsonBigInt>; |
| using FunctionGetJsonString = FunctionBinaryStringOperateToNullType<GetJsonString>; |
| |
| class FunctionJsonValid : public IFunction { |
| public: |
| static constexpr auto name = "json_valid"; |
| static FunctionPtr create() { return std::make_shared<FunctionJsonValid>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return make_nullable(std::make_shared<DataTypeInt32>()); |
| } |
| |
| bool use_default_implementation_for_nulls() const override { return false; } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const IColumn& col_from = *(block.get_by_position(arguments[0]).column); |
| |
| auto null_map = ColumnUInt8::create(input_rows_count, 0); |
| |
| const ColumnString* col_from_string = check_and_get_column<ColumnString>(col_from); |
| if (auto* nullable = check_and_get_column<ColumnNullable>(col_from)) { |
| col_from_string = |
| check_and_get_column<ColumnString>(*nullable->get_nested_column_ptr()); |
| } |
| |
| if (!col_from_string) { |
| return Status::RuntimeError("Illegal column {} should be ColumnString", |
| col_from.get_name()); |
| } |
| |
| auto col_to = ColumnInt32::create(); |
| auto& vec_to = col_to->get_data(); |
| size_t size = col_from.size(); |
| vec_to.resize(size); |
| |
| // parser can be reused for performance |
| JsonBinaryValue jsonb_value; |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| if (col_from.is_null_at(i)) { |
| null_map->get_data()[i] = 1; |
| vec_to[i] = 0; |
| continue; |
| } |
| |
| const auto& val = col_from_string->get_data_at(i); |
| if (jsonb_value.from_json_string(val.data, cast_set<unsigned int>(val.size)).ok()) { |
| vec_to[i] = 1; |
| } else { |
| vec_to[i] = 0; |
| } |
| } |
| |
| block.replace_by_position(result, |
| ColumnNullable::create(std::move(col_to), std::move(null_map))); |
| |
| return Status::OK(); |
| } |
| }; |
| |
| class FunctionJsonContains : public IFunction { |
| public: |
| static constexpr auto name = "json_contains"; |
| static FunctionPtr create() { return std::make_shared<FunctionJsonContains>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 3; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return make_nullable(std::make_shared<DataTypeUInt8>()); |
| } |
| |
| DataTypes get_variadic_argument_types_impl() const override { |
| return {std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>(), |
| std::make_shared<DataTypeString>()}; |
| } |
| |
| bool use_default_implementation_for_nulls() const override { return false; } |
| |
| bool json_contains_object(const rapidjson::Value& target, |
| const rapidjson::Value& search_value) const { |
| if (!target.IsObject() || !search_value.IsObject()) { |
| return false; |
| } |
| |
| for (auto itr = search_value.MemberBegin(); itr != search_value.MemberEnd(); ++itr) { |
| if (!target.HasMember(itr->name) || !json_contains(target[itr->name], itr->value)) { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| bool json_contains_array(const rapidjson::Value& target, |
| const rapidjson::Value& search_value) const { |
| if (!target.IsArray() || !search_value.IsArray()) { |
| return false; |
| } |
| |
| for (auto itr = search_value.Begin(); itr != search_value.End(); ++itr) { |
| bool found = false; |
| for (auto target_itr = target.Begin(); target_itr != target.End(); ++target_itr) { |
| if (json_contains(*target_itr, *itr)) { |
| found = true; |
| break; |
| } |
| } |
| if (!found) { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| bool json_contains(const rapidjson::Value& target, const rapidjson::Value& search_value) const { |
| if (target == search_value) { |
| return true; |
| } |
| |
| if (target.IsObject() && search_value.IsObject()) { |
| return json_contains_object(target, search_value); |
| } |
| |
| if (target.IsArray() && search_value.IsArray()) { |
| return json_contains_array(target, search_value); |
| } |
| |
| return false; |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const IColumn& col_json = *(block.get_by_position(arguments[0]).column); |
| const IColumn& col_search = *(block.get_by_position(arguments[1]).column); |
| const IColumn& col_path = *(block.get_by_position(arguments[2]).column); |
| |
| auto null_map = ColumnUInt8::create(input_rows_count, 0); |
| |
| const ColumnString* col_json_string = check_and_get_column<ColumnString>(col_json); |
| const ColumnString* col_search_string = check_and_get_column<ColumnString>(col_search); |
| const ColumnString* col_path_string = check_and_get_column<ColumnString>(col_path); |
| |
| if (!col_json_string || !col_search_string || !col_path_string) { |
| return Status::RuntimeError("Illegal column should be ColumnString"); |
| } |
| |
| auto col_to = ColumnUInt8::create(); |
| auto& vec_to = col_to->get_data(); |
| size_t size = col_json.size(); |
| vec_to.resize(size); |
| |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| if (col_json.is_null_at(i) || col_search.is_null_at(i) || col_path.is_null_at(i)) { |
| null_map->get_data()[i] = 1; |
| vec_to[i] = 0; |
| continue; |
| } |
| |
| const auto& json_val = col_json_string->get_data_at(i); |
| const auto& search_val = col_search_string->get_data_at(i); |
| const auto& path_val = col_path_string->get_data_at(i); |
| |
| std::string_view json_string(json_val.data, json_val.size); |
| std::string_view search_string(search_val.data, search_val.size); |
| std::string_view path_string(path_val.data, path_val.size); |
| |
| rapidjson::Document document; |
| auto target_val = get_json_object<JSON_FUN_STRING>(json_string, path_string, &document); |
| if (target_val == nullptr || target_val->IsNull()) { |
| vec_to[i] = 0; |
| } else { |
| rapidjson::Document search_doc; |
| search_doc.Parse(search_string.data(), search_string.size()); |
| if (json_contains(*target_val, search_doc)) { |
| vec_to[i] = 1; |
| } else { |
| vec_to[i] = 0; |
| } |
| } |
| } |
| |
| block.replace_by_position(result, |
| ColumnNullable::create(std::move(col_to), std::move(null_map))); |
| |
| return Status::OK(); |
| } |
| }; |
| |
| class FunctionJsonUnquote : public IFunction { |
| public: |
| static constexpr auto name = "json_unquote"; |
| static FunctionPtr create() { return std::make_shared<FunctionJsonUnquote>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 1; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| return make_nullable(std::make_shared<DataTypeString>()); |
| } |
| |
| bool use_default_implementation_for_nulls() const override { return false; } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| const IColumn& col_from = *(block.get_by_position(arguments[0]).column); |
| |
| auto null_map = ColumnUInt8::create(input_rows_count, 0); |
| |
| const ColumnString* col_from_string = check_and_get_column<ColumnString>(col_from); |
| if (auto* nullable = check_and_get_column<ColumnNullable>(col_from)) { |
| col_from_string = |
| check_and_get_column<ColumnString>(*nullable->get_nested_column_ptr()); |
| } |
| |
| if (!col_from_string) { |
| return Status::RuntimeError("Illegal column {} should be ColumnString", |
| col_from.get_name()); |
| } |
| |
| auto col_to = ColumnString::create(); |
| col_to->reserve(input_rows_count); |
| |
| // parser can be reused for performance |
| rapidjson::Document document; |
| for (size_t i = 0; i < input_rows_count; ++i) { |
| if (col_from.is_null_at(i)) { |
| null_map->get_data()[i] = 1; |
| col_to->insert_data(nullptr, 0); |
| continue; |
| } |
| |
| const auto& json_str = col_from_string->get_data_at(i); |
| if (json_str.size < 2 || json_str.data[0] != '"' || |
| json_str.data[json_str.size - 1] != '"') { |
| // non-quoted string |
| col_to->insert_data(json_str.data, json_str.size); |
| } else { |
| document.Parse(json_str.data, json_str.size); |
| if (document.HasParseError() || !document.IsString()) { |
| return Status::RuntimeError( |
| fmt::format("Invalid JSON text in argument 1 to function {}: {}", name, |
| std::string_view(json_str.data, json_str.size))); |
| } |
| col_to->insert_data(document.GetString(), document.GetStringLength()); |
| } |
| } |
| |
| block.replace_by_position(result, |
| ColumnNullable::create(std::move(col_to), std::move(null_map))); |
| |
| return Status::OK(); |
| } |
| }; |
| |
| enum class JsonModifyType { |
| JSON_INSERT = 0, |
| JSON_REPLACE, |
| JSON_SET, |
| }; |
| |
| struct FunctionJsonInsert { |
| static constexpr auto name = "json_insert"; |
| static constexpr auto modify_type = JsonModifyType::JSON_INSERT; |
| }; |
| |
| struct FunctionJsonReplace { |
| static constexpr auto name = "json_replace"; |
| static constexpr auto modify_type = JsonModifyType::JSON_REPLACE; |
| }; |
| struct FunctionJsonSet { |
| static constexpr auto name = "json_set"; |
| static constexpr auto modify_type = JsonModifyType::JSON_SET; |
| }; |
| |
| template <typename Kind> |
| class FunctionJsonModifyImpl : public IFunction { |
| private: |
| // T = std::vector<std::string> |
| // TODO: update RE2 to support std::vector<std::string_view> |
| // if path is not a valid path expression or contains |
| // a * wildcard, return runtime error. |
| template <typename T> |
| Status get_parsed_paths_with_status(const T& path_exprs, |
| std::vector<JsonPath>* parsed_paths) const { |
| if (UNLIKELY(path_exprs.empty())) { |
| return Status::RuntimeError("json path empty function {}", get_name()); |
| } |
| |
| if (path_exprs[0] != "$") { |
| // keep same behaviour with get_parsed_paths(), |
| // '$[0]' is invalid path, '$.[0]' is valid |
| return Status::RuntimeError( |
| "Invalid JSON path expression. The error is around character position 1"); |
| } |
| parsed_paths->emplace_back("$", -1, true); |
| |
| for (int i = 1; i < path_exprs.size(); i++) { |
| std::string col; |
| std::string index; |
| if (UNLIKELY(!RE2::FullMatch(path_exprs[i], JSON_PATTERN, &col, &index))) { |
| return Status::RuntimeError( |
| "Invalid JSON path expression. The error is around character position {}", |
| i + 1); |
| } else { |
| int idx = -1; |
| if (!index.empty()) { |
| if (index == "*") { |
| return Status::RuntimeError( |
| "In this situation, path expressions may not contain the * token"); |
| } else { |
| idx = atoi(index.c_str()); |
| } |
| } |
| parsed_paths->emplace_back(col, idx, true); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status get_parsed_path_columns(std::vector<std::vector<std::vector<JsonPath>>>& json_paths, |
| const std::vector<const ColumnString*>& data_columns, |
| size_t input_rows_count, |
| std::vector<bool>& column_is_consts) const { |
| for (auto col = 1; col + 1 < data_columns.size() - 1; col += 2) { |
| json_paths.emplace_back(std::vector<std::vector<JsonPath>>()); |
| for (auto row = 0; row < input_rows_count; row++) { |
| const auto path = data_columns[col]->get_data_at( |
| index_check_const(row, column_is_consts[col])); |
| std::string_view path_string(path.data, path.size); |
| std::vector<JsonPath> parsed_paths; |
| |
| #ifdef USE_LIBCPP |
| std::string s(path_string); |
| auto tok = get_json_token(s); |
| #else |
| auto tok = get_json_token(path_string); |
| #endif |
| std::vector<std::string> paths(tok.begin(), tok.end()); |
| RETURN_IF_ERROR(get_parsed_paths_with_status(paths, &parsed_paths)); |
| json_paths[col / 2].emplace_back(parsed_paths); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| public: |
| static constexpr auto name = Kind::name; |
| |
| static FunctionPtr create() { return std::make_shared<FunctionJsonModifyImpl<Kind>>(); } |
| |
| String get_name() const override { return name; } |
| |
| size_t get_number_of_arguments() const override { return 0; } |
| |
| bool is_variadic() const override { return true; } |
| |
| bool use_default_implementation_for_nulls() const override { return false; } |
| |
| DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
| bool is_nullable = false; |
| // arguments: (json_str, path, val[, path, val...], type_flag) |
| for (auto col = 0; col < arguments.size() - 1; col += 1) { |
| if (arguments[col]->is_nullable()) { |
| is_nullable = true; |
| break; |
| } |
| } |
| return is_nullable ? make_nullable(std::make_shared<DataTypeString>()) |
| : std::make_shared<DataTypeString>(); |
| } |
| |
| Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
| uint32_t result, size_t input_rows_count) const override { |
| auto result_column = ColumnString::create(); |
| bool is_nullable = false; |
| ColumnUInt8::MutablePtr ret_null_map = nullptr; |
| ColumnUInt8::Container* ret_null_map_data = nullptr; |
| |
| std::vector<const ColumnString*> data_columns; |
| std::vector<const ColumnUInt8*> nullmaps; |
| std::vector<bool> column_is_consts; |
| for (int i = 0; i < arguments.size(); i++) { |
| ColumnPtr arg_col; |
| bool arg_const; |
| std::tie(arg_col, arg_const) = |
| unpack_if_const(block.get_by_position(arguments[i]).column); |
| const auto* col_nullable = check_and_get_column<ColumnNullable>(arg_col.get()); |
| column_is_consts.push_back(arg_const); |
| if (col_nullable) { |
| if (!is_nullable) { |
| is_nullable = true; |
| } |
| const ColumnUInt8* col_nullmap = assert_cast<const ColumnUInt8*>( |
| col_nullable->get_null_map_column_ptr().get()); |
| nullmaps.push_back(col_nullmap); |
| const ColumnString* col = assert_cast<const ColumnString*>( |
| col_nullable->get_nested_column_ptr().get()); |
| data_columns.push_back(col); |
| } else { |
| nullmaps.push_back(nullptr); |
| data_columns.push_back(assert_cast<const ColumnString*>(arg_col.get())); |
| } |
| } |
| //*assert_cast<ColumnUInt8*>(ret_null_map.get()) |
| if (is_nullable) { |
| ret_null_map = ColumnUInt8::create(input_rows_count, 0); |
| ret_null_map_data = &(ret_null_map->get_data()); |
| } |
| RETURN_IF_ERROR(execute_process( |
| data_columns, *assert_cast<ColumnString*>(result_column.get()), input_rows_count, |
| nullmaps, is_nullable, ret_null_map_data, column_is_consts)); |
| |
| if (is_nullable) { |
| block.replace_by_position(result, ColumnNullable::create(std::move(result_column), |
| std::move(ret_null_map))); |
| } else { |
| block.get_by_position(result).column = std::move(result_column); |
| } |
| return Status::OK(); |
| } |
| |
| Status execute_process(const std::vector<const ColumnString*>& data_columns, |
| ColumnString& result_column, size_t input_rows_count, |
| const std::vector<const ColumnUInt8*> nullmaps, bool is_nullable, |
| ColumnUInt8::Container* ret_null_map_data, |
| std::vector<bool>& column_is_consts) const { |
| std::string type_flags = data_columns.back()->get_data_at(0).to_string(); |
| |
| std::vector<rapidjson::Document> objects; |
| for (auto row = 0; row < input_rows_count; row++) { |
| objects.emplace_back(rapidjson::kNullType); |
| const auto json_doc = |
| data_columns[0]->get_data_at(index_check_const(row, column_is_consts[0])); |
| std::string_view json_str(json_doc.data, json_doc.size); |
| objects[row].Parse(json_str.data(), json_str.size()); |
| if (UNLIKELY(objects[row].HasParseError())) { |
| return Status::RuntimeError("invalid json str {}: function {}", json_str, |
| get_name()); |
| } |
| } |
| |
| std::vector<std::vector<std::vector<JsonPath>>> json_paths; |
| RETURN_IF_ERROR(get_parsed_path_columns(json_paths, data_columns, input_rows_count, |
| column_is_consts)); |
| |
| execute_parse(type_flags, data_columns, objects, json_paths, nullmaps, column_is_consts); |
| |
| rapidjson::StringBuffer buf; |
| rapidjson::Writer<rapidjson::StringBuffer> writer(buf); |
| |
| for (int i = 0; i < input_rows_count; i++) { |
| buf.Clear(); |
| writer.Reset(buf); |
| objects[i].Accept(writer); |
| if (is_nullable && objects[i].IsNull()) { |
| (*ret_null_map_data)[i] = 1; |
| } |
| result_column.insert_data(buf.GetString(), buf.GetSize()); |
| } |
| return Status::OK(); |
| } |
| |
| template <int flag> |
| using Reducer = ExecuteReducer<flag, FunctionJsonModifyImpl<Kind>>; |
| |
| static void execute_parse(std::string type_flags, |
| const std::vector<const ColumnString*>& data_columns, |
| std::vector<rapidjson::Document>& objects, |
| std::vector<std::vector<std::vector<JsonPath>>>& json_paths, |
| const std::vector<const ColumnUInt8*>& nullmaps, |
| std::vector<bool>& column_is_consts) { |
| for (auto col = 1; col + 1 < data_columns.size() - 1; col += 2) { |
| constexpr_int_match<'0', '7', Reducer>::run( |
| type_flags[col + 1], objects, json_paths[col / 2], data_columns[col + 1], |
| nullmaps[col + 1], column_is_consts[col + 1]); |
| } |
| } |
| |
| static void modify_value(const std::vector<JsonPath>& parsed_paths, rapidjson::Value* document, |
| rapidjson::Document::AllocatorType& mem_allocator, bool is_insert, |
| bool is_replace, rapidjson::Value* value) { |
| rapidjson::Value* root = document; |
| rapidjson::Value key; |
| |
| auto i = 1; |
| for (; i < parsed_paths.size(); i++) { |
| if (root->IsNull()) { |
| return; |
| } |
| const std::string& col = parsed_paths[i].key; |
| int index = parsed_paths[i].idx; |
| if (LIKELY(!col.empty())) { |
| if (root->IsObject()) { |
| if (!root->HasMember(col.c_str())) { |
| break; |
| } else { |
| root = &((*root)[col.c_str()]); |
| } |
| } else { |
| // not object |
| return; |
| } |
| } |
| if (UNLIKELY(index != -1)) { |
| if (root->IsArray()) { |
| if (index >= root->Size()) { |
| // array append new value |
| if (is_insert && i + 1 == parsed_paths.size()) { |
| root->PushBack(*value, mem_allocator); |
| } |
| return; |
| } else { |
| root = &((*root)[index]); |
| } |
| } else { |
| if (i + 1 == parsed_paths.size()) { |
| // replace, example: |
| // json_replace({"a": 1}, '$.[0]', null); |
| // output: null |
| if (is_replace && index == 0) { |
| *root = *value; |
| return; |
| } |
| // convert to array, example: |
| // json_insert({"a": 1}, '$.[1]', 3); |
| // output: [{"a": 1}, 3] |
| if (is_insert && index > 0) { |
| key.SetArray(); |
| root->Swap(key); |
| root->PushBack(key, mem_allocator); |
| root->PushBack(*value, mem_allocator); |
| } |
| } |
| return; |
| } |
| } |
| } |
| |
| if (is_insert && i + 1 == parsed_paths.size()) { |
| if (LIKELY(root->IsObject())) { |
| // object insert new value |
| const std::string& col = parsed_paths[i].key; |
| int index = parsed_paths[i].idx; |
| if (LIKELY(!col.empty() && index == -1)) { |
| key.SetString(col.c_str(), mem_allocator); |
| root->AddMember(key, *value, mem_allocator); |
| } |
| } |
| } else if (is_replace && i == parsed_paths.size()) { |
| *root = *value; |
| } |
| } |
| |
| template <typename TypeImpl> |
| static void execute_type(std::vector<rapidjson::Document>& objects, |
| std::vector<std::vector<JsonPath>>& paths_column, |
| const ColumnString* value_column, const ColumnUInt8* nullmap, |
| bool column_is_const) { |
| StringParser::ParseResult result; |
| rapidjson::Value value; |
| for (auto row = 0; row < objects.size(); row++) { |
| std::vector<JsonPath>* parsed_paths = &paths_column[row]; |
| |
| if (nullmap != nullptr && nullmap->get_data()[row]) { |
| JsonParser<'0'>::update_value( |
| result, value, |
| value_column->get_data_at(index_check_const(row, column_is_const)), |
| objects[row].GetAllocator()); |
| } else { |
| TypeImpl::update_value( |
| result, value, |
| value_column->get_data_at(index_check_const(row, column_is_const)), |
| objects[row].GetAllocator()); |
| } |
| |
| switch (Kind::modify_type) { |
| case JsonModifyType::JSON_INSERT: |
| modify_value(*parsed_paths, &objects[row], objects[row].GetAllocator(), true, false, |
| &value); |
| break; |
| case JsonModifyType::JSON_REPLACE: |
| modify_value(*parsed_paths, &objects[row], objects[row].GetAllocator(), false, true, |
| &value); |
| break; |
| case JsonModifyType::JSON_SET: |
| modify_value(*parsed_paths, &objects[row], objects[row].GetAllocator(), true, true, |
| &value); |
| break; |
| } |
| } |
| } |
| }; |
| |
| void register_function_json(SimpleFunctionFactory& factory) { |
| factory.register_function<FunctionGetJsonInt>(); |
| factory.register_function<FunctionGetJsonBigInt>(); |
| factory.register_function<FunctionGetJsonDouble>(); |
| factory.register_function<FunctionGetJsonString>(); |
| factory.register_function<FunctionJsonUnquote>(); |
| |
| factory.register_function<FunctionJsonAlwaysNotNullable<FunctionJsonArrayImpl>>(); |
| factory.register_function<FunctionJsonAlwaysNotNullable<FunctionJsonObjectImpl>>(); |
| factory.register_function<FunctionJson<FunctionJsonQuoteImpl>>(); |
| factory.register_function< |
| FunctionJsonNullable<FunctionJsonExtractImpl<JsonExtractName, false>>>(); |
| factory.register_function< |
| FunctionJsonNullable<FunctionJsonExtractImpl<JsonExtractNoQuotesName, true>>>(); |
| |
| factory.register_function<FunctionJsonValid>(); |
| factory.register_function<FunctionJsonContains>(); |
| |
| factory.register_function<FunctionJsonModifyImpl<FunctionJsonInsert>>(); |
| factory.register_function<FunctionJsonModifyImpl<FunctionJsonReplace>>(); |
| factory.register_function<FunctionJsonModifyImpl<FunctionJsonSet>>(); |
| } |
| |
| } // namespace doris::vectorized |