/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*!
* \file thread_pool.cc
* \brief Threadpool for multi-threading runtime.
*/
#include <dmlc/thread_local.h>
#include <tvm/runtime/c_backend_api.h>
#include <tvm/runtime/c_runtime_api.h>
#include <tvm/runtime/container/array.h>
#include <tvm/runtime/logging.h>
#include <tvm/runtime/packed_func.h>
#include <tvm/runtime/registry.h>
#include <tvm/runtime/threading_backend.h>
#if TVM_THREADPOOL_USE_OPENMP
#include <omp.h>
#endif
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstring>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include "../support/utils.h"
constexpr int kL1CacheBytes = 64;
namespace tvm {
namespace runtime {
namespace {
using support::IsNumber;
constexpr uint32_t kDefaultSpinCount = 300000;
uint32_t GetSpinCount() {
const char* val = getenv("TVM_THREAD_POOL_SPIN_COUNT");
if (!val) {
return kDefaultSpinCount;
}
return atoi(val);
}
} // namespace
// Stride within the sync counter page, sized so that each counter occupies its own cache line.
constexpr int kSyncStride = 64 / sizeof(std::atomic<int>);
/*!
* \brief Thread-local environment used to launch and coordinate a group of parallel tasks.
*/
class ParallelLauncher {
public:
// Reset the task request.
void Init(FTVMParallelLambda flambda, void* cdata, int num_task, bool need_sync) {
num_pending_.store(num_task);
this->cdata = cdata;
this->flambda = flambda;
this->env.num_task = num_task;
has_error_.store(false);
// Grow the per-task error buffer and, if syncing, the sync counter page when more tasks are requested than before.
if (static_cast<size_t>(num_task) > par_errors_.size()) {
par_errors_.resize(num_task + 1);
if (need_sync) {
delete[] sync_counter_;
sync_counter_ = new std::atomic<int>[num_task * kSyncStride];
}
}
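// Each task owns one counter, spaced kSyncStride elements apart so that counters sit on separate cache lines.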
if (need_sync) {
for (int i = 0; i < num_task; ++i) {
sync_counter_[i * kSyncStride].store(0, std::memory_order_relaxed);
}
this->env.sync_handle = sync_counter_;
} else {
this->env.sync_handle = nullptr;
}
}
~ParallelLauncher() { delete[] sync_counter_; }
// Wait for all pending jobs to finish.
int WaitForJobs() {
while (num_pending_.load() != 0) {
tvm::runtime::threading::Yield();
}
if (!has_error_.load()) return 0;
std::ostringstream os;
for (size_t i = 0; i < par_errors_.size(); ++i) {
if (par_errors_[i].length() != 0) {
os << "Task " << i << " error: " << par_errors_[i] << '\n';
par_errors_[i].clear();
}
}
TVMAPISetLastError(os.str().c_str());
return -1;
}
// Signal that one job has encountered an error.
void SignalJobError(int task_id) {
num_pending_.fetch_sub(1);
par_errors_[task_id] = TVMGetLastError();
has_error_.store(true);
}
// Signal that one job has finished.
void SignalJobFinish() { num_pending_.fetch_sub(1); }
// Get thread local version of the store.
static ParallelLauncher* ThreadLocal() { return dmlc::ThreadLocalStore<ParallelLauncher>::Get(); }
// The parallel lambda
FTVMParallelLambda flambda;
// The closure data
void* cdata;
// Local env
TVMParallelGroupEnv env;
// Whether this thread is a worker of the pool;
// used to prevent recursive launches.
bool is_worker{false};
private:
// The pending jobs.
std::atomic<int32_t> num_pending_;
// Whether an error has been encountered.
std::atomic<bool> has_error_;
// The counter page.
std::atomic<int32_t>* sync_counter_{nullptr};
// The per-task error messages.
std::vector<std::string> par_errors_;
};
/*! \brief Lock-free single-producer-single-consumer queue for each thread */
class SpscTaskQueue {
public:
/*! \brief The task entry */
struct Task {
ParallelLauncher* launcher;
int32_t task_id;
};
SpscTaskQueue() : buffer_(new Task[kRingSize]), head_(0), tail_(0) {}
~SpscTaskQueue() { delete[] buffer_; }
/*!
* \brief Push a task into the queue and notify the consumer if it is waiting.
* \param input The task to be enqueued.
*/
void Push(const Task& input) {
while (!Enqueue(input)) {
tvm::runtime::threading::Yield();
}
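// pending_ == -1 means the consumer went to sleep in Pop(); if fetch_add observes -1 we
// must wake it up now that the task has been published.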
if (pending_.fetch_add(1) == -1) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.notify_one();
}
}
/*!
* \brief Pop a task out of the queue; wait on the condition variable if the queue is empty.
* \param output The pointer to the task to be dequeued.
* \param spin_count The number of iterations to spin before sleep.
* \return Whether pop is successful (true) or we need to exit now (false).
*/
bool Pop(Task* output, uint32_t spin_count) {
// Busy-wait for a while when the queue is empty.
// If a new task arrives quickly, this keeps the worker from going to sleep.
// The default spin count follows the typical OpenMP convention.
for (uint32_t i = 0; i < spin_count && pending_.load() == 0; ++i) {
tvm::runtime::threading::Yield();
}
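// Claim one task by decrementing pending_. If it was 0 the queue is empty: pending_ drops to
// -1 (signalling to Push() that the consumer is asleep) and we block until a task arrives or
// termination is requested.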
if (pending_.fetch_sub(1) == 0) {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return pending_.load() >= 0 || exit_now_.load(); });
}
if (exit_now_.load(std::memory_order_relaxed)) {
return false;
}
const uint32_t head = head_.load(std::memory_order_relaxed);
// sanity check that the queue is not empty
ICHECK(tail_.load(std::memory_order_acquire) != head);
*output = buffer_[head];
head_.store((head + 1) % kRingSize, std::memory_order_release);
return true;
}
/*!
* \brief Signal to terminate the worker.
*/
void SignalForKill() {
std::lock_guard<std::mutex> lock(mutex_);
exit_now_.store(true);
cv_.notify_all();
}
protected:
/*!
* \brief Lock-free enqueue.
* \param input The task to be enqueued.
* \return Whether the task is enqueued.
*/
bool Enqueue(const Task& input) {
if (exit_now_.load(std::memory_order_relaxed)) return false;
const uint32_t tail = tail_.load(std::memory_order_relaxed);
if ((tail + 1) % kRingSize != (head_.load(std::memory_order_acquire))) {
buffer_[tail] = input;
tail_.store((tail + 1) % kRingSize, std::memory_order_release);
return true;
}
return false;
}
// the cache line paddings are used to avoid false sharing between atomic variables
typedef char cache_line_pad_t[kL1CacheBytes];
cache_line_pad_t pad0_;
// Size of the ring buffer; the queue can hold at most kRingSize - 1 items.
// Defined as a constant for better compiler optimization.
static constexpr const int kRingSize = 2;
// pointer to access the item
Task* const buffer_;
cache_line_pad_t pad1_;
// queue head, where one gets a task from the queue
std::atomic<uint32_t> head_;
cache_line_pad_t pad2_;
// queue tail, where one puts a task into the queue
std::atomic<uint32_t> tail_;
cache_line_pad_t pad3_;
// number of pending tasks in the queue; -1 means the consumer is sleeping on cv_
std::atomic<int8_t> pending_{0};
cache_line_pad_t pad4_;
// signal for exit now
std::atomic<bool> exit_now_{false};
// internal mutex
std::mutex mutex_;
// cv for consumer
std::condition_variable cv_;
};
// The thread pool
class ThreadPool {
public:
ThreadPool() : num_workers_(tvm::runtime::threading::MaxConcurrency()) {
const char* exclude_worker0 = getenv("TVM_EXCLUDE_WORKER0");
if (exclude_worker0 && atoi(exclude_worker0) == 0) {
exclude_worker0_ = false;
}
Init();
}
~ThreadPool() {
for (std::unique_ptr<SpscTaskQueue>& q : queues_) {
q->SignalForKill();
}
threads_.reset();
}
void Reset() {
for (std::unique_ptr<SpscTaskQueue>& q : queues_) {
q->SignalForKill();
}
// Destroy the threads before we destroy the shared queues, otherwise we segfault on macOS
threads_.reset();
queues_.clear();
Init();
}
int Launch(FTVMParallelLambda flambda, void* cdata, int num_task, int need_sync) {
ParallelLauncher* launcher = ParallelLauncher::ThreadLocal();
ICHECK(!launcher->is_worker)
<< "Cannot launch parallel job inside worker, consider fuse then parallel";
if (num_task == 0) {
num_task = num_workers_used_;
}
if (need_sync != 0) {
ICHECK_LE(num_task, num_workers_used_)
<< "Request parallel sync task larger than number of threads used "
<< " workers=" << num_workers_used_ << " request=" << num_task;
}
launcher->Init(flambda, cdata, num_task, need_sync != 0);
SpscTaskQueue::Task tsk;
tsk.launcher = launcher;
// if task 0 is run by the main thread, queues_[0] is left unused
for (int i = exclude_worker0_; i < num_task; ++i) {
tsk.task_id = i;
queues_[i]->Push(tsk);
}
// use the main thread to run task 0
if (exclude_worker0_) {
TVMParallelGroupEnv* penv = &(tsk.launcher->env);
if ((*tsk.launcher->flambda)(0, penv, cdata) == 0) {
tsk.launcher->SignalJobFinish();
} else {
tsk.launcher->SignalJobError(tsk.task_id);
}
}
int res = launcher->WaitForJobs();
return res;
}
static ThreadPool* ThreadLocal() { return dmlc::ThreadLocalStore<ThreadPool>::Get(); }
void UpdateWorkerConfiguration(threading::ThreadGroup::AffinityMode mode, int nthreads,
const std::vector<unsigned int>& cpus) {
// This will also reset the affinity of the ThreadGroup;
// it may use fewer than MaxConcurrency workers.
num_workers_used_ = threads_->Configure(mode, nthreads, exclude_worker0_, cpus);
// if MaxConcurrency restricted the number of workers (e.g., due to
// hyperthreading), respect the restriction
num_workers_used_ = std::min(num_workers_, num_workers_used_);
}
int32_t NumThreads() const { return num_workers_used_; }
private:
// Shared initialization code
void Init() {
for (int i = 0; i < num_workers_; ++i) {
// The SpscTaskQueue only hosts ONE item at a time
queues_.emplace_back(std::make_unique<SpscTaskQueue>());
}
threads_ = std::make_unique<tvm::runtime::threading::ThreadGroup>(
num_workers_, [this](int worker_id) { this->RunWorker(worker_id); },
exclude_worker0_ /* include_main_thread */);
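// Apply the default big-core-first affinity; Configure returns how many workers will actually be used.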
num_workers_used_ = threads_->Configure(threading::ThreadGroup::kBig, 0, exclude_worker0_);
}
// Internal worker function.
void RunWorker(int worker_id) {
SpscTaskQueue* queue = queues_[worker_id].get();
SpscTaskQueue::Task task;
ParallelLauncher::ThreadLocal()->is_worker = true;
// Initialize the spin count (from envvar TVM_THREAD_POOL_SPIN_COUNT) on
// the global first use of the ThreadPool.
// TODO(tulloch): should we make this configurable via standard APIs?
static size_t spin_count = GetSpinCount();
while (queue->Pop(&task, spin_count)) {
ICHECK(task.launcher != nullptr);
TVMParallelGroupEnv* penv = &(task.launcher->env);
void* cdata = task.launcher->cdata;
if ((*task.launcher->flambda)(task.task_id, penv, cdata) == 0) {
task.launcher->SignalJobFinish();
} else {
task.launcher->SignalJobError(task.task_id);
}
}
}
int num_workers_;
// number of workers used (can be restricted with affinity pref)
int num_workers_used_;
// whether to exclude worker 0 and use the main thread to run task 0
bool exclude_worker0_{true};
std::vector<std::unique_ptr<SpscTaskQueue> > queues_;
std::unique_ptr<tvm::runtime::threading::ThreadGroup> threads_;
};
/*!
* \brief args[0] is the AffinityMode, args[1] is the number of threads.
* args[2] is an optional list of CPUs used to set the CPU affinity.
*/
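// A minimal usage sketch (illustrative only; the thread count of 4 is an arbitrary example):
//   const PackedFunc* f = Registry::Get("runtime.config_threadpool");
//   ICHECK(f != nullptr);
//   (*f)(static_cast<int>(threading::ThreadGroup::kBig), /*nthreads=*/4);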
TVM_REGISTER_GLOBAL("runtime.config_threadpool").set_body([](TVMArgs args, TVMRetValue* rv) {
threading::ThreadGroup::AffinityMode mode =
static_cast<threading::ThreadGroup::AffinityMode>(static_cast<int>(args[0]));
int nthreads = args[1];
std::vector<unsigned int> cpus;
if (args.num_args >= 3) {
Array<String> cpu_array = args[2];
for (auto cpu : cpu_array) {
ICHECK(IsNumber(cpu)) << "The CPU core information '" << cpu << "' is not a number.";
cpus.push_back(std::stoi(cpu));
}
}
threading::Configure(mode, nthreads, cpus);
});
TVM_REGISTER_GLOBAL("runtime.NumThreads").set_body_typed([]() -> int32_t {
return threading::NumThreads();
});
namespace threading {
#if TVM_THREADPOOL_USE_OPENMP
/*!
* \brief Helper function that pins threads to cores for multi-instance execution
* when the OpenMP thread pool is used.
*
* \param mode Affinity mode (now supports only kSpecifyOneCorePerThread and
* kSpecifyThreadShareAllCore).
* \param nthreads The number of threads to use (0 = use all).
* \param cpus A list of CPU ids to set 'cpu affinity'.
*
*/
static void ConfigureOMP(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
const std::vector<unsigned int>& cpus) {
#if defined(__linux__) || defined(__ANDROID__)
const int num_workers = MaxConcurrency();
if (mode == ThreadGroup::kSpecifyOneCorePerThread) {
#pragma omp parallel num_threads(num_workers)
{
int core_id = cpus[omp_get_thread_num()];
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core_id, &cpuset);
#if defined(__ANDROID__)
sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset);
#else
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
#endif
}
} else if (mode == ThreadGroup::kSpecifyThreadShareAllCore) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
for (auto id : cpus) {
CPU_SET(id, &cpuset);
}
#pragma omp parallel num_threads(num_workers)
{
#if defined(__ANDROID__)
sched_setaffinity(pthread_self(), sizeof(cpu_set_t), &cpuset);
#else
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
#endif
}
}
#endif
}
#endif
void ResetThreadPool() { tvm::runtime::ThreadPool::ThreadLocal()->Reset(); }
/*!
* \brief Configure the CPU affinity of the worker threads.
* \param mode The preferred CPU type (1 = big, -1 = little, -2 = kSpecifyOneCorePerThread,
* -3 = kSpecifyThreadShareAllCore).
* \param nthreads The number of threads to use (0 = use all).
* \param cpus A list of CPUs used to set the 'cpu affinity' for the worker threads.
*
*/
TVM_DLL void Configure(tvm::runtime::threading::ThreadGroup::AffinityMode mode, int nthreads,
std::vector<unsigned int> cpus) {
tvm::runtime::threading::SetMaxConcurrency(cpus.size());
#if !TVM_THREADPOOL_USE_OPENMP
tvm::runtime::ThreadPool::ThreadLocal()->UpdateWorkerConfiguration(mode, nthreads, cpus);
#else
ConfigureOMP(mode, nthreads, cpus);
#endif
}
int32_t NumThreads() { return tvm::runtime::ThreadPool::ThreadLocal()->NumThreads(); }
} // namespace threading
} // namespace runtime
} // namespace tvm
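// A minimal sketch (for illustration; ExampleLambda is a hypothetical helper) of how generated
// code drives the backend entry points below: a parallel lambda is launched across tasks and
// may synchronize at a barrier.
//
//   static int ExampleLambda(int task_id, TVMParallelGroupEnv* penv, void* cdata) {
//     // ... process the task_id-th slice of the work described by cdata ...
//     TVMBackendParallelBarrier(task_id, penv);  // optional synchronization point
//     return 0;  // a non-zero return is reported as a task error
//   }
//   // num_task = 0 lets the runtime pick the number of workers.
//   TVMBackendParallelLaunch(ExampleLambda, /*cdata=*/nullptr, /*num_task=*/0);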
int TVMBackendParallelLaunch(FTVMParallelLambda flambda, void* cdata, int num_task) {
int num_workers = tvm::runtime::threading::MaxConcurrency();
if (num_workers == 1) {
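// Only one worker is available: run the whole job inline on the calling thread as a single task.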
std::atomic<int32_t> sync_counter{0};
TVMParallelGroupEnv env;
env.num_task = 1;
env.sync_handle = &sync_counter;
(*flambda)(0, &env, cdata);
return 0;
} else {
#if !TVM_THREADPOOL_USE_OPENMP
int res = tvm::runtime::ThreadPool::ThreadLocal()->Launch(flambda, cdata, num_task, 1);
return res;
#else
if (num_task == 0) num_task = num_workers;
omp_set_num_threads(num_task);
#pragma omp parallel num_threads(num_task)
{
TVMParallelGroupEnv env;
env.num_task = num_task;
(*flambda)(omp_get_thread_num(), &env, cdata);
}
return 0;
#endif
}
}
int TVMBackendParallelBarrier(int task_id, TVMParallelGroupEnv* penv) {
#if TVM_THREADPOOL_USE_OPENMP
#pragma omp barrier
#else
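// Counter-based barrier: each task bumps its own counter with release semantics, then spins
// until every other task's counter exceeds the value its own counter held before the bump,
// i.e. until all tasks have reached this barrier instance.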
using tvm::runtime::kSyncStride;
int num_task = penv->num_task;
std::atomic<int>* sync_counter = reinterpret_cast<std::atomic<int>*>(penv->sync_handle);
int old_counter = sync_counter[task_id * kSyncStride].fetch_add(1, std::memory_order_release);
for (int i = 0; i < num_task; ++i) {
if (i != task_id) {
while (sync_counter[i * kSyncStride].load(std::memory_order_relaxed) <= old_counter) {
tvm::runtime::threading::Yield();
}
}
}
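// The acquire fence pairs with the release increments above, making writes performed by other
// tasks before they entered the barrier visible to this task.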
std::atomic_thread_fence(std::memory_order_acquire);
#endif
return 0;
}