blob: 77674692abe57bb8ee046aec6a9901baa1784216 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/Broker.h"
#include "qpid/broker/AclModule.h"
#include "qpid/broker/BrokerOptions.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/MessageStoreModule.h"
#include "qpid/broker/NameGenerator.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/PersistableObject.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/TransactionObserver.h"
#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogHiresTimestamp.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogHiresTimestamp.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueRedirect.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qmf/org/apache/qpid/broker/EventQueueRedirect.h"
#include "qmf/org/apache/qpid/broker/EventQueueRedirectCancelled.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/TransportFactory.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Timer.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/Address.h"
#include "qpid/StringUtils.h"
#include "qpid/Url.h"
#include "qpid/Version.h"
#include "config.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
#include <iostream>
#include <memory>
#include <set>
using qpid::sys::TransportAcceptor;
using qpid::sys::TransportConnector;
using qpid::sys::Poller;
using qpid::sys::Dispatcher;
using qpid::sys::Thread;
using qpid::framing::FrameHandler;
using qpid::framing::ChannelId;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
using qpid::management::getCurrentPublisher;
using qpid::types::Variant;
using std::string;
using std::make_pair;
namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
namespace broker {
const std::string empty;
const std::string amq_direct("amq.direct");
const std::string amq_topic("amq.topic");
const std::string amq_fanout("amq.fanout");
const std::string amq_match("amq.match");
const std::string qpid_management("qpid.management");
const std::string knownHostsNone("none");
BrokerOptions::BrokerOptions(const std::string& name) :
qpid::Options(name),
noDataDir(0),
port(DEFAULT_PORT),
workerThreads(5),
connectionBacklog(10),
enableMgmt(1),
mgmtPublish(1),
mgmtPubInterval(10*sys::TIME_SEC),
queueCleanInterval(60*sys::TIME_SEC*10),//10 minutes
auth(SaslAuthenticator::available()),
realm("QPID"),
replayFlushLimit(0),
replayHardLimit(0),
queueLimit(100*1048576/*100M default limit*/),
tcpNoDelay(true),
requireEncrypted(false),
knownHosts(knownHostsNone),
qmf2Support(true),
qmf1Support(false),
queueFlowStopRatio(80),
queueFlowResumeRatio(70),
queueThresholdEventRatio(80),
defaultMsgGroup("qpid.no-group"),
timestampRcvMsgs(false), // set the 0.10 timestamp delivery property
linkMaintenanceInterval(2*sys::TIME_SEC),
linkHeartbeatInterval(120*sys::TIME_SEC),
dtxDefaultTimeout(60), // 60s
dtxMaxTimeout(3600), // 3600s
maxNegotiateTime(10000) // 10s
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
std::string home = getHome();
if (home.length() == 0)
dataDir += DEFAULT_DATA_DIR_LOCATION;
else
dataDir += home;
dataDir += DEFAULT_DATA_DIR_NAME;
addOptions()
("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker")
("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored")
("paging-dir", optValue(pagingDir,"DIR"), "Directory in which paging files will be created for paged queues")
("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT")
("interface", optValue(listenInterfaces, "<interface name>|<interface address>"), "Which network interfaces to use to listen for incoming connections")
("listen-disable", optValue(listenDisabled, "<transport name>"), "Transports to disable listening")
("protocols", optValue(protocols, "<protocol name+version>"), "Which protocol versions to allow")
("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)")
("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2")
("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
"Interval between attempts to purge any expired messages from queues")
("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted")
("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication")
("sasl-service-name", optValue(saslServiceName, "NAME"), "The service name to specify for SASL")
("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)")
("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections")
("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
("sasl-config", optValue(saslConfigPath, "DIR"), "Allows SASL config path, if supported by platform, to be overridden. For default location on Linux, see Cyrus SASL documentation. There is no SASL config dir on Windows.")
("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.")
("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.")
("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised")
("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.")
("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.")
("link-maintenance-interval", optValue(linkMaintenanceInterval, "SECONDS"),
"Interval to check federation link health and re-connect if need be")
("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"),
"Heartbeat interval for a federation link")
("dtx-default-timeout", optValue(dtxDefaultTimeout, "SECONDS"), "Default timeout for DTX transaction before aborting it")
("dtx-max-timeout", optValue(dtxMaxTimeout, "SECONDS"), "Maximum allowed timeout for DTX transaction. A value of zero disables maximum timeout limit checks and allows arbitrarily large timeout settings.")
("max-negotiate-time", optValue(maxNegotiateTime, "MILLISECONDS"), "Maximum time a connection can take to send the initial protocol negotiation")
("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag")
;
}
namespace {
// Arguments to declare a non-replicated exchange.
framing::FieldTable noReplicateArgs() {
framing::FieldTable args;
args.setString("qpid.replicate", "none");
return args;
}
}
Broker::LogPrefix::LogPrefix() :
std::string(Msg() << "Broker (pid=" << sys::SystemInfo::getProcessId() << ") ") {
QPID_LOG(notice, *this << "start-up");
}
Broker::LogPrefix::~LogPrefix() { QPID_LOG(notice, *this << "shut-down"); }
Broker::Broker(const BrokerOptions& conf) :
poller(new Poller),
timer(new qpid::sys::Timer),
config(conf),
managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
conf.qmf2Support)
: 0),
disabledListeningTransports(conf.listenDisabled.begin(), conf.listenDisabled.end()),
store(new NullMessageStore),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
pagingDir(!conf.pagingDir.empty() ? conf.pagingDir :
dataDir.isEnabled() ? dataDir.getPath() + BrokerOptions::DEFAULT_PAGED_QUEUE_DIR :
std::string() ),
queues(this),
exchanges(this),
links(this),
dtxManager(*timer.get(), conf.dtxDefaultTimeout),
sessionManager(
qpid::SessionState::Configuration(
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
queueCleaner(queues, poller, timer.get()),
recoveryInProgress(false),
protocolRegistry(std::set<std::string>(conf.protocols.begin(), conf.protocols.end()), this),
timestampRcvMsgs(conf.timestampRcvMsgs),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if (!dataDir.isEnabled()) {
QPID_LOG (info, "No data directory - Disabling persistent configuration");
}
System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);
systemObject = System::shared_ptr(system);
try {
if (conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPublish,
conf.mgmtPubInterval/sys::TIME_SEC, this, conf.workerThreads + 3);
managementAgent->setName("apache.org", "qpidd");
_qmf::Package packageInitializer(managementAgent.get());
mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"));
mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
mgmtObject->set_port(conf.port);
mgmtObject->set_workerThreads(conf.workerThreads);
mgmtObject->set_connBacklog(conf.connectionBacklog);
mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval/sys::TIME_SEC);
mgmtObject->set_mgmtPublish(conf.mgmtPublish);
mgmtObject->set_version(qpid::version);
if (dataDir.isEnabled())
mgmtObject->set_dataDir(dataDir.getPath());
else
mgmtObject->clr_dataDir();
managementAgent->addObject(mgmtObject, 0, true);
// Since there is currently no support for virtual hosts, a placeholder object
// representing the implied single virtual host is added here to keep the
// management schema correct.
Vhost* vhost = new Vhost(this, this);
vhostObject = Vhost::shared_ptr(vhost);
if (conf.fedTag.empty()) {
framing::Uuid uuid(managementAgent->getUuid());
federationTag = uuid.str();
} else
federationTag = conf.fedTag;
vhostObject->setFederationTag(federationTag);
queues.setParent(vhost);
exchanges.setParent(vhost);
links.setParent(vhost);
} else {
// Management is disabled so there is no broker management ID.
// Create a unique uuid to use as the federation tag.
if (conf.fedTag.empty()) {
framing::Uuid uuid(true);
federationTag = uuid.str();
} else
federationTag = conf.fedTag;
}
// Early-Initialize plugins
Plugin::earlyInitAll(*this);
QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
MessageGroupManager::setDefaults(conf.defaultMsgGroup);
// If no plugin store module registered itself, set up the null store.
if (NullMessageStore::isNullStore(store.get()))
setStore();
framing::FieldTable args;
// Default exchnge is not replicated.
exchanges.declare(empty, DirectExchange::typeName, false, false, noReplicateArgs());
RecoveredObjects objects;
if (store.get() != 0) {
RecoveryManagerImpl recoverer(
queues, exchanges, links, dtxManager, protocolRegistry, objects);
recoveryInProgress = true;
store->recover(recoverer);
recoveryInProgress = false;
}
//ensure standard exchanges exist (done after recovery from store)
declareStandardExchange(amq_direct, DirectExchange::typeName);
declareStandardExchange(amq_topic, TopicExchange::typeName);
declareStandardExchange(amq_fanout, FanOutExchange::typeName);
declareStandardExchange(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
exchanges.declare(qpid_management, ManagementTopicExchange::typeName, false, false, noReplicateArgs());
Exchange::shared_ptr mExchange = exchanges.get(qpid_management);
Exchange::shared_ptr dExchange = exchanges.get(amq_direct);
managementAgent->setExchange(mExchange, dExchange);
boost::dynamic_pointer_cast<ManagementTopicExchange>(mExchange)->setManagmentAgent(managementAgent.get(), 1);
std::string qmfTopic("qmf.default.topic");
std::string qmfDirect("qmf.default.direct");
std::pair<Exchange::shared_ptr, bool> topicPair(
exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, false, noReplicateArgs()));
std::pair<Exchange::shared_ptr, bool> directPair(
exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, false, noReplicateArgs()));
boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
managementAgent->setExchangeV2(topicPair.first, directPair.first);
}
else
QPID_LOG(info, "Management not enabled");
// this feature affects performance, so let's be sure that gets logged!
if (conf.timestampRcvMsgs) {
QPID_LOG(notice, "Receive message timestamping is ENABLED.");
}
/**
* SASL setup, can fail and terminate startup
*/
if (conf.auth) {
SaslAuthenticator::init(qpid::saslName, conf.saslConfigPath);
QPID_LOG(info, "SASL enabled");
} else {
QPID_LOG(notice, "SASL disabled: No Authentication Performed");
}
// Initialize plugins
Plugin::initializeAll(*this);
//recover any objects via object factories
objects.restore(*this);
// Assign to queues their users who created them (can be done after ACL is loaded in Plugin::initializeAll above
if ((getAcl()) && (store.get())) {
queues.eachQueue(boost::bind(&qpid::broker::Queue::updateAclUserQueueCount, _1));
}
if(conf.enableMgmt) {
if (getAcl()) {
mgmtObject->set_maxConns(getAcl()->getMaxConnectTotal());
}
}
if (conf.queueCleanInterval) {
queueCleaner.start(conf.queueCleanInterval);
}
if (!conf.knownHosts.empty() && conf.knownHosts != knownHostsNone) {
knownBrokers.push_back(Url(conf.knownHosts));
}
} catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "start-up failed: " << e.what());
finalize();
throw;
}
QPID_LOG(info, logPrefix << "initialized");
}
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
{
bool storeEnabled = store.get() != NULL;
framing::FieldTable args;
// Standard exchanges are not replicated.
std::pair<Exchange::shared_ptr, bool> status =
exchanges.declare(name, type, storeEnabled, false, noReplicateArgs());
if (status.second && storeEnabled) {
store->create(*status.first, framing::FieldTable ());
}
}
bool Broker::isAuthenticating() const
{
return config.auth;
}
bool Broker::requireEncrypted() const
{
return config.requireEncrypted;
}
std::string Broker::getRealm() const
{
return config.realm;
}
std::string Broker::getSaslServiceName() const
{
return config.saslServiceName;
}
bool Broker::getTcpNoDelay() const
{
return config.tcpNoDelay;
}
uint32_t Broker::getMaxNegotiateTime() const
{
return config.maxNegotiateTime;
}
uint16_t Broker::getPortOption() const
{
return config.port;
}
const std::vector<std::string>& Broker::getListenInterfaces() const
{
return config.listenInterfaces;
}
int Broker::getConnectionBacklog() const
{
return config.connectionBacklog;
}
sys::Duration Broker::getLinkMaintenanceInterval() const
{
return config.linkMaintenanceInterval;
}
sys::Duration Broker::getLinkHeartbeatInterval() const
{
return config.linkHeartbeatInterval;
}
uint32_t Broker::getDtxMaxTimeout() const
{
return config.dtxMaxTimeout;
}
uint16_t Broker::getQueueThresholdEventRatio() const
{
return config.queueThresholdEventRatio;
}
uint Broker::getQueueLimit() const
{
return config.queueLimit;
}
boost::intrusive_ptr<Broker> Broker::create(int16_t port)
{
BrokerOptions config;
config.port=port;
return create(config);
}
boost::intrusive_ptr<Broker> Broker::create(const BrokerOptions& opts)
{
return boost::intrusive_ptr<Broker>(new Broker(opts));
}
void Broker::setStore (const boost::shared_ptr<MessageStore>& _store)
{
// Exit now if multiple store plugins are attempting to load
if (!NullMessageStore::isNullStore(store.get())) {
QPID_LOG(error, "Multiple store plugins are not supported");
throw Exception(QPID_MSG("Failed to start broker: Multiple store plugins were loaded"));
}
store.reset(new MessageStoreModule (_store));
setStore();
}
void Broker::setStore () {
queues.setStore (store.get());
dtxManager.setStore (store.get());
links.setStore (store.get());
}
void Broker::run() {
if (config.workerThreads > 0) {
QPID_LOG(info, logPrefix << "running");
Dispatcher d(poller);
int numIOThreads = config.workerThreads;
std::vector<Thread> t(numIOThreads-1);
// Run n-1 io threads
for (int i=0; i<numIOThreads-1; ++i)
t[i] = Thread(d);
// Run final thread
d.run();
// Now wait for n-1 io threads to exit
for (int i=0; i<numIOThreads-1; ++i) {
t[i].join();
}
QPID_LOG(info, logPrefix << "stopped");
} else {
throw Exception((boost::format("Invalid value for worker-threads: %1%") % config.workerThreads).str());
}
}
void Broker::shutdown() {
// NB: this function must be async-signal safe, it must not
// call any function that is not async-signal safe.
// Any unsafe shutdown actions should be done in the destructor.
poller->shutdown();
}
Broker::~Broker() {
QPID_LOG(info, logPrefix << "shutting down");
if (mgmtObject != 0)
mgmtObject->debugStats("destroying");
shutdown();
finalize(); // Finalize any plugins.
if (config.auth)
SaslAuthenticator::fini();
timer->stop();
managementAgent.reset();
}
ManagementObject::shared_ptr Broker::GetManagementObject(void) const
{
return mgmtObject;
}
Manageable* Broker::GetVhostObject(void) const
{
return vhostObject.get();
}
Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
Args& args,
string& text)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
switch (methodId)
{
case _qmf::Broker::METHOD_ECHO :
QPID_LOG (debug, "Broker::echo("
<< dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
<< ", "
<< dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
<< ")");
status = Manageable::STATUS_OK;
break;
case _qmf::Broker::METHOD_CONNECT : {
/** Management is creating a Link to a remote broker using the host and port of
* the remote. This (old) interface does not allow management to specify a name
* for the link, nor does it allow multiple Links to the same remote. Use the
* "create()" broker method if these features are needed.
* TBD: deprecate this interface.
*/
QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID."
" Please use the Broker::create() method with type='link' instead.");
_qmf::ArgsBrokerConnect& hp=
dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport;
QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport <<
"; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
if (!getTransportInfo(transport).connectorFactory) {
QPID_LOG(error, "Transport '" << transport << "' not supported");
text = "transport type not supported";
return Manageable::STATUS_NOT_IMPLEMENTED;
}
// Does a link to the remote already exist? If so, re-use the existing link
// - this behavior is backward compatible with previous releases.
if (!links.getLink(hp.i_host, hp.i_port, transport)) {
// new link, need to generate a unique name for it
std::pair<Link::shared_ptr, bool> response =
links.declare(Link::createName(transport, hp.i_host, hp.i_port),
hp.i_host, hp.i_port, transport,
hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password);
if (!response.first) {
text = "Unable to create Link";
status = Manageable::STATUS_PARAMETER_INVALID;
break;
}
}
status = Manageable::STATUS_OK;
break;
}
case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : {
_qmf::ArgsBrokerQueueMoveMessages& moveArgs=
dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
QPID_LOG (debug, "Broker::queueMoveMessages()");
if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty,
moveArgs.i_filter, getCurrentPublisher()) >=0)
status = Manageable::STATUS_OK;
else
return Manageable::STATUS_PARAMETER_INVALID;
break;
}
case _qmf::Broker::METHOD_SETLOGLEVEL :
setLogLevel(dynamic_cast<_qmf::ArgsBrokerSetLogLevel&>(args).i_level);
QPID_LOG (debug, "Broker::setLogLevel()");
status = Manageable::STATUS_OK;
break;
case _qmf::Broker::METHOD_GETLOGLEVEL :
dynamic_cast<_qmf::ArgsBrokerGetLogLevel&>(args).o_level = getLogLevel();
QPID_LOG (debug, "Broker::getLogLevel()");
status = Manageable::STATUS_OK;
break;
case _qmf::Broker::METHOD_CREATE :
{
_qmf::ArgsBrokerCreate& a = dynamic_cast<_qmf::ArgsBrokerCreate&>(args);
createObject(a.i_type, a.i_name, a.i_properties, a.i_strict, getCurrentPublisher());
status = Manageable::STATUS_OK;
break;
}
case _qmf::Broker::METHOD_DELETE :
{
_qmf::ArgsBrokerDelete& a = dynamic_cast<_qmf::ArgsBrokerDelete&>(args);
deleteObject(a.i_type, a.i_name, a.i_options, getCurrentPublisher());
status = Manageable::STATUS_OK;
break;
}
case _qmf::Broker::METHOD_QUERY :
{
_qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
status = queryObject(a.i_type, a.i_name, a.o_results, getCurrentPublisher());
break;
}
case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG:
{
_qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args);
status = getTimestampConfig(a.o_receive, getCurrentPublisher());
break;
}
case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG:
{
_qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args);
status = setTimestampConfig(a.i_receive, getCurrentPublisher());
break;
}
case _qmf::Broker::METHOD_GETLOGHIRESTIMESTAMP:
{
dynamic_cast<_qmf::ArgsBrokerGetLogHiresTimestamp&>(args).o_logHires = getLogHiresTimestamp();
QPID_LOG (debug, "Broker::getLogHiresTimestamp()");
status = Manageable::STATUS_OK;
break;
}
case _qmf::Broker::METHOD_SETLOGHIRESTIMESTAMP:
{
setLogHiresTimestamp(dynamic_cast<_qmf::ArgsBrokerSetLogHiresTimestamp&>(args).i_logHires);
QPID_LOG (debug, "Broker::setLogHiresTimestamp()");
status = Manageable::STATUS_OK;
break;
}
case _qmf::Broker::METHOD_QUEUEREDIRECT:
{
string srcQueue(dynamic_cast<_qmf::ArgsBrokerQueueRedirect&>(args).i_sourceQueue);
string tgtQueue(dynamic_cast<_qmf::ArgsBrokerQueueRedirect&>(args).i_targetQueue);
QPID_LOG (debug, "Broker::queueRedirect source queue:" << srcQueue << " to target queue " << tgtQueue);
status = queueRedirect(srcQueue, tgtQueue, getCurrentPublisher());
break;
}
case _qmf::Broker::METHOD_SHUTDOWN :
{
QPID_LOG (info, "Broker received shutdown command");
shutdown();
}
default:
QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
}
return status;
}
namespace
{
const std::string TYPE_QUEUE("queue");
const std::string TYPE_EXCHANGE("exchange");
const std::string TYPE_TOPIC("topic");
const std::string TYPE_BINDING("binding");
const std::string TYPE_LINK("link");
const std::string TYPE_BRIDGE("bridge");
const std::string DURABLE("durable");
const std::string AUTO_DELETE("auto-delete");
const std::string ALTERNATE_EXCHANGE("alternate-exchange");
const std::string EXCHANGE_TYPE("exchange-type");
const std::string QUEUE_NAME("queue");
const std::string EXCHANGE_NAME("exchange");
const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");
const std::string _TRUE("true");
const std::string _FALSE("false");
// parameters for creating a Link object, see mgmt schema
const std::string HOST("host");
const std::string PORT("port");
const std::string TRANSPORT("transport");
const std::string AUTH_MECHANISM("authMechanism");
const std::string USERNAME("username");
const std::string PASSWORD("password");
// parameters for creating a Bridge object, see mgmt schema
const std::string LINK("link");
const std::string SRC("src");
const std::string DEST("dest");
const std::string KEY("key");
const std::string TAG("tag");
const std::string EXCLUDES("excludes");
const std::string SRC_IS_QUEUE("srcIsQueue");
const std::string SRC_IS_LOCAL("srcIsLocal");
const std::string DYNAMIC("dynamic");
const std::string SYNC("sync");
const std::string CREDIT("credit");
// parameters for deleting a Queue object
const std::string IF_EMPTY("if_empty");
const std::string IF_UNUSED("if_unused");
}
struct InvalidBindingIdentifier : public qpid::Exception
{
InvalidBindingIdentifier(const std::string& name) : qpid::Exception(name) {}
std::string getPrefix() const { return "invalid binding"; }
};
struct BindingIdentifier
{
std::string exchange;
std::string queue;
std::string key;
BindingIdentifier(const std::string& name)
{
std::vector<std::string> path;
split(path, name, "/");
switch (path.size()) {
case 1:
queue = path[0];
break;
case 2:
exchange = path[0];
queue = path[1];
break;
case 3:
exchange = path[0];
queue = path[1];
key = path[2];
break;
default:
throw InvalidBindingIdentifier(name);
}
}
};
struct ObjectAlreadyExists : public qpid::Exception
{
ObjectAlreadyExists(const std::string& name) : qpid::Exception(name) {}
std::string getPrefix() const { return "object already exists"; }
};
struct UnknownObjectType : public qpid::Exception
{
UnknownObjectType(const std::string& type) : qpid::Exception(type) {}
std::string getPrefix() const { return "unknown object type"; }
};
struct ReservedObjectName : public qpid::Exception
{
ReservedObjectName(const std::string& type) : qpid::Exception(type) {}
std::string getPrefix() const { return std::string("names prefixed with '")
+ QPID_NAME_PREFIX + std::string("' are reserved"); }
};
struct UnsupportedTransport : public qpid::Exception
{
UnsupportedTransport(const std::string& type) : qpid::Exception(type) {}
std::string getPrefix() const { return "transport is not supported"; }
};
struct InvalidParameter : public qpid::Exception
{
InvalidParameter(const std::string& type) : qpid::Exception(type) {}
std::string getPrefix() const { return "invalid parameter to method call"; }
};
void Broker::createObject(const std::string& type, const std::string& name,
const Variant::Map& properties, bool /*strict*/, const Connection* context)
{
std::string userId;
std::string connectionId;
if (context) {
userId = context->getUserId();
connectionId = context->getMgmtId();
}
//TODO: implement 'strict' option (check there are no unrecognised properties)
QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
if (objectFactory.createObject(*this, type, name, properties, userId, connectionId)) {
QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ") handled by registered factory");
} else if (type == TYPE_QUEUE) {
bool durable(false);
bool autodelete(false);
std::string alternateExchange;
Variant::Map extensions;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
// extract durable, auto-delete and alternate-exchange properties
if (i->first == DURABLE) durable = i->second;
else if (i->first == AUTO_DELETE) autodelete = i->second;
else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
//treat everything else as extension properties
else extensions[i->first] = i->second;
}
QueueSettings settings(durable, autodelete);
Variant::Map unused;
settings.populate(extensions, unused);
qpid::amqp_0_10::translate(unused, settings.storeSettings);
//TODO: unused doesn't take store settings into account... so can't yet implement strict
QPID_LOG(debug, "Broker did not use the following settings (store module may): " << unused);
std::pair<boost::shared_ptr<Queue>, bool> result =
createQueue(name, settings, 0, alternateExchange, userId, connectionId);
if (!result.second) {
throw ObjectAlreadyExists(name);
}
} else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
bool durable(false);
bool autodelete(false);
std::string exchangeType("topic");
std::string alternateExchange;
Variant::Map extensions;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
// extract durable, auto-delete and alternate-exchange properties
if (i->first == DURABLE) durable = i->second;
else if (i->first == AUTO_DELETE) autodelete = i->second;
else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
//treat everything else as extension properties
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
qpid::amqp_0_10::translate(extensions, arguments);
try {
std::pair<boost::shared_ptr<Exchange>, bool> result =
createExchange(name, exchangeType, durable, autodelete, alternateExchange, arguments, userId, connectionId);
if (!result.second) {
throw ObjectAlreadyExists(name);
}
} catch (const UnknownExchangeTypeException&) {
throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType));
}
} else if (type == TYPE_BINDING) {
BindingIdentifier binding(name);
std::string exchangeType("topic");
Variant::Map extensions;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
// extract durable, auto-delete and alternate-exchange properties
if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
//treat everything else as extension properties
else extensions[i->first] = i->second;
}
framing::FieldTable arguments;
qpid::amqp_0_10::translate(extensions, arguments);
bind(binding.queue, binding.exchange, binding.key, arguments, 0, userId, connectionId);
} else if (type == TYPE_LINK) {
QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties );
if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
throw ReservedObjectName(name);
}
std::string host;
uint16_t port = 0;
std::string transport = TCP_TRANSPORT;
bool durable = false;
std::string authMech, username, password;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
if (i->first == HOST) host = i->second.asString();
else if (i->first == PORT) port = i->second.asUint16();
else if (i->first == TRANSPORT) transport = i->second.asString();
else if (i->first == DURABLE) durable = bool(i->second);
else if (i->first == AUTH_MECHANISM) authMech = i->second.asString();
else if (i->first == USERNAME) username = i->second.asString();
else if (i->first == PASSWORD) password = i->second.asString();
else {
// TODO: strict checking here
}
}
if (!getTransportInfo(transport).connectorFactory) {
QPID_LOG(error, "Transport '" << transport << "' not supported.");
throw UnsupportedTransport(transport);
}
std::pair<boost::shared_ptr<Link>, bool> rc;
rc = links.declare(name, host, port, transport, durable, authMech, username, password);
if (!rc.first) {
QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port <<
"; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\"");
throw InvalidParameter(name);
}
if (!rc.second) {
QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists.");
throw ObjectAlreadyExists(name);
}
} else if (type == TYPE_BRIDGE) {
QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties );
if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
throw ReservedObjectName(name);
}
std::string linkName;
std::string src;
std::string dest;
std::string key;
std::string id;
std::string excludes;
std::string queueName;
bool durable = false;
bool srcIsQueue = false;
bool srcIsLocal = false;
bool dynamic = false;
uint16_t sync = 0;
uint32_t credit = LinkRegistry::INFINITE_CREDIT;
for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
if (i->first == LINK) linkName = i->second.asString();
else if (i->first == SRC) src = i->second.asString();
else if (i->first == DEST) dest = i->second.asString();
else if (i->first == KEY) key = i->second.asString();
else if (i->first == TAG) id = i->second.asString();
else if (i->first == EXCLUDES) excludes = i->second.asString();
else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second);
else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second);
else if (i->first == DYNAMIC) dynamic = bool(i->second);
else if (i->first == SYNC) sync = i->second.asUint16();
else if (i->first == CREDIT) credit = i->second.asUint32();
else if (i->first == DURABLE) durable = bool(i->second);
else if (i->first == QUEUE_NAME) queueName = i->second.asString();
else {
// TODO: strict checking here
}
}
boost::shared_ptr<Link> link;
if (linkName.empty() || !(link = links.getLink(linkName))) {
QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed.");
throw InvalidParameter(name);
}
std::pair<Bridge::shared_ptr, bool> rc =
links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes,
dynamic, sync, credit,
0,
queueName);
if (!rc.first) {
QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName <<
"; src=" << src << "; dest=" << dest << "; key=" << key);
throw InvalidParameter(name);
}
if (!rc.second) {
QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists.");
throw ObjectAlreadyExists(name);
}
} else {
throw UnknownObjectType(type);
}
}
void Broker::deleteObject(const std::string& type, const std::string& name,
const Variant::Map& options, const Connection* context)
{
std::string userId;
std::string connectionId;
if (context) {
userId = context->getUserId();
connectionId = context->getMgmtId();
}
QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
if (objectFactory.deleteObject(*this, type, name, options, userId, connectionId)) {
QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ") handled by registered factory");
} else if (type == TYPE_QUEUE) {
// extract ifEmpty and ifUnused from options
bool ifUnused = false, ifEmpty = false;
for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
if (i->first == IF_UNUSED) ifUnused = i->second.asBool();
else if (i->first == IF_EMPTY) ifEmpty = i->second.asBool();
}
deleteQueue(name, userId, connectionId,
boost::bind(&Broker::checkDeleteQueue, this, _1, ifUnused, ifEmpty));
} else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
deleteExchange(name, userId, connectionId);
} else if (type == TYPE_BINDING) {
BindingIdentifier binding(name);
unbind(binding.queue, binding.exchange, binding.key, 0, userId, connectionId);
} else if (type == TYPE_LINK) {
boost::shared_ptr<Link> link = links.getLink(name);
if (link) {
link->close();
}
} else if (type == TYPE_BRIDGE) {
boost::shared_ptr<Bridge> bridge = links.getBridge(name);
if (bridge) {
bridge->close();
}
} else {
throw UnknownObjectType(type);
}
}
void Broker::checkDeleteQueue(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty)
{
if(ifEmpty && queue->getMessageCount() > 0) {
throw qpid::framing::PreconditionFailedException(QPID_MSG("Cannot delete queue "
<< queue->getName() << "; queue not empty"));
} else if(ifUnused && queue->getConsumerCount() > 0) {
throw qpid::framing::PreconditionFailedException(QPID_MSG("Cannot delete queue "
<< queue->getName() << "; queue in use"));
}
}
Manageable::status_t Broker::queryObject(const std::string& type,
const std::string& name,
Variant::Map& results,
const Connection* context)
{
std::string userId;
std::string connectionId;
if (context) {
userId = context->getUserId();
connectionId = context->getMgmtId();
}
QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")");
if (type == TYPE_QUEUE)
return queryQueue( name, userId, connectionId, results );
if (type == TYPE_EXCHANGE ||
type == TYPE_TOPIC ||
type == TYPE_BINDING)
return Manageable::STATUS_NOT_IMPLEMENTED;
throw UnknownObjectType(type);
}
Manageable::status_t Broker::queryQueue( const std::string& name,
const std::string& userId,
const std::string& /*connectionId*/,
Variant::Map& results )
{
(void) results;
if (acl) {
if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUEUE, name, NULL) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << userId));
}
boost::shared_ptr<Queue> q(queues.find(name));
if (!q) {
QPID_LOG(error, "Query failed: queue not found, name=" << name);
return Manageable::STATUS_UNKNOWN_OBJECT;
}
q->query( results );
return Manageable::STATUS_OK;;
}
Manageable::status_t Broker::getTimestampConfig(bool& receive,
const Connection* context)
{
std::string name; // none needed for broker
std::string userId = context->getUserId();
if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL)) {
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId));
}
receive = timestampRcvMsgs;
return Manageable::STATUS_OK;
}
Manageable::status_t Broker::setTimestampConfig(const bool receive,
const Connection* context)
{
std::string name; // none needed for broker
std::string userId = context->getUserId();
if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) {
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId));
}
timestampRcvMsgs = receive;
QPID_LOG(notice, "Receive message timestamping is " << ((timestampRcvMsgs) ? "ENABLED." : "DISABLED."));
return Manageable::STATUS_OK;
}
void Broker::setLogLevel(const std::string& level)
{
QPID_LOG(notice, "Changing log level to " << level);
std::vector<std::string> selectors;
split(selectors, level, ", ");
qpid::log::Logger::instance().reconfigure(selectors);
}
std::string Broker::getLogLevel()
{
std::string level;
std::string sep("");
const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors;
for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) {
level += sep + *i;
sep = ",";
}
const std::vector<std::string>& disselectors = qpid::log::Logger::instance().getOptions().deselectors;
for (std::vector<std::string>::const_iterator i = disselectors.begin(); i != disselectors.end(); ++i) {
level += sep + "!" + *i;
sep = ",";
}
return level;
}
void Broker::setLogHiresTimestamp(bool enabled)
{
QPID_LOG(notice, "Changing log hires timestamp to " << enabled);
qpid::log::Logger::instance().setHiresTimestamp(enabled);
}
bool Broker::getLogHiresTimestamp()
{
return qpid::log::Logger::instance().getHiresTimestamp();
}
Manageable::status_t Broker::queueRedirect(const std::string& srcQueue,
const std::string& tgtQueue,
const Connection* context)
{
Queue::shared_ptr srcQ(queues.find(srcQueue));
if (!srcQ) {
QPID_LOG(error, "Queue redirect failed: source queue not found: "
<< srcQueue);
return Manageable::STATUS_UNKNOWN_OBJECT;
}
if (!tgtQueue.empty()) {
// NonBlank target queue creates partnership
Queue::shared_ptr tgtQ(queues.find(tgtQueue));
if (!tgtQ) {
QPID_LOG(error, "Queue redirect failed: target queue not found: "
<< tgtQueue);
return Manageable::STATUS_UNKNOWN_OBJECT;
}
if (srcQueue.compare(tgtQueue) == 0) {
QPID_LOG(error, "Queue redirect source queue: "
<< tgtQueue << " cannot be its own target");
return Manageable::STATUS_USER;
}
if (srcQ->isAutoDelete()) {
QPID_LOG(error, "Queue redirect source queue: "
<< srcQueue << " is autodelete and can not be part of redirect");
return Manageable::STATUS_USER;
}
if (tgtQ->isAutoDelete()) {
QPID_LOG(error, "Queue redirect target queue: "
<< tgtQueue << " is autodelete and can not be part of redirect");
return Manageable::STATUS_USER;
}
if (srcQ->getRedirectPeer()) {
QPID_LOG(error, "Queue redirect source queue: "
<< srcQueue << " is already redirected");
return Manageable::STATUS_USER;
}
if (tgtQ->getRedirectPeer()) {
QPID_LOG(error, "Queue redirect target queue: "
<< tgtQueue << " is already redirected");
return Manageable::STATUS_USER;
}
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_QUEUENAME, tgtQ->getName()));
if (!acl->authorise((context)?context->getUserId():"", acl::ACT_REDIRECT, acl::OBJ_QUEUE, srcQ->getName(), &params))
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied redirect request from " << ((context)?context->getUserId():"(uknown)")));
}
// Start the backup overflow partnership
srcQ->setRedirectPeer(tgtQ, true);
tgtQ->setRedirectPeer(srcQ, false);
// Set management state
srcQ->setMgmtRedirectState(tgtQueue, true, true);
tgtQ->setMgmtRedirectState(srcQueue, true, false);
// Management event
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventQueueRedirect(srcQueue, tgtQueue));
}
QPID_LOG(info, "Queue redirect complete. queue: "
<< srcQueue << " target queue: " << tgtQueue);
return Manageable::STATUS_OK;
} else {
// Blank target queue destroys partnership
Queue::shared_ptr tgtQ(srcQ->getRedirectPeer());
if (!tgtQ) {
QPID_LOG(error, "Queue redirect source queue: "
<< srcQueue << " is not in redirected");
return Manageable::STATUS_USER;
}
if (!srcQ->isRedirectSource()) {
QPID_LOG(error, "Queue redirect source queue: "
<< srcQueue << " is not a redirect source");
return Manageable::STATUS_USER;
}
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_QUEUENAME, tgtQ->getName()));
if (!acl->authorise((context)?context->getUserId():"", acl::ACT_REDIRECT, acl::OBJ_QUEUE, srcQ->getName(), &params))
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied redirect request from " << ((context)?context->getUserId():"(uknown)")));
}
queueRedirectDestroy(srcQ, tgtQ, true);
return Manageable::STATUS_OK;
}
}
void Broker::queueRedirectDestroy(Queue::shared_ptr srcQ,
Queue::shared_ptr tgtQ,
bool moveMsgs) {
QPID_LOG(notice, "Queue redirect destroyed. queue: " << srcQ->getName()
<< " target queue: " << tgtQ->getName());
tgtQ->setMgmtRedirectState(empty, false, false);
srcQ->setMgmtRedirectState(empty, false, false);
if (moveMsgs) {
// TODO: this 'move' works in the static case but has no
// actual locking that does what redirect needs when
// there is a lot of traffic in flight.
tgtQ->move(srcQ, 0);
}
Queue::shared_ptr np;
tgtQ->setRedirectPeer(np, false);
srcQ->setRedirectPeer(np, false);
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventQueueRedirectCancelled(srcQ->getName(), tgtQ->getName()));
}
}
const Broker::TransportInfo& Broker::getTransportInfo(const std::string& name) const {
static TransportInfo nullTransportInfo;
TransportMap::const_iterator i
= name.empty() ? transportMap.begin() : transportMap.find(name);
if (i == transportMap.end()) return nullTransportInfo;
else return i->second;
}
uint16_t Broker::getPort(const std::string& name) const {
if (int p = getTransportInfo(name).port) {
return p;
} else {
throw NoSuchTransportException(QPID_MSG("No such transport: '" << name << "'"));
}
}
bool Broker::shouldListen(std::string transport) {
return disabledListeningTransports.count(transport)==0;
}
void Broker::disableListening(std::string transport) {
disabledListeningTransports.insert(transport);
}
void Broker::registerTransport(const std::string& name, boost::shared_ptr<TransportAcceptor> a, boost::shared_ptr<TransportConnector> c, uint16_t p) {
transportMap[name] = TransportInfo(a, c, p);
Url::addProtocol(name);
}
void Broker::accept() {
unsigned accepting = 0;
for (TransportMap::const_iterator i = transportMap.begin(); i != transportMap.end(); i++) {
if (i->second.acceptor) {
i->second.acceptor->accept(poller, &protocolRegistry);
++accepting;
}
}
if ( accepting==0 ) {
throw Exception(QPID_MSG("Failed to start broker: No transports are listening for incoming connections"));
}
}
void Broker::connect(
const std::string& name,
const std::string& host, const std::string& port, const std::string& transport,
boost::function2<void, int, std::string> failed)
{
connect(name, host, port, transport, &protocolRegistry, failed);
}
void Broker::connect(
const std::string& name,
const std::string& host, const std::string& port, const std::string& transport,
sys::ConnectionCodec::Factory* f, boost::function2<void, int, std::string> failed)
{
boost::shared_ptr<TransportConnector> tcf = getTransportInfo(transport).connectorFactory;
if (tcf) tcf->connect(poller, name, host, port, f, failed);
else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport));
}
int32_t Broker::queueMoveMessages(
const std::string& srcQueue,
const std::string& destQueue,
uint32_t qty,
const Variant::Map& filter,
const Connection* context)
{
Queue::shared_ptr src_queue = queues.find(srcQueue);
if (!src_queue)
return -1;
Queue::shared_ptr dest_queue = queues.find(destQueue);
if (!dest_queue)
return -1;
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_QUEUENAME, dest_queue->getName()));
if (!acl->authorise((context)?context->getUserId():"", acl::ACT_MOVE, acl::OBJ_QUEUE, src_queue->getName(), &params))
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied move request from " << ((context)?context->getUserId():"(uknown)")));
}
return (int32_t) src_queue->move(dest_queue, qty, &filter);
}
boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; }
std::vector<Url>
Broker::getKnownBrokersImpl()
{
return knownBrokers;
}
bool Broker::deferDeliveryImpl(const std::string&, const Message&)
{ return false; }
const std::string Broker::TCP_TRANSPORT("tcp");
std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
const std::string& name,
const QueueSettings& constSettings,
const OwnershipToken* owner,
const std::string& alternateExchange,
const std::string& userId,
const std::string& connectionId)
{
QueueSettings settings(constSettings); // So we can modify them
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
params.insert(make_pair(acl::PROP_DURABLE, settings.durable ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_AUTODELETE, settings.autodelete ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_POLICYTYPE, settings.getLimitPolicy()));
params.insert(make_pair(acl::PROP_PAGING, settings.paging ? _TRUE : _FALSE));
if (settings.paging) {
params.insert(make_pair(acl::PROP_MAXPAGES, boost::lexical_cast<string>(settings.maxPages ? settings.maxPages : DEFAULT_MAX_PAGES)));
params.insert(make_pair(acl::PROP_MAXPAGEFACTOR, boost::lexical_cast<string>(settings.pageFactor ? settings.pageFactor : DEFAULT_PAGE_FACTOR)));
}
if (settings.maxDepth.hasCount())
params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount() ? settings.maxDepth.getCount() : std::numeric_limits<uint64_t>::max())));
if (settings.maxDepth.hasSize())
params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize() ? settings.maxDepth.getSize() : std::numeric_limits<uint64_t>::max())));
else
params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(config.queueLimit)));
if (settings.durable) {
params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast<string>(settings.maxFileCount ? settings.maxFileCount : 8)));
params.insert(make_pair(acl::PROP_MAXFILESIZE, boost::lexical_cast<string>(settings.maxFileSize ? settings.maxFileSize : 24)));
}
if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,&params) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
if (!queues.find(name))
if (!acl->approveCreateQueue(userId,name) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
}
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
alternate = exchanges.get(alternateExchange);
if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
}
// Identify queues that won't survive a failover: exclusive, auto-delete with no delay.
if (owner && settings.autodelete && !settings.autoDeleteDelay)
settings.isTemporary = true;
std::pair<Queue::shared_ptr, bool> result =
queues.declare(name, settings, alternate, false/*recovering*/,
owner, connectionId, userId);
if (result.second) {
//add default binding:
result.first->bind(exchanges.getDefault(), name);
QPID_LOG_CAT(debug, model, "Create queue. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
<< " durable:" << (settings.durable ? "T" : "F")
<< " owner:" << owner
<< " autodelete:" << (settings.autodelete ? "T" : "F")
<< " alternateExchange:" << alternateExchange );
}
return result;
}
void Broker::deleteQueue(const std::string& name, const std::string& userId,
const std::string& connectionId, QueueFunctor check)
{
QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
);
Queue::shared_ptr queue = queues.find(name);
if (queue) {
if (acl) {
std::map<acl::Property, std::string> params;
boost::shared_ptr<Exchange> altEx = queue->getAlternateExchange();
params.insert(make_pair(acl::PROP_ALTERNATE, (altEx) ? altEx->getName() : "" ));
params.insert(make_pair(acl::PROP_DURABLE, queue->isDurable() ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_AUTODELETE, queue->isAutoDelete() ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().getLimitPolicy()));
if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,&params) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
}
if (check) check(queue);
if (acl)
acl->recordDestroyQueue(name);
Queue::shared_ptr peerQ(queue->getRedirectPeer());
if (peerQ)
queueRedirectDestroy(queue->isRedirectSource() ? queue : peerQ,
queue->isRedirectSource() ? peerQ : queue,
false);
queues.destroy(name, connectionId, userId);
} else {
throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
}
}
std::pair<Exchange::shared_ptr, bool> Broker::createExchange(
const std::string& name,
const std::string& type,
bool durable,
bool autodelete,
const std::string& alternateExchange,
const qpid::framing::FieldTable& arguments,
const std::string& userId,
const std::string& connectionId)
{
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_TYPE, type));
params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange));
params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE));
if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,&params) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId));
}
Exchange::shared_ptr alternate;
if (!alternateExchange.empty()) {
alternate = exchanges.get(alternateExchange);
if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
}
std::pair<Exchange::shared_ptr, bool> result;
result = exchanges.declare(
name, type, durable, autodelete, arguments, alternate, connectionId, userId);
if (result.second) {
if (durable) {
store->create(*result.first, arguments);
}
QPID_LOG_CAT(debug, model, "Create exchange. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
<< " type:" << type
<< " alternateExchange:" << alternateExchange
<< " durable:" << (durable ? "T" : "F")
<< " autodelete:" << (autodelete ? "T" : "F"));
}
return result;
}
void Broker::deleteExchange(const std::string& name, const std::string& userId,
const std::string& connectionId)
{
QPID_LOG_CAT(debug, model, "Deleting exchange. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId);
if (name.empty()) {
throw framing::InvalidArgumentException(QPID_MSG("Delete not allowed for default exchange"));
}
Exchange::shared_ptr exchange(exchanges.get(name));
if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
if (acl) {
std::map<acl::Property, std::string> params;
Exchange::shared_ptr altEx = exchange->getAlternate();
params.insert(make_pair(acl::PROP_TYPE, exchange->getType()));
params.insert(make_pair(acl::PROP_ALTERNATE, (altEx) ? altEx->getName() : "" ));
params.insert(make_pair(acl::PROP_DURABLE, exchange->isDurable() ? _TRUE : _FALSE));
if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,&params) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
}
if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Cannot delete " << name <<", in use as alternate-exchange."));
if (exchange->isDurable()) store->destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
exchanges.destroy(name, connectionId, userId);
}
void Broker::bind(const std::string& queueName,
const std::string& exchangeName,
const std::string& key,
const qpid::framing::FieldTable& arguments,
const OwnershipToken* owner,
const std::string& userId,
const std::string& connectionId)
{
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId));
}
if (exchangeName.empty()) {
throw framing::InvalidArgumentException(QPID_MSG("Bind not allowed for default exchange"));
}
Queue::shared_ptr queue = queues.find(queueName);
Exchange::shared_ptr exchange = exchanges.get(exchangeName);
if (!queue) {
throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName));
} else if (!exchange) {
throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
} else if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(owner)) {
throw framing::ResourceLockedException(QPID_MSG("Cannot bind queue "
<< queue->getName() << "; it is exclusive to another session"));
} else {
if (queue->bind(exchange, key, arguments)) {
getBrokerObservers().bind(exchange, queue, key, arguments);
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName,
queueName, key, ManagementAgent::toMap(arguments)));
}
QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName
<< " queue:" << queueName
<< " key:" << key
<< " arguments:" << arguments
<< " user:" << userId
<< " rhost:" << connectionId);
}
}
}
void Broker::unbind(const std::string& queueName,
const std::string& exchangeName,
const std::string& key,
const OwnershipToken* owner,
const std::string& userId,
const std::string& connectionId)
{
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_QUEUENAME, queueName));
params.insert(make_pair(acl::PROP_ROUTINGKEY, key));
if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId));
}
if (exchangeName.empty()) {
throw framing::InvalidArgumentException(QPID_MSG("Unbind not allowed for default exchange"));
}
Queue::shared_ptr queue = queues.find(queueName);
Exchange::shared_ptr exchange = exchanges.get(exchangeName);
if (!queue) {
throw framing::NotFoundException(QPID_MSG("Unbind failed. No such queue: " << queueName));
} else if (!exchange) {
throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName));
} else if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(owner)) {
throw framing::ResourceLockedException(QPID_MSG("Cannot unbind queue "
<< queue->getName() << "; it is exclusive to another session"));
} else {
if (exchange->unbind(queue, key, 0)) {
if (exchange->isDurable() && queue->isDurable()) {
store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
}
getBrokerObservers().unbind(
exchange, queue, key, framing::FieldTable());
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
}
QPID_LOG_CAT(debug, model, "Delete binding. exchange:" << exchangeName
<< " queue:" << queueName
<< " key:" << key
<< " user:" << userId
<< " rhost:" << connectionId);
}
}
}
// FIXME aconway 2012-04-27: access to linkClientProperties is
// not properly thread safe, you could lose fields if 2 threads
// attempt to add a field concurrently.
framing::FieldTable Broker::getLinkClientProperties() const {
sys::Mutex::ScopedLock l(linkClientPropertiesLock);
return linkClientProperties;
}
void Broker::setLinkClientProperties(const framing::FieldTable& ft) {
sys::Mutex::ScopedLock l(linkClientPropertiesLock);
linkClientProperties = ft;
}
}} // namespace qpid::broker