blob: 196dde5af8d55759e2113499a0dc98cb5a246e6f [file] [log] [blame]
/*
*
* Copyright (c) 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <iostream>
#include <boost/bind.hpp>
#include <boost/assert.hpp>
#include "EventChannelConnection.h"
#include "sys/SessionHandlerFactory.h"
#include "QpidError.h"
using namespace std;
using namespace qpid;
using namespace qpid::framing;
namespace qpid {
namespace sys {
const size_t EventChannelConnection::bufferSize = 65536;
EventChannelConnection::EventChannelConnection(
EventChannelThreads::shared_ptr threads_,
SessionHandlerFactory& factory_,
int rfd,
int wfd,
bool isTrace_
) :
readFd(rfd),
writeFd(wfd ? wfd : rfd),
readCallback(boost::bind(&EventChannelConnection::closeOnException,
this, &EventChannelConnection::endInitRead)),
isWriting(false),
isClosed(false),
threads(threads_),
handler(factory_.create(this)),
in(bufferSize),
out(bufferSize),
isTrace(isTrace_)
{
BOOST_ASSERT(readFd > 0);
BOOST_ASSERT(writeFd > 0);
closeOnException(&EventChannelConnection::startRead);
}
void EventChannelConnection::send(std::auto_ptr<AMQFrame> frame) {
{
Monitor::ScopedLock lock(monitor);
assert(frame.get());
writeFrames.push_back(frame.release());
}
closeOnException(&EventChannelConnection::startWrite);
}
void EventChannelConnection::close() {
{
Monitor::ScopedLock lock(monitor);
if (isClosed)
return;
isClosed = true;
}
::close(readFd);
::close(writeFd);
{
Monitor::ScopedLock lock(monitor);
while (busyThreads > 0)
monitor.wait();
}
handler->closed();
}
void EventChannelConnection::closeNoThrow() {
Exception::tryCatchLog<void>(
boost::bind(&EventChannelConnection::close, this),
false,
"Exception closing channel"
);
}
/**
* Call f in a try/catch block and close the connection if
* an exception is thrown.
*/
void EventChannelConnection::closeOnException(MemberFnPtr f)
{
try {
Exception::tryCatchLog<void>(
boost::bind(f, this),
"Closing connection due to exception"
);
return;
} catch (...) {
// Exception was already logged by tryCatchLog
closeNoThrow();
}
}
// Post the write event.
// Always called inside closeOnException.
// Called by endWrite and send, but only one thread writes at a time.
//
void EventChannelConnection::startWrite() {
FrameQueue::auto_type frame;
{
Monitor::ScopedLock lock(monitor);
// Stop if closed or a write event is already in progress.
if (isClosed || isWriting)
return;
if (writeFrames.empty()) {
isWriting = false;
return;
}
isWriting = true;
frame = writeFrames.pop_front();
}
// No need to lock here - only one thread can be writing at a time.
out.clear();
if (isTrace)
cout << "Send on socket " << writeFd << ": " << *frame << endl;
frame->encode(out);
out.flip();
writeEvent = WriteEvent(
writeFd, out.start(), out.available(),
boost::bind(&EventChannelConnection::closeOnException,
this, &EventChannelConnection::endWrite));
threads->post(writeEvent);
}
// ScopedBusy ctor increments busyThreads.
// dtor decrements and calls monitor.notifyAll if it reaches 0.
//
struct EventChannelConnection::ScopedBusy : public AtomicCount::ScopedIncrement
{
ScopedBusy(EventChannelConnection& ecc)
: AtomicCount::ScopedIncrement(
ecc.busyThreads, boost::bind(&Monitor::notifyAll, &ecc.monitor))
{}
};
// Write event completed.
// Always called by a channel thread inside closeOnException.
//
void EventChannelConnection::endWrite() {
ScopedBusy(*this);
{
Monitor::ScopedLock lock(monitor);
isWriting = false;
if (isClosed)
return;
writeEvent.throwIfException();
}
// Check if there's more in to write in the write queue.
startWrite();
}
// Post the read event.
// Always called inside closeOnException.
// Called from ctor and end[Init]Read, so only one call at a time
// is possible since we only post one read event at a time.
//
void EventChannelConnection::startRead() {
// Non blocking read, as much as we can swallow.
readEvent = ReadEvent(
readFd, in.start(), in.available(), readCallback,true);
threads->post(readEvent);
}
// Completion of initial read, expect protocolInit.
// Always called inside closeOnException in channel thread.
// Only called by one thread at a time.
void EventChannelConnection::endInitRead() {
ScopedBusy(*this);
if (!isClosed) {
readEvent.throwIfException();
in.move(readEvent.getBytesRead());
in.flip();
ProtocolInitiation protocolInit;
if(protocolInit.decode(in)){
handler->initiated(&protocolInit);
readCallback = boost::bind(
&EventChannelConnection::closeOnException,
this, &EventChannelConnection::endRead);
}
in.compact();
// Continue reading.
startRead();
}
}
// Normal reads, expect a frame.
// Always called inside closeOnException in channel thread.
void EventChannelConnection::endRead() {
ScopedBusy(*this);
if (!isClosed) {
readEvent.throwIfException();
in.move(readEvent.getBytesRead());
in.flip();
AMQFrame frame;
while (frame.decode(in)) {
// TODO aconway 2006-11-30: received should take Frame&
if (isTrace)
cout << "Received on socket " << readFd
<< ": " << frame << endl;
handler->received(&frame);
}
in.compact();
startRead();
}
}
}} // namespace qpid::sys