# blob: c4ab712f14b93acbbd1172ff9dbd76f67cdec3a4 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys, re, traceback, socket
from getopt import getopt, GetoptError
from qpid.connection import Connection
from qpid.util import connect
from qpid.datatypes import Message, RangedSet
from qpid.queue import Empty
from qpid.session import SessionException
from qpid.testlib import TestBase010
from time import sleep
# Multi-phase broker persistence test. One phaseN method is selected per
# invocation via -p/--phase (see run); the in-code notes indicate the broker
# is restarted between phases so that later phases check recovered state.
class PersistenceTest(TestBase010):
# Status codes compared against the `status` field of dtx_* results.
# NOTE(review): only XA_OK is referenced in this file; XA_RBROLLBACK and
# XA_RBTIMEOUT are presumably the XA rollback/timeout codes -- confirm.
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
XA_OK = 0
def createMessage(self, **kwargs):
    """Build a Message from keyword arguments.

    Recognised keywords:
      routing_key, delivery_mode -- placed in the delivery properties
        (delivery_mode defaults to 2 unless supplied).
      message_id, correlation_id, application_headers -- placed in the
        message properties, which are only attached when at least one of
        them was supplied.
      body -- appended as the last Message argument when present.
    Any other keyword is ignored.
    """
    session = self.session
    delivery_keys = ('routing_key', 'delivery_mode')
    message_keys = ('message_id', 'correlation_id', 'application_headers')
    # NOTE(review): delivery_mode 2 is presumably "persistent" -- confirm
    # against the AMQP 0-10 delivery-properties definition.
    dp = {'delivery_mode': 2}
    mp = {}
    # items() and `in` replace iteritems()/has_key(), which no longer exist
    # on Python 3 and are equivalent here on Python 2.
    for k, v in kwargs.items():
        if k in delivery_keys:
            dp[k] = v
        if k in message_keys:
            mp[k] = v
    args = [session.delivery_properties(**dp)]
    if mp:
        args.append(session.message_properties(**mp))
    if 'body' in kwargs:
        args.append(kwargs['body'])
    return Message(*args)
def phase1(self):
    """Create durable queue-a and queue-b, bind them to amq.direct and publish one message to each."""
    session = self.session
    routes = [("queue-a", "a"), ("queue-b", "b")]
    for queue_name, _key in routes:
        session.queue_declare(queue=queue_name, durable=True)
    for queue_name, key in routes:
        session.exchange_bind(queue=queue_name, exchange="amq.direct", binding_key=key)
    outgoing = [("a", "Msg0001", "A_Message1"), ("b", "Msg0002", "B_Message1")]
    for key, correlation, payload in outgoing:
        message = self.createMessage(routing_key=key, correlation_id=correlation, body=payload)
        session.message_transfer(destination="amq.direct", message=message)
    # Disabled last-value-queue setup, kept for reference:
    # session.queue_declare(queue="lvq-test", durable=True, arguments={"qpid.last_value_queue":True})
    # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B1"))
    # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A1"))
    # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A2"))
    # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B2"))
    # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"B"}, body="B3"))
    # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C1"))
def phase2(self):
    """Check the phase1 queues, bindings and messages, then fan a topic message out to three queues."""
    session = self.session
    direct_routes = [("queue-a", "a"), ("queue-b", "b")]
    # the queues must still exist (passive declare)
    for queue_name, _key in direct_routes:
        session.queue_declare(queue=queue_name, durable=True, passive=True)
    # ...and must still be bound to amq.direct with their original keys
    responses = [session.exchange_bound(queue=queue_name, exchange="amq.direct", binding_key=key)
                 for queue_name, key in direct_routes]
    for reply in responses:
        self.assert_(not reply.exchange_not_found)
        self.assert_(not reply.queue_not_found)
        self.assert_(not reply.key_not_matched)
    # each queue holds exactly the message published in phase1
    self.assertMessageOnQueue("queue-a", "Msg0001", "A_Message1")
    self.assertMessageOnQueue("queue-b", "Msg0002", "B_Message1")
    for queue_name, _key in direct_routes:
        self.assertEmptyQueue(queue_name)
    session.queue_declare(queue="queue-c", durable=True)
    # send a message to a topic such that it reaches all queues
    for queue_name in ("queue-a", "queue-b", "queue-c"):
        session.exchange_bind(queue=queue_name, exchange="amq.topic", binding_key="abc")
    topic_message = self.createMessage(routing_key="abc", correlation_id="Msg0003", body="AB_Message2")
    session.message_transfer(destination="amq.topic", message=topic_message)
    # Disabled last-value-queue checks, kept for reference:
    # #check LVQ exists and has expected messages:
    # session.queue_declare(queue="lvq-test", durable=True, passive=True)
    # session.message_subscribe(destination="lvq", queue="lvq-test")
    # lvq = session.incoming("lvq")
    # lvq.start()
    # accepted = RangedSet()
    # for m in ["A2", "B3", "C1"]:
    # msg = lvq.get(timeout=1)
    # self.assertEquals(m, msg.body)
    # accepted.add(msg.id)
    # try:
    # extra = lvq.get(timeout=1)
    # self.fail("lvq-test not empty, contains: " + extra.body)
    # except Empty: None
    # #publish some more messages while subscriber is active (no replacement):
    # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C2"))
    # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C3"))
    # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A3"))
    # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"A"}, body="A4"))
    # session.message_transfer(message=self.createMessage(routing_key="lvq-test", application_headers={"qpid.LVQ_key":"C"}, body="C4"))
    # #check that accepting replaced messages is safe
    # session.message_accept(accepted)
def phase3(self):
    """Exercise a local transaction: commit a consume-and-publish, then roll back a second one.

    Inside tx_select the topic message from phase2 is consumed from all three
    queues and three new messages are sent to queue-a before tx_commit.
    queue-c is then deleted, the three new messages are consumed and acked,
    one message is published towards queue-b, and the lot is rolled back.
    """
    session = self.session
    # Disabled last-value-queue recovery validation, kept for reference:
    # session.queue_declare(queue="lvq-test", durable=True, passive=True)
    # session.message_subscribe(destination="lvq", queue="lvq-test")
    # lvq = session.incoming("lvq")
    # lvq.start()
    # accepted = RangedSet()
    # lvq.start()
    # for m in ["C4", "A4"]:
    # msg = lvq.get(timeout=1)
    # self.assertEquals(m, msg.body)
    # accepted.add(msg.id)
    # session.message_accept(accepted)
    # try:
    # extra = lvq.get(timeout=1)
    # self.fail("lvq-test not empty, contains: " + extra.body)
    # except Empty: None
    # session.message_cancel(destination="lvq")
    # session.queue_delete(queue="lvq-test")
    all_queues = ("queue-a", "queue-b", "queue-c")
    # the queues must still exist
    for queue_name in all_queues:
        session.queue_declare(queue=queue_name, durable=True, passive=True)
    session.tx_select()
    # every queue holds the topic message and nothing else
    for queue_name in all_queues:
        self.assertMessageOnQueue(queue_name, "Msg0003", "AB_Message2")
    for queue_name in all_queues:
        self.assertEmptyQueue(queue_name)
    # note: default bindings must be restored for this to work
    committed = [("Msg0004", "A_Message3"), ("Msg0005", "A_Message4"), ("Msg0006", "A_Message5")]
    for correlation, payload in committed:
        session.message_transfer(message=self.createMessage(
            routing_key="queue-a", correlation_id=correlation, body=payload))
    session.tx_commit()
    # delete a queue
    session.queue_delete(queue="queue-c")
    session.message_subscribe(destination="ctag", queue="queue-a", accept_mode=0)
    session.message_flow(destination="ctag", unit=0, value=0xFFFFFFFF)
    session.message_flow(destination="ctag", unit=1, value=0xFFFFFFFF)
    inbox = session.incoming("ctag")
    delivered = []
    for correlation, payload in committed:
        received = inbox.get(timeout=1)
        self.assertExpectedContent(received, correlation, payload)
        delivered.append(received)
    self.ack(*delivered)
    session.message_transfer(destination="amq.direct", message=self.createMessage(
        routing_key="queue-b", correlation_id="Msg0007", body="B_Message3"))
    session.tx_rollback()
def phase4(self):
    """Check the outcome of phase3: its three messages are on queue-a, queue-b is empty, queue-c is gone."""
    session = self.session
    survivors = ("queue-a", "queue-b")
    # the surviving queues must still exist
    for queue_name in survivors:
        session.queue_declare(queue=queue_name, durable=True, passive=True)
    for correlation, payload in (("Msg0004", "A_Message3"),
                                 ("Msg0005", "A_Message4"),
                                 ("Msg0006", "A_Message5")):
        self.assertMessageOnQueue("queue-a", correlation, payload)
    for queue_name in survivors:
        self.assertEmptyQueue(queue_name)
    # a passive declare of the deleted queue must be rejected with a 404
    try:
        session.queue_declare(queue="queue-c", durable=True, passive=True)
    except SessionException as e:
        self.assertEquals(404, e.args[0].error_code)
    else:
        raise Exception("Expected queue-c to have been deleted")
def phase5(self):
    """Move one message per queue pair under four dtx transactions and leave them in different states.

    txa is committed in one phase, txb is rolled back, txc and txd are only
    prepared (phase6 expects dtx_recover to report them).
    """
    session = self.session
    branches = ["a", "b", "c", "d"]
    queues = []
    for branch in branches:
        queues.append("queue-%s1" % branch)
        queues.append("queue-%s2" % branch)
    for q in queues:
        session.queue_declare(queue=q, durable=True)
        session.queue_purge(queue=q)
    # seed the first queue of every pair with one message
    for branch in branches:
        tag = branch.upper()
        session.message_transfer(message=self.createMessage(
            routing_key="queue-%s1" % branch, correlation_id="Msg%s" % tag, body="Message%s" % tag))
    session.dtx_select()
    xids = [self.xid(branch) for branch in branches]
    for branch, tx in zip(branches, xids):
        self.txswap("queue-%s1" % branch, "queue-%s2" % branch, tx)
    txa, txb, txc, txd = xids
    # no queue should have any messages accessible
    for q in queues:
        self.assertEqual(0, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
    self.assertEqual(self.XA_OK, session.dtx_commit(xid=txa, one_phase=True).status)
    self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txb).status)
    self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txc).status)
    self.assertEqual(self.XA_OK, session.dtx_prepare(xid=txd).status)
    # further checks: only the committed target and the rolled-back source hold a message
    not_empty = ["queue-a2", "queue-b1"]
    for q in queues:
        expected_count = 0
        if q in not_empty:
            expected_count = 1
        self.assertEqual(expected_count, session.queue_query(queue=q).message_count, "Bad count for %s" % (q))
def phase6(self):
    """Check that the transactions prepared in phase5 are recoverable, then complete them.

    dtx_recover must report exactly the 'c' and 'd' xids as in doubt. Queue
    depths are checked before completion (only queue-a2 and queue-b1 hold a
    message) and again after txc is committed and txd rolled back (queue-c2
    and queue-d1 then hold one as well).
    """
    session = self.session
    queues = ["queue-a1", "queue-a2", "queue-b1", "queue-b2", "queue-c1", "queue-c2", "queue-d1", "queue-d2"]

    def check_counts(non_empty):
        # Queues listed in non_empty must hold exactly one message, all others none.
        for q in queues:
            expected_count = 0
            if q in non_empty:
                expected_count = 1
            self.assertEqual(expected_count, session.queue_query(queue=q).message_count,
                             "Bad count for %s" % (q))

    #check prepared transaction are reported correctly by recover
    txc = self.xid('c')
    txd = self.xid('d')
    xids = session.dtx_recover().in_doubt
    ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
    for tx in (txc, txd):
        if tx.global_id not in ids:
            self.fail("Recovered xids not as expected. missing: %s" % (tx))
    self.assertEqual(2, len(xids))
    #re-check
    not_empty = ["queue-a2", "queue-b1"]
    check_counts(not_empty)
    #complete the prepared transactions
    self.assertEqual(self.XA_OK, session.dtx_commit(xid=txc).status)
    self.assertEqual(self.XA_OK, session.dtx_rollback(xid=txd).status)
    not_empty.append("queue-c2")
    not_empty.append("queue-d1")
    # same check as above, now with the consistent "Bad count" failure message
    check_counts(not_empty)
def phase7(self):
    """Check the phase6 xids are gone, then exercise queue deletion and set up bindings for phase8.

    Covers: deleting a queue after publishing to it, acking a message after
    its auto-delete queue's subscription was cancelled, deleting a bound
    durable queue, and creating the amq.direct/amq.match bindings that
    phase8 sends through after the broker restart.
    """
    session = self.session
    session.synchronous = False
    # check xids from phase 6 are gone
    txc = self.xid('c')
    txd = self.xid('d')
    xids = session.dtx_recover().in_doubt
    ids = [x.global_id for x in xids] #TODO: come up with nicer way to test these
    if txc.global_id in ids:
        self.fail("Xid still present : %s" % (txc))
    if txd.global_id in ids:
        # report the xid that is actually still present (txd, not txc)
        self.fail("Xid still present : %s" % (txd))
    self.assertEqual(0, len(xids))
    #test deletion of queue after publish
    #create queue
    session.queue_declare(queue="q", auto_delete=True, durable=True)
    #send message (nine copies)
    for _ in range(1, 10):
        session.message_transfer(message=self.createMessage(routing_key="q", body="my-message"))
    session.synchronous = True
    #explicitly delete queue
    session.queue_delete(queue="q")
    #test acking of message from auto-deleted queue
    #create queue
    session.queue_declare(queue="q", auto_delete=True, durable=True)
    #send message
    session.message_transfer(message=self.createMessage(routing_key="q", body="my-message"))
    #create consumer
    session.message_subscribe(queue="q", destination="a", accept_mode=0, acquire_mode=0)
    session.message_flow(unit=1, value=0xFFFFFFFF, destination="a")
    session.message_flow(unit=0, value=10, destination="a")
    queue = session.incoming("a")
    #consume the message, cancel subscription (triggering auto-delete), then ack it
    msg = queue.get(timeout=5)
    session.message_cancel(destination="a")
    self.ack(msg)
    #test implicit deletion of bindings when queue is deleted
    session.queue_declare(queue="durable-subscriber-queue", exclusive=True, durable=True)
    session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
    session.message_transfer(destination="amq.topic", message=self.createMessage(routing_key="xyz", body="my-message"))
    session.queue_delete(queue="durable-subscriber-queue")
    #test unbind:
    #create a series of bindings to a queue
    session.queue_declare(queue="binding-test-queue", durable=True)
    for key in ("abc", "pqr", "xyz"):
        session.exchange_bind(exchange="amq.direct", queue="binding-test-queue", binding_key=key)
    for key in ("a", "b", "c"):
        session.exchange_bind(exchange="amq.match", queue="binding-test-queue", binding_key=key,
                              arguments={"x-match":"all", "p":key})
    #then restart broker...
def phase8(self):
    """Continue the unbind test from phase7: publish, unbind two keys, publish again, verify delivery."""
    session = self.session
    direct_keys = ["abc", "pqr", "xyz"]
    header_values = ["a", "b", "c"]

    def publish_round(label):
        # Send one message through every amq.direct key, then one per amq.match header value.
        for k in direct_keys:
            data = "%s %s" % (label, k)
            session.message_transfer(destination="amq.direct", message=self.createMessage(routing_key=k, body=data))
        for p in header_values:
            data = "%s %s" % (label, p)
            session.message_transfer(destination="amq.match",
                                     message=self.createMessage(application_headers={"p": p}, body=data))

    #send messages to the queue via each of the bindings
    publish_round("first")
    #unbind some bindings (using final 0-10 semantics)
    session.exchange_unbind(exchange="amq.direct", queue="binding-test-queue", binding_key="pqr")
    session.exchange_unbind(exchange="amq.match", queue="binding-test-queue", binding_key="b")
    #send messages again
    publish_round("second")
    #check that only the correct messages are received: everything from the
    #first round, and from the second round only what the remaining bindings route
    expected = ["first %s" % (v) for v in direct_keys + header_values]
    expected += ["second %s" % (v) for v in ["abc", "xyz", "a", "c"]]
    session.message_subscribe(queue="binding-test-queue", destination="binding-test")
    session.message_flow(unit=1, value=0xFFFFFFFF, destination="binding-test")
    session.message_flow(unit=0, value=10, destination="binding-test")
    queue = session.incoming("binding-test")
    while expected:
        msg = queue.get(timeout=1)
        if msg.body not in expected:
            self.fail("Missing message: %s" % msg.body)
        expected.remove(msg.body)
    try:
        msg = queue.get(timeout=1)
    except Empty:
        pass
    else:
        self.fail("Got extra message: %s" % msg.body)
    session.queue_declare(queue="durable-subscriber-queue", exclusive=True, durable=True)
    session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
    session.message_transfer(destination="amq.topic", message=self.createMessage(routing_key="xyz", body="my-message"))
    session.queue_delete(queue="durable-subscriber-queue")
def xid(self, txid, branchqual = ''):
    """Return a session xid with format 0, global id `txid` and branch id `branchqual`."""
    make_xid = self.session.xid
    return make_xid(format=0, global_id=txid, branch_id=branchqual)
def txswap(self, src, dest, tx):
    """Under dtx transaction `tx`, take one message from queue `src` and republish it to `dest`.

    The copy keeps the original's correlation_id and body; the original is
    acked before the transaction branch is ended.
    """
    session = self.session
    swap_dest = "temp-swap"
    self.assertEqual(self.XA_OK, session.dtx_start(xid=tx).status)
    session.message_subscribe(destination=swap_dest, queue=src, accept_mode=0)
    # credit for a single message, unlimited bytes
    session.message_flow(destination=swap_dest, unit=0, value=1)
    session.message_flow(destination=swap_dest, unit=1, value=0xFFFFFFFF)
    msg = session.incoming(swap_dest).get(timeout=1)
    session.message_cancel(destination=swap_dest)
    correlation = self.getProperty(msg, 'correlation_id')
    copy = self.createMessage(routing_key=dest, correlation_id=correlation, body=msg.body)
    session.message_transfer(message=copy)
    self.ack(msg)
    self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status)
def assertEmptyQueue(self, name):
    """Fail unless queue_query reports a message_count of 0 for queue `name`."""
    depth = self.session.queue_query(queue=name).message_count
    self.assertEqual(0, depth)
def assertConnectionException(self, expectedCode, message):
    """Check that `message` is a connection.close method carrying reply code `expectedCode`."""
    method = message.method
    self.assertEqual("connection", method.klass.name)
    self.assertEqual("close", method.name)
    self.assertEqual(expectedCode, message.reply_code)
def assertExpectedMethod(self, reply, klass, method):
    """Check that `reply` carries the method named `method` of the class named `klass`."""
    received = reply.method
    self.assertEqual(klass, received.klass.name)
    self.assertEqual(method, received.name)
def assertExpectedContent(self, msg, id, body):
    """Check msg's correlation_id and body against `id` and `body`, then return msg."""
    correlation = self.getProperty(msg, 'correlation_id')
    self.assertEqual(id, correlation)
    self.assertEqual(body, msg.body)
    return msg
def getProperty(self, msg, name):
    """Return attribute `name` from the first of msg.headers that has it, or None if none does."""
    for header in msg.headers:
        if not hasattr(header, name):
            continue
        return getattr(header, name)
    return None
def ack(self, *msgs):
    """Accept the given messages and report them as completed on the session."""
    session = self.session
    accepted = RangedSet()
    for delivered in msgs:
        accepted.add(delivered.id)
        #TODO: tidy up completion
        session.receiver._completed.add(delivered.id)
    session.message_accept(accepted)
    session.channel.session_completed(session.receiver._completed)
def assertExpectedGetResult(self, id, body):
    """Fetch the next message from the "incoming-gets" queue and check its content.

    Waits up to one second for the message, compares it against `id` and
    `body` via assertExpectedContent, and returns that call's result.
    """
    # The original read a bare `session`, which is not defined in this scope
    # and raised NameError; the session lives on the instance.
    incoming = self.session.incoming("incoming-gets")
    return self.assertExpectedContent(incoming.get(timeout=1), id, body)
def assertEqual(self, expected, actual, msg=''):
    """Raise Exception describing both values (prefixed by `msg`) when expected != actual."""
    mismatch = expected != actual
    if mismatch:
        details = "%s expected: %s actual: %s" % (msg, expected, actual)
        raise Exception(details)
def assertMessageOnQueue(self, queue, id, body):
    """Consume one message from `queue`, check its correlation id and body, and ack it.

    Uses a temporary "incoming-gets" subscription that is cancelled afterwards.
    """
    session = self.session
    dest = "incoming-gets"
    session.message_subscribe(destination=dest, queue=queue, accept_mode=0)
    # credit for a single message, unlimited bytes
    session.message_flow(destination=dest, unit=0, value=1)
    session.message_flow(destination=dest, unit=1, value=0xFFFFFFFF)
    msg = session.incoming(dest).get(timeout=1)
    self.assertExpectedContent(msg, id, body)
    self.ack(msg)
    session.message_cancel(destination=dest)
def __init__(self):
    """Initialise the test case with "run" as its test method, an empty errata list and broker "localhost"."""
    TestBase010.__init__(self, "run")
    self.errata = []
    self.setBroker("localhost")
def connect(self):
    """Connect to the broker at self.host:self.port and open the "test-session" session."""
    # `connect` below is the helper imported from qpid.util at the top of the file.
    transport = connect(self.host, self.port)
    self.conn = Connection(transport)
    self.conn.start(timeout=10)
    self.session = self.conn.session("test-session", timeout=10)
def run(self, args=sys.argv[1:]):
    """Parse command-line options, run the selected phase and report the outcome.

    Returns True when the phase method completed and False when it raised an
    Exception (the traceback is printed). Exits through _die on bad options,
    on -h/--help, or when no phase was given.
    """
    short_opts = "r:s:e:b:p:h"
    long_opts = ["retry=", "spec=", "errata=", "broker=", "phase=", "help"]
    try:
        opts, extra = getopt(args, short_opts, long_opts)
    except GetoptError as e:
        self._die(str(e))
    phase = 0
    retry = 0
    for opt, value in opts:
        if opt in ("-h", "--help"):
            self._die()
        elif opt in ("-s", "--spec"):
            self.spec = value
        elif opt in ("-e", "--errata"):
            self.errata.append(value)
        elif opt in ("-b", "--broker"):
            self.setBroker(value)
        elif opt in ("-p", "--phase"):
            phase = int(value)
        elif opt in ("-r", "--retry"):
            # parsed (and validated as an int) but not used further in this method
            retry = int(value)
    if not phase:
        self._die("please specify the phase to run")
    phase_name = "phase%d" % phase
    self.connect()
    try:
        getattr(self, phase_name)()
        print(" ".join([phase_name, "succeeded"]))
        res = True
    except Exception as e:
        print(" ".join([phase_name, "failed: ", str(e)]))
        traceback.print_exc()
        res = False
    if not self.session.error():
        self.session.close(timeout=10)
    self.conn.close(timeout=10)
    # Crude fix to wait for thread in client to exit after return from session_close()
    # Reduces occurrences of "Unhandled exception in thread" messages after each test
    import time
    time.sleep(1)
    return res
def setBroker(self, broker):
    """Parse "[<user>[/<password>]@]<host>[:<port>]" into self.user, self.password, self.host, self.port.

    Missing parts fall back to user "guest", password "guest" and port 5672.
    A string that does not match is reported through _die.
    """
    rex = re.compile(r"""
# [ <user> [ / <password> ] @] <host> [ :<port> ]
^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X)
    match = rex.match(broker)
    if not match:
        self._die("'%s' is not a valid broker" % (broker))
    user, password, host, port = match.groups()
    self.user, self.password, self.host, self.port = user, password, host, port
    # unmatched optional groups come back as None; substitute the defaults
    if port is None:
        port = 5672
    self.port = int(port)
    if user is None:
        user = "guest"
    self.user = user
    if password is None:
        password = "guest"
    self.password = password
def _die(self, message = None):
if message: print message
print """
Options:
-h/--help : this message
-s/--spec <spec.xml> : file containing amqp XML spec
-p/--phase : test phase to run
-b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to
"""
sys.exit(1)
def default(value, default):
    """Return `value`, or `default` when `value` is None.

    Only None triggers the fallback; other falsy values such as 0 or ""
    are returned unchanged.
    """
    # Identity test: `== None` can be fooled by objects with a custom __eq__.
    if value is None:
        return default
    return value
# Script entry point: run the selected phase and exit with status 1 when it fails.
if __name__ == "__main__":
    succeeded = PersistenceTest().run()
    if not succeeded:
        sys.exit(1)