QPID-5799: provide notification callback for received messages.
diff --git a/qpid/messaging/driver.py b/qpid/messaging/driver.py
index e7d564f..7c30e5d 100644
--- a/qpid/messaging/driver.py
+++ b/qpid/messaging/driver.py
@@ -1368,7 +1368,8 @@
assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
rcv.received += 1
log.debug("RCVD[%s]: %s", ssn.log_id, msg)
- ssn.incoming.append(msg)
+ ssn.message_received(msg)
+
def _decode(self, xfr):
dp = EMPTY_DP
diff --git a/qpid/messaging/endpoints.py b/qpid/messaging/endpoints.py
index 7d353e1..6d58b4a 100644
--- a/qpid/messaging/endpoints.py
+++ b/qpid/messaging/endpoints.py
@@ -569,6 +569,7 @@
self.closed = False
self._lock = connection._lock
+ self._msg_received = None
def __repr__(self):
return "<Session %s>" % self.name
@@ -600,6 +601,11 @@
if self.closed:
raise SessionClosed()
+ def message_received(self, msg):
+ self.incoming.append(msg)
+ if self._msg_received:
+ self._msg_received()
+
@synchronized
def sender(self, target, **options):
"""
@@ -685,6 +691,18 @@
return None
@synchronized
+ def set_message_received_handler(self, handler):
+ """Register a callback that will be invoked when a message arrives on the
+ session. Use with caution: since this callback is invoked in the context
+ of the driver thread, it is not safe to call any of the public messaging
+ APIs from within this callback. The intent of the handler is to provide
+ an efficient way to notify the application that a message has arrived.
+ This can be useful for those applications that need to schedule a task
+ to poll for received messages without blocking in the messaging API.
+ """
+ self._msg_received = handler
+
+ @synchronized
def next_receiver(self, timeout=None):
if self._ecwait(lambda: self.incoming, timeout):
return self.incoming[0]._receiver
diff --git a/qpid/tests/messaging/endpoints.py b/qpid/tests/messaging/endpoints.py
index 247d6e9..5672237 100644
--- a/qpid/tests/messaging/endpoints.py
+++ b/qpid/tests/messaging/endpoints.py
@@ -660,6 +660,31 @@
except Detached:
pass
+ def testRxCallback(self):
+ """Verify that the callback is invoked when a message is received.
+ """
+ ADDR = 'test-rx_callback-queue; {create: always, delete: receiver}'
+ class CallbackHandler:
+ def __init__(self):
+ self.handler_called = False
+ def __call__(self):
+ self.handler_called = True
+ cb = CallbackHandler()
+ self.ssn.set_message_received_handler(cb)
+ rcv = self.ssn.receiver(ADDR)
+ rcv.capacity = UNLIMITED
+ snd = self.ssn.sender(ADDR)
+ assert not cb.handler_called
+ snd.send("Ping")
+ deadline = time.time() + self.timeout()
+ while time.time() < deadline:
+ if cb.handler_called:
+ break;
+ assert cb.handler_called
+ snd.close()
+ rcv.close()
+
+
RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
class ReceiverTests(Base):