PROTON-335: Add access to link attach properties (C and Python)
pn_link_properties()
pn_link_remote_properties()
This closes #260.
diff --git a/c/include/proton/link.h b/c/include/proton/link.h
index c7e3f5b..0143976 100644
--- a/c/include/proton/link.h
+++ b/c/include/proton/link.h
@@ -681,6 +681,35 @@
PN_EXTERN uint64_t pn_link_remote_max_message_size(pn_link_t *link);
/**
+ * Access/modify the AMQP properties data for a link object.
+ *
+ * This operation will return a pointer to a ::pn_data_t object that is valid
+ * until the link object is freed. Any data contained by the ::pn_data_t object
+ * will be sent as the AMQP properties for the link object when the link is
+ * opened by calling ::pn_link_open. Note that this MUST take the form of a
+ * symbol keyed map to be valid.
+ *
+ * The ::pn_data_t pointer returned is valid until the link object is freed.
+ *
+ * @param[in] link the link object
+ * @return a pointer to a pn_data_t representing the link properties
+ */
+PN_EXTERN pn_data_t *pn_link_properties(pn_link_t *link);
+
+/**
+ * Access the AMQP link properties supplied by the remote link endpoint.
+ *
+ * This operation will return a pointer to a ::pn_data_t object that
+ * is valid until the link object is freed. This data object
+ * will be empty until the remote link is opened as indicated by
+ * the ::PN_REMOTE_ACTIVE flag.
+ *
+ * @param[in] link the link object
+ * @return the remote link properties
+ */
+PN_EXTERN pn_data_t *pn_link_remote_properties(pn_link_t *link);
+
+/**
* @}
*/
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
index ba6c051..9dbc919 100644
--- a/c/src/core/engine-internal.h
+++ b/c/src/core/engine-internal.h
@@ -296,6 +296,8 @@
pn_delivery_t *unsettled_tail;
pn_delivery_t *current;
pn_record_t *context;
+ pn_data_t *properties;
+ pn_data_t *remote_properties;
size_t unsettled_count;
uint64_t max_message_size;
uint64_t remote_max_message_size;
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
index 19dcc63..02062f6 100644
--- a/c/src/core/engine.c
+++ b/c/src/core/engine.c
@@ -1125,6 +1125,8 @@
if (endpoint->referenced) {
pn_decref(link->session);
}
+ pn_free(link->properties);
+ pn_free(link->remote_properties);
}
#define pn_link_refcount pn_object_refcount
@@ -1168,6 +1170,8 @@
link->remote_snd_settle_mode = PN_SND_MIXED;
link->remote_rcv_settle_mode = PN_RCV_FIRST;
link->detached = false;
+ link->properties = 0;
+ link->remote_properties = 0;
// begin transport state
link->state.local_handle = -1;
@@ -1972,6 +1976,21 @@
return link->remote_max_message_size;
}
+pn_data_t *pn_link_properties(pn_link_t *link)
+{
+ assert(link);
+ if (!link->properties)
+ link->properties = pn_data(0);
+ return link->properties;
+}
+
+pn_data_t *pn_link_remote_properties(pn_link_t *link)
+{
+ assert(link);
+ return link->remote_properties;
+}
+
+
pn_link_t *pn_delivery_link(pn_delivery_t *delivery)
{
assert(delivery);
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index dde697d..c2f1c05 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -1335,14 +1335,19 @@
bool snd_settle, rcv_settle;
uint8_t snd_settle_mode, rcv_settle_mode;
uint64_t max_msgsz;
- int err = pn_data_scan(args, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..IL]", &name, &handle,
+ bool has_props;
+ pn_data_t *rem_props = pn_data(0);
+ int err = pn_data_scan(args, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..IL..?C]", &name, &handle,
&is_sender,
&snd_settle, &snd_settle_mode,
&rcv_settle, &rcv_settle_mode,
&source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode,
&target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic,
- &idc, &max_msgsz);
- if (err) return err;
+ &idc, &max_msgsz, &has_props, rem_props);
+ if (err) {
+ pn_free(rem_props);
+ return err;
+ }
char strbuf[128]; // avoid malloc for most link names
char *strheap = (name.size >= sizeof(strbuf)) ? (char *) malloc(name.size + 1) : NULL;
char *strname = strheap ? strheap : strbuf;
@@ -1353,12 +1358,14 @@
if (!ssn) {
pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
if (strheap) free(strheap);
+ pn_free(rem_props);
return PN_EOS;
}
pn_link_t *link = pni_find_link(ssn, name, is_sender);
if (link && (int32_t)link->state.remote_handle >= 0) {
pn_do_error(transport, "amqp:invalid-field", "link name already attached: %s", strname);
if (strheap) free(strheap);
+ pn_free(rem_props);
return PN_EOS;
}
if (!link) { /* Make a new link for the attach */
@@ -1373,6 +1380,12 @@
free(strheap);
}
+ if (has_props) {
+ link->remote_properties = rem_props;
+ } else {
+ pn_free(rem_props);
+ }
+
pni_map_remote_handle(link, handle);
PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE);
pn_terminus_t *rsrc = &link->remote_source;
@@ -2078,7 +2091,7 @@
if (err) return err;
} else {
int err = pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel,
- "DL[SIoBB?DL[SIsIoC?sCnMM]?DL[SIsIoCM]nnIL]", ATTACH,
+ "DL[SIoBB?DL[SIsIoC?sCnMM]?DL[SIsIoCM]nnILnnC]", ATTACH,
pn_string_get(link->name),
state->local_handle,
endpoint->type == RECEIVER,
@@ -2106,7 +2119,9 @@
link->target.properties,
link->target.capabilities,
- 0, link->max_message_size);
+ 0,
+ link->max_message_size,
+ link->properties);
if (err) return err;
}
}
diff --git a/c/tests/engine_test.cpp b/c/tests/engine_test.cpp
index fa6e23f..f8b174e 100644
--- a/c/tests/engine_test.cpp
+++ b/c/tests/engine_test.cpp
@@ -23,6 +23,8 @@
#include <proton/engine.h>
+using namespace pn_test;
+
// push data from one transport to another
static int xfer(pn_transport_t *src, pn_transport_t *dest) {
ssize_t out = pn_transport_pending(src);
@@ -316,3 +318,51 @@
pn_transport_free(t2);
pn_connection_free(c2);
}
+
+
+TEST_CASE("link_properties)") {
+ pn_connection_t *c1 = pn_connection();
+ pn_transport_t *t1 = pn_transport();
+ pn_transport_bind(t1, c1);
+
+ pn_connection_t *c2 = pn_connection();
+ pn_transport_t *t2 = pn_transport();
+ pn_transport_set_server(t2);
+ pn_transport_bind(t2, c2);
+
+ pn_connection_open(c1);
+ pn_connection_open(c2);
+
+ pn_session_t *s1 = pn_session(c1);
+ pn_session_open(s1);
+
+ pn_link_t *rx = pn_receiver(s1, "props");
+ pn_data_t *props = pn_link_properties(rx);
+ REQUIRE(props != NULL);
+
+ pn_data_clear(props);
+ pn_data_fill(props, "{S[iii]SI}", "foo", 1, 987, 3, "bar", 965);
+ pn_link_open(rx);
+
+ while (pump(t1, t2)) {
+ process_endpoints(c1);
+ process_endpoints(c2);
+ }
+
+ // session and link should be up, c2 should have a sender link:
+ REQUIRE(pn_link_state(rx) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+ REQUIRE(pn_link_remote_properties(rx) == NULL);
+
+ pn_link_t *tx = pn_link_head(c2, (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE));
+
+ REQUIRE(pn_link_remote_properties(tx) != NULL);
+ CHECK("{\"foo\"=[1, 987, 3], \"bar\"=965}" == pn_test::inspect(pn_link_remote_properties(tx)));
+
+ pn_transport_unbind(t1);
+ pn_transport_free(t1);
+ pn_connection_free(c1);
+
+ pn_transport_unbind(t2);
+ pn_transport_free(t2);
+ pn_connection_free(c2);
+}
diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py
index f518137..50cf677 100644
--- a/python/proton/_endpoints.py
+++ b/python/proton/_endpoints.py
@@ -53,7 +53,8 @@
pn_terminus_get_durability, pn_terminus_get_expiry_policy, pn_terminus_get_timeout, pn_terminus_get_type, \
pn_terminus_is_dynamic, pn_terminus_outcomes, pn_terminus_properties, pn_terminus_set_address, \
pn_terminus_set_distribution_mode, pn_terminus_set_durability, pn_terminus_set_dynamic, \
- pn_terminus_set_expiry_policy, pn_terminus_set_timeout, pn_terminus_set_type, pn_work_head
+ pn_terminus_set_expiry_policy, pn_terminus_set_timeout, pn_terminus_set_type, pn_work_head, \
+ pn_link_properties, pn_link_remote_properties
from ._common import unicode2utf8, utf82unicode
from ._condition import cond2obj, obj2cond
@@ -771,6 +772,10 @@
def __init__(self, impl):
Wrapper.__init__(self, impl, pn_link_attachments)
+ def _init(self):
+ Endpoint._init(self)
+ self.properties = None
+
def _get_attachments(self):
return pn_link_attachments(self._impl)
@@ -796,6 +801,7 @@
sent to the peer. A link is fully active once both peers have
attached it.
"""
+ obj2dat(self.properties, pn_link_properties(self._impl))
pn_link_open(self._impl)
def close(self):
@@ -1200,6 +1206,41 @@
"""
pn_link_free(self._impl)
+ @property
+ def remote_properties(self):
+ """
+ The properties specified by the remote peer for this link.
+
+ This operation will return a :class:`Data` object that
+ is valid until the link object is freed. This :class:`Data`
+ object will be empty until the remote link is opened as
+ indicated by the :const:`REMOTE_ACTIVE` flag.
+
+ :type: :class:`Data`
+ """
+ return dat2obj(pn_link_remote_properties(self._impl))
+
+
+ def _get_properties(self):
+ return self._properties_dict
+
+ def _set_properties(self, properties_dict):
+ if isinstance(properties_dict, dict):
+ self._properties_dict = PropertyDict(properties_dict, raise_on_error=False)
+ else:
+ self._properties_dict = properties_dict
+
+ properties = property(_get_properties, _set_properties, doc="""
+ Link properties as a dictionary of key/values. The AMQP 1.0
+ specification restricts this dictionary to have keys that are only
+ :class:`symbol` types. It is possible to use the special ``dict``
+ subclass :class:`PropertyDict` which will by default enforce this
+ restrictions on construction. In addition, if strings type are used,
+ this will silently convert them into symbols.
+
+ :type: ``dict`` containing :class:`symbol`` keys.
+ """)
+
class Sender(Link):
"""
diff --git a/python/tests/proton_tests/engine.py b/python/tests/proton_tests/engine.py
index 48bee6b..1b4c02b 100644
--- a/python/tests/proton_tests/engine.py
+++ b/python/tests/proton_tests/engine.py
@@ -765,6 +765,17 @@
self.pump()
assert self.rcv.remote_max_message_size == 13579
+ def test_properties(self):
+ sender_props = {symbol('key1'): 'value1',
+ symbol('key2'): 'value2'}
+ self.snd.properties = sender_props
+ self.snd.open()
+ self.rcv.open()
+ self.pump()
+
+ assert self.rcv.remote_properties == sender_props, (self.rcv.remote_properties, sender_props)
+ assert self.snd.remote_properties == None, (self.snd.remote_properties, None)
+
def test_cleanup(self):
snd, rcv = self.link("test-link")
snd.open()