blob: 4722fc0b8b0631fa3029f2a6b4eedab3e7a3de61 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "DispatchHandle.h"
#include <boost/cast.hpp>
#include <assert.h>
namespace qpid {
namespace sys {
// Destroying a handle first stops any watch that is still in place
// (stopWatch() returns immediately if the handle is already stopped).
DispatchHandle::~DispatchHandle() {
    this->stopWatch();
}
// Start watching this handle using the given poller.
//
// Must be called in the IDLE state (asserted). The handle is added to the
// poller with interest in readable and/or writable events according to
// which of readableCallback/writableCallback are set, and the state becomes
// the matching ACTIVE_* state.
//
// If neither callback is set the handle is still added to the poller, but
// with no interest (Poller::NONE), and the state becomes INACTIVE. The poller
// must be recorded in this case too: rewatch(), unwatch() and stopWatch()
// all dereference poller when they find the handle INACTIVE.
// TODO: Maybe having no callbacks at all should be an assert instead
void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
    bool r = readableCallback;
    bool w = writableCallback;

    ScopedLock<Mutex> lock(stateLock);
    assert(state == IDLE);

    Poller::Direction d = r ?
        (w ? Poller::INOUT : Poller::INPUT) :
        (w ? Poller::OUTPUT : Poller::NONE);

    poller = poller0;
    poller->addFd(*this, d);

    state = r ?
        (w ? ACTIVE_RW : ACTIVE_R) :
        (w ? ACTIVE_W : INACTIVE);
}
// Re-enable watching for every event that has a callback set.
//
// Does nothing if the handle is stopped (IDLE/DELAYED_IDLE), is pending
// deletion (DELAYED_DELETE), or has neither a readable nor a writable
// callback. In the DELAYED_* states only the state is updated; in the
// INACTIVE/ACTIVE_* states the poller is updated straight away.
void DispatchHandle::rewatch() {
    bool r = readableCallback;
    bool w = writableCallback;

    // With no callbacks there is nothing to watch for. Without this guard the
    // selections below would fall through to OUTPUT/ACTIVE_W/DELAYED_W and
    // arm write events for a handle that has no writableCallback to run.
    // This mirrors the early returns in rewatchRead()/rewatchWrite().
    if (!r && !w) {
        return;
    }

    ScopedLock<Mutex> lock(stateLock);
    switch(state) {
    case IDLE:
    case DELAYED_IDLE:
        break;
    case DELAYED_R:
    case DELAYED_W:
    case DELAYED_INACTIVE:
        // Only record what is wanted; the poller is not touched here
        state = r ?
            (w ? DELAYED_RW : DELAYED_R) :
            DELAYED_W;
        break;
    case DELAYED_DELETE:
        break;
    case INACTIVE:
    case ACTIVE_R:
    case ACTIVE_W: {
        assert(poller);
        Poller::Direction d = r ?
            (w ? Poller::INOUT : Poller::INPUT) :
            Poller::OUTPUT;
        poller->modFd(*this, d);
        state = r ?
            (w ? ACTIVE_RW : ACTIVE_R) :
            ACTIVE_W;
        break;
        }
    case DELAYED_RW:
    case ACTIVE_RW:
        // Don't need to do anything already waiting for readable/writable
        break;
    }
}
// Re-enable watching for readable events (no-op without a readable callback).
void DispatchHandle::rewatchRead() {
    if (!readableCallback) {
        return;
    }

    ScopedLock<Mutex> lock(stateLock);
    switch(state) {
    // Not yet waiting for readable: update the poller straight away
    case INACTIVE:
        assert(poller);
        poller->modFd(*this, Poller::INPUT);
        state = ACTIVE_R;
        break;
    case ACTIVE_W:
        assert(poller);
        poller->modFd(*this, Poller::INOUT);
        state = ACTIVE_RW;
        break;

    // DELAYED_* states: only note that readable is wanted
    case DELAYED_INACTIVE:
        state = DELAYED_R;
        break;
    case DELAYED_W:
        state = DELAYED_RW;
        break;

    // Stopped, pending delete, or readable already requested: nothing to do
    case IDLE:
    case DELAYED_IDLE:
    case DELAYED_DELETE:
    case DELAYED_R:
    case DELAYED_RW:
    case ACTIVE_R:
    case ACTIVE_RW:
        break;
    }
}
// Re-enable watching for writable events (no-op without a writable callback).
void DispatchHandle::rewatchWrite() {
    if (!writableCallback) {
        return;
    }

    ScopedLock<Mutex> lock(stateLock);
    switch(state) {
    // Not yet waiting for writable: update the poller straight away
    case INACTIVE:
        assert(poller);
        poller->modFd(*this, Poller::OUTPUT);
        state = ACTIVE_W;
        break;
    case ACTIVE_R:
        assert(poller);
        poller->modFd(*this, Poller::INOUT);
        state = ACTIVE_RW;
        break;

    // DELAYED_* states: only note that writable is wanted
    case DELAYED_INACTIVE:
        state = DELAYED_W;
        break;
    case DELAYED_R:
        state = DELAYED_RW;
        break;

    // Stopped, pending delete, or writable already requested: nothing to do
    case IDLE:
    case DELAYED_IDLE:
    case DELAYED_DELETE:
    case DELAYED_W:
    case DELAYED_RW:
    case ACTIVE_W:
    case ACTIVE_RW:
        break;
    }
}
// Stop watching for readable events (no-op without a readable callback).
// Any interest in writable events is left as it was.
void DispatchHandle::unwatchRead() {
    if (!readableCallback) {
        return;
    }

    ScopedLock<Mutex> lock(stateLock);
    switch(state) {
    // Currently waiting for readable: update the poller straight away
    case ACTIVE_R:
        assert(poller);
        poller->modFd(*this, Poller::NONE);
        state = INACTIVE;
        break;
    case ACTIVE_RW:
        assert(poller);
        poller->modFd(*this, Poller::OUTPUT);
        state = ACTIVE_W;
        break;

    // DELAYED_* states: only drop the readable part of the recorded state
    case DELAYED_R:
        state = DELAYED_INACTIVE;
        break;
    case DELAYED_RW:
        state = DELAYED_W;
        break;

    // Stopped, pending delete, or not waiting for readable: nothing to do
    case IDLE:
    case DELAYED_IDLE:
    case DELAYED_DELETE:
    case DELAYED_W:
    case DELAYED_INACTIVE:
    case ACTIVE_W:
    case INACTIVE:
        break;
    }
}
// Stop watching for writable events (no-op without a writable callback).
// Any interest in readable events is left as it was.
void DispatchHandle::unwatchWrite() {
    if (!writableCallback) {
        return;
    }

    ScopedLock<Mutex> lock(stateLock);
    switch(state) {
    // Currently waiting for writable: update the poller straight away
    case ACTIVE_W:
        assert(poller);
        poller->modFd(*this, Poller::NONE);
        state = INACTIVE;
        break;
    case ACTIVE_RW:
        assert(poller);
        poller->modFd(*this, Poller::INPUT);
        state = ACTIVE_R;
        break;

    // DELAYED_* states: only drop the writable part of the recorded state
    case DELAYED_W:
        state = DELAYED_INACTIVE;
        break;
    case DELAYED_RW:
        state = DELAYED_R;
        break;

    // Stopped, pending delete, or not waiting for writable: nothing to do
    case IDLE:
    case DELAYED_IDLE:
    case DELAYED_DELETE:
    case DELAYED_R:
    case DELAYED_INACTIVE:
    case ACTIVE_R:
    case INACTIVE:
        break;
    }
}
// Stop watching for both readable and writable events, while leaving the
// handle with its poller (contrast stopWatch(), which removes it).
void DispatchHandle::unwatch() {
    ScopedLock<Mutex> lock(stateLock);
    switch (state) {
    // Stopped or pending delete: nothing to do
    case IDLE:
    case DELAYED_IDLE:
    case DELAYED_DELETE:
        break;

    // DELAYED_* states: only record that nothing is wanted
    case DELAYED_INACTIVE:
    case DELAYED_R:
    case DELAYED_W:
    case DELAYED_RW:
        state = DELAYED_INACTIVE;
        break;

    // Every remaining state: clear the poller's interest straight away
    default:
        assert(poller);
        poller->modFd(*this, Poller::NONE);
        state = INACTIVE;
        break;
    }
}
// Remove the handle from its poller and drop the reference to the poller.
// Calling this on a handle that is already stopped, or is pending deletion,
// does nothing.
void DispatchHandle::stopWatch() {
    ScopedLock<Mutex> lock(stateLock);

    if (state == IDLE || state == DELAYED_IDLE || state == DELAYED_DELETE) {
        return;
    }

    // From a DELAYED_* state go to DELAYED_IDLE; from any other state
    // go straight to IDLE
    const bool delayed =
        state == DELAYED_R ||
        state == DELAYED_W ||
        state == DELAYED_RW ||
        state == DELAYED_INACTIVE;
    state = delayed ? DELAYED_IDLE : IDLE;

    assert(poller);
    poller->delFd(*this);
    poller.reset();
}
// Stop watching and delete this handle, deferring the delete if the handle
// is left in a DELAYED_* state.
//
// The state check is done inside its own scope so that the lock
// is released before we do the delete
void DispatchHandle::doDelete() {
    // Ensure that we're no longer watching anything
    stopWatch();

    {
        ScopedLock<Mutex> lock(stateLock);

        // If we're in the middle of a callback defer the delete:
        // processEvent() performs it when it finds DELAYED_DELETE
        if (state == DELAYED_IDLE || state == DELAYED_DELETE) {
            state = DELAYED_DELETE;
            return;
        }

        // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
        if (state != IDLE) {
            assert(false);
        }
    }

    // If we're not then do it right away
    delete this;
}
// Handle an event of the given type for this handle.
//
// Works in three phases:
//  1. Under the lock, move from the current ACTIVE_* state to the matching
//     DELAYED_* state (returning at once if already in a DELAYED_* state).
//  2. With the lock released, run the callback(s) selected by the event type.
//  3. Under the lock again, act on whatever state the callbacks left behind:
//     update the poller for DELAYED_R/W/RW, settle into INACTIVE or IDLE,
//     or - for DELAYED_DELETE - delete this object once the lock is released.
void DispatchHandle::processEvent(Poller::EventType type) {
// Note that we are now doing the callbacks
{
ScopedLock<Mutex> lock(stateLock);
// Set up to wait for same events next time unless reset
switch(state) {
case ACTIVE_R:
state = DELAYED_R;
break;
case ACTIVE_W:
state = DELAYED_W;
break;
case ACTIVE_RW:
state = DELAYED_RW;
break;
// Can only get here in a DELAYED_* state in the rare case
// that we're already here for reading and we get activated for
// writing and we can write (it might be possible the other way
// round too). In this case we're already processing the handle
// in a different thread in this function so return right away
case DELAYED_R:
case DELAYED_W:
case DELAYED_RW:
case DELAYED_INACTIVE:
case DELAYED_IDLE:
case DELAYED_DELETE:
return;
// Any other state is treated as a programming error. Note there is no
// return here: if the assert is compiled out execution carries on to the callbacks.
default:
assert(false);
}
}
// Do callbacks - whilst we are doing the callbacks we are prevented from processing
// the same handle until we re-enable it. To avoid re-entering the callbacks for a single
// handle re-enabling in the callbacks is actually deferred until they are complete.
// Note that readableCallback/writableCallback are invoked without first checking
// that they are set; only disconnectedCallback is checked.
switch (type) {
case Poller::READABLE:
readableCallback(*this);
break;
case Poller::WRITABLE:
writableCallback(*this);
break;
case Poller::READ_WRITABLE:
readableCallback(*this);
writableCallback(*this);
break;
case Poller::DISCONNECTED:
{
// Overwrite the state chosen above so that, unless the callback changes
// it again, nothing is re-enabled afterwards
ScopedLock<Mutex> lock(stateLock);
state = DELAYED_INACTIVE;
}
if (disconnectedCallback) {
disconnectedCallback(*this);
}
break;
default:
assert(false);
}
// If any of the callbacks re-enabled reading/writing then actually
// do it now
{
ScopedLock<Mutex> lock(stateLock);
switch (state) {
case DELAYED_R:
poller->modFd(*this, Poller::INPUT);
state = ACTIVE_R;
return;
case DELAYED_W:
poller->modFd(*this, Poller::OUTPUT);
state = ACTIVE_W;
return;
case DELAYED_RW:
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
return;
case DELAYED_INACTIVE:
state = INACTIVE;
return;
case DELAYED_IDLE:
state = IDLE;
return;
default:
// This should be impossible
assert(false);
return;
// Only DELAYED_DELETE leaves the switch without returning, so that the
// lock's scope ends before the delete below
case DELAYED_DELETE:
break;
}
}
// Reached only for DELAYED_DELETE: perform the deferred delete
delete this;
}
}}