blob: 3a9c9a86d9692424e24bc1c194ad766aca795d2b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "Backup.h"
#include "HaBroker.h"
#include "Primary.h"
#include "ReplicationTest.h"
#include "IdSetter.h"
#include "ReplicatingSubscription.h"
#include "RemoteBackup.h"
#include "ConnectionObserver.h"
#include "QueueReplicator.h"
#include "qpid/assert.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Uuid.h"
#include "qpid/types/Variant.h"
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace ha {
using sys::Mutex;
using boost::shared_ptr;
using boost::intrusive_ptr;
using namespace std;
using namespace framing;
namespace {
const std::string CLIENT_PROCESS_NAME("qpid.client_process");
const std::string CLIENT_PID("qpid.client_pid");
const std::string CLIENT_PPID("qpid.client_ppid");
class PrimaryConnectionObserver : public broker::ConnectionObserver
{
public:
PrimaryConnectionObserver(Primary& p) : primary(p) {}
void opened(broker::Connection& c) { primary.opened(c); }
void closed(broker::Connection& c) { primary.closed(c); }
private:
Primary& primary;
};
class PrimaryBrokerObserver : public broker::BrokerObserver
{
public:
PrimaryBrokerObserver(Primary& p) : primary(p) {}
void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); }
void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
private:
Primary& primary;
};
class ExpectedBackupTimerTask : public sys::TimerTask {
public:
ExpectedBackupTimerTask(Primary& p, sys::AbsTime deadline)
: TimerTask(deadline, "ExpectedBackupTimerTask"), primary(p) {}
void fire() { primary.timeoutExpectedBackups(); }
private:
Primary& primary;
};
class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
public:
PrimaryErrorListener(const LogPrefix& lp) : logPrefix(lp) {}
void connectionException(framing::connection::CloseCode code, const std::string& msg) {
QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
}
void channelException(framing::session::DetachCode code, const std::string& msg) {
QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
}
void executionException(framing::execution::ErrorCode code, const std::string& msg) {
QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
}
void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
}
void detach() {}
private:
const LogPrefix& logPrefix;
};
class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
public:
PrimarySessionHandlerObserver(const LogPrefix& logPrefix)
: errorListener(new PrimaryErrorListener(logPrefix)) {}
void newSessionHandler(broker::SessionHandler& sh) {
BrokerInfo info;
// Suppress error logging for backup connections
// TODO aconway 2014-01-31: Be more selective, suppress only expected errors?
if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) {
sh.setErrorListener(errorListener);
}
}
private:
boost::shared_ptr<PrimaryErrorListener> errorListener;
};
} // namespace
/**
 * Take on the primary role.
 *
 * Sets the membership status to RECOVERING and calls promoted() on every
 * QueueReplicator. It then creates a RemoteBackup (with queue guards) for each
 * expected backup and, if there are any, schedules ExpectedBackupTimerTask at
 * now + backupTimeout. Next it registers the broker and session-handler
 * observers and calls checkReady(). Client connections are enabled last, by
 * installing the connection observer.
 *
 * @param hb the HaBroker this role belongs to.
 * @param expect brokers expected to reconnect as backups. The primary does not
 *        become ACTIVE until each of them is ready or has been dropped by
 *        timeoutExpectedBackups().
 */
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
logPrefix(hb.logPrefix), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest)
{
// Note that at this point, we are still rejecting client connections.
// So we are safe from client interference while we set up the primary.
hb.getMembership().setStatus(RECOVERING);
QPID_LOG(notice, logPrefix << "Promoted to primary");
// Process all QueueReplicators, handles auto-delete queues.
QueueReplicator::Vector qrs;
QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1));
if (!expect.empty()) {
// NOTE: RemoteBackups must be created before we set the BrokerObserver
// or ConnectionObserver so that there is no client activity while
// the QueueGuards are created.
QPID_LOG(notice, logPrefix << "Recovering backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
// No connection yet (null): the backup is expected to reconnect later.
boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0, haBroker.logPrefix));
backups[i->getSystemId()] = backup;
// Only backups that are not yet ready hold up activation.
if (!backup->isReady()) expectedBackups.insert(backup);
setCatchupQueues(backup, true); // Create guards
}
// Set timeout for expected brokers to connect and become ready.
sys::AbsTime deadline(sys::now(), hb.getSettings().backupTimeout);
timerTask = new ExpectedBackupTimerTask(*this, deadline);
hb.getBroker().getTimer().add(timerTask);
}
brokerObserver.reset(new PrimaryBrokerObserver(*this));
haBroker.getBroker().getBrokerObservers().add(brokerObserver);
haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver);
// Activates immediately if no expected backup is outstanding.
checkReady(); // Outside lock
// Allow client connections
connectionObserver.reset(new PrimaryConnectionObserver(*this));
haBroker.getObserver()->setObserver(connectionObserver);
}
/**
 * Leave the primary role: cancel any pending expected-backup timeout and
 * unregister every observer the constructor installed.
 */
Primary::~Primary() {
    if (timerTask) {
        timerTask->cancel();
    }
    broker::Broker& b = haBroker.getBroker();
    b.getBrokerObservers().remove(brokerObserver);
    b.getSessionHandlerObservers().remove(sessionHandlerObserver);
    haBroker.getObserver()->reset();
}
void Primary::checkReady() {
bool activate = false;
{
Mutex::ScopedLock l(lock);
if (!active && expectedBackups.empty())
activate = active = true;
}
if (activate) {
membership.setStatus(ACTIVE); // Outside of lock.
QPID_LOG(notice, logPrefix << "All backups recovered.");
}
}
/**
 * If @p backup reports that it has just become ready, record it as READY in
 * the membership. If it was one of the expected (recovering) backups, remove
 * it from that set and re-check overall readiness outside the lock.
 */
void Primary::checkReady(boost::shared_ptr<RemoteBackup> backup) {
    bool wasExpected = false;
    {
        Mutex::ScopedLock l(lock);
        if (!backup->reportReady()) return;
        BrokerInfo info = backup->getBrokerInfo();
        info.setStatus(READY);
        membership.add(info);
        wasExpected = (expectedBackups.erase(backup) != 0);
        if (wasExpected) {
            QPID_LOG(info, logPrefix << "Recovering backup is ready: " << info);
        } else {
            QPID_LOG(info, logPrefix << "New backup is ready: " << info);
        }
    }
    if (wasExpected) checkReady(); // Outside lock
}
/**
 * Called when the expected-backup deadline fires (ExpectedBackupTimerTask).
 *
 * If the primary is already active, returns immediately. Otherwise every
 * expected backup that has no connection is disconnected (removed from
 * expectedBackups, backups and membership) and then re-added to membership
 * with status CATCHUP. Expected backups that are connected are left to finish
 * becoming ready. Exceptions are logged and not propagated. checkReady() is
 * then called outside the lock.
 */
void Primary::timeoutExpectedBackups() {
try {
sys::Mutex::ScopedLock l(lock);
if (active) return; // Already activated
// Remove records for any expectedBackups that are not yet connected
// Allow backups that are connected to continue becoming ready.
for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end();)
{
// backupDisconnect() erases the current element from expectedBackups
// (invalidating its iterator), so save it and advance i first.
BackupSet::iterator j = i++;
// NOTE(review): this local is released at the end of each iteration,
// i.e. while the lock is still held, contrary to the comment on
// backupDisconnect(). Confirm RemoteBackup destruction is safe here.
boost::shared_ptr<RemoteBackup> backup = *j;
if (!backup->getConnection()) {
BrokerInfo info = backup->getBrokerInfo();
QPID_LOG(error, logPrefix << "Recovering backup timed out: " << info);
backupDisconnect(backup, l); // Calls erase(j)
// Keep broker in membership but downgrade status to CATCHUP.
// The broker will get this status change when it eventually connects.
info.setStatus(CATCHUP);
membership.add(info);
}
}
}
catch(const std::exception& e) {
QPID_LOG(error, logPrefix << "Error timing out backups: " << e.what());
// No-where for this exception to go.
}
// Lock released here; may activate the primary if no expected backups remain.
checkReady();
}
void Primary::readyReplica(const ReplicatingSubscription& rs) {
shared_ptr<RemoteBackup> backup;
{
sys::Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId());
if (i != backups.end()) {
backup = i->second;
backup->ready(rs.getQueue());
}
}
if (backup) checkReady(backup);
}
// NOTE: Called with queue registry lock held.
void Primary::queueCreate(const QueuePtr& q) {
// Set replication argument.
ReplicateLevel level = replicationTest.useLevel(*q);
q->addArgument(QPID_REPLICATE, printable(level).str());
if (level) {
QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
<< " replication: " << printable(level));
// Give each queue a unique id. Used by backups to avoid confusion of
// same-named queues.
q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
{
Mutex::ScopedLock l(lock);
queueLimits.addQueue(q); // Throws if limit exceeded
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
i->second->queueCreate(q);
}
checkReady(); // Outside lock
}
}
// NOTE: Called with queue registry lock held.
void Primary::queueDestroy(const QueuePtr& q) {
if (replicationTest.useLevel(*q)) {
QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
{
Mutex::ScopedLock l(lock);
queueLimits.removeQueue(q);
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
i->second->queueDestroy(q);
}
checkReady(); // Outside lock
}
}
// NOTE: Called with exchange registry lock held.
void Primary::exchangeCreate(const ExchangePtr& ex) {
ReplicateLevel level = replicationTest.useLevel(*ex);
FieldTable args = ex->getArgs();
args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg.
if (level) {
QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
<< " replication: " << printable(level));
// Give each exchange a unique id to avoid confusion of same-named exchanges.
args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(Uuid(true).data())));
}
ex->setArgs(args);
}
// NOTE: Called with exchange registry lock held.
/**
 * BrokerObserver hook for a destroyed exchange. No state needs updating.
 * Replicated exchanges are only noted in the debug log.
 */
void Primary::exchangeDestroy(const ExchangePtr& ex) {
    if (!replicationTest.useLevel(*ex)) return;
    QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName());
}
// New backup connected
/**
 * Create a RemoteBackup for a backup that connected on @p connection.
 * Registers it with the queue limits, stores it in the backups map under its
 * system id (replacing any previous entry) and returns it.
 * Must be called with the lock held.
 */
shared_ptr<RemoteBackup> Primary::backupConnect(
    const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&)
{
    shared_ptr<RemoteBackup> created(
        new RemoteBackup(info, &connection, haBroker.logPrefix));
    queueLimits.addBackup(created);
    backups[info.getSystemId()] = created;
    return created;
}
// Remove a backup. It is unregistered from the queue limits, cancelled, and
// dropped from expectedBackups, backups and the membership. Must be called with
// the lock held, as the ScopedLock& parameter documents.
//
// The caller should keep its own shared_ptr to the backup and not release it
// until outside the lock, so that the RemoteBackup is not destroyed while the
// lock is held.
void Primary::backupDisconnect(shared_ptr<RemoteBackup> backup, Mutex::ScopedLock&) {
    // Counterpart of queueLimits.addBackup() in backupConnect(): a departing
    // backup must stop being tracked (and kept alive) by the queue limits.
    // NOTE(review): backups created in the constructor for expected brokers are
    // never passed to addBackup(). This assumes removeBackup() tolerates an
    // unknown backup — confirm.
    queueLimits.removeBackup(backup);
    types::Uuid id = backup->getBrokerInfo().getSystemId();
    backup->cancel();
    expectedBackups.erase(backup);
    backups.erase(id);
    membership.remove(id);
}
/**
 * ConnectionObserver callback for a newly opened connection.
 *
 * For a backup connection (one that carries BrokerInfo), a JOINING broker is
 * first moved to CATCHUP in the membership. The RemoteBackup record is then
 * created (unknown backup), replaced (the previous connection was never
 * reported closed), or re-attached to the new connection. Afterwards, outside
 * the lock, catch-up is started and readiness is checked.
 *
 * For any other connection, the client process name and pid are logged if the
 * client supplied them.
 */
void Primary::opened(broker::Connection& connection) {
    BrokerInfo info;
    shared_ptr<RemoteBackup> backup;
    // Holds a RemoteBackup displaced by a reconnect so that it is released only
    // after the lock below has been released, as backupDisconnect() requires.
    shared_ptr<RemoteBackup> displaced;
    if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
        Mutex::ScopedLock l(lock);
        BackupMap::iterator i = backups.find(info.getSystemId());
        // Applies to new and known backups alike.
        if (info.getStatus() == JOINING) {
            info.setStatus(CATCHUP);
            membership.add(info);
        }
        if (i == backups.end()) {
            QPID_LOG(info, logPrefix << "New backup connection: " << info);
            backup = backupConnect(info, connection, l);
        }
        else if (i->second->getConnection()) {
            // The backup is failing over before we received the closed() call
            // for its previous connection. Remove the old entry and create a new one.
            QPID_LOG(error, logPrefix << "Known backup reconnect before disconnection: " << info);
            displaced = i->second;
            backupDisconnect(displaced, l); // Erases the map entry: invalidates i
            backup = backupConnect(info, connection, l);
        } else {
            QPID_LOG(info, logPrefix << "Known backup reconnection: " << info);
            i->second->setConnection(&connection);
            backup = i->second;
        }
    }
    else {
        const types::Variant::Map& properties = connection.getClientProperties();
        std::ostringstream pinfo;
        types::Variant::Map::const_iterator i = properties.find(CLIENT_PROCESS_NAME);
        // FIXME aconway 2014-08-13: Conditional on logging.
        if (i != properties.end()) {
            pinfo << " " << i->second;
            i = properties.find(CLIENT_PID);
            if (i != properties.end())
                pinfo << "(" << i->second << ")";
        }
        QPID_LOG(info, logPrefix << "Accepted client connection " << connection.getMgmtId() << pinfo.str());
    }
    // Outside lock
    if (backup) {
        setCatchupQueues(backup, false);
        checkReady(backup);
    }
    checkReady();
}
/**
 * ConnectionObserver callback for a closed connection.
 *
 * For a backup connection, the RemoteBackup is disconnected only if it is
 * known and still bound to this very connection. Closes from unknown backups,
 * or of an older connection already superseded by a reconnect, are only
 * logged. Readiness is re-checked outside the lock in all cases.
 */
void Primary::closed(broker::Connection& connection) {
BrokerInfo info;
// Declared outside the locked scope so a disconnected RemoteBackup is
// released after the lock is released.
shared_ptr<RemoteBackup> backup;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
// NOTE: It is possible for a backup connection to be rejected while we
// are a backup, but closed() is called after we have become primary.
// Comparing the backup's current connection with the closing one lets us
// ignore such spurious closes.
if (i == backups.end()) {
QPID_LOG(info, logPrefix << "Disconnect from unknown backup " << info);
}
else if (i->second->getConnection() != &connection) {
QPID_LOG(info, logPrefix << "Late disconnect from backup " << info);
}
else {
// In this branch getConnection() == &connection (non-null), so the
// conditional prefix below always evaluates to "".
QPID_LOG(info, logPrefix << "Disconnect from "
<< (i->second->getConnection() ? "" : "disconnected ")
<< "backup " << info);
// Assign to shared_ptr so it will be deleted after we release the lock.
backup = i->second;
backupDisconnect(backup, l);
}
}
checkReady();
}
/**
 * Return the queue guard that the backup identified by @p info holds for
 * queue @p q, or a null pointer if no such backup is registered.
 */
boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerInfo& info)
{
    Mutex::ScopedLock l(lock);
    BackupMap::iterator pos = backups.find(info.getSystemId());
    if (pos == backups.end())
        return boost::shared_ptr<QueueGuard>();
    return pos->second->guard(q);
}
/**
 * A primary cannot be promoted further: log the request and return a null
 * Role, meaning there is no role change.
 */
Role* Primary::promote() {
    Role* const noRoleChange = 0;
    QPID_LOG(info, logPrefix << "Ignoring promotion, already primary");
    return noRoleChange;
}
/**
 * Call backup->catchupQueue(queue, createGuards) for every queue in the
 * broker's queue registry, then backup->startCatchup().
 * @param backup the backup to prepare for catch-up.
 * @param createGuards forwarded to RemoteBackup::catchupQueue; the constructor
 *        passes true for expected backups, opened() passes false.
 * Must be called WITHOUT holding Primary::lock (see below).
 */
void Primary::setCatchupQueues(const RemoteBackupPtr& backup, bool createGuards) {
// Do queue iteration outside the lock to avoid deadlocks with QueueRegistry.
haBroker.getBroker().getQueues().eachQueue(
boost::bind(&RemoteBackup::catchupQueue, backup, _1, createGuards));
backup->startCatchup();
}
}} // namespace qpid::ha