| // Copyright 2010 Google Inc. All Rights Reserved |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // |
| // Classes for memory management, used by materializations |
| // (arenas, segments, and STL collections parametrized via arena allocators) |
| // so that memory usage can be controlled at the application level. |
| // |
| // Materializations can be parametrized by specifying an instance of a |
| // BufferAllocator. The allocator implements |
| // memory management policy (e.g. setting allocation limits). Allocators may |
| // be shared between multiple materializations; e.g. you can designate a |
| // single allocator per a single user request, thus setting bounds on memory |
| // usage on a per-request basis. |
| #pragma once |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <cstdint> |
| #include <limits> |
| #include <memory> |
| #include <ostream> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| |
| #include "kudu/util/boost_mutex_utils.h" |
| #include "kudu/util/memory/overwrite.h" |
| #include "kudu/util/mutex.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/singleton.h" |
| |
| namespace kudu { |
| |
| class BufferAllocator; |
| class MemTracker; |
| |
| // Wrapper for a block of data allocated by a BufferAllocator. Owns the block. |
| // (To release the block, destroy the buffer - it will then return it via the |
| // same allocator that has been used to create it). |
| class Buffer { |
| public: |
| ~Buffer(); |
| |
| void* data() const { return data_; } // The data buffer. |
| size_t size() const { return size_; } // In bytes. |
| |
| private: |
| friend class BufferAllocator; |
| |
| Buffer(void* data, size_t size, BufferAllocator* allocator) |
| : data_(CHECK_NOTNULL(data)), |
| size_(size), |
| allocator_(allocator) { |
| #ifndef NDEBUG |
| OverwriteWithPattern(reinterpret_cast<char*>(data_), size_, |
| "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW" |
| "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW" |
| "NEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEWNEW"); |
| #endif |
| } |
| |
| // Called by a successful realloc. |
| void Update(void* new_data, size_t new_size) { |
| #ifndef NDEBUG |
| if (new_size > size_) { |
| OverwriteWithPattern(reinterpret_cast<char*>(new_data) + size_, |
| new_size - size_, "NEW"); |
| } |
| #endif |
| data_ = new_data; |
| size_ = new_size; |
| } |
| |
| void* data_; |
| size_t size_; |
| BufferAllocator* const allocator_; |
| DISALLOW_COPY_AND_ASSIGN(Buffer); |
| }; |
| |
| // Allocators allow applications to control memory usage. They are |
| // used by materializations to allocate blocks of memory arenas. |
| // BufferAllocator is an abstract class that defines a common contract of |
| // all implementations of allocators. Specific allocators provide specific |
| // features, e.g. enforced resource limits, thread safety, etc. |
| class BufferAllocator { |
| public: |
| virtual ~BufferAllocator() {} |
| |
| // Called by the user when a new block of memory is needed. The 'requested' |
| // parameter specifies how much memory (in bytes) the user would like to get. |
| // The 'minimal' parameter specifies how much he is willing to settle for. |
| // The allocator returns a buffer sized in the range [minimal, requested], |
| // or NULL if the request can't be satisfied. When the buffer is destroyed, |
| // its destructor calls the FreeInternal() method on its allocator. |
| // CAVEAT: The allocator must outlive all buffers returned by it. |
| // |
| // Corner cases: |
| // 1. If requested == 0, the allocator will always return a non-NULL Buffer |
| // with a non-NULL data pointer and zero capacity. |
| // 2. If minimal == 0, the allocator will always return a non-NULL Buffer |
| // with a non-NULL data pointer, possibly with zero capacity. |
| Buffer* BestEffortAllocate(size_t requested, size_t minimal) { |
| DCHECK_LE(minimal, requested); |
| Buffer* result = AllocateInternal(requested, minimal, this); |
| LogAllocation(requested, minimal, result); |
| return result; |
| } |
| |
| // Called by the user when a new block of memory is needed. Equivalent to |
| // BestEffortAllocate(requested, requested). |
| Buffer* Allocate(size_t requested) { |
| return BestEffortAllocate(requested, requested); |
| } |
| |
| // Called by the user when a previously allocated block needs to be resized. |
| // Mimics semantics of <cstdlib> realloc. The 'requested' and 'minimal' |
| // represent the desired final buffer size, with semantics as in the Allocate. |
| // If the 'buffer' parameter is NULL, the call is equivalent to |
| // Allocate(requested, minimal). Otherwise, a reallocation of the buffer's |
| // data is attempted. On success, the original 'buffer' parameter is returned, |
| // but the buffer itself might have updated size and data. On failure, |
| // returns NULL, and leaves the input buffer unmodified. |
| // Reallocation might happen in-place, preserving the original data |
| // pointer, but it is not guaranteed - e.g. this function might degenerate to |
| // Allocate-Copy-Free. Either way, the content of the data buffer, up to the |
| // minimum of the new and old size, is preserved. |
| // |
| // Corner cases: |
| // 1. If requested == 0, the allocator will always return a non-NULL Buffer |
| // with a non-NULL data pointer and zero capacity. |
| // 2. If minimal == 0, the allocator will always return a non-NULL Buffer |
| // with a non-NULL data pointer, possibly with zero capacity. |
| Buffer* BestEffortReallocate(size_t requested, |
| size_t minimal, |
| Buffer* buffer) { |
| DCHECK_LE(minimal, requested); |
| Buffer* result; |
| if (buffer == NULL) { |
| result = AllocateInternal(requested, minimal, this); |
| LogAllocation(requested, minimal, result); |
| return result; |
| } else { |
| result = ReallocateInternal(requested, minimal, buffer, this) ? |
| buffer : NULL; |
| LogAllocation(requested, minimal, buffer); |
| return result; |
| } |
| } |
| |
| // Called by the user when a previously allocated block needs to be resized. |
| // Equivalent to BestEffortReallocate(requested, requested, buffer). |
| Buffer* Reallocate(size_t requested, Buffer* buffer) { |
| return BestEffortReallocate(requested, requested, buffer); |
| } |
| |
| // Returns the amount of memory (in bytes) still available for this allocator. |
| // For unbounded allocators (like raw HeapBufferAllocator) this is the highest |
| // size_t value possible. |
| // TODO(user): consider making pure virtual. |
| virtual size_t Available() const { return std::numeric_limits<size_t>::max(); } |
| |
| protected: |
| friend class Buffer; |
| |
| BufferAllocator() {} |
| |
| // Expose the constructor to subclasses of BufferAllocator. |
| Buffer* CreateBuffer(void* data, |
| size_t size, |
| BufferAllocator* allocator) { |
| return new Buffer(data, size, allocator); |
| } |
| |
| // Expose Buffer::Update to subclasses of BufferAllocator. |
| void UpdateBuffer(void* new_data, size_t new_size, Buffer* buffer) { |
| buffer->Update(new_data, new_size); |
| } |
| |
| // Called by chained buffer allocators. |
| Buffer* DelegateAllocate(BufferAllocator* delegate, |
| size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) { |
| return delegate->AllocateInternal(requested, minimal, originator); |
| } |
| |
| // Called by chained buffer allocators. |
| bool DelegateReallocate(BufferAllocator* delegate, |
| size_t requested, |
| size_t minimal, |
| Buffer* buffer, |
| BufferAllocator* originator) { |
| return delegate->ReallocateInternal(requested, minimal, buffer, originator); |
| } |
| |
| // Called by chained buffer allocators. |
| void DelegateFree(BufferAllocator* delegate, Buffer* buffer) { |
| delegate->FreeInternal(buffer); |
| } |
| |
| private: |
| // Implemented by concrete subclasses. |
| virtual Buffer* AllocateInternal(size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) = 0; |
| |
| // Implemented by concrete subclasses. Returns false on failure. |
| virtual bool ReallocateInternal(size_t requested, |
| size_t minimal, |
| Buffer* buffer, |
| BufferAllocator* originator) = 0; |
| |
| // Implemented by concrete subclasses. |
| virtual void FreeInternal(Buffer* buffer) = 0; |
| |
| // Logs a warning message if the allocation failed or if it returned less than |
| // the required number of bytes. |
| void LogAllocation(size_t required, size_t minimal, Buffer* buffer); |
| |
| DISALLOW_COPY_AND_ASSIGN(BufferAllocator); |
| }; |
| |
| // Allocates buffers on the heap, with no memory limits. Uses standard C |
| // allocation functions (malloc, realloc, free). |
| class HeapBufferAllocator : public BufferAllocator { |
| public: |
| virtual ~HeapBufferAllocator() {} |
| |
| // Returns a singleton instance of the heap allocator. |
| static HeapBufferAllocator* Get() { |
| return Singleton<HeapBufferAllocator>::get(); |
| } |
| |
| virtual size_t Available() const OVERRIDE { |
| return std::numeric_limits<size_t>::max(); |
| } |
| |
| private: |
| // Allocates memory that is aligned to 16 way. |
| // Use if you want to boost SIMD operations on the memory area. |
| const bool aligned_mode_; |
| |
| friend class Singleton<HeapBufferAllocator>; |
| |
| // Always allocates 'requested'-sized buffer, or returns NULL on OOM. |
| virtual Buffer* AllocateInternal(size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) OVERRIDE; |
| |
| virtual bool ReallocateInternal(size_t requested, |
| size_t minimal, |
| Buffer* buffer, |
| BufferAllocator* originator) OVERRIDE; |
| |
| void* Malloc(size_t size); |
| void* Realloc(void* previousData, size_t previousSize, size_t newSize); |
| |
| virtual void FreeInternal(Buffer* buffer) OVERRIDE; |
| |
| HeapBufferAllocator(); |
| explicit HeapBufferAllocator(bool aligned_mode) |
| : aligned_mode_(aligned_mode) {} |
| |
| DISALLOW_COPY_AND_ASSIGN(HeapBufferAllocator); |
| }; |
| |
| // Wrapper around the delegate allocator, that clears all newly allocated |
| // (and reallocated) memory. |
| class ClearingBufferAllocator : public BufferAllocator { |
| public: |
| // Does not take ownership of the delegate. |
| explicit ClearingBufferAllocator(BufferAllocator* delegate) |
| : delegate_(delegate) {} |
| |
| virtual size_t Available() const OVERRIDE { |
| return delegate_->Available(); |
| } |
| |
| private: |
| virtual Buffer* AllocateInternal(size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) OVERRIDE; |
| |
| virtual bool ReallocateInternal(size_t requested, |
| size_t minimal, |
| Buffer* buffer, |
| BufferAllocator* originator) OVERRIDE; |
| |
| virtual void FreeInternal(Buffer* buffer) OVERRIDE; |
| |
| BufferAllocator* delegate_; |
| DISALLOW_COPY_AND_ASSIGN(ClearingBufferAllocator); |
| }; |
| |
// Abstract policy object that can modify allocation requests, for example
// to enforce a quota.
class Mediator {
 public:
  Mediator() {}
  virtual ~Mediator() {}

  // Invoked by an allocator while it processes an allocation request.
  // The result must be either zero or a value within [minimal, requested]:
  //  - zero (with a non-zero 'minimal') means the allocation is denied;
  //  - a non-zero value means the request should be capped at that value.
  virtual size_t Allocate(size_t requested, size_t minimal) = 0;

  // Invoked by an allocator when 'amount' bytes are released.
  virtual void Free(size_t amount) = 0;

  // Defaults to the largest size_t value.
  // TODO(user): consider making pure virtual.
  virtual size_t Available() const {
    return std::numeric_limits<size_t>::max();
  }
};
| |
| // Optionally thread-safe skeletal implementation of a 'quota' abstraction, |
| // providing methods to allocate resources against the quota, and return them. |
| template<bool thread_safe> |
| class Quota : public Mediator { |
| public: |
| explicit Quota(bool enforced) : usage_(0), enforced_(enforced) {} |
| virtual ~Quota() {} |
| |
| // Returns a value in range [minimal, requested] if not exceeding remaining |
| // quota or if the quota is not enforced (soft quota), and adjusts the usage |
| // value accordingly. Otherwise, returns zero. The semantics of 'remaining |
| // quota' are defined by subclasses (that must supply GetQuotaInternal() |
| // method). |
| virtual size_t Allocate(size_t requested, size_t minimal) OVERRIDE; |
| |
| virtual void Free(size_t amount) OVERRIDE; |
| |
| // Returns memory still available in the quota. For unenforced Quota objects, |
| // you are still able to perform _minimal_ allocations when the available |
| // quota is 0 (or less than "minimal" param). |
| virtual size_t Available() const OVERRIDE { |
| lock_guard_maybe<Mutex> lock(Quota<thread_safe>::mutex()); |
| const size_t quota = GetQuotaInternal(); |
| return (usage_ >= quota) ? 0 : (quota - usage_); |
| } |
| |
| // Returns the current quota value. |
| size_t GetQuota() const; |
| |
| // Returns the current usage value, defined as a sum of all the values |
| // granted by calls to Allocate, less these released via calls to Free. |
| size_t GetUsage() const; |
| |
| bool enforced() const { |
| return enforced_; |
| } |
| |
| protected: |
| // Overridden by specific implementations, to define semantics of |
| // the quota, i.e. the total amount of resources that the mediator will |
| // allocate. Called directly from GetQuota that optionally provides |
| // thread safety. An 'Allocate' request will succeed if |
| // GetUsage() + minimal <= GetQuota() or if the quota is not enforced (soft |
| // quota). |
| virtual size_t GetQuotaInternal() const = 0; |
| |
| Mutex* mutex() const { return thread_safe ? &mutex_ : NULL; } |
| |
| private: |
| mutable Mutex mutex_; |
| size_t usage_; |
| bool enforced_; |
| DISALLOW_COPY_AND_ASSIGN(Quota); |
| }; |
| |
| // Optionally thread-safe static quota implementation (where quota is explicitly |
| // set to a concrete numeric value). |
| template<bool thread_safe> |
| class StaticQuota : public Quota<thread_safe> { |
| public: |
| explicit StaticQuota(size_t quota) |
| : Quota<thread_safe>(true) { |
| SetQuota(quota); |
| } |
| StaticQuota(size_t quota, bool enforced) |
| : Quota<thread_safe>(enforced) { |
| SetQuota(quota); |
| } |
| virtual ~StaticQuota() {} |
| |
| // Sets quota to the new value. |
| void SetQuota(const size_t quota); |
| |
| protected: |
| virtual size_t GetQuotaInternal() const { return quota_; } |
| |
| private: |
| size_t quota_; |
| DISALLOW_COPY_AND_ASSIGN(StaticQuota); |
| }; |
| |
| // Places resource limits on another allocator, using the specified Mediator |
| // (e.g. quota) implementation. |
| // |
| // If the mediator and the delegate allocator are thread-safe, this allocator |
| // is also thread-safe, to the extent that it will not introduce any |
| // state inconsistencies. However, without additional synchronization, |
| // allocation requests are not atomic end-to-end. This way, it is deadlock- |
| // resilient (even if you have cyclic relationships between allocators) and |
| // allows better concurrency. But, it may cause over-conservative |
| // allocations under memory contention, if you have multiple levels of |
| // mediating allocators. For example, if two requests that can't both be |
| // satisfied are submitted concurrently, it may happen that one of them succeeds |
| // but gets smaller buffer allocated than it would if the requests were strictly |
| // ordered. This is usually not a problem, however, as you don't really want to |
| // operate so close to memory limits that some of your allocations can't be |
| // satisfied. If you do have a simple, cascading graph of allocators though, |
| // and want to force requests be atomic end-to-end, put a |
| // ThreadSafeBufferAllocator at the entry point. |
| class MediatingBufferAllocator : public BufferAllocator { |
| public: |
| // Does not take ownership of the delegate, nor the mediator, allowing |
| // both to be reused. |
| MediatingBufferAllocator(BufferAllocator* const delegate, |
| Mediator* const mediator) |
| : delegate_(delegate), |
| mediator_(mediator) {} |
| |
| virtual ~MediatingBufferAllocator() {} |
| |
| virtual size_t Available() const OVERRIDE { |
| return std::min(delegate_->Available(), mediator_->Available()); |
| } |
| |
| private: |
| virtual Buffer* AllocateInternal(size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) OVERRIDE; |
| |
| virtual bool ReallocateInternal(size_t requested, |
| size_t minimal, |
| Buffer* buffer, |
| BufferAllocator* originator) OVERRIDE; |
| |
| virtual void FreeInternal(Buffer* buffer) OVERRIDE; |
| |
| BufferAllocator* delegate_; |
| Mediator* const mediator_; |
| }; |
| |
| // Convenience non-thread-safe static memory bounds enforcer. |
| // Combines MediatingBufferAllocator with a StaticQuota. |
| class MemoryLimit : public BufferAllocator { |
| public: |
| // Creates a limiter based on the default, heap allocator. Quota is infinite. |
| // (Can be set using SetQuota). |
| MemoryLimit() |
| : quota_(std::numeric_limits<size_t>::max()), |
| allocator_(HeapBufferAllocator::Get(), "a_) {} |
| |
| // Creates a limiter based on the default, heap allocator. |
| explicit MemoryLimit(size_t quota) |
| : quota_(quota), |
| allocator_(HeapBufferAllocator::Get(), "a_) {} |
| |
| // Creates a limiter relaying to the specified delegate allocator. |
| MemoryLimit(size_t quota, BufferAllocator* const delegate) |
| : quota_(quota), |
| allocator_(delegate, "a_) {} |
| |
| // Creates a (possibly non-enforcing) limiter relaying to the specified |
| // delegate allocator. |
| MemoryLimit(size_t quota, bool enforced, BufferAllocator* const delegate) |
| : quota_(quota, enforced), |
| allocator_(delegate, "a_) {} |
| |
| virtual ~MemoryLimit() {} |
| |
| virtual size_t Available() const OVERRIDE { |
| return allocator_.Available(); |
| } |
| |
| size_t GetQuota() const { return quota_.GetQuota(); } |
| size_t GetUsage() const { return quota_.GetUsage(); } |
| void SetQuota(const size_t quota) { quota_.SetQuota(quota); } |
| |
| private: |
| virtual Buffer* AllocateInternal(size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) OVERRIDE { |
| return DelegateAllocate(&allocator_, requested, minimal, originator); |
| } |
| virtual bool ReallocateInternal(size_t requested, |
| size_t minimal, |
| Buffer* buffer, |
| BufferAllocator* originator) OVERRIDE { |
| return DelegateReallocate(&allocator_, requested, minimal, buffer, |
| originator); |
| } |
| virtual void FreeInternal(Buffer* buffer) OVERRIDE { |
| DelegateFree(&allocator_, buffer); |
| } |
| |
| StaticQuota<false> quota_; |
| MediatingBufferAllocator allocator_; |
| }; |
| |
| // An allocator that allows to bypass the (potential) soft quota below for a |
| // given amount of memory usage. The goal is to make the allocation methods and |
| // Available() work as if the allocator below had at least bypassed_amount of |
| // soft quota. Of course this class doesn't allow to exceed the hard quota. |
| class SoftQuotaBypassingBufferAllocator : public BufferAllocator { |
| public: |
| SoftQuotaBypassingBufferAllocator(BufferAllocator* allocator, |
| size_t bypassed_amount) |
| : allocator_(std::numeric_limits<size_t>::max(), allocator), |
| bypassed_amount_(bypassed_amount) {} |
| |
| virtual size_t Available() const OVERRIDE { |
| const size_t usage = allocator_.GetUsage(); |
| size_t available = allocator_.Available(); |
| if (bypassed_amount_ > usage) { |
| available = std::max(bypassed_amount_ - usage, available); |
| } |
| return available; |
| } |
| |
| private: |
| // Calculates how much to increase the minimal parameter to allocate more |
| // aggressively in the underlying allocator. This is to avoid getting only |
| // very small allocations when we exceed the soft quota below. The request |
| // with increased minimal size is more likely to fail because of exceeding |
| // hard quota, so we also fall back to the original minimal size. |
| size_t AdjustMinimal(size_t requested, size_t minimal) const { |
| return std::min(requested, std::max(minimal, Available())); |
| } |
| virtual Buffer* AllocateInternal(size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) OVERRIDE { |
| // Try increasing the "minimal" parameter to allocate more aggresively |
| // within the bypassed amount of soft quota. |
| Buffer* result = DelegateAllocate(&allocator_, |
| requested, |
| AdjustMinimal(requested, minimal), |
| originator); |
| if (result != NULL) { |
| return result; |
| } else { |
| return DelegateAllocate(&allocator_, |
| requested, |
| minimal, |
| originator); |
| } |
| } |
| virtual bool ReallocateInternal(size_t requested, |
| size_t minimal, |
| Buffer* buffer, |
| BufferAllocator* originator) OVERRIDE { |
| if (DelegateReallocate(&allocator_, |
| requested, |
| AdjustMinimal(requested, minimal), |
| buffer, |
| originator)) { |
| return true; |
| } else { |
| return DelegateReallocate(&allocator_, |
| requested, |
| minimal, |
| buffer, |
| originator); |
| } |
| } |
| virtual void FreeInternal(Buffer* buffer) OVERRIDE { |
| DelegateFree(&allocator_, buffer); |
| } |
| |
| // Using MemoryLimit with "infinite" limit to get GetUsage(). |
| MemoryLimit allocator_; |
| size_t bypassed_amount_; |
| }; |
| |
| // An interface for a MemoryStatisticsCollector - an object which collects |
| // information about the memory usage of the allocator. The collector will |
| // gather statistics about memory usage based on information received from the |
| // allocator. |
| class MemoryStatisticsCollectorInterface { |
| public: |
| MemoryStatisticsCollectorInterface() {} |
| |
| virtual ~MemoryStatisticsCollectorInterface() {} |
| |
| // Informs the collector that the allocator granted bytes memory. Note that in |
| // the case of reallocation bytes should be the increase in total memory |
| // usage, not the total size of the buffer after reallocation. |
| virtual void AllocatedMemoryBytes(size_t bytes) = 0; |
| |
| // Informs the collector that the allocator received a request for at least |
| // bytes memory, and rejected it (meaning that it granted nothing). |
| virtual void RefusedMemoryBytes(size_t bytes) = 0; |
| |
| // Informs the collector that bytes memory have been released to the |
| // allocator. |
| virtual void FreedMemoryBytes(size_t bytes) = 0; |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(MemoryStatisticsCollectorInterface); |
| }; |
| |
| class MemoryStatisticsCollectingBufferAllocator : public BufferAllocator { |
| public: |
| // Does not take ownership of the delegate. |
| // Takes ownership of memory_stats_collector. |
| MemoryStatisticsCollectingBufferAllocator( |
| BufferAllocator* const delegate, |
| MemoryStatisticsCollectorInterface* const memory_stats_collector) |
| : delegate_(delegate), |
| memory_stats_collector_(memory_stats_collector) {} |
| |
| virtual ~MemoryStatisticsCollectingBufferAllocator() {} |
| |
| virtual size_t Available() const OVERRIDE { |
| return delegate_->Available(); |
| } |
| |
| private: |
| virtual Buffer* AllocateInternal(size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) OVERRIDE; |
| |
| virtual bool ReallocateInternal(size_t requested, |
| size_t minimal, |
| Buffer* buffer, |
| BufferAllocator* originator) OVERRIDE; |
| |
| virtual void FreeInternal(Buffer* buffer) OVERRIDE; |
| |
| BufferAllocator* delegate_; |
| std::unique_ptr<MemoryStatisticsCollectorInterface> memory_stats_collector_; |
| }; |
| |
| // BufferAllocator which uses MemTracker to keep track of and optionally |
| // (if a limit is set on the MemTracker) regulate memory consumption. |
| class MemoryTrackingBufferAllocator : public BufferAllocator { |
| public: |
| // Does not take ownership of the delegate. The delegate must remain |
| // valid for the lifetime of this allocator. Increments reference |
| // count for 'mem_tracker'. |
| // If 'mem_tracker' has a limit and 'enforce_limit' is true, then |
| // the classes calling this buffer allocator (whether directly, or |
| // through an Arena) must be able to handle the case when allocation |
| // fails. If 'enforce_limit' is false (this is the default), then |
| // allocation will always succeed. |
| MemoryTrackingBufferAllocator(BufferAllocator* const delegate, |
| std::shared_ptr<MemTracker> mem_tracker, |
| bool enforce_limit = false) |
| : delegate_(delegate), |
| mem_tracker_(std::move(mem_tracker)), |
| enforce_limit_(enforce_limit) {} |
| |
| virtual ~MemoryTrackingBufferAllocator() {} |
| |
| // If enforce limit is false, this always returns maximum possible value |
| // for int64_t (std::numeric_limits<int64_t>::max()). Otherwise, this |
| // is equivalent to calling mem_tracker_->SpareCapacity(); |
| virtual size_t Available() const OVERRIDE; |
| |
| private: |
| |
| // If enforce_limit_ is true, this is equivalent to calling |
| // mem_tracker_->TryConsume(bytes). If enforce_limit_ is false and |
| // mem_tracker_->TryConsume(bytes) is false, we call |
| // mem_tracker_->Consume(bytes) and always return true. |
| bool TryConsume(int64_t bytes); |
| |
| virtual Buffer* AllocateInternal(size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) OVERRIDE; |
| |
| virtual bool ReallocateInternal(size_t requested, |
| size_t minimal, |
| Buffer* buffer, |
| BufferAllocator* originator) OVERRIDE; |
| |
| virtual void FreeInternal(Buffer* buffer) OVERRIDE; |
| |
| BufferAllocator* delegate_; |
| std::shared_ptr<MemTracker> mem_tracker_; |
| bool enforce_limit_; |
| }; |
| |
| // Synchronizes access to AllocateInternal and FreeInternal, and exposes the |
| // mutex for use by subclasses. Allocation requests performed through this |
| // allocator are atomic end-to-end. Template parameter DelegateAllocatorType |
| // allows to specify a subclass of BufferAllocator for the delegate, to allow |
| // subclasses of ThreadSafeBufferAllocator to access additional methods provided |
| // by the allocator subclass. If this is not needed, it can be set to |
| // BufferAllocator. |
| template <class DelegateAllocatorType> |
| class ThreadSafeBufferAllocator : public BufferAllocator { |
| public: |
| // Does not take ownership of the delegate. |
| explicit ThreadSafeBufferAllocator(DelegateAllocatorType* delegate) |
| : delegate_(delegate) {} |
| virtual ~ThreadSafeBufferAllocator() {} |
| |
| virtual size_t Available() const OVERRIDE { |
| lock_guard_maybe<Mutex> lock(mutex()); |
| return delegate()->Available(); |
| } |
| |
| protected: |
| Mutex* mutex() const { return &mutex_; } |
| // Expose the delegate allocator, with the precise type of the allocator |
| // specified by the template parameter. The delegate() methods themselves |
| // don't give any thread-safety guarantees. Protect all uses taking the Mutex |
| // exposed by the mutex() method. |
| DelegateAllocatorType* delegate() { return delegate_; } |
| const DelegateAllocatorType* delegate() const { return delegate_; } |
| |
| private: |
| virtual Buffer* AllocateInternal(size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) OVERRIDE { |
| lock_guard_maybe<Mutex> lock(mutex()); |
| return DelegateAllocate(delegate(), requested, minimal, originator); |
| } |
| |
| virtual bool ReallocateInternal(size_t requested, |
| size_t minimal, |
| Buffer* buffer, |
| BufferAllocator* originator) OVERRIDE { |
| lock_guard_maybe<Mutex> lock(mutex()); |
| return DelegateReallocate(delegate(), requested, minimal, buffer, |
| originator); |
| } |
| |
| virtual void FreeInternal(Buffer* buffer) OVERRIDE { |
| lock_guard_maybe<Mutex> lock(mutex()); |
| DelegateFree(delegate(), buffer); |
| } |
| |
| DelegateAllocatorType* delegate_; |
| mutable Mutex mutex_; |
| DISALLOW_COPY_AND_ASSIGN(ThreadSafeBufferAllocator); |
| }; |
| |
| // A version of ThreadSafeBufferAllocator that owns the supplied delegate |
| // allocator. |
| template <class DelegateAllocatorType> |
| class OwningThreadSafeBufferAllocator |
| : public ThreadSafeBufferAllocator<DelegateAllocatorType> { |
| public: |
| explicit OwningThreadSafeBufferAllocator(DelegateAllocatorType* delegate) |
| : ThreadSafeBufferAllocator<DelegateAllocatorType>(delegate), |
| delegate_owned_(delegate) {} |
| virtual ~OwningThreadSafeBufferAllocator() {} |
| |
| private: |
| std::unique_ptr<DelegateAllocatorType> delegate_owned_; |
| }; |
| |
| class ThreadSafeMemoryLimit |
| : public OwningThreadSafeBufferAllocator<MemoryLimit> { |
| public: |
| ThreadSafeMemoryLimit(size_t quota, bool enforced, |
| BufferAllocator* const delegate) |
| : OwningThreadSafeBufferAllocator<MemoryLimit>( |
| new MemoryLimit(quota, enforced, delegate)) {} |
| virtual ~ThreadSafeMemoryLimit() {} |
| |
| size_t GetQuota() const { |
| lock_guard_maybe<Mutex> lock(mutex()); |
| return delegate()->GetQuota(); |
| } |
| size_t GetUsage() const { |
| lock_guard_maybe<Mutex> lock(mutex()); |
| return delegate()->GetUsage(); |
| } |
| void SetQuota(const size_t quota) { |
| lock_guard_maybe<Mutex> lock(mutex()); |
| delegate()->SetQuota(quota); |
| } |
| }; |
| |
| // A BufferAllocator that can be given ownership of many objects of given type. |
| // These objects will then be deleted when the buffer allocator is destroyed. |
| // The objects added last are deleted first (LIFO). |
| template <typename OwnedType> |
| class OwningBufferAllocator : public BufferAllocator { |
| public: |
| // Doesn't take ownership of delegate. |
| explicit OwningBufferAllocator(BufferAllocator* const delegate) |
| : delegate_(delegate) {} |
| |
| virtual ~OwningBufferAllocator() { |
| // Delete elements starting from the end. |
| while (!owned_.empty()) { |
| OwnedType* p = owned_.back(); |
| owned_.pop_back(); |
| delete p; |
| } |
| } |
| |
| // Add to the collection of objects owned by this allocator. The object added |
| // last is deleted first. |
| OwningBufferAllocator* Add(OwnedType* p) { |
| owned_.push_back(p); |
| return this; |
| } |
| |
| virtual size_t Available() const OVERRIDE { |
| return delegate_->Available(); |
| } |
| |
| private: |
| virtual Buffer* AllocateInternal(size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) OVERRIDE { |
| return DelegateAllocate(delegate_, requested, minimal, originator); |
| } |
| |
| virtual bool ReallocateInternal(size_t requested, |
| size_t minimal, |
| Buffer* buffer, |
| BufferAllocator* originator) OVERRIDE { |
| return DelegateReallocate(delegate_, requested, minimal, buffer, |
| originator); |
| } |
| |
| virtual void FreeInternal(Buffer* buffer) OVERRIDE { |
| DelegateFree(delegate_, buffer); |
| } |
| |
| // Not using PointerVector here because we want to guarantee certain order of |
| // deleting elements (starting from the ones added last). |
| std::vector<OwnedType*> owned_; |
| BufferAllocator* delegate_; |
| }; |
| |
| // Buffer allocator that tries to guarantee the exact and consistent amount |
| // of memory. Uses hard MemoryLimit to enforce the upper bound but also |
| // guarantees consistent allocations by ignoring minimal requested amounts and |
| // always returning the full amount of memory requested if available. |
| // Allocations will fail if the memory requested would exceed the quota or if |
| // the underlying allocator fails to provide the memory. |
| class GuaranteeMemory : public BufferAllocator { |
| public: |
| // Doesn't take ownership of 'delegate'. |
| GuaranteeMemory(size_t memory_quota, |
| BufferAllocator* delegate) |
| : limit_(memory_quota, true, delegate), |
| memory_guarantee_(memory_quota) {} |
| |
| virtual size_t Available() const OVERRIDE { |
| return memory_guarantee_ - limit_.GetUsage(); |
| } |
| |
| private: |
| virtual Buffer* AllocateInternal(size_t requested, |
| size_t minimal, |
| BufferAllocator* originator) OVERRIDE { |
| if (requested > Available()) { |
| return NULL; |
| } else { |
| return DelegateAllocate(&limit_, requested, requested, originator); |
| } |
| } |
| |
| virtual bool ReallocateInternal(size_t requested, |
| size_t /* minimal */, |
| Buffer* buffer, |
| BufferAllocator* originator) OVERRIDE { |
| int64_t additional_memory = requested - (buffer != NULL ? buffer->size() : 0); |
| return additional_memory <= static_cast<int64_t>(Available()) |
| && DelegateReallocate(&limit_, requested, requested, |
| buffer, originator); |
| } |
| |
| virtual void FreeInternal(Buffer* buffer) OVERRIDE { |
| DelegateFree(&limit_, buffer); |
| } |
| |
| MemoryLimit limit_; |
| size_t memory_guarantee_; |
| DISALLOW_COPY_AND_ASSIGN(GuaranteeMemory); |
| }; |
| |
| // Implementation of inline and template methods |
| |
| template<bool thread_safe> |
| size_t Quota<thread_safe>::Allocate(const size_t requested, |
| const size_t minimal) { |
| lock_guard_maybe<Mutex> lock(mutex()); |
| DCHECK_LE(minimal, requested) |
| << "\"minimal\" shouldn't be bigger than \"requested\""; |
| const size_t quota = GetQuotaInternal(); |
| size_t allocation; |
| if (usage_ > quota || minimal > quota - usage_) { |
| // OOQ (Out of quota). |
| if (!enforced() && minimal <= std::numeric_limits<size_t>::max() - usage_) { |
| // The quota is unenforced and the value of "minimal" won't cause an |
| // overflow. Perform a minimal allocation. |
| allocation = minimal; |
| } else { |
| allocation = 0; |
| } |
| LOG(WARNING) << "Out of quota. Requested: " << requested |
| << " bytes, or at least minimal: " << minimal |
| << ". Current quota value is: " << quota |
| << " while current usage is: " << usage_ |
| << ". The quota is " << (enforced() ? "" : "not ") |
| << "enforced. " |
| << ((allocation == 0) ? "Did not allocate any memory." |
| : "Allocated the minimal value requested."); |
| } else { |
| allocation = std::min(requested, quota - usage_); |
| } |
| usage_ += allocation; |
| return allocation; |
| } |
| |
| template<bool thread_safe> |
| void Quota<thread_safe>::Free(size_t amount) { |
| lock_guard_maybe<Mutex> lock(mutex()); |
| usage_ -= amount; |
| // threads allocate/free memory concurrently via the same Quota object that is |
| // not protected with a mutex (thread_safe == false). |
| if (usage_ > (std::numeric_limits<size_t>::max() - (1 << 28))) { |
| LOG(ERROR) << "Suspiciously big usage_ value: " << usage_ |
| << " (could be a result size_t wrapping around below 0, " |
| << "for example as a result of race condition)."; |
| } |
| } |
| |
| template<bool thread_safe> |
| size_t Quota<thread_safe>::GetQuota() const { |
| lock_guard_maybe<Mutex> lock(mutex()); |
| return GetQuotaInternal(); |
| } |
| |
| template<bool thread_safe> |
| size_t Quota<thread_safe>::GetUsage() const { |
| lock_guard_maybe<Mutex> lock(mutex()); |
| return usage_; |
| } |
| |
| template<bool thread_safe> |
| void StaticQuota<thread_safe>::SetQuota(const size_t quota) { |
| lock_guard_maybe<Mutex> lock(Quota<thread_safe>::mutex()); |
| quota_ = quota; |
| } |
| |
| } // namespace kudu |