// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/bufferpool/buffer-pool-internal.h"
#include <limits>
#include <sstream>
#include <boost/bind.hpp>
#include "common/names.h"
#include "gutil/strings/substitute.h"
#include "runtime/bufferpool/buffer-allocator.h"
#include "util/bit-util.h"
#include "util/cpu-info.h"
#include "util/metrics.h"
#include "util/runtime-profile-counters.h"
#include "util/time.h"
#include "util/uid-util.h"
DEFINE_int32(concurrent_scratch_ios_per_device, 2,
"Set this to influence the number of concurrent write I/Os issues to write data to "
"scratch files. This is multiplied by the number of active scratch directories to "
"obtain the target number of scratch write I/Os per query.");
namespace impala {
constexpr int BufferPool::LOG_MAX_BUFFER_BYTES;
constexpr int64_t BufferPool::MAX_BUFFER_BYTES;
void BufferPool::BufferHandle::Open(uint8_t* data, int64_t len, int home_core) {
DCHECK_LE(0, home_core);
DCHECK_LT(home_core, CpuInfo::GetMaxNumCores());
client_ = nullptr;
data_ = data;
len_ = len;
home_core_ = home_core;
}
BufferPool::PageHandle::PageHandle() {
Reset();
}
BufferPool::PageHandle::PageHandle(PageHandle&& src) {
Reset();
*this = std::move(src);
}
BufferPool::PageHandle& BufferPool::PageHandle::operator=(PageHandle&& src) {
DCHECK(!is_open());
// Copy over all members, then close 'src'.
page_ = src.page_;
client_ = src.client_;
src.Reset();
return *this;
}
void BufferPool::PageHandle::Open(Page* page, ClientHandle* client) {
DCHECK(!is_open());
page_ = page;
client_ = client;
}
void BufferPool::PageHandle::Reset() {
page_ = NULL;
client_ = NULL;
}
int BufferPool::PageHandle::pin_count() const {
DCHECK(is_open());
// The pin count can only be modified via this PageHandle, which must not be
// concurrently accessed by multiple threads, so it is safe to access without locking.
return page_->pin_count;
}
int64_t BufferPool::PageHandle::len() const {
DCHECK(is_open());
return page_->len; // Does not require locking.
}
Status BufferPool::PageHandle::GetBuffer(const BufferHandle** buffer) const {
DCHECK(is_open());
DCHECK(client_->is_registered());
DCHECK(is_pinned());
if (page_->pin_in_flight) {
// Finish the work started in Pin().
RETURN_IF_ERROR(client_->impl_->FinishMoveEvictedToPinned(page_));
}
DCHECK(!page_->pin_in_flight);
*buffer = &page_->buffer;
DCHECK((*buffer)->is_open());
return Status::OK();
}
BufferPool::BufferPool(MetricGroup* metrics, int64_t min_buffer_len,
int64_t buffer_bytes_limit, int64_t clean_page_bytes_limit)
: allocator_(new BufferAllocator(
this, metrics, min_buffer_len, buffer_bytes_limit, clean_page_bytes_limit)),
min_buffer_len_(min_buffer_len) {
DCHECK_GT(min_buffer_len, 0);
DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len));
}
BufferPool::~BufferPool() {}
Status BufferPool::RegisterClient(const string& name, TmpFileMgr::FileGroup* file_group,
ReservationTracker* parent_reservation, MemTracker* mem_tracker,
int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client,
MemLimit mem_limit_mode) {
DCHECK(!client->is_registered());
DCHECK(parent_reservation != NULL);
client->impl_ = new Client(this, file_group, name, parent_reservation, mem_tracker,
mem_limit_mode, reservation_limit, profile);
return Status::OK();
}
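// A minimal usage sketch of the client lifecycle (illustrative only: 'pool',
// 'file_group', 'parent_tracker', 'mem_tracker' and 'profile' are assumed to exist in
// the caller, the default MemLimit mode is used, and error handling is elided). A
// non-NULL FileGroup is needed for Unpin() to be able to spill:
//
//   BufferPool::ClientHandle client;
//   RETURN_IF_ERROR(pool->RegisterClient("example", file_group, parent_tracker,
//       mem_tracker, numeric_limits<int64_t>::max(), profile, &client));
//   int64_t len = 8 * 1024; // Must be a power-of-two >= the pool's min buffer length.
//   if (!client.IncreaseReservation(len)) { /* handle reservation denial */ }
//   BufferPool::PageHandle page;
//   RETURN_IF_ERROR(pool->CreatePage(&client, len, &page, nullptr));
//   pool->Unpin(&client, &page); // The page may now be spilled to scratch.
//   RETURN_IF_ERROR(pool->Pin(&client, &page)); // Brings the data back if evicted.
//   pool->DestroyPage(&client, &page);
//   pool->DeregisterClient(&client);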
void BufferPool::DeregisterClient(ClientHandle* client) {
if (!client->is_registered()) return;
client->impl_->Close(); // Will DCHECK if any remaining buffers or pinned pages.
delete client->impl_; // Will DCHECK if there are any remaining pages.
client->impl_ = NULL;
}
Status BufferPool::CreatePage(
ClientHandle* client, int64_t len, PageHandle* handle, const BufferHandle** buffer) {
DCHECK(!handle->is_open());
DCHECK_GE(len, min_buffer_len_);
DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
BufferHandle new_buffer;
// No changes have been made to state yet, so we can cleanly return on error.
RETURN_IF_ERROR(AllocateBuffer(client, len, &new_buffer));
Page* page = client->impl_->CreatePinnedPage(move(new_buffer));
handle->Open(page, client);
if (buffer != nullptr) *buffer = &page->buffer;
return Status::OK();
}
void BufferPool::DestroyPage(ClientHandle* client, PageHandle* handle) {
if (!handle->is_open()) return; // DestroyPage() should be idempotent.
if (handle->is_pinned()) {
// Cancel the read I/O - we don't need the data any more.
if (handle->page_->pin_in_flight) {
handle->page_->write_handle->CancelRead();
handle->page_->pin_in_flight = false;
}
// In the pinned case, delegate to ExtractBuffer() and FreeBuffer() to do the work
// of cleaning up the page, freeing the buffer and updating reservations correctly.
BufferHandle buffer;
Status status = ExtractBuffer(client, handle, &buffer);
DCHECK(status.ok()) << status.msg().msg();
FreeBuffer(client, &buffer);
} else {
// In the unpinned case, no reservations are used so we just clean up the page.
client->impl_->DestroyPageInternal(handle);
}
}
Status BufferPool::Pin(ClientHandle* client, PageHandle* handle) {
DCHECK(client->is_registered());
DCHECK(handle->is_open());
DCHECK_EQ(handle->client_, client);
Page* page = handle->page_;
if (page->pin_count == 0) {
RETURN_IF_ERROR(client->impl_->StartMoveToPinned(client, page));
COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, -page->len);
}
// Update accounting last to avoid complicating the error return path above.
++page->pin_count;
client->impl_->reservation()->AllocateFrom(page->len);
return Status::OK();
}
void BufferPool::Unpin(ClientHandle* client, PageHandle* handle) {
DCHECK(handle->is_open());
DCHECK(client->is_registered());
DCHECK_EQ(handle->client_, client);
// If handle is pinned, we can assume that the page itself is pinned.
DCHECK(handle->is_pinned());
Page* page = handle->page_;
ReservationTracker* reservation = client->impl_->reservation();
reservation->ReleaseTo(page->len);
if (--page->pin_count > 0) return;
if (page->pin_in_flight) {
// Data is not in memory - move it back to evicted.
client->impl_->UndoMoveEvictedToPinned(page);
} else {
// Data is in memory - move it to dirty unpinned.
client->impl_->MoveToDirtyUnpinned(page);
}
COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, handle->len());
}
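// Note on the accounting in Pin()/Unpin() above (a summary, not new behavior): each
// Pin() allocates page->len bytes from the client's reservation and each Unpin()
// releases the same amount, so a page with pin count N holds N * len bytes of used
// reservation. Pinning an evicted page only starts an async read; the read is
// completed lazily by GetBuffer() via FinishMoveEvictedToPinned().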
Status BufferPool::ExtractBuffer(
ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle) {
DCHECK(page_handle->is_pinned());
DCHECK(!buffer_handle->is_open());
DCHECK_EQ(page_handle->client_, client);
// If an async pin is in flight, we need to wait for it.
const BufferHandle* dummy;
RETURN_IF_ERROR(page_handle->GetBuffer(&dummy));
// Bring the pin count to 1 so that we're not using surplus reservations.
while (page_handle->pin_count() > 1) Unpin(client, page_handle);
// Destroy the page and extract the buffer.
client->impl_->DestroyPageInternal(page_handle, buffer_handle);
DCHECK(buffer_handle->is_open());
return Status::OK();
}
Status BufferPool::AllocateBuffer(
ClientHandle* client, int64_t len, BufferHandle* handle) {
RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len, true, nullptr));
Status status = allocator_->Allocate(client, len, handle);
// If the allocation failed, update client's accounting to reflect the failure.
if (!status.ok()) client->impl_->FreedBuffer(len);
return status;
}
Status BufferPool::AllocateUnreservedBuffer(
ClientHandle* client, int64_t len, BufferHandle* handle) {
DCHECK(!handle->is_open());
bool success;
RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len, false, &success));
if (!success) return Status::OK(); // Leave 'handle' closed to indicate failure.
Status status = allocator_->Allocate(client, len, handle);
// If the allocation failed, update client's accounting to reflect the failure.
if (!status.ok()) client->impl_->FreedBuffer(len);
return status;
}
void BufferPool::FreeBuffer(ClientHandle* client, BufferHandle* handle) {
if (!handle->is_open()) return; // Should be idempotent.
DCHECK_EQ(client, handle->client_);
int64_t len = handle->len_;
allocator_->Free(move(*handle));
client->impl_->FreedBuffer(len);
}
Status BufferPool::TransferBuffer(ClientHandle* src_client, BufferHandle* src,
ClientHandle* dst_client, BufferHandle* dst) {
DCHECK(src->is_open());
DCHECK(!dst->is_open());
DCHECK_EQ(src_client, src->client_);
DCHECK_NE(src, dst);
DCHECK_NE(src_client, dst_client);
dst_client->impl_->reservation()->AllocateFrom(src->len());
src_client->impl_->reservation()->ReleaseTo(src->len());
*dst = std::move(*src);
dst->client_ = dst_client;
return Status::OK();
}
void BufferPool::Maintenance() {
allocator_->Maintenance();
}
void BufferPool::ReleaseMemory(int64_t bytes_to_free) {
allocator_->ReleaseMemory(bytes_to_free);
}
int64_t BufferPool::GetSystemBytesLimit() const {
return allocator_->system_bytes_limit();
}
int64_t BufferPool::GetSystemBytesAllocated() const {
return allocator_->GetSystemBytesAllocated();
}
int64_t BufferPool::GetCleanPageBytesLimit() const {
return allocator_->GetCleanPageBytesLimit();
}
int64_t BufferPool::GetNumCleanPages() const {
return allocator_->GetNumCleanPages();
}
int64_t BufferPool::GetCleanPageBytes() const {
return allocator_->GetCleanPageBytes();
}
int64_t BufferPool::GetNumFreeBuffers() const {
return allocator_->GetNumFreeBuffers();
}
int64_t BufferPool::GetFreeBufferBytes() const {
return allocator_->GetFreeBufferBytes();
}
bool BufferPool::ClientHandle::IncreaseReservation(int64_t bytes) {
return impl_->reservation()->IncreaseReservation(bytes);
}
bool BufferPool::ClientHandle::IncreaseReservationToFit(int64_t bytes) {
return impl_->reservation()->IncreaseReservationToFit(bytes);
}
Status BufferPool::ClientHandle::DecreaseReservationTo(
int64_t max_decrease, int64_t target_bytes) {
return impl_->DecreaseReservationTo(max_decrease, target_bytes);
}
int64_t BufferPool::ClientHandle::GetReservation() const {
return impl_->reservation()->GetReservation();
}
int64_t BufferPool::ClientHandle::GetUsedReservation() const {
return impl_->reservation()->GetUsedReservation();
}
int64_t BufferPool::ClientHandle::GetUnusedReservation() const {
return impl_->reservation()->GetUnusedReservation();
}
bool BufferPool::ClientHandle::TransferReservationFrom(
ReservationTracker* src, int64_t bytes) {
DCHECK(!impl_->has_unpinned_pages());
return src->TransferReservationTo(impl_->reservation(), bytes);
}
bool BufferPool::ClientHandle::TransferReservationTo(
ReservationTracker* dst, int64_t bytes) {
return impl_->reservation()->TransferReservationTo(dst, bytes);
}
void BufferPool::ClientHandle::SaveReservation(SubReservation* dst, int64_t bytes) {
DCHECK_EQ(dst->tracker_->parent(), impl_->reservation());
bool success = impl_->reservation()->TransferReservationTo(dst->tracker_.get(), bytes);
DCHECK(success); // SubReservation should not have a limit, so this shouldn't fail.
}
void BufferPool::ClientHandle::RestoreReservation(SubReservation* src, int64_t bytes) {
DCHECK_EQ(src->tracker_->parent(), impl_->reservation());
bool success = src->tracker_->TransferReservationTo(impl_->reservation(), bytes);
DCHECK(success); // Transferring reservation to parent shouldn't fail.
}
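// An illustrative save/restore pattern (a sketch, not from the original file):
//
//   BufferPool::SubReservation saved(&client);
//   client.SaveReservation(&saved, bytes);    // 'bytes' is set aside and cannot be
//                                             // used for allocations until restored.
//   ...
//   client.RestoreReservation(&saved, bytes); // Make the reservation usable again.
//   saved.Close();                            // Returns any remainder to the client.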
void BufferPool::ClientHandle::SetDebugDenyIncreaseReservation(double probability) {
impl_->reservation()->SetDebugDenyIncreaseReservation(probability);
}
bool BufferPool::ClientHandle::has_unpinned_pages() const {
return impl_->has_unpinned_pages();
}
BufferPool::SubReservation::SubReservation(ClientHandle* client) {
tracker_.reset(new ReservationTracker);
tracker_->InitChildTracker(
nullptr, client->impl_->reservation(), nullptr, numeric_limits<int64_t>::max());
}
BufferPool::SubReservation::~SubReservation() {}
int64_t BufferPool::SubReservation::GetReservation() const {
return tracker_->GetReservation();
}
void BufferPool::SubReservation::Close() {
// Give any reservation back to the client.
if (is_closed()) return;
bool success =
tracker_->TransferReservationTo(tracker_->parent(), tracker_->GetReservation());
DCHECK(success); // Transferring reservation to parent shouldn't fail.
tracker_->Close();
tracker_.reset();
}
BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
const string& name, ReservationTracker* parent_reservation, MemTracker* mem_tracker,
MemLimit mem_limit_mode, int64_t reservation_limit, RuntimeProfile* profile)
: pool_(pool),
file_group_(file_group),
name_(name),
debug_write_delay_ms_(0),
num_pages_(0),
buffers_allocated_bytes_(0) {
// Set up a child profile with buffer pool info.
RuntimeProfile* child_profile = profile->CreateChild("Buffer pool", true, true);
reservation_.InitChildTracker(
child_profile, parent_reservation, mem_tracker, reservation_limit, mem_limit_mode);
counters_.alloc_time = ADD_TIMER(child_profile, "AllocTime");
counters_.sys_alloc_time = ADD_TIMER(child_profile, "SystemAllocTime");
counters_.cumulative_allocations =
ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT);
counters_.cumulative_bytes_alloced =
ADD_COUNTER(child_profile, "CumulativeAllocationBytes", TUnit::BYTES);
counters_.read_wait_time = ADD_TIMER(child_profile, "ReadIoWaitTime");
counters_.read_io_ops = ADD_COUNTER(child_profile, "ReadIoOps", TUnit::UNIT);
counters_.bytes_read = ADD_COUNTER(child_profile, "ReadIoBytes", TUnit::BYTES);
counters_.write_wait_time = ADD_TIMER(child_profile, "WriteIoWaitTime");
counters_.write_io_ops = ADD_COUNTER(child_profile, "WriteIoOps", TUnit::UNIT);
counters_.bytes_written = ADD_COUNTER(child_profile, "WriteIoBytes", TUnit::BYTES);
counters_.peak_unpinned_bytes =
child_profile->AddHighWaterMarkCounter("PeakUnpinnedBytes", TUnit::BYTES);
}
BufferPool::Page* BufferPool::Client::CreatePinnedPage(BufferHandle&& buffer) {
Page* page = new Page(this, buffer.len());
page->buffer = move(buffer);
page->pin_count = 1;
boost::lock_guard<boost::mutex> lock(lock_);
// The buffer is transferred to the page, so it will be accounted for in
// pinned_pages_.bytes() instead of buffers_allocated_bytes_.
buffers_allocated_bytes_ -= page->len;
pinned_pages_.Enqueue(page);
++num_pages_;
DCHECK_CONSISTENCY();
return page;
}
void BufferPool::Client::DestroyPageInternal(
PageHandle* handle, BufferHandle* out_buffer) {
DCHECK(handle->is_pinned() || out_buffer == NULL);
Page* page = handle->page_;
// Remove the page from the list that it is currently present in (if any).
{
unique_lock<mutex> cl(lock_);
// First try to remove from the pinned or dirty unpinned lists.
if (!pinned_pages_.Remove(page) && !dirty_unpinned_pages_.Remove(page)) {
// The page either has a write in flight, is clean, or is evicted.
// Let the write complete, if in flight.
WaitForWrite(&cl, page);
// If clean, remove it from the clean pages list. If evicted, this is a no-op.
pool_->allocator_->RemoveCleanPage(cl, out_buffer != nullptr, page);
}
DCHECK(!page->in_queue());
--num_pages_;
}
if (page->write_handle != NULL) {
// Discard any on-disk data.
file_group_->DestroyWriteHandle(move(page->write_handle));
}
if (out_buffer != NULL) {
DCHECK(page->buffer.is_open());
*out_buffer = std::move(page->buffer);
buffers_allocated_bytes_ += out_buffer->len();
} else if (page->buffer.is_open()) {
pool_->allocator_->Free(move(page->buffer));
}
delete page;
handle->Reset();
}
void BufferPool::Client::MoveToDirtyUnpinned(Page* page) {
// Only valid to unpin pages if spilling is enabled.
DCHECK(spilling_enabled());
DCHECK_EQ(0, page->pin_count);
unique_lock<mutex> lock(lock_);
DCHECK_CONSISTENCY();
DCHECK(pinned_pages_.Contains(page));
pinned_pages_.Remove(page);
dirty_unpinned_pages_.Enqueue(page);
// Check if we should initiate writes for this (or another) dirty page.
WriteDirtyPagesAsync();
}
Status BufferPool::Client::StartMoveToPinned(ClientHandle* client, Page* page) {
unique_lock<mutex> cl(lock_);
DCHECK_CONSISTENCY();
// Propagate any write errors that occurred for this client.
RETURN_IF_ERROR(write_status_);
if (dirty_unpinned_pages_.Remove(page)) {
// No writes were initiated for the page - just move it back to the pinned state.
pinned_pages_.Enqueue(page);
return Status::OK();
}
if (in_flight_write_pages_.Contains(page)) {
// A write is in flight - wait for it to complete, so that only the clean and
// evicted cases remain to be handled below.
WaitForWrite(&cl, page);
RETURN_IF_ERROR(write_status_); // The write may have set 'write_status_'.
}
// At this point we need to either reclaim a clean page or allocate a new buffer.
// We may need to clean some pages to do so.
RETURN_IF_ERROR(CleanPages(&cl, page->len));
if (pool_->allocator_->RemoveCleanPage(cl, true, page)) {
// The clean page still has an associated buffer. Restore the data, and move the page
// back to the pinned state.
pinned_pages_.Enqueue(page);
DCHECK(page->buffer.is_open());
DCHECK(page->write_handle != NULL);
// Don't need on-disk data.
cl.unlock(); // Don't block progress for other threads operating on other pages.
return file_group_->RestoreData(move(page->write_handle), page->buffer.mem_range());
}
// If the page wasn't in the clean pages list, it must have been evicted.
return StartMoveEvictedToPinned(&cl, client, page);
}
Status BufferPool::Client::StartMoveEvictedToPinned(
unique_lock<mutex>* client_lock, ClientHandle* client, Page* page) {
DCHECK(!page->buffer.is_open());
// Safe to modify the page's buffer handle without holding the page lock because no
// concurrent operations can modify evicted pages.
RETURN_IF_ERROR(pool_->allocator_->Allocate(client, page->len, &page->buffer));
COUNTER_ADD(counters().bytes_read, page->len);
COUNTER_ADD(counters().read_io_ops, 1);
RETURN_IF_ERROR(
file_group_->ReadAsync(page->write_handle.get(), page->buffer.mem_range()));
pinned_pages_.Enqueue(page);
page->pin_in_flight = true;
DCHECK_CONSISTENCY();
return Status::OK();
}
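// Summary of the async-pin protocol for evicted pages (restating the methods in this
// file, not new behavior): StartMoveEvictedToPinned() allocates a buffer, issues an
// async read of the spilled data and sets 'pin_in_flight'. The pin is then either
// completed by FinishMoveEvictedToPinned(), which waits for the read and discards the
// write handle, or rolled back by UndoMoveEvictedToPinned(), which cancels the read
// and frees the buffer, returning the page to the evicted state.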
void BufferPool::Client::UndoMoveEvictedToPinned(Page* page) {
// We need to get the page back to the evicted state where:
// * There is no in-flight read.
// * The page's data is on disk referenced by 'write_handle'.
// * The page has no attached buffer.
DCHECK(page->pin_in_flight);
page->write_handle->CancelRead();
page->pin_in_flight = false;
unique_lock<mutex> lock(lock_);
DCHECK_CONSISTENCY();
DCHECK(pinned_pages_.Contains(page));
pinned_pages_.Remove(page);
// Discard the buffer - since the pin was still in flight, no valid reference to the
// buffer's contents could have been handed out to the caller.
pool_->allocator_->Free(move(page->buffer));
}
Status BufferPool::Client::FinishMoveEvictedToPinned(Page* page) {
DCHECK(page->pin_in_flight);
SCOPED_TIMER(counters().read_wait_time);
// Don't hold any locks while reading back the data. It is safe to modify the page's
// buffer handle without holding any locks because no concurrent operations can modify
// evicted pages.
RETURN_IF_ERROR(
file_group_->WaitForAsyncRead(page->write_handle.get(), page->buffer.mem_range()));
file_group_->DestroyWriteHandle(move(page->write_handle));
page->pin_in_flight = false;
return Status::OK();
}
Status BufferPool::Client::PrepareToAllocateBuffer(
int64_t len, bool reserved, bool* success) {
if (success != nullptr) *success = false;
// Don't need to hold the client's 'lock_' yet because 'reservation_' operations are
// threadsafe.
if (reserved) {
// The client must have already reserved the memory.
reservation_.AllocateFrom(len);
} else {
DCHECK(success != nullptr);
// The client may not have reserved the memory.
if (!reservation_.IncreaseReservationToFitAndAllocate(len)) return Status::OK();
}
{
unique_lock<mutex> lock(lock_);
// Clean enough pages to allow allocation to proceed without violating our eviction
// policy.
Status status = CleanPages(&lock, len);
if (!status.ok()) {
// Reverse the allocation.
reservation_.ReleaseTo(len);
return status;
}
buffers_allocated_bytes_ += len;
DCHECK_CONSISTENCY();
}
if (success != nullptr) *success = true;
return Status::OK();
}
Status BufferPool::Client::DecreaseReservationTo(
int64_t max_decrease, int64_t target_bytes) {
unique_lock<mutex> lock(lock_);
// Get a snapshot of the current reservation. Reservation may be increased concurrently
// without holding 'lock_' but cannot be decreased, so the end result may be higher
// than 'target_bytes' if another thread concurrently increases reservation. This
// interleaving of threads gives the same result as IncreaseReservation() running
// after DecreaseReservationTo(). Similarly, running IncreaseReservationToFit() and
// DecreaseReservationTo() concurrently can lead to a range of outcomes, but that
// is unavoidable by the nature of the methods.
int64_t current_reservation = reservation_.GetReservation();
DCHECK_GE(current_reservation, target_bytes);
int64_t amount_to_free = min(max_decrease, current_reservation - target_bytes);
if (amount_to_free == 0) return Status::OK();
// Clean enough pages to allow us to safely release reservation.
RETURN_IF_ERROR(CleanPages(&lock, amount_to_free));
reservation_.DecreaseReservation(amount_to_free);
return Status::OK();
}
Status BufferPool::Client::CleanPages(unique_lock<mutex>* client_lock, int64_t len) {
DCheckHoldsLock(*client_lock);
DCHECK_CONSISTENCY();
// Work out the level that the total bytes of dirty unpinned + in-flight pages must
// be brought down to in order to satisfy the eviction policy.
int64_t target_dirty_bytes = reservation_.GetReservation() - buffers_allocated_bytes_
- pinned_pages_.bytes() - len;
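// Worked example with illustrative numbers: given a 96MB reservation, 32MB of
// allocated buffers, 32MB of pinned pages and len = 16MB, target_dirty_bytes =
// 96 - 32 - 32 - 16 = 16MB, so at most 16MB of dirty unpinned + in-flight pages may
// remain before the allocation can proceed.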
// Start enough writes to ensure that the loop condition below will eventually become
// false (or a write error will be encountered).
int64_t min_bytes_to_write =
max<int64_t>(0, dirty_unpinned_pages_.bytes() - target_dirty_bytes);
WriteDirtyPagesAsync(min_bytes_to_write);
// One of the writes we initiated, or an earlier in-flight write, may have hit an error.
RETURN_IF_ERROR(write_status_);
// Wait until enough writes have finished so that we can make the allocation without
// violating the eviction policy. I.e. so that other clients can immediately get the
// memory they're entitled to without waiting for this client's write to complete.
DCHECK_GE(in_flight_write_pages_.bytes(), min_bytes_to_write) << DebugStringLocked();
while (dirty_unpinned_pages_.bytes() + in_flight_write_pages_.bytes()
> target_dirty_bytes) {
SCOPED_TIMER(counters().write_wait_time);
write_complete_cv_.Wait(*client_lock);
RETURN_IF_ERROR(write_status_); // Check if error occurred while waiting.
}
return Status::OK();
}
void BufferPool::Client::WriteDirtyPagesAsync(int64_t min_bytes_to_write) {
DCHECK_GE(min_bytes_to_write, 0) << DebugStringLocked();
DCHECK_LE(min_bytes_to_write, dirty_unpinned_pages_.bytes()) << DebugStringLocked();
if (file_group_ == NULL) {
// Spilling disabled - there should be no unpinned pages to write.
DCHECK_EQ(0, min_bytes_to_write);
DCHECK_EQ(0, dirty_unpinned_pages_.bytes());
return;
}
// No point in starting writes if an error occurred because future operations for the
// client will fail regardless.
if (!write_status_.ok()) return;
// Compute the ideal number of writes to start. We use a simple heuristic based on
// the total number of writes. The FileGroup's allocation should spread the writes
// across disks somewhat, but doesn't guarantee we're fully using all available disks.
// In the future we could track the # of writes per disk.
const int64_t target_writes = FLAGS_concurrent_scratch_ios_per_device
* file_group_->tmp_file_mgr()->NumActiveTmpDevices();
int64_t bytes_written = 0;
while (!dirty_unpinned_pages_.empty()
&& (bytes_written < min_bytes_to_write
|| in_flight_write_pages_.size() < target_writes)) {
Page* page = dirty_unpinned_pages_.tail(); // LIFO.
DCHECK(page != NULL) << "Should have been enough dirty unpinned pages";
{
lock_guard<SpinLock> pl(page->buffer_lock);
DCHECK(file_group_ != NULL);
DCHECK(page->buffer.is_open());
COUNTER_ADD(counters().bytes_written, page->len);
COUNTER_ADD(counters().write_io_ops, 1);
Status status = file_group_->Write(page->buffer.mem_range(),
[this, page](const Status& write_status) {
WriteCompleteCallback(page, write_status);
},
&page->write_handle);
// Exit early on error: there is no point in starting more writes because future
// operations for this client will fail regardless.
if (!status.ok()) {
write_status_.MergeStatus(status);
return;
}
}
// Now that the write is in flight, update all the state.
Page* tmp = dirty_unpinned_pages_.PopBack();
DCHECK_EQ(tmp, page);
in_flight_write_pages_.Enqueue(page);
bytes_written += page->len;
}
}
void BufferPool::Client::WriteCompleteCallback(Page* page, const Status& write_status) {
#ifndef NDEBUG
if (debug_write_delay_ms_ > 0) SleepForMs(debug_write_delay_ms_);
#endif
{
unique_lock<mutex> cl(lock_);
DCHECK(in_flight_write_pages_.Contains(page)) << DebugStringLocked();
// The status should always be propagated.
// TODO: if we add cancellation support to TmpFileMgr, consider cancellation path.
if (!write_status.ok()) write_status_.MergeStatus(write_status);
in_flight_write_pages_.Remove(page);
// Move to clean pages list even if an error was encountered - the buffer can be
// repurposed by other clients and 'write_status_' must be checked by this client
// before reading back the bad data.
pool_->allocator_->AddCleanPage(cl, page);
WriteDirtyPagesAsync(); // Start another asynchronous write if needed.
// Notify before releasing lock to avoid race with Page and Client destruction.
page->write_complete_cv_.NotifyAll();
write_complete_cv_.NotifyAll();
}
}
void BufferPool::Client::WaitForWrite(unique_lock<mutex>* client_lock, Page* page) {
DCheckHoldsLock(*client_lock);
while (in_flight_write_pages_.Contains(page)) {
SCOPED_TIMER(counters().write_wait_time);
page->write_complete_cv_.Wait(*client_lock);
}
}
void BufferPool::Client::WaitForAllWrites() {
unique_lock<mutex> cl(lock_);
while (in_flight_write_pages_.size() > 0) {
write_complete_cv_.Wait(cl);
}
}
string BufferPool::Client::DebugString() {
lock_guard<mutex> lock(lock_);
return DebugStringLocked();
}
string BufferPool::Client::DebugStringLocked() {
stringstream ss;
ss << Substitute("<BufferPool::Client> $0 name: $1 write_status: $2 "
"buffers allocated $3 num_pages: $4 pinned_bytes: $5 "
"dirty_unpinned_bytes: $6 in_flight_write_bytes: $7 reservation: {$8}",
this, name_, write_status_.GetDetail(), buffers_allocated_bytes_, num_pages_,
pinned_pages_.bytes(), dirty_unpinned_pages_.bytes(),
in_flight_write_pages_.bytes(), reservation_.DebugString());
ss << "\n " << pinned_pages_.size() << " pinned pages: ";
pinned_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
ss << "\n " << dirty_unpinned_pages_.size() << " dirty unpinned pages: ";
dirty_unpinned_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
ss << "\n " << in_flight_write_pages_.size() << " in flight write pages: ";
in_flight_write_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
return ss.str();
}
string BufferPool::ClientHandle::DebugString() const {
if (is_registered()) {
return Substitute(
"<BufferPool::Client> $0 internal state: {$1}", this, impl_->DebugString());
} else {
return Substitute("<BufferPool::ClientHandle> $0 UNREGISTERED", this);
}
}
string BufferPool::PageHandle::DebugString() const {
if (is_open()) {
lock_guard<SpinLock> pl(page_->buffer_lock);
return Substitute("<BufferPool::PageHandle> $0 client: $1/$2 page: {$3}", this,
client_, client_->impl_, page_->DebugString());
} else {
return Substitute("<BufferPool::PageHandle> $0 CLOSED", this);
}
}
string BufferPool::Page::DebugString() {
return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3", this, len,
pin_count, buffer.DebugString());
}
bool BufferPool::Page::DebugStringCallback(stringstream* ss, BufferPool::Page* page) {
lock_guard<SpinLock> pl(page->buffer_lock);
(*ss) << page->DebugString() << "\n";
return true;
}
string BufferPool::BufferHandle::DebugString() const {
if (is_open()) {
return Substitute("<BufferPool::BufferHandle> $0 client: $1/$2 data: $3 len: $4",
this, client_, client_->impl_, data_, len_);
} else {
return Substitute("<BufferPool::BufferHandle> $0 CLOSED", this);
}
}
string BufferPool::DebugString() {
stringstream ss;
ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_ << "\n"
<< allocator_->DebugString();
return ss.str();
}
}