blob: 822fc5da6eae7b0762a191bfa39bfbe21af2b5eb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/aggregation-node.h"
#include <math.h>
#include <sstream>
#include <boost/functional/hash.hpp>
#include <thrift/protocol/TDebugProtocol.h>
#include <x86intrin.h>
#include "codegen/codegen-anyval.h"
#include "codegen/llvm-codegen.h"
#include "exec/old-hash-table.inline.h"
#include "exprs/agg-fn-evaluator.h"
#include "exprs/expr.h"
#include "exprs/expr-context.h"
#include "exprs/slot-ref.h"
#include "runtime/descriptors.h"
#include "runtime/mem-pool.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/string-value.inline.h"
#include "runtime/tuple.h"
#include "runtime/tuple-row.h"
#include "udf/udf-internal.h"
#include "util/debug-util.h"
#include "util/runtime-profile-counters.h"
#include "gen-cpp/Exprs_types.h"
#include "gen-cpp/PlanNodes_types.h"
#include "common/names.h"
using namespace impala;
using namespace llvm;
namespace impala {
const char* AggregationNode::LLVM_CLASS_NAME = "class.impala::AggregationNode";
// TODO: pass in maximum size; enforce by setting limit in mempool
// Copies the plan-node settings out of the thrift plan and zero-initializes all
// runtime state; descriptors, counters and codegen'd functions are filled in
// later by Init()/Prepare()/Codegen().
AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
    const DescriptorTbl& descs)
  : ExecNode(pool, tnode, descs),
    intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
    intermediate_tuple_desc_(NULL),  // resolved from the descriptor table in Prepare()
    output_tuple_id_(tnode.agg_node.output_tuple_id),
    output_tuple_desc_(NULL),  // resolved from the descriptor table in Prepare()
    singleton_intermediate_tuple_(NULL),  // only used when there are no grouping exprs
    codegen_process_row_batch_fn_(NULL),
    process_row_batch_fn_(NULL),  // set by Codegen() on success; NULL -> interpreted path
    needs_finalize_(tnode.agg_node.need_finalize),
    build_timer_(NULL),
    get_results_timer_(NULL),
    hash_table_buckets_counter_(NULL) {
}
// Creates the grouping expr trees and one AggFnEvaluator per aggregate function
// from the thrift plan. Also records, per evaluator, the single input
// ExprContext when there is exactly one (used later by CodegenUpdateSlot());
// otherwise NULL is stored in 'agg_expr_ctxs_' for that entry.
Status AggregationNode::Init(const TPlanNode& tnode, RuntimeState* state) {
  RETURN_IF_ERROR(ExecNode::Init(tnode, state));
  RETURN_IF_ERROR(
      Expr::CreateExprTrees(pool_, tnode.agg_node.grouping_exprs, &probe_expr_ctxs_));
  for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) {
    AggFnEvaluator* evaluator;
    RETURN_IF_ERROR(AggFnEvaluator::Create(
        pool_, tnode.agg_node.aggregate_functions[i], &evaluator));
    aggregate_evaluators_.push_back(evaluator);
    ExprContext* agg_expr_ctx;
    if (evaluator->input_expr_ctxs().size() == 1) {
      agg_expr_ctx = evaluator->input_expr_ctxs()[0];
    } else {
      // CodegenUpdateSlot() can only support aggregate operator with only one ExprContext
      // so it doesn't support operator such as group_concat. There are also aggregate
      // operators with no ExprContext (e.g. count(*)). In cases above, 'agg_expr_ctxs_'
      // will contain NULL for that entry.
      DCHECK(evaluator->agg_op() == AggFnEvaluator::OTHER || evaluator->is_count_star());
      agg_expr_ctx = NULL;
    }
    agg_expr_ctxs_.push_back(agg_expr_ctx);
  }
  return Status::OK();
}
// Resolves tuple descriptors, prepares grouping ('probe') exprs against the
// child's rows, synthesizes matching 'build' exprs (SlotRefs into the
// intermediate tuple), prepares the aggregate evaluators, and creates the hash
// table used for grouping aggregation.
Status AggregationNode::Prepare(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecNode::Prepare(state));
  tuple_pool_.reset(new MemPool(mem_tracker()));
  agg_fn_pool_.reset(new MemPool(expr_mem_tracker()));
  build_timer_ = ADD_TIMER(runtime_profile(), "BuildTime");
  get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
  hash_table_buckets_counter_ =
      ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
  hash_table_load_factor_counter_ =
      ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE);
  intermediate_tuple_desc_ =
      state->desc_tbl().GetTupleDescriptor(intermediate_tuple_id_);
  output_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(output_tuple_id_);
  // The intermediate and output tuples must have the same layout (slot-for-slot
  // correspondence); only the slot types may differ.
  DCHECK_EQ(intermediate_tuple_desc_->slots().size(),
      output_tuple_desc_->slots().size());
  // Grouping exprs are evaluated over the child's rows ('probe' side).
  RETURN_IF_ERROR(
      Expr::Prepare(probe_expr_ctxs_, state, child(0)->row_desc(), expr_mem_tracker()));
  AddExprCtxsToFree(probe_expr_ctxs_);
  // Construct build exprs from intermediate_agg_tuple_desc_
  for (int i = 0; i < probe_expr_ctxs_.size(); ++i) {
    SlotDescriptor* desc = intermediate_tuple_desc_->slots()[i];
    DCHECK(desc->type().type == TYPE_NULL ||
        desc->type() == probe_expr_ctxs_[i]->root()->type());
    // TODO: Generate the build exprs in the FE such that the existing logic
    // for handling NULL_TYPE works.
    // Hack to avoid TYPE_NULL SlotRefs.
    Expr* expr = desc->type().type != TYPE_NULL ?
        new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN);
    state->obj_pool()->Add(expr);
    build_expr_ctxs_.push_back(new ExprContext(expr));
    state->obj_pool()->Add(build_expr_ctxs_.back());
  }
  // Construct a new row desc for preparing the build exprs because neither the child's
  // nor this node's output row desc may contain the intermediate tuple, e.g.,
  // in a single-node plan with an intermediate tuple different from the output tuple.
  RowDescriptor build_row_desc(intermediate_tuple_desc_, false);
  RETURN_IF_ERROR(
      Expr::Prepare(build_expr_ctxs_, state, build_row_desc, expr_mem_tracker()));
  AddExprCtxsToFree(build_expr_ctxs_);
  // Aggregate slots follow the grouping slots in the intermediate/output tuples,
  // so evaluator i uses slot index j = num_grouping_exprs + i.
  int j = probe_expr_ctxs_.size();
  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
    SlotDescriptor* intermediate_slot_desc = intermediate_tuple_desc_->slots()[j];
    SlotDescriptor* output_slot_desc = output_tuple_desc_->slots()[j];
    FunctionContext* agg_fn_ctx;
    RETURN_IF_ERROR(aggregate_evaluators_[i]->Prepare(state, child(0)->row_desc(),
        intermediate_slot_desc, output_slot_desc, agg_fn_pool_.get(), &agg_fn_ctx));
    agg_fn_ctxs_.push_back(agg_fn_ctx);
    state->obj_pool()->Add(agg_fn_ctx);
  }
  DCHECK_EQ(agg_fn_ctxs_.size(), aggregate_evaluators_.size());
  // TODO: how many buckets?
  hash_tbl_.reset(new OldHashTable(state, build_expr_ctxs_, probe_expr_ctxs_,
      vector<ExprContext*>(), 1, true, vector<bool>(build_expr_ctxs_.size(), true), id(),
      mem_tracker(), vector<RuntimeFilter*>(), true));
  AddCodegenDisabledMessage(state);
  return Status::OK();
}
// Attempts to codegen UpdateTuple() and then ProcessRowBatch(); on success,
// registers the codegen'd ProcessRowBatch with the JIT so that
// 'process_row_batch_fn_' is populated when the module is compiled. Failure is
// not an error: the interpreted path is used instead and the profile records
// whether codegen was enabled.
void AggregationNode::Codegen(RuntimeState* state) {
  DCHECK(state->ShouldCodegen());
  ExecNode::Codegen(state);
  if (IsNodeCodegenDisabled()) return;
  bool codegen_enabled = false;
  LlvmCodeGen* codegen = state->codegen();
  DCHECK(codegen != NULL);
  Function* update_tuple_fn = CodegenUpdateTuple(codegen);
  if (update_tuple_fn != NULL) {
    codegen_process_row_batch_fn_ = CodegenProcessRowBatch(codegen, update_tuple_fn);
    if (codegen_process_row_batch_fn_ != NULL) {
      // Update to using codegen'd process row batch.
      codegen->AddFunctionToJit(codegen_process_row_batch_fn_,
          reinterpret_cast<void**>(&process_row_batch_fn_));
      codegen_enabled = true;
    }
  }
  runtime_profile()->AddCodegenMsg(codegen_enabled);
}
// Opens exprs and evaluators, then consumes the entire child input, aggregating
// every batch into the hash table (or into the singleton tuple when there are
// no grouping exprs). By the time Open() returns, all input has been consumed
// and the child has been closed; GetNext() only iterates over the results.
Status AggregationNode::Open(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecNode::Open(state));
  RETURN_IF_ERROR(Expr::Open(probe_expr_ctxs_, state));
  RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
  DCHECK_EQ(aggregate_evaluators_.size(), agg_fn_ctxs_.size());
  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
    RETURN_IF_ERROR(aggregate_evaluators_[i]->Open(state, agg_fn_ctxs_[i]));
  }
  if (probe_expr_ctxs_.empty()) {
    // Create single intermediate tuple. This must happen after
    // opening the aggregate evaluators.
    singleton_intermediate_tuple_ = ConstructIntermediateTuple();
    // Check for failures during AggFnEvaluator::Init().
    RETURN_IF_ERROR(state->GetQueryStatus());
    // Insert into the hash table so the result iteration in GetNext()/Close()
    // works the same way as the grouping case.
    hash_tbl_->Insert(singleton_intermediate_tuple_);
  }
  RETURN_IF_ERROR(children_[0]->Open(state));
  RowBatch batch(children_[0]->row_desc(), state->batch_size(), mem_tracker());
  int64_t num_input_rows = 0;
  while (true) {
    bool eos;
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(QueryMaintenance(state));
    RETURN_IF_ERROR(children_[0]->GetNext(state, &batch, &eos));
    SCOPED_TIMER(build_timer_);
    if (VLOG_ROW_IS_ON) {
      for (int i = 0; i < batch.num_rows(); ++i) {
        TupleRow* row = batch.GetRow(i);
        VLOG_ROW << "input row: " << PrintRow(row, children_[0]->row_desc());
      }
    }
    // Prefer the codegen'd batch processor when Codegen() succeeded; otherwise
    // fall back to the interpreted grouping / non-grouping implementations.
    if (process_row_batch_fn_ != NULL) {
      process_row_batch_fn_(this, &batch);
    } else if (probe_expr_ctxs_.empty()) {
      ProcessRowBatchNoGrouping(&batch);
    } else {
      ProcessRowBatchWithGrouping(&batch);
    }
    COUNTER_SET(hash_table_buckets_counter_, hash_tbl_->num_buckets());
    COUNTER_SET(hash_table_load_factor_counter_, hash_tbl_->load_factor());
    num_input_rows += batch.num_rows();
    // We must set output_iterator_ here, rather than outside the loop, because
    // output_iterator_ must be set if the function returns within the loop
    output_iterator_ = hash_tbl_->Begin();
    batch.Reset();
    RETURN_IF_ERROR(QueryMaintenance(state));
    if (eos) break;
  }
  // We have consumed all of the input from the child and transfered ownership of the
  // resources we need, so the child can be closed safely to release its resources.
  child(0)->Close(state);
  VLOG_FILE << "aggregated " << num_input_rows << " input rows into "
      << hash_tbl_->size() << " output rows";
  return Status::OK();
}
// Produces output rows by walking the hash table iterator set up in Open():
// each intermediate tuple is finalized/serialized into an output tuple, the
// conjuncts are evaluated, and passing rows are committed to 'row_batch'.
// Sets *eos when the iterator is exhausted or the row limit is reached.
Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
  RETURN_IF_CANCELLED(state);
  RETURN_IF_ERROR(QueryMaintenance(state));
  SCOPED_TIMER(get_results_timer_);
  if (ReachedLimit()) {
    *eos = true;
    return Status::OK();
  }
  *eos = false;
  ExprContext** ctxs = &conjunct_ctxs_[0];
  int num_ctxs = conjunct_ctxs_.size();
  int count = 0;
  const int N = state->batch_size();
  while (!output_iterator_.AtEnd() && !row_batch->AtCapacity()) {
    // This loop can go on for a long time if the conjuncts are very selective. Do query
    // maintenance every N iterations.
    if (count++ % N == 0) {
      RETURN_IF_CANCELLED(state);
      RETURN_IF_ERROR(QueryMaintenance(state));
    }
    int row_idx = row_batch->AddRow();
    TupleRow* row = row_batch->GetRow(row_idx);
    Tuple* intermediate_tuple = output_iterator_.GetTuple();
    // Output tuple memory (if a new tuple is needed) comes from the row batch's
    // pool so it lives as long as the returned batch.
    Tuple* output_tuple =
        FinalizeTuple(intermediate_tuple, row_batch->tuple_data_pool());
    output_iterator_.Next<false>();
    row->SetTuple(0, output_tuple);
    if (ExecNode::EvalConjuncts(ctxs, num_ctxs, row)) {
      VLOG_ROW << "output row: " << PrintRow(row, row_desc());
      row_batch->CommitLastRow();
      ++num_rows_returned_;
      if (ReachedLimit()) break;
    }
  }
  *eos = output_iterator_.AtEnd() || ReachedLimit();
  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
  return Status::OK();
}
// Reset() is not implemented for this node; reaching it indicates a planner
// error (this node should not be placed where a reset is required).
Status AggregationNode::Reset(RuntimeState* state) {
  const char* kNotImplemented = "NYI";
  DCHECK(false) << kNotImplemented;
  return Status(kNotImplemented);
}
// Releases all resources held by this node. Safe to call more than once:
// is_closed() guards against double-close. Any tuples not consumed by
// GetNext() are finalized/serialized first so UDA-allocated memory is freed.
void AggregationNode::Close(RuntimeState* state) {
  if (is_closed()) return;
  // Iterate through the remaining rows in the hash table and call Serialize/Finalize on
  // them in order to free any memory allocated by UDAs. Finalize() requires a dst tuple
  // but we don't actually need the result, so allocate a single dummy tuple to avoid
  // accumulating memory.
  Tuple* dummy_dst = NULL;
  if (needs_finalize_ && output_tuple_desc_ != NULL) {
    dummy_dst = Tuple::Create(output_tuple_desc_->byte_size(), tuple_pool_.get());
  }
  while (!output_iterator_.AtEnd()) {
    Tuple* tuple = output_iterator_.GetTuple();
    if (needs_finalize_) {
      AggFnEvaluator::Finalize(aggregate_evaluators_, agg_fn_ctxs_, tuple, dummy_dst);
    } else {
      AggFnEvaluator::Serialize(aggregate_evaluators_, agg_fn_ctxs_, tuple);
    }
    output_iterator_.Next<false>();
  }
  // NULL checks below: Close() may run before Prepare() finished creating these.
  if (tuple_pool_.get() != NULL) tuple_pool_->FreeAll();
  if (hash_tbl_.get() != NULL) hash_tbl_->Close();
  agg_expr_ctxs_.clear();
  for (AggFnEvaluator* aggregate_evaluator : aggregate_evaluators_) {
    aggregate_evaluator->Close(state);
  }
  for (FunctionContext* agg_fn_ctx : agg_fn_ctxs_) {
    agg_fn_ctx->impl()->Close();
  }
  if (agg_fn_pool_.get() != NULL) agg_fn_pool_->FreeAll();
  Expr::Close(probe_expr_ctxs_, state);
  Expr::Close(build_expr_ctxs_, state);
  ExecNode::Close(state);
}
// Allocates a new intermediate tuple from 'tuple_pool_', copies the current
// grouping values (cached in the hash table by the last Equals()/hash
// evaluation) into the grouping slots, and initializes the aggregate slots via
// the evaluators. For codegen, non-string/non-complex MIN/MAX slots are
// additionally seeded with max/min defaults so the codegen'd UpdateSlot()
// needn't branch on a NULL dst slot.
Tuple* AggregationNode::ConstructIntermediateTuple() {
  Tuple* intermediate_tuple = Tuple::Create(
      intermediate_tuple_desc_->byte_size(), tuple_pool_.get());
  vector<SlotDescriptor*>::const_iterator slot_desc =
      intermediate_tuple_desc_->slots().begin();
  // copy grouping values
  for (int i = 0; i < probe_expr_ctxs_.size(); ++i, ++slot_desc) {
    if (hash_tbl_->last_expr_value_null(i)) {
      intermediate_tuple->SetNull((*slot_desc)->null_indicator_offset());
    } else {
      void* src = hash_tbl_->last_expr_value(i);
      void* dst = intermediate_tuple->GetSlot((*slot_desc)->tuple_offset());
      // Deep-copies var-len values into 'tuple_pool_' so they outlive the row.
      RawValue::Write(src, dst, (*slot_desc)->type(), tuple_pool_.get());
    }
  }
  // Initialize aggregate output.
  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++slot_desc) {
    AggFnEvaluator* evaluator = aggregate_evaluators_[i];
    evaluator->Init(agg_fn_ctxs_[i], intermediate_tuple);
    // Codegen specific path.
    // To minimize branching on the UpdateTuple path, initialize the result value
    // so that UpdateTuple doesn't have to check if the aggregation
    // dst slot is null.
    // - sum/count: 0
    // - min: max_value
    // - max: min_value
    // TODO: remove when we don't use the irbuilder for codegen here.
    // This optimization no longer applies with AnyVal
    if ((*slot_desc)->type().type != TYPE_STRING &&
        (*slot_desc)->type().type != TYPE_VARCHAR &&
        (*slot_desc)->type().type != TYPE_TIMESTAMP &&
        (*slot_desc)->type().type != TYPE_CHAR &&
        (*slot_desc)->type().type != TYPE_DECIMAL) {
      ExprValue default_value;
      void* default_value_ptr = NULL;
      switch (evaluator->agg_op()) {
        case AggFnEvaluator::MIN:
          default_value_ptr = default_value.SetToMax((*slot_desc)->type());
          RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
          break;
        case AggFnEvaluator::MAX:
          default_value_ptr = default_value.SetToMin((*slot_desc)->type());
          RawValue::Write(default_value_ptr, intermediate_tuple, *slot_desc, NULL);
          break;
        default:
          break;
      }
    }
  }
  return intermediate_tuple;
}
// Interpreted update path: applies every aggregate evaluator's Update/Merge to
// 'tuple' for the input 'row'. The codegen'd UpdateTuple must match this
// signature exactly (see CodegenUpdateTuple()).
void AggregationNode::UpdateTuple(Tuple* tuple, TupleRow* row) {
  DCHECK(tuple != NULL || aggregate_evaluators_.empty());
  AggFnEvaluator::Add(aggregate_evaluators_, agg_fn_ctxs_, row, tuple);
}
// Converts an intermediate tuple into an output tuple. When finalization is
// needed and the intermediate tuple layout differs from the output layout, a
// fresh output tuple is allocated from 'pool' and the grouping slots are copied
// across; otherwise the result is produced in place and 'tuple' is returned.
Tuple* AggregationNode::FinalizeTuple(Tuple* tuple, MemPool* pool) {
  DCHECK(tuple != NULL || aggregate_evaluators_.empty());
  DCHECK(output_tuple_desc_ != NULL);
  Tuple* dst = tuple;
  if (needs_finalize_ && intermediate_tuple_id_ != output_tuple_id_) {
    dst = Tuple::Create(output_tuple_desc_->byte_size(), pool);
  }
  if (needs_finalize_) {
    AggFnEvaluator::Finalize(aggregate_evaluators_, agg_fn_ctxs_, tuple, dst);
  } else {
    AggFnEvaluator::Serialize(aggregate_evaluators_, agg_fn_ctxs_, tuple);
  }
  // Copy grouping values from tuple to dst.
  // TODO: Codegen this.
  if (dst != tuple) {
    int num_grouping_slots = probe_expr_ctxs_.size();
    for (int i = 0; i < num_grouping_slots; ++i) {
      SlotDescriptor* src_slot_desc = intermediate_tuple_desc_->slots()[i];
      SlotDescriptor* dst_slot_desc = output_tuple_desc_->slots()[i];
      bool src_slot_null = tuple->IsNull(src_slot_desc->null_indicator_offset());
      void* src_slot = NULL;
      if (!src_slot_null) src_slot = tuple->GetSlot(src_slot_desc->tuple_offset());
      // NULL src_slot writes a NULL into dst; pool is NULL because var-len
      // grouping data is shallow-copied (no new allocation here).
      RawValue::Write(src_slot, dst, dst_slot_desc, NULL);
    }
  }
  return dst;
}
void AggregationNode::DebugString(int indentation_level, stringstream* out) const {
*out << string(indentation_level * 2, ' ');
*out << "AggregationNode("
<< "intermediate_tuple_id=" << intermediate_tuple_id_
<< " output_tuple_id=" << output_tuple_id_
<< " needs_finalize=" << needs_finalize_
<< " probe_exprs=" << Expr::DebugString(probe_expr_ctxs_)
<< " agg_exprs=" << AggFnEvaluator::DebugString(aggregate_evaluators_);
ExecNode::DebugString(indentation_level, out);
*out << ")";
}
// Maps an input column type to the cross-compiled HllUpdate IR function that
// handles values of that type. DCHECKs (and returns FN_END) for types NDV
// does not support.
IRFunction::Type GetHllUpdateFunction2(const ColumnType& type) {
  const auto t = type.type;
  if (t == TYPE_BOOLEAN) return IRFunction::HLL_UPDATE_BOOLEAN;
  if (t == TYPE_TINYINT) return IRFunction::HLL_UPDATE_TINYINT;
  if (t == TYPE_SMALLINT) return IRFunction::HLL_UPDATE_SMALLINT;
  if (t == TYPE_INT) return IRFunction::HLL_UPDATE_INT;
  if (t == TYPE_BIGINT) return IRFunction::HLL_UPDATE_BIGINT;
  if (t == TYPE_FLOAT) return IRFunction::HLL_UPDATE_FLOAT;
  if (t == TYPE_DOUBLE) return IRFunction::HLL_UPDATE_DOUBLE;
  if (t == TYPE_STRING) return IRFunction::HLL_UPDATE_STRING;
  if (t == TYPE_DECIMAL) return IRFunction::HLL_UPDATE_DECIMAL;
  DCHECK(false) << "Unsupported type: " << type;
  return IRFunction::FN_END;
}
// IR Generation for updating a single aggregation slot. Signature is:
// void UpdateSlot(FunctionContext* fn_ctx, ExprContext* expr_ctx,
// AggTuple* agg_tuple, char** row)
//
// The IR for sum(double_col) is:
// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
// %"class.impala::ExprContext"* %expr_ctx,
// { i8, [7 x i8], double }* %agg_tuple,
// %"class.impala::TupleRow"* %row) #34 {
// entry:
// %src = call { i8, double } @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx,
// %"class.impala::TupleRow"* %row)
// %0 = extractvalue { i8, double } %src, 0
// %is_null = trunc i8 %0 to i1
// br i1 %is_null, label %ret, label %src_not_null
//
// src_not_null: ; preds = %entry
// %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], double },
// { i8, [7 x i8], double }* %agg_tuple, i32 0, i32 2
// call void @SetNotNull({ i8, [7 x i8], double }* %agg_tuple)
// %dst_val = load double, double* %dst_slot_ptr
// %val = extractvalue { i8, double } %src, 1
// %1 = fadd double %dst_val, %val
// store double %1, double* %dst_slot_ptr
// br label %ret
//
// ret: ; preds = %src_not_null, %entry
// ret void
// }
//
// The IR for ndv(double_col) is:
// define void @UpdateSlot(%"class.impala_udf::FunctionContext"* %fn_ctx,
// %"class.impala::ExprContext"* %expr_ctx,
// { i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple,
// %"class.impala::TupleRow"* %row) #34 {
// entry:
// %dst_lowered_ptr = alloca { i64, i8* }
// %src_lowered_ptr = alloca { i64, i8* }
// %src = call { i64, i8* } @GetSlotRef(%"class.impala::ExprContext"* %expr_ctx,
// %"class.impala::TupleRow"* %row)
// %0 = extractvalue { i64, i8* } %src, 0
// %is_null = trunc i64 %0 to i1
// br i1 %is_null, label %ret, label %src_not_null
//
// src_not_null: ; preds = %entry
// %dst_slot_ptr = getelementptr inbounds { i8, [7 x i8], %"struct.impala::StringValue" },
// { i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple, i32 0, i32 2
// call void @SetNotNull({ i8, [7 x i8], %"struct.impala::StringValue" }* %agg_tuple)
// %dst_val =
// load %"struct.impala::StringValue", %"struct.impala::StringValue"* %dst_slot_ptr
// store { i64, i8* } %src, { i64, i8* }* %src_lowered_ptr
// %src_unlowered_ptr =
// bitcast { i64, i8* }* %src_lowered_ptr to %"struct.impala_udf::StringVal"*
// %ptr = extractvalue %"struct.impala::StringValue" %dst_val, 0
// %dst_stringval = insertvalue { i64, i8* } zeroinitializer, i8* %ptr, 1
// %len = extractvalue %"struct.impala::StringValue" %dst_val, 1
// %1 = extractvalue { i64, i8* } %dst_stringval, 0
// %2 = zext i32 %len to i64
// %3 = shl i64 %2, 32
// %4 = and i64 %1, 4294967295
// %5 = or i64 %4, %3
// %dst_stringval1 = insertvalue { i64, i8* } %dst_stringval, i64 %5, 0
// store { i64, i8* } %dst_stringval1, { i64, i8* }* %dst_lowered_ptr
// %dst_unlowered_ptr =
// bitcast { i64, i8* }* %dst_lowered_ptr to %"struct.impala_udf::StringVal"*
// call void @HllMerge(%"class.impala_udf::FunctionContext"* %fn_ctx,
// %"struct.impala_udf::StringVal"* %src_unlowered_ptr,
// %"struct.impala_udf::StringVal"* %dst_unlowered_ptr)
// %anyval_result = load { i64, i8* }, { i64, i8* }* %dst_lowered_ptr
// %6 = extractvalue { i64, i8* } %anyval_result, 0
// %7 = ashr i64 %6, 32
// %8 = trunc i64 %7 to i32
// %9 = insertvalue %"struct.impala::StringValue" zeroinitializer, i32 %8, 1
// %10 = extractvalue { i64, i8* } %anyval_result, 1
// %11 = insertvalue %"struct.impala::StringValue" %9, i8* %10, 0
// store %"struct.impala::StringValue" %11, %"struct.impala::StringValue"* %dst_slot_ptr
// br label %ret
//
// ret: ; preds = %src_not_null, %entry
// ret void
// }
// Emits the UpdateSlot() IR function for one (evaluator, slot) pair: evaluate
// the input expr for the row and, if the value is non-NULL, fold it into the
// aggregation slot (COUNT/MIN/MAX/SUM inline; NDV via the cross-compiled HLL
// functions). Returns NULL (codegen falls back to the interpreted path) for
// unsupported cases. See the IR dumps in the comment above for examples.
llvm::Function* AggregationNode::CodegenUpdateSlot(LlvmCodeGen* codegen,
    AggFnEvaluator* evaluator, SlotDescriptor* slot_desc) {
  // TODO: Fix this DCHECK and Init() once CodegenUpdateSlot() can handle AggFnEvaluator
  // with multiple input expressions (e.g. group_concat).
  DCHECK_EQ(evaluator->input_expr_ctxs().size(), 1);
  ExprContext* input_expr_ctx = evaluator->input_expr_ctxs()[0];
  Expr* input_expr = input_expr_ctx->root();
  // TODO: implement timestamp
  if (input_expr->type().type == TYPE_TIMESTAMP) return NULL;
  Function* agg_expr_fn;
  Status status = input_expr->GetCodegendComputeFn(codegen, &agg_expr_fn);
  if (!status.ok()) {
    VLOG_QUERY << "Could not codegen UpdateSlot(): " << status.GetDetail();
    return NULL;
  }
  DCHECK(agg_expr_fn != NULL);
  PointerType* fn_ctx_ptr_type =
      codegen->GetPtrType(FunctionContextImpl::LLVM_FUNCTIONCONTEXT_NAME);
  PointerType* expr_ctx_ptr_type = codegen->GetPtrType(ExprContext::LLVM_CLASS_NAME);
  StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
  if (tuple_struct == NULL) {
    VLOG_QUERY << "Could not codegen UpdateSlot(): could not generate tuple struct.";
    return NULL;
  }
  PointerType* tuple_ptr_type = codegen->GetPtrType(tuple_struct);
  PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
  // Create UpdateSlot prototype
  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateSlot", codegen->void_type());
  prototype.AddArgument(LlvmCodeGen::NamedVariable("fn_ctx", fn_ctx_ptr_type));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("expr_ctx", expr_ctx_ptr_type));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", tuple_ptr_type));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
  LlvmBuilder builder(codegen->context());
  Value* args[4];
  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
  Value* fn_ctx_arg = args[0];
  Value* expr_ctx_arg = args[1];
  Value* agg_tuple_arg = args[2];
  Value* row_arg = args[3];
  BasicBlock* src_not_null_block =
      BasicBlock::Create(codegen->context(), "src_not_null", fn);
  BasicBlock* ret_block = BasicBlock::Create(codegen->context(), "ret", fn);
  // Call expr function to get src slot value
  Value* agg_expr_fn_args[] = { expr_ctx_arg, row_arg };
  CodegenAnyVal src = CodegenAnyVal::CreateCallWrapped(
      codegen, &builder, input_expr->type(), agg_expr_fn, agg_expr_fn_args, "src");
  // NULL input values never update the slot: branch straight to 'ret'.
  Value* src_is_null = src.GetIsNull();
  builder.CreateCondBr(src_is_null, ret_block, src_not_null_block);
  // Src slot is not null, update dst_slot
  builder.SetInsertPoint(src_not_null_block);
  Value* dst_ptr = builder.CreateStructGEP(NULL, agg_tuple_arg,
      slot_desc->llvm_field_idx(), "dst_slot_ptr");
  Value* result = NULL;
  if (slot_desc->is_nullable()) {
    // Dst is NULL, just update dst slot to src slot and clear null bit
    slot_desc->CodegenSetNullIndicator(
        codegen, &builder, agg_tuple_arg, codegen->false_value());
  }
  // Update the slot
  Value* dst_value = builder.CreateLoad(dst_ptr, "dst_val");
  switch (evaluator->agg_op()) {
    case AggFnEvaluator::COUNT:
      // Merge adds the partial count; non-merge increments by one per row.
      if (evaluator->is_merge()) {
        result = builder.CreateAdd(dst_value, src.GetVal(), "count_sum");
      } else {
        result = builder.CreateAdd(dst_value,
            codegen->GetIntConstant(TYPE_BIGINT, 1), "count_inc");
      }
      break;
    case AggFnEvaluator::MIN: {
      Function* min_fn = codegen->CodegenMinMax(slot_desc->type(), true);
      Value* min_args[] = { dst_value, src.GetVal() };
      result = builder.CreateCall(min_fn, min_args, "min_value");
      break;
    }
    case AggFnEvaluator::MAX: {
      Function* max_fn = codegen->CodegenMinMax(slot_desc->type(), false);
      Value* max_args[] = { dst_value, src.GetVal() };
      result = builder.CreateCall(max_fn, max_args, "max_value");
      break;
    }
    case AggFnEvaluator::SUM:
      // Float types need the floating-point add instruction.
      if (slot_desc->type().type == TYPE_FLOAT || slot_desc->type().type == TYPE_DOUBLE) {
        result = builder.CreateFAdd(dst_value, src.GetVal());
      } else {
        result = builder.CreateAdd(dst_value, src.GetVal());
      }
      break;
    case AggFnEvaluator::NDV: {
      // NDV's intermediate is an HLL buffer stored as a STRING slot.
      DCHECK_EQ(slot_desc->type().type, TYPE_STRING);
      IRFunction::Type ir_function_type = evaluator->is_merge() ? IRFunction::HLL_MERGE
          : GetHllUpdateFunction2(input_expr->type());
      Function* hll_fn = codegen->GetFunction(ir_function_type, false);
      // Create pointer to src_anyval to pass to HllUpdate() function. We must use the
      // unlowered type.
      Value* src_unlowered_ptr = src.GetUnloweredPtr("src_unlowered_ptr");
      // Create StringVal* intermediate argument from dst_value
      CodegenAnyVal dst_stringval =
          CodegenAnyVal::GetNonNullVal(codegen, &builder, TYPE_STRING, "dst_stringval");
      dst_stringval.SetFromRawValue(dst_value);
      // Create pointer to dst_stringval to pass to HllUpdate() function. We must use
      // the unlowered type.
      Value* dst_lowered_ptr = dst_stringval.GetLoweredPtr("dst_lowered_ptr");
      Type* dst_unlowered_ptr_type =
          codegen->GetPtrType(CodegenAnyVal::GetUnloweredType(codegen, TYPE_STRING));
      Value* dst_unlowered_ptr = builder.CreateBitCast(
          dst_lowered_ptr, dst_unlowered_ptr_type, "dst_unlowered_ptr");
      // Call 'hll_fn'
      builder.CreateCall(
          hll_fn, ArrayRef<Value*>({fn_ctx_arg, src_unlowered_ptr, dst_unlowered_ptr}));
      // Convert StringVal intermediate 'dst_arg' back to StringValue
      Value* anyval_result = builder.CreateLoad(dst_lowered_ptr, "anyval_result");
      result = CodegenAnyVal(codegen, &builder, TYPE_STRING, anyval_result)
          .ToNativeValue();
      break;
    }
    default:
      DCHECK(false) << "bad aggregate operator: " << evaluator->agg_op();
  }
  builder.CreateStore(result, dst_ptr);
  builder.CreateBr(ret_block);
  builder.SetInsertPoint(ret_block);
  builder.CreateRetVoid();
  return codegen->FinalizeFunction(fn);
}
// IR codegen for the UpdateTuple loop. This loop is query specific and
// based on the aggregate functions. The function signature must match the non-
// codegen'd UpdateTuple exactly.
// For the query:
// select count(*), count(int_col), sum(double_col) the IR looks like:
//
// ; Function Attrs: alwaysinline
// define void @UpdateTuple(%"class.impala::AggregationNode"* %this_ptr,
// %"class.impala::Tuple"* %agg_tuple,
// %"class.impala::TupleRow"* %tuple_row) #34 {
// entry:
// %tuple =
// bitcast %"class.impala::Tuple"* %agg_tuple to { i8, [7 x i8], i64, i64, double }*
// %src_slot = getelementptr inbounds { i8, [7 x i8], i64, i64, double },
// { i8, [7 x i8], i64, i64, double }* %tuple, i32 0, i32 2
// %count_star_val = load i64, i64* %src_slot
// %count_star_inc = add i64 %count_star_val, 1
// store i64 %count_star_inc, i64* %src_slot
// %0 = call %"class.impala_udf::FunctionContext"*
// @_ZNK6impala15AggregationNode11GetAggFnCtxEi(
// %"class.impala::AggregationNode"* %this_ptr, i32 1)
// %1 = call %"class.impala::ExprContext"*
// @_ZNK6impala15AggregationNode13GetAggExprCtxEi(
// %"class.impala::AggregationNode"* %this_ptr, i32 1)
// call void @UpdateSlot(%"class.impala_udf::FunctionContext"* %0,
// %"class.impala::ExprContext"* %1,
// { i8, [7 x i8], i64, i64, double }* %tuple,
// %"class.impala::TupleRow"* %tuple_row)
// %2 = call %"class.impala_udf::FunctionContext"*
// @_ZNK6impala15AggregationNode11GetAggFnCtxEi(
// %"class.impala::AggregationNode"* %this_ptr, i32 2)
// %3 = call %"class.impala::ExprContext"*
// @_ZNK6impala15AggregationNode13GetAggExprCtxEi(
// %"class.impala::AggregationNode"* %this_ptr, i32 2)
// call void @UpdateSlot.3(%"class.impala_udf::FunctionContext"* %2,
// %"class.impala::ExprContext"* %3,
// { i8, [7 x i8], i64, i64, double }* %tuple,
// %"class.impala::TupleRow"* %tuple_row)
// ret void
// }
// Emits the UpdateTuple() IR function: for each aggregate slot, either inlines
// a count(*) increment or codegens an UpdateSlot() helper and calls it with the
// function/expr contexts fetched through GetAggFnCtx()/GetAggExprCtx(). Returns
// NULL when any slot/evaluator is unsupported, which disables codegen for the
// whole node.
Function* AggregationNode::CodegenUpdateTuple(LlvmCodeGen* codegen) {
  SCOPED_TIMER(codegen->codegen_timer());
  // First pass: verify every aggregate slot is codegen-able before emitting IR.
  int j = probe_expr_ctxs_.size();
  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
    AggFnEvaluator* evaluator = aggregate_evaluators_[i];
    // Timestamp and char are never supported. NDV supports decimal and string but no
    // other functions.
    // TODO: the other aggregate functions might work with decimal as-is
    if (slot_desc->type().type == TYPE_TIMESTAMP || slot_desc->type().type == TYPE_CHAR ||
        (evaluator->agg_op() != AggFnEvaluator::NDV &&
        (slot_desc->type().type == TYPE_DECIMAL ||
         slot_desc->type().type == TYPE_STRING ||
         slot_desc->type().type == TYPE_VARCHAR))) {
      VLOG_QUERY << "Could not codegen UpdateIntermediateTuple because "
                 << "string, char, timestamp and decimal are not yet supported.";
      return NULL;
    }
    // Don't codegen things that aren't builtins (for now)
    if (!evaluator->is_builtin()) return NULL;
  }
  if (intermediate_tuple_desc_->GetLlvmStruct(codegen) == NULL) {
    VLOG_QUERY << "Could not codegen UpdateTuple because we could"
               << "not generate a matching llvm struct for the intermediate tuple.";
    return NULL;
  }
  // Get the types to match the UpdateTuple signature
  Type* agg_node_type = codegen->GetType(AggregationNode::LLVM_CLASS_NAME);
  Type* agg_tuple_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
  DCHECK(agg_node_type != NULL);
  DCHECK(agg_tuple_type != NULL);
  DCHECK(tuple_row_type != NULL);
  PointerType* agg_node_ptr_type = codegen->GetPtrType(agg_node_type);
  PointerType* agg_tuple_ptr_type = codegen->GetPtrType(agg_tuple_type);
  PointerType* tuple_row_ptr_type = codegen->GetPtrType(tuple_row_type);
  // Signature for UpdateTuple is
  //   void UpdateTuple(AggregationNode* this, Tuple* tuple, TupleRow* row)
  // This signature needs to match the non-codegen'd UpdateTuple() exactly
  // (the implicit 'this' plus its two parameters).
  StructType* tuple_struct = intermediate_tuple_desc_->GetLlvmStruct(codegen);
  if (tuple_struct == NULL) {
    VLOG_QUERY << "Could not codegen UpdateSlot(): could not generate tuple struct.";
    return NULL;
  }
  PointerType* tuple_ptr = PointerType::get(tuple_struct, 0);
  LlvmCodeGen::FnPrototype prototype(codegen, "UpdateTuple", codegen->void_type());
  prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", agg_node_ptr_type));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("agg_tuple", agg_tuple_ptr_type));
  prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple_row", tuple_row_ptr_type));
  LlvmBuilder builder(codegen->context());
  Value* args[3];
  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
  // Cast the parameter types to the internal llvm runtime types.
  // TODO: get rid of this by using right type in function signature
  Value* this_arg = args[0];
  Value* agg_tuple_arg = builder.CreateBitCast(args[1], tuple_ptr, "tuple");
  Value* row_arg = args[2];
  Function* get_fn_ctx_fn = codegen->GetFunction(IRFunction::AGG_NODE_GET_FN_CTX, false);
  DCHECK(get_fn_ctx_fn != NULL);
  Function* get_expr_ctx_fn =
      codegen->GetFunction(IRFunction::AGG_NODE_GET_EXPR_CTX, false);
  DCHECK(get_expr_ctx_fn != NULL);
  // Loop over each expr and generate the IR for that slot. If the expr is not
  // count(*), generate a helper IR function to update the slot and call that.
  j = probe_expr_ctxs_.size();
  for (int i = 0; i < aggregate_evaluators_.size(); ++i, ++j) {
    SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[j];
    AggFnEvaluator* evaluator = aggregate_evaluators_[i];
    if (evaluator->is_count_star()) {
      // TODO: we should be able to hoist this up to the loop over the batch and just
      // increment the slot by the number of rows in the batch.
      int field_idx = slot_desc->llvm_field_idx();
      Value* const_one = codegen->GetIntConstant(TYPE_BIGINT, 1);
      Value* slot_ptr = builder.CreateStructGEP(NULL, agg_tuple_arg, field_idx,
          "src_slot");
      Value* slot_loaded = builder.CreateLoad(slot_ptr, "count_star_val");
      Value* count_inc = builder.CreateAdd(slot_loaded, const_one, "count_star_inc");
      builder.CreateStore(count_inc, slot_ptr);
    } else {
      Function* update_slot_fn = CodegenUpdateSlot(codegen, evaluator, slot_desc);
      if (update_slot_fn == NULL) return NULL;
      // Call GetAggFnCtx() to get the function context.
      Value* get_fn_ctx_args[] = { this_arg, codegen->GetIntConstant(TYPE_INT, i) };
      Value* fn_ctx = builder.CreateCall(get_fn_ctx_fn, get_fn_ctx_args);
      // Call GetAggExprCtx() to get the expression context.
      DCHECK(agg_expr_ctxs_[i] != NULL);
      Value* get_expr_ctx_args[] = { this_arg, codegen->GetIntConstant(TYPE_INT, i) };
      Value* expr_ctx = builder.CreateCall(get_expr_ctx_fn, get_expr_ctx_args);
      Value* update_slot_args[] = { fn_ctx, expr_ctx, agg_tuple_arg, row_arg };
      builder.CreateCall(update_slot_fn, update_slot_args);
    }
  }
  builder.CreateRetVoid();
  // CodegenProcessRowBatch() does the final optimizations.
  return codegen->FinalizeFunction(fn);
}
// Takes the cross-compiled ProcessRowBatch{With,No}Grouping IR and specializes
// it by replacing its call sites (hash/equals/eval-row for the grouping case,
// UpdateTuple for both) with the codegen'd versions, then finalizes the result.
// The DCHECK_EQ counts pin the expected number of call sites in the
// cross-compiled function; a mismatch means the IR and this code are out of
// sync.
Function* AggregationNode::CodegenProcessRowBatch(LlvmCodeGen* codegen,
    Function* update_tuple_fn) {
  SCOPED_TIMER(codegen->codegen_timer());
  DCHECK(update_tuple_fn != NULL);
  // Get the cross compiled update row batch function
  IRFunction::Type ir_fn = (!probe_expr_ctxs_.empty() ?
      IRFunction::AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING :
      IRFunction::AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING);
  Function* process_batch_fn = codegen->GetFunction(ir_fn, true);
  if (process_batch_fn == NULL) {
    LOG(ERROR) << "Could not find AggregationNode::ProcessRowBatch in module.";
    return NULL;
  }
  int replaced;
  if (!probe_expr_ctxs_.empty()) {
    // Aggregation w/o grouping does not use a hash table.
    // Codegen for hash
    Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(codegen);
    if (hash_fn == NULL) return NULL;
    // Codegen HashTable::Equals
    Function* equals_fn = hash_tbl_->CodegenEquals(codegen);
    if (equals_fn == NULL) return NULL;
    // Codegen for evaluating build rows
    Function* eval_build_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, true);
    if (eval_build_row_fn == NULL) return NULL;
    // Codegen for evaluating probe rows
    Function* eval_probe_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, false);
    if (eval_probe_row_fn == NULL) return NULL;
    // Replace call sites
    replaced =
        codegen->ReplaceCallSites(process_batch_fn, eval_build_row_fn, "EvalBuildRow");
    DCHECK_EQ(replaced, 1);
    replaced =
        codegen->ReplaceCallSites(process_batch_fn, eval_probe_row_fn, "EvalProbeRow");
    DCHECK_EQ(replaced, 1);
    replaced = codegen->ReplaceCallSites(process_batch_fn, hash_fn, "HashCurrentRow");
    DCHECK_EQ(replaced, 2);
    replaced = codegen->ReplaceCallSites(process_batch_fn, equals_fn, "Equals");
    DCHECK_EQ(replaced, 1);
  }
  replaced = codegen->ReplaceCallSites(process_batch_fn, update_tuple_fn, "UpdateTuple");
  DCHECK_EQ(replaced, 1);
  return codegen->FinalizeFunction(process_batch_fn);
}
}