#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

from twitter.common import log

from apache.aurora.common.aurora_job_key import AuroraJobKey
from apache.aurora.common.cluster import Cluster

from .restarter import Restarter
from .scheduler_client import SchedulerProxy
from .sla import Sla
from .updater_util import UpdaterConfig

from gen.apache.aurora.api.constants import LIVE_STATES
from gen.apache.aurora.api.ttypes import (
    ExplicitReconciliationSettings,
    InstanceKey,
    JobKey,
    JobUpdateKey,
    JobUpdateQuery,
    JobUpdateRequest,
    Metadata,
    PercentageSlaPolicy,
    Resource,
    ResourceAggregate,
    SlaPolicy,
    TaskQuery
)


class AuroraClientAPI(object):
  """This class provides the API to talk to the twitter scheduler"""

  class Error(Exception): pass
  class ClusterMismatch(Error, ValueError): pass
  class ThriftInternalError(Error): pass
  class UpdateConfigError(Error): pass

  def __init__(
      self,
      cluster,
      user_agent,
      verbose=False,
      bypass_leader_redirect=False):

    if not isinstance(cluster, Cluster):
      raise TypeError('AuroraClientAPI expects instance of Cluster for "cluster", got %s' %
          type(cluster))

    self._scheduler_proxy = SchedulerProxy(
        cluster,
        verbose=verbose,
        user_agent=user_agent,
        bypass_leader_redirect=bypass_leader_redirect)
    self._cluster = cluster

  @property
  def cluster(self):
    return self._cluster

  @property
  def scheduler_proxy(self):
    return self._scheduler_proxy

  def create_job(self, config):
    log.info('Creating job %s' % config.name())
    log.debug('Full configuration: %s' % config.job())
    return self._scheduler_proxy.createJob(config.job())

  def schedule_cron(self, config):
    log.info("Registering job %s with cron" % config.name())
    log.debug('Full configuration: %s' % config.job())
    return self._scheduler_proxy.scheduleCronJob(config.job())

  def deschedule_cron(self, jobkey):
    log.info("Removing cron schedule for job %s" % jobkey)
    return self._scheduler_proxy.descheduleCronJob(jobkey.to_thrift())

  def populate_job_config(self, config):
    # read-only calls are retriable.
    return self._scheduler_proxy.populateJobConfig(config.job(), retry=True)

  def start_cronjob(self, job_key):
    self._assert_valid_job_key(job_key)

    log.info("Starting cron job: %s" % job_key)
    return self._scheduler_proxy.startCronJob(job_key.to_thrift())

  def get_jobs(self, role):
    log.info("Retrieving jobs for role %s" % role)
    # read-only calls are retriable.
    return self._scheduler_proxy.getJobs(role, retry=True)

  def add_instances(self, job_key, instance_id, count):
    key = InstanceKey(jobKey=job_key.to_thrift(), instanceId=instance_id)
    log.info("Adding %s instances to %s using the task config of instance %s"
             % (count, job_key, instance_id))
    return self._scheduler_proxy.addInstances(key, count)

  def kill_job(self, job_key, instances=None, message=None):
    log.info("Killing tasks for job: %s" % job_key)
    self._assert_valid_job_key(job_key)

    if instances is not None:
      log.info("Instances to be killed: %s" % instances)
      instances = frozenset([int(s) for s in instances])
    return self._scheduler_proxy.killTasks(job_key.to_thrift(), instances, message)

  def check_status(self, job_key):
    self._assert_valid_job_key(job_key)

    log.info("Checking status of %s" % job_key)
    return self.query_no_configs(job_key.to_thrift_query())

  @classmethod
  def build_query(cls, role, job, env=None, instances=None, statuses=LIVE_STATES):
    return TaskQuery(jobKeys=[JobKey(role=role, environment=env, name=job)],
                     statuses=statuses,
                     instanceIds=instances)

  def query(self, query):
    try:
      # read-only calls are retriable.
      return self._scheduler_proxy.getTasksStatus(query, retry=True)
    except SchedulerProxy.ThriftInternalError as e:
      raise self.ThriftInternalError(e.args[0])

  def query_no_configs(self, query):
    """Returns all matching tasks without TaskConfig.executorConfig set."""
    try:
      # read-only calls are retriable.
      return self._scheduler_proxy.getTasksWithoutConfigs(query, retry=True)
    except SchedulerProxy.ThriftInternalError as e:
      raise self.ThriftInternalError(e.args[0])

  def _job_update_request(self, config, instances=None, metadata=None):
    try:
      settings = UpdaterConfig(**config.update_config().get()).to_thrift_update_settings(instances)
    except ValueError as e:
      raise self.UpdateConfigError(str(e))

    return JobUpdateRequest(
        instanceCount=config.instances(),
        settings=settings,
        taskConfig=config.job().taskConfig,
        metadata={Metadata(k, v) for k, v in metadata.items()} if metadata else None
    )

  def start_job_update(self, config, message, instances=None, metadata=None):
    """Requests Scheduler to start job update process.

    Arguments:
    config -- AuroraConfig instance with update details.
    message -- Audit message to include with the change.
    instances -- Optional list of instances to restrict update to.
    metadata -- Optional set of metadata (key, value) to associate with the update.

    Returns response object with update ID and acquired job lock.
    """
    request = self._job_update_request(config, instances, metadata)
    log.info("Starting update for: %s" % config.name())
    # retring starting a job update is safe, client and scheduler reconcile state if the
    # job update is in progress (AURORA-1711).
    return self._scheduler_proxy.startJobUpdate(request, message, retry=True)

  def pause_job_update(self, update_key, message):
    """Requests Scheduler to pause active job update.

    Arguments:
    update_key -- Update identifier.
    message -- Audit message to include with the change.

    Returns response object.
    """
    return self._scheduler_proxy.pauseJobUpdate(update_key, message)

  def resume_job_update(self, update_key, message):
    """Requests Scheduler to resume a job update paused previously.

    Arguments:
    update_key -- Update identifier.
    message -- Audit message to include with the change.

    Returns response object.
    """
    return self._scheduler_proxy.resumeJobUpdate(update_key, message)

  def abort_job_update(self, update_key, message):
    """Requests Scheduler to abort active or paused job update.

    Arguments:
    update_key -- Update identifier.
    message -- Audit message to include with the change.

    Returns response object.
    """
    return self._scheduler_proxy.abortJobUpdate(update_key, message)

  def rollback_job_update(self, update_key, message):
    """Requests Scheduler to rollback active job update.

    Arguments:
    update_key -- Update identifier.
    message -- Audit message to include with the change.

    Returns response object.
    """
    return self._scheduler_proxy.rollbackJobUpdate(update_key, message)

  def get_job_update_diff(self, config, instances=None):
    """Requests scheduler to calculate difference between scheduler and client job views.

    Arguments:
    config -- AuroraConfig instance with update details.
    message -- Audit message to include with the change.
    instances -- Optional list of instances to restrict update to.

    Returns response object with job update diff results.
    """
    request = self._job_update_request(config, instances)
    log.debug("Requesting job update diff details for: %s" % config.name())
    # read-only calls are retriable.
    return self._scheduler_proxy.getJobUpdateDiff(request, retry=True)

  def query_job_updates(
      self,
      role=None,
      job_key=None,
      user=None,
      update_statuses=None,
      update_key=None):
    """Returns all job updates matching the query.

    Arguments:
    role -- job role.
    job_key -- job key.
    user -- user who initiated an update.
    update_statuses -- set of JobUpdateStatus to match.
    update_key -- JobUpdateKey to match.

    Returns response object with all matching job update summaries.
    """
    # TODO(wfarner): Consider accepting JobUpdateQuery in this function instead of kwargs.
    # read-only calls are retriable.
    return self._scheduler_proxy.getJobUpdateSummaries(
        JobUpdateQuery(
            role=role,
            jobKey=job_key.to_thrift() if job_key else None,
            user=user,
            updateStatuses=update_statuses,
            key=update_key),
        retry=True)

  def get_job_update_details(self, key):
    """Gets JobUpdateDetails for the specified job update ID.

    Arguments:
    id -- job update ID.

    Returns a response object with JobUpdateDetails.
    """
    if not isinstance(key, JobUpdateKey):
      raise self.TypeError('Invalid key %r: expected %s but got %s'
                           % (key, JobUpdateKey.__name__, key.__class__.__name__))

    query = JobUpdateQuery(key=key)
    # read-only calls are retriable.
    return self._scheduler_proxy.getJobUpdateDetails(key, query, retry=True)

  def restart(self, job_key, instances, restart_settings):
    """Perform a rolling restart of the job.

       If instances is None or [], restart all instances.  Returns the
       scheduler response for the last restarted batch of instances (which
       allows the client to show the job URL), or the status check response
       if no tasks were active.
    """
    self._assert_valid_job_key(job_key)

    return Restarter(job_key, restart_settings, self._scheduler_proxy).restart(instances)

  def start_maintenance(self, hosts):
    log.info("Starting maintenance for: %s" % hosts.hostNames)
    return self._scheduler_proxy.startMaintenance(hosts)

  def drain_hosts(self, hosts):
    log.info("Draining tasks on: %s" % hosts.hostNames)
    return self._scheduler_proxy.drainHosts(hosts)

  def sla_drain_hosts(self, hosts, percentage=None, duration=None, timeout=None):
    log.info("Asking scheduler to drain tasks by SLA on: %s" % hosts.hostNames)
    return self._scheduler_proxy.slaDrainHosts(
      hosts,
      SlaPolicy(
        percentageSlaPolicy=PercentageSlaPolicy(
          percentage=percentage,
          durationSecs=duration)),
      timeout)

  def maintenance_status(self, hosts):
    log.info("Maintenance status for: %s" % hosts.hostNames)
    # read-only calls are retriable.
    return self._scheduler_proxy.maintenanceStatus(hosts, retry=True)

  def end_maintenance(self, hosts):
    log.info("Ending maintenance for: %s" % hosts.hostNames)
    return self._scheduler_proxy.endMaintenance(hosts)

  def get_quota(self, role):
    log.info("Getting quota for: %s" % role)
    # read-only calls are retriable.
    return self._scheduler_proxy.getQuota(role, retry=True)

  def set_quota(self, role, cpu, ram, disk):
    log.info("Setting quota for user:%s cpu:%f ram:%d disk: %d"
              % (role, cpu, ram, disk))
    return self._scheduler_proxy.setQuota(
        role,
        ResourceAggregate(frozenset([
            Resource(numCpus=cpu),
            Resource(ramMb=ram),
            Resource(diskMb=disk)])))

  def get_tier_configs(self):
    log.debug("Getting tier configurations")
    # read-only calls are retriable.
    return self._scheduler_proxy.getTierConfigs(retry=True)

  def force_task_state(self, task_id, status):
    log.info("Requesting that task %s transition to state %s" % (task_id, status))
    return self._scheduler_proxy.forceTaskState(task_id, status)

  def perform_backup(self):
    return self._scheduler_proxy.performBackup()

  def list_backups(self):
    return self._scheduler_proxy.listBackups()

  def stage_recovery(self, backup_id):
    return self._scheduler_proxy.stageRecovery(backup_id)

  def query_recovery(self, query):
    # read-only calls are retriable.
    return self._scheduler_proxy.queryRecovery(query, retry=True)

  def delete_recovery_tasks(self, query):
    return self._scheduler_proxy.deleteRecoveryTasks(query)

  def commit_recovery(self):
    return self._scheduler_proxy.commitRecovery()

  def unload_recovery(self):
    return self._scheduler_proxy.unloadRecovery()

  def snapshot(self):
    return self._scheduler_proxy.snapshot()

  def prune_tasks(self, query):
    return self._scheduler_proxy.pruneTasks(query)

  def sla_get_job_uptime_vector(self, job_key):
    self._assert_valid_job_key(job_key)
    return Sla(self._scheduler_proxy).get_job_uptime_vector(job_key)

  def sla_get_safe_domain_vector(self, min_instance_count, hosts=None):
    return Sla(self._scheduler_proxy).get_domain_uptime_vector(
        self._cluster,
        min_instance_count,
        hosts)

  def reconcile_explicit(self, batch_size):
    return self._scheduler_proxy.triggerExplicitTaskReconciliation(
      ExplicitReconciliationSettings(batchSize=batch_size))

  def reconcile_implicit(self):
    return self._scheduler_proxy.triggerImplicitTaskReconciliation()

  def _assert_valid_job_key(self, job_key):
    if not isinstance(job_key, AuroraJobKey):
      raise TypeError('Invalid job_key %r: expected %s but got %s'
          % (job_key, AuroraJobKey.__name__, job_key.__class__.__name__))
    if job_key.cluster != self.cluster.name:
      raise self.ClusterMismatch('job %s does not belong to cluster %s' % (job_key,
          self.cluster.name))
