| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| |
| #ifndef IMPALA_RUNTIME_MEM_TRACKER_H |
| #define IMPALA_RUNTIME_MEM_TRACKER_H |
| |
| #include <stdint.h> |
| #include <map> |
| #include <memory> |
| #include <queue> |
| #include <vector> |
| #include <boost/thread/mutex.hpp> |
| #include <boost/unordered_map.hpp> |
| |
| #include "common/logging.h" |
| #include "common/atomic.h" |
| #include "runtime/mem-tracker-types.h" |
| #include "util/debug-util.h" |
| #include "util/internal-queue.h" |
| #include "util/metrics-fwd.h" |
| #include "util/runtime-profile-counters.h" |
| #include "util/spinlock.h" |
| |
| #include "gen-cpp/Types_types.h" // for TUniqueId |
| |
| namespace impala { |
| |
| class ObjectPool; |
| struct ReservationTrackerCounters; |
| class TQueryOptions; |
| |
| /// A MemTracker tracks memory consumption; it contains an optional limit |
| /// and can be arranged into a tree structure such that the consumption tracked |
| /// by a MemTracker is also tracked by its ancestors. |
| /// |
| /// A MemTracker has a hard and a soft limit derived from the limit. If the hard limit |
| /// is exceeded, all memory allocations and queries should fail until we are under the |
| /// limit again. The soft limit can be exceeded without causing query failures, but |
| /// consumers of memory that can tolerate running without more memory should not allocate |
| /// memory in excess of the soft limit. |
| /// |
| /// We use a five-level hierarchy of mem trackers: process, pool, query, fragment |
| /// instance. Specific parts of the fragment (exec nodes, sinks, etc) will add a |
| /// fifth level when they are initialized. This function also initializes a user |
| /// function mem tracker (in the fifth level). |
| /// |
| /// By default, memory consumption is tracked via calls to Consume()/Release(), either to |
| /// the tracker itself or to one of its descendents. Alternatively, a consumption metric |
| /// can be specified, and then the metric's value is used as the consumption rather than |
| /// the tally maintained by Consume() and Release(). A tcmalloc metric is used to track |
| /// process memory consumption, since the process memory usage may be higher than the |
| /// computed total memory (tcmalloc does not release deallocated memory immediately). |
| /// Other consumption metrics are used in trackers below the process level to account |
| /// for memory (such as free buffer pool buffers) that is not tracked by Consume() and |
| /// Release(). |
| /// |
| /// GcFunctions can be attached to a MemTracker in order to free up memory if the limit is |
| /// reached. If LimitExceeded() is called and the limit is exceeded, it will first call |
| /// the GcFunctions to try to free memory and recheck the limit. For example, the process |
| /// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so |
| /// this will be called before the process limit is reported as exceeded. GcFunctions are |
| /// called in the order they are added, so expensive functions should be added last. |
| /// GcFunctions are called with a global lock held, so should be non-blocking and not |
| /// call back into MemTrackers, except to release memory. |
| // |
| /// This class is thread-safe. |
| class MemTracker { |
| public: |
| /// 'byte_limit' < 0 means no limit |
| /// 'label' is the label used in the usage string (LogUsage()) |
| /// If 'log_usage_if_zero' is false, this tracker (and its children) will not be |
| /// included |
| /// in LogUsage() output if consumption is 0. |
| MemTracker(int64_t byte_limit = -1, const std::string& label = std::string(), |
| MemTracker* parent = nullptr, bool log_usage_if_zero = true); |
| |
| /// C'tor for tracker for which consumption counter is created as part of a profile. |
| /// The counter is created with name COUNTER_NAME. |
| MemTracker(RuntimeProfile* profile, int64_t byte_limit, |
| const std::string& label = std::string(), MemTracker* parent = nullptr); |
| |
| /// C'tor for tracker that uses consumption_metric as the consumption value. |
| /// Consume()/Release() can still be called. This is used for the root process tracker |
| /// (if 'parent' is NULL). It is also to report on other categories of memory under the |
| /// process tracker, e.g. buffer pool free buffers (if 'parent - non-NULL). |
| MemTracker(IntGauge* consumption_metric, int64_t byte_limit = -1, |
| const std::string& label = std::string(), MemTracker* parent = nullptr); |
| |
| ~MemTracker(); |
| |
| /// Closes this MemTracker. After closing it is invalid to consume memory on this |
| /// tracker and the tracker's consumption counter (which may be owned by a |
| /// RuntimeProfile, not this MemTracker) can be safely destroyed. MemTrackers without |
| /// consumption metrics in the context of a daemon must always be closed. |
| /// Idempotent: calling multiple times has no effect. |
| void Close(); |
| |
| /// Closes the MemTracker and deregisters it from its parent. Can be called before |
| /// destruction to prevent other threads from getting a reference to the MemTracker |
| /// via its parent. Only used to deregister the query-level MemTracker from the |
| /// global hierarchy. |
| void CloseAndUnregisterFromParent(); |
| |
| /// Include counters from a ReservationTracker in logs and other diagnostics. |
| /// The counters should be owned by the fragment's RuntimeProfile. |
| void EnableReservationReporting(const ReservationTrackerCounters& counters); |
| |
| /// Construct a MemTracker object for query 'id' with 'mem_limit' as the memory limit. |
| /// The MemTracker is a child of the request pool MemTracker for 'pool_name', which is |
| /// created if needed. The returned MemTracker is owned by 'obj_pool'. |
| static MemTracker* CreateQueryMemTracker(const TUniqueId& id, int64_t mem_limit, |
| const std::string& pool_name, ObjectPool* obj_pool); |
| |
| /// Increases consumption of this tracker and its ancestors by 'bytes'. |
| void Consume(int64_t bytes) { |
| DCHECK(!closed_) << label_; |
| if (bytes <= 0) { |
| if (bytes < 0) Release(-bytes); |
| return; |
| } |
| |
| if (consumption_metric_ != nullptr) { |
| RefreshConsumptionFromMetric(); |
| return; |
| } |
| for (MemTracker* tracker : all_trackers_) { |
| tracker->consumption_->Add(bytes); |
| if (tracker->consumption_metric_ == nullptr) { |
| DCHECK_GE(tracker->consumption_->current_value(), 0); |
| } |
| } |
| } |
| |
| /// Increases/Decreases the consumption of this tracker and the ancestors up to (but |
| /// not including) end_tracker. This is useful if we want to move tracking between |
| /// trackers that share a common (i.e. end_tracker) ancestor. This happens when we want |
| /// to update tracking on a particular mem tracker but the consumption against |
| /// the limit recorded in one of its ancestors already happened. |
| void ConsumeLocal(int64_t bytes, MemTracker* end_tracker) { |
| DCHECK(!closed_) << label_; |
| DCHECK(consumption_metric_ == nullptr) << "Should not be called on root."; |
| for (MemTracker* tracker : all_trackers_) { |
| if (tracker == end_tracker) return; |
| DCHECK(!tracker->has_limit()); |
| DCHECK(!tracker->closed_) << tracker->label_; |
| tracker->consumption_->Add(bytes); |
| } |
| DCHECK(false) << "end_tracker is not an ancestor"; |
| } |
| |
| void ReleaseLocal(int64_t bytes, MemTracker* end_tracker) { |
| ConsumeLocal(-bytes, end_tracker); |
| } |
| |
| /// Increases consumption of this tracker and its ancestors by 'bytes' only if |
| /// they can all consume 'bytes' without exceeding limit (hard or soft) specified |
| /// by 'mode'. If any limit would be exceed, no MemTrackers are updated. If the |
| /// caller can tolerate an allocation failing, it should set mode=SOFT so that |
| /// other callers that may not tolerate allocation failures have a better chance |
| /// of success. Returns true if the consumption was successfully updated. |
| WARN_UNUSED_RESULT |
| bool TryConsume(int64_t bytes, MemLimit mode = MemLimit::HARD) { |
| DCHECK(!closed_) << label_; |
| if (consumption_metric_ != nullptr) RefreshConsumptionFromMetric(); |
| if (UNLIKELY(bytes <= 0)) return true; |
| int i; |
| // Walk the tracker tree top-down. |
| for (i = all_trackers_.size() - 1; i >= 0; --i) { |
| MemTracker* tracker = all_trackers_[i]; |
| const int64_t limit = tracker->GetLimit(mode); |
| if (limit < 0) { |
| tracker->consumption_->Add(bytes); // No limit at this tracker. |
| } else { |
| // If TryConsume fails, we can try to GC, but we may need to try several times if |
| // there are concurrent consumers because we don't take a lock before trying to |
| // update consumption_. |
| while (true) { |
| if (LIKELY(tracker->consumption_->TryAdd(bytes, limit))) break; |
| |
| VLOG_RPC << "TryConsume failed, bytes=" << bytes |
| << " consumption=" << tracker->consumption_->current_value() |
| << " limit=" << limit << " attempting to GC"; |
| if (UNLIKELY(tracker->GcMemory(limit - bytes))) { |
| DCHECK_GE(i, 0); |
| // Failed for this mem tracker. Roll back the ones that succeeded. |
| for (int j = all_trackers_.size() - 1; j > i; --j) { |
| all_trackers_[j]->consumption_->Add(-bytes); |
| } |
| return false; |
| } |
| VLOG_RPC << "GC succeeded, TryConsume bytes=" << bytes |
| << " consumption=" << tracker->consumption_->current_value() |
| << " limit=" << limit; |
| } |
| } |
| } |
| // Everyone succeeded, return. |
| DCHECK_EQ(i, -1); |
| return true; |
| } |
| |
| /// Decreases consumption of this tracker and its ancestors by 'bytes'. |
| void Release(int64_t bytes) { |
| DCHECK(!closed_) << label_; |
| if (bytes <= 0) { |
| if (bytes < 0) Consume(-bytes); |
| return; |
| } |
| |
| if (consumption_metric_ != nullptr) { |
| RefreshConsumptionFromMetric(); |
| return; |
| } |
| for (MemTracker* tracker : all_trackers_) { |
| tracker->consumption_->Add(-bytes); |
| /// If a UDF calls FunctionContext::TrackAllocation() but allocates less than the |
| /// reported amount, the subsequent call to FunctionContext::Free() may cause the |
| /// process mem tracker to go negative until it is synced back to the tcmalloc |
| /// metric. Don't blow up in this case. (Note that this doesn't affect non-process |
| /// trackers since we can enforce that the reported memory usage is internally |
| /// consistent.) |
| if (tracker->consumption_metric_ == nullptr) { |
| DCHECK_GE(tracker->consumption_->current_value(), 0) |
| << std::endl |
| << tracker->LogUsage(UNLIMITED_DEPTH); |
| } |
| } |
| } |
| |
| /// Transfer 'bytes' of consumption from this tracker to 'dst', updating |
| /// all ancestors up to the first shared ancestor. Must not be used if |
| /// 'dst' has a limit, or an ancestor with a limit, that is not a common |
| /// ancestor with the tracker, because this does not check memory limits. |
| void TransferTo(MemTracker* dst, int64_t bytes); |
| |
| /// Returns true if a valid limit of this tracker or one of its ancestors is |
| /// exceeded. |
| bool AnyLimitExceeded(MemLimit mode) { |
| for (MemTracker* tracker : limit_trackers_) { |
| if (tracker->LimitExceeded(mode)) return true; |
| } |
| return false; |
| } |
| |
| /// If this tracker has a limit, checks the limit and attempts to free up some memory if |
| /// the hard limit is exceeded by calling any added GC functions. Returns true if the |
| /// limit is exceeded after calling the GC functions. Returns false if there is no limit |
| /// or consumption is under the limit. |
| bool LimitExceeded(MemLimit mode) { |
| if (UNLIKELY(CheckLimitExceeded(mode))) return LimitExceededSlow(mode); |
| return false; |
| } |
| |
| /// Returns the maximum consumption that can be made without exceeding the limit on |
| /// this tracker or any of its parents. Returns int64_t::max() if there are no |
| /// limits and a negative value if any limit is already exceeded. |
| int64_t SpareCapacity(MemLimit mode) const; |
| |
| /// Refresh the memory consumption value from the consumption metric. Only valid to |
| /// call if this tracker has a consumption metric. |
| void RefreshConsumptionFromMetric(); |
| |
| int64_t limit() const { return limit_; } |
| bool has_limit() const { return limit_ >= 0; } |
| int64_t soft_limit() const { return soft_limit_; } |
| int64_t GetLimit(MemLimit mode) const { |
| if (mode == MemLimit::SOFT) return soft_limit(); |
| DCHECK_ENUM_EQ(mode, MemLimit::HARD); |
| return limit(); |
| } |
| const std::string& label() const { return label_; } |
| |
| /// Returns the lowest limit for this tracker and its ancestors. Returns |
| /// -1 if there is no limit. |
| int64_t GetLowestLimit(MemLimit mode) const; |
| |
| /// Returns the memory 'reserved' by this resource pool mem tracker, which is the sum |
| /// of the memory reserved by the queries in it (i.e. its child trackers). The mem |
| /// reserved for a query that is currently executing is its limit_, if set (which |
| /// should be the common case with admission control). Otherwise, if the query has |
| /// no limit or the query is finished executing, the current consumption is used. |
| int64_t GetPoolMemReserved(); |
| |
| /// Returns the memory consumed in bytes. |
| int64_t consumption() const { return consumption_->current_value(); } |
| |
| /// Note that if consumption_ is based on consumption_metric_, this will the max value |
| /// we've recorded in consumption(), not necessarily the highest value |
| /// consumption_metric_ has ever reached. |
| int64_t peak_consumption() const { return consumption_->value(); } |
| |
| MemTracker* parent() const { return parent_; } |
| |
| /// Signature for function that can be called to free some memory after limit is |
| /// reached. The function should try to free at least 'bytes_to_free' bytes of |
| /// memory. See the class header for further details on the expected behaviour of |
| /// these functions. |
| typedef std::function<void(int64_t bytes_to_free)> GcFunction; |
| |
| /// Add a function 'f' to be called if the limit is reached, if none of the other |
| /// previously-added GC functions were successful at freeing up enough memory. |
| /// 'f' does not need to be thread-safe as long as it is added to only one MemTracker. |
| /// Note that 'f' must be valid for the lifetime of this MemTracker. |
| void AddGcFunction(GcFunction f); |
| |
| /// Register this MemTracker's metrics. Each key will be of the form |
| /// "<prefix>.<metric name>". |
| void RegisterMetrics(MetricGroup* metrics, const std::string& prefix); |
| |
| /// Logs the usage of this tracker and optionally its children (recursively). |
| /// If 'logged_consumption' is non-NULL, sets the consumption value logged. |
| /// 'max_recursive_depth' specifies the maximum number of levels of children |
| /// to include in the dump. If it is zero, then no children are dumped. |
| /// Limiting the recursive depth reduces the cost of dumping, particularly |
| /// for the process MemTracker. |
| /// TODO: once all memory is accounted in ReservationTracker hierarchy, move |
| /// reporting there. |
| std::string LogUsage(int max_recursive_depth, |
| const std::string& prefix = "", int64_t* logged_consumption = nullptr); |
| /// Dumping the process MemTracker is expensive. Limiting the recursive depth |
| /// to two levels limits the level of detail to a one-line summary for each query |
| /// MemTracker, avoiding all MemTrackers below that level. This provides a summary |
| /// of process usage with substantially lower cost than the full dump. |
| static const int PROCESS_MEMTRACKER_LIMITED_DEPTH = 2; |
| /// Unlimited dumping is useful for query memtrackers or error conditions that |
| /// are not performance sensitive |
| static const int UNLIMITED_DEPTH = INT_MAX; |
| |
| /// Logs the usage of 'limit' number of queries based on maximum total memory |
| /// consumption. |
| std::string LogTopNQueries(int limit); |
| |
| /// Log the memory usage when memory limit is exceeded and return a status object with |
| /// details of the allocation which caused the limit to be exceeded. |
| /// If 'failed_allocation_size' is greater than zero, logs the allocation size. If |
| /// 'failed_allocation_size' is zero, nothing about the allocation size is logged. |
| /// If 'state' is non-NULL, logs the error to 'state'. |
| Status MemLimitExceeded(RuntimeState* state, const std::string& details, |
| int64_t failed_allocation = 0) WARN_UNUSED_RESULT { |
| return MemLimitExceeded(this, state, details, failed_allocation); |
| } |
| |
| /// Makes MemLimitExceeded callable for nullptr MemTrackers. |
| static Status MemLimitExceeded(MemTracker* mtracker, RuntimeState* state, |
| const std::string& details, int64_t failed_allocation = 0) WARN_UNUSED_RESULT; |
| |
| void set_query_exec_finished() { |
| DCHECK(is_query_mem_tracker_); |
| query_exec_finished_.Store(1); |
| } |
| |
| static const std::string COUNTER_NAME; |
| |
| private: |
| friend class PoolMemTrackerRegistry; |
| |
| /// Returns true if the current memory tracker's limit is exceeded. |
| bool CheckLimitExceeded(MemLimit mode) const { |
| int64_t limit = GetLimit(mode); |
| return limit >= 0 && limit < consumption(); |
| } |
| |
| /// Slow path for LimitExceeded(). |
| bool LimitExceededSlow(MemLimit mode); |
| |
| /// If consumption is higher than max_consumption, attempts to free memory by calling |
| /// any added GC functions. Returns true if max_consumption is still exceeded. Takes |
| /// gc_lock. Updates metrics if initialized. |
| bool GcMemory(int64_t max_consumption); |
| |
| /// Walks the MemTracker hierarchy and populates all_trackers_ and |
| /// limit_trackers_ |
| void Init(); |
| |
| /// Adds tracker to child_trackers_ |
| void AddChildTracker(MemTracker* tracker); |
| |
| /// Log consumption of all the trackers provided. Returns the sum of consumption in |
| /// 'logged_consumption'. 'max_recursive_depth' specifies the maximum number of levels |
| /// of children to include in the dump. If it is zero, then no children are dumped. |
| static std::string LogUsage(int max_recursive_depth, const std::string& prefix, |
| const std::list<MemTracker*>& trackers, int64_t* logged_consumption); |
| |
| /// Helper function for LogTopNQueries that iterates through the MemTracker hierarchy |
| /// and populates 'min_pq' with 'limit' number of elements (that contain state related |
| /// to query MemTrackers) based on maximum total memory consumption. |
| void GetTopNQueries( |
| std::priority_queue<pair<int64_t, string>, |
| vector<pair<int64_t, string>>, std::greater<pair<int64_t, string>>>& min_pq, |
| int limit); |
| |
| /// If an ancestor of this tracker is a query MemTracker, return that tracker. |
| /// Otherwise return NULL. |
| MemTracker* GetQueryMemTracker(); |
| |
| /// Lock to protect GcMemory(). This prevents many GCs from occurring at once. |
| boost::mutex gc_lock_; |
| |
| /// True if this is a Query MemTracker returned from CreateQueryMemTracker(). |
| bool is_query_mem_tracker_ = false; |
| |
| /// Only used if 'is_query_mem_tracker_' is true. |
| /// 0 if the query is still executing or 1 if it has finished executing. Before |
| /// it has finished executing, the tracker limit is treated as "reserved memory" |
| /// for the purpose of admission control - see GetPoolMemReserved(). |
| AtomicInt32 query_exec_finished_{0}; |
| |
| /// Only valid for MemTrackers returned from CreateQueryMemTracker() |
| TUniqueId query_id_; |
| |
| /// Only valid for MemTrackers returned from GetRequestPoolMemTracker() |
| std::string pool_name_; |
| |
| /// Hard limit on memory consumption, in bytes. May not be exceeded. If limit_ == -1, |
| /// there is no consumption limit. |
| const int64_t limit_; |
| |
| /// Soft limit on memory consumption, in bytes. Can be exceeded but callers to |
| /// TryConsume() can opt not to exceed this limit. If -1, there is no consumption limit. |
| const int64_t soft_limit_; |
| |
| std::string label_; |
| |
| /// The parent of this tracker. The pointer is never modified, even after this tracker |
| /// is unregistered. |
| MemTracker* const parent_; |
| |
| /// in bytes; not owned |
| RuntimeProfile::HighWaterMarkCounter* consumption_; |
| |
| /// holds consumption_ counter if not tied to a profile |
| RuntimeProfile::HighWaterMarkCounter local_counter_; |
| |
| /// If non-NULL, used to measure consumption (in bytes) rather than the values provided |
| /// to Consume()/Release(). Only used for the process tracker, thus parent_ should be |
| /// NULL if consumption_metric_ is set. |
| IntGauge* consumption_metric_; |
| |
| /// If non-NULL, counters from a corresponding ReservationTracker that should be |
| /// reported in logs and other diagnostics. Owned by this MemTracker. The counters |
| /// are owned by the fragment's RuntimeProfile. |
| AtomicPtr<ReservationTrackerCounters> reservation_counters_; |
| |
| std::vector<MemTracker*> all_trackers_; // this tracker plus all of its ancestors |
| std::vector<MemTracker*> limit_trackers_; // all_trackers_ with valid limits |
| |
| /// All the child trackers of this tracker. Used only for computing resource pool mem |
| /// reserved and error reporting, i.e., updating a parent tracker does not update its |
| /// children. |
| SpinLock child_trackers_lock_; |
| std::list<MemTracker*> child_trackers_; |
| |
| /// Iterator into parent_->child_trackers_ for this object. Stored to have O(1) |
| /// remove. |
| std::list<MemTracker*>::iterator child_tracker_it_; |
| |
| /// Functions to call after the limit is reached to free memory. |
| std::vector<GcFunction> gc_functions_; |
| |
| /// If false, this tracker (and its children) will not be included in LogUsage() output |
| /// if consumption is 0. |
| bool log_usage_if_zero_; |
| |
| bool closed_ = false; |
| |
| /// The number of times the GcFunctions were called. |
| IntCounter* num_gcs_metric_; |
| |
| /// The number of bytes freed by the last round of calling the GcFunctions (-1 before any |
| /// GCs are performed). |
| IntGauge* bytes_freed_by_last_gc_metric_; |
| |
| /// The number of bytes over the limit we were the last time LimitExceeded() was called |
| /// and the limit was exceeded pre-GC. -1 if there is no limit or the limit was never |
| /// exceeded. |
| IntGauge* bytes_over_limit_metric_; |
| |
| /// Metric for limit_. |
| IntGauge* limit_metric_; |
| }; |
| |
| /// Global registry for query and pool MemTrackers. Owned by ExecEnv. |
| class PoolMemTrackerRegistry { |
| public: |
| /// Returns a MemTracker object for request pool 'pool_name'. Calling this with the same |
| /// 'pool_name' will return the same MemTracker object. This is used to track the local |
| /// memory usage of all requests executing in this pool. If 'create_if_not_present' is |
| /// true, the first time this is called for a pool, a new MemTracker object is created |
| /// with the process tracker as its parent. There is no explicit per-pool byte_limit |
| /// set at any particular impalad, so newly created trackers will always have a limit |
| /// of -1. |
| MemTracker* GetRequestPoolMemTracker( |
| const std::string& pool_name, bool create_if_not_present); |
| |
| private: |
| /// All per-request pool MemTracker objects. It is assumed that request pools will live |
| /// for the entire duration of the process lifetime so MemTrackers are never removed |
| /// from this map. Protected by 'pool_to_mem_trackers_lock_' |
| typedef boost::unordered_map<std::string, std::unique_ptr<MemTracker>> PoolTrackersMap; |
| PoolTrackersMap pool_to_mem_trackers_; |
| /// IMPALA-3068: Use SpinLock instead of boost::mutex so that the lock won't |
| /// automatically destroy itself as part of process teardown, which could cause races. |
| SpinLock pool_to_mem_trackers_lock_; |
| }; |
| } |
| |
| #endif |