// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/scoped_ptr.hpp>
#include "exec/exec-node.h"
#include "runtime/bufferpool/buffer-pool.h"
namespace impala {
class KrpcDataStreamRecvr;
class RowBatch;
class ScalarExpr;
class TupleRowComparator;
/// Receiver node for data streams. The data stream receiver is created in Prepare()
/// and closed in Close().
/// is_merging is set to indicate that rows from different senders must be merged
/// according to the sort parameters in sort_exec_exprs_. (It is assumed that the rows
/// received from the senders themselves are sorted.)
/// If is_merging_ is true, the exchange node creates a DataStreamRecvrBase with the
/// is_merging_ flag and retrieves retrieves rows from the receiver via calls to
/// DataStreamRecvrBase::GetNext(). It also prepares, opens and closes the ordering exprs
/// in its SortExecExprs member that are used to compare rows.
/// If is_merging_ is false, the exchange node directly retrieves batches from the row
/// batch queue of the DataStreamRecvrBase via calls to DataStreamRecvrBase::GetBatch().
class ExchangeNode : public ExecNode {
ExchangeNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
virtual Status Prepare(RuntimeState* state);
virtual void Codegen(RuntimeState* state);
/// Blocks until the first batch is available for consumption via GetNext().
virtual Status Open(RuntimeState* state);
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
virtual void Close(RuntimeState* state);
/// the number of senders needs to be set after the c'tor, because it's not
/// recorded in TPlanNode, and before calling Prepare()
void set_num_senders(int num_senders) { num_senders_ = num_senders; }
virtual void DebugString(int indentation_level, std::stringstream* out) const;
/// Implements GetNext() for the case where is_merging_ is true. Delegates the GetNext()
/// call to the underlying DataStreamRecvrBase.
Status GetNextMerging(RuntimeState* state, RowBatch* output_batch, bool* eos);
/// Resets input_batch_ to the next batch from the from stream_recvr_'s queue.
/// Only used when is_merging_ is false.
Status FillInputRowBatch(RuntimeState* state);
/// Releases resources of the receiver by transferring the resource ownership of
/// the most recently dequeued row batch to 'output_batch'. Also cancels the underlying
/// receiver so all senders will get unblocked. This function is called after the
/// exchange node hits end-of-stream due to reaching the node's row count limit.
/// Please note that no more rows will be returned from the receiver once this function
/// is called.
void ReleaseRecvrResources(RowBatch* output_batch);
int num_senders_; // needed for stream_recvr_ construction
/// The underlying DataStreamRecvrBase instance. Ownership is shared between this
/// exchange node instance and the DataStreamMgr used to create the receiver.
/// stream_recvr_->Close() must be called before this instance is destroyed.
std::shared_ptr<KrpcDataStreamRecvr> stream_recvr_;
/// our input rows are a prefix of the rows we produce
RowDescriptor input_row_desc_;
/// Current batch of rows from the receiver queue being processed by this node.
/// Only valid if is_merging_ is false. (If is_merging_ is true, GetNext() is
/// delegated to the receiver). Owned by the stream receiver.
RowBatch* input_batch_;
/// Next row to copy from input_batch_. For non-merging exchanges, input_batch_
/// is retrieved directly from the sender queue in the stream recvr, and rows from
/// input_batch_ must be copied to the output batch in GetNext().
int next_row_idx_;
/// The buffer pool client for allocating buffers for tuple pointers and
/// tuple data in row batches.
BufferPool::ClientHandle recvr_buffer_pool_client_;
/// time spent reconstructing received rows
RuntimeProfile::Counter* convert_row_batch_timer_;
/// True if this is a merging exchange node. If true, GetNext() is delegated to the
/// underlying stream_recvr_, and input_batch_ is not used/valid.
bool is_merging_;
/// The TupleRowComparator based on 'sort_exec_exprs_' for merging exchange.
boost::scoped_ptr<TupleRowComparator> less_than_;
/// Sort expressions and parameters passed to the merging receiver..
std::vector<ScalarExpr*> ordering_exprs_;
std::vector<bool> is_asc_order_;
std::vector<bool> nulls_first_;
/// Offset specifying number of rows to skip.
int64_t offset_;
/// Number of rows skipped so far.
int64_t num_rows_skipped_;