blob: da21660f62e4b6e8ee2fcdf1c27dec3a1b7e09db [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/spillable-row-batch-queue.h"
#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "common/names.h"
namespace impala {
/// Records the configuration of the queue. The constructor body is empty: every argument
/// is only stored in the corresponding member, and the reservation and the underlying
/// stream are set up later by Open().
///
/// 'name' is passed on to the reservation manager and the stream in Open().
/// 'max_unpinned_bytes' is the threshold IsFull() compares the stream's unpinned bytes
/// against. The remaining arguments are forwarded to the reservation manager and/or the
/// stream in Open(), and 'profile' is additionally annotated in AddBatch().
SpillableRowBatchQueue::SpillableRowBatchQueue(const string& name,
    int64_t max_unpinned_bytes, RuntimeState* state, MemTracker* mem_tracker,
    RuntimeProfile* profile, const RowDescriptor* row_desc,
    const TBackendResourceProfile& resource_profile, const TDebugOptions& debug_options)
  : name_(name),
    state_(state),
    mem_tracker_(mem_tracker),
    profile_(profile),
    row_desc_(row_desc),
    resource_profile_(resource_profile),
    debug_options_(debug_options),
    max_unpinned_bytes_(max_unpinned_bytes) {}
/// Releases nothing itself; it only debug-asserts that 'closed_' is set, i.e. that
/// Close() ran before the queue is destroyed.
SpillableRowBatchQueue::~SpillableRowBatchQueue() {
  DCHECK(closed_);
}
/// Prepares the queue for use: initializes the reservation manager, claims the initial
/// buffer reservation, and then creates and prepares the BufferedTupleStream that backs
/// the queue. Returns the first error reported by any of these steps, otherwise OK.
Status SpillableRowBatchQueue::Open() {
  // The ReservationManager has to be initialized and its reservation claimed before the
  // stream is created, because the stream is constructed with the manager's buffer pool
  // client.
  reservation_manager_.Init(name_, profile_, state_->instance_buffer_reservation(),
      mem_tracker_, resource_profile_, debug_options_);
  RETURN_IF_ERROR(reservation_manager_.ClaimBufferReservation(state_));

  // Both buffer sizes handed to the stream come straight from the resource profile.
  const auto spillable_buffer_size = resource_profile_.spillable_buffer_size;
  const auto max_row_buffer_size = resource_profile_.max_row_buffer_size;
  batch_queue_ = make_unique<BufferedTupleStream>(state_, row_desc_,
      reservation_manager_.buffer_pool_client(), spillable_buffer_size,
      max_row_buffer_size);
  RETURN_IF_ERROR(batch_queue_->Init(name_, true));

  // Prepare the stream for interleaved reads and writes. Failing to obtain the
  // reservation here is treated as a programming error and is only checked in debug
  // builds.
  bool got_reservation = false;
  RETURN_IF_ERROR(batch_queue_->PrepareForReadWrite(true, &got_reservation));
  DCHECK(got_reservation) << "SpillableRowBatchQueue failed to get reservation using "
                          << "buffer pool client: "
                          << reservation_manager_.buffer_pool_client()->DebugString();
  return Status::OK();
}
/// Appends every row of 'batch' to the queue. The queue must be neither full nor closed
/// (debug-asserted). When a row does not fit, the stream is unpinned so that it can
/// spill, and the row is added again. Returns the first error encountered, otherwise OK.
Status SpillableRowBatchQueue::AddBatch(RowBatch* batch) {
  DCHECK(!IsFull()) << "Cannot AddBatch on a full SpillableRowBatchQueue";
  DCHECK(!closed_) << "Cannot AddBatch on a closed SpillableRowBatchQueue";
  Status status;
  FOREACH_ROW(batch, 0, batch_itr) {
    auto row = batch_itr.Get();
    // AddRow is expected to return false with an OK status only when there is not
    // enough unused reservation to allocate a page for the row. In that case start
    // spilling by unpinning the stream. After unpinning, adding the row should succeed
    // unless the unpinned pages had to be written out and either (1) the write hit an
    // error (e.g. an IO error), (2) there is no scratch space left, or (3) spilling to
    // disk is disabled.
    if (UNLIKELY(!batch_queue_->AddRow(row, &status))) {
      // A non-OK status means the failure was a real error, not a lack of reservation.
      RETURN_IF_ERROR(status);
      // StartSpilling is expected to return an error if spilling is disabled.
      RETURN_IF_ERROR(state_->StartSpilling(mem_tracker_));
      // The stream should still be fully pinned at this point.
      DCHECK(batch_queue_->is_pinned());
      DCHECK_EQ(batch_queue_->bytes_unpinned(), 0);
      // Unpin the stream, then retry the row that did not fit.
      RETURN_IF_ERROR(
          batch_queue_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
      // Record in the runtime profile's "ExecOption" info string that this queue spilled.
      profile_->AppendExecOption("Spilled");
      if (!batch_queue_->AddRow(row, &status)) {
        RETURN_IF_ERROR(status);
        // A failed add on an unpinned stream should always carry an error status, so
        // reaching this line indicates a bug.
        DCHECK(false) << "Rows should be added in unpinned mode unless an error occurred";
      }
    }
  }
  return Status::OK();
}
/// Reads the next rows from the queue into 'batch'. The queue must be neither empty nor
/// closed (debug-asserted). Returns the error reported by the underlying stream, if any,
/// otherwise OK.
Status SpillableRowBatchQueue::GetBatch(RowBatch* batch) {
  DCHECK(!IsEmpty()) << "Cannot GetBatch on an empty SpillableRowBatchQueue";
  DCHECK(!closed_) << "Cannot GetBatch on a closed SpillableRowBatchQueue";

  bool eos = false;
  RETURN_IF_ERROR(batch_queue_->GetNext(batch, &eos));

  // Sanity check: the stream's end-of-stream flag must agree with IsEmpty().
  DCHECK_EQ(eos, IsEmpty());
  return Status::OK();
}
/// Returns true once the stream's unpinned bytes have reached or exceeded
/// 'max_unpinned_bytes_'. Must not be called on a closed queue (debug-asserted).
bool SpillableRowBatchQueue::IsFull() const {
  DCHECK(!closed_);
  // AddBatch() asserts that no bytes are unpinned before it first unpins the stream, so
  // with a positive limit the queue can only become full after it has started spilling.
  // The unpinned byte count may overshoot the limit because the whole stream is unpinned
  // at once, without consulting max_unpinned_bytes_.
  const auto unpinned_bytes = batch_queue_->bytes_unpinned();
  return unpinned_bytes >= max_unpinned_bytes_;
}
/// Returns true when every row added to the queue has also been read back. Must not be
/// called on a closed queue (debug-asserted).
bool SpillableRowBatchQueue::IsEmpty() const {
  DCHECK(!closed_);
  // The stream counts the rows ever added (whether or not they have since been removed)
  // and the rows handed back to readers; the queue is empty when the two counts match.
  const auto rows_added = batch_queue_->num_rows();
  const auto rows_read = batch_queue_->rows_returned();
  return rows_added == rows_read;
}
/// Returns true as long as 'closed_' is not set, i.e. until Close() has completed. Note
/// that this only reflects 'closed_'; it does not check whether Open() was called.
bool SpillableRowBatchQueue::IsOpen() const {
  return !closed_;
}
/// Closes the underlying stream (if one was created), closes the reservation manager
/// and marks the queue as closed. Calling Close() again after it has completed is a
/// no-op.
void SpillableRowBatchQueue::Close() {
  if (closed_) return;

  // The stream is only created in Open(), so it may not exist yet.
  if (batch_queue_) {
    batch_queue_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
  }

  // The stream is closed before the reservation manager whose buffer pool client it was
  // constructed with.
  reservation_manager_.Close(state_);
  closed_ = true;
}
} // namespace impala