blob: 97e3ba12634788654b92e4609a88cc8b9c824bc6 [file] [log] [blame]
import Queue
import json
import os
import signal
from subprocess import CalledProcessError
import sys
import threading
from twitter.common import log
from twitter.common.concurrent import defer
from twitter.common.zookeeper.serverset.endpoint import Endpoint, ServiceInstance
from twitter.mysos.common.cluster import ClusterListener, get_cluster_path
from twitter.mysos.common.zookeeper import parse
from .installer import PackageInstaller
from .state import StateManager
from .task_runner import TaskError, TaskRunner, TaskRunnerProvider
from .task_control import TaskControl
from kazoo.client import KazooClient
class MysosTaskRunnerProvider(TaskRunnerProvider):
def __init__(self, task_control_provider, installer_provider, backup_store_provider):
self._task_control_provider = task_control_provider
self._installer_provider = installer_provider
self._backup_store_provider = backup_store_provider
def from_task(self, task, sandbox):
data = json.loads(task.data)
cluster_name, host, port, zk_url = data['cluster'], data['host'], data['port'], data['zk_url']
_, servers, path = parse(zk_url)
kazoo = KazooClient(servers)
kazoo.start()
self_instance = ServiceInstance(Endpoint(host, port))
try:
task_control = self._task_control_provider.from_task(task, sandbox)
installer = self._installer_provider.from_task(task, sandbox)
backup_store = self._backup_store_provider.from_task(task, sandbox)
except (TaskControl.Error, PackageInstaller.Error) as e:
raise TaskError(e.message)
state_manager = StateManager(sandbox, backup_store)
return MysosTaskRunner(
self_instance,
kazoo,
get_cluster_path(path, cluster_name),
installer,
task_control,
state_manager)
class MysosTaskRunner(TaskRunner):
"""
A runner that manages the lifecycle of a MySQL task (through the provided 'task_control').
The task is executed as a long-running process its return code can be obtained using 'join()'.
Thread-safety:
This class is accessed from the MysosExecutor thread (not the ExecutorDriver thread because
MysosExecutor invokes operations asynchronously) and the ClusterListener thread and is
thread-safe.
TODO(jyx): Push the knowledge of the underlying subprocess down to the task control and stop the
the subprocess using the task control.
"""
def __init__(self, self_instance, kazoo, cluster_root, installer, task_control, state_manager):
"""
:param self_instance: The local ServiceInstance associated with this task runner.
:param kazoo: Kazoo client, it should be started before being passed in.
:param cluster_root: The ZooKeeper root path for *this cluster*.
:param installer: The PackageInstaller for MySQL.
:param task_control: The TaskControl that interacts with the task process.
:param state_manager: The StateManager for managing the executor state.
"""
self._installer = installer
self._env = None # The environment variables for the 'task_control' commands. Set by the
# installer.
self._task_control = task_control
self._state_manager = state_manager
self._lock = threading.Lock()
self._popen = None # The singleton task process started by '_task_control'.
self._started = False # Indicates whether start() has already been called.
self._stopping = False # Indicates whether stop() has already been called.
self._exited = threading.Event() # Set when the task process has exited.
self._result = Queue.Queue() # The returncode returned by the task process or an exception.
# Public events and queue.
self.promoted = threading.Event()
self.demoted = threading.Event()
self.master = Queue.Queue() # Set when a master change is detected.
self._listener = ClusterListener(
kazoo,
cluster_root,
self_instance,
promotion_callback=self._on_promote,
demotion_callback=self._on_demote,
master_callback=self._on_master_change) # Listener started by start().
# --- Public interface. ---
def start(self):
"""
Start the runner in a separate thread and wait for the task process to be forked.
"""
with self._lock:
if self._started:
raise TaskError("Runner already started")
self._started = True
# Can potentially hold the lock for a long time but it's OK since the runner is not accessed
# by multiple threads until after it's started; can be a noop as well, depending on the
# installer implementation.
try:
# 1. Install the application.
self._env = self._installer.install()
log.info("Package installation completed. Resulting environment variables: %s" % self._env)
# 2. Restore/initialize the application state.
self._state_manager.bootstrap(self._task_control, self._env)
log.info("Executor state fully bootstrapped")
# 3. Start the task subprocess.
# Store the process so we can kill it if necessary.
self._popen = self._task_control.start(env=self._env)
log.info("Task started in subprocess %s" % self._popen.pid)
defer(self._wait)
# 4. Start monitoring.
# Only start listening to ZK events after the task subprocess has been successfully started.
self._listener.start()
except (PackageInstaller.Error, StateManager.Error, CalledProcessError) as e:
raise TaskError("Failed to start MySQL task: %s" % e)
def _wait(self):
# Block until the subprocess exits and delivers the return code.
self._result.put(self._popen.wait())
# Notify stop() if it is waiting.
self._exited.set()
def stop(self, timeout=10):
"""
Stop the runner and wait for its thread (and the sub-processes) to exit.
:param timeout: The timeout that the process should die before a hard SIGKILL is issued
(SIGTERM is used initially).
:return: True if an active runner is stopped, False if the runner is not started or already
stopping/stopped.
"""
if not self._started:
log.warn("Cannot stop the runner because it's not started")
return False
if self._stopping:
log.warn("The runner is already stopping/stopped")
return False
with self._lock:
log.info("Stopping runner")
self._stopping = True
if not self._popen:
log.info("The runner task did not start successfully so no need to kill it")
return False
try:
log.info("Terminating process group: %s" % self._popen.pid)
os.killpg(self._popen.pid, signal.SIGTERM)
except OSError as e:
log.info("The sub-processes are already terminated: %s" % e)
return False
log.info("Waiting for process to terminate due to SIGTERM")
# Escalate to SIGKILL if SIGTERM is not sufficient.
if not self._exited.wait(timeout=timeout):
with self._lock:
try:
log.warn("Killing process group %s which failed to terminate cleanly within %s secs" %
(self._popen.pid, timeout))
os.killpg(self._popen.pid, signal.SIGKILL)
except OSError as e:
log.info("The sub-processes are already terminated: %s" % e)
return False
log.info("Waiting for process to terminate due to SIGKILL")
if not self._exited.wait(timeout=timeout):
raise TaskError("Failed to kill process group %s" % self._popen.pid)
return True
def get_log_position(self):
"""
Get the log position of the MySQL slave. Return None if it cannot be obtained.
"""
try:
log_position = self._task_control.get_log_position(env=self._env)
return log_position
except CalledProcessError as e:
raise TaskError("Unable to get the slave's log position: %s" % e)
def join(self):
"""
Wait for the runner to terminate.
:return: The return code of the subprocess. NOTE: A negative value -N indicates that the
child was terminated by signal N (on Unix).
:exception: The TaskError exception due to an error in task control operations.
"""
# Using 'sys.maxint' makes this forever wait interruptible.
result = self._result.get(True, sys.maxint)
if isinstance(result, Exception):
raise result
else:
return result
# --- ClusterListener handlers. ---
def _on_promote(self):
self.promoted.set()
if not self._exited.is_set():
defer(self._promote)
def _promote(self):
try:
self._task_control.promote(env=self._env)
except CalledProcessError as e:
self._result.put(TaskError("Failed to promote the slave: %s" % e))
self.stop()
def _on_demote(self):
"""
Executor shuts itself down when demoted.
"""
self.demoted.set()
# Stop the runner asynchronously.
if not self._exited.is_set():
log.info("Shutting down runner because it is demoted.")
# Call stop() asynchronously because this callback is invoked from the Kazoo thread which we
# don't want to block.
defer(self.stop)
def _on_master_change(self, master):
self.master.put(master)
if not self._exited.is_set():
defer(lambda: self._reparent(master))
def _reparent(self, master):
try:
self._task_control.reparent(
master.service_endpoint.host,
master.service_endpoint.port,
env=self._env)
except CalledProcessError as e:
self._result.put(TaskError("Failed to reparent the slave: %s" % e))
self.stop()