blob: 4b1ca74ce9199204da749a400ede2b570e9602c4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_STREAMING_AGGREGATION_NODE_H
#define IMPALA_EXEC_STREAMING_AGGREGATION_NODE_H
#include <memory>
#include "exec/aggregation-node-base.h"
namespace impala {
class RowBatch;
class RuntimeState;
/// Node for doing streaming partitioned hash aggregation.
///
/// This node consumes the input from child(0) during GetNext() and then passes it to the
/// Aggregator, which does the actual work of aggregating. The aggregator will attempt to
/// aggregate the rows into its hash table, but if there is not enough memory available or
/// if the reduction from the aggregation is not very good, it will 'stream' the rows
/// through and return them without aggregating them instead of spilling. After all of the
/// input has been processed from child(0), subsequent calls to GetNext() will return any
/// rows that were aggregated in the Aggregator's hash table.
///
/// Since the rows returned by GetNext() may be only partially aggregated if there are
/// memory constraints, this is a preliminary aggregation step that functions as an
/// optimization and will always be followed in the plan by an AggregationNode that does
/// the final aggregation.
///
/// This node only supports grouping aggregations.
class StreamingAggregationNode : public AggregationNodeBase {
 public:
  StreamingAggregationNode(
      ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);

  /// Overrides of virtual methods declared further up the class hierarchy.
  virtual Status Open(RuntimeState* state) override;
  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
  virtual void Close(RuntimeState* state) override;
  virtual void DebugString(int indentation_level, std::stringstream* out) const override;

 private:
  /// Produces output rows for the streaming pre-aggregation from the child's rows. Some
  /// rows are aggregated with the hash table; the remaining rows are passed through
  /// after being converted into the intermediate tuple format. 'child_eos_' is set once
  /// all rows from the child have been returned.
  Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT;

  /////////////////////////////////////////
  /// BEGIN: Members that must be Reset()

  /// Batch fetched from the child, which GetNext() passes on to the Aggregators. It is
  /// kept as a member because, when rows are being streamed through, the same batch may
  /// be needed across multiple GetNext() calls.
  std::unique_ptr<RowBatch> child_batch_;

  /// True when 'child_batch_' holds no more rows that still need to be passed to any
  /// Aggregator. In that state the next GetNext() call retrieves another batch, unless
  /// 'child_eos_' is true.
  bool child_batch_processed_ = true;

  /// Set to true when the child has no more rows to process.
  bool child_eos_ = false;

  /// Only relevant if 'replicate_input_' is true: the index in 'aggs_' of the next
  /// Aggregator that 'child_batch_' should be passed into.
  int32_t replicate_agg_idx_ = 0;

  /// END: Members that must be Reset()
  /////////////////////////////////////////
};
} // namespace impala
#endif // IMPALA_EXEC_STREAMING_AGGREGATION_NODE_H