/** @file
Accept handling for network connections (NetAccept): listen socket setup, blocking and per-thread accept loops, and dispatch of new NetVConnections to the event system.
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "P_Net.h"
#ifdef ROUNDUP
#undef ROUNDUP
#endif
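// Round x up to the nearest multiple of y (integer arithmetic).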
#define ROUNDUP(x, y) ((((x)+((y)-1))/(y))*(y))
typedef int (NetAccept::*NetAcceptHandler) (int, void *);
volatile int dummy_volatile = 0;
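// When non-zero (the default), the accept loops below keep accepting until
// accept() yields no more connections or an error forces an exit.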
int accept_till_done = 1;
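// Sleep for msec milliseconds by polling with no file descriptors.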
void
safe_delay(int msec)
{
socketManager.poll(0, 0, msec);
}
//
// Accept up to THROTTLE_AT_ONCE - 1 pending connections, send each the
// throttle error message, then close them, delaying to let some of the
// current connections complete.
//
int
send_throttle_message(NetAccept * na)
{
struct pollfd afd;
Connection con[100];
char dummy_read_request[4096];
afd.fd = na->server.fd;
afd.events = POLLIN;
int n = 0;
while (check_net_throttle(ACCEPT, ink_get_hrtime()) && n < THROTTLE_AT_ONCE - 1
&& (socketManager.poll(&afd, 1, 0) > 0)) {
int res = 0;
if ((res = na->server.accept(&con[n])) < 0)
return res;
n++;
}
safe_delay(NET_THROTTLE_DELAY / 2);
int i = 0;
for (i = 0; i < n; i++) {
socketManager.read(con[i].fd, dummy_read_request, 4096);
socketManager.write(con[i].fd, unix_netProcessor.throttle_error_message,
strlen(unix_netProcessor.throttle_error_message));
}
safe_delay(NET_THROTTLE_DELAY / 2);
for (i = 0; i < n; i++)
con[i].close();
return 0;
}
//
// General case network connection accept code: accept pending connections on
// the NetAccept's listen socket and dispatch each new vc via acceptEvent.
//
int
net_accept(NetAccept * na, void *ep, bool blockable)
{
Event *e = (Event *) ep;
int res = 0;
int count = 0;
int loop = accept_till_done;
UnixNetVConnection *vc = NULL;
if (!blockable)
if (!MUTEX_TAKE_TRY_LOCK_FOR(na->action_->mutex, e->ethread, na->action_->continuation))
return 0;
//do-while for accepting all the connections
//added by YTS Team, yamsat
do {
vc = (UnixNetVConnection *) na->alloc_cache;
if (!vc) {
vc = na->allocateThread(e->ethread);
NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
vc->id = net_next_connection_number();
na->alloc_cache = vc;
}
if ((res = na->server.accept(&vc->con)) < 0) {
if (res == -EAGAIN || res == -ECONNABORTED || res == -EPIPE)
goto Ldone;
if (na->server.fd != NO_FD && !na->action_->cancelled) {
if (!blockable)
na->action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
else {
MUTEX_LOCK(lock, na->action_->mutex, e->ethread);
na->action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
}
}
count = res;
goto Ldone;
}
count++;
na->alloc_cache = NULL;
vc->submit_time = ink_get_hrtime();
ats_ip_copy(&vc->server_addr, &vc->con.addr);
vc->mutex = new_ProxyMutex();
vc->action_ = *na->action_;
vc->set_is_transparent(na->server.f_inbound_transparent);
vc->closed = 0;
SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::acceptEvent);
if (e->ethread->is_event_type(na->etype))
vc->handleEvent(EVENT_NONE, e);
else
eventProcessor.schedule_imm(vc, na->etype);
} while (loop);
Ldone:
if (!blockable)
MUTEX_UNTAKE_LOCK(na->action_->mutex, e->ethread);
return count;
}
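// Allocate a UnixNetVConnection from the event thread's per-thread allocator.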
UnixNetVConnection *
NetAccept::allocateThread(EThread * t)
{
return ((UnixNetVConnection *)THREAD_ALLOC(netVCAllocator, t));
}
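// Return a vc obtained from allocateThread() to the per-thread allocator.
// Must not be used for vcs allocated by an accept thread (from_accept_thread).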
void
NetAccept::freeThread(UnixNetVConnection * vc, EThread * t)
{
ink_assert(!vc->from_accept_thread);
THREAD_FREE(vc, netVCAllocator, t);
}
// This allocates directly on the class allocator, used for accept threads.
UnixNetVConnection *
NetAccept::allocateGlobal()
{
return (UnixNetVConnection *)netVCAllocator.alloc();
}
// Virtual accessor so that NetAccept functions use the correct event type
// (ET_SSL or ET_NET).
EventType
NetAccept::getEtype()
{
return etype;
}
//
// Initialize the NetAccept for execution in its own thread.
// This should be done for low latency, high connection rate sockets.
//
void
NetAccept::init_accept_loop(const char *thr_name)
{
size_t stacksize;
REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
SET_CONTINUATION_HANDLER(this, &NetAccept::acceptLoopEvent);
eventProcessor.spawn_thread(this, thr_name, stacksize);
}
//
// Initialize the NetAccept for execution in an etype thread.
// This should be done for low connection rate sockets.
// (Management, Cluster, etc.) Also, since it adapts to the
// number of connections arriving, it should be reasonable to
// use it for high connection rates as well.
//
void
NetAccept::init_accept(EThread * t)
{
if (!t)
t = eventProcessor.assign_thread(etype);
if (!action_->continuation->mutex) {
action_->continuation->mutex = t->mutex;
action_->mutex = t->mutex;
}
if (do_listen(NON_BLOCKING))
return;
SET_HANDLER((NetAcceptHandler) & NetAccept::acceptEvent);
period = ACCEPT_PERIOD;
t->schedule_every(this, period, etype);
}
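//
// Initialize a NetAccept on every ET_NET thread. Each thread except the last
// gets its own copy of this object; the listen fd is registered with each
// thread's poll descriptor and the accept handler is scheduled periodically.
//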
void
NetAccept::init_accept_per_thread()
{
int i, n;
if (do_listen(NON_BLOCKING))
return;
if (accept_fn == net_accept)
SET_HANDLER((NetAcceptHandler) & NetAccept::acceptFastEvent);
else
SET_HANDLER((NetAcceptHandler) & NetAccept::acceptEvent);
period = ACCEPT_PERIOD;
NetAccept *a;
n = eventProcessor.n_threads_for_type[ET_NET];
for (i = 0; i < n; i++) {
if (i < n - 1) {
a = NEW(new NetAccept);
*a = *this;
} else
a = this;
EThread *t = eventProcessor.eventthread[ET_NET][i];
PollDescriptor *pd = get_PollDescriptor(t);
if (a->ep.start(pd, a, EVENTIO_READ) < 0)
Warning("[NetAccept::init_accept_per_thread]:error starting EventIO");
a->mutex = get_NetHandler(t)->mutex;
t->schedule_every(a, period, etype);
}
}
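//
// Set up the listen socket: reuse the existing fd if one is present,
// otherwise open a new one. When callback_on_open is set, notify the
// continuation with NET_EVENT_ACCEPT_SUCCEED or NET_EVENT_ACCEPT_FAILED.
//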
int
NetAccept::do_listen(bool non_blocking, bool transparent)
{
int res = 0;
if (server.fd != NO_FD) {
if ((res = server.setup_fd_for_listen(non_blocking, recv_bufsize, send_bufsize, transparent))) {
Warning("unable to listen on main accept port %d: errno = %d, %s", ntohs(server.accept_addr.port()), errno, strerror(errno));
goto Lretry;
}
} else {
Lretry:
if ((res = server.listen(non_blocking, recv_bufsize, send_bufsize, transparent)))
Warning("unable to listen on port %d: %d %d, %s", ntohs(server.accept_addr.port()), res, errno, strerror(errno));
}
if (callback_on_open && !action_->cancelled) {
if (res)
action_->continuation->handleEvent(NET_EVENT_ACCEPT_FAILED, this);
else
action_->continuation->handleEvent(NET_EVENT_ACCEPT_SUCCEED, this);
mutex = NULL;
}
return res;
}
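//
// Blocking accept loop run by a dedicated accept thread: throttle if needed,
// accept a connection into a globally allocated vc, and hand the vc to an
// event thread via schedule_imm_signal().
//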
int
NetAccept::do_blocking_accept(EThread * t)
{
int res = 0;
int loop = accept_till_done;
UnixNetVConnection *vc = NULL;
//do-while for accepting all the connections
//added by YTS Team, yamsat
do {
vc = (UnixNetVConnection *)alloc_cache;
if (likely(!vc)) {
//vc = allocateThread(t);
vc = allocateGlobal(); // Bypass proxy / thread allocator
vc->from_accept_thread = true;
vc->id = net_next_connection_number();
alloc_cache = vc;
}
ink_hrtime now = ink_get_hrtime();
// Throttle accepts
while (!backdoor && check_net_throttle(ACCEPT, now)) {
check_throttle_warning();
if (!unix_netProcessor.throttle_error_message) {
safe_delay(NET_THROTTLE_DELAY);
} else if (send_throttle_message(this) < 0) {
goto Lerror;
}
now = ink_get_hrtime();
}
if ((res = server.accept(&vc->con)) < 0) {
Lerror:
int seriousness = accept_error_seriousness(res);
if (seriousness >= 0) { // not so bad
if (!seriousness) // bad enough to warn about
check_transient_accept_error(res);
safe_delay(NET_THROTTLE_DELAY);
return 0;
}
if (!action_->cancelled) {
MUTEX_LOCK(lock, action_->mutex, t);
action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
MUTEX_UNTAKE_LOCK(action_->mutex, t);
Warning("accept thread received fatal error: errno = %d", errno);
}
return -1;
}
check_emergency_throttle(vc->con);
alloc_cache = NULL;
NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
vc->submit_time = now;
ats_ip_copy(&vc->server_addr, &vc->con.addr);
vc->set_is_transparent(server.f_inbound_transparent);
vc->mutex = new_ProxyMutex();
vc->action_ = *action_;
SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::acceptEvent);
//eventProcessor.schedule_imm(vc, getEtype());
eventProcessor.schedule_imm_signal(vc, getEtype());
} while (loop);
return 1;
}
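//
// Periodic handler for polling-style accepts: try-lock the action (or local)
// mutex, handle cancellation, then drain pending connections via accept_fn.
//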
int
NetAccept::acceptEvent(int event, void *ep)
{
(void) event;
Event *e = (Event *) ep;
//PollDescriptor *pd = get_PollDescriptor(e->ethread);
ProxyMutex *m = 0;
if (action_->mutex)
m = action_->mutex;
else
m = mutex;
MUTEX_TRY_LOCK(lock, m, e->ethread);
if (lock) {
if (action_->cancelled) {
e->cancel();
NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
delete this;
return EVENT_DONE;
}
//ink_assert(ifd < 0 || event == EVENT_INTERVAL || (pd->nfds > ifd && pd->pfd[ifd].fd == server.fd));
//if (ifd < 0 || event == EVENT_INTERVAL || (pd->pfd[ifd].revents & (POLLIN | POLLERR | POLLHUP | POLLNVAL))) {
//ink_assert(!"incomplete");
int res;
if ((res = accept_fn(this, e, false)) < 0) {
NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
/* INKqa11179 */
Warning("Accept on port %d failed with error no %d",
ats_ip_port_host_order(&server.addr), res
);
Warning("Traffic Server may be unable to accept more network" "connections on %d",
ats_ip_port_host_order(&server.addr)
);
e->cancel();
delete this;
return EVENT_DONE;
}
//}
}
return EVENT_CONT;
}
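//
// Fast-path accept: accept directly on this event thread, apply socket
// options, make the fd non-blocking, register the vc with this thread's
// NetHandler, and deliver NET_EVENT_ACCEPT to the acceptor's continuation.
//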
int
NetAccept::acceptFastEvent(int event, void *ep)
{
Event *e = (Event *) ep;
(void) event;
int bufsz, res;
PollDescriptor *pd = get_PollDescriptor(e->ethread);
UnixNetVConnection *vc = NULL;
int loop = accept_till_done;
do {
if (!backdoor && check_net_throttle(ACCEPT, ink_get_hrtime())) {
ifd = -1;
return EVENT_CONT;
}
vc = allocateThread(e->ethread);
socklen_t sz = sizeof(vc->con.addr);
int fd = socketManager.accept(server.fd, &vc->con.addr.sa, &sz);
if (likely(fd >= 0)) {
Debug("iocore_net", "accepted a new socket: %d", fd);
if (send_bufsize > 0) {
if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize))) {
bufsz = ROUNDUP(send_bufsize, 1024);
while (bufsz > 0) {
if (!socketManager.set_sndbuf_size(fd, bufsz))
break;
bufsz -= 1024;
}
}
}
if (recv_bufsize > 0) {
if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize))) {
bufsz = ROUNDUP(recv_bufsize, 1024);
while (bufsz > 0) {
if (!socketManager.set_rcvbuf_size(fd, bufsz))
break;
bufsz -= 1024;
}
}
}
if (sockopt_flags & 1) { // we have to disable Nagle
safe_setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, SOCKOPT_ON, sizeof(int));
Debug("socket", "::acceptFastEvent: setsockopt() TCP_NODELAY on socket");
}
if (sockopt_flags & 2) {
safe_setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, SOCKOPT_ON, sizeof(int));
Debug("socket", "::acceptFastEvent: setsockopt() SO_KEEPALIVE on socket");
}
#if TS_HAS_SO_MARK
if (packet_mark != 0) {
safe_setsockopt(fd, SOL_SOCKET, SO_MARK, reinterpret_cast<char *>(&packet_mark), sizeof(uint32_t));
}
#endif
#if TS_HAS_IP_TOS
if (packet_tos != 0) {
safe_setsockopt(fd, IPPROTO_IP, IP_TOS, reinterpret_cast<char *>(&packet_tos), sizeof(uint32_t));
}
#endif
do {
res = safe_nonblocking(fd);
} while (res < 0 && (errno == EAGAIN || errno == EINTR));
} else {
res = fd;
}
if (res < 0) {
res = -errno;
if (res == -EAGAIN || res == -ECONNABORTED
#if defined(linux)
|| res == -EPIPE
#endif
) {
ink_assert(vc->con.fd == NO_FD);
ink_assert(!vc->link.next && !vc->link.prev);
freeThread(vc, e->ethread);
goto Ldone;
} else if (accept_error_seriousness(res) >= 0) {
check_transient_accept_error(res);
freeThread(vc, e->ethread);
goto Ldone;
}
if (!action_->cancelled)
action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
goto Lerror;
}
vc->con.fd = fd;
NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
vc->id = net_next_connection_number();
vc->submit_time = ink_get_hrtime();
ats_ip_copy(&vc->server_addr, &vc->con.addr);
vc->set_is_transparent(server.f_inbound_transparent);
vc->mutex = new_ProxyMutex();
vc->thread = e->ethread;
vc->nh = get_NetHandler(e->ethread);
SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::mainEvent);
if (vc->ep.start(pd, vc, EVENTIO_READ|EVENTIO_WRITE) < 0) {
Warning("[NetAccept::acceptFastEvent]: Error in inserting fd[%d] in kevent\n", vc->con.fd);
close_UnixNetVConnection(vc, e->ethread);
return EVENT_DONE;
}
vc->nh->open_list.enqueue(vc);
#ifdef USE_EDGE_TRIGGER
// Set the vc as triggered and place it in the read ready queue in case there is already data on the socket.
Debug("iocore_net", "acceptEvent : Setting triggered and adding to the read ready queue");
vc->read.triggered = 1;
vc->nh->read_ready_list.enqueue(vc);
#endif
if (!action_->cancelled)
action_->continuation->handleEvent(NET_EVENT_ACCEPT, vc);
else
close_UnixNetVConnection(vc, e->ethread);
} while (loop);
Ldone:
return EVENT_CONT;
Lerror:
server.close();
e->cancel();
freeThread(vc, e->ethread);
NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
delete this;
return EVENT_DONE;
}
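//
// Entry point for the dedicated accept thread spawned by init_accept_loop():
// loop forever calling do_blocking_accept().
//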
int
NetAccept::acceptLoopEvent(int event, Event * e)
{
(void) event;
(void) e;
EThread *t = this_ethread();
while (1)
do_blocking_accept(t);
// Don't think this ever happens ...
NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
delete this;
return EVENT_DONE;
}
//
// NetAccept default constructor
//
NetAccept::NetAccept()
: Continuation(NULL),
period(0),
alloc_cache(0),
ifd(-1),
callback_on_open(false),
backdoor(false),
recv_bufsize(0),
send_bufsize(0),
sockopt_flags(0),
packet_mark(0),
packet_tos(0),
etype(0)
{ }
//
// Stop listening. When the next poll takes place, an error will result.
// THIS ONLY WORKS WITH POLLING STYLE ACCEPTS!
//
void
NetAccept::cancel()
{
action_->cancel();
server.close();
}