// blob: 933716e8faf9bb18b430ac53eb395435ef6687bc [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "QueueGuard.h"
#include "QueueRange.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Primary.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
#include <sstream>
namespace qpid {
namespace ha {
// Bring the framing, broker and std names used throughout this file into scope.
using namespace framing;
using namespace broker;
using namespace std;
using sys::Mutex;
// Subscription argument whose presence makes Factory::create build a
// ReplicatingSubscription.
const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
// NOTE(review): QPID_BACK and QPID_FRONT are not referenced in this file;
// presumably they are the argument keys QueueRange reads for the backup's
// queue range - confirm against QueueRange.
const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front");
// Subscription argument holding a field table with the remote broker's info;
// the constructor throws if it is missing.
const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
namespace {
// Prefix and suffix that mask() wraps around a name.
const string DOLLAR("$");
const string INTERNAL("-internal");
} // namespace
// Scan the queue for gaps and add them to the subscriptions dequed set.
class DequeueScanner
{
public:
DequeueScanner(
ReplicatingSubscription& rs,
SequenceNumber front_,
SequenceNumber back_ // Inclusive
) : subscription(rs), front(front_), back(back_)
{
assert(front <= back);
// INVARIANT deques have been added for positions <= at.
at = front - 1;
}
void operator()(const Message& m) {
if (m.getSequence() >= front && m.getSequence() <= back) {
if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1);
at = m.getSequence();
}
}
// Must call after scanning the queue.
void finish() {
if (at < back) subscription.dequeued(at+1, back);
}
private:
ReplicatingSubscription& subscription;
SequenceNumber front;
SequenceNumber back;
SequenceNumber at;
};
// Wrap a name with the DOLLAR prefix and the INTERNAL suffix,
// i.e. "$" + in + "-internal".
string mask(const string& in)
{
    string masked(DOLLAR);
    masked += in;
    masked += INTERNAL;
    return masked;
}
/* Called by SemanticState::consume to create a consumer.
 *
 * Returns an empty pointer unless the arguments contain
 * QPID_REPLICATING_SUBSCRIPTION; otherwise builds a ReplicatingSubscription
 * and runs its initialize() step before returning it.
 */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
    SemanticState* parent,
    const string& name,
    Queue::shared_ptr queue,
    bool ack,
    bool acquire,
    bool exclusive,
    const string& tag,
    const string& resumeId,
    uint64_t resumeTtl,
    const framing::FieldTable& arguments
) {
    // Not a replicating subscription: nothing for this factory to create.
    if (!arguments.isSet(QPID_REPLICATING_SUBSCRIPTION))
        return boost::shared_ptr<broker::SemanticState::ConsumerImpl>();

    boost::shared_ptr<ReplicatingSubscription> subscription(
        new ReplicatingSubscription(
            parent, name, queue, ack, acquire, exclusive, tag,
            resumeId, resumeTtl, arguments));
    // Second construction phase; needs the shared_ptr to exist already.
    subscription->initialize();
    return subscription;
}
// Construct the subscription and reconcile the backup's queue range with the
// primary's:
//  - read the remote broker info from the arguments (throws if it is absent),
//  - obtain a QueueGuard for the queue and attach this subscription to it,
//  - record dequeues for positions that are on the backup but not on the primary,
//  - set `position`, the point from which replication will proceed.
// The `acquire` argument is ignored; REPLICATOR is passed to the ConsumerImpl
// base in the corresponding position.
// Any std::exception raised here is logged and rethrown.
ReplicatingSubscription::ReplicatingSubscription(
SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
bool ack,
bool /*acquire*/,
bool exclusive,
const string& tag,
const string& resumeId,
uint64_t resumeTtl,
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
ready(false)
{
try {
// The broker info table is mandatory for a replicating subscription.
FieldTable ft;
if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
throw Exception("Replicating subscription does not have broker info: " + tag);
info.assign(ft);
// Set a log prefix message that identifies the remote broker.
ostringstream os;
os << "Primary " << queue->getName() << "@" << info << ": ";
logPrefix = os.str();
// NOTE: Once the guard is attached we can have concurrent
// calls to dequeued so we need to lock use of this->dequeues.
//
// However we must attach the guard _before_ we scan for
// initial dequeues to be sure we don't miss any dequeues
// between the scan and attaching the guard.
//
// Use the guard supplied by the Primary if there is one, otherwise
// create a new guard for this queue.
if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
guard->attach(*this);
QueueRange backup(arguments); // Remote backup range.
QueueRange backupOriginal(backup); // Unmodified copy, kept for the log message below.
QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
backupPosition = backup.back;
// Sync backup and primary queues, don't send messages already on the backup
if (backup.front > primary.front || // Missing messages at front
backup.back < primary.front || // No overlap
primary.empty() || backup.empty()) // Empty
{
// No useful overlap - erase backup and start from the beginning
if (!backup.empty()) dequeued(backup.front, backup.back);
position = primary.front-1;
}
else { // backup and primary do overlap.
// Remove messages from backup that are not in primary.
if (primary.back < backup.back) {
dequeued(primary.back+1, backup.back); // Trim excess messages at back
backup.back = primary.back;
}
if (backup.front < primary.front) {
dequeued(backup.front, primary.front-1); // Trim excess messages at front
backup.front = primary.front;
}
// Scan the queue and record a dequeue for every position inside the
// adjusted backup range that has no message on the primary.
DequeueScanner scan(*this, backup.front, backup.back);
// FIXME aconway 2012-06-15: Optimize queue traversal, only in range.
queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
scan.finish();
position = backup.back;
// Move this subscription's cursor to position.
queue->seek(*this, position);
}
// NOTE: we are assuming that the messages that are on the backup are
// consistent with those on the primary. If the backup is a replica
// queue and hasn't been tampered with then that will be the case.
QPID_LOG(debug, logPrefix << "Subscribed:"
<< " backup:" << backupOriginal << " adjusted backup:" << backup
<< " primary:" << primary
<< " catch-up: " << position << "-" << primary.back
<< "(" << primary.back-position << ")");
// Check if we are ready yet.
if (guard->subscriptionStart(position)) setReady();
}
catch (const std::exception& e) {
// Log with the arguments for diagnosis, then let the caller see the failure.
QPID_LOG(error, logPrefix << "Creation error: " << e.what()
<< ": arguments=" << getArguments());
throw;
}
}
// Destructor: the body is empty, no explicit cleanup is performed here.
ReplicatingSubscription::~ReplicatingSubscription() {}
// Called in subscription's connection thread when the subscription is created.
// Called separate from ctor because sending events requires
// shared_from_this
//
void ReplicatingSubscription::initialize() {
try {
Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
// Send initial dequeues and position to the backup.
// There must be a shared_ptr(this) when sending.
sendDequeueEvent(l);
sendPositionEvent(position, l);
backupPosition = position;
}
catch (const std::exception& e) {
QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
<< ": arguments=" << getArguments());
throw;
}
}
// Deliver one message to the backup; runs in the subscription's connection
// thread. Throws if the message's position is not beyond the backup's
// current position; any std::exception is logged as critical and rethrown.
bool ReplicatingSubscription::deliver(
    const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
{
    try {
        QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence());
        {
            Mutex::ScopedLock scoped(lock);
            // Position of the new message on the local queue.
            const SequenceNumber seq(m.getSequence());
            position = seq;
            // backupPosition is the latest position on the backup queue
            // before this message is enqueued there.
            if (seq <= backupPosition) {
                throw Exception(
                    QPID_MSG(logPrefix << "Expected position > " << backupPosition
                             << " but got " << seq));
            }
            // A jump of more than one means messages were dequeued ahead of
            // us: tell the backup the position just before this message.
            const bool positionSkipped = (seq - backupPosition) > 1;
            if (positionSkipped) {
                sendPositionEvent(seq - 1, scoped);
            }
            // The backup advances by 1 automatically when the message arrives.
            backupPosition = seq;
        }
        return ConsumerImpl::deliver(c, m);
    } catch (const std::exception& ex) {
        QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence()
                 << ": " << ex.what());
        throw;
    }
}
// Mark the subscription as ready. Only the first call has any effect beyond
// the flag: it logs "Caught up" and, if there is a Primary, notifies it that
// this replica is ready.
void ReplicatingSubscription::setReady() {
    bool wasReady;
    {
        // Test-and-set the flag under the lock; notify outside of it.
        Mutex::ScopedLock scoped(lock);
        wasReady = ready;
        ready = true;
    }
    if (wasReady) return;
    QPID_LOG(debug, logPrefix << "Caught up");
    if (Primary::get()) Primary::get()->readyReplica(*this);
}
// Called in the subscription's connection thread.
// Logs the cancellation, cancels the queue guard, then delegates to the base
// class ConsumerImpl::cancel().
void ReplicatingSubscription::cancel()
{
QPID_LOG(debug, logPrefix << "Cancelled");
guard->cancel();
ConsumerImpl::cancel();
}
// Consumer override, called on the primary in the backup's IO thread when
// the backup acknowledges a replicated message.
void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
    const SequenceNumber acked(r.getMessageId());
    QPID_LOG(trace, logPrefix << "Acknowledged " << acked);
    // The backup has the message, so finish its completion on the guard.
    guard->complete(acked);
    // If the next message is protected by the guard then we are ready.
    const bool caughtUp = acked >= guard->getRange().back;
    if (caughtUp) {
        setReady();
    }
    ConsumerImpl::acknowledged(r);
}
// Send the accumulated dequeues to the backup as a DEQUEUE event and clear
// the pending set. Does nothing when there are no pending dequeues.
// Called with lock held, in the subscription's connection thread; the lock is
// released while the event is sent and re-acquired before returning.
void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
{
    if (dequeues.empty()) return;
    QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);

    // Encode and clear the pending set while the lock is still held.
    string encoded(dequeues.encodedSize(), '\0');
    framing::Buffer payload(&encoded[0], encoded.size());
    dequeues.encode(payload);
    dequeues.clear();
    // NOTE(review): presumably rewinds the buffer so sendEvent reads the
    // encoded data from the start - confirm against framing::Buffer.
    payload.reset();

    Mutex::ScopedUnlock unlocked(lock);
    sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, payload);
}
// Called via QueueObserver::dequeued override on guard.
// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const Message& m)
{
QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence());
{
Mutex::ScopedLock l(lock);
dequeues.add(m.getSequence());
}
notify(); // Ensure a call to doDispatch
}
// Record the inclusive range [first, last] as dequeued.
// Called during construction while scanning for initial dequeues.
void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) {
    QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]");
    Mutex::ScopedLock scoped(lock);
    dequeues.add(first, last);
}
// Send a POSITION event carrying pos to the backup, unless pos already equals
// backupPosition. Does not itself update backupPosition.
// Called with lock held, in the subscription's connection thread; the lock is
// released while the event is sent and re-acquired before returning.
void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&)
{
    if (pos == backupPosition) return; // No need to send.
    QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);

    string encoded(pos.encodedSize(), '\0');
    framing::Buffer payload(&encoded[0], encoded.size());
    pos.encode(payload);
    // NOTE(review): presumably rewinds the buffer so sendEvent reads the
    // encoded data from the start - confirm against framing::Buffer.
    payload.reset();

    Mutex::ScopedUnlock unlocked(lock);
    sendEvent(QueueReplicator::POSITION_EVENT_KEY, payload);
}
void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
{
//generate event message
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> event(new qpid::broker::amqp_0_10::MessageTransfer());
AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());
header.setBof(false);
header.setEof(false);
header.setBos(true);
header.setEos(true);
content.setBof(false);
content.setEof(true);
content.setBos(true);
content.setEos(true);
event->getFrames().append(method);
event->getFrames().append(header);
event->getFrames().append(content);
DeliveryProperties* props =
event->getFrames().getHeaders()->get<DeliveryProperties>(true);
props->setRoutingKey(key);
// Send the event directly to the base consumer implementation.
//dummy consumer prevents acknowledgements being handled, which is what we want for events
ConsumerImpl::deliver(QueueCursor(), Message(event, 0), boost::shared_ptr<Consumer>());
}
// Called in subscription's connection thread.
bool ReplicatingSubscription::doDispatch()
{
{
Mutex::ScopedLock l(lock);
if (!dequeues.empty()) sendDequeueEvent(l);
}
try {
return ConsumerImpl::doDispatch();
}
catch (const std::exception& e) {
// FIXME aconway 2012-10-05: detect queue deletion, no warning.
QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what());
return false;
}
}
}} // namespace qpid::ha