| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "PnData.h" |
| #include "qpid/messaging/amqp/AddressHelper.h" |
| #include "qpid/messaging/Address.h" |
| #include "qpid/messaging/AddressImpl.h" |
| #include "qpid/amqp/descriptors.h" |
| #include "qpid/types/encodings.h" |
| #include "qpid/log/Statement.h" |
| #include <vector> |
| #include <set> |
| #include <sstream> |
| #include <boost/assign.hpp> |
| #include <boost/format.hpp> |
| extern "C" { |
| #include <proton/engine.h> |
| } |
| |
| |
| namespace qpid { |
| namespace messaging { |
| namespace amqp { |
| |
| using qpid::types::Variant; |
| |
| namespace { |
| //policy types |
| const std::string CREATE("create"); |
| const std::string ASSERT("assert"); |
| const std::string DELETE("delete"); |
| |
| //policy values |
| const std::string ALWAYS("always"); |
| const std::string NEVER("never"); |
| const std::string RECEIVER("receiver"); |
| const std::string SENDER("sender"); |
| |
| const std::string NODE("node"); |
| const std::string LINK("link"); |
| const std::string CAPABILITIES("capabilities"); |
| const std::string PROPERTIES("properties"); |
| const std::string MODE("mode"); |
| const std::string BROWSE("browse"); |
| const std::string CONSUME("consume"); |
| const std::string TIMEOUT("timeout"); |
| |
| const std::string TYPE("type"); |
| const std::string TOPIC("topic"); |
| const std::string QUEUE("queue"); |
| const std::string DURABLE("durable"); |
| const std::string NAME("name"); |
| const std::string RELIABILITY("reliability"); |
| const std::string SELECTOR("selector"); |
| const std::string FILTER("filter"); |
| const std::string DESCRIPTOR("descriptor"); |
| const std::string VALUE("value"); |
| const std::string SUBJECT_FILTER("subject-filter"); |
| const std::string SOURCE("sender-source"); |
| const std::string TARGET("receiver-target"); |
| |
| //reliability options: |
| const std::string UNRELIABLE("unreliable"); |
| const std::string AT_MOST_ONCE("at-most-once"); |
| const std::string AT_LEAST_ONCE("at-least-once"); |
| const std::string EXACTLY_ONCE("exactly-once"); |
| |
| //distribution modes: |
| const std::string MOVE("move"); |
| const std::string COPY("copy"); |
| |
| const std::string SUPPORTED_DIST_MODES("supported-dist-modes"); |
| const std::string AUTO_DELETE("auto-delete"); |
| const std::string LIFETIME_POLICY("lifetime-policy"); |
| const std::string DELETE_ON_CLOSE("delete-on-close"); |
| const std::string DELETE_IF_UNUSED("delete-if-unused"); |
| const std::string DELETE_IF_EMPTY("delete-if-empty"); |
| const std::string DELETE_IF_UNUSED_AND_EMPTY("delete-if-unused-and-empty"); |
| const std::string CREATE_ON_DEMAND("create-on-demand"); |
| |
| const std::string X_DECLARE("x-declare"); |
| const std::string X_BINDINGS("x-bindings"); |
| const std::string X_SUBSCRIBE("x-subscribe"); |
| const std::string ARGUMENTS("arguments"); |
| const std::string EXCHANGE_TYPE("exchange-type"); |
| |
| const std::string NULL_ADDRESS("<null>"); |
| |
| const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER); |
| const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER); |
| |
| class Verifier |
| { |
| public: |
| Verifier(); |
| void verify(const Address& address) const; |
| private: |
| Variant::Map defined; |
| void verify(const Variant::Map& allowed, const Variant::Map& actual) const; |
| }; |
| const Verifier verifier; |
| |
| pn_bytes_t convert(const std::string& s) |
| { |
| pn_bytes_t result; |
| result.start = const_cast<char*>(s.data()); |
| result.size = s.size(); |
| return result; |
| } |
| |
| std::string convert(pn_bytes_t in) |
| { |
| return std::string(in.start, in.size); |
| } |
| |
| bool hasWildcards(const std::string& key) |
| { |
| return key.find('*') != std::string::npos || key.find('#') != std::string::npos; |
| } |
| |
| uint64_t getFilterDescriptor(const std::string& key) |
| { |
| return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE; |
| } |
| |
| bool test(const Variant::Map& options, const std::string& name) |
| { |
| Variant::Map::const_iterator j = options.find(name); |
| if (j == options.end()) { |
| return false; |
| } else { |
| return j->second; |
| } |
| } |
| |
| template <typename T> T get(const Variant::Map& options, const std::string& name, T defaultValue) |
| { |
| Variant::Map::const_iterator j = options.find(name); |
| if (j == options.end()) { |
| return defaultValue; |
| } else { |
| return j->second; |
| } |
| } |
| |
| bool getOption(const Variant::Map& options, const std::string& name, std::string& variable) |
| { |
| Variant::Map::const_iterator j = options.find(name); |
| if (j == options.end()) { |
| return false; |
| } else { |
| variable = j->second.asString(); |
| return true; |
| } |
| } |
| |
| bool getOption(const Variant::Map& options, const std::string& name, Variant::Map& variable) |
| { |
| Variant::Map::const_iterator j = options.find(name); |
| if (j == options.end()) { |
| return false; |
| } else { |
| variable = j->second.asMap(); |
| return true; |
| } |
| } |
| |
| bool getOption(const Variant::Map& options, const std::string& name, Variant::List& variable) |
| { |
| Variant::Map::const_iterator j = options.find(name); |
| if (j == options.end()) { |
| return false; |
| } else { |
| variable = j->second.asList(); |
| return true; |
| } |
| } |
| |
| bool getAddressOption(const Address& address, const std::string& name, std::string& variable) |
| { |
| return getOption(address.getOptions(), name, variable); |
| } |
| |
| bool getAddressOption(const Address& address, const std::string& name, Variant::Map& variable) |
| { |
| return getOption(address.getOptions(), name, variable); |
| } |
| |
| bool in(const std::string& value, const std::vector<std::string>& choices) |
| { |
| for (std::vector<std::string>::const_iterator i = choices.begin(); i != choices.end(); ++i) { |
| if (value == *i) return true; |
| } |
| return false; |
| } |
| void add(Variant::Map& target, const Variant::Map& source) |
| { |
| for (Variant::Map::const_iterator i = source.begin(); i != source.end(); ++i) { |
| target[i->first] = i->second; |
| } |
| } |
| void flatten(Variant::Map& base, const std::string& nested) |
| { |
| Variant::Map::iterator i = base.find(nested); |
| if (i != base.end()) { |
| add(base, i->second.asMap()); |
| base.erase(i); |
| } |
| } |
| bool replace(Variant::Map& map, const std::string& original, const std::string& desired) |
| { |
| Variant::Map::iterator i = map.find(original); |
| if (i != map.end()) { |
| map[desired] = i->second; |
| map.erase(original); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| const uint32_t DEFAULT_DURABLE_TIMEOUT(2*60);//2 minutes |
| const uint32_t DEFAULT_TIMEOUT(0); |
| } |
| |
| AddressHelper::AddressHelper(const Address& address) : |
| isTemporary(AddressImpl::isTemporary(address)), |
| name(address.getName()), |
| type(address.getType()), |
| durableNode(false), |
| durableLink(false), |
| timeout(0), |
| browse(false) |
| { |
| verifier.verify(address); |
| getAddressOption(address, CREATE, createPolicy); |
| getAddressOption(address, DELETE, deletePolicy); |
| getAddressOption(address, ASSERT, assertPolicy); |
| |
| getAddressOption(address, NODE, node); |
| getAddressOption(address, LINK, link); |
| getOption(node, PROPERTIES, properties); |
| getOption(node, CAPABILITIES, capabilities); |
| getOption(link, RELIABILITY, reliability); |
| durableNode = test(node, DURABLE); |
| durableLink = test(link, DURABLE); |
| timeout = get(link, TIMEOUT, durableLink && reliability != AT_LEAST_ONCE ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT); |
| std::string mode; |
| if (getAddressOption(address, MODE, mode)) { |
| if (mode == BROWSE) { |
| browse = true; |
| } else if (mode != CONSUME) { |
| throw qpid::messaging::AddressError("Invalid value for mode; must be 'browse' or 'consume'."); |
| } |
| } |
| |
| if (!deletePolicy.empty()) { |
| throw qpid::messaging::AddressError("Delete policies not supported over AMQP 1.0."); |
| } |
| if (node.find(X_BINDINGS) != node.end()) { |
| throw qpid::messaging::AddressError("Node scoped x-bindings element not supported over AMQP 1.0."); |
| } |
| if (link.find(X_BINDINGS) != link.end()) { |
| throw qpid::messaging::AddressError("Link scoped x-bindings element not supported over AMQP 1.0."); |
| } |
| if (link.find(X_SUBSCRIBE) != link.end()) { |
| throw qpid::messaging::AddressError("Link scoped x-subscribe element not supported over AMQP 1.0."); |
| } |
| if (link.find(X_DECLARE) != link.end()) { |
| throw qpid::messaging::AddressError("Link scoped x-declare element not supported over AMQP 1.0."); |
| } |
| //massage x-declare into properties |
| Variant::Map::iterator i = node.find(X_DECLARE); |
| if (i != node.end()) { |
| Variant::Map x_declare = i->second.asMap(); |
| replace(x_declare, TYPE, EXCHANGE_TYPE); |
| flatten(x_declare, ARGUMENTS); |
| add(properties, x_declare); |
| node.erase(i); |
| } |
| //for temp queues, if neither lifetime-policy nor autodelete are specified, assume delete-on-close |
| if (isTemporary && properties.find(LIFETIME_POLICY) == properties.end() && properties.find(AUTO_DELETE) == properties.end()) { |
| properties[LIFETIME_POLICY] = DELETE_ON_CLOSE; |
| } |
| |
| if (properties.size() && !(isTemporary || !createPolicy.empty() || !assertPolicy.empty())) { |
| QPID_LOG(warning, "Properties will be ignored! " << address); |
| } |
| |
| qpid::types::Variant::Map::const_iterator selector = link.find(SELECTOR); |
| if (selector != link.end()) { |
| addFilter(SELECTOR, qpid::amqp::filters::SELECTOR_FILTER_CODE, selector->second); |
| } |
| if (!address.getSubject().empty()) { |
| addFilter(SUBJECT_FILTER, getFilterDescriptor(address.getSubject()), address.getSubject()); |
| } |
| qpid::types::Variant::Map::const_iterator filter = link.find(FILTER); |
| if (filter != link.end()) { |
| if (filter->second.getType() == qpid::types::VAR_MAP) { |
| addFilter(filter->second.asMap()); |
| } else if (filter->second.getType() == qpid::types::VAR_LIST) { |
| addFilters(filter->second.asList()); |
| } else { |
| throw qpid::messaging::AddressError("Filter must be a map or a list of maps, each containing name, descriptor and value."); |
| } |
| } |
| } |
| |
| void AddressHelper::addFilters(const qpid::types::Variant::List& f) |
| { |
| for (qpid::types::Variant::List::const_iterator i = f.begin(); i != f.end(); ++i) { |
| addFilter(i->asMap()); |
| } |
| } |
| |
| void AddressHelper::addFilter(const qpid::types::Variant::Map& f) |
| { |
| qpid::types::Variant::Map::const_iterator name = f.find(NAME); |
| qpid::types::Variant::Map::const_iterator descriptor = f.find(DESCRIPTOR); |
| qpid::types::Variant::Map::const_iterator value = f.find(VALUE); |
| //all fields are required at present (may relax this at a later stage): |
| if (name == f.end()) { |
| throw qpid::messaging::AddressError("Filter entry must specify name"); |
| } |
| if (descriptor == f.end()) { |
| throw qpid::messaging::AddressError("Filter entry must specify descriptor"); |
| } |
| if (value == f.end()) { |
| throw qpid::messaging::AddressError("Filter entry must specify value"); |
| } |
| try { |
| addFilter(name->second.asString(), descriptor->second.asUint64(), value->second); |
| } catch (const qpid::types::InvalidConversion&) { |
| addFilter(name->second.asString(), descriptor->second.asString(), value->second); |
| } |
| |
| } |
| |
| AddressHelper::Filter::Filter() : descriptorCode(0), confirmed(false) {} |
| AddressHelper::Filter::Filter(const std::string& n, uint64_t d, const qpid::types::Variant& v) : name(n), descriptorCode(d), value(v), confirmed(false) {} |
| AddressHelper::Filter::Filter(const std::string& n, const std::string& d, const qpid::types::Variant& v) : name(n), descriptorSymbol(d), descriptorCode(0), value(v), confirmed(false) {} |
| |
| void AddressHelper::addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant& value) |
| { |
| filters.push_back(Filter(name, descriptor, value)); |
| } |
| void AddressHelper::addFilter(const std::string& name, const std::string& descriptor, const qpid::types::Variant& value) |
| { |
| filters.push_back(Filter(name, descriptor, value)); |
| } |
| |
| namespace { |
| bool checkLifetimePolicy(const std::string& requested, const std::string& actual) |
| { |
| if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL && requested == DELETE_ON_CLOSE) return true; |
| else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL && requested == DELETE_IF_UNUSED) return true; |
| else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL && requested == DELETE_IF_EMPTY) return true; |
| else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL && requested == DELETE_IF_UNUSED_AND_EMPTY) return true; |
| else return actual == requested; |
| } |
| bool checkLifetimePolicy(const std::string& requested, uint64_t actual) |
| { |
| if (actual == qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_CODE) |
| return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL); |
| else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_CODE) |
| return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL); |
| else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_CODE) |
| return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL); |
| else if (actual == qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE) |
| return checkLifetimePolicy(requested, qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL); |
| else |
| return false; |
| } |
| bool checkLifetimePolicy(const std::string& requested, pn_data_t* actual) |
| { |
| bool result(false); |
| if (pn_data_is_described(actual)) { |
| pn_data_enter(actual); |
| pn_data_next(actual); |
| if (pn_data_type(actual) == PN_ULONG) { |
| result = checkLifetimePolicy(requested, pn_data_get_ulong(actual)); |
| } else if (pn_data_type(actual) == PN_SYMBOL) { |
| result = checkLifetimePolicy(requested, convert(pn_data_get_symbol(actual))); |
| } |
| pn_data_exit(actual); |
| } |
| return result; |
| } |
| } |
| void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode) |
| { |
| if (assertEnabled(mode)) { |
| QPID_LOG(debug, "checking capabilities: " << capabilities); |
| //ensure all desired capabilities have been offered |
| std::set<std::string> desired; |
| for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { |
| if (*i != CREATE_ON_DEMAND) desired.insert(i->asString()); |
| } |
| pn_data_t* data = pn_terminus_capabilities(terminus); |
| if (pn_data_next(data)) { |
| pn_type_t type = pn_data_type(data); |
| if (type == PN_ARRAY) { |
| pn_data_enter(data); |
| while (pn_data_next(data)) { |
| desired.erase(convert(pn_data_get_symbol(data))); |
| } |
| pn_data_exit(data); |
| } else if (type == PN_SYMBOL) { |
| desired.erase(convert(pn_data_get_symbol(data))); |
| } else { |
| QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type)); |
| } |
| } |
| |
| if (desired.size()) { |
| std::stringstream missing; |
| missing << "Desired capabilities not met: "; |
| bool first(true); |
| for (std::set<std::string>::const_iterator i = desired.begin(); i != desired.end(); ++i) { |
| if (first) first = false; |
| else missing << ", "; |
| missing << *i; |
| } |
| throw qpid::messaging::AssertionFailed(missing.str()); |
| } |
| |
| //ensure all desired filters are in use |
| data = pn_terminus_filter(terminus); |
| if (pn_data_next(data)) { |
| size_t count = pn_data_get_map(data); |
| pn_data_enter(data); |
| for (size_t i = 0; i < count && pn_data_next(data); ++i) { |
| //skip key: |
| if (!pn_data_next(data)) break; |
| //expecting described value: |
| if (pn_data_is_described(data)) { |
| pn_data_enter(data); |
| pn_data_next(data); |
| if (pn_data_type(data) == PN_ULONG) { |
| confirmFilter(pn_data_get_ulong(data)); |
| } else if (pn_data_type(data) == PN_SYMBOL) { |
| confirmFilter(convert(pn_data_get_symbol(data))); |
| } |
| pn_data_exit(data); |
| } |
| } |
| pn_data_exit(data); |
| } |
| std::stringstream missing; |
| missing << "Desired filters not in use: "; |
| bool first(true); |
| for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) { |
| if (!i->confirmed) { |
| if (first) first = false; |
| else missing << ", "; |
| missing << i->name << "("; |
| if (i->descriptorSymbol.empty()) missing << "0x" << std::hex << i->descriptorCode; |
| else missing << i->descriptorSymbol; |
| missing << ")"; |
| } |
| } |
| if (!first) throw qpid::messaging::AssertionFailed(missing.str()); |
| |
| //assert on properties (Note: this violates the AMQP 1.0 |
| //specification - as does the create option - by sending |
| //node-properties even if the dynamic option is not |
| //set. However this can be avoided by not specifying any node |
| //properties when asserting) |
| if (!type.empty() || durableNode || !properties.empty()) { |
| bool isAutoDeleted = false; |
| qpid::types::Variant::Map requested = properties; |
| if (!type.empty()) requested[SUPPORTED_DIST_MODES] = type == TOPIC ? COPY : MOVE; |
| if (durableNode) requested[DURABLE] = true; |
| |
| data = pn_terminus_properties(terminus); |
| if (pn_data_next(data)) { |
| size_t count = pn_data_get_map(data); |
| pn_data_enter(data); |
| for (size_t i = 0; i < count && pn_data_next(data); ++i) { |
| std::string key = convert(pn_data_get_symbol(data)); |
| pn_data_next(data); |
| qpid::types::Variant::Map::const_iterator j = requested.find(key); |
| qpid::types::Variant v; |
| if (key == LIFETIME_POLICY) { |
| isAutoDeleted = true; |
| if (j != requested.end() && checkLifetimePolicy(j->second.asString(), data)) { |
| requested.erase(j->first); |
| } |
| } else if (key == AUTO_DELETE) { |
| PnData(data).get(v); |
| isAutoDeleted = v.asBool(); |
| } else if (j != requested.end() && (PnData(data).get(v) && v.asString() == j->second.asString())) { |
| requested.erase(j->first); |
| } |
| } |
| pn_data_exit(data); |
| qpid::types::Variant::Map::iterator i = requested.find(AUTO_DELETE); |
| if (i != requested.end() && i->second.asBool() == isAutoDeleted) { |
| requested.erase(i); |
| } |
| if (!requested.empty()) { |
| std::stringstream missing; |
| missing << "Requested node properties not met: " << requested; |
| throw qpid::messaging::AssertionFailed(missing.str()); |
| } |
| } |
| } |
| } |
| } |
| |
| void AddressHelper::confirmFilter(const std::string& descriptor) |
| { |
| for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) { |
| if (descriptor == i->descriptorSymbol) i->confirmed = true; |
| } |
| } |
| |
| void AddressHelper::confirmFilter(uint64_t descriptor) |
| { |
| for (std::vector<Filter>::iterator i = filters.begin(); i != filters.end(); ++i) { |
| if (descriptor == i->descriptorCode) i->confirmed = true; |
| } |
| } |
| |
| bool AddressHelper::createEnabled(CheckMode mode) const |
| { |
| return enabled(createPolicy, mode); |
| } |
| |
| bool AddressHelper::assertEnabled(CheckMode mode) const |
| { |
| return enabled(assertPolicy, mode); |
| } |
| |
| bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const |
| { |
| bool result = false; |
| switch (mode) { |
| case FOR_RECEIVER: |
| result = in(policy, RECEIVER_MODES); |
| break; |
| case FOR_SENDER: |
| result = in(policy, SENDER_MODES); |
| break; |
| } |
| return result; |
| } |
| |
| bool AddressHelper::isNameNull() const |
| { |
| return name == NULL_ADDRESS; |
| } |
| |
| bool AddressHelper::isUnreliable() const |
| { |
| return reliability == AT_MOST_ONCE || reliability == UNRELIABLE || |
| (reliability.empty() && browse); // A browser defaults to unreliable. |
| } |
| |
| const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const |
| { |
| return node; |
| } |
| const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const |
| { |
| return link; |
| } |
| |
| bool AddressHelper::getLinkSource(std::string& out) const |
| { |
| return getLinkOption(SOURCE, out); |
| } |
| |
| bool AddressHelper::getLinkTarget(std::string& out) const |
| { |
| return getLinkOption(TARGET, out); |
| } |
| |
| bool AddressHelper::getLinkOption(const std::string& name, std::string& out) const |
| { |
| qpid::types::Variant::Map::const_iterator i = link.find(name); |
| if (i != link.end()) { |
| out = i->second.asString(); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode) |
| { |
| bool createOnDemand(false); |
| if (isTemporary) { |
| //application expects a name to be generated |
| pn_terminus_set_dynamic(terminus, true); |
| setNodeProperties(terminus); |
| } else if (name != NULL_ADDRESS) { |
| pn_terminus_set_address(terminus, name.c_str()); |
| if (createEnabled(mode)) { |
| //application expects name of node to be as specified |
| setNodeProperties(terminus); |
| createOnDemand = true; |
| } else if (assertEnabled(mode)) { |
| setNodeProperties(terminus); |
| } |
| } |
| |
| setCapabilities(terminus, createOnDemand); |
| if (durableLink) { |
| pn_terminus_set_durability(terminus, PN_DELIVERIES); |
| } |
| if (mode == FOR_RECEIVER) { |
| if (timeout) pn_terminus_set_timeout(terminus, timeout); |
| if (browse) { |
| pn_terminus_set_distribution_mode(terminus, PN_DIST_MODE_COPY); |
| } |
| //set filter(s): |
| if (!filters.empty()) { |
| pn_data_t* filter = pn_terminus_filter(terminus); |
| pn_data_put_map(filter); |
| pn_data_enter(filter); |
| for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end(); ++i) { |
| pn_data_put_symbol(filter, convert(i->name)); |
| pn_data_put_described(filter); |
| pn_data_enter(filter); |
| if (i->descriptorSymbol.size()) { |
| pn_data_put_symbol(filter, convert(i->descriptorSymbol)); |
| } else { |
| pn_data_put_ulong(filter, i->descriptorCode); |
| } |
| PnData(filter).put(i->value); |
| pn_data_exit(filter); |
| } |
| pn_data_exit(filter); |
| } |
| } |
| if (isUnreliable()) { |
| pn_link_set_snd_settle_mode(link, PN_SND_SETTLED); |
| } else if (!reliability.empty()) { |
| if (reliability == EXACTLY_ONCE ) { |
| QPID_LOG(warning, "Unsupported reliability mode: " << reliability); |
| } else if (reliability != AT_LEAST_ONCE ) { |
| QPID_LOG(warning, "Unrecognised reliability mode: " << reliability); |
| } |
| pn_link_set_snd_settle_mode(link, PN_SND_UNSETTLED); |
| } |
| } |
| |
| void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create) |
| { |
| if (create) capabilities.push_back(CREATE_ON_DEMAND); |
| if (!type.empty()) capabilities.push_back(type); |
| if (durableNode) capabilities.push_back(DURABLE); |
| |
| pn_data_t* data = pn_terminus_capabilities(terminus); |
| if (capabilities.size() == 1) { |
| pn_data_put_symbol(data, convert(capabilities.front().asString())); |
| } else if (capabilities.size() > 1) { |
| pn_data_put_array(data, false, PN_SYMBOL); |
| pn_data_enter(data); |
| for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) { |
| pn_data_put_symbol(data, convert(i->asString())); |
| } |
| pn_data_exit(data); |
| } |
| } |
| std::string AddressHelper::getLinkName(const Address& address) |
| { |
| AddressHelper helper(address); |
| const qpid::types::Variant::Map& linkProps = helper.getLinkProperties(); |
| qpid::types::Variant::Map::const_iterator i = linkProps.find(NAME); |
| if (i != linkProps.end()) { |
| return i->second.asString(); |
| } else { |
| std::stringstream name; |
| name << address.getName() << "_" << qpid::types::Uuid(true); |
| return name.str(); |
| } |
| } |
| namespace { |
| std::string toLifetimePolicy(const std::string& value) |
| { |
| if (value == DELETE_ON_CLOSE) return qpid::amqp::lifetime_policy::DELETE_ON_CLOSE_SYMBOL; |
| else if (value == DELETE_IF_UNUSED) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL; |
| else if (value == DELETE_IF_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL; |
| else if (value == DELETE_IF_UNUSED_AND_EMPTY) return qpid::amqp::lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL; |
| else return value;//asume value is itself the symbolic descriptor |
| } |
| void putLifetimePolicy(pn_data_t* data, const std::string& value) |
| { |
| pn_data_put_described(data); |
| pn_data_enter(data); |
| pn_data_put_symbol(data, convert(value)); |
| pn_data_put_list(data); |
| pn_data_exit(data); |
| } |
| } |
| void AddressHelper::setNodeProperties(pn_terminus_t* terminus) |
| { |
| if (properties.size() || type.size() || durableNode) { |
| pn_data_t* data = pn_terminus_properties(terminus); |
| pn_data_put_map(data); |
| pn_data_enter(data); |
| if (type.size()) { |
| pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES)); |
| pn_data_put_string(data, convert(type == TOPIC ? COPY : MOVE)); |
| } |
| if (durableNode) { |
| pn_data_put_symbol(data, convert(DURABLE)); |
| pn_data_put_bool(data, true); |
| } |
| for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { |
| if (i->first == LIFETIME_POLICY) { |
| pn_data_put_symbol(data, convert(i->first)); |
| putLifetimePolicy(data, toLifetimePolicy(i->second.asString())); |
| } else { |
| pn_data_put_symbol(data, convert(i->first)); |
| PnData(data).put(i->second); |
| } |
| } |
| pn_data_exit(data); |
| } |
| } |
| |
| Verifier::Verifier() |
| { |
| defined[CREATE] = true; |
| defined[ASSERT] = true; |
| defined[DELETE] = true; |
| defined[MODE] = true; |
| Variant::Map node; |
| node[TYPE] = true; |
| node[DURABLE] = true; |
| node[PROPERTIES] = true; |
| node[CAPABILITIES] = true; |
| node[X_DECLARE] = true; |
| node[X_BINDINGS] = true; |
| defined[NODE] = node; |
| Variant::Map link; |
| link[NAME] = true; |
| link[DURABLE] = true; |
| link[RELIABILITY] = true; |
| link[TIMEOUT] = true; |
| link[SOURCE] = true; |
| link[TARGET] = true; |
| link[X_SUBSCRIBE] = true; |
| link[X_DECLARE] = true; |
| link[X_BINDINGS] = true; |
| link[SELECTOR] = true; |
| link[FILTER] = true; |
| defined[LINK] = link; |
| } |
| void Verifier::verify(const Address& address) const |
| { |
| verify(defined, address.getOptions()); |
| } |
| |
| void Verifier::verify(const Variant::Map& allowed, const Variant::Map& actual) const |
| { |
| for (Variant::Map::const_iterator i = actual.begin(); i != actual.end(); ++i) { |
| Variant::Map::const_iterator option = allowed.find(i->first); |
| if (option == allowed.end()) { |
| throw AddressError((boost::format("Unrecognised option: %1%") % i->first).str()); |
| } else if (option->second.getType() == qpid::types::VAR_MAP) { |
| verify(option->second.asMap(), i->second.asMap()); |
| } |
| } |
| } |
| |
| }}} // namespace qpid::messaging::amqp |