blob: 741caed69656dd2311d9d02f5a1d252a8a739cd4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exprs/table_function/vjson_each.h"
#include <glog/logging.h>
#include <ostream>
#include <string>
#include "common/status.h"
#include "core/assert_cast.h"
#include "core/block/block.h"
#include "core/block/column_with_type_and_name.h"
#include "core/column/column.h"
#include "core/column/column_const.h"
#include "core/column/column_struct.h"
#include "core/string_ref.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "util/jsonb_document.h"
#include "util/jsonb_utils.h"
#include "util/jsonb_writer.h"
namespace doris {
#include "common/compile_check_begin.h"
template <bool TEXT_MODE>
VJsonEachTableFunction<TEXT_MODE>::VJsonEachTableFunction() {
_fn_name = TEXT_MODE ? "vjson_each_text" : "vjson_each";
}
template <bool TEXT_MODE>
Status VJsonEachTableFunction<TEXT_MODE>::process_init(Block* block, RuntimeState* /*state*/) {
ColumnPtr value_column;
RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute_column(
_expr_context.get(), block, nullptr, block->rows(), value_column));
auto [col, is_const] = unpack_if_const(value_column);
_json_column = col;
_is_const = is_const;
return Status::OK();
}
// Helper: insert one JsonbValue as plain text into a ColumnNullable<ColumnString>.
// For strings: raw blob content (quotes stripped, matching json_each_text PG semantics).
// For null JSON values: SQL NULL (insert_default).
// For all others (numbers, bools, objects, arrays): JSON text representation.
static void insert_value_as_text(const JsonbValue* value, MutableColumnPtr& col) {
if (value == nullptr || value->isNull()) {
col->insert_default();
return;
}
if (value->isString()) {
const auto* str_val = value->unpack<JsonbStringVal>();
col->insert_data(str_val->getBlob(), str_val->getBlobLen());
} else {
JsonbToJson converter;
std::string text = converter.to_json_string(value);
col->insert_data(text.data(), text.size());
}
}
// Helper: insert one JsonbValue in JSONB binary form into a ColumnNullable<ColumnString>.
// For null JSON values: SQL NULL (insert_default).
// For all others: write JSONB binary via JsonbWriter.
static void insert_value_as_json(const JsonbValue* value, MutableColumnPtr& col,
JsonbWriter& writer) {
if (value == nullptr || value->isNull()) {
col->insert_default();
return;
}
writer.reset();
writer.writeValue(value);
const auto* buf = writer.getOutput()->getBuffer();
size_t len = writer.getOutput()->getSize();
col->insert_data(buf, len);
}
template <bool TEXT_MODE>
void VJsonEachTableFunction<TEXT_MODE>::process_row(size_t row_idx) {
TableFunction::process_row(row_idx);
if (_is_const && _cur_size > 0) {
return;
}
StringRef text;
const size_t idx = _is_const ? 0 : row_idx;
if (const auto* nullable_col = check_and_get_column<ColumnNullable>(*_json_column)) {
if (nullable_col->is_null_at(idx)) {
return;
}
text = assert_cast<const ColumnString&>(nullable_col->get_nested_column()).get_data_at(idx);
} else {
text = assert_cast<const ColumnString&>(*_json_column).get_data_at(idx);
}
const JsonbDocument* doc = nullptr;
auto st = JsonbDocument::checkAndCreateDocument(text.data, text.size, &doc);
if (!st.ok() || !doc || !doc->getValue()) [[unlikely]] {
return;
}
const JsonbValue* jv = doc->getValue();
if (!jv->isObject()) {
return;
}
const auto* obj = jv->unpack<ObjectVal>();
_cur_size = obj->numElem();
if (_cur_size == 0) {
return;
}
_kv_pairs.first = ColumnNullable::create(ColumnString::create(), ColumnUInt8::create());
_kv_pairs.second = ColumnNullable::create(ColumnString::create(), ColumnUInt8::create());
_kv_pairs.first->reserve(_cur_size);
_kv_pairs.second->reserve(_cur_size);
if constexpr (TEXT_MODE) {
for (const auto& kv : *obj) {
_kv_pairs.first->insert_data(kv.getKeyStr(), kv.klen());
insert_value_as_text(kv.value(), _kv_pairs.second);
}
} else {
JsonbWriter writer;
for (const auto& kv : *obj) {
_kv_pairs.first->insert_data(kv.getKeyStr(), kv.klen());
insert_value_as_json(kv.value(), _kv_pairs.second, writer);
}
}
}
template <bool TEXT_MODE>
void VJsonEachTableFunction<TEXT_MODE>::process_close() {
_json_column = nullptr;
_kv_pairs.first = nullptr;
_kv_pairs.second = nullptr;
_cur_size = 0;
}
template <bool TEXT_MODE>
void VJsonEachTableFunction<TEXT_MODE>::get_same_many_values(MutableColumnPtr& column, int length) {
if (current_empty()) {
column->insert_many_defaults(length);
return;
}
ColumnStruct* ret;
if (_is_nullable) {
auto* nullable = assert_cast<ColumnNullable*>(column.get());
ret = assert_cast<ColumnStruct*>(nullable->get_nested_column_ptr().get());
assert_cast<ColumnUInt8*>(nullable->get_null_map_column_ptr().get())
->insert_many_defaults(length);
} else {
ret = assert_cast<ColumnStruct*>(column.get());
}
ret->get_column(0).insert_many_from(*_kv_pairs.first, _cur_offset, length);
ret->get_column(1).insert_many_from(*_kv_pairs.second, _cur_offset, length);
}
template <bool TEXT_MODE>
int VJsonEachTableFunction<TEXT_MODE>::get_value(MutableColumnPtr& column, int max_step) {
max_step = std::min(max_step, (int)(_cur_size - _cur_offset));
if (current_empty()) {
column->insert_default();
max_step = 1;
} else {
ColumnStruct* struct_col = nullptr;
if (_is_nullable) {
auto* nullable_col = assert_cast<ColumnNullable*>(column.get());
struct_col = assert_cast<ColumnStruct*>(nullable_col->get_nested_column_ptr().get());
assert_cast<ColumnUInt8*>(nullable_col->get_null_map_column_ptr().get())
->insert_many_defaults(max_step);
} else {
struct_col = assert_cast<ColumnStruct*>(column.get());
}
struct_col->get_column(0).insert_range_from(*_kv_pairs.first, _cur_offset, max_step);
struct_col->get_column(1).insert_range_from(*_kv_pairs.second, _cur_offset, max_step);
}
forward(max_step);
return max_step;
}
// // Explicit template instantiations
template class VJsonEachTableFunction<false>; // json_each
template class VJsonEachTableFunction<true>; // json_each_text
#include "common/compile_check_end.h"
} // namespace doris