Route injected and timer events through the collector; replace ScheduledEvents with a Timer
diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py
index bc8d6da..ce3ce79 100755
--- a/examples/engine/py/db_send.py
+++ b/examples/engine/py/db_send.py
@@ -31,6 +31,7 @@
self.url = url
self.delay = 0
self.sent = 0
+ self.load_count = 0
self.records = Queue.Queue(maxsize=50)
def on_start(self, event):
@@ -39,16 +40,19 @@
self.sender = self.container.create_sender(self.url)
def on_records_loaded(self, event):
- if self.records.empty() and event.subject == self.sent:
- print "Exhausted available data, waiting to recheck..."
- # check for new data after 5 seconds
- self.container.schedule(time.time() + 5, link=self.sender, subject="data")
+ if self.records.empty():
+ if event.subject == self.load_count:
+ print "Exhausted available data, waiting to recheck..."
+ # check for new data after 5 seconds
+ self.container.schedule(time.time() + 5, link=self.sender, subject="data")
else:
self.send()
def request_records(self):
if not self.records.full():
- self.db.load(self.records, event=ApplicationEvent("records_loaded", link=self.sender, subject=self.sent))
+ print "loading records..."
+ self.load_count += 1
+ self.db.load(self.records, event=ApplicationEvent("records_loaded", link=self.sender, subject=self.load_count))
def on_credit(self, event):
self.send()
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index e375723..8d19fa3 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -3405,7 +3405,7 @@
def __del__(self):
pn_collector_free(self._impl)
-class EventType:
+class EventType(object):
TYPES = {}
@@ -3485,6 +3485,8 @@
return self.context.connection
elif self.clazz == "pn_delivery" and not self.context.released:
return self.context.link.connection
+ elif hasattr(self.context, 'connection'):
+ return self.context.connection
else:
return None
@@ -3496,6 +3498,8 @@
return self.context.session
elif self.clazz == "pn_delivery" and not self.context.released:
return self.context.link.session
+ elif hasattr(self.context, 'session'):
+ return self.context.session
else:
return None
@@ -3505,6 +3509,8 @@
return self.context
elif self.clazz == "pn_delivery" and not self.context.released:
return self.context.link
+ elif hasattr(self.context, 'link'):
+ return self.context.link
else:
return None
@@ -3528,9 +3534,17 @@
def delivery(self):
if self.clazz == "pn_delivery":
return self.context
+ elif hasattr(self.context, 'delivery'):
+ return self.context.delivery
else:
return None
+  def __getattr__(self, name):
+    """
+    Fall back to the event's context for attributes that are not
+    defined on the event itself (e.g. the subject of an application
+    event).
+
+    Raises AttributeError, naming the attribute, if the context does
+    not provide it either.
+    """
+    # __getattr__ only runs after normal lookup has failed. If 'context'
+    # itself is the missing attribute, reading self.context below would
+    # re-enter this method without end, so fail for it immediately.
+    if name != 'context' and hasattr(self.context, name):
+      return getattr(self.context, name)
+    raise AttributeError(name)
+
def __repr__(self):
return "%s(%s)" % (self.type, self.context)
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index 7837d64..71ab837 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -72,23 +72,20 @@
or connection scoped handlers that will only be called with events
for the object to which they are scoped.
"""
- scopes = {
- "pn_connection": ["connection"],
- "pn_session": ["session", "connection"],
- "pn_link": ["link", "session", "connection"],
- "pn_delivery": ["delivery", "link", "session", "connection"]
- }
+ scopes = ["delivery", "link", "session", "connection"]
def on_unhandled(self, method, args):
event = args[0]
if event.type in [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]:
return
- objects = [getattr(event, attr) for attr in self.scopes.get(event.clazz, [])]
+
+ objects = [getattr(event, attr) for attr in self.scopes if hasattr(event, attr) and getattr(event, attr)]
targets = [getattr(o, "context") for o in objects if hasattr(o, "context")]
handlers = [getattr(t, event.type.method) for t in nested_handlers(targets) if hasattr(t, event.type.method)]
for h in handlers:
h(event)
+
class OutgoingMessageHandler(Handler):
"""
A utility for simpler and more intuitive handling of delivery
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index 16a87e4..52e64b6 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -16,10 +16,12 @@
# specific language governing permissions and limitations
# under the License.
#
-import heapq, os, Queue, socket, time, types
-from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
-from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Session, Terminus, Timeout
-from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
+import os, Queue, socket, time, types
+from heapq import heappush, heappop, nsmallest
+from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
+from proton import Endpoint, Event, EventType, generate_uuid, Handler, Link, Message
+from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, symbol
+from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
from select import select
from proton.handlers import nested_handlers, ScopedHandler
@@ -152,7 +154,7 @@
def tick(self):
t = self.transport.tick(time.time())
- if t: return t - time.time()
+ if t: return t
else: return None
class AmqpAcceptor:
@@ -207,8 +209,8 @@
external thread but handled on the event thread associated with
the loop.
"""
- def __init__(self, events):
- self.events = events
+ def __init__(self, collector):
+ self.collector = collector
self.queue = Queue.Queue()
self.pipe = os.pipe()
self._closed = False
@@ -235,14 +237,64 @@
def readable(self):
os.read(self.pipe[0], 512)
while not self.queue.empty():
- self.events.dispatch(self.queue.get())
+ event = self.queue.get()
+ self.collector.put(event.context, event.type)
def removed(self): pass
def tick(self): return None
+class PQueue:
+    """
+    Priority queue of (priority, task) pairs built on heapq. The pair
+    with the smallest priority is served first; pairs with equal
+    priority are served in the order they were added.
+    """
+
+    def __init__(self):
+        self.entries = []
+        # Insertion counter stored between priority and task: it breaks
+        # ties between equal priorities so the heap never has to compare
+        # the tasks themselves.
+        self._sequence = 0
+
+    def add(self, priority, task):
+        """Queue task under the given priority."""
+        heappush(self.entries, (priority, self._sequence, task))
+        self._sequence += 1
+
+    def peek(self):
+        """Return the smallest (priority, task) pair without removing it, or None if empty."""
+        if not self.entries:
+            return None
+        # The root of a heap is its minimum, so no scan of the entries
+        # (as nsmallest would do) is needed.
+        priority, _, task = self.entries[0]
+        return (priority, task)
+
+    def pop(self):
+        """Remove and return the smallest (priority, task) pair, or None if empty."""
+        if not self.entries:
+            return None
+        priority, _, task = heappop(self.entries)
+        return (priority, task)
+
+    def __nonzero__(self):
+        """Truth value (Python 2 protocol): True while entries are queued."""
+        return bool(self.entries)
+
+class Timer:
+    """
+    Holds events until their deadline is reached and then puts them on
+    the collector.
+    """
+
+    def __init__(self, collector):
+        self.collector = collector
+        self.events = PQueue()
+
+    def schedule(self, deadline, event):
+        """Queue event to be put on the collector once deadline (compared against time.time()) is reached."""
+        self.events.add(deadline, event)
+
+    def tick(self):
+        """
+        Put every event whose deadline has been reached onto the
+        collector.
+
+        @return the deadline of the earliest event still pending, or
+        None if no events are left
+        """
+        while self.events:
+            deadline, event = self.events.peek()
+            # '<=' so that an event due exactly now fires on this tick
+            # rather than being handed back as a deadline that has
+            # already arrived.
+            if deadline <= time.time():
+                self.events.pop()
+                self.collector.put(event.context, event.type)
+            else:
+                return deadline
+        return None
+
+    @property
+    def pending(self):
+        """True while at least one scheduled event has not yet fired."""
+        return bool(self.events)
+
class Events(object):
def __init__(self, *handlers):
self.collector = Collector()
+ self.timer = Timer(self.collector)
self.handlers = handlers
def connection(self):
@@ -251,34 +303,61 @@
return conn
def process(self):
+ result = False
while True:
ev = self.collector.peek()
if ev:
self.dispatch(ev)
self.collector.pop()
+ result = True
else:
- return
+ return result
def dispatch(self, event):
for h in self.handlers:
event.dispatch(h)
@property
- def next_interval(self):
- return None
-
- @property
def empty(self):
- return self.collector.peek() == None
+ return self.collector.peek() == None and not self.timer.pending
-class ExtendedEventType(object):
+class Names(object):
+ # Assigns each distinct name a number: its position in self.names plus
+ # self.base (10000 unless another base is given).
+ def __init__(self, base=10000):
+ self.names = []
+ self.base = base
+
+ def number(self, name):
+ # Return the number for name. A name not seen before is appended to
+ # self.names first, so it receives the next free position.
+ if name not in self.names:
+ self.names.append(name)
+ return self.names.index(name) + self.base
+
+class ExtendedEventType(EventType):
"""
Event type identifier for events defined outside the proton-c
library
"""
+ # Class-level registry: supplies a number for a name when the caller
+ # passes none. Kept below the string literal above so that the literal
+ # stays the first statement in the class and remains its docstring.
+ USED = Names()
- def __init__(self, name):
+ def __init__(self, name, number=None):
+ super(ExtendedEventType, self).__init__(number or ExtendedEventType.USED.number(name), "on_%s" % name)
self.name = name
- self.method = "on_%s" % name
+
+class ApplicationEventContext(object):
+ # Context object for application events: bundles the connection,
+ # session, link, delivery and subject that an event was created with.
+ def __init__(self, connection=None, session=None, link=None, delivery=None, subject=None):
+ self.connection = connection
+ self.session = session
+ self.link = link
+ self.delivery = delivery
+ # Fill in parent objects from the more specific ones: a delivery
+ # supplies its link, a link its session, a session its connection.
+ # A value derived this way replaces one that was passed explicitly.
+ if self.delivery:
+ self.link = self.delivery.link
+ if self.link:
+ self.session = self.link.session
+ if self.session:
+ self.connection = self.session.connection
+ self.subject = subject
+
+ def __repr__(self):
+ # Comma-separated str() of every field that is not None.
+ objects = [self.connection, self.session, self.link, self.delivery, self.subject]
+ return ", ".join([str(o) for o in objects if o is not None])
class ApplicationEvent(Event):
"""
@@ -286,63 +365,13 @@
an engine object and or an arbitrary subject
"""
def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
- self.type = ExtendedEventType(typename)
- self.subject = subject
- if delivery:
- self.context = delivery
- self.clazz = "pn_delivery"
- elif link:
- self.context = link
- self.clazz = "pn_link"
- elif session:
- self.context = session
- self.clazz = "pn_session"
- elif connection:
- self.context = connection
- self.clazz = "pn_connection"
- else:
- self.context = None
- self.clazz = "none"
-
- def __repr__(self):
- objects = [self.context, self.subject]
- return "%s(%s)" % (self.type.name,
- ", ".join([str(o) for o in objects if o is not None]))
+ super(ApplicationEvent, self).__init__(PN_PYREF, ApplicationEventContext(connection, session, link, delivery, subject), ExtendedEventType(typename))
class StartEvent(ApplicationEvent):
def __init__(self, container):
super(StartEvent, self).__init__("start")
self.container = container
-class ScheduledEvents(Events):
- """
- Support for timed events
- """
- def __init__(self, *handlers):
- super(ScheduledEvents, self).__init__(*handlers)
- self._events = []
-
- def schedule(self, deadline, event):
- heapq.heappush(self._events, (deadline, event))
-
- def process(self):
- super(ScheduledEvents, self).process()
- while self._events and self._events[0][0] <= time.time():
- self.dispatch(heapq.heappop(self._events)[1])
-
- @property
- def next_interval(self):
- if len(self._events):
- deadline = self._events[0][0]
- now = time.time()
- return deadline - now if deadline > now else 0
- else:
- return None
-
- @property
- def empty(self):
- return super(ScheduledEvents, self).empty and len(self._events) == 0
-
def _min(a, b):
if a and b: return min(a, b)
elif a: return a
@@ -380,15 +409,16 @@
def do_work(self, timeout=None):
"""@return True if some work was done, False if time-out expired"""
- self.events.process()
- if self._abort: return
+ tick = self.events.timer.tick()
+ while self.events.process():
+ if self._abort: return
+ tick = self.events.timer.tick()
stable = False
while not stable:
reading = []
writing = []
closed = []
- tick = None
for s in self.selectables:
if s.reading(): reading.append(s)
if s.writing(): writing.append(s)
@@ -403,12 +433,10 @@
if self.redundant:
return
+ if tick:
+ timeout = _min(tick - time.time(), timeout)
if timeout and timeout < 0:
timeout = 0
- if self.events.next_interval and (timeout is None or self.events.next_interval < timeout):
- timeout = self.events.next_interval
- if tick:
- timeout = _min(tick, timeout)
if reading or writing or timeout:
readable, writable, _ = select(reading, writing, [], timeout)
for s in self.selectables:
@@ -683,7 +711,7 @@
def __init__(self, *handlers):
h = [Connector(self), ScopedHandler()]
h.extend(nested_handlers(handlers))
- self.events = ScheduledEvents(*h)
+ self.events = Events(*h)
self.loop = SelectLoop(self.events)
self.trigger = None
self.container_id = str(generate_uuid())
@@ -776,11 +804,11 @@
return AmqpAcceptor(self.events, self, host, port)
def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None):
- self.events.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))
+ self.events.timer.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject))
def get_event_trigger(self):
if not self.trigger or self.trigger.closed():
- self.trigger = EventInjector(self.events)
+ self.trigger = EventInjector(self.events.collector)
self.add(self.trigger)
return self.trigger