/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.ambari.view;

import static org.apache.oozie.ambari.view.Constants.MESSAGE_KEY;
import static org.apache.oozie.ambari.view.Constants.STATUS_KEY;
import static org.apache.oozie.ambari.view.Constants.STATUS_OK;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;

import org.apache.ambari.view.ViewContext;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.oozie.ambari.view.assets.AssetResource;
import org.apache.oozie.ambari.view.exception.ErrorCode;
import org.apache.oozie.ambari.view.exception.WfmException;
import org.apache.oozie.ambari.view.exception.WfmWebException;
import org.apache.oozie.ambari.view.workflowmanager.WorkflowManagerService;
import org.apache.oozie.ambari.view.workflowmanager.WorkflowsManagerResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Singleton;

/**
 * JAX-RS resource that bridges the communication between the Ambari view
 * front end and the Oozie API executing inside Ambari.
 * <p>
 * Besides the endpoints dedicated to HDFS checks, workflow/draft/asset files
 * and job submission, every request that matches no dedicated path is
 * forwarded to Oozie through {@link OozieDelegate} by the catch-all
 * {@code handleGet}/{@code handlePost}/{@code handlePut}/{@code handleDelete}
 * methods.
 * <p>
 * Failures are logged and reported to the caller as {@link WfmWebException}.
 */
@Singleton
public class OozieProxyImpersonator {
  private static final Logger LOGGER = LoggerFactory
    .getLogger(OozieProxyImpersonator.class);
  private static final boolean PROJ_MANAGER_ENABLED = true;
  /** Response header naming the format ("xml" or "draft") of a workflow returned by readWorkflow. */
  public static final String RESPONSE_TYPE = "response-type";
  /** Response header set to "true" when a draft in an older format was skipped in favour of the XML. */
  public static final String OLDER_FORMAT_DRAFT_INGORED = "olderFormatDraftIngored";

  private final ViewContext viewContext;
  private final Utils utils = new Utils();

  private final HDFSFileUtils hdfsFileUtils;
  private final WorkflowFilesService workflowFilesService;
  // Only initialised when PROJ_MANAGER_ENABLED is true, hence not final.
  private WorkflowManagerService workflowManagerService;

  private final OozieDelegate oozieDelegate;
  private final OozieUtils oozieUtils = new OozieUtils();
  private final AssetResource assetResource;

  /** Values sent in the {@link #RESPONSE_TYPE} header. */
  private enum WorkflowFormat {
    XML("xml"),
    DRAFT("draft");

    private final String value;

    WorkflowFormat(String value) {
      this.value = value;
    }

    public String getValue() {
      return value;
    }
  }

  /**
   * Creates the resource and the helper services it delegates to.
   *
   * @param viewContext context of the view instance this resource serves
   */
  @Inject
  public OozieProxyImpersonator(ViewContext viewContext) {
    this.viewContext = viewContext;
    hdfsFileUtils = new HDFSFileUtils(viewContext);
    workflowFilesService = new WorkflowFilesService(hdfsFileUtils);
    this.oozieDelegate = new OozieDelegate(viewContext);
    assetResource = new AssetResource(viewContext);
    if (PROJ_MANAGER_ENABLED) {
      workflowManagerService = new WorkflowManagerService(viewContext);
    }

    LOGGER.info("OozieProxyImpersonator initialized for instance: {}",
      viewContext.getInstanceName());
  }

  /**
   * Runs {@code HDFSFileUtils.hdfsCheck()}.
   *
   * @return an empty 200 response when the check does not throw
   * @throws WfmWebException wrapping any exception raised by the check
   */
  @GET
  @Path("hdfsCheck")
  public Response hdfsCheck() {
    try {
      hdfsFileUtils.hdfsCheck();
      return Response.ok().build();
    } catch (Exception ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw new WfmWebException(ex);
    }
  }

  /**
   * Runs {@code HDFSFileUtils.homeDirCheck()}.
   *
   * @return an empty 200 response when the check does not throw
   * @throws WfmWebException wrapping any exception raised by the check
   */
  @GET
  @Path("homeDirCheck")
  public Response homeDirCheck() {
    try {
      hdfsFileUtils.homeDirCheck();
      return Response.ok().build();
    } catch (Exception ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw new WfmWebException(ex);
    }
  }

  /**
   * Sub-resource locator for file services.
   *
   * @return a new {@link FileServices} bound to this view's context
   */
  @Path("/fileServices")
  public FileServices fileServices() {
    return new FileServices(viewContext);
  }

  /**
   * Sub-resource locator for workflow projects.
   *
   * @return a new {@link WorkflowsManagerResource} bound to this view's context
   */
  @Path("/wfprojects")
  public WorkflowsManagerResource workflowsManagerResource() {
    return new WorkflowsManagerResource(viewContext);
  }

  /**
   * Sub-resource locator for assets.
   *
   * @return the {@link AssetResource} created together with this resource
   */
  @Path("/assets")
  public AssetResource assetResource() {
    return this.assetResource;
  }

  /**
   * @return a 200 response whose entity is {@code viewContext.getUsername()}
   */
  @GET
  @Path("/getCurrentUserName")
  public Response getCurrentUserName() {
    return Response.ok(viewContext.getUsername()).build();
  }

  /**
   * Returns the configuration values the client needs: {@code nameNode},
   * {@code resourceManager}, {@code userName} and {@code checkHomeDir}.
   *
   * @return a 200 response whose entity is the configuration map
   * @throws WfmWebException wrapping any exception raised while collecting the values
   */
  @GET
  @Path("/getWorkflowManagerConfigs")
  public Response getWorkflowConfigs() {
    try {
      Map<String, String> workflowConfigs = new HashMap<String, String>();
      workflowConfigs.put("nameNode", viewContext.getProperties().get("webhdfs.url"));
      workflowConfigs.put("resourceManager", viewContext.getProperties().get("yarn.resourcemanager.address"));
      workflowConfigs.put("userName", viewContext.getUsername());
      workflowConfigs.put("checkHomeDir", hdfsFileUtils.shouldCheckForHomeDir().toString());
      return Response.ok(workflowConfigs).build();
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      throw new WfmWebException(e);
    }
  }

  /**
   * Writes the posted XML to the resolved application path, records it with
   * the workflow manager service and submits the job to Oozie.
   *
   * @param postBody      job definition XML
   * @param headers       request headers, forwarded to Oozie
   * @param ui            request URI info; its query parameters are forwarded to Oozie
   * @param appPath       target application path; must not be empty
   * @param projectId     project identifier passed to the workflow manager service
   * @param overwrite     whether an existing file at the target path may be replaced
   * @param description   accepted for interface compatibility; not used here
   * @param jobTypeString name of a {@link JobType}; {@code WORKFLOW} is assumed when empty
   * @return a 200 response whose entity is the string returned by the Oozie delegate
   * @throws WfmWebException if {@code appPath} is empty, the target exists and
   *                         {@code overwrite} is false, or any step fails
   */
  @POST
  @Path("/submitJob")
  @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
  public Response submitJob(String postBody, @Context HttpHeaders headers,
                            @Context UriInfo ui, @QueryParam("app.path") String appPath,
                            @QueryParam("projectId") String projectId,
                            @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
                            @QueryParam("description") String description,
                            @QueryParam("jobType") String jobTypeString) {
    LOGGER.info("submit workflow job called");
    JobType jobType = resolveJobType(jobTypeString);
    if (StringUtils.isEmpty(appPath)) {
      throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
    }
    appPath = workflowFilesService.getWorkflowFileName(appPath.trim(), jobType);
    try {
      if (!overwrite && hdfsFileUtils.fileExists(appPath)) {
        throw new WfmWebException(ErrorCode.WORKFLOW_PATH_EXISTS);
      }
      postBody = utils.formatXml(postBody);

      String filePath = workflowFilesService.createFile(appPath, postBody, overwrite);
      LOGGER.info("submit workflow job done. filePath=[{}]", filePath);

      if (PROJ_MANAGER_ENABLED) {
        String name = oozieUtils.deduceWorkflowNameFromXml(postBody);
        workflowManagerService.saveWorkflow(projectId, appPath, jobType,
          null, viewContext.getUsername(), name);
      }
      String response = oozieDelegate.submitWorkflowJobToOozie(headers,
        appPath, ui.getQueryParameters(), jobType);
      return Response.status(Status.OK).entity(response).build();
    } catch (WfmWebException ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw ex;
    } catch (WfmException ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw new WfmWebException(ex, ex.getErrorCode());
    } catch (Exception ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw new WfmWebException(ex);
    }
  }

  /**
   * Saves the posted body either as workflow XML or, when the body is not
   * XML, as a draft, and records it with the workflow manager service.
   *
   * @param postBody   workflow XML or draft content
   * @param headers    request headers (unused)
   * @param ui         request URI info (unused)
   * @param appPath    target application path; must not be empty
   * @param jobTypeStr name of a {@link JobType}; {@code WORKFLOW} is assumed when empty
   * @param overwrite  whether an existing workflow file may be replaced
   * @return an empty 200 response
   * @throws WfmWebException if {@code appPath} is empty, the target exists and
   *                         {@code overwrite} is false, or any step fails
   */
  @POST
  @Path("/saveWorkflow")
  @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
  public Response saveWorkflow(String postBody, @Context HttpHeaders headers,
                               @Context UriInfo ui, @QueryParam("app.path") String appPath,
                               @QueryParam("jobType") String jobTypeStr,
                               @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
    LOGGER.info("save workflow  called");
    if (StringUtils.isEmpty(appPath)) {
      throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
    }
    JobType jobType = resolveJobType(jobTypeStr);
    String workflowFilePath = workflowFilesService.getWorkflowFileName(appPath.trim(), jobType);
    try {
      if (!overwrite && hdfsFileUtils.fileExists(workflowFilePath)) {
        throw new WfmWebException(ErrorCode.WORKFLOW_PATH_EXISTS);
      }
      if (utils.isXml(postBody)) {
        saveWorkflowXml(jobType, appPath, postBody, overwrite);
      } else {
        saveDraft(jobType, appPath, postBody, overwrite);
      }
      if (PROJ_MANAGER_ENABLED) {
        workflowManagerService.saveWorkflow(null, workflowFilePath, jobType, null,
          viewContext.getUsername(), getWorkflowName(postBody));
      }
    } catch (WfmWebException ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw ex;
    } catch (Exception ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw new WfmWebException(ex);
    }
    return Response.ok().build();
  }

  /**
   * Parses the {@code jobType} query parameter.
   *
   * @param jobTypeStr name of a {@link JobType} constant, possibly null or empty
   * @return the matching constant, or {@link JobType#WORKFLOW} when the value is empty
   * @throws IllegalArgumentException if the value is non-empty but names no constant
   */
  private JobType resolveJobType(String jobTypeStr) {
    return StringUtils.isEmpty(jobTypeStr) ? JobType.WORKFLOW : JobType.valueOf(jobTypeStr);
  }

  /** Deduces the workflow name from the body, treating it as XML or as JSON depending on its content. */
  private String getWorkflowName(String postBody) {
    if (utils.isXml(postBody)) {
      return oozieUtils.deduceWorkflowNameFromXml(postBody);
    }
    return oozieUtils.deduceWorkflowNameFromJson(postBody);
  }

  /** Writes the formatted XML to the workflow file and deletes the matching draft file if one exists. */
  private void saveWorkflowXml(JobType jobType, String appPath, String postBody,
                               Boolean overwrite) throws IOException {
    appPath = workflowFilesService.getWorkflowFileName(appPath.trim(), jobType);
    postBody = utils.formatXml(postBody);
    workflowFilesService.createFile(appPath, postBody, overwrite);
    String workflowDraftPath = workflowFilesService.getWorkflowDraftFileName(appPath.trim(), jobType);
    if (hdfsFileUtils.fileExists(workflowDraftPath)) {
      hdfsFileUtils.deleteFile(workflowDraftPath);
    }
  }

  /**
   * Writes the draft file (always overwriting it). When no workflow file
   * exists yet, a no-op workflow XML is created first.
   */
  private void saveDraft(JobType jobType, String appPath, String postBody, Boolean overwrite) throws IOException {
    String workflowFilePath = workflowFilesService.getWorkflowFileName(appPath.trim(), jobType);
    if (!hdfsFileUtils.fileExists(workflowFilePath)) {
      String noOpWorkflow = oozieUtils.getNoOpWorkflowXml(postBody, jobType);
      workflowFilesService.createFile(workflowFilePath, noOpWorkflow, overwrite);
    }
    String workflowDraftPath = workflowFilesService.getWorkflowDraftFileName(appPath.trim(), jobType);
    workflowFilesService.createFile(workflowDraftPath, postBody, true);
  }

  /**
   * Validates the posted asset and, when valid, stores it at the upload path.
   *
   * @param postBody   asset XML
   * @param headers    request headers, passed to the asset validation
   * @param ui         request URI info; its query parameters are passed to the validation
   * @param uploadPath target path; must not be empty
   * @param overwrite  whether an existing file may be replaced
   * @return an empty 200 response
   * @throws WfmWebException if {@code uploadPath} is empty, validation fails
   *                         (the validation message is attached as additional
   *                         detail), the target exists and {@code overwrite} is
   *                         false, or any step fails
   */
  @POST
  @Path("/publishAsset")
  @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
  public Response publishAsset(String postBody, @Context HttpHeaders headers,
                               @Context UriInfo ui, @QueryParam("uploadPath") String uploadPath,
                               @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
    LOGGER.info("publish asset called");
    if (StringUtils.isEmpty(uploadPath)) {
      throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
    }
    uploadPath = uploadPath.trim();
    try {
      Map<String, String> validateAsset = assetResource.validateAsset(headers, postBody,
        ui.getQueryParameters());
      if (!STATUS_OK.equals(validateAsset.get(STATUS_KEY))) {
        WfmWebException wfmEx = new WfmWebException(ErrorCode.INVALID_ASSET_INPUT);
        wfmEx.setAdditionalDetail(validateAsset.get(MESSAGE_KEY));
        throw wfmEx;
      }
      return saveAsset(postBody, uploadPath, overwrite);
    } catch (WfmWebException ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw ex;
    } catch (Exception ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw new WfmWebException(ex);
    }
  }

  /** Formats the asset XML and writes it to the resolved asset file, honouring the overwrite flag. */
  private Response saveAsset(String postBody, String uploadPath, Boolean overwrite) throws IOException {
    uploadPath = workflowFilesService.getAssetFileName(uploadPath);
    if (!overwrite && hdfsFileUtils.fileExists(uploadPath)) {
      throw new WfmWebException(ErrorCode.WORKFLOW_PATH_EXISTS);
    }
    postBody = utils.formatXml(postBody);
    String filePath = workflowFilesService.createAssetFile(uploadPath, postBody, overwrite);
    LOGGER.info("publish asset job done. filePath=[{}]", filePath);
    return Response.ok().build();
  }

  /**
   * Streams the asset stored at the given path.
   *
   * @param assetPath path of the asset; must not be empty
   * @return a 200 response streaming the asset content
   * @throws WfmWebException if the path is empty or the asset cannot be read
   */
  @GET
  @Path("/readAsset")
  public Response readAsset(@QueryParam("assetPath") String assetPath) {
    if (StringUtils.isEmpty(assetPath)) {
      throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
    }
    try {
      final InputStream is = workflowFilesService.readAssset(assetPath);
      StreamingOutput streamer = utils.streamResponse(is);
      return Response.ok(streamer).build();
    } catch (IOException ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw new WfmWebException(ex);
    }
  }

  /**
   * Streams the draft belonging to the given workflow path.
   *
   * @param workflowPath workflow path; must not be empty
   * @return a 200 response streaming the draft content
   * @throws WfmWebException if the path is empty or the draft cannot be read
   */
  @GET
  @Path("/readWorkflowDraft")
  public Response readDraft(@QueryParam("workflowXmlPath") String workflowPath) {
    if (StringUtils.isEmpty(workflowPath)) {
      throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
    }
    try {
      final InputStream is = workflowFilesService.readDraft(workflowPath);
      StreamingOutput streamer = utils.streamResponse(is);
      return Response.ok(streamer).build();
    } catch (IOException ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw new WfmWebException(ex);
    }
  }

  /**
   * Discards the draft belonging to the given workflow path.
   *
   * @param workflowPath workflow path; must not be empty
   * @return an empty 200 response
   * @throws WfmWebException if the path is empty or the draft cannot be discarded
   */
  @POST
  @Path("/discardWorkflowDraft")
  public Response discardDraft(
    @QueryParam("workflowXmlPath") String workflowPath) {
    // Same guard as readDraft, so a missing parameter is reported as invalid input.
    if (StringUtils.isEmpty(workflowPath)) {
      throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
    }
    try {
      workflowFilesService.discardDraft(workflowPath);
      return Response.ok().build();
    } catch (IOException ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw new WfmWebException(ex);
    }
  }

  /**
   * Returns either the draft or the XML of a workflow and reports which one
   * in the {@link #RESPONSE_TYPE} header.
   * <p>
   * The draft is returned when the path has the draft extension or the
   * workflow details report the draft as current, and the draft content is in
   * the current draft format. If the draft format is not current, the XML is
   * returned instead with the {@link #OLDER_FORMAT_DRAFT_INGORED} header set.
   *
   * @param workflowPath workflow path; must not be empty
   * @param jobTypeStr   name of a {@link JobType}; {@code WORKFLOW} is assumed when empty
   * @return a 200 response carrying the draft or the workflow XML
   * @throws WfmWebException if the path is empty, the workflow file does not
   *                         exist, or any step fails
   */
  @GET
  @Path("/readWorkflow")
  public Response readWorkflow(
    @QueryParam("workflowPath") String workflowPath, @QueryParam("jobType") String jobTypeStr) {
    if (StringUtils.isEmpty(workflowPath)) {
      throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
    }
    try {
      // Parse once; an invalid name is wrapped by the generic handler below.
      JobType jobType = resolveJobType(jobTypeStr);
      String workflowFileName = workflowFilesService.getWorkflowFileName(workflowPath, jobType);
      if (!hdfsFileUtils.fileExists(workflowFileName)) {
        throw new WfmWebException(ErrorCode.WORKFLOW_XML_DOES_NOT_EXIST);
      }
      WorkflowFileInfo workflowDetails = workflowFilesService
        .getWorkflowDetails(workflowPath, jobType);
      if (workflowPath.endsWith(Constants.WF_DRAFT_EXTENSION) || workflowDetails.getIsDraftCurrent()) {
        String draftPath = workflowFilesService.getWorkflowDraftFileName(workflowPath, jobType);
        String draftContent = readFileContent(draftPath);
        if (!workflowFilesService.isDraftFormatCurrent(draftContent)) {
          // Draft is in an older format: fall back to the XML and tell the client.
          return getWorkflowResponse(workflowFileName, WorkflowFormat.XML.getValue(), true);
        }
        return Response.ok(draftContent).header(RESPONSE_TYPE, WorkflowFormat.DRAFT.getValue()).build();
      }
      return getWorkflowResponse(workflowFileName, WorkflowFormat.XML.getValue(), false);
    } catch (WfmWebException ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw ex;
    } catch (Exception ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw new WfmWebException(ex);
    }
  }

  /**
   * Reads a whole file into a string and closes the underlying stream.
   *
   * @param filePath path handed to {@code WorkflowFilesService.readWorkflowXml}
   * @return the file content
   * @throws IOException if the file cannot be opened or read
   */
  private String readFileContent(String filePath) throws IOException {
    InputStream inputStream = workflowFilesService.readWorkflowXml(filePath);
    try {
      return IOUtils.toString(inputStream);
    } finally {
      // The content is fully buffered at this point; the stream must not leak.
      IOUtils.closeQuietly(inputStream);
    }
  }

  /** Builds a streaming 200 response for the file, adding the format header and, if requested, the ignored-draft header. */
  private Response getWorkflowResponse(String filePath, String responseType,
                                       boolean olderFormatDraftIngored) throws IOException {
    final InputStream is = workflowFilesService.readWorkflowXml(filePath);
    StreamingOutput streamer = utils.streamResponse(is);
    Response.ResponseBuilder responseBuilder = Response.ok(streamer).header(RESPONSE_TYPE, responseType);
    if (olderFormatDraftIngored) {
      responseBuilder.header(OLDER_FORMAT_DRAFT_INGORED, Boolean.TRUE.toString());
    }
    return responseBuilder.build();
  }

  /**
   * Streams the workflow XML stored at exactly the given path.
   *
   * @param workflowPath path of the XML file; must not be empty
   * @param jobTypeStr   accepted for interface compatibility; not used here
   * @return a 200 response streaming the file content
   * @throws WfmWebException if the path is empty, the file does not exist, or reading fails
   */
  @GET
  @Path("/readWorkflowXml")
  public Response readWorkflowXml(
    @QueryParam("workflowXmlPath") String workflowPath, @QueryParam("jobType") String jobTypeStr) {
    if (StringUtils.isEmpty(workflowPath)) {
      throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
    }
    try {
      if (!hdfsFileUtils.fileExists(workflowPath)) {
        throw new WfmWebException(ErrorCode.WORKFLOW_XML_DOES_NOT_EXIST);
      }
      final InputStream is = workflowFilesService.readWorkflowXml(workflowPath);
      StreamingOutput streamer = utils.streamResponse(is);
      return Response.ok(streamer).build();
    } catch (WfmWebException ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw ex;
    } catch (Exception ex) {
      LOGGER.error(ex.getMessage(), ex);
      throw new WfmWebException(ex);
    }
  }

  /**
   * Catch-all GET: forwards the request to Oozie through the delegate.
   *
   * @param headers request headers
   * @param ui      request URI info supplying the path and query parameters
   * @return the response produced by the delegate
   * @throws WfmWebException wrapping any exception raised by the delegate
   */
  @GET
  @Path("/{path: .*}")
  public Response handleGet(@Context HttpHeaders headers, @Context UriInfo ui) {
    try {
      return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
        .getPath(), ui.getQueryParameters(), HttpMethod.GET, null);
    } catch (Exception ex) {
      LOGGER.error("Error in GET proxy", ex);
      throw new WfmWebException(ex);
    }
  }

  /**
   * Catch-all POST: forwards the request and its body to Oozie through the delegate.
   *
   * @param xml     request body
   * @param headers request headers
   * @param ui      request URI info supplying the path and query parameters
   * @return the response produced by the delegate
   * @throws WfmWebException wrapping any exception raised by the delegate
   */
  @POST
  @Path("/{path: .*}")
  public Response handlePost(String xml, @Context HttpHeaders headers,
                             @Context UriInfo ui) {
    try {
      return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
        .getPath(), ui.getQueryParameters(), HttpMethod.POST, xml);
    } catch (Exception ex) {
      LOGGER.error("Error in POST proxy", ex);
      throw new WfmWebException(ex);
    }
  }

  /**
   * Catch-all DELETE: forwards the request to Oozie through the delegate.
   *
   * @param headers request headers
   * @param ui      request URI info supplying the path and query parameters
   * @return the response produced by the delegate
   * @throws WfmWebException wrapping any exception raised by the delegate
   */
  @DELETE
  @Path("/{path: .*}")
  public Response handleDelete(@Context HttpHeaders headers,
                               @Context UriInfo ui) {
    try {
      // Forward with the verb that was received; this previously sent POST.
      return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
        .getPath(), ui.getQueryParameters(), HttpMethod.DELETE, null);
    } catch (Exception ex) {
      LOGGER.error("Error in DELETE proxy", ex);
      throw new WfmWebException(ex);
    }
  }

  /**
   * Catch-all PUT: forwards the request and its body to Oozie through the delegate.
   *
   * @param body    request body
   * @param headers request headers
   * @param ui      request URI info supplying the path and query parameters
   * @return the response produced by the delegate
   * @throws WfmWebException wrapping any exception raised by the delegate
   */
  @PUT
  @Path("/{path: .*}")
  public Response handlePut(String body, @Context HttpHeaders headers,
                            @Context UriInfo ui) {
    try {
      return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
        .getPath(), ui.getQueryParameters(), HttpMethod.PUT, body);
    } catch (Exception ex) {
      LOGGER.error("Error in PUT proxy", ex);
      throw new WfmWebException(ex);
    }
  }
}
