blob: 5a5939802c4569a95b2640af133a393a7a427a02 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/exchange-node.h"
#include <boost/scoped_ptr.hpp>
#include "exec/exec-node-util.h"
#include "exprs/scalar-expr.h"
#include "runtime/exec-env.h"
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/krpc-data-stream-recvr.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "util/debug-util.h"
#include "util/runtime-profile-counters.h"
#include "util/time.h"
#include "util/tuple-row-compare.h"
#include "gen-cpp/PlanNodes_types.h"
#include "common/names.h"
DECLARE_int32(stress_datastream_recvr_delay_ms);
using namespace impala;
DEFINE_int64(exchg_node_buffer_size_bytes, 1024 * 1024 * 10,
"(Advanced) Maximum size of per-query receive-side buffer");
Status ExchangePlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
if (!tnode.exchange_node.__isset.sort_info) return Status::OK();
RETURN_IF_ERROR(ScalarExpr::Create(tnode.exchange_node.sort_info.ordering_exprs,
*row_descriptor_, state, &ordering_exprs_));
is_asc_order_ = tnode.exchange_node.sort_info.is_asc_order;
nulls_first_ = tnode.exchange_node.sort_info.nulls_first;
return Status::OK();
}
Status ExchangePlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
ObjectPool* pool = state->obj_pool();
*node = pool->Add(new ExchangeNode(pool, *this, state->desc_tbl()));
return Status::OK();
}
ExchangeNode::ExchangeNode(
ObjectPool* pool, const ExchangePlanNode& pnode, const DescriptorTbl& descs)
: ExecNode(pool, pnode, descs),
num_senders_(0),
stream_recvr_(),
input_row_desc_(descs, pnode.tnode_->exchange_node.input_row_tuples,
vector<bool>(pnode.tnode_->nullable_tuples.begin(),
pnode.tnode_->nullable_tuples.begin()
+ pnode.tnode_->exchange_node.input_row_tuples.size())),
next_row_idx_(0),
is_merging_(pnode.tnode_->exchange_node.__isset.sort_info),
offset_(pnode.tnode_->exchange_node.__isset.offset ?
pnode.tnode_->exchange_node.offset :
0),
num_rows_skipped_(0) {
DCHECK_GE(offset_, 0);
DCHECK(is_merging_ || (offset_ == 0));
if (!is_merging_) return;
ordering_exprs_ = pnode.ordering_exprs_;
is_asc_order_ = pnode.is_asc_order_;
nulls_first_ = pnode.nulls_first_;
}
Status ExchangeNode::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::Prepare(state));
convert_row_batch_timer_ = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime");
#ifndef NDEBUG
if (FLAGS_stress_datastream_recvr_delay_ms > 0) {
SleepForMs(FLAGS_stress_datastream_recvr_delay_ms);
}
#endif
RETURN_IF_ERROR(ExecEnv::GetInstance()->buffer_pool()->RegisterClient(
Substitute("Exchg Recvr (id=$0)", id_), nullptr,
ExecEnv::GetInstance()->buffer_reservation(), mem_tracker(),
numeric_limits<int64_t>::max(), runtime_profile(), &recvr_buffer_pool_client_,
MemLimit::HARD));
// TODO: figure out appropriate buffer size
DCHECK_GT(num_senders_, 0);
stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(
&input_row_desc_, *state, state->fragment_instance_id(), id_, num_senders_,
FLAGS_exchg_node_buffer_size_bytes, is_merging_, runtime_profile(), mem_tracker(),
&recvr_buffer_pool_client_);
if (is_merging_) {
less_than_.reset(
new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
state->CheckAndAddCodegenDisabledMessage(runtime_profile());
}
return Status::OK();
}
void ExchangeNode::Codegen(RuntimeState* state) {
DCHECK(state->ShouldCodegen());
ExecNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
if (is_merging_) {
Status codegen_status = less_than_->Codegen(state);
runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
}
}
Status ExchangeNode::Open(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
ScopedOpenEventAdder ea(this);
RETURN_IF_ERROR(ExecNode::Open(state));
RETURN_IF_CANCELLED(state);
if (is_merging_) {
// CreateMerger() will populate its merging heap with batches from the stream_recvr_,
// so it is not necessary to call FillInputRowBatch().
RETURN_IF_ERROR(
less_than_->Open(pool_, state, expr_perm_pool(), expr_results_pool()));
RETURN_IF_ERROR(stream_recvr_->CreateMerger(*less_than_.get()));
} else {
RETURN_IF_ERROR(FillInputRowBatch(state));
}
return Status::OK();
}
Status ExchangeNode::Reset(RuntimeState* state, RowBatch* row_batch) {
DCHECK(false) << "NYI";
return Status("NYI");
}
void ExchangeNode::Close(RuntimeState* state) {
if (is_closed()) return;
if (less_than_.get() != nullptr) less_than_->Close(state);
if (stream_recvr_ != nullptr) stream_recvr_->Close();
ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&recvr_buffer_pool_client_);
ScalarExpr::Close(ordering_exprs_);
ExecNode::Close(state);
}
Status ExchangeNode::FillInputRowBatch(RuntimeState* state) {
DCHECK(!is_merging_);
Status ret_status;
{
SCOPED_TIMER(state->total_network_receive_timer());
ret_status = stream_recvr_->GetBatch(&input_batch_);
}
VLOG_FILE << "exch: has batch=" << (input_batch_ == NULL ? "false" : "true")
<< " #rows=" << (input_batch_ != NULL ? input_batch_->num_rows() : 0)
<< " is_cancelled=" << (ret_status.IsCancelled() ? "true" : "false")
<< " instance_id=" << PrintId(state->fragment_instance_id());
return ret_status;
}
void ExchangeNode::ReleaseRecvrResources(RowBatch* output_batch) {
stream_recvr_->TransferAllResources(output_batch);
stream_recvr_->CancelStream();
}
Status ExchangeNode::GetNext(RuntimeState* state, RowBatch* output_batch, bool* eos) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
ScopedGetNextEventAdder ea(this, eos);
RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
if (ReachedLimit()) {
ReleaseRecvrResources(output_batch);
*eos = true;
return Status::OK();
} else {
*eos = false;
}
if (is_merging_) return GetNextMerging(state, output_batch, eos);
while (true) {
{
SCOPED_TIMER(convert_row_batch_timer_);
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
// copy rows until we hit the limit/capacity or until we exhaust input_batch_
while (!ReachedLimit() && !output_batch->AtCapacity()
&& input_batch_ != NULL && next_row_idx_ < input_batch_->num_rows()) {
TupleRow* src = input_batch_->GetRow(next_row_idx_);
++next_row_idx_;
int j = output_batch->AddRow();
TupleRow* dest = output_batch->GetRow(j);
// if the input row is shorter than the output row, make sure not to leave
// uninitialized Tuple* around
output_batch->ClearRow(dest);
// this works as expected if rows from input_batch form a prefix of
// rows in output_batch
input_batch_->CopyRow(src, dest);
output_batch->CommitLastRow();
IncrementNumRowsReturned(1);
}
COUNTER_SET(rows_returned_counter_, rows_returned());
if (ReachedLimit()) {
ReleaseRecvrResources(output_batch);
*eos = true;
return Status::OK();
}
if (output_batch->AtCapacity()) return Status::OK();
}
// we need more rows
stream_recvr_->TransferAllResources(output_batch);
RETURN_IF_ERROR(FillInputRowBatch(state));
*eos = (input_batch_ == nullptr);
// No need to call CancelStream() on the receiver here as all incoming row batches
// have been consumed so we should have replied to all senders already.
if (*eos) return Status::OK();
next_row_idx_ = 0;
DCHECK(input_batch_->row_desc()->LayoutIsPrefixOf(*output_batch->row_desc()));
}
}
Status ExchangeNode::GetNextMerging(RuntimeState* state, RowBatch* output_batch,
bool* eos) {
DCHECK_EQ(output_batch->num_rows(), 0);
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
// Clear any expr result allocations made by the merger.
expr_results_pool_->Clear();
RETURN_IF_ERROR(stream_recvr_->GetNext(output_batch, eos));
while (num_rows_skipped_ < offset_) {
num_rows_skipped_ += output_batch->num_rows();
// Throw away rows in the output batch until the offset is skipped.
int64_t rows_to_keep = num_rows_skipped_ - offset_;
if (rows_to_keep > 0) {
output_batch->CopyRows(0, output_batch->num_rows() - rows_to_keep, rows_to_keep);
output_batch->set_num_rows(rows_to_keep);
} else {
output_batch->set_num_rows(0);
}
if (rows_to_keep > 0 || *eos || output_batch->AtCapacity()) break;
RETURN_IF_ERROR(stream_recvr_->GetNext(output_batch, eos));
}
CheckLimitAndTruncateRowBatchIfNeeded(output_batch, eos);
// On eos, transfer all remaining resources from the input batches maintained
// by the merger to the output batch. Also cancel the underlying receiver so
// the senders' fragments can exit early.
if (*eos) ReleaseRecvrResources(output_batch);
COUNTER_SET(rows_returned_counter_, rows_returned());
return Status::OK();
}
void ExchangeNode::DebugString(int indentation_level, stringstream* out) const {
*out << string(indentation_level * 2, ' ');
*out << "ExchangeNode(#senders=" << num_senders_;
ExecNode::DebugString(indentation_level, out);
*out << ")";
}