// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "exec/blocking-plan-root-sink.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "runtime/row-batch.h"
#include "runtime/tuple-row.h"
#include "service/query-result-set.h"
#include "util/pretty-printer.h"

#include <memory>
#include <boost/thread/mutex.hpp>

using namespace std;
using boost::unique_lock;
using boost::mutex;

namespace impala {

// Constructs the sink by forwarding 'sink_id', 'row_desc' and 'state' unchanged to the
// PlanRootSink base-class constructor. The constructor body itself does no work.
BlockingPlanRootSink::BlockingPlanRootSink(
    TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
  : PlanRootSink(sink_id, row_desc, state) {}

// Prepares the sink by delegating to PlanRootSink::Prepare() with the same 'state' and
// 'parent_mem_tracker', and returns the status of that call unchanged.
Status BlockingPlanRootSink::Prepare(
    RuntimeState* state, MemTracker* parent_mem_tracker) {
  return PlanRootSink::Prepare(state, parent_mem_tracker);
}

// Producer side of the hand-off. Copies the rows of 'batch' into the QueryResultSet that
// the consumer publishes through GetNext(), blocking until every row of the batch has
// been handed over. A single batch may be spread over several consumer requests when the
// consumer asks for fewer rows than remain in the batch.
//
// Returns early with a non-OK status if UpdateAndCheckRowsProducedLimit(), the
// "BPRS_BEFORE_ADD_ROWS" debug action or QueryResultSet::AddRows() fails, or through
// RETURN_IF_CANCELLED() once 'state' reports cancellation.
Status BlockingPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
  SCOPED_TIMER(profile()->total_time_counter());
  PlanRootSink::ValidateCollectionSlots(*row_desc_, batch);
  RETURN_IF_ERROR(PlanRootSink::UpdateAndCheckRowsProducedLimit(state, batch));

  // An empty batch never enters the loop, so the consumer is never woken up just to
  // receive 0 rows. Be wary of ever returning 0-row batches to the client; some poorly
  // written clients may not cope correctly with them. See IMPALA-4335.
  for (int next_row = 0; next_row < batch->num_rows();) {
    unique_lock<mutex> sink_lock(lock_);

    // Block until the consumer has published a result set in 'results_' or the fragment
    // instance has been cancelled. The time spent blocked is charged to the inactive
    // timer.
    while (results_ == nullptr && !state->is_cancelled()) {
      SCOPED_TIMER(profile_->inactive_timer());
      sender_cv_.Wait(sink_lock);
    }
    RETURN_IF_CANCELLED(state);

    // Not cancelled, so the wait above can only have ended because a result set is
    // available.
    DCHECK(results_ != nullptr);

    // Hand over everything that is left in the batch, capped at the consumer's request
    // size when that size is positive.
    int rows_to_add = batch->num_rows() - next_row;
    if (num_rows_requested_ > 0 && num_rows_requested_ < rows_to_add) {
      rows_to_add = num_rows_requested_;
    }

    // Debug action that fires right before AddRows() is called.
    RETURN_IF_ERROR(DebugAction(state->query_options(), "BPRS_BEFORE_ADD_ROWS"));
    RETURN_IF_ERROR(results_->AddRows(output_expr_evals_, batch, next_row, rows_to_add));
    next_row += rows_to_add;

    // Prevent expr result allocations from accumulating across iterations.
    expr_results_pool_->Clear();

    // Clearing 'results_' tells the consumer that this result set has been filled in;
    // then wake the consumer up.
    results_ = nullptr;
    consumer_cv_.NotifyAll();
  }

  rows_sent_counter_->Add(batch->num_rows());
  return Status::OK();
}

// Records, under 'lock_', that the sender has reached end-of-stream and wakes every
// thread waiting on 'consumer_cv_'. Always returns Status::OK().
Status BlockingPlanRootSink::FlushFinal(RuntimeState* state) {
  SCOPED_TIMER(profile()->total_time_counter());
  unique_lock<mutex> sink_lock(lock_);

  // The producer has sent all of its rows. Wake up the consumer waiting in GetNext() so
  // that it can observe the EOS state and set eos to true.
  sender_state_ = SenderState::EOS;
  consumer_cv_.NotifyAll();

  return Status::OK();
}

// Closes the sink. Under 'lock_', a sender that is still in the ROWS_PENDING state is
// moved to CLOSED_NOT_EOS, every thread waiting on 'consumer_cv_' is woken, and the call
// is then forwarded to DataSink::Close().
void BlockingPlanRootSink::Close(RuntimeState* state) {
  SCOPED_TIMER(profile()->total_time_counter());
  unique_lock<mutex> sink_lock(lock_);

  // Still being in ROWS_PENDING means FlushFinal() was never called, which is what
  // happens when the fragment instance hits an error before sending all of its rows.
  const bool closed_before_eos = sender_state_ == SenderState::ROWS_PENDING;
  if (closed_before_eos) sender_state_ = SenderState::CLOSED_NOT_EOS;

  consumer_cv_.NotifyAll();
  DataSink::Close(state);
}

// Wakes up both sides of the hand-off so that they can notice the cancellation: threads
// blocked on 'sender_cv_' in Send() and threads blocked on 'consumer_cv_' in GetNext()
// re-check state->is_cancelled() when they wake. 'state' must already be marked
// cancelled when this is called (enforced by the DCHECK in debug builds).
void BlockingPlanRootSink::Cancel(RuntimeState* state) {
  DCHECK(state->is_cancelled());
  // NOTE(review): both notifications are issued without holding 'lock_'. Confirm that a
  // waiter which has already evaluated is_cancelled() as false, but has not yet blocked
  // on its condition variable, cannot miss this wakeup.
  sender_cv_.NotifyAll();
  consumer_cv_.NotifyAll();
}

// Consumer side of the hand-off. Publishes 'results' and the requested row count to the
// sender, wakes the sender, and then blocks until one of the following happens:
//   - the sender has filled in the result set (it resets 'results_' to nullptr),
//   - the sender is no longer in the ROWS_PENDING state,
//   - 'state' has been cancelled, or
//   - 'timeout_us' microseconds have elapsed.
//
// 'num_results' is stored in 'num_rows_requested_'; Send() applies it as a cap only when
// it is positive. 'timeout_us' must be >= 0, and 0 means wait without a deadline.
// On return '*eos' is true iff the sender state is EOS. Returns state->GetQueryStatus().
Status BlockingPlanRootSink::GetNext(RuntimeState* state, QueryResultSet* results,
    int num_results, bool* eos, int64_t timeout_us) {
  DCHECK_GE(timeout_us, 0);

  // Tracks how long the consumer has been waiting for RowBatches to be produced and
  // materialized, so that the remaining timeout can be recomputed before each wait.
  MonotonicStopWatch fetch_timer;
  fetch_timer.Start();

  unique_lock<mutex> sink_lock(lock_);

  // Point the shared 'results_' at the caller's result set and wake up the sender thread
  // so that it can add rows to it.
  results_ = results;
  num_rows_requested_ = num_results;
  sender_cv_.NotifyAll();

  // Becomes true once the consumer has given up waiting for the producer to send rows.
  bool fetch_timed_out = false;

  // Keep waiting while the sender is still producing rows and has not yet filled in the
  // current result set.
  while (sender_state_ == SenderState::ROWS_PENDING && results_ != nullptr
      && !state->is_cancelled() && !fetch_timed_out) {
    if (timeout_us == 0) {
      // No deadline requested: block until notified.
      consumer_cv_.Wait(sink_lock);
      continue;
    }

    // The timeout may expire while rows still end up in the QueryResultSet: the producer
    // can acquire the lock, the timeout can expire, and the producer can then append its
    // rows. This does not affect correctness because the producer always resets
    // 'results_' to nullptr whenever it appends rows, and it always appends either an
    // entire RowBatch or as many rows as were requested.
    const int64_t elapsed_us =
        static_cast<int64_t>(round(fetch_timer.ElapsedTime() / NANOS_PER_MICRO));
    // Always wait for at least one microsecond, even if the deadline has already passed.
    const int64_t remaining_us = max(static_cast<int64_t>(1), timeout_us - elapsed_us);
    if (consumer_cv_.WaitFor(sink_lock, remaining_us)) continue;

    VLOG_QUERY << "Fetch timed out";
    fetch_timed_out = true;

    // After a timeout the consumer destroys the current QueryResultSet and creates a new
    // one for the next fetch request, so the shared pointer must not outlive this call.
    results_ = nullptr;
  }

  *eos = sender_state_ == SenderState::EOS;
  return state->GetQueryStatus();
}
}
