| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "qpid/broker/Broker.h" |
| #include "qpid/broker/DeliverableMessage.h" |
| #include "qpid/broker/Exchange.h" |
| #include "qpid/broker/ExchangeRegistry.h" |
| #include "qpid/broker/FedOps.h" |
| #include "qpid/broker/Queue.h" |
| #include "qpid/framing/MessageProperties.h" |
| #include "qpid/framing/reply_exceptions.h" |
| #include "qpid/log/Statement.h" |
| #include "qpid/management/ManagementAgent.h" |
| #include "qpid/sys/ExceptionHolder.h" |
| #include <stdexcept> |
| |
| namespace qpid { |
| namespace broker { |
| |
| using namespace qpid::framing; |
| using qpid::framing::Buffer; |
| using qpid::framing::FieldTable; |
| using qpid::sys::Mutex; |
| using qpid::management::ManagementAgent; |
| using qpid::management::ManagementObject; |
| using qpid::management::Manageable; |
| using qpid::management::Args; |
| namespace _qmf = qmf::org::apache::qpid::broker; |
| |
namespace
{
// Name of the custom message property that carries the per-exchange sequence
// number; also the exchange-argument key that switches sequencing on.
const std::string qpidMsgSequence = "qpid.msg_sequence";
// Exchange-argument key under which the current sequence counter is stored
// when the exchange is encoded and read back when it is decoded.
const std::string qpidSequenceCounter = "qpid.sequence_counter";
// Exchange-argument key that switches on "initial value" behaviour.
const std::string qpidIVE = "qpid.ive";
// Not referenced by the code visible in this part of the file.
const std::string QPID_MANAGEMENT = "qpid.management";
}
| |
| |
| Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { |
| if (parent){ |
| if (parent->sequence || parent->ive) parent->sequenceLock.lock(); |
| |
| if (parent->sequence){ |
| parent->sequenceNo++; |
| msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo); |
| } |
| if (parent->ive) { |
| parent->lastMsg = &( msg.getMessage()); |
| } |
| } |
| } |
| |
| Exchange::PreRoute::~PreRoute(){ |
| if (parent && (parent->sequence || parent->ive)){ |
| parent->sequenceLock.unlock(); |
| } |
| } |
| |
| namespace { |
| /** Store information about an exception to be thrown later. |
| * If multiple exceptions are stored, save the first of the "most severe" |
| * exceptions, SESSION is les sever than CONNECTION etc. |
| */ |
| class ExInfo { |
| public: |
| enum Type { NONE, SESSION, CONNECTION, OTHER }; |
| |
| ExInfo(string exchange) : type(NONE), exchange(exchange) {} |
| void store(Type type_, const qpid::sys::ExceptionHolder& exception_, const boost::shared_ptr<Queue>& queue) { |
| QPID_LOG(warning, "Exchange " << exchange << " cannot deliver to queue " |
| << queue->getName() << ": " << exception_.what()); |
| if (type < type_) { // Replace less severe exception |
| type = type_; |
| exception = exception_; |
| } |
| } |
| |
| void raise() { |
| exception.raise(); |
| } |
| |
| private: |
| Type type; |
| string exchange; |
| qpid::sys::ExceptionHolder exception; |
| }; |
| } |
| |
| void Exchange::doRoute(Deliverable& msg, ConstBindingList b) |
| { |
| int count = 0; |
| |
| if (b.get()) { |
| // Block the content release if the message is transient AND there is more than one binding |
| if (!msg.getMessage().isPersistent() && b->size() > 1) { |
| msg.getMessage().blockContentRelease(); |
| } |
| |
| |
| ExInfo error(getName()); // Save exception to throw at the end. |
| for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) { |
| try { |
| msg.deliverTo((*i)->queue); |
| if ((*i)->mgmtBinding != 0) |
| (*i)->mgmtBinding->inc_msgMatched(); |
| } |
| catch (const SessionException& e) { |
| error.store(ExInfo::SESSION, framing::createSessionException(e.code, e.what()),(*i)->queue); |
| } |
| catch (const ConnectionException& e) { |
| error.store(ExInfo::CONNECTION, framing::createConnectionException(e.code, e.what()), (*i)->queue); |
| } |
| catch (const std::exception& e) { |
| error.store(ExInfo::OTHER, qpid::sys::ExceptionHolder(new Exception(e.what())), (*i)->queue); |
| } |
| } |
| error.raise(); |
| } |
| |
| if (mgmtExchange != 0) |
| { |
| qmf::org::apache::qpid::broker::Exchange::PerThreadStats *eStats = mgmtExchange->getStatistics(); |
| uint64_t contentSize = msg.contentSize(); |
| |
| eStats->msgReceives += 1; |
| eStats->byteReceives += contentSize; |
| if (count == 0) |
| { |
| //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); |
| eStats->msgDrops += 1; |
| eStats->byteDrops += contentSize; |
| if (brokerMgmtObject) |
| brokerMgmtObject->inc_discardsNoRoute(); |
| } |
| else |
| { |
| eStats->msgRoutes += count; |
| eStats->byteRoutes += count * contentSize; |
| } |
| } |
| } |
| |
| void Exchange::routeIVE(){ |
| if (ive && lastMsg.get()){ |
| DeliverableMessage dmsg(lastMsg); |
| route(dmsg); |
| } |
| } |
| |
| |
| Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : |
| name(_name), durable(false), persistenceId(0), sequence(false), |
| sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) |
| { |
| if (parent != 0 && broker != 0) |
| { |
| ManagementAgent* agent = broker->getManagementAgent(); |
| if (agent != 0) |
| { |
| mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); |
| mgmtExchange->set_durable(durable); |
| mgmtExchange->set_autoDelete(false); |
| agent->addObject(mgmtExchange, 0, durable); |
| if (broker) |
| brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); |
| } |
| } |
| } |
| |
| Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, |
| Manageable* parent, Broker* b) |
| : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), |
| args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) |
| { |
| if (parent != 0 && broker != 0) |
| { |
| ManagementAgent* agent = broker->getManagementAgent(); |
| if (agent != 0) |
| { |
| mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); |
| mgmtExchange->set_durable(durable); |
| mgmtExchange->set_autoDelete(false); |
| mgmtExchange->set_arguments(ManagementAgent::toMap(args)); |
| agent->addObject(mgmtExchange, 0, durable); |
| if (broker) |
| brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); |
| } |
| } |
| |
| sequence = _args.get(qpidMsgSequence); |
| if (sequence) { |
| QPID_LOG(debug, "Configured exchange " << _name << " with Msg sequencing"); |
| args.setInt64(std::string(qpidSequenceCounter), sequenceNo); |
| } |
| |
| ive = _args.get(qpidIVE); |
| if (ive) { |
| if (broker && broker->isInCluster()) |
| throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster"); |
| QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value"); |
| } |
| } |
| |
| Exchange::~Exchange () |
| { |
| if (mgmtExchange != 0) |
| mgmtExchange->resourceDestroy (); |
| } |
| |
| void Exchange::setAlternate(Exchange::shared_ptr _alternate) |
| { |
| alternate = _alternate; |
| if (mgmtExchange != 0) { |
| if (alternate.get() != 0) |
| mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId()); |
| else |
| mgmtExchange->clr_altExchange(); |
| } |
| } |
| |
| void Exchange::setPersistenceId(uint64_t id) const |
| { |
| persistenceId = id; |
| } |
| |
| Exchange::shared_ptr Exchange::decode(ExchangeRegistry& exchanges, Buffer& buffer) |
| { |
| string name; |
| string type; |
| string altName; |
| FieldTable args; |
| |
| buffer.getShortString(name); |
| bool durable(buffer.getOctet()); |
| buffer.getShortString(type); |
| buffer.get(args); |
| // For backwards compatibility on restoring exchanges from before the alt-exchange update, perform check |
| if (buffer.available()) |
| buffer.getShortString(altName); |
| |
| try { |
| Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first; |
| exch->sequenceNo = args.getAsInt64(qpidSequenceCounter); |
| exch->alternateName.assign(altName); |
| return exch; |
| } catch (const UnknownExchangeTypeException&) { |
| QPID_LOG(warning, "Could not create exchange " << name << "; type " << type << " is not recognised"); |
| return Exchange::shared_ptr(); |
| } |
| } |
| |
| void Exchange::encode(Buffer& buffer) const |
| { |
| buffer.putShortString(name); |
| buffer.putOctet(durable); |
| buffer.putShortString(getType()); |
| if (args.isSet(qpidSequenceCounter)) |
| args.setInt64(std::string(qpidSequenceCounter),sequenceNo); |
| buffer.put(args); |
| buffer.putShortString(alternate.get() ? alternate->getName() : string("")); |
| } |
| |
| uint32_t Exchange::encodedSize() const |
| { |
| return name.size() + 1/*short string size*/ |
| + 1 /*durable*/ |
| + getType().size() + 1/*short string size*/ |
| + (alternate.get() ? alternate->getName().size() : 0) + 1/*short string size*/ |
| + args.encodedSize(); |
| } |
| |
| void Exchange::recoveryComplete(ExchangeRegistry& exchanges) |
| { |
| if (!alternateName.empty()) { |
| Exchange::shared_ptr ae = exchanges.find(alternateName); |
| if (ae) setAlternate(ae); |
| else QPID_LOG(warning, "Could not set alternate exchange \"" |
| << alternateName << "\": does not exist."); |
| } |
| } |
| |
| ManagementObject* Exchange::GetManagementObject (void) const |
| { |
| return (ManagementObject*) mgmtExchange; |
| } |
| |
| void Exchange::registerDynamicBridge(DynamicBridge* db) |
| { |
| if (!supportsDynamicBinding()) |
| throw Exception("Exchange type does not support dynamic binding"); |
| |
| { |
| Mutex::ScopedLock l(bridgeLock); |
| for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); |
| iter != bridgeVector.end(); iter++) |
| (*iter)->sendReorigin(); |
| |
| bridgeVector.push_back(db); |
| } |
| |
| FieldTable args; |
| args.setString(qpidFedOp, fedOpReorigin); |
| bind(Queue::shared_ptr(), string(), &args); |
| } |
| |
| void Exchange::removeDynamicBridge(DynamicBridge* db) |
| { |
| Mutex::ScopedLock l(bridgeLock); |
| for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); |
| iter != bridgeVector.end(); iter++) |
| if (*iter == db) { |
| bridgeVector.erase(iter); |
| break; |
| } |
| } |
| |
| void Exchange::handleHelloRequest() |
| { |
| } |
| |
| void Exchange::propagateFedOp(const string& routingKey, const string& tags, const string& op, const string& origin, qpid::framing::FieldTable* extra_args) |
| { |
| Mutex::ScopedLock l(bridgeLock); |
| string myOp(op.empty() ? fedOpBind : op); |
| |
| for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin(); |
| iter != bridgeVector.end(); iter++) |
| (*iter)->propagateBinding(routingKey, tags, op, origin, extra_args); |
| } |
| |
| Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent, |
| FieldTable _args, const string& _origin) |
| : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0) |
| { |
| } |
| |
| Exchange::Binding::~Binding () |
| { |
| if (mgmtBinding != 0) { |
| ManagementObject* mo = queue->GetManagementObject(); |
| if (mo != 0) |
| static_cast<_qmf::Queue*>(mo)->dec_bindingCount(); |
| mgmtBinding->resourceDestroy (); |
| } |
| } |
| |
| void Exchange::Binding::startManagement() |
| { |
| if (parent != 0) |
| { |
| Broker* broker = parent->getBroker(); |
| if (broker != 0) { |
| ManagementAgent* agent = broker->getManagementAgent(); |
| if (agent != 0) { |
| ManagementObject* mo = queue->GetManagementObject(); |
| if (mo != 0) { |
| management::ObjectId queueId = mo->getObjectId(); |
| |
| mgmtBinding = new _qmf::Binding |
| (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args)); |
| if (!origin.empty()) |
| mgmtBinding->set_origin(origin); |
| agent->addObject(mgmtBinding); |
| static_cast<_qmf::Queue*>(mo)->inc_bindingCount(); |
| } |
| } |
| } |
| } |
| } |
| |
| ManagementObject* Exchange::Binding::GetManagementObject () const |
| { |
| return (ManagementObject*) mgmtBinding; |
| } |
| |
| Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {} |
| |
| bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b) |
| { |
| return b->queue == queue; |
| } |
| |
| void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) { |
| msg->setExchange(getName()); |
| } |
| |
| bool Exchange::routeWithAlternate(Deliverable& msg) |
| { |
| route(msg); |
| if (!msg.delivered && alternate) { |
| alternate->route(msg); |
| } |
| return msg.delivered; |
| } |
| |
| }} |