// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <memory>
#include <queue>
#include <string>

#include "runtime/buffered-tuple-stream.h"
#include "runtime/reservation-manager.h"
#include "runtime/row-batch.h"

namespace impala {

class RowBatch;

/// A RowBatchQueue that provides non-blocking queue semantics. RowBatches are stored
/// inside a BufferedTupleStream. None of the methods block, and this class is not
/// thread safe. The amount of unpinned memory used by the queue can be limited by the
/// parameter 'max_unpinned_bytes'. Calls to AddBatch after the capacity has been reached
/// will return an error Status. Calls to GetBatch on an empty queue will return an error
/// Status. Callers should nevertheless check IsFull() / IsEmpty() first: the AddBatch
/// and GetBatch contracts state that calling them on a full / empty queue is not valid.
///
/// In order to manage the reservation used by the BufferedTupleStream, this class uses
/// a ReservationManager that creates the BufferPool::ClientHandle used by the
/// BufferedTupleStream. The ReservationManager uses a ResourceProfile created by the fe/
/// to limit the amount of reserved memory used by the stream.
///
/// 'name' is a unique name which is purely used for error reporting by the
/// BufferedTupleStream.
///
/// 'max_unpinned_bytes' limits the maximum number of bytes that can be unpinned in the
/// underlying BufferedTupleStream. This is only a soft limit: it might be exceeded
/// during AddBatch when unpinning the stream.
///
/// 'resource_profile' is created in the fe/ by PlanRootSink and specifies the min and max
/// amount of reserved memory the BufferedTupleStream can use as well as the size of the
/// default and max page length used by the stream.
///
/// The remaining parameters are used to initialize the ReservationManager and
/// BufferedTupleStream.
class SpillableRowBatchQueue {
 public:
  /// Constructs a queue; no resources are described as being claimed until Open().
  ///
  /// Lifetime requirements: 'resource_profile' and 'debug_options' are stored by
  /// reference, and 'state', 'mem_tracker', 'profile' and 'row_desc' are stored as raw
  /// non-owning pointers, so all of them must outlive this queue. 'name' is kept in an
  /// owned std::string member, so the caller's string (including a temporary) does not
  /// need to outlive the constructor call.
  SpillableRowBatchQueue(const std::string& name, int64_t max_unpinned_bytes,
      RuntimeState* state, MemTracker* mem_tracker, RuntimeProfile* profile,
      const RowDescriptor* row_desc, const TBackendResourceProfile& resource_profile,
      const TDebugOptions& debug_options);
  ~SpillableRowBatchQueue();

  /// The queue uniquely owns its BufferedTupleStream and holds reference members, so
  /// copying is not meaningful. These were already implicitly deleted (std::unique_ptr
  /// member); they are spelled out here so that misuse produces a clear diagnostic.
  SpillableRowBatchQueue(const SpillableRowBatchQueue&) = delete;
  SpillableRowBatchQueue& operator=(const SpillableRowBatchQueue&) = delete;

  /// Creates and initializes the ReservationManager and BufferedTupleStream. Returns an
  /// error Status if either could not be initialized. The ReservationManager may fail
  /// to initialize if it cannot claim the initial buffer reservation. The
  /// BufferedTupleStream may fail to initialize if it could not create the read and
  /// write buffers.
  Status Open();

  /// Adds the given RowBatch to the queue. Returns Status::OK() if the batch was
  /// successfully added, returns an error if there was an issue when adding the batch to
  /// the BufferedTupleStream. If adding the batch to the BufferedTupleStream cannot be
  /// achieved because there is no more available reserved memory, this method will unpin
  /// the stream and then add the RowBatch. If the batch still cannot be added, this
  /// method returns an error Status. It is not valid to call this method if the queue is
  /// full or closed.
  Status AddBatch(RowBatch* batch);

  /// Returns and removes the RowBatch at the head of the queue. Returns Status::OK() if
  /// the batch was successfully read from the queue. It is not valid to call this method
  /// if the queue is empty or has already been closed.
  Status GetBatch(RowBatch* batch);

  /// Returns true if the queue limit has been reached, false otherwise. It is not valid
  /// to call this method if the queue is already closed.
  bool IsFull() const;

  /// Returns true if the queue is empty, false otherwise. It is not valid to call this
  /// method if the queue is already closed.
  bool IsEmpty() const;

  /// Returns false if Close() has been called, true otherwise.
  bool IsOpen() const;

  /// Resets the remaining RowBatches in the queue and releases the queue memory.
  void Close();

 private:
  /// BufferedTupleStream that stores all RowBatches.
  std::unique_ptr<BufferedTupleStream> batch_queue_;

  /// ReservationManager that manages the reserved memory and BufferPool::ClientHandle
  /// used by the BufferedTupleStream.
  ReservationManager reservation_manager_;

  /// A unique name used by the BufferedTupleStream, used purely for debugging purposes.
  /// Stored by value (not by reference) so that the queue never holds a dangling
  /// reference when the constructor is called with a temporary or a string literal.
  const std::string name_;

  /// Used to initialize and manage the BufferedTupleStream and ReservationManager.
  /// Not owned.
  RuntimeState* state_;

  /// The MemTracker to use in the BufferedTupleStream. Not owned.
  MemTracker* mem_tracker_;

  /// Used by the BufferPool::Client created for the BufferedTupleStream. Not owned.
  RuntimeProfile* profile_;

  /// Used by the BufferedTupleStream, must match the RowDescriptor of the RowBatches
  /// stored in the queue. Not owned.
  const RowDescriptor* row_desc_;

  /// Used by the ReservationManager to set the min and max reservation that can be
  /// used by the BufferedTupleStream. Used by the BufferedTupleStream to set the default
  /// and max page lengths. Held by reference: the referenced profile must outlive this
  /// queue.
  const TBackendResourceProfile& resource_profile_;

  /// Used by the ReservationManager for the SET_DENY_RESERVATION_PROBABILITY debug
  /// action. Held by reference: the referenced options must outlive this queue.
  const TDebugOptions& debug_options_;

  /// The max number of bytes that can be unpinned in the BufferedTupleStream. Set by the
  /// query option MAX_UNPINNED_RESULT_SPOOLING_MEMORY.
  const int64_t max_unpinned_bytes_;

  /// True if the queue has been closed, false otherwise.
  bool closed_ = false;
};
} // namespace impala
