blob: 97c60d88e4651bf30ad96225d9c78f04cb335d38 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <queue>
#include "runtime/buffered-tuple-stream.h"
#include "runtime/reservation-manager.h"
#include "runtime/row-batch.h"
namespace impala {
class RowBatch;
/// A RowBatchQueue that provides non-blocking queue semantics. RowBatches are stored
/// inside a BufferedTupleStream. None of the methods block; this class is not thread
/// safe. The amount of unpinned memory used by the queue can be limited by the
/// parameter 'max_unpinned_bytes'. Calls to AddBatch after the capacity has been reached
/// will return an error Status. Calls to GetBatch on an empty queue will return an error
/// Status.
///
/// In order to manage the reservation used by the BufferedTupleStream, this class uses
/// a ReservationManager that creates the BufferPool::ClientHandle used by the
/// BufferedTupleStream. The ReservationManager uses a ResourceProfile created by the
/// frontend (fe/) to limit the amount of reserved memory used by the stream.
///
/// 'name' is a unique name which is purely used for error reporting by the
/// BufferedTupleStream. The name is copied into the queue, so callers may safely pass a
/// temporary string.
///
/// 'max_unpinned_bytes' limits the maximum number of bytes that can be unpinned in the
/// underlying BufferedTupleStream. The limit is only a soft limit; it might be exceeded
/// during AddBatch when unpinning the stream.
///
/// 'resource_profile' is created in the frontend (fe/) by PlanRootSink and specifies the
/// min and max amount of reserved memory the BufferedTupleStream can use, as well as the
/// default and max page lengths used by the stream. Only a reference to it is stored, so
/// it must outlive this queue.
///
/// 'debug_options' is used by the ReservationManager for debug actions. Only a reference
/// to it is stored, so it must outlive this queue.
///
/// The remaining parameters ('state', 'mem_tracker', 'profile', 'row_desc') are stored
/// as raw pointers, must remain valid while the queue is in use, and are used to
/// initialize the ReservationManager and BufferedTupleStream.
///
/// The queue is neither copyable nor assignable.
class SpillableRowBatchQueue {
 public:
  SpillableRowBatchQueue(const std::string& name, int64_t max_unpinned_bytes,
      RuntimeState* state, MemTracker* mem_tracker, RuntimeProfile* profile,
      const RowDescriptor* row_desc, const TBackendResourceProfile& resource_profile,
      const TDebugOptions& debug_options);
  ~SpillableRowBatchQueue();

  // The queue exclusively owns its stream (unique_ptr) and holds reference members, so
  // copying or assigning it is never meaningful; make that explicit.
  SpillableRowBatchQueue(const SpillableRowBatchQueue&) = delete;
  SpillableRowBatchQueue& operator=(const SpillableRowBatchQueue&) = delete;

  /// Creates and initializes the ReservationManager and BufferedTupleStream. Returns an
  /// error Status if either could not be initialized. The ReservationManager may fail
  /// to initialize if it cannot claim the initial buffer reservation. The
  /// BufferedTupleStream may fail to initialize if it could not create the read and
  /// write buffers.
  Status Open();

  /// Adds the given RowBatch to the queue. Returns Status::OK() if the batch was
  /// successfully added, returns an error if there was an issue when adding the batch to
  /// the BufferedTupleStream. If adding the batch to the BufferedTupleStream cannot be
  /// achieved because there is no more available reserved memory, this method will unpin
  /// the stream and then add the RowBatch. If the batch still cannot be added, this
  /// method returns an error Status. It is not valid to call this method if the queue is
  /// full or closed.
  Status AddBatch(RowBatch* batch);

  /// Removes the RowBatch at the head of the queue and returns it through 'batch'.
  /// Returns Status::OK() if the batch was successfully read from the queue. It is not
  /// valid to call this method if the queue is empty or has already been closed.
  Status GetBatch(RowBatch* batch);

  /// Returns true if the queue limit has been reached, false otherwise. It is not valid
  /// to call this method if the queue is already closed.
  bool IsFull() const;

  /// Returns true if the queue is empty, false otherwise. It is not valid to call this
  /// method if the queue is already closed.
  bool IsEmpty() const;

  /// Returns false if Close() has been called, true otherwise.
  bool IsOpen() const;

  /// Resets the remaining RowBatches in the queue and releases the queue memory.
  void Close();

 private:
  /// BufferedTupleStream that stores all RowBatches.
  std::unique_ptr<BufferedTupleStream> batch_queue_;

  /// ReservationManager that manages the reserved memory and BufferPool::ClientHandle
  /// used by the BufferedTupleStream.
  ReservationManager reservation_manager_;

  /// A unique name used by the BufferedTupleStream, used purely for debugging purposes.
  /// Stored by value (not by reference) so that a temporary passed to the constructor
  /// cannot leave this member dangling.
  const std::string name_;

  /// Used to initialize and manage the BufferedTupleStream and ReservationManager.
  RuntimeState* state_;

  /// The MemTracker to use in the BufferedTupleStream.
  MemTracker* mem_tracker_;

  /// Used by the BufferPool::Client created for the BufferedTupleStream.
  RuntimeProfile* profile_;

  /// Used by the BufferedTupleStream, must match the RowDescriptor of the RowBatches
  /// stored in the queue.
  const RowDescriptor* row_desc_;

  /// Used by the ReservationManager to set the min and max reservation that can be
  /// used by the BufferedTupleStream. Used by the BufferedTupleStream to set the default
  /// and max page lengths. This is a reference: the referenced profile must outlive the
  /// queue.
  const TBackendResourceProfile& resource_profile_;

  /// Used by the ReservationManager for the SET_DENY_RESERVATION_PROBABILITY debug
  /// action. This is a reference: the referenced options must outlive the queue.
  const TDebugOptions& debug_options_;

  /// The max number of bytes that can be unpinned in the BufferedTupleStream. Set by the
  /// query option MAX_UNPINNED_RESULT_SPOOLING_MEMORY.
  const int64_t max_unpinned_bytes_;

  /// True if the queue has been closed, false otherwise.
  bool closed_ = false;
};
} // namespace impala