| from collections import namedtuple |
| import json |
| import random |
| import threading |
| |
| from mysos.common.cluster import ClusterManager, get_cluster_path |
| from mysos.common import zookeeper |
| |
| from .elector import MySQLMasterElector |
| from .state import MySQLCluster, MySQLTask, StateProvider |
| from .password import PasswordBox |
| |
| import mesos.interface.mesos_pb2 as mesos_pb2 |
| from twitter.common import log |
| from twitter.common.quantity import Amount, Data, Time |
| from twitter.common.zookeeper.serverset.endpoint import Endpoint, ServiceInstance |
| |
| |
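# Assigned to executor.name on every TaskInfo created in _new_task() below.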
| EXECUTOR_NAME = 'mysos.executor' |
| |
| |
| class MySQLClusterLauncher(object): |
| """ |
| Responsible for launching and maintaining a MySQL cluster. |
| |
| Thread-safety: |
| The launcher is thread-safe. It uses a separate thread to wait for the election result and |
| can launch a new election within that thread. All other public methods are called from the |
| scheduler driver thread and the web UI threads. |
| """ |
| |
| class Error(Exception): pass |
| class IncompatibleRoleError(Error): pass |
| class PermissionError(Error): pass |
| |
| def __init__( |
| self, |
| driver, |
| cluster, |
| state_provider, |
| zk_url, |
| kazoo, |
| framework_user, |
| executor_uri, |
| executor_cmd, |
| election_timeout, |
| admin_keypath, |
| scheduler_key, |
| installer_args=None, |
| backup_store_args=None, |
| executor_environ=None, |
| executor_source_prefix=None, |
| framework_role='*', |
| query_interval=Amount(1, Time.SECONDS)): |
| """ |
| :param driver: Mesos scheduler driver. |
| :param cluster: The MySQLCluster state object. |
| :param state_provider: For restoring and persisting the cluster state. |
| :param zk_url: The ZooKeeper URL for cluster member discovery and master election. |
| :param kazoo: The Kazoo client to access ZooKeeper with. |
    :param framework_user: See flags.
    :param executor_uri: See flags.
| :param executor_cmd: See flags. |
| :param election_timeout: See flags. |
| :param admin_keypath: See flags. |
| :param scheduler_key: Used for encrypting cluster passwords. |
| :param installer_args: See flags. |
| :param backup_store_args: See flags. |
| :param executor_environ: See flags. |
| :param executor_source_prefix: See flags. |
| :param framework_role: See flags. |
| :param query_interval: See MySQLMasterElector. Use the default value for production and allow |
| tests to use a different value. |
| """ |
| self._driver = driver |
| |
| if not isinstance(cluster, MySQLCluster): |
| raise TypeError("'cluster' should be an instance of MySQLCluster") |
| self._cluster = cluster |
| |
| if not isinstance(state_provider, StateProvider): |
| raise TypeError("'state_provider' should be an instance of StateProvider") |
| self._state_provider = state_provider |
| |
| self._framework_role = framework_role |
| |
| # Passed along to executors. |
| self._zk_url = zk_url |
| self._framework_user = framework_user |
| self._executor_uri = executor_uri |
| self._executor_cmd = executor_cmd |
| self._election_timeout = election_timeout |
| self._admin_keypath = admin_keypath |
| self._installer_args = installer_args |
| self._backup_store_args = backup_store_args |
| self._executor_environ = executor_environ |
| self._executor_source_prefix = executor_source_prefix |
| |
| # Used by the elector. |
| self._query_interval = query_interval |
| |
| zk_root = zookeeper.parse(zk_url)[2] |
| self._cluster_manager = ClusterManager(kazoo, get_cluster_path(zk_root, cluster.name)) |
| |
| self._password_box = PasswordBox(scheduler_key) |
| self._password_box.decrypt(cluster.encrypted_password) # Validate the password. |
| |
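    # Guards the mutable cluster state, which is accessed from the scheduler driver thread, the
    # web UI threads and the elector thread (see the class docstring on thread-safety).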
| self._lock = threading.Lock() |
| |
| if self._cluster.master_id: |
| log.info("Republish master %s for cluster %s in case it's not published" % ( |
| self._cluster.master_id, self.cluster_name)) |
| self._cluster_manager.promote_member(self._cluster.master_id) |
| |
| if len(self._cluster.tasks) > 0: |
| log.info("Recovered %s tasks for cluster '%s'" % ( |
| len(self._cluster.tasks), self.cluster_name)) |
| |
| # A recovered launcher should continue the election if the previous one was incomplete when the |
| # scheduler failed over. Mesos will deliver all missed events that affect the election to the |
| # scheduler. |
| if len(self._cluster.running_tasks) > 0 and not self._cluster.master_id: |
| log.info("Restarting election for the recovered launcher") |
| self._elector = self._new_elector() |
| # Add current slaves. |
| for t in self._cluster.running_tasks: |
| self._elector.add_slave(t.task_id, t.mesos_slave_id) |
| self._elector.start() |
| else: |
      # New launcher: the elector is set when an election starts and reset to None when it ends.
| self._elector = None |
| |
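    # Set by kill(). Once set, launch() stops accepting new offers and 'terminated' becomes True
    # after all tasks have reached a terminal state.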
| self._terminating = False |
| |
| @property |
| def cluster_name(self): |
| return self._cluster.name |
| |
| @property |
| def cluster_info(self): |
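    """Return an immutable snapshot of the cluster's name, user and aggregate resource totals."""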
| with self._lock: |
| ClusterInfo = namedtuple('ClusterInfo', [ |
| 'name', 'user', 'num_nodes', 'total_cpus', 'total_mem_mb', 'total_disk_mb']) |
| return ClusterInfo( |
| name=self._cluster.name, |
| user=self._cluster.user, |
| num_nodes=self._cluster.num_nodes, |
| total_cpus=self._cluster.cpus * self._cluster.num_nodes, |
| total_mem_mb=self._cluster.mem.as_(Data.MB) * self._cluster.num_nodes, |
| total_disk_mb=self._cluster.disk.as_(Data.MB) * self._cluster.num_nodes) |
| |
| def launch(self, offer): |
| """ |
| Try to launch a MySQL task with the given offer. |
| |
| :returns: |
| Task ID: Either the task ID of the task just launched or None if this offer is not used. |
| Remaining resources: Resources from this offer that are unused by the task. If no task is |
| launched, all the resources from the offer are returned. |
| |
    :raises IncompatibleRoleError: Raised when the offer contains a resource with an incompatible role.
| """ |
| with self._lock: |
| if len(self._cluster.active_tasks) == self._cluster.num_nodes: |
| # All nodes of this cluster have been launched and none have died. |
| return None, offer.resources |
| |
| if self._terminating: |
| return None, offer.resources |
| |
| cpus, mem, disk, ports = self._get_resources(offer.resources) |
| |
| task_cpus = self._cluster.cpus |
| task_mem = self._cluster.mem |
| task_disk = self._cluster.disk |
| |
| if cpus < task_cpus or mem < task_mem or disk < task_disk or len(ports) == 0: |
| # Offer doesn't fit. |
| return None, offer.resources |
| |
| log.info("Launcher %s accepted offer %s on Mesos slave %s (%s)" % ( |
| self.cluster_name, offer.id.value, offer.slave_id.value, offer.hostname)) |
| |
| task_port = random.choice(list(ports)) # Randomly pick a port in the offer. |
| |
| task_info = self._new_task(offer, task_cpus, task_mem, task_disk, task_port) |
| self._cluster.tasks[task_info.task_id.value] = MySQLTask( |
| self._cluster.name, |
| task_info.task_id.value, |
| task_info.slave_id.value, |
| offer.hostname, |
| task_port) |
| self._cluster.next_id += 1 |
| |
      # Checkpoint the task data. The task may still fail to launch; the reconciliation described
      # in the TODO below addresses that case.
| self._state_provider.dump_cluster_state(self._cluster) |
| |
| log.info('Launching task %s on Mesos slave %s (%s)' % ( |
| task_info.task_id.value, offer.slave_id.value, offer.hostname)) |
| |
| # Mysos launches at most a single task for each offer. Note that the SchedulerDriver API |
| # expects a list of tasks. |
| # TODO(jyx): Reconcile after failover because the scheduler can crash before successfully |
| # launching the task. Also run implicit reconciliation periodically. |
| self._driver.launchTasks(offer.id, [task_info]) |
| |
      # Compute the resources remaining in this offer and return them for other clusters to use.
| remaining = create_resources( |
| cpus - task_cpus, |
| mem - task_mem, |
| disk - task_disk, |
| ports - set([task_port]), |
| role=self._framework_role) |
| return task_info.task_id.value, remaining |
| |
| def kill(self, password): |
| """ |
| Kill the cluster. |
| |
    NOTE: Cluster killing is asynchronous. Use the 'terminated' property to check whether all tasks
    in the cluster have been killed.
| """ |
| with self._lock: |
| if not self._password_box.match(password, self._cluster.encrypted_password): |
| raise self.PermissionError("No permission to kill cluster %s" % self.cluster_name) |
| |
| self._terminating = True |
| |
| # TODO(jyx): Task killing is unreliable. Reconciliation should retry killing. |
| for task_id in self._cluster.tasks: |
| log.info("Killing task %s of cluster %s" % (task_id, self.cluster_name)) |
| self._driver.killTask(mesos_pb2.TaskID(value=task_id)) |
| |
| @property |
| def terminated(self): |
| """True if all tasks in the cluster are killed.""" |
| return self._terminating and len(self._cluster.active_tasks) == 0 |
| |
| def _get_resources(self, resources): |
| """Return a tuple of the resources: cpus, mem, disk, set of ports.""" |
| cpus, mem, disk, ports = 0.0, Amount(0, Data.MB), Amount(0, Data.MB), set() |
| for resource in resources: |
      # Role validation:
      # 1. We only care about the roles of the resource types we are going to use.
      # 2. If such a resource is not of the role we want, we raise an exception. This implies that
      #    when a slave offers resources with both the '*' role and the Mysos framework role, we
      #    decline the entire offer. We expect Mesos slave hosts that run Mysos executors to
      #    dedicate *all* of their resources to Mysos, since we are not currently optimizing for
      #    use cases where Mysos tasks run side-by-side with tasks from other frameworks. This
      #    also simplifies the launcher's role-filtering logic.
| # TODO(jyx): Revisit this when the above assumption changes. |
| if (resource.name in ('cpus', 'mem', 'disk', 'ports') and |
| resource.role != self._framework_role): |
| raise self.IncompatibleRoleError("Offered resource %s has role %s, expecting %s" % ( |
| resource.name, resource.role, self._framework_role)) |
| |
| if resource.name == 'cpus': |
| cpus = resource.scalar.value |
| elif resource.name == 'mem': |
        # 'Amount' requires an integer while 'value' is a double. We convert to bytes to minimize
        # precision loss.
| mem = Amount(int(resource.scalar.value * 1024 * 1024), Data.BYTES) |
| elif resource.name == 'disk': |
| disk = Amount(int(resource.scalar.value * 1024 * 1024), Data.BYTES) |
| elif resource.name == 'ports' and resource.ranges.range: |
| for r in resource.ranges.range: |
| ports |= set(range(r.begin, r.end + 1)) |
| |
| return cpus, mem, disk, ports |
| |
| def _new_task(self, offer, task_cpus, task_mem, task_disk, task_port): |
| """Return a new task with the requested resources.""" |
| server_id = self._cluster.next_id |
| task_id = "mysos-" + self.cluster_name + "-" + str(server_id) |
| |
| task = mesos_pb2.TaskInfo() |
| task.task_id.value = task_id |
| task.slave_id.value = offer.slave_id.value |
| task.name = task_id |
| |
| task.executor.executor_id.value = task_id # Use task_id as executor_id. |
| task.executor.name = EXECUTOR_NAME |
| |
| source = [self._cluster.name, str(server_id)] |
| if self._executor_source_prefix and self._executor_source_prefix.strip('.'): |
| source = [self._executor_source_prefix.strip('.')] + source |
| |
| task.executor.source = '.'.join(source) |
| task.executor.command.value = self._executor_cmd |
| |
| if self._executor_environ: # Could be 'None' since it's an optional argument. |
| executor_environ_ = json.loads(self._executor_environ) |
| if executor_environ_: |
| for var_ in executor_environ_: |
| log.info("Executor will use environment variable: %s" % var_) |
| var = task.executor.command.environment.variables.add() |
| var.name = var_['name'] |
| var.value = var_['value'] |
| |
| uri = task.executor.command.uris.add() |
| uri.value = self._executor_uri |
| uri.executable = True |
| uri.extract = False # Don't need to decompress pex. |
| |
| task.data = json.dumps({ |
| 'framework_user': self._framework_user, |
| 'host': offer.hostname, |
| 'port': task_port, |
| 'cluster': self._cluster.name, |
| 'cluster_user': self._cluster.user, |
| 'cluster_password': self._password_box.decrypt(self._cluster.encrypted_password), |
        'server_id': server_id,  # The integer embedded in the task ID, used as the MySQL server ID.
| 'zk_url': self._zk_url, |
| 'admin_keypath': self._admin_keypath, |
| 'installer_args': self._installer_args, |
| 'backup_store_args': self._backup_store_args, |
| 'backup_id': self._cluster.backup_id, |
| }) |
| |
| resources = create_resources( |
| task_cpus, task_mem, task_disk, set([task_port]), role=self._framework_role) |
| task.resources.extend(resources) |
| |
| return task |
| |
| def _new_elector(self): |
| """Create a new instance of MySQLMasterElector.""" |
| elector = MySQLMasterElector( |
| self._driver, |
| self.cluster_name, |
| self._cluster.next_epoch, |
| self._master_elected, |
| self._election_timeout, |
| query_interval=self._query_interval) |
| |
| log.info("Created elector for epoch %s for cluster %s" % ( |
| self._cluster.next_epoch, self.cluster_name)) |
| self._cluster.next_epoch += 1 |
| |
    # Persist the incremented epoch so that an elector created after a scheduler failover uses a
    # fresh epoch.
| self._state_provider.dump_cluster_state(self._cluster) |
| |
| return elector |
| |
| def status_update(self, status): |
| """ |
| Handle the status update for a task of this cluster. |
| |
| NOTE: |
| Duplicate status updates may be handled by either the same scheduler instance or a new |
| instance with the restored state. |
| """ |
| with self._lock: |
| task_id = status.task_id.value |
| |
| if task_id not in self._cluster.tasks: |
| log.warn("Ignoring status update for unknown task %s" % task_id) |
| return |
| |
| task = self._cluster.tasks[task_id] |
| previous_state = task.state |
| |
| # We don't want to ignore a duplicate update if the previous one was not successfully handled. |
| # Therefore, we should not checkpoint the status change until we have finished all operations. |
| if previous_state == status.state: |
| log.info('Ignoring duplicate status update %s for task %s' % ( |
| mesos_pb2.TaskState.Name(status.state), |
| task_id)) |
| return |
| |
| if is_terminal(previous_state): |
| log.info('Ignoring status update %s for task %s as it is in terminal state %s' % ( |
| mesos_pb2.TaskState.Name(status.state), |
| task_id, |
| mesos_pb2.TaskState.Name(previous_state))) |
| return |
| |
| log.info('Updating state of task %s of cluster %s from %s to %s' % ( |
| status.task_id.value, |
| self.cluster_name, |
| mesos_pb2.TaskState.Name(previous_state), |
| mesos_pb2.TaskState.Name(status.state))) |
| task.state = status.state |
| |
| if status.state == mesos_pb2.TASK_RUNNING: |
| # Register this cluster member. |
| endpoint = Endpoint( |
| self._cluster.tasks[task_id].hostname, self._cluster.tasks[task_id].port) |
| |
| # If the scheduler fails over after ZK is updated but before the state change is |
| # checkpointed, it will receive the same status update again and try to publish a duplicate |
| # member to ZK. ClusterManager.add_member() is idempotent and doesn't update ZK in this |
| # case. |
| member_id = self._cluster_manager.add_member(ServiceInstance(endpoint)) |
| log.info('Added %s (member id=%s) to cluster %s' % (endpoint, member_id, self.cluster_name)) |
| self._cluster.members[task_id] = member_id |
| |
| # Checkpoint the status update here. It's OK if the elector fails to launch later because |
| # the new scheduler instance will retry based on the fact that there are running instances |
| # of the cluster but no master. |
| self._state_provider.dump_cluster_state(self._cluster) |
| |
        # If a MySQL master has already been elected for this cluster, don't bother adding this
        # task to the elector.
| if self._cluster.master_id: |
| log.info( |
| "MySQL slave task %s on %s started after a master is already elected for cluster %s" % |
| (task_id, endpoint.host, self.cluster_name)) |
| return |
| |
| if not self._elector: |
| self._elector = self._new_elector() |
| # Add current slaves. |
| for t in self._cluster.running_tasks: |
| self._elector.add_slave(t.task_id, t.mesos_slave_id) |
| self._elector.start() |
| else: |
| self._elector.add_slave(task_id, status.slave_id.value) |
| elif status.state == mesos_pb2.TASK_FINISHED: |
| raise self.Error("Task %s is in unexpected state %s with message '%s'" % ( |
| status.task_id.value, |
| mesos_pb2.TaskState.Name(status.state), |
| status.message)) |
| elif is_terminal(status.state): |
| if status.state == mesos_pb2.TASK_KILLED: |
| log.info("Task %s was successfully killed" % status.task_id.value) |
| else: |
| log.error("Task %s is now in terminal state %s with message '%s'" % ( |
| status.task_id.value, |
| mesos_pb2.TaskState.Name(status.state), |
| status.message)) |
| del self._cluster.tasks[task_id] |
| |
| if task_id in self._cluster.members: |
| member_id = self._cluster.members[task_id] |
| del self._cluster.members[task_id] |
| |
| # If the scheduler fails over after ZK is updated but before its result is persisted, it |
| # will receive the same status update and try to remove the non-existent member. |
| # Removing a non-existent member is a no-op for ClusterManager.remove_member(). |
| # Note that if the order is reversed, the scheduler will fail to clean up the orphan ZK |
| # entry. |
| self._cluster_manager.remove_member(member_id) |
| |
| if member_id == self._cluster.master_id: |
| self._cluster.master_id = None |
| log.info("Master of cluster %s has terminated. Restarting election" % self.cluster_name) |
| |
| assert not self._elector, "Election must not be running since there is a current master" |
| self._elector = self._new_elector() |
| |
| # Add current slaves after removing the terminated task. |
| for t in self._cluster.running_tasks: |
| self._elector.add_slave(t.task_id, t.mesos_slave_id) |
| self._elector.start() |
| else: |
| # It will be rescheduled next time the launcher is given an offer. |
| log.info("Slave %s of cluster %s has terminated" % (task_id, self.cluster_name)) |
| else: |
| assert previous_state != mesos_pb2.TASK_RUNNING, ( |
| "Task must exist in ClusterManager if it was running") |
| log.warn("Slave %s of cluster %s failed to start running" % (task_id, self.cluster_name)) |
| |
| if self.terminated: |
| log.info("Shutting down launcher for cluster %s" % self.cluster_name) |
| self._shutdown() |
| return |
| |
| # Finally, checkpoint the status update. |
| self._state_provider.dump_cluster_state(self._cluster) |
| log.info("Checkpointed the status update for task %s of cluster %s" % ( |
| task_id, self.cluster_name)) |
| |
| def _shutdown(self): |
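    """Remove the cluster from ZooKeeper and the state storage, and abort any pending election."""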
| self._cluster_manager.delete_cluster() |
| log.info("Deleted cluster %s from ZooKeeper" % self.cluster_name) |
| self._state_provider.remove_cluster_state(self.cluster_name) |
| log.info("Removed the state of cluster %s" % self.cluster_name) |
| |
| if self._elector: |
| self._elector.abort() |
| self._elector = None |
| |
| def _master_elected(self, master_task): |
| """ |
| Invoked by the elector when a master is elected for this cluster. |
| |
| :param master_task: The task ID for the elected master. |
| |
| NOTE: A new election can be started if the currently elected master has already terminated |
| before the election result arrives at the launcher. |
| """ |
| if not master_task: |
| log.error("No master can be elected for cluster %s" % self.cluster_name) |
| return |
| |
| with self._lock: |
| self._elector = None # Elector will terminate soon. |
| |
| if master_task not in self._cluster.tasks: |
| log.info("Slave %s of cluster %s was elected but has died. Restarting election" % ( |
| master_task, self.cluster_name)) |
| self._elector = self._new_elector() |
| # Add current slaves. |
| for t in self._cluster.running_tasks: |
| self._elector.add_slave(t.task_id, t.mesos_slave_id) |
| self._elector.start() |
| return |
| |
| assert master_task in self._cluster.members, ( |
| "Elected master must have been added to 'members'") |
| master_id = self._cluster.members[master_task] |
| slave_host = self._cluster.tasks[master_task].hostname |
| |
| log.info('Promoting MySQL task %s on host %s (member ID: %s) as the master for cluster %s' % ( |
| master_task, slave_host, master_id, self.cluster_name)) |
| |
| # Persist the elected master before publishing to ZK. If the scheduler fails over before the |
| # result is persisted, it re-elects one. |
| self._cluster.master_id = master_id |
| self._state_provider.dump_cluster_state(self._cluster) |
| |
| # Publish the elected master. If the scheduler fails over before the master is published, it |
| # republishes it. |
| self._cluster_manager.promote_member(master_id) |
| |
| def framework_message(self, task_id, slave_id, message): |
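    """
    Handle a framework message from a task's executor: a JSON object carrying the election 'epoch'
    and the task's reported 'position', which are forwarded to the pending elector (if any).
    """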
| with self._lock: |
| if self._elector: |
| data = json.loads(message) |
| self._elector.update_position(int(data["epoch"]), task_id, data["position"]) |
| else: |
| log.info("Received framework message '%s' from task %s (%s) when there is no pending " |
| "election" % (message, task_id, slave_id)) |
| |
| |
# --- Utility functions. ---
| def create_resources(cpus, mem, disk, ports, role='*'): |
| """Return a list of 'Resource' protobuf for the provided resources.""" |
| cpus_resources = mesos_pb2.Resource() |
| cpus_resources.name = 'cpus' |
| cpus_resources.type = mesos_pb2.Value.SCALAR |
| cpus_resources.role = role |
| cpus_resources.scalar.value = cpus |
| |
| mem_resources = mesos_pb2.Resource() |
| mem_resources.name = 'mem' |
| mem_resources.type = mesos_pb2.Value.SCALAR |
| mem_resources.role = role |
| mem_resources.scalar.value = mem.as_(Data.MB) |
| |
| disk_resources = mesos_pb2.Resource() |
| disk_resources.name = 'disk' |
| disk_resources.type = mesos_pb2.Value.SCALAR |
| disk_resources.role = role |
| disk_resources.scalar.value = disk.as_(Data.MB) |
| |
| ports_resources = mesos_pb2.Resource() |
| ports_resources.name = 'ports' |
| ports_resources.type = mesos_pb2.Value.RANGES |
| ports_resources.role = role |
| for port in ports: |
| port_range = ports_resources.ranges.range.add() |
| port_range.begin = port |
| port_range.end = port |
| |
| return [cpus_resources, mem_resources, disk_resources, ports_resources] |
| |
| |
| def is_terminal(state): |
| """Return true if the task reached terminal state.""" |
| return state in [ |
| mesos_pb2.TASK_FINISHED, |
| mesos_pb2.TASK_KILLED, |
| mesos_pb2.TASK_FAILED, |
| mesos_pb2.TASK_LOST] |