blob: cc846aefd4faa6ced8b0b8c676c41c21c6853c64 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import re
from brokertest import BrokerTest
from qpid.messaging import Empty
from qmf.console import Session
import qpid.messaging, brokertest
brokertest.qm = qpid.messaging # TODO aconway 2014-04-04: Tests fail with SWIG client.
def store_args(store_dir = None):
"""Return the broker args necessary to load the async store"""
assert BrokerTest.store_lib
if store_dir == None:
return []
return ["--store-dir", store_dir]
class Qmf:
"""
QMF functions not yet available in the new QMF API. Remove this and replace with new API when it becomes available.
"""
def __init__(self, broker):
self.__session = Session()
self.__broker = self.__session.addBroker("amqp://localhost:%d"%broker.port())
def add_exchange(self, exchange_name, exchange_type, alt_exchange_name=None, passive=False, durable=False,
arguments = None):
"""Add a new exchange"""
amqp_session = self.__broker.getAmqpSession()
if arguments == None:
arguments = {}
if alt_exchange_name:
amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type,
alternate_exchange=alt_exchange_name, passive=passive, durable=durable,
arguments=arguments)
else:
amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable,
arguments=arguments)
def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None):
"""Add a new queue"""
amqp_session = self.__broker.getAmqpSession()
if arguments == None:
arguments = {}
if alt_exchange_name:
amqp_session.queue_declare(queue_name, alternate_exchange=alt_exchange_name, passive=passive,
durable=durable, arguments=arguments)
else:
amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments)
def delete_queue(self, queue_name):
"""Delete an existing queue"""
amqp_session = self.__broker.getAmqpSession()
amqp_session.queue_delete(queue_name)
def _query(self, name, _class, package, alt_exchange_name=None):
"""Qmf query function which can optionally look for the presence of an alternate exchange name"""
try:
obj_list = self.__session.getObjects(_class=_class, _package=package)
found = False
for obj in obj_list:
if obj.name == name:
found = True
if alt_exchange_name != None:
alt_exch_list = self.__session.getObjects(_objectId=obj.altExchange)
if len(alt_exch_list) == 0 or alt_exch_list[0].name != alt_exchange_name:
return False
break
return found
except Exception:
return False
def query_exchange(self, exchange_name, alt_exchange_name=None):
"""Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known
value."""
return self._query(exchange_name, "exchange", "org.apache.qpid.broker", alt_exchange_name)
def query_queue(self, queue_name, alt_exchange_name=None):
"""Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known
value."""
return self._query(queue_name, "queue", "org.apache.qpid.broker", alt_exchange_name)
def queue_message_count(self, queue_name):
"""Query the number of messages on a queue"""
queue_list = self.__session.getObjects(_class="queue", _name=queue_name)
if len(queue_list):
return queue_list[0].msgDepth
def queue_empty(self, queue_name):
"""Check if a queue is empty (has no messages waiting)"""
return self.queue_message_count(queue_name) == 0
def get_objects(self, target_class, target_package="org.apache.qpid.broker"):
return self.__session.getObjects(_class=target_class, _package=target_package)
def close(self):
self.__session.delBroker(self.__broker)
self.__session = None
class StoreTest(BrokerTest):
"""
This subclass of BrokerTest adds some convenience test/check functions
"""
def _chk_empty(self, queue, receiver):
"""Check if a queue is empty (has no more messages)"""
try:
msg = receiver.fetch(timeout=0)
self.assert_(False, "Queue \"%s\" not empty: found message: %s" % (queue, msg))
except Empty:
pass
@staticmethod
def make_message(msg_count, msg_size):
"""Make message content. Format: 'abcdef....' followed by 'msg-NNNN', where NNNN is the message count"""
msg = "msg-%04d" % msg_count
msg_len = len(msg)
buff = ""
if msg_size != None and msg_size > msg_len:
for index in range(0, msg_size - msg_len):
if index == msg_size - msg_len - 1:
buff += "-"
else:
buff += chr(ord('a') + (index % 26))
return buff + msg
# Functions for formatting address strings
@staticmethod
def _fmt_csv(string_list, list_braces = None):
"""Format a list using comma-separation. Braces are optionally added."""
if len(string_list) == 0:
return ""
first = True
str_ = ""
if list_braces != None:
str_ += list_braces[0]
for string in string_list:
if string != None:
if first:
first = False
else:
str_ += ", "
str_ += string
if list_braces != None:
str_ += list_braces[1]
return str_
def _fmt_map(self, string_list):
"""Format a map {l1, l2, l3, ...} from a string list. Each item in the list must be a formatted map
element('key:val')."""
return self._fmt_csv(string_list, list_braces="{}")
def _fmt_list(self, string_list):
"""Format a list [l1, l2, l3, ...] from a string list."""
return self._fmt_csv(string_list, list_braces="[]")
def addr_fmt(self, node_name, **kwargs):
"""Generic AMQP to new address formatter. Takes common (but not all) AMQP options and formats an address
string."""
# Get keyword args
node_subject = kwargs.get("node_subject")
create_policy = kwargs.get("create_policy")
delete_policy = kwargs.get("delete_policy")
assert_policy = kwargs.get("assert_policy")
mode = kwargs.get("mode")
link = kwargs.get("link", False)
link_name = kwargs.get("link_name")
node_type = kwargs.get("node_type")
durable = kwargs.get("durable", False)
link_reliability = kwargs.get("link_reliability")
x_declare_list = kwargs.get("x_declare_list", [])
x_bindings_list = kwargs.get("x_bindings_list", [])
x_subscribe_list = kwargs.get("x_subscribe_list", [])
node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0)
link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or
len(x_bindings_list) > 0 or len(x_subscribe_list) > 0)
assert not (node_flag and link_flag)
opt_str_list = []
if create_policy != None:
opt_str_list.append("create: %s" % create_policy)
if delete_policy != None:
opt_str_list.append("delete: %s" % delete_policy)
if assert_policy != None:
opt_str_list.append("assert: %s" % assert_policy)
if mode != None:
opt_str_list.append("mode: %s" % mode)
if node_flag or link_flag:
node_str_list = []
if link_name != None:
node_str_list.append("name: \"%s\"" % link_name)
if node_type != None:
node_str_list.append("type: %s" % node_type)
if durable:
node_str_list.append("durable: True")
if link_reliability != None:
node_str_list.append("reliability: %s" % link_reliability)
if len(x_declare_list) > 0:
node_str_list.append("x-declare: %s" % self._fmt_map(x_declare_list))
if len(x_bindings_list) > 0:
node_str_list.append("x-bindings: %s" % self._fmt_list(x_bindings_list))
if len(x_subscribe_list) > 0:
node_str_list.append("x-subscribe: %s" % self._fmt_map(x_subscribe_list))
if node_flag:
opt_str_list.append("node: %s" % self._fmt_map(node_str_list))
else:
opt_str_list.append("link: %s" % self._fmt_map(node_str_list))
addr_str = node_name
if node_subject != None:
addr_str += "/%s" % node_subject
if len(opt_str_list) > 0:
addr_str += "; %s" % self._fmt_map(opt_str_list)
return addr_str
def snd_addr(self, node_name, **kwargs):
""" Create a send (node) address"""
# Get keyword args
topic = kwargs.get("topic")
topic_flag = kwargs.get("topic_flag", False)
auto_create = kwargs.get("auto_create", True)
auto_delete = kwargs.get("auto_delete", False)
durable = kwargs.get("durable", False)
exclusive = kwargs.get("exclusive", False)
ftd_count = kwargs.get("ftd_count")
ftd_size = kwargs.get("ftd_size")
policy = kwargs.get("policy", "flow-to-disk")
exchage_type = kwargs.get("exchage_type")
create_policy = None
if auto_create:
create_policy = "always"
delete_policy = None
if auto_delete:
delete_policy = "always"
node_type = None
if topic != None or topic_flag:
node_type = "topic"
x_declare_list = ["\"exclusive\": %s" % exclusive]
if ftd_count != None or ftd_size != None:
queue_policy = ["\'qpid.policy_type\': %s" % policy]
if ftd_count:
queue_policy.append("\'qpid.max_count\': %d" % ftd_count)
if ftd_size:
queue_policy.append("\'qpid.max_size\': %d" % ftd_size)
x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy))
if exchage_type != None:
x_declare_list.append("type: %s" % exchage_type)
return self.addr_fmt(node_name, topic=topic, create_policy=create_policy, delete_policy=delete_policy,
node_type=node_type, durable=durable, x_declare_list=x_declare_list)
def rcv_addr(self, node_name, **kwargs):
""" Create a receive (link) address"""
# Get keyword args
auto_create = kwargs.get("auto_create", True)
auto_delete = kwargs.get("auto_delete", False)
link_name = kwargs.get("link_name")
durable = kwargs.get("durable", False)
browse = kwargs.get("browse", False)
exclusive = kwargs.get("exclusive", False)
binding_list = kwargs.get("binding_list", [])
ftd_count = kwargs.get("ftd_count")
ftd_size = kwargs.get("ftd_size")
policy = kwargs.get("policy", "flow-to-disk")
create_policy = None
if auto_create:
create_policy = "always"
delete_policy = None
if auto_delete:
delete_policy = "always"
mode = None
if browse:
mode = "browse"
x_declare_list = ["\"exclusive\": %s" % exclusive]
if ftd_count != None or ftd_size != None:
queue_policy = ["\'qpid.policy_type\': %s" % policy]
if ftd_count:
queue_policy.append("\'qpid.max_count\': %d" % ftd_count)
if ftd_size:
queue_policy.append("\'qpid.max_size\': %d" % ftd_size)
x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy))
x_bindings_list = []
for binding in binding_list:
x_bindings_list.append("{exchange: %s, key: %s}" % binding)
if durable: reliability = 'at-least-once'
else: reliability = None
return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True,
link_name=link_name, durable=durable, x_declare_list=x_declare_list,
x_bindings_list=x_bindings_list, link_reliability=reliability)
def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False):
"""Check that a message is on a queue by dequeuing it and comparing it to the expected message"""
return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse)
def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False,
emtpy_flag=False):
"""Check that messages is on a queue by dequeuing them and comparing them to the expected messages"""
if emtpy_flag:
num_msgs = 0
else:
num_msgs = len(exp_msg_list)
ssn = broker.connect().session(transactional=transactional)
rcvr = ssn.receiver(self.rcv_addr(queue, browse=browse), capacity=num_msgs)
if num_msgs > 0:
try:
recieved_msg_list = [rcvr.fetch(timeout=0) for i in range(num_msgs)]
except Empty:
self.assert_(False, "Queue \"%s\" is empty, unable to retrieve expected message %d." % (queue, i))
for i in range(0, len(recieved_msg_list)):
self.assertEqual(recieved_msg_list[i].content, exp_msg_list[i].content)
self.assertEqual(recieved_msg_list[i].correlation_id, exp_msg_list[i].correlation_id)
if empty:
self._chk_empty(queue, rcvr)
if ack:
ssn.acknowledge()
if transactional:
ssn.commit()
ssn.connection.close()
else:
if transactional:
ssn.commit()
return ssn
# Functions for finding strings in the broker log file (or other files)
@staticmethod
def _read_file(file_name):
"""Returns the content of file named file_name as a string"""
file_handle = file(file_name)
try:
return file_handle.read()
finally:
file_handle.close()
def _get_hits(self, broker, search):
"""Find all occurrences of the search in the broker log (eliminating possible duplicates from msgs on multiple
queues)"""
# TODO: Use sets when RHEL-4 is no longer supported
hits = []
for hit in search.findall(self._read_file(broker.log)):
if hit not in hits:
hits.append(hit)
return hits
def _reconsile_hits(self, broker, ftd_msgs, release_hits):
"""Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining
release_hits."""
for msg in ftd_msgs:
found = False
for hit in release_hits:
if str(msg.id) in hit:
release_hits.remove(hit)
#print "Found %s in %s" % (msg.id, broker.log)
found = True
break
if not found:
self.assert_(False, "Unable to locate released message %s in log %s" % (msg.id, broker.log))
if len(release_hits) > 0:
err = "Messages were unexpectedly released in log %s:\n" % broker.log
for hit in release_hits:
err += " %s\n" % hit
self.assert_(False, err)
def check_msg_release(self, broker, ftd_msgs):
""" Check for 'Content released' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content released$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
def check_msg_release_on_commit(self, broker, ftd_msgs):
""" Check for 'Content released on commit' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content released on commit$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
def check_msg_release_on_recover(self, broker, ftd_msgs):
""" Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content released after recovery$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
def check_msg_block(self, broker, ftd_msgs):
"""Check for 'Content release blocked' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content release blocked$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)
def check_msg_block_on_commit(self, broker, ftd_msgs):
"""Check for 'Content release blocked' messages in broker log for messages in ftd_msgs"""
hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
"Content release blocked on commit$", re.MULTILINE))
self._reconsile_hits(broker, ftd_msgs, hits)