blob: 4c13ca9dd5b4b3ba9662452e058cb586e71cd6dc [file] [log] [blame]
"""license: Apache License 2.0, see LICENSE for more details."""
import uuid
from nose import SkipTest
from nose.tools import eq_, ok_
from kazoo.testing import KazooTestCase
from kazoo.tests.util import TRAVIS_ZK_VERSION
class KazooQueueTests(KazooTestCase):
def _makeOne(self):
path = "/" + uuid.uuid4().hex
return self.client.Queue(path)
def test_queue_validation(self):
queue = self._makeOne()
self.assertRaises(TypeError, queue.put, {})
self.assertRaises(TypeError, queue.put, b"one", b"100")
self.assertRaises(TypeError, queue.put, b"one", 10.0)
self.assertRaises(ValueError, queue.put, b"one", -100)
self.assertRaises(ValueError, queue.put, b"one", 100000)
def test_empty_queue(self):
queue = self._makeOne()
eq_(len(queue), 0)
self.assertTrue(queue.get() is None)
eq_(len(queue), 0)
def test_queue(self):
queue = self._makeOne()
queue.put(b"one")
queue.put(b"two")
queue.put(b"three")
eq_(len(queue), 3)
eq_(queue.get(), b"one")
eq_(queue.get(), b"two")
eq_(queue.get(), b"three")
eq_(len(queue), 0)
def test_priority(self):
queue = self._makeOne()
queue.put(b"four", priority=101)
queue.put(b"one", priority=0)
queue.put(b"two", priority=0)
queue.put(b"three", priority=10)
eq_(queue.get(), b"one")
eq_(queue.get(), b"two")
eq_(queue.get(), b"three")
eq_(queue.get(), b"four")
class KazooLockingQueueTests(KazooTestCase):
def setUp(self):
KazooTestCase.setUp(self)
skip = False
if TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION < (3, 4):
skip = True
elif TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION >= (3, 4):
skip = False
else:
ver = self.client.server_version()
if ver[1] < 4:
skip = True
if skip:
raise SkipTest("Must use Zookeeper 3.4 or above")
def _makeOne(self):
path = "/" + uuid.uuid4().hex
return self.client.LockingQueue(path)
def test_queue_validation(self):
queue = self._makeOne()
self.assertRaises(TypeError, queue.put, {})
self.assertRaises(TypeError, queue.put, b"one", b"100")
self.assertRaises(TypeError, queue.put, b"one", 10.0)
self.assertRaises(ValueError, queue.put, b"one", -100)
self.assertRaises(ValueError, queue.put, b"one", 100000)
self.assertRaises(TypeError, queue.put_all, {})
self.assertRaises(TypeError, queue.put_all, [{}])
self.assertRaises(TypeError, queue.put_all, [b"one"], b"100")
self.assertRaises(TypeError, queue.put_all, [b"one"], 10.0)
self.assertRaises(ValueError, queue.put_all, [b"one"], -100)
self.assertRaises(ValueError, queue.put_all, [b"one"], 100000)
def test_empty_queue(self):
queue = self._makeOne()
eq_(len(queue), 0)
self.assertTrue(queue.get(0) is None)
eq_(len(queue), 0)
def test_queue(self):
queue = self._makeOne()
queue.put(b"one")
queue.put_all([b"two", b"three"])
eq_(len(queue), 3)
ok_(not queue.consume())
ok_(not queue.holds_lock())
eq_(queue.get(1), b"one")
ok_(queue.holds_lock())
# Without consuming, should return the same element
eq_(queue.get(1), b"one")
ok_(queue.consume())
ok_(not queue.holds_lock())
eq_(queue.get(1), b"two")
ok_(queue.holds_lock())
ok_(queue.consume())
ok_(not queue.holds_lock())
eq_(queue.get(1), b"three")
ok_(queue.holds_lock())
ok_(queue.consume())
ok_(not queue.holds_lock())
ok_(not queue.consume())
eq_(len(queue), 0)
def test_consume(self):
queue = self._makeOne()
queue.put(b"one")
ok_(not queue.consume())
queue.get(.1)
ok_(queue.consume())
ok_(not queue.consume())
def test_holds_lock(self):
queue = self._makeOne()
ok_(not queue.holds_lock())
queue.put(b"one")
queue.get(.1)
ok_(queue.holds_lock())
queue.consume()
ok_(not queue.holds_lock())
def test_priority(self):
queue = self._makeOne()
queue.put(b"four", priority=101)
queue.put(b"one", priority=0)
queue.put(b"two", priority=0)
queue.put(b"three", priority=10)
eq_(queue.get(1), b"one")
ok_(queue.consume())
eq_(queue.get(1), b"two")
ok_(queue.consume())
eq_(queue.get(1), b"three")
ok_(queue.consume())
eq_(queue.get(1), b"four")
ok_(queue.consume())
def test_concurrent_execution(self):
queue = self._makeOne()
value1 = []
value2 = []
value3 = []
event1 = self.client.handler.event_object()
event2 = self.client.handler.event_object()
event3 = self.client.handler.event_object()
def get_concurrently(value, event):
q = self.client.LockingQueue(queue.path)
value.append(q.get(.1))
event.set()
self.client.handler.spawn(get_concurrently, value1, event1)
self.client.handler.spawn(get_concurrently, value2, event2)
self.client.handler.spawn(get_concurrently, value3, event3)
queue.put(b"one")
event1.wait(.2)
event2.wait(.2)
event3.wait(.2)
result = value1 + value2 + value3
eq_(result.count(b"one"), 1)
eq_(result.count(None), 2)