QPID-3767: add multilink federation tests

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3767@1333954 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 8952f5d..09eebc5 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -768,6 +768,35 @@
         fetch(cluster[2])
 
 
+    def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30):
+        """ Prove that traffic can pass between two federated brokers: keep sending to
+        src until a message is fetched from dst. False after ~timeout failed fetches."""
+        tot_time = 0
+        active = False
+        send_conn = src_broker.connect()
+        sender = send_conn.session().sender(src)
+        receive_conn = dst_broker.connect()
+        receive_session = receive_conn.session()
+        receiver = receive_session.receiver(dst)
+        while not active and tot_time < timeout:
+            sender.send(Message("Hello from Source!"))
+            try:
+                receiver.fetch(timeout = 1)
+                receive_session.acknowledge()
+                # Get this far without Empty exception, and the link is good!
+                active = True
+                while True:
+                    # Drain the backlog: one message was sent per failed attempt
+                    receiver.fetch(timeout = 1)
+                    receive_session.acknowledge()
+            except Empty:
+                if not active:
+                    tot_time += 1
+        # Close the connections (and with them their sessions and links) so none leak.
+        receive_conn.close()
+        send_conn.close()
+        return active
+
     def test_federation_failover(self):
         """
         Verify that federation operates across failures occuring in a cluster.
@@ -778,38 +807,6 @@
         cluster to newly-added members
         """
 
-        TIMEOUT = 30
-        def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT):
-            """ Prove that traffic can pass from source fed broker to
-            destination fed broker
-            """
-            tot_time = 0
-            active = False
-            send_session = src_broker.connect().session()
-            sender = send_session.sender(src)
-            receive_session = dst_broker.connect().session()
-            receiver = receive_session.receiver(dst)
-            while not active and tot_time < timeout:
-                sender.send(Message("Hello from Source!"))
-                try:
-                    receiver.fetch(timeout = 1)
-                    receive_session.acknowledge()
-                    # Get this far without Empty exception, and the link is good!
-                    active = True
-                    while True:
-                        # Keep receiving msgs, as several may have accumulated
-                        receiver.fetch(timeout = 1)
-                        receive_session.acknowledge()
-                except Empty:
-                    if not active:
-                        tot_time += 1
-            receiver.close()
-            receive_session.close()
-            sender.close()
-            send_session.close()
-            self.assertTrue(active, "Bridge failed to become active")
-
-
         # 2 node cluster source, 2 node cluster destination
         src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
         src_cluster.ready();
@@ -848,43 +845,145 @@
         self.assertEqual(result.status, 0, result)
 
         # check that traffic passes
-        verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
+        assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
 
         # add src[2] broker to source cluster
         src_cluster.start(expect=EXPECT_EXIT_FAIL);
         src_cluster.ready();
-        verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+        assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
 
         # Kill src[0]. dst[0] should fail over to src[1]
         src_cluster[0].kill()
         for b in src_cluster[1:]: b.ready()
-        verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
+        assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
 
         # Kill src[1], dst[0] should fail over to src[2]
         src_cluster[1].kill()
         for b in src_cluster[2:]: b.ready()
-        verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+        assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
 
         # Kill dest[0], force failover to dest[1]
         dst_cluster[0].kill()
         for b in dst_cluster[1:]: b.ready()
-        verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+        assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
 
         # Add dest[2]
         # dest[1] syncs dest[2] to current remote state
         dst_cluster.start(expect=EXPECT_EXIT_FAIL);
         for b in dst_cluster[1:]: b.ready()
-        verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+        assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
 
         # Kill dest[1], force failover to dest[2]
         dst_cluster[1].kill()
         for b in dst_cluster[2:]: b.ready()
-        verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
+        assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
 
         for i in range(2, len(src_cluster)): src_cluster[i].kill()
         for i in range(2, len(dst_cluster)): dst_cluster[i].kill()
 
 
+    def test_federation_multilink_failover(self):
+        """
+        Verify that multi-link federation operates across failures occurring in
+        a cluster.
+        """
+
+        # 1 node cluster source, 1 node cluster destination
+        src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+        src_cluster.ready()
+        dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+        dst_cluster.ready()
+
+        # federate a direct binding across two separate links
+
+        # first, create a direct exchange on both brokers; on the destination,
+        # bind it to two queues using different binding keys
+        cmd = self.popen(["qpid-config",
+                          "--broker", src_cluster[0].host_port(),
+                          "add", "exchange", "direct", "FedX"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "add", "exchange", "direct", "FedX"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "add", "queue", "destQ1"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "bind", "FedX", "destQ1", "one"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "add", "queue", "destQ2"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "bind", "FedX", "destQ2", "two"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        # Create two separate links from the dst broker to the source broker,
+        # and give each link a bridge for a different binding key
+        dst_cluster[0].startQmf()
+        dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
+
+        for _l in [("link1", "bridge1", "one"),
+                   ("link2", "bridge2", "two")]:
+            result = dst_broker.create("link", _l[0],
+                                       {"host":src_cluster[0].host(),
+                                        "port":src_cluster[0].port()},
+                                       False)
+            self.assertEqual(result.status, 0, result)
+            result = dst_broker.create("bridge", _l[1],
+                                       {"link":_l[0],
+                                        "src":"FedX",
+                                        "dest":"FedX",
+                                        "key":_l[2]}, False)
+            self.assertEqual(result.status, 0, result)
+
+        # check that traffic passes over both bridges
+        assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+        assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+        # add a new member to each cluster, verify traffic still passes
+        src_cluster.start(expect=EXPECT_EXIT_FAIL)
+        src_cluster.ready()
+
+        dst_cluster.start(expect=EXPECT_EXIT_FAIL)
+        dst_cluster.ready()
+
+        assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+        assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+        src_cluster[0].kill()
+        for b in src_cluster[1:]: b.ready()
+
+        assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1")
+        assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2")
+
+        dst_cluster[0].kill()
+        for b in dst_cluster[1:]: b.ready()
+
+        assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1")
+        assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2")
+
+        for b in src_cluster[1:]: b.kill()
+        for b in dst_cluster[1:]: b.kill()
+
+
+
 # Some utility code for transaction tests
 XA_RBROLLBACK = 1
 XA_RBTIMEOUT = 2
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index 5f483ad..5bcf67d 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -23,6 +23,7 @@
 from qpid.datatypes import Message
 from qpid.queue import Empty
 from qpid.util import URL
+import qpid.messaging
 from time import sleep, time
 
 
@@ -94,6 +95,11 @@
                     break
             self._brokers.append(_b)
 
+        # add a new-style messaging connection to each broker
+        for _b in self._brokers:
+            _b.connection = qpid.messaging.Connection(_b.url)
+            _b.connection.open()
+
     def _teardown_brokers(self):
         """ Un-does _setup_brokers()
         """
@@ -103,7 +109,7 @@
             if not _b.client_session.error():
                 _b.client_session.close(timeout=10)
             _b.client_conn.close(timeout=10)
-
+            _b.connection.close()
 
     def test_bridge_create_and_close(self):
         self.startQmf();
@@ -2165,3 +2171,158 @@
         self.verify_cleanup()
 
 
+    def test_multilink_direct(self):
+        """ Verify that two distinct links can be created between federated
+        brokers, and that each bridge's messages travel only over its own link.
+        """
+        self.startQmf()
+        qmf = self.qmf
+        self._setup_brokers()
+        src_broker = self._brokers[0]
+        dst_broker = self._brokers[1]
+
+        # create a direct exchange on each broker
+        for _b in [src_broker, dst_broker]:
+            _b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
+            self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type,
+                             "direct", "exchange_declare failed!")
+
+        # create destination queues, one per binding key
+        for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]:
+            dst_broker.client_session.queue_declare(queue=_q[0], auto_delete=True)
+            dst_broker.client_session.exchange_bind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1])
+
+        # create two links to the source broker, one for high priority traffic
+        for _q in ["HiPri", "Traffic"]:
+            result = dst_broker.qmf_object.create("link", _q,
+                                                  {"host":src_broker.host,
+                                                   "port":src_broker.port},
+                                                  False)
+            self.assertEqual(result.status, 0)
+
+        links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link")
+        for _l in links:
+            if _l.name == "HiPri":
+                hi_link = _l
+            elif _l.name == "Traffic":
+                data_link = _l
+            else:
+                self.fail("Unexpected Link found: " + _l.name)
+
+        # now create a route for messages sent with key "high" to use the
+        # hi_link
+        result = dst_broker.qmf_object.create("bridge", "HiPriBridge",
+                                              {"link":hi_link.name,
+                                               "src":"fedX.direct",
+                                               "dest":"fedX.direct",
+                                               "key":"high"}, False)
+        self.assertEqual(result.status, 0)
+
+        # create routes for the "medium" and "low" links to use the normal
+        # data_link
+        for _b in [("MediumBridge", "medium"), ("LowBridge", "low")]:
+            result = dst_broker.qmf_object.create("bridge", _b[0],
+                                                  {"link":data_link.name,
+                                                   "src":"fedX.direct",
+                                                   "dest":"fedX.direct",
+                                                   "key":_b[1]}, False)
+            self.assertEqual(result.status, 0)
+
+        # now wait (up to 30 seconds per link) for the links to become operational
+        for _l in [hi_link, data_link]:
+            expire_time = time() + 30
+            while _l.state != "Operational" and time() < expire_time:
+                sleep(0.1)  # pause between polls rather than querying back to back
+                _l.update()
+            self.assertEqual(_l.state, "Operational", "Link failed to become operational")
+
+        # verify each link uses a different connection
+        self.assertNotEqual(hi_link.connectionRef, data_link.connectionRef,
+                            "Different links using the same connection")
+
+        hi_conn = qmf.getObjects(_broker=dst_broker.qmf_broker,
+                                 _objectId=hi_link.connectionRef)[0]
+        data_conn = qmf.getObjects(_broker=dst_broker.qmf_broker,
+                                   _objectId=data_link.connectionRef)[0]
+
+
+        # send hi data, verify only goes over hi link
+
+        r_ssn = dst_broker.connection.session()
+        hi_receiver = r_ssn.receiver("HiQ")
+        med_receiver = r_ssn.receiver("MedQ")
+        low_receiver = r_ssn.receiver("LoQ")
+
+        for _c in [hi_conn, data_conn]:
+            _c.update()
+            self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received")
+
+        s_ssn = src_broker.connection.session()
+        hi_sender = s_ssn.sender("fedX.direct/high")
+        med_sender = s_ssn.sender("fedX.direct/medium")
+        low_sender = s_ssn.sender("fedX.direct/low")
+
+        try:
+            hi_sender.send(qpid.messaging.Message(content="hi priority"))
+            msg = hi_receiver.fetch(timeout=10)
+            r_ssn.acknowledge()
+        except Exception:  # send/fetch trouble only; content is checked below
+            self.fail("Hi Pri message failure")
+        self.assertEqual(msg.content, "hi priority")
+
+        hi_conn.update()
+        data_conn.update()
+        self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+        self.assertEqual(data_conn.msgsToClient, 0, "Expected 0 data messages")
+
+        # send low and medium, verify it does not go over hi link
+
+        try:
+            med_sender.send(qpid.messaging.Message(content="medium priority"))
+            msg = med_receiver.fetch(timeout=10)
+            r_ssn.acknowledge()
+        except Exception:  # send/fetch trouble only; content is checked below
+            self.fail("Medium Pri message failure")
+        self.assertEqual(msg.content, "medium priority")
+
+        hi_conn.update()
+        data_conn.update()
+        self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+        self.assertEqual(data_conn.msgsToClient, 1, "Expected 1 data message")
+
+        try:
+            low_sender.send(qpid.messaging.Message(content="low priority"))
+            msg = low_receiver.fetch(timeout=10)
+            r_ssn.acknowledge()
+        except Exception:  # send/fetch trouble only; content is checked below
+            self.fail("Low Pri message failure")
+        self.assertEqual(msg.content, "low priority")
+
+        hi_conn.update()
+        data_conn.update()
+        self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+        self.assertEqual(data_conn.msgsToClient, 2, "Expected 2 data message")
+
+        # cleanup: close bridges before links, then remove queues and exchanges
+
+        for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"):
+            result = _b.close()
+            self.assertEqual(result.status, 0)
+
+        for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"):
+            result = _l.close()
+            self.assertEqual(result.status, 0)
+
+        for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]:
+            dst_broker.client_session.exchange_unbind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1])
+            dst_broker.client_session.queue_delete(queue=_q[0])
+
+        for _b in [src_broker, dst_broker]:
+            _b.client_session.exchange_delete(exchange="fedX.direct")
+
+        self._teardown_brokers()
+
+        self.verify_cleanup()
+
+
+