blob: e73c0587a727d27ce68b85c9e6fa6286ed94acda [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import time
from math import ceil
from qpid.harness import Skipped
from qpid.tests.messaging.implementation import *
from qpid.tests import Test
from qpid.util import URL
class Base(Test):
    """Shared fixture and assertion helpers for the messaging tests.

    Subclasses override the ``setup_*`` hooks to supply the objects that
    ``setup`` stores on the instance as ``conn``, ``ssn``, ``snd`` and
    ``rcv``.  Every hook returns ``None`` unless overridden.
    """

    def setup_connection(self):
        """Hook: return the connection to use; ``None`` by default."""
        return None

    def setup_session(self):
        """Hook: return the session to use; ``None`` by default."""
        return None

    def setup_sender(self):
        """Hook: return the sender to use; ``None`` by default."""
        return None

    def setup_receiver(self):
        """Hook: return the receiver to use; ``None`` by default."""
        return None

    def setup(self):
        """Create the per-test id and broker URL, then run the setup hooks.

        Raises ``Skipped`` (wrapping the original error) when
        ``setup_connection`` raises ``ConnectError``.
        """
        self.test_id = uuid4()
        if self.is_swigged_qpid_messaging():
            # The swigged client gets a URL that renders in its own format.
            self.broker = CompatURL(self.config.broker)
        else:
            self.broker = self.config.broker
        try:
            self.conn = self.setup_connection()
        except ConnectError as e:
            # "as" form is valid on Python 2.6+ and on Python 3.
            raise Skipped(e)
        self.ssn = self.setup_session()
        self.snd = self.setup_sender()
        if self.snd is not None:
            self.snd.durable = self.durable()
        self.rcv = self.setup_receiver()

    def teardown(self):
        """Close the connection if one exists and is still attached."""
        # setup() never assigns self.conn when setup_connection() raises, so
        # read it defensively instead of assuming the attribute exists.
        conn = getattr(self, "conn", None)
        if conn is not None and conn.attached():
            self.teardown_connection(conn)
            self.conn = None

    def teardown_connection(self, conn):
        """Close ``conn`` using the configured timeout."""
        conn.close(timeout=self.timeout())

    def content(self, base, count=None):
        """Return ``base`` tagged with this test's id (and ``count`` if given)."""
        if count is None:
            return "%s[%s]" % (base, self.test_id)
        else:
            return "%s[%s, %s]" % (base, count, self.test_id)

    def message(self, base, count=None, **kwargs):
        """Return a ``Message`` whose content is ``self.content(base, count)``.

        Extra keyword arguments are passed through to ``Message``.
        """
        return Message(content=self.content(base, count), **kwargs)

    def ping(self, ssn):
        """Send one message through ``ssn`` and assert it is fetched back."""
        PING_Q = 'ping-queue; {create: always, delete: always}'
        # send a message
        sender = ssn.sender(PING_Q, durable=self.durable())
        content = self.content("ping")
        sender.send(content)
        # fetch it back and compare
        receiver = ssn.receiver(PING_Q)
        msg = receiver.fetch(0)
        ssn.acknowledge()
        assert msg.content == content, "expected %r, got %r" % (content, msg.content)

    def drain(self, rcv, limit=None, timeout=0, expected=None, redelivered=False):
        """Fetch messages from ``rcv`` until ``Empty`` is raised or ``limit`` is hit.

        When ``expected`` is given, the fetched messages are checked against
        it with ``assertEchos``.  Returns the list of fetched messages.
        """
        messages = []
        try:
            while limit is None or len(messages) < limit:
                messages.append(rcv.fetch(timeout=timeout))
        except Empty:
            # Empty ends the loop; it is the normal way to stop draining.
            pass
        if expected is not None:
            self.assertEchos(expected, messages, redelivered)
        return messages

    def diff(self, m1, m2, excluded_properties=()):
        """Return the differences between two messages.

        The result maps each differing attribute name to a ``(m1, m2)`` value
        pair.  A ``"properties"`` entry is added when the property maps
        differ after removing the keys in ``excluded_properties``.
        """
        result = {}
        for attr in ("id", "subject", "user_id", "reply_to",
                     "correlation_id", "durable", "priority", "ttl",
                     "redelivered", "content_type", "content"):
            a1 = getattr(m1, attr)
            a2 = getattr(m2, attr)
            if a1 != a2:
                result[attr] = (a1, a2)
        # Compare copies so the messages' own property maps are not modified.
        p1 = dict(m1.properties)
        p2 = dict(m2.properties)
        for ep in excluded_properties:
            p1.pop(ep, None)
            p2.pop(ep, None)
        if p1 != p2:
            result["properties"] = (p1, p2)
        return result

    def assertEcho(self, msg, echo, redelivered=False):
        """Assert that ``echo`` matches ``msg``.

        If either argument is not a ``Message``, only content is compared.
        Otherwise every attribute reported by ``diff`` must match, except
        that the echo's ttl may be lower than the original's and, when
        ``redelivered`` is true, the echo must be flagged as redelivered.
        """
        if not isinstance(msg, Message) or not isinstance(echo, Message):
            if isinstance(msg, Message):
                msg = msg.content
            if isinstance(echo, Message):
                echo = echo.content
            assert msg == echo, "expected %s, got %s" % (msg, echo)
        else:
            delta = self.diff(msg, echo, ("x-amqp-0-10.routing-key", "qpid.subject"))
            # ttl is checked separately below, so take it out of the delta.
            mttl, ettl = delta.pop("ttl", (0, 0))
            if redelivered:
                assert echo.redelivered, \
                    "expected %s to be redelivered: %s" % (msg, echo)
                # A redelivered mismatch is expected here; pop() replaces the
                # Python-2-only has_key()/del pair.
                delta.pop("redelivered", None)
            assert mttl is not None and ettl is not None, "%s, %s" % (mttl, ettl)
            assert mttl >= ettl, "%s, %s" % (mttl, ettl)
            assert not delta, "expected %s, got %s, delta %s" % (msg, echo, delta)

    def assertEchos(self, msgs, echoes, redelivered=False):
        """Assert both sequences have equal length and match pairwise."""
        assert len(msgs) == len(echoes), "%s, %s" % (msgs, echoes)
        for m, e in zip(msgs, echoes):
            self.assertEcho(m, e, redelivered)

    def assertEmpty(self, rcv):
        """Assert that draining ``rcv`` yields no messages."""
        contents = self.drain(rcv)
        assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)

    def assertAvailable(self, rcv, expected=None, lower=None, upper=None):
        """Assert ``rcv.available()`` equals ``expected`` or lies in a range.

        Without ``expected``, the range defaults to
        ``[ceil(rcv.threshold * rcv.capacity), rcv.capacity]``.

        Raises ``ValueError`` if ``expected`` is combined with ``lower`` or
        ``upper``.
        """
        if expected is not None:
            if lower is not None or upper is not None:
                raise ValueError("cannot specify lower or upper when specifying expected")
            lower = expected
            upper = expected
        else:
            if lower is None:
                lower = int(ceil(rcv.threshold*rcv.capacity))
            if upper is None:
                upper = rcv.capacity
        p = rcv.available()
        if upper == lower:
            assert p == lower, "expected %s, got %s" % (lower, p)
        else:
            assert lower <= p <= upper, "expected %s to be in range [%s, %s]" % (p, lower, upper)

    def sleep(self):
        """Sleep for ``self.delay()`` seconds."""
        time.sleep(self.delay())

    def delay(self):
        """Return the ``delay`` define as a float (default ``2``)."""
        return float(self.config.defines.get("delay", "2"))

    def timeout(self):
        """Return the ``timeout`` define as a float (default ``60``)."""
        return float(self.config.defines.get("timeout", "60"))

    def get_bool(self, name):
        """Return True if define ``name`` is "true", "yes" or "1" (any case)."""
        return self.config.defines.get(name, "false").lower() in ("true", "yes", "1")

    def durable(self):
        """Return the boolean ``durable`` define."""
        return self.get_bool("durable")

    def reconnect(self):
        """Return the boolean ``reconnect`` define."""
        return self.get_bool("reconnect")

    def transport(self):
        """Return "ssl" when the broker scheme is AMQPS, otherwise "tcp"."""
        if self.broker.scheme == self.broker.AMQPS:
            return "ssl"
        else:
            return "tcp"

    def connection_options(self):
        """Return the connection keyword options as a dict.

        Always contains ``reconnect`` and ``transport``; ``protocol`` is
        added only when the ``protocol_version`` define is set.
        """
        protocol_version = self.config.defines.get("protocol_version")
        if protocol_version:
            return {"reconnect": self.reconnect(),
                    "transport": self.transport(),
                    "protocol": protocol_version}
        else:
            return {"reconnect": self.reconnect(),
                    "transport": self.transport()}

    def is_swigged_qpid_messaging(self):
        """Return True if ``Connection`` comes from the ``qpid_messaging`` module."""
        return Connection.__module__ == 'qpid_messaging'
class VersionTest (Base):
    """Fixture that connects with an explicit ``protocol`` option."""

    def create_connection(self, version="amqp1.0", force=False):
        """Establish and return a connection to ``self.broker``.

        ``version`` becomes the ``protocol`` option when ``force`` is true,
        or when ``connection_options()`` did not already provide one.
        """
        options = self.connection_options()
        if force:
            options['protocol'] = version
        else:
            # Keep a protocol that the configuration already selected.
            options.setdefault('protocol', version)
        return Connection.establish(self.broker, **options)

    def setup_connection(self):
        """Use ``create_connection`` with its defaults as the test connection."""
        return self.create_connection()

    def setup_session(self):
        """Open a session on the connection created during setup."""
        return self.conn.session()
class CompatURL(URL):
    """URL that renders in the form expected by the C++ client.

    The C++ client expects <scheme>:<user>/<pass>.. whereas the pure Python
    URL class expects <scheme>://<user>:<pass>..
    """

    def __init__(self, *args, **kwargs):
        """Pass all arguments straight through to ``URL.__init__``."""
        URL.__init__(self, *args, **kwargs)

    def __str__(self):
        """Return ``scheme:user/password@host:port``, omitting unset parts.

        A host containing ``:`` is wrapped in square brackets.
        """
        pieces = []
        if self.scheme:
            pieces.append("%s:" % self.scheme)
        if self.user:
            pieces.append(self.user)
            # The password is only emitted together with a user.
            if self.password:
                pieces.append("/%s" % self.password)
            pieces.append("@")
        if ':' in self.host:
            pieces.append("[%s]" % self.host)
        else:
            pieces.append(self.host)
        if self.port:
            pieces.append(":%s" % self.port)
        return "".join(pieces)
import address, endpoints, message, selector