blob: 584abbecebb57a045dc61270409ed111e321a6cd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exprs/agg-fn-evaluator.h"
#include <sstream>
#include "codegen/llvm-codegen.h"
#include "common/logging.h"
#include "exprs/aggregate-functions.h"
#include "exprs/anyval-util.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-fn-call.h"
#include "gutil/strings/substitute.h"
#include "runtime/date-value.h"
#include "runtime/lib-cache.h"
#include "runtime/raw-value.h"
#include "runtime/runtime-state.h"
#include "runtime/string-value.inline.h"
#include "udf/udf-internal.h"
#include "util/debug-util.h"
#include <thrift/protocol/TDebugProtocol.h>
#include "common/names.h"
using namespace impala;
using namespace impala_udf;
using std::move;
// typedef for builtin aggregate functions. Unfortunately, these type defs don't
// really work since the actual builtin is implemented not in terms of the base
// AnyVal* type. Due to this, there are lots of casts when we use these typedefs.
// TODO: these typedefs exists as wrappers to go from (TupleRow, Tuple) to the
// types the aggregation functions need. This needs to be done with codegen instead.
typedef void (*InitFn)(FunctionContext*, AnyVal*);
typedef void (*UpdateFn0)(FunctionContext*, AnyVal*);
typedef void (*UpdateFn1)(FunctionContext*, const AnyVal&, AnyVal*);
typedef void (*UpdateFn2)(FunctionContext*, const AnyVal&, const AnyVal&, AnyVal*);
typedef void (*UpdateFn3)(FunctionContext*, const AnyVal&, const AnyVal&,
const AnyVal&, AnyVal*);
typedef void (*UpdateFn4)(FunctionContext*, const AnyVal&, const AnyVal&,
const AnyVal&, const AnyVal&, AnyVal*);
typedef void (*UpdateFn5)(FunctionContext*, const AnyVal&, const AnyVal&,
const AnyVal&, const AnyVal&, const AnyVal&, AnyVal*);
typedef void (*UpdateFn6)(FunctionContext*, const AnyVal&, const AnyVal&,
const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, AnyVal*);
typedef void (*UpdateFn7)(FunctionContext*, const AnyVal&, const AnyVal&,
const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, AnyVal*);
typedef void (*UpdateFn8)(FunctionContext*, const AnyVal&, const AnyVal&,
const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&,
const AnyVal&, AnyVal*);
typedef StringVal (*SerializeFn)(FunctionContext*, const StringVal&);
typedef AnyVal (*GetValueFn)(FunctionContext*, const AnyVal&);
typedef AnyVal (*FinalizeFn)(FunctionContext*, const AnyVal&);
// Name of this class as it appears in LLVM IR ("class.impala::AggFnEvaluator").
// NOTE(review): presumably used by codegen to look up the AggFnEvaluator struct type in
// the IR module; confirm against the callers of LLVM_CLASS_NAME.
const char* AggFnEvaluator::LLVM_CLASS_NAME = "class.impala::AggFnEvaluator";
// Stores the aggregate function this evaluator evaluates. In this file it is called by
// Create() with is_clone == false and by ShallowClone() with is_clone == true. All other
// state is filled in by those functions.
AggFnEvaluator::AggFnEvaluator(const AggFn& agg_fn, bool is_clone)
  : is_clone_(is_clone), agg_fn_(agg_fn) {}
// Destructor. Close() must have been called before the evaluator is destroyed; this is
// only checked in debug builds (DCHECK).
AggFnEvaluator::~AggFnEvaluator() {
  DCHECK(closed_);
}
// Returns the descriptor of the tuple slot that holds this aggregate's intermediate
// value. Forwards to the underlying AggFn.
const SlotDescriptor& AggFnEvaluator::intermediate_slot_desc() const {
  return agg_fn_.intermediate_slot_desc();
}
// Returns the type of this aggregate's intermediate value. Forwards to the underlying
// AggFn.
const ColumnType& AggFnEvaluator::intermediate_type() const {
  return agg_fn_.intermediate_type();
}
// Creates, in 'pool', an evaluator for 'agg_fn'. This includes:
//  - its FunctionContext;
//  - one ScalarExprEvaluator per input expression;
//  - staging AnyVals for each input, for the intermediate value and for the merge input.
// The staging values are allocated from 'expr_perm_pool'. Both pools are also handed to
// the FunctionContext and to the input evaluators.
// On success '*result' points to the new evaluator. On failure the partially built
// evaluator is closed, '*result' stays null and the error status is returned.
Status AggFnEvaluator::Create(const AggFn& agg_fn, RuntimeState* state, ObjectPool* pool,
    MemPool* expr_perm_pool, MemPool* expr_results_pool, AggFnEvaluator** result) {
  *result = nullptr;
  // Create a new AggFn evaluator.
  AggFnEvaluator* agg_fn_eval = pool->Add(new AggFnEvaluator(agg_fn, false));
  agg_fn_eval->agg_fn_ctx_.reset(FunctionContextImpl::CreateContext(state, expr_perm_pool,
      expr_results_pool, agg_fn.GetIntermediateTypeDesc(), agg_fn.GetOutputTypeDesc(),
      agg_fn.arg_type_descs()));
  Status status;
  // Create the evaluators for the input expressions.
  for (const ScalarExpr* input_expr : agg_fn.children()) {
    ScalarExprEvaluator* input_eval = nullptr;
    status = ScalarExprEvaluator::Create(
        *input_expr, state, pool, expr_perm_pool, expr_results_pool, &input_eval);
    if (UNLIKELY(!status.ok())) goto cleanup;
    agg_fn_eval->input_evals_.push_back(input_eval);
    DCHECK(&input_eval->root() == input_expr);
    // Initialized so that a failed allocation never copies an indeterminate pointer into
    // 'staging_input_vals_'. The value is still recorded (possibly null) before the
    // status check, so the staging vector stays the same length as 'input_evals_'.
    AnyVal* staging_input_val = nullptr;
    status = AllocateAnyVal(state, expr_perm_pool, input_expr->type(),
        "Could not allocate aggregate expression input value", &staging_input_val);
    agg_fn_eval->staging_input_vals_.push_back(staging_input_val);
    if (UNLIKELY(!status.ok())) goto cleanup;
  }
  DCHECK_EQ(agg_fn.GetNumChildren(), agg_fn_eval->input_evals_.size());
  DCHECK_EQ(agg_fn_eval->staging_input_vals_.size(), agg_fn_eval->input_evals_.size());
  status = AllocateAnyVal(state, expr_perm_pool, agg_fn.intermediate_type(),
      "Could not allocate aggregate expression intermediate value",
      &(agg_fn_eval->staging_intermediate_val_));
  if (UNLIKELY(!status.ok())) goto cleanup;
  status = AllocateAnyVal(state, expr_perm_pool, agg_fn.intermediate_type(),
      "Could not allocate aggregate expression merge input value",
      &(agg_fn_eval->staging_merge_input_val_));
  if (UNLIKELY(!status.ok())) goto cleanup;
  if (agg_fn.is_merge()) {
    DCHECK_EQ(agg_fn_eval->staging_input_vals_.size(), 1)
        << "Merge should only have 1 input.";
  }
  *result = agg_fn_eval;
  return Status::OK();
cleanup:
  // The evaluator is owned by 'pool'; closing it here releases what was created so far
  // and satisfies the destructor's closed_ check.
  DCHECK(!status.ok());
  agg_fn_eval->Close(state);
  return status;
}
// Creates one evaluator per entry of 'agg_fns', in order, and appends each to 'evals'
// (see the single-AggFn overload). Returns the first error encountered. Evaluators
// created before the failing one remain in 'evals'.
Status AggFnEvaluator::Create(const vector<AggFn*>& agg_fns, RuntimeState* state,
    ObjectPool* pool, MemPool* expr_perm_pool, MemPool* expr_results_pool,
    vector<AggFnEvaluator*>* evals) {
  for (size_t idx = 0; idx < agg_fns.size(); ++idx) {
    AggFnEvaluator* new_eval = nullptr;
    RETURN_IF_ERROR(AggFnEvaluator::Create(*agg_fns[idx], state, pool, expr_perm_pool,
        expr_results_pool, &new_eval));
    evals->push_back(new_eval);
  }
  return Status::OK();
}
// Opens the input evaluators, then evaluates the constant value (if any) of every input
// argument and hands the results to the UDA's FunctionContext. Constants can only be
// evaluated after the input exprs are opened, which is why both steps happen here.
// Only the first call does any work. 'opened_' is set before the inputs are opened, so a
// repeated call returns OK even if the first call failed.
Status AggFnEvaluator::Open(RuntimeState* state) {
  if (opened_) return Status::OK();
  opened_ = true;
  RETURN_IF_ERROR(ScalarExprEvaluator::Open(input_evals_, state));
  const int num_inputs = input_evals_.size();
  vector<AnyVal*> const_vals(num_inputs, nullptr);
  for (int idx = 0; idx < num_inputs; ++idx) {
    RETURN_IF_ERROR(input_evals_[idx]->GetConstValue(
        state, *(agg_fn_.GetChild(idx)), &const_vals[idx]));
  }
  agg_fn_ctx_->impl()->SetConstantArgs(move(const_vals));
  return Status::OK();
}
// Opens every evaluator in 'evals', in order, and returns the first error encountered.
Status AggFnEvaluator::Open(
    const vector<AggFnEvaluator*>& evals, RuntimeState* state) {
  for (size_t idx = 0; idx < evals.size(); ++idx) {
    RETURN_IF_ERROR(evals[idx]->Open(state));
  }
  return Status::OK();
}
// Releases this evaluator's resources. Only the first call has any effect.
// A clone produced by ShallowClone() shares its input evaluators with the evaluator it
// was cloned from. Only a non-clone therefore closes them; a clone just drops its
// references.
void AggFnEvaluator::Close(RuntimeState* state) {
  if (!closed_) {
    closed_ = true;
    const bool owns_input_evals = !is_clone_;
    if (owns_input_evals) ScalarExprEvaluator::Close(input_evals_, state);
    agg_fn_ctx_->impl()->Close();
    agg_fn_ctx_.reset();
    input_evals_.clear();
  }
}
void AggFnEvaluator::Close(
const vector<AggFnEvaluator*>& evals, RuntimeState* state) {
for (AggFnEvaluator* eval : evals) eval->Close(state);
}
// Writes the value in 'src' into the slot described by 'dst_slot_desc' in tuple 'dst'.
// - A null 'src' only sets the slot's null indicator.
// - Otherwise the null indicator is cleared and the value is converted from its *Val
//   form into the slot's in-tuple representation.
// - For CHAR and FIXED_UDA_INTERMEDIATE slots the UDA writes directly into the slot's
//   inline buffer, so nothing is copied. Instead this checks that the StringVal still
//   points at the slot, and reports an error through the FunctionContext if it does not.
void AggFnEvaluator::SetDstSlot(const AnyVal* src, const SlotDescriptor& dst_slot_desc,
    Tuple* dst) {
  if (src->is_null) {
    dst->SetNull(dst_slot_desc.null_indicator_offset());
    return;
  }
  dst->SetNotNull(dst_slot_desc.null_indicator_offset());
  void* slot = dst->GetSlot(dst_slot_desc.tuple_offset());
  switch (dst_slot_desc.type().type) {
    case TYPE_NULL:
      return;
    case TYPE_BOOLEAN:
      *reinterpret_cast<bool*>(slot) = reinterpret_cast<const BooleanVal*>(src)->val;
      return;
    case TYPE_TINYINT:
      *reinterpret_cast<int8_t*>(slot) = reinterpret_cast<const TinyIntVal*>(src)->val;
      return;
    case TYPE_SMALLINT:
      *reinterpret_cast<int16_t*>(slot) = reinterpret_cast<const SmallIntVal*>(src)->val;
      return;
    case TYPE_INT:
      *reinterpret_cast<int32_t*>(slot) = reinterpret_cast<const IntVal*>(src)->val;
      return;
    case TYPE_BIGINT:
      *reinterpret_cast<int64_t*>(slot) = reinterpret_cast<const BigIntVal*>(src)->val;
      return;
    case TYPE_FLOAT:
      *reinterpret_cast<float*>(slot) = reinterpret_cast<const FloatVal*>(src)->val;
      return;
    case TYPE_DOUBLE:
      *reinterpret_cast<double*>(slot) = reinterpret_cast<const DoubleVal*>(src)->val;
      return;
    case TYPE_STRING:
    case TYPE_VARCHAR:
      *reinterpret_cast<StringValue*>(slot) =
          StringValue::FromStringVal(*reinterpret_cast<const StringVal*>(src));
      return;
    case TYPE_CHAR:
    case TYPE_FIXED_UDA_INTERMEDIATE:
      if (UNLIKELY(slot != reinterpret_cast<const StringVal*>(src)->ptr)) {
        agg_fn_ctx_->SetError(Substitute("UDA should not set pointer of $0 intermediate",
            dst_slot_desc.type().DebugString()).c_str());
      }
      return;
    case TYPE_TIMESTAMP:
      *reinterpret_cast<TimestampValue*>(slot) = TimestampValue::FromTimestampVal(
          *reinterpret_cast<const TimestampVal*>(src));
      return;
    case TYPE_DECIMAL:
      switch (dst_slot_desc.type().GetByteSize()) {
        case 4:
          *reinterpret_cast<int32_t*>(slot) =
              reinterpret_cast<const DecimalVal*>(src)->val4;
          return;
        case 8:
          *reinterpret_cast<int64_t*>(slot) =
              reinterpret_cast<const DecimalVal*>(src)->val8;
          return;
        case 16:
#if __BYTE_ORDER == __LITTLE_ENDIAN
          // On little endian, &val4, &val8, &val16 are the same address.
          // This code seems to trip up clang causing it to generate code that crashes.
          // Be careful when modifying this. See IMPALA-959 for more details.
          // I suspect an issue with xmm registers not reading from aligned memory.
          memcpy(slot, &reinterpret_cast<const DecimalVal*>(src)->val16, 16);
#else
          DCHECK(false) << "Not implemented.";
#endif
          return;
        default:
          break;
      }
      // Only reached for an unsupported decimal width. Return explicitly: without this,
      // control would fall through into the TYPE_DATE case below and write a DateValue
      // into the decimal slot.
      DCHECK(false) << "NYI: " << dst_slot_desc.type();
      return;
    case TYPE_DATE:
      *reinterpret_cast<DateValue*>(slot) =
          DateValue::FromDateVal(*reinterpret_cast<const DateVal*>(src));
      return;
    default:
      DCHECK(false) << "NYI: " << dst_slot_desc.type();
  }
}
// Interpreted implementation of Init(); the codegen'd path replaces this function.
// Initializes the intermediate value for this aggregate in 'dst' by calling the AggFn's
// init function. The result is written into the intermediate slot, and the
// FunctionContext's update/remove counters are reset to zero.
void AggFnEvaluator::Init(Tuple* dst) {
  DCHECK(opened_);
  DCHECK(agg_fn_.init_fn_ != nullptr);
  for (ScalarExprEvaluator* input_eval : input_evals_) {
    DCHECK(input_eval->opened());
  }
  const ColumnType& type = intermediate_type();
  const SlotDescriptor& slot_desc = intermediate_slot_desc();
  const bool uses_inline_buffer =
      type.type == TYPE_CHAR || type.type == TYPE_FIXED_UDA_INTERMEDIATE;
  if (uses_inline_buffer) {
    // These intermediates live in a fixed-length buffer inside the tuple, and the UDA
    // writes to it directly. Point the staging StringVal at that buffer.
    void* buffer = dst->GetSlot(slot_desc.tuple_offset());
    StringVal* staged = reinterpret_cast<StringVal*>(staging_intermediate_val_);
    staged->is_null = dst->IsNull(slot_desc.null_indicator_offset());
    staged->ptr = reinterpret_cast<uint8_t*>(buffer);
    staged->len = type.len;
  }
  InitFn init_fn = reinterpret_cast<InitFn>(agg_fn_.init_fn_);
  init_fn(agg_fn_ctx_.get(), staging_intermediate_val_);
  SetDstSlot(staging_intermediate_val_, slot_desc, dst);
  auto* ctx_impl = agg_fn_ctx_->impl();
  ctx_impl->set_num_updates(0);
  ctx_impl->set_num_removes(0);
}
// Loads the value of the slot 'desc' in 'tuple' into the AnyVal 'dst'. A null slot is
// passed to AnyValUtil::SetAnyVal as a null pointer.
static void SetAnyVal(const SlotDescriptor& desc, Tuple* tuple, AnyVal* dst) {
  void* slot = tuple->IsNull(desc.null_indicator_offset())
      ? nullptr
      : tuple->GetSlot(desc.tuple_offset());
  AnyValUtil::SetAnyVal(slot, desc.type(), dst);
}
void AggFnEvaluator::Update(const TupleRow* row, Tuple* dst, void* fn) {
if (fn == nullptr) return;
const SlotDescriptor& slot_desc = intermediate_slot_desc();
SetAnyVal(slot_desc, dst, staging_intermediate_val_);
for (int i = 0; i < input_evals_.size(); ++i) {
void* src_slot = input_evals_[i]->GetValue(row);
DCHECK(&input_evals_[i]->root() == agg_fn_.GetChild(i));
AnyValUtil::SetAnyVal(src_slot, agg_fn_.GetChild(i)->type(), staging_input_vals_[i]);
}
// TODO: this part is not so good and not scalable. It can be replaced with
// codegen but we can also consider leaving it for the first few cases for
// debugging.
switch (input_evals_.size()) {
case 0:
reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx_.get(), staging_intermediate_val_);
break;
case 1:
reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], staging_intermediate_val_);
break;
case 2:
reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1], staging_intermediate_val_);
break;
case 3:
reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], staging_intermediate_val_);
break;
case 4:
reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3], staging_intermediate_val_);
break;
case 5:
reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], staging_intermediate_val_);
break;
case 6:
reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5], staging_intermediate_val_);
break;
case 7:
reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5],
*staging_input_vals_[6], staging_intermediate_val_);
break;
case 8:
reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx_.get(),
*staging_input_vals_[0], *staging_input_vals_[1],
*staging_input_vals_[2], *staging_input_vals_[3],
*staging_input_vals_[4], *staging_input_vals_[5],
*staging_input_vals_[6], *staging_input_vals_[7],
staging_intermediate_val_);
break;
default:
DCHECK(false) << "NYI";
}
SetDstSlot(staging_intermediate_val_, slot_desc, dst);
}
// Merges the intermediate value stored in 'src' into the one stored in 'dst' by calling
// the AggFn's merge function. The merge function always takes exactly one input
// argument: the staged 'src' value.
void AggFnEvaluator::Merge(Tuple* src, Tuple* dst) {
  DCHECK(agg_fn_.merge_fn_ != nullptr);
  const SlotDescriptor& desc = intermediate_slot_desc();
  SetAnyVal(desc, dst, staging_intermediate_val_);
  SetAnyVal(desc, src, staging_merge_input_val_);
  UpdateFn1 merge_fn = reinterpret_cast<UpdateFn1>(agg_fn_.merge_fn_);
  merge_fn(agg_fn_ctx_.get(), *staging_merge_input_val_, staging_intermediate_val_);
  SetDstSlot(staging_intermediate_val_, desc, dst);
}
// Runs 'fn' (a Serialize() or Finalize() function) on the intermediate value stored in
// 'src' and writes the result into the slot 'dst_slot_desc' of 'dst'.
//  - 'fn' null and 'src' == 'dst': nothing to do.
//  - 'fn' null and 'src' != 'dst' (a Finalize() without a function): the intermediate
//    value is copied into 'dst' unchanged. Its type must equal the destination type.
//  - CHAR / FIXED_UDA_INTERMEDIATE results: the UDA may rewrite the inline buffer in
//    place, but the returned pointer must equal the destination slot's address.
//    Otherwise an error is set on the FunctionContext.
void AggFnEvaluator::SerializeOrFinalize(Tuple* src,
    const SlotDescriptor& dst_slot_desc, Tuple* dst, void* fn) {
  const bool have_fn = fn != nullptr;
  if (!have_fn && src == dst) return;

  const SlotDescriptor& src_slot_desc = intermediate_slot_desc();
  void* src_slot = src->IsNull(src_slot_desc.null_indicator_offset())
      ? nullptr
      : src->GetSlot(src_slot_desc.tuple_offset());

  if (!have_fn) {
    DCHECK_EQ(intermediate_type(), dst_slot_desc.type());
    RawValue::Write(src_slot, dst, &dst_slot_desc, nullptr);
    return;
  }

  AnyValUtil::SetAnyVal(src_slot, intermediate_type(), staging_intermediate_val_);
  auto* ctx = agg_fn_ctx_.get();
  AnyVal* input = staging_intermediate_val_;
  switch (dst_slot_desc.type().type) {
    case TYPE_BOOLEAN: {
      const BooleanVal out =
          reinterpret_cast<BooleanVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      SetDstSlot(&out, dst_slot_desc, dst);
      break;
    }
    case TYPE_TINYINT: {
      const TinyIntVal out =
          reinterpret_cast<TinyIntVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      SetDstSlot(&out, dst_slot_desc, dst);
      break;
    }
    case TYPE_SMALLINT: {
      const SmallIntVal out =
          reinterpret_cast<SmallIntVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      SetDstSlot(&out, dst_slot_desc, dst);
      break;
    }
    case TYPE_INT: {
      const IntVal out =
          reinterpret_cast<IntVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      SetDstSlot(&out, dst_slot_desc, dst);
      break;
    }
    case TYPE_BIGINT: {
      const BigIntVal out =
          reinterpret_cast<BigIntVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      SetDstSlot(&out, dst_slot_desc, dst);
      break;
    }
    case TYPE_FLOAT: {
      const FloatVal out =
          reinterpret_cast<FloatVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      SetDstSlot(&out, dst_slot_desc, dst);
      break;
    }
    case TYPE_DOUBLE: {
      const DoubleVal out =
          reinterpret_cast<DoubleVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      SetDstSlot(&out, dst_slot_desc, dst);
      break;
    }
    case TYPE_STRING:
    case TYPE_VARCHAR: {
      const StringVal out =
          reinterpret_cast<StringVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      SetDstSlot(&out, dst_slot_desc, dst);
      break;
    }
    case TYPE_DECIMAL: {
      const DecimalVal out =
          reinterpret_cast<DecimalVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      SetDstSlot(&out, dst_slot_desc, dst);
      break;
    }
    case TYPE_TIMESTAMP: {
      const TimestampVal out =
          reinterpret_cast<TimestampVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      SetDstSlot(&out, dst_slot_desc, dst);
      break;
    }
    case TYPE_CHAR:
    case TYPE_FIXED_UDA_INTERMEDIATE: {
      // The function may rewrite the inline buffer in place but must hand back the same
      // pointer, so only the pointer is checked here; nothing is copied.
      const StringVal out =
          reinterpret_cast<StringVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      if (UNLIKELY(dst->GetSlot(dst_slot_desc.tuple_offset()) != out.ptr)) {
        agg_fn_ctx_->SetError(Substitute("UDA Serialize() and Finalize() must return "
            "same pointer as input for $0 intermediate",
            dst_slot_desc.type().DebugString()).c_str());
      }
      break;
    }
    case TYPE_DATE: {
      const DateVal out =
          reinterpret_cast<DateVal (*)(FunctionContext*, AnyVal*)>(fn)(ctx, input);
      SetDstSlot(&out, dst_slot_desc, dst);
      break;
    }
    default:
      DCHECK(false) << "NYI";
  }
}
// Creates, in 'pool', a clone of this opened evaluator and returns it in '*cloned_eval'.
// - The clone gets its own FunctionContext, cloned from this evaluator's using the given
//   pools.
// - It shares (pointer copies) this evaluator's input evaluators and staging values.
// - It is marked as a clone, so its Close() leaves the shared input evaluators open.
// - It is marked opened.
void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* expr_perm_pool,
    MemPool* expr_results_pool, AggFnEvaluator** cloned_eval) const {
  DCHECK(opened_);
  *cloned_eval = pool->Add(new AggFnEvaluator(agg_fn_, true));
  AggFnEvaluator* clone = *cloned_eval;
  clone->agg_fn_ctx_.reset(
      agg_fn_ctx_->impl()->Clone(expr_perm_pool, expr_results_pool));
  DCHECK_EQ((*cloned_eval)->input_evals_.size(), 0);
  clone->input_evals_ = input_evals_;
  clone->staging_input_vals_ = staging_input_vals_;
  clone->staging_intermediate_val_ = staging_intermediate_val_;
  clone->staging_merge_input_val_ = staging_merge_input_val_;
  clone->opened_ = true;
}
// Shallow-clones every evaluator in 'evals' (see the single-evaluator overload),
// appending the clones to 'cloned_evals' in the same order.
void AggFnEvaluator::ShallowClone(ObjectPool* pool, MemPool* expr_perm_pool,
    MemPool* expr_results_pool, const vector<AggFnEvaluator*>& evals,
    vector<AggFnEvaluator*>* cloned_evals) {
  for (size_t idx = 0; idx < evals.size(); ++idx) {
    AggFnEvaluator* copy = nullptr;
    evals[idx]->ShallowClone(pool, expr_perm_pool, expr_results_pool, &copy);
    cloned_evals->push_back(copy);
  }
}
// Returns one ScopedResultsPool per evaluator in 'evals', in the same order. Each is
// constructed from that evaluator and 'new_results_pool'.
// NOTE(review): presumably each object redirects its evaluator's results pool to
// 'new_results_pool' for its lifetime. Confirm against ScopedResultsPool's declaration.
vector<ScopedResultsPool> ScopedResultsPool::Create(
    const vector<AggFnEvaluator*>& evals, MemPool* new_results_pool) {
  vector<ScopedResultsPool> pools;
  pools.reserve(evals.size());
  for (size_t idx = 0; idx < evals.size(); ++idx) {
    pools.emplace_back(evals[idx], new_results_pool);
  }
  return pools;
}