blob: a8f05c1fe3ee700ff789be4e25d11887d9b515c0 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "BrokerReplicator.h"
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Link.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include <algorithm>
namespace qpid {
namespace ha {
using qmf::org::apache::qpid::broker::EventBind;
using qmf::org::apache::qpid::broker::EventUnbind;
using qmf::org::apache::qpid::broker::EventExchangeDeclare;
using qmf::org::apache::qpid::broker::EventExchangeDelete;
using qmf::org::apache::qpid::broker::EventQueueDeclare;
using qmf::org::apache::qpid::broker::EventQueueDelete;
using qmf::org::apache::qpid::broker::EventSubscribe;
using namespace framing;
using std::string;
using types::Variant;
using namespace broker;
namespace {
// Name used for this exchange, its type and the bridge source/destination.
const string QPID_CONFIGURATION_REPLICATOR = "qpid.configuration-replicator";
// Argument key whose value selects the replication level of a queue/exchange.
const string QPID_REPLICATE = "qpid.replicate";

// Keys and marker values looked up in decoded QMF2 messages and their headers.
const string CLASS_NAME = "_class_name";
const string EVENT = "_event";
const string OBJECT_NAME = "_object_name";
const string PACKAGE_NAME = "_package_name";
const string QUERY_RESPONSE = "_query_response";
const string SCHEMA_ID = "_schema_id";
const string VALUES = "_values";

// Property names read from the value maps of events and query responses.
const string ALTEX = "altEx";
const string ARGS = "args";
const string ARGUMENTS = "arguments";
const string AUTODEL = "autoDel";
const string AUTODELETE = "autoDelete";
const string BIND = "bind";
const string UNBIND = "unbind";
const string BINDING = "binding";
const string CREATED = "created";
const string DISP = "disp";
const string DURABLE = "durable";
const string EXCHANGE = "exchange";
const string EXNAME = "exName";
const string EXTYPE = "exType";
const string KEY = "key";
const string NAME = "name";
const string QNAME = "qName";
const string QUEUE = "queue";
const string RHOST = "rhost";
const string TYPE = "type";
const string USER = "user";

// Binding key, application id and header names used on the QMF connection.
const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER = "agent.ind.event.org_apache_qpid_broker.#";
const string QMF2 = "qmf2";
const string QMF_CONTENT = "qmf.content";
const string QMF_DEFAULT_TOPIC = "qmf.default.topic";
const string QMF_OPCODE = "qmf.opcode";

// Keys and values used by sendQuery() to build a query request.
// _CLASS_NAME, _PACKAGE_NAME and _SCHEMA_ID hold the same text as
// CLASS_NAME, PACKAGE_NAME and SCHEMA_ID above.
const string _WHAT = "_what";
const string _CLASS_NAME = "_class_name";
const string _PACKAGE_NAME = "_package_name";
const string _SCHEMA_ID = "_schema_id";
const string OBJECT = "OBJECT";
const string ORG_APACHE_QPID_BROKER = "org.apache.qpid.broker";
const string QMF_DEFAULT_DIRECT = "qmf.default.direct";
const string _QUERY_REQUEST = "_query_request";
const string BROKER = "broker";
bool isQMFv2(const Message& message) {
const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
return props && props->getAppId() == QMF2;
}
// Returns T::match(class name, package name) for the given schema-id map.
// T must provide a static match() taking those two values.
// The map is taken by non-const reference because the lookups use
// operator[], which inserts an entry for a key that is not present.
template <class T> bool match(Variant::Map& schema) {
    Variant& className = schema[CLASS_NAME];
    Variant& packageName = schema[PACKAGE_NAME];
    return T::match(className, packageName);
}
// Replication levels selectable through the "qpid.replicate" argument.
// RL_NONE is 0 on purpose: callers in this file test a level in boolean
// context to mean "replicated at all".
enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_MESSAGES };

// Textual spellings of the levels above.
const string S_NONE = "none";
const string S_CONFIGURATION = "configuration";
const string S_MESSAGES = "messages";

// Convert the textual level to its enum value.
// Throws Exception when the text is not one of the three spellings.
ReplicateLevel replicateLevel(const string& level) {
    if (level == S_NONE) {
        return RL_NONE;
    } else if (level == S_CONFIGURATION) {
        return RL_CONFIGURATION;
    } else if (level == S_MESSAGES) {
        return RL_MESSAGES;
    }
    throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
}
// Replication level requested in a field table: RL_NONE when the
// "qpid.replicate" entry is not set, otherwise the parsed value of that entry
// (the string overload throws on an unrecognised value).
ReplicateLevel replicateLevel(const framing::FieldTable& f) {
    if (!f.isSet(QPID_REPLICATE)) return RL_NONE;
    return replicateLevel(f.getAsString(QPID_REPLICATE));
}
// Replication level requested in a variant map: RL_NONE when the map has no
// "qpid.replicate" entry, otherwise the parsed value of that entry
// (the string overload throws on an unrecognised value).
ReplicateLevel replicateLevel(const Variant::Map& m) {
    const Variant::Map::const_iterator entry = m.find(QPID_REPLICATE);
    if (entry == m.end()) return RL_NONE;
    return replicateLevel(entry->second.asString());
}
// Send a QMF2 "_query_request" for objects of the given class in the
// "org.apache.qpid.broker" package.
//
//   className      - QMF class to query; stored under "_class_name".
//   queueName      - routing key of the reply-to address set on the request.
//   sessionHandler - the three frames are handed to sessionHandler.out.
//
// The request is emitted as a frameset of three frames: the message.transfer
// method (addressed to "qmf.default.direct"), a header and one content frame
// carrying the map-encoded request.
void sendQuery(const string& className, const string& queueName, SessionHandler& sessionHandler) {
    // Request body: { _what: OBJECT, _schema_id: { _class_name, _package_name } }
    Variant::Map request;
    request[_WHAT] = OBJECT;
    Variant::Map schema;
    schema[_CLASS_NAME] = className;
    schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
    request[_SCHEMA_ID] = schema;

    // First frame of the frameset: the transfer method.
    AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
    method.setBof(true);
    method.setEof(false);
    method.setBos(true);
    method.setEos(true);

    // Header frame: reply-to, application id, opcode and routing key.
    AMQHeaderBody headerBody;
    MessageProperties* props = headerBody.get<MessageProperties>(true);
    props->setReplyTo(qpid::framing::ReplyTo("", queueName));
    props->setAppId(QMF2);
    props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
    headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
    AMQFrame header(headerBody);
    header.setBof(false);
    header.setEof(false);
    header.setBos(true);
    header.setEos(true);

    // Last frame of the frameset: the encoded request map.
    AMQContentBody data;
    qpid::amqp_0_10::MapCodec::encode(request, data.getData());
    AMQFrame content(data);
    content.setBof(false);
    content.setEof(true);
    content.setBos(true);
    content.setEos(true);

    sessionHandler.out->handle(method);
    sessionHandler.out->handle(header);
    sessionHandler.out->handle(content);
}
// Like Variant::asMap but treat void value as an empty map.
// Returns a copy of the map held by value, or an empty map if value is void.
Variant::Map asMapVoid(const Variant& value) {
    if (value.isVoid()) return Variant::Map();
    return value.asMap();
}
} // namespace
// Destructor: the body is empty, so no explicit cleanup is performed here.
BrokerReplicator::~BrokerReplicator() {}
// Construct the replicator as an exchange named "qpid.configuration-replicator"
// and declare a non-durable bridge for the link's host and port whose source
// and destination are both that name. initializeBridge() is bound as the
// callback handed to declare().
//
// l is dereferenced in the initializer list, so it must be non-null and
// l->getBroker() must return a pointer that can be dereferenced.
BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
{
QPID_LOG(info, "HA: Backup replicating from " <<
link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
broker.getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
QPID_CONFIGURATION_REPLICATOR, // src
QPID_CONFIGURATION_REPLICATOR, // dest
"", // key
false, // isQueue
false, // isLocal
"", // id/tag
"", // excludes
false, // dynamic
0, // sync?
// Callback taking the two arguments of initializeBridge (bridge, session handler).
boost::bind(&BrokerReplicator::initializeBridge, this, _1, _2)
);
}
// This is called in the connection IO thread when the bridge is started.
//
// Sets up the peer side of the configuration bridge: declares the bridge's
// queue, binds it to "qmf.default.topic" with the broker-event binding key,
// subscribes to it and grants credit, then sends one query each for queues,
// exchanges and bindings with the same queue as reply-to.
void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
    framing::AMQP_ServerProxy peer(sessionHandler.out);
    const string eventQueue = bridge.getQueueName();
    const qmf::org::apache::qpid::broker::ArgsLinkBridge& bridgeArgs = bridge.getArgs();
    FieldTable noArguments;
    const unsigned int maxCredit = 0xFFFFFFFF;

    // Declare the event queue and bind it to the QMF topic exchange.
    peer.getQueue().declare(eventQueue, "", false, false, true, true, noArguments);
    peer.getExchange().bind(
        eventQueue, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, noArguments);

    // Subscribe to the queue and issue credit for both flow units (0 and 1).
    // NOTE(review): presumably unit 0 is messages and unit 1 is bytes, as in
    // AMQP 0-10 message.flow -- confirm against the protocol definition.
    peer.getMessage().subscribe(eventQueue, bridgeArgs.i_dest, 1, 0, false, "", 0, noArguments);
    peer.getMessage().flow(bridgeArgs.i_dest, 0, maxCredit);
    peer.getMessage().flow(bridgeArgs.i_dest, 1, maxCredit);

    // Query the current queues, exchanges and bindings; the event queue is
    // used as the reply-to address for all three requests.
    sendQuery(QUEUE, eventQueue, sessionHandler);
    sendQuery(EXCHANGE, eventQueue, sessionHandler);
    sendQuery(BINDING, eventQueue, sessionHandler);

    QPID_LOG(debug, "HA: Backup activated configuration bridge: " << eventQueue);
}
// FIXME aconway 2011-12-02: error handling in route.
//
// Handle a message routed to this exchange. The body is decoded as a list of
// maps and dispatched either as QMF2 events (header "qmf.content" equals
// "_event") or as query responses (header "qmf.opcode" equals
// "_query_response"). Any std::exception raised while handling is caught and
// logged together with whatever part of the list was decoded; nothing is
// rethrown to the caller.
void BrokerReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
    Variant::List list;
    try {
        const bool qmf2 = isQMFv2(msg.getMessage());
        if (!(qmf2 && headers))
            throw Exception("Unexpected message, not QMF2 event or query response.");

        // Decode the message content as a list.
        const string body = msg.getMessage().getFrames().getContent();
        amqp_0_10::ListCodec::decode(body, list);

        if (headers->getAsString(QMF_CONTENT) == EVENT) {
            // Each entry is an event: dispatch on its schema id. Events whose
            // schema matches none of the handled classes are ignored.
            for (Variant::List::iterator item = list.begin(); item != list.end(); ++item) {
                Variant::Map& event = item->asMap();
                Variant::Map& schema = event[SCHEMA_ID].asMap();
                Variant::Map& values = event[VALUES].asMap();
                if (match<EventQueueDeclare>(schema)) { doEventQueueDeclare(values); continue; }
                if (match<EventQueueDelete>(schema)) { doEventQueueDelete(values); continue; }
                if (match<EventExchangeDeclare>(schema)) { doEventExchangeDeclare(values); continue; }
                if (match<EventExchangeDelete>(schema)) { doEventExchangeDelete(values); continue; }
                if (match<EventBind>(schema)) { doEventBind(values); continue; }
                if (match<EventUnbind>(schema)) { doEventUnbind(values); continue; }
            }
        } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
            // Each entry is a query result: dispatch on its class name.
            for (Variant::List::iterator item = list.begin(); item != list.end(); ++item) {
                Variant::Map& object = item->asMap();
                string type = object[SCHEMA_ID].asMap()[CLASS_NAME];
                Variant::Map& values = object[VALUES].asMap();
                // NOTE(review): the translated table is not used afterwards;
                // the call is kept because it can throw on the arguments map
                // before any handler runs.
                framing::FieldTable args;
                amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
                if (type == QUEUE) {
                    doResponseQueue(values);
                } else if (type == EXCHANGE) {
                    doResponseExchange(values);
                } else if (type == BINDING) {
                    doResponseBind(values);
                } else {
                    QPID_LOG(error, "HA: Backup received unknown response type=" << type
                             << " values=" << values);
                }
            }
        } else {
            QPID_LOG(error, "HA: Backup received unexpected message: " << *headers);
        }
    } catch (const std::exception& e) {
        QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list);
    }
}
// Handle a queue-declare event. A local queue is created only when the
// event's disposition is "created" and its arguments request replication
// (any level other than none). A newly created queue is passed to
// startQueueReplicator(); an already existing one is only logged.
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
    const string queueName = values[QNAME].asString();
    const Variant::Map argsMap = asMapVoid(values[ARGS]);
    if (!(values[DISP] == CREATED)) return;
    if (!replicateLevel(argsMap)) return;

    framing::FieldTable args;
    amqp_0_10::translate(argsMap, args);
    std::pair<boost::shared_ptr<Queue>, bool> created =
        broker.createQueue(
            queueName,
            values[DURABLE].asBool(),
            values[AUTODEL].asBool(),
            0 /*i.e. no owner regardless of exclusivity on master*/,
            values[ALTEX].asString(),
            args,
            values[USER].asString(),
            values[RHOST].asString());

    if (!created.second) {
        // FIXME aconway 2011-12-02: what's the right way to handle this?
        QPID_LOG(warning, "HA: Backup queue already exists: " << queueName);
        return;
    }
    // FIXME aconway 2011-11-22: should delete old queue and
    // re-create from event.
    // Events are always up to date, whereas responses may be
    // out of date.
    QPID_LOG(debug, "HA: Backup created queue: " << queueName);
    startQueueReplicator(created.first);
}
// Handle a queue-delete event. Nothing happens unless the named queue exists
// locally and its settings request replication. Otherwise the queue's
// QueueReplicator exchange (if found) is deactivated, that exchange is
// destroyed and the local queue is deleted.
void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
    // The remote queue has already been deleted so replicator
    // sessions may be closed by a "queue deleted" exception.
    const string queueName = values[QNAME].asString();
    boost::shared_ptr<Queue> localQueue = broker.getQueues().find(queueName);
    if (!localQueue || !replicateLevel(localQueue->getSettings())) return;

    QPID_LOG(debug, "HA: Backup deleting queue: " << queueName);
    const string replicatorName = QueueReplicator::replicatorName(queueName);
    boost::shared_ptr<broker::Exchange> replicatorExchange =
        broker.getExchanges().find(replicatorName);
    boost::shared_ptr<QueueReplicator> replicator =
        boost::dynamic_pointer_cast<QueueReplicator>(replicatorExchange);
    if (replicator) {
        replicator->deactivate();
    }
    // QueueReplicator's bridge is now queued for destruction but may not
    // actually be destroyed, deleting the exchange
    broker.getExchanges().destroy(replicatorName);
    broker.deleteQueue(queueName, values[USER].asString(), values[RHOST].asString());
}
// Handle an exchange-declare event. A local exchange is created only when the
// event's disposition is "created" and its arguments request replication
// (any level other than none). An already existing exchange is only logged.
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
    const Variant::Map argsMap = asMapVoid(values[ARGS]);
    if (!(values[DISP] == CREATED)) return;
    if (!replicateLevel(argsMap)) return;

    const string exchangeName = values[EXNAME].asString();
    framing::FieldTable args;
    amqp_0_10::translate(argsMap, args);
    const bool created =
        broker.createExchange(
            exchangeName,
            values[EXTYPE].asString(),
            values[DURABLE].asBool(),
            values[ALTEX].asString(),
            args,
            values[USER].asString(),
            values[RHOST].asString()).second;

    if (created) {
        QPID_LOG(debug, "HA: Backup created exchange: " << exchangeName);
        return;
    }
    // FIXME aconway 2011-11-22: should delete pre-existing exchange
    // and re-create from event. See comment in doEventQueueDeclare.
    QPID_LOG(warning, "HA: Backup exchange already exists: " << exchangeName);
}
// Handle an exchange-delete event. The named exchange is deleted locally only
// if it exists and its arguments request replication. A NotFoundException
// raised by the lookup or the deletion is deliberately ignored.
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
    const string exchangeName = values[EXNAME].asString();
    try {
        boost::shared_ptr<Exchange> localExchange = broker.getExchanges().find(exchangeName);
        if (!localExchange || !replicateLevel(localExchange->getArgs())) return;

        QPID_LOG(debug, "HA: Backup deleting exchange:" << exchangeName);
        broker.deleteExchange(
            exchangeName,
            values[USER].asString(),
            values[RHOST].asString());
    } catch (const framing::NotFoundException&) {
        // Nothing to delete.
    }
}
// Handle a bind event by binding the named local queue to the named local
// exchange with the event's key and arguments.
void BrokerReplicator::doEventBind(Variant::Map& values) {
    boost::shared_ptr<Exchange> localExchange =
        broker.getExchanges().find(values[EXNAME].asString());
    boost::shared_ptr<Queue> localQueue =
        broker.getQueues().find(values[QNAME].asString());

    // We only replicate binds for a replicated queue to replicated
    // exchange that both exist locally.
    if (!localExchange || !replicateLevel(localExchange->getArgs())) return;
    if (!localQueue || !replicateLevel(localQueue->getSettings())) return;

    framing::FieldTable args;
    amqp_0_10::translate(asMapVoid(values[ARGS]), args);
    const string bindingKey = values[KEY].asString();
    QPID_LOG(debug, "HA: Backup replicated binding exchange=" << localExchange->getName()
             << " queue=" << localQueue->getName()
             << " key=" << bindingKey);
    localExchange->bind(localQueue, bindingKey, &args);
}
// Handle an unbind event by removing the binding between the named local
// queue and the named local exchange for the event's key and arguments.
void BrokerReplicator::doEventUnbind(Variant::Map& values) {
    boost::shared_ptr<Exchange> localExchange =
        broker.getExchanges().find(values[EXNAME].asString());
    boost::shared_ptr<Queue> localQueue =
        broker.getQueues().find(values[QNAME].asString());

    // We only replicate unbinds for a replicated queue to replicated
    // exchange that both exist locally.
    if (!localExchange || !replicateLevel(localExchange->getArgs())) return;
    if (!localQueue || !replicateLevel(localQueue->getSettings())) return;

    framing::FieldTable args;
    amqp_0_10::translate(asMapVoid(values[ARGS]), args);
    const string bindingKey = values[KEY].asString();
    QPID_LOG(debug, "HA: Backup replicated unbinding exchange=" << localExchange->getName()
             << " queue=" << localQueue->getName()
             << " key=" << bindingKey);
    localExchange->unbind(localQueue, bindingKey, &args);
}
// Handle one queue object from a query response. Queues whose arguments do
// not request replication are skipped. Otherwise the queue is created locally
// and, if it was newly created, passed to startQueueReplicator().
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
    // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication
    const Variant::Map argsMap = asMapVoid(values[ARGUMENTS]);
    if (!replicateLevel(argsMap)) return;

    framing::FieldTable args;
    amqp_0_10::translate(argsMap, args);
    const string queueName = values[NAME].asString();
    std::pair<boost::shared_ptr<Queue>, bool> created =
        broker.createQueue(
            queueName,
            values[DURABLE].asBool(),
            values[AUTODELETE].asBool(),
            0 /*i.e. no owner regardless of exclusivity on master*/,
            ""/*TODO: need to include alternate-exchange*/,
            args,
            ""/*TODO: who is the user?*/,
            ""/*TODO: what should we use as connection id?*/);

    if (!created.second) {
        // FIXME aconway 2011-11-22: Normal to find queue already
        // exists if we're failing over.
        QPID_LOG(warning, "HA: Backup catch-up queue already exists: " << queueName);
        return;
    }
    QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
    startQueueReplicator(created.first);
}
// Handle one exchange object from a query response. Exchanges whose arguments
// do not request replication are skipped. Otherwise the exchange is created
// locally; an already existing exchange is only logged.
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
    Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
    if (!replicateLevel(argsMap)) return;
    framing::FieldTable args;
    amqp_0_10::translate(argsMap, args);
    if (broker.createExchange(
            values[NAME].asString(),
            values[TYPE].asString(),
            values[DURABLE].asBool(),
            ""/*TODO: need to include alternate-exchange*/,
            args,
            ""/*TODO: who is the user?*/,
            ""/*TODO: what should we use as connection id?*/).second)
    {
        QPID_LOG(debug, "HA: Backup catch-up exchange: " << values[NAME]);
    } else {
        // Log the exchange's own name: the response is created from the
        // "name" property above, whereas "qName" is only read from events.
        QPID_LOG(warning, "HA: Backup catch-up exchange already exists: " << values[NAME]);
    }
}
namespace {
// Prefixes expected on the "_object_name" of queue and exchange references.
const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");

// Extract an object's name from a reference map.
//
//   prefix - text the reference's "_object_name" must start with.
//   ref    - variant holding the reference map.
//
// Returns the "_object_name" value with the prefix removed.
// Throws Exception if the map has no "_object_name" entry or if that entry
// does not start with prefix.
std::string getRefName(const std::string& prefix, const Variant& ref) {
    // Bind to the map held by ref instead of copying it: only one lookup is needed.
    const Variant::Map& map = ref.asMap();
    Variant::Map::const_iterator i = map.find(OBJECT_NAME);
    if (i == map.end())
        throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref));
    const std::string name = i->second.asString();
    if (name.compare(0, prefix.size(), prefix) != 0)
        throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name));
    return name.substr(prefix.size());
}

// Keys of the exchange and queue references read by doResponseBind().
const std::string EXCHANGE_REF("exchangeRef");
const std::string QUEUE_REF("queueRef");
} // namespace
// Handle one binding object from a query response. The exchange and queue
// names are taken from the object's "exchangeRef" and "queueRef" references
// (getRefName throws on a malformed reference). The binding is applied only
// if both exist locally and both request replication.
void BrokerReplicator::doResponseBind(Variant::Map& values) {
    const std::string exchangeName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
    const std::string queueName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
    boost::shared_ptr<Exchange> localExchange = broker.getExchanges().find(exchangeName);
    boost::shared_ptr<Queue> localQueue = broker.getQueues().find(queueName);

    // FIXME aconway 2011-11-24: more flexible configuration for binding replication.
    // Automatically replicate binding if queue and exchange exist and are replicated
    if (!localExchange || !replicateLevel(localExchange->getArgs())) return;
    if (!localQueue || !replicateLevel(localQueue->getSettings())) return;

    framing::FieldTable args;
    amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
    const string bindingKey = values[KEY].asString();
    localExchange->bind(localQueue, bindingKey, &args);
    QPID_LOG(debug, "HA: Backup catch-up binding: exchange=" << localExchange->getName()
             << " queue=" << localQueue->getName()
             << " key=" << bindingKey);
}
// For a queue whose settings request the "messages" replication level, create
// a QueueReplicator on this replicator's link, register it as an exchange and
// activate it. Queues at any other level are left alone.
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
    if (replicateLevel(queue->getSettings()) != RL_MESSAGES) return;

    boost::shared_ptr<QueueReplicator> replicator(new QueueReplicator(queue, link));
    broker.getExchanges().registerExchange(replicator);
    replicator->activate();
}
// bind, unbind and isBound ignore their arguments and always return false.
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) {
    return false;
}

bool BrokerReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) {
    return false;
}

bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) {
    return false;
}
// The exchange type string: "qpid.configuration-replicator", the same text
// that is passed to the Exchange base class as this exchange's name.
string BrokerReplicator::getType() const {
    return QPID_CONFIGURATION_REPLICATOR;
}
}} // namespace qpid::ha