blob: cb66d079e5378face8f84d1e4d75821bf3165fac [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.content import Content
from qpid.testlib import TestBase
class QueryTests(TestBase):
"""Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
def test_exchange_query(self):
"""
Test that the exchange_query method works as expected
"""
channel = self.channel
#check returned type for the standard exchanges
self.assertEqual("direct", channel.exchange_query(name="amq.direct").type)
self.assertEqual("topic", channel.exchange_query(name="amq.topic").type)
self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type)
self.assertEqual("headers", channel.exchange_query(name="amq.match").type)
self.assertEqual("direct", channel.exchange_query(name="").type)
#declare an exchange
channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
#check that the result of a query is as expected
response = channel.exchange_query(name="my-test-exchange")
self.assertEqual("direct", response.type)
self.assertEqual(False, response.durable)
self.assertEqual(False, response.not_found)
#delete the exchange
channel.exchange_delete(exchange="my-test-exchange")
#check that the query now reports not-found
self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
def test_binding_query_direct(self):
"""
Test that the binding_query method works as expected with the direct exchange
"""
self.binding_query_with_key("amq.direct")
def test_binding_query_topic(self):
"""
Test that the binding_query method works as expected with the direct exchange
"""
self.binding_query_with_key("amq.topic")
def binding_query_with_key(self, exchange_name):
channel = self.channel
#setup: create two queues
channel.queue_declare(queue="used-queue", exclusive=True)
channel.queue_declare(queue="unused-queue", exclusive=True)
channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
# test detection of any binding to specific queue
response = channel.binding_query(exchange=exchange_name, queue="used-queue")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(False, response.queue_not_matched)
# test detection of specific binding to any queue
response = channel.binding_query(exchange=exchange_name, routing_key="used-key")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(False, response.key_not_matched)
# test detection of specific binding to specific queue
response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(False, response.queue_not_matched)
self.assertEqual(False, response.key_not_matched)
# test unmatched queue, unspecified binding
response = channel.binding_query(exchange=exchange_name, queue="unused-queue")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
# test unspecified queue, unmatched binding
response = channel.binding_query(exchange=exchange_name, routing_key="unused-key")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(True, response.key_not_matched)
# test matched queue, unmatched binding
response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(False, response.queue_not_matched)
self.assertEqual(True, response.key_not_matched)
# test unmatched queue, matched binding
response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
self.assertEqual(False, response.key_not_matched)
# test unmatched queue, unmatched binding
response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
self.assertEqual(True, response.key_not_matched)
#test exchange not found
self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
#test queue not found
self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found)
def test_binding_query_fanout(self):
"""
Test that the binding_query method works as expected with fanout exchange
"""
channel = self.channel
#setup
channel.queue_declare(queue="used-queue", exclusive=True)
channel.queue_declare(queue="unused-queue", exclusive=True)
channel.queue_bind(exchange="amq.fanout", queue="used-queue")
# test detection of any binding to specific queue
response = channel.binding_query(exchange="amq.fanout", queue="used-queue")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(False, response.queue_not_matched)
# test unmatched queue, unspecified binding
response = channel.binding_query(exchange="amq.fanout", queue="unused-queue")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
#test exchange not found
self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
#test queue not found
self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
def test_binding_query_header(self):
"""
Test that the binding_query method works as expected with headers exchanges
"""
channel = self.channel
#setup
channel.queue_declare(queue="used-queue", exclusive=True)
channel.queue_declare(queue="unused-queue", exclusive=True)
channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
# test detection of any binding to specific queue
response = channel.binding_query(exchange="amq.match", queue="used-queue")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(False, response.queue_not_matched)
# test detection of specific binding to any queue
response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(False, response.args_not_matched)
# test detection of specific binding to specific queue
response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(False, response.queue_not_matched)
self.assertEqual(False, response.args_not_matched)
# test unmatched queue, unspecified binding
response = channel.binding_query(exchange="amq.match", queue="unused-queue")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
# test unspecified queue, unmatched binding
response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(True, response.args_not_matched)
# test matched queue, unmatched binding
response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(False, response.queue_not_matched)
self.assertEqual(True, response.args_not_matched)
# test unmatched queue, matched binding
response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
self.assertEqual(False, response.args_not_matched)
# test unmatched queue, unmatched binding
response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
self.assertEqual(True, response.args_not_matched)
#test exchange not found
self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
#test queue not found
self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found)