blob: e9885f546244e5ccaacd41b15265ace45b55deb9 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
#include <iostream>
#include <boost/format.hpp>
using namespace qpid::broker;
using namespace qpid::sys;
using std::string;
using std::pair;
using std::stringstream;
using boost::intrusive_ptr;
using boost::format;
using boost::str;
namespace _qmf = qmf::org::apache::qpid::broker;
#define LINK_MAINT_INTERVAL 2
// TODO: This constructor is only used by the store unit tests -
// That probably indicates that LinkRegistry isn't correctly
// factored: The persistence element and maintenance element
// should be factored separately
LinkRegistry::LinkRegistry () :
broker(0), timer(0),
parent(0), store(0), passive(false), passiveChanged(false),
realm("")
{
}
LinkRegistry::LinkRegistry (Broker* _broker) :
broker(_broker), timer(&broker->getTimer()),
maintenanceTask(new Periodic(*this)),
parent(0), store(0), passive(false), passiveChanged(false),
realm(broker->getOptions().realm)
{
timer->add(maintenanceTask);
}
LinkRegistry::~LinkRegistry()
{
// This test is only necessary if the default constructor above is present
if (maintenanceTask)
maintenanceTask->cancel();
}
LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
TimerTask (Duration (LINK_MAINT_INTERVAL * TIME_SEC),"LinkRegistry"), links(_links) {}
void LinkRegistry::Periodic::fire ()
{
links.periodicMaintenance ();
setupNextFire();
links.timer->add(this);
}
void LinkRegistry::periodicMaintenance ()
{
Mutex::ScopedLock locker(lock);
linksToDestroy.clear();
bridgesToDestroy.clear();
if (passiveChanged) {
if (passive) { QPID_LOG(info, "Passivating links"); }
else { QPID_LOG(info, "Activating links"); }
for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
i->second->setPassive(passive);
}
passiveChanged = false;
}
for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
i->second->maintenanceVisit();
//now process any requests for re-addressing
for (AddressMap::iterator i = reMappings.begin(); i != reMappings.end(); i++)
updateAddress(i->first, i->second);
reMappings.clear();
}
void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress)
{
//done on periodic maintenance thread; hold changes in separate
//map to avoid modifying the link map that is iterated over
reMappings[createKey(oldAddress)] = newAddress;
}
bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address& newAddress)
{
std::string newKey = createKey(newAddress);
if (links.find(newKey) != links.end()) {
QPID_LOG(error, "Attempted to update key from " << oldKey << " to " << newKey << " which is already in use");
return false;
} else {
LinkMap::iterator i = links.find(oldKey);
if (i == links.end()) {
QPID_LOG(error, "Attempted to update key from " << oldKey << " which does not exist, to " << newKey);
return false;
} else {
links[newKey] = i->second;
i->second->reconnect(newAddress);
links.erase(oldKey);
QPID_LOG(info, "Updated link key from " << oldKey << " to " << newKey);
return true;
}
}
}
pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
uint16_t port,
string& transport,
bool durable,
string& authMechanism,
string& username,
string& password)
{
Mutex::ScopedLock locker(lock);
string key = createKey(host, port);
LinkMap::iterator i = links.find(key);
if (i == links.end())
{
Link::shared_ptr link;
link = Link::shared_ptr (new Link (this, store, host, port, transport, durable,
authMechanism, username, password,
broker, parent));
if (passive) link->setPassive(true);
links[key] = link;
return std::pair<Link::shared_ptr, bool>(link, true);
}
return std::pair<Link::shared_ptr, bool>(i->second, false);
}
pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
uint16_t port,
bool durable,
std::string& src,
std::string& dest,
std::string& key,
bool isQueue,
bool isLocal,
std::string& tag,
std::string& excludes,
bool dynamic,
uint16_t sync)
{
Mutex::ScopedLock locker(lock);
QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
string linkKey = createKey(host, port);
stringstream keystream;
keystream << linkKey << "!" << src << "!" << dest << "!" << key;
string bridgeKey = keystream.str();
LinkMap::iterator l = links.find(linkKey);
if (l == links.end())
return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
BridgeMap::iterator b = bridges.find(bridgeKey);
if (b == bridges.end())
{
_qmf::ArgsLinkBridge args;
Bridge::shared_ptr bridge;
args.i_durable = durable;
args.i_src = src;
args.i_dest = dest;
args.i_key = key;
args.i_srcIsQueue = isQueue;
args.i_srcIsLocal = isLocal;
args.i_tag = tag;
args.i_excludes = excludes;
args.i_dynamic = dynamic;
args.i_sync = sync;
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
boost::bind(&LinkRegistry::destroy, this,
host, port, src, dest, key), args));
bridges[bridgeKey] = bridge;
l->second->add(bridge);
return std::pair<Bridge::shared_ptr, bool>(bridge, true);
}
return std::pair<Bridge::shared_ptr, bool>(b->second, false);
}
void LinkRegistry::destroy(const string& host, const uint16_t port)
{
Mutex::ScopedLock locker(lock);
string key = createKey(host, port);
LinkMap::iterator i = links.find(key);
if (i != links.end())
{
if (i->second->isDurable() && store)
store->destroy(*(i->second));
linksToDestroy[key] = i->second;
links.erase(i);
}
}
void LinkRegistry::destroy(const std::string& host,
const uint16_t port,
const std::string& src,
const std::string& dest,
const std::string& key)
{
Mutex::ScopedLock locker(lock);
string linkKey = createKey(host, port);
stringstream keystream;
keystream << linkKey << "!" << src << "!" << dest << "!" << key;
string bridgeKey = keystream.str();
LinkMap::iterator l = links.find(linkKey);
if (l == links.end())
return;
BridgeMap::iterator b = bridges.find(bridgeKey);
if (b == bridges.end())
return;
l->second->cancel(b->second);
if (b->second->isDurable())
store->destroy(*(b->second));
bridgesToDestroy[bridgeKey] = b->second;
bridges.erase(b);
}
void LinkRegistry::setStore (MessageStore* _store)
{
store = _store;
}
MessageStore* LinkRegistry::getStore() const {
return store;
}
Link::shared_ptr LinkRegistry::findLink(const std::string& keyOrMgmtId)
{
// Convert keyOrMgmtId to a host:port key.
//
// TODO aconway 2011-02-01: centralize code that constructs/parses
// connection management IDs. Currently sys:: protocol factories
// and IO plugins construct the IDs and LinkRegistry parses them.
size_t separator = keyOrMgmtId.find('-');
if (separator == std::string::npos) separator = 0;
std::string key = keyOrMgmtId.substr(separator+1, std::string::npos);
Mutex::ScopedLock locker(lock);
LinkMap::iterator l = links.find(key);
if (l != links.end()) return l->second;
else return Link::shared_ptr();
}
void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
{
Link::shared_ptr link = findLink(key);
if (link) {
link->established();
link->setConnection(c);
c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm));
}
}
void LinkRegistry::notifyClosed(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (link) {
link->closed(0, "Closed by peer");
}
}
void LinkRegistry::notifyConnectionForced(const std::string& key, const std::string& text)
{
Link::shared_ptr link = findLink(key);
if (link) {
link->notifyConnectionForced(text);
}
}
std::string LinkRegistry::getAuthMechanism(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (link)
return link->getAuthMechanism();
return string("ANONYMOUS");
}
std::string LinkRegistry::getAuthCredentials(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (!link)
return string();
string result;
result += '\0';
result += link->getUsername();
result += '\0';
result += link->getPassword();
return result;
}
std::string LinkRegistry::getUsername(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (!link)
return string();
return link->getUsername();
}
std::string LinkRegistry::getHost(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (!link)
return string();
return link->getHost();
}
uint16_t LinkRegistry::getPort(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (!link)
return 0;
return link->getPort();
}
std::string LinkRegistry::getPassword(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (!link)
return string();
return link->getPassword();
}
std::string LinkRegistry::getAuthIdentity(const std::string& key)
{
Link::shared_ptr link = findLink(key);
if (!link)
return string();
return link->getUsername();
}
std::string LinkRegistry::createKey(const qpid::Address& a) {
// TODO aconway 2010-05-11: key should also include protocol/transport to
// be unique. Requires refactor of LinkRegistry interface.
return createKey(a.host, a.port);
}
std::string LinkRegistry::createKey(const std::string& host, uint16_t port) {
// TODO aconway 2010-05-11: key should also include protocol/transport to
// be unique. Requires refactor of LinkRegistry interface.
stringstream keystream;
keystream << host << ":" << port;
return keystream.str();
}
void LinkRegistry::setPassive(bool p)
{
Mutex::ScopedLock locker(lock);
passiveChanged = p != passive;
passive = p;
//will activate or passivate links on maintenance visit
}
void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f) {
for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
}
void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)> f) {
for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
}