blob: 16c7ec9c3f6d1f1930f4df90c44f14ca227d3dc7 [file] [log] [blame]
/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <mqueue.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <sys/errno.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <typeinfo>
#include <iostream>
#include <queue>
#include <boost/ptr_container/ptr_map.hpp>
#include <boost/current_function.hpp>
#include <QpidError.h>
#include <sys/Monitor.h>
#include "check.h"
#include "EventChannel.h"
using namespace std;
// Helper wrapper: derives from a C struct type and fills the whole
// object with zero bytes on construction, so callers get a cleared
// struct without writing the memset themselves.
template <class Pod>
struct ZeroStruct : public Pod {
    ZeroStruct() {
        void* const self = this;
        memset(self, 0, sizeof(*this));
    }
};
namespace qpid {
namespace sys {
/**
 * EventHandler holds the epoll file descriptor together with a POSIX
 * message queue descriptor and a queue of pending Event pointers.
 * It is the private interface between EventChannel and the Event
 * subclasses.
 *
 * It is itself an Event: events that have no file descriptor of their
 * own are passed through the message queue and surfaced via
 * complete().
 */
class EventHandler : public Event, private Monitor
{
  public:
    EventHandler(int epollSize = 256);
    ~EventHandler();

    /** Descriptor returned by epoll_create. */
    int getEpollFd() { return epollFd; }

    /** Register fd with epoll; event is stored as the epoll user data. */
    void epollAdd(int fd, uint32_t epollEvents, Event* event);

    /** Change the epoll registration of fd. */
    void epollMod(int fd, uint32_t epollEvents, Event* event);

    /** Remove fd from epoll. */
    void epollDel(int fd);

    /** Append an event to the internal queue. */
    void mqPut(Event* event);

    /** Pop the oldest queued event, or return 0 when there is none. */
    Event* mqGet();

  protected:
    // prepare() must never be invoked on the handler itself; only
    // complete() is meaningful here.
    void prepare(EventHandler&) { assert(0); }
    Event* complete(EventHandler& eh);

  private:
    int epollFd;                  // epoll instance
    std::string mqName;           // name passed to mq_open / mq_unlink
    int mqFd;                     // message queue descriptor
    std::queue<Event*> mqEvents;  // events waiting to be completed
};
/**
 * Create the epoll instance and the POSIX message queue used to wake
 * epoll for events that have no file descriptor.
 *
 * @param epollSize size hint passed to epoll_create.
 * @throws a POSIX error if epoll_create, mq_open, mq_send or the epoll
 *         registration fails; an internal error if no temporary name
 *         can be generated. Descriptors opened so far are released
 *         before the exception propagates.
 */
EventHandler::EventHandler(int epollSize)
{
    epollFd = epoll_create(epollSize);
    if (epollFd < 0) throw QPID_POSIX_ERROR(errno);
    mqFd = -1;
    try {
        // Create a POSIX message queue for non-fd events.
        // We write one byte and never read it, so the queue is always
        // ready for read whenever it is armed in epoll.
        ZeroStruct<struct mq_attr> attr;
        attr.mq_maxmsg = 1;
        attr.mq_msgsize = 1;
        do {
            char tmpnam[L_tmpnam];
            if (tmpnam_r(tmpnam) == 0)
                throw QPID_ERROR(INTERNAL_ERROR,
                                 "Cannot generate message queue name.");
            // Skip the leading "/tmp" but keep the following '/':
            // the name handed to mq_open starts with a slash.
            mqName = tmpnam + 4;
            mqFd = mq_open(
                mqName.c_str(), O_CREAT|O_EXCL|O_RDWR|O_NONBLOCK, S_IRWXU, &attr);
        } while (mqFd < 0 && errno == EEXIST); // Name already taken, try again.
        if (mqFd < 0) throw QPID_POSIX_ERROR(errno);

        static char zero = '\0';
        if (mq_send(mqFd, &zero, 1, 0) < 0) throw QPID_POSIX_ERROR(errno);
        // Registered with no events; mqPut()/mqGet() arm it on demand.
        epollAdd(mqFd, 0, this);
    }
    catch (...) {
        // The destructor does not run for a failed constructor, so
        // release whatever was acquired here.
        if (mqFd >= 0) {
            mq_close(mqFd);
            mq_unlink(mqName.c_str());
        }
        ::close(epollFd);
        throw;
    }
}
/**
 * Release the message queue (close and unlink its name) and close the
 * epoll descriptor created by the constructor.
 */
EventHandler::~EventHandler() {
    mq_close(mqFd);
    mq_unlink(mqName.c_str());
    // The epoll descriptor is an ordinary fd and must be closed too.
    ::close(epollFd);
}
/**
 * Append a non-null event to the pending queue and arm the message
 * queue descriptor in epoll (EPOLLIN, one-shot). Runs under the
 * handler's lock.
 */
void EventHandler::mqPut(Event* event)
{
    ScopedLock lock(*this);
    assert(event != 0);
    mqEvents.push(event);
    const uint32_t wakeFlags = EPOLLIN|EPOLLONESHOT;
    epollMod(mqFd, wakeFlags, this);
}
/**
 * Remove and return the oldest pending event, or 0 if the queue is
 * empty. If more events remain afterwards, the message queue
 * descriptor is re-armed in epoll. Runs under the handler's lock.
 */
Event* EventHandler::mqGet()
{
    ScopedLock lock(*this);
    Event* next = 0;
    if (!mqEvents.empty()) {
        next = mqEvents.front();
        mqEvents.pop();
        const bool morePending = !mqEvents.empty();
        if (morePending)
            epollMod(mqFd, EPOLLIN|EPOLLONESHOT, this);
    }
    return next;
}
void EventHandler::epollAdd(int fd, uint32_t epollEvents, Event* event)
{
ZeroStruct<struct epoll_event> ee;
ee.data.ptr = event;
ee.events = epollEvents;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ee) < 0)
throw QPID_POSIX_ERROR(errno);
}
void EventHandler::epollMod(int fd, uint32_t epollEvents, Event* event)
{
ZeroStruct<struct epoll_event> ee;
ee.data.ptr = event;
ee.events = epollEvents;
if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ee) < 0)
throw QPID_POSIX_ERROR(errno);
}
void EventHandler::epollDel(int fd) {
if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, 0) < 0)
throw QPID_POSIX_ERROR(errno);
}
/**
 * Completion hook for the message queue descriptor: take the next
 * queued event and forward to its own complete().
 *
 * @param eh must be this handler.
 * @return the result of the queued event's complete(), or 0 when no
 *         event was queued.
 */
Event* EventHandler::complete(EventHandler& eh)
{
    assert(&eh == this);
    Event* const queued = mqGet();
    if (queued == 0)
        return 0;
    return queued->complete(eh);
}
// ================================================================
// EventChannel
/** Factory: allocate a new EventChannel and hand it back in a shared_ptr. */
EventChannel::shared_ptr EventChannel::create()
{
    shared_ptr channel(new EventChannel());
    return channel;
}
/** Construct the channel with a freshly allocated EventHandler. */
EventChannel::EventChannel()
    : handler(new EventHandler())
{
}
/** Nothing to do explicitly in the destructor body. */
EventChannel::~EventChannel()
{
}
/**
 * Post an event to the channel by asking the event to prepare itself
 * against this channel's handler.
 */
void EventChannel::postEvent(Event& e)
{
    EventHandler& target = *handler;
    e.prepare(target);
}
/**
 * Block until an event has completed and return it.
 *
 * Waits on the handler's epoll descriptor with no timeout, one event
 * at a time. An event whose complete() returns 0 (for example a
 * partial read that re-armed itself) is not returned; the loop waits
 * again. If complete() throws, the exception is stored on the event
 * via setError() and the event itself is returned.
 *
 * @return the completed event; never 0.
 */
Event* EventChannel::getEvent()
{
    static const int infiniteTimeout = -1;
    ZeroStruct<struct epoll_event> epollEvent;

    Event* event = 0;
    while (event == 0) {
        int eventCount = epoll_wait(handler->getEpollFd(),
                                    &epollEvent, 1, infiniteTimeout);
        if (eventCount < 0) {
            // Capture errno immediately: the stream output below may
            // modify it before it is formatted.
            const int waitErrno = errno;
            if (waitErrno != EINTR) {
                // TODO aconway 2006-11-28: Proper handling/logging of errors.
                cerr << BOOST_CURRENT_FUNCTION << " ignoring error "
                     << PosixError::getMessage(waitErrno) << endl;
                assert(0);
            }
        }
        else if (eventCount == 1) {
            // The user data was stored as an Event* by epollAdd/epollMod.
            Event* const ready = static_cast<Event*>(epollEvent.data.ptr);
            assert(ready != 0);
            try {
                event = ready->complete(*handler);
            }
            catch (const Exception& e) {
                // Report the failure through the event itself.
                ready->setError(e);
                event = ready;
            }
            catch (const std::exception& e) {
                ready->setError(e);
                event = ready;
            }
        }
    }
    return event;
}
/** Out-of-line definition of the Event destructor; the body is empty. */
Event::~Event()
{
}
/**
 * Default preparation: an event with no descriptor of its own is
 * placed on the handler's internal queue.
 */
void Event::prepare(EventHandler& handler)
{
    Event* const self = this;
    handler.mqPut(self);
}
bool Event::hasError() const {
return error;
}
/** If an error has been recorded on this event, rethrow it; otherwise do nothing. */
void Event::throwIfError() throw (Exception)
{
    if (!hasError())
        return;
    error.throwSelf();
}
/** Default completion: nothing further to do, so the event itself is returned. */
Event* Event::complete(EventHandler& /*unused*/)
{
    Event* const self = this;
    return self;
}
/**
 * Invoke the callback, if one is set.
 *
 * Exceptions derived from std::exception propagate unchanged; anything
 * else is replaced by an INTERNAL_ERROR exception.
 */
void Event::dispatch()
{
    try {
        if (callback.empty())
            return;
        callback();
    }
    catch (const std::exception&) {
        throw;
    }
    catch (...) {
        throw QPID_ERROR(INTERNAL_ERROR, "Unknown exception.");
    }
}
/** Record e as this event's error, replacing any previous value. */
void Event::setError(const ExceptionHolder& e)
{
    error = e;
}
/** Register the descriptor with epoll for a one-shot readable notification. */
void ReadEvent::prepare(EventHandler& handler)
{
    const uint32_t readFlags = EPOLLIN | EPOLLONESHOT;
    handler.epollAdd(descriptor, readFlags, this);
}
/**
 * Perform a single read into the unfilled tail of the buffer.
 *
 * @return the value returned by read(); when positive it has also
 *         been added to the received count.
 */
ssize_t ReadEvent::doRead()
{
    char* const tail = static_cast<char*>(buffer) + received;
    const ssize_t count = ::read(descriptor, tail, size - received);
    if (count > 0)
        received += count;
    return count;
}
/**
 * Read as much as possible without blocking and decide what happens
 * to the event next.
 *
 * @param handler used to re-arm or remove the descriptor in epoll.
 * @return this when the buffer has been filled; 0 when the read would
 *         block and the descriptor was re-armed for more data.
 * @throws a POSIX error on read failure, or ENODATA if end-of-file
 *         was reached before the buffer was filled. The descriptor is
 *         removed from epoll and the received count reset first.
 */
Event* ReadEvent::complete(EventHandler& handler)
{
    // Keep reading while data arrives and the buffer is not full. The
    // result of every read must be tracked, otherwise EOF or EAGAIN
    // after a successful read would never end the loop.
    ssize_t n = doRead();
    while (n > 0 && received < size)
        n = doRead();
    // Capture errno now: the epoll calls below may overwrite it.
    const int readErrno = (n < 0) ? errno : 0;

    if (received == size) {
        handler.epollDel(descriptor);
        received = 0;               // Reset for re-use.
        return this;
    }
    else if (n < 0 && readErrno == EAGAIN) {
        // Keep polling for more.
        handler.epollMod(descriptor, EPOLLIN | EPOLLONESHOT, this);
        return 0;
    }
    else {
        // Unexpected EOF or error. Throw ENODATA for EOF.
        handler.epollDel(descriptor);
        received = 0;               // Reset for re-use.
        throw QPID_POSIX_ERROR((n < 0) ? readErrno : ENODATA);
    }
}
/** Register the descriptor with epoll for a one-shot writable notification. */
void WriteEvent::prepare(EventHandler& handler)
{
    const uint32_t writeFlags = EPOLLOUT | EPOLLONESHOT;
    handler.epollAdd(descriptor, writeFlags, this);
}
/**
 * Write the unsent remainder of the buffer and decide what happens to
 * the event next.
 *
 * @param handler used to re-arm or remove the descriptor in epoll.
 * @return this when the whole buffer has been written; 0 when more
 *         remains (or the write would block) and the descriptor was
 *         re-armed.
 * @throws a POSIX error on write failure. As in ReadEvent::complete,
 *         the descriptor is removed from epoll and the progress
 *         counter reset first, so the event can be posted again.
 */
Event* WriteEvent::complete(EventHandler& handler)
{
    ssize_t n = ::write(descriptor, static_cast<const char*>(buffer) + written,
                        size - written);
    if (n < 0) {
        // Capture errno before any epoll call can overwrite it.
        const int writeErrno = errno;
        if (writeErrno == EAGAIN) {
            // Not actually writable yet: keep polling.
            handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
            return 0;
        }
        // Leave no stale epoll registration or partial count behind.
        handler.epollDel(descriptor);
        written = 0;                // Reset for re-use.
        throw QPID_POSIX_ERROR(writeErrno);
    }
    written += n;
    if (written < size) {
        // Keep polling.
        handler.epollMod(descriptor, EPOLLOUT | EPOLLONESHOT, this);
        return 0;
    }
    written = 0;                    // Reset for re-use.
    handler.epollDel(descriptor);
    return this;
}
/** Register the listening descriptor with epoll for a one-shot readable notification. */
void AcceptEvent::prepare(EventHandler& handler)
{
    const uint32_t acceptFlags = EPOLLIN | EPOLLONESHOT;
    handler.epollAdd(descriptor, acceptFlags, this);
}
/**
 * Remove the descriptor from epoll, then accept one connection on it.
 *
 * @return this, with the new descriptor stored in accepted.
 * @throws a POSIX error built from errno if accept() fails.
 */
Event* AcceptEvent::complete(EventHandler& handler)
{
    handler.epollDel(descriptor);
    accepted = ::accept(descriptor, 0, 0);
    if (accepted >= 0)
        return this;
    throw QPID_POSIX_ERROR(errno);
}
}}