blob: 9203ce17e409b344cf2ae792438dd1b5750211b4 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "Session.h"
#include "Incoming.h"
#include "Outgoing.h"
#include "Message.h"
#include "Connection.h"
#include "Domain.h"
#include "Exception.h"
#include "Interconnects.h"
#include "Relay.h"
#include "Topic.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueCursor.h"
#include "qpid/broker/Selector.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/amqp/Filter.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/format.hpp>
#include <map>
#include <sstream>
extern "C" {
#include <proton/engine.h>
}
namespace qpid {
namespace broker {
namespace amqp {
namespace {
pn_bytes_t convert(const std::string& s)
{
pn_bytes_t result;
result.start = const_cast<char*>(s.data());
result.size = s.size();
return result;
}
std::string convert(pn_bytes_t in)
{
return std::string(in.start, in.size);
}
//capabilities
const std::string CREATE_ON_DEMAND("create-on-demand");
const std::string DURABLE("durable");
const std::string QUEUE("queue");
const std::string TOPIC("topic");
const std::string DIRECT_FILTER("legacy-amqp-direct-binding");
const std::string TOPIC_FILTER("legacy-amqp-topic-binding");
const std::string SHARED("shared");
void writeCapabilities(pn_data_t* out, const std::vector<std::string>& supported)
{
if (supported.size() == 1) {
pn_data_put_symbol(out, convert(supported.front()));
} else if (supported.size() > 1) {
pn_data_put_array(out, false, PN_SYMBOL);
pn_data_enter(out);
for (std::vector<std::string>::const_iterator i = supported.begin(); i != supported.end(); ++i) {
pn_data_put_symbol(out, convert(*i));
}
pn_data_exit(out);
}
}
template <class F>
void readCapabilities(pn_data_t* data, F f)
{
pn_data_rewind(data);
if (pn_data_next(data)) {
pn_type_t type = pn_data_type(data);
if (type == PN_ARRAY) {
pn_data_enter(data);
while (pn_data_next(data)) {
std::string s = convert(pn_data_get_symbol(data));
f(s);
}
pn_data_exit(data);
} else if (type == PN_SYMBOL) {
std::string s = convert(pn_data_get_symbol(data));
f(s);
} else {
QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
}
}
}
void matchCapability(const std::string& name, bool* result, const std::string& s)
{
if (s == name) *result = true;
}
bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
{
bool result(false);
readCapabilities(capabilities, boost::bind(&matchCapability, name, &result, _1));
return result;
}
void collectQueueCapabilities(boost::shared_ptr<Queue> node, std::vector<std::string>* supported, const std::string& s)
{
if (s == DURABLE) {
if (node->isDurable()) supported->push_back(s);
} else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
supported->push_back(s);
}
}
void collectExchangeCapabilities(boost::shared_ptr<Exchange> node, std::vector<std::string>* supported, const std::string& s)
{
if (s == DURABLE) {
if (node->isDurable()) supported->push_back(s);
} else if (s == SHARED) {
supported->push_back(s);
} else if (s == CREATE_ON_DEMAND || s == TOPIC) {
supported->push_back(s);
} else if (s == DIRECT_FILTER) {
if (node->getType() == DirectExchange::typeName) supported->push_back(s);
} else if (s == TOPIC_FILTER) {
if (node->getType() == TopicExchange::typeName) supported->push_back(s);
}
}
void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
{
std::vector<std::string> supported;
readCapabilities(in, boost::bind(&collectQueueCapabilities, node, &supported, _1));
writeCapabilities(out, supported);
}
void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
{
std::vector<std::string> supported;
readCapabilities(in, boost::bind(&collectExchangeCapabilities, node, &supported, _1));
writeCapabilities(out, supported);
}
}
class IncomingToQueue : public DecodingIncoming
{
public:
IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l, const std::string& source, bool icl)
: DecodingIncoming(l, b, p, source, q->getName(), pn_link_name(l)), queue(q), isControllingLink(icl)
{
queue->markInUse(isControllingLink);
}
~IncomingToQueue() { queue->releaseFromUse(isControllingLink); }
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Queue> queue;
bool isControllingLink;
};
class IncomingToExchange : public DecodingIncoming
{
public:
IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source)
: DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise()) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Exchange> exchange;
Authorise& authorise;
};
Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
: ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false),
authorise(connection.getUserId(), connection.getBroker().getAcl()) {}
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
{
ResolvedNode node;
node.exchange = connection.getBroker().getExchanges().find(name);
node.queue = connection.getBroker().getQueues().find(name);
node.topic = connection.getTopics().get(name);
bool createOnDemand = is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus));
//Strictly speaking, properties should only be specified when the
//terminus is dynamic. However we will not enforce that here. If
//properties are set on the attach request, we will set them on
//our reply. This allows the 'create' and 'assert' options in the
//qpid messaging API to be implemented over 1.0.
node.properties.read(pn_terminus_properties(terminus));
if (node.topic) node.exchange = node.topic->getExchange();
if (node.exchange && !node.queue && createOnDemand) {
if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) {
//emulate 0-10 exchange-declare behaviour
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists");
}
}
if (!node.queue && !node.exchange) {
if (pn_terminus_is_dynamic(terminus) || createOnDemand) {
//is it a queue or an exchange?
if (node.properties.isQueue()) {
node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
} else {
qpid::framing::FieldTable args;
qpid::amqp_0_10::translate(node.properties.getProperties(), args);
node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
args, connection.getUserId(), connection.getId()).first;
}
} else {
size_t i = name.find('@');
if (i != std::string::npos && (i+1) < name.length()) {
std::string domain = name.substr(i+1);
std::string local = name.substr(0, i);
std::string id = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str();
//does this domain exist?
boost::shared_ptr<Domain> d = connection.getInterconnects().findDomain(domain);
if (d) {
node.relay = boost::shared_ptr<Relay>(new Relay(1000));
if (incoming) {
d->connect(false, id, name, local, connection, node.relay);
} else {
d->connect(true, id, local, name, connection, node.relay);
}
}
}
}
} else if (node.queue && node.topic) {
QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or topic, assuming topic");
node.queue.reset();
} else if (node.queue && node.exchange) {
QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
node.exchange.reset();
}
if (node.properties.isExclusive() && node.queue && node.queue->setExclusiveOwner(this)) {
exclusiveQueues.insert(node.queue);
}
return node;
}
std::string Session::generateName(pn_link_t* link)
{
std::stringstream s;
if (connection.getContainerId().empty()) {
s << qpid::types::Uuid(true);
} else {
s << connection.getContainerId();
}
s << "_" << pn_link_name(link);
return s.str();
}
std::string Session::qualifyName(const std::string& name)
{
if (connection.getDomain().empty()) {
return name;
} else {
std::stringstream s;
s << name << "@" << connection.getDomain();
return s.str();
}
}
void Session::attach(pn_link_t* link)
{
if (pn_link_is_sender(link)) {
pn_terminus_t* source = pn_link_remote_source(link);
//i.e a subscription
std::string name;
if (pn_terminus_get_type(source) == PN_UNSPECIFIED) {
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No source specified!");
} else if (pn_terminus_is_dynamic(source)) {
name = generateName(link);
QPID_LOG(debug, "Received attach request for outgoing link from " << name);
pn_terminus_set_address(pn_link_source(link), qualifyName(name).c_str());
} else {
name = pn_terminus_get_address(source);
QPID_LOG(debug, "Received attach request for outgoing link from " << name);
pn_terminus_set_address(pn_link_source(link), name.c_str());
}
setupOutgoing(link, source, name);
} else {
pn_terminus_t* target = pn_link_remote_target(link);
std::string name;
if (pn_terminus_get_type(target) == PN_UNSPECIFIED) {
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!");
} else if (pn_terminus_is_dynamic(target)) {
name = generateName(link);
QPID_LOG(debug, "Received attach request for incoming link to " << name);
pn_terminus_set_address(pn_link_target(link), qualifyName(name).c_str());
} else {
name = pn_terminus_get_address(target);
QPID_LOG(debug, "Received attach request for incoming link to " << name);
pn_terminus_set_address(pn_link_target(link), name.c_str());
}
setupIncoming(link, target, name);
}
}
void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name)
{
ResolvedNode node = resolve(name, target, true);
//set capabilities
if (node.queue) {
setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue);
authorise.incoming(node.queue);
node.properties.write(pn_terminus_properties(pn_link_target(link)), node.queue);
} else if (node.exchange) {
setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange);
authorise.incoming(node.exchange);
node.properties.write(pn_terminus_properties(pn_link_target(link)), node.exchange);
}
const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link));
if (!sourceAddress) {
sourceAddress = pn_terminus_get_address(pn_link_source(link));
}
std::string source;
if (sourceAddress) {
source = sourceAddress;
}
if (node.queue) {
boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.properties.trackControllingLink()));
incoming[link] = q;
} else if (node.exchange) {
boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source));
incoming[link] = e;
} else if (node.relay) {
boost::shared_ptr<Incoming> in(new IncomingToRelay(link, connection.getBroker(), *this, source, name, pn_link_name(link), node.relay));
incoming[link] = in;
} else {
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
throw Exception(qpid::amqp::error_conditions::NOT_FOUND, std::string("Node not found: ") + name);
}
if (connection.getBroker().getOptions().auth && !connection.isLink())
incoming[link]->verify(connection.getUserId(), connection.getBroker().getOptions().realm);
QPID_LOG(debug, "Incoming link attached");
}
void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name)
{
ResolvedNode node = resolve(name, source, false);
if (node.queue) {
setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue);
node.properties.write(pn_terminus_properties(pn_link_source(link)), node.queue);
} else if (node.exchange) {
setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange);
node.properties.write(pn_terminus_properties(pn_link_source(link)), node.exchange);
}
Filter filter;
filter.read(pn_terminus_filter(source));
const char* targetAddress = pn_terminus_get_address(pn_link_remote_target(link));
if (!targetAddress) {
targetAddress = pn_terminus_get_address(pn_link_target(link));
}
std::string target;
if (targetAddress) {
target = targetAddress;
}
if (node.queue) {
authorise.outgoing(node.queue);
SubscriptionType type = pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY ? BROWSER : CONSUMER;
boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.properties.trackControllingLink()));
q->init();
filter.apply(q);
outgoing[link] = q;
} else if (node.exchange) {
authorise.access(node.exchange);//do separate access check before trying to create the queue
bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
bool durable = pn_terminus_get_durability(source);
bool autodelete = !durable && pn_link_remote_snd_settle_mode(link) != PN_SND_UNSETTLED;
QueueSettings settings(durable, autodelete);
std::string altExchange;
if (node.topic) {
settings = node.topic->getPolicy();
settings.durable = durable;
settings.autodelete = autodelete;
altExchange = node.topic->getAlternateExchange();
}
settings.autoDeleteDelay = pn_terminus_get_timeout(source);
if (settings.autoDeleteDelay) {
settings.autodelete = true;
settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay;
}
filter.configure(settings);
std::stringstream queueName;
if (shared) {
//just use link name (TODO: could allow this to be
//overridden when access to link properties is provided
//(PROTON-335))
queueName << pn_link_name(link);
} else {
//combination of container id and link name is unique
queueName << connection.getContainerId() << "_" << pn_link_name(link);
}
boost::shared_ptr<qpid::broker::Queue> queue
= connection.getBroker().createQueue(queueName.str(), settings, this, altExchange, connection.getUserId(), connection.getId()).first;
if (!shared) queue->setExclusiveOwner(this);
authorise.outgoing(node.exchange, queue, filter);
filter.bind(node.exchange, queue);
boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, CONSUMER, !shared, false));
q->init();
outgoing[link] = q;
} else if (node.relay) {
boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, connection.getBroker(), *this, name, target, pn_link_name(link), node.relay));
outgoing[link] = out;
out->init();
} else {
pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
throw Exception(qpid::amqp::error_conditions::NOT_FOUND, std::string("Node not found: ") + name);/*not-found*/
}
filter.write(pn_terminus_filter(pn_link_source(link)));
QPID_LOG(debug, "Outgoing link attached");
}
/**
* Called for links initiated by the broker
*/
void Session::attach(pn_link_t* link, const std::string& src, const std::string& tgt, boost::shared_ptr<Relay> relay)
{
pn_terminus_t* source = pn_link_source(link);
pn_terminus_t* target = pn_link_target(link);
pn_terminus_set_address(source, src.c_str());
pn_terminus_set_address(target, tgt.c_str());
if (relay) {
if (pn_link_is_sender(link)) {
boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, connection.getBroker(), *this, src, tgt, pn_link_name(link), relay));
outgoing[link] = out;
out->init();
} else {
boost::shared_ptr<Incoming> in(new IncomingToRelay(link, connection.getBroker(), *this, src, tgt, pn_link_name(link), relay));
incoming[link] = in;
}
} else {
if (pn_link_is_sender(link)) {
setupOutgoing(link, source, src);
} else {
setupIncoming(link, target, tgt);
}
}
}
void Session::detach(pn_link_t* link)
{
if (pn_link_is_sender(link)) {
OutgoingLinks::iterator i = outgoing.find(link);
if (i != outgoing.end()) {
i->second->detached();
boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get());
if (q) connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId());
outgoing.erase(i);
QPID_LOG(debug, "Outgoing link detached");
}
} else {
IncomingLinks::iterator i = incoming.find(link);
if (i != incoming.end()) {
i->second->detached();
incoming.erase(i);
QPID_LOG(debug, "Incoming link detached");
}
}
}
void Session::accepted(pn_delivery_t* delivery, bool sync)
{
if (sync) {
//this is on IO thread
pn_delivery_update(delivery, PN_ACCEPTED);
pn_delivery_settle(delivery);//do we need to check settlement modes/orders?
incomingMessageAccepted();
} else {
//this is not on IO thread, need to delay processing until on IO thread
qpid::sys::Mutex::ScopedLock l(lock);
if (!deleted) {
completed.push_back(delivery);
out.activateOutput();
}
}
}
void Session::readable(pn_link_t* link, pn_delivery_t* delivery)
{
pn_delivery_tag_t tag = pn_delivery_tag(delivery);
QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size));
incomingMessageReceived();
IncomingLinks::iterator target = incoming.find(link);
if (target == incoming.end()) {
QPID_LOG(error, "Received message on unknown link");
pn_delivery_update(delivery, PN_REJECTED);
pn_delivery_settle(delivery);//do we need to check settlement modes/orders?
incomingMessageRejected();
} else {
target->second->readable(delivery);
if (target->second->haveWork()) out.activateOutput();
}
}
void Session::writable(pn_link_t* link, pn_delivery_t* delivery)
{
OutgoingLinks::iterator sender = outgoing.find(link);
if (sender == outgoing.end()) {
QPID_LOG(error, "Delivery returned for unknown link");
} else {
sender->second->handle(delivery);
}
}
bool Session::dispatch()
{
bool output(false);
for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) {
try {
if (s->second->doWork()) output = true;
++s;
} catch (const Exception& e) {
pn_condition_t* error = pn_link_condition(s->first);
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
pn_link_close(s->first);
s->second->detached();
outgoing.erase(s++);
output = true;
}
}
if (completed.size()) {
output = true;
std::deque<pn_delivery_t*> copy;
{
qpid::sys::Mutex::ScopedLock l(lock);
completed.swap(copy);
}
for (std::deque<pn_delivery_t*>::iterator i = copy.begin(); i != copy.end(); ++i) {
accepted(*i, true);
}
}
for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end();) {
try {
if (i->second->doWork()) output = true;
++i;
} catch (const Exception& e) {
pn_condition_t* error = pn_link_condition(i->first);
pn_condition_set_name(error, e.symbol());
pn_condition_set_description(error, e.what());
pn_link_close(i->first);
i->second->detached();
incoming.erase(i++);
output = true;
}
}
return output;
}
void Session::close()
{
for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
i->second->detached();
}
for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) {
i->second->detached();
}
outgoing.clear();
incoming.clear();
QPID_LOG(debug, "Session " << session << " closed, all links detached.");
for (std::set< boost::shared_ptr<Queue> >::const_iterator i = exclusiveQueues.begin(); i != exclusiveQueues.end(); ++i) {
(*i)->releaseExclusiveOwnership();
}
exclusiveQueues.clear();
qpid::sys::Mutex::ScopedLock l(lock);
deleted = true;
}
void Session::wakeup()
{
out.activateOutput();
}
Authorise& Session::getAuthorise()
{
return authorise;
}
void IncomingToQueue::handle(qpid::broker::Message& message)
{
if (queue->isDeleted()) {
std::stringstream msg;
msg << " Queue " << queue->getName() << " has been deleted";
throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, msg.str());
}
queue->deliver(message);
}
void IncomingToExchange::handle(qpid::broker::Message& message)
{
authorise.route(exchange, message);
DeliverableMessage deliverable(message, 0);
exchange->route(deliverable);
if (!deliverable.delivered) {
if (exchange->getAlternate()) {
exchange->getAlternate()->route(deliverable);
}
}
}
}}} // namespace qpid::broker::amqp