blob: 782137a558e3e4b875ac7b8f567ecd2d3d92542a [file] [log] [blame]
/** @file
A brief file description
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#pragma once
#include "tscore/ink_platform.h"
#include "I_Continuation.h"
#include "I_Processor.h"
#include "I_Event.h"
#include <atomic>
#ifdef TS_MAX_THREADS_IN_EACH_THREAD_TYPE
constexpr int MAX_THREADS_IN_EACH_TYPE = TS_MAX_THREADS_IN_EACH_THREAD_TYPE;
#else
constexpr int MAX_THREADS_IN_EACH_TYPE = 3072;
#endif
#ifdef TS_MAX_NUMBER_EVENT_THREADS
constexpr int MAX_EVENT_THREADS = TS_MAX_NUMBER_EVENT_THREADS;
#else
constexpr int MAX_EVENT_THREADS = 4096;
#endif
class EThread;
/**
Main processor for the Event System. The EventProcessor is the core
component of the Event System. Once started, it is responsible for
creating and managing groups of threads that execute user-defined
tasks asynchronously at a given time or periodically.
The EventProcessor provides a set of scheduling functions through
which you can specify continuations to be called back by one of its
threads. These function calls do not block. Instead they return an
Event object and schedule the callback to the continuation passed in at
a later or specific time, as soon as possible or at certain intervals.
Singleton model:
Every executable that imports and statically links against the
EventSystem library is provided with a global instance of the
EventProcessor called eventProcessor. Therefore, it is not necessary to
create instances of the EventProcessor class because it was designed
as a singleton. It is important to note that none of its functions
are reentrant.
Thread Groups (Event types):
When the EventProcessor is started, the first group of threads is spawned and it is assigned the
special id ET_CALL. Depending on the complexity of the state machine or protocol, you may be
interested in creating additional threads and the EventProcessor gives you the ability to create a
single thread or an entire group of threads. In the former case, you call spawn_thread and the
thread is independent of the thread groups and it exists as long as your continuation handle
executes and there are events to process. In the latter, you call @c registerEventType to get an
event type and then @c spawn_event_theads which creates the threads in the group of that
type. Such threads require events to be scheduled on a specific thread in the group or for the
group in general using the event type. Note that between these two calls @c
EThread::schedule_spawn can be used to set up per thread initialization.
Callback event codes:
@b UNIX: For all of the scheduling functions, the callback_event
parameter is not used. On a callback, the event code passed in to
the continuation handler is always EVENT_IMMEDIATE.
@b NT: The value of the event code passed in to the continuation
handler is the value provided in the callback_event parameter.
Event allocation policy:
Events are allocated and deallocated by the EventProcessor. A state
machine may access the returned, non-recurring event until it is
cancelled or the callback from the event is complete. For recurring
events, the Event may be accessed until it is cancelled. Once the event
is complete or cancelled, it's the eventProcessor's responsibility to
deallocate it.
*/
class EventProcessor : public Processor
{
public:
/** Register an event type with @a name.
This must be called to get an event type to pass to @c spawn_event_threads
@see spawn_event_threads
*/
EventType register_event_type(char const *name);
/**
Spawn an additional thread for calling back the continuation. Spawns
a dedicated thread (EThread) that calls back the continuation passed
in as soon as possible.
@param cont continuation that the spawn thread will call back
immediately.
@return event object representing the start of the thread.
*/
Event *spawn_thread(Continuation *cont, const char *thr_name, size_t stacksize = 0);
/** Spawn a group of @a n_threads event dispatching threads.
The threads run an event loop which dispatches events scheduled for a specific thread or the event type.
@return EventType or thread id for the new group of threads (@a ev_type)
*/
EventType spawn_event_threads(EventType ev_type, int n_threads, size_t stacksize = DEFAULT_STACKSIZE);
/// Convenience overload.
/// This registers @a name as an event type using @c registerEventType and then calls the real @c spawn_event_threads
EventType spawn_event_threads(const char *name, int n_thread, size_t stacksize = DEFAULT_STACKSIZE);
/**
Schedules the continuation on a specific EThread to receive an event
at the given timeout. Requests the EventProcessor to schedule
the callback to the continuation 'c' at the time specified in
'atimeout_at'. The event is assigned to the specified EThread.
@param c Continuation to be called back at the time specified in
'atimeout_at'.
@param atimeout_at time value at which to callback.
@param ethread EThread on which to schedule the event.
@param callback_event code to be passed back to the continuation's
handler. See the Remarks section.
@param cookie user-defined value or pointer to be passed back in
the Event's object cookie field.
@return reference to an Event object representing the scheduling
of this callback.
*/
Event *schedule_imm(Continuation *c, EventType event_type = ET_CALL, int callback_event = EVENT_IMMEDIATE,
void *cookie = nullptr);
/**
Schedules the continuation on a specific thread group to receive an
event at the given timeout. Requests the EventProcessor to schedule
the callback to the continuation 'c' at the time specified in
'atimeout_at'. The callback is handled by a thread in the specified
thread group (event_type).
@param c Continuation to be called back at the time specified in
'atimeout_at'.
@param atimeout_at Time value at which to callback.
@param event_type thread group id (or event type) specifying the
group of threads on which to schedule the callback.
@param callback_event code to be passed back to the continuation's
handler. See the Remarks section.
@param cookie user-defined value or pointer to be passed back in
the Event's object cookie field.
@return reference to an Event object representing the scheduling of
this callback.
*/
Event *schedule_at(Continuation *c, ink_hrtime atimeout_at, EventType event_type = ET_CALL, int callback_event = EVENT_INTERVAL,
void *cookie = nullptr);
/**
Schedules the continuation on a specific thread group to receive an
event after the specified timeout elapses. Requests the EventProcessor
to schedule the callback to the continuation 'c' after the time
specified in 'atimeout_in' elapses. The callback is handled by a
thread in the specified thread group (event_type).
@param c Continuation to call back aftert the timeout elapses.
@param atimeout_in amount of time after which to callback.
@param event_type Thread group id (or event type) specifying the
group of threads on which to schedule the callback.
@param callback_event code to be passed back to the continuation's
handler. See the Remarks section.
@param cookie user-defined value or pointer to be passed back in
the Event's object cookie field.
@return reference to an Event object representing the scheduling of
this callback.
*/
Event *schedule_in(Continuation *c, ink_hrtime atimeout_in, EventType event_type = ET_CALL, int callback_event = EVENT_INTERVAL,
void *cookie = nullptr);
/**
Schedules the continuation on a specific thread group to receive
an event periodically. Requests the EventProcessor to schedule the
callback to the continuation 'c' every time 'aperiod' elapses. The
callback is handled by a thread in the specified thread group
(event_type).
@param c Continuation to call back every time 'aperiod' elapses.
@param aperiod duration of the time period between callbacks.
@param event_type thread group id (or event type) specifying the
group of threads on which to schedule the callback.
@param callback_event code to be passed back to the continuation's
handler. See the Remarks section.
@param cookie user-defined value or pointer to be passed back in
the Event's object cookie field.
@return reference to an Event object representing the scheduling of
this callback.
*/
Event *schedule_every(Continuation *c, ink_hrtime aperiod, EventType event_type = ET_CALL, int callback_event = EVENT_INTERVAL,
void *cookie = nullptr);
////////////////////////////////////////////
// reschedule an already scheduled event. //
// may be called directly or called by //
// schedule_xxx Event member functions. //
// The returned value may be different //
// from the argument e. //
////////////////////////////////////////////
Event *reschedule_imm(Event *e, int callback_event = EVENT_IMMEDIATE);
Event *reschedule_at(Event *e, ink_hrtime atimeout_at, int callback_event = EVENT_INTERVAL);
Event *reschedule_in(Event *e, ink_hrtime atimeout_in, int callback_event = EVENT_INTERVAL);
Event *reschedule_every(Event *e, ink_hrtime aperiod, int callback_event = EVENT_INTERVAL);
/// Schedule an @a event on continuation @a c when a thread of type @a ev_type is spawned.
/// The @a cookie is attached to the event instance passed to the continuation.
/// @return The scheduled event.
Event *schedule_spawn(Continuation *c, EventType ev_type, int event = EVENT_IMMEDIATE, void *cookie = nullptr);
/// Schedule the function @a f to be called in a thread of type @a ev_type when it is spawned.
Event *schedule_spawn(void (*f)(EThread *), EventType ev_type);
/// Schedule an @a event on continuation @a c to be called when a thread is spawned by this processor.
/// The @a cookie is attached to the event instance passed to the continuation.
/// @return The scheduled event.
// Event *schedule_spawn(Continuation *c, int event, void *cookie = NULL);
EventProcessor();
~EventProcessor() override;
EventProcessor(const EventProcessor &) = delete;
EventProcessor &operator=(const EventProcessor &) = delete;
/**
Initializes the EventProcessor and its associated threads. Spawns the
specified number of threads, initializes their state information and
sets them running. It creates the initial thread group, represented
by the event type ET_CALL.
@return 0 if successful, and a negative value otherwise.
*/
int start(int n_net_threads, size_t stacksize = DEFAULT_STACKSIZE) override;
/**
Stop the EventProcessor. Attempts to stop the EventProcessor and
all of the threads in each of the thread groups.
*/
void shutdown() override;
/**
Allocates size bytes on the event threads. This function is thread
safe.
@param size bytes to be allocated.
*/
off_t allocate(int size);
/**
An array of pointers to all of the EThreads handled by the
EventProcessor. An array of pointers to all of the EThreads created
throughout the existence of the EventProcessor instance.
*/
EThread *all_ethreads[MAX_EVENT_THREADS];
/**
An array of pointers, organized by thread group, to all of the
EThreads handled by the EventProcessor. An array of pointers to all of
the EThreads created throughout the existence of the EventProcessor
instance. It is a two-dimensional array whose first dimension is the
thread group id and the second the EThread pointers for that group.
*/
// EThread *eventthread[MAX_EVENT_TYPES][MAX_THREADS_IN_EACH_TYPE];
/// Data kept for each thread group.
/// The thread group ID is the index into an array of these and so is not stored explicitly.
struct ThreadGroupDescriptor {
std::string _name; ///< Name for the thread group.
int _count = 0; ///< # of threads of this type.
std::atomic<int> _started = 0; ///< # of started threads of this type.
uint64_t _next_round_robin = 0; ///< Index of thread to use for events assigned to this group.
Que(Event, link) _spawnQueue; ///< Events to dispatch when thread is spawned.
EThread *_thread[MAX_THREADS_IN_EACH_TYPE] = {}; ///< The actual threads in this group.
std::function<void()> _afterStartCallback = nullptr;
};
/// Storage for per group data.
ThreadGroupDescriptor thread_group[MAX_EVENT_TYPES];
/// Number of defined thread groups.
int n_thread_groups = 0;
/**
Total number of threads controlled by this EventProcessor. This is
the count of all the EThreads spawn by this EventProcessor, excluding
those created by spawn_thread
*/
int n_ethreads = 0;
bool has_tg_started(int etype);
/*------------------------------------------------------*\
| Unix & non NT Interface |
\*------------------------------------------------------*/
Event *schedule(Event *e, EventType etype);
EThread *assign_thread(EventType etype);
EThread *assign_affinity_by_type(Continuation *cont, EventType etype);
EThread *all_dthreads[MAX_EVENT_THREADS];
int n_dthreads = 0; // No. of dedicated threads
int thread_data_used = 0;
/// Provide container style access to just the active threads, not the entire array.
class active_threads_type
{
using iterator = EThread *const *; ///< Internal iterator type, pointer to array element.
public:
iterator
begin() const
{
return _begin;
}
iterator
end() const
{
return _end;
}
private:
iterator _begin; ///< Start of threads.
iterator _end; ///< End of threads.
/// Construct from base of the array (@a start) and the current valid count (@a n).
active_threads_type(iterator start, int n) : _begin(start), _end(start + n) {}
friend class EventProcessor;
};
// These can be used in container for loops and other range operations.
active_threads_type
active_ethreads() const
{
return {all_ethreads, n_ethreads};
}
active_threads_type
active_dthreads() const
{
return {all_dthreads, n_dthreads};
}
active_threads_type
active_group_threads(int type) const
{
ThreadGroupDescriptor const &group{thread_group[type]};
return {group._thread, group._count};
}
private:
void initThreadState(EThread *);
/// Used to generate a callback at the start of thread execution.
class ThreadInit : public Continuation
{
typedef ThreadInit self;
EventProcessor *_evp;
public:
explicit ThreadInit(EventProcessor *evp) : _evp(evp) { SET_HANDLER(&self::init); }
int
init(int /* event ATS_UNUSED */, Event *ev)
{
_evp->initThreadState(ev->ethread);
return 0;
}
};
friend class ThreadInit;
ThreadInit thread_initializer;
// Lock write access to the dedicated thread vector.
// @internal Not a @c ProxyMutex - that's a whole can of problems due to initialization ordering.
ink_mutex dedicated_thread_spawn_mutex;
};
extern inkcoreapi class EventProcessor eventProcessor;
void thread_started(EThread *);