blob: ca3198010ea13e07c0b0c79c9d78d9c64d9adfdc [file] [log] [blame]
import cPickle
from cPickle import PickleError
import posixpath
from twitter.common import log
from .state import MySQLCluster, Scheduler, StateProvider
from kazoo.exceptions import KazooException, NoNodeError
class ZooKeeperStateProvider(StateProvider):
"""
StateProvider implementation backed by ZooKeeper.
This class is thread-safe.
"""
def __init__(self, client, base_path):
"""
:param client: Kazoo client.
:param base_path: The base path for the scheduler state on ZooKeeper.
"""
self._client = client
self._base_path = base_path
def dump_scheduler_state(self, state):
if not isinstance(state, Scheduler):
raise TypeError("'state' should be an instance of Scheduler")
path = self._get_scheduler_state_path()
self._client.retry(self._client.ensure_path, posixpath.dirname(path))
content = cPickle.dumps(state)
try:
self._client.retry(self._create_or_set, path, content)
except KazooException as e:
raise self.Error('Failed to persist Scheduler: %s' % e)
def load_scheduler_state(self):
path = self._get_scheduler_state_path()
try:
content = self._client.get(path)[0]
state = cPickle.loads(content)
if not isinstance(state, Scheduler):
raise self.Error("Invalid state object. Expect Scheduler, got %s" % type(state))
return state
except NoNodeError:
log.info('No scheduler state found on path %s' % path)
return None
except (KazooException, PickleError, ValueError) as e:
raise self.Error('Failed to recover Scheduler: %s' % e)
def dump_cluster_state(self, state):
if not isinstance(state, MySQLCluster):
raise TypeError("'state' should be an instance of MySQLCluster")
path = self._get_cluster_state_path(state.name)
self._client.retry(self._client.ensure_path, posixpath.dirname(path))
content = cPickle.dumps(state)
self._client.retry(self._create_or_set, path, content)
def load_cluster_state(self, cluster_name):
path = self._get_cluster_state_path(cluster_name)
try:
content = self._client.get(path)[0]
state = cPickle.loads(content)
if not isinstance(state, MySQLCluster):
raise self.Error("Invalid state object. Expect MySQLCluster, got %s" % type(state))
return state
except NoNodeError:
log.info('No cluster state found on path %s' % path)
return None
except (KazooException, PickleError, ValueError) as e:
raise self.Error('Failed to recover MySQLCluster: %s' % e)
def remove_cluster_state(self, cluster_name):
path = self._get_cluster_state_path(cluster_name)
try:
self._client.retry(self._client.delete, path, recursive=True)
except KazooException as e:
raise self.Error("Failed to remove MySQLCluster: %s" % e)
# --- Helper methods. ---
def _get_scheduler_state_path(self):
return posixpath.join(self._base_path, posixpath.join(*self._get_scheduler_state_key()))
def _get_cluster_state_path(self, cluster_name):
return posixpath.join(
self._base_path, posixpath.join(*self._get_cluster_state_key(cluster_name)))
def _create_or_set(self, path, content):
"""Set the ZNode if the path exists, otherwise create it."""
if self._client.exists(path):
self._client.set(path, content)
else:
self._client.create(path, content)