| #!/usr/bin/env python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import sys |
| from qpid.testlib import TestBase010 |
| from qpid.datatypes import Message |
| from qpid.queue import Empty |
| from time import sleep |
| |
| class HeadersFederationTests(TestBase010): |
| |
| def remote_host(self): |
| return self.defines.get("remote-host", "localhost") |
| |
| def remote_port(self): |
| return int(self.defines["remote-port"]) |
| |
| def verify_cleanup(self): |
| attempts = 0 |
| total = len(self.qmf.getObjects(_class="bridge")) + len(self.qmf.getObjects(_class="link")) |
| while total > 0: |
| attempts += 1 |
| if attempts >= 10: |
| self.fail("Bridges and links didn't clean up") |
| return |
| sleep(1) |
| total = len(self.qmf.getObjects(_class="bridge")) + len(self.qmf.getObjects(_class="link")) |
| |
| def test_dynamic_headers_unbind(self): |
| session = self.session |
| r_conn = self.connect(host=self.remote_host(), port=self.remote_port()) |
| r_session = r_conn.session("test_dynamic_headers_unbind") |
| |
| session.exchange_declare(exchange="fed.headers_unbind", type="headers") |
| r_session.exchange_declare(exchange="fed.headers_unbind", type="headers") |
| |
| self.startQmf() |
| qmf = self.qmf |
| |
| broker = qmf.getObjects(_class="broker")[0] |
| result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") |
| self.assertEqual(result.status, 0) |
| |
| link = qmf.getObjects(_class="link")[0] |
| result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind", "", "", "", False, False, True, 0) |
| self.assertEqual(result.status, 0) |
| bridge = qmf.getObjects(_class="bridge")[0] |
| sleep(5) |
| |
| session.queue_declare(queue="fed1", exclusive=True, auto_delete=True) |
| queue = qmf.getObjects(_class="queue", name="fed1")[0] |
| queue.update() |
| self.assertEqual(queue.bindingCount, 1, |
| "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) |
| |
| session.exchange_bind(queue="fed1", exchange="fed.headers_unbind", binding_key="key1", arguments={'x-match':'any', 'class':'first'}) |
| queue.update() |
| self.assertEqual(queue.bindingCount, 2, |
| "bindings not accounted for (expected 2, got %d)" % queue.bindingCount) |
| |
| session.exchange_unbind(queue="fed1", exchange="fed.headers_unbind", binding_key="key1") |
| queue.update() |
| self.assertEqual(queue.bindingCount, 1, |
| "bindings not accounted for (expected 1, got %d)" % queue.bindingCount) |
| |
| result = bridge.close() |
| self.assertEqual(result.status, 0) |
| result = link.close() |
| self.assertEqual(result.status, 0) |
| |
| self.verify_cleanup() |
| |
| def getProperty(self, msg, name): |
| for h in msg.headers: |
| if hasattr(h, name): return getattr(h, name) |
| return None |
| |
| def getAppHeader(self, msg, name): |
| headers = self.getProperty(msg, "application_headers") |
| if headers: |
| return headers[name] |
| return None |