| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "qpid/amqp_0_10/Codecs.h" |
| #include "Connection.h" |
| #include "UpdateClient.h" |
| #include "Cluster.h" |
| #include "UpdateReceiver.h" |
| #include "qpid/assert.h" |
| #include "qpid/broker/DtxAck.h" |
| #include "qpid/broker/DtxBuffer.h" |
| #include "qpid/broker/SessionState.h" |
| #include "qpid/broker/SemanticState.h" |
| #include "qpid/broker/TxBuffer.h" |
| #include "qpid/broker/TxPublish.h" |
| #include "qpid/broker/TxAccept.h" |
| #include "qpid/broker/RecoveredEnqueue.h" |
| #include "qpid/broker/RecoveredDequeue.h" |
| #include "qpid/broker/Exchange.h" |
| #include "qpid/broker/Fairshare.h" |
| #include "qpid/broker/Link.h" |
| #include "qpid/broker/Bridge.h" |
| #include "qpid/broker/StatefulQueueObserver.h" |
| #include "qpid/broker/Queue.h" |
| #include "qpid/framing/enum.h" |
| #include "qpid/framing/AMQFrame.h" |
| #include "qpid/framing/AllInvoker.h" |
| #include "qpid/framing/DeliveryProperties.h" |
| #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" |
| #include "qpid/framing/ClusterConnectionAnnounceBody.h" |
| #include "qpid/framing/ConnectionCloseBody.h" |
| #include "qpid/framing/ConnectionCloseOkBody.h" |
| #include "qpid/log/Statement.h" |
| #include "qpid/sys/ClusterSafe.h" |
| #include "qpid/types/Variant.h" |
| #include "qpid/management/ManagementAgent.h" |
| #include <boost/current_function.hpp> |
| |
| |
| namespace qpid { |
| namespace cluster { |
| |
| using namespace framing; |
| using namespace framing::cluster; |
| using amqp_0_10::ListCodec; |
| using types::Variant; |
| |
// Counter used to allocate ids for catch-up connections.  It starts in a
// distinct high range so catch-up ids cannot collide with ordinary local
// connection ids taken from idCounter below.
qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);

// Shared do-nothing frame handler used by init() to discard cluster-order
// frames on shadow/catch-up connections.
Connection::NullFrameHandler Connection::nullFrameHandler;

// NOTE(review): this namespace-scope NullFrameHandler appears unused in this
// file (init() uses Connection::nullFrameHandler above); it looks like a
// leftover duplicate — confirm nothing else references it before removing.
struct NullFrameHandler : public framing::FrameHandler {
    void handle(framing::AMQFrame&) {}
};


namespace {
// Allocates the per-member unique part of each new local connection's id.
sys::AtomicValue<uint64_t> idCounter;
const std::string shadowPrefix("[shadow]");
}
| |
| |
| // Shadow connection |
| Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, |
| const std::string& mgmtId, |
| const ConnectionId& id, const qpid::sys::SecuritySettings& external) |
| : cluster(c), self(id), catchUp(false), announced(false), output(*this, out), |
| connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), |
| expectProtocolHeader(false), |
| mcastFrameHandler(cluster.getMulticast(), self), |
| updateIn(c.getUpdateReceiver()), |
| secureConnection(0) |
| {} |
| |
| // Local connection |
| Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, |
| const std::string& mgmtId, MemberId member, |
| bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external |
| ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out), |
| connectionCtor(&output, cluster.getBroker(), |
| mgmtId, |
| external, |
| isLink, |
| isCatchUp ? ++catchUpId : 0, |
| // The first catch-up connection is not considered a shadow |
| // as it needs to be authenticated. |
| isCatchUp && self.second > 1), |
| expectProtocolHeader(isLink), |
| mcastFrameHandler(cluster.getMulticast(), self), |
| updateIn(c.getUpdateReceiver()), |
| secureConnection(0) |
| { |
| if (isLocalClient()) { |
| giveReadCredit(cluster.getSettings().readMax); // Flow control |
| // Delay adding the connection to the management map until announce() |
| connectionCtor.delayManagement = true; |
| } |
| else { |
| // Catch-up shadow connections initialized using nextShadow id. |
| assert(catchUp); |
| if (!updateIn.nextShadowMgmtId.empty()) |
| connectionCtor.mgmtId = updateIn.nextShadowMgmtId; |
| updateIn.nextShadowMgmtId.clear(); |
| } |
| init(); |
| QPID_LOG(debug, cluster << " local connection " << *this); |
| } |
| |
| void Connection::setSecureConnection(broker::SecureConnection* sc) { |
| secureConnection = sc; |
| if (connection.get()) connection->setSecureConnection(sc); |
| } |
| |
// Construct the underlying broker::Connection and wire up its cluster
// plumbing.  Called from the local constructor, and from announce() for
// shadow connections.
void Connection::init() {
    connection = connectionCtor.construct();
    if (isLocalClient()) {
        // TLS/SASL security layer must be attached before any frames flow.
        if (secureConnection) connection->setSecureConnection(secureConnection);
        // Actively send cluster-order frames from local node
        connection->setClusterOrderOutput(mcastFrameHandler);
    }
    else { // Shadow or catch-up connection
        // Passive, discard cluster-order frames
        connection->setClusterOrderOutput(nullFrameHandler);
    }
    // Catch-up connections are not error-replicated; see also shadowReady().
    if (!isCatchUp())
        connection->setErrorListener(this);
}
| |
| // Called when we have consumed a read buffer to give credit to the |
| // connection layer to continue reading. |
| void Connection::giveReadCredit(int credit) { |
| if (cluster.getSettings().readMax && credit) |
| output.giveReadCredit(credit); |
| } |
| |
// Handle the self-delivered announce control that replicates a new local
// connection to every member.  On the originating member the connection
// already exists (only management registration was deferred); on other
// members it is a shadow that must be created and fed the saved initial
// negotiation frames.
void Connection::announce(
    const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict,
    const std::string& username, const std::string& initialFrames)
{
    // Sanity: announced parameters must match what this member constructed.
    QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
    QPID_ASSERT(ssf == connectionCtor.external.ssf);
    QPID_ASSERT(authid == connectionCtor.external.authid);
    QPID_ASSERT(nodict == connectionCtor.external.nodict);
    // Local connections are already initialized but with management delayed.
    if (isLocalClient()) {
        connection->addManagementObject();
    }
    else if (isShadow()) {
        init();
        // Play initial frames into the connection.
        Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
        AMQFrame frame;
        while (frame.decode(buf))
            connection->received(frame);
        connection->setUserId(username);
    }
    // Do management actions now that the connection is replicated.
    connection->raiseConnectEvent();
    QPID_LOG(debug, cluster << " replicated connection " << *this);
}
| |
// Destructor: detach the error listener first so the broker connection
// cannot call back into this object while being torn down.
Connection::~Connection() {
    if (connection.get()) connection->setErrorListener(0);
    // Don't trigger cluster-safe asserts in broker:: ~Connection as
    // it may be called in an IO thread context during broker
    // shutdown.
    sys::ClusterSafeScope css;
    connection.reset();
}
| |
// Delegate output-task processing to the per-connection output interceptor.
// Returns true if there may be more output work to do.
bool Connection::doOutput() {
    return output.doOutput();
}
| |
| // Received from a directly connected client. |
| void Connection::received(framing::AMQFrame& f) { |
| if (!connection.get()) { |
| QPID_LOG(warning, cluster << " ignoring frame on closed connection " |
| << *this << ": " << f); |
| return; |
| } |
| QPID_LOG_IF(trace, Cluster::loggable(f), cluster << " RECV " << *this << ": " << f); |
| if (isLocal()) { // Local catch-up connection. |
| currentChannel = f.getChannel(); |
| if (!framing::invoke(*this, *f.getBody()).wasHandled()) |
| connection->received(f); |
| } |
| else { // Shadow or updated catch-up connection. |
| if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { |
| if (isShadow()) |
| cluster.addShadowConnection(this); |
| AMQFrame ok((ConnectionCloseOkBody())); |
| connection->getOutput().send(ok); |
| output.closeOutput(); |
| catchUp = false; |
| } |
| else |
| QPID_LOG(warning, cluster << " ignoring unexpected frame " << *this << ": " << f); |
| } |
| } |
| |
// Hook for rejecting AMQP commands the cluster cannot replicate.
// Returns true if the body was rejected (and therefore handled).
bool Connection::checkUnsupported(const AMQBody&) {
    // Throw an exception for unsupported commands. Currently all are supported.
    return false;
}
| |
// RAII helper: returns read credit to the connection when the enclosing
// scope exits (normally or via exception).  Callers may zero `credit`
// before exit to cancel the refund (see Connection::decode).
struct GiveReadCreditOnExit {
    Connection& connection;
    int credit;
    GiveReadCreditOnExit(Connection& connection_, int credit_) :
        connection(connection_), credit(credit_) {}
    ~GiveReadCreditOnExit() { if (credit) connection.giveReadCredit(credit); }
};
| |
// Cluster-delivered request to run output tasks, bounded by `limit`.
void Connection::deliverDoOutput(uint32_t limit) {
    output.deliverDoOutput(limit);
}
| |
| // Called in delivery thread, in cluster order. |
| void Connection::deliveredFrame(const EventFrame& f) { |
| GiveReadCreditOnExit gc(*this, f.readCredit); |
| assert(!catchUp); |
| currentChannel = f.frame.getChannel(); |
| if (f.frame.getBody() // frame can be emtpy with just readCredit |
| && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. |
| && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. |
| { |
| if (f.type == DATA) // incoming data frames to broker::Connection |
| connection->received(const_cast<AMQFrame&>(f.frame)); |
| else { // frame control, send frame via SessionState |
| broker::SessionState* ss = connection->getChannel(currentChannel).getSession(); |
| if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); |
| } |
| } |
| } |
| |
| // A local connection is closed by the network layer. Called in the connection thread. |
| void Connection::closed() { |
| try { |
| if (isUpdated()) { |
| QPID_LOG(debug, cluster << " update connection closed " << *this); |
| close(); |
| cluster.updateInClosed(); |
| } |
| else if (catchUp && cluster.isExpectingUpdate()) { |
| QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); |
| cluster.leave(); |
| } |
| else if (isLocal()) { |
| // This was a local replicated connection. Multicast a deliver |
| // closed and process any outstanding frames from the cluster |
| // until self-delivery of deliver-close. |
| output.closeOutput(); |
| if (announced) |
| cluster.getMulticast().mcastControl( |
| ClusterConnectionDeliverCloseBody(), self); |
| } |
| } |
| catch (const std::exception& e) { |
| QPID_LOG(error, cluster << " error closing connection " << *this << ": " << e.what()); |
| } |
| } |
| |
| // Self-delivery of close message, close the connection. |
| void Connection::deliverClose () { |
| close(); |
| cluster.erase(self); |
| } |
| |
| // Close the connection |
| void Connection::close() { |
| if (connection.get()) { |
| QPID_LOG(debug, cluster << " closed connection " << *this); |
| connection->closed(); |
| connection.reset(); |
| } |
| } |
| |
| // The connection has sent invalid data and should be aborted. |
| // All members will get the same abort since they all process the same data. |
| void Connection::abort() { |
| connection->abort(); |
| // Aborting the connection will result in a call to ::closed() |
| // and allow the connection to close in an orderly manner. |
| } |
| |
| // ConnectionCodec::decode receives read buffers from directly-connected clients. |
| size_t Connection::decode(const char* data, size_t size) { |
| GiveReadCreditOnExit grc(*this, 1); // Give a read credit by default. |
| const char* ptr = data; |
| const char* end = data + size; |
| if (catchUp) { // Handle catch-up locally. |
| if (!cluster.isExpectingUpdate()) { |
| QPID_LOG(error, "Rejecting unexpected catch-up connection."); |
| abort(); // Cluster is not expecting catch-up connections. |
| } |
| bool wasOpen = connection->isOpen(); |
| Buffer buf(const_cast<char*>(ptr), size); |
| ptr += size; |
| while (localDecoder.decode(buf)) |
| received(localDecoder.getFrame()); |
| if (!wasOpen && connection->isOpen()) { |
| // Connections marked with setUserProxyAuth are allowed to proxy |
| // messages with user-ID that doesn't match the connection's |
| // authenticated ID. This is important for updates. |
| connection->setUserProxyAuth(isCatchUp()); |
| } |
| } |
| else { // Multicast local connections. |
| assert(isLocalClient()); |
| assert(connection.get()); |
| if (!checkProtocolHeader(ptr, size)) // Updates ptr |
| return 0; // Incomplete header |
| |
| if (!connection->isOpen()) |
| processInitialFrames(ptr, end-ptr); // Updates ptr |
| |
| if (connection->isOpen() && end - ptr > 0) { |
| // We're multi-casting, we will give read credit on delivery. |
| grc.credit = 0; |
| cluster.getMulticast().mcastBuffer(ptr, end - ptr, self); |
| ptr = end; |
| } |
| } |
| return ptr - data; |
| } |
| |
| // Decode the protocol header if needed. Updates data and size |
| // returns true if the header is complete or already read. |
| bool Connection::checkProtocolHeader(const char*& data, size_t size) { |
| if (expectProtocolHeader) { |
| // This is an outgoing link connection, we will receive a protocol |
| // header which needs to be decoded first |
| framing::ProtocolInitiation pi; |
| Buffer buf(const_cast<char*&>(data), size); |
| if (pi.decode(buf)) { |
| //TODO: check the version is correct |
| expectProtocolHeader = false; |
| data += pi.encodedSize(); |
| } else { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
// Process the initial connection negotiation locally, saving the raw bytes
// so they can be replayed on other brokers by announce().  Advances `ptr`
// past whatever was consumed.  Once the connection is open, multicasts the
// announce control carrying the saved frames and security parameters.
void Connection::processInitialFrames(const char*& ptr, size_t size) {
    // Process the initial negotiation locally and store it so
    // it can be replayed on other brokers in announce()
    Buffer buf(const_cast<char*>(ptr), size);
    framing::AMQFrame frame;
    while (!connection->isOpen() && frame.decode(buf))
        received(frame);
    initialFrames.append(ptr, buf.getPosition());
    ptr += buf.getPosition();
    if (connection->isOpen()) { // initial negotiation complete
        cluster.getMulticast().mcastControl(
            ClusterConnectionAnnounceBody(
                ProtocolVersion(),
                connectionCtor.mgmtId,
                connectionCtor.external.ssf,
                connectionCtor.external.authid,
                connectionCtor.external.nodict,
                connection->getUserId(),
                initialFrames),
            getId());
        announced = true;
        initialFrames.clear();
    }
}
| |
// Session attached to the current channel.
// NOTE(review): getSession() may return 0 if no session is attached; this
// would dereference null — callers presumably guarantee attachment. Confirm.
broker::SessionState& Connection::sessionState() {
    return *connection->getChannel(currentChannel).getSession();
}
| |
// Semantic (delivery/credit) state of the current channel's session.
broker::SemanticState& Connection::semanticState() {
    return sessionState().getSemanticState();
}
| |
// Update control: record the management id the next shadow connection
// should be created with (consumed in the local constructor).
void Connection::shadowPrepare(const std::string& mgmtId) {
    updateIn.nextShadowMgmtId = mgmtId;
}
| |
// Update control: set the authenticated user id on a shadow connection.
void Connection::shadowSetUser(const std::string& userId) {
    connection->setUserId(userId);
}
| |
// Update control: restore the replicated state of one consumer (position,
// blocked flag, window credit, notify flag) and register it in the
// update's consumer numbering so later controls can refer to it by index.
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position,
                               uint32_t usedMsgCredit, uint32_t usedByteCredit)
{
    broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
    c->setPosition(position);
    c->setBlocked(blocked);
    // Only window-mode credit tracks consumed amounts.
    if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit);
    if (notifyEnabled) c->enableNotify(); else c->disableNotify();
    updateIn.consumerNumbering.add(c);
}
| |
| |
// Update control: restore the full replicated command/replay state of the
// current session, including whether a DTX transaction was selected.
void Connection::sessionState(
    const SequenceNumber& replayStart,
    const SequenceNumber& sendCommandPoint,
    const SequenceSet& sentIncomplete,
    const SequenceNumber& expected,
    const SequenceNumber& received,
    const SequenceSet& unknownCompleted,
    const SequenceSet& receivedIncomplete,
    bool dtxSelected)
{
    sessionState().setState(
        replayStart,
        sendCommandPoint,
        sentIncomplete,
        expected,
        received,
        unknownCompleted,
        receivedIncomplete);
    if (dtxSelected) semanticState().selectDtx();
    QPID_LOG(debug, cluster << " received session state update for "
             << sessionState().getId());
    // The output tasks will be added later in the update process.
    connection->getOutputTasks().removeAll();
}
| |
// Update control: re-register an output task (a consumer, by name) on the
// session attached to `channel`.  Throws if the channel has no session.
void Connection::outputTask(uint16_t channel, const std::string& name) {
    broker::SessionState* session = connection->getChannel(channel).getSession();
    if (!session)
        throw Exception(QPID_MSG(cluster << " channel not attached " << *this
                                 << "[" << channel << "] "));
    // NOTE(review): find(name) presumably always succeeds for a valid update;
    // a missing consumer would make `task` null here — confirm.
    OutputTask* task = session->getSemanticState().find(name).get();
    connection->getOutputTasks().addOutputTask(task);
}
| |
// Update control: convert this catch-up connection into the shadow of the
// connection identified by (memberId, connectionId) on the updater.
// Restores user id, partial-frame fragment, and send-credit window.
void Connection::shadowReady(
    uint64_t memberId, uint64_t connectionId, const string& mgmtId,
    const string& username, const string& fragment, uint32_t sendMax)
{
    QPID_ASSERT(mgmtId == getBrokerConnection()->getMgmtId());
    ConnectionId shadowId = ConnectionId(memberId, connectionId);
    QPID_LOG(debug, cluster << " catch-up connection " << *this
             << " becomes shadow " << shadowId);
    self = shadowId; // Take on the identity of the original connection.
    connection->setUserId(username);
    // OK to use decoder here because cluster is stalled for update.
    cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
    // Now that it is a real shadow, start replicating errors.
    connection->setErrorListener(this);
    output.setSendMax(sendMax);
}
| |
// Resolve one saved DtxBuffer reference against the now-replicated
// DtxManager and attach the buffer to its session (suspended XID map or
// current buffer).  Called at end-of-update from membership().
void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) {
    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
    broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid);
    broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index];
    if (bufRef.suspended)
        bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer;
    else
        bufRef.semanticState->setDtxBuffer(buffer);
}
| |
| // Marks the end of the update. |
| void Connection::membership(const FieldTable& joiners, const FieldTable& members, |
| const framing::SequenceNumber& frameSeq) |
| { |
| QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); |
| updateIn.consumerNumbering.clear(); |
| for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(), |
| boost::bind(&Connection::setDtxBuffer, this, _1)); |
| closeUpdated(); |
| cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); |
| } |
| |
// The updater retracted its update offer: close the update connection and
// let the cluster go back to waiting for another offer.
void Connection::retractOffer() {
    QPID_LOG(info, cluster << " incoming update retracted on connection " << *this);
    closeUpdated();
    cluster.updateInRetracted();
}
| |
| void Connection::closeUpdated() { |
| self.second = 0; // Mark this as completed update connection. |
| if (connection.get()) |
| connection->close(connection::CLOSE_CODE_NORMAL, "OK"); |
| } |
| |
| bool Connection::isLocal() const { |
| return self.first == cluster.getId() && self.second; |
| } |
| |
| bool Connection::isShadow() const { |
| return self.first != cluster.getId(); |
| } |
| |
| bool Connection::isUpdated() const { |
| return self.first == cluster.getId() && self.second == 0; |
| } |
| |
| |
| boost::shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { |
| boost::shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname); |
| if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname)); |
| return queue; |
| } |
| |
// Pop the next message from the special update queue; messages are placed
// there by the updater to transfer message bodies during an update.
// @throws Exception if the update queue is unexpectedly empty.
broker::QueuedMessage Connection::getUpdateMessage() {
    boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
    assert(!updateq->isDurable());
    broker::QueuedMessage m = updateq->get();
    if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
    return m;
}
| |
// Update control: reconstruct one unacknowledged delivery record for the
// current session.  If the record still refers to a message, the message
// comes either from the update queue (acquired) or from its original
// position on its original queue (browsed).  The finished record goes onto
// the pending dtx-ack list when a DTX buffer is being rebuilt, otherwise
// onto the session's unacked list.
void Connection::deliveryRecord(const string& qname,
                                const SequenceNumber& position,
                                const string& tag,
                                const SequenceNumber& id,
                                bool acquired,
                                bool accepted,
                                bool cancelled,
                                bool completed,
                                bool ended,
                                bool windowing,
                                bool enqueued,
                                uint32_t credit)
{
    broker::QueuedMessage m;
    broker::Queue::shared_ptr queue = findQueue(qname);
    if (!ended) { // Has a message
        if (acquired) { // Message is on the update queue
            m = getUpdateMessage();
            m.queue = queue.get();
            m.position = position;
            if (enqueued) queue->updateEnqueued(m); //inform queue of the message
        } else { // Message at original position in original queue
            queue->find(position, m);
        }
        // NOTE: removed:
        // if (!m.payload)
        //     throw Exception(QPID_MSG("deliveryRecord no update message"));
        //
        // It seems this could happen legitimately in the case one
        // session browses message M, then another session acquires
        // it. In that case the browsers delivery record is !acquired
        // but the message is not on its original Queue. In that case
        // we'll get a deliveryRecord with no payload for the browser.
        //
    }

    broker::DeliveryRecord dr(m, queue, tag, semanticState().find(tag),
                              acquired, accepted, windowing, credit);
    dr.setId(id);
    if (cancelled) dr.cancel(dr.getTag());
    if (completed) dr.complete();
    if (ended) dr.setEnded();   // Existence of message

    if (dtxBuffer) // Record for next dtx-ack
        dtxAckRecords.push_back(dr);
    else
        semanticState().record(dr); // Record on session's unacked list.
}
| |
// Update control: restore a queue's sequence-number position.
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
    findQueue(qname)->setPosition(position);
}
| |
| void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count) |
| { |
| if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) { |
| QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies."); |
| } |
| } |
| |
| |
namespace {
// find a StatefulQueueObserver that matches a given identifier.
// Used as a functor with Queue::eachObserver; remembers the first observer
// whose id matches and exposes it (downcast) via getObserver().
class ObserverFinder {
    const std::string id;
    boost::shared_ptr<broker::QueueObserver> target;
    // Private copy constructor: pre-C++11 idiom to make the class noncopyable
    // (it is passed to eachObserver by reference).
    ObserverFinder(const ObserverFinder&) {}
public:
    ObserverFinder(const std::string& _id) : id(_id) {}
    // Returns the matched observer, or 0 if none matched (or the match is
    // not actually a StatefulQueueObserver).
    broker::StatefulQueueObserver *getObserver()
    {
        if (target)
            return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
        return 0;
    }
    void operator() (boost::shared_ptr<broker::QueueObserver> o)
    {
        if (!target) {
            broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
            if (p && p->getId() == id) {
                target = o;
            }
        }
    }
};
}
| |
| |
// Update control: restore the replicated state of one stateful queue
// observer (e.g. message-group state) identified by observerId on `qname`.
// A missing observer is logged as an error (state will diverge).
void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state)
{
    boost::shared_ptr<broker::Queue> queue(findQueue(qname));
    ObserverFinder finder(observerId);      // find this observer
    queue->eachObserver<ObserverFinder &>(finder);
    broker::StatefulQueueObserver *so = finder.getObserver();
    if (so) {
        so->setState( state );
        QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ...");
        return;
    }
    QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
}
| |
| std::ostream& operator<<(std::ostream& o, const Connection& c) { |
| const char* type="unknown"; |
| if (c.isLocal()) type = "local"; |
| else if (c.isShadow()) type = "shadow"; |
| else if (c.isUpdated()) type = "updated"; |
| const broker::Connection* bc = c.getBrokerConnection(); |
| if (bc) o << bc->getMgmtId(); |
| else o << "<disconnected>"; |
| return o << "(" << c.getId() << " " << type << (c.isCatchUp() ? ",catchup":"") << ")"; |
| } |
| |
// Update control: begin rebuilding a plain (non-DTX) transaction buffer.
void Connection::txStart() {
    txBuffer.reset(new broker::TxBuffer());
}
| |
// Update control: enlist an accept operation for `acked` messages in the
// transaction being rebuilt.
void Connection::txAccept(const framing::SequenceSet& acked) {
    txBuffer->enlist(boost::shared_ptr<broker::TxAccept>(
                         new broker::TxAccept(acked, semanticState().getUnacked())));
}
| |
// Update control: enlist a dequeue of the next update-queue message from
// `queue` in the transaction being rebuilt.
void Connection::txDequeue(const std::string& queue) {
    txBuffer->enlist(boost::shared_ptr<broker::RecoveredDequeue>(
                         new broker::RecoveredDequeue(findQueue(queue), getUpdateMessage().payload)));
}
| |
// Update control: enlist an enqueue of the next update-queue message onto
// `queue` in the transaction being rebuilt.
void Connection::txEnqueue(const std::string& queue) {
    txBuffer->enlist(boost::shared_ptr<broker::RecoveredEnqueue>(
                         new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
}
| |
// Update control: enlist a publish of the next update-queue message to the
// named queues; `delivered` records whether delivery already happened.
void Connection::txPublish(const framing::Array& queues, bool delivered)
{
    boost::shared_ptr<broker::TxPublish> txPub(
        new broker::TxPublish(getUpdateMessage().payload));
    for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
        txPub->deliverTo(findQueue((*i)->get<std::string>()));
    txPub->delivered = delivered;
    txBuffer->enlist(txPub);
}
| |
// Update control: attach the rebuilt transaction buffer to the session.
void Connection::txEnd() {
    semanticState().setTxBuffer(txBuffer);
}
| |
// Update control: restore the session's accumulated-ack sequence set.
void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
    semanticState().setAccumulatedAck(s);
}
| |
// Update control: begin rebuilding a distributed-transaction buffer.
// The DTX buffer doubles as the current tx buffer so subsequent txAccept/
// txEnqueue/etc. controls enlist into it.
void Connection::dtxStart(const std::string& xid,
                          bool ended,
                          bool suspended,
                          bool failed,
                          bool expired)
{
    dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired));
    txBuffer = dtxBuffer;
}
| |
// Update control: hand the rebuilt DTX buffer to the DtxManager — joining
// an existing work record for the xid or starting a new one — then clear
// both buffer references ready for the next transaction.
void Connection::dtxEnd() {
    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
    std::string xid = dtxBuffer->getXid();
    if (mgr.exists(xid))
        mgr.join(xid, dtxBuffer);
    else
        mgr.start(xid, dtxBuffer);
    dtxBuffer.reset();
    txBuffer.reset();
}
| |
| // Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords |
| void Connection::dtxAck() { |
| dtxBuffer->enlist( |
| boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords))); |
| dtxAckRecords.clear(); |
| } |
| |
// Save the association between DtxBuffers and the session so we
// can set the DtxBuffers at the end of the update when the
// DtxManager has been replicated (resolved in setDtxBuffer()).
void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) {
    updateIn.dtxBuffers.push_back(
        UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState()));
}
| |
| // Sent at end of work record. |
| void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout) |
| { |
| broker::DtxManager& mgr = cluster.getBroker().getDtxManager(); |
| if (timeout) mgr.setTimeout(xid, timeout); |
| if (prepared) mgr.prepare(xid); |
| } |
| |
| |
| void Connection::exchange(const std::string& encoded) { |
| Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); |
| broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf); |
| if(ex.get() && ex->isDurable() && !ex->getName().find("amq.") == 0 && !ex->getName().find("qpid.") == 0) { |
| cluster.getBroker().getStore().create(*(ex.get()), ex->getArgs()); |
| } |
| QPID_LOG(debug, cluster << " updated exchange " << ex->getName()); |
| } |
| |
// ErrorListener callback: a session-level execution exception occurred.
// Flag it to the cluster so all members handle the error consistently.
void Connection::sessionError(uint16_t , const std::string& msg) {
    // Ignore errors before isOpen(), we're not multicasting yet.
    if (connection->isOpen())
        cluster.flagError(*this, ERROR_TYPE_SESSION, msg);
}
| |
// ErrorListener callback: a connection-level exception occurred.
// Replicated connections flag the error to the cluster; a connection that
// failed before opening was never multicast and is simply erased locally.
void Connection::connectionError(const std::string& msg) {
    // Ignore errors before isOpen(), we're not multicasting yet.
    if (connection->isOpen()) {
        cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
    }
    else
        cluster.eraseLocal(self);
}
| |
// Update control: re-register a consumer (by its update-numbering index)
// as a listener on queue `q`.  Throws on an out-of-range index.
void Connection::addQueueListener(const std::string& q, uint32_t listener) {
    if (listener >= updateIn.consumerNumbering.size())
        throw Exception(QPID_MSG("Invalid listener ID: " << listener));
    findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
}
| |
| // |
| // This is the handler for incoming managementsetup messages. |
| // |
//
// This is the handler for incoming managementsetup messages: restore the
// management agent's object-id counter, boot sequence, broker UUID and
// name so replicated management state stays consistent across members.
// @throws Exception if management is not enabled on this broker.
//
void Connection::managementSetupState(
    uint64_t objectNum, uint16_t bootSequence, const framing::Uuid& id,
    const std::string& vendor, const std::string& product, const std::string& instance)
{
    QPID_LOG(debug, cluster << " updated management: object number="
             << objectNum << " boot sequence=" << bootSequence
             << " broker-id=" << id
             << " vendor=" << vendor
             << " product=" << product
             << " instance=" << instance);
    management::ManagementAgent* agent = cluster.getBroker().getManagementAgent();
    if (!agent)
        throw Exception(QPID_MSG("Management schema update but management not enabled."));
    agent->setNextObjectId(objectNum);
    agent->setBootSequence(bootSequence);
    agent->setUuid(id);
    agent->setName(vendor, product, instance);
}
| |
// Update control: recreate a replicated configuration object (federation
// link or bridge) from its encoded form.  The leading short-string tags
// the kind; anything else is an error.
void Connection::config(const std::string& encoded) {
    Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
    string kind;
    buf.getShortString (kind);
    if (kind == "link") {
        broker::Link::shared_ptr link =
            broker::Link::decode(cluster.getBroker().getLinks(), buf);
        QPID_LOG(debug, cluster << " updated link "
                 << link->getHost() << ":" << link->getPort());
    }
    else if (kind == "bridge") {
        broker::Bridge::shared_ptr bridge =
            broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
        QPID_LOG(debug, cluster << " updated bridge " << bridge->getName());
    }
    else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
}
| |
| void Connection::doCatchupIoCallbacks() { |
| // We need to process IO callbacks during the catch-up phase in |
| // order to service asynchronous completions for messages |
| // transferred during catch-up. |
| |
| if (catchUp) getBrokerConnection()->doIoCallbacks(); |
| } |
| |
// Update control: receive the updater's clock value and forward it to the
// cluster so time-based state stays consistent.
void Connection::clock(uint64_t time) {
    QPID_LOG(debug, "Cluster connection received time update");
    cluster.clock(time);
}
| |
| void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) { |
| boost::shared_ptr<broker::Queue> queue(findQueue(qname)); |
| queue->setDequeueSincePurge(dequeueSincePurge); |
| } |
| |
| }} // Namespace qpid::cluster |
| |