| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| # setup, usage, teardown, errors(sync), errors(async), stress, soak, |
| # boundary-conditions, config |
| |
| import errno, os, socket, sys, time |
| from qpid import compat |
| from qpid.compat import set |
| from qpid.messaging import * |
| from qpid.messaging.transports import TRANSPORTS |
| from qpid.tests.messaging import Base |
| from threading import Thread |
| |
| class SetupTests(Base): |
| |
| def testEstablish(self): |
| self.conn = Connection.establish(self.broker, **self.connection_options()) |
| self.ping(self.conn.session()) |
| |
| def testOpen(self): |
| self.conn = Connection(self.broker, **self.connection_options()) |
| self.conn.open() |
| self.ping(self.conn.session()) |
| |
| def testOpenReconnectURLs(self): |
| options = self.connection_options() |
| options["reconnect_urls"] = [self.broker, self.broker] |
| self.conn = Connection(self.broker, **options) |
| self.conn.open() |
| self.ping(self.conn.session()) |
| |
| def testTcpNodelay(self): |
| self.conn = Connection.establish(self.broker, tcp_nodelay=True) |
| assert self.conn._driver._transport.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) |
| |
| def testConnectError(self): |
| try: |
| # Specifying port 0 yields a bad address on Windows; port 4 is unassigned |
| self.conn = Connection.establish("localhost:4") |
| assert False, "connect succeeded" |
| except ConnectError, e: |
| assert "refused" in str(e) |
| |
| def testGetError(self): |
| self.conn = Connection("localhost:0") |
| try: |
| self.conn.open() |
| assert False, "connect succeeded" |
| except ConnectError, e: |
| assert self.conn.get_error() == e |
| |
| def use_fds(self): |
| fds = [] |
| try: |
| while True: |
| fds.append(os.open(getattr(os, "devnull", "/dev/null"), os.O_RDONLY)) |
| except OSError, e: |
| if e.errno != errno.EMFILE: |
| raise e |
| else: |
| return fds |
| |
| def testOpenCloseResourceLeaks(self): |
| fds = self.use_fds() |
| try: |
| for i in range(32): |
| if fds: os.close(fds.pop()) |
| for i in xrange(64): |
| conn = Connection.establish(self.broker, **self.connection_options()) |
| conn.close() |
| finally: |
| while fds: |
| os.close(fds.pop()) |
| |
| def testOpenFailResourceLeaks(self): |
| fds = self.use_fds() |
| try: |
| for i in range(32): |
| if fds: os.close(fds.pop()) |
| for i in xrange(64): |
| conn = Connection("localhost:0", **self.connection_options()) |
| # XXX: we need to force a waiter to be created for this test |
| # to work |
| conn._lock.acquire() |
| conn._wait(lambda: False, timeout=0.001) |
| conn._lock.release() |
| try: |
| conn.open() |
| except ConnectError, e: |
| pass |
| finally: |
| while fds: |
| os.close(fds.pop()) |
| |
| def testReconnect(self): |
| options = self.connection_options() |
| real = TRANSPORTS["tcp"] |
| |
| class flaky: |
| |
| def __init__(self, conn, host, port): |
| self.real = real(conn, host, port) |
| self.sent_count = 0 |
| self.recv_count = 0 |
| |
| def fileno(self): |
| return self.real.fileno() |
| |
| def reading(self, reading): |
| return self.real.reading(reading) |
| |
| def writing(self, writing): |
| return self.real.writing(writing) |
| |
| def send(self, bytes): |
| if self.sent_count > 2048: |
| raise socket.error("fake error") |
| n = self.real.send(bytes) |
| self.sent_count += n |
| return n |
| |
| def recv(self, n): |
| if self.recv_count > 2048: |
| return "" |
| bytes = self.real.recv(n) |
| self.recv_count += len(bytes) |
| return bytes |
| |
| def close(self): |
| self.real.close() |
| |
| TRANSPORTS["flaky"] = flaky |
| |
| options["reconnect"] = True |
| options["reconnect_interval"] = 0 |
| options["reconnect_limit"] = 100 |
| options["reconnect_log"] = False |
| options["transport"] = "flaky" |
| |
| self.conn = Connection.establish(self.broker, **options) |
| ssn = self.conn.session() |
| snd = ssn.sender("test-reconnect-queue; {create: always, delete: always}") |
| rcv = ssn.receiver(snd.target) |
| |
| msgs = [self.message("testReconnect", i) for i in range(20)] |
| for m in msgs: |
| snd.send(m) |
| |
| content = set() |
| drained = [] |
| duplicates = [] |
| try: |
| while True: |
| m = rcv.fetch(timeout=0) |
| if m.content not in content: |
| content.add(m.content) |
| drained.append(m) |
| else: |
| duplicates.append(m) |
| ssn.acknowledge(m) |
| except Empty: |
| pass |
| # XXX: apparently we don't always get duplicates, should figure out why |
| #assert duplicates, "no duplicates" |
| assert len(drained) == len(msgs) |
| for m, d in zip(msgs, drained): |
| # XXX: we should figure out how to provide proper end to end |
| # redelivered |
| self.assertEcho(m, d, d.redelivered) |
| |
| class ConnectionTests(Base): |
| |
| def setup_connection(self): |
| return Connection.establish(self.broker, **self.connection_options()) |
| |
| def testCheckClosed(self): |
| assert not self.conn.check_closed() |
| |
| def testSessionAnon(self): |
| ssn1 = self.conn.session() |
| ssn2 = self.conn.session() |
| self.ping(ssn1) |
| self.ping(ssn2) |
| assert ssn1 is not ssn2 |
| |
| def testSessionNamed(self): |
| ssn1 = self.conn.session("one") |
| ssn2 = self.conn.session("two") |
| self.ping(ssn1) |
| self.ping(ssn2) |
| assert ssn1 is not ssn2 |
| assert ssn1 is self.conn.session("one") |
| assert ssn2 is self.conn.session("two") |
| |
| def testDetach(self): |
| ssn = self.conn.session() |
| self.ping(ssn) |
| self.conn.detach() |
| try: |
| self.ping(ssn) |
| assert False, "ping succeeded" |
| except Detached: |
| # this is the expected failure when pinging on a detached |
| # connection |
| pass |
| self.conn.attach() |
| self.ping(ssn) |
| |
| def testClose(self): |
| self.conn.close() |
| assert not self.conn.attached() |
| |
| def testSimultaneousClose(self): |
| ssns = [self.conn.session() for i in range(3)] |
| for s in ssns: |
| for i in range(3): |
| s.receiver("amq.topic") |
| s.sender("amq.topic") |
| |
| def closer(errors): |
| try: |
| self.conn.close() |
| except: |
| _, e, _ = sys.exc_info() |
| errors.append(compat.format_exc(e)) |
| |
| t1_errors = [] |
| t2_errors = [] |
| t1 = Thread(target=lambda: closer(t1_errors)) |
| t2 = Thread(target=lambda: closer(t2_errors)) |
| t1.start() |
| t2.start() |
| t1.join(self.delay()) |
| t2.join(self.delay()) |
| |
| assert not t1_errors, t1_errors[0] |
| assert not t2_errors, t2_errors[0] |
| |
| class hangable: |
| |
| def __init__(self, conn, host, port): |
| self.tcp = TRANSPORTS["tcp"](conn, host, port) |
| self.hung = False |
| |
| def hang(self): |
| self.hung = True |
| |
| def fileno(self): |
| return self.tcp.fileno() |
| |
| def reading(self, reading): |
| if self.hung: |
| return True |
| else: |
| return self.tcp.reading(reading) |
| |
| def writing(self, writing): |
| if self.hung: |
| return False |
| else: |
| return self.tcp.writing(writing) |
| |
| def send(self, bytes): |
| if self.hung: |
| return 0 |
| else: |
| return self.tcp.send(bytes) |
| |
| def recv(self, n): |
| if self.hung: |
| return "" |
| else: |
| return self.tcp.recv(n) |
| |
| def close(self): |
| self.tcp.close() |
| |
| TRANSPORTS["hangable"] = hangable |
| |
| class TimeoutTests(Base): |
| |
| def setup_connection(self): |
| options = self.connection_options() |
| options["transport"] = "hangable" |
| return Connection.establish(self.broker, **options) |
| |
| def setup_session(self): |
| return self.conn.session() |
| |
| def setup_sender(self): |
| return self.ssn.sender("amq.topic") |
| |
| def setup_receiver(self): |
| return self.ssn.receiver("amq.topic; {link: {reliability: unreliable}}") |
| |
| def teardown_connection(self, conn): |
| try: |
| conn.detach(timeout=0) |
| except Timeout: |
| pass |
| |
| def hang(self): |
| self.conn._driver._transport.hang() |
| |
| def timeoutTest(self, method): |
| self.hang() |
| try: |
| method(timeout=self.delay()) |
| assert False, "did not time out" |
| except Timeout: |
| pass |
| |
| def testSenderSync(self): |
| self.snd.send(self.content("testSenderSync"), sync=False) |
| self.timeoutTest(self.snd.sync) |
| |
| def testSenderClose(self): |
| self.snd.send(self.content("testSenderClose"), sync=False) |
| self.timeoutTest(self.snd.close) |
| |
| def testReceiverClose(self): |
| self.timeoutTest(self.rcv.close) |
| |
| def testSessionSync(self): |
| self.snd.send(self.content("testSessionSync"), sync=False) |
| self.timeoutTest(self.ssn.sync) |
| |
| def testSessionClose(self): |
| self.timeoutTest(self.ssn.close) |
| |
| def testConnectionDetach(self): |
| self.timeoutTest(self.conn.detach) |
| |
| def testConnectionClose(self): |
| self.timeoutTest(self.conn.close) |
| |
| def testConnectionOpen(self): |
| options = self.connection_options() |
| options["reconnect"] = True |
| options["reconnect_timeout"] = self.delay() |
| try: |
| bad_conn = Connection.establish("badhostname", **options) |
| assert False, "did not time out" |
| except Timeout: |
| pass |
| |
| ACK_QC = 'test-ack-queue; {create: always}' |
| ACK_QD = 'test-ack-queue; {delete: always}' |
| |
| class SessionTests(Base): |
| |
| def setup_connection(self): |
| return Connection.establish(self.broker, **self.connection_options()) |
| |
| def setup_session(self): |
| return self.conn.session() |
| |
| def testSender(self): |
| snd = self.ssn.sender('test-snd-queue; {create: sender, delete: receiver}', |
| durable=self.durable()) |
| snd2 = self.ssn.sender(snd.target, durable=self.durable()) |
| assert snd is not snd2 |
| snd2.close() |
| |
| content = self.content("testSender") |
| snd.send(content) |
| rcv = self.ssn.receiver(snd.target) |
| msg = rcv.fetch(0) |
| assert msg.content == content |
| self.ssn.acknowledge(msg) |
| |
| def testReceiver(self): |
| rcv = self.ssn.receiver('test-rcv-queue; {create: always}') |
| rcv2 = self.ssn.receiver(rcv.source) |
| assert rcv is not rcv2 |
| rcv2.close() |
| |
| content = self.content("testReceiver") |
| snd = self.ssn.sender(rcv.source, durable=self.durable()) |
| snd.send(content) |
| msg = rcv.fetch(0) |
| assert msg.content == content |
| self.ssn.acknowledge(msg) |
| snd2 = self.ssn.receiver('test-rcv-queue; {delete: always}') |
| |
| def testDetachedReceiver(self): |
| self.conn.detach() |
| rcv = self.ssn.receiver("test-dis-rcv-queue; {create: always, delete: always}") |
| m = self.content("testDetachedReceiver") |
| self.conn.attach() |
| snd = self.ssn.sender("test-dis-rcv-queue") |
| snd.send(m) |
| self.drain(rcv, expected=[m]) |
| |
| def testNextReceiver(self): |
| ADDR = 'test-next-rcv-queue; {create: always, delete: always}' |
| rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) |
| rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) |
| rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) |
| |
| snd = self.ssn.sender(ADDR) |
| |
| msgs = [] |
| for i in range(10): |
| content = self.content("testNextReceiver", i) |
| snd.send(content) |
| msgs.append(content) |
| |
| fetched = [] |
| try: |
| while True: |
| rcv = self.ssn.next_receiver(timeout=self.delay()) |
| assert rcv in (rcv1, rcv2, rcv3) |
| assert rcv.available() > 0 |
| fetched.append(rcv.fetch().content) |
| except Empty: |
| pass |
| assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched) |
| self.ssn.acknowledge() |
| #we set the capacity to 0 to prevent the deletion of the queue - |
| #triggered the deletion policy when the first receiver is closed - |
| #resulting in session exceptions being issued for the remaining |
| #active subscriptions: |
| for r in [rcv1, rcv2, rcv3]: |
| r.capacity = 0 |
| |
| # XXX, we need a convenient way to assert that required queues are |
| # empty on setup, and possibly also to drain queues on teardown |
| def ackTest(self, acker, ack_capacity=None): |
| # send a bunch of messages |
| snd = self.ssn.sender(ACK_QC, durable=self.durable()) |
| contents = [self.content("ackTest", i) for i in range(15)] |
| for c in contents: |
| snd.send(c) |
| |
| # drain the queue, verify the messages are there and then close |
| # without acking |
| rcv = self.ssn.receiver(ACK_QC) |
| self.drain(rcv, expected=contents) |
| self.ssn.close() |
| |
| # drain the queue again, verify that they are all the messages |
| # were requeued, and ack this time before closing |
| self.ssn = self.conn.session() |
| if ack_capacity is not None: |
| self.ssn.ack_capacity = ack_capacity |
| rcv = self.ssn.receiver(ACK_QC) |
| self.drain(rcv, expected=contents) |
| acker(self.ssn) |
| self.ssn.close() |
| |
| # drain the queue a final time and verify that the messages were |
| # dequeued |
| self.ssn = self.conn.session() |
| rcv = self.ssn.receiver(ACK_QD) |
| self.assertEmpty(rcv) |
| |
| def testAcknowledge(self): |
| self.ackTest(lambda ssn: ssn.acknowledge()) |
| |
| def testAcknowledgeAsync(self): |
| self.ackTest(lambda ssn: ssn.acknowledge(sync=False)) |
| |
| def testAcknowledgeAsyncAckCap0(self): |
| try: |
| try: |
| self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0) |
| assert False, "acknowledge shouldn't succeed with ack_capacity of zero" |
| except InsufficientCapacity: |
| pass |
| finally: |
| self.ssn.ack_capacity = UNLIMITED |
| self.drain(self.ssn.receiver(ACK_QD)) |
| self.ssn.acknowledge() |
| |
| def testAcknowledgeAsyncAckCap1(self): |
| self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1) |
| |
| def testAcknowledgeAsyncAckCap5(self): |
| self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5) |
| |
| def testAcknowledgeAsyncAckCapUNLIMITED(self): |
| self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED) |
| |
| def testRelease(self): |
| msgs = [self.message("testRelease", i) for i in range(3)] |
| snd = self.ssn.sender("test-release-queue; {create: always, delete: always}") |
| for m in msgs: |
| snd.send(m) |
| rcv = self.ssn.receiver(snd.target) |
| echos = self.drain(rcv, expected=msgs) |
| self.ssn.acknowledge(echos[0]) |
| self.ssn.acknowledge(echos[1], Disposition(RELEASED, set_redelivered=True)) |
| self.ssn.acknowledge(echos[2], Disposition(RELEASED)) |
| self.drain(rcv, limit=1, expected=msgs[1:2], redelivered=True) |
| self.drain(rcv, expected=msgs[2:3]) |
| self.ssn.acknowledge() |
| |
| def testReject(self): |
| msgs = [self.message("testReject", i) for i in range(3)] |
| snd = self.ssn.sender(""" |
| test-reject-queue; { |
| create: always, |
| delete: always, |
| node: { |
| x-declare: { |
| alternate-exchange: 'amq.topic' |
| } |
| } |
| } |
| """) |
| for m in msgs: |
| snd.send(m) |
| rcv = self.ssn.receiver(snd.target) |
| rej = self.ssn.receiver("amq.topic") |
| echos = self.drain(rcv, expected=msgs) |
| self.ssn.acknowledge(echos[0]) |
| self.ssn.acknowledge(echos[1], Disposition(REJECTED)) |
| self.ssn.acknowledge(echos[2], |
| Disposition(REJECTED, code=0, text="test-reject")) |
| self.drain(rej, expected=msgs[1:]) |
| self.ssn.acknowledge() |
| |
| def send(self, ssn, target, base, count=1): |
| snd = ssn.sender(target, durable=self.durable()) |
| messages = [] |
| for i in range(count): |
| c = self.message(base, i) |
| snd.send(c) |
| messages.append(c) |
| snd.close() |
| return messages |
| |
| def txTest(self, commit): |
| TX_Q = 'test-tx-queue; {create: sender, delete: receiver}' |
| TX_Q_COPY = 'test-tx-queue-copy; {create: always, delete: always}' |
| txssn = self.conn.session(transactional=True) |
| messages = self.send(self.ssn, TX_Q, "txTest", 3) |
| txrcv = txssn.receiver(TX_Q) |
| txsnd = txssn.sender(TX_Q_COPY, durable=self.durable()) |
| rcv = self.ssn.receiver(txrcv.source) |
| copy_rcv = self.ssn.receiver(txsnd.target) |
| self.assertEmpty(copy_rcv) |
| for i in range(3): |
| m = txrcv.fetch(0) |
| txsnd.send(m) |
| self.assertEmpty(copy_rcv) |
| txssn.acknowledge() |
| if commit: |
| txssn.commit() |
| self.assertEmpty(rcv) |
| self.drain(copy_rcv, expected=messages) |
| else: |
| txssn.rollback() |
| self.drain(rcv, expected=messages, redelivered=True) |
| self.assertEmpty(copy_rcv) |
| self.ssn.acknowledge() |
| |
| def testCommit(self): |
| self.txTest(True) |
| |
| def testRollback(self): |
| self.txTest(False) |
| |
| def txTestSend(self, commit): |
| TX_SEND_Q = 'test-tx-send-queue; {create: sender, delete: receiver}' |
| txssn = self.conn.session(transactional=True) |
| messages = self.send(txssn, TX_SEND_Q, "txTestSend", 3) |
| rcv = self.ssn.receiver(TX_SEND_Q) |
| self.assertEmpty(rcv) |
| |
| if commit: |
| txssn.commit() |
| self.drain(rcv, expected=messages) |
| self.ssn.acknowledge() |
| else: |
| txssn.rollback() |
| self.assertEmpty(rcv) |
| txssn.commit() |
| self.assertEmpty(rcv) |
| |
| def testCommitSend(self): |
| self.txTestSend(True) |
| |
| def testRollbackSend(self): |
| self.txTestSend(False) |
| |
| def txTestAck(self, commit): |
| TX_ACK_QC = 'test-tx-ack-queue; {create: always}' |
| TX_ACK_QD = 'test-tx-ack-queue; {delete: always}' |
| txssn = self.conn.session(transactional=True) |
| txrcv = txssn.receiver(TX_ACK_QC) |
| self.assertEmpty(txrcv) |
| messages = self.send(self.ssn, TX_ACK_QC, "txTestAck", 3) |
| self.drain(txrcv, expected=messages) |
| |
| if commit: |
| txssn.acknowledge() |
| else: |
| txssn.rollback() |
| self.drain(txrcv, expected=messages, redelivered=True) |
| txssn.acknowledge() |
| txssn.rollback() |
| self.drain(txrcv, expected=messages, redelivered=True) |
| txssn.commit() # commit without ack |
| self.assertEmpty(txrcv) |
| |
| txssn.close() |
| |
| txssn = self.conn.session(transactional=True) |
| txrcv = txssn.receiver(TX_ACK_QC) |
| self.drain(txrcv, expected=messages, redelivered=True) |
| txssn.acknowledge() |
| txssn.commit() |
| rcv = self.ssn.receiver(TX_ACK_QD) |
| self.assertEmpty(rcv) |
| txssn.close() |
| self.assertEmpty(rcv) |
| |
| def testCommitAck(self): |
| self.txTestAck(True) |
| |
| def testRollbackAck(self): |
| self.txTestAck(False) |
| |
| def testDoubleCommit(self): |
| ssn = self.conn.session(transactional=True) |
| snd = ssn.sender("amq.direct/doubleCommit") |
| rcv = ssn.receiver("amq.direct/doubleCommit") |
| msgs = [self.message("testDoubleCommit", i, subject="doubleCommit") for i in range(3)] |
| for m in msgs: |
| snd.send(m) |
| ssn.commit() |
| self.drain(rcv, expected=msgs) |
| ssn.acknowledge() |
| ssn.commit() |
| |
| def testClose(self): |
| self.ssn.close() |
| try: |
| self.ping(self.ssn) |
| assert False, "ping succeeded" |
| except Detached: |
| pass |
| |
| def testRxCallback(self): |
| """Verify that the callback is invoked when a message is received. |
| """ |
| ADDR = 'test-rx_callback-queue; {create: always, delete: receiver}' |
| class CallbackHandler: |
| def __init__(self): |
| self.handler_called = False |
| def __call__(self): |
| self.handler_called = True |
| cb = CallbackHandler() |
| self.ssn.set_message_received_handler(cb) |
| rcv = self.ssn.receiver(ADDR) |
| rcv.capacity = UNLIMITED |
| snd = self.ssn.sender(ADDR) |
| assert not cb.handler_called |
| snd.send("Ping") |
| deadline = time.time() + self.timeout() |
| while time.time() < deadline: |
| if cb.handler_called: |
| break; |
| assert cb.handler_called |
| snd.close() |
| rcv.close() |
| |
| |
| RECEIVER_Q = 'test-receiver-queue; {create: always, delete: always}' |
| |
| class ReceiverTests(Base): |
| |
| def setup_connection(self): |
| return Connection.establish(self.broker, **self.connection_options()) |
| |
| def setup_session(self): |
| return self.conn.session() |
| |
| def setup_sender(self): |
| return self.ssn.sender(RECEIVER_Q) |
| |
| def setup_receiver(self): |
| return self.ssn.receiver(RECEIVER_Q) |
| |
| def send(self, base, count = None, sync=True): |
| content = self.content(base, count) |
| self.snd.send(content, sync=sync) |
| return content |
| |
| def testFetch(self): |
| try: |
| msg = self.rcv.fetch(0) |
| assert False, "unexpected message: %s" % msg |
| except Empty: |
| pass |
| try: |
| start = time.time() |
| msg = self.rcv.fetch(self.delay()) |
| assert False, "unexpected message: %s" % msg |
| except Empty: |
| elapsed = time.time() - start |
| assert elapsed >= self.delay() |
| |
| one = self.send("testFetch", 1) |
| two = self.send("testFetch", 2) |
| three = self.send("testFetch", 3) |
| msg = self.rcv.fetch(0) |
| assert msg.content == one |
| msg = self.rcv.fetch(self.delay()) |
| assert msg.content == two |
| msg = self.rcv.fetch() |
| assert msg.content == three |
| self.ssn.acknowledge() |
| |
| def fetchFromClosedTest(self, entry): |
| entry.close() |
| try: |
| msg = self.rcv.fetch(0) |
| assert False, "unexpected result: %s" % msg |
| except Empty, e: |
| assert False, "unexpected exception: %s" % e |
| except LinkClosed, e: |
| pass |
| |
| def testFetchFromClosedReceiver(self): |
| self.fetchFromClosedTest(self.rcv) |
| |
| def testFetchFromClosedSession(self): |
| self.fetchFromClosedTest(self.ssn) |
| |
| def testFetchFromClosedConnection(self): |
| self.fetchFromClosedTest(self.conn) |
| |
| def fetchFromConcurrentCloseTest(self, entry): |
| def closer(): |
| self.sleep() |
| entry.close() |
| t = Thread(target=closer) |
| t.start() |
| try: |
| msg = self.rcv.fetch() |
| assert False, "unexpected result: %s" % msg |
| except Empty, e: |
| assert False, "unexpected exception: %s" % e |
| except LinkClosed, e: |
| pass |
| t.join() |
| |
| def testFetchFromConcurrentCloseReceiver(self): |
| self.fetchFromConcurrentCloseTest(self.rcv) |
| |
| def testFetchFromConcurrentCloseSession(self): |
| self.fetchFromConcurrentCloseTest(self.ssn) |
| |
| def testFetchFromConcurrentCloseConnection(self): |
| self.fetchFromConcurrentCloseTest(self.conn) |
| |
| def testCapacityIncrease(self): |
| content = self.send("testCapacityIncrease") |
| self.sleep() |
| assert self.rcv.available() == 0 |
| self.rcv.capacity = UNLIMITED |
| self.sleep() |
| assert self.rcv.available() == 1 |
| msg = self.rcv.fetch(0) |
| assert msg.content == content |
| assert self.rcv.available() == 0 |
| self.ssn.acknowledge() |
| |
| def testCapacityDecrease(self): |
| self.rcv.capacity = UNLIMITED |
| one = self.send("testCapacityDecrease", 1) |
| self.sleep() |
| assert self.rcv.available() == 1 |
| msg = self.rcv.fetch(0) |
| assert msg.content == one |
| |
| self.rcv.capacity = 0 |
| |
| two = self.send("testCapacityDecrease", 2) |
| self.sleep() |
| assert self.rcv.available() == 0 |
| msg = self.rcv.fetch(0) |
| assert msg.content == two |
| |
| self.ssn.acknowledge() |
| |
| def capacityTest(self, capacity, threshold=None): |
| if threshold is not None: |
| self.rcv.threshold = threshold |
| self.rcv.capacity = capacity |
| self.assertAvailable(self.rcv, 0) |
| |
| for i in range(2*capacity): |
| self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False) |
| self.snd.sync() |
| self.sleep() |
| self.assertAvailable(self.rcv) |
| |
| first = capacity/2 |
| second = capacity - first |
| self.drain(self.rcv, limit = first) |
| self.sleep() |
| self.assertAvailable(self.rcv) |
| self.drain(self.rcv, limit = second) |
| self.sleep() |
| self.assertAvailable(self.rcv) |
| |
| drained = self.drain(self.rcv) |
| assert len(drained) == capacity, "%s, %s" % (len(drained), drained) |
| self.assertAvailable(self.rcv, 0) |
| |
| self.ssn.acknowledge() |
| |
| def testCapacity5(self): |
| self.capacityTest(5) |
| |
| def testCapacity5Threshold1(self): |
| self.capacityTest(5, 1) |
| |
| def testCapacity10(self): |
| self.capacityTest(10) |
| |
| def testCapacity10Threshold1(self): |
| self.capacityTest(10, 1) |
| |
| def testCapacity100(self): |
| self.capacityTest(100) |
| |
| def testCapacity100Threshold1(self): |
| self.capacityTest(100, 1) |
| |
| def testCapacityUNLIMITED(self): |
| self.rcv.capacity = UNLIMITED |
| self.assertAvailable(self.rcv, 0) |
| |
| for i in range(10): |
| self.send("testCapacityUNLIMITED", i) |
| self.sleep() |
| self.assertAvailable(self.rcv, 10) |
| |
| self.drain(self.rcv) |
| self.assertAvailable(self.rcv, 0) |
| |
| self.ssn.acknowledge() |
| |
| def testAvailable(self): |
| self.rcv.capacity = UNLIMITED |
| assert self.rcv.available() == 0 |
| |
| for i in range(3): |
| self.send("testAvailable", i) |
| self.sleep() |
| assert self.rcv.available() == 3 |
| |
| for i in range(3, 10): |
| self.send("testAvailable", i) |
| self.sleep() |
| assert self.rcv.available() == 10 |
| |
| self.drain(self.rcv, limit=3) |
| assert self.rcv.available() == 7 |
| |
| self.drain(self.rcv) |
| assert self.rcv.available() == 0 |
| |
| self.ssn.acknowledge() |
| |
| def testDoubleClose(self): |
| m1 = self.content("testDoubleClose", 1) |
| m2 = self.content("testDoubleClose", 2) |
| |
| snd = self.ssn.sender("""test-double-close; { |
| create: always, |
| delete: sender, |
| node: { |
| type: topic |
| } |
| } |
| """) |
| r1 = self.ssn.receiver(snd.target) |
| r2 = self.ssn.receiver(snd.target) |
| snd.send(m1) |
| self.drain(r1, expected=[m1]) |
| self.drain(r2, expected=[m1]) |
| r1.close() |
| snd.send(m2) |
| self.drain(r2, expected=[m2]) |
| r2.close() |
| |
| # XXX: need testClose |
| |
| def testMode(self): |
| msgs = [self.content("testMode", 1), |
| self.content("testMode", 2), |
| self.content("testMode", 3)] |
| |
| for m in msgs: |
| self.snd.send(m) |
| |
| rb = self.ssn.receiver('test-receiver-queue; {mode: browse}') |
| rc = self.ssn.receiver('test-receiver-queue; {mode: consume}') |
| self.drain(rb, expected=msgs) |
| self.drain(rc, expected=msgs) |
| rb2 = self.ssn.receiver(rb.source) |
| self.assertEmpty(rb2) |
| self.drain(self.rcv, expected=[]) |
| |
| def testUnsettled(self): |
| # just tests the code path and not the value |
| rcv = self.ssn.receiver('test-receiver-unsettled-queue; {create: always, delete: always}') |
| rcv.unsettled() |
| |
| def unreliabilityTest(self, mode="unreliable"): |
| msgs = [self.message("testUnreliable", i) for i in range(3)] |
| snd = self.ssn.sender("test-unreliability-queue; {create: sender, delete: receiver}") |
| rcv = self.ssn.receiver(snd.target) |
| for m in msgs: |
| snd.send(m) |
| |
| # close without ack on reliable receiver, messages should be requeued |
| ssn = self.conn.session() |
| rrcv = ssn.receiver("test-unreliability-queue") |
| self.drain(rrcv, expected=msgs) |
| ssn.close() |
| |
| # close without ack on unreliable receiver, messages should not be requeued |
| ssn = self.conn.session() |
| urcv = ssn.receiver("test-unreliability-queue; {link: {reliability: %s}}" % mode) |
| self.drain(urcv, expected=msgs, redelivered=True) |
| ssn.close() |
| |
| self.assertEmpty(rcv) |
| |
| def testUnreliable(self): |
| self.unreliabilityTest(mode="unreliable") |
| |
| def testAtMostOnce(self): |
| self.unreliabilityTest(mode="at-most-once") |
| |
| class AddressTests(Base): |
| |
| def setup_connection(self): |
| return Connection.establish(self.broker, **self.connection_options()) |
| |
| def setup_session(self): |
| return self.conn.session() |
| |
| def badOption(self, options, error): |
| try: |
| self.ssn.sender("test-bad-options-snd; %s" % options) |
| assert False |
| except InvalidOption, e: |
| assert "error in options: %s" % error == str(e), e |
| |
| try: |
| self.ssn.receiver("test-bad-options-rcv; %s" % options) |
| assert False |
| except InvalidOption, e: |
| assert "error in options: %s" % error == str(e), e |
| |
| def testIllegalKey(self): |
| self.badOption("{create: always, node: " |
| "{this-property-does-not-exist: 3}}", |
| "node: this-property-does-not-exist: " |
| "illegal key") |
| |
| def testWrongValue(self): |
| self.badOption("{create: asdf}", "create: asdf not in " |
| "('always', 'sender', 'receiver', 'never')") |
| |
| def testWrongType1(self): |
| self.badOption("{node: asdf}", |
| "node: asdf is not a map") |
| |
| def testWrongType2(self): |
| self.badOption("{node: {durable: []}}", |
| "node: durable: [] is not a bool") |
| |
| def testCreateQueue(self): |
| snd = self.ssn.sender("test-create-queue; {create: always, delete: always, " |
| "node: {type: queue, durable: False, " |
| "x-declare: {auto_delete: true}}}") |
| content = self.content("testCreateQueue") |
| snd.send(content) |
| rcv = self.ssn.receiver("test-create-queue") |
| self.drain(rcv, expected=[content]) |
| |
| def createExchangeTest(self, props=""): |
| addr = """test-create-exchange; { |
| create: always, |
| delete: always, |
| node: { |
| type: topic, |
| durable: False, |
| x-declare: {auto_delete: true, %s} |
| } |
| }""" % props |
| snd = self.ssn.sender(addr) |
| snd.send("ping") |
| rcv1 = self.ssn.receiver("test-create-exchange/first") |
| rcv2 = self.ssn.receiver("test-create-exchange/first") |
| rcv3 = self.ssn.receiver("test-create-exchange/second") |
| for r in (rcv1, rcv2, rcv3): |
| try: |
| r.fetch(0) |
| assert False |
| except Empty: |
| pass |
| msg1 = Message(self.content("testCreateExchange", 1), subject="first") |
| msg2 = Message(self.content("testCreateExchange", 2), subject="second") |
| snd.send(msg1) |
| snd.send(msg2) |
| self.drain(rcv1, expected=[msg1.content]) |
| self.drain(rcv2, expected=[msg1.content]) |
| self.drain(rcv3, expected=[msg2.content]) |
| |
| def testCreateExchange(self): |
| self.createExchangeTest() |
| |
| def testCreateExchangeDirect(self): |
| self.createExchangeTest("type: direct") |
| |
| def testCreateExchangeTopic(self): |
| self.createExchangeTest("type: topic") |
| |
| def testDeleteBySender(self): |
| snd = self.ssn.sender("test-delete; {create: always}") |
| snd.send("ping") |
| snd.close() |
| snd = self.ssn.sender("test-delete; {delete: always}") |
| snd.send("ping") |
| snd.close() |
| try: |
| self.ssn.sender("test-delete") |
| except NotFound, e: |
| assert "no such queue" in str(e) |
| |
| def testDeleteByReceiver(self): |
| rcv = self.ssn.receiver("test-delete; {create: always, delete: always}") |
| try: |
| rcv.fetch(0) |
| except Empty: |
| pass |
| rcv.close() |
| |
| try: |
| self.ssn.receiver("test-delete") |
| assert False |
| except NotFound, e: |
| assert "no such queue" in str(e) |
| |
| def testDeleteSpecial(self): |
| snd = self.ssn.sender("amq.topic; {delete: always}") |
| snd.send("asdf") |
| try: |
| snd.close() |
| assert False, "successfully deleted amq.topic" |
| except SessionError, e: |
| assert e.code == 530 |
| # XXX: need to figure out close after error |
| self.conn._remove_session(self.ssn) |
| |
| def testNodeBindingsQueue(self): |
| snd = self.ssn.sender(""" |
| test-node-bindings-queue; { |
| create: always, |
| delete: always, |
| node: { |
| x-bindings: [{exchange: "amq.topic", key: "a.#"}, |
| {exchange: "amq.direct", key: "b"}, |
| {exchange: "amq.topic", key: "c.*"}] |
| } |
| } |
| """) |
| snd.send("one") |
| snd_a = self.ssn.sender("amq.topic/a.foo") |
| snd_b = self.ssn.sender("amq.direct/b") |
| snd_c = self.ssn.sender("amq.topic/c.bar") |
| snd_a.send("two") |
| snd_b.send("three") |
| snd_c.send("four") |
| rcv = self.ssn.receiver("test-node-bindings-queue") |
| self.drain(rcv, expected=["one", "two", "three", "four"]) |
| |
| def testNodeBindingsTopic(self): |
| rcv = self.ssn.receiver("test-node-bindings-topic-queue; {create: always, delete: always}") |
| rcv_a = self.ssn.receiver("test-node-bindings-topic-queue-a; {create: always, delete: always}") |
| rcv_b = self.ssn.receiver("test-node-bindings-topic-queue-b; {create: always, delete: always}") |
| rcv_c = self.ssn.receiver("test-node-bindings-topic-queue-c; {create: always, delete: always}") |
| snd = self.ssn.sender(""" |
| test-node-bindings-topic; { |
| create: always, |
| delete: always, |
| node: { |
| type: topic, |
| x-bindings: [{queue: test-node-bindings-topic-queue, key: "#"}, |
| {queue: test-node-bindings-topic-queue-a, key: "a.#"}, |
| {queue: test-node-bindings-topic-queue-b, key: "b"}, |
| {queue: test-node-bindings-topic-queue-c, key: "c.*"}] |
| } |
| } |
| """) |
| m1 = Message("one") |
| m2 = Message(subject="a.foo", content="two") |
| m3 = Message(subject="b", content="three") |
| m4 = Message(subject="c.bar", content="four") |
| snd.send(m1) |
| snd.send(m2) |
| snd.send(m3) |
| snd.send(m4) |
| self.drain(rcv, expected=[m1, m2, m3, m4]) |
| self.drain(rcv_a, expected=[m2]) |
| self.drain(rcv_b, expected=[m3]) |
| self.drain(rcv_c, expected=[m4]) |
| |
| def testLinkBindings(self): |
| m_a = self.message("testLinkBindings", 1, subject="a") |
| m_b = self.message("testLinkBindings", 2, subject="b") |
| |
| self.ssn.sender("test-link-bindings-queue; {create: always, delete: always}") |
| snd = self.ssn.sender("amq.topic") |
| |
| snd.send(m_a) |
| snd.send(m_b) |
| snd.close() |
| |
| rcv = self.ssn.receiver("test-link-bindings-queue") |
| self.assertEmpty(rcv) |
| |
| snd = self.ssn.sender(""" |
| amq.topic; { |
| link: { |
| x-bindings: [{queue: test-link-bindings-queue, key: a}] |
| } |
| } |
| """) |
| |
| snd.send(m_a) |
| snd.send(m_b) |
| |
| self.drain(rcv, expected=[m_a]) |
| rcv.close() |
| |
| rcv = self.ssn.receiver(""" |
| test-link-bindings-queue; { |
| link: { |
| x-bindings: [{exchange: "amq.topic", key: b}] |
| } |
| } |
| """) |
| |
| snd.send(m_a) |
| snd.send(m_b) |
| |
| self.drain(rcv, expected=[m_a, m_b]) |
| |
| def testSubjectOverride(self): |
| snd = self.ssn.sender("amq.topic/a") |
| rcv_a = self.ssn.receiver("amq.topic/a") |
| rcv_b = self.ssn.receiver("amq.topic/b") |
| m1 = self.content("testSubjectOverride", 1) |
| m2 = self.content("testSubjectOverride", 2) |
| snd.send(m1) |
| snd.send(Message(subject="b", content=m2)) |
| self.drain(rcv_a, expected=[m1]) |
| self.drain(rcv_b, expected=[m2]) |
| |
| def testSubjectDefault(self): |
| m1 = self.content("testSubjectDefault", 1) |
| m2 = self.content("testSubjectDefault", 2) |
| snd = self.ssn.sender("amq.topic/a") |
| rcv = self.ssn.receiver("amq.topic") |
| snd.send(m1) |
| snd.send(Message(subject="b", content=m2)) |
| e1 = rcv.fetch(timeout=0) |
| e2 = rcv.fetch(timeout=0) |
| assert e1.subject == "a", "subject: %s" % e1.subject |
| assert e2.subject == "b", "subject: %s" % e2.subject |
| self.assertEmpty(rcv) |
| |
| def doReliabilityTest(self, reliability, messages, expected): |
| snd = self.ssn.sender("amq.topic") |
| rcv = self.ssn.receiver("amq.topic; {link: {reliability: %s}}" % reliability) |
| for m in messages: |
| snd.send(m) |
| self.conn.detach() |
| self.conn.attach() |
| self.drain(rcv, expected=expected) |
| |
| def testReliabilityUnreliable(self): |
| msgs = [self.message("testReliabilityUnreliable", i) for i in range(3)] |
| self.doReliabilityTest("unreliable", msgs, []) |
| |
| def testReliabilityAtLeastOnce(self): |
| msgs = [self.message("testReliabilityAtLeastOnce", i) for i in range(3)] |
| self.doReliabilityTest("at-least-once", msgs, msgs) |
| |
| def testLinkName(self): |
| msgs = [self.message("testLinkName", i) for i in range(3)] |
| snd = self.ssn.sender("amq.topic") |
| trcv = self.ssn.receiver("amq.topic; {link: {name: test-link-name}}") |
| qrcv = self.ssn.receiver("test-link-name") |
| for m in msgs: |
| snd.send(m) |
| self.drain(qrcv, expected=msgs) |
| |
| def testAssert1(self): |
| try: |
| snd = self.ssn.sender("amq.topic; {assert: always, node: {type: queue}}") |
| assert 0, "assertion failed to trigger" |
| except AssertionFailed, e: |
| pass |
| |
| def testAssert2(self): |
| snd = self.ssn.sender("amq.topic; {assert: always}") |
| |
| NOSUCH_Q = "this-queue-should-not-exist" |
| UNPARSEABLE_ADDR = "name/subject; {bad options" |
| UNLEXABLE_ADDR = "\0x0\0x1\0x2\0x3" |
| |
| class AddressErrorTests(Base): |
| |
| def setup_connection(self): |
| return Connection.establish(self.broker, **self.connection_options()) |
| |
| def setup_session(self): |
| return self.conn.session() |
| |
| def senderErrorTest(self, addr, exc, check=lambda e: True): |
| try: |
| self.ssn.sender(addr, durable=self.durable()) |
| assert False, "sender creation succeeded" |
| except exc, e: |
| assert check(e), "unexpected error: %s" % compat.format_exc(e) |
| |
| def receiverErrorTest(self, addr, exc, check=lambda e: True): |
| try: |
| self.ssn.receiver(addr) |
| assert False, "receiver creation succeeded" |
| except exc, e: |
| assert check(e), "unexpected error: %s" % compat.format_exc(e) |
| |
| def testNoneTarget(self): |
| self.senderErrorTest(None, MalformedAddress) |
| |
| def testNoneSource(self): |
| self.receiverErrorTest(None, MalformedAddress) |
| |
| def testNoTarget(self): |
| self.senderErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e)) |
| |
| def testNoSource(self): |
| self.receiverErrorTest(NOSUCH_Q, NotFound, lambda e: NOSUCH_Q in str(e)) |
| |
| def testUnparseableTarget(self): |
| self.senderErrorTest(UNPARSEABLE_ADDR, MalformedAddress, |
| lambda e: "expecting COLON" in str(e)) |
| |
| def testUnparseableSource(self): |
| self.receiverErrorTest(UNPARSEABLE_ADDR, MalformedAddress, |
| lambda e: "expecting COLON" in str(e)) |
| |
| def testUnlexableTarget(self): |
| self.senderErrorTest(UNLEXABLE_ADDR, MalformedAddress, |
| lambda e: "unrecognized characters" in str(e)) |
| |
| def testUnlexableSource(self): |
| self.receiverErrorTest(UNLEXABLE_ADDR, MalformedAddress, |
| lambda e: "unrecognized characters" in str(e)) |
| |
| def testInvalidMode(self): |
| self.receiverErrorTest('name; {mode: "this-is-a-bad-receiver-mode"}', |
| InvalidOption, |
| lambda e: "not in ('browse', 'consume')" in str(e)) |
| |
| SENDER_Q = 'test-sender-q; {create: always, delete: always}' |
| |
| class SenderTests(Base): |
| |
| def setup_connection(self): |
| return Connection.establish(self.broker, **self.connection_options()) |
| |
| def setup_session(self): |
| return self.conn.session() |
| |
| def setup_sender(self): |
| return self.ssn.sender(SENDER_Q) |
| |
| def setup_receiver(self): |
| return self.ssn.receiver(SENDER_Q) |
| |
| def checkContent(self, content): |
| self.snd.send(content) |
| msg = self.rcv.fetch(0) |
| assert msg.content == content |
| |
| out = Message(content) |
| self.snd.send(out) |
| echo = self.rcv.fetch(0) |
| assert out.content == echo.content |
| assert echo.content == msg.content |
| self.ssn.acknowledge() |
| |
| def testSendString(self): |
| self.checkContent(self.content("testSendString")) |
| |
| def testSendList(self): |
| self.checkContent(["testSendList", 1, 3.14, self.test_id]) |
| |
| def testSendMap(self): |
| self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14}) |
| |
| def asyncTest(self, capacity): |
| self.snd.capacity = capacity |
| msgs = [self.content("asyncTest", i) for i in range(15)] |
| for m in msgs: |
| self.snd.send(m, sync=False) |
| self.drain(self.rcv, timeout=self.delay(), expected=msgs) |
| self.ssn.acknowledge() |
| |
| def testSendAsyncCapacity0(self): |
| try: |
| self.asyncTest(0) |
| assert False, "send shouldn't succeed with zero capacity" |
| except InsufficientCapacity: |
| # this is expected |
| pass |
| |
| def testSendAsyncCapacity1(self): |
| self.asyncTest(1) |
| |
| def testSendAsyncCapacity5(self): |
| self.asyncTest(5) |
| |
| def testSendAsyncCapacityUNLIMITED(self): |
| self.asyncTest(UNLIMITED) |
| |
| def testCapacityTimeout(self): |
| self.snd.capacity = 1 |
| msgs = [] |
| caught = False |
| while len(msgs) < 100: |
| m = self.content("testCapacity", len(msgs)) |
| try: |
| self.snd.send(m, sync=False, timeout=0) |
| msgs.append(m) |
| except InsufficientCapacity: |
| caught = True |
| break |
| self.snd.sync() |
| self.drain(self.rcv, expected=msgs) |
| self.ssn.acknowledge() |
| assert caught, "did not exceed capacity" |
| |
| def testEINTR(self): |
| m1 = self.content("testEINTR", 0) |
| m2 = self.content("testEINTR", 1) |
| |
| self.snd.send(m1, timeout=self.timeout()) |
| try: |
| os.setuid(500) |
| assert False, "setuid should fail" |
| except: |
| pass |
| self.snd.send(m2, timeout=self.timeout()) |