| /* |
| * |
| * Copyright (c) 2006 The Apache Software Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| /** |
| * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1> |
| * |
| * The cluster works on the principle that if all members of the |
| * cluster receive identical input, they will all produce identical |
| * results. cluster::Connections intercept data received from clients |
| * and multicast it via CPG. The data is processed (passed to the |
| * broker::Connection) only when it is received from CPG in cluster |
| * order. Each cluster member has Connection objects for directly |
| * connected clients and "shadow" Connection objects for connections |
| * to other members. |
| * |
| * This assumes that all broker actions occur deterministically in |
| * response to data arriving on client connections. There are two |
| * situations where this assumption fails: |
 * - sending data in response to polling local connections for writability.
| * - taking actions based on a timer or timestamp comparison. |
| * |
| * IMPORTANT NOTE: any time code is added to the broker that uses timers, |
| * the cluster may need to be updated to take account of this. |
| * |
| * |
| * USE OF TIMESTAMPS IN THE BROKER |
| * |
| * The following are the current areas where broker uses timers or timestamps: |
| * |
 * - Producer flow control: broker::SemanticState uses
 *   connection::getClusterOrderOutput, a FrameHandler that sends
 *   frames to the client via the cluster. Used by broker::SessionState.
| * |
| * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is |
| * implemented by cluster::ExpiryPolicy. |
| * |
| * - Connection heartbeat: sends connection controls, not part of |
| * session command counting so OK to ignore. |
| * |
| * - LinkRegistry: only cluster elder is ever active for links. |
| * |
| * - management::ManagementBroker: uses MessageHandler supplied by cluster |
| * to send messages to the broker via the cluster. |
| * |
| * - Dtx: not yet supported with cluster. |
| * |
| * cluster::ExpiryPolicy implements the strategy for message expiry. |
| * |
| * ClusterTimer implements periodic timed events in the cluster context. |
| * Used for periodic management events. |
| * |
| * <h1>CLUSTER PROTOCOL OVERVIEW</h1> |
| * |
| * Messages sent to/from CPG are called Events. |
| * |
| * An Event carries a ConnectionId, which includes a MemberId and a |
| * connection number. |
| * |
| * Events are either |
| * - Connection events: non-0 connection number and are associated with a connection. |
| * - Cluster Events: 0 connection number, are not associated with a connection. |
| * |
| * Events are further categorized as: |
| * - Control: carries method frame(s) that affect cluster behavior. |
| * - Data: carries raw data received from a client connection. |
| * |
| * The cluster defines extensions to the AMQP command set in ../../../xml/cluster.xml |
| * which defines two classes: |
| * - cluster: cluster control information. |
| * - cluster.connection: control information for a specific connection. |
| * |
| * The following combinations are legal: |
| * - Data frames carrying connection data. |
| * - Cluster control events carrying cluster commands. |
| * - Connection control events carrying cluster.connection commands. |
| * - Connection control events carrying non-cluster frames: frames sent to the client. |
| * e.g. flow-control frames generated on a timer. |
| * |
| * <h1>CLUSTER INITIALIZATION OVERVIEW</h1> |
| * |
| * @see InitialStatusMap |
| * |
 * When a new member joins the CPG group, all members (including the
 * new one) multicast their "initial status." The new member is in
 * PRE_INIT mode until it gets a complete set of initial status
 * messages from all cluster members. In a newly-forming cluster, it is
 * then in INIT mode until the configured cluster-size members have
 * joined.
| * |
| * The newcomer uses initial status to determine |
| * - The cluster UUID |
| * - Am I speaking the correct version of the cluster protocol? |
| * - Do I need to get an update from an existing active member? |
| * - Can I recover from my own store? |
| * |
| * Pre-initialization happens in the Cluster constructor (plugin |
| * early-init phase) because it needs to set the recovery flag before |
 * the store initializes. This phase lasts until initial-status is
| * received for all active members. The PollableQueues and Multicaster |
| * are in "bypass" mode during this phase since the poller has not |
| * started so there are no threads to serve pollable queues. |
| * |
| * The remaining initialization happens in Cluster::initialize() or, |
| * if cluster-size=N is specified, in the deliver thread when an |
| * initial-status control is delivered that brings the total to N. |
| */ |
| #include "qpid/Exception.h" |
| #include "qpid/cluster/Cluster.h" |
| #include "qpid/sys/ClusterSafe.h" |
| #include "qpid/cluster/ClusterSettings.h" |
| #include "qpid/cluster/Connection.h" |
| #include "qpid/cluster/UpdateClient.h" |
| #include "qpid/cluster/RetractClient.h" |
| #include "qpid/cluster/FailoverExchange.h" |
| #include "qpid/cluster/UpdateExchange.h" |
| #include "qpid/cluster/ClusterTimer.h" |
| |
| #include "qpid/assert.h" |
| #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" |
| #include "qmf/org/apache/qpid/cluster/Package.h" |
| #include "qpid/broker/Broker.h" |
| #include "qpid/broker/Connection.h" |
| #include "qpid/broker/NullMessageStore.h" |
| #include "qpid/broker/QueueRegistry.h" |
| #include "qpid/broker/SessionState.h" |
| #include "qpid/broker/SignalHandler.h" |
| #include "qpid/framing/AMQFrame.h" |
| #include "qpid/framing/AMQP_AllOperations.h" |
| #include "qpid/framing/AllInvoker.h" |
| #include "qpid/framing/ClusterConfigChangeBody.h" |
| #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" |
| #include "qpid/framing/ClusterConnectionAbortBody.h" |
| #include "qpid/framing/ClusterRetractOfferBody.h" |
| #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" |
| #include "qpid/framing/ClusterReadyBody.h" |
| #include "qpid/framing/ClusterShutdownBody.h" |
| #include "qpid/framing/ClusterUpdateOfferBody.h" |
| #include "qpid/framing/ClusterUpdateRequestBody.h" |
| #include "qpid/framing/ClusterConnectionAnnounceBody.h" |
| #include "qpid/framing/ClusterErrorCheckBody.h" |
| #include "qpid/framing/ClusterTimerWakeupBody.h" |
| #include "qpid/framing/MessageTransferBody.h" |
| #include "qpid/log/Helpers.h" |
| #include "qpid/log/Statement.h" |
| #include "qpid/management/IdAllocator.h" |
| #include "qpid/management/ManagementAgent.h" |
| #include "qpid/memory.h" |
| #include "qpid/sys/Thread.h" |
| |
| #include <boost/shared_ptr.hpp> |
| #include <boost/bind.hpp> |
| #include <boost/cast.hpp> |
| #include <boost/current_function.hpp> |
| #include <algorithm> |
| #include <iterator> |
| #include <map> |
| #include <ostream> |
| |
| |
| namespace qpid { |
| namespace cluster { |
| using namespace qpid; |
| using namespace qpid::framing; |
| using namespace qpid::sys; |
| using namespace qpid::cluster; |
| using namespace framing::cluster; |
| using namespace std; |
| using management::ManagementAgent; |
| using management::ManagementObject; |
| using management::Manageable; |
| using management::Args; |
| namespace _qmf = ::qmf::org::apache::qpid::cluster; |
| |
| /** |
| * NOTE: must increment this number whenever any incompatible changes in |
| * cluster protocol/behavior are made. It allows early detection and |
| * sensible reporting of an attempt to mix different versions in a |
| * cluster. |
| * |
| * Currently use SVN revision to avoid clashes with versions from |
| * different branches. |
| */ |
| const uint32_t Cluster::CLUSTER_VERSION = 904565; |
| |
| struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { |
| qpid::cluster::Cluster& cluster; |
| MemberId member; |
| Cluster::Lock& l; |
| ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} |
| |
| void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } |
| |
| void initialStatus(uint32_t version, bool active, const Uuid& clusterId, |
| uint8_t storeState, const Uuid& shutdownId, |
| const std::string& firstConfig) |
| { |
| cluster.initialStatus( |
| member, version, active, clusterId, |
| framing::cluster::StoreState(storeState), shutdownId, |
| firstConfig, l); |
| } |
| void ready(const std::string& url) { |
| cluster.ready(member, url, l); |
| } |
| void configChange(const std::string& members, |
| const std::string& left, |
| const std::string& joined) |
| { |
| cluster.configChange(member, members, left, joined, l); |
| } |
| void updateOffer(uint64_t updatee) { |
| cluster.updateOffer(member, updatee, l); |
| } |
| void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } |
| void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } |
| void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { |
| cluster.errorCheck(member, type, frameSeq, l); |
| } |
| void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } |
| void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); } |
| |
| void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } |
| |
| bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } |
| }; |
| |
/**
 * Construct the cluster plugin and run the pre-initialization phase.
 *
 * After the members are initialized, the constructor:
 *  - hands a new ClusterTimer to the broker,
 *  - creates the management object when the broker has a management agent,
 *  - registers the failover and update exchanges,
 *  - loads the cluster store status unless the broker uses the null store,
 *  - joins the CPG group `name`,
 *  - dispatches CPG events on this thread until the state is no longer PRE_INIT.
 *
 * @param set cluster configuration settings.
 * @param b   the broker this cluster belongs to.
 */
Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
    settings(set),
    broker(b),
    mgmtObject(0),
    poller(b.getPoller()),
    cpg(*this),
    name(settings.name),
    myUrl(settings.url.empty() ? Url() : Url(settings.url)),
    self(cpg.self()),
    clusterId(true),   // Presumably a freshly generated UUID; replaced below by a stored ID if any.
    // NOTE(review): mcast is listed after expiryPolicy. Members are initialized
    // in declaration order (Cluster.h), not list order; confirm ExpiryPolicy only
    // keeps a reference to mcast and does not use it during construction.
    expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
    // The bound leave() callbacks below are presumably invoked on failure, so an
    // error in any of these components makes this member leave the cluster.
    mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
    dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
    deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
                      boost::bind(&Cluster::leave, this),
                      "Error decoding events, may indicate a broker version mismatch",
                      poller),
    deliverFrameQueue(boost::bind(&Cluster::deliveredFrame, this, _1),
                      boost::bind(&Cluster::leave, this),
                      "Error delivering frames",
                      poller),
    quorum(boost::bind(&Cluster::leave, this)),
    // Frames decoded from connection data are pushed to deliverFrameQueue.
    decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
    discarding(true),
    state(PRE_INIT),
    initMap(self, settings.size),
    store(broker.getDataDir().getPath()),
    elder(false),
    lastSize(0),
    lastBroker(false),
    updateRetracted(false),
    error(*this)
{
    // We give ownership of the timer to the broker and keep a plain pointer.
    // This is OK as it means the timer has the same lifetime as the broker.
    timer = new ClusterTimer(*this);
    broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer));

    mAgent = broker.getManagementAgent();
    if (mAgent != 0){
        _qmf::Package packageInit(mAgent);
        mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
        mAgent->addObject (mgmtObject);
        mgmtObject->set_status("JOINING");
    }

    // Failover exchange provides membership updates to clients.
    failoverExchange.reset(new FailoverExchange(this));
    broker.getExchanges().registerExchange(failoverExchange);

    // Update exchange is used during updates to replicate messages
    // without modifying delivery-properties.exchange.
    broker.getExchanges().registerExchange(
        boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));

    // Load my store status before we go into initialization
    if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
        store.load();
        if (store.getClusterId())
            clusterId = store.getClusterId(); // Use stored ID if there is one.
        QPID_LOG(notice, "Cluster store state: " << store)
    }
    cpg.join(name);
    // pump the CPG dispatch manually till we get past PRE_INIT.
    // initMapCompleted() moves PRE_INIT -> INIT once initial status has been
    // received from all members.
    while (state == PRE_INIT)
        cpg.dispatchOne();
}
| |
| Cluster::~Cluster() { |
| broker.setClusterTimer(std::auto_ptr<sys::Timer>(0)); // Delete cluster timer |
| if (updateThread.id()) updateThread.join(); // Join the previous updatethread. |
| } |
| |
/**
 * Second initialization phase, run after the constructor (see the overview at
 * the top of the file).
 *
 * It starts the quorum check when configured and derives this member's URL if
 * none was configured. It installs the known-brokers callback and expiry policy
 * on the broker, starts the dispatcher, both delivery queues and the
 * multicaster, and then processes the initial configuration. The broker
 * shutdown finalizer is registered last.
 */
void Cluster::initialize() {
    if (settings.quorum) quorum.start(poller);
    // No configured URL: use this host's IP addresses on the broker's TCP port.
    if (myUrl.empty())
        myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
    broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
    broker.setExpiryPolicy(expiryPolicy);
    dispatcher.start();
    // Leave the "bypass" mode used during pre-initialization (see overview).
    deliverEventQueue.bypassOff();
    deliverEventQueue.start();
    deliverFrameQueue.bypassOff();
    deliverFrameQueue.start();
    mcast.start();

    // Run initMapCompleted immediately to process the initial configuration.
    assert(state == INIT);
    // NOTE(review): binding a Lock& to a dereferenced null pointer is undefined
    // behaviour even if the reference is never used. The dispatcher and queues
    // were started above, so confirm this context really is single-threaded.
    initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context.

    // Add finalizer last for exception safety.
    broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
}
| |
| // Called in connection thread to insert a client connection. |
| void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { |
| assert(c->getId().getMember() == self); |
| localConnections.insert(c); |
| } |
| |
| // Called in connection thread to insert an updated shadow connection. |
| void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { |
| QPID_LOG(info, *this << " new shadow connection " << c->getId()); |
| // Safe to use connections here because we're pre-catchup, stalled |
| // and discarding, so deliveredFrame is not processing any |
| // connection events. |
| assert(discarding); |
| pair<ConnectionMap::iterator, bool> ib |
| = connections.insert(ConnectionMap::value_type(c->getId(), c)); |
| assert(ib.second); |
| } |
| |
| void Cluster::erase(const ConnectionId& id) { |
| Lock l(lock); |
| erase(id,l); |
| } |
| |
| // Called by Connection::deliverClose() in deliverFrameQueue thread. |
| void Cluster::erase(const ConnectionId& id, Lock&) { |
| QPID_LOG(info, *this << " connection closed " << id); |
| connections.erase(id); |
| decoder.erase(id); |
| } |
| |
| std::vector<string> Cluster::getIds() const { |
| Lock l(lock); |
| return getIds(l); |
| } |
| |
| std::vector<string> Cluster::getIds(Lock&) const { |
| return map.memberIds(); |
| } |
| |
| std::vector<Url> Cluster::getUrls() const { |
| Lock l(lock); |
| return getUrls(l); |
| } |
| |
| std::vector<Url> Cluster::getUrls(Lock&) const { |
| return map.memberUrls(); |
| } |
| |
| void Cluster::leave() { |
| Lock l(lock); |
| leave(l); |
| } |
| |
// Run STMT and log, rather than propagate, any std::exception it throws, so one
// failing step while leaving does not skip the remaining steps. Uses *this, so
// it is only usable inside Cluster member functions. The trailing empty
// do/while makes each use require a terminating semicolon.
#define LEAVE_TRY(STMT) try { STMT; } \
    catch (const std::exception& e) { \
        QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
    } do {} while(0)
| |
| void Cluster::leave(Lock&) { |
| if (state != LEFT) { |
| state = LEFT; |
| QPID_LOG(notice, *this << " leaving cluster " << name); |
| // Finalize connections now now to avoid problems later in destructor. |
| ClusterSafeScope css; // Don't trigger cluster-safe assertions. |
| LEAVE_TRY(localConnections.clear()); |
| LEAVE_TRY(connections.clear()); |
| LEAVE_TRY(broker::SignalHandler::shutdown()); |
| } |
| } |
| |
| // Deliver CPG message. |
| void Cluster::deliver( |
| cpg_handle_t /*handle*/, |
| const cpg_name* /*group*/, |
| uint32_t nodeid, |
| uint32_t pid, |
| void* msg, |
| int msg_len) |
| { |
| MemberId from(nodeid, pid); |
| framing::Buffer buf(static_cast<char*>(msg), msg_len); |
| Event e(Event::decodeCopy(from, buf)); |
| deliverEvent(e); |
| } |
| |
| void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); } |
| |
| void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); } |
| |
| const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { |
| return (body && body->getMethod() && |
| body->getMethod()->isA<ClusterUpdateOfferBody>()) ? |
| static_cast<const ClusterUpdateOfferBody*>(body) : 0; |
| } |
| |
| const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) { |
| return (body && body->getMethod() && |
| body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ? |
| static_cast<const ClusterConnectionAnnounceBody*>(body) : 0; |
| } |
| |
// Handler for deliverEventQueue.
// This thread decodes frames from events.
//
// A cluster event carries one control frame and is always forwarded to
// deliverFrameQueue. An update offer additionally stops deliverEventQueue.
// Connection events are ignored while discarding. Otherwise, control events
// are forwarded as-is and data events are fed to the decoder, which pushes
// complete frames to deliverFrame (see the constructor). If decoding fails,
// an abort control is queued for the offending connection.
void Cluster::deliveredEvent(const Event& e) {
    if (e.isCluster()) {
        EventFrame ef(e, e.getFrame());
        // Stop the deliverEventQueue on update offers.
        // This preserves the connection decoder fragments for an update.
        const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
        if (offer) {
            QPID_LOG(info, *this << " stall for update offer from " << e.getMemberId()
                     << " to " << MemberId(offer->getUpdatee()));
            deliverEventQueue.stop();
        }
        deliverFrame(ef);
    }
    else if(!discarding) {
        if (e.isControl())
            deliverFrame(EventFrame(e, e.getFrame()));
        else {
            try { decoder.decode(e, e.getData()); }
            catch (const Exception& ex) {
                // Close a connection that is sending us invalid data.
                QPID_LOG(error, *this << " aborting connection "
                         << e.getConnectionId() << ": " << ex.what());
                framing::AMQFrame abort((ClusterConnectionAbortBody()));
                deliverFrame(EventFrame(EventHeader(CONTROL, e.getConnectionId()), abort));
            }
        }
    }
}
| |
| void Cluster::flagError( |
| Connection& connection, ErrorCheck::ErrorType type, const std::string& msg) |
| { |
| Mutex::ScopedLock l(lock); |
| if (connection.isCatchUp()) { |
| QPID_LOG(critical, *this << " error on update connection " << connection |
| << ": " << msg); |
| leave(l); |
| } |
| error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); |
| } |
| |
// Handler for deliverFrameQueue.
// This thread executes the main logic.
//
// Runs under the cluster lock; frames are ignored once this member has left.
// While an error check is unresolved, frames go through the error checker and
// are executed only as it releases them. An update offer that arrives in that
// window cannot be honoured, so it is turned into a retract-offer and
// deliverEventQueue (stopped by deliveredEvent at the offer) is restarted.
void Cluster::deliveredFrame(const EventFrame& efConst) {
    Mutex::ScopedLock l(lock);
    sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
    if (state == LEFT) return;
    EventFrame e(efConst);
    const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
    if (offer && error.isUnresolved()) {
        // We can't honour an update offer that is delivered while an
        // error is in progress so replace it with a retractOffer and re-start
        // the event queue.
        e.frame = AMQFrame(
            ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
        deliverEventQueue.start();
    }
    // Process each frame through the error checker.
    if (error.isUnresolved()) {
        error.delivered(e);
        while (error.canProcess()) // There is a frame ready to process.
            processFrame(error.getNext(), l);
    }
    else
        processFrame(e, l);
}
| |
| |
| void Cluster::processFrame(const EventFrame& e, Lock& l) { |
| if (e.isCluster()) { |
| QPID_LOG(trace, *this << " DLVR: " << e); |
| ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); |
| if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) |
| throw Exception(QPID_MSG("Invalid cluster control")); |
| } |
| else if (state >= CATCHUP) { |
| map.incrementFrameSeq(); |
| ConnectionPtr connection = getConnection(e, l); |
| if (connection) { |
| QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); |
| connection->deliveredFrame(e); |
| } |
| else |
| QPID_LOG(trace, *this << " DROP (no connection): " << e); |
| } |
| else // Drop connection frames while state < CATCHUP |
| QPID_LOG(trace, *this << " DROP (joining): " << e); |
| } |
| |
| // Called in deliverFrameQueue thread |
| ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) { |
| ConnectionId id = e.connectionId; |
| ConnectionMap::iterator i = connections.find(id); |
| if (i != connections.end()) return i->second; |
| ConnectionPtr cp; |
| // If the frame is an announcement for a new connection, add it. |
| const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody()); |
| if (e.frame.getBody() && e.frame.getMethod() && announce) |
| { |
| if (id.getMember() == self) { // Announces one of my own |
| cp = localConnections.getErase(id); |
| assert(cp); |
| } |
| else { // New remote connection, create a shadow. |
| qpid::sys::SecuritySettings secSettings; |
| if (announce) { |
| secSettings.ssf = announce->getSsf(); |
| secSettings.authid = announce->getAuthid(); |
| secSettings.nodict = announce->getNodict(); |
| } |
| cp = new Connection(*this, shadowOut, announce->getManagementId(), id, secSettings); |
| } |
| connections.insert(ConnectionMap::value_type(id, cp)); |
| } |
| return cp; |
| } |
| |
| Cluster::ConnectionVector Cluster::getConnections(Lock&) { |
| ConnectionVector result(connections.size()); |
| std::transform(connections.begin(), connections.end(), result.begin(), |
| boost::bind(&ConnectionMap::value_type::second, _1)); |
| return result; |
| } |
| |
| // CPG config-change callback. |
| void Cluster::configChange ( |
| cpg_handle_t /*handle*/, |
| const cpg_name */*group*/, |
| const cpg_address *members, int nMembers, |
| const cpg_address *left, int nLeft, |
| const cpg_address *joined, int nJoined) |
| { |
| Mutex::ScopedLock l(lock); |
| string membersStr, leftStr, joinedStr; |
| // Encode members and enqueue as an event so the config change can |
| // be executed in the correct thread. |
| for (const cpg_address* p = members; p < members+nMembers; ++p) |
| membersStr.append(MemberId(*p).str()); |
| for (const cpg_address* p = left; p < left+nLeft; ++p) |
| leftStr.append(MemberId(*p).str()); |
| for (const cpg_address* p = joined; p < joined+nJoined; ++p) |
| joinedStr.append(MemberId(*p).str()); |
| deliverEvent(Event::control(ClusterConfigChangeBody( |
| ProtocolVersion(), membersStr, leftStr, joinedStr), |
| self)); |
| } |
| |
| void Cluster::setReady(Lock&) { |
| state = READY; |
| if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); |
| mcast.setReady(); |
| broker.getQueueEvents().enable(); |
| enableClusterSafe(); // Enable cluster-safe assertions. |
| } |
| |
/**
 * Act on a complete initial-status map. Also called directly by initialize()
 * to process the initial configuration.
 *
 * PRE_INIT (still in the constructor): decide whether the broker may recover
 * from its own store, then move to INIT.
 *
 * INIT: proceed in these steps:
 *  - wait until a newly-forming cluster reaches the configured size;
 *  - check consistency;
 *  - determine the elders and the cluster ID;
 *  - either request an update from the established cluster (state JOINER)
 *    or become READY immediately.
 */
void Cluster::initMapCompleted(Lock& l) {
    // Called on completion of the initial status map.
    QPID_LOG(debug, *this << " initial status map complete. ");
    if (state == PRE_INIT) {
        // PRE_INIT means we're still in the earlyInitialize phase, in the constructor.
        // We decide here whether we want to recover from our store.
        // We won't recover if we are joining an active cluster or our store is dirty.
        if (store.hasStore() &&
            store.getState() != STORE_STATE_EMPTY_STORE &&
            (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE))
            broker.setRecovery(false); // Ditch my current store.
        state = INIT;
    }
    else if (state == INIT) {
        // INIT means we are past Cluster::initialize().

        // If we're forming an initial cluster (no active members)
        // then we wait to reach the configured cluster-size
        if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) {
            QPID_LOG(info, *this << initMap.getActualSize()
                     << " members, waiting for at least " << initMap.getRequiredSize());
            return;
        }
        initMap.checkConsistent();

        // With no elders this member becomes the elder; otherwise it stays
        // passive for links and disables queue events.
        elders = initMap.getElders();
        QPID_LOG(debug, *this << " elders: " << elders);
        if (elders.empty())
            becomeElder(l);
        else {
            broker.getLinks().setPassive(true);
            broker.getQueueEvents().disable();
            QPID_LOG(info, *this << " not active for links.");
        }
        setClusterId(initMap.getClusterId(), l);
        if (store.hasStore()) store.dirty(clusterId);

        if (initMap.isUpdateNeeded())  { // Joining established cluster.
            broker.setRecovery(false); // Ditch my current store.
            broker.setClusterUpdatee(true);
            if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update.
            state = JOINER;
            mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
            QPID_LOG(notice, *this << " joining cluster " << name);
        }
        else {                  // I can go ready.
            discarding = false;
            map.resetConfigSeq(); // Start from config-seq = 0
            store.setConfigSeq(map.getConfigSeq());
            setReady(l);
            memberUpdate(l);
            mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
            QPID_LOG(notice, *this << " joined cluster " << name);
        }
    }
}
| |
/**
 * Execute a membership change in cluster order. The config-change control
 * comes from the CPG configChange callback; the sending member is unused.
 *
 * Steps:
 *  - Prune elders that are no longer members. If the state is strictly
 *    between INIT and CATCHUP and no elder is left, leave the cluster.
 *  - Record the new membership in the cluster map and the store's config seq.
 *  - Update the initial-status map, re-multicast this member's initial status
 *    when required, and run initMapCompleted() when the map becomes complete.
 *  - Once at CATCHUP or later, a real membership change calls memberUpdate(),
 *    and this member becomes elder if no elder remains.
 */
void Cluster::configChange(const MemberId&,
                           const std::string& membersStr,
                           const std::string& leftStr,
                           const std::string& joinedStr,
                           Lock& l)
{
    if (state == LEFT) return;
    MemberSet members = decodeMemberSet(membersStr);
    MemberSet left = decodeMemberSet(leftStr);
    MemberSet joined = decodeMemberSet(joinedStr);
    QPID_LOG(notice, *this << " Membership update " << map.getConfigSeq() << ": "
             << members);
    QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left);
    QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined);

    // Keep only the elders that are still members.
    elders = intersection(elders, members);
    if (elders.empty() && INIT < state && state < CATCHUP) {
        QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
        leave(l);
        return;
    }
    bool memberChange = map.configChange(members);
    store.setConfigSeq(map.getConfigSeq());

    // Update initial status for members joining or leaving.
    initMap.configChange(members);
    if (initMap.isResendNeeded()) {
        // The "active" flag is true once this member is past INIT.
        mcast.mcastControl(
            ClusterInitialStatusBody(
                ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
                store.getState(), store.getShutdownId(),
                initMap.getFirstConfigStr()
            ),
            self);
    }
    if (initMap.transitionToComplete()) initMapCompleted(l);

    if (state >= CATCHUP && memberChange) {
        memberUpdate(l);
        if (elders.empty()) becomeElder(l);
    }
}
| |
| void Cluster::becomeElder(Lock&) { |
| if (elder) return; // We were already the elder. |
| // We are the oldest, reactive links if necessary |
| QPID_LOG(info, *this << " became the elder, active for links."); |
| elder = true; |
| broker.getLinks().setPassive(false); |
| timer->becomeElder(); |
| } |
| |
| void Cluster::makeOffer(const MemberId& id, Lock& ) { |
| if (state == READY && map.isJoiner(id)) { |
| state = OFFER; |
| QPID_LOG(info, *this << " send update-offer to " << id); |
| mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id), self); |
| } |
| } |
| |
| namespace { |
| struct AppendQueue { |
| ostream* os; |
| AppendQueue(ostream& o) : os(&o) {} |
| void operator()(const boost::shared_ptr<broker::Queue>& q) { |
| (*os) << " " << q->getName() << "=" << q->getMessageCount(); |
| } |
| }; |
| } // namespace |
| |
| // Log a snapshot of broker state, used for debugging inconsistency problems. |
| // May only be called in deliver thread. |
| std::string Cluster::debugSnapshot() { |
| assertClusterSafe(); |
| std::ostringstream msg; |
| msg << "queue snapshot at " << map.getFrameSeq() << ":"; |
| AppendQueue append(msg); |
| broker.getQueues().eachQueue(append); |
| return msg.str(); |
| } |
| |
// Called from Broker::~Broker when broker is shut down. At this
// point we know the poller has stopped so no poller callbacks will be
// invoked. We must ensure that CPG has also shut down so no CPG
// callbacks will be invoked.
//
// A failure to shut down CPG is logged, not propagated.
// NOTE: this deletes the Cluster object; nothing may touch it afterwards.
void Cluster::brokerShutdown()  {
    sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
    try { cpg.shutdown(); }
    catch (const std::exception& e) {
        QPID_LOG(error, *this << " shutting down CPG: " << e.what());
    }
    delete this;
}
| |
| void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) { |
| map.updateRequest(id, url); |
| makeOffer(id, l); |
| } |
| |
| void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, |
| const framing::Uuid& id, |
| framing::cluster::StoreState store, |
| const framing::Uuid& shutdownId, |
| const std::string& firstConfig, |
| Lock& l) |
| { |
| if (version != CLUSTER_VERSION) { |
| QPID_LOG(critical, *this << " incompatible cluster versions " << |
| version << " != " << CLUSTER_VERSION); |
| leave(l); |
| return; |
| } |
| QPID_LOG_IF(debug, state == PRE_INIT, *this |
| << " received initial status from " << member); |
| initMap.received( |
| member, |
| ClusterInitialStatusBody(ProtocolVersion(), version, active, id, |
| store, shutdownId, firstConfig) |
| ); |
| if (initMap.transitionToComplete()) initMapCompleted(l); |
| } |
| |
| void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { |
| try { |
| if (map.ready(id, Url(url))) |
| memberUpdate(l); |
| if (state == CATCHUP && id == self) { |
| setReady(l); |
| QPID_LOG(notice, *this << " caught up."); |
| } |
| } catch (const Url::Invalid& e) { |
| QPID_LOG(error, "Invalid URL in cluster ready command: " << url); |
| } |
| } |
| |
/**
 * Handle an update-offer control from updater to the updatee encoded in
 * updateeInt.
 *
 * map.updateOffer() yields the updatee's URL when this offer is the one to act
 * on. NOTE(review): inferred from the "My offer was first" branches below.
 *  - We are the updater: start the update if our offer won. Otherwise return
 *    to READY, possibly offer to the next joiner, and unstall the event queue.
 *  - We are the updatee and the offer is good: become UPDATEE and check
 *    whether the incoming update has already completed.
 *  - Otherwise we are not involved: unstall the event queue.
 * Whenever some other member is being updated, a queue snapshot is logged at
 * debug level and the management agent is told about the update.
 */
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
    // NOTE: deliverEventQueue has been stopped at the update offer by
    // deliveredEvent in case an update is required.
    if (state == LEFT) return;
    MemberId updatee(updateeInt);
    boost::optional<Url> url = map.updateOffer(updater, updatee);
    if (updater == self) {
        assert(state == OFFER);
        if (url)                // My offer was first.
            updateStart(updatee, *url, l);
        else {                  // Another offer was first.
            QPID_LOG(info, *this << " cancelled offer to " << updatee << " unstall");
            setReady(l);
            makeOffer(map.firstJoiner(), l); // Maybe make another offer.
            deliverEventQueue.start(); // Go back to normal processing
        }
    }
    else if (updatee == self && url) {
        assert(state == JOINER);
        state = UPDATEE;
        QPID_LOG(notice, *this << " receiving update from " << updater);
        checkUpdateIn(l);
    }
    else {
        QPID_LOG(debug,*this << " unstall, ignore update " << updater
                 << " to " << updatee);
        deliverEventQueue.start(); // Not involved in update.
    }
    if (updatee != self && url) {
        QPID_LOG(debug, debugSnapshot());
        if (mAgent) mAgent->clusterUpdate();
        // Updatee will call clusterUpdate when update completes
    }
}
| |
| static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) { |
| client::ConnectionSettings cs; |
| cs.username = settings.username; |
| cs.password = settings.password; |
| cs.mechanism = settings.mechanism; |
| return cs; |
| } |
| |
/**
 * Handle a retract-offer control. deliveredFrame() produces it when an update
 * offer arrives while an error check is unresolved.
 *
 * If we were the offering updater, we start a RetractClient thread when our
 * offer would have won; it presumably tells the updatee the update will not
 * happen (TODO confirm in RetractClient). We then return to READY and may
 * offer to the next joiner. Other members only log the retraction.
 */
void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
    // An offer was received while handling an error, and converted to a retract.
    // Behavior is very similar to updateOffer.
    if (state == LEFT) return;
    MemberId updatee(updateeInt);
    boost::optional<Url> url = map.updateOffer(updater, updatee);
    if (updater == self) {
        assert(state == OFFER);
        if (url)  {             // My offer was first.
            if (updateThread.id())
                updateThread.join(); // Join the previous updateThread to avoid leaks.
            updateThread = Thread(new RetractClient(*url, connectionSettings(settings)));
        }
        setReady(l);
        makeOffer(map.firstJoiner(), l); // Maybe make another offer.
        // Don't unstall the event queue, that was already done in deliveredFrame
    }
    QPID_LOG(debug,*this << " retracted offer " << updater << " to " << updatee);
}
| |
/**
 * Begin sending an update to updatee at url: move from OFFER to UPDATER and
 * run an UpdateClient in a new update thread. The thread receives the current
 * connections and the decoder. Completion or failure is reported back through
 * updateOutDone / updateOutError.
 */
void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
    // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent.
    if (state == LEFT) return;
    assert(state == OFFER);
    state = UPDATER;
    QPID_LOG(notice, *this << " sending update to " << updatee << " at " << url);
    if (updateThread.id())
        updateThread.join(); // Join the previous updateThread to avoid leaks.
    updateThread = Thread(
        new UpdateClient(self, updatee, url, broker, map, *expiryPolicy,
                         getConnections(l), decoder,
                         boost::bind(&Cluster::updateOutDone, this),
                         boost::bind(&Cluster::updateOutError, this, _1),
                         connectionSettings(settings)));
}
| |
| // Called in update thread. |
| void Cluster::updateInDone(const ClusterMap& m) { |
| Lock l(lock); |
| updatedMap = m; |
| checkUpdateIn(l); |
| } |
| |
| void Cluster::updateInRetracted() { |
| Lock l(lock); |
| updateRetracted = true; |
| map.clearStatus(); |
| checkUpdateIn(l); |
| } |
| |
| void Cluster::checkUpdateIn(Lock& l) { |
| if (state != UPDATEE) return; // Wait till we reach the stall point. |
| if (updatedMap) { // We're up to date |
| map = *updatedMap; |
| failoverExchange->setUrls(getUrls(l)); |
| mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); |
| state = CATCHUP; |
| broker.setClusterUpdatee(false); |
| if (mAgent) mAgent->suppress(false); // Enable management output. |
| discarding = false; // ok to set, we're stalled for update. |
| QPID_LOG(notice, *this << " update complete, starting catch-up."); |
| QPID_LOG(debug, debugSnapshot()); |
| if (mAgent) mAgent->clusterUpdate(); |
| deliverEventQueue.start(); |
| } |
| else if (updateRetracted) { // Update was retracted, request another update |
| updateRetracted = false; |
| state = JOINER; |
| QPID_LOG(notice, *this << " update retracted, sending new update request."); |
| mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); |
| deliverEventQueue.start(); |
| } |
| } |
| |
| void Cluster::updateOutDone() { |
| Monitor::ScopedLock l(lock); |
| updateOutDone(l); |
| } |
| |
| void Cluster::updateOutDone(Lock& l) { |
| QPID_LOG(notice, *this << " update sent"); |
| assert(state == UPDATER); |
| state = READY; |
| deliverEventQueue.start(); // Start processing events again. |
| makeOffer(map.firstJoiner(), l); // Try another offer |
| } |
| |
| void Cluster::updateOutError(const std::exception& e) { |
| Monitor::ScopedLock l(lock); |
| QPID_LOG(error, *this << " error sending update: " << e.what()); |
| updateOutDone(l); |
| } |
| |
| void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) { |
| QPID_LOG(notice, *this << " cluster shut down by administrator."); |
| if (store.hasStore()) store.clean(Uuid(id)); |
| leave(l); |
| } |
| |
| ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } |
| |
| Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) { |
| Lock l(lock); |
| QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); |
| switch (methodId) { |
| case _qmf::Cluster::METHOD_STOPCLUSTERNODE : |
| { |
| _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; |
| stringstream stream; |
| stream << self; |
| if (iargs.i_brokerId == stream.str()) |
| stopClusterNode(l); |
| } |
| break; |
| case _qmf::Cluster::METHOD_STOPFULLCLUSTER : |
| stopFullCluster(l); |
| break; |
| default: |
| return Manageable::STATUS_UNKNOWN_METHOD; |
| } |
| return Manageable::STATUS_OK; |
| } |
| |
| void Cluster::stopClusterNode(Lock& l) { |
| QPID_LOG(notice, *this << " cluster member stopped by administrator."); |
| leave(l); |
| } |
| |
| void Cluster::stopFullCluster(Lock& ) { |
| QPID_LOG(notice, *this << " shutting down cluster " << name); |
| mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self); |
| } |
| |
| void Cluster::memberUpdate(Lock& l) { |
| QPID_LOG(info, *this << " member update: " << map); |
| std::vector<Url> urls = getUrls(l); |
| std::vector<string> ids = getIds(l); |
| size_t size = urls.size(); |
| failoverExchange->updateUrls(urls); |
| |
| if (store.hasStore()) { |
| // Mark store clean if I am the only broker, dirty otherwise. |
| if (size == 1 ) { |
| if (!store.isClean()) store.clean(Uuid(true)); |
| } else { |
| if (!store.isDirty()) store.dirty(clusterId); |
| } |
| } |
| |
| if (size == 1 && lastSize > 1 && state >= CATCHUP) { |
| QPID_LOG(notice, *this << " last broker standing, update queue policies"); |
| lastBroker = true; |
| broker.getQueues().updateQueueClusterState(true); |
| } |
| else if (size > 1 && lastBroker) { |
| QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); |
| lastBroker = false; |
| broker.getQueues().updateQueueClusterState(false); |
| } |
| lastSize = size; |
| |
| if (mgmtObject) { |
| mgmtObject->set_clusterSize(size); |
| string urlstr; |
| for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) { |
| if (iter != urls.begin()) urlstr += ";"; |
| urlstr += iter->str(); |
| } |
| string idstr; |
| for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) { |
| if (iter != ids.begin()) idstr += ";"; |
| idstr += (*iter); |
| } |
| mgmtObject->set_members(urlstr); |
| mgmtObject->set_memberIDs(idstr); |
| } |
| |
| // Close connections belonging to members that have left the cluster. |
| ConnectionMap::iterator i = connections.begin(); |
| while (i != connections.end()) { |
| ConnectionMap::iterator j = i++; |
| MemberId m = j->second->getId().getMember(); |
| if (m != self && !map.isMember(m)) { |
| j->second->getBrokerConnection().closed(); |
| erase(j->second->getId(), l); |
| } |
| } |
| } |
| |
| std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { |
| static const char* STATE[] = { |
| "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", |
| "READY", "OFFER", "UPDATER", "LEFT" |
| }; |
| assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); |
| o << "cluster(" << cluster.self << " " << STATE[cluster.state]; |
| if (cluster.error.isUnresolved()) o << "/error"; |
| return o << ")";; |
| } |
| |
| MemberId Cluster::getId() const { |
| return self; // Immutable, no need to lock. |
| } |
| |
| broker::Broker& Cluster::getBroker() const { |
| return broker; // Immutable, no need to lock. |
| } |
| |
| void Cluster::setClusterId(const Uuid& uuid, Lock&) { |
| clusterId = uuid; |
| if (mgmtObject) { |
| stringstream stream; |
| stream << self; |
| mgmtObject->set_clusterID(clusterId.str()); |
| mgmtObject->set_memberID(stream.str()); |
| } |
| QPID_LOG(debug, *this << " cluster-uuid = " << clusterId); |
| } |
| |
| void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { |
| expiryPolicy->deliverExpire(id); |
| } |
| |
| void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) { |
| // If we see an errorCheck here (rather than in the ErrorCheck |
| // class) then we have processed succesfully past the point of the |
| // error. |
| if (state >= CATCHUP) // Don't respond pre catchup, we don't know what happened |
| error.respondNone(from, type, frameSeq); |
| } |
| |
| void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { |
| if (state >= CATCHUP) // Pre catchup our timer isn't set up. |
| timer->deliverWakeup(name); |
| } |
| |
| void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { |
| QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) |
| if (state >= CATCHUP) // Pre catchup our timer isn't set up. |
| timer->deliverDrop(name); |
| } |
| |
| bool Cluster::isElder() const { |
| Monitor::ScopedLock l(lock); |
| return elder; |
| } |
| |
| }} // namespace qpid::cluster |