blob: 0017cc82cd4b1ba7029efa0167a34444f5693189 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
#include <sstream>
namespace {
const std::string QPID_REPLICATOR_("qpid.replicator-");
const std::string TYPE_NAME("qpid.queue-replicator");
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
}
namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event");
std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QPID_REPLICATOR_ + queueName;
}
QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
{
std::stringstream ss;
ss << "HA: Backup " << queue->getName() << ": ";
logPrefix = ss.str();
QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
}
// This must be separate from the constructor so we can call shared_from_this.
void QueueReplicator::activate() {
// Note this may create a new bridge or use an existing one.
queue->getBroker()->getLinks().declare(
link->getHost(), link->getPort(),
false, // durable
queue->getName(), // src
getName(), // dest
"", // key
false, // isQueue
false, // isLocal
"", // id/tag
"", // excludes
false, // dynamic
0, // sync?
// Include shared_ptr to self to ensure we are not deleted
// before initializeBridge is called.
boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, shared_from_this())
);
}
QueueReplicator::~QueueReplicator() {}
void QueueReplicator::deactivate() {
sys::Mutex::ScopedLock l(lock);
queue->getBroker()->getLinks().destroy(
link->getHost(), link->getPort(), queue->getName(), getName(), string());
QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
}
// Called in a broker connection thread when the bridge is created.
// shared_ptr to self ensures we are not deleted before initializeBridge is called.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler,
boost::shared_ptr<QueueReplicator> /*self*/) {
sys::Mutex::ScopedLock l(lock);
bridgeName = bridge.getName();
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
// FIXME aconway 2011-12-09: Failover optimization removed.
// There was code here to re-use messages already on the backup
// during fail-over. This optimization was removed to simplify
// the logic till we get the basic replication stable, it
// can be re-introduced later. Last revision with the optimization:
// r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
// Clear out any old messages, reset the queue to start replicating fresh.
queue->purge();
queue->setPosition(0);
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
// TODO aconway 2011-12-19: optimize.
settings.setInt(QPID_SYNC_FREQUENCY, 1);
peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings);
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName);
}
namespace {
template <class T> T decodeContent(Message& m) {
std::string content;
m.getFrames().getContent(content);
Buffer buffer(const_cast<char*>(content.c_str()), content.size());
T result;
result.decode(buffer);
return result;
}
}
void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) {
// Thread safe: only calls thread safe Queue functions.
if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet
QueuedMessage message;
if (queue->acquireMessageAt(n, message))
queue->dequeue(0, message);
}
}
// Called in connection thread of the queues bridge to primary.
void QueueReplicator::route(Deliverable& msg, const std::string& key, const FieldTable*)
{
sys::Mutex::ScopedLock l(lock);
if (key == DEQUEUE_EVENT_KEY) {
SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
//TODO: should be able to optimise the following
for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
dequeue(*i, l);
} else if (key == POSITION_EVENT_KEY) {
SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
<< " to " << position);
assert(queue->getPosition() <= position);
//TODO aconway 2011-12-14: Optimize this?
for (SequenceNumber i = queue->getPosition(); i < position; ++i)
dequeue(i,l);
queue->setPosition(position);
} else {
msg.deliverTo(queue);
QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
}
}
// Unused Exchange methods.
bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const FieldTable*) { return false; }
bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const FieldTable* const) { return false; }
std::string QueueReplicator::getType() const { return TYPE_NAME; }
}} // namespace qpid::broker