// blob: 41dd9ff00cf38e0be6a85c946f26d536e7f8d65b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "qmf/engine/ResilientConnection.h"
#include "qmf/engine/MessageImpl.h"
#include "qmf/engine/ConnectionSettingsImpl.h"
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/Message.h>
#include <qpid/sys/Thread.h>
#include <qpid/sys/Runnable.h>
#include <qpid/sys/Mutex.h>
#include <qpid/sys/Condition.h>
#include <qpid/sys/Time.h>
#include <qpid/log/Statement.h>
#include <qpid/RefCounted.h>
#include <boost/bind.hpp>
#include <string>
#include <deque>
#include <vector>
#include <set>
#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
#include <unistd.h>
#include <fcntl.h>
using namespace std;
using namespace qmf::engine;
using namespace qpid;
using qpid::sys::Mutex;
namespace qmf {
namespace engine {
// Internal record for one entry of ResilientConnectionImpl's event queue.
// copy() converts it into the public ResilientConnectionEvent structure.
struct ResilientConnectionEventImpl {
// Kind of event (values used in this file: CONNECTED, DISCONNECTED, SESSION_CLOSED, RECV).
ResilientConnectionEvent::EventKind kind;
// Opaque pointer supplied by the caller of createSession(); 0 when not set.
void* sessionContext;
// Error description attached to the event; empty by default.
string errorText;
// Message carried by the event (default-constructed when none is given).
MessageImpl message;
// Builds an event of kind k carrying a copy of m; sessionContext starts at 0
// and errorText starts empty.
ResilientConnectionEventImpl(ResilientConnectionEvent::EventKind k,
const MessageImpl& m = MessageImpl()) :
kind(k), sessionContext(0), message(m) {}
// Returns the public-API representation of this event. The returned
// errorText pointer refers to this object's own string storage.
ResilientConnectionEvent copy();
};
// One broker session owned by a ResilientConnectionImpl. It listens for
// incoming messages (MessageListener), runs its subscription manager on its
// own thread (Runnable) and is reference counted (RefCounted) so it can be
// held through boost::intrusive_ptr.
struct RCSession : public client::MessageListener, public qpid::sys::Runnable, public qpid::RefCounted {
typedef boost::intrusive_ptr<RCSession> Ptr;
// Owning connection implementation; receives RECV events and session-failure notifications.
ResilientConnectionImpl& connImpl;
// Session name passed to connection.newSession().
string name;
// Broker connection the session was created on (not owned).
client::Connection& connection;
client::Session session;
// Heap-allocated in the constructor and deleted in the destructor.
client::SubscriptionManager* subscriptions;
// Username taken from the connection's negotiated settings.
string userId;
// Opaque caller-supplied pointer echoed back in events for this session.
void* userContext;
// Names of the subscriptions created through declareQueue().
vector<string> dests;
// Declared last: the constructor's initializer list starts it with *this,
// after all the members above have been initialized.
qpid::sys::Thread thread;
RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc);
~RCSession();
// MessageListener callback: converts the message and enqueues a RECV event.
void received(client::Message& msg);
// Thread body: runs the subscription manager.
void run();
// Asks the subscription manager to stop.
void stop();
};
// Implementation behind the public ResilientConnection wrapper. It owns the
// broker connection, a worker thread (run()) that opens the connection and
// retries after failures, the set of live sessions, and a queue of events
// that the API user drains with getEvent()/popEvent().
class ResilientConnectionImpl : public qpid::sys::Runnable, public boost::noncopyable {
public:
ResilientConnectionImpl(const ConnectionSettings& settings);
~ResilientConnectionImpl();
// Returns the current value of the 'connected' flag (read under the lock).
bool isConnected() const;
// Copies the event at the head of the queue into 'event' without removing it;
// returns false when the queue is empty.
bool getEvent(ResilientConnectionEvent& event);
// Removes the event at the head of the queue, if any.
void popEvent();
// Creates a session; fails (returns false) when not connected.
bool createSession(const char* name, void* sessionContext, SessionHandle& handle);
void destroySession(SessionHandle handle);
void sendMessage(SessionHandle handle, qmf::engine::Message& message);
void declareQueue(SessionHandle handle, char* queue);
void deleteQueue(SessionHandle handle, char* queue);
void bind(SessionHandle handle, char* exchange, char* queue, char* key);
void unbind(SessionHandle handle, char* exchange, char* queue, char* key);
// Registers the descriptor that receives one byte per notification.
void setNotifyFd(int fd);
// Writes a single byte to the notify descriptor when one is set.
void notify();
// Worker-thread body: connect / wait for disconnect / retry loop.
void run();
// Connection failure callback (registered in the constructor).
void failure();
// Called by a session whose subscription loop ended with an exception.
void sessionClosed(RCSession* sess);
// Appends an event to the queue and signals the notify descriptor.
void EnqueueEvent(ResilientConnectionEvent::EventKind kind,
void* sessionContext = 0,
const MessageImpl& message = MessageImpl(),
const string& errorText = "");
private:
// Descriptor written to on notification; -1 means "none".
int notifyFd;
// True between a successful open and the next failure/shutdown.
bool connected;
// Set by the destructor to make run() exit.
bool shutdown;
// Text of the most recent failure.
string lastError;
const ConnectionSettings settings;
client::Connection connection;
// Guards the state in this class; several methods call EnqueueEvent() (which
// locks again) while already holding it.
// NOTE(review): this relies on qpid::sys::Mutex being recursive — confirm.
mutable qpid::sys::Mutex lock;
// Reconnect back-off parameters, filled in from settings.impl->getRetrySettings().
// run() passes the resulting delay to qpid::sys::sleep().
int delayMin;
int delayMax;
int delayFactor;
// Signalled when 'connected' is cleared so run() can leave its wait loop.
qpid::sys::Condition cond;
deque<ResilientConnectionEventImpl> eventQueue;
set<RCSession::Ptr> sessions;
// Declared last: the constructor starts it from the initializer list.
qpid::sys::Thread connThread;
};
}
}
// Builds the public-API view of this event.
//
// The result is zero-filled first and then populated field by field. Its
// errorText pointer aliases this object's own string buffer, so it is only
// usable while this event object is alive and unmodified.
ResilientConnectionEvent ResilientConnectionEventImpl::copy()
{
    ResilientConnectionEvent out;
    ::memset(&out, 0, sizeof(out));

    out.errorText      = const_cast<char*>(errorText.c_str());
    out.message        = message.copy();
    out.sessionContext = sessionContext;
    out.kind           = kind;
    return out;
}
// Creates a new session named n on connection c, allocates its subscription
// manager and starts the session thread (which executes run()).
//
// ci - owning connection implementation, notified of received messages
// n  - session name
// c  - open broker connection
// uc - opaque user context reported back in events for this session
RCSession::RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) :
    connImpl(ci),
    name(n),
    connection(c),
    session(connection.newSession(name)),
    subscriptions(new client::SubscriptionManager(session)),
    userContext(uc),
    thread(*this)
{
    // Remember the username negotiated for the connection; sendMessage()
    // stamps it on outgoing messages when that option is enabled.
    userId = connection.getNegotiatedSettings().username;
}
// Tears the session down. The order matters: the subscription manager is
// stopped first so that run() can return, the session thread is then joined,
// and only after that is the session closed and the manager deleted.
RCSession::~RCSession()
{
subscriptions->stop();
thread.join();
session.close();
delete subscriptions;
}
void RCSession::run()
{
try {
subscriptions->run();
} catch (exception& /*e*/) {
connImpl.sessionClosed(this);
}
}
// Asks the subscription manager to stop.
void RCSession::stop()
{
subscriptions->stop();
}
// MessageListener callback. Translates an incoming broker message into a
// MessageImpl (body plus whichever of routing key, reply-to and user id are
// present) and queues it on the owning connection as a RECV event tagged
// with this session's user context.
void RCSession::received(client::Message& msg)
{
    MessageImpl converted;
    converted.body = msg.getData();

    const qpid::framing::DeliveryProperties& delivery = msg.getDeliveryProperties();
    if (delivery.hasRoutingKey())
        converted.routingKey = delivery.getRoutingKey();

    const qpid::framing::MessageProperties& props = msg.getMessageProperties();
    if (props.hasReplyTo()) {
        const qpid::framing::ReplyTo& replyTo = props.getReplyTo();
        converted.replyExchange = replyTo.getExchange();
        converted.replyKey      = replyTo.getRoutingKey();
    }
    if (props.hasUserId())
        converted.userId = props.getUserId();

    connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, converted);
}
// Sets up the connection state and starts the worker thread that opens the
// connection and keeps retrying after failures.
//
// _settings - connection settings; copied into this object.
//
// The worker thread is started at the very end of the constructor body
// rather than from the initializer list. run() reads delayMin/delayMax/
// delayFactor and opens the connection, so the retry settings must be loaded
// and the failure callback registered before the thread can execute.
ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) :
    notifyFd(-1), connected(false), shutdown(false), settings(_settings),
    // Placeholder values only; getRetrySettings() below is expected to
    // overwrite all three (assumption — confirm against ConnectionSettingsImpl).
    delayMin(1), delayMax(1), delayFactor(1)
{
    connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this));
    settings.impl->getRetrySettings(&delayMin, &delayMax, &delayFactor);

    // Everything run() depends on is now in place; start the worker.
    connThread = qpid::sys::Thread(*this);
}
// Requests shutdown of the worker thread, waits for it to exit and closes
// the connection.
ResilientConnectionImpl::~ResilientConnectionImpl()
{
    {
        // The flags must change and the condition must be signalled while
        // holding the lock, as failure() does. run() tests 'connected' and
        // waits on 'cond' under this lock; an unlocked notify could fire
        // between its test and its wait and be lost, leaving join() hanging.
        Mutex::ScopedLock _lock(lock);
        shutdown = true;
        connected = false;
        cond.notify();
    }
    connThread.join();
    connection.close();
}
// Returns the 'connected' flag, read while holding the lock.
bool ResilientConnectionImpl::isConnected() const
{
Mutex::ScopedLock _lock(lock);
return connected;
}
// Peeks at the oldest queued event.
//
// event - receives a copy of the head event when one exists; left untouched
//         otherwise.
// Returns true when an event was available. The event stays in the queue
// until popEvent() is called.
bool ResilientConnectionImpl::getEvent(ResilientConnectionEvent& event)
{
    Mutex::ScopedLock _lock(lock);
    const bool available = !eventQueue.empty();
    if (available)
        event = eventQueue.front().copy();
    return available;
}
// Discards the oldest queued event; does nothing when the queue is empty.
void ResilientConnectionImpl::popEvent()
{
    Mutex::ScopedLock _lock(lock);
    if (eventQueue.empty())
        return;
    eventQueue.pop_front();
}
bool ResilientConnectionImpl::createSession(const char* name, void* sessionContext,
SessionHandle& handle)
{
Mutex::ScopedLock _lock(lock);
if (!connected)
return false;
RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext));
handle.impl = (void*) sess.get();
sessions.insert(sess);
return true;
}
// Cancels every subscription of the session identified by handle, stops its
// subscription manager and removes it from the set of live sessions.
// Handles that do not match a live session are ignored.
void ResilientConnectionImpl::destroySession(SessionHandle handle)
{
    // Declared before the lock guard so it is destroyed after the lock is
    // released: dropping the last reference runs ~RCSession (which joins the
    // session thread), and that should not happen with the lock held.
    RCSession::Ptr sess;

    Mutex::ScopedLock _lock(lock);

    // Compare raw addresses instead of building an intrusive_ptr from the
    // handle. The session may already have been released (run() drops all
    // sessions on disconnect), and touching the reference count of a
    // destroyed object is undefined behaviour.
    RCSession* const target = static_cast<RCSession*>(handle.impl);
    set<RCSession::Ptr>::iterator iter = sessions.begin();
    while (iter != sessions.end() && iter->get() != target)
        ++iter;
    if (iter == sessions.end())
        return;

    sess = *iter;
    for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); ++dIter)
        sess->subscriptions->cancel(dIter->c_str());
    sess->subscriptions->stop();
    sess->subscriptions->wait();
    sessions.erase(iter);
}
// Sends a message on the session identified by handle.
//
// handle  - session handle obtained from createSession()
// message - body, routing key, reply-to and destination of the message
//
// A handle that does not match a live session is logged and ignored. If the
// transfer throws, the session is dropped from the live set and a
// SESSION_CLOSED event carrying its user context is queued.
void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::engine::Message& message)
{
    // Declared before the lock guard so it is destroyed after the lock is
    // released: if the session is erased below this holds the last reference,
    // and ~RCSession (which joins the session thread) should not run with the
    // lock held.
    RCSession::Ptr sess;

    Mutex::ScopedLock _lock(lock);

    // Look the session up by address rather than wrapping the raw handle in
    // an intrusive_ptr first; a stale handle would otherwise have its
    // (already destroyed) reference count modified.
    RCSession* const target = static_cast<RCSession*>(handle.impl);
    set<RCSession::Ptr>::iterator iter = sessions.begin();
    while (iter != sessions.end() && iter->get() != target)
        ++iter;
    if (iter == sessions.end()) {
        QPID_LOG(warning, "sendMessage ignored: unknown or closed session handle");
        return;
    }
    sess = *iter;

    qpid::client::Message msg;
    string data(message.body, message.length);
    msg.getDeliveryProperties().setRoutingKey(message.routingKey);
    msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey));
    if (settings.impl->getSendUserId())
        msg.getMessageProperties().setUserId(sess->userId);
    msg.setData(data);

    try {
        sess->session.messageTransfer(client::arg::content=msg, client::arg::destination=message.destination);
    } catch(exception& e) {
        QPID_LOG(error, "Session Exception during message-transfer: " << e.what());
        // Read the context through 'sess', never through 'iter': the iterator
        // is invalid once the element has been erased.
        void* const closedContext = sess->userContext;
        sessions.erase(iter);
        EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, closedContext);
    }
}
// Declares an exclusive, auto-delete queue on the session and subscribes the
// session to it, using the queue name as the subscription name. The name is
// recorded in the session's destination list.
//
// handle - session handle obtained from createSession()
// queue  - queue name
void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue)
{
    Mutex::ScopedLock _lock(lock);
    RCSession& sess = *static_cast<RCSession*>(handle.impl);

    sess.session.queueDeclare(client::arg::queue=queue, client::arg::autoDelete=true, client::arg::exclusive=true);

    client::SubscriptionManager& subs = *sess.subscriptions;
    subs.setAcceptMode(client::ACCEPT_MODE_NONE);
    subs.setAcquireMode(client::ACQUIRE_MODE_PRE_ACQUIRED);
    subs.subscribe(sess, queue, queue);
    subs.setFlowControl(queue, client::FlowControl::unlimited());

    sess.dests.push_back(string(queue));
}
// Deletes the queue on the session. If the queue name is in the session's
// destination list, the matching subscription is cancelled and the first
// matching entry is removed.
//
// handle - session handle obtained from createSession()
// queue  - queue name
void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue)
{
    Mutex::ScopedLock _lock(lock);
    RCSession* sess = static_cast<RCSession*>(handle.impl);
    sess->session.queueDelete(client::arg::queue=queue);

    vector<string>& known = sess->dests;
    vector<string>::iterator pos = known.begin();
    while (pos != known.end() && !(*pos == queue))
        ++pos;

    if (pos != known.end()) {
        sess->subscriptions->cancel(queue);
        known.erase(pos);
    }
}
// Binds queue to exchange with the given binding key, on the session
// identified by handle.
void ResilientConnectionImpl::bind(SessionHandle handle,
                                   char* exchange, char* queue, char* key)
{
    Mutex::ScopedLock _lock(lock);
    RCSession& sess = *static_cast<RCSession*>(handle.impl);
    sess.session.exchangeBind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
}
// Removes the binding of queue to exchange for the given binding key, on the
// session identified by handle.
void ResilientConnectionImpl::unbind(SessionHandle handle,
                                     char* exchange, char* queue, char* key)
{
    Mutex::ScopedLock _lock(lock);
    RCSession& sess = *static_cast<RCSession*>(handle.impl);
    sess.session.exchangeUnbind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
}
// Writes a single '.' byte to the notify descriptor, if one has been set.
// The result of the write is deliberately ignored.
void ResilientConnectionImpl::notify()
{
    if (notifyFd == -1)
        return;

    (void) ::write(notifyFd, ".", 1);
}
void ResilientConnectionImpl::setNotifyFd(int fd)
{
notifyFd = fd;
if (notifyFd > 0) {
int original = fcntl(notifyFd, F_GETFL);
fcntl(notifyFd, F_SETFL, O_NONBLOCK | original);
}
}
// Worker-thread body. Repeats until shutdown is requested:
//   1. open the connection and queue a CONNECTED event;
//   2. sleep on the condition until 'connected' is cleared (by failure(),
//      sessionClosed() or the destructor);
//   3. drop every session, queuing SESSION_CLOSED for each, then queue
//      DISCONNECTED;
//   4. close the connection, sleep for the current retry delay and retry.
// When opening (or closing) throws, the error text is recorded and the retry
// delay is multiplied by delayFactor, capped at delayMax.
//
// 'shutdown' is checked on every path, not only after a successful
// connection. Otherwise the destructor's join() would never return when the
// broker is unreachable, or when shutdown arrives while open() is running.
void ResilientConnectionImpl::run()
{
    int delay(delayMin);
    while (true) {
        try {
            QPID_LOG(trace, "Trying to open connection...");
            connection.open(settings.impl->getClientSettings());
            {
                Mutex::ScopedLock _lock(lock);
                // Shutdown may have been requested while open() was running.
                // Setting 'connected' now would leave nobody to clear it
                // again, and the wait below would never end.
                if (shutdown)
                    return;
                connected = true;
                EnqueueEvent(ResilientConnectionEvent::CONNECTED);
                while (connected)
                    cond.wait(lock);
                delay = delayMin;
                while (!sessions.empty()) {
                    set<RCSession::Ptr>::iterator iter = sessions.begin();
                    RCSession::Ptr sess = *iter;
                    sessions.erase(iter);
                    EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext);
                    Mutex::ScopedUnlock _u(lock);
                    sess->stop();
                    // Nullify the intrusive pointer within the scoped unlock, otherwise,
                    // the reference is held until overwritten above (under lock) which causes
                    // the session destructor to be called with the lock held.
                    sess = 0;
                }
                EnqueueEvent(ResilientConnectionEvent::DISCONNECTED);
                if (shutdown)
                    return;
            }
            connection.close();
        } catch (exception &e) {
            QPID_LOG(debug, "connection.open exception: " << e.what());
            Mutex::ScopedLock _lock(lock);
            lastError = e.what();
            if (delay < delayMax) {
                delay *= delayFactor;
                // Never back off for longer than the configured maximum.
                if (delay > delayMax)
                    delay = delayMax;
            }
        }
        {
            // Do not start a retry sleep once shutdown has been requested.
            Mutex::ScopedLock _lock(lock);
            if (shutdown)
                return;
        }
        ::qpid::sys::sleep(delay);
        {
            // Shutdown may have been requested during the sleep.
            Mutex::ScopedLock _lock(lock);
            if (shutdown)
                return;
        }
    }
}
// Connection failure callback (registered in the constructor). Records the
// reason, marks the connection as down and wakes the worker thread waiting
// in run().
void ResilientConnectionImpl::failure()
{
    Mutex::ScopedLock _lock(lock);
    lastError = "Closed by Peer";
    connected = false;
    cond.notify();
}
// Called from a session thread whose subscription loop ended with an
// exception. The whole connection is treated as down: the reason is
// recorded, 'connected' is cleared and the worker thread is woken. The
// session argument is not used.
void ResilientConnectionImpl::sessionClosed(RCSession*)
{
    Mutex::ScopedLock _lock(lock);
    lastError = "Closed due to Session failure";
    connected = false;
    cond.notify();
}
// Appends an event to the tail of the event queue and then signals the
// notify descriptor (if one is set).
//
// kind           - event kind
// sessionContext - user context of the session the event relates to, or 0
// message        - message payload carried by the event
// errorText      - error description carried by the event
//
// The queue is modified under the lock; the lock taken by this function is
// released before the notification byte is written.
void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind,
                                           void* sessionContext,
                                           const MessageImpl& message,
                                           const string& errorText)
{
    {
        Mutex::ScopedLock _lock(lock);
        eventQueue.push_back(ResilientConnectionEventImpl(kind, message));
        ResilientConnectionEventImpl& queued = eventQueue.back();
        queued.sessionContext = sessionContext;
        queued.errorText = errorText;
    }
    notify();
}
//==================================================================
// Wrappers
//==================================================================
// Creates the implementation object that does the actual work.
ResilientConnection::ResilientConnection(const ConnectionSettings& settings) :
    impl(new ResilientConnectionImpl(settings))
{
}
// Destroys the implementation object created by the constructor.
ResilientConnection::~ResilientConnection()
{
delete impl;
}
// Forwards to ResilientConnectionImpl::isConnected().
bool ResilientConnection::isConnected() const
{
return impl->isConnected();
}
// Forwards to ResilientConnectionImpl::getEvent(): copies the head event into
// 'event' without removing it and returns false when no event is queued.
bool ResilientConnection::getEvent(ResilientConnectionEvent& event)
{
return impl->getEvent(event);
}
// Forwards to ResilientConnectionImpl::popEvent(): removes the head event, if any.
void ResilientConnection::popEvent()
{
impl->popEvent();
}
// Forwards to ResilientConnectionImpl::createSession(), which returns false
// when the connection is not currently open.
bool ResilientConnection::createSession(const char* name, void* sessionContext, SessionHandle& handle)
{
return impl->createSession(name, sessionContext, handle);
}
// Forwards to ResilientConnectionImpl::destroySession().
void ResilientConnection::destroySession(SessionHandle handle)
{
impl->destroySession(handle);
}
// Forwards to ResilientConnectionImpl::sendMessage().
void ResilientConnection::sendMessage(SessionHandle handle, qmf::engine::Message& message)
{
impl->sendMessage(handle, message);
}
// Forwards to ResilientConnectionImpl::declareQueue().
void ResilientConnection::declareQueue(SessionHandle handle, char* queue)
{
impl->declareQueue(handle, queue);
}
// Forwards to ResilientConnectionImpl::deleteQueue().
void ResilientConnection::deleteQueue(SessionHandle handle, char* queue)
{
impl->deleteQueue(handle, queue);
}
// Forwards to ResilientConnectionImpl::bind().
void ResilientConnection::bind(SessionHandle handle, char* exchange, char* queue, char* key)
{
impl->bind(handle, exchange, queue, key);
}
// Forwards to ResilientConnectionImpl::unbind().
void ResilientConnection::unbind(SessionHandle handle, char* exchange, char* queue, char* key)
{
impl->unbind(handle, exchange, queue, key);
}
// Forwards to ResilientConnectionImpl::setNotifyFd(): registers the descriptor
// that receives a byte on each notification.
void ResilientConnection::setNotifyFd(int fd)
{
impl->setNotifyFd(fd);
}
// Forwards to ResilientConnectionImpl::notify(): writes one byte to the notify
// descriptor when one is set.
void ResilientConnection::notify()
{
impl->notify();
}