// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/spillable-row-batch-queue.h"

#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"

#include "common/names.h"

namespace impala {

SpillableRowBatchQueue::SpillableRowBatchQueue(const string& name,
    int64_t max_unpinned_bytes, RuntimeState* state, MemTracker* mem_tracker,
    RuntimeProfile* profile, const RowDescriptor* row_desc,
    const TBackendResourceProfile& resource_profile, const TDebugOptions& debug_options)
  : name_(name),
    state_(state),
    mem_tracker_(mem_tracker),
    profile_(profile),
    row_desc_(row_desc),
    resource_profile_(resource_profile),
    debug_options_(debug_options),
    max_unpinned_bytes_(max_unpinned_bytes) {}

SpillableRowBatchQueue::~SpillableRowBatchQueue() {
  DCHECK(closed_);
}

Status SpillableRowBatchQueue::Open() {
  // Initialize the ResevationManager and then claim the initial reservation.
  reservation_manager_.Init(name_, profile_, state_->instance_buffer_reservation(),
      mem_tracker_, resource_profile_, debug_options_);
  RETURN_IF_ERROR(reservation_manager_.ClaimBufferReservation(state_));

  // Create the BufferedTupleStream, initialize it, and then create the read and write
  // buffer pages.
  batch_queue_ = make_unique<BufferedTupleStream>(state_, row_desc_,
      reservation_manager_.buffer_pool_client(), resource_profile_.spillable_buffer_size,
      resource_profile_.max_row_buffer_size);
  RETURN_IF_ERROR(batch_queue_->Init(name_, true));
  bool got_reservation = false;
  RETURN_IF_ERROR(batch_queue_->PrepareForReadWrite(true, &got_reservation));
  DCHECK(got_reservation) << "SpillableRowBatchQueue failed to get reservation using "
                          << "buffer pool client: "
                          << reservation_manager_.buffer_pool_client()->DebugString();
  return Status::OK();
}

Status SpillableRowBatchQueue::AddBatch(RowBatch* batch) {
  DCHECK(!IsFull()) << "Cannot AddBatch on a full SpillableRowBatchQueue";
  DCHECK(!closed_) << "Cannot AddBatch on a closed SpillableRowBatchQueue";
  Status status;
  FOREACH_ROW(batch, 0, batch_itr) {
    // AddRow should only return false if there was not enough unused reservation to
    // allocate a page for the given row. If a row cannot be added to the batch_queue_
    // then start spilling to disk by unpining the stream. Once the stream is unpinned,
    // adding the row to the stream should succeed unless the unpinned pages needed to
    // be spilled and either (1) there was an error (e.g. IO error) when writing to disk,
    // (2) there is no more scratch space left to write to disk, or (3) spilling to disk
    // is disabled.
    if (UNLIKELY(!batch_queue_->AddRow(batch_itr.Get(), &status))) {
      RETURN_IF_ERROR(status);
      // StartSpilling checks if spilling is disabled and returns an error if it is not.
      RETURN_IF_ERROR(state_->StartSpilling(mem_tracker_));

      // The pin should be stream at this point.
      DCHECK(batch_queue_->is_pinned());
      DCHECK_EQ(batch_queue_->bytes_unpinned(), 0);

      // Unpin the stream and then add the row.
      RETURN_IF_ERROR(
          batch_queue_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));

      // Append "Spilled" to the "ExecOption" info string in the runtime profile.
      profile_->AppendExecOption("Spilled");

      if (!batch_queue_->AddRow(batch_itr.Get(), &status)) {
        RETURN_IF_ERROR(status);
        // If the row could not be added after the stream was unpinned, an error should
        // have been set.
        DCHECK(false) << "Rows should be added in unpinned mode unless an error occurred";
      }
    }
  }
  return Status::OK();
}

Status SpillableRowBatchQueue::GetBatch(RowBatch* batch) {
  DCHECK(!IsEmpty()) << "Cannot GetBatch on an empty SpillableRowBatchQueue";
  DCHECK(!closed_) << "Cannot GetBatch on a closed SpillableRowBatchQueue";
  bool eos = false;
  RETURN_IF_ERROR(batch_queue_->GetNext(batch, &eos));
  // Validate that the value of eos is consistent with IsEmpty().
  DCHECK_EQ(eos, IsEmpty());
  return Status::OK();
}

bool SpillableRowBatchQueue::IsFull() const {
  // The queue is considered full if the number of unpinned bytes is greater than the
  // max number of unpinned bytes. The queue can only be full after the stream has been
  // unpinned. The number of unpinned bytes in the stream may exceed the imposed limit
  // because the entire stream is unpinned at once, without checking against the
  // max_unpinned_bytes_ limit.
  DCHECK(!closed_);
  return batch_queue_->bytes_unpinned() >= max_unpinned_bytes_;
}

bool SpillableRowBatchQueue::IsEmpty() const {
  // The batch_queue_ tracks how many rows have been added to the stream (regardless of
  // whether those rows have already been removed) and how many rows have been read from
  // the stream. If these values are equal, the queue is considered empty.
  DCHECK(!closed_);
  return batch_queue_->num_rows() == batch_queue_->rows_returned();
}

bool SpillableRowBatchQueue::IsOpen() const {
  return !closed_;
}

void SpillableRowBatchQueue::Close() {
  if (closed_) return;
  if (batch_queue_ != nullptr) {
    batch_queue_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
  }
  reservation_manager_.Close(state_);
  closed_ = true;
}
} // namespace impala
