blob: 7e63ae12dbb540bbf40b24abeec59bee314e91d2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_RUNTIME_THREAD_RESOURCE_MGR_H
#define IMPALA_RUNTIME_THREAD_RESOURCE_MGR_H
#include <stdlib.h>
#include <algorithm>
#include <limits>
#include <list>
#include <memory>
#include <set>
#include <vector>
#include <boost/function.hpp>
#include <boost/thread/mutex.hpp>
#include "common/atomic.h"
#include "common/status.h"
namespace impala {
/// Singleton object to manage CPU (aka thread) resources for the process.
/// Implements a soft limit on the total number of threads being used across running
/// fragment instances. If there is only one fragment instance running, it can use the
/// entire pool, spinning up the maximum number of threads to saturate the
/// hardware. If there are multiple fragment instances, we try to share evenly
/// between them. Currently, the total system pool is split evenly between
/// all consumers. Each consumer gets ceil(total_system_threads / num_consumers).
///
/// Each fragment instance must register with the ThreadResourceMgr to request threads
/// (in the form of tokens). The fragment instance has required threads (it can't run
/// with fewer threads) and optional threads. If the fragment instance is running on its
/// own, it will be able to spin up more optional threads. When the system is under load,
/// the ThreadResourceMgr will stop giving out tokens for optional threads.
///
/// ThreadResourcePools should not be used for threads that are almost always idle (e.g.
/// periodic reporting threads).
/// ThreadResourcePools will temporarily go over the quota regularly and this is very
/// much by design. For example, if a fragment instance is running on its own with
/// 4 required threads and 28 optional and another fragment instance starts, the first
/// pool's quota is then cut by half (16 total) and will over time drop the optional
/// threads.
///
/// This class is thread safe.
///
/// Note: this is a fairly limited way to manage CPU consumption and has flaws, including:
/// * non-deterministic decisions about resource allocation
/// * lack of integration with admission control
/// * lack of any non-trivial policies such as hierarchical limits or priorities.
class ThreadResourcePool;
class ThreadResourceMgr {
 public:
  /// Create a thread mgr object. If threads_quota is non-zero, it will be
  /// the number of threads for the system, otherwise it will be determined
  /// based on the hardware.
  ThreadResourceMgr(int threads_quota = 0);

  /// Returns the 'optimal' number of threads for the entire process.
  int system_threads_quota() const { return system_threads_quota_; }

  /// Create a new pool and register with the thread mgr. Registering a pool
  /// will update the quotas for all existing pools.
  /// Every pool returned here must eventually be handed back to DestroyPool():
  /// ~ThreadResourcePool() DCHECKs that the pool has been unregistered, so simply
  /// dropping the returned pointer is a bug.
  std::unique_ptr<ThreadResourcePool> CreatePool();

  /// Destroy the pool and unregister with the thread mgr. This updates the quotas for
  /// the remaining pools.
  void DestroyPool(std::unique_ptr<ThreadResourcePool> pool);

 private:
  friend class ThreadResourcePool;

  /// 'Optimal' number of threads for the entire process.
  int system_threads_quota_;

  /// Lock for the entire object. Protects all fields below (see 'per_pool_quota_' for
  /// the one lock-free reader). Must be acquired before ThreadResourcePool::lock_ if
  /// both are held at the same time.
  boost::mutex lock_;

  /// Set of pools currently registered with this manager.
  using Pools = std::set<ThreadResourcePool*>;

  /// Pools currently being managed.
  Pools pools_;

  /// Each pool currently gets the same share. This is the ceil of the
  /// system quota divided by the number of pools.
  /// Updated with 'lock_' held (by UpdatePoolQuotas()), but it is atomic because
  /// ThreadResourcePool::quota() reads it without taking 'lock_'.
  AtomicInt32 per_pool_quota_{0};

  /// Updates the per pool quota and notifies any pools that now have
  /// more threads they can use. Must be called with lock_ taken.
  /// If new_pool is non-null, new_pool will *not* be notified.
  void UpdatePoolQuotas(ThreadResourcePool* new_pool = nullptr);
};
/// Pool abstraction for a single resource pool.
/// Note: there is no concept of hierarchy - all pools are treated equally even if
/// they belong to the same query.
class ThreadResourcePool {
 public:
  /// This function will be called whenever the pool has more threads it can run on.
  /// This can happen on ReleaseThreadToken or if the quota for this pool increases.
  /// This is a good place, for example, to wake up anything blocked on available threads.
  /// This callback must not block.
  /// Note that this is not called once for each available thread or even guaranteed that
  /// when it is called, a thread is available (the quota could have changed again in
  /// between). It is simply that something might have happened (similar to condition
  /// variable semantics).
  using ThreadAvailableCb = boost::function<void(ThreadResourcePool*)>;

  /// The pool must already have been unregistered (via ThreadResourceMgr::DestroyPool())
  /// before it is destroyed.
  ~ThreadResourcePool() { DCHECK(parent_ == nullptr) << "Must unregister pool"; }

  /// Acquire a thread for the pool. This will always succeed; the pool will go over the
  /// quota if needed. Pools should use this API to reserve threads they need in order to
  /// make progress. DCHECKs if the number of required threads would exceed the limit set
  /// by set_max_required_threads().
  void AcquireThreadToken() {
    const int64_t new_packed = num_threads_.Add(1);
    const int64_t num_required = RequiredThreads(new_packed);
    DCHECK_LE(num_required, max_required_threads_);
  }

  /// Try to acquire a thread for this pool. If the pool is at the quota, this will
  /// return false and the pool should not run. Pools should use this API for resources
  /// they can use but don't need (e.g. extra scanner threads).
  bool TryAcquireThreadToken();

  /// Release a thread for the pool. This must be called once for each call to
  /// AcquireThreadToken() and each successful call to TryAcquireThreadToken().
  /// If the thread token is from AcquireThreadToken(), required must be true; false
  /// if from TryAcquireThreadToken().
  /// If 'skip_callbacks' is true, ReleaseThreadToken() will not run callbacks to find
  /// a replacement for this thread. This is dangerous and can lead to underutilization
  /// of the system.
  void ReleaseThreadToken(bool required, bool skip_callbacks = false);

  /// Register a callback to be notified when a thread is available.
  /// Returns a unique id to be used when removing the callback.
  /// Note: this is limited because we can't coordinate between multiple places in
  /// execution that could use extra threads (e.g. do we use that thread for a
  /// scanner or for a join).
  int AddThreadAvailableCb(ThreadAvailableCb fn);

  /// Unregister the callback corresponding to 'id'.
  void RemoveThreadAvailableCb(int id);

  /// Returns the number of threads that are from AcquireThreadToken.
  int num_required_threads() const {
    return static_cast<int>(RequiredThreads(num_threads_.Load()));
  }

  /// Returns the number of thread resources returned by successful calls
  /// to TryAcquireThreadToken.
  int num_optional_threads() const {
    return static_cast<int>(OptionalThreads(num_threads_.Load()));
  }

  /// Returns the total number of thread resources for this pool
  /// (i.e. num_optional_threads + num_required_threads).
  int64_t num_threads() const {
    // Load once so both halves come from the same snapshot. Calling the two accessors
    // above would load twice and could mix counts from before and after a concurrent
    // acquire/release, producing a total that never existed.
    const int64_t packed = num_threads_.Load();
    return RequiredThreads(packed) + OptionalThreads(packed);
  }

  /// Returns true if the total number of threads (required + optional) now exceeds
  /// quota(). Note this is also true if the required threads alone exceed the quota.
  bool optional_exceeded() const { return num_threads() > quota(); }

  /// Returns the number of optional threads that can still be used, i.e. how far the
  /// pool currently is below its quota (never negative).
  int num_available_threads() const {
    return std::max(0, quota() - static_cast<int>(num_threads()));
  }

  /// Returns the quota for this pool. Note this changes dynamically based on the global
  /// number of registered resource pools. Must only be called while the pool is
  /// registered, since it reads through 'parent_' (which is cleared on unregister).
  int quota() const {
    DCHECK(parent_ != nullptr);
    return parent_->per_pool_quota_.Load();
  }

  /// Set the maximum number of required threads that will be running at one time.
  /// The caller should not create more required threads than this, otherwise this
  /// will DCHECK. Not thread-safe.
  void set_max_required_threads(int max_required_threads) {
    max_required_threads_ = max_required_threads;
  }

 private:
  friend class ThreadResourceMgr;

  /// Mask to extract required threads from 'num_threads_'.
  static constexpr int64_t REQUIRED_MASK = 0xFFFFFFFF;

  /// Shift to extract optional threads from 'num_threads_'.
  static constexpr int OPTIONAL_SHIFT = 32;

  /// Pools are only constructed by the friend ThreadResourceMgr.
  explicit ThreadResourcePool(ThreadResourceMgr* parent);

  /// Returns the required-thread count (lower 32 bits) of a 'num_threads_' value.
  static int64_t RequiredThreads(int64_t packed) { return packed & REQUIRED_MASK; }

  /// Returns the optional-thread count (upper 32 bits) of a 'num_threads_' value.
  static int64_t OptionalThreads(int64_t packed) { return packed >> OPTIONAL_SHIFT; }

  /// Invoke registered callbacks in round-robin manner until the quota is exhausted.
  void InvokeCallbacks();

  /// The parent resource manager. Set to nullptr when unregistered.
  ThreadResourceMgr* parent_;

  /// Maximum number of required threads that should be running at one time. DCHECKs
  /// if this is exceeded.
  int64_t max_required_threads_ = std::numeric_limits<int32_t>::max();

  /// A single 64 bit value to store both the number of optional and required threads.
  /// This is combined to allow atomic compare-and-swap of both fields. The number of
  /// required threads is the lower 32 bits and the number of optional threads is the
  /// upper 32 bits.
  AtomicInt64 num_threads_{0};

  /// Lock for the fields below. This lock is taken when the callback function is called.
  /// Must be acquired after ThreadResourceMgr::lock_ if both are held at the same time.
  boost::mutex lock_;

  /// A vector of registered callback functions. Entries will be set to "empty" function
  /// objects, which can be constructed with the default ThreadAvailableCb() constructor,
  /// when the function is unregistered.
  std::vector<ThreadAvailableCb> thread_callbacks_;

  /// The number of registered callbacks (i.e. the number of non-empty entries in
  /// 'thread_callbacks_'). Must hold 'lock_' to write, but can read without holding
  /// 'lock_'.
  AtomicInt32 num_callbacks_{0};

  /// The index of the next callback to invoke in 'thread_callbacks_'. Protected by
  /// 'lock_'.
  int next_callback_idx_ = 0;
};
} // namespace impala
#endif