| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| from org.apache.qpid.proton import Proton |
| from org.apache.qpid.proton.amqp import Symbol |
| from org.apache.qpid.proton.amqp.messaging import Source, Target, \ |
| TerminusDurability, TerminusExpiryPolicy, Received, Accepted, \ |
| Rejected, Released, Modified |
| from org.apache.qpid.proton.amqp.transaction import Coordinator |
| from org.apache.qpid.proton.amqp.transport import ErrorCondition, \ |
| SenderSettleMode, ReceiverSettleMode |
| from org.apache.qpid.proton.engine import EndpointState, Sender, \ |
| Receiver, Transport as _Transport, TransportException, EventType |
| |
| from java.util import EnumSet |
| from compat import array, zeros |
| |
| from cerror import * |
| from ccodec import * |
| |
# Numeric constants mirroring proton/engine.h.

# Endpoint state flags: one bit per local/remote state so they can be
# OR-ed together into a single mask.
PN_LOCAL_UNINIT = 1 << 0
PN_LOCAL_ACTIVE = 1 << 1
PN_LOCAL_CLOSED = 1 << 2
PN_REMOTE_UNINIT = 1 << 3
PN_REMOTE_ACTIVE = 1 << 4
PN_REMOTE_CLOSED = 1 << 5

# Sender settle modes.
PN_SND_UNSETTLED, PN_SND_SETTLED, PN_SND_MIXED = range(3)

# Receiver settle modes.
PN_RCV_FIRST, PN_RCV_SECOND = range(2)

# Terminus types.
PN_UNSPECIFIED, PN_SOURCE, PN_TARGET, PN_COORDINATOR = range(4)

# Terminus durability levels.
PN_NONDURABLE, PN_CONFIGURATION, PN_DELIVERIES = range(3)

# Terminus expiry policies.
(PN_EXPIRE_WITH_LINK,
 PN_EXPIRE_WITH_SESSION,
 PN_EXPIRE_WITH_CONNECTION,
 PN_EXPIRE_NEVER) = range(4)

# Source distribution modes.
PN_DIST_MODE_UNSPECIFIED, PN_DIST_MODE_COPY, PN_DIST_MODE_MOVE = range(3)

# Delivery state (disposition) type codes.
PN_RECEIVED = 0x23
PN_ACCEPTED = 0x24
PN_REJECTED = 0x25
PN_RELEASED = 0x26
PN_MODIFIED = 0x27
| |
# Transport trace levels, re-exported from the Java Transport interface.
(PN_TRACE_OFF,
 PN_TRACE_RAW,
 PN_TRACE_FRM,
 PN_TRACE_DRV) = (_Transport.TRACE_OFF,
                  _Transport.TRACE_RAW,
                  _Transport.TRACE_FRM,
                  _Transport.TRACE_DRV)
| |
def wrap(obj, wrapper):
    """Return the wrapper stored in obj's context, creating it on first use.

    A falsy ``obj`` yields ``None``. Otherwise the object's context is
    returned if it is truthy; if not, ``wrapper(obj)`` is built, stored
    via ``obj.setContext`` and returned.
    """
    if not obj:
        return None
    cached = obj.getContext()
    if cached:
        return cached
    created = wrapper(obj)
    obj.setContext(created)
    return created
| |
class pn_condition:
    """Python-side mirror of an error condition (name, description, info)."""

    def __init__(self):
        self.name = self.description = None
        self.info = pn_data(0)

    def decode(self, impl):
        """Load this condition from a Java error condition, or reset on None."""
        if impl is None:
            self.name = self.description = None
            self.info.clear()
            return
        symbol = impl.getCondition()
        self.name = None if symbol is None else symbol.toString()
        self.description = impl.getDescription()
        obj2dat(impl.getInfo(), self.info)

    def encode(self):
        """Build a Java ErrorCondition, or return None when no name is set."""
        if self.name is None:
            return None
        encoded = ErrorCondition()
        encoded.setCondition(Symbol.valueOf(self.name))
        encoded.setDescription(self.description)
        encoded.setInfo(dat2obj(self.info))
        return encoded
| |
def pn_condition_is_set(cond):
    """Return True when the condition has a non-empty name."""
    if cond.name:
        return True
    return False


def pn_condition_get_name(cond):
    """Return the condition name (None when unset)."""
    name = cond.name
    return name


def pn_condition_set_name(cond, name):
    """Set the condition name."""
    setattr(cond, "name", name)


def pn_condition_get_description(cond):
    """Return the condition description."""
    description = cond.description
    return description


def pn_condition_set_description(cond, description):
    """Set the condition description."""
    setattr(cond, "description", description)


def pn_condition_clear(cond):
    """Reset name and description to None and clear the info data."""
    cond.name = cond.description = None
    cond.info.clear()


def pn_condition_info(cond):
    """Return the data object holding the condition's info."""
    info = cond.info
    return info
| |
class endpoint_wrapper:
    """Base wrapper pairing a Java endpoint with local/remote conditions."""

    def __init__(self, impl):
        self.impl = impl
        self.condition, self.remote_condition = pn_condition(), pn_condition()

    def on_close(self):
        """Push the local condition to the Java endpoint, if one is set."""
        encoded = self.condition.encode()
        if not encoded:
            return
        self.impl.setCondition(encoded)
| |
def remote_condition(self):
    """Refresh and return the remote condition of an endpoint wrapper.

    Module-level helper: ``self`` is the wrapper passed in by the
    ``pn_*_remote_condition`` functions.
    """
    cond = self.remote_condition
    cond.decode(self.impl.getRemoteCondition())
    return cond
| |
class pn_connection_wrapper(endpoint_wrapper):
    """Connection wrapper holding local properties and capability data."""

    def __init__(self, impl):
        endpoint_wrapper.__init__(self, impl)
        # One empty data object per locally configurable field.
        for field in ("properties",
                      "offered_capabilities",
                      "desired_capabilities"):
            setattr(self, field, pn_data(0))
| |
def pn_connection():
    """Create a new Java connection and return its wrapper."""
    impl = Proton.connection()
    return wrap(impl, pn_connection_wrapper)
| |
def set2mask(local, remote):
    """Convert local/remote EndpointState sets into a PN_* state bit mask."""
    flags = (
        (local, EndpointState.UNINITIALIZED, PN_LOCAL_UNINIT),
        (local, EndpointState.ACTIVE, PN_LOCAL_ACTIVE),
        (local, EndpointState.CLOSED, PN_LOCAL_CLOSED),
        (remote, EndpointState.UNINITIALIZED, PN_REMOTE_UNINIT),
        (remote, EndpointState.ACTIVE, PN_REMOTE_ACTIVE),
        (remote, EndpointState.CLOSED, PN_REMOTE_CLOSED),
    )
    mask = 0
    for states, state, bit in flags:
        if states.contains(state):
            mask |= bit
    return mask
| |
def endpoint_state(impl):
    """Return the PN_* state mask for a Java endpoint's current states."""
    local = EnumSet.of(impl.getLocalState())
    remote = EnumSet.of(impl.getRemoteState())
    return set2mask(local, remote)
| |
def pn_connection_state(conn):
    """Return the connection's endpoint state mask."""
    impl = conn.impl
    return endpoint_state(impl)


def pn_connection_condition(conn):
    """Return the connection's local condition."""
    cond = conn.condition
    return cond


def pn_connection_remote_condition(conn):
    """Return the connection's remote condition, freshly decoded."""
    cond = remote_condition(conn)
    return cond


def pn_connection_properties(conn):
    """Return the data object for the local connection properties."""
    props = conn.properties
    return props


def pn_connection_remote_properties(conn):
    """Return the remote connection properties converted via obj2dat."""
    remote = conn.impl.getRemoteProperties()
    return obj2dat(remote)


def pn_connection_offered_capabilities(conn):
    """Return the data object for the locally offered capabilities."""
    caps = conn.offered_capabilities
    return caps


def pn_connection_remote_offered_capabilities(conn):
    """Return the remote offered capabilities as a symbol array."""
    remote = conn.impl.getRemoteOfferedCapabilities()
    return array2dat(remote, PN_SYMBOL)


def pn_connection_desired_capabilities(conn):
    """Return the data object for the locally desired capabilities."""
    caps = conn.desired_capabilities
    return caps


def pn_connection_remote_desired_capabilities(conn):
    """Return the remote desired capabilities as a symbol array."""
    remote = conn.impl.getRemoteDesiredCapabilities()
    return array2dat(remote, PN_SYMBOL)


def pn_connection_attachments(conn):
    """Return the attachments record of the Java connection."""
    impl = conn.impl
    return impl.attachments()
| |
def pn_connection_set_container(conn, name):
    """Set the local container name on the Java connection."""
    impl = conn.impl
    impl.setContainer(name)


def pn_connection_get_container(conn):
    """Return the local container name."""
    impl = conn.impl
    return impl.getContainer()


def pn_connection_remote_container(conn):
    """Return the remote container name."""
    impl = conn.impl
    return impl.getRemoteContainer()


def pn_connection_get_hostname(conn):
    """Return the local hostname setting."""
    impl = conn.impl
    return impl.getHostname()


def pn_connection_set_hostname(conn, name):
    """Set the hostname on the Java connection."""
    impl = conn.impl
    impl.setHostname(name)


def pn_connection_remote_hostname(conn):
    """Return the remote hostname."""
    impl = conn.impl
    return impl.getRemoteHostname()
| |
def pn_connection_open(conn):
    """Apply locally configured properties/capabilities, then open.

    Each field is only pushed to the Java connection when its decoded
    value is truthy; capabilities are converted to Symbol arrays.
    """
    impl = conn.impl
    props, offered, desired = [
        dat2obj(data)
        for data in (conn.properties,
                     conn.offered_capabilities,
                     conn.desired_capabilities)
    ]
    if props:
        impl.setProperties(props)
    if offered:
        impl.setOfferedCapabilities(array(list(offered), Symbol))
    if desired:
        impl.setDesiredCapabilities(array(list(desired), Symbol))
    impl.open()
| |
def pn_connection_close(conn):
    """Publish the local condition (if any) and close the connection."""
    conn.on_close()
    impl = conn.impl
    impl.close()


def pn_connection_release(conn):
    """Free the Java connection."""
    impl = conn.impl
    impl.free()


def pn_connection_transport(conn):
    """Return the wrapper for the transport the connection reports."""
    transport = conn.impl.getTransport()
    return wrap(transport, pn_transport_wrapper)
| |
class pn_session_wrapper(endpoint_wrapper):
    """Session wrapper; adds nothing beyond the endpoint_wrapper base."""
| |
def pn_session(conn):
    """Create a session on the connection and return its wrapper."""
    impl = conn.impl.session()
    return wrap(impl, pn_session_wrapper)


def pn_session_attachments(ssn):
    """Return the attachments record of the Java session."""
    impl = ssn.impl
    return impl.attachments()


def pn_session_state(ssn):
    """Return the session's endpoint state mask."""
    impl = ssn.impl
    return endpoint_state(impl)


def pn_session_get_incoming_capacity(ssn):
    """Return the session's incoming capacity."""
    impl = ssn.impl
    return impl.getIncomingCapacity()


def pn_session_set_incoming_capacity(ssn, capacity):
    """Set the session's incoming capacity."""
    impl = ssn.impl
    impl.setIncomingCapacity(capacity)


def pn_session_incoming_bytes(ssn):
    """Return the incoming byte count reported by the Java session."""
    impl = ssn.impl
    return impl.getIncomingBytes()


def pn_session_outgoing_bytes(ssn):
    """Return the outgoing byte count reported by the Java session."""
    impl = ssn.impl
    return impl.getOutgoingBytes()


def pn_session_get_outgoing_window(ssn):
    """Return the session's outgoing window."""
    impl = ssn.impl
    return impl.getOutgoingWindow()


def pn_session_set_outgoing_window(ssn, window):
    """Set the session's outgoing window."""
    impl = ssn.impl
    impl.setOutgoingWindow(window)


def pn_session_condition(ssn):
    """Return the session's local condition."""
    cond = ssn.condition
    return cond


def pn_session_remote_condition(ssn):
    """Return the session's remote condition, freshly decoded."""
    cond = remote_condition(ssn)
    return cond


def pn_session_open(ssn):
    """Open the Java session."""
    impl = ssn.impl
    impl.open()


def pn_session_close(ssn):
    """Publish the local condition (if any) and close the session."""
    ssn.on_close()
    impl = ssn.impl
    impl.close()
| |
def mask2set(mask):
    """Split a PN_* state mask into (local, remote) EndpointState sets.

    Either element is None when the mask selects no state on that side.
    """
    def _select(pairs):
        # Collect the states whose flag bit is set in the mask.
        chosen = [state for bit, state in pairs if bit & mask]
        if chosen:
            return EnumSet.of(*chosen)
        return None

    local = _select((
        (PN_LOCAL_UNINIT, EndpointState.UNINITIALIZED),
        (PN_LOCAL_ACTIVE, EndpointState.ACTIVE),
        (PN_LOCAL_CLOSED, EndpointState.CLOSED),
    ))
    remote = _select((
        (PN_REMOTE_UNINIT, EndpointState.UNINITIALIZED),
        (PN_REMOTE_ACTIVE, EndpointState.ACTIVE),
        (PN_REMOTE_CLOSED, EndpointState.CLOSED),
    ))
    return local, remote
| |
def pn_session_head(conn, mask):
    """Return the wrapper for the first session matching the state mask."""
    local, remote = mask2set(mask)
    head = conn.impl.sessionHead(local, remote)
    return wrap(head, pn_session_wrapper)


def pn_session_connection(ssn):
    """Return the wrapper for the connection owning the session."""
    connection = ssn.impl.getConnection()
    return wrap(connection, pn_connection_wrapper)


def pn_sender(ssn, name):
    """Create a named sender link on the session and return its wrapper."""
    link = ssn.impl.sender(name)
    return wrap(link, pn_link_wrapper)


def pn_receiver(ssn, name):
    """Create a named receiver link on the session and return its wrapper."""
    link = ssn.impl.receiver(name)
    return wrap(link, pn_link_wrapper)


def pn_session_free(ssn):
    """Free the Java session."""
    impl = ssn.impl
    impl.free()
| |
# Java terminus class -> PN_* terminus type (NoneType means unspecified).
TERMINUS_TYPES_J2P = {
    Source: PN_SOURCE,
    Target: PN_TARGET,
    Coordinator: PN_COORDINATOR,
    type(None): PN_UNSPECIFIED,
}

# PN_* terminus type -> zero-argument factory for the Java terminus.
TERMINUS_TYPES_P2J = {
    PN_SOURCE: Source,
    PN_TARGET: Target,
    PN_COORDINATOR: Coordinator,
    PN_UNSPECIFIED: lambda: None,
}

# Durability: PN_* code -> Java enum, and the inverse mapping.
DURABILITY_P2J = {
    PN_NONDURABLE: TerminusDurability.NONE,
    PN_CONFIGURATION: TerminusDurability.CONFIGURATION,
    PN_DELIVERIES: TerminusDurability.UNSETTLED_STATE,
}
DURABILITY_J2P = dict((java, code) for code, java in DURABILITY_P2J.items())

# Expiry policy: PN_* code -> Java enum, and the inverse mapping.
EXPIRY_POLICY_P2J = {
    PN_EXPIRE_WITH_LINK: TerminusExpiryPolicy.LINK_DETACH,
    PN_EXPIRE_WITH_SESSION: TerminusExpiryPolicy.SESSION_END,
    PN_EXPIRE_WITH_CONNECTION: TerminusExpiryPolicy.CONNECTION_CLOSE,
    PN_EXPIRE_NEVER: TerminusExpiryPolicy.NEVER,
}
EXPIRY_POLICY_J2P = dict(
    (java, code) for code, java in EXPIRY_POLICY_P2J.items())

# Distribution mode: PN_* code -> Symbol (None when unspecified), and the
# inverse mapping.
DISTRIBUTION_MODE_P2J = {
    PN_DIST_MODE_UNSPECIFIED: None,
    PN_DIST_MODE_COPY: Symbol.valueOf("copy"),
    PN_DIST_MODE_MOVE: Symbol.valueOf("move"),
}
DISTRIBUTION_MODE_J2P = dict(
    (java, code) for code, java in DISTRIBUTION_MODE_P2J.items())
| |
class pn_terminus:
    """Python-side mirror of a link terminus (source, target, coordinator).

    Scalar settings are held as PN_* codes and plain values; properties,
    capabilities, outcomes and filter are held in data objects.
    """

    def __init__(self, type):
        self.type = type
        self.address = None
        self.durability = PN_NONDURABLE
        self.expiry_policy = PN_EXPIRE_WITH_SESSION
        self.distribution_mode = PN_DIST_MODE_UNSPECIFIED
        self.timeout = 0
        self.dynamic = False
        self.properties = pn_data(0)
        self.capabilities = pn_data(0)
        self.outcomes = pn_data(0)
        self.filter = pn_data(0)

    def copy(self, src):
        """Copy every terminus setting from ``src`` onto this terminus.

        NOTE(review): the data containers (properties, capabilities,
        outcomes, filter) are shared by reference with ``src``, not
        duplicated.
        """
        self.type = src.type
        self.address = src.address
        self.durability = src.durability
        self.expiry_policy = src.expiry_policy
        # Previously omitted, so a copy kept its old distribution mode.
        self.distribution_mode = src.distribution_mode
        self.timeout = src.timeout
        self.dynamic = src.dynamic
        self.properties = src.properties
        self.capabilities = src.capabilities
        self.outcomes = src.outcomes
        self.filter = src.filter

    def decode(self, impl):
        """Load settings from a Java terminus; None leaves state untouched."""
        if impl is None:
            return
        self.type = TERMINUS_TYPES_J2P[impl.__class__]
        if self.type in (PN_SOURCE, PN_TARGET):
            self.address = impl.getAddress()
            self.durability = DURABILITY_J2P[impl.getDurable()]
            self.expiry_policy = EXPIRY_POLICY_J2P[impl.getExpiryPolicy()]
            self.timeout = impl.getTimeout().longValue()
            self.dynamic = impl.getDynamic()
            obj2dat(impl.getDynamicNodeProperties(), self.properties)
            array2dat(impl.getCapabilities(), PN_SYMBOL, self.capabilities)
        if self.type == PN_SOURCE:
            # Only sources carry distribution mode, outcomes and filter.
            self.distribution_mode = \
                DISTRIBUTION_MODE_J2P[impl.getDistributionMode()]
            array2dat(impl.getOutcomes(), PN_SYMBOL, self.outcomes)
            obj2dat(impl.getFilter(), self.filter)

    def encode(self):
        """Build the Java terminus for the current settings.

        Returns None for PN_UNSPECIFIED.

        NOTE(review): UnsignedInteger is not imported by name in this
        module; presumably it arrives through one of the star imports --
        verify.
        """
        impl = TERMINUS_TYPES_P2J[self.type]()
        if self.type in (PN_SOURCE, PN_TARGET):
            impl.setAddress(self.address)
            impl.setDurable(DURABILITY_P2J[self.durability])
            impl.setExpiryPolicy(EXPIRY_POLICY_P2J[self.expiry_policy])
            impl.setTimeout(UnsignedInteger.valueOf(self.timeout))
            impl.setDynamic(self.dynamic)
            props = dat2obj(self.properties)
            caps = dat2obj(self.capabilities)
            if props:
                impl.setDynamicNodeProperties(props)
            if caps:
                impl.setCapabilities(*array(list(caps), Symbol))
        if self.type == PN_SOURCE:
            impl.setDistributionMode(
                DISTRIBUTION_MODE_P2J[self.distribution_mode])
            outcomes = dat2obj(self.outcomes)
            filter_set = dat2obj(self.filter)
            if outcomes:
                impl.setOutcomes(outcomes)
            if filter_set:
                impl.setFilter(filter_set)
        return impl
| |
def pn_terminus_get_type(terminus):
    """Return the terminus type code."""
    kind = terminus.type
    return kind


def pn_terminus_set_type(terminus, type):
    """Set the terminus type code; always returns 0."""
    setattr(terminus, "type", type)
    return 0


def pn_terminus_get_address(terminus):
    """Return the terminus address."""
    address = terminus.address
    return address


def pn_terminus_set_address(terminus, address):
    """Set the terminus address; always returns 0."""
    setattr(terminus, "address", address)
    return 0


def pn_terminus_get_durability(terminus):
    """Return the terminus durability code."""
    durability = terminus.durability
    return durability


def pn_terminus_get_expiry_policy(terminus):
    """Return the terminus expiry policy code."""
    policy = terminus.expiry_policy
    return policy


def pn_terminus_set_timeout(terminus, timeout):
    """Set the terminus timeout; always returns 0."""
    setattr(terminus, "timeout", timeout)
    return 0


def pn_terminus_get_timeout(terminus):
    """Return the terminus timeout."""
    timeout = terminus.timeout
    return timeout


def pn_terminus_get_distribution_mode(terminus):
    """Return the terminus distribution mode code."""
    mode = terminus.distribution_mode
    return mode


def pn_terminus_set_distribution_mode(terminus, mode):
    """Set the terminus distribution mode code; always returns 0."""
    setattr(terminus, "distribution_mode", mode)
    return 0


def pn_terminus_is_dynamic(terminus):
    """Return the terminus dynamic flag."""
    dynamic = terminus.dynamic
    return dynamic


def pn_terminus_set_dynamic(terminus, dynamic):
    """Set the terminus dynamic flag; always returns 0."""
    setattr(terminus, "dynamic", dynamic)
    return 0


def pn_terminus_properties(terminus):
    """Return the data object holding the terminus properties."""
    data = terminus.properties
    return data


def pn_terminus_capabilities(terminus):
    """Return the data object holding the terminus capabilities."""
    data = terminus.capabilities
    return data


def pn_terminus_outcomes(terminus):
    """Return the data object holding the terminus outcomes."""
    data = terminus.outcomes
    return data


def pn_terminus_filter(terminus):
    """Return the data object holding the terminus filter."""
    data = terminus.filter
    return data


def pn_terminus_copy(terminus, src):
    """Copy ``src`` into ``terminus`` via its copy method; returns 0."""
    terminus.copy(src)
    return 0
| |
class pn_link_wrapper(endpoint_wrapper):
    """Link wrapper holding local and remote source/target termini."""

    def __init__(self, impl):
        endpoint_wrapper.__init__(self, impl)
        # Local termini start typed; remote ones are unspecified until
        # they are decoded from the Java link.
        self.source = pn_terminus(PN_SOURCE)
        self.remote_source = pn_terminus(PN_UNSPECIFIED)
        self.target = pn_terminus(PN_TARGET)
        self.remote_target = pn_terminus(PN_UNSPECIFIED)

    def on_open(self):
        """Encode the local source and target onto the Java link."""
        source = self.source.encode()
        self.impl.setSource(source)
        target = self.target.encode()
        self.impl.setTarget(target)
| |
def pn_link_attachments(link):
    """Return the attachments record of the Java link."""
    impl = link.impl
    return impl.attachments()


def pn_link_source(link):
    """Refresh the local source from the Java link and return it."""
    terminus = link.source
    terminus.decode(link.impl.getSource())
    return terminus


def pn_link_remote_source(link):
    """Refresh the remote source from the Java link and return it."""
    terminus = link.remote_source
    terminus.decode(link.impl.getRemoteSource())
    return terminus


def pn_link_target(link):
    """Refresh the local target from the Java link and return it."""
    terminus = link.target
    terminus.decode(link.impl.getTarget())
    return terminus


def pn_link_remote_target(link):
    """Refresh the remote target from the Java link and return it."""
    terminus = link.remote_target
    terminus.decode(link.impl.getRemoteTarget())
    return terminus


def pn_link_condition(link):
    """Return the link's local condition."""
    cond = link.condition
    return cond


def pn_link_remote_condition(link):
    """Return the link's remote condition, freshly decoded."""
    cond = remote_condition(link)
    return cond
| |
# Sender settle mode: PN_* code -> Java enum (None passes through), and
# the inverse mapping.
SND_SETTLE_MODE_P2J = {
    PN_SND_UNSETTLED: SenderSettleMode.UNSETTLED,
    PN_SND_SETTLED: SenderSettleMode.SETTLED,
    PN_SND_MIXED: SenderSettleMode.MIXED,
    None: None,
}
SND_SETTLE_MODE_J2P = dict(
    (java, code) for code, java in SND_SETTLE_MODE_P2J.items())


def pn_link_set_snd_settle_mode(link, mode):
    """Set the link's sender settle mode from a PN_SND_* code."""
    java_mode = SND_SETTLE_MODE_P2J[mode]
    link.impl.setSenderSettleMode(java_mode)


def pn_link_snd_settle_mode(link):
    """Return the link's sender settle mode as a PN_SND_* code."""
    java_mode = link.impl.getSenderSettleMode()
    return SND_SETTLE_MODE_J2P[java_mode]


def pn_link_remote_snd_settle_mode(link):
    """Return the remote sender settle mode as a PN_SND_* code."""
    java_mode = link.impl.getRemoteSenderSettleMode()
    return SND_SETTLE_MODE_J2P[java_mode]
| |
# Receiver settle mode: PN_* code -> Java enum (None passes through), and
# the inverse mapping.
RCV_SETTLE_MODE_P2J = {
    PN_RCV_FIRST: ReceiverSettleMode.FIRST,
    PN_RCV_SECOND: ReceiverSettleMode.SECOND,
    None: None,
}
RCV_SETTLE_MODE_J2P = dict(
    (java, code) for code, java in RCV_SETTLE_MODE_P2J.items())


def pn_link_set_rcv_settle_mode(link, mode):
    """Set the link's receiver settle mode from a PN_RCV_* code."""
    java_mode = RCV_SETTLE_MODE_P2J[mode]
    link.impl.setReceiverSettleMode(java_mode)


def pn_link_rcv_settle_mode(link):
    """Return the link's receiver settle mode as a PN_RCV_* code."""
    java_mode = link.impl.getReceiverSettleMode()
    return RCV_SETTLE_MODE_J2P[java_mode]


def pn_link_remote_rcv_settle_mode(link):
    """Return the remote receiver settle mode as a PN_RCV_* code."""
    java_mode = link.impl.getRemoteReceiverSettleMode()
    return RCV_SETTLE_MODE_J2P[java_mode]
| |
def pn_link_is_sender(link):
    """Return True when the wrapped Java link is a Sender."""
    impl = link.impl
    return isinstance(impl, Sender)


def pn_link_is_receiver(link):
    """Return True when the wrapped Java link is a Receiver."""
    impl = link.impl
    return isinstance(impl, Receiver)


def pn_link_head(conn, mask):
    """Return the wrapper for the first link matching the state mask."""
    local, remote = mask2set(mask)
    head = conn.impl.linkHead(local, remote)
    return wrap(head, pn_link_wrapper)


def pn_link_next(link, mask):
    """Return the wrapper for the next link matching the state mask."""
    local, remote = mask2set(mask)
    following = link.impl.next(local, remote)
    return wrap(following, pn_link_wrapper)


def pn_link_session(link):
    """Return the wrapper for the session owning the link."""
    session = link.impl.getSession()
    return wrap(session, pn_session_wrapper)


def pn_link_state(link):
    """Return the link's endpoint state mask."""
    impl = link.impl
    return endpoint_state(impl)


def pn_link_name(link):
    """Return the link name."""
    impl = link.impl
    return impl.getName()
| |
def pn_link_open(link):
    """Encode the local termini onto the Java link, then open it."""
    link.on_open()
    impl = link.impl
    impl.open()


def pn_link_close(link):
    """Publish the local condition (if any) and close the link."""
    link.on_close()
    impl = link.impl
    impl.close()


def pn_link_detach(link):
    """Publish the local condition (if any) and detach the link."""
    link.on_close()
    impl = link.impl
    impl.detach()
| |
def pn_link_flow(link, n):
    """Call flow(n) on the Java link."""
    impl = link.impl
    impl.flow(n)


def pn_link_drain(link, n):
    """Call drain(n) on the Java link."""
    impl = link.impl
    impl.drain(n)


def pn_link_drained(link):
    """Return the result of drained() on the Java link."""
    impl = link.impl
    return impl.drained()


def pn_link_draining(link):
    """Return the result of draining() on the Java link."""
    impl = link.impl
    return impl.draining()


def pn_link_credit(link):
    """Return the link's current credit."""
    impl = link.impl
    return impl.getCredit()


def pn_link_queued(link):
    """Return the queued count reported by the Java link."""
    impl = link.impl
    return impl.getQueued()


def pn_link_get_drain(link):
    """Return the link's drain flag."""
    impl = link.impl
    return impl.getDrain()


def pn_link_set_drain(link, drain):
    """Set the link's drain flag and return whatever setDrain returns."""
    impl = link.impl
    return impl.setDrain(drain)


def pn_link_unsettled(link):
    """Return the unsettled count reported by the Java link."""
    impl = link.impl
    return impl.getUnsettled()
| |
def pn_link_send(link, bytes):
    """Send ``bytes`` on the link as a Java byte array; return send()'s result."""
    payload = array(bytes, 'b')
    return link.impl.send(payload, 0, len(bytes))
| |
def pn_link_recv(link, limit):
    """Receive up to ``limit`` bytes from the link.

    Returns ``(n, data)`` where ``n`` is the value returned by the Java
    ``recv`` call and ``data`` is the received string, or None when
    ``n`` is negative.
    """
    buf = zeros(limit, 'b')
    n = link.impl.recv(buf, 0, limit)
    if n < 0:
        return n, None
    return n, buf[:n].tostring()
| |
def pn_link_advance(link):
    """Advance the link and return the Java call's result."""
    impl = link.impl
    return impl.advance()


def pn_link_current(link):
    """Return the wrapper for the link's current delivery."""
    delivery = link.impl.current()
    return wrap(delivery, pn_delivery_wrapper)


def pn_link_free(link):
    """Free the Java link."""
    impl = link.impl
    impl.free()


def pn_work_head(conn):
    """Return the wrapper for the first delivery on the work list."""
    delivery = conn.impl.getWorkHead()
    return wrap(delivery, pn_delivery_wrapper)


def pn_work_next(dlv):
    """Return the wrapper for the next delivery on the work list."""
    delivery = dlv.impl.getWorkNext()
    return wrap(delivery, pn_delivery_wrapper)
| |
# Java delivery-state class -> PN_* disposition code (NoneType maps to 0).
DELIVERY_STATES = {
    Received: PN_RECEIVED,
    Accepted: PN_ACCEPTED,
    Rejected: PN_REJECTED,
    Released: PN_RELEASED,
    Modified: PN_MODIFIED,
    type(None): 0,
}

# PN_* disposition code -> zero-argument factory for the Java state.
# Derived from the table above, with code 0 producing None.
DISPOSITIONS = dict((code, cls) for cls, code in DELIVERY_STATES.items())
DISPOSITIONS[0] = lambda: None
| |
class pn_disposition:
    """Python-side mirror of a delivery state (disposition)."""

    def __init__(self):
        self.type = 0
        self.data = pn_data(0)
        self.failed = False
        self.undeliverable = False
        self.annotations = pn_data(0)
        self.condition = pn_condition()
        self.section_number = 0
        self.section_offset = 0

    def decode(self, impl):
        """Load this disposition from a Java delivery state (or None).

        Fields that do not apply to the decoded type are reset to their
        defaults.
        """
        kind = DELIVERY_STATES[impl.__class__]
        self.type = kind

        # Error condition only exists on rejected deliveries.
        if kind == PN_REJECTED:
            self.condition.decode(impl.getError())
        else:
            pn_condition_clear(self.condition)

        # Failure flags and annotations only exist on modified deliveries.
        if kind == PN_MODIFIED:
            self.failed = impl.getDeliveryFailed()
            self.undeliverable = impl.getUndeliverableHere()
            obj2dat(impl.getMessageAnnotations(), self.annotations)
        else:
            self.failed = self.undeliverable = False
            pn_data_clear(self.annotations)

        # Section position only exists on received deliveries.
        if kind == PN_RECEIVED:
            self.section_number = impl.getSectionNumber().longValue()
            self.section_offset = impl.getSectionOffset().longValue()
        else:
            self.section_number = self.section_offset = 0

        self.data.clear()
        if impl:
            # XXX: the raw state is not stored in self.data yet
            # (original left "self.data.putObject(impl)" disabled).
            pass
        self.data.rewind()

    def encode(self):
        """Build the Java delivery state for this disposition.

        Returns None when the type maps to no state.

        NOTE(review): UnsignedInteger/UnsignedLong are not imported by
        name in this module; presumably they arrive through one of the
        star imports -- verify.
        """
        state = DISPOSITIONS[self.type]()
        if state is None:
            return state

        kind = self.type
        if kind == PN_REJECTED:
            state.setError(self.condition.encode())

        if kind == PN_MODIFIED:
            state.setDeliveryFailed(self.failed)
            state.setUndeliverableHere(self.undeliverable)
            annotations = dat2obj(self.annotations)
            if annotations:
                state.setMessageAnnotations(annotations)

        if kind == PN_RECEIVED:
            # Zero values are left at the Java object's defaults.
            if self.section_number:
                state.setSectionNumber(
                    UnsignedInteger.valueOf(self.section_number))
            if self.section_offset:
                state.setSectionOffset(
                    UnsignedLong.valueOf(self.section_offset))

        return state
| |
def pn_disposition_type(dsp):
    """Return the disposition type code."""
    kind = dsp.type
    return kind


def pn_disposition_is_failed(dsp):
    """Return the disposition's failed flag."""
    failed = dsp.failed
    return failed


def pn_disposition_set_failed(dsp, failed):
    """Set the disposition's failed flag."""
    setattr(dsp, "failed", failed)


def pn_disposition_is_undeliverable(dsp):
    """Return the disposition's undeliverable flag."""
    undeliverable = dsp.undeliverable
    return undeliverable


def pn_disposition_set_undeliverable(dsp, undeliverable):
    """Set the disposition's undeliverable flag."""
    setattr(dsp, "undeliverable", undeliverable)


def pn_disposition_data(dsp):
    """Return the disposition's data object."""
    data = dsp.data
    return data


def pn_disposition_annotations(dsp):
    """Return the disposition's annotations data object."""
    annotations = dsp.annotations
    return annotations


def pn_disposition_condition(dsp):
    """Return the disposition's condition."""
    cond = dsp.condition
    return cond


def pn_disposition_get_section_number(dsp):
    """Return the disposition's section number."""
    number = dsp.section_number
    return number


def pn_disposition_set_section_number(dsp, number):
    """Set the disposition's section number."""
    setattr(dsp, "section_number", number)


def pn_disposition_get_section_offset(dsp):
    """Return the disposition's section offset."""
    offset = dsp.section_offset
    return offset


def pn_disposition_set_section_offset(dsp, offset):
    """Set the disposition's section offset."""
    setattr(dsp, "section_offset", offset)
| |
class pn_delivery_wrapper:
    """Wrapper pairing a Java delivery with local/remote dispositions."""

    def __init__(self, impl):
        self.impl = impl
        self.local, self.remote = pn_disposition(), pn_disposition()
| |
def pn_delivery(link, tag):
    """Create a delivery with the given tag and return its wrapper."""
    delivery = link.impl.delivery(array(tag, 'b'))
    return wrap(delivery, pn_delivery_wrapper)


def pn_delivery_tag(dlv):
    """Return the delivery tag converted with tostring()."""
    tag = dlv.impl.getTag()
    return tag.tostring()


def pn_delivery_attachments(dlv):
    """Return the attachments record of the Java delivery."""
    impl = dlv.impl
    return impl.attachments()


def pn_delivery_partial(dlv):
    """Return the delivery's partial flag."""
    impl = dlv.impl
    return impl.isPartial()


def pn_delivery_pending(dlv):
    """Return the pending count reported by the Java delivery."""
    impl = dlv.impl
    return impl.pending()


def pn_delivery_writable(dlv):
    """Return the delivery's writable flag."""
    impl = dlv.impl
    return impl.isWritable()


def pn_delivery_readable(dlv):
    """Return the delivery's readable flag."""
    impl = dlv.impl
    return impl.isReadable()


def pn_delivery_updated(dlv):
    """Return the delivery's updated flag."""
    impl = dlv.impl
    return impl.isUpdated()


def pn_delivery_settled(dlv):
    """Return whether the delivery has been settled remotely."""
    impl = dlv.impl
    return impl.remotelySettled()
| |
def pn_delivery_local(dlv):
    """Refresh the local disposition from the Java delivery and return it."""
    state = dlv.impl.getLocalState()
    dlv.local.decode(state)
    return dlv.local


def pn_delivery_local_state(dlv):
    """Refresh the local disposition and return its type code."""
    state = dlv.impl.getLocalState()
    dlv.local.decode(state)
    return dlv.local.type


def pn_delivery_remote(dlv):
    """Refresh the remote disposition from the Java delivery and return it."""
    state = dlv.impl.getRemoteState()
    dlv.remote.decode(state)
    return dlv.remote


def pn_delivery_remote_state(dlv):
    """Refresh the remote disposition and return its type code."""
    state = dlv.impl.getRemoteState()
    dlv.remote.decode(state)
    return dlv.remote.type


def pn_delivery_update(dlv, state):
    """Set the local disposition type and apply it to the Java delivery."""
    disposition = dlv.local
    disposition.type = state
    dlv.impl.disposition(disposition.encode())


def pn_delivery_link(dlv):
    """Return the wrapper for the link owning the delivery."""
    link = dlv.impl.getLink()
    return wrap(link, pn_link_wrapper)


def pn_delivery_settle(dlv):
    """Settle the Java delivery."""
    impl = dlv.impl
    impl.settle()
| |
class pn_transport_wrapper:
    """Wrapper pairing a Java transport with a server flag and condition."""

    def __init__(self, impl):
        self.impl, self.server = impl, False
        self.condition = pn_condition()
| |
def pn_transport():
    """Create a new Java transport and return its wrapper."""
    impl = Proton.transport()
    return wrap(impl, pn_transport_wrapper)


def pn_transport_attachments(trans):
    """Return the attachments record of the Java transport."""
    impl = trans.impl
    return impl.attachments()


def pn_transport_set_server(trans):
    """Mark the transport wrapper as a server."""
    setattr(trans, "server", True)


def pn_transport_get_max_frame(trans):
    """Return the local maximum frame size."""
    impl = trans.impl
    return impl.getMaxFrameSize()


def pn_transport_set_max_frame(trans, value):
    """Set the local maximum frame size."""
    impl = trans.impl
    impl.setMaxFrameSize(value)


def pn_transport_get_remote_max_frame(trans):
    """Return the remote maximum frame size."""
    impl = trans.impl
    return impl.getRemoteMaxFrameSize()


def pn_transport_set_idle_timeout(trans, value):
    """Set the local idle timeout."""
    impl = trans.impl
    impl.setIdleTimeout(value)


def pn_transport_get_idle_timeout(trans):
    """Return the local idle timeout."""
    impl = trans.impl
    return impl.getIdleTimeout()


def pn_transport_get_remote_idle_timeout(trans):
    """Return the remote idle timeout."""
    impl = trans.impl
    return impl.getRemoteIdleTimeout()


def pn_transport_get_frames_input(trans):
    """Return the input frame count reported by the Java transport."""
    impl = trans.impl
    return impl.getFramesInput()


def pn_transport_get_frames_output(trans):
    """Return the output frame count reported by the Java transport."""
    impl = trans.impl
    return impl.getFramesOutput()


def pn_transport_set_channel_max(trans, n):
    """Set the local channel maximum."""
    impl = trans.impl
    impl.setChannelMax(n)


def pn_transport_get_channel_max(trans):
    """Return the local channel maximum."""
    impl = trans.impl
    return impl.getChannelMax()


def pn_transport_remote_channel_max(trans):
    """Return the remote channel maximum."""
    impl = trans.impl
    return impl.getRemoteChannelMax()


def pn_transport_tick(trans, now):
    """Call tick(now) on the Java transport and return its result."""
    impl = trans.impl
    return impl.tick(now)


def pn_transport_bind(trans, conn):
    """Bind the transport to the connection; always returns 0."""
    impl = trans.impl
    impl.bind(conn.impl)
    return 0


def pn_transport_unbind(trans):
    """Unbind the transport; always returns 0."""
    impl = trans.impl
    impl.unbind()
    return 0
| |
def pn_transport_set_pytracer(trans, tracer):
    """Not implemented: only emits a warning attributed to the caller."""
    from warnings import warn
    warn("TODO pn_transport_set_tracer", stacklevel=2)


def pn_transport_trace(trans, n):
    """Call trace(n) on the Java transport."""
    impl = trans.impl
    impl.trace(n)


def pn_transport_pending(trans):
    """Return the value of pending() on the Java transport."""
    impl = trans.impl
    return impl.pending()
| |
def pn_transport_peek(trans, size):
    """Return up to ``size`` pending output bytes without consuming them.

    Returns ``(0, data)`` on success. When the transport reports a
    negative pending count, returns ``(pending, None)`` so the caller
    sees the error code instead of a failure while allocating a
    negative-length buffer.
    """
    pending = trans.impl.pending()
    if pending < 0:
        return pending, None
    size = min(pending, size)
    ba = zeros(size, 'b')
    if size:
        bb = trans.impl.head()
        bb.get(ba)
        # Rewind so the same bytes can be read again; peek must not consume.
        bb.position(0)
    return 0, ba.tostring()
| |
def pn_transport_pop(trans, size):
    """Call pop(size) on the Java transport."""
    impl = trans.impl
    impl.pop(size)


def pn_transport_capacity(trans):
    """Return the value of capacity() on the Java transport."""
    impl = trans.impl
    return impl.capacity()
| |
def pn_transport_push(trans, input):
    """Feed ``input`` to the transport in capacity-sized chunks.

    Returns the number of bytes pushed, or the negative capacity value
    as soon as the transport reports one.

    If the transport reports zero capacity on two consecutive passes,
    the loop stops and the partial count is returned. Previously such a
    transport never consumed any input and the loop never terminated.
    """
    pushed = 0
    stalled = False
    while input:
        cap = pn_transport_capacity(trans)
        if cap < 0:
            return cap
        if cap == 0:
            # Allow one empty process() pass to free space, then give up.
            if stalled:
                break
            stalled = True
        else:
            stalled = False

        chunk = input[:cap]
        tail = trans.impl.tail()
        tail.put(array(chunk, 'b'))
        trans.impl.process()
        input = input[cap:]
        pushed += len(chunk)
    return pushed
| |
def pn_transport_close_head(trans):
    """Call close_head() on the Java transport; always returns 0."""
    impl = trans.impl
    impl.close_head()
    return 0


def pn_transport_close_tail(trans):
    """Call close_tail() on the Java transport; always returns 0."""
    impl = trans.impl
    impl.close_tail()
    return 0


def pn_transport_closed(trans):
    """Return the transport's closed flag."""
    impl = trans.impl
    return impl.isClosed()


def pn_transport_condition(trans):
    """Refresh the transport condition from the Java transport and return it."""
    cond = trans.condition
    cond.decode(trans.impl.getCondition())
    return cond
| |
| from org.apache.qpid.proton.engine import Event |
| |
# PN_* event type names bound to the matching Java Event.Type members.
_EventType = Event.Type

# Reactor and timer events.
PN_REACTOR_INIT = _EventType.REACTOR_INIT
PN_REACTOR_QUIESCED = _EventType.REACTOR_QUIESCED
PN_REACTOR_FINAL = _EventType.REACTOR_FINAL
PN_TIMER_TASK = _EventType.TIMER_TASK

# Connection events.
PN_CONNECTION_INIT = _EventType.CONNECTION_INIT
PN_CONNECTION_BOUND = _EventType.CONNECTION_BOUND
PN_CONNECTION_UNBOUND = _EventType.CONNECTION_UNBOUND
PN_CONNECTION_LOCAL_OPEN = _EventType.CONNECTION_LOCAL_OPEN
PN_CONNECTION_REMOTE_OPEN = _EventType.CONNECTION_REMOTE_OPEN
PN_CONNECTION_LOCAL_CLOSE = _EventType.CONNECTION_LOCAL_CLOSE
PN_CONNECTION_REMOTE_CLOSE = _EventType.CONNECTION_REMOTE_CLOSE
PN_CONNECTION_FINAL = _EventType.CONNECTION_FINAL

# Session events.
PN_SESSION_INIT = _EventType.SESSION_INIT
PN_SESSION_LOCAL_OPEN = _EventType.SESSION_LOCAL_OPEN
PN_SESSION_REMOTE_OPEN = _EventType.SESSION_REMOTE_OPEN
PN_SESSION_LOCAL_CLOSE = _EventType.SESSION_LOCAL_CLOSE
PN_SESSION_REMOTE_CLOSE = _EventType.SESSION_REMOTE_CLOSE
PN_SESSION_FINAL = _EventType.SESSION_FINAL

# Link and delivery events.
PN_LINK_INIT = _EventType.LINK_INIT
PN_LINK_LOCAL_OPEN = _EventType.LINK_LOCAL_OPEN
PN_LINK_REMOTE_OPEN = _EventType.LINK_REMOTE_OPEN
PN_LINK_LOCAL_CLOSE = _EventType.LINK_LOCAL_CLOSE
PN_LINK_REMOTE_CLOSE = _EventType.LINK_REMOTE_CLOSE
PN_LINK_LOCAL_DETACH = _EventType.LINK_LOCAL_DETACH
PN_LINK_REMOTE_DETACH = _EventType.LINK_REMOTE_DETACH
PN_LINK_FLOW = _EventType.LINK_FLOW
PN_LINK_FINAL = _EventType.LINK_FINAL
PN_DELIVERY = _EventType.DELIVERY

# Transport events.
PN_TRANSPORT = _EventType.TRANSPORT
PN_TRANSPORT_ERROR = _EventType.TRANSPORT_ERROR
PN_TRANSPORT_HEAD_CLOSED = _EventType.TRANSPORT_HEAD_CLOSED
PN_TRANSPORT_TAIL_CLOSED = _EventType.TRANSPORT_TAIL_CLOSED
PN_TRANSPORT_CLOSED = _EventType.TRANSPORT_CLOSED

# Selectable events.
PN_SELECTABLE_INIT = _EventType.SELECTABLE_INIT
PN_SELECTABLE_UPDATED = _EventType.SELECTABLE_UPDATED
PN_SELECTABLE_READABLE = _EventType.SELECTABLE_READABLE
PN_SELECTABLE_WRITABLE = _EventType.SELECTABLE_WRITABLE
PN_SELECTABLE_EXPIRED = _EventType.SELECTABLE_EXPIRED
PN_SELECTABLE_ERROR = _EventType.SELECTABLE_ERROR
PN_SELECTABLE_FINAL = _EventType.SELECTABLE_FINAL
| |
def pn_collector():
    """Create and return a new Java event collector."""
    collector = Proton.collector()
    return collector


def pn_connection_collect(conn, coll):
    """Register the collector with the Java connection."""
    impl = conn.impl
    impl.collect(coll)
| |
class pn_event:
    """Thin holder for a Java event object."""

    def __init__(self, impl):
        self.impl = impl

    def copy(self):
        """Return a new pn_event around a copy of the Java event."""
        duplicate = self.impl.copy()
        return pn_event(duplicate)
| |
def pn_collector_peek(coll):
    """Return a pn_event copy of the collector's head event, or None."""
    head = coll.peek()
    if not head:
        return None
    return pn_event(head.copy())


def pn_collector_pop(coll):
    """Call pop() on the collector."""
    coll.pop()


def pn_collector_free(coll):
    """No-op: this function does nothing to the collector."""
    return None
| |
def pn_event_reactor(event):
    """Return the reactor reported by the Java event."""
    impl = event.impl
    return impl.getReactor()


def pn_event_connection(event):
    """Return the wrapper for the event's connection."""
    connection = event.impl.getConnection()
    return wrap(connection, pn_connection_wrapper)


def pn_event_session(event):
    """Return the wrapper for the event's session."""
    session = event.impl.getSession()
    return wrap(session, pn_session_wrapper)


def pn_event_link(event):
    """Return the wrapper for the event's link."""
    link = event.impl.getLink()
    return wrap(link, pn_link_wrapper)


def pn_event_delivery(event):
    """Return the wrapper for the event's delivery."""
    delivery = event.impl.getDelivery()
    return wrap(delivery, pn_delivery_wrapper)


def pn_event_transport(event):
    """Return the wrapper for the event's transport."""
    transport = event.impl.getTransport()
    return wrap(transport, pn_transport_wrapper)
| |
| from org.apache.qpid.proton.engine.impl import ConnectionImpl, SessionImpl, \ |
| SenderImpl, ReceiverImpl, DeliveryImpl, TransportImpl |
| from org.apache.qpid.proton.reactor.impl import TaskImpl, SelectableImpl |
| |
# Java implementation class -> C-style class name used to pick a wrapper.
J2C = dict([
    (ConnectionImpl, "pn_connection"),
    (SessionImpl, "pn_session"),
    (SenderImpl, "pn_link"),
    (ReceiverImpl, "pn_link"),
    (DeliveryImpl, "pn_delivery"),
    (TransportImpl, "pn_transport"),
    (TaskImpl, "pn_task"),
    (SelectableImpl, "pn_selectable"),
])


def _identity(obj):
    """Return the object unchanged (for classes with no wrapper type)."""
    return obj


# C-style class name -> function turning a Java object into the value
# handed back to callers.
wrappers = {
    "pn_connection": lambda obj: wrap(obj, pn_connection_wrapper),
    "pn_session": lambda obj: wrap(obj, pn_session_wrapper),
    "pn_link": lambda obj: wrap(obj, pn_link_wrapper),
    "pn_delivery": lambda obj: wrap(obj, pn_delivery_wrapper),
    "pn_transport": lambda obj: wrap(obj, pn_transport_wrapper),
    "pn_task": _identity,
    "pn_selectable": _identity,
    "pn_void": _identity,
}
| |
def pn_event_class(event):
    """Return the C-style class name for the event's context object.

    Falls back to ``"pn_void"`` for unknown Java classes and also for
    contexts that expose no ``getClass`` method (including ``None``),
    which previously raised AttributeError.
    """
    ctx = event.impl.getContext()
    get_class = getattr(ctx, "getClass", None)
    if get_class is None:
        return "pn_void"
    return J2C.get(get_class(), "pn_void")
| |
def pn_event_context(event):
    """Return the event's context, wrapped according to its class name."""
    make = wrappers[pn_event_class(event)]
    return make(event.impl.getContext())


def pn_event_type(event):
    """Return the event's type."""
    impl = event.impl
    return impl.getEventType()


def pn_event_root(event):
    """Return the event's root handler."""
    impl = event.impl
    return impl.getRootHandler()


def pn_event_type_name(etype):
    """Return the string form of an event type."""
    name = str(etype)
    return name


def pn_event_category(event):
    """Return the event's category."""
    impl = event.impl
    return impl.getCategory()


def pn_event_attachments(event):
    """Return the attachments record of the Java event."""
    impl = event.impl
    return impl.attachments()


def pn_event_copy(event):
    """Return a copy of the event via its own copy method."""
    duplicate = event.copy()
    return duplicate
| |
class TypeExtender:
    """Factory for custom event type objects."""

    def __init__(self, number):
        """Accept ``number`` for interface compatibility; it is not used."""

    def next(self):
        """Return a new instance of a fresh EventType subclass."""
        class CustomEvent(EventType):
            def isValid(self):
                return True

        return CustomEvent()