QPID-7329: Merge branch 'github/pr/10' into trunk

Closes #10

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1750587 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 625c6cc..858e874 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -345,6 +345,19 @@
     }
 }
 
+void Queue::mergeMessageAnnotations(const QueueCursor& position,
+                                    const qpid::types::Variant::Map& messageAnnotations)
+{
+  Mutex::ScopedLock locker(messageLock);
+  Message *message = messages->find(position);
+  if (!message) return;
+
+  qpid::types::Variant::Map::const_iterator it;
+  for (it = messageAnnotations.begin(); it != messageAnnotations.end(); ++it) {
+    message->addAnnotation(it->first, it->second);
+  }
+}
+
 void Queue::release(const QueueCursor& position, bool markRedelivered)
 {
     QueueListeners::NotificationSet copy;
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index cf4c1a8..4b63a41 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -332,6 +332,13 @@
     QPID_BROKER_EXTERN void deliverTo(Message, TxBuffer* = 0);
   public:
     /**
+     * Merges message annotations for an in-memory message as a result of
+     * a modified disposition outcome
+     */
+    QPID_BROKER_EXTERN void mergeMessageAnnotations(const QueueCursor& msg,
+                                                    const qpid::types::Variant::Map& annotations);
+
+    /**
      * Returns a message to the in-memory queue (due to lack
      * of acknowledegement from a receiver). If a consumer is
      * available it will be dispatched immediately, else it
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 20f32a1..abd96a6 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include "qpid/broker/amqp/DataReader.h"
 #include "qpid/broker/amqp/Outgoing.h"
 #include "qpid/broker/amqp/Exception.h"
 #include "qpid/broker/amqp/Header.h"
@@ -108,6 +109,19 @@
     pn_link_send(link, data, size);
 }
 
+void OutgoingFromQueue::mergeMessageAnnotationsIfRequired(const Record &r)
+{
+    pn_data_t *remoteAnnotationsRaw =
+      pn_disposition_annotations(pn_delivery_remote(r.delivery));
+    if (remoteAnnotationsRaw == 0) {
+      return;
+    }
+
+    qpid::types::Variant::Map remoteMessageAnnotations;
+    DataReader::read(remoteAnnotationsRaw, remoteMessageAnnotations);
+    queue->mergeMessageAnnotations(r.cursor, remoteMessageAnnotations);
+}
+
 void OutgoingFromQueue::handle(pn_delivery_t* delivery)
 {
     size_t i = Record::getIndex(pn_delivery_tag(delivery));
@@ -141,7 +155,7 @@
                 break;
               case PN_MODIFIED:
                 if (preAcquires()) {
-                    //TODO: handle message-annotations
+                    mergeMessageAnnotationsIfRequired(r);
                     if (pn_disposition_is_undeliverable(pn_delivery_remote(delivery))) {
                         if (!trackingUndeliverableMessages) {
                             // observe queue for changes to track undeliverable messages
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index c56c8c0..f4ca469 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -152,6 +152,8 @@
         static size_t getIndex(pn_delivery_tag_t);
     };
 
+    void mergeMessageAnnotationsIfRequired(const Record &r);
+
     const bool exclusive;
     const bool isControllingUser;
     boost::shared_ptr<Queue> queue;