blob: 3264d22c84300de556110c7326d7b70d54e8d2a4 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Runs QMF tests using the qmf.client API.
import unittest, qmf.client, os
class QmfClientTest(unittest.TestCase):
    """
    Test QMFv2 broker management using the qmf.client API.

    Each test drives a broker through a BrokerAgent that setUp() opens
    against the address handed over by configure().
    """

    def configure(self, config):
        """Called by the qpid-python-test framework with broker config"""
        self.broker = config.broker

    def setUp(self):
        """Connect a BrokerAgent to the configured broker for this test."""
        self.agent = qmf.client.BrokerAgent.connect(self.broker)

    def test_broker(self):
        """The broker object reports the name "amqp-broker"."""
        self.assertEqual(self.agent.getBroker().name, "amqp-broker")

    def test_connections(self):
        """The broker lists at least one connection."""
        connections = self.agent.getAllConnections()
        self.assertTrue(len(connections) > 0)

    def test_queues(self):
        """Queues can be added, looked up, listed and deleted."""
        qnames = ["qq%s" % i for i in range(10)]
        for q in qnames:
            self.agent.addQueue(q)
            self.assertEqual(q, self.agent.getQueue(q).name)
        queues = self.agent.getAllQueues()
        # assertLess on sets is a *proper* subset check: the broker must
        # list every queue created above plus at least one other queue.
        self.assertLess(set(qnames), set(q.name for q in queues))
        self.agent.delQueue("qq0")
        self.assertIs(None, self.agent.getQueue("qq0"))
        # Deleting an unknown queue may raise or not; either outcome is
        # accepted.  Catch Exception rather than everything so that
        # KeyboardInterrupt/SystemExit still reach the test runner.
        try:
            self.agent.delQueue("nosuch")
        except Exception:
            pass

    def test_exchanges(self):
        """Exchanges can be added, looked up, listed and deleted."""
        enames = ["ee%s" % i for i in range(10)]
        for e in enames:
            self.agent.addExchange('fanout', e)
            self.assertEqual(e, self.agent.getExchange(e).name)
        exchanges = self.agent.getAllExchanges()
        # Proper subset: the broker must also list at least one exchange
        # that this test did not create.
        self.assertLess(set(enames), set(e.name for e in exchanges))
        self.agent.delExchange("ee0")
        self.assertIs(None, self.agent.getExchange("ee0"))
        # Same best-effort handling as for queues: tolerate a failure but
        # do not swallow KeyboardInterrupt/SystemExit.
        try:
            self.agent.delExchange("nosuch")
        except Exception:
            pass

    def test_bind(self):
        """A binding shows up after bind() and is gone after unbind()."""
        self.agent.addQueue('qq')
        self.agent.addExchange('direct', 'ex')
        self.agent.bind('ex', 'qq', 'kk')
        self.assertTrue([b for b in self.agent.getAllBindings() if b.bindingKey == 'kk'])
        self.agent.unbind('ex', 'qq', 'kk')
        self.assertFalse([b for b in self.agent.getAllBindings() if b.bindingKey == 'kk'])

    def test_fork(self):
        """Ensure that the client is fork-safe."""
        self.agent.addQueue('parent')
        pid = os.fork()
        if pid:  # parent
            # The child must have terminated with exit status 0.
            self.assertEqual((pid, 0), os.waitpid(pid, 0))
            # The child deleted 'parent' and created 'child'; check both
            # effects through the parent's own connection.
            # NOTE(review): presumably addQueue would not return None if
            # 'parent' still existed -- confirm against qmf.client.
            self.assertIs(None, self.agent.addQueue('parent'))
            self.assertEqual('child', self.agent.getQueue('child').name)
        else:  # child
            # Always leave through os._exit(): if anything below raised and
            # the exception escaped, the forked copy of the test runner
            # would keep running.  A failure is reported to the parent as a
            # non-zero exit status instead.
            status = 1
            try:
                # Can't use the parent's connection.
                agent = qmf.client.BrokerAgent.connect(self.broker)
                agent.delQueue('parent')
                agent.addQueue('child')
                status = 0
            finally:
                os._exit(status)  # Force exit, test framework will catch SystemExit
if __name__ == "__main__":
    # shutil is not brought in by the file's import line, so the rmtree call
    # below used to fail with NameError; import it where it is needed.
    import shutil

    # Remove the "brokertest.tmp" directory if present; ignore_errors=True
    # makes a missing directory a no-op.
    # NOTE(review): presumably a scratch directory left by an earlier run.
    shutil.rmtree("brokertest.tmp", ignore_errors=True)
    # Replace this process with the qpid-python-test runner, passing
    # "-m qmf_client_tests" (presumably selecting this module's tests).
    os.execvp("qpid-python-test", ["qpid-python-test", "-m", "qmf_client_tests"])