/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.ambari.infra.manager;

import com.google.common.collect.Lists;
import org.apache.ambari.infra.model.ExecutionContextResponse;
import org.apache.ambari.infra.model.JobDetailsResponse;
import org.apache.ambari.infra.model.JobExecutionDetailsResponse;
import org.apache.ambari.infra.model.JobExecutionInfoResponse;
import org.apache.ambari.infra.model.JobInstanceDetailsResponse;
import org.apache.ambari.infra.model.JobOperationParams;
import org.apache.ambari.infra.model.StepExecutionContextResponse;
import org.apache.ambari.infra.model.StepExecutionInfoResponse;
import org.apache.ambari.infra.model.StepExecutionProgressResponse;
import org.springframework.batch.admin.history.StepExecutionHistory;
import org.springframework.batch.admin.service.JobService;
import org.springframework.batch.admin.service.NoSuchStepExecutionException;
import org.springframework.batch.admin.web.JobInfo;
import org.springframework.batch.admin.web.StepExecutionProgress;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.batch.core.launch.NoSuchJobInstanceException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;

import javax.inject.Inject;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;

@Named
public class JobManager {

  @Inject
  private JobService jobService;

  @Inject
  private JobOperator jobOperator;

  private TimeZone timeZone = TimeZone.getDefault();

  public Set<String> getAllJobNames() {
    return jobOperator.getJobNames();
  }

  /**
   * Launch a new job instance (based on job name) and applies customized parameters to it.
   * Also add a new date parameter to make sure the job instance will be unique
   */
  public JobExecutionInfoResponse launchJob(String jobName, String params)
    throws JobParametersInvalidException, JobInstanceAlreadyExistsException, NoSuchJobException,
    JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
    // TODO: handle params
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addDate("date", new Date());
    return new JobExecutionInfoResponse(jobService.launch(jobName, jobParametersBuilder.toJobParameters()), timeZone);
  }

  /**
   * Get all executions ids that mapped to specific job name,
   */
  public Set<Long> getExecutionIdsByJobName(String jobName) throws NoSuchJobException {
    return jobOperator.getRunningExecutions(jobName);
  }

  /**
   * Stop all running job executions and returns with the number of stopped jobs.
   */
  public Integer stopAllJobs() {
    return jobService.stopAll();
  }

  /**
   * Gather job execution details by job execution id.
   */
  public JobExecutionDetailsResponse getExectionInfo(Long jobExecutionId) throws NoSuchJobExecutionException {
    JobExecution jobExecution = jobService.getJobExecution(jobExecutionId);
    List<StepExecutionInfoResponse> stepExecutionInfos = new ArrayList<StepExecutionInfoResponse>();
    for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
      stepExecutionInfos.add(new StepExecutionInfoResponse(stepExecution, timeZone));
    }
    Collections.sort(stepExecutionInfos, new Comparator<StepExecutionInfoResponse>() {
      @Override
      public int compare(StepExecutionInfoResponse o1, StepExecutionInfoResponse o2) {
        return o1.getId().compareTo(o2.getId());
      }
    });
    return new JobExecutionDetailsResponse(new JobExecutionInfoResponse(jobExecution, timeZone), stepExecutionInfos);
  }

  /**
   * Stop or abandon a running job execution by job execution id
   */
  public JobExecutionInfoResponse stopOrAbandonJobByExecutionId(Long jobExecutionId, JobOperationParams.JobStopOrAbandonOperationParam operation)
    throws NoSuchJobExecutionException, JobExecutionNotRunningException, JobExecutionAlreadyRunningException {
    JobExecution jobExecution;
    if (JobOperationParams.JobStopOrAbandonOperationParam.STOP.equals(operation)) {
      jobExecution = jobService.stop(jobExecutionId);
    } else if (JobOperationParams.JobStopOrAbandonOperationParam.ABANDON.equals(operation)) {
      jobExecution = jobService.abandon(jobExecutionId);
    } else {
      throw new UnsupportedOperationException("Unsupported operaration");
    }
    return new JobExecutionInfoResponse(jobExecution, timeZone);
  }

  /**
   * Get execution context for a job execution instance. (context can be shipped between job executions)
   */
  public ExecutionContextResponse getExecutionContextByJobExecutionId(Long executionId) throws NoSuchJobExecutionException {
    JobExecution jobExecution = jobService.getJobExecution(executionId);
    Map<String, Object> executionMap = new HashMap<>();
    for (Map.Entry<String, Object> entry : jobExecution.getExecutionContext().entrySet()) {
      executionMap.put(entry.getKey(), entry.getValue());
    }
    return new ExecutionContextResponse(executionId, executionMap);
  }

  /**
   * Restart a specific job instance with the same parameters. (only restart operation is supported here)
   */
  public JobExecutionInfoResponse restart(Long jobInstanceId, String jobName,
                                          JobOperationParams.JobRestartOperationParam operation) throws NoSuchJobException, JobParametersInvalidException,
    JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, NoSuchJobExecutionException {
    if (JobOperationParams.JobRestartOperationParam.RESTART.equals(operation)) {
      Collection<JobExecution> jobExecutions = jobService.getJobExecutionsForJobInstance(jobName, jobInstanceId);
      JobExecution jobExecution = jobExecutions.iterator().next();
      Long jobExecutionId = jobExecution.getId();
      return new JobExecutionInfoResponse(jobService.restart(jobExecutionId), timeZone);
    } else {
      throw new UnsupportedOperationException("Unsupported operation (try: RESTART)");
    }
  }

  /**
   * Get all job details. (paged)
   */
  public List<JobInfo> getAllJobs(int start, int pageSize) {
    List<JobInfo> jobs = new ArrayList<>();
    Collection<String> names = jobService.listJobs(start, pageSize);
    for (String name : names) {
      int count = 0;
      try {
        count = jobService.countJobExecutionsForJob(name);
      }
      catch (NoSuchJobException e) {
        // shouldn't happen
      }
      boolean launchable = jobService.isLaunchable(name);
      boolean incrementable = jobService.isIncrementable(name);
      jobs.add(new JobInfo(name, count, null, launchable, incrementable));
    }
    return jobs;
  }

  /**
   * Get all executions for unique job instance.
   */
  public List<JobExecutionInfoResponse> getExecutionsForJobInstance(String jobName, Long jobInstanceId) throws NoSuchJobInstanceException, NoSuchJobException {
    List<JobExecutionInfoResponse> result = Lists.newArrayList();
    JobInstance jobInstance = jobService.getJobInstance(jobInstanceId);
    Collection<JobExecution> jobExecutions = jobService.getJobExecutionsForJobInstance(jobName, jobInstance.getInstanceId());
    for (JobExecution jobExecution : jobExecutions) {
      result.add(new JobExecutionInfoResponse(jobExecution, timeZone));
    }
    return result;
  }

  /**
   * Get job details for a specific job. (paged)
   */
  public JobDetailsResponse getJobDetails(String jobName, int page, int size) throws NoSuchJobException {
    List<JobInstanceDetailsResponse> jobInstanceResponses = Lists.newArrayList();
    Collection<JobInstance> jobInstances = jobService.listJobInstances(jobName, page, size);

    int count = jobService.countJobExecutionsForJob(jobName);
    boolean launchable = jobService.isLaunchable(jobName);
    boolean isIncrementable = jobService.isIncrementable(jobName);

    for (JobInstance jobInstance: jobInstances) {
      List<JobExecutionInfoResponse> executionInfos = Lists.newArrayList();
      Collection<JobExecution> jobExecutions = jobService.getJobExecutionsForJobInstance(jobName, jobInstance.getId());
      if (jobExecutions != null) {
        for (JobExecution jobExecution : jobExecutions) {
          executionInfos.add(new JobExecutionInfoResponse(jobExecution, timeZone));
        }
      }
      jobInstanceResponses.add(new JobInstanceDetailsResponse(jobInstance, executionInfos));
    }
    return new JobDetailsResponse(new JobInfo(jobName, count, launchable, isIncrementable), jobInstanceResponses);
  }

  /**
   * Get step execution details based for job execution id and step execution id.
   */
  public StepExecutionInfoResponse getStepExecution(Long jobExecutionId, Long stepExecutionId) throws NoSuchStepExecutionException, NoSuchJobExecutionException {
    StepExecution stepExecution = jobService.getStepExecution(jobExecutionId, stepExecutionId);
    return new StepExecutionInfoResponse(stepExecution, timeZone);
  }

  /**
   * Get step execution context details. (execution context can be shipped between steps)
   */
  public StepExecutionContextResponse getStepExecutionContext(Long jobExecutionId, Long stepExecutionId) throws NoSuchStepExecutionException, NoSuchJobExecutionException {
    StepExecution stepExecution = jobService.getStepExecution(jobExecutionId, stepExecutionId);
    Map<String, Object> executionMap = new HashMap<>();
    for (Map.Entry<String, Object> entry : stepExecution.getExecutionContext().entrySet()) {
      executionMap.put(entry.getKey(), entry.getValue());
    }
    return new StepExecutionContextResponse(executionMap, jobExecutionId, stepExecutionId, stepExecution.getStepName());
  }

  /**
   * Get step execution progress status detauls.
   */
  public StepExecutionProgressResponse getStepExecutionProgress(Long jobExecutionId, Long stepExecutionId) throws NoSuchStepExecutionException, NoSuchJobExecutionException {
    StepExecution stepExecution = jobService.getStepExecution(jobExecutionId, stepExecutionId);
    StepExecutionInfoResponse stepExecutionInfoResponse = new StepExecutionInfoResponse(stepExecution, timeZone);
    String stepName = stepExecution.getStepName();
    if (stepName.contains(":partition")) {
      stepName = stepName.replaceAll("(:partition).*", "$1*");
    }
    String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
    StepExecutionHistory stepExecutionHistory = computeHistory(jobName, stepName);
    StepExecutionProgress stepExecutionProgress = new StepExecutionProgress(stepExecution, stepExecutionHistory);

    return new StepExecutionProgressResponse(stepExecutionProgress, stepExecutionHistory, stepExecutionInfoResponse);

  }

  private StepExecutionHistory computeHistory(String jobName, String stepName) {
    int total = jobService.countStepExecutionsForStep(jobName, stepName);
    StepExecutionHistory stepExecutionHistory = new StepExecutionHistory(stepName);
    for (int i = 0; i < total; i += 1000) {
      for (StepExecution stepExecution : jobService.listStepExecutionsForStep(jobName, stepName, i, 1000)) {
        stepExecutionHistory.append(stepExecution);
      }
    }
    return stepExecutionHistory;
  }
}
