/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.aurora.scheduler.thrift;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.Nullable;
import javax.inject.Inject;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;

import org.apache.aurora.common.stats.StatsProvider;
import org.apache.aurora.gen.DrainHostsResult;
import org.apache.aurora.gen.EndMaintenanceResult;
import org.apache.aurora.gen.ExplicitReconciliationSettings;
import org.apache.aurora.gen.Hosts;
import org.apache.aurora.gen.InstanceKey;
import org.apache.aurora.gen.InstanceTaskConfig;
import org.apache.aurora.gen.JobConfiguration;
import org.apache.aurora.gen.JobKey;
import org.apache.aurora.gen.JobUpdate;
import org.apache.aurora.gen.JobUpdateInstructions;
import org.apache.aurora.gen.JobUpdateKey;
import org.apache.aurora.gen.JobUpdatePulseStatus;
import org.apache.aurora.gen.JobUpdateQuery;
import org.apache.aurora.gen.JobUpdateRequest;
import org.apache.aurora.gen.JobUpdateSettings;
import org.apache.aurora.gen.JobUpdateSummary;
import org.apache.aurora.gen.ListBackupsResult;
import org.apache.aurora.gen.MaintenanceStatusResult;
import org.apache.aurora.gen.PulseJobUpdateResult;
import org.apache.aurora.gen.QueryRecoveryResult;
import org.apache.aurora.gen.ReadOnlyScheduler;
import org.apache.aurora.gen.ResourceAggregate;
import org.apache.aurora.gen.Response;
import org.apache.aurora.gen.Result;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.gen.SlaPolicy;
import org.apache.aurora.gen.StartJobUpdateResult;
import org.apache.aurora.gen.StartMaintenanceResult;
import org.apache.aurora.gen.TaskQuery;
import org.apache.aurora.gen.VariableBatchJobUpdateStrategy;
import org.apache.aurora.scheduler.base.JobKeys;
import org.apache.aurora.scheduler.base.Numbers;
import org.apache.aurora.scheduler.base.Query;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.configuration.ConfigurationManager;
import org.apache.aurora.scheduler.configuration.ConfigurationManager.TaskDescriptionException;
import org.apache.aurora.scheduler.configuration.SanitizedConfiguration;
import org.apache.aurora.scheduler.cron.CronException;
import org.apache.aurora.scheduler.cron.CronJobManager;
import org.apache.aurora.scheduler.cron.SanitizedCronJob;
import org.apache.aurora.scheduler.maintenance.MaintenanceController;
import org.apache.aurora.scheduler.quota.QuotaCheckResult;
import org.apache.aurora.scheduler.quota.QuotaManager;
import org.apache.aurora.scheduler.quota.QuotaManager.QuotaException;
import org.apache.aurora.scheduler.reconciliation.TaskReconciler;
import org.apache.aurora.scheduler.state.StateChangeResult;
import org.apache.aurora.scheduler.state.StateManager;
import org.apache.aurora.scheduler.state.UUIDGenerator;
import org.apache.aurora.scheduler.storage.SnapshotStore;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
import org.apache.aurora.scheduler.storage.backup.Recovery;
import org.apache.aurora.scheduler.storage.backup.StorageBackup;
import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
import org.apache.aurora.scheduler.storage.entities.IHostStatus;
import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
import org.apache.aurora.scheduler.storage.entities.IJobKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateRequest;
import org.apache.aurora.scheduler.storage.entities.IJobUpdateSettings;
import org.apache.aurora.scheduler.storage.entities.IMetadata;
import org.apache.aurora.scheduler.storage.entities.IRange;
import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
import org.apache.aurora.scheduler.thrift.aop.AnnotatedAuroraAdmin;
import org.apache.aurora.scheduler.thrift.aop.ThriftWorkload;
import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift;
import org.apache.aurora.scheduler.updater.JobDiff;
import org.apache.aurora.scheduler.updater.JobUpdateController;
import org.apache.aurora.scheduler.updater.JobUpdateController.AuditData;
import org.apache.aurora.scheduler.updater.JobUpdateController.JobUpdatingException;
import org.apache.aurora.scheduler.updater.UpdateInProgressException;
import org.apache.aurora.scheduler.updater.UpdateStateException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;

import static org.apache.aurora.common.base.MorePreconditions.checkNotBlank;
import static org.apache.aurora.gen.ResponseCode.INVALID_REQUEST;
import static org.apache.aurora.gen.ResponseCode.JOB_UPDATING_ERROR;
import static org.apache.aurora.gen.ResponseCode.OK;
import static org.apache.aurora.scheduler.base.Numbers.convertRanges;
import static org.apache.aurora.scheduler.base.Numbers.toRanges;
import static org.apache.aurora.scheduler.base.Tasks.ACTIVE_STATES;
import static org.apache.aurora.scheduler.base.Tasks.TERMINAL_STATES;
import static org.apache.aurora.scheduler.quota.QuotaCheckResult.Result.INSUFFICIENT_QUOTA;
import static org.apache.aurora.scheduler.thrift.Responses.addMessage;
import static org.apache.aurora.scheduler.thrift.Responses.empty;
import static org.apache.aurora.scheduler.thrift.Responses.error;
import static org.apache.aurora.scheduler.thrift.Responses.invalidRequest;
import static org.apache.aurora.scheduler.thrift.Responses.ok;

/**
 * Aurora scheduler thrift server implementation.
 * <p/>
 * Interfaces between users and the scheduler to access/modify jobs and perform cluster
 * administration tasks.
 */
@DecoratedThrift
class SchedulerThriftInterface implements AnnotatedAuroraAdmin {

  // Names of the workload stats exported by this class. Every name is STAT_PREFIX followed by an
  // operation name; the constructor creates one counter per name.
  @VisibleForTesting
  static final String STAT_PREFIX = "thrift_workload_";
  @VisibleForTesting
  static final String CREATE_JOB = STAT_PREFIX + "createJob";
  @VisibleForTesting
  static final String CREATE_OR_UPDATE_CRON = STAT_PREFIX + "createOrUpdateCronTemplate";
  @VisibleForTesting
  static final String KILL_TASKS = STAT_PREFIX + "killTasks";
  @VisibleForTesting
  static final String RESTART_SHARDS = STAT_PREFIX + "restartShards";
  @VisibleForTesting
  static final String START_MAINTENANCE = STAT_PREFIX + "startMaintenance";
  @VisibleForTesting
  static final String DRAIN_HOSTS = STAT_PREFIX + "drainHosts";
  @VisibleForTesting
  static final String SLA_DRAIN_HOSTS = STAT_PREFIX + "slaDrainHosts";
  @VisibleForTesting
  static final String MAINTENANCE_STATUS = STAT_PREFIX + "maintenanceStatus";
  @VisibleForTesting
  static final String END_MAINTENANCE = STAT_PREFIX + "endMaintenance";
  @VisibleForTesting
  static final String ADD_INSTANCES = STAT_PREFIX + "addInstances";
  @VisibleForTesting
  static final String START_JOB_UPDATE = STAT_PREFIX + "startJobUpdate";
  @VisibleForTesting
  static final String PRUNE_TASKS = STAT_PREFIX + "pruneTasks";

  private static final Logger LOG = LoggerFactory.getLogger(SchedulerThriftInterface.class);

  // Collaborators supplied through the injected constructor; each is null-checked there.
  private final ConfigurationManager configurationManager;
  private final Thresholds thresholds;
  private final NonVolatileStorage storage;
  private final SnapshotStore snapshotStore;
  private final StorageBackup backup;
  private final Recovery recovery;
  private final MaintenanceController maintenance;
  private final CronJobManager cronJobManager;
  private final QuotaManager quotaManager;
  private final StateManager stateManager;
  private final UUIDGenerator uuidGenerator;
  private final JobUpdateController jobUpdateController;
  // Read-only RPCs on this class are forwarded to this delegate.
  private final ReadOnlyScheduler.Iface readOnlyScheduler;
  private final AuditMessages auditMessages;
  private final TaskReconciler taskReconciler;

  // Workload counters obtained from the StatsProvider in the constructor, one per stat name
  // declared above.
  private final AtomicLong createJobCounter;
  private final AtomicLong createOrUpdateCronCounter;
  private final AtomicLong killTasksCounter;
  private final AtomicLong restartShardsCounter;
  private final AtomicLong startMaintenanceCounter;
  private final AtomicLong drainHostsCounter;
  private final AtomicLong slaDrainHostsCounter;
  private final AtomicLong maintenanceStatusCounter;
  private final AtomicLong endMaintenanceCounter;
  private final AtomicLong addInstancesCounter;
  private final AtomicLong startJobUpdateCounter;
  private final AtomicLong pruneTasksCounter;

  /**
   * Creates the thrift interface from its injected collaborators.
   *
   * <p>Every argument must be non-null; a {@link NullPointerException} is thrown otherwise.
   * One counter is created through {@code statsProvider} for each workload stat name declared
   * on this class.
   */
  @Inject
  SchedulerThriftInterface(
      ConfigurationManager configurationManager,
      Thresholds thresholds,
      NonVolatileStorage storage,
      SnapshotStore snapshotStore,
      StorageBackup backup,
      Recovery recovery,
      CronJobManager cronJobManager,
      MaintenanceController maintenance,
      QuotaManager quotaManager,
      StateManager stateManager,
      UUIDGenerator uuidGenerator,
      JobUpdateController jobUpdateController,
      ReadOnlyScheduler.Iface readOnlyScheduler,
      AuditMessages auditMessages,
      TaskReconciler taskReconciler,
      StatsProvider statsProvider) {

    this.configurationManager = requireNonNull(configurationManager);
    this.thresholds = requireNonNull(thresholds);
    this.storage = requireNonNull(storage);
    this.snapshotStore = requireNonNull(snapshotStore);
    this.backup = requireNonNull(backup);
    this.recovery = requireNonNull(recovery);
    this.maintenance = requireNonNull(maintenance);
    this.cronJobManager = requireNonNull(cronJobManager);
    this.quotaManager = requireNonNull(quotaManager);
    this.stateManager = requireNonNull(stateManager);
    this.uuidGenerator = requireNonNull(uuidGenerator);
    this.jobUpdateController = requireNonNull(jobUpdateController);
    this.readOnlyScheduler = requireNonNull(readOnlyScheduler);
    this.auditMessages = requireNonNull(auditMessages);
    this.taskReconciler = requireNonNull(taskReconciler);

    // Validate the stats provider explicitly, like every other dependency, instead of relying
    // on the first dereference below to fail.
    requireNonNull(statsProvider);
    this.createJobCounter = statsProvider.makeCounter(CREATE_JOB);
    this.createOrUpdateCronCounter = statsProvider.makeCounter(CREATE_OR_UPDATE_CRON);
    this.killTasksCounter = statsProvider.makeCounter(KILL_TASKS);
    this.restartShardsCounter = statsProvider.makeCounter(RESTART_SHARDS);
    this.startMaintenanceCounter = statsProvider.makeCounter(START_MAINTENANCE);
    this.drainHostsCounter = statsProvider.makeCounter(DRAIN_HOSTS);
    this.slaDrainHostsCounter = statsProvider.makeCounter(SLA_DRAIN_HOSTS);
    this.maintenanceStatusCounter = statsProvider.makeCounter(MAINTENANCE_STATUS);
    this.endMaintenanceCounter = statsProvider.makeCounter(END_MAINTENANCE);
    this.addInstancesCounter = statsProvider.makeCounter(ADD_INSTANCES);
    this.startJobUpdateCounter = statsProvider.makeCounter(START_JOB_UPDATE);
    this.pruneTasksCounter = statsProvider.makeCounter(PRUNE_TASKS);
  }

  /**
   * Creates a new non-cron job by inserting one pending task per configured instance.
   *
   * <p>The request is rejected with {@code INVALID_REQUEST} when the configuration cannot be
   * sanitized, when it describes a cron job, when a job with the same key already exists, or
   * when instance-count/quota validation fails. It is rejected with {@code JOB_UPDATING_ERROR}
   * when the update controller reports the job as updating.
   *
   * @param mutableJob Job configuration supplied by the client.
   * @return An OK response on success, otherwise an error response as described above.
   */
  @Override
  public Response createJob(JobConfiguration mutableJob) {
    SanitizedConfiguration sanitized;
    try {
      sanitized = SanitizedConfiguration.fromUnsanitized(
          configurationManager,
          IJobConfiguration.build(mutableJob));
    } catch (TaskDescriptionException e) {
      return error(INVALID_REQUEST, e);
    }

    if (sanitized.isCron()) {
      return invalidRequest(NO_CRON);
    }

    return storage.write(storeProvider -> {
      IJobConfiguration job = sanitized.getJobConfig();

      try {
        jobUpdateController.assertNotUpdating(job.getKey());

        checkJobExists(storeProvider, job.getKey());

        ITaskConfig template = job.getTaskConfig();
        int count = job.getInstanceCount();

        validateTaskLimits(
            count,
            quotaManager.checkInstanceAddition(template, count, storeProvider));

        // Parameterized logging avoids building the message when INFO is disabled.
        LOG.info("Launching {} tasks.", count);
        stateManager.insertPendingTasks(
            storeProvider,
            template,
            sanitized.getInstanceIds());
        createJobCounter.addAndGet(sanitized.getInstanceIds().size());

        return ok();
      } catch (JobUpdatingException e) {
        return error(JOB_UPDATING_ERROR, e);
      } catch (JobExistsException | TaskValidationException e) {
        return error(INVALID_REQUEST, e);
      }
    });
  }

  /** Raised by {@code checkJobExists} when the requested job key is already taken. */
  private static class JobExistsException extends Exception {
    JobExistsException(String reason) {
      super(reason);
    }
  }

  /**
   * Fails when the job key is already in use, either by active tasks or by a stored cron job.
   *
   * @param store Storage accessor used for both lookups.
   * @param jobKey Key of the job to look for.
   * @throws JobExistsException If active tasks or a cron job exist for {@code jobKey}.
   */
  private void checkJobExists(StoreProvider store, IJobKey jobKey) throws JobExistsException {
    boolean hasActiveTasks =
        !Iterables.isEmpty(store.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active()));

    // The cron lookup is only performed when no active tasks were found.
    if (hasActiveTasks || getCronJob(store, jobKey).isPresent()) {
      throw new JobExistsException(jobAlreadyExistsMessage(jobKey));
    }
  }

  /**
   * Shared implementation behind {@code scheduleCronJob} and {@code replaceCronTemplate}.
   *
   * <p>The existing cron job is updated when {@code updateOnly} is set or a cron job is already
   * stored for the key; otherwise a new cron job is created after verifying the key is unused.
   *
   * @param mutableJob Cron job configuration supplied by the client.
   * @param updateOnly Whether to always take the update path.
   * @return An OK response on success; {@code INVALID_REQUEST} for an invalid or non-cron
   *     configuration, an existing job, a failed limit/quota check or a cron error;
   *     {@code JOB_UPDATING_ERROR} when the job is being updated.
   */
  private Response createOrUpdateCronTemplate(
      JobConfiguration mutableJob,
      boolean updateOnly) {

    IJobConfiguration job = IJobConfiguration.build(mutableJob);
    IJobKey jobKey = JobKeys.assertValid(job.getKey());

    SanitizedConfiguration sanitized;
    try {
      sanitized = SanitizedConfiguration.fromUnsanitized(configurationManager, job);
    } catch (TaskDescriptionException e) {
      return error(INVALID_REQUEST, e);
    }

    if (!sanitized.isCron()) {
      return invalidRequest(noCronScheduleMessage(jobKey));
    }

    return storage.write(storeProvider -> {
      try {
        jobUpdateController.assertNotUpdating(jobKey);

        int count = sanitized.getJobConfig().getInstanceCount();
        QuotaCheckResult quotaCheck =
            quotaManager.checkCronUpdate(sanitized.getJobConfig(), storeProvider);
        validateTaskLimits(count, quotaCheck);

        // TODO(mchucarroll): Merge CronJobManager.createJob/updateJob
        boolean createNew = !(updateOnly || getCronJob(storeProvider, jobKey).isPresent());
        if (createNew) {
          checkJobExists(storeProvider, jobKey);
          cronJobManager.createJob(SanitizedCronJob.from(sanitized));
        } else {
          // A schedule is already stored (or an update was requested), so replace it.
          cronJobManager.updateJob(SanitizedCronJob.from(sanitized));
        }
        createOrUpdateCronCounter.addAndGet(count);

        return ok();
      } catch (JobUpdatingException e) {
        return error(JOB_UPDATING_ERROR, e);
      } catch (JobExistsException | TaskValidationException | CronException e) {
        return error(INVALID_REQUEST, e);
      }
    });
  }

  /** Creates the cron job, or updates it when one is already stored for the same key. */
  @Override
  public Response scheduleCronJob(JobConfiguration mutableJob) {
    final boolean updateOnly = false;
    return createOrUpdateCronTemplate(mutableJob, updateOnly);
  }

  /** Replaces a cron job's template by always taking the update path of the shared helper. */
  @Override
  public Response replaceCronTemplate(JobConfiguration mutableJob) {
    final boolean updateOnly = true;
    return createOrUpdateCronTemplate(mutableJob, updateOnly);
  }

  /**
   * Deletes the cron job stored under the given key.
   *
   * @param mutableJobKey Key of the cron job to remove.
   * @return OK when a job was deleted, OK with an explanatory message when the cron manager
   *     reports nothing was deleted, or {@code JOB_UPDATING_ERROR} when the job is updating.
   */
  @Override
  public Response descheduleCronJob(JobKey mutableJobKey) {
    try {
      IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
      jobUpdateController.assertNotUpdating(jobKey);

      boolean deleted = cronJobManager.deleteJob(jobKey);
      return deleted
          ? ok()
          : addMessage(empty(), OK, notScheduledCronMessage(jobKey));
    } catch (JobUpdatingException e) {
      return error(JOB_UPDATING_ERROR, e);
    }
  }

  /** Forwards the request unchanged to the read-only scheduler. */
  @Override
  public Response populateJobConfig(JobConfiguration description) throws TException {
    Response delegated = readOnlyScheduler.populateJobConfig(description);
    return delegated;
  }

  /**
   * Asks the cron manager to start the given cron job immediately.
   *
   * @param mutableJobKey Key of the cron job to start.
   * @return OK on success, or an invalid-request response carrying the cron error message.
   */
  @Override
  public Response startCronJob(JobKey mutableJobKey) {
    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));

    try {
      cronJobManager.startJobNow(jobKey);
    } catch (CronException e) {
      return invalidRequest("Failed to start cron job - " + e.getMessage());
    }
    return ok();
  }

  /** Forwards the task query unchanged to the read-only scheduler. */
  // TODO(William Farner): Provide status information about cron jobs here.
  @ThriftWorkload
  @Override
  public Response getTasksStatus(TaskQuery query) throws TException {
    Response delegated = readOnlyScheduler.getTasksStatus(query);
    return delegated;
  }

  /** Forwards the task query unchanged to the read-only scheduler. */
  @ThriftWorkload
  @Override
  public Response getTasksWithoutConfigs(TaskQuery query) throws TException {
    Response delegated = readOnlyScheduler.getTasksWithoutConfigs(query);
    return delegated;
  }

  /** Forwards the task query unchanged to the read-only scheduler. */
  @ThriftWorkload
  @Override
  public Response getPendingReason(TaskQuery query) throws TException {
    Response delegated = readOnlyScheduler.getPendingReason(query);
    return delegated;
  }

  /** Forwards the job key unchanged to the read-only scheduler. */
  @ThriftWorkload
  @Override
  public Response getConfigSummary(JobKey job) throws TException {
    Response delegated = readOnlyScheduler.getConfigSummary(job);
    return delegated;
  }

  /** Forwards the call to the read-only scheduler. */
  @ThriftWorkload
  @Override
  public Response getRoleSummary() throws TException {
    Response delegated = readOnlyScheduler.getRoleSummary();
    return delegated;
  }

  /** Forwards the (possibly null) role unchanged to the read-only scheduler. */
  @ThriftWorkload
  @Override
  public Response getJobSummary(@Nullable String maybeNullRole) throws TException {
    Response delegated = readOnlyScheduler.getJobSummary(maybeNullRole);
    return delegated;
  }

  /** Forwards the (possibly null) role unchanged to the read-only scheduler. */
  @ThriftWorkload
  @Override
  public Response getJobs(@Nullable String maybeNullRole) throws TException {
    Response delegated = readOnlyScheduler.getJobs(maybeNullRole);
    return delegated;
  }

  /** Forwards the update request unchanged to the read-only scheduler. */
  @Override
  public Response getJobUpdateDiff(JobUpdateRequest request) throws TException {
    Response delegated = readOnlyScheduler.getJobUpdateDiff(request);
    return delegated;
  }

  /** Forwards the call to the read-only scheduler. */
  @Override
  public Response getTierConfigs() throws TException {
    Response delegated = readOnlyScheduler.getTierConfigs();
    return delegated;
  }

  /**
   * Restricts a kill query to active tasks unless the caller already specified statuses.
   *
   * @param query Query built from the kill request.
   * @return {@code query} narrowed to {@code ACTIVE_STATES} when it names no statuses,
   *     otherwise {@code query} itself.
   */
  private static Query.Builder implicitKillQuery(Query.Builder query) {
    if (query.get().getStatuses().isEmpty()) {
      // No explicit statuses: only active tasks are candidates for killing.
      return query.byStatus(ACTIVE_STATES);
    }
    return query;
  }

  /**
   * Moves the matching tasks of a job to {@code KILLING}.
   *
   * <p>When {@code instances} is null or empty the whole job is targeted, otherwise only the
   * listed instances. The kill-tasks counter is advanced by the number of tasks whose state
   * change succeeded.
   *
   * @param mutableJob Key of the job whose tasks should be killed.
   * @param instances Optional instance ids limiting the kill.
   * @param message Optional user message passed to the audit message builder.
   * @return OK when at least one task changed state, OK with a "no tasks" message when none
   *     did, or {@code JOB_UPDATING_ERROR} when the job is being updated.
   */
  @Override
  public Response killTasks(
      @Nullable JobKey mutableJob,
      @Nullable Set<Integer> instances,
      @Nullable String message) {

    Response response = empty();
    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJob));
    Query.Builder query;
    if (instances == null || instances.isEmpty()) {
      query = implicitKillQuery(Query.jobScoped(jobKey));
    } else {
      query = implicitKillQuery(Query.instanceScoped(jobKey, instances));
    }

    return storage.write(storeProvider -> {
      try {
        jobUpdateController.assertNotUpdating(jobKey);
      } catch (JobUpdatingException e) {
        return error(JOB_UPDATING_ERROR, e);
      }

      Iterable<IScheduledTask> tasks = storeProvider.getTaskStore().fetchTasks(query);

      // Parameterized logging avoids building the message when INFO is disabled.
      LOG.info("Killing tasks matching {}", query);

      int tasksKilled = 0;
      for (String taskId : Tasks.ids(tasks)) {
        if (StateChangeResult.SUCCESS == stateManager.changeState(
            storeProvider,
            taskId,
            Optional.empty(),
            ScheduleStatus.KILLING,
            auditMessages.killedByRemoteUser(Optional.ofNullable(message)))) {
          ++tasksKilled;
        }
      }
      killTasksCounter.addAndGet(tasksKilled);

      return tasksKilled > 0
          ? response.setResponseCode(OK)
          : addMessage(response, OK, NO_TASKS_TO_KILL_MESSAGE);
    });
  }

  /**
   * Moves the requested active instances of a job to {@code RESTARTING}.
   *
   * <p>The restart-shards counter is advanced by the number of requested shard ids.
   *
   * @param mutableJobKey Key of the job owning the shards.
   * @param shardIds Instance ids to restart; validated with {@code checkNotBlank}.
   * @return OK on success, an invalid-request response when the number of active matching
   *     tasks differs from the number of requested shards, or {@code JOB_UPDATING_ERROR} when
   *     the job is being updated.
   */
  @Override
  public Response restartShards(JobKey mutableJobKey, Set<Integer> shardIds) {
    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(mutableJobKey));
    checkNotBlank(shardIds);

    return storage.write(storeProvider -> {
      try {
        jobUpdateController.assertNotUpdating(jobKey);
      } catch (JobUpdatingException e) {
        return error(JOB_UPDATING_ERROR, e);
      }

      Query.Builder query = Query.instanceScoped(jobKey, shardIds).active();
      Iterable<IScheduledTask> matchingTasks = storeProvider.getTaskStore().fetchTasks(query);
      if (Iterables.size(matchingTasks) != shardIds.size()) {
        return invalidRequest("Not all requested shards are active.");
      }

      // Parameterized logging avoids building the message when INFO is disabled.
      LOG.info("Restarting shards matching {}", query);
      for (String taskId : Tasks.ids(matchingTasks)) {
        stateManager.changeState(
            storeProvider,
            taskId,
            Optional.empty(),
            ScheduleStatus.RESTARTING,
            auditMessages.restartedByRemoteUser());
      }
      restartShardsCounter.addAndGet(shardIds.size());

      return ok();
    });
  }

  /** Forwards the role unchanged to the read-only scheduler. */
  @Override
  public Response getQuota(String ownerRole) throws TException {
    Response delegated = readOnlyScheduler.getQuota(ownerRole);
    return delegated;
  }

  /**
   * Stores a quota for the given role inside a storage write.
   *
   * @param ownerRole Role receiving the quota; must not be blank.
   * @param resourceAggregate Quota to store; must not be null. It is passed through
   *     {@code ThriftBackfill.backfillResourceAggregate} before being saved.
   * @return OK on success, or {@code INVALID_REQUEST} when the quota manager rejects the quota.
   */
  @Override
  public Response setQuota(String ownerRole, ResourceAggregate resourceAggregate) {
    checkNotBlank(ownerRole);
    requireNonNull(resourceAggregate);

    NoResult<QuotaException> saveWork = store -> quotaManager.saveQuota(
        ownerRole,
        ThriftBackfill.backfillResourceAggregate(resourceAggregate),
        store);

    try {
      storage.write(saveWork);
    } catch (QuotaException e) {
      return error(INVALID_REQUEST, e);
    }
    return ok();
  }

  /**
   * Calls {@code maintenance.startMaintenance} for the given hosts and returns the resulting
   * host statuses. The start-maintenance counter is advanced by the number of host names.
   */
  @Override
  public Response startMaintenance(Hosts hosts) {
    startMaintenanceCounter.addAndGet(hosts.getHostNamesSize());

    StartMaintenanceResult result = new StartMaintenanceResult();
    result.setStatuses(
        IHostStatus.toBuildersSet(maintenance.startMaintenance(hosts.getHostNames())));
    return ok(Result.startMaintenanceResult(result));
  }

  /**
   * Calls {@code maintenance.drain} for the given hosts and returns the resulting host
   * statuses. The drain-hosts counter is advanced by the number of host names.
   */
  @Override
  public Response drainHosts(Hosts hosts) {
    drainHostsCounter.addAndGet(hosts.getHostNamesSize());

    DrainHostsResult result = new DrainHostsResult();
    result.setStatuses(IHostStatus.toBuildersSet(maintenance.drain(hosts.getHostNames())));
    return ok(Result.drainHostsResult(result));
  }

  /**
   * Calls {@code maintenance.getStatus} for the given hosts and returns the reported host
   * statuses. The maintenance-status counter is advanced by the number of host names.
   */
  @Override
  public Response maintenanceStatus(Hosts hosts) {
    maintenanceStatusCounter.addAndGet(hosts.getHostNamesSize());

    MaintenanceStatusResult result = new MaintenanceStatusResult();
    result.setStatuses(IHostStatus.toBuildersSet(maintenance.getStatus(hosts.getHostNames())));
    return ok(Result.maintenanceStatusResult(result));
  }

  /**
   * Calls {@code maintenance.endMaintenance} for the given hosts and returns the resulting
   * host statuses. The end-maintenance counter is advanced by the number of host names.
   */
  @Override
  public Response endMaintenance(Hosts hosts) {
    endMaintenanceCounter.addAndGet(hosts.getHostNamesSize());

    EndMaintenanceResult result = new EndMaintenanceResult();
    result.setStatuses(
        IHostStatus.toBuildersSet(maintenance.endMaintenance(hosts.getHostNames())));
    return ok(Result.endMaintenanceResult(result));
  }

  /**
   * Calls {@code maintenance.slaDrain} with the given hosts, default SLA policy and timeout,
   * and returns the resulting host statuses. The SLA-drain counter is advanced by the number
   * of host names.
   */
  @Override
  public Response slaDrainHosts(Hosts hosts, SlaPolicy defaultSlaPolicy, long timeoutSecs) {
    slaDrainHostsCounter.addAndGet(hosts.getHostNamesSize());

    DrainHostsResult result = new DrainHostsResult();
    result.setStatuses(IHostStatus.toBuildersSet(
        maintenance.slaDrain(hosts.getHostNames(), defaultSlaPolicy, timeoutSecs)));
    return ok(Result.drainHostsResult(result));
  }

  /**
   * Requests a state change of a single task to the given status inside a storage write.
   *
   * <p>The outcome of the state change is not inspected; the response is always OK once the
   * write returns.
   *
   * @param taskId Id of the task to transition; must not be blank.
   * @param status Target status; must not be null.
   */
  @Override
  public Response forceTaskState(String taskId, ScheduleStatus status) {
    checkNotBlank(taskId);
    requireNonNull(status);

    storage.write(txn -> {
      return stateManager.changeState(
          txn,
          taskId,
          Optional.empty(),
          status,
          auditMessages.transitionedBy());
    });

    return ok();
  }

  /** Triggers {@code backup.backupNow()} and answers OK. */
  @Override
  public Response performBackup() {
    backup.backupNow();
    Response acknowledged = ok();
    return acknowledged;
  }

  /** Returns the backups reported by {@code recovery.listBackups()}. */
  @Override
  public Response listBackups() {
    ListBackupsResult result = new ListBackupsResult();
    result.setBackups(recovery.listBackups());
    return ok(Result.listBackupsResult(result));
  }

  /** Passes the backup id to {@code recovery.stage} and answers OK. */
  @Override
  public Response stageRecovery(String backupId) {
    recovery.stage(backupId);
    Response acknowledged = ok();
    return acknowledged;
  }

  /** Runs the task query through {@code recovery.query} and returns the matching tasks. */
  @Override
  public Response queryRecovery(TaskQuery query) {
    QueryRecoveryResult result = new QueryRecoveryResult();
    result.setTasks(IScheduledTask.toBuildersSet(recovery.query(Query.arbitrary(query))));
    return ok(Result.queryRecoveryResult(result));
  }

  /** Passes the task query to {@code recovery.deleteTasks} and answers OK. */
  @Override
  public Response deleteRecoveryTasks(TaskQuery query) {
    recovery.deleteTasks(Query.arbitrary(query));
    Response acknowledged = ok();
    return acknowledged;
  }

  /** Calls {@code recovery.commit()} and answers OK. */
  @Override
  public Response commitRecovery() {
    recovery.commit();
    Response acknowledged = ok();
    return acknowledged;
  }

  /** Calls {@code recovery.unload()} and answers OK. */
  @Override
  public Response unloadRecovery() {
    recovery.unload();
    Response acknowledged = ok();
    return acknowledged;
  }

  /** Calls {@code snapshotStore.snapshot()} and answers OK. */
  @Override
  public Response snapshot() {
    snapshotStore.snapshot();
    Response acknowledged = ok();
    return acknowledged;
  }

  /**
   * Triggers an explicit task reconciliation, optionally with a caller-supplied batch size.
   *
   * @param settings Reconciliation settings; must not be null. A batch size, when set, must be
   *     greater than zero.
   * @return OK on success, or {@code INVALID_REQUEST} when an
   *     {@link IllegalArgumentException} is raised while validating or triggering.
   */
  @Override
  public Response triggerExplicitTaskReconciliation(ExplicitReconciliationSettings settings)
      throws TException {
    try {
      requireNonNull(settings);

      boolean nonPositiveBatch = settings.isSetBatchSize() && settings.getBatchSize() <= 0;
      Preconditions.checkArgument(!nonPositiveBatch, "Batch size must be greater than zero.");

      Optional<Integer> batchSize;
      if (settings.isSetBatchSize()) {
        batchSize = Optional.of(settings.getBatchSize());
      } else {
        batchSize = Optional.empty();
      }

      taskReconciler.triggerExplicitReconciliation(batchSize);
      return ok();
    } catch (IllegalArgumentException e) {
      return error(INVALID_REQUEST, e);
    }
  }

  /** Calls {@code taskReconciler.triggerImplicitReconciliation()} and answers OK. */
  @Override
  public Response triggerImplicitTaskReconciliation() throws TException {
    taskReconciler.triggerImplicitReconciliation();
    Response acknowledged = ok();
    return acknowledged;
  }

  /**
   * Adds {@code count} new pending instances to a job, cloning the task config of the active
   * instance identified by {@code key}.
   *
   * <p>New instance ids continue directly after the highest active instance id of the job.
   *
   * @param key Job key plus the id of the instance whose config is used as the template.
   * @param count Number of instances to add; must be positive.
   * @return OK on success; an invalid-request response for a non-positive count, a cron job, an
   *     unknown template instance, or a failed limit/quota check; {@code JOB_UPDATING_ERROR}
   *     when the job is being updated.
   */
  @Override
  public Response addInstances(InstanceKey key, int count) {
    IJobKey jobKey = JobKeys.assertValid(IJobKey.build(key.getJobKey()));

    if (count <= 0) {
      return invalidRequest(INVALID_INSTANCE_COUNT);
    }

    Response response = empty();
    return storage.write(storeProvider -> {
      try {
        if (getCronJob(storeProvider, jobKey).isPresent()) {
          return invalidRequest("Instances may not be added to cron jobs.");
        }

        jobUpdateController.assertNotUpdating(jobKey);

        List<IScheduledTask> currentTasks = ImmutableList.copyOf(
            storeProvider.getTaskStore().fetchTasks(Query.jobScoped(jobKey).active()));

        // Locate the first active task running the requested template instance.
        IScheduledTask templateTask = null;
        for (IScheduledTask active : currentTasks) {
          if (active.getAssignedTask().getInstanceId() == key.getInstanceId()) {
            templateTask = active;
            break;
          }
        }
        if (templateTask == null) {
          return invalidRequest(INVALID_INSTANCE_ID);
        }

        // currentTasks holds at least the template task here, so a maximum always exists.
        int lastId = currentTasks.stream()
            .map(active -> active.getAssignedTask().getInstanceId())
            .max(Comparator.naturalOrder())
            .get();

        Set<Integer> instanceIds = ContiguousSet.create(
            Range.openClosed(lastId, lastId + count),
            DiscreteDomain.integers());
        int added = instanceIds.size();

        ITaskConfig task = templateTask.getAssignedTask().getTask();
        validateTaskLimits(
            currentTasks.size() + added,
            quotaManager.checkInstanceAddition(task, added, storeProvider));

        stateManager.insertPendingTasks(storeProvider, task, instanceIds);
        addInstancesCounter.addAndGet(added);

        return response.setResponseCode(OK);
      } catch (JobUpdatingException e) {
        return error(JOB_UPDATING_ERROR, e);
      } catch (TaskValidationException | IllegalArgumentException e) {
        return error(INVALID_REQUEST, e);
      }
    });
  }

  /**
   * Looks up the cron job stored under the given key.
   *
   * @param storeProvider Storage accessor providing the cron job store.
   * @param jobKey Key to look up; must not be null.
   * @return The result of the cron job store's {@code fetchJob} for the key.
   */
  private Optional<IJobConfiguration> getCronJob(StoreProvider storeProvider, IJobKey jobKey) {
    IJobKey verifiedKey = requireNonNull(jobKey);
    return storeProvider.getCronJobStore().fetchJob(verifiedKey);
  }

  /** Raised by {@code validateTaskLimits} when an instance-count or quota check fails. */
  private static class TaskValidationException extends Exception {
    TaskValidationException(String reason) {
      super(reason);
    }
  }

  /**
   * Verifies that a job's instance count is within bounds and that the quota check passed.
   *
   * @param totalInstances Total number of instances the job would have.
   * @param quotaCheck Result of the quota check performed by the caller.
   * @throws TaskValidationException If {@code totalInstances} is not between 1 and
   *     {@code thresholds.getMaxTasksPerJob()} inclusive, or the quota check reports
   *     {@code INSUFFICIENT_QUOTA}.
   */
  private void validateTaskLimits(
      int totalInstances,
      QuotaCheckResult quotaCheck) throws TaskValidationException {

    boolean countInRange =
        totalInstances > 0 && totalInstances <= thresholds.getMaxTasksPerJob();
    if (!countInRange) {
      String reason = String.format(
          "Instance count must be between 1 and %d inclusive.", thresholds.getMaxTasksPerJob());
      throw new TaskValidationException(reason);
    }

    if (INSUFFICIENT_QUOTA == quotaCheck.getResult()) {
      String reason = "Insufficient resource quota: " + quotaCheck.getDetails().orElse("");
      throw new TaskValidationException(reason);
    }
  }

  /**
   * Groups instance ids by task config and expresses each group as a set of ranges.
   *
   * @param tasks Task configs keyed by instance id.
   * @return One {@link InstanceTaskConfig} per distinct task config, carrying the ranges
   *     produced by {@code Numbers.toRanges} for the instance ids that use it.
   */
  private static Set<InstanceTaskConfig> buildInitialState(Map<Integer, ITaskConfig> tasks) {
    // Invert instance id -> config into config -> instance ids.
    Multimap<ITaskConfig, Integer> idsByConfig = HashMultimap.create();
    Multimaps.invertFrom(Multimaps.forMap(tasks), idsByConfig);

    // View each config's instance ids as ranges.
    Map<ITaskConfig, Set<Range<Integer>>> rangesByConfig =
        Maps.transformValues(idsByConfig.asMap(), Numbers::toRanges);

    ImmutableSet.Builder<InstanceTaskConfig> initialState = ImmutableSet.builder();
    rangesByConfig.forEach((config, ranges) -> {
      InstanceTaskConfig instanceConfig = new InstanceTaskConfig()
          .setTask(config.newBuilder())
          .setInstances(IRange.toBuildersSet(convertRanges(ranges)));
      initialState.add(instanceConfig);
    });

    return initialState.build();
  }

  @Override
  public Response startJobUpdate(JobUpdateRequest mutableRequest, @Nullable String message) {
    requireNonNull(mutableRequest);

    if (!mutableRequest.getTaskConfig().isIsService()) {
      return invalidRequest(NON_SERVICE_TASK);
    }

    int totalInstancesFromGroups;
    JobUpdateSettings settings = requireNonNull(mutableRequest.getSettings());

    // Gracefully handle a client sending an update with an older thrift schema
    // TODO(rdelvalle): Remove after version 0.22.0 ships
    ThriftBackfill.backfillUpdateStrategy(settings);

    // Keep old job schema in case we want to revert to a lower version of Aurora that doesn't
    // support variable update group sizes
    if (settings.getUpdateStrategy().isSetQueueStrategy()) {
      totalInstancesFromGroups = settings.getUpdateStrategy().getQueueStrategy().getGroupSize();
    } else if (settings.getUpdateStrategy().isSetBatchStrategy()) {
      totalInstancesFromGroups = settings.getUpdateStrategy().getBatchStrategy().getGroupSize();
    } else if (settings.getUpdateStrategy().isSetVarBatchStrategy()) {
      VariableBatchJobUpdateStrategy strategy = settings.getUpdateStrategy().getVarBatchStrategy();

      if (strategy.getGroupSizes().stream().anyMatch(x -> x <= 0)) {
        return invalidRequest(INVALID_GROUP_SIZE);
      }

      totalInstancesFromGroups = strategy.getGroupSizes().stream().reduce(0, Integer::sum);
    } else {
      return invalidRequest(UNKNOWN_UPDATE_STRATEGY);
    }

    if (totalInstancesFromGroups <= 0) {
      return invalidRequest(NO_INSTANCES_MODIFIED);
    }

    if (settings.getMaxPerInstanceFailures() < 0) {
      return invalidRequest(INVALID_MAX_INSTANCE_FAILURES);
    }

    if (settings.getMaxFailedInstances() < 0) {
      return invalidRequest(INVALID_MAX_FAILED_INSTANCES);
    }

    if (settings.getMaxPerInstanceFailures() * mutableRequest.getInstanceCount()
            > thresholds.getMaxUpdateInstanceFailures()) {
      return invalidRequest(TOO_MANY_POTENTIAL_FAILED_INSTANCES);
    }

    if (settings.getMinWaitInInstanceRunningMs() < 0) {
      return invalidRequest(INVALID_MIN_WAIT_TO_RUNNING);
    }

    if (settings.getBlockIfNoPulsesAfterMs() < 0) {
      return invalidRequest(INVALID_PULSE_TIMEOUT);
    }

    if (settings.isSlaAware() && !mutableRequest.getTaskConfig().isSetSlaPolicy()) {
      return invalidRequest(INVALID_SLA_AWARE_UPDATE);
    }

    IJobUpdateRequest request;
    try {
      request = IJobUpdateRequest.build(
          new JobUpdateRequest(mutableRequest)
              .setTaskConfig(configurationManager.validateAndPopulate(
                  ITaskConfig.build(mutableRequest.getTaskConfig()),
                  mutableRequest.getInstanceCount())
              .newBuilder()));
    } catch (TaskDescriptionException e) {
      return error(INVALID_REQUEST, e);
    }

    return storage.write(storeProvider -> {
      IJobKey job = request.getTaskConfig().getJob();
      if (getCronJob(storeProvider, job).isPresent()) {
        return invalidRequest(NO_CRON);
      }

      String updateId = uuidGenerator.createNew().toString();
      IJobUpdateSettings settings1 = request.getSettings();

      JobDiff diff = JobDiff.compute(
          storeProvider.getTaskStore(),
          job,
          JobDiff.asMap(request.getTaskConfig(), request.getInstanceCount()),
          settings1.getUpdateOnlyTheseInstances());

      Set<Integer> invalidScope = diff.getOutOfScopeInstances(
          Numbers.rangesToInstanceIds(settings1.getUpdateOnlyTheseInstances()));
      if (!invalidScope.isEmpty()) {
        return invalidRequest(
            "The update request attempted to update specific instances,"
                + " but some are irrelevant to the update and current job state: "
                + invalidScope);
      }

      if (diff.isNoop()) {
        return addMessage(empty(), OK, NOOP_JOB_UPDATE_MESSAGE);
      }

      JobUpdateInstructions instructions = new JobUpdateInstructions()
          .setSettings(settings1.newBuilder())
          .setInitialState(buildInitialState(diff.getReplacedInstances()));

      Set<Integer> replacements = diff.getReplacementInstances();
      if (!replacements.isEmpty()) {
        instructions.setDesiredState(
            new InstanceTaskConfig()
                .setTask(request.getTaskConfig().newBuilder())
                .setInstances(IRange.toBuildersSet(convertRanges(toRanges(replacements)))));
      }

      String remoteUserName = auditMessages.getRemoteUserName();
      IJobUpdate update = IJobUpdate.build(new JobUpdate()
          .setSummary(new JobUpdateSummary()
              .setKey(new JobUpdateKey(job.newBuilder(), updateId))
              .setUser(remoteUserName)
              .setMetadata(IMetadata.toBuildersSet(request.getMetadata())))
          .setInstructions(instructions));

      Response response = empty();
      try {
        validateTaskLimits(
            request.getInstanceCount(),
            quotaManager.checkJobUpdate(update, storeProvider));

        jobUpdateController.start(
            update,
            new AuditData(remoteUserName, Optional.ofNullable(message)));
        startJobUpdateCounter.addAndGet(request.getInstanceCount());
        return response.setResponseCode(OK)
            .setResult(Result.startJobUpdateResult(
                new StartJobUpdateResult(update.getSummary().getKey().newBuilder())
                    .setUpdateSummary(update.getSummary().newBuilder())));
      } catch (UpdateInProgressException e) {
        return error(INVALID_REQUEST, e)
            .setResult(Result.startJobUpdateResult(
                new StartJobUpdateResult(e.getInProgressUpdateSummary().getKey().newBuilder())
                    .setUpdateSummary(e.getInProgressUpdateSummary().newBuilder())));
      } catch (UpdateStateException | TaskValidationException e) {
        return error(INVALID_REQUEST, e);
      }
    });
  }

  /**
   * Applies a state transition to a job update from within a storage write operation.
   *
   * <p>The mutable key is converted to its immutable form and its job key is asserted valid
   * before the write begins.
   *
   * @param mutableKey Key of the job update to act on.
   * @param change Operation to invoke on the job update controller.
   * @param message Optional message passed along in the audit data.
   * @return An OK response when the operation completes, or an {@code INVALID_REQUEST} error
   *     response when the operation throws {@link UpdateStateException}.
   */
  private Response changeJobUpdateState(
      JobUpdateKey mutableKey,
      JobUpdateStateChange change,
      Optional<String> message) {

    IJobUpdateKey updateKey = IJobUpdateKey.build(mutableKey);
    JobKeys.assertValid(updateKey.getJob());

    return storage.write(storeProvider -> {
      AuditData auditData = new AuditData(auditMessages.getRemoteUserName(), message);
      try {
        change.modifyUpdate(jobUpdateController, updateKey, auditData);
      } catch (UpdateStateException e) {
        return error(INVALID_REQUEST, e);
      }
      return ok();
    });
  }

  /**
   * A single state-changing operation on a job update, supplied to
   * {@code changeJobUpdateState} as a method reference on {@link JobUpdateController}
   * (for example {@code JobUpdateController::pause}).
   *
   * <p>Marked as a functional interface so the compiler rejects any change that would break
   * its use as a lambda or method reference target.
   */
  @FunctionalInterface
  private interface JobUpdateStateChange {
    /**
     * Performs the state change.
     *
     * @param controller Controller on which the operation is invoked.
     * @param key Key of the job update to modify.
     * @param auditData Audit data to associate with the change.
     * @throws UpdateStateException If the operation reports a failure.
     */
    void modifyUpdate(JobUpdateController controller, IJobUpdateKey key, AuditData auditData)
        throws UpdateStateException;
  }

  /**
   * Invokes {@link JobUpdateController#pause} for the given update key via
   * {@code changeJobUpdateState}.
   *
   * @param mutableKey Key of the job update to pause.
   * @param message Optional audit message; may be {@code null}.
   * @return The response produced by {@code changeJobUpdateState}.
   */
  @Override
  public Response pauseJobUpdate(JobUpdateKey mutableKey, @Nullable String message) {
    Optional<String> auditMessage = Optional.ofNullable(message);
    return changeJobUpdateState(mutableKey, JobUpdateController::pause, auditMessage);
  }

  /**
   * Invokes {@link JobUpdateController#resume} for the given update key via
   * {@code changeJobUpdateState}.
   *
   * @param mutableKey Key of the job update to resume.
   * @param message Optional audit message; may be {@code null}.
   * @return The response produced by {@code changeJobUpdateState}.
   */
  @Override
  public Response resumeJobUpdate(JobUpdateKey mutableKey, @Nullable String message) {
    Optional<String> auditMessage = Optional.ofNullable(message);
    return changeJobUpdateState(mutableKey, JobUpdateController::resume, auditMessage);
  }

  /**
   * Invokes {@link JobUpdateController#abort} for the given update key via
   * {@code changeJobUpdateState}.
   *
   * @param mutableKey Key of the job update to abort.
   * @param message Optional audit message; may be {@code null}.
   * @return The response produced by {@code changeJobUpdateState}.
   */
  @Override
  public Response abortJobUpdate(JobUpdateKey mutableKey, @Nullable String message) {
    Optional<String> auditMessage = Optional.ofNullable(message);
    return changeJobUpdateState(mutableKey, JobUpdateController::abort, auditMessage);
  }

  /**
   * Invokes {@link JobUpdateController#rollback} for the given update key via
   * {@code changeJobUpdateState}.
   *
   * @param mutableKey Key of the job update to roll back.
   * @param message Optional audit message; may be {@code null}.
   * @return The response produced by {@code changeJobUpdateState}.
   */
  @Override
  public Response rollbackJobUpdate(JobUpdateKey mutableKey, @Nullable String message) {
    Optional<String> auditMessage = Optional.ofNullable(message);
    return changeJobUpdateState(mutableKey, JobUpdateController::rollback, auditMessage);
  }

  /**
   * Sends a pulse to the job update controller for the given update.
   *
   * @param mutableUpdateKey Key of the job update to pulse; validated before use.
   * @return An OK response wrapping the pulse status returned by the controller, or an
   *     {@code INVALID_REQUEST} error response when the controller throws
   *     {@link UpdateStateException}.
   */
  @Override
  public Response pulseJobUpdate(JobUpdateKey mutableUpdateKey) {
    IJobUpdateKey updateKey = validateJobUpdateKey(mutableUpdateKey);

    JobUpdatePulseStatus pulseStatus;
    try {
      pulseStatus = jobUpdateController.pulse(updateKey);
    } catch (UpdateStateException e) {
      return error(INVALID_REQUEST, e);
    }

    PulseJobUpdateResult pulseResult = new PulseJobUpdateResult(pulseStatus);
    return ok(Result.pulseJobUpdateResult(pulseResult));
  }

  /**
   * Deletes the tasks matched by {@code query}.
   *
   * <p>A query that names any status contained in {@code ACTIVE_STATES} is rejected. When the
   * query sets no statuses at all, it is modified in place to select {@code TERMINAL_STATES}.
   * Matching tasks are read first, then deleted in a separate storage write, and the prune
   * counter is advanced by the number of distinct task IDs handed to the state manager.
   *
   * @param query Task query selecting the tasks to prune; may be mutated as described above.
   * @return An error response for a query naming active statuses, otherwise an OK response.
   */
  @Override
  public Response pruneTasks(TaskQuery query) throws TException {
    if (!query.isSetStatuses()) {
      query.setStatuses(TERMINAL_STATES);
    } else if (query.getStatuses().stream().anyMatch(ACTIVE_STATES::contains)) {
      return error("Tasks in non-terminal state cannot be pruned.");
    }

    Iterable<IScheduledTask> matched = storage.read(storeProvider ->
        storeProvider.getTaskStore().fetchTasks(Query.arbitrary(query)));

    // For some reason fetchTasks ignores the offset/limit options of a TaskQuery. So we have to
    // manually apply the limit here. To be fixed in AURORA-1892.
    Iterable<IScheduledTask> toPrune = query.isSetLimit()
        ? Iterables.limit(matched, query.getLimit())
        : matched;

    Iterable<String> idsToPrune = Iterables.transform(
        toPrune,
        task -> task.getAssignedTask().getTaskId());

    return storage.write(storeProvider -> {
      // The lazy ID view is only materialized here, inside the write operation.
      Set<String> prunedIds = Sets.newHashSet(idsToPrune);
      stateManager.deleteTasks(storeProvider, prunedIds);
      pruneTasksCounter.addAndGet(prunedIds.size());
      return ok();
    });
  }

  /**
   * Delegates the job update summaries query to the read-only scheduler.
   *
   * @param mutableQuery Query passed through unchanged.
   * @return The response produced by the read-only scheduler.
   */
  @ThriftWorkload
  @Override
  public Response getJobUpdateSummaries(JobUpdateQuery mutableQuery) throws TException {
    Response summaries = readOnlyScheduler.getJobUpdateSummaries(mutableQuery);
    return summaries;
  }

  /**
   * Delegates the job update details query to the read-only scheduler.
   *
   * @param key Job update key passed through unchanged.
   * @param query Job update query passed through unchanged.
   * @return The response produced by the read-only scheduler.
   */
  @ThriftWorkload
  @Override
  public Response getJobUpdateDetails(JobUpdateKey key, JobUpdateQuery query) throws TException {
    Response details = readOnlyScheduler.getJobUpdateDetails(key, query);
    return details;
  }

  /**
   * Converts a mutable job update key to its immutable form and validates it.
   *
   * <p>The job key is checked with {@code JobKeys.assertValid} and the update ID with
   * {@code checkNotBlank}.
   *
   * @param mutableKey Key to convert and validate.
   * @return The immutable key built from {@code mutableKey}.
   */
  private static IJobUpdateKey validateJobUpdateKey(JobUpdateKey mutableKey) {
    IJobUpdateKey immutableKey = IJobUpdateKey.build(mutableKey);
    IJobKey jobKey = immutableKey.getJob();
    JobKeys.assertValid(jobKey);
    String updateId = immutableKey.getId();
    checkNotBlank(updateId);
    return immutableKey;
  }

  /**
   * Builds the message stating that a job has no cron schedule.
   *
   * @param jobKey Job whose canonical string form is embedded in the message.
   * @return The formatted message.
   */
  @VisibleForTesting
  static String noCronScheduleMessage(IJobKey jobKey) {
    String canonicalKey = JobKeys.canonicalString(jobKey);
    return String.format("Job %s has no cron schedule", canonicalKey);
  }

  /**
   * Builds the message stating that a job is not scheduled with cron.
   *
   * @param jobKey Job whose canonical string form is embedded in the message.
   * @return The formatted message.
   */
  @VisibleForTesting
  static String notScheduledCronMessage(IJobKey jobKey) {
    String canonicalKey = JobKeys.canonicalString(jobKey);
    return String.format("Job %s is not scheduled with cron", canonicalKey);
  }

  /**
   * Builds the message stating that a job already exists.
   *
   * @param jobKey Job whose canonical string form is embedded in the message.
   * @return The formatted message.
   */
  @VisibleForTesting
  static String jobAlreadyExistsMessage(IJobKey jobKey) {
    String canonicalKey = JobKeys.canonicalString(jobKey);
    return String.format("Job %s already exists", canonicalKey);
  }

  /** Message text for the case where there are no tasks to kill. */
  @VisibleForTesting
  static final String NO_TASKS_TO_KILL_MESSAGE = "No tasks to kill.";

  /** Message attached to an OK response when the computed job diff is a no-op. */
  @VisibleForTesting
  static final String NOOP_JOB_UPDATE_MESSAGE = "Job is unchanged by proposed update.";

  /** Rejection message used when a job update targets a job that has a cron entry. */
  @VisibleForTesting
  static final String NO_CRON = "Cron jobs may only be created/updated by calling scheduleCronJob.";

  /**
   * Rejection message used when the instance total derived from the update strategy's group
   * size(s) is not positive.
   */
  @VisibleForTesting
  static final String NO_INSTANCES_MODIFIED = "Update results in no instance being modified.";

  /** Message text for update requests on non-service tasks. */
  @VisibleForTesting
  static final String NON_SERVICE_TASK = "Updates are not supported for non-service tasks.";

  /** Rejection message used when an update strategy contains a group size of zero or less. */
  @VisibleForTesting
  static final String INVALID_GROUP_SIZE = "All update group sizes must be positive.";

  /** Rejection message used when {@code maxFailedInstances} is negative. */
  @VisibleForTesting
  static final String INVALID_MAX_FAILED_INSTANCES = "maxFailedInstances must be non-negative.";

  /**
   * Rejection message used when {@code maxPerInstanceFailures} multiplied by the requested
   * instance count exceeds the configured maximum of update instance failures.
   */
  @VisibleForTesting
  static final String TOO_MANY_POTENTIAL_FAILED_INSTANCES = "Your update allows too many failures "
      + "to occur, consider decreasing the per-instance failures or maxFailedInstances.";

  /** Rejection message used when {@code maxPerInstanceFailures} is negative. */
  @VisibleForTesting
  static final String INVALID_MAX_INSTANCE_FAILURES
      = "maxPerInstanceFailures must be non-negative.";

  /** Rejection message used when {@code minWaitInInstanceRunningMs} is negative. */
  @VisibleForTesting
  static final String INVALID_MIN_WAIT_TO_RUNNING =
      "minWaitInInstanceRunningMs must be non-negative.";

  /**
   * Rejection message used when {@code blockIfNoPulsesAfterMs} fails validation.
   *
   * <p>NOTE(review): the job update request validation in this class rejects only negative
   * values, so zero is accepted even though this text says "positive" - confirm which of the
   * two is intended.
   */
  @VisibleForTesting
  static final String INVALID_PULSE_TIMEOUT = "blockIfNoPulsesAfterMs must be positive.";

  /** Message text for an instance ID that has no active task. */
  @VisibleForTesting
  static final String INVALID_INSTANCE_ID = "No active task found for a given instance ID.";

  /** Message text for a non-positive instance count. */
  @VisibleForTesting
  static final String INVALID_INSTANCE_COUNT = "Instance count must be positive.";

  /** Rejection message used when none of the recognized update strategies is set. */
  @VisibleForTesting
  static final String UNKNOWN_UPDATE_STRATEGY = "Update strategy provided is unknown.";

  /**
   * Rejection message used when update settings are SLA-aware but the task config carries no
   * SLA policy.
   */
  @VisibleForTesting
  static final String INVALID_SLA_AWARE_UPDATE = "slaAware is true, but no task slaPolicy is "
      + "specified.";
}
