blob: 8d2b5a585cb64e02b97085ce3022ad27c98a48a9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.ambari.view;
import static org.apache.oozie.ambari.view.Constants.MESSAGE_KEY;
import static org.apache.oozie.ambari.view.Constants.STATUS_KEY;
import static org.apache.oozie.ambari.view.Constants.STATUS_OK;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.ambari.view.ViewContext;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.oozie.ambari.view.assets.AssetResource;
import org.apache.oozie.ambari.view.exception.ErrorCode;
import org.apache.oozie.ambari.view.exception.WfmException;
import org.apache.oozie.ambari.view.exception.WfmWebException;
import org.apache.oozie.ambari.view.workflowmanager.WorkflowManagerService;
import org.apache.oozie.ambari.view.workflowmanager.WorkflowsManagerResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Singleton;
/**
* This is a class used to bridge the communication between the and the Oozie
* API executing inside ambari.
*/
@Singleton
public class OozieProxyImpersonator {
private final static Logger LOGGER = LoggerFactory
.getLogger(OozieProxyImpersonator.class);
private static final boolean PROJ_MANAGER_ENABLED = true;
public static final String RESPONSE_TYPE = "response-type";
public static final String OLDER_FORMAT_DRAFT_INGORED = "olderFormatDraftIngored";
private final ViewContext viewContext;
private final Utils utils = new Utils();
private final HDFSFileUtils hdfsFileUtils;
private final WorkflowFilesService workflowFilesService;
private WorkflowManagerService workflowManagerService;
private final OozieDelegate oozieDelegate;
private final OozieUtils oozieUtils = new OozieUtils();
private final AssetResource assetResource;
private static enum WorkflowFormat{
XML("xml"),
DRAFT("draft");
String value;
WorkflowFormat(String value) {
this.value=value;
}
public String getValue() {
return value;
}
}
@Inject
public OozieProxyImpersonator(ViewContext viewContext) {
this.viewContext = viewContext;
hdfsFileUtils = new HDFSFileUtils(viewContext);
workflowFilesService = new WorkflowFilesService(hdfsFileUtils);
this.oozieDelegate = new OozieDelegate(viewContext);
assetResource = new AssetResource(viewContext);
if (PROJ_MANAGER_ENABLED) {
workflowManagerService = new WorkflowManagerService(viewContext);
}
LOGGER.info(String.format(
"OozieProxyImpersonator initialized for instance: %s",
viewContext.getInstanceName()));
}
@GET
@Path("hdfsCheck")
public Response hdfsCheck(){
try {
hdfsFileUtils.hdfsCheck();
return Response.ok().build();
}catch (Exception ex){
LOGGER.error(ex.getMessage(),ex);
throw new WfmWebException(ex);
}
}
@GET
@Path("homeDirCheck")
public Response homeDirCheck(){
try{
hdfsFileUtils.homeDirCheck();
return Response.ok().build();
}catch (Exception ex){
LOGGER.error(ex.getMessage(),ex);
throw new WfmWebException(ex);
}
}
@Path("/fileServices")
public FileServices fileServices() {
return new FileServices(viewContext);
}
@Path("/wfprojects")
public WorkflowsManagerResource workflowsManagerResource() {
return new WorkflowsManagerResource(viewContext);
}
@Path("/assets")
public AssetResource assetResource() {
return this.assetResource;
}
@GET
@Path("/getCurrentUserName")
public Response getCurrentUserName() {
return Response.ok(viewContext.getUsername()).build();
}
@GET
@Path("/getWorkflowManagerConfigs")
public Response getWorkflowConfigs() {
try {
HashMap<String, String> workflowConfigs = new HashMap<String, String>();
workflowConfigs.put("nameNode", viewContext.getProperties().get("webhdfs.url"));
workflowConfigs.put("resourceManager", viewContext.getProperties().get("yarn.resourcemanager.address"));
workflowConfigs.put("userName", viewContext.getUsername());
workflowConfigs.put("checkHomeDir",hdfsFileUtils.shouldCheckForHomeDir().toString());
return Response.ok(workflowConfigs).build();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw new WfmWebException(e);
}
}
@POST
@Path("/submitJob")
@Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
public Response submitJob(String postBody, @Context HttpHeaders headers,
@Context UriInfo ui, @QueryParam("app.path") String appPath,
@QueryParam("projectId") String projectId,
@DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
@QueryParam("description") String description,
@QueryParam("jobType") String jobTypeString) {
LOGGER.info("submit workflow job called");
JobType jobType = JobType.valueOf(jobTypeString);
if (StringUtils.isEmpty(appPath)) {
throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
}
appPath = workflowFilesService.getWorkflowFileName(appPath.trim(), jobType);
try {
if (!overwrite) {
boolean fileExists = hdfsFileUtils.fileExists(appPath);
if (fileExists) {
throw new WfmWebException(ErrorCode.WORKFLOW_PATH_EXISTS);
}
}
postBody = utils.formatXml(postBody);
String filePath = workflowFilesService.createFile(appPath, postBody, overwrite);
LOGGER.info(String.format("submit workflow job done. filePath=[%s]", filePath));
if (PROJ_MANAGER_ENABLED) {
String name = oozieUtils.deduceWorkflowNameFromXml(postBody);
workflowManagerService.saveWorkflow(projectId, appPath, jobType,
null, viewContext.getUsername(), name);
}
String response = oozieDelegate.submitWorkflowJobToOozie(headers,
appPath, ui.getQueryParameters(), jobType);
return Response.status(Status.OK).entity(response).build();
} catch (WfmWebException ex) {
LOGGER.error(ex.getMessage(),ex);
throw ex;
} catch(WfmException ex){
LOGGER.error(ex.getMessage(),ex);
throw new WfmWebException(ex,ex.getErrorCode());
} catch(Exception ex) {
LOGGER.error(ex.getMessage(),ex);
throw new WfmWebException(ex);
}
}
@POST
@Path("/saveWorkflow")
@Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
public Response saveWorkflow(String postBody, @Context HttpHeaders headers,
@Context UriInfo ui, @QueryParam("app.path") String appPath,
@QueryParam("jobType") String jobTypeStr,
@DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
LOGGER.info("save workflow called");
if (StringUtils.isEmpty(appPath)) {
throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
}
JobType jobType = StringUtils.isEmpty(jobTypeStr) ? JobType.WORKFLOW : JobType.valueOf(jobTypeStr);
String workflowFilePath = workflowFilesService.getWorkflowFileName(appPath.trim(), jobType);
try {
if (!overwrite) {
boolean fileExists = hdfsFileUtils.fileExists(workflowFilePath);
if (fileExists) {
throw new WfmWebException(ErrorCode.WORKFLOW_PATH_EXISTS);
}
}
if (utils.isXml(postBody)) {
saveWorkflowXml(jobType, appPath, postBody, overwrite);
} else {
saveDraft(jobType, appPath, postBody, overwrite);
}
if (PROJ_MANAGER_ENABLED) {
workflowManagerService.saveWorkflow(null, workflowFilePath, jobType, null,
viewContext.getUsername(), getWorkflowName(postBody));
}
} catch (WfmWebException ex) {
LOGGER.error(ex.getMessage(),ex);
throw ex;
} catch (Exception ex) {
LOGGER.error(ex.getMessage(),ex);
throw new WfmWebException(ex);
}
return Response.ok().build();
}
private String getWorkflowName(String postBody) {
if (utils.isXml(postBody)) {
return oozieUtils.deduceWorkflowNameFromXml(postBody);
} else {
return oozieUtils.deduceWorkflowNameFromJson(postBody);
}
}
private void saveWorkflowXml(JobType jobType, String appPath, String postBody,
Boolean overwrite) throws IOException {
appPath = workflowFilesService.getWorkflowFileName(appPath.trim(), jobType);
postBody = utils.formatXml(postBody);
workflowFilesService.createFile(appPath, postBody, overwrite);
String workflowDraftPath = workflowFilesService.getWorkflowDraftFileName(appPath.trim(), jobType);
if (hdfsFileUtils.fileExists(workflowDraftPath)) {
hdfsFileUtils.deleteFile(workflowDraftPath);
}
}
private void saveDraft(JobType jobType, String appPath, String postBody, Boolean overwrite) throws IOException {
String workflowFilePath = workflowFilesService.getWorkflowFileName(appPath.trim(), jobType);
if (!hdfsFileUtils.fileExists(workflowFilePath)) {
String noOpWorkflow = oozieUtils.getNoOpWorkflowXml(postBody, jobType);
workflowFilesService.createFile(workflowFilePath, noOpWorkflow, overwrite);
}
String workflowDraftPath = workflowFilesService.getWorkflowDraftFileName(appPath.trim(), jobType);
workflowFilesService.createFile(workflowDraftPath, postBody, true);
}
@POST
@Path("/publishAsset")
@Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
public Response publishAsset(String postBody, @Context HttpHeaders headers,
@Context UriInfo ui, @QueryParam("uploadPath") String uploadPath,
@DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
LOGGER.info("publish asset called");
if (StringUtils.isEmpty(uploadPath)) {
throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
}
uploadPath = uploadPath.trim();
try {
Map<String, String> validateAsset = assetResource.validateAsset(headers, postBody,
ui.getQueryParameters());
if (!STATUS_OK.equals(validateAsset.get(STATUS_KEY))) {
WfmWebException wfmEx=new WfmWebException(ErrorCode.INVALID_ASSET_INPUT);
wfmEx.setAdditionalDetail(validateAsset.get(MESSAGE_KEY));
throw wfmEx;
}
return saveAsset(postBody, uploadPath, overwrite);
} catch (WfmWebException ex) {
LOGGER.error(ex.getMessage(),ex);
throw ex;
} catch (Exception ex) {
LOGGER.error(ex.getMessage(),ex);
throw new WfmWebException(ex);
}
}
private Response saveAsset(String postBody, String uploadPath, Boolean overwrite) throws IOException {
uploadPath = workflowFilesService.getAssetFileName(uploadPath);
if (!overwrite) {
boolean fileExists = hdfsFileUtils.fileExists(uploadPath);
if (fileExists) {
throw new WfmWebException(ErrorCode.WORKFLOW_PATH_EXISTS);
}
}
postBody = utils.formatXml(postBody);
String filePath = workflowFilesService.createAssetFile(uploadPath, postBody, overwrite);
LOGGER.info(String.format("publish asset job done. filePath=[%s]", filePath));
return Response.ok().build();
}
@GET
@Path("/readAsset")
public Response readAsset(@QueryParam("assetPath") String assetPath) {
if (StringUtils.isEmpty(assetPath)) {
throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
}
try {
final InputStream is = workflowFilesService.readAssset(assetPath);
StreamingOutput streamer = utils.streamResponse(is);
return Response.ok(streamer).status(200).build();
} catch (IOException ex) {
LOGGER.error(ex.getMessage(),ex);
throw new WfmWebException(ex);
}
}
@GET
@Path("/readWorkflowDraft")
public Response readDraft(@QueryParam("workflowXmlPath") String workflowPath) {
if (StringUtils.isEmpty(workflowPath)) {
throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
}
try {
final InputStream is = workflowFilesService.readDraft(workflowPath);
StreamingOutput streamer = utils.streamResponse(is);
return Response.ok(streamer).status(200).build();
} catch (IOException ex) {
LOGGER.error(ex.getMessage(),ex);
throw new WfmWebException(ex);
}
}
@POST
@Path("/discardWorkflowDraft")
public Response discardDraft(
@QueryParam("workflowXmlPath") String workflowPath) {
try {
workflowFilesService.discardDraft(workflowPath);
return Response.ok().build();
} catch (IOException ex) {
LOGGER.error(ex.getMessage(),ex);
throw new WfmWebException(ex);
}
}
@GET
@Path("/readWorkflow")
public Response readWorkflow(
@QueryParam("workflowPath") String workflowPath, @QueryParam("jobType") String jobTypeStr) {
try {
String workflowFileName = workflowFilesService.getWorkflowFileName(workflowPath, JobType.valueOf(jobTypeStr));
if (!hdfsFileUtils.fileExists(workflowFileName)) {
throw new WfmWebException(ErrorCode.WORKFLOW_XML_DOES_NOT_EXIST);
}
WorkflowFileInfo workflowDetails = workflowFilesService
.getWorkflowDetails(workflowPath, JobType.valueOf(jobTypeStr));
if (workflowPath.endsWith(Constants.WF_DRAFT_EXTENSION) || workflowDetails.getIsDraftCurrent()) {
String filePath = workflowFilesService.getWorkflowDraftFileName(workflowPath, JobType.valueOf(jobTypeStr));
InputStream inputStream = workflowFilesService.readWorkflowXml(filePath);
String stringResponse = IOUtils.toString(inputStream);
if (!workflowFilesService.isDraftFormatCurrent(stringResponse)) {
filePath = workflowFilesService.getWorkflowFileName(workflowPath, JobType.valueOf(jobTypeStr));
return getWorkflowResponse(filePath, WorkflowFormat.XML.getValue(), true);
} else {
return Response.ok(stringResponse).header(RESPONSE_TYPE, WorkflowFormat.DRAFT.getValue()).build();
}
} else {
String filePath = workflowFilesService.getWorkflowFileName(workflowPath, JobType.valueOf(jobTypeStr));
return getWorkflowResponse(filePath, WorkflowFormat.XML.getValue(), false);
}
} catch (WfmWebException ex) {
LOGGER.error(ex.getMessage(),ex);
throw ex;
} catch (Exception ex) {
LOGGER.error(ex.getMessage(),ex);
throw new WfmWebException(ex);
}
}
private Response getWorkflowResponse(String filePath, String responseType,
boolean olderFormatDraftIngored) throws IOException {
final InputStream is = workflowFilesService.readWorkflowXml(filePath);
StreamingOutput streamer = utils.streamResponse(is);
Response.ResponseBuilder responseBuilder = Response.ok(streamer).header(RESPONSE_TYPE, responseType);
if (olderFormatDraftIngored) {
responseBuilder.header(OLDER_FORMAT_DRAFT_INGORED, Boolean.TRUE.toString());
}
return responseBuilder.build();
}
@GET
@Path("/readWorkflowXml")
public Response readWorkflowXml(
@QueryParam("workflowXmlPath") String workflowPath,@QueryParam("jobType") String jobTypeStr) {
if (StringUtils.isEmpty(workflowPath)) {
throw new WfmWebException(ErrorCode.INVALID_EMPTY_INPUT);
}
try {
if (!hdfsFileUtils.fileExists(workflowPath)) {
throw new WfmWebException(ErrorCode.WORKFLOW_XML_DOES_NOT_EXIST);
}
final InputStream is = workflowFilesService.readWorkflowXml(workflowPath);
StreamingOutput streamer = utils.streamResponse(is);
return Response.ok(streamer).status(200).build();
} catch (WfmWebException ex) {
LOGGER.error(ex.getMessage(),ex);
throw ex;
} catch (Exception ex) {
LOGGER.error(ex.getMessage(),ex);
throw new WfmWebException(ex);
}
}
@GET
@Path("/{path: .*}")
public Response handleGet(@Context HttpHeaders headers, @Context UriInfo ui) {
try {
return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
.getPath(), ui.getQueryParameters(), HttpMethod.GET, null);
} catch (Exception ex) {
LOGGER.error("Error in GET proxy", ex);
throw new WfmWebException(ex);
}
}
@POST
@Path("/{path: .*}")
public Response handlePost(String xml, @Context HttpHeaders headers,
@Context UriInfo ui) {
try {
return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
.getPath(), ui.getQueryParameters(), HttpMethod.POST, xml);
} catch (Exception ex) {
LOGGER.error("Error in POST proxy", ex);
throw new WfmWebException(ex);
}
}
@DELETE
@Path("/{path: .*}")
public Response handleDelete(@Context HttpHeaders headers,
@Context UriInfo ui) {
try {
return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
.getPath(), ui.getQueryParameters(), HttpMethod.POST, null);
} catch (Exception ex) {
LOGGER.error("Error in DELETE proxy", ex);
throw new WfmWebException(ex);
}
}
@PUT
@Path("/{path: .*}")
public Response handlePut(String body, @Context HttpHeaders headers,
@Context UriInfo ui) {
try {
return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
.getPath(), ui.getQueryParameters(), HttpMethod.PUT, body);
} catch (Exception ex) {
LOGGER.error("Error in PUT proxy", ex);
throw new WfmWebException(ex);
}
}
}