| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| #include "qpid/broker/TopicExchange.h" |
| #include "qpid/broker/FedOps.h" |
| #include "qpid/log/Statement.h" |
| #include <algorithm> |
| |
| |
| namespace qpid { |
| namespace broker { |
| |
| using namespace qpid::framing; |
| using namespace qpid::sys; |
| using namespace std; |
| namespace _qmf = qmf::org::apache::qpid::broker; |
| |
| // iterator for federation ReOrigin bind operation |
| class TopicExchange::ReOriginIter : public BindingNode::TreeIterator { |
| public: |
| ReOriginIter() {}; |
| ~ReOriginIter() {}; |
| bool visit(BindingNode& node) { |
| if (node.bindings.fedBinding.hasLocal()) { |
| keys2prop.push_back(node.routePattern); |
| } |
| return true; |
| } |
| std::vector<std::string> keys2prop; |
| }; |
| |
| |
| // match iterator used by route(): builds BindingList of all unique queues |
| // that match the routing key. |
| class TopicExchange::BindingsFinderIter : public BindingNode::TreeIterator { |
| public: |
| BindingsFinderIter(BindingList &bl) : b(bl) {}; |
| ~BindingsFinderIter() {}; |
| |
| BindingList& b; |
| std::set<std::string> qSet; |
| |
| bool visit(BindingNode& node) { |
| |
| Binding::vector& qv(node.bindings.bindingVector); |
| for (Binding::vector::iterator j = qv.begin(); j != qv.end(); j++) { |
| // do not duplicate queues on the binding list |
| if (qSet.insert(j->get()->queue->getName()).second) { |
| b->push_back(*j); |
| } |
| } |
| return true; |
| } |
| }; |
| |
| |
| |
| // Iterator to visit all bindings until a given queue is found |
| class TopicExchange::QueueFinderIter : public BindingNode::TreeIterator { |
| public: |
| QueueFinderIter(Queue::shared_ptr queue) : queue(queue), found(false) {}; |
| ~QueueFinderIter() {}; |
| bool visit(BindingNode& node) { |
| |
| Binding::vector& qv(node.bindings.bindingVector); |
| Binding::vector::iterator q; |
| for (q = qv.begin(); q != qv.end(); q++) { |
| if ((*q)->queue == queue) { |
| found = true; |
| return false; // search done |
| } |
| } |
| return true; // continue search |
| } |
| |
| Queue::shared_ptr queue; |
| bool found; |
| }; |
| |
| |
| class TopicExchange::Normalizer : public TokenIterator { |
| public: |
| Normalizer(string& p) |
| : TokenIterator(&p[0], &p[0]+p.size()), pattern(p) |
| { normalize(); } |
| |
| private: |
| // Apply 2 transformations: #.* -> *.# and #.# -> # |
| void normalize() { |
| while (!finished()) { |
| if (match1('#')) { |
| const char* hash1=token.first; |
| next(); |
| if (!finished()) { |
| if (match1('#')) { // Erase #.# -> # |
| pattern.erase(hash1-pattern.data(), 2); |
| token.first -= 2; |
| token.second -= 2; |
| end -= 2; |
| } |
| else if (match1('*')) { // Swap #.* -> *.# |
| swap(*const_cast<char*>(hash1), |
| *const_cast<char*>(token.first)); |
| } |
| } |
| } |
| else |
| next(); |
| } |
| } |
| |
| string& pattern; |
| }; |
| |
| |
| |
| // Convert sequences of * and # to a sequence of * followed by a single # |
| string TopicExchange::normalize(const string& pattern) { |
| string normal(pattern); |
| Normalizer n(normal); |
| return normal; |
| } |
| |
| |
| TopicExchange::TopicExchange(const string& _name, Manageable* _parent, Broker* b) |
| : Exchange(_name, _parent, b), |
| nBindings(0) |
| { |
| if (mgmtExchange != 0) |
| mgmtExchange->set_type (typeName); |
| } |
| |
| TopicExchange::TopicExchange(const std::string& _name, bool _durable, |
| const FieldTable& _args, Manageable* _parent, Broker* b) : |
| Exchange(_name, _durable, _args, _parent, b), |
| nBindings(0) |
| { |
| if (mgmtExchange != 0) |
| mgmtExchange->set_type (typeName); |
| } |
| |
| bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* args, AsyncStore* const /*store*/) |
| { |
| ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. |
| string fedOp(args ? args->getAsString(qpidFedOp) : fedOpBind); |
| string fedTags(args ? args->getAsString(qpidFedTags) : ""); |
| string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); |
| bool propagate = false; |
| string routingPattern = normalize(routingKey); |
| |
| if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { |
| RWlock::ScopedWlock l(lock); |
| BindingKey *bk = bindingTree.add(routingPattern); |
| if (bk) { |
| Binding::vector& qv(bk->bindingVector); |
| Binding::vector::iterator q; |
| for (q = qv.begin(); q != qv.end(); q++) { |
| if ((*q)->queue == queue) { |
| // already bound, but may be from a different fedOrigin |
| bk->fedBinding.addOrigin(queue->getName(), fedOrigin); |
| return false; |
| } |
| } |
| |
| Binding::shared_ptr binding (new Binding (routingPattern, queue, this, args ? *args : FieldTable(), fedOrigin)); |
| binding->startManagement(); |
| bk->bindingVector.push_back(binding); |
| nBindings++; |
| propagate = bk->fedBinding.addOrigin(queue->getName(), fedOrigin); |
| if (mgmtExchange != 0) { |
| mgmtExchange->inc_bindingCount(); |
| } |
| QPID_LOG(debug, "Binding key [" << routingPattern << "] to queue " << queue->getName() |
| << " on exchange " << getName() << " (origin=" << fedOrigin << ")"); |
| } |
| } else if (fedOp == fedOpUnbind) { |
| RWlock::ScopedWlock l(lock); |
| BindingKey* bk = getQueueBinding(queue, routingPattern); |
| if (bk) { |
| QPID_LOG(debug, "FedOpUnbind [" << routingPattern << "] from exchange " << getName() |
| << " on queue=" << queue->getName() << " origin=" << fedOrigin); |
| propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); |
| // if this was the last binding for the queue, delete the binding |
| if (bk->fedBinding.countFedBindings(queue->getName()) == 0) { |
| deleteBinding(queue, routingPattern, bk); |
| } |
| } |
| } else if (fedOp == fedOpReorigin) { |
| /** gather up all the keys that need rebinding in a local vector |
| * while holding the lock. Then propagate once the lock is |
| * released |
| */ |
| ReOriginIter reOriginIter; |
| { |
| RWlock::ScopedRlock l(lock); |
| bindingTree.iterateAll( reOriginIter ); |
| } /* lock dropped */ |
| |
| for (std::vector<std::string>::const_iterator key = reOriginIter.keys2prop.begin(); |
| key != reOriginIter.keys2prop.end(); key++) { |
| propagateFedOp( *key, string(), fedOpBind, string()); |
| } |
| } |
| |
| cc.clearCache(); // clear the cache before we IVE route. |
| routeIVE(); |
| if (propagate) |
| propagateFedOp(routingKey, fedTags, fedOp, fedOrigin); |
| return true; |
| } |
| |
| bool TopicExchange::unbind(Queue::shared_ptr queue, const string& constRoutingKey, const FieldTable* args, AsyncStore* const /*store*/) |
| { |
| string fedOrigin(args ? args->getAsString(qpidFedOrigin) : ""); |
| QPID_LOG(debug, "Unbinding key [" << constRoutingKey << "] from queue " << queue->getName() |
| << " on exchange " << getName() << " origin=" << fedOrigin << ")" ); |
| |
| ClearCache cc(&cacheLock,&bindingCache); // clear the cache on function exit. |
| RWlock::ScopedWlock l(lock); |
| string routingKey = normalize(constRoutingKey); |
| BindingKey* bk = getQueueBinding(queue, routingKey); |
| if (!bk) return false; |
| bool propagate = bk->fedBinding.delOrigin(queue->getName(), fedOrigin); |
| deleteBinding(queue, routingKey, bk); |
| if (propagate) |
| propagateFedOp(routingKey, string(), fedOpUnbind, string()); |
| return true; |
| } |
| |
| |
| bool TopicExchange::deleteBinding(Queue::shared_ptr queue, |
| const std::string& routingKey, |
| BindingKey *bk) |
| { |
| // Note well: write lock held by caller |
| Binding::vector& qv(bk->bindingVector); |
| Binding::vector::iterator q; |
| for (q = qv.begin(); q != qv.end(); q++) |
| if ((*q)->queue == queue) |
| break; |
| if(q == qv.end()) return false; |
| qv.erase(q); |
| assert(nBindings > 0); |
| nBindings--; |
| |
| if(qv.empty()) { |
| bindingTree.remove(routingKey); |
| } |
| if (mgmtExchange != 0) { |
| mgmtExchange->dec_bindingCount(); |
| } |
| QPID_LOG(debug, "Unbound key [" << routingKey << "] from queue " << queue->getName() |
| << " on exchange " << getName()); |
| return true; |
| } |
| |
| /** returns a pointer to the BindingKey if the given queue is bound to this |
| * exchange using the routing pattern. 0 if queue binding does not exist. |
| */ |
| TopicExchange::BindingKey *TopicExchange::getQueueBinding(Queue::shared_ptr queue, const string& pattern) |
| { |
| // Note well: lock held by caller.... |
| BindingKey *bk = bindingTree.get(pattern); // Exact match against binding pattern |
| if (!bk) return 0; |
| Binding::vector& qv(bk->bindingVector); |
| Binding::vector::iterator q; |
| for (q = qv.begin(); q != qv.end(); q++) |
| if ((*q)->queue == queue) |
| break; |
| return (q != qv.end()) ? bk : 0; |
| } |
| |
| void TopicExchange::route(Deliverable& msg) |
| { |
| const string& routingKey = msg.getMessage().getRoutingKey(); |
| // Note: PERFORMANCE CRITICAL!!! |
| BindingList b; |
| std::map<std::string, BindingList>::iterator it; |
| { // only lock the cache for read |
| RWlock::ScopedRlock cl(cacheLock); |
| it = bindingCache.find(routingKey); |
| if (it != bindingCache.end()) { |
| b = it->second; |
| } |
| } |
| PreRoute pr(msg, this); |
| if (!b.get()) // no cache hit |
| { |
| RWlock::ScopedRlock l(lock); |
| b = BindingList(new std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >); |
| BindingsFinderIter bindingsFinder(b); |
| bindingTree.iterateMatch(routingKey, bindingsFinder); |
| RWlock::ScopedWlock cwl(cacheLock); |
| bindingCache[routingKey] = b; // update cache |
| } |
| doRoute(msg, b); |
| } |
| |
| bool TopicExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) |
| { |
| RWlock::ScopedRlock l(lock); |
| if (routingKey && queue) { |
| string key(normalize(*routingKey)); |
| return getQueueBinding(queue, key) != 0; |
| } else if (!routingKey && !queue) { |
| return nBindings > 0; |
| } else if (routingKey) { |
| if (bindingTree.get(*routingKey)) { |
| return true; |
| } |
| } else { |
| QueueFinderIter queueFinder(queue); |
| bindingTree.iterateAll( queueFinder ); |
| return queueFinder.found; |
| } |
| return false; |
| } |
| |
| TopicExchange::~TopicExchange() {} |
| |
| const std::string TopicExchange::typeName("topic"); |
| |
| // DataSource interface - used to write persistence data to async store |
| // TODO: kpvdr: implement |
| uint64_t TopicExchange::getSize() { |
| return 0; |
| } |
| void TopicExchange::write(char* /*target*/) {} |
| |
| }} // namespace qpid::broker |