blob: 2d3566a323970a20e9839c124c02bab1b6403e82 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "Connection.h"
#include "DataReader.h"
#include "Session.h"
#include "Exception.h"
#include "qpid/broker/AclModule.h"
#include "qpid/broker/Broker.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Timer.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/Version.h"
#include "config.h"
#include <sstream>
extern "C" {
#include <proton/engine.h>
#include <proton/error.h>
#ifdef HAVE_PROTON_EVENTS
#include <proton/event.h>
#endif
}
namespace qpid {
namespace broker {
namespace amqp {
namespace {
void do_trace(pn_transport_t* transport, const char* message)
{
Connection* c = reinterpret_cast<Connection*>(pn_transport_get_context(transport));
if (c) c->trace(message);
}
void set_tracer(pn_transport_t* transport, void* context)
{
pn_transport_set_context(transport, context);
pn_transport_set_tracer(transport, &do_trace);
}
#ifdef USE_PROTON_TRANSPORT_CONDITION
std::string get_error(pn_connection_t* connection, pn_transport_t* transport)
{
std::stringstream text;
pn_error_t* cerror = pn_connection_error(connection);
if (cerror) text << "connection error " << pn_error_text(cerror) << " [" << cerror << "]";
pn_condition_t* tcondition = pn_transport_condition(transport);
if (pn_condition_is_set(tcondition)) text << "transport error: " << pn_condition_get_name(tcondition) << ", " << pn_condition_get_description(tcondition);
return text.str();
}
#else
std::string get_error(pn_connection_t* connection, pn_transport_t* transport)
{
std::stringstream text;
pn_error_t* cerror = pn_connection_error(connection);
if (cerror) text << "connection error " << pn_error_text(cerror) << " [" << cerror << "]";
pn_error_t* terror = pn_transport_error(transport);
if (terror) text << "transport error " << pn_error_text(terror) << " [" << terror << "]";
return text.str();
}
#endif
}
void Connection::trace(const char* message) const
{
QPID_LOG_CAT(trace, protocol, "[" << id << "]: " << message);
}
namespace {
struct ConnectionTickerTask : public qpid::sys::TimerTask
{
qpid::sys::Timer& timer;
Connection& connection;
ConnectionTickerTask(uint64_t interval, qpid::sys::Timer& t, Connection& c) :
TimerTask(qpid::sys::Duration(interval*qpid::sys::TIME_MSEC), "ConnectionTicker"),
timer(t),
connection(c)
{}
void fire() {
// Setup next firing
setupNextFire();
timer.add(this);
// Send Ticker
connection.requestIO();
}
};
const std::string ANONYMOUS_RELAY("ANONYMOUS-RELAY");
}
Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse, bool brokerInitiated)
: BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated),
connection(pn_connection()),
transport(pn_transport()),
collector(0),
out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false), ioRequested(false)
{
#ifdef HAVE_PROTON_EVENTS
collector = pn_collector();
pn_connection_collect(connection, collector);
#endif
if (pn_transport_bind(transport, connection)) {
//error
QPID_LOG(error, "Failed to bind transport to connection: " << getError());
}
out.activateOutput();
bool enableTrace(false);
QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
if (enableTrace) {
pn_transport_trace(transport, PN_TRACE_FRM);
set_tracer(transport, this);
}
getBroker().getConnectionObservers().connection(*this);
if (!saslInUse) {
//feed in a dummy AMQP 1.0 header as engine expects one, but
//we already read it (if sasl is in use we read the sasl
//header,not the AMQP 1.0 header).
std::vector<char> protocolHeader(8);
qpid::framing::ProtocolInitiation pi(getVersion());
qpid::framing::Buffer buffer(&protocolHeader[0], protocolHeader.size());
pi.encode(buffer);
pn_transport_input(transport, &protocolHeader[0], protocolHeader.size());
setUserId("none");
}
}
void Connection::requestIO()
{
ioRequested = true;
out.activateOutput();
}
Connection::~Connection()
{
if (ticker) ticker->cancel();
getBroker().getConnectionObservers().closed(*this);
pn_connection_free(connection);
pn_transport_free(transport);
#ifdef HAVE_PROTON_EVENTS
pn_collector_free(collector);
#endif
}
pn_transport_t* Connection::getTransport()
{
return transport;
}
size_t Connection::decode(const char* buffer, size_t size)
{
QPID_LOG(trace, id << " decode(" << size << ")");
if (size == 0) return 0;
ssize_t n = pn_transport_input(transport, const_cast<char*>(buffer), size);
if (n > 0 || n == PN_EOS) {
// PN_EOS either means we received a Close (which also means we've
// consumed all the input), OR some Very Bad Thing happened and this
// connection is toast.
if (n == PN_EOS)
{
std::string error;
if (checkTransportError(error)) {
// "He's dead, Jim."
QPID_LOG_CAT(error, network, id << " connection failed: " << error);
out.abort();
return 0;
} else {
n = size; // assume all consumed
}
}
QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size);
try {
process();
} catch (const Exception& e) {
QPID_LOG(error, id << ": " << e.what());
pn_condition_t* error = pn_connection_condition(connection);
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
close();
} catch (const std::exception& e) {
QPID_LOG(error, id << ": " << e.what());
pn_condition_t* error = pn_connection_condition(connection);
pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
pn_condition_set_description(error, e.what());
close();
}
// QPID-6698: don't use wallclock here, use monotonic clock
int64_t now = qpid::sys::Duration(qpid::sys::ZERO, qpid::sys::AbsTime::now());
pn_transport_tick(transport, now / int64_t(qpid::sys::TIME_MSEC));
if (!haveOutput) {
haveOutput = true;
out.activateOutput();
}
return n;
} else if (n == PN_ERR) {
std::string error;
checkTransportError(error);
QPID_LOG_CAT(error, network, id << " connection error: " << error);
out.abort();
return 0;
} else {
return 0;
}
}
size_t Connection::encode(char* buffer, size_t size)
{
QPID_LOG(trace, "encode(" << size << ")");
doOutput(size);
ssize_t n = pn_transport_output(transport, buffer, size);
if (n > 0) {
QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
haveOutput = true;
if (ticker) ticker->restart();
return n;
} else if (n == PN_EOS) {
haveOutput = false;
// Normal close, or error?
std::string error;
if (checkTransportError(error)) {
QPID_LOG_CAT(error, network, id << " connection failed: " << error);
out.abort();
}
return 0;
} else if (n == PN_ERR) {
std::string error;
checkTransportError(error);
QPID_LOG_CAT(error, network, id << " connection error: " << error);
out.abort();
return 0;
} else {
haveOutput = false;
return 0;
}
}
void Connection::doOutput(size_t capacity)
{
ssize_t n = 0;
do {
if (dispatch()) {
processDeliveries();
ssize_t next = pn_transport_pending(transport);
if (n == next) break;
n = next;
} else break;
} while (n > 0 && n < (ssize_t) capacity);
}
bool Connection::dispatch()
{
bool result = false;
for (Sessions::iterator i = sessions.begin();i != sessions.end();) {
if (i->second->endedByManagement()) {
pn_session_close(i->first);
i->second->close();
sessions.erase(i++);
result = true;
QPID_LOG_CAT(debug, model, id << " session ended by management");
} else {
if (i->second->dispatch()) result = true;
++i;
}
}
return result;
}
bool Connection::canEncode()
{
if (!closeInitiated) {
if (closeRequested) {
close();
return true;
}
try {
if (dispatch()) haveOutput = true;
process();
} catch (const Exception& e) {
QPID_LOG(error, id << ": " << e.what());
pn_condition_t* error = pn_connection_condition(connection);
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
close();
haveOutput = true;
} catch (const std::exception& e) {
QPID_LOG(error, id << ": " << e.what());
pn_condition_t* error = pn_connection_condition(connection);
pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
pn_condition_set_description(error, e.what());
close();
haveOutput = true;
}
} else {
QPID_LOG(info, "Connection " << id << " has been closed locally");
}
if (ioRequested.valueCompareAndSwap(true, false)) haveOutput = true;
// QPID-6698: don't use wallclock here, use monotonic clock
int64_t now = qpid::sys::Duration(qpid::sys::ZERO, qpid::sys::AbsTime::now());
pn_transport_tick(transport, (now / int64_t(qpid::sys::TIME_MSEC)));
QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
return haveOutput;
}
void Connection::open()
{
readPeerProperties();
pn_connection_set_container(connection, getBroker().getFederationTag().c_str());
uint32_t timeout = pn_transport_get_remote_idle_timeout(transport);
if (timeout) {
// if idle generate empty frames at 1/2 the timeout interval as keepalives:
ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask((timeout/2)+1,
getBroker().getTimer(),
*this));
getBroker().getTimer().add(ticker);
// Note: in version 0-10 of the protocol, idle timeout applies to both
// ends. AMQP 1.0 changes that - it's now asymmetric: each end can
// configure/disable it independently. For backward compatibility, by
// default mimic the old behavior and set our local timeout.
// Use 2x the remote's timeout, as per the spec the remote should
// advertise 1/2 its actual timeout threshold
pn_transport_set_idle_timeout(transport, timeout * 2);
QPID_LOG_CAT(debug, network, id << " AMQP 1.0 idle-timeout set:"
<< " local=" << pn_transport_get_idle_timeout(transport)
<< " remote=" << pn_transport_get_remote_idle_timeout(transport));
}
pn_data_t* offered_capabilities = pn_connection_offered_capabilities(connection);
if (offered_capabilities) {
pn_data_put_array(offered_capabilities, false, PN_SYMBOL);
pn_data_enter(offered_capabilities);
pn_data_put_symbol(offered_capabilities, pn_bytes(ANONYMOUS_RELAY.size(), ANONYMOUS_RELAY.c_str()));
pn_data_exit(offered_capabilities);
pn_data_rewind(offered_capabilities);
}
// QPID-6592: put self-identifying information into the connection
// properties. Use keys defined by the 0-10 spec, as AMQP 1.0 has yet to
// define any.
//
pn_data_t *props = pn_connection_properties(connection);
if (props) {
boost::shared_ptr<const System> sysInfo = getBroker().getSystem();
pn_data_clear(props);
pn_data_put_map(props);
pn_data_enter(props);
pn_data_put_symbol(props, pn_bytes(7, "product"));
pn_data_put_string(props, pn_bytes(qpid::product.size(), qpid::product.c_str()));
pn_data_put_symbol(props, pn_bytes(7, "version"));
pn_data_put_string(props, pn_bytes(qpid::version.size(), qpid::version.c_str()));
if (sysInfo) {
std::string osName(sysInfo->getOsName());
std::string nodeName(sysInfo->getNodeName());
pn_data_put_symbol(props, pn_bytes(8, "platform"));
pn_data_put_string(props, pn_bytes(osName.size(), osName.c_str()));
pn_data_put_symbol(props, pn_bytes(4, "host"));
pn_data_put_string(props, pn_bytes(nodeName.size(), nodeName.c_str()));
}
pn_data_exit(props);
pn_data_rewind(props);
}
pn_connection_open(connection);
out.connectionEstablished();
opened();
getBroker().getConnectionObservers().opened(*this);
}
void Connection::readPeerProperties()
{
qpid::types::Variant::Map properties;
DataReader::read(pn_connection_remote_properties(connection), properties);
setPeerProperties(properties);
}
void Connection::closed()
{
if (ticker) ticker->cancel();
for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
i->second->close();
}
}
void Connection::close()
{
if (!closeInitiated) {
closeInitiated = true;
closed();
QPID_LOG_CAT(debug, model, id << " connection closed");
pn_connection_close(connection);
}
}
bool Connection::isClosed() const
{
return pn_connection_state(connection) & PN_REMOTE_CLOSED;
}
framing::ProtocolVersion Connection::getVersion() const
{
return qpid::framing::ProtocolVersion(1,0);
}
void Connection::process()
{
QPID_LOG(trace, id << " process()");
#ifdef HAVE_PROTON_EVENTS
pn_event_t *event = pn_collector_peek(collector);
while (event) {
switch (pn_event_type(event)) {
case PN_CONNECTION_REMOTE_OPEN:
doConnectionRemoteOpen();
break;
case PN_CONNECTION_REMOTE_CLOSE:
doConnectionRemoteClose();
break;
case PN_SESSION_REMOTE_OPEN:
doSessionRemoteOpen(pn_event_session(event));
break;
case PN_SESSION_REMOTE_CLOSE:
doSessionRemoteClose(pn_event_session(event));
break;
case PN_LINK_REMOTE_OPEN:
QPID_LOG(notice, "Got link open event");
doLinkRemoteOpen(pn_event_link(event));
break;
case PN_LINK_REMOTE_DETACH:
doLinkRemoteDetach(pn_event_link(event), false);
break;
case PN_LINK_REMOTE_CLOSE:
doLinkRemoteClose(pn_event_link(event));
break;
case PN_DELIVERY:
doDeliveryUpdated(pn_event_delivery(event));
break;
default:
break;
}
pn_collector_pop(collector);
event = pn_collector_peek(collector);
}
#else // !HAVE_PROTON_EVENTS
const pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE;
const pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
doConnectionRemoteOpen();
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) {
doSessionRemoteOpen(s);
}
for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) {
doLinkRemoteOpen(l);
}
processDeliveries();
for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE), *next = 0;
l; l = next) {
next = pn_link_next(l, REQUIRES_CLOSE);
doLinkRemoteClose(l);
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE), *next = 0;
s; s = next) {
next = pn_session_next(s, REQUIRES_CLOSE);
doSessionRemoteClose(s);
}
if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
doConnectionRemoteClose();
}
#endif // !HAVE_PROTON_EVENTS
}
void Connection::processDeliveries()
{
#ifdef HAVE_PROTON_EVENTS
// with the event API, there's no way to selectively process only
// the delivery-related events. We have to process all events:
process();
#else
for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) {
doDeliveryUpdated(delivery);
}
#endif
}
std::string Connection::getError()
{
return get_error(connection, transport);
}
void Connection::abort()
{
out.abort();
}
void Connection::setUserId(const std::string& user)
{
ManagedConnection::setUserId(user);
AclModule* acl = getBroker().getAcl();
if (acl && !acl->approveConnection(*this))
{
throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit");
}
}
void Connection::closedByManagement()
{
closeRequested = true;
out.activateOutput();
}
// the peer has issued an Open performative
void Connection::doConnectionRemoteOpen()
{
// respond in kind if we haven't yet
if ((pn_connection_state(connection) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) {
QPID_LOG_CAT(debug, model, id << " connection opened");
open();
setContainerId(pn_connection_remote_container(connection));
}
}
// the peer has issued a Close performative
void Connection::doConnectionRemoteClose()
{
if ((pn_connection_state(connection) & PN_LOCAL_CLOSED) == 0) {
QPID_LOG_CAT(debug, model, id << " connection closed");
pn_connection_close(connection);
}
}
// the peer has issued a Begin performative
void Connection::doSessionRemoteOpen(pn_session_t *session)
{
if ((pn_session_state(session) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) {
QPID_LOG_CAT(debug, model, id << " session begun");
pn_session_open(session);
boost::shared_ptr<Session> ssn(new Session(session, *this, out));
sessions[session] = ssn;
}
}
// the peer has issued an End performative
void Connection::doSessionRemoteClose(pn_session_t *session)
{
if ((pn_session_state(session) & PN_LOCAL_CLOSED) == 0) {
pn_session_close(session);
Sessions::iterator i = sessions.find(session);
if (i != sessions.end()) {
i->second->close();
sessions.erase(i);
QPID_LOG_CAT(debug, model, id << " session ended");
} else {
QPID_LOG(error, id << " peer attempted to close unrecognised session");
}
}
pn_session_free(session);
}
// the peer has issued an Attach performative
void Connection::doLinkRemoteOpen(pn_link_t *link)
{
if ((pn_link_state(link) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) {
pn_link_open(link);
Sessions::iterator session = sessions.find(pn_link_session(link));
if (session == sessions.end()) {
QPID_LOG(error, id << " Link attached on unknown session!");
} else {
try {
session->second->attach(link);
QPID_LOG_CAT(debug, protocol, id << " link " << link << " attached on " << pn_link_session(link));
} catch (const Exception& e) {
QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
pn_condition_t* error = pn_link_condition(link);
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
pn_link_close(link);
} catch (const qpid::framing::UnauthorizedAccessException& e) {
QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
pn_condition_t* error = pn_link_condition(link);
pn_condition_set_name(error, qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str());
pn_condition_set_description(error, e.what());
pn_link_close(link);
} catch (const std::exception& e) {
QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
pn_condition_t* error = pn_link_condition(link);
pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
pn_condition_set_description(error, e.what());
pn_link_close(link);
}
}
}
}
// the peer has issued a Detach performative with closed=true
void Connection::doLinkRemoteClose(pn_link_t *link)
{
doLinkRemoteDetach(link, true);
}
// the peer has issued a Detach performative
void Connection::doLinkRemoteDetach(pn_link_t *link, bool closed)
{
if ((pn_link_state(link) & PN_LOCAL_CLOSED) == 0) {
if (closed) pn_link_close(link);
//pn_link_detach was only introduced after 0.7, as was the event interface:
#ifdef HAVE_PROTON_EVENTS
else pn_link_detach(link);
#endif
Sessions::iterator session = sessions.find(pn_link_session(link));
if (session == sessions.end()) {
QPID_LOG(error, id << " peer attempted to detach link on unknown session!");
} else {
session->second->detach(link, closed);
QPID_LOG_CAT(debug, model, id << " link detached");
}
}
pn_link_free(link);
}
// the status of the delivery has changed
void Connection::doDeliveryUpdated(pn_delivery_t *delivery)
{
pn_link_t* link = pn_delivery_link(delivery);
if (pn_link_state(link) & PN_LOCAL_CLOSED) return;
try {
if (pn_link_is_receiver(link)) {
Sessions::iterator i = sessions.find(pn_link_session(link));
if (i != sessions.end()) {
i->second->readable(link, delivery);
} else {
pn_delivery_update(delivery, PN_REJECTED);
}
} else { //i.e. SENDER
Sessions::iterator i = sessions.find(pn_link_session(link));
if (i != sessions.end()) {
QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link));
i->second->writable(link, delivery);
} else {
QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link);
}
}
} catch (const Exception& e) {
QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what());
pn_condition_t* error = pn_link_condition(link);
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
pn_link_close(link);
}
}
// check for failures of the transport:
bool Connection::checkTransportError(std::string& text)
{
std::stringstream info;
#ifdef USE_PROTON_TRANSPORT_CONDITION
pn_condition_t* tcondition = pn_transport_condition(transport);
if (pn_condition_is_set(tcondition))
info << "transport error: " << pn_condition_get_name(tcondition) << ", " << pn_condition_get_description(tcondition);
#else
pn_error_t* terror = pn_transport_error(transport);
if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]";
#endif
text = info.str();
return !text.empty();
}
}}} // namespace qpid::broker::amqp