blob: cf08b482e6b52fad692b311ca0eee51031018376 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/sys/AsynchIOHandler.h"
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/SecuritySettings.h"
#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
namespace qpid {
namespace sys {
struct ProtocolTimeoutTask : public sys::TimerTask {
AsynchIOHandler& handler;
std::string id;
ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) :
TimerTask(timeout, "ProtocolTimeout"),
handler(h),
id(i)
{}
void fire() {
// If this fires it means that we didn't negotiate the connection in the timeout period
// Schedule closing the connection for the io thread
QPID_LOG(error, "Connection " << id << " No protocol received closing");
handler.abort();
}
};
AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool isClient0, bool nodict0) :
identifier(id),
aio(0),
factory(f),
codec(0),
reads(0),
readError(false),
isClient(isClient0),
nodict(nodict0)
{}
AsynchIOHandler::~AsynchIOHandler() {
if (codec)
codec->closed();
if (timeoutTimerTask)
timeoutTimerTask->cancel();
delete codec;
}
void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime) {
aio = a;
// Start timer for this connection
timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
timer.add(timeoutTimerTask);
// Give connection some buffers to use
aio->createBuffers();
}
void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
{
QPID_LOG(debug, "SENT [" << identifier << "]: INIT(" << data << ")");
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
assert(buff);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.encodedSize();
aio->queueWrite(buff);
}
void AsynchIOHandler::abort() {
// Don't disconnect if we're already disconnecting
if (!readError) {
aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1));
}
aio->queueWriteClose();
}
void AsynchIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
namespace {
SecuritySettings getSecuritySettings(AsynchIO* aio, bool nodict)
{
SecuritySettings settings = aio->getSecuritySettings();
settings.nodict = nodict;
return settings;
}
}
void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if (readError) {
return;
}
++reads;
size_t decoded = 0;
if (codec) { // Already initiated
try {
decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
// When we've decoded 3 reads (probably frames) we will have authenticated and
// started heartbeats, if specified, in many (but not all) cases so now we will cancel
// the idle connection timeout - this is really hacky, and would be better implemented
// in the connection, but that isn't actually created until the first decode.
if (reads == 3) {
timeoutTimerTask->cancel();
}
}catch(const std::exception& e){
QPID_LOG(error, e.what());
readError = true;
aio->queueWriteClose();
}
}else{
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
decoded = in.getPosition();
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
try {
codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio, nodict));
if (!codec) {
//TODO: may still want to revise this...
//send valid version header & close connection.
write(framing::ProtocolInitiation(framing::highestProtocolVersion));
readError = true;
aio->queueWriteClose();
} else {
//read any further data that may already have been sent
decoded += codec->decode(buff->bytes+buff->dataStart+in.getPosition(), buff->dataCount-in.getPosition());
}
} catch (const std::exception& e) {
QPID_LOG(error, e.what());
readError = true;
aio->queueWriteClose();
}
}
}
// TODO: unreading needs to go away, and when we can cope
// with multiple sub-buffers in the general buffer scheme, it will
if (decoded != size_t(buff->dataCount)) {
// Adjust buffer for used bytes and then "unread them"
buff->dataStart += decoded;
buff->dataCount -= decoded;
aio->unread(buff);
} else {
// Give whole buffer back to aio subsystem
aio->queueReadBuffer(buff);
}
}
void AsynchIOHandler::eof(AsynchIO& a) {
disconnect(a);
readError = true;
aio->queueWriteClose();
}
void AsynchIOHandler::closedSocket(AsynchIO&, const Socket& s) {
// If we closed with data still to send log a warning
if (!aio->writeQueueEmpty()) {
QPID_LOG(warning, "CLOSING [" << identifier << "] unsent data (probably due to client disconnect)");
}
delete &s;
aio->queueForDeletion();
delete this;
}
void AsynchIOHandler::disconnect(AsynchIO&) {
QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
if (codec) codec->closed();
}
// Notifications
void AsynchIOHandler::nobuffs(AsynchIO&) {
}
void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, getSecuritySettings(aio, nodict));
write(framing::ProtocolInitiation(codec->getVersion()));
// We've just sent the protocol negotiation so we can cancel the timeout for that
// This is not ideal, because we've not received anything yet, but heartbeats will
// be active soon
timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;
if (!codec->canEncode()) {
return;
}
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
if (buff) {
try {
size_t encoded=codec->encode(buff->bytes, buff->byteCount);
buff->dataCount = encoded;
aio->queueWrite(buff);
if (!codec->isClosed()) {
return;
}
} catch (const std::exception& e) {
QPID_LOG(error, e.what());
}
}
readError = true;
aio->queueWriteClose();
}
}} // namespace qpid::sys