blob: 3e4c61f84e4516e7c56267ed0fdc47ec57c9c277 [file] [log] [blame]
/** @file
A brief file description
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/****************************************************************************
UnixUDPNet.cc
UDPNet implementation
****************************************************************************/
#include "P_Net.h"
typedef int (UDPNetHandler::*UDPNetContHandler) (int, void *);
inkcoreapi ClassAllocator<UDPPacketInternal> udpPacketAllocator("udpPacketAllocator");
inkcoreapi ClassAllocator<UDPWorkContinuation> udpWorkContinuationAllocator("udpWorkContinuationAllocator");
EventType ET_UDP;
#if (HOST_OS == linux) && !defined(DEBUG)
#define NODIAGS
#endif
//
// Global Data
//
UDPNetProcessorInternal udpNetInternal;
UDPNetProcessor &udpNet = udpNetInternal;
inku64 g_udp_bytesPending;
ink32 g_udp_periodicCleanupSlots;
ink32 g_udp_periodicFreeCancelledPkts;
ink32 g_udp_numSendRetries;
#include "P_LibBulkIO.h"
void *G_bulkIOState = NULL;
//
// Public functions
// See header for documentation
//
InkPipeInfo G_inkPipeInfo;
int G_bwGrapherFd;
struct sockaddr_in G_bwGrapherLoc;
void
initialize_thread_for_udp_net(EThread * thread)
{
new((ink_dummy_for_new *) get_UDPPollCont(thread)) PollCont(thread->mutex);
new((ink_dummy_for_new *) get_UDPNetHandler(thread)) UDPNetHandler;
#if defined(USE_LIBEV)
PollCont *pc = get_UDPPollCont(thread);
PollDescriptor *pd = pc->pollDescriptor;
pd->eio = ev_loop_new(LIBEV_BACKEND_LIST);
#endif
// These are hidden variables that control the amount of memory used by UDP
// packets. As usual, defaults are in RecordsConfig.cc
// This variable controls how often we cleanup the cancelled packets.
// If it is set to 0, then cleanup never occurs.
REC_ReadConfigInt32(g_udp_periodicFreeCancelledPkts, "proxy.config.udp.free_cancelled_pkts_sec");
// This variable controls how many "slots" of the udp calendar queue we cleanup.
// If it is set to 0, then cleanup never occurs. This value makes sense
// only if the above variable is set.
REC_ReadConfigInt32(g_udp_periodicFreeCancelledPkts, "proxy.config.udp.periodic_cleanup");
// UDP sends can fail with errno=EAGAIN. This variable determines the # of
// times the UDP thread retries before giving up. Set to 0 to keep trying forever.
REC_ReadConfigInt32(g_udp_numSendRetries, "proxy.config.udp.send_retries");
g_udp_numSendRetries = g_udp_numSendRetries < 0 ? 0 : g_udp_numSendRetries;
thread->schedule_every(get_UDPPollCont(thread), -9);
thread->schedule_imm(get_UDPNetHandler(thread));
Debug("bulk-io", "%s bulk-io for sends", G_bulkIOState ? "Using" : "Not using");
}
int
UDPNetProcessorInternal::start(int n_upd_threads)
{
if (n_upd_threads < 1)
return -1;
ET_UDP = eventProcessor.spawn_event_threads(n_upd_threads);
if (ET_UDP < 0) // Probably can't happen, maybe at some point EventType should be unsigned ?
return -1;
pollCont_offset = eventProcessor.allocate(sizeof(PollCont));
udpNetHandler_offset = eventProcessor.allocate(sizeof(UDPNetHandler));
for (int i = 0; i < eventProcessor.n_threads_for_type[ET_UDP]; i++)
initialize_thread_for_udp_net(eventProcessor.eventthread[ET_UDP][i]);
return 0;
}
void
UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler * nh,
UDPConnection * xuc, PollDescriptor * pd, EThread * thread)
{
(void) thread;
UnixUDPConnection *uc = (UnixUDPConnection *) xuc;
// receive packet and queue onto UDPConnection.
// don't call back connection at this time.
int r;
int iters = 0;
do {
struct sockaddr_in fromaddr;
socklen_t fromlen = sizeof(fromaddr);
// XXX: want to be 0 copy.
// XXX: really should read into next contiguous region of an IOBufferData
// which gets referenced by IOBufferBlock.
char buf[65536];
int buflen = sizeof(buf);
r = socketManager.recvfrom(uc->getFd(), buf, buflen, 0, (struct sockaddr *) &fromaddr, &fromlen);
if (r <= 0) {
// error
break;
}
// create packet
UDPPacket *p = new_incoming_UDPPacket(&fromaddr, buf, r);
p->setConnection(uc);
// XXX: is this expensive? I] really want to know this information
p->setArrivalTime(ink_get_hrtime_internal());
// queue onto the UDPConnection
ink_atomiclist_push(&uc->inQueue, p);
iters++;
} while (r > 0);
if (iters >= 1) {
Debug("udp-read", "read %d at a time", iters);
}
// if not already on to-be-called-back queue, then add it.
if (!uc->onCallbackQueue) {
ink_assert(uc->callback_link.next == NULL);
ink_assert(uc->callback_link.prev == NULL);
uc->AddRef();
nh->udp_callbacks.enqueue(uc);
uc->onCallbackQueue = 1;
}
}
int
UDPNetProcessorInternal::udp_callback(UDPNetHandler * nh, UDPConnection * xuc, EThread * thread)
{
(void) nh;
UnixUDPConnection *uc = (UnixUDPConnection *) xuc;
if (uc->continuation && uc->mutex) {
MUTEX_TRY_LOCK_FOR(lock, uc->mutex, thread, uc->continuation);
if (!lock) {
return 1;
}
uc->AddRef();
uc->callbackHandler(0, 0);
return 0;
} else {
ink_assert(!"doesn't reach here");
if (!uc->callbackAction) {
uc->AddRef();
uc->callbackAction = eventProcessor.schedule_imm(uc);
}
return 0;
}
}
// cheesy implementation of a asynchronous read and callback for Unix
class UDPReadContinuation:public Continuation
{
public:
UDPReadContinuation(Event * completionToken);
UDPReadContinuation();
~UDPReadContinuation();
inline void free(void);
inline void init_token(Event * completionToken);
inline void init_read(int fd, IOBufferBlock * buf, int len, struct sockaddr *fromaddr, socklen_t *fromaddrlen); // start up polling
void set_timer(int seconds)
{
timeout_interval = HRTIME_SECONDS(seconds);
}
void cancel();
int readPollEvent(int event, Event * e);
Action *getAction()
{
return event;
}
void setupPollDescriptor();
private:
Event * event; // the completion event token created
// on behalf of the client
Ptr<IOBufferBlock> readbuf;
int readlen;
struct sockaddr *fromaddr;
socklen_t *fromaddrlen;
int fd; // fd we are reading from
int ifd; // poll fd index
ink_hrtime period; // polling period
ink_hrtime elapsed_time;
ink_hrtime timeout_interval;
};
ClassAllocator<UDPReadContinuation> udpReadContAllocator("udpReadContAllocator");
UDPReadContinuation::UDPReadContinuation(Event * completionToken)
: Continuation(NULL), event(completionToken), readbuf(NULL),
readlen(0), fromaddrlen(0), fd(-1), ifd(-1), period(0), elapsed_time(0), timeout_interval(0)
{
if (completionToken->continuation)
this->mutex = completionToken->continuation->mutex;
else
this->mutex = new_ProxyMutex();
}
UDPReadContinuation::UDPReadContinuation()
: Continuation(NULL), event(0), readbuf(NULL),
readlen(0), fromaddrlen(0), fd(-1), ifd(-1), period(0), elapsed_time(0), timeout_interval(0)
{
}
inline void
UDPReadContinuation::free(void)
{
ink_assert(event != NULL);
completionUtil::destroy(event);
event = NULL;
readbuf = NULL;
readlen = 0;
fromaddrlen = 0;
fd = -1;
ifd = -1;
period = 0;
elapsed_time = 0;
timeout_interval = 0;
mutex = NULL;
udpReadContAllocator.free(this);
}
inline void
UDPReadContinuation::init_token(Event * completionToken)
{
if (completionToken->continuation) {
this->mutex = completionToken->continuation->mutex;
} else {
this->mutex = new_ProxyMutex();
}
event = completionToken;
}
inline void
UDPReadContinuation::init_read(int rfd, IOBufferBlock * buf, int len, struct sockaddr *fromaddr_, socklen_t *fromaddrlen_)
{
ink_assert(rfd >= 0 && buf != NULL && fromaddr_ != NULL && fromaddrlen_ != NULL);
fd = rfd;
readbuf = buf;
readlen = len;
fromaddr = fromaddr_;
fromaddrlen = fromaddrlen_;
SET_HANDLER(&UDPReadContinuation::readPollEvent);
period = NET_PERIOD;
setupPollDescriptor();
this_ethread()->schedule_every(this, period);
}
UDPReadContinuation::~UDPReadContinuation()
{
ink_assert(event != NULL);
completionUtil::destroy(event);
event = NULL;
}
void
UDPReadContinuation::cancel()
{
// I don't think this actually cancels it correctly right now.
event->cancel();
}
void
UDPReadContinuation::setupPollDescriptor()
{
#ifdef USE_EPOLL
Pollfd *pfd;
EThread *et = (EThread *) this_thread();
PollCont *pc = get_PollCont(et);
pfd = pc->nextPollDescriptor->alloc();
pfd->fd = fd;
ifd = pfd - pc->nextPollDescriptor->pfd;
ink_assert(pc->nextPollDescriptor->nfds > ifd);
pfd->events = POLLIN;
pfd->revents = 0;
#endif
}
int
UDPReadContinuation::readPollEvent(int event_, Event * e)
{
(void) event_;
(void) e;
int res;
//PollCont *pc = get_PollCont(e->ethread);
Continuation *c;
if (event->cancelled) {
e->cancel();
free();
// delete this;
return EVENT_DONE;
}
// See if the request has timed out
if (timeout_interval) {
elapsed_time += -period;
if (elapsed_time >= timeout_interval) {
c = completionUtil::getContinuation(event);
res = c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
e->cancel();
free();
// delete this;
return EVENT_DONE;
}
}
//ink_assert(ifd < 0 || event_ == EVENT_INTERVAL || (event_ == EVENT_POLL && pc->pollDescriptor->nfds > ifd && pc->pollDescriptor->pfd[ifd].fd == fd));
//if (ifd < 0 || event_ == EVENT_INTERVAL || (pc->pollDescriptor->pfd[ifd].revents & POLLIN)) {
ink_debug_assert(!"incomplete");
c = completionUtil::getContinuation(event);
// do read
socklen_t tmp_fromlen = *fromaddrlen;
int rlen = socketManager.recvfrom(fd, readbuf->end(), readlen,
0, // default flags
fromaddr, &tmp_fromlen);
completionUtil::setThread(event, e->ethread);
// call back user with their event
if (rlen > 0) {
// do callback if read is successful
*fromaddrlen = tmp_fromlen;
completionUtil::setInfo(event, fd, readbuf, rlen, errno);
readbuf->fill(rlen);
res = c->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
e->cancel();
free();
// delete this;
return EVENT_DONE;
} else if (rlen < 0 && rlen != -EAGAIN) {
// signal error.
*fromaddrlen = tmp_fromlen;
completionUtil::setInfo(event, fd, (IOBufferBlock *) readbuf, rlen, errno);
c = completionUtil::getContinuation(event);
res = c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
e->cancel();
free();
//delete this;
return EVENT_DONE;
} else {
completionUtil::setThread(event, NULL);
}
//}
if (event->cancelled) {
e->cancel();
free();
//delete this;
return EVENT_DONE;
}
// reestablish poll
setupPollDescriptor();
return EVENT_CONT;
}
/* recvfrom:
* Unix:
* assert(buf->write_avail() >= len);
* *actual_len = recvfrom(fd,addr,buf->end(),len)
* if successful then
* buf->fill(*actual_len);
* return ACTION_RESULT_DONE
* else if nothing read
* *actual_len is 0
* create "UDP read continuation" C with 'cont's lock
* set user callback to 'cont'
* return C's action.
* else
* return error;
*/
Action *
UDPNetProcessor::recvfrom_re(Continuation * cont,
void *token,
int fd,
struct sockaddr * fromaddr, socklen_t *fromaddrlen,
IOBufferBlock * buf, int len, bool useReadCont, int timeout)
{
(void) useReadCont;
ink_assert(buf->write_avail() >= len);
int actual;
Event *event = completionUtil::create();
completionUtil::setContinuation(event, cont);
completionUtil::setHandle(event, token);
actual = socketManager.recvfrom(fd, buf->end(), len, 0, // default flags
fromaddr, fromaddrlen);
if (actual > 0) {
completionUtil::setThread(event, this_ethread());
completionUtil::setInfo(event, fd, buf, actual, errno);
buf->fill(actual);
cont->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
completionUtil::destroy(event);
return ACTION_RESULT_DONE;
} else if (actual == 0 || (actual < 0 && actual == -EAGAIN)) {
UDPReadContinuation *c = udpReadContAllocator.alloc();
c->init_token(event);
c->init_read(fd, buf, len, fromaddr, fromaddrlen);
if (timeout) {
c->set_timer(timeout);
}
return event;
} else {
completionUtil::setThread(event, this_ethread());
completionUtil::setInfo(event, fd, buf, actual, errno);
cont->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
completionUtil::destroy(event);
return ACTION_IO_ERROR;
}
}
/* sendmsg:
* Unix:
* *actual_len = sendmsg(fd,msg,default-flags);
* if successful,
* return ACTION_RESULT_DONE
* else
* return error
*/
Action *
UDPNetProcessor::sendmsg_re(Continuation * cont, void *token, int fd, struct msghdr * msg)
{
int actual;
Event *event = completionUtil::create();
completionUtil::setContinuation(event, cont);
completionUtil::setHandle(event, token);
actual = socketManager.sendmsg(fd, msg, 0);
if (actual >= 0) {
completionUtil::setThread(event, this_ethread());
completionUtil::setInfo(event, fd, msg, actual, errno);
cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, event);
completionUtil::destroy(event);
return ACTION_RESULT_DONE;
} else {
completionUtil::setThread(event, this_ethread());
completionUtil::setInfo(event, fd, msg, actual, errno);
cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, event);
completionUtil::destroy(event);
return ACTION_IO_ERROR;
}
}
/* sendto:
* If this were implemented, it might be implemented like this:
* Unix:
* call sendto(fd,addr,buf->reader()->start(),len);
* if successful,
* buf->consume(len);
* return ACTION_RESULT_DONE
* else
* return error
*
*/
Action *
UDPNetProcessor::sendto_re(Continuation * cont,
void *token, int fd, struct sockaddr * toaddr, int toaddrlen, IOBufferBlock * buf, int len)
{
(void) token;
ink_assert(buf->read_avail() >= len);
int nbytes_sent = socketManager.sendto(fd, buf->start(), len, 0,
toaddr, toaddrlen);
if (nbytes_sent >= 0) {
ink_assert(nbytes_sent == len);
buf->consume(nbytes_sent);
cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, (void *) -1);
return ACTION_RESULT_DONE;
} else {
cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, (void *)(intptr_t)nbytes_sent);
return ACTION_IO_ERROR;
}
}
Action *
UDPNetProcessor::UDPCreatePortPairs(Continuation * cont,
int nPairs,
unsigned int myIP, unsigned int destIP, int send_bufsize, int recv_bufsize)
{
UDPWorkContinuation *worker = udpWorkContinuationAllocator.alloc();
// UDPWorkContinuation *worker = NEW(new UDPWorkContinuation);
worker->init(cont, nPairs, myIP, destIP, send_bufsize, recv_bufsize);
eventProcessor.schedule_imm(worker, ET_UDP);
return &(worker->action);
}
bool
UDPNetProcessor::CreateUDPSocket(int *resfd,
struct sockaddr_in * addr,
Action ** status, int my_port, unsigned int my_ip, int send_bufsize, int recv_bufsize)
{
int res = 0, fd = -1;
struct sockaddr_in bind_sa;
struct sockaddr_in myaddr;
int myaddr_len = sizeof(myaddr);
*resfd = -1;
if ((res = socketManager.socket(AF_INET, SOCK_DGRAM, 0)) < 0)
goto HardError;
fd = res;
if ((res = safe_fcntl(fd, F_SETFL, O_NONBLOCK)) < 0)
goto HardError;
memset(&bind_sa, 0, sizeof(bind_sa));
bind_sa.sin_family = AF_INET;
bind_sa.sin_port = htons(my_port);
bind_sa.sin_addr.s_addr = my_ip;
if ((res = socketManager.ink_bind(fd, (struct sockaddr *) &bind_sa, sizeof(bind_sa), IPPROTO_UDP)) < 0) {
unsigned char *pt = (unsigned char *) &my_ip;
Debug("udpnet", "ink bind failed --- my_ip = %d.%d.%d.%d", pt[0], pt[1], pt[2], pt[3]);
goto SoftError;
}
if (recv_bufsize) {
if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize)))
Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize);
}
if (send_bufsize) {
if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize)))
Debug("udpnet", "set_dnsbuf_size(%d) failed", send_bufsize);
}
if ((res = safe_getsockname(fd, (struct sockaddr *) &myaddr, &myaddr_len)) < 0) {
Debug("udpnet", "CreateUdpsocket: getsockname didnt' work");
goto HardError;
}
if (!myaddr.sin_addr.s_addr) {
// set to default IP address for machine
/** netfixme ... this_machine() is in proxy.
if (this_machine()) {
myaddr.sin_addr.s_addr = this_machine()->ip;
} else {
Debug("udpnet","CreateUdpSocket -- machine not initialized");
}
*/
}
*resfd = fd;
memcpy(addr, &myaddr, myaddr_len);
*status = NULL;
Debug("udpnet", "creating a udp socket port = %d, %d---success", my_port, addr->sin_port);
return true;
SoftError:
Debug("udpnet", "creating a udp socket port = %d---soft failure", my_port);
if (fd != -1)
socketManager.close(fd, keSocket);
*resfd = -1;
*status = NULL;
return false;
HardError:
Debug("udpnet", "creating a udp socket port = %d---hard failure", my_port);
if (fd != -1)
socketManager.close(fd, keSocket);
*resfd = -1;
*status = ACTION_IO_ERROR;
return false;
}
void
UDPNetProcessor::UDPClassifyConnection(Continuation * udpConn, int destIP)
{
int i;
UDPConnectionInternal *p = (UDPConnectionInternal *) udpConn;
if (G_inkPipeInfo.numPipes == 0) {
p->pipe_class = 0;
return;
}
p->pipe_class = -1;
// find a match: 0 is best-effort
for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++)
if (G_inkPipeInfo.perPipeInfo[i].destIP == destIP)
p->pipe_class = i;
// no match; set it to the destIP=0 class
if (p->pipe_class == -1) {
for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++)
if (G_inkPipeInfo.perPipeInfo[i].destIP == 0) {
p->pipe_class = i;
break;
}
}
Debug("udpnet-pipe", "Pipe class = %d", p->pipe_class);
ink_debug_assert(p->pipe_class != -1);
if (p->pipe_class == -1)
p->pipe_class = 0;
G_inkPipeInfo.perPipeInfo[p->pipe_class].count++;
}
Action *
UDPNetProcessor::UDPBind(Continuation * cont, int my_port, int my_ip, int send_bufsize, int recv_bufsize)
{
int res = 0;
int fd = -1;
UnixUDPConnection *n = NULL;
struct sockaddr_in bind_sa;
struct sockaddr_in myaddr;
int myaddr_len = sizeof(myaddr);
if ((res = socketManager.socket(AF_INET, SOCK_DGRAM, 0)) < 0)
goto Lerror;
fd = res;
if ((res = fcntl(fd, F_SETFL, O_NONBLOCK) < 0))
goto Lerror;
// If this is a class D address (i.e. multicast address), use REUSEADDR.
if ((((unsigned int) (ntohl(my_ip))) & 0xf0000000) == 0xe0000000) {
int enable_reuseaddr = 1;
if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &enable_reuseaddr, sizeof(enable_reuseaddr)) < 0)) {
goto Lerror;
}
}
memset(&bind_sa, 0, sizeof(bind_sa));
bind_sa.sin_family = AF_INET;
bind_sa.sin_port = htons(my_port);
bind_sa.sin_addr.s_addr = my_ip;
if ((res = socketManager.ink_bind(fd, (struct sockaddr *) &bind_sa, sizeof(bind_sa))) < 0) {
goto Lerror;
}
if (recv_bufsize) {
if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize)))
Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize);
}
if (send_bufsize) {
if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize)))
Debug("udpnet", "set_dnsbuf_size(%d) failed", send_bufsize);
}
if ((res = safe_getsockname(fd, (struct sockaddr *) &myaddr, &myaddr_len)) < 0) {
goto Lerror;
}
if (!myaddr.sin_addr.s_addr) {
// set to default IP address for machine
/** netfixme this_machine is in proxy/
if (this_machine()) {
myaddr.sin_addr.s_addr = this_machine()->ip;
} else {
Debug("udpnet","UDPNetProcessor::UDPBind -- machine not initialized");
}
*/
}
n = NEW(new UnixUDPConnection(fd));
Debug("udpnet", "UDPNetProcessor::UDPBind: %x fd=%d", n, fd);
n->setBinding(&myaddr);
n->bindToThread(cont);
cont->handleEvent(NET_EVENT_DATAGRAM_OPEN, n);
return ACTION_RESULT_DONE;
Lerror:
if (fd != NO_FD)
socketManager.close(fd, keSocket);
cont->handleEvent(NET_EVENT_DATAGRAM_ERROR, NULL);
return ACTION_IO_ERROR;
}
bool
UDPNetProcessor::AllocBandwidth(Continuation * udpConn, double desiredMbps)
{
UDPConnectionInternal *udpIntConn = (UDPConnectionInternal *) udpConn;
ink64 desiredbps = (ink64) (desiredMbps * 1024.0 * 1024.0);
if (G_inkPipeInfo.numPipes == 0) {
udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
return true;
}
if ((udpIntConn->pipe_class == 0) ||
(G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc + desiredbps >
G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit)) {
Debug("udpnet-admit", "Denying flow with %lf Mbps", desiredMbps);
return false;
}
udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
udpIntConn->allocedbps = desiredbps;
ink_atomic_increment64(&G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc, desiredbps);
Debug("udpnet-admit", "Admitting flow with %lf Mbps (a=%lld, lim=%lld)",
desiredMbps,
G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc,
G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit);
return true;
}
bool
UDPNetProcessor::ChangeBandwidth(Continuation * udpConn, double desiredMbps)
{
UDPConnectionInternal *udpIntConn = (UDPConnectionInternal *) udpConn;
ink64 desiredbps = (ink64) (desiredMbps * 1024.0 * 1024.0);
ink64 oldbps = (ink64) (udpIntConn->flowRateBps * 8.0);
if (G_inkPipeInfo.numPipes == 0) {
udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
return true;
}
// arithmetic here is in bits-per-sec.
if ((udpIntConn->pipe_class == 0) ||
(G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc +
desiredbps - oldbps) > G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit) {
Debug("udpnet-admit", "Unable to change b/w for flow to %lf Mbps", desiredMbps);
return false;
}
udpIntConn->flowRateBps = (desiredMbps * 1024.0 * 1024.0) / 8.0;
udpIntConn->allocedbps = desiredbps;
ink_atomic_increment64(&G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc, desiredbps - oldbps);
Debug("udpnet-admit", "Changing flow's b/w from %lf Mbps to %lf Mbps (a=%lld, lim=%lld)",
(double) oldbps / (1024.0 * 1024.0),
desiredMbps,
G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc,
G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwLimit);
return true;
}
void
UDPNetProcessor::FreeBandwidth(Continuation * udpConn)
{
UDPConnectionInternal *udpIntConn = (UDPConnectionInternal *) udpConn;
ink64 bps;
if (G_inkPipeInfo.numPipes == 0)
return;
Debug("udpnet-free", "Trying to releasing %lf (%lld) Kbps", udpIntConn->flowRateBps, udpIntConn->allocedbps);
bps = udpIntConn->allocedbps;
if (bps <= 0)
return;
ink_atomic_increment64(&G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc, -bps);
Debug("udpnet-free", "Releasing %lf Kbps", bps / 1024.0);
if (G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc < 0)
G_inkPipeInfo.perPipeInfo[udpIntConn->pipe_class].bwAlloc = 0;
udpIntConn->flowRateBps = 0.0;
udpIntConn->allocedbps = 0;
}
double
UDPNetProcessor::GetAvailableBandwidth()
{
int i;
double usedBw = 0.0;
if (G_inkPipeInfo.numPipes == 0)
// return 100Mbps if there are no pipes
return 100.0;
for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
usedBw += G_inkPipeInfo.perPipeInfo[i].bwUsed;
}
return G_inkPipeInfo.interfaceMbps - usedBw;
}
// send out all packets that need to be sent out as of time=now
UDPQueue::UDPQueue()
: last_report(0)
, last_service(0)
, last_byteperiod(0)
, bytesSent(0)
, packets(0)
, added(0)
{
}
UDPQueue::~UDPQueue()
{
UDPPacketInternal *p;
while ((p = reliabilityPktQueue.dequeue()) != NULL)
p->free();
}
/*
* Driver function that aggregates packets across cont's and sends them
*/
void
UDPQueue::service(UDPNetHandler * nh)
{
ink_hrtime now = ink_get_hrtime_internal();
inku64 timeSpent = 0;
UDPPacketInternal *p;
ink_hrtime pktSendTime;
double minPktSpacing;
inku32 pktSize;
ink32 pktLen;
int i;
bool addToGuaranteedQ;
(void) nh;
static ink_hrtime lastPrintTime = ink_get_hrtime_internal();
static ink_hrtime lastSchedTime = ink_get_hrtime_internal();
static inku32 schedJitter = 0;
static inku32 numTimesSched = 0;
schedJitter += ink_hrtime_to_msec(now - lastSchedTime);
numTimesSched++;
p = (UDPPacketInternal *) ink_atomiclist_popall(&atomicQueue);
if (p) {
UDPPacketInternal *pnext = NULL;
Queue<UDPPacketInternal> stk;
while (p) {
pnext = p->alink.next;
p->alink.next = NULL;
stk.push(p);
p = pnext;
}
// walk backwards down list since this is actually an atomic stack.
while (stk.head) {
p = stk.pop();
ink_assert(p->link.prev == NULL);
ink_assert(p->link.next == NULL);
if (p->isReliabilityPkt) {
reliabilityPktQueue.enqueue(p);
continue;
}
// insert into our queue.
Debug("udp-send", "Adding 0x%x", p);
addToGuaranteedQ = ((p->conn->pipe_class > 0) && (p->conn->flowRateBps > 10.0));
pktLen = p->getPktLength();
if (p->conn->lastPktStartTime == 0) {
p->pktSendStartTime = MAX(now, p->delivery_time);
} else {
pktSize = MAX(INK_ETHERNET_MTU_SIZE, pktLen);
if (addToGuaranteedQ) {
// NOTE: this is flow rate in Bytes per sec.; convert to milli-sec.
minPktSpacing = 1000.0 / (p->conn->flowRateBps / p->conn->avgPktSize);
pktSendTime = p->conn->lastPktStartTime + ink_hrtime_from_msec((inku32) minPktSpacing);
} else {
minPktSpacing = 0.0;
pktSendTime = p->delivery_time;
}
p->pktSendStartTime = MAX(MAX(now, pktSendTime), p->delivery_time);
if (p->conn->flowRateBps > 25600.0)
Debug("udpnet-pkt", "Pkt size = %.1lf now = %lld, send = %lld, del = %lld, Delay delta = %lld; delta = %lld",
p->conn->avgPktSize,
now, pktSendTime, p->delivery_time,
ink_hrtime_to_msec(p->pktSendStartTime - now),
ink_hrtime_to_msec(p->pktSendStartTime - p->conn->lastPktStartTime));
p->conn->avgPktSize = ((4.0 * p->conn->avgPktSize) / 5.0) + (pktSize / 5.0);
}
p->conn->lastPktStartTime = p->pktSendStartTime;
p->delivery_time = p->pktSendStartTime;
p->conn->nBytesTodo += pktLen;
g_udp_bytesPending += pktLen;
if (addToGuaranteedQ)
G_inkPipeInfo.perPipeInfo[p->conn->pipe_class].queue->addPacket(p, now);
else {
// stick in the best-effort queue: either it was a best-effort flow or
// the thingy wasn't alloc'ed bandwidth
G_inkPipeInfo.perPipeInfo[0].queue->addPacket(p, now);
}
}
}
if ((now - lastPrintTime) > ink_hrtime_from_sec(30)) {
Debug("udp-pending-packets", "udp bytes pending: %lld", g_udp_bytesPending);
Debug("udp-sched-jitter", "avg. udp sched jitter: %f", (double) schedJitter / numTimesSched);
schedJitter = 0;
numTimesSched = 0;
lastPrintTime = now;
}
for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++)
G_inkPipeInfo.perPipeInfo[i].queue->advanceNow(now);
if (G_bulkIOState) {
BulkIOSend();
} else {
SendPackets();
}
timeSpent = ink_hrtime_to_msec(now - last_report);
if (timeSpent > 10000) {
// if (bytesSent > 0)
// timespent is in milli-seconds
char temp[2048], *p1;
char bwMessage[2048];
double bw, totalBw;
unsigned char *ip;
temp[0] = '\0';
bwMessage[0] = '\0';
p1 = temp;
if (bytesSent > 0)
totalBw = (bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0);
else
totalBw = 1.0;
for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
// bw is in Mbps
bw = (G_inkPipeInfo.perPipeInfo[i].bytesSent * 8.0 * 1000.0) / (timeSpent * 1024.0 * 1024.0);
snprintf(p1, sizeof(temp), "\t class[%d] = %f Mbps, alloc = %f Mbps, (conf'ed = %f, got = %f) \n",
i, bw, (G_inkPipeInfo.perPipeInfo[i].bwAlloc / (1024.0 * 1024.0)),
G_inkPipeInfo.perPipeInfo[i].wt, bw / totalBw);
p1 += strlen(p1);
ip = (unsigned char *) &(G_inkPipeInfo.perPipeInfo[i].destIP);
// use a weighted estimator of current usage
G_inkPipeInfo.perPipeInfo[i].bwUsed = (4.0 * G_inkPipeInfo.perPipeInfo[i].bwUsed / 5.0) + (bw / 5.0);
G_inkPipeInfo.perPipeInfo[i].bytesSent = 0;
G_inkPipeInfo.perPipeInfo[i].pktsSent = 0;
}
if (temp[0])
Debug("udpnet-bw", "B/w: %f Mbps; breakdown: \n%s", totalBw, temp);
bytesSent = 0;
last_report = now;
added = 0;
packets = 0;
}
last_service = now;
}
void
UDPQueue::SendPackets()
{
UDPPacketInternal *p;
static ink_hrtime lastCleanupTime = ink_get_hrtime_internal();
ink_hrtime now = ink_get_hrtime_internal();
// ink_hrtime send_threshold_time = now + HRTIME_MSECONDS(5);
// send packets for SLOT_TIME per attempt
ink_hrtime send_threshold_time = now + SLOT_TIME;
ink32 bytesThisSlot = INT_MAX, bytesUsed = 0, reliabilityBytes = 0;
ink32 bytesThisPipe, sentOne, i;
ink32 pktLen;
ink_hrtime timeDelta = 0;
if (now > last_service)
timeDelta = ink_hrtime_to_msec(now - last_service);
if (G_inkPipeInfo.numPipes > 0) {
bytesThisSlot = (ink32) (((G_inkPipeInfo.reliabilityMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta);
if (bytesThisSlot == 0) {
// use at most 10% for reliability
bytesThisSlot = (ink32) (((G_inkPipeInfo.interfaceMbps * 1024.0 * 1024.0) / (8.0 * 1000.0)) * timeDelta * 0.1);
reliabilityBytes = bytesThisSlot;
}
}
while ((p = reliabilityPktQueue.dequeue()) != NULL) {
pktLen = p->getPktLength();
g_udp_bytesPending -= pktLen;
p->conn->nBytesTodo -= pktLen;
p->conn->nBytesDone += pktLen;
if (p->conn->shouldDestroy())
goto next_pkt_3;
if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)
goto next_pkt_3;
SendUDPPacket(p, pktLen);
bytesThisSlot -= pktLen;
if (bytesThisSlot < 0)
break;
next_pkt_3:
p->free();
}
if (G_inkPipeInfo.numPipes > 0)
bytesThisSlot = (ink32) (((G_inkPipeInfo.interfaceMbps * 1024.0 * 1024.0) /
(8.0 * 1000.0)) * timeDelta - reliabilityBytes);
else
bytesThisSlot = INT_MAX;
sendPackets:
sentOne = false;
send_threshold_time = now + SLOT_TIME;
for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
bytesThisPipe = (ink32) (bytesThisSlot * G_inkPipeInfo.perPipeInfo[i].wt);
while ((bytesThisPipe > 0) && (G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(send_threshold_time))) {
p = G_inkPipeInfo.perPipeInfo[i].queue->getFirstPacket();
pktLen = p->getPktLength();
g_udp_bytesPending -= pktLen;
p->conn->nBytesTodo -= pktLen;
p->conn->nBytesDone += pktLen;
if (p->conn->shouldDestroy())
goto next_pkt;
if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)
goto next_pkt;
G_inkPipeInfo.perPipeInfo[i].bytesSent += pktLen;
SendUDPPacket(p, pktLen);
bytesUsed += pktLen;
bytesThisPipe -= pktLen;
next_pkt:
sentOne = true;
p->free();
if (bytesThisPipe < 0)
break;
}
}
bytesThisSlot -= bytesUsed;
if ((bytesThisSlot > 0) && (sentOne)) {
// redistribute the slack...
now = ink_get_hrtime_internal();
for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
if (G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(now) == NULL) {
G_inkPipeInfo.perPipeInfo[i].queue->advanceNow(now);
}
}
goto sendPackets;
}
if ((g_udp_periodicFreeCancelledPkts) &&
(now - lastCleanupTime > ink_hrtime_from_sec(g_udp_periodicFreeCancelledPkts))) {
inku64 nbytes = g_udp_bytesPending;
ink_hrtime startTime = ink_get_hrtime_internal(), endTime;
for (i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
G_inkPipeInfo.perPipeInfo[i].queue->FreeCancelledPackets(g_udp_periodicCleanupSlots);
}
endTime = ink_get_hrtime_internal();
Debug("udp-pending-packets", "Did cleanup of %d buckets: %lld bytes in %d m.sec",
g_udp_periodicCleanupSlots, nbytes - g_udp_bytesPending, ink_hrtime_to_msec(endTime - startTime));
lastCleanupTime = now;
}
}
void
UDPQueue::SendUDPPacket(UDPPacketInternal * p, ink32 pktLen)
{
IOBufferBlock *b;
struct msghdr msg;
struct iovec iov[32];
int real_len = 0;
int n, count, iov_len = 0;
if (!p->isReliabilityPkt) {
p->conn->SetLastSentPktTSSeqNum(p->pktTSSeqNum);
p->conn->lastSentPktStartTime = p->delivery_time;
}
Debug("udp-send", "Sending 0x%x", p);
#if (HOST_OS != solaris)
msg.msg_control = 0;
msg.msg_controllen = 0;
msg.msg_flags = 0;
#endif
msg.msg_name = (caddr_t) & p->to;
msg.msg_namelen = sizeof(p->to);
iov_len = 0;
bytesSent += pktLen;
for (b = p->chain; b != NULL; b = b->next) {
iov[iov_len].iov_base = (caddr_t) b->start();
iov[iov_len].iov_len = b->size();
real_len += iov[iov_len].iov_len;
iov_len++;
}
msg.msg_iov = iov;
msg.msg_iovlen = iov_len;
count = 0;
while (1) {
// stupid Linux problem: sendmsg can return EAGAIN
n =::sendmsg(p->conn->getFd(), &msg, 0);
if ((n >= 0) || ((n < 0) && (errno != EAGAIN)))
// send succeeded or some random error happened.
break;
if (errno == EAGAIN) {
count++;
if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) {
// tried too many times; give up
Debug("udpnet", "Send failed: too many retries");
break;
}
}
}
}
#ifndef BULK_IO_SEND_IS_BROKEN
void
UDPQueue::BulkIOSend()
{
ink_assert(!"Don't call here...");
}
#else
void
UDPQueue::BulkIOSend()
{
bool sentOne = false;
UDPPacketInternal *p;
ink_hrtime now = ink_get_hrtime_internal();
ink_hrtime send_threshold_time = now + SLOT_TIME;
for (int i = 0; i < G_inkPipeInfo.numPipes + 1; i++) {
while (p = G_inkPipeInfo.perPipeInfo[i].queue->firstPacket(send_threshold_time)) {
p = G_inkPipeInfo.perPipeInfo[i].queue->getFirstPacket();
sentOne = true;
Debug("bulk-io-pkt", "Adding a packet...");
BulkIOAddPkt(G_bulkIOState, &G_bulkIOAggregator, p, p->conn->getPortNum());
bytesSent += p->getPktLength();
// Now the packet is "sent"; get rid of it
p->free();
}
}
if (sentOne) {
BulkIOFlush(G_bulkIOState, &G_bulkIOAggregator);
}
}
#endif
void
UDPQueue::send(UDPPacket * p)
{
// XXX: maybe fastpath for immediate send?
ink_atomiclist_push(&atomicQueue, p);
}
#undef LINK
UDPNetHandler::UDPNetHandler()
{
mutex = new_ProxyMutex();
ink_atomiclist_init(&udpOutQueue.atomicQueue, "Outgoing UDP Packet queue", ink_offsetof(UDPPacketInternal, alink.next));
ink_atomiclist_init(&udpNewConnections, "UDP Connection queue", ink_offsetof(UnixUDPConnection, newconn_alink.next));
nextCheck = ink_get_hrtime_internal() + HRTIME_MSECONDS(1000);
lastCheck = 0;
SET_HANDLER((UDPNetContHandler) & UDPNetHandler::startNetEvent);
}
int
UDPNetHandler::startNetEvent(int event, Event * e)
{
(void) event;
SET_HANDLER((UDPNetContHandler) & UDPNetHandler::mainNetEvent);
trigger_event = e;
e->schedule_every(-HRTIME_MSECONDS(9));
return EVENT_CONT;
}
int
UDPNetHandler::mainNetEvent(int event, Event * e)
{
ink_assert(trigger_event == e && event == EVENT_POLL);
(void) event;
(void) e;
PollCont *pc = get_UDPPollCont(e->ethread);
// handle UDP outgoing engine
udpOutQueue.service(this);
// handle UDP read operations
UnixUDPConnection *uc, *next;
int i;
int nread = 0;
EventIO *temp_eptr = NULL;
for (i = 0; i < pc->pollDescriptor->result; i++) {
temp_eptr = (EventIO*) get_ev_data(pc->pollDescriptor,i);
if ((get_ev_events(pc->pollDescriptor,i) & EVENTIO_READ)
&& temp_eptr->type == EVENTIO_UDP_CONNECTION) {
uc = temp_eptr->data.uc;
ink_assert(uc && uc->mutex && uc->continuation);
ink_assert(uc->refcount >= 1);
if (uc->shouldDestroy()) {
// udp_polling->remove(uc,uc->polling_link);
uc->Release();
} else {
udpNetInternal.udp_read_from_net(this, uc, pc->pollDescriptor, trigger_event->ethread);
nread++;
}
} //if EPOLLIN
} //end for
// remove dead UDP connections
ink_hrtime now = ink_get_hrtime_internal();
if (now >= nextCheck) {
for (uc = udp_polling.head; uc; uc = next) {
ink_assert(uc->mutex && uc->continuation);
ink_assert(uc->refcount >= 1);
next = uc->polling_link.next;
if (uc->shouldDestroy()) {
if (G_inkPipeInfo.numPipes > 0)
G_inkPipeInfo.perPipeInfo[uc->pipe_class].count--;
//changed by YTS Team, yamsat
//udp_polling->remove(uc,uc->polling_link);
uc->Release();
}
}
nextCheck = ink_get_hrtime_internal() + HRTIME_MSECONDS(1000);
}
// service UDPConnections with data ready for callback.
Que(UnixUDPConnection, callback_link) q = udp_callbacks;
udp_callbacks.clear();
while ((uc = q.dequeue())) {
ink_assert(uc->mutex && uc->continuation);
if (udpNetInternal.udp_callback(this, uc, trigger_event->ethread)) { // not successful
// schedule on a thread of its own.
ink_assert(uc->callback_link.next == NULL);
ink_assert(uc->callback_link.prev == NULL);
udp_callbacks.enqueue(uc);
} else {
ink_assert(uc->callback_link.next == NULL);
ink_assert(uc->callback_link.prev == NULL);
uc->onCallbackQueue = 0;
uc->Release();
}
}
return EVENT_CONT;
}
/////////////////////////////////////////////////////////////////////
//
// A helper continuation that creates a pair of UDP ports in a non-blocking
// way. This continuation runs on the UDP thread; a run lasts for at most 500ms.
//
/////////////////////////////////////////////////////////////////////
void
UDPWorkContinuation::init(Continuation * c, int numPairs,
unsigned int my_ip, unsigned int dest_ip, int s_bufsize, int r_bufsize)
{
mutex = c->mutex;
cont = c;
action = c;
numPairs = numPairs;
myIP = my_ip;
destIP = dest_ip;
sendbufsize = s_bufsize;
recvbufsize = r_bufsize;
udpConns = NULL;
SET_HANDLER((UDPWorkContinuation_Handler) & UDPWorkContinuation::StateCreatePortPairs);
}
int
UDPWorkContinuation::StateCreatePortPairs(int event, void *data)
{
// int res = 0;
int numUdpPorts = 2 * numPairs;
int fd1 = -1, fd2 = -1;
// struct sockaddr_in bind_sa;
struct sockaddr_in myaddr1, myaddr2;
int portNum, i;
// int myaddr_len = sizeof(myaddr1);
static int lastAllocPort = 10000;
ink_hrtime startTime, endTime;
Action *status;
//epoll changes
PollCont *pc = NULL;
//epoll changes ends here
ink_debug_assert(mutex->thread_holding == this_ethread());
if (action.cancelled) {
action = NULL;
mutex = NULL;
udpWorkContinuationAllocator.free(this);
return EVENT_CONT;
}
startTime = ink_get_hrtime_internal();
udpConns = NEW(new UnixUDPConnection *[numUdpPorts]);
for (i = 0; i < numUdpPorts; i++)
udpConns[i] = NULL;
ink_atomic_swap(&portNum, lastAllocPort);
portNum %= 50000;
if (portNum == 0)
portNum = 10000;
i = 0;
while (i < numUdpPorts) {
if (udpNet.CreateUDPSocket(&fd1, &myaddr1, &status, portNum, myIP, sendbufsize, recvbufsize)) {
if (udpNet.CreateUDPSocket(&fd2, &myaddr2, &status, portNum + 1, myIP, sendbufsize, recvbufsize)) {
udpConns[i] = NEW(new UnixUDPConnection(fd1)); // new_UnixUDPConnection(fd1);
udpConns[i]->setBinding(&myaddr1);
i++;
udpConns[i] = NEW(new UnixUDPConnection(fd2)); // new_UnixUDPConnection(fd2);
udpConns[i]->setBinding(&myaddr2);
i++;
// remember the last alloc'ed port
ink_atomic_swap(&lastAllocPort, portNum + 2);
} else {
if (fd1 != NO_FD)
socketManager.close(fd1, keSocket);
if (status == ACTION_IO_ERROR)
goto Lerror;
}
Debug("udpnet", "Created port pair with ports = %d, %d", portNum, portNum + 1);
} else if (status == ACTION_IO_ERROR)
goto Lerror;
// pick the next port pair value
portNum += 2;
// wrap around at 50K
portNum %= 50000;
if (portNum == 0)
portNum = 10000;
endTime = ink_get_hrtime_internal();
// if we spend more than 500 ms. bail!
if (ink_hrtime_to_msec(endTime - startTime) > 500) {
status = ACTION_IO_ERROR;
goto Lerror;
}
}
for (i = 0; i < numUdpPorts; i++) {
udpNet.UDPClassifyConnection(udpConns[i], destIP);
Debug("udpnet-pipe", "Adding (port = %d) to Pipe class: %d",
udpConns[i]->getPortNum(), udpConns[i]->pipe_class);
}
// assert should *never* fire; we check for this at the begin of the func.
ink_assert(!action.cancelled);
// Bind to threads only on a success. Currently, after you have
// bound to have a thread, the only way to remove a UDPConnection is
// to call destroy(); the thread to which the UDPConnection will
// remove the connection from a linked list and call delete.
for (i = 0; i < numUdpPorts; i++) {
udpConns[i]->bindToThread(cont);
pc = get_UDPPollCont(udpConns[i]->ethread);
udpConns[i]->ep.start(pc->pollDescriptor, udpConns[i], EVENTIO_READ);
}
resultCode = NET_EVENT_DATAGRAM_OPEN;
goto out;
Lerror:
resultCode = NET_EVENT_DATAGRAM_ERROR;
for (i = 0; i < numUdpPorts; i++)
delete udpConns[i];
delete[] udpConns;
udpConns = NULL;
out:
SET_HANDLER((UDPWorkContinuation_Handler) & UDPWorkContinuation::StateDoCallback);
return StateDoCallback(0, NULL);
}
int
UDPWorkContinuation::StateDoCallback(int event, void *data)
{
MUTEX_TRY_LOCK(lock, action.mutex, this_ethread());
if (!lock) {
this_ethread()->schedule_in(this, MUTEX_RETRY_DELAY);
return EVENT_CONT;
}
if (!action.cancelled) {
action.continuation->handleEvent(resultCode, udpConns);
} else {
// else action.cancelled
if (resultCode == NET_EVENT_DATAGRAM_OPEN) {
for (int i = 0; i < numPairs * 2; i++)
// don't call delete on individual connections; the udp thread will do
// that when it cleans up an fd.
udpConns[i]->destroy();
delete[]udpConns; // I think this is OK to delete the array, what we shouldn't do is loop over
udpConns = NULL; // the conns and and do delete udpConns[i].
}
}
action = NULL;
mutex = NULL;
udpWorkContinuationAllocator.free(this);
return EVENT_CONT;
}