// blob: 076d9f124131f4449dab76f3400388b420c9adaa
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/* $Rev$ $Date$ */
#ifndef tuscany_qpid_hpp
#define tuscany_qpid_hpp
/**
* AMQP queue access functions.
*/
// Ignore conversion issues and redundant declarations in Qpid headers
#ifdef WANT_MAINTAINER_WARNINGS
#pragma GCC diagnostic ignored "-Wconversion"
#pragma GCC diagnostic ignored "-Wredundant-decls"
#endif
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>
#include "string.hpp"
#include "list.hpp"
#include "value.hpp"
#include "monad.hpp"
#include "../../modules/scheme/eval.hpp"
namespace tuscany {
namespace queue {
/**
* Represents a Qpid connection.
*/
class QpidConnection {
public:
QpidConnection() : owner(true) {
debug("queue::qpidonnection");
c.open("localhost", 5672);
}
QpidConnection(const bool owner) : owner(owner) {
debug("queue::qpidonnection");
c.open("localhost", 5672);
}
QpidConnection(const QpidConnection& qc) : owner(false), c(qc.c) {
debug("queue::qpidonnection::copy");
}
QpidConnection& operator=(const QpidConnection& qc) = delete;
~QpidConnection() {
if (!owner)
return;
c.close();
}
private:
friend const failable<bool> close(QpidConnection& qc);
friend class QpidSession;
const bool owner;
qpid::client::Connection c;
};
/**
 * Close a Qpid connection.
 *
 * The connection is closed regardless of the owner flag of qc.
 * Returns true on success, or a failure carrying the Qpid error text
 * when the client library throws.
 */
const failable<bool> close(QpidConnection& qc) {
    try {
        qc.c.close();
    } catch (const qpid::Exception& e) {
        // Report Qpid errors through the failable result, like the other
        // functions in this file, instead of letting them propagate.
        return mkfailure<bool>(string("Qpid failure: ") + e.what());
    }
    return true;
}
/**
* Represents a Qpid session.
*/
class QpidSession {
public:
QpidSession(QpidConnection& qc) : owner(true), s(qc.c.newSession()) {
debug("queue::qpidsession");
}
QpidSession(QpidConnection& qc, const bool owner) : owner(owner), s(qc.c.newSession()) {
debug("queue::qpidsession");
}
QpidSession(const QpidSession& qs) : owner(false), s(qs.s) {
debug("queue::qpidsession::copy");
}
~QpidSession() {
if (!owner)
return;
s.close();
}
private:
friend const failable<bool> close(QpidSession& qs);
friend const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs);
friend const failable<bool> post(const value& key, const value& val, QpidSession& qs);
friend class QpidSubscription;
const bool owner;
qpid::client::Session s;
};
/**
 * Close a Qpid session.
 *
 * Returns true once the session has been closed, or a failure carrying
 * the Qpid error text when the client library throws.
 */
const failable<bool> close(QpidSession& qs) {
    try {
        qs.s.close();
        return true;
    } catch (const qpid::Exception& ex) {
        return mkfailure<bool>(string("Qpid failure: ") + ex.what());
    }
}
/**
 * Declare a key / AMQP queue pair.
 *
 * Declares the queue called name, then binds it to the "amq.direct"
 * exchange using the written form of key as the binding key.
 * Returns true on success, or a failure carrying the Qpid error text.
 */
const failable<bool> declareQueue(const value& key, const string& name, QpidSession& qs) {
    // The binding key is the serialized form of the key value.
    const string binding(write(content(scheme::writeValue(key))));

    try {
        qs.s.queueDeclare(qpid::client::arg::queue=c_str(name));
        qs.s.exchangeBind(
            qpid::client::arg::exchange="amq.direct",
            qpid::client::arg::queue=c_str(name),
            qpid::client::arg::bindingKey=c_str(binding));
        return true;
    } catch (const qpid::Exception& ex) {
        return mkfailure<bool>(string("Qpid failure: ") + ex.what());
    }
}
/**
 * Post a key / value pair message to an AMQP broker.
 *
 * The written form of key becomes the routing key and the written form
 * of val becomes the message data; the message is transferred to the
 * "amq.direct" exchange.
 * Returns true on success, or a failure carrying the Qpid error text.
 */
const failable<bool> post(const value& key, const value& val, QpidSession& qs) {
    // Serialize the routing key and the payload.
    const string routing(write(content(scheme::writeValue(key))));
    const string payload(write(content(scheme::writeValue(val))));

    try {
        qpid::client::Message msg;
        msg.getDeliveryProperties().setRoutingKey(c_str(routing));
        msg.setData(c_str(payload));
        qs.s.messageTransfer(
            qpid::client::arg::content=msg,
            qpid::client::arg::destination="amq.direct");
        return true;
    } catch (const qpid::Exception& ex) {
        return mkfailure<bool>(string("Qpid failure: ") + ex.what());
    }
}
/**
* Represents a Qpid subscription.
*/
class QpidSubscription {
public:
QpidSubscription(QpidSession& qs) : owner(true), subs(qs.s) {
}
QpidSubscription(QpidSession& qs, const bool owner) : owner(owner), subs(qs.s) {
}
QpidSubscription(const QpidSubscription& qsub) : owner(false), subs(qsub.subs) {
}
~QpidSubscription() {
if (!owner)
return;
try {
subs.stop();
} catch (const qpid::Exception& e) {
mkfailure<bool>(string("Qpid failure: ") + e.what());
}
}
private:
friend const failable<bool> listen(const string& name, const lambda<const bool(const value&, const value&)>& l, QpidSubscription& qsub);
friend const failable<bool> stop(QpidSubscription& qsub);
const bool owner;
qpid::client::SubscriptionManager subs;
};
/**
 * Message listener that forwards received AMQP messages to a listener
 * function.
 *
 * For each message, the routing key and the message data are read back
 * into values and passed to the function. When the function returns
 * false, the subscription for the message's destination is cancelled.
 */
class Listener : public qpid::client::MessageListener {
public:
    Listener(const lambda<const bool(const value&, const value&)> l, qpid::client::SubscriptionManager& subs) : l(l), subs(subs) {
    }

    virtual void received(qpid::client::Message& msg) {
        // Rebuild the key and value from the message, then call the
        // listener function.
        const value key(content(scheme::readValue(msg.getDeliveryProperties().getRoutingKey().c_str())));
        const value val(content(scheme::readValue(msg.getData().c_str())));
        if (l(key, val))
            return;

        // The function asked to stop: cancel this destination's subscription.
        try {
            subs.cancel(msg.getDestination());
        } catch (const qpid::Exception& ex) {
            mkfailure<bool>(string("Qpid failure: ") + ex.what());
        }
    }

private:
    const lambda<const bool(const value&, const value&)> l;
    qpid::client::SubscriptionManager& subs;
};
/**
 * Register a listener function with an AMQP queue.
 *
 * Subscribes to the queue called name and runs the subscription manager
 * of qsub; each received message is handed to l as a key / value pair.
 * Returns true once run() has returned, or a failure carrying the Qpid
 * error text.
 */
const failable<bool> listen(const string& name, const lambda<const bool(const value&, const value&)>& l, QpidSubscription& qsub) {
    debug("queue::listen");

    // The handler lives for the duration of the run() call below.
    Listener handler(l, qsub.subs);
    try {
        qsub.subs.subscribe(handler, c_str(name));
        qsub.subs.run();
        debug("queue::listen::stopped");
        return true;
    } catch (const qpid::Exception& ex) {
        return mkfailure<bool>(string("Qpid failure: ") + ex.what());
    }
}
/**
 * Stop an AMQP subscription.
 *
 * Stops the subscription manager of qsub regardless of its owner flag.
 * Returns true on success, or a failure carrying the Qpid error text.
 */
const failable<bool> stop(QpidSubscription& qsub) {
    debug("queue::stop");
    try {
        qsub.subs.stop();
        debug("queue::stopped");
        return true;
    } catch (const qpid::Exception& ex) {
        return mkfailure<bool>(string("Qpid failure: ") + ex.what());
    }
}
}
}
// Re-enable conversion and redundant declarations warnings
#ifdef WANT_MAINTAINER_WARNINGS
#pragma GCC diagnostic warning "-Wconversion"
#pragma GCC diagnostic warning "-Wredundant-decls"
#endif
#endif /* tuscany_qpid_hpp */