# blob: 60cff1da54579f22c7eb2908956d17ee27b4eb2e [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
from qpid.testlib import TestBase010
from qpid.datatypes import Message
from qpid.queue import Empty
from time import sleep
class HeadersFederationTests(TestBase010):
    """Federation test for a headers exchange bridged between two brokers.

    The local broker is reached through ``self.session`` and managed through
    QMF (``self.qmf``, available after ``self.startQmf()``).  The remote
    broker's address comes from the test defines ``remote-host`` and
    ``remote-port``.
    """

    def remote_host(self):
        """Return the ``remote-host`` define, or ``"localhost"`` if unset."""
        return self.defines.get("remote-host", "localhost")

    def remote_port(self):
        """Return the ``remote-port`` define converted to ``int``.

        There is no default: a missing define raises ``KeyError``.
        """
        return int(self.defines["remote-port"])

    def _federation_object_count(self):
        """Return how many QMF ``bridge`` plus ``link`` objects are reported."""
        bridges = self.qmf.getObjects(_class="bridge")
        links = self.qmf.getObjects(_class="link")
        return len(bridges) + len(links)

    def verify_cleanup(self):
        """Wait until QMF reports no bridge or link objects.

        The count is polled once per second.  If objects are still present
        on the tenth check the test is failed.
        """
        attempts = 0
        while self._federation_object_count() > 0:
            attempts += 1
            if attempts >= 10:
                # fail() raises, so nothing after it would run.
                self.fail("Bridges and links didn't clean up")
            sleep(1)

    def _assert_binding_count(self, queue, expected):
        """Refresh the QMF *queue* object and assert its ``bindingCount``."""
        queue.update()
        self.assertEqual(queue.bindingCount, expected,
                         "bindings not accounted for (expected %d, got %d)"
                         % (expected, queue.bindingCount))

    def test_dynamic_headers_unbind(self):
        """Binding then unbinding a queue restores the queue's binding count.

        Declares the ``fed.headers_unbind`` headers exchange on both brokers,
        creates a link and a bridge between them via QMF, and then checks
        that the ``bindingCount`` reported for queue ``fed1`` goes 1 -> 2 -> 1
        across an ``exchange_bind`` / ``exchange_unbind`` pair.  Finally the
        bridge and link are closed and their cleanup is verified.
        """
        session = self.session
        r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
        r_session = r_conn.session("test_dynamic_headers_unbind")

        # The same headers exchange must exist on both ends of the bridge.
        session.exchange_declare(exchange="fed.headers_unbind", type="headers")
        r_session.exchange_declare(exchange="fed.headers_unbind", type="headers")

        self.startQmf()
        qmf = self.qmf
        broker = qmf.getObjects(_class="broker")[0]
        result = broker.connect(self.remote_host(), self.remote_port(), False,
                                "PLAIN", "guest", "guest", "tcp")
        self.assertEqual(result.status, 0)

        link = qmf.getObjects(_class="link")[0]
        # NOTE(review): the positional arguments presumably follow the QMF
        # link.bridge schema (durable, src, dest, key, tag, excludes,
        # srcIsQueue, srcIsLocal, dynamic, sync) -- confirm against the
        # broker management schema.
        result = link.bridge(False, "fed.headers_unbind", "fed.headers_unbind",
                             "", "", "", False, False, True, 0)
        self.assertEqual(result.status, 0)
        bridge = qmf.getObjects(_class="bridge")[0]
        # Presumably gives the bridge time to become operational -- confirm.
        sleep(5)

        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
        queue = qmf.getObjects(_class="queue", name="fed1")[0]
        # A freshly declared queue is expected to report one binding already
        # (presumably the default-exchange binding -- confirm).
        self._assert_binding_count(queue, 1)

        session.exchange_bind(queue="fed1", exchange="fed.headers_unbind",
                              binding_key="key1",
                              arguments={'x-match': 'any', 'class': 'first'})
        self._assert_binding_count(queue, 2)

        session.exchange_unbind(queue="fed1", exchange="fed.headers_unbind",
                                binding_key="key1")
        self._assert_binding_count(queue, 1)

        result = bridge.close()
        self.assertEqual(result.status, 0)
        result = link.close()
        self.assertEqual(result.status, 0)
        self.verify_cleanup()

    def getProperty(self, msg, name):
        """Return attribute *name* from the first header of *msg* that has it.

        ``msg.headers`` is scanned in order.  ``None`` is returned when no
        header object carries the attribute.
        """
        for header in msg.headers:
            if hasattr(header, name):
                return getattr(header, name)
        return None

    def getAppHeader(self, msg, name):
        """Return application header *name* of *msg*, or ``None`` if absent.

        ``None`` is returned both when the message has no (or empty)
        ``application_headers`` and when *name* is not one of them, so
        callers can compare the result directly without guarding against
        ``KeyError``.
        """
        headers = self.getProperty(msg, "application_headers")
        if not headers:
            return None
        try:
            return headers[name]
        except KeyError:
            # A missing header is reported the same way as missing headers.
            return None