import Queue
import json
import os
import signal
from subprocess import CalledProcessError
import sys
import threading

from mysos.common.cluster import ClusterListener, get_cluster_path
from mysos.common.zookeeper import parse

from .installer import PackageInstaller
from .state import StateManager
from .task_runner import TaskError, TaskRunner, TaskRunnerProvider
from .task_control import TaskControl

from kazoo.client import KazooClient
from twitter.common import log
from twitter.common.concurrent import defer
from twitter.common.zookeeper.serverset.endpoint import Endpoint, ServiceInstance


class MysosTaskRunnerProvider(TaskRunnerProvider):
  """
    Creates MysosTaskRunner instances from task descriptions.

    The collaborators of the runner (task control, installer and backup store) are obtained from
    the providers passed to the constructor.
  """

  def __init__(self, task_control_provider, installer_provider, backup_store_provider):
    """
      :param task_control_provider: Provider whose from_task() yields the TaskControl for a task.
      :param installer_provider: Provider whose from_task() yields the PackageInstaller for a task.
      :param backup_store_provider: Provider whose from_task() yields the backup store that is
                                    handed to the StateManager.
    """
    self._task_control_provider = task_control_provider
    self._installer_provider = installer_provider
    self._backup_store_provider = backup_store_provider

  def from_task(self, task, sandbox):
    """
      Create a MysosTaskRunner for the given task.

      'task.data' must be a JSON object with the 'cluster', 'host', 'port' and 'zk_url' keys. A
      Kazoo client is started against the servers parsed from 'zk_url' and is passed to the
      returned runner, which stops it in its stop() method.

      :param task: The task description; only its 'data' attribute is read here, the object itself
                   is forwarded to the providers.
      :param sandbox: The sandbox forwarded to the providers and to the StateManager.
      :return: A MysosTaskRunner that has not been started.
      :raise TaskError: If the task control or the installer cannot be created.
    """
    data = json.loads(task.data)
    cluster_name, host, port, zk_url = data['cluster'], data['host'], data['port'], data['zk_url']
    _, servers, path = parse(zk_url)
    kazoo = KazooClient(servers)
    kazoo.start()

    # From here on the started Kazoo client must not be leaked: whatever goes wrong before the
    # runner takes ownership of it, the client is stopped first. See kazoo/issues/217.
    try:
      self_instance = ServiceInstance(Endpoint(host, port))

      task_control = self._task_control_provider.from_task(task, sandbox)
      installer = self._installer_provider.from_task(task, sandbox)
      backup_store = self._backup_store_provider.from_task(task, sandbox)

      state_manager = StateManager(sandbox, backup_store)

      return MysosTaskRunner(
          self_instance,
          kazoo,
          get_cluster_path(path, cluster_name),
          installer,
          task_control,
          state_manager)
    except (TaskControl.Error, PackageInstaller.Error) as e:
      kazoo.stop()
      # str(e) rather than the deprecated 'e.message', which is empty unless the exception was
      # created with exactly one argument.
      raise TaskError(str(e))
    except Exception:
      # Unexpected failure: clean up and let the original exception propagate unchanged.
      kazoo.stop()
      raise


class MysosTaskRunner(TaskRunner):
  """
    A runner that manages the lifecycle of a MySQL task (through the provided 'task_control').

    The task is executed as a long-running process; its return code can be obtained using 'join()'.

    Thread-safety:
      This class is accessed from the MysosExecutor thread (not the ExecutorDriver thread because
      MysosExecutor invokes operations asynchronously) and the ClusterListener thread and is
      thread-safe.

    TODO(jyx): Push the knowledge of the underlying subprocess down to the task control and stop
               the subprocess using the task control.
  """

  def __init__(self, self_instance, kazoo, cluster_root, installer, task_control, state_manager):
    """
      :param self_instance: The local ServiceInstance associated with this task runner.
      :param kazoo: Kazoo client, it should be started before being passed in.
      :param cluster_root: The ZooKeeper root path for *this cluster*.
      :param installer: The PackageInstaller for MySQL.
      :param task_control: The TaskControl that interacts with the task process.
      :param state_manager: The StateManager for managing the executor state.
    """
    self._installer = installer
    self._env = None  # The environment variables for the 'task_control' commands. Set by the
                      # installer.

    self._task_control = task_control
    self._state_manager = state_manager

    self._lock = threading.Lock()  # Guards '_started', '_stopping' and the signalling of '_popen'.
    self._popen = None  # The singleton task process started by '_task_control'.

    self._started = False  # Indicates whether start() has already been called.
    self._stopping = False  # Indicates whether stop() has already been called.
    self._exited = threading.Event()  # Set when the task process has exited.
    self._result = Queue.Queue()  # The returncode returned by the task process or an exception.

    # Public events and queue.
    self.promoted = threading.Event()
    self.demoted = threading.Event()
    self.master = Queue.Queue()  # Set when a master change is detected.

    self._kazoo = kazoo
    self._listener = ClusterListener(
        kazoo,
        cluster_root,
        self_instance,
        promotion_callback=self._on_promote,
        demotion_callback=self._on_demote,
        master_callback=self._on_master_change)  # Listener started by start().

  # --- Public interface. ---
  def start(self):
    """
      Start the runner in a separate thread and wait for the task process to be forked.

      The steps are: install the package, bootstrap the executor state, start the task subprocess
      and finally start the cluster listener.

      :raise TaskError: If the runner has already been started, or if installation, state
                        bootstrapping or starting the subprocess fails.
    """
    with self._lock:
      if self._started:
        raise TaskError("Runner already started")
      self._started = True

      # Can potentially hold the lock for a long time but it's OK since the runner is not accessed
      # by multiple threads until after it's started; can be a noop as well, depending on the
      # installer implementation.
      try:
        # 1. Install the application.
        self._env = self._installer.install()
        log.info("Package installation completed. Resulting environment variables: %s" % self._env)

        # 2. Restore/initialize the application state.
        self._state_manager.bootstrap(self._task_control, self._env)
        log.info("Executor state fully bootstrapped")

        # 3. Start the task subprocess.
        # Store the process so we can kill it if necessary.
        self._popen = self._task_control.start(env=self._env)
        log.info("Task started in subprocess %s" % self._popen.pid)
        defer(self._wait)

        # 4. Start monitoring.
        # Only start listening to ZK events after the task subprocess has been successfully started.
        self._listener.start()
      except (PackageInstaller.Error, StateManager.Error, CalledProcessError) as e:
        raise TaskError("Failed to start MySQL task: %s" % e)

  def _wait(self):
    """
      Block until the task subprocess exits, then publish its return code and signal the exit.

      Scheduled through defer() by start() so that start() itself does not block on the process.
    """
    # Block until the subprocess exits and delivers the return code.
    self._result.put(self._popen.wait())

    # Notify stop() if it is waiting.
    self._exited.set()

  def stop(self, timeout=10):
    """
      Stop the runner (see _stop()) and always stop the Kazoo client afterwards.

      :param timeout: Seconds to wait after each signal for the process to exit.
      :return: The result of _stop().
      :raise TaskError: If the process group is still alive after SIGKILL.
    """
    try:
      return self._stop(timeout)
    finally:
      # Runs whether _stop() returned or raised.
      self._kazoo.stop()
      log.info("Runner cleaned up")

  def _stop(self, timeout):
    """
      Stop the runner and wait for its thread (and the sub-processes) to exit.

      :param timeout: The timeout that the process should die before a hard SIGKILL is issued
                      (SIGTERM is used initially).
      :return: True if an active runner is stopped, False if the runner is not started or already
               stopping/stopped, or if the sub-processes turned out to be gone already.
      :raise TaskError: If the process has not exited 'timeout' seconds after SIGKILL.
    """
    with self._lock:
      # The state flags are checked and updated under the lock so that concurrent stop() calls
      # (e.g. from a failed promotion and from a demotion) cannot both proceed to signal the
      # process group.
      if not self._started:
        log.warn("Cannot stop the runner because it's not started")
        return False

      if self._stopping:
        log.warn("The runner is already stopping/stopped")
        return False

      log.info("Stopping runner")
      self._stopping = True

      if not self._popen:
        log.info("The runner task did not start successfully so no need to kill it")
        return False

      try:
        # The pid is used as the process group id. NOTE(review): this presumes the task control
        # starts the process as a process group leader - confirm against the TaskControl.
        log.info("Terminating process group: %s" % self._popen.pid)
        os.killpg(self._popen.pid, signal.SIGTERM)
      except OSError as e:
        log.info("The sub-processes are already terminated: %s" % e)
        return False

    # Wait outside the lock so other threads are not blocked for the duration of the timeout.
    log.info("Waiting for process to terminate due to SIGTERM")
    if self._exited.wait(timeout=timeout):
      # SIGTERM was sufficient; no escalation needed.
      return True

    # Escalate to SIGKILL if SIGTERM is not sufficient.
    with self._lock:
      try:
        log.warn("Killing process group %s which failed to terminate cleanly within %s secs" %
                 (self._popen.pid, timeout))
        os.killpg(self._popen.pid, signal.SIGKILL)
      except OSError as e:
        log.info("The sub-processes are already terminated: %s" % e)
        return False

    log.info("Waiting for process to terminate due to SIGKILL")
    if not self._exited.wait(timeout=timeout):
      raise TaskError("Failed to kill process group %s" % self._popen.pid)

    return True

  def get_log_position(self):
    """
      Get the log position of the MySQL slave as reported by the task control.

      :return: Whatever the task control's get_log_position() returns.
      :raise TaskError: If the task control command fails with a CalledProcessError.
    """
    try:
      return self._task_control.get_log_position(env=self._env)
    except CalledProcessError as e:
      raise TaskError("Unable to get the slave's log position: %s" % e)

  def join(self):
    """
      Wait for the runner to terminate.

      :return: The return code of the subprocess. NOTE: A negative value -N indicates that the
               child was terminated by signal N (on Unix).

      :exception: The TaskError exception due to an error in task control operations.
    """
    # Using 'sys.maxint' makes this forever wait interruptible.
    result = self._result.get(True, sys.maxint)
    if isinstance(result, Exception):
      # A failed promote/reparent put the error in the queue; surface it to the caller.
      raise result
    else:
      return result

  # --- ClusterListener handlers. ---
  def _on_promote(self):
    """
      Promotion callback of the ClusterListener: flag the promotion and, unless the task process
      has already exited, promote the MySQL instance asynchronously.
    """
    self.promoted.set()
    if not self._exited.is_set():
      defer(self._promote)

  def _promote(self):
    """
      Promote the instance through the task control; on failure report the error via join() and
      stop the runner.
    """
    try:
      self._task_control.promote(env=self._env)
    except CalledProcessError as e:
      self._result.put(TaskError("Failed to promote the slave: %s" % e))
      self.stop()

  def _on_demote(self):
    """
      Executor shuts itself down when demoted.
    """
    self.demoted.set()

    # Stop the runner asynchronously.
    if not self._exited.is_set():
      log.info("Shutting down runner because it is demoted.")
      # Call stop() asynchronously because this callback is invoked from the Kazoo thread which we
      # don't want to block.
      defer(self.stop)

  def _on_master_change(self, master):
    """
      Master-change callback of the ClusterListener: publish the new master on the 'master' queue
      and, unless the task process has already exited, reparent to it asynchronously.

      :param master: The new master; its 'service_endpoint' host and port are used for reparenting.
    """
    self.master.put(master)
    if not self._exited.is_set():
      defer(lambda: self._reparent(master))

  def _reparent(self, master):
    """
      Reparent the instance to 'master' through the task control; on failure report the error via
      join() and stop the runner.

      :param master: The master whose 'service_endpoint' host and port to reparent to.
    """
    try:
      self._task_control.reparent(
          master.service_endpoint.host,
          master.service_endpoint.port,
          env=self._env)
    except CalledProcessError as e:
      self._result.put(TaskError("Failed to reparent the slave: %s" % e))
      self.stop()
