# blob: a9610bf4ebc721aa411ec787eb04a67a858771a6 [file] [log] [blame]
"""license: Apache License 2.0, see LICENSE for more details."""
import uuid
import sys
import threading
from nose.tools import eq_
from kazoo.testing import KazooTestCase
from kazoo.tests.util import wait
class UniqueError(Exception):
    """Marker exception that only the test's leader function raises.

    Having a dedicated type lets the election worker thread tell the
    deliberately injected failure apart from any unexpected error.
    """
class KazooElectionTests(KazooTestCase):
    """Tests for the election recipe exposed as ``self.client.Election``.

    Contenders run in daemon threads.  The elected contender publishes its
    id and a per-term exit event through attributes on this test case,
    guarded by ``self.condition``; the test thread waits on that condition
    and sets the exit event to make the current leader step down.
    """

    def setUp(self):
        """Create a unique election path and reset the shared thread state."""
        super(KazooElectionTests, self).setUp()
        self.path = "/" + uuid.uuid4().hex

        self.condition = threading.Condition()

        # election contenders set these when elected. The exit event is set by
        # the test to make the leader exit.
        self.leader_id = None
        self.exit_event = None

        # tests set this before the event to make the leader raise an error
        self.raise_exception = False

        # set by a worker thread when an unexpected error is hit.
        # better way to do this?
        self.thread_exc_info = None

    def _spawn_contender(self, contender_id, election):
        """Start a daemon thread that runs ``election`` and return the thread.

        :param contender_id: identifier passed on to the leader function.
        :param election: election object whose ``run`` method is invoked.
        """
        thread = threading.Thread(target=self._election_thread,
                                  args=(contender_id, election))
        # daemon so a stuck contender cannot keep the interpreter alive
        thread.daemon = True
        thread.start()
        return thread

    def _election_thread(self, contender_id, election):
        """Thread body: run the election and record unexpected outcomes.

        Anything that contradicts ``self.raise_exception`` (an unexpected
        error, or a missing expected one) is stored in
        ``self.thread_exc_info`` as an ``(type, value, traceback)`` triple
        for the test thread to re-raise via ``_check_thread_error``.
        """
        try:
            election.run(self._leader_func, contender_id)
        except UniqueError:
            # only an error if the test did not ask for the failure
            if not self.raise_exception:
                self.thread_exc_info = sys.exc_info()
        except Exception:
            self.thread_exc_info = sys.exc_info()
        else:
            if self.raise_exception:
                e = Exception("expected leader func to raise exception")
                self.thread_exc_info = (Exception, e, None)

    def _leader_func(self, name):
        """Leader callback: announce leadership, then block until released.

        Publishes ``name`` and a fresh exit event under ``self.condition``
        and notifies waiters, then waits (up to 45 seconds) for the test to
        set the event.  Raises ``UniqueError`` afterwards when
        ``self.raise_exception`` is set.
        """
        exit_event = threading.Event()
        with self.condition:
            self.exit_event = exit_event
            self.leader_id = name
            self.condition.notify_all()

        # wait outside the condition so the test thread can acquire it
        exit_event.wait(45)
        if self.raise_exception:
            raise UniqueError("expected error in the leader function")

    def _check_thread_error(self):
        """Re-raise, in the calling thread, any error a worker recorded.

        The captured exception instance itself is raised (rather than being
        wrapped in a new instance of its type) so its original message,
        type and, where supported, traceback are preserved.
        """
        if self.thread_exc_info:
            _, exc, tb = self.thread_exc_info
            # ``with_traceback`` only exists on Python 3 exceptions
            if tb is not None and hasattr(exc, "with_traceback"):
                raise exc.with_traceback(tb)
            raise exc

    def test_election(self):
        """Exercise leader hand-over, cancellation and leader failure."""
        elections = {}
        threads = {}
        for _ in range(3):
            contender = "c" + uuid.uuid4().hex
            elections[contender] = self.client.Election(self.path, contender)
            threads[contender] = self._spawn_contender(contender,
                                                       elections[contender])

        # wait for a leader to be elected
        times = 0
        with self.condition:
            while not self.leader_id:
                self.condition.wait(5)
                times += 1
                if times > 5:
                    raise Exception("Still not a leader: lid: %s"
                                    % (self.leader_id,))

        election = self.client.Election(self.path)

        # make sure all contenders are in the pool
        wait(lambda: len(election.contenders()) == len(elections))
        contenders = election.contenders()

        eq_(set(contenders), set(elections.keys()))

        # first one in list should be leader
        first_leader = contenders[0]
        eq_(first_leader, self.leader_id)

        # tell second one to cancel election. should never get elected.
        elections[contenders[1]].cancel()

        # make leader exit. third contender should be elected.
        self.exit_event.set()
        with self.condition:
            while self.leader_id == first_leader:
                self.condition.wait(45)
        eq_(self.leader_id, contenders[2])
        self._check_thread_error()

        # make first contender re-enter the race
        threads[first_leader].join()
        threads[first_leader] = self._spawn_contender(first_leader,
                                                      elections[first_leader])

        # contender set should now be the current leader plus the first leader
        wait(lambda: len(election.contenders()) == 2)
        contenders = election.contenders()
        eq_(set(contenders), set([self.leader_id, first_leader]))

        # make current leader raise an exception. first should be reelected
        self.raise_exception = True
        self.exit_event.set()
        with self.condition:
            while self.leader_id != first_leader:
                self.condition.wait(45)
        eq_(self.leader_id, first_leader)
        self._check_thread_error()

        # release the final leader and make sure every thread finished cleanly
        self.exit_event.set()
        for thread in threads.values():
            thread.join()
        self._check_thread_error()

    def test_bad_func(self):
        """``Election.run`` must reject a non-callable leader function."""
        election = self.client.Election(self.path)
        self.assertRaises(ValueError, election.run, "not a callable")