/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*!
* \file threading_backend.cc
* \brief Native threading backend
*/
#include <tvm/runtime/logging.h>
#include <tvm/runtime/threading_backend.h>
#if defined(__linux__) || defined(__ANDROID__)
#if __ANDROID_API__ >= 21
#include <pthread.h>
#endif
#include <fstream>
#include <sstream>
#endif
#if defined(__linux__)
#include <sched.h>
#endif
#if defined(__hexagon__)
extern "C" {
#include <qurt_hvx.h>
}
#include <dlfcn.h>
#include <qurt.h>
#include <stdlib.h>
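// Stack size and alignment for the worker-thread stacks allocated in
// QuRTThread below.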
#define HEXAGON_STACK_SIZE 65536
#define HEXAGON_STACK_ALIGNMENT 32
#endif
#include <algorithm>
#include <thread>
#define CURRENT_THREAD_HANDLE (static_cast<std::thread::native_handle_type>(0))
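// Sentinel handle meaning "the calling thread"; SetThreadAffinity translates
// it to pthread_self() on Linux and Android before applying the CPU mask.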
namespace tvm {
namespace runtime {
namespace threading {
#ifdef __hexagon__
// pthreads are broken on older versions of QuRT, so
// we need to use the native APIs instead of std::thread
class QuRTThread {
typedef std::function<void()> Callback;
public:
explicit QuRTThread(Callback worker_callback) : worker_callback_(worker_callback) {
static int id = 1;
qurt_thread_attr_t attr;
char name[32];
int ret = posix_memalign(&stack_, HEXAGON_STACK_ALIGNMENT, HEXAGON_STACK_SIZE);
CHECK_EQ(ret, 0);
    // A std::function converts to true only when it stores a callable
    // target, so reject an empty callback up front.
    CHECK(static_cast<bool>(worker_callback_));
qurt_thread_attr_init(&attr);
qurt_thread_attr_set_stack_size(&attr, HEXAGON_STACK_SIZE);
qurt_thread_attr_set_stack_addr(&attr, stack_);
snprintf(name, sizeof(name), "worker %d", id++);
qurt_thread_attr_set_name(&attr, name);
ret = qurt_thread_create(&thread_, &attr, (void (*)(void*))RunFunction, this);
CHECK_EQ(ret, QURT_EOK);
}
QuRTThread(QuRTThread&& other)
: thread_(other.thread_),
worker_callback_(std::move(other.worker_callback_)),
stack_(other.stack_) {
other.thread_ = 0;
other.stack_ = nullptr;
}
~QuRTThread() {
if (thread_) {
join();
}
if (stack_) {
free(stack_);
}
}
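  // A thread must not join itself: report this thread as joinable only when
  // the caller is a different thread.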
bool joinable() const { return qurt_thread_get_id() != thread_; }
void join() {
int status;
qurt_thread_join(thread_, &status);
}
private:
static void RunFunction(QuRTThread* qrt_thread) {
qrt_thread->worker_callback_();
qurt_thread_exit(QURT_EOK);
}
qurt_thread_t thread_;
Callback worker_callback_;
void* stack_ = nullptr;
};
#endif // __hexagon__
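// Thread-local override for MaxConcurrency(); 0 means "not set", in which
// case environment variables and hardware detection are consulted instead.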
thread_local int max_concurrency = 0;
class ThreadGroup::Impl {
public:
Impl(int num_workers, std::function<void(int)> worker_callback, bool exclude_worker0)
: num_workers_(num_workers) {
ICHECK_GE(num_workers, 1) << "Requested a non-positive number of worker threads.";
for (int i = exclude_worker0; i < num_workers_; ++i) {
threads_.emplace_back([worker_callback, i] { worker_callback(i); });
}
InitSortedOrder();
}
~Impl() { Join(); }
void Join() {
for (auto& t : threads_) {
if (t.joinable()) t.join();
}
}
int Configure(AffinityMode mode, int nthreads, bool exclude_worker0,
std::vector<unsigned int> cpus) {
int num_workers_used = 0;
switch (mode) {
case kLittle:
num_workers_used = little_count_;
break;
case kBig:
num_workers_used = big_count_;
break;
case kSpecifyOneCorePerThread:
case kSpecifyThreadShareAllCore:
num_workers_used = cpus.size();
sorted_order_ = cpus;
break;
default:
// use default
num_workers_used = threading::MaxConcurrency();
}
// if a specific number was given, use that
if (nthreads) {
num_workers_used = nthreads;
}
// if MaxConcurrency restricted the number of workers (e.g., due to
// hyperthreading), respect the restriction. On CPUs with N logical cores
// and N/2 physical cores this will set affinity to the first N/2 logical
// ones.
num_workers_used = std::min(num_workers_, num_workers_used);
SetAffinity(exclude_worker0, mode);
return num_workers_used;
}
private:
void SetThreadAffinity(std::thread::native_handle_type thread,
const std::vector<unsigned int>& ids) {
#if defined(__linux__) || defined(__ANDROID__)
if (pthread_equal(thread, CURRENT_THREAD_HANDLE)) {
thread = pthread_self();
}
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
for (auto id : ids) {
CPU_SET(id, &cpuset);
}
#if defined(__ANDROID__)
#if __ANDROID_API__ >= 21
pid_t tid = pthread_gettid_np(thread);
#else
typedef struct {
void* next;
void* pred;
pid_t tid;
} pthread_internal;
pid_t tid = reinterpret_cast<pthread_internal*>(thread)->tid;
#endif
if (sched_setaffinity(tid, sizeof(cpu_set_t), &cpuset) != 0) {
LOG(WARNING) << "sched_setaffinity failed";
}
#else
pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
#endif
#endif
}
  // Bind the worker threads to disjoint cores.
  // If worker 0 is offloaded to the main thread (i.e. exclude_worker0 is
  // true), the main thread is given affinity over the selected cores instead
  // of being pinned to a single one.
void SetAffinity(bool exclude_worker0, AffinityMode mode) {
#ifndef __hexagon__
const char* val = getenv("TVM_BIND_THREADS");
if (val != nullptr && atoi(val) != 1) {
return;
}
// Do not set affinity if there are more workers than found cores and mode is not kSpecify*.
if (sorted_order_.size() < static_cast<unsigned int>(num_workers_)) {
switch (mode) {
// When the mode is kSpecifyOneCorePerThread or kSpecifyThreadShareAllCore, we should
// let the threads share all the cpu cores.
case kSpecifyOneCorePerThread:
case kSpecifyThreadShareAllCore:
for (unsigned i = 0; i < threads_.size(); ++i) {
SetThreadFullCpuAffinity(threads_[i].native_handle(), mode);
}
          if (exclude_worker0) {  // the main thread runs tasks as worker 0
SetMainThreadFullCpuAffinity(mode);
}
break;
case kLittle:
case kBig:
default:
LOG(WARNING) << "The thread affinity cannot be set when the number of workers"
<< "is larger than the number of available cores in the system.";
break;
}
} else {
ICHECK_GE(sorted_order_.size(), num_workers_);
switch (mode) {
case kSpecifyThreadShareAllCore:
for (unsigned i = 0; i < threads_.size(); ++i) {
SetThreadFullCpuAffinity(threads_[i].native_handle(), mode);
}
break;
case kLittle:
case kBig:
case kSpecifyOneCorePerThread:
for (unsigned i = 0; i < threads_.size(); ++i) {
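            // kLittle assigns cores from the slow end of sorted_order_; the
            // other modes assign from the fast end. The exclude_worker0
            // offset leaves one core unassigned for the main thread, which
            // runs worker 0.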
bool reverse = mode == kLittle;
unsigned core_id;
if (reverse) {
core_id = sorted_order_[sorted_order_.size() - (i + exclude_worker0) - 1];
} else {
core_id = sorted_order_[i + exclude_worker0];
}
SetThreadAffinity(threads_[i].native_handle(), {core_id});
}
break;
}
      if (exclude_worker0) {  // the main thread runs tasks as worker 0
        // The main thread is left free to migrate across the selected cores.
        // Typically the OS schedules it onto core 0, which stays idle while
        // the other workers are running.
        // See the comment inside SetThreadFullCpuAffinity for more detail.
        SetMainThreadFullCpuAffinity(mode);
      }
}
#endif // __hexagon__
}
void SetThreadFullCpuAffinity(std::thread::native_handle_type thread, AffinityMode mode) {
    // Example: on a 2xA72 + 4xA53 system (CPU ids 0-5, where ids 4 and 5 are
    // the A72 big cores), suppose the config_threadpool API selects only the
    // four A53 cores. sorted_order_ will then be [4, 5, 0, 1, 2, 3].
    // By the time this is called, SetAffinity has already pinned the worker
    // threads to the little cores, so the TVM main thread should also run on
    // the little cores rather than the big cores (4 and 5).
    // Note: this also works on x86. Since x86 has no big.LITTLE cores, the
    // implementation defaults to kBig mode, which keeps the main thread on
    // the intended cores.
#ifndef __hexagon__
std::vector<unsigned> ids;
switch (mode) {
case kSpecifyOneCorePerThread:
case kSpecifyThreadShareAllCore:
for (size_t i = 0; i < sorted_order_.size(); ++i) {
ids.push_back(sorted_order_[i]);
}
break;
case kLittle:
for (int i = 0; i < little_count_; ++i) {
ids.push_back(sorted_order_[sorted_order_.size() - i - 1]);
}
break;
case kBig:
int num_cpu_workers = std::min(MaxConcurrency(), big_count_);
for (int i = 0; i < num_cpu_workers; ++i) {
ids.push_back(sorted_order_[i]);
}
break;
}
SetThreadAffinity(thread, ids);
#endif // __hexagon__
}
void SetMainThreadFullCpuAffinity(AffinityMode mode) {
SetThreadFullCpuAffinity(CURRENT_THREAD_HANDLE, mode);
}
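  // Sort CPU ids by maximum frequency in descending order (read from sysfs
  // scaling_max_freq on Linux/Android) and count how many cores share the
  // top frequency (big) and the bottom frequency (little).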
void InitSortedOrder() {
unsigned int threads = std::thread::hardware_concurrency();
#if defined(__hexagon__)
// With unsigned PDs, getting the number of available hardware threads
// is not supported in earlier versions of QuRT. In such cases assume 4.
if (threads == 0) threads = 4;
#endif
std::vector<std::pair<unsigned int, int64_t>> max_freqs;
for (unsigned int i = 0; i < threads; ++i) {
int64_t cur_freq = 0;
#if defined(__linux__) || defined(__ANDROID__)
std::ostringstream filepath;
filepath << "/sys/devices/system/cpu/cpu" << i << "/cpufreq/scaling_max_freq";
std::ifstream ifs(filepath.str());
if (!ifs.fail()) {
if (!(ifs >> cur_freq)) {
cur_freq = -1;
}
ifs.close();
}
#endif
max_freqs.push_back(std::make_pair(i, cur_freq));
}
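    // Order by maximum frequency, descending; break ties by ascending CPU id.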
auto fcmpbyfreq = [](const std::pair<unsigned int, int64_t>& a,
const std::pair<unsigned int, int64_t>& b) {
return a.second == b.second ? a.first < b.first : a.second > b.second;
};
std::sort(max_freqs.begin(), max_freqs.end(), fcmpbyfreq);
int64_t big_freq = max_freqs.begin()->second;
int64_t little_freq = max_freqs.rbegin()->second;
for (auto it = max_freqs.begin(); it != max_freqs.end(); it++) {
sorted_order_.push_back(it->first);
if (big_freq == it->second) {
big_count_++;
}
if (big_freq != little_freq && little_freq == it->second) {
little_count_++;
}
}
if (big_count_ + little_count_ != static_cast<int>(sorted_order_.size())) {
LOG(WARNING) << "more than two frequencies detected!";
}
}
int num_workers_;
#if defined(__hexagon__)
std::vector<QuRTThread> threads_;
#else
std::vector<std::thread> threads_;
#endif
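  // CPU ids ordered by descending maximum frequency (see InitSortedOrder),
  // followed by the counts of cores at the highest and lowest frequencies.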
std::vector<unsigned int> sorted_order_;
int big_count_ = 0;
int little_count_ = 0;
};
ThreadGroup::ThreadGroup(int num_workers, std::function<void(int)> worker_callback,
bool exclude_worker0)
: impl_(new ThreadGroup::Impl(num_workers, worker_callback, exclude_worker0)) {}
ThreadGroup::~ThreadGroup() { delete impl_; }
void ThreadGroup::Join() { impl_->Join(); }
int ThreadGroup::Configure(AffinityMode mode, int nthreads, bool exclude_worker0,
std::vector<unsigned int> cpus) {
return impl_->Configure(mode, nthreads, exclude_worker0, cpus);
}
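// Illustrative sketch of driving a ThreadGroup, assuming the AffinityMode
// enum declared in threading_backend.h; the callback body is hypothetical
// and stands in for a real per-worker task loop:
//
//   tvm::runtime::threading::ThreadGroup group(
//       /*num_workers=*/4,
//       [](int worker_id) { /* run the work assigned to worker_id */ },
//       /*exclude_worker0=*/true);  // worker 0 runs on the calling thread
//   group.Configure(tvm::runtime::threading::ThreadGroup::kBig,
//                   /*nthreads=*/0, /*exclude_worker0=*/true, /*cpus=*/{});
//   group.Join();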
void Yield() {
#ifdef __hexagon__
// QuRT doesn't have a yield API, so instead we sleep for the minimum amount
// of time to let the OS schedule another thread. std::this_thread::yield()
// compiles down to an empty function.
qurt_sleep(1);
#else
std::this_thread::yield();
#endif
}
/*!
 * \brief Set the maximum concurrency, i.e. the number of workers TVM may use.
 */
void SetMaxConcurrency(int value) {
if (value < 0) {
LOG(WARNING) << "The value of maximum concurrency '" << value << "' can not be negative "
<< "the setting of maximum concurrency is not success.";
return;
}
max_concurrency = value;
}
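/*!
 * \brief Get the maximum number of worker threads to use.
 *
 * Resolution order: the value passed to SetMaxConcurrency (if non-zero),
 * then the TVM_NUM_THREADS and OMP_NUM_THREADS environment variables, and
 * finally std::thread::hardware_concurrency() with per-platform adjustments.
 * The result is always at least 1.
 */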
int MaxConcurrency() {
int max_concurrency = 1;
if (tvm::runtime::threading::max_concurrency != 0) {
max_concurrency = tvm::runtime::threading::max_concurrency;
} else {
const char* val = getenv("TVM_NUM_THREADS");
if (val == nullptr) {
val = getenv("OMP_NUM_THREADS");
}
if (val != nullptr) {
max_concurrency = atoi(val);
} else {
max_concurrency = std::thread::hardware_concurrency();
#if defined(_M_X64) || defined(__x86_64__)
max_concurrency /= 2; // ignore hyper-threading
#elif defined(__hexagon__)
      // Ideally max_concurrency is set to the total number of 128B HVX
      // units available. This prevents threads that are unable to lock an
      // HVX unit from scheduling work on the scalar cores instead of HVX.
      int num_hvx128_contexts = (qurt_hvx_get_units() >> 8) & 0xFF;
      // With unsigned PDs, getting the number of available hardware threads
      // is not supported in earlier versions of QuRT. In such cases assume
      // the number of HVX units available. If running on the simulator, set
      // max_concurrency to 1.
if (max_concurrency == 0) {
if (dlsym(RTLD_DEFAULT, "running_in_sim_dev_17bc90206f6cf5a7")) {
max_concurrency = 1;
} else {
max_concurrency = num_hvx128_contexts;
}
} else {
        // If hardware_concurrency() already produced a non-zero value, make
        // sure it does not exceed the number of HVX units available.
max_concurrency = std::min(num_hvx128_contexts, max_concurrency);
}
#endif
}
}
return std::max(max_concurrency, 1);
}
} // namespace threading
} // namespace runtime
} // namespace tvm