blob: 1960bb0bc6fef8b10dd74cb4482514bf0eeeb81d [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/bufferpool/buffer-pool.h"
#include <boost/bind.hpp>
#include <limits>
#include <sstream>
#include "runtime/bufferpool/reservation-tracker.h"
#include "common/names.h"
#include "gutil/strings/substitute.h"
#include "util/bit-util.h"
#include "util/runtime-profile-counters.h"
#include "util/uid-util.h"
namespace impala {
/// The internal representation of a page, which can be pinned or unpinned. If the
/// page is pinned, a buffer is associated with the page.
///
/// Code manipulating the page is responsible for acquiring 'lock' when reading or
/// modifying the page.
struct BufferPool::Page : public BufferPool::PageList::Node {
  /// Creates a page of 'len' bytes with a pin count of zero and the dirty flag
  /// cleared. The constructor is explicit so that a bare integer can never be
  /// silently converted into a Page.
  explicit Page(int64_t len) : len(len), pin_count(0), dirty(false) {}

  /// Increment the pin count. Caller must hold 'lock'. 'handle' is not used by the
  /// current implementation.
  void IncrementPinCount(PageHandle* handle) {
    lock.DCheckLocked();
    ++pin_count;
    // Pinned page buffers may be modified by anyone with a pointer to the buffer, so we
    // have to assume they are dirty.
    dirty = true;
  }

  /// Decrement the pin count. Caller must hold 'lock' and the page must currently be
  /// pinned. 'handle' is not used by the current implementation.
  void DecrementPinCount(PageHandle* handle) {
    lock.DCheckLocked();
    DCHECK(pin_count > 0);
    --pin_count;
  }

  /// Returns a one-line description of the page: its address, length, pin count,
  /// buffer and dirty flag. Does not acquire 'lock' itself.
  string DebugString() {
    return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3 dirty: $4", this,
        len, pin_count, buffer.DebugString(), dirty);
  }

  /// Helper for BufferPool::DebugString(). Acquires the page's lock, appends the
  /// page's debug string followed by a newline to 'ss' and always returns true.
  static bool DebugStringCallback(stringstream* ss, BufferPool::Page* page) {
    lock_guard<SpinLock> pl(page->lock);
    (*ss) << page->DebugString() << "\n";
    return true;
  }

  /// The length of the page in bytes.
  const int64_t len;

  /// Lock to protect the below members of Page. The lock must be held when modifying any
  /// of the below members and when reading any of the below members of an unpinned page.
  SpinLock lock;

  /// The pin count of the page.
  int pin_count;

  /// Buffer with the page's contents. Always open if pinned. Closed if page is unpinned
  /// and was evicted from memory.
  BufferHandle buffer;

  /// True if the buffer's contents need to be saved before evicting it from memory.
  bool dirty;
};
/// Default-constructs a handle in the closed state by delegating to Reset().
BufferPool::BufferHandle::BufferHandle() { Reset(); }
/// Move constructor: implemented in terms of move assignment, so 'src' is reset
/// once its members have been taken over.
BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) {
  BufferHandle& self = *this;
  self = std::move(src);
}
/// Move assignment. The destination must not be open. Takes over the client, data
/// pointer and length of 'src', then resets 'src'.
BufferPool::BufferHandle& BufferPool::BufferHandle::operator=(BufferHandle&& src) {
  DCHECK(!is_open());

  // Steal every field from the source handle...
  len_ = src.len_;
  data_ = src.data_;
  client_ = src.client_;

  // ...and leave the source in the reset state.
  src.Reset();
  return *this;
}
/// Points this handle at the 'len'-byte region starting at 'data' and records
/// 'client' as the client associated with the buffer.
void BufferPool::BufferHandle::Open(const Client* client, uint8_t* data, int64_t len) {
  len_ = len;
  data_ = data;
  client_ = client;
}
/// Clears the handle: null client, null data pointer and a length of -1.
void BufferPool::BufferHandle::Reset() {
  len_ = -1;
  data_ = nullptr;
  client_ = nullptr;
}
/// Default-constructs a handle that references no page, by delegating to Reset().
BufferPool::PageHandle::PageHandle() { Reset(); }
/// Move constructor: implemented in terms of move assignment, so 'src' is reset
/// once its members have been taken over.
BufferPool::PageHandle::PageHandle(PageHandle&& src) {
  PageHandle& self = *this;
  self = std::move(src);
}
/// Move assignment. The destination must not be open. Takes over the page and
/// client pointers of 'src', then resets 'src'.
BufferPool::PageHandle& BufferPool::PageHandle::operator=(PageHandle&& src) {
  DCHECK(!is_open());

  // Steal both pointers from the source handle...
  client_ = src.client_;
  page_ = src.page_;

  // ...and leave the source in the reset state.
  src.Reset();
  return *this;
}
/// Binds this (currently closed) handle to 'page' on behalf of 'client'. The caller
/// must already hold the page's lock.
void BufferPool::PageHandle::Open(Page* page, Client* client) {
  DCHECK(!is_open());
  page->lock.DCheckLocked();

  client_ = client;
  page_ = page;
}
/// Clears the handle so that it no longer references a page or a client.
void BufferPool::PageHandle::Reset() {
  client_ = nullptr;
  page_ = nullptr;
}
int BufferPool::PageHandle::pin_count() const {
DCHECK(is_open());
// The pin count can only be modified via this PageHandle, which must not be
// concurrently accessed by multiple threads, so it is safe to access without locking
return page_->pin_count;
}
int64_t BufferPool::PageHandle::len() const {
DCHECK(is_open());
// The length of the page cannot change, so it is safe to access without locking.
return page_->len;
}
/// Returns a pointer to the buffer of the pinned page referenced by this handle.
const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const {
  DCHECK(is_pinned());
  // While the page stays pinned its 'buffer' field cannot change, which makes an
  // unlocked access safe.
  const BufferHandle& pinned_buffer = page_->buffer;
  return &pinned_buffer;
}
/// Constructs a pool whose smallest buffer is 'min_buffer_len' bytes and which may
/// hand out at most 'buffer_bytes_limit' bytes of buffers in total. The whole limit
/// starts out as remaining capacity.
BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
  : allocator_(new BufferAllocator(min_buffer_len)),
    min_buffer_len_(min_buffer_len),
    buffer_bytes_limit_(buffer_bytes_limit),
    buffer_bytes_remaining_(buffer_bytes_limit) {
  // The minimum buffer length has to be a positive power of two.
  DCHECK_GT(min_buffer_len, 0);
  DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len));
}
/// Destructor. Every page must have been destroyed before the pool itself goes away.
BufferPool::~BufferPool() { DCHECK(pages_.empty()); }
/// Registers 'client' with the pool: creates its counters under 'profile', then
/// records 'reservation' and 'name' on the client. 'client' must not already be
/// registered and 'reservation' must be non-NULL. Always returns Status::OK().
Status BufferPool::RegisterClient(const string& name, ReservationTracker* reservation,
    RuntimeProfile* profile, Client* client) {
  DCHECK(!client->is_registered());
  DCHECK(reservation != NULL);

  // Counters are set up first, then the client's identifying state is filled in.
  client->InitCounters(profile);
  client->reservation_ = reservation;
  client->name_ = name;

  return Status::OK();
}
/// Creates this client's buffer pool counters in 'profile': three timers, a
/// high-water-mark counter for unpinned bytes and a running total of unpinned bytes.
void BufferPool::Client::InitCounters(RuntimeProfile* profile) {
  // Timers.
  counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime");
  counters_.read_wait_time = ADD_TIMER(profile, "BufferPoolReadWaitTime");
  counters_.write_wait_time = ADD_TIMER(profile, "BufferPoolWriteWaitTime");

  // Byte counters for unpinned pages.
  counters_.peak_unpinned_bytes = profile->AddHighWaterMarkCounter(
      "BufferPoolPeakUnpinnedBytes", TUnit::BYTES);
  counters_.total_unpinned_bytes = ADD_COUNTER(
      profile, "BufferPoolTotalUnpinnedBytes", TUnit::BYTES);
}
/// Deregisters 'client': closes its reservation, clears its name and drops the
/// reservation pointer. Does nothing if the client is not registered.
void BufferPool::DeregisterClient(Client* client) {
  const bool registered = client->is_registered();
  if (!registered) return;

  ReservationTracker* tracker = client->reservation_;
  tracker->Close();
  client->name_.clear();
  client->reservation_ = NULL;
}
/// Creates a new page of 'len' bytes, pinned once, and opens 'handle' on it. 'len'
/// must be a power of two no smaller than the pool's minimum buffer length and
/// 'handle' must be closed. Returns the allocation error if no buffer could be
/// obtained; in that case nothing has been modified.
Status BufferPool::CreatePage(Client* client, int64_t len, PageHandle* handle) {
  DCHECK(!handle->is_open());
  DCHECK_GE(len, min_buffer_len_);
  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));

  // Get the backing memory before touching any other state, so that a failure
  // here needs no cleanup.
  BufferHandle new_buffer;
  RETURN_IF_ERROR(AllocateBufferInternal(client, len, &new_buffer));

  Page* new_page = new Page(len);
  {
    lock_guard<SpinLock> page_lock(new_page->lock);
    new_page->buffer = std::move(new_buffer);
    handle->Open(new_page, client);
    new_page->IncrementPinCount(handle);
  }

  // The page is published to the global list only once it is fully initialized, and
  // only after the page lock was dropped, to respect the lock ordering.
  pages_.Enqueue(new_page);

  client->reservation_->AllocateFrom(len);
  return Status::OK();
}
/// Destroys the page referenced by 'handle' and closes the handle. Calling this on
/// an already-closed handle is a no-op, which makes the operation idempotent.
void BufferPool::DestroyPage(Client* client, PageHandle* handle) {
  if (!handle->is_open()) return;

  Page* doomed = handle->page_;
  if (!handle->is_pinned()) {
    // Unpinned pages consume no reservation, so only the buffer (if the page still
    // has one) has to be given back before the page is cleaned up.
    {
      lock_guard<SpinLock> page_lock(doomed->lock);
      // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
      if (doomed->buffer.is_open()) FreeBufferInternal(&doomed->buffer);
    }
    CleanUpPage(handle);
    return;
  }

  // Pinned pages: ExtractBuffer() cleans up the page and hands us its buffer, and
  // FreeBuffer() then releases that buffer.
  BufferHandle extracted;
  ExtractBuffer(client, handle, &extracted);
  FreeBuffer(client, &extracted);
}
/// Unlinks the page referenced by 'handle' from 'pages_', deletes it and resets the
/// handle.
void BufferPool::CleanUpPage(PageHandle* handle) {
  Page* doomed = handle->page_;

  // Any thread reaching a page through 'pages_' holds 'pages_.lock_' while doing so.
  // Remove() therefore does not return until those threads are done, after which no
  // other thread can still reference the page and deleting it is safe.
  pages_.Remove(doomed);
  delete doomed;

  handle->Reset();
}
/// Adds one pin to the page referenced by 'handle' on behalf of 'client' and
/// consumes the page's length from the client's reservation. If the page was
/// unpinned and has no buffer, a buffer is allocated first; an allocation failure is
/// returned to the caller with the page left unchanged.
Status BufferPool::Pin(Client* client, PageHandle* handle) {
  DCHECK(client->is_registered());
  DCHECK(handle->is_open());
  DCHECK_EQ(handle->client_, client);

  Page* target = handle->page_;
  {
    // The page's state is inspected and modified under its lock.
    lock_guard<SpinLock> page_lock(target->lock);
    const bool was_unpinned = target->pin_count == 0;
    if (was_unpinned && !target->buffer.is_open()) {
      // Nothing has been changed yet, so bailing out on error is clean.
      RETURN_IF_ERROR(AllocateBufferInternal(client, target->len, &target->buffer));
      // TODO: will need to initiate/wait for read if the page is not in-memory.
    }
    if (was_unpinned) {
      // The page stops counting towards the client's unpinned bytes.
      COUNTER_ADD(client->counters_.peak_unpinned_bytes, -handle->len());
    }
    target->IncrementPinCount(handle);
  }

  client->reservation_->AllocateFrom(target->len);
  return Status::OK();
}
/// Removes one pin from the page referenced by 'handle'. Acquires the page's lock
/// and delegates the actual work to UnpinLocked().
void BufferPool::Unpin(Client* client, PageHandle* handle) {
  DCHECK(handle->is_open());
  Page* target = handle->page_;
  lock_guard<SpinLock> page_lock(target->lock);
  UnpinLocked(client, handle);
}
/// Removes one pin from the page referenced by 'handle' and releases the page's
/// length back to the client's reservation. The caller must hold the page's lock,
/// 'client' must be registered and must be the client that 'handle' was opened with,
/// and the handle must be pinned.
///
/// The unpinned-bytes counters are only updated when the last pin is dropped, i.e.
/// when the page actually becomes unpinned. This mirrors Pin(), which only subtracts
/// from 'peak_unpinned_bytes' when the pin count goes from zero to one. Updating
/// the counters on every call would overcount pages that were pinned several times.
void BufferPool::UnpinLocked(Client* client, PageHandle* handle) {
  DCHECK(client->is_registered());
  DCHECK_EQ(handle->client_, client);
  // If handle is pinned, we can assume that the page itself is pinned.
  DCHECK(handle->is_pinned());
  Page* page = handle->page_;
  page->lock.DCheckLocked();

  page->DecrementPinCount(handle);
  // Each pin consumes 'page->len' of reservation, so each unpin gives it back.
  client->reservation_->ReleaseTo(page->len);

  // The page is still pinned through other pins: no bytes became unpinned.
  if (page->pin_count > 0) return;

  COUNTER_ADD(client->counters_.total_unpinned_bytes, handle->len());
  COUNTER_ADD(client->counters_.peak_unpinned_bytes, handle->len());
  // TODO: can evict now. Only need to preserve contents if 'page->dirty' is true.
}
/// Moves the buffer of the pinned page referenced by 'page_handle' into
/// 'buffer_handle', then destroys the page and closes 'page_handle'. Any pins beyond
/// the first are dropped beforehand.
void BufferPool::ExtractBuffer(
    Client* client, PageHandle* page_handle, BufferHandle* buffer_handle) {
  DCHECK(page_handle->is_pinned());
  DCHECK_EQ(page_handle->client_, client);

  Page* source = page_handle->page_;
  {
    // The page's state is modified under its lock.
    lock_guard<SpinLock> page_lock(source->lock);
    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
    // Drop surplus pins until exactly one is left, so that no surplus reservation
    // stays in use.
    while (source->pin_count > 1) {
      UnpinLocked(client, page_handle);
    }
    *buffer_handle = std::move(source->buffer);
  }
  CleanUpPage(page_handle);
}
/// Allocates a buffer of 'len' bytes for 'client' and opens 'handle' on it,
/// consuming 'len' bytes of the client's reservation.
///
/// The buffer is allocated before the reservation is consumed, so if the allocation
/// fails the error is returned with the client's reservation untouched. Consuming the
/// reservation first would leak it on failure, because nothing releases it again.
/// CreatePage() and Pin() use the same allocate-then-consume order.
Status BufferPool::AllocateBuffer(Client* client, int64_t len, BufferHandle* handle) {
  RETURN_IF_ERROR(AllocateBufferInternal(client, len, handle));
  client->reservation_->AllocateFrom(len);
  return Status::OK();
}
/// Allocates 'len' bytes from the allocator and opens 'buffer' on them for 'client',
/// without touching the client's reservation. 'buffer' must be closed and 'len' must
/// be a power of two no smaller than the pool's minimum buffer length. The time
/// spent is charged to the client's 'get_buffer_time' counter. Returns an error if
/// the pool's byte limit is exhausted or if the allocator fails.
Status BufferPool::AllocateBufferInternal(
    Client* client, int64_t len, BufferHandle* buffer) {
  DCHECK(!buffer->is_open());
  DCHECK_GE(len, min_buffer_len_);
  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
  SCOPED_TIMER(client->counters_.get_buffer_time);

  if (!TryDecreaseBufferBytesRemaining(len)) {
    // Without remaining capacity another page would have to be evicted.
    return Status(TErrorCode::NOT_IMPLEMENTED_ERROR,
        Substitute("Buffer bytes limit $0 of buffer pool is exhausted and page eviction is "
                   "not implemented yet!", buffer_bytes_limit_));
  }

  // There was headroom in 'buffer_bytes_remaining_', so a fresh buffer can be
  // allocated.
  uint8_t* mem;
  Status alloc_status = allocator_->Allocate(len, &mem);
  if (!alloc_status.ok()) {
    // Hand the bytes claimed above back to the pool.
    buffer_bytes_remaining_.Add(len);
    return alloc_status;
  }
  DCHECK(mem != NULL);
  buffer->Open(client, mem, len);
  return Status::OK();
}
/// Frees the buffer referenced by 'handle' and releases its length back to the
/// client's reservation. A closed handle is ignored, which makes the call idempotent.
void BufferPool::FreeBuffer(Client* client, BufferHandle* handle) {
  const bool has_buffer = handle->is_open();
  if (!has_buffer) return;

  DCHECK_EQ(client, handle->client_);
  const int64_t released_bytes = handle->len_;
  client->reservation_->ReleaseTo(released_bytes);
  FreeBufferInternal(handle);
}
/// Returns the memory of the open buffer 'handle' to the allocator, adds its length
/// back to the pool's remaining bytes and resets the handle. Does not touch any
/// client reservation.
void BufferPool::FreeBufferInternal(BufferHandle* handle) {
  DCHECK(handle->is_open());
  const int64_t freed_bytes = handle->len();
  allocator_->Free(handle->data(), freed_bytes);
  buffer_bytes_remaining_.Add(freed_bytes);
  handle->Reset();
}
/// Moves the open buffer 'src', owned by 'src_client', into the closed handle 'dst'
/// and makes 'dst_client' its client. The buffer's length is consumed from the
/// destination client's reservation and released to the source client's. The two
/// handles and the two clients must be distinct. Always returns Status::OK().
Status BufferPool::TransferBuffer(
    Client* src_client, BufferHandle* src, Client* dst_client, BufferHandle* dst) {
  DCHECK(src->is_open());
  DCHECK(!dst->is_open());
  DCHECK_EQ(src_client, src->client_);
  DCHECK_NE(src, dst);
  DCHECK_NE(src_client, dst_client);

  // Reservation accounting: the destination is charged before the source is
  // credited.
  const int64_t moved_bytes = src->len();
  dst_client->reservation_->AllocateFrom(moved_bytes);
  src_client->reservation_->ReleaseTo(moved_bytes);

  // Hand the buffer over and re-label it with its new client.
  *dst = std::move(*src);
  dst->client_ = dst_client;
  return Status::OK();
}
/// Attempts to subtract 'len' from 'buffer_bytes_remaining_' with a compare-and-swap
/// retry loop. Returns true once the subtraction succeeded and false as soon as the
/// observed remaining value is smaller than 'len'.
bool BufferPool::TryDecreaseBufferBytesRemaining(int64_t len) {
  // TODO: we may want to change this policy so that we don't always use up to the limit
  // for buffers, since this may starve other operators using non-buffer-pool memory.
  int64_t observed = buffer_bytes_remaining_.Load();
  while (observed >= len) {
    const int64_t desired = observed - len;
    if (buffer_bytes_remaining_.CompareAndSwap(observed, desired)) return true;
    // The value changed since it was read; reload it and try again.
    observed = buffer_bytes_remaining_.Load();
  }
  return false;
}
/// Returns a description of the client: its address plus either its name and
/// reservation, or the marker UNREGISTERED if it is not registered.
string BufferPool::Client::DebugString() const {
  if (!is_registered()) {
    return Substitute("<BufferPool::Client> $0 UNREGISTERED", this);
  }
  const string reservation_str = reservation_->DebugString();
  return Substitute("<BufferPool::Client> $0 name: $1 reservation: {$2}", this, name_,
      reservation_str);
}
/// Returns a description of the handle: its address plus either its client and page,
/// or the marker CLOSED if it is not open. The page's lock is held while the open
/// case is formatted.
string BufferPool::PageHandle::DebugString() const {
  if (!is_open()) {
    return Substitute("<BufferPool::PageHandle> $0 CLOSED", this);
  }
  lock_guard<SpinLock> page_lock(page_->lock);
  return Substitute("<BufferPool::PageHandle> $0 client: {$1} page: {$2}", this,
      client_->DebugString(), page_->DebugString());
}
/// Returns a description of the handle: its address plus either its client, data
/// pointer and length, or the marker CLOSED if it is not open.
string BufferPool::BufferHandle::DebugString() const {
  if (!is_open()) {
    return Substitute("<BufferPool::BufferHandle> $0 CLOSED", this);
  }
  return Substitute("<BufferPool::BufferHandle> $0 client: {$1} data: $2 len: $3", this,
      client_->DebugString(), data_, len_);
}
/// Returns a multi-line description of the pool: a header line with the pool's
/// address, minimum buffer length, byte limit and remaining bytes, followed by one
/// line for each page visited while iterating over 'pages_'.
string BufferPool::DebugString() {
  stringstream out;
  out << "<BufferPool> " << this;
  out << " min_buffer_len: " << min_buffer_len_;
  out << " buffer_bytes_limit: " << buffer_bytes_limit_;
  out << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n";
  // Each page appends its own line through Page::DebugStringCallback().
  pages_.Iterate(bind<bool>(Page::DebugStringCallback, &out, _1));
  return out.str();
}
}