blob: 2bcaa98be6e6d1567a7f0f39469a822f3b411f1b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/non-grouping-aggregator.h"
#include <sstream>
#include "codegen/llvm-codegen.h"
#include "exec/exec-node.h"
#include "exprs/agg-fn-evaluator.h"
#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
#include "runtime/mem-pool.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "runtime/tuple.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
namespace impala {
NonGroupingAggregator::NonGroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
const TAggregator& taggregator, const DescriptorTbl& descs, int agg_idx)
: Aggregator(exec_node, pool, taggregator, descs,
Substitute("NonGroupingAggregator $0", agg_idx), agg_idx) {}
Status NonGroupingAggregator::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(Aggregator::Prepare(state));
singleton_tuple_pool_.reset(new MemPool(mem_tracker_.get()));
return Status::OK();
}
void NonGroupingAggregator::Codegen(RuntimeState* state) {
LlvmCodeGen* codegen = state->codegen();
DCHECK(codegen != nullptr);
TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
Status codegen_status = CodegenAddBatchImpl(codegen, prefetch_mode);
runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
}
Status NonGroupingAggregator::Open(RuntimeState* state) {
RETURN_IF_ERROR(Aggregator::Open(state));
// Create the single output tuple for this non-grouping agg. This must happen after
// opening the aggregate evaluators.
singleton_output_tuple_ =
ConstructSingletonOutputTuple(agg_fn_evals_, singleton_tuple_pool_.get());
// Check for failures during AggFnEvaluator::Init().
RETURN_IF_ERROR(state->GetQueryStatus());
singleton_output_tuple_returned_ = false;
return Status::OK();
}
Status NonGroupingAggregator::GetNext(
RuntimeState* state, RowBatch* row_batch, bool* eos) {
RETURN_IF_ERROR(QueryMaintenance(state));
// There was no grouping, so evaluate the conjuncts and return the single result row.
// We allow calling GetNext() after eos, so don't return this row again.
if (!singleton_output_tuple_returned_) GetSingletonOutput(row_batch);
singleton_output_tuple_returned_ = true;
*eos = true;
return Status::OK();
}
void NonGroupingAggregator::GetSingletonOutput(RowBatch* row_batch) {
int row_idx = row_batch->AddRow();
TupleRow* row = row_batch->GetRow(row_idx);
row_batch->ClearRow(row);
// The output row batch may reference memory allocated by Serialize() or Finalize(),
// allocating that memory directly from the row batch's pool means we can safely return
// the batch.
vector<ScopedResultsPool> allocate_from_batch_pool =
ScopedResultsPool::Create(agg_fn_evals_, row_batch->tuple_data_pool());
Tuple* output_tuple = GetOutputTuple(
agg_fn_evals_, singleton_output_tuple_, row_batch->tuple_data_pool());
row->SetTuple(agg_idx_, output_tuple);
if (ExecNode::EvalConjuncts(conjunct_evals_.data(), conjunct_evals_.size(), row)) {
row_batch->CommitLastRow();
++num_rows_returned_;
COUNTER_SET(rows_returned_counter_, num_rows_returned_);
}
// Keep the current chunk to amortize the memory allocation over a series
// of Reset()/Open()/GetNext()* calls.
row_batch->tuple_data_pool()->AcquireData(singleton_tuple_pool_.get(), true);
// This node no longer owns the memory for singleton_output_tuple_.
singleton_output_tuple_ = nullptr;
}
void NonGroupingAggregator::Close(RuntimeState* state) {
if (!singleton_output_tuple_returned_) {
GetOutputTuple(agg_fn_evals_, singleton_output_tuple_, singleton_tuple_pool_.get());
}
if (singleton_tuple_pool_.get() != nullptr) singleton_tuple_pool_->FreeAll();
// Must be called after singleton_tuple_pool_ is freed, so that mem_tracker_ can be
// closed.
Aggregator::Close(state);
}
Status NonGroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
SCOPED_TIMER(build_timer_);
RETURN_IF_ERROR(QueryMaintenance(state));
if (add_batch_impl_fn_ != nullptr) {
RETURN_IF_ERROR(add_batch_impl_fn_(this, batch));
} else {
RETURN_IF_ERROR(AddBatchImpl(batch));
}
return Status::OK();
}
Status NonGroupingAggregator::AddBatchStreaming(
RuntimeState* state, RowBatch* out_batch, RowBatch* child_batch, bool* eos) {
*eos = true;
return AddBatch(state, child_batch);
}
Tuple* NonGroupingAggregator::ConstructSingletonOutputTuple(
const vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool) {
Tuple* output_tuple = Tuple::Create(intermediate_tuple_desc_->byte_size(), pool);
InitAggSlots(agg_fn_evals, output_tuple);
return output_tuple;
}
string NonGroupingAggregator::DebugString(int indentation_level) const {
stringstream ss;
DebugString(indentation_level, &ss);
return ss.str();
}
void NonGroupingAggregator::DebugString(int indentation_level, stringstream* out) const {
*out << string(indentation_level * 2, ' ');
*out << "NonGroupingAggregator("
<< "intermediate_tuple_id=" << intermediate_tuple_id_
<< " output_tuple_id=" << output_tuple_id_ << " needs_finalize=" << needs_finalize_
<< " agg_exprs=" << AggFn::DebugString(agg_fns_);
*out << ")";
}
Status NonGroupingAggregator::CodegenAddBatchImpl(
LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) {
llvm::Function* update_tuple_fn;
RETURN_IF_ERROR(CodegenUpdateTuple(codegen, &update_tuple_fn));
// Get the cross compiled update row batch function
IRFunction::Type ir_fn = IRFunction::NON_GROUPING_AGG_ADD_BATCH_IMPL;
llvm::Function* add_batch_impl_fn = codegen->GetFunction(ir_fn, true);
DCHECK(add_batch_impl_fn != nullptr);
int replaced;
replaced = codegen->ReplaceCallSites(add_batch_impl_fn, update_tuple_fn, "UpdateTuple");
DCHECK_GE(replaced, 1);
add_batch_impl_fn = codegen->FinalizeFunction(add_batch_impl_fn);
if (add_batch_impl_fn == nullptr) {
return Status("NonGroupingAggregator::CodegenAddBatchImpl(): codegen'd "
"AddBatchImpl() function failed verification, see log");
}
void** codegened_fn_ptr = reinterpret_cast<void**>(&add_batch_impl_fn_);
codegen->AddFunctionToJit(add_batch_impl_fn, codegened_fn_ptr);
return Status::OK();
}
} // namespace impala