blob: 21d08017783a0bf85295450da1be8e8811114232 [file]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "Event.h"
#include "HaBroker.h"
#include "IdSetter.h"
#include "LogPrefix.h"
#include "QueueReplicator.h"
#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "types.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionState.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/Msg.h"
#include "qpid/assert.h"
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/bind.hpp>
namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
using namespace framing::execution;
using namespace std;
using std::exception;
using sys::Mutex;
using boost::shared_ptr;
// Key under which initializeBridge() passes the sync frequency in the
// subscription arguments.
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
// Build the name of the replicator exchange for a queue: the replicator
// prefix followed by the queue name.
std::string QueueReplicator::replicatorName(const std::string& queueName) {
    std::string name(QUEUE_REPLICATOR_PREFIX);
    name += queueName;
    return name;
}
// True if name begins with the replicator prefix, i.e. it has the form
// produced by replicatorName().
bool QueueReplicator::isReplicatorName(const std::string& name) {
    const bool hasReplicatorPrefix = startsWith(name, QUEUE_REPLICATOR_PREFIX);
    return hasReplicatorPrefix;
}
namespace {
// Append ex to v if it is a QueueReplicator; exchanges of any other type
// are ignored.
void pushIfQr(QueueReplicator::Vector& v, const shared_ptr<Exchange>& ex) {
    const shared_ptr<QueueReplicator> replicator =
        boost::dynamic_pointer_cast<QueueReplicator>(ex);
    if (!replicator) return;
    v.push_back(replicator);
}
}
// Pass each exchange in registry (via eachExchange()) to pushIfQr, which
// appends the ones that are QueueReplicators to result.
void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1));
}
// Error listener installed on the replicator's bridge (see initialize()).
// Reported exceptions are logged at error level, except incoming execution
// exceptions that the QueueReplicator recognises as a "deleted on primary"
// condition (see deletedOnPrimary()).
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
  public:
    // Keeps only a weak reference to qr so the listener does not keep the
    // replicator alive; the log prefix is copied at construction time.
    ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
        : queueReplicator(qr),
          logPrefix(qr->logPrefix.prePrefix, qr->logPrefix.get())
    {}

    void connectionException(framing::connection::CloseCode code, const std::string& msg) {
        QPID_LOG(error, logPrefix << "Outgoing "
                 << framing::createConnectionException(code, msg).what());
    }

    void channelException(framing::session::DetachCode code, const std::string& msg) {
        QPID_LOG(error, logPrefix << "Outgoing "
                 << framing::createChannelException(code, msg).what());
    }

    void executionException(framing::execution::ErrorCode code, const std::string& msg) {
        QPID_LOG(error, logPrefix << "Outgoing "
                 << framing::createSessionException(code, msg).what());
    }

    void incomingExecutionException(ErrorCode code, const std::string& msg) {
        boost::shared_ptr<QueueReplicator> replicator = queueReplicator.lock();
        // Nothing to report if the replicator is still alive and handled
        // this as a deletion on the primary.
        if (replicator && replicator->deletedOnPrimary(code, msg)) return;
        QPID_LOG(error, logPrefix << "Incoming "
                 << framing::createSessionException(code, msg).what());
    }

    void detach() {}

  private:
    boost::weak_ptr<QueueReplicator> queueReplicator;
    LogPrefix2 logPrefix;
};
// Observer attached to the replicated queue (see initialize()). Forwards
// enqueue, dequeue and destroy notifications to the QueueReplicator. Only a
// weak reference is held, so notifications are dropped once the replicator
// no longer exists.
class QueueReplicator::QueueObserver : public broker::QueueObserver {
  public:
    typedef boost::shared_ptr<QueueReplicator> Ptr;

    QueueObserver(Ptr qr) : queueReplicator(qr) {}

    void enqueued(const Message& m) {
        if (Ptr target = queueReplicator.lock()) target->enqueued(m);
    }

    void dequeued(const Message& m) {
        if (Ptr target = queueReplicator.lock()) target->dequeued(m);
    }

    // Notifications the replicator has no interest in.
    void acquired(const Message&) {}
    void requeued(const Message&) {}
    void consumerAdded( const Consumer& ) {}
    void consumerRemoved( const Consumer& ) {}

    // Queue observer is destroyed when the queue is.
    void destroy() {
        if (Ptr target = queueReplicator.lock()) target->destroy();
    }

  private:
    boost::weak_ptr<QueueReplicator> queueReplicator;
};
// Factory: construct a QueueReplicator and run its second-phase
// initialization. initialize() calls shared_from_this(), so the object must
// already be owned by a shared_ptr before it runs.
boost::shared_ptr<QueueReplicator> QueueReplicator::create(
    HaBroker& hb, boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l)
{
    boost::shared_ptr<QueueReplicator> replicator;
    replicator.reset(new QueueReplicator(hb, q, l));
    replicator->initialize();
    return replicator;
}
// Construct a replicator for queue q that will replicate over link l.
// The exchange is named replicatorName(q->getName()). The constructor does not
// register the exchange or declare the bridge; initialize() does that
// (see create()).
QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
brokerInfo(hb.getBrokerInfo()),
link(l),
queue(q),
sessionHandler(0),
logPrefix(hb.logPrefix, "Backup of "+q->getName()+": "),
subscribed(false),
settings(hb.getSettings()),
nextId(0), maxId(0)
{
QPID_LOG(debug, logPrefix << "Created");
// The QueueReplicator will take over setting replication IDs.
boost::shared_ptr<IdSetter> setter =
q->getMessageInterceptors().findType<IdSetter>();
if (setter) q->getMessageInterceptors().remove(setter);
// NOTE(review): `args` here is not the local declared below (that comes
// later), so it is presumably a member inherited from Exchange. This call
// looks redundant with the getArgs()/setArgs() sequence below -- confirm.
args.setString(QPID_REPLICATE, printable(NONE).str());
// Bridge name is the replicator name plus a UUID suffix.
// NOTE(review): Uuid(true) presumably generates a fresh unique value -- confirm.
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
// Mark this exchange itself with replication level NONE in its arguments.
framing::FieldTable args = getArgs();
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
// Don't allow backup queues to auto-delete, primary decides when to delete.
if (q->isAutoDelete()) q->markInUse(false);
// Handlers for replication events, looked up by routing key in route().
dispatch[DequeueEvent::KEY] =
boost::bind(&QueueReplicator::dequeueEvent, this, _1, _2);
dispatch[IdEvent::KEY] =
boost::bind(&QueueReplicator::idEvent, this, _1, _2);
}
// Empty destructor body; explicit teardown (dropping the queue and bridge
// references, removing the exchange) is done in destroy().
QueueReplicator::~QueueReplicator() {}
// Second construction phase, called from create() once the replicator is
// owned by a shared_ptr (shared_from_this() is used below). In order, it:
//  1. registers this exchange in the broker's exchange registry,
//  2. declares the bridge over `link` from the queue to this exchange,
//  3. installs an ErrorListener on the bridge,
//  4. adds a QueueObserver to the queue.
// Throws Exception if registerExchange() reports the exchange was not registered.
void QueueReplicator::initialize() {
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
// Enable callback to route()
if (!getBroker()->getExchanges().registerExchange(shared_from_this()))
throw Exception(QPID_MSG("Duplicate queue replicator " << getName()));
// Enable callback to initializeBridge
boost::shared_ptr<Bridge> b = queue->getBroker()->getLinks().declare(
bridgeName,
*link,
false, // durable
queue->getName(), // src
getName(), // dest
"", // key
false, // isQueue
false, // isLocal
"", // id/tag
"", // excludes
false, // dynamic
0, // sync?
LinkRegistry::INFINITE_CREDIT,
// Include shared_ptr to self to ensure we are not deleted
// before initializeBridge is called.
boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
).first;
// Route session/connection errors on the bridge to our ErrorListener.
b->setErrorListener(
boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this())));
bridge = b; // bridge is a weak_ptr to avoid a cycle.
// Enable callback to destroy()
queue->getObservers().add(
boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this())));
}
// Forget the session handler recorded by initializeBridge().
void QueueReplicator::disconnect() {
    Mutex::ScopedLock guard(lock);
    sessionHandler = 0;
}
// Called from Queue::destroyed()
void QueueReplicator::destroy() {
QPID_LOG(debug, logPrefix << "Destroyed");
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
{
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
bridge2 = bridge.lock(); // !call close outside the lock.
destroy(l);
}
if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
}
// Teardown step of destroy(). The ScopedLock parameter is unused; destroy()
// passes the lock it is holding when it calls this.
void QueueReplicator::destroy(Mutex::ScopedLock&) {
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
queue.reset();
bridge.reset();
// Remove this exchange from the broker's exchange registry by name
// (it was added by registerExchange() in initialize()).
getBroker()->getExchanges().destroy(getName());
}
// Return the QueueSnapshot observer of the replicated queue, or a null
// pointer if this replicator has already been destroyed. The lock is held
// only long enough to copy the queue pointer; the observer lookup happens
// outside it.
boost::shared_ptr<QueueSnapshot> QueueReplicator::getSnapshot()
{
    boost::shared_ptr<broker::Queue> localQueue;
    {
        Mutex::ScopedLock guard(lock);
        localQueue = queue;
    }
    if (!localQueue) return boost::shared_ptr<QueueSnapshot>();
    return localQueue->getObservers().findType<QueueSnapshot>();
}
// Called in a broker connection thread when the bridge is created.
// Note: called with the Link lock held.
//
// Records the session handler, then sends a subscribe for the source queue
// followed by flow-control commands. The subscription arguments carry the
// replicator type, the sync frequency, this broker's info and, when the queue
// has a QueueSnapshot, the snapshot's replication ID set.
// Exceptions thrown while sending are logged and rethrown.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
boost::shared_ptr<QueueSnapshot> qs = getSnapshot();//do outside the lock
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
sessionHandler = &sessionHandler_;
if (sessionHandler->getSession()) {
// Don't overwrite the exchange property set on the primary.
sessionHandler->getSession()->getMessageBuilder().setCopyExchange(false);
}
// Proxy for sending commands on the session handler's output.
AMQP_ServerProxy peer(sessionHandler->out);
// Note: the `bridge` parameter hides the member of the same name here.
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable arguments;
arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
// Stays empty (and QPID_ID_SET is not sent) when the queue has no snapshot.
ReplicationIdSet snapshot;
if (qs) {
snapshot = qs->getSnapshot();
arguments.set(
ReplicatingSubscription::QPID_ID_SET,
FieldTable::ValuePtr(new Var32Value(encodeStr(snapshot), TYPE_CODE_VBIN32)));
}
try {
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
false/*exclusive*/, "", 0, arguments);
peer.getMessage().setFlowMode(getName(), 1); // Window
// Credit limits come from the HA settings: unit 0 with getFlowMessages(),
// unit 1 with getFlowBytes().
peer.getMessage().flow(getName(), 0, settings.getFlowMessages());
peer.getMessage().flow(getName(), 1, settings.getFlowBytes());
}
catch(const exception& e) {
QPID_LOG(error, logPrefix << "Cannot connect to primary: " << e.what());
throw;
}
qpid::Address primary;
link->getRemoteAddress(primary);
QPID_LOG(debug, logPrefix << "Connected to " << primary << " snapshot=" << snapshot << " bridge=" << bridgeName);
QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
}
namespace {
// Decode the content of message m into a default-constructed T by calling
// T::decode() on a Buffer wrapped around a copy of the content.
template <class T> T decodeContent(Message& m) {
    std::string body = m.getContent();
    // Buffer takes a non-const char*, hence the const_cast.
    Buffer input(const_cast<char*>(body.c_str()), body.size());
    T decoded;
    decoded.decode(input);
    return decoded;
}
}
// Handler for DequeueEvent::KEY, invoked from route() through the dispatch
// map while route() holds `lock`. Decodes the set of replication IDs carried
// in data and dequeues the local message recorded for each of them in
// `positions`; IDs with no recorded position are skipped.
void QueueReplicator::dequeueEvent(const string& data, Mutex::ScopedLock&) {
DequeueEvent e;
decodeStr(data, e);
QPID_LOG(trace, logPrefix << "Dequeue " << e.ids);
//TODO: should be able to optimise the following
for (ReplicationIdSet::iterator i = e.ids.begin(); i != e.ids.end(); ++i) {
QueuePosition position;
{
// NOTE(review): `lock` is already held by the caller, so re-acquiring
// it here assumes Mutex is recursive -- confirm.
Mutex::ScopedLock l(lock);
PositionMap::iterator j = positions.find(*i);
if (j == positions.end()) continue;
position = j->second;
}
Mutex::ScopedUnlock u(lock);//this method is called under lock, so need to release
queue->dequeueMessageAt(position); // Outside lock, will call dequeued().
// positions will be cleaned up in dequeued()
}
}
// Called in connection thread of the queue's bridge to primary.
//
// A message whose routing key is an event key is handed to the matching
// handler in the dispatch map (or ignored if there is none) and is not
// enqueued. Any other message is stamped with the next replication ID and
// delivered to the local queue, unless that ID is already recorded in
// `positions`. Any std::exception escaping this processing is reported via
// haBroker.shutdown().
void QueueReplicator::route(Deliverable& deliverable)
{
    try {
        broker::Message& msg(deliverable.getMessage());
        {
            Mutex::ScopedLock guard(lock);
            if (!queue) return;     // Already destroyed

            string routingKey(msg.getRoutingKey());
            if (isEventKey(routingKey)) {
                DispatchMap::iterator handler = dispatch.find(routingKey);
                if (handler != dispatch.end()) {
                    (handler->second)(msg.getContent(), guard);
                } else {
                    QPID_LOG(info, logPrefix << "Ignoring unknown event: " << routingKey);
                }
                return;
            }

            ReplicationId id = nextId++;
            msg.setReplicationId(id);
            if (positions.find(id) != positions.end()) {
                QPID_LOG(trace, logPrefix << "Already on queue: " << logMessageId(*queue, msg));
                return;
            }
            QPID_LOG(trace, logPrefix << "Received: " << logMessageId(*queue, msg));
        }
        deliver(msg);   // Outside lock, will call enqueued()
    }
    catch (const std::exception& e) {
        haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
    }
}
// Deliver a replicated message to the local queue. Called from route() after
// it has released `lock`.
void QueueReplicator::deliver(const broker::Message& m) {
queue->deliver(m);
}
// Called via QueueObserver when message is enqueued. Could be as part of deliver()
// or in a different thread if a message is enqueued via a transaction.
//
// Tracks the highest replication ID seen so far and records the queue
// position of the message under its replication ID.
void QueueReplicator::enqueued(const broker::Message& m) {
    Mutex::ScopedLock guard(lock);
    const ReplicationId id(m.getReplicationId());
    if (maxId < id) maxId = id;
    positions[m.getReplicationId()] = m.getSequence();
    QPID_LOG(trace, logPrefix << "Enqueued " << logMessageId(*queue, m));
}
// Called via QueueObserver when a message is dequeued: drop the position
// recorded for its replication ID.
void QueueReplicator::dequeued(const broker::Message& m) {
    Mutex::ScopedLock guard(lock);
    positions.erase(m.getReplicationId());
}
void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
nextId = decodeStr<IdEvent>(data).id;
}
// Check whether an incoming execution error means the queue has been deleted
// on the primary. If so, destroy this replicator and return true; for any
// other error code do nothing and return false.
bool QueueReplicator::deletedOnPrimary(ErrorCode e, const std::string& msg) {
    const bool queueGone =
        (e == ERROR_CODE_NOT_FOUND) || (e == ERROR_CODE_RESOURCE_DELETED);
    if (!queueGone) return false;

    // If the queue is destroyed at the same time we are subscribing, we may
    // get a not-found or resource-deleted exception before the
    // BrokerReplicator gets the queue-delete event. Shut down the bridge by
    // calling destroy(), we can let the BrokerReplicator delete the queue
    // when the queue-delete arrives.
    QPID_LOG(debug, logPrefix << "Deleted on primary: "
             << framing::createSessionException(e, msg).what());
    destroy();
    return true;
}
// Unused Exchange methods: each ignores its arguments and returns false.
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) {
    return false;
}

bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) {
    return false;
}

bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) {
    return false;
}

bool QueueReplicator::hasBindings() {
    return false;
}
// Exchange type name; also sent as the QPID_REPLICATING_SUBSCRIPTION argument
// in initializeBridge().
std::string QueueReplicator::getType() const {
    return ReplicatingSubscription::QPID_QUEUE_REPLICATOR;
}
void QueueReplicator::promoted() {
if (queue) {
// On primary QueueReplicator no longer sets IDs, start an IdSetter.
QPID_LOG(debug, logPrefix << "Promoted, first replication-id " << maxId+1)
queue->getMessageInterceptors().add(
boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, queue->getName(), maxId+1)));
// Process auto-deletes
if (queue->isAutoDelete()) {
// Make a temporary shared_ptr to prevent premature deletion of queue.
// Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
// which could delete the queue while it's still running it's destroyed logic.
boost::shared_ptr<Queue> q(queue);
// See markInUse in ctor: release but don't delete if never used.
q->releaseFromUse(false/*controller*/, subscribed/*doDelete*/);
}
}
}
}} // namespace qpid::ha