blob: 7839612272e1cbaa21cd12bcb7567f8b755e1db8 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_RUNTIME_BUFFER_POOL_H
#define IMPALA_RUNTIME_BUFFER_POOL_H
#include <stdint.h>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/locks.hpp>
#include <string>
#include "runtime/bufferpool/buffer-allocator.h"
#include "runtime/bufferpool/buffer-pool-counters.h"
#include "common/atomic.h"
#include "common/status.h"
#include "gutil/macros.h"
#include "util/internal-queue.h"
#include "util/spinlock.h"
namespace impala {
class BufferAllocator;
class ReservationTracker;
/// A buffer pool that manages memory buffers for all queries in an Impala daemon.
/// The buffer pool enforces buffer reservations, limits, and implements policies
/// for moving spilled memory from in-memory buffers to disk. It also enables reuse of
/// buffers between queries, to avoid frequent allocations.
///
/// The buffer pool can be used for allocating any large buffers (above a configurable
/// minimum length), whether or not the buffers will be spilled. Smaller allocations
/// are not serviced directly by the buffer pool: clients of the buffer pool must
/// subdivide buffers if they wish to use smaller allocations.
///
/// All buffer pool operations are in the context of a registered buffer pool client.
/// A buffer pool client should be created for every allocator of buffers at the level
/// of granularity required for reporting and enforcement of reservations, e.g. an exec
/// node. The client tracks buffer reservations via its ReservationTracker and also
/// includes info that is helpful for debugging (e.g. the operator that is associated
/// with the buffer). The client is not threadsafe, i.e. concurrent buffer pool
/// operations should not be invoked for the same client.
///
/// TODO:
/// * Implement spill-to-disk.
/// * Decide on, document, and enforce upper limits on page size.
///
/// Pages, Buffers and Pinning
/// ==========================
/// * A page is a logical block of memory that can reside in memory or on disk.
/// * A buffer is a physical block of memory that can hold a page in memory.
/// * A page handle is used by buffer pool clients to identify and access a page and
/// the corresponding buffer. Clients do not interact with pages directly.
/// * A buffer handle is used by buffer pool clients to identify and access a buffer.
/// * A page is pinned if it has pin count > 0. A pinned page stays mapped to the same
/// buffer.
/// * An unpinned page can be written out to disk by the buffer pool so that the buffer
/// can be used for another purpose.
///
/// Buffer/Page Sizes
/// =================
/// The buffer pool has a minimum buffer size, which must be a power-of-two. Page and
/// buffer sizes must be an exact power-of-two multiple of the minimum buffer size.
///
/// Reservations
/// ============
/// Before allocating buffers or pinning pages, a client must reserve memory through its
/// ReservationTracker. Reservation of n bytes gives a client the right to allocate
/// buffers or pin pages summing up to n bytes. Reservations are both necessary and
/// sufficient for a client to allocate buffers or pin pages: the operations succeed
/// unless a "system error" such as a disk write error is encountered that prevents
/// unpinned pages from being written to disk.
///
/// More memory may be reserved than is used, e.g. if a client is not using its full
/// reservation. In such cases, the buffer pool can use the free buffers in any way,
/// e.g. for keeping unpinned pages in memory, so long as it is able to fulfill the
/// reservations when needed, e.g. by flushing unpinned pages to disk.
///
/// Page/Buffer Handles
/// ===================
/// The buffer pool exposes PageHandles and BufferHandles, which are owned by clients of
/// the buffer pool, and act as a proxy for the internal data structure representing the
/// page or buffer in the buffer pool. Handles are "open" if they are associated with a
/// page or buffer. An open PageHandle is obtained by creating a page. PageHandles are
/// closed by calling BufferPool::DestroyPage(). An open BufferHandle is obtained by
/// allocating a buffer or extracting a BufferHandle from a PageHandle. A page's buffer
/// can also be accessed through the PageHandle. The handle destructors check for
/// resource leaks, e.g. an open handle that would result in a buffer leak.
///
/// Pin Counting of Page Handles:
/// ----------------------------------
/// Page handles are scoped to a client. The invariants are as follows:
/// * A page can only be accessed through an open handle.
/// * A page is destroyed once the handle is destroyed via DestroyPage().
/// * A page's buffer can only be accessed through a pinned handle.
/// * Pin() can be called on an open handle, incrementing the handle's pin count.
/// * Unpin() can be called on a pinned handle, but not an unpinned handle.
/// * Pin() always increases usage of reservations, and Unpin() always decreases usage,
/// i.e. the handle consumes <pin count> * <page size> bytes of reservation.
///
/// Example Usage: Buffers
/// ==================================
/// The simplest use case is to allocate a memory buffer.
/// * The new buffer is created with AllocateBuffer().
/// * The client reads and writes to the buffer as it sees fit.
/// * If the client is done with the buffer's contents it can call FreeBuffer() to
/// destroy the handle and free the buffer, or use TransferBuffer() to transfer
/// the buffer to a different client.
///
/// Example Usage: Spillable Pages
/// ==============================
/// * A spilling operator creates a new page with CreatePage().
/// * The client reads and writes to the page's buffer as it sees fit.
/// * If the operator encounters memory pressure, it can decrease reservation usage by
/// calling Unpin() on the page. The page may then be written to disk and its buffer
/// repurposed internally by BufferPool.
/// * Once the operator needs the page's contents again and has sufficient unused
/// reservation, it can call Pin(), which brings the page's contents back into memory,
/// perhaps in a different buffer. Therefore the operator must fix up any pointers into
/// the previous buffer.
/// * If the operator is done with the page, it can call DestroyPage() to destroy the
/// handle and release resources, or call ExtractBuffer() to extract the buffer.
///
/// Synchronization
/// ===============
/// The data structures in the buffer pool itself are thread-safe. Client-owned data
/// structures - Client, PageHandle and BufferHandle - are not protected from concurrent
/// access by the buffer pool: clients must ensure that they do not invoke concurrent
/// operations with the same Client, PageHandle or BufferHandle.
///
/// +========================+
/// | IMPLEMENTATION DETAILS |
/// +========================+
///
/// Lock Ordering
/// =============
/// The lock ordering is:
/// * pages_::lock_ -> Page::lock_
///
/// If a reference to a page is acquired via the pages_ list, pages_::lock_ must be held
/// until done with the page to ensure the page isn't concurrently deleted.
class BufferPool {
public:
/// Client-owned types used with the pool; their definitions follow this class.
class BufferHandle;
class Client;
class PageHandle;
/// Constructs a new buffer pool.
/// 'min_buffer_len': the minimum buffer length for the pool. Must be a power of two.
/// 'buffer_bytes_limit': the maximum physical memory in bytes that can be used by the
/// buffer pool. If 'buffer_bytes_limit' is not a multiple of 'min_buffer_len', the
/// remainder will not be usable.
BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit);
~BufferPool();
/// Register a client. Returns an error status and does not register the client if the
/// arguments are invalid. 'name' is an arbitrary name used to identify the client in
/// any error messages or logging. 'reservation' is the tracker that the client's
/// buffers and pinned pages are accounted against. Counters for this client are added
/// to the (non-NULL) 'profile'. 'client' is the client to register. 'client' should
/// not already be registered.
Status RegisterClient(const std::string& name, ReservationTracker* reservation,
RuntimeProfile* profile, Client* client);
/// Deregister 'client' if it is registered. Idempotent.
void DeregisterClient(Client* client);
/// Create a new page of 'len' bytes with pin count 1. 'len' must be a page length
/// supported by BufferPool (see BufferPool class comment). The client must have
/// sufficient unused reservation to pin the new page (otherwise it will DCHECK).
/// CreatePage() only fails when a system error prevents the buffer pool from fulfilling
/// the reservation.
/// On success, the handle is mapped to the new page.
Status CreatePage(Client* client, int64_t len, PageHandle* handle);
/// Increment the pin count of 'handle'. After Pin() the underlying page will
/// be mapped to a buffer, which will be accessible through 'handle'. Uses
/// reservation from 'client'. The caller is responsible for ensuring it has enough
/// unused reservation before calling Pin() (otherwise it will DCHECK). Pin() only
/// fails when a system error prevents the buffer pool from fulfilling the reservation.
/// 'handle' must be open.
Status Pin(Client* client, PageHandle* handle);
/// Decrement the pin count of 'handle'. Decrease client's reservation usage. If the
/// handle's pin count becomes zero, it is no longer valid for the underlying page's
/// buffer to be accessed via 'handle'. If the page's total pin count across all
/// handles that reference it goes to zero, the page's data may be written to disk and
/// the buffer reclaimed. 'handle' must be open and have a pin count > 0.
/// TODO: once we implement spilling, it will be an error to call Unpin() with
/// spilling disabled. E.g. if Impala is running without scratch (we want to be
/// able to test Unpin() before we implement actual spilling).
void Unpin(Client* client, PageHandle* handle);
/// Destroy the page referenced by 'handle' (if 'handle' is open). Any buffers or disk
/// storage backing the page are freed. Idempotent. If the page is pinned, the
/// reservation usage is decreased accordingly.
void DestroyPage(Client* client, PageHandle* handle);
/// Extracts buffer from a pinned page. After this returns, the page referenced by
/// 'page_handle' will be destroyed and 'buffer_handle' will reference the buffer from
/// 'page_handle'. This may decrease reservation usage of 'client' if the page was
/// pinned multiple times via 'page_handle'.
void ExtractBuffer(
Client* client, PageHandle* page_handle, BufferHandle* buffer_handle);
/// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller
/// is responsible for ensuring it has enough unused reservation before calling
/// AllocateBuffer() (otherwise it will DCHECK). AllocateBuffer() only fails when
/// a system error prevents the buffer pool from fulfilling the reservation.
Status AllocateBuffer(Client* client, int64_t len, BufferHandle* handle);
/// If 'handle' is open, close 'handle', free the buffer and decrease the
/// reservation usage from 'client'. Idempotent.
void FreeBuffer(Client* client, BufferHandle* handle);
/// Transfer ownership of buffer from 'src_client' to 'dst_client' and move the
/// handle from 'src' to 'dst'. Increases reservation usage in 'dst_client' and
/// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must be
/// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be different.
/// After a successful call, 'src' is closed and 'dst' is open.
/// NOTE(review): the conditions under which a non-OK Status is returned are not
/// described here - confirm against the definition.
Status TransferBuffer(Client* src_client, BufferHandle* src, Client* dst_client,
BufferHandle* dst);
/// Return a debug string with the state of the buffer pool.
std::string DebugString();
/// Accessors for the pool-wide limits. Defined out of line; presumably they return
/// 'min_buffer_len_' and 'buffer_bytes_limit_' respectively - confirm against the
/// definitions.
int64_t min_buffer_len() const;
int64_t buffer_bytes_limit() const;
private:
/// The pool cannot be copied or assigned.
DISALLOW_COPY_AND_ASSIGN(BufferPool);
/// Internal representation of a page. Only forward-declared in this header.
struct Page;
/// Same as Unpin(), except the lock for the page referenced by 'handle' must be held
/// by the caller.
void UnpinLocked(Client* client, PageHandle* handle);
/// Perform the cleanup of the page object and handle when the page is destroyed.
/// Reset 'handle', free the Page object and remove the 'pages_' entry.
/// The 'handle->page_' lock should *not* be held by the caller.
void CleanUpPage(PageHandle* handle);
/// Allocate a buffer of length 'len'. Assumes that the client's reservation has already
/// been consumed for the buffer. Returns an error if the pool is unable to fulfill the
/// reservation.
Status AllocateBufferInternal(Client* client, int64_t len, BufferHandle* buffer);
/// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates
/// internal state but does not release any reservation.
void FreeBufferInternal(BufferHandle* buffer);
/// Check if we can allocate another buffer of size 'len' bytes without
/// 'buffer_bytes_remaining_' going negative.
/// Returns true and decreases 'buffer_bytes_remaining_' by 'len' if successful.
bool TryDecreaseBufferBytesRemaining(int64_t len);
/// Allocator for allocating and freeing all buffer memory.
boost::scoped_ptr<BufferAllocator> allocator_;
/// The minimum length of a buffer in bytes. All buffers and pages are a power-of-two
/// multiple of this length. This is always a power of two.
const int64_t min_buffer_len_;
/// The maximum physical memory in bytes that can be used for buffers.
const int64_t buffer_bytes_limit_;
/// The remaining number of bytes of 'buffer_bytes_limit_' that can be used for
/// allocating new buffers. Must be updated atomically before a new buffer is
/// allocated or after an existing buffer is freed.
AtomicInt64 buffer_bytes_remaining_;
/// List containing all pages. Protected by the list's internal lock.
typedef InternalQueue<Page> PageList;
PageList pages_;
};
/// External representation of a client of the BufferPool. Clients are used for
/// reservation accounting and for tracking per-client buffer pool counters. This class
/// is the external handle for a client so each Client instance is owned by the
/// BufferPool's client, rather than the BufferPool.
/// Each Client should only be used by a single thread at a time: concurrently calling
/// Client methods or BufferPool methods with the Client as an argument is not supported.
class BufferPool::Client {
 public:
  /// A newly constructed client has no reservation tracker, i.e. it is unregistered.
  Client() : reservation_(nullptr) {}

  /// Destroying a client that is still registered is a bug (DCHECKed).
  ~Client() { DCHECK(!is_registered()); }

  /// True iff a reservation tracker is currently attached to this client.
  bool is_registered() const { return nullptr != reservation_; }

  /// The reservation tracker attached to this client, or NULL if unregistered.
  ReservationTracker* reservation() { return reservation_; }

  /// Returns a string describing this client for debugging.
  std::string DebugString() const;

 private:
  DISALLOW_COPY_AND_ASSIGN(Client);
  friend class BufferPool;

  /// Sets up 'counters_' and attaches the counters to 'profile'.
  void InitCounters(RuntimeProfile* profile);

  /// Name used to identify this client.
  std::string name_;

  /// Tracker that usage from this client's pinned pages is charged against.
  /// NULL exactly when the client is not registered.
  ReservationTracker* reservation_;

  /// RuntimeProfile counters belonging to this client. Every counter is non-NULL
  /// whenever is_registered() returns true.
  BufferPoolClientCounters counters_;
};
/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only
/// be used by a single thread at a time: concurrently calling BufferHandle methods or
/// BufferPool methods with the BufferHandle as an argument is not supported.
class BufferPool::BufferHandle {
 public:
  BufferHandle();

  /// Handles are movable so that they work with std::move().
  BufferHandle(BufferHandle&& src);

  /// Move assignment exists so handles can be stored in STL containers such as
  /// std::vector. The destination must be uninitialized.
  BufferHandle& operator=(BufferHandle&& src);

  /// An open handle must be closed before it is destroyed (DCHECKed).
  ~BufferHandle() { DCHECK(!is_open()); }

  /// True iff this handle currently refers to a buffer.
  bool is_open() const { return nullptr != data_; }

  /// Length of the buffer in bytes. The handle must be open.
  int64_t len() const {
    DCHECK(is_open());
    return len_;
  }

  /// Pointer to the first byte of the buffer. The handle must be open.
  uint8_t* data() const {
    DCHECK(is_open());
    return data_;
  }

  /// Returns a string describing this handle for debugging.
  std::string DebugString() const;

 private:
  friend class BufferPool;
  DISALLOW_COPY_AND_ASSIGN(BufferHandle);

  /// Internal helper that puts the handle into the opened state.
  void Open(const Client* client, uint8_t* data, int64_t len);

  /// Internal helper that returns the handle to the unopened state.
  void Reset();

  /// Client that owns this buffer handle. Used to validate that BufferPool methods
  /// are called with the correct client.
  const Client* client_;

  /// Start of the buffer. Non-NULL while the handle is open, NULL once closed.
  uint8_t* data_;

  /// Buffer length in bytes.
  int64_t len_;
};
/// The handle for a page used by clients of the BufferPool. Each PageHandle should
/// only be used by a single thread at a time: concurrently calling PageHandle methods
/// or BufferPool methods with the PageHandle as an argument is not supported.
class BufferPool::PageHandle {
 public:
  PageHandle();

  /// Page handles are movable so that they work with std::move().
  PageHandle(PageHandle&& src);

  /// Move assignment exists so page handles can be stored in STL containers such as
  /// std::vector. The destination must be closed.
  PageHandle& operator=(PageHandle&& src);

  /// An open handle must be closed before it is destroyed (DCHECKed).
  ~PageHandle() { DCHECK(!is_open()); }

  /// True iff this handle currently refers to a page.
  bool is_open() const { return nullptr != page_; }

  /// True iff the pin count of this handle is positive.
  bool is_pinned() const { return 0 < pin_count(); }

  int pin_count() const;
  int64_t len() const;

  /// Pointer to the page's buffer handle. May only be called while the page is pinned
  /// via this handle. Callers are limited to the const accessors of the returned
  /// handle: calling FreeBuffer() or TransferBuffer() on it, or modifying it in any
  /// other way, is invalid.
  const BufferHandle* buffer_handle() const;

  /// Pointer to the first byte of the page's buffer. May only be called while the
  /// page is pinned via this handle.
  uint8_t* data() const {
    const BufferHandle* buffer = buffer_handle();
    return buffer->data();
  }

  /// Returns a string describing this handle for debugging.
  std::string DebugString() const;

 private:
  friend class BufferPool;
  friend class Page;
  DISALLOW_COPY_AND_ASSIGN(PageHandle);

  /// Internal helper that opens the handle for 'page'.
  void Open(Page* page, Client* client);

  /// Internal helper that returns the handle to the unopened state.
  void Reset();

  /// Internal page structure. NULL while the handle is not open.
  Page* page_;

  /// Client that owns this page handle. Used to validate that the correct client is
  /// being used.
  const Client* client_;
};
}
#endif