PROTON-2326: epoll proactor refactor - "schedule" instead of "wake", "task" instead of "context"
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 66fb15e..21226a9 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -57,7 +57,7 @@
typedef struct pni_timer_t pni_timer_t;
typedef enum {
- WAKE, /* see if any work to do in proactor/psocket context */
+ EVENT_FD, /* schedule() or pn_proactor_interrupt() */
LISTENER_IO,
PCONNECTION_IO,
RAW_CONNECTION_IO,
@@ -67,7 +67,7 @@
// Data to use with epoll.
typedef struct epoll_extended_t {
int fd;
- epoll_type_t type; // io/timer/wakeup
+ epoll_type_t type; // io/timer/eventfd
uint32_t wanted; // events to poll for
bool polling;
pmutex barrier_mutex;
@@ -79,36 +79,36 @@
LISTENER,
RAW_CONNECTION,
TIMER_MANAGER
-} pcontext_type_t;
+} task_type_t;
-typedef struct pcontext_t {
+typedef struct task_t {
pmutex mutex;
pn_proactor_t *proactor; /* Immutable */
- pcontext_type_t type;
+ task_type_t type;
bool working;
- bool on_wake_list;
- bool wake_pending; // unprocessed eventfd wake callback
- struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
+ bool on_ready_list;
+ bool ready; // ready to run and on ready list. Poller notified by eventfd.
+ struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex
bool closing;
// Next 4 are protected by the proactor mutex
- struct pcontext_t* next; /* Protected by proactor.mutex */
- struct pcontext_t* prev; /* Protected by proactor.mutex */
+ struct task_t* next; /* Protected by proactor.mutex */
+ struct task_t* prev; /* Protected by proactor.mutex */
int disconnect_ops; /* ops remaining before disconnect complete */
bool disconnecting; /* pn_proactor_disconnect */
// Protected by schedule mutex
tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */
tslot_t *prev_runner;
- bool sched_wake;
+ bool sched_ready;
bool sched_pending; /* If true, one or more unseen epoll or other events to process() */
- bool runnable ; /* in need of scheduling */
-} pcontext_t;
+ bool runnable ; /* on one of the runnable lists */
+} task_t;
typedef enum {
NEW,
UNUSED, /* pn_proactor_done() called, may never come back */
SUSPENDED,
- PROCESSING, /* Hunting for a context */
- BATCHING, /* Doing work on behalf of a context */
+ PROCESSING, /* Hunting for a task */
+ BATCHING, /* Doing work on behalf of a task */
DELETING,
POLLING
} tslot_state;
@@ -121,8 +121,8 @@
bool suspended;
volatile bool scheduled;
tslot_state state;
- pcontext_t *context;
- pcontext_t *prev_context;
+ task_t *task;
+ task_t *prev_task;
bool earmarked;
tslot_t *suspend_list_prev;
tslot_t *suspend_list_next;
@@ -131,7 +131,7 @@
};
typedef struct pni_timer_manager_t {
- pcontext_t context;
+ task_t task;
epoll_extended_t epoll_timer;
pmutex deletion_mutex;
pni_timer_t *proactor_timer;
@@ -141,12 +141,12 @@
} pni_timer_manager_t;
struct pn_proactor_t {
- pcontext_t context;
+ task_t task;
pni_timer_manager_t timer_manager;
- epoll_extended_t epoll_wake;
+ epoll_extended_t epoll_schedule; /* ready list */
epoll_extended_t epoll_interrupt;
pn_event_batch_t batch;
- pcontext_t *contexts; /* track in-use contexts for PN_PROACTOR_INACTIVE and disconnect */
+ task_t *tasks; /* track in-use tasks for PN_PROACTOR_INACTIVE and disconnect */
pni_timer_t *timer;
size_t disconnects_pending; /* unfinished proactor disconnects*/
// need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
@@ -155,21 +155,21 @@
bool need_timeout;
bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */
bool timeout_processed; /* timeout event dispatched in the most recent event batch */
- int context_count;
+ int task_count;
- // wake subsystem
+ // ready list subsystem
int eventfd;
pmutex eventfd_mutex;
- bool wakes_in_progress;
- pcontext_t *wake_list_first;
- pcontext_t *wake_list_last;
+ bool ready_list_active;
+ task_t *ready_list_first;
+ task_t *ready_list_last;
// Interrupts have a dedicated eventfd because they must be async-signal safe.
int interruptfd;
// If the process runs out of file descriptors, disarm listening sockets temporarily and save them here.
acceptor_t *overflow;
pmutex overflow_mutex;
- // Sched vars specific to proactor context.
+ // Sched vars specific to proactor task.
bool sched_interrupt;
// Global scheduling/poller vars.
@@ -185,20 +185,19 @@
tslot_t *poller;
bool poller_suspended;
tslot_t *last_earmark;
- pcontext_t *sched_wake_first;
- pcontext_t *sched_wake_last;
- pcontext_t *sched_wake_current;
+ task_t *sched_ready_first;
+ task_t *sched_ready_last;
+ task_t *sched_ready_current;
pmutex tslot_mutex;
int earmark_count;
bool earmark_drain;
- bool sched_wakes_pending;
// For debugging help for core dumps with optimized code.
pn_event_type_t current_event_type;
// Mostly read only: after init or once thread_count stabilizes
pn_collector_t *collector __attribute__((aligned(64)));
- pcontext_t **warm_runnables;
- pcontext_t **runnables;
+ task_t **warm_runnables;
+ task_t **runnables;
tslot_t **resume_list;
pn_hash_t *tslot_map;
struct epoll_event *kevents;
@@ -219,17 +218,17 @@
} psocket_t;
typedef struct pconnection_t {
- pcontext_t context;
+ task_t task;
psocket_t psocket;
pni_timer_t *timer;
const char *host, *port;
uint32_t new_events;
- int wake_count; // TODO: protected by context.mutex so should be moved in there (also really bool)
+ int wake_count; // TODO: protected by task.mutex so should be moved in there (also really bool)
bool server; /* accept, not connect */
bool tick_pending;
bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
pn_condition_t *disconnect_condition;
- // Following values only changed by (sole) working context:
+ // Following values only changed by (sole) working task:
uint32_t current_arm; // active epoll io events
bool connected;
bool read_blocked;
@@ -277,7 +276,7 @@
} accepted_t;
struct pn_listener_t {
- pcontext_t context;
+ task_t task;
acceptor_t *acceptors; /* Array of listening sockets */
size_t acceptors_size;
char addr_buf[PN_MAX_ADDR];
@@ -339,22 +338,22 @@
return pn_collector_peek(p->collector);
}
-bool wake_if_inactive(pn_proactor_t *p);
+bool schedule_if_inactive(pn_proactor_t *p);
int pclosefd(pn_proactor_t *p, int fd);
-void proactor_add(pcontext_t *ctx);
-bool proactor_remove(pcontext_t *ctx);
+void proactor_add(task_t *tsk);
+bool proactor_remove(task_t *tsk);
bool unassign_thread(tslot_t *ts, tslot_state new_state);
-void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p);
-static void pcontext_finalize(pcontext_t* ctx) {
- pmutex_finalize(&ctx->mutex);
+void task_init(task_t *tsk, task_type_t t, pn_proactor_t *p);
+static void task_finalize(task_t* tsk) {
+ pmutex_finalize(&tsk->mutex);
}
-bool wake(pcontext_t *ctx);
-void wake_notify(pcontext_t *ctx);
-void wake_done(pcontext_t *ctx);
+bool schedule(task_t *tsk);
+void notify_poller(task_t *tsk);
+void schedule_done(task_t *tsk);
void psocket_init(psocket_t* ps, epoll_type_t type);
bool start_polling(epoll_extended_t *ee, int epollfd);
@@ -366,11 +365,11 @@
accepted_t *listener_accepted_next(pn_listener_t *listener);
-pcontext_t *pni_psocket_raw_context(psocket_t *ps);
-pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake);
+task_t *pni_psocket_raw_task(psocket_t *ps);
+pn_event_batch_t *pni_raw_connection_process(task_t *t, bool sched_ready);
typedef struct praw_connection_t praw_connection_t;
-pcontext_t *pni_raw_connection_context(praw_connection_t *rc);
+task_t *pni_raw_connection_task(praw_connection_t *rc);
praw_connection_t *pni_batch_raw_connection(pn_event_batch_t* batch);
void pni_raw_connection_done(praw_connection_t *rc);
@@ -379,7 +378,7 @@
void pni_timer_set(pni_timer_t *timer, uint64_t deadline);
bool pni_timer_manager_init(pni_timer_manager_t *tm);
void pni_timer_manager_finalize(pni_timer_manager_t *tm);
-pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool wake);
+pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool sched_ready);
void pni_pconnection_timeout(pconnection_t *pc);
void pni_proactor_timeout(pn_proactor_t *p);
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 30ef5f1..ae0c37b 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -25,28 +25,30 @@
waiting for work. The poller calls epoll_wait(), generates work lists, resumes suspended
threads, and grabs work for itself or polls again.
- A serialized grouping of Proton events is a context (connection, listener, proactor).
+ A serialized grouping of Proton events is a task (connection, listener, proactor).
Each has multiple pollable fds that make it schedulable. E.g. a connection could have a
socket fd, (indirect) timerfd, and (indirect) eventfd all signaled in a single epoll_wait().
At the conclusion of each
N = epoll_wait(..., N_MAX, timeout)
- there will be N epoll events and M wakes on the wake list. M can be very large in a
- server with many active connections. The poller makes the contexts "runnable" if they are
- not already running. A running context can only be made runnable once until it completes
- a chunk of work and calls unassign_thread(). (N + M - duplicates) contexts will be
+ there will be N epoll events and M tasks on a ready list. M can be very large in a
+ server with many active connections. The poller makes the tasks "runnable" if they are
+ not already running. A running task cannot be made runnable again until it completes
+ a chunk of work and calls unassign_thread(). (N + M - duplicates) tasks will be
scheduled. A new poller will occur when next_runnable() returns NULL.
- A running context, before it stops "working" must check to see if there were new incoming
- events that the poller posted to the context, but could not make it runnable since it was
- already running. The context will know if it needs to put itself back on the wake list
- to be runnable later to process the pending events.
+ A task may have its own dedicated kernel file descriptor (socket, timerfd) which can be seen
+ by the poller to make the task runnable. A task may aslso be scheduled to run by placing it
+ on a ready queue which is monitored by the poller via two eventfds.
Lock ordering - never add locks right to left:
- context -> sched -> wake
- non-proactor-context -> proactor-context
+ task -> sched -> ready
+ non-proactor-task -> proactor-task
tslot -> sched
+
+ TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt)
+ transitions from "private to the scheduler" to "visible to the task".
*/
@@ -99,8 +101,8 @@
// Maybe futex is even better?
// See other "TODO" in code.
//
-// Consider case of large number of wakes: next_event_batch() could start by
-// looking for pending wakes before a kernel call to epoll_wait(), or there
+// Consider case of large number of ready tasks: next_event_batch() could start by
+// looking for ready tasks before a kernel call to epoll_wait(), or there
// could be several eventfds with random assignment of wakeables.
@@ -190,119 +192,118 @@
}
/*
- * The proactor maintains a number of serialization contexts: each
+ * The proactor maintains a number of serialization tasks: each
* connection, each listener, the proactor itself. The serialization
* is presented to the application via each associated event batch.
*
- * Multiple threads can be trying to do work on a single context
- * (i.e. socket IO is ready and wakeup at same time). Mutexes are used
- * to manage contention. Some vars are only ever touched by one
- * "working" thread and are accessed without holding the mutex.
+ * A task will only ever run in a single thread at a time.
*
- * Currently internal wakeups (via wake()/wake_notify()) are used to
- * force a context to check if it has work to do. To minimize trips
- * through the kernel, wake() is a no-op if the context has a working
- * thread. Conversely, a thread must never stop working without
- * checking if it has newly arrived work.
+ * Other threads of excution (including user threads) can interact with a
+ * particular task (i.e. connection wake or proactor interrupt). Mutexes are
+ * needed here for shared access to task data. schedule()/notify_poller() are
+ * used to ensure a task will run to act on the changed data.
*
- * External wake operations, like pn_connection_wake() are built on top of
- * the internal wake mechanism.
+ * To minimize trips through the kernel, schedule() is a no-op if the task is
+ * already running or about to run. Conversely, a task must never stop working
+ * without checking state that may have been recently changed by another thread.
+ *
+ * External wake operations, like pn_connection_wake() or expired timers are
+ * built on top of this schedule() mechanism.
*
* pn_proactor_interrupt() must be async-signal-safe so it has a dedicated
* eventfd to allow a lock-free pn_proactor_interrupt() implementation.
*/
-// Fake thread for temporarily disabling the scheduling of a context.
-static struct tslot_t *REWAKE_PLACEHOLDER = (struct tslot_t*) -1;
+// Fake thread for temporarily disabling the scheduling of a task.
+static struct tslot_t *RESCHEDULE_PLACEHOLDER = (struct tslot_t*) -1;
-void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p) {
- memset(ctx, 0, sizeof(*ctx));
- pmutex_init(&ctx->mutex);
- ctx->proactor = p;
- ctx->type = t;
+void task_init(task_t *tsk, task_type_t t, pn_proactor_t *p) {
+ memset(tsk, 0, sizeof(*tsk));
+ pmutex_init(&tsk->mutex);
+ tsk->proactor = p;
+ tsk->type = t;
}
/*
- * Wake strategy with eventfd.
- * - wakees can be in the list only once
- * - wakers only use the eventfd if wakes_in_progress is false
- * There is a single rearm between wakes > 0 and wakes == 0
+ * schedule() strategy with eventfd:
+ * - tasks can be in the ready list only once
+ * - a scheduling thread will only activate the eventfd if ready_list_active is false
+ * There is a single rearm between ready list empty and non-empty
*
- * There can potentially be many contexts with wakes pending.
+ * There can potentially be many tasks with work pending.
*
- * The wake list is in two parts. The front is the chunk the
- * scheduler will process until the next epoll_wait(). sched_wake
- * indicates which chunk it is on. The ctx may already be running or
+ * The ready list is in two parts. The front is the chunk the
+ * poller will process until the next epoll_wait(). sched_ready
+ * indicates which chunk it is on. The task may already be running or
* scheduled to run.
*
- * The ctx must be actually running to absorb ctx->wake_pending.
+ * The task must be actually running to absorb task_t->ready.
*
- * The wake list can keep growing while popping wakes. The list between
- * sched_wake_first and sched_wake_last are protected by the sched
- * lock (for pop operations), sched_wake_last to wake_list_last are
+ * The ready list can keep growing while popping ready tasks. The list between
+ * sched_ready_first and sched_ready_last are protected by the sched
+ * lock (for pop operations), sched_ready_last to ready_list_last are
* protected by the eventfd mutex (for add operations). Both locks
* are needed to cross or reconcile the two portions of the list.
*/
// Call with sched lock held.
-static void pop_wake(pcontext_t *ctx) {
- // every context on the sched_wake_list is either currently running,
- // or to be scheduled. wake() will not "see" any of the wake_next
- // pointers until wake_pending and working have transitioned to 0
- // and false, when a context stops working.
+static void pop_ready_task(task_t *tsk) {
+ // every task on the sched_ready_list is either currently running,
+ // or to be scheduled. schedule() will not "see" any of the ready_next
+ // pointers until ready and working have transitioned to 0
+ // and false, when a task stops working.
//
- // every context must transition as:
+ // every task must transition as:
//
- // !wake_pending .. wake() .. on wake_list .. on sched_wake_list .. working context .. !sched_wake && !wake_pending
+ // !ready .. schedule() .. on ready_list .. on sched_ready_list .. working task .. !sched_ready && !ready
//
- // Intervening locks at each transition ensures wake_next has memory coherence throughout the wake cycle.
- pn_proactor_t *p = ctx->proactor;
- if (ctx == p->sched_wake_current)
- p->sched_wake_current = ctx->wake_next;
- if (ctx == p->sched_wake_first) {
+ // Intervening locks at each transition ensures ready_next has memory coherence throughout the ready task scheduling cycle.
+ pn_proactor_t *p = tsk->proactor;
+ if (tsk == p->sched_ready_current)
+ p->sched_ready_current = tsk->ready_next;
+ if (tsk == p->sched_ready_first) {
// normal code path
- if (ctx == p->sched_wake_last) {
- p->sched_wake_first = p->sched_wake_last = NULL;
+ if (tsk == p->sched_ready_last) {
+ p->sched_ready_first = p->sched_ready_last = NULL;
} else {
- p->sched_wake_first = ctx->wake_next;
+ p->sched_ready_first = tsk->ready_next;
}
- if (!p->sched_wake_first)
- p->sched_wake_last = NULL;
+ if (!p->sched_ready_first)
+ p->sched_ready_last = NULL;
} else {
- // ctx is not first in a multi-element list
- pcontext_t *prev = NULL;
- for (pcontext_t *i = p->sched_wake_first; i != ctx; i = i->wake_next)
+ // tsk is not first in a multi-element list
+ task_t *prev = NULL;
+ for (task_t *i = p->sched_ready_first; i != tsk; i = i->ready_next)
prev = i;
- prev->wake_next = ctx->wake_next;
- if (ctx == p->sched_wake_last)
- p->sched_wake_last = prev;
+ prev->ready_next = tsk->ready_next;
+ if (tsk == p->sched_ready_last)
+ p->sched_ready_last = prev;
}
- ctx->on_wake_list = false;
+ tsk->on_ready_list = false;
}
-// part1: call with ctx->owner lock held, return true if notify required by caller
-// Note that this will return false if either there is a pending wake OR if we are already
-// in the connection context that is to be woken (as we don't have to wake it up)
-bool wake(pcontext_t *ctx) {
+// part1: call with tsk->owner lock held, return true if notify_poller required by caller.
+// Nothing to do if the task is currently at work or work is already pending.
+bool schedule(task_t *tsk) {
bool notify = false;
- if (!ctx->wake_pending) {
- if (!ctx->working) {
- ctx->wake_pending = true;
- pn_proactor_t *p = ctx->proactor;
+ if (!tsk->ready) {
+ if (!tsk->working) {
+ tsk->ready = true;
+ pn_proactor_t *p = tsk->proactor;
lock(&p->eventfd_mutex);
- ctx->wake_next = NULL;
- ctx->on_wake_list = true;
- if (!p->wake_list_first) {
- p->wake_list_first = p->wake_list_last = ctx;
+ tsk->ready_next = NULL;
+ tsk->on_ready_list = true;
+ if (!p->ready_list_first) {
+ p->ready_list_first = p->ready_list_last = tsk;
} else {
- p->wake_list_last->wake_next = ctx;
- p->wake_list_last = ctx;
+ p->ready_list_last->ready_next = tsk;
+ p->ready_list_last = tsk;
}
- if (!p->wakes_in_progress) {
- // force a wakeup via the eventfd
- p->wakes_in_progress = true;
+ if (!p->ready_list_active) {
+ // unblock poller via the eventfd
+ p->ready_list_active = true;
notify = true;
}
unlock(&p->eventfd_mutex);
@@ -312,18 +313,18 @@
return notify;
}
-// part2: make OS call without lock held
-void wake_notify(pcontext_t *ctx) {
- pn_proactor_t *p = ctx->proactor;
+// part2: unblock epoll_wait(). Make OS call without lock held.
+void notify_poller(task_t *tsk) {
+ pn_proactor_t *p = tsk->proactor;
if (p->eventfd == -1)
return;
- rearm(p, &p->epoll_wake);
+ rearm(p, &p->epoll_schedule);
}
-// call with owner lock held, once for each pop from the wake list
-void wake_done(pcontext_t *ctx) {
-// assert(ctx->wake_pending > 0);
- ctx->wake_pending = false;
+// call with task lock held from xxx_process().
+void schedule_done(task_t *tsk) {
+// assert(tsk->ready > 0);
+ tsk->ready = false;
}
@@ -409,35 +410,35 @@
}
// Call with sched lock
-static void assign_thread(tslot_t *ts, pcontext_t *ctx) {
- assert(!ctx->runner);
- ctx->runner = ts;
- ctx->prev_runner = NULL;
- ctx->runnable = false;
- ts->context = ctx;
- ts->prev_context = NULL;
+static void assign_thread(tslot_t *ts, task_t *tsk) {
+ assert(!tsk->runner);
+ tsk->runner = ts;
+ tsk->prev_runner = NULL;
+ tsk->runnable = false;
+ ts->task = tsk;
+ ts->prev_task = NULL;
}
// call with sched lock
-static bool rewake(pcontext_t *ctx) {
- // Special case wake() where context is unassigned and a popped wake needs to be put back on the list.
- // Should be rare.
+static bool reschedule(task_t *tsk) {
+ // Special case schedule() where task is done/unassigned but sched_pending work has arrived.
+ // Should be an infrequent corner case.
bool notify = false;
- pn_proactor_t *p = ctx->proactor;
+ pn_proactor_t *p = tsk->proactor;
lock(&p->eventfd_mutex);
- assert(ctx->wake_pending);
- assert(!ctx->on_wake_list);
- ctx->wake_next = NULL;
- ctx->on_wake_list = true;
- if (!p->wake_list_first) {
- p->wake_list_first = p->wake_list_last = ctx;
+ assert(tsk->ready);
+ assert(!tsk->on_ready_list);
+ tsk->ready_next = NULL;
+ tsk->on_ready_list = true;
+ if (!p->ready_list_first) {
+ p->ready_list_first = p->ready_list_last = tsk;
} else {
- p->wake_list_last->wake_next = ctx;
- p->wake_list_last = ctx;
+ p->ready_list_last->ready_next = tsk;
+ p->ready_list_last = tsk;
}
- if (!p->wakes_in_progress) {
- // force a wakeup via the eventfd
- p->wakes_in_progress = true;
+ if (!p->ready_list_active) {
+ // unblock the poller via the eventfd
+ p->ready_list_active = true;
notify = true;
}
unlock(&p->eventfd_mutex);
@@ -446,37 +447,37 @@
// Call with sched lock
bool unassign_thread(tslot_t *ts, tslot_state new_state) {
- pcontext_t *ctx = ts->context;
+ task_t *tsk = ts->task;
bool notify = false;
bool deleting = (ts->state == DELETING);
- ts->context = NULL;
+ ts->task = NULL;
ts->state = new_state;
- if (ctx) {
- ctx->runner = NULL;
- ctx->prev_runner = ts;
+ if (tsk) {
+ tsk->runner = NULL;
+ tsk->prev_runner = ts;
}
- // Check if context has unseen events/wake that need processing.
+ // Check if unseen events or schedule() calls occurred while task was working.
- if (ctx && !deleting) {
- pn_proactor_t *p = ctx->proactor;
- ts->prev_context = ts->context;
- if (ctx->sched_pending) {
- // Need a new wake
- if (ctx->sched_wake) {
- if (!ctx->on_wake_list) {
+ if (tsk && !deleting) {
+ pn_proactor_t *p = tsk->proactor;
+ ts->prev_task = ts->task;
+ if (tsk->sched_pending) {
+ // Make sure the task is already scheduled or put it on the ready list
+ if (tsk->sched_ready) {
+ if (!tsk->on_ready_list) {
// Remember it for next poller
- ctx->sched_wake = false;
- notify = rewake(ctx); // back on wake list for poller to see
+ tsk->sched_ready = false;
+ notify = reschedule(tsk); // back on ready list for poller to see
}
// else already scheduled
} else {
- // bad corner case. Block ctx from being scheduled again until a later post_wake()
- ctx->runner = REWAKE_PLACEHOLDER;
+ // bad corner case. Block tsk from being scheduled again until a later post_ready()
+ tsk->runner = RESCHEDULE_PLACEHOLDER;
unlock(&p->sched_mutex);
- lock(&ctx->mutex);
- notify = wake(ctx);
- unlock(&ctx->mutex);
+ lock(&tsk->mutex);
+ notify = schedule(tsk);
+ unlock(&tsk->mutex);
lock(&p->sched_mutex);
}
}
@@ -485,50 +486,50 @@
}
// Call with sched lock
-static void earmark_thread(tslot_t *ts, pcontext_t *ctx) {
- assign_thread(ts, ctx);
+static void earmark_thread(tslot_t *ts, task_t *tsk) {
+ assign_thread(ts, tsk);
ts->earmarked = true;
- ctx->proactor->earmark_count++;
+ tsk->proactor->earmark_count++;
}
// Call with sched lock
static void remove_earmark(tslot_t *ts) {
- pcontext_t *ctx = ts->context;
- ts->context = NULL;
- ctx->runner = NULL;
+ task_t *tsk = ts->task;
+ ts->task = NULL;
+ tsk->runner = NULL;
ts->earmarked = false;
- ctx->proactor->earmark_count--;
+ tsk->proactor->earmark_count--;
}
// Call with sched lock
-static void make_runnable(pcontext_t *ctx) {
- pn_proactor_t *p = ctx->proactor;
+static void make_runnable(task_t *tsk) {
+ pn_proactor_t *p = tsk->proactor;
assert(p->n_runnables <= p->runnables_capacity);
- assert(!ctx->runnable);
- if (ctx->runner) return;
+ assert(!tsk->runnable);
+ if (tsk->runner) return;
- ctx->runnable = true;
+ tsk->runnable = true;
// Track it as normal or warm or earmarked
if (pni_warm_sched) {
- tslot_t *ts = ctx->prev_runner;
- if (ts && ts->prev_context == ctx) {
+ tslot_t *ts = tsk->prev_runner;
+ if (ts && ts->prev_task == tsk) {
if (ts->state == SUSPENDED || ts->state == PROCESSING) {
if (p->n_warm_runnables < p->thread_capacity) {
- p->warm_runnables[p->n_warm_runnables++] = ctx;
- assign_thread(ts, ctx);
+ p->warm_runnables[p->n_warm_runnables++] = tsk;
+ assign_thread(ts, tsk);
}
else
- p->runnables[p->n_runnables++] = ctx;
+ p->runnables[p->n_runnables++] = tsk;
return;
}
if (ts->state == UNUSED && !p->earmark_drain) {
- earmark_thread(ts, ctx);
+ earmark_thread(ts, tsk);
p->last_earmark = ts;
return;
}
}
}
- p->runnables[p->n_runnables++] = ctx;
+ p->runnables[p->n_runnables++] = tsk;
}
@@ -567,7 +568,7 @@
unlock(&driver_ptr_mutex);
}
-static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_wake, bool topup);
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_ready, bool topup);
static void write_flush(pconnection_t *pc);
static void listener_begin_close(pn_listener_t* l);
static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block);
@@ -585,12 +586,12 @@
return ps->epoll_io.type == LISTENER_IO ? containerof(ps, acceptor_t, psocket) : NULL;
}
-static inline pconnection_t *pcontext_pconnection(pcontext_t *c) {
- return c->type == PCONNECTION ? containerof(c, pconnection_t, context) : NULL;
+static inline pconnection_t *task_pconnection(task_t *t) {
+ return t->type == PCONNECTION ? containerof(t, pconnection_t, task) : NULL;
}
-static inline pn_listener_t *pcontext_listener(pcontext_t *c) {
- return c->type == LISTENER ? containerof(c, pn_listener_t, context) : NULL;
+static inline pn_listener_t *task_listener(task_t *t) {
+ return t->type == LISTENER ? containerof(t, pn_listener_t, task) : NULL;
}
static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
@@ -671,10 +672,10 @@
return item;
}
-// Add an overflowing acceptor to the overflow list. Called with listener context lock held.
+// Add an overflowing acceptor to the overflow list. Called with listener task lock held.
static void acceptor_set_overflow(acceptor_t *a) {
a->overflowed = true;
- pn_proactor_t *p = a->listener->context.proactor;
+ pn_proactor_t *p = a->listener->task.proactor;
lock(&p->overflow_mutex);
acceptor_list_append(&p->overflow, a);
unlock(&p->overflow_mutex);
@@ -693,8 +694,8 @@
acceptor_t *a = acceptor_list_next(&ovflw);
while (a) {
pn_listener_t *l = a->listener;
- lock(&l->context.mutex);
- bool rearming = !l->context.closing;
+ lock(&l->task.mutex);
+ bool rearming = !l->task.closing;
bool notify = false;
assert(!a->armed);
assert(a->overflowed);
@@ -703,9 +704,9 @@
rearm(p, &a->psocket.epoll_io);
a->armed = true;
}
- else notify = wake(&l->context);
- unlock(&l->context.mutex);
- if (notify) wake_notify(&l->context);
+ else notify = schedule(&l->task);
+ unlock(&l->task.mutex);
+ if (notify) notify_poller(&l->task);
a = acceptor_list_next(&ovflw);
}
}
@@ -738,7 +739,7 @@
}
- pcontext_init(&pc->context, PCONNECTION, p);
+ task_init(&pc->task, PCONNECTION, p);
psocket_init(&pc->psocket, PCONNECTION_IO);
pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port);
pc->new_events = 0;
@@ -777,7 +778,7 @@
// Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), no pending timer.
// Return true when all possible outstanding epoll events associated with this pconnection have been processed.
static inline bool pconnection_is_final(pconnection_t *pc) {
- return !pc->current_arm && !pc->context.wake_pending && !pc->tick_pending;
+ return !pc->current_arm && !pc->task.ready && !pc->tick_pending;
}
static void pconnection_final_free(pconnection_t *pc) {
@@ -793,7 +794,7 @@
pmutex_finalize(&pc->rearm_mutex);
pn_condition_free(pc->disconnect_condition);
pn_connection_driver_destroy(&pc->driver);
- pcontext_finalize(&pc->context);
+ task_finalize(&pc->task);
pni_timer_free(pc->timer);
free(pc);
}
@@ -803,13 +804,13 @@
static void pconnection_cleanup(pconnection_t *pc) {
assert(pconnection_is_final(pc));
int fd = pc->psocket.epoll_io.fd;
- stop_polling(&pc->psocket.epoll_io, pc->context.proactor->epollfd);
+ stop_polling(&pc->psocket.epoll_io, pc->task.proactor->epollfd);
if (fd != -1)
- pclosefd(pc->context.proactor, fd);
+ pclosefd(pc->task.proactor, fd);
- lock(&pc->context.mutex);
- bool can_free = proactor_remove(&pc->context);
- unlock(&pc->context.mutex);
+ lock(&pc->task.mutex);
+ bool can_free = proactor_remove(&pc->task);
+ unlock(&pc->task.mutex);
if (can_free)
pconnection_final_free(pc);
// else proactor_disconnect logic owns psocket and its final free
@@ -832,8 +833,8 @@
// Call with lock held or from forced_shutdown
static void pconnection_begin_close(pconnection_t *pc) {
- if (!pc->context.closing) {
- pc->context.closing = true;
+ if (!pc->task.closing) {
+ pc->task.closing = true;
pc->tick_pending = false;
if (pc->current_arm) {
// Force EPOLLHUP callback(s)
@@ -849,7 +850,7 @@
pc->new_events = 0;
pconnection_begin_close(pc);
// pconnection_process will never be called again. Zero everything.
- pc->context.wake_pending = 0;
+ pc->task.ready = 0;
pn_collector_release(pc->driver.collector);
assert(pconnection_is_final(pc));
pconnection_cleanup(pc);
@@ -859,18 +860,18 @@
void pni_pconnection_timeout(pconnection_t *pc) {
bool notify = false;
uint64_t now = pn_proactor_now_64();
- lock(&pc->context.mutex);
- if (!pc->context.closing) {
+ lock(&pc->task.mutex);
+ if (!pc->task.closing) {
// confirm no simultaneous timeout change from another thread.
if (pc->expected_timeout && now >= pc->expected_timeout) {
pc->tick_pending = true;
pc->expected_timeout = 0;
- notify = wake(&pc->context);
+ notify = schedule(&pc->task);
}
}
- unlock(&pc->context.mutex);
+ unlock(&pc->task.mutex);
if (notify)
- wake_notify(&pc->context);
+ notify_poller(&pc->task);
}
static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
@@ -878,7 +879,7 @@
if (!pc->driver.connection) return NULL;
pn_event_t *e = pn_connection_driver_next_event(&pc->driver);
if (!e) {
- pn_proactor_t *p = pc->context.proactor;
+ pn_proactor_t *p = pc->task.proactor;
bool idle_threads;
lock(&p->sched_mutex);
idle_threads = (p->suspend_list_head != NULL);
@@ -913,7 +914,7 @@
return pn_connection_driver_write_closed(&pc->driver);
}
-/* Call only from working context (no competitor for pc->current_arm or
+/* Call only from working task (no competitor for pc->current_arm or
connection driver). If true returned, caller must do
pconnection_rearm().
@@ -944,13 +945,13 @@
static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) {
lock(&pc->rearm_mutex);
pc->current_arm = pc->psocket.epoll_io.wanted = wanted_now;
- rearm(pc->context.proactor, &pc->psocket.epoll_io);
+ rearm(pc->task.proactor, &pc->psocket.epoll_io);
unlock(&pc->rearm_mutex);
// Return immediately. pc may have just been freed by another thread.
}
/* Only call when context switch is imminent. Sched lock is highly contested. */
-// Call with both context and sched locks.
+// Call with both task and sched locks.
static bool pconnection_sched_sync(pconnection_t *pc) {
uint32_t sync_events = 0;
uint32_t sync_args = pc->tick_pending << 1;
@@ -960,12 +961,12 @@
pc->current_arm = 0; // or outside lock?
sync_events = pc->new_events;
}
- if (pc->context.sched_wake) {
- pc->context.sched_wake = false;
- wake_done(&pc->context);
+ if (pc->task.sched_ready) {
+ pc->task.sched_ready = false;
+ schedule_done(&pc->task);
sync_args |= 1;
}
- pc->context.sched_pending = false;
+ pc->task.sched_pending = false;
if (sync_args || sync_events) {
// Only replace if poller has found new work for us.
@@ -974,11 +975,11 @@
}
// Indicate if there are free proactor threads
- pn_proactor_t *p = pc->context.proactor;
+ pn_proactor_t *p = pc->task.proactor;
return p->poller_suspended || p->suspend_list_head;
}
-/* Call with context lock and having done a write_flush() to "know" the value of wbuf_remaining */
+/* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
static inline bool pconnection_work_pending(pconnection_t *pc) {
if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
return true;
@@ -989,14 +990,15 @@
/* Call with no locks. */
static void pconnection_done(pconnection_t *pc) {
- pn_proactor_t *p = pc->context.proactor;
- tslot_t *ts = pc->context.runner;
+ pn_proactor_t *p = pc->task.proactor;
+ tslot_t *ts = pc->task.runner;
write_flush(pc);
bool notify = false;
- bool self_wake = false;
- lock(&pc->context.mutex);
- pc->context.working = false; // So we can wake() ourself if necessary. We remain the de facto
- // working context while the lock is held. Need sched_sync too to drain possible stale wake.
+ bool self_sched = false;
+ lock(&pc->task.mutex);
+ pc->task.working = false; // So we can schedule() ourself if necessary. We remain the de facto
+ // working task instance while the lock is held. Need sched_sync too to drain
+ // a possible stale sched_ready.
pc->hog_count = 0;
bool has_event = pconnection_has_event(pc);
// Do as little as possible while holding the sched lock
@@ -1005,33 +1007,33 @@
unlock(&p->sched_mutex);
if (has_event || pconnection_work_pending(pc)) {
- self_wake = true;
+ self_sched = true;
} else if (pn_connection_driver_finished(&pc->driver)) {
pconnection_begin_close(pc);
if (pconnection_is_final(pc)) {
- unlock(&pc->context.mutex);
+ unlock(&pc->task.mutex);
pconnection_cleanup(pc);
// pc may be undefined now
lock(&p->sched_mutex);
notify = unassign_thread(ts, UNUSED);
unlock(&p->sched_mutex);
if (notify)
- wake_notify(&p->context);
+ notify_poller(&p->task);
return;
}
}
- if (self_wake)
- notify = wake(&pc->context);
+ if (self_sched)
+ notify = schedule(&pc->task);
int wanted = pconnection_rearm_check(pc);
- unlock(&pc->context.mutex);
+ unlock(&pc->task.mutex);
if (wanted) pconnection_rearm(pc, wanted); // May free pc on another thread. Return without touching pc again.
lock(&p->sched_mutex);
if (unassign_thread(ts, UNUSED))
notify = true;
unlock(&p->sched_mutex);
- if (notify) wake_notify(&p->context);
+ if (notify) notify_poller(&p->task);
return;
}
@@ -1104,33 +1106,33 @@
static void pconnection_connected_lh(pconnection_t *pc);
static void pconnection_maybe_connect_lh(pconnection_t *pc);
-static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_wake, bool topup) {
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_ready, bool topup) {
bool waking = false;
bool tick_required = false;
bool immediate_write = false;
- lock(&pc->context.mutex);
+ lock(&pc->task.mutex);
if (!topup) { // Save some state in case of crash investigation.
pc->process_events = events;
- pc->process_args = (pc->tick_pending << 1) | sched_wake;
+ pc->process_args = (pc->tick_pending << 1) | sched_ready;
}
if (events) {
pc->new_events = events;
pc->current_arm = 0;
events = 0;
}
- if (sched_wake) wake_done(&pc->context);
+ if (sched_ready) schedule_done(&pc->task);
if (topup) {
// Only called by the batch owner. Does not loop, just "tops up"
// once. May be back depending on hog_count.
- assert(pc->context.working);
+ assert(pc->task.working);
}
else {
- if (pc->context.working) {
- // Another thread is the working context. Should be impossible with new scheduler.
+ if (pc->task.working) {
+ // Another thread is the working task. Should be impossible with new scheduler.
EPOLL_FATAL("internal epoll proactor error: two worker threads", 0);
}
- pc->context.working = true;
+ pc->task.working = true;
}
// Confirmed as working thread. Review state and unlock ASAP.
@@ -1139,7 +1141,7 @@
if (pc->queued_disconnect) { // From pn_proactor_disconnect()
pc->queued_disconnect = false;
- if (!pc->context.closing) {
+ if (!pc->task.closing) {
if (pc->disconnect_condition) {
pn_condition_copy(pn_transport_condition(pc->driver.transport), pc->disconnect_condition);
}
@@ -1148,7 +1150,7 @@
}
if (pconnection_has_event(pc)) {
- unlock(&pc->context.mutex);
+ unlock(&pc->task.mutex);
return &pc->batch;
}
bool closed = pconnection_rclosed(pc) && pconnection_wclosed(pc);
@@ -1165,7 +1167,7 @@
uint32_t update_events = pc->new_events;
pc->current_arm = 0;
pc->new_events = 0;
- if (!pc->context.closing) {
+ if (!pc->task.closing) {
if ((update_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
pconnection_maybe_connect_lh(pc);
else
@@ -1180,14 +1182,14 @@
}
}
- if (pc->context.closing && pconnection_is_final(pc)) {
- unlock(&pc->context.mutex);
+ if (pc->task.closing && pconnection_is_final(pc)) {
+ unlock(&pc->task.mutex);
pconnection_cleanup(pc);
return NULL;
}
- unlock(&pc->context.mutex);
- pc->hog_count++; // working context doing work
+ unlock(&pc->task.mutex);
+ pc->hog_count++; // working task doing work
if (waking) {
pn_connection_t *c = pc->driver.connection;
@@ -1245,45 +1247,45 @@
write_flush(pc);
- lock(&pc->context.mutex);
- if (pc->context.closing && pconnection_is_final(pc)) {
- unlock(&pc->context.mutex);
+ lock(&pc->task.mutex);
+ if (pc->task.closing && pconnection_is_final(pc)) {
+ unlock(&pc->task.mutex);
pconnection_cleanup(pc);
return NULL;
}
// Never stop working while work remains. hog_count exception to this rule is elsewhere.
- lock(&pc->context.proactor->sched_mutex);
+ lock(&pc->task.proactor->sched_mutex);
bool workers_free = pconnection_sched_sync(pc);
- unlock(&pc->context.proactor->sched_mutex);
+ unlock(&pc->task.proactor->sched_mutex);
if (pconnection_work_pending(pc)) {
goto retry; // TODO: get rid of goto without adding more locking
}
- pc->context.working = false;
+ pc->task.working = false;
pc->hog_count = 0;
if (pn_connection_driver_finished(&pc->driver)) {
pconnection_begin_close(pc);
if (pconnection_is_final(pc)) {
- unlock(&pc->context.mutex);
+ unlock(&pc->task.mutex);
pconnection_cleanup(pc);
return NULL;
}
}
- if (workers_free && !pc->context.closing && !pc->io_doublecheck) {
+ if (workers_free && !pc->task.closing && !pc->io_doublecheck) {
// check one last time for new io before context switch
pc->io_doublecheck = true;
pc->read_blocked = false;
pc->write_blocked = false;
- pc->context.working = true;
+ pc->task.working = true;
goto retry;
}
int wanted = pconnection_rearm_check(pc); // holds rearm_mutex until pconnection_rearm() below
- unlock(&pc->context.mutex);
+ unlock(&pc->task.mutex);
if (wanted) pconnection_rearm(pc, wanted); // May free pc on another thread. Return right away.
return NULL;
}
@@ -1297,7 +1299,7 @@
(void)setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay));
}
-/* Called with context.lock held */
+/* Called with task.lock held */
void pconnection_connected_lh(pconnection_t *pc) {
if (!pc->connected) {
pc->connected = true;
@@ -1313,7 +1315,7 @@
/* multi-address connections may call pconnection_start multiple times with diffferent FDs */
static void pconnection_start(pconnection_t *pc, int fd) {
- int efd = pc->context.proactor->epollfd;
+ int efd = pc->task.proactor->epollfd;
/* Get the local socket name now, get the peer name in pconnection_connected */
socklen_t len = sizeof(pc->local.ss);
(void)getsockname(fd, (struct sockaddr*)&pc->local.ss, &len);
@@ -1322,7 +1324,7 @@
if (ee->polling) { /* This is not the first attempt, stop polling and close the old FD */
int fd = ee->fd; /* Save fd, it will be set to -1 by stop_polling */
stop_polling(ee, efd);
- pclosefd(pc->context.proactor, fd);
+ pclosefd(pc->task.proactor, fd);
}
ee->fd = fd;
pc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT;
@@ -1369,14 +1371,14 @@
}
static inline bool is_inactive(pn_proactor_t *p) {
- return (!p->contexts && !p->disconnects_pending && !p->timeout_set && !p->shutting_down);
+ return (!p->tasks && !p->disconnects_pending && !p->timeout_set && !p->shutting_down);
}
-/* If inactive set need_inactive and return true if the proactor needs a wakeup */
-bool wake_if_inactive(pn_proactor_t *p) {
+/* If inactive set need_inactive and return true if poller needs to be unblocked */
+bool schedule_if_inactive(pn_proactor_t *p) {
if (is_inactive(p)) {
p->need_inactive = true;
- return wake(&p->context);
+ return schedule(&p->task);
}
return false;
}
@@ -1392,34 +1394,34 @@
}
// TODO: check case of proactor shutting down
- lock(&pc->context.mutex);
- proactor_add(&pc->context);
+ lock(&pc->task.mutex);
+ proactor_add(&pc->task);
pn_connection_open(pc->driver.connection); /* Auto-open */
bool notify = false;
bool notify_proactor = false;
if (pc->disconnected) {
- notify = wake(&pc->context); /* Error during initialization */
+ notify = schedule(&pc->task); /* Error during initialization */
} else {
int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
if (!gai_error) {
pn_connection_open(pc->driver.connection); /* Auto-open */
pc->ai = pc->addrinfo;
pconnection_maybe_connect_lh(pc); /* Start connection attempts */
- if (pc->disconnected) notify = wake(&pc->context);
+ if (pc->disconnected) notify = schedule(&pc->task);
} else {
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
- notify = wake(&pc->context);
- lock(&p->context.mutex);
- notify_proactor = wake_if_inactive(p);
- unlock(&p->context.mutex);
+ notify = schedule(&pc->task);
+ lock(&p->task.mutex);
+ notify_proactor = schedule_if_inactive(p);
+ unlock(&p->task.mutex);
}
}
/* We need to issue INACTIVE on immediate failure */
- unlock(&pc->context.mutex);
- if (notify) wake_notify(&pc->context);
- if (notify_proactor) wake_notify(&p->context);
+ unlock(&pc->task.mutex);
+ if (notify) notify_poller(&pc->task);
+ if (notify_proactor) notify_poller(&p->task);
}
static void pconnection_tick(pconnection_t *pc) {
@@ -1428,9 +1430,9 @@
uint64_t now = pn_proactor_now_64();
uint64_t next = pn_transport_tick(t, now);
if (next) {
- lock(&pc->context.mutex);
+ lock(&pc->task.mutex);
pc->expected_timeout = next;
- unlock(&pc->context.mutex);
+ unlock(&pc->task.mutex);
pni_timer_set(pc->timer, next);
}
}
@@ -1440,14 +1442,14 @@
bool notify = false;
pconnection_t *pc = get_pconnection(c);
if (pc) {
- lock(&pc->context.mutex);
- if (!pc->context.closing) {
+ lock(&pc->task.mutex);
+ if (!pc->task.closing) {
pc->wake_count++;
- notify = wake(&pc->context);
+ notify = schedule(&pc->task);
}
- unlock(&pc->context.mutex);
+ unlock(&pc->task.mutex);
}
- if (notify) wake_notify(&pc->context);
+ if (notify) notify_poller(&pc->task);
}
void pn_proactor_release_connection(pn_connection_t *c) {
@@ -1455,13 +1457,13 @@
pconnection_t *pc = get_pconnection(c);
if (pc) {
set_pconnection(c, NULL);
- lock(&pc->context.mutex);
+ lock(&pc->task.mutex);
pn_connection_driver_release_connection(&pc->driver);
pconnection_begin_close(pc);
- notify = wake(&pc->context);
- unlock(&pc->context.mutex);
+ notify = schedule(&pc->task);
+ unlock(&pc->task.mutex);
}
- if (notify) wake_notify(&pc->context);
+ if (notify) notify_poller(&pc->task);
}
// ========================================================================
@@ -1484,7 +1486,7 @@
return NULL;
}
pn_proactor_t *unknown = NULL; // won't know until pn_proactor_listen
- pcontext_init(&l->context, LISTENER, unknown);
+ task_init(&l->task, LISTENER, unknown);
}
return l;
}
@@ -1492,8 +1494,8 @@
void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog)
{
// TODO: check listener not already listening for this or another proactor
- lock(&l->context.mutex);
- l->context.proactor = p;
+ lock(&l->task.mutex);
+ l->task.proactor = p;
l->pending_accepteds = (accepted_t*)calloc(backlog, sizeof(accepted_t));
assert(l->pending_accepteds);
@@ -1544,7 +1546,7 @@
ps->epoll_io.fd = fd;
ps->epoll_io.wanted = EPOLLIN;
ps->epoll_io.polling = false;
- start_polling(&ps->epoll_io, l->context.proactor->epollfd); // TODO: check for error
+ start_polling(&ps->epoll_io, l->task.proactor->epollfd); // TODO: check for error
l->active_count++;
acceptor->armed = true;
} else {
@@ -1556,7 +1558,7 @@
if (addrinfo) {
freeaddrinfo(addrinfo);
}
- bool notify = wake(&l->context);
+ bool notify = schedule(&l->task);
if (l->acceptors_size == 0) { /* All failed, create dummy socket with an error */
l->acceptors = (acceptor_t*)realloc(l->acceptors, sizeof(acceptor_t));
@@ -1572,19 +1574,19 @@
} else {
pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_OPEN);
}
- proactor_add(&l->context);
- unlock(&l->context.mutex);
- if (notify) wake_notify(&l->context);
+ proactor_add(&l->task);
+ unlock(&l->task.mutex);
+ if (notify) notify_poller(&l->task);
return;
}
-// call with lock held and context.working false
+// call with lock held and task.working false
static inline bool listener_can_free(pn_listener_t *l) {
- return l->context.closing && l->close_dispatched && !l->context.wake_pending && !l->active_count;
+ return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count;
}
static inline void listener_final_free(pn_listener_t *l) {
- pcontext_finalize(&l->context);
+ task_finalize(&l->task);
free(l->acceptors);
free(l->pending_accepteds);
free(l);
@@ -1599,11 +1601,11 @@
if (l->collector) pn_collector_free(l->collector);
if (l->condition) pn_condition_free(l->condition);
if (l->attachments) pn_free(l->attachments);
- lock(&l->context.mutex);
- if (l->context.proactor) {
- can_free = proactor_remove(&l->context);
+ lock(&l->task.mutex);
+ if (l->task.proactor) {
+ can_free = proactor_remove(&l->task);
}
- unlock(&l->context.mutex);
+ unlock(&l->task.mutex);
if (can_free)
listener_final_free(l);
}
@@ -1611,8 +1613,8 @@
/* Always call with lock held so it can be unlocked around overflow processing. */
static void listener_begin_close(pn_listener_t* l) {
- if (!l->context.closing) {
- l->context.closing = true;
+ if (!l->task.closing) {
+ l->task.closing = true;
/* Close all listening sockets */
for (size_t i = 0; i < l->acceptors_size; ++i) {
@@ -1623,7 +1625,7 @@
shutdown(ps->epoll_io.fd, SHUT_RD); // Force epoll event and callback
} else {
int fd = ps->epoll_io.fd;
- stop_polling(&ps->epoll_io, l->context.proactor->epollfd);
+ stop_polling(&ps->epoll_io, l->task.proactor->epollfd);
close(fd);
l->active_count--;
}
@@ -1638,39 +1640,39 @@
}
assert(!l->pending_count);
- unlock(&l->context.mutex);
+ unlock(&l->task.mutex);
/* Remove all acceptors from the overflow list. closing flag prevents re-insertion.*/
proactor_rearm_overflow(pn_listener_proactor(l));
- lock(&l->context.mutex);
+ lock(&l->task.mutex);
pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_CLOSE);
}
}
void pn_listener_close(pn_listener_t* l) {
bool notify = false;
- lock(&l->context.mutex);
- if (!l->context.closing) {
+ lock(&l->task.mutex);
+ if (!l->task.closing) {
listener_begin_close(l);
- notify = wake(&l->context);
+ notify = schedule(&l->task);
}
- unlock(&l->context.mutex);
- if (notify) wake_notify(&l->context);
+ unlock(&l->task.mutex);
+ if (notify) notify_poller(&l->task);
}
static void listener_forced_shutdown(pn_listener_t *l) {
// Called by proactor_free, no competing threads, no epoll activity.
- lock(&l->context.mutex); // needed because of interaction with proactor_rearm_overflow
+ lock(&l->task.mutex); // needed because of interaction with proactor_rearm_overflow
listener_begin_close(l);
- unlock(&l->context.mutex);
+ unlock(&l->task.mutex);
// pconnection_process will never be called again. Zero everything.
- l->context.wake_pending = 0;
+ l->task.ready = 0;
l->close_dispatched = true;
l->active_count = 0;
assert(listener_can_free(l));
pn_listener_free(l);
}
-/* Accept a connection as part of listener_process(). Called with listener context lock held. */
+/* Accept a connection as part of listener_process(). Called with listener task lock held. */
/* Keep on accepting until we fill the backlog, would block or get an error */
static void listener_accept_lh(psocket_t *ps) {
pn_listener_t *l = psocket_listener(ps);
@@ -1693,22 +1695,22 @@
}
/* Process a listening socket */
-static pn_event_batch_t *listener_process(pn_listener_t *l, int n_events, bool wake) {
+static pn_event_batch_t *listener_process(pn_listener_t *l, int n_events, bool tsk_ready) {
// TODO: some parallelization of the accept mechanism.
// pn_listener_t *l = psocket_listener(ps);
// acceptor_t *a = psocket_acceptor(ps);
- lock(&l->context.mutex);
+ lock(&l->task.mutex);
if (n_events) {
for (size_t i = 0; i < l->acceptors_size; i++) {
psocket_t *ps = &l->acceptors[i].psocket;
if (ps->working_io_events) {
uint32_t events = ps->working_io_events;
ps->working_io_events = 0;
- if (l->context.closing) {
+ if (l->task.closing) {
l->acceptors[i].armed = false;
int fd = ps->epoll_io.fd;
- stop_polling(&ps->epoll_io, l->context.proactor->epollfd);
+ stop_polling(&ps->epoll_io, l->task.proactor->epollfd);
close(fd);
l->active_count--;
} else {
@@ -1716,37 +1718,37 @@
if (events & EPOLLRDHUP) {
/* Calls listener_begin_close which closes all the listener's sockets */
psocket_error(ps, errno, "listener epoll");
- } else if (!l->context.closing && events & EPOLLIN) {
+ } else if (!l->task.closing && events & EPOLLIN) {
listener_accept_lh(ps);
}
}
}
}
}
- if (wake) {
- wake_done(&l->context); // callback accounting
+ if (tsk_ready) {
+ schedule_done(&l->task); // callback accounting
}
pn_event_batch_t *lb = NULL;
- if (!l->context.working) {
- l->context.working = true;
+ if (!l->task.working) {
+ l->task.working = true;
if (listener_has_event(l))
lb = &l->batch;
else {
- l->context.working = false;
+ l->task.working = false;
if (listener_can_free(l)) {
- unlock(&l->context.mutex);
+ unlock(&l->task.mutex);
pn_listener_free(l);
return NULL;
}
}
}
- unlock(&l->context.mutex);
+ unlock(&l->task.mutex);
return lb;
}
static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
pn_listener_t *l = batch_listener(batch);
- lock(&l->context.mutex);
+ lock(&l->task.mutex);
pn_event_t *e = pn_collector_next(l->collector);
if (!e && l->pending_count) {
// empty collector means pn_collector_put() will not coalesce
@@ -1755,21 +1757,21 @@
}
if (e && pn_event_type(e) == PN_LISTENER_CLOSE)
l->close_dispatched = true;
- unlock(&l->context.mutex);
+ unlock(&l->task.mutex);
return pni_log_event(l, e);
}
static void listener_done(pn_listener_t *l) {
- pn_proactor_t *p = l->context.proactor;
- tslot_t *ts = l->context.runner;
- lock(&l->context.mutex);
+ pn_proactor_t *p = l->task.proactor;
+ tslot_t *ts = l->task.runner;
+ lock(&l->task.mutex);
// Just in case the app didn't accept all the pending accepts
// Shuffle the list back to start at 0
memmove(&l->pending_accepteds[0], &l->pending_accepteds[l->pending_first], l->pending_count * sizeof(accepted_t));
l->pending_first = 0;
- if (!l->context.closing) {
+ if (!l->task.closing) {
for (size_t i = 0; i < l->acceptors_size; i++) {
acceptor_t *a = &l->acceptors[i];
psocket_t *ps = &a->psocket;
@@ -1777,7 +1779,7 @@
// Rearm acceptor when appropriate
if (ps->epoll_io.polling && l->pending_count==0 && !a->overflowed) {
if (!a->armed) {
- rearm(l->context.proactor, &ps->epoll_io);
+ rearm(l->task.proactor, &ps->epoll_io);
a->armed = true;
}
}
@@ -1785,7 +1787,7 @@
}
bool notify = false;
- l->context.working = false;
+ l->task.working = false;
lock(&p->sched_mutex);
int n_events = 0;
@@ -1799,34 +1801,34 @@
n_events++;
}
- if (l->context.sched_wake) {
- l->context.sched_wake = false;
- wake_done(&l->context);
+ if (l->task.sched_ready) {
+ l->task.sched_ready = false;
+ schedule_done(&l->task);
}
unlock(&p->sched_mutex);
if (!n_events && listener_can_free(l)) {
- unlock(&l->context.mutex);
+ unlock(&l->task.mutex);
pn_listener_free(l);
lock(&p->sched_mutex);
notify = unassign_thread(ts, UNUSED);
unlock(&p->sched_mutex);
if (notify)
- wake_notify(&p->context);
+ notify_poller(&p->task);
return;
} else if (n_events || listener_has_event(l))
- notify = wake(&l->context);
- unlock(&l->context.mutex);
+ notify = schedule(&l->task);
+ unlock(&l->task.mutex);
lock(&p->sched_mutex);
if (unassign_thread(ts, UNUSED))
notify = true;
unlock(&p->sched_mutex);
- if (notify) wake_notify(&l->context);
+ if (notify) notify_poller(&l->task);
}
pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
- return l ? l->context.proactor : NULL;
+ return l ? l->task.proactor : NULL;
}
pn_condition_t* pn_listener_condition(pn_listener_t* l) {
@@ -1858,8 +1860,8 @@
int err2 = 0;
int fd = -1;
bool notify = false;
- lock(&l->context.mutex);
- if (l->context.closing)
+ lock(&l->task.mutex);
+ if (l->task.closing)
err2 = EBADF;
else {
accepted_t *a = listener_accepted_next(l);
@@ -1870,8 +1872,8 @@
else err2 = EWOULDBLOCK;
}
- proactor_add(&pc->context);
- lock(&pc->context.mutex);
+ proactor_add(&pc->task);
+ lock(&pc->task.mutex);
if (fd >= 0) {
configure_socket(fd);
pconnection_start(pc, fd);
@@ -1879,11 +1881,11 @@
}
else
psocket_error(&pc->psocket, err2, "pn_listener_accept");
- if (!l->context.working && listener_has_event(l))
- notify = wake(&l->context);
- unlock(&pc->context.mutex);
- unlock(&l->context.mutex);
- if (notify) wake_notify(&l->context);
+ if (!l->task.working && listener_has_event(l))
+ notify = schedule(&l->task);
+ unlock(&pc->task.mutex);
+ unlock(&l->task.mutex);
+ if (notify) notify_poller(&l->task);
}
@@ -1899,7 +1901,7 @@
p->thread_capacity += 8;
} while (p->thread_count > p->thread_capacity);
- p->warm_runnables = (pcontext_t **) realloc(p->warm_runnables, p->thread_capacity * sizeof(pcontext_t *));
+ p->warm_runnables = (task_t **) realloc(p->warm_runnables, p->thread_capacity * sizeof(task_t *));
p->resume_list = (tslot_t **) realloc(p->resume_list, p->thread_capacity * sizeof(tslot_t *));
int old_cap = p->runnables_capacity;
@@ -1908,7 +1910,7 @@
else if (p->runnables_capacity < p->thread_capacity)
p->runnables_capacity = p->thread_capacity;
if (p->runnables_capacity != old_cap) {
- p->runnables = (pcontext_t **) realloc(p->runnables, p->runnables_capacity * sizeof(pcontext_t *));
+ p->runnables = (task_t **) realloc(p->runnables, p->runnables_capacity * sizeof(task_t *));
p->kevents_capacity = p->runnables_capacity;
size_t sz = p->kevents_capacity * sizeof(struct epoll_event);
p->kevents = (struct epoll_event *) realloc(p->kevents, sz);
@@ -1916,10 +1918,10 @@
}
}
-/* Set up an epoll_extended_t to be used for wakeup or interrupts */
- static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd, bool always_set) {
+/* Set up an epoll_extended_t to be used for ready list schedule() or interrupts */
+ static void epoll_eventfd_init(epoll_extended_t *ee, int eventfd, int epollfd, bool always_set) {
ee->fd = eventfd;
- ee->type = WAKE;
+ ee->type = EVENT_FD;
if (always_set) {
uint64_t increment = 1;
if (write(eventfd, &increment, sizeof(uint64_t)) != sizeof(uint64_t))
@@ -1942,7 +1944,7 @@
pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
if (!p) return NULL;
p->epollfd = p->eventfd = -1;
- pcontext_init(&p->context, PROACTOR, p);
+ task_init(&p->task, PROACTOR, p);
pmutex_init(&p->eventfd_mutex);
pmutex_init(&p->sched_mutex);
pmutex_init(&p->tslot_mutex);
@@ -1954,8 +1956,8 @@
if ((p->collector = pn_collector()) != NULL) {
p->batch.next_event = &proactor_batch_next;
start_polling(&p->timer_manager.epoll_timer, p->epollfd); // TODO: check for error
- epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd, true);
- epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, false);
+ epoll_eventfd_init(&p->epoll_schedule, p->eventfd, p->epollfd, true);
+ epoll_eventfd_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, false);
p->tslot_map = pn_hash(PN_VOID, 0, 0.75);
grow_poller_bufs(p);
return p;
@@ -1985,15 +1987,15 @@
p->eventfd = -1;
close(p->interruptfd);
p->interruptfd = -1;
- while (p->contexts) {
- pcontext_t *ctx = p->contexts;
- p->contexts = ctx->next;
- switch (ctx->type) {
+ while (p->tasks) {
+ task_t *tsk = p->tasks;
+ p->tasks = tsk->next;
+ switch (tsk->type) {
case PCONNECTION:
- pconnection_forced_shutdown(pcontext_pconnection(ctx));
+ pconnection_forced_shutdown(task_pconnection(tsk));
break;
case LISTENER:
- listener_forced_shutdown(pcontext_listener(ctx));
+ listener_forced_shutdown(task_listener(tsk));
break;
default:
break;
@@ -2005,7 +2007,7 @@
pmutex_finalize(&p->tslot_mutex);
pmutex_finalize(&p->sched_mutex);
pmutex_finalize(&p->eventfd_mutex);
- pcontext_finalize(&p->context);
+ task_finalize(&p->task);
for (pn_handle_t entry = pn_hash_head(p->tslot_map); entry; entry = pn_hash_next(p->tslot_map, entry)) {
tslot_t *ts = (tslot_t *) pn_hash_value(p->tslot_map, entry);
pmutex_finalize(&ts->mutex);
@@ -2022,7 +2024,7 @@
pn_proactor_t *pn_event_proactor(pn_event_t *e) {
if (pn_event_class(e) == PN_CLASSCLASS(pn_proactor)) return (pn_proactor_t*)pn_event_context(e);
pn_listener_t *l = pn_event_listener(e);
- if (l) return l->context.proactor;
+ if (l) return l->task.proactor;
pn_connection_t *c = pn_event_connection(e);
if (c) return pn_connection_proactor(c);
return NULL;
@@ -2060,7 +2062,7 @@
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
pn_proactor_t *p = batch_proactor(batch);
- lock(&p->context.mutex);
+ lock(&p->task.mutex);
proactor_update_batch(p);
pn_event_t *e = pn_collector_next(p->collector);
if (e) {
@@ -2068,91 +2070,91 @@
if (p->current_event_type == PN_PROACTOR_TIMEOUT)
p->timeout_processed = true;
}
- unlock(&p->context.mutex);
+ unlock(&p->task.mutex);
return pni_log_event(p, e);
}
-static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool interrupt, bool wake) {
+static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool interrupt, bool tsk_ready) {
if (interrupt) {
(void)read_uint64(p->interruptfd);
rearm(p, &p->epoll_interrupt);
}
- lock(&p->context.mutex);
+ lock(&p->task.mutex);
if (interrupt) {
p->need_interrupt = true;
}
- if (wake) {
- wake_done(&p->context);
+ if (tsk_ready) {
+ schedule_done(&p->task);
}
- if (!p->context.working) { /* Can generate proactor events */
+ if (!p->task.working) { /* Can generate proactor events */
if (proactor_update_batch(p)) {
- p->context.working = true;
- unlock(&p->context.mutex);
+ p->task.working = true;
+ unlock(&p->task.mutex);
return &p->batch;
}
}
- unlock(&p->context.mutex);
+ unlock(&p->task.mutex);
return NULL;
}
-void proactor_add(pcontext_t *ctx) {
- pn_proactor_t *p = ctx->proactor;
- lock(&p->context.mutex);
- if (p->contexts) {
- p->contexts->prev = ctx;
- ctx->next = p->contexts;
+void proactor_add(task_t *tsk) {
+ pn_proactor_t *p = tsk->proactor;
+ lock(&p->task.mutex);
+ if (p->tasks) {
+ p->tasks->prev = tsk;
+ tsk->next = p->tasks;
}
- p->contexts = ctx;
- p->context_count++;
- unlock(&p->context.mutex);
+ p->tasks = tsk;
+ p->task_count++;
+ unlock(&p->task.mutex);
}
// call with psocket's mutex held
// return true if safe for caller to free psocket
-bool proactor_remove(pcontext_t *ctx) {
- pn_proactor_t *p = ctx->proactor;
- // Disassociate this context from scheduler
+bool proactor_remove(task_t *tsk) {
+ pn_proactor_t *p = tsk->proactor;
+ // Disassociate this task from scheduler
if (!p->shutting_down) {
lock(&p->sched_mutex);
- ctx->runner->state = DELETING;
+ tsk->runner->state = DELETING;
for (pn_handle_t entry = pn_hash_head(p->tslot_map); entry; entry = pn_hash_next(p->tslot_map, entry)) {
tslot_t *ts = (tslot_t *) pn_hash_value(p->tslot_map, entry);
- if (ts->context == ctx)
- ts->context = NULL;
- if (ts->prev_context == ctx)
- ts->prev_context = NULL;
+ if (ts->task == tsk)
+ ts->task = NULL;
+ if (ts->prev_task == tsk)
+ ts->prev_task = NULL;
}
unlock(&p->sched_mutex);
}
- lock(&p->context.mutex);
+ lock(&p->task.mutex);
bool can_free = true;
- if (ctx->disconnecting) {
- // No longer on contexts list
+ if (tsk->disconnecting) {
+ // No longer on tasks list
--p->disconnects_pending;
- if (--ctx->disconnect_ops != 0) {
+ if (--tsk->disconnect_ops != 0) {
// procator_disconnect() does the free
can_free = false;
}
}
else {
// normal case
- if (ctx->prev)
- ctx->prev->next = ctx->next;
+ if (tsk->prev)
+ tsk->prev->next = tsk->next;
else {
- p->contexts = ctx->next;
- ctx->next = NULL;
- if (p->contexts)
- p->contexts->prev = NULL;
+ p->tasks = tsk->next;
+ tsk->next = NULL;
+ if (p->tasks)
+ p->tasks->prev = NULL;
}
- if (ctx->next) {
- ctx->next->prev = ctx->prev;
+ if (tsk->next) {
+ tsk->next->prev = tsk->prev;
}
- p->context_count--;
+ p->task_count--;
}
- bool notify = wake_if_inactive(p);
- unlock(&p->context.mutex);
- if (notify) wake_notify(&p->context);
+ bool notify = schedule_if_inactive(p);
+ unlock(&p->task.mutex);
+ if (notify) notify_poller(&p->task);
return can_free;
}
@@ -2189,34 +2191,34 @@
}
// Called with sched lock, returns with sched lock still held.
-static pn_event_batch_t *process(pcontext_t *ctx) {
- bool ctx_wake = false;
- ctx->sched_pending = false;
- if (ctx->sched_wake) {
- // update the wake status before releasing the sched_mutex
- ctx->sched_wake = false;
- ctx_wake = true;
+static pn_event_batch_t *process(task_t *tsk) {
+ bool tsk_ready = false;
+ tsk->sched_pending = false;
+ if (tsk->sched_ready) {
+ // update the ready status before releasing the sched_mutex
+ tsk->sched_ready = false;
+ tsk_ready = true;
}
- pn_proactor_t *p = ctx->proactor;
+ pn_proactor_t *p = tsk->proactor;
pn_event_batch_t* batch = NULL;
- switch (ctx->type) {
+ switch (tsk->type) {
case PROACTOR: {
bool intr = p->sched_interrupt;
if (intr) p->sched_interrupt = false;
unlock(&p->sched_mutex);
- batch = proactor_process(p, intr, ctx_wake);
+ batch = proactor_process(p, intr, tsk_ready);
break;
}
case PCONNECTION: {
- pconnection_t *pc = pcontext_pconnection(ctx);
+ pconnection_t *pc = task_pconnection(tsk);
uint32_t events = pc->psocket.sched_io_events;
if (events) pc->psocket.sched_io_events = 0;
unlock(&p->sched_mutex);
- batch = pconnection_process(pc, events, ctx_wake, false);
+ batch = pconnection_process(pc, events, tsk_ready, false);
break;
}
case LISTENER: {
- pn_listener_t *l = pcontext_listener(ctx);
+ pn_listener_t *l = task_listener(tsk);
int n_events = 0;
for (size_t i = 0; i < l->acceptors_size; i++) {
psocket_t *ps = &l->acceptors[i].psocket;
@@ -2228,12 +2230,12 @@
n_events++;
}
unlock(&p->sched_mutex);
- batch = listener_process(l, n_events, ctx_wake);
+ batch = listener_process(l, n_events, tsk_ready);
break;
}
case RAW_CONNECTION: {
unlock(&p->sched_mutex);
- batch = pni_raw_connection_process(ctx, ctx_wake);
+ batch = pni_raw_connection_process(tsk, tsk_ready);
break;
}
case TIMER_MANAGER: {
@@ -2241,7 +2243,7 @@
bool timeout = tm->sched_timeout;
if (timeout) tm->sched_timeout = false;
unlock(&p->sched_mutex);
- batch = pni_timer_manager_process(tm, timeout, ctx_wake);
+ batch = pni_timer_manager_process(tm, timeout, tsk_ready);
break;
}
default:
@@ -2252,37 +2254,37 @@
}
-// Call with both sched and wake locks
-static void schedule_wake_list(pn_proactor_t *p) {
- // append wake_list_first..wake_list_last to end of sched_wake_last
- if (p->wake_list_first) {
- if (p->sched_wake_last)
- p->sched_wake_last->wake_next = p->wake_list_first; // join them
- if (!p->sched_wake_first)
- p->sched_wake_first = p->wake_list_first;
- p->sched_wake_last = p->wake_list_last;
- if (!p->sched_wake_current)
- p->sched_wake_current = p->sched_wake_first;
- p->wake_list_first = p->wake_list_last = NULL;
+// Call with both sched_mutex and eventfd_mutex held
+static void schedule_ready_list(pn_proactor_t *p) {
+ // append ready_list_first..ready_list_last to end of sched_ready_last
+ if (p->ready_list_first) {
+ if (p->sched_ready_last)
+ p->sched_ready_last->ready_next = p->ready_list_first; // join them
+ if (!p->sched_ready_first)
+ p->sched_ready_first = p->ready_list_first;
+ p->sched_ready_last = p->ready_list_last;
+ if (!p->sched_ready_current)
+ p->sched_ready_current = p->sched_ready_first;
+ p->ready_list_first = p->ready_list_last = NULL;
}
}
// Call with schedule lock held. Called only by poller thread.
-static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
+static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
epoll_extended_t *ee = (epoll_extended_t *) evp->data.ptr;
- pcontext_t *ctx = NULL;
+ task_t *tsk = NULL;
switch (ee->type) {
- case WAKE:
+ case EVENT_FD:
if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */
p->sched_interrupt = true;
- ctx = &p->context;
- ctx->sched_pending = true;
+ tsk = &p->task;
+ tsk->sched_pending = true;
} else {
- // main eventfd wake
+ // main ready tasks eventfd
lock(&p->eventfd_mutex);
- schedule_wake_list(p);
- ctx = p->sched_wake_current;
+ schedule_ready_list(p);
+ tsk = p->sched_ready_current;
unlock(&p->eventfd_mutex);
}
break;
@@ -2290,51 +2292,51 @@
psocket_t *ps = containerof(ee, psocket_t, epoll_io);
pconnection_t *pc = psocket_pconnection(ps);
assert(pc);
- ctx = &pc->context;
+ tsk = &pc->task;
ps->sched_io_events = evp->events;
- ctx->sched_pending = true;
+ tsk->sched_pending = true;
break;
}
case LISTENER_IO: {
psocket_t *ps = containerof(ee, psocket_t, epoll_io);
pn_listener_t *l = psocket_listener(ps);
assert(l);
- ctx = &l->context;
+ tsk = &l->task;
ps->sched_io_events = evp->events;
- ctx->sched_pending = true;
+ tsk->sched_pending = true;
break;
}
case RAW_CONNECTION_IO: {
psocket_t *ps = containerof(ee, psocket_t, epoll_io);
- ctx = pni_psocket_raw_context(ps);
+ tsk = pni_psocket_raw_task(ps);
ps->sched_io_events = evp->events;
- ctx->sched_pending = true;
+ tsk->sched_pending = true;
break;
}
case TIMER: {
pni_timer_manager_t *tm = &p->timer_manager;
- ctx = &tm->context;
+ tsk = &tm->task;
tm->sched_timeout = true;
- ctx->sched_pending = true;
+ tsk->sched_pending = true;
break;
}
}
- if (ctx && !ctx->runnable && !ctx->runner)
- return ctx;
+ if (tsk && !tsk->runnable && !tsk->runner)
+ return tsk;
return NULL;
}
-static pcontext_t *post_wake(pn_proactor_t *p, pcontext_t *ctx) {
- ctx->sched_wake = true;
- ctx->sched_pending = true;
- if (!ctx->runnable && !ctx->runner)
- return ctx;
+static task_t *post_ready(pn_proactor_t *p, task_t *tsk) {
+ tsk->sched_ready = true;
+ tsk->sched_pending = true;
+ if (!tsk->runnable && !tsk->runner)
+ return tsk;
return NULL;
}
// call with sched_lock held
-static pcontext_t *next_drain(pn_proactor_t *p, tslot_t *ts) {
+static task_t *next_drain(pn_proactor_t *p, tslot_t *ts) {
// This should be called seldomly, best case once per thread removal on shutdown.
// TODO: how to reduce? Instrumented near 5 percent of earmarks, 1 in 2000 calls to do_epoll().
@@ -2342,12 +2344,12 @@
tslot_t *ts2 = (tslot_t *) pn_hash_value(p->tslot_map, entry);
if (ts2->earmarked) {
// undo the old assign thread and earmark. ts2 may never come back
- pcontext_t *switch_ctx = ts2->context;
+ task_t *switch_tsk = ts2->task;
remove_earmark(ts2);
- assign_thread(ts, switch_ctx);
+ assign_thread(ts, switch_tsk);
ts->earmark_override = ts2;
ts->earmark_override_gen = ts2->generation;
- return switch_ctx;
+ return switch_tsk;
}
}
assert(false);
@@ -2355,51 +2357,51 @@
}
// call with sched_lock held
-static pcontext_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
- if (ts->context) {
+static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
+ if (ts->task) {
// Already assigned
if (ts->earmarked) {
ts->earmarked = false;
if (--p->earmark_count == 0)
p->earmark_drain = false;
}
- return ts->context;
+ return ts->task;
}
// warm pairing ?
- pcontext_t *ctx = ts->prev_context;
- if (ctx && (ctx->runnable)) { // or ctx->sched_wake too?
- assign_thread(ts, ctx);
- return ctx;
+ task_t *tsk = ts->prev_task;
+ if (tsk && (tsk->runnable)) { // or tsk->sched_ready too?
+ assign_thread(ts, tsk);
+ return tsk;
}
- // check for an unassigned runnable context or unprocessed wake
+ // check for an unassigned runnable task or ready list task
if (p->n_runnables) {
// Any unclaimed runnable?
while (p->n_runnables) {
- ctx = p->runnables[p->next_runnable++];
+ tsk = p->runnables[p->next_runnable++];
if (p->n_runnables == p->next_runnable)
p->n_runnables = 0;
- if (ctx->runnable) {
- assign_thread(ts, ctx);
- return ctx;
+ if (tsk->runnable) {
+ assign_thread(ts, tsk);
+ return tsk;
}
}
}
- if (p->sched_wake_current) {
- ctx = p->sched_wake_current;
- pop_wake(ctx); // updates sched_wake_current
- assert(!ctx->runnable && !ctx->runner);
- assign_thread(ts, ctx);
- return ctx;
+ if (p->sched_ready_current) {
+ tsk = p->sched_ready_current;
+ pop_ready_task(tsk); // updates sched_ready_current
+ assert(!tsk->runnable && !tsk->runner);
+ assign_thread(ts, tsk);
+ return tsk;
}
if (p->earmark_drain) {
- ctx = next_drain(p, ts);
+ tsk = next_drain(p, ts);
if (p->earmark_count == 0)
p->earmark_drain = false;
- return ctx;
+ return tsk;
}
return NULL;
@@ -2412,17 +2414,17 @@
ts->generation++; // wrapping OK. Just looking for any change
lock(&p->sched_mutex);
- assert(ts->context == NULL || ts->earmarked);
+ assert(ts->task == NULL || ts->earmarked);
assert(ts->state == UNUSED || ts->state == NEW);
ts->state = PROCESSING;
// Process outstanding epoll events until we get a batch or need to block.
while (true) {
- // First see if there are any contexts waiting to run and perhaps generate new Proton events,
- pcontext_t *ctx = next_runnable(p, ts);
- if (ctx) {
+ // First see if there are any tasks waiting to run and perhaps generate new Proton events,
+ task_t *tsk = next_runnable(p, ts);
+ if (tsk) {
ts->state = BATCHING;
- pn_event_batch_t *batch = process(ctx);
+ pn_event_batch_t *batch = process(tsk);
if (batch) {
unlock(&p->sched_mutex);
return batch;
@@ -2430,17 +2432,17 @@
bool notify = unassign_thread(ts, PROCESSING);
if (notify) {
unlock(&p->sched_mutex);
- wake_notify(&p->context);
+ notify_poller(&p->task);
lock(&p->sched_mutex);
}
continue; // Long time may have passed. Back to beginning.
}
- // Poll or wait for a runnable context
+ // Poll or wait for a runnable task
if (p->poller == NULL) {
bool return_immediately;
p->poller = ts;
- // Get new epoll events (if any) and mark the relevant contexts as runnable
+ // Get new epoll events (if any) and mark the relevant tasks as runnable
return_immediately = poller_do_epoll(p, ts, can_block);
p->poller = NULL;
if (return_immediately) {
@@ -2467,7 +2469,7 @@
static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block) {
// As poller with lots to do, be mindful of hogging the sched lock. Release when making kernel calls.
int n_events;
- pcontext_t *ctx;
+ task_t *tsk;
while (true) {
assert(p->n_runnables == 0);
@@ -2478,16 +2480,16 @@
p->last_earmark = NULL;
bool unfinished_earmarks = p->earmark_count > 0;
- bool new_wakes = false;
+ bool new_ready_tasks = false;
bool epoll_immediate = unfinished_earmarks || !can_block;
- assert(!p->sched_wake_first);
+ assert(!p->sched_ready_first);
if (!epoll_immediate) {
lock(&p->eventfd_mutex);
- if (p->wake_list_first) {
+ if (p->ready_list_first) {
epoll_immediate = true;
- new_wakes = true;
+ new_ready_tasks = true;
} else {
- p->wakes_in_progress = false;
+ p->ready_list_active = false;
}
unlock(&p->eventfd_mutex);
}
@@ -2505,9 +2507,9 @@
p->earmark_drain = true;
unpolled_work = true;
}
- if (new_wakes) {
+ if (new_ready_tasks) {
lock(&p->eventfd_mutex);
- schedule_wake_list(p);
+ schedule_ready_list(p);
unlock(&p->eventfd_mutex);
unpolled_work = true;
}
@@ -2537,66 +2539,66 @@
for (int i = 0; i < n_events; i++) {
- ctx = post_event(p, &p->kevents[i]);
- if (ctx)
- make_runnable(ctx);
+ tsk = post_event(p, &p->kevents[i]);
+ if (tsk)
+ make_runnable(tsk);
}
if (n_events > 0)
memset(p->kevents, 0, sizeof(struct epoll_event) * n_events);
- // The list of pending wakes can be very long. Traverse part of it looking for warm pairings.
- pcontext_t *wctx = p->sched_wake_current;
+ // The list of ready tasks can be very long. Traverse part of it looking for warm pairings.
+ task_t *ctsk = p->sched_ready_current;
int max_runnables = p->runnables_capacity;
- while (wctx && p->n_runnables < max_runnables) {
- if (wctx->runner == REWAKE_PLACEHOLDER)
- wctx->runner = NULL; // Allow context to run again.
- ctx = post_wake(p, wctx);
- if (ctx)
- make_runnable(ctx);
- pop_wake(wctx);
- wctx = wctx->wake_next;
+ while (ctsk && p->n_runnables < max_runnables) {
+ if (ctsk->runner == RESCHEDULE_PLACEHOLDER)
+ ctsk->runner = NULL; // Allow task to run again.
+ tsk = post_ready(p, ctsk);
+ if (tsk)
+ make_runnable(tsk);
+ pop_ready_task(ctsk);
+ ctsk = ctsk->ready_next;
}
- p->sched_wake_current = wctx;
- // More wakes than places on the runnables list
- while (wctx) {
- if (wctx->runner == REWAKE_PLACEHOLDER)
- wctx->runner = NULL; // Allow context to run again.
- wctx->sched_wake = true;
- wctx->sched_pending = true;
- if (wctx->runnable || wctx->runner)
- pop_wake(wctx);
- wctx = wctx->wake_next;
+ p->sched_ready_current = ctsk;
+ // More ready tasks than places on the runnables list
+ while (ctsk) {
+ if (ctsk->runner == RESCHEDULE_PLACEHOLDER)
+ ctsk->runner = NULL; // Allow task to run again.
+ ctsk->sched_ready = true;
+ ctsk->sched_pending = true;
+ if (ctsk->runnable || ctsk->runner)
+ pop_ready_task(ctsk);
+ ctsk = ctsk->ready_next;
}
- if (pni_immediate && !ts->context) {
+ if (pni_immediate && !ts->task) {
// Poller gets to run if possible
- pcontext_t *pctx;
+ task_t *ptsk;
if (p->n_runnables) {
assert(p->next_runnable == 0);
- pctx = p->runnables[0];
+ ptsk = p->runnables[0];
if (++p->next_runnable == p->n_runnables)
p->n_runnables = 0;
} else if (p->n_warm_runnables) {
- pctx = p->warm_runnables[--p->n_warm_runnables];
- tslot_t *ts2 = pctx->runner;
- ts2->prev_context = ts2->context = NULL;
- pctx->runner = NULL;
+ ptsk = p->warm_runnables[--p->n_warm_runnables];
+ tslot_t *ts2 = ptsk->runner;
+ ts2->prev_task = ts2->task = NULL;
+ ptsk->runner = NULL;
} else if (p->last_earmark) {
- pctx = p->last_earmark->context;
+ ptsk = p->last_earmark->task;
remove_earmark(p->last_earmark);
if (p->earmark_count == 0)
p->earmark_drain = false;
} else {
- pctx = NULL;
+ ptsk = NULL;
}
- if (pctx) {
- assign_thread(ts, pctx);
+ if (ptsk) {
+ assign_thread(ts, ptsk);
}
}
return false;
}
-// Call with sched lock, but only from poller context.
+// Call with sched lock, but only as poller.
static void poller_done(struct pn_proactor_t* p, tslot_t *ts) {
// Create a list of available threads to put to work.
// ts is the poller thread
@@ -2609,8 +2611,8 @@
if (max_resumes) {
resume_list2 = (tslot_t **) alloca(max_resumes * sizeof(tslot_t *));
for (int i = 0; i < p->n_warm_runnables ; i++) {
- pcontext_t *ctx = p->warm_runnables[i];
- tslot_t *tsp = ctx->runner;
+ task_t *tsk = p->warm_runnables[i];
+ tslot_t *tsp = tsk->runner;
if (tsp->state == SUSPENDED) {
resume_list2[resume_list_count++] = tsp;
LL_REMOVE(p, suspend_list, tsp);
@@ -2620,11 +2622,11 @@
}
int can_use = p->suspend_list_count;
- if (!ts->context)
+ if (!ts->task)
can_use++;
- // Run as many unpaired runnable contexts as possible and allow for a new poller.
+ // Run as many unpaired runnable tasks as possible and allow for a new poller.
int new_runners = pn_min(p->n_runnables + 1, can_use);
- if (!ts->context)
+ if (!ts->task)
new_runners--; // poller available and does not need resume
// Rare corner case on startup. New inbound threads can make the suspend_list too big for resume list.
@@ -2678,7 +2680,7 @@
void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
pconnection_t *pc = batch_pconnection(batch);
if (pc) {
- tslot_t *ts = pc->context.runner;
+ tslot_t *ts = pc->task.runner;
pconnection_done(pc);
// pc possibly freed/invalid
check_earmark_override(p, ts);
@@ -2686,7 +2688,7 @@
}
pn_listener_t *l = batch_listener(batch);
if (l) {
- tslot_t *ts = l->context.runner;
+ tslot_t *ts = l->task.runner;
listener_done(l);
// l possibly freed/invalid
check_earmark_override(p, ts);
@@ -2694,7 +2696,7 @@
}
praw_connection_t *rc = pni_batch_raw_connection(batch);
if (rc) {
- tslot_t *ts = pni_raw_connection_context(rc)->runner;
+ tslot_t *ts = pni_raw_connection_task(rc)->runner;
pni_raw_connection_done(rc);
// rc possibly freed/invalid
check_earmark_override(p, ts);
@@ -2703,27 +2705,27 @@
pn_proactor_t *bp = batch_proactor(batch);
if (bp == p) {
bool notify = false;
- lock(&p->context.mutex);
- p->context.working = false;
+ lock(&p->task.mutex);
+ p->task.working = false;
if (p->timeout_processed) {
p->timeout_processed = false;
- if (wake_if_inactive(p))
+ if (schedule_if_inactive(p))
notify = true;
}
proactor_update_batch(p);
if (proactor_has_event(p))
- if (wake(&p->context))
+ if (schedule(&p->task))
notify = true;
- unlock(&p->context.mutex);
+ unlock(&p->task.mutex);
lock(&p->sched_mutex);
- tslot_t *ts = p->context.runner;
+ tslot_t *ts = p->task.runner;
if (unassign_thread(ts, UNUSED))
notify = true;
unlock(&p->sched_mutex);
if (notify)
- wake_notify(&p->context);
+ notify_poller(&p->task);
check_earmark_override(p, ts);
return;
}
@@ -2739,84 +2741,84 @@
void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
bool notify = false;
- lock(&p->context.mutex);
+ lock(&p->task.mutex);
p->timeout_set = true;
if (t == 0) {
pni_timer_set(p->timer, 0);
p->need_timeout = true;
- notify = wake(&p->context);
+ notify = schedule(&p->task);
} else {
pni_timer_set(p->timer, t + pn_proactor_now_64());
}
- unlock(&p->context.mutex);
- if (notify) wake_notify(&p->context);
+ unlock(&p->task.mutex);
+ if (notify) notify_poller(&p->task);
}
void pn_proactor_cancel_timeout(pn_proactor_t *p) {
- lock(&p->context.mutex);
+ lock(&p->task.mutex);
p->timeout_set = false;
p->need_timeout = false;
pni_timer_set(p->timer, 0);
- bool notify = wake_if_inactive(p);
- unlock(&p->context.mutex);
- if (notify) wake_notify(&p->context);
+ bool notify = schedule_if_inactive(p);
+ unlock(&p->task.mutex);
+ if (notify) notify_poller(&p->task);
}
void pni_proactor_timeout(pn_proactor_t *p) {
bool notify = false;
- lock(&p->context.mutex);
- if (!p->context.closing) {
+ lock(&p->task.mutex);
+ if (!p->task.closing) {
p->need_timeout = true;
- notify = wake(&p->context);
+ notify = schedule(&p->task);
}
- unlock(&p->context.mutex);
- if (notify) wake_notify(&p->context);
+ unlock(&p->task.mutex);
+ if (notify) notify_poller(&p->task);
}
pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
pconnection_t *pc = get_pconnection(c);
- return pc ? pc->context.proactor : NULL;
+ return pc ? pc->task.proactor : NULL;
}
void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
bool notify = false;
- lock(&p->context.mutex);
- // Move the whole contexts list into a disconnecting state
- pcontext_t *disconnecting_pcontexts = p->contexts;
- p->contexts = NULL;
- // First pass: mark each pcontext as disconnecting and update global pending count.
- pcontext_t *ctx = disconnecting_pcontexts;
- while (ctx) {
- ctx->disconnecting = true;
- ctx->disconnect_ops = 2; // Second pass below and proactor_remove(), in any order.
+ lock(&p->task.mutex);
+ // Move the whole tasks list into a disconnecting state
+ task_t *disconnecting_tasks = p->tasks;
+ p->tasks = NULL;
+ // First pass: mark each task as disconnecting and update global pending count.
+ task_t *tsk = disconnecting_tasks;
+ while (tsk) {
+ tsk->disconnecting = true;
+ tsk->disconnect_ops = 2; // Second pass below and proactor_remove(), in any order.
p->disconnects_pending++;
- ctx = ctx->next;
- p->context_count--;
+ tsk = tsk->next;
+ p->task_count--;
}
- notify = wake_if_inactive(p);
- unlock(&p->context.mutex);
- if (!disconnecting_pcontexts) {
- if (notify) wake_notify(&p->context);
+ notify = schedule_if_inactive(p);
+ unlock(&p->task.mutex);
+ if (!disconnecting_tasks) {
+ if (notify) notify_poller(&p->task);
return;
}
- // Second pass: different locking, close the pcontexts, free them if !disconnect_ops
- pcontext_t *next = disconnecting_pcontexts;
+ // Second pass: different locking, close the tasks, free them if !disconnect_ops
+ task_t *next = disconnecting_tasks;
while (next) {
- ctx = next;
- next = ctx->next; /* Save next pointer in case we free ctx */
+ tsk = next;
+ next = tsk->next; /* Save next pointer in case we free tsk */
bool do_free = false;
- bool ctx_notify = false;
- pmutex *ctx_mutex = NULL;
+ bool tsk_notify = false;
+ pmutex *tsk_mutex = NULL;
// TODO: Need to extend this for raw connections too
- pconnection_t *pc = pcontext_pconnection(ctx);
+ pconnection_t *pc = task_pconnection(tsk);
if (pc) {
- ctx_mutex = &pc->context.mutex;
- lock(ctx_mutex);
- if (!ctx->closing) {
- ctx_notify = true;
- if (ctx->working) {
+ tsk_mutex = &pc->task.mutex;
+ lock(tsk_mutex);
+ if (!tsk->closing) {
+ tsk_notify = true;
+ if (tsk->working) {
// Must defer
pc->queued_disconnect = true;
if (cond) {
@@ -2826,7 +2828,7 @@
}
}
else {
- // No conflicting working context.
+ // No conflicting working task.
if (cond) {
pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
}
@@ -2834,12 +2836,12 @@
}
}
} else {
- pn_listener_t *l = pcontext_listener(ctx);
+ pn_listener_t *l = task_listener(tsk);
assert(l);
- ctx_mutex = &l->context.mutex;
- lock(ctx_mutex);
- if (!ctx->closing) {
- ctx_notify = true;
+ tsk_mutex = &l->task.mutex;
+ lock(tsk_mutex);
+ if (!tsk->closing) {
+ tsk_notify = true;
if (cond) {
pn_condition_copy(pn_listener_condition(l), cond);
}
@@ -2847,29 +2849,29 @@
}
}
- lock(&p->context.mutex);
- if (--ctx->disconnect_ops == 0) {
+ lock(&p->task.mutex);
+ if (--tsk->disconnect_ops == 0) {
do_free = true;
- ctx_notify = false;
- notify = wake_if_inactive(p);
+ tsk_notify = false;
+ notify = schedule_if_inactive(p);
} else {
- // If initiating the close, wake the pcontext to do the free.
- if (ctx_notify)
- ctx_notify = wake(ctx);
- if (ctx_notify)
- wake_notify(ctx);
+ // If initiating the close, schedule the task to do the free.
+ if (tsk_notify)
+ tsk_notify = schedule(tsk);
+ if (tsk_notify)
+ notify_poller(tsk);
}
- unlock(&p->context.mutex);
- unlock(ctx_mutex);
+ unlock(&p->task.mutex);
+ unlock(tsk_mutex);
- // Unsafe to touch ctx after lock release, except if we are the designated final_free
+ // Unsafe to touch tsk after lock release, except if we are the designated final_free
if (do_free) {
if (pc) pconnection_final_free(pc);
- else listener_final_free(pcontext_listener(ctx));
+ else listener_final_free(task_listener(tsk));
}
}
if (notify)
- wake_notify(&p->context);
+ notify_poller(&p->task);
}
const pn_netaddr_t *pn_transport_local_addr(pn_transport_t *t) {
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index c10afe8..0195c57 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -40,7 +40,7 @@
/* epoll specific raw connection struct */
struct praw_connection_t {
- pcontext_t context;
+ task_t task;
struct pn_raw_connection_t raw_connection;
psocket_t psocket;
struct pn_netaddr_t local, remote; /* Actual addresses */
@@ -50,7 +50,7 @@
struct addrinfo *ai; /* Current connect address */
bool connected;
bool disconnected;
- bool waking; // TODO: This is actually protected by context.mutex so should be moved into context (pconnection too)
+ bool waking; // TODO: This is actually protected by task.mutex so should be moved into task (pconnection too)
};
static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
@@ -85,7 +85,7 @@
/* multi-address connections may call pconnection_start multiple times with diffferent FDs */
static void praw_connection_start(praw_connection_t *prc, int fd) {
- int efd = prc->context.proactor->epollfd;
+ int efd = prc->task.proactor->epollfd;
/* Get the local socket name now, get the peer name in pconnection_connected */
socklen_t len = sizeof(prc->local.ss);
@@ -95,7 +95,7 @@
if (ee->polling) { /* This is not the first attempt, stop polling and close the old FD */
int fd = ee->fd; /* Save fd, it will be set to -1 by stop_polling */
stop_polling(ee, efd);
- pclosefd(prc->context.proactor, fd);
+ pclosefd(prc->task.proactor, fd);
}
ee->fd = fd;
ee->wanted = EPOLLIN | EPOLLOUT;
@@ -139,7 +139,7 @@
static pn_event_t * pni_raw_batch_next(pn_event_batch_t *batch);
static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_raw_connection_t *rc) {
- pcontext_init(&prc->context, RAW_CONNECTION, p);
+ task_init(&prc->task, RAW_CONNECTION, p);
psocket_init(&prc->psocket, RAW_CONNECTION_IO);
prc->connected = false;
@@ -152,15 +152,15 @@
static void praw_connection_cleanup(praw_connection_t *prc) {
int fd = prc->psocket.epoll_io.fd;
- stop_polling(&prc->psocket.epoll_io, prc->context.proactor->epollfd);
+ stop_polling(&prc->psocket.epoll_io, prc->task.proactor->epollfd);
if (fd != -1)
- pclosefd(prc->context.proactor, fd);
+ pclosefd(prc->task.proactor, fd);
- lock(&prc->context.mutex);
- bool can_free = proactor_remove(&prc->context);
- unlock(&prc->context.mutex);
+ lock(&prc->task.mutex);
+ bool can_free = proactor_remove(&prc->task);
+ unlock(&prc->task.mutex);
if (can_free) {
- pcontext_finalize(&prc->context);
+ task_finalize(&prc->task);
free(prc);
}
// else proactor_disconnect logic owns prc and its final free
@@ -181,8 +181,8 @@
praw_connection_init(prc, p, rc);
// TODO: check case of proactor shutting down
- lock(&prc->context.mutex);
- proactor_add(&prc->context);
+ lock(&prc->task.mutex);
+ proactor_add(&prc->task);
bool notify = false;
bool notify_proactor = false;
@@ -197,20 +197,20 @@
if (!gai_error) {
prc->ai = prc->addrinfo;
praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
- if (prc->disconnected) notify = wake(&prc->context);
+ if (prc->disconnected) notify = schedule(&prc->task);
} else {
psocket_gai_error(prc, gai_error, "connect to ", addr);
prc->disconnected = true;
- notify = wake(&prc->context);
- lock(&p->context.mutex);
- notify_proactor = wake_if_inactive(p);
- unlock(&p->context.mutex);
+ notify = schedule(&prc->task);
+ lock(&p->task.mutex);
+ notify_proactor = schedule_if_inactive(p);
+ unlock(&p->task.mutex);
}
/* We need to issue INACTIVE on immediate failure */
- unlock(&prc->context.mutex);
- if (notify) wake_notify(&prc->context);
- if (notify_proactor) wake_notify(&p->context);
+ unlock(&prc->task.mutex);
+ if (notify) notify_poller(&prc->task);
+ if (notify_proactor) notify_poller(&p->task);
}
void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) {
@@ -222,8 +222,8 @@
int err = 0;
int fd = -1;
bool notify = false;
- lock(&l->context.mutex);
- if (l->context.closing)
+ lock(&l->task.mutex);
+ if (l->task.closing)
err = EBADF;
else {
accepted_t *a = listener_accepted_next(l);
@@ -234,9 +234,9 @@
else err = EWOULDBLOCK;
}
- proactor_add(&prc->context);
+ proactor_add(&prc->task);
- lock(&prc->context.mutex);
+ lock(&prc->task.mutex);
if (fd >= 0) {
configure_socket(fd);
praw_connection_start(prc, fd);
@@ -245,12 +245,12 @@
psocket_error(prc, err, "pn_listener_accept");
}
- if (!l->context.working && listener_has_event(l)) {
- notify = wake(&l->context);
+ if (!l->task.working && listener_has_event(l)) {
+ notify = schedule(&l->task);
}
- unlock(&prc->context.mutex);
- unlock(&l->context.mutex);
- if (notify) wake_notify(&l->context);
+ unlock(&prc->task.mutex);
+ unlock(&l->task.mutex);
+ if (notify) notify_poller(&l->task);
}
const pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *rc) {
@@ -268,20 +268,20 @@
void pn_raw_connection_wake(pn_raw_connection_t *rc) {
bool notify = false;
praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
- lock(&prc->context.mutex);
- if (!prc->context.closing) {
+ lock(&prc->task.mutex);
+ if (!prc->task.closing) {
prc->waking = true;
- notify = wake(&prc->context);
+ notify = schedule(&prc->task);
}
- unlock(&prc->context.mutex);
- if (notify) wake_notify(&prc->context);
+ unlock(&prc->task.mutex);
+ if (notify) notify_poller(&prc->task);
}
void pn_raw_connection_close(pn_raw_connection_t *rc) {
praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
- lock(&prc->context.mutex);
- prc->context.closing = true;
- unlock(&prc->context.mutex);
+ lock(&prc->task.mutex);
+ prc->task.closing = true;
+ unlock(&prc->task.mutex);
pni_raw_close(rc);
}
@@ -291,17 +291,17 @@
// Check wake status every event processed
bool waking = false;
- lock(&rc->context.mutex);
+ lock(&rc->task.mutex);
waking = rc->waking;
rc->waking = false;
- unlock(&rc->context.mutex);
+ unlock(&rc->task.mutex);
if (waking) pni_raw_wake(raw);
return pni_raw_event_next(raw);
}
-pcontext_t *pni_psocket_raw_context(psocket_t* ps) {
- return &containerof(ps, praw_connection_t, psocket)->context;
+task_t *pni_psocket_raw_task(psocket_t* ps) {
+ return &containerof(ps, praw_connection_t, psocket)->task;
}
praw_connection_t *pni_batch_raw_connection(pn_event_batch_t *batch) {
@@ -309,8 +309,8 @@
containerof(batch, praw_connection_t, batch) : NULL;
}
-pcontext_t *pni_raw_connection_context(praw_connection_t *rc) {
- return &rc->context;
+task_t *pni_raw_connection_task(praw_connection_t *rc) {
+ return &rc->task;
}
static long snd(int fd, const void* b, size_t s) {
@@ -325,8 +325,8 @@
psocket_error(containerof(conn, praw_connection_t, raw_connection), err, msg);
}
-pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake) {
- praw_connection_t *rc = containerof(c, praw_connection_t, context);
+pn_event_batch_t *pni_raw_connection_process(task_t *t, bool sched_ready) {
+ praw_connection_t *rc = containerof(t, praw_connection_t, task);
int events = rc->psocket.sched_io_events;
int fd = rc->psocket.epoll_io.fd;
if (!rc->connected) {
@@ -344,14 +344,14 @@
}
bool wake = false;
- lock(&c->mutex);
- c->working = true;
- if (sched_wake) {
- wake_done(c);
+ lock(&t->mutex);
+ t->working = true;
+ if (sched_ready) {
+ schedule_done(t);
wake = rc->waking;
rc->waking = false;
}
- unlock(&c->mutex);
+ unlock(&t->mutex);
if (wake) pni_raw_wake(&rc->raw_connection);
if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error);
@@ -361,17 +361,17 @@
void pni_raw_connection_done(praw_connection_t *rc) {
bool self_notify = false;
- bool wake_pending = false;
- lock(&rc->context.mutex);
- pn_proactor_t *p = rc->context.proactor;
- tslot_t *ts = rc->context.runner;
- rc->context.working = false;
- self_notify = rc->waking && wake(&rc->context);
- // There could be a scheduler wake pending even if we've got no raw connection
+ bool ready = false;
+ lock(&rc->task.mutex);
+ pn_proactor_t *p = rc->task.proactor;
+ tslot_t *ts = rc->task.runner;
+ rc->task.working = false;
+ self_notify = rc->waking && schedule(&rc->task);
+ // The task may be in the ready state even if we've got no raw connection
// wakes outstanding because we dealt with it already in pni_raw_batch_next()
- wake_pending = rc->context.wake_pending;
- unlock(&rc->context.mutex);
- if (self_notify) wake_notify(&rc->context);
+ ready = rc->task.ready;
+ unlock(&rc->task.mutex);
+ if (self_notify) notify_poller(&rc->task);
pn_raw_connection_t *raw = &rc->raw_connection;
int wanted =
@@ -381,7 +381,7 @@
rc->psocket.epoll_io.wanted = wanted;
rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error
} else {
- bool finished_disconnect = raw->rclosed && raw->wclosed && !wake_pending && !raw->disconnectpending;
+ bool finished_disconnect = raw->rclosed && raw->wclosed && !ready && !raw->disconnectpending;
if (finished_disconnect) {
// If we're closed and we've sent the disconnect then close
pni_raw_finalize(raw);
@@ -392,5 +392,5 @@
lock(&p->sched_mutex);
bool notify = unassign_thread(ts, UNUSED);
unlock(&p->sched_mutex);
- if (notify) wake_notify(&p->context);
+ if (notify) notify_poller(&p->task);
}
diff --git a/c/src/proactor/epoll_timer.c b/c/src/proactor/epoll_timer.c
index 6d288e1..a882128 100644
--- a/c/src/proactor/epoll_timer.c
+++ b/c/src/proactor/epoll_timer.c
@@ -50,7 +50,7 @@
* second AMQP open frame results in a shorter periodic transport timer than the first open frame. In this case, the
* existing timer_deadline is immediately orphaned and a new one created for the rest of the connection's life.
*
- * Lock ordering: tm->context_mutex --> tm->deletion_mutex.
+ * Lock ordering: tm->task_mutex --> tm->deletion_mutex.
*/
static void timerfd_set(int fd, uint64_t t_millis) {
@@ -114,7 +114,7 @@
pni_timer_t *pni_timer(pni_timer_manager_t *tm, pconnection_t *c) {
timer_deadline_t *td = NULL;
pni_timer_t *timer = NULL;
- assert(c || !tm->context.proactor->timer); // Proactor timer. Can only be one.
+ assert(c || !tm->task.proactor->timer); // Proactor timer. Can only be one.
timer = (pni_timer_t *) malloc(sizeof(pni_timer_t));
if (!timer) return NULL;
if (c) {
@@ -126,14 +126,14 @@
}
}
- lock(&tm->context.mutex);
+ lock(&tm->task.mutex);
timer->connection = c;
timer->manager = tm;
timer->timer_deadline = td;
timer->deadline = 0;
if (c)
td->timer = timer;
- unlock(&tm->context.mutex);
+ unlock(&tm->task.mutex);
return timer;
}
@@ -143,7 +143,7 @@
bool can_free_td = false;
if (td) pni_timer_set(timer, 0);
pni_timer_manager_t *tm = timer->manager;
- lock(&tm->context.mutex);
+ lock(&tm->task.mutex);
lock(&tm->deletion_mutex);
if (td) {
if (td->list_deadline)
@@ -152,7 +152,7 @@
can_free_td = true;
}
unlock(&tm->deletion_mutex);
- unlock(&tm->context.mutex);
+ unlock(&tm->task.mutex);
if (can_free_td) {
pn_free(td);
}
@@ -168,7 +168,7 @@
tm->timers_heap = NULL;
tm->proactor_timer = NULL;
pn_proactor_t *p = containerof(tm, pn_proactor_t, timer_manager);
- pcontext_init(&tm->context, TIMER_MANAGER, p);
+ task_init(&tm->task, TIMER_MANAGER, p);
pmutex_init(&tm->deletion_mutex);
// PN_VOID turns off ref counting for the elements in the list.
@@ -190,8 +190,8 @@
// Only call from proactor's destructor, when it is single threaded and scheduling has stopped.
void pni_timer_manager_finalize(pni_timer_manager_t *tm) {
- lock(&tm->context.mutex);
- unlock(&tm->context.mutex); // Memory barrier
+ lock(&tm->task.mutex);
+ unlock(&tm->task.mutex); // Memory barrier
if (tm->epoll_timer.fd >= 0) close(tm->epoll_timer.fd);
pni_timer_free(tm->proactor_timer);
if (tm->timers_heap) {
@@ -205,14 +205,14 @@
pn_free(tm->timers_heap);
}
pmutex_finalize(&tm->deletion_mutex);
- pcontext_finalize(&tm->context);
+ task_finalize(&tm->task);
}
-// Call with timer_manager lock held. Return true if wake_notify required.
+// Call with timer_manager lock held. Return true if notify_poller required.
static bool adjust_deadline(pni_timer_manager_t *tm) {
- // Make sure the timer_manager context will get a timeout in time for the earliest connection timeout.
- if (tm->context.working)
- return false; // timer_manager context will adjust the timer when it stops working
+ // Make sure the timer_manager task will get a timeout in time for the earliest connection timeout.
+ if (tm->task.working)
+ return false; // timer_manager task will adjust the timer when it stops working
bool notify = false;
uint64_t new_deadline = tm->proactor_timer->deadline;
if (pn_list_size(tm->timers_heap)) {
@@ -227,7 +227,7 @@
uint64_t now = pn_proactor_now_64();
if (new_deadline <= now) {
// no need for a timer update. Wake the timer_manager.
- notify = wake(&tm->context);
+ notify = schedule(&tm->task);
}
else {
timerfd_set(tm->epoll_timer.fd, new_deadline - now);
@@ -238,16 +238,16 @@
return notify;
}
-// Call without context lock or timer_manager lock.
+// Call without task lock or timer_manager lock.
// Calls for connection timers are generated in the proactor and serialized per connection.
// Calls for the proactor timer can come from arbitrary user threads.
void pni_timer_set(pni_timer_t *timer, uint64_t deadline) {
pni_timer_manager_t *tm = timer->manager;
bool notify = false;
- lock(&tm->context.mutex);
+ lock(&tm->task.mutex);
if (deadline == timer->deadline) {
- unlock(&tm->context.mutex);
+ unlock(&tm->task.mutex);
return; // No change.
}
@@ -262,7 +262,7 @@
if (td->resequenced)
EPOLL_FATAL("idle timeout sequencing error", 0); //
else {
- // replace drops the lock for malloc. Safe because there can be no competing call to
+ // replace drops the lock for malloc. Safe because there can be no competing call to
// the timer set function by the same pconnection from another thread.
td = replace_timer_deadline(tm, timer);
}
@@ -279,30 +279,30 @@
// Skip a cancelled timer (deadline == 0) since it doesn't change the timerfd deadline.
if (deadline)
notify = adjust_deadline(tm);
- unlock(&tm->context.mutex);
+ unlock(&tm->task.mutex);
if (notify)
- wake_notify(&tm->context);
+ notify_poller(&tm->task);
}
-pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool wake) {
+pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool sched_ready) {
uint64_t now = pn_proactor_now_64();
- lock(&tm->context.mutex);
- tm->context.working = true;
+ lock(&tm->task.mutex);
+ tm->task.working = true;
if (timeout)
tm->timerfd_deadline = 0;
- if (wake)
- wake_done(&tm->context);
+ if (sched_ready)
+ schedule_done(&tm->task);
// First check for proactor timer expiry.
uint64_t deadline = tm->proactor_timer->deadline;
if (deadline && deadline <= now) {
tm->proactor_timer->deadline = 0;
- unlock(&tm->context.mutex);
- pni_proactor_timeout(tm->context.proactor);
- lock(&tm->context.mutex);
- // If lower latency desired for the proactor timer, we could convert to the proactor context (if not working) and return
- // here with the event batch, and wake the timer manager context to process the connection timers.
+ unlock(&tm->task.mutex);
+ pni_proactor_timeout(tm->task.proactor);
+ lock(&tm->task.mutex);
+ // If lower latency desired for the proactor timer, we could convert to the proactor task (if not working) and return
+ // here with the event batch, and schedule the timer manager task to process the connection timers.
}
// Next, find all expired connection timers at front of the ordered heap.
@@ -321,20 +321,20 @@
// timer deadline extended -> minpush back on list to new spot
// timer freed -> free the associated timer_deadline popped off the list
if (!td->timer) {
- unlock(&tm->context.mutex);
+ unlock(&tm->task.mutex);
pn_free(td);
- lock(&tm->context.mutex);
+ lock(&tm->task.mutex);
} else {
uint64_t deadline = td->timer->deadline;
if (deadline) {
if (deadline <= now) {
td->timer->deadline = 0;
pconnection_t *pc = td->timer->connection;
- lock(&tm->deletion_mutex); // Prevent connection from deleting itself when tm->context.mutex dropped.
- unlock(&tm->context.mutex);
+ lock(&tm->deletion_mutex); // Prevent connection from deleting itself when tm->task.mutex dropped.
+ unlock(&tm->task.mutex);
pni_pconnection_timeout(pc);
unlock(&tm->deletion_mutex);
- lock(&tm->context.mutex);
+ lock(&tm->task.mutex);
} else {
td->list_deadline = deadline;
pn_list_minpush(tm->timers_heap, td);
@@ -346,20 +346,20 @@
if (timeout) {
// TODO: query whether perf gain by doing these system calls outside the lock, perhaps with additional set_reset_mutex.
timerfd_drain(tm->epoll_timer.fd);
- rearm_polling(&tm->epoll_timer, tm->context.proactor->epollfd);
+ rearm_polling(&tm->epoll_timer, tm->task.proactor->epollfd);
}
- tm->context.working = false; // must be false for adjust_deadline to do adjustment
+ tm->task.working = false; // must be false for adjust_deadline to do adjustment
bool notify = adjust_deadline(tm);
- unlock(&tm->context.mutex);
+ unlock(&tm->task.mutex);
if (notify)
- wake_notify(&tm->context);
+ notify_poller(&tm->task);
// The timer_manager never has events to batch.
return NULL;
- // TODO: perhaps become context of one of the timed out timers (if otherwise idle) and process() that context.
+ // TODO: perhaps become task of one of the timed out timers (if otherwise idle) and process() that task.
}
-// Call with timer_manager lock held.
+// Call with timer_manager lock held.
// There can be no competing call to this and timer_set() from the same connection.
static timer_deadline_t *replace_timer_deadline(pni_timer_manager_t *tm, pni_timer_t *timer) {
assert(timer->connection);
@@ -368,12 +368,12 @@
// Mark old struct for deletion. No parent timer.
old_td->timer = NULL;
- unlock(&tm->context.mutex);
+ unlock(&tm->task.mutex);
// Create replacement timer for life of connection.
timer_deadline_t *new_td = pni_timer_deadline();
if (!new_td)
EPOLL_FATAL("replacement timer deadline allocation", errno);
- lock(&tm->context.mutex);
+ lock(&tm->task.mutex);
new_td->list_deadline = 0;
new_td->timer = timer;