| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exprs/scalar-fn-call.h" |
| |
| #include <vector> |
| #include <gutil/strings/substitute.h> |
| #include <llvm/IR/Attributes.h> |
| #include <llvm/ExecutionEngine/ExecutionEngine.h> |
| |
| #include <boost/preprocessor/punctuation/comma_if.hpp> |
| #include <boost/preprocessor/repetition/repeat.hpp> |
| #include <boost/preprocessor/repetition/enum_params.hpp> |
| #include <boost/preprocessor/repetition/repeat_from_to.hpp> |
| |
| #include "codegen/codegen-anyval.h" |
| #include "codegen/llvm-codegen.h" |
| #include "exprs/anyval-util.h" |
| #include "exprs/expr-context.h" |
| #include "runtime/hdfs-fs-cache.h" |
| #include "runtime/lib-cache.h" |
| #include "runtime/runtime-state.h" |
| #include "runtime/types.h" |
| #include "udf/udf-internal.h" |
| #include "util/debug-util.h" |
| #include "util/dynamic-util.h" |
| #include "util/symbols-util.h" |
| |
| #include "common/names.h" |
| |
| using namespace impala; |
| using namespace impala_udf; |
| using namespace strings; |
| using llvm::ArrayType; |
| using llvm::BasicBlock; |
| using llvm::Function; |
| using llvm::GlobalVariable; |
| using llvm::PointerType; |
| using llvm::Type; |
| using llvm::Value; |
| using std::move; |
| using std::pair; |
| |
// Upper bound on the number of fixed arguments that the interpreted (non-codegen'd)
// evaluation path can pass to a function. This must stay a plain integer literal macro
// because it is consumed by the preprocessor (BOOST_PP_ADD) in InterpretEval().
#define MAX_INTERP_ARGS 20
| |
| ScalarFnCall::ScalarFnCall(const TExprNode& node) |
| : Expr(node), |
| vararg_start_idx_(node.__isset.vararg_start_idx ? node.vararg_start_idx : -1), |
| scalar_fn_wrapper_(NULL), |
| prepare_fn_(NULL), |
| close_fn_(NULL), |
| scalar_fn_(NULL) { |
| DCHECK_NE(fn_.binary_type, TFunctionBinaryType::JAVA); |
| } |
| |
| Status ScalarFnCall::LoadPrepareAndCloseFn(LlvmCodeGen* codegen) { |
| if (fn_.scalar_fn.__isset.prepare_fn_symbol) { |
| RETURN_IF_ERROR(GetFunction(codegen, fn_.scalar_fn.prepare_fn_symbol, |
| reinterpret_cast<void**>(&prepare_fn_))); |
| } |
| if (fn_.scalar_fn.__isset.close_fn_symbol) { |
| RETURN_IF_ERROR(GetFunction(codegen, fn_.scalar_fn.close_fn_symbol, |
| reinterpret_cast<void**>(&close_fn_))); |
| } |
| return Status::OK(); |
| } |
| |
| Status ScalarFnCall::Prepare(RuntimeState* state, const RowDescriptor& desc, |
| ExprContext* context) { |
| RETURN_IF_ERROR(Expr::Prepare(state, desc, context)); |
| |
| if (fn_.scalar_fn.symbol.empty()) { |
| // This path is intended to only be used during development to test FE |
| // code before the BE has implemented the function. |
| // Having the failure in the BE (rather than during analysis) allows for |
| // better FE testing. |
| DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::BUILTIN); |
| stringstream ss; |
| ss << "Function " << fn_.name.function_name << " is not implemented."; |
| return Status(ss.str()); |
| } |
| |
| // Check if the function takes CHAR as input or returns CHAR. |
| FunctionContext::TypeDesc return_type = AnyValUtil::ColumnTypeToTypeDesc(type_); |
| vector<FunctionContext::TypeDesc> arg_types; |
| bool has_char_arg_or_result = type_.type == TYPE_CHAR; |
| for (int i = 0; i < children_.size(); ++i) { |
| arg_types.push_back(AnyValUtil::ColumnTypeToTypeDesc(children_[i]->type_)); |
| has_char_arg_or_result |= children_[i]->type_.type == TYPE_CHAR; |
| } |
| |
| fn_context_index_ = |
| context->Register(state, return_type, arg_types, ComputeVarArgsBufferSize()); |
| |
| // Use the interpreted path and call the builtin without codegen if any of the |
| // followings is true: |
| // 1. codegen is disabled by query option |
| // 2. there are char arguments (as they aren't supported yet) |
| // 3. there is an optimization hint to disable codegen and UDF can be interpreted. |
| // IR UDF or UDF with more than MAX_INTERP_ARGS number of fixed arguments |
| // cannot be interpreted. |
| // |
| // TODO: codegen for char arguments |
| bool is_ir_udf = fn_.binary_type == TFunctionBinaryType::IR; |
| bool too_many_args_to_interp = NumFixedArgs() > MAX_INTERP_ARGS; |
| bool udf_interpretable = !is_ir_udf && !too_many_args_to_interp; |
| if (state->CodegenDisabledByQueryOption() || has_char_arg_or_result || |
| (state->CodegenHasDisableHint() && udf_interpretable)) { |
| if (is_ir_udf) { |
| // CHAR or VARCHAR are not supported as input arguments or return values for UDFs. |
| DCHECK(!has_char_arg_or_result && state->CodegenDisabledByQueryOption()); |
| return Status(Substitute("Cannot interpret LLVM IR UDF '$0': Codegen is needed. " |
| "Please set DISABLE_CODEGEN to false.", fn_.name.function_name)); |
| } |
| |
| // The templates for builtin or native UDFs used in the interpretation path |
| // support up to MAX_INTERP_ARGS number of arguments only. |
| if (too_many_args_to_interp) { |
| DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::NATIVE); |
| // CHAR or VARCHAR are not supported as input arguments or return values for UDFs. |
| DCHECK(!has_char_arg_or_result && state->CodegenDisabledByQueryOption()); |
| return Status(Substitute("Cannot interpret native UDF '$0': number of arguments is " |
| "more than $1. Codegen is needed. Please set DISABLE_CODEGEN to false.", |
| fn_.name.function_name, MAX_INTERP_ARGS)); |
| } |
| |
| Status status = LibCache::instance()->GetSoFunctionPtr( |
| fn_.hdfs_location, fn_.scalar_fn.symbol, &scalar_fn_, &cache_entry_); |
| if (!status.ok()) { |
| if (fn_.binary_type == TFunctionBinaryType::BUILTIN) { |
| // Builtins symbols should exist unless there is a version mismatch. |
| return Status(TErrorCode::MISSING_BUILTIN, fn_.name.function_name, |
| fn_.scalar_fn.symbol); |
| } else { |
| DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::NATIVE); |
| return Status(Substitute("Problem loading UDF '$0':\n$1", |
| fn_.name.function_name, status.GetDetail())); |
| } |
| } |
| } else { |
| // Add the expression to the list of expressions to codegen in the codegen phase. |
| state->AddScalarFnToCodegen(this); |
| } |
| |
| // For IR UDF, the loading of the Prepare() and Close() functions is deferred until |
| // first time GetCodegendComputeFn() is invoked. |
| if (!is_ir_udf) RETURN_IF_ERROR(LoadPrepareAndCloseFn(NULL)); |
| return Status::OK(); |
| } |
| |
| Status ScalarFnCall::Open(RuntimeState* state, ExprContext* ctx, |
| FunctionContext::FunctionStateScope scope) { |
| // Opens and inits children |
| RETURN_IF_ERROR(Expr::Open(state, ctx, scope)); |
| FunctionContext* fn_ctx = ctx->fn_context(fn_context_index_); |
| |
| if (scalar_fn_wrapper_ == NULL) { |
| // We're in the interpreted path (i.e. no JIT). Populate our FunctionContext's |
| // staging_input_vals, which will be reused across calls to scalar_fn_. |
| DCHECK(scalar_fn_ != NULL); |
| vector<AnyVal*>* input_vals = fn_ctx->impl()->staging_input_vals(); |
| for (int i = 0; i < NumFixedArgs(); ++i) { |
| AnyVal* input_val; |
| RETURN_IF_ERROR(AllocateAnyVal(state, ctx->pool_.get(), children_[i]->type(), |
| "Could not allocate expression value", &input_val)); |
| input_vals->push_back(input_val); |
| } |
| } |
| |
| // Only evaluate constant arguments at the top level of function contexts. |
| // If 'ctx' was cloned, the constant values were copied from the parent. |
| if (scope == FunctionContext::FRAGMENT_LOCAL) { |
| vector<AnyVal*> constant_args; |
| for (int i = 0; i < children_.size(); ++i) { |
| AnyVal* const_val; |
| RETURN_IF_ERROR(children_[i]->GetConstVal(state, ctx, &const_val)); |
| constant_args.push_back(const_val); |
| } |
| fn_ctx->impl()->SetConstantArgs(move(constant_args)); |
| } |
| |
| if (scalar_fn_wrapper_ == NULL) { |
| // Now we have the constant values, cache them so that the interpreted path can |
| // call the UDF without reevaluating the arguments. 'staging_input_vals' and |
| // 'varargs_buffer' in the FunctionContext are used to pass fixed and variable-length |
| // arguments respectively. 'non_constant_args()' in the FunctionContext will contain |
| // pointers to the remaining (non-constant) children that are evaluated for every row. |
| vector<pair<Expr*, AnyVal*>> non_constant_args; |
| uint8_t* varargs_buffer = fn_ctx->impl()->varargs_buffer(); |
| for (int i = 0; i < children_.size(); ++i) { |
| AnyVal* input_arg; |
| int arg_bytes = AnyValUtil::AnyValSize(children_[i]->type()); |
| if (i < NumFixedArgs()) { |
| input_arg = (*fn_ctx->impl()->staging_input_vals())[i]; |
| } else { |
| input_arg = reinterpret_cast<AnyVal*>(varargs_buffer); |
| varargs_buffer += arg_bytes; |
| } |
| // IMPALA-4586: Cache constant arguments only if the frontend has rewritten them |
| // into literal expressions. This gives the frontend control over how expressions |
| // are evaluated. This means that setting enable_expr_rewrites=false will also |
| // disable caching of non-literal constant expressions, which gives the old |
| // behaviour (before this caching optimisation was added) of repeatedly evaluating |
| // exprs that are constant according to IsConstant(). For exprs that are not truly |
| // constant (yet IsConstant() returns true for) e.g. non-deterministic UDFs, this |
| // means that setting enable_expr_rewrites=false works as a safety valve to get |
| // back the old behaviour, before constant expr folding or caching was added. |
| // TODO: once we can annotate UDFs as non-deterministic (IMPALA-4606), we should |
| // be able to trust IsConstant() and switch back to that. |
| if (children_[i]->IsLiteral()) { |
| const AnyVal* constant_arg = fn_ctx->impl()->constant_args()[i]; |
| DCHECK(constant_arg != NULL); |
| memcpy(input_arg, constant_arg, arg_bytes); |
| } else { |
| non_constant_args.emplace_back(children_[i], input_arg); |
| } |
| } |
| fn_ctx->impl()->SetNonConstantArgs(move(non_constant_args)); |
| } |
| |
| if (prepare_fn_ != NULL) { |
| if (scope == FunctionContext::FRAGMENT_LOCAL) { |
| prepare_fn_(fn_ctx, FunctionContext::FRAGMENT_LOCAL); |
| if (fn_ctx->has_error()) return Status(fn_ctx->error_msg()); |
| } |
| prepare_fn_(fn_ctx, FunctionContext::THREAD_LOCAL); |
| if (fn_ctx->has_error()) return Status(fn_ctx->error_msg()); |
| } |
| |
| // If we're calling MathFunctions::RoundUpTo(), we need to set output_scale_, which |
| // determines how many decimal places are printed. |
| // TODO: revisit this. We should be able to do this if the scale argument is |
| // non-constant. |
| if (fn_.name.function_name == "round" && type_.type == TYPE_DOUBLE) { |
| DCHECK_EQ(children_.size(), 2); |
| if (children_[1]->IsConstant()) { |
| IntVal scale_arg = children_[1]->GetIntVal(ctx, NULL); |
| output_scale_ = scale_arg.val; |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| void ScalarFnCall::Close(RuntimeState* state, ExprContext* context, |
| FunctionContext::FunctionStateScope scope) { |
| if (fn_context_index_ != -1 && close_fn_ != NULL) { |
| FunctionContext* fn_ctx = context->fn_context(fn_context_index_); |
| close_fn_(fn_ctx, FunctionContext::THREAD_LOCAL); |
| if (scope == FunctionContext::FRAGMENT_LOCAL) { |
| close_fn_(fn_ctx, FunctionContext::FRAGMENT_LOCAL); |
| } |
| } |
| Expr::Close(state, context, scope); |
| } |
| |
| bool ScalarFnCall::IsConstant() const { |
| if (fn_.name.function_name == "rand" || fn_.name.function_name == "random" |
| || fn_.name.function_name == "uuid" || fn_.name.function_name == "sleep") { |
| return false; |
| } |
| return Expr::IsConstant(); |
| } |
| |
| // Dynamically loads the pre-compiled UDF and codegens a function that calls each child's |
| // codegen'd function, then passes those values to the UDF and returns the result. |
| // Example generated IR for a UDF with signature |
| // create function Udf(double, int...) returns double |
| // select Udf(1.0, 2, 3, 4, 5) |
| // define { i8, double } @UdfWrapper(i8* %context, %"class.impala::TupleRow"* %row) { |
| // entry: |
| // %arg_val = call { i8, double } |
| // @ExprWrapper(i8* %context, %"class.impala::TupleRow"* %row) |
| // %arg_ptr = alloca { i8, double } |
| // store { i8, double } %arg_val, { i8, double }* %arg_ptr |
| // %arg_val1 = call i64 @ExprWrapper1(i8* %context, %"class.impala::TupleRow"* %row) |
| // store i64 %arg_val1, i64* inttoptr (i64 89111072 to i64*) |
| // %arg_val2 = call i64 @ExprWrapper2(i8* %context, %"class.impala::TupleRow"* %row) |
| // store i64 %arg_val2, i64* inttoptr (i64 89111080 to i64*) |
| // %arg_val3 = call i64 @ExprWrapper3(i8* %context, %"class.impala::TupleRow"* %row) |
| // store i64 %arg_val3, i64* inttoptr (i64 89111088 to i64*) |
| // %arg_val4 = call i64 @ExprWrapper4(i8* %context, %"class.impala::TupleRow"* %row) |
| // store i64 %arg_val4, i64* inttoptr (i64 89111096 to i64*) |
| // %result = call { i8, double } |
| // @_Z14VarSumMultiplyPN10impala_udf15FunctionContextERKNS_9DoubleValEiPKNS_6IntValE( |
| // %"class.impala_udf::FunctionContext"* inttoptr |
| // (i64 37522464 to %"class.impala_udf::FunctionContext"*), |
| // {i8, double }* %arg_ptr, |
| // i32 4, |
| // i64* inttoptr (i64 89111072 to i64*)) |
| // ret { i8, double } %result |
| Status ScalarFnCall::GetCodegendComputeFn(LlvmCodeGen* codegen, Function** fn) { |
| if (ir_compute_fn_ != NULL) { |
| *fn = ir_compute_fn_; |
| return Status::OK(); |
| } |
| if (type_.type == TYPE_CHAR) { |
| return Status("ScalarFnCall Codegen not supported for CHAR"); |
| } |
| for (int i = 0; i < GetNumChildren(); ++i) { |
| if (children_[i]->type().type == TYPE_CHAR) { |
| *fn = NULL; |
| return Status("ScalarFnCall Codegen not supported for CHAR"); |
| } |
| } |
| |
| if (fn_.binary_type == TFunctionBinaryType::IR) { |
| string local_path; |
| RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath( |
| fn_.hdfs_location, LibCache::TYPE_IR, &local_path)); |
| // Link the UDF module into this query's main module (essentially copy the UDF |
| // module into the main module) so the UDF's functions are available in the main |
| // module. |
| RETURN_IF_ERROR(codegen->LinkModule(local_path)); |
| // Load the Prepare() and Close() functions from the LLVM module. |
| RETURN_IF_ERROR(LoadPrepareAndCloseFn(codegen)); |
| } |
| |
| Function* udf; |
| RETURN_IF_ERROR(GetUdf(codegen, &udf)); |
| |
| // Create wrapper that computes args and calls UDF |
| stringstream fn_name; |
| fn_name << udf->getName().str() << "Wrapper"; |
| |
| Value* args[2]; |
| *fn = CreateIrFunctionPrototype(codegen, fn_name.str(), &args); |
| Value* expr_ctx = args[0]; |
| Value* row = args[1]; |
| BasicBlock* block = BasicBlock::Create(codegen->context(), "entry", *fn); |
| LlvmBuilder builder(block); |
| |
| // Populate UDF arguments |
| vector<Value*> udf_args; |
| |
| // First argument is always FunctionContext*. |
| // Index into our registered offset in the ExprContext. |
| Value* expr_ctx_gep = builder.CreateStructGEP(NULL, expr_ctx, 1, "expr_ctx_gep"); |
| Value* fn_ctxs_base = builder.CreateLoad(expr_ctx_gep, "fn_ctxs_base"); |
| // Use GEP to add our index to the base pointer |
| Value* fn_ctx_ptr = |
| builder.CreateConstGEP1_32(fn_ctxs_base, fn_context_index_, "fn_ctx_ptr"); |
| Value* fn_ctx = builder.CreateLoad(fn_ctx_ptr, "fn_ctx"); |
| udf_args.push_back(fn_ctx); |
| |
| // Allocate a varargs array. The array's entry type is the appropriate AnyVal subclass. |
| // E.g. if the vararg type is STRING, and the function is called with 10 arguments, we |
| // allocate a StringVal[10] array. We allocate the buffer with Alloca so that LLVM can |
| // optimise out the buffer once the function call is inlined. |
| Value* varargs_buffer = NULL; |
| if (vararg_start_idx_ != -1) { |
| Type* unlowered_varargs_type = |
| CodegenAnyVal::GetUnloweredType(codegen, VarArgsType()); |
| varargs_buffer = codegen->CreateEntryBlockAlloca(builder, unlowered_varargs_type, |
| NumVarArgs(), FunctionContextImpl::VARARGS_BUFFER_ALIGNMENT, "varargs_buffer"); |
| } |
| |
| // Call children to populate remaining arguments |
| for (int i = 0; i < GetNumChildren(); ++i) { |
| Function* child_fn = NULL; |
| vector<Value*> child_fn_args; |
| // Set 'child_fn' to the codegen'd function, sets child_fn = NULL if codegen fails |
| children_[i]->GetCodegendComputeFn(codegen, &child_fn); |
| if (child_fn == NULL) { |
| // Set 'child_fn' to the interpreted function |
| child_fn = GetStaticGetValWrapper(children_[i]->type(), codegen); |
| // First argument to interpreted function is children_[i] |
| Type* expr_ptr_type = codegen->GetPtrType(Expr::LLVM_CLASS_NAME); |
| child_fn_args.push_back(codegen->CastPtrToLlvmPtr(expr_ptr_type, children_[i])); |
| } |
| child_fn_args.push_back(expr_ctx); |
| child_fn_args.push_back(row); |
| |
| // Call 'child_fn', adding the result to either 'udf_args' or 'varargs_buffer' |
| DCHECK(child_fn != NULL); |
| Type* arg_type = CodegenAnyVal::GetUnloweredType(codegen, children_[i]->type()); |
| Value* arg_val_ptr; |
| if (i < NumFixedArgs()) { |
| // Allocate space to store 'child_fn's result so we can pass the pointer to the UDF. |
| arg_val_ptr = codegen->CreateEntryBlockAlloca(builder, arg_type, "arg_val_ptr"); |
| udf_args.push_back(arg_val_ptr); |
| } else { |
| // Store the result of 'child_fn' in varargs_buffer[i]. |
| arg_val_ptr = |
| builder.CreateConstGEP1_32(varargs_buffer, i - NumFixedArgs(), "arg_val_ptr"); |
| } |
| DCHECK_EQ(arg_val_ptr->getType(), arg_type->getPointerTo()); |
| // The result of the call must be stored in a lowered AnyVal |
| Value* lowered_arg_val_ptr = builder.CreateBitCast(arg_val_ptr, |
| CodegenAnyVal::GetLoweredPtrType(codegen, children_[i]->type()), |
| "lowered_arg_val_ptr"); |
| CodegenAnyVal::CreateCall( |
| codegen, &builder, child_fn, child_fn_args, "arg_val", lowered_arg_val_ptr); |
| } |
| |
| if (vararg_start_idx_ != -1) { |
| // We've added the FunctionContext argument plus any non-variadic arguments |
| DCHECK_EQ(udf_args.size(), vararg_start_idx_ + 1); |
| DCHECK_GE(GetNumChildren(), 1); |
| // Add the number of varargs |
| udf_args.push_back(codegen->GetIntConstant(TYPE_INT, NumVarArgs())); |
| // Add all the accumulated vararg inputs as one input argument. |
| PointerType* vararg_type = |
| codegen->GetPtrType(CodegenAnyVal::GetUnloweredType(codegen, VarArgsType())); |
| udf_args.push_back(builder.CreateBitCast(varargs_buffer, vararg_type, "varargs")); |
| } |
| |
| // Call UDF |
| Value* result_val = |
| CodegenAnyVal::CreateCall(codegen, &builder, udf, udf_args, "result"); |
| builder.CreateRet(result_val); |
| |
| *fn = codegen->FinalizeFunction(*fn); |
| if (*fn == NULL) { |
| return Status( |
| TErrorCode::UDF_VERIFY_FAILED, fn_.scalar_fn.symbol, fn_.hdfs_location); |
| } |
| ir_compute_fn_ = *fn; |
| // TODO: don't do this for child exprs |
| codegen->AddFunctionToJit(ir_compute_fn_, &scalar_fn_wrapper_); |
| return Status::OK(); |
| } |
| |
| Status ScalarFnCall::GetUdf(LlvmCodeGen* codegen, Function** udf) { |
| // from_utc_timestamp() and to_utc_timestamp() have inline ASM that cannot be JIT'd. |
| // TimestampFunctions::AddSub() contains a try/catch which doesn't work in JIT'd |
| // code. Always use the interpreted version of these functions. |
| // TODO: fix these built-in functions so we don't need 'broken_builtin' below. |
| bool broken_builtin = fn_.name.function_name == "from_utc_timestamp" || |
| fn_.name.function_name == "to_utc_timestamp" || |
| fn_.scalar_fn.symbol.find("AddSub") != string::npos; |
| if (fn_.binary_type == TFunctionBinaryType::NATIVE || |
| (fn_.binary_type == TFunctionBinaryType::BUILTIN && broken_builtin)) { |
| // In this path, we are code that has been statically compiled to assembly. |
| // This can either be a UDF implemented in a .so or a builtin using the UDF |
| // interface with the code in impalad. |
| void* fn_ptr; |
| Status status = LibCache::instance()->GetSoFunctionPtr( |
| fn_.hdfs_location, fn_.scalar_fn.symbol, &fn_ptr, &cache_entry_); |
| if (!status.ok() && fn_.binary_type == TFunctionBinaryType::BUILTIN) { |
| // Builtins symbols should exist unless there is a version mismatch. |
| status.AddDetail(ErrorMsg(TErrorCode::MISSING_BUILTIN, |
| fn_.name.function_name, fn_.scalar_fn.symbol).msg()); |
| } |
| RETURN_IF_ERROR(status); |
| DCHECK(fn_ptr != NULL); |
| |
| // Per the x64 ABI, DecimalVals are returned via a DecmialVal* output argument. |
| // So, the return type is void. |
| bool is_decimal = type().type == TYPE_DECIMAL; |
| Type* return_type = is_decimal ? codegen->void_type() : |
| CodegenAnyVal::GetLoweredType(codegen, type()); |
| |
| // Convert UDF function pointer to Function*. Start by creating a function |
| // prototype for it. |
| LlvmCodeGen::FnPrototype prototype(codegen, fn_.scalar_fn.symbol, return_type); |
| |
| if (is_decimal) { |
| // Per the x64 ABI, DecimalVals are returned via a DecmialVal* output argument |
| Type* output_type = |
| codegen->GetPtrType(CodegenAnyVal::GetUnloweredType(codegen, type())); |
| prototype.AddArgument("output", output_type); |
| } |
| |
| // The "FunctionContext*" argument. |
| prototype.AddArgument("ctx", |
| codegen->GetPtrType("class.impala_udf::FunctionContext")); |
| |
| // The "fixed" arguments for the UDF function. |
| for (int i = 0; i < NumFixedArgs(); ++i) { |
| stringstream arg_name; |
| arg_name << "fixed_arg_" << i; |
| Type* arg_type = codegen->GetPtrType( |
| CodegenAnyVal::GetUnloweredType(codegen, children_[i]->type())); |
| prototype.AddArgument(arg_name.str(), arg_type); |
| } |
| // The varargs for the UDF function if there is any. |
| if (NumVarArgs() > 0) { |
| Type* vararg_type = CodegenAnyVal::GetUnloweredPtrType( |
| codegen, children_[vararg_start_idx_]->type()); |
| prototype.AddArgument("num_var_arg", codegen->GetType(TYPE_INT)); |
| prototype.AddArgument("var_arg", vararg_type); |
| } |
| |
| // Create a Function* with the generated type. This is only a function |
| // declaration, not a definition, since we do not create any basic blocks or |
| // instructions in it. |
| *udf = prototype.GeneratePrototype(NULL, NULL, false); |
| |
| // Associate the dynamically loaded function pointer with the Function* we defined. |
| // This tells LLVM where the compiled function definition is located in memory. |
| codegen->execution_engine()->addGlobalMapping(*udf, fn_ptr); |
| } else if (fn_.binary_type == TFunctionBinaryType::BUILTIN) { |
| // In this path, we're running a builtin with the UDF interface. The IR is |
| // in the llvm module. |
| *udf = codegen->GetFunction(fn_.scalar_fn.symbol, false); |
| if (*udf == NULL) { |
| // Builtins symbols should exist unless there is a version mismatch. |
| stringstream ss; |
| ss << "Builtin '" << fn_.name.function_name << "' with symbol '" |
| << fn_.scalar_fn.symbol << "' does not exist. " |
| << "Verify that all your impalads are the same version."; |
| return Status(ss.str()); |
| } |
| // Builtin functions may use Expr::GetConstant(). Clone the function in case we need |
| // to use it again, and rename it to something more manageable than the mangled name. |
| string demangled_name = SymbolsUtil::DemangleNoArgs((*udf)->getName().str()); |
| *udf = codegen->CloneFunction(*udf); |
| (*udf)->setName(demangled_name); |
| InlineConstants(codegen, *udf); |
| *udf = codegen->FinalizeFunction(*udf); |
| DCHECK(*udf != NULL); |
| } else { |
| // We're running an IR UDF. |
| DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::IR); |
| *udf = codegen->GetFunction(fn_.scalar_fn.symbol, false); |
| if (*udf == NULL) { |
| stringstream ss; |
| ss << "Unable to locate function " << fn_.scalar_fn.symbol << " from LLVM module " |
| << fn_.hdfs_location; |
| return Status(ss.str()); |
| } |
| *udf = codegen->FinalizeFunction(*udf); |
| if (*udf == NULL) { |
| return Status( |
| TErrorCode::UDF_VERIFY_FAILED, fn_.scalar_fn.symbol, fn_.hdfs_location); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status ScalarFnCall::GetFunction(LlvmCodeGen* codegen, const string& symbol, void** fn) { |
| if (fn_.binary_type == TFunctionBinaryType::NATIVE || |
| fn_.binary_type == TFunctionBinaryType::BUILTIN) { |
| return LibCache::instance()->GetSoFunctionPtr(fn_.hdfs_location, symbol, fn, |
| &cache_entry_); |
| } else { |
| DCHECK_EQ(fn_.binary_type, TFunctionBinaryType::IR); |
| DCHECK(codegen != NULL); |
| Function* ir_fn = codegen->GetFunction(symbol, false); |
| if (ir_fn == NULL) { |
| stringstream ss; |
| ss << "Unable to locate function " << symbol << " from LLVM module " |
| << fn_.hdfs_location; |
| return Status(ss.str()); |
| } |
| codegen->AddFunctionToJit(ir_fn, fn); |
| return Status::OK(); |
| } |
| } |
| |
| void ScalarFnCall::EvaluateNonConstantChildren( |
| ExprContext* context, const TupleRow* row) { |
| FunctionContext* fn_ctx = context->fn_context(fn_context_index_); |
| for (pair<Expr*, AnyVal*> child : fn_ctx->impl()->non_constant_args()) { |
| void* val = context->GetValue(child.first, row); |
| AnyValUtil::SetAnyVal(val, child.first->type(), child.second); |
| } |
| } |
| |
| template<typename RETURN_TYPE> |
| RETURN_TYPE ScalarFnCall::InterpretEval(ExprContext* context, const TupleRow* row) { |
| DCHECK(scalar_fn_ != NULL) << DebugString(); |
| FunctionContext* fn_ctx = context->fn_context(fn_context_index_); |
| vector<AnyVal*>* input_vals = fn_ctx->impl()->staging_input_vals(); |
| EvaluateNonConstantChildren(context, row); |
| |
| if (vararg_start_idx_ == -1) { |
| switch (children_.size()) { |
| #define ARG_DECL_ONE(z, n, data) BOOST_PP_COMMA_IF(n) const AnyVal& |
| #define ARG_DECL_LIST(n) \ |
| FunctionContext* BOOST_PP_COMMA_IF(n) BOOST_PP_REPEAT(n, ARG_DECL_ONE, unused) |
| #define ARG_ONE(z, n, data) BOOST_PP_COMMA_IF(n) *(*input_vals)[n] |
| #define ARG_LIST(n) fn_ctx BOOST_PP_COMMA_IF(n) BOOST_PP_REPEAT(n, ARG_ONE, unused) |
| |
| // Expands to code snippet like the following for X from 0 to 20: |
| // case X: |
| // typedef RETURN_TYPE (*ScalarFnX)(FunctionContext*, const AnyVal& a1, ..., |
| // const AnyVal& aX); |
| // return reinterpret_cast<ScalarFnn>(scalar_fn_)(fn_ctx, *(*input_vals)[0], ..., |
| // *(*input_vals)[X-1]); |
| #define SCALAR_FN_TYPE(n) BOOST_PP_CAT(ScalarFn, n) |
| #define INTERP_SCALAR_FN(z, n, unused) \ |
| case n: \ |
| typedef RETURN_TYPE (*SCALAR_FN_TYPE(n))(ARG_DECL_LIST(n)); \ |
| return reinterpret_cast<SCALAR_FN_TYPE(n)>(scalar_fn_)(ARG_LIST(n)); |
| |
| // Support up to MAX_INTERP_ARGS arguments in the interpretation path |
| BOOST_PP_REPEAT_FROM_TO(0, BOOST_PP_ADD(MAX_INTERP_ARGS, 1), |
| INTERP_SCALAR_FN, unused) |
| |
| default: |
| DCHECK(false) << "Interpreted path not implemented."; |
| } |
| } else { |
| int num_varargs = children_.size() - NumFixedArgs(); |
| const AnyVal* varargs = reinterpret_cast<AnyVal*>(fn_ctx->impl()->varargs_buffer()); |
| switch (NumFixedArgs()) { |
| |
| // Expands to code snippet like the following for X from 0 to 20: |
| // case X: |
| // typedef RETURN_TYPE (*VarargFnX)(FunctionContext*, const AnyVal& a1, ..., |
| // const AnyVal& aX, int num_varargs, const AnyVal* varargs); |
| // return reinterpret_cast<VarargFnX>(scalar_fn_)(fn_ctx, *(*input_vals)[0], ..., |
| // *(*input_vals)[X-1], num_varargs, varargs); |
| #define SCALAR_VARARG_FN_TYPE(n) BOOST_PP_CAT(VarargFn, n) |
| #define INTERP_SCALAR_VARARG_FN(z, n, text) \ |
| case n: \ |
| typedef RETURN_TYPE (*SCALAR_VARARG_FN_TYPE(n))(ARG_DECL_LIST(n), int, \ |
| const AnyVal*); \ |
| return reinterpret_cast<SCALAR_VARARG_FN_TYPE(n)>(scalar_fn_)(ARG_LIST(n), \ |
| num_varargs, varargs); |
| |
| BOOST_PP_REPEAT_FROM_TO(0, BOOST_PP_ADD(MAX_INTERP_ARGS, 1), |
| INTERP_SCALAR_VARARG_FN, unused) |
| |
| default: |
| DCHECK(false) << "Interpreted path not implemented."; |
| } |
| } |
| return RETURN_TYPE::null(); |
| } |
| |
| typedef BooleanVal (*BooleanWrapper)(ExprContext*, const TupleRow*); |
| typedef TinyIntVal (*TinyIntWrapper)(ExprContext*, const TupleRow*); |
| typedef SmallIntVal (*SmallIntWrapper)(ExprContext*, const TupleRow*); |
| typedef IntVal (*IntWrapper)(ExprContext*, const TupleRow*); |
| typedef BigIntVal (*BigIntWrapper)(ExprContext*, const TupleRow*); |
| typedef FloatVal (*FloatWrapper)(ExprContext*, const TupleRow*); |
| typedef DoubleVal (*DoubleWrapper)(ExprContext*, const TupleRow*); |
| typedef StringVal (*StringWrapper)(ExprContext*, const TupleRow*); |
| typedef TimestampVal (*TimestampWrapper)(ExprContext*, const TupleRow*); |
| typedef DecimalVal (*DecimalWrapper)(ExprContext*, const TupleRow*); |
| |
| // TODO: macroify this? |
| BooleanVal ScalarFnCall::GetBooleanVal(ExprContext* context, const TupleRow* row) { |
| DCHECK_EQ(type_.type, TYPE_BOOLEAN); |
| DCHECK(context != NULL); |
| if (scalar_fn_wrapper_ == NULL) return InterpretEval<BooleanVal>(context, row); |
| BooleanWrapper fn = reinterpret_cast<BooleanWrapper>(scalar_fn_wrapper_); |
| return fn(context, row); |
| } |
| |
| TinyIntVal ScalarFnCall::GetTinyIntVal(ExprContext* context, const TupleRow* row) { |
| DCHECK_EQ(type_.type, TYPE_TINYINT); |
| DCHECK(context != NULL); |
| if (scalar_fn_wrapper_ == NULL) return InterpretEval<TinyIntVal>(context, row); |
| TinyIntWrapper fn = reinterpret_cast<TinyIntWrapper>(scalar_fn_wrapper_); |
| return fn(context, row); |
| } |
| |
| SmallIntVal ScalarFnCall::GetSmallIntVal(ExprContext* context, const TupleRow* row) { |
| DCHECK_EQ(type_.type, TYPE_SMALLINT); |
| DCHECK(context != NULL); |
| if (scalar_fn_wrapper_ == NULL) return InterpretEval<SmallIntVal>(context, row); |
| SmallIntWrapper fn = reinterpret_cast<SmallIntWrapper>(scalar_fn_wrapper_); |
| return fn(context, row); |
| } |
| |
| IntVal ScalarFnCall::GetIntVal(ExprContext* context, const TupleRow* row) { |
| DCHECK_EQ(type_.type, TYPE_INT); |
| DCHECK(context != NULL); |
| if (scalar_fn_wrapper_ == NULL) return InterpretEval<IntVal>(context, row); |
| IntWrapper fn = reinterpret_cast<IntWrapper>(scalar_fn_wrapper_); |
| return fn(context, row); |
| } |
| |
| BigIntVal ScalarFnCall::GetBigIntVal(ExprContext* context, const TupleRow* row) { |
| DCHECK_EQ(type_.type, TYPE_BIGINT); |
| DCHECK(context != NULL); |
| if (scalar_fn_wrapper_ == NULL) return InterpretEval<BigIntVal>(context, row); |
| BigIntWrapper fn = reinterpret_cast<BigIntWrapper>(scalar_fn_wrapper_); |
| return fn(context, row); |
| } |
| |
| FloatVal ScalarFnCall::GetFloatVal(ExprContext* context, const TupleRow* row) { |
| DCHECK_EQ(type_.type, TYPE_FLOAT); |
| DCHECK(context != NULL); |
| if (scalar_fn_wrapper_ == NULL) return InterpretEval<FloatVal>(context, row); |
| FloatWrapper fn = reinterpret_cast<FloatWrapper>(scalar_fn_wrapper_); |
| return fn(context, row); |
| } |
| |
| DoubleVal ScalarFnCall::GetDoubleVal(ExprContext* context, const TupleRow* row) { |
| DCHECK_EQ(type_.type, TYPE_DOUBLE); |
| DCHECK(context != NULL); |
| if (scalar_fn_wrapper_ == NULL) return InterpretEval<DoubleVal>(context, row); |
| DoubleWrapper fn = reinterpret_cast<DoubleWrapper>(scalar_fn_wrapper_); |
| return fn(context, row); |
| } |
| |
| StringVal ScalarFnCall::GetStringVal(ExprContext* context, const TupleRow* row) { |
| DCHECK(type_.IsStringType()); |
| DCHECK(context != NULL); |
| if (scalar_fn_wrapper_ == NULL) return InterpretEval<StringVal>(context, row); |
| StringWrapper fn = reinterpret_cast<StringWrapper>(scalar_fn_wrapper_); |
| return fn(context, row); |
| } |
| |
| TimestampVal ScalarFnCall::GetTimestampVal(ExprContext* context, const TupleRow* row) { |
| DCHECK_EQ(type_.type, TYPE_TIMESTAMP); |
| DCHECK(context != NULL); |
| if (scalar_fn_wrapper_ == NULL) return InterpretEval<TimestampVal>(context, row); |
| TimestampWrapper fn = reinterpret_cast<TimestampWrapper>(scalar_fn_wrapper_); |
| return fn(context, row); |
| } |
| |
| DecimalVal ScalarFnCall::GetDecimalVal(ExprContext* context, const TupleRow* row) { |
| DCHECK_EQ(type_.type, TYPE_DECIMAL); |
| DCHECK(context != NULL); |
| if (scalar_fn_wrapper_ == NULL) return InterpretEval<DecimalVal>(context, row); |
| DecimalWrapper fn = reinterpret_cast<DecimalWrapper>(scalar_fn_wrapper_); |
| return fn(context, row); |
| } |
| |
| string ScalarFnCall::DebugString() const { |
| stringstream out; |
| out << "ScalarFnCall(udf_type=" << fn_.binary_type << " location=" << fn_.hdfs_location |
| << " symbol_name=" << fn_.scalar_fn.symbol << Expr::DebugString() << ")"; |
| return out.str(); |
| } |
| |
| int ScalarFnCall::ComputeVarArgsBufferSize() const { |
| for (int i = NumFixedArgs(); i < children_.size(); ++i) { |
| // All varargs should have same type. |
| DCHECK_EQ(children_[i]->type(), VarArgsType()); |
| } |
| return NumVarArgs() == 0 ? 0 : NumVarArgs() * AnyValUtil::AnyValSize(VarArgsType()); |
| } |