// blob: 939888f3dede4691b4b823cc8df07408557ca374 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/aggregator.h"
#include <sstream>
#include "codegen/codegen-anyval.h"
#include "codegen/llvm-codegen.h"
#include "exec/exec-node.h"
#include "exprs/agg-fn-evaluator.h"
#include "exprs/expr-value.h"
#include "exprs/scalar-expr.h"
#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "runtime/tuple.h"
#include "util/runtime-profile-counters.h"
#include "gen-cpp/PlanNodes_types.h"
#include "common/names.h"
namespace impala {
// Captures the tuple ids/descriptors and row descriptors an aggregator needs from the
// thrift 'taggregator' and its plan node 'pnode'. The input row descriptor is taken from
// the plan node's first child. Aggregate functions and conjuncts are created later, in
// Init().
//
// The descriptor lookups read the tuple ids straight from 'taggregator' instead of from
// the intermediate_tuple_id_/output_tuple_id_ members being initialized in this same
// list, so correctness does not depend on the order in which those members are declared
// in the header (members are initialized in declaration order, not list order).
AggregatorConfig::AggregatorConfig(
    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode)
  : intermediate_tuple_id_(taggregator.intermediate_tuple_id),
    intermediate_tuple_desc_(
        state->desc_tbl().GetTupleDescriptor(taggregator.intermediate_tuple_id)),
    output_tuple_id_(taggregator.output_tuple_id),
    output_tuple_desc_(
        state->desc_tbl().GetTupleDescriptor(taggregator.output_tuple_id)),
    row_desc_(*pnode->row_descriptor_),
    input_row_desc_(*pnode->children_[0]->row_descriptor_),
    needs_finalize_(taggregator.need_finalize) {}
// Creates one AggFn per entry of 'taggregator.aggregate_functions' and the ScalarExprs
// for the plan node's conjuncts. In both the intermediate and the output tuple, the
// grouping-expression slots come first, so aggregate function #k is bound to slot
// (number of grouping exprs + k) of each tuple. Returns the first creation error.
Status AggregatorConfig::Init(
    const TAggregator& taggregator, RuntimeState* state, PlanNode* pnode) {
  DCHECK(intermediate_tuple_desc_ != nullptr);
  DCHECK(output_tuple_desc_ != nullptr);
  const vector<SlotDescriptor*>& intermediate_slots = intermediate_tuple_desc_->slots();
  const vector<SlotDescriptor*>& output_slots = output_tuple_desc_->slots();
  DCHECK_EQ(intermediate_slots.size(), output_slots.size());
  const int first_agg_slot_idx = taggregator.grouping_exprs.size();
  const int num_agg_fns = taggregator.aggregate_functions.size();
  for (int fn_idx = 0; fn_idx < num_agg_fns; ++fn_idx) {
    const int slot_idx = first_agg_slot_idx + fn_idx;
    SlotDescriptor* intermediate_slot = intermediate_slots[slot_idx];
    SlotDescriptor* output_slot = output_slots[slot_idx];
    AggFn* created_fn;
    RETURN_IF_ERROR(AggFn::Create(taggregator.aggregate_functions[fn_idx],
        input_row_desc_, *intermediate_slot, *output_slot, state, &created_fn));
    aggregate_functions_.push_back(created_fn);
  }
  RETURN_IF_ERROR(
      ScalarExpr::Create(pnode->tnode_->conjuncts, row_desc_, state, &conjuncts_));
  return Status::OK();
}
// Name of the LLVM struct type for impala::Aggregator in the cross-compiled IR (clang
// names class types "class.<qualified name>", cf. %"class.impala::AggFnEvaluator" in the
// IR listings in this file).
// NOTE(review): presumably consumed by LlvmCodeGen::GetStructPtrType<Aggregator>() in
// CodegenUpdateTuple(); confirm against llvm-codegen.h.
const char* Aggregator::LLVM_CLASS_NAME = "class.impala::Aggregator";
// Initializes the aggregator from 'config' (tuple ids/descriptors, row descriptors,
// aggregate functions and conjuncts) on behalf of 'exec_node', and creates a runtime
// profile named 'name' in 'pool'. 'agg_idx' is stored in agg_idx_.
//
// The profile is created from the 'pool' argument rather than the pool_ member so that
// this initializer does not depend on pool_ being declared (and thus initialized) before
// runtime_profile_ in the header.
Aggregator::Aggregator(ExecNode* exec_node, ObjectPool* pool,
    const AggregatorConfig& config, const std::string& name, int agg_idx)
  : id_(exec_node->id()),
    exec_node_(exec_node),
    agg_idx_(agg_idx),
    pool_(pool),
    intermediate_tuple_id_(config.intermediate_tuple_id_),
    intermediate_tuple_desc_(config.intermediate_tuple_desc_),
    output_tuple_id_(config.output_tuple_id_),
    output_tuple_desc_(config.output_tuple_desc_),
    row_desc_(config.row_desc_),
    input_row_desc_(config.input_row_desc_),
    needs_finalize_(config.needs_finalize_),
    agg_fns_(config.aggregate_functions_),
    conjuncts_(config.conjuncts_),
    runtime_profile_(RuntimeProfile::Create(pool, name)) {}
// Nothing to do beyond the member destructors; explicit teardown happens in Close().
Aggregator::~Aggregator() = default;
// Sets up everything needed before Open():
// - a MemTracker for this aggregator, created under the exec node's tracker, with a
//   child "Exprs" tracker;
// - the permanent and per-result MemPools charged to the "Exprs" tracker;
// - evaluators for the aggregate functions and for the conjuncts;
// - the "RowsReturned" counter and "BuildTime" timer.
// Returns the first evaluator-creation error.
Status Aggregator::Prepare(RuntimeState* state) {
  mem_tracker_.reset(new MemTracker(
      runtime_profile_, -1, runtime_profile_->name(), exec_node_->mem_tracker()));
  expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));

  MemTracker* const exprs_tracker = expr_mem_tracker_.get();
  expr_perm_pool_.reset(new MemPool(exprs_tracker));
  expr_results_pool_.reset(new MemPool(exprs_tracker));

  MemPool* const perm_pool = expr_perm_pool_.get();
  MemPool* const results_pool = expr_results_pool_.get();
  RETURN_IF_ERROR(AggFnEvaluator::Create(
      agg_fns_, state, pool_, perm_pool, results_pool, &agg_fn_evals_));
  RETURN_IF_ERROR(ScalarExprEvaluator::Create(
      conjuncts_, state, pool_, perm_pool, results_pool, &conjunct_evals_));
  DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());

  rows_returned_counter_ = ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT);
  build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
  return Status::OK();
}
// Opens the aggregate function evaluators, then the conjunct evaluators. Stops at and
// returns the first error.
Status Aggregator::Open(RuntimeState* state) {
  RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
  return ScalarExprEvaluator::Open(conjunct_evals_, state);
}
// Tears down in order: the agg-fn evaluators and AggFns, the conjunct evaluators and
// conjunct exprs, then the expression MemPools, then the "Exprs" and aggregator
// MemTrackers. The pools and trackers are null-checked, so Close() also works when
// Prepare() did not get far enough to create them.
void Aggregator::Close(RuntimeState* state) {
  AggFnEvaluator::Close(agg_fn_evals_, state);
  AggFn::Close(agg_fns_);
  ScalarExprEvaluator::Close(conjunct_evals_, state);
  ScalarExpr::Close(conjuncts_);
  if (MemPool* perm_pool = expr_perm_pool_.get()) perm_pool->FreeAll();
  if (MemPool* results_pool = expr_results_pool_.get()) results_pool->FreeAll();
  if (MemTracker* exprs_tracker = expr_mem_tracker_.get()) exprs_tracker->Close();
  if (MemTracker* agg_tracker = mem_tracker_.get()) agg_tracker->Close();
}
// TODO: codegen this function.
void Aggregator::InitAggSlots(
const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* intermediate_tuple) {
vector<SlotDescriptor*>::const_iterator slot_desc =
intermediate_tuple_desc_->slots().begin() + GetNumGroupingExprs();
for (int i = 0; i < agg_fn_evals.size(); ++i, ++slot_desc) {
// To minimize branching on the UpdateTuple path, initialize the result value so that
// the Add() UDA function can ignore the NULL bit of its destination value. E.g. for
// SUM(), if we initialize the destination value to 0 (with the NULL bit set), we can
// just start adding to the destination value (rather than repeatedly checking the
// destination NULL bit. The codegen'd version of UpdateSlot() exploits this to
// eliminate a branch per value.
//
// For boolean and numeric types, the default values are false/0, so the nullable
// aggregate functions SUM() and AVG() produce the correct result. For MIN()/MAX(),
// initialize the value to max/min possible value for the same effect.
AggFnEvaluator* eval = agg_fn_evals[i];
eval->Init(intermediate_tuple);
DCHECK(agg_fns_[i] == &(eval->agg_fn()));
const AggFn* agg_fn = agg_fns_[i];
const AggFn::AggregationOp agg_op = agg_fn->agg_op();
if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX)
&& !agg_fn->intermediate_type().IsStringType()
&& !agg_fn->intermediate_type().IsTimestampType()) {
ExprValue default_value;
void* default_value_ptr = nullptr;
if (agg_op == AggFn::MIN) {
default_value_ptr = default_value.SetToMax((*slot_desc)->type());
} else {
DCHECK_EQ(agg_op, AggFn::MAX);
default_value_ptr = default_value.SetToMin((*slot_desc)->type());
}
RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, nullptr);
}
}
}
// Interpreted (non-codegen'd) update of the intermediate 'tuple' with 'row', one
// evaluator per aggregate function. When 'is_merge' is true, the tuple at index 0 of
// 'row' is merged into 'tuple'; otherwise each evaluator's Add() consumes 'row'.
void Aggregator::UpdateTuple(
    AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row, bool is_merge) noexcept {
  DCHECK(tuple != nullptr || agg_fns_.empty());
  const int num_fns = agg_fns_.size();
  if (is_merge) {
    for (int fn_idx = 0; fn_idx < num_fns; ++fn_idx) {
      agg_fn_evals[fn_idx]->Merge(row->GetTuple(0), tuple);
    }
  } else {
    for (int fn_idx = 0; fn_idx < num_fns; ++fn_idx) {
      agg_fn_evals[fn_idx]->Add(row, tuple);
    }
  }
}
// Produces the output tuple for the intermediate 'tuple'.
// - Without finalization, the aggregate values are serialized via
//   AggFnEvaluator::Serialize() and 'tuple' itself is returned.
// - With finalization, the values are finalized into 'tuple', or into a new tuple
//   allocated from 'pool' when the output tuple id differs from the intermediate one. In
//   that case the grouping values are also copied into the new tuple.
Tuple* Aggregator::GetOutputTuple(
    const vector<AggFnEvaluator*>& agg_fn_evals, Tuple* tuple, MemPool* pool) {
  DCHECK(tuple != nullptr || agg_fn_evals.empty()) << tuple;
  if (!needs_finalize_) {
    AggFnEvaluator::Serialize(agg_fn_evals, tuple);
    return tuple;
  }

  Tuple* dst = intermediate_tuple_id_ != output_tuple_id_
      ? Tuple::Create(output_tuple_desc_->byte_size(), pool)
      : tuple;
  AggFnEvaluator::Finalize(agg_fn_evals, tuple, dst);
  if (dst == tuple) return dst;

  // The grouping values occupy the leading slots of both tuples; copy them across.
  // TODO: Codegen this.
  const vector<SlotDescriptor*>& src_slots = intermediate_tuple_desc_->slots();
  const vector<SlotDescriptor*>& dst_slots = output_tuple_desc_->slots();
  const int num_grouping_slots = GetNumGroupingExprs();
  for (int slot_idx = 0; slot_idx < num_grouping_slots; ++slot_idx) {
    SlotDescriptor* src_slot_desc = src_slots[slot_idx];
    void* src_value = tuple->IsNull(src_slot_desc->null_indicator_offset())
        ? nullptr
        : tuple->GetSlot(src_slot_desc->tuple_offset());
    RawValue::Write(src_value, dst, dst_slots[slot_idx], nullptr);
  }
  return dst;
}
// Housekeeping hook: clears expr_results_pool_ (the results pool handed to the
// evaluators in Prepare()) and returns the status from state->CheckQueryState().
Status Aggregator::QueryMaintenance(RuntimeState* state) {
  MemPool* const results_pool = expr_results_pool_.get();
  results_pool->Clear();
  return state->CheckQueryState();
}
// IR Generation for updating a single aggregation slot. Signature is:
// void UpdateSlot(AggFnEvaluator* agg_fn_eval, <intermediate tuple struct>* agg_tuple,
//     TupleRow* row)
//
// The IR for sum(double_col), which is constructed directly with the IRBuilder, is:
//
// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
// <{ double, i8 }>* %agg_tuple, %"class.impala::TupleRow"* %row) #33 {
// entry:
// %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
// @_ZNK6impala14AggFnEvaluator11input_evalsEv(
// %"class.impala::AggFnEvaluator"* %agg_fn_eval)
// %0 = getelementptr %"class.impala::ScalarExprEvaluator"*,
// %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
// %input_eval = load %"class.impala::ScalarExprEvaluator"*,
// %"class.impala::ScalarExprEvaluator"** %0
// %input0 = call { i8, double } @GetSlotRef(%"class.impala::ScalarExprEvaluator"*
// %input_eval, %"class.impala::TupleRow"* %row)
// %dst_slot_ptr = getelementptr inbounds <{ double, i8 }>,
// <{ double, i8 }>* %agg_tuple, i32 0, i32 0
// %dst_val = load double, double* %dst_slot_ptr
// %1 = extractvalue { i8, double } %input0, 0
// %is_null = trunc i8 %1 to i1
// br i1 %is_null, label %ret, label %not_null
//
// ret: ; preds = %not_null, %entry
// ret void
//
// not_null: ; preds = %entry
// %val = extractvalue { i8, double } %input0, 1
// %2 = fadd double %dst_val, %val
// %3 = bitcast <{ double, i8 }>* %agg_tuple to i8*
// %null_byte_ptr = getelementptr inbounds i8, i8* %3, i32 8
// %null_byte = load i8, i8* %null_byte_ptr
// %null_bit_cleared = and i8 %null_byte, -2
// store i8 %null_bit_cleared, i8* %null_byte_ptr
// store double %2, double* %dst_slot_ptr
// br label %ret
// }
//
// The IR for ndv(timestamp_col), which uses the UDA interface, is:
//
// define void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
// <{ [1024 x i8] }>* %agg_tuple,
// %"class.impala::TupleRow"* %row) #39 {
// entry:
// %dst_lowered_ptr = alloca { i64, i8* }
// %0 = alloca { i64, i64 }
// %input_evals_vector = call %"class.impala::ScalarExprEvaluator"**
// @_ZNK6impala14AggFnEvaluator11input_evalsEv(
// %"class.impala::AggFnEvaluator"* %agg_fn_eval)
// %1 = getelementptr %"class.impala::ScalarExprEvaluator"*,
// %"class.impala::ScalarExprEvaluator"** %input_evals_vector, i32 0
// %input_eval = load %"class.impala::ScalarExprEvaluator"*,
// %"class.impala::ScalarExprEvaluator"** %1
// %input0 = call { i64, i64 } @GetSlotRef(
// %"class.impala::ScalarExprEvaluator"* %input_eval,
// %"class.impala::TupleRow"* %row)
// %dst_slot_ptr = getelementptr inbounds <{ [1024 x i8] }>,
// <{ [1024 x i8] }>* %agg_tuple, i32 0, i32 0
// %2 = bitcast [1024 x i8]* %dst_slot_ptr to i8*
// %dst = insertvalue { i64, i8* } zeroinitializer, i8* %2, 1
// %3 = extractvalue { i64, i8* } %dst, 0
// %4 = and i64 %3, 4294967295
// %5 = or i64 %4, 4398046511104
// %dst1 = insertvalue { i64, i8* } %dst, i64 %5, 0
// %agg_fn_ctx = call %"class.impala_udf::FunctionContext"*
// @_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv(
// %"class.impala::AggFnEvaluator"* %agg_fn_eval)
// store { i64, i64 } %input0, { i64, i64 }* %0
// %input_unlowered_ptr =
// bitcast { i64, i64 }* %0 to %"struct.impala_udf::TimestampVal"*
// store { i64, i8* } %dst1, { i64, i8* }* %dst_lowered_ptr
// %dst_unlowered_ptr =
// bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"*
// call void @"void impala::AggregateFunctions::HllUpdate<impala_udf::TimestampVal>"(
// %"class.impala_udf::FunctionContext"* %agg_fn_ctx,
// %"struct.impala_udf::TimestampVal"* %input_unlowered_ptr,
// %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
// %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr
// br label %ret
//
// ret: ; preds = %entry
// ret void
// }
//
// Generates an 'UpdateSlot' function that folds one input row into the
// intermediate-tuple slot 'slot_desc' of aggregate function agg_fns_[agg_fn_idx], and
// returns it through 'fn'. The function evaluates each input expr of the AggFn with its
// matching evaluator. It then either:
// - emits an inline instruction sequence for COUNT, for MIN/MAX over
//   integer/float/bool/decimal/date, and for SUM over integer/float/bool; or
// - calls the cross-compiled UDA through CodegenCallUda() in all other cases.
// Returns an error if the intermediate tuple struct cannot be generated, an input expr
// cannot be codegen'd, the UDA call cannot be set up, or the finished function fails
// verification.
Status Aggregator::CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
    SlotDescriptor* slot_desc, llvm::Function** fn) {
  llvm::PointerType* agg_fn_eval_type = codegen->GetStructPtrType<AggFnEvaluator>();
  llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
  if (tuple_struct == nullptr) {
    return Status("Aggregator::CodegenUpdateSlot(): failed to generate "
                  "intermediate tuple desc");
  }
  llvm::PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_eval", agg_fn_eval_type));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
  LlvmBuilder builder(codegen->context());
  llvm::Value* args[3];
  *fn = prototype.GeneratePrototype(&builder, &args[0]);
  llvm::Value* agg_fn_eval_arg = args[0];
  llvm::Value* agg_tuple_arg = args[1];
  llvm::Value* row_arg = args[2];
  // Get the vector of input expressions' evaluators.
  llvm::Value* input_evals_vector = codegen->CodegenCallFunction(&builder,
      IRFunction::AGG_FN_EVALUATOR_INPUT_EVALUATORS, agg_fn_eval_arg,
      "input_evals_vector");
  AggFn* agg_fn = agg_fns_[agg_fn_idx];
  const int num_inputs = agg_fn->GetNumChildren();
  DCHECK_GE(num_inputs, 1);
  // One CodegenAnyVal per input expr, in child order; input_vals[0] is used as 'src'
  // by the inline special cases below.
  vector<CodegenAnyVal> input_vals;
  for (int i = 0; i < num_inputs; ++i) {
    ScalarExpr* input_expr = agg_fn->GetChild(i);
    llvm::Function* input_expr_fn;
    RETURN_IF_ERROR(input_expr->GetCodegendComputeFn(codegen, false, &input_expr_fn));
    DCHECK(input_expr_fn != nullptr);
    // Call input expr function with the matching evaluator to get src slot value.
    llvm::Value* input_eval =
        codegen->CodegenArrayAt(&builder, input_evals_vector, i, "input_eval");
    string input_name = Substitute("input$0", i);
    CodegenAnyVal input_val = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
        input_expr->type(), input_expr_fn,
        llvm::ArrayRef<llvm::Value*>({input_eval, row_arg}), input_name.c_str());
    input_vals.push_back(input_val);
  }
  AggFn::AggregationOp agg_op = agg_fn->agg_op();
  const ColumnType& dst_type = agg_fn->intermediate_type();
  bool dst_is_int_or_float_or_bool = dst_type.IsIntegerType()
      || dst_type.IsFloatingPointType() || dst_type.IsBooleanType();
  bool dst_is_numeric_or_bool = dst_is_int_or_float_or_bool || dst_type.IsDecimalType()
      || dst_type.IsDateType();
  // Shared exit block: every NULL-input early exit below branches here, and the main
  // path falls through to it at the end.
  llvm::BasicBlock* ret_block = llvm::BasicBlock::Create(codegen->context(), "ret", *fn);
  // Emit the code to compute 'result' and set the NULL indicator if needed. First check
  // for special cases where we can emit a very simple instruction sequence, then fall
  // back to the general-purpose approach of calling the cross-compiled builtin UDA.
  CodegenAnyVal& src = input_vals[0];
  // 'dst_slot_ptr' points to the slot in the aggregate tuple to update.
  llvm::Value* dst_slot_ptr = builder.CreateStructGEP(
      nullptr, agg_tuple_arg, slot_desc->llvm_field_idx(), "dst_slot_ptr");
  // TODO: consider moving the following codegen logic to AggFn.
  if (agg_op == AggFn::COUNT) {
    // COUNT keeps a non-nullable 64-bit counter: a merge adds the incoming partial
    // count, an update adds one per non-NULL input.
    src.CodegenBranchIfNull(&builder, ret_block);
    llvm::Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val");
    llvm::Value* result = agg_fn->is_merge() ?
        builder.CreateAdd(dst_value, src.GetVal(), "count_sum") :
        builder.CreateAdd(dst_value, codegen->GetI64Constant(1), "count_inc");
    builder.CreateStore(result, dst_slot_ptr);
    DCHECK(!slot_desc->is_nullable());
  } else if ((agg_op == AggFn::MIN || agg_op == AggFn::MAX) && dst_is_numeric_or_bool) {
    // Relies on InitAggSlots() having seeded the slot with the type's max (for MIN) or
    // min (for MAX), so the comparison is correct even while the NULL bit is still set.
    bool is_min = agg_op == AggFn::MIN;
    src.CodegenBranchIfNull(&builder, ret_block);
    codegen->CodegenMinMax(
        &builder, slot_desc->type(), src.GetVal(), dst_slot_ptr, is_min, *fn);
    // Dst may have been NULL, make sure to unset the NULL bit.
    DCHECK(slot_desc->is_nullable());
    slot_desc->CodegenSetNullIndicator(
        codegen, &builder, agg_tuple_arg, codegen->false_value());
  } else if (agg_op == AggFn::SUM && dst_is_int_or_float_or_bool) {
    src.CodegenBranchIfNull(&builder, ret_block);
    llvm::Value* dst_value = builder.CreateLoad(dst_slot_ptr, "dst_val");
    llvm::Value* result = dst_type.IsFloatingPointType() ?
        builder.CreateFAdd(dst_value, src.GetVal()) :
        builder.CreateAdd(dst_value, src.GetVal());
    builder.CreateStore(result, dst_slot_ptr);
    if (slot_desc->is_nullable()) {
      slot_desc->CodegenSetNullIndicator(
          codegen, &builder, agg_tuple_arg, codegen->false_value());
    } else {
      // 'slot_desc' is not nullable if the aggregate function is sum_init_zero(),
      // because the slot is initialized to be zero and the null bit is nonexistent.
      DCHECK_EQ(agg_fn->fn_name(), "sum_init_zero");
    }
  } else {
    // The remaining cases are implemented using the UDA interface.
    // Create intermediate argument 'dst' from 'dst_value'
    CodegenAnyVal dst = CodegenAnyVal::GetNonNullVal(codegen, &builder, dst_type, "dst");
    // For a subset of builtins we generate a different code sequence that exploits two
    // properties of the builtins. First, NULL input values can be skipped. Second, the
    // value of the slot was initialized in the right way in InitAggSlots() (e.g. 0 for
    // SUM) that we get the right result if UpdateSlot() pretends that the NULL bit of
    // 'dst' is unset. Empirically this optimisation makes TPC-H Q1 5-10% faster.
    bool special_null_handling = !agg_fn->intermediate_type().IsStringType()
        && !agg_fn->intermediate_type().IsTimestampType()
        && (agg_op == AggFn::MIN || agg_op == AggFn::MAX || agg_op == AggFn::SUM
               || agg_op == AggFn::AVG || agg_op == AggFn::NDV);
    if (slot_desc->is_nullable()) {
      if (special_null_handling) {
        src.CodegenBranchIfNull(&builder, ret_block);
        slot_desc->CodegenSetNullIndicator(
            codegen, &builder, agg_tuple_arg, codegen->false_value());
      } else {
        // General case: the UDA sees the slot's real NULL bit.
        dst.SetIsNull(slot_desc->CodegenIsNull(codegen, &builder, agg_tuple_arg));
      }
    }
    dst.LoadFromNativePtr(dst_slot_ptr);
    // Get the FunctionContext object for the AggFnEvaluator.
    llvm::Function* get_agg_fn_ctx_fn =
        codegen->GetFunction(IRFunction::AGG_FN_EVALUATOR_AGG_FN_CTX, false);
    DCHECK(get_agg_fn_ctx_fn != nullptr);
    llvm::Value* agg_fn_ctx_val =
        builder.CreateCall(get_agg_fn_ctx_fn, {agg_fn_eval_arg}, "agg_fn_ctx");
    // Call the UDA to update/merge 'src' into 'dst', with the result stored in
    // 'updated_dst_val'.
    CodegenAnyVal updated_dst_val;
    RETURN_IF_ERROR(CodegenCallUda(
        codegen, &builder, agg_fn, agg_fn_ctx_val, input_vals, dst, &updated_dst_val));
    // Copy the value back to the slot. In the FIXED_UDA_INTERMEDIATE case, the
    // UDA function writes directly to the slot so there is nothing to copy.
    if (dst_type.type != TYPE_FIXED_UDA_INTERMEDIATE) {
      updated_dst_val.StoreToNativePtr(dst_slot_ptr);
    }
    if (slot_desc->is_nullable() && !special_null_handling) {
      // Set NULL bit in the slot based on the return value.
      llvm::Value* result_is_null = updated_dst_val.GetIsNull("result_is_null");
      slot_desc->CodegenSetNullIndicator(
          codegen, &builder, agg_tuple_arg, result_is_null);
    }
  }
  builder.CreateBr(ret_block);
  builder.SetInsertPoint(ret_block);
  builder.CreateRetVoid();
  // Avoid producing huge UpdateTuple() function after inlining - LLVM's optimiser
  // memory/CPU usage scales super-linearly with function size.
  // E.g. compute stats on all columns of a 1000-column table previously took 4 minutes to
  // codegen because all the UpdateSlot() functions were inlined.
  if (agg_fn_idx >= LlvmCodeGen::CODEGEN_INLINE_EXPRS_THRESHOLD) {
    codegen->SetNoInline(*fn);
  }
  *fn = codegen->FinalizeFunction(*fn);
  if (*fn == nullptr) {
    return Status("Aggregator::CodegenUpdateSlot(): codegen'd "
                  "UpdateSlot() function failed verification, see log");
  }
  return Status::OK();
}
// Emits a call to the update-or-merge UDA of 'agg_fn' that folds 'input_vals' into
// 'dst_val'. The value left in the destination after the call is returned through
// 'updated_dst_val'.
//
// The UDA is invoked as uda_fn(FunctionContext*, <input ptr>..., <dst ptr>). Inputs and
// destination are passed as pointers to the *unlowered* AnyVal types (e.g. IntVal),
// because the UDA interface takes them as references to those classes.
Status Aggregator::CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder,
    AggFn* agg_fn, llvm::Value* agg_fn_ctx_val, const vector<CodegenAnyVal>& input_vals,
    const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) {
  llvm::Function* uda_fn;
  RETURN_IF_ERROR(agg_fn->CodegenUpdateOrMergeFunction(codegen, &uda_fn));

  vector<llvm::Value*> call_args;
  call_args.push_back(agg_fn_ctx_val);
  DCHECK_EQ(agg_fn->GetNumChildren(), input_vals.size());
  for (const CodegenAnyVal& input : input_vals) {
    call_args.push_back(input.GetUnloweredPtr("input_unlowered_ptr"));
  }

  // The destination lives in a lowered-type buffer; hand the UDA a pointer to it
  // reinterpreted as the unlowered type.
  llvm::Value* dst_lowered_ptr = dst_val.GetLoweredPtr("dst_lowered_ptr");
  const ColumnType& intermediate_type = agg_fn->intermediate_type();
  llvm::Type* unlowered_ptr_type =
      CodegenAnyVal::GetUnloweredPtrType(codegen, intermediate_type);
  call_args.push_back(builder->CreateBitCast(
      dst_lowered_ptr, unlowered_ptr_type, "dst_unlowered_ptr"));

  builder->CreateCall(uda_fn, call_args);

  // Reload the (possibly updated) destination from the lowered buffer and wrap it as a
  // CodegenAnyVal of the intermediate type.
  llvm::Value* reloaded_dst = builder->CreateLoad(dst_lowered_ptr, "anyval_result");
  *updated_dst_val = CodegenAnyVal(codegen, builder, intermediate_type, reloaded_dst);
  return Status::OK();
}
// IR codegen for the UpdateTuple loop. This loop is query-specific and based on the
// aggregate functions. The function signature must exactly match that of the
// non-codegen'd UpdateTuple().
// For the query:
// select count(*), count(int_col), sum(double_col) the IR looks like:
//
// define void @UpdateTuple(%"class.impala::Aggregator"* %this_ptr,
// %"class.impala::AggFnEvaluator"** %agg_fn_evals, %"class.impala::Tuple"* %tuple,
// %"class.impala::TupleRow"* %row, i1 %is_merge) #33 {
// entry:
// %tuple1 = bitcast %"class.impala::Tuple"* %tuple to <{ i64, i64, double, i8 }>*
// %src_slot = getelementptr inbounds <{ i64, i64, double, i8 }>,
// <{ i64, i64, double, i8 }>* %tuple1, i32 0, i32 0
// %count_star_val = load i64, i64* %src_slot
// %count_star_inc = add i64 %count_star_val, 1
// store i64 %count_star_inc, i64* %src_slot
// %0 = getelementptr %"class.impala::AggFnEvaluator"*,
// %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 1
// %agg_fn_eval =
// load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %0
// call void @UpdateSlot(%"class.impala::AggFnEvaluator"* %agg_fn_eval,
// <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
// %1 = getelementptr %"class.impala::AggFnEvaluator"*,
// %"class.impala::AggFnEvaluator"** %agg_fn_evals, i32 2
// %agg_fn_eval2 =
// load %"class.impala::AggFnEvaluator"*, %"class.impala::AggFnEvaluator"** %1
// call void @UpdateSlot.2(%"class.impala::AggFnEvaluator"* %agg_fn_eval2,
// <{ i64, i64, double, i8 }>* %tuple1, %"class.impala::TupleRow"* %row)
// ret void
// }
//
// Generates the query-specific replacement for UpdateTuple() and returns it via 'fn'.
// The generated function has the same signature as the interpreted
// UpdateTuple(this, agg_fn_evals, tuple, row, is_merge):
// - a COUNT(*) slot is incremented inline;
// - every other aggregate function gets its own UpdateSlot() helper from
//   CodegenUpdateSlot(), called with agg_fn_evals[i].
// The 'is_merge' argument is part of the signature but is not read by the generated
// body. Returns an Expected error if the intermediate tuple has a CHAR slot or its LLVM
// struct cannot be generated. Propagates CodegenUpdateSlot() errors, and returns an
// error if the finished function fails verification.
Status Aggregator::CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) {
  for (const SlotDescriptor* slot_desc : intermediate_tuple_desc_->slots()) {
    if (slot_desc->type().type == TYPE_CHAR) {
      return Status::Expected("Aggregator::CodegenUpdateTuple(): cannot "
                              "codegen CHAR in aggregations");
    }
  }
  // Build the intermediate tuple's LLVM struct once. It serves both as the availability
  // check here and as the cast target for the 'tuple' argument below.
  llvm::StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
  if (tuple_struct == nullptr) {
    return Status::Expected("Aggregator::CodegenUpdateTuple(): failed to"
                            " generate intermediate tuple desc");
  }
  // Get the types to match the UpdateTuple signature
  llvm::PointerType* agg_node_ptr_type = codegen->GetStructPtrType<Aggregator>();
  llvm::PointerType* evals_type = codegen->GetStructPtrPtrType<AggFnEvaluator>();
  llvm::PointerType* tuple_ptr_type = codegen->GetStructPtrType<Tuple>();
  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
  llvm::PointerType* tuple_ptr = codegen->GetPtrType(tuple_struct);
  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_fn_evals", evals_type));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_ptr_type));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("is_merge", codegen->bool_type()));
  LlvmBuilder builder(codegen->context());
  llvm::Value* args[5];
  *fn = prototype.GeneratePrototype(&builder, &args[0]);
  llvm::Value* agg_fn_evals_arg = args[1];
  llvm::Value* tuple_arg = args[2];
  llvm::Value* row_arg = args[3];
  // Cast the parameter types to the internal llvm runtime types.
  // TODO: get rid of this by using right type in function signature
  tuple_arg = builder.CreateBitCast(tuple_arg, tuple_ptr, "tuple");
  // Loop over each expr and generate the IR for that slot. If the expr is not
  // count(*), generate a helper IR function to update the slot and call that.
  // Aggregate slots follow the grouping slots in the intermediate tuple.
  int j = GetNumGroupingExprs();
  for (int i = 0; i < agg_fns_.size(); ++i, ++j) {
    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
    AggFn* agg_fn = agg_fns_[i];
    if (agg_fn->is_count_star()) {
      // TODO: we should be able to hoist this up to the loop over the batch and just
      // increment the slot by the number of rows in the batch.
      int field_idx = slot_desc->llvm_field_idx();
      llvm::Value* const_one = codegen->GetI64Constant(1);
      llvm::Value* slot_ptr =
          builder.CreateStructGEP(nullptr, tuple_arg, field_idx, "src_slot");
      llvm::Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
      llvm::Value* count_inc =
          builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
      builder.CreateStore(count_inc, slot_ptr);
    } else {
      llvm::Function* update_slot_fn;
      RETURN_IF_ERROR(CodegenUpdateSlot(codegen, i, slot_desc, &update_slot_fn));
      // Load agg_fn_evals_[i]
      llvm::Value* agg_fn_eval_val =
          codegen->CodegenArrayAt(&builder, agg_fn_evals_arg, i, "agg_fn_eval");
      // Call UpdateSlot(agg_fn_evals_[i], tuple, row);
      llvm::Value* update_slot_args[] = {agg_fn_eval_val, tuple_arg, row_arg};
      builder.CreateCall(update_slot_fn, update_slot_args);
    }
  }
  builder.CreateRetVoid();
  // Avoid inlining big UpdateTuple function into outer loop - we're unlikely to get
  // any benefit from it since the function call overhead will be amortized.
  if (agg_fns_.size() > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
    codegen->SetNoInline(*fn);
  }
  // CodegenProcessBatch() does the final optimizations.
  *fn = codegen->FinalizeFunction(*fn);
  if (*fn == nullptr) {
    return Status("Aggregator::CodegenUpdateTuple(): codegen'd "
                  "UpdateTuple() function failed verification, see log");
  }
  return Status::OK();
}
} // namespace impala