QPID-5799: provide notification callback for received messages.
diff --git a/qpid/messaging/driver.py b/qpid/messaging/driver.py
index e7d564f..7c30e5d 100644
--- a/qpid/messaging/driver.py
+++ b/qpid/messaging/driver.py
@@ -1368,7 +1368,8 @@
       assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
     rcv.received += 1
     log.debug("RCVD[%s]: %s", ssn.log_id, msg)
-    ssn.incoming.append(msg)
+    ssn.message_received(msg)
+
 
   def _decode(self, xfr):
     dp = EMPTY_DP
diff --git a/qpid/messaging/endpoints.py b/qpid/messaging/endpoints.py
index 7d353e1..6d58b4a 100644
--- a/qpid/messaging/endpoints.py
+++ b/qpid/messaging/endpoints.py
@@ -569,6 +569,7 @@
     self.closed = False
 
     self._lock = connection._lock
+    self._msg_received = None
 
   def __repr__(self):
     return "<Session %s>" % self.name
@@ -600,6 +601,11 @@
     if self.closed:
       raise SessionClosed()
 
+  def message_received(self, msg):
+      self.incoming.append(msg)
+      if self._msg_received:
+          self._msg_received()
+
   @synchronized
   def sender(self, target, **options):
     """
@@ -685,6 +691,18 @@
     return None
 
   @synchronized
+  def set_message_received_handler(self, handler):
+      """Register a callback that will be invoked when a message arrives on the
+      session.  Use with caution: since this callback is invoked in the context
+      of the driver thread, it is not safe to call any of the public messaging
+      APIs from within this callback.  The intent of the handler is to provide
+      an efficient way to notify the application that a message has arrived.
+      This can be useful for those applications that need to schedule a task
+      to poll for received messages without blocking in the messaging API.
+      """
+      self._msg_received = handler
+
+  @synchronized
   def next_receiver(self, timeout=None):
     if self._ecwait(lambda: self.incoming, timeout):
       return self.incoming[0]._receiver
diff --git a/qpid/tests/messaging/endpoints.py b/qpid/tests/messaging/endpoints.py
index 247d6e9..5672237 100644
--- a/qpid/tests/messaging/endpoints.py
+++ b/qpid/tests/messaging/endpoints.py
@@ -660,6 +660,31 @@
     except Detached:
       pass
 
+  def testRxCallback(self):
+    """Verify that the callback is invoked when a message is received.
+    """
+    ADDR = 'test-rx_callback-queue; {create: always, delete: receiver}'
+    class CallbackHandler:
+        def __init__(self):
+            self.handler_called = False
+        def __call__(self):
+            self.handler_called = True
+    cb = CallbackHandler()
+    self.ssn.set_message_received_handler(cb)
+    rcv = self.ssn.receiver(ADDR)
+    rcv.capacity = UNLIMITED
+    snd = self.ssn.sender(ADDR)
+    assert not cb.handler_called
+    snd.send("Ping")
+    deadline = time.time() + self.timeout()
+    while time.time() < deadline:
+        if cb.handler_called:
+            break;
+    assert cb.handler_called
+    snd.close()
+    rcv.close()
+
+
 RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}'
 
 class ReceiverTests(Base):