blob: e3b2b1f29c4d63203ff9413ae0fa9bc27b0c4060 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/sys/Timer.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
#include "boost/bind.hpp"
#include "qpid/log/Statement.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
namespace qpid {
namespace broker {
using framing::Buffer;
using framing::FieldTable;
using framing::UnauthorizedAccessException;
using framing::connection::CLOSE_CODE_CONNECTION_FORCED;
using management::ManagementAgent;
using management::ManagementObject;
using management::Manageable;
using management::Args;
using sys::Mutex;
using std::stringstream;
using std::string;
namespace _qmf = ::qmf::org::apache::qpid::broker;
struct LinkTimerTask : public sys::TimerTask {
LinkTimerTask(Link& l, sys::Timer& t)
: TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval*
sys::TIME_SEC),
"Link retry timer"),
link(l), timer(t) {}
void fire() {
link.maintenanceVisit();
setupNextFire();
timer.add(this);
}
Link& link;
sys::Timer& timer;
};
Link::Link(LinkRegistry* _links,
MessageStore* _store,
const string& _host,
uint16_t _port,
const string& _transport,
bool _durable,
const string& _authMechanism,
const string& _username,
const string& _password,
Broker* _broker,
Manageable* parent)
: links(_links), store(_store), host(_host), port(_port),
transport(_transport),
durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), mgmtObject(0), broker(_broker), state(0),
visitCount(0),
currentInterval(1),
closing(false),
reconnectNext(0), // Index of next address for reconnecting in url.
channelCounter(1),
connection(0),
agent(0),
timerTask(new LinkTimerTask(*this, broker->getTimer()))
{
if (parent != 0 && broker != 0)
{
agent = broker->getManagementAgent();
if (agent != 0)
{
mgmtObject = new _qmf::Link(agent, this, parent, _host, _port, _transport, _durable);
agent->addObject(mgmtObject, 0, durable);
}
}
setStateLH(STATE_WAITING);
startConnectionLH();
broker->getTimer().add(timerTask);
}
Link::~Link ()
{
assert(state == STATE_CLOSED); // Can only get here after destroy()
assert(connection == 0);
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
}
void Link::setStateLH (int newState)
{
if (newState == state)
return;
state = newState;
if (hideManagement())
return;
switch (state)
{
case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break;
case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
case STATE_FAILED : mgmtObject->set_state("Failed"); break;
case STATE_CLOSED : mgmtObject->set_state("Closed"); break;
case STATE_PASSIVE : mgmtObject->set_state("Passive"); break;
}
}
void Link::startConnectionLH ()
{
assert(state == STATE_WAITING);
try {
// Set the state before calling connect. It is possible that connect
// will fail synchronously and call Link::closed before returning.
setStateLH(STATE_CONNECTING);
broker->connect (host, boost::lexical_cast<std::string>(port), transport,
boost::bind (&Link::closed, this, _1, _2));
QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
} catch(const std::exception& e) {
QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: "
<< e.what());
setStateLH(STATE_WAITING);
if (!hideManagement())
mgmtObject->set_lastError (e.what());
}
}
void Link::established(Connection* c)
{
stringstream addr;
addr << host << ":" << port;
QPID_LOG (info, "Inter-broker link established to " << addr.str());
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
Mutex::ScopedLock mutex(lock);
assert(state == STATE_CONNECTING);
setStateLH(STATE_OPERATIONAL);
currentInterval = 1;
visitCount = 0;
connection = c;
if (closing)
destroy();
else // Process any IO tasks bridges added before established.
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::setUrl(const Url& u) {
Mutex::ScopedLock mutex(lock);
url = u;
reconnectNext = 0;
}
void Link::opened() {
Mutex::ScopedLock mutex(lock);
assert(connection);
// Get default URL from known-hosts if not already set
if (url.empty()) {
const std::vector<Url>& known = connection->getKnownHosts();
// Flatten vector of URLs into a single URL listing all addresses.
url.clear();
for(size_t i = 0; i < known.size(); ++i)
url.insert(url.end(), known[i].begin(), known[i].end());
reconnectNext = 0;
QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
}
}
void Link::closed(int, std::string text)
{
Mutex::ScopedLock mutex(lock);
QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
connection = 0;
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str());
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
(*i)->closed();
created.push_back(*i);
}
active.clear();
if (state != STATE_FAILED)
{
setStateLH(STATE_WAITING);
if (!hideManagement())
mgmtObject->set_lastError (text);
}
if (closing)
destroy();
}
// Called in connection IO thread.
void Link::destroy ()
{
Bridges toDelete;
{
Mutex::ScopedLock mutex(lock);
QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
if (connection)
connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
connection = 0;
setStateLH(STATE_CLOSED);
// Move the bridges to be deleted into a local vector so there is no
// corruption of the iterator caused by bridge deletion.
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
(*i)->closed();
toDelete.push_back(*i);
}
active.clear();
for (Bridges::iterator i = created.begin(); i != created.end(); i++)
toDelete.push_back(*i);
created.clear();
timerTask->cancel();
}
// Now delete all bridges on this link (don't hold the lock for this).
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
(*i)->destroy();
toDelete.clear();
links->destroy (host, port);
}
void Link::add(Bridge::shared_ptr bridge)
{
Mutex::ScopedLock mutex(lock);
created.push_back (bridge);
if (connection)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::cancel(Bridge::shared_ptr bridge)
{
bool needIOProcessing = false;
{
Mutex::ScopedLock mutex(lock);
for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
if ((*i).get() == bridge.get()) {
created.erase(i);
break;
}
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
if ((*i).get() == bridge.get()) {
cancellations.push_back(bridge);
bridge->closed();
active.erase(i);
break;
}
}
needIOProcessing = !cancellations.empty();
}
if (needIOProcessing && connection)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::ioThreadProcessing()
{
Mutex::ScopedLock mutex(lock);
if (state != STATE_OPERATIONAL)
return;
// check for bridge session errors and recover
if (!active.empty()) {
Bridges::iterator removed = std::remove_if(
active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
for (Bridges::iterator i = removed; i != active.end(); ++i) {
Bridge::shared_ptr bridge = *i;
bridge->closed();
bridge->cancel(*connection);
created.push_back(bridge);
}
active.erase(removed, active.end());
}
//process any pending creates and/or cancellations (do
//cancellations first in case any of the creates represent
//recreation of cancelled subscriptions
if (!cancellations.empty()) {
for (Bridges::iterator i = cancellations.begin(); i != cancellations.end(); ++i) {
(*i)->cancel(*connection);
}
cancellations.clear();
}
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
active.push_back(*i);
(*i)->create(*connection);
}
created.clear();
}
}
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
if (state == STATE_WAITING)
{
visitCount++;
if (visitCount >= currentInterval)
{
visitCount = 0;
//switch host and port to next in url list if possible
if (!tryFailoverLH()) {
currentInterval *= 2;
if (currentInterval > MAX_INTERVAL)
currentInterval = MAX_INTERVAL;
startConnectionLH();
}
}
}
else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::reconnectLH(const Address& a)
{
host = a.host;
port = a.port;
transport = a.protocol;
startConnectionLH();
if (!hideManagement()) {
stringstream errorString;
errorString << "Failed over to " << a;
mgmtObject->set_lastError(errorString.str());
}
}
bool Link::tryFailoverLH() {
if (reconnectNext >= url.size()) reconnectNext = 0;
if (url.empty()) return false;
Address next = url[reconnectNext++];
if (next.host != host || next.port != port || next.protocol != transport) {
links->changeAddress(Address(transport, host, port), next);
QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
reconnectLH(next);
return true;
}
return false;
}
// Management updates for a linke are inconsistent in a cluster, so they are
// suppressed.
bool Link::hideManagement() const {
return !mgmtObject || ( broker && broker->isInCluster());
}
uint Link::nextChannel()
{
Mutex::ScopedLock mutex(lock);
return channelCounter++;
}
void Link::notifyConnectionForced(const string text)
{
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_FAILED);
if (!hideManagement())
mgmtObject->set_lastError(text);
}
void Link::setPersistenceId(uint64_t id) const
{
persistenceId = id;
}
const string& Link::getName() const
{
return host;
}
Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
{
string host;
uint16_t port;
string transport;
string authMechanism;
string username;
string password;
buffer.getShortString(host);
port = buffer.getShort();
buffer.getShortString(transport);
bool durable(buffer.getOctet());
buffer.getShortString(authMechanism);
buffer.getShortString(username);
buffer.getShortString(password);
return links.declare(host, port, transport, durable, authMechanism, username, password).first;
}
void Link::encode(Buffer& buffer) const
{
buffer.putShortString(string("link"));
buffer.putShortString(host);
buffer.putShort(port);
buffer.putShortString(transport);
buffer.putOctet(durable ? 1 : 0);
buffer.putShortString(authMechanism);
buffer.putShortString(username);
buffer.putShortString(password);
}
uint32_t Link::encodedSize() const
{
return host.size() + 1 // short-string (host)
+ 5 // short-string ("link")
+ 2 // port
+ transport.size() + 1 // short-string(transport)
+ 1 // durable
+ authMechanism.size() + 1
+ username.size() + 1
+ password.size() + 1;
}
ManagementObject* Link::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
}
void Link::close() {
Mutex::ScopedLock mutex(lock);
if (!closing) {
closing = true;
if (state != STATE_CONNECTING && connection) {
//connection can only be closed on the connections own IO processing thread
connection->requestIOProcessing(boost::bind(&Link::destroy, this));
}
}
}
Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& text)
{
switch (op)
{
case _qmf::Link::METHOD_CLOSE :
close();
return Manageable::STATUS_OK;
case _qmf::Link::METHOD_BRIDGE :
_qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
QPID_LOG(debug, "Link::bridge() request received");
// Durable bridges are only valid on durable links
if (iargs.i_durable && !durable) {
text = "Can't create a durable route on a non-durable link";
return Manageable::STATUS_USER;
}
if (iargs.i_dynamic) {
Exchange::shared_ptr exchange = getBroker()->getExchanges().get(iargs.i_src);
if (exchange.get() == 0) {
text = "Exchange not found";
return Manageable::STATUS_USER;
}
if (!exchange->supportsDynamicBinding()) {
text = "Exchange type does not support dynamic routing";
return Manageable::STATUS_USER;
}
}
std::pair<Bridge::shared_ptr, bool> result =
links->declare (host, port, iargs.i_durable, iargs.i_src,
iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
iargs.i_dynamic, iargs.i_sync);
if (result.second && iargs.i_durable)
store->create(*result.first);
return Manageable::STATUS_OK;
}
return Manageable::STATUS_UNKNOWN_METHOD;
}
void Link::setPassive(bool passive)
{
Mutex::ScopedLock mutex(lock);
if (passive) {
setStateLH(STATE_PASSIVE);
} else {
if (state == STATE_PASSIVE) {
setStateLH(STATE_WAITING);
} else {
QPID_LOG(warning, "Ignoring attempt to activate non-passive link");
}
}
}
}} // namespace qpid::broker