# blob: d67de2e995072124d265236b769b27f5fd24eef7
import Queue
import functools
import posixpath
import sys
import threading
from twitter.common import log
from twitter.common.zookeeper.serverset.endpoint import ServiceInstance
from twitter.mysos.common import zookeeper
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kazoo.protocol.states import EventType
from kazoo.recipe.watchers import ChildrenWatch, DataWatch
def get_cluster_path(zk_root, cluster_name):
  """
    Build the ZooKeeper path of a cluster under the scheduler's root path.

    :param zk_root: the root path for the mysos scheduler.
    :param cluster_name: Name of the cluster.
    :return: The path for the cluster.
  """
  return posixpath.join(zk_root, cluster_name)
class Cluster(object):
  """
    A class that represents all members of the MySQL cluster.

    A newly added cluster member becomes a read-only slave until it's promoted to a master.
    Only the master can write.

    The members of the cluster are maintained in two ZooKeeper groups: a slaves group and a master
    group under the same 'directory'. Slaves have unique member IDs backed by ZooKeeper's sequential
    ZNodes. When a slave is promoted to a master, it is moved (its ID preserved) from the slave
    group to the master group.

    There is at most one member in the master group.
  """

  SLAVES_GROUP = 'slaves'
  MASTER_GROUP = 'master'
  MEMBER_PREFIX = "member_"  # Use the prefix so the path conforms to the ServerSet convention.

  def __init__(self, cluster_path):
    """
      :param cluster_path: The ZooKeeper path of this cluster. The slaves and master group paths
                           are its children.
    """
    self.cluster_path = cluster_path
    self.members = {}  # {ID : (serialized) ServiceInstance} mappings for members of the cluster.
    self.master = None  # The master's member ID.
    self.slaves_group = posixpath.join(cluster_path, self.SLAVES_GROUP)
    self.master_group = posixpath.join(cluster_path, self.MASTER_GROUP)
# TODO(jyx): Handle errors e.g. session expirations and recoverable failures.
class ClusterManager(object):
  """
    Kazoo wrapper used by the scheduler to inform executors about cluster change.

    NOTE: ClusterManager is thread safe, i.e., it can be accessed from multiple threads at once.
  """

  class Error(Exception): pass

  def __init__(self, client, cluster_path):
    """
      :param client: Kazoo client.
      :param cluster_path: The path for this cluster on ZooKeeper.
    """
    self._client = client
    self._cluster = Cluster(cluster_path)
    self._lock = threading.Lock()
    # NOTE(review): nothing visible calls self._populate(), so the in-memory view starts empty even
    # if the cluster already has members in ZooKeeper -- confirm this is intended.

  def _read_child_content(self, group, member_id):
    """
      Read the data stored in a member's ZNode.

      :return: The ZNode's data, or None if the ZNode does not exist (e.g. it was deleted after the
               group's children were listed).
    """
    try:
      return self._client.get(posixpath.join(group, member_id))[0]
    except NoNodeError:
      return None

  def _populate(self):
    """
      Load the existing members of both groups from ZooKeeper into the in-memory cluster view.

      NOTE: Kazoo's get_children() raises NoNodeError if a group ZNode does not exist.
      :raises ClusterManager.Error: If the master group holds more than one member.
    """
    # Populate slaves.
    for child in self._client.get_children(self._cluster.slaves_group):
      child_content = self._read_child_content(self._cluster.slaves_group, child)
      if child_content:
        self._cluster.members[child] = child_content

    # Populate the master.
    master_group = self._client.get_children(self._cluster.master_group)
    # This is data read from ZooKeeper, so validate it explicitly (asserts vanish under -O).
    if len(master_group) > 1:
      raise self.Error("There should be at most one master, found: %s" % master_group)
    if len(master_group) == 1:
      child = master_group[0]
      child_content = self._read_child_content(self._cluster.master_group, child)
      if child_content:
        self._cluster.members[child] = child_content
        self._cluster.master = child

  def add_member(self, service_instance):
    """
      Add the member to the ZooKeeper group.

      - New members are slaves until being promoted.
      - A new member is not added if the specified service_instance already exists in the group.

      :param service_instance: The ServiceInstance of the new member.
      :return: The member ID for the ServiceInstance generated by ZooKeeper (or the existing ID if
               the instance is already a member).
      :raises TypeError: If 'service_instance' is not a ServiceInstance.
    """
    if not isinstance(service_instance, ServiceInstance):
      raise TypeError("'service_instance' should be a ServiceInstance")

    content = ServiceInstance.pack(service_instance)

    # Hold the lock across the duplicate check AND the ZNode creation. Otherwise two concurrent
    # calls for the same instance could both pass the check and create two members.
    with self._lock:
      for member_id, member_content in self._cluster.members.items():
        if member_content == content:
          log.info("%s not added because it already exists in the group" % service_instance)
          return member_id

      znode_path = self._client.create(
          posixpath.join(self._cluster.slaves_group, self._cluster.MEMBER_PREFIX),
          value=content,
          sequence=True)
      # The sequential ZNode's name (prefix + sequence number) is the member ID.
      _, member_id = posixpath.split(znode_path)
      self._cluster.members[member_id] = content
      return member_id

  def remove_member(self, member_id):
    """
      Remove the member if it is in the group.

      :return: True if the member is deleted. False if the member cannot be found.
    """
    with self._lock:
      if member_id not in self._cluster.members:
        log.info("Member %s is not in the ZK group" % member_id)
        return False

      # A member's ZNode lives in one group: the master group once promoted, the slaves group
      # otherwise.
      is_master = member_id == self._cluster.master
      group = self._cluster.master_group if is_master else self._cluster.slaves_group
      # Delete in ZooKeeper first so the in-memory view is left untouched if the delete fails.
      self._client.delete(posixpath.join(group, member_id))

      self._cluster.members.pop(member_id, None)
      if is_master:
        self._cluster.master = None
      return True

  def promote_member(self, member_id):
    """
      Promote the member with the given ID to be the master of the cluster if it's not already the
      master. The previous master's ZNode (if any) is deleted from the master group.

      :return: True if the member is promoted. False if the member is already the master.
      :raises ValueError: If 'member_id' is not a member of the cluster.
      :raises ClusterManager.Error: If the ZooKeeper transaction fails.
    """
    with self._lock:
      if member_id not in self._cluster.members:
        raise ValueError("Invalid member_id: %s" % member_id)

      # Do nothing if the member is already the master.
      if self._cluster.master == member_id:
        log.info("Not promoting %s because is already the master" % member_id)
        return False

      tx = self._client.transaction()
      if self._cluster.master:
        # NOTE(review): the old master is not moved back to the slaves group, yet it stays in
        # self._cluster.members; a later remove_member() on it would delete a non-existent slave
        # ZNode -- confirm intended.
        tx.delete(posixpath.join(self._cluster.master_group, self._cluster.master))
      # "Move" the ZNode, i.e., create a ZNode of the same ID in the master group.
      tx.delete(posixpath.join(self._cluster.slaves_group, member_id))
      tx.create(
          posixpath.join(self._cluster.master_group, member_id),
          self._cluster.members[member_id])
      # Kazoo reports failed transaction operations as exception instances in the result list
      # rather than raising, so check before updating the in-memory view.
      results = tx.commit()
      errors = [result for result in results if isinstance(result, Exception)]
      if errors:
        raise self.Error("Failed to promote %s: %s" % (member_id, errors))

      self._cluster.master = member_id
      return True

  def delete_cluster(self):
    """
      Recursively delete the cluster's ZNode (including its master and slave groups).

      :raises ClusterManager.Error: If the cluster still has members.
    """
    with self._lock:
      if self._cluster.members:
        raise self.Error("Cannot remove a cluster that is not empty")
      # Need to delete master/slave sub-dirs.
      self._client.delete(self._cluster.cluster_path, recursive=True)
# TODO(wickman): Implement kazoo connection acquiescence.
class ClusterListener(object):
  """Kazoo wrapper used by the executor to listen to cluster change."""

  def __init__(self,
               client,
               cluster_path,
               self_instance=None,
               promotion_callback=None,
               demotion_callback=None,
               master_callback=None,
               termination_callback=None):
    """
      :param client: Kazoo client.
      :param cluster_path: The path for this cluster on ZooKeeper.
      :param self_instance: The local ServiceInstance associated with this listener.
      :param promotion_callback: Invoked when 'self_instance' is promoted.
      :param demotion_callback: Invoked when 'self_instance' is demoted.
      :param master_callback: Invoked with the new master's ServiceInstance when there is a master
                              change otherwise.
      :param termination_callback: Invoked when the cluster is terminated.

      NOTE: Callbacks are executed synchronously in Kazoo's completion thread to ensure the delivery
      order of events. Blocking the callback method means no future callbacks will be invoked.
    """
    self._client = client
    self._cluster = Cluster(cluster_path)
    self._self_content = ServiceInstance.pack(self_instance) if self_instance else None
    self._master = None
    self._master_content = None
    self._promotion_callback = promotion_callback or (lambda: True)
    self._demotion_callback = demotion_callback or (lambda: True)
    self._master_callback = master_callback or (lambda x: True)
    self._termination_callback = termination_callback or (lambda: True)

    self._children_watch = None  # Set when the watcher detects that the master group exists.

  def start(self):
    """
      Start the listener to watch the master group.

      NOTE: The listener only starts watching master after the base ZNode for the group is created.
    """
    DataWatch(self._client, self._cluster.cluster_path, func=self._cluster_path_callback)
    DataWatch(self._client, self._cluster.master_group, func=self._master_group_callback)

  def _swap(self, master, master_content):
    """Record the new master and fire the callback matching this instance's role transition."""
    i_was_master = self._self_content and self._master_content == self._self_content
    self._master, self._master_content = master, master_content
    i_am_master = self._self_content and self._master_content == self._self_content

    # Invoke callbacks accordingly.
    # NOTE: No callbacks are invoked if there is currently no master and 'self_instance' wasn't the
    # master.
    if i_was_master and not i_am_master:
      self._demotion_callback()
    elif not i_was_master and i_am_master:
      self._promotion_callback()
    elif not i_was_master and not i_am_master and master:
      assert master_content
      self._master_callback(ServiceInstance.unpack(master_content))

  def _data_callback(self, master_id, master_completion):
    """Completion handler for the async read of the master ZNode issued by _child_callback."""
    try:
      master_content, _ = master_completion.get()
    except NoNodeError:
      # ZNode could be gone after we detected it but before we read it.
      master_id, master_content = None, None
    self._swap(master_id, master_content)

  def _child_callback(self, masters):
    """ChildrenWatch callback on the master group; 'masters' is the list of child names."""
    assert len(masters) <= 1, "There should be at most one master"
    if len(masters) == 1 and self._master != masters[0]:
      self._client.get_async(posixpath.join(self._cluster.master_group, masters[0])).rawlink(
          functools.partial(self._data_callback, masters[0]))
    elif len(masters) == 0:
      self._swap(None, None)

  def _cluster_path_callback(self, data, stat, event):
    """DataWatch callback on the cluster ZNode: its deletion means the cluster has terminated."""
    if event and event.type == EventType.DELETED:
      self._termination_callback()

  def _master_group_callback(self, data, stat, event):
    """DataWatch callback on the master group: start watching its children once it exists."""
    if stat and not self._children_watch:
      log.info("Master group %s exists. Starting to watch for election result" %
          self._cluster.master_group)
      self._children_watch = ChildrenWatch(
          self._client, self._cluster.master_group, func=self._child_callback)
def resolve_master(
    cluster_url, master_callback=lambda x: True, termination_callback=lambda: True, zk_client=None):
  """
    Resolve the MySQL cluster master's endpoint from the given URL for this cluster.

    :param cluster_url: The ZooKeeper URL for this cluster.
    :param master_callback: A callback method with one argument: the ServiceInstance for the elected
                            master.
    :param termination_callback: A callback method with no argument. Invoked when the cluster
                                 terminates.
    :param zk_client: Use a custom ZK client instead of Kazoo if specified.
    :raises ValueError: If 'cluster_url' cannot be parsed.
  """
  # NOTE: the default master_callback must accept one argument because it is invoked with the
  # elected master's ServiceInstance.
  try:
    _, zk_servers, cluster_path = zookeeper.parse(cluster_url)
  except Exception as e:
    # 'e.message' is deprecated and not set meaningfully by every exception; str(e) always works.
    raise ValueError("Invalid cluster_url: %s" % e)

  if not zk_client:
    zk_client = KazooClient(zk_servers)
    # A KazooClient has to be started (connected) before watches can be registered on it.
    zk_client.start()

  listener = ClusterListener(
      zk_client,
      cluster_path,
      None,  # The resolver itself is not a member of the cluster.
      master_callback=master_callback,
      termination_callback=termination_callback)
  listener.start()
def wait_for_master(cluster_url, zk_client=None):
  """
    Convenience function to wait for the master to be elected and return the master.

    :param cluster_url: The ZooKeeper URL for this cluster.
    :param zk_client: Use a custom ZK client instead of Kazoo if specified.
    :return: The ServiceInstance for the elected master.
  """
  master = Queue.Queue()
  resolve_master(
      cluster_url,
      master_callback=master.put,
      termination_callback=lambda: True,
      zk_client=zk_client)
  # Block forever but using sys.maxint makes the wait interruptable by Ctrl-C. See
  # http://bugs.python.org/issue1360.
  return master.get(True, sys.maxint)
def wait_for_termination(cluster_url, zk_client=None):
Convenience function to wait for the cluster to terminate. The corresponding ZNode is removed
when the cluster terminates.
:param cluster_url: The ZooKeeper URL for this cluster.
:param zk_client: Use a custom ZK client instead of Kazoo if specified.
terminated = threading.Event()
master_callback=lambda x: True,
termination_callback=lambda: terminated.set(),
# Block forever but using sys.maxint makes the wait interruptable by Ctrl-C. See