blob: 7b4163e5c48ac5444584b34ba03f25b20c6f213f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/functions/function_java_udf.h"
#include <bthread/bthread.h>
#include <memory>
#include <string>
#include "common/exception.h"
#include "jni.h"
#include "runtime/exec_env.h"
#include "runtime/user_function_cache.h"
#include "util/jni-util.h"
#include "vec/core/block.h"
#include "vec/exec/jni_connector.h"
// JNI (slash-separated) name of the Java class that executes UDFs.
const char* EXECUTOR_CLASS = "org/apache/doris/udf/UdfExecutor";
// JNI descriptor of the executor constructor: takes a byte[] and returns void.
// open() passes the serialized TJavaUdfExecutorCtorParams as that byte[].
const char* EXECUTOR_CTOR_SIGNATURE = "([B)V";
// JNI descriptor of evaluate(): takes two java.util.Map arguments and returns a long.
const char* EXECUTOR_EVALUATE_SIGNATURE = "(Ljava/util/Map;Ljava/util/Map;)J";
// JNI descriptor of close(): takes no arguments and returns void.
const char* EXECUTOR_CLOSE_SIGNATURE = "()V";
namespace doris::vectorized {
// Stores copies of the Thrift function descriptor, the argument types and the
// return type. No JNI work happens here; the Java-side executor object is
// created later in open().
JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes& argument_types,
const DataTypePtr& return_type)
: fn_(fn), _argument_types(argument_types), _return_type(return_type) {}
// Creates the Java-side UdfExecutor for this function and records the JNI
// handles (class, method ids, executor object) in a JniContext that is
// registered as the THREAD_LOCAL function state.
//
// Only the THREAD_LOCAL scope does any work; for any other scope the function
// merely checks that a JNIEnv can be obtained and returns OK.
//
// Returns a non-OK Status when the JNIEnv cannot be obtained, the UDF jar
// cannot be resolved, the constructor parameters cannot be serialized, or any
// JNI lookup / the Java constructor raises an exception.
Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
    if (scope != FunctionContext::FunctionStateScope::THREAD_LOCAL) {
        return Status::OK();
    }

    SCOPED_TIMER(context->get_udf_execute_timer());
    // Registered up front: close() looks the context up through this same
    // THREAD_LOCAL slot.
    std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>();
    context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);

    // Scoped JNI local frame. Once pushed, it is popped when jni_frame goes out
    // of scope, which also covers every early return below.
    JniLocalFrame jni_frame;

    auto function_cache = UserFunctionCache::instance();
    TJavaUdfExecutorCtorParams ctor_params;
    ctor_params.__set_fn(fn_);
    // Resolve the local jar path only when both the jar location and its
    // checksum are provided.
    if (!fn_.hdfs_location.empty() && !fn_.checksum.empty()) {
        std::string local_location;
        RETURN_IF_ERROR(function_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum,
                                                    &local_location));
        ctor_params.__set_location(local_location);
    }

    RETURN_IF_ERROR(jni_frame.push(env));
    jbyteArray ctor_params_bytes = nullptr;
    RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_ctx->executor_cl));

    // Check for a pending exception after every lookup: most JNI functions
    // (including GetMethodID and NewObject) must not be called while an
    // exception is pending.
    jni_ctx->executor_ctor_id =
            env->GetMethodID(jni_ctx->executor_cl, "<init>", EXECUTOR_CTOR_SIGNATURE);
    RETURN_ERROR_IF_EXC(env);
    jni_ctx->executor_evaluate_id =
            env->GetMethodID(jni_ctx->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
    RETURN_ERROR_IF_EXC(env);
    jni_ctx->executor_close_id =
            env->GetMethodID(jni_ctx->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE);
    RETURN_ERROR_IF_EXC(env);

    // Keep the new object in a local variable so the long-lived context never
    // holds a JNI local reference; it only ever receives the global one.
    jobject local_executor = env->NewObject(jni_ctx->executor_cl, jni_ctx->executor_ctor_id,
                                            ctor_params_bytes);
    // The serialized parameters are no longer needed. DeleteLocalRef is one of
    // the JNI functions that may be called with an exception pending.
    env->DeleteLocalRef(ctor_params_bytes);
    RETURN_ERROR_IF_EXC(env);

    RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, local_executor, &jni_ctx->executor));
    jni_ctx->open_successes = true;
    return Status::OK();
}
// Runs the Java UDF over `num_rows` rows of `block`.
//
// The argument columns (`arguments`) are exported through
// JniConnector::to_java_table, and their address and schema are handed to the
// Java executor's evaluate() as a string map. A second map describes the
// result column. evaluate() returns an address that JniConnector::fill_block
// uses to populate column `result` of `block`.
//
// Returns a non-OK Status when the JNIEnv is unavailable, the input table or
// either Java map cannot be built, evaluate() raises a Java exception, or
// fill_block fails.
Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
                                      const ColumnNumbers& arguments, uint32_t result,
                                      size_t num_rows) const {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
    auto* jni_ctx = reinterpret_cast<JniContext*>(
            context->get_function_state(FunctionContext::THREAD_LOCAL));
    SCOPED_TIMER(context->get_udf_execute_timer());

    // input_table must stay alive until evaluate() returns: its address is
    // passed to Java as "meta_address".
    std::unique_ptr<long[]> input_table;
    RETURN_IF_ERROR(JniConnector::to_java_table(&block, num_rows, arguments, input_table));

    // Build both native parameter maps before creating any Java object, so a
    // failure while describing the schemas cannot strand a JNI reference.
    auto input_table_schema = JniConnector::parse_table_schema(&block, arguments, true);
    std::map<String, String> input_params = {
            {"meta_address", std::to_string(reinterpret_cast<long>(input_table.get()))},
            {"required_fields", input_table_schema.first},
            {"columns_types", input_table_schema.second}};

    auto output_table_schema = JniConnector::parse_table_schema(&block, {result}, true);
    std::string output_nullable =
            block.get_by_position(result).type->is_nullable() ? "true" : "false";
    std::map<String, String> output_params = {{"is_nullable", output_nullable},
                                              {"required_fields", output_table_schema.first},
                                              {"columns_types", output_table_schema.second}};

    jobject input_map = nullptr;
    RETURN_IF_ERROR(JniUtil::convert_to_java_map(env, input_params, &input_map));

    jobject output_map = nullptr;
    Status output_map_status = JniUtil::convert_to_java_map(env, output_params, &output_map);
    if (!output_map_status.ok()) {
        // input_map was created successfully above; release it before bailing
        // out so it does not leak.
        env->DeleteGlobalRef(input_map);
        return output_map_status;
    }

    long output_address = env->CallLongMethod(jni_ctx->executor, jni_ctx->executor_evaluate_id,
                                              input_map, output_map);
    // Release the maps before checking for an exception so they are freed even
    // when evaluate() threw.
    env->DeleteGlobalRef(input_map);
    env->DeleteGlobalRef(output_map);
    RETURN_ERROR_IF_EXC(env);
    return JniConnector::fill_block(&block, {result}, output_address);
}
// Releases the JNI resources held by the THREAD_LOCAL JniContext, if one was
// registered. The `scope` argument is not consulted.
//
// When called from a bthread the actual work is handed to the UDF close worker
// pool and this function blocks until that task has finished; otherwise the
// work runs inline on the calling thread.
Status JavaFunctionCall::close(FunctionContext* context,
                               FunctionContext::FunctionStateScope scope) {
    auto release_jni_state = [context]() -> Status {
        auto* state = reinterpret_cast<JniContext*>(
                context->get_function_state(FunctionContext::THREAD_LOCAL));
        if (state == nullptr) {
            return Status::OK();
        }
        // The JniContext owns resources whose release depends on this
        // JavaFunctionCall, so they must be released here, before the
        // JavaFunctionCall itself is destroyed.
        RETURN_IF_ERROR(state->close());
        return Status::OK();
    };

    // brpc's bthread_self() yields 0 when the caller is not a bthread; in that
    // case the cleanup can run directly.
    if (bthread_self() == 0) {
        return release_jni_state();
    }

    DorisMetrics::instance()->udf_close_bthread_count->increment(1);
    // Run the cleanup on the close_workers pthread pool and wait for its result.
    auto close_task =
            std::make_shared<std::packaged_task<Status()>>(std::move(release_jni_state));
    auto pending_result = close_task->get_future();
    RETURN_IF_ERROR(ExecEnv::GetInstance()->udf_close_workers_pool()->submit_func(
            [close_task]() { (*close_task)(); }));
    RETURN_IF_ERROR(pending_result.get());
    return Status::OK();
}
} // namespace doris::vectorized