// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/streaming-aggregation-node.h"
#include <sstream>
#include "exec/exec-node-util.h"
#include "gutil/strings/substitute.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "util/runtime-profile-counters.h"
#include "gen-cpp/PlanNodes_types.h"
#include "common/names.h"
namespace impala {
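// The constructor only sanity-checks the plan: a streaming preaggregation has no
// conjuncts, no limit, and every Aggregator runs in streaming mode.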
StreamingAggregationNode::StreamingAggregationNode(
    ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs)
  : AggregationNodeBase(pool, pnode, descs) {
  DCHECK(pnode.tnode_->conjuncts.empty()) << "Preaggs have no conjuncts";
  DCHECK(limit_ == -1) << "Preaggs have no limits";
  for (auto& t_agg : pnode.tnode_->agg_node.aggregators) {
    DCHECK(t_agg.use_streaming_preaggregation);
  }
}

Status StreamingAggregationNode::Open(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  ScopedOpenEventAdder ea(this);
  // Open the child before consuming resources in this node.
  RETURN_IF_ERROR(child(0)->Open(state));
  RETURN_IF_ERROR(ExecNode::Open(state));
  for (auto& agg : aggs_) RETURN_IF_ERROR(agg->Open(state));
  // Streaming preaggregations do all processing in GetNext().
  return Status::OK();
}

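// Produces output rows in two phases. While the child still has rows (or a partially
// consumed child batch remains), input is pulled and pushed through the Aggregators in
// GetRowsStreaming(). Once the child is exhausted, the aggregated rows buffered in each
// Aggregator are returned in turn via Aggregator::GetNext().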
Status StreamingAggregationNode::GetNext(
    RuntimeState* state, RowBatch* row_batch, bool* eos) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  ScopedGetNextEventAdder ea(this, eos);
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
  RETURN_IF_CANCELLED(state);

  if (ReachedLimit()) {
    *eos = true;
    return Status::OK();
  }

  // With multiple Aggregators, each will only set a single tuple per row. We rely on the
  // other tuples to be null to detect which Aggregator set which row.
  if (aggs_.size() > 1) row_batch->ClearTuplePointers();

  if (!child_eos_ || !child_batch_processed_) {
    // For streaming preaggregations, we process rows from the child as we go.
    RETURN_IF_ERROR(GetRowsStreaming(state, row_batch));
    *eos = false;
  } else {
    bool aggregator_eos = false;
    RETURN_IF_ERROR(
        aggs_[curr_output_agg_idx_]->GetNext(state, row_batch, &aggregator_eos));
    if (aggregator_eos) ++curr_output_agg_idx_;
    *eos = curr_output_agg_idx_ >= aggs_.size();
  }

  IncrementNumRowsReturned(row_batch->num_rows());
  COUNTER_SET(rows_returned_counter_, rows_returned());
  return Status::OK();
}

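// Streams rows from the child through the Aggregators into 'out_batch'. Each
// AddBatchStreaming() call either aggregates a row or passes it through. With multiple
// Aggregators, the input batch is either replicated to every Aggregator
// ('replicate_input_') or split into per-Aggregator mini batches via SplitMiniBatches().
// Loops until 'out_batch' has rows or the child is exhausted.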
Status StreamingAggregationNode::GetRowsStreaming(
    RuntimeState* state, RowBatch* out_batch) {
  if (child_batch_ == nullptr) {
    child_batch_.reset(
        new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
  }

  int num_aggs = aggs_.size();
  // Create mini batches.
  vector<unique_ptr<RowBatch>> mini_batches;
  if (!replicate_input_ && num_aggs > 1) {
    for (int i = 0; i < num_aggs; ++i) {
      mini_batches.push_back(make_unique<RowBatch>(
          child(0)->row_desc(), state->batch_size(), mem_tracker()));
    }
  }

  do {
    DCHECK_EQ(out_batch->num_rows(), 0);
    RETURN_IF_CANCELLED(state);

    if (child_batch_processed_) {
      DCHECK_EQ(child_batch_->num_rows(), 0);
      RETURN_IF_ERROR(child(0)->GetNext(state, child_batch_.get(), &child_eos_));
      child_batch_processed_ = false;
    }

    if (num_aggs == 1) {
      RETURN_IF_ERROR(aggs_[0]->AddBatchStreaming(
          state, out_batch, child_batch_.get(), &child_batch_processed_));
      // We're not guaranteed to be able to stream the entirety of 'child_batch_' into
      // 'out_batch' as AddBatchStreaming() will attach all var-len data to 'out_batch'
      // and 'child_batch_' may have been referencing data that wasn't attached to it.
      if (child_batch_processed_) {
        child_batch_->Reset();
      }
      continue;
    }

    if (replicate_input_) {
      bool eos = false;
      while (replicate_agg_idx_ < num_aggs) {
        RETURN_IF_ERROR(aggs_[replicate_agg_idx_]->AddBatchStreaming(
            state, out_batch, child_batch_.get(), &eos));
        if (eos) ++replicate_agg_idx_;
        if (out_batch->AtCapacity()) break;
        // If out_batch isn't full, we must have processed the entire input.
        DCHECK(eos);
      }
      if (replicate_agg_idx_ == num_aggs) {
        replicate_agg_idx_ = 0;
        child_batch_processed_ = true;
        child_batch_->Reset();
      }
      continue;
    }

    // Separate input batch into mini batches destined for the different aggs.
    int num_tuples = child(0)->row_desc()->tuple_descriptors().size();
    DCHECK_EQ(num_aggs, num_tuples);
    int num_rows = child_batch_->num_rows();
    DCHECK_LE(num_rows, out_batch->capacity());
    if (num_rows > 0) {
      RETURN_IF_ERROR(SplitMiniBatches(child_batch_.get(), &mini_batches));

      for (int i = 0; i < num_tuples; ++i) {
        RowBatch* mini_batch = mini_batches[i].get();
        if (mini_batch->num_rows() > 0) {
          bool eos;
          RETURN_IF_ERROR(
              aggs_[i]->AddBatchStreaming(state, out_batch, mini_batch, &eos));
          // child_batch_'s size is <= out_batch's capacity, so even under high memory
          // pressure where all rows are streamed through, we will be able to process
          // the entire input. Note that unlike the single agg case above, this works
          // because any node that hits this path must have been preceded by an
          // aggregation node in 'replicate_input_' mode, and so all memory pointed to
          // by rows in 'child_batch_' will be attached to 'child_batch_'.
          DCHECK(eos);
          mini_batch->Reset();
        }
      }
    }
    child_batch_processed_ = true;
    child_batch_->Reset();
  } while (out_batch->num_rows() == 0 && !child_eos_);

  if (child_eos_ && child_batch_processed_) {
    child(0)->Close(state);
    child_batch_.reset();
    for (auto& agg : aggs_) RETURN_IF_ERROR(agg->InputDone());
  }

  return Status::OK();
}

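// Streaming preaggregations cannot be reset: input rows may already have been streamed
// through to the parent unaggregated, so the output cannot be reproduced. The planner
// never requires a reset here, hence the DCHECK.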
Status StreamingAggregationNode::Reset(RuntimeState* state, RowBatch* row_batch) {
  DCHECK(false) << "Cannot reset preaggregation";
  return Status("Cannot reset preaggregation");
}

void StreamingAggregationNode::Close(RuntimeState* state) {
  if (is_closed()) return;
  // All expr mem allocations should happen in the Aggregator.
  DCHECK(expr_results_pool() == nullptr
      || expr_results_pool()->total_allocated_bytes() == 0);
  for (auto& agg : aggs_) agg->Close(state);
  child_batch_.reset();
  ExecNode::Close(state);
}

void StreamingAggregationNode::DebugString(
    int indentation_level, stringstream* out) const {
  *out << string(indentation_level * 2, ' ');
  *out << "StreamingAggregationNode(";
  for (auto& agg : aggs_) agg->DebugString(indentation_level, out);
  ExecNode::DebugString(indentation_level, out);
  *out << ")";
}

} // namespace impala