// blob: 0070b24ec065c23954ad0d0da5774285826558fd [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/client/TCPConnector.h"
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/ConnectionSettings.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Codec.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/InitiationHandler.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/SecurityLayer.h"
#include "qpid/Msg.h"
#include <iostream>
#include <boost/bind.hpp>
#include <boost/format.hpp>
namespace qpid {
namespace client {
using namespace qpid::sys;
using namespace qpid::framing;
using boost::format;
using boost::str;
/**
 * I/O buffer that owns its storage: a char array of the requested size
 * is allocated on construction and released on destruction.
 */
struct TCPConnector::Buff : public AsynchIO::BufferBase {
    // explicit: a bare size must never convert silently into a buffer.
    explicit Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
    ~Buff() { delete [] bytes; }
};
// Registers this connector with the Connector factory table under the
// "tcp" key when the file-scope object below is constructed.
namespace {

// Factory function handed to Connector::registerFactory().
Connector* create(Poller::shared_ptr poller,
                  framing::ProtocolVersion version,
                  const ConnectionSettings& settings,
                  ConnectionImpl* connection)
{
    return new TCPConnector(poller, version, settings, connection);
}

struct StaticInit {
    StaticInit() {
        Connector::registerFactory("tcp", &create);
    }
} init;

}
/**
 * Constructs a connector in the closed state.
 *
 * @param p        poller later used to drive the connector and the I/O object
 * @param ver      protocol version announced to, and expected from, the peer
 * @param settings supplies the maximum frame size and configures the socket
 * @param cimpl    stored as the bounds object that encode() reduces
 */
TCPConnector::TCPConnector(Poller::shared_ptr p,
                           ProtocolVersion ver,
                           const ConnectionSettings& settings,
                           ConnectionImpl* cimpl)
    : maxFrameSize(settings.maxFrameSize),
      lastEof(0),
      currentSize(0),
      bounds(cimpl),
      version(ver),
      initiated(false),
      closed(true),
      shutdownHandler(0),
      connector(0),
      aio(0),
      poller(p)
{
    // The input handler pointer used to be left uninitialised until
    // setInputHandler() was called. Give it a defined null value; it is
    // assigned in the body so that the initialiser list above does not have
    // to track the position of the member in the class declaration.
    input = 0;
    QPID_LOG(debug, "TCPConnector created for " << version);
    settings.configureSocket(socket);
}
// Destruction closes the connector; close() does nothing if it is
// already closed.
TCPConnector::~TCPConnector()
{
    this->close();
}
/**
 * Starts an asynchronous connection attempt to host:port.
 *
 * The connector must currently be closed (asserted). On success
 * connected() is invoked; on failure connectFailed() is invoked with the
 * third argument of the failure callback, which it takes as its message.
 * The whole operation runs with the connector lock held.
 */
void TCPConnector::connect(const std::string& host, const std::string& port) {
Mutex::ScopedLock l(lock);
assert(closed);
connector = AsynchConnector::create(
socket,
host, port,
boost::bind(&TCPConnector::connected, this, _1),
boost::bind(&TCPConnector::connectFailed, this, _3));
// Mark the connector open before the attempt is started.
closed = false;
connector->start(poller);
}
/**
 * Success callback registered by connect().
 *
 * Clears the pending connector pointer, creates the AsynchIO object bound
 * to this connector's handlers (readbuff, eof, disconnected, socketClosed,
 * none, writebuff), queues the read buffers, writes the protocol
 * initiation and only then starts the I/O object on the poller.
 */
void TCPConnector::connected(const Socket&) {
connector = 0;
aio = AsynchIO::create(socket,
boost::bind(&TCPConnector::readbuff, this, _1, _2),
boost::bind(&TCPConnector::eof, this, _1),
boost::bind(&TCPConnector::disconnected, this, _1),
boost::bind(&TCPConnector::socketClosed, this, _1, _2),
0, // nobuffs
boost::bind(&TCPConnector::writebuff, this, _1));
// start() queues the buffers that initAmqp() draws from, so it runs first.
start(aio);
initAmqp();
aio->start(poller);
}
/**
 * Adopts the given I/O object, queues four frame-sized read buffers on it
 * and derives the connector identifier from the socket's full address.
 */
void TCPConnector::start(sys::AsynchIO* aio_)
{
    aio = aio_;

    int remaining = 4;
    while (remaining > 0) {
        aio->queueReadBuffer(new Buff(maxFrameSize));
        --remaining;
    }

    // Identifier is the full socket address wrapped in square brackets.
    format bracketed("[%1%]");
    bracketed % socket.getFullAddress();
    identifier = str(bracketed);
}
void TCPConnector::initAmqp() {
ProtocolInitiation init(version);
writeDataBlock(init);
}
/**
 * Failure path for a connection attempt: forgets the pending connector,
 * logs the reason, closes the socket, marks the connector closed and
 * notifies the shutdown handler if one is installed.
 */
void TCPConnector::connectFailed(const std::string& msg)
{
    connector = 0;
    QPID_LOG(warning, "Connect failed: " << msg);
    socket.close();

    if (!closed) {
        closed = true;
    }
    if (shutdownHandler) {
        shutdownHandler->shutdown();
    }
}
/**
 * Marks the connector closed and, if an I/O object exists, asks it to
 * close after pending writes. Does nothing when already closed.
 * Runs under the connector lock.
 */
void TCPConnector::close()
{
    Mutex::ScopedLock l(lock);
    if (closed) {
        return;
    }
    closed = true;
    if (aio) {
        aio->queueWriteClose();
    }
}
// Socket-closed callback: schedules the I/O object (if any) for deletion
// and then informs the shutdown handler (if any).
void TCPConnector::socketClosed(AsynchIO&, const Socket&)
{
    if (aio != 0) {
        aio->queueForDeletion();
    }
    if (shutdownHandler != 0) {
        shutdownHandler->shutdown();
    }
}
/**
 * Aborts the connection.
 *
 * A closed connector is left alone. With an I/O object present, an eof()
 * callback is requested from it; otherwise, if a connection attempt is
 * still pending, the attempt is stopped and reported as failed.
 */
void TCPConnector::abort()
{
    // Can't abort a closed connection
    if (closed) {
        return;
    }

    if (aio) {
        // Established connection
        aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
        return;
    }

    if (connector) {
        // We're still connecting
        connector->stop();
        connectFailed("Connection timedout");
    }
}
// Installs the handler that decode() passes each received frame to.
void TCPConnector::setInputHandler(InputHandler* handler)
{
    this->input = handler;
}
// Installs the handler notified when the connection fails or the socket
// is closed.
void TCPConnector::setShutdownHandler(ShutdownHandler* handler)
{
    this->shutdownHandler = handler;
}
// The connector acts as its own output handler.
OutputHandler* TCPConnector::getOutputHandler()
{
    OutputHandler* const self = this;
    return self;
}
// Returns the currently installed shutdown handler; may be null.
sys::ShutdownHandler* TCPConnector::getShutdownHandler() const
{
    return this->shutdownHandler;
}
const std::string& TCPConnector::getIdentifier() const {
return identifier;
}
/**
 * Queues a frame for transmission.
 *
 * The I/O object is asked to write only when the frame ends a frameset
 * or when at least one buffer's worth of encoded data is pending, and
 * never once the connector is closed.
 */
void TCPConnector::send(AMQFrame& frame)
{
    Mutex::ScopedLock l(lock);

    frames.push_back(frame);
    currentSize += frame.encodedSize();

    const bool endsFrameset = frame.getEof();
    if (endsFrameset) {
        // Remember the queue position just past the last complete frameset.
        lastEof = frames.size();
    }
    const bool notifyWrite = endsFrameset || (currentSize >= maxFrameSize);

    /*
      NOTE: Issuing the notification while the mutex is still held
      is a workaround for BZ 570168, in which the test
      testConcurrentSenders causes a hang about 1.5%
      of the time. ( To see the hang much more frequently
      move the notification out of the locked region, and put a
      small usleep just before it.)
      TODO mgoulish - fix the underlying cause and then
      move this call back outside the mutex.
    */
    if (notifyWrite && !closed) {
        aio->notifyPendingWrite();
    }
}
/**
 * Write-ready callback: encodes pending output into one buffer and queues
 * that buffer for writing. Output goes through the security layer when
 * one is active, otherwise through this connector's own codec.
 */
void TCPConnector::writebuff(AsynchIO& /*aio*/)
{
    // It's possible to be disconnected and be writable
    if (closed) {
        return;
    }

    Codec* codec;
    if (securityLayer.get()) {
        codec = (Codec*) securityLayer.get();
    } else {
        codec = (Codec*) this;
    }

    if (!codec->canEncode()) {
        return;
    }

    // Take a queued buffer if one is available, otherwise allocate one.
    std::auto_ptr<AsynchIO::BufferBase> buffer(aio->getQueuedBuffer());
    if (!buffer.get()) {
        buffer.reset(new Buff(maxFrameSize));
    }

    const size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
    buffer->dataStart = 0;
    buffer->dataCount = encoded;
    aio->queueWrite(buffer.release());
}
// Called in IO thread.
// True when at least one complete frameset is queued, or when the queued
// frames amount to a whole buffer's worth of encoded data.
bool TCPConnector::canEncode()
{
    Mutex::ScopedLock l(lock);
    if (lastEof) {
        return true;
    }
    return currentSize >= maxFrameSize;
}
// Called in IO thread.
/**
 * Encodes as many whole queued frames as fit into the supplied buffer.
 *
 * The buffer is written to despite the const-qualified pointer. Once the
 * lock is released, the bounds object (if any) is reduced by the number
 * of bytes produced.
 *
 * @return number of bytes written into the buffer
 */
size_t TCPConnector::encode(const char* buffer, size_t size)
{
    framing::Buffer out(const_cast<char*>(buffer), size);
    size_t bytesWritten = 0;
    {
        Mutex::ScopedLock l(lock);
        for (;;) {
            if (frames.empty()) {
                break;
            }
            // Stop at the first frame that no longer fits.
            if (out.available() < frames.front().encodedSize()) {
                break;
            }
            frames.front().encode(out);
            QPID_LOG(trace, "SENT " << identifier << ": " << frames.front());
            frames.pop_front();
            if (lastEof > 0) {
                --lastEof;
            }
        }
        bytesWritten = size - out.available();
        currentSize -= bytesWritten;
    }
    if (bounds) {
        bounds->reduce(bytesWritten);
    }
    return bytesWritten;
}
/**
 * Read callback: passes the received bytes to the active codec (the
 * security layer if present, otherwise this connector) and returns the
 * buffer to the I/O object.
 *
 * @return always true
 */
bool TCPConnector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff)
{
    Codec* codec;
    if (securityLayer.get()) {
        codec = (Codec*) securityLayer.get();
    } else {
        codec = (Codec*) this;
    }

    int32_t decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);

    // TODO: unreading needs to go away, and when we can cope
    // with multiple sub-buffers in the general buffer scheme, it will
    if (decoded >= buff->dataCount) {
        // Everything was consumed: give the whole buffer back to aio
        aio.queueReadBuffer(buff);
    } else {
        // Skip the consumed bytes and "unread" the remainder
        buff->dataStart += decoded;
        buff->dataCount -= decoded;
        aio.unread(buff);
    }
    return true;
}
/**
 * Decodes received bytes: first the peer's protocol initiation (once per
 * connection), then as many complete frames as the data contains. Each
 * decoded frame is handed to the input handler.
 *
 * @param buffer start of the received data
 * @param size   number of bytes available at buffer
 * @return number of bytes consumed; the caller re-presents the rest later
 * @throws Exception if the peer's protocol initiation does not match the
 *         version this connector was created with
 */
size_t TCPConnector::decode(const char* buffer, size_t size)
{
    framing::Buffer in(const_cast<char*>(buffer), size);
    if (!initiated) {
        framing::ProtocolInitiation protocolInit;
        if (!protocolInit.decode(in)) {
            // The protocol header has not fully arrived yet. Stay in the
            // un-initiated state and retry on the next call; marking the
            // connection initiated here would let header bytes be parsed
            // as frames.
            return size - in.available();
        }
        QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
        if(!(protocolInit==version)){
            throw Exception(QPID_MSG("Unsupported version: " << protocolInit
                                     << " supported version " << version));
        }
        initiated = true;
    }
    AMQFrame frame;
    while(frame.decode(in)){
        QPID_LOG(trace, "RECV " << identifier << ": " << frame);
        input->received(frame);
    }
    return size - in.available();
}
void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.encodedSize();
aio->queueWrite(buff);
}
// End-of-stream callback: closes the connector.
void TCPConnector::eof(AsynchIO&)
{
    this->close();
}
// Disconnect callback: closes the connector, then runs the socket-closed
// handling directly.
void TCPConnector::disconnected(AsynchIO&)
{
    this->close();
    this->socketClosed(*aio, socket);
}
/**
 * Takes ownership of the given security layer and initialises it with
 * this connector. From then on readbuff()/writebuff() route data through
 * the layer instead of using the connector's own codec directly.
 */
void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
{
    securityLayer = sl;
    securityLayer->init(this);
}
}} // namespace qpid::client