blob: 2ce968cd439f5c722678ae067f1154e739613c33 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXPRS_AGG_FN_EVALUATOR_H
#define IMPALA_EXPRS_AGG_FN_EVALUATOR_H
#include <string>
#include <vector>
#include <boost/scoped_ptr.hpp>
#include "common/status.h"
#include "exprs/agg-fn.h"
#include "runtime/descriptors.h"
#include "runtime/lib-cache.h"
#include "runtime/tuple-row.h"
#include "runtime/types.h"
#include "udf/udf.h"
#include "udf/udf-internal.h"
#include "gen-cpp/Exprs_types.h"
#include "gen-cpp/PlanNodes_types.h"
#include "gen-cpp/Types_types.h"
namespace impala {
/// Forward declarations. Where this header uses these types (e.g. MemPool, ObjectPool,
/// RuntimeState, SlotDescriptor, Tuple, TupleRow), it does so only through pointers or
/// references, so an incomplete type is sufficient.
/// NOTE(review): MemTracker, RowDescriptor and TExprNode do not appear to be referenced
/// by the declarations visible in this header -- confirm before removing them.
class MemPool;
class MemTracker;
class ObjectPool;
class RowDescriptor;
class RuntimeState;
class SlotDescriptor;
class Tuple;
class TupleRow;
class TExprNode;
/// AggFnEvaluator is the interface for evaluating aggregate functions during execution.
///
/// AggFnEvaluator contains runtime state and implements wrapper functions which convert
/// the input TupleRow into AnyVal format expected by UDAF functions defined in AggFn.
/// It also evaluates TupleRow against input expressions, stores the results in staging
/// input values which are passed to Update() function to update the intermediate value
/// and handles the merging of intermediate values in the merge phases of execution.
///
/// This class is not threadsafe. An evaluator can be cloned to isolate resource
/// consumption per partition in an aggregation node.
///
class AggFnEvaluator {
public:
/// Creates an AggFnEvaluator object from the aggregate expression 'agg_fn'.
/// The evaluator is added to 'pool' and returned in 'eval'. This will also
/// create a single evaluator for each input expression. All allocations will come
/// from 'mem_pool'. Note that it is the caller's responsibility to call Close() on
/// all evaluators, even if this function returns an error status on initialization
/// failure.
static Status Create(const AggFn& agg_fn, RuntimeState* state, ObjectPool* pool,
MemPool* mem_pool, AggFnEvaluator** eval) WARN_UNUSED_RESULT;
/// Convenience function for creating evaluators for multiple aggregate functions.
/// The evaluators are handed back through 'evals'.
static Status Create(const std::vector<AggFn*>& agg_fns, RuntimeState* state,
ObjectPool* pool, MemPool* mem_pool, std::vector<AggFnEvaluator*>* evals)
WARN_UNUSED_RESULT;
/// Defined out of line. Use Close() to free the resources owned by the evaluator.
~AggFnEvaluator();
/// Initializes the evaluator by calling Open() on all the input expressions' evaluators
/// and caches all constant input arguments.
/// TODO: Move the evaluation of constant input arguments to AggFn setup.
Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
/// Convenience function for opening multiple AggFnEvaluators.
static Status Open(const std::vector<AggFnEvaluator*>& evals,
RuntimeState* state) WARN_UNUSED_RESULT;
/// Used by PartitionedAggregation node to initialize one evaluator per partition.
/// Avoids the overhead of re-initializing an evaluator (e.g. calling GetConstVal()
/// on the input expressions). Cannot be called until after Open() has been called.
/// 'cloned_eval' is a shallow copy of this evaluator: all input values, staging
/// intermediate values and merge values are shared with the original evaluator. Only
/// the FunctionContext 'agg_fn_ctx' is cloned for resource isolation per partition.
/// So, it's not safe to use cloned evaluators concurrently.
void ShallowClone(
ObjectPool* pool, MemPool* mem_pool, AggFnEvaluator** cloned_eval) const;
/// Convenience function for cloning multiple evaluators. The newly cloned evaluators
/// are appended to 'cloned_evals'.
static void ShallowClone(ObjectPool* pool, MemPool* mem_pool,
const std::vector<AggFnEvaluator*>& evals,
std::vector<AggFnEvaluator*>* cloned_evals);
/// Free resources owned by the evaluator.
void Close(RuntimeState* state);
/// Convenience function for closing multiple evaluators.
static void Close(const std::vector<AggFnEvaluator*>& evals, RuntimeState* state);
/// Returns the aggregate function this evaluator was constructed with.
const AggFn& agg_fn() const { return agg_fn_; }
/// Accessors for the evaluator's FunctionContext and for the array of input expression
/// evaluators. Both are defined elsewhere; presumably they expose 'agg_fn_ctx_' and
/// 'input_evals_' respectively -- confirm against the definitions.
FunctionContext* IR_ALWAYS_INLINE agg_fn_ctx() const;
ScalarExprEvaluator* const* IR_ALWAYS_INLINE input_evals() const;
/// Call the initialization function of the AggFn. May update 'dst'.
void Init(Tuple* dst);
/// Updates the intermediate state dst based on adding the input src row. This can be
/// called either to drive the UDA's Update() or Merge() function, depending on whether
/// the AggFn is a merging aggregation.
void Add(const TupleRow* src, Tuple* dst);
/// Updates the intermediate state dst to remove the input src row, i.e. undo
/// Add(src, dst). Only used internally for analytic fn builtins.
void Remove(const TupleRow* src, Tuple* dst);
/// Explicitly does a merge, even if this evaluator is not marked as merging.
/// This is used by the partitioned agg node when it needs to merge spill results.
/// In the non-spilling case, this node would normally not merge.
void Merge(Tuple* src, Tuple* dst);
/// Flattens any intermediate values containing pointers, and frees any memory
/// allocated during the init, update and merge phases.
void Serialize(Tuple* dst);
/// Does one final transformation of the aggregated value in 'agg_val' and stores the
/// result in 'output_val'. Also frees the resources allocated during init, update and
/// merge phases.
void Finalize(Tuple* agg_val, Tuple* output_val);
/// Puts the finalized value from Tuple* src in Tuple* dst just as Finalize() does.
/// However, unlike Finalize(), GetValue() does not clean up state in src.
/// GetValue() can be called repeatedly with the same src. Only used internally for
/// analytic fn builtins. Note that StringVal result is from local allocation (which
/// will be freed in the next QueryMaintenance()) so it needs to be copied out if it
/// needs to survive beyond QueryMaintenance() (e.g. if 'dst' lives in a row batch).
void GetValue(Tuple* src, Tuple* dst);
/// Helper functions for calling the above functions on many evaluators. Each one
/// invokes the corresponding single-evaluator function on every element of 'evals'
/// with the same tuple/row arguments.
static void Init(const std::vector<AggFnEvaluator*>& evals, Tuple* dst);
static void Add(const std::vector<AggFnEvaluator*>& evals, const TupleRow* src,
Tuple* dst);
static void Remove(const std::vector<AggFnEvaluator*>& evals,
const TupleRow* src, Tuple* dst);
static void Serialize(const std::vector<AggFnEvaluator*>& evals,
Tuple* dst);
static void GetValue(const std::vector<AggFnEvaluator*>& evals, Tuple* src,
Tuple* dst);
static void Finalize(const std::vector<AggFnEvaluator*>& evals, Tuple* src,
Tuple* dst);
/// Free local allocations made in UDA functions and input arguments' evals.
void FreeLocalAllocations();
/// Convenience function for freeing the local allocations of multiple evaluators.
static void FreeLocalAllocations(const std::vector<AggFnEvaluator*>& evals);
/// Return a string describing this evaluator / the given evaluators (presumably for
/// debugging and logging; the exact format is defined elsewhere).
std::string DebugString() const;
static std::string DebugString(const std::vector<AggFnEvaluator*>& evals);
/// Name of this class as a C string. NOTE(review): presumably the name used to look up
/// this type in LLVM IR during codegen -- confirm against the definition.
static const char* LLVM_CLASS_NAME;
private:
/// True if the evaluator has been initialized.
bool opened_ = false;
/// True if the evaluator has been closed.
bool closed_ = false;
/// True if this evaluator is created from a ShallowClone() call.
const bool is_clone_;
/// The aggregate function evaluated by this evaluator. Held by reference, so it is
/// not owned by the evaluator.
const AggFn& agg_fn_;
/// Pointer to the MemPool which all allocations come from.
/// Owned by the exec node which owns this evaluator.
MemPool* mem_pool_ = nullptr;
/// This contains runtime state such as constant input arguments to the aggregate
/// functions and a FreePool from which the intermediate values are allocated.
/// Owned by this evaluator.
boost::scoped_ptr<FunctionContext> agg_fn_ctx_;
/// Evaluators for input expressions for this aggregate function.
/// Empty if there is no input expression (e.g. count(*)).
std::vector<ScalarExprEvaluator*> input_evals_;
/// Staging input values used by the interpreted Update() / Merge() paths.
/// It stores the evaluation results of input expressions to be passed to the
/// Update() / Merge() function.
std::vector<impala_udf::AnyVal*> staging_input_vals_;
/// Staging intermediate and merged values used in the interpreted
/// Update() / Merge() paths.
impala_udf::AnyVal* staging_intermediate_val_ = nullptr;
impala_udf::AnyVal* staging_merge_input_val_ = nullptr;
/// Private constructor: use Create() (or ShallowClone()) instead.
AggFnEvaluator(const AggFn& agg_fn, MemPool* mem_pool, bool is_clone);
/// Return the intermediate slot descriptor and the intermediate type of the
/// aggregate function, respectively.
inline const SlotDescriptor& intermediate_slot_desc() const;
inline const ColumnType& intermediate_type() const;
/// The interpreted path for the UDA's Update() function. It sets up the arguments to
/// call 'fn', which is either the 'update_fn_' or 'merge_fn_' of agg_fn_, depending on
/// whether agg_fn_ is a merging aggregation. This converts from the agg-expr signature,
/// taking TupleRow, to the UDA signature, taking AnyVals, by evaluating any input
/// expressions and populating the staging input values.
/// Remove() also goes through this function, passing agg_fn_.remove_fn() as 'fn'.
///
/// Note that this function may be superseded by the codegen'd Update() IR function
/// generated by AggFn::CodegenUpdateOrMergeFunction() when codegen is enabled.
void Update(const TupleRow* row, Tuple* dst, void* fn);
/// Sets up the arguments to call 'fn'. This converts from the agg-expr signature,
/// taking TupleRow to the UDA signature taking AnyVals. Writes the serialize/finalize
/// result to the given destination slot/tuple. 'fn' can be NULL to indicate the src
/// value should simply be written into the destination. Note that StringVal result is
/// from local allocation (which will be freed in the next QueryMaintenance()) so it
/// needs to be copied out if it needs to survive beyond QueryMaintenance() (e.g. if
/// 'dst' lives in a row batch).
void SerializeOrFinalize(Tuple* src, const SlotDescriptor& dst_slot_desc,
Tuple* dst, void* fn);
/// Writes the value in 'src' into the slot of tuple 'dst' described by
/// 'dst_slot_desc'.
inline void SetDstSlot(
const impala_udf::AnyVal* src, const SlotDescriptor& dst_slot_desc, Tuple* dst);
};
/// Adds the input 'row' to the intermediate state in 'dst'. Bumps the update counter
/// kept by the FunctionContext's implementation object, then forwards to Update()
/// with the function returned by agg_fn_.merge_or_update_fn().
inline void AggFnEvaluator::Add(const TupleRow* row, Tuple* dst) {
  auto* const ctx_impl = agg_fn_ctx_->impl();
  ctx_impl->IncrementNumUpdates();
  void* const update_or_merge_fn = agg_fn_.merge_or_update_fn();
  Update(row, dst, update_or_merge_fn);
}
/// Removes the input 'row' from the intermediate state in 'dst'. Bumps the remove
/// counter kept by the FunctionContext's implementation object, then forwards to
/// Update() with the function returned by agg_fn_.remove_fn().
inline void AggFnEvaluator::Remove(const TupleRow* row, Tuple* dst) {
  auto* const ctx_impl = agg_fn_ctx_->impl();
  ctx_impl->IncrementNumRemoves();
  void* const remove_fn = agg_fn_.remove_fn();
  Update(row, dst, remove_fn);
}
/// Serializes the intermediate value held in 'tuple' in place: 'tuple' is both the
/// source and the destination, and the result is written to the AggFn's intermediate
/// slot using agg_fn_.serialize_fn().
inline void AggFnEvaluator::Serialize(Tuple* tuple) {
  const SlotDescriptor& slot_desc = agg_fn_.intermediate_slot_desc();
  void* const serialize_fn = agg_fn_.serialize_fn();
  SerializeOrFinalize(tuple, slot_desc, tuple, serialize_fn);
}
/// Finalizes the aggregated value in 'agg_val' and writes the result to the AggFn's
/// output slot in 'output_val', using agg_fn_.finalize_fn().
inline void AggFnEvaluator::Finalize(Tuple* agg_val, Tuple* output_val) {
  const SlotDescriptor& out_slot_desc = agg_fn_.output_slot_desc();
  void* const finalize_fn = agg_fn_.finalize_fn();
  SerializeOrFinalize(agg_val, out_slot_desc, output_val, finalize_fn);
}
/// Writes the value computed from 'src' to the AggFn's output slot in 'dst', using
/// agg_fn_.get_value_fn(). Same call shape as Finalize(), but with the get-value
/// function instead of the finalize function.
inline void AggFnEvaluator::GetValue(Tuple* src, Tuple* dst) {
  const SlotDescriptor& out_slot_desc = agg_fn_.output_slot_desc();
  void* const get_value_fn = agg_fn_.get_value_fn();
  SerializeOrFinalize(src, out_slot_desc, dst, get_value_fn);
}
/// Calls Init(dst) on every evaluator in 'evals', in vector order.
/// Uses a range-based for loop instead of an 'int' index so that no signed value is
/// compared against the unsigned evals.size().
inline void AggFnEvaluator::Init(const std::vector<AggFnEvaluator*>& evals, Tuple* dst) {
  for (AggFnEvaluator* eval : evals) eval->Init(dst);
}
/// Calls Add(src, dst) on every evaluator in 'evals', in vector order.
/// Uses a range-based for loop instead of an 'int' index so that no signed value is
/// compared against the unsigned evals.size().
inline void AggFnEvaluator::Add(const std::vector<AggFnEvaluator*>& evals,
    const TupleRow* src, Tuple* dst) {
  for (AggFnEvaluator* eval : evals) eval->Add(src, dst);
}
/// Calls Remove(src, dst) on every evaluator in 'evals', in vector order.
/// Uses a range-based for loop instead of an 'int' index so that no signed value is
/// compared against the unsigned evals.size().
inline void AggFnEvaluator::Remove(const std::vector<AggFnEvaluator*>& evals,
    const TupleRow* src, Tuple* dst) {
  for (AggFnEvaluator* eval : evals) eval->Remove(src, dst);
}
/// Calls Serialize(dst) on every evaluator in 'evals', in vector order. All evaluators
/// are handed the same tuple 'dst'.
/// Uses a range-based for loop instead of an 'int' index so that no signed value is
/// compared against the unsigned evals.size().
inline void AggFnEvaluator::Serialize(const std::vector<AggFnEvaluator*>& evals,
    Tuple* dst) {
  for (AggFnEvaluator* eval : evals) eval->Serialize(dst);
}
/// Calls GetValue(src, dst) on every evaluator in 'evals', in vector order.
/// Uses a range-based for loop instead of an 'int' index so that no signed value is
/// compared against the unsigned evals.size().
inline void AggFnEvaluator::GetValue(const std::vector<AggFnEvaluator*>& evals,
    Tuple* src, Tuple* dst) {
  for (AggFnEvaluator* eval : evals) eval->GetValue(src, dst);
}
/// Calls Finalize(agg_val, output_val) on every evaluator in 'evals', in vector order.
/// Uses a range-based for loop instead of an 'int' index so that no signed value is
/// compared against the unsigned evals.size().
inline void AggFnEvaluator::Finalize(const std::vector<AggFnEvaluator*>& evals,
    Tuple* agg_val, Tuple* output_val) {
  for (AggFnEvaluator* eval : evals) {
    eval->Finalize(agg_val, output_val);
  }
}
}
#endif