# blob: 5e8efd656f6a9def2c909427d0372f3155ea42f6 [file] [log] [blame]
from abc import abstractmethod
import cPickle
from cPickle import PickleError
import os
from twitter.common import log
from twitter.common.collections.orderedset import OrderedSet
from twitter.common.dirutil import safe_mkdir
from mesos.interface import mesos_pb2
class StateProvider(object):
  """
  Interface for checkpointing and restoring the state of the Mysos scheduler.

  State is addressed through the following key hierarchy:
    /state/scheduler   # Scheduler-level state.
    /state/clusters/   # Folder for all cluster-level states.
      cluster1         # State for 'cluster1'.
      cluster2         # State for 'cluster2'.
      ...

  NOTE: The methods below are marked with @abstractmethod but this class does not set an ABCMeta
  metaclass, so the marker is informational only; the default implementations simply return None.
  """

  class Error(Exception):
    """Raised by implementations when state cannot be persisted or recovered."""

  @abstractmethod
  def dump_scheduler_state(self, state):
    """
    Persist scheduler-level state.

    :param state: The scheduler-level state to persist.
    """

  @abstractmethod
  def load_scheduler_state(self):
    """
    Restore scheduler-level state.

    :return: The Scheduler object. None if no state is available.
    """

  @abstractmethod
  def dump_cluster_state(self, state):
    """
    Persist cluster-level state.

    :param state: The cluster-level state to persist.
    """

  @abstractmethod
  def load_cluster_state(self, cluster_name):
    """
    Restore cluster-level state.

    :param cluster_name: Name of the cluster whose state should be restored.
    :return: The MySQLCluster object. None if no state is available.
    """

  @abstractmethod
  def remove_cluster_state(self, cluster_name):
    """
    Remove cluster-level state.

    :param cluster_name: Name of the cluster whose state should be removed.
    """

  # --- Helper methods. ---
  @classmethod
  def _get_scheduler_state_key(cls):
    """Return the key components (a new list) that address the scheduler-level state."""
    scheduler_key = ['state', 'scheduler']
    return scheduler_key

  @classmethod
  def _get_cluster_state_key(cls, cluster_name):
    """Return the key components (a new list) that address the state of 'cluster_name'."""
    cluster_key = ['state', 'clusters']
    cluster_key.append(cluster_name)
    return cluster_key
class Scheduler(object):
  """
  State kept at the scheduler level.

  NOTE: Cluster-level states are not embedded here; they are referenced indirectly by cluster name.
  """

  def __init__(self, framework_info):
    """
    :param framework_info: Framework information stored as-is on the instance (presumably the Mesos
                           FrameworkInfo for this scheduler; confirm against the caller).
    """
    self.framework_info = framework_info

    # Names of the clusters managed by this scheduler. An OrderedSet is used so that the cluster
    # creation order is preserved.
    self.clusters = OrderedSet()
class MySQLCluster(object):
  """
  State of a single MySQL cluster.

  Besides the cluster's own settings it holds the tasks (MySQLTask) for the cluster's members.
  """

  def __init__(self, name, user, password, num_nodes, backup_id=None):
    """
    :param name: Name of the cluster.
    :param user: User associated with the cluster.
    :param password: Password associated with the cluster.
    :param num_nodes: Number of nodes in the cluster. Must be an int.
    :param backup_id: Optional backup ID; defaults to None.
    :raises TypeError: If 'num_nodes' is not an int.
    """
    if not isinstance(num_nodes, int):
      raise TypeError("'num_nodes' should be an int")

    self.name = name
    self.user = user
    self.password = password
    self.num_nodes = num_nodes
    self.backup_id = backup_id

    # {TaskID : MemberID} mappings. MemberIDs are assigned by ZooKeeper. A task must be running and
    # published to ZK before it becomes a member.
    self.members = {}

    # MemberID of the MySQL master.
    self.master_id = None

    # {TaskID : MySQLTask} mappings.
    self.tasks = {}

    # Monotonically increasing number after each master change.
    self.next_epoch = 0

    # Monotonically increasing number for unique task IDs.
    self.next_id = 0

  @property
  def active_tasks(self):
    """Tasks that have been launched and have not terminated (staging, starting or running)."""
    launched = []
    for task in self.tasks.values():
      if task.state in (
          mesos_pb2.TASK_STAGING, mesos_pb2.TASK_STARTING, mesos_pb2.TASK_RUNNING):
        launched.append(task)
    return launched

  @property
  def running_tasks(self):
    """Tasks whose state is TASK_RUNNING."""
    running = []
    for task in self.tasks.values():
      if task.state == mesos_pb2.TASK_RUNNING:
        running.append(task)
    return running
class MySQLTask(object):
  """State of a single MySQL task."""

  def __init__(self, cluster_name, task_id, mesos_slave_id, hostname, port):
    """
    :param cluster_name: Name of the cluster this task belongs to.
    :param task_id: ID of the task.
    :param mesos_slave_id: ID of the Mesos slave associated with the task.
    :param hostname: Hostname associated with the task.
    :param port: Port associated with the task. Must be an int.
    :raises TypeError: If 'port' is not an int.
    """
    if not isinstance(port, int):
      raise TypeError("'port' should be an int")

    # Kept so we can refer back to the cluster the task belongs to.
    self.cluster_name = cluster_name

    self.task_id = task_id
    self.mesos_slave_id = mesos_slave_id
    self.hostname = hostname
    self.port = port

    # Every task starts out as staging. Will be updated by statusUpdate().
    self.state = mesos_pb2.TASK_STAGING
class LocalStateProvider(StateProvider):
  """
  StateProvider implementation that uses local disk to store the state.

  Each piece of state is pickled into its own file under the work directory, following the key
  hierarchy of StateProvider.

  NOTE: State is restored with cPickle.load(), which can execute arbitrary code embedded in the
  file. The work directory must therefore only be writable by trusted parties.
  """

  # Exceptions that unpickling may raise for a truncated, corrupt or stale state file. Besides
  # PickleError, the pickle documentation lists EOFError, AttributeError, ImportError and IndexError
  # as possible failures; all of them are reported to callers as StateProvider.Error.
  _UNPICKLE_ERRORS = (PickleError, EOFError, AttributeError, ImportError, IndexError)

  def __init__(self, work_dir):
    """
    :param work_dir: The root directory under which the scheduler state is stored. e.g. The path
                     for 'cluster1' is <work_dir>/state/clusters/cluster1.
    """
    self._work_dir = work_dir

  def dump_scheduler_state(self, state):
    """
    Pickle the scheduler-level state to <work_dir>/state/scheduler.

    :param state: The Scheduler object to persist.
    :raises TypeError: If 'state' is not a Scheduler.
    :raises StateProvider.Error: If the state cannot be pickled.
    """
    if not isinstance(state, Scheduler):
      raise TypeError("'state' should be an instance of Scheduler")
    path = self._get_scheduler_state_path()
    safe_mkdir(os.path.dirname(path))

    try:
      with open(path, 'wb') as f:
        cPickle.dump(state, f)
    except PickleError as e:
      raise self.Error('Failed to persist Scheduler: %s' % e)

  def load_scheduler_state(self):
    """
    Unpickle the scheduler-level state from <work_dir>/state/scheduler.

    :return: The unpickled object (the Scheduler). None if the state file does not exist.
    :raises StateProvider.Error: If the state file cannot be unpickled.
    """
    path = self._get_scheduler_state_path()
    if not os.path.isfile(path):
      log.info("No scheduler state found on path %s" % path)
      return None

    try:
      with open(path, 'rb') as f:
        return cPickle.load(f)
    except self._UNPICKLE_ERRORS as e:
      raise self.Error('Failed to recover Scheduler: %s' % e)

  def dump_cluster_state(self, state):
    """
    Pickle the cluster-level state to <work_dir>/state/clusters/<state.name>.

    :param state: The MySQLCluster object to persist.
    :raises TypeError: If 'state' is not a MySQLCluster.
    :raises StateProvider.Error: If the state cannot be pickled.
    """
    if not isinstance(state, MySQLCluster):
      raise TypeError("'state' should be an instance of MySQLCluster")
    path = self._get_cluster_state_path(state.name)
    safe_mkdir(os.path.dirname(path))

    try:
      with open(path, 'wb') as f:
        # No 'return' here: cPickle.dump() yields nothing useful and dump_scheduler_state() does
        # not return its result either.
        cPickle.dump(state, f)
    except PickleError as e:
      raise self.Error('Failed to persist state for cluster %s: %s' % (state.name, e))

  def load_cluster_state(self, cluster_name):
    """
    Unpickle the cluster-level state from <work_dir>/state/clusters/<cluster_name>.

    :param cluster_name: Name of the cluster whose state should be restored.
    :return: The unpickled object (the MySQLCluster). None if the state file does not exist.
    :raises StateProvider.Error: If the state file cannot be unpickled.
    """
    path = self._get_cluster_state_path(cluster_name)
    if not os.path.isfile(path):
      log.info("No cluster state found on path %s" % path)
      return None

    try:
      with open(path, 'rb') as f:
        return cPickle.load(f)
    except self._UNPICKLE_ERRORS as e:
      raise self.Error('Failed to recover MySQLCluster: %s' % e)

  # This is the concrete implementation, so it must not be marked with @abstractmethod.
  def remove_cluster_state(self, cluster_name):
    """
    Delete the state file of 'cluster_name'. Does nothing (besides logging) if it does not exist.

    :param cluster_name: Name of the cluster whose state should be removed.
    """
    path = self._get_cluster_state_path(cluster_name)
    if not os.path.isfile(path):
      log.info("No cluster state found on path %s" % path)
      return

    os.remove(path)

  # --- Helper methods. ---
  def _get_scheduler_state_path(self):
    """Return the file path of the scheduler-level state under the work directory."""
    return os.path.join(self._work_dir, os.path.join(*self._get_scheduler_state_key()))

  def _get_cluster_state_path(self, cluster_name):
    """Return the file path of the state of 'cluster_name' under the work directory."""
    return os.path.join(self._work_dir, os.path.join(*self._get_cluster_state_key(cluster_name)))