blob: 8530413b35ab99954b3ca1d86e1a2612f6719a18 [file] [log] [blame]
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
#include "QueueSnapshot.h"
#include "ReplicatingSubscription.h"
#include "Primary.h"
#include "HaBroker.h"
#include "qpid/assert.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
#include <sstream>
namespace qpid {
namespace ha {
using namespace framing;
using namespace broker;
using namespace std;
using sys::Mutex;
using broker::amqp_0_10::MessageTransfer;
namespace {
// File-local string form of the HA prefix; every key below is built from it.
const string QPID_HA(QPID_HA_PREFIX);
}

// Argument whose string value create() reads from the consume arguments.
const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION = QPID_HA + "repsub";

// Argument holding the nested table of broker info that initialize() requires.
const string ReplicatingSubscription::QPID_BROKER_INFO = QPID_HA + "info";

// Argument holding the encoded ReplicationIdSet decoded in initialize().
const string ReplicatingSubscription::QPID_ID_SET = QPID_HA + "ids";

// HA-prefixed "qrep" constant.
const string ReplicatingSubscription::QPID_QUEUE_REPLICATOR = QPID_HA + "qrep";
/*
 * Called by SemanticState::consume to create a consumer.
 *
 * Builds a ReplicatingSubscription from the consume parameters, runs its
 * initialize() step and returns the shared pointer.
 *
 * NOTE(review): the declarator line naming this function and its return type
 * is not present above the parameter list, and no closing brace follows the
 * return statement - confirm against the upstream file.
 */
SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
bool ack,
bool acquire,
bool exclusive,
const string& tag,
const string& resumeId,
uint64_t resumeTtl,
const framing::FieldTable& arguments
) {
// Empty until a subscription object is created below.
boost::shared_ptr<ReplicatingSubscription> rs;
// NOTE(review): 'type' is read from the arguments but never tested in the
// visible code, so every request gets a ReplicatingSubscription - confirm
// whether a comparison against an expected type value is missing here.
std::string type = arguments.getAsString(QPID_REPLICATING_SUBSCRIPTION);
// NOTE(review): the parameter list that follows this function (apparently the
// constructor's) starts with HaBroker&, but no broker is passed here - confirm.
rs.reset(new ReplicatingSubscription(
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
// initialize() is kept out of the constructor because it needs
// shared_from_this.
if (rs) rs->initialize();
return rs;
// Parameter list and member initializers, apparently of the
// ReplicatingSubscription constructor.
// NOTE(review): the "ReplicatingSubscription::ReplicatingSubscription(" line is
// not present above, the initializer list ends on a trailing comma and no
// constructor body follows - confirm against the upstream file.
HaBroker& hb,
SemanticState* parent,
const string& name,
Queue::shared_ptr queue_,
bool ack,
// The acquire flag is deliberately unnamed: it is not forwarded to the base.
bool /*acquire*/,
bool exclusive,
const string& tag,
const string& resumeId,
uint64_t resumeTtl,
const framing::FieldTable& arguments
// REPLICATOR is passed to the base class between 'ack' and 'exclusive',
// presumably in the position of the ignored acquire flag - TODO confirm.
) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
// State flags start cleared; position starts at 0 and is set in initialize().
position(0), wasStopped(false), ready(false), cancelled(false),
// Called in subscription's connection thread when the subscription is created.
// Separate from ctor because we need to use shared_from_this
//
// Checks the subscriber's arguments, obtains or creates the queue guard, and
// works out which message ids the backup must drop (initial dequeues) and
// which it already holds (enqueues to skip).
// Throws InvalidArgumentException when the arguments carry no broker-info
// table and ResourceDeletedException when the queue has no QueueSnapshot
// observer.
// NOTE(review): several closing braces, and possibly some statements, of this
// function are not present in this copy - confirm against the upstream file.
void ReplicatingSubscription::initialize() {
try {
FieldTable ft;
// The subscriber must supply a broker-info table in its arguments.
if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
throw InvalidArgumentException(
logPrefix.get()+"Can't subscribe, no broker info: "+getTag());
// NOTE(review): 'ft' is not read again in the visible code, yet 'info' is used
// below - presumably 'info' should be populated from 'ft'; confirm.
// Set a log prefix message that identifies the remote broker.
ostringstream os;
os << "Subscription to " << queue->getName() << " at ";
info.printId(os) << ": ";
logPrefix = os.str();
// If there's already a guard (we are in failover) use it, else create one.
if (primary) guard = primary->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info, logPrefix.prePrefix));
// NOTE: Once the observer is attached we can have concurrent
// calls to dequeued so we need to lock use of this->dequeues.
// However we must attach the observer _before_ we snapshot for
// initial dequeues to be sure we don't miss any dequeues
// between the snapshot and attaching the observer.
// NOTE(review): no statement attaching an observer is visible before the
// snapshot lookup below - confirm.
boost::shared_ptr<QueueSnapshot> snapshot = queue->getObservers().findType<QueueSnapshot>();
// There may be no snapshot if the queue is being deleted concurrently.
if (!snapshot) {
throw ResourceDeletedException(logPrefix.get()+"Can't subscribe, queue deleted");
// Ids reported by the queue snapshot; treated below as the primary's contents.
ReplicationIdSet primaryIds = snapshot->getSnapshot();
// Ids the subscriber sent in its arguments as an encoded string; an empty
// string leaves backupIds empty.
std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
ReplicationIdSet backupIds;
if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
// Initial dequeues are messages on backup but not on primary.
ReplicationIdSet initDequeues = backupIds - primaryIds;
QueuePosition front,back;
queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue
sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
dequeues += initDequeues; // Messages on backup that are not on primary.
skipEnqueue = backupIds - initDequeues; // Messages already on the backup.
// Queue front is moving but we know this subscription will start at a
// position >= front so if front is safe then position must be.
position = front;
QPID_LOG(debug, logPrefix << "Subscribed: primary ["
<< front << "," << back << "]=" << primaryIds
<< ", guarded " << guard->getFirst()
<< ", backup (keep " << skipEnqueue << ", drop " << initDequeues << ")");
// Register with the Primary, when there is one.
if (primary) primary->addReplica(*this);
// NOTE(review): this is a second declaration of 'l'; it can only be valid if
// the earlier lock lives in a nested scope whose braces are missing - confirm.
Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
// Send initial dequeues to the backup.
// There must be a shared_ptr(this) when sending.
// NOTE(review): no send call is visible after the two comments above - confirm.
catch (const std::exception& e) {
// Failures are logged at error level. NOTE(review): no rethrow is visible
// after the log statement - confirm whether the exception should propagate.
QPID_LOG(error, logPrefix << "Subscribe failed: " << e.what());
// Destructor with an empty body: no explicit teardown is performed here.
ReplicatingSubscription::~ReplicatingSubscription()
{
}
// Records, under the lock, that the subscription has stopped. Once wasStopped
// is set, isGuarded() reports true regardless of position.
// NOTE(review): the closing brace is not present, and readiness is not
// re-evaluated here after the flag changes - confirm against upstream.
void ReplicatingSubscription::stopped() {
Mutex::ScopedLock l(lock);
// We have reached the last available message on the queue.
// Note that if messages have been removed out-of-order this may not be the
// head of the queue. We may not even have reached the guard
// position. However there are no more messages to protect and we will not
// be advanced any further, so we should consider ourselves guarded for
// purposes of readiness.
wasStopped = true;
// True if the next position for the ReplicatingSubscription is a guarded position.
// Also true once stopped() has set wasStopped.
// The ScopedLock parameter is unnamed and unused in the body; presumably it
// exists so that callers must already hold the lock - TODO confirm.
// NOTE(review): the closing brace of this function is not present.
bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
// See comment in stopped()
return wasStopped || (position+1 >= guard->getFirst());
// Message is delivered in the subscription's connection thread.
//
// Records the message's sequence as the current position, then either skips
// the message (its id is in skipEnqueue) or replicates it by sending an id
// event followed by the message itself.
// Returns true for a skipped message, otherwise the result of
// ConsumerImpl::deliver.
// NOTE(review): the opening brace and several closing braces are not present
// in this copy - confirm against the upstream file.
bool ReplicatingSubscription::deliver(
const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
Mutex::ScopedLock l(lock);
ReplicationId id = m.getReplicationId();
position = m.getSequence();
try {
bool result = false;
// Ids in skipEnqueue are already on the backup, so nothing is sent for them.
if (skipEnqueue.contains(id)) {
QPID_LOG(trace, logPrefix << "Skip " << logMessageId(*getQueue(), m));
skipEnqueue -= id;
guard->complete(id); // This will never be acknowledged.
result = true;
else {
QPID_LOG(trace, logPrefix << "Replicated " << logMessageId(*getQueue(), m));
// While not yet ready, remember ids delivered before the guarded range;
// acknowledged() removes them and checkReady() waits for the set to empty.
if (!ready && !isGuarded(l)) unready += id;
// The id event is sent ahead of the message itself.
sendIdEvent(id, l);
result = ConsumerImpl::deliver(c, m);
return result;
} catch (const std::exception& e) {
// NOTE(review): the handler only logs; no rethrow or return value is visible
// after the log statement - confirm.
QPID_LOG(critical, logPrefix << "Error replicating " << logMessageId(*getQueue(), m)
<< ": " << e.what());
// Marks the subscription ready, and tells the Primary, the first time it is
// guarded and has no ids left in 'unready'.
// Called with lock held; the caller's ScopedLock is passed in and forwarded to
// isGuarded(). The lock is dropped (ScopedUnlock) for logging and notification.
void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
    if (!ready && isGuarded(l) && unready.empty()) {
        // Set the flag before unlocking so the transition is taken only once.
        ready = true;
        sys::Mutex::ScopedUnlock u(lock);
        // Notify Primary that a subscription is ready.
        if (position+1 >= guard->getFirst()) {
            QPID_LOG(debug, logPrefix << "Caught up at " << position);
        } else {
            // Guarded only because the subscription was stopped; see stopped().
            QPID_LOG(debug, logPrefix << "Caught up at " << position
                     << " short of guard at " << guard->getFirst());
        }
        if (primary) primary->readyReplica(*this);
    }
}
// Called in the subscription's connection thread.
//
// Marks the subscription cancelled under the lock. A second call returns
// immediately because the flag is already set.
// NOTE(review): the braces are not present, and the visible body only sets the
// flag and logs; any further teardown is not shown here - confirm.
void ReplicatingSubscription::cancel()
Mutex::ScopedLock l(lock);
if (cancelled) return;
cancelled = true;
QPID_LOG(debug, logPrefix << "Cancelled");
// Consumer override, called on primary in the backup's IO thread.
//
// Logs the acknowledgement and removes the message's replication id from
// 'unready' under the lock.
// NOTE(review): the closing brace is not present. The comment below speaks of
// finishing completion, but no completion call is visible, and readiness is
// not re-evaluated after 'unready' shrinks - confirm against upstream.
void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Finish completion of message, it has been acknowledged by the backup.
ReplicationId id = r.getReplicationId();
QPID_LOG(trace, logPrefix << "Acknowledged " <<
logMessageId(*getQueue(), r.getMessageId(), id));
Mutex::ScopedLock l(lock);
unready -= id;
// Sends every id accumulated in 'dequeues' as a single DequeueEvent and
// empties the set. Does nothing when there are no pending dequeues.
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
{
    if (dequeues.empty()) return;
    QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
    // Assumes DequeueEvent takes its own copy of the id set - TODO confirm.
    DequeueEvent d(dequeues);
    // Empty the pending set while the lock is still held: sendEvent() drops the
    // lock, so ids recorded by dequeued() during the send must go into a fresh
    // set, and ids already captured in 'd' must not be sent again on the next
    // dispatch.
    dequeues.clear();
    sendEvent(d, l);
}
// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
//
// Records the message's replication id as a pending dequeue and wakes the
// subscription so that doDispatch() sends it on.
void ReplicatingSubscription::dequeued(const broker::Message& m)
{
    ReplicationId id = m.getReplicationId();
    QPID_LOG(trace, logPrefix << "Dequeued ID " << id);
    Mutex::ScopedLock l(lock);
    // 'dequeues' is shared with the subscription's connection thread, hence the
    // lock. Without this the notify() below would find nothing to send.
    dequeues += id;
    notify();                   // Ensure a call to doDispatch
}
// Called with lock held. Called in subscription's connection thread.
//
// Wraps the replication id in an IdEvent and passes it, with the caller's
// lock, to sendEvent().
// NOTE(review): the braces of this function are not present.
void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l)
sendEvent(IdEvent(pos), l);
// Delivers an event's message through ConsumerImpl::deliver with the lock
// released for the duration of the call (ScopedUnlock).
// The unnamed ScopedLock parameter is unused in the body; presumably it exists
// so that callers must already hold the lock - TODO confirm.
// NOTE(review): the braces of this function are not present.
void ReplicatingSubscription::sendEvent(const Event& event, Mutex::ScopedLock&)
Mutex::ScopedUnlock u(lock);
// Send the event directly to the base consumer implementation. The dummy
// consumer prevents acknowledgements being handled, which is what we want
// for events
ConsumerImpl::deliver(QueueCursor(), event.message(), boost::shared_ptr<Consumer>());
// Called in subscription's connection thread.
//
// Sends any pending dequeues first, then runs the base-class dispatch.
// Returns the result of ConsumerImpl::doDispatch(), or false if it throws a
// std::exception (which is logged as a warning).
// NOTE(review): the braces are not present, so it is unclear whether the lock
// is meant to be released before ConsumerImpl::doDispatch() is called.
// deliver() takes the same lock - confirm the intended scope against upstream.
bool ReplicatingSubscription::doDispatch()
Mutex::ScopedLock l(lock);
if (!dequeues.empty()) sendDequeueEvent(l);
try {
return ConsumerImpl::doDispatch();
catch (const std::exception& e) {
QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what());
return false;
}} // namespace qpid::ha