blob: 6f3c4d2e02572584543728ac0f6fdfae0086d747 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.ambari.view;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import org.apache.ambari.view.ViewContext;
import org.apache.commons.io.IOUtils;
import org.apache.oozie.ambari.view.exception.ErrorCode;
import org.apache.oozie.ambari.view.exception.WfmException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Forwards requests from the view to the Oozie REST service.
 *
 * <p>The service base URI is read from the view property
 * {@code oozie.service.uri}; when that property is absent a built-in default
 * is used. Every outgoing request carries the {@code user.name} header set to
 * {@code oozie} and the {@code doAs} header set to the user name reported by
 * the {@link ViewContext}.
 */
public class OozieDelegate {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(OozieDelegate.class);
    private static final String OOZIEPARAM_PREFIX = "oozieparam.";
    private static final int OOZIEPARAM_PREFIX_LENGTH = OOZIEPARAM_PREFIX
            .length();
    private static final String CONFIG_PREFIX = "config.";
    private static final int CONFIG_PREFIX_LENGTH = CONFIG_PREFIX.length();
    private static final String EQUAL_SYMBOL = "=";
    private static final String OOZIE_WF_RERUN_FAILNODES_CONF_KEY = "oozie.wf.rerun.failnodes";
    private static final String OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY = "oozie.use.system.libpath";
    private static final String USER_NAME_HEADER = "user.name";
    private static final String USER_OOZIE_SUPER = "oozie";
    private static final String DO_AS_HEADER = "doAs";
    private static final String SERVICE_URI_PROP = "oozie.service.uri";
    private static final String DEFAULT_SERVICE_URI = "http://sandbox.hortonworks.com:11000/oozie";
    /** Charset used to decode response bodies read from the service. */
    private static final String RESPONSE_CHARSET = "UTF-8";

    private final ViewContext viewContext;
    private final OozieUtils oozieUtils = new OozieUtils();
    private final Utils utils = new Utils();
    private final AmbariIOUtil ambariIOUtil;

    /**
     * Creates a delegate bound to the given view.
     *
     * @param viewContext source of the view properties and the current user
     *                    name; also handed to {@link AmbariIOUtil}
     */
    public OozieDelegate(ViewContext viewContext) {
        this.viewContext = viewContext;
        this.ambariIOUtil = new AmbariIOUtil(viewContext);
    }

    /**
     * Builds a job configuration from the query parameters and POSTs it to
     * {@code <service-uri>/v2/jobs}.
     *
     * <p>Side effect: when {@code queryParams} has no {@code config.nameNode}
     * entry, one is added to the passed-in map using the {@code webhdfs.url}
     * view property.
     *
     * @param headers     headers of the incoming request, forwarded to Oozie
     * @param filePath    path of the job definition; appended to the name node
     *                    URL to form the job path property
     * @param queryParams request parameters; must not be {@code null}
     * @param jobType     selects the job path property key
     * @return the response body, returned only when it starts with
     *         <code>{</code> after trimming
     * @throws RuntimeException if the {@code webhdfs.url} view property is
     *                          not set
     * @throws WfmException     with {@link ErrorCode#OOZIE_SUBMIT_ERROR} when
     *                          the response body is missing or does not start
     *                          with <code>{</code>
     */
    public String submitWorkflowJobToOozie(HttpHeaders headers,
            String filePath, MultivaluedMap<String, String> queryParams,
            JobType jobType) {
        String nameNode = viewContext.getProperties().get("webhdfs.url");
        if (nameNode == null) {
            LOGGER.error("Name Node couldn't be determined automatically.");
            throw new RuntimeException("Name Node couldn't be determined automatically.");
        }
        if (!queryParams.containsKey("config.nameNode")) {
            List<String> nameNodes = new ArrayList<String>(1);
            LOGGER.info("Namenode==={}", nameNode);
            nameNodes.add(nameNode);
            queryParams.put("config.nameNode", nameNodes);
        }

        Map<String, String> workflowConfigs = getWorkflowConfigs(filePath,
                queryParams, jobType, nameNode);
        String configXml = oozieUtils.generateConfigXml(workflowConfigs);
        LOGGER.info("Config xml=={}", configXml);

        Map<String, String> customHeaders = new HashMap<String, String>();
        customHeaders.put("Content-Type", "application/xml;charset=UTF-8");
        String submitUrl = getServiceUri() + "/v2/jobs?"
                + getJobSubmitOozieParams(queryParams);
        Response serviceResponse = consumeService(headers, submitUrl,
                HttpMethod.POST, configXml, customHeaders);

        // Read the entity once and guard against null so that a missing body
        // surfaces as a WfmException instead of a NullPointerException.
        Object entity = serviceResponse.getEntity();
        LOGGER.info("Resp from oozie status entity=={}", entity);
        String oozieResp = entity == null ? null : entity.toString();

        // Only a body that looks like a JSON object is treated as success.
        if (oozieResp != null && oozieResp.trim().startsWith("{")) {
            return oozieResp;
        }
        throw new WfmException(oozieResp, ErrorCode.OOZIE_SUBMIT_ERROR);
    }

    /**
     * Forwards a proxied request to the Oozie service.
     *
     * <p>The target URL is the service URI followed by the part of
     * {@code path} after the {@code proxy} segment and by the query
     * parameters, in which any {@code user.name} value is replaced by the
     * current view user.
     *
     * @param headers         headers of the incoming request
     * @param path            incoming request path, expected to contain
     *                        {@code "proxy/"}
     * @param queryParameters incoming query parameters; must not be
     *                        {@code null}
     * @param method          HTTP method to use
     * @param body            request body, passed through unchanged
     * @return a response wrapping the body read from the service
     * @throws Exception declared for callers; failures while reading the
     *                   response body are raised as {@link RuntimeException}
     */
    public Response consumeService(HttpHeaders headers, String path,
            MultivaluedMap<String, String> queryParameters, String method,
            String body) throws Exception {
        return consumeService(headers, this.buildUri(path, queryParameters),
                method, body, null);
    }

    /**
     * Calls the given URL, reads the whole response body and wraps it in a
     * {@link Response}.
     *
     * <p>The status is {@code BAD_REQUEST} (plain text) when the body
     * contains the text {@code BAD_REQUEST}; otherwise it is {@code OK} with
     * a media type chosen by {@code Utils.deduceType}.
     */
    private Response consumeService(HttpHeaders headers, String urlToRead,
            String method, String body, Map<String, String> customHeaders) {
        InputStream stream = readFromOozie(headers, urlToRead, method, body,
                customHeaders);
        String stringResponse;
        try {
            // Decode explicitly as UTF-8 rather than with the platform
            // default charset, which varies between hosts.
            stringResponse = IOUtils.toString(stream, RESPONSE_CHARSET);
        } catch (IOException e) {
            LOGGER.error("Error while converting stream to string", e);
            throw new RuntimeException(e);
        } finally {
            // The stream is owned by this method; release it on every path.
            IOUtils.closeQuietly(stream);
        }

        if (stringResponse.contains(Response.Status.BAD_REQUEST.name())) {
            return Response.status(Response.Status.BAD_REQUEST)
                    .entity(stringResponse).type(MediaType.TEXT_PLAIN).build();
        }
        return Response.status(Response.Status.OK)
                .entity(stringResponse)
                .type(utils.deduceType(stringResponse)).build();
    }

    /**
     * Opens a stream on the given URL through {@link AmbariIOUtil}.
     *
     * <p>The outgoing headers start from those derived from {@code headers}
     * by {@code Utils.getHeaders}; {@code user.name}, {@code doAs} and
     * {@code Accept} (JSON) are then set, and finally {@code customHeaders}
     * are applied, so they override any of the former.
     *
     * @param headers       headers of the incoming request
     * @param urlToRead     absolute URL to call
     * @param method        HTTP method to use
     * @param body          request body, passed through unchanged
     * @param customHeaders extra headers, or {@code null} for none
     * @return the stream returned by {@code AmbariIOUtil.readFromUrl}; this
     *         method does not close it, so the caller is responsible for
     *         doing so
     */
    public InputStream readFromOozie(HttpHeaders headers, String urlToRead,
            String method, String body, Map<String, String> customHeaders) {
        Map<String, String> newHeaders = utils.getHeaders(headers);
        newHeaders.put(USER_NAME_HEADER, USER_OOZIE_SUPER);
        newHeaders.put(DO_AS_HEADER, viewContext.getUsername());
        newHeaders.put("Accept", MediaType.APPLICATION_JSON);
        if (customHeaders != null) {
            newHeaders.putAll(customHeaders);
        }
        LOGGER.info("Proxy request for url: [{}] {}", method, urlToRead);
        return ambariIOUtil.readFromUrl(urlToRead, method, body, newHeaders);
    }

    /**
     * Assembles the job configuration sent to Oozie.
     *
     * <p>Contents, in the order they are written (later entries win):
     * <ul>
     * <li>{@code resourceManager} and {@code jobTracker}, taken from the
     * {@code yarn.resourcemanager.address} view property, only when the
     * {@code resourceManager} parameter equals {@code useDefault};</li>
     * <li>every {@code config.*} parameter, with the prefix removed and the
     * first value used;</li>
     * <li>the system-libpath and rerun-failnodes flags, defaulting to
     * {@code "true"};</li>
     * <li>{@code user.name} and the job path ({@code nameNode + filePath}).</li>
     * </ul>
     *
     * @param queryParams request parameters; must not be {@code null} (the
     *                    only caller has already dereferenced it)
     */
    private Map<String, String> getWorkflowConfigs(String filePath,
            MultivaluedMap<String, String> queryParams, JobType jobType,
            String nameNode) {
        Map<String, String> workflowConfigs = new HashMap<String, String>();

        // getFirst yields null for an absent key, so no separate
        // containsKey test is needed.
        if ("useDefault".equals(queryParams.getFirst("resourceManager"))) {
            String jobTrackerNode = viewContext.getProperties()
                    .get("yarn.resourcemanager.address");
            LOGGER.info("jobTrackerNode==={}", jobTrackerNode);
            workflowConfigs.put("resourceManager", jobTrackerNode);
            workflowConfigs.put("jobTracker", jobTrackerNode);
        }

        for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
            String key = entry.getKey();
            List<String> values = entry.getValue();
            if (key.startsWith(CONFIG_PREFIX) && values != null
                    && !values.isEmpty()) {
                workflowConfigs.put(key.substring(CONFIG_PREFIX_LENGTH),
                        values.get(0));
            }
        }

        putFlag(workflowConfigs, queryParams, "oozieconfig.useSystemLibPath",
                OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY);
        putFlag(workflowConfigs, queryParams, "oozieconfig.rerunOnFailure",
                OOZIE_WF_RERUN_FAILNODES_CONF_KEY);

        workflowConfigs.put("user.name", viewContext.getUsername());
        workflowConfigs.put(oozieUtils.getJobPathPropertyKey(jobType), nameNode
                + filePath);
        return workflowConfigs;
    }

    /**
     * Stores under {@code confKey} the first value of {@code paramKey} when
     * that parameter is present, and {@code "true"} otherwise.
     */
    private void putFlag(Map<String, String> configs,
            MultivaluedMap<String, String> queryParams, String paramKey,
            String confKey) {
        String value = queryParams.containsKey(paramKey)
                ? queryParams.getFirst(paramKey)
                : "true";
        configs.put(confKey, value);
    }

    /**
     * Builds the query string for the job submission URL from every
     * {@code oozieparam.*} parameter, with the prefix removed.
     *
     * <p>Each pair is followed by {@code &}, so a non-empty result ends with
     * a trailing {@code &}.
     * NOTE(review): names and values are appended as-is, without URL
     * encoding - confirm that callers only pass URL-safe values.
     *
     * @return the query string, empty when no parameter matches
     */
    private String getJobSubmitOozieParams(
            MultivaluedMap<String, String> queryParams) {
        StringBuilder query = new StringBuilder();
        if (queryParams == null) {
            return query.toString();
        }
        for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
            List<String> values = entry.getValue();
            if (!entry.getKey().startsWith(OOZIEPARAM_PREFIX)
                    || values == null) {
                continue;
            }
            String name = entry.getKey().substring(OOZIEPARAM_PREFIX_LENGTH);
            for (String val : values) {
                query.append(name).append(EQUAL_SYMBOL).append(val)
                        .append("&");
            }
        }
        return query.toString();
    }

    /**
     * Returns the {@code oozie.service.uri} view property, or the built-in
     * default when the property is not set.
     */
    private String getServiceUri() {
        String configured = viewContext.getProperties().get(SERVICE_URI_PROP);
        return configured != null ? configured : DEFAULT_SERVICE_URI;
    }

    /**
     * Translates an incoming proxy path into a URL on the Oozie service.
     *
     * <p>Everything up to and including the word {@code proxy} is dropped
     * from {@code absolutePath}; the remainder (starting at the following
     * {@code /}) is appended to the service URI, followed by the query
     * parameters rendered by {@code Utils.convertParamsToUrl}.
     */
    private String buildUri(String absolutePath,
            MultivaluedMap<String, String> queryParameters) {
        // +5 skips "proxy" but keeps the slash that follows it.
        // NOTE(review): when "proxy/" is absent, indexOf returns -1 and the
        // first four characters of the path are silently dropped - confirm
        // that every caller passes a proxy path.
        int index = absolutePath.indexOf("proxy/") + 5;
        String relativePath = absolutePath.substring(index);
        MultivaluedMap<String, String> params = addOrReplaceUserName(queryParameters);
        return getServiceUri() + relativePath
                + utils.convertParamsToUrl(params);
    }

    /**
     * Overwrites the value of an existing {@code user.name} parameter with
     * the current view user.
     *
     * <p>Despite the name, nothing is added when the parameter is absent.
     * The passed-in map is modified in place.
     *
     * @return the same map instance that was passed in
     */
    private MultivaluedMap<String, String> addOrReplaceUserName(
            MultivaluedMap<String, String> parameters) {
        for (Map.Entry<String, List<String>> entry : parameters.entrySet()) {
            if ("user.name".equals(entry.getKey())) {
                List<String> vals = new ArrayList<String>(1);
                vals.add(viewContext.getUsername());
                entry.setValue(vals);
            }
        }
        return parameters;
    }

    /**
     * Returns the URL of the graph view of a job:
     * {@code <service-uri>/v2/job/<jobid>?show=graph}.
     *
     * @param jobid identifier of the job, inserted as-is
     */
    public String getDagUrl(String jobid) {
        return getServiceUri() + "/v2/job/" + jobid + "?show=graph";
    }
}