/** @file
Implements the per-thread network event loop: NetHandler, PollCont, and the InactivityCop.
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "P_Net.h"
using namespace std::literals;
ink_hrtime last_throttle_warning;
ink_hrtime last_shedding_warning;
int net_connections_throttle;
bool net_memory_throttle = false;
int fds_throttle;
int fds_limit = 8000;
ink_hrtime last_transient_accept_error;
NetHandler::Config NetHandler::global_config;
std::bitset<std::numeric_limits<unsigned int>::digits> NetHandler::active_thread_types;
const std::bitset<NetHandler::CONFIG_ITEM_COUNT> NetHandler::config_value_affects_per_thread_value{0x3};
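// A reading of the 0x3 mask, inferred from the code below: the low two bits
// mark the config items whose per-thread values must be recomputed when they
// change; given configure_per_thread_values(), these are presumably
// max_connections_in and max_requests_in.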
extern "C" void fd_reify(struct ev_loop *);
// INKqa10496
// One InactivityCop runs on each thread, once per second. It walks the
// list of NetEvents and fires any expired inactivity or activity timeouts.
class InactivityCop : public Continuation
{
public:
explicit InactivityCop(Ptr<ProxyMutex> &m) : Continuation(m.get()) { SET_HANDLER(&InactivityCop::check_inactivity); }
int
check_inactivity(int event, Event *e)
{
(void)event;
ink_hrtime now = Thread::get_hrtime();
NetHandler &nh = *get_NetHandler(this_ethread());
Debug("inactivity_cop_check", "Checking inactivity on Thread-ID #%d", this_ethread()->id);
// Process the NetEvents remaining in cop_list, i.e. those that were not
// triggered between InactivityCop runs.
// Use pop() to catch any closes caused by callbacks.
while (NetEvent *ne = nh.cop_list.pop()) {
// If we cannot get the lock, don't stop; just keep cleaning.
MUTEX_TRY_LOCK(lock, ne->get_mutex(), this_ethread());
if (!lock.is_locked()) {
NET_INCREMENT_DYN_STAT(inactivity_cop_lock_acquire_failure_stat);
continue;
}
if (ne->closed) {
nh.free_netevent(ne);
continue;
}
// Set a default inactivity timeout if one is not set.
// VC_EVENT_INACTIVITY_TIMEOUT can only be triggered if a read or write I/O
// operation was set up by `do_io_read()` or `do_io_write()`.
if (ne->next_inactivity_timeout_at == 0 && nh.config.default_inactivity_timeout > 0 &&
(ne->read.enabled || ne->write.enabled)) {
Debug("inactivity_cop", "vc: %p inactivity timeout not set, setting a default of %d", ne,
nh.config.default_inactivity_timeout);
ne->set_default_inactivity_timeout(HRTIME_SECONDS(nh.config.default_inactivity_timeout));
NET_INCREMENT_DYN_STAT(default_inactivity_timeout_applied_stat);
}
if (ne->next_inactivity_timeout_at && ne->next_inactivity_timeout_at < now) {
if (ne->is_default_inactivity_timeout()) {
// track the connections that timed out due to default inactivity
NET_INCREMENT_DYN_STAT(default_inactivity_timeout_count_stat);
}
if (nh.keep_alive_queue.in(ne)) {
// Only record the stat if the connection is in keep-alive; there can be other inactivity timeouts.
ink_hrtime diff = (now - (ne->next_inactivity_timeout_at - ne->inactivity_timeout_in)) / HRTIME_SECOND;
NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff);
NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat);
}
Debug("inactivity_cop_verbose", "ne: %p now: %" PRId64 " timeout at: %" PRId64 " timeout in: %" PRId64, ne,
ink_hrtime_to_sec(now), ne->next_inactivity_timeout_at, ne->inactivity_timeout_in);
ne->callback(VC_EVENT_INACTIVITY_TIMEOUT, e);
} else if (ne->next_activity_timeout_at && ne->next_activity_timeout_at < now) {
Debug("inactivity_cop_verbose", "active ne: %p now: %" PRId64 " timeout at: %" PRId64 " timeout in: %" PRId64, ne,
ink_hrtime_to_sec(now), ne->next_activity_timeout_at, ne->active_timeout_in);
ne->callback(VC_EVENT_ACTIVE_TIMEOUT, e);
}
}
// The cop_list is empty now.
// Let's reload the cop_list from open_list again.
forl_LL(NetEvent, ne, nh.open_list)
{
if (ne->get_thread() == this_ethread()) {
nh.cop_list.push(ne);
}
}
// NetHandler removes a NetEvent from cop_list when the event triggers, so the
// cop_list shrinks as NetHandler runs. NetHandler runs at most 100 times
// between InactivityCop runs, so we end up checking far fewer NetEvents than a
// full scan of open_list would require.
// Clean up the active and keep-alive queues periodically.
nh.manage_active_queue(nullptr, true); // close any connections over the active timeout
nh.manage_keep_alive_queue();
return 0;
}
};
PollCont::PollCont(Ptr<ProxyMutex> &m, int pt)
: Continuation(m.get()), net_handler(nullptr), nextPollDescriptor(nullptr), poll_timeout(pt)
{
pollDescriptor = new PollDescriptor();
SET_HANDLER(&PollCont::pollEvent);
}
PollCont::PollCont(Ptr<ProxyMutex> &m, NetHandler *nh, int pt)
: Continuation(m.get()), net_handler(nh), nextPollDescriptor(nullptr), poll_timeout(pt)
{
pollDescriptor = new PollDescriptor();
SET_HANDLER(&PollCont::pollEvent);
}
PollCont::~PollCont()
{
delete pollDescriptor;
if (nextPollDescriptor != nullptr) {
delete nextPollDescriptor;
}
}
//
// PollCont is the continuation that performs the platform poll call
// (epoll_wait / kevent / port_getn) and stores the resulting events
// in the PollDescriptor's triggered-event array.
//
int
PollCont::pollEvent(int, Event *)
{
this->do_poll(-1);
return EVENT_CONT;
}
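//
// do_poll() picks the effective poll timeout, then blocks in the platform poll
// call: if any ready or enable list is non-empty it polls with a timeout of 0
// (there is work to do right now); otherwise it uses the caller's timeout when
// that is >= 0, and falls back to net_config_poll_timeout when it is not.
//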
void
PollCont::do_poll(ink_hrtime timeout)
{
if (likely(net_handler)) {
/* Check whether there are connections on the ready or enable lists (either read or write) that need processing. [ebalsa] */
if (likely(!net_handler->read_ready_list.empty() || !net_handler->write_ready_list.empty() ||
!net_handler->read_enable_list.empty() || !net_handler->write_enable_list.empty())) {
NetDebug("iocore_net_poll", "rrq: %d, wrq: %d, rel: %d, wel: %d", net_handler->read_ready_list.empty(),
net_handler->write_ready_list.empty(), net_handler->read_enable_list.empty(),
net_handler->write_enable_list.empty());
poll_timeout = 0; // poll immediately returns -- we have triggered stuff to process right now
} else if (timeout >= 0) {
poll_timeout = ink_hrtime_to_msec(timeout);
} else {
poll_timeout = net_config_poll_timeout;
}
}
// Wait for fds to trigger, or don't wait if the timeout is 0.
#if TS_USE_EPOLL
pollDescriptor->result =
epoll_wait(pollDescriptor->epoll_fd, pollDescriptor->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout);
NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] epoll_fd: %d, timeout: %d, results: %d", pollDescriptor->epoll_fd,
poll_timeout, pollDescriptor->result);
#elif TS_USE_KQUEUE
struct timespec tv;
tv.tv_sec = poll_timeout / 1000;
tv.tv_nsec = 1000000 * (poll_timeout % 1000);
pollDescriptor->result =
kevent(pollDescriptor->kqueue_fd, nullptr, 0, pollDescriptor->kq_Triggered_Events, POLL_DESCRIPTOR_SIZE, &tv);
NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] kqueue_fd: %d, timeout: %d, results: %d", pollDescriptor->kqueue_fd,
poll_timeout, pollDescriptor->result);
#elif TS_USE_PORT
int retval;
timespec_t ptimeout;
ptimeout.tv_sec = poll_timeout / 1000;
ptimeout.tv_nsec = 1000000 * (poll_timeout % 1000);
unsigned nget = 1;
if ((retval = port_getn(pollDescriptor->port_fd, pollDescriptor->Port_Triggered_Events, POLL_DESCRIPTOR_SIZE, &nget, &ptimeout)) <
0) {
pollDescriptor->result = 0;
switch (errno) {
case EINTR:
case EAGAIN:
case ETIME:
if (nget > 0) {
pollDescriptor->result = (int)nget;
}
break;
default:
ink_assert(!"unhandled port_getn() case:");
break;
}
} else {
pollDescriptor->result = (int)nget;
}
NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] %d[%s]=port_getn(%d,%p,%d,%d,%d),results(%d)", retval,
retval < 0 ? strerror(errno) : "ok", pollDescriptor->port_fd, pollDescriptor->Port_Triggered_Events,
POLL_DESCRIPTOR_SIZE, nget, poll_timeout, pollDescriptor->result);
#else
#error port me
#endif
}
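// Drain the wakeup mechanism (eventfd, event port, or pipe) so the poll
// descriptor stops reporting the signal fd as readable. This is the read side
// of NetHandler::signalActivity().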
static void
net_signal_hook_callback(EThread *thread)
{
#if HAVE_EVENTFD
uint64_t counter;
ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t)));
#elif TS_USE_PORT
/* Nothing to drain or do */
#else
char dummy[1024];
ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024));
#endif
}
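//
// Prepare a thread for network I/O: construct its NetHandler and PollCont in
// place, schedule an InactivityCop at the configured check frequency, install
// the NetHandler as the thread's tail handler, and register the wakeup fd
// (eventfd or pipe read end) with the poll descriptor.
//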
void
initialize_thread_for_net(EThread *thread)
{
NetHandler *nh = get_NetHandler(thread);
new (reinterpret_cast<ink_dummy_for_new *>(nh)) NetHandler();
new (reinterpret_cast<ink_dummy_for_new *>(get_PollCont(thread))) PollCont(thread->mutex, nh);
nh->mutex = new_ProxyMutex();
nh->thread = thread;
PollCont *pc = get_PollCont(thread);
PollDescriptor *pd = pc->pollDescriptor;
InactivityCop *inactivityCop = new InactivityCop(get_NetHandler(thread)->mutex);
int cop_freq = 1;
REC_ReadConfigInteger(cop_freq, "proxy.config.net.inactivity_check_frequency");
memcpy(&nh->config, &NetHandler::global_config, sizeof(NetHandler::global_config));
nh->configure_per_thread_values();
thread->schedule_every(inactivityCop, HRTIME_SECONDS(cop_freq));
thread->set_tail_handler(nh);
thread->ep = static_cast<EventIO *>(ats_malloc(sizeof(EventIO)));
new (thread->ep) EventIO();
thread->ep->type = EVENTIO_ASYNC_SIGNAL;
#if HAVE_EVENTFD
thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ);
#else
thread->ep->start(pd, thread->evpipe[0], nullptr, EVENTIO_READ);
#endif
}
// NetHandler method definitions
NetHandler::NetHandler() : Continuation(nullptr)
{
SET_HANDLER((NetContHandler)&NetHandler::mainNetEvent);
}
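// Config-update callback registered in init_for_process(). It updates the
// matching member of global_config and then schedules TS_EVENT_MGMT_UPDATE on
// every active NetHandler, passing the member's index as the event cookie so
// each instance copies just that one value in mainNetEvent().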
int
NetHandler::update_nethandler_config(const char *str, RecDataT, RecData data, void *)
{
uint32_t *updated_member = nullptr; // direct pointer to config member for update.
std::string_view name{str};
if (name == "proxy.config.net.max_connections_in"sv) {
updated_member = &NetHandler::global_config.max_connections_in;
Debug("net_queue", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int);
} else if (name == "proxy.config.net.max_requests_in"sv) {
updated_member = &NetHandler::global_config.max_requests_in;
Debug("net_queue", "proxy.config.net.max_requests_in updated to %" PRId64, data.rec_int);
} else if (name == "proxy.config.net.inactive_threshold_in"sv) {
updated_member = &NetHandler::global_config.inactive_threshold_in;
Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %" PRId64, data.rec_int);
} else if (name == "proxy.config.net.transaction_no_activity_timeout_in"sv) {
updated_member = &NetHandler::global_config.transaction_no_activity_timeout_in;
Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %" PRId64, data.rec_int);
} else if (name == "proxy.config.net.keep_alive_no_activity_timeout_in"sv) {
updated_member = &NetHandler::global_config.keep_alive_no_activity_timeout_in;
Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %" PRId64, data.rec_int);
} else if (name == "proxy.config.net.default_inactivity_timeout"sv) {
updated_member = &NetHandler::global_config.default_inactivity_timeout;
Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %" PRId64, data.rec_int);
}
if (updated_member) {
*updated_member = data.rec_int; // do the actual update.
// Portable form of the update: the member's index, converted to void * so it can be passed as an event cookie.
void *idx = reinterpret_cast<void *>(static_cast<intptr_t>(updated_member - &global_config[0]));
// Signal the NetHandler instances, passing the index of the updated config value.
for (int i = 0; i < eventProcessor.n_thread_groups; ++i) {
if (!active_thread_types[i]) {
continue;
}
for (EThread **tp = eventProcessor.thread_group[i]._thread,
**limit = eventProcessor.thread_group[i]._thread + eventProcessor.thread_group[i]._count;
tp < limit; ++tp) {
NetHandler *nh = get_NetHandler(*tp);
if (nh) {
nh->thread->schedule_imm(nh, TS_EVENT_MGMT_UPDATE, idx);
}
}
}
}
return REC_ERR_OKAY;
}
void
NetHandler::init_for_process()
{
// Read configuration values and set up callbacks for when they change.
REC_ReadConfigInt32(global_config.max_connections_in, "proxy.config.net.max_connections_in");
REC_ReadConfigInt32(global_config.max_requests_in, "proxy.config.net.max_requests_in");
REC_ReadConfigInt32(global_config.inactive_threshold_in, "proxy.config.net.inactive_threshold_in");
REC_ReadConfigInt32(global_config.transaction_no_activity_timeout_in, "proxy.config.net.transaction_no_activity_timeout_in");
REC_ReadConfigInt32(global_config.keep_alive_no_activity_timeout_in, "proxy.config.net.keep_alive_no_activity_timeout_in");
REC_ReadConfigInt32(global_config.default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");
RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_nethandler_config, nullptr);
RecRegisterConfigUpdateCb("proxy.config.net.max_requests_in", update_nethandler_config, nullptr);
RecRegisterConfigUpdateCb("proxy.config.net.inactive_threshold_in", update_nethandler_config, nullptr);
RecRegisterConfigUpdateCb("proxy.config.net.transaction_no_activity_timeout_in", update_nethandler_config, nullptr);
RecRegisterConfigUpdateCb("proxy.config.net.keep_alive_no_activity_timeout_in", update_nethandler_config, nullptr);
RecRegisterConfigUpdateCb("proxy.config.net.default_inactivity_timeout", update_nethandler_config, nullptr);
Debug("net_queue", "proxy.config.net.max_connections_in updated to %d", global_config.max_connections_in);
Debug("net_queue", "proxy.config.net.max_requests_in updated to %d", global_config.max_requests_in);
Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %d", global_config.inactive_threshold_in);
Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %d",
global_config.transaction_no_activity_timeout_in);
Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %d",
global_config.keep_alive_no_activity_timeout_in);
Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %d", global_config.default_inactivity_timeout);
}
//
// Function used to release a NetEvent and free it.
//
void
NetHandler::free_netevent(NetEvent *ne)
{
EThread *t = this->thread;
ink_assert(t == this_ethread());
ink_release_assert(ne->get_thread() == t);
ink_release_assert(ne->nh == this);
// Release ne from InactivityCop
stopCop(ne);
// Release ne from NetHandler
stopIO(ne);
// Clear and deallocate ne
ne->free(t);
}
//
// Move VCs enabled on a different thread to the ready lists
//
void
NetHandler::process_enabled_list()
{
NetEvent *ne = nullptr;
SListM(NetEvent, NetState, read, enable_link) rq(read_enable_list.popall());
while ((ne = rq.pop())) {
ne->ep.modify(EVENTIO_READ);
ne->ep.refresh(EVENTIO_READ);
ne->read.in_enabled_list = 0;
if ((ne->read.enabled && ne->read.triggered) || ne->closed) {
read_ready_list.in_or_enqueue(ne);
}
}
SListM(NetEvent, NetState, write, enable_link) wq(write_enable_list.popall());
while ((ne = wq.pop())) {
ne->ep.modify(EVENTIO_WRITE);
ne->ep.refresh(EVENTIO_WRITE);
ne->write.in_enabled_list = 0;
if ((ne->write.enabled && ne->write.triggered) || ne->closed) {
write_ready_list.in_or_enqueue(ne);
}
}
}
//
// Walk through the ready list
//
void
NetHandler::process_ready_list()
{
NetEvent *ne = nullptr;
#if defined(USE_EDGE_TRIGGER)
while ((ne = read_ready_list.dequeue())) {
// Initialize the thread-local continuation flags
set_cont_flags(ne->get_control_flags());
if (ne->closed) {
free_netevent(ne);
} else if (ne->read.enabled && ne->read.triggered) {
ne->net_read_io(this, this->thread);
} else if (!ne->read.enabled) {
read_ready_list.remove(ne);
#if defined(solaris)
if (ne->read.triggered && ne->write.enabled) {
ne->ep.modify(-EVENTIO_READ);
ne->ep.refresh(EVENTIO_WRITE);
ne->writeReschedule(this);
}
#endif
}
}
while ((ne = write_ready_list.dequeue())) {
set_cont_flags(ne->get_control_flags());
if (ne->closed) {
free_netevent(ne);
} else if (ne->write.enabled && ne->write.triggered) {
ne->net_write_io(this, this->thread);
} else if (!ne->write.enabled) {
write_ready_list.remove(ne);
#if defined(solaris)
if (ne->write.triggered && ne->read.enabled) {
ne->ep.modify(-EVENTIO_WRITE);
ne->ep.refresh(EVENTIO_READ);
ne->readReschedule(this);
}
#endif
}
}
#else /* !USE_EDGE_TRIGGER */
while ((ne = read_ready_list.dequeue())) {
set_cont_flags(ne->get_control_flags());
if (ne->closed)
free_netevent(ne);
else if (ne->read.enabled && ne->read.triggered)
ne->net_read_io(this, this->thread);
else if (!ne->read.enabled)
ne->ep.modify(-EVENTIO_READ);
}
while ((ne = write_ready_list.dequeue())) {
set_cont_flags(ne->get_control_flags());
if (ne->closed)
free_netevent(ne);
else if (ne->write.enabled && ne->write.triggered)
write_to_net(this, ne, this->thread);
else if (!ne->write.enabled)
ne->ep.modify(-EVENTIO_WRITE);
}
#endif /* !USE_EDGE_TRIGGER */
}
//
// The main event handler for NetHandler: absorb a config update
// (TS_EVENT_MGMT_UPDATE) or run one pass of the net event loop.
//
int
NetHandler::mainNetEvent(int event, Event *e)
{
if (TS_EVENT_MGMT_UPDATE == event) {
intptr_t idx = reinterpret_cast<intptr_t>(e->cookie);
// Copy to the same offset in the instance struct.
config[idx] = global_config[idx];
if (config_value_affects_per_thread_value[idx]) {
this->configure_per_thread_values();
}
return EVENT_CONT;
} else {
ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
return this->waitForActivity(-1);
}
}
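//
// One pass of the net event loop: flush cross-thread enables, poll via
// PollCont, translate poll results into read/write triggers (or dispatch DNS,
// accept, and async-signal descriptors), then drain the ready lists.
//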
int
NetHandler::waitForActivity(ink_hrtime timeout)
{
EventIO *epd = nullptr;
NET_INCREMENT_DYN_STAT(net_handler_run_stat);
SCOPED_MUTEX_LOCK(lock, mutex, this->thread);
process_enabled_list();
// Polling event by PollCont
PollCont *p = get_PollCont(this->thread);
p->do_poll(timeout);
// Get & Process polling result
PollDescriptor *pd = get_PollDescriptor(this->thread);
NetEvent *ne = nullptr;
for (int x = 0; x < pd->result; x++) {
epd = static_cast<EventIO *>(get_ev_data(pd, x));
if (epd->type == EVENTIO_READWRITE_VC) {
ne = epd->data.ne;
// Remove the triggered NetEvent from cop_list; it cannot time out before the next InactivityCop run.
if (cop_list.in(ne)) {
cop_list.remove(ne);
}
int flags = get_ev_events(pd, x);
if (flags & (EVENTIO_ERROR)) {
ne->set_error_from_socket();
}
if (flags & (EVENTIO_READ)) {
ne->read.triggered = 1;
if (!read_ready_list.in(ne)) {
read_ready_list.enqueue(ne);
}
}
if (flags & (EVENTIO_WRITE)) {
ne->write.triggered = 1;
if (!write_ready_list.in(ne)) {
write_ready_list.enqueue(ne);
}
} else if (!(flags & (EVENTIO_READ))) {
Debug("iocore_net_main", "Unhandled epoll event: 0x%04x", flags);
// In practice we sometimes see EPOLLERR and EPOLLHUP through here.
// Anything else would be surprising.
ink_assert((flags & ~(EVENTIO_ERROR)) == 0);
ne->write.triggered = 1;
if (!write_ready_list.in(ne)) {
write_ready_list.enqueue(ne);
}
}
} else if (epd->type == EVENTIO_DNS_CONNECTION) {
if (epd->data.dnscon != nullptr) {
epd->data.dnscon->trigger(); // Make sure the DNSHandler for this con knows we triggered
#if defined(USE_EDGE_TRIGGER)
epd->refresh(EVENTIO_READ);
#endif
}
} else if (epd->type == EVENTIO_ASYNC_SIGNAL) {
net_signal_hook_callback(this->thread);
} else if (epd->type == EVENTIO_NETACCEPT) {
this->thread->schedule_imm(epd->data.c);
}
ev_next_event(pd, x);
}
pd->result = 0;
process_ready_list();
return EVENT_CONT;
}
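// Wake the thread out of its poll call by writing to the wakeup mechanism
// registered in initialize_thread_for_net(); net_signal_hook_callback() is
// the matching drain on the receiving side.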
void
NetHandler::signalActivity()
{
#if HAVE_EVENTFD
uint64_t counter = 1;
ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
#elif TS_USE_PORT
PollDescriptor *pd = get_PollDescriptor(thread);
ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
#else
char dummy = 1;
ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
#endif
}
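// Try to make room in the active queue. Returns true when there is room (or
// no limit is configured) and false when no room could be made, i.e. every
// connection is still active. With ignore_queue_size == true (the periodic
// InactivityCop call), timed-out connections are closed regardless of the
// queue-size check.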
bool
NetHandler::manage_active_queue(NetEvent *enabling_ne, bool ignore_queue_size = false)
{
const int total_connections_in = active_queue_size + keep_alive_queue_size;
Debug("v_net_queue",
"max_connections_per_thread_in: %d max_requests_per_thread_in: %d total_connections_in: %d "
"active_queue_size: %d keep_alive_queue_size: %d",
max_connections_per_thread_in, max_requests_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size);
if (!max_requests_per_thread_in) {
// active queue has no max
return true;
}
if (ignore_queue_size == false && max_requests_per_thread_in > active_queue_size) {
return true;
}
ink_hrtime now = Thread::get_hrtime();
// Loop over the active queue and try to close connections whose timeouts have expired.
NetEvent *ne = active_queue.head;
NetEvent *ne_next = nullptr;
int closed = 0;
int handle_event = 0;
int total_idle_time = 0;
int total_idle_count = 0;
for (; ne != nullptr; ne = ne_next) {
ne_next = ne->active_queue_link.next;
// It seems dangerous to close the current ne at this point.
// Let the InactivityCop deal with it.
if (ne == enabling_ne) {
continue;
}
if ((ne->next_inactivity_timeout_at && ne->next_inactivity_timeout_at <= now) ||
(ne->next_activity_timeout_at && ne->next_activity_timeout_at <= now)) {
_close_ne(ne, now, handle_event, closed, total_idle_time, total_idle_count);
}
if (ignore_queue_size == false && max_requests_per_thread_in > active_queue_size) {
return true;
}
}
if (max_requests_per_thread_in > active_queue_size) {
return true;
}
return false; // failed to make room in the queue, all connections are active
}
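// Split the process-wide limits evenly across the net threads. As a worked
// example with hypothetical values: max_connections_in = 30000 and 10 ET_NET
// threads gives max_connections_per_thread_in = 3000.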
void
NetHandler::configure_per_thread_values()
{
// figure out the number of threads and calculate the number of connections per thread
int threads = eventProcessor.thread_group[ET_NET]._count;
max_connections_per_thread_in = config.max_connections_in / threads;
max_requests_per_thread_in = config.max_requests_in / threads;
Debug("net_queue", "max_connections_per_thread_in updated to %d threads: %d", max_connections_per_thread_in, threads);
Debug("net_queue", "max_requests_per_thread_in updated to %d threads: %d", max_requests_per_thread_in, threads);
}
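// Trim the keep-alive queue, least recently used connections first, until the
// total connection count drops back to the per-thread limit.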
void
NetHandler::manage_keep_alive_queue()
{
uint32_t total_connections_in = active_queue_size + keep_alive_queue_size;
ink_hrtime now = Thread::get_hrtime();
Debug("v_net_queue", "max_connections_per_thread_in: %d total_connections_in: %d active_queue_size: %d keep_alive_queue_size: %d",
max_connections_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size);
if (!max_connections_per_thread_in || total_connections_in <= max_connections_per_thread_in) {
return;
}
// loop over the non-active connections and try to close them
NetEvent *ne_next = nullptr;
int closed = 0;
int handle_event = 0;
int total_idle_time = 0;
int total_idle_count = 0;
for (NetEvent *ne = keep_alive_queue.head; ne != nullptr; ne = ne_next) {
ne_next = ne->keep_alive_queue_link.next;
_close_ne(ne, now, handle_event, closed, total_idle_time, total_idle_count);
total_connections_in = active_queue_size + keep_alive_queue_size;
if (total_connections_in <= max_connections_per_thread_in) {
break;
}
}
if (total_idle_count > 0) {
Debug("net_queue", "max cons: %d active: %d idle: %d already closed: %d, close event: %d mean idle: %d",
max_connections_per_thread_in, total_connections_in, keep_alive_queue_size, closed, handle_event,
total_idle_time / total_idle_count);
}
}
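// Close a single NetEvent during a queue walk. Skips it when it belongs to
// another thread or its lock cannot be taken. Otherwise it accounts the idle
// time, frees the event if it is already closed, or forces a timeout callback
// using a dummy Event built on the stack.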
void
NetHandler::_close_ne(NetEvent *ne, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count)
{
if (ne->get_thread() != this_ethread()) {
return;
}
MUTEX_TRY_LOCK(lock, ne->get_mutex(), this_ethread());
if (!lock.is_locked()) {
return;
}
ink_hrtime diff = (now - (ne->next_inactivity_timeout_at - ne->inactivity_timeout_in)) / HRTIME_SECOND;
if (diff > 0) {
total_idle_time += diff;
++total_idle_count;
NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff);
NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat);
}
Debug("net_queue", "closing connection NetEvent=%p idle: %u now: %" PRId64 " at: %" PRId64 " in: %" PRId64 " diff: %" PRId64, ne,
keep_alive_queue_size, ink_hrtime_to_sec(now), ink_hrtime_to_sec(ne->next_inactivity_timeout_at),
ink_hrtime_to_sec(ne->inactivity_timeout_in), diff);
if (ne->closed) {
free_netevent(ne);
++closed;
} else {
ne->next_inactivity_timeout_at = now;
// create a dummy event
Event event;
event.ethread = this_ethread();
if (ne->inactivity_timeout_in && ne->next_inactivity_timeout_at <= now) {
if (ne->callback(VC_EVENT_INACTIVITY_TIMEOUT, &event) == EVENT_DONE) {
++handle_event;
}
} else if (ne->active_timeout_in && ne->next_activity_timeout_at <= now) {
if (ne->callback(VC_EVENT_ACTIVE_TIMEOUT, &event) == EVENT_DONE) {
++handle_event;
}
}
}
}
void
NetHandler::add_to_keep_alive_queue(NetEvent *ne)
{
Debug("net_queue", "NetEvent: %p", ne);
ink_assert(mutex->thread_holding == this_ethread());
if (keep_alive_queue.in(ne)) {
// already in the keep-alive queue, move the head
keep_alive_queue.remove(ne);
} else {
// in the active queue or no queue, new to this queue
remove_from_active_queue(ne);
++keep_alive_queue_size;
}
keep_alive_queue.enqueue(ne);
// if keep-alive queue is over size then close connections
manage_keep_alive_queue();
}
void
NetHandler::remove_from_keep_alive_queue(NetEvent *ne)
{
Debug("net_queue", "NetEvent: %p", ne);
ink_assert(mutex->thread_holding == this_ethread());
if (keep_alive_queue.in(ne)) {
keep_alive_queue.remove(ne);
--keep_alive_queue_size;
}
}
bool
NetHandler::add_to_active_queue(NetEvent *ne)
{
Debug("net_queue", "NetEvent: %p", ne);
Debug("net_queue", "max_connections_per_thread_in: %d active_queue_size: %d keep_alive_queue_size: %d",
max_connections_per_thread_in, active_queue_size, keep_alive_queue_size);
ink_assert(mutex->thread_holding == this_ethread());
bool active_queue_full = false;
// if active queue is over size then close inactive connections
if (manage_active_queue(ne) == false) {
active_queue_full = true;
}
if (active_queue.in(ne)) {
// already in the active queue, move the head
active_queue.remove(ne);
} else {
if (active_queue_full) {
// there is no room left in the queue
NET_SUM_DYN_STAT(net_requests_max_throttled_in_stat, 1);
return false;
}
// in the keep-alive queue or no queue, new to this queue
remove_from_keep_alive_queue(ne);
++active_queue_size;
}
active_queue.enqueue(ne);
return true;
}
void
NetHandler::remove_from_active_queue(NetEvent *ne)
{
Debug("net_queue", "NetEvent: %p", ne);
ink_assert(mutex->thread_holding == this_ethread());
if (active_queue.in(ne)) {
active_queue.remove(ne);
--active_queue_size;
}
}