// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/sort-node.h"

#include <cstdint>

#include "exec/exec-node-util.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/sorted-run-merger.h"
#include "util/runtime-profile-counters.h"

#include "common/names.h"

namespace impala {

// Initializes the plan node from its Thrift description. After the base-class
// initialization this creates the ordering expressions (against this node's row
// descriptor) and the sort-tuple slot expressions (against the first child's row
// descriptor), then copies the per-key 'is_asc_order' and 'nulls_first' settings.
// Returns the first error encountered, or OK.
Status SortPlanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
  RETURN_IF_ERROR(PlanNode::Init(tnode, state));

  // All sort-related settings live under the same Thrift struct; read them through
  // one alias.
  const TSortInfo& sort_info = tnode.sort_node.sort_info;

  RETURN_IF_ERROR(ScalarExpr::Create(
      sort_info.ordering_exprs, *row_descriptor_, state, &ordering_exprs_));

  // The slot expressions are expected to always be set for a sort node.
  DCHECK(sort_info.__isset.sort_tuple_slot_exprs);
  RETURN_IF_ERROR(ScalarExpr::Create(sort_info.sort_tuple_slot_exprs,
      *children_[0]->row_descriptor_, state, &sort_tuple_slot_exprs_));

  is_asc_order_ = sort_info.is_asc_order;
  nulls_first_ = sort_info.nulls_first;
  return Status::OK();
}

// Allocates the SortNode that executes this plan node, registers it with the
// ObjectPool obtained from 'state' and hands it back through 'node'. Always
// returns OK.
Status SortPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
  ObjectPool* const exec_pool = state->obj_pool();
  SortNode* const sort_node = new SortNode(exec_pool, *this, state->desc_tbl());
  // The node is added to the pool immediately after allocation.
  *node = exec_pool->Add(sort_node);
  return Status::OK();
}

// Constructs the exec node from its plan node.
//  - 'offset_' is taken from the Thrift sort node if the field is set, otherwise 0.
//  - The sorter is not created here; it stays null until Prepare() creates it.
//  - The ordering expressions, sort-tuple expressions and per-key ordering flags
//    are copied from 'pnode'.
// Also records the info string "SortType" = "Total" in the runtime profile.
SortNode::SortNode(
    ObjectPool* pool, const SortPlanNode& pnode, const DescriptorTbl& descs)
  : ExecNode(pool, pnode, descs),
    offset_(pnode.tnode_->sort_node.__isset.offset ? pnode.tnode_->sort_node.offset : 0),
    sorter_(nullptr),
    num_rows_skipped_(0) {
  ordering_exprs_ = pnode.ordering_exprs_;
  sort_tuple_exprs_ = pnode.sort_tuple_slot_exprs_;
  is_asc_order_ = pnode.is_asc_order_;
  nulls_first_ = pnode.nulls_first_;
  runtime_profile()->AddInfoString("SortType", "Total");
}

// Nothing to release explicitly here: the sorter is closed and reset in Close().
SortNode::~SortNode() = default;

// Prepares the node for execution: runs the base-class Prepare(), then creates and
// prepares the Sorter that performs the sort. Returns the first error encountered,
// or OK.
Status SortNode::Prepare(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecNode::Prepare(state));
  // The sorter is built from the ordering/sort-tuple expressions and ordering flags
  // copied in the constructor, plus this node's row descriptor, memory tracker,
  // buffer pool client, spillable buffer size, profile and label.
  // NOTE(review): the trailing 'true' is a positional flag whose meaning is defined
  // by the Sorter constructor - confirm against the Sorter declaration.
  sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
      sort_tuple_exprs_, &row_descriptor_, mem_tracker(), buffer_pool_client(),
      resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), true));
  RETURN_IF_ERROR(sorter_->Prepare(pool_));
  // The minimum reservation in this node's resource profile must be at least what
  // the sorter computes as its own minimum.
  DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
  state->CheckAndAddCodegenDisabledMessage(runtime_profile());
  return Status::OK();
}

// Runs codegen for this node. The base-class Codegen() is always invoked; the
// sorter's codegen is attempted only when codegen is not disabled for this node,
// and its outcome is recorded in the runtime profile.
void SortNode::Codegen(RuntimeState* state) {
  DCHECK(state->ShouldCodegen());
  ExecNode::Codegen(state);
  if (!IsNodeCodegenDisabled()) {
    Status sorter_status = sorter_->Codegen(state);
    // Record success or failure (with the status) in the profile.
    runtime_profile()->AddCodegenMsg(sorter_status.ok(), sorter_status);
  }
}

// Opens this node and its child, then consumes and sorts the child's entire output
// via SortInput(). Returns the first error encountered (including cancellation), or
// OK once all input has been handed to the sorter.
Status SortNode::Open(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  ScopedOpenEventAdder ea(this);
  RETURN_IF_ERROR(ExecNode::Open(state));
  RETURN_IF_ERROR(child(0)->Open(state));
  // Claim reservation after the child has been opened to reduce the peak reservation
  // requirement.
  // The claim is skipped when the buffer pool client is already registered, e.g. if
  // Open() is called again on this node.
  if (!buffer_pool_client()->is_registered()) {
    RETURN_IF_ERROR(ClaimBufferReservation(state));
  }
  RETURN_IF_ERROR(sorter_->Open());
  RETURN_IF_CANCELLED(state);
  RETURN_IF_ERROR(QueryMaintenance(state));

  // The child has been opened and the sorter created. Sort the input.
  // The final merge is done on-demand as rows are requested in GetNext().
  RETURN_IF_ERROR(SortInput(state));
  return Status::OK();
}

// Fills 'row_batch' with the next sorted rows and sets '*eos' when the output is
// exhausted.
//  - If ReachedLimit() already holds, sets '*eos' and returns without touching
//    the batch.
//  - The first 'offset_' rows produced by the sorter are discarded;
//    'num_rows_skipped_' tracks progress across calls and is cleared by Reset().
//  - After the offset handling, CheckLimitAndTruncateRowBatchIfNeeded() is applied
//    to the batch.
// 'row_batch' must be empty on entry. Returns the first error encountered
// (including cancellation), or OK.
Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  ScopedGetNextEventAdder ea(this, eos);
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
  RETURN_IF_CANCELLED(state);
  RETURN_IF_ERROR(QueryMaintenance(state));

  if (ReachedLimit()) {
    *eos = true;
    return Status::OK();
  }
  *eos = false;

  if (returned_buffer_) {
    // If the Sorter returned a buffer on the last call to GetNext(), we might have an
    // opportunity to release memory. Release reservation, unless it might be needed
    // for the next subplan iteration or merging spilled runs.
    returned_buffer_ = false;
    if (!IsInSubplan() && !sorter_->HasSpilledRuns()) {
      DCHECK(!buffer_pool_client()->has_unpinned_pages());
      Status status = ReleaseUnusedReservation();
      DCHECK(status.ok()) << "Should not fail - no runs were spilled so no pages are "
                          << "unpinned. " << status.GetDetail();
    }
  }

  DCHECK_EQ(row_batch->num_rows(), 0);
  RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos));
  while (num_rows_skipped_ < offset_) {
    num_rows_skipped_ += row_batch->num_rows();
    // Throw away rows in the output batch until the offset is skipped.
    // The difference is computed in 64 bits on purpose: while the offset has not
    // been reached yet it is negative, and for a very large offset its magnitude can
    // exceed the range of 'int'. Narrowing it to 'int' first could wrap it around to
    // a positive value and return rows that should have been skipped.
    // NOTE(review): assumes 'offset_' and 'num_rows_skipped_' are 64-bit members -
    // confirm against the class declaration.
    const int64_t rows_to_keep = num_rows_skipped_ - offset_;
    if (rows_to_keep > 0) {
      // The loop condition guarantees that fewer rows than the current batch holds
      // are kept, so narrowing to 'int' is safe here.
      const int num_kept = static_cast<int>(rows_to_keep);
      row_batch->CopyRows(0, row_batch->num_rows() - num_kept, num_kept);
      row_batch->set_num_rows(num_kept);
    } else {
      row_batch->set_num_rows(0);
    }
    // Stop once some rows survive the offset or the sorter has no more rows.
    if (rows_to_keep > 0 || *eos) break;
    RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos));
  }

  // Remember whether a buffer was handed to the caller; the next call uses this to
  // decide whether reservation can be released.
  returned_buffer_ = row_batch->num_buffers() > 0;
  CheckLimitAndTruncateRowBatchIfNeeded(row_batch, eos);

  COUNTER_SET(rows_returned_counter_, rows_returned());

  return Status::OK();
}

// Resets the node so it can be opened again: clears the count of rows skipped for
// the offset, resets the sorter if one exists (it is null before Prepare() and
// after Close()), and then delegates to the base-class Reset(), whose status is
// returned.
Status SortNode::Reset(RuntimeState* state, RowBatch* row_batch) {
  num_rows_skipped_ = 0;
  if (sorter_ != nullptr) sorter_->Reset();
  return ExecNode::Reset(state, row_batch);
}

// Releases the node's resources. Does nothing if the node is already closed.
// Otherwise closes and destroys the sorter (if one was created), closes the
// ordering and sort-tuple expressions, and finally closes the base class.
void SortNode::Close(RuntimeState* state) {
  if (is_closed()) return;

  if (sorter_ != nullptr) {
    sorter_->Close(state);
    sorter_.reset();
  }

  ScalarExpr::Close(ordering_exprs_);
  ScalarExpr::Close(sort_tuple_exprs_);
  ExecNode::Close(state);
}

// Appends a textual description of this node to 'out', indented by two spaces per
// 'indentation_level': the ordering expressions, followed by "asc"/"desc" and
// "nulls first"/"nulls last" for each sort key, followed by the base-class
// description.
void SortNode::DebugString(int indentation_level, stringstream* out) const {
  *out << string(indentation_level * 2, ' ');
  *out << "SortNode(" << ScalarExpr::DebugString(ordering_exprs_);
  // Unsigned index to match the container's size type. 'nulls_first_' is indexed in
  // step with 'is_asc_order_'.
  for (size_t i = 0; i < is_asc_order_.size(); ++i) {
    *out << (i > 0 ? " " : "")
         << (is_asc_order_[i] ? "asc" : "desc")
         << " nulls " << (nulls_first_[i] ? "first" : "last");
  }
  ExecNode::DebugString(indentation_level, out);
  *out << ")";
}

// Drains the child node batch by batch into the sorter, then signals the sorter
// that all input has been added. Cancellation and query maintenance are checked
// after every batch. Returns the first error encountered, or the status of the
// sorter's InputDone().
Status SortNode::SortInput(RuntimeState* state) {
  RowBatch input_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
  bool child_eos = false;
  while (!child_eos) {
    RETURN_IF_ERROR(child(0)->GetNext(state, &input_batch, &child_eos));
    RETURN_IF_ERROR(sorter_->AddBatch(&input_batch));
    // The batch is reused for the next fetch from the child.
    input_batch.Reset();
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(QueryMaintenance(state));
  }

  // A child inside a subplan is expected to be opened and read again, so it is only
  // closed early - releasing its resources - when this node is not in a subplan.
  if (!IsInSubplan()) child(0)->Close(state);

  return sorter_->InputDone();
}

}
