| /** @file |
| |
| UDPNet implementation for Unix. |
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| /**************************************************************************** |
| |
| UnixUDPNet.cc |
| UDPNet implementation |
| |
| |
| ****************************************************************************/ |
| |
| #if defined(darwin) |
| /* This is for IPV6_PKTINFO and IPV6_RECVPKTINFO */ |
| #define __APPLE_USE_RFC_3542 |
| #endif |
| |
| #include "P_Net.h" |
| #include "P_UDPNet.h" |
| |
| #include "netinet/udp.h" |
| #ifndef UDP_SEGMENT |
| // This is needed because an old glibc may not have the constant even if the kernel supports it. |
| #define UDP_SEGMENT 103 |
| #endif |
| |
| using UDPNetContHandler = int (UDPNetHandler::*)(int, void *); |
| |
| ClassAllocator<UDPPacketInternal> udpPacketAllocator("udpPacketAllocator"); |
| EventType ET_UDP; |
| |
| // |
| // Global Data |
| // |
| |
| UDPNetProcessorInternal udpNetInternal; |
| UDPNetProcessor &udpNet = udpNetInternal; |
| |
| int32_t g_udp_periodicCleanupSlots; |
| int32_t g_udp_periodicFreeCancelledPkts; |
| int32_t g_udp_numSendRetries; |
| |
| // |
| // Public functions |
| // See header for documentation |
| // |
| int G_bwGrapherFd; |
| sockaddr_in6 G_bwGrapherLoc; |
| |
| void |
| initialize_thread_for_udp_net(EThread *thread) |
| { |
| int enable_gso; |
| REC_ReadConfigInteger(enable_gso, "proxy.config.udp.enable_gso"); |
| |
| UDPNetHandler *nh = get_UDPNetHandler(thread); |
| |
| new (reinterpret_cast<ink_dummy_for_new *>(nh)) UDPNetHandler(enable_gso); |
| new (reinterpret_cast<ink_dummy_for_new *>(get_UDPPollCont(thread))) PollCont(thread->mutex); |
| // The UDPNetHandler cannot be accessed across EThreads, |
| // because it must be called back immediately after the UDPPollCont on the same thread. |
| nh->mutex = thread->mutex.get(); |
| nh->thread = thread; |
| |
| PollCont *upc = get_UDPPollCont(thread); |
| PollDescriptor *upd = upc->pollDescriptor; |
| // Because ET_UDP is really simple, it can sleep for a long time. |
| // TODO: this poll timeout is hard coded. |
| upc->poll_timeout = 100; |
| // This variable controls how often we cleanup the cancelled packets. |
| // If it is set to 0, then cleanup never occurs. |
| REC_ReadConfigInt32(g_udp_periodicFreeCancelledPkts, "proxy.config.udp.free_cancelled_pkts_sec"); |
| |
| // This variable controls how many "slots" of the udp calendar queue we cleanup. |
| // If it is set to 0, then cleanup never occurs. This value makes sense |
| // only if the above variable is set. |
| REC_ReadConfigInt32(g_udp_periodicCleanupSlots, "proxy.config.udp.periodic_cleanup"); |
| |
| // UDP sends can fail with errno=EAGAIN. This variable determines the # of |
| // times the UDP thread retries before giving up. Set to 0 to keep trying forever. |
| REC_ReadConfigInt32(g_udp_numSendRetries, "proxy.config.udp.send_retries"); |
| g_udp_numSendRetries = g_udp_numSendRetries < 0 ? 0 : g_udp_numSendRetries; |
| |
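| // Install this handler as the thread's tail handler; the event loop drives it |
| // through UDPNetHandler::waitForActivity() and signalActivity(). |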
| thread->set_tail_handler(nh); |
| thread->ep = static_cast<EventIO *>(ats_malloc(sizeof(EventIO))); |
| new (thread->ep) EventIO(); |
| thread->ep->type = EVENTIO_ASYNC_SIGNAL; |
| #if HAVE_EVENTFD |
| thread->ep->start(upd, thread->evfd, nullptr, EVENTIO_READ); |
| #else |
| thread->ep->start(upd, thread->evpipe[0], nullptr, EVENTIO_READ); |
| #endif |
| } |
| |
| EventType |
| UDPNetProcessorInternal::register_event_type() |
| { |
| ET_UDP = eventProcessor.register_event_type("ET_UDP"); |
| return ET_UDP; |
| } |
| |
| int |
| UDPNetProcessorInternal::start(int n_udp_threads, size_t stacksize) |
| { |
| if (n_udp_threads < 1) { |
| return -1; |
| } |
| |
| pollCont_offset = eventProcessor.allocate(sizeof(PollCont)); |
| udpNetHandler_offset = eventProcessor.allocate(sizeof(UDPNetHandler)); |
| |
| eventProcessor.schedule_spawn(&initialize_thread_for_udp_net, ET_UDP); |
| eventProcessor.spawn_event_threads(ET_UDP, n_udp_threads, stacksize); |
| |
| return 0; |
| } |
| |
| void |
| UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler *nh, UDPConnection *xuc) |
| { |
| UnixUDPConnection *uc = (UnixUDPConnection *)xuc; |
| |
| // receive packet and queue onto UDPConnection. |
| // don't call back connection at this time. |
| int64_t r; |
| int iters = 0; |
| constexpr unsigned max_niov = 32; |
| |
| struct msghdr msg; |
| Ptr<IOBufferBlock> chain, next_chain; |
| struct iovec tiovec[max_niov]; |
| int64_t size_index = BUFFER_SIZE_INDEX_2K; |
| int64_t buffer_size = BUFFER_SIZE_FOR_INDEX(size_index); |
| // The max size of the receive buffer is 32 * buffer_size (2048) = 65536 bytes, |
| // because the 'UDP Length' field is a uint16_t as defined in RFC 768. |
| // The 8-octet 'User Datagram Header' is included in that length, so the payload is at most 65527 bytes. |
| do { |
| // create IOBufferBlock chain to receive data |
| unsigned int niov; |
| IOBufferBlock *b, *last; |
| |
| // build struct iov |
| // reuse the block in chain if available |
| b = chain.get(); |
| last = nullptr; |
| for (niov = 0; niov < max_niov; niov++) { |
| if (b == nullptr) { |
| b = new_IOBufferBlock(); |
| b->alloc(size_index); |
| if (last == nullptr) { |
| chain = b; |
| } else { |
| last->next = b; |
| } |
| } |
| |
| tiovec[niov].iov_base = b->buf(); |
| tiovec[niov].iov_len = b->block_size(); |
| |
| last = b; |
| b = b->next.get(); |
| } |
| |
| // build struct msghdr |
| sockaddr_in6 fromaddr; |
| sockaddr_in6 toaddr; |
| int toaddr_len = sizeof(toaddr); |
| char cbuf[1024]; |
| msg.msg_name = &fromaddr; |
| msg.msg_namelen = sizeof(fromaddr); |
| msg.msg_iov = tiovec; |
| msg.msg_iovlen = niov; |
| msg.msg_control = cbuf; |
| msg.msg_controllen = sizeof(cbuf); |
| |
| // receive data by recvmsg |
| r = SocketManager::recvmsg(uc->getFd(), &msg, 0); |
| if (r <= 0) { |
| // error |
| break; |
| } |
| |
| // truncated check |
| if (msg.msg_flags & MSG_TRUNC) { |
| Debug("udp-read", "The UDP packet is truncated"); |
| } |
| |
| // fill the IOBufferBlock chain |
| int64_t saved = r; |
| b = chain.get(); |
| while (b && saved > 0) { |
| if (saved > buffer_size) { |
| b->fill(buffer_size); |
| saved -= buffer_size; |
| b = b->next.get(); |
| } else { |
| b->fill(saved); |
| saved = 0; |
| next_chain = b->next.get(); |
| b->next = nullptr; |
| } |
| } |
| |
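| // getsockname() provides the local address as a fallback; when the socket is |
| // bound to an ANY address, the cmsg loop below recovers the datagram's real |
| // destination address from the pktinfo ancillary data. |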
| safe_getsockname(xuc->getFd(), reinterpret_cast<struct sockaddr *>(&toaddr), &toaddr_len); |
| for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msg, cmsg)) { |
| switch (cmsg->cmsg_type) { |
| #ifdef IP_PKTINFO |
| case IP_PKTINFO: |
| if (cmsg->cmsg_level == IPPROTO_IP) { |
| struct in_pktinfo *pktinfo = reinterpret_cast<struct in_pktinfo *>(CMSG_DATA(cmsg)); |
| reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = pktinfo->ipi_addr.s_addr; |
| } |
| break; |
| #endif |
| #ifdef IP_RECVDSTADDR |
| case IP_RECVDSTADDR: |
| if (cmsg->cmsg_level == IPPROTO_IP) { |
| struct in_addr *addr = reinterpret_cast<struct in_addr *>(CMSG_DATA(cmsg)); |
| reinterpret_cast<sockaddr_in *>(&toaddr)->sin_addr.s_addr = addr->s_addr; |
| } |
| break; |
| #endif |
| #if defined(IPV6_PKTINFO) || defined(IPV6_RECVPKTINFO) |
| case IPV6_PKTINFO: // IPV6_RECVPKTINFO uses IPV6_PKTINFO too |
| if (cmsg->cmsg_level == IPPROTO_IPV6) { |
| struct in6_pktinfo *pktinfo = reinterpret_cast<struct in6_pktinfo *>(CMSG_DATA(cmsg)); |
| memcpy(toaddr.sin6_addr.s6_addr, &pktinfo->ipi6_addr, 16); |
| } |
| break; |
| #endif |
| } |
| } |
| |
| // create packet |
| UDPPacket *p = new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), ats_ip_sa_cast(&toaddr), chain); |
| p->setConnection(uc); |
| // queue onto the UDPConnection |
| uc->inQueue.push((UDPPacketInternal *)p); |
| |
| // reload the unused block |
| chain = next_chain; |
| next_chain = nullptr; |
| iters++; |
| } while (r > 0); |
| if (iters >= 1) { |
| Debug("udp-read", "read %d at a time", iters); |
| } |
| // if not already on to-be-called-back queue, then add it. |
| if (!uc->onCallbackQueue) { |
| ink_assert(uc->callback_link.next == nullptr); |
| ink_assert(uc->callback_link.prev == nullptr); |
| uc->AddRef(); |
| nh->udp_callbacks.enqueue(uc); |
| uc->onCallbackQueue = 1; |
| } |
| } |
| |
| int |
| UDPNetProcessorInternal::udp_callback(UDPNetHandler *nh, UDPConnection *xuc, EThread *thread) |
| { |
| (void)nh; |
| UnixUDPConnection *uc = (UnixUDPConnection *)xuc; |
| |
| if (uc->continuation && uc->mutex) { |
| MUTEX_TRY_LOCK(lock, uc->mutex, thread); |
| if (!lock.is_locked()) { |
| return 1; |
| } |
| uc->AddRef(); |
| uc->callbackHandler(0, nullptr); |
| return 0; |
| } else { |
| ink_assert(!"doesn't reach here"); |
| if (!uc->callbackAction) { |
| uc->AddRef(); |
| uc->callbackAction = eventProcessor.schedule_imm(uc); |
| } |
| return 0; |
| } |
| } |
| |
| #define UNINITIALIZED_EVENT_PTR (Event *)0xdeadbeef |
| |
| // cheesy implementation of an asynchronous read and callback for Unix |
| class UDPReadContinuation : public Continuation |
| { |
| public: |
| UDPReadContinuation(Event *completionToken); |
| UDPReadContinuation(); |
| ~UDPReadContinuation() override; |
| inline void free(); |
| inline void init_token(Event *completionToken); |
| inline void init_read(int fd, IOBufferBlock *buf, int len, struct sockaddr *fromaddr, socklen_t *fromaddrlen); |
| |
| void |
| set_timer(int seconds) |
| { |
| timeout_interval = HRTIME_SECONDS(seconds); |
| } |
| |
| void cancel(); |
| int readPollEvent(int event, Event *e); |
| |
| Action * |
| getAction() |
| { |
| return event; |
| } |
| |
| void setupPollDescriptor(); |
| |
| private: |
| Event *event = UNINITIALIZED_EVENT_PTR; // the completion event token created |
| // on behalf of the client |
| Ptr<IOBufferBlock> readbuf{nullptr}; |
| int readlen = 0; |
| struct sockaddr_in6 *fromaddr = nullptr; |
| socklen_t *fromaddrlen = nullptr; |
| int fd = NO_FD; // fd we are reading from |
| int ifd = NO_FD; // poll fd index |
| ink_hrtime period = 0; // polling period |
| ink_hrtime elapsed_time = 0; |
| ink_hrtime timeout_interval = 0; |
| }; |
| |
| ClassAllocator<UDPReadContinuation> udpReadContAllocator("udpReadContAllocator"); |
| |
| UDPReadContinuation::UDPReadContinuation(Event *completionToken) |
| : Continuation(nullptr), event(completionToken), readbuf(nullptr), fd(-1), ifd(-1) |
| { |
| if (completionToken->continuation) { |
| this->mutex = completionToken->continuation->mutex; |
| } else { |
| this->mutex = new_ProxyMutex(); |
| } |
| } |
| |
| UDPReadContinuation::UDPReadContinuation() : Continuation(nullptr) {} |
| |
| inline void |
| UDPReadContinuation::free() |
| { |
| ink_assert(event != nullptr); |
| completionUtil::destroy(event); |
| event = nullptr; |
| readbuf = nullptr; |
| readlen = 0; |
| fromaddrlen = nullptr; |
| fd = -1; |
| ifd = -1; |
| period = 0; |
| elapsed_time = 0; |
| timeout_interval = 0; |
| mutex = nullptr; |
| udpReadContAllocator.free(this); |
| } |
| |
| inline void |
| UDPReadContinuation::init_token(Event *completionToken) |
| { |
| if (completionToken->continuation) { |
| this->mutex = completionToken->continuation->mutex; |
| } else { |
| this->mutex = new_ProxyMutex(); |
| } |
| event = completionToken; |
| } |
| |
| inline void |
| UDPReadContinuation::init_read(int rfd, IOBufferBlock *buf, int len, struct sockaddr *fromaddr_, socklen_t *fromaddrlen_) |
| { |
| ink_assert(rfd >= 0 && buf != nullptr && fromaddr_ != nullptr && fromaddrlen_ != nullptr); |
| fd = rfd; |
| readbuf = buf; |
| readlen = len; |
| fromaddr = ats_ip6_cast(fromaddr_); |
| fromaddrlen = fromaddrlen_; |
| SET_HANDLER(&UDPReadContinuation::readPollEvent); |
| period = -HRTIME_MSECONDS(net_event_period); |
| setupPollDescriptor(); |
| this_ethread()->schedule_every(this, period); |
| } |
| |
| UDPReadContinuation::~UDPReadContinuation() |
| { |
| if (event != UNINITIALIZED_EVENT_PTR) { |
| ink_assert(event != nullptr); |
| completionUtil::destroy(event); |
| event = nullptr; |
| } |
| } |
| |
| void |
| UDPReadContinuation::cancel() |
| { |
| // I don't think this actually cancels it correctly right now. |
| event->cancel(); |
| } |
| |
| void |
| UDPReadContinuation::setupPollDescriptor() |
| { |
| #if TS_USE_EPOLL |
| Pollfd *pfd; |
| EThread *et = (EThread *)this_thread(); |
| PollCont *pc = get_PollCont(et); |
| if (pc->nextPollDescriptor == nullptr) { |
| pc->nextPollDescriptor = new PollDescriptor(); |
| } |
| pfd = pc->nextPollDescriptor->alloc(); |
| pfd->fd = fd; |
| ifd = pfd - pc->nextPollDescriptor->pfd; |
| ink_assert(pc->nextPollDescriptor->nfds > ifd); |
| pfd->events = POLLIN; |
| pfd->revents = 0; |
| #endif |
| } |
| |
| int |
| UDPReadContinuation::readPollEvent(int event_, Event *e) |
| { |
| (void)event_; |
| (void)e; |
| |
| // PollCont *pc = get_PollCont(e->ethread); |
| Continuation *c; |
| |
| if (event->cancelled) { |
| e->cancel(); |
| free(); |
| return EVENT_DONE; |
| } |
| |
| // See if the request has timed out |
| if (timeout_interval) { |
| elapsed_time += -period; |
| if (elapsed_time >= timeout_interval) { |
| c = completionUtil::getContinuation(event); |
| // TODO: Should we deal with the return code? |
| c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event); |
| e->cancel(); |
| free(); |
| return EVENT_DONE; |
| } |
| } |
| |
| c = completionUtil::getContinuation(event); |
| // do read |
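| // Use a scratch copy of the length so the caller's *fromaddrlen is only |
| // updated once the read completes or fails. |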
| socklen_t tmp_fromlen = *fromaddrlen; |
| int rlen = SocketManager::recvfrom(fd, readbuf->end(), readlen, 0, ats_ip_sa_cast(fromaddr), &tmp_fromlen); |
| |
| completionUtil::setThread(event, e->ethread); |
| // call back user with their event |
| if (rlen > 0) { |
| // do callback if read is successful |
| *fromaddrlen = tmp_fromlen; |
| completionUtil::setInfo(event, fd, readbuf, rlen, errno); |
| readbuf->fill(rlen); |
| // TODO: Should we deal with the return code? |
| c->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event); |
| e->cancel(); |
| free(); |
| |
| return EVENT_DONE; |
| } else if (rlen < 0 && rlen != -EAGAIN) { |
| // signal error. |
| *fromaddrlen = tmp_fromlen; |
| completionUtil::setInfo(event, fd, readbuf, rlen, errno); |
| c = completionUtil::getContinuation(event); |
| // TODO: Should we deal with the return code? |
| c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event); |
| e->cancel(); |
| free(); |
| |
| return EVENT_DONE; |
| } else { |
| completionUtil::setThread(event, nullptr); |
| } |
| |
| if (event->cancelled) { |
| e->cancel(); |
| free(); |
| |
| return EVENT_DONE; |
| } |
| // reestablish poll |
| setupPollDescriptor(); |
| |
| return EVENT_CONT; |
| } |
| |
| /* recvfrom: |
| * Unix: |
| * assert(buf->write_avail() >= len); |
| * *actual_len = recvfrom(fd,addr,buf->end(),len) |
| * if successful then |
| * buf->fill(*actual_len); |
| * return ACTION_RESULT_DONE |
| * else if nothing read |
| * *actual_len is 0 |
| * create "UDP read continuation" C with 'cont's lock |
| * set user callback to 'cont' |
| * return C's action. |
| * else |
| * return error; |
| */ |
| Action * |
| UDPNetProcessor::recvfrom_re(Continuation *cont, void *token, int fd, struct sockaddr *fromaddr, socklen_t *fromaddrlen, |
| IOBufferBlock *buf, int len, bool useReadCont, int timeout) |
| { |
| (void)useReadCont; |
| ink_assert(buf->write_avail() >= len); |
| int actual; |
| Event *event = completionUtil::create(); |
| |
| completionUtil::setContinuation(event, cont); |
| completionUtil::setHandle(event, token); |
| actual = SocketManager::recvfrom(fd, buf->end(), len, 0, fromaddr, fromaddrlen); |
| |
| if (actual > 0) { |
| completionUtil::setThread(event, this_ethread()); |
| completionUtil::setInfo(event, fd, make_ptr(buf), actual, errno); |
| buf->fill(actual); |
| cont->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event); |
| completionUtil::destroy(event); |
| return ACTION_RESULT_DONE; |
| } else if (actual == 0 || actual == -EAGAIN) { |
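| // Nothing to read yet: hand off to a polling UDPReadContinuation that will |
| // retry and call the continuation back once data arrives (or it times out). |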
| UDPReadContinuation *c = udpReadContAllocator.alloc(); |
| c->init_token(event); |
| c->init_read(fd, buf, len, fromaddr, fromaddrlen); |
| if (timeout) { |
| c->set_timer(timeout); |
| } |
| return event; |
| } else { |
| completionUtil::setThread(event, this_ethread()); |
| completionUtil::setInfo(event, fd, make_ptr(buf), actual, errno); |
| cont->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event); |
| completionUtil::destroy(event); |
| return ACTION_IO_ERROR; |
| } |
| } |
| |
| /* sendmsg: |
| * Unix: |
| * *actual_len = sendmsg(fd,msg,default-flags); |
| * if successful, |
| * return ACTION_RESULT_DONE |
| * else |
| * return error |
| */ |
| Action * |
| UDPNetProcessor::sendmsg_re(Continuation *cont, void *token, int fd, struct msghdr *msg) |
| { |
| int actual; |
| Event *event = completionUtil::create(); |
| |
| completionUtil::setContinuation(event, cont); |
| completionUtil::setHandle(event, token); |
| |
| actual = SocketManager::sendmsg(fd, msg, 0); |
| if (actual >= 0) { |
| completionUtil::setThread(event, this_ethread()); |
| completionUtil::setInfo(event, fd, msg, actual, errno); |
| cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, event); |
| completionUtil::destroy(event); |
| return ACTION_RESULT_DONE; |
| } else { |
| completionUtil::setThread(event, this_ethread()); |
| completionUtil::setInfo(event, fd, msg, actual, errno); |
| cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, event); |
| completionUtil::destroy(event); |
| return ACTION_IO_ERROR; |
| } |
| } |
| |
| /* sendto: |
| * If this were implemented, it might be implemented like this: |
| * Unix: |
| * call sendto(fd,addr,buf->reader()->start(),len); |
| * if successful, |
| * buf->consume(len); |
| * return ACTION_RESULT_DONE |
| * else |
| * return error |
| * |
| */ |
| Action * |
| UDPNetProcessor::sendto_re(Continuation *cont, void *token, int fd, struct sockaddr const *toaddr, int toaddrlen, |
| IOBufferBlock *buf, int len) |
| { |
| (void)token; |
| ink_assert(buf->read_avail() >= len); |
| int nbytes_sent = SocketManager::sendto(fd, buf->start(), len, 0, toaddr, toaddrlen); |
| |
| if (nbytes_sent >= 0) { |
| ink_assert(nbytes_sent == len); |
| buf->consume(nbytes_sent); |
| cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, (void *)-1); |
| return ACTION_RESULT_DONE; |
| } else { |
| cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, (void *)static_cast<intptr_t>(nbytes_sent)); |
| return ACTION_IO_ERROR; |
| } |
| } |
| |
| bool |
| UDPNetProcessor::CreateUDPSocket(int *resfd, sockaddr const *remote_addr, Action **status, NetVCOptions &opt) |
| { |
| int res = 0, fd = -1; |
| IpEndpoint local_addr; |
| int local_addr_len = sizeof(local_addr); |
| |
| // Need to do address calculations first, so we can determine the |
| // address family for socket creation. |
| ink_zero(local_addr); |
| |
| bool is_any_address = false; |
| if (NetVCOptions::FOREIGN_ADDR == opt.addr_binding || NetVCOptions::INTF_ADDR == opt.addr_binding) { |
| // Same for now, transparency for foreign addresses must be handled |
| // *after* the socket is created, and we need to do this calculation |
| // before the socket to get the IP family correct. |
| ink_release_assert(opt.local_ip.isValid()); |
| local_addr.assign(opt.local_ip, htons(opt.local_port)); |
| ink_assert(ats_ip_are_compatible(remote_addr, &local_addr.sa)); |
| } else { |
| // No local address specified, so use family option if possible. |
| int family = ats_is_ip(opt.ip_family) ? opt.ip_family : AF_INET; |
| local_addr.setToAnyAddr(family); |
| is_any_address = true; |
| local_addr.network_order_port() = htons(opt.local_port); |
| } |
| |
| *resfd = -1; |
| if ((res = SocketManager::socket(remote_addr->sa_family, SOCK_DGRAM, 0)) < 0) { |
| goto HardError; |
| } |
| |
| fd = res; |
| if (safe_fcntl(fd, F_SETFL, O_NONBLOCK) < 0) { |
| goto HardError; |
| } |
| |
| if (opt.socket_recv_bufsize > 0) { |
| if (unlikely(SocketManager::set_rcvbuf_size(fd, opt.socket_recv_bufsize))) { |
| Debug("udpnet", "set_dnsbuf_size(%d) failed", opt.socket_recv_bufsize); |
| } |
| } |
| if (opt.socket_send_bufsize > 0) { |
| if (unlikely(SocketManager::set_sndbuf_size(fd, opt.socket_send_bufsize))) { |
| Debug("udpnet", "set_dnsbuf_size(%d) failed", opt.socket_send_bufsize); |
| } |
| } |
| |
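| // Ask the kernel to deliver each datagram's destination address as ancillary |
| // data, so udp_read_from_net() can recover it even on an ANY-bound socket. |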
| if (opt.ip_family == AF_INET) { |
| bool succeeded = false; |
| int enable = 1; |
| #ifdef IP_PKTINFO |
| if (safe_setsockopt(fd, IPPROTO_IP, IP_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) { |
| succeeded = true; |
| } |
| #endif |
| #ifdef IP_RECVDSTADDR |
| if (safe_setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) { |
| succeeded = true; |
| } |
| #endif |
| if (!succeeded) { |
| Debug("udpnet", "setsockeopt for pktinfo failed"); |
| goto HardError; |
| } |
| } else if (opt.ip_family == AF_INET6) { |
| bool succeeded = false; |
| int enable = 1; |
| #ifdef IPV6_PKTINFO |
| if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) { |
| succeeded = true; |
| } |
| #endif |
| #ifdef IPV6_RECVPKTINFO |
| if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) { |
| succeeded = true; |
| } |
| #endif |
| if (!succeeded) { |
| Debug("udpnet", "setsockeopt for pktinfo failed"); |
| goto HardError; |
| } |
| } |
| |
| if (local_addr.network_order_port() || !is_any_address) { |
| if (-1 == SocketManager::ink_bind(fd, &local_addr.sa, ats_ip_size(&local_addr.sa))) { |
| char buff[INET6_ADDRPORTSTRLEN]; |
| Debug("udpnet", "ink bind failed on %s", ats_ip_nptop(local_addr, buff, sizeof(buff))); |
| goto SoftError; |
| } |
| |
| if (safe_getsockname(fd, &local_addr.sa, &local_addr_len) < 0) { |
| Debug("udpnet", "CreateUdpsocket: getsockname didn't work"); |
| goto HardError; |
| } |
| } |
| |
| *resfd = fd; |
| *status = nullptr; |
| Debug("udpnet", "creating a udp socket port = %d, %d---success", ats_ip_port_host_order(remote_addr), |
| ats_ip_port_host_order(local_addr)); |
| return true; |
| SoftError: |
| Debug("udpnet", "creating a udp socket port = %d---soft failure", ats_ip_port_host_order(local_addr)); |
| if (fd != -1) { |
| SocketManager::close(fd); |
| } |
| *resfd = -1; |
| *status = nullptr; |
| return false; |
| HardError: |
| Debug("udpnet", "creating a udp socket port = %d---hard failure", ats_ip_port_host_order(local_addr)); |
| if (fd != -1) { |
| SocketManager::close(fd); |
| } |
| *resfd = -1; |
| *status = ACTION_IO_ERROR; |
| return false; |
| } |
| |
| Action * |
| UDPNetProcessor::UDPBind(Continuation *cont, sockaddr const *addr, int fd, int send_bufsize, int recv_bufsize) |
| { |
| int res = 0; |
| UnixUDPConnection *n = nullptr; |
| IpEndpoint myaddr; |
| int myaddr_len = sizeof(myaddr); |
| PollCont *pc = nullptr; |
| PollDescriptor *pd = nullptr; |
| bool need_bind = true; |
| |
| if (fd == -1) { |
| if ((res = SocketManager::socket(addr->sa_family, SOCK_DGRAM, 0)) < 0) { |
| goto Lerror; |
| } |
| fd = res; |
| } else { |
| need_bind = false; |
| } |
| if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) { |
| goto Lerror; |
| } |
| |
| if (addr->sa_family == AF_INET) { |
| bool succeeded = false; |
| int enable = 1; |
| #ifdef IP_PKTINFO |
| if (safe_setsockopt(fd, IPPROTO_IP, IP_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) { |
| succeeded = true; |
| } |
| #endif |
| #ifdef IP_RECVDSTADDR |
| if (safe_setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) { |
| succeeded = true; |
| } |
| #endif |
| if (!succeeded) { |
| Debug("udpnet", "setsockeopt for pktinfo failed"); |
| goto Lerror; |
| } |
| } else if (addr->sa_family == AF_INET6) { |
| bool succeeded = false; |
| int enable = 1; |
| #ifdef IPV6_PKTINFO |
| if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_PKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) { |
| succeeded = true; |
| } |
| #endif |
| #ifdef IPV6_RECVPKTINFO |
| if (safe_setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, reinterpret_cast<char *>(&enable), sizeof(enable)) == 0) { |
| succeeded = true; |
| } |
| #endif |
| if (!succeeded) { |
| Debug("udpnet", "setsockeopt for pktinfo failed"); |
| goto Lerror; |
| } |
| } |
| |
| // If this is a class D address (i.e. multicast address), use REUSEADDR. |
| if (ats_is_ip_multicast(addr)) { |
| int enable_reuseaddr = 1; |
| |
| if (safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&enable_reuseaddr), sizeof(enable_reuseaddr)) < 0) { |
| goto Lerror; |
| } |
| } |
| |
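| // Restrict IPv6 sockets to IPv6 traffic only; with IPV6_V6ONLY set, the socket |
| // does not accept IPv4-mapped addresses. |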
| if (need_bind && ats_is_ip6(addr) && safe_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, SOCKOPT_ON, sizeof(int)) < 0) { |
| goto Lerror; |
| } |
| |
| if (safe_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, SOCKOPT_ON, sizeof(int)) < 0) { |
| Debug("udpnet", "setsockopt for SO_REUSEPORT failed"); |
| goto Lerror; |
| } |
| |
| #ifdef SO_REUSEPORT_LB |
| if (safe_setsockopt(fd, SOL_SOCKET, SO_REUSEPORT_LB, SOCKOPT_ON, sizeof(int)) < 0) { |
| Debug("udpnet", "setsockopt for SO_REUSEPORT_LB failed"); |
| goto Lerror; |
| } |
| #endif |
| |
| if (need_bind && (SocketManager::ink_bind(fd, addr, ats_ip_size(addr)) < 0)) { |
| Debug("udpnet", "ink_bind failed"); |
| goto Lerror; |
| } |
| |
| if (recv_bufsize) { |
| if (unlikely(SocketManager::set_rcvbuf_size(fd, recv_bufsize))) { |
| Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize); |
| } |
| } |
| if (send_bufsize) { |
| if (unlikely(SocketManager::set_sndbuf_size(fd, send_bufsize))) { |
| Debug("udpnet", "set_dnsbuf_size(%d) failed", send_bufsize); |
| } |
| } |
| if (safe_getsockname(fd, &myaddr.sa, &myaddr_len) < 0) { |
| goto Lerror; |
| } |
| n = new UnixUDPConnection(fd); |
| |
| Debug("udpnet", "UDPNetProcessor::UDPBind: %p fd=%d", n, fd); |
| n->setBinding(&myaddr.sa); |
| n->bindToThread(cont, cont->getThreadAffinity()); |
| |
| pc = get_UDPPollCont(n->ethread); |
| pd = pc->pollDescriptor; |
| |
| n->ep.start(pd, n, EVENTIO_READ); |
| |
| cont->handleEvent(NET_EVENT_DATAGRAM_OPEN, n); |
| return ACTION_RESULT_DONE; |
| Lerror: |
| if (fd != NO_FD) { |
| SocketManager::close(fd); |
| } |
| Debug("udpnet", "Error: %s (%d)", strerror(errno), errno); |
| |
| cont->handleEvent(NET_EVENT_DATAGRAM_ERROR, nullptr); |
| return ACTION_IO_ERROR; |
| } |
| |
| // send out all packets that need to be sent out as of time=now |
| #ifdef SOL_UDP |
| UDPQueue::UDPQueue(bool enable_gso) : use_udp_gso(enable_gso) {} |
| #else |
| UDPQueue::UDPQueue(bool enable_gso) |
| { |
| if (enable_gso) { |
| Warning("Attempted to use UDP GSO per configuration, but it is unavailable"); |
| } |
| } |
| #endif |
| |
| UDPQueue::~UDPQueue() {} |
| |
| /* |
| * Driver function that aggregates packets across cont's and sends them |
| */ |
| void |
| UDPQueue::service(UDPNetHandler *nh) |
| { |
| (void)nh; |
| ink_hrtime now = Thread::get_hrtime_updated(); |
| uint64_t timeSpent = 0; |
| uint64_t pktSendStartTime; |
| ink_hrtime pktSendTime; |
| UDPPacketInternal *p = nullptr; |
| |
| SList(UDPPacketInternal, alink) aq(outQueue.popall()); |
| Queue<UDPPacketInternal> stk; |
| while ((p = aq.pop())) { |
| stk.push(p); |
| } |
| |
| // walk backwards down list since this is actually an atomic stack. |
| while ((p = stk.pop())) { |
| ink_assert(p->link.prev == nullptr); |
| ink_assert(p->link.next == nullptr); |
| // insert into our queue. |
| Debug("udp-send", "Adding %p", p); |
| if (p->conn->lastPktStartTime == 0) { |
| pktSendStartTime = std::max(now, p->delivery_time); |
| } else { |
| pktSendTime = p->delivery_time; |
| pktSendStartTime = std::max(std::max(now, pktSendTime), p->delivery_time); |
| } |
| p->conn->lastPktStartTime = pktSendStartTime; |
| p->delivery_time = pktSendStartTime; |
| |
| pipeInfo.addPacket(p, now); |
| } |
| |
| pipeInfo.advanceNow(now); |
| SendPackets(); |
| |
| timeSpent = ink_hrtime_to_msec(now - last_report); |
| if (timeSpent > 10000) { |
| last_report = now; |
| added = 0; |
| packets = 0; |
| } |
| last_service = now; |
| } |
| |
| void |
| UDPQueue::SendPackets() |
| { |
| UDPPacketInternal *p; |
| static ink_hrtime lastCleanupTime = Thread::get_hrtime_updated(); |
| ink_hrtime now = Thread::get_hrtime_updated(); |
| ink_hrtime send_threshold_time = now + SLOT_TIME; |
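| // Packets whose delivery time falls within the next SLOT_TIME are eligible to |
| // be sent in this pass. |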
| int32_t bytesThisSlot = INT_MAX, bytesUsed = 0; |
| int32_t bytesThisPipe; |
| int64_t pktLen; |
| |
| bytesThisSlot = INT_MAX; |
| |
| #ifdef UIO_MAXIOV |
| constexpr int N_MAX_PACKETS = UIO_MAXIOV; // The limit comes from sendmmsg |
| #else |
| constexpr int N_MAX_PACKETS = 1024; |
| #endif |
| UDPPacketInternal *packets[N_MAX_PACKETS]; |
| int nsent; |
| int npackets; |
| |
| sendPackets: |
| nsent = 0; |
| npackets = 0; |
| bytesThisPipe = bytesThisSlot; |
| |
| while ((bytesThisPipe > 0) && (pipeInfo.firstPacket(send_threshold_time))) { |
| p = pipeInfo.getFirstPacket(); |
| pktLen = p->getPktLength(); |
| |
| if (p->conn->shouldDestroy()) { |
| goto next_pkt; |
| } |
| if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum) { |
| goto next_pkt; |
| } |
| |
| bytesUsed += pktLen; |
| bytesThisPipe -= pktLen; |
| packets[npackets++] = p; |
| next_pkt: |
| if (bytesThisPipe < 0 || npackets == N_MAX_PACKETS) { |
| break; |
| } |
| } |
| |
| if (npackets > 0) { |
| nsent = SendMultipleUDPPackets(packets, npackets); |
| } |
| for (int i = 0; i < nsent; ++i) { |
| packets[i]->free(); |
| } |
| |
| bytesThisSlot -= bytesUsed; |
| |
| if ((bytesThisSlot > 0) && nsent) { |
| // redistribute the slack... |
| now = Thread::get_hrtime_updated(); |
| if (pipeInfo.firstPacket(now) == nullptr) { |
| pipeInfo.advanceNow(now); |
| } |
| goto sendPackets; |
| } |
| |
| if ((g_udp_periodicFreeCancelledPkts) && (now - lastCleanupTime > ink_hrtime_from_sec(g_udp_periodicFreeCancelledPkts))) { |
| pipeInfo.FreeCancelledPackets(g_udp_periodicCleanupSlots); |
| lastCleanupTime = now; |
| } |
| } |
| |
| void |
| UDPQueue::SendUDPPacket(UDPPacketInternal *p) |
| { |
| struct msghdr msg; |
| struct iovec iov[32]; |
| int n, count = 0; |
| |
| p->conn->lastSentPktStartTime = p->delivery_time; |
| Debug("udp-send", "Sending %p", p); |
| |
| msg.msg_control = nullptr; |
| msg.msg_controllen = 0; |
| msg.msg_flags = 0; |
| msg.msg_name = reinterpret_cast<caddr_t>(&p->to.sa); |
| msg.msg_namelen = ats_ip_size(p->to); |
| |
| if (p->segment_size > 0) { |
| ink_assert(p->chain->next == nullptr); |
| msg.msg_iov = iov; |
| msg.msg_iovlen = 1; |
| #ifdef SOL_UDP |
| if (use_udp_gso) { |
| iov[0].iov_base = p->chain.get()->start(); |
| iov[0].iov_len = p->chain.get()->size(); |
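| // Attach a UDP_SEGMENT cmsg carrying the segment size so the kernel splits |
| // this single buffer into multiple datagrams (UDP GSO). |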
| |
| union udp_segment_hdr { |
| char buf[CMSG_SPACE(sizeof(uint16_t))]; |
| struct cmsghdr align; |
| } u; |
| msg.msg_control = u.buf; |
| msg.msg_controllen = sizeof(u.buf); |
| |
| struct cmsghdr *cm = CMSG_FIRSTHDR(&msg); |
| cm->cmsg_level = SOL_UDP; |
| cm->cmsg_type = UDP_SEGMENT; |
| cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); |
| *((uint16_t *)CMSG_DATA(cm)) = p->segment_size; |
| |
| count = 0; |
| while (true) { |
| // stupid Linux problem: sendmsg can return EAGAIN |
| n = ::sendmsg(p->conn->getFd(), &msg, 0); |
| if (n >= 0) { |
| break; |
| } |
| if (errno == EIO && use_udp_gso) { |
| Warning("Disabling UDP GSO due to an error"); |
| use_udp_gso = false; |
| SendUDPPacket(p); |
| return; |
| } |
| if (errno == EAGAIN) { |
| ++count; |
| if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) { |
| // tried too many times; give up |
| Debug("udpnet", "Send failed: too many retries"); |
| return; |
| } |
| } else { |
| Debug("udp-send", "Error: %s (%d)", strerror(errno), errno); |
| return; |
| } |
| } |
| } else { |
| #endif |
| // Send segments separately if UDP_SEGMENT is not supported |
| int offset = 0; |
| while (offset < p->chain.get()->size()) { |
| iov[0].iov_base = p->chain.get()->start() + offset; |
| iov[0].iov_len = std::min(static_cast<long>(p->segment_size), p->chain.get()->end() - static_cast<char *>(iov[0].iov_base)); |
| |
| count = 0; |
| while (true) { |
| // stupid Linux problem: sendmsg can return EAGAIN |
| n = ::sendmsg(p->conn->getFd(), &msg, 0); |
| if (n >= 0) { |
| break; |
| } |
| if (errno == EAGAIN) { |
| ++count; |
| if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) { |
| // tried too many times; give up |
| Debug("udpnet", "Send failed: too many retries"); |
| return; |
| } |
| } else { |
| Debug("udp-send", "Error: %s (%d)", strerror(errno), errno); |
| return; |
| } |
| } |
| |
| offset += iov[0].iov_len; |
| } |
| ink_assert(offset == p->chain.get()->size()); |
| #ifdef SOL_UDP |
| } // use_udp_segment |
| #endif |
| } else { |
| // No segmentation: send the whole chain as a single message |
| int iov_len = 0; |
| for (IOBufferBlock *b = p->chain.get(); b != nullptr; b = b->next.get()) { |
| iov[iov_len].iov_base = static_cast<caddr_t>(b->start()); |
| iov[iov_len].iov_len = b->size(); |
| iov_len++; |
| } |
| msg.msg_iov = iov; |
| msg.msg_iovlen = iov_len; |
| |
| count = 0; |
| while (true) { |
| // stupid Linux problem: sendmsg can return EAGAIN |
| n = ::sendmsg(p->conn->getFd(), &msg, 0); |
| if ((n >= 0) || (errno != EAGAIN)) { |
| // send succeeded or some random error happened. |
| if (n < 0) { |
| Debug("udp-send", "Error: %s (%d)", strerror(errno), errno); |
| } |
| |
| break; |
| } |
| if (errno == EAGAIN) { |
| ++count; |
| if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) { |
| // tried too many times; give up |
| Debug("udpnet", "Send failed: too many retries"); |
| break; |
| } |
| } |
| } |
| } |
| } |
| |
| void |
| UDPQueue::send(UDPPacket *p) |
| { |
| // XXX: maybe fastpath for immediate send? |
| outQueue.push((UDPPacketInternal *)p); |
| } |
| |
| int |
| UDPQueue::SendMultipleUDPPackets(UDPPacketInternal **p, uint16_t n) |
| { |
| #ifdef HAVE_SENDMMSG |
| struct mmsghdr *msgvec; |
| int msgvec_size; |
| |
| #ifdef SOL_UDP |
| union udp_segment_hdr { |
| char buf[CMSG_SPACE(sizeof(uint16_t))]; |
| struct cmsghdr align; |
| }; |
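| // With GSO, one mmsghdr per packet suffices; without it, a segmented packet is |
| // sent as multiple messages, so reserve room for up to 64 per packet. |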
| if (use_udp_gso) { |
| msgvec_size = sizeof(struct mmsghdr) * n; |
| } else { |
| msgvec_size = sizeof(struct mmsghdr) * n * 64; |
| } |
| #else |
| msgvec_size = sizeof(struct mmsghdr) * n * 64; |
| #endif |
| msgvec = static_cast<struct mmsghdr *>(alloca(msgvec_size)); |
| memset(msgvec, 0, msgvec_size); |
| |
| int vlen = 0; |
| int fd = p[0]->conn->getFd(); |
| for (int i = 0; i < n; ++i) { |
| UDPPacketInternal *packet; |
| struct msghdr *msg; |
| struct iovec *iov; |
| int iov_len; |
| |
| packet = p[i]; |
| packet->conn->lastSentPktStartTime = packet->delivery_time; |
| ink_assert(packet->conn->getFd() == fd); |
| if (packet->segment_size > 0) { |
| // Presumes one big super buffer is given |
| ink_assert(packet->chain->next == nullptr); |
| #ifdef SOL_UDP |
| if (use_udp_gso) { |
| msg = &msgvec[vlen].msg_hdr; |
| msg->msg_name = reinterpret_cast<caddr_t>(&packet->to.sa); |
| msg->msg_namelen = ats_ip_size(packet->to); |
| |
| union udp_segment_hdr *u; |
| u = static_cast<union udp_segment_hdr *>(alloca(sizeof(union udp_segment_hdr))); |
| msg->msg_control = u->buf; |
| msg->msg_controllen = sizeof(u->buf); |
| iov = static_cast<struct iovec *>(alloca(sizeof(struct iovec))); |
| iov_len = 1; |
| iov->iov_base = packet->chain.get()->start(); |
| iov->iov_len = packet->chain.get()->size(); |
| msg->msg_iov = iov; |
| msg->msg_iovlen = iov_len; |
| |
| struct cmsghdr *cm = CMSG_FIRSTHDR(msg); |
| cm->cmsg_level = SOL_UDP; |
| cm->cmsg_type = UDP_SEGMENT; |
| cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); |
| *((uint16_t *)CMSG_DATA(cm)) = packet->segment_size; |
| vlen++; |
| } else { |
| #endif |
| // UDP_SEGMENT is unavailable |
| // Send the given data as multiple messages |
| int offset = 0; |
| while (offset < packet->chain.get()->size()) { |
| msg = &msgvec[vlen].msg_hdr; |
| msg->msg_name = reinterpret_cast<caddr_t>(&packet->to.sa); |
| msg->msg_namelen = ats_ip_size(packet->to); |
| iov = static_cast<struct iovec *>(alloca(sizeof(struct iovec))); |
| iov_len = 1; |
| iov->iov_base = packet->chain.get()->start() + offset; |
| iov->iov_len = |
| std::min(packet->segment_size, static_cast<uint16_t>(packet->chain.get()->end() - static_cast<char *>(iov->iov_base))); |
| msg->msg_iov = iov; |
| msg->msg_iovlen = iov_len; |
| offset += iov->iov_len; |
| vlen++; |
| } |
| ink_assert(offset == packet->chain.get()->size()); |
| #ifdef SOL_UDP |
| } // use_udp_gso |
| #endif |
| } else { |
| // No segmentation: send the whole chain as a single message |
| msg = &msgvec[vlen].msg_hdr; |
| msg->msg_name = reinterpret_cast<caddr_t>(&packet->to.sa); |
| msg->msg_namelen = ats_ip_size(packet->to); |
| iov = static_cast<struct iovec *>(alloca(sizeof(struct iovec) * 64)); |
| iov_len = 0; |
| for (IOBufferBlock *b = packet->chain.get(); b != nullptr; b = b->next.get()) { |
| iov[iov_len].iov_base = static_cast<caddr_t>(b->start()); |
| iov[iov_len].iov_len = b->size(); |
| iov_len++; |
| } |
| msg->msg_iov = iov; |
| msg->msg_iovlen = iov_len; |
| vlen++; |
| } |
| } |
| |
| if (vlen == 0) { |
| return 0; |
| } |
| |
| int res = ::sendmmsg(fd, msgvec, vlen, 0); |
| if (res < 0) { |
| #ifdef SOL_UDP |
| if (use_udp_gso && errno == EIO) { |
| Warning("Disabling UDP GSO due to an error"); |
| Debug("udp-send", "Disabling UDP GSO due to an error"); |
| use_udp_gso = false; |
| return SendMultipleUDPPackets(p, n); |
| } else { |
| Debug("udp-send", "udp_gso=%d res=%d errno=%d", use_udp_gso, res, errno); |
| return res; |
| } |
| #else |
| Debug("udp-send", "res=%d errno=%d", res, errno); |
| return res; |
| #endif |
| } |
| |
| if (res > 0) { |
| #ifdef SOL_UDP |
| if (use_udp_gso) { |
| Debug("udp-send", "Sent %d messages by processing %d UDPPackets (GSO)", res, n); |
| } else { |
| #endif |
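| // Map the number of messages actually sent back to the number of UDPPackets |
| // consumed; a segmented packet accounts for several messages. |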
| int i = 0; |
| int nmsg = res; |
| for (i = 0; i < n && res > 0; ++i) { |
| if (p[i]->segment_size == 0) { |
| res -= 1; |
| } else { |
| res -= (p[i]->chain.get()->size() / p[i]->segment_size) + ((p[i]->chain.get()->size() % p[i]->segment_size) != 0); |
| } |
| } |
| Debug("udp-send", "Sent %d messages by processing %d UDPPackets", nmsg, i); |
| res = i; |
| #ifdef SOL_UDP |
| } |
| #endif |
| } |
| |
| return res; |
| #else |
| // sendmmsg is unavailable |
| for (int i = 0; i < n; ++i) { |
| SendUDPPacket(p[i]); |
| } |
| return n; |
| #endif |
| } |
| |
| #undef LINK |
| |
| static void |
| net_signal_hook_callback(EThread *thread) |
| { |
| #if HAVE_EVENTFD |
| uint64_t counter; |
| ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t))); |
| #elif TS_USE_PORT |
| /* Nothing to drain or do */ |
| #else |
| char dummy[1024]; |
| ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024)); |
| #endif |
| } |
| |
| UDPNetHandler::UDPNetHandler(bool enable_gso) : udpOutQueue(enable_gso) |
| { |
| nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000); |
| lastCheck = 0; |
| SET_HANDLER(&UDPNetHandler::startNetEvent); |
| } |
| |
| int |
| UDPNetHandler::startNetEvent(int event, Event *e) |
| { |
| (void)event; |
| SET_HANDLER(&UDPNetHandler::mainNetEvent); |
| trigger_event = e; |
| e->schedule_every(-HRTIME_MSECONDS(UDP_NH_PERIOD)); |
| return EVENT_CONT; |
| } |
| |
| int |
| UDPNetHandler::mainNetEvent(int event, Event *e) |
| { |
| ink_assert(trigger_event == e && event == EVENT_POLL); |
| return this->waitForActivity(net_config_poll_timeout); |
| } |
| |
| int |
| UDPNetHandler::waitForActivity(ink_hrtime timeout) |
| { |
| UnixUDPConnection *uc; |
| PollCont *pc = get_UDPPollCont(this->thread); |
| pc->do_poll(timeout); |
| |
| /* Note: there is a race between the traversal of newconn_list and UDPBind(). |
| * |
| * If UDPBind() is called after newconn_list has been traversed, |
| * a UDPConnection that shows up in pollDescriptor->result has not yet been pushed onto the open_list. |
| * |
| * TODO: |
| * |
| * Take UnixNetVConnection::acceptEvent() as a reference to create UnixUDPConnection::newconnEvent(). |
| */ |
| |
| // handle new UDP connection |
| SList(UnixUDPConnection, newconn_alink) ncq(newconn_list.popall()); |
| while ((uc = ncq.pop())) { |
| if (uc->shouldDestroy()) { |
| open_list.remove(uc); // due to the above race |
| uc->Release(); |
| } else { |
| ink_assert(uc->mutex && uc->continuation); |
| open_list.in_or_enqueue(uc); // due to the above race |
| } |
| } |
| |
| // handle UDP outgoing engine |
| udpOutQueue.service(this); |
| |
| // handle UDP read operations |
| int i = 0; |
| EventIO *epd = nullptr; |
| for (i = 0; i < pc->pollDescriptor->result; i++) { |
| epd = static_cast<EventIO *> get_ev_data(pc->pollDescriptor, i); |
| if (epd->type == EVENTIO_UDP_CONNECTION) { |
| // TODO: handle EVENTIO_ERROR |
| if (get_ev_events(pc->pollDescriptor, i) & EVENTIO_READ) { |
| uc = epd->data.uc; |
| ink_assert(uc && uc->mutex && uc->continuation); |
| ink_assert(uc->refcount >= 1); |
| open_list.in_or_enqueue(uc); // due to the above race |
| if (uc->shouldDestroy()) { |
| open_list.remove(uc); |
| uc->Release(); |
| } else { |
| udpNetInternal.udp_read_from_net(this, uc); |
| } |
| } else { |
| Debug("iocore_udp_main", "Unhandled epoll event: 0x%04x", get_ev_events(pc->pollDescriptor, i)); |
| } |
| } else if (epd->type == EVENTIO_DNS_CONNECTION) { |
| // TODO: handle DNS conn if there is ET_UDP |
| if (epd->data.dnscon != nullptr) { |
| epd->data.dnscon->trigger(); |
| #if defined(USE_EDGE_TRIGGER) |
| epd->refresh(EVENTIO_READ); |
| #endif |
| } |
| } else if (epd->type == EVENTIO_ASYNC_SIGNAL) { |
| net_signal_hook_callback(this->thread); |
| } |
| } // end for |
| |
| // remove dead UDP connections |
| ink_hrtime now = Thread::get_hrtime_updated(); |
| if (now >= nextCheck) { |
| forl_LL(UnixUDPConnection, xuc, open_list) |
| { |
| ink_assert(xuc->mutex && xuc->continuation); |
| ink_assert(xuc->refcount >= 1); |
| if (xuc->shouldDestroy()) { |
| open_list.remove(xuc); |
| xuc->Release(); |
| } |
| } |
| nextCheck = Thread::get_hrtime_updated() + HRTIME_MSECONDS(1000); |
| } |
| // service UDPConnections with data ready for callback. |
| Que(UnixUDPConnection, callback_link) q = udp_callbacks; |
| udp_callbacks.clear(); |
| while ((uc = q.dequeue())) { |
| ink_assert(uc->mutex && uc->continuation); |
| if (udpNetInternal.udp_callback(this, uc, this->thread)) { // not successful |
| // schedule on a thread of its own. |
| ink_assert(uc->callback_link.next == nullptr); |
| ink_assert(uc->callback_link.prev == nullptr); |
| udp_callbacks.enqueue(uc); |
| } else { |
| ink_assert(uc->callback_link.next == nullptr); |
| ink_assert(uc->callback_link.prev == nullptr); |
| uc->onCallbackQueue = 0; |
| uc->Release(); |
| } |
| } |
| |
| return EVENT_CONT; |
| } |
| |
| void |
| UDPNetHandler::signalActivity() |
| { |
| #if HAVE_EVENTFD |
| uint64_t counter = 1; |
| ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t))); |
| #elif TS_USE_PORT |
| PollDescriptor *pd = get_PollDescriptor(thread); |
| ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep)); |
| #else |
| char dummy = 1; |
| ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1)); |
| #endif |
| } |