blob: c2f7fade8bad66866e4a806dcd8168021472d686 [file] [log] [blame]
/** @file
A brief file description
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "P_EventSystem.h"
#include <sched.h>
#if TS_USE_HWLOC
#if HAVE_ALLOCA_H
#include <alloca.h>
#endif
#include <hwloc.h>
#endif
#include "tscore/ink_defs.h"
#include "tscore/hugepages.h"
/// Global singleton.
class EventProcessor eventProcessor;
class ThreadAffinityInitializer : public Continuation
{
using self = ThreadAffinityInitializer;
public:
/// Default construct.
ThreadAffinityInitializer() { SET_HANDLER(&self::set_affinity); }
/// Load up basic affinity data.
void init();
/// Set the affinity for the current thread.
int set_affinity(int, Event *);
/// Allocate a stack.
/// @internal This is the external entry point and is different depending on
/// whether HWLOC is enabled.
void *alloc_stack(EThread *t, size_t stacksize);
protected:
/// Allocate a hugepage stack.
/// If huge pages are not enable, allocate a basic stack.
void *alloc_hugepage_stack(size_t stacksize);
#if TS_USE_HWLOC
/// Allocate a stack based on NUMA information, if possible.
void *alloc_numa_stack(EThread *t, size_t stacksize);
private:
hwloc_obj_type_t obj_type = HWLOC_OBJ_MACHINE;
int obj_count = 0;
char const *obj_name = nullptr;
#endif
};
ThreadAffinityInitializer Thread_Affinity_Initializer;
namespace
{
int
EventMetricStatSync(const char *, RecDataT, RecData *, RecRawStatBlock *rsb, int)
{
int id = 0;
EThread::EventMetrics summary[EThread::N_EVENT_TIMESCALES];
// scan the thread local values
for (EThread *t : eventProcessor.active_group_threads(ET_CALL)) {
t->summarize_stats(summary);
}
ink_mutex_acquire(&(rsb->mutex));
for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx, id += EThread::N_EVENT_STATS) {
EThread::EventMetrics *m = summary + ts_idx;
// Discarding the atomic swaps for global writes, doesn't seem to actually do anything useful.
rsb->global[id + EThread::STAT_LOOP_COUNT]->sum = m->_count;
rsb->global[id + EThread::STAT_LOOP_COUNT]->count = 1;
RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_COUNT);
rsb->global[id + EThread::STAT_LOOP_WAIT]->sum = m->_wait;
rsb->global[id + EThread::STAT_LOOP_WAIT]->count = 1;
RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_WAIT);
rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->sum = m->_loop_time._min;
rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->count = 1;
RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MIN);
rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->sum = m->_loop_time._max;
rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->count = 1;
RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MAX);
rsb->global[id + EThread::STAT_LOOP_EVENTS]->sum = m->_events._total;
rsb->global[id + EThread::STAT_LOOP_EVENTS]->count = 1;
RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS);
rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->sum = m->_events._min;
rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->count = 1;
RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MIN);
rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->sum = m->_events._max;
rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->count = 1;
RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MAX);
}
ink_mutex_release(&(rsb->mutex));
return REC_ERR_OKAY;
}
/// This is a wrapper used to convert a static function into a continuation. The function pointer is
/// passed in the cookie. For this reason the class is used as a singleton.
/// @internal This is the implementation for @c schedule_spawn... overloads.
class ThreadInitByFunc : public Continuation
{
public:
ThreadInitByFunc() { SET_HANDLER(&ThreadInitByFunc::invoke); }
int
invoke(int, Event *ev)
{
void (*f)(EThread *) = reinterpret_cast<void (*)(EThread *)>(ev->cookie);
f(ev->ethread);
return 0;
}
} Thread_Init_Func;
} // namespace
void *
ThreadAffinityInitializer::alloc_hugepage_stack(size_t stacksize)
{
return ats_hugepage_enabled() ? ats_alloc_hugepage(stacksize) : ats_memalign(ats_pagesize(), stacksize);
}
#if TS_USE_HWLOC
void
ThreadAffinityInitializer::init()
{
int affinity = 1;
REC_ReadConfigInteger(affinity, "proxy.config.exec_thread.affinity");
switch (affinity) {
case 4: // assign threads to logical processing units
// Older versions of libhwloc (eg. Ubuntu 10.04) don't have HWLOC_OBJ_PU.
#if HAVE_HWLOC_OBJ_PU
obj_type = HWLOC_OBJ_PU;
obj_name = "Logical Processor";
break;
#endif
case 3: // assign threads to real cores
obj_type = HWLOC_OBJ_CORE;
obj_name = "Core";
break;
case 1: // assign threads to NUMA nodes (often 1:1 with sockets)
obj_type = HWLOC_OBJ_NODE;
obj_name = "NUMA Node";
if (hwloc_get_nbobjs_by_type(ink_get_topology(), obj_type) > 0) {
break;
}
// fallthrough
case 2: // assign threads to sockets
obj_type = HWLOC_OBJ_SOCKET;
obj_name = "Socket";
break;
default: // assign threads to the machine as a whole (a level below SYSTEM)
obj_type = HWLOC_OBJ_MACHINE;
obj_name = "Machine";
}
obj_count = hwloc_get_nbobjs_by_type(ink_get_topology(), obj_type);
Debug("iocore_thread", "Affinity: %d %ss: %d PU: %d", affinity, obj_name, obj_count, ink_number_of_processors());
}
int
ThreadAffinityInitializer::set_affinity(int, Event *)
{
EThread *t = this_ethread();
if (obj_count > 0) {
// Get our `obj` instance with index based on the thread number we are on.
hwloc_obj_t obj = hwloc_get_obj_by_type(ink_get_topology(), obj_type, t->id % obj_count);
#if HWLOC_API_VERSION >= 0x00010100
int cpu_mask_len = hwloc_bitmap_snprintf(nullptr, 0, obj->cpuset) + 1;
char *cpu_mask = (char *)alloca(cpu_mask_len);
hwloc_bitmap_snprintf(cpu_mask, cpu_mask_len, obj->cpuset);
Debug("iocore_thread", "EThread: %p %s: %d CPU Mask: %s\n", t, obj_name, obj->logical_index, cpu_mask);
#else
Debug("iocore_thread", "EThread: %d %s: %d", _name, obj->logical_index);
#endif // HWLOC_API_VERSION
hwloc_set_thread_cpubind(ink_get_topology(), t->tid, obj->cpuset, HWLOC_CPUBIND_STRICT);
} else {
Warning("hwloc returned an unexpected number of objects -- CPU affinity disabled");
}
return 0;
}
void *
ThreadAffinityInitializer::alloc_numa_stack(EThread *t, size_t stacksize)
{
hwloc_membind_policy_t mem_policy = HWLOC_MEMBIND_DEFAULT;
hwloc_nodeset_t nodeset = hwloc_bitmap_alloc();
int num_nodes = 0;
void *stack = nullptr;
hwloc_obj_t obj = hwloc_get_obj_by_type(ink_get_topology(), obj_type, t->id % obj_count);
// Find the NUMA node set that correlates to our next thread CPU set
hwloc_cpuset_to_nodeset(ink_get_topology(), obj->cpuset, nodeset);
// How many NUMA nodes will we be needing to allocate across?
num_nodes = hwloc_get_nbobjs_inside_cpuset_by_type(ink_get_topology(), obj->cpuset, HWLOC_OBJ_NODE);
if (num_nodes == 1) {
// The preferred memory policy. The thread lives in one NUMA node.
mem_policy = HWLOC_MEMBIND_BIND;
} else if (num_nodes > 1) {
// If we have mode than one NUMA node we should interleave over them.
mem_policy = HWLOC_MEMBIND_INTERLEAVE;
}
if (mem_policy != HWLOC_MEMBIND_DEFAULT) {
// Let's temporarily set the memory binding to our destination NUMA node
#if HWLOC_API_VERSION >= 0x20000
hwloc_set_membind(ink_get_topology(), nodeset, mem_policy, HWLOC_MEMBIND_THREAD | HWLOC_MEMBIND_BYNODESET);
#else
hwloc_set_membind_nodeset(ink_get_topology(), nodeset, mem_policy, HWLOC_MEMBIND_THREAD);
#endif
}
// Alloc our stack
stack = this->alloc_hugepage_stack(stacksize);
if (mem_policy != HWLOC_MEMBIND_DEFAULT) {
// Now let's set it back to default for this thread.
#if HWLOC_API_VERSION >= 0x20000
hwloc_set_membind(ink_get_topology(), hwloc_topology_get_topology_nodeset(ink_get_topology()), HWLOC_MEMBIND_DEFAULT,
HWLOC_MEMBIND_THREAD | HWLOC_MEMBIND_BYNODESET);
#else
hwloc_set_membind_nodeset(ink_get_topology(), hwloc_topology_get_topology_nodeset(ink_get_topology()), HWLOC_MEMBIND_DEFAULT,
HWLOC_MEMBIND_THREAD);
#endif
}
hwloc_bitmap_free(nodeset);
return stack;
}
void *
ThreadAffinityInitializer::alloc_stack(EThread *t, size_t stacksize)
{
return this->obj_count > 0 ? this->alloc_numa_stack(t, stacksize) : this->alloc_hugepage_stack(stacksize);
}
#else
void
ThreadAffinityInitializer::init()
{
}
int
ThreadAffinityInitializer::set_affinity(int, Event *)
{
return 0;
}
void *
ThreadAffinityInitializer::alloc_stack(EThread *, size_t stacksize)
{
return this->alloc_hugepage_stack(stacksize);
}
#endif // TS_USE_HWLOC
EventProcessor::EventProcessor() : thread_initializer(this)
{
ink_zero(all_ethreads);
ink_zero(all_dthreads);
ink_mutex_init(&dedicated_thread_spawn_mutex);
// Because ET_NET is compile time set to 0 it *must* be the first type registered.
this->register_event_type("ET_NET");
}
EventProcessor::~EventProcessor()
{
ink_mutex_destroy(&dedicated_thread_spawn_mutex);
}
namespace
{
Event *
make_event_for_scheduling(Continuation *c, int event_code, void *cookie)
{
Event *e = eventAllocator.alloc();
e->init(c);
e->mutex = c->mutex;
e->callback_event = event_code;
e->cookie = cookie;
return e;
}
} // namespace
Event *
EventProcessor::schedule_spawn(Continuation *c, EventType ev_type, int event_code, void *cookie)
{
Event *e = make_event_for_scheduling(c, event_code, cookie);
ink_assert(ev_type < MAX_EVENT_TYPES);
thread_group[ev_type]._spawnQueue.enqueue(e);
return e;
}
Event *
EventProcessor::schedule_spawn(void (*f)(EThread *), EventType ev_type)
{
Event *e = make_event_for_scheduling(&Thread_Init_Func, EVENT_IMMEDIATE, reinterpret_cast<void *>(f));
ink_assert(ev_type < MAX_EVENT_TYPES);
thread_group[ev_type]._spawnQueue.enqueue(e);
return e;
}
EventType
EventProcessor::register_event_type(char const *name)
{
ThreadGroupDescriptor *tg = &(thread_group[n_thread_groups++]);
ink_release_assert(n_thread_groups <= MAX_EVENT_TYPES); // check for overflow
tg->_name = name;
return n_thread_groups - 1;
}
EventType
EventProcessor::spawn_event_threads(char const *name, int n_threads, size_t stacksize)
{
int ev_type = this->register_event_type(name);
this->spawn_event_threads(ev_type, n_threads, stacksize);
return ev_type;
}
EventType
EventProcessor::spawn_event_threads(EventType ev_type, int n_threads, size_t stacksize)
{
char thr_name[MAX_THREAD_NAME_LENGTH];
int i;
ThreadGroupDescriptor *tg = &(thread_group[ev_type]);
ink_release_assert(n_threads > 0);
ink_release_assert((n_ethreads + n_threads) <= MAX_EVENT_THREADS);
ink_release_assert(ev_type < MAX_EVENT_TYPES);
stacksize = std::max(stacksize, static_cast<decltype(stacksize)>(INK_THREAD_STACK_MIN));
// Make sure it is a multiple of our page size
if (ats_hugepage_enabled()) {
stacksize = INK_ALIGN(stacksize, ats_hugepage_size());
} else {
stacksize = INK_ALIGN(stacksize, ats_pagesize());
}
Debug("iocore_thread", "Thread stack size set to %zu", stacksize);
for (i = 0; i < n_threads; ++i) {
EThread *t = new EThread(REGULAR, n_ethreads + i);
all_ethreads[n_ethreads + i] = t;
tg->_thread[i] = t;
t->id = i; // unfortunately needed to support affinity and NUMA logic.
t->set_event_type(ev_type);
t->schedule_spawn(&thread_initializer);
}
tg->_count = n_threads;
n_ethreads += n_threads;
schedule_spawn(&thread_started, ev_type);
// Separate loop to avoid race conditions between spawn events and updating the thread table for
// the group. Some thread set up depends on knowing the total number of threads but that can't be
// safely updated until all the EThread instances are created and stored in the table.
for (i = 0; i < n_threads; ++i) {
Debug("iocore_thread_start", "Created %s thread #%d", tg->_name.c_str(), i + 1);
snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[%s %d]", tg->_name.c_str(), i);
void *stack = Thread_Affinity_Initializer.alloc_stack(tg->_thread[i], stacksize);
tg->_thread[i]->start(thr_name, stack, stacksize);
}
Debug("iocore_thread", "Created thread group '%s' id %d with %d threads", tg->_name.c_str(), ev_type, n_threads);
return ev_type; // useless but not sure what would be better.
}
// This is called from inside a thread as the @a start_event for that thread. It chains to the
// startup events for the appropriate thread group start events.
void
EventProcessor::initThreadState(EThread *t)
{
// Run all thread type initialization continuations that match the event types for this thread.
for (int i = 0; i < MAX_EVENT_TYPES; ++i) {
if (t->is_event_type(i)) {
// To avoid race conditions on the event in the spawn queue, create a local one to actually send.
// Use the spawn queue event as a read only model.
Event *nev = eventAllocator.alloc();
for (Event *ev = thread_group[i]._spawnQueue.head; nullptr != ev; ev = ev->link.next) {
nev->init(ev->continuation, 0, 0);
nev->ethread = t;
nev->callback_event = ev->callback_event;
nev->mutex = ev->continuation->mutex;
nev->cookie = ev->cookie;
ev->continuation->handleEvent(ev->callback_event, nev);
}
nev->free();
}
}
}
int
EventProcessor::start(int n_event_threads, size_t stacksize)
{
// do some sanity checking.
static bool started = false;
ink_release_assert(!started);
ink_release_assert(n_event_threads > 0 && n_event_threads <= MAX_EVENT_THREADS);
started = true;
Thread_Affinity_Initializer.init();
// Least ugly thing - this needs to be the first callback from the thread but by the time this
// method is called other spawn callbacks have been registered. This forces thread affinity
// first. The other alternative would be to require a call to an @c init method which I like even
// less because this cannot be done in the constructor - that depends on too much other
// infrastructure being in place (e.g. the proxy allocators).
thread_group[ET_CALL]._spawnQueue.push(make_event_for_scheduling(&Thread_Affinity_Initializer, EVENT_IMMEDIATE, nullptr));
// Get our statistics set up
RecRawStatBlock *rsb = RecAllocateRawStatBlock(EThread::N_EVENT_STATS * EThread::N_EVENT_TIMESCALES);
char name[256];
for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx) {
for (int id = 0; id < EThread::N_EVENT_STATS; ++id) {
snprintf(name, sizeof(name), "%s.%ds", EThread::STAT_NAME[id], EThread::SAMPLE_COUNT[ts_idx]);
RecRegisterRawStat(rsb, RECT_PROCESS, name, RECD_INT, RECP_NON_PERSISTENT, id + (ts_idx * EThread::N_EVENT_STATS), NULL);
}
}
// Name must be that of a stat, pick one at random since we do all of them in one pass/callback.
RecRegisterRawStatSyncCb(name, EventMetricStatSync, rsb, 0);
this->spawn_event_threads(ET_CALL, n_event_threads, stacksize);
Debug("iocore_thread", "Created event thread group id %d with %d threads", ET_CALL, n_event_threads);
return 0;
}
void
EventProcessor::shutdown()
{
}
Event *
EventProcessor::spawn_thread(Continuation *cont, const char *thr_name, size_t stacksize)
{
/* Spawning threads in a live system - There are two potential race conditions in this logic. The
first is multiple calls to this method. In that case @a all_dthreads can end up in a bad state
as the same entry is overwritten while another is left uninitialized.
The other is read/write contention where another thread (e.g. the stats collection thread) is
iterating over the threads while the active count (@a n_dthreads) is being updated causing use
of a not yet initialized array element.
This logic covers both situations. For write/write the actual array update is locked. The
potentially expensive set up is done outside the lock making the time spent locked small. For
read/write it suffices to do the active count increment after initializing the array
element. It's not a problem if, for one cycle, a new thread is skipped.
*/
// Do as much as possible outside the lock. Until the array element and count is changed
// this is thread safe.
Event *e = eventAllocator.alloc();
e->init(cont, 0, 0);
e->ethread = new EThread(DEDICATED, e);
e->mutex = e->ethread->mutex;
cont->mutex = e->ethread->mutex;
{
ink_scoped_mutex_lock lock(dedicated_thread_spawn_mutex);
ink_release_assert(n_dthreads < MAX_EVENT_THREADS);
all_dthreads[n_dthreads] = e->ethread;
++n_dthreads; // Be very sure this is after the array element update.
}
e->ethread->start(thr_name, nullptr, stacksize);
return e;
}
bool
EventProcessor::has_tg_started(int etype)
{
return thread_group[etype]._started == thread_group[etype]._count;
}
void
thread_started(EThread *t)
{
// Find what type of thread this is, and increment the "_started" counter of that thread type.
for (int i = 0; i < MAX_EVENT_TYPES; ++i) {
if (t->is_event_type(i)) {
if (++eventProcessor.thread_group[i]._started == eventProcessor.thread_group[i]._count &&
eventProcessor.thread_group[i]._afterStartCallback != nullptr) {
eventProcessor.thread_group[i]._afterStartCallback();
}
break;
}
}
}