blob: 983b976d7647acfda9fb73b90856673925dad397 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "BrokerReplicator.h"
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include <boost/bind.hpp>
#include <algorithm>
#include <sstream>
#include <iostream>
#include <assert.h>
namespace qpid {
namespace ha {
using qmf::org::apache::qpid::broker::EventBind;
using qmf::org::apache::qpid::broker::EventUnbind;
using qmf::org::apache::qpid::broker::EventExchangeDeclare;
using qmf::org::apache::qpid::broker::EventExchangeDelete;
using qmf::org::apache::qpid::broker::EventQueueDeclare;
using qmf::org::apache::qpid::broker::EventQueueDelete;
using qmf::org::apache::qpid::broker::EventSubscribe;
using qmf::org::apache::qpid::ha::EventMembersUpdate;
using qpid::broker::amqp_0_10::MessageTransfer;
using namespace framing;
using namespace std;
using std::ostream;
using types::Variant;
using namespace broker;
namespace {
const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator");
const string CLASS_NAME("_class_name");
const string EVENT("_event");
const string OBJECT_NAME("_object_name");
const string PACKAGE_NAME("_package_name");
const string QUERY_RESPONSE("_query_response");
const string VALUES("_values");
const string SCHEMA_ID("_schema_id");
const string WHAT("_what");
const string ALTEX("altEx");
const string ALTEXCHANGE("altExchange");
const string ARGS("args");
const string ARGUMENTS("arguments");
const string AUTODEL("autoDel");
const string AUTODELETE("autoDelete");
const string BIND("bind");
const string BINDING("binding");
const string BINDING_KEY("bindingKey");
const string CREATED("created");
const string DISP("disp");
const string DEST("dest");
const string DURABLE("durable");
const string EXCHANGE("exchange");
const string EXCL("excl");
const string EXCLUSIVE("exclusive");
const string EXNAME("exName");
const string EXTYPE("exType");
const string HA_BROKER("habroker");
const string KEY("key");
const string NAME("name");
const string PARTIAL("partial");
const string QNAME("qName");
const string QUEUE("queue");
const string TYPE("type");
const string UNBIND("unbind");
const string CONSUMER_COUNT("consumerCount");
const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
const string QMF2("qmf2");
const string QMF_CONTENT("qmf.content");
const string QMF_DEFAULT_TOPIC("qmf.default.topic");
const string QMF_OPCODE("qmf.opcode");
const string OBJECT("OBJECT");
const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
const string ORG_APACHE_QPID_HA("org.apache.qpid.ha");
const string QMF_DEFAULT_DIRECT("qmf.default.direct");
const string _QUERY_REQUEST("_query_request");
const string BROKER("broker");
const string MEMBERS("members");
const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
void sendQuery(const string& packageName, const string& className, const string& queueName,
SessionHandler& sessionHandler)
{
framing::AMQP_ServerProxy peer(sessionHandler.out);
Variant::Map request;
request[WHAT] = OBJECT;
Variant::Map schema;
schema[CLASS_NAME] = className;
schema[PACKAGE_NAME] = packageName;
request[SCHEMA_ID] = schema;
AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
method.setBof(true);
method.setEof(false);
method.setBos(true);
method.setEos(true);
AMQHeaderBody headerBody;
MessageProperties* props = headerBody.get<MessageProperties>(true);
props->setReplyTo(qpid::framing::ReplyTo("", queueName));
props->setAppId(QMF2);
props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
headerBody.get<qpid::framing::MessageProperties>(true)->setCorrelationId(className);
AMQFrame header(headerBody);
header.setBof(false);
header.setEof(false);
header.setBos(true);
header.setEos(true);
AMQContentBody data;
qpid::amqp_0_10::MapCodec::encode(request, data.getData());
AMQFrame content(data);
content.setBof(false);
content.setEof(true);
content.setBos(true);
content.setEos(true);
sessionHandler.out->handle(method);
sessionHandler.out->handle(header);
sessionHandler.out->handle(content);
}
// Like Variant::asMap but treat void value as an empty map.
Variant::Map asMapVoid(const Variant& value) {
if (!value.isVoid()) return value.asMap();
else return Variant::Map();
}
} // namespace
// Listens for errors on the bridge session.
class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
ErrorListener(const std::string& lp, BrokerReplicator& br) :
logPrefix(lp), brokerReplicator(br) {}
void connectionException(framing::connection::CloseCode, const std::string& msg) {
QPID_LOG(error, logPrefix << "Connection error: " << msg);
}
void channelException(framing::session::DetachCode, const std::string& msg) {
QPID_LOG(error, logPrefix << "Channel error: " << msg);
}
void executionException(framing::execution::ErrorCode, const std::string& msg) {
QPID_LOG(error, logPrefix << "Execution error: " << msg);
}
void detach() {
QPID_LOG(debug, logPrefix << "Session detached.");
}
private:
std::string logPrefix;
BrokerReplicator& brokerReplicator;
};
class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver
{
public:
ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {}
virtual void connection(Connection&) {}
virtual void opened(Connection&) {}
virtual void closed(Connection& c) {
if (brokerReplicator.link && &c == brokerReplicator.connection)
brokerReplicator.disconnected();
}
virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); }
private:
BrokerReplicator& brokerReplicator;
};
/** Keep track of queues or exchanges during the update process to solve 2
* problems.
*
* 1. Once all responses are processed, remove any queues/exchanges
* that were not mentioned as they no longer exist on the primary.
*
* 2. During the update if we see an event for an object we should
* ignore any subsequent responses for that object as they are out
* of date.
*/
class BrokerReplicator::UpdateTracker {
public:
typedef std::set<std::string> Names;
typedef boost::function<void (const std::string&)> CleanFn;
UpdateTracker(const std::string& type_, // "queue" or "exchange"
CleanFn f, const ReplicationTest& rt)
: type(type_), cleanFn(f), repTest(rt) {}
/** Destructor cleans up remaining initial queues. */
~UpdateTracker() {
// Don't throw in a destructor.
try { for_each(initial.begin(), initial.end(), cleanFn); }
catch (const std::exception& e) {
QPID_LOG(error, "Error in cleanup of lost objects: " << e.what());
}
}
/** Add an exchange name */
void addExchange(Exchange::shared_ptr ex) {
if (repTest.getLevel(*ex))
initial.insert(ex->getName());
}
/** Add a queue name. */
void addQueue(Queue::shared_ptr q) {
if (repTest.getLevel(*q))
initial.insert(q->getName());
}
/** Received an event for name */
void event(const std::string& name) {
initial.erase(name); // no longer a candidate for deleting
events.insert(name); // we have seen an event for this name
}
/** Received a response for name.
*@return true if this response should be processed, false if we have
*already seen an event for this object.
*/
bool response(const std::string& name) {
initial.erase(name); // no longer a candidate for deleting
return events.find(name) == events.end(); // true if no event seen yet.
}
private:
void clean(const std::string& name) {
QPID_LOG(info, "Backup updated, deleting " << type << " " << name);
cleanFn(name);
}
std::string type;
Names initial, events;
CleanFn cleanFn;
ReplicationTest repTest;
};
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
logPrefix("Backup: "), replicationTest(NONE),
haBroker(hb), broker(hb.getBroker()),
exchanges(broker.getExchanges()), queues(broker.getQueues()),
link(l),
initialized(false),
alternates(hb.getBroker().getExchanges()),
connection(0)
{
connectionObserver.reset(new ConnectionObserver(*this));
broker.getConnectionObservers().add(connectionObserver);
framing::FieldTable args = getArgs();
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
dispatch[EventQueueDeclare::getFullName()] = &BrokerReplicator::doEventQueueDeclare;
dispatch[EventQueueDelete::getFullName()] = &BrokerReplicator::doEventQueueDelete;
dispatch[EventExchangeDeclare::getFullName()] = &BrokerReplicator::doEventExchangeDeclare;
dispatch[EventExchangeDelete::getFullName()] = &BrokerReplicator::doEventExchangeDelete;
dispatch[EventBind::getFullName()] = &BrokerReplicator::doEventBind;
dispatch[EventUnbind::getFullName()] = &BrokerReplicator::doEventUnbind;
dispatch[EventMembersUpdate::getFullName()] = &BrokerReplicator::doEventMembersUpdate;
dispatch[EventSubscribe::getFullName()] = &BrokerReplicator::doEventSubscribe;
}
void BrokerReplicator::initialize() {
// Can't do this in the constructor because we need a shared_ptr to this.
types::Uuid uuid(true);
const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
std::pair<Bridge::shared_ptr, bool> result = broker.getLinks().declare(
name, // name for bridge
*link, // parent
false, // durable
QPID_CONFIGURATION_REPLICATOR, // src
QPID_CONFIGURATION_REPLICATOR, // dest
"", // key
false, // isQueue
false, // isLocal
"", // id/tag
"", // excludes
false, // dynamic
0, // sync?
// shared_ptr keeps this in memory until outstanding connected
// calls are run.
boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2)
);
assert(result.second);
result.first->setErrorListener(
boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
}
BrokerReplicator::~BrokerReplicator() { shutdown(); }
namespace {
void collectQueueReplicators(
const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect)
{
boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
if (qr) collect.insert(qr);
}
} // namespace
void BrokerReplicator::shutdown() {
// NOTE: this is called in a QMF dispatch thread, not the Link's connection
// thread. It's OK to be unlocked because it doesn't use any mutable state,
// it only calls thread safe functions objects belonging to the Broker.
// Unregister with broker objects:
if (connectionObserver) {
broker.getConnectionObservers().remove(connectionObserver);
connectionObserver.reset();
}
broker.getExchanges().destroy(getName());
}
// This is called in the connection IO thread when the bridge is started.
void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) {
// Use the credentials of the outgoing Link connection for creating queues,
// exchanges etc. We know link->getConnection() is non-zero because we are
// being called in the connections thread context.
//
connection = link->getConnection();
assert(connection);
userId = link->getConnection()->getUserId();
remoteHost = link->getConnection()->getUrl();
link->getRemoteAddress(primary);
string queueName = bridge.getQueueName();
QPID_LOG(info, logPrefix << (initialized ? "Failing over" : "Connecting")
<< " to primary " << primary
<< " status:" << printable(haBroker.getStatus()));
initialized = true;
exchangeTracker.reset(
new UpdateTracker("exchange",
boost::bind(&BrokerReplicator::deleteExchange, this, _1),
replicationTest));
exchanges.eachExchange(
boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
queueTracker.reset(
new UpdateTracker("queue",
boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
replicationTest));
queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
//declare and bind an event queue
FieldTable declareArgs;
declareArgs.setString(QPID_REPLICATE, printable(NONE).str());
peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs);
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable());
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable());
//subscribe to the queue
FieldTable arguments;
arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
peer.getMessage().subscribe(
queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/,
false/*exclusive*/, "", 0, arguments);
peer.getMessage().setFlowMode(args.i_dest, 1); // Window
peer.getMessage().flow(args.i_dest, 0, haBroker.getSettings().getFlowMessages());
peer.getMessage().flow(args.i_dest, 1, haBroker.getSettings().getFlowBytes());
// Issue a query request for queues, exchanges, bindings and the habroker
// using event queue as the reply-to address
sendQuery(ORG_APACHE_QPID_HA, HA_BROKER, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
}
void BrokerReplicator::route(Deliverable& msg) {
// We transition from JOINING->CATCHUP on the first message received from the primary.
// Until now we couldn't be sure if we had a good connection to the primary.
if (haBroker.getStatus() == JOINING) {
haBroker.getMembership().setStatus(CATCHUP);
QPID_LOG(notice, logPrefix << "Connected to primary " << primary);
}
Variant::List list;
try {
if (!MessageTransfer::isQMFv2(msg.getMessage()))
throw Exception("Unexpected message, not QMF2 event or query response.");
// decode as list
string content = msg.getMessage().getContent();
qpid::amqp_0_10::ListCodec::decode(content, list);
if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
QPID_LOG(trace, "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]);
EventDispatchMap::iterator j = dispatch.find(key);
if (j != dispatch.end()) (this->*(j->second))(values);
}
} else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
QPID_LOG(trace, "Broker replicator response: " << map);
string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
else if (type == HA_BROKER) doResponseHaBroker(values);
}
if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
QPID_LOG(debug, logPrefix << "All exchange responses received.")
exchangeTracker.reset(); // Clean up exchanges that no longer exist in the primary
alternates.clear();
}
if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
QPID_LOG(debug, logPrefix << "All queue responses received.");
queueTracker.reset(); // Clean up queues that no longer exist in the primary
}
}
} catch (const std::exception& e) {
;
haBroker.shutdown(
QPID_MSG(logPrefix << "Configuration replication failed: "
<< e.what() << ": while handling: " << list));
throw;
}
}
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
Variant::Map argsMap = asMapVoid(values[ARGS]);
if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[QNAME].asString();
QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
if (queueTracker.get()) queueTracker->event(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
// The queue was definitely created on the primary.
if (queues.find(name)) {
QPID_LOG(warning, logPrefix << "Declare event, replacing exsiting queue: "
<< name);
deleteQueue(name);
}
replicateQueue(name, values[DURABLE].asBool(), values[AUTODEL].asBool(), args,
values[ALTEX].asString());
}
}
boost::shared_ptr<QueueReplicator> BrokerReplicator::findQueueReplicator(
const std::string& qname)
{
string rname = QueueReplicator::replicatorName(qname);
boost::shared_ptr<broker::Exchange> ex = exchanges.find(rname);
return boost::dynamic_pointer_cast<QueueReplicator>(ex);
}
void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
// The remote queue has already been deleted so replicator
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = queues.find(name);
if (queue && replicationTest.getLevel(*queue)) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
if (queueTracker.get()) queueTracker->event(name);
deleteQueue(name);
}
}
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGS]));
if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[EXNAME].asString();
QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
if (exchangeTracker.get()) exchangeTracker->event(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have a exchange with this name, replace it.
// The exchange was definitely created on the primary.
if (exchanges.find(name)) {
deleteExchange(name);
QPID_LOG(warning, logPrefix << "Declare event, replacing existing exchange: "
<< name);
}
CreateExchangeResult result = createExchange(
name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
values[ALTEX].asString());
replicatedExchanges.insert(name);
assert(result.second);
}
}
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
boost::shared_ptr<Exchange> exchange = exchanges.find(name);
if (!exchange) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
} else if (!replicationTest.getLevel(*exchange)) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
if (exchangeTracker.get()) exchangeTracker->event(name);
deleteExchange(name);
replicatedExchanges.erase(name);
}
}
void BrokerReplicator::doEventBind(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange =
exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
queues.find(values[QNAME].asString());
framing::FieldTable args;
qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
// We only replicate binds for a replicated queue to replicated exchange
// that both exist locally. Respect the replication level set in the
// bind arguments, but replicate by default.
if (exchange && replicationTest.getLevel(*exchange) &&
queue && replicationTest.getLevel(*queue) &&
ReplicationTest(ALL).getLevel(args))
{
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key
<< " args=" << args);
exchange->bind(queue, key, &args, 0);
}
}
void BrokerReplicator::doEventUnbind(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange =
exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
queues.find(values[QNAME].asString());
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
if (exchange && replicationTest.getLevel(*exchange) &&
queue && replicationTest.getLevel(*queue))
{
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
exchange->unbind(queue, key, &args, 0);
}
}
void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
Variant::List members = values[MEMBERS].asList();
setMembership(members);
}
void BrokerReplicator::doEventSubscribe(Variant::Map& values) {
// Ignore queue replicator subscriptions.
if (QueueReplicator::isReplicatorName(values[DEST].asString())) return;
boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]);
if (qr) {
qr->setSubscribed();
QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]);
}
}
namespace {
// Get the alternate exchange from the exchange field of a queue or exchange response.
static const string EXCHANGE_KEY_PREFIX("org.apache.qpid.broker:exchange:");
string getAltExchange(const types::Variant& var) {
if (!var.isVoid()) {
management::ObjectId oid(var);
string key = oid.getV2Key();
if (key.find(EXCHANGE_KEY_PREFIX) != 0) throw Exception("Invalid exchange reference: "+key);
return key.substr(EXCHANGE_KEY_PREFIX.size());
}
else return string();
}
Variant getHaUuid(const Variant::Map& map) {
Variant::Map::const_iterator i = map.find(QPID_HA_UUID);
return i == map.end() ? Variant() : i->second;
}
} // namespace
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.getLevel(argsMap)) return;
string name(values[NAME].asString());
if (!queueTracker.get())
throw Exception(QPID_MSG("Unexpected queue response: " << values));
if (!queueTracker->response(name)) return; // Response is out-of-date
QPID_LOG(debug, logPrefix << "Queue response: " << name);
// If we see a queue with the same name as one we have, but not the same UUID,
// then replace the one we have.
boost::shared_ptr<Queue> queue = queues.find(name);
if (queue && getHaUuid(queue->getSettings().original) != getHaUuid(argsMap)) {
QPID_LOG(warning, logPrefix << "UUID mismatch, replacing queue: "
<< name);
deleteQueue(name);
}
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
boost::shared_ptr<QueueReplicator> qr = replicateQueue(
name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
if (qr) {
Variant::Map::const_iterator i = values.find(CONSUMER_COUNT);
if (i != values.end() && isIntegerType(i->second.getType())) {
if (i->second.asInt64()) qr->setSubscribed();
}
}
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!replicationTest.getLevel(argsMap)) return;
string name = values[NAME].asString();
if (!exchangeTracker.get())
throw Exception(QPID_MSG("Unexpected exchange response: " << values));
if (!exchangeTracker->response(name)) return; // Response is out of date.
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we see an exchange with the same name as one we have, but not the same UUID,
// then replace the one we have.
boost::shared_ptr<Exchange> exchange = exchanges.find(name);
if (exchange &&
exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID))
{
QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: "
<< name);
deleteExchange(name);
}
CreateExchangeResult result = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
replicatedExchanges.insert(name);
}
namespace {
const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
std::string getRefName(const std::string& prefix, const Variant& ref) {
Variant::Map map(ref.asMap());
Variant::Map::const_iterator i = map.find(OBJECT_NAME);
if (i == map.end())
throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref));
const std::string name = i->second.asString();
if (name.compare(0, prefix.size(), prefix) != 0)
throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name));
std::string ret = name.substr(prefix.size());
return ret;
}
const std::string EXCHANGE_REF("exchangeRef");
const std::string QUEUE_REF("queueRef");
} // namespace
void BrokerReplicator::doResponseBind(Variant::Map& values) {
std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
boost::shared_ptr<Exchange> exchange = exchanges.find(exName);
boost::shared_ptr<Queue> queue = queues.find(qName);
framing::FieldTable args;
qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
// Automatically replicate binding if queue and exchange exist and are replicated.
// Respect replicate setting in binding args but default to replicated.
if (exchange && replicationTest.getLevel(*exchange) &&
queue && replicationTest.getLevel(*queue) &&
ReplicationTest(ALL).getLevel(args))
{
string key = values[BINDING_KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
<< " queue:" << qName
<< " key:" << key
<< " args:" << args);
// framing::FieldTable args;
// qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
exchange->bind(queue, key, &args, 0);
}
}
namespace {
const string REPLICATE_DEFAULT="replicateDefault";
}
// Received the ha-broker configuration object for the primary broker.
void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString());
if (mine != primary)
throw Exception(QPID_MSG("Replicate default on backup (" << mine
<< ") does not match primary (" << primary << ")"));
setMembership(values[MEMBERS].asList());
} catch (const std::exception& e) {
haBroker.shutdown(
QPID_MSG(logPrefix << "Invalid HA Broker response: " << e.what()
<< ": " << values));
throw;
}
}
boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
const boost::shared_ptr<Queue>& queue)
{
if (replicationTest.getLevel(*queue) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
new QueueReplicator(haBroker, queue, link));
if (!exchanges.registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
return qr;
}
return boost::shared_ptr<QueueReplicator>();
}
void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
Queue::shared_ptr queue = queues.find(name);
if (queue) {
// Purge before deleting to ensure that we don't reroute any
// messages. Any reroutes will be done at the primary and
// replicated as normal.
if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
broker.deleteQueue(name, userId, remoteHost);
QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
}
}
void BrokerReplicator::deleteExchange(const std::string& name) {
try {
boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name);
if (!exchange) {
QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name);
return;
}
if (exchange->inUseAsAlternate()) {
QPID_LOG(warning, "Cannot delete exchange, in use as alternate: " << name);
return;
}
broker.deleteExchange(name, userId, remoteHost);
QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
} catch (const framing::NotFoundException&) {
QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name);
}
}
boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue(
const std::string& name,
bool durable,
bool autodelete,
const qpid::framing::FieldTable& arguments,
const std::string& alternateExchange)
{
QueueSettings settings(durable, autodelete);
settings.populate(arguments, settings.storeSettings);
CreateQueueResult result =
broker.createQueue(
name,
settings,
0,// no owner regardless of exclusivity on primary
string(), // Set alternate exchange below
userId,
remoteHost);
boost::shared_ptr<QueueReplicator> qr;
if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first);
if (result.second && !alternateExchange.empty()) {
alternates.setAlternate(
alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
}
return qr;
}
BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
const std::string& name,
const std::string& type,
bool durable,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange)
{
CreateExchangeResult result =
broker.createExchange(
name,
type,
durable,
string(), // Set alternate exchange below
args,
userId,
remoteHost);
alternates.addExchange(result.first);
if (!alternateExchange.empty()) {
alternates.setAlternate(
alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
}
return result;
}
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; }
bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; }
bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
// DataSource interface - used to write persistence data to async store
uint64_t BrokerReplicator::getSize() { return 0; }
void BrokerReplicator::write(char* /*target*/) {}
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
if (!qr) return;
assert(qr);
if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
if (qr->getQueue()->getSettings().autoDeleteDelay) {
// Start the auto-delete timer
Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId);
}
else {
// Delete immediately. Don't purge, the primary is gone so we need
// to reroute the deleted messages.
deleteQueue(qr->getQueue()->getName(), false);
}
}
}
// Callback function for accumulating exchange candidates
namespace {
void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) {
c.push_back(i);
}
}
void BrokerReplicator::disconnected() {
QPID_LOG(info, logPrefix << "Disconnected from " << primary);
connection = 0;
// Clean up auto-delete queues
vector<boost::shared_ptr<Exchange> > collect;
// Make a copy so we can work outside the ExchangeRegistry lock
exchanges.eachExchange(
boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1));
for_each(collect.begin(), collect.end(),
boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
}
void BrokerReplicator::setMembership(const Variant::List& brokers) {
Membership& membership(haBroker.getMembership());
membership.assign(brokers);
// Check if the primary has signalled a change in my status:
// from CATCHUP to READY when we are caught up.
// from READY TO CATCHUP if we are timed out during fail-over.
BrokerInfo info;
if (membership.get(membership.getSelf(), info)) {
BrokerStatus oldStatus = haBroker.getStatus();
BrokerStatus newStatus = info.getStatus();
if (oldStatus == CATCHUP && newStatus == READY) {
QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready");
haBroker.getMembership().setStatus(READY);
}
else if (oldStatus == READY && newStatus == CATCHUP) {
QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching up");
haBroker.getMembership().setStatus(CATCHUP);
}
}
}
}} // namespace broker