blob: dd7c914f5efa082afecf923f8630a702e50a8727 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exprs/hive-udf-call.h"
#include <jni.h>
#include <sstream>
#include <string>
#include "codegen/llvm-codegen.h"
#include "exprs/anyval-util.h"
#include "exprs/scalar-expr-evaluator.h"
#include "rpc/jni-thrift-util.h"
#include "runtime/lib-cache.h"
#include "runtime/runtime-state.h"
#include "util/bit-util.h"
#include "gen-cpp/Frontend_types.h"
#include "common/names.h"
// Fully-qualified JNI name of the Java class that executes Hive UDFs on behalf
// of the backend.
const char* EXECUTOR_CLASS = "org/apache/impala/hive/executor/UdfExecutor";
// JNI signature of the UdfExecutor constructor: one byte-array argument (the
// serialized thrift ctor params), void return.
const char* EXECUTOR_CTOR_SIGNATURE = "([B)V";
// JNI signatures of the no-argument, void evaluate() and close() methods.
const char* EXECUTOR_EVALUATE_SIGNATURE = "()V";
const char* EXECUTOR_CLOSE_SIGNATURE = "()V";
namespace impala {
// Process-wide JNI handles, shared by all HiveUdfCall instances: a global
// reference to the Java UdfExecutor class and the cached method ids of its
// constructor, evaluate() and close() methods. Populated once by InitEnv();
// NULL until then (the HiveUdfCall constructor DCHECKs that InitEnv() ran).
jclass HiveUdfCall::executor_cl_ = NULL;
jmethodID HiveUdfCall::executor_ctor_id_ = NULL;
jmethodID HiveUdfCall::executor_evaluate_id_ = NULL;
jmethodID HiveUdfCall::executor_close_id_ = NULL;
/// Per-thread state for one Hive UDF evaluator: the JNI executor object plus
/// the native buffers shared with the Java side. Created in OpenEvaluator(),
/// stored as THREAD_LOCAL function state and torn down in CloseEvaluator().
struct JniContext {
  // Global JNI reference to the Java UdfExecutor instance.
  jobject executor;
  // Native buffer holding the evaluated child argument values, one slot per
  // child at input_byte_offsets_[i].
  uint8_t* input_values_buffer;
  // One byte per child argument: 1 if that argument is NULL, 0 otherwise.
  uint8_t* input_nulls_buffer;
  // Native buffer the result value is written into (its address is handed to
  // the Java executor via the ctor params).
  uint8_t* output_value_buffer;
  // Non-zero when the UDF result is NULL; its address is likewise handed to
  // the Java side.
  uint8_t output_null_value;
  // True once a UDF failure has been reported via AddWarning(), so each
  // thread warns at most once.
  bool warning_logged;

  /// AnyVal to evaluate the expression into. Only used as temporary storage during
  /// expression evaluation.
  AnyVal* output_anyval;

  JniContext()
    : executor(NULL),
      input_values_buffer(NULL),
      input_nulls_buffer(NULL),
      output_value_buffer(NULL),
      // Fix: output_null_value was previously missing from the init list,
      // leaving the member uninitialized.
      output_null_value(0),
      warning_logged(false),
      output_anyval(NULL) {}
};
// Constructs the expression node from its thrift form. Only sanity-checks the
// node here; all JNI/buffer setup happens later in Init()/OpenEvaluator().
HiveUdfCall::HiveUdfCall(const TExprNode& node)
  : ScalarExpr(node), input_buffer_size_(0) {
  DCHECK_EQ(node.node_type, TExprNodeType::FUNCTION_CALL);
  DCHECK_EQ(node.fn.binary_type, TFunctionBinaryType::JAVA);
  // InitEnv() must have resolved the Java executor class before any
  // HiveUdfCall is constructed.
  DCHECK(executor_cl_ != NULL) << "Init() was not called!";
}
// Evaluates the Hive UDF for 'row': copies the children's values into the
// native buffers shared with the Java executor, invokes the executor's
// evaluate() method over JNI, then unpacks the result from the output buffer
// into the thread-local output AnyVal, which is returned. On failure the
// returned AnyVal is set to NULL and an error (or a once-per-thread warning)
// is reported through the FunctionContext.
AnyVal* HiveUdfCall::Evaluate(ScalarExprEvaluator* eval, const TupleRow* row) const {
  FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_);
  JniContext* jni_ctx = reinterpret_cast<JniContext*>(
      fn_ctx->GetFunctionState(FunctionContext::THREAD_LOCAL));
  DCHECK(jni_ctx != NULL);

  JNIEnv* env = JniUtil::GetJNIEnv();
  if (env == NULL) {
    // Without a JNIEnv we cannot reach the Java executor: report the error and
    // return a NULL result.
    stringstream ss;
    ss << "Hive UDF path=" << fn_.hdfs_location << " class=" << fn_.scalar_fn.symbol
       << " failed due to JNI issue getting the JNIEnv object";
    fn_ctx->SetError(ss.str().c_str());
    jni_ctx->output_anyval->is_null = true;
    return jni_ctx->output_anyval;
  }

  // Evaluate all the children values and put the results in input_values_buffer,
  // with a parallel NULL-indicator byte per child in input_nulls_buffer.
  for (int i = 0; i < GetNumChildren(); ++i) {
    void* v = eval->GetValue(*GetChild(i), row);
    if (v == NULL) {
      jni_ctx->input_nulls_buffer[i] = 1;
    } else {
      // Each child's value lives at its precomputed offset (see Init()).
      uint8_t* input_ptr = jni_ctx->input_values_buffer + input_byte_offsets_[i];
      jni_ctx->input_nulls_buffer[i] = 0;
      switch (GetChild(i)->type().type) {
        case TYPE_BOOLEAN:
        case TYPE_TINYINT:
          // Using explicit sizes helps the compiler unroll memcpy
          memcpy(input_ptr, v, 1);
          break;
        case TYPE_SMALLINT:
          memcpy(input_ptr, v, 2);
          break;
        case TYPE_INT:
        case TYPE_FLOAT:
        case TYPE_DATE:
          memcpy(input_ptr, v, 4);
          break;
        case TYPE_BIGINT:
        case TYPE_DOUBLE:
          memcpy(input_ptr, v, 8);
          break;
        case TYPE_TIMESTAMP:
          memcpy(input_ptr, v, sizeof(TimestampValue));
          break;
        case TYPE_STRING:
        case TYPE_VARCHAR:
          // Copies the StringValue header (ptr/len), not the string bytes; the
          // Java side reads the pointed-to data directly.
          memcpy(input_ptr, v, sizeof(StringValue));
          break;
        default:
          DCHECK(false) << "NYI";
      }
    }
  }

  // Using this version of Call has the lowest overhead. This eliminates the
  // vtable lookup and setting up return stacks.
  env->CallNonvirtualVoidMethodA(
      jni_ctx->executor, executor_cl_, executor_evaluate_id_, NULL);
  Status status = JniUtil::GetJniExceptionMsg(env);
  if (!status.ok()) {
    // Warn at most once per thread to avoid flooding the logs, but produce a
    // NULL result for every failing row.
    if (!jni_ctx->warning_logged) {
      stringstream ss;
      ss << "Hive UDF path=" << fn_.hdfs_location << " class=" << fn_.scalar_fn.symbol
         << " failed due to: " << status.GetDetail();
      fn_ctx->AddWarning(ss.str().c_str());
      jni_ctx->warning_logged = true;
    }
    jni_ctx->output_anyval->is_null = true;
    return jni_ctx->output_anyval;
  }

  // Write output_value_buffer to output_anyval. The Java side writes through
  // the output_null_ptr/output_buffer_ptr addresses passed to it in
  // OpenEvaluator().
  if (jni_ctx->output_null_value) {
    jni_ctx->output_anyval->is_null = true;
  } else {
    AnyValUtil::SetAnyVal(jni_ctx->output_value_buffer, type(), jni_ctx->output_anyval);
  }
  return jni_ctx->output_anyval;
}
// One-time process-wide setup: resolves a global reference to the Java
// UdfExecutor class and caches the method ids of its constructor, evaluate()
// and close() methods. Must complete before any HiveUdfCall is constructed
// (the constructor DCHECKs executor_cl_).
Status HiveUdfCall::InitEnv() {
  DCHECK(executor_cl_ == NULL) << "Init() already called!";
  JNIEnv* env = JniUtil::GetJNIEnv();
  if (env == NULL) return Status("Failed to get/create JVM");
  RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &executor_cl_));
  executor_ctor_id_ = env->GetMethodID(
      executor_cl_, "<init>", EXECUTOR_CTOR_SIGNATURE);
  // Each GetMethodID can raise a pending Java exception; check after each call.
  RETURN_ERROR_IF_EXC(env);
  executor_evaluate_id_ = env->GetMethodID(
      executor_cl_, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
  RETURN_ERROR_IF_EXC(env);
  executor_close_id_ = env->GetMethodID(
      executor_cl_, "close", EXECUTOR_CLOSE_SIGNATURE);
  RETURN_ERROR_IF_EXC(env);
  return Status::OK();
}
// Initializes this expression and computes the layout of the native input
// buffer: one 8-byte-aligned slot per child argument, with the slot offsets
// recorded in input_byte_offsets_ and the total size in input_buffer_size_.
Status HiveUdfCall::Init(
    const RowDescriptor& row_desc, bool is_entry_point, RuntimeState* state) {
  // Initialize children first.
  RETURN_IF_ERROR(ScalarExpr::Init(row_desc, is_entry_point, state));

  const int num_children = GetNumChildren();
  for (int idx = 0; idx < num_children; ++idx) {
    // The current running size is this child's slot offset.
    input_byte_offsets_.push_back(input_buffer_size_);
    const int slot_size = GetChild(idx)->type().GetSlotSize();
    // Grow by the slot size and round up to a multiple of 8 so every slot is
    // 8-byte aligned. We don't care about footprint since we allocate one
    // buffer for all rows and we never copy the entire buffer.
    input_buffer_size_ = BitUtil::RoundUpNumBytes(input_buffer_size_ + slot_size) * 8;
  }
  return Status::OK();
}
// Sets up the per-thread JNI state for this evaluator: allocates the
// JniContext and its native transfer buffers, instantiates the Java
// UdfExecutor (handing it the buffer addresses via serialized thrift ctor
// params), and allocates the output AnyVal. The JniContext is stored as
// THREAD_LOCAL function state and freed in CloseEvaluator().
Status HiveUdfCall::OpenEvaluator(FunctionContext::FunctionStateScope scope,
    RuntimeState* state, ScalarExprEvaluator* eval) const {
  RETURN_IF_ERROR(ScalarExpr::OpenEvaluator(scope, state, eval));

  // Create a JniContext in this thread's FunctionContext. Stored immediately
  // so CloseEvaluator() can clean it up even if a later step here fails.
  DCHECK_GE(fn_ctx_idx_, 0);
  FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_);
  JniContext* jni_ctx = new JniContext;
  fn_ctx->SetFunctionState(FunctionContext::THREAD_LOCAL, jni_ctx);

  JNIEnv* env = JniUtil::GetJNIEnv();
  if (env == NULL) return Status("Failed to get/create JVM");

  // Add a scoped cleanup jni reference object. This cleans up local refs made below.
  JniLocalFrame jni_frame;
  {
    // Scoped handle for libCache entry.
    LibCacheEntryHandle handle;
    string local_location;
    RETURN_IF_ERROR(LibCache::instance()->GetLocalPath(fn_.hdfs_location,
        LibCache::TYPE_JAR, fn_.last_modified_time, &handle, &local_location));
    THiveUdfExecutorCtorParams ctor_params;
    ctor_params.fn = fn_;
    ctor_params.local_location = local_location;
    ctor_params.input_byte_offsets = input_byte_offsets_;

    // Native buffers shared with the Java executor; their raw addresses are
    // passed to Java in the ctor params below.
    jni_ctx->input_values_buffer = new uint8_t[input_buffer_size_];
    jni_ctx->input_nulls_buffer = new uint8_t[GetNumChildren()];
    jni_ctx->output_value_buffer = new uint8_t[type().GetSlotSize()];

    ctor_params.input_buffer_ptr = (int64_t)jni_ctx->input_values_buffer;
    ctor_params.input_nulls_ptr = (int64_t)jni_ctx->input_nulls_buffer;
    ctor_params.output_buffer_ptr = (int64_t)jni_ctx->output_value_buffer;
    ctor_params.output_null_ptr = (int64_t)&jni_ctx->output_null_value;

    jbyteArray ctor_params_bytes;

    // Pushed frame will be popped when jni_frame goes out-of-scope.
    RETURN_IF_ERROR(jni_frame.push(env));

    RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
    // Create the java executor object. The jar referenced by the libCache handle
    // is not needed after the next call, so it is safe for the handle to go
    // out-of-scope after the java object is instantiated.
    jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
  }
  RETURN_ERROR_IF_EXC(env);
  // Promote the local ref returned by NewObject to a global ref so it survives
  // past this call; released via DeleteGlobalRef in CloseEvaluator().
  RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_ctx->executor, &jni_ctx->executor));

  RETURN_IF_ERROR(AllocateAnyVal(state, eval->expr_perm_pool(), type_,
      "Could not allocate JNI output value", &jni_ctx->output_anyval));
  return Status::OK();
}
// Tears down the per-thread JniContext created in OpenEvaluator(): calls
// close() on the Java executor and releases its global ref, frees the native
// transfer buffers and deletes the context itself, then closes the base class.
void HiveUdfCall::CloseEvaluator(FunctionContext::FunctionStateScope scope,
    RuntimeState* state, ScalarExprEvaluator* eval) const {
  if (eval->opened()) {
    FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_);
    JniContext* jni_ctx = reinterpret_cast<JniContext*>(
        fn_ctx->GetFunctionState(FunctionContext::THREAD_LOCAL));
    if (jni_ctx != NULL) {
      JNIEnv* env = JniUtil::GetJNIEnv();
      // Fix: guard against GetJNIEnv() returning NULL, consistent with
      // Evaluate() and OpenEvaluator(); previously a NULL env would have been
      // dereferenced here. Native cleanup below still runs either way.
      if (env != NULL && jni_ctx->executor != NULL) {
        env->CallNonvirtualVoidMethodA(
            jni_ctx->executor, executor_cl_, executor_close_id_, NULL);
        // close() failures are logged, not propagated, so cleanup continues.
        Status s = JniUtil::GetJniExceptionMsg(env);
        if (!s.ok()) state->LogError(s.msg());
        env->DeleteGlobalRef(jni_ctx->executor);
      }
      // delete[] on NULL is a no-op, so the buffers can be freed
      // unconditionally. output_anyval is owned by the expr perm pool and is
      // not freed here.
      delete[] jni_ctx->input_values_buffer;
      delete[] jni_ctx->input_nulls_buffer;
      delete[] jni_ctx->output_value_buffer;
      delete jni_ctx;
      // Clear the function state so a later Close cannot see a dangling ptr.
      fn_ctx->SetFunctionState(FunctionContext::THREAD_LOCAL, nullptr);
    }
  }
  ScalarExpr::CloseEvaluator(scope, state, eval);
}
// No native codegen for Hive UDF calls: emit the generic wrapper function,
// which dispatches to the interpreted Get*ValInterpreted()/Evaluate() path.
Status HiveUdfCall::GetCodegendComputeFnImpl(LlvmCodeGen* codegen, llvm::Function** fn) {
  return GetCodegendComputeFnWrapper(codegen, fn);
}
// One-line description of this node for debug/explain output: jar location,
// UDF class name and the base expression's debug string.
string HiveUdfCall::DebugString() const {
  stringstream ss;
  ss << "HiveUdfCall(hdfs_location=" << fn_.hdfs_location << " classname="
     << fn_.scalar_fn.symbol << " " << ScalarExpr::DebugString() << ")";
  return ss.str();
}
// Interpreted entry point for BOOLEAN-typed Hive UDFs: runs the shared
// Evaluate() path and copies the result out as a BooleanVal.
BooleanVal HiveUdfCall::GetBooleanValInterpreted(
    ScalarExprEvaluator* eval, const TupleRow* row) const {
  DCHECK_EQ(type_.type, TYPE_BOOLEAN);
  AnyVal* result = Evaluate(eval, row);
  return *reinterpret_cast<BooleanVal*>(result);
}
// Interpreted entry point for TINYINT-typed Hive UDFs: runs the shared
// Evaluate() path and copies the result out as a TinyIntVal.
TinyIntVal HiveUdfCall::GetTinyIntValInterpreted(
    ScalarExprEvaluator* eval, const TupleRow* row) const {
  DCHECK_EQ(type_.type, TYPE_TINYINT);
  AnyVal* result = Evaluate(eval, row);
  return *reinterpret_cast<TinyIntVal*>(result);
}
// Interpreted entry point for SMALLINT-typed Hive UDFs: runs the shared
// Evaluate() path and copies the result out as a SmallIntVal.
SmallIntVal HiveUdfCall::GetSmallIntValInterpreted(
    ScalarExprEvaluator* eval, const TupleRow* row) const {
  DCHECK_EQ(type_.type, TYPE_SMALLINT);
  AnyVal* result = Evaluate(eval, row);
  return *reinterpret_cast<SmallIntVal*>(result);
}
// Interpreted entry point for INT-typed Hive UDFs: runs the shared Evaluate()
// path and copies the result out as an IntVal.
IntVal HiveUdfCall::GetIntValInterpreted(
    ScalarExprEvaluator* eval, const TupleRow* row) const {
  DCHECK_EQ(type_.type, TYPE_INT);
  AnyVal* result = Evaluate(eval, row);
  return *reinterpret_cast<IntVal*>(result);
}
// Interpreted entry point for BIGINT-typed Hive UDFs: runs the shared
// Evaluate() path and copies the result out as a BigIntVal.
BigIntVal HiveUdfCall::GetBigIntValInterpreted(
    ScalarExprEvaluator* eval, const TupleRow* row) const {
  DCHECK_EQ(type_.type, TYPE_BIGINT);
  AnyVal* result = Evaluate(eval, row);
  return *reinterpret_cast<BigIntVal*>(result);
}
// Interpreted entry point for FLOAT-typed Hive UDFs: runs the shared
// Evaluate() path and copies the result out as a FloatVal.
FloatVal HiveUdfCall::GetFloatValInterpreted(
    ScalarExprEvaluator* eval, const TupleRow* row) const {
  DCHECK_EQ(type_.type, TYPE_FLOAT);
  AnyVal* result = Evaluate(eval, row);
  return *reinterpret_cast<FloatVal*>(result);
}
// Interpreted entry point for DOUBLE-typed Hive UDFs: runs the shared
// Evaluate() path and copies the result out as a DoubleVal.
DoubleVal HiveUdfCall::GetDoubleValInterpreted(
    ScalarExprEvaluator* eval, const TupleRow* row) const {
  DCHECK_EQ(type_.type, TYPE_DOUBLE);
  AnyVal* result = Evaluate(eval, row);
  return *reinterpret_cast<DoubleVal*>(result);
}
// Interpreted entry point for STRING-typed Hive UDFs. Unlike the fixed-size
// types, the string payload must be copied out of the Java-owned buffer.
StringVal HiveUdfCall::GetStringValInterpreted(
    ScalarExprEvaluator* eval, const TupleRow* row) const {
  DCHECK_EQ(type_.type, TYPE_STRING);
  const StringVal& val = *reinterpret_cast<StringVal*>(Evaluate(eval, row));
  if (val.is_null) return StringVal::null();
  // Copy the string into a result allocation with the usual lifetime for expr
  // results. Needed because the UDF output buffer is owned by the Java UDF
  // executor and may be freed or reused by the next call into the Java UDF
  // executor.
  FunctionContext* fn_ctx = eval->fn_context(fn_ctx_idx_);
  return StringVal::CopyFrom(fn_ctx, val.ptr, val.len);
}
// Interpreted entry point for TIMESTAMP-typed Hive UDFs: runs the shared
// Evaluate() path and copies the result out as a TimestampVal.
TimestampVal HiveUdfCall::GetTimestampValInterpreted(
    ScalarExprEvaluator* eval, const TupleRow* row) const {
  DCHECK_EQ(type_.type, TYPE_TIMESTAMP);
  AnyVal* result = Evaluate(eval, row);
  return *reinterpret_cast<TimestampVal*>(result);
}
// Interpreted entry point for DECIMAL-typed Hive UDFs: runs the shared
// Evaluate() path and copies the result out as a DecimalVal.
DecimalVal HiveUdfCall::GetDecimalValInterpreted(
    ScalarExprEvaluator* eval, const TupleRow* row) const {
  DCHECK_EQ(type_.type, TYPE_DECIMAL);
  AnyVal* result = Evaluate(eval, row);
  return *reinterpret_cast<DecimalVal*>(result);
}
// Interpreted entry point for DATE-typed Hive UDFs: runs the shared Evaluate()
// path and copies the result out as a DateVal.
DateVal HiveUdfCall::GetDateValInterpreted(
    ScalarExprEvaluator* eval, const TupleRow* row) const {
  DCHECK_EQ(type_.type, TYPE_DATE);
  AnyVal* result = Evaluate(eval, row);
  return *reinterpret_cast<DateVal*>(result);
}
}