PROTON-2130: Split out structs from epoll.c to make it easier to mess with them
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
new file mode 100644
index 0000000..6a13e7f
--- /dev/null
+++ b/c/src/proactor/epoll-internal.h
@@ -0,0 +1,288 @@
+#ifndef PROACTOR_EPOLL_INTERNAL_H
+#define PROACTOR_EPOLL_INTERNAL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <pthread.h>
+
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+
+
+#include <proton/connection_driver.h>
+#include <proton/proactor.h>
+
+#include "netaddr-internal.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+//typedef struct pn_proactor_t pn_proactor_t;
+//typedef struct pn_listener_t pn_listener_t;
+//typedef struct pn_connection_driver_t pn_connection_driver_t;
+typedef struct acceptor_t acceptor_t;
+typedef struct tslot_t tslot_t;
+typedef pthread_mutex_t pmutex;
+
+typedef enum {
+ WAKE, /* see if any work to do in proactor/psocket context */
+ PCONNECTION_IO,
+ PCONNECTION_TIMER,
+ LISTENER_IO,
+ PROACTOR_TIMER
+} epoll_type_t;
+
+// Data to use with epoll.
+typedef struct epoll_extended_t {
+ struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor
+ int fd;
+ epoll_type_t type; // io/timer/wakeup
+ uint32_t wanted; // events to poll for
+ bool polling;
+ pmutex barrier_mutex;
+} epoll_extended_t;
+
+typedef struct ptimer_t {
+ pmutex mutex;
+ int timerfd;
+ epoll_extended_t epoll_io;
+ bool timer_active;
+ bool in_doubt; // 0 or 1 callbacks are possible
+ bool shutting_down;
+} ptimer_t;
+
+typedef enum {
+ PROACTOR,
+ PCONNECTION,
+ LISTENER,
+ WAKEABLE
+} pcontext_type_t;
+
+typedef struct pcontext_t {
+ pmutex mutex;
+ pn_proactor_t *proactor; /* Immutable */
+ void *owner; /* Instance governed by the context */
+ pcontext_type_t type;
+ bool working;
+ bool on_wake_list;
+ bool wake_pending; // unprocessed eventfd wake callback (convert to bool?)
+ struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
+ bool closing;
+ // Next 4 are protected by the proactor mutex
+ struct pcontext_t* next; /* Protected by proactor.mutex */
+ struct pcontext_t* prev; /* Protected by proactor.mutex */
+ int disconnect_ops; /* ops remaining before disconnect complete */
+ bool disconnecting; /* pn_proactor_disconnect */
+ // Protected by schedule mutex
+ tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */
+ tslot_t *prev_runner;
+ bool sched_wake;
+ bool sched_pending; /* If true, one or more unseen epoll or other events to process() */
+ bool runnable ; /* in need of scheduling */
+} pcontext_t;
+
+typedef enum {
+ NEW,
+ UNUSED, /* pn_proactor_done() called, may never come back */
+ SUSPENDED,
+ PROCESSING, /* Hunting for a context */
+ BATCHING, /* Doing work on behalf of a context */
+ DELETING,
+ POLLING
+} tslot_state;
+
+// Epoll proactor's concept of a worker thread provided by the application.
+struct tslot_t {
+ pmutex mutex; // suspend and resume
+ pthread_cond_t cond;
+ unsigned int generation;
+ bool suspended;
+ volatile bool scheduled;
+ tslot_state state;
+ pcontext_t *context;
+ pcontext_t *prev_context;
+ bool earmarked;
+ tslot_t *suspend_list_prev;
+ tslot_t *suspend_list_next;
+ tslot_t *earmark_override; // on earmark_drain, which thread was unassigned
+ unsigned int earmark_override_gen;
+};
+
+/* common to connection and listener */
+typedef struct psocket_t {
+ pn_proactor_t *proactor;
+ // Remaining protected by the pconnection/listener mutex
+ int sockfd;
+ epoll_extended_t epoll_io;
+ pn_listener_t *listener; /* NULL for a connection socket */
+ char addr_buf[PN_MAX_ADDR];
+ const char *host, *port;
+ uint32_t sched_io_events;
+ uint32_t working_io_events;
+} psocket_t;
+
+struct pn_proactor_t {
+ pcontext_t context;
+ ptimer_t timer;
+ epoll_extended_t epoll_wake;
+ epoll_extended_t epoll_interrupt;
+ pn_event_batch_t batch;
+ pcontext_t *contexts; /* track in-use contexts for PN_PROACTOR_INACTIVE and disconnect */
+ size_t disconnects_pending; /* unfinished proactor disconnects*/
+ // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
+ bool need_interrupt;
+ bool need_inactive;
+ bool need_timeout;
+ bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */
+ bool timeout_processed; /* timeout event dispatched in the most recent event batch */
+ bool timer_armed; /* timer is armed in epoll */
+ int context_count;
+
+ // wake subsystem
+ int eventfd;
+ pmutex eventfd_mutex;
+ bool wakes_in_progress;
+ pcontext_t *wake_list_first;
+ pcontext_t *wake_list_last;
+ // Interrupts have a dedicated eventfd because they must be async-signal safe.
+ int interruptfd;
+ // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here.
+ acceptor_t *overflow;
+ pmutex overflow_mutex;
+
+ // Sched vars specific to proactor context.
+ bool sched_timeout;
+ bool sched_interrupt;
+
+ // Global scheduling/poller vars.
+ // Warm runnables have assigned or earmarked tslots and can run right away.
+ // Other runnables are run as tslots come available.
+ pmutex sched_mutex;
+ int n_runnables;
+ int next_runnable;
+ int n_warm_runnables;
+ tslot_t *suspend_list_head;
+ tslot_t *suspend_list_tail;
+ int suspend_list_count;
+ tslot_t *poller;
+ bool poller_suspended;
+ tslot_t *last_earmark;
+ pcontext_t *sched_wake_first;
+ pcontext_t *sched_wake_last;
+ pcontext_t *sched_wake_current;
+ pmutex tslot_mutex;
+ int earmark_count;
+ bool earmark_drain;
+ bool sched_wakes_pending;
+
+ // Mostly read only: after init or once thread_count stabilizes
+ pn_collector_t *collector __attribute__((aligned(64)));
+ pcontext_t **warm_runnables;
+ pcontext_t **runnables;
+ tslot_t **resume_list;
+ pn_hash_t *tslot_map;
+ struct epoll_event *kevents;
+ int epollfd;
+ int thread_count;
+ int thread_capacity;
+ int runnables_capacity;
+ int kevents_capacity;
+ bool shutting_down;
+};
+
+typedef struct pconnection_t {
+ psocket_t psocket;
+ pcontext_t context;
+ uint32_t new_events;
+ int wake_count;
+ bool server; /* accept, not connect */
+ bool tick_pending;
+ bool timer_armed;
+ bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
+ pn_condition_t *disconnect_condition;
+ ptimer_t timer; // TODO: review one timerfd per connection
+ // Following values only changed by (sole) working context:
+ uint32_t current_arm; // active epoll io events
+ bool connected;
+ bool read_blocked;
+ bool write_blocked;
+ bool disconnected;
+ int hog_count; // thread hogging limiter
+ pn_event_batch_t batch;
+ pn_connection_driver_t driver;
+ bool output_drained;
+ const char *wbuf_current;
+ size_t wbuf_remaining;
+ size_t wbuf_completed;
+ struct pn_netaddr_t local, remote; /* Actual addresses */
+ struct addrinfo *addrinfo; /* Resolved address list */
+ struct addrinfo *ai; /* Current connect address */
+ pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/
+ bool io_doublecheck; /* callbacks made and new IO may have arrived */
+ bool sched_timeout;
+} pconnection_t;
+
+/*
+ * A listener can have multiple sockets (as specified in the addrinfo). They
+ * are armed separately. The individual psockets can be part of at most one
+ * list: the global proactor overflow retry list or the per-listener list of
+ * pending accepts (valid inbound socket obtained, but pn_listener_accept not
+ * yet called by the application). These lists will be small and quick to
+ * traverse.
+ */
+
+struct acceptor_t{
+ psocket_t psocket;
+ int accepted_fd;
+ bool armed;
+ bool overflowed;
+ acceptor_t *next; /* next listener list member */
+ struct pn_netaddr_t addr; /* listening address */
+};
+
+struct pn_listener_t {
+ acceptor_t *acceptors; /* Array of listening sockets */
+ size_t acceptors_size;
+ int active_count; /* Number of listener sockets registered with epoll */
+ pcontext_t context;
+ pn_condition_t *condition;
+ pn_collector_t *collector;
+ pn_event_batch_t batch;
+ pn_record_t *attachments;
+ void *listener_context;
+ acceptor_t *pending_acceptors; /* list of those with a valid inbound fd*/
+ int pending_count;
+ bool unclaimed; /* attach event dispatched but no pn_listener_attach() call yet */
+ size_t backlog;
+ bool close_dispatched;
+ pmutex rearm_mutex; /* orders rearms/disarms, nothing else */
+ uint32_t sched_io_events;
+};
+
+#ifdef __cplusplus
+}
+#endif
+#endif // PROACTOR_EPOLL_INTERNAL_H
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 9390595..f8ebbf4 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -57,6 +57,7 @@
/* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */
#undef _GNU_SOURCE
+#include "epoll-internal.h"
#include "proactor-internal.h"
#include "core/engine-internal.h"
#include "core/logger_private.h"
@@ -128,7 +129,6 @@
// In general all locks to be held singly and shortly (possibly as spin locks).
// See above about lock ordering.
-typedef pthread_mutex_t pmutex;
static void pmutex_init(pthread_mutex_t *pm){
pthread_mutexattr_t attr;
@@ -144,26 +144,6 @@
static inline void lock(pmutex *m) { pthread_mutex_lock(m); }
static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
-typedef struct acceptor_t acceptor_t;
-typedef struct tslot_t tslot_t;
-
-typedef enum {
- WAKE, /* see if any work to do in proactor/psocket context */
- PCONNECTION_IO,
- PCONNECTION_TIMER,
- LISTENER_IO,
- PROACTOR_TIMER } epoll_type_t;
-
-// Data to use with epoll.
-typedef struct epoll_extended_t {
- struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor
- int fd;
- epoll_type_t type; // io/timer/wakeup
- uint32_t wanted; // events to poll for
- bool polling;
- pmutex barrier_mutex;
-} epoll_extended_t;
-
/* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory
writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
visible to epoll_wait() thread. This function creates a memory barrier,
@@ -196,15 +176,6 @@
* TODO: review above in light of single poller thread.
*/
-typedef struct ptimer_t {
- pmutex mutex;
- int timerfd;
- epoll_extended_t epoll_io;
- bool timer_active;
- bool in_doubt; // 0 or 1 callbacks are possible
- bool shutting_down;
-} ptimer_t;
-
static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) {
pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
pmutex_init(&pt->mutex);
@@ -364,60 +335,6 @@
* eventfd to allow a lock-free pn_proactor_interrupt() implementation.
*/
-typedef enum {
- PROACTOR,
- PCONNECTION,
- LISTENER,
- WAKEABLE } pcontext_type_t;
-
-typedef struct pcontext_t {
- pmutex mutex;
- pn_proactor_t *proactor; /* Immutable */
- void *owner; /* Instance governed by the context */
- pcontext_type_t type;
- bool working;
- bool on_wake_list;
- bool wake_pending; // unprocessed eventfd wake callback (convert to bool?)
- struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
- bool closing;
- // Next 4 are protected by the proactor mutex
- struct pcontext_t* next; /* Protected by proactor.mutex */
- struct pcontext_t* prev; /* Protected by proactor.mutex */
- int disconnect_ops; /* ops remaining before disconnect complete */
- bool disconnecting; /* pn_proactor_disconnect */
- // Protected by schedule mutex
- tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */
- tslot_t *prev_runner;
- bool sched_wake;
- bool sched_pending; /* If true, one or more unseen epoll or other events to process() */
- bool runnable ; /* in need of scheduling */
-} pcontext_t;
-
-typedef enum {
- NEW,
- UNUSED, /* pn_proactor_done() called, may never come back */
- SUSPENDED,
- PROCESSING, /* Hunting for a context */
- BATCHING, /* Doing work on behalf of a context */
- DELETING,
- POLLING } tslot_state;
-
-// Epoll proactor's concept of a worker thread provided by the application.
-struct tslot_t {
- pmutex mutex; // suspend and resume
- pthread_cond_t cond;
- unsigned int generation;
- bool suspended;
- volatile bool scheduled;
- tslot_state state;
- pcontext_t *context;
- pcontext_t *prev_context;
- bool earmarked;
- tslot_t *suspend_list_prev;
- tslot_t *suspend_list_next;
- tslot_t *earmark_override; // on earmark_drain, which thread was unassigned
- unsigned int earmark_override_gen;
-};
// Fake thread for temporarily disabling the scheduling of a context.
static struct tslot_t *REWAKE_PLACEHOLDER = (struct tslot_t*) -1;
@@ -434,159 +351,6 @@
pmutex_finalize(&ctx->mutex);
}
-
-/* common to connection and listener */
-typedef struct psocket_t {
- pn_proactor_t *proactor;
- // Remaining protected by the pconnection/listener mutex
- int sockfd;
- epoll_extended_t epoll_io;
- pn_listener_t *listener; /* NULL for a connection socket */
- char addr_buf[PN_MAX_ADDR];
- const char *host, *port;
- uint32_t sched_io_events;
- uint32_t working_io_events;
-} psocket_t;
-
-struct pn_proactor_t {
- pcontext_t context;
- ptimer_t timer;
- epoll_extended_t epoll_wake;
- epoll_extended_t epoll_interrupt;
- pn_event_batch_t batch;
- pcontext_t *contexts; /* track in-use contexts for PN_PROACTOR_INACTIVE and disconnect */
- size_t disconnects_pending; /* unfinished proactor disconnects*/
- // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
- bool need_interrupt;
- bool need_inactive;
- bool need_timeout;
- bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */
- bool timeout_processed; /* timeout event dispatched in the most recent event batch */
- bool timer_armed; /* timer is armed in epoll */
- int context_count;
-
- // wake subsystem
- int eventfd;
- pmutex eventfd_mutex;
- bool wakes_in_progress;
- pcontext_t *wake_list_first;
- pcontext_t *wake_list_last;
- // Interrupts have a dedicated eventfd because they must be async-signal safe.
- int interruptfd;
- // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here.
- acceptor_t *overflow;
- pmutex overflow_mutex;
-
- // Sched vars specific to proactor context.
- bool sched_timeout;
- bool sched_interrupt;
-
- // Global scheduling/poller vars.
- // Warm runnables have assigned or earmarked tslots and can run right away.
- // Other runnables are run as tslots come available.
- pmutex sched_mutex;
- int n_runnables;
- int next_runnable;
- int n_warm_runnables;
- tslot_t *suspend_list_head;
- tslot_t *suspend_list_tail;
- int suspend_list_count;
- tslot_t *poller;
- bool poller_suspended;
- tslot_t *last_earmark;
- pcontext_t *sched_wake_first;
- pcontext_t *sched_wake_last;
- pcontext_t *sched_wake_current;
- pmutex tslot_mutex;
- int earmark_count;
- bool earmark_drain;
- bool sched_wakes_pending;
-
- // Mostly read only: after init or once thread_count stabilizes
- pn_collector_t *collector __attribute__((aligned(64)));
- pcontext_t **warm_runnables;
- pcontext_t **runnables;
- tslot_t **resume_list;
- pn_hash_t *tslot_map;
- struct epoll_event *kevents;
- int epollfd;
- int thread_count;
- int thread_capacity;
- int runnables_capacity;
- int kevents_capacity;
- bool shutting_down;
-};
-
-typedef struct pconnection_t {
- psocket_t psocket;
- pcontext_t context;
- uint32_t new_events;
- int wake_count;
- bool server; /* accept, not connect */
- bool tick_pending;
- bool timer_armed;
- bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
- pn_condition_t *disconnect_condition;
- ptimer_t timer; // TODO: review one timerfd per connection
- // Following values only changed by (sole) working context:
- uint32_t current_arm; // active epoll io events
- bool connected;
- bool read_blocked;
- bool write_blocked;
- bool disconnected;
- int hog_count; // thread hogging limiter
- pn_event_batch_t batch;
- pn_connection_driver_t driver;
- bool output_drained;
- const char *wbuf_current;
- size_t wbuf_remaining;
- size_t wbuf_completed;
- struct pn_netaddr_t local, remote; /* Actual addresses */
- struct addrinfo *addrinfo; /* Resolved address list */
- struct addrinfo *ai; /* Current connect address */
- pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/
- bool io_doublecheck; /* callbacks made and new IO may have arrived */
- bool sched_timeout;
-} pconnection_t;
-
-/*
- * A listener can have multiple sockets (as specified in the addrinfo). They
- * are armed separately. The individual psockets can be part of at most one
- * list: the global proactor overflow retry list or the per-listener list of
- * pending accepts (valid inbound socket obtained, but pn_listener_accept not
- * yet called by the application). These lists will be small and quick to
- * traverse.
- */
-
-struct acceptor_t{
- psocket_t psocket;
- int accepted_fd;
- bool armed;
- bool overflowed;
- acceptor_t *next; /* next listener list member */
- struct pn_netaddr_t addr; /* listening address */
-};
-
-struct pn_listener_t {
- acceptor_t *acceptors; /* Array of listening sockets */
- size_t acceptors_size;
- int active_count; /* Number of listener sockets registered with epoll */
- pcontext_t context;
- pn_condition_t *condition;
- pn_collector_t *collector;
- pn_event_batch_t batch;
- pn_record_t *attachments;
- void *listener_context;
- acceptor_t *pending_acceptors; /* list of those with a valid inbound fd*/
- int pending_count;
- bool unclaimed; /* attach event dispatched but no pn_listener_attach() call yet */
- size_t backlog;
- bool close_dispatched;
- pmutex rearm_mutex; /* orders rearms/disarms, nothing else */
- uint32_t sched_io_events;
-};
-
-
static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
/*