blob: 8d253a670df3710f7639a631772b2f81b0e48aea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/* $Rev$ $Date$ */
/**
* AMQP queue listener component implementation.
*/
#define WANT_HTTPD_LOG 1
#include "string.hpp"
#include "function.hpp"
#include "list.hpp"
#include "value.hpp"
#include "monad.hpp"
#include "parallel.hpp"
#include "qpid.hpp"
// Ignore conversion issues and redundant declarations in Qpid headers
#ifdef WANT_MAINTAINER_WARNINGS
#pragma GCC diagnostic ignored "-Wconversion"
#pragma GCC diagnostic ignored "-Wredundant-decls"
#endif
namespace tuscany {
namespace queue {
/**
* A relay function that posts the AMQP messages it receives to a relay component reference.
*/
class relay {
public:
relay(const lambda<value(const list<value>&)>& rel) : rel(rel) {
}
const bool operator()(const value& k, const value& v) const {
debug(k, "queue::relay::key");
debug(v, "queue::relay::value");
const value res = rel(mklist<value>("post", isList(k)? (list<value>)k : mklist<value>(k), v));
return true;
}
private:
const lambda<value(const list<value>&)> rel;
};
/**
* Subscribe and listen to an AMQP queue.
*/
class subscribe {
public:
subscribe(const string& qname, const lambda<bool(const value&, const value&)>& l, const QpidSubscription& qsub) : qname(qname), l(l), qsub(qsub) {
}
const failable<bool> operator()() const {
gc_pool pool;
debug(qname, "queue::subscribe::listen");
const failable<bool> r = listen(qname, l, const_cast<QpidSubscription&>(qsub));
debug(qname, "queue::subscribe::stopped");
return r;
}
private:
const string qname;
const lambda<bool(const value&, const value&)> l;
const QpidSubscription qsub;
};
/**
* Listener lambda function, responsible for starting an AMQP subscription in a worker thread, and
* apply any function calls to the listener component. The only supported function is stop(),
* called to stop the listener component and shutdown the worker thread.
*/
class listener {
public:
listener(QpidConnection& qc, QpidSession& qs, QpidSubscription& qsub, worker& w) : qc(qc), qs(qs), qsub(qsub), w(w) {
}
const value operator()(const list<value>& params) const {
const tuscany::value func(car(params));
// Stop the component
if (func != "stop")
return tuscany::mkfailure<tuscany::value>();
debug("queue::listener::stop");
// TODO check why stop() and close() hang in child processes
stop(const_cast<QpidSubscription&>(qsub));
close(const_cast<QpidSession&>(qs));
close(const_cast<QpidConnection&>(qc));
cancel(const_cast<worker&>(w));
debug("queue::listener::stopped");
return failable<value>(value(lambda<value(const list<value>&)>()));
}
private:
QpidConnection qc;
QpidSession qs;
QpidSubscription qsub;
worker w;
};
/**
* Start the component.
*/
const failable<value> start(const list<value>& params) {
// Extract the relay reference and the AMQP key and queue name
const value rel = car(params);
const value pk = ((lambda<value(list<value>)>)cadr(params))(list<value>());
const value key = isList(pk)? (list<value>)pk : mklist<value>(pk);
const value qname = ((lambda<value(list<value>)>)caddr(params))(list<value>());
// Create an AMQP session
QpidConnection qc(false);
QpidSession qs(qc, false);
// Declare the configured AMQP key / queue pair
declareQueue(key, qname, qs);
// Listen and relay messages in a worker thread
QpidSubscription qsub(qs, false);
worker w(3);
const lambda<bool(const value&, const value&)> rl = relay(rel);
submit<failable<bool> >(w, lambda<failable<bool>()>(subscribe(qname, rl, qsub)));
// Return the listener component lambda function
return value(lambda<value(const list<value>&)>(listener(qc, qs, qsub, w)));
}
}
}
extern "C" {
const tuscany::value apply(const tuscany::list<tuscany::value>& params) {
const tuscany::value func(car(params));
if (func == "start")
return tuscany::queue::start(cdr(params));
return tuscany::mkfailure<tuscany::value>();
}
}