| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from qpid.tests.messaging.implementation import * |
| from qpid.tests.messaging import Base |
| from qpid.compat import set |
| import math |
| |
| class PriorityTests (Base): |
| """ |
| Test prioritised messaging |
| """ |
| |
| def setup_connection(self): |
| return Connection.establish(self.broker, **self.connection_options()) |
| |
| def setup_session(self): |
| return self.conn.session() |
| |
| def prioritised_delivery(self, priorities, levels=10, key="x-qpid-priorities"): |
| """ |
| Test that message on a queue are delivered in priority order. |
| """ |
| msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] |
| |
| snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s}}}}" % (key, levels), |
| durable=self.durable()) |
| for m in msgs: snd.send(m) |
| |
| rcv = self.ssn.receiver(snd.target) |
| for expected in sorted_(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True): |
| msg = rcv.fetch(0) |
| #print "expected priority %s got %s" % (expected.priority, msg.priority) |
| assert msg.content == expected.content |
| self.ssn.acknowledge(msg) |
| |
| def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10, level_key="x-qpid-priorities", fairshare_key="x-qpid-fairshare"): |
| msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] |
| |
| limit_policy = "'%s':%s" % (fairshare_key, default_limit) |
| if limits: |
| for k, v in limits.items(): |
| limit_policy += ", '%s-%s':%s" % (fairshare_key, k, v) |
| |
| snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s, %s}}}}" |
| % (level_key, levels, limit_policy), |
| durable=self.durable()) |
| for m in msgs: snd.send(m) |
| |
| rcv = self.ssn.receiver(snd.target) |
| if limits: |
| limit_function = lambda x : limits.get(x, 0) |
| else: |
| limit_function = lambda x : default_limit |
| for expected in fairshare(sorted_(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True), |
| limit_function, levels): |
| msg = rcv.fetch(0) |
| #print "expected priority %s got %s" % (expected.priority, msg.priority) |
| assert msg.priority == expected.priority |
| assert msg.content == expected.content |
| self.ssn.acknowledge(msg) |
| |
| def test_prioritised_delivery_1(self): |
| self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2], levels = 10) |
| |
| def test_prioritised_delivery_with_alias(self): |
| self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10, key="qpid.priorities") |
| |
| def test_prioritised_delivery_2(self): |
| self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2], levels = 5) |
| |
| def test_fairshare_1(self): |
| self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]) |
| |
| def test_fairshare_with_alias(self): |
| self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,2,3], level_key="qpid.priorities", fairshare_key="qpid.fairshare") |
| |
| def test_fairshare_2(self): |
| self.fairshare_delivery(priorities = [10 for i in range(30)]) |
| |
| def test_fairshare_3(self): |
| self.fairshare_delivery(priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3], limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}, levels=8) |
| |
| def test_browsing(self): |
| priorities = [4,5,3,6,0,1,2,8,2,0,2,1,6,0,1,3,3,3,8,1,3,0,3,7,9,0,1,9,0,2,3] |
| msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] |
| snd = self.ssn.sender("priority-queue; {create: sender, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}", |
| durable=self.durable()) |
| for m in msgs: snd.send(m) |
| |
| rcv = self.ssn.receiver("priority-queue; {mode: browse, delete: receiver}") |
| received = [] |
| try: |
| while True: received.append(rcv.fetch(0)) |
| except Empty: None |
| #check all messages on the queue were received by the browser; don't relay on any specific ordering at present |
| assert set([m.content for m in msgs]) == set([m.content for m in received]) |
| |
| def ring_queue_check(self, msgs, count=10): |
| """ |
| Ensure that a ring queue removes lowest priority messages first. |
| """ |
| snd = self.ssn.sender(address("priority-ring-queue", arguments="x-qpid-priorities:10, 'qpid.policy_type':ring, 'qpid.max_count':%s" % count), |
| durable=self.durable()) |
| for m in msgs: snd.send(m) |
| |
| rcv = self.ssn.receiver(snd.target) |
| received = [] |
| try: |
| while True: received.append(rcv.fetch(0)) |
| except Empty: None |
| |
| expected = sorted_(msgs, key=lambda x: priority_level(x.priority,10))[len(msgs)-count:] |
| expected = sorted_(expected, key=lambda x: priority_level(x.priority,10), reverse=True) |
| #print "sent %s; expected %s; got %s" % ([m.priority for m in msgs], [m.priority for m in expected], [m.priority for m in received]) |
| #print "sent %s; expected %s; got %s" % ([m.content for m in msgs], [m.content for m in expected], [m.content for m in received]) |
| assert [m.content for m in expected] == [m.content for m in received] |
| |
| def test_ring_queue_1(self): |
| priorities = [4,5,3,6,9,9,2,9,2,9,9,1,9,9,9,3,3,3,9,9,3,9,3,9,9,9,9,9,9,2,3] |
| seq = content("msg") |
| self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities]) |
| |
| def test_ring_queue_2(self): |
| priorities = [9,0,2,3,6,3,4,2,9,2,9,9,1,9,4,7,1,1,3,9,7,3,9,3,9,1,5,1,9,7,2,3,0,9] |
| seq = content("msg") |
| self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities]) |
| |
| def test_ring_queue_3(self): |
| #test case given for QPID-3866 |
| priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] |
| seq = content("msg") |
| self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities], 5) |
| |
| def test_ring_queue_4(self): |
| priorities = [9,0,2,3,6,3,4,2,9,2,9,3,1,9,4,7,1,1,3,2,7,3,9,3,6,1,5,1,9,7,2,3,0,2] |
| seq = content("msg") |
| self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities]) |
| |
| def test_requeue(self): |
| priorities = [4,5,3,6,9,9,2,9,2,9,9,1,9,9,9,3,3,3,9,9,3,9,3,9,9,9,9,9,9,2,3] |
| msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] |
| |
| snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}", |
| durable=self.durable()) |
| #want to have some messages requeued so enable prefetch on a dummy receiver |
| other = self.conn.session() |
| dummy = other.receiver("priority-queue") |
| dummy.capacity = 10 |
| |
| for m in msgs: snd.send(m) |
| |
| #fetch some with dummy receiver on which prefetch is also enabled |
| for i in range(5): |
| msg = dummy.fetch(0) |
| #close session without acknowledgements to requeue messages |
| other.close() |
| |
| #now test delivery works as expected after that |
| rcv = self.ssn.receiver(snd.target) |
| for expected in sorted_(msgs, key=lambda m: priority_level(m.priority,10), reverse=True): |
| msg = rcv.fetch(0) |
| #print "expected priority %s got %s" % (expected.priority, msg.priority) |
| #print "expected content %s got %s" % (expected.content, msg.content) |
| assert msg.content == expected.content |
| self.ssn.acknowledge(msg) |
| |
| def content(base, counter=1): |
| while True: |
| yield "%s-%s" % (base, counter) |
| counter += 1 |
| |
| def address(name, create_policy="sender", delete_policy="receiver", arguments=None): |
| if arguments: node = "node: {x-declare:{arguments:{%s}}}" % arguments |
| else: node = "node: {}" |
| return "%s; {create: %s, delete: %s, %s}" % (name, create_policy, delete_policy, node) |
| |
| def fairshare(msgs, limit, levels): |
| """ |
| Generator to return prioritised messages in expected order for a given fairshare limit |
| """ |
| count = 0 |
| last_priority = None |
| postponed = [] |
| while msgs or postponed: |
| if not msgs: |
| msgs = postponed |
| count = 0 |
| last_priority = None |
| postponed = [] |
| msg = msgs.pop(0) |
| if last_priority and priority_level(msg.priority, levels) == last_priority: |
| count += 1 |
| else: |
| last_priority = priority_level(msg.priority, levels) |
| count = 1 |
| l = limit(last_priority) |
| if (l and count > l): |
| postponed.append(msg) |
| else: |
| yield msg |
| return |
| |
| def effective_priority(value, levels): |
| """ |
| Method to determine effective priority given a distinct number of |
| levels supported. Returns the lowest priority value that is of |
| equivalent priority to the value passed in. |
| """ |
| if value <= 5-math.ceil(levels/2.0): return 0 |
| if value >= 4+math.floor(levels/2.0): return 4+math.floor(levels/2.0) |
| return value |
| |
| def priority_level(value, levels): |
| """ |
| Method to determine which of a distinct number of priority levels |
| a given value falls into. |
| """ |
| offset = 5-math.ceil(levels/2.0) |
| return min(max(value - offset, 0), levels-1) |
| |
| def sorted_(msgs, key=None, reverse=False): |
| """ |
| Workaround lack of sorted builtin function in python 2.3 and lack |
| of keyword arguments to list.sort() |
| """ |
| temp = [m for m in msgs] |
| temp.sort(key_to_cmp(key, reverse=reverse)) |
| return temp |
| |
| def key_to_cmp(key, reverse=False): |
| if key: |
| if reverse: return lambda a, b: cmp(key(b), key(a)) |
| else: return lambda a, b: cmp(key(a), key(b)) |
| else: |
| return None |