blob: 4594fd6e668faafd6b6c40488fcf299b17705e03 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <glog/logging.h>
#include <algorithm>
#include <cstdlib>
#include <memory>
#include <string>
#include <string_view>
#include <tuple>
#include <type_traits>
#include <utility>
#include <variant>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "runtime/define_primitive_type.h"
#include "runtime/jsonb_value.h"
#include "runtime/primitive_type.h"
#include "udf/udf.h"
#include "util/jsonb_document.h"
#include "util/jsonb_stream.h"
#include "util/jsonb_utils.h"
#include "util/jsonb_writer.h"
#include "util/simd/bits.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
#include "vec/common/custom_allocator.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/core/column_numbers.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_jsonb.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_string.h"
#include "vec/functions/function.h"
#include "vec/functions/like.h"
#include "vec/functions/simple_function_factory.h"
#include "vec/utils/stringop_substring.h"
#include "vec/utils/template_helpers.hpp"
#include "vec/utils/util.hpp"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
enum class NullalbeMode { NULLABLE = 0, FOLLOW_INPUT };
enum class JsonbParseErrorMode { FAIL = 0, RETURN_NULL, RETURN_VALUE };
// func(string,string) -> json
template <NullalbeMode nullable_mode, JsonbParseErrorMode parse_error_handle_mode>
class FunctionJsonbParseBase : public IFunction {
private:
struct FunctionJsonbParseState {
StringRef default_value;
JsonBinaryValue default_value_parser;
bool has_const_default_value = false;
bool default_is_null = false;
};
public:
static constexpr auto name = "json_parse";
static constexpr auto alias = "jsonb_parse";
static FunctionPtr create() { return std::make_shared<FunctionJsonbParseBase>(); }
String get_name() const override {
String error_mode;
switch (parse_error_handle_mode) {
case JsonbParseErrorMode::FAIL:
break;
case JsonbParseErrorMode::RETURN_NULL:
error_mode = "_error_to_null";
break;
case JsonbParseErrorMode::RETURN_VALUE:
error_mode = "_error_to_value";
break;
}
return name + error_mode;
}
bool is_variadic() const override {
return parse_error_handle_mode == JsonbParseErrorMode::RETURN_VALUE;
}
size_t get_number_of_arguments() const override {
switch (parse_error_handle_mode) {
case JsonbParseErrorMode::FAIL:
return 1;
case JsonbParseErrorMode::RETURN_NULL:
return 1;
case JsonbParseErrorMode::RETURN_VALUE:
return 0;
}
}
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
bool is_nullable = false;
switch (nullable_mode) {
case NullalbeMode::NULLABLE:
is_nullable = true;
break;
case NullalbeMode::FOLLOW_INPUT: {
for (auto arg : arguments) {
is_nullable |= arg->is_nullable();
}
break;
}
}
return is_nullable ? make_nullable(std::make_shared<DataTypeJsonb>())
: std::make_shared<DataTypeJsonb>();
}
bool use_default_implementation_for_nulls() const override { return false; }
Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override {
if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
std::shared_ptr<FunctionJsonbParseState> state =
std::make_shared<FunctionJsonbParseState>();
context->set_function_state(FunctionContext::FRAGMENT_LOCAL, state);
}
if constexpr (parse_error_handle_mode == JsonbParseErrorMode::RETURN_VALUE) {
if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
auto* state = reinterpret_cast<FunctionJsonbParseState*>(
context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
if (state) {
if (context->get_num_args() == 2) {
if (context->is_col_constant(1)) {
const auto default_value_col = context->get_constant_col(1)->column_ptr;
if (default_value_col->is_null_at(0)) {
state->default_is_null = true;
} else {
const auto& default_value = default_value_col->get_data_at(0);
state->default_value = default_value;
state->has_const_default_value = true;
}
}
} else if (context->get_num_args() == 1) {
RETURN_IF_ERROR(
state->default_value_parser.from_json_string(std::string("{}")));
state->default_value = StringRef(state->default_value_parser.value(),
state->default_value_parser.size());
state->has_const_default_value = true;
}
}
}
if (context->get_num_args() != 1 && context->get_num_args() != 2) {
return Status::InvalidArgument(
"{} function should have 1 or 2 arguments, "
"but got {}",
get_name(), context->get_num_args());
}
}
return Status::OK();
}
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
auto&& [col_from, col_from_is_const] =
unpack_if_const(block.get_by_position(arguments[0]).column);
if (col_from_is_const && col_from->is_null_at(0)) {
auto col_str = ColumnString::create();
col_str->insert_default();
auto null_map = ColumnUInt8::create(1, 1);
auto nullable_col = ColumnNullable::create(std::move(col_str), std::move(null_map));
block.get_by_position(result).column =
ColumnConst::create(std::move(nullable_col), input_rows_count);
return Status::OK();
}
auto null_map = ColumnUInt8::create(0, 0);
bool is_nullable = false;
switch (nullable_mode) {
case NullalbeMode::NULLABLE: {
is_nullable = true;
break;
}
case NullalbeMode::FOLLOW_INPUT: {
for (auto arg : arguments) {
is_nullable |= block.get_by_position(arg).type->is_nullable();
}
break;
}
}
if (is_nullable) {
null_map = ColumnUInt8::create(input_rows_count, 0);
}
const ColumnString* col_from_string = nullptr;
if (col_from->is_nullable()) {
const auto& nullable_col = assert_cast<const ColumnNullable&>(*col_from);
VectorizedUtils::update_null_map(null_map->get_data(),
nullable_col.get_null_map_data());
col_from_string =
assert_cast<const ColumnString*>(nullable_col.get_nested_column_ptr().get());
} else {
col_from_string = assert_cast<const ColumnString*>(col_from.get());
}
StringRef constant_default_value;
bool default_value_const = false;
bool default_value_null_const = false;
ColumnPtr default_value_col;
JsonBinaryValue default_jsonb_value_parser;
const ColumnString* default_value_str_col = nullptr;
const NullMap* default_value_nullmap = nullptr;
if constexpr (parse_error_handle_mode == JsonbParseErrorMode::RETURN_VALUE) {
auto* state = reinterpret_cast<FunctionJsonbParseState*>(
context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
if (state && state->has_const_default_value) {
constant_default_value = state->default_value;
default_value_null_const = state->default_is_null;
default_value_const = true;
} else if (arguments.size() > 1) {
if (block.get_by_position(arguments[1]).type->get_primitive_type() !=
PrimitiveType::TYPE_JSONB) {
return Status::InvalidArgument(
"{} second argument should be jsonb type, but got {}", get_name(),
block.get_by_position(arguments[1]).type->get_name());
}
std::tie(default_value_col, default_value_const) =
unpack_if_const(block.get_by_position(arguments[1]).column);
if (default_value_const) {
const JsonbDocument* default_value_doc = nullptr;
if (default_value_col->is_null_at(0)) {
default_value_null_const = true;
} else {
auto data = default_value_col->get_data_at(0);
RETURN_IF_ERROR(JsonbDocument::checkAndCreateDocument(data.data, data.size,
&default_value_doc));
constant_default_value = data;
}
} else {
if (default_value_col->is_nullable()) {
const auto& nullable_col =
assert_cast<const ColumnNullable&>(*default_value_col);
default_value_str_col = assert_cast<const ColumnString*>(
nullable_col.get_nested_column_ptr().get());
default_value_nullmap = &(nullable_col.get_null_map_data());
} else {
default_value_str_col =
assert_cast<const ColumnString*>(default_value_col.get());
}
}
} else if (arguments.size() == 1) {
// parse default value '{}' should always success.
RETURN_IF_ERROR(default_jsonb_value_parser.from_json_string(std::string("{}")));
default_value_const = true;
constant_default_value.data = default_jsonb_value_parser.value();
constant_default_value.size = default_jsonb_value_parser.size();
}
}
auto col_to = ColumnString::create();
col_to->reserve(input_rows_count);
auto& null_map_data = null_map->get_data();
// parser can be reused for performance
JsonBinaryValue jsonb_value;
for (size_t i = 0; i < input_rows_count; ++i) {
if (is_nullable && null_map_data[i]) {
col_to->insert_default();
continue;
}
auto index = index_check_const(i, col_from_is_const);
const auto& val = col_from_string->get_data_at(index);
auto st = jsonb_value.from_json_string(val.data, val.size);
if (st.ok()) {
// insert jsonb format data
col_to->insert_data(jsonb_value.value(), jsonb_value.size());
} else {
if constexpr (parse_error_handle_mode == JsonbParseErrorMode::FAIL) {
return Status::InvalidArgument(
"Parse json document failed at row {}, error: {}", i, st.to_string());
} else if constexpr (parse_error_handle_mode == JsonbParseErrorMode::RETURN_NULL) {
null_map_data[i] = 1;
col_to->insert_default();
} else {
if (default_value_const) {
if (default_value_null_const) {
null_map_data[i] = 1;
col_to->insert_default();
} else {
col_to->insert_data(constant_default_value.data,
constant_default_value.size);
}
} else {
if (default_value_nullmap && (*default_value_nullmap)[i]) {
null_map_data[i] = 1;
col_to->insert_default();
continue;
}
auto value = default_value_str_col->get_data_at(i);
col_to->insert_data(value.data, value.size);
}
}
}
}
if (is_nullable) {
block.replace_by_position(
result, ColumnNullable::create(std::move(col_to), std::move(null_map)));
} else {
block.replace_by_position(result, std::move(col_to));
}
return Status::OK();
}
};
// jsonb_parse return type nullable as input
using FunctionJsonbParse =
FunctionJsonbParseBase<NullalbeMode::FOLLOW_INPUT, JsonbParseErrorMode::FAIL>;
using FunctionJsonbParseErrorNull =
FunctionJsonbParseBase<NullalbeMode::NULLABLE, JsonbParseErrorMode::RETURN_NULL>;
using FunctionJsonbParseErrorValue =
FunctionJsonbParseBase<NullalbeMode::FOLLOW_INPUT, JsonbParseErrorMode::RETURN_VALUE>;
// func(jsonb, [varchar, varchar, ...]) -> nullable(type)
template <typename Impl>
class FunctionJsonbExtract : public IFunction {
public:
static constexpr auto name = Impl::name;
static constexpr auto alias = Impl::alias;
static FunctionPtr create() { return std::make_shared<FunctionJsonbExtract>(); }
String get_name() const override { return name; }
bool is_variadic() const override { return true; }
size_t get_number_of_arguments() const override { return 0; }
bool use_default_implementation_for_nulls() const override { return false; }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(std::make_shared<typename Impl::ReturnType>());
}
DataTypes get_variadic_argument_types_impl() const override {
if constexpr (vectorized::HasGetVariadicArgumentTypesImpl<Impl>) {
return Impl::get_variadic_argument_types_impl();
} else {
return {};
}
}
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
DCHECK_GE(arguments.size(), 2);
ColumnPtr jsonb_data_column;
bool jsonb_data_const = false;
const NullMap* data_null_map = nullptr;
if (block.get_by_position(arguments[0]).type->get_primitive_type() !=
PrimitiveType::TYPE_JSONB) {
return Status::InvalidArgument(
"jsonb_extract first argument should be json type, but got {}",
block.get_by_position(arguments[0]).type->get_name());
}
// prepare jsonb data column
std::tie(jsonb_data_column, jsonb_data_const) =
unpack_if_const(block.get_by_position(arguments[0]).column);
if (jsonb_data_column->is_nullable()) {
const auto& nullable_column = assert_cast<const ColumnNullable&>(*jsonb_data_column);
jsonb_data_column = nullable_column.get_nested_column_ptr();
data_null_map = &nullable_column.get_null_map_data();
}
const auto& ldata = assert_cast<const ColumnString*>(jsonb_data_column.get())->get_chars();
const auto& loffsets =
assert_cast<const ColumnString*>(jsonb_data_column.get())->get_offsets();
// prepare parse path column prepare
std::vector<const ColumnString*> jsonb_path_columns;
std::vector<bool> path_const(arguments.size() - 1);
std::vector<const NullMap*> path_null_maps(arguments.size() - 1, nullptr);
for (int i = 0; i < arguments.size() - 1; ++i) {
ColumnPtr path_column;
bool is_const = false;
std::tie(path_column, is_const) =
unpack_if_const(block.get_by_position(arguments[i + 1]).column);
path_const[i] = is_const;
if (path_column->is_nullable()) {
const auto& nullable_column = assert_cast<const ColumnNullable&>(*path_column);
path_column = nullable_column.get_nested_column_ptr();
path_null_maps[i] = &nullable_column.get_null_map_data();
}
jsonb_path_columns.push_back(assert_cast<const ColumnString*>(path_column.get()));
}
auto null_map = ColumnUInt8::create(input_rows_count, 0);
auto res = Impl::ColumnType::create();
// execute Impl
if constexpr (std::is_same_v<typename Impl::ReturnType, DataTypeString> ||
std::is_same_v<typename Impl::ReturnType, DataTypeJsonb>) {
auto& res_data = res->get_chars();
auto& res_offsets = res->get_offsets();
RETURN_IF_ERROR(Impl::vector_vector_v2(
context, ldata, loffsets, data_null_map, jsonb_data_const, jsonb_path_columns,
path_null_maps, path_const, res_data, res_offsets, null_map->get_data()));
} else {
// not support other extract type for now (e.g. int, double, ...)
DCHECK_EQ(jsonb_path_columns.size(), 1);
const auto& rdata = jsonb_path_columns[0]->get_chars();
const auto& roffsets = jsonb_path_columns[0]->get_offsets();
auto create_all_null_result = [&]() {
res = Impl::ColumnType::create();
res->insert_default();
auto nullable_column =
ColumnNullable::create(std::move(res), ColumnUInt8::create(1, 1));
auto const_column =
ColumnConst::create(std::move(nullable_column), input_rows_count);
block.get_by_position(result).column = std::move(const_column);
return Status::OK();
};
if (jsonb_data_const) {
if (data_null_map && (*data_null_map)[0]) {
return create_all_null_result();
}
RETURN_IF_ERROR(Impl::scalar_vector(context, jsonb_data_column->get_data_at(0),
rdata, roffsets, path_null_maps[0],
res->get_data(), null_map->get_data()));
} else if (path_const[0]) {
if (path_null_maps[0] && (*path_null_maps[0])[0]) {
return create_all_null_result();
}
RETURN_IF_ERROR(Impl::vector_scalar(context, ldata, loffsets, data_null_map,
jsonb_path_columns[0]->get_data_at(0),
res->get_data(), null_map->get_data()));
} else {
RETURN_IF_ERROR(Impl::vector_vector(context, ldata, loffsets, data_null_map, rdata,
roffsets, path_null_maps[0], res->get_data(),
null_map->get_data()));
}
}
block.get_by_position(result).column =
ColumnNullable::create(std::move(res), std::move(null_map));
return Status::OK();
}
};
class FunctionJsonbKeys : public IFunction {
public:
static constexpr auto name = "json_keys";
static constexpr auto alias = "jsonb_keys";
static FunctionPtr create() { return std::make_shared<FunctionJsonbKeys>(); }
String get_name() const override { return name; }
bool is_variadic() const override { return true; }
size_t get_number_of_arguments() const override { return 0; }
bool use_default_implementation_for_nulls() const override { return false; }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(
std::make_shared<DataTypeArray>(make_nullable(std::make_shared<DataTypeString>())));
}
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
DCHECK_GE(arguments.size(), 1);
DCHECK(arguments.size() == 1 || arguments.size() == 2)
<< "json_keys should have 1 or 2 arguments, but got " << arguments.size();
const NullMap* data_null_map = nullptr;
const ColumnString* col_from_string = nullptr;
// prepare jsonb data column
auto&& [jsonb_data_column, json_data_const] =
unpack_if_const(block.get_by_position(arguments[0]).column);
if (jsonb_data_column->is_nullable()) {
const auto* nullable = check_and_get_column<ColumnNullable>(jsonb_data_column.get());
col_from_string =
assert_cast<const ColumnString*>(nullable->get_nested_column_ptr().get());
data_null_map = &nullable->get_null_map_data();
} else {
col_from_string = assert_cast<const ColumnString*>(jsonb_data_column.get());
}
// prepare parse path column prepare, maybe we do not have path column
ColumnPtr jsonb_path_column = nullptr;
const ColumnString* jsonb_path_col = nullptr;
bool path_const = false;
const NullMap* path_null_map = nullptr;
if (arguments.size() == 2) {
// we have should have a ColumnString for path
std::tie(jsonb_path_column, path_const) =
unpack_if_const(block.get_by_position(arguments[1]).column);
if (jsonb_path_column->is_nullable()) {
const auto* nullable =
check_and_get_column<ColumnNullable>(jsonb_path_column.get());
jsonb_path_column = nullable->get_nested_column_ptr();
path_null_map = &nullable->get_null_map_data();
}
jsonb_path_col = check_and_get_column<ColumnString>(jsonb_path_column.get());
}
auto null_map = ColumnUInt8::create(input_rows_count, 0);
NullMap& res_null_map = null_map->get_data();
auto dst_arr = ColumnArray::create(
ColumnNullable::create(ColumnString::create(), ColumnUInt8::create()),
ColumnArray::ColumnOffsets::create());
auto& dst_nested_column = assert_cast<ColumnNullable&>(dst_arr->get_data());
Status st = std::visit(
[&](auto data_const, auto has_path, auto path_const) {
return inner_loop_impl<data_const, has_path, path_const>(
input_rows_count, *dst_arr, dst_nested_column, res_null_map,
*col_from_string, data_null_map, jsonb_path_col, path_null_map);
},
vectorized::make_bool_variant(json_data_const),
vectorized::make_bool_variant(jsonb_path_column),
vectorized::make_bool_variant(path_const));
if (!st.ok()) {
return st;
}
block.get_by_position(result).column =
ColumnNullable::create(std::move(dst_arr), std::move(null_map));
return st;
}
private:
template <bool JSONB_DATA_CONST, bool JSONB_PATH_PARAM, bool JSON_PATH_CONST>
static ALWAYS_INLINE Status inner_loop_impl(size_t input_rows_count, ColumnArray& dst_arr,
ColumnNullable& dst_nested_column,
NullMap& res_null_map,
const ColumnString& col_from_string,
const NullMap* jsonb_data_nullmap,
const ColumnString* jsonb_path_column,
const NullMap* path_null_map) {
// if path is const, we just need to parse it once
JsonbPath const_path;
if constexpr (JSONB_PATH_PARAM && JSON_PATH_CONST) {
StringRef r_raw_ref = jsonb_path_column->get_data_at(0);
if (!const_path.seek(r_raw_ref.data, r_raw_ref.size)) {
return Status::InvalidArgument("Json path error: Invalid Json Path for value: {}",
r_raw_ref.to_string());
}
if (const_path.is_wildcard()) {
return Status::InvalidJsonPath(
"In this situation, path expressions may not contain the * and ** tokens "
"or an array range.");
}
}
for (size_t i = 0; i < input_rows_count; ++i) {
auto index = index_check_const(i, JSONB_DATA_CONST);
// if jsonb data is null or path column is null , we should return null
if (jsonb_data_nullmap && (*jsonb_data_nullmap)[index]) {
res_null_map[i] = 1;
dst_arr.insert_default();
continue;
}
if constexpr (JSONB_PATH_PARAM && !JSON_PATH_CONST) {
if (path_null_map && (*path_null_map)[i]) {
res_null_map[i] = 1;
dst_arr.insert_default();
continue;
}
}
auto json_data = col_from_string.get_data_at(index);
const JsonbDocument* doc = nullptr;
auto st = JsonbDocument::checkAndCreateDocument(json_data.data, json_data.size, &doc);
if (!st.ok() || !doc || !doc->getValue()) [[unlikely]] {
dst_arr.clear();
return Status::InvalidArgument("jsonb data is invalid");
}
const JsonbValue* obj_val;
JsonbFindResult find_result;
if constexpr (JSONB_PATH_PARAM) {
if constexpr (!JSON_PATH_CONST) {
auto data = jsonb_path_column->get_data_at(i);
JsonbPath path;
if (!path.seek(data.data, data.size)) {
return Status::InvalidArgument(
"Json path error: Invalid Json Path for value: {} at row: {}",
std::string_view(data.data, data.size), i);
}
if (path.is_wildcard()) {
return Status::InvalidJsonPath(
"In this situation, path expressions may not contain the * and ** "
"tokens "
"or an array range. at row: {}",
i);
}
find_result = doc->getValue()->findValue(path);
} else {
find_result = doc->getValue()->findValue(const_path);
}
obj_val = find_result.value;
} else {
obj_val = doc->getValue();
}
if (!obj_val || !obj_val->isObject()) {
// if jsonb data is not object we should return null
res_null_map[i] = 1;
dst_arr.insert_default();
continue;
}
const auto* obj = obj_val->unpack<ObjectVal>();
for (const auto& it : *obj) {
dst_nested_column.insert_data(it.getKeyStr(), it.klen());
}
dst_arr.get_offsets().push_back(dst_nested_column.size());
} //for
return Status::OK();
}
};
class FunctionJsonbExtractPath : public IFunction {
public:
static constexpr auto name = "json_exists_path";
static constexpr auto alias = "jsonb_exists_path";
using ColumnType = ColumnUInt8;
using Container = typename ColumnType::Container;
static FunctionPtr create() { return std::make_shared<FunctionJsonbExtractPath>(); }
String get_name() const override { return name; }
size_t get_number_of_arguments() const override { return 2; }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
// it only needs to indicate existence and does not need to return nullable values.
const auto nullable = std::ranges::any_of(
arguments, [](const DataTypePtr& type) { return type->is_nullable(); });
if (nullable) {
return make_nullable(std::make_shared<DataTypeUInt8>());
} else {
return std::make_shared<DataTypeUInt8>();
}
}
bool use_default_implementation_for_nulls() const override { return false; }
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
// prepare jsonb data column
auto&& [jsonb_data_column, jsonb_data_const] =
unpack_if_const(block.get_by_position(arguments[0]).column);
const NullMap* data_null_map = nullptr;
const ColumnString* data_col = nullptr;
if (jsonb_data_column->is_nullable()) {
const auto* nullable = assert_cast<const ColumnNullable*>(jsonb_data_column.get());
data_col = assert_cast<const ColumnString*>(nullable->get_nested_column_ptr().get());
data_null_map = &nullable->get_null_map_data();
} else {
data_col = assert_cast<const ColumnString*>(jsonb_data_column.get());
}
const auto& ldata = data_col->get_chars();
const auto& loffsets = data_col->get_offsets();
// prepare parse path column prepare
auto&& [path_column, path_const] =
unpack_if_const(block.get_by_position(arguments[1]).column);
const ColumnString* path_col = nullptr;
const NullMap* path_null_map = nullptr;
if (path_column->is_nullable()) {
const auto* nullable = assert_cast<const ColumnNullable*>(path_column.get());
path_col = assert_cast<const ColumnString*>(nullable->get_nested_column_ptr().get());
path_null_map = &nullable->get_null_map_data();
} else {
path_col = assert_cast<const ColumnString*>(path_column.get());
}
DCHECK(!(jsonb_data_const && path_const))
<< "jsonb_data_const and path_const should not be both const";
auto create_all_null_result = [&]() {
auto res = ColumnType::create();
res->insert_default();
auto nullable_column =
ColumnNullable::create(std::move(res), ColumnUInt8::create(1, 1));
auto const_column = ColumnConst::create(std::move(nullable_column), input_rows_count);
block.get_by_position(result).column = std::move(const_column);
return Status::OK();
};
MutableColumnPtr result_null_map_column;
NullMap* result_null_map = nullptr;
if (data_null_map || path_null_map) {
result_null_map_column = ColumnUInt8::create(input_rows_count, 0);
result_null_map = &assert_cast<ColumnUInt8&>(*result_null_map_column).get_data();
if (data_null_map) {
VectorizedUtils::update_null_map(*result_null_map, *data_null_map,
jsonb_data_const);
}
if (path_null_map) {
VectorizedUtils::update_null_map(*result_null_map, *path_null_map, path_const);
}
if (0 == simd::count_zero_num(reinterpret_cast<const int8_t*>(result_null_map->data()),
input_rows_count)) {
return create_all_null_result();
}
}
auto res = ColumnType::create();
bool is_invalid_json_path = false;
const auto& rdata = path_col->get_chars();
const auto& roffsets = path_col->get_offsets();
if (jsonb_data_const) {
if (data_null_map && (*data_null_map)[0]) {
return create_all_null_result();
}
scalar_vector(context, data_col->get_data_at(0), rdata, roffsets, res->get_data(),
result_null_map, is_invalid_json_path);
} else if (path_const) {
if (path_null_map && (*path_null_map)[0]) {
return create_all_null_result();
}
vector_scalar(context, ldata, loffsets, path_col->get_data_at(0), res->get_data(),
result_null_map, is_invalid_json_path);
} else {
vector_vector(context, ldata, loffsets, rdata, roffsets, res->get_data(),
result_null_map, is_invalid_json_path);
}
if (is_invalid_json_path) {
return Status::InvalidArgument(
"Json path error: Invalid Json Path for value: {}",
std::string_view(reinterpret_cast<const char*>(rdata.data()), rdata.size()));
}
if (result_null_map) {
auto nullabel_col =
ColumnNullable::create(std::move(res), std::move(result_null_map_column));
block.get_by_position(result).column = std::move(nullabel_col);
} else {
block.get_by_position(result).column = std::move(res);
}
return Status::OK();
}
private:
static ALWAYS_INLINE void inner_loop_impl(size_t i, Container& res, const char* l_raw_str,
size_t l_str_size, JsonbPath& path) {
// doc is NOT necessary to be deleted since JsonbDocument will not allocate memory
const JsonbDocument* doc = nullptr;
auto st = JsonbDocument::checkAndCreateDocument(l_raw_str, l_str_size, &doc);
if (!st.ok() || !doc || !doc->getValue()) [[unlikely]] {
return;
}
// value is NOT necessary to be deleted since JsonbValue will not allocate memory
auto result = doc->getValue()->findValue(path);
if (result.value) {
res[i] = 1;
}
}
static void vector_vector(FunctionContext* context, const ColumnString::Chars& ldata,
const ColumnString::Offsets& loffsets,
const ColumnString::Chars& rdata,
const ColumnString::Offsets& roffsets, Container& res,
const NullMap* result_null_map, bool& is_invalid_json_path) {
const size_t size = loffsets.size();
res.resize_fill(size, 0);
for (size_t i = 0; i < size; i++) {
if (result_null_map && (*result_null_map)[i]) {
continue;
}
const char* l_raw_str = reinterpret_cast<const char*>(&ldata[loffsets[i - 1]]);
int l_str_size = loffsets[i] - loffsets[i - 1];
const char* r_raw_str = reinterpret_cast<const char*>(&rdata[roffsets[i - 1]]);
int r_str_size = roffsets[i] - roffsets[i - 1];
JsonbPath path;
if (!path.seek(r_raw_str, r_str_size)) {
is_invalid_json_path = true;
return;
}
inner_loop_impl(i, res, l_raw_str, l_str_size, path);
}
}
static void scalar_vector(FunctionContext* context, const StringRef& ldata,
const ColumnString::Chars& rdata,
const ColumnString::Offsets& roffsets, Container& res,
const NullMap* result_null_map, bool& is_invalid_json_path) {
const size_t size = roffsets.size();
res.resize_fill(size, 0);
for (size_t i = 0; i < size; i++) {
if (result_null_map && (*result_null_map)[i]) {
continue;
}
const char* r_raw_str = reinterpret_cast<const char*>(&rdata[roffsets[i - 1]]);
int r_str_size = roffsets[i] - roffsets[i - 1];
JsonbPath path;
if (!path.seek(r_raw_str, r_str_size)) {
is_invalid_json_path = true;
return;
}
inner_loop_impl(i, res, ldata.data, ldata.size, path);
}
}
static void vector_scalar(FunctionContext* context, const ColumnString::Chars& ldata,
const ColumnString::Offsets& loffsets, const StringRef& rdata,
Container& res, const NullMap* result_null_map,
bool& is_invalid_json_path) {
const size_t size = loffsets.size();
res.resize_fill(size, 0);
JsonbPath path;
if (!path.seek(rdata.data, rdata.size)) {
is_invalid_json_path = true;
return;
}
for (size_t i = 0; i < size; i++) {
if (result_null_map && (*result_null_map)[i]) {
continue;
}
const char* l_raw_str = reinterpret_cast<const char*>(&ldata[loffsets[i - 1]]);
int l_str_size = loffsets[i] - loffsets[i - 1];
inner_loop_impl(i, res, l_raw_str, l_str_size, path);
}
}
};
template <typename ValueType>
struct JsonbExtractStringImpl {
using ReturnType = typename ValueType::ReturnType;
using ColumnType = typename ValueType::ColumnType;
private:
static ALWAYS_INLINE void inner_loop_impl(JsonbWriter* writer, size_t i,
ColumnString::Chars& res_data,
ColumnString::Offsets& res_offsets, NullMap& null_map,
std::unique_ptr<JsonbToJson>& formater,
const char* l_raw, size_t l_size, JsonbPath& path) {
// doc is NOT necessary to be deleted since JsonbDocument will not allocate memory
const JsonbDocument* doc = nullptr;
auto st = JsonbDocument::checkAndCreateDocument(l_raw, l_size, &doc);
if (!st.ok() || !doc || !doc->getValue()) [[unlikely]] {
StringOP::push_null_string(i, res_data, res_offsets, null_map);
return;
}
// value is NOT necessary to be deleted since JsonbValue will not allocate memory
auto find_result = doc->getValue()->findValue(path);
if (UNLIKELY(!find_result.value)) {
StringOP::push_null_string(i, res_data, res_offsets, null_map);
return;
}
if constexpr (ValueType::only_get_type) {
StringOP::push_value_string(std::string_view(find_result.value->typeName()), i,
res_data, res_offsets);
return;
} else {
static_assert(std::is_same_v<DataTypeJsonb, ReturnType>);
if constexpr (ValueType::no_quotes) {
if (find_result.value->isString()) {
const auto* str_value = find_result.value->unpack<JsonbStringVal>();
const auto* blob = str_value->getBlob();
if (str_value->length() > 1 && blob[0] == '"' &&
blob[str_value->length() - 1] == '"') {
writer->writeStartString();
writer->writeString(blob + 1, str_value->length() - 2);
writer->writeEndString();
StringOP::push_value_string(
std::string_view(writer->getOutput()->getBuffer(),
writer->getOutput()->getSize()),
i, res_data, res_offsets);
return;
}
}
}
writer->writeValueSimple(find_result.value);
StringOP::push_value_string(std::string_view(writer->getOutput()->getBuffer(),
writer->getOutput()->getSize()),
i, res_data, res_offsets);
}
}
public:
// for jsonb_extract_string
static Status vector_vector_v2(
FunctionContext* context, const ColumnString::Chars& ldata,
const ColumnString::Offsets& loffsets, const NullMap* l_null_map,
const bool& json_data_const,
const std::vector<const ColumnString*>& rdata_columns, // here we can support more paths
const std::vector<const NullMap*>& r_null_maps, const std::vector<bool>& path_const,
ColumnString::Chars& res_data, ColumnString::Offsets& res_offsets, NullMap& null_map) {
const size_t input_rows_count = null_map.size();
res_offsets.resize(input_rows_count);
auto writer = std::make_unique<JsonbWriter>();
std::unique_ptr<JsonbToJson> formater;
// reuseable json path list, espacially for const path
std::vector<JsonbPath> json_path_list;
json_path_list.resize(rdata_columns.size());
// lambda function to parse json path for row i and path pi
auto parse_json_path = [&](size_t i, size_t pi) -> Status {
const auto index = index_check_const(i, path_const[pi]);
const ColumnString* path_col = rdata_columns[pi];
const ColumnString::Chars& rdata = path_col->get_chars();
const ColumnString::Offsets& roffsets = path_col->get_offsets();
size_t r_off = roffsets[index - 1];
size_t r_size = roffsets[index] - r_off;
const char* r_raw = reinterpret_cast<const char*>(&rdata[r_off]);
JsonbPath path;
if (!path.seek(r_raw, r_size)) {
return Status::InvalidArgument("Json path error: Invalid Json Path for value: {}",
std::string_view(r_raw, r_size));
}
json_path_list[pi] = std::move(path);
return Status::OK();
};
for (size_t pi = 0; pi < rdata_columns.size(); pi++) {
if (path_const[pi]) {
if (r_null_maps[pi] && (*r_null_maps[pi])[0]) {
continue;
}
RETURN_IF_ERROR(parse_json_path(0, pi));
}
}
res_data.reserve(ldata.size());
for (size_t i = 0; i < input_rows_count; ++i) {
if (null_map[i]) {
continue;
}
const auto data_index = index_check_const(i, json_data_const);
if (l_null_map && (*l_null_map)[data_index]) {
StringOP::push_null_string(i, res_data, res_offsets, null_map);
continue;
}
size_t l_off = loffsets[data_index - 1];
size_t l_size = loffsets[data_index] - l_off;
const char* l_raw = reinterpret_cast<const char*>(&ldata[l_off]);
if (rdata_columns.size() == 1) { // just return origin value
const auto path_index = index_check_const(i, path_const[0]);
if (r_null_maps[0] && (*r_null_maps[0])[path_index]) {
StringOP::push_null_string(i, res_data, res_offsets, null_map);
continue;
}
if (!path_const[0]) {
RETURN_IF_ERROR(parse_json_path(i, 0));
}
writer->reset();
inner_loop_impl(writer.get(), i, res_data, res_offsets, null_map, formater, l_raw,
l_size, json_path_list[0]);
} else { // will make array string to user
writer->reset();
bool has_value = false;
// doc is NOT necessary to be deleted since JsonbDocument will not allocate memory
const JsonbDocument* doc = nullptr;
auto st = JsonbDocument::checkAndCreateDocument(l_raw, l_size, &doc);
for (size_t pi = 0; pi < rdata_columns.size(); ++pi) {
if (!st.ok() || !doc || !doc->getValue()) [[unlikely]] {
continue;
}
const auto path_index = index_check_const(i, path_const[pi]);
if (r_null_maps[pi] && (*r_null_maps[pi])[path_index]) {
StringOP::push_null_string(i, res_data, res_offsets, null_map);
break;
}
if (!path_const[pi]) {
RETURN_IF_ERROR(parse_json_path(i, pi));
}
auto find_result = doc->getValue()->findValue(json_path_list[pi]);
if (find_result.value) {
if (!has_value) {
has_value = true;
writer->writeStartArray();
}
if (find_result.value->isArray() && find_result.is_wildcard) {
// To avoid getting results of nested array like [[1, 2, 3], [4, 5, 6]],
// if value is array, we should write all items in array, instead of write the array itself.
// finaly we will get results like [1, 2, 3, 4, 5, 6]
for (const auto& item : *find_result.value->unpack<ArrayVal>()) {
writer->writeValue(&item);
}
} else {
writer->writeValue(find_result.value);
}
}
}
if (has_value) {
writer->writeEndArray();
StringOP::push_value_string(std::string_view(writer->getOutput()->getBuffer(),
writer->getOutput()->getSize()),
i, res_data, res_offsets);
} else {
StringOP::push_null_string(i, res_data, res_offsets, null_map);
}
}
} //for
return Status::OK();
}
static Status vector_vector(FunctionContext* context, const ColumnString::Chars& ldata,
const ColumnString::Offsets& loffsets, const NullMap* l_null_map,
const ColumnString::Chars& rdata,
const ColumnString::Offsets& roffsets, const NullMap* r_null_map,
ColumnString::Chars& res_data, ColumnString::Offsets& res_offsets,
NullMap& null_map) {
size_t input_rows_count = loffsets.size();
res_offsets.resize(input_rows_count);
std::unique_ptr<JsonbToJson> formater;
JsonbWriter writer;
for (size_t i = 0; i < input_rows_count; ++i) {
if (l_null_map && (*l_null_map)[i]) {
StringOP::push_null_string(i, res_data, res_offsets, null_map);
continue;
}
if (r_null_map && (*r_null_map)[i]) {
StringOP::push_null_string(i, res_data, res_offsets, null_map);
continue;
}
int l_size = loffsets[i] - loffsets[i - 1];
const char* l_raw = reinterpret_cast<const char*>(&ldata[loffsets[i - 1]]);
int r_size = roffsets[i] - roffsets[i - 1];
const char* r_raw = reinterpret_cast<const char*>(&rdata[roffsets[i - 1]]);
JsonbPath path;
if (!path.seek(r_raw, r_size)) {
return Status::InvalidArgument(
"Json path error: Invalid Json Path for value: {} at row: {}",
std::string_view(r_raw, r_size), i);
}
writer.reset();
inner_loop_impl(&writer, i, res_data, res_offsets, null_map, formater, l_raw, l_size,
path);
} //for
return Status::OK();
} //function
static Status vector_scalar(FunctionContext* context, const ColumnString::Chars& ldata,
const ColumnString::Offsets& loffsets, const NullMap* l_null_map,
const StringRef& rdata, ColumnString::Chars& res_data,
ColumnString::Offsets& res_offsets, NullMap& null_map) {
size_t input_rows_count = loffsets.size();
res_offsets.resize(input_rows_count);
std::unique_ptr<JsonbToJson> formater;
JsonbPath path;
if (!path.seek(rdata.data, rdata.size)) {
return Status::InvalidArgument("Json path error: Invalid Json Path for value: {}",
std::string_view(rdata.data, rdata.size));
}
JsonbWriter writer;
for (size_t i = 0; i < input_rows_count; ++i) {
if (l_null_map && (*l_null_map)[i]) {
StringOP::push_null_string(i, res_data, res_offsets, null_map);
continue;
}
int l_size = loffsets[i] - loffsets[i - 1];
const char* l_raw = reinterpret_cast<const char*>(&ldata[loffsets[i - 1]]);
writer.reset();
inner_loop_impl(&writer, i, res_data, res_offsets, null_map, formater, l_raw, l_size,
path);
} //for
return Status::OK();
} //function
static Status scalar_vector(FunctionContext* context, const StringRef& ldata,
const ColumnString::Chars& rdata,
const ColumnString::Offsets& roffsets, const NullMap* r_null_map,
ColumnString::Chars& res_data, ColumnString::Offsets& res_offsets,
NullMap& null_map) {
size_t input_rows_count = roffsets.size();
res_offsets.resize(input_rows_count);
std::unique_ptr<JsonbToJson> formater;
JsonbWriter writer;
for (size_t i = 0; i < input_rows_count; ++i) {
if (r_null_map && (*r_null_map)[i]) {
StringOP::push_null_string(i, res_data, res_offsets, null_map);
continue;
}
int r_size = roffsets[i] - roffsets[i - 1];
const char* r_raw = reinterpret_cast<const char*>(&rdata[roffsets[i - 1]]);
JsonbPath path;
if (!path.seek(r_raw, r_size)) {
return Status::InvalidArgument(
"Json path error: Invalid Json Path for value: {} at row: {}",
std::string_view(r_raw, r_size), i);
}
writer.reset();
inner_loop_impl(&writer, i, res_data, res_offsets, null_map, formater, ldata.data,
ldata.size, path);
} //for
return Status::OK();
} //function
};
struct JsonbExtractIsnull {
static constexpr auto name = "json_extract_isnull";
static constexpr auto alias = "jsonb_extract_isnull";
using ReturnType = DataTypeUInt8;
using ColumnType = ColumnUInt8;
using Container = typename ColumnType::Container;
private:
static ALWAYS_INLINE void inner_loop_impl(size_t i, Container& res, NullMap& null_map,
const char* l_raw_str, size_t l_str_size,
JsonbPath& path) {
if (null_map[i]) {
res[i] = 0;
return;
}
// doc is NOT necessary to be deleted since JsonbDocument will not allocate memory
const JsonbDocument* doc = nullptr;
auto st = JsonbDocument::checkAndCreateDocument(l_raw_str, l_str_size, &doc);
if (!st.ok() || !doc || !doc->getValue()) [[unlikely]] {
null_map[i] = 1;
res[i] = 0;
return;
}
// value is NOT necessary to be deleted since JsonbValue will not allocate memory
auto find_result = doc->getValue()->findValue(path);
const auto* value = find_result.value;
if (UNLIKELY(!value)) {
null_map[i] = 1;
res[i] = 0;
return;
}
res[i] = value->isNull();
}
public:
// for jsonb_extract_int/int64/double
static Status vector_vector(FunctionContext* context, const ColumnString::Chars& ldata,
const ColumnString::Offsets& loffsets, const NullMap* l_null_map,
const ColumnString::Chars& rdata,
const ColumnString::Offsets& roffsets, const NullMap* r_null_map,
Container& res, NullMap& null_map) {
size_t size = loffsets.size();
res.resize(size);
for (size_t i = 0; i < loffsets.size(); i++) {
if ((l_null_map && (*l_null_map)[i]) || (r_null_map && (*r_null_map)[i])) {
res[i] = 0;
null_map[i] = 1;
continue;
}
const char* l_raw_str = reinterpret_cast<const char*>(&ldata[loffsets[i - 1]]);
int l_str_size = loffsets[i] - loffsets[i - 1];
const char* r_raw_str = reinterpret_cast<const char*>(&rdata[roffsets[i - 1]]);
int r_str_size = roffsets[i] - roffsets[i - 1];
JsonbPath path;
if (!path.seek(r_raw_str, r_str_size)) {
return Status::InvalidArgument(
"Json path error: Invalid Json Path for value: {} at row: {}",
std::string_view(r_raw_str, r_str_size), i);
}
inner_loop_impl(i, res, null_map, l_raw_str, l_str_size, path);
} //for
return Status::OK();
} //function
static Status scalar_vector(FunctionContext* context, const StringRef& ldata,
const ColumnString::Chars& rdata,
const ColumnString::Offsets& roffsets, const NullMap* r_null_map,
Container& res, NullMap& null_map) {
size_t size = roffsets.size();
res.resize(size);
for (size_t i = 0; i < size; i++) {
if (r_null_map && (*r_null_map)[i]) {
res[i] = 0;
null_map[i] = 1;
continue;
}
const char* r_raw_str = reinterpret_cast<const char*>(&rdata[roffsets[i - 1]]);
int r_str_size = roffsets[i] - roffsets[i - 1];
JsonbPath path;
if (!path.seek(r_raw_str, r_str_size)) {
return Status::InvalidArgument(
"Json path error: Invalid Json Path for value: {} at row: {}",
std::string_view(r_raw_str, r_str_size), i);
}
inner_loop_impl(i, res, null_map, ldata.data, ldata.size, path);
} //for
return Status::OK();
} //function
static Status vector_scalar(FunctionContext* context, const ColumnString::Chars& ldata,
const ColumnString::Offsets& loffsets, const NullMap* l_null_map,
const StringRef& rdata, Container& res, NullMap& null_map) {
size_t size = loffsets.size();
res.resize(size);
JsonbPath path;
if (!path.seek(rdata.data, rdata.size)) {
return Status::InvalidArgument("Json path error: Invalid Json Path for value: {}",
std::string_view(rdata.data, rdata.size));
}
for (size_t i = 0; i < loffsets.size(); i++) {
if (l_null_map && (*l_null_map)[i]) {
res[i] = 0;
null_map[i] = 1;
continue;
}
const char* l_raw_str = reinterpret_cast<const char*>(&ldata[loffsets[i - 1]]);
int l_str_size = loffsets[i] - loffsets[i - 1];
inner_loop_impl(i, res, null_map, l_raw_str, l_str_size, path);
} //for
return Status::OK();
} //function
};
struct JsonbTypeJson {
using T = std::string;
using ReturnType = DataTypeJsonb;
using ColumnType = ColumnString;
static const bool only_get_type = false;
static const bool no_quotes = false;
};
struct JsonbTypeJsonNoQuotes {
using T = std::string;
using ReturnType = DataTypeJsonb;
using ColumnType = ColumnString;
static const bool only_get_type = false;
static const bool no_quotes = true;
};
struct JsonbTypeType {
using T = std::string;
using ReturnType = DataTypeString;
using ColumnType = ColumnString;
static const bool only_get_type = true;
static const bool no_quotes = false;
};
struct JsonbExtractJsonb : public JsonbExtractStringImpl<JsonbTypeJson> {
static constexpr auto name = "jsonb_extract";
static constexpr auto alias = "json_extract";
};
struct JsonbExtractJsonbNoQuotes : public JsonbExtractStringImpl<JsonbTypeJsonNoQuotes> {
static constexpr auto name = "jsonb_extract_no_quotes";
static constexpr auto alias = "json_extract_no_quotes";
};
struct JsonbType : public JsonbExtractStringImpl<JsonbTypeType> {
static constexpr auto name = "json_type";
static constexpr auto alias = "jsonb_type";
};
using FunctionJsonbExists = FunctionJsonbExtractPath;
using FunctionJsonbType = FunctionJsonbExtract<JsonbType>;
using FunctionJsonbExtractIsnull = FunctionJsonbExtract<JsonbExtractIsnull>;
using FunctionJsonbExtractJsonb = FunctionJsonbExtract<JsonbExtractJsonb>;
using FunctionJsonbExtractJsonbNoQuotes = FunctionJsonbExtract<JsonbExtractJsonbNoQuotes>;
template <typename Impl>
class FunctionJsonbLength : public IFunction {
public:
static constexpr auto name = "json_length";
String get_name() const override { return name; }
static FunctionPtr create() { return std::make_shared<FunctionJsonbLength<Impl>>(); }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(std::make_shared<DataTypeInt32>());
}
DataTypes get_variadic_argument_types_impl() const override {
return Impl::get_variadic_argument_types();
}
size_t get_number_of_arguments() const override {
return get_variadic_argument_types_impl().size();
}
bool use_default_implementation_for_nulls() const override { return false; }
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
return Impl::execute_impl(context, block, arguments, result, input_rows_count);
}
};
struct JsonbLengthUtil {
static Status jsonb_length_execute(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, uint32_t result,
size_t input_rows_count) {
DCHECK_GE(arguments.size(), 2);
ColumnPtr jsonb_data_column;
bool jsonb_data_const = false;
// prepare jsonb data column
std::tie(jsonb_data_column, jsonb_data_const) =
unpack_if_const(block.get_by_position(arguments[0]).column);
ColumnPtr path_column;
bool is_const = false;
std::tie(path_column, is_const) =
unpack_if_const(block.get_by_position(arguments[1]).column);
auto null_map = ColumnUInt8::create(input_rows_count, 0);
auto return_type = block.get_data_type(result);
MutableColumnPtr res = return_type->create_column();
JsonbPath path;
if (is_const) {
if (path_column->is_null_at(0)) {
for (size_t i = 0; i < input_rows_count; ++i) {
null_map->get_data()[i] = 1;
res->insert_data(nullptr, 0);
}
block.replace_by_position(
result, ColumnNullable::create(std::move(res), std::move(null_map)));
return Status::OK();
}
auto path_value = path_column->get_data_at(0);
if (!path.seek(path_value.data, path_value.size)) {
return Status::InvalidArgument("Json path error: Invalid Json Path for value: {}",
std::string_view(path_value.data, path_value.size));
}
}
for (size_t i = 0; i < input_rows_count; ++i) {
if (jsonb_data_column->is_null_at(i) || path_column->is_null_at(i) ||
(jsonb_data_column->get_data_at(i).size == 0)) {
null_map->get_data()[i] = 1;
res->insert_data(nullptr, 0);
continue;
}
if (!is_const) {
auto path_value = path_column->get_data_at(i);
path.clean();
if (!path.seek(path_value.data, path_value.size)) {
return Status::InvalidArgument(
"Json path error: Invalid Json Path for value: {}",
std::string_view(reinterpret_cast<const char*>(path_value.data),
path_value.size));
}
}
auto jsonb_value = jsonb_data_column->get_data_at(i);
// doc is NOT necessary to be deleted since JsonbDocument will not allocate memory
const JsonbDocument* doc = nullptr;
RETURN_IF_ERROR(JsonbDocument::checkAndCreateDocument(jsonb_value.data,
jsonb_value.size, &doc));
auto find_result = doc->getValue()->findValue(path);
const auto* value = find_result.value;
if (UNLIKELY(!value)) {
null_map->get_data()[i] = 1;
res->insert_data(nullptr, 0);
continue;
}
auto length = value->numElements();
res->insert_data(const_cast<const char*>((char*)&length), 0);
}
block.replace_by_position(result,
ColumnNullable::create(std::move(res), std::move(null_map)));
return Status::OK();
}
};
struct JsonbLengthAndPathImpl {
static DataTypes get_variadic_argument_types() {
return {std::make_shared<DataTypeJsonb>(), std::make_shared<DataTypeString>()};
}
static Status execute_impl(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, uint32_t result,
size_t input_rows_count) {
return JsonbLengthUtil::jsonb_length_execute(context, block, arguments, result,
input_rows_count);
}
};
template <typename Impl>
class FunctionJsonbContains : public IFunction {
public:
static constexpr auto name = "json_contains";
String get_name() const override { return name; }
static FunctionPtr create() { return std::make_shared<FunctionJsonbContains<Impl>>(); }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(std::make_shared<DataTypeUInt8>());
}
DataTypes get_variadic_argument_types_impl() const override {
return Impl::get_variadic_argument_types();
}
size_t get_number_of_arguments() const override {
return get_variadic_argument_types_impl().size();
}
bool use_default_implementation_for_nulls() const override { return false; }
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
return Impl::execute_impl(context, block, arguments, result, input_rows_count);
}
};
struct JsonbContainsUtil {
static Status jsonb_contains_execute(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, uint32_t result,
size_t input_rows_count) {
DCHECK_GE(arguments.size(), 3);
auto jsonb_data1_column = block.get_by_position(arguments[0]).column;
auto jsonb_data2_column = block.get_by_position(arguments[1]).column;
ColumnPtr path_column;
bool is_const = false;
std::tie(path_column, is_const) =
unpack_if_const(block.get_by_position(arguments[2]).column);
auto null_map = ColumnUInt8::create(input_rows_count, 0);
auto return_type = block.get_data_type(result);
MutableColumnPtr res = return_type->create_column();
JsonbPath path;
if (is_const) {
if (path_column->is_null_at(0)) {
for (size_t i = 0; i < input_rows_count; ++i) {
null_map->get_data()[i] = 1;
res->insert_data(nullptr, 0);
}
block.replace_by_position(
result, ColumnNullable::create(std::move(res), std::move(null_map)));
return Status::OK();
}
auto path_value = path_column->get_data_at(0);
if (!path.seek(path_value.data, path_value.size)) {
return Status::InvalidArgument("Json path error: Invalid Json Path for value: {}",
std::string_view(path_value.data, path_value.size));
}
}
for (size_t i = 0; i < input_rows_count; ++i) {
if (jsonb_data1_column->is_null_at(i) || jsonb_data2_column->is_null_at(i) ||
path_column->is_null_at(i)) {
null_map->get_data()[i] = 1;
res->insert_data(nullptr, 0);
continue;
}
if (!is_const) {
auto path_value = path_column->get_data_at(i);
path.clean();
if (!path.seek(path_value.data, path_value.size)) {
return Status::InvalidArgument(
"Json path error: Invalid Json Path for value: {}",
std::string_view(path_value.data, path_value.size));
}
}
auto jsonb_value1 = jsonb_data1_column->get_data_at(i);
auto jsonb_value2 = jsonb_data2_column->get_data_at(i);
if (jsonb_value1.size == 0 || jsonb_value2.size == 0) {
null_map->get_data()[i] = 1;
res->insert_data(nullptr, 0);
continue;
}
// doc is NOT necessary to be deleted since JsonbDocument will not allocate memory
const JsonbDocument* doc1 = nullptr;
RETURN_IF_ERROR(JsonbDocument::checkAndCreateDocument(jsonb_value1.data,
jsonb_value1.size, &doc1));
const JsonbDocument* doc2 = nullptr;
RETURN_IF_ERROR(JsonbDocument::checkAndCreateDocument(jsonb_value2.data,
jsonb_value2.size, &doc2));
auto find_result = doc1->getValue()->findValue(path);
const auto* value1 = find_result.value;
const JsonbValue* value2 = doc2->getValue();
if (!value1 || !value2) {
null_map->get_data()[i] = 1;
res->insert_data(nullptr, 0);
continue;
}
auto contains_value = value1->contains(value2);
res->insert_data(const_cast<const char*>((char*)&contains_value), 0);
}
block.replace_by_position(result,
ColumnNullable::create(std::move(res), std::move(null_map)));
return Status::OK();
}
};
template <bool ignore_null>
class FunctionJsonbArray : public IFunction {
public:
static constexpr auto name = "json_array";
static constexpr auto alias = "jsonb_array";
static FunctionPtr create() { return std::make_shared<FunctionJsonbArray>(); }
String get_name() const override { return name; }
size_t get_number_of_arguments() const override { return 0; }
bool is_variadic() const override { return true; }
bool use_default_implementation_for_nulls() const override { return false; }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return std::make_shared<DataTypeJsonb>();
}
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
auto return_data_type = std::make_shared<DataTypeJsonb>();
auto column = return_data_type->create_column();
column->reserve(input_rows_count);
JsonbWriter writer;
for (size_t i = 0; i < input_rows_count; ++i) {
writer.writeStartArray();
for (auto argument : arguments) {
auto&& [arg_column, is_const] =
unpack_if_const(block.get_by_position(argument).column);
if (arg_column->is_nullable()) {
const auto& nullable_column =
assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>(
*arg_column);
const auto& null_map = nullable_column.get_null_map_data();
const auto& nested_column = nullable_column.get_nested_column();
const auto& jsonb_column =
assert_cast<const ColumnString&, TypeCheckOnRelease::DISABLE>(
nested_column);
auto index = index_check_const(i, is_const);
if (null_map[index]) {
if constexpr (ignore_null) {
continue;
} else {
writer.writeNull();
}
} else {
auto jsonb_binary = jsonb_column.get_data_at(index);
const JsonbDocument* doc = nullptr;
auto st = JsonbDocument::checkAndCreateDocument(jsonb_binary.data,
jsonb_binary.size, &doc);
if (!st.ok() || !doc || !doc->getValue()) [[unlikely]] {
if constexpr (ignore_null) {
continue;
} else {
writer.writeNull();
}
} else {
writer.writeValue(doc->getValue());
}
}
} else {
const auto& jsonb_column =
assert_cast<const ColumnString&, TypeCheckOnRelease::DISABLE>(
*arg_column);
auto index = index_check_const(i, is_const);
auto jsonb_binary = jsonb_column.get_data_at(index);
const JsonbDocument* doc = nullptr;
auto st = JsonbDocument::checkAndCreateDocument(jsonb_binary.data,
jsonb_binary.size, &doc);
if (!st.ok() || !doc || !doc->getValue()) [[unlikely]] {
if constexpr (ignore_null) {
continue;
} else {
writer.writeNull();
}
} else {
writer.writeValue(doc->getValue());
}
}
}
writer.writeEndArray();
column->insert_data(writer.getOutput()->getBuffer(), writer.getOutput()->getSize());
writer.reset();
}
block.get_by_position(result).column = std::move(column);
return Status::OK();
}
};
class FunctionJsonbObject : public IFunction {
public:
static constexpr auto name = "json_object";
static constexpr auto alias = "jsonb_object";
static FunctionPtr create() { return std::make_shared<FunctionJsonbObject>(); }
String get_name() const override { return name; }
size_t get_number_of_arguments() const override { return 0; }
bool is_variadic() const override { return true; }
bool use_default_implementation_for_nulls() const override { return false; }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return std::make_shared<DataTypeJsonb>();
}
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
if (arguments.size() % 2 != 0) {
return Status::InvalidArgument(
"JSON object must have an even number of arguments, but got: {}",
arguments.size());
}
auto return_data_type = std::make_shared<DataTypeJsonb>();
auto write_key = [](JsonbWriter& writer, const ColumnString& key_col, const bool is_const,
const NullMap* null_map, const size_t arg_index, const size_t row_idx) {
auto index = index_check_const(row_idx, is_const);
if (null_map && (*null_map)[index]) {
return Status::InvalidArgument(
"JSON documents may not contain NULL member name(argument "
"index: "
"{}, row index: {})",
row_idx, arg_index);
}
auto key_string = key_col.get_data_at(index);
if (key_string.size > 255) {
return Status::InvalidArgument(
"JSON object keys(argument index: {}) must be less than 256 "
"bytes, but got size: {}",
arg_index, key_string.size);
}
writer.writeKey(key_string.data, static_cast<uint8_t>(key_string.size));
return Status::OK();
};
auto write_value = [](JsonbWriter& writer, const ColumnString& value_col,
const bool is_const, const NullMap* null_map, const size_t arg_index,
const size_t row_idx) {
auto index = index_check_const(row_idx, is_const);
if (null_map && (*null_map)[index]) {
writer.writeNull();
return Status::OK();
}
auto value_string = value_col.get_data_at(index);
const JsonbDocument* doc = nullptr;
RETURN_IF_ERROR(JsonbDocument::checkAndCreateDocument(value_string.data,
value_string.size, &doc));
writer.writeValue(doc->getValue());
return Status::OK();
};
for (size_t arg_idx = 0; arg_idx != arguments.size(); arg_idx += 2) {
auto key_argument = arguments[arg_idx];
auto value_argument = arguments[arg_idx + 1];
auto& key_data_type = block.get_by_position(key_argument).type;
auto& value_data_type = block.get_by_position(value_argument).type;
if (!is_string_type(key_data_type->get_primitive_type())) {
return Status::InvalidArgument(
"JSON object key(argument index: {}) must be String, but got type: "
"{}(primitive type: {})",
arg_idx, key_data_type->get_name(),
static_cast<int>(key_data_type->get_primitive_type()));
}
if (value_data_type->get_primitive_type() != PrimitiveType::TYPE_JSONB) {
return Status::InvalidArgument(
"JSON object value(argument index: {}) must be JSON, but got type: {}",
arg_idx, value_data_type->get_name());
}
}
auto column = return_data_type->create_column();
column->reserve(input_rows_count);
JsonbWriter writer;
for (size_t i = 0; i != input_rows_count; ++i) {
writer.writeStartObject();
for (size_t arg_idx = 0; arg_idx != arguments.size(); arg_idx += 2) {
auto key_argument = arguments[arg_idx];
auto value_argument = arguments[arg_idx + 1];
auto&& [key_column, key_const] =
unpack_if_const(block.get_by_position(key_argument).column);
auto&& [value_column, value_const] =
unpack_if_const(block.get_by_position(value_argument).column);
if (key_column->is_nullable()) {
const auto& nullable_column =
assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>(
*key_column);
const auto& null_map = nullable_column.get_null_map_data();
const auto& nested_column = nullable_column.get_nested_column();
const auto& key_arg_column =
assert_cast<const ColumnString&, TypeCheckOnRelease::DISABLE>(
nested_column);
RETURN_IF_ERROR(
write_key(writer, key_arg_column, key_const, &null_map, arg_idx, i));
} else {
const auto& key_arg_column =
assert_cast<const ColumnString&, TypeCheckOnRelease::DISABLE>(
*key_column);
RETURN_IF_ERROR(
write_key(writer, key_arg_column, key_const, nullptr, arg_idx, i));
}
if (value_column->is_nullable()) {
const auto& nullable_column =
assert_cast<const ColumnNullable&, TypeCheckOnRelease::DISABLE>(
*value_column);
const auto& null_map = nullable_column.get_null_map_data();
const auto& nested_column = nullable_column.get_nested_column();
const auto& value_arg_column =
assert_cast<const ColumnString&, TypeCheckOnRelease::DISABLE>(
nested_column);
RETURN_IF_ERROR(write_value(writer, value_arg_column, value_const, &null_map,
arg_idx + 1, i));
} else {
const auto& value_arg_column =
assert_cast<const ColumnString&, TypeCheckOnRelease::DISABLE>(
*value_column);
RETURN_IF_ERROR(write_value(writer, value_arg_column, value_const, nullptr,
arg_idx + 1, i));
}
}
writer.writeEndObject();
column->insert_data(writer.getOutput()->getBuffer(), writer.getOutput()->getSize());
writer.reset();
}
block.get_by_position(result).column = std::move(column);
return Status::OK();
}
};
enum class JsonbModifyType { Insert, Set, Replace };
template <JsonbModifyType modify_type>
struct JsonbModifyName {
static constexpr auto name = "jsonb_modify";
static constexpr auto alias = "json_modify";
};
template <>
struct JsonbModifyName<JsonbModifyType::Insert> {
static constexpr auto name = "jsonb_insert";
static constexpr auto alias = "json_insert";
};
template <>
struct JsonbModifyName<JsonbModifyType::Set> {
static constexpr auto name = "jsonb_set";
static constexpr auto alias = "json_set";
};
template <>
struct JsonbModifyName<JsonbModifyType::Replace> {
static constexpr auto name = "jsonb_replace";
static constexpr auto alias = "json_replace";
};
template <JsonbModifyType modify_type>
class FunctionJsonbModify : public IFunction {
public:
static constexpr auto name = JsonbModifyName<modify_type>::name;
static constexpr auto alias = JsonbModifyName<modify_type>::alias;
static FunctionPtr create() { return std::make_shared<FunctionJsonbModify<modify_type>>(); }
String get_name() const override { return name; }
size_t get_number_of_arguments() const override { return 0; }
bool is_variadic() const override { return true; }
bool use_default_implementation_for_nulls() const override { return false; }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(std::make_shared<DataTypeJsonb>());
}
Status create_all_null_result(const DataTypePtr& return_data_type, Block& block,
uint32_t result, size_t input_rows_count) const {
auto result_column = return_data_type->create_column();
result_column->insert_default();
auto const_column = ColumnConst::create(std::move(result_column), input_rows_count);
block.get_by_position(result).column = std::move(const_column);
return Status::OK();
}
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
if (arguments.size() % 2 != 1 || arguments.size() < 3) {
return Status::InvalidArgument(
"Function {} must have an odd number of arguments and more than 2 arguments, "
"but got: {}",
name, arguments.size());
}
const size_t keys_count = (arguments.size() - 1) / 2;
auto return_data_type = make_nullable(std::make_shared<DataTypeJsonb>());
auto result_column = return_data_type->create_column();
auto& result_nullable_col = assert_cast<ColumnNullable&>(*result_column);
auto& null_map = result_nullable_col.get_null_map_data();
auto& res_string_column =
assert_cast<ColumnString&>(result_nullable_col.get_nested_column());
auto& res_chars = res_string_column.get_chars();
auto& res_offsets = res_string_column.get_offsets();
null_map.resize_fill(input_rows_count, 0);
res_offsets.resize(input_rows_count);
auto&& [json_data_arg_column, json_data_const] =
unpack_if_const(block.get_by_position(arguments[0]).column);
if (json_data_const) {
if (json_data_arg_column->is_null_at(0)) {
return create_all_null_result(return_data_type, block, result, input_rows_count);
}
}
std::vector<const ColumnString*> json_path_columns(keys_count);
std::vector<bool> json_path_constant(keys_count);
std::vector<const NullMap*> json_path_null_maps(keys_count, nullptr);
std::vector<const ColumnString*> json_value_columns(keys_count);
std::vector<bool> json_value_constant(keys_count);
std::vector<const NullMap*> json_value_null_maps(keys_count, nullptr);
const NullMap* json_data_null_map = nullptr;
const ColumnString* json_data_column;
if (json_data_arg_column->is_nullable()) {
const auto& nullable_column = assert_cast<const ColumnNullable&>(*json_data_arg_column);
json_data_null_map = &nullable_column.get_null_map_data();
const auto& nested_column = nullable_column.get_nested_column();
json_data_column = assert_cast<const ColumnString*>(&nested_column);
} else {
json_data_column = assert_cast<const ColumnString*>(json_data_arg_column.get());
}
for (size_t i = 1; i < arguments.size(); i += 2) {
auto&& [path_column, path_const] =
unpack_if_const(block.get_by_position(arguments[i]).column);
auto&& [value_column, value_const] =
unpack_if_const(block.get_by_position(arguments[i + 1]).column);
if (path_const) {
if (path_column->is_null_at(0)) {
return create_all_null_result(return_data_type, block, result,
input_rows_count);
}
}
json_path_constant[i / 2] = path_const;
if (path_column->is_nullable()) {
const auto& nullable_column = assert_cast<const ColumnNullable&>(*path_column);
json_path_null_maps[i / 2] = &nullable_column.get_null_map_data();
const auto& nested_column = nullable_column.get_nested_column();
json_path_columns[i / 2] = assert_cast<const ColumnString*>(&nested_column);
} else {
json_path_columns[i / 2] = assert_cast<const ColumnString*>(path_column.get());
}
json_value_constant[i / 2] = value_const;
if (value_column->is_nullable()) {
const auto& nullable_column = assert_cast<const ColumnNullable&>(*value_column);
json_value_null_maps[i / 2] = &nullable_column.get_null_map_data();
const auto& nested_column = nullable_column.get_nested_column();
json_value_columns[i / 2] = assert_cast<const ColumnString*>(&nested_column);
} else {
json_value_columns[i / 2] = assert_cast<const ColumnString*>(value_column.get());
}
}
DorisVector<const JsonbDocument*> json_documents(input_rows_count);
if (json_data_const) {
auto json_data_string = json_data_column->get_data_at(0);
const JsonbDocument* doc = nullptr;
RETURN_IF_ERROR(JsonbDocument::checkAndCreateDocument(json_data_string.data,
json_data_string.size, &doc));
if (!doc || !doc->getValue()) [[unlikely]] {
return create_all_null_result(return_data_type, block, result, input_rows_count);
}
for (size_t i = 0; i != input_rows_count; ++i) {
json_documents[i] = doc;
}
} else {
for (size_t i = 0; i != input_rows_count; ++i) {
if (json_data_null_map && (*json_data_null_map)[i]) {
null_map[i] = 1;
json_documents[i] = nullptr;
continue;
}
auto json_data_string = json_data_column->get_data_at(i);
const JsonbDocument* doc = nullptr;
RETURN_IF_ERROR(JsonbDocument::checkAndCreateDocument(json_data_string.data,
json_data_string.size, &doc));
if (!doc || !doc->getValue()) [[unlikely]] {
null_map[i] = 1;
continue;
}
json_documents[i] = doc;
}
}
DorisVector<DorisVector<JsonbPath>> json_paths(keys_count);
DorisVector<DorisVector<const JsonbValue*>> json_values(keys_count);
RETURN_IF_ERROR(parse_paths_and_values(json_paths, json_values, arguments, input_rows_count,
json_path_columns, json_path_constant,
json_path_null_maps, json_value_columns,
json_value_constant, json_value_null_maps));
JsonbWriter writer;
struct DocumentBuffer {
DorisUniqueBufferPtr<char> ptr;
size_t size = 0;
size_t capacity = 0;
};
DocumentBuffer tmp_buffer;
for (size_t row_idx = 0; row_idx != input_rows_count; ++row_idx) {
for (size_t i = 1; i < arguments.size(); i += 2) {
const size_t index = i / 2;
auto& json_path = json_paths[index];
auto& json_value = json_values[index];
const auto path_index = index_check_const(row_idx, json_path_constant[index]);
const auto value_index = index_check_const(row_idx, json_value_constant[index]);
if (null_map[row_idx]) {
continue;
}
if (json_documents[row_idx] == nullptr) {
null_map[row_idx] = 1;
continue;
}
if (json_path_null_maps[index] && (*json_path_null_maps[index])[path_index]) {
null_map[row_idx] = 1;
continue;
}
auto find_result =
json_documents[row_idx]->getValue()->findValue(json_path[path_index]);
if (find_result.is_wildcard) {
return Status::InvalidArgument(
" In this situation, path expressions may not contain the * and ** "
"tokens or an array range, argument index: {}, row index: {}",
i, row_idx);
}
if constexpr (modify_type == JsonbModifyType::Insert) {
if (find_result.value) {
continue;
}
} else if constexpr (modify_type == JsonbModifyType::Replace) {
if (!find_result.value) {
continue;
}
}
std::vector<const JsonbValue*> parents;
bool replace = false;
parents.emplace_back(json_documents[row_idx]->getValue());
if (find_result.value) {
// find target path, replace it with the new value.
replace = true;
if (!build_parents_by_path(json_documents[row_idx]->getValue(),
json_path[path_index], parents)) {
DCHECK(false);
continue;
}
} else {
// does not find target path, insert the new value.
JsonbPath new_path;
for (size_t j = 0; j < json_path[path_index].get_leg_vector_size() - 1; ++j) {
auto* current_leg = json_path[path_index].get_leg_from_leg_vector(j);
std::unique_ptr<leg_info> leg = std::make_unique<leg_info>(
current_leg->leg_ptr, current_leg->leg_len,
current_leg->array_index, current_leg->type);
new_path.add_leg_to_leg_vector(std::move(leg));
}
if (!build_parents_by_path(json_documents[row_idx]->getValue(), new_path,
parents)) {
continue;
}
}
const auto legs_count = json_path[path_index].get_leg_vector_size();
leg_info* last_leg =
legs_count > 0
? json_path[path_index].get_leg_from_leg_vector(legs_count - 1)
: nullptr;
RETURN_IF_ERROR(write_json_value(json_documents[row_idx]->getValue(), parents, 0,
json_value[value_index], replace, last_leg,
writer));
auto* writer_output = writer.getOutput();
if (writer_output->getSize() > tmp_buffer.capacity) {
tmp_buffer.capacity =
((size_t(writer_output->getSize()) + 1024 - 1) / 1024) * 1024;
tmp_buffer.ptr = make_unique_buffer<char>(tmp_buffer.capacity);
DCHECK_LE(writer_output->getSize(), tmp_buffer.capacity);
}
memcpy(tmp_buffer.ptr.get(), writer_output->getBuffer(), writer_output->getSize());
tmp_buffer.size = writer_output->getSize();
writer.reset();
RETURN_IF_ERROR(JsonbDocument::checkAndCreateDocument(
tmp_buffer.ptr.get(), tmp_buffer.size, &json_documents[row_idx]));
}
if (!null_map[row_idx]) {
const auto* jsonb_document = json_documents[row_idx];
const auto size = jsonb_document->numPackedBytes();
res_chars.insert(reinterpret_cast<const char*>(jsonb_document),
reinterpret_cast<const char*>(jsonb_document) + size);
}
res_offsets[row_idx] = static_cast<uint32_t>(res_chars.size());
if (!null_map[row_idx]) {
auto* ptr = res_chars.data() + res_offsets[row_idx - 1];
auto size = res_offsets[row_idx] - res_offsets[row_idx - 1];
const JsonbDocument* doc = nullptr;
THROW_IF_ERROR(JsonbDocument::checkAndCreateDocument(
reinterpret_cast<const char*>(ptr), size, &doc));
}
}
block.get_by_position(result).column = std::move(result_column);
return Status::OK();
}
bool build_parents_by_path(const JsonbValue* root, const JsonbPath& path,
std::vector<const JsonbValue*>& parents) const {
const size_t index = parents.size() - 1;
if (index == path.get_leg_vector_size()) {
return true;
}
JsonbPath current;
auto* current_leg = path.get_leg_from_leg_vector(index);
std::unique_ptr<leg_info> leg =
std::make_unique<leg_info>(current_leg->leg_ptr, current_leg->leg_len,
current_leg->array_index, current_leg->type);
current.add_leg_to_leg_vector(std::move(leg));
auto find_result = root->findValue(current);
if (!find_result.value) {
std::string path_string;
current.to_string(&path_string);
return false;
} else if (find_result.value == root) {
return true;
} else {
parents.emplace_back(find_result.value);
}
return build_parents_by_path(find_result.value, path, parents);
}
Status write_json_value(const JsonbValue* root, const std::vector<const JsonbValue*>& parents,
const size_t parent_index, const JsonbValue* value, const bool replace,
const leg_info* last_leg, JsonbWriter& writer) const {
if (parent_index >= parents.size()) {
return Status::InvalidArgument(
"JsonbModify: parent_index {} is out of bounds for parents size {}",
parent_index, parents.size());
}
if (parents[parent_index] != root) {
return Status::InvalidArgument(
"JsonbModify: parent value does not match root value, parent_index: {}, "
"parents size: {}",
parent_index, parents.size());
}
if (parent_index == parents.size() - 1 && replace) {
// We are at the last parent, write the value directly
if (value == nullptr) {
writer.writeNull();
} else {
writer.writeValue(value);
}
return Status::OK();
}
bool value_written = false;
bool is_last_parent = (parent_index == parents.size() - 1);
const auto* next_parent = is_last_parent ? nullptr : parents[parent_index + 1];
if (root->isArray()) {
writer.writeStartArray();
const auto* array_val = root->unpack<ArrayVal>();
for (int i = 0; i != array_val->numElem(); ++i) {
auto* it = array_val->get(i);
if (is_last_parent && last_leg->array_index == i) {
value_written = true;
writer.writeValue(value);
} else if (it == next_parent) {
value_written = true;
RETURN_IF_ERROR(write_json_value(it, parents, parent_index + 1, value, replace,
last_leg, writer));
} else {
writer.writeValue(it);
}
}
if (is_last_parent && !value_written) {
value_written = true;
writer.writeValue(value);
}
writer.writeEndArray();
} else {
/**
Because even for a non-array object, `$[0]` can still point to that object:
```
select json_extract('{"key": "value"}', '$[0]');
+------------------------------------------+
| json_extract('{"key": "value"}', '$[0]') |
+------------------------------------------+
| {"key": "value"} |
+------------------------------------------+
```
So when inserting an element into `$[1]`, even if '$' does not represent an array,
it should be converted to an array before insertion:
```
select json_insert('123','$[1]', null);
+---------------------------------+
| json_insert('123','$[1]', null) |
+---------------------------------+
| [123, null] |
+---------------------------------+
```
*/
if (is_last_parent && last_leg && last_leg->type == ARRAY_CODE) {
writer.writeStartArray();
writer.writeValue(root);
writer.writeValue(value);
writer.writeEndArray();
return Status::OK();
} else if (root->isObject()) {
writer.writeStartObject();
const auto* object_val = root->unpack<ObjectVal>();
for (const auto& it : *object_val) {
writer.writeKey(it.getKeyStr(), it.klen());
if (it.value() == next_parent) {
value_written = true;
RETURN_IF_ERROR(write_json_value(it.value(), parents, parent_index + 1,
value, replace, last_leg, writer));
} else {
writer.writeValue(it.value());
}
}
if (is_last_parent && !value_written) {
value_written = true;
writer.writeStartObject();
writer.writeKey(last_leg->leg_ptr, static_cast<uint8_t>(last_leg->leg_len));
writer.writeValue(value);
writer.writeEndObject();
}
writer.writeEndObject();
} else {
return Status::InvalidArgument("Cannot insert value into this type");
}
}
if (!value_written) {
return Status::InvalidArgument(
"JsonbModify: value not written, parent_index: {}, parents size: {}",
parent_index, parents.size());
}
return Status::OK();
}
Status parse_paths_and_values(DorisVector<DorisVector<JsonbPath>>& json_paths,
DorisVector<DorisVector<const JsonbValue*>>& json_values,
const ColumnNumbers& arguments, const size_t input_rows_count,
const std::vector<const ColumnString*>& json_path_columns,
const std::vector<bool>& json_path_constant,
const std::vector<const NullMap*>& json_path_null_maps,
const std::vector<const ColumnString*>& json_value_columns,
const std::vector<bool>& json_value_constant,
const std::vector<const NullMap*>& json_value_null_maps) const {
for (size_t i = 1; i < arguments.size(); i += 2) {
const size_t index = i / 2;
const auto* json_path_column = json_path_columns[index];
const auto* value_column = json_value_columns[index];
json_paths[index].resize(json_path_constant[index] ? 1 : input_rows_count);
json_values[index].resize(json_value_constant[index] ? 1 : input_rows_count, nullptr);
for (size_t row_idx = 0; row_idx != json_paths[index].size(); ++row_idx) {
if (json_path_null_maps[index] && (*json_path_null_maps[index])[row_idx]) {
continue;
}
auto path_string = json_path_column->get_data_at(row_idx);
if (!json_paths[index][row_idx].seek(path_string.data, path_string.size)) {
return Status::InvalidArgument(
"Json path error: Invalid Json Path for value: {}, "
"argument "
"index: {}, row index: {}",
std::string_view(path_string.data, path_string.size), i, row_idx);
}
if (json_paths[index][row_idx].is_wildcard()) {
return Status::InvalidArgument(
"In this situation, path expressions may not contain the * and ** "
"tokens, argument index: {}, row index: {}",
i, row_idx);
}
}
for (size_t row_idx = 0; row_idx != json_values[index].size(); ++row_idx) {
if (json_value_null_maps[index] && (*json_value_null_maps[index])[row_idx]) {
continue;
}
auto value_string = value_column->get_data_at(row_idx);
const JsonbDocument* doc = nullptr;
RETURN_IF_ERROR(JsonbDocument::checkAndCreateDocument(value_string.data,
value_string.size, &doc));
if (doc) {
json_values[index][row_idx] = doc->getValue();
}
}
}
return Status::OK();
}
};
struct JsonbContainsAndPathImpl {
static DataTypes get_variadic_argument_types() {
return {std::make_shared<DataTypeJsonb>(), std::make_shared<DataTypeJsonb>(),
std::make_shared<DataTypeString>()};
}
static Status execute_impl(FunctionContext* context, Block& block,
const ColumnNumbers& arguments, uint32_t result,
size_t input_rows_count) {
return JsonbContainsUtil::jsonb_contains_execute(context, block, arguments, result,
input_rows_count);
}
};
class FunctionJsonSearch : public IFunction {
private:
using OneFun = std::function<Status(size_t, bool*)>;
static Status always_one(size_t i, bool* res) {
*res = true;
return Status::OK();
}
static Status always_all(size_t i, bool* res) {
*res = false;
return Status::OK();
}
using CheckNullFun = std::function<bool(size_t)>;
static bool always_not_null(size_t) { return false; }
using GetJsonStringRefFun = std::function<StringRef(size_t)>;
Status matched(const std::string_view& str, LikeState* state, unsigned char* res) const {
StringRef pattern; // not used
StringRef value_val(str.data(), str.size());
return (state->scalar_function)(&state->search_state, value_val, pattern, res);
}
/**
* Recursive search for matching string, if found, the result will be added to a vector
* @param element json element
* @param one_match
* @param search_str
* @param cur_path
* @param matches The path that has already been matched
* @return true if matched else false
*/
bool find_matches(const JsonbValue* element, const bool& one_match, LikeState* state,
JsonbPath* cur_path, std::unordered_set<std::string>* matches) const {
if (element->isString()) {
const auto* json_string = element->unpack<JsonbStringVal>();
const std::string_view element_str(json_string->getBlob(), json_string->length());
unsigned char res;
RETURN_IF_ERROR(matched(element_str, state, &res));
if (res) {
std::string str;
auto valid = cur_path->to_string(&str);
if (!valid) {
return false;
}
return matches->insert(str).second;
} else {
return false;
}
} else if (element->isObject()) {
const auto* object = element->unpack<ObjectVal>();
bool find = false;
for (const auto& item : *object) {
Slice key(item.getKeyStr(), item.klen());
const auto* child_element = item.value();
// construct an object member path leg.
auto leg = std::make_unique<leg_info>(key.data, key.size, 0, MEMBER_CODE);
cur_path->add_leg_to_leg_vector(std::move(leg));
find |= find_matches(child_element, one_match, state, cur_path, matches);
cur_path->pop_leg_from_leg_vector();
if (one_match && find) {
return true;
}
}
return find;
} else if (element->isArray()) {
const auto* array = element->unpack<ArrayVal>();
bool find = false;
for (int i = 0; i < array->numElem(); ++i) {
auto leg = std::make_unique<leg_info>(nullptr, 0, i, ARRAY_CODE);
cur_path->add_leg_to_leg_vector(std::move(leg));
const auto* child_element = array->get(i);
// construct an array cell path leg.
find |= find_matches(child_element, one_match, state, cur_path, matches);
cur_path->pop_leg_from_leg_vector();
if (one_match && find) {
return true;
}
}
return find;
} else {
return false;
}
}
void make_result_str(JsonbWriter& writer, std::unordered_set<std::string>& matches,
ColumnString* result_col) const {
if (matches.size() == 1) {
for (const auto& str_ref : matches) {
writer.writeStartString();
writer.writeString(str_ref);
writer.writeEndString();
}
} else {
writer.writeStartArray();
for (const auto& str_ref : matches) {
writer.writeStartString();
writer.writeString(str_ref);
writer.writeEndString();
}
writer.writeEndArray();
}
result_col->insert_data(writer.getOutput()->getBuffer(),
(size_t)writer.getOutput()->getSize());
}
template <bool search_is_const>
Status execute_vector(Block& block, size_t input_rows_count, CheckNullFun json_null_check,
GetJsonStringRefFun col_json_string, CheckNullFun one_null_check,
OneFun one_check, CheckNullFun search_null_check,
const ColumnString* col_search_string, FunctionContext* context,
size_t result) const {
auto result_col = ColumnString::create();
auto null_map = ColumnUInt8::create(input_rows_count, 0);
std::shared_ptr<LikeState> state_ptr;
LikeState* state = nullptr;
if (search_is_const) {
state = reinterpret_cast<LikeState*>(
context->get_function_state(FunctionContext::THREAD_LOCAL));
}
bool is_one = false;
JsonbWriter writer;
for (size_t i = 0; i < input_rows_count; ++i) {
// an error occurs if the json_doc argument is not a valid json document.
if (json_null_check(i)) {
null_map->get_data()[i] = 1;
result_col->insert_data("", 0);
continue;
}
const auto& json_doc_str = col_json_string(i);
const JsonbDocument* json_doc = nullptr;
auto st = JsonbDocument::checkAndCreateDocument(json_doc_str.data, json_doc_str.size,
&json_doc);
if (!st.ok()) {
return Status::InvalidArgument(
"the json_doc argument at row {} is not a valid json document: {}", i,
st.to_string());
}
if (!one_null_check(i)) {
RETURN_IF_ERROR(one_check(i, &is_one));
}
if (one_null_check(i) || search_null_check(i)) {
null_map->get_data()[i] = 1;
result_col->insert_data("", 0);
continue;
}
// an error occurs if any path argument is not a valid path expression.
std::string root_path_str = "$";
JsonbPath root_path;
root_path.seek(root_path_str.c_str(), root_path_str.size());
std::vector<JsonbPath*> paths;
paths.push_back(&root_path);
if (!search_is_const) {
state_ptr = std::make_shared<LikeState>();
state_ptr->is_like_pattern = true;
const auto& search_str = col_search_string->get_data_at(i);
RETURN_IF_ERROR(FunctionLike::construct_like_const_state(context, search_str,
state_ptr, false));
state = state_ptr.get();
}
// maintain a hashset to deduplicate matches.
std::unordered_set<std::string> matches;
for (const auto& item : paths) {
auto* cur_path = item;
auto find = find_matches(json_doc->getValue(), is_one, state, cur_path, &matches);
if (is_one && find) {
break;
}
}
if (matches.empty()) {
// returns NULL if the search_str is not found in the document.
null_map->get_data()[i] = 1;
result_col->insert_data("", 0);
continue;
}
writer.reset();
make_result_str(writer, matches, result_col.get());
}
auto result_col_nullable =
ColumnNullable::create(std::move(result_col), std::move(null_map));
block.replace_by_position(result, std::move(result_col_nullable));
return Status::OK();
}
static constexpr auto one = "one";
static constexpr auto all = "all";
public:
static constexpr auto name = "json_search";
static FunctionPtr create() { return std::make_shared<FunctionJsonSearch>(); }
String get_name() const override { return name; }
bool is_variadic() const override { return false; }
size_t get_number_of_arguments() const override { return 3; }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(std::make_shared<DataTypeJsonb>());
}
bool use_default_implementation_for_nulls() const override { return false; }
Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override {
if (scope != FunctionContext::THREAD_LOCAL) {
return Status::OK();
}
if (context->is_col_constant(2)) {
std::shared_ptr<LikeState> state = std::make_shared<LikeState>();
state->is_like_pattern = true;
const auto pattern_col = context->get_constant_col(2)->column_ptr;
const auto& pattern = pattern_col->get_data_at(0);
RETURN_IF_ERROR(
FunctionLike::construct_like_const_state(context, pattern, state, false));
context->set_function_state(scope, state);
}
return Status::OK();
}
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
// the json_doc, one_or_all, and search_str must be given.
// and we require the positions are static.
if (arguments.size() < 3) {
return Status::InvalidArgument("too few arguments for function {}", name);
}
if (arguments.size() > 3) {
return Status::NotSupported("escape and path params are not support now");
}
CheckNullFun json_null_check = always_not_null;
GetJsonStringRefFun get_json_fun;
// prepare jsonb data column
auto&& [col_json, json_is_const] =
unpack_if_const(block.get_by_position(arguments[0]).column);
const auto* col_json_string = check_and_get_column<ColumnString>(col_json.get());
if (const auto* nullable = check_and_get_column<ColumnNullable>(col_json.get())) {
col_json_string =
check_and_get_column<ColumnString>(nullable->get_nested_column_ptr().get());
}
if (!col_json_string) {
return Status::RuntimeError("Illegal arg json {} should be ColumnString",
col_json->get_name());
}
auto create_all_null_result = [&]() {
auto res_str = ColumnString::create();
res_str->insert_default();
auto res = ColumnNullable::create(std::move(res_str), ColumnUInt8::create(1, 1));
if (input_rows_count > 1) {
block.get_by_position(result).column =
ColumnConst::create(std::move(res), input_rows_count);
} else {
block.get_by_position(result).column = std::move(res);
}
return Status::OK();
};
if (json_is_const) {
if (col_json->is_null_at(0)) {
return create_all_null_result();
} else {
const auto& json_str = col_json_string->get_data_at(0);
get_json_fun = [json_str](size_t i) { return json_str; };
}
} else {
json_null_check = [col_json](size_t i) { return col_json->is_null_at(i); };
get_json_fun = [col_json_string](size_t i) { return col_json_string->get_data_at(i); };
}
// one_or_all
CheckNullFun one_null_check = always_not_null;
OneFun one_check = always_one;
auto&& [col_one, one_is_const] =
unpack_if_const(block.get_by_position(arguments[1]).column);
one_is_const |= input_rows_count == 1;
const auto* col_one_string = check_and_get_column<ColumnString>(col_one.get());
if (const auto* nullable = check_and_get_column<ColumnNullable>(col_one.get())) {
col_one_string = check_and_get_column<ColumnString>(*nullable->get_nested_column_ptr());
}
if (!col_one_string) {
return Status::RuntimeError("Illegal arg one {} should be ColumnString",
col_one->get_name());
}
if (one_is_const) {
if (col_one->is_null_at(0)) {
return create_all_null_result();
} else {
const auto& one_or_all = col_one_string->get_data_at(0);
std::string one_or_all_str = one_or_all.to_string();
if (strcasecmp(one_or_all_str.c_str(), all) == 0) {
one_check = always_all;
} else if (strcasecmp(one_or_all_str.c_str(), one) == 0) {
// nothing
} else {
// an error occurs if the one_or_all argument is not 'one' nor 'all'.
return Status::InvalidArgument(
"the one_or_all argument {} is not 'one' not 'all'", one_or_all_str);
}
}
} else {
one_null_check = [col_one](size_t i) { return col_one->is_null_at(i); };
one_check = [col_one_string](size_t i, bool* is_one) {
const auto& one_or_all = col_one_string->get_data_at(i);
std::string one_or_all_str = one_or_all.to_string();
if (strcasecmp(one_or_all_str.c_str(), all) == 0) {
*is_one = false;
} else if (strcasecmp(one_or_all_str.c_str(), one) == 0) {
*is_one = true;
} else {
// an error occurs if the one_or_all argument is not 'one' nor 'all'.
return Status::InvalidArgument(
"the one_or_all argument {} is not 'one' not 'all'", one_or_all_str);
}
return Status::OK();
};
}
// search_str
auto&& [col_search, search_is_const] =
unpack_if_const(block.get_by_position(arguments[2]).column);
const auto* col_search_string = check_and_get_column<ColumnString>(col_search.get());
if (const auto* nullable = check_and_get_column<ColumnNullable>(col_search.get())) {
col_search_string =
check_and_get_column<ColumnString>(*nullable->get_nested_column_ptr());
}
if (!col_search_string) {
return Status::RuntimeError("Illegal arg pattern {} should be ColumnString",
col_search->get_name());
}
if (search_is_const) {
CheckNullFun search_null_check = always_not_null;
if (col_search->is_null_at(0)) {
return create_all_null_result();
}
RETURN_IF_ERROR(execute_vector<true>(
block, input_rows_count, json_null_check, get_json_fun, one_null_check,
one_check, search_null_check, col_search_string, context, result));
} else {
CheckNullFun search_null_check = [col_search](size_t i) {
return col_search->is_null_at(i);
};
RETURN_IF_ERROR(execute_vector<false>(
block, input_rows_count, json_null_check, get_json_fun, one_null_check,
one_check, search_null_check, col_search_string, context, result));
}
return Status::OK();
}
};
struct DocumentBuffer {
std::unique_ptr<char[]> ptr;
size_t size = 0;
size_t capacity = 0;
};
class FunctionJsonbRemove : public IFunction {
public:
static constexpr auto name = "jsonb_remove";
static constexpr auto alias = "json_remove";
static FunctionPtr create() { return std::make_shared<FunctionJsonbRemove>(); }
String get_name() const override { return name; }
size_t get_number_of_arguments() const override { return 0; }
bool is_variadic() const override { return true; }
bool use_default_implementation_for_nulls() const override { return false; }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(std::make_shared<DataTypeJsonb>());
}
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
DCHECK_GE(arguments.size(), 2);
// Check if arguments count is valid (json_doc + at least one path)
if (arguments.size() < 2) {
return Status::InvalidArgument("json_remove requires at least 2 arguments");
}
auto return_data_type = make_nullable(std::make_shared<DataTypeJsonb>());
auto result_column = return_data_type->create_column();
auto& nullable_column = assert_cast<ColumnNullable&>(*result_column);
auto& res_chars =
assert_cast<ColumnString&>(nullable_column.get_nested_column()).get_chars();
auto& res_offsets =
assert_cast<ColumnString&>(nullable_column.get_nested_column()).get_offsets();
auto& null_map = nullable_column.get_null_map_data();
res_chars.reserve(input_rows_count * 64);
res_offsets.resize(input_rows_count);
null_map.resize_fill(input_rows_count, 0);
// Get JSON document column
auto [json_column, json_const] =
unpack_if_const(block.get_by_position(arguments[0]).column);
const auto* json_nullable = check_and_get_column<ColumnNullable>(json_column.get());
const ColumnString* json_data_column = nullptr;
const NullMap* json_null_map = nullptr;
if (json_nullable) {
json_null_map = &json_nullable->get_null_map_data();
json_data_column =
check_and_get_column<ColumnString>(&json_nullable->get_nested_column());
} else {
json_data_column = check_and_get_column<ColumnString>(json_column.get());
}
if (!json_data_column) {
return Status::InvalidArgument("First argument must be a JSON document");
}
// Parse paths
std::vector<const ColumnString*> path_columns;
std::vector<const NullMap*> path_null_maps;
std::vector<bool> path_constants;
for (size_t i = 1; i < arguments.size(); ++i) {
auto [path_column, path_const] =
unpack_if_const(block.get_by_position(arguments[i]).column);
const auto* path_nullable = check_and_get_column<ColumnNullable>(path_column.get());
if (path_nullable) {
path_null_maps.push_back(&path_nullable->get_null_map_data());
path_columns.push_back(
check_and_get_column<ColumnString>(&path_nullable->get_nested_column()));
} else {
path_null_maps.push_back(nullptr);
path_columns.push_back(check_and_get_column<ColumnString>(path_column.get()));
}
if (!path_columns.back()) {
return Status::InvalidArgument(
fmt::format("Argument {} must be a string path", i + 1));
}
path_constants.push_back(path_const);
}
// Reusable JsonbWriter for performance
JsonbWriter writer;
for (size_t row_idx = 0; row_idx < input_rows_count; ++row_idx) {
size_t json_idx = index_check_const(row_idx, json_const);
// Check if JSON document is null
if (json_null_map && (*json_null_map)[json_idx]) {
null_map[row_idx] = 1;
res_offsets[row_idx] = static_cast<uint32_t>(res_chars.size());
continue;
}
// Parse JSON document
const auto& json_data = json_data_column->get_data_at(json_idx);
const JsonbDocument* json_doc = nullptr;
Status parse_status = JsonbDocument::checkAndCreateDocument(json_data.data,
json_data.size, &json_doc);
if (!parse_status.ok() || !json_doc) {
null_map[row_idx] = 1;
res_offsets[row_idx] = static_cast<uint32_t>(res_chars.size());
continue;
}
// Check if any path is null
bool has_null_path = false;
for (size_t path_idx = 0; path_idx < path_columns.size(); ++path_idx) {
size_t idx = index_check_const(row_idx, path_constants[path_idx]);
if (path_null_maps[path_idx] && (*path_null_maps[path_idx])[idx]) {
has_null_path = true;
break;
}
}
if (has_null_path) {
null_map[row_idx] = 1;
res_offsets[row_idx] = static_cast<uint32_t>(res_chars.size());
continue;
}
std::vector<JsonbPath> paths;
std::vector<bool> path_constants_vec;
for (size_t path_idx = 0; path_idx < path_columns.size(); ++path_idx) {
size_t idx = index_check_const(row_idx, path_constants[path_idx]);
const auto& path_data = path_columns[path_idx]->get_data_at(idx);
JsonbPath path;
if (!path.seek(path_data.data, path_data.size)) {
return Status::InvalidArgument(
"Json path error: Invalid Json Path for value: {} at row: {}",
std::string_view(path_data.data, path_data.size), row_idx);
}
if (path.is_wildcard() || path.is_supper_wildcard()) {
return Status::InvalidArgument(
"In this situation, path expressions may not contain the * and ** "
"tokens or an array range, argument index: {}, row index: {}",
path_idx + 1, row_idx);
}
paths.push_back(std::move(path));
path_constants_vec.push_back(path_constants[path_idx]);
}
const JsonbValue* current_value = json_doc->getValue();
DocumentBuffer tmp_buffer;
for (size_t path_idx = 0; path_idx < paths.size(); ++path_idx) {
writer.reset();
auto find_result = current_value->findValue(paths[path_idx]);
if (find_result.is_wildcard) {
continue;
}
if (find_result.value) {
RETURN_IF_ERROR(clone_without_path(current_value, paths[path_idx], writer));
auto* writer_output = writer.getOutput();
if (writer_output->getSize() > tmp_buffer.capacity) {
tmp_buffer.capacity =
((size_t(writer_output->getSize()) + 1024 - 1) / 1024) * 1024;
tmp_buffer.ptr = std::make_unique<char[]>(tmp_buffer.capacity);
DCHECK_LE(writer_output->getSize(), tmp_buffer.capacity);
}
memcpy(tmp_buffer.ptr.get(), writer_output->getBuffer(),
writer_output->getSize());
tmp_buffer.size = writer_output->getSize();
const JsonbDocument* new_doc = nullptr;
RETURN_IF_ERROR(JsonbDocument::checkAndCreateDocument(
tmp_buffer.ptr.get(), tmp_buffer.size, &new_doc));
current_value = new_doc->getValue();
}
}
const JsonbDocument* modified_doc = nullptr;
if (current_value != json_doc->getValue()) {
RETURN_IF_ERROR(JsonbDocument::checkAndCreateDocument(
tmp_buffer.ptr.get(), tmp_buffer.size, &modified_doc));
} else {
modified_doc = json_doc;
}
// Write the final result
const auto size = modified_doc->numPackedBytes();
res_chars.insert(reinterpret_cast<const char*>(modified_doc),
reinterpret_cast<const char*>(modified_doc) + size);
res_offsets[row_idx] = static_cast<uint32_t>(res_chars.size());
}
block.get_by_position(result).column = std::move(result_column);
return Status::OK();
}
private:
Status clone_without_path(const JsonbValue* root, const JsonbPath& path,
JsonbWriter& writer) const {
// Start writing at the root level
if (root->isObject()) {
writer.writeStartObject();
RETURN_IF_ERROR(clone_object_without_path(root, path, 0, writer));
writer.writeEndObject();
} else if (root->isArray()) {
writer.writeStartArray();
RETURN_IF_ERROR(clone_array_without_path(root, path, 0, writer));
writer.writeEndArray();
} else {
// Primitive value - can't remove anything from it
writer.writeValue(root);
}
return Status::OK();
}
Status clone_object_without_path(const JsonbValue* obj_value, const JsonbPath& path,
size_t depth, JsonbWriter& writer) const {
const auto* obj = obj_value->unpack<ObjectVal>();
for (const auto& kv : *obj) {
std::string key(kv.getKeyStr(), kv.klen());
if (depth < path.get_leg_vector_size()) {
const auto* leg = path.get_leg_from_leg_vector(depth);
if (leg->type == MEMBER_CODE) {
std::string target_key(leg->leg_ptr, leg->leg_len);
if (key == target_key) {
if (depth == path.get_leg_vector_size() - 1) {
continue;
} else {
writer.writeKey(kv.getKeyStr(), kv.klen());
if (kv.value()->isObject()) {
writer.writeStartObject();
RETURN_IF_ERROR(clone_object_without_path(kv.value(), path,
depth + 1, writer));
writer.writeEndObject();
} else if (kv.value()->isArray()) {
writer.writeStartArray();
RETURN_IF_ERROR(clone_array_without_path(kv.value(), path,
depth + 1, writer));
writer.writeEndArray();
} else {
writer.writeValue(kv.value());
}
}
} else {
writer.writeKey(kv.getKeyStr(), kv.klen());
writer.writeValue(kv.value());
}
} else {
writer.writeKey(kv.getKeyStr(), kv.klen());
writer.writeValue(kv.value());
}
} else {
writer.writeKey(kv.getKeyStr(), kv.klen());
writer.writeValue(kv.value());
}
}
return Status::OK();
}
Status clone_array_without_path(const JsonbValue* arr_value, const JsonbPath& path,
size_t depth, JsonbWriter& writer) const {
const auto* arr = arr_value->unpack<ArrayVal>();
int index = 0;
for (const auto& element : *arr) {
if (depth < path.get_leg_vector_size()) {
const auto* leg = path.get_leg_from_leg_vector(depth);
if (leg->type == ARRAY_CODE) {
int target_index = leg->array_index;
if (index == target_index) {
if (depth == path.get_leg_vector_size() - 1) {
// This is the target element to remove - skip it
} else {
if (element.isObject()) {
writer.writeStartObject();
RETURN_IF_ERROR(clone_object_without_path(&element, path, depth + 1,
writer));
writer.writeEndObject();
} else if (element.isArray()) {
writer.writeStartArray();
RETURN_IF_ERROR(clone_array_without_path(&element, path, depth + 1,
writer));
writer.writeEndArray();
} else {
writer.writeValue(&element);
}
}
} else {
writer.writeValue(&element);
}
} else {
writer.writeValue(&element);
}
} else {
writer.writeValue(&element);
}
index++;
}
return Status::OK();
}
};
class FunctionStripNullValue : public IFunction {
public:
static constexpr auto name = "strip_null_value";
static FunctionPtr create() { return std::make_shared<FunctionStripNullValue>(); }
String get_name() const override { return name; }
bool is_variadic() const override { return false; }
size_t get_number_of_arguments() const override { return 1; }
bool use_default_implementation_for_nulls() const override { return false; }
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
return make_nullable(std::make_shared<DataTypeJsonb>());
}
Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
const auto& arg_column = block.get_by_position(arguments[0]).column;
const ColumnString* json_column = nullptr;
const NullMap* json_null_map = nullptr;
if (arg_column->is_nullable()) {
const auto& nullable_col = assert_cast<const ColumnNullable&>(*arg_column);
json_column = assert_cast<const ColumnString*>(&nullable_col.get_nested_column());
json_null_map = &nullable_col.get_null_map_data();
} else {
json_column = assert_cast<const ColumnString*>(arg_column.get());
}
auto return_data_type = make_nullable(std::make_shared<DataTypeJsonb>());
auto result_column = return_data_type->create_column();
auto& result_nullmap = assert_cast<ColumnNullable&>(*result_column).get_null_map_data();
auto& result_data_col = assert_cast<ColumnString&>(
assert_cast<ColumnNullable&>(*result_column).get_nested_column());
result_nullmap.resize_fill(input_rows_count, 0);
for (size_t i = 0; i != input_rows_count; ++i) {
if (json_null_map && (*json_null_map)[i]) {
result_nullmap[i] = 1;
result_data_col.insert_default();
continue;
}
const JsonbDocument* json_doc = nullptr;
const auto& json_str = json_column->get_data_at(i);
RETURN_IF_ERROR(
JsonbDocument::checkAndCreateDocument(json_str.data, json_str.size, &json_doc));
if (json_doc) [[likely]] {
if (json_doc->getValue()->isNull()) {
result_nullmap[i] = 1;
result_data_col.insert_default();
} else {
result_nullmap[i] = 0;
result_data_col.insert_data(json_str.data, json_str.size);
}
} else {
result_nullmap[i] = 1;
result_data_col.insert_default();
}
}
block.get_by_position(result).column = std::move(result_column);
return Status::OK();
}
};
void register_function_jsonb(SimpleFunctionFactory& factory) {
factory.register_function<FunctionJsonbParse>(FunctionJsonbParse::name);
factory.register_alias(FunctionJsonbParse::name, FunctionJsonbParse::alias);
factory.register_function<FunctionJsonbParseErrorNull>("json_parse_error_to_null");
factory.register_alias("json_parse_error_to_null", "jsonb_parse_error_to_null");
factory.register_function<FunctionJsonbParseErrorValue>("json_parse_error_to_value");
factory.register_alias("json_parse_error_to_value", "jsonb_parse_error_to_value");
factory.register_function<FunctionJsonbExists>();
factory.register_alias(FunctionJsonbExists::name, FunctionJsonbExists::alias);
factory.register_function<FunctionJsonbType>();
factory.register_alias(FunctionJsonbType::name, FunctionJsonbType::alias);
factory.register_function<FunctionJsonbKeys>();
factory.register_alias(FunctionJsonbKeys::name, FunctionJsonbKeys::alias);
factory.register_function<FunctionJsonbExtractIsnull>();
factory.register_alias(FunctionJsonbExtractIsnull::name, FunctionJsonbExtractIsnull::alias);
factory.register_function<FunctionJsonbExtractJsonb>();
factory.register_alias(FunctionJsonbExtractJsonb::name, FunctionJsonbExtractJsonb::alias);
factory.register_function<FunctionJsonbExtractJsonbNoQuotes>();
factory.register_alias(FunctionJsonbExtractJsonbNoQuotes::name,
FunctionJsonbExtractJsonbNoQuotes::alias);
factory.register_function<FunctionJsonbLength<JsonbLengthAndPathImpl>>();
factory.register_function<FunctionJsonbContains<JsonbContainsAndPathImpl>>();
factory.register_function<FunctionJsonSearch>();
factory.register_function<FunctionJsonbArray<false>>();
factory.register_alias(FunctionJsonbArray<false>::name, FunctionJsonbArray<false>::alias);
factory.register_function<FunctionJsonbArray<true>>("json_array_ignore_null");
factory.register_alias("json_array_ignore_null", "jsonb_array_ignore_null");
factory.register_function<FunctionJsonbObject>();
factory.register_alias(FunctionJsonbObject::name, FunctionJsonbObject::alias);
factory.register_function<FunctionJsonbModify<JsonbModifyType::Insert>>();
factory.register_alias(FunctionJsonbModify<JsonbModifyType::Insert>::name,
FunctionJsonbModify<JsonbModifyType::Insert>::alias);
factory.register_function<FunctionJsonbModify<JsonbModifyType::Set>>();
factory.register_alias(FunctionJsonbModify<JsonbModifyType::Set>::name,
FunctionJsonbModify<JsonbModifyType::Set>::alias);
factory.register_function<FunctionJsonbModify<JsonbModifyType::Replace>>();
factory.register_alias(FunctionJsonbModify<JsonbModifyType::Replace>::name,
FunctionJsonbModify<JsonbModifyType::Replace>::alias);
factory.register_function<FunctionJsonbRemove>();
factory.register_alias(FunctionJsonbRemove::name, FunctionJsonbRemove::alias);
factory.register_function<FunctionStripNullValue>();
}
} // namespace doris::vectorized