blob: f8cdf8b4f82f50664f9e232c36bb9b44df5af4c1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/partial-sort-node.h"
#include "exec/exec-node-util.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/sorted-run-merger.h"
#include "util/runtime-profile-counters.h"
#include "common/names.h"
namespace impala {
Status PartialSortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
DCHECK(!tnode.sort_node.__isset.offset || tnode.sort_node.offset == 0);
RETURN_IF_ERROR(PlanNode::Init(tnode, state));
const TSortInfo& tsort_info = tnode.sort_node.sort_info;
RETURN_IF_ERROR(ScalarExpr::Create(
tsort_info.ordering_exprs, *row_descriptor_, state, &ordering_exprs_));
DCHECK(tsort_info.__isset.sort_tuple_slot_exprs);
RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs,
*children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));
is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
nulls_first_ = tnode.sort_node.sort_info.nulls_first;
return Status::OK();
}
Status PartialSortPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
ObjectPool* pool = state->obj_pool();
*node = pool->Add(new PartialSortNode(pool, *this, state->desc_tbl()));
return Status::OK();
}
PartialSortNode::PartialSortNode(
ObjectPool* pool, const PartialSortPlanNode& pnode, const DescriptorTbl& descs)
: ExecNode(pool, pnode, descs),
sorter_(nullptr),
input_batch_index_(0),
input_eos_(false),
sorter_eos_(true) {
ordering_exprs_ = pnode.ordering_exprs_;
sort_tuple_exprs_ = pnode.sort_tuple_slot_exprs_;
is_asc_order_ = pnode.is_asc_order_;
nulls_first_ = pnode.nulls_first_;
runtime_profile()->AddInfoString("SortType", "Partial");
}
PartialSortNode::~PartialSortNode() {
DCHECK(input_batch_.get() == nullptr);
}
Status PartialSortNode::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(ExecNode::Prepare(state));
sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
sort_tuple_exprs_, &row_descriptor_, mem_tracker(), buffer_pool_client(),
resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), false));
RETURN_IF_ERROR(sorter_->Prepare(pool_));
DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
state->CheckAndAddCodegenDisabledMessage(runtime_profile());
input_batch_.reset(
new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
return Status::OK();
}
void PartialSortNode::Codegen(RuntimeState* state) {
DCHECK(state->ShouldCodegen());
ExecNode::Codegen(state);
if (IsNodeCodegenDisabled()) return;
Status codegen_status = sorter_->Codegen(state);
runtime_profile()->AddCodegenMsg(codegen_status.ok(), codegen_status);
}
Status PartialSortNode::Open(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
ScopedOpenEventAdder ea(this);
RETURN_IF_ERROR(ExecNode::Open(state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(child(0)->Open(state));
if (!buffer_pool_client()->is_registered()) {
RETURN_IF_ERROR(ClaimBufferReservation(state));
}
return Status::OK();
}
Status PartialSortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
ScopedGetNextEventAdder ea(this, eos);
RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
DCHECK_EQ(row_batch->num_rows(), 0);
if (!sorter_eos_) {
// There were rows in the current run that didn't fit in the last output batch.
RETURN_IF_ERROR(sorter_->GetNext(row_batch, &sorter_eos_));
if (sorter_eos_) {
sorter_->Reset();
*eos = input_eos_;
}
IncrementNumRowsReturned(row_batch->num_rows());
COUNTER_SET(rows_returned_counter_, rows_returned());
return Status::OK();
}
if (input_eos_) {
*eos = true;
return Status::OK();
}
DCHECK(sorter_eos_);
RETURN_IF_ERROR(sorter_->Open());
do {
if (input_batch_index_ == input_batch_->num_rows()) {
input_batch_->Reset();
input_batch_index_ = 0;
RETURN_IF_ERROR(child(0)->GetNext(state, input_batch_.get(), &input_eos_));
}
int num_processed;
RETURN_IF_ERROR(
sorter_->AddBatchNoSpill(input_batch_.get(), input_batch_index_, &num_processed));
input_batch_index_ += num_processed;
DCHECK(input_batch_index_ <= input_batch_->num_rows());
RETURN_IF_ERROR(QueryMaintenance(state));
} while (input_batch_index_ == input_batch_->num_rows() && !input_eos_);
RETURN_IF_ERROR(sorter_->InputDone());
RETURN_IF_ERROR(sorter_->GetNext(row_batch, &sorter_eos_));
if (sorter_eos_) {
sorter_->Reset();
*eos = input_eos_;
}
IncrementNumRowsReturned(row_batch->num_rows());
COUNTER_SET(rows_returned_counter_, rows_returned());
return Status::OK();
}
Status PartialSortNode::Reset(RuntimeState* state, RowBatch* row_batch) {
DCHECK(false) << "PartialSortNode cannot be part of a subplan.";
return Status("Cannot reset partial sort");
}
void PartialSortNode::Close(RuntimeState* state) {
if (is_closed()) return;
input_batch_.reset();
child(0)->Close(state);
if (sorter_ != nullptr) sorter_->Close(state);
sorter_.reset();
ScalarExpr::Close(ordering_exprs_);
ScalarExpr::Close(sort_tuple_exprs_);
ExecNode::Close(state);
}
void PartialSortNode::DebugString(int indentation_level, stringstream* out) const {
*out << string(indentation_level * 2, ' ');
*out << "PartialSortNode(" << ScalarExpr::DebugString(ordering_exprs_);
for (int i = 0; i < is_asc_order_.size(); ++i) {
*out << (i > 0 ? " " : "") << (is_asc_order_[i] ? "asc" : "desc") << " nulls "
<< (nulls_first_[i] ? "first" : "last");
}
ExecNode::DebugString(indentation_level, out);
*out << ")";
}
}