| #ifndef QPID_CLUSTER_CLUSTER_H |
| #define QPID_CLUSTER_CLUSTER_H |
| |
| /* |
| * |
| * Copyright (c) 2006 The Apache Software Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| #include "ClusterMap.h" |
| #include "ClusterSettings.h" |
| #include "Cpg.h" |
| #include "Decoder.h" |
| #include "ErrorCheck.h" |
| #include "Event.h" |
| #include "EventFrame.h" |
| #include "ExpiryPolicy.h" |
| #include "FailoverExchange.h" |
| #include "InitialStatusMap.h" |
| #include "LockedConnectionMap.h" |
| #include "Multicaster.h" |
| #include "NoOpConnectionOutputHandler.h" |
| #include "PollableQueue.h" |
| #include "PollerDispatch.h" |
| #include "Quorum.h" |
| #include "StoreStatus.h" |
| #include "UpdateReceiver.h" |
| |
| #include "qmf/org/apache/qpid/cluster/Cluster.h" |
| #include "qpid/Url.h" |
| #include "qpid/broker/Broker.h" |
| #include "qpid/management/Manageable.h" |
| #include "qpid/sys/Monitor.h" |
| |
| #include <boost/bind.hpp> |
| #include <boost/intrusive_ptr.hpp> |
| #include <boost/optional.hpp> |
| |
| #include <algorithm> |
| #include <map> |
| #include <vector> |
| |
| namespace qpid { |
| |
| namespace broker { |
| class Message; |
| class AclModule; |
| } |
| |
| namespace framing { |
| class AMQFrame; |
| class AMQBody; |
| struct Uuid; |
| } |
| |
| namespace sys { |
| class Timer; |
| class AbsTime; |
| class Duration; |
| } |
| |
| namespace cluster { |
| |
| class Connection; |
| struct EventFrame; |
| class ClusterTimer; |
| class UpdateDataExchange; |
| |
| /** |
| * Connection to the cluster |
| */ |
| class Cluster : private Cpg::Handler, public management::Manageable { |
| public: |
| typedef boost::intrusive_ptr<Connection> ConnectionPtr; |
| typedef std::vector<ConnectionPtr> ConnectionVector; |
| |
| // Public functions are thread safe unless otherwise mentioned in a comment. |
| |
| // Construct the cluster in plugin earlyInitialize. |
| Cluster(const ClusterSettings&, broker::Broker&); |
| virtual ~Cluster(); |
| |
| // Called by plugin initialize: cluster start-up requires transport plugins . |
| // Thread safety: only called by plugin initialize. |
| void initialize(); |
| |
| // Connection map. |
| void addLocalConnection(const ConnectionPtr&); |
| void addShadowConnection(const ConnectionPtr&); |
| void erase(const ConnectionId&); |
| |
| // URLs of current cluster members. |
| std::vector<std::string> getIds() const; |
| std::vector<Url> getUrls() const; |
| boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; } |
| |
| // Leave the cluster - called when fatal errors occur. |
| void leave(); |
| |
| // Update completed - called in update thread |
| void updateInClosed(); |
| void updateInDone(const ClusterMap&); |
| void updateInRetracted(); |
| // True if we are expecting to receive catch-up connections. |
| bool isExpectingUpdate(); |
| |
| MemberId getId() const; |
| broker::Broker& getBroker() const; |
| Multicaster& getMulticast() { return mcast; } |
| |
| const ClusterSettings& getSettings() const { return settings; } |
| |
| void deliverFrame(const EventFrame&); |
| |
| // Called in deliverFrame thread to indicate an error from the broker. |
| void flagError(Connection&, ErrorCheck::ErrorType, const std::string& msg); |
| |
| // Called only during update by Connection::shadowReady |
| Decoder& getDecoder() { return decoder; } |
| |
| ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; } |
| |
| UpdateReceiver& getUpdateReceiver() { return updateReceiver; } |
| |
| bool isElder() const; |
| |
| // Generates a log message for debugging purposes. |
| std::string debugSnapshot(); |
| |
| // Defer messages delivered in an unsafe context by multicasting. |
| bool deferDeliveryImpl(const std::string& queue, |
| const boost::intrusive_ptr<broker::Message>& msg); |
| |
| sys::AbsTime getClusterTime(); |
| void sendClockUpdate(); |
| void clock(const uint64_t time); |
| |
| static bool loggable(const framing::AMQFrame&); // True if the frame should be logged. |
| |
| private: |
| typedef sys::Monitor::ScopedLock Lock; |
| |
| typedef PollableQueue<Event> PollableEventQueue; |
| typedef PollableQueue<EventFrame> PollableFrameQueue; |
| typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap; |
| |
| /** Version number of the cluster protocol, to avoid mixed versions. */ |
| static const uint32_t CLUSTER_VERSION; |
| |
| // NB: A dummy Lock& parameter marks functions that must only be |
| // called with Cluster::lock locked. |
| |
| void leave(Lock&); |
| std::vector<std::string> getIds(Lock&) const; |
| std::vector<Url> getUrls(Lock&) const; |
| |
| // == Called in main thread from Broker destructor. |
| void brokerShutdown(); |
| |
| // == Called in deliverEventQueue thread |
| void deliveredEvent(const Event&); |
| |
| // == Called in deliverFrameQueue thread |
| void deliveredFrame(const EventFrame&); |
| void processFrame(const EventFrame&, Lock&); |
| |
| // Cluster controls implement XML methods from cluster.xml. |
| void updateRequest(const MemberId&, const std::string&, Lock&); |
| void updateOffer(const MemberId& updater, uint64_t updatee, Lock&); |
| void retractOffer(const MemberId& updater, uint64_t updatee, Lock&); |
| void initialStatus(const MemberId&, |
| uint32_t version, |
| bool active, |
| const framing::Uuid& clusterId, |
| framing::cluster::StoreState, |
| const framing::Uuid& shutdownId, |
| const std::string& firstConfig, |
| Lock&); |
| void ready(const MemberId&, const std::string&, Lock&); |
| void configChange(const MemberId&, |
| const std::string& members, |
| const std::string& left, |
| const std::string& joined, |
| Lock& l); |
| void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); |
| void timerWakeup(const MemberId&, const std::string& name, Lock&); |
| void timerDrop(const MemberId&, const std::string& name, Lock&); |
| void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); |
| void deliverToQueue(const std::string& queue, const std::string& message, Lock&); |
| void clock(const uint64_t time, Lock&); |
| |
| // Helper functions |
| ConnectionPtr getConnection(const EventFrame&, Lock&); |
| ConnectionVector getConnections(Lock&); |
| void updateStart(const MemberId& updatee, const Url& url, Lock&); |
| void makeOffer(const MemberId&, Lock&); |
| void setReady(Lock&); |
| void memberUpdate(Lock&); |
| void setClusterId(const framing::Uuid&, Lock&); |
| void erase(const ConnectionId&, Lock&); |
| void requestUpdate(Lock& ); |
| void initMapCompleted(Lock&); |
| void becomeElder(Lock&); |
| void setMgmtStatus(Lock&); |
| void updateMgmtMembership(Lock&); |
| |
| // == Called in CPG dispatch thread |
| void deliver( // CPG deliver callback. |
| cpg_handle_t /*handle*/, |
| const struct cpg_name *group, |
| uint32_t /*nodeid*/, |
| uint32_t /*pid*/, |
| void* /*msg*/, |
| int /*msg_len*/); |
| |
| void deliverEvent(const Event&); |
| |
| void configChange( // CPG config change callback. |
| cpg_handle_t /*handle*/, |
| const struct cpg_name */*group*/, |
| const struct cpg_address */*members*/, int /*nMembers*/, |
| const struct cpg_address */*left*/, int /*nLeft*/, |
| const struct cpg_address */*joined*/, int /*nJoined*/ |
| ); |
| |
| // == Called in management threads. |
| virtual qpid::management::ManagementObject* GetManagementObject() const; |
| virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); |
| |
| void stopClusterNode(Lock&); |
| void stopFullCluster(Lock&); |
| |
| // == Called in connection IO threads . |
| void checkUpdateIn(Lock&); |
| |
| // == Called in UpdateClient thread. |
| void updateOutDone(); |
| void updateOutError(const std::exception&); |
| void updateOutDone(Lock&); |
| |
| // Immutable members set on construction, never changed. |
| const ClusterSettings settings; |
| broker::Broker& broker; |
| qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle |
| boost::shared_ptr<sys::Poller> poller; |
| Cpg cpg; |
| const std::string name; |
| Url myUrl; |
| const MemberId self; |
| framing::Uuid clusterId; |
| NoOpConnectionOutputHandler shadowOut; |
| qpid::management::ManagementAgent* mAgent; |
| boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; |
| |
| // Thread safe members |
| Multicaster mcast; |
| PollerDispatch dispatcher; |
| PollableEventQueue deliverEventQueue; |
| PollableFrameQueue deliverFrameQueue; |
| boost::shared_ptr<FailoverExchange> failoverExchange; |
| boost::shared_ptr<UpdateDataExchange> updateDataExchange; |
| Quorum quorum; |
| LockedConnectionMap localConnections; |
| |
| // Used only in deliverEventQueue thread or when stalled for update. |
| Decoder decoder; |
| bool discarding; |
| |
| |
| // Remaining members are protected by lock. |
| mutable sys::Monitor lock; |
| |
| |
| // Local cluster state, cluster map |
| enum { |
| PRE_INIT,///< Have not yet received complete initial status map. |
| INIT, ///< Waiting to reach cluster-size. |
| JOINER, ///< Sent update request, waiting for update offer. |
| UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. |
| CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. |
| READY, ///< Fully operational |
| OFFER, ///< Sent an offer, waiting for accept/reject. |
| UPDATER, ///< Offer accepted, sending a state update. |
| LEFT ///< Final state, left the cluster. |
| } state; |
| |
| ConnectionMap connections; |
| InitialStatusMap initMap; |
| StoreStatus store; |
| ClusterMap map; |
| MemberSet elders; |
| bool elder; |
| size_t lastAliveCount; |
| bool lastBroker; |
| sys::Thread updateThread; |
| boost::optional<ClusterMap> updatedMap; |
| bool updateRetracted, updateClosed; |
| ErrorCheck error; |
| UpdateReceiver updateReceiver; |
| ClusterTimer* timer; |
| sys::Timer clockTimer; |
| sys::AbsTime clusterTime; |
| sys::Duration clusterTimeOffset; |
| broker::AclModule* acl; |
| |
| friend std::ostream& operator<<(std::ostream&, const Cluster&); |
| friend struct ClusterDispatcher; |
| }; |
| |
| }} // namespace qpid::cluster |
| |
| |
| |
| #endif /*!QPID_CLUSTER_CLUSTER_H*/ |