# blob: 0d92661aa9c127cfea6cf123f25a41a21dd92b49 [file] [log] [blame]
"""license: Apache License 2.0, see LICENSE for more details."""
"""Zookeeper Partitioner Implementation
:Maintainer: None
:Status: Unknown
:class:`SetPartitioner` implements a partitioning scheme using
Zookeeper for dividing up resources amongst members of a party.
This is useful when there is a set of resources that should only be
accessed by a single process at a time that multiple processes
across a cluster might want to divide up.
Example Use-Case
----------------
- Multiple workers across a cluster need to divide up a list of queues
so that no two workers own the same queue.
"""
import logging
import os
import socket
from functools import partial
from kazoo.exceptions import KazooException
from kazoo.protocol.states import KazooState
from kazoo.recipe.watchers import PatientChildrenWatch
log = logging.getLogger(__name__)
class PartitionState(object):
    """Symbolic names for the states a partitioner can be in

    Each value is a plain string equal to its own attribute name.

    .. attribute:: ALLOCATING

        Partitioning of the set is pending. A previously held
        partition may first have to be released before a new one
        can be acquired.

    .. attribute:: ACQUIRED

        The set has been divided up and this member holds its share.

    .. attribute:: RELEASE

        A repartition is required; the partitions currently held
        have to be released before a new allocation can happen.

    .. attribute:: FAILURE

        Partitioning failed, either because the maximum time allowed
        for partitioning was exceeded or because the Zookeeper
        session was lost. A partitioner in this state cannot be
        reused; create a new one instead.

    """
    # Waiting for (re-)allocation of the set.
    ALLOCATING = "ALLOCATING"
    # Partition held by this member.
    ACQUIRED = "ACQUIRED"
    # Held partition must be given up before re-allocation.
    RELEASE = "RELEASE"
    # Terminal state; the partitioner is unusable.
    FAILURE = "FAILURE"
class SetPartitioner(object):
    """Partitions a set amongst members of a party

    This class will partition a set amongst members of a party such
    that each member will be given zero or more items of the set and
    each set item will be given to a single member. When new members
    enter or leave the party, the set will be re-partitioned amongst
    the members.

    When the :class:`SetPartitioner` enters the
    :attr:`~PartitionState.FAILURE` state, it is unrecoverable
    and a new :class:`SetPartitioner` should be created.

    Example:

    .. code-block:: python

        from kazoo.client import KazooClient
        client = KazooClient()

        qp = client.SetPartitioner(
            path='/work_queues', set=('queue-1', 'queue-2', 'queue-3'))

        while 1:
            if qp.failed:
                raise Exception("Lost or unable to acquire partition")
            elif qp.release:
                qp.release_set()
            elif qp.acquired:
                for partition in qp:
                    # Do something with each partition
                    pass
            elif qp.allocating:
                qp.wait_for_acquire()

    **State Transitions**

    When created, the :class:`SetPartitioner` enters the
    :attr:`PartitionState.ALLOCATING` state.

    :attr:`~PartitionState.ALLOCATING` ->
    :attr:`~PartitionState.ACQUIRED`

        Set was partitioned successfully, the partition list assigned
        is accessible via list/iter methods or calling list() on the
        :class:`SetPartitioner` instance.

    :attr:`~PartitionState.ALLOCATING` ->
    :attr:`~PartitionState.FAILURE`

        Allocating the set failed either due to a Zookeeper session
        expiration, or failure to acquire the items of the set within
        the timeout period.

    :attr:`~PartitionState.ACQUIRED` ->
    :attr:`~PartitionState.RELEASE`

        The members of the party have changed, and the set needs to be
        repartitioned. :meth:`SetPartitioner.release_set` should be
        called as soon as possible.

    :attr:`~PartitionState.ACQUIRED` ->
    :attr:`~PartitionState.FAILURE`

        The current partition was lost due to a Zookeeper session
        expiration.

    :attr:`~PartitionState.RELEASE` ->
    :attr:`~PartitionState.ALLOCATING`

        The current partition was released and is being re-allocated.

    """
    def __init__(self, client, path, set, partition_func=None,
                 identifier=None, time_boundary=30):
        """Create a :class:`~SetPartitioner` instance

        Besides storing its arguments, this ensures the partition,
        lock and party paths exist, joins the party, registers a
        session listener on the client and starts watching the party
        so allocation begins once its membership has settled.

        :param client: A :class:`~kazoo.client.KazooClient` instance.
        :param path: The partition path to use.
        :param set: The set of items to partition.
        :param partition_func: A function to use to decide how to
                               partition the set. It is called with
                               ``(identifier, members, set)`` and
                               must return the items assigned to
                               ``identifier``. Defaults to
                               :meth:`_partitioner`.
        :param identifier: An identifier to use for this member of the
                           party when participating. Defaults to the
                           hostname + process id.
        :param time_boundary: How long the party members must be stable
                              before allocation can complete.

        """
        self.state = PartitionState.ALLOCATING

        self._client = client
        self._path = path
        self._set = set
        self._partition_set = []
        self._partition_func = partition_func or self._partitioner
        self._identifier = identifier or '%s-%s' % (
            socket.getfqdn(), os.getpid())
        self._locks = []
        self._lock_path = '/'.join([path, 'locks'])
        self._party_path = '/'.join([path, 'party'])
        self._time_boundary = time_boundary

        self._acquire_event = client.handler.event_object()

        # Create basic path nodes
        client.ensure_path(path)
        client.ensure_path(self._lock_path)
        client.ensure_path(self._party_path)

        # Join the party
        self._party = client.ShallowParty(self._party_path,
                                          identifier=self._identifier)
        self._party.join()

        # Remembers that we were ACQUIRED when the connection got
        # suspended, so the state can be restored on reconnect
        self._was_allocated = False
        self._state_change = client.handler.rlock_object()
        client.add_listener(self._establish_sessionwatch)

        # Now watch the party and set the callback on the async result
        # so we know when we're ready
        self._children_updated = False
        self._child_watching(self._allocate_transition,
                             client_handler=True)

    def __iter__(self):
        """Return the partitions in this partition set"""
        for partition in self._partition_set:
            yield partition

    @property
    def failed(self):
        """Corresponds to the :attr:`PartitionState.FAILURE` state"""
        return self.state == PartitionState.FAILURE

    @property
    def release(self):
        """Corresponds to the :attr:`PartitionState.RELEASE` state"""
        return self.state == PartitionState.RELEASE

    @property
    def allocating(self):
        """Corresponds to the :attr:`PartitionState.ALLOCATING`
        state"""
        return self.state == PartitionState.ALLOCATING

    @property
    def acquired(self):
        """Corresponds to the :attr:`PartitionState.ACQUIRED` state"""
        return self.state == PartitionState.ACQUIRED

    def wait_for_acquire(self, timeout=30):
        """Wait for the set to be partitioned and acquired

        Blocks on the internal acquire event. Nothing is returned, so
        callers should re-check the state properties afterwards to
        find out whether the wait ended in acquisition.

        :param timeout: How long to wait before returning.
        :type timeout: int

        """
        self._acquire_event.wait(timeout)

    def release_set(self):
        """Call to release the set

        This method begins the step of allocating once the set has
        been released. If any lock could not be released the
        partitioner fails out instead; if the partitioner has already
        failed, nothing further happens after the locks are released.

        """
        self._release_locks()
        if self._locks:  # pragma: nocover
            # This shouldn't happen, it means we couldn't release our
            # locks, abort
            self._fail_out()
            return

        with self._state_change:
            if self.failed:
                return
            self.state = PartitionState.ALLOCATING
        self._child_watching(self._allocate_transition,
                             client_handler=True)

    def finish(self):
        """Call to release the set and leave the party"""
        self._release_locks()
        self._fail_out()

    def _fail_out(self):
        """Enter the FAILURE state and leave the party if still in it"""
        with self._state_change:
            self.state = PartitionState.FAILURE
        if self._party.participating:
            try:
                self._party.leave()
            except KazooException:  # pragma: nocover
                # Best effort: we are failing out regardless
                pass

    def _allocate_transition(self, result):
        """Called when in allocating mode, and the children settled

        :param result: The async result produced by the children
                       watcher; its value is unpacked as a
                       ``(children, async_result)`` pair, where
                       ``async_result`` is linked to a callback that
                       flags a party change.

        """
        # Did we get an exception waiting for children to settle?
        if result.exception:  # pragma: nocover
            self._fail_out()
            return

        # Only the change notifier is needed; membership is re-read
        # from the party below
        _, async_result = result.get()
        self._children_updated = False

        # Add a callback when children change on the async_result
        def updated(_change):
            with self._state_change:
                if self.acquired:
                    self.state = PartitionState.RELEASE
            self._children_updated = True

        async_result.rawlink(updated)

        # Split up the set
        self._partition_set = self._partition_func(
            self._identifier, list(self._party), self._set)

        # Proceed to acquire locks for the working set as needed
        for member in self._partition_set:
            if self._children_updated or self.failed:
                # Still haven't settled down, release locks acquired
                # so far and go back
                return self._abort_lock_acquisition()

            lock = self._client.Lock(self._lock_path + '/' +
                                     str(member))
            try:
                lock.acquire()
            except KazooException:  # pragma: nocover
                return self.finish()
            self._locks.append(lock)

        # All locks acquired! Time for state transition, make sure
        # we didn't inadvertently get lost thus far
        with self._state_change:
            if self.failed:  # pragma: nocover
                return self.finish()
            self.state = PartitionState.ACQUIRED
            self._acquire_event.set()

    def _release_locks(self):
        """Attempt to completely remove all the locks

        Locks that fail to release with a ``KazooException`` stay in
        ``self._locks`` so callers can detect the incomplete release.

        """
        self._acquire_event.clear()
        # Iterate over a copy since released locks are removed from
        # the list as we go
        for lock in self._locks[:]:
            try:
                lock.release()
            except KazooException:  # pragma: nocover
                # We proceed to remove as many as possible, and leave
                # the ones we couldn't remove
                pass
            else:
                self._locks.remove(lock)

    def _abort_lock_acquisition(self):
        """Called during lock acquisition if a party change occurs

        Drops the current partition, releases the locks taken so far
        and starts watching the party again. Fails out if a lock
        could not be released.

        """
        self._partition_set = []
        self._release_locks()
        if self._locks:
            # This shouldn't happen, it means we couldn't release our
            # locks, abort
            self._fail_out()
            return
        return self._child_watching(self._allocate_transition)

    def _child_watching(self, func=None, client_handler=False):
        """Called when children are being watched to stabilize

        This actually returns immediately, child watcher spins up a
        new thread/greenlet and waits for it to stabilize before
        any callbacks might run.

        :param func: Optional callback linked to the watcher's async
                     result.
        :param client_handler: If true, ``func`` is not invoked
                               directly but handed to the client
                               handler's ``spawn``. (Formerly named
                               ``async``, which is a reserved word
                               from Python 3.7 on.)
        :returns: The async result returned by the watcher's
                  ``start()``.

        """
        watcher = PatientChildrenWatch(self._client, self._party_path,
                                       self._time_boundary)
        asy = watcher.start()
        if func is not None:
            # We spin up the function in a separate thread/greenlet
            # to ensure that the rawlink's it might use won't be
            # blocked
            if client_handler:
                func = partial(self._client.handler.spawn, func)
            asy.rawlink(func)
        return asy

    def _establish_sessionwatch(self, state):
        """Register ourself to listen for session events, we shut down
        if we become lost

        :param state: The connection state reported by the client.
        :returns: ``True`` once the session is lost, otherwise
                  ``None``. NOTE(review): ``True`` presumably tells
                  the client to drop this listener -- confirm against
                  the client's listener contract.

        """
        with self._state_change:
            # Handle network partition: if the connection gets
            # suspended, change state to ALLOCATING if we had already
            # ACQUIRED. This way the caller does not process the
            # members, since we could eventually lose the session and
            # get repartitioned. If we get connected again after a
            # suspension, it means we have not lost the session and
            # still hold our members, hence restore ACQUIRED.
            if state == KazooState.SUSPENDED:
                if self.state == PartitionState.ACQUIRED:
                    self._was_allocated = True
                    self.state = PartitionState.ALLOCATING
            elif state == KazooState.CONNECTED:
                if self._was_allocated:
                    self._was_allocated = False
                    self.state = PartitionState.ACQUIRED

        if state == KazooState.LOST:
            # Fail out via the handler rather than inline in the
            # listener callback
            self._client.handler.spawn(self._fail_out)
            return True

    def _partitioner(self, identifier, members, partitions):
        """Default partition function: deal items out round-robin

        Both the members and the partitions are sorted, then every
        ``len(members)``-th partition starting at this member's
        position is returned.

        :param identifier: This member's identifier; must be present
                           in ``members``.
        :param members: Identifiers of all party members.
        :param partitions: The items to divide up.
        :returns: List of the partitions assigned to ``identifier``.
        :raises ValueError: If ``identifier`` is not in ``members``.

        """
        # Ensure consistent order of partitions/members
        all_partitions = sorted(partitions)
        workers = sorted(members)

        i = workers.index(identifier)
        # Now return the partition list starting at our location and
        # skipping the other workers
        return all_partitions[i::len(workers)]