blob: 104b34da8b98e5ce003a711e263d869a1d2d7dbe [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "HeadersExchange.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include <algorithm>
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
namespace _qmf = qmf::org::apache::qpid::broker;
// TODO aconway 2006-09-20: More efficient matching algorithm.
// The current search algorithm really sucks.
// Fieldtables are heavy, maybe use shared_ptr to do handle-body.
using namespace qpid::broker;
namespace {
const std::string all("all");
const std::string any("any");
const std::string x_match("x-match");
const std::string empty;
}
HeadersExchange::HeadersExchange(const string& _name, Manageable* _parent) :
Exchange(_name, _parent)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
}
HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
const FieldTable& _args, Manageable* _parent) :
Exchange(_name, _durable, _args, _parent)
{
if (mgmtExchange != 0)
mgmtExchange->set_type (typeName);
}
std::string HeadersExchange::getMatch(const FieldTable* args)
{
if (!args) {
throw InternalErrorException(QPID_MSG("No arguments given."));
}
FieldTable::ValuePtr what = args->get(x_match);
if (!what) {
return empty;
}
if (!what->convertsTo<std::string>()) {
throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
}
return what->get<std::string>();
}
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable* args){
std::string what = getMatch(args);
if (what != all && what != any)
throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
Binding::shared_ptr binding (new Binding (bindingKey, queue, this, *args));
if (bindings.add_unless(binding, MatchArgs(queue, args))) {
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->inc_bindingCount();
}
routeIVE();
return true;
} else {
return false;
}
}
bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, const FieldTable*){
if (bindings.remove_if(MatchKey(queue, bindingKey))) {
if (mgmtExchange != 0) {
mgmtExchange->dec_bindingCount();
((_qmf::Queue*) queue->GetManagementObject())->dec_bindingCount();
}
return true;
} else {
return false;
}
}
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
if (!args) return;//can't match if there were no headers passed in
PreRoute pr(msg, this);
uint32_t count(0);
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()){
for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) {
if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
if ((*i)->mgmtBinding != 0)
(*i)->mgmtBinding->inc_msgMatched ();
}
}
if (mgmtExchange != 0)
{
mgmtExchange->inc_msgReceives ();
mgmtExchange->inc_byteReceives (msg.contentSize ());
if (count == 0)
{
mgmtExchange->inc_msgDrops ();
mgmtExchange->inc_byteDrops (msg.contentSize ());
}
else
{
mgmtExchange->inc_msgRoutes (count);
mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
}
}
}
bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
{
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()){
for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) {
return true;
}
}
}
return false;
}
HeadersExchange::~HeadersExchange() {}
const std::string HeadersExchange::typeName("headers");
namespace
{
bool match_values(const FieldValue& bind, const FieldValue& msg) {
return bind.empty() || bind == msg;
}
}
bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
typedef FieldTable::ValueMap Map;
std::string what = getMatch(&bind);
if (what == all) {
for (Map::const_iterator i = bind.begin();
i != bind.end();
++i)
{
if (i->first != x_match)
{
Map::const_iterator j = msg.find(i->first);
if (j == msg.end()) return false;
if (!match_values(*(i->second), *(j->second))) return false;
}
}
return true;
} else if (what == any) {
for (Map::const_iterator i = bind.begin();
i != bind.end();
++i)
{
if (i->first != x_match)
{
Map::const_iterator j = msg.find(i->first);
if (j != msg.end()) {
if (match_values(*(i->second), *(j->second))) return true;
}
}
}
return false;
} else {
return false;
}
}
bool HeadersExchange::equal(const FieldTable& a, const FieldTable& b) {
typedef FieldTable::ValueMap Map;
for (Map::const_iterator i = a.begin();
i != a.end();
++i)
{
Map::const_iterator j = b.find(i->first);
if (j == b.end()) return false;
if (!match_values(*(i->second), *(j->second))) return false;
}
return true;
}
HeadersExchange::MatchArgs::MatchArgs(Queue::shared_ptr q, const qpid::framing::FieldTable* a) : queue(q), args(a) {}
bool HeadersExchange::MatchArgs::operator()(Exchange::Binding::shared_ptr b)
{
return b->queue == queue && b->args == *args;
}
HeadersExchange::MatchKey::MatchKey(Queue::shared_ptr q, const std::string& k) : queue(q), key(k) {}
bool HeadersExchange::MatchKey::operator()(Exchange::Binding::shared_ptr b)
{
return b->queue == queue && b->key == key;
}