blob: b2a9b979b6d2cd27b9a375d6fe7a42fc74ff75b7 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "ConnectionContext.h"
#include "DriverImpl.h"
#include "ReceiverContext.h"
#include "Sasl.h"
#include "SenderContext.h"
#include "SessionContext.h"
#include "Transport.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include <vector>
extern "C" {
#include <proton/engine.h>
}
namespace qpid {
namespace messaging {
namespace amqp {
/**
 * Builds a connection context for the given URL and option map.
 *
 * Allocates the proton transport and connection objects, binds them
 * together, sets the container name and, when trace logging for the protocol
 * category is enabled, turns on frame tracing in the transport. The context
 * starts out in the DISCONNECTED state.
 *
 * @param u URL whose addresses open() will iterate over
 * @param o option map handed to the ConnectionOptions base
 */
ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Variant::Map& o)
    : qpid::messaging::ConnectionOptions(o),
      url(u),
      engine(pn_transport()),
      connection(pn_connection()),
      //note: disabled read/write of header as now handled by engine
      writeHeader(false),
      readHeader(false),
      haveOutput(false),
      state(DISCONNECTED),
      codecSwitch(*this)
{
    if (pn_transport_bind(engine, connection)) {
        // A non-zero result used to be dropped silently; at least record it
        // so that a later failure to open can be diagnosed. Throwing here is
        // avoided because the raw proton handles would not be released.
        QPID_LOG(error, "Failed to bind transport to connection");
    }
    pn_connection_set_container(connection, "qpid::messaging");//TODO: take this from a connection option
    bool enableTrace(false);
    QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
    if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM);
}
/**
 * Closes the connection (if it is connected) and releases the proton
 * transport and connection objects.
 *
 * close() waits on the monitor via wait(), which throws TransportFailure
 * when the state becomes DISCONNECTED. An exception must not leave the
 * destructor, and the proton objects must be freed regardless, so any
 * failure during close() is logged and otherwise ignored.
 */
ConnectionContext::~ConnectionContext()
{
    try {
        close();
    } catch (const std::exception& e) {
        QPID_LOG(warning, id << " Error while closing connection during destruction: " << e.what());
    } catch (...) {
        QPID_LOG(warning, id << " Unknown error while closing connection during destruction");
    }
    sessions.clear();
    pn_transport_free(engine);
    pn_connection_free(connection);
}
namespace {
/** Separator placed between host and port when the connection id is built. */
const std::string COLON = ":";
}
/**
 * Establishes the connection.
 *
 * Tries each address of the URL in turn until a transport reports that it is
 * connected, then (when SASL is in use) waits for authentication to finish,
 * opens the AMQP connection and waits for the remote end to respond.
 *
 * @throws qpid::messaging::ConnectionError if the context is not in the
 *         DISCONNECTED state, or if the remote end does not become active
 * @throws qpid::messaging::TransportFailure if none of the addresses could
 *         be connected, or if wait() observes a disconnect
 */
void ConnectionContext::open()
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
    if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
    if (!driver) driver = DriverImpl::getDefault();

    for (Url::const_iterator i = url.begin(); state != CONNECTED && i != url.end(); ++i) {
        transport = driver->getTransport(i->protocol, *this);
        std::stringstream port;
        port << i->port;
        id = i->host + COLON + port.str();
        if (useSasl()) {
            sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, i->host));
        }
        state = CONNECTING;
        try {
            QPID_LOG(debug, id << " Connecting ...");
            transport->connect(i->host, port.str());
        } catch (const std::exception& e) {
            QPID_LOG(info, id << " Error while connecting: " << e.what());
            // Nothing visible here guarantees that a transport whose
            // connect() threw will later call opened() or closed(), so mark
            // this attempt as failed now rather than waiting below for a
            // notification that may never arrive.
            state = DISCONNECTED;
        }
        // opened()/closed() change the state and notify the monitor.
        while (state == CONNECTING) {
            lock.wait();
        }
        if (state == DISCONNECTED) {
            QPID_LOG(debug, id << " Failed to connect");
            transport = boost::shared_ptr<Transport>();
        } else {
            QPID_LOG(debug, id << " Connected");
        }
    }
    if (state != CONNECTED) throw qpid::messaging::TransportFailure(QPID_MSG("Could not connect to " << url));

    if (sasl.get()) {
        wakeupDriver();
        while (!sasl->authenticated()) {
            QPID_LOG(debug, id << " Waiting to be authenticated...");
            wait();
        }
        QPID_LOG(debug, id << " Authenticated");
    }

    QPID_LOG(debug, id << " Opening...");
    pn_connection_open(connection);
    wakeupDriver(); //want to write
    while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
        wait();
    }
    if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
        throw qpid::messaging::ConnectionError("Failed to open connection");
    }
    QPID_LOG(debug, id << " Opened");
}
bool ConnectionContext::isOpen() const
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
return pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
/**
 * Closes the proton session held by the given session context and wakes the
 * driver so the change can be written out.
 *
 * @param ssn context whose session is to be closed
 */
void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);

    pn_session_close(ssn->session);
    //TODO: need to destroy session and remove context from map

    wakeupDriver();
}
void ConnectionContext::close()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
if (state != CONNECTED) return;
if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
//wait for outstanding sends to settle
while (!i->second->settled()) {
QPID_LOG(debug, "Waiting for sends to settle before closing");
wait();//wait until message has been confirmed
}
if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
pn_session_close(i->second->session);
}
}
pn_connection_close(connection);
wakeupDriver();
//wait for close to be confirmed by peer?
while (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) {
wait();
}
sessions.clear();
}
transport->close();
while (state != DISCONNECTED) {
lock.wait();
}
}
/**
 * Retrieves the next message for a receiver, issuing credit as needed.
 *
 * When the receiver has no configured capacity a single credit is issued
 * first. If get() does not deliver a message within the timeout, the link is
 * drained and the call waits until either the credit is gone or something is
 * queued, then makes one final immediate attempt.
 *
 * @param ssn     session the receiver belongs to
 * @param lnk     receiver to fetch from
 * @param message filled in when a message is obtained
 * @param timeout how long the first get() may wait
 * @return true if a message was obtained, false otherwise
 */
bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
    {
        qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);
        if (!lnk->capacity) {
            pn_link_flow(lnk->receiver, 1);
            wakeupDriver();
        }
    }

    if (get(ssn, lnk, message, timeout)) {
        qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);
        if (lnk->capacity) {
            pn_link_flow(lnk->receiver, 1);//TODO: is this the right approach?
            wakeupDriver();
        }
        return true;
    }

    // Nothing arrived in time: drain the link and wait for the outcome.
    {
        qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);
        pn_link_drain(lnk->receiver, 0);
        wakeupDriver();
        while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
            QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
            wait();
        }
        if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
            pn_link_flow(lnk->receiver, lnk->capacity);
        }
    }

    if (!get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
        return false;
    }

    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);
    if (lnk->capacity) {
        pn_link_flow(lnk->receiver, 1);
        wakeupDriver();
    }
    return true;
}
/**
 * Converts a relative messaging timeout into an absolute deadline.
 *
 * @param timeout duration, read in milliseconds
 * @return now() plus the timeout, or FAR_FUTURE when the timeout is too
 *         large to be expressed (at or beyond TIME_INFINITE / TIME_MSEC)
 */
qpid::sys::AbsTime convert(qpid::messaging::Duration timeout)
{
    const uint64_t ms = timeout.getMilliseconds();
    // The bound keeps ms * TIME_MSEC below TIME_INFINITE.
    const uint64_t limit = static_cast<uint64_t>(qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC);
    if (ms < limit) {
        return qpid::sys::AbsTime(qpid::sys::now(), ms * qpid::sys::TIME_MSEC);
    }
    return qpid::sys::FAR_FUTURE;
}
/**
 * Takes the current delivery from a receiver link, waiting up to the given
 * timeout for one to arrive.
 *
 * When a delivery is available its pending bytes are read into an
 * EncodedMessage that is attached to the message, the delivery is recorded
 * with the session (the returned id becomes the message's internal id) and
 * the link is advanced.
 *
 * @param ssn     session used to record the delivery
 * @param lnk     receiver to read from
 * @param message filled in when a delivery is available
 * @param timeout maximum time to wait for a delivery
 * @return true if a message was read, false if the timeout expired first
 * @throws qpid::messaging::MessagingException if reading from the link fails
 * @throws qpid::messaging::TransportFailure if the state becomes
 *         DISCONNECTED while waiting
 */
bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
    const qpid::sys::AbsTime until(convert(timeout));
    while (true) {
        // The lock is taken afresh on every pass of the loop.
        qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
        pn_delivery_t* current = pn_link_current(lnk->receiver);
        QPID_LOG(debug, "In ConnectionContext::get(), current=" << current);
        if (current) {
            qpid::messaging::MessageImpl& impl = MessageImplAccess::get(message);
            boost::shared_ptr<EncodedMessage> encoded(new EncodedMessage(pn_delivery_pending(current)));
            ssize_t read = pn_link_recv(lnk->receiver, encoded->getData(), encoded->getSize());
            if (read < 0) throw qpid::messaging::MessagingException("Failed to read message");
            encoded->trim(static_cast<size_t>(read));
            QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: ");
            encoded->init(impl);
            impl.setEncoded(encoded);
            impl.setInternalId(ssn->record(current));
            pn_link_advance(lnk->receiver);
            return true;
        } else if (until > qpid::sys::now()) {
            // Use a timed wait: the untimed wait() would block past the
            // deadline whenever no further notification arrives, so the
            // timeout would never be honoured on an idle connection.
            lock.wait(until);
            if (state == DISCONNECTED) {
                throw qpid::messaging::TransportFailure("Disconnected");
            }
        } else {
            return false;
        }
    }
}
/**
 * Acknowledges deliveries on a session and wakes the driver.
 *
 * @param ssn        session on which to acknowledge
 * @param message    message to acknowledge; when null the session's
 *                   no-argument acknowledge() is used instead
 * @param cumulative passed through to the session together with the
 *                   message's internal id
 */
void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);
    if (!message) {
        ssn->acknowledge();
    } else {
        ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative);
    }
    wakeupDriver();
}
/**
 * Configures and attaches a sender link, then checks that the remote end
 * reported a target for it.
 *
 * @param ssn session the link belongs to
 * @param lnk sender to attach
 * @throws qpid::messaging::NotFound if no remote target is set after attach
 */
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
    lnk->configure();
    attach(ssn->session, (pn_link_t*) lnk->sender);

    if (pn_link_remote_target((pn_link_t*) lnk->sender)) return;

    std::string text("No such target : ");
    text += lnk->getTarget();
    throw qpid::messaging::NotFound(text);
}
/**
 * Configures and attaches a receiver link with its capacity as initial
 * credit, then checks that the remote end reported a source for it.
 *
 * @param ssn session the link belongs to
 * @param lnk receiver to attach
 * @throws qpid::messaging::NotFound if no remote source is set after attach
 */
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
    lnk->configure();
    attach(ssn->session, lnk->receiver, lnk->capacity);

    if (pn_link_remote_source(lnk->receiver)) return;

    std::string text("No such source : ");
    text += lnk->getSource();
    throw qpid::messaging::NotFound(text);
}
/**
 * Opens a link, optionally issues credit, and blocks until the remote state
 * of the link is no longer uninitialised.
 *
 * @param link   link to open (the session argument is unused)
 * @param credit credit to issue after opening; nothing is issued when zero
 */
void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);

    QPID_LOG(debug, "Attaching link " << link << ", state=" << pn_link_state(link));
    pn_link_open(link);
    QPID_LOG(debug, "Link attached " << link << ", state=" << pn_link_state(link));

    if (credit != 0) {
        pn_link_flow(link, credit);
    }
    wakeupDriver();

    while ((pn_link_state(link) & PN_REMOTE_UNINIT) != 0) {
        QPID_LOG(debug, "waiting for confirmation of link attach for " << link << ", state=" << pn_link_state(link));
        wait();
    }
}
/**
 * Sends a message through the given sender.
 *
 * Retries snd->send() (waiting on the monitor in between) until it yields a
 * delivery, wakes the driver, and for synchronous sends blocks until the
 * delivery reports that it was accepted.
 *
 * @param snd     sender to use
 * @param message message to send
 * @param sync    when true, wait for the delivery to be accepted
 */
void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);

    SenderContext::Delivery* pending = snd->send(message);
    while (!pending) {
        QPID_LOG(debug, "Waiting for capacity...");
        wait();//wait for capacity
        pending = snd->send(message);
    }
    wakeupDriver();

    if (!sync) return;
    while (!pending->accepted()) {
        QPID_LOG(debug, "Waiting for confirmation...");
        wait();//wait until message has been confirmed
    }
}
/**
 * Sets the capacity of a sender while holding the connection lock.
 *
 * @param sender   sender to update
 * @param capacity new capacity value
 */
void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);

    sender->setCapacity(capacity);
}
/**
 * Reads the capacity of a sender while holding the connection lock.
 *
 * @param sender sender to query
 * @return the value reported by the sender
 */
uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);
    const uint32_t capacity = sender->getCapacity();
    return capacity;
}
/**
 * Reads the unsettled count of a sender while holding the connection lock.
 *
 * @param sender sender to query
 * @return the value reported by the sender
 */
uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);
    const uint32_t unsettled = sender->getUnsettled();
    return unsettled;
}
/**
 * Sets the capacity of a receiver, issues that much credit on its link and
 * wakes the driver.
 *
 * @param receiver receiver to update
 * @param capacity new capacity value
 */
void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);

    receiver->setCapacity(capacity);
    // Credit is taken from the receiver's own view of its capacity.
    pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity());

    wakeupDriver();
}
/**
 * Reads the capacity of a receiver while holding the connection lock.
 *
 * @param receiver receiver to query
 * @return the value reported by the receiver
 */
uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);
    const uint32_t capacity = receiver->getCapacity();
    return capacity;
}
/**
 * Reads the available count of a receiver while holding the connection lock.
 *
 * @param receiver receiver to query
 * @return the value reported by the receiver
 */
uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);
    const uint32_t available = receiver->getAvailable();
    return available;
}
/**
 * Reads the unsettled count of a receiver while holding the connection lock.
 *
 * @param receiver receiver to query
 * @return the value reported by the receiver
 */
uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);
    const uint32_t unsettled = receiver->getUnsettled();
    return unsettled;
}
void ConnectionContext::activateOutput()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
wakeupDriver();
}
/**
 * Flags that there is output pending and asks the transport to activate
 * output. Only acts in the CONNECTED state; in the DISCONNECTED and
 * CONNECTING states an error is logged instead.
 *
 * Expects lock to be held by caller
 */
void ConnectionContext::wakeupDriver()
{
    if (state == CONNECTED) {
        haveOutput = true;
        transport->activateOutput();
        QPID_LOG(debug, "wakeupDriver()");
    } else if (state == DISCONNECTED || state == CONNECTING) {
        QPID_LOG(error, "wakeupDriver() called while not connected");
    }
}
/**
 * Waits on the connection monitor once and then checks that the connection
 * has not been lost in the meantime.
 *
 * @throws qpid::messaging::TransportFailure if the state is DISCONNECTED
 *         after waking up
 */
void ConnectionContext::wait()
{
    lock.wait();
    //check for any closed links, sessions or indeed the connection
    if (state != DISCONNECTED) return;
    throw qpid::messaging::TransportFailure("Disconnected");
}
/**
 * Creates and opens a new session, registers it under its name and waits
 * until the remote state of the session is no longer uninitialised.
 *
 * @param transactional must be false; transactions are rejected
 * @param n             session name; a generated UUID string is used when
 *                      it is empty
 * @return the context of the new session
 * @throws qpid::messaging::MessagingException if transactional is true
 * @throws qpid::messaging::KeyError if a session of that name already exists
 */
boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> guard(lock);
    if (transactional) throw qpid::messaging::MessagingException("Transactions not yet supported");

    std::string name(n);
    if (name.empty()) {
        name = qpid::framing::Uuid(true).str();
    }

    if (sessions.find(name) != sessions.end()) {
        throw qpid::messaging::KeyError(std::string("Session already exists: ") + name);
    }

    boost::shared_ptr<SessionContext> created(new SessionContext(connection));
    created->session = pn_session(connection);
    pn_session_open(created->session);
    sessions[name] = created;
    wakeupDriver();
    while ((pn_session_state(created->session) & PN_REMOTE_UNINIT) != 0) {
        wait();
    }
    return created;
}
/**
 * Looks up a previously created session by name.
 *
 * @param name name the session was registered under
 * @return the context of that session
 * @throws qpid::messaging::KeyError if no session of that name is registered
 */
boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const
{
    SessionMap::const_iterator i = sessions.find(name);
    if (i == sessions.end()) {
        // The separator was missing, which ran the session name straight
        // into the preceding text.
        throw qpid::messaging::KeyError(std::string("No such session: ") + name);
    }
    return i->second;
}
/**
 * Applies a single named option by forwarding it to set().
 *
 * @param name  option name
 * @param value option value
 */
void ConnectionContext::setOption(const std::string& name, const qpid::types::Variant& value)
{
    set(name, value);
}
/**
 * Returns the username reported by the SASL layer.
 *
 * @return the SASL layer's authenticated username, or an empty string when
 *         no SASL layer is present
 */
std::string ConnectionContext::getAuthenticatedUsername()
{
    if (!sasl.get()) {
        return std::string();
    }
    return sasl->getAuthenticatedUsername();
}
std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
QPID_LOG(trace, id << " decode(" << size << ")");
if (readHeader) {
size_t decoded = readProtocolHeader(buffer, size);
if (decoded < size) {
decoded += decode(buffer + decoded, size - decoded);
}
return decoded;
}
//TODO: Fix pn_engine_input() to take const buffer
ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size);
if (n > 0 || n == PN_EOS) {
//If engine returns EOS, have no way of knowing how many bytes
//it processed, but can assume none need to be reprocessed so
//consider them all read:
if (n == PN_EOS) n = size;
QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size)
pn_transport_tick(engine, 0);
lock.notifyAll();
return n;
} else if (n == PN_ERR) {
throw qpid::Exception(QPID_MSG("Error on input: " << getError()));
} else {
return 0;
}
}
/**
 * Pulls bytes to be sent from the proton transport.
 *
 * While writeHeader is set the protocol header is written first and the
 * rest of the buffer is filled by a nested call. Otherwise the buffer is
 * filled from pn_transport_output(); haveOutput records whether the engine
 * produced anything on this call.
 *
 * @param buffer destination for the encoded bytes
 * @param size   capacity of buffer
 * @return number of bytes written to buffer
 * @throws qpid::Exception if the engine reports PN_ERR
 */
std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
{
    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
    QPID_LOG(trace, id << " encode(" << size << ")");
    if (writeHeader) {
        size_t encoded = writeProtocolHeader(buffer, size);
        // writeProtocolHeader() returns 0 and leaves writeHeader set when
        // the buffer is too small for the header. Recursing in that case
        // would repeat this exact call without end, so only continue once
        // the header has actually been written.
        if (!writeHeader && encoded < size) {
            encoded += encode(buffer + encoded, size - encoded);
        }
        return encoded;
    }

    ssize_t n = pn_transport_output(engine, buffer, size);
    if (n > 0) {
        QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size);
        haveOutput = true;
        return n;
    } else if (n == PN_ERR) {
        throw qpid::Exception(QPID_MSG("Error on output: " << getError()));
    } else {
        // Covers PN_EOS (Is this right?) as well as every other result:
        // nothing was produced, so there is no pending output.
        haveOutput = false;
        return 0;
    }
}
bool ConnectionContext::canEncode()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
return haveOutput && state == CONNECTED;
}
void ConnectionContext::closed()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
state = DISCONNECTED;
lock.notifyAll();
}
void ConnectionContext::opened()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
state = CONNECTED;
lock.notifyAll();
}
bool ConnectionContext::isClosed() const
{
return !isOpen();
}
namespace {
/** Protocol version value returned by getVersion(). */
qpid::framing::ProtocolVersion AMQP_1_0_PLAIN(
    1,
    0,
    qpid::framing::ProtocolVersion::AMQP);
}
/**
 * Builds a description of the errors currently recorded on the proton
 * connection and transport.
 *
 * @return the connection and/or transport error text, separated by "; "
 *         when both are present, or an empty string when neither has any
 */
std::string ConnectionContext::getError()
{
    std::stringstream text;

    // NOTE(review): pn_error_text() is understood to return a null pointer
    // when no error text has been recorded - confirm against the proton
    // error API. Inserting a null char* into a stream is undefined and in
    // practice can suppress everything inserted afterwards, so each text is
    // checked before use.
    pn_error_t* cerror = pn_connection_error(connection);
    const char* ctext = cerror ? pn_error_text(cerror) : 0;
    if (ctext) text << "connection error " << ctext;

    pn_error_t* terror = pn_transport_error(engine);
    const char* ttext = terror ? pn_error_text(terror) : 0;
    if (ttext) {
        if (ctext) text << "; ";
        text << "transport error " << ttext;
    }
    return text.str();
}
/**
 * Returns the protocol version used for the protocol header.
 *
 * @return a copy of AMQP_1_0_PLAIN
 */
framing::ProtocolVersion ConnectionContext::getVersion() const
{
    framing::ProtocolVersion version(AMQP_1_0_PLAIN);
    return version;
}
std::size_t ConnectionContext::readProtocolHeader(const char* buffer, std::size_t size)
{
framing::ProtocolInitiation pi(getVersion());
if (size >= pi.encodedSize()) {
readHeader = false;
qpid::framing::Buffer out(const_cast<char*>(buffer), size);
pi.decode(out);
QPID_LOG_CAT(debug, protocol, id << " read protocol header: " << pi);
return pi.encodedSize();
} else {
return 0;
}
}
std::size_t ConnectionContext::writeProtocolHeader(char* buffer, std::size_t size)
{
framing::ProtocolInitiation pi(getVersion());
if (size >= pi.encodedSize()) {
QPID_LOG_CAT(debug, protocol, id << " writing protocol header: " << pi);
writeHeader = false;
qpid::framing::Buffer out(buffer, size);
pi.encode(out);
return pi.encodedSize();
} else {
QPID_LOG_CAT(debug, protocol, id << " insufficient buffer for protocol header: " << size)
return 0;
}
}
/**
 * Decides whether a SASL layer is to be used.
 *
 * @return false when the mechanism option is exactly "none", "NONE" or
 *         "None"; true for any other value
 */
bool ConnectionContext::useSasl()
{
    return mechanism != "none"
        && mechanism != "NONE"
        && mechanism != "None";
}
/**
 * Gives access to the codec for this connection.
 *
 * @return reference to the context's CodecSwitch member
 */
qpid::sys::Codec& ConnectionContext::getCodec()
{
    qpid::sys::Codec& codec = codecSwitch;
    return codec;
}
/**
 * Creates a codec switch that refers back to its owning context.
 *
 * @param p context whose lock, SASL layer and codec methods are used
 */
ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p)
    : parent(p)
{
}
std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, std::size_t size)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
size_t decoded = 0;
if (parent.sasl.get() && !parent.sasl->authenticated()) {
decoded = parent.sasl->decode(buffer, size);
if (!parent.sasl->authenticated()) return decoded;
}
if (decoded < size) {
if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
else decoded += parent.decode(buffer+decoded, size-decoded);
}
return decoded;
}
std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t size)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
size_t encoded = 0;
if (parent.sasl.get() && parent.sasl->canEncode()) {
encoded += parent.sasl->encode(buffer, size);
if (!parent.sasl->authenticated()) return encoded;
}
if (encoded < size) {
if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
else encoded += parent.encode(buffer+encoded, size-encoded);
}
return encoded;
}
bool ConnectionContext::CodecSwitch::canEncode()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
if (parent.sasl.get()) {
if (parent.sasl->canEncode()) return true;
else if (!parent.sasl->authenticated()) return false;
else if (parent.sasl->getSecurityLayer()) return parent.sasl->getSecurityLayer()->canEncode();
}
return parent.canEncode();
}
}}} // namespace qpid::messaging::amqp