blob: f6793ecae729320925dccc9008f08da41127dc19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/* $Rev$ $Date$ */
/**
* AMQP queue listener component implementation.
*/
#define WANT_HTTPD_LOG 1
#include "string.hpp"
#include "function.hpp"
#include "list.hpp"
#include "value.hpp"
#include "monad.hpp"
#include "parallel.hpp"
#include "qpid.hpp"
// Ignore conversion issues and redundant declarations in Qpid headers
#ifdef WANT_MAINTAINER_WARNINGS
#pragma GCC diagnostic ignored "-Wconversion"
#pragma GCC diagnostic ignored "-Wredundant-decls"
#endif
namespace tuscany {
namespace queue {
/**
* Start the component.
*/
const failable<value> start(const list<value>& params) {
// Extract the relay reference and the AMQP key and queue name
const value rel = car(params);
const value pk = ((lvvlambda)cadr(params))(nilListValue);
const value key = isList(pk)? (list<value>)pk : mklist<value>(pk);
const value qname = ((lvvlambda)caddr(params))(nilListValue);
// Create an AMQP session
QpidConnection qc(false);
QpidSession qs(qc, false);
// Declare the configured AMQP key / queue pair
declareQueue(key, qname, qs);
// Listen and relay messages in a worker thread
QpidSubscription qsub(qs, false);
const worker w(3);
const lambda<const bool(const value&, const value&)> rl = [rel](const value& k, const value& v) -> const bool {
// A relay function that posts the AMQP messages it receives to a relay component reference.
debug(k, "queue::relay::key");
debug(v, "queue::relay::value");
const value res = rel(mklist<value>("post", isList(k)? (list<value>)k : mklist<value>(k), v));
return true;
};
// Subscribe and listen to the AMQP queue.
const lambda<const failable<bool>()> subscribe = [qname, rl, qsub]() -> const failable<bool> {
const gc_pool pool;
debug(qname, "queue::subscribe::listen");
const failable<bool> r = listen(qname, rl, const_cast<QpidSubscription&>(qsub));
debug(qname, "queue::subscribe::stopped");
return r;
};
submit<failable<bool> >(w, subscribe);
// Return the listener component lambda function
const lvvlambda listener = [qc, qs, qsub, w](const list<value>& params) -> const value {
const tuscany::value func(car(params));
// Stop the component
if (func != "stop")
return mkfailure<value>();
debug("queue::listener::stop");
// TODO check why stop() and close() hang in child processes
stop(const_cast<QpidSubscription&>(qsub));
close(const_cast<QpidSession&>(qs));
close(const_cast<QpidConnection&>(qc));
cancel(const_cast<worker&>(w));
debug("queue::listener::stopped");
return failable<value>(value(lvvlambda()));
};
return value(listener);
}
}
}
extern "C" {
const tuscany::value apply(const tuscany::list<tuscany::value>& params) {
const tuscany::value func(car(params));
if (func == "start")
return tuscany::queue::start(cdr(params));
return tuscany::mkfailure<tuscany::value>();
}
}