blob: 1f135cf931983e9b01b58765b0069f590c35c90d [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "Connection.h"
#include "Session.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/OutputControl.h"
#include <sstream>
extern "C" {
#include <proton/engine.h>
#include <proton/error.h>
}
namespace qpid {
namespace broker {
namespace amqp {
Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, bool saslInUse)
: ManagedConnection(b, i),
connection(pn_connection()),
transport(pn_transport()),
out(o), id(i), broker(b), haveOutput(true)
{
if (pn_transport_bind(transport, connection)) {
//error
}
out.activateOutput();
bool enableTrace(false);
QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
if (enableTrace) pn_transport_trace(transport, PN_TRACE_FRM);
if (!saslInUse) {
//feed in a dummy AMQP 1.0 header as engine expects one, but
//we already read it (if sasl is in use we read the sasl
//header,not the AMQP 1.0 header).
std::vector<char> protocolHeader(8);
qpid::framing::ProtocolInitiation pi(getVersion());
qpid::framing::Buffer buffer(&protocolHeader[0], protocolHeader.size());
pi.encode(buffer);
pn_transport_input(transport, &protocolHeader[0], protocolHeader.size());
//wont get a userid, so set a dummy one on the ManagedConnection to trigger event
setUserid("no authentication used");
}
}
Connection::~Connection()
{
pn_transport_free(transport);
pn_connection_free(connection);
}
pn_transport_t* Connection::getTransport()
{
return transport;
}
size_t Connection::decode(const char* buffer, size_t size)
{
QPID_LOG(trace, id << " decode(" << size << ")")
//TODO: Fix pn_engine_input() to take const buffer
ssize_t n = pn_transport_input(transport, const_cast<char*>(buffer), size);
if (n > 0 || n == PN_EOS) {
//If engine returns EOS, have no way of knowing how many bytes
//it processed, but can assume none need to be reprocessed so
//consider them all read:
if (n == PN_EOS) n = size;
QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size)
process();
pn_transport_tick(transport, 0);
if (!haveOutput) {
haveOutput = true;
out.activateOutput();
}
return n;
} else if (n == PN_ERR) {
throw qpid::Exception(QPID_MSG("Error on input: " << getError()));
} else {
return 0;
}
}
size_t Connection::encode(char* buffer, size_t size)
{
QPID_LOG(trace, "encode(" << size << ")")
ssize_t n = pn_transport_output(transport, buffer, size);
if (n > 0) {
QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
haveOutput = true;
return n;
} else if (n == PN_EOS) {
haveOutput = size;
return size;//Is this right?
} else if (n == PN_ERR) {
throw qpid::Exception(QPID_MSG("Error on output: " << getError()));
} else {
haveOutput = false;
return 0;
}
}
bool Connection::canEncode()
{
for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) {
if (i->second->dispatch()) haveOutput = true;
}
process();
//TODO: proper handling of time in and out of tick
pn_transport_tick(transport, 0);
QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
return haveOutput;
}
void Connection::closed()
{
//TODO: tear down sessions and associated links
for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
i->second->close();
}
}
bool Connection::isClosed() const
{
return pn_connection_state(connection) & PN_REMOTE_CLOSED;
}
framing::ProtocolVersion Connection::getVersion() const
{
return qpid::framing::ProtocolVersion(1,0);
}
namespace {
pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE;
pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
}
void Connection::process()
{
QPID_LOG(trace, id << " process()");
if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
QPID_LOG_CAT(debug, model, id << " connection opened");
pn_connection_set_container(connection, broker.getFederationTag().c_str());
pn_connection_open(connection);
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) {
QPID_LOG_CAT(debug, model, id << " session begun");
pn_session_open(s);
boost::shared_ptr<Session> ssn(new Session(s, broker, *this, out));
sessions[s] = ssn;
}
for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) {
pn_link_open(l);
Sessions::iterator session = sessions.find(pn_link_session(l));
if (session == sessions.end()) {
QPID_LOG(error, id << " Link attached on unknown session!");
} else {
try {
session->second->attach(l);
QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l));
} catch (const std::exception& e) {
QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
//TODO: set error details on detach when that is exposed via engine API
pn_link_close(l);
}
}
}
//handle deliveries
for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) {
pn_link_t* link = pn_delivery_link(delivery);
if (pn_link_is_receiver(link)) {
Sessions::iterator i = sessions.find(pn_link_session(link));
if (i != sessions.end()) {
i->second->incoming(link, delivery);
} else {
pn_delivery_update(delivery, PN_REJECTED);
}
} else { //i.e. SENDER
Sessions::iterator i = sessions.find(pn_link_session(link));
if (i != sessions.end()) {
QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link));
i->second->outgoing(link, delivery);
} else {
QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link);
}
}
}
for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) {
pn_link_close(l);
Sessions::iterator session = sessions.find(pn_link_session(l));
if (session == sessions.end()) {
QPID_LOG(error, id << " peer attempted to detach link on unknown session!");
} else {
session->second->detach(l);
QPID_LOG_CAT(debug, model, id << " link detached");
}
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) {
pn_session_close(s);
Sessions::iterator i = sessions.find(s);
if (i != sessions.end()) {
i->second->close();
sessions.erase(i);
QPID_LOG_CAT(debug, model, id << " session ended");
} else {
QPID_LOG(error, id << " peer attempted to close unrecognised session");
}
}
if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
QPID_LOG_CAT(debug, model, id << " connection closed");
pn_connection_close(connection);
}
}
std::string Connection::getError()
{
std::stringstream text;
pn_error_t* cerror = pn_connection_error(connection);
if (cerror) text << "connection error " << pn_error_text(cerror);
pn_error_t* terror = pn_transport_error(transport);
if (terror) text << "transport error " << pn_error_text(terror);
return text.str();
}
}}} // namespace qpid::broker::amqp