blob: 7837d64a443a4cb1a920ea82b027505ca5d024f3 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import heapq, os, Queue, re, socket, time, types
from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
from select import select
class FlowController(Handler):
"""
A handler that controls a configured credit window for associated
receivers.
"""
def __init__(self, window=1):
self.window = window
def top_up(self, link):
delta = self.window - link.credit
link.flow(delta)
def on_link_local_open(self, event):
if event.link.is_receiver:
self.top_up(event.link)
def on_link_remote_open(self, event):
if event.link.is_receiver:
self.top_up(event.link)
def on_link_flow(self, event):
if event.link.is_receiver:
self.top_up(event.link)
def on_delivery(self, event):
if not event.delivery.released and event.delivery.link.is_receiver:
self.top_up(event.delivery.link)
def nested_handlers(handlers):
# currently only allows for a single level of nesting
nested = []
for h in handlers:
nested.append(h)
if hasattr(h, 'handlers'):
nested.extend(getattr(h, 'handlers'))
return nested
def add_nested_handler(handler, nested):
if hasattr(handler, 'handlers'):
getattr(handler, 'handlers').append(nested)
else:
handler.handlers = [nested]
class ScopedHandler(Handler):
"""
An internal handler that checks for handlers scoped to the engine
objects an event relates to. E.g it allows delivery, link, session
or connection scoped handlers that will only be called with events
for the object to which they are scoped.
"""
scopes = {
"pn_connection": ["connection"],
"pn_session": ["session", "connection"],
"pn_link": ["link", "session", "connection"],
"pn_delivery": ["delivery", "link", "session", "connection"]
}
def on_unhandled(self, method, args):
event = args[0]
if event.type in [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]:
return
objects = [getattr(event, attr) for attr in self.scopes.get(event.clazz, [])]
targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
handlers = [getattr(t, event.type.method) for t in nested_handlers(targets) if hasattr(t, event.type.method)]
for h in handlers:
h(event)
class OutgoingMessageHandler(Handler):
"""
A utility for simpler and more intuitive handling of delivery
events related to outgoing i.e. sent messages.
"""
def __init__(self, auto_settle=True, delegate=None):
self.auto_settle = auto_settle
self.delegate = delegate
def on_link_flow(self, event):
if event.link.is_sender and event.link.credit:
self.on_credit(event)
def on_delivery(self, event):
dlv = event.delivery
if dlv.released: return
if dlv.link.is_sender and dlv.updated:
if dlv.remote_state == Delivery.ACCEPTED:
self.on_accepted(event)
elif dlv.remote_state == Delivery.REJECTED:
self.on_rejected(event)
elif dlv.remote_state == Delivery.RELEASED:
self.on_released(event)
elif dlv.remote_state == Delivery.MODIFIED:
self.on_modified(event)
if dlv.settled:
self.on_settled(event)
if self.auto_settle:
dlv.settle()
def on_credit(self, event):
if self.delegate:
dispatch(self.delegate, 'on_credit', event)
def on_accepted(self, event):
if self.delegate:
dispatch(self.delegate, 'on_accepted', event)
def on_rejected(self, event):
if self.delegate:
dispatch(self.delegate, 'on_rejected', event)
def on_released(self, event):
if self.delegate:
dispatch(self.delegate, 'on_released', event)
def on_modified(self, event):
if self.delegate:
dispatch(self.delegate, 'on_modified', event)
def on_settled(self, event):
if self.delegate:
dispatch(self.delegate, 'on_settled', event)
def recv_msg(delivery):
msg = Message()
msg.decode(delivery.link.recv(delivery.pending))
delivery.link.advance()
return msg
class Reject(ProtonException):
"""
An exception that indicate a message should be rejected
"""
pass
class Acking(object):
def accept(self, delivery):
self.settle(delivery, Delivery.ACCEPTED)
def reject(self, delivery):
self.settle(delivery, Delivery.REJECTED)
def release(self, delivery, delivered=True):
if delivered:
self.settle(delivery, Delivery.MODIFIED)
else:
self.settle(delivery, Delivery.RELEASED)
def settle(self, delivery, state=None):
if state:
delivery.update(state)
delivery.settle()
class IncomingMessageHandler(Handler, Acking):
"""
A utility for simpler and more intuitive handling of delivery
events related to incoming i.e. received messages.
"""
def __init__(self, auto_accept=True, delegate=None):
self.delegate = delegate
self.auto_accept = auto_accept
def on_delivery(self, event):
dlv = event.delivery
if dlv.released or not dlv.link.is_receiver: return
if dlv.readable and not dlv.partial:
event.message = recv_msg(dlv)
try:
self.on_message(event)
if self.auto_accept:
dlv.update(Delivery.ACCEPTED)
dlv.settle()
except Reject:
dlv.update(Delivery.REJECTED)
dlv.settle()
elif dlv.updated and dlv.settled:
self.on_settled(event)
def on_message(self, event):
if self.delegate:
dispatch(self.delegate, 'on_message', event)
def on_settled(self, event):
if self.delegate:
dispatch(self.delegate, 'on_settled', event)
class EndpointStateHandler(Handler):
"""
A utility that exposes 'endpoint' events i.e. the open/close for
links, sessions and connections in a more intuitive manner. A
XXX_opened method will be called when both local and remote peers
have opened the link, session or connection. This can be used to
confirm a locally initiated action for example. A XXX_opening
method will be called when the remote peer has requested an open
that was not initiated locally. By default this will simply open
locally, which then triggers the XXX_opened call. The same applies
to close.
"""
def __init__(self, peer_close_is_error=False, delegate=None):
self.delegate = delegate
self.peer_close_is_error = peer_close_is_error
def is_local_open(self, endpoint):
return endpoint.state & Endpoint.LOCAL_ACTIVE
def is_local_uninitialised(self, endpoint):
return endpoint.state & Endpoint.LOCAL_UNINIT
def is_local_closed(self, endpoint):
return endpoint.state & Endpoint.LOCAL_CLOSED
def is_remote_open(self, endpoint):
return endpoint.state & Endpoint.REMOTE_ACTIVE
def is_remote_closed(self, endpoint):
return endpoint.state & Endpoint.REMOTE_CLOSED
def print_error(self, endpoint, endpoint_type):
if endpoint.remote_condition:
print endpoint.remote_condition.description
elif self.is_local_open(endpoint) and self.is_remote_closed(endpoint):
print "%s closed by peer" % endpoint_type
def on_link_remote_close(self, event):
if event.link.remote_condition:
self.on_link_error(event)
elif self.is_local_closed(event.link):
self.on_link_closed(event)
else:
self.on_link_closing(event)
event.link.close()
def on_session_remote_close(self, event):
if event.session.remote_condition:
self.on_session_error(event)
elif self.is_local_closed(event.session):
self.on_session_closed(event)
else:
self.on_session_closing(event)
event.session.close()
def on_connection_remote_close(self, event):
if event.connection.remote_condition:
self.on_connection_error(event)
elif self.is_local_closed(event.connection):
self.on_connection_closed(event)
else:
self.on_connection_closing(event)
event.connection.close()
def on_connection_local_open(self, event):
if self.is_remote_open(event.connection):
self.on_connection_opened(event)
def on_connection_remote_open(self, event):
if self.is_local_open(event.connection):
self.on_connection_opened(event)
elif self.is_local_uninitialised(event.connection):
self.on_connection_opening(event)
event.connection.open()
def on_session_local_open(self, event):
if self.is_remote_open(event.session):
self.on_session_opened(event)
def on_session_remote_open(self, event):
if self.is_local_open(event.session):
self.on_session_opened(event)
elif self.is_local_uninitialised(event.session):
self.on_session_opening(event)
event.session.open()
def on_link_local_open(self, event):
if self.is_remote_open(event.link):
self.on_link_opened(event)
def on_link_remote_open(self, event):
if self.is_local_open(event.link):
self.on_link_opened(event)
elif self.is_local_uninitialised(event.link):
self.on_link_opening(event)
event.link.open()
def on_connection_opened(self, event):
if self.delegate:
dispatch(self.delegate, 'on_connection_opened', event)
def on_session_opened(self, event):
if self.delegate:
dispatch(self.delegate, 'on_session_opened', event)
def on_link_opened(self, event):
if self.delegate:
dispatch(self.delegate, 'on_link_opened', event)
def on_connection_opening(self, event):
if self.delegate:
dispatch(self.delegate, 'on_connection_opening', event)
def on_session_opening(self, event):
if self.delegate:
dispatch(self.delegate, 'on_session_opening', event)
def on_link_opening(self, event):
if self.delegate:
dispatch(self.delegate, 'on_link_opening', event)
def on_connection_error(self, event):
if self.delegate:
dispatch(self.delegate, 'on_connection_error', event)
else:
self.print_error(event.connection, "connection")
def on_session_error(self, event):
if self.delegate:
dispatch(self.delegate, 'on_session_error', event)
else:
self.print_error(event.session, "session")
event.connection.close()
def on_link_error(self, event):
if self.delegate:
dispatch(self.delegate, 'on_link_error', event)
else:
self.print_error(event.link, "link")
event.connection.close()
def on_connection_closed(self, event):
if self.delegate:
dispatch(self.delegate, 'on_connection_closed', event)
def on_session_closed(self, event):
if self.delegate:
dispatch(self.delegate, 'on_session_closed', event)
def on_link_closed(self, event):
if self.delegate:
dispatch(self.delegate, 'on_link_closed', event)
def on_connection_closing(self, event):
if self.delegate:
dispatch(self.delegate, 'on_connection_closing', event)
elif self.peer_close_is_error:
self.on_connection_error(event)
def on_session_closing(self, event):
if self.delegate:
dispatch(self.delegate, 'on_session_closing', event)
elif self.peer_close_is_error:
self.on_session_error(event)
def on_link_closing(self, event):
if self.delegate:
dispatch(self.delegate, 'on_link_closing', event)
elif self.peer_close_is_error:
self.on_link_error(event)
class MessagingHandler(Handler, Acking):
"""
A general purpose handler that makes the proton-c events somewhat
simpler to deal with and.or avoids repetitive tasks for common use
cases.
"""
def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
self.handlers = []
# FlowController if used needs to see event before
# IncomingMessageHandler, as the latter may involve the
# delivery being released
if prefetch:
self.handlers.append(FlowController(prefetch))
self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
self.handlers.append(IncomingMessageHandler(auto_accept, self))
self.handlers.append(OutgoingMessageHandler(auto_settle, self))
class TransactionalAcking(object):
def accept(self, delivery, transaction):
transaction.accept(delivery)
class TransactionHandler(OutgoingMessageHandler, TransactionalAcking):
def __init__(self, auto_settle=True, delegate=None):
super(TransactionHandler, self).__init__(auto_settle, delegate)
def on_settled(self, event):
if hasattr(event.delivery, "transaction"):
event.transaction = event.delivery.transaction
event.delivery.transaction.handle_outcome(event)
def on_transaction_declared(self, event):
if self.delegate:
dispatch(self.delegate, 'on_transaction_declared', event)
def on_transaction_committed(self, event):
if self.delegate:
dispatch(self.delegate, 'on_transaction_committed', event)
def on_transaction_aborted(self, event):
if self.delegate:
dispatch(self.delegate, 'on_transaction_aborted', event)
def on_transaction_declare_failed(self, event):
if self.delegate:
dispatch(self.delegate, 'on_transaction_declare_failed', event)
def on_transaction_commit_failed(self, event):
if self.delegate:
dispatch(self.delegate, 'on_transaction_commit_failed', event)
class TransactionalClientHandler(Handler, TransactionalAcking):
def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
super(TransactionalClientHandler, self).__init__()
self.handlers = []
# FlowController if used needs to see event before
# IncomingMessageHandler, as the latter may involve the
# delivery being released
if prefetch:
self.handlers.append(FlowController(prefetch))
self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
self.handlers.append(IncomingMessageHandler(auto_accept, self))
self.handlers.append(TransactionHandler(auto_settle, self))