| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| // This file includes definitions of classes used internally in the buffer pool. |
| // |
| /// +========================+ |
| /// | IMPLEMENTATION NOTES | |
| /// +========================+ |
| /// |
| /// Lock Ordering |
| /// ============= |
| /// The lock acquisition order is: |
| /// 1. Client::lock_ |
| /// 2. FreeBufferArena::lock_. If multiple arena locks are acquired, must be acquired in |
| /// ascending order. |
| /// 3. Page::buffer_lock |
| /// |
| /// If a reference to a Page is acquired through a page list, the Page* reference only |
| /// remains valid as long as the list's lock is held. |
| /// |
| /// Page States |
| /// =========== |
| /// Each Page object is owned by at most one InternalList<Page> at any given point. |
| /// Each page is either pinned or unpinned. Unpinned has a number of sub-states, which |
| /// are determined by which list in Client/BufferPool contains the page. |
| /// * Pinned: Always in this state when 'pin_count' > 0. The page has a buffer and is in |
| /// Client::pinned_pages_. 'pin_in_flight' determines which sub-state the page is in: |
| /// -> When pin_in_flight=false, the buffer contains the page's data and the client can |
| /// read and write to the buffer. |
| /// -> When pin_in_flight=true, the page's data is in the process of being read from |
| /// scratch disk into the buffer. Clients will block on the read I/O if they attempt |
| /// to access the buffer. |
| /// * Unpinned - Dirty: When no write to scratch has been started for an unpinned page. |
| /// The page is in Client::dirty_unpinned_pages_. |
| /// * Unpinned - Write in flight: When the write to scratch has been started but not |
| /// completed for a dirty unpinned page. The page is in |
| /// Client::in_flight_write_pages_. For accounting purposes this is considered a |
| /// dirty page. |
| /// * Unpinned - Clean: When the write to scratch has completed but the page was not |
| /// evicted. The page is in a clean pages list in a BufferAllocator arena. |
| /// * Unpinned - Evicted: After a clean page's buffer has been reclaimed. The page is |
| /// not in any list. |
| /// |
| /// Page Eviction Policy |
| /// ==================== |
| /// The page eviction policy is designed so that clients that run only in-memory (i.e. |
| /// don't unpin pages) never block on I/O. To achieve this, we must be able to |
| /// fulfil reservations by either allocating buffers or evicting clean pages. Assuming |
| /// reservations are not overcommitted (they shouldn't be), this global invariant can be |
| /// maintained by enforcing a local invariant for every client: |
| /// |
| /// reservation >= BufferHandles returned to client |
| /// + pinned pages + dirty pages (dirty unpinned or write in flight) |
| /// |
| /// The local invariant is maintained by writing pages to disk as the first step of any |
| /// operation that allocates a new buffer or reclaims buffers from clean pages. I.e. |
| /// "dirty pages" must be decreased before one of the other values on the R.H.S. of the |
| /// invariant can be increased. Operations block waiting for enough writes to complete |
| /// to satisfy the invariant. |
| |
| #ifndef IMPALA_RUNTIME_BUFFER_POOL_INTERNAL_H |
| #define IMPALA_RUNTIME_BUFFER_POOL_INTERNAL_H |
| |
| #include <memory> |
| #include <sstream> |
| |
| #include <boost/thread/mutex.hpp> |
| |
| #include "runtime/bufferpool/buffer-pool-counters.h" |
| #include "runtime/bufferpool/buffer-pool.h" |
| #include "runtime/bufferpool/reservation-tracker.h" |
| #include "util/condition-variable.h" |
| |
| // DCHECK_CONSISTENCY() forwards to DCheckConsistency() in debug builds and expands to |
| // nothing when NDEBUG is defined, so the calls disappear from release builds. |
| #ifdef NDEBUG |
| #define DCHECK_CONSISTENCY() |
| #else |
| #define DCHECK_CONSISTENCY() DCheckConsistency() |
| #endif |
| |
| namespace impala { |
| |
| /// Internal bookkeeping for a single page, which is either pinned or unpinned. The |
| /// possible page states are described in the "Page States" section of the |
| /// implementation notes at the top of this file. |
| struct BufferPool::Page : public InternalList<Page>::Node { |
|   /// Creates a page of 'len' bytes belonging to 'client'. The page starts out with a |
|   /// pin count of zero and no pin in flight. |
|   Page(Client* client, int64_t len) : client(client), len(len) {} |
| |
|   /// Debug string describing this page. |
|   std::string DebugString(); |
| |
|   /// Helper used by BufferPool::DebugString(). |
|   static bool DebugStringCallback(std::stringstream* ss, BufferPool::Page* page); |
| |
|   /// The client this page belongs to. |
|   Client* const client; |
| |
|   /// Size of the page in bytes. |
|   const int64_t len; |
| |
|   /// Number of pins on the page. Only touched in contexts that were handed the page's |
|   /// PageHandle, so multiple threads cannot access it concurrently. |
|   int pin_count = 0; |
| |
|   /// Set while the read I/O that pins the page has been started but has not yet |
|   /// completed. Like 'pin_count', only touched in contexts that were handed the page's |
|   /// PageHandle, so multiple threads cannot access it concurrently. |
|   bool pin_in_flight = false; |
| |
|   /// Non-null when a write is in flight, when the page is clean, or when the page is |
|   /// evicted. |
|   std::unique_ptr<TmpFileMgr::WriteHandle> write_handle; |
| |
|   /// Signalled when a write for this page completes. Protected by client->lock_. |
|   ConditionVariable write_complete_cv_; |
| |
|   /// Must be held to access 'buffer' while the page is unpinned and not evicted. If the |
|   /// page is pinned or evicted, 'buffer' can safely be accessed without this lock. |
|   SpinLock buffer_lock; |
| |
|   /// The buffer holding the page's contents. Closed if and only if the page is evicted; |
|   /// open otherwise. |
|   BufferHandle buffer; |
| }; |
| |
| /// Thin wrapper over InternalList<Page> that additionally maintains the total number |
| /// of bytes of the pages currently in the list. |
| class BufferPool::PageList { |
|  public: |
|   PageList() = default; |
| |
|   ~PageList() { |
|     // Clients are expected to have emptied their lists before destroying them. |
|     DCHECK(list_.empty()); |
|     DCHECK_EQ(0, bytes_); |
|   } |
| |
|   /// Adds 'page' to the list and counts its length towards bytes(). |
|   void Enqueue(Page* page) { |
|     list_.Enqueue(page); |
|     bytes_ += page->len; |
|   } |
| |
|   /// Removes 'page' if the underlying list reports that it removed it. Returns true |
|   /// and deducts the page's length in that case, otherwise returns false and leaves |
|   /// the byte count untouched. |
|   bool Remove(Page* page) { |
|     const bool removed = list_.Remove(page); |
|     if (removed) bytes_ -= page->len; |
|     return removed; |
|   } |
| |
|   /// Takes the page returned by the underlying list's Dequeue(). Returns nullptr if |
|   /// the list had no page to return. |
|   Page* Dequeue() { return UntrackBytes(list_.Dequeue()); } |
| |
|   /// Takes the page returned by the underlying list's PopBack(). Returns nullptr if |
|   /// the list had no page to return. |
|   Page* PopBack() { return UntrackBytes(list_.PopBack()); } |
| |
|   /// Simple pass-throughs to the underlying list. |
|   void Iterate(boost::function<bool(Page*)> fn) { list_.Iterate(fn); } |
|   bool Contains(Page* page) { return list_.Contains(page); } |
|   Page* tail() { return list_.tail(); } |
|   bool empty() const { return list_.empty(); } |
|   int size() const { return list_.size(); } |
| |
|   /// Sum of the lengths of all pages currently in the list. |
|   int64_t bytes() const { return bytes_; } |
| |
|   /// Checks that the byte count is non-negative and is zero exactly when the list is |
|   /// empty. |
|   void DCheckConsistency() { |
|     DCHECK_GE(bytes_, 0); |
|     DCHECK_EQ(list_.empty(), bytes_ == 0); |
|   } |
| |
|  private: |
|   /// Deducts the length of 'page' from the byte count if 'page' is non-null. Returns |
|   /// 'page' unchanged so that it can wrap the call that removed the page. |
|   Page* UntrackBytes(Page* page) { |
|     if (page != nullptr) bytes_ -= page->len; |
|     return page; |
|   } |
| |
|   InternalList<Page> list_; |
|   int64_t bytes_ = 0; |
| }; |
| |
| /// The internal state for the client: its reservation, its lists of pinned and dirty |
| /// unpinned pages, and the status of the asynchronous writes of those pages. |
| class BufferPool::Client { |
|  public: |
|   Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group, const std::string& name, |
|       ReservationTracker* parent_reservation, MemTracker* mem_tracker, |
|       MemLimit mem_limit_mode, int64_t reservation_limit, RuntimeProfile* profile); |
| |
|   /// DCHECKs that all pages were destroyed and that no allocated buffers are still |
|   /// accounted against the client. |
|   ~Client() { |
|     DCHECK_EQ(0, num_pages_); |
|     DCHECK_EQ(0, buffers_allocated_bytes_); |
|   } |
| |
|   /// Release reservation for this client. |
|   void Close() { reservation_.Close(); } |
| |
|   /// Create a pinned page using 'buffer', which was allocated using AllocateBuffer(). |
|   /// No client or page locks should be held by the caller. |
|   Page* CreatePinnedPage(BufferHandle&& buffer); |
| |
|   /// Reset 'handle', clean up references to handle->page and release any resources |
|   /// associated with handle->page. If the page is pinned, 'out_buffer' can be passed in |
|   /// and the page's buffer will be returned. |
|   /// Neither the client's lock nor handle->page_->buffer_lock should be held by the |
|   /// caller. |
|   void DestroyPageInternal(PageHandle* handle, BufferHandle* out_buffer = nullptr); |
| |
|   /// Updates client state to reflect that 'page' is now a dirty unpinned page. May |
|   /// initiate writes for this or other dirty unpinned pages. |
|   /// Neither the client's lock nor page->buffer_lock should be held by the caller. |
|   void MoveToDirtyUnpinned(Page* page); |
| |
|   /// Move an unpinned page to the pinned state, moving between data structures and |
|   /// reading from disk if necessary. Ensures the page has a buffer. If the data is |
|   /// already in memory, ensures the data is in the page's buffer. If the data is on |
|   /// disk, starts an async read of the data and sets 'pin_in_flight' on the page to |
|   /// true. Neither the client's lock nor page->buffer_lock should be held by the caller. |
|   Status StartMoveToPinned(ClientHandle* client, Page* page) WARN_UNUSED_RESULT; |
| |
|   /// Moves a page that has a pin in flight back to the evicted state, undoing |
|   /// StartMoveToPinned(). Neither the client's lock nor page->buffer_lock should be held |
|   /// by the caller. |
|   void UndoMoveEvictedToPinned(Page* page); |
| |
|   /// Finish the work of bringing the data of an evicted page to memory if |
|   /// page->pin_in_flight was set to true by StartMoveToPinned(). |
|   Status FinishMoveEvictedToPinned(Page* page) WARN_UNUSED_RESULT; |
| |
|   /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer() or |
|   /// AllocateUnreservedBuffer() APIs. Deducts from the client's reservation and updates |
|   /// internal accounting. Cleans dirty pages if needed to satisfy the buffer pool's |
|   /// internal invariants. No page or client locks should be held by the caller. |
|   /// If 'reserved' is true, we assume that the memory is already reserved. If it is |
|   /// false, tries to increase the reservation if needed. |
|   /// |
|   /// On success, returns OK and sets 'success' to true if non-NULL. If an error is |
|   /// encountered, e.g. while cleaning pages, returns an error status. If the reservation |
|   /// could not be increased for an unreserved allocation, returns OK and sets 'success' |
|   /// to false (for unreserved allocations, 'success' must be non-NULL). |
|   Status PrepareToAllocateBuffer( |
|       int64_t len, bool reserved, bool* success) WARN_UNUSED_RESULT; |
| |
|   /// Implementation of ClientHandle::DecreaseReservationTo(). |
|   Status DecreaseReservationTo( |
|       int64_t max_decrease, int64_t target_bytes) WARN_UNUSED_RESULT; |
| |
|   /// Called after a buffer of 'len' is freed via the FreeBuffer() API to update |
|   /// internal accounting and release the buffer to the client's reservation. No page or |
|   /// client locks should be held by the caller. |
|   void FreedBuffer(int64_t len) { |
|     // Takes 'lock_' itself, which is why the caller must not already hold it. |
|     boost::lock_guard<boost::mutex> cl(lock_); |
|     reservation_.ReleaseTo(len); |
|     buffers_allocated_bytes_ -= len; |
|     DCHECK_CONSISTENCY(); |
|   } |
| |
|   /// Wait for the in-flight write for 'page' to complete. |
|   /// 'lock_' must be held by the caller via 'client_lock'. page->buffer_lock should |
|   /// not be held. |
|   void WaitForWrite(boost::unique_lock<boost::mutex>* client_lock, Page* page); |
| |
|   /// Test helper: wait for all in-flight writes to complete. |
|   /// 'lock_' must not be held by the caller. |
|   void WaitForAllWrites(); |
| |
|   /// Asserts that 'client_lock' is holding 'lock_'. |
|   void DCheckHoldsLock(const boost::unique_lock<boost::mutex>& client_lock) { |
|     DCHECK(client_lock.mutex() == &lock_ && client_lock.owns_lock()); |
|   } |
| |
|   ReservationTracker* reservation() { return &reservation_; } |
|   const BufferPoolClientCounters& counters() const { return counters_; } |
|   /// Spilling is enabled exactly when the client was given a file group. |
|   bool spilling_enabled() const { return file_group_ != nullptr; } |
|   void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; } |
|   bool has_unpinned_pages() const { |
|     // Safe to read without lock since other threads should not be calling BufferPool |
|     // functions that create, destroy or unpin pages. |
|     return pinned_pages_.size() < num_pages_; |
|   } |
| |
|   /// Print debugging info about the state of the client. Caller must not hold 'lock_'. |
|   std::string DebugString(); |
| |
|  private: |
|   // Check consistency of client, DCHECK if inconsistent. 'lock_' must be held. |
|   void DCheckConsistency() { |
|     DCHECK_GE(buffers_allocated_bytes_, 0) << DebugStringLocked(); |
|     pinned_pages_.DCheckConsistency(); |
|     dirty_unpinned_pages_.DCheckConsistency(); |
|     in_flight_write_pages_.DCheckConsistency(); |
|     // The three page lists cannot hold more pages than the client has in total. |
|     DCHECK_LE(pinned_pages_.size() + dirty_unpinned_pages_.size() |
|             + in_flight_write_pages_.size(), |
|         num_pages_) << DebugStringLocked(); |
|     // Check that we flushed enough pages to disk given our eviction policy. |
|     DCHECK_GE(reservation_.GetReservation(), buffers_allocated_bytes_ |
|             + pinned_pages_.bytes() + dirty_unpinned_pages_.bytes() |
|             + in_flight_write_pages_.bytes()) << DebugStringLocked(); |
|   } |
| |
|   /// Must be called once before allocating or reclaiming a buffer of 'len'. Ensures that |
|   /// enough dirty pages are flushed to disk to satisfy the buffer pool's internal |
|   /// invariants after the allocation. 'lock_' should be held by the caller via |
|   /// 'client_lock'. |
|   Status CleanPages(boost::unique_lock<boost::mutex>* client_lock, int64_t len); |
| |
|   /// Initiates asynchronous writes of dirty unpinned pages to disk. Ensures that at |
|   /// least 'min_bytes_to_write' bytes of writes will be written asynchronously. May |
|   /// start writes more aggressively so that I/O and compute can be overlapped. If |
|   /// any errors are encountered, 'write_status_' is set. 'write_status_' must therefore |
|   /// be checked before reading back any pages. 'lock_' must be held by the caller. |
|   void WriteDirtyPagesAsync(int64_t min_bytes_to_write = 0); |
| |
|   /// Called when a write for 'page' completes. |
|   void WriteCompleteCallback(Page* page, const Status& write_status); |
| |
|   /// Move an evicted page to the pinned state by allocating a new buffer, starting an |
|   /// async read from disk and moving the page to 'pinned_pages_'. client->impl must be |
|   /// locked by the caller via 'client_lock' and 'page' must be unlocked. |
|   /// 'client_lock' is released then reacquired. |
|   Status StartMoveEvictedToPinned( |
|       boost::unique_lock<boost::mutex>* client_lock, ClientHandle* client, Page* page); |
| |
|   /// Same as DebugString() except the caller must already hold 'lock_'. |
|   std::string DebugStringLocked(); |
| |
|   /// The buffer pool that owns the client. |
|   BufferPool* const pool_; |
| |
|   /// The file group that should be used for allocating scratch space. If NULL, spilling |
|   /// is disabled. |
|   TmpFileMgr::FileGroup* const file_group_; |
| |
|   /// A name identifying the client. |
|   const std::string name_; |
| |
|   /// The reservation tracker for the client. All pages pinned by the client count as |
|   /// usage against 'reservation_'. |
|   ReservationTracker reservation_; |
| |
|   /// The RuntimeProfile counters for this client, owned by the client's RuntimeProfile. |
|   /// All non-NULL. |
|   BufferPoolClientCounters counters_; |
| |
|   /// Debug option to delay write completion. |
|   int debug_write_delay_ms_; |
| |
|   /// Lock to protect the below member variables. |
|   boost::mutex lock_; |
| |
|   /// Condition variable signalled when a write for this client completes. |
|   ConditionVariable write_complete_cv_; |
| |
|   /// All non-OK statuses returned by write operations are merged into this status. |
|   /// All operations that depend on pages being written to disk successfully (e.g. |
|   /// reading pages back from disk) must check 'write_status_' before proceeding, so |
|   /// that write errors that occurred asynchronously are correctly propagated. The |
|   /// write error is global to the client so can be propagated to any Status-returning |
|   /// operation for the client (even for operations on different Pages or Buffers). |
|   /// Write errors are not recoverable so it is best to propagate them as quickly |
|   /// as possible, instead of waiting to propagate them in a specific way. |
|   Status write_status_; |
| |
|   /// Total number of pages for this client. Used for debugging and enforcing that all |
|   /// pages are destroyed before the client. |
|   int64_t num_pages_; |
| |
|   /// Total bytes of buffers in BufferHandles returned to clients (i.e. obtained from |
|   /// AllocateBuffer() or ExtractBuffer()). |
|   int64_t buffers_allocated_bytes_; |
| |
|   /// All pinned pages for this client. |
|   PageList pinned_pages_; |
| |
|   /// Dirty unpinned pages for this client for which writes are not in flight. Page |
|   /// writes are started in LIFO order, because operators typically have sequential access |
|   /// patterns where the most recently evicted page will be last to be read. |
|   PageList dirty_unpinned_pages_; |
| |
|   /// Dirty unpinned pages for this client for which writes are in flight. |
|   PageList in_flight_write_pages_; |
| }; |
| } |
| |
| #endif |