// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exprs/table_function/vjson_each.h"

#include <glog/logging.h>

#include <ostream>
#include <string>

#include "common/status.h"
#include "core/assert_cast.h"
#include "core/block/block.h"
#include "core/block/column_with_type_and_name.h"
#include "core/column/column.h"
#include "core/column/column_const.h"
#include "core/column/column_struct.h"
#include "core/string_ref.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "util/jsonb_document.h"
#include "util/jsonb_utils.h"
#include "util/jsonb_writer.h"

namespace doris {
#include "common/compile_check_begin.h"

// Constructor: records the function name for this instantiation.
// TEXT_MODE == true  -> "vjson_each_text"
// TEXT_MODE == false -> "vjson_each"
template <bool TEXT_MODE>
VJsonEachTableFunction<TEXT_MODE>::VJsonEachTableFunction() {
    if constexpr (TEXT_MODE) {
        _fn_name = "vjson_each_text";
    } else {
        _fn_name = "vjson_each";
    }
}

// Evaluates the first child of the root expression against `block` and caches
// the resulting column (with any const wrapper removed) for later process_row
// calls. Returns the evaluation error, if any; otherwise Status::OK().
// The RuntimeState argument is unused.
template <bool TEXT_MODE>
Status VJsonEachTableFunction<TEXT_MODE>::process_init(Block* block, RuntimeState* /*state*/) {
    auto* context = _expr_context.get();
    const auto row_count = block->rows();

    ColumnPtr evaluated;
    RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute_column(context, block, nullptr,
                                                                         row_count, evaluated));

    // unpack_if_const hands back the column to read from together with a flag
    // telling whether the evaluated column was constant.
    const auto [data_column, const_flag] = unpack_if_const(evaluated);
    _is_const = const_flag;
    _json_column = data_column;
    return Status::OK();
}

// Helper: insert one JsonbValue as plain text into a ColumnNullable<ColumnString>.
// For strings: raw blob content (quotes stripped, matching json_each_text PG semantics).
// For null JSON values: SQL NULL (insert_default).
// For all others (numbers, bools, objects, arrays): JSON text representation.
static void insert_value_as_text(const JsonbValue* value, MutableColumnPtr& col) {
    if (value == nullptr || value->isNull()) {
        col->insert_default();
        return;
    }
    if (value->isString()) {
        const auto* str_val = value->unpack<JsonbStringVal>();
        col->insert_data(str_val->getBlob(), str_val->getBlobLen());
    } else {
        JsonbToJson converter;
        std::string text = converter.to_json_string(value);
        col->insert_data(text.data(), text.size());
    }
}

// Helper: append one JsonbValue to `col` in JSONB binary form.
//  - nullptr or JSON null -> insert_default() (SQL NULL for a nullable column).
//  - anything else        -> the bytes `writer` produces for the value.
// `writer` is reset before every value, so a single instance can be shared
// across calls by the caller.
static void insert_value_as_json(const JsonbValue* value, MutableColumnPtr& col,
                                 JsonbWriter& writer) {
    const bool is_sql_null = (value == nullptr) || value->isNull();
    if (is_sql_null) {
        col->insert_default();
        return;
    }

    writer.reset();
    writer.writeValue(value);

    // Copy the serialized bytes out of the writer's output stream.
    auto& output = *writer.getOutput();
    const auto* bytes = output.getBuffer();
    size_t byte_count = output.getSize();
    col->insert_data(bytes, byte_count);
}

// Expands the JSONB object found at `row_idx` of the cached input column into
// two parallel nullable string columns (_kv_pairs.first = keys,
// _kv_pairs.second = values) and sets _cur_size to the number of entries.
//
// The function returns without building any key/value columns when:
//  - the input is constant and _cur_size is already positive,
//  - the input cell is SQL NULL,
//  - the bytes are not a valid JSONB document or the document has no value,
//  - the top-level value is not an object,
//  - the object has no elements.
//
// In TEXT_MODE values are stored as text, otherwise as JSONB binary; JSON null
// values become SQL NULL in both modes (see the insert_value_as_* helpers).
template <bool TEXT_MODE>
void VJsonEachTableFunction<TEXT_MODE>::process_row(size_t row_idx) {
    TableFunction::process_row(row_idx);

    // NOTE(review): presumably the base class keeps _cur_size untouched for
    // constant input, which lets the previous expansion be reused — confirm
    // against TableFunction::process_row.
    if (_is_const && _cur_size > 0) {
        return;
    }

    // A constant column only has a single stored row.
    const size_t pos = _is_const ? 0 : row_idx;

    StringRef raw;
    const auto* nullable = check_and_get_column<ColumnNullable>(*_json_column);
    if (nullable == nullptr) {
        raw = assert_cast<const ColumnString&>(*_json_column).get_data_at(pos);
    } else if (nullable->is_null_at(pos)) {
        return;
    } else {
        raw = assert_cast<const ColumnString&>(nullable->get_nested_column()).get_data_at(pos);
    }

    const JsonbDocument* document = nullptr;
    auto parse_status = JsonbDocument::checkAndCreateDocument(raw.data, raw.size, &document);
    if (!parse_status.ok() || document == nullptr) [[unlikely]] {
        return;
    }
    const JsonbValue* root = document->getValue();
    if (root == nullptr) [[unlikely]] {
        return;
    }

    // Only top-level objects can be expanded into key/value rows.
    if (!root->isObject()) {
        return;
    }

    const auto* object = root->unpack<ObjectVal>();
    _cur_size = object->numElem();
    if (_cur_size == 0) {
        return;
    }

    // Fresh output columns for this row, sized for the number of entries.
    const auto make_nullable_string = [] {
        return ColumnNullable::create(ColumnString::create(), ColumnUInt8::create());
    };
    _kv_pairs.first = make_nullable_string();
    _kv_pairs.second = make_nullable_string();
    _kv_pairs.first->reserve(_cur_size);
    _kv_pairs.second->reserve(_cur_size);

    auto& keys = _kv_pairs.first;
    auto& values = _kv_pairs.second;
    if constexpr (TEXT_MODE) {
        for (const auto& entry : *object) {
            keys->insert_data(entry.getKeyStr(), entry.klen());
            insert_value_as_text(entry.value(), values);
        }
    } else {
        // One writer is shared by all entries; the helper resets it per value.
        JsonbWriter shared_writer;
        for (const auto& entry : *object) {
            keys->insert_data(entry.getKeyStr(), entry.klen());
            insert_value_as_json(entry.value(), values, shared_writer);
        }
    }
}

// Drops every per-block reference held by this function: the cached input
// column, both expanded key/value columns, and the current element count.
template <bool TEXT_MODE>
void VJsonEachTableFunction<TEXT_MODE>::process_close() {
    _cur_size = 0;
    _kv_pairs.second = nullptr;
    _kv_pairs.first = nullptr;
    _json_column = nullptr;
}

// Appends `length` copies of the key/value pair at _cur_offset to `column`.
// When the current expansion is empty, `length` default values are appended
// instead. For a nullable output, `length` default entries are appended to
// the null map and the pairs go into the nested struct column.
template <bool TEXT_MODE>
void VJsonEachTableFunction<TEXT_MODE>::get_same_many_values(MutableColumnPtr& column, int length) {
    if (current_empty()) {
        column->insert_many_defaults(length);
        return;
    }

    ColumnStruct* dest = nullptr;
    if (!_is_nullable) {
        dest = assert_cast<ColumnStruct*>(column.get());
    } else {
        auto* wrapper = assert_cast<ColumnNullable*>(column.get());
        dest = assert_cast<ColumnStruct*>(wrapper->get_nested_column_ptr().get());
        auto* null_map = assert_cast<ColumnUInt8*>(wrapper->get_null_map_column_ptr().get());
        null_map->insert_many_defaults(length);
    }

    // Struct field 0 receives the key, field 1 the value.
    auto& key_dest = dest->get_column(0);
    auto& value_dest = dest->get_column(1);
    key_dest.insert_many_from(*_kv_pairs.first, _cur_offset, length);
    value_dest.insert_many_from(*_kv_pairs.second, _cur_offset, length);
}

// Appends up to `max_step` key/value pairs, starting at _cur_offset, to
// `column` and advances the function by the number of rows produced.
//
// - When the current expansion is empty, a single default value is appended
//   and the step is 1.
// - Otherwise the step is limited to the number of pairs remaining after
//   _cur_offset. For a nullable output, that many default entries are appended
//   to the null map and the pairs go into the nested struct column
//   (field 0 = key, field 1 = value).
//
// Returns the number of rows appended to `column`.
template <bool TEXT_MODE>
int VJsonEachTableFunction<TEXT_MODE>::get_value(MutableColumnPtr& column, int max_step) {
    // Compare in the wider type of the remaining count and narrow only once the
    // value is known to be smaller than max_step. Casting first would truncate
    // a remaining count that does not fit in int.
    const auto remaining = _cur_size - _cur_offset;
    if (remaining < max_step) {
        max_step = static_cast<int>(remaining);
    }

    if (current_empty()) {
        column->insert_default();
        max_step = 1;
    } else {
        ColumnStruct* struct_col = nullptr;
        if (_is_nullable) {
            auto* nullable_col = assert_cast<ColumnNullable*>(column.get());
            struct_col = assert_cast<ColumnStruct*>(nullable_col->get_nested_column_ptr().get());
            assert_cast<ColumnUInt8*>(nullable_col->get_null_map_column_ptr().get())
                    ->insert_many_defaults(max_step);
        } else {
            struct_col = assert_cast<ColumnStruct*>(column.get());
        }

        struct_col->get_column(0).insert_range_from(*_kv_pairs.first, _cur_offset, max_step);
        struct_col->get_column(1).insert_range_from(*_kv_pairs.second, _cur_offset, max_step);
    }

    forward(max_step);
    return max_step;
}

// Explicit template instantiations.
// The member functions above are defined in this translation unit, so both
// specializations are instantiated here.
template class VJsonEachTableFunction<false>; // json_each
template class VJsonEachTableFunction<true>;  // json_each_text

#include "common/compile_check_end.h"
} // namespace doris
