blob: 3fc694b954c7d90b70d780394b6d40612e80d301 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "proton/link.hpp"
#include "proton/link_options.hpp"
#include "proton/handler.hpp"
#include "msg.hpp"
#include "messaging_adapter.hpp"
namespace proton {
namespace {
std::string lifetime_policy_symbol(enum link_options::lifetime_policy lp) {
switch (lp) {
case link_options::DELETE_ON_CLOSE: return "amqp:delete-on-close:list";
case link_options::DELETE_ON_NO_LINKS: return "amqp:delete-on-no-links:list";
case link_options::DELETE_ON_NO_MESSAGES: return "amqp:delete-on-no-messages:list";
case link_options::DELETE_ON_NO_LINKS_OR_MESSAGES: return "amqp:delete-on-no-links-or-messages:list";
default: break;
}
return "";
}
std::string distribution_mode_symbol(enum terminus::distribution_mode dm) {
switch (dm) {
case terminus::COPY: return "copy";
case terminus::MOVE: return "move";
default: break;
}
return "";
}
}
template <class T> struct option {
T value;
bool set;
option() : value(), set(false) {}
option& operator=(const T& x) { value = x; set = true; return *this; }
void override(const option<T>& x) { if (x.set) *this = x.value; }
};
class link_options::impl {
public:
option<proton_handler*> handler;
option<enum terminus::distribution_mode> distribution_mode;
option<bool> durable_subscription;
option<enum delivery_mode> delivery_mode;
option<bool> dynamic_address;
option<std::string> local_address;
option<enum lifetime_policy> lifetime_policy;
option<std::string> selector;
void apply(link& l) {
if (l.state() & endpoint::LOCAL_UNINIT) {
bool sender = !l.receiver();
if (local_address.set) {
if (sender)
l.local_target().address(local_address.value);
else
l.local_source().address(local_address.value);
}
if (delivery_mode.set) {
switch (delivery_mode.value) {
case AT_MOST_ONCE:
l.sender_settle_mode(link_options::SETTLED);
break;
case AT_LEAST_ONCE:
l.sender_settle_mode(link_options::UNSETTLED);
l.receiver_settle_mode(link_options::SETTLE_ALWAYS);
break;
default:
break;
}
}
if (handler.set) {
if (handler.value)
l.handler(*handler.value);
else
l.detach_handler();
}
if (dynamic_address.set) {
terminus t = sender ? l.local_target() : l.local_source();
t.dynamic(dynamic_address.value);
if (dynamic_address.value) {
std::string lp, dm;
if (lifetime_policy.set) lp = lifetime_policy_symbol(lifetime_policy.value);
if (!sender && distribution_mode.set) dm = distribution_mode_symbol(distribution_mode.value);
if (lp.size() || dm.size()) {
encoder enc = t.node_properties().encode();
enc << start::map();
if (dm.size())
enc << amqp_symbol("supported-dist-modes") << amqp_string(dm);
if (lp.size())
enc << amqp_symbol("lifetime-policy") << start::described()
<< amqp_symbol(lp) << start::list() << finish();
}
}
}
if (!sender) {
// receiver only options
if (distribution_mode.set) l.local_source().distribution_mode(distribution_mode.value);
if (durable_subscription.set && durable_subscription.value) {
l.local_source().durability(terminus::DELIVERIES);
l.local_source().expiry_policy(terminus::EXPIRE_NEVER);
}
if (selector.set && selector.value.size()) {
encoder enc = l.local_source().filter().encode();
enc << start::map() << amqp_symbol("selector") << start::described()
<< amqp_symbol("apache.org:selector-filter:string") << amqp_binary(selector.value) << finish();
}
}
}
}
void override(const impl& x) {
handler.override(x.handler);
distribution_mode.override(x.distribution_mode);
durable_subscription.override(x.durable_subscription);
delivery_mode.override(x.delivery_mode);
dynamic_address.override(x.dynamic_address);
local_address.override(x.local_address);
lifetime_policy.override(x.lifetime_policy);
selector.override(x.selector);
}
};
link_options::link_options() : impl_(new impl()) {}
link_options::link_options(const link_options& x) : impl_(new impl()) {
*this = x;
}
link_options::~link_options() {}
link_options& link_options::operator=(const link_options& x) {
*impl_ = *x.impl_;
return *this;
}
void link_options::override(const link_options& x) { impl_->override(*x.impl_); }
link_options& link_options::handler(class handler *h) { impl_->handler = h->messaging_adapter_.get(); return *this; }
link_options& link_options::browsing(bool b) { distribution_mode(b ? terminus::COPY : terminus::MOVE); return *this; }
link_options& link_options::distribution_mode(enum terminus::distribution_mode m) { impl_->distribution_mode = m; return *this; }
link_options& link_options::durable_subscription(bool b) {impl_->durable_subscription = b; return *this; }
link_options& link_options::delivery_mode(enum delivery_mode m) {impl_->delivery_mode = m; return *this; }
link_options& link_options::dynamic_address(bool b) {impl_->dynamic_address = b; return *this; }
link_options& link_options::local_address(const std::string &addr) {impl_->local_address = addr; return *this; }
link_options& link_options::lifetime_policy(enum lifetime_policy lp) {impl_->lifetime_policy = lp; return *this; }
link_options& link_options::selector(const std::string &str) {impl_->selector = str; return *this; }
void link_options::apply(link& l) const { impl_->apply(l); }
proton_handler* link_options::handler() const { return impl_->handler.value; }
} // namespace proton