blob: e639e48cbdff9500a3dd4138dd5b219cd2df2c24 [file] [log] [blame]
#ifndef PROACTOR_EPOLL_INTERNAL_H
#define PROACTOR_EPOLL_INTERNAL_H
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <proton/connection_driver.h>
#include <proton/proactor.h>
#include "netaddr-internal.h"
#ifdef __cplusplus
extern "C" {
#endif
/* Forward declarations for internal types defined later in this header.
 * pn_proactor_t, pn_listener_t and pn_connection_driver_t are not declared
 * here: this header uses them as typedef names supplied by the proton
 * headers it includes. */
typedef struct acceptor_t acceptor_t; /* one listening socket of a pn_listener_t */
typedef struct tslot_t tslot_t;       /* the proactor's view of an application thread */
/* Short alias for the pthread mutex type used by every lock in this header. */
typedef pthread_mutex_t pmutex;
/* Tag stored in epoll_extended_t.type that classifies an epoll registration
 * (io/timer/wakeup). Values are spelled out explicitly; they match the
 * implicit 0..4 numbering. */
typedef enum {
  WAKE = 0,              /* see if any work to do in proactor/psocket context */
  PCONNECTION_IO = 1,
  PCONNECTION_TIMER = 2,
  LISTENER_IO = 3,
  PROACTOR_TIMER = 4
} epoll_type_t;
/* Data to use with epoll: one record per registered file descriptor. */
typedef struct epoll_extended_t epoll_extended_t;
struct epoll_extended_t {
  int fd;                /* the registered descriptor */
  epoll_type_t type;     /* io/timer/wakeup */
  uint32_t wanted;       /* events to poll for */
  bool polling;
  pmutex barrier_mutex;
};
/* Timer state embedded in the proactor (pn_proactor_t.timer) and in each
 * connection (pconnection_t.timer); its descriptor is registered with epoll
 * through epoll_io. */
typedef struct ptimer_t ptimer_t;
struct ptimer_t {
  pmutex mutex;
  epoll_extended_t epoll_io;  /* epoll registration of the timer descriptor */
  bool timer_active;
  bool in_doubt;              /* 0 or 1 callbacks are possible */
  bool shutting_down;
};
/* Stored in pcontext_t.type; presumably identifies what kind of object
 * pcontext_t.owner refers to. Values match the implicit 0..3 numbering. */
typedef enum {
  PROACTOR = 0,
  PCONNECTION = 1,
  LISTENER = 2,
  WAKEABLE = 3
} pcontext_type_t;
/* Common context state embedded as the 'context' member of the proactor,
 * of each connection and of each listener. */
typedef struct pcontext_t {
  pmutex mutex;
  pn_proactor_t *proactor;       /* Immutable */
  void *owner;                   /* Instance governed by the context */
  pcontext_type_t type;
  bool working;
  bool on_wake_list;
  bool wake_pending;             /* unprocessed eventfd wake callback */
  struct pcontext_t *wake_next;  /* wake list, guarded by proactor eventfd_mutex */
  bool closing;
  /* The next 4 fields are protected by the proactor mutex. */
  struct pcontext_t *next;
  struct pcontext_t *prev;
  int disconnect_ops;            /* ops remaining before disconnect complete */
  bool disconnecting;            /* pn_proactor_disconnect */
  /* Remaining fields are protected by the proactor schedule mutex
   * (sched_mutex). 'runner' is forced onto a 64-byte boundary, presumably so
   * these scheduler-written fields do not share a cache line with the fields
   * above. */
  tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */
  tslot_t *prev_runner;
  bool sched_wake;
  bool sched_pending;            /* If true, one or more unseen epoll or other events to process() */
  bool runnable;                 /* in need of scheduling */
} pcontext_t;
/* State of a tslot_t. Values match the implicit 0..6 numbering. */
typedef enum {
  NEW = 0,
  UNUSED = 1,      /* pn_proactor_done() called, may never come back */
  SUSPENDED = 2,
  PROCESSING = 3,  /* Hunting for a context */
  BATCHING = 4,    /* Doing work on behalf of a context */
  DELETING = 5,
  POLLING = 6
} tslot_state;
/* The epoll proactor's view of one worker thread supplied by the
 * application. */
struct tslot_t {
  pmutex mutex;                  /* suspend and resume */
  pthread_cond_t cond;
  unsigned int generation;
  bool suspended;
  /* NOTE(review): 'volatile' provides no inter-thread ordering in C;
   * presumably every access is otherwise synchronized — confirm. */
  volatile bool scheduled;
  tslot_state state;
  pcontext_t *context;
  pcontext_t *prev_context;
  bool earmarked;
  /* Links for the proactor's suspend list (suspend_list_head/tail). */
  tslot_t *suspend_list_prev;
  tslot_t *suspend_list_next;
  tslot_t *earmark_override;     /* on earmark_drain, which thread was unassigned */
  unsigned int earmark_override_gen;
};
/* Epoll proactor instance. Fields are grouped by subsystem. */
struct pn_proactor_t {
  pcontext_t context;
  ptimer_t timer;
  epoll_extended_t epoll_wake;
  epoll_extended_t epoll_interrupt;
  pn_event_batch_t batch;
  pcontext_t *contexts;            /* in-use contexts, tracked for PN_PROACTOR_INACTIVE and disconnect */
  size_t disconnects_pending;      /* unfinished proactor disconnects */

  /* need_xxx: generate PN_PROACTOR_XXX on the next update_batch(). */
  bool need_interrupt;
  bool need_inactive;
  bool need_timeout;
  bool timeout_set;                /* set by the user, not yet cancelled or generated as an event */
  bool timeout_processed;          /* timeout event dispatched in the most recent event batch */
  bool timer_armed;                /* timer is armed in epoll */
  int context_count;

  /* Wake subsystem. */
  int eventfd;
  pmutex eventfd_mutex;
  bool wakes_in_progress;
  pcontext_t *wake_list_first;
  pcontext_t *wake_list_last;

  /* Interrupts use a dedicated eventfd because they must be async-signal safe. */
  int interruptfd;

  /* When the process runs out of file descriptors, listening sockets are
   * temporarily disarmed and parked here for a later retry. */
  acceptor_t *overflow;
  pmutex overflow_mutex;

  /* Scheduling flags for the proactor's own context. */
  bool sched_timeout;
  bool sched_interrupt;

  /* Global scheduler/poller state. Warm runnables already have an assigned
   * or earmarked tslot and can run right away; the rest run as tslots
   * become available. */
  pmutex sched_mutex;
  int n_runnables;
  int next_runnable;
  int n_warm_runnables;
  tslot_t *suspend_list_head;
  tslot_t *suspend_list_tail;
  int suspend_list_count;
  tslot_t *poller;
  bool poller_suspended;
  tslot_t *last_earmark;
  pcontext_t *sched_wake_first;
  pcontext_t *sched_wake_last;
  pcontext_t *sched_wake_current;
  pmutex tslot_mutex;
  int earmark_count;
  bool earmark_drain;
  bool sched_wakes_pending;

  /* Mostly read-only: set at init or once thread_count stabilizes. Starts on
   * a 64-byte boundary, presumably to keep it off the cache line(s) of the
   * frequently written scheduler fields above. */
  pn_collector_t *collector __attribute__((aligned(64)));
  pcontext_t **warm_runnables;
  pcontext_t **runnables;
  tslot_t **resume_list;
  pn_hash_t *tslot_map;
  struct epoll_event *kevents;
  int epollfd;
  int thread_count;
  int thread_capacity;
  int runnables_capacity;
  int kevents_capacity;
  bool shutting_down;
};
/* Socket state common to connections and listeners; embedded as the
 * 'psocket' member of pconnection_t and acceptor_t. */
typedef struct psocket_t psocket_t;
struct psocket_t {
  pn_proactor_t *proactor;
  /* Everything below is protected by the pconnection/listener mutex. */
  epoll_extended_t epoll_io;
  uint32_t sched_io_events;
  uint32_t working_io_events;
};
/* Proactor-side state of one connection: its socket, timer and the
 * pn_connection_driver_t that does the protocol work. */
typedef struct pconnection_t pconnection_t;
struct pconnection_t {
  psocket_t psocket;
  pcontext_t context;
  ptimer_t timer;                  /* TODO: review one timerfd per connection */
  char addr_buf[PN_MAX_ADDR];
  const char *host;
  const char *port;
  uint32_t new_events;
  int wake_count;
  bool server;                     /* accept, not connect */
  bool tick_pending;
  bool timer_armed;
  bool queued_disconnect;          /* deferred from pn_proactor_disconnect() */
  pn_condition_t *disconnect_condition;

  /* Values below are only changed by the (sole) working context: */
  uint32_t current_arm;            /* active epoll io events */
  bool connected;
  bool read_blocked;
  bool write_blocked;
  bool disconnected;
  int hog_count;                   /* thread hogging limiter */
  pn_event_batch_t batch;
  pn_connection_driver_t driver;
  bool output_drained;
  const char *wbuf_current;
  size_t wbuf_remaining;
  size_t wbuf_completed;
  struct pn_netaddr_t local;       /* actual local address */
  struct pn_netaddr_t remote;      /* actual remote address */
  struct addrinfo *addrinfo;       /* resolved address list */
  struct addrinfo *ai;             /* current connect address */
  pmutex rearm_mutex;              /* protects pconnection_rearm from out of order arming */
  bool io_doublecheck;             /* callbacks made and new IO may have arrived */
  bool sched_timeout;
};
/*
* A listener can have multiple listening sockets (as specified in the
* addrinfo), each wrapped in an acceptor_t and armed separately. Each acceptor
* can be on at most one list at a time: either the global proactor overflow
* retry list, or the per-listener list of pending accepts (a valid inbound
* socket has been obtained, but the application has not yet called
* pn_listener_accept). These lists are expected to be small and quick to
* traverse.
*/
/* One listening socket belonging to a pn_listener_t. */
struct acceptor_t {
  psocket_t psocket;
  struct pn_netaddr_t addr;      /* listening address */
  pn_listener_t *listener;       /* listener that owns this socket */
  /* Single link field, so an acceptor sits on at most one list at a time:
   * presumably either the proactor's overflow list or its listener's
   * pending_acceptors list. */
  acceptor_t *next;
  int accepted_fd;
  bool armed;
  bool overflowed;
};
/* Proactor-side state of a listener: its acceptors (listening sockets) plus
 * the event plumbing exposed to the application. */
struct pn_listener_t {
  pcontext_t context;
  acceptor_t *acceptors;           /* array of listening sockets */
  size_t acceptors_size;           /* presumably the number of entries in acceptors */
  char addr_buf[PN_MAX_ADDR];
  const char *host;
  const char *port;
  int active_count;                /* number of listener sockets registered with epoll */
  pn_condition_t *condition;
  pn_collector_t *collector;
  pn_event_batch_t batch;
  pn_record_t *attachments;
  void *listener_context;
  acceptor_t *pending_acceptors;   /* list of acceptors holding a valid inbound fd */
  int pending_count;
  bool unclaimed;                  /* attach event dispatched but no pn_listener_attach() call yet */
  size_t backlog;
  bool close_dispatched;
  pmutex rearm_mutex;              /* orders rearms/disarms, nothing else */
  uint32_t sched_io_events;
};
#ifdef __cplusplus
}
#endif
#endif // PROACTOR_EPOLL_INTERNAL_H